Exemple d’API de flux

La classe sarracenia.flow fournit un filtrage d’acceptation/rejet intégré pour les messages, prend en charge le téléchargement intégré dans plusieurs protocoles, réessaye en cas d’échec et permet la création de rappels pour personnaliser le traitement.

Vous devez fournir une configuration comme argument lors de l’instanciation d’un abonné. La sarracenia.config.no_file_config() renvoie une configuration vide sans consulter l’arborescence des fichiers de configuration sr3.

Après avoir apporté les modifications nécessaires à la configuration, l’abonné est alors initié et exécuté.

[1]:
!mkdir /tmp/flow_demo
mkdir: cannot create directory ‘/tmp/flow_demo’: File exists

Créer un répertoire pour les fichiers que vous allez télécharger. La racine de l’arborescence de répertoires doit exister.

[2]:
import re
import sarracenia.config
from sarracenia.flow.subscribe import Subscribe
import sarracenia.flowcb
import sarracenia.credentials

cfg = sarracenia.config.no_file_config()

cfg.broker = 'amqps://anonymous:anonymous@hpfx.collab.science.gc.ca'
cfg.topicPrefix = [ 'v02', 'post']
cfg.component = 'subscribe'
cfg.config = 'flow_demo'
cfg.bindings = [ ('xpublic', ['v02', 'post'], ['*', 'WXO-DD', 'observations', 'swob-ml', '#' ]) ]
cfg.queueName='q_anonymous.subscriber_test2'
cfg.download=True
cfg.batch=1
cfg.messageCountMax=5

# set the instance number for the flow class.
cfg.no=0

cfg.finalize()

# accept/reject patterns:
pattern=".*"
#   to_match, write_to_dir, DESTFN, regex_to_match, accept=True,mirror,strip, pstrip,flatten
cfg.masks= [ ( pattern, "/tmp/flow_demo", None, re.compile(pattern), True, False, False, False, '/' ) ]



démareurs.

les paramètres du courtier, des liaisons et du nom de la file d’attente sont expliqués dans le bloc-notes de moth.

cfg.download

Si vous souhaitez que le flux télécharge les fichiers correspondant aux messages. Si vrai, il téléchargera les fichiers.

cfg.batch

Les messages sont traités par lots. Le nombre de messages à récupérer par appel à newMessages() est limité par le paramètre batch. Nous le définissons ici sur 1 afin que vous puissiez voir chaque fichier téléchargé immédiatement lorsque le message correspondant est téléchargé. vous pouvez laisser ce champ vide et la valeur par défaut est 25. Les paramètres sont une question de goût et de cas d’utilisation.

cfg.messageCountMax

Normalement, nous laissons ce paramètre à sa valeur par défaut (0) qui n’a aucun effet sur le traitement. à des fins de démonstration, nous limitons le nombre de messages que l’abonné traitera avec ce paramètre. après la réception de messageCountMax messages, arrêtez le traitement.

cfg.masks

Les masques sont une forme compilée de directives d’acceptation/rejet. un relPath est comparé à la regex dans le masque. Si l’expression régulière correspond et que accept est True, le message est accepté pour un traitement ultérieur. Si l’expression régulière correspond, mais accept vaut False, le traitement du message est arrêté (le message est rejeté.)

les masques sont un tuple. la signification peut être recherchée dans la page de manuel sr3(1).

  • pattern_string, la chaîne d’expression régulière d’entrée, à compiler par les routines.

  • directory, où mettre les fichiers téléchargés (racine de l’arborescence, lors de la mise en miroir)

  • fn, transformation du filename à faire. NONE est utilisé 99% des cas d’utilisation.

  • regex, version regex compilée de pattern_string

  • accept(True/False), si le modèle correspond, acceptez le message pour un traitement ultérieur.

  • mirror(True/False), lors du téléchargement, créez une arborescence complète pour refléter la source, ou videz-la simplement dans le répertoire

  • strip(True/False), modifier le relpath en supprimant les entrées de la gauche.

  • pstrip(True/False), entrées de bande basées sur le modèle

  • flatten(char … ‘/’ signifie ne pas aplatir.) )

cfg.no, cfg.pid_filename

Ces paramètres sont nécessaires car ils seraient normalement définis par la classe sarracenia.instance qui est normalement utilisée pour lancer des flux. Ils permettent de configurer des chemins d’exécution pour retry_queues et des fichiers d’état, afin de mémoriser les paramètres si nécessaire entre les exécutions.

[3]:
subscriber = sarracenia.flow.subscribe.Subscribe( cfg )

