上篇文章說道KafkaSelector在建立一個鏈接的時候和普通的nioSelector並無什麼不一樣,它是基於nioSelector的封裝。咱們知道建立鏈接的一系列操做都是由Channel去完成,而KafkaChannel實際上就是對它的進一步封裝: KafkaChannel不只封裝了SocketChannel,還封裝了Kafka本身的認證器Authenticator,和讀寫相關的NetworkReceive、Send。NetworkReceive和Send的底層都是經過ByteBuffer來實現的。java
實際上基本等同於KafkaSelector的建立:node
按照普通的方式建立完通道後,將其註冊到NioSelector上,並關注OP_CONNECT,再以節點Id,SelectionKey來建立KafkaChannel,這裏先不詳細說明KafkaChannel,它是對通道的進一步封裝。在建立完KafkaChannel後,將KafkaChannel與SelectionKey、節點ID作進一步綁定。segmentfault
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 將當前這個socketChannel註冊到nioSelector上,並關注OP_CONNECT事件 KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 建立KafkaChannel key.attach(channel);// 將channel綁定到key上 this.channels.put(id, channel);// 將 nodeId 和 Channel綁定
這樣有一個好處,首先KafkaChannel中包含了節點ID與SelectionKey,而咱們也能夠根據節點ID來拿到KafkaChannel,一樣能夠根據SelectionKey來拿到KafkaChannel,這就意味着,咱們只要拿到了KafkaChannel、SelectionKey、節點ID中的任意一個,均可以經過這些引用關係拿到彼此,從而進行相關操做。app
實際上就是將要發送的ByteBuffer扔進KafkaChannel,此時並未進行IO操做,這裏的Send對象,實際上就是對ByteBuffer的進一步封裝,它主要包含了將要發往的節點ID、ByteBuffer大小、是否發送完畢等信息。咱們這裏根據節點ID,從咱們剛纔的channels中,取出KafkaChannel。socket
public void send(Send send) { // 看看send要發的這個nodeId在不在 KafkaChannel channel = channelOrFail(send.destination()); try { // 把數據扔進KafkaChannel中(只能放一個,放多個會報錯),並關注write事件 channel.setSend(send); } catch (CancelledKeyException e) { // 失敗了加一條node_id的失敗記錄 this.failedSends.add(send.destination()); close(channel); } }
這個KafkaChannel的setSend方法實際上很是簡單,就是將要發送的send對象的引用交給KafkaChannel中的send。而且使這個channel的SelectionKey去關注OP_WRITE事件。ide
this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
在上篇文章裏,咱們知道KafkaSelector也是經過輪詢器去進行IO操做,看一下原始的nioSelector是如何進行io操做的:ui
public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000; public static void main(String args[]) throws Exception { // 打開服務端 Socket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打開 Selector Selector selector = Selector.open(); // 服務端 Socket 監聽8080端口, 並配置爲非阻塞模式 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false); // 將 channel 註冊到 selector 中. // 一般咱們都是先註冊一個 OP_ACCEPT 事件, 而後在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ // 註冊到 Selector 中. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 經過調用 select 方法, 阻塞地等待 channel I/O 可操做 if (selector.select(TIMEOUT) == 0) { System.out.print("."); continue; } // 獲取 I/O 操做就緒的 SelectionKey, 經過 SelectionKey 能夠知道哪些 Channel 的哪類 I/O 操做已經就緒. Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); // 當獲取一個 SelectionKey 後, 就要將它刪除, 表示咱們已經對這個 IO 事件進行了處理. keyIterator.remove(); if (key.isAcceptable()) { // 當 OP_ACCEPT 事件到來時, 咱們就有從 ServerSocketChannel 中獲取一個 SocketChannel, // 表明客戶端的鏈接 // 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel. // 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel. SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); //在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中. // 注意, 這裏咱們若是沒有設置 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那麼 select 方法會一直直接返回. clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE)); } if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buf); if (bytesRead == -1) { clientChannel.close(); } else if (bytesRead > 0) { key.interestOps(OP_READ | SelectionKey.OP_WRITE); System.out.println("Get data length: " + bytesRead); } } if (key.isValid() && key.isWritable()) { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.write(buf); if (!buf.hasRemaining()) { key.interestOps(OP_READ); } buf.compact(); } } } } }
####一、讀操做 首先,進行是否能夠開始讀操做的判斷。一、channel.ready(),這裏作了兩個判斷,一個是Kafka的認證器是否定證經過,另外一個則是是否握手成功。二、key.isReadable(),selectionKey是否關注了OP_READ。三、!hasStagedReceive(channel),判斷該channel是否在hasStagedReceive這個map裏面,若是該channel正在讀,那麼它會在這個map裏面,直到讀取完成。this
// channel是否已經準備好從鏈接中讀取任何可讀數據 /* if channel is ready read from any connections that have readable data */ if (channel.ready() // 鏈接的三次握手完成,而且 todo 權限驗證經過 && key.isReadable() // key是否關注了read事件 && !hasStagedReceive(channel)) {// todo 這個通道不能是正在讀數據的,由於在讀的時候,會把這個channel扔進stagedReceives裏面 NetworkReceive networkReceive; /** * 實際上這裏就是分屢次去一個channel取數據,直到取完,並將其保存在key:channel value:new ArrayDeque<NetworkReceive> 中 */ while ((networkReceive = channel.read()) != null) { // 將屢次接收的數據放進stagedReceives下channel的Deque裏面 addToStagedReceives(channel, networkReceive); } }
剩下的channel.read()就比較簡單了,KafkaChannel裏面封裝了一個NetworkReceives,而NetworkReceives主要就是對ByteBuffer的封裝。spa
咱們將該NioChannel傳入,調用channel.read(size)方法,這個size,其實就是一個ByteBuffer,它是kafka協議中用來判斷包體有多長的包頭。rest
第一步,先判斷byteBuffer(size)中是否還有剩餘空間
第二步,從nioChannel中將數據讀到byteBuffer中
第三步,判斷byteBuffer是否是裝滿了
第四步,若是裝滿了,證實size這個bytebuffer已經拿到了包體的長度,調用readInt獲取其capacity,再用這個capacity去申請一個用於接收包體的byteBuffer(buffer)。
第五步,正式地將channel中的數據中讀取到byteBuffer(buffer)
public long readFromReadableChannel(ReadableByteChannel channel) throws IOException { int read = 0; if (size.hasRemaining()) { int bytesRead = channel.read(size); if (bytesRead < 0) { throw new EOFException(); } read += bytesRead; if (!size.hasRemaining()) { size.rewind(); int receiveSize = size.getInt(); if (receiveSize < 0) { throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); } if (maxSize != UNLIMITED && receiveSize > maxSize) { throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); } this.buffer = ByteBuffer.allocate(receiveSize); } } if (buffer != null) { int bytesRead = channel.read(buffer); if (bytesRead < 0) { throw new EOFException(); } read += bytesRead; } return read; }
讀取完成以後,再作一下校驗:就會返回了,也就是上面while ((networkReceive = channel.read()) != null)拿到的這個networkReceives,裏面裝着包頭和包體。這裏Kafka有一個小操做,就是將kafkaChannel內的networkReceive的引用賦值給外面的這個networkReceive後,會將kafkaChannel內的networkReceive的引用置爲空。
/** * 接收數據,將數據保存在 NetworkReceive */ public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) { receive = new NetworkReceive(maxReceiveSize, id); } receive(receive);// 這個方法就是上面說了一大堆第一步第二步第三步的那個方法。 if (receive.complete()) { receive.payload() .rewind(); result = receive; receive = null; } return result; }
####二、寫操做 寫操做要比讀操做更加簡單,上面有一個預發送操做,就是將要send的對象Send
/** * 發送時其實也有一次沒發送完的狀況,每發送完的話,就不會出如今completedSends裏面 */ /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ // 若是channel已經ready 而且 咱們有數據來準備好寫sockets if (channel.ready() && key.isWritable()) { Send send = channel.write(); // 這裏會將KafkaChannel的send字段發送出去, // 若是未完成發送,或者沒發完,則返回null // 發送成功則返回send對象 if (send != null) { this.completedSends.add(send);// 添加到completedSends集合 this.sensors.recordBytesSent(channel.id(), send.size()); } }
主要的發送方法就是channel.write();
public Send write() throws IOException { Send result = null; if (send != null && send(send)) { result = send; send = null; } return result; }
而write方法中最核心的方法則是send(send),這個send對象也是一個byteBuffer對象。底層中的底層仍是調用了channel.write(byteBuffer方法)
@Override public long writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); if (written < 0) { throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); } remaining -= written; // This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel. // Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than // GatheringByteChannel or ScatteringByteChannel. // 這是一個臨時工做區,當發送時,接收數據的接口一直被BlockingChannel使用着。 // 一旦BlockingChannel 被移除,咱們就能夠開始咱們的發送操做,接收經過 transportLayer 來工做而不是 GatheringByteChannel 或 ScatteringByteChannel if (channel instanceof TransportLayer) { pending = ((TransportLayer) channel).hasPendingWrites(); } return written; }
參考: Java NIO 的前生今世 之四 NIO Selector 詳解 《Apache Kafka 源碼剖析》 - 徐郡明著 Apache Kafka 源碼 0.10.0.1