# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, Environment Canada, 2008-2020
#
# Sarracenia repository: https://github.com/MetPX/sarracenia
# Documentation: https://github.com/MetPX/sarracenia
#
import copy
from base64 import b64decode, b64encode
from collections import *
from hashlib import sha512
import json
import logging
from mimetypes import guess_type
import os
import os.path
import random
from random import choice
import sarracenia
from sarracenia import *
from sarracenia.featuredetection import features
if features['reassembly']['present']:
import sarracenia.blockmanifest
from sarracenia.flowcb import FlowCB
import sarracenia.identity
import stat
from sys import platform as _platform
import sys
import time
if features['watch']['present']:
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.events import PatternMatchingEventHandler
logger = logging.getLogger(__name__)
if features['watch']['present']:
    # [docs] -- Sphinx source-link artifact from HTML scrape, not code
class SimpleEventHandler(PatternMatchingEventHandler):
[docs]
def __init__(self, parent):
self.on_created = parent.on_created
self.on_deleted = parent.on_deleted
self.on_modified = parent.on_modified
self.on_moved = parent.on_moved
super().__init__()
# [docs] -- Sphinx source-link artifact from HTML scrape, not code
class File(FlowCB):
"""
read the file system, create messages for the files you find.
this is taken from v2's sr_post.py
FIXME FIXME FIXME
FIXME: the sr_post version would post files on the fly as it was traversing trees.
so it was incremental and had little overhead. This one is does the whole recursion
in one gather.
It will fail horribly for large trees. Need to re-formulate to replace recursion with interation.
perhaps a good time to use python iterators.
also should likely switch from listdir to scandir
"""
def on_add(self, event, src, dst):
logger.debug("on_add %s %s %s" % ( event, src, dst ) )
self.new_events['%s %s' % (src, dst)] = (event, src, dst)
def on_created(self, event):
# on_created (for SimpleEventHandler)
if event.is_directory:
self.on_add('mkdir', event.src_path, None)
else:
self.on_add('create', event.src_path, None)
def on_deleted(self, event):
# on_deleted (for SimpleEventHandler)
if event.is_directory:
self.on_add('rmdir', event.src_path, None)
else:
self.on_add('delete', event.src_path, None)
def on_modified(self, event):
# on_modified (for SimpleEventHandler)
if not event.is_directory:
self.on_add('modify', event.src_path, None)
def on_moved(self, event):
# on_moved (for SimpleEventHandler)
self.on_add('move', event.src_path, event.dest_path)
    # [docs] -- Sphinx source-link artifact from HTML scrape, not code
