Kafka源碼剖析 —— 網絡I/O篇 —— 淺析KafkaChannel

1、SocketChannel和KafkaChannel有什麼區別?

上篇文章說道KafkaSelector在建立一個鏈接的時候和普通的nioSelector並無什麼不一樣,它是基於nioSelector的封裝。咱們知道建立鏈接的一系列操做都是由Channel去完成,而KafkaChannel實際上就是對它的進一步封裝:      KafkaChannel不只封裝了SocketChannel,還封裝了Kafka本身的認證器Authenticator,和讀寫相關的NetworkReceive、Send。NetworkReceive和Send的底層都是經過ByteBuffer來實現的。java

2、KafkaChannel的建立

實際上基本等同於KafkaSelector的建立:node

按照普通的方式建立完通道後,將其註冊到NioSelector上,並關注OP_CONNECT,再以節點Id,SelectionKey來建立KafkaChannel,這裏先不詳細說明KafkaChannel,它是對通道的進一步封裝。在建立完KafkaChannel後,將KafkaChannel與SelectionKey、節點ID作進一步綁定。segmentfault

SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 將當前這個socketChannel註冊到nioSelector上,並關注OP_CONNECT事件
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 建立KafkaChannel
        key.attach(channel);// 將channel綁定到key上
        this.channels.put(id, channel);// 將 nodeId 和 Channel綁定

這樣有一個好處,首先KafkaChannel中包含了節點ID與SelectionKey,而咱們也能夠根據節點ID來拿到KafkaChannel,一樣能夠根據SelectionKey來拿到KafkaChannel,這就意味着,咱們只要拿到了KafkaChannel、SelectionKey、節點ID中的任意一個,均可以經過這些引用關係拿到彼此,從而進行相關操做app

3、預發送

實際上就是將要發送的ByteBuffer扔進KafkaChannel,此時並未進行IO操做,這裏的Send對象,實際上就是對ByteBuffer的進一步封裝,它主要包含了將要發往的節點ID、ByteBuffer大小、是否發送完畢等信息。咱們這裏根據節點ID,從咱們剛纔的channels中,取出KafkaChannel。socket

public void send(Send send) {
        // 看看send要發的這個nodeId在不在
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            // 把數據扔進KafkaChannel中(只能放一個,放多個會報錯),並關注write事件
            channel.setSend(send);
        } catch (CancelledKeyException e) {

            // 失敗了加一條node_id的失敗記錄
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

這個KafkaChannel的setSend方法實際上很是簡單,就是將要發送的send對象的引用交給KafkaChannel中的send。而且使這個channel的SelectionKey去關注OP_WRITE事件。ide

this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);

4、nio中的io操做

在上篇文章裏,咱們知道KafkaSelector也是經過輪詢器去進行IO操做,看一下原始的nioSelector是如何進行io操做的:ui

public class NioEchoServer {
    private static final int BUF_SIZE = 256;
    private static final int TIMEOUT = 3000;

    public static void main(String args[]) throws Exception {
        // 打開服務端 Socket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 打開 Selector
        Selector selector = Selector.open();

        // 服務端 Socket 監聽8080端口, 並配置爲非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);

        // 將 channel 註冊到 selector 中.
        // 一般咱們都是先註冊一個 OP_ACCEPT 事件, 而後在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ
        // 註冊到 Selector 中.
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 經過調用 select 方法, 阻塞地等待 channel I/O 可操做
            if (selector.select(TIMEOUT) == 0) {
                System.out.print(".");
                continue;
            }

            // 獲取 I/O 操做就緒的 SelectionKey, 經過 SelectionKey 能夠知道哪些 Channel 的哪類 I/O 操做已經就緒.
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

            while (keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();

                // 當獲取一個 SelectionKey 後, 就要將它刪除, 表示咱們已經對這個 IO 事件進行了處理.
                keyIterator.remove();

                if (key.isAcceptable()) {
                    // 當 OP_ACCEPT 事件到來時, 咱們就有從 ServerSocketChannel 中獲取一個 SocketChannel,
                    // 表明客戶端的鏈接
                    // 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel.
                    // 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel.
                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                    clientChannel.configureBlocking(false);
                    //在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中.
                    // 注意, 這裏咱們若是沒有設置 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那麼 select 方法會一直直接返回.
                    clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                }

                if (key.isReadable()) {
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    long bytesRead = clientChannel.read(buf);
                    if (bytesRead == -1) {
                        clientChannel.close();
                    } else if (bytesRead > 0) {
                        key.interestOps(OP_READ | SelectionKey.OP_WRITE);
                        System.out.println("Get data length: " + bytesRead);
                    }
                }

                if (key.isValid() && key.isWritable()) {
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    buf.flip();
                    SocketChannel clientChannel = (SocketChannel) key.channel();

                    clientChannel.write(buf);

                    if (!buf.hasRemaining()) {
                        key.interestOps(OP_READ);
                    }
                    buf.compact();
                }
            }
        }
    }
}

