RocketMQ生產者消息篇

系列文章

RocketMQ入門篇
RocketMQ生產者流程篇
RocketMQ生產者消息篇git

前言

上文RocketMQ生產者流程篇中詳細介紹了生產者發送消息的流程,本文將重點介紹發送消息的通訊模式以及各類不一樣的消息類型。github

通訊模式

RocketMQ提供了三種通信模式,分別是:同步,異步和單向;能夠查看內部類CommunicationMode:segmentfault

public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

下面分別看一下三種通信模式如何使用服務器

1.同步方式

看一個簡單的發送同步消息的實例:併發

public class SyncProducer {

    public static void main(String[] args) throws Exception {

        System.setProperty("rocketmq.namesrv.domain", "localhost");
        // 構造Producer
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName2");
        // producer.setNamesrvAddr("192.168.237.128:9876");
        // 初始化Producer,整個應用生命週期內,只須要初始化1次
        producer.start();

        for (int i = 0; i < 1; i++) {
            Message msg = new Message("TopicTest6", "TagA",
                    ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}

最簡單的直接指定一個message參數默認使用的就是同步方式發送消息,能夠看到在發送完消息以後,會立馬返回了發送結果SendResult:dom

SendResult [sendStatus=SEND_OK, msgId=0A0D5307324873D16E9365360AC60000, offsetMsgId=0A0D530700002A9F0000000000001200, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=0]

2.異步方式

看一個簡單的發送異步消息的實例:異步

public class AsyncProducer {

    public static void main(String[] args)
            throws MQClientException, RemotingException, InterruptedException, UnsupportedEncodingException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName2");
        producer.setRetryTimesWhenSendAsyncFailed(3);
        producer.setNamesrvAddr("192.168.237.128:9876");
        producer.start();
        for (int i = 0; i < 1; i++) {
            Message msg = new Message("TopicTest6", "TagA",
                    ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

能夠看到在發送消息時指定了SendCallback回調類,send發送方法返回值爲void,發送成功以後會回調SendCallback的onSuccess方法,異常調用onException方法;發送成功日誌以下:分佈式

SendResult [sendStatus=SEND_OK, msgId=0A0D5307261473D16E936536591E0000, offsetMsgId=0A0D530700002A9F00000000000012B2, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=1]

3.單向方式

看一個簡單的發送單向消息的實例:ide

public class OneWayProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName2");
        producer.setNamesrvAddr("192.168.237.128:9876");
        producer.start();
        for (int i = 0; i < 1; i++) {
            Message msg = new Message("TopicTest6", "TagA",
                    ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.sendOneway(msg);
        }
        producer.shutdown();
    }
}

單向發送消息發送以後沒有響應,可是在消費端能夠收到消息,以下所示:測試

ConsumeMessageThread_6Receive New Messages :[MessageExt [queueId=2, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1550648529526, bornHost=/10.13.83.7:54213, storeTimestamp=1550648529530, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000001416, commitLogOffset=5142, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1550648529533, UNIQ_KEY=0A0D530747DC73D16E93653766750000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId='null'}]]

4.發送狀態

以上同步和異步實例顯示的發送狀態都是SEND_OK,除了此狀態還有其餘三個狀態:FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT和SLAVE_NOT_AVAILABLE;具體能夠查看內部類SendStatus:

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}

FLUSH_DISK_TIMEOUT:刷盤超時,Broker設置的刷盤策略爲SYNC_FLUSH纔可能出現此錯誤;
FLUSH_SLAVE_TIMEOUT:主從同步超時,Broker設置了slave,而且指定同步策略爲SYNC_Master;
SLAVE_NOT_AVAILABLE:找不到salve,一樣是Broke指定同步策略爲SYNC_Master;
SEND_OK:表示發送成功,以上狀況都沒有出現。
注:必要時須要對各類異常場景進行處理,保證高質量的生產者。

消息類型

在RocketMQ的生產者端能夠發送多種類型的消息包括:延遲消息,順序消息以及事務消息,下面分別進行實例分析;

1.延遲消息

RocketMQ支持發送延遲消息,Broker收到消息後會延遲一段時間在處理,具體使用看以下代碼:

Message msg = new Message("TopicTest6", "TagA",
                    ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(3);

能夠直接設置延遲時間等級,具體有哪些等級,以及每一個等級對應的時間能夠查看類MessageStoreConfig

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

類中的messageDelayLevel 變量包含了全部能夠延遲的時間,使用空格分離,因此這裏等級爲3,其實對應的延遲時間就是10秒;分別觀察生產者和消費者的日誌以下:
生產者發送消息日誌以下:

Time [Wed Feb 20 18:25:32 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307268073D16E9365CCFA9E0000, offsetMsgId=0A0D530700002A9F0000000000001F42, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=1], queueOffset=4]

消費者接收日誌以下:

Time [Wed Feb 20 18:25:42 CST 2019],ConsumeMessageThread_3Receive New Messages :[MessageExt [queueId=1, storeSize=219, queueOffset=2, sysFlag=0, bornTimestamp=1550658332318, bornHost=/10.13.83.7:55682, storeTimestamp=1550658342322, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002026, commitLogOffset=8230, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=3, CONSUME_START_TIME=1550658342325, UNIQ_KEY=0A0D5307268073D16E9365CCFA9E0000, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId='null'}]]

能夠發現發送消息的時間和接收消息的時間相差10秒;

2.順序消息

順序消息指生產者生產數據的順序和消費者消費數據的順序是一致的;順序消息包括全局順序消息和局部順序消息,全局順序消息指在某個Topic下全部消息都是順序的,局部順序消息指在Topic下的Message Queue中是順序的;

2.1全局順序消息

RocketMQ在默認狀況下並不能保證有序,一個Topic下會指定多個讀寫隊列,生產者會將消息寫入任意的Message Queue中,一樣消費者可能會啓動多個線程同時處理數據,因此並不能保證順序;如何保證全局順序須要只有一個讀隊列一個寫隊列,而後須要保證生產者和消費者不能併發處理,如下作一個簡單的實例驗證;
生產者順序的發送5條數據:

Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_4Receive New Messages :[MessageExt [queueId=1, storeSize=219, queueOffset=3, sysFlag=0, bornTimestamp=1550713381399, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391403, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002575, commitLogOffset=9589, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=4, CONSUME_START_TIME=1550713391405, UNIQ_KEY=0A0D5307418073D16E936914F6160000, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId='null'}]]
Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_5Receive New Messages :[MessageExt [queueId=2, storeSize=219, queueOffset=2, sysFlag=0, bornTimestamp=1550713381450, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391451, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002650, commitLogOffset=9808, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=3, CONSUME_START_TIME=1550713391453, UNIQ_KEY=0A0D5307418073D16E936914F64A0001, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId='null'}]]
Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_6Receive New Messages :[MessageExt [queueId=3, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381455, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391460, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F000000000000272B, commitLogOffset=10027, bodyCRC=1141369005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391463, UNIQ_KEY=0A0D5307418073D16E936914F64F0002, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 50], transactionId='null'}]]
Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_7Receive New Messages :[MessageExt [queueId=0, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381462, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391464, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002806, commitLogOffset=10246, bodyCRC=855693371, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391490, UNIQ_KEY=0A0D5307418073D16E936914F6560003, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 51], transactionId='null'}]]
Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_8Receive New Messages :[MessageExt [queueId=1, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381466, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391469, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000028E1, commitLogOffset=10465, bodyCRC=761548184, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391501, UNIQ_KEY=0A0D5307418073D16E936914F65A0004, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 52], transactionId='null'}]]

