RabbitMQ 消息隊列安裝:html
linux版本:CentOS 7python
安裝第一步:先關閉防火牆mysql
一、Centos7.x關閉防火牆linux
1
2
3
4
5
|
[root@rabbitmq /]# systemctl stop firewalld.service
[root@rabbitmq /]# systemctl disable firewalld.service
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
Removed symlink /etc/systemd/system/basic.target.wants/firewalld.service.
|
若是不想關閉防火牆,能夠經過以下方法處理:nginx
1
2
3
4
|
開放5672端口:
firewall-cmd --zone=
public
--add-port=5672/tcp --permanent
firewall-cmd --reload
|
第二步:安裝Erlanggit
因爲RabbitMQ依賴Erlang, 因此須要先安裝Erlang。github
Erlang的安裝方式大概有兩種:web
1)從Erlang Solution安裝(推薦)sql
1
2
3
4
5
|
# 添加erlang solutions源
$ wget https:
//packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
$ sudo rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
$ sudo yum install erlang
|
2)從EPEL源安裝(這種方式安裝的Erlang版本可能不是最新的,有時候不能知足RabbitMQ須要的最低版本)shell
1
2
3
4
|
# 啓動EPEL源
$ sudo yum install epel-release
# 安裝erlang
$ sudo yum install erlang
|
安裝第三步:安裝RabbitMQ
1
2
3
|
$ sudo rpm --import https:
//www.rabbitmq.com/rabbitmq-release-signing-key.asc
$ wget https:
//www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm # 下載RabbitMQ安裝包
$ sudo yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm
|
注意:安裝時若是遇到下面的依賴錯誤
1
2
|
Error: Package: socat-1.7.2.3-1.el6.x86_64 (epel)
Requires: libreadline.so.5()(64bit)
|
能夠嘗試先執行:$ sudo yum install socat
至此,若是沒有報錯的話,RabbitMQ已經安裝成功!此時咱們須要測試RabbitMQ是否能夠正常使用。
測試:
一、開啓服務及查看工做狀態
1
2
3
4
|
$ rabbitmq-server #啓動RabbitMQ隊列$ sudo chkconfig rabbitmq-server
on
# 添加開機啓動RabbitMQ服務
$ sudo /sbin/service rabbitmq-server start # 啓動服務
$ sudo /sbin/service rabbitmq-server status # 查看服務狀態
$ sudo /sbin/service rabbitmq-server stop # 中止服務$ netstat -tulnp |grep rabbitmq #查看默認啓用的端口號,5672
|
二、關於RabbitMQ的一些基本操做
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
# 查看當前全部用戶
$ sudo rabbitmqctl list_users<br><br>$ rabbitmqctl list_queues #查看當前的消息隊列列表
<br># 查看默認guest用戶的權限
$ sudo rabbitmqctl list_user_permissions guest
# 因爲RabbitMQ默認的帳號用戶名和密碼都是guest。爲了安全起見, 先刪掉默認用戶
$ sudo rabbitmqctl delete_user guest
# 添加新用戶
$ sudo rabbitmqctl add_user username password
# 設置用戶tag
$ sudo rabbitmqctl set_user_tags username administrator
# 賦予用戶默認vhost的所有操做權限
$ sudo rabbitmqctl set_permissions -p / username
".*"
".*"
".*"
# 查看用戶的權限
$ sudo rabbitmqctl list_user_permissions username
更多關於rabbitmqctl的使用,能夠參考<a href=
"https://www.rabbitmq.com/man/rabbitmqctl.1.man.html"
target=
"_blank"
>幫助手冊</a>。
|
三、開啓web管理接口
若是隻從命令行操做RabbitMQ,多少有點不方便。幸虧RabbitMQ自帶了web管理界面,只須要啓動插件即可以使用。
1
2
3
4
5
6
|
$ sudo rabbitmq-plugins enable rabbitmq_management
而後經過瀏覽器訪問
http:
//localhost:5672
輸入用戶名和密碼訪問web管理界面了。
|
四、配置RabbitMQ
關於RabbitMQ的配置,能夠下載RabbitMQ的配置文件模板到/etc/rabbitmq/rabbitmq.config, 而後按照需求更改便可。
關於每一個配置項的具體做用,能夠參考官方文檔。
更新配置後,別忘了重啓服務哦!
五、開啓用戶遠程訪問
默認狀況下,RabbitMQ的默認的guest用戶只容許本機訪問, 若是想讓guest用戶可以遠程訪問的話,只須要將配置文件中的loopback_users列表置爲空便可,以下:
1
|
{loopback_users, []}
|
另外關於新添加的用戶,直接就能夠從遠程訪問的,若是想讓新添加的用戶只能本地訪問,能夠將用戶名添加到上面的列表, 如只容許admin用戶本機訪問。
1
|
{loopback_users, [
"admin"
]}
|
更新配置後,別忘了重啓服務哦!
RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。
通俗的講,就是生產者消費者模型。
實現生產者消費者模型的核心就是隊列!經過隊列去鏈接完成操做!
這個模型解決了耦合性,讓生產者和消費者之間沒有直接的聯繫,而是經過隊列創建橋樑。這其中最重要的就是隊列。
一、基於Queue實現生產者消費者模型,python的隊列 queue。
q=queue.Queue() q.put() q.qsize() #隊列內消息個數 q.get() 先進先出 #!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
如上:python擁有本身的隊列模式,可是有一點不得不注意,他的隊列只能在同一進程內多線程間起做用,不能跨進程操做!
二、隊列的做用:
存儲消息、數據
保證消息的順序
保證數據的交付
三、消息隊列解決了兩個問題:
解耦 自然的解耦,程序間調用再也不使用接口,而是調用消息隊列的接口把執行結果放到隊列中,實現解耦。【實際開發過程當中,必定要想盡辦法下降程序間的耦合】
異步: 異步操做,程序再也不等待執行結果,而是提供接收接口
優勢:解決排隊問題
缺點:不能保證任務及時執行
應用場景:去哪兒購買飛機票,
同步:
優勢:保證任務及時執行
缺點:排隊問題
四、有關大併發的事宜:
web網站:
以前部署:apache 1000 - 2000 一臺機器同一時刻只能承載這麼多請求!
經常使用部署:nginx 10000 - 20000
什麼算大併發?有三個指標:
pv = page visit 頁面訪問量 【一天訪問量上億纔算大網站】【具備分散性,高峯時間段訪問明顯】
通常,一億的pv用10臺server web cluster 集羣,pv分散到實際的用戶上並很少,推算到秒級別訪問量
uv = user visit 用戶訪問量【相對頁面訪問量很小】
qps = 每秒鐘的訪問流量 or 用戶量
對同一個請求的訪問,多個機器每一個都負責一點,這就叫分佈式運算。
五、引入rabbitmq的緣由:
異步操做,應對大併發。
解決Python隊列不能跨進程執行的弊端。
對於RabbitMQ來講,生產和消費再也不針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。
它做爲一個獨立的組件,能夠同時存在多個隊列【能夠爲任何程序提供隊列】,每一個隊列對應不一樣的應用程序,隊列之間平行存在,相互獨立,不能混用!
與Python Queue隊列相比,消息不會直接放到rabbitmq隊列中,而是會先經過一次消息過濾,看他所屬哪一個程序,而後放到對應的隊列中。
在不存在外力的影響下,RabbitMQ 隊列中的消息,若是不消費就永遠存在!只有消費以後會消失。
RabbitMQ 是一個公共的組件,能爲多語言間提供隊列【例如:生產者是Java,消費者是Python】。
六、生產者消費者經過RabbitMQ隊列創建通訊的步驟:
生產者:
端口 ip 認證信息
建立隊列
向隊列發送消息
消費者:
端口 ip 認證信息
從指定的隊列中取消息
七、RabbitMQ 配置 (Python鏈接隊列)
1.7.一、客戶端若想調用RabbitMQ,須要安裝對應的API。在python中安裝pika,經過pika鏈接rabbitmq
1
|
pip3 install pika
|
注意:若想使用、遠程鏈接rabbitmq server的話,就須要在RabbitMQ隊列這個組件內配置權限信息
八、rabbitmq 建立用戶和設置權限
1)首先在rabbitmq server上建立一個用戶:rabbitmqctl add_user aaa 密碼
2)配置權限,受權容許從外面訪問全部隊列:rabbitmqctl set_permissions -p / aaa "." "." ".*" #受權全部!
set_permissions [-p vhost] {user} {conf} {write} {read} vhost The name of the virtual host to which to grant the user access, defaulting to /. ,默認是 / user The name of the user to grant access to the specified virtual host. conf A regular expression matching resource names for which the user is granted configure permissions. write A regular expression matching resource names for which the user is granted write permissions. read A regular expression matching resource names for which the user is granted read permissions.
九、鏈接隊列時,須要創建通訊,配置認證信息(生產者和消費者都必須與隊列創建聯繫)
1
2
3
|
credentials
=
pika.PlainCredentials(
'aaa'
,
'密碼'
)
#配置認證信息<br>#創建連接
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'10.211.55.5'
,
5672
,
'/'
,credentials))
channel
=
connection.channel()
#隊列鏈接通道
|
十、生成者與消費者之間收發消息通訊
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
生產者:
發送消息語法:
#先聲明queue(沒有就建立,有就使用)
channel.queue_declare(queue
=
'task123'
,durable
=
True
)
#創建通訊
channel.basic_publish(exchange
=
'',
#給負責消息過濾的方法傳遞參數
routing_key
=
'task123'
,
#路由
body
=
'Hello World2!'
,消息內容
)
routing_key
=
'task123'
#路由 把消息隊列先傳給Exchange 而後再轉到task123隊列上
消費者:
獲取消息語法:
def
callback(ch,mothod,properties,body)
#獲取消息執行的函數
參數解釋:
ch:指隊列通道
method:請求方法
properties: 消息參數
body:消息內容
channel.basic_consume(
callback,
#取到消息以後,調用函數 callback
queue
=
"xxxxx"
,
#隊列名稱
no_ack
=
True
,
)
#開始消費
channel.start_consuming()
# 阻塞模式
|
注意: 通常申明隊列只須要在生產者端申明,但消費者端也能夠申明。是防止若是生產者沒有啓動,消費者先啓動後沒有隊列會報錯的問題。此時服務端若是有相同代碼,會檢查若是有相同隊列就不建立。消費者再次申明隊列,目的是:消費者要清楚去哪裏取數據!
import pika credentials = pika.PlainCredentials('aaa', '123') # 配置認證的用戶 密碼 parameters = pika.ConnectionParameters(host="192.168.152.132", credentials=credentials) connection = pika.BlockingConnection(parameters) # 創建一個連接對象 channel = connection.channel() # 隊列鏈接通道 channel.queue_declare(queue='hello') # 聲明隊列queue 用rabbitmqctl list_queuse 查看 channel.basic_publish(exchange='', routing_key='hello', body='server hello world') # routing_key 路由表明要發送的隊列 body是發送的內容 print('server send "hello world"') connection.close() # 關閉鏈接 相似socket
#消費者是一種阻塞模式,會一致取數據 import pika credentials = pika.PlainCredentials('aaa', '123') # 配置認證的用戶 密碼 parameters = pika.ConnectionParameters(host="192.168.152.132", credentials=credentials) connection = pika.BlockingConnection(parameters) # 創建一個連接對象 channel = connection.channel() # 隊列鏈接通道 channel.queue_declare(queue='hello') # 聲明queue 用rabbitmqctl list_queuse 查看 def callback(ch, method, properties, body): print("Recived %r" % ch, method, properties, body) channel.basic_consume(callback, # 取到消息後,執行callback函數 queue='hello', # 從hello隊列獲取數據 no_ack=True ) print("waiting for message") channel.start_consuming() # 進入阻塞模式
如何保證隊列中的消息被徹底處理完畢?咱們正常的思惟應該是:沒有處理完,應該返回隊列。可是在上面的代碼中,若是消費者客戶端掛了或者在處理的過程當中中止了,不只消息沒有處理完畢,同時隊列中也沒有了。
2.一、模擬客戶端中斷 觀察服務端隊列的數據會不會返回(答案:不會)
1
2
3
|
#- 開啓一個生產者,兩個消費者
#- 服務端向隊列中存放一個值,一客戶端從隊列中取到數據,在睡10秒期間中斷,表示出錯,它不會報告給服務端
#- 這時隊列中爲零,另外一客戶端也不會取到值
|
測試代碼以下:
#生產者 import pika credentials = pika.PlainCredentials("aaa","123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.152.132',credentials=credentials)) #創建socket channel = connection.channel() #建立rabbitmq協議通道 channel.queue_declare(queue='hello') #經過通道生成一個隊列 channel.basic_publish(exchange='', routing_key='hello', #隊列 body='Hello World!') #內容 print(" [x] Sent 'Hello World!'") connection.close()
#解決辦法: #0、發送消息時,在函數中添加如下參數,保證消息持久化 properties=pika.BasicProperties( delivery_mode=2, # make message persistent),# 數字表明狀態:2保持消息持久化;1處理中;0處理完畢; #一、消費確認的問題! #在消費者端,從隊列中獲取信息函數中有一個參數:no_ack = True 的意思是消息處理後,向rabbit-server確認消息已消費完畢。 刪除這個參數,再也不確認信息已消費,rabbit-server的消息隊列中會一致存在數據 #二、解決消費後數據還存在問題! # 解決rabbit-server中消息被消費後數據還存在的狀況,在消費者處理消息的函數中,使用ch.basic_ack(delivery_tag=method.delivery_tag)與生產者手動確認,消息處理完畢! #經過這兩個參數,同時保證了消費者可以消費完數據不掛,同時消費完後rabbit-server收到消費完的消息把被消費的數據刪除 #1. 生產者端發消息時,加參數 消息持久化 properties=pika.BasicProperties( delivery_mode=2, # make message persistent), #2. 消費者端,消息處理完畢時,發送確認包 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, #取到消息後,調用callback 函數 queue='task1',) #no_ack=True) #消息處理後,不向rabbit-server確認消息已消費完畢
2.二、模擬測試 觀察服務端隊列的數據會不會返回(答案:會)
1
2
3
|
#- 開啓一個服務端,兩個客戶端
#- 服務端向隊列中存放一個值,一客戶端從隊列中取到數據,在睡20秒期間中斷,表示出錯,它會報給服務端,服務端隊列還有值
#- 這時啓動另外一客戶端還能夠取到值
|
#生產者 import pika credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 #聲明queue channel.queue_declare(queue='hello',durable=True) channel.basic_publish(exchange='', routing_key='task1', #路由 properties=pika.BasicProperties( delivery_mode=2, # make message persistent ), #新加入參數 body='Hello World2!') print(" [x] Sent 'Hello World!'") connection.close()
在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少。此時,先啓動消息生產者,而後再分別啓動3個消費者,經過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上。
當咱們把rabbitmq-server重啓後,發現全部的消息就都丟失了?這種問題怎麼辦?假如咱們在某一個隊列中加入了上萬條消息,忽然消息隊列重啓了。。。那是否是咱們還得手動去添加消失的消息麼?不用!如下是解決辦法:
一、生產者在聲明隊列的時候使用參數,保持隊列持久化 durable = True。
注意:必定是要在隊列第一次聲明的時候前添加,不能對已經生成的隊列從新再進行一次設置,不然會報錯【沒法從新修改隊列】。
二、再經過參數delivery_mode = 2 把消息也變成持久化的。即使是rabbitmq服務重啓後,也不會丟消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#隊列持久化 【僅設置單個】
channel.queue_declare(queue
=
'hello'
,durable
=
True
)
systemctl restart rabbitmq
-
server
#重啓服務發現hello隊列還在,可是消息不在
rabbitmqctl list_queues
#查看消息隊列
#hello
#隊列和消息持久化 【兩個參數都存在】
channel.queue_declare(queue
=
'hello'
,durable
=
True
)
properties
=
pika.BasicProperties(
delivery_mode
=
2
,
# make message persistent
),
systemctl restart rabbitmq
-
server
#重啓服務發現隊列和消息都還在
rabbitmqctl list_queues
#查看消息隊列
#hello 1
|
import pika credentials = pika.PlainCredentials("aaa","123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.152.132',credentials=credentials)) #創建socket channel = connection.channel() #建立rabbitmq協議通道 channel.queue_declare(queue='hello',durable=True) #經過通道生成一個隊列 channel.basic_publish(exchange='', routing_key='hello', #隊列 properties=pika.BasicProperties( delivery_mode=2, # make message persistent ), body='Hello World!') #內容 print(" [x] Sent 'Hello World!'") connection.close()
若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
傳統模式:輪詢(排隊)獲取隊列中的數據,若是有一個消費者處理慢了,其餘的消費者須要一直等待。那怎麼解決併發的問題呢?別人處理快慢與本人處理的速度無關。
解決方案:以誰先處理完,誰就先得到數據的原則【消息處理完畢纔會再拿一條數據】。在消費者端加上這個條件判斷:channel.basic_qos(prefetch_count=1) # 公平分發,能者多勞,每次執行一個。
注意:生產者的代碼不變,消費者代碼中加入每次處理一次的參數:channel.basic_qos(prefetch_count=1) # 公平分發
import pika credentials = pika.PlainCredentials("aaa","123") #受權的帳號 密碼 connection = pika.BlockingConnection( pika.ConnectionParameters('192.168.152.132',credentials=credentials)) #創建socket channel = connection.channel() #建立rabbitmq協議通道 channel.queue_declare(queue='hello',durable=True) #經過通道生成一個隊列 channel.basic_publish(exchange='', routing_key='hello', #隊列 properties=pika.BasicProperties( delivery_mode=2, # make message persistent ), body='Hello World!') #內容 print(" [x] Sent 'Hello World!'") connection.close()
import pika import time credentials = pika.PlainCredentials('aaa', '123') # 配置認證的用戶 密碼 parameters = pika.ConnectionParameters(host="192.168.11.144", credentials=credentials) connection = pika.BlockingConnection(parameters) # 創建一個連接對象 channel = connection.channel() # 隊列鏈接通道 def callback(ch, method, properties, body): print("Recived %r" % ch, method, properties, body) time.sleep(10) print('msg handle done...',body) ch.basic_ack(delivery_tag=method.delivery_tag) # 這個是表示消費者處理完了 channel.basic_qos(prefetch_count=1) # 公平分發 channel.basic_consume(callback, # 取到消息後,執行callback函數 queue='hello', # no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 進入阻塞模式
廣播策略:每一個人都能收到;或是過濾某些人能夠接收
一個生產者,對應對個消費者!
exchange type 過濾類型
fanout = 廣播
direct = 組播
topic = 規則播
header = 略過。。。
以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息
fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
表達式符號說明:#表明一個或多個字符,*表明任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout
headers: 經過headers 來決定把消息發給哪些queue
a、廣播模式(一個生產者,多個消費者):
一、路由指定爲空!全部消息都發給exchange處理轉到隊列,轉到哪一個隊列就須要exchange指定,因此在創建鏈接的時候要指定名字。
注意:exchange只負責轉發不負責存放消息!若是沒有隊列綁定消息就會扔掉!
二、自動生成隊列名,而後使用完以後再刪掉
隊列參數exclusive=True惟一的,rabbit 隨機生成一個名字。
三、生產者和消費者端都要聲明隊列,以排除生成者未啓動,消費者獲取報錯的問題
四、生產者發送一條消息,說有的消費者都能接收到!高效,效率的完成發送!
應用場景:新浪微博 訂閱模式,只有當前登陸的用戶才能夠收到實時發送的消息
import pika import sys credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 channel.exchange_declare(exchange='logs',type='fanout') #聲明隊列 exchange名字和類型 message = ' '.join(sys.argv[1:]) or "info: Hello World!" #獲取外界輸入的信息,不然就是hello world channel.basic_publish(exchange='logs', #指定exchange的名字 routing_key='', #注意,不須要指定隊列名! body=message) #信息 print(" [x] Sent %r" % message) connection.close()
import pika credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 channel.exchange_declare(exchange='logs', type='fanout') queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = queue_obj.method.queue #獲取隊列名 print('queue name',queue_name,queue_obj) #打印會列名 channel.queue_bind(exchange='logs',queue=queue_name) #綁定隊列到Exchange print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name, no_ack=True) channel.start_consuming()
b、direct 組播模式:有選擇的接收消息(exchange type=direct)
一、有選擇的接收消息(exchange type=direct),RabbitMQ還支持根據關鍵字發送,至關因而添加了一個過濾地帶!
即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。
二、發什麼類型的,什麼類型的接收,在接收端運行的時候加參數,指定接收的類型。
三、routing_key = 'xxx' 與廣播相比再也不爲空,隊列由執行時手動輸入獲取,而後路由指定發送到哪一個隊列。
四、按照類型:生產者發送指定類型的消息;消費者循環綁定隊列,若是不存在不接收
例子:就像廣播電臺,要想接收生產者發送的數據,必須是綁定且在線!若是斷開一段時間再接收該電臺消息,以前的訊息就不會再收到!
應用場景:日誌分類處理邏輯 【注:能夠同時存在多個消費者】
import pika import sys credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 channel.exchange_declare(exchange='direct_log',type='direct') #聲明消息隊列及類型 log_level = sys.argv[1] if len(sys.argv) > 1 else 'info' #日誌等級 message = ' '.join(sys.argv[1:]) or "info: Hello World!" #接收手動輸入的消息內容 channel.basic_publish(exchange='direct_log', routing_key=log_level, body=message) print(" [x] Sent %r" % message) connection.close()
import pika,sys credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = queue_obj.method.queue print('queue name',queue_name,queue_obj) log_levels = sys.argv[1:] #日誌等級 info warning error danger #判斷存不存在,不存在退出 if not log_levels: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) #循環綁定隊列 for level in log_levels: channel.queue_bind(exchange='direct_log', queue=queue_name, routing_key=level) #綁定隊列到Exchange print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name, no_ack=True) channel.start_consuming()
c、topic規則播
話題類型,能夠根據正則進行更精確的匹配,按照規則過濾。exchange type = topic,僅改下類型便可!
在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入」路由值「和 」關鍵字「進行匹配,匹配成功,則將數據發送到指定隊列。
# 表示能夠匹配 0 個 或 多個 單詞
* 表示只能匹配 一個 單詞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
To receive
all
the logs run:
python receive_logs_topic.py
"#"
To receive
all
logs
from
the facility
"kern"
:
python receive_logs_topic.py
"kern.*"
Or
if
you want to hear only about
"critical"
logs:
python receive_logs_topic.py
"*.critical"
You can create multiple bindings:
python receive_logs_topic.py
"kern.*"
"*.critical"
And to emit a log with a routing key
"kern.critical"
type
:
python emit_log_topic.py
"kern.critical"
"A critical kernel error"
|
1
2
3
4
5
6
7
8
9
10
|
#測試執行以下:
#客戶端一:
-
python3 receive1.py
*
.django
#客戶端二:
-
python3 receive1.py mysql.error
#客戶端三:
-
python3 receive1.py mysql.
*
#服務端:
-
python3 receive1.py
#匹配相應的客戶端
|
import pika import sys credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 channel.exchange_declare(exchange='topic_log',type='topic') #log_level = sys.argv[1] if len(sys.argv) > 1 else 'info' log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info' message = ' '.join(sys.argv[1:]) or "all.info: Hello World!" channel.basic_publish(exchange='topic_log', routing_key=log_level, body=message) print(" [x] Sent %r" % message) connection.close()
import pika,sys credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = queue_obj.method.queue log_levels = sys.argv[1:] # info warning errr if not log_levels: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for level in log_levels: channel.queue_bind(exchange='topic_log', queue=queue_name, routing_key=level) #綁定隊列到Exchange print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name, no_ack=True) channel.start_consuming()
從上邊全部的例子中你有沒有發現,上面的隊列都是單向執行的,須要有發送端和接收端。若是遠程的一臺機器執行完畢再返回結果,那就實現不了了。若是讓他執行完返回,這種模式叫什麼呢?RPC(遠程過程調用),snmp就是典型的RPC。
那RabbitMQ能不能返回呢,怎麼返回呢?可讓機器既是發送端又是接收端。可是接收端返回消息怎麼返回?能夠發送到發過來的queue裏麼?答案固然是不能夠,若是仍是存在原先的隊列就會直接陷入死循環!因此返回時,須要讓消息內部指定再創建一個隊列queue,把結果發送新的queue裏。
同時,爲了服務端返回的queue不寫死,在客戶端給服務端發指令的的時候,同時帶一條消息說,你結果返回給哪一個queue
在執行多個消息任務的時候,怎麼區分判斷哪一個消息是先執行呢?答案就是,在發任務時,再額外提交一個惟一標識符。
task1,task2異步執行,可是返回的順序是不固定的,爲了區分是誰執行完的,在發送的任務添加惟一標識符,這樣取回的時候就能區分。
設置一個異步RPC
聲明一個隊列reply_to,做爲返回消息結果的隊列
發送消息隊列,消息中帶惟一標識uid
監聽reply_to隊列,直到有結果
在類中聲明監聽
__author__ = 'ShengXin' #1. 定義fib函數 #2. 聲明接收指令的隊列名rpc_queue #3. 開始監聽隊列,收到消息後 調用fib函數 #4. 把fib執行結果,發送回客戶端指定的reply_to 隊列 import subprocess import pika import time credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() #隊列鏈接通道 channel.queue_declare(queue='rpc_queue2') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def run_cmd(cmd): cmd_obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) result = cmd_obj.stdout.read() + cmd_obj.stderr.read() return result def on_request(ch, method, props, body): cmd = body.decode("utf-8") print(" [.] run (%s)" % cmd) response = run_cmd(cmd) ch.basic_publish(exchange='', routing_key=props.reply_to, #隊列 接收客戶端傳過來的隊列,返回 properties=pika.BasicProperties(correlation_id = props.correlation_id), body=response) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(on_request, queue='rpc_queue2') print(" [x] Awaiting RPC requests") channel.start_consuming() rpc-server
# 1.聲明一個隊列,做爲reply_to返回消息結果的隊列 # 2. 發消息到隊列,消息裏帶一個惟一標識符uid,reply_to # 3. 監聽reply_to 的隊列,直到有結果 import queue import pika import uuid class CMDRpcClient(object): def __init__(self): credentials = pika.PlainCredentials('aaa', '123') parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue #命令的執行結果的queue #聲明要監聽callback_queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): """ 收到服務器端命令結果後執行這個函數 :param ch: :param method: :param props: 服務器端返回的消息結果! :param body: :return: """ if self.corr_id == props.correlation_id: self.response = body.decode("gbk") #把執行結果賦值給Response def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) #惟一標識符號 self.channel.basic_publish(exchange='', routing_key='rpc_queue2', properties=pika.BasicProperties( reply_to = self.callback_queue, #傳遞要返回的消息隊列 correlation_id = self.corr_id, #惟一id ), body=str(n)) #循環監聽 while self.response is None: self.connection.process_data_events() #檢測監聽的隊列裏有沒有新消息,若是有,收,若是沒有,返回None #檢測有沒有要發送的新指令 return self.response cmd_rpc = CMDRpcClient() print(" [x] Requesting fib(30)") response = cmd_rpc.call('ipconfig') print(response)