RocketMQ與MYSQL事務消息整合

一、基礎理論知識篇「兩階段提交」若是你瞭解能夠跳過這段,固然若是你想深刻了解你能夠購買相關書籍或去搜索相關資料閱讀

  兩階段提交分爲 正常提交和異常提交或異常回滾java

       上面是正常提交的示意圖,協調者發起預提交請求,參與者回覆成功以後協調者再次發起commit請求,統一提交事物。事物結束。mysql

       若是這兩階段提交過程中有任何一個請求出現異常就會回滾,以下流程:git

       異常請求包括預提交 返回預提交的應答,commit請求 等任何一個失敗都會致使整個事物回滾。github

  二階段提交的問題 
    二階段提交」還有一個很嚴重的問題就是若是commit過程中失敗了 就致使了所有事物失敗,代價很大,簡單粗暴的處理方式

         還有一個問題是若是 commit過程當中網絡出現問題 commit沒有被整個事物的參與者之一或者多個收到,這個時候就會出現數據不一致現象。spring

 
  可能你們會提到 協調者是誰,參與者又是誰那?

               這裏簡單說下本身的理解sql

         若是在你的應用程序中你是經過 begin等相關操做語句開始的,好比 你使用了spring的@Transactional註解等,mongodb

         那協調者就是你的「應用程序」,參與者就是 mysql或其餘支持事物的數據庫系統數據庫

         若是你就直接向mysql發送了一條sql語句mysql是自行提交的,那協調者和參與者都是mysql數據庫本身apache

二、這裏說下mysql對所謂的「重複數據」提供的相關sql或關鍵字。

       unique 惟一主鍵約束api

              在sql事物中和應用程序中均可以捕獲這個錯誤碼或異常,能夠做爲冪等判斷的一個依據。

       upset 操做,發現惟一主鍵衝忽然後更新相關數據,mongodb有直接使用的sql方法語句

              示例:insert into tablename(column1) values(123) on duplicate key update column1 =column1 +123

        ignore 忽略操做對於多餘的操做直接忽略

              insert ignore into tablename(column1)  values(123)

 

  基礎篇說完不少內容若是想深刻了解能夠本身找資料處理。下面是華麗分割線


三、在咱們原有的認知裏有一個方案就差那麼一點點就能夠大面積使用的。

       咱們以前可能想過怎樣既能發送mq又能寫數據庫,下面這個方案會分接近咱們的願望。

       咱們聽從以下步驟進行代碼處理:

       一、開啓數據庫事物執行相關sql

       二、發送MQ消息

       三、提交數據庫事物

       (注意:以上每一步都是在上一步執行成功以後在執行下一步的)

       根據步驟我畫出了下面的流程圖

 其實這個流程是有一個漏洞的,若是我把上面的流程圖改造爲下面的二階段提交的示意圖就會很明顯的看出來

        不知道你們有麼有發現問題,是否是 各類提交和回滾操做都是針對的數據庫,而不是MQ。commit數據庫事物出現異常就會形成數據不一致現象。

        其實也不用在想有沒有其餘的流程方案能解決分佈式雙寫問題,只要存在多寫問題就存在數據不一致問題的現象,

        因此就出現了3pc Paxos 等協議來解決分佈式事物/一致性的問題。

 

        下面咱們開始介紹怎麼使用mysql和RocketMQ來實現事物問題

         華麗分割線


四、RocketMQ事物消息的過程

       一、發送MQ的事物消息

       二、事物消息發送成功後會同步觸發對應執行本地接口來執行鍼對mysql數據庫的操做

       三、若是有未commit的消息,RocketMQ 的 broker會定時間隔時間來回查數據庫事物是否已經提交完成

五、結合RocketMQ的事物消息與Mysql數據庫事物的實現思想

  若是上面的二階段提交你已經理解了,你會發現我這裏設計的流程(上面圖的流程)有點不太同樣的地方

        什麼地方那?

        MQ事物消息回滾的時候是由於mysql數據庫事物沒有提交成功而致使的,也就是說若是mysql數據庫事務成功了MQ的事務消息是必定要成功的

        不然就會出現事物不一致的現象。

        假如發送MQ的prepare消息成功了,執行mysql事物的操做也成功了,可是恰恰返回給MQ的commit消息丟失了,那這個時候數據庫消息並不會回滾。

  因此就有了回查本地事物消息是否成功的操做,來對MQ的消息作個補償動做實現數據一致性

 

        理解了二階段提交以及RocketMQ的事物實現以後你就能夠本身設計事物相關操做的執行順序了

        (這裏的流程設計以及包括個人代碼實現是以個人理解作出的最佳實踐)

 六、RocketMQ與Mysql事物結合注意事項

       一、若是應用程序承擔協調者的工做就儘可能晚開啓事物和儘可能早的提交數據庫事物,事物中的sql對數據競爭多的sql儘可能靠後

            由於執行數據庫事物會有各類鎖操做,減小鎖的生命週期,數據庫是稀缺資源,你們能省則省

       二、數據庫事物最好設置超時時間,超時以後自動解除,最好不超過1分鐘

       三、MQ默認1分鐘以後回查一次已發送message但未commit的消息,最多回查15次,以後會執行回滾操做

       四、應用程序必定要作好冪等處理(能夠參考上面mysql相關語句實現冪等接口)

       五、網絡不要太差,不然會形成大量的重試,重試就會影響消息的及時性

       六、適用場景

                    單次請求數量小

                    每次請求會有數據產生,而不是查詢產生的數據(好比 insert操做叫生產數據,select操做不生產數據)

                    下游能夠接受必定的延遲(這裏有兩個因素,有應用程序自己和Broker,這裏指broker)

                    下游服務或系統以接收到的消息爲依據作相應的操做

                     MQ消息做爲主要信息傳遞的工具

 

         下面說下具體代碼實現

         華麗分割線


 

