kafka-網絡層框架

kafka-網絡層介紹

kafka的請求都是經過socket進行通訊的,網絡層就是負責接收請求,而且發送響應的。kafka網絡層使用了java的nio異步框架,大大提升了性能。java

框架圖

輸入圖片說明

Acceptor只監聽新的鏈接,而後經過新的鏈接輪詢發送給Processor。算法

Processor負責與鏈接的數據交互,而且將請求轉發給RequestHandler處理。網絡

RequestHandler負責處理Processor轉發的請求。session

KafkaSelector是對java nio的Selector封裝,負責讀取客戶的請求和發送響應。框架

網絡層初始化-SocketServer類

SocketServer負責上述類的初始化異步

def startup() {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
       .......
      var processorBeginIndex = 0
      // config.listeners表示監聽地址
      config.listeners.foreach { endpoint =>
        val listenerName = endpoint.listenerName
        // numProcessorThreads表示Processor的數目
        val processorEndIndex = processorBeginIndex + numProcessorThreads
        // 初始化Processor
        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)
        // 初始化Acceptor
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        // 啓動acceptor線程
        Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
        // 等待Acceptor完成開始動做
        acceptor.awaitStartup()
        processorBeginIndex = processorEndIndex
      }

AbstractServerThread類

AbstractServerThread是Runnable的封裝,提供startup和shutdown的過程。它是Acceptor和Processor的基類。下面介紹startup過程,shutdown過程相似。socket

abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
    // 計數器,代表是否startup完成
    private val startupLatch = new CountDownLatch(1)
    @volatile private var shutdownLatch = new CountDownLatch(0)
  // 等待線程startup階段完成,由外界調用
  def awaitStartup(): Unit = startupLatch.await

  // 通知startup完成,由線程自身調用
  protected def startupComplete(): Unit = {
    // Replace the open latch with a closed one
    shutdownLatch = new CountDownLatch(1)
    startupLatch.countDown()
  }

Acceptor類

Acceptor的初始化過程ide

// 將java nio的Selector重命名爲NSelector
  import java.nio.channels.{Selector => NSelector}
  // 實例化NSelector
  private val nioSelector = NSelector.open()
  // 實例化server channel
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
   
  // 啓動Processor 
  this.synchronized {
    processors.foreach { processor =>
      Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
        processor, false).start()
    }
  }

openServerSocket方法性能

private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if(host == null || host.trim.isEmpty)
        new InetSocketAddress(port)
      else
        new InetSocketAddress(host, port)
    // 實例化serverChannel
    val serverChannel = ServerSocketChannel.open()
    // 設置非阻塞
    serverChannel.configureBlocking(false)
    // bind地址
    serverChannel.socket.bind(socketAddress)
.......

Acceptor本質是一個線程,下面是它的run方法this

