Personnalisez la gestion des fichiers avec les rappels.

Tous les composants de Sarracenia implémentent l’algorithme Flow, avec différents rappels, dans le langage de programmation Python. La classe principale de Sarracenia (Python) est sarracenia.flow et la grande partie de la fonctionnalité de base est mis en œuvre à l’aide de la classe créée pour ajouter un traitement personnalisé à un flux, le classe flowcb (rappel de flux).

Pour une discussion détaillée de l’algorithme de flux lui-même, jetez un oeil dans le manuel Concepts. Pour tout flux, on peut ajouter un traitement personnalisé à divers moments pendant le traitement par sous-classement la classe sarracenia.flowcb.

En bref, l’algorithme comporte les étapes suivantes :

  • init(self, options) – lors de l’import, initialisation de python traditionnelle

  • on_start – lorsqu’une instance est démarrée.

  • boucle pour toujours

    • gather – collecte les messages à traiter appelés : worklist.incoming

    • poll – une autre façon de collecter des messages, uniquement dans le composant poll.

    • filter – applique les correspondances d’expressions régulières acceptées/rejetées à la liste de messages. déplace les messages pour les fichiers à ne pas télécharger de worklist.incoming vers worklist.reject

      • after_accept point d’entrée de rappel. traiter worklist.incoming, en en rejetant potentiellement d’autres.

    • ack – les messages worklist.rejected sont reconnus à la source en amont lorsque le traitement est terminé.

    • work – effectue un transfert ou une transformation sur un fichier.

    • ack – les messages worklist.ok pour les fichiers transférés avec succès sont reconnus à la source en amont.

      • Point d’entrée du rappel after_work

    • ack – les messages worklist.failed pour les fichiers qui n’ont pas été transférés avec succès sont reconnus.

    • post – publier le résultat du travail effectué pour l’étape suivante.

    • occasionnellement… **on_housekeeping – faire des nettoyages périodiques…

  • on_stop – arrêt du traitement.

pour plus de détails sur les points d’entrée disponibles de flowcb, consultez le code source:

Regardons l’utilisation de la classe dans une configuration :

[1]:
!sr3 remove subscribe/hpfx_amis.conf
!sr3 add subscribe/hpfx_amis.conf
2023-05-28 16:35:16,492 1919860 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: 10
2023-05-28 16:35:16,493 1919860 [INFO] root remove removing subscribe/hpfx_amis

add: 2023-05-28 16:35:17,637 1919863 [INFO] sarracenia.sr add copying: /home/peter/Sarracenia/sr3/sarracenia/examples/subscribe/hpfx_amis.conf to /home/peter/.config/sr3/subscribe/hpfx_amis.conf

[2]:
!echo messageCountMax 10 >>~/.config/sr3/subscribe/hpfx_amis.conf

fait en sorte que le flux s’arrête après la consommation de 10 messages.

[3]:
!sr3 list fcb
2023-05-28 16:35:26,803 1919870 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: 10
Provided callback classes: ( /home/peter/Sarracenia/sr3/sarracenia )
flowcb/accept/dateappend.py      flowcb/accept/delete.py
flowcb/accept/downloadbaseurl.py flowcb/accept/hourtree.py
flowcb/accept/httptohttps.py     flowcb/accept/longflow.py
flowcb/accept/pathreplace.py     flowcb/accept/posthourtree.py
flowcb/accept/postoverride.py    flowcb/accept/printlag.py
flowcb/accept/rename4jicc.py     flowcb/accept/renamedmf.py
flowcb/accept/renamewhatfn.py    flowcb/accept/save.py
flowcb/accept/speedo.py          flowcb/accept/sundewpxroute.py
flowcb/accept/testretry.py       flowcb/accept/toclusters.py
flowcb/accept/tohttp.py          flowcb/accept/tolocal.py
flowcb/accept/tolocalfile.py     flowcb/accept/wmotypesuffix.py
flowcb/amserver.py               flowcb/clamav.py
flowcb/destfn/replace.py         flowcb/destfn/sample.py
flowcb/download/mail_ingest.py   flowcb/filter/deleteflowfiles.py
flowcb/filter/fdelay.py          flowcb/filter/pclean_f90.py
flowcb/filter/pclean_f92.py      flowcb/filter/wmo2msc.py
flowcb/gather/file.py            flowcb/gather/message.py
flowcb/housekeeping/resources.py flowcb/log.py
flowcb/mdelaylatest.py           flowcb/nodupe/data.py
flowcb/nodupe/name.py            flowcb/pclean.py
flowcb/poll/airnow.py            flowcb/poll/mail.py
flowcb/poll/nasa_mls_nrt.py      flowcb/poll/nexrad.py
flowcb/poll/noaa_hydrometric.py  flowcb/poll/s3bucket.py
flowcb/poll/usgs.py              flowcb/post/message.py
flowcb/retry.py                  flowcb/rootchown.py
flowcb/run.py                    flowcb/rxqueue_gzip.py
flowcb/sample.py                 flowcb/send/am.py
flowcb/send/email.py             flowcb/shiftdir2baseurl.py
flowcb/trace_on_stop.py          flowcb/v2wrapper.py
flowcb/wistree.py                flowcb/work/age.py
flowcb/work/check.py             flowcb/work/citypage_check.py
flowcb/work/delete.py            flowcb/work/rxpipe.py
flowcb/work/send_egc_les.py

