#
# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, Environment Canada, 2008-2015
#
# Questions or bugs report: dps-client@ec.gc.ca
# Sarracenia repository: https://github.com/MetPX/sarracenia
# Documentation: https://github.com/MetPX/sarracenia
#
# __init__.py : contains version number of sarracenia
#
########################################################################
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#
from ._version import __version__
from base64 import b64decode, b64encode
import calendar
import datetime
import humanize
import importlib.util
import logging
import os
import os.path
import random
import re
from sarracenia.featuredetection import features
import stat as os_stat
import sys
import time
import types
import urllib
import urllib.parse
import urllib.request
logger = logging.getLogger(__name__)
def baseUrlParse( url ):
upr = urllib.parse.urlparse(url)
u = types.SimpleNamespace()
u.scheme = upr.scheme
u.netlog = upr.netloc
u.params = upr.params
u.query = upr.query
u.fragment = upr.fragment
u.path = upr.path
if u.scheme in [ 'sftp', 'file' ]:
while u.path.startswith('//'):
u.path = u.path[1:]
return u
if features['filetypes']['present']:
import magic
if features['mqtt']['present']:
import paho.mqtt.client
if not hasattr( paho.mqtt.client, 'MQTTv5' ):
# without v5 support, mqtt is not useful.
features['mqtt']['present'] = False
# if humanize is not present, compensate...
if features['humanize']['present']:
import humanize
def naturalSize( num ):
return humanize.naturalsize(num,binary=True)
def naturalTime( dur ):
return humanize.naturaltime(dur)
else:
def naturalSize( num ):
return "%g" % num
def naturalTime( dur ):
return "%g" % dur
if features['appdirs']['present']:
import appdirs
def site_config_dir( app, author ):
return appdirs.site_config_dir( app, author )
def user_config_dir( app, author ):
return appdirs.user_config_dir( app, author )
def user_cache_dir( app, author ):
return appdirs.user_cache_dir( app, author )
else:
# if appdirs is missing, pretend we're on Linux.
import pathlib
def site_config_dir( app, author ):
return '/etc/xdg/xdg-ubuntu-xorg/%s' % app
def user_config_dir( app, author ):
return str(pathlib.Path.home()) + '/.config/%s' % app
def user_cache_dir( app, author ):
return str(pathlib.Path.home()) + '/.cache/%s' % app
"""
end of extra feature scan.
"""
import sarracenia.filemetadata
[docs]
class Sarracenia:
"""
Core utilities of Sarracenia. The main class here is sarracenia.Message.
a Sarracenia.Message is subclassed from a dict, so for most uses, it works like the
python built-in, but also we have a few major entry points some factoryies:
**Building a message from a file**
m = sarracenia.Message.fromFileData( path, options, lstat )
builds a notification message from a given existing file, consulting *options*, a parsed
in memory version of the configuration settings that are applicable
**Options**
see the sarracenia.config.Config class for functions to parse configuration files
and create corresponding python option dictionaries. One can supply small
dictionaries for example::
options['topicPrefix'] = [ 'v02', 'post' ]
options['bindings'] = [ ('xpublic', [ 'v02', 'post'] , [ '#' ] )]
options['queueName'] = 'q_anonymous_' + socket.getfqdn() + '_SomethingHelpfulToYou'
Above is an example of a minimal options dictionary taken from the tutorial
example called moth_api_consumer.py. often
**If you don't have a file**
If you don't have a local file, then build your notification message with:
m = sarracenia.Message.fromFileInfo( path, options, lstat )
where you can make up the lstat values to fill in some fields in the message.
You can make a fake lstat structure to provide these values using sarracenia.filemetadata
class which is either an alias for paramiko.SFTPAttributes
( https://docs.paramiko.org/en/latest/api/sftp.html#paramiko.sftp_attr.SFTPAttributes )
if paramiko is installed, or a simple emulation if not.
from sarracenia.filemetadata import FmdStat
lstat = FmdStat()
lstat.st_mtime= utcinteger second count in UTC (numeric version of a Sarracenia timestamp.)
lstat.st_atime=
lstat.st_mode=0o644
lstat.st_size= size_in_bytes
optional fields that may be of interest:
lstat.filename= "nameOfTheFile"
lstat.longname= 'lrwxrwxrwx 1 peter peter 20 Oct 11 20:28 nameOfTheFile'
that you can then provide as an *lstat* argument to the above *fromFileInfo()*
call. However the notification message returned will lack an identity checksum field.
once you get the file, you can add the Identity field with:
m.computeIdentity(path, o):
In terms of consuming notification messages, the fields in the dictionary provide metadata
for the announced resource. The anounced data could be embedded in the notification message itself,
or available by a URL.
Messages are generally gathered from a source such as the Message Queueing Protocol wrapper
class: moth... sarracenia.moth.
data = m.getContent()
will return the content of the announced resource as raw data.
"""
pass
[docs]
class TimeConversions:
"""
Time conversion routines.
* os.stat, and time.now() return floating point
* The floating point representation is a count of seconds since the beginning of the epoch.
* beginning of epoch is platform dependent, and conversion to actual date is fraught (leap seconds, etc...)
* Entire SR_* formats are text, no floats are sent over the protocol
(avoids byte order issues, null byte / encoding issues, and enhances readability.)
* str format: YYYYMMDDHHMMSS.msec goal of this representation is that a naive
conversion to floats yields comparable numbers.
* but the number that results is not useful for anything else, so need these
special routines to get a proper epochal time.
* also OK for year 2032 or whatever (rollover of time_t on 32 bits.)
* string representation is forced to UTC timezone to avoid having to communicate timezone.
timestr2flt() - accepts a string and returns a float.
caveat
FIXME: this encoding will break in the year 10000 (assumes four digit year)
and requires leading zeroes prior to 1000. One will have to add detection of
the decimal point, and change the offsets at that point.
"""
pass
[docs]
def stat( path ) -> sarracenia.filemetadata.FmdStat:
"""
os.stat call replacement which improves on it by returning
and SFTPAttributes structure, in place of the OS stat one,
featuring:
* mtime and ctime with subsecond accuracy
* fields that can be overridden (not immutable.)
"""
native_stat = os.stat( path )
sa = sarracenia.filemetadata.FmdStat()
sa.st_mode = native_stat.st_mode
sa.st_ino = native_stat.st_ino
sa.st_dev = native_stat.st_dev
# st_nlink does not exist in paramiko.SFTPAttributes()
# FmdStat comes from that type.
#sa.st_nlink = native_stat.st_nlink
sa.st_uid = native_stat.st_uid
sa.st_gid = native_stat.st_gid
sa.st_size = native_stat.st_size
sa.st_mtime = os.path.getmtime(path)
sa.st_atime = os.path.getctime(path)
sa.st_ctime = native_stat.st_atime
return sa
def nowflt():
return timestr2flt(nowstr())
def nowstr():
return timeflt2str(time.time())
[docs]
def timeflt2str(f=None):
"""
timeflt2str - accepts a float and returns a string.
flow is a floating point number such as returned by time.now()
(number of seconds since beginning of epoch.)
the str is YYYYMMDDTHHMMSS.sssss
20210921T011331.0123
translates to: Sept. 21st, 2021 at 01:13 and 31.0123 seconds.
always UTC timezone.
"""
nsec = "{:.9g}".format(f % 1)[1:]
return "{}{}".format(time.strftime("%Y%m%dT%H%M%S", time.gmtime(f)), nsec)
def timeValidate(s) -> bool:
if len(s) < 14: return False
if (len(s) > 14) and (s[8] != 'T') and (s[14] != '.'): return False
if (len(s) > 15) and (s[8] == 'T') and (s[15] != '.'): return False
if not s[0:8].isalnum(): return False
return True
def timestr2flt(s):
if s[8] == "T":
s = s.replace('T', '')
dt_tuple = int(s[0:4]), int(s[4:6]), int(s[6:8]), int(s[8:10]), int(
s[10:12]), int(s[12:14])
t = datetime.datetime(*dt_tuple, tzinfo=datetime.timezone.utc)
return calendar.timegm(t.timetuple()) + float('0' + s[14:])
def timev2tov3str(s):
if s[8] == 'T':
return s
else:
return s[0:8] + 'T' + s[8:]
[docs]
def durationToString(d) -> str:
"""
given a numbner of seconds, return a short, human readable string.
"""
return humanize.naturaldelta(d).replace("minutes","m").replace("seconds","s")
[docs]
def durationToSeconds(str_value, default=None) -> float:
"""
this function converts duration to seconds.
str_value should be a number followed by a unit [s,m,h,d,w] ex. 1w, 4d, 12h
return 0.0 for invalid string.
"""
factor = 1
if type(str_value) in [list]:
str_value = str_value[0]
if type(str_value) in [int, float]:
return str_value
if type(str_value) is not str:
return 0
if str_value.lower() in [ 'none', 'off', 'false' ]:
return 0
if default and str_value.lower() in [ 'on', 'true' ]:
return float(default)
if str_value[-1] in 'sS': factor *= 1
elif str_value[-1] in 'mM': factor *= 60
elif str_value[-1] in 'hH': factor *= 60 * 60
elif str_value[-1] in 'dD': factor *= 60 * 60 * 24
elif str_value[-1] in 'wW': factor *= 60 * 60 * 24 * 7
if str_value[-1].isalpha(): str_value = str_value[:-1]
try:
duration = float(str_value) * factor
except:
logger.error("conversion failed for: %s" % str_value)
duration = 0.0
return duration
known_report_codes = {
201:
"Download successful. (variations: Downloaded, Inserted, Published, Copied, or Linked)",
203: "Non-Authoritative Information: transformed during download.",
205:
"Reset Content: truncated. File is shorter than originally expected (changed length during transfer) This only arises during multi-part transfers.",
205: "Reset Content: checksum recalculated on receipt.",
304:
"Not modified (Checksum validated, unchanged, so no download resulted.)",
307: "Insertion deferred (writing to temporary part file for the moment.)",
417: "Expectation Failed: invalid notification message (corrupt headers)",
499: "Failure: Not Copied. SFTP/FTP/HTTP download problem",
#FIXME : should not have 503 error code 3 times in a row
# 503: "Service unavailable. delete (File removal not currently supported.)",
503: "Unable to process: Service unavailable",
# 503: "Unsupported transport protocol specified in posting."
}
[docs]
class Message(dict):
"""
A notification message in Sarracenia is stored as a python dictionary, with a few extra management functions.
The internal representation is very close to the v03 format defined here: https://metpx.github.io/sarracenia/Reference/sr_post.7.html
Unfortunately, sub-classing of dict means that to copy it from a dict will mean losing the type,
and hence the need for the copyDict member.
"""
[docs]
def __init__(self):
self['_format'] = 'v03'
self['_deleteOnPost'] = set(['_format'])
[docs]
def computeIdentity(msg, path, o, offset=0):
"""
check extended attributes for a cached identity sum calculation.
if extended attributes are present, and
* the file mtime is not too new, and
* the cached sum us using the same method
then use the cached value.
otherwise, calculate a checksum.
set the file's extended attributes for the new value.
the method of checksum calculation is from options.identity.
sets the message 'identity' field if appropriate.
"""
xattr = sarracenia.filemetadata.FileMetadata(path)
if not 'blocks' in msg:
if o.randomize:
methods = [
'random', 'md5', 'md5name', 'sha512', 'cod,md5', 'cod,sha512'
]
calc_method = random.choice(methods)
elif 'identity' in xattr.x and 'mtime' in xattr.x:
if xattr.get('mtime') >= msg['mtime']:
logger.debug("mtime remembered by xattr")
fxainteg = xattr.get('identity')
if fxainteg['method'] == o.identity_method:
msg['identity'] = fxainteg
return
logger.debug("xattr different method than on disk")
calc_method = o.identity_method
else:
logger.debug("xattr sum too old")
calc_method = o.identity_method
else:
calc_method = o.identity_method
else:
calc_method = o.identity_method
if calc_method == None:
return
if 'mtime' in msg:
xattr.set('mtime', msg['mtime'])
logger.debug("mtime persisted, calc_method: {calc_method}")
if calc_method[:4] == 'cod,' and len(calc_method) > 2:
sumstr = calc_method
elif calc_method in [ 'md5name', 'invalid' ]:
xattr.persist() # persist the mtime, at least...
return # no checksum needed for md5name.
elif calc_method == 'arbitrary':
sumstr = {
'method': 'arbitrary',
'value': o.identity_arbitrary_value
}
else: # a "normal" calculation method, liks sha512, or md5
sumalgo = sarracenia.identity.Identity.factory(calc_method)
sumalgo.set_path(path)
# compute checksum
if calc_method in ['md5', 'sha512']:
fp = open(path, 'rb')
i = 0
#logger.info( f"offset: {offset} size: {msg['size']} max: {offset+msg['size']} " )
if offset:
fp.seek( offset )
while i < offset+msg['size']:
buf = fp.read(o.bufsize)
if not buf: break
sumalgo.update(buf)
i += len(buf)
fp.close()
# setting sumstr
checksum = sumalgo.value
sumstr = {'method': calc_method, 'value': checksum}
msg['identity'] = sumstr
xattr.set('identity', sumstr)
xattr.persist()
[docs]
def copyDict(msg, d):
"""
copy dictionary into message.
"""
if d is None: return
for h in d:
msg[h] = d[h]
[docs]
def dumps(msg) -> str:
"""
FIXME: used to be msg_dumps.
print a message in a compact but relatively compact way.
msg is a python dictionary. if there is a field longer than maximum_field_length,
truncate.
"""
maximum_field_length = 255
if msg is None: return ""
if msg['_format'] == 'Wis':
s = '{ '
if 'id' in msg:
s += f"{{ 'id': '{msg['id']}', 'type':'Feature', "
if 'geometry' in msg:
s += f"'geometry':{msg['geometry']} 'properties':{{ "
else:
s += "'geometry': None, 'properties':{ "
else:
s = "{ "
for k in sorted(msg.keys()):
if msg['_format'] == 'v04' and k in [ 'id', 'type', 'geometry' ]:
continue
if type(msg[k]) is dict:
if k != 'properties':
v = "{ "
for kk in sorted(msg[k].keys()):
v += " '%s':'%s'," % (kk, msg[k][kk])
v = v[:-1]
if k != 'properties':
v += " }"
else:
try:
v = "%s" % msg[k]
except:
v = "unprintable"
if len(v) > maximum_field_length:
v = v[0:maximum_field_length - 4] + '...'
if v[0] == '{':
v += '}'
s += f" '{k}':'{v}',"
if msg['_format'] == 'Wis':
s += ' } '
s = s[:-1] + " }"
return s
[docs]
@staticmethod
def fromFileData(path, o, lstat=None):
"""
create a message based on a given file, calculating the checksum.
returns a well-formed message, or none.
"""
m = sarracenia.Message.fromFileInfo(path, o, lstat)
if lstat :
if os_stat.S_ISREG(lstat.st_mode):
m.computeIdentity(path, o)
if features['filetypes']['present']:
try:
t = magic.from_file(path,mime=True)
m['contentType'] = t
except Exception as ex:
logging.info("trying to determine mime-type. Exception details:", exc_info=True)
#else:
# m['contentType'] = 'application/octet-stream' # https://www.rfc-editor.org/rfc/rfc2046.txt (default when clueless)
# I think setting a bad value is worse than none, so just omitting.
elif os_stat.S_ISDIR(lstat.st_mode):
m['contentType'] = 'text/directory' # source: https://www.w3.org/2002/12/cal/rfc2425.html
elif os_stat.S_ISLNK(lstat.st_mode):
m['contentType'] = 'text/link' # I invented this one, could not find any reference
return m
[docs]
@staticmethod
def fromFileInfo(path, o, lstat=None):
"""
based on the fiven information about the file (it's name and a stat record if available)
and a configuration options object (sarracenia.config.Config)
return an sarracenia.Message suitable for placement on a worklist.
A message is a specialized python dictionary with a certain set of fields in it.
The message returned will have the necessary fields for processing and posting.
The message is built for a file is based on the given path, options (o), and lstat (output of os.stat)
The lstat record is used to build 'atime', 'mtime' and 'mode' fields if
timeCopy and permCopy options are set.
if no lstat record is supplied, then those fields will not be set.
"""
msg = Message()
#FIXME no variable substitution... o.variableExpansion ?
if hasattr(o,'post_format') :
msg['_format'] = o.post_format
elif hasattr(o,'post_topicPrefix') and o.post_topicPrefix[0] in [ 'v02', 'v03' ]:
msg['_format'] = o.post_topicPrefix[0]
else:
msg['_format'] = 'v03'
if hasattr(o, 'post_exchange'):
msg['exchange'] = o.post_exchange
elif hasattr(o, 'exchange'):
msg['exchange'] = o.exchange
if hasattr(o, 'blocksize') and (o.blocksize > 1) and lstat and \
(os_stat.S_IFMT(lstat.st_mode) == os_stat.S_IFREG) and \
(lstat.st_size > o.blocksize):
msg['blocks'] = { 'method': 'inplace', 'number':-1, 'size': o.blocksize, 'manifest': {} }
msg['local_offset'] = 0
msg['_deleteOnPost'] = set(['exchange', 'local_offset', 'subtopic', '_format'])
# notice
msg['pubTime'] = timeflt2str(time.time())
# set new_dir, new_file, new_subtopic, etc...
msg.updatePaths(o, os.path.dirname(path), os.path.basename(path))
# rename
if 'new_relPath' in msg:
post_relPath = msg['new_relPath']
elif 'relPath' in msg:
post_relPath = msg['relPath']
else:
post_relPath = None
newname = post_relPath
# rename path given with no filename
if o.rename:
msg['retrievePath'] = msg['new_retrievePath']
newname = o.variableExpansion(o.rename)
if o.rename[-1] == '/':
newname += os.path.basename(path)
# strip 'N' heading directories
if o.strip > 0:
strip = o.strip
if path[0] == '/': strip = strip + 1
# if we strip too much... keep the filename
token = path.split('/')
try:
token = token[strip:]
except:
token = [os.path.basename(path)]
newname = '/' + '/'.join(token)
if newname != post_relPath: msg['rename'] = newname
if hasattr(o, 'to_clusters') and (o.to_clusters is not None):
msg['to_clusters'] = o.to_clusters
if hasattr(o, 'cluster') and (o.cluster is not None):
msg['from_cluster'] = o.cluster
if hasattr(o, 'source') and (o.source is not None):
msg['source'] = o.source
if o.identity_method:
if o.identity_method.startswith('cod,'):
msg['identity'] = {
'method': 'cod',
'value': o.identity_method[4:]
}
elif o.identity_method in ['random']:
algo = sarracenia.identity.Identity.factory(o.identity_method)
algo.set_path(post_relPath)
msg['identity'] = {
'method': o.identity_method,
'value': algo.value
}
else:
if 'identity' in msg:
del msg['identity']
# for md5name/aka None aka omit identity... should just fall through.
if lstat is None: return msg
if (lstat.st_mode is not None) :
msg['mode'] = "%o" % (lstat.st_mode & 0o7777)
if not o.permCopy:
msg['_deleteOnPost'] |= set(['mode'])
if os_stat.S_ISDIR(lstat.st_mode):
msg['fileOp'] = { 'directory': '' }
return msg
if lstat.st_size is not None:
msg['size'] = lstat.st_size
if lstat.st_mtime is not None:
msg['mtime'] = timeflt2str(lstat.st_mtime)
if lstat.st_atime is not None:
msg['atime'] = timeflt2str(lstat.st_atime)
if not o.timeCopy:
msg['_deleteOnPost'] |= set([ 'atime', 'mtime' ])
return msg
[docs]
@staticmethod
def fromStream(path, o, data=None):
"""
Create a file and message for the given path.
The file will be created or overwritten with the provided data.
then invoke fromFileData() for the resulting file.
"""
with open(path, 'wb') as fh:
fh.write(data)
if hasattr(o, 'chmod') and o.chmod:
os.chmod(path, o.chmod)
return sarracenia.Message.fromFileData(path, o, stat(path))
[docs]
def setReport(msg, code, text=None):
"""
FIXME: used to be msg_set_report
set message fields to indicate result of action so reports can be generated.
set is supposed to indicate final message dispositions, so in the case
of putting a message on worklist.failed... no report is generated, since
it will be retried later. FIXME: should we publish an interim failure report?
"""
if code in known_report_codes:
if text is None:
text = known_report_codes[code]
else:
logger.warning('unknown report code supplied: %d:%s' %
(code, text))
if text is None:
text = 'unknown disposition'
if 'report' in msg:
logger.warning('overriding initial report: %d: %s' %
(msg['report']['code'], msg['report']['message']))
msg['report'] = {'code': code, 'timeCompleted': nowstr(), 'message': text}
msg['_deleteOnPost'] |= set(['report'])
[docs]
def updatePaths(msg, options, new_dir=None, new_file=None):
"""
set the new_* fields in the message based on changed file placement.
if new_* options are ommitted updaste the rest of the fields in
the message based on their current values.
If you change file placement in a flow callback, for example.
One would change new_dir and new_file in the message.
This routines updates other fields in the message (e.g. relPath,
baseUrl, topic ) to match new_dir/new_file.
msg['post_baseUrl'] defaults to msg['baseUrl']
"""
# the headers option is an override.
if hasattr(options, 'fixed_headers'):
for k in options.fixed_headers:
msg[k] = options.fixed_headers[k]
msg['_deleteOnPost'] |= set([
'new_dir', 'new_file', 'new_relPath', 'new_baseUrl', 'new_subtopic', 'subtopic', 'post_format'
])
if new_dir:
msg['new_dir'] = new_dir
elif 'new_dir' in msg:
new_dir = msg['new_dir']
else:
new_dir = ''
if new_file or new_file == '':
msg['new_file'] = new_file
elif 'new_file' in msg:
new_file = msg['new_file']
elif 'new_relPath' in msg:
new_file = os.path.basename(msg['rel_relPath'])
elif 'relPath' in msg:
new_file = os.path.basename(msg['relPath'])
else:
new_file = 'ErrorInSarraceniaMessageUpdatePaths.txt'
newFullPath = new_dir + '/' + new_file
# post_baseUrl option set in msg overrides other possible options
if 'post_baseUrl' in msg:
baseUrl_str = msg['post_baseUrl']
elif options.post_baseUrl:
baseUrl_str = options.variableExpansion(options.post_baseUrl, msg)
else:
if 'baseUrl' in msg:
baseUrl_str = msg['baseUrl']
else:
logger.error('missing post_baseUrl setting')
return
if options.post_format:
msg['post_format'] = options.post_format
elif options.post_topicPrefix:
msg['post_format'] = options.post_topicPrefix[0]
elif options.topicPrefix != msg['_format']:
logger.warning( f"received message in {msg['_format']} format, expected {options.post_topicPrefix} " )
msg['post_format'] = options.topicPrefix[0]
else:
msg['post_format'] = msg['_format']
if hasattr(options, 'post_baseDir') and ( type(options.post_baseDir) is str ) \
and ( len(options.post_baseDir) > 1):
pbd_str = options.variableExpansion(options.post_baseDir, msg)
parsed_baseUrl = sarracenia.baseUrlParse(baseUrl_str)
if newFullPath.startswith(pbd_str):
newFullPath = new_dir.replace(pbd_str, '', 1) + '/' + new_file
if (len(parsed_baseUrl.path) > 1) and newFullPath.startswith(
parsed_baseUrl.path):
newFullPath = newFullPath.replace(parsed_baseUrl.path, '', 1)
if ('new_dir' not in msg) and options.post_baseDir:
msg['new_dir'] = options.post_baseDir
msg['new_baseUrl'] = baseUrl_str
if len(newFullPath) > 0 and newFullPath[0] == '/':
newFullPath = newFullPath[1:]
msg['new_relPath'] = newFullPath
msg['new_subtopic'] = newFullPath.split('/')[0:-1]
for i in ['relPath', 'subtopic', 'baseUrl']:
if not i in msg:
msg[i] = msg['new_%s' % i]
if sys.platform == 'win32':
if 'new_dir' not in msg:
msg['new_dir'] = msg['new_dir'].replace('\\', '/')
msg['new_relPath'] = msg['new_relPath'].replace('\\', '/')
if re.match('[A-Z]:', str(options.currentDir),
flags=re.IGNORECASE):
msg['new_dir'] = msg['new_dir'].lstrip('/')
msg['new_relPath'] = msg['new_relPath'].lstrip('/')
[docs]
def validate(msg):
"""
FIXME: used to be msg_validate
return True if message format seems ok, return True, else return False, log some reasons.
"""
if not type(msg) is sarracenia.Message:
logger.error( f"not a message")
return False
res = True
for required_key in ['pubTime', 'baseUrl', 'relPath']:
if not required_key in msg:
logger.error( f'missing key: {required_key}' )
res = False
if not timeValidate(msg['pubTime']):
logger.error( f"malformed pubTime: {msg['pubTime']}")
res = False
return res
[docs]
def getContent(msg,options=None):
"""
Retrieve the data referred to by a message. The data may be embedded
in the messate, or this routine may resolve a link to an external server
and download the data.
does not handle authentication.
This routine is meant to be used with small files. using it to download
large files may be very inefficient. Untested in that use-case.
Return value is the data.
often on server where one is publishing data, the file is available as
a local file, and one can avoid the network usage by using a options.baseDir setting.
this behaviour can be disabled by not providing the options or not setting baseDir.
"""
# inlined/embedded case.
if 'content' in msg:
if msg['content']['encoding'] == 'base64':
return b64decode(msg['content']['value'])
else:
return msg['content']['value'].encode('utf-8')
path=''
if msg['baseUrl'].startswith('file:'):
pu = urllib.parse.urlparse(msg['baseUrl'])
path=pu.path + msg['relPath']
logger.info( f"path: {path}")
elif options and hasattr(options,'baseDir') and options.baseDir:
# local file shortcut
path=options.baseDir + os.sep + msg['relPath']
if os.path.exists(path):
logger.info( f"reading local file path: {path} exists?: {os.path.exists(path)}" )
with open(path,'rb') as f:
return f.read()
# case requiring resolution.
if 'retrievePath' in msg:
retUrl = msg['baseUrl'] + '/' + msg['retrievePath']
else:
retUrl = msg['baseUrl'] + '/' + msg['relPath']
logger.info( f"retrieving from: {retUrl}" )
with urllib.request.urlopen(retUrl) as response:
return response.read()