最近閱讀了kafka network包的源碼,主要是想了解下kafka底層通訊的一些細節,這部分都是用NIO實現的,而且用的是最基本的NIO實現模板,代碼閱讀起來也比較簡單。拋開zookeeper這部分的通訊不看,咱們就看最基本的producer和consumer之間的基於NIO的通訊模塊。在network中主要包含如下類: 數組
咱們挑選幾個最主要的類說明,先從SocketServer的描述看起: 網絡
/** * An NIO socket server. The thread model is * 1 Acceptor thread that handles new connections * N Processor threads that each have their own selectors and handle all requests from their connections synchronously */在 SocketServer 中採用 processors 數組保存 processor
Private val processors = new Array[Processor](numProcessorThreads)
在AbstractServerThread繼承了runnable,其中採用閉鎖控制開始和結束,主要做用是爲了實現同步。同時打開selector,爲後續的繼承者使用。 app
protected val selector = Selector.open(); protected val logger = Logger.getLogger(getClass()) private val startupLatch = new CountDownLatch(1) private val shutdownLatch = new CountDownLatch(1) private val alive = new AtomicBoolean(false)這個類是後續講到的兩個類的基類,而且閉鎖的應用是整個同步做用實現的關鍵,咱們看一組 stratup 的閉鎖操做,其中 Unit 在 scala 語法中你能夠把他認爲是 void ,也就是方法的返回值爲空:
/** * Wait for the thread to completely start up */ def awaitStartup(): Unit = startupLatch.await /** * Record that the thread startup is complete */ protected def startupComplete() = { alive.set(true) startupLatch.countDown }Acceptor繼承了AbstractServerThread,雖然叫Acceptor,可是它並無單獨拿出來使用,而是直接被socketServer引用,這點在命名和使用上與通常的通訊框架不一樣:
private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread {
這個類中主要實現了ServerSocketChannel的相關工做: 框架
val serverChannel = ServerSocketChannel.open() serverChannel.configureBlocking(false) serverChannel.socket.bind(new InetSocketAddress(port)) serverChannel.register(selector, SelectionKey.OP_ACCEPT); logger.info("Awaiting connections on port " + port) startupComplete()
其內部操做和NIO同樣: socket
/* * Accept a new connection */ def accept(key: SelectionKey, processor: Processor) { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize) val socketChannel = serverSocketChannel.accept() socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setSendBufferSize(sendBufferSize) if (logger.isDebugEnabled()) { logger.debug("sendBufferSize: [" + socketChannel.socket().getSendBufferSize() + "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]") } processor.accept(socketChannel) }
Procesor類繼承了abstractServerThread,其實主要是在Acceptor類中的accept方法中,又新啓一個線程來處理讀寫操做: spa
private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping, val time: Time, val stats: SocketServerStats, val maxRequestSize: Int) extends AbstractServerThread
因此整個kafka中使用的NIO的模型能夠歸結爲下圖: 線程
socketServer中引用Acceptor處理多個client過來的connector,併爲每一個connection建立出一個processor去單獨處理,每一個processor中均引用獨立的selector。 scala
總體來講,這樣的設計和咱們在用NIO寫傳統的通訊沒有什麼區別,只是這裏在同步上稍微作了點兒文章。更詳細的網絡操做仍是請看mina系列的分析。 debug