MQ消息投遞,MQ服務器宕機致使丟失,rabbitMq經過durable參數持久化,也會機率上產生丟失。java
生產端丟數據場景,例如生產者將數據推送RabbitMq時,因網絡緣由致使數據丟失redis
rabbitmq丟數據,例如沒有開啓持久化,rabbitmq重啓致使丟數據。或者開啓持久化,在持久化到磁盤過程當中掛了。算法
消費端丟數據場景,例如消費端消費過程當中掛了,rabbitmq認爲消費了並刪除,致使丟數據。sql
將queue、exchange、message都持久化,但不能保證100%不丟失數據,消息持久化解決由於服務器異常奔潰致使的消息丟失。數據庫
queue持久化編程
//durable=true,實現queue的持久化
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue.persistent.name", true, false, false, null);
複製代碼
//Channel類中queueDeclare的完整定義以下:
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue(隊列名稱)the name of the queue
* @param durable(持久化)true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive(排他隊列) true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete(自動刪除) true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
exclusive:排他隊列,若是一個隊列被聲明爲排他隊列,該隊列僅對首次申明它的鏈接可見,並在鏈接斷開時自動刪除。
這裏須要注意三點:
1. 排他隊列是基於鏈接可見的,同一鏈接的不一樣信道是能夠同時訪問同一鏈接建立的排他隊列;
2.「首次」,若是一個鏈接已經聲明瞭一個排他隊列,其餘鏈接是不容許創建同名的排他隊列的,這個與普通隊列不一樣;
3.即便該隊列是持久化的,一旦鏈接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。
autoDelete:自動刪除,若是該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。
複製代碼
exchange持久化緩存
若是不設置exchange的持久化對消息的可靠性來講沒有什麼影響,可是一樣若是exchange不設置持久化,那麼當broker服務重啓以後,exchange將不復存在,那麼既而發送方rabbitmq producer就沒法正常發送消息。服務器
//durable=true,持久化
channel.exchangeDeclare(exchangeName, 「direct/topic/header/fanout」, true);
複製代碼
//exchangeDeclare的完整定義以下:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
複製代碼
message持久化markdown
queue隊列持久化爲true,但message沒有持久化,重啓後message仍是會丟失。 須要queue和message都設置持久化,broker服務重啓後,隊列存在,消息也存在。網絡
//MessageProperties.PERSISTENT_TEXT_PLAIN 爲消息持久化
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
複製代碼
//basicPublish完整定義以下:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
exchange表示exchange的名稱
routingKey表示routingKey的名稱
body表明發送的消息體
//BasicProperties定義以下,deliveryMode=1表明不持久化,deliveryMode=2表明持久化
public BasicProperties(
String contentType,//消息類型如:text/plain
String contentEncoding,//編碼
Map<String,Object> headers,
Integer deliveryMode,//1:nonpersistent 2:persistent
Integer priority,//優先級
String correlationId,
String replyTo,//反饋隊列
String expiration,//expiration到期時間
String messageId,
Date timestamp,
String type,
String userId,
String appId,
String clusterId)
//MessageProperties.PERSISTENT_TEXT_PLAIN定義以下:其中deliveryMode=2表示持久化
public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2,0, null, null, null,null, null, null, null,null, null);
複製代碼
消息推送到rabbitmq後,先保存到cache中,而後異步刷入到磁盤中。
消息何時刷到磁盤?
寫入文件前會有一個Buffer,大小爲1M,數據在寫入文件時,首先會寫入到這個Buffer,若是Buffer已滿,則會將Buffer寫入到文件(未必刷到磁盤)。 有個固定的刷盤時間:25ms,也就是無論Buffer滿不滿,每一個25ms,Buffer裏的數據及未刷新到磁盤的文件內容一定會刷到磁盤。 每次消息寫入後,若是沒有後續寫入請求,則會直接將已寫入的消息刷到磁盤:使用Erlang的receive x after 0實現,只要進程的信箱裏沒有消息,則產生一個timeout消息,而timeout會觸發刷盤操做。
在producer引入事務機制或者Confirm機制來確保消息已經正確的發送至broker端。 RabbitMQ的可靠性涉及producer端的確認機制、broker端的鏡像隊列的配置以及consumer端的確認機制,要想確保消息的可靠性越高,那麼性能也會隨之而降,魚和熊掌不可兼得,關鍵在於選擇和取捨。
消息持久化解決由於服務器異常奔潰致使的消息丟失,但不能解決發佈者將消息發送以後,消息有沒有正確到達broker代理服務器。若是在消息到達broker以前已經丟失的話,持久化操做也解決不了這個問題,由於消息根本就沒到達代理服務器,你怎麼進行持久化,那麼這個問題該怎麼解決呢?
這時RabbitMQ爲咱們提供了兩種方式:
rabbitMQ事務機制三個方法:
在經過txSelect開啓事務以後,咱們即可以發佈消息給broker代理服務器了,若是txCommit提交成功了,則消息必定到達了broker了,若是在txCommit執行以前broker異常崩潰或者因爲其餘緣由拋出異常,這個時候咱們即可以捕獲異常經過txRollback回滾事務了。
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
int result = 1 / 0;
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
複製代碼
使用事務機制的話會下降RabbitMQ的性能,可是RabbitMQ提供了一個更好的方案,即將channel信道設置成confirm模式。
producer端confirm模式的實現原理
生產者將信道設置成confirm模式,在該信道上面發佈的消息都會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID),這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號,此外broker也能夠設置basic.ack的multiple域,表示到這個序列號以前的全部消息都已經獲得了處理。
confirm模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用即可以經過回調方法來處理該確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack消息,生產者應用程序一樣能夠在回調方法中處理該nack消息。
在channel 被設置成 confirm 模式以後,全部被 publish 的後續消息都將被 confirm(即 ack) 或者被nack一次。可是沒有對消息被 confirm 的快慢作任何保證,而且同一條消息不會既被 confirm又被nack 。
已經在transaction事務模式的channel是不能再設置成confirm模式的,即這兩種模式是不能共存的。
編程模式
客戶端實現生產者confirm有三種編程方式:
第1種
\\普通confirm模式最簡單,publish一條消息後,等待服務器端confirm,若是服務端返回false或者超時時間內未返回,客戶端進行消息重傳。
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if(!channel.waitForConfirms()){
System.out.println("send message failed.");
}
複製代碼
第二種
channel.confirmSelect();
for(int i=0;i<batchCount;i++){
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){
System.out.println("send message failed.");
}
複製代碼
第三種
異步confirm模式的編程實現最複雜,Channel對象提供的ConfirmListener()回調方法只包含deliveryTag(當前Chanel發出的消息序號),咱們須要本身爲每個Channel維護一個unconfirm的消息序號集合,每publish一條數據,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄。從程序運行效率上看,這個unconfirm集合最好採用有序集合SortedSet存儲結構。實際上,SDK中的waitForConfirms()方法也是經過SortedSet維護消息序號的。
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}
複製代碼
性能對比
性能由低到高:事務模式(tx) < 普通confirm模式 < 批量confirm模式 < 異步confirm模式
爲了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(message acknowledgment)。消費者在聲明隊列時,能夠指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號後才從內存(和磁盤,若是是持久化消息的話)中移去消息。不然,RabbitMQ會在隊列中消息被消費後當即刪除它。
採用消息確認機制後,只要令noAck=false,消費者就有足夠的時間處理消息(任務),不用擔憂處理消息過程當中消費者進程掛掉後消息丟失的問題,由於RabbitMQ會一直持有消息直到消費者顯式調用basicAck爲止。
當noAck=false時,對於RabbitMQ服務器端而言,隊列中的消息分紅了兩部分:一部分是等待投遞給消費者的消息;一部分是已經投遞給消費者,可是尚未收到消費者ack信號的消息。若是服務器端一直沒有收到消費者的ack信號,而且消費此消息的消費者已經斷開鏈接,則服務器端會安排該消息從新進入隊列,等待投遞給下一個消費者(也可能仍是原來的那個消費者)。
RabbitMQ不會爲未ack的消息設置超時時間,它判斷此消息是否須要從新投遞給消費者的惟一依據是消費該消息的消費者鏈接是否已經斷開。這麼設計的緣由是RabbitMQ容許消費者消費一條消息的時間能夠好久好久。
RabbitMQ管理平臺界面上能夠看到當前隊列中Ready狀態和Unacknowledged狀態的消息數,分別對應上文中的等待投遞給消費者的消息數和已經投遞給消費者可是未收到ack信號的消息數。也能夠經過命令行來查看上述信息:
代碼示例(關閉自動消息確認,進行手動ack):
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(ConfirmConfig.queueName, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
// do something with msg.
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
複製代碼
broker將在下面的狀況中對消息進行confirm:
- broker發現當前消息沒法被路由到指定的queues中(若是設置了mandatory屬性,則broker會發送basic.return)
- 非持久屬性的消息到達了其所應該到達的全部queue中(和鏡像queue中)
- 持久消息到達了其所應該到達的全部queue中(和鏡像中),並被持久化到了磁盤(fsync)
- 持久消息從其所在的全部queue中被consume了(若是必要則會被ack)
basicRecover:是路由不成功的消息可使用recovery從新發送到隊列中。
basicReject:是接收端告訴服務器這個消息我拒絕接收,不處理,能夠設置是否放回到隊列中仍是丟掉,並且只能一次拒絕一個消息,官網中有明確說明不能批量拒絕消息,爲解決批量拒絕消息纔有了basicNack。
basicNack:能夠一次拒絕N條消息,客戶端能夠設置basicNack方法的multiple參數爲true,服務器會拒絕指定了delivery_tag的全部未確認的消息(tag是一個64位的long值,最大值是9223372036854775807)。
RabbitMQ的mirrored-queue即鏡像隊列,這個至關於配置了副本,當master在此特殊時間內crash掉,能夠自動切換到slave,這樣有效的保障了HA, 除非整個集羣都掛掉,這樣也不能徹底的100%保障RabbitMQ不丟消息,但比沒有mirrored-queue的要好不少,不少現實生產環境下都是配置了mirrored-queue的。
消息提早持久化+定時任務
上圖流程:
(1)訂單服務生產者在投遞消息以前,先把消息持久化到Redis或DB中,建議Redis,高性能。消息的狀態爲發送中。
(2)confirm機制監聽消息是否發送成功?如ack成功消息,刪除Redis中此消息。
(3)若是nack不成功的消息,這個能夠根據自身的業務選擇是否重發此消息。也能夠刪除此消息,由本身的業務決定。
(4)這邊加了個定時任務,來拉取隔必定時間了,消息狀態仍是爲發送中的,這個狀態就代表,訂單服務是沒有收到ack成功消息。
(5)定時任務會做補償性的投遞消息。這個時候若是MQ回調ack成功接收了,再把Redis中此消息刪除。
根據業務規則,設置表字段惟一性。
借鑑數據庫的樂觀鎖機制,根據version版本,也就是在操做庫存前先獲取當前商品的version版本號,而後操做的時候帶上此version號。咱們梳理下,咱們第一次操做庫存時,獲得version爲1,調用庫存服務version變成了2;但返回給訂單服務出現了問題,訂單服務又一次發起調用庫存服務,當訂單服務傳遞的version仍是1,再執行上面的sql語句時,就不會執行;由於version已經變爲2了,where條件就不成立。這樣就保證了無論調用幾回,只會真正的處理一次。
update table set count=count-1,version=version+1 where id=2 and version=1
複製代碼
如果是分佈是系統,構建全局唯一索引比較困難,例如唯一性的字段沒法確定,這時候可以引入分佈式鎖,通過第三方的系統(redis或zookeeper),在業務系統插入數據或者更新數據,獲取分佈式鎖,然後做操作,之後釋放鎖,這樣其實是把多線程併發的鎖的思路,引入多多個系統,也就是分佈式系統中得解決思路。要點:某個長流程處理過程要求不能併發執行,可以在流程執行之前根據某個標誌(用戶ID+後綴等)獲取分佈式鎖,其他流程執行時獲取鎖就會失敗,也就是同一時間該流程只能有一個能執行成功,執行完成後,釋放分佈式鎖(分佈式鎖要第三方系統提供);
惟一ID:如數據庫的主鍵id
指紋碼:業務規則標識惟一的。如時間戳+銀行返回的惟一碼。須要注意的是,這個指紋碼不必定就是咱們系統生產的,多是咱們本身業務規則或者是外部返回的一些規則通過拼接後的東西。其目的:就是爲了保障這次操做達到絕對惟一的。
惟一ID+指紋碼機制,利用數據庫主鍵去重。如:
Select count(id) from table where id = 惟一ID+指紋碼
複製代碼
好處:實現簡單,就一個拼接,然後查詢判斷是否重複。
壞處:高併發下如果是單個數據庫就會有寫入性能瓶頸
解決方案:根據 ID 進行分庫分表,對 id 進行算法路由,落到一個具體的數據庫,然後當這個 id 第二次來又會落到這個數據庫,這時候就像我單庫時的查重一樣了。利用算法路由把單庫的冪等變成多庫的冪等,分攤數據流量壓力,提高性能。
相信大家都知道 redis 的原子性操作,我這裏就不需要過多介紹了。性能
使用 redis 的原子性去實現需要考慮兩個點
一是 是否 要進行數據落庫,如果落庫的話,關鍵解決的問題是數據庫和緩存如何做到原子性? 數據庫與緩存進行同步肯定要進行寫操作,到底先寫 redis 還是先寫數據庫,這是個問題,涉及到緩存更新與淘汰的問題
二是如果不落庫,那麼都存儲到緩存中,如何設置定時同步的策略? 不入庫的話,可以使用雙重緩存等策略,保障一個消息副本,具體同步可以使用類似 databus 這種同步工具。
摘抄