參考資料:java
官網下載二進制安裝包(固然也可下載源碼包後本身編譯):下載地址mysql
解壓redis
unzip rocketmq-all-4.3.0-bin-release.zip
修改配置sql
在 conf/broker.conf
中新增數據庫
brokerIP1 = 192.168.195.88 autoCreateTopicEnable = true # 線上環境應該設爲false
在bin/runbroker.sh
中修改JVM內存大小,默認是8G,通常本身電腦的上虛擬機可能沒這麼大apache
啓動架構
# 後臺啓動NameServer nohup sh bin/mqnamesrv -n 192.168.195.88:9876 & # 查看日誌,看是否啓動成功 tail -f ~/logs/rocketmqlogs/namesrv.log # 後臺啓動Broker nohup sh bin/mqbroker -n 192.168.195.88:9876 -c conf/broker.conf & # 查看日誌,看是否啓動成功 tail -f ~/logs/rocketmqlogs/broker.log
中止dom
sh bin/mqshutdown broker sh bin/mqshutdown namesrv
注意:異步
在RocketMQ的bin目錄下有一個mqadmin腳本,它充當着控制檯的角色,能夠用來完成咱們經常使用的操做。如不喜歡命令可安裝第三方的可視化操控界面工具async
建立topic
sh mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t testTopic1 # 參數 -n爲nameServe服務地址 -b爲broker服務地址 -t爲topic的名字
查詢全部topic
sh mqadmin topicList -n localhost:9876 # 參數 -n爲nameServe服務地址
查看Topic統計信息
sh mqadmin topicStatus -n localhost:9876 -t testTopic1 # 參數 -n爲nameServe服務地址 -t爲topic的名字
查看消費組信息
sh mqadmin consumerProgress -n localhost:9876 -g simple_push_consumer_group_01 # 參數 -n爲nameServe服務地址 -g爲消費組的名字,無則表示查看全部的
查看全部命令:sh mqadmin
注意點:
RocketMQ的數據存儲主要有三個內容:ConsumeQueue、CommitLog和IndexFile
ConsumeQueue是消息的邏輯隊列,是由20字節定長的二進制數據單元組成,其中commitLogOffset(8 byte)、msgSize(4 byte)、tagsHashCode(8 byte);每一個Topic和QueuId對應一個ConsumeQueue;單個文件大小約5.72M,每一個文件由30W條數據組成,每一個文件默認大小爲600萬個字節,當一個ConsumeQueue類型的文件寫滿了,則寫入下一個文件。
CommitLog是消息存放的實際物理位置,每一個Broker下全部的Topic下的消息隊列共用同一個CommitLog的日誌數據文件來存儲,全部RocketMQ的寫入是順序的。單個CommitLog文件的默認大小爲1G。
IndexFile即消息索引,若是一個消息包含key值的話,會使用IndexFile存儲消息索引,其每一個單元的數據構成爲keyHash(4 byte)、commitLogOffset(8 byte)、timestamp(4 byte)、nextIndexOffset(4byte)。IndexFile主要是用來根據key來查詢消息。
Producer端發送消息最終寫入的是CommitLog,寫入CommitLog有同步刷盤和異步刷盤兩種方式:
同步刷盤:只有在消息真正持久化至磁盤後,Broker端纔會真正地返回給Producer端一個成功的ACK響應。
異步刷盤:只要消息寫入PageCache便可將成功的ACK返回給Producer端。
Consumer端先從ConsumeQueue讀取持久化消息的offset,隨後再從CommitLog中進行讀取消息的真正實體內容。因此實際上讀取操做是隨機而不是順序的,因此這也是消費速度是比Kafka低的緣由。
RocketMQ發送消息有三種方式
同步
消息發送後,等待服務端的ack響應,這種方式最可靠,但效率最低
異步
消息發送註冊回調函數,不需等待服務端的響應
單向
消息發送後,不關心服務端是否成功接受
若是Producer發送消息失敗,會自動重試,重試的策略:
RocketMQ消費消息主要有兩種方式:
pull模式
由消費者客戶端主動向服務端拉取消息。
通常狀況下,若是咱們沒有控制好pull的頻率,頻率太低時,則可能消費速度過低致使消息的積壓,頻率太高時,則可能發送過多無效或低效pull請求,增長了服務端負載。
爲了解決這個問題,RocketMQ在沒有足夠的消息時(如服務端沒有可消費的消息),並不會當即返回響應,而是保持並掛起當前請求,待有足夠的消息時在返回。而且咱們須要指定offset的起點和終點,而且須要咱們本身保存好本次消費的offset點,下次消費的時候好從上次的offset點開始拉取消息。
pull模式咱們並不常用。
push模式
由服務端主動地將消息推送給消費者。
push模式下,慢消費的狀況可能致使消費者端的緩衝區溢出。
可是在RocketMQ中並非真正的push,而是基於長輪訓的pull模式的來實現的僞push。具體的實現是:Consumer端每隔一段時間主動向broker發送拉消息請求,broker在收到Pull請求後,若是有消息就當即返回數據,Consumer端收到返回的消息後,再回調消費者設置的Listener方法。若是broker在收到Pull請求時,消息隊列裏沒有數據,broker端會阻塞請求直到有數據傳遞或超時才返回。
即消費失敗後,隔一段時間從新消費該消息。
重試隊列
RocketMQ會爲每一個消費組都設置一個Topic名稱爲%RETRY%+consumerGroup
的重試隊列(這裏須要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每一個Topic設置的),用於暫時保存由於各類異常而致使Consumer端沒法消費的消息。Consumer端出現異常失敗時,失敗的消息會從新發送給服務端的重試隊列。
死信隊列
重試隊列中超過配置的「最大重試消費次數」後就會移入到這個死信隊列中。在RocketMQ中,SubscriptionGroupConfig配置常量默認地設置了兩個參數,一個是retryQueueNums爲1(重試隊列數量爲1個),另一個是retryMaxTimes爲16(最大重試消費的次數爲16次)。Broker端經過校驗判斷,若是超過了最大重試消費次數則會將消息移至這裏所說的死信隊列。這裏,RocketMQ會爲每一個消費組都設置一個Topic命名爲%DLQ%+consumerGroup
的死信隊列。
通常在實際應用中,移入至死信隊列的消息,須要人工干預處理。
注意點:
RocketMQ的的默認延遲級別分爲16個,因此一條消息最大的重試次數爲16;
// 源碼位置:org.apache.rocketmq.store.config.MessageStoreConfig.class // 如需修改,則須要修改broker的配置,官方並不建議修改 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; //若是咱們在發送消息時設置消息的延遲級別爲3,則表示消息10s後才能被消費者發現 msg.setDelayTimeLevel(3);
RocketMQ的Message中的reconsumeTimes
屬性,表示該消息當前已重試的的次數,咱們能夠經過以下方法來控制最大重試的次數,超過最大重試次數的消息將移入死信隊列中。
// 設置最多重試3次, 默認16 consumer.setMaxReconsumeTimes(3);
RocketMQ自身的重試機制,默認消息的初始延遲級別就爲3,好像並無法修改(不敢確定)。源碼以下:
// 源碼位置:org.apache.rocketmq.client.impl.consumer.ProcessQueue.class public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { ... pushConsumer.sendMessageBack(msg, 3); ... }
public class Producer { public static void main(String[] args) { // 初始化一個生產者,生產組爲simple_producer_group DefaultMQProducer defaultMQProducer = new DefaultMQProducer("simple_producer_group_01"); // 設置NameServer地址 defaultMQProducer.setNamesrvAddr("192.168.195.88:9876"); try { // 啓動 defaultMQProducer.start(); Producer producer = new Producer(); producer.syncSend(defaultMQProducer); producer.asyncSend(defaultMQProducer); producer.onewaySend(defaultMQProducer); } catch (Exception e) { log.error("異常:", e); } finally { defaultMQProducer.shutdown(); } } /** * 同步方式 */ private void syncSend(DefaultMQProducer producer) throws InterruptedException, RemotingException,MQClientException, MQBrokerException, UnsupportedEncodingException { for(int i=0; i<100; i++){ String messageBody = "syncSend message" + i ; Message msg = new Message("testTopic1", "*", messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET)); /* 設置消息延遲級別,默認有16個級別: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 注意,若是設置爲3,意思是10s再被消費 */ //msg.setDelayTimeLevel(2); SendResult sendResult = producer.send(msg, 10000); log.info("onewaySend發送結果:{}", sendResult); } } /** * 異步方式 */ private void asyncSend(DefaultMQProducer producer) throws UnsupportedEncodingException, RemotingException, MQClientException, InterruptedException { for(int i=0; i<100; i++){ String messageBody = "asyncSend message" + i ; Message msg = new Message("testTopic1", "*", messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("asyncSend發送成功:{}", sendResult); } @Override public void onException(Throwable e) { log.info("asyncSend發送異常:{}", e); } }); } } /** * 單向方式 */ private void onewaySend(DefaultMQProducer producer) throws RemotingException, MQClientException, InterruptedException, UnsupportedEncodingException { for(int i=0; i<100; i++){ String messageBody = "onewaySend message" + i ; Message msg = new Message("testTopic1", "*", messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); log.info("onewaySend發送完成"); } } }
public class Consumer { public static void main(String[] args) throws Exception { new Consumer().pushMode(); //new Consumer().pullMode(); } /** * push 消費模式 */ private void pushMode() throws MQClientException { // 設置消費組 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("simple_push_consumer_group_01"); // 設置nameServer地址 consumer.setNamesrvAddr("192.168.195.88:9876"); // 設置從頭開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 指定topic consumer.subscribe("testTopic1", "*"); // 設置批量消費消息的數量,默認1 consumer.setConsumeMessageBatchMaxSize(1); // 設置最多重試3次, 默認16 consumer.setMaxReconsumeTimes(3); // 註冊消息監聽 consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> { try { MessageExt msg = msgs.get(0); log.info("收到消息, body:{}, reconsumeTimes={}, delayTimeLevel={}", new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET), msg.getReconsumeTimes(), msg.getDelayTimeLevel()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.info("消費異常:", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); // 運行 consumer.start(); log.info("消費者啓動成功"); } /** 用於pull模式中記錄offset使用*/ private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>(); /** * pull 消費模式, 不建議使用 */ private void pullMode() throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("simple_pull_consumer_group_01"); consumer.setNamesrvAddr("192.168.195.88:9876"); consumer.start(); // 獲取topic中的全部隊列 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("testTopic1"); for (MessageQueue mq : mqs) { log.info("消費隊列:{}", mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); log.info("pullResult:{}", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: pullResult.getMsgFoundList().forEach(msgExt -> { try { log.info("收到消息:{}", new String(msgExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { log.info("異常:", e); } }); break; case NO_NEW_MSG: break SINGLE_MQ; case NO_MATCHED_MSG: case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { log.info("異常:", e); } } } } /** * 獲取偏移量 */ private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSET_TABLE.get(mq); if (offset != null) { return offset; } return 0; } /** * 保存偏移量 */ private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSET_TABLE.put(mq, offset); } }
邏輯是這樣的:
實例代碼(Springboot項目):
@Component @Slf4j public class TransactionProducer implements InitializingBean { @Autowired private TransactionListener transactionListener; private TransactionMQProducer producer; @Override public void afterPropertiesSet() throws Exception { producer = new TransactionMQProducer("transaction_producer_group_01"); producer.setNamesrvAddr("192.168.195.88:9876"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); try { producer.start(); } catch (MQClientException e) { log.error("TransactionProducer 啓動異常:", e); } // 添加關閉鉤子 Runtime.getRuntime().addShutdownHook(new Thread(() -> producer.shutdown())); } /** * 發送事務消息 */ public void produce() { try { String body= "transaction_test_1"; Message msg = new Message("transactionTopicTest1", "user", UUID.randomUUID().toString(), body.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); log.info("發送事務消息結果:{}", sendResult); } catch (MQClientException | UnsupportedEncodingException e) { log.info("發送事務消息異常:{}", e); } } }
@Component @Slf4j public class TransactionListenerImpl implements TransactionListener { @Resource(name = "jdbcTemplate1") private JdbcTemplate jdbcTemplate; /** 生產中能夠用redis或數據庫代替 */ private ConcurrentHashMap<String, LocalTransactionState> localTrans = new ConcurrentHashMap<>(); /** * 執行本地事務 * @param msg 事務消息 * @param arg 自定義參數 * @return 執行結果 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { jdbcTemplate.execute("insert into user(name) values('transaction_test_1')"); localTrans.put(msg.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE); log.info("本地事務執行成功"); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e){ localTrans.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE); log.error("本地事務執行失敗"); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { LocalTransactionState state = localTrans.get(msg.getTransactionId()); log.info("執行事務回查,transactionId:{}, transactionState:{}", msg.getTransactionId(), state); if (null != state) { return state; } return LocalTransactionState.UNKNOW; } }
@Component @Slf4j public class TransactionConsumer implements InitializingBean { @Resource(name = "jdbcTemplate2") private JdbcTemplate jdbcTemplate; @Override public void afterPropertiesSet() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group_01"); consumer.setNamesrvAddr("192.168.195.88:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("transactionTopicTest1", "user"); // 只重試三次 consumer.setMaxReconsumeTimes(3); consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> { try { MessageExt msg = msgs.get(0); log.info("收到消息:{}", msg); // 正式項目中,咱們應該保存該msgId,防止消息的重複消費 // String msgId = msg.getMsgId(); String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET); jdbcTemplate.execute("insert into user(name) values('"+body+"')"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.info("異常:", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start(); log.info("TransactionConsumer啓動成功"); } }
/** * 數據源配置,用的Mysql和Druid */ @Configuration public class DbConfig { @Bean(name = "dataSource1") DataSource dataSource1() { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mysql://192.168.195.88:3306/jta1?useUnicode=true&characterEncoding=UTF8&useSSL=false"); dataSource.setUsername("root"); dataSource.setPassword("root"); return dataSource; } @Bean(name = "dataSource2") DataSource dataSource2() { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mysql://192.168.195.88:3306/jta2?useUnicode=true&characterEncoding=UTF8&useSSL=false"); dataSource.setUsername("root"); dataSource.setPassword("root"); return dataSource; } @Bean("jdbcTemplate1") JdbcTemplate first(@Qualifier("dataSource1") DataSource dataSource) { JdbcTemplate jdbcTemplate = new JdbcTemplate(); jdbcTemplate.setDataSource(dataSource); return jdbcTemplate; } @Bean("jdbcTemplate2") JdbcTemplate second(@Qualifier("dataSource2") DataSource dataSource) { JdbcTemplate jdbcTemplate = new JdbcTemplate(); jdbcTemplate.setDataSource(dataSource); return jdbcTemplate; } }
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTest { @Autowired TransactionProducer transactionProducer; /** * 測試分佈式事務 */ @Test public void testTransactionProducer() throws InterruptedException { transactionProducer.produce(); Thread.sleep(10000000); } }
如下內容沒有主要研究,主要摘自官網,有須要時再詳細研究。
原理和Kafka要作到有序差很少,即把消息放入一個隊列中,而後使用一個消費者去進行消費。
即每一個消費者消費都全部的消息
//設置廣播模式屬性便可 consumer.setMessageModel(MessageModel.BROADCASTING);
批量發送消息可提升生產者的性能。注意的是:同一批次的消息應該具備:相同的topic,相同的waitStoreMsgOK和沒有延時計劃,而且一批消息的總大小不該超過1M。
RocketMQ在消息過濾上是比較強大的,雖然咱們可能不會常常用到它。Message除了能夠Tag來加以區分外,咱們還能夠爲它添加額外的屬性。以下:
在發送消息時:
// 消息添加額外屬性 msg.putUserProperty("a", "3");
在消費消息時:
// 只有消費具備屬性a,a>= 0且a <= 3的消息 consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
還有很是多的過濾方式如:in
and
or
not
is null
=
<
>
等等
RocketMQ不保證消息不重複,若是你的業務須要保證嚴格的不重複消息,須要你本身在業務端去重。