def __init__(self, options):
"""
"""
super().__init__(options,logger)
if not features['watch']['present']:
logger.critical("watchdog module must be installed to watch directories")
logger.debug("%s used to be overwrite_defaults" % self.o.component)
self.obs_watched = []
self.watch_handler = None
self.post_topicPrefix = ["v03"]
self.inl = OrderedDict()
self.new_events = OrderedDict()
self.left_events = OrderedDict()
#self.o.blocksize = 200 * 1024 * 1024
self.o.create_modify = ('create' in self.o.fileEvents) or (
'modify' in self.o.fileEvents)
def post_delete(self, path, key=None, value=None,is_directory=False):
#logger.debug("post_delete %s (%s,%s)" % (path, key, value))
msg = sarracenia.Message.fromFileInfo(path, self.o, None)
msg['fileOp'] = { 'remove':'' }
if is_directory:
msg['fileOp']['directory'] = ''
# partstr
partstr = None
# used when moving a file
if key != None:
msg[key] = value
if key == 'newname' and self.o.post_baseDir:
msg['new_dir'] = os.path.dirname(value)
msg['new_file'] = os.path.basename(value)
msg[key] = value.replace(self.o.post_baseDir, '')
return [msg]
def post_file(self, path, lstat, key=None, value=None):
#logger.debug("start %s" % path)
# check the value of blocksize
fsiz = lstat.st_size
blksz = self.set_blocksize(self.o.blocksize, fsiz)
# if we should send the file in parts
if (blksz > 0 and blksz < fsiz) and os.path.isfile(path):
return self.post_file_in_parts(path, lstat)
msg = sarracenia.Message.fromFileData(path, self.o, lstat)
# used when moving a file
if key != None:
if not 'fileOp' in msg:
msg['fileOp'] = { key : value }
else:
msg['fileOp'][key] = value
if os_stat.S_ISDIR(lstat.st_mode):
return [msg]
# complete message
if (self.o.post_topicPrefix[0] == 'v03') and self.o.inline:
if fsiz < self.o.inlineByteMax:
if self.o.inlineEncoding == 'guess':
e = guess_type(path)[0]
binary = not e or not ('text' in e)
else:
binary = (self.o.inlineEncoding == 'text')
f = open(path, 'rb')
d = f.read()
f.close()
if binary:
msg["content"] = {
"encoding": "base64",
"value": b64encode(d).decode('utf-8')
}
else:
try:
msg["content"] = {
"encoding": "utf-8",
"value": d.decode('utf-8')
}
except:
msg["content"] = {
"encoding": "base64",
"value": b64encode(d).decode('utf-8')
}
else:
if self.o.inlineOnly:
logger.error('skipping file %s too large (%d bytes > %d bytes max)) for inlining' % \
( path, fsiz, self.o.inlineByteMax ) )
return []
return [msg]
    def post_file_in_parts(self, path, lstat):
        """
        Build one message per block for a file to be transferred in parts.

        First pass computes a per-block identity checksum and records it in a
        manifest shared by all the messages; second pass emits one deep copy of
        the message per block, each pointing at its own block number.
        """
        #logger.info("start %s" % path )
        msg = sarracenia.Message.fromFileInfo(path, self.o, lstat)
        logger.debug( f"initial msg:{msg}" )
        # check the value of blocksize
        fsiz = lstat.st_size
        chunksize = self.set_blocksize(self.o.blocksize, fsiz)
        # count blocks and remainder; a partial final block counts as one more.
        block_count = int(fsiz / chunksize)
        remainder = fsiz % chunksize
        if remainder > 0: block_count = block_count + 1
        #logger.debug( f" fiz:{fsiz}, chunksize:{chunksize}, block_count:{block_count}, remainder:{remainder}" )
        # loop on blocks; optionally shuffle so consumers receive parts out of order.
        blocks = list(range(0, block_count))
        if self.o.randomize:
            random.shuffle(blocks)
            #blocks = [8, 3, 1, 2, 9, 6, 0, 7, 4, 5] # Testing
            logger.info('Sending partitions in the following order: ' +
                        str(blocks))
        # 'number' == -1 marks the template; each emitted copy gets a real number.
        msg['blocks'] = {
            'method': 'inplace',
            'size': chunksize,
            'number': -1,
            'manifest': {}
        }
        logger.debug( f" blocks:{blocks} " )
        # pass 1: checksum each block and record size+identity in the manifest.
        for current_block in blocks:
            # compute block stuff
            offset = current_block * chunksize
            length = chunksize
            last = current_block == block_count - 1
            if last and remainder > 0:
                # the final block only covers the remainder of the file.
                length = remainder
            msg['size']=length
            # set partstr
            msg.computeIdentity(path, self.o, offset=offset )
            msg['blocks']['manifest'][current_block] = { 'size':length, 'identity': msg['identity']['value'] }
        # persist the manifest beside the file unless block_manifest_delete asks not to.
        if features['reassembly']['present'] and \
           (not hasattr(self.o, 'block_manifest_delete') or not self.o.block_manifest_delete):
            with sarracenia.blockmanifest.BlockManifest( path ) as x:
                x.set(msg['blocks'])
        # pass 2: emit one deep-copied message per block, patched with that
        # block's number, size and identity from the manifest.
        messages = []
        for current_block in blocks:
            msg['blocks']['number'] = current_block
            msg['size'] = msg['blocks']['manifest'][current_block]['size']
            msg['identity']['value'] = msg['blocks']['manifest'][current_block]['identity']
            #logger.info( f" size: {msg['size']} blocks: {msg['blocks']}, offset: {offset} identity: {msg['identity']} " )
            messages.append(copy.deepcopy(msg))
        return messages
