create_rabbitmq_federation
1.因爲公司環境用到rabbitmq的federation,若是經過web UI建立,耗時耗力,並且還容易出錯。因此但願經過rabbitmq的api來實施這一步驟
2.查詢官方文檔的api,http api最適合完成這一項任務
1.linux主機一臺,ip:10.0.1.66
2.爲了快速實驗,安裝 docker engine,拉取官方docker鏡像:rabbitmq:3.6-alpine
docker pull rabbitmq:3.6-alpine
3.準備須要使用的rabbitmq的插件配置文件enabled_plugins
# Create the working directory that will be bind-mounted into the containers.
mkdir -p /home/wu/rabbitmq-test/
# Note: the plugin list written to enabled_plugins ends with a trailing period.
echo '[rabbitmq_federation,rabbitmq_federation_management,rabbitmq_management].' > /home/wu/rabbitmq-test/enabled_plugins
4.啓動rabbitmq
1)啓動一個rabbitmq,假如命名爲external-mq
docker run -d -h external-mq --name external-mq -p 5672:5672 -p 15672:15672 -v /home/wu/rabbitmq-test/external:/var/lib/rabbitmq -v /home/wu/rabbitmq-test/enabled_plugins:/etc/rabbitmq/enabled_plugins rabbitmq:3.6-alpine
2)啓動一個rabbitmq,假如命名爲internal-mq
docker run -d -h internal-mq --name internal-mq -p 5673:5672 -p 15673:15672 -v /home/wu/rabbitmq-test/internal:/var/lib/rabbitmq -v /home/wu/rabbitmq-test/enabled_plugins:/etc/rabbitmq/enabled_plugins rabbitmq:3.6-alpine
5.配置rabbitmq的用戶名密碼
1)使用guest:guest登錄
2)在admin標籤建立用戶,設置密碼,並設置權限(這裏我建立了用戶名user1,密碼爲password123)
1.參照官方文檔:http://www.rabbitmq.com/federation.html
因爲官方文檔的example,沒有給出包含設置prefetch-count,reconnect-delay等參數的例子,我也沒找到相關的數據格式,因此採用一個小技巧:
公司環境裏有已經經過web UI設置過federation-upstream的環境,因此get一下就能夠拿到數據格式了
curl -i -u 'admin_user:admin_passowrd' -H "content-type:application/json" -XGET http://10.0.1.100:15672/api/parameters/federation-upstream/%2f/internal-result-send
2.在external-mq上建立federation-upstream:
# Shell variable names may not contain hyphens, so underscores are used here.
external_federation_upstream='{"value":{"uri":"amqp://user1:password123@10.0.1.66:5673","prefetch-count":20,"reconnect-delay":30,"ack-mode":"on-confirm","trust-user-id":false,"exchange":"internal-result-send","max-hops":1},"vhost":"/","component":"federation-upstream","name":"internal-result-send"}'
# Quote the expansion so the JSON body is always passed to curl as one argument.
curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data "${external_federation_upstream}" http://10.0.1.66:15672/api/parameters/federation-upstream/%2f/internal-result-send
3.在external-mq上建立policy
# The assignment must be its own command: on the same line as curl, the
# variable would be expanded (still empty) before the assignment takes effect.
external_policies_body='{"pattern":"^external_result_receive","apply-to":"exchanges","definition":{"federation-upstream":"internal-result-send"},"priority":0}'
# Quote the expansion so the JSON body is always passed to curl as one argument.
curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data "${external_policies_body}" http://10.0.1.66:15672/api/policies/%2f/external_result_receive
至此,就完成了單邊的消息傳遞
4.internal-mq上建立federation-upstream:
# Shell variable names may not contain hyphens, so underscores are used here.
internal_federation_upstream='{"value":{"uri":"amqp://user1:password123@10.0.1.66:5672","prefetch-count":20,"reconnect-delay":30,"ack-mode":"on-confirm","trust-user-id":false,"exchange":"external-search-send","max-hops":1},"vhost":"/","component":"federation-upstream","name":"external-search-send"}'
# Quote the expansion so the JSON body is always passed to curl as one argument.
curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data "${internal_federation_upstream}" http://10.0.1.66:15673/api/parameters/federation-upstream/%2f/external-search-send
5.internal-mq上建立policy
# The assignment must be its own command: on the same line as curl, the
# variable would be expanded (still empty) before the assignment takes effect.
internal_policies_body='{"pattern":"^internal_search_receive","apply-to":"exchanges","definition":{"federation-upstream":"external-search-send"},"priority":0}'
# Quote the expansion so the JSON body is always passed to curl as one argument.
curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data "${internal_policies_body}" http://10.0.1.66:15673/api/policies/%2f/external-search-send
1.當設置的password有特殊字符時,執行curl命令後,rabbitmq並不能建立federation,而會報錯。
好比,用戶名爲user1,密碼爲password#123時,以上操做將沒法成功執行;
大體由於是:特殊字符,漢字等都不能被http的url處理,必須通過url百分號編碼
2.個人解決辦法:利用python的urllib庫編碼
cat url_percent_encode.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Percent-encode a password so it can be embedded in an AMQP URI."""
try:
    from urllib.parse import quote  # Python 3
except ImportError:
    from urllib import quote  # Python 2

content = 'password#123'
# safe='' also escapes '/', which quote() leaves untouched by default.
# quote() is used instead of quote_plus() because quote_plus() turns a space
# into '+', and '+' is a literal plus sign in the userinfo part of a URI.
new_content = quote(content, safe='')
print(new_content)
將執行python腳本後獲得的字符串password%23123替換以前的密碼字符串即:
如:
body='{"value":{"uri":"amqp://user1:password%23123@10.0.1.66:5672","prefetch-count":20,"reconnect-delay":30,"ack-mode":"on-confirm","trust-user-id":false,"exchange":"external-search-send","max-hops":1},"vhost":"/","component":"federation-upstream","name":"external-search-send"}'
補充一個python操做rabbitmq http api的腳本
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @author: Wu
# @date:18-8-6
"""Create federation upstreams and policies on two RabbitMQ brokers.

