Source code for sarracenia.flowcb.accept.speedo

"""
Plugin speedo.py:
    Gives a *speedometer* reading on the messages going through an exchange.
    as this is an after_accept
    Accumulate the number of messages and the bytes they represent over a period of time.

Options:
    msgSpeedoInterval -> how often the speedometer is updated. (default: 5)
    msg_speedo_maxlag  -> if the message flow indicates that messages are 'late', emit warnings. (default 60)

Usage: 
    callback accept.speedo
    msgSpeedoInterval x
    msg_speedo_maxlag y
"""


from sarracenia import timestr2flt, nowflt, naturalSize, naturalTime
import logging
from sarracenia.flowcb import FlowCB
logger = logging.getLogger(__name__)


[docs] class Speedo(FlowCB):
[docs] def __init__(self, options): """ set defaults for options. can be overridden in config file. """ super().__init__(options, logger) self.o.add_option('msg_speedo_maxlag', 'count', 60) #if hasattr(self.o, 'msg_speedo_maxlag'): # if type(self.o.msg_speedo_maxlag) is list: # self.o.msg_speedo_maxlag = int(self.o.msg_speedo_maxlag[0]) #else: # self.o.msg_speedo_maxlag = 60 logger.debug("speedo init: 2 ") self.o.add_option('msgSpeedoInterval', 'count', 5) #if hasattr(self.o, 'msgSpeedoInterval'): # if type(self.o.msgSpeedoInterval) is list: # self.o.msgSpeedoInterval = int(self.o.msgSpeedoInterval[0]) #else: # self.o.msgSpeedoInterval = 5 now = nowflt() self.msg_speedo_last = now self.msg_speedo_msgcount = 0 self.msg_speedo_bytecount = 0
def after_accept(self, worklist): for message in worklist.incoming: msgtime = timestr2flt(message['pubTime']) now = nowflt() self.msg_speedo_msgcount = self.msg_speedo_msgcount + 1 (method, psize, ptot, prem, pno) = message['partstr'].split(',') self.msg_speedo_bytecount += int(psize) #not time to report yet. if self.o.msgSpeedoInterval > now - self.msg_speedo_last: continue lag = now - msgtime msgpersec = self.msg_speedo_msgcount / (now - self.msg_speedo_last) bytespersec = self.msg_speedo_bytecount / (now - self.msg_speedo_last) logger.info("speedo: %3d messages received: %5.4f msg/s, %4.2f bytes/s, lag: %4.0f s" % (self.msg_speedo_msgcount, msgpersec, bytespersec, lag)) # If lag is higher than max allowed, emmit a warning if lag > self.o.msg_speedo_maxlag: logger.warning("speedo: Excessive lag! Messages posted %4.0f s ago" % lag) self.msg_speedo_last = now self.msg_speedo_msgcount = 0 self.msg_speedo_bytecount = 0