「簡單」的消息隊列與kafka

小時候就特別喜歡龍系精靈,特別是乘龍,後來才知道只是冰水。。。尷尬。 在寵物小精靈中,乘龍一直是訓練家的載人夥伴,和咱們下面的MQ好像有幾分類似呢~~

前言

MQ,全稱消息隊列,如今市面上有不少種消息隊列,像你們耳熟能詳的RabbitMQ,RocketMQ,Kafka等等,接下來爲你們詳細的介紹消息隊列。數據庫

使用場景

俗話說的好,多用多錯,不能爲了技術而技術,要結合實際的業務場景使用合適的技術。
例如你用了Redis緩存,這時候你也許得考慮主從架構。由於主從架構,你可能得考慮主從切換。同時也許你還得考慮集羣模式。這就大大的提升了開發與維護的成本。所以選擇一種合適的技術是十分重要的。緩存

異步解耦

  1. 若是是單體應用,在用戶下單時,下訂單,減庫存,物流記錄這三個操做是同步阻塞的。若是將這三步的操做存放到各自的消息隊列中,而後監聽這三個消息隊列,那麼能夠大大的減小時間。
  2. 但同時,因爲是分佈式應用,你可能得考慮分佈式事務的必要性了。此外,爲了保證MQ的高可用,你可能得去調研市場上集羣模式支持的最好的中間件了。

流量削峯

  1. 在一些秒殺場景,爲了防止極高的併發,將數據庫沖垮,除了能夠在負載均衡端進行限流的設置,同時也能夠將用戶的請求數據存放到消息隊列中,而後經過消息隊列中控制處理速度。
  2. 同時這個處理的進程能夠監聽zk的某個節點,在秒殺結束後,修改這個節點的值,進程監聽到這個節點值的變化,將再也不處理請求(能夠作成攔截器)。這個有時也被用來做爲大數據中流處理的緩衝

市面上的消息隊列

RabbitMQ

介紹

RabbitMQ 試用於邏輯相對複雜,同時對於隊列的性能需求相對不高的場景。bash

模型

  1. RabbitMQ發送消息時會先發送到Exchange(交換機),指定routing key
  2. Exchange和隊列綁定時指定 binding key
  3. 只有當routing keybinding key 的匹配規則成立時,纔會將消息發往指定的隊列。
  4. 兩個消費者監聽同一個隊列,一條消息不會被兩個隊列同時消費。這是和kafka有區別的。

kafka

kafka topic

RabbitMQ中 數據存儲的最小單位是queue,而在kafka中 的最小單位是partition,partition包含在topic中,見下圖。網絡

直連模型

一個消費組 gourp1 監聽一個 topic, c1和c2會分別消費一個分區。 若是是一個消費組的話,必須指定默認消費者組名稱

訂閱模型

多個消費組(每一個組中只有一個消費者), 監聽一個topic,每一個消費者會消費topic中的全部分區。

kafka 偏移量(offset)

每條消息在分區中都會有一個偏移量。在消息進入分區前,會給新的消息分配一個惟一的偏移量,保證了每一個分區中消息的順序性。
RabbitMQ中, 多個消費者監聽一個隊列,因爲消息是異步發送的,因爲網絡等緣由,可能Consumer2先接受到Message2,這樣會致使消息的處理順序不會按照消息存入隊列的順序。固然,咱們能夠經過一個消費者監聽一個隊列來保證消息的有序性。
而kafka呢,多個消費者在一個消費者組中,監聽一個topictopic中的不一樣的分區會被不一樣的消費者消費(一個消費者消費一個分區)。固然,咱們只能保證每一個分區的消息時被順序消費的,不一樣的分區則不能保證了。架構

kafka實戰

一個分區,一個消費者組,一個消費者

/** 一個消費者 一個消費者組 一個分區 **/
    public void send1() {
        kafkaTemplate.send("test1",0,"test1","data1");
        System.out.println("生產者發送消息");
    }
複製代碼
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("消費者接受消息" + record);
        }
    }
複製代碼
生產者發送消息
消費者接受消息ConsumerRecord(topic = test1, partition = 0, offset = 0, CreateTime = 1560260495702, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
複製代碼

2個分區,一個消費者組,一個消費者,消費全部分區

## 增長分區數
kafka-topics --zookeeper localhost:2181 --alter --topic test1 --partitions 2
## 結果
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
複製代碼
## 生產者
kafkaTemplate.send("test1",0 #分區數,"test1","data1");
kafkaTemplate.send("test1",1,"test11","data11");
System.out.println("生產者發送消息");
複製代碼
## 消費者
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("消費者1接受消息" + record);
        }
    }
複製代碼
消費者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 4, CreateTime = 1560261029521, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
消費者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 1, CreateTime = 1560261029521, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
複製代碼

一個分區,2個消費者組,2個消費者 ,每一個消費者都消費一個分區

/** 2個消費者 2個消費者組 1個分區 **/
    public void send3() {
        kafkaTemplate.send("test1",0,"test1","data1");
        System.out.println("生產者發送消息");
    }
複製代碼
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消費者1接受消息" + record);
        }
    }

    @KafkaListener(topics = {"test1"}, groupId = "test2")
    public void test2(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消費者2接受消息" + record);
        }
    }
複製代碼
消費者1接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
消費者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 7, CreateTime = 1560261183386, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
複製代碼

2個分區,2個消費者,1個消費者組(負載均衡)

/** 2個消費者 1個消費者組 2個分區 **/
    public void send4() {
        kafkaTemplate.send("test1",0,"test1","data1");
        kafkaTemplate.send("test1",1,"test11","data11");
        System.out.println("生產者發送消息");

    }
複製代碼
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消費者1接受消息" + record);
        }
    }

    @KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test2(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消費者2接受消息" + record);
        }
    }
複製代碼
消費者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
消費者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
複製代碼

2個分區,3個消費者,1個消費者組(有個消費者未消費分區)

/** 2個消費者 1個消費者組 2個分區 **/
    public void send4() {
        kafkaTemplate.send("test1",0,"test1","data1");
        kafkaTemplate.send("test1",1,"test11","data11");
        System.out.println("生產者發送消息");

    }
複製代碼
@KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test1(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消費者1接受消息" + record);
        }
    }

    @KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test2(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消費者2接受消息" + record);
        }
    }
    
    @KafkaListener(topics = {"test1"}, groupId = "test1")
    public void test3(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("消費者3接受消息" + record);
        }
    }
複製代碼
消費者1接受消息ConsumerRecord(topic = test1, partition = 1, offset = 3, CreateTime = 1560261444482, serialized key size = 6, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = test11, value = data11)
消費者2接受消息ConsumerRecord(topic = test1, partition = 0, offset = 9, CreateTime = 1560261444482, serialized key size = 5, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = test1, value = data1)
複製代碼

kafka消息發送機制

xxxx

未完待續

相關文章
相關標籤/搜索