Each broker gets a federation upstream that points at the *other* broker,
plus a policy that applies that upstream to its receiving exchange. All
calls go through the RabbitMQ management HTTP API. The script runs on both
Python 2 and Python 3.
"""
import base64
import json

try:
    from urllib.parse import quote  # Python 3
except ImportError:
    from urllib import quote  # Python 2

import requests

ex_info = {
    "username": "admin_user",
    "password": "admin#passwd",
    "ip": "10.0.1.66",
    "manage_port": "15672",
    "amqp_port": "5672",
    "exchange": "external_result_receive",
    "upstream": "internal-result-send",
}

in_info = {
    "username": "admin_user",
    "password": "admin#passwd",
    "ip": "10.0.1.66",
    "manage_port": "15673",
    "amqp_port": "5673",
    "exchange": "internal_search_receive",
    "upstream": "external-search-send",
}


def str_percent_encode(content):
    """Return *content* percent-encoded for use inside an AMQP URI.

    quote() with safe='' is used rather than quote_plus(): quote_plus()
    encodes a space as '+', which is a literal plus sign in URI userinfo.
    """
    return quote(content, safe='')


def http_put_federation(username, password, data, url, timeout=10):
    """PUT *data* as JSON to *url* using HTTP Basic authentication.

    :param username: management API user name
    :param password: management API password (plain, not percent-encoded)
    :param data: JSON-serialisable request body
    :param url: full management API URL
    :param timeout: seconds to wait before giving up on the broker
    :return: the ``requests`` response object
    """
    credentials = "%s:%s" % (username, password)
    # b64encode returns bytes on Python 3; decode before joining with a str.
    token = base64.b64encode(credentials.encode('utf-8')).decode('ascii')
    headers = {
        'Content-Type': 'application/json',
        'authorization': 'Basic ' + token,
    }
    return requests.put(url, data=json.dumps(data), headers=headers,
                        timeout=timeout)


def _management_url(info, api_path, name):
    """Build a management API URL on the broker described by *info*."""
    return "http://%s:%s%s%s" % (info["ip"], info["manage_port"], api_path, name)


def _amqp_authority(info):
    """Build the ``user:password@host:port`` part of an AMQP URI."""
    return "%s:%s@%s:%s" % (
        str_percent_encode(info["username"]),
        str_percent_encode(info["password"]),
        info["ip"],
        info["amqp_port"],
    )


def _upstream_body(opposite_mq, upstream):
    """Build the federation-upstream parameter body pointing at *opposite_mq*."""
    return {
        "value": {
            "uri": "amqp://" + opposite_mq,
            "prefetch-count": 20,
            "reconnect-delay": 30,
            "ack-mode": "on-confirm",
            "trust-user-id": False,
            "exchange": upstream,
            "max-hops": 1,
        },
        "vhost": "/",
        "component": "federation-upstream",
        "name": upstream,
    }


def _policy_body(info):
    """Build the policy body that applies the upstream to the exchange."""
    return {
        "pattern": "^" + info["exchange"],
        "apply-to": "exchanges",
        "definition": {"federation-upstream": info["upstream"]},
        "priority": 0,
    }


api = "/api/parameters/federation-upstream/%2f/"
ex_url = _management_url(ex_info, api, ex_info["upstream"])
in_url = _management_url(in_info, api, in_info["upstream"])

in_mq = _amqp_authority(in_info)
ex_mq = _amqp_authority(ex_info)

# Each broker's upstream URI points at the opposite broker.
ex_data = _upstream_body(in_mq, ex_info["upstream"])
in_data = _upstream_body(ex_mq, in_info["upstream"])

ex_po_data = _policy_body(ex_info)
in_po_data = _policy_body(in_info)

po_api = "/api/policies/%2f/"
ex_po_url = _management_url(ex_info, po_api, ex_info["exchange"])
in_po_url = _management_url(in_info, po_api, in_info["exchange"])

resp = http_put_federation(ex_info["username"], ex_info["password"], ex_data, ex_url)
print("external create federation: %s" % (resp,))
resp = http_put_federation(in_info["username"], in_info["password"], in_data, in_url)
print("internal create federation: %s" % (resp,))
resp = http_put_federation(ex_info["username"], ex_info["password"], ex_po_data, ex_po_url)
print("external create policies: %s" % (resp,))
resp = http_put_federation(in_info["username"], in_info["password"], in_po_data, in_po_url)
# This request targets the internal broker; the label used to say "external".
print("internal create policies: %s" % (resp,))
#################使用python3重寫,並加入自動建立須要的exchange
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @author: Wu
# @date:19-2-26
import base64
import json
import urllib.parse


class Federate:
    """Prepare the management-API requests that federate two RabbitMQ brokers.

    For each broker ("ex" and "in") three PUT requests are prepared: a
    federation upstream pointing at the opposite broker, a policy applying
    that upstream to the broker's receiving exchange, and the exchange itself.
    ``init_all_data`` fills ``ex_args_list`` / ``in_args_list`` with keyword
    dicts (``url`` and ``data``) and ``ex_headers`` / ``in_headers`` with the
    matching HTTP headers; ``request_put`` sends one prepared request.
    """

    def __init__(self, ex_info=None, in_info=None):
        """Store the broker descriptions.

        :param ex_info: optional dict describing the first broker with keys
            ``username``, ``password``, ``ip``, ``manage_port``,
            ``amqp_port``, ``exchange`` and ``upstream``; the built-in
            defaults are used when omitted
        :param in_info: same as *ex_info*, for the second broker
        """
        self.ex_info = ex_info if ex_info is not None else {
            "username": "admin",
            "password": "adminpassword",
            "ip": "10.0.1.60",
            "manage_port": "55788",
            "amqp_port": "5788",
            "exchange": "a_result_receive",
            "upstream": "b-result-send",
        }
        self.in_info = in_info if in_info is not None else {
            "username": "admin",
            "password": "adminpassword",
            "ip": "10.0.1.60",
            "manage_port": "55789",
            "amqp_port": "5789",
            "exchange": "b_search_receive",
            "upstream": "a-search-send",
        }
        self.ex_args_list = []
        self.in_args_list = []
        self.ex_headers = None
        self.in_headers = None

    @staticmethod
    def str_percent_encode(content):
        """Return *content* percent-encoded for use inside an AMQP URI.

        quote() with safe='' is used rather than quote_plus(): quote_plus()
        encodes a space as '+', which is a literal plus sign in URI userinfo.
        """
        return urllib.parse.quote(content, safe="")

    @staticmethod
    def request_put(kwargs):
        """Send one prepared PUT request and return the response.

        :param kwargs: keyword arguments for ``requests.put`` (``url``,
            ``data``, ``headers``); a 10 second timeout is applied unless
            the caller supplies its own ``timeout``
        """
        # Imported here so the payload builders stay usable without the
        # third-party HTTP client installed.
        import requests

        return requests.put(**{"timeout": 10, **kwargs})

    def _init_url(self, kwargs):
        """Build the URLs for one broker.

        :param kwargs: broker description dict (see ``__init__``)
        :return: tuple ``(fed_url, mq_url, exchange_url, po_url)`` where
            ``mq_url`` is the ``user:password@host:port`` part of the AMQP
            URI and the others are management API URLs
        """
        ip = kwargs["ip"]
        manage_port = kwargs["manage_port"]
        upstream = kwargs["upstream"]
        exchange = kwargs["exchange"]
        # Both credentials are encoded so characters such as '#', '@' or '/'
        # cannot break the AMQP URI.
        username = self.str_percent_encode(kwargs["username"])
        password = self.str_percent_encode(kwargs["password"])

        base = f"http://{ip}:{manage_port}"
        fed_url = f"{base}/api/parameters/federation-upstream/%2f/{upstream}"
        mq_url = f"{username}:{password}@{ip}:{kwargs['amqp_port']}"
        exchange_url = f"{base}/api/exchanges/%2F/{exchange}"
        po_url = f"{base}/api/policies/%2f/{exchange}"
        return fed_url, mq_url, exchange_url, po_url

    @staticmethod
    def _init_fed_data(opposite_mq, upstream):
        """Return the JSON body defining federation upstream *upstream*.

        :param opposite_mq: ``user:password@host:port`` of the broker to
            pull messages from
        :param upstream: upstream name, also used as the upstream exchange
        """
        fed_data = {
            "value": {
                "uri": f"amqp://{opposite_mq}",
                "prefetch-count": 300,
                "reconnect-delay": 30,
                "ack-mode": "on-confirm",
                "trust-user-id": False,
                "exchange": upstream,
                "max-hops": 1,
            },
            "vhost": "/",
            "component": "federation-upstream",
            "name": upstream,
        }
        return json.dumps(fed_data)

    @staticmethod
    def _init_policy_data(kwargs):
        """Return the JSON body of the policy tying the upstream to the exchange."""
        po_data = {
            "pattern": f'^{kwargs.get("exchange")}',
            "apply-to": "exchanges",
            "definition": {
                "federation-upstream": kwargs.get("upstream"),
            },
            "priority": 0,
        }
        return json.dumps(po_data)

    @staticmethod
    def _init_headers(username, password):
        """Return JSON content-type and HTTP Basic auth headers."""
        token = base64.b64encode(f"{username}:{password}".encode("utf-8"))
        return {
            "Content-Type": "application/json",
            "authorization": f"Basic {token.decode('ascii')}",
        }

    @staticmethod
    def _init_exchange_data(kwargs):
        """Return the JSON body declaring the durable topic exchange."""
        exchange_data = {
            "vhost": "/",
            "name": kwargs.get("exchange"),
            "type": "topic",
            # Real JSON booleans, consistent with "trust-user-id" in the
            # federation-upstream payload.
            "durable": True,
            "auto_delete": False,
            "internal": False,
            "arguments": {},
        }
        return json.dumps(exchange_data)

    def _build_args(self, info, fed_url, po_url, exchange_url, opposite_mq):
        """Return the request dicts for one broker: upstream, policy, exchange."""
        return [
            {"url": fed_url,
             "data": self._init_fed_data(opposite_mq, info.get("upstream"))},
            {"url": po_url, "data": self._init_policy_data(info)},
            {"url": exchange_url, "data": self._init_exchange_data(info)},
        ]

    def init_all_data(self):
        """Populate the request lists and headers for both brokers."""
        ex_fed_url, ex_mq, ex_exchange_url, ex_po_url = self._init_url(self.ex_info)
        in_fed_url, in_mq, in_exchange_url, in_po_url = self._init_url(self.in_info)

        self.ex_headers = self._init_headers(
            self.ex_info.get("username"), self.ex_info.get("password"))
        self.in_headers = self._init_headers(
            self.in_info.get("username"), self.in_info.get("password"))

        # Each broker's upstream points at the opposite broker.
        self.ex_args_list = self._build_args(
            self.ex_info, ex_fed_url, ex_po_url, ex_exchange_url, in_mq)
        self.in_args_list = self._build_args(
            self.in_info, in_fed_url, in_po_url, in_exchange_url, ex_mq)
def main():
    """Send every prepared PUT request to both brokers and print each result."""
    federation = Federate()
    federation.init_all_data()

    # The "ex" batch is sent first, then the "in" batch; every request in a
    # batch carries that batch's headers.
    batches = (
        (federation.ex_headers, federation.ex_args_list),
        (federation.in_headers, federation.in_args_list),
    )
    for headers, prepared_requests in batches:
        for request_args in prepared_requests:
            request_args["headers"] = headers
            response = federation.request_put(request_args)
            # Print the response body only when it is non-empty.
            if response.content:
                print(response.content)
            print(response)


if __name__ == "__main__":
    main()