{ "cells": [ { "cell_type": "markdown", "id": "acoustic-deviation", "metadata": {}, "source": [ "# Customize File Handling with Callbacks\n", "\n", "\n", "All Sarracenia components implement *the Flow* algorithm, with different\n", "callbacks, in the Python programming language. Sarracenia's main (Python)\n", "class is [sarracenia.flow](../Reference/code.html#module-sarracenia.flow) and a great deal of core functionality is\n", "implemented using the class created to add custom processing to a flow, the\n", "flowcb (flow callback) class.\n", "\n", "For a detailed discussion of the flow algorithm itself, have a look\n", "at the [Concepts](../Explanation/Concepts.html) manual. For any flow, one can\n", "add custom processing at a variety of times during processing by sub-classing\n", "the [sarracenia.flowcb](../Reference/flowcb.html) class.\n", "\n", "Briefly, the algorithm has the following steps:\n", "\n", "* **__init__(self, options)** -- when the import happens, traditional python initialization\n", "* **on_start** -- when an instance is started.\n", "* loop forever\n", "  * **gather** -- collect messages to be processed, placing them in worklist.incoming\n", "  * **poll** -- another way to collect messages, only in the poll component.\n", "  * **filter** -- apply accept/reject regular expression matches to the message list.\n", "    moves messages for files not to download from worklist.incoming to worklist.rejected\n", "  * *after_accept* callback entry point. process worklist.incoming, potentially rejecting some more.\n", "  * **ack** -- worklist.rejected messages are acknowledged to upstream source as processing is complete.\n", "  * **work** -- perform a transfer or transformation on a file.\n", "  * **ack** -- worklist.ok messages for successfully transferred files are acknowledged to upstream source.\n", "  * *after_work* callback entry point\n", "  * **ack** -- worklist.failed messages for files which were not successfully transferred are acknowledged.\n", "  * **post** -- post the result of the work done for the next step.\n", "  * occasionally... **on_housekeeping** -- do periodic cleanups.\n", "* **on_stop** -- shutdown processing.\n", "\n", "For more details about the flowcb entry points available, have a look at the source code:\n", "\n", "* [flowcb](../Reference/flowcb.html)\n" ] }, { "cell_type": "markdown", "id": "external-mention", "metadata": {}, "source": [ "Let's look at using the class in a configuration:" ] }, { "cell_type": "code", "execution_count": 1, "id": "coordinated-cocktail", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2024-01-12 15:51:55,713 127453 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: {self.batch}\n", "2024-01-12 15:51:55,714 127453 [INFO] root remove removing subscribe/hpfx_amis\n", "\n", "add: 2024-01-12 15:51:56,975 127456 [INFO] sarracenia.sr add copying: /home/peter/Sarracenia/sr3/sarracenia/examples/subscribe/hpfx_amis.conf to /home/peter/.config/sr3/subscribe/hpfx_amis.conf \n", "\n" ] } ], "source": [ "!sr3 remove subscribe/hpfx_amis.conf\n", "!sr3 add subscribe/hpfx_amis.conf" ] }, { "cell_type": "code", "execution_count": 2, "id": "tired-north", "metadata": {}, "outputs": [], "source": [ "!echo messageCountMax 10 >>~/.config/sr3/subscribe/hpfx_amis.conf" ] }, { "cell_type": "markdown", "id": "psychological-ratio", "metadata": {}, "source": [ "The setting above has the flow stop after 10 messages are consumed." 
] }, { "cell_type": "code", "execution_count": 3, "id": "greater-nevada", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2024-01-12 15:52:22,624 127524 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: {self.batch}\r\n", "Provided callback classes: ( /home/peter/Sarracenia/sr3/sarracenia ) \r\n", "flowcb/accept/auth_NASA_Earthdata.py flowcb/accept/auth_copernicus.py \r\n", "flowcb/accept/auth_eumetsat.py flowcb/accept/dateappend.py \r\n", "flowcb/accept/delete.py flowcb/accept/downloadbaseurl.py \r\n", "flowcb/accept/hourtree.py flowcb/accept/httptohttps.py \r\n", "flowcb/accept/longflow.py flowcb/accept/pathreplace.py \r\n", "flowcb/accept/posthourtree.py flowcb/accept/postoverride.py \r\n", "flowcb/accept/printlag.py flowcb/accept/rename4jicc.py \r\n", "flowcb/accept/renamedmf.py flowcb/accept/renamewhatfn.py \r\n", "flowcb/accept/save.py flowcb/accept/speedo.py \r\n", "flowcb/accept/sundewpxroute.py flowcb/accept/testretry.py \r\n", "flowcb/accept/toclusters.py flowcb/accept/tohttp.py \r\n", "flowcb/accept/tolocal.py flowcb/accept/tolocalfile.py \r\n", "flowcb/accept/trim_legacy_fields.py flowcb/accept/wmotypesuffix.py \r\n", "flowcb/amserver.py flowcb/block_reassembly.py \r\n", "flowcb/clamav.py flowcb/destfn/replace.py \r\n", "flowcb/destfn/sample.py flowcb/download/mail_ingest.py \r\n", "flowcb/filter/deleteflowfiles.py flowcb/filter/fdelay.py \r\n", "flowcb/filter/pclean_f90.py flowcb/filter/pclean_f92.py \r\n", "flowcb/filter/wmo2msc.py flowcb/gather/file.py \r\n", "flowcb/gather/message.py flowcb/housekeeping/resources.py \r\n", "flowcb/log.py flowcb/mdelaylatest.py \r\n", "flowcb/nodupe/data.py flowcb/nodupe/disk.py \r\n", "flowcb/nodupe/name.py flowcb/nodupe/redis.py \r\n", "flowcb/pclean.py flowcb/poll/airnow.py \r\n", "flowcb/poll/eumetsat.py flowcb/poll/mail.py \r\n", "flowcb/poll/nasa_mls_nrt.py flowcb/poll/nexrad.py \r\n", "flowcb/poll/noaa_hydrometric.py 
flowcb/poll/odata.py \r\n", "flowcb/poll/poll_NASA_CMR.py flowcb/poll/rate_limit.py \r\n", "flowcb/poll/s3bucket.py flowcb/poll/usgs.py \r\n", "flowcb/post/message.py flowcb/report.py \r\n", "flowcb/retry.py flowcb/rootchown.py \r\n", "flowcb/run.py flowcb/rxqueue_gzip.py \r\n", "flowcb/sample.py flowcb/scheduled/wiski.py \r\n", "flowcb/send/am.py flowcb/send/email.py \r\n", "flowcb/shiftdir2baseurl.py flowcb/trace_on_stop.py \r\n", "flowcb/v2wrapper.py flowcb/wistree.py \r\n", "flowcb/work/age.py flowcb/work/check.py \r\n", "flowcb/work/citypage_check.py flowcb/work/delete.py \r\n", "flowcb/work/rxpipe.py flowcb/work/send_egc_les.py \r\n" ] } ], "source": [ "!sr3 list fcb" ] }, { "cell_type": "markdown", "id": "efficient-picture", "metadata": {}, "source": [ "The command above lists the callback classes provided with Sarracenia. Adding the line below to the configuration means that the posthourtree flowcb subclass (source shown later) will be added to\n", "the flow, changing processing by having its routines called, the main one being *after_accept*." ] }, { "cell_type": "code", "execution_count": 4, "id": "external-commercial", "metadata": {}, "outputs": [], "source": [ "!echo callback accept.posthourtree >>~/.config/sr3/subscribe/hpfx_amis.conf" ] }, { "cell_type": "code", "execution_count": 5, "id": "insured-fetish", "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2024-01-12 15:52:39,635 127558 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: {self.batch}\n", ".2024-01-12 15:52:40,036 [INFO] 127562 sarracenia.config finalize overriding batch for consistency with messageCountMax: {self.batch}\n", "2024-01-12 15:52:40,038 [INFO] 127562 sarracenia.config finalize overriding batch for consistency with messageCountMax: {self.batch}\n", "2024-01-12 15:52:40,038 [INFO] 127562 sarracenia.flow loadCallbacks flowCallback plugins to load: ['sarracenia.flowcb.gather.message.Message', 'sarracenia.flowcb.retry.Retry', 
'sarracenia.flowcb.housekeeping.resources.Resources', 'accept.posthourtree', 'log']\n", "2024-01-12 15:52:40,041 [INFO] 127562 sarracenia.flowcb.log __init__ subscribe initialized with: logEvents: {'after_accept', 'on_housekeeping', 'after_work', 'after_post', 'post'}, logMessageDump: False\n", "2024-01-12 15:52:40,042 [INFO] 127562 sarracenia.flow run callbacks loaded: ['sarracenia.flowcb.gather.message.Message', 'sarracenia.flowcb.retry.Retry', 'sarracenia.flowcb.housekeeping.resources.Resources', 'accept.posthourtree', 'log']\n", "2024-01-12 15:52:40,042 [INFO] 127562 sarracenia.flow run pid: 127562 subscribe/hpfx_amis instance: 0\n", "2024-01-12 15:52:40,441 [INFO] 127562 sarracenia.moth.amqp _queueDeclare queue declared q_anonymous_subscribe.hpfx_amis.68942404.82515581 (as: amqps://anonymous@hpfx.collab.science.gc.ca/), (messages waiting: 0)\n", "2024-01-12 15:52:40,441 [INFO] 127562 sarracenia.moth.amqp getSetup binding q_anonymous_subscribe.hpfx_amis.68942404.82515581 with v02.post.*.WXO-DD.bulletins.alphanumeric.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca/)\n", "2024-01-12 15:52:40,487 [INFO] 127562 sarracenia.flow run now active on vip ['AnyAddressIsFine']\n", "2024-01-12 15:52:44,644 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:44,644 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:44,644 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:44,644 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: 
new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 2.82 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRCN40_KWAL_122052___32878\n", "2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 6.28 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SO/KWNB/20/SOVD83_KWNB_121900_RRX__26978\n", "2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 6.27 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SM/KWNB/20/SMVD20_KWNB_121800_RRX__52297\n", "2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 6.27 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SM/KWNB/20/SMVD20_KWNB_121800_RRX__48301\n", "2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 3.97 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRCN40_KWAL_122052___22701\n", "2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 3.97 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SN/KWNB/20/SNVD20_KWNB_121900_RRX__17918\n", "2024-01-12 15:52:44,645 [INFO] 127562 sarracenia.flow do_download missing destination directories, makedirs: /tmp/hpfx_amis/20 \n", "2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_122052___32878 \n", "2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SOVD83_KWNB_121900_RRX__26978 \n", "2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SMVD20_KWNB_121800_RRX__52297 \n", "2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work 
downloaded ok: /tmp/hpfx_amis/20/SMVD20_KWNB_121800_RRX__48301 \n", "2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_122052___22701 \n", "2024-01-12 15:52:45,153 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SNVD20_KWNB_121900_RRX__17918 \n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 1.53 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRMN70_KWAL_122052___39567\n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 1.53 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRCN40_KWAL_122052___5395\n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 0.52 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRWA20_KWAL_122052___2278\n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 2.14 ) https://hpfx.collab.science.gc.ca 
/20240112/WXO-DD/bulletins/alphanumeric/20240112/SX/KWAL/20/SXCN40_KWAL_122052___60928\n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 2.14 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SR/KWAL/20/SRME20_KWAL_122052___58721\n", "2024-01-12 15:52:45,263 [INFO] 127562 sarracenia.flowcb.log after_accept accepted: (lag: 2.14 ) https://hpfx.collab.science.gc.ca /20240112/WXO-DD/bulletins/alphanumeric/20240112/SN/KWNB/20/SNVD22_KWNB_121900_RRX__60599\n", "2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRMN70_KWAL_122052___39567 \n", "2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_122052___5395 \n", "2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRWA20_KWAL_122052___2278 \n", "2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SXCN40_KWAL_122052___60928 \n", "2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRME20_KWAL_122052___58721 \n", "2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SNVD22_KWNB_121900_RRX__60599 \n", "2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flow please_stop ok, telling 5 callbacks about it.\n", "2024-01-12 15:52:45,735 [INFO] 127562 sarracenia.flow run starting last pass (without gather) through loop for cleanup.\n", "2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.flow please_stop ok, telling 5 callbacks about it.\n", "2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.flow run on_housekeeping pid: 127562 subscribe/hpfx_amis instance: 0\n", "2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.flowcb.gather.message on_housekeeping messages: good: 12 bad: 0 bytes: 1.6 KiB average: 140 Bytes\n", "2024-01-12 15:52:45,741 
[INFO] 127562 sarracenia.flowcb.retry on_housekeeping on_housekeeping\n", "2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping work_retry_00 on_housekeeping\n", "2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping No retry in list\n", "2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000168\n", "2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping post_retry_000 on_housekeeping\n", "2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping No retry in list\n", "2024-01-12 15:52:45,741 [INFO] 127562 sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000106\n", "2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.housekeeping.resources on_housekeeping Current Memory cpu_times: user=0.44 system=0.07\n", "2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.housekeeping.resources on_housekeeping Current mem usage: 92.3 MiB, accumulating count (12 or 12/100 so far) before self-setting threshold\n", "2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.log stats version: 3.00.51rc5, started: 5 seconds ago, last_housekeeping: 5.7 seconds ago \n", "2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.log stats messages received: 12, accepted: 12, rejected: 0 rate accepted: 100.0% or 2.1 m/s\n", "2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.log stats files transferred: 12 bytes: 2.2 KiB rate: 386 Bytes/sec\n", "2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.log stats lag: average: 3.30, maximum: 6.28 \n", "2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flowcb.log on_housekeeping housekeeping\n", "2024-01-12 15:52:45,742 [INFO] 127562 sarracenia.flow run clean stop from run loop\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "2024-01-12 15:52:45,763 [INFO] 127562 sarracenia.flowcb.gather.message on_stop closing\n", "2024-01-12 15:52:45,764 [INFO] 127562 sarracenia.flow close 
flow/close completed cleanly pid: 127562 subscribe/hpfx_amis instance: 0\n", "\n" ] } ], "source": [ "!sr3 foreground subscribe/hpfx_amis.conf" ] }, { "cell_type": "markdown", "id": "stretch-directive", "metadata": {}, "source": [ "Without the plugin, the download would put all files directly in the reception directory, /tmp/hpfx_amis. With the addition of the posthourtree callback, each file is placed in a sub-directory named for the current hour (/tmp/hpfx_amis/20 in the run above). The wistree callback shown later goes further: it places files in the WIS tree of directories, and adds a file type suffix.\n" ] }, { "cell_type": "markdown", "id": "funny-isolation", "metadata": {}, "source": [ "## Config File Entries and Callbacks\n", "\n", "\n", "[flowcb.log](../Reference/flowcb.html#module-sarracenia.flowcb.log)\n", "\n", "To add a callback to a flow, a line is added to the flow's configuration file:\n", "\n", "    flowcb sarracenia.flowcb.log.Log\n", "\n", "If you follow the convention, and the name of the class is a capitalized\n", "version (Log) of the file name (log), then a shorthand is available:\n", "\n", "    callback log\n", "\n", "Either way it is done, it will cause Sarracenia to import the class and then\n", "look for entry points in the class to call at appropriate times.\n", "\n", "The class constructor accepts a sarracenia.config.Config class object,\n", "called options, that stores all the settings to be used by the running flow.\n", "Options is used to override default behaviour of both flows and callbacks.\n", "The argument to the flowcb option is a standard Python class path: the module needs to be\n", "in the normal Python path for *import*, and the last element\n", "is the name of the class within the file that needs to be instantiated\n", "as a flowcb instance.\n", "\n", "A setting for a callback is declared as follows:\n", "\n", "    set sarracenia.flowcb.log.Log.logLevel debug\n", "\n", "(The prefix for the setting matches the type hierarchy in flowCallback.)\n", "\n", "When the constructor for the callback is called, its options\n", "argument will contain:\n", "\n", "    options.logLevel = 'debug'\n", "\n", "If no module specific override is present, then the more global\n", "setting is used.\n", "\n", "So callbacks can be used without much Python knowledge at all,\n", "just the ability to create configuration files.\n", "\n", "Beyond this point, we find advice for people who want to write their\n", "own callbacks in Python. Callbacks are ordinary Python, with a few wrinkles:" ] }, { "cell_type": "markdown", "id": "shared-album", "metadata": {}, "source": [ "## Writing Your Own Callbacks\n", "\n", "\n", "A flow callback is a Python class built with routines named to\n", "indicate when the programmer wants them to be called.\n", "To do that, create a class which subclasses *sarracenia.flowcb.FlowCB*,\n", "so the source file will normally have:\n", "\n", "    from sarracenia.flowcb import FlowCB\n", "\n", "in among the imports near the top of the file.\n", "In the main part of the file, there will be the\n", "custom callback classes:\n", "\n", "    class Myclass(FlowCB):\n", "\n", "declared as a subclass of FlowCB. The main routines in the class are entry points\n", "that will be called at the time their name implies. If a class is missing a\n", "given entry point, it will just not be called. The __init__() routine is used to\n", "initialize things for the callback class:\n", "\n", "    def __init__(self, options):\n", "\n", "        self.o = options\n", "\n", "        logging.basicConfig(format=self.o.logFormat,\n", "                            level=getattr(logging, self.o.logLevel.upper()))\n", "        logger.setLevel(getattr(logging, self.o.logLevel.upper()))\n", "\n", "        self.o.add_option( 'myoption', 'str', 'usuallyThis')\n", "\n", "The logging setup lines in __init__ allow setting a specific logging level\n", "for this flowCallback class. 
Once the logging boiler-plate is done,\n", "the add_option routine is used to define settings for the class.\n", "Users can include them in configuration files, just like built-in options:\n", "\n", "    myoption IsReallyNeeded\n", "\n", "The result of such a setting is that *self.o.myoption = 'IsReallyNeeded'*.\n", "If no value is set in the configuration, *self.o.myoption* will default to *'usuallyThis'*.\n", "There are various *kinds* of options, where the declared type modifies the parsing:\n", "\n", "    'count'     integer count type.\n", "    'duration'  a floating point number indicating a quantity of seconds (0.001 is 1 millisecond)\n", "                modified by a unit suffix ( m-minute, h-hour, w-week )\n", "    'flag'      boolean (True/False) option.\n", "    'list'      a list of string values, each succeeding occurrence catenates to the total.\n", "                all v2 plugin options are declared of type list.\n", "    'size'      integer size. Suffixes k, m, and g for kilo, mega, and giga (base 2) multipliers.\n", "    'str'       an arbitrary string value. As with the other non-list types, each\n", "                succeeding occurrence overrides the previous one.\n", "\n" ] }, { "cell_type": "markdown", "id": "determined-medicare", "metadata": {}, "source": [ "## Worklists\n", "\n", "Besides *options*, the other main argument to after_accept and after_work callback\n", "routines is the worklist. 
The worklist is given to entry points occurring during message\n", "processing, and is a collection of lists of messages:\n", "\n", "    worklist.incoming       --> messages to process (either new or retries.)\n", "    worklist.ok             --> successfully processed\n", "    worklist.rejected       --> messages to not be further processed.\n", "    worklist.failed         --> messages for which processing failed.\n", "                                failed messages will be retried.\n", "    worklist.directories_ok --> list of directories created during processing.\n", "\n", "Initially, all messages are placed in worklist.incoming.\n", "If a plugin decides:\n", "\n", "- a message is not relevant, move it to the rejected worklist.\n", "- no further processing of the message is needed, move it to the ok worklist.\n", "- an operation failed and it should be retried later, move it to the failed worklist.\n", "\n", "Do not remove messages from all lists; only move them between the worklists.\n", "It is necessary to put rejected messages in the appropriate worklist\n", "so that they are acknowledged as received. Messages can only be removed\n", "after the acknowledgement has been taken care of." ] }, { "cell_type": "markdown", "id": "advised-ordinance", "metadata": {}, "source": [ "## Logging\n", "\n", "\n", "Python has great built-in logging, and one just has to use the module\n", "in a normal, pythonic way, with:\n", "\n", "    import logging\n", "\n", "After all imports in your Python source file, define a logger\n", "for the source file:\n", "\n", "    logger = logging.getLogger(__name__)\n", "\n", "As is normal with the Python logging module, messages can then\n", "be posted to the log:\n", "\n", "    logger.debug('got here')\n", "\n", "Each message in the log will be prefixed with the class and routine\n", "emitting the log message, as well as the date/time.\n", "\n", "## Sample Flow Callback Class\n", "\n", "With the above information about option handling, worklists, and logging, we\n", "are ready to understand the posthourtree module we just used. 
As a very simple example,\n", "the source code of the callback used above is given below:" ] }, { "cell_type": "code", "execution_count": null, "id": "27954f45", "metadata": {}, "outputs": [], "source": [ "\"\"\"\n", "Plugin posthourtree.py:\n", "    When posting a file, insert an hourly directory into the delivery path hierarchy.\n", "\n", "Example:\n", "    input A/B/c.gif --> output A/B//c.gif\n", "\n", "Usage:\n", "    callback accept.posthourtree\n", "\n", "\"\"\"\n", "import logging\n", "import sys, os, os.path, time, stat\n", "from sarracenia.flowcb import FlowCB\n", "\n", "logger = logging.getLogger(__name__)\n", "\n", "\n", "class Posthourtree(FlowCB):\n", "\n", "    def after_accept(self, worklist):\n", "        for message in worklist.incoming:\n", "            datestr = time.strftime('%H', time.gmtime()) # pick the hour\n", "            # insert the hour into the rename header of the message to be posted.\n", "            message['new_dir'] += '/' + datestr\n", "            logger.info( f\"post_hour_tree: new_dir: {message['new_dir']}\" )" ] }, { "cell_type": "markdown", "id": "million-smoke", "metadata": {}, "source": [ "## Sample Flowcb Sub-Class\n", "\n", "\n", "The wistree.py class shows more aspects of the callback API: it has an __init__() routine, brings in an external Python module, and adds fields to the messages.\n", "The Wistree class accepts files whose names begin with AHLs (World Meteorological Organization Abbreviated Header Lines for meteorological products), and renames the directory tree to a different standard, the evolving one for the WMO WIS 2.0. For more information on that module, see:\n", "https://github.com/wmo-im/GTStoWIS2" ] }, { "cell_type": "code", "execution_count": 6, "id": "related-consensus", "metadata": {}, "outputs": [], "source": [ "from sarracenia.flowcb import FlowCB\n", "import logging\n", "import os\n", "import GTStoWIS2\n", "\n", "logger = logging.getLogger(__name__)\n", "\n", "\n", "class Wistree(FlowCB):\n", "\n", "    def __init__(self, options):\n", "\n", "        if hasattr(options, 'logLevel'):\n", "            logger.setLevel(getattr(logging, options.logLevel.upper()))\n", "        else:\n", "            logger.setLevel(logging.INFO)\n", "        self.topic_builder=GTStoWIS2.GTStoWIS2()\n", "        self.o = options\n", "\n", "\n", "    def after_accept(self, worklist):\n", "\n", "        new_incoming=[]\n", "\n", "        for msg in worklist.incoming:\n", "\n", "            # fix file name suffix.\n", "            type_suffix = self.topic_builder.mapAHLtoExtension( msg['new_file'][0:2] )\n", "            tpfx=msg['subtopic']\n", "\n", "            # input has relpath=/YYYYMMDDTHHMM/... + pubTime\n", "            # need to move the date from relPath to BaseDir, adding the T hour from pubTime.\n", "            try:\n", "                new_baseSubDir=tpfx[0]+msg['pubTime'][8:11]\n", "                t='.'.join(tpfx[0:2])+'.'+new_baseSubDir\n", "                new_baseDir = msg['new_dir'] + os.sep + new_baseSubDir\n", "                new_relDir = 'WIS' + os.sep + self.topic_builder.mapAHLtoTopic(msg['new_file'])\n", "                new_dir = new_baseDir + os.sep + new_relDir\n", "\n", "                if msg['new_file'][-len(type_suffix):] != type_suffix:\n", "                    new_file = msg['new_file']+type_suffix\n", "                else:\n", "                    new_file = msg['new_file']\n", "\n", "                msg.updatePaths( self.o, new_baseDir + os.sep + new_relDir, new_file )\n", "            except Exception as ex:\n", "                # a topic tree could not be built from this file name: queue the message for retry.\n", "                logger.error( \"skipped\" , exc_info=True )\n", "                worklist.failed.append(msg)\n", "                continue\n", "\n", "            msg['_deleteOnPost'] |= set( [ 'from_cluster', 'sum', 'to_clusters' ] )\n", "            new_incoming.append(msg)\n", "\n", "        worklist.incoming=new_incoming\n", "\n" ] }, { "cell_type": "markdown", "id": "offshore-student", "metadata": {}, "source": [ "\n", "## Plugins That Change How a File is Downloaded\n", "\n", "\n", "The *after_accept* routine is one of the two most common ones in use. It is used to change processing prior to a file being downloaded or sent. 
To process the file after it has been downloaded, the *after_work* entry point is used; it processes the worklist.ok list (files that were successfully downloaded).\n", "\n", "The after_accept routine has an outer loop that cycles through the entire list of incoming messages. It builds a new list of incoming messages from the ones it accepts, while appending the ones it could not process to *worklist.failed*. The list is just a list of messages, where each message is a Python dictionary with all the fields stored in a v03 format message. In the message there are, for example, *baseURL* and *relPath* fields:\n", "\n", "* baseURL - the baseURL of the resource from which a file would be obtained.\n", "* relPath - the relative path to append to the baseURL to get the complete download URL.\n", "\n", "This happens before the transfer (download, send, or processing) of the file has occurred, so one can change the behaviour by modifying fields in the message. Normally, the download paths (called new_dir and new_file) will reflect the intent to mirror the original source tree. So if you have *a/b/c.txt* on the source tree, and are downloading into directory *mine* on the local system, the new_dir would be *mine/a/b* and new_file would be *c.txt*.\n", "\n", "## Plugins That Process a File After it is Downloaded\n", "\n", "\n", "A common use case is for plugins with an *after_work* entry point to read the file after it is downloaded and transform it into some derived product with a different name. So the new file is created as in the previous section. The message for the downloaded file still needs to be moved onto a list to ensure that it is acknowledged to the broker. 
Such an entry point would look like this:" ] }, { "cell_type": "code", "execution_count": 9, "id": "disciplinary-dublin", "metadata": {}, "outputs": [], "source": [ "\n", "def after_work(self, worklist):\n", "\n", "    new_ok=[]\n", "    for m in worklist.ok:\n", "        # do_something() is a placeholder for the plugin's own processing of the file.\n", "        success=do_something()\n", "        if success:\n", "            new_ok.append(m)\n", "        # otherwise, since it is already acknowledged, we can just drop it from ok.\n", "\n", "    worklist.ok = new_ok\n", "    # the messages on worklist.ok will get posted in the next algorithm phase." ] }, { "cell_type": "markdown", "id": "coastal-moses", "metadata": {}, "source": [ "\n", "## Plugins That Rename Files\n", "\n", "\n", "The Wistree plugin shown earlier changes the layout of the files that are to be downloaded, based on the [GTStoWIS2](https://github.com/wmo-im/GTStoWIS2) class, which prescribes a different directory tree on output. There are a lot of fields to update when changing file placement, so it is best to use:\n", "\n", "    msg.updatePaths( self.o, new_dir, new_file )\n", "\n", "to update all necessary fields in the message properly. It will update 'new_baseURL', 'new_relPath', 'new_subtopic' for use when posting.\n", "\n", "The try/except part of the routine deals with the case where a file arrives with a name from which a topic tree cannot be built. A Python exception may then occur, in which case the message is added to the failed worklist and will not be processed by later plugins.\n", "\n", "## Plugins That Create New Files\n", "\n", "\n", "The routine above is perfect when a file is just renamed. If a plugin needs to create new files only vaguely derived from the input file, then you want to create new messages for these files from scratch:\n", "\n", "    import sarracenia\n", "\n", "    m = sarracenia.Message.fromFileData(sample_fileName, self.o, os.stat(sample_fileName) )\n", "\n", "The fromFileData routine will use self.o to apply the appropriate posting settings.\n", "No knowledge of message formats, or construction of fields is needed. 
If the file is not\n", "local, such as when writing a poll callback, an alternate routine can be used:\n", "\n", "    m = sarracenia.Message.fromFileInfo(sample_fileName, self.o, fake_stat )\n", "\n", "The fake stat record (as per the stat(2) man page or Python os.stat() ) can be built from other fields, starting with:\n", "\n", "    import paramiko\n", "\n", "    fake_stat = paramiko.SFTPAttributes()\n", "    fake_stat.st_mtime = ... something else... perhaps an http header?\n", "    fake_stat.st_size = ... again will vary by context.\n", "\n", "Either way, once you have the message, it can be appended to the incoming list.\n" ] }, { "cell_type": "markdown", "id": "inclusive-scope", "metadata": {}, "source": [ "## Other Examples\n", "\n", "\n", "Subclassing of [sarracenia.flowcb](../Reference/flowcb.html) is used internally to do a lot of core work. It's a good idea to look at the sarracenia source code itself. For example:\n", "\n", "* [sarracenia.flowcb](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/__init__.py) have a look at the __init__.py file in there, which\n", "  provides this information in a more programmatically succinct format.\n", "\n", "* [sarracenia.flowcb.gather.file](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/gather/file.py)\n", "  is a class that implements\n", "  file posting and directory watching, in the sense of a callback that\n", "  implements the *gather* entry point, by reading a file system and building a\n", "  list of messages for processing.\n", "\n", "* [sarracenia.flowcb.gather.message](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/gather/message.py)\n", "  is a class that implements reception of messages from message queue protocol flows.\n", "\n", "* [sarracenia.flowcb.nodupe](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/nodupe)\n", "  This module removes duplicates from message\n", "  flows based on Identity checksums.\n", "\n", "* 
[sarracenia.flowcb.post.message](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/post/message.py)\n", "  is a class that implements posting\n", "  messages to message queue protocol flows.\n", "\n", "* [sarracenia.flowcb.retry](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/retry.py)\n", "  When the transfer of a file fails, Sarracenia needs to persist the relevant message to a state file for\n", "  a later time when it can be tried again.\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "expensive-yellow", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.12" } }, "nbformat": 4, "nbformat_minor": 5 }