5、kafkaChannel 如何進行io操做?

####一、讀操做 首先,進行是否能夠開始讀操做的判斷。一、channel.ready(),這裏作了兩個判斷,一個是Kafka的認證器是否定證經過,另外一個則是是否握手成功。二、key.isReadable(),selectionKey是否關注了OP_READ。三、!hasStagedReceive(channel),判斷該channel是否在hasStagedReceive這個map裏面,若是該channel正在讀,那麼它會在這個map裏面,直到讀取完成。this

// channel是否已經準備好從鏈接中讀取任何可讀數據
            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() // 鏈接的三次握手完成,而且 todo 權限驗證經過
                && key.isReadable() // key是否關注了read事件
                && !hasStagedReceive(channel)) {// todo 這個通道不能是正在讀數據的,由於在讀的時候,會把這個channel扔進stagedReceives裏面
                NetworkReceive networkReceive;

                /**
                 * 實際上這裏就是分屢次去一個channel取數據,直到取完,並將其保存在key:channel  value:new ArrayDeque<NetworkReceive> 中
                 */
                 while ((networkReceive = channel.read()) != null) {
                    // 將屢次接收的數據放進stagedReceives下channel的Deque裏面
                    addToStagedReceives(channel, networkReceive);
                }
            }

剩下的channel.read()就比較簡單了,KafkaChannel裏面封裝了一個NetworkReceives,而NetworkReceives主要就是對ByteBuffer的封裝。spa

咱們將該NioChannel傳入,調用channel.read(size)方法,這個size,其實就是一個ByteBuffer,它是kafka協議中用來判斷包體有多長的包頭。rest

第一步,先判斷byteBuffer(size)中是否還有剩餘空間

第二步,從nioChannel中將數據讀到byteBuffer中

第三步,判斷byteBuffer是否是裝滿了

第四步,若是裝滿了,證實size這個bytebuffer已經拿到了包體的長度,調用readInt獲取其capacity,再用這個capacity去申請一個用於接收包體的byteBuffer(buffer)。

第五步,正式地將channel中的數據中讀取到byteBuffer(buffer)

public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
        int read = 0;
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            if (bytesRead < 0) {
                throw new EOFException();
            }
            read += bytesRead;
            if (!size.hasRemaining()) {
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0) {
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                }
                if (maxSize != UNLIMITED && receiveSize > maxSize) {
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                }

                this.buffer = ByteBuffer.allocate(receiveSize);
            }
        }

        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0) {
                throw new EOFException();
            }
            read += bytesRead;
        }

        return read;
    }

讀取完成以後,再作一下校驗:就會返回了,也就是上面while ((networkReceive = channel.read()) != null)拿到的這個networkReceives,裏面裝着包頭和包體。這裏Kafka有一個小操做,就是將kafkaChannel內的networkReceive的引用賦值給外面的這個networkReceive後,會將kafkaChannel內的networkReceive的引用置爲空。

/**
     * 接收數據,將數據保存在 NetworkReceive
     */
    public NetworkReceive read() throws IOException {
        NetworkReceive result = null;

        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id);
        }

        receive(receive);// 這個方法就是上面說了一大堆第一步第二步第三步的那個方法。
        if (receive.complete()) {
            receive.payload()
                   .rewind();
            result = receive;
            receive = null;
        }
        return result;
    }

####二、寫操做 寫操做要比讀操做更加簡單,上面有一個預發送操做,就是將要send的對象Send

/**
                 * 發送時其實也有一次沒發送完的狀況,每發送完的話,就不會出如今completedSends裏面
                 */
                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                // 若是channel已經ready 而且 咱們有數據來準備好寫sockets
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    // 這裏會將KafkaChannel的send字段發送出去,
                    // 若是未完成發送,或者沒發完,則返回null
                    // 發送成功則返回send對象
                    if (send != null) {
                        this.completedSends.add(send);// 添加到completedSends集合
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

主要的發送方法就是channel.write();

public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

而write方法中最核心的方法則是send(send),這個send對象也是一個byteBuffer對象。底層中的底層仍是調用了channel.write(byteBuffer方法)

@Override
    public long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);
        if (written < 0) {
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        }
        remaining -= written;
        // This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel.
        // Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than
        // GatheringByteChannel or ScatteringByteChannel.

        // 這是一個臨時工做區,當發送時,接收數據的接口一直被BlockingChannel使用着。
        // 一旦BlockingChannel 被移除,咱們就能夠開始咱們的發送操做,接收經過 transportLayer 來工做而不是 GatheringByteChannel 或 ScatteringByteChannel
        if (channel instanceof TransportLayer) {
            pending = ((TransportLayer) channel).hasPendingWrites();
        }

        return written;
    }

參考: Java NIO 的前生今世 之四 NIO Selector 詳解 《Apache Kafka 源碼剖析》 - 徐郡明著 Apache Kafka 源碼 0.10.0.1

相關文章
相關標籤/搜索