RabbitMQ的應用總結

RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。AMQP 的出現其實也是應了廣大人民羣衆的需求,雖然在同步消息通信的世界裏有不少公開標準(如 COBAR的 IIOP ,或者是 SOAP 等),可是在異步消息處理中卻不是這樣,只有大企業有一些商業實現(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),所以,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯合制定了 AMQP 的公開標準。
RabbitMQ是由RabbitMQ Technologies Ltd開發而且提供商業支持的。該公司在2010年4月被SpringSource(VMWare的一個部門)收購。在2013年5月被併入Pivotal。其實VMWare,Pivotal和EMC本質上是一家的。不一樣的是VMWare是獨立上市子公司,而Pivotal是整合了EMC的某些資源,如今並無上市。
RabbitMQ的官網是http://www.rabbitmq.com
百度百科amqp協議介紹https://baike.baidu.com/item/AMQP/8354716?fr=aladdin
注意:RabbitMQ是採用erlang語言開發的,因此必須有Erlang環境才能夠運行html

Erlang  (高併發應用) 數據庫

Erlang編程語言最初目的是進行大型電信交換設備的軟件開發,是一種適用於大規模並行處理環境的高可靠性編程語言。隨着多核處理器技術的日漸普及,以及互聯網、雲計算等技術的發展,該語言的應用範圍也有逐漸擴大之勢。編程

百度百科介紹:https://baike.baidu.com/item/Erlang%E8%AF%AD%E8%A8%80/20864044?fr=aladdin緩存

 

比較圖示:  初衷理念實現抗高併發語言服務器

 

 

 

不一樣的項目 不一樣的 路徑,獨立的virtualhost,相互進行隔離:  客戶端鏈接時候須要指定virtual host地址 。併發

更加解耦 相互進行隔離  相似於每一個項目都有不一樣的數據庫同樣。異步

 

 

 

添加virtual host編程語言

指定某個用戶的 Virtual Host分佈式

 

 

AMQP(高級消息隊列協議)是一個異步消息傳遞所使用應用層協議規範,爲面向消息中間件設計,基於此協議的客戶端與消息中間件能夠無視消息來源傳遞消息,不受客戶端、消息中間件、不一樣的開發語言環境等條件的限制;高併發

涉及概念解釋: 
Server(Broker):接收客戶端鏈接,實現AMQP協議的消息隊列和路由功能的進程;
Virtual Host:虛擬主機的概念,相似權限控制組,一個Virtual Host裏能夠有多個Exchange和Queue。   
Exchange:交換機,接收生產者發送的消息,並根據Routing Key將消息路由到服務器中的隊列Queue。
ExchangeType:交換機類型決定了路由消息行爲,RabbitMQ中有三種類型Exchange,分別是fanout、direct、topic;
Message Queue:消息隊列,用於存儲還未被消費者消費的消息;
Message:由Header和body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、優先級是多少、由哪一個Message Queue接收等;body是真正須要發送的數據內容;
BindingKey:綁定關鍵字,將一個特定的Exchange和一個特定的Queue綁定起來。

 

RabbitMQ幾種工做模式:

  RabbitMQ點對點模式:

一、點對點模式  一對一模式。  一個生產者投遞消息給隊列 只能容許有一個消費者進行消費    若是集羣的話 會進行均攤消費   服務器配置不同 均攤就不優了 

     

長鏈接 不用三次握手之類的 提升傳輸效率     可是長鏈接佔服務器帶寬

推:  消費者已經啓動了,創建長鏈接,一旦生產者向隊列投遞消息會立馬推送給消費者

取: 生產者先投遞消息隊列進行緩存,這時候消費者再次啓動時候 ,就會向隊列獲取消息。

點對點模式代碼: RabbitMQ整合Spring Booot點對點模式

 

RabbitMQ手動模式自動應答模式

   1.了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收而且處理完畢了。RabbitMQ就能夠刪除它了。
   2. 若是一個消費者掛掉卻沒有發送應答,RabbitMQ會理解爲這個消息沒有處理徹底,而後交給另外一個消費者去從新處理。這樣,你就能夠確認即便消費者偶爾掛掉也不會丟失任何消息了。
   3. 沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ纔會從新投遞。即便處理一條消息會花費很長的時間。
   4. 消息應答是默認打開的。咱們經過顯示的設置autoAsk=true關閉這種機制。現即自動應答開,一旦咱們完成任務,消費者會自動發送應答。通知RabbitMQ消息已被處理,能夠從內存刪除。若是消費者因宕機或連接失敗等緣由沒有發送ACK(不一樣於ActiveMQ,在RabbitMQ裏,消息沒有過時的概 念),則RabbitMQ會將消息從新發送給其餘監聽在隊列的下一個消費者。

 

應答模式:

自動應答: 不在意消費者對消息處理是否成功,都會告訴隊列刪除消息。若是處理消息失敗,實現自動補償(隊列投遞過去 從新處理)。

手動應答: 消費者處理完業務邏輯,手動返回ack(通知)告訴隊列處理完了,隊列進而刪除消息。

 

實現思路:

  • 生產者端代碼不變,消費者端代碼這部分就是用於開啓手動應答模式的。

               channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
         注:第二個參數值爲false表明關閉RabbitMQ的自動應答機制,改成手動應答。

  • 在處理完消息時,返回應答狀態,true表示爲自動應答模式。

              channel.basicAck(envelope.getDeliveryTag(), false);

 

自動應答: 不在意消費者對消息處理是否成功,都會告訴隊列刪除消息。若是處理消息失敗,實現自動補償(隊列投遞過去 從新處理)。

手動應答: 消費者處理完業務邏輯,手動返回ack(通知)告訴隊列處理完了,隊列進而刪除消息。

RabbitMQ整合Spring Booot【消費者應答模式】

 

RabbitMQ隊列形式

  公平隊列

  •      目前消息轉發機制是平均分配,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工做量很小,形成的緣由就是近當消息到達隊列進行轉發消息。並不在意有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發全部的奇數給一個消費者,偶數給另外一個消費者。
  •      爲了解決這樣的問題,咱們可使用basicQos方法,傳遞參數爲prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。

 總結:

      只有在 消費者空閒的時候會發送下一條信息。調度分發消息的方式,也就是告訴RabbitMQ每次只給消費者處理一條消息,也就是等待消費者處理完畢並本身對剛剛處理的消息進行確認以後,才發送下一條消息,防止消費者太過於忙碌,也防止它太過去悠閒。

 

 公平隊列原理:隊列服務器向消費者發送消息的時候,消費者採用手動應答模式,隊列服務器必需要收到消費者發送ack結果通知,纔會發送下一個消息。(快的處理的多,消費的多)

 

使用背景:

  • 服務器能力不一樣,能者多勞。 均攤模式的話,都處理相同數量的
  •  消息隊列 發出去的消息被消費完了 而後收到 ack包 才能夠繼續發給他

 

經過 設置channel.basicQos(1); 開發:   RabbitMQ整合Spring Booot【公平隊列】

 

RabbitMQ死信隊列

關於RabbitMQ死信隊列

  • 死信隊列 聽上去像 消息「死」了  。其實也有點這個意思,死信隊列 是當消息在一個隊列,由於下列緣由: 
  1. 消息被拒絕(basic.reject/ basic.nack)而且再也不從新投遞 requeue=false
  2. 消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration()),消息TTL過時。
  3. 隊列超載,隊列達到最大長度(隊列滿了,沒法再添加數據到mq中)

 變成了 「死信」 後 ,被從新投遞(publish)到另外一個Exchange ,該Exchange 就是DLX 。

 而後該Exchange 根據綁定規則,轉發到對應的 隊列上, 監聽該隊列 ,就能夠從新消費。

 說白了 就是 沒有被消費的消息 ,換個地方從新被消費

 

生產者   -->  消息 --> 交換機  --> 隊列  --> 變成死信  --> DLX交換機 -->隊列 --> 消費者

 

應用場景分析:

在定義業務隊列的時候,能夠考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被髮送到該死信隊列上,這樣就方便咱們查看消息失敗的緣由了

死信隊列 聽上去像 消息「死」了 ,其實也有點這個意思,
死信隊列 是 當消息在一個隊列 由於下列緣由:
1.消息被拒絕(basic.reject或basic.nack)而且requeue=false.
2.消息TTL過時
3.隊列達到最大長度(隊列滿了,沒法再添加數據到mq中)

應用場景分析:

 

  •  在定義業務隊列的時候,能夠考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被髮送到該死信隊列上,這樣就方便咱們查看消息失敗的緣由了

                   channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丟棄消息

  • 若是高併發狀況到來  某一個隊列好比郵件隊列滿了 或者異常  或者消息過時 或者消費者拒絕消息

如何使用死信交換機呢?

定義業務(普通)隊列的時候指定參數

  1. x-dead-letter-exchange: 用來設置死信後發送的交換機
  2. x-dead-letter-routing-key:用來設置死信的routingKey

 

 

 

1. 郵件隊列 綁定一個死信交換機 ,一旦郵件隊列滿了的狀況下 ,爲了防止數據丟失狀況  ,消息再也不郵件隊列存放了,放到死信交換機。

2.而後交給私信郵件隊列。

3.最終交給死信消費者。

 

 步驟:

  1.   建立死信交換機 、死信隊列、 而且綁定
  2.   以前的隊列沒有綁定死信隊列和死信交換機 不能作更改綁定死信交互機
  3.   以前建立好的郵件隊列 刪除掉  已經建立好的隊列不能作更改  交換機也清理掉

