#
# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, 2008-2021
#
import logging
import sarracenia
from sarracenia.flow import Flow
from sarracenia.featuredetection import features
import sys
logger = logging.getLogger(__name__)
# Default option values declared by the poll component.
# NOTE(review): the dict is not referenced by the code visible alongside it;
# presumably the configuration loader picks it up by name -- confirm.
# NOTE(review): as written, nodupe_ttl (7 * 3600) is smaller than fileAgeMax
# (30 * 86400), which is the condition Poll.__init__ logs a warning about --
# confirm that this is intended.
default_options = {
    'acceptUnmatched': True,
    'blocksize': 1,
    'bufsize': 1024 ** 2,  # 1 MiB if the unit is bytes -- TODO confirm
    'chmod': 0o400,
    'pollUrl': None,
    'follow_symlinks': False,
    'force_polling': False,
    'inflight': None,
    'identity_method': 'cod,sha512',
    'part_ext': 'Part',
    'partflg': '1',
    'post_baseDir': None,
    'permCopy': True,
    'timeCopy': True,
    'randomize': False,
    'post_on_start': False,
    'nodupe_ttl': 7 * 3600,  # 7 hours, assuming seconds
    'fileAgeMax': 30 * 86400,  # 30 days, assuming seconds
}
# 'sumflg': 'cod,md5',
class Poll(Flow):
    """
    repeatedly query a remote (non-sarracenia) server to list the files there.
    post messages (to post_broker) for every new file discovered there.

    the sarracenia.flowcb.poll class is used to implement the remote querying,
    and is highly customizable to that effect.

    if the vip option is set,

    * subscribe to the same settings that are being posted to.
    * consume all the messages posted, keeping new file duplicate cache updated.
    """

    def __init__(self, options):
        """
        Initialise the flow, then make sure the callbacks a poll needs are
        present in ``self.plugins['load']`` and log configuration problems.

        :param options: configuration object; passed to ``Flow.__init__`` and
            read here for ``vip``. ``self.o`` and ``self.plugins`` are
            presumably set up by ``Flow.__init__`` -- confirm.
        """
        super().__init__(options)

        # Instances sharing a poll need to read back what is posted, so the
        # exchange subscribed to should be the one being posted to.
        if hasattr(self.o, 'post_exchange') and hasattr(self.o, 'exchange'):
            px = self.o.post_exchange
            if isinstance(px, list):
                # Only the first post_exchange is compared; an empty list is
                # treated as "no post_exchange" instead of raising IndexError.
                px = px[0] if px else None
            if px != self.o.exchange:
                logger.warning( f"post_exchange: {px} is different from exchange: {self.o.exchange}. The settings need for multiple instances to share a poll." )
            else:
                logger.info( f"Good! post_exchange: {px} and exchange: {self.o.exchange} match so multiple instances to share a poll." )

        # Substring match: any configured plugin whose name mentions
        # 'scheduled' suppresses the default scheduled poll callback.
        if not any('scheduled' in name for name in self.plugins['load']):
            self.plugins['load'].append('sarracenia.flowcb.scheduled.poll.Poll')

        if not any('flowcb.poll.Poll' in name for name in self.plugins['load']):
            logger.info( f"adding poll plugin, because missing from: {self.plugins['load']}" )
            self.plugins['load'].append('sarracenia.flowcb.poll.Poll')

        # Consuming the posted messages is only needed when a vip is set.
        if options.vip:
            self.plugins['load'].insert(0, 'sarracenia.flowcb.gather.message.Message')

        # A poll always posts; inserted last so it ends up first in the list.
        self.plugins['load'].insert(0, 'sarracenia.flowcb.post.message.Message')

        if self.o.nodupe_ttl < self.o.fileAgeMax:
            logger.warning( f"nodupe_ttl < fileAgeMax means some files could age out of the cache and be re-ingested ( see : https://github.com/MetPX/sarracenia/issues/904")

        if not features['ftppoll']['present']:
            # pollUrl may exist but be None (its declared default), so test
            # the value itself rather than only the attribute's presence.
            poll_url = getattr(self.o, 'pollUrl', None)
            if poll_url and poll_url.startswith('ftp'):
                logger.critical( f"attempting to configure an FTP poll pollUrl={poll_url}, but missing python modules: {' '.join(features['ftppoll']['modules_needed'])}" )