1、專業術語java
消費生產者,負責產生消息,通常由業務系統負責產生消息
服務器
消息消費者,負責消費消息,通常是後臺系統負責異步消費網絡
Consumer的一種,應用一般向Consumer對象註冊一個Listener接口,一旦收到消息,Consumer對象馬上回調Listener接口方法數據結構
Consumer的一種,應用一般主動調用Consumer的拉消息方法,從Broker拉消息,主動權由應用控制異步
一類Producer的集合名稱,這類Consumer一般發送一類消息,且發送邏輯一致。分佈式
一類Consumer的集合名稱,這類Consumer一般消費一類消息,且消費邏輯一致。ide
消息中轉角色,負責存儲消息,轉發消息,通常也稱爲Server。在JMS規範中稱爲Provider。測試
一個消息被多個Consumer消費,即便這些Consumer屬於同一個Consumer Group,消息也會被Consumer Group中的每一個Consumer都消費一 次,廣播消費中的Consumer Group概念能夠認爲在消息劃分方面無心義ui
在CORBA Notification 規範中,消費方式都屬於廣播消費。spa
在JMS規範中,至關於JMS publish/subscribe model
一個Consumer Group 中的Consumer實例平均分攤消費消息。例如某個Topic有9條消息,其中一個Consumer Group有3個實例(多是3個進程,或者3臺機器),那麼每一個實例只消費其中的3條消息。
消費消息的順序要同發送消息的順序一致,在RocketMq中,主要指的是局部順序,即一類消息爲知足順序性,必須Producer單線程順序發送,且發送到同一隊列,這樣Consumer就能夠按照Producer發送的順序去消費消息。
順序消息的一種,正常狀況下能夠保證徹底的順序消息,可是一旦發生通訊異常,Broker重啓,因爲隊列總數發生變化,哈希取模後定位的隊列會變化,產生短暫的消息順序不一致。
若是業務能容忍在集羣一次狀況(如某個Broker宕機或者重啓)下,消息短暫的亂序,使用普通順序方式比較合適。
順序消費的一種,不管正常異常狀況都能保證順序,可是犧牲了分佈式Failover特性,即Broker集羣中只要有一臺機器不可用,則整個集羣都不可用,服務可用性大大下降。
若是服務器部署爲同步雙寫模式,此缺陷可經過備機自動切換爲主避免,不過仍然會存在幾分鐘的服務不可用(依賴同步雙寫,主備自動切換,自動切換功能還沒有實現)
在RocketMq中,全部消息隊列都是持久化,長度無限的數據結構,所謂長度無限是指隊列中的每一個存儲單元都是定長,訪問其中的存儲單元使用Offset來訪問,offset爲java long類型,64位,理論上在100年內不會溢出,因此任務是長度無限,另外隊列中只保存最近幾天的數據,以前的數據會按照過時時間來刪除。
2、代碼示例
生產者:
package com.alibaba.rocketmq.example.quickstart; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * @author : Jixiaohu * @Date : 2018-04-19. * @Time : 9:20. * @Description : */ public class produce { public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("jxh_quickstart_produce"); producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); producer.start(); try { for (int i = 0; i < 100; i++) { Message msg = new Message("TopicQuickStart", "TagA", ("Hello RoctetMq : " +i ).getBytes()); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (RemotingException e) { e.printStackTrace(); Thread.sleep(1000); } producer.shutdown(); } }
生產者生產100條消息:
消費者:
package com.alibaba.rocketmq.example.quickstart; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * @author : Jixiaohu * @Date : 2018-04-19. * @Time : 9:20. * @Description : */ public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jxh_quickstart_produce"); consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); /** * 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費<br> * 若是非第一次啓動,那麼按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicQuickStart", "*"); //不配置默認一條 consumer.setConsumeMessageBatchMaxSize(10); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.println("拉取消息條數 " + msgs.size()); try { for (MessageExt msg : msgs) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started"); } }
這邊設置了每次消費條數,我這邊先啓動Consumer訂閱ropic,而後啓動produce,看一下打印結果:
produce發送了100條數據,下面看一下,Consumer消費數據的狀況
你會發現,爲何每次獲取的消息都是1條,那上面設置的每次獲取最大的消息數目「10」,是否是不起做用了?
實際上是這樣的,
咱們正常的流程通常都是先啓動Consumer端,而後再啓動Producer端。Consumer端都是一條一條的消費的。可是有時候會出現先啓動Producer端,而後再啓動Consumer端這種狀況,那這個時候就是會批量消費了,這個參數就會有做用了。
3、消息的重試,
消息的重試大體分爲三種狀況,
①:Produce發送消息到MQ上,發送失敗。
看一下produce的代碼是怎麼實現的,這邊看一個大概的狀況
public class produce { public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("jxh_quickstart_produce"); //消息發送至mq失敗後的重試次數 producer.setRetryTimesWhenSendFailed(10); producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); producer.start(); try { for (int i = 0; i < 100; i++) { Message msg = new Message("TopicQuickStart", "TagA", ("Hello RoctetMq : " + i).getBytes()); // SendResult sendResult = producer.send(msg); //增長一個超時參數,單位爲毫秒 SendResult sendResult = producer.send(msg, 1000); System.out.println(sendResult); } } catch (RemotingException e) { e.printStackTrace(); Thread.sleep(1000); } producer.shutdown(); } }
②MQ推送消息至Consumer超時失敗(網絡波動)timeout。
這種狀況,timeout,MQ會無限循環,直到把消息推送至Consumer,MQ沒有接收到RECONSUME_LATER或CONSUME_SUCCESS
③Consumer處理消息後,返回RECONSUME_LATER,MQ也會按照策略發送消息 exception。
消息重試的測試是 1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h
RocketMQ爲咱們提供了這麼屢次數的失敗重試,可是在實際中也許咱們並不須要這麼多重試,好比重試3次,尚未成功,咱們但願把這條消息存儲起來並採用另外一種方式處理,並且但願RocketMQ不要再重試呢,由於重試解決不了問題了!這該如何作呢?
public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jxh_quickstart_produce"); consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876"); /** * 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費<br> * 若是非第一次啓動,那麼按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicQuickStart", "*"); //不配置默認一條 consumer.setConsumeMessageBatchMaxSize(10); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.println("拉取消息條數 " + msgs.size()); try { // for (MessageExt msg : msgs) { MessageExt msg = msgs.get(0); String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags + "msgs:" + msgs); //注意,要先啓動Consumer,在進行發送消息(也就是先訂閱服務,再發送) if ("Hello RoctetMq : 4".equals(msgBody)) { System.out.println("===========失敗消息開始========"); System.out.println(msgBody); System.out.println("===========失敗消息結束========"); //異常 int a = 1 / 0; } // } } catch (Exception e) { e.printStackTrace(); if (msgs.get(0).getReconsumeTimes() == 3) { // 該條消息能夠存儲到DB或者LOG日誌中,或其餘處理方式 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started"); } }
查看下打印結果:
這邊能看到重試次數。