AMQP協議詳解與RabbitMQ,MQ消息隊列的應用場景,如何避免消息丟失等消息隊列常見問題

什麼是AMQP?

異步通信中,消息不會馬上到達接收方,而是被存放到一個容器中,當知足必定的條件以後,消息會被容器發送給接收方,這個容器即消息隊列,而完成這個功能須要雙方和容器以及其中的各個組件遵照統一的約定和規則,
AMQP就是這樣的一種協議,消息發送與接受的雙方遵照這個協議能夠實現異步通信。這個協議約定了消息的格式和工做方式。

AMQP 中包含的主要元素

生產者Producer):Exchange發佈消息的應用。html

消費者Consumer):從消息隊列queue中消費消息的應用。git

消息隊列Message Queue):服務器組件,用於保存消息,直到發送給消費者。正則表達式

Queue:消息載體;每一個消息都會被投入到一個或多個隊列服務器

消息Message):傳輸的內容。網絡

交換器exchange:路由組件,接收Producer發送的消息,並根據Routing Key轉發消息隊列queueapp

Routing Key:路由關鍵字,exchange根據這個Routing Key進行消息投遞到隊列queue異步

 

虛擬主機Virtual Host: 用做不一樣用戶權限分離;一批交換器,消息隊列和相關對象。虛擬主機是共享相同身份認證和加密環境的獨立服務器域。vhost 能夠理解爲虛擬 broker ,即 mini-RabbitMQ server。其內部均含有獨立的 queue、exchange 和 binding 等,但最最重要的是,其擁有獨立的權限系統,能夠作到 vhost 範圍的用戶控制。固然,從 RabbitMQ 的全局角度,vhost 能夠做爲不一樣權限隔離的手段(一個典型的例子就是不一樣的應用能夠跑在不一樣的權限的 vhost 中) 分佈式

Broker AMQP服務端稱爲Broker性能

鏈接Connection:一個網絡鏈接,好比TCP/IP套接字鏈接;應用程序Rabbit之間創建鏈接的管理器,程序代碼中使用ConnectionFactory(鏈接管理器)。 ui

信道Channel:消息通道,在客戶端的每一個Connection鏈接裏,可創建多個channel,每一個channel表明一個會話任務多路複用鏈接中的一條獨立的雙向數據流通道,爲會話提供物理傳輸介質。

綁定器Binding:把exchangequeue按照路由規則綁定起來。

 

exchange Queue 的路由機制

生產者在發送消息時,都須要指定一個RoutingKeyExchange,Exchange在接到該RoutingKey之後,會判斷該ExchangeType,而後轉發到對應的Queue中;

生產者發消息不須要指定Queue,消費者能夠指定Queue綁定到某個RoutingKey和某個Exchange,也能夠不指定Queue,就只根據某個Exchange和某個RoutingKey接受到消息

 

 

Exchange 將消息發送到哪個queue是由exchange type 和 Binding綁定規則決定的,目前經常使用的有3exchangeDirect exchange, Fanout exchange, Topic exchange :

 

  1. Direct exchange 直接轉發路由,其實現原理是會將消息中的RoutingKey與該Exchange關聯的全部Binding中的BindingKey進行比較,若是相等,則發送到該Binding對應的Queue 

 

  2. Fanout exchange 複製分發路由,該路由不須要RoutingKey,會將消息發送給全部與Exchange 定義過Binding全部Queues中去,實際上是一種廣播行爲 

 

  3. topic exchange 通配路由,是direct exchange通配符模式,消息中的RoutingKey能夠寫成通配的模式,exchange支持#*的通配。收到消息後,將消息轉發給全部符合匹配正則表達式的Queue 

 

TopicExchange的匹配符號:

 

#:匹配多個

 

*: 匹配一個

 

 

須要注意的一點只有queue具備保存消息的功能,exchange能保存消息。

 

 

RabbitMQ中一個核心的原則是,消息不能直接投遞到Queue中。Producer只能將本身的消息投遞到Exchange中,由Exchange按照路由規則將消息投遞到對應的Queue中。

 

Consumer中,聲明本身對哪一個Exchange感興趣,並將本身的Queue綁定到本身感興趣的路由關鍵字上,創建相應的映射關係;第二,在Producer中,將消息投遞一個Exchange中,並指明它的路由關鍵字

 

 

AMQP 如何實現通訊的

(1)創建鏈接Connection。由producerconsumer分別鏈接broker的物理節點上。 

(2)創建消息ChannelChannel是創建在Connection之上的,一個Connection能夠創建多個Channelproducer鏈接Virtual Host 創建ChannelConsumer鏈接到相應的queue上創建Channel 

(3)發送消息。由Producer發送消息到Broker中的exchange中。 

(4)路由轉發。exchange收到消息後,根據必定的路由策略routing key,將消息轉發到相應的queue中去。 

(5)消息接收。Consumer會監聽相應的queue,一旦queue中有能夠消費的消息,queue就將消息發送給Consumer端。 

(6)消息確認。當Consumer完成某一條消息的處理以後,須要發送一條ACK消息給對應的QueueQueue收到ACK信息後,纔會認爲消息處理成功,並將消息從Queue移除若是在對應的Channel斷開Queue沒有收到這條消息的ACK信息,該消息將被髮送給另外Channel 至此一個消息的發送接收流程走完了。消息的確認機制提升了通訊的可靠性

 

消息隊列的使用大概過程

(1)客戶端鏈接Connection到消息隊列服務器Broker,打開一個channel
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間創建好綁定關係。
(5)客戶端投遞消息到exchange

 

RabbitMQ中 exchange、route、queue的關係

MessageQueue、Exchange和Binding構成了AMQP協議的核心。

  聲明MessageQueue 

 

  在Rabbit MQ中,不管是生產者發送消息仍是消費者接受消息,都首先須要聲明一個MessageQueue。這就存在一個問題,是生產者聲明仍是消費者聲明呢?要解決這個問題,首先須要明確:

 

  a)消費者是沒法訂閱或者獲取不存在的MessageQueue中信息。

 

  b)消息被Exchange接受之後,若是沒有匹配的Queue,則會被丟棄。

 

  在明白了上述兩點之後,就容易理解若是是消費者去聲明Queue,就有可能會出如今聲明Queue以前,生產者已發送的消息被丟棄的隱患。若是應用可以經過消息重發的機制容許消息丟失,則使用此方案沒有任何問題。可是若是不能接受該方案,這就須要不管是生產者仍是消費者,在發送或者接受消息前,都須要去嘗試創建消息隊列。

 

  

  (重點) 這裏有一點須要明確

  • 若是客戶端嘗試創建一個已經存在的消息隊列,Rabbit MQ不會作任何事情,並返回客戶端創建成功的,因此一個隊列若是已經存在了,好比消費者若是再次嘗試創建已存在的隊列,是無效的
  • 好比,你經過SpringBoot程序已經創建了一個queueA,再經過另外一個SpringBoot程序想要更改其queue屬性,好比設置隊列持久化durable=="true",就再次創建了一個queueA設置屬性,是無效

 

       若是一個消費者在一個信道中正在監聽某一個隊列的消息,Rabbit MQ是不容許該消費者在同一個channel去聲明其餘隊列的。Rabbit MQ中,能夠經過queue.declare命令聲明一個隊列,能夠設置該隊列如下屬性:

 

a) Exclusive:排他隊列,若是一個隊列被聲明爲排他隊列,該隊列僅對首次聲明它的鏈接可見,並在鏈接斷開時自動刪除。這裏須要注意三點:其一,排他隊列是基於鏈接可見的,同一鏈接的不一樣信道是能夠同時訪問同一個鏈接建立的排他隊列的。其二,「首次」,若是一個鏈接已經聲明瞭一個排他隊列,其餘鏈接是不容許創建同名的排他隊列的,這個與普通隊列不一樣。其三,即便該隊列是持久化的,一旦鏈接關閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。

 

b)   Auto-delete:自動刪除,若是該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列

 

 c)   Durable:持久化

 

d)  其餘選項,例如若是用戶僅僅想查詢某一個隊列是否已存在,若是不存在,不想創建該隊列,仍然能夠調用queue.declare,只不過須要將參數passive設爲true,傳給queue.declare,若是該隊列已存在,則會返回true;若是不存在,則會返回Error,可是不會建立新的隊列。

 

  

 