RabbitMQ整合Spring Booot【死信隊列】

 

Exchange

 這個多是消息隊列中最重要的隊列了,其餘的都是在它的基礎上進行了擴展。

 

 關於交換機:

  1. 生產者發送消息不會向傳統方式直接將消息投遞到隊列中,而是先將消息投遞到交換機中,在由交換機轉發到具體的隊列,隊列在將消息以推送或者拉取方式給消費者進行消費,這和咱們以前學習Nginx有點相似。交換機的做用根據具體的路由策略分發到不一樣的隊列中。

 

交換機有四種類型:

  1.  Direct exchange(直連交換機):是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。
  2.  Fanout exchange(扇型交換機):將消息路由給綁定到它身上的全部隊列。
  3.  Topic exchange(主題交換機):隊列經過路由鍵綁定到交換機上,而後,交換機根據消息裏的路由值,將消息路由給一個或多個綁定隊列。
  4.  Headers exchange(頭交換機):相似主題交換機,可是頭交換機使用多個消息屬性來代替路由鍵創建路由規則。經過判斷消息頭的值可否與指定的綁定相匹配來確立路由規則。 

 

過程:

一個生產者發送消息  ----> 到交換機  ---->  到隊列(每一個隊列綁定到交換機上)  ----> 到消費者(每一個消費者有本身的隊列)

 

功能實現:一個生產者發送消息,多個消費者獲取消息(一樣的消息), 包括:

  •    一個生產者
  •   一個交換機
  •   多個隊列
  •   多個消費者

   

1. DirectExchange :

   (1) 路由key模式:

場景

  • 生產者發送消息到交換機並指定一個路由key,
  • 消費者隊列綁定到交換機時要指定路由key(key匹配就能接受消息,key不匹配就不能接受消息)

例如

     咱們能夠把路由key設置爲insert ,那麼消費者隊列key指定包含insert才能夠接收消息,消費者隊列key定義爲update或者delete就不能接收消息。很好的控制了更新,插入和刪除的操做。

注: 

    前面作的demo中RoutingKey設置的空。

 


代碼實現:

RoutingKey有值的時候,那麼 通過消息隊列以後,須要在通過RoutingKey進行判斷決定消費者。 

RabbitMQ整合Spring Booot【Exchange-路由key模式】

(1) Topics模式:

  •    此模式實在路由key模式的基礎上,使用了通配符來管理消費者接收消息。
  •    生產者P發送消息到交換X,type=topic。交換機根據綁定隊列的routing key的值進行通配符匹配。

                      -    符號#:匹配一個或者多個詞lazy.# 能夠匹配lazy.irs或者lazy.irs.cor。

                      -    符號*:只能匹配一個詞lazy.* 能夠匹配lazy.irs或者lazy.cor。

 

 關於 通配符 「,」 和 「#」 的使用: RabbitMQ整合Spring Booot【Exchange-Topics模式】

 

 

 (3)Fanout Exchange:

RabbitMQ發佈與訂閱原理 Exchange Fanout模式:

案例: 用戶註冊 ---> 發送郵件 --->發送短信

 

 

RabbitMQ整合Spring Booot【Exchange-Fanout模式】

 

RabbitMQ生產者事務確認機制 

  背景:

       生產者發送消息出去以後,不知道到底有沒有發送到RabbitMQ服務器, 默認是不知道的。並且有的時候咱們在發送消息以後,後面的邏輯出問題了,咱們不想要發送以前的消息了,須要撤回該怎麼作?

解決方案:

  AMQP 事務機制

  事務模式:

  •  txSelect 將當前channel設置爲transaction模式 (開啓事務)
  •  txCommit 提交當前事務   (提交事務)
  • txRollback 事務回滾   (回滾事務)

注: RabbitMQ支持消息持久化機制,把消息持久化到硬盤上。若是RabbitMQ服務器宕機了,消息不會丟失。

RabbitMQ整合Spring Booot【生產者事務確認機制】

 

 方案二:Confirm 模式

  隊列和消費者創建長鏈接,推送或者拉取形式。

  消費者經過自動應答或者手動應答,隊列服務器等待應答結果,若是沒有應答結果那麼保留給下一個消費者。 

 

RabbitMQ消費者重試機制 

  消費者運行報錯時候,會進行重試。

RabbitMQ整合Spring Booot【消費者補償冪等問題】

 

RabbitMQ解決分佈式事務問題:

RabbitMQ解決分佈式事務

 

小結:

1.點對點(簡單)的隊列2.工做(公平性)隊列模式3.發佈訂閱模式4.路由模式Routing(Direct)5.通配符模式Topics

相關文章
相關標籤/搜索