消息隊列對比參照表:
java
RocketMQ vs. ActiveMQ vs. Kafka:
node
參考至:git
環境要求:github
一、下載RocketMQ的二進制包,我這裏使用的是4.5.1版本,下載地址以下:web
http://rocketmq.apache.org/release_notes/release-notes-4.5.1/spring
使用wget命令下載:數據庫
[root@study-01 ~]# cd /usr/local/src [root@study-01 /usr/local/src]# wget http://mirror.bit.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
二、解壓下載好的壓縮包,並移動到合適的目錄下:apache
[root@study-01 /usr/local/src]# unzip rocketmq-all-4.5.1-bin-release.zip [root@study-01 /usr/local/src]# mv rocketmq-all-4.5.1-bin-release /usr/local/rocketmq-4.5.1
注:若沒有安裝unzip命令則使用以下命令安裝:
yum install -y unzip編程
三、進入rocketmq的根目錄並查看是否包含以下目錄及文件:json
[root@study-01 /usr/local/src]# cd /usr/local/rocketmq-4.5.1 [root@study-01 /usr/local/rocketmq-4.5.1]# ls benchmark bin conf lib LICENSE NOTICE README.md
四、沒問題後,使用以下命令啓動Name Server:
[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqnamesrv & [1] 2448 [root@study-01 /usr/local/rocketmq-4.5.1]#
五、查看默認的9876端口是否被監聽,以驗證Name Server是否啓動成功:
[root@study-01 /usr/local/rocketmq-4.5.1]# netstat -lntp |grep java tcp6 0 0 :::9876 :::* LISTEN 2454/java [root@study-01 /usr/local/rocketmq-4.5.1]#
六、啓動Broker:
[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqbroker -n localhost:9876 & [2] 2485 [root@study-01 /usr/local/rocketmq-4.5.1]#
七、驗證Broker是否啓動成功,若是啓動成功,能看到相似以下的日誌::
[root@study-01 /usr/local/rocketmq-4.5.1]# cat ~/logs/rocketmqlogs/broker.log |grep "boot success" 2019-08-04 01:27:38 INFO main - The broker[study-01, 192.168.190.129:10911] boot success. serializeType=JSON and name server is localhost:9876 [root@study-01 /usr/local/rocketmq-4.5.1]#
若想中止Name Server和Broker,則依次執行如下兩條命令便可:
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown broker The mqbroker(2492) is running... Send shutdown request to mqbroker(2492) OK # 輸出該信息說明中止成功 [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown namesrv The mqnamesrv(2454) is running... Send shutdown request to mqnamesrv(2454) OK # 輸出該信息說明中止成功 [2]+ 退出 143 nohup sh bin/mqbroker -n localhost:9876 [root@study-01 /usr/local/rocketmq-4.5.1]#
一、驗證生產消息正常,執行以下命令:
[root@study-01 /usr/local/rocketmq-4.5.1]# export NAMESRV_ADDR=localhost:9876 [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
正常的狀況下,會看到一堆的相似於以下的輸出,這是生產消息後成功的result:
SendResult [sendStatus=SEND_OK, msgId=C0A8BE810A690D7163610FCC253B03E7, offsetMsgId=C0A8BE8100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=study-01, queueId=3], queueOffset=249]
二、驗證消費消息正常,執行以下命令:
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
正常的狀況下,會看到一堆的相似於以下的輸出,這是消費的消息內容:
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=242, sysFlag=0, bornTimestamp=1564853837073, bornHost=/192.168.190.129:34708, storeTimestamp=1564853837074, storeHost=/192.168.190.129:10911, msgId=C0A8BE8100002A9F000000000002AA4E, commitLogOffset=174670, bodyCRC=911284903, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1564854006859, UNIQ_KEY=C0A8BE810A690D7163610FCC251103CB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 49], transactionId='null'}]]
RocketMQ官方提供了一個基於Spring Boot開發的可視化控制檯,能夠方便咱們查看RocketMQ的運行狀況以及提高運維效率。因此本小節將介紹一下如何搭建搭建RocketMQ的控制檯,因爲咱們使用的RocketMQ版本是4.5.1,因此須要對控制檯的源碼進行一些改動以適配RocketMQ的4.5.1版本。
一、首先須要下載源碼,有兩種方式,一是使用git克隆代碼倉庫,二是直接下載rocketmq-externals的zip包,我這裏使用git方式,執行以下命令:
git clone https://github.com/apache/rocketmq-externals.git
二、修改控制檯代碼,使用IDE打開rocketmq-console
項目,以下圖所示:
2.一、修改項目中的application.properties
配置文件,我這裏主要是修改了監聽端口和Name Server的鏈接地址,至於其餘配置項有須要的話可按照說明自行修改:
# console的監聽端口,默認是8080 server.port=8011 # Name Server的鏈接地址;非必須,能夠在啓動了console後,在控制檯導航欄 - 運維 - NameSvrAddrList一欄設置 rocketmq.config.namesrvAddr=192.168.190.129:9876
2.二、修改依賴,因爲console項目默認使用的rocketmq版本是4.4.0,與咱們這裏使用的是4.5.1不徹底兼容,因此須要修改一下依賴版本,找到這一行:
<rocketmq.version>4.4.0</rocketmq.version>
修改成:
<rocketmq.version>4.5.1</rocketmq.version>
2.三、修改代碼,因爲修改了rocketmq的版本,會致使org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic
方法編譯報錯,因此須要改動一下此處代碼 ,將:
@Override public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null); ...
修改成:
@Override public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) { RPCHook rpcHook = null; DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); ...
三、打包構建並啓動,打開idea的terminal,執行以下命令:
# 在rocketmq-console目錄下執行 mvn clean package -DskipTests # 進入jar包存放目錄 cd target # 啓動rocketmq console java -jar rocketmq-console-ng-1.0.1.jar
四、使用瀏覽器訪問控制檯,我這裏因爲修改了端口,因此訪問地址是:http://localhost:8011
,正常的狀況下能看到以下界面:
不習慣英文的話能夠在右上角切換語言:
因爲控制檯是可視化界面而且支持中文,這裏就不過多介紹了,能夠參考官方的控制檯使用說明文檔:
我這裏將基本的術語與概念簡單總結成了思惟導圖:
官方文檔:
在以上小節搭建完RocketMQ以後,咱們來使用Spring的消息編程模型,編寫一個簡單的示例。首先須要在項目中添加相關依賴以下:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
在配置文件中添加rocketmq相關的配置以下:
rocketmq: name-server: 192.168.190.129:9876 producer: # 小坑:必須指定group group: test-group
編寫生產者的代碼,這裏以Controller作示例,具體代碼以下:
package com.zj.node.contentcenter.controller.content; import lombok.Data; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * 生產者 * * @author 01 * @date 2019-08-03 **/ @RestController @RequiredArgsConstructor public class TestProducerController { /** * 用於發送消息到 RocketMQ 的api */ private final RocketMQTemplate rocketMQTemplate; @GetMapping("/test-rocketmq/sendMsg") public String testSendMsg() { String topic = "test-topic"; // 發送消息 rocketMQTemplate.convertAndSend(topic, MyMessage.getInstance()); return "send message success"; } } @Data class MyMessage { private Integer id; private String name; private String status; private Date createTime; static MyMessage getInstance() { MyMessage message = new Message(); message.id = 1; message.name = "×××"; message.status = "default"; message.createTime = new Date(); return message; } }
編寫完成後,啓動項目,訪問該接口:
消息發送成功後,能夠到RocketMQ的控制檯中進行查看:
消息體能夠在消息詳情中查看,以下:
從生產者的代碼來看,能夠說是十分的簡單了,只須要使用一個RocketMQTemplate就能夠實現將對象轉換成消息體併發送消息。實際上除了RocketMQ外,其餘的MQ也有對應的Template,以下:
在消費者項目中,也須要添加rocketmq的依賴:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
一樣須要配置Name Server的鏈接地址:
rocketmq: name-server: 192.168.190.129:9876
編寫消費者的代碼,具體代碼以下:
package com.zj.node.usercenter.rocketmq; import com.alibaba.fastjson.JSON; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.util.Date; /** * 消費者監聽器 * * @author 01 * @date 2019-08-03 **/ @Slf4j @Component // topic須要和生產者的topic一致,consumerGroup屬性是必須指定的,內容能夠隨意 @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group") public class TestConsumerListener implements RocketMQListener<MyMessage> { /** * 監聽到消息的時候就會調用該方法 * * @param message 消息體 */ @Override public void onMessage(MyMessage message) { log.info("從test-topic中監聽到消息"); log.info(JSON.toJSONString(message)); } } /** * 消息體結構須要一致 */ @Data class MyMessage { private Integer id; private String name; private String status; private Date createTime; }
編寫完成後啓動項目,因爲以前咱們已經往隊列裏發送了消息,因此此時消費者項目一啓動,就能夠監聽到消息並消費,控制檯就會輸出以下日誌:
衆所周知RocketMQ是支持事務消息的,這也是不少人選擇使用RocketMQ做爲消息中間件的一大緣由,也是RocketMQ的一大特定。RocketMQ事務消息的流程以下圖所示:
因爲原圖是英文的,因此進行了一個大體的翻譯。以下:
簡單剖析一下流程:
一、生產者向MQ Server發送半消息,半消息是一種特殊的消息,這種消息會被存儲到MQ Server裏,可是會標記爲暫時不能投遞的狀態,因此此時消費者不會消費該消息
二、當半消息發送成功後,生產者就會去執行本地事務
三、生產者根據本地事務的執行結果,向MQ Server發送commit或rollback消息進行二次確認。若是MQ Server接收到的是commit則會將半消息標記爲可投遞狀態,那麼消費者就能夠進行消費。反之,MQ Server接收到的是rollback則會將半消息丟棄掉,消費者就沒法進行消費
四、若MQ Server未接收到二次確認的消息或生產者暫停了本地事務的執行,MQ Server則會定時(默認1分鐘)向生產者發送回查消息,檢查生產者的本地事務狀態。而後生產者會根據回查的本地事務執行結果向MQ Server再次發送commit或rollback消息
概念術語:
消息三態:
要想實現RocketMQ事務消息的話,須要按照流程圖編寫一些代碼。在開始編碼以前,先在數據庫中建立一張RocketMQ的事務日誌表,用做於本地事務回查的依據,表結構以下:
而後再建一張表,做爲事務方法操做的數據表,表結構以下:
接着開始寫代碼,首先定義一個service,裏面有帶有事務註解的方法以及發送事務消息的方法。具體代碼以下:
package com.zj.node.contentcenter.service.test; import com.zj.node.contentcenter.dao.content.NoticeMapper; import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper; import com.zj.node.contentcenter.domain.entity.content.Notice; import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog; import lombok.Data; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Date; import java.util.UUID; /** * @author 01 * @date 2019-08-08 **/ @Service @RequiredArgsConstructor public class TestProducerService { private final RocketMQTemplate rocketMQTemplate; private final NoticeMapper noticeMapper; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; public String testSendMsg(Notice notice) { // topic String topic = "test-topic"; // 生產者所在的事務組 String txProducerGroup = "tx-test-producer-group"; // 生產事務id String transactionId = UUID.randomUUID().toString(); // 發送半消息 rocketMQTemplate.sendMessageInTransaction( txProducerGroup, topic, // 消息體 MessageBuilder.withPayload("事務消息") // header是消息的頭部分,能夠用做傳參 .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) .setHeader("notice_id", notice.getId()) .build(), // 傳遞到executeLocalTransaction的參數 notice); return "send message success"; } @Transactional(rollbackFor = Exception.class) public void updateNotice(Integer noticeId, Notice notice) { Notice newNotice = new Notice(); newNotice.setId(noticeId); newNotice.setContent(notice.getContent()); noticeMapper.updateByPrimaryKeySelective(newNotice); } @Transactional(rollbackFor = Exception.class) public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) { updateNotice(noticeId, notice); // 寫入事務日誌 rocketmqTransactionLogMapper.insertSelective( RocketmqTransactionLog.builder() .transactionId(transactionId) .log("updateNotice") .build() ); } }
實現一個本地事務監聽器,用於執行事務方法及提供本地事務狀態的回查方法。具體代碼以下:
package com.zj.node.contentcenter.rocketmq; import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper; import com.zj.node.contentcenter.domain.entity.content.Notice; import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog; import com.zj.node.contentcenter.service.test.TestProducerService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; /** * 本地事務監聽器 * * @author 01 * @date 2019-08-08 **/ @Slf4j @RequiredArgsConstructor // 這裏的txProducerGroup須要與sendMessageInTransaction裏設置的一致 @RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group") public class TestTransactionListener implements RocketMQLocalTransactionListener { private final TestProducerService service; private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper; /** * 用於執行本地事務的方法 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); log.info("執行本地事務方法. 事務id: {}", transactionId); // header裏拿出來的都是String類型 Integer noticeId = Integer.parseInt((String) headers.get("notice_id")); try { // 執行帶有事務註解的方法 service.updateNoticeWithRocketMQLog(noticeId, (Notice) arg, transactionId); // 正常執行,向MQ Server發送commit消息 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事務方法發生異常,消息將被回滾", e); // 發生異常向MQ Server發送rollback消息 return RocketMQLocalTransactionState.ROLLBACK; } } /** * 用於回查本地事務的執行結果 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); log.warn("回查本地事務狀態. 事務id: {}", transactionId); // 按事務id查詢日誌數據 RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne( RocketmqTransactionLog.builder() .transactionId(transactionId) .build() ); // 若是能按事務id查詢出來數據表示本地事務執行成功,沒有數據則表示本地事務執行失敗 if (transactionLog == null) { log.warn("本地事務執行失敗,事務日誌不存在,消息將被回滾. 事務id: {}", transactionId); return RocketMQLocalTransactionState.ROLLBACK; } return RocketMQLocalTransactionState.COMMIT; } }
簡單說明一下這些方法的執行流程:
首先調用
TestProducerService.testSendMsg
向MQ Server發送半消息,從代碼也能夠看到該方法裏不會執行本地事務方法。當MQ Server接收半消息成功後,會告訴生產者接收成功,接着就會執行本地事務監聽器裏的executeLocalTransaction
方法,該方法裏會調用TestProducerService
裏帶有事務註解的方法updateNoticeWithRocketMQLog
,並在事務方法執行完畢後返回本地事務狀態給MQ Server。若executeLocalTransaction
方法返回的事務狀態是UNKNOWN
或者該方法出於某種緣由沒有被執行完畢,那麼MQ Server就接收不到二次確認消息,默認會在一分鐘後向生產者發送回查消息,生產者接收到回查消息的話就會執行本地事務監聽器裏的checkLocalTransaction
方法,經過事務日誌記錄表的數據來確認該事務狀態並返回。
因爲rocketmq有本身內部的日誌體系,因此默認不會使用Slf4j。體現到executeLocalTransaction
方法的話,就是若是該方法的執行過程當中拋出了異常的話,異常信息不會被打印到控制檯,而是輸出到rocketmq_client.log日誌文件中。相關源碼:org.apache.rocketmq.client.log.ClientLogger
若是但願rocketmq的日誌輸出到控制檯的話,須要在啓動類的main方法中增長以下代碼:
// 讓rocketmq使用slf4j日誌 System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");