深刻了解kafka系列-消費者

前言

與生產者對應的是消費者,應用程序能夠經過KafkaConsumer來訂閱主題,並從訂閱的主題中拉取消息。不過在使用KafkaConsumer消費消息以前須要先了解消費者和消費組的概念,不然沒法理解如何使用KafkaConsumer。apache

<!--more-->json

Consumer

  • 消費者(Consumer)負責訂閱Kafka中的主題(Topic),而且從訂閱的主題上拉取消息。與其餘一些消息中間件不一樣的是:在Kafka的消費理念中還有一層消費組(Consumer Group)的概念,每一個消費者都有一個對應的消費組。

當消息發佈到主題後,只會被投遞給訂閱它的每一個消費組中的一個消費者。如圖所示,某個主題中共有4個分區(Partition):P0、P一、P二、P3。有兩個消費組A和B都訂閱了這個主題,消費組A中有4個消費者(C0、C一、C2和C3),消費組B中有2個消費者(C4和C5)。按照Kafka默認的規則,最後的分配結果是消費組A中的每個消費者分配到1個分區,消費組B中的每個消費者分配到2個分區,兩個消費組之間互不影響。每一個消費者只能消費所分配到的分區中的消息。換言之,每個分區只能被一個消費組中的一個消費者所消費。bootstrap

分區分配的演變(Rebalance)

咱們再來看一下消費組內的消費者個數變化時所對應的分區分配的演變。假設目前某消費組內只有一個消費者C0,訂閱了一個主題,這個主題包含 7 個分區:P0、P一、P二、P三、P四、P五、P6。也就是說,這個消費者C0訂閱了7個分區,具體分配情形如圖。服務器

消費者與消費組此時消費組內又加入了一個新的消費者C1,按照既定的邏輯,須要將原來消費者C0的部分分區分配給消費者C1消費,以下圖所示。消費者C0和C1各自負責消費所分配到的分區,彼此之間並沒有邏輯上的干擾。ide

緊接着消費組內又加入了一個新的消費者C2,消費者C0、C1和C2按照下圖方式各自負責消費所分配到的分區。.net

消費者與消費組這種模型可讓總體的消費能力具有橫向伸縮性,咱們能夠增長(或減小)消費者的個數來提升(或下降)總體的消費能力。對於分區數固定的狀況,一味地增長消費者並不會讓消費能力一直獲得提高,若是消費者過多,出現了消費者的個數大於分區個數的狀況,就會有消費者分配不到任何分區。參考圖以下,一共有8個消費者,7個分區,那麼最後的消費者C7因爲分配不到任何分區而沒法消費任何消息。線程

投遞模式

以上分配邏輯都是基於默認的分區分配策略進行分析的,能夠經過消費者客戶端參數partition.assignment.strategy 來設置消費者與訂閱主題之間的分區分配策略,有關分區分配的更多細節能夠再接下來的系列繼續聊。翻譯

對於消息中間件而言,通常有兩種消息投遞模式:debug

點對點(P2P,Point-to-Point)模式: 點對點模式是基於隊列的,消息生產者發送消息到隊列,消息消費者從隊列中接收消息。code

發佈/訂閱(Pub/Sub)模式: 發佈訂閱模式定義瞭如何向一個內容節點發布和訂閱消息,這個內容節點稱爲主題(Topic),主題能夠認爲是消息傳遞的中介,消息發佈者將消息發佈到某個主題,而消息訂閱者從主題中訂閱消息。主題使得消息的訂閱者和發佈者互相保持獨立,不須要進行接觸便可保證消息的傳遞,發佈/訂閱模式在消息的一對多廣播時採用。

Kafka 同時支持兩種消息投遞模式,而這正是得益於消費者與消費組模型的契合:

  • 若是全部的消費者都隸屬於同一個消費組,那麼全部的消息都會被均衡地投遞給每個消費者,即每條消息只會被一個消費者處理,這就至關於點對點模式的應用。

  • 若是全部的消費者都隸屬於不一樣的消費組,那麼全部的消息都會被廣播給全部的消費者,即每條消息會被全部的消費者處理,這就至關於發佈/訂閱模式的應用

消費組是一個邏輯上的概念,它將旗下的消費者歸爲一類,每個消費者只隸屬於一個消費組。每個消費組都會有一個固定的名稱,消費者在進行消費前須要指定其所屬消費組的名稱,這個能夠經過消費者客戶端參數group.id來配置,默認值爲空字符串。消費者並不是邏輯上的概念,它是實際的應用實例,它能夠是一個線程,也能夠是一個進程。同一個消費組內的消費者既能夠部署在同一臺機器上,也能夠部署在不一樣的機器上。

建立一個Kafka消費者

  • 如下代碼段顯示瞭如何建立KafkaConsumer:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer =
    new KafkaConsumer<String, String>(props);
  • 訂閱主題
consumer.subscribe(Collections.singletonList("customerCountries"));
  • 要訂閱全部test主題,咱們能夠:
consumer.subscribe(Pattern.compile("test.*"));
  • 輪詢循環

消費者API的核心是一個簡單的循環,用於輪詢服務器以獲取更多數據。 一旦用戶訂閱了主題,輪詢循環便會處理協調,分區從新平衡,心跳和數據獲取的全部詳細信息,從而爲開發人員提供了一個乾淨的API,該API僅從分配的分區中返回可用數據。 消費者的主體以下所示

try {
    while (true) { 1
        ConsumerRecords<String, String> records = consumer.poll(100); 2
        for (ConsumerRecord<String, String> record : records) 3
        {
            log.debug("topic = %s, partition = %d, offset = %d,"
                customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());

            int updatedCount = 1;
            if (custCountryMap.countainsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount)

            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4)) 4
        }
    }
} finally {
    consumer.close(); 5
}
  • 反序列化
