Un premier exemple utilisant l’API Sarracenia Moth

Sarracenia est un package conçu pour annoncer la disponibilité de nouvelles données, généralement sous forme de fichiers. Nous plaçons les fichiers sur des serveurs standard, les rendons disponibles via le Web ou sftp, et informons les utilisateurs qu’ils sont arrivés à l’aide de messages.

Sarracenia utilise des protocoles de transmission de messages standard existants, comme rabbitmq/AMQP pour transporter les messages, et dans les cercles de transmission de messages, car le serveur qui distribue les messages est appelé un courtier (broker).

Nous appelons la combinaison d’un courtier de messages et d’un serveur de fichiers (qui peut être un serveur unique ou un grand cluster) une pompe de données (data pump).

En supposant que vous avez installé le paquet metpx-sr3, soit en tant que paquet debian, ou via pip, les annonces d’accès à sens unique à utiliser avec la classe sarracenia.moth (Messages Organisés par les en-têtes de sujet), qui permet à un programme python de se connecter à un serveur Sarracenia, et commencer à recevoir des messages qui annoncent des ressources.

La fabrique pour construire les objets sarracenia.moth prend deux arguments :

  • courtier : un objet (Credential) contenant une url pointant vers le serveur de messagerie qui annonce des produits, et d’autres options associées.

  • options : un dictionnaire d’autres paramètres que la classe pourrait utiliser.

L’exemple ci-dessous construit un appel à un courtier auquel tout le monde peut accéder et demander 10 annonces.

Vous pouvez l’exécuter, puis nous pourrons discuter de quelques paramètres :

[9]:
import sarracenia.moth
import sarracenia.moth.amqp
import sarracenia.credentials

import time
import socket


options = sarracenia.moth.default_options
options.update(sarracenia.moth.amqp.default_options)
options['broker'] = sarracenia.credentials.Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')
options['topicPrefix'] = [ 'v02', 'post' ]
options['bindings'] = [('xpublic', ['v02', 'post'], ['#'])]
options['queueName'] = 'q_anonymous_' + socket.getfqdn() + '_QuelquechoseDUtile'

print('options: %s' % options)


options: {'acceptUnmatched': True, 'batch': 25, 'bindings': [('xpublic', ['v02', 'post'], ['#'])], 'broker': <sarracenia.credentials.Credential object at 0x7f602ee7d780>, 'dry_run': False, 'exchange': None, 'expire': None, 'inline': False, 'inlineEncoding': 'guess', 'inlineByteMax': 4096, 'logFormat': '%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s', 'logLevel': 'info', 'messageDebugDump': False, 'message_strategy': {'reset': True, 'stubborn': True, 'failure_duration': '5m'}, 'message_ttl': 0, 'topicPrefix': ['v02', 'post'], 'tlsRigour': 'normal', 'auto_delete': False, 'durable': True, 'exchangeDeclare': True, 'prefetch': 25, 'queueName': 'q_anonymous_fractal_QuelquechoseDUtile', 'queueBind': True, 'queueDeclare': True, 'reset': False, 'subtopic': [], 'vhost': '/', 'queue_name': 'q_anonymous_fractal_QuelquechoseDUtile'}

Le paramètre courtier(broker) est un objet contenant une URL conventionnelle et d’autres options, indiquant le protocole de messagerie à utiliser pour accéder au serveur en amont. Lorsque vous vous connectez à un courtier, vous devez lui indiquer les messages qui vous intéressent. Dans Moth, tous les courtiers auxquels nous accédons doivent utiliser des hiérarchies de sujets. Vous pouvez les voir si vous avez exécuté avec succès l’exemple ci-dessus, il devrait y avoir dans les impressions de message un élément “sujet”(topic) dans les dictionnaires. En voici un exemple :

v02.post.20210213.WXO-DD.observations.swob-ml.20210213.CTZR