七、實戰代碼解析

       首先附上源碼地址 https://github.com/zygfengyuwuzu/springboot-rocketmq-example

       下面將針對關鍵代碼進行講解

       首先介紹一下代碼目錄

 

 

         瞭解了上面的代碼目錄下面說下代碼的執行流程


    首先看事物消息生產者的實例對象建立
package rocketmq_example.mqandmysqltraction.producer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * 生產者和消費者測試的時候記得注掉一中的一個以避免觀察不出效果
 * 
 */
@Component
public class TransactionProducer {
 static Logger logger = LoggerFactory.getLogger(TransactionProducer.class);

 public DefaultMQProducer producer = null;
 
 @Autowired
 TransactionListener transactionListenerImp;

 @PostConstruct
 private void init() throws MQClientException {
  logger.info("MQ事物生產者初始化開始--------------------------------------------------");
  TransactionMQProducer transactionProducer = new TransactionMQProducer("mytestgroup");
  // Producer 組名, 多個 Producer 若是屬於一 個應用,發送一樣的消息,則應該將它們 歸爲同一組
  //transactionProducer.setProducerGroup("mytestgroup");
  // Name Server 地址列表
  transactionProducer.setNamesrvAddr("10.10.6.71:9876;10.10.6.72:9876");
  // 超時時間  這裏必定要大於數據庫事物執行的超時時間
  transactionProducer.setSendMsgTimeout(90000);
  //這個線程池做用就是  mqbroker端回調信息的本地處理線程池
  ExecutorService executorService = new ThreadPoolExecutor(1, 5, 100, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
     @Override
     public Thread newThread(Runnable r) {
      Thread thread = new Thread(r);
      thread.setName("client-transaction-msg-check-thread");
      return thread;
     }
    });
  transactionProducer.setExecutorService(executorService);
  transactionProducer.setTransactionListener(transactionListenerImp);
  producer = transactionProducer;
  producer.start();
  logger.info("MQ事物生產者初始化結束--------------------------------------------------");
 }
 public SendResult send(Message me) throws Exception {
  return producer.send(me);
 }
 /**
  * 發送普通消息
  * @param Topic
  * @param Tags
  * @param body
  * @return
  * @throws Exception
  */
 public SendResult send(String Topic, String Tags, String body) throws Exception {
  Message me = new Message();
  // 標示
  me.setTopic(Topic);
  // 標籤
  me.setTags(Tags);
  // 內容
  me.setBody(body.getBytes(RemotingHelper.DEFAULT_CHARSET));
  return producer.send(me);
 }
 /**
  * 發送普通消息
  * @param Topic
  * @param Tags
  * @param key
  * @param body
  * @return
  * @throws Exception
  */
 public SendResult send(String Topic, String Tags, String key, String body) throws Exception {
  try {
   Message me = new Message(Topic, Tags, key, 0, body.getBytes(RemotingHelper.DEFAULT_CHARSET), true);
   return producer.send(me);
  } catch (Exception e) {
   logger.error("發送MQ信息異常Topic{},Tags{},key{},body{}", Topic, Tags, key, body);
   throw e;
  }
 }
 @PreDestroy
 public void Destroy() {
  producer.shutdown();
 }
}

  上面的代碼咱們接收到請求傳輸過來的數據以後,首先作了MQ消息對象的建立,建立成功以後直接發送MQ事物消息

  事物消息發送成功以後會調用上面設置的接口實現類的TransactionListenerImpl.executeLocalTransaction()這個方法。

  接口實現的方法代碼以下:

package rocketmq_example.mqandmysqltraction.producer;

import java.util.List;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import rocketmq_example.mqandmysqltraction.MyTableModel;
import rocketmq_example.mqandmysqltraction.MytableService;

/**
 * 把數據庫事物嵌套在mq事物當中不能顯示拋出異常
 * 
 * 
 * 
 * 
 * @author zyg
 *
 */
@Component
public class TransactionListenerImpl implements TransactionListener {

 static Logger logger = LoggerFactory.getLogger(TransactionListenerImpl.class);

 @Autowired
 MytableService mytableService;