L’ajout de cette ligne à la configuration signifie que la sous-classe wistree flowcb (source ci-dessus) sera ajoutée au flux et modifier le traitement en faisant appeler ses routines… la principale étant after_accept

[4]:
!echo callback accept.posthourtree  >>~/.config/sr3/subscribe/hpfx_amis.conf
[5]:
!sr3 foreground subscribe/hpfx_amis.conf
2023-05-28 16:35:34,235 1919873 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: 10
.2023-05-28 16:35:34,349 [INFO] 1919874 sarracenia.config finalize overriding batch for consistency with messageCountMax: 10
2023-05-28 16:35:34,355 [INFO] 1919874 sarracenia.config finalize overriding batch for consistency with messageCountMax: 10
2023-05-28 16:35:34,355 [INFO] 1919874 sarracenia.flow loadCallbacks flowCallback plugins to load: ['sarracenia.flowcb.gather.message.Message', 'sarracenia.flowcb.retry.Retry', 'sarracenia.flowcb.housekeeping.resources.Resources', 'accept.posthourtree', 'log']
2023-05-28 16:35:34,670 [INFO] 1919874 sarracenia.moth.amqp __getSetup queue declared q_anonymous_subscribe.hpfx_amis.33557703.14415188 (as: amqps://anonymous@hpfx.collab.science.gc.ca/)
2023-05-28 16:35:34,670 [INFO] 1919874 sarracenia.moth.amqp __getSetup binding q_anonymous_subscribe.hpfx_amis.33557703.14415188 with v02.post.*.WXO-DD.bulletins.alphanumeric.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca/)
2023-05-28 16:35:34,723 [INFO] 1919874 sarracenia.flowcb.log __init__ subscribe initialized with: {'on_housekeeping', 'post', 'after_post', 'after_work', 'after_accept'}
2023-05-28 16:35:34,723 [INFO] 1919874 sarracenia.flow run callbacks loaded: ['sarracenia.flowcb.gather.message.Message', 'sarracenia.flowcb.retry.Retry', 'sarracenia.flowcb.housekeeping.resources.Resources', 'accept.posthourtree', 'log']
2023-05-28 16:35:34,723 [INFO] 1919874 sarracenia.flow run pid: 1919874 subscribe/hpfx_amis.conf instance: 0
2023-05-28 16:35:34,764 [INFO] 1919874 sarracenia.flow run now active on vip None
2023-05-28 16:35:35,009 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:35,009 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 0.80 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/CA/CWAO/20/CACN45_CWAO_281300__OBQ_20965
2023-05-28 16:35:35,154 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/CACN45_CWAO_281300__OBQ_20965
2023-05-28 16:35:35,677 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:35,677 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 0.56 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRCN40_KWAL_282035___1729
2023-05-28 16:35:35,796 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_282035___1729
2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 0.55 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRWA20_KWAL_282035___43515
2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 0.53 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SN/KWNB/20/SNVD15_KWNB_282000_RRO__50372
2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 0.53 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SX/KWAL/20/SXAK50_KWAL_282035___51354
2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 1.81 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRCN40_KWAL_282035___32251
2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 1.80 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRCN40_KWAL_282035___62598
2023-05-28 16:35:36,951 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRWA20_KWAL_282035___43515
2023-05-28 16:35:36,951 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SNVD15_KWNB_282000_RRO__50372
2023-05-28 16:35:36,951 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SXAK50_KWAL_282035___51354
2023-05-28 16:35:36,951 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_282035___32251
2023-05-28 16:35:36,951 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_282035___62598
2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20
2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 3.26 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/WA/KKCI/20/WAUS41_KKCI_282045___14468
2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 3.26 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRCN40_KWAL_282035___20765
2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 3.26 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/WA/KKCI/20/WAUS46_KKCI_282045___65023
2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 3.26 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/WA/KKCI/20/WAUS44_KKCI_282045___40622
2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 3.25 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRCN40_KWAL_282035___41115
2023-05-28 16:35:37,636 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/WAUS41_KKCI_282045___14468
2023-05-28 16:35:37,636 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_282035___20765
2023-05-28 16:35:37,636 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/WAUS46_KKCI_282045___65023
2023-05-28 16:35:37,636 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/WAUS44_KKCI_282045___40622
2023-05-28 16:35:37,636 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_282035___41115
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flow please_stop ok, telling 5 callbacks about it.
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flow run starting last pass (without gather) through loop for cleanup.
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flow please_stop ok, telling 5 callbacks about it.
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flow run on_housekeeping pid: 1919874 subscribe/hpfx_amis.conf instance: 0
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.gather.message on_housekeeping messages: good: 12 bad: 0 bytes: 1.6 KiB average: 139 Bytes
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.retry on_housekeeping on_housekeeping
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping work_retry_00 on_housekeeping
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping No retry in list
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000117
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping post_retry_000 on_housekeeping
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping No retry in list
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000081
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.housekeeping.resources on_housekeeping Current Memory cpu_times: user=0.18 system=0.01
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.housekeeping.resources on_housekeeping Current mem usage: 54.8 MiB, accumulating count (12 or 12/100 so far) before self-setting threshold
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.log stats version: 3.00.40, started: 2 seconds ago, last_housekeeping:  2.9 seconds ago
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.log stats messages received: 12, accepted: 12, rejected: 0   rate accepted: 100.0% or 4.1 m/s
2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.log stats files transferred: 12 bytes: 4.5 KiB rate: 1.5 KiB/sec
2023-05-28 16:35:37,638 [INFO] 1919874 sarracenia.flowcb.log stats lag: average: 1.91, maximum: 3.26
2023-05-28 16:35:37,638 [INFO] 1919874 sarracenia.flowcb.log on_housekeeping housekeeping
2023-05-28 16:35:37,638 [INFO] 1919874 sarracenia.flow run clean stop from run loop
2023-05-28 16:35:37,669 [INFO] 1919874 sarracenia.flowcb.gather.message on_stop closing
2023-05-28 16:35:37,669 [INFO] 1919874 sarracenia.flow close flow/close completed cleanly pid: 1919874 subscribe/hpfx_amis.conf instance: 0

