白菜Java自習室 涵蓋核心知識mysql
RocketMQ 前身叫作 MetaQ, 在 MeataQ 發佈 3.0 版本的時候更名爲 RocketMQ,其本質上的設計思路和 Kafka 相似,可是和 Kafka 不一樣的是其使用 Java 進行開發,因爲在國內的 Java 受衆羣體遠遠多於 Scala,因此 RocketMQ 是不少以 Java 語言爲主的公司的首選。一樣的 RocketMQ 和 Kafka 都是 Apache 基金會中的頂級項目,他們社區的活躍度都很是高,項目更新迭代也很是快。markdown
對於 RocketMQ 的架構圖,在大致上來看和 Kafka 並無太多的差異,可是在不少細節上是有不少差異的,接下來會一一進行講述。架構
RocketMQ 架構圖中多個 Producer,多個主 Broker,多個從 Broker,每一個 Producer 能夠對應多個 Topic,每一個 Consumer 也能夠消費多個 Topic。ide
Broker 信息會上報至 NameServer,Consumer 會從 NameServer 中拉取 Broker 和 Topic 的信息。post
Producer:消息生產者,向 Broker 發送消息的客戶端spa
Consumer:消息消費者,從 Broker 讀取消息的客戶端設計
Broker:消息中間的處理節點,這裏和 kafka 不一樣,kafka 的 Broker 沒有主從的概念,均可以寫入請求以及備份其餘節點數據,RocketMQ 只有主 Broker 節點才能寫,通常也經過主節點讀,當主節點有故障或者一些其餘特殊狀況纔會使用從節點讀,有點相似- 於 mysql 的主從架構。3d
Topic:消息主題,一級消息類型,生產者向其發送消息, 消費者讀取其消息。code
Group:分爲 ProducerGroup, ConsumerGroup, 表明某一類的生產者和消費者,通常來講同一個服務能夠做爲 Group, 同一個 Group 通常來講發送和消費的消息都是同樣的。
Tag:Kafka 中沒有這個概念,Tag 是屬於二級消息類型,通常來講業務有關聯的可使用同一個 Tag, 好比訂單消息隊列,使用 Topic_Order, Tag 能夠分爲 Tag_食品訂單, Tag_服裝訂單等等。
Queue: 在 kafka 中叫 Partition, 每一個 Queue 內部是有序的,在 RocketMQ 中分爲讀和寫兩種隊列,通常來講讀寫隊列數量一致,若是不一致就會出現不少問題。
NameServer:Kafka 中使用的是 ZooKeeper 保存 Broker 的地址信息,以及 Broker 的 Leader 的選舉,在 RocketMQ 中並無採用選舉 Broker 的策略,因此採用了無狀態的 NameServer 來存儲,因爲NameServer 是無狀態的,集羣節點之間並不會通訊,因此上傳數據的時候都須要向全部節點進行發送。
不少朋友都在問什麼是無狀態呢?狀態的有無實際上就是數據是否會作存儲,有狀態的話數據會被持久化,無狀態的服務能夠理解就是一個內存服務,NameServer 自己也是一個內存服務,全部數據都存儲在內存中,重啓以後都會丟失。
在 RocketMQ 中的每一條消息,都有一個 Topic,用來區分不一樣的消息。一個主題通常會有多個消息的訂閱者,當生產者發佈消息到某個主題時,訂閱了這個主題的消費者均可以接收到生產者寫入的新消息。
在 Topic 中有分爲了多個 Queue,這實際上是咱們發送/讀取消息通道的最小單位,咱們發送消息都須要指定某個寫入某個 Queue,拉取消息的時候也須要指定拉取某個 Queue,因此咱們的順序消息能夠基於咱們的 Queue 維度保持隊列有序,若是想作到全局有序那麼須要將 Queue 大小設置爲1,這樣全部的數據都會在Queue 中有序。
在上圖中咱們的 Producer 會經過一些策略進行 Queue 的選擇:
非順序消息:非順序消息通常直接採用輪訓發送的方式進行發送。
順序消息:根據某個 Key 好比咱們常見的訂單Id, 用戶Id,進行 Hash,將同一類數據放在同一個隊列中,保證咱們的順序性。
咱們同一組 Consumer 也會根據一些策略來選 Queue,常見的好比平均分配或者一致性 Hash 分配。 要注意的是當 Consumer 出現下線或者上線的時候,這裏須要作重平衡,也就是 Rebalance,RocketMQ 的重平衡機制以下:
因爲重平衡是定時作的,因此這裏有可能會出現某個 Queue 同時被兩個 Consumer 消費,因此會出現消息重複投遞。
Kafka 的重平衡機制和 RocketMQ 不一樣,Kafka 的重平衡是經過 Consumer 和 Coordinator 聯繫來完成的,當 Coordinator 感知到消費組的變化,會在心跳過程當中發送重平衡的信號,而後由一個ConsumerLeader 進行重平衡選擇,而後再由 Coordinator 將結果通知給全部的消費者。
Queue 讀寫數量不一致怎麼辦?
在 RocketMQ 中 Queue 被分爲讀和寫兩種,在最開始接觸 RocketMQ 的時候一直覺得讀寫隊列數量配置不一致不會出現什麼問題的,好比當消費者機器不少的時候咱們配置不少讀的隊列,可是實際過程當中發現會出現消息沒法消費和根本沒有消息消費的狀況。
直接定義好一個producer,建立好Message,調用send方法便可。
public class Producer {
public static void main(String[] args)
throws MQClientException,
InterruptedException {
DefaultMQProducer producer = new
DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes
(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
複製代碼
public class PushConsumer {
public static void main(String[] args)
throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere
(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener
(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
複製代碼