# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, Environment Canada, 2008-2015
#
# more info: https://github.com/MetPX/sarracenia
#
# Code originally contributed by:
# Michel Grenier - Shared Services Canada
# first shot : Wed Jan 10 16:06:16 UTC 2018
# re-factored beyond recognition by PSilva 2021. Don't blame Michel
#
from _codecs import decode, encode
import jsonpickle, os, os.path, sarracenia, sys, time
import logging
# class sarra/retry
logger = logging.getLogger(__name__)
class DiskQueue():
"""
Process Persistent Queue...
Persist messages to a file so that processing can be attempted again later.
For safety reasons, want to be writing to a file ASAP.
For performance reasons, all those writes need to be Appends.
so continuous, but append-only io... with an occasional housekeeping cycle.
to resolve them
not clear if we need multi-task safety... just one task writes to each queue.
retry_ttl how long
self.retry_cache
* a dictionary indexed by some sort of key to prevent duplicate messages being stored in it.
retry_path = ~/.cache/sr3/<component>/<config>/diskqueue_<name>
with various suffixes:
.new -- messages added to the retry list are appended to this file.
whenever a message is added to the retry_cache, it is appended to a
cumulative list of entries to add to the retry list.
every housekeeping interval, the two files are consolidated.
note that the *ack_id* of messages retreived from the retry list, is
removed. Files must be acked around the time they are placed on the
retry_list, as reception from the source should have already been acknowledged.
FIXME: would be fun to look at performance of this thing and compare it to
python persistent queue. the differences:
This class does no locking (presumed single threading.)
could add locks... and they would be coarser grained than stuff in persistentqueue
this should be faster than persistent queue, but who knows what magic they did.
This class doesn't implement in-memory queue... it is entirely on disk...
saves memory, optimal for large queues.
probably good, since retries should be slow...
not sure what will run better.
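
A minimal usage sketch (hypothetical; the ``options`` object and the paths below
are made up for illustration -- any object carrying the attributes this class
reads, such as the one sarracenia builds from a configuration, will do)::

    import types
    import sarracenia.diskqueue

    options = types.SimpleNamespace(
        logLevel='info',                              # used to set this module's log level
        retry_ttl=3600,                               # entries older than an hour are dropped
        pid_filename='/tmp/sr3_example/example.pid')  # queue files are created beside the pid file

    dq = sarracenia.diskqueue.DiskQueue(options, 'work_retry')

    dq.put([msg])          # msg: a sarracenia message (dictionary), appended to the .new file
    dq.on_housekeeping()   # consolidate .new into the main queue file
    again = dq.get(10)     # retrieve up to 10 messages to attempt again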
"""
def __init__(self, options, name):
logger.debug(" %s __init__" % name)
self.o = options
self.name = name
if not hasattr(self.o, 'retry_ttl'):
self.o.retry_ttl = None
#logging.basicConfig(format=self.o.logFormat,
# level=getattr(logging, self.o.logLevel.upper()))
logger.setLevel(getattr(logging, self.o.logLevel.upper()))
logger.debug('name=%s logLevel=%s' % (self.name, self.o.logLevel))
# initialize the retry queue paths under the component's state directory (beside the pid file)
self.working_dir = os.path.dirname(self.o.pid_filename)
if not os.path.isdir(self.working_dir):
os.makedirs(self.working_dir)
self.queue_file = self.working_dir + os.sep + 'diskqueue_' + name
self.now = sarracenia.nowflt()
# retry messages
self.queue_fp = None
# newer retries
self.new_path = self.queue_file + '.new'
self.new_fp = None
# working file at housekeeping
self.housekeeping_path = self.queue_file + '.hk'
self.housekeeping_fp = None
# initialize ages and message counts
self.msg_count = 0
self.msg_count_new = 0
if not os.path.isfile(self.queue_file):
return
retry_age = os.path.getmtime(self.queue_file)
self.msg_count = self._count_msgs(self.queue_file)
if os.path.isfile(self.new_path):
new_age = os.path.getmtime(self.new_path)
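# a .new file older than the consolidated queue file was presumably already
# folded in by a previous housekeeping pass, so discard it rather than count it twice.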
if retry_age > new_age:
os.unlink(self.new_path)
else:
self.msg_count_new = self._count_msgs(self.new_path)
def put(self, message_list):
"""
add messages to the end of the queue.
"""
if self.new_fp is None:
self.new_fp = open(self.new_path, 'a')
for message in message_list:
logger.debug("DEBUG add to new file %s %s" %
(os.path.basename(self.new_path), message))
self.new_fp.write(self.msgToJSON(message))
self.msg_count_new += 1
self.new_fp.flush()
def cleanup(self):
"""
remove statefiles.
"""
if os.path.exists(self.queue_file):
os.unlink(self.queue_file)
self.msg_count = 0
def close(self):
"""
clean shutdown.
"""
try:
self.housekeeping_fp.close()
except:
pass
try:
os.fsync(self.new_fp)
self.new_fp.close()
except:
pass
try:
self.queue_fp.close()
except:
pass
self.housekeeping_fp = None
self.new_fp = None
self.queue_fp = None
self.msg_count = 0
self.msg_count_new = 0
def _count_msgs(self, file_path) -> int:
"""Count the number of messages (lines) in the queue file. This should be used only when opening an existing
file, because :func:`~sarracenia.diskqueue.DiskQueue.get` does not remove messages from the file.
Args:
file_path (str): path to the file to be counted.
Returns:
int: count of messages in file, -1 if the file could not be read.
"""
count = -1
if os.path.isfile(file_path):
count = 0
with open(file_path, mode='r') as f:
for line in f:
if "{" in line:
count += 1
return count
def __len__(self) -> int:
"""Returns the total number of messages in the DiskQueue.
Number of messages in the DiskQueue does not necessarily equal the number of messages available to ``get``.
Messages in the .new file are counted, but can't be retrieved until
:func:`~sarracenia.diskqueue.DiskQueue.on_housekeeping` has been run.
Returns:
int: number of messages in the DiskQueue.
"""
return self.msg_count + self.msg_count_new
def msgFromJSON(self, line):
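"""
decode one line of json into a message (dictionary). Returns None if the line is corrupt.
"""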
try:
msg = jsonpickle.decode(line)
except ValueError:
logger.error("corrupted line in retry file: %s " % line)
logger.debug("Error information: ", exc_info=True)
return None
return msg
def msgToJSON(self, message):
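"""
encode a message as a single, newline-terminated line of json.
"""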
return jsonpickle.encode(message) + '\n'
def get(self, maximum_messages_to_get=1):
"""
qty number of messages to retrieve from the queue.
"""
ml = []
count = 0
while count < maximum_messages_to_get:
self.queue_fp, message = self.msg_get_from_file(
self.queue_fp, self.queue_file)
# FIXME MG as discussed with Peter
# no housekeeping in get ...
# if no message (and new or state file there)
# we wait for housekeeping to present retry messages
if not message:
try:
os.unlink(self.queue_file)
except:
pass
self.queue_fp = None
#logger.debug("MG DEBUG retry get return None")
break
if self.is_expired(message):
#logger.error("MG invalid %s" % message)
continue
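# messages are acked around the time they are placed on the retry list (see the class
# docstring), so drop the stale ack_id before handing the message back for another attempt.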
if 'ack_id' in message:
del message['ack_id']
message['_deleteOnPost'].remove('ack_id')
ml.append(message)
count += 1
self.msg_count -= count
return ml
def in_cache(self, message) -> bool:
"""
return whether the entry is message is in the cache or not.
side effect: adds it.
"""
urlstr = message['baseUrl'] + '/' + message['relPath']
if 'noDupe' in message:
sumstr = jsonpickle.encode(message['noDupe']['key'])
elif 'fileOp' in message:
sumstr = jsonpickle.encode(message['fileOp'])
elif 'identity' in message:
sumstr = jsonpickle.encode(message['identity'])
elif 'pubTime' in message:
sumstr = jsonpickle.encode(message['pubTime'])
else:
logger.info('no key found for message, cannot add')
return False
cache_key = urlstr + ' ' + sumstr
if 'parts' in message:
cache_key += ' ' + message['parts']
if cache_key in self.retry_cache: return True
self.retry_cache[cache_key] = True
return False
def is_expired(self, message) -> bool:
"""
return is the given message expired ?
"""
# no expiry
if self.o.retry_ttl is None: return False
if self.o.retry_ttl <= 0: return False
# compute message age
msg_time = sarracenia.timestr2flt(message['pubTime'])
msg_age = self.now - msg_time
# expired ?
return msg_age > self.o.retry_ttl
def needs_requeuing(self, message) -> bool:
"""
return
* True if message is not expired, and not already in queue.
* False otherwise.
"""
if self.in_cache(message):
logger.info("discarding duplicate message (in %s cache) %s" %
(self.name, message))
return False
# log is info... it is good to log a retry message that expires
if self.is_expired(message):
logger.info("discarding expired message in (%s): %s" %
(self.name, message))
return False
return True
def msg_get_from_file(self, fp, path):
"""
read a message from the state file.
"""
if fp is None:
if not os.path.isfile(path): return None, None
logger.debug("DEBUG %s open read" % path)
fp = open(path, 'r')
line = fp.readline()
if not line:
try:
fp.close()
except:
pass
return None, None
msg = self.msgFromJSON(line)
# a corrupted line : go to the next
if msg is None: return self.msg_get_from_file(fp, path)
return fp, msg
def on_housekeeping(self):
"""
read rest of queue_file (from current point of unretried ones.)
- check if message is duplicate or expired.
- write to .hk
read .new file,
- check if message is duplicate or expired.
- writing to .hk (housekeeping)
remove .new
rename housekeeping to queue for next period.
"""
logger.debug("%s on_housekeeping" % self.name)
# finish processing the current retry list before reshuffling the retry entries
if os.path.isfile(self.queue_file) and self.queue_fp != None:
logger.info(
"have not finished retry list. Resuming retries with %s" %
self.queue_file)
return
self.now = sarracenia.nowflt()
self.retry_cache = {}
N = 0
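# N counts the messages kept (written to the housekeeping file)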
# put this in try/except in case ctrl-c breaks something
try:
self.close()
try:
os.unlink(self.housekeeping_path)
except:
pass
fp = open(self.housekeeping_path, 'w')
fp.close()
i = 0
last = None
fp = self.queue_fp
self.housekeeping_fp = open(self.housekeeping_path, 'a')
logger.debug("has queue %s" %
os.path.isfile(self.queue_file))
# copy the remainder of the retry file to the housekeeping file
while True:
fp, message = self.msg_get_from_file(fp, self.queue_file)
if not message: break
i = i + 1
if not self.needs_requeuing(message): continue
self.housekeeping_fp.write(self.msgToJSON(message))
N = N + 1
try:
fp.close()
except:
pass
i = 0
j = N
fp = None
# append new to housekeeping.
while True:
fp, message = self.msg_get_from_file(fp, self.new_path)
if not message: break
i = i + 1
logger.debug("DEBUG message %s" % message)
if not self.needs_requeuing(message): continue
#logger.debug("MG DEBUG flush retry to state %s" % message)
self.housekeeping_fp.write(self.msgToJSON(message))
N = N + 1
try:
fp.close()
except:
pass
logger.debug("retrieved %d from the %d retry" %
(N - j, i))
self.housekeeping_fp.close()
except Exception:
logger.error("something went wrong consolidating the retry queue")
logger.debug('Exception details: ', exc_info=True)
# no more retry
self.msg_count = N
if N == 0:
logger.debug("%s No retry in list" % self.name)
try:
os.unlink(self.housekeeping_path)
except:
pass
# housekeeping file becomes new retry
else:
logger.info("%s Number of messages in retry list %d" % (self.name, N))
try:
os.rename(self.housekeeping_path, self.queue_file)
except:
logger.error("Something went wrong with rename")
# cleanup
self.msg_count_new = 0
try:
os.unlink(self.new_path)
except:
pass
elapse = sarracenia.nowflt() - self.now
logger.debug("on_housekeeping elapse %f" % elapse)