客戶端用法:java
kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); // 決定一個topic啓動幾個線程去拉取數據,即生成幾個KafkaStream; Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(threads)); Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(topic); // 本質是調用了 ZookeeperConsumerConnector val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
queued.max.message.chunks
。消費者線程數量決定阻塞隊列的個數。所以,分析Consumer,主要是分析ZookeeperConsumerConnector
。代碼裏面,有兩個類,它們是什麼關係呢?程序員
0.8.0 和 0.8.2.1
ZookeeperConsumerConnector
的源碼不同,下面以0.8.2.1源碼爲主來分析,也就是從這個版本開始,能夠將Offset存在Kafka的Broker中。(關注實現思想,忽略細節。)api
一個Consumer會建立一個ZookeeperConsumerConnector,表明一個消費者進程.緩存
createMessageStreams
建立消息流,反序列化messagesyncedRebalance
方法,去rebalance
消費者。以一段代碼來講明,消費的topic 12 partition,分配在3臺broker機器上。數據結構
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("test-string-topic", new Integer(2)); //value表示consumer thread線程數量 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
下面是分配的狀況:架構
consumer線程 | Blocking Queue | partitions |
---|---|---|
consumer thread1 | blockingQueue1 | 0,1,2,3,4,5 |
consumer thread2 | blockingQueue2 | 6,7,8,9,10,11 |
fetch線程 | partitions |
---|---|
fetch thread1 | 0,3,6,9 |
fetch thread2 | 1,4,7,10 |
fetch thread3 | 2,5,8,11 |
用戶的consumer thread就使用2個BlockingQueue的數據進行處理;因此通常會使用2個consumer thread去消費這2個BlockingQueue數據。負載均衡
代碼上調用:syncedRebalance方法在內部會調用def rebalance(cluster: Cluster): Boolean方法,去執行操做。
性能
topicRegistry=getCurrentConsumerPartitionInfo
最後,對每一個broker建立一個FetcherRunnable線程,並啓動它。這個fetcher線程負責從Broker上不斷獲取數據,對每一個partition分別建立FetchRequest,最後把數據插入BlockingQueue的操做。fetch
KafkaStream
對ConsumerIterator
作了進一步的封裝,咱們調用stream的next
方法就能夠取到數據了(內部經過調用ConsumerIterator
的next
方法實現)線程
ConsumerIterator
的實現可能會形成數據的重複發送(這要看生產者如何生產數據),FetchedDataChunk
是一個數據集合,它內部會包含不少數據塊,一個數據塊可能包含多條消息,但同一個數據塊中的消息只有一個offset,因此當一個消息塊有多條數據,處理完部分數據發生異常時,消費者從新去取數據,就會再次取得這個數據塊,而後消費過的數據就會被從新消費。
問題: Kafka中由Consumer維護消費狀態,當Consumer消費消息時,支持2種模式commit消費狀態,分別爲當即commit和週期commit。前者會致使性能低下,作到消息投遞剛好一次,但不多使用,後者性能高,一般用於實際應用,但極端條件下沒法保證消息不丟失。
解決方案(這個問題太極端狀況,不推薦,長個知識)
變成下面的: