Source code for sarracenia.transfer.ftp

# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, 2008-2021
#
# Sarracenia repository: https://github.com/MetPX/sarracenia
# Documentation: https://github.com/MetPX/sarracenia
#
########################################################################
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; version 2 of the License.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
#

import ftplib, os, subprocess, sys, time
import logging
from sarracenia.transfer import Transfer
from sarracenia.transfer import alarm_cancel, alarm_set, alarm_raise
from urllib.parse import unquote

logger = logging.getLogger(__name__)


[docs] class Ftp(Transfer): """ File Transfer Protocol (FTP) ( https://datatracker.ietf.org/doc/html/rfc959 ) sarracenia transfer protocol subclass supports/uses additional custom options: * accelFtpputCommand (default: '/usr/bin/ncftpput %s %d' ) * accelFtpgetCommand (default: '/usr/bin/ncftpget %s %d' ) built using: ftplib ( https://docs.python.org/3/library/ftplib.html ) """
[docs] def __init__(self, proto, options): super().__init__(proto, options) self.o.add_option('accelFtpputCommand', 'str', '/usr/bin/ncftpput %s %d') self.o.add_option('accelFtpgetCommand', 'str', '/usr/bin/ncftpget %s %d') logger.debug("sr_ftp __init__") self.connected = False self.ftp = None self.details = None self.batch = 0
def registered_as(): return ['ftp'] # cd def cd(self, path): logger.debug("sr_ftp cd %s" % path) alarm_set(self.o.timeout) self.ftp.cwd(self.originalDir) self.ftp.cwd(path) self.pwd = path alarm_cancel() def cd_forced(self, perm, path): logger.debug("sr_ftp cd_forced %d %s" % (perm, path)) # try to go directly to path alarm_set(self.o.timeout) self.ftp.cwd(self.originalDir) try: self.ftp.cwd(path) alarm_cancel() return except: pass alarm_cancel() # need to create subdir subdirs = path.split("/") if path[0:1] == "/": subdirs[0] = "/" + subdirs[0] for d in subdirs: if d == '': continue # try to go directly to subdir try: alarm_set(self.o.timeout) self.ftp.cwd(d) alarm_cancel() continue except: pass # create alarm_set(self.o.timeout) self.ftp.mkd(d) alarm_cancel() # chmod alarm_set(self.o.timeout) self.ftp.voidcmd('SITE CHMOD ' + "{0:o}".format(perm) + ' ' + d) alarm_cancel() # cd alarm_set(self.o.timeout) self.ftp.cwd(d) alarm_cancel() # check_is_connected def check_is_connected(self): logger.debug("sr_ftp check_is_connected") if self.ftp == None: return False if not self.connected: return False if self.sendTo != self.o.sendTo: self.close() return False self.batch = self.batch + 1 if self.batch > self.o.batch: self.close() return False # really connected try: cwd = self.getcwd() except: self.close() return False return True # chmod def chmod(self, perm, path): logger.debug("sr_ftp chmod %s %s" % (str(perm), path)) alarm_set(self.o.timeout) self.ftp.voidcmd('SITE CHMOD ' + "{0:o}".format(perm) + ' ' + path) alarm_cancel() # close def close(self): logger.debug("sr_ftp close") old_ftp = self.ftp self.init() try: alarm_set(self.o.timeout) old_ftp.quit() except: pass alarm_cancel() # connect... def connect(self): logger.debug("sr_ftp connect %s" % self.o.sendTo) self.connected = False self.sendTo = self.o.sendTo if not self.credentials(): return False # timeout alarm 100 secs to connect alarm_set(self.o.timeout) try: expire = -999 if self.o.timeout: expire = self.o.timeout if self.port == '' or self.port == None: self.port = 21 if not self.tls: ftp = ftplib.FTP() ftp.encoding = 'utf-8' ftp.connect(self.host, self.port, timeout=expire) ftp.login(self.user, unquote(self.password)) else: # ftplib supports FTPS with TLS ftp = ftplib.FTP_TLS(self.host, self.user, unquote(self.password), timeout=expire) ftp.encoding = 'utf-8' if self.prot_p: ftp.prot_p() # needed only if prot_p then set back to prot_c #else : ftp.prot_c() ftp.set_pasv(self.passive) self.originalDir = '.' try: self.originalDir = ftp.pwd() except: logger.warning("Unable to ftp.pwd") logger.debug('Exception details: ', exc_info=True) self.pwd = self.originalDir self.connected = True self.ftp = ftp #alarm_cancel() return True except: logger.error("Unable to connect to %s (user:%s)" % (self.host, self.user)) logger.debug('Exception details: ', exc_info=True) alarm_cancel() return False # credentials... def credentials(self): logger.debug("sr_ftp credentials %s" % self.sendTo) try: ok, details = self.o.credentials.get(self.sendTo) if details: url = details.url self.host = url.hostname self.port = url.port self.user = url.username self.password = url.password self.passive = details.passive self.binary = details.binary self.tls = details.tls self.prot_p = details.prot_p return True except: logger.error( "sr_ftp/credentials: unable to get credentials for %s" % self.sendTo) logger.debug('Exception details: ', exc_info=True) return False # delete def delete(self, path): logger.debug("sr_ftp rm %s" % path) alarm_set(self.o.timeout) # if delete does not work (file not found) run pwd to see if connection is ok try: self.ftp.delete(path) except: d = self.ftp.pwd() alarm_cancel() # get def get(self, msg, remote_file, local_file, remote_offset=0, local_offset=0, length=0, exactLength=False): logger.debug("sr_ftp get %s %s %d" % (remote_file, local_file, local_offset)) # open local file dst = self.local_write_open(local_file, local_offset) # initialize sumalgo if self.sumalgo: self.sumalgo.set_path(remote_file) # download self.write_chunk_init(dst) if self.binary: self.ftp.retrbinary('RETR ' + remote_file, self.write_chunk, self.o.bufsize) else: self.ftp.retrlines('RETR ' + remote_file, self.write_chunk) rw_length = self.write_chunk_end() # close self.local_write_close(dst) return rw_length def getAccelerated(self, msg, remote_file, local_file, length=0, remote_offset=0, exactLength=False): base_url = msg['baseUrl'] if base_url[-1] == '/': base_url = base_url[0:-1] arg1 = base_url + self.pwd + os.sep + remote_file arg1 = arg1.replace(' ', '\\ ') arg2 = local_file cmd = self.o.accelFtpgetCommand.replace('%s', arg1) cmd = cmd.replace('%d', arg2).split() logger.info("accel_ftp: %s" % ' '.join(cmd)) p = subprocess.Popen(cmd) p.wait() if p.returncode != 0: return -1 sz = os.stat(arg2).st_size return sz # getcwd def getcwd(self): alarm_set(self.o.timeout) pwd = self.ftp.pwd() alarm_cancel() return pwd # ls def ls(self): logger.debug("sr_ftp ls") self.entries = {} alarm_set(self.o.timeout) self.ftp.retrlines('LIST', self.line_callback) alarm_cancel() logger.debug("sr_ftp ls = (size: %d) %s ..." % (len(self.entries), str(self.entries)[0:255])) return self.entries # line_callback: entries[filename] = 'stripped_file_description' def line_callback(self, iline): #logger.debug("sr_ftp line_callback %s" % iline) alarm_cancel() oline = iline oline = oline.strip('\n') oline = oline.strip() oline = oline.replace('\t', ' ') opart1 = oline.split(' ') opart2 = [] for p in opart1: if p == '': continue opart2.append(p) # else case is in the event of unlikely race condition # on linux, there are 8 fields, with spaces, perhaps more... if len(opart2) > 7: # university of Wisconsin as an ftp server that has an extra auth field. if opart2[4].isnumeric(): # normal linux case. fil = ' '.join(opart2[8:]) else: # U. Wisconsin case. fil = ' '.join(opart2[9:]) else: # guess it is on windows... fil = ' '.join(opart2[3:]) line = ' '.join(opart2) self.entries[fil] = line alarm_set(self.o.timeout) # mkdir def mkdir(self, remote_dir): logger.debug("sr_ftp mkdir %s" % remote_dir) alarm_set(self.o.timeout) self.ftp.mkd(remote_dir) alarm_cancel() alarm_set(self.o.timeout) self.ftp.voidcmd('SITE CHMOD ' + "{0:o}".format(self.o.permDirDefault) + ' ' + remote_dir) alarm_cancel() # put def put(self, msg, local_file, remote_file, local_offset=0, remote_offset=0, length=0): logger.debug("sr_ftp put %s %s" % (local_file, remote_file)) # open src = self.local_read_open(local_file, local_offset) # upload self.write_chunk_init(None) if self.binary: self.ftp.storbinary("STOR " + remote_file, src, self.o.bufsize, self.write_chunk) else: self.ftp.storlines("STOR " + remote_file, src, self.write_chunk) rw_length = self.write_chunk_end() # close self.local_read_close(src) return rw_length def putAccelerated(self, msg, local_file, remote_file, length=0): dest_baseUrl = self.o.sendTo if dest_baseUrl[-1] == '/': dest_baseUrl = dest_baseUrl[0:-1] arg2 = dest_baseUrl + msg['new_dir'] + os.sep + remote_file arg2 = arg2.replace(' ', '\\ ') arg1 = local_file cmd = self.o.accelFtpputCommand.replace('%s', arg1) cmd = cmd.replace('%d', arg2).split() logger.info("accel_ftp: %s" % ' '.join(cmd)) p = subprocess.Popen(cmd) p.wait() if p.returncode != 0: return -1 # FIXME: faking success... not sure how to check really. sz = int(msg['size']) return sz # rename def rename(self, remote_old, remote_new): logger.debug("sr_ftp rename %s %s" % (remote_old, remote_new)) alarm_set(self.o.timeout) self.ftp.rename(remote_old, remote_new) alarm_cancel() # rmdir def rmdir(self, path): logger.debug("sr_ftp rmdir %s" % path) alarm_set(self.o.timeout) self.ftp.rmd(path) alarm_cancel() # umask def umask(self): logger.debug("sr_ftp umask") alarm_set(self.o.timeout) self.ftp.voidcmd('SITE UMASK 777') alarm_cancel()