{ "cells": [ { "cell_type": "markdown", "id": "informative-conservation", "metadata": {}, "source": [ "# flow API Example\n", "\n", "The [sarracenia.flow class](../Reference/code.rst#module-sarracenia.flow) provides built-in accept/reject filtering for messages, built-in downloading over several protocols, retries on failure, and callbacks to customize processing.\n", "\n", "You need to provide a configuration as an argument when instantiating a subscriber.\n", "The function _sarracenia.config.no_file_config()_ returns an empty configuration without consulting\n", "any of the sr3 configuration file tree.\n", "\n", "After the needed modifications are made to the configuration, the subscriber is instantiated and run." ] }, { "cell_type": "code", "execution_count": 1, "id": "weekly-terminology", "metadata": { "scrolled": true }, "outputs": [], "source": [ "!mkdir /tmp/flow_demo" ] }, { "cell_type": "markdown", "id": "exterior-folks", "metadata": {}, "source": [ "Make a directory for the files you are going to download.\n", "The root of the directory tree must exist." 
] }, { "cell_type": "code", "execution_count": 1, "id": "aggregate-election", "metadata": {}, "outputs": [], "source": [ "import copy\n", "import re\n", "import sarracenia.config\n", "from sarracenia.flow.subscribe import Subscribe\n", "import sarracenia.flowcb\n", "import sarracenia.config.credentials\n", "import socket\n", "\n", "cfg = sarracenia.config.no_file_config()\n", "\n", "cfg.broker = sarracenia.config.credentials.Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')\n", "cfg.topicPrefix = [ 'v02', 'post']\n", "cfg.component = 'subscribe'\n", "cfg.config = 'flow_demo'\n", "cfg.action = 'start'\n", "bindings = [ {'exchange':'xpublic', 'prefix':['v02', 'post'],\n", " 'sub':['*.WXO-DD.observations.swob-ml.#']}]\n", "cfg.queueName = 'q_${BROKER_USER}_${HOSTNAME}_${QUEUESHARE}'\n", "cfg.download=True\n", "cfg.batch=1\n", "cfg.messageCountMax=5\n", "cfg.queueShare = 'SomethingSessionfulToYou'\n", "\n", "\n", "cfg.settings = { 'sarracenia.moth.amqp.AMQP': { 'logLevel':'debug' } }\n", "\n", "# Note: queue name must start with q_ because server is configured to deny anything else.\n", "#\n", "\n", "queue = {'name': 'q_anonymous_' + socket.getfqdn() + '_' + cfg.queueShare,\n", " 'template': cfg.queueName,\n", " 'auto_delete' : False, # AO == amqp only\n", " 'durable': True, # AO: queue should survive broker reboots\n", " 'expire': 600, # MO: seconds until queue with no consumers disappears.\n", " 'prefetch': 5,\n", " 'qos': 1,\n", " 'tlsRigour': 'normal',\n", " 'bind': True, # whether to bind queues/subscriptions\n", " 'declare': True # whether to declare queues/subscriptions\n", " }\n", "\n", "\n", "cfg.subscriptions = sarracenia.config.subscription.Subscriptions( [ {\n", " 'broker': cfg.broker,\n", " 'bindings': bindings,\n", " 'queue' : queue\n", " } ] )\n", "\n", "\n", "# set the instance number for the flow class.\n", "cfg.no=0\n", "\n", "# set other settings based on provided ones, so it is ready for use.\n", "\n", "cfg.finalize()\n", "\n", "# 
accept/reject patterns:\n", "pattern=\".*\"\n", "\n", "# to_match, write_to_dir, DESTFN, regex_to_match, accept=True,mirror,strip, pstrip,flatten\n", "cfg.masks = [(pattern, \"/tmp/flow_demo\", None, re.compile(pattern), True, False, False, False, '/', None)]\n", "\n", "\n" ] }, { "cell_type": "markdown", "id": "legitimate-necessity", "metadata": {}, "source": [ "\n", "## starters.\n", "The broker, bindings, and queueName settings are explained in the moth notebook.\n", "\n", "## cfg.download\n", "\n", "Whether the flow should download the files corresponding to the messages.\n", "If True, the flow downloads the files.\n", "\n", "## cfg.batch\n", "\n", "Messages are processed in batches. The number of messages to retrieve per call to newMessages()\n", "is limited by the _batch_ setting. We set it to 1 here so you can see each file being downloaded immediately after the corresponding message is received. You can leave this unset, and it defaults to 25. The right value is a matter of taste and use case.\n", "\n", "## cfg.messageCountMax\n", "\n", "Normally we just leave this setting at its default (0), which has no effect on processing.\n", "For demonstration purposes, we use this setting to limit the number of messages the subscriber will process.\n", "After _messageCountMax_ messages have been received, processing stops.\n", "\n", "\n", "## cfg.masks\n", "Masks are a compiled form of accept/reject directives. The relPath of each message is compared to the regex in the mask.\n", "If the regex matches, and accept is True, then the message is accepted for further processing.\n", "If the regex matches, but accept is False, then processing of the message is stopped (the message is rejected.)\n", "\n", "Each mask is a tuple. 
The meaning of each field can be looked up in the sr3(1) man page.\n", "\n", "* pattern_string, the input regular expression string, to be compiled by re routines.\n", "* directory, where to put the downloaded files (root of the tree, when mirroring)\n", "* fn, the filename transformation to apply. None is the 99% use case.\n", "* regex, compiled regex version of the pattern_string\n", "* accept(True/False), if True, a message matching the pattern is accepted for further processing; if False, it is rejected.\n", "* mirror(True/False), when downloading, build a complete tree to mirror the source, or just dump the files in directory\n", "* strip(True/False), modify the relPath by stripping entries from the left.\n", "* pstrip(True/False), strip entries based on pattern\n", "* flatten(char ... '/' means do not flatten.)\n", "\n", "## cfg.no, cfg.pid_filename\n", "\n", "These settings are needed because they would ordinarily be set by the sarracenia.instance class, which is\n", "normally used to launch flows. They allow run-time paths to be set up for retry queues and state files,\n", "which preserve settings between runs when needed.\n" ] }, { "cell_type": "code", "execution_count": 2, "id": "musical-discrimination", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2025-05-06 18:02:26,977 [DEBUG] sarracenia.flowcb.retry __init__ sr_retry __init__\n", "2025-05-06 18:02:26,978 [DEBUG] sarracenia.config add_option 0 retry_driver declared as type: value:disk\n", "2025-05-06 18:02:26,978 [DEBUG] sarracenia.config add_option 0 retry_refilter declared as type: value:False\n", "2025-05-06 18:02:26,979 [DEBUG] sarracenia.config add_option 0 MemoryMax declared as type: value:0\n", "2025-05-06 18:02:26,979 [DEBUG] sarracenia.config add_option 0 MemoryBaseLineFile declared as type: value:100\n", "2025-05-06 18:02:26,980 [DEBUG] sarracenia.config add_option 0 MemoryMultiplier declared as type: value:3\n", "2025-05-06 18:02:26,981 [DEBUG] sarracenia.config add_option 0 logEvents declared as type: 
value:{'after_post', 'after_work', 'on_housekeeping', 'after_accept'}\n", "2025-05-06 18:02:26,981 [DEBUG] sarracenia.config add_option 0 logMessageDump declared as type: value:False\n", "2025-05-06 18:02:26,981 [DEBUG] sarracenia.config check_undeclared_options missing defaults: {'feeder', 'count', 'exchangeSuffix', 'realpathFilter', 'report_exchange', 'post_topic', 'identity', 'MemoryBaseLineFile', 'sendTo', 'cluster', 'exchangeSplit', 'nodupe_basis', 'accelWgetCommand', 'notify_only', 'post_on_start', 'reconnect', 'pollUrl', 'header', 'post_exchange', 'httpsSafeQuote', 'inplace', 'save', 'logMessageDump', 'blockSize', 'post_exchangeSuffix', 'follow_symlinks', 'topic', 'post_exchangeSplit', 'force_polling', 'accelCpCommand', 'accelScpCommand', 'retry_driver', 'restore', 'MemoryMax', 'MemoryMultiplier'}\n", "2025-05-06 18:02:26,982 [DEBUG] sarracenia.diskqueue __init__ work_retry_00 __init__\n", "2025-05-06 18:02:27,069 [DEBUG] amqp _on_start Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@hpfx2.collab.science.gc.ca', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. 
Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.2.1', 'product': 'RabbitMQ', 'version': '3.9.27'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']\n", "2025-05-06 18:02:27,097 [DEBUG] amqp __init__ using channel_id: 1\n", "2025-05-06 18:02:27,113 [DEBUG] amqp _on_open_ok Channel open\n", "2025-05-06 18:02:27,113 [DEBUG] amqp __init__ using channel_id: 2\n", "2025-05-06 18:02:27,148 [DEBUG] amqp _on_open_ok Channel open\n", "2025-05-06 18:02:27,180 [INFO] sarracenia.moth.amqp _queueDeclare queue declared q_anonymous_fractal_SomethingSessionfulToYou (as: amqps://anonymous@hpfx.collab.science.gc.ca), (messages waiting: 3352)\n", "2025-05-06 18:02:27,181 [INFO] sarracenia.moth.amqp getSetup binding q_anonymous_fractal_SomethingSessionfulToYou with v02.post.*.WXO-DD.observations.swob-ml.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca)\n", "2025-05-06 18:02:27,195 [DEBUG] sarracenia.moth.amqp getSetup getSetup ... Done!\n", "2025-05-06 18:02:27,209 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'local_offset', 'exchange', '_format', 'topic', 'ack_id', 'subtopic', 'subscription_index'}, 'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215340', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_cc20303d99e2f4ff798884b0d54d5a10:DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215340', 'source': 'anonymous', 'mtime': '20250506T215341.256', 'atime': '20250506T215341.256', 'pubTime': '20250506T215341.256', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20250506/WXO-DD/observations/swob-ml/20250506/CWLJ/2025-05-06-2153-CWLJ-AUTO-minute-swob.xml', 'subtopic': ['20250506', 'WXO-DD', 'observations', 'swob-ml', '20250506', 'CWLJ'], 'identity': {'method': 'md5', 'value': '+dZT5kYQnEU0d18cD8antw=='}, 'size': 9430, 'exchange': 'xpublic', 'topic': 'v02.post.20250506.WXO-DD.observations.swob-ml.20250506.CWLJ', 'ack_id': {'delivery_tag': 1, 'channel_id': 2, 
'connection_id': '4f89e0df-234f-4c89-a935-104ef4ab6193_sub', 'broker': 'hpfx.collab.science.gc.ca:5671//'}, 'local_offset': 0, 'subscription_index': 0}\n", "2025-05-06 18:02:27,210 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 525.95 ) exchange: xpublic subtopic: 20250506.WXO-DD.observations.swob-ml.20250506.CWLJ a file with baseUrl: https://hpfx.collab.science.gc.ca relPath: /20250506/WXO-DD/observations/swob-ml/20250506/CWLJ/2025-05-06-2153-CWLJ-AUTO-minute-swob.xml sundew_extension: DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215340 id: +dZT5kY size: 9430 \n", "2025-05-06 18:02:27,211 [DEBUG] sarracenia.config add_option 0 accelWgetCommand declared as type: value:/usr/bin/wget %s -o - -O %d\n", "2025-05-06 18:02:27,211 [DEBUG] sarracenia.config add_option 0 httpsSafeQuote declared as type: value:/+\n", "2025-05-06 18:02:27,313 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/2025-05-06-2153-CWLJ-AUTO-minute-swob.xml \n", "2025-05-06 18:02:27,335 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'local_offset', 'exchange', '_format', 'topic', 'ack_id', 'subtopic', 'subscription_index'}, 'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215342', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_897c298a738b12b599ce0a0a41d4de56:DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215342', 'source': 'anonymous', 'mtime': '20250506T215343.267', 'atime': '20250506T215343.267', 'pubTime': '20250506T215343.267', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20250506/WXO-DD/observations/swob-ml/20250506/CWIK/2025-05-06-2153-CWIK-AUTO-minute-swob.xml', 'subtopic': ['20250506', 'WXO-DD', 'observations', 'swob-ml', '20250506', 'CWIK'], 'identity': {'method': 'md5', 'value': 'QzRgaoONkPYgW1AQISJomQ=='}, 'size': 9347, 'exchange': 'xpublic', 'topic': 'v02.post.20250506.WXO-DD.observations.swob-ml.20250506.CWIK', 'ack_id': {'delivery_tag': 2, 'channel_id': 2, 
'connection_id': '4f89e0df-234f-4c89-a935-104ef4ab6193_sub', 'broker': 'hpfx.collab.science.gc.ca:5671//'}, 'local_offset': 0, 'subscription_index': 0}\n", "2025-05-06 18:02:27,335 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 524.07 ) exchange: xpublic subtopic: 20250506.WXO-DD.observations.swob-ml.20250506.CWIK a file with baseUrl: https://hpfx.collab.science.gc.ca relPath: /20250506/WXO-DD/observations/swob-ml/20250506/CWIK/2025-05-06-2153-CWIK-AUTO-minute-swob.xml sundew_extension: DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215342 id: QzRgaoO size: 9347 \n", "2025-05-06 18:02:27,406 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/2025-05-06-2153-CWIK-AUTO-minute-swob.xml \n", "2025-05-06 18:02:27,434 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'local_offset', 'exchange', '_format', 'topic', 'ack_id', 'subtopic', 'subscription_index'}, 'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215342', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_da727c53f038b5b7da3f448e45d9dda9:DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215342', 'source': 'anonymous', 'mtime': '20250506T215343.268', 'atime': '20250506T215343.268', 'pubTime': '20250506T215343.268', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20250506/WXO-DD/observations/swob-ml/20250506/CWNQ/2025-05-06-2153-CWNQ-AUTO-minute-swob.xml', 'subtopic': ['20250506', 'WXO-DD', 'observations', 'swob-ml', '20250506', 'CWNQ'], 'identity': {'method': 'md5', 'value': '5utOLH9nlj8BbYcXpMIkKg=='}, 'size': 8283, 'exchange': 'xpublic', 'topic': 'v02.post.20250506.WXO-DD.observations.swob-ml.20250506.CWNQ', 'ack_id': {'delivery_tag': 3, 'channel_id': 2, 'connection_id': '4f89e0df-234f-4c89-a935-104ef4ab6193_sub', 'broker': 'hpfx.collab.science.gc.ca:5671//'}, 'local_offset': 0, 'subscription_index': 0}\n", "2025-05-06 18:02:27,435 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 524.17 ) exchange: 
xpublic subtopic: 20250506.WXO-DD.observations.swob-ml.20250506.CWNQ a file with baseUrl: https://hpfx.collab.science.gc.ca relPath: /20250506/WXO-DD/observations/swob-ml/20250506/CWNQ/2025-05-06-2153-CWNQ-AUTO-minute-swob.xml sundew_extension: DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215342 id: 5utOLH9 size: 8283 \n", "2025-05-06 18:02:27,506 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/2025-05-06-2153-CWNQ-AUTO-minute-swob.xml \n", "2025-05-06 18:02:27,525 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'local_offset', 'exchange', '_format', 'topic', 'ack_id', 'subtopic', 'subscription_index'}, 'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215341', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_f26ee15b5d44070ce75ed65271414b09:DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215341', 'source': 'anonymous', 'mtime': '20250506T215343.428', 'atime': '20250506T215343.428', 'pubTime': '20250506T215343.428', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20250506/WXO-DD/observations/swob-ml/20250506/CWUT/2025-05-06-2153-CWUT-AUTO-minute-swob.xml', 'subtopic': ['20250506', 'WXO-DD', 'observations', 'swob-ml', '20250506', 'CWUT'], 'identity': {'method': 'md5', 'value': 'KQWbBWIE3OMSaCVjhGE79Q=='}, 'size': 9818, 'exchange': 'xpublic', 'topic': 'v02.post.20250506.WXO-DD.observations.swob-ml.20250506.CWUT', 'ack_id': {'delivery_tag': 4, 'channel_id': 2, 'connection_id': '4f89e0df-234f-4c89-a935-104ef4ab6193_sub', 'broker': 'hpfx.collab.science.gc.ca:5671//'}, 'local_offset': 0, 'subscription_index': 0}\n", "2025-05-06 18:02:27,525 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 524.10 ) exchange: xpublic subtopic: 20250506.WXO-DD.observations.swob-ml.20250506.CWUT a file with baseUrl: https://hpfx.collab.science.gc.ca relPath: /20250506/WXO-DD/observations/swob-ml/20250506/CWUT/2025-05-06-2153-CWUT-AUTO-minute-swob.xml sundew_extension: 
DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215341 id: KQWbBWI size: 9818 \n", "2025-05-06 18:02:27,597 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/2025-05-06-2153-CWUT-AUTO-minute-swob.xml \n", "2025-05-06 18:02:27,615 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'local_offset', 'exchange', '_format', 'topic', 'ack_id', 'subtopic', 'subscription_index'}, 'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215344', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_8a0d2cf8c42ad6bb00358762e7a798fe:DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215344', 'source': 'anonymous', 'mtime': '20250506T215346.150', 'atime': '20250506T215346.150', 'pubTime': '20250506T215346.150', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20250506/WXO-DD/observations/swob-ml/20250506/CZEL/2025-05-06-2153-CZEL-AUTO-minute-swob.xml', 'subtopic': ['20250506', 'WXO-DD', 'observations', 'swob-ml', '20250506', 'CZEL'], 'identity': {'method': 'md5', 'value': '34Av1QcKERddo5tIUU9hcA=='}, 'size': 10268, 'exchange': 'xpublic', 'topic': 'v02.post.20250506.WXO-DD.observations.swob-ml.20250506.CZEL', 'ack_id': {'delivery_tag': 5, 'channel_id': 2, 'connection_id': '4f89e0df-234f-4c89-a935-104ef4ab6193_sub', 'broker': 'hpfx.collab.science.gc.ca:5671//'}, 'local_offset': 0, 'subscription_index': 0}\n", "2025-05-06 18:02:27,616 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 521.47 ) exchange: xpublic subtopic: 20250506.WXO-DD.observations.swob-ml.20250506.CZEL a file with baseUrl: https://hpfx.collab.science.gc.ca relPath: /20250506/WXO-DD/observations/swob-ml/20250506/CZEL/2025-05-06-2153-CZEL-AUTO-minute-swob.xml sundew_extension: DMS:WXO_RENAMED_SWOB2:MSC:XML::20250506215344 id: 34Av1Qc size: 10268 \n", "2025-05-06 18:02:27,676 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/2025-05-06-2153-CZEL-AUTO-minute-swob.xml \n", "2025-05-06 18:02:27,677 [INFO] 
sarracenia.flow run 6 messages processed > messageCountMax 5\n", "2025-05-06 18:02:27,677 [INFO] sarracenia.flow please_stop asked to stop\n", "2025-05-06 18:02:27,677 [INFO] sarracenia.moth please_stop asked to stop\n", "2025-05-06 18:02:27,677 [INFO] sarracenia.flow _runHousekeeping on_housekeeping pid: 414972 subscribe/flow_demo instance: 0\n", "2025-05-06 18:02:27,678 [INFO] sarracenia.flowcb.gather.message on_housekeeping from amqps://anonymous@hpfx.collab.science.gc.ca messages: good: 5 bad: 0 bytes: 730Bytes average: 146Bytes\n", "2025-05-06 18:02:27,679 [INFO] sarracenia.flowcb.housekeeping.resources on_housekeeping Current cpu_times: user=0.69 system=0.15\n", "2025-05-06 18:02:27,679 [INFO] sarracenia.flowcb.housekeeping.resources on_housekeeping Current mem usage: 820.9MiB, accumulating count (5 or 5/100 so far) before self-setting threshold\n", "2025-05-06 18:02:27,679 [INFO] sarracenia.flowcb.log stats version: 3.00.59rc1, started: now, last_housekeeping: 0.7 seconds ago \n", "2025-05-06 18:02:27,679 [INFO] sarracenia.flowcb.log stats messages received: 5, accepted: 5, rejected: 0 rate accepted: 100.0% or 7.2 m/s\n", "2025-05-06 18:02:27,680 [INFO] sarracenia.flowcb.log stats files transferred: 5 bytes: 46.0KiB rate: 66.0KiB/sec\n", "2025-05-06 18:02:27,680 [INFO] sarracenia.flowcb.log stats lag: average: 523.95, maximum: 525.95 \n", "2025-05-06 18:02:27,680 [INFO] sarracenia.flow run 6 messages processed > messageCountMax 5\n", "2025-05-06 18:02:27,681 [INFO] sarracenia.flow please_stop asked to stop\n", "2025-05-06 18:02:27,681 [INFO] sarracenia.moth please_stop asked to stop\n", "2025-05-06 18:02:27,681 [INFO] sarracenia.flow _runHousekeeping on_housekeeping pid: 414972 subscribe/flow_demo instance: 0\n", "2025-05-06 18:02:27,681 [INFO] sarracenia.flowcb.gather.message on_housekeeping from amqps://anonymous@hpfx.collab.science.gc.ca messages: good: 0 bad: 0 bytes: 0Bytes average: 0Bytes\n", "2025-05-06 18:02:27,682 [INFO] 
sarracenia.flowcb.housekeeping.resources on_housekeeping Current cpu_times: user=0.69 system=0.15\n", "2025-05-06 18:02:27,683 [INFO] sarracenia.flowcb.housekeeping.resources on_housekeeping Current mem usage: 820.9MiB, accumulating count (5 or 5/100 so far) before self-setting threshold\n", "2025-05-06 18:02:27,683 [INFO] sarracenia.flowcb.log stats version: 3.00.59rc1, started: now, last_housekeeping: 0.0 seconds ago \n", "2025-05-06 18:02:27,683 [INFO] sarracenia.flowcb.log stats messages received: 0, accepted: 0, rejected: 0 rate accepted: 0.0% or 0.0 m/s\n", "2025-05-06 18:02:27,683 [INFO] sarracenia.flowcb.log stats files transferred: 0 bytes: 0Bytes rate: 0Bytes/sec\n", "2025-05-06 18:02:27,700 [DEBUG] amqp collect Closed channel #1\n", "2025-05-06 18:02:27,701 [DEBUG] amqp collect Closed channel #2\n", "2025-05-06 18:02:27,701 [INFO] sarracenia.flow close flow/close completed cleanly pid: 414972 subscribe/flow_demo instance: 0\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "2024-01-29 15:00:38,025 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 8201.25 ) https://hpfx.collab.science.gc.ca /20240129/WXO-DD/observations/swob-ml/20240129/CVBB/2024-01-29-1743-CVBB-AUTO-minute-swob.xml\n", "2024-01-29 15:00:38,025 [INFO] sarracenia.flow do_download missing destination directories, makedirs: /tmp/flow_demo/20240129/WXO-DD/observations/swob-ml/20240129/CVBB \n", "2024-01-29 15:00:38,114 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/20240129/WXO-DD/observations/swob-ml/20240129/CVBB/2024-01-29-1743-CVBB-AUTO-minute-swob.xml \n", "2024-01-29 15:00:38,139 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'source', '_format', 'exchange', 'subtopic', 'local_offset', 'ack_id'}, 'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20240129174356', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 
'msg_ddsr-WXO-DD_8067f0a1a5b4711ab86e481341b26590:DMS:WXO_RENAMED_SWOB2:MSC:XML::20240129174356', 'source': 'anonymous', 'mtime': '20240129T174357.781', 'atime': '20240129T174357.781', 'pubTime': '20240129T174357.781', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20240129/WXO-DD/observations/swob-ml/20240129/CWLJ/2024-01-29-1743-CWLJ-AUTO-minute-swob.xml', 'subtopic': ['20240129', 'WXO-DD', 'observations', 'swob-ml', '20240129', 'CWLJ'], 'identity': {'method': 'md5', 'value': 'uDrzi9GLNnhEgGvSylHu9g=='}, 'size': 9428, 'exchange': 'xpublic', 'ack_id': 4, 'local_offset': 0}\n", "2024-01-29 15:00:38,140 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 8200.36 ) https://hpfx.collab.science.gc.ca /20240129/WXO-DD/observations/swob-ml/20240129/CWLJ/2024-01-29-1743-CWLJ-AUTO-minute-swob.xml\n", "2024-01-29 15:00:38,141 [INFO] sarracenia.flow do_download missing destination directories, makedirs: /tmp/flow_demo/20240129/WXO-DD/observations/swob-ml/20240129/CWLJ \n", "2024-01-29 15:00:38,242 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/20240129/WXO-DD/observations/swob-ml/20240129/CWLJ/2024-01-29-1743-CWLJ-AUTO-minute-swob.xml \n", "2024-01-29 15:00:38,262 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'source', '_format', 'exchange', 'subtopic', 'local_offset', 'ack_id'}, 'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20240129174355', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_6f203257347d4f090abc1d7557864cb7:DMS:WXO_RENAMED_SWOB2:MSC:XML::20240129174355', 'source': 'anonymous', 'mtime': '20240129T174357.267', 'atime': '20240129T174357.267', 'pubTime': '20240129T174357.267', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20240129/WXO-DD/observations/swob-ml/20240129/CAMS/2024-01-29-1743-CAMS-AUTO-minute-swob.xml', 'subtopic': ['20240129', 'WXO-DD', 'observations', 'swob-ml', '20240129', 'CAMS'], 'identity': {'method': 'md5', 
'value': 'H/h4jm6MTzMSp1oCeDS1jA=='}, 'size': 9826, 'exchange': 'xpublic', 'ack_id': 5, 'local_offset': 0}\n", "2024-01-29 15:00:38,263 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 8201.00 ) https://hpfx.collab.science.gc.ca /20240129/WXO-DD/observations/swob-ml/20240129/CAMS/2024-01-29-1743-CAMS-AUTO-minute-swob.xml\n", "2024-01-29 15:00:38,263 [INFO] sarracenia.flow do_download missing destination directories, makedirs: /tmp/flow_demo/20240129/WXO-DD/observations/swob-ml/20240129/CAMS \n", "2024-01-29 15:00:38,356 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/20240129/WXO-DD/observations/swob-ml/20240129/CAMS/2024-01-29-1743-CAMS-AUTO-minute-swob.xml \n", "2024-01-29 15:00:38,357 [INFO] sarracenia.flow please_stop ok, telling 4 callbacks about it.\n", "2024-01-29 15:00:38,357 [INFO] sarracenia.flow run starting last pass (without gather) through loop for cleanup.\n", "2024-01-29 15:00:38,358 [INFO] sarracenia.flow please_stop ok, telling 4 callbacks about it.\n", "2024-01-29 15:00:38,359 [INFO] sarracenia.flow run on_housekeeping pid: 3567801 subscribe/flow_demo instance: 0\n", "2024-01-29 15:00:38,359 [INFO] sarracenia.flowcb.gather.message on_housekeeping messages: good: 5 bad: 0 bytes: 730 Bytes average: 146 Bytes\n", "2024-01-29 15:00:38,359 [INFO] sarracenia.flowcb.retry on_housekeeping on_housekeeping\n", "2024-01-29 15:00:38,360 [INFO] sarracenia.diskqueue on_housekeeping work_retry_00 on_housekeeping\n", "2024-01-29 15:00:38,361 [INFO] sarracenia.diskqueue on_housekeeping No retry in list\n", "2024-01-29 15:00:38,361 [INFO] sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000548\n", "2024-01-29 15:00:38,361 [INFO] sarracenia.diskqueue on_housekeeping post_retry_000 on_housekeeping\n", "2024-01-29 15:00:38,362 [INFO] sarracenia.diskqueue on_housekeeping No retry in list\n", "2024-01-29 15:00:38,362 [INFO] sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000741\n", "2024-01-29 15:00:38,363 
[INFO] sarracenia.flowcb.housekeeping.resources on_housekeeping Current Memory cpu_times: user=0.76 system=0.17\n", "2024-01-29 15:00:38,363 [INFO] sarracenia.flowcb.housekeeping.resources on_housekeeping Current mem usage: 790.2 MiB, accumulating count (5 or 5/100 so far) before self-setting threshold\n", "2024-01-29 15:00:38,364 [INFO] sarracenia.flowcb.log stats version: 3.00.51rc6, started: a second ago, last_housekeeping: 1.0 seconds ago \n", "2024-01-29 15:00:38,364 [INFO] sarracenia.flowcb.log stats messages received: 5, accepted: 5, rejected: 0 rate accepted: 100.0% or 5.0 m/s\n", "2024-01-29 15:00:38,364 [INFO] sarracenia.flowcb.log stats files transferred: 5 bytes: 45.3 KiB rate: 45.1 KiB/sec\n", "2024-01-29 15:00:38,365 [INFO] sarracenia.flowcb.log stats lag: average: 8200.95, maximum: 8201.25 \n", "2024-01-29 15:00:38,366 [INFO] sarracenia.flowcb.log on_housekeeping housekeeping\n", "2024-01-29 15:00:38,366 [INFO] sarracenia.flow run clean stop from run loop\n", "2024-01-29 15:00:38,385 [DEBUG] amqp collect Closed channel #1\n", "2024-01-29 15:00:38,386 [DEBUG] amqp collect Closed channel #2\n", "2024-01-29 15:00:38,386 [INFO] sarracenia.flowcb.gather.message on_stop closing\n", "2024-01-29 15:00:38,386 [INFO] sarracenia.flow close flow/close completed cleanly pid: 3567801 subscribe/flow_demo instance: 0\n" ] } ], "source": [ "subscriber = sarracenia.flow.subscribe.Subscribe( cfg )\n", "\n", "subscriber.run()" ] }, { "cell_type": "markdown", "id": "passive-biotechnology", "metadata": {}, "source": [ "## Conclusion:\n", "\n", "The sarracenia.flow class supports an asynchronous method of operation, which can be customized using flowcb (flow callback) classes to introduce specific processing at specific times. It works just like invoking a single instance from the command line, except that all configuration is done within Python by setting cfg fields rather than using the configuration language.\n", "\n", "What is lost vs. 
using the command line tool:\n", "\n", "* ability to use the configuration language (slightly simpler than assigning values to the cfg object)\n", "* easy running of multiple instances\n", "* co-ordinated monitoring of the instances (restarts on failure, and a programmable number of subscribers started per configuration)\n", "* log file management\n", "\n", "The command line tool provides those additional features." ] }, { "cell_type": "code", "execution_count": null, "id": "2267de19-8c40-417d-ac0e-dc20df3e1a06", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" } }, "nbformat": 4, "nbformat_minor": 5 }