Kafka2.0服務端啓動源碼

  Kafka 服務端經過Kafka.scala的主函數main方法啓動。KafkaServerStartable類提供讀取配置文件、啓動/中止服務的方法。而啓動/中止服務最終調用的是KafkaServerstartup/shutdown方法。api

啓動流程

  1. 啓動 zk 客戶端。
  2. 啓動動態配置。
  3. 啓動調度線程池。
  4. 啓動日誌管理器的後臺線程,包括日誌清理、日誌刷盤、日誌刪除、日誌壓縮。
  5. 啓動 NIO Socket 服務
    1. 初始化一個接收器Acceptor,即啓動 NIO Socket。
    2. 添加num.network.threads個接收器到請求通道RequestChannel的處理器緩存ConcurrentHashMap,key 爲遞增編號,value 爲處理器Processor
    3. Acceptor執行CountDownLatch.await等待通知啓動。
    4. 緩存AcceptorConcurrentHashMap,key 爲EndPoint,value 爲Acceptor
  6. 啓動副本管理器。
  7. 在 zk 註冊 broker。
  8. 啓動控制器。
  9. 啓動組協調器。
  10. 啓動事務協調器。
  11. 初始化KafkaApis
  12. 初始化處理器線程緩存池
    1. 啓動num.io.threads個請求處理器線程KafkaRequestHandler
    2. 從阻塞隊列ArrayBlockingQueue獲取請求,調用KafkaApis.handle方法,進行集中處理請求。
  13. 啓動處理器線程
    1. 首先CountDownLatch.countDown通知喚醒Acceptor線程。
      1. 使用NIO.select輪詢。
      2. 若是有可接收就緒的事件,則將當前的SocketChannel加入緩存隊列ConcurrentLinkedQueue
    2. 從上述緩存隊列取出SocketChannel,綁定到KafkaChannel
    3. 將接收到的請求緩存到限長阻塞隊列ArrayBlockingQueue

請求處理流程

服務端請求處理流程

詳細源碼分析

Acceptor 線程

def run() {
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 註冊接收事件
  startupComplete() // 通知 Acceptor 線程
  var currentProcessor = 0
  while (isRunning) {
    val ready = nioSelector.select(500) // 輪詢事件
    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        val key = iter.next
        iter.remove()
        if (key.isAcceptable) { // 有可接受事件
          val processor = synchronized {
            currentProcessor = currentProcessor % processors.size
            processors(currentProcessor) // 緩存 Processor 
          }
          accept(key, processor) // 將 SocketChannel 緩存到隊列
        }
      }
    }
  }
}

Processor 線程

override def run() {
  startupComplete() // CountDownLatch.countDown 喚醒 Acceptor 線程。
  while (isRunning) {
    configureNewConnections() // 從緩存隊列取出 SocketChannel,綁定到 KafkaChannel
    processNewResponses() // 處理返回客戶端的響應
    poll() // Kafka.Selector 輪詢讀取/寫入事件
    processCompletedReceives() // 處理客戶端的請求,放到阻塞隊列
    processCompletedSends() // 處理返回客戶端響應後的回調
    processDisconnected() // 斷開鏈接後的處理
  }
}

KafkaRequestHandler 線程阻塞隊列

def run() {
  while (!stopped) {
    val startSelectTime = time.nanoseconds
    // 從阻塞隊列拉取請求
    val req = requestChannel.receiveRequest(300) 

    req match {
      case request: RequestChannel.Request =>
        try {
          apis.handle(request) // 調用`KafkaApis.handle`方法,進行集中處理請求。
        }
    }
  }
}

KSelector

  參考客戶端源碼分析。緩存

相關文章
相關標籤/搜索