Rabbit MQ

AMQP簡介

  介紹rabbitMQ以前。先介紹一下AMQP協議,由於rabbitMQ是基於AMQP協議實現的一個服務程序。(目前爲止應該也是惟一實現了AMQP協議的服務)html

AMQP定義

  AMQP(高級消息隊列協議)是一個網絡協議。它支持符合要求的客戶端應用(application)和消息中間件代理(messaging middleware brockers)之間進行通訊。前端

消息代理和他們所扮演的角色

 

消息代理(message brokers)從發佈者(publishers)亦稱生產者(producers)那兒接收消息,並根據既定的路由規則把接收到的消息發送給處理消息的消費者(consumers)。python

因爲AMQP是一個網絡協議,因此這個過程當中的發佈者,消費者,消息代理 能夠存在於不一樣的設備上。web

AMQP 0-9-1 模型簡介

AMQP 0-9-1的工做過程以下圖:消息(message)被髮布者(publisher)發送給交換機(exchange),交換機經常被比喻成郵局或者郵箱。而後交換機將收到的消息根據路由規則分發給綁定的隊列(queue)。最後AMQP代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。算法

發佈者(publisher)發佈消息時能夠給消息指定各類消息屬性(message meta-data)。有些屬性有可能會被消息代理(brokers)使用,然而其餘的屬性則是徹底不透明的,它們只能被接收消息的應用所使用。數據庫

從安全角度考慮,網絡是不可靠的,接收消息的應用也有可能在處理消息的時候失敗。基於此緣由,AMQP模塊包含了一個消息確認(message acknowledgements)的概念:當一個消息從隊列中投遞給消費者後(consumer),消費者會通知一下消息代理(broker),這個能夠是自動的也能夠由處理消息的應用的開發者執行。當「消息確認」被啓用的時候,消息代理不會徹底將消息從隊列中刪除,直到它收到來自消費者的確認回執(acknowledgement)。編程

在某些狀況下,例如當一個消息沒法被成功路由時,消息或許會被返回給發佈者並被丟棄。或者,若是消息代理執行了延期操做,消息會被放入一個所謂的死信隊列中。此時,消息發佈者能夠選擇某些參數來處理這些特殊狀況。數組

隊列,交換機和綁定統稱爲AMQP實體(AMQP entities)。瀏覽器

 

AMQP是一個可編的程協議

AMQP 0-9-1是一個可編程協議,某種意義上說AMQP的實體和路由規則是由應用自己定義的,而不是由消息代理定義。包括像聲明隊列和交換機,定義他們之間的綁定,訂閱隊列等等關於協議自己的操做。安全

這雖然能讓開發人員自由發揮,但也須要他們注意潛在的定義衝突。固然這在實踐中不多會發生,若是發生,會以配置錯誤(misconfiguration)的形式表現出來。

應用程序(Applications)聲明AMQP實體,定義須要的路由方案,或者刪除再也不須要的AMQP實體。 

交換機和交換機類型

交換機是用來發送消息的AMQP實體。交換機拿到一個消息以後將它路由給一個或零個隊列。它使用哪一種路由算法是由交換機類型和被稱做綁定(bindings)的規則所決定的。AMQP 0-9-1的代理提供了四種交換機

Name(交換機類型) Default pre-declared names(預聲明的默認名稱)
Direct exchange(直連交換機) (Empty string) and amq.direct
Fanout exchange(扇型交換機) amq.fanout
Topic exchange(主題交換機) amq.topic
Headers exchange(頭交換機) amq.match (and amq.headers in RabbitMQ)

 除交換機類型外,在聲明交換機時還能夠附帶許多其餘的屬性,其中最重要的幾個分別是:

  • Name (交換機的名字)
  • Type (交換機的種類)
  • Passive (被動模式,布爾值)
  • Durable (消息代理重啓後,交換機是否還存在)
  • Auto_delete (當全部與之綁定的消息隊列都完成了對此交換機的使用後,刪掉它)
  • Arguments(依賴代理自己)

交換機能夠有兩個狀態:持久(durable)和暫存(transient)。持久化的交換機會在消息代理(broker)重啓後依舊存在,而暫存的交換機則不會(它們須要在代理再次上線後從新被聲明)。然而並非全部的應用場景都須要持久化的交換機。 

默認交換機

默認交換機(default exchange)其實是一個由消息代理預先聲明好的沒有名字(名字爲空字符串)的直連交換機(direct exchange)。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。

