Code Reference

Internal documentation on the full source of sr3. Not an API; subject to change without notice. What is missing:

  • All flow callbacks are here: flowcb

  • The entry points are excluded (they are not compatible with Sphinx/autodoc.)

sarracenia

class sarracenia.Message[source]

Bases: dict

A notification message in Sarracenia is stored as a python dictionary, with a few extra management functions.

The internal representation is very close to the v03 format defined here: https://metpx.github.io/sarracenia/Reference/sr_post.7.html

Unfortunately, because Message is a subclass of dict, copying it from (or into) a plain dict loses the type; hence the need for the copyDict member.
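A minimal illustration of the type-loss problem. It uses a hypothetical stand-in class (StandInMessage) rather than the real sarracenia.Message. Both dict.copy() and dict(m) return a plain dict, so a copyDict-style method is needed to get the data back into the subclass.

```python
class StandInMessage(dict):
    """Hypothetical stand-in for sarracenia.Message, for illustration only."""

    def copyDict(self, d):
        # copy every key of a plain dict into this object, keeping the subclass type
        if d is None:
            return
        for key, value in d.items():
            self[key] = value


m = StandInMessage(relPath="a/b.txt")
plain = m.copy()                 # dict.copy() returns a plain dict, not the subclass
restored = StandInMessage()
restored.copyDict(plain)         # the data is back inside the subclass
```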

__init__()[source]
__weakref__

list of weak references to the object (if defined)

computeIdentity(path, o, offset=0)[source]

Check extended attributes for a cached identity (checksum) calculation. If extended attributes are present, and:

  • the file mtime is not too new, and

  • the cached sum is using the same method,

then use the cached value.

Otherwise, calculate a checksum and set the file’s extended attributes to the new value. The method of checksum calculation is taken from options.identity.

Sets the message ‘identity’ field if appropriate.
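The caching rule above can be sketched in plain Python. This sketch rests on three assumptions, and none of it is the sr3 code. First, a dict stands in for the file's extended attributes. Second, "not too new" is read as "the file mtime is not newer than the mtime recorded with the cached sum". Third, the method name is any hashlib algorithm name.

```python
import hashlib


def compute_identity(path, mtime, xattr_cache, method="sha512"):
    """Return (method, hexdigest) for path, reusing xattr_cache when still valid.

    xattr_cache is a plain dict standing in for the file's extended attributes
    (hypothetical layout: {'identity': {'method': ..., 'value': ...}, 'mtime': float}).
    """
    cached = xattr_cache.get("identity")
    if (cached is not None
            and mtime <= xattr_cache.get("mtime", float("-inf"))  # file not newer than cached sum
            and cached["method"] == method):                       # same checksum method
        return cached["method"], cached["value"]

    # otherwise, calculate a fresh checksum, reading the file in chunks
    h = hashlib.new(method)
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    value = h.hexdigest()

    # remember the new value, as the text describes for extended attributes
    xattr_cache["identity"] = {"method": method, "value": value}
    xattr_cache["mtime"] = mtime
    return method, value
```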

copyDict(d)[source]

copy dictionary into message.

dumps() str[source]

FIXME: used to be msg_dumps. Print a message in a relatively compact way. msg is a Python dictionary. If a field is longer than maximum_field_length, truncate it.
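A rough sketch of the truncation behaviour, written as a standalone function (compact_dumps, hypothetical). The output layout and the default maximum_field_length of 32 are assumptions; this is not what sr3 prints.

```python
def compact_dumps(msg, maximum_field_length=32):
    """Render a message dict compactly, truncating long field values."""
    parts = []
    for key in sorted(msg):
        value = str(msg[key])
        if len(value) > maximum_field_length:
            value = value[:maximum_field_length] + "..."
        parts.append("'%s': '%s'" % (key, value))
    return "{ " + ", ".join(parts) + " }"
```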

static fromFileData(path, o, lstat=None)[source]

Create a message based on a given file, calculating the checksum. Returns a well-formed message, or None.

static fromFileInfo(path, o, lstat=None)[source]

Based on the given information about the file (its name and a stat record, if available) and a configuration options object (sarracenia.config.Config), return a sarracenia.Message suitable for placement on a worklist.

A message is a specialized python dictionary with a certain set of fields in it. The message returned will have the necessary fields for processing and posting.

The message for a file is built from the given path, options (o), and lstat (output of os.stat).

The lstat record is used to build ‘atime’, ‘mtime’ and ‘mode’ fields if timeCopy and permCopy options are set.

If no lstat record is supplied, then those fields will not be set.
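A sketch of how an lstat record could be turned into those fields. It makes three assumptions. Timestamps use the YYYYMMDDTHHMMSS.sss UTC form documented for timeflt2str, with three fractional digits. mode is written as an octal string. fields_from_lstat is a hypothetical helper, not part of the sr3 API.

```python
import time


def v03_time(f):
    """Epoch seconds -> 'YYYYMMDDTHHMMSS.mmm' in UTC (three fractional digits assumed)."""
    ms = int(round(f * 1000))
    sec, frac = divmod(ms, 1000)
    return time.strftime("%Y%m%dT%H%M%S", time.gmtime(sec)) + ".%03d" % frac


def fields_from_lstat(lstat, timeCopy=True, permCopy=True):
    """Hypothetical helper: message fields derived from an lstat-like record."""
    fields = {}
    if lstat is None:
        return fields                    # no lstat: these fields are simply not set
    if timeCopy:
        fields["mtime"] = v03_time(lstat.st_mtime)
        fields["atime"] = v03_time(lstat.st_atime)
    if permCopy:
        fields["mode"] = "%o" % (lstat.st_mode & 0o7777)  # permission bits only, as octal text
    return fields
```

Any object with st_mtime, st_atime and st_mode attributes works, including an os.stat() result.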

static fromStream(path, o, data=None)[source]

Create a file and message for the given path. The file will be created or overwritten with the provided data. Then invoke fromFileData() for the resulting file.

getContent(options=None)[source]

Retrieve the data referred to by a message. The data may be embedded in the message, or this routine may resolve a link to an external server and download the data.

Does not handle authentication. This routine is meant to be used with small files; using it to download large files may be very inefficient, and is untested in that use-case.

Return value is the data.

Often, on a server where one is publishing data, the file is available as a local file, and network usage can be avoided by using the options.baseDir setting. This behaviour can be disabled by not providing the options, or by not setting baseDir.
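The lookup order described above can be sketched as follows: first embedded content, then a local file under baseDir, then the network. The sketch rests on these assumptions:

  • inlined data sits in a v03-style 'content' field of the form {'encoding': 'utf-8' | 'base64', 'value': ...};
  • the local path is baseDir joined with relPath;
  • get_content is a hypothetical standalone function.

sr3's own logic may differ.

```python
import base64
import os
import urllib.request


def get_content(msg, baseDir=None):
    """Return the bytes announced by msg (hypothetical sketch)."""
    # 1. data embedded in the message itself (assumed v03-style 'content' field)
    if "content" in msg:
        content = msg["content"]
        if content.get("encoding") == "base64":
            return base64.b64decode(content["value"])
        return content["value"].encode("utf-8")

    # 2. a local copy under baseDir avoids network traffic
    if baseDir:
        local = os.path.join(baseDir, msg["relPath"].lstrip("/"))
        if os.path.isfile(local):
            with open(local, "rb") as f:
                return f.read()

    # 3. otherwise download from baseUrl + relPath (no authentication handled)
    url = msg["baseUrl"].rstrip("/") + "/" + msg["relPath"].lstrip("/")
    with urllib.request.urlopen(url) as resp:
        return resp.read()
```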

setReport(code, text=None)[source]

FIXME: used to be msg_set_report. Set message fields to indicate the result of an action, so that reports can be generated.

setReport is supposed to indicate final message dispositions. So when a message is put on worklist.failed, no report is generated, since it will be retried later. FIXME: should we publish an interim failure report?

updatePaths(options, new_dir=None, new_file=None)[source]

Set the new_* fields in the message based on changed file placement. If the new_* options are omitted, update the rest of the fields in the message based on their current values.

For example, if you change file placement in a flow callback, you would change new_dir and new_file in the message. This routine updates the other fields in the message (e.g. relPath, baseUrl, topic) to match new_dir/new_file.

msg[‘post_baseUrl’] defaults to msg[‘baseUrl’].
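A minimal sketch of the idea. Two things here are illustrative choices and labelled as such: the derived field names (new_relPath, new_baseUrl, new_subtopic), and computing the relative path against a post_baseDir. update_paths is hypothetical and does not reproduce sr3's updatePaths.

```python
import posixpath


def update_paths(msg, post_baseDir, new_dir, new_file):
    """Hypothetical sketch: derive the other new_* fields from new_dir/new_file."""
    msg["new_dir"] = new_dir
    msg["new_file"] = new_file
    full = posixpath.join(new_dir, new_file)
    rel = posixpath.relpath(full, post_baseDir) if post_baseDir else full.lstrip("/")
    msg["new_relPath"] = rel
    # post_baseUrl, when present, takes precedence; otherwise fall back to baseUrl
    msg["new_baseUrl"] = msg.get("post_baseUrl") or msg["baseUrl"]
    subdir = posixpath.dirname(rel)
    msg["new_subtopic"] = subdir.split("/") if subdir else []
    return msg
```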

validate()[source]

FIXME: used to be msg_validate. Return True if the message format seems OK; otherwise return False and log some reasons.

class sarracenia.Sarracenia[source]

Bases: object

Core utilities of Sarracenia. The main class here is sarracenia.Message. A sarracenia.Message is subclassed from dict, so for most uses it works like the Python built-in. It also has a few major entry points, some of them factories:

Building a message from a file

m = sarracenia.Message.fromFileData( path, options, lstat )

builds a notification message from an existing file, consulting options, a parsed in-memory version of the applicable configuration settings.

Options

See the sarracenia.config.Config class for functions to parse configuration files and create corresponding Python option dictionaries. One can also supply small dictionaries, for example:

import socket

options = {}
options['topicPrefix'] = [ 'v02', 'post' ]
options['bindings'] = [ ('xpublic', [ 'v02', 'post'] , [ '#' ] )]
options['queueName'] = 'q_anonymous_' + socket.getfqdn() + '_SomethingHelpfulToYou'

Above is an example of a minimal options dictionary, taken from the tutorial example called moth_api_consumer.py.

If you don’t have a file

If you don’t have a local file, then build your notification message with:

m = sarracenia.Message.fromFileInfo( path, options, lstat )

where you can make up the lstat values to fill in some fields in the message. You can make a fake lstat structure to provide these values using the FmdStat class from sarracenia.filemetadata. FmdStat is an alias for paramiko.SFTPAttributes ( https://docs.paramiko.org/en/latest/api/sftp.html#paramiko.sftp_attr.SFTPAttributes ) if paramiko is installed, or a simple emulation if not.

from sarracenia.filemetadata import FmdStat

lstat = FmdStat()
lstat.st_mtime= utcinteger second count in UTC (numeric version of a Sarracenia timestamp.)
lstat.st_atime=
lstat.st_mode=0o644
lstat.st_size= size_in_bytes

Optional fields that may be of interest:

lstat.filename= "nameOfTheFile"
lstat.longname= 'lrwxrwxrwx 1 peter peter 20 Oct 11 20:28 nameOfTheFile'

You can then provide that as the lstat argument to the fromFileInfo() call above. However, the returned notification message will lack an identity checksum field. Once you get the file, you can add the identity field with:

m.computeIdentity(path, o)

In terms of consuming notification messages, the fields in the dictionary provide metadata for the announced resource. The announced data could be embedded in the notification message itself, or available by a URL.

Messages are generally gathered from a source such as the Message Queueing Protocol wrapper class, moth (sarracenia.moth).

data = m.getContent()

will return the content of the announced resource as raw data.

__weakref__

list of weak references to the object (if defined)

class sarracenia.TimeConversions[source]

Bases: object

Time conversion routines.

  • os.stat() and time.time() return floating point values.

  • The floating point representation is a count of seconds since the beginning of the epoch.

  • The beginning of the epoch is platform dependent, and conversion to an actual date is fraught (leap seconds, etc…)

  • The SR_* formats are entirely text; no floats are sent over the protocol. This avoids byte order issues and null byte / encoding issues, and enhances readability.

  • str format: YYYYMMDDHHMMSS.msec. The goal of this representation is that a naive conversion to floats yields comparable numbers.

  • But the resulting number is not useful for anything else, so these special routines are needed to get a proper epochal time.

  • Also OK for the year 2038 (rollover of a 32-bit time_t.)

  • The string representation is forced to the UTC timezone to avoid having to communicate the timezone.

timestr2flt() - accepts a string and returns a float.

caveat

FIXME: this encoding will break in the year 10000 (assumes four digit year) and requires leading zeroes prior to 1000. One will have to add detection of the decimal point, and change the offsets at that point.

__weakref__

list of weak references to the object (if defined)

sarracenia.durationToSeconds(str_value, default=None) float[source]

This function converts a duration to seconds. str_value should be a number followed by a unit [s,m,h,d,w], e.g. 1w, 4d, 12h. Returns 0.0 for an invalid string.

sarracenia.durationToString(d) str[source]

Given a number of seconds, return a short, human-readable string.
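A sketch of both conversions. It assumes the following:

  • a bare number is taken as seconds;
  • unit letters are case-insensitive;
  • the output style for durationToString (largest fitting unit, rounded to one decimal) is a guess.

The function names are hypothetical.

```python
_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}


def duration_to_seconds(str_value):
    """'12h' -> 43200.0; a bare number is seconds; invalid input -> 0.0."""
    s = str(str_value).strip().lower()
    if not s:
        return 0.0
    multiplier = 1
    if s[-1] in _UNITS:
        multiplier = _UNITS[s[-1]]
        s = s[:-1]
    try:
        return float(s) * multiplier
    except ValueError:
        return 0.0


def duration_to_string(d):
    """Seconds -> short string using the largest unit that fits (style assumed)."""
    for unit in ("w", "d", "h", "m"):
        if d >= _UNITS[unit]:
            return "%g%s" % (round(d / _UNITS[unit], 1), unit)
    return "%gs" % round(d, 1)
```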

sarracenia.stat(path) SFTPAttributes[source]

A replacement for os.stat which improves on it by returning an SFTPAttributes structure in place of the OS stat one, featuring:

  • mtime and ctime with subsecond accuracy

  • fields that can be overridden (not immutable.)
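os.stat() returns an immutable os.stat_result, which is what the bullet about overridable fields works around. The minimal stand-in below copies a few fields into a mutable types.SimpleNamespace. It assumes only this handful of fields is needed, and it does not produce a paramiko SFTPAttributes object. mutable_stat is hypothetical.

```python
import os
import types


def mutable_stat(path):
    """Copy selected os.stat() fields into a mutable object (field selection assumed)."""
    st = os.stat(path)
    return types.SimpleNamespace(
        st_size=st.st_size,
        st_mode=st.st_mode,
        st_uid=st.st_uid,
        st_gid=st.st_gid,
        st_atime=st.st_atime,   # float seconds, sub-second where the OS provides it
        st_mtime=st.st_mtime,
    )
```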

sarracenia.timeflt2str(f=None)[source]

timeflt2str - accepts a float and returns a string.

f is a floating point number, such as returned by time.time() (the number of seconds since the beginning of the epoch).

The string is YYYYMMDDTHHMMSS.sssss, for example:

20210921T011331.0123

translates to: Sept. 21st, 2021 at 01:13 and 31.0123 seconds. Always in the UTC timezone.
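A round-trip sketch of the two conversions, under three assumptions. Output carries three fractional digits. Input may have an optional 'T' separator. The names to_timestr and from_timestr are hypothetical. calendar.timegm() is used because it interprets the broken-down time as UTC; time.mktime() would use local time.

```python
import calendar
import time


def to_timestr(f=None):
    """Epoch seconds -> 'YYYYMMDDTHHMMSS.mmm' in UTC (three fractional digits assumed)."""
    if f is None:
        f = time.time()
    ms = int(round(f * 1000))            # round once, so e.g. .9996 carries into the seconds
    sec, frac = divmod(ms, 1000)
    return time.strftime("%Y%m%dT%H%M%S", time.gmtime(sec)) + ".%03d" % frac


def from_timestr(s):
    """'YYYYMMDDTHHMMSS.frac' (or without the 'T') -> epoch seconds, read as UTC."""
    if len(s) > 8 and s[8] == "T":
        s = s[:8] + s[9:]
    whole, _, frac = s.partition(".")
    t = time.strptime(whole, "%Y%m%d%H%M%S")
    return calendar.timegm(t) + (float("0." + frac) if frac else 0.0)
```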

sarracenia.config

Second version configuration parser

FIXME: pas 2023/02/05… missing options from v2: max_queue_size, outlet, pipe

class sarracenia.config.Config(parent=None)[source]

Bases: object

The option parser to produce a single configuration.

It can be instantiated with one of:

  • one_config(component, config, action, isPost=False) – read the options for a given component and configuration (all in one call.)

Alternatively, a config can be built up from the following constructors:

  • default_config() – returns an empty configuration, given a config file tree.

  • no_file_config() – returns an empty config without any config file tree.

Then just add settings manually:

import sarracenia.credentials
from sarracenia.config import no_file_config

cfg = no_file_config()

cfg.broker = sarracenia.credentials.Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')
cfg.topicPrefix = [ 'v02', 'post']
cfg.component = 'subscribe'
cfg.config = 'flow_demo'
cfg.action = 'start'
cfg.bindings = [ ('xpublic', ['v02', 'post'], ['*', 'WXO-DD', 'observations', 'swob-ml', '#' ]) ]
cfg.queueName='q_anonymous.subscriber_test2'
cfg.download=True
cfg.batch=1
cfg.messageCountMax=5

# set the instance number for the flow class.
cfg.no=0

# and at the end call finalize

cfg.finalize()

__deepcopy__(memo) Configuration[source]

Code for this is from here: https://stackoverflow.com/questions/1500718/how-to-override-the-copy-deepcopy-operations-for-a-python-object (needed for Python < 3.7ish? Ubuntu 18). Found this bug: https://bugs.python.org/issue10076 (deepcopy fails for objects with regular expressions in them?); OK on Ubuntu 20.04.

__init__(parent=None) Config[source]

instantiate an empty Configuration

__weakref__

list of weak references to the object (if defined)

_build_mask(option, arguments)[source]

return new entry to be appended to list of masks

_parse_binding(subtopic_string)[source]
FIXME: see original parse, with substitutions for URL encoding.

Also, it should squawk about an error if no exchange or topicPrefix is defined. Also, using None to reset to empty is not done.

_parse_set_string(v: str, old_value: set) set[source]

given a set string, return a python set.

_parse_setting(opt, value)[source]

v3 plugins accept options for specific modules.

parsed from: set sarracenia.flowcb.log.filter.Log.level debug

example: opt= sarracenia.flowcb.log.filter.Log.level value = debug

results in: self.settings[ sarracenia.flowcb.log.filter.Log ][level] = debug

Options should be fed to the plugin class on instantiation, stripped of the class:

  • options = { 'level' : 'debug' }
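The split described above comes down to cutting the option name at its last dot. In this sketch, parse_set_line is a hypothetical helper, and a plain dict stands in for self.settings.

```python
def parse_set_line(settings, line):
    """Parse 'set <module.Class.option> <value>' into settings[class][option] = value."""
    keyword, opt, value = line.split(None, 2)
    if keyword != "set":
        raise ValueError("not a set line: %r" % line)
    cls, _, option = opt.rpartition(".")    # everything before the last dot is the class
    settings.setdefault(cls, {})[option] = value
    return settings
```

The per-class dict (here {'level': 'debug'}) is what would be handed to the plugin on instantiation.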

_parse_v2plugin(entryPoint, value)[source]

config file parsing for a v2 plugin.

_resolve_exchange()[source]

based on the given configuration, fill in with defaults or guesses. sets self.exchange.

_sundew_basename_parts(pattern, basename)[source]

modified from metpx SenderFTP

_validate_urlstr(urlstr) tuple[source]

Returns a tuple (bool, expanded_url). The bool is whether the expansion worked, and expanded_url is the URL with the necessary authentication details added from sarracenia.credentials.

_varsub(word)[source]

substitute variable values from options

class addBinding(option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=False, help=None, metavar=None)[source]

Bases: Action

called by argparse to deal with queue bindings.

__call__(parser, namespace, values, option_string)[source]

Call self as a function.

add_option(option, kind='list', default_value=None, all_values=None)[source]

Options can be declared in any plugin. There are various kinds of options, where the declared type modifies the parsing.

  • ‘count’ integer count type.

  • ‘octal’ base-8 (octal) integer type.

  • ‘duration’ a floating point number indicating a quantity of seconds (0.001 is 1 millisecond)

    modified by a unit suffix ( m-minute, h-hour, w-week )

  • ‘flag’ boolean (True/False) option.

  • ‘float’ a simple floating point number.

  • ‘list’ a list of string values, each succeeding occurrence catenates to the total.

    all v2 plugin options are declared of type list.

  • ‘set’ a set of string values, each succeeding occurrence is unioned to the total.

    if all_values is provided, then the set is constrained to those values.

  • ‘size’ integer size. Suffixes k, m, and g for kilo, mega, and giga (base 2) multipliers.

  • ‘str’ an arbitrary string value. As with all of the above types except list and set, each succeeding occurrence overrides the previous one.

If a value is set to None, that could mean that it has not been set.
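A sketch of how one occurrence of an option might be converted for each declared kind. The assumptions are:

  • set values are comma-separated;
  • flags accept true/yes/on/1;
  • size suffixes are powers of 1024, as stated above.

parse_option_value is a hypothetical helper, not sr3's parser.

```python
_DURATION = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}
_SIZE = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def parse_option_value(kind, text, old_value=None, all_values=None):
    """Convert one occurrence of an option according to its declared kind."""
    t = text.strip()
    if kind == "count":
        return int(t)
    if kind == "octal":
        return int(t, 8)
    if kind == "duration":
        if t and t[-1].lower() in _DURATION:
            return float(t[:-1]) * _DURATION[t[-1].lower()]
        return float(t)
    if kind == "flag":
        return t.lower() in ("true", "yes", "on", "1")
    if kind == "float":
        return float(t)
    if kind == "size":
        if t and t[-1].lower() in _SIZE:
            return int(float(t[:-1]) * _SIZE[t[-1].lower()])
        return int(t)
    if kind == "list":
        # each occurrence is appended to the accumulated list
        return list(old_value or []) + [t]
    if kind == "set":
        # each occurrence is unioned in; comma-separated syntax is an assumption
        new = set(old_value or set()) | {v.strip() for v in t.split(",") if v.strip()}
        return new & set(all_values) if all_values is not None else new
    if kind == "str":
        return t
    raise ValueError("unknown kind: %s" % kind)
```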

applyComponentDefaults(component)[source]

Overlay default options for the given component onto the given configuration.

dictify()[source]

return a dict version of the cfg…

dump()[source]

print out what the configuration looks like.

finalize(component=None, config=None)[source]

Before final use, take the existing settings, and infer any missing needed defaults from what is provided. Should be called prior to using a configuration.

There are default options that apply only if they are not overridden…

mask_ppstr(mask)[source]

return a pretty print string version of the given mask, easier for humans to read.

merge(oth)[source]

merge two lists of options.

If an option is cumulative, then merge the values; otherwise, if the value in oth is not None, take the value from oth.
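A sketch of the merge rule on plain dicts. It assumes "cumulative" means "the value is a list or a set"; sr3 may decide this differently. merge_options is hypothetical.

```python
def merge_options(mine, oth):
    """Merge option dict oth into mine: cumulative (list/set) values are combined,
    other values from oth replace mine unless they are None."""
    for key, value in oth.items():
        if value is None:
            continue                      # None in oth means "not set": keep ours
        current = mine.get(key)
        if isinstance(current, list) and isinstance(value, list):
            mine[key] = current + value
        elif isinstance(current, set) and isinstance(value, set):
            mine[key] = current | value
        else:
            mine[key] = value
    return mine
```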

override(oth)[source]

override a value in a set of options.

Why an override() method, and not just assign values to the dictionary? In the configuration file, there are various ways to have variable substitution. override() invokes those, so that they are properly interpreted; otherwise, you just end up with a literal value.

parse_args(isPost=False)[source]
user information:

Accept a configuration, and apply the argparse library to augment the given configuration with command-line settings.

The post component has a different calling convention from the others, so use the isPost flag if called from post.

development notes:

Use argparse.parser to modify defaults. FIXME: many FIXME notes below. This is currently an unusable placeholder; have not figured this out yet. Many issues.

FIXME #1: parseArgs often sets the value of the variable regardless of its presence (normally a good thing.) (If you have ‘store_true’, then a default is needed; for broker, just a string, it is ignored if not present.) This has the effect of overriding settings in the file parsed before the arguments. Therefore: often supply defaults… but… sigh…

But there is another consideration stopping me from supplying defaults; I wish I remembered what it was. I think it is FIXME #2: arguments are parsed twice, once to get basic stuff (loglevel, component, action). If the parsing fails there, the usage will print the wrong defaults…
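One possible way around FIXME #1; this is not what sr3 currently does. argparse only fills in a default for an attribute that the target namespace does not already have. So parsing the command line into the already-populated configuration object leaves file settings alone unless the option is actually given. The two options below are illustrative, and apply_command_line is hypothetical.

```python
import argparse


def apply_command_line(cfg, argv):
    """Overlay command-line options onto an existing config object."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--broker")
    parser.add_argument("--download", action="store_true")
    # argparse only sets defaults for attributes missing from the namespace,
    # so values already read from the config file survive unless overridden.
    parser.parse_args(argv, namespace=cfg)
    return cfg
```

This does nothing for FIXME #2 (the double parse).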

parse_file(cfg, component=None)[source]

add settings from a given config file to self

sundew_dirPattern(pattern, urlstr, basename, destDir)[source]

does substitutions for patterns in directories.

variableExpansion(cdir, message=None) str[source]

replace substitution patterns, variable substitutions as described in https://metpx.github.io/sarracenia/Reference/sr3_options.7.html#variables

returns: the given string with the substitutions done.

examples: ${YYYYMMDD-70m} becomes 20221107, assuming that was the current date 70 minutes ago.

Environment variables and built-in settings are also replaced.

timeoffset -70m
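A sketch of the ${YYYYMMDD-70m} style of substitution, plus settings and environment variables. The following are all assumptions:

  • only YYYYMMDD honours an offset;
  • offsets use s/m/h/d units;
  • times are UTC;
  • settings win over environment variables;
  • unknown names are left untouched.

variable_expansion is hypothetical and covers far fewer patterns than the sr3 method.

```python
import datetime
import os
import re

# ${NAME} or ${NAME-<count><unit>}
_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?:-(\d+)([smhd]))?\}")
_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def variable_expansion(text, settings=None, now=None):
    settings = settings or {}
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)

    def replace(m):
        name, count, unit = m.groups()
        if name == "YYYYMMDD":
            t = now
            if count:
                t = now - datetime.timedelta(seconds=int(count) * _SECONDS[unit])
            return t.strftime("%Y%m%d")
        if name in settings:
            return str(settings[name])
        # environment variables next; leave unknown names as they were
        return os.environ.get(name, m.group(0))

    return _VAR.sub(replace, text)
```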

sarracenia.config.config_path(subdir, config, mandatory=True, ctype='conf')[source]

Given a subdir/config, look for the file in config-ish places.

return Tuple: Found (True/False), path_of_file_found|config_that_was_not_found

sarracenia.config.get_log_filename(hostdir, component, configuration, no)[source]

return the name of a single logfile for a single instance.

sarracenia.config.get_metrics_filename(hostdir, component, configuration, no)[source]

return the name of a single metrics file for a single instance.

sarracenia.config.get_pid_filename(hostdir, component, configuration, no)[source]

return the file name for the pid file for the specified instance.

sarracenia.config.get_user_cache_dir(hostdir)[source]

hostdir = None if statehost is false,

sarracenia.config.logger = <Logger sarracenia.config (WARNING)>

Respect appdir stuff using an environment variable; for now, just hard-coded as a class variable appdir_stuff.

Type:

FIXME

sarracenia.config.no_file_config()[source]

initialize a config that will not use Sarracenia configuration files at all. meant for use by people writing independent programs to start up instances with python API calls.

class sarracenia.config.octal_number(value)[source]

Bases: int

static __new__(cls, value)[source]
__repr__() str[source]

Return repr(self).

__str__() str[source]

Return str(self).

sarracenia.config.one_config(component, config, action, isPost=False)[source]

A single call to return a fully parsed configuration for a single component to run:

  • read in admin.conf and default.conf

  • apply component default overrides (maps to: component/check ?)

  • read in component/config.conf

  • parse arguments from the command line

  • return the config instance.

appdir_stuff can be used to override file locations for testing during development.

sarracenia.config.str_options = ['action', 'admin', 'baseDir', 'broker', 'cluster', 'directory', 'exchange', 'exchange_suffix', 'feeder', 'filename', 'flatten', 'flowMain', 'header', 'hostname', 'identity', 'inlineEncoding', 'logLevel', 'pollUrl', 'post_baseUrl', 'post_baseDir', 'post_broker', 'post_exchange', 'post_exchangeSuffix', 'post_format', 'post_topic', 'queueName', 'sendTo', 'rename', 'report_exchange', 'source', 'strip', 'timezone', 'nodupe_ttl', 'nodupe_driver', 'nodupe_basis', 'tlsRigour', 'topic']

For backward compatibility, convert some old plugins that are hard to get working with v2wrapper into v3 plugins.

The fdelay ones make in-depth use of the sr_replay function, and that has changed too much in v3.

Accelerators and rate limiting are now built in; no plugin is required.

sarracenia.credentials

class sarracenia.credentials.Credential(urlstr=None)[source]

Bases: object

An object that holds information about a credential, read from a credential file, which has one credential per line, format:

url option1=value1, option2=value2
Examples:

sftp://alice@herhost/ ssh_keyfile=/home/myself/mykeys/.ssh.id_dsa
ftp://georges:Gpass@hishost/ passive = True, binary = True

Format Documentation.

url

object with URL, password, etc.

Type:

urllib.parse.ParseResult

ssh_keyfile

path to SSH key file for SFTP

Type:

str

passive

use passive FTP mode, defaults to True

Type:

bool

binary

use binary FTP mode, defaults to True

Type:

bool

tls

use FTPS with TLS, defaults to False

Type:

bool

prot_p

use a secure data connection for TLS

Type:

bool

bearer_token

bearer token for HTTP authentication

Type:

str

login_method

force a specific login method for AMQP (PLAIN, AMQPLAIN, EXTERNAL or GSSAPI)

Type:

str

Usage:

# build a credential from a url string:

from sarracenia.credentials import Credential

broker = Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')
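A sketch of parsing one line of the credential file format above into a URL and an options dict. The assumptions are:

  • the URL and the options are separated by the first space;
  • options are comma-separated key=value pairs;
  • the boolean attributes listed above are converted with an isTrue-style check.

parse_credential_line is hypothetical; it is not the Credential constructor.

```python
import urllib.parse


def is_true(s):
    """True for 'true', 'yes', 'on' or '1' (case-insensitive)."""
    return str(s).strip().lower() in ("true", "yes", "on", "1")


def parse_credential_line(line):
    """Split 'url option1=value1, option2=value2' into (ParseResult, options dict)."""
    url_part, _, rest = line.strip().partition(" ")
    options = {}
    for item in rest.split(","):
        if "=" in item:
            key, value = item.split("=", 1)
            options[key.strip()] = value.strip()
    for key in ("passive", "binary", "tls", "prot_p"):   # boolean attributes
        if key in options:
            options[key] = is_true(options[key])
    return urllib.parse.urlparse(url_part), options
```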

__init__(urlstr=None)[source]

Create a Credential object.

Parameters:

urlstr (str) – a URL in string form to be parsed.

__str__()[source]

Returns attributes of the Credential object as a readable string.

__weakref__

list of weak references to the object (if defined)

class sarracenia.credentials.CredentialDB(Unused_logger=None)[source]

Bases: object

Parses, stores and manages Credential objects.

credentials

contains all sarracenia.credentials.Credential objects managed by the CredentialDB.

Type:

dict

Usage:

# build a credential via lookup in the normal files:

from sarracenia.credentials import CredentialDB

credentials = CredentialDB()
credentials.read("/the/path/to/the/credentials.conf")

# if there are corresponding passwords or modulation of login information look it up.

# get() returns a tuple: (cache_result, credential)
cached, broker = credentials.get("amqps://hpfx.collab.science.gc.ca")
cached, remote = credentials.get("sftp://hoho@theserver")

__init__(Unused_logger=None)[source]

Create a CredentialDB.

Parameters:

Unused_logger – logger argument no longer used… left there for API compat with old calls.

__weakref__

list of weak references to the object (if defined)

_parse(line)[source]

Parse a line of a credentials file, add it to the CredentialDB.

Parameters:

line (str) – line to be parsed.

_resolve(urlstr, url=None)[source]

Resolve credentials for AMQP vhost from ones passed as a string, and optionally a urllib.parse.ParseResult object, into a sarracenia.credentials.Credential object.

Parameters:
  • urlstr (str) – credentials in a URL string.

  • url (urllib.parse.ParseResult) – ParseResult object with creds.

Returns:

containing:

  • result (bool): False if the creds were not in the CredentialDB, True if they were.

  • details (sarracenia.credentials.Credential): the updated Credential object, or None.

Return type:

tuple

add(urlstr, details=None)[source]

Add a new credential to the DB.

Parameters:
  • urlstr (str) – string-formatted URL to be parsed and added to DB.

  • details (sarracenia.credentials.Credential) – a Credential object can be passed in, otherwise one is created by parsing urlstr.

get(urlstr)[source]

Retrieve a Credential from the DB by urlstr. If the Credential is valid, but not already cached, it will be added to the CredentialDB.

Parameters:

urlstr (str) – credentials as URL string to be parsed.

Returns:

containing:

  • cache_result (bool): True if the credential was retrieved from the CredentialDB cache, False if it was not in the cache. Note that False does not imply the Credential or urlstr is invalid.

  • credential (sarracenia.credentials.Credential): the Credential object matching the urlstr, None if urlstr is invalid.

Return type:

tuple
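The (cache_result, credential) contract of get() can be illustrated with a toy cache. It assumes entries are keyed on scheme, user and host, so a lookup without a password finds the stored credential. urllib's ParseResult stands in for a Credential object. MiniCredentialDB is hypothetical.

```python
import urllib.parse


class MiniCredentialDB:
    """Hypothetical cache keyed on scheme, user and host (passwords excluded)."""

    def __init__(self):
        self.credentials = {}

    @staticmethod
    def _key(url):
        return "%s://%s@%s" % (url.scheme, url.username or "", url.hostname)

    def add(self, urlstr):
        url = urllib.parse.urlparse(urlstr)
        self.credentials[self._key(url)] = url

    def has(self, urlstr):
        return self._key(urllib.parse.urlparse(urlstr)) in self.credentials

    def get(self, urlstr):
        url = urllib.parse.urlparse(urlstr)
        if not url.scheme or not url.hostname:
            return False, None                     # invalid urlstr
        key = self._key(url)
        if key in self.credentials:
            return True, self.credentials[key]     # cache hit
        self.credentials[key] = url                # valid but new: cache it
        return False, url
```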

has(urlstr)[source]

Return True if the Credential matching the urlstr is already in the CredentialDB.

Parameters:

urlstr (str) – credentials in a URL string.

isTrue(S)[source]

Returns True if S is true, yes, on or 1.

Parameters:

S (str) – string to check if true.

isValid(url, details=None)[source]

Validates a URL and Credential object. Checks for empty passwords, schemes, etc.

Parameters:
  • url (urllib.parse.ParseResult) – ParseResult object for a URL.

  • details (sarracenia.credentials.Credential) – sarra Credential object containing additional details about the URL.

Returns:

True if a URL is valid, False if not.

Return type:

bool

read(path)[source]

Read in a file containing credentials (e.g. credentials.conf). All credentials are parsed and added to the CredentialDB.

Parameters:

path (str) – path of file to be read.

sarracenia.diskqueue

class sarracenia.diskqueue.DiskQueue(options, name)[source]

Bases: object

Process Persistent Queue…

Persist messages to a file so that processing can be attempted again later. For safety reasons, we want to write to a file as soon as possible. For performance reasons, all those writes need to be appends.

So: continuous, but append-only, I/O, with an occasional housekeeping cycle to resolve them.

It is not clear if we need multi-task safety… just one task writes to each queue.

retry_ttl how long

self.retry_cache

  • a dictionary indexed by some sort of key to prevent duplicate messages being stored in it.

retry_path = ~/.cache/sr3/<component>/<config>/diskqueue_<name>

with various suffixes:

.new – messages added to the retry list are appended to this file.

whenever a message is added to the retry_cache, it is appended to a cumulative list of entries to add to the retry list.

every housekeeping interval, the two files are consolidated.

Note that the ack_id of messages retrieved from the retry list is removed. Messages must be acked around the time they are placed on the retry list, as reception from the source should have already been acknowledged.

FIXME: it would be fun to look at the performance of this thing and compare it to Python persistent queue. The differences:

  • This class does no locking (it presumes single threading.) Locks could be added, and they would be coarser grained than the ones in persistentqueue.

  • This should be faster than persistent queue, but who knows what magic they did.

  • This class doesn’t implement an in-memory queue; it is entirely on disk. That saves memory and is optimal for large queues; probably good, since retries should be slow…

Not sure what will run better.

__init__(options, name)[source]
__len__() int[source]

Returns the total number of messages in the DiskQueue.

Number of messages in the DiskQueue does not necessarily equal the number of messages available to get. Messages in the .new file are counted, but can’t be retrieved until on_housekeeping() has been run.

Returns:

number of messages in the DiskQueue.

Return type:

int

__weakref__

list of weak references to the object (if defined)

_count_msgs(file_path) int[source]

Count the number of messages (lines) in the queue file. This should be used only when opening an existing file, because get() does not remove messages from the file.

Parameters:

file_path (str) – path to the file to be counted.

Returns:

count of messages in file, -1 if the file could not be read.

Return type:

int

cleanup()[source]

remove statefiles.

close()[source]

clean shutdown.

get(maximum_messages_to_get=1)[source]

maximum_messages_to_get: the number (qty) of messages to retrieve from the queue.

in_cache(message) bool[source]

Return whether the message is in the cache or not. Side effect: adds it.

is_expired(message) bool[source]

Return whether the given message is expired.

msg_get_from_file(fp, path)[source]

read a message from the state file.

needs_requeuing(message) bool[source]

Return:

  • True if the message is not expired, and not already in the queue.

  • False otherwise.

on_housekeeping()[source]
read rest of queue_file (from current point of unretried ones.)
  • check if message is duplicate or expired.

  • write to .hk

read .new file,
  • check if message is duplicate or expired.

  • write to .hk (housekeeping)

remove .new, and rename the housekeeping file to the queue file for the next period.

put(message_list)[source]

add messages to the end of the queue.
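A compact sketch of the file layout and housekeeping cycle described above. It assumes the following:

  • messages are stored as one JSON object per line;
  • duplicates are detected by relPath plus identity;
  • expiry uses a hypothetical _queued timestamp and a ttl in seconds.

MiniDiskQueue is not the sr3 class.

```python
import json
import os
import time


class MiniDiskQueue:
    """Hypothetical append-only retry queue, loosely following the design above.

    <base>.new receives put() appends; <base> holds messages that get() can return;
    on_housekeeping() merges both into <base>.hk, then renames it over <base>.
    """

    def __init__(self, base_path, ttl=3600.0):
        self.queue_file = base_path
        self.new_file = base_path + ".new"
        self.hk_file = base_path + ".hk"
        self.ttl = ttl
        self.offset = 0  # read position in queue_file (get() does not rewrite the file)

    def put(self, message_list):
        # append-only writes: cheap, and on disk as soon as possible
        with open(self.new_file, "ab") as f:
            for msg in message_list:
                record = dict(msg, _queued=msg.get("_queued", time.time()))
                f.write(json.dumps(record).encode("utf-8") + b"\n")

    def _pending(self):
        # (path, start offset) pairs that still hold unread messages
        return ((self.queue_file, self.offset), (self.new_file, 0))

    def __len__(self):
        # counts .new entries too, even though get() cannot return them yet
        total = 0
        for path, start in self._pending():
            if os.path.exists(path):
                with open(path, "rb") as f:
                    f.seek(start)
                    total += sum(1 for _ in f)
        return total

    def get(self, maximum_messages_to_get=1):
        out = []
        if not os.path.exists(self.queue_file):
            return out
        with open(self.queue_file, "rb") as f:
            f.seek(self.offset)
            while len(out) < maximum_messages_to_get:
                line = f.readline()
                if not line:
                    break
                out.append(json.loads(line))
            self.offset = f.tell()
        return out

    def on_housekeeping(self):
        seen, now = set(), time.time()
        with open(self.hk_file, "wb") as hk:
            for path, start in self._pending():
                if not os.path.exists(path):
                    continue
                with open(path, "rb") as f:
                    f.seek(start)
                    for line in f:
                        msg = json.loads(line)
                        # hypothetical duplicate key: relPath plus identity
                        key = json.dumps([msg.get("relPath"), msg.get("identity")], sort_keys=True)
                        if key in seen or now - msg["_queued"] > self.ttl:
                            continue  # drop duplicates and expired entries
                        seen.add(key)
                        hk.write(line)
        if os.path.exists(self.new_file):
            os.remove(self.new_file)
        os.replace(self.hk_file, self.queue_file)  # .hk becomes the queue for the next period
        self.offset = 0
```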

sarracenia.filemetadata

class sarracenia.filemetadata.FileMetadata(path)[source]

Bases: object

This class implements storing metadata with a file.

On Unix/Linux/Mac systems, we use extended attributes, applying a user.sr_ prefix to the attribute names to avoid clashes.

On Windows NT, an “sr_.json” Alternate Data Stream is created to store them.

API:

All values are UTF-8 strings, hence readable by some subset of humans. Not bytes. No binary, go away…

x = FileMetadata(path) <- read metadata from file.

x.list() <- list all extant extended attributes.

  • sample return value: [ 'sum', 'mtime' ]

x.get('sum') <- look at one value.

  • returns None if missing.

x.set('sum', 'hoho') <- set one value.

  • fails silently (falls back gracefully.)

x.persist() <- write metadata back to file, if necessary.
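A Linux-only sketch of the API above. It uses os.listxattr, os.getxattr and os.setxattr from the standard library, with the user.sr_ prefix. It assumes values are stored as UTF-8 bytes and that errors are swallowed to fall back gracefully. The Windows Alternate Data Stream path is not covered, and MiniFileMetadata is hypothetical.

```python
import os


class MiniFileMetadata:
    """Hypothetical sketch of the API above (Linux os.*xattr only; no Windows ADS)."""

    PREFIX = "user.sr_"

    def __init__(self, path):
        self.path = path
        self.x = {}
        self.dirty = False
        if hasattr(os, "listxattr"):
            try:
                for attr in os.listxattr(path):
                    if attr.startswith(self.PREFIX):
                        raw = os.getxattr(path, attr)
                        self.x[attr[len(self.PREFIX):]] = raw.decode("utf-8")
            except OSError:
                pass  # filesystem without xattr support: start empty

    def list(self):
        return list(self.x)

    def get(self, name):
        return self.x.get(name)  # None if missing

    def set(self, name, value):
        self.x[name] = value
        self.dirty = True

    def persist(self):
        if not self.dirty or not hasattr(os, "setxattr"):
            return
        try:
            for name, value in self.x.items():
                os.setxattr(self.path, self.PREFIX + name, value.encode("utf-8"))
            self.dirty = False
        except OSError:
            pass  # fall back gracefully, as the API notes describe
```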

__init__(path)[source]
__weakref__

list of weak references to the object (if defined)

get(name) str[source]

return the value of the named extended attribute.

list()[source]

return the list of defined extended attributes. (keys to the dict.)

persist()[source]

write the in-memory extended attributes to disk.

set(name, value)[source]

set the name & value pair to the extended attributes for the file.

sarracenia.flow

class sarracenia.flow.Flow(cfg=None)[source]

Bases: object

Implement the General Algorithm from the Concepts Guide.

All of the component types (e.g. poll, subscribe, sarra, winnow, shovel) are implemented as sub-classes of Flow. The constructor/factory accepts a configuration (sarracenia.config.Config class) with all the settings in it.

This class takes care of starting up, running with callbacks, and clean shutdown.

Need to know whether to sleep between passes:

  • o.sleep - an interval (floating point number of seconds)

  • o.housekeeping -

A flow processes worklists of messages

worklist given to callbacks…

  • worklist.incoming –> new messages to continue processing

  • worklist.ok –> successfully processed

  • worklist.rejected –> messages to not be further processed.

  • worklist.failed –> messages for which processing failed.

  • worklist.directories_ok –> directories created.

Initially, all messages are placed in incoming. If a callback decides:

  • a message is not relevant, it is moved to rejected.

  • all processing has been done, it moves it to ok.

  • an operation failed and it should be retried later, it moves it to failed.

Callbacks must not remove messages from all worklists; they must re-classify them instead. It is necessary to put rejected messages in the appropriate worklist so they can be acknowledged as received.
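A sketch of a callback that re-classifies messages rather than dropping them. It assumes the worklist is any object with incoming and rejected lists (a types.SimpleNamespace in the test). RejectTmpFiles is hypothetical; in sr3 a real callback would typically subclass sarracenia.flowcb.FlowCB.

```python
class RejectTmpFiles:
    """Hypothetical after_accept callback: reject '.tmp' files, keep the rest."""

    def after_accept(self, worklist):
        keep = []
        for msg in worklist.incoming:
            if msg["relPath"].endswith(".tmp"):
                # re-classify rather than drop, so the message still gets acknowledged
                worklist.rejected.append(msg)
            else:
                keep.append(msg)
        worklist.incoming = keep
```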

interesting data structure: self.plugins – dict of modular functionality metadata.

  • self.plugins[ “load” ] contains a list of (v3) flow_callbacks to load.

  • self.plugins[ entry_point ] - one for each callback invocation point, e.g. “on_start”, “after_accept”, etc.; contains the routines to run at each entry_point.
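A sketch of how such a dict could be built and used. For each entry point, it collects the bound methods of the loaded callbacks that define it, then calls them in load order. ENTRY_POINTS, index_plugins and run_entry_point are hypothetical, and the entry point list is only an illustrative subset.

```python
ENTRY_POINTS = ("on_start", "after_accept", "after_work", "on_stop")  # illustrative subset


def index_plugins(callbacks):
    """Build a plugins dict: 'load' lists the callback names, and each entry point
    lists the bound methods to run there, in load order."""
    plugins = {"load": [type(cb).__name__ for cb in callbacks]}
    for ep in ENTRY_POINTS:
        plugins[ep] = [getattr(cb, ep) for cb in callbacks if hasattr(cb, ep)]
    return plugins


def run_entry_point(plugins, ep, *args):
    for fn in plugins.get(ep, []):
        fn(*args)
```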

__init__(cfg=None)[source]

cfg should be a sarracenia.config.Config object.

__weakref__

list of weak references to the object (if defined)

_runCallbackMetrics()[source]

Collect metrics from plugins with a metricsReport entry point.

Expects the plugin to return a dictionary containing metrics, which is saved to self.metrics[plugin_name].

_runHousekeeping(now) float[source]

Run housekeeping callbacks. Return the time when housekeeping should next be run.

do_download() None[source]
do download work for self.worklist.incoming, placing files:

  • successfully downloaded in worklist.ok

  • temporary failures in worklist.failed

  • permanent failures (or files not to be downloaded) in worklist.rejected

do_send()[source]
download(msg, options) bool[source]

download/transfer one file based on message, return True if successful, otherwise False.

file_should_be_downloaded(msg) bool[source]

determine whether a comparison of local_file and message metadata indicates that it is new enough that writing the file locally is warranted.

return True to say downloading is warranted.

Return False if the file in the message represents the same or an older version than what is currently on disk.

origin: refactor & translation of v2: content_should_not_be_downloaded

Assumptions:

new_path exists… there is a file to compare against.
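A simplified sketch of such a comparison, assuming the message supplies only an mtime (epoch seconds) and a size. A newer message wins, an older one loses, and equal times fall back to comparing sizes. sr3 may also compare checksums; this function is hypothetical.

```python
import os


def file_should_be_downloaded(msg_mtime, msg_size, new_path):
    """Compare message metadata with the existing local file at new_path."""
    st = os.stat(new_path)               # assumption: new_path exists
    if msg_mtime is not None:
        if msg_mtime > st.st_mtime:
            return True                  # message announces a newer version
        if msg_mtime < st.st_mtime:
            return False                 # local copy is newer
    # same (or unknown) mtime: download only if the size differs
    return msg_size is not None and msg_size != st.st_size
```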

has_vip() list[source]

return list of vips which are active on the current machine, or an empty list.

link1file(msg, symbolic=True) bool[source]

Perform a link of a single file, based on a message, returning boolean success. If symbolic is True, make a symbolic link; otherwise, make a hard link.

imported from v2/subscribe/doit_download “link event, try to link the local product given by message”

mkdir(msg) bool[source]

perform an mkdir.

reject(m, code, reason) None[source]

reject a message.

removeOneFile(path) bool[source]

process an unlink event, returning boolean success.

renameOneItem(old, path) bool[source]

For messages with a rename file operation: rename the file from old to path.

run()[source]

This is the core routine of the algorithm, containing the most important data-driven loop. It implements the General Algorithm (as described in the Concepts Explanation Guide). It checks stop_requested periodically, and otherwise never returns.

set_local_file_attributes(local_file, msg)[source]

after a file has been written, restore permissions and ownership if necessary.

sundew_getDestInfos(msg, currentFileOption, filename)[source]

modified from sundew client

  • WHATFN – the first part of the filename (up to the first ‘:’)

  • HEADFN – use the first 2 fields of the filename

  • NONE – use the entire filename

  • TIME or TIME: – a timestamp is appended

  • DESTFN=fname – change the filename to fname

ex: mask[2] = ‘NONE:TIME’
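A simplified sketch of how such an option string might be applied follows. Several details are assumptions: HEADFN fields are taken as '_'-separated within the part before the first ':', and TIME appends ':' plus a UTC YYYYMMDDHHMMSS stamp. The exact sundew formats may differ, and dest_filename is a hypothetical name.

```python
import time


def dest_filename(filename, option, now=None):
    """Apply a sundew-style filename option such as 'NONE:TIME' (simplified)."""
    parts = option.split(":")
    spec = parts[0]
    if spec == "WHATFN":
        name = filename.split(":")[0]
    elif spec == "HEADFN":
        # assumption: fields are '_'-separated in the part before the first ':'
        name = "_".join(filename.split(":")[0].split("_")[:2])
    elif spec.startswith("DESTFN="):
        name = spec[len("DESTFN="):]
    else:  # NONE, TIME, or anything unrecognized: keep the entire filename
        name = filename
    if spec == "TIME" or "TIME" in parts[1:]:
        # assumption: UTC timestamp appended after a ':'
        name += ":" + time.strftime("%Y%m%d%H%M%S", time.gmtime(now))
    return name


fn = "SACN31_CWAO_121200:pxatx:CWAO"
stamped = dest_filename(fn, "NONE:TIME", now=0)
```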

updateFieldsAccepted(msg, urlstr, pattern, maskDir, maskFileOption, mirror, path_strip_count, pstrip, flatten) None[source]

Set new message fields according to values when the message is accepted.

  • urlstr: the urlstr being matched (baseUrl+relPath+sundew_extension)

  • pattern: the regex that was matched.

  • maskDir: the current directory to base the relPath from.

  • maskFileOption: filename option value (sundew compatibility options.)

  • path_strip_count: number of path entries to strip from the left side of the path.

  • pstrip: pattern strip regexp to apply instead of a count.

  • flatten: a character to replace path separators with, to change a multi-directory-deep file name into a single long file name.
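The interaction of strip, pstrip and flatten can be sketched on a bare relative path. This assumes pstrip removes the first regex match, and that a flatten value of "/" means "leave separators alone". apply_strip_and_flatten is a hypothetical helper, not the sr3 method, which also updates several message fields.

```python
import re


def apply_strip_and_flatten(rel_path, strip=0, pstrip=None, flatten="/"):
    """Sketch: strip leading path entries (by count or regex), then flatten."""
    if pstrip:
        # pattern strip is applied instead of a count
        rel_path = re.sub(pstrip, "", rel_path, count=1)
    elif strip > 0:
        parts = rel_path.strip("/").split("/")
        # never strip the file name itself
        rel_path = "/".join(parts[min(strip, len(parts) - 1):])
    if flatten != "/":
        rel_path = rel_path.strip("/").replace("/", flatten)
    return rel_path
```

For example, stripping 2 entries from a/b/c/file.txt leaves c/file.txt, and flattening with "-" yields a-b-c-file.txt.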

write_inline_file(msg) bool[source]

write local file based on a message with inlined content.

sarracenia.flow.poll

class sarracenia.flow.poll.Poll(options)[source]

Bases: Flow

repeatedly query a remote (non-sarracenia) server to list the files there. post messages (to post_broker) for every new file discovered there.

the sarracenia.flowcb.poll class is used to implement the remote querying, and is highly customizable to that effect.

if the vip option is set:

  • subscribe to the same settings that are being posted to.

  • consume all the messages posted, keeping the new-file duplicate cache updated.

__init__(options)[source]

The options argument should be a sarra/config object.

sarracenia.flow.post

class sarracenia.flow.post.Post(options)[source]

Bases: Flow

post messages about local files.

__init__(options)[source]

The options argument should be a sarra/config object.

sarracenia.flow.report

class sarracenia.flow.report.Report(options)[source]

Bases: Flow

forward report messages.

Not really implemented at the moment; for now it is just a synonym for shovel. More logic should be added.

__init__(options)[source]

The options argument should be a sarra/config object.

sarracenia.flow.sarra

class sarracenia.flow.sarra.Sarra(options)[source]

Bases: Flow

  • download files from a remote server to the local one

  • modify the messages so they refer to the downloaded files.

  • re-post them to another exchange for downstream subscribers.

__init__(options)[source]

The options argument should be a sarra/config object.

sarracenia.flow.sender

class sarracenia.flow.sender.Sender(options)[source]

Bases: Flow

  • subscribe to a stream of messages about local files.

  • send the files to a remote server.

  • modify the messages to refer to the remote file copies.

  • post the messages for subscribers of the remote server.

__init__(options)[source]

The options argument should be a sarra/config object.

sarracenia.flow.shovel

class sarracenia.flow.shovel.Shovel(options)[source]

Bases: Flow

  • subscribe to some messages.

  • post them somewhere else.

__init__(options)[source]

The options argument should be a sarra/config object.

sarracenia.flow.subscribe

class sarracenia.flow.subscribe.Subscribe(options)[source]

Bases: Flow

  • subscribe to messages about files.

  • download the corresponding files.

__init__(options)[source]

The options argument should be a sarra/config object.

sarracenia.flow.watch

class sarracenia.flow.watch.Watch(options)[source]

Bases: Flow

  • create messages for files that appear in a directory.

__init__(options)[source]

The options argument should be a sarra/config object.

sarracenia.flow.winnow

class sarracenia.flow.winnow.Winnow(options)[source]

Bases: Flow

  • subscribe to a stream of messages.

  • suppress duplicates.

  • post the thinned out stream somewhere else.

__init__(options)[source]

The options argument should be a sarra/config object.

sarracenia.instance

class sarracenia.instance.RedirectedTimedRotatingFileHandler(filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None, errors=None)[source]

Bases: TimedRotatingFileHandler

doRollover()[source]

do a rollover; in this case, a date/time stamp is appended to the filename when the rollover happens. However, you want the file to be named for the start of the interval, not the current time. If there is a backup count, then we have to get a list of matching filenames, sort them and remove the one with the oldest suffix.

class sarracenia.instance.instance[source]

Bases: object

Process management for a single flow instance. start and stop instances.

this is the main entry point launched from the sr3 cli, with arguments for it to turn into a specific configuration.

__init__()[source]
__weakref__

list of weak references to the object (if defined)

start()[source]

Main element to run a single flow instance. It parses the command line arguments twice. The first pass initializes the log file and debug level, and selects the configuration file to parse. Once the log file is set, and output & error re-direction is in place, the second pass begins:

The configuration files are parsed, and then the options are parsed a second time to act as overrides to the configuration file content.

As all process management is handled by sr.py, the action here is not parsed, but is always either start (daemon) or foreground (interactive).

sarracenia.identity

class sarracenia.identity.Identity[source]

Bases: object

A class for algorithms to get a fingerprint for a file being announced. Appropriate fingerprinting algorithms vary according to file type.

required methods in subclasses:

def registered_as(self):

return a one letter string identifying the algorithm (mostly for v2.) in v3, the registration comes from the identity sub-class name in lower case.

def set_path(self,path):

start a checksum for the given path… initialize.

def update(self,chunk):

update the checksum based on the given bytes from the file (sequential access assumed.)

__weakref__

list of weak references to the object (if defined)

update_file(path)[source]

read the entire file and checksum it. This is something of a last resort, as it costs an extra file read. It is better to call update() as the file is being read for other reasons.

property value

return the current value of the checksum calculation.
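The required methods (registered_as, set_path, update), plus update_file and value, fit together as in the sketch below. It uses hashlib's SHA-512 with hexdigest(), and the stand-in classes IdentitySketch and Sha512Sketch are hypothetical. The real sarracenia.identity classes may encode the value differently.

```python
import hashlib
import os
import tempfile


class IdentitySketch:
    """Stand-in for sarracenia.identity.Identity (hypothetical, for illustration)."""

    def update_file(self, path):
        # last resort: re-read the whole file, chunk by chunk
        self.set_path(path)
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                self.update(chunk)


class Sha512Sketch(IdentitySketch):
    @staticmethod
    def registered_as():
        return "s"  # one-letter name, mostly relevant for v2

    def set_path(self, path):
        self._hash = hashlib.sha512()  # start a new checksum

    def update(self, chunk):
        self._hash.update(chunk)  # sequential access assumed

    @property
    def value(self):
        return self._hash.hexdigest()


# incremental use, as when the file is being read for other reasons
ident = Sha512Sketch()
ident.set_path("example.txt")
for chunk in (b"hello ", b"world"):
    ident.update(chunk)
streamed = ident.value

# last-resort use: re-read a file from disk
with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "example.txt")
    with open(path, "wb") as f:
        f.write(b"hello world")
    again = Sha512Sketch()
    again.update_file(path)
    from_file = again.value
```

Both routes yield the same value, which is why feeding update() during an existing read is preferred over the extra read in update_file().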

sarracenia.identity.arbitrary

class sarracenia.identity.arbitrary.Arbitrary[source]

Bases: Identity

For applications where there is no known way of determining equivalence, allow them to supply an arbitrary tag, that can be used to compare products for duplicate suppression purposes.

use setter to set the value… some sort of external checksum algorithm that cannot be reproduced.

__init__()[source]
property value

return the current value of the checksum calculation.
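An identity whose value comes from outside can be modelled with a property setter. In the sketch below, the file-based methods do nothing and the caller assigns the tag. ArbitrarySketch is a hypothetical stand-in, not the sr3 class.

```python
class ArbitrarySketch:
    """Stand-in for an identity whose value is supplied externally (hypothetical)."""

    def __init__(self):
        self._value = None

    def set_path(self, path):
        pass  # nothing to initialize: the value is not computed from the file

    def update(self, chunk):
        pass  # file content is ignored

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, tag):
        self._value = tag


ident = ArbitrarySketch()
ident.set_path("product.grib2")
ident.update(b"ignored bytes")
ident.value = "external-tag-0042"  # e.g. a tag from an upstream system
```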

sarracenia.identity.sha512

class sarracenia.identity.sha512.Sha512[source]

Bases: Identity

Uses the SHA512 algorithm to checksum the entire file; it is registered as ‘s’.

sarracenia.identity.md5

class sarracenia.identity.md5.Md5[source]

Bases: Identity

use the (obsolete) Message Digest 5 (MD5) algorithm, applied on the content of a file, to generate an identity signature.

static registered_as()[source]

v2name.

sarracenia.identity.random

class sarracenia.identity.random.Random[source]

Bases: Identity

Trivial minimalist checksumming algorithm, returns random number for any file.

property value

return the current value of the checksum calculation.

sarracenia.moth

class sarracenia.moth.Moth(props=None, is_subscriber=True)[source]

Bases: object

Moth … Messages Organized by Topic Headers (en français: Messages organisés par thème hierarchique. )

A multi-protocol library for use by hierarchical message passing implementations, (messages which have a ‘topic’ header that is used for routing by brokers.)

  • regardless of protocol, the message format returned should be the same.

  • the message is turned into a sarracenia.Message object, which acts like a python dictionary, corresponding to key-value pairs in the message body, and properties.

  • topic is a special key that may end up in the message body, or some sort of property or metadata.

  • the protocol should support acknowledgement under user control. Such control is indicated by the presence of an entry_point called “ack”. The entry point accepts “ack_id” as a message identifier to be passed to the broker. Whatever symbol the protocol uses, it is passed through this message property. Examples: in rabbitmq/amqp, ack takes a “delivery_tag” as an argument; in MQTT, it takes a “message-id”. So when an AMQP message is received, m[‘ack_id’] is assigned the delivery_tag from the message.

  • There is a special dict item, “_DeleteOnPost”, to identify keys which are added only for local use. They will be removed from the message when publishing. Examples: topic (sent outside the body), message-id (used for acknowledgements), new_basedir, ack_id, new_… (settings…)
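The ack_id mapping and the _DeleteOnPost convention can be sketched with plain dicts. This assumes _DeleteOnPost holds a set of key names. from_amqp_delivery and body_for_publish are hypothetical helpers, not the moth API, and the real classes work on sarracenia.Message objects.

```python
def from_amqp_delivery(body, delivery_tag, topic):
    """Build a message dict from an AMQP delivery (hypothetical helper)."""
    msg = dict(body)
    msg["topic"] = topic          # sent outside the body
    msg["ack_id"] = delivery_tag  # AMQP delivery_tag, used later by ack()
    # keys added only for local use, to be dropped when publishing
    msg["_DeleteOnPost"] = {"topic", "ack_id"}
    return msg


def body_for_publish(msg):
    """Return a copy of msg without the keys listed in _DeleteOnPost."""
    drop = set(msg.get("_DeleteOnPost", ())) | {"_DeleteOnPost"}
    return {k: v for k, v in msg.items() if k not in drop}


m = from_amqp_delivery({"relPath": "a/b.txt", "baseUrl": "https://example.com"}, 42, "v03.a")
out = body_for_publish(m)
```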

Intent is to be specialized for topic based data distribution (MQTT style.) API to allow pass-through of protocol specific properties, but apply templates for genericity.

Target protocols (and corresponding libraries.): AMQP, MQTT, ?

Things to specify:

  • broker

  • topicPrefix

  • subTopic

  • queueName (for amqp, used as client-id for mqtt)

This library knows nothing about Sarracenia. The only code it uses from sarracenia is the interpretation of duration properties (from the root sarracenia/__init__.py) and the broker argument (from sarracenia.credentials).

usage:

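import copy

import sarracenia.credentials
import sarracenia.moth

# start from the library defaults; copy them so the shared dict is not modified.
props = copy.deepcopy(sarracenia.moth.default_options)
props['broker'] = sarracenia.credentials.Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')
props['expire'] = 300
props['batch'] = 1

# consuming instance: expect calls to get* routines.
c = sarracenia.moth.Moth(props, is_subscriber=True)

# if there are new messages from a publisher, return them, otherwise return
# an empty list [].
messages = c.newMessages()

# publishing instance: expect/permit only calls to put*.
p = sarracenia.moth.Moth(props, is_subscriber=False)

# publish a message (here, re-publish the first one received, if any).
if messages:
    p.putNewMessage(messages[0])

# tear down the connections.
p.close()
c.close()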

Initialize a broker connection. Connections are unidirectional: either for subscribing (with subFactory) or for publishing (with pubFactory).

The factories return objects subclassed to match the protocol required by the broker argument.

arguments to the factories are:

  • broker … the url of the broker to connect to.

  • props is a dictionary of properties/parameters, supplied as overrides to the default properties listed above.

Some properties may vary among protocols:

Protocol    Library implementing     URL scheme to select
--------    --------------------     --------------------
AMQPv0.9    amqplib from Celery      amqp, amqps
AMQPv0.9    pika                     pika, pikas
MQTTv3      paho                     mqtt, mqtts
AMQPv1.0    qpid-proton              amq1, amq1s
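Choosing an implementation from the broker URL comes down to a lookup on the URL scheme. The sketch below builds that lookup from the protocol/library/scheme rows listed above. It returns a (protocol, library) pair rather than a Moth subclass, and select_protocol is a hypothetical name, not the factory API.

```python
from urllib.parse import urlparse

# (protocol, library, URL schemes) rows, copied from the list above
PROTOCOLS = [
    ("AMQPv0.9", "amqplib", ("amqp", "amqps")),
    ("AMQPv0.9", "pika", ("pika", "pikas")),
    ("MQTTv3", "paho", ("mqtt", "mqtts")),
    ("AMQPv1.0", "qpid-proton", ("amq1", "amq1s")),
]

# scheme -> (protocol, library)
BY_SCHEME = {scheme: (proto, lib) for proto, lib, schemes in PROTOCOLS for scheme in schemes}


def select_protocol(broker_url):
    """Return (protocol, library) for a broker URL, based only on its scheme."""
    scheme = urlparse(broker_url).scheme.lower()
    if scheme not in BY_SCHEME:
        raise ValueError(f"unsupported broker URL scheme: {scheme!r}")
    return BY_SCHEME[scheme]
```

Under this scheme, supporting a new protocol means adding one row, which matches the idea that the factories pick a subclass based on the broker argument alone.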

messaging_strategy

how to manage the connection: whether to treat the connection as new or to assume it is already set up, and what should be done if something goes wrong.

  • reset: on startup… erase any state, and re-initialize.

  • stubborn: If set to True, loop forever if something bad happens. Never give up. This sort of setting is desired in operations, especially unattended ones. If set to False, may give up more easily.

  • failure_duration is to advise the library how to structure the connection service level.

    • 5m - make a connection that will recover from transient errors of a few minutes, but not tax the broker too much for prolonged outages.

    • 5d - strive to keep the connection alive through outages lasting up to five days.

Changing the recovery_strategy setting might result in having to destroy and re-create consumer queues (AMQP.)
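A stubborn versus non-stubborn connection policy can be sketched as a retry loop with exponential backoff. The backoff parameters are assumptions, and failure_duration is not modelled. connect_with_strategy is hypothetical; connect() stands for any callable that returns True once connected.

```python
import time


def connect_with_strategy(connect, stubborn=True, max_attempts=3,
                          first_delay=1.0, max_delay=60.0, sleep=time.sleep):
    """Retry connect() with exponential backoff.

    When stubborn, retry forever; otherwise give up after max_attempts.
    """
    delay = first_delay
    attempt = 0
    while True:
        attempt += 1
        if connect():
            return True
        if not stubborn and attempt >= max_attempts:
            return False
        sleep(delay)
        delay = min(delay * 2, max_delay)  # back off, but cap the wait


def make_flaky(failures):
    """Hypothetical connect() that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    def connect():
        state["calls"] += 1
        return state["calls"] > failures
    return connect, state


waits = []
stubborn_ok = connect_with_strategy(make_flaky(4)[0], stubborn=True, sleep=waits.append)

gave_up_waits = []
never, never_state = make_flaky(100)
gave_up = connect_with_strategy(never, stubborn=False, sleep=gave_up_waits.append)
```

Passing sleep in as a parameter keeps the example testable without real waiting.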

Options

both

  • ‘topicPrefix’ : [ ‘v03’ ]

  • ‘messageDebugDump’: False, –> enable printing of raw messages.

  • ‘inline’: False, - Are we inlining content within messages?

  • ‘inlineEncoding’: ‘guess’, - what encoding should we use for inlined content?

  • ‘inlineByteMax’: 4096, - Maximum size of messages to inline.

for get

  • ‘batch’ : 100 # how many messages to get at once

  • ‘broker’ : an sr_broker ?

  • ‘queueName’ : Mandatory, name of a queue. (only in AMQP… hmm…)

  • ‘bindings’ : [ list of bindings ]

  • ‘loop’

optional:

  • ‘message_ttl’

for put:

  • ‘exchange’ (only in AMQP… hmm…)

__init__(props=None, is_subscriber=True) None[source]

If is_subscriber=True, then this is a consuming instance: expect calls to get* routines.

If is_subscriber=False, then expect/permit only calls to put*.

__weakref__

list of weak references to the object (if defined)

ack(message: Message) None[source]

tell broker that a given message has been received.

ack uses the ‘ack_id’ property to send an acknowledgement back to the broker.

cleanup() None[source]

get rid of server-side resources associated with a client. (queues/id’s, etc…)

property default_options: dict

get default properties to override, used by client for validation.

static findAllSubclasses(cls) set[source]

Recursively finds all subclasses of a class. __subclasses__() only gives direct subclasses.

getNewMessage() Message[source]

If there is one new message available, return it. Otherwise return None. Do not block.

side effects:

metrics. self.metrics[‘RxByteCount’] should be incremented by size of payload. self.metrics[‘RxGoodCount’] should be incremented by 1 if a good message is received. self.metrics[‘RxBadCount’] should be incremented by 1 if an invalid message is received (&discarded.)

metricsDisconnect() None[source]

tear down an existing connection.

newMessages() list[source]

If there are new messages available from the broker, return them, otherwise return None.

On Success, this routine returns immediately (non-blocking) with either None, or a list of messages.

On failure, this routine blocks, and loops reconnecting to broker, until interaction with broker is successful.

putNewMessage(message: Message, content_type: str = 'application/json', exchange: str | None = None) bool[source]

publish a message as set up to the given topic.

return True if it succeeded, False otherwise.

side effect

self.metrics[‘TxByteCount’] should be incremented by size of payload. self.metrics[‘TxGoodCount’] should be incremented by 1 if a message is published successfully. self.metrics[‘TxBadCount’] should be incremented by 1 if a message cannot be published (and is discarded.)

sarracenia.moth.amqp

class sarracenia.moth.amqp.AMQP(props, is_subscriber)[source]

Bases: Moth

implementation of the Moth API for the amqp library, which is built to talk to rabbitmq brokers in 0.8 and 0.9 AMQP dialects.

to allow acknowledgements, we map AMQP’s ‘delivery_tag’ to the ‘ack_id’.

additional AMQP specific options:

  • exchangeDeclare - declare exchanges before use.

  • queueBind - bind queue to exchange before use.

  • queueDeclare - declare queue before use.

__connect(broker) bool

connect to broker. Returns True if connected, False otherwise.

  • side effect: self.channel is set to a new channel.

Expect caller to handle errors.

__init__(props, is_subscriber) None[source]

connect to the broker and, depending on message_strategy stubbornness, remain connected.

ack(m: Message) None[source]

do what you need to acknowledge that processing of a message is done.

getNewMessage() Message[source]

If there is one new message available, return it. Otherwise return None. Do not block.

side effects:

metrics. self.metrics[‘RxByteCount’] should be incremented by size of payload. self.metrics[‘RxGoodCount’] should be incremented by 1 if a good message is received. self.metrics[‘RxBadCount’] should be incremented by 1 if an invalid message is received (&discarded.)

getSetup() None[source]

Setup so we can get messages.

if message_strategy is stubborn, will loop here forever.

connect, declare queue, apply bindings.

newMessages() list[source]

If there are new messages available from the broker, return them, otherwise return None.

On Success, this routine returns immediately (non-blocking) with either None, or a list of messages.

On failure, this routine blocks, and loops reconnecting to broker, until interaction with broker is successful.

putNewMessage(message: Message, content_type: str = 'application/json', exchange: str | None = None) bool[source]

put a new message out, to the configured exchange by default.

sarracenia.moth.amqp.logger = <Logger sarracenia.moth.amqp (WARNING)>

amqp_ss_maxlen

the maximum length of a “short string”, as per AMQP protocol, in bytes.
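In AMQP 0-9-1, a short string's length is stored in a single octet, so it can hold at most 255 bytes. The sketch below trims a value to that limit without splitting a UTF-8 character. AMQP_SS_MAXLEN and truncate_short_string are hypothetical names, and sr3's own handling may differ.

```python
AMQP_SS_MAXLEN = 255  # a short string's length fits in one octet


def truncate_short_string(s, maxlen=AMQP_SS_MAXLEN):
    """Return s trimmed so its UTF-8 encoding is at most maxlen bytes."""
    data = s.encode("utf-8")
    if len(data) <= maxlen:
        return s
    # cut at maxlen bytes, then drop any partial multi-byte character at the end
    return data[:maxlen].decode("utf-8", errors="ignore")
```

Measuring in bytes matters because the limit applies to the encoded form: 200 copies of "é" occupy 400 bytes.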

sarracenia.moth.pika

class sarracenia.moth.pika.PIKA(broker)[source]

Bases: Moth

moth subclass based on the pika AMQP/rabbitmq client library.

stub: not implemented.

__init__(broker)[source]

If is_subscriber=True, then this is a consuming instance: expect calls to get* routines.

If is_subscriber=False, then expect/permit only calls to put*.

sarracenia.moth.mqtt

sarracenia.moth.amq1

class sarracenia.moth.amq1.AMQ1(broker, props, is_subscriber)[source]

Bases: Moth

__init__(broker, props, is_subscriber)[source]

AMQP 1.0 library to be built with libqpid-proton (the only free amqp 1.0 library around.)

stub, not implemented

sarracenia.rabbitmq_admin

rabbitmq administration bindings, to allow sr to invoke broker management functions.

sarracenia.rabbitmq_admin.add_user(url, role, user, passwd, simulate)[source]

add the given user with the given credentials.

sarracenia.rabbitmq_admin.broker_get_exchanges(url, ssl_key_file=None, ssl_cert_file=None)[source]

get the list of existing exchanges using a url query.

sarracenia.rabbitmq_admin.del_user(url, user, simulate)[source]

delete user from the given broker.

sarracenia.rabbitmq_admin.exec_rabbitmqadmin(url, options, simulate=False)[source]

invoke rabbitmqadmin using a sub-process, with the given options.

sarracenia.rabbitmq_admin.get_exchanges(url)[source]

get the list of existing exchanges.

sarracenia.rabbitmq_admin.get_queues(url)[source]

get the list of existing queues.

sarracenia.rabbitmq_admin.get_users(url)[source]

get the list of existing users.

sarracenia.rabbitmq_admin.run_rabbitmqadmin(url, options, simulate=False)[source]

spawn a subprocess to run rabbitmqadmin with the given options. capture result.

sarracenia.rabbitmq_admin.user_access(url, user)[source]

Given an administrative URL, return a list of exchanges and queues the user can access.

lox = list of exchanges, just a list of names. loq = array of queues, where the value of each is the number of messages ready.

return value:

{ 'exchanges': { 'configure': lox, 'write': lox, 'read': lox },
  'queues' : { 'configure': loq, 'write': loq, 'read': loq },
  'bindings' : { <queue> : { 'exchange': <exchange> , 'key' : <routing_key> } }
}

sarracenia.transfer

exception sarracenia.transfer.TimeoutException[source]

Bases: Exception

timeout exception

__weakref__

list of weak references to the object (if defined)

class sarracenia.transfer.Transfer(proto, options)[source]

Bases: object

This is a sort of abstract base class for implementing transfer protocols. Implemented subclasses include support for: local files, https, sftp, and ftp.

This class has routines that do i/o given descriptors opened by the sub-classes, so that each one does not need to re-implement copying, for example.

Each subclass needs to implement the following routines:

if downloading:

get    ( msg, remote_file, local_file, remote_offset=0, local_offset=0, length=0 )
getAccelerated( msg, remote_file, local_file, length )
ls     ()
cd     (dir)
delete (path)

if sending:

put    ( msg, remote_file, local_file, remote_offset=0, local_offset=0, length=0 )
putAccelerated ( msg, remote_file, local_file, length=0 )
cd     (dir)
mkdir  (dir)
umask  ()
chmod  (perm)
rename (old,new)

Note that the ls() call returns are polymorphic. One of:

  • a dictionary where the key is the name of the file in the directory, and the value is an SFTPAttributes structure for it (from paramiko.) (sftp.py as an example)

  • a dictionary where the key is the name of the file, and the value is a string that looks like the output of a linux ls command. (ftp.py as an example.)

  • a sequence of bytes… will be parsed as an html page. (https.py as an example)

The first format is by far the preferred one. The others are fallbacks for when the first is not available. The lsdir() routine in flowcb/poll/__init__.py tries to transform any of these return values into the first form (a dictionary of SFTPAttributes). Each SFTPAttributes structure needs st_mode set, and folders need stat.S_IFDIR set.

if the lsdir() routine gets a sequence of bytes, the on_html_page() and on_html_parser_init() routines (or perhaps handle_starttag() and handle_data()) will be used to turn them into the first form.

web services with different such formats can be accommodated by subclassing and overriding the handle_* entry points.
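Converting the second ls() form (an ls-style listing line) into a stat-like record can be sketched with the stat module. It uses types.SimpleNamespace in place of paramiko's SFTPAttributes and assumes the usual nine-field `ls -l` layout. ls_line_to_attrs is hypothetical and simpler than the real lsdir() parsing (for example, it treats setuid/sticky letters as plain execute bits).

```python
import stat
from types import SimpleNamespace

# first character of the permission string -> file type bits
_TYPE_BITS = {"d": stat.S_IFDIR, "l": stat.S_IFLNK, "-": stat.S_IFREG}


def ls_line_to_attrs(line):
    """Turn one 'ls -l' style line into (name, stat-like object) (simplified)."""
    # perms, links, user, group, size, month, day, time/year, name (may contain spaces)
    fields = line.split(None, 8)
    perms, size, name = fields[0], int(fields[4]), fields[8]
    mode = _TYPE_BITS.get(perms[0], stat.S_IFREG)
    # the nine rwx characters map onto permission bits, most significant first
    for i, ch in enumerate(perms[1:10]):
        if ch != "-":
            mode |= 1 << (8 - i)
    return name, SimpleNamespace(st_mode=mode, st_size=size)


dir_name, dir_attrs = ls_line_to_attrs("drwxr-xr-x 2 user group 4096 Jan 1 12:00 incoming")
file_name, file_attrs = ls_line_to_attrs("-rw-r--r-- 1 user group 1234 Jan 1 12:00 data file.txt")
```

Setting S_IFDIR on directories is the key detail, since the poll logic relies on it to tell folders from files.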

uses options (from the sarracenia.config data structure passed to the constructor/factory):

  • credentials - authentication information.

  • sendTo - server to connect to.

  • batch - how many files to transfer before a connection is torn down and re-established.

  • permDefault - what permissions to set on files transferred.

  • permDirDefault - what permissions to set on directories created.

  • timeout - how long to wait for operations to complete.

  • byteRateMax - maximum transfer rate (throttle to avoid exceeding it).

  • bufsize - size of buffers for file transfers.
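One way to honour byteRateMax together with bufsize is to copy in bufsize chunks and, after each chunk, sleep just long enough to keep the average rate under the cap. This is a sketch under that assumption. throttle_delay and throttled_copy are hypothetical names, not the Transfer API.

```python
import io
import time


def throttle_delay(bytes_sent, elapsed, byte_rate_max):
    """Seconds to sleep so that bytes_sent / elapsed stays <= byte_rate_max."""
    if not byte_rate_max or byte_rate_max <= 0:
        return 0.0  # no limit configured
    expected = bytes_sent / byte_rate_max  # time the transfer should have taken
    return max(0.0, expected - elapsed)


def throttled_copy(src, dst, bufsize, byte_rate_max,
                   clock=time.monotonic, sleep=time.sleep):
    """Copy src to dst in bufsize chunks, pausing to respect byte_rate_max."""
    start = clock()
    total = 0
    while True:
        chunk = src.read(bufsize)
        if not chunk:
            break
        dst.write(chunk)
        total += len(chunk)
        sleep(throttle_delay(total, clock() - start, byte_rate_max))
    return total


src = io.BytesIO(b"0123456789")
dst = io.BytesIO()
pauses = []
copied = throttled_copy(src, dst, bufsize=4, byte_rate_max=1_000_000, sleep=pauses.append)
```

Basing the delay on the cumulative total, rather than on each chunk alone, lets a slow stretch offset a fast one.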

__init__(proto, options)[source]
__weakref__

list of weak references to the object (if defined)

on_data(chunk) bytes[source]

transform data as it is being read. Given a buffer, return the transformed buffer. The checksum is calculated on the data before transformation… a post-transformation value is likely needed as well.

sarracenia.transfer.alarm_set(time)[source]

FIXME: replace with setitimer for sub-second resolution… currently rounding to the nearest second.

sarracenia.transfer.file

class sarracenia.transfer.file.File(proto, options)[source]

Bases: Transfer

Transfer sub-class for local file i/o.

__init__(proto, options)[source]
cd(path)[source]

proto classes are used for remote sessions, so this cd is for the REMOTE directory… when file is used as the remote protocol, it refers to the source. It should not change the “local” working directory when downloading.

sarracenia.transfer.https

class sarracenia.transfer.https.Https(proto, options)[source]

Bases: Transfer

HyperText Transfer Protocol (HTTP) ( https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol ) sarracenia transfer protocol subclass supports/uses additional custom options:

  • accelWgetCommand (default: ‘/usr/bin/wget %s -o - -O %d’ )

built with:

urllib.request ( https://docs.python.org/3/library/urllib.request.html )

__init__(proto, options)[source]

sarracenia.transfer.ftp

class sarracenia.transfer.ftp.Ftp(proto, options)[source]

Bases: Transfer

File Transfer Protocol (FTP) ( https://datatracker.ietf.org/doc/html/rfc959 ) sarracenia transfer protocol subclass supports/uses additional custom options:

  • accelFtpputCommand (default: ‘/usr/bin/ncftpput %s %d’ )

  • accelFtpgetCommand (default: ‘/usr/bin/ncftpget %s %d’ )

built using: ftplib ( https://docs.python.org/3/library/ftplib.html )

__init__(proto, options)[source]

sarracenia.transfer.sftp

class sarracenia.transfer.sftp.Sftp(proto, options)[source]

Bases: Transfer

SecSH File Transfer Protocol (SFTP) ( https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt ) Sarracenia transfer protocol subclass supports/uses additional custom options:

  • accelScpCommand (default: ‘/usr/bin/scp %s %d’ )

The module uses the paramiko library for python SecSH support ( https://www.paramiko.org/ )

__init__(proto, options)[source]