RocketMQ(7)---順序消費

RocketMQ順序消費

若是要保證順序消費,那麼他的核心點就是:生產者有序存儲消費者有序消費java

1、概念

一、什麼是無序消息

無序消息 無序消息也指普通的消息,Producer 只管發送消息,Consumer 只管接收消息,至於消息和消息之間的順序並無保證。網絡

舉例 Producer 依次發送 orderId 爲 一、二、3 的消息,Consumer 接到的消息順序有多是 一、二、3,也有多是 二、一、3 等狀況,這就是普通消息。併發

二、什麼是全局順序

對於指定的一個 Topic,全部消息按照嚴格的先入先出(FIFO)的順序進行發佈和消費app

舉例 好比 Producer 發送orderId 1,3,2 的消息, 那麼 Consumer 也必需要按照 1,3,2 的順序進行消費。異步

三、局部順序

在實際開發有些場景中,我並不須要消息徹底按照徹底按的先進先出,而是某些消息保證先進先出就能夠了。分佈式

就比如一個訂單涉及 訂單生成訂單支付訂單完成。我不用管其它的訂單,只保證一樣訂單ID能保證這個順序就能夠了。ide


2、實現原理

咱們知道 生產的message最終會存放在Queue中,若是一個Topic關聯了16個Queue,若是咱們不指定消息往哪一個隊列裏放,那麼默認是平均分配消息到16個queue,函數

比如有100條消息,那麼這100條消息會平均分配在這16個Queue上,那麼每一個Queue大概放5~6個左右。這裏有一點很重的是:測試

同一個queue,存儲在裏面的message 是按照先進先出的原則線程

這個時候思路就來了,比如有orderId=1的3條消息,分別是 訂單生產訂單付款訂單完成。只要保證它們放到同一個Queue那就保證消費者先進先出了。

這就保證局部順序了,即同一訂單按照前後順序放到同一Queue,那麼取消息的時候就能夠保證先進先取出。

那麼全局消息呢?

這個就簡單啦,你把全部消息都放在一個Queue裏,這樣不就保證全局消息了。

就這麼簡單

固然不是,這裏還有很關鍵的一點,比如在一個消費者集羣的狀況下,消費者1先去Queue拿消息,它拿到了 訂單生成,它拿完後,消費者2去queue拿到的是 訂單支付

拿的順序是沒毛病了,但關鍵是先拿到不表明先消費完它。會存在雖然你消費者1先拿到訂單生成,但因爲網絡等緣由,消費者2比你真正的先消費消息。這是否是很尷尬了。

訂單付款仍是可能會比訂單生成更早消費的狀況。那怎麼辦。

分佈式鎖來了

Rocker採用的是分段鎖,它不是鎖整個Broker而是鎖裏面的單個Queue,由於只要鎖單個Queue就能夠保證局部順序消費了。

因此最終的消費者這邊的邏輯就是

消費者1去Queue拿 訂單生成,它就鎖住了整個Queue,只有它消費完成並返回成功後,這個鎖纔會釋放。

而後下一個消費者去拿到 訂單支付 一樣鎖住當前Queue,這樣的一個過程來真正保證對同一個Queue可以真正意義上的順序消費,而不只僅是順序取出。

全局順序與分區順序對比

消息類型對比

發送方式對比

其它的注意事項

一、順序消息暫不支持廣播模式。
二、順序消息不支持異步發送方式,不然將沒法嚴格保證順序。
三、建議同一個 Group ID 只對應一種類型的 Topic,即不一樣時用於順序消息和無序消息的收發。
四、對於全局順序消息,建議建立實例個數 >=2。


3、代碼示例

這裏保證兩點

一、生產端 同一orderID的訂單放到同一個queue。

二、消費端 同一個queue取出消息的時候鎖住整個queue,直到消費後再解鎖。

一、ProductOrder實體

@AllArgsConstructor
@Data
@ToString
public class ProductOrder {
    /**
     * 訂單編號
     */
    private String orderId;

    /**
     * 訂單類型(訂單建立、訂單付款、訂單完成)
     */
    private String type;
}

二、Product(生產者)

生產者和以前發送普通消息最大的區別,就是針對每個message都手動經過MessageQueueSelector選擇好queue。