Celle-ci se divise en deux parties :

  • topic_prefix: v02.post

  • le reste de l’arborescence des rubriques est le reflet du chemin vers le produit annoncé, par rapport à un répertoire de base.

Dans AMQP, il y a le concept des “échanges” qui sont en quelque sorte comparables aux chaînes de télévision… ce sont des regroupements d’annonces. donc pour se connecter à un courtier AMQP, il faut spécifier:

  • exchange: Sarracenia promulgue xpublic comme défaut conventionnel.

  • topic_prefix: décidez quelle version des messages vous souhaitez obtenir. Ce serveur produit des v02.

  • subtopic: à quel sous-ensemble de messages topic_prefix voulons-nous nous abonner.

Liaisons

L’option de liaisons définit les trois valeurs ci-dessus. dans l’exemple, les liaisons sont :

  • topic_prefix: v02.post (obtenir des messages v02.)

  • exchange: xpublic (celui par défaut.)

  • subtopic: # ( un joker AMQP signifiant tout. )

on se connecte au courtier

amqp://hpfx.collab.science.gc.ca, sur l’échange xpublic, et nous serons intéressés par tous les messages correspondant à la spécification de sujet v02.post.#… (c’est-à-dire tous les messages v02 disponibles .)

sous-thème

Le sous-thème ici ( # ) correspond à tout ce qui est produit sur le serveur. Plus le sous-thème est large, plus il y a de messages à envoyer et plus le traitement est important. Il est préférable de le rendre plus étroit. En prenant l’exemple ci-dessus, si nous sommes intéressés par swob, un sous-thème comme:

  • *.WXO-DD.observations.swob-ml.#

correspondrait à tous les swobs similaires à celui ci-dessus, mais évitez de vous envoyer des messages pour des non-swobs.

queue_name

Par convention, dans les courtiers administrés par Sarracenia, les utilisateurs ne peuvent créer que des files d’attente commençant par q_ suivi de leur nom d’utilisateur. nous nous sommes connectés en tant qu’anonymes, et donc q_anonymous doit être utilisé. Après cela, le reste peut être ce que vous voulez, mais il y a quelques considérations :

  • Si vous souhaitez démarrer plusieurs processus Python pour partager un flux de données, ils spécifient tous le même nom de file d’attente et ils partageront le flux de messages. Il s’adapte bien à quelques dizaines de téléchargeurs coopérants, mais ne s’adapte pas à l’infini, ne vous attendez pas à plus d’environ 99 processus pour pouvoir partager efficacement une charge à partir d’une seule file d’attente. Pour évoluer au-delà de cela avec AMQP, plusieurs sélections sont préférables.

  • si vous allez demander de l’aide aux administrateurs de la pompe de données … vous devrez leur fournir le nom de la file d’attente, et ils devront peut-être pouvoir le choisir parmi des centaines ou des milliers qui se trouvent sur le serveur.

Messages

Différents protocoles de messagerie ont différentes structures et conventions de stockage. la classe MoTH renvoie les messages sous forme de dictionnaires python, quel que soit le protocole utilisé pour les obtenir ou, en cas de transfert, pour les transmettre. On peut ajouter des champs pour une utilisation programmatique aux messages simplement en ajoutant des éléments au dictionnaire. S’ils sont uniquement destinés à un usage interne, ajoutez le nom de l’élément du dictionnaire à la clé spéciale ‘_deleteOnPost’, afin que l’élément du dictionnaire soit supprimé lors du transfert du message.

Ack

Les messages sont marqués en transit par le courtier, et si vous ne les reconnaissez pas, la pompe de données les conservera et les réexpédiera éventuellement. conserver les messages en attente en mémoire ralentira également le traitement de tous les messages. Il faut accuser réception des messages dès que possible, mais pas si tôt que vous perdrez des données si le programme est interrompu. Dans l’exemple, nous reconnaissons après avoir fait notre travail d’impression du message.

[10]:
h = sarracenia.moth.Moth.subFactory(options)

count=0
bon=0 # compteur des messages bien reçus

while count < 5:
    m = h.getNewMessage()  #get only one Message
    if m is not None:
        print("message %d: %s" % (count,m) )
        content = m.getContent()
        print("le premier 50 octets du fichier annoncé: %s" % content[0:50])
        bon += 1
        h.ack(m)
    time.sleep(0.1)
    count += 1

h.cleanup() # remove server-side queue defined by Factory.
h.close()
print( f"{bon} messages bien reçus")

2023-05-28 15:01:16,250 [INFO] sarracenia.moth.amqp __getSetup queue declared q_anonymous_fractal_QuelquechoseDUtile (as: amqps://anonymous@hpfx.collab.science.gc.ca)
2023-05-28 15:01:16,251 [INFO] sarracenia.moth.amqp __getSetup binding q_anonymous_fractal_QuelquechoseDUtile with v02.post.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca)
message 0: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'ack_id', '_format', 'local_offset', 'exchange'}, 'sundew_extension': 'CMC:HRDPS:GRIB2:BIN::20230528190111', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_5eebe93b78f7f20d6c58dff7079f17f8:CMC:HRDPS:GRIB2:BIN::20230528190111', 'source': 'WXO-DD', 'mtime': '20230528T190113.733', 'atime': '20230528T190113.733', 'pubTime': '20230528T190113.733', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230528/WXO-DD/model_hrdps/north/grib2/12/006/CMC_hrdps_north_HGT_ISBL_1000_ps2.5km_2023052812_P006-00.grib2', 'subtopic': ['20230528', 'WXO-DD', 'model_hrdps', 'north', 'grib2', '12', '006'], 'identity': {'method': 'md5', 'value': 'DcEZ6+fx637myOUf83VyDQ=='}, 'size': 236654, 'exchange': 'xpublic', 'ack_id': 1, 'local_offset': 0}
le premier 50 octets du fichier annoncé: b'GRIB\x00\x00\x00\x02\x00\x00\x00\x00\x00\x03\x9cn\x00\x00\x00\x15\x01\x006\x00\x00\x04\x00\x01\x07\xe7\x05\x1c\x0c\x00\x00\x01\x02\x00\x00\x00A\x03\x00\x00\x12q1\x00\x00\x00'
message 1: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'ack_id', '_format', 'local_offset', 'exchange'}, 'sundew_extension': 'CMC:HRDPS:GRIB2:BIN::20230528190111', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_abed0a37b3c8b8511cc78b6f8c5c6a82:CMC:HRDPS:GRIB2:BIN::20230528190111', 'source': 'WXO-DD', 'mtime': '20230528T190114.13', 'atime': '20230528T190114.13', 'pubTime': '20230528T190114.13', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230528/WXO-DD/model_hrdps/north/grib2/12/006/CMC_hrdps_north_SPFH_ISBL_0150_ps2.5km_2023052812_P006-00.grib2', 'subtopic': ['20230528', 'WXO-DD', 'model_hrdps', 'north', 'grib2', '12', '006'], 'identity': {'method': 'md5', 'value': 'oMQDWV/QlF9aLLGOu+Tumw=='}, 'size': 330883, 'exchange': 'xpublic', 'ack_id': 2, 'local_offset': 0}
le premier 50 octets du fichier annoncé: b'GRIB\x00\x00\x00\x02\x00\x00\x00\x00\x00\x05\x0c\x83\x00\x00\x00\x15\x01\x006\x00\x00\x04\x00\x01\x07\xe7\x05\x1c\x0c\x00\x00\x01\x02\x00\x00\x00A\x03\x00\x00\x12q1\x00\x00\x00'
message 2: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'ack_id', '_format', 'local_offset', 'exchange'}, 'sundew_extension': 'CMC:HRDPS:GRIB2:BIN::20230528190111', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_7af6caa9fd11fae11919525225783fad:CMC:HRDPS:GRIB2:BIN::20230528190111', 'source': 'WXO-DD', 'mtime': '20230528T190113.823', 'atime': '20230528T190113.823', 'pubTime': '20230528T190113.823', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230528/WXO-DD/model_hrdps/north/grib2/12/006/CMC_hrdps_north_DEPR_ISBL_0850_ps2.5km_2023052812_P006-00.grib2', 'subtopic': ['20230528', 'WXO-DD', 'model_hrdps', 'north', 'grib2', '12', '006'], 'identity': {'method': 'md5', 'value': 'zSw+zw6P1XlQayy+CjoLAg=='}, 'size': 194315, 'exchange': 'xpublic', 'ack_id': 3, 'local_offset': 0}
le premier 50 octets du fichier annoncé: b'GRIB\x00\x00\x00\x02\x00\x00\x00\x00\x00\x02\xf7\x0b\x00\x00\x00\x15\x01\x006\x00\x00\x04\x00\x01\x07\xe7\x05\x1c\x0c\x00\x00\x01\x02\x00\x00\x00A\x03\x00\x00\x12q1\x00\x00\x00'
message 3: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'ack_id', '_format', 'local_offset', 'exchange'}, 'sundew_extension': 'CMC:HRDPS:GRIB2:BIN::20230528190112', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_e02209b0564eb4b19dd746af9eb5ee9c:CMC:HRDPS:GRIB2:BIN::20230528190112', 'source': 'WXO-DD', 'mtime': '20230528T190114.89', 'atime': '20230528T190114.89', 'pubTime': '20230528T190114.89', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230528/WXO-DD/model_hrdps/north/grib2/12/006/CMC_hrdps_north_WDIR_TGL_40_ps2.5km_2023052812_P006-00.grib2', 'subtopic': ['20230528', 'WXO-DD', 'model_hrdps', 'north', 'grib2', '12', '006'], 'identity': {'method': 'md5', 'value': 'GpPL5qQEOn0ALfuOzQrIHw=='}, 'size': 529466, 'exchange': 'xpublic', 'ack_id': 4, 'local_offset': 0}
le premier 50 octets du fichier annoncé: b'GRIB\x00\x00\x00\x02\x00\x00\x00\x00\x00\x08\x14:\x00\x00\x00\x15\x01\x006\x00\x00\x04\x00\x01\x07\xe7\x05\x1c\x0c\x00\x00\x01\x02\x00\x00\x00A\x03\x00\x00\x12q1\x00\x00\x00'
2023-05-28 15:01:17,548 [INFO] sarracenia.moth.amqp getCleanUp deleteing queue q_anonymous_fractal_QuelquechoseDUtile
message 4: {'_format': 'v02', '_deleteOnPost': {'subtopic', 'ack_id', '_format', 'local_offset', 'exchange'}, 'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20230528190109', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'ALL', 'filename': 'msg_ddsr-WXO-DD_0926936c6c7b2968e12487b5e10b3bc9:DMS:WXO_RENAMED_SWOB2:MSC:XML::20230528190109', 'source': 'WXO-DD', 'mtime': '20230528T190111.364', 'atime': '20230528T190111.364', 'pubTime': '20230528T190111.364', 'baseUrl': 'https://hpfx.collab.science.gc.ca', 'relPath': '/20230528/WXO-DD/observations/swob-ml/20230528/CVKU/2023-05-28-1900-CVKU-AUTO-minute-swob.xml', 'subtopic': ['20230528', 'WXO-DD', 'observations', 'swob-ml', '20230528', 'CVKU'], 'identity': {'method': 'md5', 'value': 'WEEsvB9/BKQC1Pv9hgO3LA=='}, 'size': 6426, 'exchange': 'xpublic', 'ack_id': 5, 'local_offset': 0}
le premier 50 octets du fichier annoncé: b'<?xml version="1.0" encoding="UTF-8" standalone="n'
5 messages bien reçus

