Source code for sarracenia.flowcb.filter.fdelay

"""
  This plugin delays processing of messages by *message_delay* seconds

  sarracenia.flowcb.msg.fdelay 30
  import sarracenia.flowcb.filter.fdelay.Fdelay

  or more simply:

  fdelay 30
  callback filter.fdelay

  every message will be at least 30 seconds old before it is forwarded by this plugin.
  in the meantime, the message is placed on the retry queue by marking it as failed.

"""
import logging
import os
import os.path
import stat

from sarracenia.flowcb import FlowCB
from sarracenia import timestr2flt, nowflt

logger = logging.getLogger(__name__)


[docs] class Fdelay(FlowCB):
[docs] def __init__(self, options): super().__init__(options,logger) logging.basicConfig(format=self.o.logFormat, level=getattr(logging, self.o.logLevel.upper())) self.o.add_option('msg_fdelay', 'duration', 60) self.o.add_option('fdelay', 'duration', 60) #parent.declare_option('fdelay') if hasattr(self.o, 'msg_fdelay'): self.o.fdelay = self.o.msg_fdelay
def after_accept(self, worklist): # Prepare msg delay test outgoing = [] for m in worklist.incoming: # Test msg delay elapsedtime = nowflt() - timestr2flt(m['pubTime']) if 'fileOp' in m and 'remove' in m['fileOp'] : # 'remove' msg will be removed by itself worklist.rejected.append(m) logger.debug('marked rejected 0 (file removal)') continue # Test msg delay elapsedtime = nowflt() - timestr2flt(m['pubTime']) if elapsedtime < self.o.fdelay: dbg_msg = "message not old enough, sleeping for {:.3f} seconds" logger.debug( dbg_msg.format(elapsedtime, self.o.fdelay - elapsedtime)) worklist.failed.append(m) logger.debug('marked failed 1 (message not old enough)') continue # Prepare file delay test if '/cfr/' in m['new_dir']: f = os.path.join(m['new_dir'], m['new_file']) else: f = '/' + m['relPath'] if not os.path.exists(f): logger.debug("did not find file {}".format(f)) worklist.failed.append(m) logger.debug('marked failed 2 (file not found)') continue # Test file delay filetime = os.path.getmtime(f) elapsedtime = nowflt() - filetime if elapsedtime < self.o.fdelay: dbg_msg = "file not old enough, sleeping for {:.3f} seconds" logger.debug( dbg_msg.format(elapsedtime, self.o.fdelay - elapsedtime)) worklist.failed.append(m) logger.debug('marked failed 3 file not old enough') continue logger.debug('appending to outgoing') outgoing.append(m) worklist.incoming = outgoing