Publication à partir du code Python
Si vous avez du code python qui crée déjà des fichiers, et vous ne souhaitez pas invoquer un programme séparé pour publier les fichiers, vous pouvez accéder facilement à la logique de publication des messages, étant donné un fichier existant.
Cet exemple concerne la création par programme de publications pour des fichiers. Il nécessite un accès en écriture à un courtier (broker), avec un utilisateur autorisé à publier sur un échange.
Besoin d’établir une configuration de publication, puis d’instancier un posting_engine (une instance qui peut être utilisée pour publier des messages.)
[1]:
import sarracenia
import sarracenia.moth
import sarracenia.credentials
from sarracenia.config import default_config
import os
import time
import socket
cfg = default_config()
cfg.logLevel = 'debug'
cfg.broker = sarracenia.credentials.Credential('amqp://tfeed:password@localhost')
cfg.exchange = 'xpublic'
cfg.post_baseUrl = 'http://host'
cfg.post_baseDir = '/tmp'
# moth wants a dict as options, rather than sarracenia.config.Config instance.
posting_engine = sarracenia.moth.Moth.pubFactory(cfg.broker, cfg.dictify() )
2022-02-04 14:56:31,477 [DEBUG] amqp _on_start Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@flow', 'copyright': 'Copyright (c) 2007-2019 Pivotal Software, Inc.', 'information': 'Licensed under the MPL 1.1. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 22.2.7', 'product': 'RabbitMQ', 'version': '3.8.2'}, mechanisms: [b'AMQPLAIN'], locales: ['en_US']
2022-02-04 14:56:31,494 [DEBUG] amqp __init__ using channel_id: 1
2022-02-04 14:56:31,529 [DEBUG] amqp _on_open_ok Channel open
ensuite, nous créons un fichier texte…
[2]:
sample_fileName = '/tmp/sample.txt'
sample_file = open( sample_fileName , 'w')
sample_file.write(
"""
CACN00 CWAO 161800
PMN
160,2021,228,1800,1065,100,-6999,20.49,43.63,16.87,16.64,323.5,9.32,27.31,1740,317.8,19.22,1.609,230.7,230.7,230.7,230.7,0,0,0,16.38,15.59,305.
9,17.8,16.38,19.35,55.66,15.23,14.59,304,16.67,3.844,20.51,18.16,0,0,-6999,-6999,-6999,-6999,-6999,-6999,-6999,-6999,0,0,0,0,0,0,0,0,0,0,0,0,0,
13.41,13.85,27.07,3473
"""
)
sample_file.close()
vous donnez le nom du fichier, la configuration initialisée ci-dessus et l’enregistrement de statistiques du fichier à la fonction msg_init(). Il renverra un message prêt à être envoyé au posting_engine.
[3]:
# you can supply msg_init with your files, it will build a message appropriate for it.
m = sarracenia.Message.fromFileData(sample_fileName, cfg, os.stat(sample_fileName) )
# here is the resulting message.
print(m)
# feed the message to the posting engine.
posting_engine.putNewMessage(m)
# when done, should close... cleaner...
posting_engine.close()
2022-02-04 14:56:38,509 [DEBUG] amqp collect Closed channel #1
{'exchange': 'xpublic', 'local_offset': 0, '_deleteOnPost': {'exchange', 'local_offset', 'new_relPath', 'new_subtopic', 'new_file', 'new_baseUrl', 'new_dir'}, 'pubTime': '20220204T205638.499809504', 'new_dir': '/tmp', 'new_file': 'sample.txt', 'new_baseUrl': 'http://host', 'new_relPath': 'sample.txt', 'new_subtopic': [], 'relPath': 'sample.txt', 'subtopic': [], 'baseUrl': 'http://host', 'from_cluster': 'localhost', 'size': 335, 'mtime': '20220204T205635.203999996', 'atime': '20220204T195639.0199999809', 'mode': '664', 'identity': {'method': 'sha512', 'value': 'w5ZwUT1IMAjnQT6TLR9NSLzG5RKijhxq46FjMx5UWtsHM/FNOaYNRmGwonIPfnhE5xUORf3z5dRyI6zdL6ygNw=='}}
On peut publier autant de messages que nécessaire avec putNewMessage, lorsque vous avez terminé de publier des fichiers, pour couper proprement la connexion avec le courtier, veuillez fermer le posting_engine.