舉個栗子:當你聲明瞭一個名爲"search-indexing-online"的隊列,AMQP代理會自動將其綁定到默認交換機上,綁定(binding)的路由鍵名稱也是"search-indexing-online"。所以,當攜帶着名爲"search-indexing-online"的路由鍵的消息被髮送到默認交換機的時候,此消息會被默認交換機路由至名爲"search-indexing-online"的隊列中。換句話說,默認交換機貌似可以直接將消息投遞給隊列,儘管技術上並無作相關的操做。

 

直連交換機

直連型交換機(direct exchange)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。直連交換機用來處理消息的單播路由(unicast routing)(儘管它也能夠處理多播路由)。下邊介紹它是如何工做的:

  • 將一個隊列綁定到某個交換機上,同時賦予該綁定一個路由鍵(routing key)
  • 當一個攜帶着路由鍵爲R的消息被髮送給直連交換機時,交換機會把它路由給綁定值一樣爲R的隊列。

直連交換機常常用來循環分發任務給多個工做者(workers)。當這樣作的時候,咱們須要明白一點,在AMQP 0-9-1中,消息的負載均衡是發生在消費者(consumer)之間的,而不是隊列(queue)之間。

直連型交換機圖例:

 

扇型交換機

扇型交換機(funout exchange)將消息路由給綁定到它身上的全部隊列,而不理會綁定的路由鍵。若是N個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這全部的N個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)。

由於扇型交換機投遞消息的拷貝到全部綁定到它的隊列,因此他的應用案例都極其類似:

  • 大規模多用戶在線(MMO)遊戲可使用它來處理排行榜更新等全局事件
  • 體育新聞網站能夠用它來近乎實時地將比分更新分發給移動客戶端
  • 分發系統使用它來廣播各類狀態和配置更新
  • 在羣聊的時候,它被用來分發消息給參與羣聊的用戶。(AMQP沒有內置presence的概念,所以XMPP可能會是個更好的選擇)

扇型交換機圖例:

 

 

主題交換機

主題交換機(topic exchanges)經過對消息的路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。主題交換機常常用來實現各類分發/訂閱模式及其變種。主題交換機一般用來實現消息的多播路由(multicast routing)。

主題交換機擁有很是普遍的用戶案例。不管什麼時候,當一個問題涉及到那些想要有針對性的選擇須要接收消息的 多消費者/多應用(multiple consumers/applications) 的時候,主題交換機均可以被列入考慮範圍。

使用案例:

  • 分發有關於特定地理位置的數據,例如銷售點
  • 由多個工做者(workers)完成的後臺任務,每一個工做者負責處理某些特定的任務
  • 股票價格更新(以及其餘類型的金融數據更新)
  • 涉及到分類或者標籤的新聞更新(例如,針對特定的運動項目或者隊伍)
  • 雲端的不一樣種類服務的協調
  • 分佈式架構/基於系統的軟件封裝,其中每一個構建者僅能處理一個特定的架構或者系統。

 

頭交換機

有時消息的路由操做會涉及到多個屬性,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是爲此而生的。頭交換機使用多個消息屬性來代替路由鍵創建路由規則。經過判斷消息頭的值可否與指定的綁定相匹配來確立路由規則。

咱們能夠綁定一個隊列到頭交換機上,並給他們之間的綁定使用多個用於匹配的頭(header)。這個案例中,消息代理得從應用開發者那兒取到更多一段信息,換句話說,它須要考慮某條消息(message)是須要部分匹配仍是所有匹配。上邊說的「更多一段消息」就是"x-match"參數。當"x-match"設置爲「any」時,消息頭的任意一個值被匹配就能夠知足條件,而當"x-match"設置爲「all」的時候,就須要消息頭的全部值都匹配成功。

頭交換機能夠視爲直連交換機的另外一種表現形式。頭交換機可以像直連交換機同樣工做,不一樣之處在於頭交換機的路由規則是創建在頭屬性值之上,而不是路由鍵。路由鍵必須是一個字符串,而頭屬性值則沒有這個約束,它們甚至能夠是整數或者哈希值(字典)等。

 

隊列

AMQP中的隊列(queue)跟其餘消息隊列或任務隊列中的隊列是很類似的:它們存儲着即將被應用消費掉的消息。隊列跟交換機共享某些屬性,可是隊列也有一些另外的屬性。

  • Name
  • Durable(消息代理重啓後,隊列依舊存在)
  • Exclusive(只被一個鏈接(connection)使用,並且當鏈接關閉後隊列即被刪除)
  • Auto_delete(當最後一個消費者退訂後即被刪除)
  • Arguments(一些消息代理用他來完成相似與TTL的某些額外功能)

