Kafka:Consumer

1.預覽

1.1 消費者組(Consumer Group)

  • 一個consumer group可能有若干個consumer實例
  • 同一個group裏面,topic的每條信息只能被髮送到group下的一個consumer實例
  • topic消息能夠被髮送到多個group

爲何須要consumer group?算法

consumer group是用於實現高伸縮性、高容錯性的consumer機制。組內的多個實例能夠同時讀取消息(不一樣的消息),而一旦某個consumer掛了,group會把這個實例的任務馬上交給其餘的consumer負責,不會丟失數據。這個過程叫作重平衡。數據庫

kafka實際上同時支持兩種消息引擎,基於隊列和基於發佈/訂閱。緩存

  • 全部的consumer實例都同屬於一個group——實現了隊列模型,每一個消息只被一個consumer處理。
  • consumer屬於不一樣的group——實現了基於發佈/訂閱模型。極端狀況下,每一個group只有一個consumer,那麼就至關於kafka把消息廣播到全部的consumer。

 1.2 位移(offset)

 這裏的位移指的是consumer端的offset,不是parititon那個。每一個consumer都會爲它消費的分區維護屬於本身的位置信息,記錄當前消費到該patition的哪一個位置。安全

kafka中,採用consumer group保存消費者端的offset,同時還引入了checkpoint機制按期對offset進行持久化。多線程

下圖展現了consumer端的offset保存方式,kafka consumer內部是使用一個map來保存其訂閱topic所屬分區的位移。app

 

1.3 位移提交

consumer客戶端須要按期向kafka集羣彙報本身消費數據的進度,這個過程稱爲位移提交。舊版本(0.9.0.0以前)的kafka consumer把位移提交到zookeeper。而以後的版本把位移提交到kafka的一個內部topic(__consumer_offsets)上,不依賴zk保存位移信息,因此在開發新版本的consumer時也不須要鏈接到zk。__consumer_offset 主題是kafka自行建立的,用戶不要擅自刪除。它保存的是consumer的位移信息,每條消息格式大體以下:socket

1.4 消費組重平衡(consumer group rebalance)

 它本質上是一種協議,規定一個consumer group下全部consumer如何達成一致來分配訂閱topic的分區。好比一個topic有100個partition,有一個group訂閱該topic,其中有50個consumer,那麼consumer group會爲每一個consumer平均分配兩個分區,即每一個consumer負責兩個分區的數據讀取。這個過程就成爲rebalance。ide

 

2.構建consumer

 

 

3.消息輪詢

3.1 poll內部原理

在Kafka1.0.0版本中,Java Consumer是一個多線程或者說是一個雙線程的Java進程——建立KafkaConsumer的爲用戶主線程,同時consumer後臺還有一個心跳線程。KafkaConsumer的poll()方法運行在主線程。這代表:消費者組執行rebalance、消息獲取、coordinator管理等操做都運行在主線程。spa

3.2 poll使用方法

每次poll方法返回的都是訂閱分區上的一組消息,若是某些分區沒有準備好,可能會返回空。線程

try {
    while (true) {
        // 一次poll()能夠拿到不少數據,不足1s時會阻塞,1000ms是最大阻塞時間
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> r : records) {
            System.out.printf("offset=%d, key=%s, value=%s, partition=%d\n",
                    r.offset(), r.key(), r.value(), r.partition());
        }
    }
} finally {
    consumer.close();
}

 

poll()方法根據當前consumer消費位移返回消息集合。

若是poll方法沒有給定參數,那麼consumer端會阻塞以等待數據不斷積累並最終知足consumer的需求(好比要一次至少獲取1m的數據);

若是給定了參數,那麼等待時間超過了指定超時時間就返回。

Java Consumer是非線程安全的,若是把它用到多線程中,會拋出KafkaConsumer is not safe for multi-threaded access異常。

超時參數的用處:

假設用戶除了獲取數據之外還須要按期執行其餘的常規任務(每隔10s須要把消費狀況記錄到日誌中),用戶就能夠設置consumer.poll(10000),讓consumer在等待kafka消息的同時還能夠按期執行其餘任務。

若是程序惟一的任務是從kafka獲取消息而後處理,那麼能夠採用如下方法

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> r : records) {
            System.out.printf("offset=%d, key=%s, value=%s, partition=%d\n",
                    r.offset(), r.key(), r.value(), r.partition());
        } 
    }
} catch (WakeupException e) {
    // 忽略異常處理
}finally {
    consumer.close();
}

 

這段代碼可讓consumer無限等待,而後在另一個線程中調用consumer.wakeup()來觸發異常,注意,用戶能夠安全地在另外一個線程中調用consumer.wakeup(),這時特例,其餘方法都是不安全的。

 總結以上,poll()的使用方法:

  • consumer須要按期執行其餘子任務,推薦poll(較小超時時間) + 運行標識布爾變量
  • consumer不須要按期執行任務,推薦poll(MAX_VALUE) + 捕獲 WakeupException的方式

 

4.位移管理

4.1 consumer位移

