{ "cells": [ { "cell_type": "markdown", "id": "acoustic-deviation", "metadata": {}, "source": [ "# Personnalisez la gestion des fichiers avec les rappels.\n", "\n", "Tous les composants de Sarracenia implémentent *l'algorithme Flow*, avec différents\n", "rappels, dans le langage de programmation Python. La classe principale de Sarracenia (Python)\n", "est [sarracenia.flow](../Reference/code.html#module-sarracenia.flow) et la grande partie de la fonctionnalité de base est\n", "mis en œuvre à l'aide de la classe créée pour ajouter un traitement personnalisé à un flux, le\n", "classe flowcb (rappel de flux).\n", "\n", "Pour une discussion détaillée de l'algorithme de flux lui-même, jetez un oeil\n", "dans le manuel [Concepts](../Explanation/Concepts.html). Pour tout flux, on peut\n", "ajouter un traitement personnalisé à divers moments pendant le traitement par sous-classement\n", "la classe [sarracenia.flowcb](../Reference/flowcb.html).\n", "\n", "En bref, l'algorithme comporte les étapes suivantes :\n", "\n", "* **__init__(self, options)** -- lors de l'import, initialisation de python traditionnelle\n", "* **on_start** -- lorsqu'une instance est démarrée.\n", "* boucle pour toujours\n", " * **gather** -- collecte les messages à traiter appelés : worklist.incoming\n", " * **poll** -- une autre façon de collecter des messages, uniquement dans le composant poll.\n", " * **filter** -- applique les correspondances d'expressions régulières acceptées/rejetées à la liste de messages.\n", " déplace les messages pour les fichiers à ne pas télécharger de worklist.incoming vers worklist.reject\n", " * *after_accept* point d'entrée de rappel. traiter worklist.incoming, en en rejetant potentiellement d'autres.\n", " * **ack** -- les messages worklist.rejected sont reconnus à la source en amont lorsque le traitement est terminé.\n", " * **work** -- effectue un transfert ou une transformation sur un fichier.\n", " * **ack** -- les messages worklist.ok pour les fichiers transférés avec succès sont reconnus à la source en amont.\n", " * Point d'entrée du rappel *after_work*\n", " * **ack** -- les messages worklist.failed pour les fichiers qui n'ont pas été transférés avec succès sont reconnus.\n", " * **post** -- publier le résultat du travail effectué pour l'étape suivante.\n", " * occasionnellement... **on_housekeeping -- faire des nettoyages périodiques...\n", "* **on_stop** -- arrêt du traitement.\n", "\n", "pour plus de détails sur les points d'entrée disponibles de flowcb, consultez le code source:\n", "\n", "* [flowcb](../Reference/flowcb.html)\n" ] }, { "cell_type": "markdown", "id": "external-mention", "metadata": {}, "source": [ "Regardons l'utilisation de la classe dans une configuration :" ] }, { "cell_type": "code", "execution_count": 1, "id": "coordinated-cocktail", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2023-05-28 16:35:16,492 1919860 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: 10\n", "2023-05-28 16:35:16,493 1919860 [INFO] root remove removing subscribe/hpfx_amis\n", "\n", "add: 2023-05-28 16:35:17,637 1919863 [INFO] sarracenia.sr add copying: /home/peter/Sarracenia/sr3/sarracenia/examples/subscribe/hpfx_amis.conf to /home/peter/.config/sr3/subscribe/hpfx_amis.conf \n", "\n" ] } ], "source": [ "!sr3 remove subscribe/hpfx_amis.conf\n", "!sr3 add subscribe/hpfx_amis.conf" ] }, { "cell_type": "code", "execution_count": 2, "id": "tired-north", "metadata": {}, "outputs": [], "source": [ "!echo messageCountMax 10 >>~/.config/sr3/subscribe/hpfx_amis.conf" ] }, { "cell_type": "markdown", "id": "psychological-ratio", "metadata": {}, "source": [ "fait en sorte que le flux s'arrête après la consommation de 10 messages." ] }, { "cell_type": "code", "execution_count": 3, "id": "greater-nevada", "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2023-05-28 16:35:26,803 1919870 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: 10\r\n", "Provided callback classes: ( /home/peter/Sarracenia/sr3/sarracenia ) \r\n", "flowcb/accept/dateappend.py flowcb/accept/delete.py \r\n", "flowcb/accept/downloadbaseurl.py flowcb/accept/hourtree.py \r\n", "flowcb/accept/httptohttps.py flowcb/accept/longflow.py \r\n", "flowcb/accept/pathreplace.py flowcb/accept/posthourtree.py \r\n", "flowcb/accept/postoverride.py flowcb/accept/printlag.py \r\n", "flowcb/accept/rename4jicc.py flowcb/accept/renamedmf.py \r\n", "flowcb/accept/renamewhatfn.py flowcb/accept/save.py \r\n", "flowcb/accept/speedo.py flowcb/accept/sundewpxroute.py \r\n", "flowcb/accept/testretry.py flowcb/accept/toclusters.py \r\n", "flowcb/accept/tohttp.py flowcb/accept/tolocal.py \r\n", "flowcb/accept/tolocalfile.py flowcb/accept/wmotypesuffix.py \r\n", "flowcb/amserver.py flowcb/clamav.py \r\n", "flowcb/destfn/replace.py flowcb/destfn/sample.py \r\n", "flowcb/download/mail_ingest.py flowcb/filter/deleteflowfiles.py \r\n", "flowcb/filter/fdelay.py flowcb/filter/pclean_f90.py \r\n", "flowcb/filter/pclean_f92.py flowcb/filter/wmo2msc.py \r\n", "flowcb/gather/file.py flowcb/gather/message.py \r\n", "flowcb/housekeeping/resources.py flowcb/log.py \r\n", "flowcb/mdelaylatest.py flowcb/nodupe/data.py \r\n", "flowcb/nodupe/name.py flowcb/pclean.py \r\n", "flowcb/poll/airnow.py flowcb/poll/mail.py \r\n", "flowcb/poll/nasa_mls_nrt.py flowcb/poll/nexrad.py \r\n", "flowcb/poll/noaa_hydrometric.py flowcb/poll/s3bucket.py \r\n", "flowcb/poll/usgs.py flowcb/post/message.py \r\n", "flowcb/retry.py flowcb/rootchown.py \r\n", "flowcb/run.py flowcb/rxqueue_gzip.py \r\n", "flowcb/sample.py flowcb/send/am.py \r\n", "flowcb/send/email.py flowcb/shiftdir2baseurl.py \r\n", "flowcb/trace_on_stop.py flowcb/v2wrapper.py \r\n", "flowcb/wistree.py flowcb/work/age.py \r\n", "flowcb/work/check.py flowcb/work/citypage_check.py \r\n", "flowcb/work/delete.py flowcb/work/rxpipe.py \r\n", "flowcb/work/send_egc_les.py \r\n" ] } ], "source": [ "!sr3 list fcb" ] }, { "cell_type": "markdown", "id": "efficient-picture", "metadata": {}, "source": [ "L'ajout de cette ligne à la configuration signifie que la sous-classe wistree flowcb (source ci-dessus) sera ajoutée au\n", "flux et modifier le traitement en faisant appeler ses routines... la principale étant *after_accept*" ] }, { "cell_type": "code", "execution_count": 4, "id": "external-commercial", "metadata": {}, "outputs": [], "source": [ "!echo callback accept.posthourtree >>~/.config/sr3/subscribe/hpfx_amis.conf" ] }, { "cell_type": "code", "execution_count": 5, "id": "insured-fetish", "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2023-05-28 16:35:34,235 1919873 [INFO] sarracenia.config finalize overriding batch for consistency with messageCountMax: 10\n", ".2023-05-28 16:35:34,349 [INFO] 1919874 sarracenia.config finalize overriding batch for consistency with messageCountMax: 10\n", "2023-05-28 16:35:34,355 [INFO] 1919874 sarracenia.config finalize overriding batch for consistency with messageCountMax: 10\n", "2023-05-28 16:35:34,355 [INFO] 1919874 sarracenia.flow loadCallbacks flowCallback plugins to load: ['sarracenia.flowcb.gather.message.Message', 'sarracenia.flowcb.retry.Retry', 'sarracenia.flowcb.housekeeping.resources.Resources', 'accept.posthourtree', 'log']\n", "2023-05-28 16:35:34,670 [INFO] 1919874 sarracenia.moth.amqp __getSetup queue declared q_anonymous_subscribe.hpfx_amis.33557703.14415188 (as: amqps://anonymous@hpfx.collab.science.gc.ca/) \n", "2023-05-28 16:35:34,670 [INFO] 1919874 sarracenia.moth.amqp __getSetup binding q_anonymous_subscribe.hpfx_amis.33557703.14415188 with v02.post.*.WXO-DD.bulletins.alphanumeric.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca/)\n", "2023-05-28 16:35:34,723 [INFO] 1919874 sarracenia.flowcb.log __init__ subscribe initialized with: {'on_housekeeping', 'post', 'after_post', 'after_work', 'after_accept'}\n", "2023-05-28 16:35:34,723 [INFO] 1919874 sarracenia.flow run callbacks loaded: ['sarracenia.flowcb.gather.message.Message', 'sarracenia.flowcb.retry.Retry', 'sarracenia.flowcb.housekeeping.resources.Resources', 'accept.posthourtree', 'log']\n", "2023-05-28 16:35:34,723 [INFO] 1919874 sarracenia.flow run pid: 1919874 subscribe/hpfx_amis.conf instance: 0\n", "2023-05-28 16:35:34,764 [INFO] 1919874 sarracenia.flow run now active on vip None\n", "2023-05-28 16:35:35,009 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:35,009 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 0.80 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/CA/CWAO/20/CACN45_CWAO_281300__OBQ_20965 \n", "2023-05-28 16:35:35,154 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/CACN45_CWAO_281300__OBQ_20965 \n", "2023-05-28 16:35:35,677 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:35,677 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 0.56 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRCN40_KWAL_282035___1729 \n", "2023-05-28 16:35:35,796 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_282035___1729 \n", "2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 0.55 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRWA20_KWAL_282035___43515 \n", "2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 0.53 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SN/KWNB/20/SNVD15_KWNB_282000_RRO__50372 \n", "2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 0.53 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SX/KWAL/20/SXAK50_KWAL_282035___51354 \n", "2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 1.81 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRCN40_KWAL_282035___32251 \n", "2023-05-28 16:35:36,314 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 1.80 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRCN40_KWAL_282035___62598 \n", "2023-05-28 16:35:36,951 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRWA20_KWAL_282035___43515 \n", "2023-05-28 16:35:36,951 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SNVD15_KWNB_282000_RRO__50372 \n", "2023-05-28 16:35:36,951 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SXAK50_KWAL_282035___51354 \n", "2023-05-28 16:35:36,951 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_282035___32251 \n", "2023-05-28 16:35:36,951 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_282035___62598 \n", "2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.accept.posthourtree after_accept post_hour_tree: new_dir: /tmp/hpfx_amis/20\n", "2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 3.26 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/WA/KKCI/20/WAUS41_KKCI_282045___14468 \n", "2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 3.26 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRCN40_KWAL_282035___20765 \n", "2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 3.26 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/WA/KKCI/20/WAUS46_KKCI_282045___65023 \n", "2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 3.26 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/WA/KKCI/20/WAUS44_KKCI_282045___40622 \n", "2023-05-28 16:35:37,133 [INFO] 1919874 sarracenia.flowcb.log after_accept accepted: (lag: 3.25 ) https://hpfx.collab.science.gc.ca /20230528/WXO-DD/bulletins/alphanumeric/20230528/SR/KWAL/20/SRCN40_KWAL_282035___41115 \n", "2023-05-28 16:35:37,636 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/WAUS41_KKCI_282045___14468 \n", "2023-05-28 16:35:37,636 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_282035___20765 \n", "2023-05-28 16:35:37,636 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/WAUS46_KKCI_282045___65023 \n", "2023-05-28 16:35:37,636 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/WAUS44_KKCI_282045___40622 \n", "2023-05-28 16:35:37,636 [INFO] 1919874 sarracenia.flowcb.log after_work downloaded ok: /tmp/hpfx_amis/20/SRCN40_KWAL_282035___41115 \n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flow please_stop ok, telling 5 callbacks about it.\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flow run starting last pass (without gather) through loop for cleanup.\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flow please_stop ok, telling 5 callbacks about it.\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flow run on_housekeeping pid: 1919874 subscribe/hpfx_amis.conf instance: 0\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.gather.message on_housekeeping messages: good: 12 bad: 0 bytes: 1.6 KiB average: 139 Bytes\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.retry on_housekeeping on_housekeeping\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping work_retry_00 on_housekeeping\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping No retry in list\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000117\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping post_retry_000 on_housekeeping\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping No retry in list\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.diskqueue on_housekeeping on_housekeeping elapse 0.000081\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.housekeeping.resources on_housekeeping Current Memory cpu_times: user=0.18 system=0.01\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.housekeeping.resources on_housekeeping Current mem usage: 54.8 MiB, accumulating count (12 or 12/100 so far) before self-setting threshold\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.log stats version: 3.00.40, started: 2 seconds ago, last_housekeeping: 2.9 seconds ago \n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.log stats messages received: 12, accepted: 12, rejected: 0 rate accepted: 100.0% or 4.1 m/s\n", "2023-05-28 16:35:37,637 [INFO] 1919874 sarracenia.flowcb.log stats files transferred: 12 bytes: 4.5 KiB rate: 1.5 KiB/sec\n", "2023-05-28 16:35:37,638 [INFO] 1919874 sarracenia.flowcb.log stats lag: average: 1.91, maximum: 3.26 \n", "2023-05-28 16:35:37,638 [INFO] 1919874 sarracenia.flowcb.log on_housekeeping housekeeping\n", "2023-05-28 16:35:37,638 [INFO] 1919874 sarracenia.flow run clean stop from run loop\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "2023-05-28 16:35:37,669 [INFO] 1919874 sarracenia.flowcb.gather.message on_stop closing\r\n", "2023-05-28 16:35:37,669 [INFO] 1919874 sarracenia.flow close flow/close completed cleanly pid: 1919874 subscribe/hpfx_amis.conf instance: 0\r\n", "\r\n" ] } ], "source": [ "!sr3 foreground subscribe/hpfx_amis.conf" ] }, { "cell_type": "markdown", "id": "stretch-directive", "metadata": {}, "source": [ "Sans le plugin, le téléchargement mettrait tous les fichiers directement dans le répertoire de réception. avec l'ajout du retour wistree, il place le fichier dans /tmp/hpfx_amis. Avec le changement, il le place dans l'arborescence des répertoires WIS et ajoute un suffixe selon le type de fichier." ] }, { "cell_type": "markdown", "id": "funny-isolation", "metadata": {}, "source": [ "## Entrées de fichier de configuration et rappels\n", "\n", "\n", "[flowcb.log](../Reference/flowcb.html#module-sarracenia.flowcb.log)\n", "\n", "Pour ajouter un retour à un flux, une ligne est ajoutée au fichier de configuration du flux:\n", "\n", " flowcb sarracenia.flowcb.log.Log\n", "\n", "Si vous suivez la convention et que le nom de la classe est une version en \n", "majuscules (Log) du nom de fichier (log), alors un raccourci est disponible:\n", "\n", " callback log \n", "\n", "Quoi qu'il en soit, cela entraînera l'importation de la classe par Sarracenia, puis\n", "chercher des points d'entrée dans la classe à appeler aux moments opportuns.\n", "\n", "Le constructeur de classe accepte un objet de classe sarracenia.config.Config, \n", "appelé options, qui stocke tous les paramètres à utiliser par le flux en cours d'exécution. \n", "Les options sont utilisées pour remplacer le comportement par défaut des flux et des rappels. \n", "L'argument de flowcb est une classe python standard qui doit se trouver dans le chemin \n", "python normal pour python *import*, et le dernier élément est le nom de la classe dans le fichier \n", "qui doit être instancié en tant qu'instance flowcb.\n", "\n", "un paramètre pour un rappel est déclaré comme suit :\n", "\n", " set sarracenia.flowcb.filter.log.Log.logLevel debug\n", "\n", "(le préfixe du paramètre correspond à la hiérarchie des types dans flowCallback)\n", "\n", "lorsque le constructeur du rappel est appelé, \n", "son argument options contiendra :\n", "\n", " options.logLevel = 'debug'\n", "\n", "Si aucun remplacement spécifique au module n'est présent, \n", "le paramètre le plus global est utilisé.\n", "\n", "Ainsi, l'utilisation des rappels peut être faite sans beaucoup de connaissances en python, \n", "juste la possibilité de créer des fichiers de configuration.\n", "\n", "Au-delà de ce point, nous trouvons des conseils pour les personnes qui souhaitent écrire \n", "leurs propres retours en Python. Les rappels sont en Python ordinaire, avec quelques plis:" ] }, { "cell_type": "markdown", "id": "shared-album", "metadata": {}, "source": [ "## Écrire Vos Propres Rappels\n", "\n", "Un rappel de flux est une classe python construite avec des routines nommées \n", "pour indiquer quand le programmeur veut qu'elles soient appelées. Pour ce faire, \n", "créez une routine qui sous-classe *sarracenia.flowcb.FlowCB* \n", "afin que la classe ait normalement:\n", "\n", " from sarracenia.flowcb import FlowCB\n", "\n", "parmi les importations dans le haut du fichier. \n", "Dans la partie principale du fichier, il y aura \n", "les classes de rappel personnalisées:\n", "\n", " class Myclass(FlowCB):\n", "\n", "déclarée comme sous-classe en tant que FlowCB. Les principales routines de la classe \n", "sont des points d'entrée qui seront appelés au moment où leur nom l'indique. \n", "S'il manque à une classe un point d'entrée donné, elle ne sera tout simplement pas appelée. \n", "La classe __init__() est utilisée pour initialiser les choses pour la classe de rappel :\n", "\n", " def __init__(self, options):\n", "\n", " self.o = options\n", "\n", " logging.basicConfig(format=self.o.logFormat,\n", " level=getattr(logging, self.o.logLevel.upper()))\n", " logger.setLevel(getattr(logging, self.o.logLevel.upper()))\n", "\n", " self.o.add_option( 'myoption', 'str', 'usuallyThis')\n", "\n", "Les lignes de configuration du logging dans __init__ permettent de définir \n", "un niveau de logging spécifique pour cette classe flowCallback. Une fois le \n", "passe-partout de logging terminé, la routine add_option pour définir les paramètres de la classe. \n", "Les utilisateurs peuvent les inclure dans les fichiers de configuration, tout comme les options intégrées:\n", "\n", " myoption IsReallyNeeded\n", "\n", "Le résultat d'un tel réglage est que *self.o.myoption = 'IsReallyNeeded'*. \n", "Si aucune valeur n'est définie dans la configuration, *self.o.myoption* sera par défaut *'usualThis'* \n", "Il existe différents *kinds* (types) d'options, où le type déclaré modifie l'analyse:\n", "\n", " 'count' type de nombre entier.\n", " 'duration' un nombre à virgule flottante indiquant une quantité de secondes (0.001 est 1 milliseconde)\n", " modifié par un suffixe d'unité ( m-minute, h-hour (heure), w-week(semaine) )\n", " 'flag' option booléenne (Vrai/Faux).\n", " 'list' une liste de valeurs de chaîne, chaque occurrence successive étant enchaînée au total.\n", " toutes les options du plugin v2 sont déclarées de type liste.\n", " 'taille' taille entière. Suffixes k, m et g pour les multiplicateurs kilo, méga et giga (base 2).\n", " 'str' une valeur de chaîne arbitraire, comme tous les types ci-dessus, chaque\n", " occurrence suivante remplace la précédente.\n", "\n" ] }, { "cell_type": "markdown", "id": "determined-medicare", "metadata": {}, "source": [ "## Listes De Travail\n", "\n", "Autre qu' *options*, l'autre argument principal des routines de rappel after_accept et after_work est la liste de travail. \n", "La liste de travail est donnée aux points d'entrée se produisant pendant le traitement des messages \n", "et est un certain nombre de listes de travail de messages:\n", "\n", " worklist.incoming --> messages to process (either new or retries.)\n", " worklist.ok --> successfully processed\n", " worklist.rejected --> messages to not be further processed.\n", " worklist.failed --> messages for which processing failed.\n", " failed messages will be retried.\n", " worklist.directories_ok --> list of directories created during processing.\n", "\n", "Initialement, tous les messages sont placés dans worklists.incoming. \n", "Si un plugin décide :\n", "\n", "- a message is not relevant, moved it to the rejected worklist.\n", "- a no further processing of the message is needed, move it to ok worklist.\n", "- an operation failed and it should be retried later, move to failed worklist.\n", "\n", "Ne supprimez pas de toutes les listes, déplacez uniquement les messages entre les \n", "listes de travail. Il est nécessaire de mettre les messages rejetés dans la liste de travail \n", "appropriée afin qu'ils soient reconnus comme reçus. Les messages ne peuvent\n", "être supprimés qu'après la prise en charge de l'accusé de réception." ] }, { "cell_type": "markdown", "id": "aa6cfe2b", "metadata": {}, "source": [ "## Sortie d'Exécution\n", "\n", "Python a une excellente journalisation intégrée et doit une fois utiliser le module \n", "de manière normale et pythonique, avec:\n", "\n", " import logging\n", "\n", "Après toutes les importations dans votre fichier source python, définissez \n", "un enregistreur pour le fichier source:\n", "\n", " logger = logging.getLogger(\\__name\\__)\n", "\n", "Comme c'est normal avec le module de journalisation Python, les messages \n", "peuvent ensuite être publiés dans le journal:\n", "\n", " logger.debug('got here')\n", "\n", "Chaque message du journal sera précédé de la classe et de la routine émettant\n", "le message de journal, ainsi que de la date/heure." ] }, { "cell_type": "markdown", "id": "million-smoke", "metadata": {}, "source": [ "## Exemple de sous-classe Flowcb\n", "\n", "Avec les informations ci-dessus sur la gestion des options, les listes de travail et la journalisation, \n", "nous sommes prêts à comprendre le module wistree que nous venons d'utiliser.\n", "Cette classe wistree.py accepte les fichiers dont les noms commencent par AHL et renomme l'arborescence \n", "de répertoires dans un standard différent, celui en évolution pour le WMO WIS 2.0 (pour plus d'informations sur ce module: \n", "https://github.com/wmo-im/GTStoWIS2)" ] }, { "cell_type": "code", "execution_count": 6, "id": "related-consensus", "metadata": {}, "outputs": [ { "ename": "ModuleNotFoundError", "evalue": "No module named 'GTStoWIS2'", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mModuleNotFoundError\u001b[0m Traceback (most recent call last)", "Cell \u001b[0;32mIn[6], line 3\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01msarracenia\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mflowcb\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m FlowCB\n\u001b[1;32m 2\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mlogging\u001b[39;00m\n\u001b[0;32m----> 3\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mGTStoWIS2\u001b[39;00m\n\u001b[1;32m 5\u001b[0m logger \u001b[38;5;241m=\u001b[39m logging\u001b[38;5;241m.\u001b[39mgetLogger(\u001b[38;5;18m__name__\u001b[39m)\n\u001b[1;32m 8\u001b[0m \u001b[38;5;28;01mclass\u001b[39;00m \u001b[38;5;21;01mWistree\u001b[39;00m(FlowCB):\n", "\u001b[0;31mModuleNotFoundError\u001b[0m: No module named 'GTStoWIS2'" ] } ], "source": [ " from sarracenia.flowcb import FlowCB\n", " import logging\n", " import GTStoWIS2\n", "\n", " logger = logging.getLogger(__name__)\n", "\n", "\n", " class Wistree(FlowCB):\n", "\n", " def __init__(self, options):\n", "\n", " if hasattr(options, 'logLevel'):\n", " logger.setLevel(getattr(logging, options.logLevel.upper()))\n", " else:\n", " logger.setLevel(logging.INFO)\n", " self.topic_builder=GTStoWIS2.GTStoWIS2()\n", " self.o = options\n", "\n", "\n", " def after_accept(self, worklist):\n", "\n", " new_incoming=[]\n", "\n", " for msg in worklist.incoming:\n", "\n", " # fix file name suffix.\n", " type_suffix = self.topic_builder.mapAHLtoExtension( msg['new_file'][0:2] )\n", " tpfx=msg['subtopic']\n", " \n", " # input has relpath=/YYYYMMDDTHHMM/... + pubTime\n", " # need to move the date from relPath to BaseDir, adding the T hour from pubTime.\n", " try:\n", " new_baseSubDir=tpfx[0]+msg['pubTime'][8:11]\n", " t='.'.join(tpfx[0:2])+'.'+new_baseSubDir\n", " new_baseDir = msg['new_dir'] + os.sep + new_baseSubDir\n", " new_relDir = 'WIS' + os.sep + self.topic_builder.mapAHLtoTopic(msg['new_file'])\n", " new_dir = new_baseDir + os.sep + new_relDir\n", " \n", " if msg['new_file'][-len(type_suffix):] != type_suffix: \n", " new_file = msg['new_file']+type_suffix\n", " else:\n", " new_file = msg['new_file']\n", " \n", " msg.updatePaths( self.o, new_baseDir + os.sep + new_relDir, new_file )\n", " except Exception as ex:\n", " logger.error( \"skipped\" , exc_info=True )\n", " worklist.failed.append(msg)\n", " continue\n", " \n", " msg['_deleteOnPost'] |= set( [ 'from_cluster', 'sum', 'to_clusters' ] )\n", " new_incoming.append(msg)\n", "\n", " worklist.incoming=new_incoming \n", "\n" ] }, { "cell_type": "markdown", "id": "offshore-student", "metadata": {}, "source": [ "\n", "## Plugins qui changent la façon dont un fichier est téléchargé\n", "\n", "\n", "La routine *after_accept* est l'une des deux plus couramment utilisées. Il est utilisé pour modifier le traitement avant le téléchargement ou l'envoi d'un fichier. Pour traiter le fichier après son téléchargement, le point d'entrée *after_work* est utilisé pour traiter la liste worklist.ok (fichiers qui ont été téléchargés avec succès).\n", "\n", "La routine after_accept a une boucle externe qui parcourt toute la liste des messages entrants. Il construit une nouvelle liste de messages entrants à partir de ceux qu'il accepte, tout en ajoutant tous les messages rejetés à *worklist.failed.* La liste est juste une liste de messages, où chaque message est un dictionnaire python avec tous les champs stockés dans un message au format v03. Dans le message, il y a, par exemple, les champs *baseURL* et *relPath* :\n", "\n", "* baseURL - la baseURL de la ressource à partir de laquelle un fichier serait obtenu.\n", "* relPath - le chemin relatif à ajouter à la baseURL pour obtenir l'URL de téléchargement complète.\n", "\n", "Cela se produit avant que le transfert (téléchargement ou envoi, ou traitement) du fichier ait eu lieu, on peut donc changer le comportement en modifiant les champs du message. Normalement, les chemins de téléchargement (appelés new_dir et new_file) refléteront l'intention de copier l'arborescence source d'origine. donc si vous avez *a/b/c.txt* sur l'arborescence des sources et que vous téléchargez dans le répertoire *mine* sur le système local, le new_dir serait *mine/a/b* et new_file serait *c.txt*.\n", "\n", "\n", "## Plugins qui Traitent un Fichier après son Téléchargement\n", "\n", "Un cas d'utilisation courant est pour les plugins avec un point d'entrée *after_work* pour lire le fichier après son téléchargement et le transformer en un produit dérivé avec un nom différent. Ainsi, le nouveau fichier est créé comme dans la section précédente. Le message pour le fichier téléchargé doit encore être déplacé sur une liste pour s'assurer qu'il est reconnu par le courtier. Un tel point d'entrée ressemblerait à ceci:" ] }, { "cell_type": "code", "execution_count": 7, "id": "disciplinary-dublin", "metadata": {}, "outputs": [], "source": [ "\n", " def after_work(self, worklist):\n", "\n", " new_ok=[]\n", " for m in worklist.ok:\n", " success=do_something()\n", " if success:\n", " new_ok.append(m)\n", " # since it is already acknowledged, we can just drop it from ok.\n", " \n", " \n", " worklist.ok = new_ok\n", " # the messages on worklist.ok will get posted in the next algorithm phase." ] }, { "cell_type": "markdown", "id": "coastal-moses", "metadata": {}, "source": [ "\n", "## Plugins qui renomment les fichiers\n", "\n", "\n", "Le plugin ci-dessus modifie la disposition des fichiers à télécharger, en fonction de la classe [GTStoWIS](https://github.com/wmo-im/GTStoWIS), qui prescrit une arborescence de répertoires différente en sortie. Il y a beaucoup de champs à mettre à jour lors de la modification du placement des fichiers, il est donc préférable d'utiliser:\n", "\n", " msg.updatePaths( self.o, new_dir, new_file )\n", "\n", "pour mettre à jour correctement tous les champs nécessaires dans le message. Il mettra à jour 'new_baseURL', 'new_relPath', 'new_subtopic' à utiliser lors de la publication.\n", "\n", "La partie try/except de la routine traite du cas où, si un fichier arrive avec un nom à partir duquel une arborescence de rubriques ne peut pas être construite, une exception python peut se produire et le message est ajouté à la liste de travail ayant échoué et ne sera pas être traitées par des plugins ultérieurs.\n", "\n", "## Plugins qui Créent de Nouveaux Fichiers\n", "\n", "\n", "La routine ci-dessus est parfaite lorsqu'un fichier vient d'être renommé. Si un plugin a besoin de créer de nouveaux fichiers vaguement dérivés du fichier d'entrée, alors vous voulez créer de nouveaux messages pour ces fichiers à partir de rien:\n", "\n", " import sarracenia\n", "\n", " m = sarracenia.Message.fromFileData(sample_fileName, self.o, os.stat(sample_fileName) )\n", "\n", "La routine msg_fromFileData utilisera self.o pour appliquer les paramètres de publication appropriés. \n", "Aucune connaissance des formats de message ou de la construction de champs n'est nécessaire. Si le fichier n'est pas local, \n", "comme lors de l'écriture d'un rappel d'interrogation, un routage alternatif peut être utilisé: \n", "\n", " m = sarracenia.Message.fromFileInfo(sample_fileName, self.o, fake_stat_info )\n", "\n", "le faux enregistrement de statistiques (selon la page de manuel stat(2) ou python os.stat() ) peut être construit à partir d'autres champs, en commençant par:\n", "\n", " import paramiko\n", "\n", " fake_stat = paramiko.SFTPAttributes()\n", " fake_stat.st_mtime = ... something else... perhaps an http header?\n", " fake_stat.st_size = ... again will vary by context.\n", " \n", "Dans tous les cas, une fois que vous avez le message, il peut être ajouté à la liste entrante.\n" ] }, { "cell_type": "markdown", "id": "inclusive-scope", "metadata": {}, "source": [ "## Other Examples\n", "\n", "\n", "Le sous-classement de [Sarracenia.flowcb](../Reference/flowcb.html) est utilisé en interne pour faire beaucoup de travail de base. C'est une bonne idée de regarder le code source de Sarracenia lui-même. Par exemple:\n", "\n", "* [sarracenia.flowcb](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/__init__.py) jetez un oeil au fichier __init__.py là qui fournit ces informations sur un format plus programmatiquement succinct.\n", "\n", "* [sarracenia.flowcb.gather.file](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/gather/file.py)\n", " est une classe qui implémente:\n", " la publication de fichiers et la surveillance de répertoires, dans le sens d'un rappel qui\n", " implémente le point d'entrée *gather*, en lisant un système de fichiers et en construisant un\n", " liste des messages à traiter.\n", "\n", "* [sarracenia.flowcb.gather.message](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/gather/message.py)\n", "est une classe qui implémente la réception de messages à partir de flux de protocole de file d'attente de messages.\n", "\n", "* [sarracenia.flowcb.gather.nodupe](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/nodupe)\n", "Ce module supprime les doublons du message\n", " flux basés sur les sommes de contrôle d'intégrité.\n", "\n", "* [sarracenia.flowcb.post.message](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/post/message.py)\n", "est une classe qui implémente la publication\n", " messages vers flux de protocole de file d'attente de messages\n", "\n", "* [sarracenia.flowcb.retry](https://github.com/MetPX/Sarracenia/blob/development/sarracenia/flowcb/retry.py)\n", "lorsque le transfert d'un fichier échoue. Sarracenia doit conserver le message pertinent dans un fichier d'état pour\n", " un moment ultérieur où il pourra être réessayé.\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "expensive-yellow", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.6" } }, "nbformat": 4, "nbformat_minor": 5 }