隊列在聲明(declare)後才能被使用。若是一個隊列尚不存在,聲明一個隊列會建立它。若是聲明的隊列已經存在,而且屬性徹底相同,那麼這次聲明不會對原有隊列產生任何影響。若是聲明中的屬性與已存在隊列的屬性有差別,那麼一個錯誤代碼爲406的通道級異常就會被拋出。

 

隊列名稱

隊列的名字能夠由應用(application)來取,也可讓消息代理(broker)直接生成一個。隊列的名字能夠是最多255字節的一個utf-8字符串。若但願AMQP消息代理生成隊列名,須要給隊列的name參數賦值一個空字符串:在同一個通道(channel)的後續的方法(method)中,咱們可使用空字符串來表示以前生成的隊列名稱。之因此以後的方法能夠獲取正確的隊列名是由於通道能夠默默地記住消息代理最後一次生成的隊列名稱。

以"amq."開始的隊列名稱被預留作消息代理內部使用。若是試圖在隊列聲明時打破這一規則的話,一個通道級的403 (ACCESS_REFUSED)錯誤會被拋出。

 

隊列持久化

持久化隊列(Durable queues)會被存儲在磁盤上,當消息代理(broker)重啓的時候,它依舊存在。沒有被持久化的隊列稱做暫存隊列(Transient queues)。並非全部的場景和案例都須要將隊列持久化。

持久化的隊列並不會使得路由到它的消息也具備持久性。假若消息代理掛掉了,從新啓動,那麼在重啓的過程當中持久化隊列會被從新聲明,不管怎樣,只有通過持久化的消息才能被從新恢復。

 

綁定

綁定(Binding)是交換機(exchange)將消息(message)路由給隊列(queue)所需遵循的規則。若是要指示交換機「E」將消息路由給隊列「Q」,那麼「Q」就須要與「E」進行綁定。綁定操做須要定義一個可選的路由鍵(routing key)屬性給某些類型的交換機。路由鍵的意義在於從發送給交換機的衆多消息中選擇出某些消息,將其路由給綁定的隊列。

打個比方:

  • 隊列(queue)是咱們想要去的位於紐約的目的地
  • 交換機(exchange)是JFK機場
  • 綁定(binding)就是JFK機場到目的地的路線。可以到達目的地的路線能夠是一條或者多條

擁有了交換機這個中間層,不少由發佈者直接到隊列難以實現的路由方案可以得以實現,而且避免了應用開發者的許多重複勞動。

若是AMQP的消息沒法路由到隊列(例如,發送到的交換機沒有綁定隊列),消息會被就地銷燬或者返還給發佈者。如何處理取決於發佈者設置的消息屬性。

 

消費者

消息若是隻是存儲在隊列裏是沒有任何用處的。被應用消費掉,消息的價值纔可以體現。在AMQP 0-9-1 模型中,有兩種途徑能夠達到此目的:

  • 將消息投遞給應用 ("push API")
  • 應用根據須要主動獲取消息 ("pull API")

使用push API,應用(application)須要明確表示出它在某個特定隊列裏所感興趣的,想要消費的消息。如是,咱們能夠說應用註冊了一個消費者,或者說訂閱了一個隊列。一個隊列能夠註冊多個消費者,也能夠註冊一個獨享的消費者(當獨享消費者存在時,其餘消費者即被排除在外)。

每一個消費者(訂閱者)都有一個叫作消費者標籤的標識符。它能夠被用來退訂消息。消費者標籤其實是一個字符串。

 

消息確認

消費者應用(Consumer applications) - 用來接受和處理消息的應用 - 在處理消息的時候偶爾會失敗或者有時會直接崩潰掉。並且網絡緣由也有可能引發各類問題。這就給咱們出了個難題,AMQP代理在何時刪除消息纔是正確的?AMQP 0-9-1 規範給咱們兩種建議:

  • 當消息代理(broker)將消息發送給應用後當即刪除。(使用AMQP方法:basic.deliver或basic.get-ok)
  • 待應用(application)發送一個確認回執(acknowledgement)後再刪除消息。(使用AMQP方法:basic.ack)

