import logging
from sarracenia import nowflt, timeflt2str, timestr2flt, __version__, naturalSize, naturalTime
from sarracenia.flowcb import FlowCB
logger = logging.getLogger(__name__)
[docs]
class Log(FlowCB):
"""
The logging flow callback class.
logs message at the indicated time. Controlled by:
logEvents - which entry points to emit log messages at.
logMessageDump - print literal messages when printing log messages.
every housekeeping interval, prints:
* how many messages were received, rejected, %accepted.
* number of files transferred, their size, and rate in files/s and bytes/s
* lag: some information about how old the messages are when processed
"""
[docs]
def __init__(self, options):
super().__init__(options,logger)
self.o.add_option('logEvents', 'set',
['after_accept', 'on_housekeeping'])
self.o.add_option('logMessageDump', 'flag', False)
logger.info(f'{self.o.component} initialized with: logEvents: {self.o.logEvents}, logMessageDump: {self.o.logMessageDump}')
if self.o.component in ['sender']:
self.action_verb = 'sent'
elif self.o.component in ['subscribe', 'sarra' ]:
self.action_verb = 'downloaded'
elif self.o.component in ['post', 'poll', 'watch']:
self.action_verb = 'noticed'
elif self.o.component in [ 'flow', 'shovel', 'winnow']:
self.action_verb = self.o.component + 'ed'
else:
self.action_verb = 'done'
self.started = nowflt()
self.rxTopicSeparator='.'
if hasattr(options,'broker') and options.broker and options.broker.url.scheme.startswith('mqtt'):
self.rxTopicSeparator='/'
self.__reset()
def __reset(self):
self.last_housekeeping = nowflt()
self.fileBytes = 0
self.lagTotal = 0
self.lagMax = 0
self.msgCount = 0
self.rejectCount = 0
self.transferCount = 0
def metricsReport(self):
return { 'lagMax': self.lagMax, 'lagTotal':self.lagTotal, 'lagMessageCount':self.msgCount, 'rejectCount':self.rejectCount }
def gather(self, messageCountMax):
if set(['gather']) & self.o.logEvents:
logger.info( f' messageCountMax: {messageCountMax} ')
return (True, [])
def _messageStr(self, msg):
if self.o.logMessageDump:
return msg.dumps()
else:
s = ''
if 'baseUrl' in msg:
s+= msg['baseUrl'] + ' '
if 'relPath' in msg:
s+= msg['relPath']
elif 'retrievePath' in msg:
s+= msg['retrievePath']
else:
s+= 'badMessage'
return s
def _messageAcceptStr(self,msg):
if self.o.logMessageDump:
return msg.dumps()
s = " "
if 'exchange' in msg:
s+= f"exchange: {msg['exchange']} "
if 'subtopic' in msg:
s+= f"subtopic: {self.rxTopicSeparator.join(msg['subtopic'])} "
if 'fileOp' in msg:
op=','.join(msg['fileOp'].keys())
if op in ['link']:
s+= f"a link to {msg['fileOp']['link']} with baseUrl: {msg['baseUrl']} "
elif op in ['rename']:
s+= f"a rename {msg['fileOp']['rename']} with baseUrl: {msg['baseUrl']} "
else:
s+= f"a {op} with baseUrl: {msg['baseUrl']} "
else:
s+= f"a file with baseUrl: {msg['baseUrl']} "
if 'relPath' in msg:
s+= f"relPath: {msg['relPath']} "
if 'retrievePath' in msg:
s+= f"retrievePath: {msg['retrievePath']} "
if 'rename' in msg:
s+= f"rename: {msg['rename']} "
return s
def _messagePostStr(self,msg):
if self.o.logMessageDump:
return msg.dumps()
s = "to "
if 'post_exchange' in msg and ('post_topic' in msg) and \
not msg['post_topic'].startswith(msg['post_exchange']) :
s+= f"exchange: {msg['post_exchange']} "
if 'post_topic' in msg:
s+= f"topic: {msg['post_topic']} "
if 'fileOp' in msg:
op=','.join(msg['fileOp'].keys())
if op in ['link']:
s+= f"a link to {msg['fileOp']['link']} "
elif op in ['rename']:
s+= f"a rename {msg['fileOp']['rename']} "
else:
s+= f"a {op} "
else:
s+= f"a file "
if 'baseUrl' in msg:
s+= f"with baseUrl: {msg['baseUrl']} "
if 'relPath' in msg:
s+= f"relPath: {msg['relPath']} "
if 'retrievePath' in msg:
s+= f"retrievePath: {msg['retrievePath']} "
if 'rename' in msg:
s+= f"rename: {msg['rename']} "
return s
def after_accept(self, worklist):
self.rejectCount += len(worklist.rejected)
self.msgCount += len(worklist.incoming)
now = nowflt()
if set(['reject']) & self.o.logEvents:
for msg in worklist.rejected:
if 'report' in msg:
logger.info(
"%s rejected: %d %s " %
(msg['relPath'], msg['report']['code'], msg['report']['message']))
else:
logger.info("rejected: %s " % self._messageAcceptStr(msg))
for msg in worklist.incoming:
lag = now - timestr2flt(msg['pubTime'])
self.lagTotal += lag
if lag > self.lagMax:
self.lagMax = lag
if set(['after_accept']) & self.o.logEvents:
logger.info( f"accepted: (lag: {lag:.2f} ) {self._messageStr(msg)}" )
def after_post(self, worklist):
if set(['after_post']) & self.o.logEvents:
for msg in worklist.ok:
logger.info("posted %s" % self._messagePostStr(msg))
for msg in worklist.failed:
logger.info("failed to post, queued to retry %s" % self._messagePostStr(msg))
def after_work(self, worklist):
self.rejectCount += len(worklist.rejected)
self.transferCount += len(worklist.ok)
if set(['reject']) & self.o.logEvents:
for msg in worklist.rejected:
if 'report' in msg:
logger.info(
"rejected: %d %s " %
(msg['report']['code'], msg['report']['message']))
else:
logger.info("rejected: %s " % self._messageStr(msg))
for msg in worklist.ok:
if 'size' in msg:
self.fileBytes += msg['size']
if not self.o.download:
continue
if set(['after_work']) & self.o.logEvents:
if 'fileOp' in msg :
if 'link' in msg['fileOp']:
verb = 'linked'
elif 'remove' in msg['fileOp']:
verb = 'removed'
elif 'rename' in msg['fileOp']:
verb = 'renamed'
else:
verb = ','.join(msg['fileOp'].keys())
elif self.action_verb in ['downloaded'] and 'content' in msg:
verb = 'written from message'
else:
verb = self.action_verb
if ('new_dir' in msg) and ('new_file' in msg):
logger.info("%s ok: %s " %
(verb, msg['new_dir'] + '/' + msg['new_file']))
elif 'relPath' in msg:
logger.info("%s ok: relPath: %s " % (verb, msg['relPath'] ))
if self.o.logMessageDump:
logger.info('message: %s' % msg.dumps())
def stats(self):
tot = self.msgCount + self.rejectCount
how_long = nowflt() - self.last_housekeeping
if tot > 0:
apc = 100 * self.msgCount / tot
rate = self.msgCount / how_long
else:
apc = 0
rate = 0
logger.info(
f"version: {__version__}, started: {naturalTime(nowflt()-self.started)}, last_housekeeping: {how_long:4.1f} seconds ago "
)
logger.info(
"messages received: %d, accepted: %d, rejected: %d rate accepted: %3.1f%% or %3.1f m/s"
% (self.msgCount + self.rejectCount, self.msgCount,
self.rejectCount, apc, rate))
logger.info( f"files transferred: {self.transferCount} " +\
f"bytes: {naturalSize(self.fileBytes)} " +\
f"rate: {naturalSize(self.fileBytes/how_long)}/sec" )
if self.msgCount > 0:
logger.info("lag: average: %.2f, maximum: %.2f " %
(self.lagTotal / self.msgCount, self.lagMax))
def on_cleanup(self):
logger.info("hello")
def on_declare(self):
logger.info("hello")
def on_stop(self):
if set(['on_stop']) & self.o.logEvents:
self.stats()
logger.info("stopping")
def on_start(self):
if set(['on_start']) & self.o.logEvents:
self.stats()
logger.info("starting")
def on_housekeeping(self):
if set(['on_housekeeping']) & self.o.logEvents:
self.stats()
logger.debug("housekeeping")
self.__reset()