@RestController
public class Product {
    private static List<ProductOrder> orderList = null;
    private static String producerGroup = "test_producer";
    /**
     * 模擬數據
     */
    static {
        orderList = new ArrayList<>();
        orderList.add(new ProductOrder("XXX001", "訂單建立"));
        orderList.add(new ProductOrder("XXX001", "訂單付款"));
        orderList.add(new ProductOrder("XXX001", "訂單完成"));
        orderList.add(new ProductOrder("XXX002", "訂單建立"));
        orderList.add(new ProductOrder("XXX002", "訂單付款"));
        orderList.add(new ProductOrder("XXX002", "訂單完成"));
        orderList.add(new ProductOrder("XXX003", "訂單建立"));
        orderList.add(new ProductOrder("XXX003", "訂單付款"));
        orderList.add(new ProductOrder("XXX003", "訂單完成"));
    }

    @GetMapping("message")
    public  void sendMessage() throws Exception {
        //示例生產者
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        //不開啓vip通道 開通口端口會減2
        producer.setVipChannelEnabled(false);
        //綁定name server
        producer.setNamesrvAddr("IP:9876");
        producer.start();
        for (ProductOrder order : orderList) {
            //一、生成消息
            Message message = new Message(JmsConfig.TOPIC, "", order.getOrderId(), order.toString().getBytes());
            //二、發送消息是 針對每條消息選擇對應的隊列
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    //三、arg的值其實就是下面傳入 orderId
                    String orderid = (String) arg;
                    //四、由於訂單是String類型,因此經過hashCode轉成int類型
                    int hashCode = orderid.hashCode();
                    //五、由於hashCode可能爲負數 因此取絕對值
                    hashCode = Math.abs(hashCode);
                    //六、保證同一個訂單號 必定分配在同一個queue上
                    long index = hashCode % mqs.size();
                    return mqs.get((int) index);
                }
            }, order.getOrderId(),50000);

            System.out.printf("Product:發送狀態=%s, 存儲queue=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), 
                                      sendResult.getMessageQueue().getQueueId(), order.getOrderId(), order.getType());
        }
        producer.shutdown();
    }
}

看看生產者有沒有把相同訂單指定到同一個queue

經過測試結果能夠看出:相同訂單已經存到同一queue中了

三、Consumer(生產者)

上面說過,消費者真正要達到消費順序,須要分佈式鎖,因此這裏須要將MessageListenerOrderly替換以前的MessageListenerConcurrently,由於它裏面實現了分佈式鎖。

@Slf4j
@Component
public class Consumer {
    
    /**
     * 消費者實體對象
     */
    private DefaultMQPushConsumer consumer;
    /**
     * 消費者組
     */
    public static final String CONSUMER_GROUP = "consumer_group";
    /**
     * 經過構造函數 實例化對象
     */
    public Consumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr("IP:9876");
        //TODO 這裏真的是個坑,我product設置VipChannelEnabled(false),但消費者並無設置這個參數,以前發送普通消息的時候也沒有問題。能正常消費。
        //TODO 但在順序消息時,consumer一直不消費消息了,找了很久都沒有找到緣由,直到我這裏也設置爲VipChannelEnabled(false),居然才能夠消費消息。
        consumer.setVipChannelEnabled(false);
        //訂閱主題和 標籤( * 表明全部標籤)下信息
        consumer.subscribe(JmsConfig.TOPIC, "*");
            //註冊消費的監聽 這裏注意順序消費爲MessageListenerOrderly 以前併發爲ConsumeConcurrentlyContext
        consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
            //獲取消息
            MessageExt msg = msgs.get(0);
            //消費者獲取消息 這裏只輸出 不作後面邏輯處理
            log.info("Consumer-線程名稱={},消息={}", Thread.currentThread().getName(), new String(msg.getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        });
        consumer.start();
    }
}

看看消費結果是否是咱們須要的結果

經過測試結果咱們看出

一、消費消息的順序並無徹底按照以前的先進先出,即沒有知足全局順序。
二、同一訂單來說,訂單的 訂單生成、訂單支付、訂單完成 消費順序是保證的。

這是局部保證順序消費就已經知足咱們當前實際開發中的需求了。

有關消費端選擇MessageListenerOrderly後,consumer.start()啓動相關的源碼能夠參考博客:RocketMQ順序消息消費端源碼




只要本身變優秀了,其餘的事情纔會跟着好起來(上將4)
相關文章
相關標籤/搜索