 /**
  * 必定要設置執行sql時間,儘可能不要超時
  * 
  */
 @Override
 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  logger.info("開始執行本地數據庫事物  transactionid:{}", msg.getTransactionId());
  LocalTransactionState lts = LocalTransactionState.UNKNOW;
  @SuppressWarnings("unchecked")
  List<MyTableModel> mytablelist = (List<MyTableModel>) arg;
  try {
   long start=System.currentTimeMillis();
   //數據庫事物執行時間不要超過mq回查時間 默認15分鐘
   mytableService.execMytableinsert2(mytablelist, msg.getTransactionId());
   logger.info("執行數據庫事物耗時:{}",System.currentTimeMillis()-start);
   lts = LocalTransactionState.COMMIT_MESSAGE;
  } catch (Exception e) {
   logger.error("數據庫事務異常", e);
   lts = LocalTransactionState.ROLLBACK_MESSAGE;
  }
  logger.info("結束執行本地數據庫事物  transactionid:{} 返回:{}", msg.getTransactionId(),lts);
  return lts;
 }

 /**
  * 去數據庫查詢看看是否存在已經成功發送預提交數據而沒有commit成功的mq信息
  * 每分鐘1次默認15次
  * 
  * 這裏能夠作個計數 讓MQ重試5次/5分鐘就回滾減輕MQ回查的壓力
  * 
  */
 @Override
 public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  if (mytableService.existMyTableModelByMsgid(msg.getTransactionId())) {
   logger.info("查詢到已提交事物 transactionid:{}",msg.getTransactionId());
   return LocalTransactionState.COMMIT_MESSAGE;
  } else {
   logger.info("未查到已提交事物 transactionid:{}",msg.getTransactionId());
   return LocalTransactionState.UNKNOW;
  }

 }

}

     上面代碼有兩個方法,這裏說下兩個方法的做用和執行時間

             executeLocalTransaction這個方法是發送完 事物消息 以後同步被調用到的方法,用來執行本地事物操做

             executeLocalTransaction方法有兩個參數,第一個是發送成功以後的message消息,在這個方法中包含事物ID其實就是msgid

             第二個參數是object類型的是從dataapi傳過來,

             個人代碼中沒作任何處理直接傳遞過來了而後直接轉化傳遞給了service層進行事物處理

             這個executeLocalTransaction方法裏面爲何要直接返回commit或rollback,

             目的是儘可能快的告訴MQ個人數據庫事務執行成功了,

             儘快將half消息轉爲正常消息,已備消費者消費到作業務處理。

             這裏徹底能夠直接返回unknow,等待broker回查來實現commit操做的。可是這樣作對回查消息broker形成必定的壓力。

      上面代碼的第二個方法是提供給broker回調執行的,進行檢查本地事務是否成功執行的操做,發起方是broker

             這裏面咱們接收到broker的回查請求以後直接去數據庫查詢是否存在broker提供的事務ID的數據

             若是存在返回commit標識,若是不存在返回unknow標識以等待下一次再來回查

      到此咱們的一個事務操做就算完成了


    另外你們能夠直接查看service層的實現代碼,就不一一解釋了
package rocketmq_example.mqandmysqltraction;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class MytableService {
 static Logger logger = LoggerFactory.getLogger(MytableService.class);

 @Autowired
 IMytableMapper mytable;

 @Autowired
 ObjectMapper objMapper;

 /**
  * 這裏能夠顯示提交事物 返回boolean 一條一條插入只是爲了展示事物的特性 獲取全部異常 處理你的業務邏輯等等
  * 
  * @param mytablemodels
  * @return
  */
 @Transactional(rollbackFor = Exception.class, timeout = 60000)
 public List<Integer> execMytableinsert2(List<MyTableModel> mytablemodels, String msgid) {

  // logger.info("開始執行數據庫事物");
  List<Integer> result = new ArrayList<Integer>();
  for (MyTableModel myTableModel : mytablemodels) {
   // 插入數據庫
   myTableModel.setMsgid(msgid);
   mytable.insertmytable(myTableModel);
   result.add(myTableModel.getId());
  }
  // logger.info("結束執行數據庫事物");
  return result;
 }

 public boolean existMyTableModelById(Integer id) {
  MyTableModel myTableModel = mytable.selectMyTableModelById(id);
  if (myTableModel != null && null != myTableModel.getId()) {
   return true;
  }
  return false;
 }

 /**
  * 查詢是否存在已經發送過的msgid消息
  * 
  * @param msgid
  * @return
  */
 public boolean existMyTableModelByMsgid(String msgid) {
  int count = mytable.selectMyTableModelByMsgid(msgid);
  if (count > 0) {
   return true;
  }
  return false;
 }

 public void insetmsg(MyTableModel mytablemodel) {
  try {
   mytable.insertmsgrecord(mytablemodel);

  } catch (org.springframework.dao.DuplicateKeyException e) {
   logger.error("主鍵衝突異常被捕獲",e);
  }
 }
}

很是感謝你能看到這裏!!!看到這裏相信你已經對本篇博客的內容有所瞭解了!若是有什麼問題或者想不通的地方歡迎評論區進行討論。

若是有不正確的地方懇請指正

 

相關文章
相關標籤/搜索