前者被稱做自動確認模式(automatic acknowledgement model),後者被稱做顯式確認模式(explicit acknowledgement model)。在顯式模式下,由消費者應用來選擇何時發送確認回執(acknowledgement)。應用能夠在收到消息後當即發送,或將未處理的消息存儲後發送,或等到消息被處理完畢後再發送確認回執(例如,成功獲取一個網頁內容並將其存儲以後)。

若是一個消費者在還沒有發送確認回執的狀況下掛掉了,那AMQP代理會將消息從新投遞給另外一個消費者。若是當時沒有可用的消費者了,消息代理會死等下一個註冊到此隊列的消費者,而後再次嘗試投遞。

 

拒絕消息

當一個消費者接收到某條消息後,處理過程有可能成功,有可能失敗。應用能夠向消息代理代表,本條消息因爲「拒絕消息(Rejecting Messages)」的緣由處理失敗了(或者未能在此時完成)。當拒絕某條消息時,應用能夠告訴消息代理如何處理這條消息——銷燬它或者從新放入隊列。當此隊列只有一個消費者時,請確認不要因爲拒絕消息而且選擇了從新放入隊列的行爲而引發消息在同一個消費者身上無限循環的狀況發生。

Negative Acknowledgements

在AMQP中,basic.reject方法用來執行拒絕消息的操做。但basic.reject有個限制:你不能使用它決絕多個帶有確認回執(acknowledgements)的消息。可是若是你使用的是RabbitMQ,那麼你可使用被稱做negative acknowledgements(也叫nacks)的AMQP 0-9-1擴展來解決這個問題。更多的信息請參考幫助頁面

預取消息

在多個消費者共享一個隊列的案例中,明確指定在收到下一個確認回執前每一個消費者一次能夠接受多少條消息是很是有用的。這能夠在試圖批量發佈消息的時候起到簡單的負載均衡和提升消息吞吐量的做用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,若是生產應用每分鐘才發送一條消息,這說明處理工做尚在運行。)

注意,RabbitMQ只支持通道級的預取計數,而不是鏈接級的或者基於大小的預取。

消息屬性和有效載荷(消息主體)

AMQP模型中的消息(Message)對象是帶有屬性(Attributes)的。有些屬性及其常見,以致於AMQP 0-9-1 明確的定義了它們,而且應用開發者們無需費心思思考這些屬性名字所表明的具體含義。例如:

  • Content type(內容類型)
  • Content encoding(內容編碼)
  • Routing key(路由鍵)
  • Delivery mode (persistent or not)
    投遞模式(持久化 或 非持久化)
  • Message priority(消息優先權)
  • Message publishing timestamp(消息發佈的時間戳)
  • Expiration period(消息有效期)
  • Publisher application id(發佈應用的ID)

有些屬性是被AMQP代理所使用的,可是大多數是開放給接收它們的應用解釋器用的。有些屬性是可選的也被稱做消息頭(headers)。他們跟HTTP協議的X-Headers很類似。消息屬性須要在消息被髮布的時候定義。

AMQP的消息除屬性外,也含有一個有效載荷 - Payload(消息實際攜帶的數據),它被AMQP代理看成不透明的字節數組來對待。消息代理不會檢查或者修改有效載荷。消息能夠只包含屬性而不攜帶有效載荷。它一般會使用相似JSON這種序列化的格式數據,爲了節省,協議緩衝器和MessagePack將結構化數據序列化,以便以消息的有效載荷的形式發佈。AMQP及其同行者們一般使用"content-type" 和 "content-encoding" 這兩個字段來與消息溝通進行有效載荷的辨識工做,但這僅僅是基於約定而已。

消息可以以持久化的方式發佈,AMQP代理會將此消息存儲在磁盤上。若是服務器重啓,系統會確認收到的持久化消息未丟失。簡單地將消息發送給一個持久化的交換機或者路由給一個持久化的隊列,並不會使得此消息具備持久化性質:它徹底取決與消息自己的持久模式(persistence mode)。將消息以持久化方式發佈時,會對性能形成必定的影響(就像數據庫操做同樣,健壯性的存在一定形成一些性能犧牲)。

 

消息確認

因爲網絡的不肯定性和應用失敗的可能性,處理確認回執(acknowledgement)就變的十分重要。有時咱們確認消費者收到消息就能夠了,有時確認回執意味着消息已被驗證而且處理完畢,例如對某些數據已經驗證完畢而且進行了數據存儲或者索引操做。

