RocketMQ事務消息代碼樣例

本篇主要是分佈式工程中,有些跨數據庫操做的使用樣例,通常可用在分佈式事務上。java

MQ的做用,固然有扛洪峯,消息堆集,異步處理的做用。spring

第一步:添加POM的依賴,版本固然由你本身選擇數據庫

<dependency>
   <groupId>com.alibaba.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>3.2.6</version>
</dependency>

第二步,消息生產者。json

package com.xxx.consumer.mq;

import com.alibaba.rocketmq.client.QueryResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.*;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class MQProducer {
   
   private final String GROUP_NAME = "xxxx";
   private final String NAMESRV_ADDR = "xxx.xxx.xxx.xxx:9876";
   private TransactionMQProducer producer;
   
   public MQProducer() {
      
      this.producer = new TransactionMQProducer(GROUP_NAME);
      this.producer.setNamesrvAddr(NAMESRV_ADDR);    //nameserver服務
      this.producer.setCheckThreadPoolMinSize(5);    // 事務回查最小併發數
      this.producer.setCheckThreadPoolMaxSize(20);   // 事務回查最大併發數
      this.producer.setCheckRequestHoldMax(2000);    // 隊列數
      //服務器回調Producer,檢查本地事務分支成功仍是失敗
      this.producer.setTransactionCheckListener(new TransactionCheckListener() {
         public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("state -- "+ new String(msg.getBody()));
            return LocalTransactionState.COMMIT_MESSAGE;
         }
      });
      try {
         this.producer.start();
      } catch (MQClientException e) {
         e.printStackTrace();
      }  
   }
   
   public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws Exception {
      return this.producer.queryMessage(topic, key, maxNum, begin, end);
   }
   
   public LocalTransactionState check(MessageExt me){
      LocalTransactionState ls = this.producer.getTransactionCheckListener().checkLocalTransactionState(me);
      return ls;
   }
   
   public void sendTransactionMessage(Message message, LocalTransactionExecuter localTransactionExecuter, Map<String, Object> transactionMapArgs) throws Exception {
      TransactionSendResult tsr = this.producer.sendMessageInTransaction(message, localTransactionExecuter, transactionMapArgs);
      System.out.println("send返回內容:" + tsr.toString());
   }
   
   public void shutdown(){
      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
         public void run() {
            producer.shutdown();
         }
      }));
      System.exit(0);
   }


}

組名和地址固然根據你的實際狀況寫。springboot

第三步,寫一個你要執行的方法,好比你的本項目的一次數據庫執行,或者其餘業務代碼。我這裏要執行的是保存我的信息。服務器

personInfoService.savePersonalInfo(userid, workClass,workCity);

全代碼以下。併發

package com.xxx.consumer.mq;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message;
import com.wmq.stub.PersonInfoService;
import org.springframework.stereotype.Component;

import java.util.Map;


/**
 * 執行本地事務,由客戶端回調
 */

//@Scope("prototype")
@Component("transactionExecuterImpl")
public class TransactionExecuterImpl implements LocalTransactionExecuter {

   @Reference(version = "1.0.0")
   private PersonInfoService personInfoService;
   
   public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
      try {
         //Message Body
         JSONObject messageBody = FastJsonConvert.convertJSONToObject(new String(msg.getBody(), "utf-8"), JSONObject.class);
         //Transaction MapArgs
         Map<String, Object> mapArgs = (Map<String, Object>) arg;
         
         // --------------------IN PUT---------------------- //
         System.out.println("message body = " + messageBody);
         System.out.println("message mapArgs = " + mapArgs);
         System.out.println("message tag = " + msg.getTags());
         // --------------------IN PUT---------------------- //
         long userid = messageBody.getLong("userid");
         String face = messageBody.getString("face"); //頭像
         String trueName = messageBody.getString("truename"); //姓名
         int gender = messageBody.getInteger("gender"); //性別
         int workClass = messageBody.getInteger("workClass"); //指望職位ID
         int workCity = messageBody.getInteger("workCity");
         personInfoService.savePersonalInfo(userid, workClass,workCity);
         //成功通知MQ消息變動 該消息變爲:<確認發送>
         
         return LocalTransactionState.COMMIT_MESSAGE;
         
         //return LocalTransactionState.UNKNOW;
         
      } catch (Exception e) {
         e.printStackTrace();
         //失敗則不通知MQ 該消息一直處於:<暫緩發送>
         return LocalTransactionState.ROLLBACK_MESSAGE;
         
      }
      
   }
}

注:由於我這裏用的是springboot的dubbo框架,因此框架

@Reference(version = "1.0.0")dom

private PersonInfoService personInfoService;是@Reference而不是@AutoWired異步

