Kafka 服務端經過Kafka.scala
的主函數main
方法啓動。KafkaServerStartable
類提供讀取配置文件、啓動/中止服務的方法。而啓動/中止服務最終調用的是KafkaServer
的startup/shutdown
方法。api
Acceptor
,即啓動 NIO Socket。num.network.threads
個接收器到請求通道RequestChannel
的處理器緩存ConcurrentHashMap
,key 爲遞增編號,value 爲處理器Processor
。Acceptor
執行CountDownLatch.await
等待通知啓動。Acceptor
到ConcurrentHashMap
,key 爲EndPoint
,value 爲Acceptor
。KafkaApis
。num.io.threads
個請求處理器線程KafkaRequestHandler
。ArrayBlockingQueue
獲取請求,調用KafkaApis.handle
方法,進行集中處理請求。CountDownLatch.countDown
通知喚醒Acceptor
線程。
NIO.select
輪詢。SocketChannel
加入緩存隊列ConcurrentLinkedQueue
SocketChannel
,綁定到KafkaChannel
。ArrayBlockingQueue
def run() { serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 註冊接收事件 startupComplete() // 通知 Acceptor 線程 var currentProcessor = 0 while (isRunning) { val ready = nioSelector.select(500) // 輪詢事件 if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext && isRunning) { val key = iter.next iter.remove() if (key.isAcceptable) { // 有可接受事件 val processor = synchronized { currentProcessor = currentProcessor % processors.size processors(currentProcessor) // 緩存 Processor } accept(key, processor) // 將 SocketChannel 緩存到隊列 } } } } }
override def run() { startupComplete() // CountDownLatch.countDown 喚醒 Acceptor 線程。 while (isRunning) { configureNewConnections() // 從緩存隊列取出 SocketChannel,綁定到 KafkaChannel processNewResponses() // 處理返回客戶端的響應 poll() // Kafka.Selector 輪詢讀取/寫入事件 processCompletedReceives() // 處理客戶端的請求,放到阻塞隊列 processCompletedSends() // 處理返回客戶端響應後的回調 processDisconnected() // 斷開鏈接後的處理 } }
def run() { while (!stopped) { val startSelectTime = time.nanoseconds // 從阻塞隊列拉取請求 val req = requestChannel.receiveRequest(300) req match { case request: RequestChannel.Request => try { apis.handle(request) // 調用`KafkaApis.handle`方法,進行集中處理請求。 } } } }
參考客戶端源碼分析。緩存