exchange Queue 的路由機制

生產者在發送消息時,都須要指定一個RoutingKeyExchange,Exchange在接到該RoutingKey之後,會判斷該ExchangeType,而後轉發到對應的Queue中,因此發消息不須要指定Queue,彷佛消費者能夠指定Queue綁定到某個RoutingKey和某個Exchange,也能夠不指定Queue,就只根據某個Exchange和某個RoutingKey接受到消息

 

exchange 將消息發送到哪個queue是由exchange type 和 Binding綁定規則決定的,目前經常使用的有3exchangeDirect exchange, Fanout exchange, Topic exchange  

 

Direct exchange 直接轉發路由,其實現原理是會將消息中的RoutingKey與該Exchange關聯的全部Binding中的BindingKey進行比較,若是相等,則發送到該Binding對應的Queue 

 

Fanout exchange 複製分發路由,該路由不須要RoutingKey,會將消息發送給全部與該 Exchange 定義過Binding全部Queues中去,實際上是一種廣播行爲 

 

topic exchange 通配路由,是direct exchange通配符模式,消息中的RoutingKey能夠寫成通配的模式,exchange支持#*的通配。收到消息後,將消息轉發給全部符合匹配正則表達式的Queue 

 

須要注意的一點只有queue具備保存消息的功能,exchange能保存消息。

 

AMQP的應用場景

 

AMQP是實現消息機制的一種協議,消息隊列主要有如下幾種應用場景

 

異步處理

 
跨系統的異步通訊;好比公司新入職一個員工,須要開通系統帳號,有幾件事情要作,開通系統帳號,發短信通知用戶,發郵件給員工,在公司內部通信系統中發送消息給員工。其中發短信,發郵件,發內部通信系統消息,這三件事情能夠串行也能夠並行並行的好處就是能夠提升效率,這時能夠應用MQ來實現並行;若是不使用MQ,那麼開通系統帳號的服務就要依次調用發短信服務、發郵件服務、發內部通信系統消息,之後若是還要分配git帳號,那又要在開通系統帳號的服務裏添加代碼,若是使用MQ那麼開通帳號後發送消息到MQ,對應訂閱的消費者消費就好了。
異步處理不須要返回值的耗時操做:https://www.cnblogs.com/theRhyme/p/10796009.html
 

應用解耦

在公司內部系統中,有人事系統,OA系統,財務系統,外圍應用系統等等,當人事發生變更的時候(離職入職調崗),人事系統須要將這些變更通知給其餘系統,這時只需人事系統發送一條消息,各個外圍系統訂閱該消息,就可得知人事變更,與實時服務調用相比,若是人事系統掛掉,各個外圍系統不會受到影響,繼續運行;若是是實時服務調用,好比人事系統被各個服務調用,人事系統掛了,調用人事系統的服務都會受到影響
 

死信隊列

重要的業務隊列若是掛了,能夠被從新路由到死信隊列進行處理。
 

分佈式事務

RocketMQ TODO待寫
 

流量緩衝

 
在有些流量會瞬間暴增的場景下,如秒殺,爲了防止流量忽然增大而使得應用掛掉,能夠引入MQ,將請求存入MQ中,若是超過了MQ的長度,就把請求丟棄掉,這樣來限制流量
 

日誌處理

 
將消息隊列引入到日誌處理中,如kafka的應用,解決了大量日誌的傳輸問題。日誌客戶端負責採集日誌數據,並按期寫入kafka隊列,kafka負責接收存儲轉發日誌,日誌處理系統訂閱並消費kafka中的日誌數據
 


SpringBoot+RabbitMQ的簡單demo

 https://www.cnblogs.com/theRhyme/p/10071781.html

 

RabbitMQ死信隊列的應用場景和代碼實現

http://www.javashuo.com/article/p-njifrvnw-mn.html

 

RabbitMQ延遲隊列代碼實現和應用場景

場景: 訂單下單30min若是沒有付款就刪除該訂單

經過消息過時後進入死信交換器,再由交換器轉發到延遲消費隊列(重定向隊列),實現延遲功能;

使用 rabbitmq_delayed_message_exchange 插件實現延遲功能。

