Kafka2.0服務端啓動源碼

Kafka2.0服務端啓動源碼
  Kafka 服務端經過Kafka.scala的主函數main方法啓動。KafkaServerStartable類提供讀取配置文件、啓動/中止服務的方法。而啓動/中止服務最終調用的是KafkaServer的startup/shutdown方法。
啓動流程
啓動 zk 客戶端。
啓動動態配置。
啓動調度線程池。
啓動日誌管理器的後臺線程,包括日誌清理、日誌刷盤、日誌刪除、日誌壓縮。
啓動 NIO Socket 服務。
初始化一個接收器Acceptor,即啓動 NIO Socket。
添加num.network.threads個接收器到請求通道RequestChannel的處理器緩存ConcurrentHashMap,key 爲遞增編號,value 爲處理器Processor。
Acceptor執行CountDownLatch.await等待通知啓動。
緩存Acceptor到ConcurrentHashMap,key 爲EndPoint,value 爲Acceptor。
啓動副本管理器。
在 zk 註冊 broker。
啓動控制器。
啓動組協調器。
啓動事務協調器。
初始化KafkaApis。
初始化處理器線程緩存池。
啓動num.io.threads個請求處理器線程KafkaRequestHandler。
從阻塞隊列ArrayBlockingQueue獲取請求,調用KafkaApis.handle方法,進行集中處理請求。
啓動處理器線程。
首先CountDownLatch.countDown通知喚醒Acceptor線程。
使用NIO.select輪詢。
若是有可接收就緒的事件,則將當前的SocketChannel加入緩存隊列ConcurrentLinkedQueue
從上述緩存隊列取出SocketChannel,綁定到KafkaChannel。
將接收到的請求緩存到限長阻塞隊列ArrayBlockingQueueapi

請求處理流程緩存

詳細源碼分析Acceptor 線程def run() { serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 註冊接收事件 startupComplete() // 通知 Acceptor 線程 var currentProcessor = 0 while (isRunning) { val ready = nioSelector.select(500) // 輪詢事件 if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext && isRunning) { val key = iter.next iter.remove() if (key.isAcceptable) { // 有可接受事件 val processor = synchronized { currentProcessor = currentProcessor % processors.size processors(currentProcessor) // 緩存 Processor } accept(key, processor) // 將 SocketChannel 緩存到隊列 } } } }}Processor 線程override def run() { startupComplete() // CountDownLatch.countDown 喚醒 Acceptor 線程。 while (isRunning) { configureNewConnections() // 從緩存隊列取出 SocketChannel,綁定到 KafkaChannel processNewResponses() // 處理返回客戶端的響應 poll() // Kafka.Selector 輪詢讀取/寫入事件 processCompletedReceives() // 處理客戶端的請求,放到阻塞隊列 processCompletedSends() // 處理返回客戶端響應後的回調 processDisconnected() // 斷開鏈接後的處理 }}KafkaRequestHandler 線程阻塞隊列def run() { while (!stopped) { val startSelectTime = time.nanoseconds // 從阻塞隊列拉取請求 val req = requestChannel.receiveRequest(300) req match { case request: RequestChannel.Request => try { apis.handle(request) // 調用KafkaApis.handle方法,進行集中處理請求。 } } }}KSelector
  參考客戶端源碼分析。ide

相關文章
相關標籤/搜索