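RocketMQ documentation in Chinese (if your English is good, you can go straight to the English version):
Although the material above is based on RocketMQ 3, it is still a valuable reference.
Fork your own repository from the official repository github.com/apache/rock…
Steps for setting up the debugging environment:
This series uses RocketMQ 4.5.1.
Open the unit test class org.apache.rocketmq.namesrv.NameServerInstanceTest and, following its #startup() method, write a static #main(String[] args) method as follows: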
// NameServerInstanceTest.java
public static void main(String[] args) throws Exception {
    // NamesrvConfig settings
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // NettyServerConfig settings
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876); // port the Namesrv listens on
    // Create the NamesrvController and start it
    NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
    namesrvController.initialize();
    namesrvController.start();
    // Keep the JVM alive (DateUtils is org.apache.commons.lang3.time.DateUtils)
    Thread.sleep(DateUtils.MILLIS_PER_DAY);
}
Then right-click the class and run it, and the RocketMQ Namesrv is up. The log output is as follows:
14:42:23.154 [NettyEventExecutor] INFO RocketmqRemoting - NettyEventExecutor service started
14:42:23.154 [FileWatchService] INFO RocketmqCommon - FileWatchService service started
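If you want to confirm the startup from outside the IDE, one option is to probe the listen port. The following is a minimal sketch using only the JDK; the class name PortCheck and the one-second timeout are illustrative choices, not part of RocketMQ. It only shows that something accepts TCP connections on the port, not that it is a Namesrv.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of RocketMQ.
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, host unreachable, or timed out
            return false;
        }
    }

    public static void main(String[] args) {
        // 9876 is the port passed to nettyServerConfig.setListenPort(...) above
        System.out.println("Namesrv listening: " + isListening("127.0.0.1", 9876, 1000));
    }
}
```

The same check can be pointed at 10911 once the Broker described next has been started.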
Open the unit test class org.apache.rocketmq.broker.BrokerControllerTest and, following its #testBrokerRestart() method, write a #main(String[] args) method as follows:
// BrokerControllerTest.java
public static void main(String[] args) throws Exception {
    // Set the remoting version
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    // NettyServerConfig settings
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(10911);
    // BrokerConfig settings
    final BrokerConfig brokerConfig = new BrokerConfig();
    brokerConfig.setBrokerName("broker-a");
    brokerConfig.setNamesrvAddr("127.0.0.1:9876"); // the Namesrv started above
    // MessageStoreConfig settings
    final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    messageStoreConfig.setDeleteWhen("04");
    messageStoreConfig.setFileReservedTime(48);
    messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
    messageStoreConfig.setDuplicationEnable(false);
    // Create the BrokerController and start it
    BrokerController brokerController = new BrokerController(
        brokerConfig,
        nettyServerConfig,
        new NettyClientConfig(),
        messageStoreConfig);
    brokerController.initialize();
    brokerController.start();
    System.out.println("Broker started");
    // Keep the JVM alive (DateUtils is org.apache.commons.lang3.time.DateUtils)
    Thread.sleep(DateUtils.MILLIS_PER_DAY);
}
Console output of BrokerControllerTest:
Broker started
Console output of NameServerInstanceTest:
14:43:38.523 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, HZO-PC10068847 QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, TopicTest QueueData [brokerName=broker-a, readQueueNums=4, writeQueueNums=4, perm=6, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, broker-a QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, TBW102 QueueData [brokerName=broker-a, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new topic registered, DefaultCluster QueueData [brokerName=broker-a, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]
14:43:38.524 [RemotingExecutorThread_1] INFO RocketmqNamesrv - new broker registered, 10.111.23.56:10911 HAServer: 10.111.23.56:10912
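The perm values in these registration lines (6 and 7) are bit flags. The sketch below decodes them, assuming the bit layout of RocketMQ's PermName class (read = 4, write = 2, inherit = 1); the class PermDecoder itself is hypothetical and written only for illustration.

```java
// Hypothetical helper; the bit values are assumed to match RocketMQ's PermName.
final class PermDecoder {
    static final int PERM_READ = 1 << 2;    // 4
    static final int PERM_WRITE = 1 << 1;   // 2
    static final int PERM_INHERIT = 1;      // 1

    /** Renders a perm value as three characters: R (read), W (write), X (inherit). */
    static String describe(int perm) {
        StringBuilder sb = new StringBuilder("---");
        if ((perm & PERM_READ) != 0) {
            sb.setCharAt(0, 'R');
        }
        if ((perm & PERM_WRITE) != 0) {
            sb.setCharAt(1, 'W');
        }
        if ((perm & PERM_INHERIT) != 0) {
            sb.setCharAt(2, 'X');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("perm=6 -> " + describe(6)); // prints "perm=6 -> RW-"
        System.out.println("perm=7 -> " + describe(7)); // prints "perm=7 -> RWX"
    }
}
```

Under that assumption, TopicTest (perm=6) is readable and writable, while TBW102 (perm=7) additionally carries the inherit flag.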
Open the example class org.apache.rocketmq.example.quickstart.Producer; the code is as follows:
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        producer.setNamesrvAddr("127.0.0.1:9876"); //<x>
        /*
         * Launch the instance.
         */
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        /*
         * Shut down once the producer instance is no longer in use.
         */
        producer.shutdown();
    }
}
At <x>, add the statement producer.setNamesrvAddr("127.0.0.1:9876") to set the NameServer address, which tells the Producer which RocketMQ Namesrv to use. Console output:
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C103DF, offsetMsgId=0A6F173800002A9F000000003A31CEC2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1277979]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C203E0, offsetMsgId=0A6F173800002A9F000000003A31CF76, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1277981]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C203E1, offsetMsgId=0A6F173800002A9F000000003A31D02A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1277975]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C303E2, offsetMsgId=0A6F173800002A9F000000003A31D0DE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1277978]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C303E3, offsetMsgId=0A6F173800002A9F000000003A31D192, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1277980]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C303E4, offsetMsgId=0A6F173800002A9F000000003A31D246, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1277982]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C303E5, offsetMsgId=0A6F173800002A9F000000003A31D2FA, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1277976]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C403E6, offsetMsgId=0A6F173800002A9F000000003A31D3AE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1277979]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C403E7, offsetMsgId=0A6F173800002A9F000000003A31D462, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1277981]
14:46:25.763 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:46:25.764 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[10.111.23.56:10911] result: true
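The offsetMsgId in each SendResult can be read by hand. Judging from the output above, its 32 hex characters look like 4 bytes of broker IPv4 address, 4 bytes of port, and 8 bytes of commit log offset: 0A6F1738 is 10.111.23.56 and 00002A9F is 10911, which matches the broker address in the closeChannel line. The sketch below decodes it under that assumption; OffsetMsgIdDecoder is a hypothetical helper, not a RocketMQ class, and it does not handle other address formats.

```java
// Hypothetical helper; assumes the layout: 4-byte IPv4 + 4-byte port + 8-byte offset, hex encoded.
final class OffsetMsgIdDecoder {

    /** Extracts the broker address as "ip:port" from the first 16 hex characters. */
    static String brokerAddress(String offsetMsgId) {
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            // Each address byte is two hex characters
            ip.append(Integer.parseInt(offsetMsgId.substring(i * 2, i * 2 + 2), 16));
        }
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        return ip + ":" + port;
    }

    /** Extracts the commit log offset from the last 16 hex characters. */
    static long commitLogOffset(String offsetMsgId) {
        return Long.parseLong(offsetMsgId.substring(16, 32), 16);
    }

    public static void main(String[] args) {
        String id = "0A6F173800002A9F000000003A31CEC2"; // first SendResult above
        System.out.println(brokerAddress(id));   // prints "10.111.23.56:10911"
        System.out.println(commitLogOffset(id)); // prints "976342722"
    }
}
```

Consecutive offsets in the output differ by 0xB4 = 180 bytes, the stored size of each of these messages.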
Open the example class org.apache.rocketmq.example.quickstart.Consumer; the code is as follows:
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        consumer.setNamesrvAddr("127.0.0.1:9876"); //<x>
        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        /*
         * Subscribe one or more topics to consume.
         */
        consumer.subscribe("TopicTest", "*");
        /*
         * Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        /*
         * Launch the consumer instance.
         */
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
At <x>, add the statement consumer.setNamesrvAddr("127.0.0.1:9876"), which tells the Consumer which RocketMQ Namesrv to use. Console output:
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=1277974, sysFlag=0, bornTimestamp=1563259585728, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585729, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31CD5A, commitLogOffset=976342362, bodyCRC=1058180698, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277977, CONSUME_START_TIME=1563259835201, UNIQ_KEY=0A6F1738844418B4AAC2506AF0C003DD, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 57], transactionId='null'}]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=1277972, sysFlag=0, bornTimestamp=1563259585725, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585725, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31C7BA, commitLogOffset=976340922, bodyCRC=835257960, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277977, CONSUME_START_TIME=1563259835201, UNIQ_KEY=0A6F1738844418B4AAC2506AF0BD03D5, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 49], transactionId='null'}]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=1277971, sysFlag=0, bornTimestamp=1563259585723, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585723, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31C4EA, commitLogOffset=976340202, bodyCRC=1597161362, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277977, CONSUME_START_TIME=1563259835201, UNIQ_KEY=0A6F1738844418B4AAC2506AF0BB03D1, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 55], transactionId='null'}]]
ConsumeMessageThread_18 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=1277978, sysFlag=0, bornTimestamp=1563259585727, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585727, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31CBF2, commitLogOffset=976342002, bodyCRC=1487577949, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277982, CONSUME_START_TIME=1563259835210, UNIQ_KEY=0A6F1738844418B4AAC2506AF0BF03DB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 55], transactionId='null'}]]
ConsumeMessageThread_19 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=1277970, sysFlag=0, bornTimestamp=1563259585709, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585710, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31B572, commitLogOffset=976336242, bodyCRC=51035196, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277982, CONSUME_START_TIME=1563259835209, UNIQ_KEY=0A6F1738844418B4AAC2506AF0AD03BB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 53, 53], transactionId='null'}]]
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=1277968, sysFlag=0, bornTimestamp=1563259585705, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585705, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31AFD2, commitLogOffset=976334802, bodyCRC=1948249169, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277982, CONSUME_START_TIME=1563259835208, UNIQ_KEY=0A6F1738844418B4AAC2506AF0A903B3, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 55], transactionId='null'}]]
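The body field in this output is the raw byte array of the message, printed as decimal values. To check that it really is what the Producer sent, the bytes can be turned back into text. The sketch below does this with the JDK only; BodyDecoder is a hypothetical helper, and UTF-8 is assumed because the Producer encodes the body with RemotingHelper.DEFAULT_CHARSET.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for reading the body=[...] values printed in the consumer log.
final class BodyDecoder {

    /** Converts the decimal byte values shown in the log back into a string. */
    static String decode(int... printedBytes) {
        byte[] body = new byte[printedBytes.length];
        for (int i = 0; i < printedBytes.length; i++) {
            body[i] = (byte) printedBytes[i];
        }
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body of the first message in the consumer output above
        String text = decode(72, 101, 108, 108, 111, 32, 82, 111, 99, 107,
            101, 116, 77, 81, 32, 57, 56, 57);
        System.out.println(text); // prints "Hello RocketMQ 989"
    }
}
```

Inside a real listener the same result comes from new String(msg.getBody(), StandardCharsets.UTF_8) on each MessageExt.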
KVConfigManager: KV configuration management
RouteInfoManager: route information management