# Source code for sarracenia.flowcb


#
# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, Shared Services Canada, 2020
#

import copy
import importlib
import logging
import sys

# Names of the callback methods a flow callback class may implement.
# Kept in alphabetical order, one per line, so additions show up as
# single-line diffs.
entry_points = [
    'ack',
    'after_accept',
    'after_post',
    'after_work',
    'destfn',
    'do_poll',
    'download',
    'gather',
    'metricsReport',
    'on_cleanup',
    'on_declare',
    'on_features',
    'on_housekeeping',
    'on_sanity',
    'on_start',
    'on_stop',
    'please_stop',
    'poll',
    'post',
    'report',
    'send',
]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class FlowCB:
    """
    Flow Callback is the main class for implementing plugin customization to flows.

    sample activation in a configuration file:

    flowCallback sarracenia.flowcb.name.Name

    will instantiate an object of that type whose appropriately named methods
    will be called at the right time.

    __init__ accepts options as an argument.

    options is a sarracenia.config.Config object, used to override default behaviour

    a setting is declared in a configuration file like so::

        set sarracenia.flowcb.filter.log.Log.level debug

    (the prefix for the setting matches the type hierarchy in flowCallback)
    the plugin should get the setting::

        options.level = 'debug'

    worklist given to on_plugins...

    * worklist.incoming --> new messages to continue processing
    * worklist.ok --> successfully processed
    * worklist.rejected --> messages to not be further processed.
    * worklist.failed --> messages for which processing failed.
      Failed messages will be retried.
    * worklist.directories_ok --> list of directories created during processing.

    Initially, all messages are placed in incoming.
    if a plugin entry_point decides:

    - a message is not relevant, it is moved to the rejected worklist.
    - all processing has been done, it moves it to the ok worklist
    - an operation failed and it should be retried later, append it to the failed
      worklist

    Do not remove any message from all lists, only move messages between them.
    it is necessary to put rejected messages in the appropriate worklist
    so they can be acknowledged as received. Messages can only be removed after ack.

    def __init__(self,options) -> None::

        Task: initialization of the flowCallback at instantiation time.

        usually contains:

        self.o = options

    def ack(self,messagelist) -> None::

        Task: acknowledge messages from a gather source.

    def gather(self, messageCountMax) -> (gather_more, messages) ::

        Task: gather messages from a source... return a tuple:

              * gather_more ... bool whether to continue gathering
              * messages ... list of messages

              or just return a list of messages.

              In a poll, gather is always called, regardless of vip possession.

              In all other components, gather is only called when in possession
              of the vip.

        return (True, list)
         OR
        return list

    def after_accept(self,worklist) -> None::

        Task: just after messages go through accept/reject masks,
              operate on worklist.incoming to help decide which messages to process further.
              and move messages to worklist.rejected to prevent further processing.
              do not delete any messages, only move between worklists.

    def after_work(self,worklist) -> None::

        Task: operate on worklist.ok (files which have arrived.)

        All messages on the worklist.ok list have been acknowledged, so to suppress posting
        of them, or further processing, the messages must be removed from worklist.ok.

        worklist.failed processing should occur in here as it will be zeroed out after this step.
        The flowcb/retry.py plugin, for example, processes failed messages.

    def destfn(self,msg) -> str::

        Task: look at the fields in the message, and perhaps settings and
              return a new file name for the target of the send or download.

        kind of a last resort function, exists mostly for sundew compatibility.
        can be used for selective renaming using accept clauses.

    def download(self,msg) -> bool::

        Task: looking at msg['new_dir'], msg['new_file'], msg['new_inflight_file']
              and the self.o options perform a download of a single file.
              return True on a successful transfer, False otherwise.

              if self.o.dry_run is set, simulate the output of a download without
              performing it.

        This replaces built-in download functionality, providing an override.
        for individual file transfers. ideally you set checksums as you download.

    def metricsReport(self) -> dict:

        Return a dictionary of metrics. Example: number of messages remaining in retry queues.

    def on_cleanup(self) -> None::

        allow plugins to perform additional work after broker resources are eliminated.
        local state files are still present when this runs.

    def on_declare(self) -> None::

        local state files are already present when this runs.
        allow plugins to perform additional work besides broker resource setup.

    def on_housekeeping(self) -> None::

        do periodic processing.

    def on_start(self) -> None::

        After the connection is established with the broker and things are instantiated, but
        before any message transfer occurs.

    def on_stop(self) -> None::

        what it says on the tin... clean up processing when stopping.

    def poll(self) -> list::

        Task: gather messages from a destination... return a list of messages.

              works like a gather, but...

              When specified, poll replaces the built-in poll of the poll component.
              it runs only when the machine running the poll has the vip.
              in components other than poll, poll is never called.

        return []

    def post(self,worklist) -> None::

        Task: operate on worklist.ok, and worklist.failed. modifies them appropriately.
              message acknowledgement has already occurred before they are called.

        to indicate failure to process a message, append to worklist.failed.
        worklist.failed processing should occur in here as it will be zeroed out after this step.

    def send(self,msg) -> bool::

        Task: looking at msg['new_dir'], msg['new_file'], and the self.o options perform a transfer
              of a single file.
              return True on a successful transfer, False otherwise.

              if self.o.dry_run is set, simulate the output of a send without
              performing it.

        This replaces built-in send functionality for individual files.

    def please_stop(self):

        Pre-warn a flowcb that a stop has been requested, allowing processing to wrap up
        before the full stop happens.

    """

    def __init__(self, options, class_logger=None):
        """
        Store the options and apply the logging settings they carry.

        :param options: configuration object; kept as ``self.o``. Its optional
            ``logFormat`` and ``logLevel`` attributes are consulted here.
        :param class_logger: optional logger whose level is set from
            ``options.logLevel`` when that attribute is present.
        """
        self.o = options
        self.stop_requested = False

        has_level = hasattr(self.o, 'logLevel')

        if hasattr(self.o, 'logFormat'):
            # logFormat may be supplied without logLevel; only pass a level to
            # basicConfig when one is actually configured, instead of failing
            # with AttributeError on the missing attribute.
            config_args = {'format': self.o.logFormat}
            if has_level:
                config_args['level'] = getattr(logging, self.o.logLevel.upper())
            logging.basicConfig(**config_args)

        if has_level and class_logger:
            class_logger.setLevel(getattr(logging, self.o.logLevel.upper()))

    def please_stop(self):
        """
        flow callbacks should not time.sleep for long periods, but only nap
        and check between naps if a stop has been requested.
        """
        self.stop_requested = True
