來源:高廣超,算法
1. 歷史服務器
RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。AMQP 的出現其實也是應了廣大人民羣衆的需求,雖然在同步消息通信的世界裏有不少公開標準(如 COBAR的 IIOP ,或者是 SOAP 等),可是在異步消息處理中卻不是這樣,只有大企業有一些商業實現(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),所以,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯合制定了 AMQP 的公開標準。架構
RabbitMQ是由RabbitMQ Technologies Ltd開發而且提供商業支持的。該公司在2010年4月被SpringSource(VMWare的一個部門)收購。在2013年5月被併入Pivotal。其實VMWare,Pivotal和EMC本質上是一家的。不一樣的是VMWare是獨立上市子公司,而Pivotal是整合了EMC的某些資源,如今並無上市。併發
RabbitMQ的官網是http://www.rabbitmq.comapp
2. 應用場景異步
言歸正傳。RabbitMQ,或者說AMQP解決了什麼問題,或者說它的應用場景是什麼?socket
對於一個大型的軟件系統來講,它會有不少的組件或者說模塊或者說子系統或者(subsystem or Component or submodule)。那麼這些模塊的如何通訊?這和傳統的IPC有很大的區別。傳統的IPC不少都是在單一系統上的,模塊耦合性很大,不適合擴展(Scalability);若是使用socket那麼不一樣的模塊的確能夠部署到不一樣的機器上,可是仍是有不少問題須要解決。好比:ide
1)信息的發送者和接收者如何維持這個鏈接,若是一方的鏈接中斷,這期間的數據如何方式丟失?高併發
2)如何下降發送者和接收者的耦合度?性能
3)如何讓Priority高的接收者先接到數據?
4)如何作到load balance?有效均衡接收者的負載?
5)如何有效的將數據發送到相關的接收者?也就是說將接收者subscribe 不一樣的數據,如何作有效的filter。
6)如何作到可擴展,甚至將這個通訊模塊發到cluster上?
7)如何保證接收者接收到了完整,正確的數據?
AMDQ協議解決了以上的問題,而RabbitMQ實現了AMQP。
3. 系統架構
RabbitMQ Server:也叫broker server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是維護一條從Producer到Consumer的路線,保證數據可以按照指定的方式進行傳輸。可是這個保證也不是100%的保證,可是對於普通的應用來講這已經足夠了。固然對於商業系統來講,能夠再作一層數據一致性的guard,就能夠完全保證系統的一致性了。
Client P:也叫Producer,數據的發送方。createmessages and publish (send) them to a broker server (RabbitMQ).一個Message有兩個部分:payload(有效載荷)和label(標籤)。payload顧名思義就是傳輸的數據。label是exchange的名字或者說是一個tag,它描述了payload,並且RabbitMQ也是經過這個label來決定把這個Message發給哪一個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。
Client C: 也叫Consumer,數據的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比做是一個有名字的郵箱。當有Message到達某個郵箱後,RabbitMQ把它發送給它的某個訂閱者即Consumer。固然可能會把同一個Message發送給不少的Consumer。在這個Message中,只有payload,label已經被刪掉了。對於Consumer來講,它是不知道誰發送的這個信息的。就是協議自己不支持。可是固然了若是Producer發送的payload包含了Producer的信息就另當別論了。
對於一個數據從Producer到Consumer的正確傳遞,還有三個概念須要明確:exchanges, queues and bindings。
Exchanges are where producers publish their messages.
Queues are where the messages end up and are received by consumers
Bindings are how the messages get routed from the exchange to particular queues.
還有幾個概念是上述圖中沒有標明的,那就是Connection(鏈接),Channel(通道,頻道)。
Connection:就是一個TCP的鏈接。Producer和Consumer都是經過TCP鏈接到RabbitMQ Server的。之後咱們能夠看到,程序的起始處就是創建這個TCP鏈接。
Channels:虛擬鏈接。它創建在上述的TCP鏈接中。數據流動都是在Channel中進行的。也就是說,通常狀況是程序起始創建TCP鏈接,第二步就是創建這個Channel。
那麼,爲何使用Channel,而不是直接使用TCP鏈接?
對於OS來講,創建和關閉TCP鏈接是有代價的,頻繁的創建關閉TCP鏈接對於系統的性能有很大的影響,並且TCP的鏈接數也有限制,這也限制了系統處理高併發的能力。可是,在TCP鏈接中創建Channel是沒有上述代價的。對於Producer或者Consumer來講,能夠併發的使用多個Channel進行Publish或者Receive。有實驗代表,1s的數據能夠Publish10K的數據包。固然對於不一樣的硬件環境,不一樣的數據包大小這個數據確定不同,可是我只想說明,對於普通的Consumer或者Producer來講,這已經足夠了。若是不夠用,你考慮的應該是如何細化split你的設計。
Broker:簡單來講就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。
由Exchange,Queue,RoutingKey三個才能決定一個從Exchange到Queue的惟一的線路。
4.基本概念
ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket連接,它封裝了socket協議相關部分邏輯。ConnectionFactory爲Connection的製造工廠。
Channel是咱們與RabbitMQ打交道的最重要的一個接口,咱們大部分的業務操做是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發佈消息等。
Queue
Queue(隊列)是RabbitMQ的內部對象,用於存儲消息,用下圖表示。
queue
RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息並最終投遞到Queue中,消費者(下圖中的C)能夠從Queue中獲取消息並消費。
多個消費者能夠訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理。
Message acknowledgment
在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其餘意外)的狀況,這種狀況下就可能會致使消息丟失。爲了不這種狀況發生,咱們能夠要求消費者在消費完消息後發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)後纔將該消息從Queue中移除;若是RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ鏈接斷開,則RabbitMQ會將該消息發送給其餘消費者(若是存在多個消費者)進行處理。這裏不存在timeout概念,一個消費者處理消息時間再長也不會致使該消息被髮送給其餘消費者,除非它的RabbitMQ鏈接斷開。
這裏會產生另一個問題,若是咱們的開發人員在處理完業務邏輯後,忘記發送回執給RabbitMQ,這將會致使嚴重的bug——Queue中堆積的消息會愈來愈多;消費者重啓後會重複消費這些消息並重復執行業務邏輯。
另外pub message是沒有ack的。
Message durability
若是咱們但願即便在RabbitMQ服務重啓的狀況下,也不會丟失消息,咱們能夠將Queue與Message都設置爲可持久化的(durable),這樣能夠保證絕大部分狀況下咱們的RabbitMQ消息不會丟失。但依然解決不了小几率丟失事件的發生(好比RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),若是咱們須要對這種小几率事件也要管理起來,那麼咱們要用到事務。因爲這裏僅爲RabbitMQ的簡單介紹,因此這裏將不講解RabbitMQ相關的事務。
Prefetch count
前面咱們講到若是有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者。這時若是每一個消息的處理時間不一樣,就有可能會致使某些消費者一直在忙,而另一些消費者很快就處理完手頭工做並一直空閒的狀況。咱們能夠經過設置prefetchCount來限制Queue每次發送給每一個消費者的消息數,好比咱們設置prefetchCount=1,則Queue每次給每一個消費者發送一條消息;消費者處理完這條消息後Queue會再給該消費者發送一條消息。
Exchange
在上一節咱們看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的狀況是,生產者將消息發送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個或多個Queue中(或者丟棄)。
Exchange是按照什麼邏輯將消息路由到Queue的?這個將在Binding一節介紹。
RabbitMQ中的Exchange有四種類型,不一樣的類型有着不一樣的路由策略,這將在Exchange Types一節介紹。
routing key
生產者在將消息發送給Exchange的時候,通常會指定一個routing key,來指定這個消息的路由規則,而這個routing key須要與Exchange Type及binding key聯合使用才能最終生效。
在Exchange Type與binding key固定的狀況下(在正常使用時通常這些內容都是固定配置好的),咱們的生產者就能夠在發送消息給Exchange時,經過指定routing key來決定消息流向哪裏。
RabbitMQ爲routing key設定的長度限制爲255 bytes。
Binding
RabbitMQ中經過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。
Binding key
在綁定(Binding)Exchange與Queue的同時,通常會指定一個binding key;消費者將消息發送給Exchange時,通常會指定一個routing key;當binding key與routing key相匹配時,消息將會被路由到對應的Queue中。這個將在Exchange Types章節會列舉實際的例子加以說明。
在綁定多個Queue到同一個Exchange的時候,這些Binding容許使用相同的binding key。
binding key 並非在全部狀況下都生效,它依賴於Exchange Type,好比fanout類型的Exchange就會無視binding key,而是將消息路由到全部綁定到該Exchange的Queue。
Exchange Types
RabbitMQ經常使用的Exchange Type有fanout、direct、topic、headers這四種(AMQP規範裏還提到兩種Exchange Type,分別爲system與自定義,這裏不予以描述),下面分別進行介紹。
fanout
fanout類型的Exchange路由規則很是簡單,它會把全部發送到該Exchange的消息路由到全部與它綁定的Queue中。
上圖中,生產者(P)發送到Exchange(X)的全部消息都會路由到圖中的兩個Queue,並最終被兩個消費者(C1與C2)消費。
direct
direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key徹底匹配的Queue中。
以上圖的配置爲例,咱們以routingKey=」error」發送消息到Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…);若是咱們以routingKey=」info」或routingKey=」warning」來發送消息,則消息只會路由到Queue2。若是咱們以其餘routingKey發送消息,則消息不會路由到這兩個Queue中。
topic
前面講到direct類型的Exchange路由規則是徹底匹配binding key與routing key,但這種嚴格的匹配方式在不少狀況下不能知足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage類似,也是將消息路由到binding key與routing key相匹配的Queue中,但這裏的匹配規則有些不一樣,它約定:
routing key爲一個句點號「.」分隔的字符串(咱們將被句點號「. 」分隔開的每一段獨立的字符串稱爲一個單詞),如「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」 binding key與routing key同樣也是句點號「. 」分隔的字符串
binding key中能夠存在兩種特殊字符「*」與「#」,用於作模糊匹配,其中「*」用於匹配一個單詞,「#」用於匹配多個單詞(能夠是零個)
以上圖中的配置爲例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,由於它們沒有匹配任何bindingKey。
headers
headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否徹底匹配Queue與Exchange綁定時指定的鍵值對;若是徹底匹配則消息會路由到該Queue,不然不會路由到該Queue。
該類型的Exchange沒有用到過(不過也應該頗有用武之地),因此不作介紹。
RPC
MQ自己是基於異步的消息處理,前面的示例中全部的生產者(P)將消息發送到RabbitMQ後不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。
但實際的應用場景中,咱們極可能須要一些同步處理,須要同步等待服務端將個人消息處理完成後再進行下一步處理。這至關於RPC(Remote Procedure Call,遠程過程調用)。
在RabbitMQ中也支持RPC。
RabbitMQ中實現RPC的機制是:
客戶端發送請求(消息)時,在消息的屬性(MessageProperties,在AMQP協議中定義了14中properties,這些屬性會隨着消息一塊兒發送)中設置兩個值replyTo(一個Queue名稱,用於告訴服務器處理完成後將通知個人消息發送到這個Queue中)和correlationId(這次請求的標識號,服務器處理完成後須要將此屬性返還,客戶端將根據這個id瞭解哪條請求被成功執行了或執行失敗)
服務器端收到消息並處理
服務器端處理完消息後,將生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性
客戶端以前已訂閱replyTo指定的Queue,從中收到服務器的應答消息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理
5. 細節闡明
使用ack確認Message的正確傳遞
默認狀況下,若是Message 已經被某個Consumer正確的接收到了,那麼該Message就會被從queue中移除。固然也可讓同一個Message發送到不少的Consumer。
若是一個queue沒被任何的Consumer Subscribe(訂閱),那麼,若是這個queue有數據到達,那麼這個數據會被cache,不會被丟棄。當有Consumer時,這個數據會被當即發送到這個Consumer,這個數據被Consumer正確收到時,這個數據就被從queue中刪除。
那麼什麼是正確收到呢?經過ack。每一個Message都要被acknowledged(確認,ack)。咱們能夠顯示的在程序中去ack,也能夠自動的ack。若是有數據沒有被ack,那麼RabbitMQ Server會把這個信息發送到下一個Consumer。
若是這個app有bug,忘記了ack,那麼RabbitMQ Server不會再發送數據給它,由於Server認爲這個Consumer處理能力有限。
並且ack的機制能夠起到限流的做用(Benefitto throttling):在Consumer處理完成數據後發送ack,甚至在額外的延時後發送ack,將有效的balance Consumer的load。
固然對於實際的例子,好比咱們可能會對某些數據進行merge,好比merge 4s內的數據,而後sleep 4s後再獲取數據。特別是在監聽系統的state,咱們不但願全部的state實時的傳遞上去,而是但願有必定的延時。這樣能夠減小某些IO,並且終端用戶也不會感受到。
Reject a message
有兩種方式,第一種的Reject可讓RabbitMQ Server將該Message 發送到下一個Consumer。第二種是從queue中當即刪除該Message。
Creating a queue
Consumer和Procuder均可以經過 queue.declare 建立queue。對於某個Channel來講,Consumer不能declare一個queue,卻訂閱其餘的queue。固然也能夠建立私有的queue。這樣只有app自己纔可使用這個queue。queue也能夠自動刪除,被標爲auto-delete的queue在最後一個Consumer unsubscribe後就會被自動刪除。那麼若是是建立一個已經存在的queue呢?那麼不會有任何的影響。須要注意的是沒有任何的影響,也就是說第二次建立若是參數和第一次不同,那麼該操做雖然成功,可是queue的屬性並不會被修改。
那麼誰應該負責建立這個queue呢?是Consumer,仍是Producer?
若是queue不存在,固然Consumer不會獲得任何的Message。可是若是queue不存在,那麼Producer Publish的Message會被丟棄。因此,仍是爲了數據不丟失,Consumer和Producer都try to create the queue!反正無論怎麼樣,這個接口都不會出問題。
queue對load balance的處理是完美的。對於多個Consumer來講,RabbitMQ 使用循環的方式(round-robin)的方式均衡的發送給不一樣的Consumer。
Exchanges
從架構圖能夠看出,Procuder Publish的Message進入了Exchange。接着經過「routing keys」, RabbitMQ會找到應該把這個Message放到哪一個queue裏。queue也是經過這個routing keys來作的綁定。
有三種類型的Exchanges:direct, fanout,topic。每一個實現了不一樣的路由算法(routing algorithm)。
Direct exchange: 若是 routing key 匹配, 那麼Message就會被傳遞到相應的queue中。其實在queue建立時,它會自動的以queue的名字做爲routing key來綁定那個exchange。
Fanout exchange: 會向響應的queue廣播。
Topic exchange:對key進行模式匹配,好比ab能夠傳遞到全部ab的queue。
Virtual hosts
每一個virtual host本質上都是一個RabbitMQ Server,擁有它本身的queue,exchagne,和bings rule等等。這保證了你能夠在多個不一樣的application中使用RabbitMQ。