public class StringDeserializer implements Deserializer<String> {
  private String encoding = "UTF8";

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
      String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
      Object encodingValue = configs.get(propertyName);
      if (encodingValue == null)
          encodingValue = configs.get("deserializer.encoding");
      if (encodingValue instanceof String)
          encoding = (String) encodingValue;
  }

  @Override
  public String deserialize(String topic, byte[] data) {
      try {
          if (data == null)
              return null;
          else
              return new String(data, encoding);
      } catch (UnsupportedEncodingException e) {
          throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
      }
  }
}
  • 消息消費

Kafka中的消費是基於拉模式的。消息的消費通常有兩種模式:推模式和拉模式。推模式是服務端主動將消息推送給消費者,而拉模式是消費者主動向服務端發起請求來拉取消息。從輪詢循環代碼清單中能夠看出,Kafka中的消息消費是一個不斷輪詢的過程,消費者所要作的就是重複地調用poll()方法,而poll()方法返回的是所訂閱的主題(分區)上的一組消息。對於poll()方法而言,若是某些分區中沒有可供消費的消息,那麼此分區對應的消息拉取的結果就爲空;若是訂閱的全部分區中都沒有可供消費的消息,那麼poll()方法返回爲空的消息集合。

poll(long)方法中timeout的時間單位固定爲毫秒,而poll(Duration)方法能夠根據Duration中的ofMillis()、ofSeconds()、ofMinutes()、ofHours()等多種不一樣的方法指定不一樣的時間單位,靈活性更強。而且 poll(long)方法也已經被標註爲@Deprecated,雖然目前還可使用,若是條件容許的話,仍是推薦使用poll(Duration)的方式。

咱們在消費消息的時候能夠直接對 ConsumerRecord 中感興趣的字段進行具體的業務邏輯處理。

poll()方法的返回值類型是 ConsumerRecords,它用來表示一次拉取操做所得到的消息集,內部包含了若干ConsumerRecord,它提供了一個iterator()方法來循環遍歷消息集內部的消息,iterator()方法的定義以下:

@Override
    public Iterator<ConsumerRecord<K, V>> iterator() {
        return new ConcatenatedIterable<>(records.values()).iterator();
    }

在 ConsumerRecords 類中還提供了幾個方法來方便開發人員對消息集進行處理:count()方法用來計算出消息集中的消息個數,返回類型是int;isEmpty()方法用來判斷消息集是否爲空,返回類型是boolean;empty()方法用來獲取一個空的消息集,返回類型是ConsumerRecord<K,V>。

到目前爲止,能夠簡單地認爲poll()方法只是拉取一下消息而已,但就其內部邏輯而言並不簡單,它涉及消費位移、消費者協調器、組協調器、消費者的選舉、分區分配的分發、再均衡的邏輯、心跳等內容

  • 位移提交

對於Kafka中的分區而言,它的每條消息都有惟一的offset,用來表示消息在分區中對應的位置。對於消費者而言,它也有一個offset的概念,消費者使用offset來表示消費到分區中某個消息所在的位置。單詞「offset」能夠翻譯爲「偏移量」,也能夠翻譯爲「位移」,不少同窗可能並無過多地在乎這一點:在不少中文資料中都會交叉使用「偏移量」和「位移」這兩個詞,並無很嚴謹地進行區分。

我對offset作了一些區分:對於消息在分區中的位置,咱們將offset稱爲「偏移量」;對於消費者消費到的位置,將 offset 稱爲「位移」,有時候也會更明確地稱之爲「消費位移」。作這一區分的目的是讓讀者在遇到 offset 的時候能夠很容易甄別出是在講分區存儲層面的內容,仍是在講消費層面的內容

在每次調用poll()方法時,它返回的是尚未被消費過的消息集(固然這個前提是消息已經存儲在Kafka 中了,而且暫不考慮異常狀況的發生),在舊消費者客戶端中,消費位移是存儲在ZooKeeper中的。而在新消費者客戶端中,消費位移存儲在Kafka內部的主題__consumer_offsets中。這裏把將消費位移存儲起來(持久化)的動做稱爲「提交」,消費者在消費完消息以後須要執行消費位移的提交。

  • 指定位移消費

正是有了消費位移的持久化,才使消費者在關閉、崩潰或者在遇到再均衡的時候,可讓接替的消費者可以根據存儲的消費位移繼續進行消費 ,但是有一個問題則是 _consumer_offsets 位移信息過時而被刪除後,它也沒有能夠查找的消費位移 ,這個時候就會根據消費者客戶端參數auto.offset.reset的配置來決定從何處開始進行消費

除了查找不到消費位移,位移越界也會觸發 auto.offset.reset 參數的執行 ,然而有些時候,咱們須要一種更細粒度的掌控,可讓咱們從特定的位移處開始拉取消息,哎 !這個時候 KafkaConsumer 中的 seek()方法正好提供了這個功能,讓咱們得以追前消費或回溯消費。seek()方法的具體定義以下:

public void seek(TopicPartition partition, long offset) {}

seek()方法爲咱們提供了從特定位置讀取消息的能力,咱們能夠經過這個方法來向前跳過若干消息,也能夠經過這個方法來向後回溯若干消息,這樣爲消息的消費提供了很大的靈活性

原創不易,若是以爲有點用的話,請絕不留情點個贊,轉發一下,這將是我持續輸出優質文章的最強動力。

相關文章
相關標籤/搜索