若是要保證順序消費,那麼他的核心點就是:生產者有序存儲
、消費者有序消費
。java
無序消息
無序消息也指普通的消息,Producer 只管發送消息,Consumer 只管接收消息,至於消息和消息之間的順序並無保證。網絡
舉例
Producer 依次發送 orderId 爲 一、二、3 的消息,Consumer 接到的消息順序有多是 一、二、3,也有多是 二、一、3 等狀況,這就是普通消息。併發
對於指定的一個 Topic,全部消息按照嚴格的先入先出(FIFO)的順序進行發佈和消費。app
舉例
好比 Producer 發送orderId 1,3,2 的消息, 那麼 Consumer 也必需要按照 1,3,2 的順序進行消費。異步
在實際開發有些場景中,我並不須要消息徹底按照徹底按的先進先出,而是某些消息保證先進先出就能夠了。分佈式
就比如一個訂單涉及 訂單生成
,訂單支付
、訂單完成
。我不用管其它的訂單,只保證一樣訂單ID能保證這個順序
就能夠了。ide
咱們知道 生產的message最終會存放在Queue中,若是一個Topic關聯了16個Queue,若是咱們不指定消息往哪一個隊列裏放,那麼默認是平均分配消息到16個queue,函數
比如有100條消息,那麼這100條消息會平均分配在這16個Queue上,那麼每一個Queue大概放5~6個左右。這裏有一點很重的是:測試
同一個queue,存儲在裏面的message 是按照先進先出的原則
線程
這個時候思路就來了,比如有orderId=1的3條消息,分別是 訂單生產、訂單付款、訂單完成。只要保證它們放到同一個Queue那就保證消費者先進先出了。
這就保證局部順序了,即同一訂單按照前後順序放到同一Queue,那麼取消息的時候就能夠保證先進先取出。
那麼全局消息呢?
這個就簡單啦,你把全部消息都放在一個Queue裏,這樣不就保證全局消息了。
就這麼簡單
固然不是,這裏還有很關鍵的一點,比如在一個消費者集羣的狀況下,消費者1先去Queue拿消息,它拿到了 訂單生成,它拿完後,消費者2去queue拿到的是 訂單支付。
拿的順序是沒毛病了,但關鍵是先拿到不表明先消費完它。會存在雖然你消費者1先拿到訂單生成,但因爲網絡等緣由,消費者2比你真正的先消費消息。這是否是很尷尬了。
訂單付款仍是可能會比訂單生成更早消費的狀況。那怎麼辦。
分佈式鎖來了
Rocker採用的是分段鎖,它不是鎖整個Broker而是鎖裏面的單個Queue,由於只要鎖單個Queue就能夠保證局部順序消費了。
因此最終的消費者這邊的邏輯就是
消費者1去Queue拿 訂單生成,它就鎖住了整個Queue,只有它消費完成並返回成功後,這個鎖纔會釋放。
而後下一個消費者去拿到 訂單支付 一樣鎖住當前Queue,這樣的一個過程來真正保證對同一個Queue可以真正意義上的順序消費,而不只僅是順序取出。
全局順序與分區順序對比
消息類型對比
發送方式對比
其它的注意事項
一、順序消息暫不支持廣播模式。 二、順序消息不支持異步發送方式,不然將沒法嚴格保證順序。 三、建議同一個 Group ID 只對應一種類型的 Topic,即不一樣時用於順序消息和無序消息的收發。 四、對於全局順序消息,建議建立實例個數 >=2。
這裏保證兩點
一、生產端 同一orderID的訂單放到同一個queue。 二、消費端 同一個queue取出消息的時候鎖住整個queue,直到消費後再解鎖。
@AllArgsConstructor @Data @ToString public class ProductOrder { /** * 訂單編號 */ private String orderId; /** * 訂單類型(訂單建立、訂單付款、訂單完成) */ private String type; }
生產者和以前發送普通消息最大的區別,就是針對每個message都手動經過MessageQueueSelector
選擇好queue。
@RestController public class Product { private static List<ProductOrder> orderList = null; private static String producerGroup = "test_producer"; /** * 模擬數據 */ static { orderList = new ArrayList<>(); orderList.add(new ProductOrder("XXX001", "訂單建立")); orderList.add(new ProductOrder("XXX001", "訂單付款")); orderList.add(new ProductOrder("XXX001", "訂單完成")); orderList.add(new ProductOrder("XXX002", "訂單建立")); orderList.add(new ProductOrder("XXX002", "訂單付款")); orderList.add(new ProductOrder("XXX002", "訂單完成")); orderList.add(new ProductOrder("XXX003", "訂單建立")); orderList.add(new ProductOrder("XXX003", "訂單付款")); orderList.add(new ProductOrder("XXX003", "訂單完成")); } @GetMapping("message") public void sendMessage() throws Exception { //示例生產者 DefaultMQProducer producer = new DefaultMQProducer(producerGroup); //不開啓vip通道 開通口端口會減2 producer.setVipChannelEnabled(false); //綁定name server producer.setNamesrvAddr("IP:9876"); producer.start(); for (ProductOrder order : orderList) { //一、生成消息 Message message = new Message(JmsConfig.TOPIC, "", order.getOrderId(), order.toString().getBytes()); //二、發送消息是 針對每條消息選擇對應的隊列 SendResult sendResult = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //三、arg的值其實就是下面傳入 orderId String orderid = (String) arg; //四、由於訂單是String類型,因此經過hashCode轉成int類型 int hashCode = orderid.hashCode(); //五、由於hashCode可能爲負數 因此取絕對值 hashCode = Math.abs(hashCode); //六、保證同一個訂單號 必定分配在同一個queue上 long index = hashCode % mqs.size(); return mqs.get((int) index); } }, order.getOrderId(),50000); System.out.printf("Product:發送狀態=%s, 存儲queue=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), order.getOrderId(), order.getType()); } producer.shutdown(); } }
看看生產者有沒有把相同訂單指定到同一個queue
經過測試結果能夠看出:相同訂單已經存到同一queue中了
。
上面說過,消費者真正要達到消費順序,須要分佈式鎖,因此這裏須要將MessageListenerOrderly
替換以前的MessageListenerConcurrently,由於它裏面實現了分佈式鎖。
@Slf4j @Component public class Consumer { /** * 消費者實體對象 */ private DefaultMQPushConsumer consumer; /** * 消費者組 */ public static final String CONSUMER_GROUP = "consumer_group"; /** * 經過構造函數 實例化對象 */ public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr("IP:9876"); //TODO 這裏真的是個坑,我product設置VipChannelEnabled(false),但消費者並無設置這個參數,以前發送普通消息的時候也沒有問題。能正常消費。 //TODO 但在順序消息時,consumer一直不消費消息了,找了很久都沒有找到緣由,直到我這裏也設置爲VipChannelEnabled(false),居然才能夠消費消息。 consumer.setVipChannelEnabled(false); //訂閱主題和 標籤( * 表明全部標籤)下信息 consumer.subscribe(JmsConfig.TOPIC, "*"); //註冊消費的監聽 這裏注意順序消費爲MessageListenerOrderly 以前併發爲ConsumeConcurrentlyContext consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { //獲取消息 MessageExt msg = msgs.get(0); //消費者獲取消息 這裏只輸出 不作後面邏輯處理 log.info("Consumer-線程名稱={},消息={}", Thread.currentThread().getName(), new String(msg.getBody())); return ConsumeOrderlyStatus.SUCCESS; }); consumer.start(); } }
看看消費結果是否是咱們須要的結果
經過測試結果咱們看出
一、消費消息的順序並無徹底按照以前的先進先出,即沒有知足全局順序。 二、同一訂單來說,訂單的 訂單生成、訂單支付、訂單完成 消費順序是保證的。
這是局部保證順序消費就已經知足咱們當前實際開發中的需求了。
有關消費端選擇MessageListenerOrderly
後,consumer.start()啓動相關的源碼能夠參考博客:RocketMQ順序消息消費端源碼
只要本身變優秀了,其餘的事情纔會跟着好起來(上將4)