介紹rabbitMQ以前。先介紹一下AMQP協議,由於rabbitMQ是基於AMQP協議實現的一個服務程序。(目前爲止應該也是惟一實現了AMQP協議的服務)html
AMQP(高級消息隊列協議)是一個網絡協議。它支持符合要求的客戶端應用(application)和消息中間件代理(messaging middleware brockers)之間進行通訊。前端
消息代理(message brokers)從發佈者(publishers)亦稱生產者(producers)那兒接收消息,並根據既定的路由規則把接收到的消息發送給處理消息的消費者(consumers)。python
因爲AMQP是一個網絡協議,因此這個過程當中的發佈者,消費者,消息代理 能夠存在於不一樣的設備上。web
AMQP 0-9-1的工做過程以下圖:消息(message)被髮布者(publisher)發送給交換機(exchange),交換機經常被比喻成郵局或者郵箱。而後交換機將收到的消息根據路由規則分發給綁定的隊列(queue)。最後AMQP代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。算法
發佈者(publisher)發佈消息時能夠給消息指定各類消息屬性(message meta-data)。有些屬性有可能會被消息代理(brokers)使用,然而其餘的屬性則是徹底不透明的,它們只能被接收消息的應用所使用。數據庫
從安全角度考慮,網絡是不可靠的,接收消息的應用也有可能在處理消息的時候失敗。基於此緣由,AMQP模塊包含了一個消息確認(message acknowledgements)的概念:當一個消息從隊列中投遞給消費者後(consumer),消費者會通知一下消息代理(broker),這個能夠是自動的也能夠由處理消息的應用的開發者執行。當「消息確認」被啓用的時候,消息代理不會徹底將消息從隊列中刪除,直到它收到來自消費者的確認回執(acknowledgement)。編程
在某些狀況下,例如當一個消息沒法被成功路由時,消息或許會被返回給發佈者並被丟棄。或者,若是消息代理執行了延期操做,消息會被放入一個所謂的死信隊列中。此時,消息發佈者能夠選擇某些參數來處理這些特殊狀況。數組
隊列,交換機和綁定統稱爲AMQP實體(AMQP entities)。瀏覽器
AMQP 0-9-1是一個可編程協議,某種意義上說AMQP的實體和路由規則是由應用自己定義的,而不是由消息代理定義。包括像聲明隊列和交換機,定義他們之間的綁定,訂閱隊列等等關於協議自己的操做。安全
這雖然能讓開發人員自由發揮,但也須要他們注意潛在的定義衝突。固然這在實踐中不多會發生,若是發生,會以配置錯誤(misconfiguration)的形式表現出來。
應用程序(Applications)聲明AMQP實體,定義須要的路由方案,或者刪除再也不須要的AMQP實體。
交換機是用來發送消息的AMQP實體。交換機拿到一個消息以後將它路由給一個或零個隊列。它使用哪一種路由算法是由交換機類型和被稱做綁定(bindings)的規則所決定的。AMQP 0-9-1的代理提供了四種交換機
Name(交換機類型) | Default pre-declared names(預聲明的默認名稱) |
---|---|
Direct exchange(直連交換機) | (Empty string) and amq.direct |
Fanout exchange(扇型交換機) | amq.fanout |
Topic exchange(主題交換機) | amq.topic |
Headers exchange(頭交換機) | amq.match (and amq.headers in RabbitMQ) |
除交換機類型外,在聲明交換機時還能夠附帶許多其餘的屬性,其中最重要的幾個分別是:
交換機能夠有兩個狀態:持久(durable)和暫存(transient)。持久化的交換機會在消息代理(broker)重啓後依舊存在,而暫存的交換機則不會(它們須要在代理再次上線後從新被聲明)。然而並非全部的應用場景都須要持久化的交換機。
默認交換機(default exchange)其實是一個由消息代理預先聲明好的沒有名字(名字爲空字符串)的直連交換機(direct exchange)。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。
舉個栗子:當你聲明瞭一個名爲"search-indexing-online"的隊列,AMQP代理會自動將其綁定到默認交換機上,綁定(binding)的路由鍵名稱也是"search-indexing-online"。所以,當攜帶着名爲"search-indexing-online"的路由鍵的消息被髮送到默認交換機的時候,此消息會被默認交換機路由至名爲"search-indexing-online"的隊列中。換句話說,默認交換機貌似可以直接將消息投遞給隊列,儘管技術上並無作相關的操做。
直連型交換機(direct exchange)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。直連交換機用來處理消息的單播路由(unicast routing)(儘管它也能夠處理多播路由)。下邊介紹它是如何工做的:
R
的消息被髮送給直連交換機時,交換機會把它路由給綁定值一樣爲R
的隊列。直連交換機常常用來循環分發任務給多個工做者(workers)。當這樣作的時候,咱們須要明白一點,在AMQP 0-9-1中,消息的負載均衡是發生在消費者(consumer)之間的,而不是隊列(queue)之間。
直連型交換機圖例:
扇型交換機(funout exchange)將消息路由給綁定到它身上的全部隊列,而不理會綁定的路由鍵。若是N個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這全部的N個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)。
由於扇型交換機投遞消息的拷貝到全部綁定到它的隊列,因此他的應用案例都極其類似:
扇型交換機圖例:
主題交換機(topic exchanges)經過對消息的路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。主題交換機常常用來實現各類分發/訂閱模式及其變種。主題交換機一般用來實現消息的多播路由(multicast routing)。
主題交換機擁有很是普遍的用戶案例。不管什麼時候,當一個問題涉及到那些想要有針對性的選擇須要接收消息的 多消費者/多應用(multiple consumers/applications) 的時候,主題交換機均可以被列入考慮範圍。
使用案例:
有時消息的路由操做會涉及到多個屬性,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是爲此而生的。頭交換機使用多個消息屬性來代替路由鍵創建路由規則。經過判斷消息頭的值可否與指定的綁定相匹配來確立路由規則。
咱們能夠綁定一個隊列到頭交換機上,並給他們之間的綁定使用多個用於匹配的頭(header)。這個案例中,消息代理得從應用開發者那兒取到更多一段信息,換句話說,它須要考慮某條消息(message)是須要部分匹配仍是所有匹配。上邊說的「更多一段消息」就是"x-match"參數。當"x-match"設置爲「any」時,消息頭的任意一個值被匹配就能夠知足條件,而當"x-match"設置爲「all」的時候,就須要消息頭的全部值都匹配成功。
頭交換機能夠視爲直連交換機的另外一種表現形式。頭交換機可以像直連交換機同樣工做,不一樣之處在於頭交換機的路由規則是創建在頭屬性值之上,而不是路由鍵。路由鍵必須是一個字符串,而頭屬性值則沒有這個約束,它們甚至能夠是整數或者哈希值(字典)等。
AMQP中的隊列(queue)跟其餘消息隊列或任務隊列中的隊列是很類似的:它們存儲着即將被應用消費掉的消息。隊列跟交換機共享某些屬性,可是隊列也有一些另外的屬性。
隊列在聲明(declare)後才能被使用。若是一個隊列尚不存在,聲明一個隊列會建立它。若是聲明的隊列已經存在,而且屬性徹底相同,那麼這次聲明不會對原有隊列產生任何影響。若是聲明中的屬性與已存在隊列的屬性有差別,那麼一個錯誤代碼爲406的通道級異常就會被拋出。
隊列的名字能夠由應用(application)來取,也可讓消息代理(broker)直接生成一個。隊列的名字能夠是最多255字節的一個utf-8字符串。若但願AMQP消息代理生成隊列名,須要給隊列的name參數賦值一個空字符串:在同一個通道(channel)的後續的方法(method)中,咱們可使用空字符串來表示以前生成的隊列名稱。之因此以後的方法能夠獲取正確的隊列名是由於通道能夠默默地記住消息代理最後一次生成的隊列名稱。
以"amq."開始的隊列名稱被預留作消息代理內部使用。若是試圖在隊列聲明時打破這一規則的話,一個通道級的403 (ACCESS_REFUSED)錯誤會被拋出。
持久化隊列(Durable queues)會被存儲在磁盤上,當消息代理(broker)重啓的時候,它依舊存在。沒有被持久化的隊列稱做暫存隊列(Transient queues)。並非全部的場景和案例都須要將隊列持久化。
持久化的隊列並不會使得路由到它的消息也具備持久性。假若消息代理掛掉了,從新啓動,那麼在重啓的過程當中持久化隊列會被從新聲明,不管怎樣,只有通過持久化的消息才能被從新恢復。
綁定(Binding)是交換機(exchange)將消息(message)路由給隊列(queue)所需遵循的規則。若是要指示交換機「E」將消息路由給隊列「Q」,那麼「Q」就須要與「E」進行綁定。綁定操做須要定義一個可選的路由鍵(routing key)屬性給某些類型的交換機。路由鍵的意義在於從發送給交換機的衆多消息中選擇出某些消息,將其路由給綁定的隊列。
打個比方:
擁有了交換機這個中間層,不少由發佈者直接到隊列難以實現的路由方案可以得以實現,而且避免了應用開發者的許多重複勞動。
若是AMQP的消息沒法路由到隊列(例如,發送到的交換機沒有綁定隊列),消息會被就地銷燬或者返還給發佈者。如何處理取決於發佈者設置的消息屬性。
消息若是隻是存儲在隊列裏是沒有任何用處的。被應用消費掉,消息的價值纔可以體現。在AMQP 0-9-1 模型中,有兩種途徑能夠達到此目的:
使用push API,應用(application)須要明確表示出它在某個特定隊列裏所感興趣的,想要消費的消息。如是,咱們能夠說應用註冊了一個消費者,或者說訂閱了一個隊列。一個隊列能夠註冊多個消費者,也能夠註冊一個獨享的消費者(當獨享消費者存在時,其餘消費者即被排除在外)。
每一個消費者(訂閱者)都有一個叫作消費者標籤的標識符。它能夠被用來退訂消息。消費者標籤其實是一個字符串。
消費者應用(Consumer applications) - 用來接受和處理消息的應用 - 在處理消息的時候偶爾會失敗或者有時會直接崩潰掉。並且網絡緣由也有可能引發各類問題。這就給咱們出了個難題,AMQP代理在何時刪除消息纔是正確的?AMQP 0-9-1 規範給咱們兩種建議:
前者被稱做自動確認模式(automatic acknowledgement model),後者被稱做顯式確認模式(explicit acknowledgement model)。在顯式模式下,由消費者應用來選擇何時發送確認回執(acknowledgement)。應用能夠在收到消息後當即發送,或將未處理的消息存儲後發送,或等到消息被處理完畢後再發送確認回執(例如,成功獲取一個網頁內容並將其存儲以後)。
若是一個消費者在還沒有發送確認回執的狀況下掛掉了,那AMQP代理會將消息從新投遞給另外一個消費者。若是當時沒有可用的消費者了,消息代理會死等下一個註冊到此隊列的消費者,而後再次嘗試投遞。
當一個消費者接收到某條消息後,處理過程有可能成功,有可能失敗。應用能夠向消息代理代表,本條消息因爲「拒絕消息(Rejecting Messages)」的緣由處理失敗了(或者未能在此時完成)。當拒絕某條消息時,應用能夠告訴消息代理如何處理這條消息——銷燬它或者從新放入隊列。當此隊列只有一個消費者時,請確認不要因爲拒絕消息而且選擇了從新放入隊列的行爲而引發消息在同一個消費者身上無限循環的狀況發生。
在AMQP中,basic.reject方法用來執行拒絕消息的操做。但basic.reject有個限制:你不能使用它決絕多個帶有確認回執(acknowledgements)的消息。可是若是你使用的是RabbitMQ,那麼你可使用被稱做negative acknowledgements(也叫nacks)的AMQP 0-9-1擴展來解決這個問題。更多的信息請參考幫助頁面
在多個消費者共享一個隊列的案例中,明確指定在收到下一個確認回執前每一個消費者一次能夠接受多少條消息是很是有用的。這能夠在試圖批量發佈消息的時候起到簡單的負載均衡和提升消息吞吐量的做用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,若是生產應用每分鐘才發送一條消息,這說明處理工做尚在運行。)
注意,RabbitMQ只支持通道級的預取計數,而不是鏈接級的或者基於大小的預取。
AMQP模型中的消息(Message)對象是帶有屬性(Attributes)的。有些屬性及其常見,以致於AMQP 0-9-1 明確的定義了它們,而且應用開發者們無需費心思思考這些屬性名字所表明的具體含義。例如:
有些屬性是被AMQP代理所使用的,可是大多數是開放給接收它們的應用解釋器用的。有些屬性是可選的也被稱做消息頭(headers)。他們跟HTTP協議的X-Headers很類似。消息屬性須要在消息被髮布的時候定義。
AMQP的消息除屬性外,也含有一個有效載荷 - Payload(消息實際攜帶的數據),它被AMQP代理看成不透明的字節數組來對待。消息代理不會檢查或者修改有效載荷。消息能夠只包含屬性而不攜帶有效載荷。它一般會使用相似JSON這種序列化的格式數據,爲了節省,協議緩衝器和MessagePack將結構化數據序列化,以便以消息的有效載荷的形式發佈。AMQP及其同行者們一般使用"content-type" 和 "content-encoding" 這兩個字段來與消息溝通進行有效載荷的辨識工做,但這僅僅是基於約定而已。
消息可以以持久化的方式發佈,AMQP代理會將此消息存儲在磁盤上。若是服務器重啓,系統會確認收到的持久化消息未丟失。簡單地將消息發送給一個持久化的交換機或者路由給一個持久化的隊列,並不會使得此消息具備持久化性質:它徹底取決與消息自己的持久模式(persistence mode)。將消息以持久化方式發佈時,會對性能形成必定的影響(就像數據庫操做同樣,健壯性的存在一定形成一些性能犧牲)。
因爲網絡的不肯定性和應用失敗的可能性,處理確認回執(acknowledgement)就變的十分重要。有時咱們確認消費者收到消息就能夠了,有時確認回執意味着消息已被驗證而且處理完畢,例如對某些數據已經驗證完畢而且進行了數據存儲或者索引操做。
這種情形很常見,因此 AMQP 0-9-1 內置了一個功能叫作 消息確認(message acknowledgements),消費者用它來確認消息已經被接收或者處理。若是一個應用崩潰掉(此時鏈接會斷掉,因此AMQP代理亦會得知),並且消息的確認回執功能已經被開啓,可是消息代理還沒有得到確認回執,那麼消息會被重新放入隊列(而且在還有還有其餘消費者存在於此隊列的前提下,當即投遞給另一個消費者)。
協議內置的消息確認功能將幫助開發者創建強大的軟件。
AMQP 0-9-1由許多方法(methods)構成。方法便是操做,這跟面向對象編程中的方法沒半毛錢關係。AMQP的方法被分組在類(class)中。這裏的類僅僅是對AMQP方法的邏輯分組而已。在 AMQP 0-9-1參考中有對AMQP方法的詳細介紹。
讓咱們來看看交換機類,有一組方法被關聯到了交換機的操做上。這些方法以下所示:
(請注意,RabbitMQ網站參考中包含了特用於RabbitMQ的交換機類的擴展,這裏咱們不對其進行討論)
以上的操做來自邏輯上的配對:exchange.declare 和 exchange.declare-ok,exchange.delete 和 exchange.delete-ok. 這些操做分爲「請求 - requests」(由客戶端發送)和「響應 - responses」(由代理髮送,用來回應以前提到的「請求」操做)。
以下的例子:客戶端要求消息代理使用exchange.declare方法聲明一個新的交換機:
如上圖所示,exchange.declare方法攜帶了好幾個參數。這些參數能夠容許客戶端指定交換機名稱、類型、是否持久化等等。
操做成功後,消息代理使用exchange.declare-ok方法進行迴應:
exchange.declare-ok方法除了通道號以外沒有攜帶任何其餘參數(通道-channel 會在本指南稍後章節進行介紹)。
AMQP隊列類的配對方法 - queue.declare方法 和 queue.declare-ok有着與其餘配對方法很是類似的一系列事件:
不是全部的AMQP方法都有與其配對的「另外一半」。許多(basic.publish是最被普遍使用的)都沒有相對應的「響應」方法,另一些(如basic.get)有着一種以上與之對應的「響應」方法。
AMQP鏈接一般是長鏈接。AMQP是一個使用TCP提供可靠投遞的應用層協議。AMQP使用認證機制而且提供TLS(SSL)保護。當一個應用再也不須要鏈接到AMQP代理的時候,須要優雅的釋放掉AMQP鏈接,而不是直接將TCP鏈接關閉。
有些應用須要與AMQP代理創建多個鏈接。不管怎樣,同時開啓多個TCP鏈接都是不合適的,由於這樣作會消耗掉過多的系統資源而且使得防火牆的配置更加困難。AMQP 0-9-1提供了通道(channels)來處理多鏈接,能夠把通道理解成共享一個TCP鏈接的多個輕量化鏈接。
在涉及多線程/進程的應用中,爲每一個線程/進程開啓一個通道(channel)是很常見的,而且這些通道不能被線程/進程共享。
一個特定通道上的通信與其餘通道上的通信是徹底隔離的,所以每一個AMQP方法都須要攜帶一個通道號,這樣客戶端就能夠指定此方法是爲哪一個通道準備的。
爲了在一個單獨的代理上實現多個隔離的環境(用戶、用戶組、交換機、隊列 等),AMQP提供了一個虛擬主機(virtual hosts - vhosts)的概念。這跟Web servers虛擬主機概念很是類似,這爲AMQP實體提供了徹底隔離的環境。當鏈接被創建的時候,AMQP客戶端來指定使用哪一個虛擬主機。
AMQP 0-9-1 擁有多個擴展點:
這些特性使得AMQP 0-9-1模型更加靈活,而且可以適用於解決更加寬泛的問題。
AMQP 0-9-1 擁有衆多的適用於各類流行語言和框架的客戶端。其中一部分嚴格遵循AMQP規範,提供AMQP方法的實現。另外一部分提供了額外的技術,方便使用的方法和抽象。有些客戶端是異步的(非阻塞的),有些是同步的(阻塞的),有些將這二者同時實現。有些客戶端支持「供應商的特定擴展」(例如RabbitMQ的特定擴展)。
由於AMQP的主要目標之一就是實現交互性,因此對於開發者來說,瞭解協議的操做方法而不是隻停留在弄懂特定客戶端的庫就顯得十分重要。這樣一來,開發者使用不一樣類型的庫與協議進行溝通時就會容易的多。
sudo apt-get install rabbitmq-server
安裝rabbitmq,以後開啓rabbitmq的web管理頁面sudo /usr/sbin/rabbitmq-plugins enable rabbitmq_management
開啓rabbitmq的web管理頁面插件sudo /etc/init.d/rabbitmq-server start
啓動rabbitmq服務sudo netstat -ntlp
查看,若是開啓了web管理頁面的話,應該能看到5672,15672,25672幾個端口被打開在實現通信以前須要注意幾個點,當咱們要使用遠程來鏈接rabbitMQ的時候須要增長一個用戶並設置權限:
在rabbitMQ的主機終端執行如下命令:
# 啓動rabbitMQ服務端<br>sudo /etc.init.d/rabbitmq-server start<br># 建立一個用戶
sudo rabbitmqctl add_user gyc
123123
# 設置用戶爲administrator角色
dudo rabbitmqctl set_user_tags gyc administrator
# 設置權限
sudo rabbitmqctl set_permissions
-
p
"/"
gyc
'.'
'.'
'.'
# 而後重啓rabbiMQ服務
sudo
/
etc
/
init.d
/
rabbitmq
-
server restart
# 而後可使用剛纔的用戶遠程鏈接rabbitmq server了。
python鏈接rabbitmq須要安裝一個包,pika,安裝命令:pip3 install pika
。
例1:direct交換機demo 單播
rabbit_direct_publish.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import
pika
# 連接rabbitMQ
import
sys
# 創建TCP連接
credentials
=
pika.PlainCredentials(
"gyc"
,
"123123"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
"192.168.14.63"
, credentials
=
credentials))
# 創建一個通道
channel
=
connection.channel()
# 聲明交換機名稱和類型
channel.exchange_declare(exchange
=
"direct_logs"
,
type
=
"direct"
)
serverity
=
sys.argv[
1
]
if
len
(sys.argv) >
1
else
"info"
# 獲取routing_key
message
=
" "
.join(sys.argv[
2
:])
or
"info: Hello World"
# 獲取消息內容
# RabbitMQ消息不能直接發送到隊列,它須要經歷一個交換的過程。
channel.basic_publish(
exchange
=
"direct_logs"
,
# 指定消息發送到的交換機名稱
routing_key
=
serverity,
# 指定消息的routing_key
body
=
message
# 消息體
)
print
(
"[x] Send 'Hello World!'"
)
connection.close()
|
server端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
import
pika
import
sys
print
(sys.argv)
credentials
=
pika.PlainCredentials(
"gyc"
,
"123123"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
"192.168.14.63"
, credentials
=
credentials))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
"direct_logs"
,
type
=
"direct"
)
result
=
channel.queue_declare(exclusive
=
True
)
# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會再使用此queue的消費者
# 斷開後,自動將queue刪除
queue_name
=
result.method.queue
# 取rabbitmq-server返回的queue名稱
serverities
=
sys.argv[
1
:]
# 獲取參數
if
not
serverities:
# 若是沒有參數會拋出異常,提示應該填寫什麼參數
sys.stderr.write(
"Usage: %s[info] [warning] [error]\n"
%
sys.argv[
0
])
sys.exit(
1
)
for
serverity
in
serverities:
# 將該隊列經過不一樣的routing_key綁定到交換機direct_logs上
channel.queue_bind(
exchange
=
"direct_logs"
,
queue
=
queue_name,
routing_key
=
serverity
)
print
(
"[*] Waiting for logs. To exit press CTRL+C"
)
def
callback(ch, method, properties, body):
# 定義回調函數
print
(
"[x] %r:%r"
%
(method.routing_key, body))
channel.basic_consume(
callback,
queue
=
queue_name,
)
channel.start_consuming()
# 啓動偵聽消息隊列<br><br># 注:sys參數須要到終端上執行該py文件填寫
|
例子2:扇形交換機demo:廣播
rabbit_fanout_publish.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import
pika
import
sys
credentials
=
pika.PlainCredentials(
"gyc"
,
"123123"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
"192.168.14.63"
,
credentials
=
credentials
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
"logs"
,
# 聲明交換機logs 類型爲fanout
type
=
"fanout"
)
message
=
"
".join(sys.argv[1:]) or "
info: Hello World!"
channel.basic_publish(
exchange
=
"logs"
,
# 指定消息發送到交換機的名稱
routing_key
=
"",
# fanout類型爲廣播,因此不須要指定routing_key,全部鏈接到該交換機的消息隊列都將能收到發過來消息
body
=
message
)
print
(
" [x] Send %r"
%
message)
connection.close()
|
rabbit_fanout_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import
pika
credentials
=
pika.PlainCredentials(
"gyc"
,
"3778627"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
"192.168.14.63"
,
credentials
=
credentials
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
"logs"
,
type
=
"fanout"
)
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue
channel.queue_bind(exchange
=
"logs"
,
queue
=
queue_name)
print
(
"[*] Waiting for logs. To exit press CTRL+C"
)
def
callback(ch, method, properties, body):
print
(
"[x] %r"
%
body)
channel.basic_consume(
callback,
queue
=
queue_name,
no_ack
=
True
)
channel.start_consuming()
|
例子3:topic交換機demo:頭交換機
rabbit_topic_publish.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import
pika
import
sys
credentials
=
pika.PlainCredentials(
"gyc"
,
"123123"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
"192.168.14.63"
, credentials
=
credentials
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
"topic_logs"
,
type
=
"topic"
)
routing_key
=
sys.argv[
1
]
if
len
(sys.argv) >
1
else
"anonymous.info"
message
=
"
".join(sys.argv[2:]) or "
Hello World!"
channel.basic_publish(
exchange
=
"topic_logs"
,
routing_key
=
routing_key,
body
=
message
)
print
(
" [x] Send %r:%r"
%
(routing_key, message))
connection.close()
|
rabbit_topic_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
import
pika
import
sys
credentials
=
pika.PlainCredentials(
"gyc"
,
"3778627"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
"192.168.14.63"
, credentials
=
credentials
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
"topic_logs"
,
type
=
"topic"
)
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue
binding_keys
=
sys.argv[
1
:]
if
not
binding_keys:
sys.stderr.write(
"Usage: %s [binding_keys]\n"
%
sys.argv[
0
])
sys.exit()
for
binding_key
in
binding_keys:
channel.queue_bind(exchange
=
"topic_logs"
,
queue
=
queue_name,
routing_key
=
binding_key)
print
(
"[*] Waiting for logs. To exit press CTRL+C"
)
def
callback(ch, method, properties, body):
print
(
"[x] %r:%r"
%
(method.routing_key, body))
channel.basic_consume(callback, queue
=
queue_name)
channel.start_consuming()
# 注:binding_keys就是你要設置的消息的鍵,須要在publish側的routing_key能匹配到
|
例子4:rpc調用demo:
rpc_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
import
pika
import
uuid
class
FibonacciRpcClient:
def
__init__(
self
):
self
.credentials
=
pika.PlainCredentials(
"gyc"
,
"123123"
)
self
.connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
"192.168.14.63"
, credentials
=
self
.credentials
))
# 初始化, 創建TCP鏈接,前面加self是由於咱們要在其餘地方進行調用
self
.channel
=
self
.connection.channel()
result
=
self
.channel.queue_declare(exclusive
=
True
)
self
.callback_queue
=
result.method.queue
self
.channel.basic_consume(
self
.on_response, no_ack
=
True
, queue
=
self
.callback_queue)
# 綁定回調函數和消息隊列
def
on_response(
self
, ch, method, props, body):
# 定義回調函數
if
self
.corr_id
=
=
props.correlation_id:
self
.response
=
body
def
call(
self
, n):
self
.response
=
None
self
.corr_id
=
str
(uuid.uuid4())
# 獲取UUID, 隨機惟一值,用來標識返回的數據是客戶端所須要的
self
.channel.basic_publish(exchange
=
"",
# 以routing_key爲rpc_queue,在消息頭中定義reply_to告訴server消息回給client生成的消息隊列,而且消息的correlation_id爲client本身生成的uuid,server回消息時也將會帶上這個uuid
routing_key
=
"rpc_queue"
,
properties
=
pika.BasicProperties(
reply_to
=
self
.callback_queue,
correlation_id
=
self
.corr_id,
),
# 告訴server端回消息的隊列,返回的時候帶上這個uuid
body
=
str
(n))
# 消息體
while
self
.response
is
None
:
# 循環檢測self.response是否有返回值
self
.connection.process_data_events()
'''Will make sure that data events are processed. Dispatches timer and
channel callbacks if not called from the scope of BlockingConnection or
BlockingChannel callback. Your app can block on this method.
:param float time_limit:suggested upper bound on processing time in
seconds. The actual blocking time depends on the granularity of the
underlying ioloop. Zero means return as soon as possible. None means
there is no limit on processing time and the function will block
until I/O produces actionalable events. Defaults to 0 for backward
compatibility. This parameter is NEW in pika 0.10.0.
該方法能夠傳遞參數time_limit=0,默認爲0,即爲不阻塞的檢測channel是否有消息回來,若是
有消息接收到,則執行回調函數,當time_limit不爲0時,將每次檢測阻塞time_limit秒。該方法
在pika0.10.0中新加。
'''
return
int
(
self
.response)
fibonacci_rpc
=
FibonacciRpcClient()
print
(
"[x] Requesting fib(30)"
)
response
=
fibonacci_rpc.call(
30
)
print
(
"[.] Got %r"
%
response)
|
rpc_server.py
import
pika
credentials
=
pika.PlainCredentials(
"gyc"
,
"123123"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
"192.168.14.63"
, credentials
=
credentials
))
channel
=
connection.channel()
channel.queue_declare(queue
=
"rpc_queue"
)
# 聲明queue名稱爲rpc_queue
def
fib(n):
# 斐波那契計算函數
if
n
=
=
0
:
return
0
elif
n
=
=
1
:
return
1
else
:
return
fib(n
-
1
)
+
fib(n
-
2
)
def
on_request(ch, method, props, body):
n
=
int
(body)
print
(
"[.] fib(%s)"
%
n)
response
=
fib(n)
ch.basic_publish(exchange
=
"",
routing_key
=
props.reply_to,
properties
=
pika.BasicProperties(
correlation_id
=
props.correlation_id
),
body
=
str
(response))
ch.basic_ack(delivery_tag
=
method.delivery_tag)
# 給rabbitmq_server發送ack確認消息
channel.basic_qos(prefetch_count
=
1
)
channel.basic_consume(on_request, queue
=
"rpc_queue"
)
print
(
"[x] Waiting RPC requests"
)
channel.start_consuming()
轉自 https://www.cnblogs.com/sxzwj/p/6422870.html