Code Reference
Internal documentation on the full source of sr3. This is not a stable API; it is subject to change without notice. What is missing:
All flow callbacks are here: flowcb
the entry points are excluded (not compatible with sphinx/autodoc.)
sarracenia
- class sarracenia.Message[source]
Bases:
dict
A notification message in Sarracenia is stored as a python dictionary, with a few extra management functions.
The internal representation is very close to the v03 format defined here: https://metpx.github.io/sarracenia/Reference/sr_post.7.html
Unfortunately, sub-classing dict means that copying from a plain dict loses the type, hence the need for the copyDict member.
- __weakref__
list of weak references to the object (if defined)
- computeIdentity(path, o, offset=0, data=None) None [source]
check extended attributes for a cached identity sum calculation. If extended attributes are present, and * the file mtime is not too new, and * the cached sum uses the same method, then use the cached value.
otherwise, calculate a checksum. If data is provided, use that as the file content; otherwise read the file from the file system.
Once the checksum is determined, set the file’s extended attributes to the new value. The method of checksum calculation is from options.identity.
sets the message ‘identity’ field if appropriate.
- deriveTopics(o, topic, separator='.')[source]
derive subtopic, topicPrefix, and topic fields based on message and options.
- dumps() str [source]
FIXME: used to be msg_dumps. Print a message in a compact but relatively readable way. msg is a python dictionary. If a field is longer than maximum_field_length, truncate it.
- static fromFileData(path, o, lstat=None)[source]
create a message based on a given file, calculating the checksum. returns a well-formed message, or None.
- static fromFileInfo(path, o, lstat=None)[source]
based on the given information about the file (its name, and a stat record if available) and a configuration options object (sarracenia.config.Config), return a sarracenia.Message suitable for placement on a worklist.
A message is a specialized python dictionary with a certain set of fields in it. The message returned will have the necessary fields for processing and posting.
The message built for a file is based on the given path, options (o), and lstat (output of os.stat).
The lstat record is used to build ‘atime’, ‘mtime’ and ‘mode’ fields if timeCopy and permCopy options are set.
if no lstat record is supplied, then those fields will not be set.
- static fromStream(path, o, data=None)[source]
Create a file and message for the given path. The file will be created or overwritten with the provided data, then fromFileData() is invoked on the resulting file.
- getContent(options=None)[source]
Retrieve the data referred to by a message. The data may be embedded in the message, or this routine may resolve a link to an external server and download the data.
Does not handle authentication. This routine is meant to be used with small files; using it to download large files may be very inefficient. Untested in that use-case.
Return value is the data.
Often, on a server where one is publishing data, the file is available as a local file, and one can avoid the network usage via the options.baseDir setting. This behaviour can be disabled by not providing options, or by not setting baseDir.
- getIDStr() str [source]
return some descriptive tag string to identify the message being processed.
- new_pathWrite(options, data)[source]
expects msg[‘new_dir’] and msg[‘new_file’] to be set, and is given the byte stream of data.
Write the local file based on the given message, options, and data, then update the message to match (recalculating the checksum.)
in future: If the data field is a file, it is taken as an open file object which can be read sequentially, and the bytes are written to the path indicated by other message fields.
currently, if data is a buffer, its contents are written to the file.
if data is None, look for the ‘content’ header in the message, and use the data from that.
- setReport(code, text=None)[source]
FIXME: used to be msg_set_report. Set message fields to indicate the result of an action, so that reports can be generated.
set is supposed to indicate final message dispositions, so in the case of putting a message on worklist.failed… no report is generated, since it will be retried later. FIXME: should we publish an interim failure report?
- updatePaths(options, new_dir=None, new_file=None)[source]
set the new_* fields in the message based on changed file placement. If the new_* arguments are omitted, update the rest of the fields in the message based on their current values.
For example, if you change file placement in a flow callback, you would change new_dir and new_file in the message. This routine updates the other fields in the message (e.g. relPath, baseUrl, topic) to match new_dir/new_file.
msg[‘post_baseUrl’] defaults to msg[‘baseUrl’]
- class sarracenia.Sarracenia[source]
Bases:
object
Core utilities of Sarracenia. The main class here is sarracenia.Message. A sarracenia.Message is subclassed from dict, so for most uses it works like the python built-in, but there are also a few major entry points, some factories:
Building a message from a file
m = sarracenia.Message.fromFileData( path, options, lstat )
builds a notification message from a given existing file, consulting options, a parsed in-memory version of the applicable configuration settings.
Options
see the sarracenia.config.Config class for functions to parse configuration files and create corresponding python option dictionaries. One can supply small dictionaries for example:
options['topicPrefix'] = [ 'v02', 'post' ]
options['bindings'] = [ ('xpublic', [ 'v02', 'post'] , [ '#' ] )]
options['queueName'] = 'q_anonymous_' + socket.getfqdn() + '_SomethingHelpfulToYou'
Above is an example of a minimal options dictionary, taken from the tutorial example called moth_api_consumer.py.
If you don’t have a file
If you don’t have a local file, then build your notification message with:
m = sarracenia.Message.fromFileInfo( path, options, lstat )
where you can make up the lstat values to fill in some fields in the message. You can make a fake lstat structure to provide these values using sarracenia.filemetadata class which is either an alias for paramiko.SFTPAttributes ( https://docs.paramiko.org/en/latest/api/sftp.html#paramiko.sftp_attr.SFTPAttributes ) if paramiko is installed, or a simple emulation if not.
from sarracenia.filemetadata import FmdStat
lstat = FmdStat()
lstat.st_mtime = utcinteger   # second count in UTC (numeric version of a Sarracenia timestamp.)
lstat.st_atime =
lstat.st_mode = 0o644
lstat.st_size = size_in_bytes

optional fields that may be of interest:

lstat.filename = “nameOfTheFile”
lstat.longname = ‘lrwxrwxrwx 1 peter peter 20 Oct 11 20:28 nameOfTheFile’
You can then provide this as the lstat argument to the above fromFileInfo() call. However, the notification message returned will lack an identity checksum field. Once you have the file, you can add the identity field with:
m.computeIdentity(path, o)
In terms of consuming notification messages, the fields in the dictionary provide metadata for the announced resource. The announced data could be embedded in the notification message itself, or available by a URL.
Messages are generally gathered from a source such as the Message Queueing Protocol wrapper class: moth… sarracenia.moth.
data = m.getContent()
will return the content of the announced resource as raw data.
- __weakref__
list of weak references to the object (if defined)
- class sarracenia.TimeConversions[source]
Bases:
object
Time conversion routines.
os.stat, and time.time() return floating point
The floating point representation is a count of seconds since the beginning of the epoch.
beginning of epoch is platform dependent, and conversion to actual date is fraught (leap seconds, etc…)
Entire SR_* formats are text, no floats are sent over the protocol (avoids byte order issues, null byte / encoding issues, and enhances readability.)
str format: YYYYMMDDHHMMSS.msec. The goal of this representation is that a naive conversion to floats yields comparable numbers.
but the number that results is not useful for anything else, so need these special routines to get a proper epochal time.
also OK for year 2038 or whatever (rollover of time_t on 32 bits.)
string representation is forced to UTC timezone to avoid having to communicate timezone.
timestr2flt() - accepts a string and returns a float.
caveat
FIXME: this encoding will break in the year 10000 (assumes four digit year) and requires leading zeroes prior to 1000. One will have to add detection of the decimal point, and change the offsets at that point.
- __weakref__
list of weak references to the object (if defined)
- sarracenia.durationToSeconds(str_value, default=None) float [source]
this function converts a duration to seconds. str_value should be a number followed by a unit [s,m,h,d,w], e.g. 1w, 4d, 12h. Returns 0.0 for an invalid string.
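The parsing rule described above can be sketched as a standalone function (an illustration of the documented behaviour, not the library’s actual implementation):

```python
import re

# seconds per unit suffix, per the documented [s,m,h,d,w] list
_UNIT_SECONDS = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400, 'w': 604800}

def duration_to_seconds(str_value: str) -> float:
    """Convert '12h', '4d', '1w', ... to seconds; 0.0 for invalid strings."""
    match = re.fullmatch(r'([0-9]*\.?[0-9]+)\s*([smhdw]?)', str_value.strip())
    if not match:
        return 0.0   # documented behaviour: invalid string yields 0.0
    number, unit = match.groups()
    return float(number) * _UNIT_SECONDS.get(unit, 1)
```

e.g. duration_to_seconds('12h') yields 43200.0, and a bare number is taken as seconds.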
- sarracenia.durationToString(d) str [source]
given a number of seconds, return a short, human readable string.
- sarracenia.stat(path) SFTPAttributes [source]
os.stat call replacement which improves on it by returning an SFTPAttributes structure, in place of the OS stat one, featuring:
mtime and ctime with subsecond accuracy
fields that can be overridden (not immutable.)
- sarracenia.timeflt2str(f=None)[source]
timeflt2str - accepts a float and returns a string.
f is a floating point number such as returned by time.time() (number of seconds since the beginning of the epoch.)
the str is YYYYMMDDTHHMMSS.sssss
20210921T011331.0123
translates to: Sept. 21st, 2021 at 01:13 and 31.0123 seconds. always UTC timezone.
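The format can be illustrated with a standalone round trip using only the standard library (a sketch of the documented format; the real timeflt2str/timestr2flt may differ in padding and rounding):

```python
from datetime import datetime, timezone

def time_flt_to_str(f: float) -> str:
    """epoch float -> 'YYYYMMDDTHHMMSS.ssss', always UTC (4-digit fraction here)."""
    dt = datetime.fromtimestamp(f, tz=timezone.utc)
    # '%.4f' % frac gives '0.xxxx'; drop the leading '0' to append '.xxxx'
    return dt.strftime('%Y%m%dT%H%M%S') + ('%.4f' % (f % 1))[1:]

def time_str_to_flt(s: str) -> float:
    """'YYYYMMDDTHHMMSS.ssss' -> epoch float, interpreted as UTC."""
    dt = datetime.strptime(s[:15], '%Y%m%dT%H%M%S').replace(tzinfo=timezone.utc)
    return dt.timestamp() + float('0' + s[15:])
```

Note how a naive float() of the string would sort correctly but not be a usable epoch time, which is why the dedicated conversion routines exist.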
sarracenia.config
Second version configuration parser
FIXME: pas 2023/02/05… missing options from v2: max_queue_size, outlet, pipe
- class sarracenia.config.Config(parent=None)[source]
Bases:
object
The option parser to produce a single configuration.
it can be instantiated with one of:
one_config(component, config, action, isPost=False, hostdir=None) – read the options for a given component and configuration (all in one call.)
Alternatively, a config can be built up from the following constructors:
default_config() – returns an empty configuration, given a config file tree.
no_file_config() – returns an empty config without any config file tree.
Then just add settings manually:
cfg = no_file_config()
cfg.broker = sarracenia.config.credentials.Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')
cfg.topicPrefix = [ 'v02', 'post']
cfg.component = 'subscribe'
cfg.config = 'flow_demo'
cfg.action = 'start'
cfg.bindings = [ ('xpublic', ['v02', 'post'], ['*', 'WXO-DD', 'observations', 'swob-ml', '#' ]) ]
cfg.queueName = 'q_anonymous.subscriber_test2'
cfg.download = True
cfg.batch = 1
cfg.messageCountMax = 5
# set the instance number for the flow class.
cfg.no = 0
# and at the end call finalize
cfg.finalize()
- __deepcopy__(memo) Configuration [source]
code for this from here: https://stackoverflow.com/questions/1500718/how-to-override-the-copy-deepcopy-operations-for-a-python-object Needed for python < 3.7ish? (ubuntu 18) found this bug: https://bugs.python.org/issue10076 deepcopy fails for objects with re’s in them? ok on ubuntu 20.04
- __weakref__
list of weak references to the object (if defined)
- _parse_binding(subtopic_string)[source]
- FIXME: see original parse, with substitutions for url encoding.
Should also squawk about an error if no exchange or topicPrefix is defined. Also, resetting to empty via None is not done.
- _parse_setting(opt, value)[source]
v3 plugin accept options for specific modules.
parsed from: set sarracenia.flowcb.log.filter.Log.level debug
example: opt= sarracenia.flowcb.log.filter.Log.level value = debug
results in: self.settings[ sarracenia.flowcb.log.filter.Log ][level] = debug
options should be fed to the plugin class on instantiation, stripped of the class prefix: options = { ‘level’ : ‘debug’ }
- _resolve_exchange()[source]
based on the given configuration, fill in with defaults or guesses. sets self.exchange.
- class addBinding(option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=False, help=None, metavar=None)[source]
Bases:
Action
called by argparse to deal with queue bindings.
- add_option(option, kind='list', default_value=None, all_values=None)[source]
options can be declared in any plugin. There are various kinds of options, where the declared type modifies the parsing.
‘count’ integer count type.
‘octal’ base-8 (octal) integer type.
- ‘duration’ a floating point number indicating a quantity of seconds (0.001 is 1 millisecond)
modified by a unit suffix ( m-minute, h-hour, w-week )
‘flag’ boolean (True/False) option.
‘float’ a simple floating point number.
- ‘list’ a list of string values, each succeeding occurrence catenates to the total.
all v2 plugin options are declared of type list.
- ‘set’ a set of string values, each succeeding occurrence is unioned to the total.
if all_values is provided, then constrain set to that.
‘size’ integer size. Suffixes k, m, and g for kilo, mega, and giga (base 2) multipliers.
- ‘str’ an arbitrary string value; as with all of the above types, each
succeeding occurrence overrides the previous one.
If a value is set to None, that could mean that it has not been set.
- applyComponentDefaults(component)[source]
overlay defaults options for the given component to the given configuration.
- finalize(component=None, config=None)[source]
Before final use, take the existing settings, and infer any missing needed defaults from what is provided. Should be called prior to using a configuration.
There are default options that apply only if they are not overridden…
- hohoo_validate_urlstr(urlstr) tuple [source]
returns a tuple ( bool, expanded_url ) the bool is whether the expansion worked, and the expanded_url is one with the added necessary authentication details from sarracenia.Credentials.
- mask_ppstr(mask)[source]
return a pretty print string version of the given mask, easier for humans to read.
- merge(oth)[source]
merge two sets of options.
If an option is cumulative, then merge the values; otherwise, if the value in oth is not None, take it.
- override(oth)[source]
override a value in a set of options.
Why an override() method, and not just assigning values to the dictionary? In the configuration file, there are various ways to have variable substitution. override() invokes those, so that they are properly interpreted; otherwise, you just end up with a literal value.
- parse_args(isPost=False)[source]
- user information:
accept a configuration, and apply the argparse library to augment it with command line settings.
the post component has a different calling convention than others, so use that flag if called from post.
- development notes:
Use argparse.parser to modify defaults. FIXME, many FIXME notes below. this is a currently unusable placeholder. have not figured this out yet. many issues.
FIXME #1: parseArgs often sets the value of the variable, regardless of its presence (normally a good thing.) (If you have ‘store_true’, then a default is needed; for broker, just a string, it is ignored if not present.) This has the effect of overriding settings in the file parsed before the arguments. Therefore: often supply defaults… but… sigh…
but there is another consideration stopping me from supplying defaults, wish I remembered what it was. I think it is: FIXME #2: arguments are parsed twice: once to get basic stuff (loglevel, component, action) and if the parsing fails there, the usage will print the wrong defaults…
- sundew_dirPattern(pattern, urlstr, basename, destDir)[source]
does substitutions for patterns in directories.
- variableExpansion(cdir, message=None) str [source]
replace substitution patterns, variable substitutions as described in https://metpx.github.io/sarracenia/Reference/sr3_options.7.html#variables
returns: the given string with the substitutions done.
- examples: ${YYYYMMDD-70m} becomes 20221107 assuming that was the current date 70 minutes ago.
environment variables, and built-in settings are replaced also.
timeoffset -70m
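A minimal standalone sketch of the ${YYYYMMDD-offset} substitution described above (illustration only; the real variableExpansion also expands environment variables, built-in settings, and many more patterns):

```python
import re
from datetime import datetime, timedelta, timezone

# seconds per offset unit; assumed set for this sketch
_UNITS = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}

def expand_date_vars(text: str, now: datetime) -> str:
    """Replace ${YYYYMMDD} / ${YYYYMMDD-70m} style patterns with a date."""
    def _sub(match):
        offset = 0.0
        if match.group(1):   # e.g. '-70m' -> 70 * 60 seconds in the past
            offset = float(match.group(1)[1:-1]) * _UNITS[match.group(1)[-1]]
        return (now - timedelta(seconds=offset)).strftime('%Y%m%d')
    return re.sub(r'\$\{YYYYMMDD(-[0-9]+[smhd])?\}', _sub, text)
```

With now = 2022-11-08 00:30 UTC, expand_date_vars('${YYYYMMDD-70m}', now) reproduces the documented '20221107' example.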
- sarracenia.config.config_path(subdir, config, mandatory=True, ctype='conf')[source]
Given a subdir/config, look for the file in config-ish places.
return Tuple: Found (True/False), path_of_file_found|config_that_was_not_found
- sarracenia.config.get_log_filename(hostdir, component, configuration, no)[source]
return the name of a single logfile for a single instance.
- sarracenia.config.get_metrics_filename(hostdir, component, configuration, no)[source]
return the name of the metrics file for a single instance.
- sarracenia.config.get_pid_filename(hostdir, component, configuration, no)[source]
return the file name for the pid file for the specified instance.
- sarracenia.config.logger = <Logger sarracenia.config (WARNING)>
respect appdir stuff using an environment variable. For now, just hard coded as a class variable appdir_stuff
- Type:
FIXME
- sarracenia.config.no_file_config()[source]
initialize a config that will not use Sarracenia configuration files at all. meant for use by people writing independent programs to start up instances with python API calls.
- sarracenia.config.one_config(component, config, action, isPost=False, hostDir=None)[source]
single call return a fully parsed single configuration for a single component to run.
read in admin.conf and default.conf
apply component default overrides (maps to: component/check?)
read in component/config.conf
parse arguments from command line
return config instance item.
appdir_stuff can be to override file locations for testing during development.
- sarracenia.config.parse_count(cstr)[source]
numeric argument accepts a k, m, or g suffix (with i and b to use base 2), and +/-. The return value is an integer.
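One plausible reading of the suffix rule above can be sketched as follows (an illustration, not the library’s implementation; the assumption that a bare k/m/g multiplies by 1000 while a trailing ‘i’ or ‘b’ selects 1024 is ours):

```python
import re

def parse_count(cstr: str) -> int:
    """'10k' -> 10000, '1ki' -> 1024, '-2m' -> -2000000 (sketch semantics)."""
    match = re.fullmatch(r'([+-]?[0-9]*\.?[0-9]+)([kmg]?)([ib]*)',
                         cstr.strip().lower())
    if not match:
        raise ValueError(f'not a count: {cstr!r}')
    number, unit, base2 = match.groups()
    power = {'': 0, 'k': 1, 'm': 2, 'g': 3}[unit]
    factor = (1024 if base2 else 1000) ** power   # 'i'/'b' suffix => base 2
    return int(float(number) * factor)
```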
- sarracenia.config.parse_float(cstr)[source]
like parse_count, the numeric argument accepts k, m, g suffixes and +/-. Below 1000, return a decimal number with at most 3 digits.
- sarracenia.config.str_options = ['accelCpCommand', 'accelWgetCommand', 'accelScpCommand', 'action', 'admin', 'baseDir', 'broker', 'cluster', 'directory', 'exchange', 'exchangeSuffix', 'feeder', 'filename', 'flatten', 'flowMain', 'header', 'hostname', 'identity', 'inlineEncoding', 'logFormat', 'logLevel', 'pollUrl', 'post_baseUrl', 'post_baseDir', 'post_broker', 'post_exchange', 'post_exchangeSuffix', 'post_format', 'post_topic', 'queueName', 'queueShare', 'sendTo', 'rename', 'report_exchange', 'source', 'strip', 'timezone', 'nodupe_ttl', 'nodupe_driver', 'nodupe_basis', 'tlsRigour', 'topic']
for backward compatibility,
convert some old plugins that are hard to get working with v2wrapper, into v3 plugin.
the fdelay ones make in-depth use of the sr_replay function, and that has changed too much in v3.
accelerators and rate limiting are now built-in, no plugin required.
sarracenia.credentials
sarracenia.diskqueue
- class sarracenia.diskqueue.DiskQueue(options, name)[source]
Bases:
object
Process Persistent Queue…
Persist messages to a file so that processing can be attempted again later. For safety reasons, want to be writing to a file ASAP. For performance reasons, all those writes need to be Appends.
so: continuous, but append-only I/O, with an occasional housekeeping cycle to consolidate the files.
not clear if we need multi-task safety… just one task writes to each queue.
retry_ttl how long
self.retry_cache
a dictionary indexed by some sort of key to prevent duplicate messages being stored in it.
retry_path = ~/.cache/sr3/<component>/<config>/diskqueue_<name>
with various suffixes:
.new – messages added to the retry list are appended to this file.
whenever a message is added to the retry_cache, it is appended to a cumulative list of entries to add to the retry list.
every housekeeping interval, the two files are consolidated.
note that the ack_id of messages retrieved from the retry list is removed. Messages must be acked around the time they are placed on the retry_list, as reception from the source should have already been acknowledged.
- FIXME: would be fun to look at performance of this thing and compare it to
python persistent queue. the differences:
This class does no locking (presumes single threading.) Locks could be added, and they would be coarser grained than the ones in persistentqueue. This should be faster than persistent queue, but who knows what magic they did. This class doesn’t implement an in-memory queue: it is entirely on disk, which saves memory and is optimal for large queues. Probably good, since retries should be slow…
not sure what will run better.
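The append-only pattern described above can be sketched in a few lines (a standalone illustration, not the actual DiskQueue; the TinyDiskQueue class and its method names are made up):

```python
import json
import os

class TinyDiskQueue:
    """Append-only persistence with a housekeeping consolidation pass."""

    def __init__(self, path):
        self.path = path                 # consolidated state file
        self.new_path = path + '.new'    # append-only arrivals

    def put(self, msg: dict):
        # append-only write: cheap and safe, never rewrites the file.
        with open(self.new_path, 'a') as f:
            f.write(json.dumps(msg) + '\n')

    def on_housekeeping(self):
        # consolidate .new into the state file, then remove .new
        if not os.path.exists(self.new_path):
            return
        with open(self.new_path) as src, open(self.path, 'a') as dst:
            dst.write(src.read())
        os.unlink(self.new_path)

    def get_all(self):
        # messages in .new are not visible until housekeeping has run
        if not os.path.exists(self.path):
            return []
        with open(self.path) as f:
            return [json.loads(line) for line in f if line.strip()]
```

This mirrors the documented behaviour that messages appended to the .new file are counted but not retrievable until the housekeeping interval consolidates the two files.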
- __len__() int [source]
Returns the total number of messages in the DiskQueue.
The number of messages in the DiskQueue does not necessarily equal the number of messages available to get(). Messages in the .new file are counted, but can’t be retrieved until on_housekeeping() has been run.
- Returns:
number of messages in the DiskQueue.
- Return type:
int
- __weakref__
list of weak references to the object (if defined)
- _count_msgs(file_path) int [source]
Count the number of messages (lines) in the queue file. This should be used only when opening an existing file, because get() does not remove messages from the file.
- Parameters:
file_path (str) – path to the file to be counted.
- Returns:
count of messages in file, -1 if the file could not be read.
- Return type:
int
- get(maximum_messages_to_get=1)[source]
maximum_messages_to_get: number of messages to retrieve from the queue.
no housekeeping in get(): if there is no message (but a .new or state file exists), we wait for housekeeping to present the retry messages.
- in_cache(message) bool [source]
return whether the message is already in the cache or not. Side effect: adds it.
- needs_requeuing(message) bool [source]
return * True if message is not expired, and not already in queue. * False otherwise.
sarracenia.filemetadata
- class sarracenia.filemetadata.FileMetadata(path)[source]
Bases:
object
This class implements storing metadata with a file.
on unix/linux/mac systems, we use extended attributes, where we apply a user.sr_ prefix to the attribute names to avoid clashes.
on Windows NT, create an “sr_.json” Alternate Data Stream to store them.
API:
All values are utf-8, hence readable by some subset of humans. not bytes. no binary, go away…
x = sr_attr( path )   <- read metadata from file.
x.list()              <- list all extant extended attributes.
    sample return value: [ ‘sum’, ‘mtime’ ]
x.get(‘sum’)          <- look at one value. Returns None if missing.
x.set(‘sum’, ‘hoho’)  <- set one value. Fails silently (falls back gracefully.)
x.persist()           <- write metadata back to file, if necessary.
- __weakref__
list of weak references to the object (if defined)
sarracenia.flow
- class sarracenia.flow.Flow(cfg=None)[source]
Bases:
object
Implement the General Algorithm from the Concepts Guide.
All of the component types (e.g. poll, subscribe, sarra, winnow, shovel ) are implemented as sub-classes of Flow. The constructor/factory accept a configuration (sarracenia.config.Config class) with all the settings in it.
This class takes care of starting up, running with callbacks, and clean shutdown.
need to know whether to sleep between passes:
o.sleep - an interval (floating point number of seconds)
o.housekeeping -
A flow processes worklists of messages
worklist given to callbacks…
worklist.incoming –> new messages to continue processing
worklist.ok –> successfully processed
worklist.rejected –> messages to not be further processed.
worklist.failed –> messages for which processing failed.
worklist.directories_ok –> directories created.
Initially all messages are placed in incoming. if a callback decides:
a message is not relevant, it is moved to rejected.
all processing has been done, it moves it to ok.
an operation failed and it should be retried later, move to retry
callbacks must not remove messages from worklists, only re-classify them. It is necessary to put rejected messages in the appropriate worklist so that they can be acknowledged as received.
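The re-classification rule can be sketched with an after_accept-style callback that rejects some messages and leaves the rest in incoming (standalone illustration; the real worklist object and message fields are richer, and the size-based test here is made up):

```python
from types import SimpleNamespace

def after_accept(worklist):
    """Move irrelevant messages to rejected; never delete them outright."""
    still_incoming = []
    for msg in worklist.incoming:
        if msg.get('size', 0) == 0:
            # not relevant: re-classify so it can still be acknowledged
            worklist.rejected.append(msg)
        else:
            still_incoming.append(msg)   # leave for further processing
    worklist.incoming = still_incoming

# a stand-in for the worklist structure described above
worklist = SimpleNamespace(incoming=[{'size': 0}, {'size': 10}],
                           ok=[], rejected=[], failed=[])
after_accept(worklist)
```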
interesting data structure: self.plugins – dict of modular functionality metadata.
self.plugins[ “load” ] contains a list of (v3) flow_callbacks to load.
self.plugins[ entry_point ] - one for each invocation time of callbacks; examples: “on_start”, “after_accept”, etc. Contains the routines to run at each entry_point.
- __weakref__
list of weak references to the object (if defined)
- _runCallbackMetrics()[source]
Collect metrics from plugins with a metricsReport entry point. Expects the plugin to return a dictionary containing metrics, which is saved to self.metrics[plugin_name].
- _runHousekeeping(now) float [source]
Run housekeeping callbacks. Return the time when housekeeping should next be run.
- do_download() None [source]
- do download work for self.worklist.incoming, placing files:
successfully downloaded in worklist.ok
temporary failures in worklist.failed
permanent failures (or files not to be downloaded) in worklist.rejected
- download(msg, options) int [source]
download/transfer one file based on a message; returns an int:
return 0 – failed, retry later.
return 1 – OK, download successful.
return -1 – download failed permanently, retry not useful.
- file_should_be_downloaded(msg) bool [source]
determine whether a comparison of local_file and message metadata indicates that it is new enough that writing the file locally is warranted.
return True to say downloading is warranted.
False if the file in the message represents the same or an older version than what is currently on disk.
origin: refactor & translation of v2: content_should_not_be_downloaded
- Assumptions:
new_path exists… there is a file to compare against.
- has_vip() list [source]
return list of vips which are active on the current machine, or an empty list.
- link1file(msg, symbolic=True) bool [source]
perform a link of a single file based on a message, returning boolean success. If symbolic, make a symlink; else make a hard link.
imported from v2/subscribe/doit_download “link event, try to link the local product given by message”
- renameOneItem(old, path) bool [source]
for messages with a rename file operation: rename the file.
- run()[source]
This is the core routine of the algorithm, with the most important data-driven loop in it. It implements the General Algorithm (as described in the Concepts Explanation Guide), checking stop_requested once in a while, but never returning otherwise.
- send(msg, options) int [source]
send/transfer one file based on a message; return int:
return 0 – failed temporarily, retry later.
return >0 – OK, send successful.
return <0 – send failed permanently, retry not useful.
- set_local_file_attributes(local_file, msg)[source]
after a file has been written, restore permissions and ownership if necessary.
- stop_request() None [source]
called by the signal handler to tell self and FlowCB classes to stop. Without this, calling runCallbacksTime(‘please_stop’) from inside self.please_stop causes an infinite loop. Note: if this is called from outside of a signal handler, the interruptible_sleep function won’t work.
- sundew_getDestInfos(msg, currentFileOption, filename)[source]
modified from sundew client
WHATFN – first part (‘:’) of filename
HEADFN – use first 2 fields of filename
NONE – use the entire filename
TIME or TIME: – TIME stamp appended
DESTFN=fname – change the filename to fname
ex: mask[2] = ‘NONE:TIME’
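The first three options can be sketched as follows, assuming the fields of a sundew filename are separated by ':' (illustration only; TIME and DESTFN handling are omitted, and the function name is ours):

```python
def apply_filename_option(option: str, filename: str) -> str:
    """Derive a destination filename per the WHATFN/HEADFN/NONE options."""
    parts = filename.split(':')
    if option == 'WHATFN':
        return parts[0]                # first ':'-separated field only
    if option == 'HEADFN':
        return ':'.join(parts[:2])     # first 2 fields of the filename
    return filename                    # NONE: use the entire filename
```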
- updateFieldsAccepted(msg, urlstr, pattern, maskDir, maskFileOption, mirror, path_strip_count, pstrip, flatten) bool [source]
Set new message fields according to values when the message is accepted.
urlstr: the urlstr being matched (baseUrl+relPath+sundew_extension)
pattern: the regex that was matched.
maskDir: the current directory to base the relPath from.
maskFileOption: filename option value (sundew compatibility options.)
strip: number of path entries to strip from the left side of the path.
pstrip: pattern strip regexp to apply instead of a count.
flatten: a character to replace path separators with, to change a multi-directory-deep file name into a single long file name
return True on success
sarracenia.flow.poll
- class sarracenia.flow.poll.Poll(options)[source]
Bases:
Flow
repeatedly query a remote (non-sarracenia) server to list the files there. Post messages (to post_broker) for every new file discovered.
the sarracenia.flowcb.poll class is used to implement the remote querying, and is highly customizable to that effect.
if the vip option is set: * subscribe to the same settings that are being posted to; * consume all the messages posted, keeping the new-file duplicate cache updated.
sarracenia.flow.post
sarracenia.flow.report
sarracenia.flow.sarra
sarracenia.flow.sender
sarracenia.flow.shovel
sarracenia.flow.subscribe
sarracenia.flow.watch
sarracenia.flow.winnow
sarracenia.instance
- class sarracenia.instance.RedirectedTimedRotatingFileHandler(filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None, errors=None)[source]
Bases:
TimedRotatingFileHandler
- doRollover()[source]
do a rollover; in this case, a date/time stamp is appended to the filename when the rollover happens. However, you want the file to be named for the start of the interval, not the current time. If there is a backup count, then we have to get a list of matching filenames, sort them and remove the one with the oldest suffix.
- class sarracenia.instance.instance[source]
Bases:
object
Process management for a single flow instance. start and stop instances.
this is the main entry point launched from the sr3 cli, with arguments for it to turn into a specific configuration.
- __weakref__
list of weak references to the object (if defined)
- start()[source]
Main element to run a single flow instance. It parses the command line arguments twice. The first pass initializes the log file and debug level, and selects the configuration file to parse. Once the log file is set, and output & error re-direction are in place, the second pass begins:
The configuration files are parsed, and then the options are parsed a second time to act as overrides to the configuration file content.
As all process management is handled by sr.py, the action here is not parsed, but is always either start (daemon) or foreground (interactive.)
sarracenia.identity
- class sarracenia.identity.Identity[source]
Bases:
object
A class for algorithms to get a fingerprint for a file being announced. Appropriate fingerprinting algorithms vary according to file type.
required methods in subclasses:
- def registered_as(self):
return a one-letter string identifying the algorithm (mostly for v2.) In v3, the registration comes from the identity sub-class name in lower case.
- def set_path(self,path):
start a checksum for the given path… initialize.
- def update(self,chunk):
update the checksum based on the given bytes from the file (sequential access assumed.)
- __weakref__
list of weak references to the object (if defined)
- update_file(path)[source]
read the entire file and checksum it. This is a kind of last resort, as it costs an extra file read. It is better to call update() as the file is being read for other reasons.
- property value
return the current value of the checksum calculation.
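The subclass contract listed above can be illustrated with a standalone class built on hashlib (this does not inherit from the real sarracenia.identity.Identity base class; the class name and the 's' tag are illustrative):

```python
import hashlib

class Sha512Like:
    """Sketch of the Identity subclass contract, using SHA-512."""

    def registered_as(self):
        return 's'                        # one-letter v2-style tag (made up)

    def set_path(self, path):
        # start a checksum for the given path: initialize state.
        self.filehash = hashlib.sha512()

    def update(self, chunk):
        # update with sequential chunks of file content.
        if isinstance(chunk, str):
            chunk = chunk.encode('utf-8')
        self.filehash.update(chunk)

    @property
    def value(self):
        # current value of the checksum calculation.
        return self.filehash.hexdigest()
```

In v3, the registration would come from the subclass name in lower case, so a real subclass's name matters.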
sarracenia.identity.arbitrary
- class sarracenia.identity.arbitrary.Arbitrary[source]
Bases:
Identity
For applications where there is no known way of determining equivalence, allow them to supply an arbitrary tag, that can be used to compare products for duplicate suppression purposes.
use setter to set the value… some sort of external checksum algorithm that cannot be reproduced.
- property value
return the current value of the checksum calculation.
sarracenia.identity.sha512
sarracenia.identity.md5
sarracenia.identity.random
sarracenia.moth
- class sarracenia.moth.Moth(props=None, is_subscriber=True)[source]
Bases:
object
Moth … Messages Organized by Topic Headers (en français: Messages organisés par thème hierarchique. )
A multi-protocol library for use by hierarchical message passing implementations, (messages which have a ‘topic’ header that is used for routing by brokers.)
regardless of protocol, the message format returned should be the same.
the message is turned into a sarracenia.Message object, which acts like a python dictionary, corresponding to key-value pairs in the message body, and properties.
topic is a special key that may end up in the message body, or some sort of property or metadata.
the protocol should support acknowledgement under user control. Such control is indicated by the presence of an entry point called “ack”. The entry point accepts “ack_id” as a message identifier to be passed to the broker. Whatever symbol the protocol uses for this is passed through this message property. Examples: in rabbitmq/amqp, ack takes a “delivery_tag” as an argument; in MQTT, it takes a “message-id”. So when receiving an AMQP message, m[‘ack_id’] is assigned the delivery_tag from the message.
There is a special dict item, “_DeleteOnPost”, identifying keys which are added only for local use. They will be removed from the message when publishing. Examples: topic (sent outside the body), message-id (used for acknowledgements), new_basedir, ack_id, new_… (settings…)
Intent is to be specialized for topic based data distribution (MQTT style.) API to allow pass-through of protocol specific properties, but apply templates for genericity.
Target protocols (and corresponding libraries.): AMQP, MQTT, ?
Things to specify:
broker
topicPrefix
subTopic
queueName (for amqp, used as client-id for mqtt)
this library knows nothing about Sarracenia; the only code used from sarracenia is to interpret duration properties (from the root sarracenia/__init__.py) and the broker argument from sarracenia.config.credentials.
usage:
import sarracenia.moth
import sarracenia.config.credentials

props = sarracenia.moth.default_options
props['broker'] = sarracenia.config.credentials.Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')
props['expire'] = 300
props['batch'] = 1
is_subscriber = True

c = Moth(props, is_subscriber)
messages = c.newMessages()   # if there are new messages from a publisher,
                             # return them, otherwise return an empty list [].

p = Moth({'batch': 1}, False)
p.putNewMessage()
p.close()   # tear down connection.
Initialize a broker connection. Connections are unidirectional. either for subscribing (with subFactory) or publishing (with pubFactory.)
The factories return objects subclassed to match the protocol required by the broker argument.
arguments to the factories are:
broker … the url of the broker to connect to.
props is a dictionary or properties/parameters.
supplied as overrides to the default properties listed above.
Some may vary among protocols:
Protocol    library implementing    URL to select
--------    --------------------    -------------
AMQPv0.9    amqplib from Celery     amqp, amqps
AMQPv0.9    pika                    pika, pikas
MQTTv3      paho                    mqtt, mqtts
AMQPv1.0    qpid-proton             amq1, amq1s
messaging_strategy
how to manage the connection: whether to treat the connection as new or assume it is already set up, and what should be done if something goes wrong.
reset: on startup… erase any state, and re-initialize.
stubborn: if set to True, loop forever if something bad happens. Never give up. This setting is desired in operations, especially unattended. If set to False, the library may give up more easily.
failure_duration advises the library how to structure the connection service level.
5m - make a connection that will recover from transient errors of a few minutes, but not tax the broker too much for prolonged outages.
5d - strive to keep the connection recoverable across outages of up to five days.
Changing the recovery_strategy setting might result in having to destroy and re-create consumer queues (AMQP.)
Options
both
‘topicPrefix’ : [ ‘v03’ ]
‘messageDebugDump’: False, –> enable printing of raw messages.
‘inline’: False, - Are we inlining content within messages?
‘inlineEncoding’: ‘guess’, - what encoding should we use for inlined content?
‘inlineByteMax’: 4096, - Maximum size of messages to inline.
for get
‘batch’ : 100 # how many messages to get at once
‘broker’ : an sr_broker ?
‘queueName’ : Mandatory, name of a queue. (only in AMQP… hmm…)
‘bindings’ : [ list of bindings ]
‘loop’
optional:
‘message_ttl’
for put:
‘exchange’ (only in AMQP… hmm…)
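Putting the options above together, a subscriber-side properties dictionary might look like the following sketch. All values, the queue name, and the shape of the bindings entries are illustrative assumptions, not defaults taken from sr3:

```python
# illustrative props dict for a consuming (get) Moth instance; every
# value below is an example, and the bindings tuple shape is an assumption.
props = {
    # options for both get and put:
    'topicPrefix': ['v03'],
    'messageDebugDump': False,   # enable printing of raw messages.
    'inline': False,             # inline content within messages?
    'inlineEncoding': 'guess',
    'inlineByteMax': 4096,
    # options for get:
    'batch': 100,                        # how many messages to get at once.
    'queueName': 'q_anonymous_example',  # mandatory for AMQP.
    'bindings': [('xpublic', ['v03'], ['#'])],  # assumed entry shape.
}
```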
- __init__(props=None, is_subscriber=True) None [source]
If is_subscriber=True, then this is a consuming instance. expect calls to get* routines.
if is_subscriber=False, then expect/permit only calls to put*
- __weakref__
list of weak references to the object (if defined)
- ack(message: Message) bool [source]
tell broker that a given message has been received.
ack uses the ‘ack_id’ property to send an acknowledgement back to the broker. If there is no ‘ack_id’ in the message, implementations should return True.
- cleanup() None [source]
get rid of server-side resources associated with a client. (queues/id’s, etc…)
- property default_options: dict
get default properties to override, used by client for validation.
- static findAllSubclasses(cls) set [source]
Recursively finds all subclasses of a class. __subclasses__() only gives direct subclasses.
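The recursion can be sketched as follows (a standalone illustration with assumed names, not the sr3 method itself):

```python
def find_all_subclasses(cls) -> set:
    """Recursively collect subclasses; __subclasses__() is only one level deep."""
    found = set(cls.__subclasses__())
    for sub in cls.__subclasses__():
        found |= find_all_subclasses(sub)
    return found

# example hierarchy: GrandChild is invisible to Base.__subclasses__() alone.
class Base: pass
class Child(Base): pass
class GrandChild(Child): pass
```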
- getNewMessage() Message [source]
If there is one new message available, return it. Otherwise return None. Do not block.
- side effects:
metrics: self.metrics[‘RxByteCount’] should be incremented by the size of the payload; self.metrics[‘RxGoodCount’] should be incremented by 1 when a good message is received; self.metrics[‘RxBadCount’] should be incremented by 1 when an invalid message is received (and discarded.)
- newMessages() list [source]
If there are new messages available from the broker, return them; otherwise return an empty list.
On success, this routine returns immediately (non-blocking) with either an empty list, or a list of messages.
On failure, this routine blocks, and loops reconnecting to broker, until interaction with broker is successful.
- please_stop() None [source]
register a request to cleanly stop. Any long running processes should check for _stop_requested and stop if it becomes True.
- putNewMessage(message: Message, content_type: str = 'application/json', exchange: str | None = None) bool [source]
publish a message as set up to the given topic.
returns True if it succeeded, False otherwise.
- side effect
self.metrics[‘TxByteCount’] should be incremented by the size of the payload. self.metrics[‘TxGoodCount’] should be incremented by 1 when a message is successfully published. self.metrics[‘TxBadCount’] should be incremented by 1 when an attempt to publish fails.
- setEbo(start) None [source]
Calculate the next retry time using exponential backoff. Note that it does not look like classic EBO, because the interval is multiplied by how long the attempt took to fail. Long failures should not be retried quickly, but short failures can vary in duration. If the timing of failures is variable, the “attempt_duration” will be low, so next_try might shrink even though nothing has succeeded yet… it should eventually settle down to a long period, though.
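The idea can be sketched like this (the names and clamping constants are assumptions for illustration; this is not the sr3 implementation):

```python
import time

class BackoffSketch:
    """Retry interval scaled by how long the failed attempt lasted."""

    def __init__(self, max_interval=600.0):
        self.next_try = 1.0            # seconds until the next retry.
        self.max_interval = max_interval

    def set_ebo(self, start):
        # how long the attempt took before failing.
        attempt_duration = time.time() - start
        # scale by the attempt duration instead of a fixed factor: quick
        # failures grow the interval slowly, long failures grow it fast.
        self.next_try = min(self.max_interval,
                            max(1.0, self.next_try * 2 * max(attempt_duration, 0.1)))
```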
sarracenia.moth.amqp
- class sarracenia.moth.amqp.AMQP(props, is_subscriber)[source]
Bases:
Moth
implementation of the Moth API for the amqp library, which is built to talk to rabbitmq brokers in 0.8 and 0.9 AMQP dialects.
to allow acknowledgements, we map AMQP’s ‘delivery_tag’ to the ‘ack_id’.
additional AMQP specific options:
exchangeDeclare - declare exchanges before use.
queueBind - bind queue to exchange before use.
queueDeclare - declare queue before use.
- __connect(broker) bool
connect to broker. Returns True if connected, False otherwise. * side effect: self.channel is set to a new channel.
Expect caller to handle errors.
- __init__(props, is_subscriber) None [source]
connect to the broker and, depending on message_strategy stubbornness, remain connected.
- ack(m: Message) None [source]
do what you need to acknowledge that processing of a message is done. NOTE: AMQP delivery tags (we call them ack_id) are scoped per channel. “Deliveries must be acknowledged on the same channel they were received on. Acknowledging on a different channel will result in an “unknown delivery tag” protocol exception and close the channel.”
- getNewMessage() Message [source]
If there is one new message available, return it. Otherwise return None. Do not block.
- side effects:
metrics: self.metrics[‘RxByteCount’] should be incremented by the size of the payload; self.metrics[‘RxGoodCount’] should be incremented by 1 when a good message is received; self.metrics[‘RxBadCount’] should be incremented by 1 when an invalid message is received (and discarded.)
- getSetup() None [source]
Setup so we can get messages.
- if message_strategy is stubborn, will loop here forever.
connect, declare queue, apply bindings.
- newMessages() list [source]
If there are new messages available from the broker, return them; otherwise return an empty list.
On success, this routine returns immediately (non-blocking) with either an empty list, or a list of messages.
On failure, this routine blocks, and loops reconnecting to broker, until interaction with broker is successful.
- putNewMessage(message: Message, content_type: str = 'application/json', exchange: str | None = None) bool [source]
put a new message out, to the configured exchange by default.
- setEbo(start) None [source]
Calculate the next retry time using exponential backoff. Note that it does not look like classic EBO, because the interval is multiplied by how long the attempt took to fail. Long failures should not be retried quickly, but short failures can vary in duration. If the timing of failures is variable, the “attempt_duration” will be low, so next_try might shrink even though nothing has succeeded yet… it should eventually settle down to a long period, though.
- sarracenia.moth.amqp.logger = <Logger sarracenia.moth.amqp (WARNING)>
amqp_ss_maxlen
the maximum length of a “short string”, as per AMQP protocol, in bytes.
sarracenia.moth.pika
sarracenia.moth.mqtt
sarracenia.moth.amq1
sarracenia.rabbitmq_admin
rabbitmq administration bindings, to allow sr to invoke broker management functions.
- sarracenia.rabbitmq_admin.add_user(url, role, user, passwd, simulate)[source]
add the given user with the given credentials.
- sarracenia.rabbitmq_admin.broker_get_exchanges(url, ssl_key_file=None, ssl_cert_file=None)[source]
get the list of existing exchanges using a url query.
- sarracenia.rabbitmq_admin.exec_rabbitmqadmin(url, options, simulate=False)[source]
invoke rabbitmqadmin using a sub-process, with the given options.
- sarracenia.rabbitmq_admin.run_rabbitmqadmin(url, options, simulate=False)[source]
spawn a subprocess to run rabbitmqadmin with the given options. capture result.
- sarracenia.rabbitmq_admin.user_access(url, user)[source]
Given an administrative URL, return a list of exchanges and queues the user can access.
lox = list of exchanges (just a list of names.) loq = list of queues, where the value for each is the number of messages ready.
return value:
{ 'exchanges': { 'configure': lox, 'write': lox, 'read': lox },
  'queues':    { 'configure': loq, 'write': loq, 'read': loq },
  'bindings':  { <queue>: { 'exchange': <exchange>, 'key': <routing_key> } }
}
sarracenia.transfer
- exception sarracenia.transfer.TimeoutException[source]
Bases:
Exception
timeout exception
- __weakref__
list of weak references to the object (if defined)
- class sarracenia.transfer.Transfer(proto, options)[source]
Bases:
object
This is a sort of abstract base class for implementing transfer protocols. Implemented subclasses include support for: local files, https, sftp, and ftp.
This class has routines that do i/o given descriptors opened by the sub-classes, so that each one does not need to re-implement copying, for example.
Each subclass needs to implement the following routines:
if downloading:
get( msg, remote_file, local_file, remote_offset=0, local_offset=0, length=0 )
getAccelerated( msg, remote_file, local_file, length )
ls()
cd( dir )
delete( path )
if sending:
put( msg, remote_file, local_file, remote_offset=0, local_offset=0, length=0 )
putAccelerated( msg, remote_file, local_file, length=0 )
cd( dir )
mkdir( dir )
umask()
chmod( perm )
rename( old, new )
Note that the ls() call returns are polymorphic. One of:
- a dictionary where the key is the name of a file in the directory, and the value is an SFTPAttributes structure for it (from paramiko.) (sftp.py is an example.)
- a dictionary where the key is the name of the file, and the value is a string that looks like the output of a linux ls command. (ftp.py is an example.)
- a sequence of bytes… to be parsed as an html page. (https.py is an example.)
The first format is the vastly preferred one. The others are fallbacks when the first is not available. The flowcb/poll/__init__.py lsdir() routine tries to transform any of these return values into the first form (a dictionary of SFTPAttributes.) Each SFTPAttributes structure needs st_mode set, and folders need stat.S_IFDIR set.
if the lsdir() routine gets a sequence of bytes, the on_html_page() and on_html_parser_init() (or perhaps handle_starttag(..) and handle_data()) routines will be used to turn them into the first form.
web services with different formats can be accommodated by subclassing and overriding the handle_* entry points.
uses options (on the sarracenia.config data structure passed to the constructor/factory):
- credentials - authentication information to use.
- sendTo - server to connect to.
- batch - how many files to transfer before a connection is torn down and re-established.
- permDefault - permissions to set on files transferred.
- permDirDefault - permissions to set on directories created.
- timeout - how long to wait for operations to complete.
- byteRateMax - maximum transfer rate (throttle to avoid exceeding it.)
- bufSize - size of buffers for file transfers.
- __weakref__
list of weak references to the object (if defined)
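A minimal downloading-side subclass might look like the following sketch over the local filesystem. The class and its behaviour are illustrative assumptions; the real subclasses live in sarracenia/transfer/*.py:

```python
import os

class LocalSketch:
    """
    Hypothetical minimal downloading-side Transfer subclass, backed by
    the local filesystem. Illustrative only; not an sr3 protocol module.
    """

    def __init__(self, proto, options):
        self.o = options
        self.cwd = '/'

    def cd(self, path):
        self.cwd = path

    def ls(self):
        # preferred polymorphic form: a dict of name -> attributes
        # (os.stat_result standing in for paramiko SFTPAttributes.)
        return {f: os.stat(os.path.join(self.cwd, f))
                for f in os.listdir(self.cwd)}

    def delete(self, path):
        os.unlink(os.path.join(self.cwd, path))

    def get(self, msg, remote_file, local_file,
            remote_offset=0, local_offset=0, length=0):
        # copy a byte range (or the whole file) to the local destination.
        with open(os.path.join(self.cwd, remote_file), 'rb') as src:
            src.seek(remote_offset)
            data = src.read(length) if length else src.read()
        mode = 'r+b' if os.path.exists(local_file) else 'wb'
        with open(local_file, mode) as dst:
            dst.seek(local_offset)
            dst.write(data)
        return len(data)
```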
sarracenia.transfer.file
sarracenia.transfer.https
- class sarracenia.transfer.https.HTTPRedirectHandlerSameMethod[source]
Bases:
HTTPRedirectHandler
Instead of returning a new Request without a method (defaults to GET), use the same method in the new Request. https://docs.python.org/3/library/urllib.request.html#urllib.request.HTTPRedirectHandler.redirect_request https://developer.mozilla.org/en-US/docs/Web/HTTP/Redirections note [2]
- redirect_request(req, fp, code, msg, headers, newurl)[source]
Return a Request or None in response to a redirect.
This is called by the http_error_30x methods when a redirection response is received. If a redirection should take place, return a new Request to allow http_error_30x to perform the redirect. Otherwise, raise HTTPError if no-one else should try to handle this url. Return None if you can’t but another Handler might.
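The override described above follows a pattern like this sketch (the class name here is assumed; this mirrors the behaviour documented for HTTPRedirectHandlerSameMethod rather than reproducing its source):

```python
import urllib.request

class SameMethodRedirectHandler(urllib.request.HTTPRedirectHandler):
    """
    The default redirect_request builds the new Request without a method,
    which falls back to GET; carry the original method over instead.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        new_req = super().redirect_request(req, fp, code, msg, headers, newurl)
        if new_req is not None:
            # preserve the original verb (e.g. HEAD) across the redirect.
            new_req.method = req.get_method()
        return new_req
```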
- class sarracenia.transfer.https.Https(proto, options)[source]
Bases:
Transfer
HyperText Transfer Protocol (HTTP) ( https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol ) sarracenia transfer protocol subclass supports/uses additional custom options:
accelWgetCommand (default: ‘/usr/bin/wget %s -o - -O %d’ )
- built with:
urllib.request ( https://docs.python.org/3/library/urllib.request.html )
- __open__(path, remote_offset=0, length=0, method: str | None = None, add_headers: dict | None = None) bool [source]
Open a URL. When the open is successful, self.http is set to a urllib.response instance that can be read from like a file.
Returns True when successfully opened, False if there was a problem.
- __url_redir_str()
Return self.urlstr, unless the request was redirected to a different URL, in which case it returns ‘self.urlstr redirected to new_url’.
sarracenia.transfer.ftp
- class sarracenia.transfer.ftp.Ftp(proto, options)[source]
Bases:
Transfer
File Transfer Protocol (FTP) ( https://datatracker.ietf.org/doc/html/rfc959 ) sarracenia transfer protocol subclass supports/uses additional custom options:
accelFtpputCommand (default: ‘/usr/bin/ncftpput %s %d’ )
accelFtpgetCommand (default: ‘/usr/bin/ncftpget %s %d’ )
built using: ftplib ( https://docs.python.org/3/library/ftplib.html )
- class sarracenia.transfer.ftp.IMPLICIT_FTP_TLS(*args, **kwargs)[source]
Bases:
FTP_TLS
FTP_TLS subclass that automatically wraps sockets in SSL to support implicit FTPS. Copied from https://stackoverflow.com/questions/12164470/python-ftp-implicit-tls-connection-issue
- __init__(*args, **kwargs)[source]
Initialization method (called by class instantiation). Initialize host to localhost, port to standard ftp port. Optional arguments are host (for connect()), and user, passwd, acct (for login()).
- property sock
Return the socket.
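The commonly published version of this pattern (from the Stack Overflow answer cited above, sketched here rather than copied from sr3) intercepts the sock attribute with a property, so that any socket ftplib assigns is wrapped in TLS immediately, which is what implicit FTPS requires:

```python
import ftplib
import ssl

class ImplicitFTPTLS(ftplib.FTP_TLS):
    """FTP_TLS subclass that wraps sockets in SSL as soon as they are set."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._sock = None

    @property
    def sock(self):
        return self._sock

    @sock.setter
    def sock(self, value):
        # wrap plain sockets in TLS immediately (implicit FTPS), instead
        # of waiting for an AUTH TLS command as explicit FTPS does.
        if value is not None and not isinstance(value, ssl.SSLSocket):
            value = self.context.wrap_socket(value)
        self._sock = value
```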
sarracenia.transfer.sftp
- class sarracenia.transfer.sftp.Sftp(proto, options)[source]
Bases:
Transfer
SecSH File Transfer Protocol (SFTP) ( https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt ) Sarracenia transfer protocol subclass supports/uses additional custom options:
accelScpCommand (default: ‘/usr/bin/scp %s %d’ )
The module uses the paramiko library for python SecSH support ( https://www.paramiko.org/ )