Kafka.network包源碼解讀

最近閱讀了kafka network包的源碼,主要是想了解下kafka底層通訊的一些細節,這部分都是用NIO實現的,而且用的是最基本的NIO實現模板,代碼閱讀起來也比較簡單。拋開zookeeper這部分的通訊不看,咱們就看最基本的producerconsumer之間的基於NIO的通訊模塊。在network中主要包含如下類: 數組

咱們挑選幾個最主要的類說明,先從SocketServer的描述看起: 網絡

/**
 * An NIO socket server. The thread model is
 *   1 Acceptor thread that handles new connections
 *   N Processor threads that each have their own selectors and handle all requests from their connections synchronously
 */
SocketServer 中採用 processors 數組保存 processor
Private val processors = new Array[Processor](numProcessorThreads)

AbstractServerThread繼承了runnable,其中採用閉鎖控制開始和結束,主要做用是爲了實現同步。同時打開selector,爲後續的繼承者使用。 app

protected val selector = Selector.open();
  protected val logger = Logger.getLogger(getClass())
  private val startupLatch = new CountDownLatch(1)
  private val shutdownLatch = new CountDownLatch(1)
  private val alive = new AtomicBoolean(false)
這個類是後續講到的兩個類的基類,而且閉鎖的應用是整個同步做用實現的關鍵,咱們看一組 stratup 的閉鎖操做,其中 Unit scala 語法中你能夠把他認爲是 void ,也就是方法的返回值爲空:
/**
   * Wait for the thread to completely start up
   */
  def awaitStartup(): Unit = startupLatch.await

  /**
   * Record that the thread startup is complete
   */
  protected def startupComplete() = {
    alive.set(true)
    startupLatch.countDown
  }
Acceptor繼承了AbstractServerThread,雖然叫Acceptor,可是它並無單獨拿出來使用,而是直接被socketServer引用,這點在命名和使用上與通常的通訊框架不一樣:
private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread {

這個類中主要實現了ServerSocketChannel的相關工做: 框架

val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.socket.bind(new InetSocketAddress(port))
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    logger.info("Awaiting connections on port " + port)
    startupComplete()

其內部操做和NIO同樣: socket

/*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
    
    val socketChannel = serverSocketChannel.accept()
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)

    if (logger.isDebugEnabled()) {
      logger.debug("sendBufferSize: [" + socketChannel.socket().getSendBufferSize() 
          + "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]")
    }

    processor.accept(socketChannel)
  }

Procesor類繼承了abstractServerThread,其實主要是在Acceptor類中的accept方法中,又新啓一個線程來處理讀寫操做: spa

private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
                               val time: Time,
                               val stats: SocketServerStats,
                               val maxRequestSize: Int) extends AbstractServerThread

因此整個kafka中使用的NIO的模型能夠歸結爲下圖: 線程

socketServer中引用Acceptor處理多個client過來的connector,併爲每一個connection建立出一個processor去單獨處理,每一個processor中均引用獨立的selector。 scala

總體來講,這樣的設計和咱們在用NIO寫傳統的通訊沒有什麼區別,只是這裏在同步上稍微作了點兒文章。更詳細的網絡操做仍是請看mina系列的分析。 debug

相關文章
相關標籤/搜索