API Documentation

Config

Sarracenia

class sarracenia.Message[source]

A notification message in Sarracenia is stored as a python dictionary, with a few extra management functions.

The internal representation is very close to the v03 format defined here: https://metpx.github.io/sarracenia/Reference/sr_post.7.html

Unfortunately, because Message is a subclass of dict, copying it from a plain dict loses the type, hence the need for the copyDict member.
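The type loss described above can be illustrated with a small sketch. SketchMessage is a hypothetical stand-in, not the real sarracenia.Message, and the body of its copyDict is only an assumption about the simplest behaviour that would keep the type.

```python
class SketchMessage(dict):
    """Hypothetical stand-in for a dict subclass such as sarracenia.Message."""

    def copyDict(self, d):
        # Copy every key of a plain dict into this object; our type is kept.
        for key, value in d.items():
            self[key] = value


plain = {"baseUrl": "https://example.com", "relPath": "a/b.txt"}

# dict.copy() always returns a plain dict, so the subclass type is lost.
lost = SketchMessage(plain).copy()

# Filling an existing instance keeps the subclass type.
kept = SketchMessage()
kept.copyDict(plain)
```

Here lost is a plain dict with the same keys, while kept is still a SketchMessage and would keep any extra methods.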

__init__()[source]
__weakref__

list of weak references to the object (if defined)

computeIdentity(path, o, offset=0)[source]

check extended attributes for a cached identity sum calculation. if extended attributes are present, and

  • the file mtime is not too new, and

  • the cached sum is using the same method

then use the cached value.

otherwise, calculate a checksum. set the file’s extended attributes for the new value. the method of checksum calculation is from options.identity.

sets the message ‘identity’ field if appropriate.
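The cache-or-compute decision described above might look roughly like the sketch below. It is not the library's code: a plain dict named cache stands in for the file's extended attributes, min_age is a made-up threshold for "not too new", and sketch_identity is a hypothetical name.

```python
import hashlib
import os
import tempfile
import time


def sketch_identity(path, method, cache, min_age=1.0):
    """Return (checksum, used_cache); `cache` is a dict standing in for xattrs."""
    entry = cache.get(path)
    old_enough = (time.time() - os.stat(path).st_mtime) >= min_age
    if entry and old_enough and entry["method"] == method:
        return entry["value"], True            # cached value is still usable
    digest = hashlib.new(method)               # otherwise recompute from the file
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    cache[path] = {"method": method, "value": digest.hexdigest()}
    return cache[path]["value"], False


# Usage: the second call with the same method is served from the cache.
fd, sample_path = tempfile.mkstemp()
os.write(fd, b"hello")
os.close(fd)
cache = {}
first = sketch_identity(sample_path, "sha512", cache, min_age=-1.0)
second = sketch_identity(sample_path, "sha512", cache, min_age=-1.0)
other = sketch_identity(sample_path, "sha256", cache, min_age=-1.0)
os.unlink(sample_path)
```

The negative min_age in the usage lines only disables the age check so that a freshly created file can demonstrate the cache hit.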

copyDict(d)[source]

copy dictionary into message.

dumps() str[source]

FIXME: used to be msg_dumps. Print a message in a relatively compact way. msg is a python dictionary. If there is a field longer than maximum_field_length, truncate it.

static fromFileData(path, o, lstat=None)[source]

create a message based on a given file, calculating the checksum. returns a well-formed message, or None.

static fromFileInfo(path, o, lstat=None)[source]

based on the given information about the file (its name and a stat record if available) and a configuration options object (sarracenia.config.Config), return a sarracenia.Message suitable for placement on a worklist.

A message is a specialized python dictionary with a certain set of fields in it. The message returned will have the necessary fields for processing and posting.

The message for a file is built from the given path, options (o), and lstat (output of os.stat).

The lstat record is used to build ‘atime’, ‘mtime’ and ‘mode’ fields if timeCopy and permCopy options are set.

if no lstat record is supplied, then those fields will not be set.

static fromStream(path, o, data=None)[source]

Create a file and message for the given path. The file will be created or overwritten with the provided data. then invoke fromFileData() for the resulting file.

getContent()[source]

Retrieve the data referred to by a message. The data may be embedded in the message, or this routine may resolve a link to an external server and download the data.

does not handle authentication. This routine is meant to be used with small files. using it to download large files may be very inefficient. Untested in that use-case.

Return value is the data.

setReport(code, text=None)[source]

FIXME: used to be msg_set_report. Set message fields to indicate the result of an action so reports can be generated.

set is supposed to indicate final message dispositions, so in the case of putting a message on worklist.failed… no report is generated, since it will be retried later. FIXME: should we publish an interim failure report?

updatePaths(options, new_dir=None, new_file=None)[source]

set the new_* fields in the message based on changed file placement. if new_* options are omitted, update the rest of the fields in the message based on their current values.

If you change file placement in a flow callback, for example, you would change new_dir and new_file in the message. This routine updates other fields in the message (e.g. relPath, baseUrl, topic) to match new_dir/new_file.

msg[‘post_baseUrl’] defaults to msg[‘baseUrl’]

validate()[source]

FIXME: used to be msg_validate. Return True if the message format seems ok; otherwise return False and log some reasons.

class sarracenia.Sarracenia[source]

Core utilities of Sarracenia. The main class here is sarracenia.Message. A sarracenia.Message is subclassed from a dict, so for most uses it works like the python built-in, but it also has a few major entry points and some factories:

Building a message from a file

m = sarracenia.Message.fromFileData( path, options, lstat )

builds a notification message from a given existing file, consulting options, a parsed in memory version of the configuration settings that are applicable

Options

see the sarracenia.config.Config class for functions to parse configuration files and create corresponding python option dictionaries. One can supply small dictionaries for example:

import socket

options['topicPrefix'] = [ 'v02', 'post' ]
options['bindings'] = [ ('xpublic', [ 'v02', 'post'] , [ '#' ] )]
options['queueName'] = 'q_anonymous_' + socket.getfqdn() + '_SomethingHelpfulToYou'

Above is an example of a minimal options dictionary taken from the tutorial example called moth_api_consumer.py.

If you don’t have a file

If you don’t have a local file, then build your notification message with:

m = sarracenia.Message.fromFileInfo( path, options, lstat )

where you can make up the lstat values to fill in some fields in the message. You can make a fake lstat structure to provide these values using the FmdStat class from sarracenia.filemetadata, which is either an alias for paramiko.SFTPAttributes ( https://docs.paramiko.org/en/latest/api/sftp.html#paramiko.sftp_attr.SFTPAttributes ) if paramiko is installed, or a simple emulation if not.

from sarracenia.filemetadata import FmdStat

lstat = FmdStat()
lstat.st_mtime= utcinteger second count in UTC (numeric version of a Sarracenia timestamp.)
lstat.st_atime=
lstat.st_mode=0o644
lstat.st_size= size_in_bytes

optional fields that may be of interest:

lstat.filename= "nameOfTheFile"
lstat.longname= 'lrwxrwxrwx 1 peter peter 20 Oct 11 20:28 nameOfTheFile'

that you can then provide as an lstat argument to the above fromFileInfo() call. However the notification message returned will lack an identity checksum field. once you get the file, you can add the Identity field with:

m.computeIdentity(path, o)

In terms of consuming notification messages, the fields in the dictionary provide metadata for the announced resource. The announced data could be embedded in the notification message itself, or available by a URL.

Messages are generally gathered from a source such as the Message Queueing Protocol wrapper class: moth… sarracenia.moth.

data = m.getContent()

will return the content of the announced resource as raw data.

__weakref__

list of weak references to the object (if defined)

class sarracenia.TimeConversions[source]

Time conversion routines.

  • os.stat, and time.time() return floating point

  • The floating point representation is a count of seconds since the beginning of the epoch.

  • beginning of epoch is platform dependent, and conversion to actual date is fraught (leap seconds, etc…)

  • Entire SR_* formats are text, no floats are sent over the protocol (avoids byte order issues, null byte / encoding issues, and enhances readability.)

  • str format: YYYYMMDDHHMMSS.msec. The goal of this representation is that a naive conversion to floats yields comparable numbers.

  • but the number that results is not useful for anything else, so need these special routines to get a proper epochal time.

  • also OK for year 2038 (rollover of time_t on 32 bits.)

  • string representation is forced to UTC timezone to avoid having to communicate timezone.

timestr2flt() - accepts a string and returns a float.

caveat

FIXME: this encoding will break in the year 10000 (assumes four digit year) and requires leading zeroes prior to 1000. One will have to add detection of the decimal point, and change the offsets at that point.

__weakref__

list of weak references to the object (if defined)

sarracenia.durationToSeconds(str_value, default=None) float[source]

this function converts a duration to seconds. str_value should be a number followed by a unit [s,m,h,d,w], e.g. 1w, 4d, 12h. Returns 0.0 for an invalid string.
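As a rough illustration of the conversion described above, here is a minimal sketch. It is not the library's implementation: sketch_duration_to_seconds and UNIT_SECONDS are hypothetical names, the default argument is left out, and treating a bare number as seconds is an assumption.

```python
# Seconds per unit letter, for the units listed in the description above.
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}


def sketch_duration_to_seconds(str_value):
    """Convert strings such as '12h' to seconds; return 0.0 if invalid."""
    text = str(str_value).strip().lower()
    if not text:
        return 0.0
    factor = UNIT_SECONDS.get(text[-1])
    # Assumption: a bare number without a unit letter is taken as seconds.
    number = text[:-1] if factor else text
    try:
        return float(number) * (factor or 1)
    except ValueError:
        return 0.0


week = sketch_duration_to_seconds("1w")
half_day = sketch_duration_to_seconds("12h")
invalid = sketch_duration_to_seconds("bogus")
```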

sarracenia.durationToString(d) str[source]

given a number of seconds, return a short, human readable string.

sarracenia.stat(path) SFTPAttributes[source]

os.stat call replacement which improves on it by returning an SFTPAttributes structure, in place of the OS stat one, featuring:

  • mtime and ctime with subsecond accuracy

  • fields that can be overridden (not immutable.)
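The point about overridable fields can be seen in a small sketch. It uses types.SimpleNamespace as a stand-in for SFTPAttributes, and sketch_stat is a hypothetical name, so this shows the idea rather than what sarracenia.stat actually returns.

```python
import os
import tempfile
from types import SimpleNamespace


def sketch_stat(path):
    """Copy os.stat() results into a mutable object (stand-in for SFTPAttributes)."""
    native = os.stat(path)
    return SimpleNamespace(
        st_mode=native.st_mode,
        st_size=native.st_size,
        st_atime=native.st_atime,   # floats, so sub-second precision is kept
        st_mtime=native.st_mtime,
        st_ctime=native.st_ctime,
    )


with tempfile.NamedTemporaryFile() as tmp:
    info = sketch_stat(tmp.name)
    info.st_mode = 0o644            # allowed here; os.stat_result is read-only
```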

sarracenia.timeflt2str(f=None)[source]

timeflt2str - accepts a float and returns a string.

f is a floating point number such as returned by time.time() (number of seconds since beginning of epoch.)

the str is YYYYMMDDTHHMMSS.sssss

20210921T011331.0123

translates to: Sept. 21st, 2021 at 01:13 and 31.0123 seconds. always UTC timezone.
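The two conversions can be sketched with the standard library alone. The functions below are hypothetical stand-ins (note the sketch_ prefix), not the library's code, and the choice of four fractional digits is an assumption taken from the example timestamp above.

```python
import calendar
import time


def sketch_timeflt2str(f=None):
    """Float seconds since the epoch -> 'YYYYMMDDTHHMMSS.ssss', always UTC."""
    if f is None:
        f = time.time()
    ticks = int(round(f * 10000))        # count 1/10000 s so rounding carries cleanly
    whole, frac = divmod(ticks, 10000)
    return time.strftime("%Y%m%dT%H%M%S", time.gmtime(whole)) + ".%04d" % frac


def sketch_timestr2flt(s):
    """Timestamp string (with or without the 'T') -> float seconds since the epoch."""
    date_part, _, frac = s.replace("T", "").partition(".")
    seconds = calendar.timegm(time.strptime(date_part, "%Y%m%d%H%M%S"))
    return seconds + (float("0." + frac) if frac else 0.0)


stamp = "20210921T011331.0123"
round_trip = sketch_timeflt2str(sketch_timestr2flt(stamp))
```

calendar.timegm() is used rather than time.mktime() because the string is defined to be UTC, so the local timezone must not enter the conversion.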

Sarracenia.FlowCB

Callbacks from running Sarracenia.Flows

class sarracenia.flowcb.FlowCB(options, class_logger=None)[source]

Bases: object

Flow Callback is the main class for implementing plugin customization to flows.

sample activation in a configuration file:

flowCallback sarracenia.flowcb.name.Name

will instantiate an object of that type whose appropriately named methods will be called at the right time.

__init__ accepts options as an argument.

options is a sarracenia.config.Config object, used to override default behaviour

a setting is declared in a configuration file like so:

set sarracenia.flowcb.filter.log.Log.level debug

(the prefix for the setting matches the type hierarchy in flowCallback) the plugin should get the setting:

options.level = 'debug'

worklist given to on_plugins…

  • worklist.incoming –> new messages to continue processing

  • worklist.ok –> successfully processed

  • worklist.rejected –> messages to not be further processed.

  • worklist.failed –> messages for which processing failed. Failed messages will be retried.

  • worklist.directories_ok –> list of directories created during processing.

Initially, all messages are placed in incoming. if a plugin entry_point decides:

  • a message is not relevant, it is moved to the rejected worklist.

  • all processing has been done, it moves it to the ok worklist

  • an operation failed and it should be retried later, append it to the failed worklist

Do not remove a message from the worklists altogether; only move messages between them. It is necessary to put rejected messages in the appropriate worklist so they can be acknowledged as received. Messages can only be removed after ack.
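The "move, never drop" rule can be sketched without the library. Everything below is illustrative: the worklist is a types.SimpleNamespace stand-in with the list names described above, SketchSizeFilter is a hypothetical callback, and the sizeMax option and the size message field are made up for the example.

```python
from types import SimpleNamespace


class SketchSizeFilter:
    """Hypothetical callback that rejects messages announcing large files."""

    def __init__(self, options):
        self.o = options

    def after_accept(self, worklist):
        still_incoming = []
        for msg in worklist.incoming:
            if msg.get("size", 0) > self.o.sizeMax:    # sizeMax: made-up option
                worklist.rejected.append(msg)          # moved, not deleted
            else:
                still_incoming.append(msg)
        worklist.incoming = still_incoming


worklist = SimpleNamespace(
    incoming=[{"relPath": "small.txt", "size": 10},
              {"relPath": "big.bin", "size": 10_000}],
    ok=[], rejected=[], failed=[], directories_ok=[],
)
SketchSizeFilter(SimpleNamespace(sizeMax=1000)).after_accept(worklist)
```

After the call every message is still on exactly one list, so the rejected one can be acknowledged later.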

def __init__(self,options) -> None:

Task: initialization of the flowCallback at instantiation time.

usually contains:

self.o = options

def ack(self,messagelist) -> None:

Task: acknowledge messages from a gather source.

def gather(self, messageCountMax) -> (gather_more, messages)

Task: gather messages from a source... return a tuple:

      * gather_more ... bool whether to continue gathering
      * messages ... list of messages

      or just return a list of messages.

      In a poll, gather is always called, regardless of vip possession.

      In all other components, gather is only called when in possession
      of the vip.

return (True, list)
 OR
return list

def after_accept(self,worklist) -> None:

Task: just after messages go through accept/reject masks,
      operate on worklist.incoming to help decide which messages to process further.
      and move messages to worklist.rejected to prevent further processing.
      do not delete any messages, only move between worklists.

def after_work(self,worklist) -> None:

Task: operate on worklist.ok (files which have arrived.)

All messages on the worklist.ok list have been acknowledged, so to suppress posting
of them, or further processing, the messages must be removed from worklist.ok.

worklist.failed processing should occur in here as it will be zeroed out after this step.
The flowcb/retry.py plugin, for example, processes failed messages.

def destfn(self,msg) -> str:

Task: look at the fields in the message, and perhaps settings and
      return a new file name for the target of the send or download.

kind of a last resort function, exists mostly for sundew compatibility.
can be used for selective renaming using accept clauses.

def download(self,msg) -> bool:

Task: looking at msg['new_dir'], msg['new_file'], msg['new_inflight_file']
      and the self.o options perform a download of a single file.
      return True on a successful transfer, False otherwise.

      if self.o.dry_run is set, simulate the output of a download without
      performing it.

This replaces built-in download functionality, providing an override.
for individual file transfers. ideally you set checksums as you download.

def metricsReport(self) -> dict:

Return a dictionary of metrics. Example: number of messages remaining in retry queues.

def on_cleanup(self) -> None:

allow plugins to perform additional work after broker resources are eliminated. local state files are still present when this runs.

def on_declare(self) -> None:

local state files are already present when this runs. allow plugins to perform additional work besides broker resource setup.

def on_housekeeping(self) -> None:

do periodic processing.

def on_start(self) -> None:

After the connection is established with the broker and things are instantiated, but
before any message transfer occurs.

def on_stop(self) -> None:

what it says on the tin... clean up processing when stopping.

def poll(self) -> list:

Task: gather messages from a destination... return a list of messages.
      works like a gather, but...

      When specified, poll replaces the built-in poll of the poll component.
      it runs only when the machine running the poll has the vip.
      in components other than poll, poll is never called.
return []

def post(self,worklist) -> None:

Task: operate on worklist.ok, and worklist.failed. modifies them appropriately.
      message acknowledgement has already occurred before they are called.

to indicate failure to process a message, append to worklist.failed.
worklist.failed processing should occur in here as it will be zeroed out after this step.

def send(self,msg) -> bool:

Task: looking at msg['new_dir'], msg['new_file'], and the self.o options perform a transfer
      of a single file.
      return True on a successful transfer, False otherwise.

      if self.o.dry_run is set, simulate the output of a send without
      performing it.

This replaces built-in send functionality for individual files.

def please_stop(self):

Pre-warn a flowcb that a stop has been requested, allowing processing to wrap up before the full stop happens.

__init__(options, class_logger=None)[source]
__weakref__

list of weak references to the object (if defined)

please_stop()[source]

flow callbacks should not time.sleep for long periods, but only nap and check between naps if a stop has been requested.
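One way to follow this advice is to replace a long sleep with short naps that check a flag. The sketch below is only an illustration: SketchNapper, its stop_requested attribute and the nap() helper are hypothetical names, not part of FlowCB.

```python
import time


class SketchNapper:
    """Hypothetical callback fragment showing an interruptible wait."""

    def __init__(self):
        self.stop_requested = False

    def please_stop(self):
        # Only record the request; the waiting loop notices it at the next nap.
        self.stop_requested = True

    def nap(self, total_seconds, step=0.1):
        """Wait up to total_seconds; return False early if a stop was requested."""
        deadline = time.monotonic() + total_seconds
        while not self.stop_requested:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return True
            time.sleep(min(step, remaining))
        return False


napper = SketchNapper()
finished = napper.nap(0.05)      # no stop requested, the wait runs to completion
napper.please_stop()
interrupted = napper.nap(60)     # returns at once instead of blocking a minute
```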

sarracenia.flowcb.load_library(factory_path, options)[source]

Loading the entry points for a python module. It searches the normal python module path using the importlib module.

the factory_path is a combined file specification with a dot separator with a special last entry being the name of the class within the file.

factory_path a.b.c.C

means import the module named a.b.c and instantiate an object of type C. In that class-C object, look for the known callback entry points.

Alternatively, if the last element of the path does not follow python convention by starting with a capital letter, the class name C is guessed from it.

Note that ~/.config/sr3/plugins will also be in the python library path, so modules placed there will be found, in addition to those in the package itself in the sarracenia/flowcb directory.

callback foo -> foo.Foo

sarracenia.flowcb.foo.Foo

callback foo.bar -> foo.bar.Bar

sarracenia.flowcb.foo.bar.Bar foo.bar sarracenia.flowcb.foo.bar
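The path handling described above can be approximated with importlib. This is a sketch under assumptions, not the library's loader: sketch_resolve and sketch_load are hypothetical names, the guess is taken to be "capitalize the last element" as in the foo -> foo.Foo example, and the search of the sarracenia.flowcb package and the plugins directory is left out.

```python
import importlib


def sketch_resolve(factory_path):
    """Split a dotted factory path into (module name, class name)."""
    parts = factory_path.split(".")
    if parts[-1][:1].isupper():
        # a.b.c.C: the module is a.b.c and the class is C
        return ".".join(parts[:-1]), parts[-1]
    # foo.bar: guess the class from the last element, giving foo.bar.Bar
    return factory_path, parts[-1].capitalize()


def sketch_load(factory_path, *args):
    """Import the module and instantiate the class found in it."""
    module_name, class_name = sketch_resolve(factory_path)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)(*args)


explicit = sketch_resolve("a.b.c.C")
guessed = sketch_resolve("foo.bar")
# A standard-library class is used here only so the example can run anywhere.
instance = sketch_load("collections.OrderedDict")
```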

Sarracenia.Moth

exception sarracenia.transfer.TimeoutException[source]

Bases: Exception

timeout exception

__weakref__

list of weak references to the object (if defined)

class sarracenia.transfer.Transfer(proto, options)[source]

Bases: object

This is a sort of abstract base class for implementing transfer protocols. Implemented subclasses include support for: local files, https, sftp, and ftp.

This class has routines that do i/o given descriptors opened by the sub-classes, so that each one does not need to re-implement copying, for example.

Each subclass needs to implement the following routines:

if downloading:

get    ( msg, remote_file, local_file, remote_offset=0, local_offset=0, length=0 )
getAccelerated( msg, remote_file, local_file, length )
ls     ()
cd     (dir)
delete (path)

if sending:

put    ( msg, remote_file, local_file, remote_offset=0, local_offset=0, length=0 )
putAccelerated ( msg, remote_file, local_file, length=0 )
cd     (dir)
mkdir  (dir)
umask  ()
chmod  (perm)
rename (old,new)

Note that the ls() call returns are polymorphic. One of:

  • a dictionary where the key is the name of the file in the directory, and the value is an SFTPAttributes structure for it (from paramiko.) (sftp.py as an example)

  • a dictionary where the key is the name of the file, and the value is a string that looks like the output of a linux ls command. (ftp.py as an example.)

  • a sequence of bytes… will be parsed as an html page. (https.py as an example)

The first format is the vastly preferred one. The others are fallbacks when the first is not available. The flowcb/poll/__init__.py lsdir() routine tries to transform any of these return values into the first form (a dictionary of SFTPAttributes). Each SFTPAttributes structure needs st_mode set, and folders need stat.S_IFDIR set.
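A listing in the preferred first form might be built as in the sketch below. types.SimpleNamespace stands in for SFTPAttributes so the example runs without paramiko; the file names and sizes are made up.

```python
import stat
from types import SimpleNamespace

# name -> attributes, the shape of the preferred ls() return value
listing = {
    "incoming": SimpleNamespace(st_mode=stat.S_IFDIR | 0o755, st_size=0, st_mtime=0),
    "data.csv": SimpleNamespace(st_mode=stat.S_IFREG | 0o644, st_size=1024, st_mtime=0),
}

# With st_mode set this way, folders can be told apart from regular files.
folders = [name for name, attrs in listing.items() if stat.S_ISDIR(attrs.st_mode)]
file_permissions = stat.S_IMODE(listing["data.csv"].st_mode)
```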

if the lsdir() routine gets a sequence of bytes, the on_html_page() and on_html_parser_init() routines (or perhaps handle_starttag(..) and handle_data()) will be used to turn them into the first form.

web services with different such formats can be accommodated by subclassing and overriding the handle_* entry points.

uses options (a sarracenia.config data structure passed to the constructor/factory):

  • credentials - authentication information to use.

  • sendTo - server to connect to.

  • batch - how many files to transfer before a connection is torn down and re-established.

  • permDefault - what permissions to set on files transferred.

  • permDirDefault - what permission to set on directories created.

  • timeout - how long to wait for operations to complete.

  • byteRateMax - maximum transfer rate (throttle to avoid exceeding)

  • bufsize - size of buffers for file transfers.

__init__(proto, options)[source]
__weakref__

list of weak references to the object (if defined)

on_data(chunk) bytes[source]

transform data as it is being read. Given a buffer, return the transformed buffer. Checksum calculation is based on pre transformation… likely need a post transformation value as well.

sarracenia.transfer.alarm_set(time)[source]

FIXME: replace with set itimer for > 1 second resolution… currently rounding to nearest second.

class sarracenia.identity.Identity[source]

Bases: object

A class for algorithms to get a fingerprint for a file being announced. Appropriate fingerprinting algorithms vary according to file type.

required methods in subclasses:

def registered_as(self):

return a one letter string identifying the algorithm (mostly for v2.) in v3, the registration comes from the identity sub-class name in lower case.

def set_path(self,path):

start a checksum for the given path… initialize.

def update(self,chunk):

update the checksum based on the given bytes from the file (sequential access assumed.)

__weakref__

list of weak references to the object (if defined)

update_file(path)[source]

read the entire file and checksum it. this is kind of a last resort as it costs an extra file read. It is better to call update() as the file is being read for other reasons.

property value

return the current value of the checksum calculation.
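The methods listed for this class fit together roughly as in the sketch below. SketchSha512 is a hypothetical class that does not inherit from sarracenia.identity.Identity, the one-letter code it returns is an assumption, and returning a hex digest from value is also an assumption rather than the library's encoding.

```python
import hashlib
import os
import tempfile


class SketchSha512:
    """Hypothetical checksum class following the method names above."""

    def registered_as(self):
        return "s"                        # assumed one-letter code

    def set_path(self, path):
        self.filehash = hashlib.sha512()  # start a new calculation

    def update(self, chunk):
        self.filehash.update(chunk)       # chunks must arrive in file order

    def update_file(self, path):
        # Last resort: costs a full extra read of the file.
        self.set_path(path)
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                self.update(chunk)

    @property
    def value(self):
        return self.filehash.hexdigest()


fd, sample_path = tempfile.mkstemp()
os.write(fd, b"hello world")
os.close(fd)

summer = SketchSha512()
summer.update_file(sample_path)
whole_file_value = summer.value

# Same result when fed in pieces, as during a transfer.
summer.set_path(sample_path)
summer.update(b"hello ")
summer.update(b"world")
streamed_value = summer.value
os.unlink(sample_path)
```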