可改爲大家本身的注入。事務消息的本質在這段代碼中能夠很清楚,發送一條消息出去,而後判斷該事務是否執行成功,若成功,通知消息能夠發送給消費者,不然該消息暫緩發送。

第四步,在交互代碼裏面調用

好比某一個controller或者其餘地方(。。。。。。。爲大家本身的獲取數據來源的代碼)

try {
    long userid = 。。。。。。。
    String face = 。。。。。。。
    String trueName = 。。。。。。。。
    int gender = 。。。。。。。。
    int workClass = 。。。。。。。。
    int workCity = 。。。。。。。。。
    //構造消息數據
    Message message = new Message();
    //主題
    message.setTopic("user");
    //子標籤
    message.setTags("tag");
    //key
    String uuid = UUID.randomUUID().toString();
    message.setKeys(uuid);
    JSONObject body = new JSONObject();
    body.put("userid", userid);
    body.put("face", face);
    body.put("truename", trueName);
    body.put("gender", gender);
    body.put("workClass", workClass);
    body.put("workCity", workCity);
    message.setBody(FastJsonConvert.convertObjectToJSON(body).getBytes());

    //添加參數
    Map<String, Object> transactionMapArgs = new HashMap<String, Object>();
    this.mQProducer.sendTransactionMessage(message, this.transactionExecuterImpl, transactionMapArgs);

} catch (Exception e) {
    e.printStackTrace();
}

這裏主要是在消息體中獲取參數以及發送消息,這裏的消息要等待事務執行成功才能被消費者得到。比較重要的地方就是消息的主題 message.setTopic("user");注意,生產者和消費者的主題必須相同,不然消費者是拿不到消息的,至於主題是什麼能夠本身定義。

第五步,消費者

消費者通常是寫在分佈式的另一個工程裏面的,並且是不一樣的數據庫,這一點比較重要,由於同一工程用MQ的意義不大(泄洪除外)。

package com.xxx.xxx.mq;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.QueryResult;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.xxx.model.User;
import com.xxx.stub.authentication.UpdateUserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class MQConsumer {
   
   private final String GROUP_NAME = "xxxxxxxx";
   private final String NAMESRV_ADDR = "xxx.xxx.xxx.xxx:9876";
   private DefaultMQPushConsumer consumer;
   
   @Autowired
   private UpdateUserService updateUserService;
   
   
   public MQConsumer() {
      try {
         this.consumer = new DefaultMQPushConsumer(GROUP_NAME);
         this.consumer.setNamesrvAddr(NAMESRV_ADDR);
         this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
         this.consumer.subscribe("user", "*");
         this.consumer.registerMessageListener(new Listener());
         this.consumer.start();
         System.out.println("consumer start");
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

   public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws Exception {
      long current = System.currentTimeMillis();
      return this.consumer.queryMessage(topic, key, maxNum, begin, end);
   }
   
   class Listener implements MessageListenerConcurrently {
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
         MessageExt msg = msgs.get(0);
         try {
            String topic = msg.getTopic();
            //Message Body
            JSONObject messageBody = FastJsonConvert.convertJSONToObject(new String(msg.getBody(), "utf-8"), JSONObject.class);
            String tags = msg.getTags();
            String keys = msg.getKeys();
            
            System.out.println("服務收到消息, keys : " + keys + ", body : " + new String(msg.getBody(), "utf-8"));
            long userid = messageBody.getLong("userid");
            String face = messageBody.getString("face"); //頭像
            String trueName = messageBody.getString("truename"); //姓名
            int gender = messageBody.getInteger("gender"); //性別
            int workClass = messageBody.getInteger("workClass"); //指望職位ID
            int workCity = messageBody.getInteger("workCity");
            User user = new User();
            user.setUserId(userid);
            user.setFace(face);
            user.setTruename(trueName);
            user.setGender(gender);
            user.setStep(4);
            updateUserService.updateUserInfo(user);
            
            
            
         } catch (Exception e) {    
            e.printStackTrace();
            //重試次數爲3狀況 
            if(msg.getReconsumeTimes() == 3){
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               //記錄日誌
            }
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }        
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
   }

}

Listener是一個偵聽類,專門負責偵聽broker的消息是否到達,而且取回this.consumer.subscribe("user", "*");這個主題的消息。取得消息後執行業務代碼updateUserService.updateUserInfo(user);

通常來講,請嚴格測試該業務代碼,由於rocketmq若是這段業務代碼失敗,消息是會從新發送給消費者,從新執行這段代碼直到成功的,以此到達事務的最終一致性,因此你的這段代碼決不能自己就是有Bug的,固然咱們通常會處理3次,並不讓他不停的處理,若是仍是失敗則記錄日誌,咱們須要查看日誌來解決。

相關文章
相關標籤/搜索