# Source code for sarracenia.flowcb.v2wrapper

from base64 import b64decode, b64encode
from codecs import decode, encode

import copy
import logging
from hashlib import md5
from hashlib import sha512
import os
import sarracenia.config
import time
import types
import urllib

import sarracenia
from sarracenia.flowcb import FlowCB

from sarracenia import nowflt, timestr2flt, timev2tov3str

logger = logging.getLogger(__name__)

# Maps each v3 identity method name to the one-character flag used as the
# first field of a v2 'sum' header.
sum_algo_v3tov2 = dict(
    arbitrary="a",
    md5="d",
    sha512="s",
    md5name="n",
    random="0",
    link="L",
    remove="R",
    cod="z",
)

# Inverse table: v2 one-character flag -> v3 identity method name.
sum_algo_v2tov3 = dict(
    (flag, method) for method, flag in sum_algo_v3tov2.items()
)

def sumstrFromMessage(msg) -> str:
    """
    Return the v2 'sum' header string corresponding to a v3 message.

    The result has the form '<flag>,<value>' where <flag> is a single
    character identifying the checksum algorithm (or file operation).

    msg: a v3 message (dictionary). Fields read: 'identity' (with 'method'
         and 'value'), 'relPath' and 'fileOp', each only when needed.

    Side effect: when msg['fileOp'] contains 'rename', msg['oldname'] is set
    to the rename value.

    Raises KeyError when a field that is needed is missing from msg (for
    example 'relPath' when an md5name sum must be computed), or when a 'cod'
    identity names a method absent from sum_algo_v3tov2.
    """

    def basename_bytes():
        # computed lazily so that messages which never need the file name
        # are not required to carry a 'relPath'.
        return bytes(os.path.basename(msg['relPath']), 'utf-8')

    if 'identity' in msg:
        method = msg['identity']['method']
        if method in sum_algo_v3tov2:
            sa = sum_algo_v3tov2[method]
            # transform sum value
            if sa in ('0', 'a'):
                # random / arbitrary: value is passed through untouched.
                sv = msg['identity']['value']
            elif sa == 'z':
                # cod: the value is itself a v3 method name, translate it.
                sv = sum_algo_v3tov2[msg['identity']['value']]
            else:
                # v3 carries the digest base64 encoded, v2 wants it in hex.
                sv = encode(
                    decode(msg['identity']['value'].encode('utf-8'), 'base64'),
                    'hex').decode('utf-8')
        else:
            # FIXME ... 1st md5name case... default when unknown...
            logger.error(
                'identity method unknown to v2: %s, replacing with md5name' %
                method)
            sa = 'n'
            # keep the md5name digest: the value supplied for an unknown
            # method must not be run through the base64 -> hex transform.
            sv = md5(basename_bytes()).hexdigest()
        sumstr = sa + ',' + sv
    else:
        # FIXME ... 2nd md5name case.
        sumstr = 'n,%s' % md5(basename_bytes()).hexdigest()

    if 'fileOp' in msg:
        file_op = msg['fileOp']
        if 'rename' in file_op:
            msg['oldname'] = file_op['rename']

        if 'link' in file_op:
            digest = sha512(bytes(file_op['link'], encoding='utf-8'))
            sumstr = 'L,%s' % digest.hexdigest()
        elif 'directory' in file_op:
            # must be tested before plain 'remove': a directory removal
            # carries both keys and maps to 'r', a directory creation to 'm'.
            flag = 'r' if 'remove' in file_op else 'm'
            sumstr = '%s,%s' % (flag, sha512(basename_bytes()).hexdigest())
        elif 'remove' in file_op:
            sumstr = 'R,%s' % sha512(basename_bytes()).hexdigest()
        elif 'rename' not in file_op:
            # a rename-only fileOp was already handled above, so it is not
            # reported as unknown.
            logger.error('unknown fileOp: %s' % msg['fileOp'])
    return sumstr