Sans le plugin, le téléchargement mettrait tous les fichiers directement dans le répertoire de réception. avec l’ajout du retour wistree, il place le fichier dans /tmp/hpfx_amis. Avec le changement, il le place dans l’arborescence des répertoires WIS et ajoute un suffixe selon le type de fichier.

Entrées de fichier de configuration et rappels

flowcb.log

Pour ajouter un retour à un flux, une ligne est ajoutée au fichier de configuration du flux:

flowcb sarracenia.flowcb.log.Log

Si vous suivez la convention et que le nom de la classe est une version en majuscules (Log) du nom de fichier (log), alors un raccourci est disponible:

callback log

Quoi qu’il en soit, cela entraînera l’importation de la classe par Sarracenia, puis chercher des points d’entrée dans la classe à appeler aux moments opportuns.

Le constructeur de classe accepte un objet de classe sarracenia.config.Config, appelé options, qui stocke tous les paramètres à utiliser par le flux en cours d’exécution. Les options sont utilisées pour remplacer le comportement par défaut des flux et des rappels. L’argument de flowcb est une classe python standard qui doit se trouver dans le chemin python normal pour python import, et le dernier élément est le nom de la classe dans le fichier qui doit être instancié en tant qu’instance flowcb.

un paramètre pour un rappel est déclaré comme suit :

set sarracenia.flowcb.filter.log.Log.logLevel debug

