Binding
:Exchange和Exchange、Queue之間的鏈接關係
Queue
:實際存儲消息。
Durability
:是否持久化,Durable:是,Transient:否。
Auto delete
:如選yes,表明當最後一個監聽被移除以後,該Queue會自動刪除。
Message
:服務器和應用程序之間傳送的數據。本質上就是一段數據,由Properties和Payload(Body)組成。經常使用屬性:delivery mode、headers(自定義屬性)
Virtual host
:虛擬主機,用於進行邏輯隔離,最上層的消息路由。一個Virtual Host裏面能夠有若干個Exchange和Queue,同一個Virtual Host裏面不能有相同名稱的Exchange或者Queue。
消息如何保證100%的投遞成功?segmentfault
生產端可靠性投遞
服務器
- 保障消息的成功發出
- 保障MQ節點的成功接收
- 發送端收到MQ節點(Broker)確認應答
- 完善的消息進行補償機制
可靠性投遞解決方案
ide
- 消息落庫,對消息狀態進行打標
- 消息的延遲投遞,作二次確認,回調檢查
- 消息集羣鏡像隊列:rabbitmq集羣模式很是經典的就是Mirror鏡像模式,保證100%數據不丟失,在實際工做中也是用的最多的。而且實現集羣很是簡單,通常互聯網公司都會構建這種鏡像集羣模式。https://segmentfault.com/a/11...
Confirm確認消息、Return返回消息fetch
理解Confirm消息確認機制:
日誌
- 消息的確認,是指生產者投遞消息後,若是Broker收到消息,則會給咱們生產者一個應答。
- 生產者進行接受應答,用來肯定這條消息是否正常的發送到了Broker,這種方式也是消息的可靠性投遞的核心保障。
如何實現Confirm確認消息?
code
- 第一步:在channel上開啓確認模式:channel.confirmSelect()
- 第二步:在channel上添加監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對消息進行從新發送、或者記錄日誌等後續處理。
Return消息機制
rabbitmq
- Return Listener用於處理一些不可路由的消息
- 咱們的消息生產者,經過制定一個Exchange和Routing Key,把消息送達到某一個隊列中去,而後咱們的消費者監聽隊列,進行消費處理。
- 可是在某寫狀況下,若是咱們在發送消息的時候,當前的exchange不存在或者指定的路由key路由不到,這個時候若是咱們須要監聽這種不可達的消息,就要使用Return Listener。
- Mandatory:若是爲true,則監聽器會接收到路由不可達的消息,而後進行後續處理,若是爲false,那麼broker端自動刪除該消息。
public class MyConsumer extends DefaultConsumer implements Consumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException { } }
消息的ACK與重回隊列隊列
消費端手工ACK(應答成功)和NACK(應答失敗)
ip
- 消費端進行消費的時候,若是因爲業務異常咱們能夠進行日誌的記錄,而後進行補償。
- 因爲服務器宕機等嚴重問題,咱們就要手工進行ACK保障消費端消費成功。好比:消費一半的時候宕機了,broker端沒有收到應答,重發消息。
消費端重回隊列
ci
- 消費端重回隊列是爲了對沒有處理成功的消息,把消息從新回遞給broker。
- 通常咱們在實際應用中,都會關閉重回隊列,也就是設置爲false。
channel.basciNack(envelope.getDeliveryTag(),false,
true
) // 爲true的話,在消費失敗的狀況下會重回隊列放入隊列末端。
消息的限流
假設有一個場景,RabbitMq服務器上有上萬條未處理的消息,咱們隨便打開一個消費者客戶端,會出現下面的狀況: 巨量的消息瞬間所有推送過來,可是咱們單個客戶端沒法同時處理這麼多數據。RabbitMq提供了一種qos(服務質量保證)功能,即在非自動確認消息的前提下,若是必定數目的消息(經過基於consume或者channel設置Qos的值)未被確認前,不進行消費新的消息。
void BasicQos(unit prefetchSize, ushort prefetchCount, bool global);
prefetchSize
:0
prefetchCount
: 會告訴RabbitMq不要同時給一個消費者推送多於N個消息,即一旦有N個消息尚未被ack,則該consumer將block掉,知道有消息ack。
global
: true/false,是否將上面設置應用於channel,簡答點說,就是上面限制是channel級別的仍是consumer級別。
// 限流方式,第一件事就是autoAck設置爲false,關閉自動簽收,必須手動簽收 channel.basicQos(0,3,false); // 3表示若是消息積壓了1000條,先給我推3條,這三條消費結束後,我會給你一個ack表示這三條我已經處理完了,而後再給我推送3條... channel.basicConsume(queueName,false,new MyConsumer(channel)) //在MyConsumer中對消息進行簽收ack channel.basicAck(envelope,getDeliveryTag(), false);
TTL消息
TTL
:是Time To Live。就是生存時間。
- RabbitMQ支持消息的過時時間,在消息發送時能夠進行指定。
- RabbitMQ支持隊列的過時時間,從消息入隊開始計算,只要超過了隊列的超時時間配置,那麼消息會自動清除。
死信隊列
死信隊列
:DLX,Dead-Letter-Exchange。
利用DLX,當消息在一個隊列中變成了死信(dead message:這條消息沒有消費者去消費)以後,他能被從新publish到另一個Exchange,這個Exchange就是DLX。進入死信隊列(進入死信的三種方式)
- 1.消息被拒絕(basic.reject or basic.nack)而且requeue=false
void basicNack(long deliveryTag, boolean multiple,
boolean
requeue
) throws IOException;
- 2.消息TTL過時
- 3.隊列達到最大長度
備註說明
- DLX也是一個正常的Exchange,和通常的Exchange沒區別,他能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。
- 當這個隊列中有死信時,RabbitMQ會自動將這個消息從新發送到已經設置了的Exchange上去,進而被路由到另一個隊列上去。
- 能夠監聽這個隊列中消息作相應的處理。
死信隊列設置
- 首先須要設置死信隊列的exchange和queue,而後進行綁定:Exchange:dlx.exchange、Queue:dlx.queue、RoutingKey:#
- 而後進行正常聲明交換機、隊列、綁定,只不過咱們須要在隊列加上一個參數便可:arguments.put("
x-dead-letter-exchange
","dlx.exchange");