這種情形很常見,因此 AMQP 0-9-1 內置了一個功能叫作 消息確認(message acknowledgements),消費者用它來確認消息已經被接收或者處理。若是一個應用崩潰掉(此時鏈接會斷掉,因此AMQP代理亦會得知),並且消息的確認回執功能已經被開啓,可是消息代理還沒有得到確認回執,那麼消息會被重新放入隊列(而且在還有還有其餘消費者存在於此隊列的前提下,當即投遞給另一個消費者)。

協議內置的消息確認功能將幫助開發者創建強大的軟件。

 

AMQP 0-9-1 方法

AMQP 0-9-1由許多方法(methods)構成。方法便是操做,這跟面向對象編程中的方法沒半毛錢關係。AMQP的方法被分組在類(class)中。這裏的類僅僅是對AMQP方法的邏輯分組而已。在 AMQP 0-9-1參考中有對AMQP方法的詳細介紹。

讓咱們來看看交換機類,有一組方法被關聯到了交換機的操做上。這些方法以下所示:

  • exchange.declare
  • exchange.declare-ok
  • exchange.delete
  • exchange.delete-ok

(請注意,RabbitMQ網站參考中包含了特用於RabbitMQ的交換機類的擴展,這裏咱們不對其進行討論)

以上的操做來自邏輯上的配對:exchange.declare 和 exchange.declare-ok,exchange.delete 和 exchange.delete-ok. 這些操做分爲「請求 - requests」(由客戶端發送)和「響應 - responses」(由代理髮送,用來回應以前提到的「請求」操做)。

以下的例子:客戶端要求消息代理使用exchange.declare方法聲明一個新的交換機:

  

如上圖所示,exchange.declare方法攜帶了好幾個參數。這些參數能夠容許客戶端指定交換機名稱、類型、是否持久化等等。

操做成功後,消息代理使用exchange.declare-ok方法進行迴應:

 

 

exchange.declare-ok方法除了通道號以外沒有攜帶任何其餘參數(通道-channel 會在本指南稍後章節進行介紹)。

AMQP隊列類的配對方法 - queue.declare方法 和 queue.declare-ok有着與其餘配對方法很是類似的一系列事件:

 

 

 不是全部的AMQP方法都有與其配對的「另外一半」。許多(basic.publish是最被普遍使用的)都沒有相對應的「響應」方法,另一些(如basic.get)有着一種以上與之對應的「響應」方法。

鏈接

AMQP鏈接一般是長鏈接。AMQP是一個使用TCP提供可靠投遞的應用層協議。AMQP使用認證機制而且提供TLS(SSL)保護。當一個應用再也不須要鏈接到AMQP代理的時候,須要優雅的釋放掉AMQP鏈接,而不是直接將TCP鏈接關閉。

通道

有些應用須要與AMQP代理創建多個鏈接。不管怎樣,同時開啓多個TCP鏈接都是不合適的,由於這樣作會消耗掉過多的系統資源而且使得防火牆的配置更加困難。AMQP 0-9-1提供了通道(channels)來處理多鏈接,能夠把通道理解成共享一個TCP鏈接的多個輕量化鏈接。

在涉及多線程/進程的應用中,爲每一個線程/進程開啓一個通道(channel)是很常見的,而且這些通道不能被線程/進程共享。

一個特定通道上的通信與其餘通道上的通信是徹底隔離的,所以每一個AMQP方法都須要攜帶一個通道號,這樣客戶端就能夠指定此方法是爲哪一個通道準備的。

虛擬主機

爲了在一個單獨的代理上實現多個隔離的環境(用戶、用戶組、交換機、隊列 等),AMQP提供了一個虛擬主機(virtual hosts - vhosts)的概念。這跟Web servers虛擬主機概念很是類似,這爲AMQP實體提供了徹底隔離的環境。當鏈接被創建的時候,AMQP客戶端來指定使用哪一個虛擬主機。

AMQP是可擴展的

AMQP 0-9-1 擁有多個擴展點:

  • 定製化交換機類型 可讓開發者們實現一些開箱即用的交換機類型還沒有很好覆蓋的路由方案。例如 geodata-based routing。
  • 交換機和隊列的聲明中能夠包含一些消息代理可以用到的額外屬性。例如RabbitMQ中的per-queue message TTL便是使用該方式實現。
  • 特定消息代理的協議擴展。例如RabbitMQ所實現的擴展。
  • 新的 AMQP 0-9-1 方法類可被引入。
  • 消息代理能夠被其餘的插件擴展,例如RabbitMQ的管理前端 和 已經被插件化的HTTP API。

