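RocketMQ Quick Start

Quick Start

This quick start guide gives detailed instructions for setting up the RocketMQ messaging system on a local machine and for producing and consuming basic messages.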

Prerequisites

  1. A 64-bit operating system
  2. 64-bit JDK 1.8+
  3. Maven 3.2.x+
  4. Git
  5. At least 4 GB of free disk space for the broker instance

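Download and build the release

The source release of 4.4.0 is available for download, and a prebuilt binary release can be downloaded as well.

Now run the following commands to unpack the 4.4.0 source release and build the binary artifacts.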

  > unzip rocketmq-all-4.4.0-source-release.zip
  > cd rocketmq-all-4.4.0/
  > mvn -Prelease-all -DskipTests clean install -U
  > cd distribution/target/apache-rocketmq

The command mvn -Prelease-all -DskipTests clean install -U builds the downloaded source package and generates a target directory under distribution; the built package is located there.

Start the Name Server

charse@charse-thinkpad:/media/charse/文檔/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup sh bin/mqnamesrv &
[1] 5273
nohup: ignoring input and appending output to 'nohup.out'
charse@charse-thinkpad:/media/charse/文檔/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ tail -f ~/logs/rocketmqlogs/namesrv.log
2019-03-31 19:24:10 INFO main - tls.client.keyPath = null
2019-03-31 19:24:10 INFO main - tls.client.keyPassword = null
2019-03-31 19:24:10 INFO main - tls.client.certPath = null
2019-03-31 19:24:10 INFO main - tls.client.authServer = false
2019-03-31 19:24:10 INFO main - tls.client.trustCertPath = null
2019-03-31 19:24:11 INFO main - Using OpenSSL provider
2019-03-31 19:24:11 INFO main - SSLContext created for server
2019-03-31 19:24:12 INFO NettyEventExecutor - NettyEventExecutor service started
2019-03-31 19:24:12 INFO FileWatchService - FileWatchService service started
2019-03-31 19:24:12 INFO main - The Name Server boot success. serializeType=JSON
2019-03-31 19:25:11 INFO NSScheduledThread1 - --------------------------------------------------------
2019-03-31 19:25:11 INFO NSScheduledThread1 - configTable SIZE: 0


Once "The Name Server boot success" appears in the log, the Name Server has started successfully; the same line shows that the serialization type is JSON.

Start the Broker

charse@charse-thinkpad:/media/charse/文檔/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup sh bin/mqbroker -n localhost:9876 &
[1] 5784
nohup: ignoring input and appending output to 'nohup.out'
charse@charse-thinkpad:/media/charse/文檔/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ tail -f ~/logs/rocketmqlogs/broker.log
2019-03-31 19:41:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:41:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 19:41:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:42:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 19:42:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK


The log shows that the broker has registered with the name server successfully.

You can also use the jps command to check whether the services are running.

charse@charse-thinkpad:~$ jps
12128 Main
12549 Jps
5279 NamesrvStartup
5791 BrokerStartup

Both the NameServer and the Broker are running, so the next step is to simulate a producer and a consumer.
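Besides jps, a quick way to confirm that both services accept connections is to try opening a TCP socket to each port. The sketch below is only an illustration using the JDK; the class name PortCheck is hypothetical, 9876 is the name server port passed to mqbroker above, and 10911 is the broker port that appears in the broker logs in this guide.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting connections there.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports are assumptions taken from this guide's command line and logs.
        System.out.println("name server (9876): " + isListening("localhost", 9876, 1000));
        System.out.println("broker (10911): " + isListening("localhost", 10911, 1000));
    }
}
```

A successful connection only shows that something is listening on the port; the log lines above remain the more reliable confirmation.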

Broker

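When a client produces messages to a topic (TopicTest) programmatically and the broker finds that the topic does not exist, the broker creates TopicTest automatically with a default configuration based on the default topic TBW102, as the log below shows. The producer address in the log is the LAN address of my local machine (192.168.3.16).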

2019-03-31 20:37:43 WARN SendMessageThread_1 - the topic TopicTest not exist, producer: /192.168.3.16:47538
2019-03-31 20:37:43 INFO SendMessageThread_1 - Create new topic by default topic:[TBW102] config:[TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]] producer:[192.168.3.16:47538]
2019-03-31 20:37:43 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: producer1 channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer[producer1] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863927]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer group[producer1] from groupChannelTable
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863933]
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 20:37:52 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes

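When a producer is created, the log shows that a CLIENT_INNER_PRODUCER producer is registered in groupChannelTable first, followed by the client's own producer, producer1; each entry carries details about the producer such as its channel, clientId, language, and version. When the producer shuts down, producer1 is unregistered first, and then CLIENT_INNER_PRODUCER is removed from groupChannelTable. The broker also re-registers with the name server from time to time (roughly every 30 seconds in these logs, plus once right after the new topic is created).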

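When a client creates a consumer, the broker prints the log below. A subscription group is created automatically together with its configuration. After the new consumer connects, its topics are added to the group: the topic it subscribed to (TopicTest) and a retry topic named %RETRY% followed by the consumer group name, each with its own subscription. A new producer, CLIENT_INNER_PRODUCER, is registered as well.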

2019-03-31 20:38:22 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - auto create a subscription group, SubscriptionGroupConfig [groupName=consumer1, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - create new topic TopicConfig [topicName=%RETRY%consumer1, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
2019-03-31 20:38:35 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915211]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]]] 192.168.3.16:48236
2019-03-31 20:38:35 INFO HeartbeatThread_2 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915412]
2019-03-31 20:38:35 INFO HeartbeatThread_3 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915833, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915843, expressionType=TAG]
2019-03-31 20:38:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 20:39:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:39:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:40:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
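
The retry topic name in the log above (%RETRY%consumer1 for the group consumer1) follows a simple prefix rule. The sketch below reproduces that rule with plain string handling; the class and method names are hypothetical and not part of the RocketMQ client API.

```java
final class RetryTopicName {
    // Prefix observed in the broker log above (topicName=%RETRY%consumer1).
    static final String RETRY_PREFIX = "%RETRY%";

    /** Builds the retry topic name for a consumer group. */
    static String retryTopicFor(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    /** Returns true if the topic name carries the retry prefix. */
    static boolean isRetryTopic(String topic) {
        return topic != null && topic.startsWith(RETRY_PREFIX);
    }

    public static void main(String[] args) {
        System.out.println(retryTopicFor("consumer1"));
        System.out.println(isRetryTopic("TopicTest"));
    }
}
```

A helper like this can be handy when filtering logs or topic lists to separate retry topics from business topics.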

NameServer

Before any client has created a topic, the name server log shows that RocketMQ registers a number of topics by default.

2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:48210
2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:48210]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, charse-thinkpad QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, BenchmarkTest QueueData [brokerName=charse-thinkpad, readQueueNums=1024, writeQueueNums=1024, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, TBW102 QueueData [brokerName=charse-thinkpad, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, DefaultCluster QueueData [brokerName=charse-thinkpad, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new broker registered, 192.168.3.16:10911 HAServer: 192.168.3.16:10912
2019-03-31 19:40:12 INFO RemotingExecutorThread_4 - new topic registered, RMQ_SYS_TRANS_HALF_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
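
The perm field in these log lines is a small bit mask. The sketch below decodes it, assuming the bit layout of RocketMQ's PermName constants (read = 4, write = 2, inherit = 1); that layout is consistent with this guide's logs, where TopicTest appears as perm=RW- in the broker log and perm=6 in the name server log. The class name PermDecoder is hypothetical.

```java
final class PermDecoder {
    // Assumed bit values, following RocketMQ's PermName constants.
    static final int PERM_READ = 1 << 2;    // 4
    static final int PERM_WRITE = 1 << 1;   // 2
    static final int PERM_INHERIT = 1;      // 1

    /** Renders a perm bit mask in the R/W/X style used by the broker log. */
    static String describe(int perm) {
        StringBuilder sb = new StringBuilder(3);
        sb.append((perm & PERM_READ) != 0 ? 'R' : '-');
        sb.append((perm & PERM_WRITE) != 0 ? 'W' : '-');
        sb.append((perm & PERM_INHERIT) != 0 ? 'X' : '-');
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("perm=6 -> " + describe(6));
        System.out.println("perm=7 -> " + describe(7));
    }
}
```

Under this reading, topics registered with perm=7 (such as TBW102) additionally carry the inherit bit, while ordinary topics such as TopicTest are readable and writable only.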


When a client creates a topic (TopicTest), the name server log shows that the topic has been registered.

2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49208
2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO RemotingExecutorThread_4 - new topic registered, TopicTest QueueData [brokerName=charse-thinkpad, readQueueNums=4, writeQueueNums=4, perm=6, topicSynFlag=0]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelInactive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelUnregistered, the channel[127.0.0.1:49208]
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49224
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49224]
2019-03-31 20:38:35 INFO RemotingExecutorThread_4 - new topic registered, %RETRY%consumer1 QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]

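When the consumer shuts down, the broker log below shows the client's consumer, consumer1, being unregistered from consumerGroupInfo, followed by CLIENT_INNER_PRODUCER being unregistered from groupChannelTable.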

2019-03-31 21:50:08 INFO HeartbeatThread_3 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]]] 192.168.3.16:49810
2019-03-31 21:50:08 INFO HeartbeatThread_3 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040208756, expressionType=TAG]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister a consumer[consumer1] from consumerGroupInfo ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208763]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister consumer ok, no any connection, and remove consumer group, consumer1
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208801]
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 21:50:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 21:50:28 WARN PullMessageThread_2 - the consumer's group info not exist, group: consumer1
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - processRequestWrapper response to /192.168.3.16:49810 failed
java.nio.channels.ClosedChannelException: null
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=11, language=JAVA, version=293, opaque=24, flag(B)=0, remark=null, extFields={queueId=0, maxMsgNums=32, sysFlag=2, suspendTimeoutMillis=15000, commitOffset=0, topic=%RETRY%consumer1, queueOffset=0, expressionType=TAG, subVersion=1554040208756, consumerGroup=consumer1}, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=24, language=JAVA, version=293, opaque=24, flag(B)=1, remark=the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details., extFields=null, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 21:51:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes

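Producing and consuming examples

  • Send messages with RocketMQ in three ways: reliable synchronous, reliable asynchronous, and one-way transmission.
  • Consume messages with RocketMQ.

1. Add the dependency

Maven: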

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
    </dependency>

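2.1 Sending messages synchronously

Reliable synchronous transmission is widely used in scenarios such as important notification messages, SMS notifications, and SMS marketing systems.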

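import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {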
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
            DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}


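2.2 Sending messages asynchronously

Asynchronous transmission is generally used in business scenarios that are sensitive to response time.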

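import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        //Do not retry internally when an asynchronous send fails.
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        //Tracks outstanding callbacks so the producer is not shut down before they have run.
        final CountDownLatch latch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            //Create a message instance, specifying topic, tag, key and message body.
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    latch.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                        sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    latch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        //Wait up to 5 seconds for all callbacks before shutting down.
        latch.await(5, TimeUnit.SECONDS);
        //Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}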

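2.3 Sending messages in one-way mode

One-way transmission is used for cases that need only moderate reliability, such as log collection.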

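import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {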
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);

        }
        //Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}


3. Consuming messages

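import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {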

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
         
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        
        // Subscribe to one or more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

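For more examples of using RocketMQ, see here.

Ordered messages

RocketMQ provides ordered messages using FIFO order. The following example demonstrates sending and receiving globally and partitionally ordered messages.

Producer example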

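import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        //Specify name server addresses (the same local name server as in the earlier examples).
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            //Create a message instance, specifying topic, tag, key and message body.
            //The topic matches the one OrderedConsumer subscribes to.
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //Messages with the same orderId are always sent to the same queue.
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}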
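
The selector in OrderedProducer keeps ordering per order ID because every message with the same orderId is mapped to the same queue index. The sketch below replays only that arithmetic with plain JDK collections, without any RocketMQ classes; the class name is hypothetical, and the queue count of 4 is an assumption taken from writeQueueNums=4 in the TopicTest log line earlier in this guide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class QueueSelectionSketch {
    /** Same rule as the selector in OrderedProducer: queue index = orderId % queueCount. */
    static int selectQueue(int orderId, int queueCount) {
        return orderId % queueCount;
    }

    /** Groups message indexes 0..messageCount-1 by the queue they would be sent to. */
    static Map<Integer, List<Integer>> assign(int messageCount, int queueCount) {
        Map<Integer, List<Integer>> byQueue = new TreeMap<>();
        for (int i = 0; i < messageCount; i++) {
            int orderId = i % 10; // same derivation as in OrderedProducer
            int queue = selectQueue(orderId, queueCount);
            byQueue.computeIfAbsent(queue, q -> new ArrayList<>()).add(i);
        }
        return byQueue;
    }

    public static void main(String[] args) {
        // Print which message indexes end up in which queue.
        assign(20, 4).forEach((queue, msgs) ->
            System.out.println("queue " + queue + " <- messages " + msgs));
    }
}
```

Within one queue the messages keep their send order, which is what gives per-order (partitioned) ordering; ordering across different queues is not guaranteed by this scheme.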

Subscriber example

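import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class OrderedConsumer {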
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                // Pass values as arguments so that a '%' in the message content cannot break the format string.
                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;

            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

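Broadcasting

Broadcasting sends a message to all subscribers of a topic. If you want every subscriber to receive the messages for a topic, broadcasting is a good choice.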
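
To make the difference from the default clustering model concrete, the sketch below simulates both delivery models with plain JDK collections. It is a deliberate simplification and not RocketMQ code: real clustering assigns whole queues to consumers rather than single messages, and all names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DeliveryModelSketch {
    /** Returns, per consumer, the messages it would receive under the chosen model. */
    static Map<String, List<String>> deliver(List<String> messages, List<String> consumers,
                                             boolean broadcasting) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            if (broadcasting) {
                // Broadcasting: every consumer in the group gets every message.
                for (String consumer : consumers) {
                    received.get(consumer).add(messages.get(i));
                }
            } else {
                // Clustering (simplified to round-robin): each message goes to exactly one consumer.
                received.get(consumers.get(i % consumers.size())).add(messages.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m0", "m1", "m2");
        List<String> consumers = Arrays.asList("c1", "c2");
        System.out.println("BROADCASTING: " + deliver(messages, consumers, true));
        System.out.println("CLUSTERING:   " + deliver(messages, consumers, false));
    }
}
```

In the consumer example further down, the switch between the two models is the single call to setMessageModel(MessageModel.BROADCASTING).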

Producer example

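import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class BroadcastProducer {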
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();

        for (int i = 0; i < 100; i++){
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

Consumer example

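import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class BroadcastConsumer {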
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //set to broadcast mode
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // Pass values as arguments so that a '%' in the message content cannot break the format string.
                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

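Scheduled messages

Scheduled messages differ from normal messages in that they are not delivered until a configured delay has elapsed.

1. Start a consumer to wait for incoming subscribed messages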

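import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class ScheduledMessageConsumer {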
    
     public static void main(String[] args) throws Exception {
         // Instantiate message consumer
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
         // Subscribe topics
         consumer.subscribe("TestTopic", "*");
         // Register message listener
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                 for (MessageExt message : messages) {
                     // Print approximate delay time period
                     System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                             + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         // Launch consumer
         consumer.start();
     }
 }

2. Send scheduled messages

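import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {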
    
     public static void main(String[] args) throws Exception {
         // Instantiate a producer to send scheduled messages
         DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
         // Launch producer
         producer.start();
         int totalMessagesToSend = 100;
         for (int i = 0; i < totalMessagesToSend; i++) {
             Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
             // This message will be delivered to consumer 10 seconds later.
             message.setDelayTimeLevel(3);
             // Send the message
             producer.send(message);
         }
    
         // Shutdown producer after use.
         producer.shutdown();
     }
        
 }

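messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Level 0 means no delay, level 1 selects the first entry (1s), level 2 the second (5s), and so on. Because the producer above sets level 3, you can observe that each message is consumed about 10 seconds after it was stored.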
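
The mapping from a delay level to an actual delay can be sketched by parsing the messageDelayLevel string shown above. This is only an illustration of the table lookup, not the broker's implementation; the class name DelayLevels is hypothetical, and rejecting out-of-range levels is a choice made for this sketch.

```java
import java.util.concurrent.TimeUnit;

final class DelayLevels {
    // The default level table quoted in this guide.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Converts a delay level (1-based index into the table) to milliseconds; level 0 means no delay. */
    static long toMillis(int level) {
        if (level <= 0) {
            return 0L;
        }
        String[] entries = DEFAULT_LEVELS.split(" ");
        if (level > entries.length) {
            throw new IllegalArgumentException("unknown delay level: " + level);
        }
        String entry = entries[level - 1];
        long amount = Long.parseLong(entry.substring(0, entry.length() - 1));
        char unit = entry.charAt(entry.length() - 1);
        switch (unit) {
            case 's': return TimeUnit.SECONDS.toMillis(amount);
            case 'm': return TimeUnit.MINUTES.toMillis(amount);
            case 'h': return TimeUnit.HOURS.toMillis(amount);
            default: throw new IllegalArgumentException("unknown unit in: " + entry);
        }
    }

    public static void main(String[] args) {
        // Level 3 is the value passed to setDelayTimeLevel in ScheduledMessageProducer.
        System.out.println("level 3 = " + toMillis(3) + " ms");
    }
}
```

Only the listed levels are available through setDelayTimeLevel, so a delay that is not in the table has to be approximated by the nearest level.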

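Batch sending

Why batch?

Sending messages in batches improves the transmission performance of small messages.

Usage constraints

Messages in the same batch must have the same topic and the same waitStoreMsgOK setting, and scheduled messages are not supported. In addition, the total size of the messages in one batch should not exceed 1 MB.

How to use batch sending

If you send no more than 1 MB of messages at a time, batching is straightforward.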

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}
    

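Splitting a large batch into lists

Batching only becomes more complex when you send a large batch and cannot be sure whether its total size exceeds the 1 MB limit. In that case, split the messages into smaller lists.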

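import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.message.Message;

public class ListSplitter implements Iterator<List<Message>> {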
    private final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
            this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                   //if the next sublist has no element, add this one and then break, otherwise just break
                   nextIndex++;  
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
    
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
   try {
       List<Message>  listItem = splitter.next();
       producer.send(listItem);
   } catch (Exception e) {
       e.printStackTrace();
       //handle the error
   }
}


Message filter example

In most cases, a tag is a simple and useful design for selecting the messages you want. For example:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

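In the example above, the consumer receives messages that carry TAGA, TAGB, or TAGC. However, a message can have only one tag, which may not be enough for complex scenarios. In such cases, you can filter messages with an SQL expression.

Principle

The SQL feature performs calculations on the properties that were attached to the message when it was sent. Within the grammar defined by RocketMQ, you can implement some interesting logic.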

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------
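
The two cases in the diagram can be replayed with an ordinary Java predicate over the message properties. The sketch below hard-codes the expression a > 5 AND b = 'abc' instead of parsing SQL, so it only illustrates the selection outcome, not RocketMQ's filter engine; all names are hypothetical. Properties are modelled as strings because putUserProperty takes string values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

final class PropertyFilterSketch {
    /** Hand-written equivalent of the expression: a > 5 AND b = 'abc'. */
    static final Predicate<Map<String, String>> FILTER = props -> {
        String a = props.get("a");
        String b = props.get("b");
        if (a == null || b == null) {
            return false; // a missing property never matches in this sketch
        }
        try {
            return Double.parseDouble(a) > 5 && "abc".equals(b);
        } catch (NumberFormatException e) {
            return false; // non-numeric 'a' cannot satisfy a > 5
        }
    };

    /** Builds a property map like the message boxes in the diagram. */
    static Map<String, String> message(String a, String b, String c) {
        Map<String, String> props = new HashMap<>();
        props.put("a", a);
        props.put("b", b);
        props.put("c", c);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(FILTER.test(message("10", "abc", "true")) ? "Gotten" : "Missed");
        System.out.println(FILTER.test(message("1", "abc", "true")) ? "Gotten" : "Missed");
    }
}
```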


Grammar

RocketMQ defines only a basic grammar to support this feature; you can also extend it easily.

  1. Numeric comparison, like >, >=, <, <=, BETWEEN, =;
  2. Character comparison, like =, <>, IN;
  3. IS NULL or IS NOT NULL;
  4. Logical AND, OR, NOT;

Constant types:

  1. Numeric, like 123, 3.1415;
  2. Character, like 'abc', must be enclosed in single quotes;
  3. NULL, special constant;
  4. Boolean, TRUE or FALSE;

Usage Constraints

Only push consumers can select messages with SQL92. The interface is:

public void subscribe(final String topic, final MessageSelector messageSelector)

Producer Example

When sending, you can put properties into a message with the putUserProperty method.

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();

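// tag and i come from the surrounding code: the message tag and a loop counter.
Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties; SQL filter expressions are evaluated against them.
msg.putUserProperty("a", String.valueOf(i));

SendResult sendResult = producer.send(msg);
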
producer.shutdown();

Consumer Example

Use MessageSelector.bySql to select messages with SQL92 when consuming.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

// only subscribe to messages that have property a, with a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
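To see which messages such a subscription would select, the producer and consumer sides can be simulated together. This is a minimal sketch, assuming the producer sets property "a" to `String.valueOf(i)` for a loop counter `i`; `BetweenFilterSketch` is a hypothetical name and the range check is written by hand instead of going through RocketMQ.

```java
import java.util.ArrayList;
import java.util.List;

final class BetweenFilterSketch {
    // Hand-written equivalent of "a between 0 and 3": both bounds are inclusive.
    static boolean between0And3(String a) {
        if (a == null) {
            return false; // assumption: a message without property "a" is not selected
        }
        int value = Integer.parseInt(a);
        return value >= 0 && value <= 3;
    }

    // Simulates count messages and returns the counters whose message passes the filter.
    static List<Integer> selected(int count) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String a = String.valueOf(i); // the value putUserProperty("a", ...) would store
            if (between0And3(a)) {
                result.add(i);
            }
        }
        return result;
    }
}
```

For ten messages, `BetweenFilterSketch.selected(10)` keeps the counters 0 through 3 and drops the rest.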

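OpenMessaging Example

OpenMessaging includes industry guidelines and messaging and streaming specifications that provide a common framework for finance, e-commerce, IoT, and big data. Its design principles are cloud orientation, simplicity, flexibility, and language independence in heterogeneous environments. Conforming to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.

RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following examples show how to access RocketMQ through OpenMessaging.

OMSProducer

The following example shows how to send messages to RocketMQ in synchronous, asynchronous, and one-way modes.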

public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPullConsumer

Use OMS PullConsumer to poll messages from a specified queue.

public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        
        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPushConsumer

Attach an OMS PushConsumer to a specified queue and consume messages through a MessageListener.

public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
        
        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });
        
    }
}

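Transaction Example

What is a transactional message?

It can be thought of as a two-phase commit message implementation that ensures eventual consistency in distributed systems. Transactional messages guarantee that executing the local transaction and sending the message happen atomically.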

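Usage Constraints

  • Transactional messages do not support scheduling or batching.
  • To avoid a single message being checked too many times and causing half-queue messages to accumulate, the number of checks for a single message is limited to 15 by default. Users can change this limit through the "transactionCheckMax" parameter in the broker configuration. If a message has been checked more than "transactionCheckMax" times, the broker by default discards the message and prints an error log. Users can change this behavior by overriding the "AbstractTransactionCheckListener" class.
  • A transactional message is checked after a period determined by the "transactionTimeout" parameter in the broker configuration. Users can also change this limit by setting the user property "CHECK_IMMUNITY_TIME_IN_SECONDS" when sending a transactional message; this property takes precedence over the "transactionMsgTimeout" parameter.
  • A transactional message may be checked or consumed more than once.
  • Putting a committed message back into the user's target topic may fail. Currently, this depends on the log record. High availability is ensured by RocketMQ's own high-availability mechanism. If you want to ensure that transactional messages are not lost and transaction integrity is guaranteed, the synchronous double-write mechanism is recommended.
  • Producer IDs of transactional messages cannot be shared with producer IDs of other message types. Unlike other message types, transactional messages allow backward queries: the MQ server queries clients by their producer ID.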
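The check limit described in the list above can be sketched as a bounded retry loop. This is a minimal illustration, assuming a check returns one of three states and that a message still unresolved after `maxChecks` attempts is discarded; `CheckLimitSketch`, `Checker`, and `Outcome` are hypothetical names, not broker classes.

```java
final class CheckLimitSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    enum Outcome { DELIVERED, DELETED, DISCARDED }

    // Hypothetical callback standing in for the producer's answer to a check request.
    interface Checker {
        State check(int attempt);
    }

    // Re-checks a half message until it resolves or the check limit is reached.
    static Outcome resolve(Checker checker, int maxChecks) {
        for (int attempt = 1; attempt <= maxChecks; attempt++) {
            State state = checker.check(attempt);
            if (state == State.COMMIT) {
                return Outcome.DELIVERED; // message becomes visible to consumers
            }
            if (state == State.ROLLBACK) {
                return Outcome.DELETED; // message is removed and never consumed
            }
            // UNKNOWN: leave the message pending and check again later.
        }
        return Outcome.DISCARDED; // limit reached, the default behavior described above
    }
}
```

For example, `CheckLimitSketch.resolve(attempt -> CheckLimitSketch.State.UNKNOWN, 15)` models a producer that never resolves the transaction, so the message ends up discarded after 15 checks.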

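Transactional status

  • TransactionStatus.CommitTransaction: commit the transaction, which means consumers are allowed to consume the message.
  • TransactionStatus.RollbackTransaction: roll back the transaction, which means the message is deleted and cannot be consumed.
  • TransactionStatus.Unknown: an intermediate state, which means MQ needs to check back to determine the status.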

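Create the Transactional Producer

Use the TransactionMQProducer class to create the producer client and specify a unique producerGroup. You can set a custom thread pool to process check requests. After executing the local transaction, you need to reply to MQ according to the execution result; the reply statuses are described in the section above.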

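import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;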

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

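Implement the TransactionListener Interface

The "executeLocalTransaction" method executes the local transaction once the half message has been sent successfully. It returns one of the three transaction statuses mentioned in the previous section. The "checkLocalTransaction" method checks the local transaction status and responds to MQ check requests. It also returns one of the three transaction statuses mentioned in the previous section.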

import ...

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
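The listener above assigns each message a status of `index % 3`, so the result of a check can be predicted per message. The following is a minimal sketch, assuming messages are numbered from 0 in the order they are sent; `ListenerOutcomeSketch` and its local `State` enum are hypothetical stand-ins that only mirror the names of `LocalTransactionState`.

```java
import java.util.EnumMap;
import java.util.Map;

final class ListenerOutcomeSketch {
    // Local stand-in mirroring the LocalTransactionState constants used above.
    enum State { UNKNOW, COMMIT_MESSAGE, ROLLBACK_MESSAGE }

    // Mirrors the switch in checkLocalTransaction: status = transactionIndex % 3.
    static State stateFor(int transactionIndex) {
        switch (transactionIndex % 3) {
            case 0:
                return State.UNKNOW;
            case 1:
                return State.COMMIT_MESSAGE;
            default:
                return State.ROLLBACK_MESSAGE;
        }
    }

    // Counts how many of the first messageCount messages end up in each state.
    static Map<State, Integer> tally(int messageCount) {
        Map<State, Integer> counts = new EnumMap<>(State.class);
        for (int i = 0; i < messageCount; i++) {
            counts.merge(stateFor(i), 1, Integer::sum);
        }
        return counts;
    }
}
```

Under that assumption, the ten messages sent by the producer example would split into four that keep answering UNKNOW, three that are committed, and three that are rolled back. The messages that stay in UNKNOW are the ones subject to the check limit described in the usage constraints.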