【原創】Kafka console consumer源代碼分析(一)

上一篇中分析了Scala版的console producer代碼,這篇文章爲讀者帶來一篇console consumer工做原理分析的隨筆。其實不管是哪一個consumer,大部分的工做原理都是相似的。本文利用console consumer做爲切入點,既容易理解又不失通常性。
 
本文使用的Kafka環境是0.8.2.1版本,這也是當前最新的版本。(注:Kafka 0.9版本聽說會用Java從新設計並編寫consumer代碼,對此咱們拭目以待) 因爲主要目的是分析consumer原理,所以本文並不過多糾結於console consumer特定的使用方法。一條最簡單的命令足以做爲咱們的開始:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-topic
 
kafka-console-consumer.sh腳本內容簡潔明瞭: exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer $@
 
很顯然,該shell腳本調用了kafka.tools包下的ConsoleConsumer類,並將提供的命令行參數所有傳給該類。由此可知,咱們須要從這個類開始分析。不過在此以前,簡單說一下console consumer總體的啓動流程,以下圖所示:

上圖流程具體展開以下:
1. 加載並解析命令行參數,惟一的必要參數(Required)是zookeeper
2. 若是沒有傳入group.id,ConsoleConsumer將生成本身的group.id,即console-consumer-[10萬之內的一個隨機數]
3. 建立ConsumerConfig用於封裝consumer的各類配置
4. 建立默認的消息格式化類,其定義的writeTo方法會默認將消息輸出到控制檯
5. 建立ZookeeperConsumerConnector。Kafka使用它來建立KafkaStream消費流
5.1 建立本地緩存, 保存topic下每一個分區的信息,包括該分區底層的阻塞隊列,已消費的位移、已獲取到的最新位移以及獲取大小等
5.2 建立本地緩存,保存每一個topic分區當前在zookeeper中保存的位移值
5.3 建立本地緩存,保存topic的每一個讀取線程底層對應的阻塞隊列,主要用於關閉Connector時能夠批量關閉底層的阻塞隊列
5.4 生成consumer id,規則爲[group.id]_[主機名]_[時間戳]_[隨機產生的一個UUID的前8位]。其中主機名就是運行ConsoleConsumer所在broker節點的主機名
5.5 建立獲取線程管理器(ConsumerFetcherManager)
5.6 啓動一個特定線程,用於定時地(默認是1分鐘)向Zookeeper提交更改過的位移 
6. 增長JVM關閉鉤子,確保JVM關閉後資源也可以被釋放
7. 建立KafkaStream並經過迭代器不斷遍歷該stream, KafkaStream的迭代器的底層實現包含一個阻塞隊列,若是沒有新的消息到來,該迭代器會一直阻塞,除非你顯式設置了consumer.timeout.ms參數(默認是-1表示consumer會一直等待新消息的帶來)
8. 每接收到一條新的消息,默認的消息格式化類會將其輸出到控制檯上。而後再次等待迭代器傳過來的下一條消息

本質上來講,console consumer啓動時會建立一個KafkaStream(能夠簡單翻譯成Kafak流),該stream會不停地等待可消費的新消息——具體作法就是經過LinkedBlockingQueue阻塞隊列來實現,後續會有詳細描述。針對上面啓動的順序列表,咱們在ConsoleConsumer.scala中逐一進行代碼走讀:shell

1. 加載必要參數 zookeeper
ConsoleConsumer.scala類定義了main方法,說明這是個可執行的類。類的前100多行幾乎都在處理命令行參數的解析。其中真正必要的參數只有zookeeper.connect一個,以下面代碼所示:
1 // REQUIRED表示這是一個必需要指定的參數
2 val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
3     "Multiple URLS can be given to allow fail-over.").withRequiredArg.describedAs("urls").ofType(classOf[String])
2. 生成group.id
乍一看和官網上要求的配置不匹配,由於官網中說過consumer真正必要的參數實際上有兩個:zookeeper.connect和group.id。由此能夠推斷console consumer應該會生成group.id的值,且它本質上也是一個consumer,必然屬於一個消費組,所以也必然定義了consumer id。下面的代碼中即展現了console consumer如何生成本身的group id: (consumer id是如何生成的後面再說)
1 // 若是沒有顯式指定group.id,那麼代碼就本身合成一個 2 // 具體格式: console-consumer-[10萬之內的一個隨機數] 3 // 10萬是一個很大的數,所以只有很是低的概率會碰到多個console consumer的group id相同的狀況
4 if(!consumerProps.containsKey("group.id")) { 5       consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) 6       groupIdPassed=false
7 }
3. 建立ConsumerConfig對象封裝配置

肯定了consumer的group.id以後console consumer須要把傳入參數封裝進ConsumerConfig類中並把後者傳給Consumer的create方法以構造一個ConsumerConnector——即初始化consumer了,具體邏輯見下面的代碼:apache

1 val config = new ConsumerConfig(consumerProps) // 封裝ConsumerConfig配置類
2 val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
4. 建立默認的消息格式化類,其定義的writeTo方法會默認將消息輸出到控制檯
1 val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))  // 建立消息格式類,用於最後的輸出顯示
2 val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)) 3 val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
5. 建立ZookeeperConsumerConnector
ZookeeperConsumerConnector很是重要,它實現了ConsumerConnector接口(該接口定義了建立KafkaStream和提交位移的操做,如createMessageStreams、commitOffsets等)。Kakfa官網把這個接口稱爲high level的consumer API。對於大多數consumer來講,這個high level的consumer API提供的功能已經足夠了。不過不少用戶可能須要對位移有更大的控制,這個時候Kafka推薦用戶使用被稱爲low level的consumer API—— SimpleConsumer。你們參考這篇文章來深刻學習high level API的用法。目前爲止,咱們只須要知道Kafka經過下面的語句構建了ConsumerConnector這個consumer的核心接口:
1 val connector = Consumer.create(config) // 建立ConsumerConnector,Consumer核心接口
 
6. 構建JVM關閉鉤子線程 
這部分很是簡單,就是在線程中關閉上一步建立的connector,並根據傳入的參數決定是否刪除zookeeper下/consumers/[group.id]節點
7. 建立KafkaStream,經過迭代器等待消息到來
因爲console consumer支持同時消費多個topic的消息,所以它提供了相似於過濾器這樣的實現,這也是爲何connector調用createMessageStreamsByFilter來建立KafkaStream的緣由,以下面的代碼所示。
1 val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) 2 val iter = if(maxMessages >= 0) 3     stream.slice(0, maxMessages) 4 else
5     stream
createMessageStreamsByFilter方法返回的是一組KafkaStream,但console consumer默認只是建立了1個stream,因此這裏直接調用get(0)取到這個stream就能夠了。
8. 經過迭代器以阻塞等待的方式消費消息
建立好KafkaStream以後,console consumer經過迭代器遍歷KafkaStream。這裏值得注意的是,該迭代器底層實現依賴一個阻塞隊列。若是沒有顯式配置過consumer.timeout.ms參數(默認是-1表示consumer會一直等待新消息),那麼迭代器會一直處於阻塞狀態等待可供消費的消息——具體的實現細節參見下一篇。迭代器每收到一條消息後,它就會使用默認的消息格式化類DefaultMessageFormatter將消息輸出到控制檯,這也是console consumer名字的由來,以下面的代碼所示:
1 for(messageAndTopic <- iter) { 2     try { 3         formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) // 輸出到控制檯
4         numMessages += 1
5     } catch { ... } 6  ... 7 }

好了,至此咱們按照啓動順序概述了console consumer啓動時的各個階段。不過,ZookeeperConsumerConnector和建立和迭代器的實現咱們並未詳細展開,這部份內容將做爲後面續篇的內容呈現給你們。敬請期待!緩存

相關文章
相關標籤/搜索