# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, Environment Canada, 2008-2015
#
# more info: https://github.com/MetPX/sarracenia
#
import os, json, sys, time
from _codecs import decode, encode
from sarracenia import nowflt, timestr2flt
import logging
from sarracenia.flowcb import FlowCB
from sarracenia.featuredetection import features
# class sarra/retry
logger = logging.getLogger(__name__)
class Retry(FlowCB):
"""
overall goal:
* When file transfers fail, write the messages to a queue to be retried later.
There is also a second retry queue for failed posts.
how it works:
* the after_accept checks how many incoming messages we received.
If there is a full batch to process, don't try to retry any.
* if there is room, then fill in the batch with some retry requests.
* when after_work is called, the worklist.failed list of messages
is the files where the transfer failed. write those messages to
a retry queue.
* the DiskQueue or RedisQueue classes are used to store the retries, and it handles
expiry on each housekeeping event.
"""

    def __init__(self, options) -> None:
        logger.debug("sr_retry __init__")
        super().__init__(options, logger)

        if not features['retry']['present']:
            logger.critical("missing retry prerequisites, module disabled")
            return

        self.o.add_option('retry_driver', 'str', 'disk')

        # retry_refilter False -- retry the send with the existing processing.
        # retry_refilter True  -- re-ingest and re-apply processing (in case it has changed.)
        self.o.add_option('retry_refilter', 'flag', False)

        #queuedriver = os.getenv('SR3_QUEUEDRIVER', 'disk')
        logger.debug('logLevel=%s' % self.o.logLevel)

    def gather(self, qty) -> tuple:
        """
        If there are only a few new messages, get some from the download retry queue and put them into
        `worklist.incoming`.

        Do this in the gather() entry point if retry_refilter is True.
        """
        if not features['retry']['present'] or not self.o.retry_refilter:
            return (True, [])

        if qty <= 0:
            return (True, [])

        message_list = self.download_retry.get(qty)

        # eliminate calculated values so each message is refiltered from scratch.
        for m in message_list:
            # copy the key list and the _deleteOnPost set first: deleting
            # entries from a dict while iterating over it is an error.
            transient = m['_deleteOnPost'].copy() if '_deleteOnPost' in m else set()
            for k in list(m.keys()):
                if k in transient or k.startswith('new_'):
                    del m[k]
            m['_isRetry'] = True
            m['_deleteOnPost'] = set(['_isRetry'])

        return (True, message_list)
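
    # For example, a message recovered from the retry queue may still carry
    # computed fields such as 'new_dir' or 'new_file' from its first pass;
    # dropping every 'new_*' key and everything in '_deleteOnPost' lets the
    # flow re-derive them as though the message had just been ingested.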

    def after_accept(self, worklist) -> None:
        """
        If there are only a few new messages, get some from the download retry queue and put them into
        `worklist.incoming`.

        Do this in the after_accept() entry point if retry_refilter is False.
        """
        if not features['retry']['present'] or self.o.retry_refilter:
            return

        # top the batch up to half-full with retries (integer arithmetic).
        qty = (self.o.batch // 2) - len(worklist.incoming)
        #logger.info('qty: %d len(worklist.incoming) %d' % (qty, len(worklist.incoming)))
        if qty <= 0:
            return

        mlist = self.download_retry.get(qty)
        #logger.debug("loading from %s: qty=%d ... got: %d" % (self.download_retry_name, qty, len(mlist)))
        if len(mlist) > 0:
            worklist.incoming.extend(mlist)
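
    # Worked example: with batch=100 and 40 freshly accepted messages,
    # qty = (100 // 2) - 40 = 10, so at most 10 retried messages are
    # appended and retries never fill more than half of a batch.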

    def after_work(self, worklist) -> None:
        """
        Messages in `worklist.failed` should be put in the download retry queue. If there are only a few new
        messages, get some from the post retry queue and put them into `worklist.ok`.
        """
        if not features['retry']['present']:
            return

        if len(worklist.failed) != 0:
            #logger.debug("putting %d messages into %s" % (len(worklist.failed), self.download_retry_name))
            self.download_retry.put(worklist.failed)
            worklist.failed = []

        # retry posting...
        qty = (self.o.batch // 2) - len(worklist.ok)
        if qty <= 0:
            return

        mlist = self.post_retry.get(qty)
        #logger.debug("loading from %s: qty=%d ... got: %d" % (self.post_retry_name, qty, len(mlist)))
        if len(mlist) > 0:
            worklist.ok.extend(mlist)

    def after_post(self, worklist) -> None:
        """
        Messages in `worklist.failed` should be put in the post retry queue.
        """
        if not features['retry']['present']:
            return

        self.post_retry.put(worklist.failed)
        worklist.failed = []

    def metricsReport(self) -> dict:
        """Returns the number of messages in the download_retry and post_retry queues.

        Returns:
            dict: containing metrics: ``{'msgs_in_download_retry': (int), 'msgs_in_post_retry': (int)}``
        """
        return {'msgs_in_download_retry': len(self.download_retry), 'msgs_in_post_retry': len(self.post_retry)}
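
    # sample return value (the counts are illustrative):
    #   {'msgs_in_download_retry': 3, 'msgs_in_post_retry': 0}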

    def on_cleanup(self) -> None:
        logger.debug('starting retry cleanup')
        if not hasattr(self, 'download_retry'):
            self.on_start()
        self.download_retry.cleanup()
        self.post_retry.cleanup()

    def on_housekeeping(self) -> None:
        logger.debug("on_housekeeping")
        self.download_retry.on_housekeeping()
        self.post_retry.on_housekeeping()

    def on_start(self) -> None:
        if self.o.retry_driver == 'redis':
            from sarracenia.redisqueue import RedisQueue
            self.download_retry = RedisQueue(self.o, 'work_retry')
            self.post_retry = RedisQueue(self.o, 'post_retry')
        else:
            from sarracenia.diskqueue import DiskQueue
            self.download_retry_name = 'work_retry_%02d' % self.o.no
            self.download_retry = DiskQueue(self.o, self.download_retry_name)
            self.post_retry_name = 'post_retry_%03d' % self.o.no
            self.post_retry = DiskQueue(self.o, self.post_retry_name)

    def on_stop(self) -> None:
        self.download_retry.close()
        self.post_retry.close()
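
# A minimal usage sketch of the queue interface this callback depends on,
# using only the calls that appear above. 'opts' stands in for a configured
# sarracenia options object and is hypothetical here:
#
#   from sarracenia.diskqueue import DiskQueue
#   q = DiskQueue(opts, 'work_retry_00')  # same naming scheme as on_start()
#   q.put(failed_messages)                # persist failures for a later pass
#   retries = q.get(10)                   # recover up to 10 queued messages
#   pending = len(q)                      # __len__ backs metricsReport()
#   q.close()                             # flush and release, as in on_stop()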