# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, 2008-2021
#
# Sarracenia repository: https://github.com/MetPX/sarracenia
# Documentation: https://github.com/MetPX/sarracenia
#
########################################################################
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#
import logging, paramiko, os, subprocess, sys, time
from paramiko import *
from stat import *
from sarracenia.transfer import Transfer
from sarracenia.transfer import alarm_cancel, alarm_set, alarm_raise
from urllib.parse import unquote
import logging
logger = logging.getLogger(__name__)
[docs]
class Sftp(Transfer):
"""
SecSH File Transfer Protocol (SFTP) ( https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt )
Sarracenia transfer protocol subclass supports/uses additional custom options:
* accelScpCommand (default: '/usr/bin/scp %s %d' )
The module uses the paramiko library for python SecSH support ( https://www.paramiko.org/ )
"""
[docs]
def __init__(self, proto, options):
super().__init__(proto, options)
logger.debug("sr_sftp __init__")
self.o.add_option("accelScpCommand", "str", "/usr/bin/scp %s %d")
# sftp command times out after 20 secs
# this setting is different from the computed timeout (protocol)
self.connected = False
self.sftp = None
self.ssh = None
self.seek = True
self.batch = 0
self.connected = False
self.ssh_config = None
try:
self.ssh_config = paramiko.SSHConfig()
ssh_config = os.path.expanduser('~/.ssh/config')
if os.path.isfile(ssh_config):
fp = open(ssh_config, 'r')
self.ssh_config.parse(fp)
fp.close()
except:
logger.error("sr_sftp/__init__: unable to load ssh config %s" %
ssh_config)
logger.debug('Exception details: ', exc_info=True)
def registered_as():
return ['sftp', 'scp', 'ssh', 'fish']
# cd
def cd(self, path):
alarm_set(self.o.timeout)
logger.debug("first cd to %s" % self.originalDir)
self.sftp.chdir(self.originalDir)
logger.debug("then cd to %s" % path)
self.sftp.chdir(path)
self.pwd = path
alarm_cancel()
# cd forced
def cd_forced(self, perm, path):
logger.debug("sr_sftp cd_forced %d %s" % (perm, path))
# try to go directly to path
alarm_set(self.o.timeout)
self.sftp.chdir(self.originalDir)
try:
self.sftp.chdir(path)
alarm_cancel()
return
except:
pass
alarm_cancel()
# need to create subdir
subdirs = path.split("/")
if path[0:1] == "/": subdirs[0] = "/" + subdirs[0]
for d in subdirs:
if d == '': continue
# try to go directly to subdir
try:
alarm_set(self.o.timeout)
self.sftp.chdir(d)
alarm_cancel()
continue
except:
pass
# create and go to subdir
alarm_set(self.o.timeout)
self.sftp.mkdir(d, self.o.permDirDefault)
self.sftp.chdir(d)
alarm_cancel()
def check_is_connected(self):
logger.debug("sr_sftp check_is_connected")
if self.sftp == None: return False
if not self.connected: return False
if self.sendTo != self.o.sendTo:
self.close()
return False
self.batch = self.batch + 1
if self.batch > self.o.batch:
self.close()
return False
# really connected, getcwd would not work, send_ignore would not work... so chdir used
try:
alarm_set(self.o.timeout)
self.sftp.chdir(self.originalDir)
alarm_cancel()
except:
self.close()
return False
return True
# chmod
def chmod(self, perm, path):
logger.debug("sr_sftp chmod %s %s" % ("{0:o}".format(perm), path))
alarm_set(self.o.timeout)
self.sftp.chmod(path, perm)
alarm_cancel()
# close
def close(self):
logger.debug("sr_sftp close")
old_sftp = self.sftp
old_ssh = self.ssh
self.init()
alarm_set(self.o.timeout)
try:
old_sftp.close()
except:
pass
try:
old_ssh.close()
except:
pass
alarm_cancel()
# connect...
def connect(self):
logger.debug("sr_sftp connect %s" % self.o.sendTo)
if self.connected: self.close()
self.connected = False
self.sendTo = self.o.sendTo
if not self.credentials(): return False
alarm_set(self.o.timeout)
try:
sublogger = logging.getLogger('paramiko')
sublogger.setLevel(logging.CRITICAL)
self.ssh = paramiko.SSHClient()
# FIXME this should be an option... for security reasons... not forced
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.password:
self.ssh.connect(self.host,self.port,self.user,unquote(self.password), \
pkey=None,key_filename=self.ssh_keyfile,\
timeout=self.o.timeout,allow_agent=False,look_for_keys=False)
else:
self.ssh.connect(self.host,self.port,self.user,self.password, \
pkey=None,key_filename=self.ssh_keyfile,\
timeout=self.o.timeout)
#if ssh_keyfile != None :
# key=DSSKey.from_private_key_file(ssh_keyfile,password=None)
sftp = self.ssh.open_sftp()
if self.o.timeout != None:
logger.debug("sr_sftp connect setting timeout %f" %
self.o.timeout)
channel = sftp.get_channel()
channel.settimeout(self.o.timeout)
sftp.chdir('.')
self.originalDir = sftp.getcwd()
self.pwd = self.originalDir
self.connected = True
self.sftp = sftp
#alarm_cancel()
return True
except:
logger.error("sr_sftp/connect: unable to connect to %s (user:%s)" %
(self.host, self.user))
logger.debug('Exception details: ', exc_info=True)
alarm_cancel()
return False
# credentials...
def credentials(self):
#logger.debug("sr_sftp credentials %s" % self.sendTo)
try:
ok, details = self.o.credentials.get(self.sendTo)
if details: url = details.url
self.host = url.hostname
self.port = url.port
self.user = url.username
self.password = url.password
self.ssh_keyfile = details.ssh_keyfile
if url.username == '': self.user = None
if url.password == '': self.password = None
if url.port == '': self.port = None
if self.ssh_keyfile: self.password = None
if self.port == None: self.port = 22
#logger.debug("h u:p s = %s:%d %s:%s %s" %
# (self.host, self.port, self.user, self.password,
# self.ssh_keyfile))
if self.ssh_config == None: return True
if self.user == None or \
( self.ssh_keyfile == None and self.password == None):
#logger.debug("check in ssh_config")
for key, value in self.ssh_config.lookup(self.host).items():
if key == "hostname":
self.host = value
elif key == "user":
self.user = value
elif key == "port":
self.port = int(value)
elif key == "identityfile":
self.ssh_keyfile = os.path.expanduser(value[0])
#logger.debug("h u:p s = %s:%d %s:%s %s" %
# (self.host, self.port, self.user, self.password,
# self.ssh_keyfile))
return True
except:
logger.error(
"sr_sftp/credentials: unable to get credentials for %s" %
self.sendTo)
logger.debug('Exception details: ', exc_info=True)
return False
# delete
# MG sneak rmdir here in case 'R' message implies a directory (remote mirroring)
def delete(self, path):
logger.debug("sr_sftp rm %s" % path)
alarm_set(self.o.timeout)
# check if the file is there... if not we are done,no error
try:
s = self.sftp.lstat(path)
except:
alarm_cancel()
return
# proceed with file/link removal
if not S_ISDIR(s.st_mode):
logger.debug("sr_sftp remove %s" % path)
self.sftp.remove(path)
# proceed with directory removal
else:
logger.debug("sr_sftp rmdir %s" % path)
self.sftp.rmdir(path)
alarm_cancel()
def readlink(self, link):
logger.debug("%s" % (link))
alarm_set(self.o.timeout)
value = self.sftp.readlink(link)
alarm_cancel()
return value
# symlink
def symlink(self, link, path):
logger.debug("(in %s), create this file %s as a link to: %s" % (self.getcwd(), path, link) )
alarm_set(self.o.timeout)
self.sftp.symlink(link, path)
alarm_cancel()
# get
def get(self,
msg,
remote_file,
local_file,
remote_offset=0,
local_offset=0,
length=0, exactLength=False):
logger.debug(
"sr_sftp get %s %s %d %d %d %s" %
(remote_file, local_file, remote_offset, local_offset, length, exactLength))
alarm_set(2 * self.o.timeout)
rfp = self.sftp.file(remote_file, 'rb', self.o.bufsize)
if remote_offset != 0: rfp.seek(remote_offset, 0)
rfp.settimeout(1.0 * self.o.timeout)
alarm_cancel()
# read from rfp and write to local_file
rw_length = self.read_writelocal(remote_file, rfp, local_file,
local_offset, length, exactLength=False)
# close
alarm_set(self.o.timeout)
rfp.close()
alarm_cancel()
return rw_length
def getAccelerated(self, msg, remote_file, local_file, length=0, remote_offset=0, exactLength=False):
base_url = msg['baseUrl'].replace('sftp://', '')
if base_url[-1] == '/':
base_url = base_url[0:-1]
arg1 = base_url + ':' + self.pwd + os.sep + remote_file
arg1 = arg1.replace(' ', '\\ ')
arg2 = '.' + os.sep + local_file
cmd = self.o.accelScpCommand.replace('%s', arg1)
cmd = cmd.replace('%d', arg2).split()
logger.info("accel_sftp: %s" % ' '.join(cmd))
p = subprocess.Popen(cmd)
p.wait()
if p.returncode != 0:
return -1
sz = os.stat(arg2).st_size
return sz
# getcwd
def getcwd(self):
alarm_set(self.o.timeout)
cwd = self.sftp.getcwd() if self.sftp else None
alarm_cancel()
return cwd
# ls
def ls(self):
logger.debug("sr_sftp ls")
self.entries = {}
# timeout is at least 30 secs, say we wait for max 5 mins
alarm_set(self.o.timeout)
dir_attr = self.sftp.listdir_attr()
alarm_cancel()
for index in range(len(dir_attr)):
attr = dir_attr[index]
line = attr.__str__()
self.line_callback(line, attr)
#logger.debug("sr_sftp ls = %s" % self.entries )
return self.entries
# line_callback: ls[filename] = 'stripped_file_description'
def line_callback(self, iline, attr):
#logger.debug("sr_sftp line_callback %s" % iline)
oline = iline
oline = oline.strip('\n')
oline = oline.strip()
oline = oline.replace('\t', ' ')
opart1 = oline.split(' ')
opart2 = []
for p in opart1:
if p == '': continue
opart2.append(p)
# else case is in the event of unlikely race condition
fil = ' '.join(opart2[8:])
line = ' '.join(opart2)
self.entries[fil] = attr
# mkdir
def mkdir(self, remote_dir):
logger.debug("mkdir %s" % remote_dir)
alarm_set(self.o.timeout)
try:
s = self.sftp.lstat(path)
if S_ISDIR(s.st_mode):
return
logger.error( f"cannot mkdir {path}, file exists" )
alarm_cancel()
return
except FileNotFoundError:
pass
except:
alarm_cancel()
return
self.sftp.mkdir(remote_dir, self.o.permDirDefault)
alarm_cancel()
# put
def put(self,
msg,
local_file,
remote_file,
local_offset=0,
remote_offset=0,
length=0):
logger.debug(
"sr_sftp put %s %s %d %d %d" %
(local_file, remote_file, local_offset, remote_offset, length))
# simple file
alarm_set(2 * self.o.timeout)
if length == 0:
rfp = self.sftp.file(remote_file, 'wb', self.o.bufsize)
rfp.settimeout(1.0 * self.o.timeout)
# parts
else:
try:
self.sftp.stat(remote_file)
except:
rfp = self.sftp.file(remote_file, 'wb', self.o.bufsize)
rfp.close()
rfp = self.sftp.file(remote_file, 'r+b', self.o.bufsize)
rfp.settimeout(1.0 * self.o.timeout)
if remote_offset != 0: rfp.seek(remote_offset, 0)
alarm_cancel()
# read from local_file and write to rfp
rw_length = self.readlocal_write(local_file, local_offset, length, rfp)
# no sparse file... truncate where we are at
alarm_set(self.o.timeout)
self.fpos = remote_offset + rw_length
if length != 0: rfp.truncate(self.fpos)
rfp.close()
alarm_cancel()
return rw_length
def putAccelerated(self, msg, local_file, remote_file, length=0):
dest_baseUrl = self.o.sendTo.replace('sftp://', '')
if dest_baseUrl[-1] == '/':
dest_baseUrl = dest_baseUrl[0:-1]
arg2 = dest_baseUrl + ':' + msg['new_dir'] + os.sep + remote_file
arg2 = arg2.replace(' ', '\\ ')
arg1 = local_file
cmd = self.o.accelScpCommand.replace('%s', arg1)
cmd = cmd.replace('%d', arg2).split()
logger.info("accel_sftp: %s" % ' '.join(cmd))
p = subprocess.Popen(cmd)
p.wait()
if p.returncode != 0:
return -1
# FIXME: faking success... not sure how to check really.
sz = int(msg['size'])
return sz
# rename
def rename(self, remote_old, remote_new):
logger.debug("sr_sftp rename %s %s" % (remote_old, remote_new))
try:
self.delete(remote_new)
except:
pass
alarm_set(self.o.timeout)
self.sftp.rename(remote_old, remote_new)
alarm_cancel()
# rmdir
def rmdir(self, path):
logger.debug("sr_sftp rmdir %s " % path)
alarm_set(self.o.timeout)
self.sftp.rmdir(path)
alarm_cancel()
# utime
def utime(self, path, tup):
logger.debug("sr_sftp utime %s %s " % (path, tup))
alarm_set(self.o.timeout)
self.sftp.utime(path, tup)
alarm_cancel()