Source code for sarracenia.filemetadata
# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, Environment Canada, 2008-2015
#
# Questions or bugs report: dps-client@ec.gc.ca
# Sarracenia repository: https://github.com/MetPX/sarracenia
# Documentation: https://github.com/MetPX/sarracenia
#
# sr_post.py : python3 program allowing users to post an available product
#
# Code contributed by:
# Michel Grenier - Shared Services Canada
# Last Changed : Nov 8 22:10:16 UTC 2017
#
########################################################################
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#
from sarracenia.featuredetection import features
if features['sftp']['present']:
import paramiko
FmdStat = paramiko.SFTPAttributes
else:
class FmdStat(object):
def __init__(self):
"""
(PAS: copied from paramiko)
Create a new (empty) SFTPAttributes object. All fields will be empty.
"""
self._flags = 0
self.st_size = None
self.st_uid = None
self.st_gid = None
self.st_mode = None
self.st_atime = None
self.st_mtime = None
self.attr = {}
if features['xattr']['present']:
import xattr
supports_extended_attributes = True
else:
supports_extended_attributes = False
import sys
supports_alternate_data_streams = False
if sys.platform == 'win32':
try:
from sarracenia.pyads import ADS
supports_alternate_data_streams = True
except:
pass
import json
STREAM_NAME = 'sr_.json'
xattr_disabled = False
def disable_xattr():
global xattr_disabled
xattr_disabled = True
[docs]
class FileMetadata:
r"""
This class implements storing metadata *with* a file.
on unlix/linux/mac systems, we use extended attributes,
where we apply a *user.sr\_* prefix to the attribute names to avoid clashes.
on Windows NT, create an "sr\_.json" Alternate Data Stream to store them.
API:
All values are utf-8, hence readable by some subset of humans.
not bytes. no binary, go away...
x = sr_attr( path ) <- read metadata from file.
x.list() <- list all extant extended attributes.
* sample return value: [ 'sum', 'mtime' ]
x.get('sum') <- look at one value.
* returns None if missing.
x.set('sum', 'hoho') <- set one value.
* fails silently (fall-back gracefully.)
x.persist() <- write metadata back to file, if necessary.
"""
[docs]
def __init__(self, path):
global supports_alternate_data_streams
global supports_extended_attributes
self.path = path
self.x = {}
self.dirty = False
if xattr_disabled:
supports_alternate_data_streams = False
supports_extended_attributes = False
return
if supports_alternate_data_streams:
self.ads = ADS(path)
s = list(self.ads)
if STREAM_NAME in s:
self.x = json.loads(
self.ads.get_stream_content(STREAM_NAME).decode('utf-8'))
if supports_extended_attributes:
try:
d = xattr.listxattr(path)
for i in d:
if isinstance(i, bytes):
i = i.decode('utf-8')
if not i.startswith('user.sr_'):
continue
k = i.replace('user.sr_', '')
v = xattr.getxattr(path, i).decode('utf-8')
if v[0] == '{':
v = json.loads(v)
self.x[k] = v
except:
self.x = {}
if 'integrity' in self.x: # id transition.
self.x['identity'] = self.x['integrity']
del self.x['integrity']
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.persist()
def __del__(self):
self.persist()
"""
return a dictionary of extended attributes.
"""
[docs]
def list(self):
"""
return the list of defined extended attributes. (keys to the dict.)
"""
return self.x.keys()
[docs]
def get(self, name) -> str:
"""
return the value of the named extended attribute.
"""
if name in self.x.keys():
if name == 'blocks':
for k in ['manifest', 'waiting' ]:
m={}
if k in self.x['blocks']:
for db in self.x['blocks'][k]: # when json'd for writing, numeric indices are stringified.
m[db if type(db) is int else int(db)] = self.x['blocks'][k][db]
self.x['blocks'][k] = m
return self.x[name]
return None
[docs]
def set(self, name, value):
"""
set the name & value pair to the extended attributes for the file.
"""
self.dirty = True
self.x[name] = value
[docs]
def persist(self):
"""
write the in-memory extended attributes to disk.
"""
global supports_alternate_data_streams
global supports_extended_attributes
if not self.dirty:
return
try:
if supports_alternate_data_streams:
#replace STREAM_NAME with json.dumps(self.x)
s = list(self.ads)
if STREAM_NAME in s:
self.ads.delete_stream(STREAM_NAME)
self.ads.add_stream_from_string(
STREAM_NAME, bytes(json.dumps(self.x, indent=4), 'utf-8'))
if supports_extended_attributes:
#set the attributes in the list. encoding utf8...
for i in self.x:
if type(self.x[i]) is not str:
s = json.dumps(self.x[i])
else:
s = self.x[i]
xattr.setxattr(self.path, 'user.sr_' + i,
bytes(s, 'utf-8'))
except:
# not really sure what to do in the exception case...
# permission would be a normal thing and just silently fail...
# could also be on windows, but not on an NTFS file system.
# silent failure means it falls back to using other means.
pass
self.dirty = False