API Documentation

Config

Second version configuration parser

FIXME: pas 2023/02/05… missing options from v2: max_queue_size, outlet, pipe

class sarracenia.config.Config(parent=None)

The option parser to produce a single configuration.

It can be instantiated with:

  • one_config(component, config, action, isPost=False, hostdir=None) – read the options for a given component and configuration (all in one call).

Alternatively, a config can be built up from the following constructors:

  • default_config() – returns an empty configuration, given a config file tree.

  • no_file_config() – returns an empty config without any config file tree.

Then just add settings manually:

import sarracenia.config
import sarracenia.config.credentials

cfg = sarracenia.config.no_file_config()

cfg.broker = sarracenia.config.credentials.Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')
cfg.topicPrefix = ['v02', 'post']
cfg.component = 'subscribe'
cfg.config = 'flow_demo'
cfg.action = 'start'
cfg.bindings = [('xpublic', ['v02', 'post'], ['*', 'WXO-DD', 'observations', 'swob-ml', '#'])]
cfg.queueName = 'q_anonymous.subscriber_test2'
cfg.download = True
cfg.batch = 1
cfg.messageCountMax = 5

# set the instance number for the flow class.
cfg.no = 0

# and at the end, call finalize()
cfg.finalize()

class addBinding(option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=False, help=None, metavar=None)

called by argparse to deal with queue bindings.

add_option(option, kind='list', default_value=None, all_values=None)

Options can be declared in any plugin. There are various kinds of options, and the declared kind determines how the value is parsed.

  • ‘count’ integer count type.

  • ‘octal’ base-8 (octal) integer type.

  • ‘duration’ a floating point number indicating a quantity of seconds (0.001 is 1 millisecond)

    modified by a unit suffix ( m-minute, h-hour, w-week )

  • ‘flag’ boolean (True/False) option.

  • ‘float’ a simple floating point number.

  • ‘list’ a list of string values; each succeeding occurrence is appended to the total.

    all v2 plugin options are declared of type list.

  • ‘set’ a set of string values, each succeeding occurrence is unioned to the total.

    If all_values is provided, then the set is constrained to those values.

  • ‘size’ integer size. Suffixes k, m, and g for kilo, mega, and giga (base 2) multipliers.

  • ‘str’ an arbitrary string value. As with all of the above types except list and set, each succeeding occurrence overrides the previous one.

If a value is set to None, that could mean that it has not been set.
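The following is a minimal, hypothetical sketch of how the declared kind could change parsing: scalar kinds convert a single value and the latest occurrence wins, while ‘list’ catenates and ‘set’ unions. The names parse_value and apply_occurrence, and the strings accepted as True for a flag, are assumptions for illustration, not the sarracenia.config API.

```python
# Hypothetical sketch: not the sarracenia.config implementation.
# It only illustrates how an option's declared kind could drive parsing.

_DURATION_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}
_SIZE_UNITS = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}  # base-2 multipliers


def parse_value(kind: str, text: str):
    """Convert one textual occurrence of an option according to its kind."""
    if kind == "count":
        return int(text)
    if kind == "octal":
        return int(text, 8)
    if kind == "flag":
        # Assumed spellings for True; anything else is False.
        return text.lower() in ("true", "yes", "on", "1")
    if kind == "float":
        return float(text)
    if kind == "duration":
        unit = text[-1].lower()
        if unit in _DURATION_UNITS:
            return float(text[:-1]) * _DURATION_UNITS[unit]
        return float(text)
    if kind == "size":
        unit = text[-1].lower()
        if unit in _SIZE_UNITS:
            return int(text[:-1]) * _SIZE_UNITS[unit]
        return int(text)
    return text  # 'str', 'list' and 'set' keep the raw string value


def apply_occurrence(current, kind: str, text: str, all_values=None):
    """Combine a new occurrence of an option with the value accumulated so far."""
    value = parse_value(kind, text)
    if kind == "list":
        return list(current or []) + [value]  # catenate
    if kind == "set":
        if all_values is not None and value not in all_values:
            raise ValueError(f"{value!r} is not one of {sorted(all_values)}")
        return set(current or ()) | {value}  # union
    return value  # every other kind: the latest occurrence overrides
```

Feeding two occurrences of a list option through apply_occurrence keeps both values, whereas a str option keeps only the last one.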

applyComponentDefaults(component)

Overlay default options for the given component onto the given configuration.

dictify()

return a dict version of the cfg…

dump()

print out what the configuration looks like.

finalize(component=None, config=None)

Before final use, take the existing settings, and infer any missing needed defaults from what is provided. Should be called prior to using a configuration.

There are default options that apply only if they are not overridden…

hohoo_validate_urlstr(urlstr) -> tuple

Returns a tuple (bool, expanded_url): the bool indicates whether the expansion worked, and expanded_url is the URL with the necessary authentication details added from sarracenia.Credentials.

mask_ppstr(mask)

return a pretty print string version of the given mask, easier for humans to read.

merge(oth)

merge two lists of options.

For each option: if it is cumulative, merge the values; otherwise, if the value in oth is not None, take the value from oth.
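A minimal sketch of that merge rule, assuming options are held in plain dicts and that the caller supplies the set of cumulative option names; merge_options and its cumulative parameter are hypothetical, not part of sarracenia.config.

```python
def merge_options(mine: dict, oth: dict, cumulative: set) -> dict:
    """Return a new dict combining mine with oth (hypothetical helper).

    Cumulative options (lists or sets) are merged; any other option takes
    the value from oth, unless that value is None.
    """
    merged = dict(mine)
    for key, value in oth.items():
        old = merged.get(key)
        if key in cumulative and old is not None and value is not None:
            if isinstance(old, set):
                merged[key] = old | set(value)
            else:
                merged[key] = list(old) + list(value)
        elif value is not None:
            merged[key] = value
    return merged
```

With cumulative={"a"}, merging {"a": [1], "b": "x"} with {"a": [2], "b": None} yields {"a": [1, 2], "b": "x"}: the list grows, and the None does not erase the existing value.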

override(oth)

override a value in a set of options.

Why an override() method, rather than just assigning values to the dictionary? In the configuration file, there are various ways to do variable substitution. override() invokes those, so that values are properly interpreted; otherwise, you just end up with a literal value.

parse_args(isPost=False)

user information:

Accept a configuration and apply the argparse library to augment the given configuration with command line settings.

The post component has a different calling convention than the others, so set the isPost flag when calling from post.

development notes:

Use argparse.parser to modify defaults. FIXME, many FIXME notes below. this is a currently unusable placeholder. have not figured this out yet. many issues.

FIXME #1: parseArgs often sets the value of the variable, regardless of its presence (normally a good thing.) ( if you have ‘store_true’ then default needed, for broker, just a string, it ignores if not present.) This has the effect of overriding settings in the file parsed before the arguments. Therefore: often supply defaults… but… sigh…

but there is another consideration stopping me from supplying defaults, wish I remembered what it was. I think it is: FIXME #2: arguments are parsed twice: once to get basic stuff (loglevel, component, action) and if the parsing fails there, the usage will print the wrong defaults…

parse_file(cfg, component=None)

add settings from a given config file to self

sundew_dirPattern(pattern, urlstr, basename, destDir)

does substitutions for patterns in directories.

variableExpansion(cdir, message=None) -> str

replace substitution patterns, variable substitutions as described in https://metpx.github.io/sarracenia/Reference/sr3_options.7.html#variables

returns: the given string with the substitutions done.

examples: ${YYYYMMDD-70m} becomes 20221107 assuming that was the current date 70 minutes ago.

environment variables, and built-in settings are replaced also.

timeoffset -70m
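To illustrate the ${YYYYMMDD-70m} example, here is a hypothetical sketch that handles only the YYYYMMDD pattern with an optional s/m/h/d/w offset, plus environment variables. The real variableExpansion supports many more patterns (see the sr3_options page linked above); the function name expand and the regular expression are assumptions.

```python
import datetime
import os
import re

# ${NAME} or ${NAME-70m} / ${NAME+2h}: a name, then an optional signed offset.
_PATTERN = re.compile(r"\$\{(\w+)(?:([+-])(\d+)([smhdw]))?\}")
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}


def expand(text, now=None):
    """Hypothetical subset of variable expansion: dates with offsets, env vars."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)

    def substitute(match):
        name, sign, amount, unit = match.groups()
        when = now
        if sign:
            delta = datetime.timedelta(seconds=int(amount) * _UNIT_SECONDS[unit])
            when = now - delta if sign == "-" else now + delta
        if name == "YYYYMMDD":
            return when.strftime("%Y%m%d")
        if name in os.environ:
            return os.environ[name]
        return match.group(0)  # unknown names are left untouched

    return _PATTERN.sub(substitute, text)


now = datetime.datetime(2022, 11, 8, 0, 30, tzinfo=datetime.timezone.utc)
print(expand("/data/${YYYYMMDD-70m}/", now))  # -> /data/20221107/
```

Passing now explicitly makes the result reproducible; 70 minutes before 00:30 on Nov. 8th falls on the previous day, matching the example in the text.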

sarracenia.config.config_path(subdir, config, mandatory=True, ctype='conf')

Given a subdir/config, look for the file in the usual configuration locations.

return Tuple: Found (True/False), path_of_file_found|config_that_was_not_found

sarracenia.config.get_log_filename(hostdir, component, configuration, no)

return the name of a single logfile for a single instance.

sarracenia.config.get_metrics_filename(hostdir, component, configuration, no)

return the name of the metrics file for a single instance.

sarracenia.config.get_pid_filename(hostdir, component, configuration, no)

return the file name for the pid file for the specified instance.

sarracenia.config.get_user_cache_dir(hostdir)

hostdir is None if statehost is false.

sarracenia.config.logger = <Logger sarracenia.config (WARNING)>

Respect appdir stuff using an environment variable; for now, it is just hard coded as a class variable, appdir_stuff.

Type:

FIXME

sarracenia.config.no_file_config()

Initialize a config that will not use Sarracenia configuration files at all. It is meant for people writing independent programs that start up instances with Python API calls.

class sarracenia.config.octal_number(value)

sarracenia.config.one_config(component, config, action, isPost=False, hostDir=None)

Single call that returns a fully parsed single configuration for a single component to run:

  • read in admin.conf and default.conf

  • apply component default overrides (maps to: component/check ?)

  • read in component/config.conf

  • parse arguments from the command line

  • return the config instance.

appdir_stuff can be used to override file locations for testing during development.

sarracenia.config.parse_count(cstr)

Numeric argument; accepts k, m, g suffixes (followed by i or b to use base 2) and a leading + or -. The return value is an integer.
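One possible reading of that description, written as a hypothetical sketch: k, m and g multiply by powers of 1000, and a trailing i or b switches to powers of 1024. The exact suffix rules and the name parse_count_sketch are assumptions; check sarracenia.config.parse_count itself before relying on them.

```python
import re

# number, optional k/m/g suffix, optional i/b marker selecting base 2
_COUNT_RE = re.compile(r"^([+-]?\d+(?:\.\d+)?)([kmg])?([ib])?$", re.IGNORECASE)
_POWER = {"k": 1, "m": 2, "g": 3}


def parse_count_sketch(cstr: str) -> int:
    """Hypothetical re-implementation of the parse_count description."""
    match = _COUNT_RE.match(cstr.strip())
    if not match:
        raise ValueError(f"not a count: {cstr!r}")
    number, suffix, binary = match.groups()
    value = float(number)
    if suffix:
        base = 1024 if binary else 1000
        value *= base ** _POWER[suffix.lower()]
    return int(value)
```

Under these assumptions, "10k" gives 10000 while "1ki" and "1kb" both give 1024.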

sarracenia.config.parse_float(cstr)

Like parse_count: a numeric argument that accepts k, m, g suffixes and + or -. Below 1000, it returns a decimal number with at most 3 digits.

sarracenia.config.str_options = ['accelCpCommand', 'accelWgetCommand', 'accelScpCommand', 'action', 'admin', 'baseDir', 'broker', 'cluster', 'directory', 'exchange', 'exchangeSuffix', 'feeder', 'filename', 'flatten', 'flowMain', 'header', 'hostname', 'identity', 'inlineEncoding', 'logFormat', 'logLevel', 'pollUrl', 'post_baseUrl', 'post_baseDir', 'post_broker', 'post_exchange', 'post_exchangeSuffix', 'post_format', 'post_topic', 'queueName', 'queueShare', 'sendTo', 'rename', 'report_exchange', 'source', 'strip', 'timezone', 'nodupe_ttl', 'nodupe_driver', 'nodupe_basis', 'tlsRigour', 'topic']

For backward compatibility, convert some old plugins that are hard to get working with v2wrapper into v3 plugins.

The fdelay ones make in-depth use of the sr_replay function, which has changed too much in v3.

Accelerators and rate limiting are now built in; no plugin is required.

Sarracenia

class sarracenia.Message

A notification message in Sarracenia is stored as a python dictionary, with a few extra management functions.

The internal representation is very close to the v03 format defined here: https://metpx.github.io/sarracenia/Reference/sr_post.7.html

Unfortunately, because Message subclasses dict, copying it from a plain dict loses the type; hence the need for the copyDict member.

__init__()

computeIdentity(path, o, offset=0, data=None) -> None

Check extended attributes for a cached identity sum calculation. If extended attributes are present, and:

  • the file mtime is not too new, and

  • the cached sum is using the same method,

then use the cached value.

Otherwise, calculate a checksum. If the data is provided, use that as the file content; otherwise, read the file from the file system.

Once the checksum is determined, set the file’s extended attributes for the new value. the method of checksum calculation is from options.identity.

sets the message ‘identity’ field if appropriate.

copyDict(d)

copy dictionary into message.

deriveSource(o)

set msg[‘source’] field as appropriate for given message and options (o)

deriveTopics(o, topic, separator='.')

derive subtopic, topicPrefix, and topic fields based on message and options.

dumps() -> str

FIXME: used to be msg_dumps. Print a message in a relatively compact way. msg is a python dictionary. If there is a field longer than maximum_field_length, it is truncated.

static fromFileData(path, o, lstat=None)

Create a message based on a given file, calculating the checksum. Returns a well-formed message, or None.

static fromFileInfo(path, o, lstat=None)

Based on the given information about the file (its name and a stat record if available) and a configuration options object (sarracenia.config.Config), return a sarracenia.Message suitable for placement on a worklist.

A message is a specialized python dictionary with a certain set of fields in it. The message returned will have the necessary fields for processing and posting.

The message for a file is built from the given path, options (o), and lstat (output of os.stat).

The lstat record is used to build ‘atime’, ‘mtime’ and ‘mode’ fields if timeCopy and permCopy options are set.

if no lstat record is supplied, then those fields will not be set.

static fromStream(path, o, data=None)

Create a file and message for the given path. The file will be created or overwritten with the provided data; then fromFileData() is invoked for the resulting file.

getContent(options=None)

Retrieve the data referred to by a message. The data may be embedded in the message, or this routine may resolve a link to an external server and download the data.

Does not handle authentication. This routine is meant to be used with small files; using it to download large files may be very inefficient (untested in that use case).

Return value is the data.

Often, on a server where one is publishing data, the file is available as a local file, and one can avoid network usage by setting options.baseDir. This behaviour can be disabled by not providing the options, or by not setting baseDir.

getIDStr() -> str

return some descriptive tag string to identify the message being processed.

new_pathWrite(options, data)

Expects msg[‘new_dir’] and msg[‘new_file’] to be set, and is given the byte stream of data.

Write the local file based on the given message, options and data. Update the message to match (recalculating the checksum).

In future: if the data field is a file, it will be taken as an open file object which can be read sequentially, and its bytes written to the path indicated by other message fields.

Currently, if data is a buffer, then its contents are written to the file.

if data is None, then look for the ‘content’ header in the message. and use the data from that.

setReport(code, text=None)

FIXME: used to be msg_set_report. Set message fields to indicate the result of an action so reports can be generated.

setReport is supposed to indicate final message dispositions, so in the case of putting a message on worklist.failed… no report is generated, since it will be retried later. FIXME: should we publish an interim failure report?

updatePaths(options, new_dir=None, new_file=None)

Set the new_* fields in the message based on changed file placement. If the new_* options are omitted, update the rest of the fields in the message based on their current values.

If you change file placement in a flow callback, for example, you would change new_dir and new_file in the message. This routine updates other fields in the message (e.g. relPath, baseUrl, topic) to match new_dir/new_file.

msg[‘post_baseUrl’] defaults to msg[‘baseUrl’]

validate()

FIXME: used to be msg_validate. Return True if the message format seems OK; otherwise, return False and log some reasons.

class sarracenia.Sarracenia

Core utilities of Sarracenia. The main class here is sarracenia.Message. A sarracenia.Message is subclassed from a dict, so for most uses it works like the python built-in, but it also has a few major entry points (factories):

Building a message from a file

m = sarracenia.Message.fromFileData( path, options, lstat )

builds a notification message from a given existing file, consulting options, a parsed in memory version of the configuration settings that are applicable

Options

see the sarracenia.config.Config class for functions to parse configuration files and create corresponding python option dictionaries. One can supply small dictionaries for example:

options['topicPrefix'] = [ 'v02', 'post' ]
options['bindings'] = [ ('xpublic', [ 'v02', 'post'] , [ '#' ] )]
options['queueName'] = 'q_anonymous_' + socket.getfqdn() + '_SomethingHelpfulToYou'

Above is an example of a minimal options dictionary taken from the tutorial example called moth_api_consumer.py.

If you don’t have a file

If you don’t have a local file, then build your notification message with:

m = sarracenia.Message.fromFileInfo( path, options, lstat )

where you can make up the lstat values to fill in some fields in the message. You can make a fake lstat structure to provide these values using sarracenia.filemetadata class which is either an alias for paramiko.SFTPAttributes ( https://docs.paramiko.org/en/latest/api/sftp.html#paramiko.sftp_attr.SFTPAttributes ) if paramiko is installed, or a simple emulation if not.

from sarracenia.filemetadata import FmdStat

lstat = FmdStat()
lstat.st_mtime = utcinteger  # second count in UTC (numeric version of a Sarracenia timestamp.)
lstat.st_atime =
lstat.st_mode = 0o644
lstat.st_size = size_in_bytes

optional fields that may be of interest:

lstat.filename = "nameOfTheFile"
lstat.longname = 'lrwxrwxrwx 1 peter peter 20 Oct 11 20:28 nameOfTheFile'

That can then be provided as the lstat argument to the above fromFileInfo() call. However, the notification message returned will lack an identity checksum field. Once you get the file, you can add the identity field with:

m.computeIdentity(path, o)

In terms of consuming notification messages, the fields in the dictionary provide metadata for the announced resource. The announced data could be embedded in the notification message itself, or available by a URL.

Messages are generally gathered from a source such as the Message Queueing Protocol wrapper class: moth… sarracenia.moth.

data = m.getContent()

will return the content of the announced resource as raw data.


class sarracenia.TimeConversions

Time conversion routines.

  • os.stat and time.time() return floating point

  • The floating point representation is a count of seconds since the beginning of the epoch.

  • beginning of epoch is platform dependent, and conversion to actual date is fraught (leap seconds, etc…)

  • Entire SR_* formats are text, no floats are sent over the protocol (avoids byte order issues, null byte / encoding issues, and enhances readability.)

  • str format: YYYYMMDDHHMMSS.msec. The goal of this representation is that a naive conversion to floats yields comparable numbers.

  • but the number that results is not useful for anything else, so need these special routines to get a proper epochal time.

  • also OK for year 2038 (rollover of a 32-bit time_t).

  • string representation is forced to UTC timezone to avoid having to communicate timezone.

timestr2flt() - accepts a string and returns a float.

caveat

FIXME: this encoding will break in the year 10000 (assumes four digit year) and requires leading zeroes prior to 1000. One will have to add detection of the decimal point, and change the offsets at that point.


sarracenia.durationToSeconds(str_value, default=None) -> float

This function converts a duration to seconds. str_value should be a number followed by a unit [s,m,h,d,w], e.g. 1w, 4d, 12h. Returns 0.0 for an invalid string.
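A minimal sketch of the documented behaviour, for readers who want to see the unit arithmetic. It is not sarracenia's code: the name duration_to_seconds is hypothetical, treating a bare number as seconds is an assumption, and the default parameter is left out because its role is not described here.

```python
# Hypothetical sketch of the documented behaviour; not sarracenia's code.
_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}


def duration_to_seconds(str_value) -> float:
    """Convert '1w', '4d', '12h', '90' ... to seconds; 0.0 if invalid."""
    text = str(str_value).strip().lower()
    try:
        if text and text[-1] in _UNITS:
            return float(text[:-1]) * _UNITS[text[-1]]
        return float(text)  # assumption: a bare number is taken as seconds
    except ValueError:
        return 0.0
```

For example, "1w" is 7 × 86400 = 604800 seconds, and a string such as "bogus" falls back to 0.0.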

sarracenia.durationToString(d) -> str

given a number of seconds, return a short, human readable string.

sarracenia.stat(path) -> SFTPAttributes

os.stat call replacement which improves on it by returning an SFTPAttributes structure, in place of the OS stat one, featuring:

  • mtime and ctime with subsecond accuracy

  • fields that can be overridden (not immutable.)

sarracenia.timeflt2str(f=None)

timeflt2str - accepts a float and returns a string.

f is a floating point number such as returned by time.time() (number of seconds since the beginning of the epoch.)

the str is YYYYMMDDTHHMMSS.sssss

20210921T011331.0123

translates to: Sept. 21st, 2021 at 01:13 and 31.0123 seconds. always UTC timezone.
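As a sketch of the round trip between the two representations, here is a hypothetical pair of functions assuming UTC throughout and millisecond precision on output (the example above carries four decimals). The _sketch names are illustrative, not sarracenia's timeflt2str/timestr2flt.

```python
import calendar
import time


def timeflt2str_sketch(f: float) -> str:
    """Epoch seconds -> 'YYYYMMDDTHHMMSS.mmm' in UTC (millisecond precision)."""
    whole, millis = divmod(round(f * 1000), 1000)
    return time.strftime("%Y%m%dT%H%M%S", time.gmtime(whole)) + f".{millis:03d}"


def timestr2flt_sketch(s: str) -> float:
    """Inverse of the above; also accepts the form without the 'T' separator."""
    head, _, frac = s.replace("T", "").partition(".")
    seconds = calendar.timegm(time.strptime(head, "%Y%m%d%H%M%S"))
    return seconds + (float("0." + frac) if frac else 0.0)


print(timeflt2str_sketch(0.0))  # -> 19700101T000000.000
```

Rounding to whole milliseconds before splitting avoids producing a fractional part such as ".1000" when the input is just below the next second; calendar.timegm is used because, unlike time.mktime, it interprets the struct as UTC.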

Sarracenia.FlowCB

Callbacks from running Sarracenia.Flows

class sarracenia.flowcb.FlowCB(options, class_logger=None)

Bases: object

Flow Callback is the main class for implementing plugin customization to flows.

sample activation in a configuration file:

flowCallback sarracenia.flowcb.name.Name

will instantiate an object of that type whose appropriately named methods will be called at the right time.

__init__ accepts options as an argument.

options is a sarracenia.config.Config object, used to override default behaviour

a setting is declared in a configuration file like so:

set sarracenia.flowcb.filter.log.Log.level debug

(the prefix for the setting matches the type hierarchy in flowCallback) the plugin should get the setting:

options.level = 'debug'

worklist given to on_plugins…

  • worklist.incoming –> new messages to continue processing

  • worklist.ok –> successfully processed

  • worklist.rejected –> messages to not be further processed.

  • worklist.failed –> messages for which processing failed. Failed messages will be retried.

  • worklist.directories_ok –> list of directories created during processing.

Initially, all messages are placed in incoming. If a plugin entry_point decides:

  • a message is not relevant, it is moved to the rejected worklist.

  • all processing has been done, it moves it to the ok worklist

  • an operation failed and it should be retried later, append it to the failed worklist

Do not remove any message from all lists; only move messages between them. It is necessary to put rejected messages in the appropriate worklist so they can be acknowledged as received. Messages can only be removed after ack.
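The move-don't-drop rule can be sketched as a small callback. This is a hypothetical example: RejectEmptyFiles is an invented name, a real plugin would subclass sarracenia.flowcb.FlowCB, the SimpleNamespace merely stands in for the worklist, and the assumption is that messages carry a ‘size’ field.

```python
from types import SimpleNamespace


class RejectEmptyFiles:
    """Hypothetical callback; a real plugin would subclass sarracenia.flowcb.FlowCB."""

    def __init__(self, options):
        self.o = options

    def after_accept(self, worklist):
        still_incoming = []
        for msg in worklist.incoming:
            if msg.get("size") == 0:
                worklist.rejected.append(msg)  # move it: never just drop a message
            else:
                still_incoming.append(msg)
        worklist.incoming = still_incoming


# A stand-in worklist with the attributes listed above.
wl = SimpleNamespace(incoming=[{"size": 0}, {"size": 5}], rejected=[], ok=[],
                     failed=[], directories_ok=[])
RejectEmptyFiles(options=None).after_accept(wl)
```

Afterwards, the empty file's message sits on wl.rejected (so it can still be acknowledged) and only the non-empty one remains in wl.incoming.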

def __init__(self,options) -> None:

Task: initialization of the flowCallback at instantiation time.

usually contains:

self.o = options

def ack(self,messagelist) -> None:

Task: acknowledge messages from a gather source.

def gather(self, messageCountMax) -> (gather_more, messages)

Task: gather messages from a source... return a tuple:

      * gather_more ... bool whether to continue gathering
      * messages ... list of messages

      or just return a list of messages.

      In a poll, gather is always called, regardless of vip possession.

      In all other components, gather is only called when in possession
      of the vip.

return (True, list)
 OR
return list

def after_accept(self,worklist) -> None:

Task: just after messages go through accept/reject masks,
      operate on worklist.incoming to help decide which messages to process further.
      and move messages to worklist.rejected to prevent further processing.
      do not delete any messages, only move between worklists.

def after_work(self,worklist) -> None:

Task: operate on worklist.ok (files which have arrived.)

All messages on the worklist.ok list have been acknowledged, so to suppress posting
of them, or further processing, the messages must be removed from worklist.ok.

worklist.failed processing should occur in here as it will be zeroed out after this step.
The flowcb/retry.py plugin, for example, processes failed messages.

def destfn(self,msg) -> str:

Destination File Name (DESTFNSCRIPT)  routines.

Task: look at the fields in the message, and perhaps settings and
      return a new file name for the target of the send or download.

kind of a last resort function, exists mostly for sundew compatibility.
can be used for selective renaming using accept clauses.

def download(self,msg) -> bool:

Task: looking at msg['new_dir'], msg['new_file'], msg['new_inflight_file']
      and the self.o options perform a download of a single file.
      return True on a successful transfer, False otherwise.

      if self.o.dry_run is set, simulate the output of a download without
      performing it.

This replaces built-in download functionality, providing an override
for individual file transfers. Ideally, you set checksums as you download.

def metricsReport(self) -> dict:

Return a dictionary of metrics. Example: number of messages remaining in retry queues.

def on_cleanup(self) -> None:

Allow plugins to perform additional work after broker resources are eliminated. Local state files are still present when this runs.

def on_declare(self) -> None:

Local state files are already present when this runs. Allow plugins to perform additional work besides broker resource setup.

def on_housekeeping(self) -> None:

do periodic processing.

def on_start(self) -> None:

After the connection is established with the broker and things are instantiated, but
before any message transfer occurs.

def on_stop(self) -> None:

what it says on the tin... clean up processing when stopping.

def poll(self) -> list:

Task: gather messages from a destination... return a list of messages.
      works like a gather, but...

      When specified, poll replaces the built-in poll of the poll component.
      it runs only when the machine running the poll has the vip.
      in components other than poll, poll is never called.
return []

def post(self,worklist) -> None:

Task: operate on worklist.ok, and worklist.failed. modifies them appropriately.
      message acknowledgement has already occurred before they are called.

to indicate failure to process a message, append to worklist.failed.
worklist.failed processing should occur in here as it will be zeroed out after this step.

def send(self,msg) -> bool:

Task: looking at msg['new_dir'], msg['new_file'], and the self.o options perform a transfer
      of a single file.
      return True on a successful transfer, False otherwise.

      if self.o.dry_run is set, simulate the output of a send without
      performing it.

This replaces built-in send functionality for individual files.

def please_stop(self):

Pre-warn a flowcb that a stop has been requested, allowing processing to wrap up before the full stop happens.

__init__(options, class_logger=None)

please_stop()

flow callbacks should not time.sleep for long periods, but only nap and check between naps if a stop has been requested.
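A minimal sketch of the nap-and-check pattern, assuming a _stop_requested flag like the one Moth.please_stop() mentions; NappingCallback and its wait() method are hypothetical names, not part of sarracenia.flowcb.

```python
import time


class NappingCallback:
    """Hypothetical flow callback that naps instead of one long time.sleep()."""

    def __init__(self):
        self._stop_requested = False

    def please_stop(self):
        self._stop_requested = True

    def wait(self, total: float, nap: float = 0.1) -> bool:
        """Wait up to total seconds; return False if a stop was requested first."""
        deadline = time.monotonic() + total
        while not self._stop_requested:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return True
            time.sleep(min(nap, remaining))
        return False
```

A shorter nap reacts faster to a stop request at the cost of more wake-ups; time.monotonic() is used so that wall-clock adjustments do not stretch or shrink the wait.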

sarracenia.flowcb.load_library(factory_path, options)

Loading the entry points for a python module. It searches the normal python module path using the importlib module.

the factory_path is a combined file specification with a dot separator with a special last entry being the name of the class within the file.

factory_path a.b.c.C

means import the module named a.b.c and instantiate an object of type C. In that class-C object, look for the known callback entry points.

Alternatively, if the last element of the path does not follow the Python convention of starting with a capital letter, the class name will be guessed.

Note that ~/.config/sr3/plugins will also be in the python library path, so modules placed there will be found, in addition to those in the package itself in the sarracenia/flowcb directory:

callback foo -> foo.Foo
                sarracenia.flowcb.foo.Foo

callback foo.bar -> foo.bar.Bar
                    sarracenia.flowcb.foo.bar.Bar
                    foo.bar
                    sarracenia.flowcb.foo.bar
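A hypothetical sketch of the naming convention and the importlib step. candidate_paths covers only the first two candidate forms listed for each callback, and its "capitalize the last element" rule is an assumption; load_class shows the generic import-module-then-getattr technique. Neither is the actual load_library code.

```python
import importlib


def candidate_paths(callback: str) -> list:
    """Hypothetical helper: the first two class paths tried for a 'callback' setting."""
    last = callback.split(".")[-1]
    if last[:1].isupper():  # already names a class, e.g. a.b.c.C
        return [callback]
    full = f"{callback}.{last.capitalize()}"  # foo.bar -> foo.bar.Bar
    return [full, f"sarracenia.flowcb.{full}"]


def load_class(factory_path: str):
    """Import module a.b.c and return its attribute C, for a path a.b.c.C."""
    module_name, _, class_name = factory_path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)
```

For instance, load_class("collections.OrderedDict") imports the collections module and returns its OrderedDict class.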

Sarracenia.Moth

class sarracenia.moth.Moth(props=None, is_subscriber=True)

Bases: object

Moth … Messages Organized by Topic Headers (en français: Messages organisés par thème hierarchique. )

A multi-protocol library for use by hierarchical message passing implementations, (messages which have a ‘topic’ header that is used for routing by brokers.)

  • regardless of protocol, the message format returned should be the same.

  • the message is turned into a sarracenia.Message object, which acts like a python dictionary, corresponding to key-value pairs in the message body, and properties.

  • topic is a special key that may end up in the message body, or some sort of property or metadata.

  • the protocol should support acknowledgement under user control. Such control is indicated by the presence of an entry_point called “ack”. The entry point accepts “ack_id” as a message identifier to be passed to the broker. Whatever symbol is used by the protocol, it is passed through this message property. Examples: in rabbitmq/amqp, ack takes a “delivery_tag” as an argument; in MQTT, it takes a “message-id”. So when receiving an AMQP message, m[‘ack_id’] is assigned the delivery_tag from the message.

  • There is a special dict item, “_DeleteOnPost”, to identify keys which are added only for local use. They will be removed from the message when publishing. Examples: topic (sent outside body), message-id (used for acknowledgements), new_basedir, ack_id, new_… (settings…)
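A minimal sketch of stripping local-only keys before publishing, assuming "_DeleteOnPost" holds a collection of key names; body_for_publishing is a hypothetical helper, not part of sarracenia.moth.

```python
def body_for_publishing(message: dict) -> dict:
    """Return a copy without the keys listed in message['_DeleteOnPost']."""
    local_only = set(message.get("_DeleteOnPost", ())) | {"_DeleteOnPost"}
    return {key: value for key, value in message.items() if key not in local_only}


msg = {"relPath": "a/b.txt", "ack_id": 7, "topic": "v03.a",
       "_DeleteOnPost": {"ack_id", "topic"}}
print(body_for_publishing(msg))  # -> {'relPath': 'a/b.txt'}
```

Building a copy rather than deleting in place keeps ack_id available locally, so the message can still be acknowledged after it is published.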

Intent is to be specialized for topic based data distribution (MQTT style.) API to allow pass-through of protocol specific properties, but apply templates for genericity.

Target protocols (and corresponding libraries.): AMQP, MQTT, ?

Things to specify:

  • broker

  • topicPrefix

  • subTopic

  • queueName (for amqp, used as client-id for mqtt)

This library knows nothing about Sarracenia. The only code used from sarracenia is for interpreting duration properties (from the root sarracenia/__init__.py) and the broker argument (from sarracenia.config.credentials).

usage:

import sarracenia.moth
import sarracenia.config.credentials

props = sarracenia.moth.default_options
props['broker'] = sarracenia.config.credentials.Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')
props['expire'] = 300
props['batch'] = 1
is_subscriber = True

c = sarracenia.moth.Moth(props, is_subscriber)

# if there are new messages from a publisher, return them, otherwise return
# an empty list [].
messages = c.newMessages()

p = sarracenia.moth.Moth({'batch': 1}, False)

# message: a sarracenia.Message to publish
p.putNewMessage(message)

# tear down connection.
p.close()

Initialize a broker connection. Connections are unidirectional. either for subscribing (with subFactory) or publishing (with pubFactory.)

The factories return objects subclassed to match the protocol required by the broker argument.

arguments to the factories are:

  • broker … the url of the broker to connect to.

  • props is a dictionary of properties/parameters, supplied as overrides to the default properties listed above.

Some may vary among protocols:

Protocol     library implementing    URL to select
--------     --------------------    -------------
AMQPv0.9     amqplib from Celery     amqp, amqps
AMQPv0.9     pika                    pika, pikas
MQTTv3       paho                    mqtt, mqtts
AMQPv1.0     qpid-proton             amq1, amq1s

messaging_strategy

How to manage the connection. Covers whether to treat the connection as new or assume it is set up, and also what should be done if something goes wrong.

  • reset: on startup… erase any state, and re-initialize.

  • stubborn: If set to True, loop forever if something bad happens. Never give up. This sort of setting is desired in operations, especially unattended. if set to False, may give up more easily.

  • failure_duration advises the library how to structure the connection service level.

    • 5m - make a connection that will recover from transient errors of a few minutes, but not tax the broker too much for prolonged outages.

    • 5d - strive to keep the connection alive through outages of up to five days.

Changing recovery_strategy setting, might result in having to destroy and re-create consumer queues (AMQP.)

Options

both

  • ‘topicPrefix’ : [ ‘v03’ ]

  • ‘messageDebugDump’: False, –> enable printing of raw messages.

  • ‘inline’: False, - Are we inlining content within messages?

  • ‘inlineEncoding’: ‘guess’, - what encoding should we use for inlined content?

  • ‘inlineByteMax’: 4096, - Maximum size of messages to inline.

for get

  • ‘batch’ : 100 # how many messages to get at once

  • ‘broker’ : an sr_broker ?

  • ‘queueName’ : Mandatory, name of a queue. (only in AMQP… hmm…)

  • ‘bindings’ : [ list of bindings ]

  • ‘loop’

optional:

  • ‘message_ttl’

for put:

  • ‘exchange’ (only in AMQP… hmm…)

__init__(props=None, is_subscriber=True) -> None

If is_subscriber=True, then this is a consuming instance; expect calls to get* routines.

If is_subscriber=False, then expect/permit only calls to put*.

ack(message: Message) -> bool

tell broker that a given message has been received.

ack uses the ‘ack_id’ property to send an acknowledgement back to the broker. If there’s no ‘ack_id’ in the message, you should return True.

cleanup() -> None

get rid of server-side resources associated with a client. (queues/id’s, etc…)

property default_options: dict

get default properties to override, used by client for validation.

static findAllSubclasses(cls) -> set

Recursively finds all subclasses of a class. __subclasses__() only gives direct subclasses.
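The recursion can be sketched in a few lines. find_all_subclasses and the Base/Child/GrandChild hierarchy are hypothetical, for illustration only; the point is that __subclasses__() on Base alone would miss GrandChild.

```python
def find_all_subclasses(cls) -> set:
    """Recursively collect every subclass of cls, not just the direct ones."""
    found = set()
    for sub in cls.__subclasses__():
        found.add(sub)
        found |= find_all_subclasses(sub)
    return found


# Hypothetical class hierarchy, for illustration only.
class Base: ...
class Child(Base): ...
class GrandChild(Child): ...
```

A factory can use such a set to pick the subclass matching a protocol even when it is several inheritance levels below the base class.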

getNewMessage() Message[source]

If there is one new message available, return it. Otherwise return None. Do not block.

side effects:

metrics. self.metrics[‘RxByteCount’] should be incremented by size of payload. self.metrics[‘RxGoodCount’] should be incremented by 1 if a good message is received. self.metrics[‘RxBadCount’] should be incremented by 1 if an invalid message is received (&discarded.)

metricsDisconnect() None[source]

tear down an existing connection.

newMessages() list[source]

If there are new messages available from the broker, return them, otherwise return None.

On Success, this routine returns immediately (non-blocking) with either None, or a list of messages.

On failure, this routine blocks, and loops reconnecting to broker, until interaction with broker is successful.

please_stop() → None

Register a request to stop cleanly. Any long-running processes should check _stop_requested and stop once it becomes True.
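The stop convention can be sketched as a flag that please_stop() sets and the work loop polls. StoppableWorker and its run() loop are hypothetical; in new code a threading.Event would be an equally reasonable choice for the flag.

```python
import threading
import time


class StoppableWorker:
    """Hypothetical sketch of the please_stop() / _stop_requested convention."""

    def __init__(self):
        self._stop_requested = False
        self.iterations = 0

    def please_stop(self):
        # Only record the request; the loop decides when it is safe to stop.
        self._stop_requested = True

    def run(self):
        while not self._stop_requested:   # checked on every pass
            self.iterations += 1
            time.sleep(0.01)              # stand-in for one unit of real work
        # a real component would release its connections here before returning


worker = StoppableWorker()
thread = threading.Thread(target=worker.run)
thread.start()
time.sleep(0.05)
worker.please_stop()
thread.join(timeout=1)
print(thread.is_alive())   # False once the loop has noticed the flag
```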

putNewMessage(message: Message, content_type: str = 'application/json', exchange: str | None = None) → bool

Publish a message to the given topic, as set up.

Return True if successful, False otherwise.

side effects:

  • self.metrics[‘TxByteCount’] should be incremented by the size of the payload.

  • self.metrics[‘TxGoodCount’] should be incremented by 1 if a message is sent successfully.

  • self.metrics[‘TxBadCount’] should be incremented by 1 if an invalid message cannot be sent (and is discarded).

setEbo(start) → None

Calculate the next retry time using exponential backoff (EBO). Note that it does not look like classic EBO, because the delay is multiplied by how long the attempt took to fail: long failures should not be retried quickly, but short failures can vary in duration. If the timing of failures is variable, the “attempt_duration” will be low, so next_try might get smaller even though no attempt has succeeded yet; it should eventually settle down to a long period, though.
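A sketch of backoff scaled by attempt duration, under stated assumptions: the base delay, the max_delay cap, the failure counter and the exact formula are all hypothetical, since the docstring does not give Sarracenia's actual calculation.

```python
import time


class RetrySchedule:
    """Hypothetical sketch: exponential backoff scaled by the failed attempt's duration."""

    def __init__(self, base=1.0, max_delay=600.0):
        self.base = base            # assumed base delay, in seconds
        self.max_delay = max_delay  # assumed upper bound on any single delay
        self.failures = 0
        self.next_try = 0.0

    def set_ebo(self, start, now=None):
        """Record a failure of an attempt begun at `start`; return the delay chosen."""
        now = time.time() if now is None else now
        attempt_duration = max(now - start, 0.001)  # avoid a zero multiplier
        self.failures += 1
        # Classic EBO would be base * 2**failures; scaling by attempt_duration makes
        # slow failures back off further than quick ones.
        delay = min(self.max_delay, self.base * 2 ** self.failures * attempt_duration)
        self.next_try = now + delay
        return delay

    def succeeded(self):
        self.failures = 0           # reset the backoff after a success


sched = RetrySchedule()
print(sched.set_ebo(start=100.0, now=100.5))  # 0.5 s attempt, 1st failure: 1.0
print(sched.set_ebo(start=200.0, now=210.0))  # 10 s attempt, 2nd failure: 40.0
print(sched.set_ebo(start=300.0, now=300.1))  # 0.1 s attempt, 3rd failure: about 0.8
```

The third call shows the behaviour the docstring warns about: a quick failure after a slow one produces a shorter delay even though nothing has succeeded.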

exception sarracenia.transfer.TimeoutException

Bases: Exception

timeout exception

__weakref__

list of weak references to the object (if defined)

class sarracenia.transfer.Transfer(proto, options)

Bases: object

This is a sort of abstract base class for implementing transfer protocols. Implemented subclasses include support for: local files, https, sftp, and ftp.

This class provides routines that do I/O on descriptors opened by the subclasses, so that each subclass does not need to re-implement, for example, copying.

Each subclass needs to implement the following routines:

if downloading:

get    ( msg, remote_file, local_file, remote_offset=0, local_offset=0, length=0 )
getAccelerated( msg, remote_file, local_file, length )
ls     ()
cd     (dir)
delete (path)

if sending:

put    ( msg, remote_file, local_file, remote_offset=0, local_offset=0, length=0 )
putAccelerated ( msg, remote_file, local_file, length=0 )
cd     (dir)
mkdir  (dir)
umask  ()
chmod  (perm)
rename (old,new)

Note that ls() return values are polymorphic. Each is one of:

  • a dictionary where the key is the name of the file in the directory, and the value is an SFTPAttributes structure for it (from paramiko); see sftp.py for an example.

  • a dictionary where the key is the name of the file, and the value is a string that looks like the output of a Linux ls command; see ftp.py for an example.

  • a sequence of bytes, which will be parsed as an HTML page; see https.py for an example.

The first format is strongly preferred; the others are fallbacks for when the first is not available. The lsdir() routine in flowcb/poll/__init__.py tries to transform any of these return values into the first form (a dictionary of SFTPAttributes). Each SFTPAttributes structure needs st_mode set, and folders need stat.S_IFDIR set.
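The second form can be converted into the first roughly as follows. This is a hypothetical sketch, not the lsdir() code: types.SimpleNamespace stands in for paramiko's SFTPAttributes (only st_mode, st_size and filename are filled in), the parser assumes the common nine-column `ls -l` layout, and only directories and regular files are distinguished.

```python
import stat
from types import SimpleNamespace

# Permission bits in the order of the nine rwx characters after the type letter.
_PERM_BITS = (stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR,
              stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
              stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH)


def attrs_from_ls_line(line: str) -> SimpleNamespace:
    """Build a minimal SFTPAttributes-like object from one `ls -l` style line."""
    # perms, links, owner, group, size, month, day, time, name (name may contain spaces)
    fields = line.split(None, 8)
    perms = fields[0]
    mode = stat.S_IFDIR if perms.startswith('d') else stat.S_IFREG
    for ch, bit in zip(perms[1:10], _PERM_BITS):
        if ch != '-':               # simplified: s/t/S/T are treated as "bit set"
            mode |= bit
    return SimpleNamespace(st_mode=mode, st_size=int(fields[4]), filename=fields[8])


def normalize_listing(listing: dict) -> dict:
    """Convert {name: 'ls -l line'} entries; pass already-converted values through."""
    return {name: attrs_from_ls_line(v) if isinstance(v, str) else v
            for name, v in listing.items()}


listing = {
    'data': 'drwxr-xr-x 2 user group 4096 Feb 05 12:00 data',
    'a.txt': '-rw-r--r-- 1 user group 12 Feb 05 12:00 a.txt',
}
attrs = normalize_listing(listing)
print(stat.S_ISDIR(attrs['data'].st_mode))         # True
print(oct(stat.S_IMODE(attrs['a.txt'].st_mode)))   # 0o644
```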

If the lsdir() routine gets a sequence of bytes, the on_html_page() and on_html_parser_init() routines (or perhaps handle_starttag() and handle_data()) are used to turn them into the first form.

Web services with other such formats can be accommodated by subclassing and overriding the handle_* entry points.
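For the bytes form, the standard library's html.parser.HTMLParser provides the handle_starttag() entry point mentioned above. A minimal sketch, assuming an Apache-style index page; the IndexPageParser class and its skip rules are hypothetical, and how Sarracenia wires on_html_page() and on_html_parser_init() is not shown.

```python
from html.parser import HTMLParser


class IndexPageParser(HTMLParser):
    """Hypothetical sketch: collect entry names from a directory index page."""

    def __init__(self):
        super().__init__()
        self.entries = []

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            href = dict(attrs).get('href')
            # skip sort links ('?C=N;O=D'), the parent directory and absolute links
            if href and not href.startswith(('?', '/', '../', 'http')):
                self.entries.append(href)   # a trailing '/' marks a directory


page = (b'<html><body><a href="../">Parent</a><a href="?C=N;O=D">Name</a>'
        b'<a href="obs/">obs/</a><a href="a.txt">a.txt</a></body></html>')
parser = IndexPageParser()
parser.feed(page.decode('utf-8'))
print(parser.entries)   # ['obs/', 'a.txt']
```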

Uses options (from the sarracenia.config data structure passed to the constructor/factory):

  • credentials - used for authentication information.

  • sendTo - server to connect to.

  • batch - how many files to transfer before a connection is torn down and re-established.

  • permDefault - what permissions to set on transferred files.

  • permDirDefault - what permissions to set on created directories.

  • timeout - how long to wait for operations to complete.

  • byteRateMax - maximum transfer rate (transfers are throttled to avoid exceeding it).

  • bufSize - size of buffers for file transfers.
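As an illustration of what bufSize and byteRateMax control, here is a throttled chunked copy. copy_throttled() is a hypothetical helper, not a Sarracenia routine; it sleeps whenever the data copied so far is ahead of the schedule implied by the maximum rate.

```python
import io
import time


def copy_throttled(src, dst, buf_size=8192, byte_rate_max=0):
    """Copy src to dst in buf_size chunks, keeping the average rate at or below
    byte_rate_max bytes/second (0 means unthrottled). Hypothetical sketch."""
    start = time.monotonic()
    copied = 0
    while True:
        chunk = src.read(buf_size)
        if not chunk:
            break
        dst.write(chunk)
        copied += len(chunk)
        if byte_rate_max > 0:
            expected = copied / byte_rate_max       # seconds this much data should take
            elapsed = time.monotonic() - start
            if expected > elapsed:
                time.sleep(expected - elapsed)      # we are ahead of schedule: wait
    return copied


src, dst = io.BytesIO(b'x' * 50_000), io.BytesIO()
t0 = time.monotonic()
n = copy_throttled(src, dst, buf_size=10_000, byte_rate_max=100_000)
print(n, round(time.monotonic() - t0, 1))   # 50000 bytes in roughly 0.5 s
```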

__init__(proto, options)
__weakref__

list of weak references to the object (if defined)

logProgress(sz)

If there has not been a log message in at least logMinumumInterval, emit one, so that sanity does not conclude the process is dead.

This should print a message once in a while during long file transfers.
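A rate-limited progress logger along these lines can be sketched as below; ProgressLogger and its min_interval parameter (standing in for logMinumumInterval) are hypothetical.

```python
import logging
import time

logger = logging.getLogger(__name__)


class ProgressLogger:
    """Hypothetical sketch: emit at most one progress message per interval."""

    def __init__(self, min_interval=60.0):
        self.min_interval = min_interval   # plays the role of logMinumumInterval
        self.last_log = time.monotonic()
        self.total = 0

    def log_progress(self, sz):
        """Add sz bytes to the running total; log and return True only if the interval elapsed."""
        self.total += sz
        now = time.monotonic()
        if now - self.last_log >= self.min_interval:
            logger.info('transferred %d bytes so far', self.total)
            self.last_log = now
            return True
        return False


logging.basicConfig(level=logging.INFO)
progress = ProgressLogger(min_interval=0.05)
print(progress.log_progress(4096))   # False: too soon since the last message
time.sleep(0.06)
print(progress.log_progress(4096))   # True: interval elapsed, one message logged
```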

on_data(chunk) → bytes

Transform data as it is being read: given a buffer, return the transformed buffer. The checksum is calculated on the pre-transformation data; a post-transformation value is likely needed as well.
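A sketch of an on_data()-style transformation that keeps both checksums. Decompression is only an example transformation, the DecompressingReader class is hypothetical, and SHA-512 is used for both digests by assumption.

```python
import hashlib
import zlib


class DecompressingReader:
    """Hypothetical sketch: checksum each raw chunk, then transform it."""

    def __init__(self):
        self._inflate = zlib.decompressobj()
        self.pre = hashlib.sha512()    # checksum of the data as read (pre-transformation)
        self.post = hashlib.sha512()   # checksum of the returned data (post-transformation)

    def on_data(self, chunk: bytes) -> bytes:
        self.pre.update(chunk)
        out = self._inflate.decompress(chunk)
        self.post.update(out)
        return out

    def flush(self) -> bytes:
        out = self._inflate.flush()    # any output still buffered at end of stream
        self.post.update(out)
        return out


payload = b'hello world\n' * 1000
compressed = zlib.compress(payload)
reader = DecompressingReader()
chunks = [reader.on_data(compressed[i:i + 64]) for i in range(0, len(compressed), 64)]
data = b''.join(chunks) + reader.flush()
print(data == payload)   # True
```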

sarracenia.transfer.alarm_set(time)

FIXME: replace with setitimer for sub-second resolution; currently rounds to the nearest second.
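A sketch of the setitimer() approach the FIXME suggests, for Unix only: signal.alarm() accepts whole seconds, while signal.setitimer() takes a float. alarm_set_sketch(), alarm_cancel_sketch() and the local TimeoutException stand-in are hypothetical, and Python only allows signal handlers to be installed from the main thread.

```python
import signal
import time


class TimeoutException(Exception):
    """Local stand-in for sarracenia.transfer.TimeoutException."""


def _on_alarm(signum, frame):
    raise TimeoutException('operation timed out')


def alarm_set_sketch(seconds: float) -> None:
    """Arm a one-shot timer with sub-second resolution (Unix, main thread only)."""
    signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)


def alarm_cancel_sketch() -> None:
    signal.setitimer(signal.ITIMER_REAL, 0)   # a zero value disarms the timer


try:
    alarm_set_sketch(0.25)
    time.sleep(5)              # stands in for a hung network operation
except TimeoutException:
    print('timed out after about 0.25 s')
finally:
    alarm_cancel_sketch()
```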

class sarracenia.identity.Identity

Bases: object

A class for algorithms to get a fingerprint for a file being announced. Appropriate fingerprinting algorithms vary according to file type.

Required methods in subclasses:

def registered_as(self):

Return a one-letter string identifying the algorithm (mostly for v2). In v3, the registration comes from the identity subclass name in lower case.

def set_path(self,path):

Start (initialize) a checksum for the given path.

def update(self,chunk):

Update the checksum with the given bytes from the file (sequential access assumed).

__weakref__

list of weak references to the object (if defined)

update_file(path)

Read the entire file and checksum it. This is a last resort, as it costs an extra file read; it is better to call update() as the file is being read for other reasons.

property value

Return the current value of the checksum calculation.
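A minimal identity-style class following the documented method names. It is a hypothetical sketch: a real one would subclass sarracenia.identity.Identity, and the one-letter code 's' and the hex encoding of value are assumptions here.

```python
import hashlib


class Sha512Sketch:
    """Hypothetical identity-style class; not Sarracenia's implementation."""

    def registered_as(self) -> str:
        return 's'                      # assumed one-letter v2 code

    def set_path(self, path: str) -> None:
        self.path = path
        self._hash = hashlib.sha512()   # start a fresh checksum

    def update(self, chunk: bytes) -> None:
        self._hash.update(chunk)        # sequential access assumed

    def update_file(self, path: str) -> None:
        # last resort: costs an extra full read of the file
        self.set_path(path)
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(1 << 20), b''):
                self.update(chunk)

    @property
    def value(self) -> str:
        return self._hash.hexdigest()   # assumed encoding of the current value


ident = Sha512Sketch()
ident.set_path('example.dat')
for chunk in (b'abc', b'def'):          # e.g. chunks seen while downloading
    ident.update(chunk)
print(ident.value == hashlib.sha512(b'abcdef').hexdigest())   # True
```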