在使用消息機制時,咱們一般須要考慮如下幾個問題:javascript
- 消息不能丟失
- 保證消息必定能投遞到目的地
- 保證業務處理和消息發送/消費的一致性
本文以RabbitMQ爲例,討論如何解決以上問題。html
消息持久化
若是但願RabbitMQ重啓以後消息不丟失,那麼須要對如下3種實體均配置持久化:java
- exchange
- queue
- message
聲明exchange時設置持久化(durable = true
)而且不自動刪除(autoDelete = false):數據庫
boolean durable = true;
boolean autoDelete = false;
channel.exchangeDeclare("dlx", TOPIC, durable, autoDelete, null)
聲明queue時設置持久化(durable = true
)而且不自動刪除(autoDelete = false):json
boolean durable = true;
boolean autoDelete = false;
channel.queueDeclare("order-summary-queue", durable, false, autoDelete, queueArguments);
發送消息時經過設置deliveryMode=2
持久化消息:markdown
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2)
.priority(0)
.build();
channel.basicPublish("order", "order.created", false, properties, "sample-data".getBytes())
發送確認
有時,業務處理成功,消息也發了,可是咱們並不知道消息是否成功到達了rabbitmq,若是因爲網絡等緣由致使業務成功而消息發送失敗,那麼發送方將出現不一致的問題,此時可使用rabbitmq的發送確認功能,即要求rabbitmq顯式告知咱們消息是否已成功發送。網絡
首先須要在channel上設置ConfirmListener
:架構
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long seqNo, boolean multiple) {
if (multiple) {
logger.info(seqNo + "號及其之前的全部消息發送成功,當消息發送成功後執行相應邏輯,好比標記事件爲已發送或者刪除原來事件");
} else {
logger.info(seqNo + "號發送成功,當消息發送成功後執行相應邏輯,好比標記事件爲已發送或者刪除原來事件");
}
}
public void handleNack(long seqNo, boolean multiple) {
if (multiple) {
logger.info(seqNo + "號及其之前的全部消息發送失敗,當消息發送失敗後執行相應邏輯,好比重試或者標記事件發送失敗");
} else {
logger.info(seqNo + "號發送失敗,當消息發送失敗後執行相應邏輯,好比重試或者標記事件發送失敗");
}
}
});
而後在發送消息直線須要開啓發送確認模式:app
//開啓發送者確認
channel.confirmSelect();
而後發送消息:less
channel.basicPublish("order", "order.created", false, properties, "sample-data".getBytes());
當消息正常投遞時,rabbitmq客戶端將異步調用handleAck()
表示消息已經成功投遞,此時程序能夠自行處理投遞成功以後的邏輯,好比在數據庫中將消息設置爲已發送
。當消息投遞出現異常時,handleNack()
將被調用。
一般來說,發送端只須要保證消息可以發送到exchange便可,而無需關注消息是否被正確地投遞到了某個queue,這個是rabbitmq和消息的接收方須要考慮的事情。基於此,若是rabbitmq找不到任何須要投遞的queue,那麼rabbitmq依然會ack給發送方,此時發送方能夠認爲消息已經正確投遞,而很差用關係消息沒有queue接收的問題。可是,對於rabbitmq而言,這種消息是須要記錄下來的,不然rabbitmq將直接丟棄該消息。此時能夠爲exchange設置alternate-exchange
,即表示rabbitmq將把沒法投遞到任何queue的消息發送到alternate-exchange
指定的exchange中,一般來講能夠設置一個死信交換(DLX)。
事實上,對於exchange存在可是卻找不到任何接收queue時,若是發送是設置了mandatory=true
,那麼在消息被ack前將return給客戶端,此時客戶端能夠建立一個ReturnListener
用於接收返回的消息:
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
logger.warn("消息沒法正確投遞,已返回。");
}
});
可是須要注意的是,在return以後,消息依然會被ack而不是nack,還不如不設置madatory呢,所以return有時並不見得有用。
須要注意的是,在發送消息時若是exchange不存在,rabbitmq直接丟棄該消息,而且不會ack或者nack操做,可是在Spring中,會nack。
綜合起來,要完成發送方確認,須要作如下幾個點:
- 設置ConfirmListener
- 經過
confirmSelect()
開啓 - 爲exchange設置
alternate-exchange
到DLX - 發送時沒有必要設置mandotory
- 發送方將消息記錄在數據庫中,收到ack時在數據庫中標記消息爲
已發送
狀態 - 若是收到reject或者因爲網絡緣由沒有收到ack,那麼消息狀態不會改變,下次發送時再次發送,此時可能致使消息重複,解決重複問題請參考「保證至少一次投遞,而且消費端冪」小節。
手動消費確認
有時,消息被正確投遞到消費方,可是消費方處理失敗,那麼便會出現消費方的不一致問題。好比訂單已建立
的消息發送到用戶積分子系統中用於增長用戶積分,可是積分消費法處理卻都失敗了,用戶就會問:我購買了東西爲何積分並無增長呢?
要解決這個問題,須要引入消費方確認,即只有消息被成功處理以後才告知rabbitmq以ack,不然告知rabbitmq以nack,此時的處理流程以下:
- 接收消息,不做ack,處理消息成功則ack,不成功nack
- 對於nack的消息,能夠配置rabbitmq要麼從新投遞,要麼直接扔掉,要麼傳到死信交換(DLX)
- 若是處理成功,可是因爲網絡等問題致使確認(不管是ack仍是nack)不成功,那麼rabbitmq會從新投遞消息,可是此時因爲消息已經成功,從新投遞便致使了消費重複的消息,此時請參考「保證至少一次投遞,而且消費端冪」小節。
在rabbitmq中,消息默認是字段ack的,即消息到達消費方當即ack,而無論消費方業務處理是否成功,爲此能夠開啓手動確認模式,即有消費方自行決定什麼時候應該ack,經過設置autoAck=false
開啓手動確認模式:
boolean autoAck = false;
channel.basicConsume("order-summary-queue", autoAck,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
if (success()) {
logger.info("成功消費消息" + deliveryTag);
channel.basicAck(deliveryTag, false);
} else {
if (!envelope.isRedeliver()) {
logger.warn("首次消費消息" + deliveryTag + "不成功,嘗試重試");
boolean requeue = true;
channel.basicNack(deliveryTag, false, requeue);
} else {
logger.warn("第二次消費消息" + deliveryTag + "不成功,扔到DLX");
boolean requeue = false;
channel.basicNack(deliveryTag, false, requeue);
}
}
}
});
能夠看到,在autoAck=false
狀況下,經過業務處理的是否成功(success()
)來判斷應該ack仍是nack。
另外,爲了不消息反覆requeue的狀況,若是消息第一次消費不成功,則在nack時設置requeue=true
,表示告知rabbitmq將reject的消息從新投遞,若是第二次消費依然不成功,那麼nack時設置requeue=false
,告知rabbitmq不要從新投遞了,此時rabbitmq將根據本身的配置要麼直接扔掉消息,要麼將消息發送到DLX中,具體配置請參考「設置死信交換(DLX)和死信隊列(DLQ)」。
保證至少一次投遞,而且消費端冪等
一般來講,程序中會先完成寫數據庫的操做,而後發送消息,此時一個重要的點是保證這二者的一致性,即一旦數據庫保存成功消息必須也可以發送成功。要保證發送發一致性,一種作法是使用全局事務,即將數據庫操做和消息發送放到一個事務中,好比JTA,可是全局事務是很重的,而且rabbitmq目前並不支持全局事務。
要解決發送發的一致性問題,能夠實現將消息保存到數據庫的事件表中,此時業務處理的數據庫操做和保存消息到數據庫屬於同一個本地數據庫事務,那麼到此能夠保證業務處理和消息產生的原子性,而後有一個異步的後臺任務從數據庫的事件表中一次讀取未發送的消息發送至rabbitmq,發送成功後更新消息的狀態爲已發佈
。
然而,此時咱們依然沒法保證發送消息和更新消息狀態之間的原子性,由於可能發生消息發送成功可是數據庫狀態更新不成功的狀況,爲了解決這種極端狀況,能夠屢次重試消息發送,步驟以下:
- 讀取時間表中未發送消息,發送到rabbitmq
- 若是發送成功,事件表中消息狀態也更新成功,皆大歡喜
- 若是消息發送不成功,那麼消息狀態也不做改變,下次重試
- 若是消息發送成功而狀態更新不成功,下次重試
不斷重試,總有一個可以達到發送消息和狀態更新的原子性。
那麼問題也來了:rabbitmq中可能出現多條重複消息,此時消費端就懵了。爲了解決這個問題,消費方應該設計爲冪等的,即對相同消息的屢次消費與單次消費結果相同。有些消費方的業務邏輯自己即是冪等的,而對於自己不冪等的消費方,須要在數據庫中記錄已經被正確消費的消息,當重複消息來時,判斷該消息是否已經被消費,若是沒有則執行消費邏輯,若是已經消費則直接忽略。此時消費方的處理步驟以下:
- 接收到消息,判斷消息是否已經消費,若是是,則直接忽略,此時已然須要作消費成功確認
- 若是消息還未被消費,則處理業務邏輯,記錄消息,業務邏輯自己和記錄消息在同一個數據庫事務中,若是都成功,則皆大歡喜;若是失敗,那麼消費方業務回滾,消息也不記錄,此時reject消息,等下次重發
設置消息的TTL和消息隊列的max-length
爲了保證消息的時效性,能夠設置隊列中消息的TTL(x-message-ttl
),而爲了保證消息隊列不至於太大而影響性能,能夠設置隊列的最大消息數(x-max-length
)。在建立隊列時設置以下:
ImmutableMap<String, Object> orderSummaryQueueArguments = of(
"x-max-length",
300,
"x-message-ttl",
24 * 60 * 60 * 1000);
channel.queueDeclare("order-summary-queue", true, false, false, orderSummaryQueueArguments);
設置死信交換(DLX)和死信隊列(DLQ)
對於沒法投遞的消息,咱們須要將其記錄下來便於後續跟蹤排查,此時能夠將這樣的消息放入DLX和DLQ中。默認狀況下,queue中被拋棄的消息將被直接丟掉,可是能夠經過設置queue的x-dead-letter-exchange
參數,將被拋棄的消息發送到x-dead-letter-exchange
作指定的exchange中,這樣的exchange成爲DLX。
設置了x-dead-letter-exchange
以後,在如下三種狀況下消息將被扔到DLX中:
- 消費方nack時指定了
requeue=false
- 消息的TTL已到
- 消息隊列的max-length已到
在聲明queue時定義x-dead-letter-exchange
:
ImmutableMap<String, Object> orderNotificationQueueArguments = of("x-dead-letter-exchange", "dlx");
channel.queueDeclare("order-notification-queue", true, false, false, orderNotificationQueueArguments);
- 設置DLQ爲lazy,而且沒有TTL,而且沒有max-length
在如下3種狀況下,消息會被投遞到DLX中:
須要注意的是,在發送消息時,當已經達到queue的上限,而當queue定義爲x-overflow=reject-publish時,rabbitmq將nack。當有多個queue同時綁定到exchange時,若是有些queue設置了reject-publish,而有些卻沒有,那麼依然會nack,這對發送方來講很差處理。所以,仍是那句話,發送方只須要保證正確地投遞到了exchange便可,而不用關係exchange後面有哪些queue。
設置Prefetch count
Prefetch count表示消費方一次性從rabbitmq讀取的消息數量,若是設置過大,那麼消費方可能始終處於高負荷運轉狀態,而若是過小又會增長網絡開銷,一般設置爲20-50。另外,有時爲了保證多個消費方均衡地分攤消息處理任務,一般設置prefetch count爲1。
異常處理
在以上設置的狀況下,咱們來看看當各類異常發生時,rabbitmq是如何運做的:
- broker不可達:直接拋出異常;
- 發送方本身始終發送不出去:消息狀態始終處於「未發送」,不會破壞一致性,可是對於事件表中累計太多的事件須要關注;
- exchange不存在:消息被丟掉,rabbitmq不會ack,消息狀態始終處於「未發送」,下次將從新發送,不會破壞一致性,可是當exchange持續不存在下去,那麼事件表中事件也會累計太多;
- exchange存在可是沒有接受queue:消息將被ack並標記爲「已發送」,但因爲設置了alternative exchange爲dlx,那麼消息將發送到dlx對應的dlq中保存以便後續處理;
- consumer不在線,而累積消息太多:消息一致性沒有問題,可是當累計到了max-length上限,消息隊列頭部的消息將被放置dlq中以便後續處理;
- consumer臨時性失敗:經過redelivered判斷是否爲重複投遞,若是是,則nack而且requeue=false,表示若是重複投遞的一次的消息若是再失敗,那麼直接扔到dlx中,也即消息最多重複投遞一次;
- consumer始終失敗:全部消息均被投入dlq以便後續處理,此時可能須要關注dlq的長度是否太長。
路由策略
系統中每每會發布多種類型的消息,在發送時有幾種路由策略:
- 全部類型的消息都發送到同一個exchange中
- 每種類型的消息都單獨配置一個exchange
- 對消息類型進行歸類,同一類型的消息對應一個exchange
筆者建議採用最後一種,而且結合DDD中的聚合劃分,路由策略建議以下:
每個聚合根下發布的全部類型的事件對應一個exchange,exchange設置爲topic,queue能夠配置接收某一種類型的事件,也能夠配置接收全部某種聚合相關的事件,還能夠配置接收全部事件。
案例
假設有個訂單(Order)系統,用戶下單後須要向用戶發送短信通知,而全部對訂單的數據顯示採用了CQRS架構,即將訂單的讀模型和寫模型分離,即全部訂單的更新都經過事件發到rabbitmq,而後專門有個consumer接收這些消息用於更新訂單的讀模型。
訂單相關有兩個事件:order.created和order.updated,全部與訂單相關的事件都發布到同一個 topic exchange中,exchange名爲「order",設置短信通知queue(order-notification-queue)只接收order.created消息,由於只有訂單在新建時纔會發出通知,即order-notification-queue的routing key爲order.created
,設置讀模型的queue(order-summary-queue)接收全部與Order相關的消息,即配置order-summary-queue的routing key爲order.#
,示例代碼以下:
package com.ecommerce.order.spike.rabbitmq;
import com.ecommerce.order.common.logging.AutoNamingLoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import static com.google.common.collect.ImmutableMap.of;
import static com.rabbitmq.client.BuiltinExchangeType.TOPIC;
public class RabbitMQSender {
private static final Logger logger = AutoNamingLoggerFactory.getLogger();
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("rabbitmq-user");
factory.setPassword("rabbitmq-password");
factory.setVirtualHost("/");
factory.setPort(5672);
try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel();) {
//設置死信交換,Topic類型,持久化
channel.exchangeDeclare("dlx", TOPIC, true, false, null);
//設置死信隊列,持久化,lazy型
channel.queueDeclare("dlq", true, false, false, of("x-queue-mode", "lazy"));
//接收全部發給dlx的消息,另外能夠定義其餘queue接收指定類型的消息
channel.queueBind("dlq", "dlx", "#");
//定義與order相關的事件exchange,若是沒法路由,則路由到死信交換dlx
channel.exchangeDeclare("order", TOPIC, true, false, of("alternate-exchange", "dlx"));
//定義用於異步更新order讀模型的queue,設置死信交換爲dlx,隊列滿(x-overflow)時將頭部消息發到dlx
//定義queue的最大消息數(x-max-length)爲300,滿後發到dlx,另外定義消息的存活時間(x-message-ttl)爲1天,1天后發送到dlx
ImmutableMap<String, Object> orderSummaryQueueArguments = of("x-dead-letter-exchange",
"dlx",
"x-overflow",
"drop-head",
"x-max-length",
300,
"x-message-ttl",
24 * 60 * 60 * 1000);
channel.queueDeclare("order-summary-queue", true, false, false, orderSummaryQueueArguments);
channel.queueBind("order-summary-queue", "order", "order.#");
//定義用於order建立時向用戶發出通知的queue,設置死信交換爲dlx
ImmutableMap<String, Object> orderNotificationQueueArguments = of("x-dead-letter-exchange",
"dlx",
"x-overflow",
"drop-head",
"x-max-length",
300,
"x-message-ttl",
24 * 60 * 60 * 1000);
channel.queueDeclare("order-notification-queue", true, false, false, orderNotificationQueueArguments);
channel.queueBind("order-notification-queue", "order", "order.created");
//設置發送端確認
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long seqNo, boolean multiple) {
if (multiple) {
logger.info(seqNo + "號及其之前的全部消息發送成功,當消息發送成功後執行相應邏輯,好比標記事件爲已發送或者刪除原來事件");
} else {
logger.info(seqNo + "號發送成功,當消息發送成功後執行相應邏輯,好比標記事件爲已發送或者刪除原來事件");
}
}
public void handleNack(long seqNo, boolean multiple) {
if (multiple) {
logger.info(seqNo + "號及其之前的全部消息發送失敗,當消息發送失敗後執行相應邏輯,好比重試或者標記事件發送失敗");
} else {
logger.info(seqNo + "號發送失敗,當消息發送失敗後執行相應邏輯,好比重試或者標記事件發送失敗");
}
}
});
//開啓發送者確認
channel.confirmSelect();
//設置消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2)
.priority(0)
.build();
//發送時沒有必要設置mandatory,由於沒法路由的消息會記錄在dlq中
//達到queue的上限時,queue頭部消息將被放入dlx中
try {
channel.basicPublish("order", "order.created", false, properties, "create order data".getBytes());
channel.basicPublish("order", "order.updated", false, properties, "update order data".getBytes());
} catch (IOException e) {
e.printStackTrace();
}
Thread.sleep(5000);
}
}
}
以上,咱們發送了一條order.created消息和一條order.updated消息,基於routing key設置,兩條消息都會到達order-summary-queue,可是隻有order.created消息到達了order-notification-queue:
在consumer端,開啓手動ack,而且對於處理失敗的場景,只容許從新投遞一次,不然扔到DLX中:
package com.ecommerce.order.spike.rabbitmq;
import com.ecommerce.order.common.logging.AutoNamingLoggerFactory;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
public class RabbitMQReceiver {
private static final Logger logger = AutoNamingLoggerFactory.getLogger();
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("rabbitmq-user");
factory.setPassword("rabbitmq-password");
factory.setVirtualHost("/");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.basicQos(1, true);
boolean autoAck = false;
channel.basicConsume("order-summary-queue", autoAck,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
//用Random來模擬有時處理成功有時處理失敗的場景
if (new Random().nextBoolean()) {
logger.info("成功消費消息" + deliveryTag);
channel.basicAck(deliveryTag, false);
} else {
if (!envelope.isRedeliver()) {
logger.warn("首次消費消息" + deliveryTag + "不成功,嘗試重試");
channel.basicNack(deliveryTag, false, true);
} else {
logger.warn("第二次消費消息" + deliveryTag + "不成功,扔到DLX");
channel.basicNack(deliveryTag, false, false);
}
}
}
});
}
}
來自:https://www.cnblogs.com/davenkin/p/rabbitmq-best-practices.html