Kafka 0.8 Consumer處理邏輯

0.前言

客戶端用法:java

kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

// 決定一個topic啓動幾個線程去拉取數據,即生成幾個KafkaStream;
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(threads));

Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(topic);

// 本質是調用了 ZookeeperConsumerConnector
val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
  • 一個Topic啓動幾個消費者線程,會生成幾個KafkaStream。
  • 一個KafkaStream對應的是一個Queue(有界的LinkedBlockingQueue),有界的參數控制:queued.max.message.chunks。消費者線程數量決定阻塞隊列的個數。
  • Fetcher線程是對應topic所在的broker的個數。

所以,分析Consumer,主要是分析ZookeeperConsumerConnector。代碼裏面,有兩個類,它們是什麼關係呢?程序員

  • kafka.consumer.ZookeeperConsumerConnector:核心類
  • kafka.javaapi.consumer.ZookeeperConsumerConnector:對上面那個類的scala數據結構封裝,方便Java程序員使用。

0.8.0 和 0.8.2.1 ZookeeperConsumerConnector的源碼不同,下面以0.8.2.1源碼爲主來分析,也就是從這個版本開始,能夠將Offset存在Kafka的Broker中。(關注實現思想,忽略細節。)api

1.ZookeeperConsumerConnector 架構

image

一個Consumer會建立一個ZookeeperConsumerConnector,表明一個消費者進程.緩存

  • fetcher: 消費者獲取數據, 使用ConsumerFetcherManager fetcher線程抓取數據
  • zkClient: 消費者要和ZK通訊, 除了註冊本身,還有其餘信息也會寫到ZK中
  • topicThreadIdAndQueues: 消費者會指定本身消費哪些topic,並指定線程數, 因此topicThreadId都對應一個隊列
  • messageStreamCreated: 消費者會建立消息流, 每一個隊列都對應一個消息流
  • offsetsChannel: offset能夠存儲在ZK或者kafka中,若是存在kafka裏,像其餘請求同樣,須要和Broker通訊。能夠理解成OffsetManager的一部分。
  • scheduler: 後臺調度autoCommit
  • 還有其餘幾個Listener監聽器,分別用於topicPartition的更新,負載均衡,消費者從新負載等

簡述獲取數據的流程

  1. 初始化上面的幾個組件,包括與ZK的鏈接,建立ConsumerFetcherManager,確保鏈接上OffsetManager(爲該ConsumerGroup創建一個OffsetChannel)。
  2. createMessageStreams建立消息流,反序列化message
  3. 經過Fetcher線程拉取數據,放入BlockingQueue來給客戶端。
  4. 客戶端啓動ZKRebalancerListener,ZKRebalancerListener實例會在內部建立一個線程,這個線程定時檢查監聽的事件有沒有執行(消費者發生變化),若是沒有變化則wait 1秒鐘,當發生了變化就調用 syncedRebalance 方法,去rebalance消費者。

1.1 消費者線程(consumer thread),隊列(LinkedBlockingQueue),拉取線程(fetch thread)三者之間關係

以一段代碼來講明,消費的topic 12 partition,分配在3臺broker機器上。數據結構

ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test-string-topic", new Integer(2)); //value表示consumer thread線程數量

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  • consumer thread數量與BlockingQueue一一對應。因此上述的代碼只有2個BlockQueue。(它們鏈接的橋樑是KafkaStream)
  • fetcher線程數和topic所在多少臺broker有關。所以,共有3個fetcher線程與broker創建一個鏈接。(3個fetch thread線程去拉取消息數據,最終放到2個BlockingQueue中,等待consumer thread來消費。

下面是分配的狀況:架構

  • 消費者線程,緩衝隊列,partitions分佈列表以下
consumer線程 Blocking Queue partitions
consumer thread1 blockingQueue1 0,1,2,3,4,5
consumer thread2 blockingQueue2 6,7,8,9,10,11
  • fetch thread與partitions分佈列表以下
fetch線程 partitions
fetch thread1 0,3,6,9
fetch thread2 1,4,7,10
fetch thread3 2,5,8,11

用戶的consumer thread就使用2個BlockingQueue的數據進行處理;因此通常會使用2個consumer thread去消費這2個BlockingQueue數據。負載均衡

1.2 rebalance的流程

代碼上調用:syncedRebalance方法在內部會調用def rebalance(cluster: Cluster): Boolean方法,去執行操做。性能

  1. // 關閉全部的數據獲取者 closeFetchers
  2. // 解除分區的全部者 releasePartitionOwnership
  3. // 按規則獲得當前消費者擁有的分區信息並保存到topicRegistry中 topicRegistry=getCurrentConsumerPartitionInfo
  4. // 修改並重啓Fetchers updateFetchers

最後,對每一個broker建立一個FetcherRunnable線程,並啓動它。這個fetcher線程負責從Broker上不斷獲取數據,對每一個partition分別建立FetchRequest,最後把數據插入BlockingQueue的操做。fetch

KafkaStreamConsumerIterator作了進一步的封裝,咱們調用stream的next方法就能夠取到數據了(內部經過調用ConsumerIteratornext方法實現)線程

1.3 注意

ConsumerIterator的實現可能會形成數據的重複發送(這要看生產者如何生產數據),FetchedDataChunk是一個數據集合,它內部會包含不少數據塊,一個數據塊可能包含多條消息,但同一個數據塊中的消息只有一個offset,因此當一個消息塊有多條數據,處理完部分數據發生異常時,消費者從新去取數據,就會再次取得這個數據塊,而後消費過的數據就會被從新消費。

  • 沒想到裏面,裏面是這個樣子的,給一個數據塊,致使了數據消費的重複。

3.美團遇到的一個問題

問題: Kafka中由Consumer維護消費狀態,當Consumer消費消息時,支持2種模式commit消費狀態,分別爲當即commit和週期commit。前者會致使性能低下,作到消息投遞剛好一次,但不多使用,後者性能高,一般用於實際應用,但極端條件下沒法保證消息不丟失。

解決方案(這個問題太極端狀況,不推薦,長個知識)

  • 將原本的結果改爲下面的處理流程:等待「執行業務邏輯」成功完成後更新緩存消費狀態,就能夠保證消息不會丟失。
    -

變成下面的:
image

相關文章
相關標籤/搜索