Customize File handling with Callbacks.

All Sarracenia components implement the Flow algorithm, with different callbacks, in the Python programming language. Sarracenia’s main (Python) class is sarracenia.flow, and a great deal of core functionality is implemented using the class created to add custom processing to a flow, the flowcb (flow callback) class.

For a detailed discussion of the flow algorithm itself, have a look at the Concepts manual. For any flow, one can add custom processing at a variety of times during processing by sub-classing the sarracenia.flowcb class.

Briefly, the algorithm has the following steps:

  • __init__(self, options) – when the import happens, traditional python initialization

  • on_start – when an instance is started.

  • loop forever

    • gather – collect messages to be processed, placing them in worklist.incoming

    • poll – another way to collect messages, only in the poll component.

    • filter – apply accept/reject regular expression matches to the message list. Moves messages for files not to be downloaded from worklist.incoming to worklist.rejected

      • after_accept callback entry point. process worklist.incoming, potentially rejecting some more.

    • ack – worklist.rejected messages are acknowledged to upstream source as processing is complete.

    • work – perform a transfer or transformation on a file.

    • ack – worklist.ok messages for successfully transferred files are acknowledged to upstream source.

      • after_work callback entry point

    • ack – worklist.failed messages for files which were not successfully transferred are acknowledged.

    • post – post the result of the work done for the next step.

    • occasionally… on_housekeeping – do periodic cleanups…

  • on_stop – shutdown processing.
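
The ordering of the steps above can be sketched in plain Python. This is a simplified stand-in and not Sarracenia's actual sarracenia.flow code: the Tracer callback, the run_one_pass function, and the rule of rejecting paths ending in .tmp are all hypothetical, chosen only to show when each entry point would fire and how messages move between lists.

```python
from types import SimpleNamespace


class Tracer:
    """Hypothetical callback that records which entry points were called."""

    def __init__(self, options):
        self.o = options
        self.calls = []

    def on_start(self):
        self.calls.append('on_start')

    def after_accept(self, worklist):
        self.calls.append('after_accept')
        # illustrative rule only: reject anything whose path ends in .tmp
        keep = []
        for m in worklist.incoming:
            if m['relPath'].endswith('.tmp'):
                worklist.rejected.append(m)
            else:
                keep.append(m)
        worklist.incoming = keep

    def after_work(self, worklist):
        self.calls.append('after_work')

    def on_stop(self):
        self.calls.append('on_stop')


def run_one_pass(callbacks, gathered):
    """One simplified pass: gather, after_accept, work, after_work."""
    worklist = SimpleNamespace(incoming=list(gathered), ok=[], rejected=[], failed=[])
    for cb in callbacks:
        # an entry point that a callback does not define is simply skipped
        if hasattr(cb, 'after_accept'):
            cb.after_accept(worklist)
    # stand-in for the transfer step: pretend every remaining message succeeded
    worklist.ok, worklist.incoming = worklist.incoming, []
    for cb in callbacks:
        if hasattr(cb, 'after_work'):
            cb.after_work(worklist)
    return worklist


tracer = Tracer(options=None)
tracer.on_start()
result = run_one_pass([tracer], [{'relPath': 'a/b/c.txt'}, {'relPath': 'a/b/d.tmp'}])
tracer.on_stop()
print(tracer.calls)
print(len(result.ok), len(result.rejected))
```

The real flow also acknowledges, posts, and runs housekeeping between these steps; they are left out here to keep the call order visible.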

For more details about the flowcb entry points available, have a look at the source code of the sarracenia.flowcb module.

Let's look at using the class in a configuration:

[1]:
!sr3 remove subscribe/hpfx_amis.conf
!sr3 add subscribe/hpfx_amis.conf
2024-01-12 15:51:55,713 127453 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: {self.batch}
2024-01-12 15:51:55,714 127453 [INFO] root remove removing subscribe/hpfx_amis

add: 2024-01-12 15:51:56,975 127456 [INFO] sarracenia.sr add copying: /home/peter/Sarracenia/sr3/sarracenia/examples/subscribe/hpfx_amis.conf to /home/peter/.config/sr3/subscribe/hpfx_amis.conf

[2]:
!echo messageCountMax 10 >>~/.config/sr3/subscribe/hpfx_amis.conf

This has the flow stop after 10 messages are consumed.

[3]:
!sr3 list fcb
2024-01-12 15:52:22,624 127524 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: {self.batch}
Provided callback classes: ( /home/peter/Sarracenia/sr3/sarracenia )
flowcb/accept/auth_NASA_Earthdata.py flowcb/accept/auth_copernicus.py
flowcb/accept/auth_eumetsat.py   flowcb/accept/dateappend.py
flowcb/accept/delete.py          flowcb/accept/downloadbaseurl.py
flowcb/accept/hourtree.py        flowcb/accept/httptohttps.py
flowcb/accept/longflow.py        flowcb/accept/pathreplace.py
flowcb/accept/posthourtree.py    flowcb/accept/postoverride.py
flowcb/accept/printlag.py        flowcb/accept/rename4jicc.py
flowcb/accept/renamedmf.py       flowcb/accept/renamewhatfn.py
flowcb/accept/save.py            flowcb/accept/speedo.py
flowcb/accept/sundewpxroute.py   flowcb/accept/testretry.py
flowcb/accept/toclusters.py      flowcb/accept/tohttp.py
flowcb/accept/tolocal.py         flowcb/accept/tolocalfile.py
flowcb/accept/trim_legacy_fields.py flowcb/accept/wmotypesuffix.py
flowcb/amserver.py               flowcb/block_reassembly.py
flowcb/clamav.py                 flowcb/destfn/replace.py
flowcb/destfn/sample.py          flowcb/download/mail_ingest.py
flowcb/filter/deleteflowfiles.py flowcb/filter/fdelay.py
flowcb/filter/pclean_f90.py      flowcb/filter/pclean_f92.py
flowcb/filter/wmo2msc.py         flowcb/gather/file.py
flowcb/gather/message.py         flowcb/housekeeping/resources.py
flowcb/log.py                    flowcb/mdelaylatest.py
flowcb/nodupe/data.py            flowcb/nodupe/disk.py
flowcb/nodupe/name.py            flowcb/nodupe/redis.py
flowcb/pclean.py                 flowcb/poll/airnow.py
flowcb/poll/eumetsat.py          flowcb/poll/mail.py
flowcb/poll/nasa_mls_nrt.py      flowcb/poll/nexrad.py
flowcb/poll/noaa_hydrometric.py  flowcb/poll/odata.py
flowcb/poll/poll_NASA_CMR.py     flowcb/poll/rate_limit.py
flowcb/poll/s3bucket.py          flowcb/poll/usgs.py
flowcb/post/message.py           flowcb/report.py
flowcb/retry.py                  flowcb/rootchown.py
flowcb/run.py                    flowcb/rxqueue_gzip.py
flowcb/sample.py                 flowcb/scheduled/wiski.py
flowcb/send/am.py                flowcb/send/email.py
flowcb/shiftdir2baseurl.py       flowcb/trace_on_stop.py
flowcb/v2wrapper.py              flowcb/wistree.py
flowcb/work/age.py               flowcb/work/check.py
flowcb/work/citypage_check.py    flowcb/work/delete.py
flowcb/work/rxpipe.py            flowcb/work/send_egc_les.py

Adding the following line to the configuration means that the posthourtree flowcb subclass (flowcb/accept/posthourtree.py in the list above) will be added to the flow, changing processing by having its routines called, the main one being after_accept.

[4]:
!echo callback accept.posthourtree >>~/.config/sr3/subscribe/hpfx_amis.conf
[5]:
!sr3 foreground subscribe/hpfx_amis.conf
2024-01-12 15:52:39,635 127558 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: {self.batch}
.2024-01-12 15:52:40,036 [INFO] 127562 sarracenia.config finalize overriding batch for consistency with messageCountMax: {self.batch}
2024-01-12 15:52:40,038 [INFO] 127562 sarracenia.config finalize overriding batch for consistency with messageCountMax: {self.batch}
2024-01-12 15:52:40,038 [INFO] 127562 sarracenia.flow loadCallbacks flowCallback plugins to load: ['sarracenia.flowcb.gather.message.Message', 'sarracenia.flowcb.retry.Retry', 'sarracenia.flowcb.housekeeping.resources.Resources', 'accept.posthourtree', 'log']
2024-01-12 15:52:40,041 [INFO] 127562 sarracenia.flowcb.log __init__ subscribe initialized with: logEvents: {'after_accept', 'on_housekeeping', 'after_work', 'after_post', 'post'},  logMessageDump: False
2024-01-12 15:52:40,042 [INFO] 127562 sarracenia.flow run callbacks loaded: ['sarracenia.flowcb.gather.message.Message', 'sarracenia.flowcb.retry.Retry', 'sarracenia.flowcb.housekeeping.resources.Resources', 'accept.posthourtree', 'log']
2024-01-12 15:52:40,042 [INFO] 127562 sarracenia.flow run pid: 127562 subscribe/hpfx_amis instance: 0
2024-01-12 15:52:40,441 [INFO] 127562 sarracenia.moth.amqp _queueDeclare queue declared q_anonymous_subscribe.hpfx_amis.68942404.82515581 (as: amqps://anonymous@hpfx.collab.science.gc.ca/), (messages waiting: 0)
2024-01-12 15:52:40,441 [INFO] 127562 sarracenia.moth.amqp getSetup binding q_anonymous_subscribe.hpfx_amis.68942404.82515581 with v02.post.*.WXO-DD.bulletins.alphanumeric.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca/)
2024-01-12 15:52:40,487 [INFO] 127562 sarracenia.flow run now active on vip ['AnyAddressIsFine']
2024-01-12 15:52:44,644 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:44,644 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:44,644 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:44,644 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 2.82 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRCN40_KWAL_122052___32878
2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 6.28 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SO/KWNB/20/SOVD83_KWNB_121900_RRX__26978
2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 6.27 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SM/KWNB/20/SMVD20_KWNB_121800_RRX__52297
2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 6.27 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SM/KWNB/20/SMVD20_KWNB_121800_RRX__48301
2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 3.97 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRCN40_KWAL_122052___22701
2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 3.97 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SN/KWNB/20/SNVD20_KWNB_121900_RRX__17918
2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flow do_download missing destination directories, makedirs: /tmp/hpfx_amis/20
2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_122052___32878
2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SOVD83_KWNB_121900_RRX__26978
2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SMVD20_KWNB_121800_RRX__52297
2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SMVD20_KWNB_121800_RRX__48301
2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_122052___22701
2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SNVD20_KWNB_121900_RRX__17918
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 1.53 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRMN70_KWAL_122052___39567
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 1.53 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRCN40_KWAL_122052___5395
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 0.52 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRWA20_KWAL_122052___2278
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 2.14 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SX/KWAL/20/SXCN40_KWAL_122052___60928
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 2.14 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRME20_KWAL_122052___58721
2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 2.14 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SN/KWNB/20/SNVD22_KWNB_121900_RRX__60599
2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRMN70_KWAL_122052___39567
2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_122052___5395
2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRWA20_KWAL_122052___2278
2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SXCN40_KWAL_122052___60928
2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRME20_KWAL_122052___58721
2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SNVD22_KWNB_121900_RRX__60599
2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flow please_stop ok, telling 5 callbacks about it.
2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flow run starting last pass (without gather) through loop for cleanup.
2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.flow please_stop ok, telling 5 callbacks about it.
2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.flow run on_housekeeping pid: 127562 subscribe/hpfx_amis instance: 0
2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.flowcb.gather.message on_housekeeping messages: good: 12 bad: 0 bytes: 1.6 KiB average: 140 Bytes
2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.flowcb.retry on_housekeeping on_housekeeping
2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping work_retry_00 on_housekeeping
2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping No retry in list
2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000168
2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping post_retry_000 on_housekeeping
2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping No retry in list
2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000106
2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.housekeeping.resources on_housekeeping Current Memory cpu_times: user=0.44 system=0.07
2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.housekeeping.resources on_housekeeping Current mem usage: 92.3 MiB, accumulating count (12 or 12/100 so far) before self-setting threshold
2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.log stats version: 3.00.51rc5, started: 5 seconds ago, last_housekeeping:  5.7 seconds ago
2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.log stats messages received: 12, accepted: 12, rejected: 0   rate accepted: 100.0% or 2.1 m/s
2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.log stats files transferred: 12 bytes: 2.2 KiB rate: 386 Bytes/sec
2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.log stats lag: average: 3.30, maximum: 6.28
2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.log on_housekeeping housekeeping
2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flow run clean stop from run loop
2024-01-12 15:52:45,763 [INFO] 127562 sarracenia.flowcb.gather.message on_stop closing
2024-01-12 15:52:45,764 [INFO] 127562 sarracenia.flow close flow/close completed cleanly pid: 127562 subscribe/hpfx_amis instance: 0

Without the plugin, the download would put all files directly in the reception directory, /tmp/hpfx_amis. With the addition of the posthourtree callback, each file is placed in a sub-directory named for the current hour, as seen in the log above (/tmp/hpfx_amis/20).

Config File Entries and Callbacks

flowcb.log

To add a callback to a flow, a line is added to the flow’s configuration file:

flowcb sarracenia.flowcb.log.Log

If you follow the convention, and the name of the class is a capitalized version (Log) of the file name (log), then a shorthand is available:

callback log

Either way it is done, it will cause Sarracenia to import the class and then look for entry points in the class to call at appropriate times.

The class constructor accepts a sarracenia.config.Config class object, called options, that stores all the settings to be used by the running flow. Options is used to override default behaviour of both flows and callbacks. The argument to the flowcb is a standard python class that needs to be in the normal python path for python import, and the last element is the name of the class within the file that needs to be instantiated as a flowcb instance.

A setting for a callback is declared as follows:

set sarracenia.flowcb.filter.log.Log.logLevel debug

(the prefix for the setting matches the type hierarchy in flowCallback)

When the constructor for the callback is called, its options argument will contain:

options.logLevel = 'debug'

If no module specific override is present, then the more global setting is used.

So usage of callbacks can be made without much python knowledge at all, just the ability to create configuration files.

Beyond this point, we find advice for people who want to write their own callbacks in Python. Callbacks are ordinary Python, with a few wrinkles:

Writing Your Own Callbacks

A flow callback is a python class built with routines named to indicate when the programmer wants them to be called. To do that, create a class which subclasses sarracenia.flowcb.FlowCB, so the file will normally have:

from sarracenia.flowcb import FlowCB

in among the imports near the top of the file. In the main part of the file, there will be the custom callback classes:

class Myclass(FlowCB):

declared as a subclass of FlowCB. The main routines in the class are entry points that will be called at the time their name implies. If a class is missing a given entry point, it will just not be called. The __init__() method is used to initialize things for the callback class:

def __init__(self, options):

    self.o = options

    logging.basicConfig(format=self.o.logFormat,
                        level=getattr(logging, self.o.logLevel.upper()))
    logger.setLevel(getattr(logging, self.o.logLevel.upper()))

    self.o.add_option( 'myoption', 'str', 'usuallyThis')

The logging setup lines in __init__ allow setting a specific logging level for this flowCallback class. Once the logging boiler-plate is done, the add_option routine is used to define settings for the class. Users can include them in configuration files, just like built-in options:

myoption IsReallyNeeded

The result of such a setting is that self.o.myoption = 'IsReallyNeeded'. If no value is set in the configuration, self.o.myoption will default to 'usuallyThis'. There are various kinds of options, where the declared type modifies the parsing:

'count'    integer count type.
'duration' a floating point number indicating a quantity of seconds (0.001 is 1 millisecond)
           modified by a unit suffix ( m-minute, h-hour, w-week )
'flag'     boolean (True/False) option.
'list'     a list of string values, each succeeding occurrence catenates to the total.
           all v2 plugin options are declared of type list.
'size'     integer size. Suffixes k, m, and g for kilo, mega, and giga (base 2) multipliers.
'str'      an arbitrary string value, as with all of the above types, each
           succeeding occurrence overrides the previous one.
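
To make the suffix rules concrete, here is a minimal sketch of how values of type duration and size could be interpreted. The functions parse_duration and parse_size are hypothetical helpers written for this illustration; they are not Sarracenia's own parser, and they cover only the suffixes listed above.

```python
# Hypothetical helpers, for illustration only (not part of Sarracenia).
DURATION_UNITS = {'m': 60, 'h': 3600, 'w': 7 * 24 * 3600}
SIZE_UNITS = {'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3}


def parse_duration(text):
    """Return seconds as a float; a trailing m, h or w scales the number."""
    text = text.strip().lower()
    if text and text[-1] in DURATION_UNITS:
        return float(text[:-1]) * DURATION_UNITS[text[-1]]
    return float(text)


def parse_size(text):
    """Return an integer size; trailing k, m or g are base-2 multipliers."""
    text = text.strip().lower()
    if text and text[-1] in SIZE_UNITS:
        return int(float(text[:-1]) * SIZE_UNITS[text[-1]])
    return int(text)


print(parse_duration('0.001'))  # one millisecond, expressed in seconds
print(parse_duration('5m'))     # five minutes, expressed in seconds
print(parse_size('2k'))         # two kibibytes, expressed in bytes
```

Note that the same letter m means minutes for a duration and mega for a size, which is why the declared type has to drive the parsing.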

Worklists

Besides options, the other main argument to after_accept and after_work callback routines is the worklist. The worklist is given to entry points occurring during message processing, and consists of several lists of messages:

worklist.incoming --> messages to process (either new or retries.)
worklist.ok       --> successfully processed
worklist.rejected --> messages to not be further processed.
worklist.failed   --> messages for which processing failed.
                      failed messages will be retried.
worklist.directories_ok --> list of directories created during processing.

Initially, all messages are placed in worklist.incoming. If a plugin decides:

  • a message is not relevant, move it to the rejected worklist.

  • no further processing of the message is needed, move it to the ok worklist.

  • an operation failed and it should be retried later, move it to the failed worklist.

Do not remove messages from all lists; only move them between the worklists. It is necessary to put rejected messages in the appropriate worklist so that they are acknowledged as received. Messages can only be removed after the acknowledgement has been taken care of.
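
As a rough illustration of moving messages rather than deleting them, the sketch below sorts a small incoming list onto the other worklists. The worklist is imitated with a SimpleNamespace and the messages with plain dictionaries; the .tmp rule and the size field are assumptions made up for this example, not Sarracenia behaviour.

```python
from types import SimpleNamespace


def after_accept(worklist):
    """Sort incoming messages onto the other worklists instead of deleting them."""
    still_incoming = []
    for msg in worklist.incoming:
        if msg['relPath'].endswith('.tmp'):
            # not relevant: reject it, so that it is still acknowledged
            worklist.rejected.append(msg)
        elif msg.get('size', 0) == 0:
            # nothing to transfer: treat it as already done
            worklist.ok.append(msg)
        else:
            still_incoming.append(msg)
    worklist.incoming = still_incoming


worklist = SimpleNamespace(
    incoming=[
        {'relPath': 'a/b/c.txt', 'size': 12},
        {'relPath': 'a/b/c.tmp', 'size': 12},
        {'relPath': 'a/b/empty.txt', 'size': 0},
    ],
    ok=[], rejected=[], failed=[])

after_accept(worklist)
for name in ('incoming', 'ok', 'rejected', 'failed'):
    print(name, [m['relPath'] for m in getattr(worklist, name)])
```

Every message that entered the routine is still on exactly one list afterwards, so each one can be acknowledged.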

Logging

Python has great built-in logging, and one just has to use the module in a normal, pythonic way, with:

import logging

After all imports in your python source file, then define a logger for the source file:

logger = logging.getLogger(__name__)

As is normal with the Python logging module, messages can then be posted to the log:

logger.debug('got here')

Each message in the log will be prefixed with the class and routine emitting the log message, as well as the date/time.
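
The prefixing can be tried outside of a flow with the standard logging module alone. In the sketch below the format string is an assumption modeled on the log lines shown earlier (it is not read from Sarracenia), the logger name mycallback stands in for what __name__ would give in a real module, and the output is captured in a string so it can be inspected.

```python
import io
import logging

logger = logging.getLogger('mycallback')  # a real module would pass __name__

# Assumed format, modeled on the log output shown earlier in this document.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter(
    '%(asctime)s [%(levelname)s] %(process)d %(name)s %(funcName)s %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep the example's output out of the root logger


def after_accept(worklist):
    logger.debug('got here')


after_accept(None)
line = stream.getvalue().strip()
print(line)
```

The logger name and the name of the calling routine are filled in by the logging module itself, so the callback code only supplies the message text.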

Sample Flow Callback Class

With the above information about option handling, worklists, and logging, we are ready to understand the posthourtree module we just used. As a very simple example, the source code of the callback used above is given below:

[ ]:
"""
Plugin posthourtree.py:
    When posting a file, insert an hourly directory into the delivery path hierarchy.

Example:
    input A/B/c.gif  --> output A/B/<hour>/c.gif

Usage:
    callback accept.posthourtree

"""
import logging
import sys, os, os.path, time, stat
from sarracenia.flowcb import FlowCB

logger = logging.getLogger(__name__)


class Posthourtree(FlowCB):

    def after_accept(self, worklist):
        for message in worklist.incoming:
            datestr = time.strftime('%H', time.gmtime())  # pick the hour
            # insert the hour into the rename header of the message to be posted.
            message['new_dir'] += '/' + datestr
            logger.info(  f"post_hour_tree: new_dir: {message['new_dir']}" )
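
The path change made by this callback can be tried without a running flow. The sketch below repeats the same manipulation as a plain function on a dictionary standing in for a message; the add_hour_dir name and its now parameter are hypothetical, added only so that the result is reproducible.

```python
import time
from types import SimpleNamespace


def add_hour_dir(worklist, now=None):
    """Same path change as Posthourtree.after_accept, as a plain function."""
    # now=None means the current time, as in the callback itself
    hour = time.strftime('%H', time.gmtime(now))
    for message in worklist.incoming:
        message['new_dir'] += '/' + hour


worklist = SimpleNamespace(incoming=[{'new_dir': '/tmp/hpfx_amis', 'new_file': 'c.gif'}])
# 20 * 3600 seconds after the epoch is 20:00 UTC, so the hour directory is "20"
add_hour_dir(worklist, now=20 * 3600)
print(worklist.incoming[0]['new_dir'])
```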

Sample Flowcb Sub-Class

This wistree.py class shows more aspects of the callback API, with an __init__ routine, an external python module, and fields added to the messages. The Wistree class accepts files whose names begin with AHLs (World Meteorological Organization Abbreviated Header Lines for meteorological products), and renames the directory tree to a different standard, the evolving one for the WMO WIS 2.0 (for more information on that module: https://github.com/wmo-im/GTStoWIS2)

[6]:
from sarracenia.flowcb import FlowCB
import logging
import os  # needed for os.sep below
import GTStoWIS2

logger = logging.getLogger(__name__)


class Wistree(FlowCB):

    def __init__(self, options):

        if hasattr(options, 'logLevel'):
            logger.setLevel(getattr(logging, options.logLevel.upper()))
        else:
            logger.setLevel(logging.INFO)
        self.topic_builder=GTStoWIS2.GTStoWIS2()
        self.o = options


    def after_accept(self, worklist):

        new_incoming=[]

        for msg in worklist.incoming:

            # fix file name suffix.
            type_suffix = self.topic_builder.mapAHLtoExtension( msg['new_file'][0:2] )
            tpfx=msg['subtopic']

            # input has relpath=/YYYYMMDDTHHMM/... + pubTime
            # need to move the date from relPath to BaseDir, adding the T hour from pubTime.
            try:
                new_baseSubDir=tpfx[0]+msg['pubTime'][8:11]
                t='.'.join(tpfx[0:2])+'.'+new_baseSubDir
                new_baseDir = msg['new_dir'] + os.sep + new_baseSubDir
                new_relDir = 'WIS' + os.sep + self.topic_builder.mapAHLtoTopic(msg['new_file'])
                new_dir = new_baseDir + os.sep + new_relDir

                if msg['new_file'][-len(type_suffix):] != type_suffix:
                    new_file = msg['new_file']+type_suffix
                else:
                    new_file = msg['new_file']

                msg.updatePaths( self.o, new_baseDir + os.sep + new_relDir, new_file )
            except Exception as ex:
                logger.error( "skipped" , exc_info=True )
                worklist.failed.append(msg)
                continue

            msg['_deleteOnPost'] |= set( [ 'from_cluster', 'sum', 'to_clusters' ] )
            new_incoming.append(msg)

        worklist.incoming=new_incoming


Plugins That Change How a File is Downloaded

The after_accept routine is one of the two most common ones in use. It is used to change processing prior to a file being downloaded or sent. To process the file after it has been downloaded, the after_work entry point is used to process the worklist.ok (files that were successfully downloaded) list.

The after_accept routine has an outer loop that cycles through the entire list of incoming messages. It builds a new list of incoming messages from the ones it accepts, while appending all the rejected ones to worklist.failed. The list is just a list of messages, where each message is a python dictionary with all the fields stored in a v03 format message. In the message there are, for example, baseURL and relPath fields:

  • baseURL - the baseURL of the resource from which a file would be obtained.

  • relPath - the relative path to append to the baseURL to get the complete download URL.

This is happening before the transfer (download, send, or processing) of the file has occurred, so one can change the behaviour by modifying fields in the message. Normally, the download paths (called new_dir and new_file) will reflect the intent to mirror the original source tree. So if you have a/b/c.txt on the source tree, and are downloading into directory mine on the local system, then new_dir would be mine/a/b and new_file would be c.txt.
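
The mirroring rule in that example can be written out as a few lines of Python. This is only an illustration of the relationship between the paths: mirror_paths is a hypothetical helper, and Sarracenia computes these fields itself before after_accept is called.

```python
import posixpath


def mirror_paths(rel_path, download_dir):
    """Split a source-relative path into the (new_dir, new_file) pair that mirrors it."""
    sub_dir, new_file = posixpath.split(rel_path)
    new_dir = posixpath.join(download_dir, sub_dir)
    return new_dir, new_file


new_dir, new_file = mirror_paths('a/b/c.txt', 'mine')
print(new_dir, new_file)
```

A callback that wants a different layout changes new_dir and new_file away from these mirrored defaults.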

Plugins that Process a file after it is Downloaded

A common use case is for plugins with an after_work entry point to read the file after it is downloaded and transform it into some derived product with a different name. So the new file is created as in the previous section. The message for the downloaded file still needs to be moved onto a list to ensure that it is acknowledged to the broker. Such an entry point would look like this:

[9]:

    def after_work(self, worklist):

        new_ok = []
        for m in worklist.ok:
            # do_something() is a placeholder for the transformation of the downloaded file.
            success = do_something()
            if success:
                new_ok.append(m)
            # otherwise, since it is already acknowledged, we can just drop it from ok.

        worklist.ok = new_ok
        # the messages on worklist.ok will get posted in the next algorithm phase.
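
For a version that can be run on its own, the sketch below replaces the placeholder with a concrete, made-up transformation: make_derived_product writes an upper-cased copy of the downloaded file next to it. The worklist is imitated with a SimpleNamespace and the messages with dictionaries carrying only new_dir and new_file; none of this is Sarracenia's own code.

```python
import os
import tempfile
from types import SimpleNamespace


def make_derived_product(path):
    """Hypothetical transformation: write an upper-cased copy next to the file."""
    try:
        with open(path) as src, open(path + '.upper', 'w') as dst:
            dst.write(src.read().upper())
        return True
    except OSError:
        return False


def after_work(worklist):
    new_ok = []
    for m in worklist.ok:
        path = os.path.join(m['new_dir'], m['new_file'])
        if make_derived_product(path):
            new_ok.append(m)
    worklist.ok = new_ok


with tempfile.TemporaryDirectory() as d:
    with open(os.path.join(d, 'c.txt'), 'w') as f:
        f.write('hello')
    worklist = SimpleNamespace(ok=[
        {'new_dir': d, 'new_file': 'c.txt'},        # downloaded file exists
        {'new_dir': d, 'new_file': 'missing.txt'},  # simulated failure
    ])
    after_work(worklist)
    with open(os.path.join(d, 'c.txt.upper')) as f:
        derived = f.read()

print(derived, [m['new_file'] for m in worklist.ok])
```

Only the message whose transformation succeeded stays on worklist.ok, so only that one would be posted in the next phase.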

Plugins that Rename Files

The Wistree plugin above changes the layout of the files that are to be downloaded, based on the GTStoWIS2 class, which prescribes a different directory tree on output. There are a lot of fields to update when changing file placement, so it is best to use:

msg.updatePaths( self.o, new_dir, new_file )

to update all necessary fields in the message properly. It will update ‘new_baseURL’, ‘new_relPath’, ‘new_subtopic’ for use when posting.

The try/except part of the routine deals with files whose names cannot be mapped to a topic tree. In that case a python exception may occur; the message is then added to the failed worklist, and will not be processed by later plugins.

Plugins That Create New Files

The routine above is perfect when a file is just renamed. If a plugin needs to create new files only vaguely derived from the input file, then you want to create new messages for these files from scratch:

import sarracenia

m = sarracenia.Message.fromFileData(sample_fileName, self.o, os.stat(sample_fileName) )

The fromFileData routine will use self.o to apply the appropriate posting settings. No knowledge of message formats, or construction of fields, is needed. If the file is not local, such as when writing a poll callback, an alternate routine can be used:

m = sarracenia.Message.fromFileInfo(sample_fileName, self.o, fake_stat_info )

The fake stat record (as per the stat(2) man page or python os.stat() ) can be built from other fields, starting with:

import paramiko

fake_stat = paramiko.SFTPAttributes()
fake_stat.st_mtime = ...  # something else... perhaps an http header?
fake_stat.st_size = ...   # again will vary by context.
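
As one possible way to fill in such a record, the sketch below derives the two fields from HTTP response headers. It is an illustration under assumptions: SimpleNamespace stands in for paramiko.SFTPAttributes so that the example runs without paramiko, stat_from_http_headers is a hypothetical helper, and the header values are invented sample data.

```python
from email.utils import parsedate_to_datetime
from types import SimpleNamespace


def stat_from_http_headers(headers):
    """Build a minimal stat-like record from HTTP response headers."""
    fake_stat = SimpleNamespace()
    # Last-Modified is an HTTP date; convert it to seconds since the epoch
    fake_stat.st_mtime = parsedate_to_datetime(headers['Last-Modified']).timestamp()
    fake_stat.st_size = int(headers['Content-Length'])
    return fake_stat


# invented sample headers, as a server might return for a remote file
headers = {
    'Last-Modified': 'Fri, 12 Jan 2024 20:52:00 GMT',
    'Content-Length': '140',
}
fake_stat = stat_from_http_headers(headers)
print(fake_stat.st_mtime, fake_stat.st_size)
```

A poll callback that lists files over another protocol would fill the same two attributes from whatever that protocol reports.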

Either way, once you have the message, it can be appended to the incoming list.

Other Examples

Subclassing of sarracenia.flowcb is used internally to do a lot of core work. It’s a good idea to look at the sarracenia source code itself. For example:

  • sarracenia.flowcb: have a look at the __init__.py file in there, which provides this information in a more programmatically succinct format.

  • sarracenia.flowcb.gather.file is a class that implements file posting and directory watching, in the sense of a callback that implements the gather entry point, by reading a file system and building a list of messages for processing.

  • sarracenia.flowcb.gather.message is a class that implements reception of messages from message queue protocol flows.

  • sarracenia.flowcb.nodupe is a module that removes duplicates from message flows based on Identity checksums.

  • sarracenia.flowcb.post.message is a class that implements posting messages to Message queue protocol flows

  • sarracenia.flowcb.retry handles the case where the transfer of a file fails: Sarracenia needs to persist the relevant message to a state file for a later time when it can be tried again.
