org.apache.kafka.common.network.Selector

org.apache.kafka.common.client.Selector實現了Selectable接口,用於提供符合Kafka網絡通信特色的異步的、非阻塞的、面向多個鏈接的網絡I/O.html

這些網絡IO包括了鏈接的建立、斷開,請求的發送和接收,以及一些網絡相關的metrics統計等功能。java

因此,它實際上應該至少具體如下功能node

使用

首先得談一下Selector這東西是準備怎麼讓人用的。這個註釋裏說了一部分:react

A nioSelector interface for doing non-blocking multi-connection network I/O.
This class works with NetworkSend and NetworkReceive to transmit size-delimited network requests and responses.apache

A connection can be added to the nioSelector associated with an integer id by doing
nioSelector.connect("42", new InetSocketAddress("google.com", server.port), 64000, 64000);

The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating the connection. The successful invocation of this method does not mean a valid connection has been established. Sending requests, receiving responses, processing connection completions, and disconnections on the existing connections are all done using the poll() call.
nioSelector.send(new NetworkSend(myDestination, myBytes));
nioSelector.send(new NetworkSend(myOtherDestination, myOtherBytes));
nioSelector.poll(TIMEOUT_MS);

The nioSelector maintains several lists that are reset by each call to poll() which are available via various getters. These are reset by each call to poll(). This class is not thread safe!編程

首先,Selector的API都是非阻塞或者帶有阻塞超時時間的,這個特色直接源於Java NIO的Selector和SocketChannel的特性。這種異步非阻塞的IO帶來的問題就是,必須時不時地調用某個方法,來檢測IO完成的進度狀況,對於NIO的selector,這個方法就是select,對於Kafka的Selector,這個方法就是poll.設計模式

爲此,註釋裏舉了一個典型的例子,這是一個發送數據的例子:api

nioSelector.send(new NetworkSend(myDestination, myBytes));
nioSelector.send(new NetworkSend(myOtherDestination, myOtherBytes));
nioSelector.poll(TIMEOUT_MS);

可是Kafka Selector的poll不只檢測IO的進度,它還執行IO操做,好比當發現有channel可讀了,它就從中讀數據出來。那麼,是否能夠說Kafka的Selector執行的是異步IO呢?下面來談下這個問題。服務器

異步IO vs 同步非阻塞IO網絡

異步IO是說實際的IO動做是由操做系統調用另外的線程或者其它的計算資源來作的。那麼,想要肯定Selector執行的是不是異步IO,得先看下它所構建的Channel是哪種,畢竟不是全部的channel都支持異步IO。

Selector建立channel的動做是在#connect(String, InetSocketAddress, int, int)方法中。

 SocketChannel socketChannel = SocketChannel.open();
 socketChannel.configureBlocking(false);

它是建了一個SocketChannel.而SocketChannel並不能進行異步IO,當它被設爲no-blocking模式時,進行的是非阻塞的IO。在Java7中,引入了AsynchronizedSocketChannel,它進行的纔是真正的異步IO。

參見

兩種高性能I/O設計模式(Reactor/Proactor)的比較 

Java Non-Blocking and Asynchronous IO with NIO & NIO.2 (JSR203) - Reactor/Proactor Implementations

An NIO.2 primer, Part 1: The asynchronous channel APIs

內部狀態

因爲Selector的各個方法是非阻塞的,所以須要保存每一個操做當前的完成進度。好比,正在寫,寫完成,讀完成,鏈接創建成功,等。這樣在調用者調用了poll方法之後,調用者能夠檢查各個操做完成的狀況。

 Selector內部的確有一些集合來保存這些信息:

    private final Map<String, KafkaChannel> channels; //有正在鏈接以及鏈接成功的channel,注意它的類型是KafkaChannel
    private final List<Send> completedSends;  //已發送完的請求
    private final List<NetworkReceive> completedReceives;  //已接收完成的響應。注意,這個集合並無包括全部已接收完成的響應,stagedReceives集合也包括了一些接收完成的響應
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;  //已接收完成,但尚未暴露給用戶的響應
    private final Set<SelectionKey> immediatelyConnectedKeys; //在調用SocketChannel#connect方法時當即完成的SelectionKey.爲何保存的是SelectionKey呢?
    private final List<String> disconnected; //已斷開鏈接的節點
    private final List<String> connected; //新鏈接成功的節點
    private final List<String> failedSends; //發送失敗的節點,但並非因爲IO異常致使的失敗,而是因爲SelectionKey被cancel引發的失敗,好比對一個已關閉的channel設置interestOps

 

