建議簡單看看上一篇文章再往下閱讀java
咱們的項目就基於這個模型:git
接下來就到了咱們的實戰時刻~github
項目基於spring cloud編寫,沒有spring cloud基礎看起來可能有一點點費力。web
package com.anur.messageapi.api; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import java.util.Map; /** * Created by Anur IjuoKaruKas on 2018/5/8 */ public interface TransactionMsgApi { /** * 預發送消息,先將消息保存到消息中心 */ @RequestMapping(value = "prepare", method = RequestMethod.GET) int prepareMsg( @RequestParam("id") String id, @RequestParam("msg") String msg, @RequestParam("routingKey") String routingKey, @RequestParam("exchange") String exchange, @RequestParam("paramMap") String paramMap, @RequestParam("artist") String artist); /** * 生產者確認消息可投遞 */ @RequestMapping(value = "confirm", method = RequestMethod.GET) int confirmMsgToSend(@RequestParam("id") String id, @RequestParam("caller") String caller); /** * 向隊列投遞消息 */ @RequestMapping(value = "send", method = RequestMethod.GET) void sendMsg(@RequestParam("id") String id); /** * 消費者確認消費成功 */ @RequestMapping(value = "ack", method = RequestMethod.GET) int acknowledgement(@RequestParam("id") String id, @RequestParam("artist") String artist); }
咱們先忽略後面的兩個接口,先看第一個,一共有六個參數spring
咱們首先生成一條消息,咱們往paramMap中指定了,咱們這個訂單的訂單id是orderId,消息內容我瞎寫的,這條消息要保存到數據庫(它的做用是保證消息必定被可靠消息接收並持久化)數據庫
String routingKey = "test.key.testing"; Map<String, String> map = new HashMap<>(); String orderId = UUID.randomUUID().toString() + System.currentTimeMillis(); map.put("id", orderId); String mapStr = JSON.toJSONString(map); TestMsg testMsg = new TestMsg(); testMsg.setContent("這是一條測試消息"); String testMsgStr = JSON.toJSONString(testMsg); // =============================== // 要保存到數據庫(它的做用是保證消息必定被可靠消息接收並持久化) PrepareMsg prepareMsg = prepareMsgService.genMsg(orderId, testMsgStr, routingKey, Constant.TEST_EXCHANGE, mapStr);
異步發送了一條**【預發送】**消息給消息可靠消息服務api
Future<Integer> future = prepareMsgService.prepareMsg(prepareMsg); // 下面是prepareMsg的實現 @Async @Override public Future<Integer> prepareMsg(PrepareMsg prepareMsg) { // 調用咱們剛纔在【準備階段】定義的接口 int result = transactionMsgService.prepareMsg(prepareMsg.getId(), prepareMsg.getMsg(), prepareMsg.getRoutingKey(), prepareMsg.getExchange(), prepareMsg.getParamMap(), artistConfiguration.getArtist()); // 若是調用成功,刪除剛纔本地保存的數據庫 if (result == 1) { prepareMsgMapper.deleteByPrimaryKey(prepareMsg.getId()); } return new AsyncResult<>(result); }
你能夠把下面那些想象成處理訂單狀態,上面的這個步驟是有事務的,也就是說:app
///////////// 事務 ProviderOrder providerOrder = new ProviderOrder(); providerOrder.setId(orderId); providerOrderService.save(providerOrder); ///////////// 事務
// 確認消息能夠被髮送 if (future.get() == 1) { prepareMsgService.confirmMsgToSend(orderId, this.getClass().getSimpleName()); }
######一、執行成功,但沒有及時向可靠消息服務發送通知。dom
這時候咱們的artist和paramMap就發揮做用了,咱們的可靠消息服務,能夠拿着這兩個東西,定時向生產者查詢那些沒有被標記爲【待發送】的消息。好比說這樣:異步
// 這裏是可靠消息服務 String url = String.format("http://%s/check?", transactionMsg.getCreater()); Map<String, String> paramMap = JSON.parseObject(transactionMsg.getParamMap(), new TypeReference<HashMap<String, String>>() { }); StringBuilder sb = new StringBuilder(); for (Map.Entry<String, String> stringStringEntry : paramMap.entrySet()) { sb.append(stringStringEntry.getKey()).append("=").append(stringStringEntry.getValue()).append("&"); } sb.deleteCharAt(sb.length() - 1); // 結果爲true,表明這條消息的業務執行成功了,可自助將消息狀態標記爲【待發送】 // 反之執行失敗 resultBoolean = restTemplate.getForObject(url + sb, boolean.class);
######二、執行失敗,也沒有及時向可靠消息服務發送通知。
這個狀況並不影響,由於可靠消息服務會回查,發現消息沒有執行成功,不會將消息投遞出去。
這裏要注意,每條消息最好設置一個查詢次數的限制
######三、預發送失敗,業務執行成功
這時候咱們在第一步事先存儲的消息就發揮做用了,這裏只要寫一個定時任務,向可靠消息服務定時投遞便可。這裏要注意可靠消息服務的冪等性。
因爲消息id是由生產者指定,因此即便可靠消息服務收到了重複的建立【預發送】的消息,插入數據庫也是會失敗的。
@Scheduled(cron = "*/1 * * * * *") public void checkPrepareMsg() { List<PrepareMsg> prepareMsgList = prepareMsgService.getUnConfirmList(); if (prepareMsgList.size() > 0) { System.out.println("消息重發中"); } for (PrepareMsg prepareMsg : prepareMsgList) { prepareMsgService.prepareMsg(prepareMsg); } }
Github -- > 可靠消息服務 example