以前有個打算在學習RabbitMQ以前,把AMQP詳細閱讀一次,挑出裏面的重點內容。後來找了下RabbitMQ的官方文檔,發現了有一篇文檔專門介紹了RabbitMQ中實現的AMQP模型部分,因而直接基於此文檔和我的理解寫下這篇文章。算法
AMQP全稱是Advanced Message Queuing Protocol,它是一個(分佈式)消息傳遞協議,使用和符合此協議的客戶端可以基於使用和符合此協議的消息傳遞中間件代理(Broker,也就是經紀人,我的感受叫代理合口一些)進行通訊。AMQP目前已經推出協議1.0,實現此協議的比較知名的產品有StormMQ、RabbitMQ、Apache Qpid等。RabbitMQ實現的AMQP版本是0.9.1,官方文檔中也提供了該協議pdf文本下載,有興趣能夠翻閱一下。編程
Messaging Broker,這裏稱爲消息中間件代理。它的職責是從發佈者(Publisher,或者有些時候稱爲Producer,生產者)接收消息,而後把消息路由到消費者(Consumer,或者有些時候稱爲Listener,監聽者)。數組
由於消息中間件代理、發佈者客戶端和消費者客戶端都是基於AMQP這一網絡消息協議,因此消息中間件代理、發佈者客戶端和消費者客戶端能夠在不一樣的機器上,從而實現分佈式通信和服務解耦。服務器
消息中間件代理不只僅提供了消息接收和消息路由這兩個基本功能,還有其餘高級的特性如消息持久化功能、監控功能等等。網絡
AMQP-0-9-1模型的基本視圖是:消息發佈者消息發佈到交換器(Exchange)中,交換器的角色有點相似於平常見到的郵局或者信箱。而後,交換器把消息的副本分發到隊列(Queue)中,分發消息的時候遵循的規則叫作綁定(Binding)。接着,消息中間件代理向訂閱隊列的消費者發送消息(push模式),或者消費者也能夠主動從隊列中拉取消息(fetch/pull模式)。多線程
發佈者在發佈消息的時候能夠指定消息屬性(消息元數據),某些消息元數據可能由消息中間件代理使用,其餘消息元數據對於消息中間件代理而言是不透明的,僅供消息消費者使用。併發
因爲網絡是不可靠的,客戶端可能沒法接收消息或者處理消息失敗,這個時候消息中間件代理沒法感知消息是否正確傳遞到消費者中,所以AMQP模型提供了消息確認(Message Acknowledgement)的概念:當消息傳遞到消費者,消費者能夠自動向消息中間件代理確認消息已經接收成功或者由應用程序開發者選擇手動確認消息已經接收成功而且向消息中間件代理確認消息,消息中間件代理只有在接收到該消息(或者消息組)的確認通知後纔會從隊列中徹底刪除該消息。app
在某些狀況下,交換器沒法正確路由到隊列中,那麼該消息就會返回給發佈者,或者丟棄,或者若是消息中間件代理實現了"死信隊列(Dead Letter Queue)"擴展,消息會被放置到死信隊列中。消息發佈者能夠選擇使用對應的參數控制路由失敗的處理策略。負載均衡
交互器(Exchange)是消息發送的第一站目的地,它的做用就是就收消息而且將其路由到零個或者多個隊列。路由消息的算法取決於交互器的類型和路由規則(也就是Binding)。RabbitMQ消息中間件代理支持四種類型的交互器,分別是:編程語言
交換器類型 | Broker默認預聲明的交換器 |
---|---|
Direct | (空字符串[(AMQP default)])和amq.direct |
Fanout | amq.fanout |
Topic | amq.topic |
Headers | amq.match (和RabbitMQ中的amq.headers) |
聲明交互器的時候須要提供一些列的屬性,其中比較重要的屬性以下:
之因此存在Durability和Auto-delete特性是由於併發全部的場景和用例都要求交互器是持久化的。
Direct類型的交換器基於消息路由鍵(RoutingKey)把消息傳遞到隊列中。Direct交換器是消息單播路由的理想實現(固然,用於多播路由也能夠),它的工做原理以下:
默認交換器(Default Exchange)是一種特殊的Direct交互器,它的名稱是空字符串(也就是""),它由消息中間件代理預聲明,在RabbitMQ Broker中,它在Web管理界面中的名稱是(AMQP default)
。每一個新建立的隊列都會綁定到默認交換器,路由鍵就是該隊列的隊列名,也就是全部的隊列均可以經過默認交換器進行消息投遞,只須要指定路由鍵爲相應的隊列名便可。
Fanout實際上是一個組合單詞,fan也就是扇形,out就是向外發散的意思,Fanout交換器能夠想象爲"扇形"交換器。Fanout交換器會忽略路由鍵,它會路由消息到全部綁定到它的隊列。也就是說,若是有N個隊列綁定到一個Fanout交換器,當一個新的消息發佈到該Fanout交換器,那麼這條新消息的一個副本會分發到這N個隊列中。Fanout交換器是消息廣播路由的理想實現。
Topic交換器基於路由鍵和綁定隊列和交換器的模式進行匹配從而把消息路由到一個或者多個隊列。綁定隊列和交換器的Topic模式(這個模式串其實就是聲明綁定時候的路由鍵,和消息發佈的路由鍵並不是同一個)通常使用點號(dot,也就是'.')分隔,例如source.target.key
,綁定模式支持通配符:
source.target.#
能夠匹配source.target.doge
、source.target.doge.throwable
等等。能夠匹配
source.target.doge、
source.target.throwable`等等。對每一條消息,Topic交換器會遍歷全部的綁定關係,檢查消息指定的路由鍵是否匹配綁定關係中的路由鍵,若是匹配,則將消息推送到相應隊列。
Topic交換器是消息多播路由的理想實現。
Headers交換器是一種不經常使用的交換器,它使用多個屬性進行路由,這些屬性通常稱爲消息頭,它不使用路由鍵進行消息路由。消息頭(Message Headers)是消息屬性(消息元數據)部分,所以,使用Headers交換器在創建隊列和交換器的綁定關係的時候須要指定一組鍵值對,發送消息到Headers交換器時候,須要在消息屬性中攜帶一組鍵值對做爲消息頭。消息頭屬性支持匹配規則x-match以下:
Headers交換器也是忽略路由鍵的,只依賴於消息屬性中的消息頭進行消息路由。
AMQP 0-9-1模型中的隊列與其餘消息或者任務隊列系統中的隊列很是類似:它們存儲應用程序所使用的消息。隊列和交換器的基本屬性有相似的地方:
一個隊列只有被聲明(Declare)了才能使用,也就是隊列的第一次聲明就是隊列的建立操做(由於第一次聲明的時候隊列並不存在)。若是使用相同的參數再次聲明已經存在的隊列,那麼這次聲明會不生效(固然也不會出現異常)。可是若是使用不相同的參數再次聲明已經存在的隊列,那麼會拋出通道級別的異常,異常代碼是406(PRECONDITION_FAILED)。
隊列名必須由255字節(bytes)長度之內的UTF-8編碼字符組成。實現AMQP 0-9-1規範的消息中間件代理具有自動生成隨機隊列名的功能,也就是在聲明隊列的時候,隊列名指定爲空字符串,那麼消息中間件代理會自動生成一個隊列名,而且在隊列聲明的返回結果中帶上對應的隊列名。
以"amq."開頭的隊列是由消息中間件代理內部生成的,有其特殊的做用,所以不能聲明此類名稱的新隊列,不然會致使通道級別的異常,異常代碼爲403(ACCESS_REFUSED)。
持久化的隊列會持久化到磁盤中,這種隊列在消息中間件代理重啓後不會被刪除。不開啓持久化特性的隊列稱爲瞬時(transient)隊列,並不是全部的場景都須要開啓隊列的持久化特性。
隊列的持久化特性並不意味着路由到它上面的消息是持久化的,也就是隊列的持久化跟消息的持久化是兩回事。若是息中間件代理掛了,它重啓後會從新聲明開啓了持久化特性的隊列,這些隊列中只有使用了消息持久化特性的消息會被恢復。
綁定(Binding)是交換器路由消息到隊列的規則。例如交換器E能夠路由消息到隊列Q,那麼Q必須經過必定的規則綁定到E。綁定中使用的某些交換器的類型決定了它可使用可選的路由鍵(RoutingKey)。路由鍵的做用相似於過濾器,能夠篩選某些發佈到交換器的消息路由到目標隊列。
若是發佈的消息沒有路由到任意一個目標隊列,例如,消息已經發布到交換器,交換器中沒有任何綁定,這個時候消息會被丟棄或者返回給發佈者,取決於消息發佈者發佈消息時候使用的參數。
若是隊列只有發佈者生產消息,那麼是沒有意義的,必須有消費者對消息進行使用,或者叫這個操做爲消息消費,消息消費的方式有兩種:
basic.consume
)。basic.get
)。使用推模式的狀況下,消費者必須指定須要訂閱的隊列。每一個隊列能夠存在多個消費者,或者僅僅註冊一個獨佔的消費者。
每一個消費者(訂閱者)都有一個稱爲消費者標籤(consumer tag)的標識符,消費者標籤是一個字符串。經過消費者標籤能夠實現取消訂閱的操做。
消費者應用程序有可能在接收和處理消息的時候崩潰,也有可能由於網絡緣由致使消息中間件代理投遞消息到消費者的時候失敗了,這樣就會催生一個問題:AMQP消息中間件代理應該在何時從隊列中刪除消息?所以,AMQP 0-9-1規範提供了兩種選擇:
basic.deliver
或basic.get-ok
)。basic.ack
<= 我的感受這個地方少寫了basic.nack
和basic.reject
)前一種稱爲自動確認模型(動做觸發的同時進行了消息確認),後一種稱爲顯式確認模型。顯式確認模型中,須要消費者主動向消息中間件代理進行消息主動確認,這個消息主動確認動做的執行時機徹底由應用程序控制。消息主動確認有三種方式:積極確認(ack)、消極確認(nack)和拒絕(reject)。
預取消息(Prefetching Messages)是一個特性。對於多個消費者共享同一個隊列的狀況,可以告知消息中間件代理在發送下一個確認以前指定每一個消費者一次能夠接收消息的消息量。這個特性能夠理解爲簡單的負載均衡技術,在批量發佈消息的場景下可以提升吞吐量。
AMQP模型中,消息具備屬性值。AMQP 0-9-1規範定義了一些常見的屬性,通常開發人員不須要太關注這些屬性:
這些通用的屬性通常是消息中間件代理使用的,還有能夠定製的可選屬性header,形式是鍵值對,相似於HTTP中的請求頭。消息屬性是在發佈消息的時候設置的。
AMQP消息還有一個有效載荷(payload,其實就是消息數據體),AMQP代理將其視爲不透明的字節數組,也就是AMQP代理不會檢查或者修改消息的有效載荷。有些消息可能只包含屬性而沒有有效負載。一般使用序列化格式(如JSON,Thrift,Protocol Buffers和MessagePack)來序列化和結構化數據,以便將其做爲消息有效負載發佈。在通常約定下,消息屬性中的Content type
和Content encoding
通常能夠代表其序列化的方式。
消息發佈支持消息的持久化特性,消息持久化特性開啓後,消息中間件代理會把消息保存到磁盤中,若是重啓代理消息也不會丟失。開啓消息持久化特性將會影響性能,主要是由於涉及到刷盤操做。
AMQP 0-9-1定義了一些方法,對應了客戶端和消息中間件代理之間交互的一些操做方法,這些操做方法的設計跟面向對象編程語言中的方法沒有任何共同之處。經常使用的交換器相關的操做方法有:
在邏輯上,上面幾個操做方法在客戶端和消息中間件代理之間的交互以下:
對於隊列,也有相似的操做方法:
並不是全部的AMQP操做方法都有響應結果操做方法,像消息發佈方法basic.publish
的使用是最普遍的,此操做方法沒有對應的響應結果操做方法。有些操做方法可能有多個響應結果(操做方法),例如basic.get
。
AMQP的鏈接(Connection)一般是長期存在的。AMQP是一種使用TCP進行可靠傳遞的應用程序級協議。AMQP鏈接使用用戶身份驗證,可使用TLS(SSL)進行保護。當應用程序再也不須要鏈接到AMQP代理時,它應該正常關閉AMQP鏈接,而不是忽然關閉底層TCP鏈接。
某些應用程序須要與AMQP代理程序創建多個鏈接。可是,不但願同時打開許多TCP鏈接,由於這樣作會消耗系統資源並使配置防火牆變得十分困難。通道(Channel)能夠認爲是"共享一個單獨的TCP鏈接的輕量級鏈接",一個AMQP鏈接能夠擁有多個通道。
對於使用了多線程處理的應用程序,有一種使用場景十分廣泛:每一個線程開啓一個新的通道使用,這些通道是線程間隔離的。
另外,每一個特定的通道和其餘通道是相互隔離的,每一個執行的AMQP操做方法(包括響應)都攜帶一個通道的惟一標識,這樣客戶端就能經過該通道的惟一標識得知操做方法是對應哪一個通道發生的。
爲了使單個消息中間件代理能夠託管多個徹底隔離的"環境"(這裏的隔離指的是用戶組、交互器、隊列等),AMQP提供了虛擬主機(Virtual Host)的概念。多個虛擬主機相似於許多主流的Web服務器的虛擬主機,提供了AMQP組件徹底隔離的環境。AMQP客戶端能夠在鏈接消息中間件代理時指定須要鏈接的虛擬主機。
理解RabbitMQ中的AMQP模型,其實從開發者的角度來看,最重要的是Exchange、Queue、Binding三者的關係,這裏談談我的的看法。消息的發佈第一站老是Exchange,從模型上看,消息發佈沒法直接發送到隊列中。Exchange自己不存儲消息,它在接收到消息以後,會基於路由規則也就是Binding,把消息路由到目標Queue中。從實際操做來看,聲明路由規則老是在發佈消息和消費消息以前,也就是通常步驟以下:
咱們最關注的兩個階段,消息發佈和消息消費中,消息發佈實際上只跟Exchange有關,而消息消費實際上只跟Queue有關。Binding實際上就是Exchange和Queue的契約關係,會直接影響消息發佈階段的消息路由。那麼,路由失敗通常是什麼狀況致使的?路由失敗,其實就是消息已經發布到Exchange,而Exchange中從既有的Binding中沒法找到存在的目標Queue用於傳遞消息副本(通常而言,不多人會發送消息到一個不存在的Exchange)。消息路由失敗,從理解AMQP的模型來看,能夠從根本上避免的,除非是消息發佈者故意胡亂使用或者人爲錯誤使用了未存在的RoutingKey、Exchange或者說是Binding關係而致使的。
AMQP-0-9-1模型中支持了四種交換器direct(單播)、fanout(廣播)、topic(多播)、headers,實際上,從使用者角度來看,四種交換器的功能是能夠相互取代的。例如可使用fanout類型交換器實現廣播,其實使用direct類型交換器也是能夠實現廣播的,只是對應的direct類型交換器須要經過多個路由鍵綁定到多個目標隊列中。在面對生產環境的技術選型的時候,咱們須要考慮性能、維護難度、合理性等角度去考慮選擇什麼類型的交換器,就上面的廣播消息的例子,顯然使用fanout類型交換器能夠避免聲明多個綁定關係,這樣在性能、合理性上是更優的選擇。
在AMQP-0-9-1模型中,負載均衡的實現是基於消費者而不是基於隊列(準確來講應該是消息傳遞到隊列的方式)。實際上,出現消息生產速度大大超過消費者的消費速度的時候,隊列中有可能會出現消息積壓。AMQP-0-9-1模型中沒有提供基於隊列負載均衡的特性,也就是出現消息生產速度大大超過消費者的消費速度時候,並不會把消息路由到多個隊列中,而是經過預取消息(Prefetching Messages)的特性,肯定消息者的消費能力,從而調整消息中間件代理推送消息到對應消費者的數量,這樣就可以實現消費速度快的消費者可以消費更多的消息,減小產生有消費者處於飢餓狀態和有消費者長期處於忙碌狀態的問題。
AMQP中提供的消息確認機制主要包括積極確認(通常叫ack,Acknowledgement)、消極確認(通常叫nack,Negative Acknowledgement)和拒絕(reject)。消息確認機制是保證消息不丟失的重要措施,當消費者接收到消息中間件代理推送的消息時候,須要主動通知消息中間件代理消息已經確認投遞成功,而後消息中間件代理纔會從隊列中刪除對應的消息。沒有主動確認的消息就會變爲"nack"狀態,能夠想象爲暫存在隊列的"nack區"中,這些消息不會投遞到消費者,直到消費者重啓後,"nack區"中的消息會從新變爲"ready"狀態,能夠從新投遞給消費者。