這些特性使得AMQP 0-9-1模型更加靈活,而且可以適用於解決更加寬泛的問題。

AMQP 0-9-1 客戶端生態系統

AMQP 0-9-1 擁有衆多的適用於各類流行語言和框架的客戶端。其中一部分嚴格遵循AMQP規範,提供AMQP方法的實現。另外一部分提供了額外的技術,方便使用的方法和抽象。有些客戶端是異步的(非阻塞的),有些是同步的(阻塞的),有些將這二者同時實現。有些客戶端支持「供應商的特定擴展」(例如RabbitMQ的特定擴展)。

由於AMQP的主要目標之一就是實現交互性,因此對於開發者來說,瞭解協議的操做方法而不是隻停留在弄懂特定客戶端的庫就顯得十分重要。這樣一來,開發者使用不一樣類型的庫與協議進行溝通時就會容易的多。

 

RabbitMQ

安裝

  • 使用sudo apt-get install rabbitmq-server安裝rabbitmq,以後開啓rabbitmq的web管理頁面
  • 使用sudo /usr/sbin/rabbitmq-plugins enable rabbitmq_management開啓rabbitmq的web管理頁面插件
  • 使用sudo /etc/init.d/rabbitmq-server start啓動rabbitmq服務
  • 打開瀏覽器訪問服務器的web頁面:http://ipaddr:15672,默認登陸帳號/密碼:guest/guest(爲安全起見,建議生產環境不使用該帳號)
  • rabbitmq的鏈接端口默認是5672端口,可使用sudo netstat -ntlp查看,若是開啓了web管理頁面的話,應該能看到5672,15672,25672幾個端口被打開

實現最簡單的隊列通訊

在實現通信以前須要注意幾個點,當咱們要使用遠程來鏈接rabbitMQ的時候須要增長一個用戶並設置權限:

在rabbitMQ的主機終端執行如下命令:

# 啓動rabbitMQ服務端<br>sudo /etc.init.d/rabbitmq-server start<br># 建立一個用戶
sudo rabbitmqctl add_user gyc  123123
# 設置用戶爲administrator角色
dudo rabbitmqctl set_user_tags gyc administrator
# 設置權限
sudo rabbitmqctl set_permissions  - "/"  gyc  '.' '.' '.'
# 而後重啓rabbiMQ服務
sudo  / etc / init.d / rabbitmq - server restart
 
# 而後可使用剛纔的用戶遠程鏈接rabbitmq server了。

 

應用

python鏈接rabbitmq須要安裝一個包,pika,安裝命令:pip3 install pika

例1:direct交換機demo 單播
rabbit_direct_publish.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import  pika   # 連接rabbitMQ
import  sys
 
# 創建TCP連接
credentials  =  pika.PlainCredentials( "gyc" "123123" )
connection  =  pika.BlockingConnection(pika.ConnectionParameters(host = "192.168.14.63" , credentials = credentials))
 
# 創建一個通道
channel  =  connection.channel()
 
# 聲明交換機名稱和類型
channel.exchange_declare(exchange = "direct_logs" ,
                          type = "direct" )
 
serverity  =  sys.argv[ 1 if  len (sys.argv) >  1  else  "info"   # 獲取routing_key
 
message  =  " " .join(sys.argv[ 2 :])  or  "info: Hello World"   # 獲取消息內容
 
# RabbitMQ消息不能直接發送到隊列,它須要經歷一個交換的過程。
channel.basic_publish(
     exchange = "direct_logs" ,   # 指定消息發送到的交換機名稱
     routing_key = serverity,   # 指定消息的routing_key
     body = message   # 消息體
)
 
print ( "[x] Send 'Hello World!'" )
 
connection.close()

server端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import  pika
import  sys
 
print (sys.argv)
 
credentials  =  pika.PlainCredentials( "gyc" "123123" )
connection  =  pika.BlockingConnection(pika.ConnectionParameters(host = "192.168.14.63" , credentials = credentials))
 
channel  =  connection.channel()
 
channel.exchange_declare(exchange = "direct_logs" ,
                          type = "direct" )
 
result  =  channel.queue_declare(exclusive = True )   # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會再使用此queue的消費者
# 斷開後,自動將queue刪除
queue_name  =  result.method.queue   # 取rabbitmq-server返回的queue名稱
 
serverities  =  sys.argv[ 1 :]   # 獲取參數
 
if  not  serverities:   # 若是沒有參數會拋出異常,提示應該填寫什麼參數
     sys.stderr.write( "Usage: %s[info] [warning] [error]\n"  %  sys.argv[ 0 ])
     sys.exit( 1 )
 
for  serverity  in  serverities:   # 將該隊列經過不一樣的routing_key綁定到交換機direct_logs上
     channel.queue_bind(
         exchange = "direct_logs" ,
         queue = queue_name,
         routing_key = serverity
     )
 
print ( "[*] Waiting for logs. To exit press CTRL+C" )
 
 
def  callback(ch, method, properties, body):   # 定義回調函數
     print ( "[x] %r:%r"  %  (method.routing_key, body))
 
channel.basic_consume(
     callback,
     queue = queue_name,
)
 
channel.start_consuming()   # 啓動偵聽消息隊列<br><br># 注:sys參數須要到終端上執行該py文件填寫

例子2:扇形交換機demo:廣播
rabbit_fanout_publish.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import  pika
 
import  sys
 
credentials  =  pika.PlainCredentials( "gyc" "123123" )
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
     host = "192.168.14.63" ,
     credentials = credentials
))
 
channel  =  connection.channel()
 
channel.exchange_declare(exchange = "logs" ,   # 聲明交換機logs 類型爲fanout
                          type = "fanout" )
 
message  =  " ".join(sys.argv[1:]) or " info: Hello World!"
 
channel.basic_publish(
     exchange = "logs" ,   # 指定消息發送到交換機的名稱
     routing_key = "",   # fanout類型爲廣播,因此不須要指定routing_key,全部鏈接到該交換機的消息隊列都將能收到發過來消息
     body = message
)
 
print ( " [x] Send %r"  %  message)
 
connection.close()

rabbit_fanout_consumer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import  pika
 
credentials  =  pika.PlainCredentials( "gyc" "3778627" )
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
     host = "192.168.14.63" ,
     credentials = credentials
))
 
channel  =  connection.channel()
 
channel.exchange_declare(exchange = "logs" type = "fanout" )
 
result  =  channel.queue_declare(exclusive = True )
queue_name  =  result.method.queue
 
channel.queue_bind(exchange = "logs" ,
                    queue = queue_name)
 
print ( "[*] Waiting for logs. To exit press CTRL+C" )
 
 
def  callback(ch, method, properties, body):
     print ( "[x] %r"  %  body)
 
channel.basic_consume(
     callback,
     queue = queue_name,
     no_ack = True
)
 
channel.start_consuming()

例子3:topic交換機demo:頭交換機
rabbit_topic_publish.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import  pika
 
import  sys
 
credentials  =  pika.PlainCredentials( "gyc" "123123" )
 
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
     host = "192.168.14.63" , credentials = credentials
))
 
channel  =  connection.channel()
 
channel.exchange_declare(exchange = "topic_logs" ,
                          type = "topic" )
 
routing_key  =  sys.argv[ 1 if  len (sys.argv) >  1  else  "anonymous.info"
 
message  =  " ".join(sys.argv[2:]) or " Hello World!"
 
channel.basic_publish(
     exchange = "topic_logs" ,
     routing_key = routing_key,
     body = message
)
 
print ( " [x] Send %r:%r"  %  (routing_key, message))
 
connection.close()

rabbit_topic_consumer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import  pika
 
import  sys
 
credentials  =  pika.PlainCredentials( "gyc" "3778627" )
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
     host = "192.168.14.63" , credentials = credentials
))
 
channel  =  connection.channel()
 
channel.exchange_declare(exchange = "topic_logs" ,
                          type = "topic" )
 
result  =  channel.queue_declare(exclusive = True )
queue_name  =  result.method.queue
 
binding_keys  =  sys.argv[ 1 :]
 
if  not  binding_keys:
     sys.stderr.write( "Usage: %s [binding_keys]\n"  %  sys.argv[ 0 ])
     sys.exit()
 
for  binding_key  in  binding_keys:
     channel.queue_bind(exchange = "topic_logs" ,
                        queue = queue_name,
                        routing_key = binding_key)
 
print ( "[*] Waiting for logs. To exit press CTRL+C" )
 
 
def  callback(ch, method, properties, body):
     print ( "[x] %r:%r"  %  (method.routing_key, body))
 
channel.basic_consume(callback, queue = queue_name)
 
channel.start_consuming()
# 注:binding_keys就是你要設置的消息的鍵,須要在publish側的routing_key能匹配到

例子4:rpc調用demo:
rpc_client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import  pika
 
import  uuid
 
 
class  FibonacciRpcClient:
     def  __init__( self ):
         self .credentials  =  pika.PlainCredentials( "gyc" "123123" )
         self .connection  =  pika.BlockingConnection(pika.ConnectionParameters(
             host = "192.168.14.63" , credentials = self .credentials
         ))   # 初始化, 創建TCP鏈接,前面加self是由於咱們要在其餘地方進行調用
 
         self .channel  =  self .connection.channel()
 
         result  =  self .channel.queue_declare(exclusive = True )
         self .callback_queue  =  result.method.queue
 
         self .channel.basic_consume( self .on_response, no_ack = True , queue = self .callback_queue)   # 綁定回調函數和消息隊列
 
     def  on_response( self , ch, method, props, body):   # 定義回調函數
         if  self .corr_id  = =  props.correlation_id:
             self .response  =  body
 
     def  call( self , n):
         self .response  =  None
         self .corr_id  =  str (uuid.uuid4())   # 獲取UUID, 隨機惟一值,用來標識返回的數據是客戶端所須要的
         self .channel.basic_publish(exchange = "",   # 以routing_key爲rpc_queue,在消息頭中定義reply_to告訴server消息回給client生成的消息隊列,而且消息的correlation_id爲client本身生成的uuid,server回消息時也將會帶上這個uuid
                                    routing_key = "rpc_queue" ,
                                    properties = pika.BasicProperties(
                                        reply_to = self .callback_queue,
                                        correlation_id = self .corr_id,
                                    ),   # 告訴server端回消息的隊列,返回的時候帶上這個uuid
                                    body = str (n))   # 消息體
         while  self .response  is  None :   # 循環檢測self.response是否有返回值
             self .connection.process_data_events()
             '''Will make sure that data events are processed. Dispatches timer and
             channel callbacks if not called from the scope of BlockingConnection or
             BlockingChannel callback. Your app can block on this method.
 
             :param float time_limit:suggested upper bound on processing time in
             seconds. The actual blocking time depends on the granularity of the
             underlying ioloop. Zero means return as soon as possible. None means
             there is no limit on processing time and the function will block
             until I/O produces actionalable events. Defaults to 0 for backward
             compatibility. This parameter is NEW in pika 0.10.0.
             該方法能夠傳遞參數time_limit=0,默認爲0,即爲不阻塞的檢測channel是否有消息回來,若是
             有消息接收到,則執行回調函數,當time_limit不爲0時,將每次檢測阻塞time_limit秒。該方法
             在pika0.10.0中新加。
             '''
         return  int ( self .response)
 
fibonacci_rpc  =  FibonacciRpcClient()
 
print ( "[x] Requesting fib(30)" )
 
response  =  fibonacci_rpc.call( 30 )
 
print ( "[.] Got %r"  %  response)

rpc_server.py

import  pika
 
credentials  =  pika.PlainCredentials( "gyc" "123123" )
 
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
     host = "192.168.14.63" , credentials = credentials
))
 
channel  =  connection.channel()
 
channel.queue_declare(queue = "rpc_queue" # 聲明queue名稱爲rpc_queue
 
 
def  fib(n):
     # 斐波那契計算函數
     if  = =  0 :
         return  0
     elif  = =  1 :
         return  1
     else :
         return  fib(n  -  1 +  fib(n  -  2 )
 
 
def  on_request(ch, method, props, body):
     =  int (body)
 
     print ( "[.] fib(%s)"  %  n)
 
     response  =  fib(n)
 
     ch.basic_publish(exchange = "",
                      routing_key = props.reply_to,
                      properties = pika.BasicProperties(
                          correlation_id = props.correlation_id
                      ),
                      body = str (response))
 
     ch.basic_ack(delivery_tag = method.delivery_tag)   # 給rabbitmq_server發送ack確認消息
 
 
channel.basic_qos(prefetch_count = 1 )
 
channel.basic_consume(on_request, queue = "rpc_queue" )
 
print ( "[x] Waiting RPC requests" )
 
channel.start_consuming()

 轉自 https://www.cnblogs.com/sxzwj/p/6422870.html

相關文章
相關標籤/搜索