Source code for sarracenia.flowcb.accept.save

"""
Plugin save.py:
    Converts a consuming component into one that writes the queue into a file. 
    If there is some sort of problem with a component, then add callback save and restart.

    The messages will accumulate in a save file in ~/.cache/<component>/<config>/ ??<instance>.save
    When the situation is returned to normal (you want the component to process the data as normal):
    * remove the callback save
    * note the queue this configuration is using to read (should be in the log on startup.)
    * run an sr_shovel with -restore_to_queue and the queue name.

Options:
    save.py takes 'msgSaveFile' as an argument.
    "_9999.sav" will be added to msgSaveFile, where the 9999 represents the instance number.
    As instances run in parallel, rather than sychronizing access, just writes to one file per instance.

Usage:
    callback accept.save
    msgSaveFile x
"""

import jsonpickle
import logging
from sarracenia.flowcb import FlowCB

logger = logging.getLogger('__name__')


[docs] class Save(FlowCB):
[docs] def __init__(self, options): super().__init__(options, logger) self.o.add_option('msgSaveFile', 'str') if not self.o.msgSaveFile: logger.error("msg_save: setting msgSaveFile setting is mandatory") return self.msgSaveFile = open(self.o.msgSaveFile, "a") logger.debug("msg_save initialized")
def after_accept(self, worklist): for message in worklist.incoming: self.msgSaveFile.write(jsonpickle.encode(message) + '\n') self.msgSaveFile.flush() logger.info("msg_save saving msg with topic:%s (aborting further processing)" % message['topic'])