def run() {
    // 註冊監聽ACCEPT事件
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    .........
    // currentProcessor表示processor的索引號
    var currentProcessor = 0
    while (isRunning) {
        // 等待500ms,返回就緒的通道
        val ready = nioSelector.select(500)
        if (ready > 0) {
            // 取出就緒的keys
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            // 遍歷keys
            while (iter.hasNext && isRunning) {
                if (key.isAcceptable)
                    // 接收新的鏈接,發送給索引爲currentProcessor的processor
                    accept(key, processors(currentProcessor))
                 // 更新currentProcessor,這裏體現了輪詢的算法
                currentProcessor = (currentProcessor + 1) % processors.length
                ........

下面是accept方法,它只是接收的新的鏈接,而後發送給processor

def accept(key: SelectionKey, processor: Processor) {
    .........
    // 接收新的鏈接
    val socketChannel = serverSocketChannel.accept()
    // 設置非阻塞
    socketChannel.configureBlocking(false)
    // 禁用了Nagle 算法,保證數據能立馬發送出去
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
   
    // 調用processor的accept方法
    processor.accept(socketChannel)
    .........

Processor類

Processor 屬性

// newConnections是acceptor線程和processor通訊的queue,
//  acceptor會向隊列添加新的鏈接,processor會從隊列取出鏈接,而後進行處理。
newConnections = new ConcurrentLinkedQueue[SocketChannel]()

//kafka的Selector
selector = new KSelector(......)

下面是Processor的accept方法,在上面由acceptor調用

def accept(socketChannel: SocketChannel) {
   // acceptor將新的鏈接,加入到newConnections裏面
   newConnections.add(socketChannel)
   // 喚醒selector,使processor能即時處理新鏈接
   wakeup()

接下來看看Processor的run方法

override def run() {
    startupComplete()
    while (isRunning) {
        ..........
        // 處理newConnections隊列裏面的新鏈接
        configureNewConnections()
        // 處理新的響應
        processNewResponses()
        
        poll()
        // 處理已經讀取完的請求
        processCompletedReceives()
        // 處理已經發送的請求
        processCompletedSends()
        //處理關閉的鏈接
        processDisconnected()
        .........
    }

    .........
  }

configureNewConnections方法,處理新鏈接

private def configureNewConnections() {
    while (!newConnections.isEmpty) {
      // 從newConnections隊列裏面取出新鏈接
      val channel = newConnections.poll()
      // 獲取鏈接信息
      val localHost = channel.socket().getLocalAddress.getHostAddress
      val localPort = channel.socket().getLocalPort
      val remoteHost = channel.socket().getInetAddress.getHostAddress
      val remotePort = channel.socket().getPort
      // 構件鏈接id
      val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
      // 調用selecotr註冊新鏈接
      selector.register(connectionId, channel)
      ........

下面是processNewResponses方法

private def processNewResponses() {
    // 從requestChannel取出鏈接
    var curr = requestChannel.receiveResponse(id)
    while (curr != null) {
        // 根據響應的Action做不一樣的處理
        curr.responseAction match {
            case RequestChannel.NoOpAction =>
                val channelId = curr.request.connectionId
                if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
                    // 若是鏈接存在而且沒有準備關閉,程序會繼續等待讀取數據。
                    // unmute方法,實質是註冊新鏈接的讀事件
                    selector.unmute(channelId)
           case RequestChannel.SendAction =>
                val responseSend = curr.responseSend.getOrElse(
                          throw new IllegalStateException(s"responseSend must be defined for SendAction,         response: $curr"))
                // 發送響應
                sendResponse(curr, responseSend)
           case RequestChannel.CloseConnectionAction =>
                //關閉鏈接
                close(selector, curr.request.connectionId)
        }
    ..........

poll方法,實質就是selector.poll的封裝,後面篇幅會講到Selector的實現細節

private def poll() {
    ......
    selector.poll(300)
    ......
 }

processCompletedReceives方法

private def processCompletedReceives() {
    // 從selector遍歷completedReceives
    selector.completedReceives.asScala.foreach { receive =>
    val openChannel = selector.channel(receive.source)
    val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
    val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
    // 構建Request
    val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
          buffer = receive.payload, startTimeNanos = time.nanoseconds,
          listenerName = listenerName, securityProtocol = securityProtocol)
    // 發送request到requestChannel
    requestChannel.sendRequest(req)
    // 取消這個鏈接的讀事件,不繼續讀取請求
    selector.mute(receive.source)

processCompletedSends方法

private def processCompletedSends() {
    // 從selector遍歷completedSends
    selector.completedSends.asScala.foreach { send =>
      val resp = inflightResponses.remove(send.destination).getOrElse {
        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
      }
      // 增長這個鏈接的讀事件,繼續讀取請求
      selector.unmute(send.destination)
    }
  }

processDisconnected方法

private def processDisconnected() {
    // 從selector遍歷disconnected
    selector.disconnected.keySet.asScala.foreach { connectionId =>
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
      // the channel has been closed by the selector but the quotas still need to be updated
      connectionQuotas.dec(InetAddress.getByName(remoteHost))
    }
  }

歸納

SocketServer: 負責框架初始化。實例Acceptor和Processor

Acceptor: 使用java nio框架的Selector,綁定監端口,負責接收新鏈接,而且經過基於線程間的隊列,把新鏈接輪詢發送給Processor

Processor:接收acceptor的新鏈接,使用KSelector負責讀取鏈接的請求,而後把請求發送給

requestChannel處理。而後從requestChannel獲取響應後,將響應基於KSelector發送出去

相關文章
相關標籤/搜索