Administration de Rabbitmq Adddendum
Les anciennes modifications que les gens voulaient garder?
Introduction
AMQP signifie Advanced Message Queuing Protocol. C’est la définition d’un protocole qui vient de la nécessité de standardiser un système de changement de message asynchrone. Dans le jargon de l’AMQP, nous parlerons des producteurs de messages, des consommateurs de messages et des courtiers.
installation RABBITMQ-SERVER
Sur nos machines qui doivent traiter les messages AMQP, nous installons le broker, en installant le paquet rabbitmq-server_3.3.5-1_all.deb. L’installation de base se fait comme suit sur toutes nos machines
# installing package taken on the rabbitmq homepage
# rabbitmq-server version > 3.3.x required to use ldap for passwords verification only
apt-get install erlang-nox
dpkg -i /tmp/rabbitmq-server_3.3.5-1_all.deb
# create anonymous user
# password ********* provided in potato
# conf write read
rabbitmqctl add_user anonymous *********
rabbitmqctl set_permissions -p / anonymous "^xpublic|^amq.gen.*$|^cmc.*$" "^amq.gen.*$|^cmc.*$" "^xpublic|^amq.gen.*$|^cmc.*$"
rabbitmqctl list_user_permissions anonymous
# create feeder user
# password ********* provided in potato
# conf write read
rabbitmqctl add_user feeder ********
rabbitmqctl set_permissions -p / feeder ".*" ".*" ".*"
rabbitmqctl list_user_permissions feeder
# create administrator user
# password ********* provided in potato
rabbitmqctl add_user root *********
rabbitmqctl set_user_tags root administrator
# takeaway administrator privileges from guest
rabbitmqctl set_user_tags guest
rabbitmqctl list_user_permissions guest
rabbitmqctl change_password guest *************
# list users
rabbitmqctl list_users
# enabling management web application
# this is important since sr_rabbit uses this management facility/port access
# to retrieve some important info
rabbitmq-plugins enable rabbitmq_management
/etc/init.d/rabbitmq-server restart
Installation d’un RABBITMQ-SERVER
Sur le bunny, nous avons opté pour une installation en cluster. Pour ce faire, nous suivons les instructions suivantes:
Stop rabbitmq-server on all nodes....
/var/lib/rabbitmq/.erlang.cookie same on all nodes
on each node restart /etc/init.d/rabbitmq-server stop/start
on one of the node
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@"other node"
rabbitmqctl start_app
rabbitmqctl cluster_status
# having high availability queue...
# here all queues that starts with "cmc." will be highly available on all the cluster nodes
rabbitmqctl set_policy ha-all "^cmc\." '{"ha-mode":"all"}'
installation ldap RABBITMQ-SERVER
Sur les serveurs où nous voulons avoir une authentification en utilisant les instructions suivantes:
rabbitmq-plugins enable rabbitmq_auth_backend_ldap
# replace username by ldap username
# clear password (will be verified through the ldap one)
rabbitmqctl add_user username aaa
rabbitmqctl clear_password username
rabbitmqctl set_permissions -p / username "^xpublic|^amq.gen.*$|^cmc.*$" "^amq.gen.*$|^cmc.*$" "^xpublic|^amq.gen.*$|^cmc.*$"
Et nous configurons les services LDAP dans le fichier de configuration rabbitmq-server (ancienne configuration de test de ldap-dev qui ne fonctionnait que…):
cat /etc/rabbitmq/rabbitmq.config
[
{rabbit, [{auth_backends, [ {rabbit_auth_backend_ldap,rabbit_auth_backend_internal}, rabbit_auth_backend_internal]}]},
{rabbitmq_auth_backend_ldap,
[
{servers, ["ldap-dev.cmc.ec.gc.ca"]},
{user_dn_pattern, "uid=${username},ou=People,ou=depot,dc=ec,dc=gc,dc=ca"},
{use_ssl, false},
{port, 389},
{log, true},
{network, true},
{vhost_access_query, {in_group,
"ou=${vhost}-users,ou=vhosts,dc=ec,dc=gc,dc=ca"}},
{resource_access_query,
{for, [{permission, configure, {in_group, "cn=admin,dc=ec,dc=gc,dc=ca"}},
{permission, write,
{for, [{resource, queue, {in_group, "cn=admin,dc=ec,dc=gc,dc=ca"}},
{resource, exchange, {constant, true}}]}},
{permission, read,
{for, [{resource, exchange, {in_group, "cn=admin,dc=ec,dc=gc,dc=ca"}},
{resource, queue, {constant, true}}]}}
]
}},
{tag_queries, [{administrator, {constant, false}},
{management, {constant, true}}]}
]
}
].
Utilisation de l’AMQP sur DD (DDI, DD.BETA)
Nous (Peter) voulions faire une implémentation d’AMQP dans METPX. Pour ce faire, nous utilisons la bibliothèque python-amqplib qui implémente les fonctionnalités nécessaires d’AMQP en python. Nous avons ainsi développé un pxSender de type amqp qui est le producteur de messages de notification ainsi qu’un pxReceiver de type amqp qui sert de consommateur de messages de notification. En tant que courtier, nous utilisons rabbitmq-server qui est un paquet Debian standard d’un courtier AMQP.
Un pxSender de type amqp, lit le contenu d’un fichier dans sa fil d’attente, crée un message auquel il joint un “topic” et l’envoie au broker. Un pxReceiver de type amqp annoncera au broker le “topic” pour lequel il est intéressé à recevoir des messages de notification, et le broker lui enverra chaque message correspondant à son choix.
Comme un message peut être n’importe quoi, au niveau du pxSender, nous avons également joint le nom du fichier d’où provient le message. Ainsi, dans notre pxReceiver, nous pouvons assurer le contenu du message dans le nom de fichier correspondant. Cette astuce n’est inutile que pour les changements amqp entre un expéditeur et un récepteur amqp…
Notifications pour DD
Nous avons trouvé dans AMQP une opportunité d’annoncer des produits lorsqu’ils arrivent sur DD. Donc un utilisateur, au lieu de vérifier constamment si un produit est présent sur DD. Pour le modifier, il pouvait s’abonner (topic AMQP) pour recevoir un message (l’url du produit) qui ne serait omis qu’à la livraison du produit sur DD. Nous ne ferions pas cet exercice pour les newsletters… mais pour d’autres produits (grib, images… etc.)
Pour mettre cela en œuvre, nous avons utilisé une possibilité de pxSender, le sender_script. Nous avons écrit un script sftp_amqp.py qui effectue les livraisons à DD et pour chaque produit, il crée un fichier contenant l’URL sous laquelle le produit sera présent. Voici le début de la configuration de wxo-b1-oper-dd.conf
type script
send_script sftp_amqp.py
# connection info
protocol ftp
host wxo-b1.cmc.ec.gc.ca
user wxofeed
password **********
ftp_mode active
noduplicates false
# no filename validation (pds format)
validation False
# delivery method
lock umask
chmod 775
batch 100
Nous voyons dans cette configuration que toutes les informations pour un expéditeur à fichier unique sont là. Mais parce que le type est script… et la send_script sftp_amqp.py est fournie, nous sommes en mesure de demander à notre expéditeur d’en faire plus…
Le fichier contenant l’URL est placé sous le txq d’un expéditeur AMQP /apps/px/txq/dd-notify-wxo-b1 pour que la notification AMQP soit effectuée. Pour envoyer les fichiers dans cette fil d’attente, un expéditeur doit avoir écrit dd-notify-wxo-b1.conf qui est configuré comme suit
type amqp
validation False
noduplicates False
protocol amqp
host wxo-b1.cmc.ec.gc.ca
user feeder
password ********
exchange_name cmc
exchange_key exp.dd.notify.${0}
exchange_type topic
reject ^ensemble.naefs.grib2.raw.*
accept ^(.*)\+\+.*
Encore une fois, le cl du topic contient une partie programmée. La partie ${0} contient l’arborescence où le produit est placé sur dd… Par exemple, voici une ligne de journal de dd-notify-wxo-b1.log:
2013-06-06 14:47:11,368 [INFO] (86 Bytes) Message radar.24_HR_ACCUM.GIF.XSS++201306061440_XSS_24_HR_ACCUM_MM.gif:URP:XSS:RADAR:GIF::20130606144709 delivered (lat=1.368449,speed=168950.887119)
Et ainsi serait le cl. |
|
Et l’emplacement du fichier |
|
Et l’URL complète dans le message |
|
Utilitaires installés sur les serveurs DD
Lorsqu’un client se connecte au broker (rabbitmq-server), il doit créer une file d’attente et l’attacher à un échange. Nous pouvons donner à cette fil d’attente l’option qu’elle s’autodétruit lorsqu’elle n’est plus utilisée ou qu’elle est conservée et continue d’empiler les produits si le client est hors ligne. En général, nous aimerions que la fil d’attente soit préservée et donc que la reprise de la connexion redémarre la collection de produits sans perte.
- queue_manager.py
Le rabbitmq-server ne détruira jamais une fil d’attente créée par un client si elle n’est pas en mode de suppression automatique (et encore moins si elle est créée avec durabilité). Cela peut causer un problème. Par exemple, un client qui développe un processus, peut changer d’IDE plusieurs fois et entasser sur le serveur une multitude de files d’attente qui ne seront jamais utilisées. Nous avons donc créé un script queue_manager.py qui vérifie si les files d’attente inutilisées ont plus de X produits en attente ou prennent plus de Y Mo… Si c’est le cas, ils sont détruits par le script.
Au moment de la rédaction du présent document, les limites sont les suivantes :
25000 messages and 50Mb.
- dd-xml-inotify.py
Sur notre datamart public, il y a des produits qui ne proviennent pas directement de pds/px/pxatx. Comme nos notifications sont effectuées à partir de la livraison du produit, nous n’avons pas de messages de notification pour eux. C’est le cas pour les produits XML sous les répertoires :
citypage_weather
andmarine_weather
. Pour surmonter cette situation, le démon dd-xml-inotify.py a été créé et installé. Ce script python utilise inotify pour surveiller la modification des produits sous leurs répertoires. Si un produit est modifié ou ajouté, une notification amqp est envoyée au serveur. Ainsi, tous les produits du datamart sont couverts par l’envoi de message.
Utilisation d’AMQP avec URP, BUNNY, PDS-OP
Note
s’applique également au développement…
De URP-1/2 annoncer à BUNNY-OP qu’un produit est prêt
Sur urp-1/2 un metpx roule l’expéditeur amqp_expose_db.conf qui annonce qu’un produit vient d’arriver dans la db de metpx avec un message de la forme
Md5sum of product name file-size url dbname
a985c32cbdee8af2ab5d7b8f6022e781 498081 http://urp-1.cmc.ec.gc.ca/ db/20150120/RADAR/URP/IWA/201501201810~~PA,60,10,PA_PRECIPET,MM_HR,MM:URP:IWA:RADAR:META::20150120180902
Ces messages AMQP sont envoyés au serveur rabbitmq sur bunny-op avec une clé d’échange qui commence par v00.urp.input suivie par convention par le chemin de db avec le ‘/’ remplacé par ‘.’.
Note
que urp-1/2 exécute apache et que l’annonce du produit se trouve dans la base de données de metpx et est visible à partir de l’URL du message.
BUNNY-OP et dd_dispatcher.py
bunny-op est un vip qui vit sur bunny1-op ou bunny2-op. C’est avec keepalived que nous nous assurons que ce vip réside sur l’un des bunny-op. Nous testons également que rabbitmq-server fonctionne sur le même serveur. La partie configuration de keepalived qui traite de le vip est:
vip bunny-op 142.135.12.59 port 5672
vrrp_script chk_rabbitmq {
script "killall -0 rabbitmq-server"
interval 2
}
vrrp_instance bunny-op {
state BACKUP
interface eth0
virtual_router_id 247
priority 150
track_interface {
eth0
}
advert_int 1
preempt_delay 5
authentication {
auth_type PASS
auth_pass bunop
}
virtual_ipaddress {
# bunny-op
142.135.12.59 dev eth0
}
track_script {
chk_rabbitmq
}
}
Les serveurs rabbitmq sur ces machines sont installés dans un cluster.
Nous mettons la haute disponibilité sur les files d’attente en commençant par cmc.*
.
Sur chacune des machines, exécutez l’utilitaire dd_dispatcher.py
.
Ce programme vérifie si le vip bunny-op et proc dera a son travail uniquement sur le serveur où vit le vip.
(S’il y a un commutateur, détection automatique en 5 secondes et les files d’attente restent inchangées)
L’utilitaire dd_dispatcher.py s’abonne aux messages de notification v00.urp.input.#
et redirige
ainsi les messages de notification des 2 serveurs opérationnels URP.
À la réception d’un premier produit, le md5dum du produit est placé dans une cache et le message est re-expédié
mais cette fois avec v00.urp.notify
comme clé d’échange.
Si un autre message arrive de v00.urp.input
avec le même md5sum que le premier, il est ignoré,
de sorte que les produits annoncés à partir de la clé d’échange v00.urp.notify
sont uniques et représentent la première arrivée des 2 URP opérationnels.
Réceptions PDS-OP de messages de notification de répartition, wget de produits radar
Sur pds-op, un récepteur pull_urp, exécutez le fx_script pull_amqp_wget.py. Dans ce script, la commande suivante
# shared queue : each pull receive 1 message (prefetch_count=1)
self.channel.basic_qos(prefetch_size=0,prefetch_count=1,a_global=False)
fait que la distribution des messages de notification v00.urp.notify
sera répartie
également sur les 5 serveurs sous pds-op. Nous garantissons donc une traction distribuée.
Pour chaque message du formulaire
a985c32cbdee8af2ab5d7b8f6022e781 498081 http://urp-1.cmc.ec.gc.ca/ db/20150120/RADAR/URP/IWA/201501201810~~PA,60,10,PA_PRECIPET,MM_HR,MM:URP:IWA:RADAR:META::20150120180902
l’url est rebuted à partir des 2 derniers champs du message et un wget du produit est fait et placé dans la fil d’attente du récepteur qui est ensuite ignoré / acheminé de manière ordinaire.
Vérification / Dépannage
Dans l’ordre de production
- Sur
urp-1/2
: Vérifiez que les produits radar sont générés sur urp-1/2.
Vérifiez que les notifications sont générées sur urp-1/2 /apps/px/log/tx_amqp_expose_db.log
- Sur
- Sur
bunny1/2-op
Vérifiez où réside bunny-op
Vérifiez les journaux de dd_dispatcher.py
/var/log/dd_dispatcher_xxxx.log
où xxxx est le processus pid
- Sur
- Sur
pds-op
Vérifiez le pull_urp
- Sur
La réparation des processus qui ne fonctionnent pas correctement devrait résoudre les problèmes en général. Plus de détails seront ajoutés ici au fur et à mesure que les problèmes sont rencontrés et corrigés.