可是這裏的集合有些並非按照channel來組織的。好比:completedSend,  completedReceives, disconnected, connected和failedSends。由於這些集合是在一個poll以後,Selector的使用者應該處理的,它們是按照類型組織。在poll執行的最開始,它會調用clear方法,清空這些集合,由於它們是上次poll的結果。因此,在一次poll以後查看這些結果的話,看到的就是此次poll的結果。

    /**
     * Clear the results from the prior poll
     */
    private void clear() {
        this.completedSends.clear();
        this.completedReceives.clear();
        this.connected.clear();
        this.disconnected.clear();
        this.disconnected.addAll(this.failedSends);
        this.failedSends.clear();
    }

這裏之因此把failedSends加到disconnected之中,是由於failedSends裏保存的失敗的send,並非上次poll留下來的,而是上次poll以後,這次poll以前,調用send方法時添加到failedSends集合中的。當有failedSends時,selector就會關閉這個channel,所以在clear過程當中,須要把failedSends裏保存的節點加到disconnected之中。

須要注意的是,這些集合裏並無包括正在發送以及正在接收的請求。緣由是KafkaChannel對象自己持有正在處理的請求和響應。

public class KafkaChannel {
    private final String id;
    private final TransportLayer transportLayer;
    private final Authenticator authenticator;
    private final int maxReceiveSize;
    private NetworkReceive receive;
    private Send send;
 
...
}

這裏須要注意是是它的setSend和read方法

    public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }
public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) { receive = new NetworkReceive(maxReceiveSize, id); } receive(receive); if (receive.complete()) { receive.payload().rewind(); result = receive; receive = null; } return result; }

當一個send正在發送的過程當中,send != null, 此時調用setSend會拋出IllegalStateException。那麼,Selector在能夠在一個poll以前能夠往一個channel發送多個請求嗎?

canSendMore

這個須要須要追溯到哪些方法會調用KafkaChannel#setSend。結果只有NetworkClient的send(ClientRequest, long)方法會最終調到它。

而NetworkClient的send方法是這樣的

   public void send(ClientRequest request, long now) {
        String nodeId = request.request().destination();
        if (!canSendRequest(nodeId))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        doSend(request, now);
    }

    private boolean canSendRequest(String node) {
        return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
    }

這裏connectionStates.isConnected用來檢測節點是否已經鏈接上。selector.isChannelReady()用來檢測channel是否準備完成。因爲Kafka security的一些要求,當socket channel鏈接創建完成後,可能還須要跟server交換一些認證數據,才能認爲channel準備完畢。那麼,重點就在於inFlightRequest.canSendMore這個方法了。由於若是它不檢測一個channel是否有正在發送的send,就可能會在調用NetworkClient#send時,再試圖給這個channel添加一個send,就會引起異常。

InFlightRequest保存了全部已發送,但還沒收到響應的請求。

InFlightRequests的canSendMore是這樣的:

    public boolean canSendMore(String node) {
        Deque<ClientRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }

重點在於queue.peekFirst().request().completed, 即若是發給這個節點的最先的請求尚未發送完成,是不能再往這個節點發送請求的。

可是,從canSendMore方法中也能夠看出,只要沒有超過maxInFlightRequestsPerConnection,一個node能夠有多個in-flight request的。這點,實際上影響到了另外一個集合的數據結構的選擇——stagedReceives