2ème exemple … combinez baseURL + relPath (en parlant de retPath) et récupérez les données … utilisez newMessages() au lieu de getNewMessage pour afficher une autre interface utilisateur de consommation. Parler de http, et comment la récupération variera en fonction du protocole répertorié dans la baseUrl, et peut être compliqué.

[12]:
import urllib.request
import xml.etree.ElementTree as ET


options['bindings'] = [('xpublic', [ 'v02', 'post'], \
        [ '*', 'WXO-DD', 'observations', 'swob-ml', '#'] )]

h = sarracenia.moth.Moth.subFactory(options)

count=0

while count < 10:
    messages = h.newMessages()  #get all received Messages, upto options['batch'] of them at a time.
    for m in messages:
        dataUrl = m['baseUrl']
        if 'retPath' in m:
           dataUrl += m['retPath']
        else:
           dataUrl += m['relPath']

        print("url %d: %s" % (count,dataUrl) )
        with urllib.request.urlopen( dataUrl ) as f:
            vxml = f.read().decode('utf-8')
            xmlData = ET.fromstring(vxml)

            stn_name=''
            tc_id=''
            lat=''
            lon=''
            air_temp=''

            for i in xmlData.iter():
                name = i.get('name')
                if name == 'stn_nam' :
                   stn_name= i.get('value')
                elif name == 'tc_id' :
                   tc_id = i.get('value')
                elif name == 'lat' :
                   lat =  i.get('value')
                elif name == 'long' :
                   lon  = i.get('value')
                elif name == 'air_temp' :
                   air_temp = i.get('value')

            print( 'station: %s, tc_id: %s, lat: %s, long: %s, air_temp: %s' %
                   ( stn_name, tc_id, lat, lon, air_temp  ))
        h.ack(m)
        count += 1
        if count > 10:
            break
    time.sleep(1)