(le préfixe du paramètre correspond à la hiérarchie des types dans flowCallback)

lorsque le constructeur du rappel est appelé, son argument options contiendra :

options.logLevel = 'debug'

Si aucun remplacement spécifique au module n’est présent, le paramètre le plus global est utilisé.

Ainsi, l’utilisation des rappels peut être faite sans beaucoup de connaissances en python, juste la possibilité de créer des fichiers de configuration.

Au-delà de ce point, nous trouvons des conseils pour les personnes qui souhaitent écrire leurs propres retours en Python. Les rappels sont en Python ordinaire, avec quelques plis:

Écrire Vos Propres Rappels

Un rappel de flux est une classe python construite avec des routines nommées pour indiquer quand le programmeur veut qu’elles soient appelées. Pour ce faire, créez une routine qui sous-classe sarracenia.flowcb.FlowCB afin que la classe ait normalement:

from sarracenia.flowcb import FlowCB

parmi les importations dans le haut du fichier. Dans la partie principale du fichier, il y aura les classes de rappel personnalisées:

class Myclass(FlowCB):

déclarée comme sous-classe en tant que FlowCB. Les principales routines de la classe sont des points d’entrée qui seront appelés au moment où leur nom l’indique. S’il manque à une classe un point d’entrée donné, elle ne sera tout simplement pas appelée. La classe init() est utilisée pour initialiser les choses pour la classe de rappel :

def __init__(self, options):

    self.o = options

    logging.basicConfig(format=self.o.logFormat,
                        level=getattr(logging, self.o.logLevel.upper()))
    logger.setLevel(getattr(logging, self.o.logLevel.upper()))

    self.o.add_option( 'myoption', 'str', 'usuallyThis')

Les lignes de configuration du logging dans init permettent de définir un niveau de logging spécifique pour cette classe flowCallback. Une fois le passe-partout de logging terminé, la routine add_option pour définir les paramètres de la classe. Les utilisateurs peuvent les inclure dans les fichiers de configuration, tout comme les options intégrées:

myoption IsReallyNeeded

Le résultat d’un tel réglage est que self.o.myoption = ‘IsReallyNeeded’. Si aucune valeur n’est définie dans la configuration, self.o.myoption sera par défaut ‘usualThis’ Il existe différents kinds (types) d’options, où le type déclaré modifie l’analyse:

'count'    type de nombre entier.
'duration' un nombre à virgule flottante indiquant une quantité de secondes (0.001 est 1 milliseconde)
           modifié par un suffixe d'unité ( m-minute, h-hour (heure), w-week(semaine) )
'flag'     option booléenne (Vrai/Faux).
'list'     une liste de valeurs de chaîne, chaque occurrence successive étant enchaînée au total.
           toutes les options du plugin v2 sont déclarées de type liste.
'taille'   taille entière. Suffixes k, m et g pour les multiplicateurs kilo, méga et giga (base 2).
'str'      une valeur de chaîne arbitraire, comme tous les types ci-dessus, chaque
           occurrence suivante remplace la précédente.

Listes De Travail

Autre qu’ options, l’autre argument principal des routines de rappel after_accept et after_work est la liste de travail. La liste de travail est donnée aux points d’entrée se produisant pendant le traitement des messages et est un certain nombre de listes de travail de messages:

worklist.incoming --> messages to process (either new or retries.)
worklist.ok       --> successfully processed
worklist.rejected --> messages to not be further processed.
worklist.failed   --> messages for which processing failed.
                      failed messages will be retried.
worklist.directories_ok --> list of directories created during processing.

Initialement, tous les messages sont placés dans worklists.incoming. Si un plugin décide :

  • a message is not relevant, moved it to the rejected worklist.

  • a no further processing of the message is needed, move it to ok worklist.

  • an operation failed and it should be retried later, move to failed worklist.

Ne supprimez pas de toutes les listes, déplacez uniquement les messages entre les listes de travail. Il est nécessaire de mettre les messages rejetés dans la liste de travail appropriée afin qu’ils soient reconnus comme reçus. Les messages ne peuvent être supprimés qu’après la prise en charge de l’accusé de réception.

Sortie d’Exécution

Python a une excellente journalisation intégrée et doit une fois utiliser le module de manière normale et pythonique, avec:

import logging