stagedReceives

    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;

stagedRecieves用來保存已經接收完成,可是尚未暴露給用戶(即沒有放在completedReceive列表中)的NetworkReceive(即響應).

這裏有兩個問題:

  1. stagedRecieves使用時徹底是按照FIFO隊列來用的,所以爲何用Deque,而不用Queue?
  2. 爲何一個KafkaChannel會有多個NetworkRecieves

第二個問題的答案就是NetworkClient的canSendMore方法並無限制一個node只有在全部已發送請求都收到響應的狀況下才能發送新請求。所以,一個node能夠有多個in-flight request,也能夠有多個已發送的請求。所以,Selector也就可能會收到來自於同一個node的多個響應。所以,selector在每次poll的時候,讀取請求的操做是這樣的:

                /* if channel is ready read from any connections that have readable data */
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }

也就是說,只要有能夠完整讀出的響應,都會把這些響應放到stagedReceives列表中。這個while循環使得在一次poll中,可能會添加多個NetworkReceive到stagedReceives裏。

可是,每次poll,只會把最先的一個NetworkReceive放在completedReceives裏。

     * checks if there are any staged receives and adds to completedReceives
     */
    private void addToCompletedReceives() {
        if (!this.stagedReceives.isEmpty()) {
            Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
                KafkaChannel channel = entry.getKey();
                if (!channel.isMute()) {
                    Deque<NetworkReceive> deque = entry.getValue();
                    NetworkReceive networkReceive = deque.poll(); //從這個channel的stagedReceives隊列中取最先的一個
                    this.completedReceives.add(networkReceive);//把它添加到completedRecievs列表中
                    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
                    if (deque.isEmpty())
                        iter.remove();
                }
            }
        }
    }

這個行爲比較奇怪。可能的解釋是這會簡化NetworkClient的實現,形成一種"對每一個channel,poll一次只發送一個請求,只接收一個響應「的假像,使得NetworkClient的用戶更容易處理請求和響應之間的對應關係。既然poll是一個非阻塞操做,用戶就能夠在未收到某個請求的響應時,屢次調用poll,這個也沒什麼問題。由於poll一次並不保證就能收到剛纔發出的請求對應的響應。

至於第一個問題,則是因爲性能的考慮。

addToStagedReceives方法用於把一個NetworkReceive加到某個channel的stagedReceivs隊列中。

    private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
        if (!stagedReceives.containsKey(channel))
            stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

        Deque<NetworkReceive> deque = stagedReceives.get(channel);
        deque.add(receive);
    }

若是這個channel沒有stagedReceives隊列,會給它建一個,此時new的是ArrayDeque對象。這個ArrayDeque是JDK中性能最高的FIFO隊列的實現,優於ArrayList和linkedList.

詳見What is the fastest Java collection with the basic functionality of a Queue?

 

immediatelyConnectedKeys

    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        if (this.channels.containsKey(id))
            throw new IllegalStateException("There is already a connection for id " + id);

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setSendBufferSize(sendBufferSize);
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setReceiveBufferSize(receiveBufferSize);
        socket.setTcpNoDelay(true);
        boolean connected;
        try {
            connected = socketChannel.connect(address);
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        key.attach(channel);
        this.channels.put(id, channel);

        if (connected) {
            // OP_CONNECT won't trigger for immediately connected channels
            log.debug("Immediately connected to node {}", channel.id());
            immediatelyConnectedKeys.add(key);
            key.interestOps(0);
        }
    }

雖然在connect方法中,SocketChannel被設爲non-blocking, 而後調用socketChannel.connect(address),雖然是非阻塞模式,可是connect方法仍然有可能會直接返回ture,表明鏈接成功。connect方法的doc是這麼說的:

If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation. If the connection is established immediately, as can happen with a local connection, then this method returns true. Otherwise this method returns false and the connection operation must later be completed by invoking the finishConnect method.

