Source code for sarracenia.flowcb.clamav

"""
 A sample on_part plugin to perform virus scanning, using the ClamAV engine.

 requires a clamd binding package to be installed. On debian derived systems:: 

    sudo apt-get install python3-pyclamd

 on others::

    pip3 install pyclamd

 author: Peter Silva
"""

import logging
import os
import stat
import time

from sarracenia import nowflt
import sarracenia
from sarracenia.flowcb import FlowCB

#
# Support for features inventory mechanism.
#
from sarracenia.featuredetection import features

features['clamd'] = { 'modules_needed': [ 'pyclamd' ], 'Needed': True,
        'lament' : 'cannot use clamd to av scan files transferred',
        'rejoice' : 'can use clamd to av scan files transferred' }

try:
    import pyclamd
    features['clamd']['present'] = True
except:
    features['clamd']['present'] = False



logger = logging.getLogger(__name__)


[docs] class Clamav(FlowCB): """ Invoke ClamAV anti-virus scanning on files as they pass through a data pump. when it is invoked depends on the component it is used from. from a sender, post, or poll, the scan should stop processing prior to the transfer. for other components, subsscribers that download, it needs to take place after downloading. """
[docs] def __init__(self, options) -> None: super().__init__(options,logger) self.metric_scanned = 0 self.metric_hits = 0 if sarracenia.features['pyclamd']['present']: import pyclamd self.av = pyclamd.ClamdAgnostic() print("clam_scan on_part plugin initialized")
def avscan_hit(self, scanfn) -> bool: # worried about how long the scan will take. start = nowflt() virus_found = self.av.scan_file(scanfn) end = nowflt() self.metric_scanned += 1 if virus_found: logger.error( "part_clamav_scan took %g not forwarding, virus detected in %s" % (end - start, scanfn)) self.metric_hits += 1 return False logger.info("part_clamav_scan took %g seconds, no viruses in %s" % (end - start, scanfn)) return True def after_accept(self, worklist) -> None: if self.o.component in ['sender', 'post', 'watch']: new_incoming = [] for m in worklist.incoming: scanfn = m['new_dir'] + os.sep + m['new_file'] logger.info(f'scanning: {scanfn}') if self.avscan_hit(scanfn): worklist.rejected.append(m) else: new_incoming.append(m) worklist.incoming = new_incoming def after_work(self, worklist) -> None: if self.o.component in ['subscribe', 'sarra']: new_ok = [] for m in worklist.ok: scanfn = m['new_dir'] + os.sep + m['new_file'] logger.info(f'scanning: {scanfn}') if self.avscan_hit(scanfn): worklist.rejected.append(m) else: new_ok.append(m) worklist.ok = new_ok def on_housekeeping(self): logger.info( f'files scanned {self.metric_scanned}, hits: {self.metric_hits} ') self.metric_scanned = 0 self.metric_hits = 0