# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, Environment Canada, 2008-2015
#
# Sarracenia repository: https://github.com/MetPX/sarracenia
# Documentation: https://github.com/MetPX/sarracenia
#
########################################################################
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#
import calendar, datetime
from hashlib import md5
from hashlib import sha512
import humanize
import logging
import os
import random
import signal
import stat
import sys
import time
import urllib
import urllib.parse
#from sarracenia.sr_xattr import *
from sarracenia import nowflt, timestr2flt
from sarracenia.featuredetection import features

# explicit import: set_sumalgo() below refers to sarracenia.identity.Identity
import sarracenia.identity

logger = logging.getLogger(__name__)

#============================================================
# sigalarm
#============================================================
class TimeoutException(Exception):
    """ timeout exception """
    pass


# alarm_cancel
def alarm_cancel():
    if sys.platform != 'win32':
        signal.alarm(0)


# alarm_raise
def alarm_raise(n, f):
    raise TimeoutException("signal alarm timed out")


# alarm_set
def alarm_set(time):
    """
    FIXME: replace with signal.setitimer() for sub-second resolution...
           currently rounding to the nearest second.
    """
    if sys.platform != 'win32':
        signal.signal(signal.SIGALRM, alarm_raise)
        signal.alarm(int(time + 0.5))
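
# a minimal usage sketch (illustrative only, not part of this module's API):
# bracket a blocking call with the alarm pair so a stalled operation raises
# TimeoutException instead of hanging forever. no effect on win32.
#
#    try:
#        alarm_set(30)                  # TimeoutException after ~30 seconds
#        data = some_blocking_read()    # hypothetical blocking operation
#    except TimeoutException:
#        logger.warning("read timed out")
#    finally:
#        alarm_cancel()                 # always clear any pending alarm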


# =========================================
# sr_proto : one place for throttle, on-the-fly checksum, buffered i/o and timeouts
# =========================================


class Transfer():
"""
This is a sort of abstract base class for implementing transfer protocols.
Implemented subclasses include support for: local files, https, sftp, and ftp.
This class has routines that do i/o given descriptors opened by the sub-classes,
so that each one does not need to re-implement copying, for example.
Each subclass needs to implement the following routines:
if downloading::
get ( msg, remote_file, local_file, remote_offset=0, local_offset=0, length=0 )
getAccellerated( msg, remote_file, local_file, length )
ls ()
cd (dir)
delete (path)
if sending::
put ( msg, remote_file, local_file, remote_offset=0, local_offset=0, length=0 )
putAccelerated ( msg, remote_file, local_file, length=0 )
cd (dir)
mkdir (dir)
umask ()
chmod (perm)
rename (old,new)

    Note that the ls() call returns are polymorphic. One of:

    * a dictionary where the key is the name of the file in the directory,
      and the value is a paramiko SFTPAttributes structure for that file.
      (sftp.py as an example.)
    * a dictionary where the key is the name of the file, and the value is a string
      that looks like the output of a linux ls command.
      (ftp.py as an example.)
    * a sequence of bytes... will be parsed as an html page.
      (https.py as an example.)

    The first format is the vastly preferred one. The others are fallbacks when the
    first is not available. The flowcb/poll/__init__.py lsdir() routine tries to
    transform any of these return values into the first form (a dictionary of
    SFTPAttributes.) Each SFTPAttributes structure needs st_mode set, and folders
    need stat.S_IFDIR set.

    If the lsdir() routine gets a sequence of bytes, the on_html_page() and
    on_html_parser_init() (or perhaps the handle_starttag(..) and handle_data()
    routines) will be used to turn them into the first form.

    Web services with other such formats can be accommodated by subclassing and
    overriding the handle_* entry points.
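
    As a sketch, the preferred ls() return form looks like the following (the
    file name and values here are illustrative only)::

        import paramiko

        entry = paramiko.SFTPAttributes()
        entry.st_mode = 0o100644   # a plain file; a folder would include stat.S_IFDIR
        entry.st_size = 1024
        listing = {'observations.grib2': entry}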

    This class uses options (a sarracenia.config.Config data structure passed to the
    constructor/factory):

    * credentials - used to look up authentication information.
    * sendTo - server to connect to.
    * batch - how many files to transfer before a connection is torn down and re-established.
    * permDefault - what permissions to set on files transferred.
    * permDirDefault - what permissions to set on directories created.
    * timeout - how long to wait for operations to complete.
    * byteRateMax - maximum transfer rate (throttle to avoid exceeding.)
    * bufSize - size of buffers for file transfers.
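
    A minimal construction sketch, assuming an options object built elsewhere
    (default_config() is just one illustrative way of obtaining one)::

        import sarracenia.config
        import sarracenia.transfer

        options = sarracenia.config.default_config()
        transfer = sarracenia.transfer.Transfer.factory('https', options)
        if transfer is None:
            logger.error("no Transfer subclass registered for that protocol")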
"""

    @staticmethod
    def factory(proto, options) -> 'Transfer':

        for sc in Transfer.__subclasses__():
            if (hasattr(sc, 'registered_as')
                    and (proto in sc.registered_as())):
                return sc(proto, options)
        return None

    def __init__(self, proto, options):

        self.o = options

        # honour a per-class logLevel override from settings, falling back to
        # the global logLevel option.
        if 'sarracenia.transfer.Transfer' in self.o.settings and \
           'logLevel' in self.o.settings['sarracenia.transfer.Transfer']:
            ll = self.o.settings['sarracenia.transfer.Transfer']['logLevel']
        else:
            ll = self.o.logLevel
        logger.setLevel(getattr(logging, ll.upper()))

        logger.debug("class=%s , subclasses=%s" %
                     (type(self).__name__, Transfer.__subclasses__()))

        self.init()

    def init(self):
        self.sumalgo = None
        self.checksum = None
        self.data_sumalgo = None
        self.data_checksum = None
        self.fpos = 0
        self.tbytes = 0
        self.tbegin = nowflt()
        self.lastLog = self.tbegin
        self.byteRate = 0
        self.logMinimumInterval = 60
        #if hasattr(self.o, 'sanity_log_dead'):
        #    self.logMinimumInterval = self.o.runStateThreshold_hung/4
        #else:
        #    self.logMinimumInterval = 30

    def logProgress(self, sz):
        """
        if there hasn't been a log message in at least logMinimumInterval seconds,
        then put out a message, so the sanity check does not think the process is dead.
        this should print out a message once in a while during long file transfers.
        """
        now = nowflt()
        if now - self.lastLog > self.logMinimumInterval:
            logger.info(f"{humanize.naturalsize(sz, binary=True)} written so far.")
            self.lastLog = now
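
    # e.g. humanize.naturalsize(1048576, binary=True) returns "1.0 MiB".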

    def local_read_close(self, src):
        #logger.debug("sr_proto local_read_close")
        src.close()

        # finalize checksum
        if self.sumalgo: self.checksum = self.sumalgo.value
        if self.data_sumalgo:
            self.data_checksum = self.data_sumalgo.value

    def local_read_open(self, local_file, local_offset=0):
        logger.debug("sr_proto local_read_open getcwd=%s self.cwd=%s" %
                     (os.getcwd(), self.getcwd()))

        self.checksum = None

        # local_file opening and seeking if needed
        src = open(local_file, 'rb')
        if local_offset != 0: src.seek(local_offset, 0)

        # initialize sumalgo
        if self.sumalgo: self.sumalgo.set_path(local_file)
        if self.data_sumalgo: self.data_sumalgo.set_path(local_file)

        return src

    def local_write_close(self, dst):
        # flush and sync (make sure all i/o is done)
        dst.flush()
        os.fsync(dst)

        # remember the current position, truncate (no sparse file), close
        self.fpos = dst.tell()
        dst.truncate()
        dst.close()

        # finalize checksum
        if self.sumalgo: self.checksum = self.sumalgo.value
        if self.data_sumalgo:
            self.data_checksum = self.data_sumalgo.value

    def local_write_open(self, local_file, local_offset=0):
        #logger.debug("sr_proto local_write_open")

        # reset checksum, fpos
        self.checksum = None
        self.fpos = 0

        # local_file has to exist before opening it read/write
        if not os.path.isfile(local_file):
            dst = open(local_file, 'w')
            dst.close()

        # open read/write binary so a non-zero local_offset can resume a
        # partial file without truncating it, seeking if needed.
        dst = open(local_file, 'r+b')
        if local_offset != 0: dst.seek(local_offset, 0)

        return dst

    def on_data(self, chunk) -> bytes:
        """
        transform data as it is being read.
        Given a buffer, return the transformed buffer.
        Checksum calculation is based on the pre-transformation data... likely need
        a post-transformation value as well.
        """
        return chunk

    #FIXME ... need to re-enable on_data plugins... not sure how they should work.
    #          sub-classing of transfer class?
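
    # a sketch of how a transformation could be hooked in by subclassing
    # (the class below is illustrative only, it is not part of sarracenia):
    #
    #    class UppercaseHttps(sarracenia.transfer.https.Https):
    #        """hypothetical subclass that upper-cases text as it is copied."""
    #
    #        def on_data(self, chunk) -> bytes:
    #            return chunk.upper()
    #
    # note that read_write() below feeds self.sumalgo.update() the chunk as
    # read, before transformation, so checksums reflect the source data.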

    def read_write(self, src, dst, length=0):
        logger.debug("sr_proto read_write")

        # reset speed
        rw_length = 0
        self.tbytes = 0.0
        self.tbegin = nowflt()
        self.lastLog = self.tbegin

        # length = 0 : transfer the entire remote file to the local file
        if length == 0:
            while True:
                if self.o.timeout: alarm_set(self.o.timeout)
                chunk = src.read(self.o.bufSize)
                if chunk:
                    new_chunk = self.on_data(chunk)
                    rw_length += len(new_chunk)
                    dst.write(new_chunk)
                    self.logProgress(rw_length)
                alarm_cancel()
                if not chunk: break
                if self.sumalgo: self.sumalgo.update(chunk)
                self.throttle(chunk)
            return rw_length

        # exact length to be transferred
        nc = int(length / self.o.bufSize)
        r = length % self.o.bufSize
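        # worked example (illustrative numbers): length=10000 with bufSize=4096
        # gives nc=2 full buffers, plus r=10000-2*4096=1808 bytes in a final read.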

        # read/write bufSize "nc" times
        i = 0
        while i < nc:
            if self.o.timeout: alarm_set(self.o.timeout)
            chunk = src.read(self.o.bufSize)
            if chunk:
                new_chunk = self.on_data(chunk)
                rw_length += len(new_chunk)
                dst.write(new_chunk)
                self.logProgress(rw_length)
            alarm_cancel()
            if not chunk: break
            if self.sumalgo: self.sumalgo.update(chunk)
            self.throttle(chunk)
            i = i + 1

        # remaining bytes (when length is not a multiple of bufSize)
        if r > 0:
            if self.o.timeout: alarm_set(self.o.timeout)
            chunk = src.read(r)
            if chunk:
                new_chunk = self.on_data(chunk)
                rw_length += len(new_chunk)
                dst.write(new_chunk)
                self.logProgress(rw_length)
            alarm_cancel()
            if self.sumalgo: self.sumalgo.update(chunk)
            self.throttle(chunk)

        return rw_length

    def read_writelocal(self,
                        src_path,
                        src,
                        local_file,
                        local_offset=0,
                        length=0,
                        exactLength=False):
        #logger.debug("sr_proto read_writelocal")

        # open
        dst = self.local_write_open(local_file, local_offset)

        # initialize sumalgo
        if self.sumalgo: self.sumalgo.set_path(src_path)
        if self.data_sumalgo: self.data_sumalgo.set_path(src_path)

        # copy the source to the local file.
        # 2022/12/02 - pas - need copies to always work...
        #    in the HPC mirroring case, with a lot of short files, the length is
        #    likely wrong in announcements. grab the whole file unconditionally
        #    for now, and detect errors using the mismatch.
        rw_length = self.read_write(src, dst, length if exactLength else 0)

        # close
        self.local_write_close(dst)

        # warn if the lengths mismatch without a transformation.
        # 2022/12/02 - pas - should see a lot of these messages in the HPC case from now on...
        if not self.o.acceptSizeWrong and length != 0 and rw_length != length:
            logger.debug(
                "util/writelocal mismatched file length writing %s. Message said to expect %d bytes. Got %d bytes."
                % (local_file, length, rw_length))

        return rw_length

    def readlocal_write(self, local_file, local_offset=0, length=0, dst=None):
        logger.debug("sr_proto readlocal_write")

        # open
        src = self.local_read_open(local_file, local_offset)

        # copy source to sendTo
        rw_length = self.read_write(src, dst, length)

        # close
        self.local_read_close(src)

        # warn if the lengths mismatch without a transformation.
        # FIXME: 2020/09 - commented out for now... unsure about this.
        #if (not self.o.on_data_list) and length != 0 and rw_length != length:
        #    logger.error("util/readlocal mismatched file length reading %s. Message announced it as %d bytes, but read %d bytes " % (local_file, length, rw_length))

        # 2022/12/02 - pas - attempting to get files that get shorter addressed.
        if ((length == 0) or (rw_length < length)) and hasattr(dst, 'truncate'):
            dst.truncate(rw_length)

        return rw_length

    def set_sumalgo(self, sumalgo):
        logger.debug("sr_proto set_sumalgo %s" % sumalgo)
        self.sumalgo = sarracenia.identity.Identity.factory(sumalgo)
        self.data_sumalgo = sarracenia.identity.Identity.factory(sumalgo)

    def set_sumArbitrary(self, value):
        self.sumalgo.value = value
        self.data_sumalgo.value = value

    def update_file(self, path):
        if self.sumalgo:
            self.sumalgo.update_file(path)
        if self.data_sumalgo:
            self.data_sumalgo.update_file(path)

    def set_path(self, path):
        if self.sumalgo:
            self.sumalgo.set_path(path)
        if self.data_sumalgo:
            self.data_sumalgo.set_path(path)

    def get_sumstr(self) -> dict:
        if self.sumalgo:
            #return { 'method': type(self.sumalgo).__name__, 'value': self.sumalgo.value }
            return {
                'method': self.sumalgo.get_method(),
                'value': self.sumalgo.value
            }
        else:
            return None

    def metricsReport(self):
        return {'byteRateInstant': self.byteRate}

    # throttle
    def throttle(self, buf):
        self.tbytes = self.tbytes + len(buf)
        rspan = nowflt() - self.tbegin
        if rspan > 0:
            self.byteRate = self.tbytes / rspan
        if hasattr(self.o, 'byteRateMax') and self.o.byteRateMax and self.o.byteRateMax > 0:
            # at byteRateMax, transferring tbytes should take at least "span"
            # seconds. if we are ahead of that pace, sleep for the difference.
            span = self.tbytes / self.o.byteRateMax
            if span > rspan:
                stime = span - rspan
                if stime > 10:
                    logger.info(f"exceeded byteRateMax: {self.o.byteRateMax} sleeping for {stime:.2f}")
                time.sleep(stime)
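
    # worked example (illustrative numbers): with byteRateMax=1000 bytes/second,
    # after tbytes=5000 bytes in rspan=2 seconds, span=5000/1000=5 seconds, so
    # throttle() sleeps span-rspan = 3 seconds to fall back under the cap.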

    # write_chunk
    def write_chunk(self, chunk):
        if self.chunk_iow: self.chunk_iow.write(chunk)
        self.rw_length += len(chunk)
        alarm_cancel()
        self.logProgress(self.rw_length)
        if self.sumalgo: self.sumalgo.update(chunk)
        self.throttle(chunk)
        if self.o.timeout: alarm_set(self.o.timeout)

    # write_chunk_end
    def write_chunk_end(self):
        alarm_cancel()
        self.chunk_iow = None
        return self.rw_length

    # write_chunk_init
    def write_chunk_init(self, proto):
        self.chunk_iow = proto
        self.tbytes = 0.0
        self.tbegin = nowflt()
        self.lastLog = self.tbegin
        self.rw_length = 0
        if self.o.timeout: alarm_set(self.o.timeout)
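
    # the write_chunk* trio supports protocol libraries that deliver data via a
    # callback rather than a readable descriptor. a sketch (retrbinary() is the
    # standard ftplib call; the surrounding calls are this class's own):
    #
    #    self.write_chunk_init(local_fileobj)    # set the i/o target
    #    ftp.retrbinary('RETR ' + remote_file, self.write_chunk)
    #    length = self.write_chunk_end()         # cancel alarms, get byte count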

    def gethttpsUrl(self, path):
        return None


# batteries included.

import sarracenia.transfer.file
import sarracenia.transfer.ftp
import sarracenia.transfer.https

if features['sftp']['present']:
    import sarracenia.transfer.sftp

if features['s3']['present']:
    import sarracenia.transfer.s3