Source code for sarracenia.flowcb.post.message

# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, Environment Canada, 2008-2020
#

import logging

import sarracenia.moth
from sarracenia.flowcb import FlowCB

logger = logging.getLogger(__name__)


[docs] class Message(FlowCB): """ post messages to sarracenia.moth message queuing protocol destination. """
[docs] def __init__(self, options): super().__init__(options,logger) if hasattr(self.o, 'post_broker'): props = sarracenia.moth.default_options props.update(self.o.dictify()) # adjust settings post_xxx to be xxx, as Moth does not use post_ ones. for k in [ 'broker', 'exchange', 'topicPrefix', 'exchangeSplit', 'topic' ]: post_one='post_'+k if hasattr( self.o, post_one ): #props.update({ k: getattr(self.o,post_one) } ) props[ k ] = getattr(self.o,post_one) self.poster = sarracenia.moth.Moth.pubFactory(props)
def post(self, worklist): still_ok = [] all_good=True for m in worklist.ok: if all_good and hasattr(self.poster,'putNewMessage') and self.poster.putNewMessage(m): still_ok.append(m) else: all_good=False worklist.failed.append(m) worklist.ok = still_ok def metricsReport(self) -> dict: if hasattr(self,'poster') and self.poster: return self.poster.metricsReport() else: return {} def on_housekeeping(self): if hasattr(self,'poster') and self.poster: m = self.poster.metricsReport() logger.debug( f"messages: good: {m['txGoodCount']} bad: {m['txBadCount']} bytes: {m['txByteCount']}" ) self.poster.metricsReset() else: logger.debug( "no metrics available" ) def on_start(self): if hasattr(self,'poster') and self.poster: self.poster.putSetup() logger.info('starting') def on_stop(self): if hasattr(self,'poster') and self.poster: self.poster.close() logger.info('closing')