本篇主要是分佈式工程中,有些跨數據庫操做的使用樣例,通常可用在分佈式事務上。java
MQ的做用,固然有扛洪峯,消息堆集,異步處理的做用。spring
第一步:添加POM的依賴,版本固然由你本身選擇數據庫
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency>
第二步,消息生產者。json
package com.xxx.consumer.mq; import com.alibaba.rocketmq.client.QueryResult; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.*; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; import java.util.Map; @Component public class MQProducer { private final String GROUP_NAME = "xxxx"; private final String NAMESRV_ADDR = "xxx.xxx.xxx.xxx:9876"; private TransactionMQProducer producer; public MQProducer() { this.producer = new TransactionMQProducer(GROUP_NAME); this.producer.setNamesrvAddr(NAMESRV_ADDR); //nameserver服務 this.producer.setCheckThreadPoolMinSize(5); // 事務回查最小併發數 this.producer.setCheckThreadPoolMaxSize(20); // 事務回查最大併發數 this.producer.setCheckRequestHoldMax(2000); // 隊列數 //服務器回調Producer,檢查本地事務分支成功仍是失敗 this.producer.setTransactionCheckListener(new TransactionCheckListener() { public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("state -- "+ new String(msg.getBody())); return LocalTransactionState.COMMIT_MESSAGE; } }); try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws Exception { return this.producer.queryMessage(topic, key, maxNum, begin, end); } public LocalTransactionState check(MessageExt me){ LocalTransactionState ls = this.producer.getTransactionCheckListener().checkLocalTransactionState(me); return ls; } public void sendTransactionMessage(Message message, LocalTransactionExecuter localTransactionExecuter, Map<String, Object> transactionMapArgs) throws Exception { TransactionSendResult tsr = this.producer.sendMessageInTransaction(message, localTransactionExecuter, transactionMapArgs); System.out.println("send返回內容:" + tsr.toString()); } public void shutdown(){ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { producer.shutdown(); } })); System.exit(0); } }
組名和地址固然根據你的實際狀況寫。springboot
第三步,寫一個你要執行的方法,好比你的本項目的一次數據庫執行,或者其餘業務代碼。我這裏要執行的是保存我的信息。服務器
personInfoService.savePersonalInfo(userid, workClass,workCity);
全代碼以下。併發
package com.xxx.consumer.mq; import com.alibaba.dubbo.config.annotation.Reference; import com.alibaba.fastjson.JSONObject; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; import com.wmq.stub.PersonInfoService; import org.springframework.stereotype.Component; import java.util.Map; /** * 執行本地事務,由客戶端回調 */ //@Scope("prototype") @Component("transactionExecuterImpl") public class TransactionExecuterImpl implements LocalTransactionExecuter { @Reference(version = "1.0.0") private PersonInfoService personInfoService; public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { try { //Message Body JSONObject messageBody = FastJsonConvert.convertJSONToObject(new String(msg.getBody(), "utf-8"), JSONObject.class); //Transaction MapArgs Map<String, Object> mapArgs = (Map<String, Object>) arg; // --------------------IN PUT---------------------- // System.out.println("message body = " + messageBody); System.out.println("message mapArgs = " + mapArgs); System.out.println("message tag = " + msg.getTags()); // --------------------IN PUT---------------------- // long userid = messageBody.getLong("userid"); String face = messageBody.getString("face"); //頭像 String trueName = messageBody.getString("truename"); //姓名 int gender = messageBody.getInteger("gender"); //性別 int workClass = messageBody.getInteger("workClass"); //指望職位ID int workCity = messageBody.getInteger("workCity"); personInfoService.savePersonalInfo(userid, workClass,workCity); //成功通知MQ消息變動 該消息變爲:<確認發送> return LocalTransactionState.COMMIT_MESSAGE; //return LocalTransactionState.UNKNOW; } catch (Exception e) { e.printStackTrace(); //失敗則不通知MQ 該消息一直處於:<暫緩發送> return LocalTransactionState.ROLLBACK_MESSAGE; } } }
注:由於我這裏用的是springboot的dubbo框架,因此框架
@Reference(version = "1.0.0")dom
private PersonInfoService personInfoService;是@Reference而不是@AutoWired異步
可改爲大家本身的注入。事務消息的本質在這段代碼中能夠很清楚,發送一條消息出去,而後判斷該事務是否執行成功,若成功,通知消息能夠發送給消費者,不然該消息暫緩發送。
第四步,在交互代碼裏面調用
好比某一個controller或者其餘地方(。。。。。。。爲大家本身的獲取數據來源的代碼)
try { long userid = 。。。。。。。 String face = 。。。。。。。 String trueName = 。。。。。。。。 int gender = 。。。。。。。。 int workClass = 。。。。。。。。 int workCity = 。。。。。。。。。 //構造消息數據 Message message = new Message(); //主題 message.setTopic("user"); //子標籤 message.setTags("tag"); //key String uuid = UUID.randomUUID().toString(); message.setKeys(uuid); JSONObject body = new JSONObject(); body.put("userid", userid); body.put("face", face); body.put("truename", trueName); body.put("gender", gender); body.put("workClass", workClass); body.put("workCity", workCity); message.setBody(FastJsonConvert.convertObjectToJSON(body).getBytes()); //添加參數 Map<String, Object> transactionMapArgs = new HashMap<String, Object>(); this.mQProducer.sendTransactionMessage(message, this.transactionExecuterImpl, transactionMapArgs); } catch (Exception e) { e.printStackTrace(); }
這裏主要是在消息體中獲取參數以及發送消息,這裏的消息要等待事務執行成功才能被消費者得到。比較重要的地方就是消息的主題 message.setTopic("user");注意,生產者和消費者的主題必須相同,不然消費者是拿不到消息的,至於主題是什麼能夠本身定義。
第五步,消費者
消費者通常是寫在分佈式的另一個工程裏面的,並且是不一樣的數據庫,這一點比較重要,由於同一工程用MQ的意義不大(泄洪除外)。
package com.xxx.xxx.mq; import com.alibaba.fastjson.JSONObject; import com.alibaba.rocketmq.client.QueryResult; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.xxx.model.User; import com.xxx.stub.authentication.UpdateUserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; @Component public class MQConsumer { private final String GROUP_NAME = "xxxxxxxx"; private final String NAMESRV_ADDR = "xxx.xxx.xxx.xxx:9876"; private DefaultMQPushConsumer consumer; @Autowired private UpdateUserService updateUserService; public MQConsumer() { try { this.consumer = new DefaultMQPushConsumer(GROUP_NAME); this.consumer.setNamesrvAddr(NAMESRV_ADDR); this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); this.consumer.subscribe("user", "*"); this.consumer.registerMessageListener(new Listener()); this.consumer.start(); System.out.println("consumer start"); } catch (Exception e) { e.printStackTrace(); } } public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws Exception { long current = System.currentTimeMillis(); return this.consumer.queryMessage(topic, key, maxNum, begin, end); } class Listener implements MessageListenerConcurrently { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); try { String topic = msg.getTopic(); //Message Body JSONObject messageBody = FastJsonConvert.convertJSONToObject(new String(msg.getBody(), "utf-8"), JSONObject.class); String tags = msg.getTags(); String keys = msg.getKeys(); System.out.println("服務收到消息, keys : " + keys + ", body : " + new String(msg.getBody(), "utf-8")); long userid = messageBody.getLong("userid"); String face = messageBody.getString("face"); //頭像 String trueName = messageBody.getString("truename"); //姓名 int gender = messageBody.getInteger("gender"); //性別 int workClass = messageBody.getInteger("workClass"); //指望職位ID int workCity = messageBody.getInteger("workCity"); User user = new User(); user.setUserId(userid); user.setFace(face); user.setTruename(trueName); user.setGender(gender); user.setStep(4); updateUserService.updateUserInfo(user); } catch (Exception e) { e.printStackTrace(); //重試次數爲3狀況 if(msg.getReconsumeTimes() == 3){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //記錄日誌 } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } }
Listener是一個偵聽類,專門負責偵聽broker的消息是否到達,而且取回this.consumer.subscribe("user", "*");這個主題的消息。取得消息後執行業務代碼updateUserService.updateUserInfo(user);
通常來講,請嚴格測試該業務代碼,由於rocketmq若是這段業務代碼失敗,消息是會從新發送給消費者,從新執行這段代碼直到成功的,以此到達事務的最終一致性,因此你的這段代碼決不能自己就是有Bug的,固然咱們通常會處理3次,並不讓他不停的處理,若是仍是失敗則記錄日誌,咱們須要查看日誌來解決。