def load_library(factory_path, options):
    """
    Import a flow callback module and return an instance of its class.

    The module is located with importlib on the normal python module path.

    factory_path is a dot-separated specification whose last entry is
    normally the name of the class within the module::

        a.b.c.C

    means import the module named a.b.c and instantiate an object of type C.

    When the last entry does not start with a capital letter, the whole path
    is taken as the module name and the class name is guessed by capitalizing
    the last entry.

    note that the ~/.config/sr3/plugins will also be in the python library
    path, so modules placed there will be found, in addition to those in the
    package itself in the *sarracenia/flowcb* directory::

        callback foo      -> foo.Foo          sarracenia.flowcb.foo.Foo
        callback foo.bar  -> foo.bar.Bar      sarracenia.flowcb.foo.bar.Bar

    :param factory_path: module (and optionally class) specification.
    :param options: passed to the class constructor. When it has a
        ``settings`` attribute, a deep copy is passed instead, with the
        entries of ``options.settings[factory_path]`` (if any) set as
        attributes on that copy.
    :return: the instantiated plugin object.
    :raises ModuleNotFoundError: when the module cannot be imported either
        as given or under ``sarracenia.flowcb.``, or when the module exists
        but one of its own imports is missing.
    """
    if '.' not in factory_path:
        packagename = factory_path
        classname = factory_path.capitalize()
    else:
        last_entry = factory_path.split('.')[-1]
        if last_entry[0].islower():
            # no class given: the whole path is the module, guess the class.
            packagename = factory_path
            classname = last_entry.capitalize()
        else:
            packagename, classname = factory_path.rsplit('.', 1)

    try:
        module = importlib.import_module(packagename)
    except ModuleNotFoundError as err:
        # Only fall back to the built-in plugin package when the requested
        # module itself (or one of its parent packages) is what is missing.
        # If the module was found but one of *its* imports failed, retrying
        # under sarracenia.flowcb would hide the real cause, so re-raise.
        missing = err.name
        if missing and packagename != missing \
                and not packagename.startswith(missing + '.'):
            raise
        module = importlib.import_module('sarracenia.flowcb.' + packagename)

    class_ = getattr(module, classname)

    if hasattr(options, 'settings'):
        # give the plugin its own copy so per-plugin settings do not leak
        # into the shared options object.
        opt = copy.deepcopy(options)
        # strip off the class prefix.
        if factory_path in options.settings:
            for name, value in options.settings[factory_path].items():
                setattr(opt, name, value)
    else:
        opt = options

    return class_(opt)