MQ,全稱消息隊列,如今市面上有不少種消息隊列,像你們耳熟能詳的RabbitMQ,RocketMQ,Kafka等等,接下來爲你們詳細的介紹消息隊列。數據庫
俗話說的好,多用多錯,不能爲了技術而技術,要結合實際的業務場景使用合適的技術。
例如你用了Redis緩存,這時候你也許得考慮主從架構。由於主從架構,你可能得考慮主從切換。同時也許你還得考慮集羣模式。這就大大的提升了開發與維護的成本。所以選擇一種合適的技術是十分重要的。緩存
RabbitMQ 試用於邏輯相對複雜,同時對於隊列的性能需求相對不高的場景。bash
RabbitMQ
發送消息時會先發送到Exchange
(交換機),指定routing key
。Exchange
和隊列綁定時指定 binding key
routing key
和 binding key
的匹配規則成立時,纔會將消息發往指定的隊列。RabbitMQ中 數據存儲的最小單位是queue
,而在kafka中 的最小單位是partition
,partition
包含在topic
中,見下圖。網絡
gourp1
監聽一個
topic
, c1和c2會分別消費一個分區。 若是是一個消費組的話,必須指定默認消費者組名稱
每條消息在分區中都會有一個偏移量。在消息進入分區前,會給新的消息分配一個惟一的偏移量,保證了每一個分區中消息的順序性。
在RabbitMQ
中, 多個消費者監聽一個隊列,因爲消息是異步發送的,因爲網絡等緣由,可能Consumer2
先接受到Message2
,這樣會致使消息的處理順序不會按照消息存入隊列的順序。固然,咱們能夠經過一個消費者監聽一個隊列來保證消息的有序性。
而kafka呢,多個消費者在一個消費者組中,監聽一個topic
。topic
中的不一樣的分區會被不一樣的消費者消費(一個消費者消費一個分區)。固然,咱們只能保證每一個分區的消息時被順序消費的,不一樣的分區則不能保證了。架構
/** 一個消費者 一個消費者組 一個分區 **/
public void send1() {
kafkaTemplate.send("test1",0,"test1","data1");
System.out.println("生產者發送消息");
}
複製代碼
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消費者接受消息" + record);
}
}
複製代碼
生產者發送消息
消費者接受消息ConsumerRecord(topic = test1, partition = 0, offset = 0, CreateTime = 1560260495702, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
複製代碼
## 增長分區數
kafka-topics --zookeeper localhost:2181 --alter --topic test1 --partitions 2
## 結果
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
複製代碼
## 生產者
kafkaTemplate.send("test1",0 #分區數,"test1","data1");
kafkaTemplate.send("test1",1,"test11","data11");
System.out.println("生產者發送消息");
複製代碼
## 消費者
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消費者1接受消息" + record);
}
}
複製代碼
消費者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 4, CreateTime = 1560261029521, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
消費者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 1, CreateTime = 1560261029521, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
複製代碼
/** 2個消費者 2個消費者組 1個分區 **/
public void send3() {
kafkaTemplate.send("test1",0,"test1","data1");
System.out.println("生產者發送消息");
}
複製代碼
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消費者1接受消息" + record);
}
}
@KafkaListener(topics = {"test1"}, groupId = "test2")
public void test2(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消費者2接受消息" + record);
}
}
複製代碼
消費者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
消費者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
複製代碼
/** 2個消費者 1個消費者組 2個分區 **/
public void send4() {
kafkaTemplate.send("test1",0,"test1","data1");
kafkaTemplate.send("test1",1,"test11","data11");
System.out.println("生產者發送消息");
}
複製代碼
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消費者1接受消息" + record);
}
}
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test2(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消費者2接受消息" + record);
}
}
複製代碼
消費者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
消費者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
複製代碼
/** 2個消費者 1個消費者組 2個分區 **/
public void send4() {
kafkaTemplate.send("test1",0,"test1","data1");
kafkaTemplate.send("test1",1,"test11","data11");
System.out.println("生產者發送消息");
}
複製代碼
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消費者1接受消息" + record);
}
}
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test2(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消費者2接受消息" + record);
}
}
@KafkaListener(topics = {"test1"}, groupId = "test1")
public void test3(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("消費者3接受消息" + record);
}
}
複製代碼
消費者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
消費者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
複製代碼