Kafka源碼剖析 —— 網絡I/O篇 —— 淺析KafkaSelector

##NioSelector和KafkaSelector有什麼區別?java

先說結論,KafkaSelector(org.apache.kafka.common.network.selector)是對NioSelector(java.nio.channels.Selector)的進一步封裝。回想一下NioSelector,它參與了IO中的哪些過程?node

一、建立一個通道,並將通道註冊到NioSelector上,咱們能夠獲得一個SelectionKey 二、輪詢NioSelector中的ready集合,拿到對應的SelectionKey,並根據這個SelectionKey所關注的事件去執行對應的操做apache

實際上,KafkaSelector也是在調用NioSelector去執行這些操做,待補充……網絡

##1、建立鏈接socket

KafkaSelector建立鏈接,和普通的NioSelector並無什麼不一樣,首先建立一個通道,並將其設置爲非阻塞式的長鏈接,設置完畢後,執行鏈接操做。ui

SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);// 非阻塞模式
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);// 設置爲長鏈接
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
            socket.setSendBufferSize(sendBufferSize);// 設置SO_SNDBUF 大小
        }
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
            socket.setReceiveBufferSize(receiveBufferSize);// 設置 SO_RCVBUF 大小
        }
        socket.setTcpNoDelay(true);
        boolean connected;
        try {
            connected = socketChannel.connect(address);// 由於是非阻塞模式,因此方法可能會在鏈接正式鏈接以前返回
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }

建立完通道後,將其註冊到NioSelector上,並關注OP_CONNECT,再以節點Id,SelectionKey來建立KafkaChannel,這裏先不詳細說明KafkaChannel,它是對通道的進一步封裝。在建立完KafkaChannel後,將KafkaChannel與SelectionKey、節點ID作進一步綁定。this

SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 將當前這個socketChannel註冊到nioSelector上,並關注OP_CONNECT事件
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 建立KafkaChannel
        key.attach(channel);// 將channel綁定到key上
        this.channels.put(id, channel);// 將 nodeId 和 Channel綁定

這樣有一個好處,首先KafkaChannel中包含了節點ID與SelectionKey,而咱們也能夠根據節點ID來拿到KafkaChannel,一樣能夠根據SelectionKey來拿到KafkaChannel,這就意味着,咱們只要拿到了KafkaChannel、SelectionKey、節點ID中的任意一個,均可以經過這些引用關係拿到彼此,從而進行相關操做。.net

##2、預發送 實際上就是將要發送的ByteBuffer扔進KafkaChannel,此時並未進行IO操做,這裏的Send對象,實際上就是對ByteBuffer的進一步封裝,它主要包含了將要發往的節點ID、ByteBuffer大小、是否發送完畢等信息。咱們這裏根據節點ID,從咱們剛纔的channels中,取出KafkaChannel。code

public void send(Send send) {
        // 看看send要發的這個nodeId在不在
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            // 把數據扔進KafkaChannel中(只能放一個,放多個會報錯),並關注write事件
            channel.setSend(send);
        } catch (CancelledKeyException e) {

            // 失敗了加一條node_id的失敗記錄
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

##3、進行IO操做 來到了咱們比較熟悉的輪詢環節,從NioSelector中取出全部SelectionKey進行輪詢。對象

if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);// 處理I/O的核心方法
            pollSelectionKeys(immediatelyConnectedKeys, true);
}
 
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
		Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            // 建立鏈接時(connect)將kafkaChannel註冊到key上,就是爲了在這裏獲取
            KafkaChannel channel = channel(key);

……………………

#####一、判斷一下key 鏈接好了沒有,由於咱們用的是非阻塞鏈接,因此到了輪詢階段,尚未完成鏈接是正常的。

if (isImmediatelyConnected || key.isConnectable()) {
                    // finishConnect方法會先檢測socketChannel是否創建完成,創建後,會取消對OP_CONNECT事件關注,//TODO 並開始關注OP_READ事件
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());// 將當前channel id 添加到已鏈接的集合中
                        this.sensors.connectionCreated.record();
                    } else {
                        continue;// 表明鏈接未完成,則跳過對此Channel的後續處理
                    }
                }

#####二、身份驗證(略過) #####三、判斷KafkaChannel有沒有準備好,有沒有關注OP_READ,能不能讀之類的,並進行讀操做。 這裏有一個判斷,就是判斷當前的KafkaChannel是否是在StagedReceives裏。咱們日後看看,在從網絡上讀取數據時,咱們會將KafkaChannel扔進StagedReceives裏,也就是說,若是這個KafkaChannel已經在StagedReceives裏了,那麼表明它已經在讀數據了。

if (channel.ready() // 鏈接的三次握手完成,而且 todo 權限驗證經過
                    && key.isReadable() // key是否關注了read事件
                    && !hasStagedReceive(channel)) {// todo 這個通道不能是正在讀數據的,由於在讀的時候,會把這個channel扔進stagedReceives裏面
                    NetworkReceive networkReceive;

                    /**
                     * 實際上這裏就是分屢次去一個channel取數據,直到取完,並將其保存在key:channel  value:new ArrayDeque<NetworkReceive> 中
                     */
                    while ((networkReceive = channel.read()) != null) {
                        addToStagedReceives(channel, networkReceive);
                    }
                }

#####四、判斷KafkaChannel有沒有準備好,有沒有關注OP_WRITE,並進行寫操做

if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    // 這裏會將KafkaChannel的send字段發送出去,
                    // 若是未完成發送,或者沒發完,則返回null
                    // 發送成功則返回send對象
                    if (send != null) {
                        this.completedSends.add(send);// 添加到completedSends集合
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

##4、關閉空閒鏈接 在每一次IO操做完畢後,KafkaSelector都會調用一個方法,去關閉掉那些沒怎麼用的鏈接,實際上它就是一個基於時間戳的斷連機制。 KafkaSelector中維護了一個哈希表,

LinkedHashMap<String, Long> lruConnections (new LinkedHashMap<>(16, .75F, true);

在每次進行IO操做時,將Key:節點ID,Value:當前時間戳扔進哈希表裏面,在IO操做進行完畢時,檢查一下,最大的那個節點,它的最後一次IO時間+connectionsMaxIdleNanos(建立KafkaSelector時指定),是否超過了當前的時間。 若是是,這個鏈接就會被關掉。

好比說connectionsMaxIdleNanos被指定成了1分鐘,那麼若是這個有序哈希表的最後一個節點的時間是一分鐘以前,那麼這個節點ID的通道將會被關掉。

private void maybeCloseOldestConnection() {
        if (currentTimeNanos > nextIdleCloseCheckTime) {
            if (lruConnections.isEmpty()) {
                nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
            } else {
                Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet()
                                                                              .iterator()
                                                                              .next();
                Long connectionLastActiveTime = oldestConnectionEntry.getValue();
                nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
                if (currentTimeNanos > nextIdleCloseCheckTime) {
                    String connectionId = oldestConnectionEntry.getKey();
                    if (log.isTraceEnabled()) {
                        log.trace("About to close the idle connection from " + connectionId
                            + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
                    }

                    disconnected.add(connectionId);
                    close(connectionId);
                }
            }
        }
    }

參考: 《Apache Kafka 源碼剖析》 - 徐郡明著 Apache Kafka 源碼 0.10.0.1

相關文章
相關標籤/搜索