# This file is part of sarracenia.
# The sarracenia suite is Free and is proudly provided by the Government of Canada
# Copyright (C) Her Majesty The Queen in Right of Canada, 2008-2020
#
# Sarracenia repository: https://github.com/MetPX/sarracenia
# Documentation: https://github.com/MetPX/sarracenia
#
########################################################################
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#
# the real AMQP library... not this one...
import amqp
import copy
import json
import logging
import re
import sarracenia
from sarracenia.postformat import PostFormat
from sarracenia.moth import Moth
import signal
import os
import time
from urllib.parse import unquote
logger = logging.getLogger(__name__)
"""
amqp_ss_maxlen
the maximum length of a "short string", as per AMQP protocol, in bytes.
"""
amqp_ss_maxlen = 255
default_options = {
'auto_delete': False,
'batch': 25,
'durable': True,
'exchange': None,
'exchangeDeclare': True,
'expire': None,
'logLevel': 'info',
'persistent': True,
'prefetch': 25,
'queueName': None,
'queueBind': True,
'queueDeclare': True,
'reset': False,
'subtopic': [],
'messageDebugDump': False,
'topicPrefix': ['v03'],
'vhost': '/',
}
[docs]
class AMQP(Moth):
"""
implementation of the Moth API for the amqp library, which is built to talk to rabbitmq brokers in 0.8 and 0.9
AMQP dialects.
to allow acknowledgements we map: AMQP' 'delivery_tag' to the 'ack_id'
additional AMQP specific options:
exchangeDeclare - declare exchanges before use.
queueBind - bind queue to exchange before use.
queueDeclare - declare queue before use.
"""
def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
if raw_msg is not None:
body = raw_msg.body
if self.o['messageDebugDump']:
logger.info('raw message start')
if not ('content_type' in raw_msg.properties):
logger.warning('message is missing content-type header')
if body:
logger.info('body: type: %s (%d bytes) %s' %
(type(body), len(body), body))
else:
logger.info('had no body')
if raw_msg.headers:
logger.info('headers: type: %s (%d elements) %s' %
(type(raw_msg.headers), len(raw_msg.headers), raw_msg.headers))
else:
logger.info('had no headers')
if raw_msg.properties:
logger.info('properties:' % raw_msg.properties)
else:
logger.info('had no properties')
if raw_msg.delivery_info:
logger.info( f"delivery info: {raw_msg.delivery_info}" )
else:
logger.info('had no delivery info')
logger.info('raw message end')
if type(body) is bytes:
try:
body = raw_msg.body.decode("utf8")
except Exception as ex:
logger.error(
'ignoring message. UTF8 encoding expected. raw message received: %s' % ex)
logger.debug('Exception details: ', exc_info=True)
self.channel.basic_ack( raw_msg.delivery_info['delivery_tag'])
return None
if 'content_type' in raw_msg.properties:
content_type = raw_msg.properties['content_type']
else:
content_type = None
msg = PostFormat.importAny( body, raw_msg.headers, content_type, self.o )
if not msg:
logger.error('Decode failed, discarding message')
self.channel.basic_ack( raw_msg.delivery_info['delivery_tag'])
return None
topic = raw_msg.delivery_info['routing_key'].replace(
'%23', '#').replace('%22', '*')
msg['exchange'] = raw_msg.delivery_info['exchange']
source=None
if 'source' in self.o:
source = self.o['source']
elif 'sourceFromExchange' in self.o and self.o['sourceFromExchange']:
itisthere = re.match( "xs_([^_]+)_.*", msg['exchange'] )
if itisthere:
source = itisthere[1]
else:
itisthere = re.match( "xs_([^_]+)", msg['exchange'] )
if itisthere:
source = itisthere[1]
if 'source' in msg and 'sourceFromMessage' in self.o and self.o['sourceFromMessage']:
pass
elif source:
msg['source'] = source
msg['_deleteOnPost'] |= set(['source'])
msg_topic = topic.split('.')
# topic validation... deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017
if 'topicCopy' in self.o and self.o['topicCopy']:
topicOverride=True
else:
topicOverride=False
if 'relPath' in msg:
path_topic = self.o['topicPrefix'] + os.path.dirname(msg['relPath']).split('/')
if msg_topic != path_topic:
topicOverride=True
# set subtopic if possible.
if msg_topic[0:len(self.o['topicPrefix'])] == self.o['topicPrefix']:
msg['subtopic'] = msg_topic[len(self.o['topicPrefix']):]
else:
topicOverride=True
if topicOverride:
msg['topic'] = topic
msg['_deleteOnPost'] |= set( ['topic'] )
msg['ack_id'] = raw_msg.delivery_info['delivery_tag']
msg['local_offset'] = 0
msg['_deleteOnPost'] |= set( ['ack_id', 'exchange', 'local_offset', 'subtopic'])
if not msg.validate():
if hasattr(self,'channel'):
self.channel.basic_ack(msg['ack_id'])
logger.error('message acknowledged and discarded: %s' % msg)
msg = None
else:
msg = None
return msg
# length of an AMQP short string (used for headers and many properties)
amqp_ss_maxlen = 255
[docs]
def __init__(self, props, is_subscriber) -> None:
"""
connect to broker, depending on message_strategy stubborness, remain connected.
"""
super().__init__(props, is_subscriber)
self.last_qDeclare = time.time()
logging.basicConfig(
format=
'%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s')
self.o = copy.deepcopy(default_options)
self.o.update(props)
self.first_setup = True
self.please_stop = False
me = "%s.%s" % (__class__.__module__, __class__.__name__)
if ('settings' in self.o) and (me in self.o['settings']):
for s in self.o['settings'][me]:
self.o[s] = self.o['settings'][me][s]
if 'logLevel' in self.o['settings'][me]:
logger.setLevel(self.o['logLevel'].upper())
self.connection = None
def __connect(self, broker) -> bool:
"""
connect to broker.
returns True if connected, false otherwise.
* side effect: self.channel set to a new channel.
Expect caller to handle errors.
"""
if broker.url.hostname:
host = broker.url.hostname
if broker.url.port is None:
if (broker.url.scheme[-1] == 's'):
host += ':5671'
else:
host += ':5672'
else:
host += ':{}'.format(broker.url.port)
else:
logger.critical( f"invalid broker specification: {broker} " )
return False
# if needed, set the vhost using the broker URL's path
vhost = self.o['vhost']
# if the URL path is '/' or '', no vhost is specified and the default vhost from self.o
# will be used. Otherwise, strip off leading or trailing slashes.
if broker.url.path != '/' and broker.url.path != '':
vhost = broker.url.path.strip('/')
self.connection = amqp.Connection(host=host,
userid=broker.url.username,
password=unquote(
broker.url.password),
login_method=broker.login_method,
virtual_host=vhost,
ssl=(broker.url.scheme[-1] == 's'))
if hasattr(self.connection, 'connect'):
# check for amqp 1.3.3 and 1.4.9 because connect doesn't exist in those older versions
self.connection.connect()
self.management_channel = self.connection.channel(1)
self.channel = self.connection.channel(2)
return True
def _amqp_setup_signal_handler(self, signum, stack):
logger.info("ok, asked to stop")
self.please_stop=True
def metricsReport(self):
if 'no' in self.o and self.o['no'] < 2 and self.is_subscriber and self.connection and self.connection.connected:
# control frequency of checks for queue size. hardcoded for now.
next_time = self.last_qDeclare + 30
now=time.time()
if next_time <= now:
#self._queueDeclare(passive=True)
self.last_qDeclare=now
super().metricsReport()
return self.metrics
def _queueDeclare(self,passive=False) -> int:
try:
# from sr_consumer.build_connection...
if not self.__connect(self.o['broker']):
logger.critical('could not connect')
if hasattr(self,'metrics'):
self.metrics['brokerQueuedMessageCount'] = -2
return -2
#FIXME: test self.first_setup and props['reset']... delete queue...
broker_str = self.o['broker'].url.geturl().replace(
':' + self.o['broker'].url.password + '@', '@')
if self.o['queueDeclare'] and self.o['queueName']:
args = {}
if self.o['expire']:
x = int(self.o['expire'] * 1000)
if x > 0: args['x-expires'] = x
if self.o['message_ttl']:
x = int(self.o['message_ttl'] * 1000)
if x > 0: args['x-message-ttl'] = x
#FIXME: conver expire, message_ttl to proper units.
if self.o['dry_run']:
logger.info('queue declare (dry run) %s (as: %s) ' %
(self.o['queueName'], broker_str))
msg_count=0
else:
qname, msg_count, consumer_count = self.management_channel.queue_declare(
self.o['queueName'],
passive=passive,
durable=self.o['durable'],
exclusive=False,
auto_delete=self.o['auto_delete'],
nowait=False,
arguments=args)
if not passive:
logger.info( f"queue declared {self.o['queueName']} (as: {broker_str}), (messages waiting: {msg_count})" )
if hasattr(self,'metrics'):
self.metrics['brokerQueuedMessageCount'] = msg_count
return msg_count
except Exception as err:
logger.error(
f'connecting to: {self.o["queueName"]}, durable: {self.o["durable"]}, expire: {self.o["expire"]}, auto_delete={self.o["auto_delete"]}'
)
logger.error("AMQP getSetup failed to {} with {}".format(
self.o['broker'].url.hostname, err))
logger.debug('Exception details: ', exc_info=True)
if hasattr(self,'metrics'):
self.metrics['brokerQueuedMessageCount'] = -1
return -1
[docs]
def getSetup(self) -> None:
"""
Setup so we can get messages.
if message_strategy is stubborn, will loop here forever.
connect, declare queue, apply bindings.
"""
ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._amqp_setup_signal_handler)
signal.signal(signal.SIGTERM, self._amqp_setup_signal_handler)
while True:
if self.please_stop:
break
if 'broker' not in self.o or self.o['broker'] is None:
logger.critical( f"no broker given" )
break
# It does not really matter how it fails, the recovery approach is always the same:
# tear the whole thing down, and start over.
try:
# from sr_consumer.build_connection...
if not self.__connect(self.o['broker']):
logger.critical('could not connect')
break
# only first/lead instance needs to declare a queue and bindings.
if 'no' in self.o and self.o['no'] >= 2:
self.metricsConnect()
return
#logger.info('getSetup connected to {}'.format(self.o['broker'].url.hostname) )
if self.o['prefetch'] != 0:
self.channel.basic_qos(0, self.o['prefetch'], True)
#FIXME: test self.first_setup and props['reset']... delete queue...
broker_str = self.o['broker'].url.geturl().replace(
':' + self.o['broker'].url.password + '@', '@')
# from Queue declare
msg_count = self._queueDeclare()
if msg_count == -2: break
if self.o['queueBind'] and self.o['queueName']:
for tup in self.o['bindings']:
exchange, prefix, subtopic = tup
topic = '.'.join(prefix + subtopic)
if self.o['dry_run']:
logger.info('binding (dry run) %s with %s to %s (as: %s)' % \
( self.o['queueName'], topic, exchange, broker_str ) )
else:
logger.info('binding %s with %s to %s (as: %s)' % \
( self.o['queueName'], topic, exchange, broker_str ) )
if exchange:
self.management_channel.queue_bind(self.o['queueName'], exchange,
topic)
# Setup Successfully Complete!
self.metricsConnect()
logger.debug('getSetup ... Done!')
break
except Exception as err:
logger.error(
f'connecting to: {self.o["queueName"]}, durable: {self.o["durable"]}, expire: {self.o["expire"]}, auto_delete={self.o["auto_delete"]}'
)
logger.error("AMQP getSetup failed to {} with {}".format(
self.o['broker'].url.hostname, err))
logger.debug('Exception details: ', exc_info=True)
if not self.o['message_strategy']['stubborn']: return
if ebo < 60: ebo *= 2
logger.info("Sleeping {} seconds ...".format(ebo))
time.sleep(ebo)
signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
if self.please_stop:
os.kill(os.getpid(), signal.SIGINT)
def putSetup(self) -> None:
ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._amqp_setup_signal_handler)
signal.signal(signal.SIGTERM, self._amqp_setup_signal_handler)
while True:
# It does not really matter how it fails, the recovery approach is always the same:
# tear the whole thing down, and start over.
try:
if self.please_stop:
break
if self.o['broker'] is None:
logger.critical( f"no broker given" )
break
if not self.__connect(self.o['broker']):
logger.critical('could not connect')
break
# transaction mode... confirms would be better...
self.channel.tx_select()
broker_str = self.o['broker'].url.geturl().replace(
':' + self.o['broker'].url.password + '@', '@')
#logger.debug('putSetup ... 1. connected to {}'.format(broker_str ) )
if self.o['exchangeDeclare']:
logger.debug('putSetup ... 1. declaring {}'.format(
self.o['exchange']))
if type(self.o['exchange']) is not list:
self.o['exchange'] = [self.o['exchange']]
for x in self.o['exchange']:
if self.o['dry_run']:
logger.info('exchange declare (dry run): %s (as: %s)' %
(x, broker_str))
else:
self.channel.exchange_declare(
x,
'topic',
auto_delete=self.o['auto_delete'],
durable=self.o['durable'])
logger.info('exchange declared: %s (as: %s)' %
(x, broker_str))
# Setup Successfully Complete!
self.metricsConnect()
logger.debug('putSetup ... Done!')
break
except Exception as err:
logger.error(
"AMQP putSetup failed to connect or declare exchanges {}@{} on {}: {}"
.format(self.o['exchange'], self.o['broker'].url.username,
self.o['broker'].url.hostname, err))
logger.debug('Exception details: ', exc_info=True)
if not self.o['message_strategy']['stubborn']: return
if ebo < 60: ebo *= 2
self.close()
logger.info("Sleeping {} seconds ...".format(ebo))
time.sleep(ebo)
signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
if self.please_stop:
os.kill(os.getpid(), signal.SIGINT)
def putCleanUp(self) -> None:
try:
for x in self.o['exchange']:
try:
if self.o['dry_run']:
logger.info("deleted exchange (dry run): %s (if unused)" % x)
else:
if hasattr(self,'channel'):
self.channel.exchange_delete(x, if_unused=True)
logger.info("deleted exchange: %s" % x)
except amqp.exceptions.PreconditionFailed as err:
err_msg = str(err).replace("Exchange.delete: (406) PRECONDITION_FAILED - exchange ", "")
logger.warning("failed to delete exchange: %s" % err_msg)
except Exception as err:
logger.error("failed on {} with {}".format(
self.o['broker'].url.hostname, err))
logger.debug('Exception details: ', exc_info=True)
def getCleanUp(self) -> None:
try:
if self.o['dry_run']:
logger.info("deleting queue (dry run) %s" % self.o['queueName'] )
else:
logger.info("deleting queue %s" % self.o['queueName'] )
if hasattr(self,'channel'):
self.channel.queue_delete(self.o['queueName'])
except Exception as err:
logger.error("failed to {} with {}".format(
self.o['broker'].url.hostname, err))
logger.debug('Exception details: ', exc_info=True)
[docs]
def newMessages(self) -> list:
if not self.is_subscriber: #build_consumer
logger.error("getting from a publisher")
return []
ml = []
m = self.getNewMessage()
if m is not None:
fetched = 1
ml.append(m)
while fetched < self.o['batch']:
m = self.getNewMessage()
if m is None:
break
ml.append(m)
fetched += 1
return ml
[docs]
def getNewMessage(self) -> sarracenia.Message:
if not self.is_subscriber: #build_consumer
logger.error("getting from a publisher")
return None
try:
if not self.connection:
self.getSetup()
raw_msg = self.channel.basic_get(self.o['queueName'])
if (raw_msg is None) and (self.connection.connected):
return None
else:
self.metrics['rxByteCount'] += len(raw_msg.body)
try:
msg = self._msgRawToDict(raw_msg)
except Exception as err:
logger.error("message decode failed. raw message: %s" % raw_msg.body )
logger.debug('Exception details: ', exc_info=True)
msg = None
if msg is None:
self.metrics['rxBadCount'] += 1
return None
else:
self.metrics['rxGoodCount'] += 1
self.metrics['rxLast'] = sarracenia.nowstr()
if hasattr(self.o, 'fixed_headers'):
for k in self.o.fixed_headers:
msg[k] = self.o.fixed_headers[k]
logger.debug("new msg: %s" % msg)
return msg
except Exception as err:
logger.warning("failed %s: %s" %
(self.o['queueName'], err))
logger.debug('Exception details: ', exc_info=True)
if not self.o['message_strategy']['stubborn']:
return None
logger.warning('lost connection to broker')
self.close()
time.sleep(1)
return None
[docs]
def ack(self, m: sarracenia.Message) -> None:
"""
do what you need to acknowledge that processing of a message is done.
"""
if not self.is_subscriber: #build_consumer
logger.error("getting from a publisher")
return
# silent success. retry messages will not have an ack_id, and so will not require acknowledgement.
if not 'ack_id' in m:
#logger.warning( f"no ackid present" )
return
#logger.info( f"acknowledging {m['ack_id']}" )
ebo = 1
while True:
try:
if hasattr(self, 'channel'):
self.channel.basic_ack(m['ack_id'])
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
# Break loop if no exceptions encountered
return
except Exception as err:
logger.warning("failed for tag: %s: %s" % (m['ack_id'], err))
logger.debug('Exception details: ', exc_info=True)
# Cleanly close partially broken connection and restablish
self.close()
self.getSetup()
if ebo < 60: ebo *= 2
logger.info(
"Sleeping {} seconds before re-trying ack...".format(ebo))
time.sleep(ebo)
[docs]
def putNewMessage(self,
message: sarracenia.Message,
content_type: str = 'application/json',
exchange: str = None ) -> bool:
"""
put a new message out, to the configured exchange by default.
"""
if self.is_subscriber: #build_consumer
logger.error("publishing from a consumer")
return False
# Check connection and channel status, try to reconnect if not connected
if (not self.connection) or (not self.connection.connected) or (not self.channel.is_open):
try:
self.close()
self.putSetup()
except Exception as err:
logger.warning(f"failed, connection was closed/broken and could not be re-opened {exchange}: {err}")
logger.debug('Exception details: ', exc_info=True)
return False
# The caller probably doesn't expect the message to get modified by this method, so use a copy of the message
body = copy.deepcopy(message)
version = body['_format']
if '_deleteOnPost' in body:
# FIXME: need to delete because building entire JSON object at once.
# makes this routine alter the message. Ideally, would use incremental
# method to build json and _deleteOnPost would be a guide of what to skip.
# library for that is jsonfile, but not present in repos so far.
for k in body['_deleteOnPost']:
if k in body:
del body[k]
del body['_deleteOnPost']
if not exchange:
if (type(self.o['exchange']) is list):
if (len(self.o['exchange']) > 1):
if ( 'exchangeSplit' in self.o) and self.o['exchangeSplit'] > 1:
# FIXME: assert ( len(self.o['exchange']) == self.o['post_exchangeSplit'] )
# if that isn't true... then there is something wrong... should we check ?
if 'exchangeSplitOverride' in message:
idx = int(message['exchangeSplitOverride'])%len(self.o['exchange'])
else:
idx = sum( bytearray(body['identity']['value'],
'ascii')) % len(self.o['exchange'])
exchange = self.o['exchange'][idx]
else:
logger.error(
'do not know which exchange to publish to: %s' %
self.o['exchange'])
return False
else:
exchange = self.o['exchange'][0]
else:
exchange = self.o['exchange']
if self.o['message_ttl']:
ttl = "%d" * int(
sarracenia.durationToSeconds(self.o['message_ttl']) * 1000)
else:
ttl = "0"
if 'persistent' in self.o:
deliv_mode = 2 if self.o['persistent'] else 1
else:
deliv_mode = 2
raw_body, headers, content_type = PostFormat.exportAny( body, version, self.o['topicPrefix'], self.o )
topic = '.'.join(headers['topic'])
topic = topic.replace('#', '%23')
topic = topic.replace('*', '%22')
if len(topic) >= 255: # ensure topic is <= 255 characters
logger.error("message topic too long, truncating")
mxlen = amqp_ss_maxlen
while (topic.encode("utf8")[mxlen - 1] & 0xc0 == 0xc0):
mxlen -= 1
topic = topic.encode("utf8")[0:mxlen].decode("utf8")
if self.o['messageDebugDump']:
logger.info('raw message body: version: %s type: %s %s' %
(version, type(raw_body), raw_body))
logger.info('raw message headers: type: %s value: %s' % (type(headers), headers))
message['post_topic'] = topic
message['post_exchange'] = exchange
message['_deleteOnPost'] |= set( ['post_exchange', 'post_topic'] )
del headers['topic']
if headers :
for k in headers:
if (type(headers[k]) is str) and (len(headers[k]) >=
amqp_ss_maxlen):
logger.error("message header %s too long, dropping" % k)
return False
AMQP_Message = amqp.Message(raw_body,
content_type=content_type,
application_headers=headers,
expire=ttl,
delivery_mode=deliv_mode)
self.metrics['txByteCount'] += len(raw_body)
if headers:
self.metrics['txByteCount'] += len(''.join(str(headers)))
self.metrics['txLast'] = sarracenia.nowstr()
# timeout option is a float and default is 0.0. basic_publish wants int or None for no timeout
try:
if self.o['timeout']:
pub_timeout = int(self.o['timeout'])
else:
pub_timeout = None
except Exception as err:
logger.debug('Set pub_timeout to None. Exception details: ', exc_info=True)
pub_timeout = None
body=raw_body
ebo = 1
try:
logger.debug( f"trying to publish body: {body} headers: {headers} to {exchange} under: {topic} " )
self.channel.basic_publish(AMQP_Message, exchange, topic, timeout=pub_timeout)
# Issue #732: tx_commit can get stuck forever
self.channel.tx_commit()
logger.debug("published body: {} headers: {} to {} under: {} ".format(
body, headers, exchange, topic))
self.metrics['txGoodCount'] += 1
return True # no failure == success :-)
except Exception as err:
logger.warning("failed %s: %s" % (exchange, err))
logger.debug('Exception details: ', exc_info=True)
self.metrics['txBadCount'] += 1
# Issue #466: commenting this out until message_strategy stubborn is working correctly (Issue #537)
# Always return False when an error occurs and use the DiskQueues to retry, instead of looping. This should
# eventually be configurable with message_strategy stubborn
# if True or not self.o['message_strategy']['stubborn']:
# return False
self.close()
return False # instead of looping
def close(self) -> None:
try:
if self.connection:
self.connection.collect()
self.connection.close()
except Exception as err:
logger.error("sr_amqp/close 2: {}".format(err))
logger.debug("sr_amqp/close 2 Exception details:", exc_info=True)
# FIXME toclose not useful as we don't close channels anymore
self.metricsDisconnect()
self.connection = None