Source code for sarracenia.rabbitmq_admin

#!/usr/bin/env python3
   rabbitmq administration bindings, to allow sr to invoke broker management functions.

import sys
import urllib, urllib.parse
import base64
import logging
import os
import re
import socket
import subprocess

#rabbitmqadmin = '.' + os.sep + 'rabbitmqadmin'
rabbitmqadmin = 'rabbitmqadmin'

logger = logging.getLogger(__name__)

#logger.setLevel( logging.DEBUG )

# rabbitmqadmin

[docs] def exec_rabbitmqadmin(url, options, simulate=False): """ invoke rabbitmqadmin using a sub-process, with the given options. """ try: command = rabbitmqadmin command += ' --host \'' + url.hostname command += '\' --user \'' + url.username command += '\' -p \'' + urllib.parse.unquote(url.password) command += '\' --format raw_json ' if url.scheme == 'amqps': command += ' --ssl --port=15671 ' command += ' ' + options logger.debug("command = %s" % command) if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 5): if logger: logger.debug("using subprocess.getstatusoutput") if simulate: print("dry_run: %s" % ' '.join(command)) return 0, None return subprocess.getstatusoutput(command) else: cmdlin = command.replace("'", '') cmdlst = cmdlin.split() if logger: logger.debug("using cmdlst=%s" % ' '.join(cmdlst)) if simulate: print("dry_run: %s" % cmdlin) return 0, None rclass =, stdout=subprocess.PIPE) if rclass.returncode == 0: output = rclass.stdout if type(output) == bytes: output = output.decode("utf-8") return rclass.returncode, output return rclass.returncode, None except: if sys.version_info.major < 3 or (sys.version_info.major == 3 and sys.version_info.minor < 5): if logger: logger.error("trying run command %s %s" % command) else: if logger: logger.error("trying run command %s %s" % ' '.join(cmdlst)) if logger: logger.debug('Exception details:', exc_info=True) return 0, None
[docs] def add_user(url, role, user, passwd, simulate): """ add the given user with the given credentials. """ declare = "declare user name='%s' password=" % user if passwd != None: declare += "\'%s\'" % urllib.parse.unquote(passwd) if role == 'admin': declare += " tags=administrator " else: declare += ' tags="" ' dummy = run_rabbitmqadmin(url, declare, simulate) # admin and feeder gets the same permissions if role in ['admin,', 'feeder', 'manager']: c = "configure=.*" w = "write=.*" r = "read=.*""permission user \'%s\' role %s %s %s %s " % (user + '@' + url.hostname, 'feeder', c, w, r)) declare = "declare permission vhost=/ user=%s %s %s %s" % (user, c, w, r) dummy = run_rabbitmqadmin(url, declare, simulate) return # source if role in ['source']: c = "configure=^q_%s.*|^xs_%s.*" % (user, user) w = "write=^q_%s.*|^xs_%s.*" % (user, user) r = "read=^q_%s.*|^x[lrs]_%s.*|^x.*public$" % (user, user)"permission user '%s' role %s %s %s %s " % (user + '@' + url.hostname, 'source', c, w, r)) declare = "declare permission vhost=/ user=%s %s %s %s" % (user, c, w, r) dummy = run_rabbitmqadmin(url, declare, simulate) return # subscribe if role in ['subscribe', 'subscriber']: c = "configure=^q_%s.*" % user w = "write=^q_%s.*|^xs_%s$" % (user, user) r = "read=^q_%s.*|^x[lrs]_%s.*|^x.*public$" % (user, user)"permission user '%s' role %s %s %s %s " % (user + '@' + url.hostname, 'source', c, w, r)) declare = "declare permission vhost=/ user=%s %s %s %s" % (user, c, w, r) dummy = run_rabbitmqadmin(url, declare, simulate)
[docs] def del_user(url, user, simulate): """ delete user from the given broker. """"deleting user %s" % user) delete = "delete user name='%s'" % user dummy = run_rabbitmqadmin(url, delete, simulate)
[docs] def get_exchanges(url): """ get the list of existing exchanges. """"geting exchanges") cmd = "list exchanges name" return run_rabbitmqadmin(url, cmd)
[docs] def get_queues(url): """ get the list of existing queues. """"geting queues") cmd = "list queues name messages state" return run_rabbitmqadmin(url, cmd)
[docs] def get_users(url): """ get the list of existing users. """"geting users") cmd = "list users name" return run_rabbitmqadmin(url, cmd)
#=========================== # direct access to rabbitmq management plugin # this is what rabbitmqadmin does under the cover #===========================
[docs] def broker_get_exchanges(url, ssl_key_file=None, ssl_cert_file=None): """ get the list of existing exchanges using a url query. """ import http.client method = "GET" path = "/api/exchanges?columns=name" if url.scheme == 'amqps': conn = http.client.HTTPSConnection(url.hostname, "15671", ssl_key_file, ssl_cert_file) else: conn = http.client.HTTPConnection(url.hostname, "15672") bcredentials = bytes( url.username + ':' + urllib.parse.unquote(url.password), "utf-8") b64credentials = base64.b64encode(bcredentials).decode("ascii") headers = {"Authorization": "Basic " + b64credentials} try: conn.request(method, path, "", headers) except socket.error as e: print("Could not connect: {0}".format(e)) resp = conn.getresponse() answer = if b'error' in answer[:5]: print(answer) return [] lst = eval(answer) exchanges = [] for i in lst: ex = i["name"] if ex == '': continue exchanges.append(ex) return exchanges
[docs] def user_access(url, user): """ Given an administrative URL, return a list of exchanges and queues the user can access. lox = list of exchanges, just a list of names. loq = array of queues, where the value of each is the number of messages ready. return value:: { 'exchanges': { 'configure': lox, 'write': lox, 'read': lox }, 'queues' : { 'configure': loq, 'write': loq, 'read': loq }, 'bindings' : { <queue> : { 'exchange': <exchange> , 'key' : <routing_key> } } } """ import json import re found = False for p in json.loads(exec_rabbitmqadmin(url, "list permissions")[1]): if user == p['user']: found = True re_cf = re.compile(p['configure']) re_wr = re.compile(p['write']) re_rd = re.compile(p['read']) #exchanges = rabbitmq_broker_get_exchanges(url) x_cf = [] x_wr = [] x_rd = [] for x in list( map(lambda x: x['name'], json.loads(exec_rabbitmqadmin(url, "list exchanges name")[1]))): #print( "x: %s\n" % x ) if re_cf.match(x): x_cf += [x] continue if re_wr.match(x): x_wr += [x] continue if re_rd.match(x): x_rd += [x] continue q_cf = {} q_wr = {} q_rd = {} for qq in json.loads(exec_rabbitmqadmin(url, "list queues")[1]): #print( "qq name=%s ready=%d\n\n" % (qq['name'], qq['messages_ready_ram']) ) q = qq['name'] nq = qq['messages_ready_ram'] if re_cf.match(q): q_cf[q] = nq continue if re_wr.match(q): q_wr[q] = nq continue if re_rd.match(q): q_rd[q] = nq continue b = {} for bb in json.loads(exec_rabbitmqadmin(url, "list bindings")[1]): #print("\n binding: %s" % bb ) if bb['source'] != '': q = bb['destination'] if (q in q_cf) or (q in q_wr) or (q in q_rd): #print(" exchange: %s, queue: %s, topic: %s" % ( bb['source'], q, bb['routing_key'] ) ) if not q in b: b[q] = {'exchange': bb['source'], 'key': bb['routing_key']} else: b[q] += { 'exchange': bb['source'], 'key': bb['routing_key'] } return( { 'exchanges': { 'configure' : x_cf , 'write': x_wr, 'read': x_rd }, \ 'queues': { 'configure' : q_cf , 'write': q_wr, 'read': q_rd }, \ 'bindings': b } )
if __name__ == "__main__": url = urllib.parse.urlparse(sys.argv[1]) print(exec_rabbitmqadmin(url, "list queue names")[1]) import json lex = list( map(lambda x: x['name'], json.loads(exec_rabbitmqadmin(url, "list exchanges name")[1]))) print("exchanges: %s\n\n" % lex) u = 'tsource' up = rabbitmq_user_access(url, u) print("permissions for %s: \nqueues: %s\nexchanges: %s\nbindings %s" % (u, up['queues'], up['exchanges'], up['bindings'])) #print( "\n\nbindings: %s" % json.loads(exec_rabbitmqadmin(url,"list bindings")[1]) )
[docs] def run_rabbitmqadmin(url, options, simulate=False): """ spawn a subprocess to run rabbitmqadmin with the given options. capture result. """ logger.debug("sr_rabbit run_rabbitmqadmin %s" % options) try: (status, answer) = exec_rabbitmqadmin(url, options, simulate) if simulate: return if status != 0 or answer == None or len( answer) == 0 or 'error' in answer: logger.error("run_rabbitmqadmin invocation failed") return [] if answer == None or len(answer) == 0: return [] lst = [] try: lst = eval(answer) except: pass return lst except: logger.error("sr_rabbit/run_rabbitmqadmin failed with option '%s'" % options) logger.debug('Exception details: ', exc_info=True) return []