RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。AMQP 的出現其實也是應了廣大人民羣衆的需求,雖然在同步消息通信的世界裏有不少公開標準(如 COBAR的 IIOP ,或者是 SOAP 等),可是在異步消息處理中卻不是這樣,只有大企業有一些商業實現(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),所以,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯合制定了 AMQP 的公開標準。 RabbitMQ是由RabbitMQ Technologies Ltd開發而且提供商業支持的。該公司在2010年4月被SpringSource(VMWare的一個部門)收購。在2013年5月被併入Pivotal。其實VMWare,Pivotal和EMC本質上是一家的。不一樣的是VMWare是獨立上市子公司,而Pivotal是整合了EMC的某些資源,如今並無上市。 RabbitMQ的官網是http://www.rabbitmq.com 百度百科amqp協議介紹https://baike.baidu.com/item/AMQP/8354716?fr=aladdin 注意:RabbitMQ是採用erlang語言開發的,因此必須有erlang環境才能夠運行html
點對點模式:一對一消費,一個生產者投遞消息給隊列,只能容許有一個消費者進行消費。spring
注意:若是消費集羣的話,會進行均攤消費。前提是服務器的配置相同。緩存
均攤消費的弊端:假若有2臺服務器分別爲A、B。若是每一個消費處理消息的業務時間不相同的狀況下,可能對消費者處理慢的服務器不公平(服務器壓力大),A處理比B處理時間快,應該A處理的消息多一些,B處理的消息少一些才合理。springboot
隊列以先進先出原則進行存放消息集合。生產者投遞消息到隊列中。服務器
當消費者啓動的時候,會與隊列服務器創建長鏈接,當生產者有消息投遞到隊列的時候,隊列會馬上將消息通知給消費者進行消費。網絡
長鏈接的好處:若是是短連接的話,每次訪問都須要創建鏈接,比較佔內存。創建長鏈接會減小三次握手,提升傳輸速度。異步
取消息:生產者投遞消息到隊列中,隊列服務器緩存消息。這時候當消費者啓動的時候,消費者會去向隊列服務器中獲取消息。學習
推消息:當生產者和消費者都啓動的時候,生產者向隊列投遞消息,這時候隊列會將消息推送給消費者。fetch
公平隊列的原理:隊列服務器向消費者發送消息的時候,消費者採用手動應答模式,隊列服務器必需要收到消費者發送ack結果通知以後,纔會繼續發送下一個消息。ui
Direct exchange(直連交換機)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。
發佈訂閱實現流程:生產者投遞消息給交換機,交換機根據路由策略(routignKey)轉發到不一樣的隊列服務器中緩存,而後隊列服務器在推送消息給消費者進行消費或者消費者從隊列服務器中拉取消息進行消費。
發佈訂閱實現原理:一對多。
這個隊列模式是消息隊列中最重要的隊列了,其餘的都是在它的基礎上進行了擴展。 功能實現:一個生產者發送消息,多個消費者獲取消息(一樣的消息),包括一個生產者,一個交換機,多個隊列,多個消費者。
思路解讀(重點理解):
1. 一個生產者,多個消費者
2. 每個消費者都有本身的一個隊列
3. 生產者沒有直接發消息到隊列中,而是發送到交換機
4. 每一個消費者的隊列都綁定到交換機上
5. 消息經過交換機到達每一個消費者的隊列 該模式就是Fanout Exchange(扇型交換機)將消息路由給綁定到它身上的全部隊列 以用戶發郵件案例講解
注意:交換機沒有存儲消息功能,若是消息發送沒有綁定消費隊列的交換機,消息則丟失。在消費者沒有啓動的狀況下,生產者投遞消息到交換機,這時候交換機不知道把消息轉發給哪一個消費者,因此消息會消失。由於交換機沒有緩存功能,只作轉發的功能。
使用場景:用戶註冊→發送郵件→發送短信。
Direct exchange(直連交換機)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。
生產者發送消息到交換機並指定一個路由key,消費者隊列綁定到交換機時要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)。
例如:咱們能夠把路由key設置爲insert ,那麼消費者隊列key指定包含insert才能夠接收消息,消費者隊列key定義爲update或者delete就不能接收消息。很好的控制了更新,插入和刪除的操做。 採用交換機direct模式
流程說明:若是生產者投遞消息到交換機(exchange),郵件隊列和短信隊列也都綁定了交換機(exchange)。可是當交換機的類型(type=direct)的時候,交換機的轉發(路由)由routingKey決定轉發給誰。以下如圖所示,當交換機的rontingKey=email的時候,消息將轉發到郵件隊列服務而後由郵件消費者進行消費。而短信隊列是都收不到消息的,由於短信的路由routingKey=msg。若是短信隊列也想收到消息就須要修改routingKey=email才能夠收到消息。
這就是交換機類型type=direct的用法及特性。
說明:此模式實在路由key模式的基礎上,使用了通配符來管理消費者接收消息。生產者P發送消息到交換機X,type=topic,交換機根據綁定隊列的routing key的值進行通配符匹配;
符號#:匹配一個或者多個詞lazy.# 能夠匹配lazy.irs或者lazy.irs.cor
符號*:只能匹配一個詞lazy.* 能夠匹配lazy.irs或者lazy.cor
爲了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收而且處理完畢了。RabbitMQ就能夠刪除它了。 若是一個消費者掛掉卻沒有發送應答,RabbitMQ會理解爲這個消息沒有處理徹底,而後交給另外一個消費者去從新處理。這樣,你就能夠確認即便消費者偶爾掛掉也不會丟失任何消息了。 沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ纔會從新投遞。即便處理一條消息會花費很長的時間。 消息應答是默認打開的。咱們經過顯示的設置autoAsk=true關閉這種機制。現即自動應答開,一旦咱們完成任務,消費者會自動發送應答。通知RabbitMQ消息已被處理,能夠從內存刪除。若是消費者因宕機或連接失敗等緣由沒有發送ACK(不一樣於ActiveMQ,在RabbitMQ裏,消息沒有過時的概念),則RabbitMQ會將消息從新發送給其餘監聽在隊列的下一個消費者。
目前消息轉發機制是平均分配,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工做量很小,形成的緣由就是近當消息到達隊列進行轉發消息。並不在意有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發全部的奇數給一個消費者,偶數給另外一個消費者。 爲了解決這樣的問題,咱們可使用basicQos方法,傳遞參數爲prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。 換句話說,只有在消費者空閒的時候會發送下一條信息。調度分發消息的方式,也就是告訴RabbitMQ每次只給消費者處理一條消息,也就是等待消費者處理完畢並本身對剛剛處理的消息進行確認以後,才發送下一條消息,防止消費者太過於忙碌,也防止它太過去悠閒。 經過 設置channel.basicQos(1);
案例: 生產者端代碼不變,消費者端代碼這部分就是用於開啓手動應答模式的。 channel.basicConsume(QUEUE_NAME, false, defaultConsumer); 注:第二個參數值爲false表明關閉RabbitMQ的自動應答機制,改成手動應答。 在處理完消息時,返回應答狀態,true表示爲自動應答模式。 channel.basicAck(envelope.getDeliveryTag(), false);
生產者生產消息直接投遞給隊列服務器,隊列服務器在以推送消息到消費者或者消費者從隊列服務器拉取消息進行消費。消費者啓動的時候會與隊列服務器創建長鏈接。
AMQP(高級消息隊列協議)是一個異步消息傳遞所使用應用層協議規範,爲面向消息中間件設計,基於此協議的客戶端與消息中間件能夠無視消息來源傳遞消息,不受客戶端、消息中間件、不一樣的開發語言環境等條件的限制;
涉及概念解釋:
Server(Broker):接收客戶端鏈接,實現AMQP協議的消息隊列和路由功能的進程;
Virtual Host:虛擬主機的概念,相似權限控制組,一個Virtual Host裏能夠有多個Exchange和Queue。
Exchange:交換機,接收生產者發送的消息,並根據Routing Key將消息路由到服務器中的隊列Queue。
ExchangeType:交換機類型決定了路由消息行爲,RabbitMQ中有三種類型Exchange,分別是fanout、direct、topic; Message Queue:消息隊列,用於存儲還未被消費者消費的消息;
Message:由Header和body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、優先級是多少、由哪一個Message Queue接收等;
body是真正須要發送的數據內容;
BindingKey:綁定關鍵字,將一個特定的Exchange和一個特定的Queue綁定起來。
生產者發送消息不會像傳統方式直接將消息投遞到隊列中,而是先將消息投遞到交換機中,在由交換機轉發到具體的隊列,隊列在將消息以推送或者拉取方式給消費者進行消費,這和咱們以前學習Nginx有點相似。 交換機的做用根據具體的路由策略分發到不一樣的隊列中。
交換機有四種類型:
Direct exchange(直連交換機):是根據消息攜帶的路由鍵;
routing key:將消息投遞給對應隊列的 Fanout exchange(扇型交換機)將消息路由給綁定到它身上的全部隊列 ;
Topic exchange(主題交換機):隊列經過路由鍵綁定到交換機上,而後,交換機根據消息裏的路由值,將消息路由給一個或多個綁定隊列;
Headers exchange(頭交換機):相似主題交換機,可是頭交換機使用多個消息屬性來代替路由鍵創建路由規則。經過判斷消息頭的值可否與指定的綁定相匹配來確立路由規則。
問題產生背景: 生產者發送消息出去以後,不知道到底有沒有發送到RabbitMQ服務器, 默認是不知道的。並且有的時候咱們在發送消息以後,後面的邏輯出問題了,咱們不想要發送以前的消息了,須要撤回該怎麼作。
若是RabbitMQ服務器宕機了,消息會丟失嗎?
答案:RabbitMQ服務器支持消息持久化機制,會把消息持久化在硬盤上保存。代碼設置 channel.queueDeclare(EMAIL_QUEUE, true, false, false, null); 方法第二個參數,默認狀況下咱們應該設置爲true。
解決方案:
1.AMQP 事務機制
2.Confirm 模式
事務模式::
txSelect:將當前channel設置爲transaction模式
txCommit :提交當前事務
txRollback:事務回滾
生產者 消費者 隊列服務器
消費者如何確保消息必定可以消費成功?
經過應答模式,默認爲應答模式,能夠修改成手動應答。設置方法:channel.basicConsume(QUEUE_NAME, false, defaultConsumer); 第二個參數。
設置應答模式 :第一個參數 隊列名稱、第二個參數 應答模式 若是爲true 自動應答,false 爲手動應答、第三個參數 監聽器
自動應答(true):不在意消費者對這個消息處理是否成功,都會告訴隊列刪除該消息。若是處理消息失敗的狀況下,應該實現自動補償。
手動應答(false):當隊列把消息推送給消費者,消費者處理完業務邏輯以後,手動返回ack(通知)告訴給隊列服務器是否要刪除該消息、若是失敗,隊列服務器作補償,而不會直接刪除該消息、
springboot整合rabbitmq分爲2個項目,一個是生產者服務,一個是消息服務平臺項目。消息服務平臺項目中包括郵件消費者和短信消費者。沒有必要每個消費者都建立一個項目,那樣會浪費資源。
在一個項目中,能夠有多個生產者和消費者。
消費者在消費消息的時候,若是消費者業務邏輯出現程序異常,這時候應該如何處理?
答案:使用消息重試機制。
如何合適選擇重試機制:
狀況1: 消費者獲取到消息後,調用第三方接口,但接口暫時沒法訪問,是否須要重試?
答案:須要重試機制。
狀況2: 消費者獲取到消息後,拋出數據轉換異常,是否須要重試?
答案:不須要重試機制,須要發佈版本進行解決。
如何實現重試機制
總結:對於狀況2,若是消費者代碼拋出異常是須要發佈新版本才能解決的問題,那麼不須要重試,重試也無濟於事。應該採用日誌記錄+定時任務job健康檢查+人工進行補償。
產生緣由:網絡延遲傳輸中,消費出現異常或者是消費延遲消費,會形成MQ進行重試補償,在重試過程當中,可能會形成重複消費。
消費者如何保證消息冪等性,不被重複消費
解決辦法:
①使用全局MessageID判斷消費方是不是同一個,解決冪等性。
②或者使用業務邏輯id保證惟一(好比訂單號碼)
死信隊列 聽上去像 消息「死」了 其實也有點這個意思,死信隊列 是 當消息在一個隊列 由於下列緣由:
消息被拒絕(basic.reject/ basic.nack)而且再也不從新投遞 requeue=false
消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
隊列超載
變成了 「死信」 後 被從新投遞(publish)到另外一個Exchange 該Exchange 就是DLX 而後該Exchange 根據綁定規則 轉發到對應的 隊列上 監聽該隊列 就能夠從新消費 說白了 就是 沒有被消費的消息 換個地方從新被消費
生產者 --> 消息 --> 交換機 --> 隊列 --> 變成死信 --> DLX交換機 -->隊列 --> 消費者
消息被拒絕(basic.reject或basic.nack)而且requeue=false.
消息TTL過時
隊列達到最大長度(隊列滿了,沒法再添加數據到mq中)
在定義業務隊列的時候,能夠考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被髮送到該死信隊列上,這樣就方便咱們查看消息失敗的緣由了
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丟棄消息
定義業務(普通)隊列的時候指定參數
x-dead-letter-exchange: 用來設置死信後發送的交換機
x-dead-letter-routing-key:用來設置死信的routingKey
/**
* 定義死信隊列相關信息
*/
public final static String deadQueueName = "dead_queue";
public final static String deadRoutingKey = "dead_routing_key";
public final static String deadExchangeName = "dead_exchange";
/**
* 死信隊列 交換機標識符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信隊列交換機綁定鍵標識符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
/**
* 定義短信隊列 包括死信隊列
*
* @return
*/
@Bean
public Queue fanoutMsgQueue() {
//return new Queue(MSG_QUEUE_FANOUT);
// 將普通隊列綁定到死信隊列交換機上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
Queue queue = new Queue(MSG_QUEUE_FANOUT, true, false, false, args);
return queue;
}
/**
* 配置死信隊列
*
* @return
*/
@Bean
public Queue deadQueue() {
Queue queue = new Queue(deadQueueName, true);
return queue;
}
/**
* 建立死信交換機
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
}
/**
* 死信交換機綁定私信隊列
* @param deadQueue
* @param deadExchange
* @return
*/
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
}
spring:
rabbitmq:
#### 鏈接地址
host: 127.0.0.1
####端口號
port: 5672
#### 用戶名 本身在rabbitmq服務器上新建的 默認的用戶名和密碼爲guest
username: ming
#### 密碼
password: ming
### 虛擬主機
virtual-host: /member
listener:
simple:
retry:
####開啓消費者(程序出現異常的狀況下)進行重試機制
enabled: true
### 最大重試次數, 默認狀況下 一直重試
max-attempts: 5
#### 重試間隔時間 單位:毫秒
initial-interval: 3000
##### 開啓手動應答 ack
acknowledge-mode: manual
### 服務端口號
server:
port: 8081
rabbitmq地址:http://www.rabbitmq.com/getstarted.html