Source code for sarracenia.flowcb.scheduled

import logging
import paramiko
import re

import sarracenia
from sarracenia.filemetadata import FmdStat
from sarracenia.flowcb import FlowCB

import datetime
import json
import time


logger = logging.getLogger(__name__)

[docs] class Scheduled(FlowCB): """ Scheduled flow callback plugin arranges to post url's at scheduled times. usage: In the configuration file, need:: callback scheduled and the schedule can be a specified as: * scheduled_interval 1m (once a minute) a duration * scheduled_hour 4,9 at 4Z and 9Z every day. * scheduled_minute 33,45 within scheduled hours which minutes. Scheduled_interval takes precedence over the others, making it easier to specify an interval for testing/debugging purposes. use in code (for subclassing): from sarracenia.scheduled import Scheduled class hoho(Scheduled): replace the gather() routine... keep the top lines "until time to run" replace whatever is below. will only run when it should. """
[docs] def __init__(self,options,class_logger=logger): super().__init__(options,class_logger) # if logLevel is set for a subclass, apply it here too if hasattr(self.o,'logLevel') and logger: logger.setLevel(getattr(logging, self.o.logLevel.upper())) self.o.add_option( 'scheduled_interval', 'duration', 0 ) self.o.add_option( 'scheduled_hour', 'list', [] ) self.o.add_option( 'scheduled_minute', 'list', [] ) self.o.add_option( 'scheduled_time', 'list', [] ) self.housekeeping_needed=False self.sched_times = sum([ x.split(',') for x in self.o.scheduled_time],[]) #self.sched_times.sort() sched_hours = sum([ x.split(',') for x in self.o.scheduled_hour],[]) if sched_hours == [] : sched_hours = list(range(0,24)) self.hours = list(map( lambda x: int(x), sched_hours )) #self.hours.sort() logger.debug( f"hours {self.hours}" ) sched_min = sum([ x.split(',') for x in self.o.scheduled_minute ],[]) if sched_min == [] : sched_min = [0] self.minutes = list(map( lambda x: int(x), sched_min)) #self.minutes.sort() self.default_wait=datetime.timedelta(seconds=300) logger.debug( f'minutes: {self.minutes}') now=datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc) self.update_appointments(now) self.first_interval=True if self.o.scheduled_interval <= 0 and not self.appointments: logger.warning(f"no scheduled_interval or appointments (combination of scheduled_hour and scheduled_minute) set defaulting to every {self.default_wait.seconds} seconds") # Determine the next gather time # For scheduled_interval, gather immediately after starting if self.o.scheduled_interval and self.o.scheduled_interval > 0: self.next_gather_time = now else: self.next_gather_time = None self.calc_next_gather_time() self.last_gather_time = 0
[docs] def update_appointments(self,when): """ # make a flat list from values where comma separated on a single or multiple lines. set self.appointments to a list of when something needs to be run during the current day. """ self.appointments=[] if self.o.scheduled_minute or self.o.scheduled_hour: for h in self.hours: for m in self.minutes: if ( h > when.hour ) or ((h == when.hour) and ( m >= when.minute )): appointment = datetime.time(h, m, tzinfo=datetime.timezone.utc ) next_time = datetime.datetime.combine(when,appointment) self.appointments.append(next_time) else: pass # that time is passed for today. if self.o.scheduled_time: for time in self.sched_times: hour,minute=time.split(':') hour = int(hour) minute = int(minute) if ( hour > when.hour ) or ((hour == when.hour) and ( minute >= when.minute )): appointment = datetime.time(hour, minute, tzinfo=datetime.timezone.utc ) next_time = datetime.datetime.combine(when,appointment) self.appointments.append(next_time) else: pass # that time is passed for today. self.appointments.sort() logger.info( f"for {when}: {self.appointments_to_string()} ")
def appointments_to_string(self): return json.dumps(list(map( lambda x: str(x), self.appointments))) def calc_next_gather_time(self, last_gather=0): if self.next_gather_time in self.appointments: self.appointments.remove(self.next_gather_time) if last_gather == 0: last_gather = datetime.datetime.now(datetime.timezone.utc) # Scheduled interval overrides other options if self.o.scheduled_interval and self.o.scheduled_interval > 0: self.next_gather_time = last_gather + datetime.timedelta(seconds=self.o.scheduled_interval) logger.debug(f"next gather should be in {self.o.scheduled_interval}s, scheduled for {self.next_gather_time}") # No scheduled interval --> try to use configured schedule elif len(self.o.scheduled_hour) > 0 or len(self.o.scheduled_minute) > 0 or len(self.o.scheduled_time) > 0: next_appointment=None missed_appointments=[] for t in self.appointments: if last_gather < t: next_appointment=t break else: logger.info( f'already too late to {t} skipping' ) missed_appointments.append(t) if missed_appointments: for ma in missed_appointments: self.appointments.remove(ma) if next_appointment is None: # done for the day... tomorrow = datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)+datetime.timedelta(days=1) midnight = datetime.time(0,0,tzinfo=datetime.timezone.utc) midnight = datetime.datetime.combine(tomorrow,midnight) self.update_appointments(midnight) next_appointment=self.appointments[0] self.next_gather_time = next_appointment logger.debug(f"next gather scheduled for {self.next_gather_time} from appointments {self.appointments_to_string()}") # No scheduled interval and no scheduled hour/minutes/time else: self.next_gather_time = last_gather + self.default_wait logger.debug(f"next gather should be in {self.default_wait.seconds}s, scheduled for {self.next_gather_time} (default_wait") def ready_to_gather(self): current_time = datetime.datetime.now(datetime.timezone.utc ) if current_time >= self.next_gather_time and not self.stop_requested: late = (current_time - self.next_gather_time).total_seconds() logger.info(f"--> yes, now >= {self.next_gather_time} ({late}s late)") # NOTE: could also pass self.next_gather_time to calc_next_gather_time to get more precise intervals # See https://github.com/MetPX/sarracenia/issues/1214#issuecomment-2344711046 for discussion. self.calc_next_gather_time(current_time) self.last_gather_time = current_time return True else: logger.debug(f"--> no, next gather scheduled for {self.next_gather_time}") def gather(self, messageCountMax): if self.ready_to_gather(): # always post the same file at different time gathered_messages = [] for relPath in self.o.path: st = FmdStat() m = sarracenia.Message.fromFileInfo(relPath, self.o, st) gathered_messages.append(m) return (True, gathered_messages) else: logger.debug(f"nothing to do") return (False, []) def on_housekeeping(self): logger.info(f"next gather scheduled for {self.next_gather_time}") n_appointments = len(self.appointments) if n_appointments > 0: logger.info(f"{n_appointments} appointments remaining for today") logger.debug(f"remaining appointments: {self.appointments_to_string()}") self.housekeeping_needed = False
if __name__ == '__main__': import sarracenia.config import types import sarracenia.flow options = sarracenia.config.default_config() flow = sarracenia.flow.Flow(options) flow.o.scheduled_hour= [ '1','3','5',' 7',' 9',' 13','21','23'] flow.o.scheduled_minute= [ '1,3,5',' 7',' 9',' 13',' 15',' 51','53' ] logging.basicConfig(level=logging.DEBUG) when=datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc) me = Scheduled(flow.o) me.update_appointments(when) flow.o.scheduled_hour= [ '1' ] me = Scheduled(flow.o) me.update_appointments(when) """ for unit testing should be able to change when, and self.o.scheduled_x to cover many different test cases. """ while True: logger.info("hoho!") me.wait_until_next() logger.info("Do Something!")