接上文分佈式事務:基於可靠消息服務介紹了總體中間件的設計思路,有些內容沒有展開。故此,本文詳細講解下如何將消息可靠發送到Rabbitmq。git
在上文簡單提到了如何將消息進行可靠發送,由於shine-mq是無縫集成spring-boot-starter
的,因此rabbitmq的操做也是基於spring的rabbitTemplate
來完成的。github
rabbitTemplate
提供了setConfirmCallback
方法,能夠在消息發送到RabbitMQ交換器後,進行ack的回調。spring
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
...
//消息發送到RabbitMQ交換器後接收ack回調
//若是發送到交換器成功,可是沒有匹配的隊列(好比說取消了綁定),ack返回值爲仍是true(這裏是一個坑,後面仔細講解)
if (ack) {
log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData);
...
}
...
});
複製代碼
在此以前還須要設置CachingConnectionFactory
緩存
//設置生成者確認機制
rabbitConnectionFactory.setPublisherConfirms(true);
複製代碼
若是還須要設置setReturnCallback
(消息發送到RabbitMQ交換器,但無相應queue時的回調),那就還須要設置rabbitTemplate
bash
//使用return-callback時必須設置mandatory爲true
rabbitTemplate.setMandatory(true);
複製代碼
這裏須要知道的是ReturnCallback比ConfirmCallback先回調。服務器
上面提到了setConfirmCallback
返回ack的一個坑,就是當消息成功發送到交換器,可是沒有匹配的隊列,就會觸發 ReturnCallback 回調,並且消息也丟失了,最致命的是setConfirmCallback
回調返回的ack倒是true,若是單靠這個ack來判斷消息是否成功到達mq,就有必定機率形成消息丟失。網絡
要解決的話,能夠在setReturnCallback
作一個緩存,由於上面的狀況會先觸發 ReturnCallback 回調,咱們緩存這個狀態,在setConfirmCallback
回調的時候,結合ack和以前緩存的狀態進行判斷是否真的發送成功。app
下面是shine-mq
實現的源碼:異步
//消息發送到RabbitMQ交換器後接收ack回調
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (correlationData != null) {
log.info("ConfirmCallback ack: {} correlationData: {} cause: {}", ack, correlationData, cause);
String msgId = correlationData.getId();
CorrelationDataExt ext = (CorrelationDataExt) correlationData;
Coordinator coordinator = (Coordinator) applicationContext.getBean(ext.getCoordinator());
coordinator.confirmCallback(correlationData, ack);
// 若是發送到交換器成功,可是沒有匹配的隊列(好比說取消了綁定),ack返回值爲仍是true(這裏是一個坑,須要注意)
if (ack && !coordinator.getReturnCallback(msgId)) {
log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData);
coordinator.delReady(msgId);
} else {
//失敗了判斷重試次數,重試次數大於0則繼續發送
if (ext.getMaxRetries() > 0) {
try {
rabbitmqFactory.setCorrelationData(msgId, ext.getCoordinator(), ext.getMessage(),
ext.getMaxRetries() - 1);
rabbitmqFactory.getTemplate().send(ext.getMessage(), 0, 0, SendTypeEnum.DISTRIBUTED);
} catch (Exception e) {
log.error("Message retry failed to send, message:{} exception: ", ext.getMessage(), e);
}
} else {
log.error("Message delivery failed, msgId: {}, cause: {}", msgId, cause);
}
}
coordinator.delReturnCallback(msgId);
}
});
//使用return-callback時必須設置mandatory爲true
rabbitTemplate.setMandatory(true);
//消息發送到RabbitMQ交換器,但無相應queue時的回調
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String messageId = message.getMessageProperties().getMessageId();
String coordinatorName = messageId.split(MqConstant.SPLIT)[0];
Coordinator coordinator = (Coordinator) applicationContext.getBean(coordinatorName);
coordinator.setReturnCallback(messageId);
log.error("ReturnCallback exception, no matching queue found. message id: {}, replyCode: {}, replyText: {},"
+ "exchange: {}, routingKey: {}", messageId, replyCode, replyText, exchange, routingKey);
});
複製代碼
熟悉Rabbitmq的同窗可能知道,Rabbitmq有兩種機制來實現消息的可靠發送。分佈式
rabbitTemplate
設置回調就是這個模式)因此咱們知道了
rabbitTemplate
提供的確認機制是一種異步機制,並不能同步的發現問題,也就是說在極端的網絡條件下是會出現消息丟失的。
因此shine-mq經過增長一個Coordinator
(協調者)來實現。Coordinator
會保存2個狀態,一個是prepare(攜帶回查id),這個狀態在前文說過是用來保證上游服務的任務狀態的。
而另外一個狀態ready,就是來保證消息的可靠投遞。
首先shine-mq是使用@DistributedTrans
來開啓。在這個註解的切面裏,先持久化ready狀態。
@Around(value = "@annotation(trans)")
public void around(ProceedingJoinPoint pjp, DistributedTrans trans) throws Throwable {
...
try {
EventMessage message = new EventMessage(exchange, routeKey, SendTypeEnum.DISTRIBUTED.toString(), checkBackId,
coordinatorName, msgId);
//將消息持久化
coordinator.setReady(msgId, checkBackId.toString(), message);
rabbitmqFactory.setCorrelationData(msgId, coordinatorName, message, null);
rabbitmqFactory.addDLX(exchange, exchange, routeKey, null, null);
if (flag) {
rabbitmqFactory.add(MqConstant.DEAD_LETTER_QUEUE, MqConstant.DEAD_LETTER_EXCHANGE,
MqConstant.DEAD_LETTER_ROUTEKEY, null, null);
flag = false;
}
rabbitmqFactory.getTemplate().send(message, 0, 0, SendTypeEnum.DISTRIBUTED);
} catch (Exception e) {
log.error("Message failed to be sent : ", e);
throw e;
}
}
複製代碼
而後在回調中刪除該狀態:
//消息發送到RabbitMQ交換器後接收ack回調
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (correlationData != null) {
log.info("ConfirmCallback ack: {} correlationData: {} cause: {}", ack, correlationData, cause);
String msgId = correlationData.getId();
CorrelationDataExt ext = (CorrelationDataExt) correlationData;
Coordinator coordinator = (Coordinator) applicationContext.getBean(ext.getCoordinator());
//能夠自定義實現的回調
coordinator.confirmCallback(correlationData, ack);
// 若是發送到交換器成功,可是沒有匹配的隊列(好比說取消了綁定),ack返回值爲仍是true(這裏是一個坑,須要注意)
if (ack && !coordinator.getReturnCallback(msgId)) {
log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData);
//刪除ready狀態
coordinator.delStatus(msgId);
} else {
...
}
}
});
複製代碼
由於存儲ready是在上游服務任務執行以後的,因此只要有超時的ready記錄未被清理掉,daemon(守護線程)
只管撈起來進行重發就行,由於Mq的可靠性投遞就已經要求下游服務是須要保證冪等性了。
最後還有個極端的狀況,就是ready消息存儲的時候由於網絡抖動該消息丟失了,這時候也沒有關係,由於有prepare狀態會進行回查,該狀態只有在ready存儲後纔會觸發刪除。
若是對你有幫助,那就幫忙點個星星把 ^.^
github地址:github.com/7le/shine-m…