能夠發現這5條數據分別寫入了4個Message Queue中;再看一下消費者日誌:

Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_4Receive New Messages :[MessageExt [queueId=1, storeSize=219, queueOffset=3, sysFlag=0, bornTimestamp=1550713381399, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391403, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002575, commitLogOffset=9589, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=4, CONSUME_START_TIME=1550713391405, UNIQ_KEY=0A0D5307418073D16E936914F6160000, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId='null'}]]
Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_5Receive New Messages :[MessageExt [queueId=2, storeSize=219, queueOffset=2, sysFlag=0, bornTimestamp=1550713381450, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391451, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002650, commitLogOffset=9808, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=3, CONSUME_START_TIME=1550713391453, UNIQ_KEY=0A0D5307418073D16E936914F64A0001, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId='null'}]]
Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_6Receive New Messages :[MessageExt [queueId=3, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381455, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391460, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F000000000000272B, commitLogOffset=10027, bodyCRC=1141369005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391463, UNIQ_KEY=0A0D5307418073D16E936914F64F0002, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 50], transactionId='null'}]]
Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_7Receive New Messages :[MessageExt [queueId=0, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381462, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391464, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002806, commitLogOffset=10246, bodyCRC=855693371, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391490, UNIQ_KEY=0A0D5307418073D16E936914F6560003, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 51], transactionId='null'}]]
Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_8Receive New Messages :[MessageExt [queueId=1, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381466, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391469, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000028E1, commitLogOffset=10465, bodyCRC=761548184, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391501, UNIQ_KEY=0A0D5307418073D16E936914F65A0004, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 52], transactionId='null'}]]