好比,若是是一個本地的鏈接,就可能在非阻模式下也會當即返回鏈接成功。也是挺神奇的,想想,若是認爲」執行指令「是一種阻塞的話,絕對意義上的非阻塞方法是不存在的,不存在執行時間爲零的方法。也就是說,若是進行一個本地鏈接,OS加上JVM是能夠在有限的指令數量和時間段內肯定鏈接成功,這也能夠被認爲是在非阻塞狀態下進行的。

lruConnection

在前邊的connect方法中,socket被配置了keepAlive,能夠檢測出來鏈接斷開的狀況。可是,還有一種狀況須要考慮,就是一個鏈接過久沒有用來執行讀寫操做,爲了下降服務器端的壓力,須要釋放這些的鏈接。因此Selector有LRU機制,來淘汰這樣的鏈接。

在Java裏,實現LRU機制最簡單的就是使用LinkedHashMap, Selector也的確是這麼作的。

private final Map<String, Long> lruConnections;
this.lruConnections = new LinkedHashMap<>(16, .75F, true);

lruConnection的key是node的id, value是上次訪問的時間。它的「順序」被設爲access順序。Selector會用map的put操做來access這個map,當NIO的selector poll出來一批SelectionKey以後,這些key對應的node被從新put進map,以刷新它們的最近訪問順序,同時也把具體的「最近使用時間」做爲entry的value放在這個map中。

這發生在會被每次poll調用的pollSelectionKeys方法中

lruConnections.put(channel.id(), currentTimeNanos);

之因此要在value中保存最近使用時間,是由於這個時間會被用於計算空閒時間,當空閒時間超過了connectionMaxIdleMs時,就會關閉這個鏈接。

在poll的最後,會執行maybeCloseOldestConnection方法,來檢測並關閉須要關閉的鏈接。

    private void maybeCloseOldestConnection() {
        if (currentTimeNanos > nextIdleCloseCheckTime) {
            if (lruConnections.isEmpty()) {
                nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
            } else {
                Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
                Long connectionLastActiveTime = oldestConnectionEntry.getValue();
                nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
                if (currentTimeNanos > nextIdleCloseCheckTime) {
                    String connectionId = oldestConnectionEntry.getKey();
                    if (log.isTraceEnabled())
                        log.trace("About to close the idle connection from " + connectionId
                                + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");

                    disconnected.add(connectionId);
                    close(connectionId);
                }
            }
        }
    }

這裏有幾點要注意:

  1. 並非每次poll都須要執行實際的檢測。假如在某一時刻,咱們得知了此時的least recently used node的access時間,那麼之後最早過時的確定是這個node,所以下一次檢測的時間應至少是這個 access time of LRU node + maxIdleTime. 因此在代碼中,使用這段代碼來重置nextIdelCloseCheckTime
                    Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
                    Long connectionLastActiveTime = oldestConnectionEntry.getValue();
                    nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;

     

  2. maybeCloseOldestConnection每調用一次,最多隻關閉一個鏈接。可是,在關閉鏈接時,它並無根據移除node後的新的LRU node來重置 nextIdelCloseCheckTime。因此下一次調用maybeCloseOldestConnection時,if的判斷條件確定爲true,所以會繼續檢測並關閉鏈接。

這種作法有些不妥,由於這樣作的話一個poll並不能關閉全部應該關閉的空閒鏈接,不能期望用戶接下來會主動地多poll幾回。


 總結

Kafka使用這個抽象出來的Selector的確比直接使用NIO在編程上要好一些,主要是代碼會不那麼臃腫,由於Selector配合KafkaChannel、Send, NetworkReceive, 處理了NIO網絡編程的一些細節。Selector的這些代碼寫的也的確不錯。 不過,poll這個操做被搞得有些教條,被賦予了太多的責任,看起來是爲了迎合Kafka的新consumer的特色搞出來的東西。這個東西讓人想起了回合制的遊戲,設置好下一回合想幹啥,點肯定,而後就喝茶等了。

相關文章
相關標籤/搜索