def post_link(self, path, key='link', value=None):
#logger.debug("post_link %s" % path )
msg = sarracenia.Message.fromFileInfo(path, self.o, None)
# resolve link
if key == 'link':
value = os.readlink(path)
# used when moving a file
if not 'fileOp' in msg:
msg['fileOp'] = { key: value }
else:
msg['fileOp'][key] = value
return [msg]
def post_move(self, src, dst):
#logger.debug("post_move %s %s" % (src,dst) )
# watchdog funny ./ added at end of directory path ... removed
messages = []
src = src.replace('/./', '/')
dst = dst.replace('/./', '/')
if os.path.islink(dst) and self.o.realpathPost:
dst = os.path.realpath(dst)
if sys.platform == 'win32':
dst = dst.replace('\\', '/')
# file
if os.path.isfile(dst):
if hasattr(self.o,'v2compatRenameDoublePost') and self.o.v2compatRenameDoublePost:
messages.extend(self.post_delete(src, 'newname', dst))
messages.extend(self.post_file(dst, sarracenia.stat(dst), 'rename', src))
return messages
# link
if os.path.islink(dst):
if hasattr(self.o,'v2compatRenameDoublePost') and self.o.v2compatRenameDoublePost:
messages.extend(self.post_delete(src, 'newname', dst))
messages.extend(self.post_link(dst, 'rename', src))
return messages
# directory
if os.path.isdir(dst):
for x in os.listdir(dst):
dst_x = dst + '/' + x
src_x = src + '/' + x
messages = self.post_move(src_x, dst_x)
# directory list to delete at end
self.move_dir_lst.append((src, dst))
return messages
    # [docs] -- Sphinx source-link artifact from HTML scrape, not code
def post1file(self, path, lstat, is_directory=False) -> list:
"""
create the notification message for a single file, based on the lstat metadata.
when lstat is present it is used to decide whether the file is an ordinary file, a link
or a directory, and the appropriate message is built and returned.
if the lstat metadata is None, then that signifies a "remove" message to be created.
In the remove case, without the lstat, one needs the is_directory flag to decide whether
it is an ordinary file remove, or a directory remove. is_directory is not used other
than for the remove case.
The return value is a list that usually contains a single message. It is a list to allow
for if options are combined such that a symbolic link and the realpath it posts to may
involve multiple messages for a single file. Similarly in the multi-block transfer case.
"""
messages = []
# watchdog funny ./ added at end of directory path ... removed
path = path.replace('/./', '/')
# always use / as separator for paths being posted.
if os.sep != '/': # windows
path = path.replace(os.sep, '/')
# path is a link
if os.path.islink(path):
messages.extend(self.post_link(path))
if self.o.follow_symlinks:
link = os.readlink(path)
try:
rpath = os.path.realpath(link)
if sys.platform == 'win32':
rpath = rpath.replace('\\', '/')
except:
return messages
lstat = None
if os.path.exists(rpath):
lstat = sarracenia.stat(rpath)
messages.extend(self.post1file(rpath, lstat))
# path deleted
elif lstat == None:
messages.extend(self.post_delete(path,key=None,value=None,is_directory=is_directory))
# path is a file
elif os.path.isfile(path) or os.path.isdir(path):
messages.extend(self.post_file(path, lstat))
return messages
def post1move(self, src, dst):
#logger.debug("post1move %s %s" % (src,dst) )
self.move_dir_lst = []
messages = self.post_move(src, dst)
for tup in self.move_dir_lst:
src, dst = tup
#logger.debug("deleting moved directory %s" % src )
messages.extend(self.post_delete(src, 'newname', dst))
return messages
    # [docs] -- Sphinx source-link artifact from HTML scrape, not code
    def process_event(self, event, src, dst):
        """
        return a tuple: pop? + list of messages.

        The boolean says whether the event is done and may be removed from the
        pending queue; False means "retry later" (e.g. the file is too new).
        """
        #logger.debug("process_event %s %s %s " % (event,src,dst) )

        # delete
        if event == 'delete' :
            if event in self.o.fileEvents:
                return (True, self.post1file(src, None))
            return (True, [])

        if event == 'rmdir' :
            if event in self.o.fileEvents:
                return (True, self.post1file(src, None, is_directory=True))
            return (True, [])

        # move
        if event == 'move':
            if self.o.create_modify:
                return (True, self.post1move(src, dst))
            # NOTE(review): when 'move' arrives but create_modify is off,
            # control falls through to the link/existence checks below; since
            # src no longer exists after a move this usually returns (True, []).
            # Confirm the fall-through is intended.

        # create or modify
        # directory : skipped, its content is watched
        #if self.o.recursive and os.path.isdir(src):
        #    dirs = list(map(lambda x: x[1][1], self.inl.items()))
        #    #logger.debug("skipping directory %s list: %s" % (src, dirs))

        # link ( os.path.exists = false, lstat = None )
        if os.path.islink(src):
            if 'link' in self.o.fileEvents:
                return (True, self.post1file(src, None))
            return (True, [])

        # file : must exists
        # (may have been deleted since event caught)
        if not os.path.exists(src): return (True, [])

        # file : must be old enough; too-new files stay queued (pop? == False).
        lstat = sarracenia.stat(src)
        if lstat and hasattr(lstat,'st_mtime'):
            age = time.time() - lstat.st_mtime
            if age < self.o.fileAgeMin:
                logger.debug( "%d vs (inflight setting) %d seconds. Too New!" % (age,self.o.fileAgeMin) )
                return (False, [])
            # too-old files are dropped entirely when fileAgeMax is set.
            if self.o.fileAgeMax > 0 and age > self.o.fileAgeMax:
                logger.debug("%d vs (fileAgeMax setting) %d seconds. Too Old!" % (age,self.o.fileAgeMax) )
                return (True, [])

        # post it
        if event == 'mkdir' and 'mkdir' in self.o.fileEvents:
            return (True, self.post1file(src, lstat, is_directory=True))
        elif self.o.create_modify:
            return (True, self.post1file(src, lstat))
        return (True, [])
def set_blocksize(self, bssetting, fsiz):
tfactor = 50 * 1024 * 1024
if bssetting == 0: ## default blocksize
return tfactor
elif bssetting == 1: ## send file as one piece.
return fsiz
else: ## partstr=i
return bssetting
def wakeup(self):
#logger.debug("wakeup")
# FIXME: Tiny potential for events to be dropped during copy.
# these lists might need to be replaced with watchdog event queues.
# left for later work. PS-20170105
# more details: https://github.com/gorakhargosh/watchdog/issues/392
# pile up left events to process
self.left_events.update(self.new_events)
self.new_events = OrderedDict()
# work with a copy events and keep done events (to delete them)
self.cur_events = OrderedDict()
self.cur_events.update(self.left_events)
# loop on all events
messages = []
for key in self.cur_events:
event_done=False
event, src, dst = self.cur_events[key]
try:
(event_done, new_messages) = self.process_event(event, src, dst)
messages.extend(new_messages)
except OSError as err:
"""
This message is reduced to debug priority because it often happens when files
are too transitory (they disappear before we have a chance to post them)
not sure if it should be an error message or not.
"""
logger.debug("skipping event that could not be processed: ({}): {}".format(
event, err))
logger.debug("Exception details:", exc_info=True)
event_done=True
if event_done:
self.left_events.pop(key)
return messages
    # [docs] -- Sphinx source-link artifact from HTML scrape, not code
def walk(self, src):
"""
walk directory tree returning 1 message for each file in it.
"""
logger.debug("walk %s" % src)
# how to proceed with symlink
if os.path.islink(src) and self.o.realpathPost:
src = os.path.realpath(src)
if sys.platform == 'win32':
src = src.replace('\\', '/')
messages = []
# need to post root of tree first, so mode bits get propagated on creation.
if src == self.o.post_baseDir :
logger.debug("skip posting of post_baseDir {src}")
else:
messages.extend(self.post1file(src, sarracenia.stat(src), is_directory=True))
# walk src directory, this walk is depth first... there could be a lot of time
# between *listdir* run, and when a file is visited, if there are subdirectories before you get there.
# hence the existence check after listdir (crashed in flow_tests of > 20,000)
if self.o.recursive:
for x in os.listdir(src):
path = src + '/' + x
# add path created
if os.path.isdir(path):
messages.extend(self.walk(path))
continue
if os.path.exists(path):
messages.extend(self.post1file(path, sarracenia.stat(path)))
return messages
    # [docs] -- Sphinx source-link artifact from HTML scrape, not code
def walk_priming(self, p):
"""
Find all the subdirectories of the given path, start watches on them.
deal with symbolically linked directories correctly
"""
if os.path.islink(p):
realp = os.path.realpath(p)
if sys.platform == 'win32':
realp = realp.replace('\\', '/')
logger.info("sr_watch %s is a link to directory %s" % (p, realp))
if self.o.realpathPost:
d = realp
else:
d = p + '/' + '.'
else:
d = p
try:
fs = sarracenia.stat(d)
dir_dev_id = '%s,%s' % (fs.st_dev, fs.st_ino)
if dir_dev_id in self.inl:
return True
except OSError as err:
logger.warning("could not stat file ({}): {}".format(d, err))
logger.debug("Exception details:", exc_info=True)
if os.access(d, os.R_OK | os.X_OK):
try:
ow = self.observer.schedule(self.watch_handler,
d,
recursive=True)
self.obs_watched.append(ow)
self.inl[dir_dev_id] = (ow, d)
logger.info(
"sr_watch priming watch (instance=%d) scheduled for: %s " %
(len(self.obs_watched), d))
except:
logger.warning("sr_watch priming watch: %s failed, deferred." %
d)
logger.debug('Exception details:', exc_info=True)
# add path created
self.on_add('create', p, None)
return True
else:
logger.warning(
"sr_watch could not schedule priming watch of: %s (EPERM) deferred."
% d)
logger.debug('Exception details:', exc_info=True)
# add path created
self.on_add('create', p, None)
return True
return True
def watch_dir(self, sld):
logger.debug("watch_dir %s" % sld)
if not features['watch']['present']:
logger.critical("sr_watch needs the python watchdog library to be installed.")
return []
if self.o.force_polling:
logger.info(
"sr_watch polling observer overriding default (slower but more reliable.)"
)
self.observer = PollingObserver()
else:
logger.info(
"sr_watch optimal observer for platform selected (best when it works)."
)
self.observer = Observer()
self.obs_watched = []
self.watch_handler = SimpleEventHandler(self)
self.walk_priming(sld)
logger.info(
"sr_watch priming walk done, but not yet active. Starting...")
self.observer.start()
logger.info("sr_watch now active on %s posting to exchange: %s" %
(sld, self.o.post_exchange))
if self.o.post_on_start:
return self.walk(sld)
else:
return []
def on_start(self):
self.queued_messages = []
self.primed = False
    # [docs] -- Sphinx source-link artifact from HTML scrape, not code
    def gather(self, messageCountMax):
        """
        from sr_post.py/run

        Returns (keep_going, messages): drains previously queued messages in
        batch-sized chunks, then either wakes up the watchdog event queue
        (watch mode) or walks/posts each configured postpath.

        FIXME: really bad performance with large trees: It scans an entire tree
        before emitting any messages. Need to re-factor with iterator style so produce
        result in batch sized chunks incrementally.
        """
        #logger.debug("%s run partflg=%s, sum=%s, nodupe_ttl=%s basis=%s pbd=%s" % \
        #      ( self.o.component, self.o.partflg, self.o.sumflg, self.o.nodupe_ttl,
        #        self.o.nodupe_basis, self.o.post_baseDir ))
        #logger.debug("%s realpathPost=%s follow_links=%s force_polling=%s batch=%s" % \
        #      ( self.o.component, self.o.realpathPost, self.o.follow_symlinks, \
        #        self.o.force_polling, self.o.batch ) )
        #logger.info("%s len(self.queued_messages)=%d" % \
        #      ( self.o.component, len(self.queued_messages) ) )

        # NOTE(review): pbd and messageCountMax appear unused in this body;
        # batching uses self.o.batch instead of messageCountMax -- confirm.
        pbd = self.o.post_baseDir

        # drain any backlog from a previous gather, one batch at a time.
        if len(self.queued_messages) > self.o.batch:
            messages = self.queued_messages[0:self.o.batch]
            self.queued_messages = self.queued_messages[self.o.batch:]
            return (True, messages)
        elif len(self.queued_messages) > 0:
            messages = self.queued_messages
            self.queued_messages = []
            if self.o.sleep < 0:
                return (True, messages)
        else:
            messages = []

        # once primed (watches started), subsequent gathers only drain events.
        # NOTE(review): if sleep >= 0 the queued messages drained just above
        # are still in "messages" but are discarded by this return -- confirm
        # whether that loss is intended.
        if self.primed:
            return (True, self.wakeup())

        cwd = os.getcwd()
        for d in self.o.postpath:

            # convert relative path to absolute.
            if d[0] != os.sep: d = cwd + os.sep + d

            d=self.o.variableExpansion(d)

            logger.debug("postpath = %s" % d)
            # sleep > 0 means watch mode: schedule watches instead of walking once.
            if self.o.sleep > 0:
                if features['watch']['present']:
                    messages.extend(self.watch_dir(d))
                else:
                    logger.critical("python watchdog package missing! Cannot watch directory")
                continue

            if os.path.isdir(d):
                logger.debug("postpath = %s" % d)
                messages.extend(self.walk(d))
            elif os.path.islink(d):
                messages.extend(self.post1file(d, None))
            elif os.path.isfile(d):
                messages.extend(self.post1file(d, sarracenia.stat(d)))
            else:
                logger.error("could not post %s (exists %s)" %
                             (d, os.path.exists(d)))

        # keep only one batch; queue the rest for the next gather.
        if len(messages) > self.o.batch:
            self.queued_messages = messages[self.o.batch:]
            logger.info("len(queued_messages)=%d" % len(self.queued_messages))
            messages = messages[0:self.o.batch]

        self.primed = True
        return (True, messages)