這是我參與8月更文挑戰的第11天,活動詳情查看:8月更文挑戰java
RabbitMQ系列彙總:RabbitMQ系列編程
生產者將信道設置成 confirm 模式,一旦信道進入 confirm 模式,全部在該信道上面發佈的消息都將會被指派一個惟一的 ID(從 1 開始),一旦消息被投遞到全部匹配的隊列以後。安全
broker 就會發送一個確認給生產者(包含消息的惟一 ID),這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會在將消息寫入磁盤以後發出。markdown
broker 回傳 給生產者的確認消息中 delivery-tag 域包含了確認消息的序列號,此外 broker 也能夠設置 basic.ack 的 multiple 域,表示到這個序列號以前的全部消息都已經獲得了處理。併發
confirm 模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信 道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後。dom
生產者應用即可以經過回調方法來處理該確認消息,若是 RabbitMQ 由於自身內部錯誤致使消息丟失,就會發送一條 nack 消息,生產者應用程序一樣能夠在回調方法中處理該 nack 消息。異步
開啓發布確認方式函數
發佈確認默認是沒有開啓的,若是要開啓須要調用方法 confirmSelect,每當你要想使用發佈 確認,都須要在 channel 上調用該方法高併發
//開啓發布確認
channel.confirmSelect();
複製代碼
這是一種簡單的確認方式,它是一種同步確認發佈的方式,也就是發佈一個消息以後只有它 被確認發佈,後續的消息才能繼續發佈,waitForConfirmsOrDie(long)這個方法只有在消息被確認 的時候才返回,若是在指定時間範圍內這個消息沒有被確認那麼它將拋出異常。post
生產者
/** * 這是一個測試的生產者 *@author DingYongJun *@date 2021/8/1 */
public class DyProducerTest_dingyuefabu {
//設置執行次數
public static final int MESSAGE_COUNT = 888;
/** * 這裏爲了方便,咱們使用main函數來測試 * 純屬看你我的選擇 * @param args */
public static void main(String[] args) throws Exception {
//單個發佈確認執行
publishMessageIndividually();
}
/** * 單個發佈確認 */
public static void publishMessageIndividually() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開啓發布確認
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服務端返回 false 或超時時間內未返回,生產者能夠消息重發
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息發送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("發佈" + MESSAGE_COUNT + "個單獨確認消息,耗時" + (end - begin) +
"ms");
}
}
複製代碼
執行結果
這種確認方式有一個最大的缺點就是:發佈速度特別的慢,由於若是沒有確認發佈的消息就會 阻塞全部後續消息的發佈,這種方式最多提供每秒不超過數百條發佈消息的吞吐量。固然對於某 些應用程序來講這可能已經足夠了。
固然,如今跟你說慢,你莫得感知,下面幾種綜合起來對比你就會發現他的效率有多低了!
與單個等待確認消息相比,先發布一批消息而後一塊兒確承認以極大地 提升吞吐量。
生產者
/** * 批量發佈確認 */
public static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//隊列名使用uuid來獲取不重複的值,不須要本身再進行命名了。
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開啓發布確認
channel.confirmSelect();
//批量確認消息大小
int batchSize = 88;
//未確認消息個數
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();//確認代碼
outstandingMessageCount = 0;
}
}
//爲了確保還有剩餘沒有確認消息 再次確認
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("發佈" + MESSAGE_COUNT + "個批量確認消息,耗時" + (end - begin) +
"ms");
}
複製代碼
執行結果
缺點:當發生故障致使發佈出現問題時,不知道是哪一個消息出現問題了,咱們必須將整個批處理保存在內存中,以記錄重要的信息然後從新發布消息。
固然這種方案仍然是同步的,也同樣阻塞消息的發佈。
異步確認雖然編程邏輯比上兩個要複雜,可是性價比最高,不管是可靠性仍是效率都沒得說, 他是利用回調函數來達到消息可靠性傳遞的,這個中間件也是經過函數回調來保證是否投遞成功, 下面就讓咱們來詳細講解異步確認是怎麼實現的。
生產者
/** * 異步發佈確認 */
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//開啓發布確認
channel.confirmSelect();
/** * 線程安全有序的一個哈希表,適用於高併發的狀況 * 1.輕鬆的將序號與消息進行關聯 * 2.輕鬆批量刪除條目 只要給到序列號 * 3.支持併發訪問 */
ConcurrentSkipListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();
/** * 確認收到消息的一個回調 * 1.消息序列號 * 2.true 能夠確認小於等於當前序列號的消息 * false 確認當前序列號消息 */
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小於等於當前序列號的未確認消息 是一個 map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除該部分未確認消息
confirmed.clear();
}else{
//只清除當前序列號的消息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("發佈的消息"+message+"未被確認,序列號"+sequenceNumber);
};
/** * 添加一個異步確認的監聽器 * 1.確認收到消息的回調 * 2.未收到消息的回調 */
channel.addConfirmListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/** * channel.getNextPublishSeqNo()獲取下一個消息的序列號 * 經過序列號與消息體進行一個關聯 * 所有都是未確認的消息體 */
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("發佈" + MESSAGE_COUNT + "個異步確認消息,耗時" + (end - begin) +
"ms");
}
}
複製代碼
執行結果
很容易看出,這種方式速度快的飛起呀!
如何處理未確認的消息?
單獨發佈消息
批量發佈消息
異步處理
路漫漫其修遠兮,吾必將上下求索~
若是你認爲i博主寫的不錯!寫做不易,請點贊、關注、評論給博主一個鼓勵吧~hahah