消費者啓動了5個線程同時從4個Message Queue中讀取數據,全部不能保證數據的順序性;
分別作以下改造:設置Topic的讀寫隊列分別爲1,能夠直接去RocketMQ-console去修改配置;而後設置消費者的處理線程數爲1:

consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);

再次測試,生產者一樣發送5條數據:

Time [Thu Feb 21 09:59:49 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D53071A0873D16E93692457700000, offsetMsgId=0A0D530700002A9F0000000000002EF5, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=11]
Time [Thu Feb 21 09:59:49 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D53071A0873D16E93692457790001, offsetMsgId=0A0D530700002A9F0000000000002FA7, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=12]
Time [Thu Feb 21 09:59:49 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D53071A0873D16E936924577F0002, offsetMsgId=0A0D530700002A9F0000000000003059, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=13]
Time [Thu Feb 21 09:59:49 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D53071A0873D16E93692457850003, offsetMsgId=0A0D530700002A9F000000000000310B, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=14]
Time [Thu Feb 21 09:59:49 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D53071A0873D16E93692457880004, offsetMsgId=0A0D530700002A9F00000000000031BD, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=15]

能夠發送全部的消息都寫入了相同的隊列,而後看消費者日誌:

Time [Thu Feb 21 09:59:49 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=11, sysFlag=0, bornTimestamp=1550714389360, bornHost=/10.13.83.7:58033, storeTimestamp=1550714389364, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002EF5, commitLogOffset=12021, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=12, CONSUME_START_TIME=1550714389372, UNIQ_KEY=0A0D53071A0873D16E93692457700000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId='null'}]]
Time [Thu Feb 21 09:59:49 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=12, sysFlag=0, bornTimestamp=1550714389369, bornHost=/10.13.83.7:58033, storeTimestamp=1550714389371, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002FA7, commitLogOffset=12199, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=13, CONSUME_START_TIME=1550714389379, UNIQ_KEY=0A0D53071A0873D16E93692457790001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId='null'}]]
Time [Thu Feb 21 09:59:49 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=13, sysFlag=0, bornTimestamp=1550714389375, bornHost=/10.13.83.7:58033, storeTimestamp=1550714389379, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000003059, commitLogOffset=12377, bodyCRC=1141369005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=15, CONSUME_START_TIME=1550714389385, UNIQ_KEY=0A0D53071A0873D16E936924577F0002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 50], transactionId='null'}]]
Time [Thu Feb 21 09:59:49 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=14, sysFlag=0, bornTimestamp=1550714389381, bornHost=/10.13.83.7:58033, storeTimestamp=1550714389382, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F000000000000310B, commitLogOffset=12555, bodyCRC=855693371, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=15, CONSUME_START_TIME=1550714389385, UNIQ_KEY=0A0D53071A0873D16E93692457850003, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 51], transactionId='null'}]]
Time [Thu Feb 21 09:59:49 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=15, sysFlag=0, bornTimestamp=1550714389384, bornHost=/10.13.83.7:58033, storeTimestamp=1550714389386, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000031BD, commitLogOffset=12733, bodyCRC=761548184, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest6', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=16, CONSUME_START_TIME=1550714389409, UNIQ_KEY=0A0D53071A0873D16E93692457880004, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 52], transactionId='null'}]]

能夠發送全部的消息都被同一個線程處理,而且從同一個Message Queue中讀取數據,能夠保證數據的順序性;

2.2局部順序消息

生產者須要將相關業務的消息發送到同一個Message Queue,在消費端須要保證同一個Message Queue讀取的消息不能被併發處理;
生產者發送消息給同一個Message Queue能夠經過MessageQueueSelector來實現:

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            return mqs.get(0);
        }
}, i);

經過在select放在中爲msg指定固定的Message Queue,這裏爲了方便給全部的消息都指定第0個隊列;
消費者保證同一個Message Queue讀取的消息不能被併發處理,經過MessageListenerOrderly實現:

consumer.registerMessageListener(new MessageListenerOrderly() {
            
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.printf("Time [" + new Date().toString() + "]," +Thread.currentThread().getName() + "Receive New Messages :" + msgs + "%n");
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();

簡單測試一下,分開看一下生產者和消費者日誌:

Time [Thu Feb 21 10:45:43 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307463C73D16E93694E5D8B0000, offsetMsgId=0A0D530700002A9F00000000000035E9, messageQueue=MessageQueue [topic=TopicTest7, brokerName=broker-a, queueId=0], queueOffset=5]
Time [Thu Feb 21 10:45:43 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307463C73D16E93694E5D940001, offsetMsgId=0A0D530700002A9F000000000000369B, messageQueue=MessageQueue [topic=TopicTest7, brokerName=broker-a, queueId=0], queueOffset=6]
Time [Thu Feb 21 10:45:43 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307463C73D16E93694E5D980002, offsetMsgId=0A0D530700002A9F000000000000374D, messageQueue=MessageQueue [topic=TopicTest7, brokerName=broker-a, queueId=0], queueOffset=7]
Time [Thu Feb 21 10:45:43 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307463C73D16E93694E5D9D0003, offsetMsgId=0A0D530700002A9F00000000000037FF, messageQueue=MessageQueue [topic=TopicTest7, brokerName=broker-a, queueId=0], queueOffset=8]
Time [Thu Feb 21 10:45:43 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307463C73D16E93694E5D9F0004, offsetMsgId=0A0D530700002A9F00000000000038B1, messageQueue=MessageQueue [topic=TopicTest7, brokerName=broker-a, queueId=0], queueOffset=9]
Time [Thu Feb 21 10:45:56 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=5, sysFlag=0, bornTimestamp=1550717143436, bornHost=/10.13.83.7:63916, storeTimestamp=1550717143440, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000035E9, commitLogOffset=13801, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest7', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, UNIQ_KEY=0A0D5307463C73D16E93694E5D8B0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId='null'}]]
Time [Thu Feb 21 10:45:56 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=6, sysFlag=0, bornTimestamp=1550717143444, bornHost=/10.13.83.7:63916, storeTimestamp=1550717143445, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F000000000000369B, commitLogOffset=13979, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest7', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, UNIQ_KEY=0A0D5307463C73D16E93694E5D940001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId='null'}]]
Time [Thu Feb 21 10:45:56 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=7, sysFlag=0, bornTimestamp=1550717143448, bornHost=/10.13.83.7:63916, storeTimestamp=1550717143451, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F000000000000374D, commitLogOffset=14157, bodyCRC=1141369005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest7', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, UNIQ_KEY=0A0D5307463C73D16E93694E5D980002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 50], transactionId='null'}]]
Time [Thu Feb 21 10:45:56 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=8, sysFlag=0, bornTimestamp=1550717143453, bornHost=/10.13.83.7:63916, storeTimestamp=1550717143454, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000037FF, commitLogOffset=14335, bodyCRC=855693371, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest7', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, UNIQ_KEY=0A0D5307463C73D16E93694E5D9D0003, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 51], transactionId='null'}]]
Time [Thu Feb 21 10:45:56 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=9, sysFlag=0, bornTimestamp=1550717143455, bornHost=/10.13.83.7:63916, storeTimestamp=1550717143456, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000038B1, commitLogOffset=14513, bodyCRC=761548184, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest7', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, UNIQ_KEY=0A0D5307463C73D16E93694E5D9F0004, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 52], transactionId='null'}]]

3.事務消息

事務消息是指RocketMQ發送的消息和其餘本地事件須要同時成功同時失敗,能夠理解爲就是分佈式事務;RocketMQ處理事務消息的大體流程以下:
1.生產者發送"待確認"消息;
2.RocketMQ接收到消息進行相關保存操做,成功之後返回狀態給生產者;
3.生產者接收到的返回若是爲SEND_OK狀態,將執行本地事務操做;
4.根據本地事務執行的結果,生產者執行commit仍是rollback;
5.若是第四步生產者執行操做失敗,服務器會在通過固定時間段對狀態爲"待確認"的消息發起回查請求;
6.生產者接收到回查請求後根據本地事務的結果返回commit仍是rollback;
7.服務器收到結果後執行相關操做。

接下來看一下官方提供的實例
TransactionProducerTest生產者類,相似DefaultMQProducer,主要設置了一個事務監聽器類TransactionListener,用於開始本地事務會給服務器的回查接口;

public class TransactionProducerTest {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("transactionProducerGroupName");
        producer.setNamesrvAddr("192.168.237.128:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("client-transaction-msg-check-thread");
                        return thread;
                    }
                });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };
        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                System.out.println("start send message " + msg);
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

還有一個監聽器類TransactionListenerImpl

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("executeLocalTransaction");
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        System.out.println("checkLocalTransaction:status = " + status);
        if (null != status) {
            switch (status) {
            case 0:
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 2:
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

以上監聽器須要實現兩個接口方法,分別是執行本地事務的方法和用於被服務器回調的方法;運行以上生產者相關日誌以下:

start send message Message{topic='TopicTest1234', flag=0, properties={KEYS=KEY0, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}
executeLocalTransaction
SendResult [sendStatus=SEND_OK, msgId=0A0D5307383473D16E936A31CF040000, offsetMsgId=null, messageQueue=MessageQueue [topic=TopicTest1234, brokerName=broker-a, queueId=2], queueOffset=47]
checkLocalTransaction:status = 0
checkLocalTransaction:status = 0
checkLocalTransaction:status = 0

從日誌能夠看出,首先發送"待確認"消息,發送返回爲SEND_OK;而後執行本地事務,實例中返回的是一個LocalTransactionState.UNKNOW狀態,致使服務器一直調用回查方法checkLocalTransaction,同時消費端一直沒有消息被消費;作簡單代碼改動,將本地事務的執行結果改爲LocalTransactionState.COMMIT_MESSAGE,生產者消費者日誌以下:

start send message Message{topic='TopicTest1234', flag=0, properties={KEYS=KEY0, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}
executeLocalTransaction
SendResult [sendStatus=SEND_OK, msgId=0A0D530740F073D16E936A3A2DC10000, offsetMsgId=null, messageQueue=MessageQueue [topic=TopicTest1234, brokerName=broker-a, queueId=2], queueOffset=58]
Time [Thu Feb 21 15:03:17 CST 2019],ConsumeMessageThread_3Receive New Messages :[MessageExt [queueId=2, storeSize=278, queueOffset=0, sysFlag=8, bornTimestamp=1550732597697, bornHost=/10.13.83.7:55029, storeTimestamp=1550732597758, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000008853, commitLogOffset=34899, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=34610, toString()=Message{topic='TopicTest1234', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest1234, MAX_OFFSET=1, KEYS=KEY0, TRAN_MSG=true, UNIQ_KEY=0A0D530740F073D16E936A3A2DC10000, WAIT=true, PGROUP=transactionProducerGroupName, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='0A0D530740F073D16E936A3A2DC10000'}]]

能夠看到消息狀態在服務器端被修改,這樣消費端就能夠消費此消息;

總結

本文首先介紹了RocketMQ發送消息的通信模式,而後重點介紹了延遲消息,順序消息以及事務消息,而且結合實例進行分析。

示例代碼地址

https://github.com/ksfzhaohui...
https://gitee.com/OutOfMemory...

相關文章
相關標籤/搜索