分佈式事務柔性事務解決方案:可靠消息最終一致性(異步確保型) —— 3、生產者實戰

建議簡單看看上一篇文章再往下閱讀java

咱們的項目就基於這個模型:git

這裏寫圖片描述

接下來就到了咱們的實戰時刻~github

項目基於spring cloud編寫,沒有spring cloud基礎看起來可能有一點點費力。web

準備階段:定義可靠消息接口

package com.anur.messageapi.api;


import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;

import java.util.Map;

/**
 * Created by Anur IjuoKaruKas on 2018/5/8
 */
public interface TransactionMsgApi {

    /**
     * 預發送消息,先將消息保存到消息中心
     */
    @RequestMapping(value = "prepare", method = RequestMethod.GET)
    int prepareMsg(
            @RequestParam("id") String id,
            @RequestParam("msg") String msg,
            @RequestParam("routingKey") String routingKey,
            @RequestParam("exchange") String exchange,
            @RequestParam("paramMap") String paramMap,
            @RequestParam("artist") String artist);

    /**
     * 生產者確認消息可投遞
     */
    @RequestMapping(value = "confirm", method = RequestMethod.GET)
    int confirmMsgToSend(@RequestParam("id") String id, @RequestParam("caller") String caller);

    /**
     * 向隊列投遞消息
     */
    @RequestMapping(value = "send", method = RequestMethod.GET)
    void sendMsg(@RequestParam("id") String id);

    /**
     * 消費者確認消費成功
     */
    @RequestMapping(value = "ack", method = RequestMethod.GET)
    int acknowledgement(@RequestParam("id") String id,
                        @RequestParam("artist") String artist);
}

咱們先忽略後面的兩個接口,先看第一個,一共有六個參數spring

  • id:消息的id,這個設計其實很自由,能夠在可靠消息服務中生成,也能夠在生產者端生成,本項目選擇在生產者端生成。
  • msg:消息的主體,能夠是普通的字符串,也能夠是對象
  • routingKey:路由鍵,發送消息時用(不懂的能夠去看看MQ基礎
  • exchange:交換器,發送消息時用(不懂的能夠去看看MQ基礎
  • paramMap:可靠消息服務回查時用,好比說我一個消息發送到可靠消息服務,結果沒確認,可靠消息服務就根據這個paramMap進行消息的回查,向生產者查詢這個業務到底執行成功了沒。
  • artist:回調(回查)地址,在springCloud中,其實就是serverName

具體場景解析:訂單服務

1、建立預發送消息,並將其保存到數據庫

咱們首先生成一條消息,咱們往paramMap中指定了,咱們這個訂單的訂單id是orderId,消息內容我瞎寫的,這條消息要保存到數據庫(它的做用是保證消息必定被可靠消息接收並持久化)數據庫

String routingKey = "test.key.testing";
        Map<String, String> map = new HashMap<>();

        String orderId = UUID.randomUUID().toString() + System.currentTimeMillis();
        map.put("id", orderId);
        String mapStr = JSON.toJSONString(map);

        TestMsg testMsg = new TestMsg();
        testMsg.setContent("這是一條測試消息");
        String testMsgStr = JSON.toJSONString(testMsg);
        // ===============================

		// 要保存到數據庫(它的做用是保證消息必定被可靠消息接收並持久化)
        PrepareMsg prepareMsg = prepareMsgService.genMsg(orderId, testMsgStr, routingKey, Constant.TEST_EXCHANGE, mapStr);

2、異步發送這條消息,將其標記爲預發送

異步發送了一條**【預發送】**消息給消息可靠消息服務api

Future<Integer> future = prepareMsgService.prepareMsg(prepareMsg);


// 下面是prepareMsg的實現

    @Async
    @Override
    public Future<Integer> prepareMsg(PrepareMsg prepareMsg) {
    // 調用咱們剛纔在【準備階段】定義的接口
        int result = transactionMsgService.prepareMsg(prepareMsg.getId(), prepareMsg.getMsg(), prepareMsg.getRoutingKey(), prepareMsg.getExchange(), prepareMsg.getParamMap(), artistConfiguration.getArtist());
    // 若是調用成功,刪除剛纔本地保存的數據庫
        if (result == 1) {
            prepareMsgMapper.deleteByPrimaryKey(prepareMsg.getId());
        }
        return new AsyncResult<>(result);
    }

3、執行業務

你能夠把下面那些想象成處理訂單狀態,上面的這個步驟是有事務的,也就是說:app

  • 若是執行失敗,咱們的可靠消息服務只會收到一條預發送的消息,保證了操做的原子性。
  • 或者執行成功,但沒有及時向可靠消息服務發送,這種狀況往下看,先忽略它。
///////////// 事務
        ProviderOrder providerOrder = new ProviderOrder();
        providerOrder.setId(orderId);
        providerOrderService.save(providerOrder);
        ///////////// 事務

4、異步告知可靠消息服務,業務處理成功,將剛纔預發送的消息標記爲待發送

// 確認消息能夠被髮送
        if (future.get() == 1) {
            prepareMsgService.confirmMsgToSend(orderId, this.getClass().getSimpleName());
        }

Extra、異常狀況

######一、執行成功,但沒有及時向可靠消息服務發送通知。dom

這時候咱們的artist和paramMap就發揮做用了,咱們的可靠消息服務,能夠拿着這兩個東西,定時向生產者查詢那些沒有被標記爲【待發送】的消息。好比說這樣:異步

// 這裏是可靠消息服務
        String url = String.format("http://%s/check?", transactionMsg.getCreater());
        Map<String, String> paramMap = JSON.parseObject(transactionMsg.getParamMap(), new TypeReference<HashMap<String, String>>() {
        });

        StringBuilder sb = new StringBuilder();

        for (Map.Entry<String, String> stringStringEntry : paramMap.entrySet()) {
            sb.append(stringStringEntry.getKey()).append("=").append(stringStringEntry.getValue()).append("&");
        }

        sb.deleteCharAt(sb.length() - 1);
        
        // 結果爲true,表明這條消息的業務執行成功了,可自助將消息狀態標記爲【待發送】
        // 反之執行失敗
        resultBoolean = restTemplate.getForObject(url + sb, boolean.class);

######二、執行失敗,也沒有及時向可靠消息服務發送通知。

這個狀況並不影響,由於可靠消息服務會回查,發現消息沒有執行成功,不會將消息投遞出去。

這裏要注意,每條消息最好設置一個查詢次數的限制

######三、預發送失敗,業務執行成功

這時候咱們在第一步事先存儲的消息就發揮做用了,這裏只要寫一個定時任務,向可靠消息服務定時投遞便可。這裏要注意可靠消息服務的冪等性。

因爲消息id是由生產者指定,因此即便可靠消息服務收到了重複的建立【預發送】的消息,插入數據庫也是會失敗的。

@Scheduled(cron = "*/1 * * * * *")
    public void checkPrepareMsg() {
        List<PrepareMsg> prepareMsgList = prepareMsgService.getUnConfirmList();
        if (prepareMsgList.size() > 0) {
            System.out.println("消息重發中");
        }
        for (PrepareMsg prepareMsg : prepareMsgList) {
            prepareMsgService.prepareMsg(prepareMsg);
        }
    }

Github -- > 可靠消息服務 example

相關文章
相關標籤/搜索