RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什麼已經有了基本的瞭解,若是你還不知道它是什麼以及能夠用來作什麼,建議先從官網的 RabbitMQ Tutorials 入門教程開始學習。php
本文將會講解如何使用RabbitMQ實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗後,自動延時將消息從新投遞,當達到必定的重試次數後,將消息投遞到失敗消息隊列,等待人工介入處理。在這裏我會帶領你們一步一步的實現一個帶有失敗重試功能的發佈訂閱組件,使用該組件後能夠很是簡單的實現消息的發佈訂閱,在進行業務開發的時候,業務開發人員能夠將主要精力放在業務邏輯實現上,而不須要花費時間去理解RabbitMQ的一些複雜概念。html
本文將會持續修正和更新,最新內容請參考個人 GITHUB 上的 程序猿成長計劃 項目,歡迎 Star,更多精彩內容請 follow me。java
咱們將會實現以下功能git
具體流程見下圖github
Linus Torvalds 曾經說過json
Talk is cheap. Show me the code服務器
我分別用Java和PHP實現了本文所講述的方案,讀者能夠經過參考代碼以及本文中的基本步驟來更好的理解app
爲了實現消息的延時重試和失敗存儲,咱們須要建立三個Exchange來處理消息。ide
全部的Exchange聲明(declare)必須使用如下參數函數
參數 | 值 | 說明 |
---|---|---|
exchange | - | Exchange名稱 |
type | topic | Exchange 類型 |
passive | false | 若是Exchange已經存在,則返回成功,不存在則建立 |
durable | true | 持久化存儲Exchange,這裏僅僅是Exchange自己持久化,消息和隊列須要單獨指定其持久化 |
no-wait | false | 該方法須要應答確認 |
Java代碼
// 聲明Exchange:主體,失敗,重試
channel.exchangeDeclare("master", "topic", true);
channel.exchangeDeclare("master.retry", "topic", true);
channel.exchangeDeclare("master.failed", "topic", true);
複製代碼
PHP代碼
// 普通交換機
$this->channel->exchange_declare('master', 'topic', false, true, false);
// 重試交換機
$this->channel->exchange_declare('master.retry', 'topic', false, true, false);
// 失敗交換機
$this->channel->exchange_declare('master.failed', 'topic', false, true, false);
複製代碼
在RabbitMQ的管理界面中,咱們能夠看到建立的三個Exchange
消息發佈時,使用basic_publish
方法,參數以下
參數 | 值 | 說明 |
---|---|---|
message | - | 發佈的消息對象 |
exchange | master | 消息發佈到的Exchange |
routing-key | - | 路由KEY,用於標識消息類型 |
mandatory | false | 是否強制路由,指定了該選項後,若是沒有訂閱該消息,則會返回路由不可達錯誤 |
immediate | false | 指定了當消息沒法直接路由給消費者時如何處理 |
發佈消息時,對於message
對象,其內容建議使用json編碼後的字符串,同時消息須要標識如下屬性
'delivery_mode'=> 2 // 1爲非持久化,2爲持久化
複製代碼
Java代碼
channel.basicPublish(
"master",
routingKey,
MessageProperties.PERSISTENT_BASIC, // delivery_mode
message.getBytes()
);
複製代碼
PHP代碼
$msg = new AMQPMessage($message->serialize(), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);
$this->channel->basic_publish($msg, 'master', $routingKey);
複製代碼
消息訂閱的實現相對複雜一些,須要完成隊列的聲明以及隊列和Exchange的綁定。
對於每個訂閱消息的服務,都必須建立一個該服務對應的隊列,將該隊列綁定到關注的路由規則,這樣以後,消息生產者將消息投遞給Exchange以後,就會按照路由規則將消息分發到對應的隊列供消費者消費了。
消費服務須要declare三個隊列
[queue_name]
隊列名稱,格式符合 [服務名稱]@訂閱服務標識
[queue_name]@retry
重試隊列[queue_name]@failed
失敗隊列
訂閱服務標識
是客戶端本身對訂閱的分類標識符,好比用戶中心服務(服務名稱ucenter),包含兩個訂閱:user和enterprise,這裏兩個訂閱的隊列名稱就爲ucenter@user
和ucenter@enterprise
,其對應的重試隊列爲ucenter@user@retry
和ucenter@enterprise@retry
。
Declare隊列時,參數規定規則以下
參數 | 值 | 說明 |
---|---|---|
queue | - | 隊列名稱 |
passive | false | 隊列不存在則建立,存在則直接成功 |
durable | true | 隊列持久化 |
exclusive | false | 排他,指定該選項爲true則隊列只對當前鏈接有效,鏈接斷開後自動刪除 |
no-wait | false | 該方法須要應答確認 |
auto-delete | false | 當再也不使用時,是否自動刪除 |
對於@retry
重試隊列,須要指定額外參數
'x-dead-letter-exchange' => 'master'
'x-message-ttl' => 30 * 1000 // 重試時間設置爲30s
複製代碼
這裏的兩個header字段的含義是,在隊列中延遲30s後,將該消息從新投遞到
x-dead-letter-exchange
對應的Exchange中
Java代碼
// 聲明監聽隊列
channel.queueDeclare(
queueName, // 隊列名稱
true, // durable
false, // exclusive
false, // autoDelete
null // arguments
);
channel.queueDeclare(queueName + "@failed", true, false, false, null);
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", exchangeName());
arguments.put("x-message-ttl", 30 * 1000);
channel.queueDeclare(queueName + "@retry", true, false, false, arguments);
複製代碼
PHP代碼
$this->channel->queue_declare($queueName, false, true, false, false, false);
$this->channel->queue_declare($failedQueueName, false, true, false, false, false);
$this->channel->queue_declare(
$retryQueueName, // 隊列名稱
false, // passive
true, // durable
false, // exclusive
false, // auto_delete
false, // nowait
new AMQPTable([
'x-dead-letter-exchange' => 'master',
'x-message-ttl' => 30 * 1000,
])
);
複製代碼
在RabbitMQ的管理界面中,Queues部分能夠看到咱們建立的三個隊列
查看隊列的詳細信息,咱們能夠看到 queueName@retry 隊列與其它兩個隊列的不一樣
建立完隊列以後,須要將隊列與Exchange綁定(bind
),不一樣隊列須要綁定到以前建立的對應的Exchange上面
Queue | Exchange |
---|---|
[queue_name] | master |
[queue_name]@retry | master.retry |
[queue_name]@failed | master.failed |
綁定時,須要提供訂閱的路由KEY,該路由KEY與消息發佈時的路由KEY對應,區別是這裏可使用通配符同時訂閱多種類型的消息。
參數 | 值 | 說明 |
---|---|---|
queue | - | 綁定的隊列 |
exchange | - | 綁定的Exchange |
routing-key | - | 訂閱的消息路由規則 |
no-wait | false | 該方法須要應答確認 |
Java代碼
// 綁定監聽隊列到Exchange
channel.queueBind(queueName, "master", routingKey);
channel.queueBind(queueName + "@failed", "master.failed", routingKey);
channel.queueBind(queueName + "@retry", "master.retry", routingKey);
複製代碼
PHP代碼
$this->channel->queue_bind($queueName, 'master', $routingKey);
$this->channel->queue_bind($retryQueueName, 'master.retry', $routingKey);
$this->channel->queue_bind($failedQueueName, 'master.failed', $routingKey);
複製代碼
在RabbitMQ的管理界面中,咱們能夠看到該隊列與Exchange和routing-key的綁定關係
使用 basic_consume
對消息進行消費的時候,須要注意下面參數
參數 | 值 | 說明 |
---|---|---|
queue | - | 消費的隊列名稱 |
consumer-tag | - | 消費者標識,留空便可 |
no_local | false | 若是設置了該字段,服務器將不會發布消息到 發佈它的客戶端 |
no_ack | false | 須要消費確認應答 |
exclusive | false | 排他訪問,設置後只容許當前消費者訪問該隊列 |
nowait | false | 該方法須要應答確認 |
消費端在消費消息時,須要從消息中獲取消息被消費的次數,以此判斷該消息處理失敗時重試仍是發送到失敗隊列。
Java代碼
protected Long getRetryCount(AMQP.BasicProperties properties) {
Long retryCount = 0L;
try {
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
if (headers.containsKey("x-death")) {
List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
if (deaths.size() > 0) {
Map<String, Object> death = deaths.get(0);
retryCount = (Long) death.get("count");
}
}
}
} catch (Exception e) {}
return retryCount;
}
複製代碼
PHP代碼
protected function getRetryCount(AMQPMessage $msg): int {
$retry = 0;
if ($msg->has('application_headers')) {
$headers = $msg->get('application_headers')->getNativeData();
if (isset($headers['x-death'][0]['count'])) {
$retry = $headers['x-death'][0]['count'];
}
}
return (int)$retry;
}
複製代碼
消息消費完成後,須要發送消費確認消息給服務端,使用basic_ack
方法
ack(delivery-tag=消息的delivery-tag標識)
複製代碼
Java代碼
// 消息消費處理
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
...
// 注意,因爲使用了basicConsume的autoAck特性,所以這裏就不須要手動執行
// channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 執行消息消費處理
channel.basicConsume(
queueName,
true, // autoAck
consumer
);
複製代碼
PHP代碼
$this->channel->basic_consume(
$queueName,
'', // customer_tag
false, // no_local
false, // no_ack
false, // exclusive
false, // nowait
function (AMQPMessage $msg) use ($queueName, $routingKey, $callback) {
...
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
);
複製代碼
若是消息處理中出現異常,應該將該消息從新投遞到重試Exchange,等待下次重試
basic_publish(msg, 'master.retry', routing-key)
ack(delivery-tag) // 不要忘記了應答消費成功消息
複製代碼
若是判斷重試次數大於3次,仍然處理失敗,則應該講消息投遞到失敗Exchange,等待人工處理
basic_publish(msg, 'master.failed', routing-key)
ack(delivery-tag) // 不要忘記了應答消費成功消息
複製代碼
必定不要忘記ack消息,由於重試、失敗都是經過將消息從新投遞到重試、失敗Exchange來實現的,若是忘記ack,則該消息在超時或者鏈接斷開後,會從新被從新投遞給消費者,若是消費者依舊沒法處理,則會形成死循環。
Java代碼
try {
String message = new String(body, "UTF-8");
// 消息處理函數
handler.handle(message, envelope.getRoutingKey());
} catch (Exception e) {
long retryCount = getRetryCount(properties);
if (retryCount > 3) {
// 重試次數大於3次,則自動加入到失敗隊列
channel.basicPublish("master.failed", envelope.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, body);
} else {
// 重試次數小於3,則加入到重試隊列,30s後再重試
channel.basicPublish("master.retry", envelope.getRoutingKey(), properties, body);
}
}
複製代碼
若是任務重試三次仍未成功,則會被投遞到失敗隊列,這時候須要人工處理程序異常,處理完畢後,須要將消息從新投遞到隊列進行處理,這裏惟一須要作的就是從失敗隊列訂閱消息,而後獲取到消息後,清空其application_headers
頭信息,而後從新投遞到master
這個Exchange便可。
Java代碼
channel.basicPublish(
'master',
envelope.getRoutingKey(),
MessageProperties.PERSISTENT_BASIC,
body
);
複製代碼
PHP代碼
$msg->set('application_headers', new AMQPTable([]));
$this->channel->basic_publish(
$msg,
'master',
$msg->get('routing_key')
);
複製代碼
隊列和Exchange以及發佈訂閱的關係咱們就說完了,那麼使用起來是什麼效果呢?這裏咱們以Java代碼爲例
// 發佈消息
Publisher publisher = new Publisher(factory.newConnection(), 'master');
publisher.publish("{\"id\":121, \"name\":\"guanyiyao\"}", "user.create");
// 訂閱消息
new Subscriber(factory.newConnection(), Main.EXCHANGE_NAME)
.init("user-monitor", "user.*")
.subscribe((message, routingKey) -> {
// TODO 業務邏輯
System.out.printf(" <%s> message consumed: %s\n", routingKey, message);
}
);
複製代碼
使用RabbitMQ時,實現延時重試和失敗隊列的方式並不只僅侷限於本文中描述的方法,若是讀者有更好的實現方案,歡迎拍磚,在這裏我也只是拋磚引玉了。本文中講述的方法還有不少優化空間,讀者也能夠試着去改進其實現方案,好比本文中使用了三個Exchagne,是否只使用一個Exchange也能實現本文中所講述的功能。
本文將會持續修正和更新,最新內容請參考個人 GITHUB 上的 程序猿成長計劃 項目,歡迎 Star,更多精彩內容請 follow me。