import copy
import glob
import importlib
import logging
import os
import re
# v3 plugin architecture...
import sarracenia.flowcb
import sarracenia.identity
import sarracenia.transfer
import stat
import time
import types
import urllib.parse
import sarracenia
import sarracenia.filemetadata
# for v2 subscriber routines...
import json, os, sys, time
from sys import platform as _platform
from base64 import b64decode, b64encode
from mimetypes import guess_type
# end v2 subscriber
from sarracenia.featuredetection import features
if features['reassembly']['present']:
import sarracenia.blockmanifest
from sarracenia import nowflt
logger = logging.getLogger(__name__)
allFileEvents = set(['create', 'delete', 'link', 'mkdir', 'modify','rmdir'])
default_options = {
'accelThreshold': 0,
'acceptUnmatched': True,
'byteRateMax': None,
'discard': False,
'download': False,
'fileEvents': allFileEvents,
'housekeeping': 300,
'logReject': False,
'logFormat':
'%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s',
'logLevel': 'info',
'mirror': True,
'permCopy': True,
'timeCopy': True,
'messageCountMax': 0,
'messageRateMax': 0,
'messageRateMin': 0,
'sleep': 0.1,
'topicPrefix': ['v03'],
'topicCopy': False,
'vip': []
}
if features['filetypes']['present']:
import magic
if features['vip']['present']:
import netifaces
[docs]
class Flow:
"""
Implement the General Algorithm from the Concepts Guide.
All of the component types (e.g. poll, subscribe, sarra, winnow, shovel ) are implemented
as sub-classes of Flow. The constructor/factory accept a configuration
(sarracenia.config.Config class) with all the settings in it.
This class takes care of starting up, running with callbacks, and clean shutdown.
need to know whether to sleep between passes
o.sleep - an interval (floating point number of seconds)
o.housekeeping -
A flow processes worklists of messages
worklist given to callbacks...
* worklist.incoming --> new messages to continue processing
* worklist.ok --> successfully processed
* worklist.rejected --> messages to not be further processed.
* worklist.failed --> messages for which processing failed.
* worklist.dirrectories_ok --> directories created.
Initially all messages are placed in incoming.
if a callback decides:
- a message is not relevant, it is moved to rejected.
- all processing has been done, it moves it to ok.
- an operation failed and it should be retried later, move to retry
callbacks must not remove messages from all worklists, re-classify them.
it is necessary to put rejected messages in the appropriate worklist
so they can be acknowledged as received.
interesting data structure:
self.plugins -- dict of modular functionality metadata.
* self.plugins[ "load" ] contains a list of (v3) flow_callbacks to load.
* self.plugins[ entry_point ] - one for each invocation times of callbacks. examples:
"on_start", "after_accept", etc... contains routines to run at each *entry_point*
"""
@staticmethod
def factory(cfg):
if cfg.flowMain:
flowMain=cfg.flowMain
else:
flowMain=cfg.component
for sc in Flow.__subclasses__():
if flowMain == sc.__name__.lower():
return sc(cfg)
if cfg.component == 'flow':
return Flow(cfg)
return None
[docs]
def __init__(self, cfg=None):
"""
The cfg is should be an sarra/config object.
"""
self._stop_requested = False
me = 'flow'
logging.basicConfig(
format=
'%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s',
level=logging.DEBUG)
self.o = cfg
if 'sarracenia.flow.Flow' in self.o.settings and 'logLevel' in self.o.settings['sarracenia.flow.Flow']:
logger.setLevel(
getattr(
logging,
self.o.settings['sarracenia.flow.Flow']['logLevel'].upper()))
else:
logger.setLevel(getattr(logging, self.o.logLevel.upper()))
if not hasattr(self.o, 'post_topicPrefix'):
self.o.post_topicPrefix = self.o.topicPrefix
logging.basicConfig(format=self.o.logFormat,
level=getattr(logging, self.o.logLevel.upper()))
self.plugins = {}
for entry_point in sarracenia.flowcb.entry_points:
self.plugins[entry_point] = []
# FIXME: open new worklist
self.worklist = types.SimpleNamespace()
self.worklist.ok = []
self.worklist.incoming = []
self.worklist.rejected = []
self.worklist.failed = []
self.worklist.directories_ok = []
# for poll only, mark if we are catching up on posted messages
#
self.worklist.poll_catching_up = False
# Witness the creation of this list
self.plugins['load'] = self.o.plugins_early + [
'sarracenia.flowcb.retry.Retry',
'sarracenia.flowcb.housekeeping.resources.Resources'
]
# open cache, get masks.
if self.o.nodupe_ttl > 0:
if self.o.nodupe_driver.lower() == "redis":
self.plugins['load'].append('sarracenia.flowcb.nodupe.redis.Redis')
else:
self.plugins['load'].append('sarracenia.flowcb.nodupe.disk.Disk')
if (( hasattr(self.o, 'delete_source') and self.o.delete_source ) or \
( hasattr(self.o, 'delete_destination') and self.o.delete_destination )) and \
('sarracenia.flowcb.work.delete.Delete' not in self.o.plugins_late):
self.o.plugins_late.append('sarracenia.flowcb.work.delete.Delete')
# transport stuff.. for download, get, put, etc...
self.scheme = None
self.cdir = None
self.proto = {}
# initialize plugins.
if hasattr(self.o, 'v2plugins') and len(self.o.v2plugins) > 0:
self.plugins['load'].append(
'sarracenia.flowcb.v2wrapper.V2Wrapper')
self.plugins['load'].extend(self.o.plugins_late)
self.plugins['load'].extend(self.o.destfn_scripts)
# metrics - dictionary with names of plugins as the keys
self.metricsFlowReset()
self.had_vip = not os.path.exists( self.o.novipFilename )
def metricsFlowReset(self) -> None:
self.new_metrics = { 'flow': { 'stop_requested': False, 'last_housekeeping': 0,
'transferConnected': False, 'transferConnectStart': 0, 'transferConnectTime':0,
'transferRxBytes': 0, 'transferTxBytes': 0, 'transferRxFiles': 0, 'transferTxFiles': 0 } }
# carry over some metrics... that don't reset.
if hasattr(self,'metrics'):
if 'transferRxLast' in self.metrics:
self.new_metrics['transferRxLast'] = self.metrics['transferRxLast']
if 'transferTxLast' in self.metrics:
self.new_metrics['transferTxLast'] = self.metrics['transferTxLast']
self.metrics=self.new_metrics
def loadCallbacks(self, plugins_to_load=None):
if not plugins_to_load:
plugins_to_load=self.plugins['load']
for m in self.o.imports:
try:
importlib.import_module(m)
except Exception as ex:
logger.critical( f"python module import {m} load failed: {ex}" )
logger.debug( "details:", exc_info=True )
return False
logger.info( f'flowCallback plugins to load: {plugins_to_load}' )
for c in plugins_to_load:
try:
plugin = sarracenia.flowcb.load_library(c, self.o)
except Exception as ex:
logger.critical( f"flowCallback plugin {c} did not load: {ex}" )
logger.debug( "details:", exc_info=True )
return False
#logger.debug( f'flowCallback plugin loading: {c} an instance of: {plugin}' )
for entry_point in sarracenia.flowcb.entry_points:
if hasattr(plugin, entry_point):
fn = getattr(plugin, entry_point)
if callable(fn):
#logger.debug( f'registering {c}/{entry_point}' )
if entry_point in self.plugins:
self.plugins[entry_point].append(fn)
else:
self.plugins[entry_point] = [fn]
if not (hasattr(plugin, 'registered_as')
and callable(getattr(plugin, 'registered_as'))):
continue
logger.debug('complete')
self.o.check_undeclared_options()
return True
def _runCallbacksWorklist(self, entry_point):
if hasattr(self, 'plugins') and (entry_point in self.plugins):
for p in self.plugins[entry_point]:
if self.o.logLevel.lower() == 'debug' :
p(self.worklist)
else:
try:
p(self.worklist)
except Exception as ex:
logger.error( f'flowCallback plugin {p}/{entry_point} crashed: {ex}' )
logger.debug( "details:", exc_info=True )
def runCallbacksTime(self, entry_point):
for p in self.plugins[entry_point]:
if self.o.logLevel.lower() == 'debug' :
p()
else:
try:
p()
except Exception as ex:
logger.error( f'flowCallback plugin {p}/{entry_point} crashed: {ex}' )
logger.debug( "details:", exc_info=True )
[docs]
def _runCallbackMetrics(self):
"""Collect metrics from plugins with a ``metricsReport`` entry point.
Expects the plugin to return a dictionary containing metrics, which is saved to ``self.metrics[plugin_name]``.
"""
if 'transferConnected' in self.metrics['flow'] and self.metrics['flow']['transferConnected']:
now=nowflt()
self.metrics['flow']['transferConnectTime'] += now - self.metrics['flow']['transferConnectStart']
self.metrics['flow']['transferConnectStart']=now
modules=self.plugins["metricsReport"]
if hasattr(self,'proto'): # gets re-spawned every batch, so not a permanent thing...
for scheme in self.proto:
if hasattr(self.proto[scheme], 'metricsReport'):
fn = getattr(self.proto[scheme], 'metricsReport')
if callable(fn):
modules.append( fn )
for p in modules:
if self.o.logLevel.lower() == 'debug' :
module_name = str(p.__module__).replace('sarracenia.flowcb.', '' )
self.metrics[module_name] = p()
else:
try:
module_name = str(p.__module__).replace('sarracenia.flowcb.', '' )
self.metrics[module_name] = p()
except Exception as ex:
logger.error( f'flowCallback plugin {p}/metricsReport crashed: {ex}' )
logger.debug( "details:", exc_info=True )
[docs]
def _runHousekeeping(self, now) -> float:
""" Run housekeeping callbacks
Return the time when housekeeping should be run next
"""
logger.info(f'on_housekeeping pid: {os.getpid()} {self.o.component}/{self.o.config} instance: {self.o.no}')
self.runCallbacksTime('on_housekeeping')
self.metricsFlowReset()
self.metrics['flow']['last_housekeeping'] = now
next_housekeeping = now + self.o.housekeeping
self.metrics['flow']['next_housekeeping'] = next_housekeeping
return next_housekeeping
[docs]
def has_vip(self) -> list:
"""
return list of vips which are active on the current machine, or an empty list.
"""
if not features['vip']['present']: return True
# no vip given... standalone always has vip.
if self.o.vip == []:
return [ 'AnyAddressIsFine' ]
try:
for i in netifaces.interfaces():
for a in netifaces.ifaddresses(i):
j = 0
while (j < len(netifaces.ifaddresses(i)[a])):
k=netifaces.ifaddresses(i)[a][j].get('addr')
if k in self.o.vip:
return k
j += 1
except Exception as ex:
logger.error(
f'error while looking for interfaces to compare with vip {self.o.vip}: {ex}' )
logger.debug('Exception details: ', exc_info=True)
return []
[docs]
def reject(self, m, code, reason) -> None:
"""
reject a message.
"""
self.worklist.rejected.append(m)
m.setReport(code, reason)
def please_stop(self) -> None:
logger.info(
f'ok, telling {len(self.plugins["please_stop"])} callbacks about it.'
)
self.runCallbacksTime('please_stop')
self._stop_requested = True
self.metrics["flow"]['stop_requested'] = True
def close(self) -> None:
self.runCallbacksTime('on_stop')
if os.path.exists( self.o.novipFilename ):
os.unlink( self.o.novipFilename )
logger.info(
f'flow/close completed cleanly pid: {os.getpid()} {self.o.component}/{self.o.config} instance: {self.o.no}'
)
def ack(self, mlist) -> None:
if "ack" in self.plugins:
for p in self.plugins["ack"]:
if self.o.logLevel.lower() == 'debug' :
p(mlist)
else:
try:
p(mlist)
except Exception as ex:
logger.error( f'flowCallback plugin {p}/ack crashed: {ex}' )
logger.debug( "details:", exc_info=True )
def _run_vip_update(self) -> bool:
self.have_vip = self.has_vip()
if (self.o.component == 'poll') and not self.have_vip:
if self.had_vip:
logger.info("now passive on vips %s" % self.o.vip )
with open( self.o.novipFilename, 'w' ) as f:
f.write(str(nowflt()) + '\n' )
self.had_vip=False
else:
if not self.had_vip:
logger.info("now active on vip %s" % self.have_vip )
self.had_vip=True
if os.path.exists( self.o.novipFilename ):
os.unlink( self.o.novipFilename )
[docs]
def run(self):
"""
This is the core routine of the algorithm, with most important data driven
loop in it. This implements the General Algorithm (as described in the Concepts Explanation Guide)
check if stop_requested once in a while, but never return otherwise.
"""
if not self.loadCallbacks(self.plugins['load']):
return
logger.debug( f"working directory: {os.getpid()}" )
next_housekeeping = nowflt() + self.o.housekeeping
current_rate = 0
total_messages = 1
start_time = nowflt()
now=start_time
current_sleep = self.o.sleep
last_time = start_time
self.metrics['flow']['last_housekeeping'] = start_time
if self.o.logLevel == 'debug':
logger.debug("options:")
self.o.dump()
logger.info("callbacks loaded: %s" % self.plugins['load'])
logger.info(
f'pid: {os.getpid()} {self.o.component}/{self.o.config} instance: {self.o.no}'
)
spamming = True
last_gather_len = 0
stopping = False
while True:
if self._stop_requested:
if stopping:
logger.info('clean stop from run loop')
self.close()
break
else:
logger.info( 'starting last pass (without gather) through loop for cleanup.')
stopping = True
self._run_vip_update()
if now > next_housekeeping or stopping:
next_housekeeping = self._runHousekeeping(now)
elif now == start_time:
self.runCallbacksTime(f'on_start')
self.worklist.incoming = []
if (self.o.component == 'poll') or self.have_vip:
if ( self.o.messageRateMax > 0 ) and (current_rate > 0.8*self.o.messageRateMax ):
logger.info("current_rate (%.2f) vs. messageRateMax(%.2f)) " % (current_rate, self.o.messageRateMax))
if not stopping:
self.gather()
last_gather_len = len(self.worklist.incoming)
if (last_gather_len == 0):
spamming = True
else:
current_sleep = self.o.sleep
spamming = False
self.filter()
# this for duplicate cache synchronization.
if self.worklist.poll_catching_up:
self.ack(self.worklist.incoming)
self.worklist.incoming = []
else: # normal processing, when you are active.
self.work()
self.post()
now = nowflt()
run_time = now - start_time
total_messages += last_gather_len
if (self.o.messageCountMax > 0) and (total_messages > self.o.messageCountMax):
self.please_stop()
current_rate = total_messages / run_time
elapsed = now - last_time
self.metrics['flow']['msgRate'] = current_rate
if (last_gather_len == 0) and (self.o.sleep < 0):
if (self.o.retryEmptyBeforeExit and "retry" in self.metrics
and self.metrics['retry']['msgs_in_post_retry'] > 0):
logger.info("Not exiting because there are still messages in the post retry queue.")
# Sleep for a while. Messages can't be retried before housekeeping has run...
current_sleep = 60
else:
self.please_stop()
if spamming and (current_sleep < 5):
current_sleep *= 2
self.metrics['flow']['current_sleep'] = current_sleep
# Run housekeeping based on time, and before stopping to ensure it's run at least once
if now > next_housekeeping or stopping:
next_housekeeping = self._runHousekeeping(now)
if (self.o.messageRateMin > 0) and (current_rate <
self.o.messageRateMin):
logger.warning("receiving below minimum message rate")
if (self.o.messageRateMax > 0) and (current_rate >=
self.o.messageRateMax):
stime = 1 + 2 * ((current_rate - self.o.messageRateMax) /
self.o.messageRateMax)
logger.info(
"current_rate/2 (%.2f) above messageRateMax(%.2f): throttling"
% (current_rate, self.o.messageRateMax))
else:
logger.debug( f" not throttling: limit: {self.o.messageRateMax} " )
stime = 0
if (current_sleep > 0):
if elapsed < current_sleep:
stime += current_sleep - elapsed
if stime > 60: # if sleeping for a long time, debug output is good...
logger.debug(
"sleeping for more than 60 seconds: %.2f seconds. Elapsed since wakeup: %.2f Sleep setting: %.2f "
% (stime, elapsed, self.o.sleep))
else:
logger.debug('worked too long to sleep!')
last_time = now
continue
if not self._stop_requested and (stime > 0):
# dividing into small sleeps so exit processing happens faster
# bug #595, still relatively low cpu usage in increment sized chunks.
if 5 < stime:
increment=5
else:
increment=stime
while (stime > 0):
logger.debug( f"sleeping for {increment:.2f}" )
time.sleep(increment)
if self._stop_requested:
break
else:
stime -= 5
# Run housekeeping during long sleeps
now_for_hk = nowflt()
if now_for_hk > next_housekeeping:
next_housekeeping = self._runHousekeeping(now_for_hk)
last_time = now
[docs]
def sundew_getDestInfos(self, msg, currentFileOption, filename):
"""
modified from sundew client
WHATFN -- First part (':') of filename
HEADFN -- Use first 2 fields of filename
NONE -- Use the entire filename
TIME or TIME: -- TIME stamp appended
DESTFN=fname -- Change the filename to fname
ex: mask[2] = 'NONE:TIME'
"""
if currentFileOption is None or currentFileOption == 'None': return filename
timeSuffix = ''
satnet = ''
parts = filename.split(':')
firstPart = parts[0]
if 'sundew_extension' in msg.keys():
parts = [parts[0]] + msg['sundew_extension'].split(':')
filename = ':'.join(parts)
destFileName = filename
for spec in currentFileOption.split(':'):
if spec == 'WHATFN':
destFileName = firstPart
elif spec == 'HEADFN':
headParts = firstPart.split('_')
if len(headParts) >= 2:
destFileName = headParts[0] + '_' + headParts[1]
else:
destFileName = headParts[0]
elif spec == 'SENDER' and 'SENDER=' in filename:
i = filename.find('SENDER=')
if i >= 0: destFileName = filename[i + 7:].split(':')[0]
if destFileName[-1] == ':': destFileName = destFileName[:-1]
elif spec == 'NONE':
if 'SENDER=' in filename:
i = filename.find('SENDER=')
destFileName = filename[:i]
else:
if len(parts) >= 6:
# PX default behavior : keep 6 first fields
destFileName = ':'.join(parts[:6])
# PDS default behavior keep 5 first fields
if len(parts[4]) != 1:
destFileName = ':'.join(parts[:5])
# extra trailing : removed if present
if destFileName[-1] == ':': destFileName = destFileName[:-1]
elif spec == 'NONESENDER':
if 'SENDER=' in filename:
i = filename.find('SENDER=')
j = filename.find(':', i)
destFileName = filename[:i + j]
else:
if len(parts) >= 6:
# PX default behavior : keep 6 first fields
destFileName = ':'.join(parts[:6])
# PDS default behavior keep 5 first fields
if len(parts[4]) != 1:
destFileName = ':'.join(parts[:5])
# extra trailing : removed if present
if destFileName[-1] == ':': destFileName = destFileName[:-1]
elif re.compile('SATNET=.*').match(spec):
satnet = ':' + spec
elif re.compile('DESTFN=.*').match(spec):
destFileName = spec[7:]
elif re.compile('DESTFNSCRIPT=.*').match(spec):
scriptclass = spec[13:].split('.')[-1]
for dfm in self.plugins['destfn']:
classname = dfm.__qualname__.split('.')[0]
if (scriptclass == classname) or (scriptclass.capitalize() == classname):
destFileName = dfm(msg)
elif spec == 'TIME':
timeSuffix = ':' + time.strftime("%Y%m%d%H%M%S", time.gmtime())
if 'pubTime' in msg:
timeSuffix = ":" + msg['pubTime'].split('.')[0]
if 'pubTime' in msg:
timeSuffix = ":" + msg['pubTime'].split('.')[0]
timeSuffix = timeSuffix.replace('T', '')
# check for PX or PDS behavior ...
# if file already had a time extension keep his...
if len(parts[-1]) == 14 and parts[-1][0] == '2':
timeSuffix = ':' + parts[-1]
else:
logger.error("Don't understand this DESTFN parameter: %s" %
spec)
return filename
return destFileName + satnet + timeSuffix
# ==============================================
# how will the download file land on this server
# with all options, this is really tricky
# ==============================================
"""
to test changes to updateFieldsAccepted, run: make test_shim in the SarraC package...
because it tickles a lot of these settings, in addition to the flow_tests before
trying to PR changes here.
"""
[docs]
def updateFieldsAccepted(self, msg, urlstr, pattern, maskDir,
maskFileOption, mirror, path_strip_count, pstrip, flatten) -> None:
"""
Set new message fields according to values when the message is accepted.
* urlstr: the urlstr being matched (baseUrl+relPath+sundew_extension)
* pattern: the regex that was matched.
* maskDir: the current directory to base the relPath from.
* maskFileOption: filename option value (sundew compatibility options.)
* strip: number of path entries to strip from the left side of the path.
* pstrip: pattern strip regexp to apply instead of a count.
* flatten: a character to replace path separators with toe change a multi-directory
deep file name into a single long file name
"""
# relative path by default mirror
relPath = '%s' % msg['relPath']
if self.o.baseUrl_relPath:
u = sarracenia.baseUrlParse(msg['baseUrl'])
relPath = u.path[1:] + '/' + relPath
if self.o.download and 'rename' in msg:
# FIXME... why the % ? why not just assign it to copy the value?
relPath = '%s' % msg['rename']
# after download we dont propagate renaming... once used, get rid of it
del msg['rename']
# FIXME: worry about publishing after a rename.
# the relpath should be replaced by rename value for downstream
# because the file was written to rename.
# not sure if this happens or not.
token = relPath.split('/')
filename = token[-1]
# resolve a current base directory to which the relative path will eventually be added.
# update fileOp fields to replace baseDir.
#if self.o.currentDir : new_dir = self.o.currentDir
new_dir=''
if maskDir:
new_dir = self.o.variableExpansion(maskDir, msg)
else:
if self.o.post_baseDir:
new_dir = self.o.variableExpansion(self.o.post_baseDir, msg)
d=None
if self.o.baseDir:
if new_dir:
d = new_dir
elif self.o.post_baseDir:
d = self.o.variableExpansion(self.o.post_baseDir, msg)
# to get locally resolvable links and renames, need to mangle the pathnames.
# to get something to restore for downstream consumers, need to put the original
# names back.
if 'fileOp' in msg:
msg['post_fileOp'] = copy.deepcopy(msg['fileOp'])
msg['_deleteOnPost'] |= set( [ 'post_fileOp' ] )
# if provided, strip (integer) ... strip N heading directories
# or pstrip (pattern str) strip regexp pattern from relPath
# cannot have both (see setting of option strip in sr_config)
if path_strip_count > 0:
logger.warning( f"path_strip_count:{path_strip_count} ")
strip=path_strip_count
if strip < len(token):
token = token[strip:]
if 'fileOp' in msg:
"""
files are written with cwd being the directory containing the file written.
when stripping the root of the tree off, the path must be rendered relative to the
directory containing the file: the values are modified to create relative paths.
"""
for f in ['link', 'hlink', 'rename']:
if f in msg['fileOp']:
fopv = msg['fileOp'][f].split('/')
# an absolute path file posted is relative to '/' (in relPath) but the values in
# the link and rename fields may be absolute, requiring and adjustmeent when stripping
if fopv[0] == '':
strip += 1
elif len(fopv) == 1:
toclimb=len(token)-1
msg['fileOp'][f] = '../'*(toclimb) + fopv[0]
if len(fopv) > strip:
rest=fopv[strip:]
toclimb=len(token)-rest.count('..')-1
msg['fileOp'][f] = '../'*(toclimb)+'/'.join(rest)
# strip using a pattern
elif pstrip:
#MG FIXME Peter's wish to have replacement in pstrip (ex.:${SOURCE}...)
relstrip = re.sub(pstrip, '', relPath, 1)
if not filename in relstrip: relstrip = filename
token = relstrip.split('/')
if 'fileOp' in msg:
for f in ['link', 'hlink', 'rename']:
if f in msg['fileOp']:
msg['fileOp'][f] = re.sub(pstrip, '', msg['fileOp'][f] )
# if flatten... we flatten relative path
# strip taken into account
if flatten != '/':
filename = flatten.join(token)
token[-1] = filename
if 'fileOp' in msg:
for f in ['link', 'hlink', 'rename']:
if f in msg['fileOp']:
msg['fileOp'][f] = flatten.join(msg['fileOp'][f].split('/'))
if self.o.baseDir:
# remove baseDir from relPath if present.
token_baseDir = self.o.baseDir.split('/')[1:]
remcnt=0
if len(token) > len(token_baseDir):
for i in range(0,len(token_baseDir)):
if token_baseDir[i] == token[i]:
remcnt+=1
else:
break
if remcnt == len(token_baseDir):
token=token[remcnt:]
if d:
if 'fileOp' in msg and len(self.o.baseDir) > 1:
for f in ['link', 'hlink', 'rename']:
if (f in msg['fileOp']) :
if msg['fileOp'][f].startswith(self.o.baseDir):
msg['fileOp'][f] = msg['fileOp'][f].replace(self.o.baseDir, d, 1)
elif os.sep not in msg['fileOp'][f]:
toclimb=len(token)-1
msg['fileOp'][f] = '../'*(toclimb) + msg['fileOp'][f]
elif 'fileOp' in msg and new_dir:
u = sarracenia.baseUrlParse(msg['baseUrl'])
for f in ['link', 'hlink', 'rename']:
if (f in msg['fileOp']):
if (len(u.path) > 1):
if msg['fileOp'][f].startswith(u.path):
msg['fileOp'][f] = msg['fileOp'][f].replace(u.path, new_dir, 1)
elif '/' not in msg['fileOp'][f]:
toclimb=len(token)-1
msg['fileOp'][f] = '../'*(toclimb) + msg['fileOp'][f]
if self.o.mirror and len(token) > 1:
new_dir = new_dir + '/' + '/'.join(token[:-1])
new_dir = self.o.variableExpansion(new_dir, msg)
# resolution of sundew's dirPattern
tfname = filename
# when sr_sender did not derived from sr_subscribe it was always called
new_dir = self.o.sundew_dirPattern(pattern, urlstr, tfname, new_dir)
msg.updatePaths(self.o, new_dir, filename)
if maskFileOption:
msg['new_file'] = self.sundew_getDestInfos(msg, maskFileOption, filename)
msg['new_relPath'] = '/'.join( msg['new_relPath'].split('/')[0:-1] + [ msg['new_file'] ] )
def filter(self) -> None:
logger.debug(
'start len(incoming)=%d, rejected=%d' %
(len(self.worklist.incoming), len(self.worklist.rejected)))
filtered_worklist = []
if hasattr(self.o, 'directory'):
default_accept_directory = self.o.directory
elif hasattr(self.o, 'post_baseDir'):
default_accept_directory = self.o.post_baseDir
elif hasattr(self.o, 'baseDir'):
default_accept_directory = self.o.baseDir
now = nowflt()
for m in self.worklist.incoming:
then = sarracenia.timestr2flt(m['pubTime'])
lag = now - then
if self.o.messageAgeMax != 0 and lag > self.o.messageAgeMax:
self.reject(
m, 504,
"Excessive lag: %g sec. Skipping download of: %s, " %
(lag, m['new_file']))
continue
if 'fileOp' in m and 'rename' in m['fileOp']:
url = self.o.variableExpansion(m['baseUrl'],
m) + os.sep + m['fileOp']['rename']
if 'sundew_extension' in m and url.count(":") < 1:
urlToMatch = url + ':' + m['sundew_extension']
else:
urlToMatch = url
oldname_matched = False
for mask in self.o.masks:
pattern, maskDir, maskFileOption, mask_regexp, accepting, mirror, strip, pstrip, flatten = mask
if (pattern == '.*'):
oldname_matched = accepting
break
matches = mask_regexp.match(urlToMatch)
if matches:
m[ '_matches'] = matches
m['_deleteOnPost'] |= set(['_matches'])
oldname_matched = accepting
break
url = self.o.variableExpansion(m['baseUrl'], m)
if (m['baseUrl'][-1] == '/') or (len(m['relPath']) > 0 and (m['relPath'][0] == '/')):
if (m['baseUrl'][-1] == '/') and (len(m['relPath'])>0) and (m['relPath'][0] == '/'):
url += m['relPath'][1:]
else:
url += m['relPath']
else:
url += '/' + m['relPath']
if 'sundew_extension' in m and url.count(":") < 1:
urlToMatch = url + ':' + m['sundew_extension']
else:
urlToMatch = url
logger.debug( f" urlToMatch: {urlToMatch} " )
# apply masks for accept/reject options.
matched = False
for mask in self.o.masks:
pattern, maskDir, maskFileOption, mask_regexp, accepting, mirror, strip, pstrip, flatten = mask
if (pattern != '.*') :
matches = mask_regexp.match(urlToMatch)
if matches:
m[ '_matches'] = matches
m['_deleteOnPost'] |= set(['_matches'])
if (pattern == '.*') or matches:
matched = True
if not accepting:
if 'fileOp' in m and 'rename' in m['fileOp'] and oldname_matched:
# deletion rename case... need to accept with an extra field...
if not 'renameUnlink' in m:
m['renameUnlink'] = True
m['_deleteOnPost'] |= set(['renameUnlink'])
logger.debug("rename deletion 1 %s" %
(m['fileOp']['rename']))
else:
self.reject(
m, 304, "mask=%s strip=%s url=%s" %
(str(mask), strip, urlToMatch))
break
self.updateFieldsAccepted(m, url, pattern, maskDir,
maskFileOption, mirror, strip,
pstrip, flatten)
filtered_worklist.append(m)
break
if not matched:
if 'fileOp' in m and ('rename' in m['fileOp']) and oldname_matched:
if not 'renameUnlink' in m:
m['renameUnlink'] = True
m['_deleteOnPost'] |= set(['renameUnlink'])
logger.debug("rename deletion 2 %s" % (m['fileOp']['rename']))
filtered_worklist.append(m)
self.updateFieldsAccepted(m, url, None,
default_accept_directory,
self.o.filename, self.o.mirror,
self.o.strip, self.o.pstrip,
self.o.flatten)
continue
if self.o.acceptUnmatched:
logger.debug("accept: unmatched pattern=%s" % (url))
# FIXME... missing dir mapping with mirror, strip, etc...
self.updateFieldsAccepted(m, url, None,
default_accept_directory,
self.o.filename, self.o.mirror,
self.o.strip, self.o.pstrip,
self.o.flatten)
filtered_worklist.append(m)
else:
self.reject(m, 304, "unmatched pattern %s" % url)
self.worklist.incoming = filtered_worklist
logger.debug( 'end len(incoming)=%d, rejected=%d' % (len(self.worklist.incoming), len(self.worklist.rejected)))
self._runCallbacksWorklist('after_accept')
logger.debug( 'B filtered incoming: %d, ok: %d (directories: %d), rejected: %d, failed: %d stop_requested: %s have_vip: %s'
% (len(self.worklist.incoming), len(self.worklist.ok), len(self.worklist.directories_ok),
len(self.worklist.rejected), len(self.worklist.failed), self._stop_requested, self.have_vip))
self.ack(self.worklist.ok)
self.worklist.ok = []
self.ack(self.worklist.rejected)
self.worklist.rejected = []
self.ack(self.worklist.failed)
def gather(self) -> None:
so_far=0
keep_going=True
for p in self.plugins["gather"]:
try:
retval = p(self.o.batch-so_far)
# To avoid having to modify all existing gathers, support old API.
if type(retval) == tuple:
keep_going, new_incoming = retval
elif type(retval) == list:
new_incoming = retval
else:
logger.error( f"flowCallback plugin gather routine {p} returned unexpected type: {type(retval)}. Expected tuple of boolean and list of new messages" )
except Exception as ex:
logger.error( f'flowCallback plugin {p} crashed: {ex}' )
logger.debug( "details:", exc_info=True )
continue
if len(new_incoming) > 0:
self.worklist.incoming.extend(new_incoming)
so_far += len(new_incoming)
# if we gathered enough with a subset of plugins then return.
if not keep_going or (so_far >= self.o.batch):
if (self.o.component == 'poll' ):
self.worklist.poll_catching_up=True
return
# gather is an extended version of poll.
if self.o.component != 'poll':
return
if len(self.worklist.incoming) > 0:
logger.info('ingesting %d postings into duplicate suppression cache' % len(self.worklist.incoming) )
self.worklist.poll_catching_up = True
return
else:
self.worklist.poll_catching_up = False
if self.have_vip:
for plugin in self.plugins['poll']:
new_incoming = plugin()
if len(new_incoming) > 0:
self.worklist.incoming.extend(new_incoming)
def do(self) -> None:
if self.o.download:
self.do_download()
else:
# mark all remaining messages as done.
self.worklist.ok = self.worklist.incoming
self.worklist.incoming = []
logger.debug('processing %d messages worked!' % len(self.worklist.ok))
def work(self) -> None:
self.do()
# need to acknowledge here, because posting will delete message-id
self.ack(self.worklist.ok)
self.ack(self.worklist.rejected)
self.ack(self.worklist.failed)
# adjust message after action is done, but before 'after_work' so adjustment is possible.
for m in self.worklist.ok:
if ('new_baseUrl' in m) and (m['baseUrl'] !=
m['new_baseUrl']):
m['old_baseUrl'] = m['baseUrl']
m['_deleteOnPost'] |= set(['old_baseUrl'])
m['baseUrl'] = m['new_baseUrl']
if ('new_retrievePath' in m) :
m['old_retrievePath'] = m['retrievePath']
m['retrievePath'] = m['new_retrievePath']
m['_deleteOnPost'] |= set(['old_retrievePath'])
# if new_file does not match relPath, then adjust relPath so it does.
if 'relPath' in m and m['new_file'] != m['relPath'].split('/')[-1]:
if not 'new_relPath' in m:
if len(m['relPath']) > 1:
m['new_relPath'] = '/'.join( m['relPath'].split('/')[0:-1] + [ m['new_file'] ])
else:
m['new_relPath'] = m['new_file']
else:
if len(m['new_relPath']) > 1:
m['new_relPath'] = '/'.join( m['new_relPath'].split('/')[0:-1] + [ m['new_file'] ] )
else:
m['new_relPath'] = m['new_file']
if ('new_relPath' in m) and (m['relPath'] != m['new_relPath']):
m['old_relPath'] = m['relPath']
m['_deleteOnPost'] |= set(['old_relPath'])
m['relPath'] = m['new_relPath']
m['old_subtopic'] = m['subtopic']
m['_deleteOnPost'] |= set(['old_subtopic','subtopic'])
m['subtopic'] = m['new_subtopic']
if '_format' in m:
m['old_format'] = m['_format']
m['_deleteOnPost'] |= set(['old_format'])
if 'post_format' in m:
m['_format'] = m['post_format']
# restore adjustment to fileOp
if 'post_fileOp' in m:
m['fileOp'] = m['post_fileOp']
if self.o.download and 'retrievePath' in m:
# retrieve paths do not propagate after download.
del m['retrievePath']
self._runCallbacksWorklist('after_work')
self.ack(self.worklist.rejected)
self.worklist.rejected = []
self.ack(self.worklist.failed)
def post(self) -> None:
if len(self.plugins["post"]) > 0:
# work-around for python3.5 not being able to copy re.match issue:
# https://github.com/MetPX/sarracenia/issues/857
if sys.version_info.major == 3 and sys.version_info.minor <= 6:
for m in self.worklist.ok:
if '_matches' in m:
del m['_matches']
for p in self.plugins["post"]:
try:
p(self.worklist)
except Exception as ex:
logger.error( f'flowCallback plugin {p} crashed: {ex}' )
logger.debug( "details:", exc_info=True )
self._runCallbacksWorklist('after_post')
self._runCallbacksWorklist('report')
self._runCallbackMetrics()
if hasattr(self.o, 'metricsFilename' ) and os.path.isdir(os.path.dirname(self.o.metricsFilename)):
metrics=json.dumps(self.metrics)
with open(self.o.metricsFilename, 'w') as mfn:
mfn.write(metrics+"\n")
if self.o.logMetrics:
if self.o.logRotateInterval >= 24*60*60:
tslen=8
elif self.o.logRotateInterval > 60:
tslen=14
else:
tslen=16
timestamp=time.strftime("%Y%m%d-%H%M%S", time.gmtime())
with open(self.o.metricsFilename + '.' + timestamp[0:tslen], 'a') as mfn:
mfn.write( f'\"{timestamp}\" : {metrics},\n')
# removing old metrics files
logger.info( f"looking for old metrics for {self.o.metricsFilename}" )
old_metrics=sorted(glob.glob(self.o.metricsFilename+'.*'))[0:-self.o.logRotateCount]
for o in old_metrics:
logger.info( f"removing old metrics file: {o} " )
os.unlink(o)
self.worklist.ok = []
self.worklist.directories_ok = []
self.worklist.failed = []
[docs]
def write_inline_file(self, msg) -> bool:
"""
write local file based on a message with inlined content.
"""
# make sure directory exists, create it if not
if not os.path.isdir(msg['new_dir']):
try:
self.worklist.directories_ok.append(msg['new_dir'])
os.makedirs(msg['new_dir'], 0o775, True)
except Exception as ex:
logger.error("failed to make directory %s: %s" %
(msg['new_dir'], ex))
return False
logger.debug("data inlined with message, no need to download")
path = msg['new_dir'] + os.path.sep + msg['new_file']
#path = msg['new_relPath']
try:
f = os.fdopen(os.open(path, os.O_RDWR | os.O_CREAT), 'rb+')
except Exception as ex:
logger.warning("could not open %s to write: %s" % (path, ex))
return False
if msg['content']['encoding'] == 'base64':
data = b64decode(msg['content']['value'])
else:
data = msg['content']['value'].encode(msg['content']['encoding'])
if self.o.identity_method.startswith('cod,'):
algo_method = self.o.identity_method[4:]
elif msg['identity']['method'] == 'cod':
algo_method = msg['identity']['value']
else:
algo_method = msg['identity']['method']
onfly_algo = sarracenia.identity.Identity.factory(algo_method)
data_algo = sarracenia.identity.Identity.factory(algo_method)
onfly_algo.set_path(path)
data_algo.set_path(path)
if algo_method == 'arbitrary':
onfly_algo.value = msg['identity']['value']
data_algo.value = msg['identity']['value']
onfly_algo.update(data)
msg['onfly_checksum'] = {
'method': algo_method,
'value': onfly_algo.value
}
if ((msg['size'] > 0) and len(data) != msg['size']):
if self.o.acceptSizeWrong:
logger.warning(
"acceptSizeWrong data size is (%d bytes) vs. expected: (%d bytes)"
% (len(data), msg['size']))
else:
logger.warning(
"decoded data size (%d bytes) does not have expected size: (%d bytes)"
% (len(data), msg['size']))
return False
#try:
# for p in self.plugins['on_data']:
# data = p(data)
#except Exception as ex:
# logger.warning("plugin failed: %s" % (p, ex))
# return False
data_algo.update(data)
#FIXME: If data is changed by plugins, need to update content header.
# current code will reproduce the upstream message without mofification.
# need to think about whether that is OK or not.
msg['data_checksum'] = {
'method': algo_method,
'value': data_algo.value
}
msg['_deleteOnPost'] |= set(['onfly_checksum'])
msg['_deleteOnPost'] |= set(['data_checksum'])
try:
f.write(data)
f.truncate()
f.close()
self.set_local_file_attributes(path, msg)
except Exception as ex:
logger.warning("failed writing and finalizing: %s" % (path, ex))
return False
return True
def compute_local_checksum(self, msg) -> None:
if sarracenia.filemetadata.supports_extended_attributes:
try:
x = sarracenia.filemetadata.FileMetadata(msg['new_path'])
s = x.get('identity')
if s:
metadata_cached_mtime = x.get('mtime')
if ((metadata_cached_mtime >= msg['mtime'])):
# file has not been modified since checksum value was stored.
if (( 'identity' in msg ) and ( 'method' in msg['identity'] ) and \
( msg['identity']['method'] == s['method'] )) or \
( s['method'] == self.o.identity_method ) :
# file
# cache good.
msg['local_identity'] = s
msg['_deleteOnPost'] |= set(['local_identity'])
b = x.get('blocks')
msg['local_blocks'] = b
msg['_deleteOnPost'] |= set(['local_blocks'])
return
except:
pass
local_identity = sarracenia.identity.Identity.factory(
msg['identity']['method'])
if msg['identity']['method'] == 'arbitrary':
local_identity.value = msg['identity']['value']
local_identity.update_file(msg['new_path'])
msg['local_identity'] = {
'method': msg['identity']['method'],
'value': local_identity.value
}
msg['_deleteOnPost'] |= set(['local_identity'])
[docs]
def file_should_be_downloaded(self, msg) -> bool:
"""
determine whether a comparison of local_file and message metadata indicates that it is new enough
that writing the file locally is warranted.
return True to say downloading is warranted.
False if the file in the message represents the same or an older version that what is corrently on disk.
origin: refactor & translation of v2: content_should_not_be downloaded
Assumptions:
new_path exists... there is a file to compare against.
"""
# assert
lstat = sarracenia.stat(msg['new_path'])
fsiz = lstat.st_size
# FIXME... local_offset... offset within the local file... partitioned... who knows?
# part of partitioning deferral.
#end = self.local_offset + self.length
if 'size' in msg:
end = msg['size']
# compare sizes... if (sr_subscribe is downloading partitions into taget file) and (target_file isn't fully done)
# This check prevents random halting of subscriber (inplace on) if the messages come in non-sequential order
# target_file is the same as new_file unless the file is partitioned.
# FIXME If the file is partitioned, then it is the new_file with a partition suffix.
#if ('self.target_file == msg['new_file'] ) and ( fsiz != msg['size'] ):
if (fsiz != msg['size']):
logger.debug("%s file size different, so cannot be the same" %
(msg['new_path']))
return True
else:
end = 0
# compare dates...
if 'mtime' in msg:
new_mtime = sarracenia.timestr2flt(msg['mtime'])
old_mtime = 0.0
if self.o.timeCopy:
old_mtime = lstat.st_mtime
elif sarracenia.filemetadata.supports_extended_attributes:
try:
x = sarracenia.filemetadata.FileMetadata(msg['new_path'])
old_mtime = sarracenia.timestr2flt(x.get('mtime'))
except:
pass
if new_mtime <= old_mtime:
self.reject(msg, 304,
"mtime not newer %s " % (msg['new_path']))
return False
else:
logger.debug(
"{} new version is {} newer (new: {} vs old: {} )".format(
msg['new_path'], new_mtime - old_mtime, new_mtime,
old_mtime))
if 'identity' in msg and msg['identity']['method'] in ['random', 'cod']:
logger.debug("content_match %s sum 0/z never matches" %
(msg['new_path']))
return True
if end > fsiz:
logger.debug(
"new file not big enough... considered different")
return True
if not 'identity' in msg:
# FIXME... should there be a setting to assume them the same? use cases may vary.
logger.debug( "no checksum available, assuming different" )
return True
try:
self.compute_local_checksum(msg)
except:
logger.debug(
"something went wrong when computing local checksum... considered different"
)
return True
logger.debug("checksum in message: %s vs. local: %s" %
(msg['identity'], msg['local_identity']))
if msg['local_identity'] == msg['identity']:
self.reject(msg, 304, "same checksum %s " % (msg['new_path']))
return False
else:
return True
[docs]
def removeOneFile(self, path) -> bool:
"""
process an unlink event, returning boolean success.
"""
logger.debug("path to remove: %s" % path)
ok = True
try:
if os.path.isfile(path): os.unlink(path)
if os.path.islink(path): os.unlink(path)
if os.path.isdir(path): os.rmdir(path)
logger.info("removed %s" % path)
except:
logger.error("could not remove %s." % path)
logger.debug('Exception details: ', exc_info=True)
ok = False
return ok
[docs]
def renameOneItem(self, old, path) -> bool:
"""
for messages with an rename file operation, it is to rename a file.
"""
ok = True
logger.info( f" pwd is {os.getcwd()} " )
if not os.path.exists(old):
logger.info(
"old file %s not found, if destination (%s) missing, then fall back to copy"
% (old, path))
# if the destination file exists, assume rename already happenned,
# otherwis return false so that caller falls back to downloading/sending the file.
# return os.path.isfile(path)
# PAS 2022/12/01 ... only 1 message to interpret, will never be a previous rename.
return False
try:
if os.path.isfile(path): os.unlink(path)
if os.path.islink(path): os.unlink(path)
if os.path.isdir(path): os.rmdir(path)
os.rename(old, path)
logger.info("renamed %s -> %s" % (old, path))
except:
logger.error(
"sr_subscribe/doit_download: could not rename %s to %s " %
(old, path))
logger.debug('Exception details: ', exc_info=True)
ok = False
return ok
[docs]
def mkdir(self, msg) -> bool:
"""
perform an mkdir.
"""
ok=False
path = msg['new_dir'] + '/' + msg['new_file']
logger.debug( f"message is to mkdir {path}" )
if not os.path.isdir(msg['new_dir']):
try:
os.makedirs(msg['new_dir'], self.o.permDirDefault, True)
except Exception as ex:
logger.warning("making %s: %s" % (msg['new_dir'], ex))
logger.debug('Exception details:', exc_info=True)
if os.path.isdir(path):
logger.debug( f"no need to mkdir {path} as it exists" )
return True
if 'mode' in msg:
mode=msg['mode']
else:
mode=self.o.permDirDefault
if type(mode) is not int:
mode=int(mode,base=8)
try:
os.mkdir(path,mode=mode)
ok=True
except Exception as ex:
logger.error( f"mkdir {path} failed." )
logger.debug('Exception details:', exc_info=True)
return ok
[docs]
def link1file(self, msg, symbolic=True) -> bool:
"""
perform a link of a single file, based on a message, returning boolean success
if it's Symbolic, then do that. else do a hard link.
imported from v2/subscribe/doit_download "link event, try to link the local product given by message"
"""
logger.debug("message is to link %s to %s" %
(msg['new_file'], msg['fileOp']['link']))
# redundant, check is done in caller.
#if not 'link' in self.o.fileEvents:
# logger.info("message to link %s to %s ignored (events setting)" % \
# ( msg['new_file'], msg['fileOp'][ 'link' ] ) )
# return False
if not os.path.isdir(msg['new_dir']):
try:
self.worklist.directories_ok.append(msg['new_dir'])
os.makedirs(msg['new_dir'], self.o.permDirDefault, True)
except Exception as ex:
logger.warning("making %s: %s" % (msg['new_dir'], ex))
logger.debug('Exception details:', exc_info=True)
ok = True
try:
path = msg['new_dir'] + '/' + msg['new_file']
if os.path.isfile(path): os.unlink(path)
if os.path.islink(path): os.unlink(path)
#if os.path.isdir(path): os.rmdir(path)
if 'hlink' in msg['fileOp'] :
os.link(msg['fileOp']['hlink'], path)
logger.info("%s hard-linked to %s " % (msg['new_file'], msg['fileOp']['hlink']))
else:
os.symlink(msg['fileOp']['link'], path)
logger.info("%s sym-linked to %s " % (msg['new_file'], msg['fileOp']['link']))
except:
ok = False
logger.error("link of %s %s failed." %
(msg['new_file'], msg['fileOp']))
logger.debug('Exception details:', exc_info=True)
return ok
[docs]
def do_download(self) -> None:
"""
do download work for self.worklist.incoming, placing files:
successfully downloaded in worklist.ok
temporary failures in worklist.failed
permanent failures (or files not to be downloaded) in worklist.rejected
"""
if not self.o.download:
self.worklist.ok = self.worklist.incoming
self.worklist.incoming = []
return
for msg in self.worklist.incoming:
if 'newname' in msg:
"""
revamped rename algorithm requires only 1 message, ignore newname.
"""
self.worklist.ok.append(msg)
continue
new_path = msg['new_dir'] + os.path.sep + msg['new_file']
new_file = msg['new_file']
if not os.path.isdir(msg['new_dir']):
try:
logger.info( f"missing destination directories, makedirs: {msg['new_dir']} " )
self.worklist.directories_ok.append(msg['new_dir'])
os.makedirs(msg['new_dir'], 0o775, True)
except Exception as ex:
logger.warning("making %s: %s" % (msg['new_dir'], ex))
logger.debug('Exception details:', exc_info=True)
os.chdir(msg['new_dir'])
logger.debug( f"chdir {msg['new_dir']}")
if 'fileOp' in msg :
if 'rename' in msg['fileOp']:
if 'renameUnlink' in msg:
self.removeOneFile(msg['fileOp']['rename'])
msg.setReport(201, 'old unlinked %s' % msg['fileOp']['rename'])
self.worklist.ok.append(msg)
self.metrics['flow']['transferRxFiles'] += 1
self.metrics['flow']['transferRxLast'] = msg['report']['timeCompleted']
else:
# actual rename...
ok = self.renameOneItem(msg['fileOp']['rename'], new_path)
# if rename succeeds, fall through to download object to find if the file renamed
# actually matches the one advertised, and potentially download it.
# if rename fails, recover by falling through to download the data anyways.
if ok:
self.worklist.ok.append(msg)
self.metrics['flow']['transferRxFiles'] += 1
msg.setReport(201, 'renamed')
self.metrics['flow']['transferRxLast'] = msg['report']['timeCompleted']
continue
elif ('directory' in msg['fileOp']) and ('remove' in msg['fileOp'] ):
if 'rmdir' not in self.o.fileEvents:
self.reject(msg, 202, "skipping rmdir %s" % new_path)
continue
if self.removeOneFile(new_path):
msg.setReport(201, 'rmdired')
self.worklist.ok.append(msg)
self.metrics['flow']['transferRxFiles'] += 1
self.metrics['flow']['transferRxLast'] = msg['report']['timeCompleted']
else:
#FIXME: should this really be queued for retry? or just permanently failed?
# in rejected to avoid retry, but wondering if failed and deferred
# should be separate lists in worklist...
self.reject(msg, 500, "rmdir %s failed" % new_path)
continue
elif ('remove' in msg['fileOp']):
if 'delete' not in self.o.fileEvents:
self.reject(msg, 202, "skipping delete %s" % new_path)
continue
if self.removeOneFile(new_path):
msg.setReport(201, 'removed')
self.worklist.ok.append(msg)
self.metrics['flow']['transferRxFiles'] += 1
self.metrics['flow']['transferRxLast'] = msg['report']['timeCompleted']
else:
#FIXME: should this really be queued for retry? or just permanently failed?
# in rejected to avoid retry, but wondering if failed and deferred
# should be separate lists in worklist...
self.reject(msg, 500, "remove %s failed" % new_path)
continue
# no elif because if rename fails and operation is an mkdir or a symlink..
# need to retry as ordinary creation, similar to normal file copy case.
if 'directory' in msg['fileOp']:
if 'mkdir' not in self.o.fileEvents:
self.reject(msg, 202, "skipping mkdir %s" % new_path)
continue
if self.mkdir(msg):
msg.setReport(201, 'made directory')
self.worklist.ok.append(msg)
self.metrics['flow']['transferRxFiles'] += 1
self.metrics['flow']['transferRxLast'] = msg['report']['timeCompleted']
else:
# as above...
self.reject(msg, 500, "mkdir %s failed" % msg['new_file'])
continue
elif 'link' in msg['fileOp'] or 'hlink' in msg['fileOp']:
if 'link' not in self.o.fileEvents:
self.reject(msg, 202, "skipping link %s" % new_path)
continue
if self.link1file(msg):
msg.setReport(201, 'linked')
self.worklist.ok.append(msg)
self.metrics['flow']['transferRxFiles'] += 1
self.metrics['flow']['transferRxLast'] = msg['report']['timeCompleted']
else:
# as above...
self.reject(msg, 500, "link %s failed" % msg['fileOp'])
continue
# establish new_inflight_path which is the file to download into initially.
if self.o.inflight == None or (
('blocks' in msg) and (msg['blocks']['method'] == 'inplace')):
new_inflight_path = msg['new_file']
elif type(self.o.inflight) == str:
if self.o.inflight == '.':
new_inflight_path = '.' + new_file
elif (self.o.inflight[-1] == '/') or (self.o.inflight[0] == '/'):
if not self.o.dry_run and not os.path.isdir(self.o.inflight):
try:
os.mkdir(self.o.inflight)
os.chmod(self.o.inflight, self.o.permDirDefault)
except:
pass
new_inflight_path = self.o.inflight + new_file
elif self.o.inflight[0] == '.':
new_inflight_path = new_file + self.o.inflight
else:
#inflight is interval: minimum the age of the source file, as per message.
logger.error('interval inflight setting: %s, not appropriate for downloads.' %
self.o.inflight)
# FIXME... what to do?
self.reject(
msg, 503, "invalid inflight %s settings %s" %
(self.o.inflight, new_path))
continue
msg['new_inflight_path'] = new_inflight_path
msg['new_path'] = new_path
msg['_deleteOnPost'] |= set(['new_path'])
msg['_deleteOnPost'] |= set(['new_inflight_path'])
# assert new_inflight_path is set.
if os.path.exists(msg['new_inflight_path']):
if self.o.inflight:
how_old = time.time() - os.path.getmtime(msg['new_inflight_path'])
#FIXME: if mtime > 5 minutes, perhaps rm it, and continue? what if transfer crashed?
# Added this with fixed value, should it be a setting?
if how_old > 300:
os.unlink( msg['new_inflight_path'] )
logger.info(
f"inflight file is {how_old}s old. Removed previous attempt {msg['new_path']}" )
else:
logger.warning(
'inflight file already exists. race condition, deferring transfer of %s'
% msg['new_path'])
self.worklist.failed.append(msg)
continue
# overwriting existing file.
# FIXME: decision of whether to download, goes here.
if os.path.isfile(new_path):
if not self.o.overwrite:
self.reject(msg, 204,
"not overwriting existing file %s" % new_path)
continue
if not self.file_should_be_downloaded(msg):
continue
# download content
if 'content' in msg.keys():
if self.write_inline_file(msg):
msg.setReport(201, "Download successful (inline content)")
self.worklist.ok.append(msg)
self.metrics['flow']['transferRxLast'] = msg['report']['timeCompleted']
continue
logger.warning(
"failed to write inline content %s, falling through to download"
% new_path)
parsed_url = sarracenia.baseUrlParse(msg['baseUrl'])
self.scheme = parsed_url.scheme
i = 1
while i <= self.o.attempts:
if i > 1:
logger.warning("downloading again, attempt %d" % i)
ok = self.download(msg, self.o)
if ok:
logger.debug("downloaded ok: %s" % new_path)
msg.setReport(201, "Download successful" )
# if content is present, but downloaded anyways, then it is no good, and should not be forwarded.
if 'content' in msg:
del msg['content']
self.worklist.ok.append(msg)
self.metrics['flow']['transferRxLast'] = msg['report']['timeCompleted']
break
else:
logger.info("attempt %d failed to download %s/%s to %s" \
% ( i, msg['baseUrl'], msg['relPath'], new_path) )
i = i + 1
if not ok:
logger.error(
"gave up downloading for now, appending to retry queue")
self.worklist.failed.append(msg)
# FIXME: file reassembly missing?
#if self.inplace : file_reassemble(self)
self.worklist.incoming = []
# v2 sr_util.py ... generic sr_transport imported here...
# generalized download...
[docs]
def download(self, msg, options) -> bool:
"""
download/transfer one file based on message, return True if successful, otherwise False.
"""
self.o = options
if 'retrievePath' in msg:
logger.debug("%s_transport download override retrievePath=%s" % (self.scheme, msg['retrievePath']))
remote_file = msg['retrievePath']
cdir = None
if msg['relPath'][0] == '/' or msg['baseUrl'][-1] == '/':
urlstr = msg['baseUrl'] + msg['relPath']
else:
urlstr = msg['baseUrl'] + '/' + msg['relPath']
else:
logger.debug("%s_transport download relPath=%s" % (self.scheme, msg['relPath']))
# split the path to the file and the file
# if relPath is just the file remote_path will return empty
remote_path, remote_file = os.path.split(msg['relPath'])
u = sarracenia.baseUrlParse(msg['baseUrl'])
logger.debug( f"baseUrl.path= {u.path} ")
if remote_path:
if u.path:
if ( u.path[-1] != '/' ) and ( remote_path[0] != '/' ) :
remote_path = u.path + '/' + remote_path
else:
remote_path = u.path + remote_path
cdir = remote_path
else:
if u.path:
cdir=u.path
else:
cdir=None
if msg['relPath'][0] == '/' or msg['baseUrl'][-1] == '/':
urlstr = msg['baseUrl'] + msg['relPath']
else:
urlstr = msg['baseUrl'] + '/' + msg['relPath']
istr =msg['identity'] if ('identity' in msg) else "None"
fostr = msg['fileOp'] if ('fileOp' in msg ) else "None"
logger.debug( 'identity: %s, fileOp: %s' % ( istr, fostr ) )
new_inflight_path = ''
new_dir = msg['new_dir']
new_file = msg['new_file']
new_inflight_path = None
if 'blocks' in msg:
if msg['blocks']['method'] in [ 'inplace' ]: # download only a specific block from a file, not the whole thing.
logger.debug( f"splitting 1 file into {len(msg['blocks']['manifest'])} block messages." )
blkno = msg['blocks']['number']
blksz_l = sarracenia.naturalSize(msg['blocks']['size']).split()
blksz = blksz_l[0]+blksz_l[1][0].lower()
if not '§block_' in new_file:
new_file += f"§block_{blkno:04d},{blksz}_§"
msg['new_file'] = new_file
if options.inflight == None:
new_inflight_path = new_file
elif type(options.inflight) == str:
if options.inflight == '.':
new_inflight_path = '.' + new_file
elif ( options.inflight[-1] == '/' ) or (options.inflight[0] == '/'):
new_inflight_path = options.inflight + new_file
elif options.inflight[0] == '.':
new_inflight_path = new_file + options.inflight
else:
logger.error('inflight setting: %s, not for downloads.' %
options.inflight)
if new_inflight_path:
msg['new_inflight_path'] = new_inflight_path
msg['_deleteOnPost'] |= set(['new_inflight_path'])
if 'download' in self.plugins and len(self.plugins['download']) > 0:
for plugin in self.plugins['download']:
try:
ok = plugin(msg)
except Exception as ex:
logger.error( f'flowCallback plugin {plugin} crashed: {ex}' )
logger.debug( "details:", exc_info=True )
if not ok: return False
return True
if self.o.dry_run:
curdir = new_dir
else:
try:
curdir = os.getcwd()
except:
curdir = None
if curdir != new_dir:
# make sure directory exists, create it if not
try:
if not os.path.isdir(new_dir):
self.worklist.directories_ok.append(new_dir)
os.makedirs(new_dir, 0o775, True)
os.chdir(new_dir)
logger.debug( f"local cd to {new_dir}")
except Exception as ex:
logger.warning("making %s: %s" % (new_dir, ex))
logger.debug('Exception details:', exc_info=True)
return False
try:
options.sendTo = msg['baseUrl']
if (not (self.scheme in self.proto)) or (self.proto[self.scheme] is None):
self.proto[self.scheme] = sarracenia.transfer.Transfer.factory(self.scheme, self.o)
self.metrics['flow']['transferConnected'] = True
self.metrics['flow']['transferConnectStart'] = time.time()
if (not self.o.dry_run) and not self.proto[self.scheme].check_is_connected():
if self.metrics['flow']['transferConnected']:
now=nowflt()
self.metrics['flow']['transferConnectTime'] += now - self.metrics['flow']['transferConnectStart']
self.metrics['flow']['transferConnectStart'] = 0
self.metrics['flow']['transferConnected'] = False
logger.debug("%s_transport download connects" % self.scheme)
ok = self.proto[self.scheme].connect()
if not ok:
self.proto[self.scheme] = None
return False
self.metrics['flow']['transferConnected'] = True
self.metrics['flow']['transferConnectStart'] = time.time()
logger.debug('connected')
#=================================
# if parts, check that the protol supports it
#=================================
#if not hasattr(proto,'seek') and ('blocks' in msg) and ( msg['blocks']['method'] == 'inplace' ):
# logger.error("%s, inplace part file not supported" % self.scheme)
# return False
cwd = None
if (not self.o.dry_run) and hasattr(self.proto[self.scheme], 'getcwd'):
cwd = self.proto[self.scheme].getcwd()
logger.debug( f" from proto getcwd: {cwd} ")
if cdir and cwd != cdir:
logger.debug("%s_transport remote cd to %s" % (self.scheme, cdir))
if self.o.dry_run:
cwd = cdir
else:
try:
self.proto[self.scheme].cd(cdir)
except Exception as ex:
logger.error("chdir %s: %s" % (cdir, ex))
return False
remote_offset = 0
exactLength=False
if ('blocks' in msg) and (msg['blocks']['method'] == 'inplace'):
blkno=msg['blocks']['number']
remote_offset=0
exactLength=True
while blkno > 0:
blkno -= 1
remote_offset += msg['blocks']['manifest'][blkno]['size']
block_length=msg['blocks']['manifest'][msg['blocks']['number']]['size']
logger.info( f"offset calculation: start={remote_offset} count={block_length}" )
elif 'size' in msg:
block_length = msg['size']
else:
block_length = 0
#download file
logger.debug(
'Beginning fetch of %s %d-%d into %s %d-%d' %
(urlstr, remote_offset, block_length-1, new_inflight_path, msg['local_offset'],
msg['local_offset'] + block_length - 1))
# FIXME locking for i parts in temporary file ... should stay lock
# and file_reassemble... take into account the locking
if self.o.identity_method.startswith('cod,'):
download_algo = self.o.identity_method[4:]
elif 'identity' in msg:
download_algo = msg['identity']['method']
else:
download_algo = None
if download_algo:
self.proto[self.scheme].set_sumalgo(download_algo)
if download_algo == 'arbitrary':
self.proto[self.scheme].set_sumArbitrary(
msg['identity']['value'])
if (type(options.inflight) == str) \
and (options.inflight[0] == '/' or options.inflight[-1] == '/') \
and not os.path.exists(options.inflight):
try:
if not self.o.dry_run:
os.mkdir(options.inflight)
os.chmod(options.inflight, options.permDirDefault)
except:
logger.error('unable to make inflight directory %s/%s' %
(msg['new_dir'], options.inflight))
logger.debug('Exception details: ', exc_info=True)
logger.debug( "hasAccel=%s, thresh=%d, len=%d, remote_off=%d, local_off=%d inflight=%s" % \
( hasattr( self.proto[self.scheme], 'getAccelerated' ), \
self.o.accelThreshold, block_length, remote_offset, msg['local_offset'], new_inflight_path ) )
accelerated = hasattr( self.proto[self.scheme], 'getAccelerated') and \
(self.o.accelThreshold > 0 ) and (block_length > self.o.accelThreshold) and \
(remote_offset == 0) and ( msg['local_offset'] == 0)
if not self.o.dry_run:
try:
if accelerated:
len_written = self.proto[self.scheme].getAccelerated(
msg, remote_file, new_inflight_path, block_length, remote_offset, exactLength)
#FIXME: no onfly_checksum calculation during download.
else:
self.proto[self.scheme].set_path(new_inflight_path)
len_written = self.proto[self.scheme].get(
msg, remote_file, new_inflight_path, remote_offset,
msg['local_offset'], block_length, exactLength)
except Exception as ex:
logger.error( f"could not get {remote_file}: {ex}" )
return False
else:
len_written = block_length
if ('blocks' in msg) and (msg['blocks']['method'] == 'inplace'):
msg['blocks']['method'] = 'separate'
if (len_written == block_length):
if not self.o.dry_run:
if accelerated:
self.proto[self.scheme].update_file(new_inflight_path)
elif len_written < 0:
logger.error("failed to download %s" % new_file)
if (self.o.inflight != None) and os.path.isfile(new_inflight_path):
os.remove(new_inflight_path)
return False
else:
if block_length == 0:
if self.o.acceptSizeWrong:
logger.debug(
'AcceptSizeWrong %d of with no length given for %s assuming ok'
% (len_written, new_inflight_path))
else:
logger.warning(
'downloaded %d of with no length given for %s assuming ok'
% (len_written, new_inflight_path))
else:
if self.o.acceptSizeWrong:
logger.debug(
'AcceptSizeWrong download size mismatch, received %d of expected %d bytes for %s'
% (len_written, block_length, new_inflight_path))
else:
if len_written > block_length:
logger.error( f'download more {len_written} than expected {block_length} bytes for {new_inflight_path}' )
else:
logger.error( f'incomplete download only {len_written} of expected {block_length} bytes for {new_inflight_path}' )
if (self.o.inflight != None) and os.path.isfile(new_inflight_path):
os.remove(new_inflight_path)
return False
# when len_written is different than block_length
msg['size'] = len_written
# if we haven't returned False by this point, assuming download was successful
if (new_inflight_path != new_file):
if os.path.isfile(new_file):
os.remove(new_file)
os.rename(new_inflight_path, new_file)
# older versions don't include the contentType, so patch it here.
if features['filetypes']['present'] and 'contentType' not in msg:
msg['contentType'] = magic.from_file(new_file,mime=True)
self.metrics['flow']['transferRxBytes'] += len_written
self.metrics['flow']['transferRxFiles'] += 1
if download_algo and not self.o.dry_run:
msg['onfly_checksum'] = self.proto[self.scheme].get_sumstr()
msg['data_checksum'] = self.proto[self.scheme].data_checksum
if self.o.identity_method.startswith('cod,') and not accelerated:
msg['identity'] = msg['onfly_checksum']
msg['_deleteOnPost'] |= set(['onfly_checksum'])
msg['_deleteOnPost'] |= set(['data_checksum'])
# fix message if no partflg (means file size unknown until now)
#if not 'blocks' in msg:
# #msg['size'] = self.proto[self.scheme].fpos ... fpos not set when accelerated.
# fix permission
if not self.o.dry_run:
self.set_local_file_attributes(new_file, msg)
if options.delete and hasattr(self.proto[self.scheme], 'delete'):
try:
if not self.o.dry_run:
self.proto[self.scheme].delete(remote_file)
logger.debug('file deleted on remote site %s' %
remote_file)
except Exception as ex:
logger.error( f'unable to delete remote file {remote_file}: {ex}' )
logger.debug('Exception details: ', exc_info=True)
if (self.o.acceptSizeWrong or (block_length == 0)) and (len_written > 0):
return True
if (len_written != block_length):
return False
except Exception as ex:
logger.debug('Exception details: ', exc_info=True)
logger.warning("failed to write %s: %s" % (new_inflight_path, ex))
#closing on problem
if not self.o.dry_run:
try:
self.proto[self.scheme].close()
except:
logger.debug('closing exception details: ', exc_info=True)
self.metrics['flow']["transferConnected"] = False
if 'transferConnectLast' in self.metrics['flow']:
self.metrics['flow']['transferConnectedTime'] = time.time() - self.metrics['flow']['transferConnectLast']
else:
self.metrics['flow']['transferConnectedTime'] = 0
self.cdir = None
self.proto[self.scheme] = None
if (not self.o.dry_run) and os.path.isfile(new_inflight_path):
os.remove(new_inflight_path)
return False
return True
# generalized send...
def send(self, msg, options):
self.o = options
logger.debug( f"{self.scheme}_transport sendTo: {self.o.sendTo}" )
logger.debug("%s_transport send %s %s" %
(self.scheme, msg['new_dir'], msg['new_file']))
if len(self.plugins['send']) > 0:
for plugin in self.plugins['send']:
try:
ok = plugin(msg)
except Exception as ex:
logger.error( f'flowCallback plugin {plugin} crashed: {ex}' )
logger.debug( "details:", exc_info=True )
if not ok: return False
return True
if self.o.baseDir:
local_path = self.o.variableExpansion(self.o.baseDir,
msg) + '/' + msg['relPath']
else:
local_path = '/' + msg['relPath']
# older versions don't include the contentType, so patch it here.
if features['filetypes']['present'] and \
('contentType' not in msg) and (not 'fileOp' in msg):
msg['contentType'] = magic.from_file(local_path,mime=True)
local_dir = os.path.dirname(local_path).replace('\\', '/')
local_file = os.path.basename(local_path).replace('\\', '/')
new_dir = msg['new_dir'].replace('\\', '/')
new_file = msg['new_file'].replace('\\', '/')
new_inflight_path = None
try:
curdir = os.getcwd()
except:
curdir = None
if (curdir != local_dir) and not self.o.dry_run:
try:
os.chdir(local_dir)
except Exception as ex:
logger.error("could not chdir to %s to write: %s" % (local_dir, ex))
return False
try:
if not self.o.dry_run:
if (not (self.scheme in self.proto)) or \
(self.proto[self.scheme] is None) or not self.proto[self.scheme].check_is_connected():
logger.debug("%s_transport send connects" % self.scheme)
if self.metrics['flow']['transferConnected']:
now = nowflt()
self.metrics['flow']['transferConnectTime'] += now - self.metrics['flow']['transferConnectStart']
self.metrics['flow']['transferConnectStart'] = 0
self.metrics['flow']['transferConnected'] = False
self.proto[self.scheme] = sarracenia.transfer.Transfer.factory( self.scheme, options)
ok = self.proto[self.scheme].connect()
if not ok: return False
self.cdir = None
self.metrics['flow']['transferConnected'] = True
self.metrics['flow']['transferConnectStart'] = time.time()
elif not (self.scheme in self.proto) or self.proto[self.scheme] is None:
logger.debug("dry_run %s_transport send connects" % self.scheme)
self.proto[self.scheme] = sarracenia.transfer.Transfer.factory( self.scheme, options)
self.cdir = None
self.metrics['flow']['transferConnected'] = True
self.metrics['flow']['transferConnectStart'] = time.time()
#=================================
# if parts, check that the protocol supports it
#=================================
if not self.o.dry_run and not hasattr(self.proto[self.scheme],
'seek') and ('blocks' in msg) and (
msg['blocks']['method'] == 'inplace'):
logger.error("%s, inplace part file not supported" %
self.scheme)
return False
#=================================
# if umask, check that the protocol supports it ...
#=================================
inflight = options.inflight
if not hasattr(self.proto[self.scheme],
'umask') and options.inflight == 'umask':
logger.warning("%s, umask not supported" % self.scheme)
inflight = None
#=================================
# if renaming used, check that the protocol supports it ...
#=================================
if not hasattr(self.proto[self.scheme],
'rename') and options.inflight.startswith('.'):
logger.warning("%s, rename not supported" % self.scheme)
inflight = None
#=================================
# remote set to new_dir
#=================================
cwd = None
if hasattr(self.proto[self.scheme], 'getcwd'):
if not self.o.dry_run:
try:
cwd = self.proto[self.scheme].getcwd()
except Exception as ex:
logger.error( f"could not getcwd: {ex}" )
return False
if cwd != new_dir:
logger.debug("%s_transport send cd to %s" %
(self.scheme, new_dir))
if not self.o.dry_run:
try:
self.proto[self.scheme].cd_forced(775, new_dir)
except Exception as ex:
logger.error( f"could not chdir to {new_dir}: {ex}" )
return False
#=================================
# delete event
#=================================
if 'fileOp' in msg:
if 'remove' in msg['fileOp'] :
if hasattr(self.proto[self.scheme], 'delete'):
logger.debug("message is to remove %s" % new_file)
if not self.o.dry_run:
if 'directory' in msg['fileOp']:
try:
self.proto[self.scheme].rmdir(new_file)
except Exception as ex:
logger.error( f"could not rmdir {new_file}: {ex}" )
return False
else:
try:
self.proto[self.scheme].delete(new_file)
except Exception as ex:
logger.error( f"could not delete {new_file}: {ex}" )
return False
msg.setReport(201, f'file or directory removed')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
return True
logger.error("%s, delete not supported" % self.scheme)
return False
if 'rename' in msg['fileOp'] :
if hasattr(self.proto[self.scheme], 'delete'):
logger.debug( f"message is to rename {msg['fileOp']['rename']} to {new_file}" )
if not self.o.dry_run:
try:
self.proto[self.scheme].rename(msg['fileOp']['rename'], new_file)
except Exception as ex:
logger.error( f"could not rename {new_file}: {ex}" )
return False
msg.setReport(201, f'file renamed')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
return True
logger.error("%s, delete not supported" % self.scheme)
return False
if 'directory' in msg['fileOp'] :
if 'contentType' not in msg:
msg['contentType'] = 'text/directory'
if hasattr(self.proto[self.scheme], 'mkdir'):
logger.debug( f"message is to mkdir {new_file}")
if not self.o.dry_run:
try:
self.proto[self.scheme].mkdir(new_file)
except Exception as ex:
logger.error( f"could not mkdir {new_file}: {ex}" )
return False
msg.setReport(201, f'directory created')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
return True
logger.error("%s, mkdir not supported" % self.scheme)
return False
#=================================
# link event
#=================================
if 'hlink' in msg['fileOp']:
if 'contentType' not in msg:
msg['contentType'] = 'text/link'
if hasattr(self.proto[self.scheme], 'link'):
logger.debug("message is to link %s to: %s" % (new_file, msg['fileOp']['hlink']))
if not self.o.dry_run:
try:
self.proto[self.scheme].link(msg['fileOp']['hlink'], new_file)
except Exception as ex:
logger.error( f"could not link {new_file}: {ex}" )
return False
return True
logger.error("%s, hardlinks not supported" % self.scheme)
return False
elif 'link' in msg['fileOp']:
if 'contentType' not in msg:
msg['contentType'] = 'text/link'
if hasattr(self.proto[self.scheme], 'symlink'):
logger.debug("message is to link %s to: %s" % (new_file, msg['fileOp']['link']))
if not self.o.dry_run:
try:
self.proto[self.scheme].symlink(msg['fileOp']['link'], new_file)
except Exception as ex:
logger.error( f"could not symlink {new_file}: {ex}" )
return False
msg.setReport(201, f'file linked')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
return True
logger.error("%s, symlink not supported" % self.scheme)
return False
#=================================
# send event
#=================================
# the file does not exist... warn, sleep and return false for the next attempt
if not os.path.exists(local_file):
logger.warning(
"product collision or base_dir not set, file %s does not exist"
% local_file)
time.sleep(0.01)
return False
offset = 0
if ('blocks' in msg) and (msg['blocks']['method'] == 'inplace'):
offset = msg['offset']
new_offset = msg['local_offset']
if 'size' in msg:
block_length = msg['size']
str_range = ''
if ('blocks' in msg) and (
msg['blocks']['method'] == 'inplace'):
block_length = msg['blocks']['size']
str_range = 'bytes=%d-%d' % (new_offset, new_offset +
block_length - 1)
str_range = ''
if ('blocks' in msg) and (msg['blocks']['method'] == 'inplace'):
str_range = 'bytes=%d-%d' % (offset, offset + msg['size'] - 1)
#upload file
logger.debug( "hasattr=%s, thresh=%d, len=%d, remote_off=%d, local_off=%d " % \
( hasattr( self.proto[self.scheme], 'putAccelerated'), \
self.o.accelThreshold, block_length, new_offset, msg['local_offset'] ) )
accelerated = hasattr( self.proto[self.scheme], 'putAccelerated') and \
(self.o.accelThreshold > 0 ) and (block_length > self.o.accelThreshold) and \
(new_offset == 0) and ( msg['local_offset'] == 0)
if inflight == None or (('blocks' in msg) and
(msg['blocks']['method'] != 'inplace')):
try:
if not self.o.dry_run:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated( msg, local_file, new_file)
else:
len_written = self.proto[self.scheme].put( msg, local_file, new_file)
except Exception as ex:
logger.error( f"could not send inflight=None {new_file}: {ex}" )
return False
elif (('blocks' in msg)
and (msg['blocks']['method'] == 'inplace')):
if not self.o.dry_run:
try:
self.proto[self.scheme].put(msg, local_file, new_file, offset,
new_offset, msg['size'])
except Exception as ex:
logger.error( f"could not send inplace {new_file}: {ex}" )
return False
elif inflight == '.':
new_inflight_path = '.' + new_file
if not self.o.dry_run:
try:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_inflight_path)
else:
len_written = self.proto[self.scheme].put(
msg, local_file, new_inflight_path)
except Exception as ex:
logger.error( f"could not send inflight={inflight} {new_file}: {ex}" )
return False
try:
self.proto[self.scheme].rename(new_inflight_path, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {new_file}: {ex}" )
return False
else:
len_written = msg['size']
elif inflight[0] == '.':
new_inflight_path = new_file + inflight
if not self.o.dry_run:
try:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_inflight_path)
else:
len_written = self.proto[self.scheme].put(msg, local_file, new_inflight_path)
except Exception as ex:
logger.error( f"could not send inflight={inflight} {new_file}: {ex}" )
return False
try:
self.proto[self.scheme].rename(new_inflight_path, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {new_file}: {ex}" )
return False
elif options.inflight[-1] == '/':
if not self.o.dry_run:
try:
self.proto[self.scheme].cd_forced(
775, new_dir + '/' + options.inflight)
self.proto[self.scheme].cd_forced(775, new_dir)
except:
pass
new_inflight_path = options.inflight + new_file
if not self.o.dry_run:
try:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_inflight_path)
else:
len_written = self.proto[self.scheme].put(
msg, local_file, new_inflight_path)
except Exception as ex:
logger.error( f"could not send inflight={inflight} {new_file}: {ex}" )
return False
try:
self.proto[self.scheme].rename(new_inflight_path, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {new_file}: {ex}" )
return False
else:
len_written = msg['size']
elif inflight == 'umask':
if not self.o.dry_run:
self.proto[self.scheme].umask()
try:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_file)
else:
len_written = self.proto[self.scheme].put(
msg, local_file, new_file)
except Exception as ex:
logger.error( f"could not send inflight={inflight} {new_file}: {ex}" )
return False
try:
self.proto[self.scheme].put(msg, local_file, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {new_file}: {ex}" )
return False
else:
len_written = msg['size']
msg.setReport(201, 'file sent')
self.metrics['flow']['transferTxBytes'] += len_written
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
# fix permission
if not self.o.dry_run:
self.set_remote_file_attributes(self.proto[self.scheme], new_file,
msg)
logger.info('Sent: %s %s into %s/%s %d-%d' %
(local_path, str_range, new_dir, new_file, offset,
offset + msg['size'] - 1))
return True
except Exception as err:
#removing lock if left over
if new_inflight_path != None and hasattr(self.proto[self.scheme],
'delete'):
if not self.o.dry_run:
try:
self.proto[self.scheme].delete(new_inflight_path)
except:
pass
#closing on problem
if not self.o.dry_run:
try:
self.proto[self.scheme].close()
except:
pass
now = nowflt()
self.metrics['flow']['transferConnectTime'] += now - self.metrics['flow']['transferConnectStart']
self.metrics['flow']['transferConnectStart']=0
self.metrics['flow']['transferConnected']=False
self.cdir = None
self.proto[self.scheme] = None
logger.error("Delivery failed %s" % msg['new_dir'] + '/' +
msg['new_file'])
logger.debug('Exception details: ', exc_info=True)
return False
# set_local_file_attributes
[docs]
def set_local_file_attributes(self, local_file, msg):
"""
after a file has been written, restore permissions and ownership if necessary.
"""
logger.debug("%s" % local_file)
# if the file is not partitioned, the the onfly_checksum is for the whole file.
# cache it here, along with the mtime.
if ('blocks' in msg) and sarracenia.features['reassembly']['present']:
with sarracenia.blockmanifest.BlockManifest(local_file) as y:
y.set( msg['blocks'] )
x = sarracenia.filemetadata.FileMetadata(local_file)
# FIXME ... what to do when checksums don't match?
if 'onfly_checksum' in msg:
x.set( 'identity', msg['onfly_checksum'] )
elif 'identity' in msg:
x.set('identity', msg['identity'] )
if self.o.timeCopy and 'mtime' in msg and msg['mtime']:
x.set('mtime', msg['mtime'])
else:
x.set('mtime', sarracenia.timeflt2str(os.path.getmtime(local_file)))
x.persist()
mode = 0
if self.o.permCopy and 'mode' in msg:
try:
mode = int(msg['mode'], base=8)
except:
mode = 0
if mode > 0:
os.chmod(local_file, mode)
if mode == 0 and self.o.permDefault != 0:
os.chmod(local_file, self.o.permDefault)
if self.o.timeCopy and 'mtime' in msg and msg['mtime']:
mtime = sarracenia.timestr2flt(msg['mtime'])
atime = mtime
if 'atime' in msg and msg['atime']:
atime = sarracenia.timestr2flt(msg['atime'])
os.utime(local_file, (atime, mtime))
# set_remote_file_attributes
def set_remote_file_attributes(self, proto, remote_file, msg):
#logger.debug("sr_transport set_remote_file_attributes %s" % remote_file)
if hasattr(proto, 'chmod'):
mode = 0
if self.o.permCopy and 'mode' in msg:
try:
mode = int(msg['mode'], base=8)
except:
mode = 0
if mode > 0:
try:
proto.chmod(mode, remote_file)
except:
pass
if mode == 0 and self.o.permDefault != 0:
try:
proto.chmod(self.o.permDefault, remote_file)
except:
pass
if hasattr(proto, 'chmod'):
if self.o.timeCopy and 'mtime' in msg and msg['mtime']:
mtime = sarracenia.timestr2flt(msg['mtime'])
atime = mtime
if 'atime' in msg and msg['atime']:
atime = sarracenia.timestr2flt(msg['atime'])
try:
proto.utime(remote_file, (atime, mtime))
except:
pass
# v2 sr_util sr_transport stuff. end.
# imported from v2: sr_sender/doit_send
[docs]
def do_send(self):
"""
"""
if not self.o.download:
self.worklist.ok = self.worklist.incoming
self.worklist.incoming = []
return
for msg in self.worklist.incoming:
# weed out non-file transfer operations that are configured to not be done.
if 'fileOp' in msg:
if ('directory' in msg['fileOp']) and ('remove' in msg['fileOp']) and ( 'rmdir' not in self.o.fileEvents ):
self.reject(msg, 202, "skipping rmdir here." )
continue
elif ('remove' in msg['fileOp']) and ( 'delete' not in self.o.fileEvents ):
self.reject(msg, 202, "skipping delete here." )
continue
if ('directory' in msg['fileOp']) and ( 'mkdir' not in self.o.fileEvents ):
self.reject(msg, 202, "skipping mkdir here." )
continue
if ('hlink' in msg['fileOp']) and ( 'link' not in self.o.fileEvents ):
self.reject(msg, 202, "skipping hlink here." )
continue
if ('link' in msg['fileOp']) and ( 'link' not in self.o.fileEvents ):
self.reject(msg, 202, "skipping link here." )
continue
#=================================
# proceed to send : has to work
#=================================
# N attempts to send
i = 1
while i <= self.o.attempts:
if i != 1:
logger.warning("sending again, attempt %d" % i)
ok = self.send(msg, self.o)
if ok:
self.worklist.ok.append(msg)
break
i = i + 1
if not ok:
self.worklist.failed.append(msg)
self.worklist.incoming = []
import sarracenia.flow.poll
import sarracenia.flow.post
import sarracenia.flow.report
import sarracenia.flow.sarra
import sarracenia.flow.sender
import sarracenia.flow.shovel
import sarracenia.flow.subscribe
import sarracenia.flow.watch
import sarracenia.flow.winnow