Après toutes les importations dans votre fichier source python, définissez un enregistreur pour le fichier source:

logger = logging.getLogger(_name_)

Comme c’est normal avec le module de journalisation Python, les messages peuvent ensuite être publiés dans le journal:

logger.debug(‘got here’)

Chaque message du journal sera précédé de la classe et de la routine émettant le message de journal, ainsi que de la date/heure.

Exemple de sous-classe Flowcb

Avec les informations ci-dessus sur la gestion des options, les listes de travail et la journalisation, nous sommes prêts à comprendre le module wistree que nous venons d’utiliser. Cette classe wistree.py accepte les fichiers dont les noms commencent par AHL et renomme l’arborescence de répertoires dans un standard différent, celui en évolution pour le WMO WIS 2.0 (pour plus d’informations sur ce module: https://github.com/wmo-im/GTStoWIS2)

[6]:
  from sarracenia.flowcb import FlowCB
  import logging
  import GTStoWIS2

  logger = logging.getLogger(__name__)


  class Wistree(FlowCB):

    def __init__(self, options):

        if hasattr(options, 'logLevel'):
            logger.setLevel(getattr(logging, options.logLevel.upper()))
        else:
            logger.setLevel(logging.INFO)
        self.topic_builder=GTStoWIS2.GTStoWIS2()
        self.o = options


    def after_accept(self, worklist):

        new_incoming=[]

        for msg in worklist.incoming:

            # fix file name suffix.
            type_suffix = self.topic_builder.mapAHLtoExtension( msg['new_file'][0:2] )
            tpfx=msg['subtopic']

            # input has relpath=/YYYYMMDDTHHMM/... + pubTime
            # need to move the date from relPath to BaseDir, adding the T hour from pubTime.
            try:
                new_baseSubDir=tpfx[0]+msg['pubTime'][8:11]
                t='.'.join(tpfx[0:2])+'.'+new_baseSubDir
                new_baseDir = msg['new_dir'] + os.sep + new_baseSubDir
                new_relDir = 'WIS' + os.sep + self.topic_builder.mapAHLtoTopic(msg['new_file'])
                new_dir = new_baseDir + os.sep + new_relDir

                if msg['new_file'][-len(type_suffix):] != type_suffix:
                    new_file = msg['new_file']+type_suffix
                else:
                    new_file = msg['new_file']

                msg.updatePaths( self.o, new_baseDir + os.sep + new_relDir, new_file )
            except Exception as ex:
                logger.error( "skipped" , exc_info=True )
                worklist.failed.append(msg)
                continue

            msg['_deleteOnPost'] |= set( [ 'from_cluster', 'sum', 'to_clusters' ] )
            new_incoming.append(msg)

        worklist.incoming=new_incoming


---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[6], line 3
      1 from sarracenia.flowcb import FlowCB
      2 import logging
----> 3 import GTStoWIS2
      5 logger = logging.getLogger(__name__)
      8 class Wistree(FlowCB):

ModuleNotFoundError: No module named 'GTStoWIS2'

Plugins qui changent la façon dont un fichier est téléchargé

La routine after_accept est l’une des deux plus couramment utilisées. Il est utilisé pour modifier le traitement avant le téléchargement ou l’envoi d’un fichier. Pour traiter le fichier après son téléchargement, le point d’entrée after_work est utilisé pour traiter la liste worklist.ok (fichiers qui ont été téléchargés avec succès).

La routine after_accept a une boucle externe qui parcourt toute la liste des messages entrants. Il construit une nouvelle liste de messages entrants à partir de ceux qu’il accepte, tout en ajoutant tous les messages rejetés à worklist.failed. La liste est juste une liste de messages, où chaque message est un dictionnaire python avec tous les champs stockés dans un message au format v03. Dans le message, il y a, par exemple, les champs baseURL et relPath :

  • baseURL - la baseURL de la ressource à partir de laquelle un fichier serait obtenu.

  • relPath - le chemin relatif à ajouter à la baseURL pour obtenir l’URL de téléchargement complète.

Cela se produit avant que le transfert (téléchargement ou envoi, ou traitement) du fichier ait eu lieu, on peut donc changer le comportement en modifiant les champs du message. Normalement, les chemins de téléchargement (appelés new_dir et new_file) refléteront l’intention de copier l’arborescence source d’origine. donc si vous avez a/b/c.txt sur l’arborescence des sources et que vous téléchargez dans le répertoire mine sur le système local, le new_dir serait mine/a/b et new_file serait c.txt.

Plugins qui Traitent un Fichier après son Téléchargement

Un cas d’utilisation courant est pour les plugins avec un point d’entrée after_work pour lire le fichier après son téléchargement et le transformer en un produit dérivé avec un nom différent. Ainsi, le nouveau fichier est créé comme dans la section précédente. Le message pour le fichier téléchargé doit encore être déplacé sur une liste pour s’assurer qu’il est reconnu par le courtier. Un tel point d’entrée ressemblerait à ceci:

[7]:

    def after_work(self, worklist):

        new_ok=[]
        for m in worklist.ok:
             success=do_something()
             if success:
                   new_ok.append(m)
             # since it is already acknowledged, we can just drop it from ok.


        worklist.ok = new_ok
        # the messages on worklist.ok will get posted in the next algorithm phase.

Plugins qui renomment les fichiers

Le plugin ci-dessus modifie la disposition des fichiers à télécharger, en fonction de la classe GTStoWIS, qui prescrit une arborescence de répertoires différente en sortie. Il y a beaucoup de champs à mettre à jour lors de la modification du placement des fichiers, il est donc préférable d’utiliser:

msg.updatePaths( self.o, new_dir, new_file )

pour mettre à jour correctement tous les champs nécessaires dans le message. Il mettra à jour ‘new_baseURL’, ‘new_relPath’, ‘new_subtopic’ à utiliser lors de la publication.

La partie try/except de la routine traite du cas où, si un fichier arrive avec un nom à partir duquel une arborescence de rubriques ne peut pas être construite, une exception python peut se produire et le message est ajouté à la liste de travail ayant échoué et ne sera pas être traitées par des plugins ultérieurs.

Plugins qui Créent de Nouveaux Fichiers

La routine ci-dessus est parfaite lorsqu’un fichier vient d’être renommé. Si un plugin a besoin de créer de nouveaux fichiers vaguement dérivés du fichier d’entrée, alors vous voulez créer de nouveaux messages pour ces fichiers à partir de rien:

import sarracenia

m = sarracenia.Message.fromFileData(sample_fileName, self.o, os.stat(sample_fileName) )

La routine msg_fromFileData utilisera self.o pour appliquer les paramètres de publication appropriés. Aucune connaissance des formats de message ou de la construction de champs n’est nécessaire. Si le fichier n’est pas local, comme lors de l’écriture d’un rappel d’interrogation, un routage alternatif peut être utilisé:

m = sarracenia.Message.fromFileInfo(sample_fileName, self.o, fake_stat_info )

le faux enregistrement de statistiques (selon la page de manuel stat(2) ou python os.stat() ) peut être construit à partir d’autres champs, en commençant par:

import paramiko

fake_stat = paramiko.SFTPAttributes()
fake_stat.st_mtime = ... something else... perhaps an http header?
fake_stat.st_size = ... again will vary by context.

Dans tous les cas, une fois que vous avez le message, il peut être ajouté à la liste entrante.

Other Examples

Le sous-classement de Sarracenia.flowcb est utilisé en interne pour faire beaucoup de travail de base. C’est une bonne idée de regarder le code source de Sarracenia lui-même. Par exemple:

  • sarracenia.flowcb jetez un oeil au fichier init.py là qui fournit ces informations sur un format plus programmatiquement succinct.

  • sarracenia.flowcb.gather.file est une classe qui implémente: la publication de fichiers et la surveillance de répertoires, dans le sens d’un rappel qui implémente le point d’entrée gather, en lisant un système de fichiers et en construisant un liste des messages à traiter.

  • sarracenia.flowcb.gather.message est une classe qui implémente la réception de messages à partir de flux de protocole de file d’attente de messages.

  • sarracenia.flowcb.gather.nodupe Ce module supprime les doublons du message flux basés sur les sommes de contrôle d’intégrité.

  • sarracenia.flowcb.post.message est une classe qui implémente la publication messages vers flux de protocole de file d’attente de messages

  • sarracenia.flowcb.retry lorsque le transfert d’un fichier échoue. Sarracenia doit conserver le message pertinent dans un fichier d’état pour un moment ultérieur où il pourra être réessayé.

[ ]: