# Source code for sarracenia.flowcb.poll

#
# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, 2008-2022
#

import sarracenia.moth
import copy


import datetime
import html.parser
import logging
import os
import paramiko

import sarracenia
from sarracenia.featuredetection import features

if features['ftppoll']['present']:
    import dateparser
    import pytz

import sarracenia.config
from sarracenia.flowcb import FlowCB
import sarracenia.transfer
import stat
import sys, time

logger = logging.getLogger(__name__)


def file_size_fix(str_value) -> int:
    """
    Convert a human readable size taken from a directory listing into bytes.

    Accepted forms are a plain number (``123``, ``5.2``) optionally followed by
    a binary unit letter k/m/g/t (either case), optionally followed by ``b``/``B``
    (e.g. ``47K``, ``5.2M``, ``100b``, ``5kB``).  Any other single trailing
    letter is dropped and the number is taken as a byte count.

    :param str_value: the size field as text.
    :return: the size in bytes, truncated to an int, or -1 when the value
             cannot be interpreted.
    """
    factors = {
        'k': 1024,
        'm': 1024 * 1024,
        'g': 1024 * 1024 * 1024,
        't': 1024 * 1024 * 1024 * 1024,
    }
    try:
        text = str_value

        # a trailing b/B only says "bytes"; remove it first so that the real
        # unit letter (as in "5kB") is the last character and is not lost.
        if text[-1] in 'bB':
            text = text[:-1]

        factor = factors.get(text[-1].lower(), 1)
        if text[-1].isalpha():
            text = text[:-1]

        return int(float(text) * factor)

    except (AttributeError, IndexError, OverflowError, TypeError, ValueError):
        # empty string, non-string input, unparsable number, or inf.
        logger.debug("bad size %s", str_value)
        return -1


# Maps the first character of an ``ls -l`` style mode string to the
# corresponding file type bits of st_mode.
file_type_dict = {
    'l': stat.S_IFLNK,  # symbolic link
    's': stat.S_IFSOCK,  # socket file
    '-': stat.S_IFREG,  # regular file
    'b': stat.S_IFBLK,  # block device
    'd': stat.S_IFDIR,  # directory
    'c': stat.S_IFCHR,  # character device
    'p': stat.S_IFIFO,  # fifo (named pipe)
}


def modstr2num(m) -> int:
    """
    Convert one three character permission triplet (e.g. ``rwx``, ``r-x``)
    from an ``ls -l`` style listing into its octal digit (0-7).

    In the execute position, ``s`` and ``t`` are how ls shows
    "setuid/setgid/sticky AND executable", so they count as executable.
    The upper case forms ``S`` and ``T`` mean the special bit is set without
    execute permission, so they do not.

    :param m: sequence of at least three characters: read, write, execute.
    :return: sum of 4 (read), 2 (write) and 1 (execute) for the bits present.
    :raises IndexError: when fewer than three characters are given.
    """
    mode = 0
    if m[0] == 'r':
        mode += 4
    if m[1] == 'w':
        mode += 2
    if m[2] in ('x', 's', 't'):
        mode += 1
    return mode


def filemode(self, modstr) -> int:
    """
    Build a numeric st_mode from an ``ls -l`` style mode string such as
    ``-rw-r--r--``.

    The first character selects the file type bits (via file_type_dict), the
    next three triplets give the owner, group and other permissions.
    ``self`` is accepted but not used.

    :raises KeyError: when the file type character is not in file_type_dict.
    """
    type_bits = file_type_dict[modstr[0]]
    owner = modstr2num(modstr[1:4])
    group = modstr2num(modstr[4:7])
    other = modstr2num(modstr[7:10])
    return type_bits + (owner << 6) + (group << 3) + other


def fileid(self, id) -> 'int | None':
    """
    Convert the owner or group field of a listing into a numeric id.

    ``self`` is accepted but not used.

    :param id: the field as text.
    :return: the integer value when the field consists only of decimal
             digits, otherwise None (e.g. for a user or group name).
    """
    # isdecimal() rather than isnumeric(): isnumeric() also accepts characters
    # such as superscripts and fractions, which int() refuses with ValueError.
    if id.isdecimal():
        return int(id)
    return None


class Poll(FlowCB):
    """
    The Poll flow callback class implements the main logic for polling remote resources.
    the *poll* routine returns a list of messages for new files to be filtered.

    when instantiated with options, the options honoured include:

    * pollUrl - the URL of the server to be polled.

    * post_baseUrl - parameter for messages to be returned. Also used to look
      up credentials to help subscribers with retrieval.

    * masks - These are the directories at the pollUrl to poll.
      derived from the accept/reject clauses, but filtering should happen later.
      entire directories are listed at this point.

    * timezone - interpret listings from an FTP server as being in the given
      timezone (as per `pytz <pypi.org/project/pytz>`_

    * chmod - used to identify the minimum permissions to accept for a file to
      be included in a polling result.
      (NOTE(review): the code in on_line reads the *permDefault* setting for this.)

    * identity_method - parameter for how to build identity checksum for messages.
      as these are usually remote files, the default is typically "cod" (calculate on download)

    * rename - parameter used to put in messages built to specify the rename field contents.

    * options are passed to sarracenia.Transfer classes for their use as well.

    Poll uses sarracenia.transfer (ftp, sftp, https, etc... ) classes to request
    lists of files using those protocols using built-in logic.

    Internally, Poll normalizes the listings received by placing them into
    paramiko.SFTPAttributes metadata records (similar to stat records) and
    builds a Sarracenia.Message from them. The *poll* routine does one pass of
    this, returning a list of Sarracenia.Messages.

    To customize:

    * one can add new sarracenia.transfer protocols, each implementing the *ls*
      entry point to be compatible with this polling routine, ideally the entry
      point would return a list of paramiko.SFTPAttributes for each file in a
      directory listing. This can be used to implement polling of structured
      remote resources such as S3 or webdav.

    * one can deal with different formats of HTTP pages by overriding the
      handle_data entry point, as done in `nasa_mls_nrt.py <nasa_mls_nrt.py>`_ plugin

    * for traditional file servers, the listing format should be decipherable
      with the built-in processing.

    * sftp file servers provide paramiko.SFTPAttributes naturally which are
      timezone agnostic.

    * for some FTP servers, one may need to specify the *timezone* option to
      override the UTC default.

    * If there are problems with date or line formats, one can sub-class poll,
      and override only the on_line routine to deal with that.
    """

    # ----- HTML parsing -----

    def handle_starttag(self, tag, attrs):
        """
        routine called from html.parser for every opening tag.

        tracks whether the page is a table, which cell of the current row is
        being read, and remembers the target of any href attribute as the
        candidate file name (self.myfname).
        """
        if tag == "table":
            self.tabular_format = True
        elif tag == "tr":
            self.table_column = 0
        elif tag == "td":
            self.table_column += 1
        else:
            for name, value in attrs:
                # html.parser gives None as the value of an attribute written
                # without one (<a href>); there is no file name to keep then.
                if name == "href" and value is not None:
                    self.myfname = value.strip().strip('\t')

    def handle_data(self, data):
        """
        routine called from html.parser to deal with a single line.
        if the line is about a file, then create a new entry for it
        with a metadata available from SFTPAttributes.

        example lines:

        from hpfx.collab.science.gc.ca:
            20230113T00Z_MSC_REPS_HGT_ISBL-0850_RLatLon0.09x0.09_PT000H.grib2 2023-01-13 03:49 5.2M

        from https://data.cosmic.ucar.edu/suominet/nrt/ncConus/y2023/
            CsuPWVh_2023.011.22.00.0060_nc 11-Jan-2023 23:58 47K

        this can be overridden by subclassing to deal with new web sites.

        Other web servers put their file indices in a tabular format, where
        there is a number of cells per row:

            <tr><td></td><td href=filename>filename</td><td>yyyy-mm-dd hh:mm</td><td>size</td>

        This handle_data supports both formats... the tabular format is
        provided by a vanilla apache2 on a debian derived system.
        """
        logger.debug(f"handling_data {data} column={self.table_column}")

        if self.tabular_format:
            if self.table_column == 2:
                self.myfname = data
                return
            elif self.table_column != 3:
                return
            sdate = data.strip()
        else:
            if self.myfname is None:
                return
            if self.myfname == data:
                return
            words = data.split()
            if len(words) != 3:
                self.myfname = None
                return
            sdate = words[0] + ' ' + words[1]

        if len(sdate) < 10:
            return

        # a date cell can arrive without any file name having been seen for
        # the row; there is nothing to record in that case.
        if not self.myfname:
            return

        entry = paramiko.SFTPAttributes()

        t = None
        for f in ['%d-%b-%Y %H:%M', '%Y-%m-%d %H:%M']:
            logger.debug(f" try parsing +{sdate}+ using {f}")
            try:
                t = time.strptime(sdate, f)
                break
            except Exception:
                pass

        if t:
            # NOTE(review): mktime interprets the listing time as local time.
            entry.st_mtime = time.mktime(t)

        # size is rounded, need a way to be more precise.
        #entry.st_size = file_size_fix(words[-1])

        # names ending in / are directories.
        if self.myfname[-1] != '/':
            entry.st_mode = 0o755
        else:
            entry.st_mode = stat.S_IFDIR | 0o755

        self.entries[self.myfname] = entry
        self.myfname = None

    def on_html_page(self, data) -> dict:
        """
        called once per directory or page of HTML, invokes html.parser, returns
        a dictionary of file entries.
        """
        self.entries = {}
        self.myfname = None
        self.tabular_format = False
        self.table_column = 0

        self.parser.feed(data)
        self.parser.close()

        return self.entries

    def on_html_parser_init(self):
        """
        create the html parser and route its tag and data callbacks to
        handle_starttag and handle_data of this object.
        """
        self.parser = html.parser.HTMLParser()
        self.parser.handle_starttag = self.handle_starttag
        self.parser.handle_data = self.handle_data

    # ----- end of HTML parsing -----

    def __init__(self, options, class_logger=None):
        """
        validate pollUrl, derive post_baseUrl when it was not given, and build
        the transfer object (self.dest) used to list the remote server.

        exits the process (sys.exit(1)) when pollUrl is missing or no
        credential record can be found for it.
        """
        super().__init__(options, class_logger)

        # check pollUrl
        self.details = None
        if self.o.pollUrl is not None:
            ok, self.details = sarracenia.config.Config.credentials.get(
                self.o.pollUrl)

        if self.o.pollUrl is None or self.details is None:
            logger.error("pollUrl option incorrect or missing\n")
            sys.exit(1)

        if self.o.post_baseUrl is None:
            self.o.post_baseUrl = self.details.url.geturl()
            if self.o.post_baseUrl[-1] != '/':
                self.o.post_baseUrl += '/'
            if self.o.post_baseUrl.startswith('file:'):
                self.o.post_baseUrl = 'file:'
            # never publish the password in posted messages.
            if self.details.url.password:
                self.o.post_baseUrl = self.o.post_baseUrl.replace(
                    ':' + self.details.url.password, '')

        self.o.sendTo = self.o.pollUrl

        self.dest = sarracenia.transfer.Transfer.factory(
            self.details.url.scheme, self.o)

        if self.dest is None:
            logger.critical("unsupported polling protocol")

        self.metricsReset()
        self.on_html_parser_init()

    def metricsReset(self) -> None:
        """ zero the counters reported by metricsReport. """
        self.metrics = {'transferRxBytes': 0}

    def metricsReport(self) -> dict:
        """ return the counters accumulated since the last metricsReset. """
        return self.metrics

    def cd(self, path):
        """
        change to *path* on the remote server.

        :return: True on success, False (after logging a warning) otherwise.
        """
        try:
            self.dest.cd(path)
            return True
        except Exception:
            logger.warning("sr_poll/cd: could not cd to directory %s" % path)
        return False

    def filedate(self, line):
        """
        extract the modification time from a unix style listing line, where the
        date occupies the 6th to 8th white space separated fields.

        :param line: one line of listing text.
        :return: seconds since the epoch, 0 when the dateparser library is not
                 available, or None when the date could not be parsed.
        :raises IndexError: when the line has fewer than eight fields.
        """
        if not features['ftppoll']['present']:
            logger.error('need dateparser library to deal with polling of ftp servers, no date parsed')
            return 0

        line_split = line.split()
        file_date = line_split[5] + " " + line_split[6] + " " + line_split[7]
        current_date = datetime.datetime.now(pytz.utc)

        # case 1: the date contains '-' implies the date is in 1 string not 3 separate ones, and H:M is also provided
        if "-" in file_date:
            file_date = line_split[5] + " " + line_split[6]

        standard_date_format = dateparser.parse(
            file_date,
            settings={
                'RELATIVE_BASE': datetime.datetime(current_date.year, 1, 1),
                'TIMEZONE': self.o.timezone,  #turn this into an option - should be EST for mtl
                'TO_TIMEZONE': 'UTC'
            })

        if standard_date_format is None:
            return None

        # case 2: the year was not given, it is defaulted to 1900. Must find which year (this one or last one).
        if standard_date_format.month - current_date.month >= 6:
            standard_date_format = standard_date_format.replace(
                year=(current_date.year - 1))

        return datetime.datetime.timestamp(standard_date_format)

    def on_line(self, line) -> paramiko.SFTPAttributes:
        """
        default line processing, converts a file listing into an SFTPAttributes.
        does nothing if input is already an SFTPAttributes item, returning it unchanged.
        verifies that file is accessible (based on self.o.permDefault pattern to
        establish minimum permissions.)

        text lines with fewer than seven fields are read as windows style
        (date, time, size, name...), lines with more than seven as unix style
        (mode, links, owner, group, size, date x3, name...).

        :return: the SFTPAttributes for the entry, or None when the line is not
                 understood or the entry lacks the permissions required.
        """
        sftp_obj = None

        if isinstance(line, paramiko.SFTPAttributes):
            sftp_obj = line

        elif isinstance(line, str):
            parts = line.split()

            if len(parts) < 7:
                # assume windows...
                if len(parts) < 4:
                    # not an entry at all (e.g. a "total ..." header.)
                    return None
                if not features['ftppoll']['present']:
                    logger.error('need dateparser library to deal with polling of ftp servers, no date parsed')
                    return None
                ldate = dateparser.parse(' '.join(parts[0:2]),
                                         settings={
                                             'TIMEZONE': self.o.timezone,
                                             'TO_TIMEZONE': 'UTC'
                                         })
                if ldate is None:
                    logger.debug(f"unparseable date in listing line: {line}")
                    return None
                sftp_obj = paramiko.SFTPAttributes()
                sftp_obj.st_mtime = ldate.timestamp()
                sftp_obj.st_size = file_size_fix(parts[2])
                # the name is everything after the size field.
                sftp_obj.filename = ' '.join(parts[3:])
                sftp_obj.longname = sftp_obj.filename
                sftp_obj.st_mode = 0o644  # just make it work... no permission info provided.

            elif len(parts) > 7:
                sftp_obj = paramiko.SFTPAttributes()
                sftp_obj.st_mode = filemode(self, parts[0])
                sftp_obj.st_uid = fileid(self, parts[2])
                sftp_obj.st_gid = fileid(self, parts[3])
                if file_size_fix(parts[4]) >= 0:
                    # normal linux/unix ftp server case.
                    sftp_obj.st_size = file_size_fix(parts[4])
                    sftp_obj.filename = ' '.join(parts[8:])
                    sftp_obj.st_mtime = self.filedate(line)
                else:
                    # university of wisconsin (some special file system? has third ownership field before size)
                    # every field is one position later: drop the first field
                    # so that filedate finds the date where it expects it.
                    sftp_obj.st_size = file_size_fix(parts[5])
                    sftp_obj.filename = ' '.join(parts[9:])
                    sftp_obj.st_mtime = self.filedate(' '.join(parts[1:]))
                sftp_obj.longname = sftp_obj.filename

        if sftp_obj is None:
            return None

        # filter out files we don't have the necessary permissions for.
        if (sftp_obj.st_mode & self.o.permDefault) == self.o.permDefault:
            return sftp_obj
        return None

    def lsdir(self):
        """
        list the current remote directory.

        when the transfer class returns raw bytes, they are counted in the
        transferRxBytes metric and parsed as an HTML page.  every entry is then
        normalized with on_line.

        :return: (ok, files, directories) where files and directories are
                 dictionaries of name -> SFTPAttributes.  on any failure
                 (False, {}, {}) is returned after logging a warning.
        """
        try:
            ls = self.dest.ls()

            if type(ls) is bytes:
                self.metrics["transferRxBytes"] += len(ls)
                ls = self.on_html_page(ls.decode('utf-8'))

            new_ls = {}
            new_dir = {}

            for f in ls:
                logger.debug(f"line to parse: {f}")
                line = self.on_line(ls[f])

                if (line is None) or (line == ""):
                    continue
                if stat.S_ISDIR(line.st_mode):
                    new_dir[f] = line
                else:
                    new_ls[f] = line

            return True, new_ls, new_dir
        except Exception:
            logger.warning("dest.lsdir: Could not ls directory")
            logger.debug("Exception details:", exc_info=True)

        return False, {}, {}

    def poll_directory(self, pdir):
        """
        build messages for everything found in remote directory *pdir*, then
        recurse into each of its sub-directories.

        :return: list of messages; empty when the directory cannot be entered
                 or listed.
        """
        msgs = []

        # cd to that directory
        logger.debug(" cd %s" % pdir)
        if not self.cd(pdir):
            return []

        # ls that directory
        ok, file_dict, dir_dict = self.lsdir()
        if not ok:
            return []

        logger.debug("poll_directory: new files found %d" % len(file_dict))

        # post poll list: directories first, then files.
        msgs.extend(self.poll_list_post(pdir, dir_dict, dir_dict.keys()))
        msgs.extend(self.poll_list_post(pdir, file_dict, file_dict.keys()))

        # poll in children directory
        for d in sorted(dir_dict.keys()):
            if d == '.' or d == '..':
                continue
            # remote paths always use '/', whatever the local os.sep is.
            msgs.extend(self.poll_directory(pdir + '/' + d))

        return msgs

    def poll_file_post(self, desc, destDir, remote_file):
        """
        build the message for one directory entry.

        :param desc: SFTPAttributes for the entry (a raw listing line is also
                     accepted and converted.)
        :param destDir: directory containing the entry.
        :param remote_file: name of the entry within destDir.
        :return: a list holding the message, or None when the entry is to be
                 skipped (event type not in fileEvents, unreadable link...)
        """
        path = destDir + '/' + remote_file

        # posting a localfile
        if self.o.post_baseUrl.startswith('file:'):
            if os.path.isfile(path) or os.path.islink(path):
                try:
                    lstat = sarracenia.stat(path)
                except Exception:
                    lstat = None
                ok = sarracenia.Message.fromFileInfo(path, self.o, lstat)
                if os.path.islink(path):
                    if 'size' in ok:
                        del ok['size']
                    if not self.o.follow_symlinks:
                        try:
                            ok['fileOp'] = {'link': os.readlink(path)}
                            # 'Identity' was the key tested originally; the rest
                            # of this routine uses 'identity'. clear both.
                            for key in ('identity', 'Identity'):
                                if key in ok:
                                    del ok[key]
                        except Exception:
                            logger.error("cannot read link %s message dropped" % path)
                            logger.debug('Exception details: ', exc_info=True)
                            ok = None
                # callers extend a list with the result, so wrap the message.
                if ok is None:
                    return None
                return [ok]

        post_relPath = destDir + '/' + remote_file

        logger.debug('desc: type: %s, value: %s' % (type(desc), desc))

        if type(desc) == str:
            line = desc.split()
            st = paramiko.SFTPAttributes()
            st.st_size = file_size_fix(line[4])
            # actually only need to convert normalized time to number here...
            # just being lazy...
            lstime = dateparser.parse(line[5] + " " + line[6]).timestamp()
            st.st_mtime = lstime
            st.st_atime = lstime
            desc = st

        msg = sarracenia.Message.fromFileInfo(post_relPath, self.o, desc)

        # records built from a text line carry no mode; treat as a plain file.
        mode = desc.st_mode if desc.st_mode is not None else 0

        if stat.S_ISDIR(mode):
            if 'mkdir' not in self.o.fileEvents:
                return None
            msg['fileOp'] = {'directory': ''}
        elif stat.S_ISLNK(mode):
            if 'link' not in self.o.fileEvents:
                return None
            if not self.o.follow_symlinks:
                try:
                    msg['fileOp'] = {'link': self.dest.readlink(path)}
                except Exception:
                    logger.error("cannot read link %s message dropped" % post_relPath)
                    logger.debug('Exception details: ', exc_info=True)
                    return None

        if 'create' not in self.o.fileEvents and 'modify' not in self.o.fileEvents:
            return None

        if self.o.identity_method and (',' in self.o.identity_method):
            m, v = self.o.identity_method.split(',', 1)
            msg['identity'] = {'method': m, 'value': v}

        # If there is a file operation, and it isn't a rename, then some fields are irrelevant/wrong.
        if 'fileOp' in msg and 'rename' not in msg['fileOp']:
            if 'identity' in msg:
                del msg['identity']
            if 'size' in msg:
                del msg['size']

        return [msg]

    def poll_list_post(self, destDir, desclst, filelst):
        """
        build the messages for each name of *filelst*, looking up its
        description in *desclst*.

        :return: list of the messages produced (skipped entries are left out.)
        """
        msgs = []
        for remote_file in filelst:
            new_msgs = self.poll_file_post(desclst[remote_file], destDir,
                                           remote_file)
            if new_msgs:
                msgs.extend(new_msgs)
        return msgs

    def poll(self) -> list:
        """
        do one polling pass: connect, walk every directory of self.o.path
        (after variable expansion), and return the messages built.

        when the connection fails, sleeps 15 seconds and returns an empty list
        so that the caller tries again on its next pass.
        """
        msgs = []

        try:
            self.dest.connect()
        except Exception:
            # connection did not work
            logger.error("sr_poll/post_new_url: unable to connect to %s" %
                         self.o.pollUrl)
            logger.debug('Exception details: ', exc_info=True)
            nap = 15
            logger.error(f"Sleeping {nap} secs and retry")
            time.sleep(nap)
            return []

        for destDir in self.o.path:
            currentDir = self.o.variableExpansion(destDir)
            if currentDir == '':
                currentDir = destDir
            msgs.extend(self.poll_directory(currentDir))

        logger.debug('poll_directory returned: %s' % len(msgs))

        # close connection; failing to close must not lose the messages.
        try:
            self.dest.close()
        except Exception:
            logger.debug('Exception details: ', exc_info=True)

        return msgs