import logging
import paramiko
import re
import sarracenia
from sarracenia.filemetadata import FmdStat
from sarracenia.flowcb import FlowCB
import datetime
import json
import time
logger = logging.getLogger(__name__)
class Scheduled(FlowCB):
    """
    Scheduled flow callback plugin arranges to post url's
    at scheduled times.

    usage:
        In the configuration file, need::

              callback scheduled

        and the schedule can be specified as:

        * scheduled_interval 1m     (once a minute) a duration
        * scheduled_hour 4,9        at 4Z and 9Z every day.
        * scheduled_minute 33,45    within the scheduled hours, at which minutes.

        scheduled_interval takes precedence over the others, making it
        easier to specify an interval for testing/debugging purposes.

        Appointments are built from every combination of scheduled_hour and
        scheduled_minute (UTC), so both are needed to get any. When neither
        an interval nor a usable hour/minute combination is configured, the
        plugin runs every default_wait (300) seconds.

    use in code (for subclassing):

        from sarracenia.scheduled import Scheduled

        class hoho(Scheduled):
            replace the gather() routine...
            keep the top lines "until time to run"
            replace whatever is below.
            will only run when it should.
    """

    def update_appointments(self, when):
        """
        Set self.appointments to the list of times something needs to be run
        during the day of *when*.

        Every combination of self.hours and self.minutes that is not earlier
        than when (compared at minute resolution) is kept, as a UTC datetime
        on the date of when. The result follows the order of self.hours and
        self.minutes, which __init__ sorts.
        """
        self.appointments = []
        for h in self.hours:
            for m in self.minutes:
                # tuple comparison: later hour, or same hour and minute not yet passed.
                if (h, m) >= (when.hour, when.minute):
                    appointment = datetime.time(h, m, tzinfo=datetime.timezone.utc)
                    self.appointments.append(datetime.datetime.combine(when, appointment))
                # otherwise that time is passed for today.

        logger.info(f"for {when}: {json.dumps([str(t) for t in self.appointments])} ")

    def __init__(self, options, logger=logger):
        """
        Declare the scheduling options, flatten the hour and minute lists
        (values may be comma separated on a single or multiple lines) and
        compute the appointments remaining for the current UTC day.
        """
        super().__init__(options, logger)
        self.o.add_option('scheduled_interval', 'duration', 0)
        self.o.add_option('scheduled_hour', 'list', [])
        self.o.add_option('scheduled_minute', 'list', [])

        self.housekeeping_needed = False

        # remaining sleep (timedelta) when a wait was cut short for housekeeping,
        # and the moment that remainder was recorded.
        self.interrupted = None
        self.interrupted_when = None

        self.hours = sorted(
            int(x) for entry in self.o.scheduled_hour for x in entry.split(','))
        logger.debug(f"hours {self.hours}")

        self.minutes = sorted(
            int(x) for entry in self.o.scheduled_minute for x in entry.split(','))
        self.default_wait = 300
        logger.debug(f'minutes: {self.minutes}')

        now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)
        self.update_appointments(now)
        self.first_interval = True

        # Test the configured hours/minutes rather than today's remaining
        # appointments: an empty list late in the day does not mean that no
        # schedule was configured.
        if self.o.scheduled_interval <= 0 and not (self.hours and self.minutes):
            logger.info( f"no scheduled_interval or appointments (combination of scheduled_hour and scheduled_minute) set defaulting to every {self.default_wait} seconds" )

    def gather(self, messageCountMax):
        """
        Wait for the next scheduled time, then build one message per entry
        of self.o.path.

        messageCountMax is not used by this implementation.

        Returns (False, []) when the wait ended because a stop was requested
        or housekeeping is needed, otherwise (True, list of messages).
        """
        # for next expected post
        self.wait_until_next()

        if self.stop_requested or self.housekeeping_needed:
            return (False, [])

        logger.info('time to run')

        # always post the same file at different time
        gathered_messages = []
        for relPath in self.o.path:
            st = FmdStat()
            m = sarracenia.Message.fromFileInfo(relPath, self.o, st)
            gathered_messages.append(m)
        return (True, gathered_messages)

    def on_housekeeping(self):
        """
        Clear the flag raised by wait_seconds() when it cut a wait short.
        """
        self.housekeeping_needed = False

    def wait_seconds(self, sleepfor):
        """
        sleep for the given duration, like time.sleep() but broken into
        shorter naps to be able to honour stop_requested, or when housekeeping is needed.

        sleepfor is a datetime.timedelta, or a plain number of seconds.

        When a previous call was interrupted for housekeeping, sleepfor is
        ignored: the saved remainder, less the time elapsed since it was
        recorded, is slept instead.

        On return, one of the following holds:

        * the interval completed: self.interrupted is None.
        * housekeeping is due: self.housekeeping_needed is True and
          self.interrupted holds the (positive) time still to sleep.
        * self.stop_requested was seen: the call returns immediately.
        """
        # accept plain seconds (e.g. self.default_wait) as well as timedelta.
        if not isinstance(sleepfor, datetime.timedelta):
            sleepfor = datetime.timedelta(seconds=sleepfor)

        zero = datetime.timedelta(seconds=0)
        housekeeping = datetime.timedelta(seconds=self.o.housekeeping)
        longest_nap = datetime.timedelta(seconds=10)

        if self.interrupted:
            now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)
            # update sleep remaining based on how long other processing took.
            sleepfor = self.interrupted - (now - self.interrupted_when)

        sleptfor = zero
        while sleepfor > zero:
            # never nap past the end of the interval.
            nap = min(longest_nap, sleepfor)
            time.sleep(nap.total_seconds())
            if self.stop_requested:
                return

            # how long is left to sleep.
            sleepfor -= nap
            sleptfor += nap

            # Completion takes precedence over housekeeping, so that a wait
            # that has fully elapsed is never reported as interrupted.
            if sleepfor <= zero:
                break

            self.interrupted = sleepfor
            self.interrupted_when = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)

            if sleptfor > housekeeping:
                self.housekeeping_needed = True
                return

        # got to the end of the interval...
        self.interrupted = None

    def wait_until(self, appointment):
        """
        Sleep, via wait_seconds(), until the given timezone-aware datetime.
        """
        now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)
        sleepfor = appointment - now
        logger.info( f"appointment at: {appointment}, need to wait: {sleepfor})" )
        self.wait_seconds(sleepfor)

    def wait_until_next(self):
        """
        Block until the next time something should run.

        * scheduled_interval > 0: the first call returns at once, later calls
          wait one interval.
        * hours and minutes configured: wait for the next appointment,
          rolling over to tomorrow's list once today's is exhausted.
        * otherwise: the first call returns at once, later calls wait
          self.default_wait seconds.

        The wait may end early for a stop request or for housekeeping; callers
        check self.stop_requested and self.housekeeping_needed afterwards.
        """
        if self.o.scheduled_interval > 0:
            if self.first_interval:
                self.first_interval = False
                return

            self.wait_seconds(datetime.timedelta(seconds=self.o.scheduled_interval))
            return

        # Appointments exist only when both lists are populated; with just one
        # of them there is nothing to wait for, so use the default wait below.
        if self.hours and self.minutes:
            now = datetime.datetime.fromtimestamp(time.time(), datetime.timezone.utc)
            next_appointment = None
            missed = 0
            for t in self.appointments:
                if now < t:
                    next_appointment = t
                    break
                logger.info( f'already too late to {t} skipping' )
                missed += 1

            # the missed appointments are exactly the leading entries scanned above.
            del self.appointments[:missed]

            if next_appointment is None:
                # done for the day...
                tomorrow = now + datetime.timedelta(days=1)
                midnight = datetime.datetime.combine(
                    tomorrow, datetime.time(0, 0, tzinfo=datetime.timezone.utc))
                self.update_appointments(midnight)
                next_appointment = self.appointments[0]

            self.wait_until(next_appointment)
            if self.interrupted:
                logger.info("sleep interrupted, returning for housekeeping.")
            else:
                self.appointments.remove(next_appointment)
                logger.info( f"ok {len(self.appointments)} appointments left today" )
            return

        # default wait...
        if self.first_interval:
            self.first_interval = False
            return
        self.wait_seconds(self.default_wait)
if __name__ == '__main__':
    # Manual smoke test: build the plugin against a default configuration and
    # then keep waiting for appointments forever.
    import sarracenia.config
    import types
    import sarracenia.flow

    options = sarracenia.config.default_config()
    flow = sarracenia.flow.Flow(options)
    settings = flow.o

    settings.scheduled_minute = ['1,3,5', ' 7', ' 9', ' 13', ' 15', ' 51', '53']
    hour_settings = (
        ['1', '3', '5', ' 7', ' 9', ' 13', '21', '23'],
        ['1'],
    )

    # Exercise the constructor with a full hour list, then with a single
    # hour; the instance built last is the one driven by the loop below.
    me = None
    when = None
    for index, hour_list in enumerate(hour_settings):
        settings.scheduled_hour = hour_list
        if index == 0:
            logging.basicConfig(level=logging.DEBUG)
            when = datetime.datetime.now(datetime.timezone.utc)
        me = Scheduled(settings)
        me.update_appointments(when)

    # for unit testing should be able to change when, and self.o.scheduled_x to cover
    # many different test cases.
    while True:
        logger.info("hoho!")
        me.wait_until_next()
        logger.info("Do Something!")