考慮一個分佈式場景中一個常見的場景:服務A執行某個數據庫操做成功後,會發送一條消息到消息隊列,如今但願只有數據庫操做執行成功才發送這條消息。下面是一些常見的做法:數據庫
1. 先執行數據庫操做,再發送消息網絡
public void purchaseOrder() { orderDao.save(order); messageQueue.send(message); }
有可能order新增成功,發送消息失敗。最終造成不一致狀態。分佈式
2. 先發送消息,再執行數據庫操做spa
public void purchaseOrder() { messageQueue.send(message); orderDao.save(order); }
有可能消息發送成功,而order新增失敗,從而造成不一致狀態。code
3. 在數據庫事務中,先發送消息,再執行數據庫操做blog
@Transactional public void purchaseOrder() { messageQueue.send(message); orderDao.save(order); }
這裏一樣沒法保證一致性。若是數據庫操做成功,然而消息已經發送了,沒法進行回滾。隊列
4. 在數據庫事務中,先執行數據庫操做,再發送消息事務
@Transactional public void purchaseOrder() { orderDao.save(order); messageQueue.send(message); }
這種方案成功與否,取決於消息隊列是否擁有應答機制和事務機制。get
應答機制表示producer發送消息後,消息隊列可以返回response從而證實消息是否插入成功。kafka
若是消息隊列擁有應答機制,將上面的代碼改寫爲:
@Transactional public void purchaseOrder() { orderDao.save(order); try{ kafkaProducer.send(message).get(); } catch(Exception e) throw new RuntimeException("Fail to send message"); }
這段代碼表示若是發送發收到消息隊列錯誤的response,就拋出一個RuntimeException。那麼消息發送失敗,可以形成數據庫操做的回滾。這個方案看似可行,然而存在這樣一種狀況,若是消息發送成功,而消息隊列因爲網絡緣由沒有即時返回response,此時消息發送方因爲沒有及時收到應答從而認爲消息發送失敗了,所以消息發送方的數據庫事務回滾了,然而消息的確已經插入成功,從而形成了最終不一致性。
上面的不一致性能夠經過消息的事務機制解決。
事務機制表示消息隊列中的消息是否擁有狀態,從而決定消費者是否消費該條消息。
Alibaba旗下的開源消息隊列RocketMQ以高可用性聞名,它是最先支持事務消息的消息隊列。Kafka從版本0.11開始也支持了事務機制。
RoketMQ的事務機制是將消息標記爲Prepared狀態或者Confirmed狀態。處於Prepared狀態的消息對consumer不可見。
而Kafka經過Transaction Marker將消息標記爲Uncommited或Commited狀態。Consumer經過配置isolation-level
爲read_committed
或read_uncommitted
來決定對哪一種類型的消息可見。
5. 消息隊列不支持事務消息
若是消息隊列不支持事務消息,那麼咱們的解決方案是,新增一張message表,並開啓一個定時任務掃描這張message表,將全部狀態爲prepared的message發送給消息隊列,發送成功後,將message狀態置爲confirmed。
代碼以下:
@Transactional public void purchaseOrder() { orderDao.save(order); messageService.save(message); }
此時插入order和插入message的邏輯處於同一個數據庫事務,經過後臺的定時程序不斷掃描message表,所以必定可以保證消息被成功投遞到消息消費方。
這個方案存在的一個問題是,有可能後臺任務發送消息成功後宕機了,從而沒有來得及將已發送的message狀態置爲confirmed。所以下一次掃描message表時,會重複發送該條消息。這就是at least once delivery。
因爲at least once delivery的特性,consumer有可能收到重複的數據。此時能夠在consumer端創建一張message_consume表,來判斷消息是否已經消費過,若是已經消費過,那麼就直接丟棄該消息。