FlowCallback Reference

Documentation of the Flow callback plugins included with sr3.

sarracenia.flowcb

class sarracenia.flowcb.FlowCB(options, class_logger=None)[source]

Bases: object

Flow Callback is the main class for implementing plugin customization to flows.

sample activation in a configuration file:

flowCallback sarracenia.flowcb.name.Name

will instantiate an object of that type whose appropriately named methods will be called at the right time.

__init__ accepts options as an argument.

options is a sarracenia.config.Config object, used to override default behaviour

a setting is declared in a configuration file like so:

set sarracenia.flowcb.filter.log.Log.level debug

(the prefix for the setting matches the type hierarchy in flowCallback) the plugin should get the setting:

options.level = 'debug'

worklist given to on_plugins…

  • worklist.incoming –> new messages to continue processing

  • worklist.ok –> successfully processed

  • worklist.rejected –> messages to not be further processed.

  • worklist.failed –> messages for which processing failed. Failed messages will be retried.

  • worklist.directories_ok –> list of directories created during processing.

Initially, all messages are placed in incoming. If a plugin entry_point decides that:

  • a message is not relevant, it moves it to the rejected worklist.

  • all processing has been done, it moves it to the ok worklist.

  • an operation failed and should be retried later, it appends it to the failed worklist.

Do not remove a message from all lists; only move messages between them. Rejected messages must be placed in the appropriate worklist so that they can be acknowledged as received. Messages can only be removed after they have been acknowledged.
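As a rough illustration of these rules, the sketch below moves messages between worklists in an after_accept-style routine. It is not sarracenia code: the worklist is a stand-in built with types.SimpleNamespace, messages are plain dictionaries, and the 'size' field and max_size threshold are hypothetical.

```python
from types import SimpleNamespace


def make_worklist(messages):
    """Stand-in for the worklist handed to plugins (not the real sarracenia type)."""
    return SimpleNamespace(incoming=list(messages), ok=[], rejected=[],
                           failed=[], directories_ok=[])


def after_accept(worklist, max_size=1000):
    """Keep small messages in incoming; move the others to rejected."""
    still_incoming = []
    for msg in worklist.incoming:
        if msg.get('size', 0) > max_size:   # hypothetical rejection criterion
            worklist.rejected.append(msg)   # moved to another list, never dropped
        else:
            still_incoming.append(msg)
    worklist.incoming = still_incoming


wl = make_worklist([{'relPath': 'a.txt', 'size': 10},
                    {'relPath': 'b.bin', 'size': 5000}])
after_accept(wl)
```

Every message that entered the routine is still on exactly one list afterwards, which is what allows rejected messages to be acknowledged later.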

def __init__(self,options) -> None:

Task: initialization of the flowCallback at instantiation time.

usually contains:

self.o = options

def ack(self,messagelist) -> None:

Task: acknowledge messages from a gather source.

def gather(self, messageCountMax) -> (gather_more, messages)

Task: gather messages from a source... return a tuple:

      * gather_more ... bool whether to continue gathering
      * messages ... list of messages

      or just return a list of messages.

      In a poll, gather is always called, regardless of vip possession.

      In all other components, gather is only called when in possession
      of the vip.

return (True, list)
 OR
return list
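Because gather may return either shape, a caller has to handle both. The helper below is a hypothetical sketch (normalize_gather is not part of sarracenia) of one way to do so. It assumes that a bare list means gathering from further sources may continue.

```python
def normalize_gather(result):
    """Return (gather_more, messages) for either documented return shape."""
    if isinstance(result, tuple):
        gather_more, messages = result
        return bool(gather_more), list(messages)
    # Assumption: a bare list is treated like (True, list).
    return True, list(result)


tuple_form = normalize_gather((False, ['m1', 'm2']))
list_form = normalize_gather(['m3'])
```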

def after_accept(self,worklist) -> None:

Task: just after messages go through accept/reject masks,
      operate on worklist.incoming to help decide which messages to process further,
      and move messages to worklist.rejected to prevent further processing.
      Do not delete any messages, only move them between worklists.

def after_work(self,worklist) -> None:

Task: operate on worklist.ok (files which have arrived.)

All messages on the worklist.ok list have been acknowledged, so to suppress posting
of them, or further processing, the messages must be removed from worklist.ok.

worklist.failed processing should occur in here as it will be zeroed out after this step.
The flowcb/retry.py plugin, for example, processes failed messages.

def destfn(self,msg) -> str:

Destination File Name (DESTFNSCRIPT)  routines.

Task: look at the fields in the message, and perhaps settings and
      return a new file name for the target of the send or download.

kind of a last resort function, exists mostly for sundew compatibility.
can be used for selective renaming using accept clauses.

def download(self,msg) -> bool:

Task: looking at msg['new_dir'], msg['new_file'], msg['new_inflight_file']
      and the self.o options perform a download of a single file.
      return True on a successful transfer, False otherwise.

      if self.o.dry_run is set, simulate the output of a download without
      performing it.

This replaces built-in download functionality, providing an override
for individual file transfers. Ideally you set checksums as you download.

def metricsReport(self) -> dict:

Return a dictionary of metrics. Example: number of messages remaining in retry queues.

def on_cleanup(self) -> None:

allow plugins to perform additional work after broker resources are eliminated. local state files are still present when this runs.

def on_declare(self) -> None:

Local state files are already present when this runs. Allows plugins to perform additional work besides broker resource setup.

def on_housekeeping(self) -> None:

do periodic processing.

def on_start(self) -> None:

After the connection is established with the broker and things are instantiated, but
before any message transfer occurs.

def on_stop(self) -> None:

what it says on the tin... clean up processing when stopping.

def poll(self) -> list:

Task: gather messages from a destination... return a list of messages.
      works like a gather, but...

      When specified, poll replaces the built-in poll of the poll component.
      it runs only when the machine running the poll has the vip.
      in components other than poll, poll is never called.
return []

def post(self,worklist) -> None:

Task: operate on worklist.ok, and worklist.failed. modifies them appropriately.
      message acknowledgement has already occurred before they are called.

to indicate failure to process a message, append to worklist.failed.
worklist.failed processing should occur in here as it will be zeroed out after this step.

def send(self,msg) -> bool:

Task: looking at msg['new_dir'], msg['new_file'], and the self.o options perform a transfer
      of a single file.
      return True on a successful transfer, False otherwise.

      if self.o.dry_run is set, simulate the output of a send without
      performing it.

This replaces built-in send functionality for individual files.

def please_stop(self):

Pre-warn a flowcb that a stop has been requested, allowing processing to wrap up before the full stop happens.

__init__(options, class_logger=None)[source]
__weakref__

list of weak references to the object (if defined)

please_stop()[source]

flow callbacks should not time.sleep for long periods, but only nap and check between naps if a stop has been requested.
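A minimal sketch of this napping pattern follows. The class is a stand-in and does not derive from the real FlowCB. The stop_requested attribute, the wait method and the nap length are assumptions chosen for illustration.

```python
import time


class NappingCallback:
    """Stand-in for a flow callback (not derived from the real FlowCB)."""

    def __init__(self):
        self.stop_requested = False  # assumed flag name

    def please_stop(self):
        self.stop_requested = True

    def wait(self, total_seconds, nap=0.01):
        """Wait up to total_seconds in short naps; return False if interrupted."""
        deadline = time.monotonic() + total_seconds
        while time.monotonic() < deadline:
            if self.stop_requested:
                return False
            remaining = max(0.0, deadline - time.monotonic())
            time.sleep(min(nap, remaining))
        return not self.stop_requested


stopped = NappingCallback()
stopped.please_stop()
interrupted_result = stopped.wait(5)      # returns at once, no 5 second sleep

running = NappingCallback()
completed_result = running.wait(0.02)     # naps until the deadline passes
```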

sarracenia.flowcb.load_library(factory_path, options)[source]

Loading the entry points for a python module. It searches the normal python module path using the importlib module.

the factory_path is a combined file specification with a dot separator with a special last entry being the name of the class within the file.

factory_path a.b.c.C

means import the module named a.b.c and instantiate an object of type C. In that class-C object, look for the known callback entry points.

Alternatively, if the last element of the path does not follow the python convention of class names starting with a capital letter, it is taken to be a module name, and the class name C is guessed from it.

Note that ~/.config/sr3/plugins will also be in the python library path, so modules placed there will be found, in addition to those in the package itself in the sarracenia/flowcb directory.

callback foo -> foo.Foo

sarracenia.flowcb.foo.Foo

callback foo.bar -> foo.bar.Bar

sarracenia.flowcb.foo.bar.Bar foo.bar sarracenia.flowcb.foo.bar
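The naming convention above can be sketched as a small helper. split_factory_path is hypothetical, not the function used by load_library. It assumes the guessed class name is simply the last module name with its first letter capitalized, as in the foo -> foo.Foo example.

```python
def split_factory_path(factory_path):
    """Return (module_path, class_name) for a dotted factory path."""
    parts = factory_path.split('.')
    last = parts[-1]
    if last[:1].isupper():
        # a.b.c.C: the final element names the class explicitly.
        return '.'.join(parts[:-1]), last
    # a.b.c: assumed guess, capitalize the last module name.
    return factory_path, last.capitalize()


explicit = split_factory_path('sarracenia.flowcb.name.Name')
guessed = split_factory_path('foo.bar')
```

The resulting pair could then be handed to importlib.import_module and getattr to obtain the class.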

sarracenia.flowcb.accept

sarracenia.flowcb.accept modules are ones where the main focus is on the after_accept entry point. This entry point is called after reject & accept rules have been called, and typically after duplicate suppression has also been applied.

They can be used to further refine which files should be downloaded, by moving messages from the worklist.incoming to worklist.rejected.

sarracenia.flowcb.accept.delete

Plugin delete.py:

the default after_accept handler for log. prints a simple notice. # FIXME is this doc accurate? seems it is also modifying a message.

Usage:

flowcb sarracenia.flowcb.accept.delete.Delete

class sarracenia.flowcb.accept.delete.Delete(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.download

Place holder customized flow callback classes whose focus is the download() entry point.

download(self,msg):

Task: looking at msg['new_dir'], msg['new_path'], msg['new_inflight_path']

and the self.o options perform a download of a single file. return True on a successful transfer, False otherwise.

if self.o.dry_run is set, simulate the output of a download without performing it.

download is called by the sarra and subscribe components, and can be used to override the built-in methods for downloading a file.

It does the download based on the fields in the message provided:

retrieval path: msg['baseUrl'] + '/' + msg['relPath']

taking care of the inflight setting, the inflight/temporary file is defined for use by the download routine: msg['new_inflight_path']

Final local location to store is defined by:

msg['new_path'] == msg['new_dir'] + '/' + msg['new_file']

This replaces built-in download functionality, providing an override for individual file transfers. Ideally you set checksums as you download.

looking at self.o.identity_method to establish download checksum algorithm. might have to allow for cod… say it is checksum_method:

checksum = sarracenia.identity.Identity.factory(self.o.checksum_method)
while downloading:
    checksum.update(chunk)

where chunk is the bytes read. It saves a file read to calculate the checksum during the download.

if the checksum does not match what is in the received message, then it is imperative, to avoid looping, to apply the actual checksum of the data to the message:

msg['identity'] = { 'method': checksum_method, 'value': checksum.get_sumstr() }

return Boolean success indicator. if False, download will be attempted again and/or appended to retry queue.
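The idea of updating a checksum while the data is being transferred can be sketched with the standard library alone. Here hashlib.sha512 stands in for the Identity object, in-memory streams stand in for the remote source and the inflight file, and copy_with_checksum is a hypothetical helper name.

```python
import hashlib
import io


def copy_with_checksum(src, dst, chunk_size=8192):
    """Copy src to dst chunk by chunk, hashing each chunk as it passes."""
    checksum = hashlib.sha512()      # stand-in for an Identity object
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            break
        dst.write(chunk)
        checksum.update(chunk)       # avoids re-reading the file afterwards
    return checksum.hexdigest()


payload = b'hello world' * 1000
source = io.BytesIO(payload)
target = io.BytesIO()
digest = copy_with_checksum(source, target)
```

A download routine built this way can compare the computed value with the one in the message and, on mismatch, store the computed value in the message as described above.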

sarracenia.flowcb.accept.downloadbaseurl

Plugin downloadbaseurl.py:

Downloads files sourced from the baseUrl of the poster, and saves them in the directory specified in the config. Created to use with the poll_nexrad.py plugin to download files uploaded in the NEXRAD US Weather Radar public dataset. Compatible with Python 3.5+.

Example

A sample do_download option for subscribe.

Downloads the file located at message[‘baseUrl’] and saves it

Usage:

flowcb sarracenia.flowcb.accept.downloadbaseurl.DownloadBaseUrl

class sarracenia.flowcb.accept.downloadbaseurl.DownloadBaseUrl(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.hourtree

Plugin hourtree.py:

When receiving a file, insert an hourly directory into the local delivery path hierarchy.

Example

input A/B/c.gif –> output A/B/<hour>/c.gif
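The path change can be sketched as below. insert_hour is a hypothetical helper, and the use of the UTC hour is an assumption; the plugin itself may use a different clock.

```python
import time


def insert_hour(new_dir, now=None):
    """Append a two-digit hour directory to new_dir (UTC hour assumed)."""
    hour = time.strftime('%H', time.gmtime(now))  # now=None means current time
    return new_dir.rstrip('/') + '/' + hour


# Five hours after the epoch, so the hour component is deterministic.
hourly_dir = insert_hour('A/B', now=5 * 3600)
full_path = hourly_dir + '/c.gif'
```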

Usage:

flowcb sarracenia.flowcb.accept.hourtree.HourTree

class sarracenia.flowcb.accept.hourtree.HourTree(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.httptohttps

Plugin httptohttps.py:

This plugin simply turns messages with baseUrl http://… into https://

The process would need to be restarted. From then on, all http messages that would be consumed are turned into https messages. The rest of the process will treat the message as if it had been posted that way. This plugin is an easy way to turn transactions between dd.weather.gc.ca and the user into secured https transfers.

Usage:

flowcb sarracenia.flowcb.accept.httptohttps.HttpToHttps

class sarracenia.flowcb.accept.httptohttps.HttpToHttps(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.longflow

Plugin longflow.py:

This plugin is strictly for self-test purposes. Creates a header ‘toolong’ that is longer than 255 characters, and so gets truncated. Each header in a message that is too long should generate a warning message in the sarra log. flow_test checks for the ‘truncated’ error message. Put some utf characters in there to make it interesting… (truncation complex.)

Usage:

flowcb sarracenia.flowcb.accept.longflow.LongFlow

class sarracenia.flowcb.accept.longflow.LongFlow(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.posthourtree

Plugin posthourtree.py:

When posting a file, insert an hourly directory into the delivery path hierarchy.

Example

input A/B/c.gif –> output A/B/<hour>/c.gif

Usage:

callback accept.posthourtree

class sarracenia.flowcb.accept.posthourtree.Posthourtree(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.postoverride

Plugin postoverride.py:

Override message header for products that are posted. This can be useful or necessary when re-distributing beyond the original intended destinations.

Example

For example, company A delivers to their own DMZ server. ACME is a client of theirs, and so subscribes to the ADMZ server, but the messages there have to_cluster=ADMZ. When ACME downloads, they need to override the destination to specify the distribution within ACME:

  • postOverride to_clusters ACME

  • postOverrideDel from_cluster

Usage:

flowcb sarracenia.flowcb.accept.postoverride.PostOverride
postOverride x y
postOverrideDel z

class sarracenia.flowcb.accept.postoverride.PostOverride(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.printlag

Plugin printlag.py:

This is an after_accept plugin. print a message indicating how old messages received are. this should be used as an on_part script. For each part received it will print a line in the local log that looks like this:

2015-12-23 22:54:30,328 [INFO] posted: 20151224035429.115 (lag: 1.21364 seconds ago) to deliver: /home/peter/test/dd/bulletins/alphanumeric/20151224/SA/EGGY/03/SAUK32_EGGY_240350__EGAA_64042,

the number printed after “lag:” is the time between the moment the message was originally posted on the server, and the time the script was called, which is very near the end of writing the file to local disk.

This can be used to gauge whether the number of instances or internet link are sufficient to transfer the data selected. if the lag keeps increasing, then something must be done.
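The lag calculation can be sketched as follows. It assumes the posting time is a UTC timestamp in the YYYYMMDDHHMMSS.fff form shown in the log line; pubtime_to_epoch and lag_seconds are hypothetical helper names.

```python
from datetime import datetime, timezone


def pubtime_to_epoch(pubtime):
    """Convert 'YYYYMMDDHHMMSS.fff' (assumed UTC) to seconds since the epoch."""
    stamp = datetime.strptime(pubtime, '%Y%m%d%H%M%S.%f')
    return stamp.replace(tzinfo=timezone.utc).timestamp()


def lag_seconds(pubtime, now):
    """Seconds elapsed between posting and the moment 'now' (epoch seconds)."""
    return now - pubtime_to_epoch(pubtime)


# The moment of the call is fixed here so the result is reproducible.
call_time = pubtime_to_epoch('20151224035430.328')
lag = lag_seconds('20151224035429.115', call_time)
```

In a running plugin, call_time would come from time.time() instead.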

Usage:

flowcb sarracenia.flowcb.accept.printlag.PrintLag

class sarracenia.flowcb.accept.printlag.PrintLag(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.rename4jicc

class sarracenia.flowcb.accept.rename4jicc.Rename4Jicc(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.renamedmf

Plugin renamedmf.py:

This is an example of the usage of an after_accept script to rename the product when it is downloaded. It adds a ‘:datetime’ to the name of the product. Unlikely to be useful to others, except as an example.

Example

takes px name     : CACN00_CWAO_081900__WZS_14623:pxatx:CWAO:CA:5:Direct:20081008190602
add datetimestamp : CACN00_CWAO_081900__WZS_14623:pxatx:CWAO:CA:5:Direct:20081008190602:20081008190612

Usage:

flowcb sarracenia.flowcb.accept.renamedmf.RenameDMF

class sarracenia.flowcb.accept.renamedmf.RenameDMF(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.renamewhatfn

Plugin renamewhatfn.py:

This plugin is no longer needed. Sundew compatibility was added to Sarracenia, so one can now get the same effect by using the filename option, which works like it does in Sundew:

filename WHATFN

what it was used for: This renamer strips everything from the first colon in the file name to the end. This does the same thing as a ‘WHATFN’ config on a sundew sender.

Example

takes px name : /apps/dms/dms-metadata-updater/data/international_surface/import/mdicp4d:pull-international-metadata:CMC:DICTIONARY:4:ASCII:20160223124648
rename for    : /apps/dms/dms-metadata-updater/data/international_surface/import/mdicp4d
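The renaming rule amounts to a single string operation, sketched here with a hypothetical helper name (strip_after_first_colon); it is not the plugin's own code.

```python
def strip_after_first_colon(name):
    """Drop everything from the first colon to the end of the name."""
    return name.split(':', 1)[0]


px_name = ('/apps/dms/dms-metadata-updater/data/international_surface/import/'
           'mdicp4d:pull-international-metadata:CMC:DICTIONARY:4:ASCII:20160223124648')
renamed = strip_after_first_colon(px_name)
```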

Usage:

flowcb sarracenia.flowcb.accept.renamewhatfn.RenameWhatFn

class sarracenia.flowcb.accept.renamewhatfn.RenameWhatFn(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.save

Plugin save.py:

Converts a consuming component into one that writes the queue into a file. If there is some sort of problem with a component, then add callback save and restart.

The messages will accumulate in a save file in ~/.cache/<component>/<config>/ ??<instance>.save

When the situation has returned to normal (you want the component to process the data as normal):

  • remove the callback save

  • note the queue this configuration is using to read (should be in the log on startup.)

  • run an sr_shovel with -restore_to_queue and the queue name.

Options:

save.py takes ‘msgSaveFile’ as an argument. “_9999.sav” will be added to msgSaveFile, where the 9999 represents the instance number. As instances run in parallel, rather than synchronizing access, each instance just writes to its own file.

Usage:

callback accept.save msgSaveFile x

class sarracenia.flowcb.accept.save.Save(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.speedo

Plugin speedo.py:

Gives a speedometer reading on the messages going through an exchange. As this is an after_accept plugin, it accumulates the number of messages and the bytes they represent over a period of time.

Options:

  • msgSpeedoInterval -> how often the speedometer is updated. (default: 5)

  • msg_speedo_maxlag -> if the message flow indicates that messages are ‘late’, emit warnings. (default: 60)

Usage:

callback accept.speedo
msgSpeedoInterval x
msg_speedo_maxlag y

class sarracenia.flowcb.accept.speedo.Speedo(options)[source]

Bases: FlowCB

__init__(options)[source]

set defaults for options. can be overridden in config file.

sarracenia.flowcb.accept.sundewpxroute

Plugin sundewpxroute.py:

Implement message filtering based on a routing table from MetPX-Sundew. Make it easier to feed clients exactly the same products with sarracenia, that they are used to with sundew.

Usage:

the pxRouting option must be set in the configuration before the plugin is configured, like so:

pxRouting /local/home/peter/src/pdspx/routing/etc/pxRouting.conf
pxClient navcan-amis
flowcb sarracenia.flowcb.accept.sundewpxroute.SundewPxRoute

class sarracenia.flowcb.accept.sundewpxroute.SundewPxRoute(options)[source]

Bases: FlowCB

__init__(options)[source]

For World Meteorological Organization message oriented routing. Read the configured metpx-sundew routing table, and figure out which Abbreviated Header Lines (AHL’s) are configured to be sent to ‘target’ being careful to account for membership in clientAliases.

init sets ‘ahls_to_route’ according to the contents of pxrouting

sarracenia.flowcb.accept.testretry

Plugin testretry.py:

This changes the message randomly so that it will cause a download or send error. When a message is being retried, it is randomly fixed.

Usage:

flowcb sarracenia.flowcb.accept.testretry.TestRetry

class sarracenia.flowcb.accept.testretry.TestRetry(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.toclusters

Plugin toclusters.py:

Implements inter-pump routing by filtering destinations. This is placed on a sarra process copying data between pumps. Whenever it gets a message, it looks at the destination, and processing only continues if it is believed that the local pump is a valid destination for that message.

Options:

With the settings below, the local pump will select messages destined for the DD or DDI clusters, and reject those for DDSR, which isn’t in the list.

  • msgToClusters DDI

  • msgToClusters DD

Usage:

flowcb sarracenia.flowcb.accept.toclusters.ToClusters msgToClusters x msgToClusters y …

class sarracenia.flowcb.accept.toclusters.ToClusters(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.tohttp

sarracenia.flowcb.accept.tolocal

sarracenia.flowcb.accept.tolocalfile

Plugin tolocalfile.py:

This is a helper script to work with converters (filters) and senders.

What a data pump advertises will usually use a Web URL, but if one is on a server where the files are available, it is more efficient to access them as local files, and so this plugin turns the message’s notice Web URL into a File URL (file:/d1/d2/…/fn)

Normal Usage:

A Web URL in an amqp message is held in the following values:

  • message['baseUrl'] (ex.: http://localhost)

  • message['relPath'] (ex.: /<data>/<src>/d3/…/fn)

We will save these values before they are modified:

message['saved_baseUrl'] = message['baseUrl']
message['saved_relPath'] = message['relPath']

We will then turn them into an absolute File URL (note: if a baseDir was set, it prefixes the relPath):

message['baseUrl'] = 'file:'
message['relPath'] = [baseDir] + message['relPath']

Example

baseDir /var/www/html

message pubtime=20171003131233.494 baseUrl=http://localhost relPath=/20171003/CMOE/GIF/productx.gif

flowcb sarracenia.flowcb.accept.tolocalfile.ToLocalFile

will receive this:

  • message['baseUrl'] is 'http://localhost'

  • message['relPath'] is '/20171003/CMOE/GIF/productx.gif'

will copy/save these values:

  • message['saved_baseUrl'] = message['baseUrl']

  • message['saved_relPath'] = message['relPath']

turn the original values into a File URL:

  • message['baseUrl'] = 'file:'

  • if parent['baseDir'] :

      message['relPath'] = parent['baseDir'] + '/' + message['relPath']
      message['relPath'] = message['relPath'].replace('//','/')
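The steps above can be put together in a short, self-contained sketch. It operates on a plain dictionary rather than a real sarracenia message, and to_local_file is a hypothetical function name, not the plugin's entry point.

```python
def to_local_file(message, base_dir=None):
    """Save the Web URL fields, then rewrite them as a File URL."""
    message['saved_baseUrl'] = message['baseUrl']
    message['saved_relPath'] = message['relPath']
    message['baseUrl'] = 'file:'
    if base_dir:
        joined = base_dir + '/' + message['relPath']
        message['relPath'] = joined.replace('//', '/')  # collapse doubled slash
    return message


msg = {'baseUrl': 'http://localhost',
       'relPath': '/20171003/CMOE/GIF/productx.gif'}
to_local_file(msg, base_dir='/var/www/html')
```

Keeping the saved_ fields is what allows a later plugin to restore the original Web URL when announcing a derived product.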

A sequence of after_accept plugins can perform various changes to the messages and/or to the product… so here lets pretend we have an after_accept plugin that converts gif to png and prepares the proper message for it

flowcb sarracenia.flowcb.accept.giftopng.GifToPng

After the tolocalfile this script could perform something like:

# build the absolute path of the png product
new_path = message['relPath'].replace('GIF','PNG')
new_path = new_path[:-4] + '.png'   # strings are immutable, so rebuild the name

# proceed to the conversion gif2png
ok = self.gif2png(gifpath=message['relPath'],pngpath=new_path)

change the message to announce the new png product:

if ok :
    message['baseUrl'] = message['saved_baseUrl']
    message['relPath'] = new_path
    if self.o.baseDir :
        message['relPath'] = new_path.replace(self.o.baseDir,'',1)
    # we are ok... proceed with this png file
else :
    logger.error(...)

Usage:

flowcb sarracenia.flowcb.accept.tolocalfile.ToLocalFile

class sarracenia.flowcb.accept.tolocalfile.ToLocalFile(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.accept.wmotypesuffix

Plugin wmotypesuffix.py:

Given the WMO-386 TT designator of a WMO file, add a file type suffix to the file name. Web browsers and modern operating systems may do the right thing if files have a recognizable suffix.

http://www.wmo.int/pages/prog/www/ois/Operational_Information/Publications/WMO_386/AHLsymbols/TableA.html

Status: proof of concept demonstrator… missing many TT’s. please add! Tested with UNIDATA feeds, discrepancies: TableA says L is Aviation XML, but UNIDATA Feed, it is all GRIB. XW - should be CAP, but is GRIB. IX used by Americans for HDF, unsure if that is kosher/halal/blessed, but it is in the UNIDATA feed.

IU/IS/IB are BUFR. Other type designators are welcome… for example, GRIB isn’t identified yet. The default is .txt.

Usage:

flowcb sarracenia.flowcb.accept.wmotypesuffix.WmoTypeSuffix

class sarracenia.flowcb.accept.wmotypesuffix.WmoTypeSuffix(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.clamav

A sample on_part plugin to perform virus scanning, using the ClamAV engine.

requires a clamd binding package to be installed. On debian derived systems:

sudo apt-get install python3-pyclamd

on others:

pip3 install pyclamd

author: Peter Silva

class sarracenia.flowcb.clamav.Clamav(options)[source]

Bases: FlowCB

Invoke ClamAV anti-virus scanning on files as they pass through a data pump. when it is invoked depends on the component it is used from.

from a sender, post, or poll, the scan should stop processing prior to the transfer.

for other components, subscribers that download, it needs to take place after downloading.

__init__(options) → None[source]

sarracenia.flowcb.destfn.sample

a destfn plugin script is used by senders or subscribers to do complex file naming. this is an API demonstrator that prefixes the name delivered with ‘renamed_’:

filename DESTFNSCRIPT=sarracenia.flowcb.destfn.sample.Sample

An alternative method of invocation is to apply it selectively:

accept k* DESTFNSCRIPT=sarracenia.flowcb.destfn.sample.Sample

As with other flowcb plugins, the import will be done using normal python import mechanism equivalent to:

import sarracenia.flowcb.destfn.sample

and then in that class file, there is a Sample class, the sample class contains the destfn method, or entry_point.

The destfn routine consults the fields in the given message, and based on them, return a new file name for the file to have after transfer (download or send.)

the routines have access to the settings via options provided to init, accessed, by convention, as self.o.

The routine can also modify fields or create new ones in the message.

the destfn routine returns the new name of the file.

class sarracenia.flowcb.destfn.sample.Sample(options, class_logger=None)[source]

Bases: FlowCB

sarracenia.flowcb.filter

A module that is focused on transforming data should be called a filter. Filter plugins are intended for the after_accept(self, worklist) entry_point.

At that point: Messages have been gathered, then passed through the accept and reject pattern matches. One of the first callbacks is the nodupe, so that duplicate suppression may cause additional rejections.

sarracenia.flowcb.filter.deleteflowfiles

class sarracenia.flowcb.filter.deleteflowfiles.DeleteFlowFiles(options, class_logger=None)[source]

Bases: FlowCB

This is a custom callback class for the sr_insects flow tests. delete files for messages in two directories.

sarracenia.flowcb.filter.fdelay

This plugin delays processing of messages by message_delay seconds

sarracenia.flowcb.msg.fdelay 30
import sarracenia.flowcb.filter.fdelay.Fdelay

or more simply:

fdelay 30
callback filter.fdelay

every message will be at least 30 seconds old before it is forwarded by this plugin. in the meantime, the message is placed on the retry queue by marking it as failed.
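The delay mechanism can be sketched as follows. This is not the plugin's code: the worklist is a SimpleNamespace stand-in, messages are dictionaries, and the 'posted_at' field (epoch seconds) is a hypothetical substitute for however the real plugin determines message age.

```python
from types import SimpleNamespace


def delay_filter(worklist, fdelay, now):
    """Forward messages at least fdelay seconds old; mark younger ones failed."""
    ready = []
    for msg in worklist.incoming:
        age = now - msg['posted_at']      # hypothetical field, epoch seconds
        if age < fdelay:
            worklist.failed.append(msg)   # too young: goes to the retry queue
        else:
            ready.append(msg)
    worklist.incoming = ready


wl = SimpleNamespace(incoming=[{'name': 'old', 'posted_at': 100.0},
                               {'name': 'new', 'posted_at': 190.0}],
                     ok=[], rejected=[], failed=[])
delay_filter(wl, fdelay=30, now=200.0)
```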

class sarracenia.flowcb.filter.fdelay.Fdelay(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.filter.pclean_f90

msg_pclean_f90 module: file propagation test for Sarracenia components (in flow test) https://github.com/MetPX/sr_insects/

class sarracenia.flowcb.filter.pclean_f90.PClean_F90(options)[source]

Bases: PClean

Functionality within the flow_tests of the sr_insects project. This plugin class receives a msg from xflow_public and checks propagation of the underlying file:

  • it checks if the propagation was ok

  • it randomly sets a new test file with a different type in the watch dir (f31 amqp)

  • it posts the product to be treated by f92

  • when the msg for the extension file comes back, recheck the propagation

When a product is not fully propagated, the error is reported and the test is considered as a failure. It also checks if the file differs from original

sarracenia.flowcb.filter.pclean_f92

msg_pclean_f92 module: second file propagation test for Sarracenia components (in flow test)

class sarracenia.flowcb.filter.pclean_f92.PClean_F92(options)[source]

Bases: PClean

This plugin manages the removal of every file

  • it fails if one removal failed

sarracenia.flowcb.filter.wmo2msc

wmo2msc.py is an on_message plugin script to convert WMO bulletins on local disk to MSC internal format in an alternate tree. It is analogous to Sundew’s ‘bulletin-file’. Meant to be called as an sr_shovel plugin.

It prints an output line:

wmo2msc: <input_file> -> <output_file> (<detected format>)

usage:

Use the directory setting to know the root of tree where files are placed. FIXME: well, likely what you really want is something like:

<date>/<source>/dir1/dir2/dir3

<date>/<source>/dir1/dir2/newdir4/...

-- so Directory doesn't cut it.

In a sr_shovel configuration:

directory /....
callback filter.wmo2msc

Parameters:

  • filter_wmo2msc_replace_dir old,new

  • filter_wmo2msc_uniquify hash|time|anything else

    Whether to add a string in addition to the AHL to make the filename unique.

    - hash: apply a hash, so that the additional string is content based.
    - time: add a suffix _YYYYMMDDHHMMSS_99999 which ensures file name uniqueness.
    - otherwise, no suffix will be added.
    - default: hash

  • filter_wmo2msc_convert on|off

    If on, then traditional conversion to MSC-BULLETINS is done as per TANDEM/APPS & MetPX Sundew. This involves \n as termination character, and other character substitutions.

  • filter_wmo2msc_tree on|off if tree is off, files are just placed in destination directory. if tree is on, then the file is placed in a subdirectory tree, based on the WMO 386 AHL:

    TTAAii CCCC YYGGgg  ( example: SACN37 CWAO 300104 )
    
    TT = SA - surface observation.
    AA = CN - Canada ( but the AA depends on TT value, in many cases not a national code. )
    ii = 37 - a number.. there are various conventions, they are picked to avoid duplication.
    

    The first line of the file is expected to contain an AHL. and when we build a tree from it, we build it as follows:

    TT/CCCC/GG/TTAAii_CCCC_YYGGgg_<uniquify>
    

    assuming tree=on, uniquify=hash:

    SA/CWAO/01/SACN37_CWAO_300104_1c699da91817cc4a84ab19ee4abe4e22
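The tree layout can be sketched as below. tree_path is a hypothetical helper, the regular expression is a simplified reading of the AHL format, and MD5 is an assumption suggested only by the 32 hex digit suffix in the example; the plugin's actual hash may differ.

```python
import hashlib
import re

# Simplified AHL pattern: TTAAii CCCC YYGGgg at the start of the bulletin.
AHL = re.compile(rb'^([A-Z]{2})([A-Z]{2})(\d{2}) ([A-Z]{4}) (\d{2})(\d{2})(\d{2})')


def tree_path(bulletin):
    """Build TT/CCCC/GG/TTAAii_CCCC_YYGGgg_<hash> from the first line."""
    match = AHL.match(bulletin)
    if match is None:
        raise ValueError('first line does not look like an AHL')
    tt, aa, ii, cccc, yy, gg, minutes = (g.decode('ascii') for g in match.groups())
    suffix = hashlib.md5(bulletin).hexdigest()   # assumed content-based hash
    return f'{tt}/{cccc}/{gg}/{tt}{aa}{ii}_{cccc}_{yy}{gg}{minutes}_{suffix}'


path = tree_path(b'SACN37 CWAO 300104\nMETAR CYUL 300100Z ...\n')
```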

NOTE: Look at the end of the file for SUPPLEMENTARY INFORMATION

including hints about debugging.

class sarracenia.flowcb.filter.wmo2msc.Wmo2msc(options)[source]

Bases: FlowCB

__init__(options)[source]
doSpecificProcessing()[source]

Modify bulletins received from Washington via the WMO socket protocol. started as a direct copy from sundew of routine with same name in bulletinManagerWmo.py

  • encode/decode, and binary stuff came because of python3

replaceChar(oldchar, newchar)[source]

replace all occurrences of oldchar by newchar in the message byte stream. started as a direct copy from sundew of routine with same name in bulletin.py

  • the storage model is a bit different, we store the entire message as one bytearray.

  • sundew stored it as a series of lines, so replaceChar implementation changed.

sarracenia.flowcb.gather

sarracenia.flowcb.gather.logger = <Logger sarracenia.flowcb.gather (WARNING)>

This file was originally a place holder… thought this was not needed, then there was a stack trace, and setup.py leaves the directory out if not present.

then started finding this convenient for common routines.

sarracenia.flowcb.gather.file

class sarracenia.flowcb.gather.file.File(options)[source]

Bases: FlowCB

read the file system, create messages for the files you find.

this is taken from v2’s sr_post.py

FIXME FIXME FIXME FIXME: the sr_post version would post files on the fly as it was traversing trees, so it was incremental and had little overhead. This one does the whole recursion in one gather.

It will fail horribly for large trees. Need to re-formulate to replace recursion with iteration. perhaps a good time to use python iterators.

also should likely switch from listdir to scandir

__init__(options)[source]
gather(messageCountMax)[source]

from sr_post.py/run

FIXME: really bad performance with large trees: It scans an entire tree before emitting any messages. Need to re-factor with iterator style to produce results incrementally in batch-sized chunks.

post1file(path, lstat, is_directory=False) → list[source]

create the notification message for a single file, based on the lstat metadata.

when lstat is present it is used to decide whether the file is an ordinary file, a link or a directory, and the appropriate message is built and returned.

if the lstat metadata is None, then that signifies a “remove” message to be created. In the remove case, without the lstat, one needs the is_directory flag to decide whether it is an ordinary file remove, or a directory remove. is_directory is not used other than for the remove case.

The return value is a list that usually contains a single message. It is a list because some combinations of options can produce multiple messages for a single file, for example when both a symbolic link and the realpath it points to are posted. The same applies in the multi-block transfer case.

process_event(event, src, dst)[source]

return a tuple: pop? + list of messages.

walk(src)[source]

walk directory tree returning 1 message for each file in it.

walk_priming(p)[source]

Find all the subdirectories of the given path, start watches on them. deal with symbolically linked directories correctly

class sarracenia.flowcb.gather.file.SimpleEventHandler(parent)[source]

Bases: PatternMatchingEventHandler

__init__(parent)[source]

sarracenia.flowcb.gather.message

class sarracenia.flowcb.gather.message.Message(options)[source]

Bases: FlowCB

gather messages from a sarracenia.moth message queuing protocol source.

__init__(options) None[source]
gather(messageCountMax) list[source]
Returns:

a tuple (gather_more, messages): True, meaning you can gather from other sources, and a list of messages obtained from this source.

please_stop() None[source]

pass stop request along to consumer Moth instance(s)

sarracenia.flowcb.housekeeping

plugins intended for the on_housekeeping entry point.

(invoked at each housekeeping interval.)

sarracenia.flowcb.housekeeping.resources

Default on_housekeeping handler that:

  • Logs memory and CPU usage.

  • Restarts components to deal with memory leaks.

If MemoryMax is not in the config, it is automatically calculated with the following procedure:

  1. The plugin processes the first MemoryBaseLineFile items to reach a steady state:

    • Subscribers process messages-in

    • Posting programs process messages-posted

  2. Set the MemoryMax threshold to MemoryMultiplier * (memory usage at steady state)

If memory use ever exceeds the MemoryMax threshold, then the plugin triggers a restart, reducing memory consumption.

Parameters:

MemoryMax: size (default: none)

Hard coded maximum for tolerable memory consumption. Must be suffixed with k/m/g for Kilo/Mega/Giga byte values. If not set then the following options will have an effect:

MemoryBaseLineFile: int, optional (default: 100)

How many files to process before measuring to establish the baseline memory usage. (how many files are expected to process before a steady state is reached)

MemoryMultiplier: int, optional (default: 3)

How many times past the steady state memory footprint you want to allow the component to grow before restarting. It could be normal for memory usage to grow, especially if plugins store data in memory.

returns:

Nothing, restarts components if memory usage is outside of configured thresholds.
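The threshold procedure described above can be sketched as follows. This is a minimal illustration, not the plugin's own code: parse_size, compute_threshold and needs_restart are hypothetical helper names, the 1024-based multipliers are an assumption, and the baseline is supplied by the caller rather than measured.

```python
from typing import Optional

# Assumption: the text only says k/m/g mean Kilo/Mega/Giga bytes;
# powers of 1024 are chosen here for illustration.
_SUFFIXES = {"k": 1024, "m": 1024**2, "g": 1024**3}


def parse_size(text: str) -> int:
    """Convert a size such as '500m' into a number of bytes."""
    text = text.strip().lower()
    if text and text[-1] in _SUFFIXES:
        return int(float(text[:-1]) * _SUFFIXES[text[-1]])
    return int(text)


def compute_threshold(memory_max: Optional[str],
                      baseline_bytes: int,
                      multiplier: int = 3) -> int:
    """Return the restart threshold in bytes.

    An explicit MemoryMax wins; otherwise the threshold is
    MemoryMultiplier * (memory usage at steady state).
    """
    if memory_max is not None:
        return parse_size(memory_max)
    return multiplier * baseline_bytes


def needs_restart(current_bytes: int, threshold: int) -> bool:
    """True when current usage exceeds the threshold."""
    return current_bytes > threshold
```

With a measured baseline of 100 bytes and the default multiplier, compute_threshold(None, 100) gives 300, so a restart would be triggered once usage exceeds 300 bytes.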

class sarracenia.flowcb.housekeeping.resources.Resources(options)[source]

Bases: FlowCB

__init__(options)[source]
restart()[source]

Do an in-place restart of the current process (keeps pid). Gets a new memory stack/heap, keeps all file descriptors but replaces the buffers.

threshold

Per-process maximum memory footprint that is considered too large, forcing a process restart.

sarracenia.flowcb.log

class sarracenia.flowcb.log.Log(options)[source]

Bases: FlowCB

The logging flow callback class. Logs messages at the indicated times. Controlled by:

logEvents - which entry points to emit log messages at.

logMessageDump - print literal messages when printing log messages.

every housekeeping interval, prints:

  • how many messages were received, rejected, %accepted.

  • number of files transferred, their size, and rate in files/s and bytes/s

  • lag: some information about how old the messages are when processed

__init__(options)[source]

sarracenia.flowcb.mdelaylatest

class sarracenia.flowcb.mdelaylatest.MDelayLatest(options)[source]

Bases: FlowCB

This plugin delays processing of messages by message_delay seconds. If multiple versions of a file are published within the interval, only the latest one will be published.

mdelay 30
flowcb sarracenia.flowcb.mdelaylatest.MDelayLatest

every message will be at least 30 seconds old before it is forwarded by this plugin. In the meantime, the message is placed on the retry queue by marking it as failed.
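The delay-and-keep-latest behaviour can be sketched as below. This is an illustration under assumptions, not the plugin's code: delay_latest is a hypothetical function, messages are plain dictionaries, and pubTime is taken to be a number of seconds rather than the timestamp format used in real messages. The held list plays the role of the messages marked as failed so that they are retried later.

```python
def delay_latest(messages, now, delay=30.0):
    """Split messages into (ready, held).

    Only the newest message per path is kept. It is ready once it is at
    least `delay` seconds old; otherwise it is held for a later pass.
    """
    latest = {}
    for msg in messages:
        path = msg["relPath"]
        if path not in latest or msg["pubTime"] > latest[path]["pubTime"]:
            latest[path] = msg

    ready, held = [], []
    for msg in latest.values():
        if now - msg["pubTime"] >= delay:
            ready.append(msg)
        else:
            held.append(msg)
    return ready, held


msgs = [
    {"relPath": "a.txt", "pubTime": 100.0},
    {"relPath": "a.txt", "pubTime": 110.0},  # newer version of a.txt
    {"relPath": "b.txt", "pubTime": 135.0},  # too young at now=140
]
ready, held = delay_latest(msgs, now=140.0)
```

Here only the second version of a.txt is forwarded, and b.txt waits until it is 30 seconds old.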

__init__(options)[source]

sarracenia.flowcb.nodupe

class sarracenia.flowcb.nodupe.NoDupe(options, class_logger=None)[source]

Bases: FlowCB

duplicate suppression family of modules.

invoked with:

callback sarracenia.flowcb.nodupe.disk

or: callback sarracenia.flowcb.nodupe.redis

with default being loaded depending on the presence of a

nodupe_driver “redis”

setting (defaults to disk.)

sarracenia.flowcb.nodupe.data

class sarracenia.flowcb.nodupe.data.Data(options, class_logger=None)[source]

Bases: FlowCB

duplicate suppression based on data alone. Overrides the path used for lookups in the cache so that all files have the same name, and so if the checksum is the same, regardless of file name, it is considered a duplicate.

sarracenia.flowcb.nodupe.name

class sarracenia.flowcb.nodupe.name.Name(options, class_logger=None)[source]

Bases: FlowCB

Override the comparison so that files with the same name, regardless of what directory they are in, are considered the same. This is useful when receiving data from two different sources (two different trees) and winnowing between them.
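To make the difference between the Data and Name variants concrete, here is a small sketch of how the cache lookup could change in each case. The function names, the dictionary-based messages and the (key, path) cache entries are all assumptions made for illustration; the real classes work through the duplicate suppression cache of sr3.

```python
import os


def cache_entry(msg, mode):
    """Return the (key, path) pair used for the duplicate lookup.

    mode "data": constant path, so only the checksum matters.
    mode "name": base name only, so the directory is ignored.
    otherwise:   checksum plus full relative path.
    """
    if mode == "data":
        return (msg["checksum"], "data")
    if mode == "name":
        name = os.path.basename(msg["relPath"])
        return (name, name)
    return (msg["checksum"], msg["relPath"])


def is_duplicate(cache, msg, mode):
    """Record the message in cache; return True if it was seen before."""
    entry = cache_entry(msg, mode)
    if entry in cache:
        return True
    cache.add(entry)
    return False


first = {"relPath": "src1/f.txt", "checksum": "c1"}
same_data = {"relPath": "src2/g.txt", "checksum": "c1"}
same_name = {"relPath": "src2/f.txt", "checksum": "c2"}

data_cache, name_cache = set(), set()
data_results = [is_duplicate(data_cache, m, "data") for m in (first, same_data)]
name_results = [is_duplicate(name_cache, m, "name") for m in (first, same_name)]
```

In data mode the second file is a duplicate because its checksum matches, even though its name differs. In name mode the second file is a duplicate because its base name matches, even though it comes from a different tree.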

sarracenia.flowcb.retry

class sarracenia.flowcb.retry.Retry(options)[source]

Bases: FlowCB

overall goal:

  • When file transfers fail, write the messages to a queue to be retried later. There is also a second retry queue for failed posts.

how it works:

  • the after_accept entry point checks how many incoming messages were received. If there is a full batch to process, don’t try to retry any.

  • if there is room, then fill in the batch with some retry requests.

  • when after_work is called, the worklist.failed list contains the messages for files whose transfer failed. Those messages are written to a retry queue.

  • the DiskQueue or RedisQueue classes are used to store the retries, and they handle expiry on each housekeeping event.
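The steps above can be sketched as follows. This is a simplified illustration, not the Retry class itself: fill_batch and queue_failures are hypothetical names, and a collections.deque stands in for the DiskQueue or RedisQueue storage (no expiry is modelled).

```python
from collections import deque


def fill_batch(incoming, retry_queue, batch_size):
    """Top up `incoming` with retries only when the batch has room."""
    room = batch_size - len(incoming)
    while room > 0 and retry_queue:
        incoming.append(retry_queue.popleft())
        room -= 1
    return incoming


def queue_failures(failed, retry_queue):
    """Move every failed message to the retry queue, emptying `failed`."""
    retry_queue.extend(failed)
    failed.clear()


retries = deque(["old1", "old2", "old3"])

# A full batch leaves the retry queue untouched.
full = fill_batch(["n1", "n2"], retries, batch_size=2)

# A short batch is topped up from the retry queue.
topped_up = fill_batch(["n3"], retries, batch_size=3)

# Failed transfers go back on the queue for a later attempt.
failed = ["n3"]
queue_failures(failed, retries)
```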

__init__(options) None[source]
after_accept(worklist) None[source]

If there are only a few new messages, get some from the download retry queue and put them into worklist.incoming.

Do this in the after_accept() entry point if retry_refilter is False.

after_post(worklist) None[source]

Messages in worklist.failed should be put in the post retry queue.

after_work(worklist) None[source]

Messages in worklist.failed should be put in the download retry queue. If there are only a few new messages, get some from the post retry queue and put them into worklist.ok.

gather(qty) None[source]

If there are only a few new messages, get some from the download retry queue and put them into worklist.incoming.

Do this in the gather() entry point if retry_refilter is True.

metricsReport() dict[source]

Returns the number of messages in the download_retry and post_retry queues.

Returns:

a dictionary containing metrics: {'msgs_in_download_retry': (int), 'msgs_in_post_retry': (int)}

Return type:

dict

sarracenia.flowcb.pclean

msg_pclean module: base module for propagation tests and cleanup for Sarracenia components (in flow test). Used by the https://github.com/MetPX/sr_insects test suite for CI/CD.

class sarracenia.flowcb.pclean.PClean(options)[source]

Bases: FlowCB

Base plugin class that is used in shovel pclean_f9x:

  • it checks if the propagation was ok.

  • it randomly sets a test in the watch f40.conf for propagation

  • it posts the product again (more tests in shovel clean_f91), which is propagated too

  • it removes the original product

It also uses a file delay to tolerate a maximum lag for the test

The posted message contains a tag in the header for the test performed which is the extension used for the test

__init__(options)[source]
build_path_dict(fxx_dirs, relpath, ext='')[source]

Builds the paths necessary for the pclean tests.

It is a subset of all flow test paths, based on the fxx download directories provided.

Parameters:
  • root – usually the sarra dev doc root directory

  • fxx_dirs – a list of the flow test directories needed

  • relpath – the relative path of the file (starting with the date) without the forward slash

  • ext – the extension from the extension test (optional)

Returns:

a dictionary of all paths built

get_extension(relpath)[source]

Check whether the extension is in the header

Parameters:

msg – the msg used for the test

Returns:

the value corresponding to the extension key in the msg header

sarracenia.flowcb.sample

class sarracenia.flowcb.sample.Sample(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.scheduled

class sarracenia.flowcb.scheduled.Scheduled(options, class_logger=<Logger sarracenia.flowcb.scheduled (WARNING)>)[source]

Bases: FlowCB

Scheduled flow callback plugin arranges to post URLs at scheduled times.

usage:

In the configuration file, need:

callback scheduled

and the schedule can be specified as:

  • scheduled_interval 1m (a duration; here, once a minute)

  • scheduled_hour 4,9 (at 4Z and 9Z every day)

  • scheduled_minute 33,45 (which minutes within the scheduled hours)

scheduled_interval takes precedence over the others, making it easier to specify an interval for testing/debugging purposes.

use in code (for subclassing):

from sarracenia.flowcb.scheduled import Scheduled

class hoho(Scheduled):

Replace the gather() routine: keep the top lines (“until time to run”) and replace whatever is below. It will only run when it should.

__init__(options, class_logger=<Logger sarracenia.flowcb.scheduled (WARNING)>)[source]
update_appointments(when)[source]

Make a flat list from values that are comma separated on a single line or on multiple lines.

Set self.appointments to a list of when something needs to be run during the current day.
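As a rough sketch of what building such a list might look like: flatten and appointments_for_day are hypothetical names, and returning only the times later than `when` is an assumption, since the text does not say whether past times of the day are kept.

```python
import datetime


def flatten(values):
    """Turn ['4,9', '12'] into [4, 9, 12]."""
    return [int(item)
            for value in values
            for item in value.split(",")
            if item.strip()]


def appointments_for_day(when, hours, minutes):
    """List the run times on the date of `when` that fall after `when`."""
    slots = []
    for hour in sorted(flatten(hours)):
        for minute in sorted(flatten(minutes)):
            slot = when.replace(hour=hour, minute=minute,
                                second=0, microsecond=0)
            if slot > when:
                slots.append(slot)
    return slots


now = datetime.datetime(2023, 1, 13, 5, 0, tzinfo=datetime.timezone.utc)
todo = appointments_for_day(now, ["4,9"], ["33,45"])
```

With scheduled_hour 4,9 and scheduled_minute 33,45, a call made at 05:00Z leaves the 09:33Z and 09:45Z runs for the rest of the day.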

sarracenia.flowcb.script

sarracenia.flowcb.shiftdir2baseurl

class sarracenia.flowcb.shiftdir2baseurl.ShiftDir2baseUrl(options)[source]

Bases: FlowCB

modify message to shift directories from relPath to baseUrl:

given the setting shiftDir2baseUrl == 2 and given message with

baseDir=https://a relPath=b/c/d/e subtopic=b/c/d –> baseDir=https://a/b/c relPath=d/e subtopic=d
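The transformation in the example can be reproduced with a few lines of string handling. This is a sketch only: shift_dirs is a hypothetical function operating on plain strings, whereas the plugin modifies message fields; the first argument corresponds to the first field in the example above.

```python
def shift_dirs(base, rel_path, subtopic, count):
    """Move the first `count` directories of rel_path onto base.

    The same number of leading elements is dropped from subtopic.
    """
    parts = rel_path.split("/")
    moved, kept = parts[:count], parts[count:]
    new_base = "/".join([base.rstrip("/")] + moved)
    new_rel_path = "/".join(kept)
    new_subtopic = "/".join(subtopic.split("/")[count:])
    return new_base, new_rel_path, new_subtopic


result = shift_dirs("https://a", "b/c/d/e", "b/c/d", 2)
```

For the values in the example, result is ("https://a/b/c", "d/e", "d").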

__init__(options)[source]

sarracenia.flowcb.poll

class sarracenia.flowcb.poll.Poll(options, class_logger=<Logger sarracenia.flowcb.poll (WARNING)>)[source]

Bases: FlowCB

The Poll flow callback class implements the main logic for polling remote resources. The poll routine returns a list of messages for new files, which are then filtered.

when instantiated with options, the options honoured include:

  • pollUrl - the URL of the server to be polled.

  • post_baseURL - parameter for messages to be returned. Also used to look up credentials to help subscribers with retrieval.

  • masks - These are the directories at the pollUrl to poll. derived from the accept/reject clauses, but filtering should happen later. entire directories are listed at this point.

  • timezone - interpret listings from an FTP server as being in the given timezone (as per pytz)

  • chmod - used to identify the minimum permissions to accept for a file to be included in a polling result.

  • identity_method - parameter for how to build identity checksum for messages. as these are usually remote files, the default is typically “cod” (calculate on download)

  • rename - parameter used to specify the contents of the rename field in the messages built.

  • options are passed to sarracenia.Transfer classes for their use as well.

Poll uses sarracenia.transfer classes (ftp, sftp, https, etc…) to request lists of files over those protocols using built-in logic.

Internally, Poll normalizes the listings received by placing them into paramiko.SFTPAttributes metadata records (similar to stat records) and builds a Sarracenia.Message from them. The poll routine does one pass of this, returning a list of Sarracenia.Messages.

To customize:

  • one can add new sarracenia.transfer protocols, each implementing the ls entry point to be compatible with this polling routine, ideally the entry point would return a list of paramiko.SFTPAttributes for each file in a directory listing. This can be used to implement polling of structured remote resources such as S3 or webdav.

  • one can deal with different formats of HTTP pages by overriding the handle_data entry point, as done in nasa_mls_nrt.py plugin

  • for traditional file servers, the listing format should be decipherable with the built-in processing.

  • sftp file servers provide paramiko.SFTPAttributes naturally which are timezone agnostic.

  • for some FTP servers, one may need to specify the timezone option to override the UTC default.

  • If there are problems with date or line formats, one can sub-class poll, and override only the on_line routine to deal with that.

__init__(options, class_logger=<Logger sarracenia.flowcb.poll (WARNING)>)[source]
handle_data(data)[source]

Routine called from html.parser to deal with a single line. If the line is about a file, then create a new entry for it, with the available metadata stored in an SFTPAttributes record.

example lines:

from hpfx.collab.science.gc.ca:

20230113T00Z_MSC_REPS_HGT_ISBL-0850_RLatLon0.09x0.09_PT000H.grib2 2023-01-13 03:49 5.2M

from https://data.cosmic.ucar.edu/suominet/nrt/ncConus/y2023/

CsuPWVh_2023.011.22.00.0060_nc 11-Jan-2023 23:58 47K

this can be overridden by subclassing to deal with new web sites.

Other web servers put their file indices in a tabular format, where there are a number of cells per row:

<tr><td></td><td href=filename>filename</td><td>yyyy-mm-dd hh:mm</td><td>size</td>

This handle_data supports both formats. The tabular format is provided by a vanilla apache2 on a Debian-derived system.
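For someone overriding handle_data, the two example lines above could be decoded roughly as follows. This is a sketch under assumptions: Entry is a plain dataclass standing in for paramiko.SFTPAttributes, parse_listing_line and size_to_bytes are hypothetical names, sizes are treated as multiples of 1024, and times are taken as UTC.

```python
import datetime
from dataclasses import dataclass
from typing import Optional


@dataclass
class Entry:
    """Stand-in for an SFTPAttributes-like record."""
    filename: str
    st_mtime: float
    st_size: int


_UNITS = {"K": 1024, "M": 1024**2, "G": 1024**3}
_DATE_FORMATS = ("%Y-%m-%d %H:%M", "%d-%b-%Y %H:%M")


def size_to_bytes(text: str) -> int:
    """Convert '47K' or '5.2M' to an approximate byte count."""
    unit = text[-1].upper()
    if unit in _UNITS:
        return int(float(text[:-1]) * _UNITS[unit])
    return int(text)


def parse_listing_line(line: str) -> Optional[Entry]:
    """Parse 'name date time size'; return None if the line does not fit."""
    fields = line.split()
    if len(fields) != 4:
        return None
    name, date, clock, size = fields
    for fmt in _DATE_FORMATS:
        try:
            stamp = datetime.datetime.strptime(f"{date} {clock}", fmt)
            nbytes = size_to_bytes(size)
        except ValueError:
            continue
        stamp = stamp.replace(tzinfo=datetime.timezone.utc)
        return Entry(name, stamp.timestamp(), nbytes)
    return None


entry = parse_listing_line(
    "CsuPWVh_2023.011.22.00.0060_nc 11-Jan-2023 23:58 47K")
```

Lines that do not have exactly four fields, or whose date or size cannot be decoded, yield None so that the caller can skip them.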

on_html_page(data) dict[source]

called once per directory or page of HTML, invokes html.parser, returns a dictionary of file entries.

on_line(line) SFTPAttributes[source]

Default line processing: converts a file listing into an SFTPAttributes. Does nothing if the input is already an SFTPAttributes item, returning it unchanged. Verifies that the file is accessible (based on the self.o.permDefault pattern, which establishes minimum permissions).
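A minimum-permission check of this kind usually comes down to a bit mask comparison. The sketch below shows the idea only; has_min_permissions is a hypothetical name and the default of 0o400 (owner read) is an assumption, not the value the plugin uses.

```python
import stat


def has_min_permissions(st_mode: int, required: int = 0o400) -> bool:
    """True when every permission bit in `required` is set in st_mode."""
    return (stat.S_IMODE(st_mode) & required) == required


readable = has_min_permissions(0o100644)       # rw-r--r--
write_only = has_min_permissions(0o100200)     # -w-------
world_read = has_min_permissions(0o100640, required=0o004)
```

An entry that fails the check would simply be left out of the polling result.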

sarracenia.flowcb.poll.airnow

Posts updated files of airnowtech. Compatible with Python 3.5+.

usage:

in an sr3 poll configuration file:

pollUrl http://files.airnowtech.org/?prefix=airnow/today/

callback airnow

STATUS: unknown… need some authentication, or perhaps the method has changed.

does not seem to work out of the box.

class sarracenia.flowcb.poll.airnow.Airnow(options, class_logger=None)[source]

Bases: FlowCB

sarracenia.flowcb.poll.mail

Posts any new emails from an email server, connected to using the specified protocol, either pop3 or imap. The imaplib/poplib implementations in Python use the most secure SSL settings by default: PROTOCOL_TLS, OP_NO_SSLv2, and OP_NO_SSLv3. Compatible with Python 2.7+.

A sample do_poll option for sr_poll. connects to an email server with the provided credentials and posts all new messages by their msg ID.

usage:

in an sr_poll configuration file:

pollUrl [imap|imaps|pop|pops]://[user[:password]@]host[:port]/

IMAP over SSL uses 993, POP3 over SSL uses 995, IMAP unsecured uses 143, POP3 unsecured uses 110.

Full credentials must be in credentials.conf. If port is not specified it’ll default to the ones above based on protocol/ssl setting.
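The port defaults above can be expressed as a small lookup keyed on the URL scheme. This is an illustrative sketch using only the standard library; mail_endpoint is a hypothetical helper, and mail.example.com is a placeholder host.

```python
from urllib.parse import urlsplit

# Default ports as listed above, keyed by pollUrl scheme.
DEFAULT_PORTS = {"imap": 143, "imaps": 993, "pop": 110, "pops": 995}


def mail_endpoint(poll_url):
    """Return (scheme, host, port, use_ssl) for a mail pollUrl."""
    parts = urlsplit(poll_url)
    scheme = parts.scheme.lower()
    if scheme not in DEFAULT_PORTS:
        raise ValueError(f"unsupported protocol: {scheme!r}")
    # An explicit port in the URL overrides the protocol default.
    port = parts.port or DEFAULT_PORTS[scheme]
    use_ssl = scheme.endswith("s")
    return scheme, parts.hostname, port, use_ssl


secure = mail_endpoint("imaps://user@mail.example.com/")
custom = mail_endpoint("pop://mail.example.com:2110/")
```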

This posts what messages are available. A separate component is needed to download the message, which would need:

callback download.mail_ingest

to process these posts.

class sarracenia.flowcb.poll.mail.Mail(options)[source]

Bases: Poll

__init__(options)[source]

sarracenia.flowcb.poll.nasa_mls_nrt

class sarracenia.flowcb.poll.nasa_mls_nrt.Nasa_mls_nrt(options, class_logger=<Logger sarracenia.flowcb.poll (WARNING)>)[source]

Bases: Poll

handle_data(data)[source]

decode some HTML into an SFTPAttributes record for a file.

sarracenia.flowcb.poll.nexrad

sarracenia.flowcb.poll.noaa_hydrometric

Posts updated files of NOAA water level/temperature hydrometric data. Station site IDs provided in the poll_noaa_stn_file. Compatible with Python 3.5+.

usage: sample url: https://tidesandcurrents.noaa.gov/api/datagetter?range=1&station=9450460&product=water_temperature&units=metric&time_zone=gmt&application=web_services&format=csv

in an sr_poll configuration file:

pollUrl http://tidesandcurrents.noaa.gov/api
retrievePathPattern /datagetter?range=1&station={0:}&product={1:}&units=metric&time_zone=gmt&application=web_services&format=csv

poll_noaa_stn_file [path/to/stn/file]
callback noaa_hydrometric

sample station file:

7|70678|9751639|Charlotte Amalie|US|VI|-4.0
7|70614|9440083|Vancouver|US|WA|-8.0

The poll: If poll_noaa_stn_file isn’t set, it’ll grab an up-to-date version of all station site code data from the NOAA website. The station list file is in the following format, with each station on its own line:

SourceID | SiteID | SiteCode | SiteName | CountryID | StateID | UTCOffset

Posts the file on the exchange if the request returns a valid URL.
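To show how a station file line and the retrievePathPattern fit together, here is a small sketch. It assumes that {0:} receives the SiteCode field and {1:} the product name, which matches the sample URL but is not stated explicitly; site_code and retrieve_paths are hypothetical names, and the product list is an assumption.

```python
PATTERN = ("/datagetter?range=1&station={0:}&product={1:}"
           "&units=metric&time_zone=gmt&application=web_services&format=csv")


def site_code(station_line):
    """Extract SiteCode (third field) from a '|' separated station line."""
    fields = [field.strip() for field in station_line.split("|")]
    if len(fields) != 7:
        raise ValueError(f"expected 7 fields, got {len(fields)}")
    return fields[2]


def retrieve_paths(station_lines,
                   products=("water_temperature", "water_level")):
    """Build one retrieve path per station and product."""
    paths = []
    for line in station_lines:
        code = site_code(line)
        for product in products:
            paths.append(PATTERN.format(code, product))
    return paths


stations = [
    "7|70678|9751639|Charlotte Amalie|US|VI|-4.0",
    "7|70614|9440083|Vancouver|US|WA|-8.0",
]
paths = retrieve_paths(stations)
```

Each resulting path is relative to the pollUrl, so the two sample stations with two products produce four paths.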

in v2, one needed a matching downloader plugin, but in sr3 we can leverage the retrievePath feature so that the normal downloader works, so only the poll plugin is needed.

class sarracenia.flowcb.poll.noaa_hydrometric.Noaa_hydrometric(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.poll.usgs

sarracenia.flowcb.post

Place holder… thought this was not needed, then there was a stack trace.

sarracenia.flowcb.send.email

sarracenia.flowcb.send.email.Email is an sr3 sender plugin. Once a file is posted, the plugin matches the topic (what the filename begins with) to the file name and sends the appropriate emails.

Usage:
  1. Need the following variables defined in an sr_sender config: file_email_to, file_email_relay. Optionally, you can also provide a sender name/email as file_email_from:

    file_email_to AACN27 muhammad.taseer@canada.ca, test@test.com
    file_email_relay email.relay.server.ca
    file_email_from santa@canada.ca

  2. In the config file, include the following line:

    callback send.email

  3. sr_sender foreground emails.conf
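The matching of file names to recipients can be pictured with the sketch below. It is an assumption-laden illustration rather than the plugin's code: parse_email_to and recipients_for are hypothetical names, each file_email_to value is taken to be a prefix followed by a comma-separated address list, and the addresses are placeholders.

```python
def parse_email_to(settings):
    """Build {prefix: [addresses]} from file_email_to values."""
    table = {}
    for setting in settings:
        prefix, _, addresses = setting.strip().partition(" ")
        table.setdefault(prefix, []).extend(
            address.strip()
            for address in addresses.split(",")
            if address.strip())
    return table


def recipients_for(filename, table):
    """Return the addresses whose prefix matches the start of filename."""
    found = []
    for prefix, addresses in table.items():
        if filename.startswith(prefix):
            found.extend(addresses)
    return found


table = parse_email_to([
    "AACN27 first@example.com, second@example.com",
    "SXCN40 third@example.com",
])
matched = recipients_for("AACN27_CWAO_123456", table)
unmatched = recipients_for("FTCN31_CWAO_123456", table)
```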

Original Author: Wahaj Taseer - June, 2019

class sarracenia.flowcb.send.email.Email(options)[source]

Bases: FlowCB

__init__(options)[source]

sarracenia.flowcb.post.message

class sarracenia.flowcb.post.message.Message(options)[source]

Bases: FlowCB

post messages to sarracenia.moth message queuing protocol destination.

__init__(options)[source]
please_stop() None[source]

pass stop request along to publisher Moth instance(s)

sarracenia.flowcb.send

plugins intended for on_message entry_point.

(when messages are received.)

sarracenia.flowcb.wistree

sarracenia.flowcb.work

plugins that use primarily the after_work entry point, normally executed after the file transfer (either send or get) has completed.

usually such plugins will contain a loop:

for msg in worklist.ok:

do_something(msg)

to operate on all the files transferred or processed successfully.
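A plugin built around that loop might look like the sketch below. To keep it runnable on its own, it does not import sarracenia: ListTransferred is a hypothetical stand-in for a FlowCB subclass, the worklist is a SimpleNamespace with the list names described earlier in this reference, and messages are plain dictionaries.

```python
from types import SimpleNamespace


class ListTransferred:
    """Stand-in for a FlowCB subclass using the after_work entry point."""

    def __init__(self, options):
        self.o = options
        self.seen = []

    def after_work(self, worklist):
        # worklist.ok holds the messages for files transferred successfully.
        for msg in worklist.ok:
            self.seen.append(msg["relPath"])


worklist = SimpleNamespace(
    incoming=[],
    ok=[{"relPath": "a/b.txt"}, {"relPath": "a/c.txt"}],
    rejected=[],
    failed=[{"relPath": "a/broken.txt"}],
    directories_ok=[],
)

plugin = ListTransferred(options=None)
plugin.after_work(worklist)
```

Messages in worklist.failed are left alone here, since they were not transferred.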

sarracenia.flowcb.work.age

print the age of files written (compare current time to mtime of message.) usage:

flowcb work.age

class sarracenia.flowcb.work.age.Age(options, class_logger=None)[source]

Bases: FlowCB

sarracenia.flowcb.work.rxpipe

class sarracenia.flowcb.work.rxpipe.Rxpipe(options)[source]

Bases: FlowCB

After each file is transferred, write its name to a named pipe.

parameter:

rxpipe_name – the path for the named pipe to write the file names to.

__init__(options)[source]

sarracenia.flowcb.v2wrapper

class sarracenia.flowcb.v2wrapper.V2Wrapper(o)[source]

Bases: FlowCB

__init__(o)[source]

A wrapper class to run v02 plugins. Use run_entry(entry_point, module)

entry_point is a string like ‘on_message’, and module being the one to add.

weird v2 stuff:

when calling init, self is a config/subscriber… when calling on_message, self is a message… that is kind of blown away for each message… parent is the config/subscriber in both cases. so v2 state variables are always stored in parent.

on_time(time)[source]

run plugins for a given entry point.

run_entry(ep, m)[source]

run plugins for a given entry point.

sarracenia.flowcb.v2wrapper.sumstrFromMessage(msg) str[source]

accepts a v3 message as argument msg. returns the corresponding sum string for a v2 ‘sum’ header.