FlowCallback Reference
Documentation of the Flow callback plugins included with sr3.
sarracenia.flowcb
- class sarracenia.flowcb.FlowCB(options, class_logger=None)[source]
Bases:
object
Flow Callback is the main class for implementing plugin customization to flows.
sample activation in a configuration file:
flowCallback sarracenia.flowcb.name.Name
will instantiate an object of that type whose appropriately named methods will be called at the right time.
__init__ accepts options as an argument.
options is a sarracenia.config.Config object, used to override default behaviour
a setting is declared in a configuration file like so:
set sarracenia.flowcb.filter.log.Log.level debug
(the prefix for the setting matches the type hierarchy in flowCallback) the plugin should get the setting:
options.level = 'debug'
worklist given to on_plugins…
worklist.incoming –> new messages to continue processing
worklist.ok –> successfully processed
worklist.rejected –> messages to not be further processed.
worklist.failed –> messages for which processing failed. Failed messages will be retried.
worklist.directories_ok –> list of directories created during processing.
Initially, all messages are placed in incoming. if a plugin entry_point decides:
a message is not relevant, it is moved to the rejected worklist.
all processing has been done, it moves it to the ok worklist
an operation failed and it should be retried later, append it to the failed worklist
Do not remove messages from the worklists entirely; only move them between lists. Rejected messages must be placed in the appropriate worklist so that they can be acknowledged as received. Messages can only be removed after ack.
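The worklist rules above can be sketched as follows. This is a minimal, self-contained illustration, not the library's code: make_worklist and reject_small are hypothetical names, plain dicts stand in for sarracenia messages, and the "size" field used for the decision is an assumption made for the example.

```python
from types import SimpleNamespace


def make_worklist(messages):
    """Build a stand-in worklist with the five lists described above."""
    return SimpleNamespace(incoming=list(messages), ok=[], rejected=[],
                           failed=[], directories_ok=[])


def reject_small(worklist, minimum_size):
    """Move messages below minimum_size from incoming to rejected.

    Messages are never dropped: each one ends up on exactly one list,
    so rejected messages can still be acknowledged later.
    """
    still_incoming = []
    for msg in worklist.incoming:
        if msg.get("size", 0) < minimum_size:
            worklist.rejected.append(msg)
        else:
            still_incoming.append(msg)
    worklist.incoming = still_incoming


worklist = make_worklist([{"relPath": "a.txt", "size": 10},
                          {"relPath": "b.txt", "size": 5000}])
reject_small(worklist, minimum_size=100)
```

The incoming list is rebuilt rather than modified while iterating, which avoids skipping entries.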
def __init__(self,options) -> None:
Task: initialization of the flowCallback at instantiation time. usually contains: self.o = options
def ack(self,messagelist) -> None:
Task: acknowledge messages from a gather source.
def gather(self, messageCountMax) -> (gather_more, messages)
Task: gather messages from a source. Return a tuple (gather_more, messages), where gather_more is a bool indicating whether to continue gathering and messages is a list of messages, or just return a list of messages. In a poll, gather is always called, regardless of vip possession. In all other components, gather is only called when in possession of the vip. return (True, list) OR return list
def after_accept(self,worklist) -> None:
Task: just after messages go through accept/reject masks, operate on worklist.incoming to help decide which messages to process further, and move messages to worklist.rejected to prevent further processing. Do not delete any messages, only move them between worklists.
def after_gather(self,worklist) -> None:
Task: operate on worklist.incoming to help decide which messages to process further. Move messages to worklist.rejected to prevent further processing. Should only really be used for special use cases when message processing needs to be done before going through `filter` of the flow algorithm. Otherwise, after_accept entry point should be used.
def after_work(self,worklist) -> None:
Task: operate on worklist.ok (files which have arrived.) All messages on the worklist.ok list have been acknowledged, so to suppress posting of them, or further processing, the messages must be removed from worklist.ok. worklist.failed processing should occur in here as it will be zeroed out after this step. The flowcb/retry.py plugin, for example, processes failed messages.
def destfn(self,msg) -> str:
Destination File Name (DESTFNSCRIPT) routines. Task: look at the fields in the message, and perhaps settings and return a new file name for the target of the send or download. kind of a last resort function, exists mostly for sundew compatibility. can be used for selective renaming using accept clauses.
def download(self,msg) -> bool:
Task: looking at msg['new_dir'], msg['new_file'], msg['new_inflight_file'] and the self.o options perform a download of a single file. return True on a successful transfer, False otherwise. if self.o.dry_run is set, simulate the output of a download without performing it. This replaces built-in download functionality, providing an override. for individual file transfers. ideally you set checksums as you download.
def metricsReport(self) -> dict:
Return a dictionary of metrics. Example: number of messages remaining in retry queues.
def on_cleanup(self) -> None:
allow plugins to perform additional work after broker resources are eliminated. local state files are still present when this runs.
def on_declare(self) -> None:
allow plugins to perform additional work besides broker resource setup. local state files are already present when this runs.
def on_housekeeping(self) -> None:
do periodic processing.
def on_start(self) -> None:
After the connection is established with the broker and things are instantiated, but before any message transfer occurs.
def on_stop(self) -> None:
what it says on the tin... clean up processing when stopping.
def poll(self) -> list:
Task: gather messages from a destination... return a list of messages. works like a gather, but... When specified, poll replaces the built-in poll of the poll component. it runs only when the machine running the poll has the vip. in components other than poll, poll is never called. return []
def post(self,worklist) -> None:
Task: operate on worklist.ok, and worklist.failed. modifies them appropriately. message acknowledgement has already occurred before they are called. to indicate failure to process a message, append to worklist.failed. worklist.failed processing should occur in here as it will be zeroed out after this step.
def send(self,msg) -> int:
Task: looking at msg['new_dir'], msg['new_file'], and the self.o options perform a transfer of a single file. return >0 on a successful transfer, 0 if failed but temporarily, <0 if failure is permanent. if self.o.dry_run is set, simulate the output of a send without performing it. This replaces built-in send functionality for individual files.
- def please_stop(self):
Pre-warn a flowcb that a stop has been requested, allowing processing to wrap up before the full stop happens.
- __weakref__
list of weak references to the object (if defined)
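The two return shapes accepted from gather, a (gather_more, messages) tuple or a bare list, could be handled by a caller along these lines. This is a sketch only: normalize_gather_result is a hypothetical helper, and treating a bare list as "keep gathering" is an assumption rather than documented behaviour.

```python
def normalize_gather_result(result):
    """Return (gather_more, messages) for either accepted return shape."""
    if isinstance(result, tuple):
        gather_more, messages = result
        return bool(gather_more), list(messages)
    if isinstance(result, list):
        # Assumption: a bare list means "keep gathering from other sources".
        return True, result
    raise TypeError(f"unexpected gather result: {type(result).__name__}")


stop_after_this = normalize_gather_result((False, [{"relPath": "a"}]))
bare_list = normalize_gather_result([])
```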
- sarracenia.flowcb.load_library(factory_path, options)[source]
Loading the entry points for a python module. It searches the normal python module path using the importlib module.
the factory_path is a combined file specification with a dot separator with a special last entry being the name of the class within the file.
factory_path a.b.c.C
means import the module named a.b.c and instantiate an object of type C. In that class-C object, look for the known callback entry points.
If the last element of the path does not start with a capital letter, and so does not follow the Python convention for class names, the class name is guessed from it, as in the examples below.
Note that ~/.config/sr3/plugins will also be in the Python library path, so modules placed there will be found, in addition to those in the package itself in the sarracenia/flowcb directory.
- callback foo -> foo.Foo
sarracenia.flowcb.foo.Foo
- callback foo.bar -> foo.bar.Bar
sarracenia.flowcb.foo.bar.Bar foo.bar sarracenia.flowcb.foo.bar
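The naming rule described above (a.b.c.C names class C in module a.b.c, while foo.bar is guessed to mean foo.bar.Bar) can be sketched as a pure string operation. split_factory_path is a hypothetical helper written for illustration; it covers only the name guessing, not the search through the sarracenia.flowcb package or the plugin directory, and it does not import anything.

```python
def split_factory_path(factory_path):
    """Return (module_name, class_name) for a dotted factory path.

    If the last element starts with a capital letter it is taken as the
    class name; otherwise the class name is guessed by upper-casing the
    first letter of the last module element.
    """
    parts = factory_path.split(".")
    last = parts[-1]
    if last[:1].isupper():
        return ".".join(parts[:-1]), last
    return factory_path, last[:1].upper() + last[1:]


explicit = split_factory_path("a.b.c.C")
guessed = split_factory_path("foo.bar")
```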
sarracenia.flowcb.accept
sarracenia.flowcb.accept modules are ones where the main focus is on the after_accept entry point. This entry point is called after reject & accept rules have been called, and typically after duplicate suppression has also been applied.
They can be used to further refine which files should be downloaded, by moving messages from the worklist.incoming to worklist.rejected.
sarracenia.flowcb.accept.delete
- Plugin delete.py:
the default after_accept handler for log. prints a simple notice. # FIXME is this doc accurate? seems it is also modifying a message.
- Usage:
flowcb sarracenia.flowcb.accept.delete.Delete
sarracenia.flowcb.download
Place holder customized flow callback classes whose focus is the download() entry point.
download(self,msg):
- Task: looking at msg['new_dir'], msg['new_path'], msg['new_inflight_path']
and the self.o options perform a download of a single file. return True on a successful transfer, False otherwise.
if self.o.dry_run is set, simulate the output of a download without performing it.
download is called by the sarra and subscribe components, and can be used to override the built-in methods for downloading a file.
It does the download based on the fields in the message provided:
retrieval path: msg['baseUrl'] + '/' + msg['relPath']
taking care of the inflight setting, the inflight/temporary file is defined for use by the download routine: msg['new_inflight_path']
Final local location to store is defined by:
msg['new_path'] == msg['new_dir'] + '/' + msg['new_file']
This replaces built-in download functionality, providing an override. for individual file transfers. ideally you set checksums as you download.
looking at self.o.identity_method to establish download checksum algorithm. might have to allow for cod… say it is checksum_method:
checksum = sarracenia.identity.Identity.factory(self.o.checksum_method)
while downloading: checksum.update(chunk)
where chunk is the bytes read. Calculating the checksum during the download saves a separate read of the file.
If the checksum does not match what is in the received message, then it is imperative, to avoid looping, to apply the actual checksum of the data to the message:
msg['identity'] = { 'method': checksum_method, 'value': checksum.get_sumstr() }
return Boolean success indicator. if False, download will be attempted again and/or appended to retry queue.
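The checksum-while-downloading idea above can be sketched without the sarracenia classes. In this illustration hashlib.sha512 stands in for the object returned by the identity factory, a hex digest stands in for get_sumstr(), and copy_with_checksum and reconcile_identity are hypothetical helper names; in-memory streams replace the remote source and the inflight file.

```python
import hashlib
import io


def copy_with_checksum(source, destination, chunk_size=8192):
    """Copy source to destination, hashing each chunk as it is written."""
    checksum = hashlib.sha512()  # stand-in for the configured identity method
    while True:
        chunk = source.read(chunk_size)
        if not chunk:
            break
        destination.write(chunk)
        checksum.update(chunk)
    return checksum.hexdigest()


def reconcile_identity(msg, method, actual_value):
    """Overwrite the message identity when the downloaded data differs."""
    advertised = msg.get("identity", {})
    if advertised.get("value") != actual_value:
        msg["identity"] = {"method": method, "value": actual_value}
    return msg


data = b"example payload " * 1000
received = io.BytesIO()
value = copy_with_checksum(io.BytesIO(data), received)
msg = reconcile_identity({"identity": {"method": "sha512", "value": "stale"}},
                         "sha512", value)
```

Hashing inside the copy loop means the data is read once, which is the saving the text describes.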
sarracenia.flowcb.accept.downloadbaseurl
- Plugin downloadbaseurl.py:
Downloads files sourced from the baseUrl of the poster, and saves them in the directory specified in the config. Created to use with the poll_nexrad.py plugin to download files uploaded in the NEXRAD US Weather Radar public dataset. Compatible with Python 3.5+.
Example
- A sample do_download option for subscribe.
Downloads the file located at message['baseUrl'] and saves it
- Usage:
flowcb sarracenia.flowcb.accept.downloadbaseurl.DownloadBaseUrl
sarracenia.flowcb.accept.hourtree
- Plugin hourtree.py:
When receiving a file, insert an hourly directory into the local delivery path hierarchy.
Example
input A/B/c.gif –> output A/B/<hour>/c.gif
- Usage:
flowcb sarracenia.flowcb.accept.hourtree.HourTree
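The path rewrite in the example above (A/B/c.gif becoming A/B/<hour>/c.gif) can be sketched as below. insert_hour_dir is a hypothetical helper; UTC is used here only to keep the example deterministic, and which clock the plugin itself uses is not stated above.

```python
import posixpath
import time


def insert_hour_dir(path, when=None):
    """Insert a two-digit hour directory just before the file name.

    when is seconds since the epoch; None means the current time.
    """
    hour = time.strftime("%H", time.gmtime(when))
    directory, filename = posixpath.split(path)
    return posixpath.join(directory, hour, filename)


at_midnight = insert_hour_dir("A/B/c.gif", when=0)
at_13h = insert_hour_dir("A/B/c.gif", when=13 * 3600 + 5)
```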
sarracenia.flowcb.accept.httptohttps
- Plugin httptohttps.py:
This plugin simply turns messages with baseUrl http://… into https://…
The process needs to be restarted for the plugin to take effect. From then on, every http message consumed is turned into an https message, and the remainder of the process treats the message as if it had been posted that way. This plugin is an easy way to turn transactions between dd.weather.gc.ca and the user into secure https transfers.
- Usage:
flowcb sarracenia.flowcb.accept.httptohttps.HttpToHttps
sarracenia.flowcb.accept.longflow
- Plugin longflow.py:
This plugin is strictly for self-test purposes. Creates a header ‘toolong’ that is longer than 255 characters, and so gets truncated. Each header in a message that is too long should generate a warning message in the sarra log. flow_test checks for the ‘truncated’ error message. Put some utf characters in there to make it interesting… (truncation complex.)
- Usage:
flowcb sarracenia.flowcb.accept.longflow.LongFlow
sarracenia.flowcb.accept.posthourtree
- Plugin posthourtree.py:
When posting a file, insert an hourly directory into the delivery path hierarchy.
Example
input A/B/c.gif –> output A/B/<hour>/c.gif
- Usage:
callback accept.posthourtree
sarracenia.flowcb.accept.postoverride
- Plugin postoverride.py:
Override message header for products that are posted. This can be useful or necessary when re-distributing beyond the original intended destinations.
Example
For example, company A delivers to their own DMZ server (ADMZ). ACME is a client of theirs, and so subscribes to the ADMZ server, but the messages have to_cluster=ADMZ. When ACME downloads, they need to override the destination to specify the distribution within ACME:
* postOverride to_clusters ACME
* postOverrideDel from_cluster
- Usage:
flowcb sarracenia.flowcb.accept.postoverride.PostOverride
postOverride x y
postOverrideDel z
sarracenia.flowcb.accept.printlag
- Plugin printlag.py:
This is an after_accept plugin. It prints a message indicating how old the received messages are. This should be used as an on_part script. For each part received it will print a line in the local log that looks like this:
2015-12-23 22:54:30,328 [INFO] posted: 20151224035429.115 (lag: 1.21364 seconds ago) to deliver: /home/peter/test/dd/bulletins/alphanumeric/20151224/SA/EGGY/03/SAUK32_EGGY_240350__EGAA_64042,
The number printed after "lag:" is the time between the moment the message was originally posted on the server and the time the script was called, which is very near the end of writing the file to local disk.
This can be used to gauge whether the number of instances or internet link are sufficient to transfer the data selected. if the lag keeps increasing, then something must be done.
- Usage:
flowcb sarracenia.flowcb.accept.printlag.PrintLag
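The lag figure described above can be reproduced with a small calculation. This sketch assumes the posting time has the YYYYMMDDHHMMSS.fff form shown in the sample log line and is expressed in UTC; parse_pubtime and lag_seconds are hypothetical helper names, and the receive time is passed in explicitly so the example is repeatable.

```python
from datetime import datetime, timezone


def parse_pubtime(pubtime):
    """Convert a YYYYMMDDHHMMSS.fff string (taken as UTC) to epoch seconds."""
    whole, _, fraction = pubtime.partition(".")
    parsed = datetime.strptime(whole, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)
    seconds = float("0." + fraction) if fraction else 0.0
    return parsed.timestamp() + seconds


def lag_seconds(pubtime, now):
    """Seconds elapsed between posting and the moment now (epoch seconds)."""
    return now - parse_pubtime(pubtime)


posted = "20151224035429.115"
received_at = parse_pubtime("20151224035430.328")
lag = lag_seconds(posted, received_at)
```

A lag that keeps growing from one message to the next is the signal, mentioned above, that more instances or more bandwidth are needed.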
sarracenia.flowcb.accept.rename4jicc
sarracenia.flowcb.accept.renamedmf
- Plugin renamedmf.py:
This is an example of using an after_accept script to rename the product when it is downloaded. It adds a ':datetime' to the name of the product. It is unlikely to be useful to others, except as an example.
Example
takes px name : CACN00_CWAO_081900__WZS_14623:pxatx:CWAO:CA:5:Direct:20081008190602 add datetimestamp : CACN00_CWAO_081900__WZS_14623:pxatx:CWAO:CA:5:Direct:20081008190602:20081008190612
- Usage:
flowcb sarracenia.flowcb.accept.renamedmf.RenameDMF
sarracenia.flowcb.accept.renamewhatfn
- Plugin renamewhatfn.py:
This plugin is no longer needed. Sundew compatibility was added to Sarracenia, so the same effect can now be obtained by using the filename option, which works like it does in Sundew:
filename WHATFN
what it was used for: This renamer strips everything from the first colon in the file name to the end. This does the same thing as a ‘WHATFN’ config on a sundew sender.
Example
takes px name : /apps/dms/dms-metadata-updater/data/international_surface/import/mdicp4d:pull-international-metadata:CMC:DICTIONARY:4:ASCII:20160223124648 rename for : /apps/dms/dms-metadata-updater/data/international_surface/import/mdicp4d
- Usage:
flowcb sarracenia.flowcb.accept.renamewhatfn.RenameWhatFn
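The renaming rule described above (strip everything from the first colon to the end) amounts to a one-line string operation. strip_after_first_colon is a hypothetical name used for illustration; the input is the px name from the example.

```python
def strip_after_first_colon(name):
    """Keep only the part of the name before the first colon."""
    return name.split(":", 1)[0]


px_name = ("/apps/dms/dms-metadata-updater/data/international_surface/import/"
           "mdicp4d:pull-international-metadata:CMC:DICTIONARY:4:ASCII:20160223124648")
renamed = strip_after_first_colon(px_name)
```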
sarracenia.flowcb.accept.save
- Plugin save.py:
Converts a consuming component into one that writes the queue into a file. If there is some sort of problem with a component, then add callback save and restart.
The messages will accumulate in a save file in ~/.cache/<component>/<config>/ ??<instance>.save
When the situation is returned to normal (you want the component to process the data as normal):
* remove the callback save
* note the queue this configuration is using to read (should be in the log on startup.)
* run an sr_shovel with -restore_to_queue and the queue name.
- Options:
save.py takes 'msgSaveFile' as an argument. "_9999.sav" will be added to msgSaveFile, where the 9999 represents the instance number. As instances run in parallel, rather than synchronizing access, each instance just writes to its own file.
- Usage:
callback accept.save msgSaveFile x
sarracenia.flowcb.accept.speedo
- Plugin speedo.py:
Gives a speedometer reading on the messages going through an exchange. As this is an after_accept plugin, it accumulates the number of messages and the bytes they represent over a period of time.
- Options:
msgSpeedoInterval -> how often the speedometer is updated. (default: 5)
msg_speedo_maxlag -> if the message flow indicates that messages are 'late', emit warnings. (default 60)
- Usage:
callback accept.speedo msgSpeedoInterval x msg_speedo_maxlag y
sarracenia.flowcb.accept.sundewpxroute
- Plugin sundewpxroute.py:
Implement message filtering based on a routing table from MetPX-Sundew. Make it easier to feed clients exactly the same products with sarracenia, that they are used to with sundew.
- Usage:
the pxrouting option must be set in the configuration before the plugin is configured, like so:
* pxRouting /local/home/peter/src/pdspx/routing/etc/pxRouting.conf
* pxClient navcan-amis
flowcb sarracenia.flowcb.accept.sundewpxroute.SundewPxRoute
- class sarracenia.flowcb.accept.sundewpxroute.SundewPxRoute(options)[source]
Bases:
FlowCB
- __init__(options)[source]
For World Meteorological Organization message oriented routing. Read the configured metpx-sundew routing table, and figure out which Abbreviated Header Lines (AHL’s) are configured to be sent to ‘target’ being careful to account for membership in clientAliases.
init sets ‘ahls_to_route’ according to the contents of pxrouting
sarracenia.flowcb.accept.testretry
- Plugin testretry.py:
This changes the message randomly so that it will cause a download or send error. When a message is being retried, it is randomly fixed.
- Usage:
flowcb sarracenia.flowcb.accept.testretry.TestRetry
sarracenia.flowcb.accept.toclusters
- Plugin toclusters.py:
Implements inter-pump routing by filtering destinations. This is placed on a sarra process copying data between pumps. Whenever it gets a message, it looks at the destination and processing only continues if it is believed that the local pump is a valid destination for that message.
- Options:
- The local pump will select messages destined for the DD or DDI clusters, and reject those for DDSR, which isn’t in the list.
msgToClusters DDI
msgToClusters DD
- Usage:
flowcb sarracenia.flowcb.accept.toclusters.ToClusters msgToClusters x msgToClusters y …
sarracenia.flowcb.accept.tohttp
sarracenia.flowcb.accept.tolocal
sarracenia.flowcb.accept.tolocalfile
- Plugin tolocalfile.py:
This is a helper script to work with converters (filters) and senders.
What a data pump advertises will usually use a Web URL, but if one is on a server where the files are available, it is more efficient to access them as local files, and so this plugin turns the message's Web URL into a File URL (file:/d1/d2/…/fn)
- Normal Usage:
A Web URL in an amqp message is held in the following values: message['baseUrl'] (ex.: http://localhost) and message['relPath'] (ex.: /<data>/<src>/d3/…/fn)
We will save these values before their modification:
message['saved_baseUrl'] = message['baseUrl']
message['saved_relPath'] = message['relPath']
We will then turn them into an absolute File URL (note: if a baseDir was set, it prefixes the relPath):
message['baseUrl'] = 'file:'
message['relPath'] = [baseDir] + message['relPath']
Example
baseDir /var/www/html
message pubtime=20171003131233.494 baseUrl=http://localhost relPath=/20171003/CMOE/GIF/productx.gif
flowcb sarracenia.flowcb.accept.tolocalfile.ToLocalFile
will receive this:
* message['baseUrl'] is 'http://localhost'
* message['relPath'] is '/20171003/CMOE/GIF/productx.gif'
will copy/save these values
message['saved_baseUrl'] = message['baseUrl']
message['saved_relPath'] = message['relPath']
turn the original values into a File URL
message['baseUrl'] = 'file:'
if parent['baseDir'] :
* message['relPath'] = parent['baseDir'] + '/' + message['relPath']
* message['relPath'] = message['relPath'].replace('//','/')
A sequence of after_accept plugins can perform various changes to the messages and/or to the product, so here let's pretend we have an after_accept plugin that converts gif to png and prepares the proper message for it:
flowcb sarracenia.flowcb.accept.giftopng.GifToPng
After the tolocalfile this script could perform something like:
    # build the absolute path of the png product
    new_path = message['relPath'].replace('GIF','PNG')
    # strings are immutable, so rebuild the path with the new extension
    new_path = new_path[:-4] + '.png'
    # proceed to the conversion gif2png
    ok = self.gif2png(gifpath=message['relPath'],pngpath=new_path)
    # change the message to announce the new png product
    if ok :
        message['baseUrl'] = message['saved_baseUrl']
        message['relPath'] = new_path
        if self.o.baseDir :
            message['relPath'] = new_path.replace(self.o.baseDir,'',1)
    else :
        logger.error(...
    # we are ok... proceed with this png file
- Usage:
flowcb sarracenia.flowcb.accept.tolocalfile.ToLocalFile
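The save-then-rewrite steps described for tolocalfile can be sketched on a plain dict. to_local_file is a hypothetical function written for illustration, not the plugin's code; base_dir plays the role of the baseDir setting, and the message values are the ones from the example.

```python
def to_local_file(message, base_dir=None):
    """Turn a Web URL message into a File URL message, keeping the originals."""
    # save the advertised values so a later plugin can restore them
    message["saved_baseUrl"] = message["baseUrl"]
    message["saved_relPath"] = message["relPath"]
    # rewrite as a File URL, prefixing baseDir when one is set
    message["baseUrl"] = "file:"
    if base_dir:
        combined = base_dir + "/" + message["relPath"]
        message["relPath"] = combined.replace("//", "/")
    return message


message = to_local_file({"baseUrl": "http://localhost",
                         "relPath": "/20171003/CMOE/GIF/productx.gif"},
                        base_dir="/var/www/html")
```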
sarracenia.flowcb.accept.wmotypesuffix
- Plugin wmotypesuffix.py:
Given the WMO-386 TT designator of a WMO file, add a file type suffix to the file name. Web browsers and modern operating systems may do the right thing if files have a recognizable suffix.
Status: proof of concept demonstrator… missing many TT’s. please add! Tested with UNIDATA feeds, discrepancies: TableA says L is Aviation XML, but UNIDATA Feed, it is all GRIB. XW - should be CAP, but is GRIB. IX used by Americans for HDF, unsure if that is kosher/halal/blessed, but it is in the UNIDATA feed.
IU/IS/IB are BUFR other type designators welcome… for example, GRIB isn’t identified yet. default to .txt.
- Usage:
flowcb sarracenia.flowcb.accept.wmotypesuffix.WmoTypeSuffix
sarracenia.flowcb.clamav
A sample on_part plugin to perform virus scanning, using the ClamAV engine.
requires a clamd binding package to be installed. On debian derived systems:
sudo apt-get install python3-pyclamd
on others:
pip3 install pyclamd
author: Peter Silva
- class sarracenia.flowcb.clamav.Clamav(options)[source]
Bases:
FlowCB
Invoke ClamAV anti-virus scanning on files as they pass through a data pump. when it is invoked depends on the component it is used from.
from a sender, post, or poll, the scan should stop processing prior to the transfer.
for other components, subscribers that download, it needs to take place after downloading.
sarracenia.flowcb.destfn.sample
a destfn plugin script is used by senders or subscribers to do complex file naming. this is an API demonstrator that prefixes the name delivered with ‘renamed_’:
filename DESTFNSCRIPT=sarracenia.flowcb.destfn.sample.Sample
An alternative method of invocation is to apply it selectively:
accept k* DESTFNSCRIPT=sarracenia.flowcb.destfn.sample.Sample
As with other flowcb plugins, the import will be done using normal python import mechanism equivalent to:
import sarracenia.flowcb.destfn.sample
and then in that class file, there is a Sample class, the sample class contains the destfn method, or entry_point.
The destfn routine consults the fields in the given message, and based on them, return a new file name for the file to have after transfer (download or send.)
the routines have access to the settings via options provided to init, accessed, by convention, as self.o.
The routine can also modify fields or create new ones in the message.
the destfn routine returns the new name of the file.
sarracenia.flowcb.filter
a module that is focused on transforming data should be called a filter. Filter plugins intended for after_accept(self, worklist) entry_point.
At that point: Messages have been gathered, then passed through the accept and reject pattern matches. One of the first callbacks is the nodupe, so that duplicate suppression may cause additional rejections.
sarracenia.flowcb.filter.deleteflowfiles
sarracenia.flowcb.filter.fdelay
This plugin delays processing of messages by message_delay seconds.
sarracenia.flowcb.msg.fdelay 30
import sarracenia.flowcb.filter.fdelay.Fdelay
or more simply:
fdelay 30
callback filter.fdelay
every message will be at least 30 seconds old before it is forwarded by this plugin. in the meantime, the message is placed on the retry queue by marking it as failed.
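The delay rule above (forward a message only once it is old enough, otherwise mark it failed so it is retried) can be sketched as a split of the incoming list. split_by_age is a hypothetical helper, and the "posted_at" field holding epoch seconds is an assumption made for the example; a real message carries its posting time in a different form.

```python
def split_by_age(incoming, delay, now):
    """Separate messages at least delay seconds old from younger ones.

    Returns (ready, too_young); the second list is what a plugin would
    append to worklist.failed so the messages come back via the retry queue.
    """
    ready, too_young = [], []
    for msg in incoming:
        age = now - msg["posted_at"]
        if age >= delay:
            ready.append(msg)
        else:
            too_young.append(msg)
    return ready, too_young


incoming = [{"relPath": "old", "posted_at": 900.0},
            {"relPath": "new", "posted_at": 990.0}]
ready, too_young = split_by_age(incoming, delay=30, now=1000.0)
```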
sarracenia.flowcb.filter.pclean_f90
msg_pclean_f90 module: file propagation test for Sarracenia components (in flow test) https://github.com/MetPX/sr_insects/
- class sarracenia.flowcb.filter.pclean_f90.PClean_F90(options)[source]
Bases:
PClean
functionality within the flow_tests of the sr_insects project. This plugin class receives a msg from xflow_public and checks propagation of the underlying file
it checks if the propagation was ok
it randomly sets a new test file with a different type in the watch dir (f31 amqp)
it posts the product to be treated by f92
when the msg for the extension file comes back, recheck the propagation
When a product is not fully propagated, the error is reported and the test is considered as a failure. It also checks if the file differs from original
sarracenia.flowcb.filter.pclean_f92
msg_pclean_f92 module: second file propagation test for Sarracenia components (in flow test)
sarracenia.flowcb.filter.wmo2msc
wmo2msc.py is an on_message plugin script to convert WMO bulletins on local disk to MSC internal format in an alternate tree. It is analogous to Sundew’s ‘bulletin-file’. Meant to be called as an sr_shovel plugin.
It prints an output line:
wmo2msc: <input_file> -> <output_file> (<detected format>)
usage:
Use the directory setting to know the root of tree where files are placed. FIXME: well, likely what you really want is something like:
<date>/<source>/dir1/dir2/dir3
<date>/<source>/dir1/dir2/newdir4/...
-- so Directory doesn't cut it.
In a sr_shovel configuration:
directory /....
callback filter.wmo2msc
Parameters:
filter_wmo2msc_replace_dir old,new
filter_wmo2msc_uniquify hash|time|anything else
- whether to add a string in addition to the AHL to make the filename unique.
- hash - means apply a hash, so that the additional string is content based.
- if time, add a suffix _YYYYMMDDHHMMSS_99999 which ensures file name uniqueness.
- otherwise, no suffix will be added.
- default: hash
filter_wmo2msc_convert on|off
if on, then traditional conversion to MSC-BULLETINS is done as per TANDEM/APPS & MetPX Sundew. This involves \n as termination character, and other character substitutions.
filter_wmo2msc_tree on|off
if tree is off, files are just placed in destination directory. if tree is on, then the file is placed in a subdirectory tree, based on the WMO 386 AHL:
TTAAii CCCC YYGGgg ( example: SACN37 CWAO 300104 )
TT = SA - surface observation.
AA = CN - Canada ( but the AA depends on TT value, in many cases not a national code. )
ii = 37 - a number.. there are various conventions, they are picked to avoid duplication.
The first line of the file is expected to contain an AHL. and when we build a tree from it, we build it as follows:
TT/CCCC/GG/TTAAii_CCCC_YYGGgg_<uniquify>
assuming tree=on, uniquify=hash:
SA/CWAO/01/SACN37_CWAO_300104_1c699da91817cc4a84ab19ee4abe4e22
- NOTE: Look at the end of the file for SUPPLEMENTARY INFORMATION
including hints about debugging.
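The tree layout described above (TT/CCCC/GG/TTAAii_CCCC_YYGGgg_<uniquify>) can be sketched from the first line of a bulletin. tree_path is a hypothetical helper for illustration; the use of MD5 for the hash suffix is an assumption, chosen only because the example suffix is 32 hexadecimal characters long, and the bulletin body is made up.

```python
import hashlib


def tree_path(first_line, content):
    """Build TT/CCCC/GG/TTAAii_CCCC_YYGGgg_<hash> from an AHL and the data."""
    ttaaii, cccc, yygggg = first_line.split()[:3]
    tt = ttaaii[:2]      # data type designator, e.g. SA
    gg = yygggg[2:4]     # hour field of the YYGGgg day/hour/minute group
    suffix = hashlib.md5(content).hexdigest()  # assumed hash, content based
    return f"{tt}/{cccc}/{gg}/{ttaaii}_{cccc}_{yygggg}_{suffix}"


bulletin = b"SACN37 CWAO 300104\nMETAR (made-up body for illustration)\n"
path = tree_path("SACN37 CWAO 300104", bulletin)
```

Because the suffix depends only on the content, the same bulletin received twice maps to the same file name.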
- class sarracenia.flowcb.filter.wmo2msc.Wmo2msc(options)[source]
Bases:
FlowCB
- doSpecificProcessing()[source]
Modify bulletins received from Washington via the WMO socket protocol. started as a direct copy from sundew of routine with same name in bulletinManagerWmo.py
encode/decode, and binary stuff came because of python3
- replaceChar(oldchar, newchar)[source]
replace all occurrences of oldchar by newchar in the message byte stream. started as a direct copy from sundew of routine with same name in bulletin.py - the storage model is a bit different, we store the entire message as one bytearray. - sundew stored it as a series of lines, so replaceChar implementation changed.
sarracenia.flowcb.gather
- sarracenia.flowcb.gather.logger = <Logger sarracenia.flowcb.gather (WARNING)>
This file was originally a place holder… thought this was not needed, then there was a stack trace, and setup.py leaves the directory out if not present.
then started finding this convenient for common routines.
sarracenia.flowcb.gather.file
- class sarracenia.flowcb.gather.file.File(options)[source]
Bases:
FlowCB
read the file system, create messages for the files you find.
this is taken from v2’s sr_post.py
FIXME FIXME FIXME FIXME: the sr_post version would post files on the fly as it was traversing trees, so it was incremental and had little overhead. This one does the whole recursion in one gather.
It will fail horribly for large trees. Need to re-formulate to replace recursion with iteration. perhaps a good time to use python iterators.
also should likely switch from listdir to scandir
- gather(messageCountMax)[source]
from sr_post.py/run
FIXME: really bad performance with large trees: It scans an entire tree before emitting any messages. Need to re-factor with iterator style so produce result in batch sized chunks incrementally.
- post1file(path, lstat, is_directory=False) list [source]
create the notification message for a single file, based on the lstat metadata.
when lstat is present it is used to decide whether the file is an ordinary file, a link or a directory, and the appropriate message is built and returned.
if the lstat metadata is None, then that signifies a “remove” message to be created. In the remove case, without the lstat, one needs the is_directory flag to decide whether it is an ordinary file remove, or a directory remove. is_directory is not used other than for the remove case.
The return value is a list that usually contains a single message. It is a list because some combinations of options, such as posting both a symbolic link and the realpath it refers to, may produce multiple messages for a single file. The same applies in the multi-block transfer case.
sarracenia.flowcb.gather.message
- class sarracenia.flowcb.gather.message.Message(options)[source]
Bases:
FlowCB
gather messages from a sarracenia.moth message queuing protocol source.
sarracenia.flowcb.housekeeping
plugins intended for on_message entry_point.
(when messages are received.)
sarracenia.flowcb.housekeeping.resources
Default on_housekeeping handler that: - Logs Memory and CPU usage. - Restarts components to deal with memory leaks.
If MemoryMax is not in the config, it is automatically calculated with the following procedure:
The plugin processes the first MemoryBaseLineFile items to reach a steady state.
- Subscribers process messages-in
- Posting programs process messages-posted
Set MemoryMax threshold to MemoryMultiplier * (memory usage at time steady state)
If memory use ever exceeds the MemoryMax threshold, then the plugin triggers a restart, reducing memory consumption.
Parameters:
- MemoryMax: size (default: none)
Hard coded maximum for tolerable memory consumption. Must be suffixed with k/m/g for Kilo/Mega/Giga byte values. If not set then the following options will have an effect:
- MemoryBaseLineFile: int, optional (default: 100)
How many files to process before measuring to establish the baseline memory usage. (how many files are expected to process before a steady state is reached)
- MemoryMultiplier: int, optional (default: 3)
How many times past the steady state memory footprint you want to allow the component to grow before restarting. It could be normal for memory usage to grow, especially if plugins store data in memory.
- returns:
Nothing, restarts components if memory usage is outside of configured thresholds.
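The threshold selection described above (use MemoryMax when set, otherwise MemoryMultiplier times the steady-state usage) can be sketched as follows. parse_size and memory_threshold are hypothetical helpers, and treating k/m/g as powers of 1024 is an assumption; the text above does not say whether decimal or binary units are used.

```python
def parse_size(text):
    """Convert a size such as '512k', '2m' or '1g' to a number of bytes."""
    units = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}  # assumed binary units
    suffix = text[-1].lower()
    if suffix in units:
        return int(float(text[:-1]) * units[suffix])
    return int(text)


def memory_threshold(memory_max, baseline_bytes, multiplier=3):
    """Return the restart threshold in bytes.

    memory_max is the configured string or None; baseline_bytes is the
    memory usage measured once the steady state has been reached.
    """
    if memory_max is not None:
        return parse_size(memory_max)
    return multiplier * baseline_bytes


computed = memory_threshold(None, baseline_bytes=100_000_000)
configured = memory_threshold("2m", baseline_bytes=100_000_000)
```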
- class sarracenia.flowcb.housekeeping.resources.Resources(options)[source]
Bases:
FlowCB
- restart()[source]
Do an in-place restart of the current process (keeps pid). Gets a new memory stack/heap, keeps all file descriptors but replaces the buffers.
- threshold
Per-process maximum memory footprint that is considered too large, forcing a process restart.
sarracenia.flowcb.log
- class sarracenia.flowcb.log.Log(options)[source]
Bases:
FlowCB
The logging flow callback class. logs message at the indicated time. Controlled by:
logEvents - which entry points to emit log messages at.
logMessageDump - print literal messages when printing log messages.
every housekeeping interval, prints:
how many messages were received, rejected, %accepted.
number of files transferred, their size, and rate in files/s and bytes/s
lag: some information about how old the messages are when processed
sarracenia.flowcb.mdelaylatest
- class sarracenia.flowcb.mdelaylatest.MDelayLatest(options)[source]
Bases:
FlowCB
This plugin delays processing of messages by message_delay seconds. If multiple versions of a file are published within the interval, only the latest one will be published.
mdelay 30
flowcb sarracenia.flowcb.mdelaylatest.MDelayLatest
every message will be at least 30 seconds old before it is forwarded by this plugin. In the meantime, the message is placed on the retry queue by marking it as failed.
sarracenia.flowcb.nodupe
- class sarracenia.flowcb.nodupe.NoDupe(options, class_logger=None)[source]
Bases:
FlowCB
duplicate suppression family of modules.
invoked with:
callback sarracenia.flowcb.nodupe.disk
or: callback sarracenia.flowcb.nodupe.redis
with the default being loaded depending on the presence of a nodupe_driver “redis” setting (defaults to disk).
sarracenia.flowcb.nodupe.data
- class sarracenia.flowcb.nodupe.data.Data(options, class_logger=None)[source]
Bases:
FlowCB
duplicate suppression based on data alone. Overrides the path used for lookups in the cache so that all files have the same name, and so if the checksum is the same, regardless of file name, it is considered a duplicate.
sarracenia.flowcb.nodupe.name
- class sarracenia.flowcb.nodupe.name.Name(options, class_logger=None)[source]
Bases:
FlowCB
Override the comparison so that files with the same name, regardless of what directory they are in, are considered the same. This is useful when receiving data from two different sources (two different trees) and winnowing between them.
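The difference between the Data and Name variants comes down to which lookup key is used. The sketch below illustrates that idea only; the function names, the "data" placeholder path, and the use of a Python set as the cache are all assumptions, not the library's internals.

```python
def cache_key(checksum, path, mode="default"):
    """Return the key used for a duplicate lookup.

    mode 'data': every file shares one placeholder path, so only the
                 checksum distinguishes entries.
    mode 'name': only the file name is compared; directory and checksum
                 are ignored (an interpretation of the description above).
    """
    if mode == "data":
        return (checksum, "data")
    if mode == "name":
        name = path.split("/")[-1]
        return (name, name)
    return (checksum, path)


def is_duplicate(cache, checksum, path, mode="default"):
    """Record the key in `cache` (a set); return True if it was already seen."""
    key = cache_key(checksum, path, mode)
    if key in cache:
        return True
    cache.add(key)
    return False
```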
sarracenia.flowcb.retry
- class sarracenia.flowcb.retry.Retry(options)[source]
Bases:
FlowCB
overall goal:
When file transfers fail, write the messages to a queue to be retried later. There is also a second retry queue for failed posts.
how it works:
the after_accept checks how many incoming messages we received. If there is a full batch to process, don’t try to retry any.
if there is room, then fill in the batch with some retry requests.
when after_work is called, worklist.failed holds the messages for the files whose transfer failed. write those messages to a retry queue.
the DiskQueue or RedisQueue classes are used to store the retries, and they handle expiry on each housekeeping event.
- after_accept(worklist) None [source]
If there are only a few new messages, get some from the download retry queue and put them into worklist.incoming.
Do this in the after_accept() entry point if retry_refilter is False.
- after_post(worklist) None [source]
Messages in worklist.failed should be put in the post retry queue.
- after_work(worklist) None [source]
Messages in worklist.failed should be put in the download retry queue. If there are only a few new messages, get some from the post retry queue and put them into worklist.ok.
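The interplay between after_accept and after_work for the download retry queue can be sketched as follows. This is an in-memory stand-in for illustration only: the class name RetrySketch and the batch parameter are hypothetical, a deque replaces DiskQueue/RedisQueue, and the post retry queue is left out.

```python
from collections import deque
from types import SimpleNamespace


class RetrySketch:
    """In-memory stand-in for the download retry queue (illustration only)."""

    def __init__(self, batch=3):
        self.batch = batch              # hypothetical batch size
        self.download_retry = deque()   # the real plugin uses DiskQueue/RedisQueue

    def after_accept(self, worklist):
        # Top up the batch with queued retries only when there is room.
        room = self.batch - len(worklist.incoming)
        while room > 0 and self.download_retry:
            worklist.incoming.append(self.download_retry.popleft())
            room -= 1

    def after_work(self, worklist):
        # Failed transfers are moved to the retry queue for a later attempt.
        self.download_retry.extend(worklist.failed)
        worklist.failed = []


# Usage: three failures are queued, then two of them fill the next batch.
worklist = SimpleNamespace(incoming=["m1"], ok=[], failed=["f1", "f2", "f3"])
retry = RetrySketch(batch=3)
retry.after_work(worklist)
retry.after_accept(worklist)
```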
sarracenia.flowcb.pclean
msg_pclean module: base module for propagation tests and cleanup for Sarracenia components (in flow test). Used by the https://github.com/MetPX/sr_insects test suite for CI/CD.
- class sarracenia.flowcb.pclean.PClean(options)[source]
Bases:
FlowCB
Base plugin class that is used in shovel pclean_f9x:
it checks whether the propagation was ok.
it randomly sets a test in the watch f40.conf for propagation
it posts the product again (more tests in shovel clean_f91), which is propagated too
it removes the original product
It also uses a file delay to tolerate a maximum lag for the test.
The posted message contains a tag in the header for the test performed, which is the extension used for the test.
- build_path_dict(fxx_dirs, relpath, ext='')[source]
Builds the paths necessary for the pclean tests.
It is a subset of all flow test paths, based on the fxx download directories provided.
- Parameters:
root – usually the sarra dev doc root directory
fxx_dirs – a list of the flow test directory needed
relpath – the relative path of the file (starting with the date) without the forward slash
ext – the extension from the extension test (optional)
- Returns:
a dictionary of all paths built
sarracenia.flowcb.sample
sarracenia.flowcb.scheduled
- class sarracenia.flowcb.scheduled.Scheduled(options, class_logger=<Logger sarracenia.flowcb.scheduled (WARNING)>)[source]
Bases:
FlowCB
Scheduled flow callback plugin arranges to post URLs at scheduled times.
usage:
In the configuration file, need:
callback scheduled
and the schedule can be specified as:
scheduled_interval 1m (once a minute) a duration
scheduled_hour 4,9 at 4Z and 9Z every day.
scheduled_minute 33,45 within scheduled hours which minutes.
scheduled_interval takes precedence over the others, making it easier to specify an interval for testing/debugging purposes.
use in code (for subclassing):
from sarracenia.flowcb.scheduled import Scheduled
class hoho(Scheduled):
replace the gather() routine: keep the top lines (“until time to run”) and replace whatever is below. It will only run when it should.
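How scheduled_hour and scheduled_minute combine into concrete run times can be sketched as below. This is an illustration of the described behaviour, not the plugin's code; the function name next_run is hypothetical, and the scheduled_interval case (which takes precedence) is not covered.

```python
from datetime import datetime, timedelta, timezone


def next_run(now, hours, minutes):
    """Return the first scheduled time strictly after `now` (UTC).

    hours / minutes: non-empty lists of ints, as in
    scheduled_hour 4,9 and scheduled_minute 33,45.
    """
    for day_offset in (0, 1):
        day = (now + timedelta(days=day_offset)).replace(
            hour=0, minute=0, second=0, microsecond=0)
        for h in sorted(hours):
            for m in sorted(minutes):
                candidate = day.replace(hour=h, minute=m)
                if candidate > now:
                    return candidate
    raise ValueError("hours and minutes must not be empty")
```

For example, at 04:40Z with hours 4,9 and minutes 33,45, the next run is 04:45Z the same day; at 09:50Z it is 04:33Z the following day.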
sarracenia.flowcb.script
sarracenia.flowcb.shiftdir2baseurl
- class sarracenia.flowcb.shiftdir2baseurl.ShiftDir2baseUrl(options)[source]
Bases:
FlowCB
modify message to shift directories from relPath to baseUrl:
- given the setting shiftDir2baseUrl == 2 and given a message with
baseUrl=https://a relPath=b/c/d/e subtopic=b/c/d –> baseUrl=https://a/b/c relPath=d/e subtopic=d
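The transformation amounts to moving the leading path elements from one field to the other. A minimal sketch, assuming all three fields are plain strings (the function name and argument names are hypothetical, and the real message fields may be structured differently):

```python
def shift_dir_to_base_url(base_url, rel_path, subtopic, count):
    """Move the first `count` directories of rel_path onto base_url."""
    parts = rel_path.split("/")
    shifted, remaining = parts[:count], parts[count:]
    new_base = "/".join([base_url.rstrip("/")] + shifted)
    new_rel = "/".join(remaining)
    # The subtopic loses the same number of leading elements.
    new_subtopic = "/".join(subtopic.split("/")[count:])
    return new_base, new_rel, new_subtopic
```

Calling it with the values from the example above and count=2 reproduces the stated result.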
sarracenia.flowcb.poll
- class sarracenia.flowcb.poll.Poll(options, class_logger=<Logger sarracenia.flowcb.poll (WARNING)>)[source]
Bases:
FlowCB
The Poll flow callback class implements the main logic for polling remote resources. the poll routine returns a list of messages for new files to be filtered.
when instantiated with options, the options honoured include:
pollUrl - the URL of the server to be polled.
post_baseURL - parameter for messages to be returned. Also used to look up credentials to help subscribers with retrieval.
masks - These are the directories at the pollUrl to poll. derived from the accept/reject clauses, but filtering should happen later. entire directories are listed at this point.
timezone - interpret listings from an FTP server as being in the given timezone (as per pytz)
chmod - used to identify the minimum permissions to accept for a file to be included in a polling result.
identity_method - parameter for how to build identity checksum for messages. as these are usually remote files, the default is typically “cod” (calculate on download)
rename - parameter used to specify the contents of the rename field in the messages built.
options are passed to sarracenia.Transfer classes for their use as well.
Poll uses sarracenia.transfer (ftp, sftp, https, etc…) classes to request lists of files over those protocols using built-in logic.
Internally, Poll normalizes the listings received by placing them into paramiko.SFTPAttributes metadata records (similar to stat records) and builds a sarracenia.Message from them. The poll routine does one pass of this, returning a list of sarracenia.Messages.
To customize:
one can add new sarracenia.transfer protocols, each implementing the ls entry point to be compatible with this polling routine, ideally the entry point would return a list of paramiko.SFTPAttributes for each file in a directory listing. This can be used to implement polling of structured remote resources such as S3 or webdav.
one can deal with different formats of HTTP pages by overriding the handle_data entry point, as done in nasa_mls_nrt.py plugin
for traditional file servers, the listing format should be decipherable with the built-in processing.
sftp file servers provide paramiko.SFTPAttributes naturally which are timezone agnostic.
for some FTP servers, one may need to specify the timezone option to override the UTC default.
If there are problems with date or line formats, one can subclass Poll and override only the on_line routine to deal with that.
- handle_data(data)[source]
routine called from html.parser to deal with a single line. if the line is about a file, then create a new entry for it with the metadata available from SFTPAttributes.
example lines:
- from hpfx.collab.science.gc.ca:
20230113T00Z_MSC_REPS_HGT_ISBL-0850_RLatLon0.09x0.09_PT000H.grib2 2023-01-13 03:49 5.2M
- from https://data.cosmic.ucar.edu/suominet/nrt/ncConus/y2023/
CsuPWVh_2023.011.22.00.0060_nc 11-Jan-2023 23:58 47K
this can be overridden by subclassing to deal with new web sites.
Other web servers put their file indices in a tabular format, where there is a number of cells per row: <tr><td></td><td href=filename>filename</td><td>yyyy-mm-dd hh:mm</td><td>size</td> This handle_data supports both formats… the tabular format is provided by a vanilla apache2 on a debian derived system.
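To make the two example line formats concrete, here is a sketch of how such a line could be split into a name, a modification time, and an approximate size. It is not the handle_data implementation: the function name parse_listing_line is hypothetical, the two date formats are inferred from the example lines only, and K/M/G are assumed to be binary multiples.

```python
from datetime import datetime

# Date layouts inferred from the two example lines above (assumption).
DATE_FORMATS = ("%Y-%m-%d %H:%M", "%d-%b-%Y %H:%M")
SIZE_UNITS = {"K": 1024, "M": 1024 ** 2, "G": 1024 ** 3}


def parse_listing_line(line):
    """Parse 'name date time size' into (name, mtime, size_in_bytes)."""
    name, date, time, size = line.split()
    mtime = None
    for fmt in DATE_FORMATS:
        try:
            mtime = datetime.strptime(f"{date} {time}", fmt)
            break
        except ValueError:
            continue
    if mtime is None:
        raise ValueError(f"unrecognized date in: {line!r}")
    unit = size[-1].upper()
    if unit in SIZE_UNITS:
        # Listing sizes are rounded, so the byte count is approximate.
        nbytes = int(float(size[:-1]) * SIZE_UNITS[unit])
    else:
        nbytes = int(size)
    return name, mtime, nbytes
```

A site with a different layout would need another entry in DATE_FORMATS, or a subclass that overrides the parsing entirely.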
sarracenia.flowcb.poll.airnow
Posts updated files of airnowtech. Compatible with Python 3.5+.
- usage:
in an sr3 poll configuration file:
pollUrl http://files.airnowtech.org/?prefix=airnow/today/
callback airnow
- STATUS: unknown… need some authentication, or perhaps the method has changed.
does not seem to work out of the box.
sarracenia.flowcb.poll.mail
Posts any new emails from an email server, connected to using the specified protocol, either pop3 or imap. The imaplib/poplib implementations in Python use the most secure SSL settings by default: PROTOCOL_TLS, OP_NO_SSLv2, and OP_NO_SSLv3. Compatible with Python 2.7+.
A sample do_poll option for sr_poll. Connects to an email server with the provided credentials and posts all new messages by their msg ID.
- usage:
in an sr_poll configuration file:
pollUrl [imap|imaps|pop|pops]://[user[:password]@]host[:port]/
IMAP over SSL uses 993, POP3 over SSL uses 995, IMAP unsecured uses 143, and POP3 unsecured uses 110.
Full credentials must be in credentials.conf. If port is not specified it’ll default to the ones above based on protocol/ssl setting.
This posts what messages are available. A separate component is needed to download the message, which would need:
callback download.mail_ingest
to process these posts.
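The default-port rule for the pollUrl can be sketched with the standard library. This is only an illustration of the rule stated above; the function name mail_endpoint is hypothetical and credential lookup in credentials.conf is not shown.

```python
from urllib.parse import urlparse

# Defaults as listed above: SSL variants use 993/995, unsecured use 143/110.
DEFAULT_PORTS = {"imap": 143, "imaps": 993, "pop": 110, "pops": 995}


def mail_endpoint(poll_url):
    """Return (protocol, use_ssl, host, port) for a mail pollUrl."""
    url = urlparse(poll_url)
    if url.scheme not in DEFAULT_PORTS:
        raise ValueError(f"unsupported protocol: {url.scheme!r}")
    use_ssl = url.scheme.endswith("s")
    protocol = url.scheme[:-1] if use_ssl else url.scheme
    # An explicit port in the URL wins over the protocol default.
    port = url.port or DEFAULT_PORTS[url.scheme]
    return protocol, use_ssl, url.hostname, port
```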
sarracenia.flowcb.poll.nasa_mls_nrt
sarracenia.flowcb.poll.nexrad
sarracenia.flowcb.poll.noaa_hydrometric
Posts updated files of NOAA water level/temperature hydrometric data. Station site IDs provided in the poll_noaa_stn_file. Compatible with Python 3.5+.
usage: sample url: https://tidesandcurrents.noaa.gov/api/datagetter?range=1&station=9450460&product=water_temperature&units=metric&time_zone=gmt&application=web_services&format=csv
in an sr_poll configuration file:
pollUrl http://tidesandcurrents.noaa.gov/api
retrievePathPattern /datagetter?range=1&station={0:}&product={1:}&units=metric&time_zone=gmt&application=web_services&format=csv
poll_noaa_stn_file [path/to/stn/file]
callback noaa_hydrometric
sample station file:
7|70678|9751639|Charlotte Amalie|US|VI|-4.0
7|70614|9440083|Vancouver|US|WA|-8.0
The poll: If poll_noaa_stn_file isn’t set, it’ll grab an up-to-date version of all station site code data from the NOAA website. The station list file is in the following format:
SourceID | SiteID | SiteCode | SiteName | CountryID | StateID | UTCOffset
Each station is on its own line. Posts the file on the exchange if the request returns a valid URL.
in v2, one needed a matching downloader plugin, but in sr3 we can leverage the retrievePath feature so that the normal downloader works, so only the poll plugin is needed.
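How a station file line and the retrievePathPattern could fit together is sketched below. The helper names are hypothetical, and which value fills each placeholder is an assumption: {0:} is taken to be the station's SiteCode and {1:} the product name, based on the sample URL above.

```python
FIELDS = ("SourceID", "SiteID", "SiteCode", "SiteName",
          "CountryID", "StateID", "UTCOffset")


def parse_station_line(line):
    """Split one line of the station file into a dict keyed by field name."""
    values = [v.strip() for v in line.strip().split("|")]
    if len(values) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields: {line!r}")
    return dict(zip(FIELDS, values))


def retrieve_path(pattern, station, product):
    """Fill the retrievePathPattern placeholders.

    Assumption: {0:} is the station's SiteCode and {1:} the product name.
    """
    return pattern.format(station["SiteCode"], product)
```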
sarracenia.flowcb.poll.usgs
sarracenia.flowcb.post
Place holder… thought this was not needed, then there was a stack trace.
sarracenia.flowcb.send.email
Email Sender
sarracenia.flowcb.send.email.Email is an sr3 sender plugin. It will send the contents of a file in the body of an email to the configured recipient(s). The email subject will be the name of the file being sent.
Usage:
In the config file, include the following line:
callback send.email
Define the email server (required) using the sendTo option, and the sender’s email address (optional) in the config file:
sendTo smtp://email.relay.server.ca
email_from santa@canada.ca
# or, with a "human readable" sender name:
email_from Santa Claus <santa@canada.ca>
Configure recipients using accept statements. You must have at least one recipient per accept statement. Multiple recipients can be specified by separating each address by a comma.
accept .*AACN27.* test@example.com
accept .*SXCN.* user1@example.com, user2@example.com
accept .*CACN.* DESTFN=A_CACN_Bulletin me@ssc-spc.gc.ca,you@ssc-spc.gc.ca,someone@ssc-spc.gc.ca
To change the filename that is sent in the subject, you can use the filename option, a renamer plugin or DESTFN/DESTFNSCRIPT on a per-accept basis. The email_subject_prepend option can be used to add text before the filename in the email subject. For example:
email_subject_prepend Sent by Sarracenia:
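The shape of the resulting email can be sketched with Python's standard email package. This is not the plugin's code: the function name build_email is hypothetical, and joining the prepended text and the file name with a single space is an assumption.

```python
from email.message import EmailMessage


def build_email(filename, body, recipients, email_from=None, subject_prepend=""):
    """Build (but do not send) an email carrying a file's contents in its body."""
    msg = EmailMessage()
    # Assumption: prepended text and file name are separated by one space.
    msg["Subject"] = " ".join(p for p in (subject_prepend, filename) if p)
    if email_from:
        msg["From"] = email_from
    msg["To"] = ", ".join(recipients)
    msg.set_content(body)
    return msg


message = build_email(
    "A_CACN_Bulletin",
    "bulletin text\n",
    ["me@ssc-spc.gc.ca", "you@ssc-spc.gc.ca"],
    email_from="santa@canada.ca",
    subject_prepend="Sent by Sarracenia:",
)
```

Such a message object could then be handed to smtplib for delivery to the server named in sendTo.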
- Future Improvement Ideas:
SMTP on different ports and with authentication
Attach the file instead of putting the contents in the body (useful for binary files)
Original Author: Wahaj Taseer - June, 2019
sarracenia.flowcb.post.message
sarracenia.flowcb.send
plugins intended for on_message entry_point.
(when messages are received.)
sarracenia.flowcb.wistree
sarracenia.flowcb.work
plugins that use primarily the after_work entry point, normally executed after the file transfer (either send or get) has completed.
usually such plugins will contain a loop:
for msg in worklist.ok:
    do_something
to operate on all the files transferred or processed successfully.
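A runnable sketch of that loop is given below. To stay self-contained it does not import sarracenia: the class ReportTransferred is hypothetical, a SimpleNamespace stands in for the worklist, and a real plugin would instead subclass sarracenia.flowcb.FlowCB and be constructed with options.

```python
from types import SimpleNamespace


class ReportTransferred:
    """Illustrative plugin shape; a real plugin would subclass
    sarracenia.flowcb.FlowCB and accept options in __init__."""

    def __init__(self):
        self.seen = []

    def after_work(self, worklist):
        # worklist.ok holds the messages whose transfer succeeded.
        for msg in worklist.ok:
            self.seen.append(msg["relPath"])  # stand-in for do_something


# Stand-in worklist with one successfully transferred file.
worklist = SimpleNamespace(ok=[{"relPath": "a/b.txt"}], failed=[])
plugin = ReportTransferred()
plugin.after_work(worklist)
```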
sarracenia.flowcb.work.age
print the age of files written (compare current time to mtime of message.) usage:
flowcb work.age
sarracenia.flowcb.work.rxpipe
sarracenia.flowcb.v2wrapper
- class sarracenia.flowcb.v2wrapper.V2Wrapper(o)[source]
Bases:
FlowCB
- __init__(o)[source]
A wrapper class to run v02 plugins. Use run_entry(entry_point, module):
entry_point is a string like ‘on_message’, and module being the one to add.
- weird v2 stuff:
when calling init, self is a config/subscriber… when calling on_message, self is a message… that is kind of blown away for each message… parent is the config/subscriber in both cases. so v2 state variables are always stored in parent.