本質上來講,console consumer啓動時會建立一個KafkaStream(能夠簡單翻譯成Kafak流),該stream會不停地等待可消費的新消息——具體作法就是經過LinkedBlockingQueue阻塞隊列來實現,後續會有詳細描述。針對上面啓動的順序列表,咱們在ConsoleConsumer.scala中逐一進行代碼走讀:shell
1 // REQUIRED表示這是一個必需要指定的參數
2 val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
3 "Multiple URLS can be given to allow fail-over.").withRequiredArg.describedAs("urls").ofType(classOf[String])
2. 生成group.id
1 // 若是沒有顯式指定group.id,那麼代碼就本身合成一個 2 // 具體格式: console-consumer-[10萬之內的一個隨機數] 3 // 10萬是一個很大的數,所以只有很是低的概率會碰到多個console consumer的group id相同的狀況
4 if(!consumerProps.containsKey("group.id")) { 5 consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) 6 groupIdPassed=false
7 }
3. 建立ConsumerConfig對象封裝配置
肯定了consumer的group.id以後console consumer須要把傳入參數封裝進ConsumerConfig類中並把後者傳給Consumer的create方法以構造一個ConsumerConnector——即初始化consumer了,具體邏輯見下面的代碼:apache
1 val config = new ConsumerConfig(consumerProps) // 封裝ConsumerConfig配置類
2 val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
4. 建立默認的消息格式化類,其定義的writeTo方法會默認將消息輸出到控制檯
1 val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) // 建立消息格式類,用於最後的輸出顯示
2 val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)) 3 val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
5. 建立ZookeeperConsumerConnector
ZookeeperConsumerConnector很是重要,它實現了ConsumerConnector接口(該接口定義了建立KafkaStream和提交位移的操做,如createMessageStreams、commitOffsets等)。Kakfa官網把這個接口稱爲high level的consumer API。對於大多數consumer來講,這個high level的consumer API提供的功能已經足夠了。不過不少用戶可能須要對位移有更大的控制,這個時候Kafka推薦用戶使用被稱爲low level的consumer API—— SimpleConsumer。你們參考這篇文章來深刻學習high level API的用法。目前爲止,咱們只須要知道Kafka經過下面的語句構建了ConsumerConnector這個consumer的核心接口:
1 val connector = Consumer.create(config) // 建立ConsumerConnector,Consumer核心接口
1 val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) 2 val iter = if(maxMessages >= 0) 3 stream.slice(0, maxMessages) 4 else
5 stream
1 for(messageAndTopic <- iter) { 2 try { 3 formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) // 輸出到控制檯
4 numMessages += 1
5 } catch { ... } 6 ... 7 }
好了,至此咱們按照啓動順序概述了console consumer啓動時的各個階段。不過,ZookeeperConsumerConnector和建立和迭代器的實現咱們並未詳細展開,這部份內容將做爲後面續篇的內容呈現給你們。敬請期待!緩存