consumer須要爲它要讀取的分區保存消費進度,即分區中當前最新消費消息的位置,這個位置就成爲offset。consuemr須要按期向kafka集羣提交本身的位置信息這裏的位移值一般指下一條待消費的消息位置

offset是實現消息交付語義的保證,以下:

  • 最多一次:消息可能丟失,但不會重複處理
  • 最少一次:消息不會丟失,但可能會重複處理
  • 精確一次:消息必定會被處理一次且只會被處理一次

若是consumer在消費前就提交了位移,那麼能夠實現at most once語義;若是在消費以後提交了位移,可實現at least once語義。

 

consumer中的位置信息不少,下面要給出區別:

  • 上次提交位移:consumer最近一次提交的位移
  • 當前位移:已讀取但未提交時的位置
  • 水位(High Watermark):它不屬於consumer管理的範圍,屬於分區日誌管轄。consumer只能讀取處於HW如下的數據
  • 日誌終端位移(Log End Offset):它不屬於consumer管理的範圍,表示某個分區副本當前保存消息最大的位移。

 4.2 consumer位移管理

consumer會在kafka集羣的全部broker中選擇一個broker做爲consumer group的coordinator,用於實現組成員管理、消費分配方案指定以及位移提交等。

consumer group首次啓動時,因爲沒有初始的位移信息,coordinator必須爲其肯定初始值,這就是consumer參數 auto.offset.reset 的做用,一般,要麼從最先(earliest)(從頭開始消費),要麼從最新(latest)(從最新append到partition日誌的位置開始消費)開始。

當consumer運行了一段時間以後,它必須提交本身的位移。若是consumer崩潰或被關閉,它負責的分區就會分配給其餘的consumer。

consumer提交位移的主要機制是經過向所屬的coordinator發送位移提交請求來實現,每一個位移提交請求都會往__consumer_offsets對應的分區追加一條消息。

4.3 自動提交和手動提交

默認狀況下,consumer是自動提交位移的,間隔是5秒(CDH5.14 1.0.1+kafka-3.1.0 是60秒)。能夠經過 auto.commit.interval.ms 設置。

手動提交:由用戶自行確當消息什麼時候被真正處理完畢並提交位移。以下面的例子:

final int minBatchSize = 500;
// 緩存
List<ConsumerRecord<String,String>> buffer = new ArrayList<>(minBatchSize);
try {
    while (true) {
        ConsumerRecords<String,String> records = consumer.poll(1000);
        records.forEach(buffer::add);
        if (buffer.size() >= minBatchSize){
            // 插入數據庫
            insertIntoDb(buffer);
            // 等數據插入數據庫以後,再同步提交位移
            consumer.commitSync();
            // 若是提交位移失敗了,那麼重啓consumer後會重複消費以前的數據,再次插入到數據庫中
            // 清空緩衝區
            buffer.clear();
        }
    }
} finally {
    consumer.close();
}

 若是要進行更加細粒度的控制,能夠進行分區層的手動提交位移:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        // 處理每一個分區的記錄
        records.partitions().forEach(p -> {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(p);
            partitionRecords.forEach(pr -> {
                System.out.println(pr.offset() + ": " + pr.value());
            });
            // 獲取該partition最後一個消息的位移
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // 提交的offset應該是下一次插入消息的位置
            consumer.commitSync(Collections.singletonMap(p,new OffsetAndMetadata(lastOffset+1)));
        });
    }
} finally {
    consumer.close();
}

總結如下,自動提交和手動提交的區別:

  使用方法 優點 劣勢 交付語義保證 使用場景
自動提交 默認或顯式配置enable.auto.commit=true 簡單 沒法精確控制,提交失敗後不易處理 最少一次 對消息交付語義無需求,可容忍必定的消息丟失
手動提交 設置enable.auto.commit=false,並調用consumer.Sync()或consumer.Async()提交 可精確控制位移提交 額外開發成本,須自行提交 易實現最少一次,依賴外部狀態能夠實現精確一次 消息處理邏輯重,不容許消息丟失

 

 

 

 

 

 

 

 

 

 

5 重平衡(Rebalance)

5.1 預覽

consumer group的rebalance本質上是一組協議,規定了consumer group是如何達成一致來分配訂閱topic的全部分區的。

對於每一個組,kafka的某個broker會被選舉爲組協調者(group coordinatior)。coordinatior負責對組的狀態進行管理,它的主要職責就是當新成員到達時,促成組內的全部成員達成新的分區分配方案,即coordinator負責對組執行rebalance

5.2 rebalce觸發條件

有三個:

  • 組成員發生變動,好比加入了新的consumer或有consumer離開group,或有consumer崩潰
  • 組訂閱的topic數量發生改變。
  • 組訂閱的topic的分區數發生改變。

5.3 rebalance分區分配

consumer默認提供了3種分配策略:

  • range策略:將單個topic的分區按順序排列,而後把這些分區劃分紅固定大小的分區段並依次分配給每一個consumer。
  • round-robin:把topic的全部分區按順序排開,以輪詢的方式分配給每一個consumer。
  • sticky:// todo

下面給一個簡單的例子,假設目前某個consumer group有2個consumer A和B,當C加入時,觸發了rebalance條件,coordinator會進行rebalance,根據range策略從新分配了partition。

5.4 rebalance generation

rebalance generation用於標識某次rebalance。它是一個整數,從0開始。它主要是爲了保護consumer group的,好比上一屆的consumer因爲某些緣由延遲提交了offset,但rebalance以後該group產生了新的一屆成員,而此次延遲的offset提交的是舊的generation信息,所以會被consumer group拒絕。

 5.5 rebalance 協議

 group與coordinator共同使用rebalance協議來完成rebance操做,kafka提供了下面5個協議:

  • JoinGroup請求:consumer請求加入組
  • SyncGroup請求:group leader把分配方案同步更新到組內成員。
  • Hearbeat請求:consumer按期向coordinator彙報心跳代表本身存活
  • LeaveGroup請求:consumer主動通知coordinator該consumer即將離開組
  • DescribeGroup請求:查看組的全部信息

5.6 rebalance流程

 consumer group在rebalance以前必須肯定coordinator所在的broker,並建立與之通訊的socket。

肯定coordinator位置的算法以下:

  • 計算Math.abs(groupID.hashCode)%offsets.topic.num.partitions,假設結果爲x
  • 尋找__consumer_offset分區x的leader所在的broker,該broker就是這個group的coordinator

 創建socket以後,開始進行rebalance。主要有2步:

  • 加入組:組內全部的consumer向coordinator發送JoinGroup請求,收集完畢後,coordinator從中選一個consumer做爲group的leader,並把全部的成員信息都發給這個leader。
  • 同步更新分配方案:leader開始制定分配方案,即根據分配策略決定每一個consumer負責topic的哪些分區。方案肯定後,leader會把這個分配方案以SyncGroup請求發送給coordinator。組內的全部成員都會發送SyncGroup請求,可是隻有leader纔會攜帶分配方案。coordinator接收到分配方案後把屬於每一個consumer的分配方案單獨拿出來,左右SyncGroupResponse返回給各自的consumer。

5.7 rebalce監聽器

這個監聽器的主要做用是在coordinator開啓一輪rebalance的先後進行一些操做,好比,要在rebalance前手動提交位移到第三方存儲。

要使用監聽器,要在consumer.subscribe()方法的第二個參數新建一個回調接口ConsumerRebalanceListener,裏面封裝了相關的邏輯,咱們須要實現onPartitionsRevoked(rebalace前調用)和onPartitionAssigned(rebalance後調用)方法。

 consumer.subscribe(Arrays.asList("test1"), new ConsumerRebalanceListener() {        //rebalance監聽器

    //在coordinator開啓新一輪rebalance前調用
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        //這裏能夠進行一些操做,好比把手動提交的位移存儲到第三方
        partitions.forEach(tp -> saveOffsetInExternalStore(consumer.position(tp)));
        joinStart.set(System.currentTimeMillis());
    }

    //在rebalance完成後調用
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        totalRebalanceTimeMs.addAndGet(System.currentTimeMillis() - joinStart.get());        //更新總的rebalance時長
        // 從外部存儲讀取每一個topicPartition的位移,而後移動當前consumer的位移到該位置
        partitions.forEach(tp -> consumer.seek(tp,readOffsetFromExternalStore(tp)));
    }

    private long readOffsetFromExternalStore(TopicPartition tp) {
    }

    // 保存到數據庫
    private void saveOffsetInExternalStore(long position) {
    }
});

// 位移處理後就能夠從上面移動到的位置開始讀取了
try {
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
        consumerRecords.forEach(r ->
                System.out.printf("offset=%d, key=%s, value=%s, partition=%d\n",
                        r.offset(), r.key(), r.value(), r.partition()));
    }

} finally {
    System.out.println("totoalRebalanceTimeMs: " + totalRebalanceTimeMs);
    consumer.close();
}

6 多線程消費 

KafkaConsumer是非線程安全的,多個線程中要避免共用一個KafkaConsumer。

那麼如何實現多線程的consumer消費呢?有兩種方法

1.每一個線程維護一個KafkaConsumer,每一個consumer消費固定數目的分區。

2.單個KafkaConsumer實例+多worker線程。僅由一個consumer實例接收消息,而後馬上交給其餘的工做線程進行消息的處理。

 7 獨立Consumer

前面討論的consumer都是以consumer group的形式存在的,group自動幫用戶執行分區分配和rebalance。

standalone consumer間彼此獨立工做互不干擾,任何一個consumer崩潰都不會影響其餘standalone consumer的工做。

使用standalone consumer的方法就是調用KafkaConsumer的assign方法。這個方法接收一個分區列表,直接賦予該consumer訪問這些分區的權力。

List<TopicPartition> partitions = new ArrayList<>(10);
consumer.partitionsFor("test1").forEach(partitionInfo ->
      partitions.add(new TopicPartition(partitionInfo.topic(), 0)));      //只訂閱分區0的消息
//賦予consumer訪問分區的能力
consumer.assign(partitions);

 注意:assign和subscribe不能混用。

相關文章
相關標籤/搜索