kafka的請求都是經過socket進行通訊的,網絡層就是負責接收請求,而且發送響應的。kafka網絡層使用了java的nio異步框架,大大提升了性能。java
Acceptor只監聽新的鏈接,而後經過新的鏈接輪詢發送給Processor。算法
Processor負責與鏈接的數據交互,而且將請求轉發給RequestHandler處理。網絡
RequestHandler負責處理Processor轉發的請求。session
KafkaSelector是對java nio的Selector封裝,負責讀取客戶的請求和發送響應。框架
SocketServer負責上述類的初始化異步
def startup() { connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) ....... var processorBeginIndex = 0 // config.listeners表示監聽地址 config.listeners.foreach { endpoint => val listenerName = endpoint.listenerName // numProcessorThreads表示Processor的數目 val processorEndIndex = processorBeginIndex + numProcessorThreads // 初始化Processor for (i <- processorBeginIndex until processorEndIndex) processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol) // 初始化Acceptor val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) // 啓動acceptor線程 Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start() // 等待Acceptor完成開始動做 acceptor.awaitStartup() processorBeginIndex = processorEndIndex }
AbstractServerThread是Runnable的封裝,提供startup和shutdown的過程。它是Acceptor和Processor的基類。下面介紹startup過程,shutdown過程相似。socket
abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging { // 計數器,代表是否startup完成 private val startupLatch = new CountDownLatch(1) @volatile private var shutdownLatch = new CountDownLatch(0) // 等待線程startup階段完成,由外界調用 def awaitStartup(): Unit = startupLatch.await // 通知startup完成,由線程自身調用 protected def startupComplete(): Unit = { // Replace the open latch with a closed one shutdownLatch = new CountDownLatch(1) startupLatch.countDown() }
Acceptor的初始化過程ide
// 將java nio的Selector重命名爲NSelector import java.nio.channels.{Selector => NSelector} // 實例化NSelector private val nioSelector = NSelector.open() // 實例化server channel val serverChannel = openServerSocket(endPoint.host, endPoint.port) // 啓動Processor this.synchronized { processors.foreach { processor => Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", processor, false).start() } }
openServerSocket方法性能
private def openServerSocket(host: String, port: Int): ServerSocketChannel = { val socketAddress = if(host == null || host.trim.isEmpty) new InetSocketAddress(port) else new InetSocketAddress(host, port) // 實例化serverChannel val serverChannel = ServerSocketChannel.open() // 設置非阻塞 serverChannel.configureBlocking(false) // bind地址 serverChannel.socket.bind(socketAddress) .......
Acceptor本質是一個線程,下面是它的run方法this
def run() { // 註冊監聽ACCEPT事件 serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) ......... // currentProcessor表示processor的索引號 var currentProcessor = 0 while (isRunning) { // 等待500ms,返回就緒的通道 val ready = nioSelector.select(500) if (ready > 0) { // 取出就緒的keys val keys = nioSelector.selectedKeys() val iter = keys.iterator() // 遍歷keys while (iter.hasNext && isRunning) { if (key.isAcceptable) // 接收新的鏈接,發送給索引爲currentProcessor的processor accept(key, processors(currentProcessor)) // 更新currentProcessor,這裏體現了輪詢的算法 currentProcessor = (currentProcessor + 1) % processors.length ........
下面是accept方法,它只是接收的新的鏈接,而後發送給processor
def accept(key: SelectionKey, processor: Processor) { ......... // 接收新的鏈接 val socketChannel = serverSocketChannel.accept() // 設置非阻塞 socketChannel.configureBlocking(false) // 禁用了Nagle 算法,保證數據能立馬發送出去 socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setKeepAlive(true) // 調用processor的accept方法 processor.accept(socketChannel) .........
Processor 屬性
// newConnections是acceptor線程和processor通訊的queue, // acceptor會向隊列添加新的鏈接,processor會從隊列取出鏈接,而後進行處理。 newConnections = new ConcurrentLinkedQueue[SocketChannel]() //kafka的Selector selector = new KSelector(......)
下面是Processor的accept方法,在上面由acceptor調用
def accept(socketChannel: SocketChannel) { // acceptor將新的鏈接,加入到newConnections裏面 newConnections.add(socketChannel) // 喚醒selector,使processor能即時處理新鏈接 wakeup()
接下來看看Processor的run方法
override def run() { startupComplete() while (isRunning) { .......... // 處理newConnections隊列裏面的新鏈接 configureNewConnections() // 處理新的響應 processNewResponses() poll() // 處理已經讀取完的請求 processCompletedReceives() // 處理已經發送的請求 processCompletedSends() //處理關閉的鏈接 processDisconnected() ......... } ......... }
configureNewConnections方法,處理新鏈接
private def configureNewConnections() { while (!newConnections.isEmpty) { // 從newConnections隊列裏面取出新鏈接 val channel = newConnections.poll() // 獲取鏈接信息 val localHost = channel.socket().getLocalAddress.getHostAddress val localPort = channel.socket().getLocalPort val remoteHost = channel.socket().getInetAddress.getHostAddress val remotePort = channel.socket().getPort // 構件鏈接id val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString // 調用selecotr註冊新鏈接 selector.register(connectionId, channel) ........
下面是processNewResponses方法
private def processNewResponses() { // 從requestChannel取出鏈接 var curr = requestChannel.receiveResponse(id) while (curr != null) { // 根據響應的Action做不一樣的處理 curr.responseAction match { case RequestChannel.NoOpAction => val channelId = curr.request.connectionId if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null) // 若是鏈接存在而且沒有準備關閉,程序會繼續等待讀取數據。 // unmute方法,實質是註冊新鏈接的讀事件 selector.unmute(channelId) case RequestChannel.SendAction => val responseSend = curr.responseSend.getOrElse( throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr")) // 發送響應 sendResponse(curr, responseSend) case RequestChannel.CloseConnectionAction => //關閉鏈接 close(selector, curr.request.connectionId) } ..........
poll方法,實質就是selector.poll的封裝,後面篇幅會講到Selector的實現細節
private def poll() { ...... selector.poll(300) ...... }
processCompletedReceives方法
private def processCompletedReceives() { // 從selector遍歷completedReceives selector.completedReceives.asScala.foreach { receive => val openChannel = selector.channel(receive.source) val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source) val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress) // 構建Request val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeNanos = time.nanoseconds, listenerName = listenerName, securityProtocol = securityProtocol) // 發送request到requestChannel requestChannel.sendRequest(req) // 取消這個鏈接的讀事件,不繼續讀取請求 selector.mute(receive.source)
processCompletedSends方法
private def processCompletedSends() { // 從selector遍歷completedSends selector.completedSends.asScala.foreach { send => val resp = inflightResponses.remove(send.destination).getOrElse { throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`") } // 增長這個鏈接的讀事件,繼續讀取請求 selector.unmute(send.destination) } }
processDisconnected方法
private def processDisconnected() { // 從selector遍歷disconnected selector.disconnected.keySet.asScala.foreach { connectionId => val remoteHost = ConnectionId.fromString(connectionId).getOrElse { throw new IllegalStateException(s"connectionId has unexpected format: $connectionId") }.remoteHost inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request)) // the channel has been closed by the selector but the quotas still need to be updated connectionQuotas.dec(InetAddress.getByName(remoteHost)) } }
SocketServer: 負責框架初始化。實例Acceptor和Processor
Acceptor: 使用java nio框架的Selector,綁定監端口,負責接收新鏈接,而且經過基於線程間的隊列,把新鏈接輪詢發送給Processor
Processor:接收acceptor的新鏈接,使用KSelector負責讀取鏈接的請求,而後把請求發送給
requestChannel處理。而後從requestChannel獲取響應後,將響應基於KSelector發送出去