Writing FlowCallback Plugins

All Sarracenia components implement the Flow algorithm. Sarracenia's main class is sarracenia.flow, and a great deal of core functionality is implemented using the class created to add custom processing to a flow: the flowcb (flow callback) class.

For a detailed discussion of the flow algorithm itself, have a look at the Concepts manual. For any flow, one can add custom processing at a variety of times during processing by sub-classing the sarracenia.flowcb class.

Briefly, the algorithm has the following steps:

  • gather – passively collect notification messages to be processed.

  • poll – actively collect notification messages to be processed.

  • filter – apply accept/reject regular expression matches to the notification message list.

    • after_accept callback entry point

  • work – perform a transfer or transformation on a file.

    • after_work callback entry point

  • post – post the result of the work done for the next step.

A flow callback is a python class whose routines are named to indicate when the programmer wants them to be called.

There are a number of examples of flow callback classes included with Sarracenia, listed in the Flowcallback Reference, that can be used as the basis for building custom ones.

This guide describes how to build flow callback classes from scratch.

Config File Entries to use Flow_Callbacks

To add a callback to a flow, a line is added to the config file:

flowcb sarracenia.flowcb.log.Log

If you follow the convention, and the name of the class is a capitalized version (Log) of the file name (log), then a shorthand is available:

callback log

The class constructor accepts a sarracenia.config.Config class object, called options, that stores all the settings to be used by the running flow. Options is used to override the default behaviour of both flows and callbacks. The argument to the flowcb option is a standard python class path: the leading elements name a module that must be findable via the normal python import path, and the last element is the name of the class within that module to be instantiated as a flowcb instance.

A setting for a callback is declared as follows:

set sarracenia.flowcb.filter.log.Log.logLevel debug

(the prefix for the setting matches the type hierarchy of the flow callback class)

When the constructor for the callback is called, its options argument will contain:

options.logLevel = 'debug'

If no module specific override is present, then the more global setting is used.
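For example, with both of the following lines in a configuration file, the Log callback logs at debug level while the rest of the flow logs at info:

```
logLevel info
set sarracenia.flowcb.filter.log.Log.logLevel debug
```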

Worklists

Besides options, the other main argument to the after_accept and after_work callback routines is the worklist. The worklist is given to entry points occurring during notification message processing, and consists of several lists of notification messages:

worklist.incoming --> notification messages to process (either new or retries.)
worklist.ok       --> successfully processed
worklist.rejected --> notification messages to not be further processed.
worklist.failed   --> notification messages for which processing failed.
                      failed notification messages will be retried.

Initially, all notification messages are placed in worklist.incoming. If a plugin decides:

  • a notification message is not relevant, move it to the rejected worklist.

  • no further processing of the notification message is needed, move it to the ok worklist.

  • an operation failed and it should be retried later, move it to the failed worklist.

Do not remove notification messages from all lists; only move them between the worklists. It is necessary to put rejected notification messages in the appropriate worklist so that they are acknowledged as received. Messages can only be removed after the acknowledgement has been taken care of.

Logging

Python has great built-in logging, and one just uses the module in a normal, pythonic way, with:

import logging

After all imports in your python source file, then define a logger for the source file:

logger = logging.getLogger(__name__)

As is normal with the Python logging module, notification messages can then be posted to the log:

logger.debug('got here')

Each notification message in the log will be prefixed with the class and routine emitting the log notification message, as well as the date/time.

One can also implement a per-module override to log levels. See sarracenia/moth/amqp.py as an example. For that module, the notification message logging level is raised to warning by default. One can override it with a config file setting:

set sarracenia.moth.amqp.AMQP.logLevel info

In the __init__(self, options) function of the callback, include the lines:

me = "%s.%s" % ( __class__.__module__, __class__.__name__ )
if 'logLevel' in self.o['settings'][me]:
    logger.setLevel( self.o['logLevel'].upper() )

Initialization and Settings

The next step is declaring a class:

class Myclass(FlowCB):

as a subclass of FlowCB. The main routines in the class are entry points that will be called at the times their names imply. If a class is missing a given entry point, it will just not be called. The __init__() function is used to initialize things for the callback class:

def __init__(self, options):

    super().__init__(options)

    logging.basicConfig(format=self.o.logFormat,
                        level=getattr(logging, self.o.logLevel.upper()))
    logger.setLevel(getattr(logging, self.o.logLevel.upper()))

    self.o.add_option( 'myoption', 'str', 'usuallythis')

The logging setup lines in __init__ allow setting a specific logging level for this flow callback class. Once the logging boiler-plate is done, use the add_option routine to define settings for the class. Users can include them in configuration files, just like built-in options:

myoption IsReallyNeeded

The result of such a setting is that self.o.myoption = 'IsReallyNeeded'. If no value is set in the configuration, self.o.myoption will default to 'usuallythis'. There are various kinds of options, where the declared type modifies the parsing:

'count'    integer count type.
'duration' a floating point number indicating a quantity of seconds (0.001 is 1 millisecond)
           modified by a unit suffix ( m-minute, h-hour, w-week )
'flag'     boolean (True/False) option.
'list'     a list of string values, each succeeding occurrence appending to the total.
           all v2 plugin options are declared of type list.
'size'     integer size. Suffixes k, m, and g for kilo, mega, and giga (base 2) multipliers.
'str'      an arbitrary string value; as with all of the above types, each
           succeeding occurrence overrides the previous one.
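A callback might declare one option of each kind in its constructor. A sketch (a stub stands in for sarracenia.config.Config, whose real add_option signature may differ slightly):

```python
class StubConfig:
    # stand-in for sarracenia.config.Config: add_option(name, kind, default)
    def add_option(self, name, kind, default):
        # only apply the default if the user has not already set a value.
        if not hasattr(self, name):
            setattr(self, name, default)

class Myclass:
    def __init__(self, options):
        self.o = options
        self.o.add_option('myoption', 'str', 'usuallythis')
        self.o.add_option('threshold', 'size', 10 * 1024)
        self.o.add_option('verbose', 'flag', False)
        self.o.add_option('directories', 'list', [])
```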

Entry Points

Other entry points, extracted from sarracenia/flowcb/__init__.py:

 def ack(self,messagelist):
     Task: acknowledge notification messages from a gather source.

 """
   application of the accept/reject clauses happens here, so after_accept callbacks
   run on a filtered set of notification messages.

 """

 def after_accept(self,worklist):
     """
      Task: just after notification messages go through accept/reject masks,
            operate on worklist.incoming to help decide which notification messages to process further.
            and move notification messages to worklist.rejected to prevent further processing.
            do not delete any notification messages, only move between worklists.
     """

 def after_work(self,worklist):
     Task: operate on worklist.ok (files which have arrived.)

 def download(self,msg) -> bool:

      Task: looking at msg['new_dir'], msg['new_file'], msg['new_inflight_file']
            and the self.o options perform a download of a single file.
            return True on a successful transfer, False otherwise.


 def gather(self):
     Task: gather notification messages from a source... return either:
           * a list of notification messages, or
           * a tuple (bool:keep_going, list of messages), where keep_going=False
             curtails further gathers in this cycle.

     return []

 def metrics_report(self) -> dict:

     Return a dictionary of metrics. Example: number of messages remaining in retry queues.

 def on_housekeeping(self):
      do periodic processing.

 def on_html_page(self,page):
      Task: modify an html page.

 def on_line(self,line):
      used in FTP polls; because servers have different listing formats, modify lines to a canonical form.

      Task: return modified line.

 def on_start(self):
      After the connection is established with the broker and things are instantiated, but
      before any notification message transfer occurs.

 def on_stop(self):
      cleanup processing when stopping.

 def poll(self):
     Task: build worklist.incoming, a form of gather()

 def post(self,worklist):
      Task: operate on worklist.ok, and worklist.failed. modifies them appropriately.
            notification message acknowledgement has already occurred before they are called.

 def send(self,msg) -> bool:

      Task: looking at msg['new_dir'], msg['new_file'], and the self.o options perform a transfer
            of a single file.
            return True on a successful transfer, False otherwise.

      This replaces built-in send functionality for individual files.

 def stop_requested(self):
      Pre-warn a flowcb that a stop has been requested, allowing processing to wrap up
      before the full stop happens.

new_* Fields

During processing of notification messages, the original standard field values are generally left unchanged (as read in). To change fields of notification messages forwarded to downstream consumers, one modifies the new_ field instead of the one from the message, as the original is necessary for successful upstream retrieval:

  • msg[‘new_baseUrl’] … baseUrl to pass to downstream consumers.

  • msg[‘new_dir’] … the directory into which a file will be downloaded or sent.

  • msg[‘new_file’] …. final name of the file to write.

  • msg[‘new_inflight_path’] … calculated name of the temporary file to be written before renaming to msg[‘new_file’] … do not set manually.

  • msg[‘new_relPath’] … calculated from ‘new_baseUrl’, ‘post_baseDir’, ‘new_dir’, ‘new_file’ … do not set manually.

  • msg[‘post_version’] … the encoding format of the message to post (calculated from settings).

  • msg[‘new_subtopic’] … the subtopic hierarchy that will be encoded in the notification message for downstream consumers.

Override Fields

To change processing of messages, one can set overrides to change how built-in algorithms work. For example:

  • msg[‘nodupe_override’] = { ‘key’: …, ‘path’: … } changes how the duplicate detection operates.

  • msg[‘topic’] … defines the topic of a published message (instead of being calculated from other fields.)

  • msg[‘exchangeSplitOverride’] = int … changes how post_exchangeSplit chooses among the multiple post exchanges.

Customizing Duplicate Suppression

The built-in processing for duplicates is to use the identity field as a key, and store the path as the value. So if a file is received with the same key, and the path is already present, then it is considered a duplicate and dropped.

In some cases, we may want only the file name to be used, so if any file with the same name is received twice, regardless of content, then it should be considered a duplicate and dropped. This is useful when multiple systems are producing the same products, but they are not bitwise identical. The built-in flowcb that implements that functionality is below:

import logging
from sarracenia.flowcb import FlowCB

logger = logging.getLogger(__name__)


class Name(FlowCB):
    """
      Override the comparison so that files with the same name,
      regardless of what directory they are in, are considered the same.
      This is useful when receiving data from two different sources (two different trees)
      and winnowing between them.
    """
    def after_accept(self, worklist):
        for m in worklist.incoming:
            if not 'nodupe_override' in m:
                m['_deleteOnPost'] |= set(['nodupe_override'])
                m['nodupe_override'] = {}

            m['nodupe_override']['path'] = m['relPath'].split('/')[-1]
            m['nodupe_override']['key'] = m['relPath'].split('/')[-1]

Customizing post_exchangeSplit

The exchangeSplit function allows a single flow to send outputs to different exchanges, numbered 1…n, to provide load distribution. The built-in processing does this in a fixed way based on a hash of the identity field. The purpose of exchangeSplit is to allow a common set of downstream paths to receive a subset of the total flow, and for products with similar “routing” to land on the same downstream node. For example, for winnowing to work, a file with a given checksum has to land on the same downstream node.

It could be that, rather than using a checksum, one would prefer to use some other method to decide which exchange is used:

import hashlib
import logging
import os

from sarracenia.flowcb import FlowCB

logger = logging.getLogger(__name__)


class Distbydir(FlowCB):
  """
    Override the use of the identity field so that products can be grouped by directory in the relPath.
    This ensures that all products received from the same directory get posted to the same
    exchange when post_exchangeSplit is active.
  """
  def after_accept(self, worklist):
      for m in worklist.incoming:
          m['_deleteOnPost'] |= set(['exchangeSplitOverride'])
          # hash the parent directory name; md5 needs bytes, so encode first,
          # and parse the first hex digit of the digest as a base-16 integer.
          d = m['relPath'].split(os.sep)[-2]
          m['exchangeSplitOverride'] = int(hashlib.md5(d.encode('utf-8')).hexdigest()[0], 16)

This routine sets the exchangeSplitOverride field, which needs to be an integer that will be used to pick among the n exchanges defined by post_exchangeSplit. This routine calculates a checksum of the directory containing the file and then converts the first hexadecimal character of that checksum to an integer. If the directory is the same, the exchange chosen will be the same.

Sample Flowcb Sub-Class

This is an example callback class file (gts2wis2.py) that accepts files whose names begin with AHLs, and renames the directory tree to a different standard, the evolving one for the WMO WIS 2.0 (for more information on that module: https://github.com/wmo-im/GTStoWIS2).

import json
import logging
import os.path

from sarracenia.flowcb import FlowCB
import GTStoWIS2

logger = logging.getLogger(__name__)


class GTS2WIS2(FlowCB):

  def __init__(self, options):

      super().__init__(options,logger)
      self.topic_builder=GTStoWIS2.GTStoWIS2()

  def after_accept(self, worklist):

      new_incoming=[]

      for msg in worklist.incoming:

          # fix file name suffix.
          type_suffix = self.topic_builder.mapAHLtoExtension( msg['new_file'][0:2] )
          tpfx=msg['subtopic']

          # input has relpath=/YYYYMMDD/... + pubTime
          # need to move the date from relPath to BaseDir, adding the T hour from pubTime.
          try:
              new_baseSubDir=tpfx[0]+msg['pubTime'][8:11]
              t='.'.join(tpfx[0:2])+'.'+new_baseSubDir
              new_baseDir = msg['new_dir'] + os.sep + new_baseSubDir
              new_relDir = 'WIS' + os.sep + self.topic_builder.mapAHLtoTopic(msg['new_file'])
              msg['new_dir'] = new_baseDir + os.sep + new_relDir
              msg.updatePaths( self.o, new_baseDir + os.sep + new_relDir, msg['new_file'] )

          except Exception as ex:
              logger.error( "skipped" , exc_info=True )
              worklist.failed.append(msg)
              continue

          msg['_deleteOnPost'] |= set( [ 'from_cluster', 'sum', 'to_clusters' ] )
          new_incoming.append(msg)

      worklist.incoming=new_incoming

The after_accept routine is one of the two most common ones in use.

The after_accept routine has an outer loop that cycles through the entire list of incoming notification messages. The normal processing is that it builds a new list of incoming notification messages, appending any messages that cannot be processed to worklist.failed. The list is just a list of notification messages, where each notification message is a python dictionary with all the fields stored in a v03 format notification message. In the notification message there are, for example, baseUrl and relPath fields:

  • baseUrl - the baseUrl of the resource from which a file would be obtained.

  • relPath - the relative path to append to the baseUrl to get the complete download URL.

This is happening before transfer (download, send, or other processing) of the file has occurred, so one can change the behaviour by modifying fields in the notification message. Normally, the download paths (called new_dir and new_file) will reflect the intent to mirror the original source tree. So if you have a/b/c.txt on the source tree, and are downloading into directory mine on the local system, the new_dir would be mine/a/b and new_file would be c.txt.

The plugin above changes the layout of the files that are to be downloaded, based on the GTStoWIS2 class, which prescribes a different directory tree on output. There are a lot of fields to update when changing file placement, so it is best to use:

msg.updatePaths( self.o, new_dir, new_file )

to update all necessary fields in the notification message properly. It will update ‘new_baseUrl’, ‘new_relPath’, and ‘new_subtopic’ for use when posting.

The try/except part of the routine deals with the case where a file arrives with a name from which a topic tree cannot be built: an exception may occur, and the notification message is added to the failed worklist, and will not be processed by later plugins.

Download Renaming

Sometimes the URL used to obtain data from a server isn’t the same as the name one wants to assign to the downloaded result. This occurs often when polling arbitrary upstream web services. For such cases the message format defines the retrievePath, or retrieval path, used as follows:

Standard subscribers will download using retrievePath but assign the download path using relPath. When forwarding after download, the retrievePath should often be removed (to avoid downstream clients pulling from the original source instead of the downloaded copy).

While the above is the preferred way of defining messages where the download will have a different name from the upstream source, a second method is available: the rename field, used as follows:

  • msg[‘rename’] = alternate relPath … download it to here instead of using relPath.

Again, once downloaded, the rename header should be removed from the message prior to forwarding to downstream clients, and the relPath needs to be adjusted.

Note that both of these methods work the same for senders as well; the term ‘download’ is used for simplicity.

Web Sites with non-standard file listings

See the poll/nasa_mls example included with Sarracenia.

Other Examples

Subclassing of sarracenia.flowcb is used internally to do a lot of core work. It’s a good idea to look at the sarracenia source code itself. For example:

  • sr3 list fcb is a command to list all the callback classes that are included in the metpx-sr3 package.

  • sarracenia.flowcb – have a look at the __init__.py file in there, which provides this information in a more programmatically succinct format.

  • sarracenia.flowcb.gather.file.File is a class that implements file posting and directory watching, in the sense of a callback that implements the gather entry point, by reading a file system and building a list of notification messages for processing.

  • sarracenia.flowcb.gather.message.Message is a class that implements reception of notification messages from message queue protocol flows.

  • sarracenia.flowcb.nodupe.NoDupe – this module removes duplicates from message flows based on Identity checksums.

  • sarracenia.flowcb.post.message.Message is a class that implements posting notification messages to Message queue protocol flows

  • sarracenia.flowcb.retry.Retry – when the transfer of a file fails, Sarracenia needs to persist the relevant notification message to a state file so that it can be tried again later. This class implements that functionality.

Modifying Files in Flight

The sarracenia.transfer class has an on_data entry point:

 def on_data(self, chunk) -> bytes:
     """
         transform data as it is being read.
         Given a buffer, return the transformed buffer.
         Checksum calculation is based on pre transformation... likely need
         a post transformation value as well.
     """
     # modify the chunk in this body...
     return chunk

 def registered_as():
     return ['scr']

# copied from sarracenia.transfer.https

def connect(self):

     if self.connected: self.close()

     self.connected = False
     self.sendTo = self.o.sendTo.replace('scr', 'https', 1)
     self.timeout = self.o.timeout

     if not self.credentials(): return False

     return True

To perform inflight data modification, one can sub-class the relevant transfer class. Such a class (scr - strip carriage returns) can be added by putting an import in the configuration file:

import scr.py

Then messages whose retrieval url is set to use the scr scheme will use this custom transfer protocol.
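The heart of such an scr class is the on_data transform. Its body, shown standalone here rather than as a method of a transfer subclass, simply strips carriage returns from each buffer as it streams through:

```python
def on_data_body(chunk: bytes) -> bytes:
    # the body of scr's on_data(self, chunk): remove carriage returns
    # from the buffer and return the transformed bytes.
    return chunk.replace(b'\r', b'')
```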

Subclassing Flow

If none of the built-in components ( poll, post, sarra, shovel, subscribe, watch, winnow ) have the behaviour desired, one can build a custom component to do the right thing by sub-classing flow.

Copy one of the flow sub-classes from the source code, and modify to taste. In the configuration file, add the line:

flowMain myComponent

to have the flow use the new component.