subscriber.run()
2023-05-28 16:52:19,861 [INFO] sarracenia.flow loadCallbacks flowCallback plugins to load: ['sarracenia.flowcb.gather.message.Message', 'sarracenia.flowcb.retry.Retry', 'sarracenia.flowcb.housekeeping.resources.Resources', 'log']
2023-05-28 16:52:19,940 [DEBUG] amqp _on_start Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@hpfx2.collab.science.gc.ca', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 24.2.1', 'product': 'RabbitMQ', 'version': '3.9.13'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
2023-05-28 16:52:19,984 [DEBUG] amqp __init__ using channel_id: 1
2023-05-28 16:52:20,002 [DEBUG] amqp _on_open_ok Channel open
2023-05-28 16:52:20,048 [INFO] sarracenia.moth.amqp __getSetup queue declared q_anonymous.subscriber_test2 (as: amqps://anonymous@hpfx.collab.science.gc.ca)
2023-05-28 16:52:20,048 [INFO] sarracenia.moth.amqp __getSetup binding q_anonymous.subscriber_test2 with v02.post.*.WXO-DD.observations.swob-ml.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca)
2023-05-28 16:52:20,072 [DEBUG] sarracenia.moth.amqp __getSetup getSetup ... Done!
2023-05-28 16:52:20,073 [DEBUG] sarracenia.flowcb.retry __init__ sr_retry __init__
2023-05-28 16:52:20,074 [DEBUG] sarracenia.config add_option retry_driver declared as type:<class 'str'> value:disk
2023-05-28 16:52:20,101 [DEBUG] sarracenia.diskqueue __init__  work_retry_00 __init__
2023-05-28 16:52:20,103 [DEBUG] sarracenia.config add_option MemoryMax declared as type:<class 'int'> value:0
2023-05-28 16:52:20,103 [DEBUG] sarracenia.config add_option MemoryBaseLineFile declared as type:<class 'int'> value:100
2023-05-28 16:52:20,103 [DEBUG] sarracenia.config add_option MemoryMultiplier declared as type:<class 'float'> value:3
2023-05-28 16:52:20,104 [DEBUG] sarracenia.config add_option logEvents declared as type:<class 'set'> value:{'after_work', 'after_accept', 'on_housekeeping'}
2023-05-28 16:52:20,104 [DEBUG] sarracenia.config add_option logMessageDump declared as type:<class 'bool'> value:False
2023-05-28 16:52:20,105 [INFO] sarracenia.flowcb.log __init__ subscribe initialized with: {'after_work', 'after_accept', 'on_housekeeping'}
2023-05-28 16:52:20,105 [DEBUG] sarracenia.config check_undeclared_options missing defaults: {'post_exchangeSuffix', 'exchangeSplit', 'identity', 'pollUrl', 'post_exchangeSplit', 'MemoryMax', 'notify_only', 'cluster', 'blocksize', 'exchange_suffix', 'report_exchange', 'realpathFilter', 'action', 'logMessageDump', 'retry_driver', 'source', 'nodupe_basis', 'MemoryMultiplier', 'reconnect', 'force_polling', 'header', 'inplace', 'post_exchange', 'save', 'post_on_start', 'follow_symlinks', 'count', 'MemoryBaseLineFile', 'feeder', 'sendTo', 'restore'}
2023-05-28 16:52:20,105 [INFO] sarracenia.flow run callbacks loaded: ['sarracenia.flowcb.gather.message.Message', 'sarracenia.flowcb.retry.Retry', 'sarracenia.flowcb.housekeeping.resources.Resources', 'log']
2023-05-28 16:52:20,105 [INFO] sarracenia.flow run pid: 1921103 subscribe/flow_demo instance: 0
2023-05-28 16:52:20,128 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'_format', 'exchange', 'ack_id', 'local_offset', 'subtopic'}, 'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20230528202430', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_ef8614a54e610cd50588f448a9632244:DMS:WXO_RENAMED_SWOB2:MSC:XML::20230528202430', 'source': 'WXO-DD', 'mtime': '20230528T202432.81', 'atime': '20230528T202432.81', 'pubTime': '20230528T202432.81', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230528/WXO-DD/observations/swob-ml/20230528/CWRZ/2023-05-28-2023-CWRZ-AUTO-minute-swob.xml', 'subtopic': ['20230528', 'WXO-DD', 'observations', 'swob-ml', '20230528', 'CWRZ'], 'identity': {'method': 'md5', 'value': '30K1LtKs+91neD6625tbcg=='}, 'size': 7665, 'exchange': 'xpublic', 'ack_id': 1, 'local_offset': 0}
2023-05-28 16:52:20,129 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 1667.32 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/observations/swob-ml/20230528/CWRZ/2023-05-28-2023-CWRZ-AUTO-minute-swob.xml
2023-05-28 16:52:20,129 [INFO] sarracenia.flow run now active on vip None
2023-05-28 16:52:20,130 [DEBUG] sarracenia.config add_option accelWgetCommand declared as type:<class 'str'> value:/usr/bin/wget %s -o - -O %d
2023-05-28 16:52:20,213 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/2023-05-28-2023-CWRZ-AUTO-minute-swob.xml
2023-05-28 16:52:20,233 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'_format', 'exchange', 'ack_id', 'local_offset', 'subtopic'}, 'sundew_extension': 'DMS:CMC:SWOB_FORESTRY:XML::20230528202436', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_b82b2479d8c115cf0eb4c82bfcc59981:DMS:CMC:SWOB_FORESTRY:XML::20230528202436', 'source': 'WXO-DD', 'mtime': '20230528T202437.541', 'atime': '20230528T202437.541', 'pubTime': '20230528T202437.541', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230528/WXO-DD/observations/swob-ml/partners/on-firewx/20230528/ban/2023-05-28-2023-on-mnrf-affes-ban-ban-AUTO-swob.xml', 'subtopic': ['20230528', 'WXO-DD', 'observations', 'swob-ml', 'partners', 'on-firewx', '20230528', 'ban'], 'identity': {'method': 'md5', 'value': 'QGDX+gsirC8l8hnDfEHa1w=='}, 'size': 5203, 'exchange': 'xpublic', 'ack_id': 2, 'local_offset': 0}
2023-05-28 16:52:20,233 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 1662.69 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/observations/swob-ml/partners/on-firewx/20230528/ban/2023-05-28-2023-on-mnrf-affes-ban-ban-AUTO-swob.xml
2023-05-28 16:52:20,311 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/2023-05-28-2023-on-mnrf-affes-ban-ban-AUTO-swob.xml
2023-05-28 16:52:20,336 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'_format', 'exchange', 'ack_id', 'local_offset', 'subtopic'}, 'sundew_extension': 'DMS:CMC:SWOB_FORESTRY:XML::20230528202436', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_b96cfe8f3a477e2c4a39af99a97b6429:DMS:CMC:SWOB_FORESTRY:XML::20230528202436', 'source': 'WXO-DD', 'mtime': '20230528T202437.541', 'atime': '20230528T202437.541', 'pubTime': '20230528T202437.541', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230528/WXO-DD/observations/swob-ml/partners/on-firewx/20230528/ple/2023-05-28-2023-on-mnrf-affes-ple-ple-AUTO-swob.xml', 'subtopic': ['20230528', 'WXO-DD', 'observations', 'swob-ml', 'partners', 'on-firewx', '20230528', 'ple'], 'identity': {'method': 'md5', 'value': 'eL80Iw/3MaCqipWJOT7LeQ=='}, 'size': 5091, 'exchange': 'xpublic', 'ack_id': 3, 'local_offset': 0}
2023-05-28 16:52:20,336 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 1662.80 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/observations/swob-ml/partners/on-firewx/20230528/ple/2023-05-28-2023-on-mnrf-affes-ple-ple-AUTO-swob.xml
2023-05-28 16:52:20,433 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/2023-05-28-2023-on-mnrf-affes-ple-ple-AUTO-swob.xml
2023-05-28 16:52:20,456 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'_format', 'exchange', 'ack_id', 'local_offset', 'subtopic'}, 'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20230528202442', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_0e2c0d07f3648f9956db0a2b1523e6d7:DMS:WXO_RENAMED_SWOB2:MSC:XML::20230528202442', 'source': 'WXO-DD', 'mtime': '20230528T202443.46', 'atime': '20230528T202443.46', 'pubTime': '20230528T202443.46', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230528/WXO-DD/observations/swob-ml/20230528/CXHI/2023-05-28-2024-CXHI-AUTO-minute-swob.xml', 'subtopic': ['20230528', 'WXO-DD', 'observations', 'swob-ml', '20230528', 'CXHI'], 'identity': {'method': 'md5', 'value': 'bGYYmVHKuo3JSRDOjiC7NA=='}, 'size': 9353, 'exchange': 'xpublic', 'ack_id': 4, 'local_offset': 0}
2023-05-28 16:52:20,457 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 1657.00 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/observations/swob-ml/20230528/CXHI/2023-05-28-2024-CXHI-AUTO-minute-swob.xml
2023-05-28 16:52:20,534 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/2023-05-28-2024-CXHI-AUTO-minute-swob.xml
2023-05-28 16:52:20,558 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'_format': 'v02', '_deleteOnPost': {'_format', 'exchange', 'ack_id', 'local_offset', 'subtopic'}, 'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20230528202442', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_1198e5a492e9a42cd6aadbbe92bcb788:DMS:WXO_RENAMED_SWOB2:MSC:XML::20230528202442', 'source': 'WXO-DD', 'mtime': '20230528T202443.47', 'atime': '20230528T202443.47', 'pubTime': '20230528T202443.47', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230528/WXO-DD/observations/swob-ml/20230528/CWBM/2023-05-28-2024-CWBM-AUTO-minute-swob.xml', 'subtopic': ['20230528', 'WXO-DD', 'observations', 'swob-ml', '20230528', 'CWBM'], 'identity': {'method': 'md5', 'value': 'sIqUJmWCsX5BVfplkZA75Q=='}, 'size': 9354, 'exchange': 'xpublic', 'ack_id': 5, 'local_offset': 0}
2023-05-28 16:52:20,559 [INFO] sarracenia.flowcb.log after_accept accepted: (lag: 1657.09 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/observations/swob-ml/20230528/CWBM/2023-05-28-2024-CWBM-AUTO-minute-swob.xml
2023-05-28 16:52:20,652 [INFO] sarracenia.flowcb.log after_work downloaded ok: /tmp/flow_demo/2023-05-28-2024-CWBM-AUTO-minute-swob.xml
2023-05-28 16:52:20,653 [INFO] sarracenia.flow please_stop ok, telling 4 callbacks about it.
2023-05-28 16:52:20,653 [INFO] sarracenia.flow run starting last pass (without gather) through loop for cleanup.
2023-05-28 16:52:20,654 [INFO] sarracenia.flow please_stop ok, telling 4 callbacks about it.
2023-05-28 16:52:20,654 [INFO] sarracenia.flow run on_housekeeping pid: 1921103 subscribe/flow_demo instance: 0
2023-05-28 16:52:20,655 [INFO] sarracenia.flowcb.gather.message on_housekeeping messages: good: 5 bad: 0 bytes: 783 Bytes average: 156 Bytes
2023-05-28 16:52:20,655 [INFO] sarracenia.flowcb.retry on_housekeeping on_housekeeping
2023-05-28 16:52:20,655 [INFO] sarracenia.diskqueue on_housekeeping work_retry_00 on_housekeeping
2023-05-28 16:52:20,656 [INFO] sarracenia.diskqueue on_housekeeping No retry in list
2023-05-28 16:52:20,656 [INFO] sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000612
2023-05-28 16:52:20,656 [INFO] sarracenia.diskqueue on_housekeeping post_retry_000 on_housekeeping
2023-05-28 16:52:20,657 [INFO] sarracenia.diskqueue on_housekeeping No retry in list
2023-05-28 16:52:20,657 [INFO] sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000450
2023-05-28 16:52:20,657 [INFO] sarracenia.flowcb.housekeeping.resources on_housekeeping Current Memory cpu_times: user=0.47 system=0.07
2023-05-28 16:52:20,657 [INFO] sarracenia.flowcb.housekeeping.resources on_housekeeping Current mem usage: 759.0 MiB, accumulating count (5 or 5/100 so far) before self-setting threshold
2023-05-28 16:52:20,658 [INFO] sarracenia.flowcb.log stats version: 3.00.40, started: now, last_housekeeping:  0.6 seconds ago
2023-05-28 16:52:20,658 [INFO] sarracenia.flowcb.log stats messages received: 5, accepted: 5, rejected: 0   rate accepted: 100.0% or 9.0 m/s
2023-05-28 16:52:20,658 [INFO] sarracenia.flowcb.log stats files transferred: 5 bytes: 35.8 KiB rate: 64.8 KiB/sec
2023-05-28 16:52:20,658 [INFO] sarracenia.flowcb.log stats lag: average: 1661.38, maximum: 1667.32
2023-05-28 16:52:20,658 [INFO] sarracenia.flowcb.log on_housekeeping housekeeping
2023-05-28 16:52:20,659 [INFO] sarracenia.flow run clean stop from run loop
2023-05-28 16:52:20,679 [DEBUG] amqp collect Closed channel #1
2023-05-28 16:52:20,680 [INFO] sarracenia.flowcb.gather.message on_stop closing
2023-05-28 16:52:20,680 [INFO] sarracenia.flow close flow/close completed cleanly pid: 1921103 subscribe/flow_demo instance: 0

Conclusion:

Avec la classe sarracenia.flow, une méthode de fonctionnement asynchrone est prise en charge, elle peut être personnalisée à l’aide de la classe flowcb (rappel de flux) pour introduire un traitement spécifique à des moments spécifiques. C’est comme l’invocation d’une seule instance à partir de la ligne de commande, sauf que toute la configuration est effectuée dans python en définissant des champs cfg, plutôt qu’en utilisant le langage de configuration.

Qu’est-ce qui est perdu par rapport à l’utilisation de l’outil de ligne de commande :

  • possibilité d’utiliser le langage de configuration (légèrement plus simple que d’attribuer des valeurs à l’objet cfg)

  • exécution facile de plusieurs instances,

  • surveillance coordonnée des instances (redémarrages en cas d’échec, et nombre programmable d’abonnés démarrés par configuration.)

  • gestion des fichiers journaux.

L’outil de ligne de commande fournit ces fonctionnalités supplémentaires.