h.cleanup() # remove server-side queue defined by Factory.
h.close()
print("obtained 10 product temperatures")

2023-05-28 15:01:57,568 [INFO] sarracenia.moth.amqp __getSetup queue declared q_anonymous_fractal_QuelquechoseDUtile (as: amqps://anonymous@hpfx.collab.science.gc.ca)
2023-05-28 15:01:57,568 [INFO] sarracenia.moth.amqp __getSetup binding q_anonymous_fractal_QuelquechoseDUtile with v02.post.*.WXO-DD.observations.swob-ml.# to xpublic (as: amqps://anonymous@hpfx.collab.science.gc.ca)
url 0: https://hpfx.collab.science.gc.ca/20230528/WXO-DD/observations/swob-ml/20230528/CWZU/2023-05-28-1900-CWZU-AUTO-swob.xml
station: SHEARWATER JETTY, tc_id: WZU, lat: 44.628055, long: -63.5225, air_temp: 23.3
url 1: https://hpfx.collab.science.gc.ca/20230528/WXO-DD/observations/swob-ml/20230528/CYVR/2023-05-28-1900-CYVR-MAN-swob.xml
station: Vancouver International, tc_id: , lat: 49.19470, long: -123.18400, air_temp: 18.2
url 2: https://hpfx.collab.science.gc.ca/20230528/WXO-DD/observations/swob-ml/20230528/CAHK/2023-05-28-1900-CAHK-AUTO-swob.xml
station: HALIFAX KOOTENAY, tc_id: AHK, lat: 44.5875, long: -63.55, air_temp: 25.2
url 3: https://hpfx.collab.science.gc.ca/20230528/WXO-DD/observations/swob-ml/20230528/CPCN/2023-05-28-1900-CPCN-AUTO-swob.xml
station: CAPPON, tc_id: PCN, lat: 51.066947, long: -110.796689, air_temp: 20.8
url 4: https://hpfx.collab.science.gc.ca/20230528/WXO-DD/observations/swob-ml/20230528/CWMT/2023-05-28-1901-CWMT-AUTO-minute-swob.xml
station: WHATI, tc_id: WMT, lat: 63.1343, long: -117.244497, air_temp: 11.3
url 5: https://hpfx.collab.science.gc.ca/20230528/WXO-DD/observations/swob-ml/20230528/CWPO/2023-05-28-1901-CWPO-AUTO-minute-swob.xml
station: PILOT MOUND (AUT), tc_id: WPO, lat: 49.187933, long: -98.9064, air_temp: 30.3
url 6: https://hpfx.collab.science.gc.ca/20230528/WXO-DD/observations/swob-ml/20230528/CVOC/2023-05-28-1858-CVOC-AUTO-minute-swob.xml
station: WHISTLER - NESTERS, tc_id: VOC, lat: 50.1289, long: -122.9546, air_temp: 18.1
url 7: https://hpfx.collab.science.gc.ca/20230528/WXO-DD/observations/swob-ml/20230528/CXCP/2023-05-28-1900-CXCP-AUTO-swob.xml
station: CHAMPION AGDM, tc_id: XCP, lat: 50.281945, long: -113.350278, air_temp: 23.7
url 8: https://hpfx.collab.science.gc.ca/20230528/WXO-DD/observations/swob-ml/20230528/CYZG/2023-05-28-1900-CYZG-MAN-swob.xml
station: Salluit, tc_id: , lat: 62.17940, long: -75.66720, air_temp: 2.4
url 9: https://hpfx.collab.science.gc.ca/20230528/WXO-DD/observations/swob-ml/20230528/CVOC/2023-05-28-1900-CVOC-AUTO-swob.xml
station: WHISTLER - NESTERS, tc_id: VOC, lat: 50.1289, long: -122.9546, air_temp: 18.1
url 10: https://hpfx.collab.science.gc.ca/20230528/WXO-DD/observations/swob-ml/20230528/CNZS/2023-05-28-1900-CNZS-AUTO-swob.xml
station: CORAL HARBOUR RCS, tc_id: NZS, lat: 64.187831, long: -83.347054, air_temp: -1.5
2023-05-28 15:02:01,891 [INFO] sarracenia.moth.amqp getCleanUp deleteing queue q_anonymous_fractal_QuelquechoseDUtile
obtained 10 product temperatures

Télécharger des données avec Python

Vous pouvez utiliser la bibliothèque python urllib pour télécharger des données, puis les analyser. Dans cet exemple, les données sont une structure XML par message téléchargé et lu en mémoire. Certaines données de station sont ensuite imprimées.

Cela fonctionne bien avec urllib pour les ressources de protocole de transport hyper-test, mais d’autres ressources peuvent être annoncées à l’aide d’autres protocoles, tels que sftp ou ftp. Le code python devra être étendu pour traiter avec d’autres protocoles, ainsi que des conditions d’erreur, telles que des pannes temporaires.

Conclusion

Sarracenia.moth.amqp est le moyen le plus léger d’ajouter la consommation de messages Sarracenia à votre pile python existante. Vous demandez explicitement de nouveaux messages lorsque vous êtes prêt à les utiliser.

Ce type d’intégration ne fournit pas:

  • data retrieval: vous avez besoin de votre propre code pour télécharger les données correspondantes,

  • error recovery: s’il y a des erreurs transitoires, vous devez créer un code de récupération d’erreur (pour récupérer des téléchargements partiels.)

  • async/event/data driven: une façon de dire “faites ceci chaque fois que vous obtenez un fichier” … définissez les rappels à exécuter lorsqu’un événement particulier se produit, plutôt que le flux séquentiel illustré ci-dessus.

La classe sarracenia.flow fournit des téléchargements, une récupération d’erreur et une API asynchrone à l’aide de la classe sarracenia.flowcb (flowCallback).

[ ]: