Source code for sarracenia.credentials

# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, Environment Canada, 2008-2015
#
# Questions or bugs report: dps-client@ec.gc.ca
# Sarracenia repository: https://github.com/MetPX/sarracenia
# Documentation: https://github.com/MetPX/sarracenia
#
# credentials.py : python3 utility tool to configure all protocol credentials
#
#
# Code contributed by:
#  Michel Grenier - Shared Services Canada
#  Last Changed   : Dec 29 11:42:11 EST 2015
#
########################################################################
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; version 2 of the License.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
#

#

import logging

logger = logging.getLogger(__name__)

logger.setLevel(logging.INFO)

import os
import re
import urllib, urllib.parse
import sys


[docs] class Credential: r""" An object that holds information about a credential, read from a credential file, which has one credential per line, format:: url option1=value1, option2=value2 Examples:: sftp://alice@herhost/ ssh_keyfile=/home/myself/mykeys/.ssh.id_dsa ftp://georges:Gpass@hishost/ passive = True, binary = True `Format Documentation. <https://metpx.github.io/sarracenia/Reference/sr3_credentials.7.html>`_ Attributes: url (urllib.parse.ParseResult): object with URL, password, etc. ssh_keyfile (str): path to SSH key file for SFTP passive (bool): use passive FTP mode, defaults to ``True`` binary (bool): use binary FTP mode, defaults to ``True`` tls (bool): use FTPS with TLS, defaults to ``False`` prot_p (bool): use a secure data connection for TLS bearer_token (str): bearer token for HTTP authentication login_method (str): force a specific login method for AMQP (PLAIN, AMQPLAIN, EXTERNAL or GSSAPI) Usage: # build a credential from a url string: from sarracenia.credentials import Credential broker = Credential('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca') """
[docs] def __init__(self, urlstr=None): """Create a Credential object. Args: urlstr (str): a URL in string form to be parsed. """ if urlstr is not None: self.url = urllib.parse.urlparse(urlstr) else: self.url = None self.ssh_keyfile = None self.passive = True self.binary = True self.tls = False self.prot_p = False self.bearer_token = None self.login_method = None self.s3_endpoint = None self.s3_session_token = None self.azure_credentials = None
[docs] def __str__(self): """Returns attributes of the Credential object as a readable string. """ s = '' if False: s += self.url.geturl() else: s += self.url.scheme + '://' if self.url.username: s += self.url.username #if self.url.password: # s += ':' + self.url.password if self.url.hostname: s += '@' + self.url.hostname if self.url.port: s += ':' + str(self.url.port) if self.url.path: s += self.url.path s += " %s" % self.ssh_keyfile s += " %s" % self.passive s += " %s" % self.binary s += " %s" % self.tls s += " %s" % self.prot_p s += " %s" % self.bearer_token s += " %s" % self.login_method s += " %s" % self.s3_endpoint #want to show they provided a session token, but not leak it (like passwords above) s += " %s" % 'Yes' if self.s3_session_token != None else 'No' s += " %s" % 'Yes' if self.azure_credentials != None else 'No' return s
[docs] class CredentialDB: """Parses, stores and manages Credential objects. Attributes: credentials (dict): contains all sarracenia.credentials.Credential objects managed by the CredentialDB. Usage: # build a credential via lookup in the normal files: import CredentialDB from sarracenia.credentials credentials = CredentialDB.read( "/the/path/to/the/credentials.conf" ) # if there are corresponding passwords or modulation of login information look it up. broker = credentials.get( "amqps://hpfx.collab.science.gc.ca" ) remote = credentials.get( "sftp://hoho@theserver" ) """
[docs] def __init__(self, Unused_logger=None): """Create a CredentialDB. Args: Unused_logger: logger argument no longer used... left there for API compat with old calls. """ self.credentials = {} self.pwre = re.compile(':[^/:]*@') logger.debug("__init__")
[docs] def add(self, urlstr, details=None): """Add a new credential to the DB. Args: urlstr (str): string-formatted URL to be parsed and added to DB. details (sarracenia.credentials.Credential): a Credential object can be passed in, otherwise one is created by parsing urlstr. """ # need to create url object if details == None: details = Credential() details.url = urllib.parse.urlparse(urlstr) self.credentials[urlstr] = details
[docs] def get(self, urlstr): """Retrieve a Credential from the DB by urlstr. If the Credential is valid, but not already cached, it will be added to the CredentialDB. Args: urlstr (str): credentials as URL string to be parsed. Returns: tuple: containing cache_result (bool): ``True`` if the credential was retrieved from the CredentialDB cache, ``False`` if it was not in the cache. Note that ``False`` does not imply the Credential or urlstr is invalid. credential (sarracenia.credentials.Credential): the Credential object matching the urlstr, ``None`` if urlstr is invalid. """ #logger.debug("CredentialDB get %s" % urlstr) # already cached if self.has(urlstr): #logger.debug("CredentialDB get in cache %s %s" % (urlstr,self.credentials[urlstr])) return True, self.credentials[urlstr] # create url object if needed url = urllib.parse.urlparse(urlstr) # add anonymous default, if necessary. if ( 'amqp' in url.scheme ) and \ ( (url.username == None) or (url.username == '') ): urlstr = urllib.parse.urlunparse( ( url.scheme, \ 'anonymous:anonymous@%s' % url.netloc, url.path, None, None, url.port ) ) url = urllib.parse.urlparse(urlstr) if self.isValid(url): self.add(urlstr) return False, self.credentials[urlstr] # resolved from defined credentials ok, details = self._resolve(urlstr, url) if ok: return True, details # not found... is it valid ? if not self.isValid(url): return False, None # cache it as is... we dont want to validate every time self.add(urlstr) return False, self.credentials[urlstr]
[docs] def has(self, urlstr): """Return ``True`` if the Credential matching the urlstr is already in the CredentialDB. Args: urlstr(str): credentials in a URL string. """ logger.debug("has %s" % urlstr) return urlstr in self.credentials
[docs] def isTrue(self, S): """Returns ``True`` if s is ``true``, ``yes``, ``on`` or ``1``. Args: S (str): string to check if true. """ s = S.lower() if s == 'true' or s == 'yes' or s == 'on' or s == '1': return True return False
[docs] def isValid(self, url, details=None): """Validates a URL and Credential object. Checks for empty passwords, schemes, etc. Args: url (urllib.parse.ParseResult): ParseResult object for a URL. details (sarracenia.credentials.Credential): sarra Credential object containing additional details about the URL. Returns: bool: ``True`` if a URL is valid, ``False`` if not. """ # network location if url.netloc == '': # file (why here? anyway) if url.scheme == 'file': return True logger.error( f'no network location, and not a file url' ) return False # amqp... vhost not check: default / # user and password provided we are ok user = url.username != None and url.username != '' pasw = url.password != None and url.password != '' both = user and pasw # we have everything if both: return True # we have no user and no pasw (http normal, https... no cert, sftp hope for .ssh/config) if not user and not pasw: if url.scheme in ['http', 'https', 'sftp', 's3', 'azure', 'azblob']: return True logger.error( f'unknown scheme: {url.scheme}') return False # we have a pasw no user if pasw: # not sure... sftp hope to get user from .ssh/config if url.scheme == 'sftp': return True logger.error( f'password with no username specified') return False # we only have a user ... permitted only for sftp if url.scheme != 'sftp': logger.error( f'credential not found' ) return False # sftp and an ssh_keyfile was provided... check that it exists if details and details.ssh_keyfile: if not os.path.exists(details.ssh_keyfile): logger.error( f'ssh_keyfile not found: {details.ssh_keyfile}') return False # sftp with a user (and perhaps a valid ssh_keyfile) return True
[docs] def _parse(self, line): """Parse a line of a credentials file, add it to the CredentialDB. Args: line (str): line to be parsed. """ #logger.debug("parse %s" % self.pwre.sub(':<secret!>@', line, count=1) ) try: sline = line.strip() if len(sline) == 0 or sline[0] == '#': return # first field url string = protocol://user:password@host:port[/vost] parts = sline.split() urlstr = parts[0] url = urllib.parse.urlparse(urlstr) # credential details details = Credential() details.url = url # no option if len(parts) == 1: if not self.isValid(url, details): logger.error("bad credential 1 (%s)" % line) return self.add(urlstr, details) return # parsing options : comma separated option names # some option has name = value : like ssh_keyfile optline = sline.replace(urlstr, '') optline = optline.strip() optlist = optline.split(',') for optval in optlist: parts = optval.split('=') keyword = parts[0].strip() if keyword == 'ssh_keyfile': details.ssh_keyfile = os.path.expandvars(os.path.expanduser(parts[1].strip())) elif keyword == 'passive': details.passive = True elif keyword == 'active': details.passive = False elif keyword == 'binary': details.binary = True elif keyword == 'ascii': details.binary = False elif keyword == 'ssl': details.tls = False elif keyword == 'tls': details.tls = True elif keyword == 'prot_p': details.prot_p = True elif keyword in ['bearer_token', 'bt']: details.bearer_token = parts[1].strip() elif keyword == 'login_method': details.login_method = parts[1].strip() elif keyword == 's3_session_token': details.s3_session_token = urllib.parse.unquote(parts[1].strip()) elif keyword == 's3_endpoint': details.s3_endpoint = parts[1].strip() elif keyword == 'azure_storage_credentials': details.azure_credentials = urllib.parse.unquote(parts[1].strip()) else: logger.warning("bad credential option (%s)" % keyword) # need to check validity if not self.isValid(url, details): logger.error("bad credential 2 (%s)" % line) return # seting options to protocol self.add(urlstr, details) except: logger.error("credentials/parse %s" % line) logger.debug('Exception details: ', exc_info=True)
[docs] def read(self, path): """Read in a file containing credentials (e.g. credentials.conf). All credentials are parsed and added to the CredentialDB. Args: path (str): path of file to be read. """ logger.debug("read") # read in provided credentials (not mandatory) try: if os.path.exists(path): with open(path) as f: lines = f.readlines() for line in lines: self._parse(line) except: logger.error("credentials/read path = %s" % path) logger.debug('Exception details: ', exc_info=True)
#logger.debug("Credentials = %s\n" % self.credentials)
[docs] def _resolve(self, urlstr, url=None): """Resolve credentials for AMQP vhost from ones passed as a string, and optionally a urllib.parse.ParseResult object, into a sarracenia.credentials.Credential object. Args: urlstr (str): credentials in a URL string. url (urllib.parse.ParseResult): ParseResult object with creds. Returns: tuple: containing result (bool): ``False`` if the creds were not in the CredentialDB. ``True`` if they were. details (sarracenia.credentials.Credential): the updated Credential object, or ``None``. """ # create url object if needed if not url: url = urllib.parse.urlparse(urlstr) # resolving credentials for s in self.credentials: details = self.credentials[s] u = details.url if url.scheme != u.scheme: continue if url.hostname != u.hostname: continue if url.port != u.port: continue if url.username != u.username: if url.username != None: continue if url.password != u.password: if url.password != None: continue # for AMQP... vhost checking # amqp users have same credentials for any vhost # default / may not be set... if 'amqp' in url.scheme: url_vhost = url.path u_vhost = u.path if url_vhost == '': url_vhost = '/' if u_vhost == '': u_vhost = '/' if url_vhost != u_vhost: continue # resolved : cache it and return self.credentials[urlstr] = details #logger.debug("Credentials get resolved %s %s" % (urlstr,details)) return True, details return False, None