代碼:http://www.javashuo.com/article/p-rrimoagi-mo.html

 

rabbitmq 怎麼避免消息丟失?

  • 生產者Confirm機制(異步,推薦)或是事務方式(同步,不推薦)
  • MQ服務端將消息持久化
  • 消費者給MQ回覆ACK,確認機制
  • MQ服務端設置集羣鏡像模式
  • 消費者消費消息補償機制(如死信隊列)

 

若是生產者弄丟了數據

 

RabbitMQ 生產者將數據發送到 RabbitMQ 的時候,可能數據在網絡傳輸中搞丟了,這個時候 RabbitMQ 收不到消息,消息就丟了。

RabbitMQ 提供了兩種方式來解決這個問題:

 

事務方式在生產者發送消息以前,經過`channel.txSelect`開啓一個事務,接着發送消息。

若是消息沒有成功被 RabbitMQ 接收到,生產者會收到異常,此時就能夠進行事務回滾`channel.txRollback`,而後從新發送。假如 RabbitMQ 收到了這個消息,就能夠提交事務`channel.txCommit`。

可是這樣一來,生產者的吞吐量和性能都會下降不少,如今通常不這麼幹

 

另一種方式就是經過 Confirm 機制這個 Confirm 模式是在生產者那裏設置的,就是每次發消息的時候會分配一個惟一的 ID,而後 RabbitMQ服務端 收到以後會回傳一個 ACK,告訴生產者這個消息 OK 了。

若是 RabbitMQ 沒有處理到這個消息,那麼就回調一個 Nack 的接口,這個時候生產者就能夠重發

 

事務機制和 Confirm 機制最大的不一樣在於事務機制是同步的,提交一個事務以後會阻塞在那兒。 

可是 Confirm 機制是異步的,發送一個消息以後就能夠發送下一個消息,而後那個消息 RabbitMQ 接收了以後會異步回調你一個接口通知你這個消息接收到了。

因此通常在生產者這塊避免數據丟失,都是用 Confirm 機制的。

 

要保證消息持久化成功的條件有哪些?

  • 聲明隊列必須設置持久化 durable 設置爲 true.
  • 消息推送投遞模式必須設置持久化,deliveryMode 設置爲 2(持久)。
  • 消息已經到達持久化交換器
  • 消息已經到達持久化隊列

以上四個條件都知足才能保證消息持久化成功。

 

rabbitmq 持久化有什麼缺點?

持久化的缺點就是下降了服務器的吞吐量,由於使用的是磁盤非內存存儲,從而下降了吞吐量。可儘可能使用 ssd 硬盤來緩解吞吐量的問題。

 

RabbitMQ如何保證同一個隊列中的消息被順序消費?

TODO待寫

 

 

來源:

http://www.javashuo.com/article/p-zcdowiti-gw.html

http://www.javashuo.com/article/p-qalwxbqb-hh.html

http://www.javashuo.com/article/p-sahvupop-hq.html

http://techblog.ppdai.com/2018/07/17/20180717/

https://www.toutiao.com/a6698312611185820171/?timestamp=1559696015&app=news_article&group_id=6698312611185820171&tdsourcetag=s_pctim_aiomsg&req_id=2019060508533401002506701591332CD

https://mp.weixin.qq.com/s?__biz=MjM5ODI5Njc2MA==&mid=2655825391&idx=1&sn=f7523195ff08a51085012c736bc002a8&chksm=bd74e0388a03692e49ca3967a03dd2e8164e02741a75cabc8bd56e5b70ba6f2cc19f6fe10fd2&scene=0&xtrack=1&key=1c855a3d2871be72b53c28efecdb6c847aa5d9daffdac4207cc93d6a62948c3ac03b6e8813a35eaa72a54f7668de41b31fb1265ff3066312574ca210769ad2b726b9932dd21a296f9ea91fd6cf367dd7&ascene=1&uin=ODEzMzE3OTc%3D&devicetype=Windows+10&version=62060833&lang=zh_CN&pass_ticket=ZLGuBJ0cY2BIuQpqK%2Be08dQVFm3Htt7htVVelbWP8XE%3D

相關文章
相關標籤/搜索