Source code for sarracenia.rabbitmq_admin

#!/usr/bin/env python3
"""
   rabbitmq administration bindings, to allow sr to invoke broker management functions.

"""
import sys
import urllib, urllib.parse
import base64
import logging
import os
import re
import socket
import subprocess

#rabbitmqadmin = '.' + os.sep + 'rabbitmqadmin'
rabbitmqadmin = 'rabbitmqadmin'

logger = logging.getLogger(__name__)

#logger.setLevel( logging.DEBUG )

#===========================
# rabbitmqadmin
#===========================


[docs] def exec_rabbitmqadmin(url, options, simulate=False): """ invoke rabbitmqadmin using a sub-process, with the given options. """ try: command = rabbitmqadmin command += ' --host \'' + url.hostname command += '\' --user \'' + url.username command += '\' -p \'' + urllib.parse.unquote(url.password) command += '\' --format raw_json ' if url.scheme == 'amqps': command += ' --ssl --port=15671 ' command += ' ' + options logger.debug("command = %s" % command) if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 5): if logger: logger.debug("using subprocess.getstatusoutput") if simulate: print("dry_run: %s" % ' '.join(command)) return 0, None return subprocess.getstatusoutput(command) else: cmdlin = command.replace("'", '') cmdlst = cmdlin.split() if logger: logger.debug("using subprocess.run cmdlst=%s" % ' '.join(cmdlst)) if simulate: print("dry_run: %s" % cmdlin) return 0, None rclass = subprocess.run(cmdlst, stdout=subprocess.PIPE) if rclass.returncode == 0: output = rclass.stdout if type(output) == bytes: output = output.decode("utf-8") return rclass.returncode, output return rclass.returncode, None except: if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 5): if logger: logger.error("trying run command %s %s" % command) else: if logger: logger.error("trying run command %s %s" % ' '.join(cmdlst)) if logger: logger.debug('Exception details:', exc_info=True) return 0, None
[docs] def add_user(url, role, user, passwd, simulate): """ add the given user with the given credentials. """ declare = "declare user name='%s' password=" % user if passwd != None: declare += "\'%s\'" % urllib.parse.unquote(passwd) if role == 'admin': declare += " tags=administrator " else: declare += ' tags="" ' dummy = run_rabbitmqadmin(url, declare, simulate) # admin and feeder gets the same permissions if role in ['admin,', 'feeder', 'manager']: c = "configure=.*" w = "write=.*" r = "read=.*" logger.info("permission user \'%s\' role %s %s %s %s " % (user + '@' + url.hostname, 'feeder', c, w, r)) declare = "declare permission vhost=/ user=%s %s %s %s" % (user, c, w, r) dummy = run_rabbitmqadmin(url, declare, simulate) return # source if role in ['source']: c = "configure=^q_%s.*|^xs_%s.*" % (user, user) w = "write=^q_%s.*|^xs_%s.*" % (user, user) r = "read=^q_%s.*|^x[lrs]_%s.*|^x.*public$" % (user, user) logger.info("permission user '%s' role %s %s %s %s " % (user + '@' + url.hostname, 'source', c, w, r)) declare = "declare permission vhost=/ user=%s %s %s %s" % (user, c, w, r) dummy = run_rabbitmqadmin(url, declare, simulate) return # subscribe if role in ['subscribe', 'subscriber']: c = "configure=^q_%s.*" % user w = "write=^q_%s.*|^xs_%s$" % (user, user) r = "read=^q_%s.*|^x[lrs]_%s.*|^x.*public$" % (user, user) logger.info("permission user '%s' role %s %s %s %s " % (user + '@' + url.hostname, 'source', c, w, r)) declare = "declare permission vhost=/ user=%s %s %s %s" % (user, c, w, r) dummy = run_rabbitmqadmin(url, declare, simulate)
[docs] def del_user(url, user, simulate): """ delete user from the given broker. """ logger.info("deleting user %s" % user) delete = "delete user name='%s'" % user dummy = run_rabbitmqadmin(url, delete, simulate)
[docs] def get_exchanges(url): """ get the list of existing exchanges. """ logger.info("geting exchanges") cmd = "list exchanges name" return run_rabbitmqadmin(url, cmd)
[docs] def get_queues(url): """ get the list of existing queues. """ logger.info("geting queues") cmd = "list queues name messages state" return run_rabbitmqadmin(url, cmd)
[docs] def get_users(url): """ get the list of existing users. """ logger.info("geting users") cmd = "list users name" return run_rabbitmqadmin(url, cmd)
#=========================== # direct access to rabbitmq management plugin # this is what rabbitmqadmin does under the cover #===========================
[docs] def broker_get_exchanges(url, ssl_key_file=None, ssl_cert_file=None): """ get the list of existing exchanges using a url query. """ import http.client method = "GET" path = "/api/exchanges?columns=name" if url.scheme == 'amqps': conn = http.client.HTTPSConnection(url.hostname, "15671", ssl_key_file, ssl_cert_file) else: conn = http.client.HTTPConnection(url.hostname, "15672") bcredentials = bytes( url.username + ':' + urllib.parse.unquote(url.password), "utf-8") b64credentials = base64.b64encode(bcredentials).decode("ascii") headers = {"Authorization": "Basic " + b64credentials} try: conn.request(method, path, "", headers) except socket.error as e: print("Could not connect: {0}".format(e)) resp = conn.getresponse() answer = resp.read() if b'error' in answer[:5]: print(answer) return [] lst = eval(answer) exchanges = [] for i in lst: ex = i["name"] if ex == '': continue exchanges.append(ex) return exchanges
[docs] def user_access(url, user): """ Given an administrative URL, return a list of exchanges and queues the user can access. lox = list of exchanges, just a list of names. loq = array of queues, where the value of each is the number of messages ready. return value:: { 'exchanges': { 'configure': lox, 'write': lox, 'read': lox }, 'queues' : { 'configure': loq, 'write': loq, 'read': loq }, 'bindings' : { <queue> : { 'exchange': <exchange> , 'key' : <routing_key> } } } """ import json import re found = False for p in json.loads(exec_rabbitmqadmin(url, "list permissions")[1]): if user == p['user']: found = True re_cf = re.compile(p['configure']) re_wr = re.compile(p['write']) re_rd = re.compile(p['read']) #exchanges = rabbitmq_broker_get_exchanges(url) x_cf = [] x_wr = [] x_rd = [] for x in list( map(lambda x: x['name'], json.loads(exec_rabbitmqadmin(url, "list exchanges name")[1]))): #print( "x: %s\n" % x ) if re_cf.match(x): x_cf += [x] continue if re_wr.match(x): x_wr += [x] continue if re_rd.match(x): x_rd += [x] continue q_cf = {} q_wr = {} q_rd = {} for qq in json.loads(exec_rabbitmqadmin(url, "list queues")[1]): #print( "qq name=%s ready=%d\n\n" % (qq['name'], qq['messages_ready_ram']) ) q = qq['name'] nq = qq['messages_ready_ram'] if re_cf.match(q): q_cf[q] = nq continue if re_wr.match(q): q_wr[q] = nq continue if re_rd.match(q): q_rd[q] = nq continue b = {} for bb in json.loads(exec_rabbitmqadmin(url, "list bindings")[1]): #print("\n binding: %s" % bb ) if bb['source'] != '': q = bb['destination'] if (q in q_cf) or (q in q_wr) or (q in q_rd): #print(" exchange: %s, queue: %s, topic: %s" % ( bb['source'], q, bb['routing_key'] ) ) if not q in b: b[q] = {'exchange': bb['source'], 'key': bb['routing_key']} else: b[q] += { 'exchange': bb['source'], 'key': bb['routing_key'] } return( { 'exchanges': { 'configure' : x_cf , 'write': x_wr, 'read': x_rd }, \ 'queues': { 'configure' : q_cf , 'write': q_wr, 'read': q_rd }, \ 'bindings': b } )
if __name__ == "__main__": url = urllib.parse.urlparse(sys.argv[1]) print(exec_rabbitmqadmin(url, "list queue names")[1]) import json lex = list( map(lambda x: x['name'], json.loads(exec_rabbitmqadmin(url, "list exchanges name")[1]))) print("exchanges: %s\n\n" % lex) u = 'tsource' up = rabbitmq_user_access(url, u) print("permissions for %s: \nqueues: %s\nexchanges: %s\nbindings %s" % (u, up['queues'], up['exchanges'], up['bindings'])) #print( "\n\nbindings: %s" % json.loads(exec_rabbitmqadmin(url,"list bindings")[1]) )
[docs] def run_rabbitmqadmin(url, options, simulate=False): """ spawn a subprocess to run rabbitmqadmin with the given options. capture result. """ logger.debug("sr_rabbit run_rabbitmqadmin %s" % options) try: (status, answer) = exec_rabbitmqadmin(url, options, simulate) if simulate: return if status != 0 or answer == None or len( answer) == 0 or 'error' in answer: logger.error("run_rabbitmqadmin invocation failed") return [] if answer == None or len(answer) == 0: return [] lst = [] try: lst = eval(answer) except: pass return lst except: logger.error("sr_rabbit/run_rabbitmqadmin failed with option '%s'" % options) logger.debug('Exception details: ', exc_info=True) return []