class Message:
    """
    In-memory representation of a message as expected by v2 plugins.

    In v3 a message is just a dictionary; in v2 it is an object with
    attributes (pubtime, baseurl, relpath, sumstr, headers, ...).
    """

    def __init__(self, h):
        """
        Build the v2 style message object from the v3 message dictionary h.

        Assigns everything, except topic... because the topic is stored
        outside the body in v02.

        NOTE: h is modified in place ('mtime', 'atime', 'size', 'parts',
        'topic', 'sum' and 'oldname' may be rewritten or added) and is kept,
        not copied, as self.headers.

        Raises KeyError when 'pubTime', 'baseUrl' or 'relPath' is missing.
        """
        # bare "import urllib" does not guarantee the parse submodule is
        # loaded, so import it explicitly where it is used.
        import urllib.parse

        self.pubtime = h['pubTime'].replace("T", "")
        self.baseurl = h['baseUrl']
        self.relpath = h['relPath']

        if 'new_dir' in h:
            self.new_dir = h['new_dir']
            self.new_file = h['new_file']
        if 'new_relPath' in h:
            self.new_relpath = h['new_relPath']

        self.urlstr = self.baseurl + self.relpath
        self.url = urllib.parse.urlparse(self.urlstr)

        escaped_relpath = h["relPath"].replace(' ', '%20').replace('#', '%23')
        self.notice = self.pubtime + ' ' + h["baseUrl"] + ' ' + escaped_relpath

        #FIXME: ensure headers are < 255 chars.
        for k in ['mtime', 'atime']:
            if k in h:
                h[k] = h[k].replace("T", "")

        #FIXME: sum header encoding.
        if 'size' in h:
            if isinstance(h['size'], str):
                h['size'] = int(h['size'])
            h['parts'] = '1,%d,1,0,0' % h['size']

        # when present, 'blocks' takes precedence over the 'size' based value.
        if 'blocks' in h:
            h['parts'] = self._blocks_to_parts(h['blocks'])

        h['topic'] = ['v02', 'post'] + self.relpath.split('/')[0:-1]

        # partstr is deliberately left unset when there is no parts header;
        # users test for it with hasattr.
        if 'parts' in h:
            self.partstr = h['parts']

        self.sumstr = sumstrFromMessage(h)
        self.sumflg = self.sumstr[0]
        h['sum'] = self.sumstr

        if 'fileOp' in h and 'rename' in h['fileOp']:
            h['oldname'] = h['fileOp']['rename']

        self.headers = h
        self.hdrstr = str(h)
        self.isRetry = False

        # from sr_message/sr_new ...
        self.local_offset = 0
        self.in_partfile = False
        self.local_checksum = None

        self.target_file = None
        # does not cover partitioned files.

    @staticmethod
    def _blocks_to_parts(blocks):
        """
        Translate a v3 'blocks' header into a v2 'parts' string of the form
        'method,block size,block count,remainder,block number'.

        A missing (or empty) 'manifest' yields a block count and remainder
        of 0 instead of raising.
        """
        method = 'i' if blocks['method'] == 'inplace' else 'p'
        manifest = blocks.get('manifest', {})
        if 'number' in blocks:
            number = blocks['number']
            remainder = manifest[number]['size']
        else:
            number = 0
            # without a block number, use the size of the last manifest entry.
            remainder = manifest[len(manifest) - 1]['size'] if manifest else 0
        return '%s,%d,%d,%d,%d' % (method, blocks['size'], len(manifest),
                                   remainder, number)

    def set_hdrstr(self):
        """Placeholder kept for v2 plugins that call it; only logs."""
        logger.info("set_hdrstr not implemented")

    def get_elapse(self):
        """Return the difference between the current time and the
        publication time, as computed by nowflt() and timestr2flt()."""
        return nowflt() - timestr2flt(self.pubtime)

    def set_parts(self):
        """Placeholder kept for v2 plugins that call it; only logs."""
        logger.info("set_parts not implemented")
class V2Wrapper(FlowCB):
    """
    Flow callback that loads v02 plugins and runs them at the matching
    v3 entry points (after_accept, after_work, on_start, ...).
    """

    def __init__(self, o):
        """
        A wrapper class to run v02 plugins.
        use run_entry(entry_point, message) to run the plugins registered
        for an entry point such as 'on_message', and add(entry_point, path)
        to register one.

        weird v2 stuff:
        when calling init, self is a config/subscriber...
        when calling on_message, self is a message...
        that is kind of blown away for each message...
        parent is the config/subscriber in both cases.
        so v2 state variables are always stored in parent.

        o: the options object; it is stored as self.o and is what gets
           passed to the v2 plugins as their 'parent'.
        """
        logging.basicConfig(format=o.logFormat,
                            level=getattr(logging, o.logLevel.upper()))
        logger.setLevel(getattr(logging, o.logLevel.upper()))

        # FIXME, insert parent fields for v2 plugins to use here.
        self.logger = logger

        # names of attributes that are mirrored between this wrapper, the
        # options object and each v2 message object.
        self.state_vars = []

        hostdir = o.hostdir if o.statehost else None
        self.user_cache_dir = sarracenia.config.get_user_cache_dir(hostdir)

        self.instance = o.no if hasattr(o, 'no') else 0

        self.o = o

        # entry point name -> list of callables to invoke.
        self.v2plugins = {}
        self.consumer = types.SimpleNamespace()
        self.consumer.sleep_min = 0.01

        for ep in sarracenia.config.Config.v2entry_points:
            self.v2plugins[ep] = []

        unsupported_v2_events = ['do_download', 'do_get', 'do_put', 'do_send']
        for e in o.v2plugins:
            for v in o.v2plugins[e]:
                if e in unsupported_v2_events:
                    logger.error(
                        'v2 plugin conversion required, %s too different in v3'
                        % e)
                    continue
                self.add(e, v)

        # backward compat...
        self.o.user_cache_dir = self.o.cfg_run_dir
        self.o.instance = self.instance
        self.o.logger = self.logger
        if hasattr(self.o, 'post_baseDir'):
            self.o.post_base_dir = self.o.post_baseDir

    def declare_option(self, option):
        """
        Called by v2 plugins to declare an option they use.

        The option is recorded as a state variable and registered with
        self.o.add_option(). If the option has a value that is not a list,
        the value is wrapped in a one element list.
        """
        logger.info('v2plugin option: %s declared' % option)

        self.state_vars.append(option)

        self.o.add_option(option)
        if not hasattr(self.o, option):
            logger.info('value of %s not set' % option)
            return

        current = getattr(self.o, option)
        if type(current) is not list:
            setattr(self.o, option, [current])

    def add(self, opname, path):
        """
        Load the v2 plugin script found at path and register what it
        provides for entry point opname.

        opname: an entry point name such as 'on_message', or 'plugin' for a
                script that names a class (via self.plugin) whose methods
                cover several entry points.
        path:   plugin name or path, resolved with
                sarracenia.config.config_path(); 'None', 'none' or 'off'
                resets self.<opname>_list instead of loading anything.

        Returns True on success, False when the script is not found, fails
        to execute, or does not set what it was expected to set.

        NOTE(review): the plugin script is executed with exec(); scripts are
        trusted code by design, do not point this at untrusted input.
        """
        setattr(self, opname, None)

        if path in ('None', 'none', 'off'):
            logger.info("Reset plugin %s to None" % opname)
            # NOTE(review): only self.<opname>_list is reset here, the
            # callables already in self.v2plugins[opname] are left alone.
            setattr(self, opname + '_list', [])
            return True

        ok, script = sarracenia.config.config_path('plugins',
                                                   path,
                                                   mandatory=True,
                                                   ctype='py')
        if not ok:
            logger.error("installing %s %s failed: not found " %
                         (opname, path))
            return False

        c1 = set(vars(self))

        if opname == 'plugin':
            # forget the class name left by a previously loaded plugin, so a
            # script that fails to set it is detected rather than silently
            # re-instantiating the previous class.
            self.v2plugin = None

        # Explicit namespace for the script: relying on the implicit function
        # locals to carry names from one exec()/eval() call to the next does
        # not work with the locals() semantics of python >= 3.13.
        namespace = {'self': self}
        try:
            with open(script) as f:
                source = f.read().replace('self.plugin', 'self.v2plugin')
            exec(compile(source, script, 'exec'), globals(), namespace)
        except Exception:
            logger.error(
                "sr_config/execfile 2 failed for option '%s' and plugin '%s'" %
                (opname, path))
            logger.debug('Exception details: ', exc_info=True)
            return False

        if opname == 'plugin':
            if getattr(self, 'v2plugin', None) is None:
                logger.error("%s plugin %s incorrect: does not set self.%s" %
                             ('v2plugin', path, 'v2plugin'))
                return False

            # pci plugin-class-instance... parent is self (a v2wrapper)
            plugin_class = eval(self.v2plugin, globals(), namespace)
            pci = plugin_class(self)
            pcv = vars(plugin_class)
            for when in sarracenia.config.Config.v2entry_points:
                # only entry points defined directly on the plugin class.
                if when in pcv:
                    # everything is done with the lists, individual
                    # self.on_xxx attributes are deliberately not set.
                    self.v2plugins[when].append(getattr(pci, when))
        else:
            if getattr(self, opname) is None:
                logger.error("%s plugin %s incorrect: does not set self.%s" %
                             (opname, path, opname))
                return False

            self.v2plugins[opname].append(getattr(self, opname))

        # attributes the script added to self become state variables...
        c2 = set(vars(self))
        c12diff = list(c2 - c1)
        if len(c12diff) > 0:
            self.state_vars.extend(c12diff)

        # ... and all state variables are propagated to the options object.
        for opt in self.state_vars:
            if hasattr(self, opt):
                setattr(self.o, opt, getattr(self, opt))

        return True

    def after_work(self, worklist):
        """
        Run the on_file, then the on_post v2 plugins on worklist.ok.

        Messages for which on_post returns a false value are moved to
        worklist.rejected. Messages for which on_file returns a false value
        are removed from worklist.ok without being added to any other list.
        """
        ok_to_post = []
        for m in worklist.ok:
            if self.run_entry('on_file', m):
                ok_to_post.append(m)
            # FIXME: what should we do on failure of on_file plugin?
            #        download worked, but on_file failed... hmm...
            #        (the message is currently just dropped.)

        worklist.ok = ok_to_post

        outgoing = []
        for m in worklist.ok:
            if self.run_entry('on_post', m):
                outgoing.append(m)
            else:
                worklist.rejected.append(m)
        # set incoming for future steps.
        worklist.ok = outgoing

    def after_accept(self, worklist):
        """
        Run the on_message v2 plugins on worklist.incoming.

        Messages for which the plugins return a false value, or raise, are
        moved to worklist.rejected.
        """
        outgoing = []
        for m in worklist.incoming:
            try:
                if self.run_entry('on_message', m):
                    outgoing.append(m)
                else:
                    worklist.rejected.append(m)
            except Exception as Ex:
                logger.error(f"plugin {m} died: {Ex}")
                logger.debug('details: ', exc_info=True)
                worklist.rejected.append(m)
        # set incoming for future steps.
        worklist.incoming = outgoing

    def on_time(self, time):
        """
        Run the plugins registered for a timed (message-less) entry point.

        time: entry point name, e.g. 'on_start', 'on_stop', 'on_heartbeat'.
              (the parameter name hides the time module inside this method.)
        """
        logger.info('v2 run %s' % time)
        for plugin in self.v2plugins[time]:
            plugin(self.o)

    def on_housekeeping(self):
        """v3 housekeeping runs the v2 on_heartbeat plugins."""
        self.on_time('on_heartbeat')

    def on_start(self):
        """Run the v2 on_start plugins."""
        self.on_time('on_start')

    def on_stop(self):
        """Run the v2 on_stop plugins."""
        self.on_time('on_stop')

    def restoreMsg(self, m, v2msg):
        """
        Copy changes made by v2 plugins on the v2 message object back into
        the v3 message dictionary.

        m:     the v3 message (dictionary), modified in place.
        v2msg: the Message object that was handed to the plugins.

        Fields that are absent on either side are treated as "different"
        instead of raising KeyError/AttributeError.
        """
        # Message() replaced the topic with a v02 one, put the v3 prefix back.
        if 'topic' in m:
            if m['topic'][0:2] == ['v02', 'post']:
                m['topic'] = self.o.post_topicPrefix + m['topic'][2:]

        if 'link' in v2msg.headers:
            if 'fileOp' not in m:
                m['fileOp'] = {}
            if m['fileOp'].get('link') != v2msg.headers['link']:
                m['fileOp']['link'] = v2msg.headers['link']

        for h in ['oldname', 'newname']:
            if (h in v2msg.headers) and ((h not in m) or
                                         (v2msg.headers[h] != m[h])):
                m[h] = v2msg.headers[h]

        if hasattr(v2msg, 'new_dir') and v2msg.new_dir != m.get('new_dir'):
            m['new_dir'] = v2msg.new_dir
            new_file = getattr(v2msg, 'new_file', m.get('new_file'))
            if new_file is not None:
                relpath = m['new_dir'] + '/' + new_file
                post_base_dir = getattr(self.o, 'post_baseDir', None)
                if post_base_dir:
                    relpath = relpath.replace(post_base_dir, '', 1)
                m['new_relPath'] = relpath

        if v2msg.baseurl != m['baseUrl']:
            m['baseUrl'] = v2msg.baseurl

        if hasattr(v2msg, 'new_file'):
            if ('new_file' not in m) or (m['new_file'] != v2msg.new_file):
                m['new_file'] = v2msg.new_file

        # NOTE(review): compared against 'new_baseDir' but stored under
        # 'post_baseDir', as in the original code -- confirm this is intended.
        if hasattr(v2msg, 'post_base_dir') and (v2msg.post_base_dir !=
                                                m.get('new_baseDir')):
            m['post_baseDir'] = v2msg.post_base_dir

    def run_entry(self, ep, m):
        """
        Run the v2 plugins registered for entry point ep on message m.

        A v2 Message object is built from m and published to the plugins as
        self.o.msg. The plugins are called in order with self.o as their
        parent, stopping at the first one that returns a false value.
        Changes are then copied back into m by restoreMsg().

        Returns the value returned by the last plugin called, or True when
        no plugin is registered for ep.
        """
        self.msg = Message(m)
        self.msg.topic = '.'.join(self.o.topicPrefix + m['subtopic'])
        self.o.msg = self.msg
        if hasattr(self.msg, 'partstr'):
            self.o.partstr = self.msg.partstr
        self.o.sumstr = self.msg.sumstr

        varsb4 = set(vars(self.msg))

        for opt in self.state_vars:
            if hasattr(self.o, opt):
                setattr(self.msg, opt, getattr(self.o, opt))

        ok = True
        for plugin in self.v2plugins[ep]:
            ok = plugin(self.o)
            if not ok:
                break

        vars_after = set(vars(self.msg))

        self.restoreMsg(m, self.msg)

        # The state variables copied onto the message above always show up
        # in this difference, so only record names not yet known: otherwise
        # state_vars grows by duplicates with every message processed.
        for name in vars_after - varsb4:
            if name not in self.state_vars:
                self.state_vars.append(name)

        return ok