Kafka Network層解析

咱們知道kafka是基於TCP鏈接的。其並無像不少中間件使用netty做爲TCP服務器。而是本身基於Java NIO寫了一套。關於kafka爲何沒有選用netty的緣由能夠看這裏java

對Java NIO不太瞭解的同窗能夠先看下這兩篇文章,本文須要讀者對NIO有必定的瞭解。node

segmentfault.com/a/119000001…git

www.jianshu.com/p/0d497fe54…github

更多文章見我的博客:github.com/farmerjohng…apache

幾個重要類

先看下Kafka Client的網絡層架構,圖片來自於這篇文章segmentfault

image

本文主要分析的是Network層。服務器

Network層有兩個重要的類:SelectorKafkaChannel網絡

這兩個類和Java NIO層的java.nio.channels.SelectorChannel有點相似。架構

Selector幾個關鍵字段以下app

// jdk nio中的Selector
java.nio.channels.Selector nioSelector;
// 記錄當前Selector的全部鏈接信息
Map<String, KafkaChannel> channels;
// 已發送完成的請求
List<Send> completedSends;
// 已收到的請求
List<NetworkReceive> completedReceives;
// 尚未徹底收到的請求,對上層不可見
Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
// 做爲client端,調用connect鏈接遠端時返回true的鏈接
Set<SelectionKey> immediatelyConnectedKeys;
// 已經完成的鏈接
List<String> connected;
// 一次讀取的最大大小
int maxReceiveSize;

複製代碼

從網絡層來看kafka是分爲client端(producer和consumer,broker做爲從時也是client)和server端(broker)的。本文將分析client端是如何創建鏈接,以及收發數據的。server也是依靠SelectorKafkaChannel進行網絡傳輸。在Network層兩端的區別並不大。

創建鏈接

kafka的client端啓動時會調用Selector#connect(下文中如無特殊註明,均指org.apache.kafka.common.network.Selector)方法創建鏈接。

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);
    // 建立一個SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    // 設置爲非阻塞模式
    socketChannel.configureBlocking(false);
    // 建立socket並設置相關屬性
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        // 調用SocketChannel的connect方法,該方法會向遠端發起tcp建連請求
        // 由於是非阻塞的,因此該方法返回時,鏈接不必定已經創建好(即完成3次握手)。鏈接若是已經創建好則返回true,不然返回false。通常來講server和client在一臺機器上,該方法可能返回true。
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    // 對CONNECT事件進行註冊
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    KafkaChannel channel;
    try {
        // 構造一個KafkaChannel
        channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    } catch (Exception e) {
      ...
    }
    // 將kafkachannel綁定到SelectionKey上
    key.attach(channel);
    // 放入到map中,id是遠端服務器的名稱
    this.channels.put(id, channel);
    // connectct爲true表明該鏈接不會再觸發CONNECT事件,因此這裏要單獨處理
    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        // 加入到一個單獨的集合中
        immediatelyConnectedKeys.add(key);
        // 取消對該鏈接的CONNECT事件的監聽
        key.interestOps(0);
    }
}
複製代碼

這裏的流程和標準的NIO流程差很少,須要單獨說下的是socketChannel#connect方法返回true的場景,該方法的註釋中有提到

* <p> If this channel is in non-blocking mode then an invocation of this
* method initiates a non-blocking connection operation.  If the connection
* is established immediately, as can happen with a local connection, then
* this method returns <tt>true</tt>.  Otherwise this method returns
* <tt>false</tt> and the connection operation must later be completed by
* invoking the {@link #finishConnect finishConnect} method.
複製代碼

也就是說在非阻塞模式下,對於local connection,鏈接可能在立刻就創建好了,那該方法會返回true,對於這種狀況,不會再觸發以後的connect事件。所以kafka用一個單獨的集合immediatelyConnectedKeys將這些特殊的鏈接記錄下來。在接下來的步驟會進行特殊處理。

以後會調用poll方法對網絡事件監聽:

public void poll(long timeout) throws IOException {
...
// select方法是對java.nio.channels.Selector#select的一個簡單封裝
int readyKeys = select(timeout);
...
// 若是有就緒的事件或者immediatelyConnectedKeys非空
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
    // 對已就緒的事件進行處理,第2個參數爲false
    pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
    // 對immediatelyConnectedKeys進行處理。第2個參數爲true
    pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}

addToCompletedReceives();

...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍歷集合
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    // 移除當前元素,要否則下次poll又會處理一遍
    iterator.remove();
    // 獲得connect時建立的KafkaChannel
    KafkaChannel channel = channel(key);
   ...

    try {
        // 若是當前處理的是immediatelyConnectedKeys集合的元素或處理的是CONNECT事件
        if (isImmediatelyConnected || key.isConnectable()) {
            // finishconnect中會增長READ事件的監聽
            if (channel.finishConnect()) {
                this.connected.add(channel.id());
                this.sensors.connectionCreated.record();
                ...
            } else
                continue;
        }

        // 對於ssl的鏈接還有些額外的步驟
        if (channel.isConnected() && !channel.ready())
            channel.prepare();

        // 若是是READ事件
        if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
            NetworkReceive networkReceive;
            while ((networkReceive = channel.read()) != null)
                addToStagedReceives(channel, networkReceive);
        }

        // 若是是WRITE事件
        if (channel.ready() && key.isWritable()) {
            Send send = channel.write();
            if (send != null) {
                this.completedSends.add(send);
                this.sensors.recordBytesSent(channel.id(), send.size());
            }
        }

        // 若是鏈接失效
        if (!key.isValid())
            close(channel, true);

    } catch (Exception e) {
        String desc = channel.socketDescription();
        if (e instanceof IOException)
            log.debug("Connection with {} disconnected", desc, e);
        else
            log.warn("Unexpected error from {}; closing connection", desc, e);
        close(channel, true);
    } finally {
        maybeRecordTimePerConnection(channel, channelStartTimeNanos);
    }
}
}
複製代碼

由於immediatelyConnectedKeys中的鏈接不會觸發CONNNECT事件,因此在poll時會單獨對immediatelyConnectedKeys的channel調用finishConnect方法。在明文傳輸模式下該方法會調用到PlaintextTransportLayer#finishConnect,其實現以下:

public boolean finishConnect() throws IOException {
    // 返回true表明已經鏈接好了
    boolean connected = socketChannel.finishConnect();
    if (connected)
        // 取消監聽CONNECt事件,增長READ事件的監聽
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}
複製代碼

關於immediatelyConnectedKeys更詳細的內容能夠看看這裏

發送數據

kafka發送數據分爲兩個步驟:

1.調用Selector#send將要發送的數據保存在對應的KafkaChannel中,該方法並無進行真正的網絡IO

// Selector#send
public void send(Send send) {
    String connectionId = send.destination();
    // 若是所在的鏈接正在關閉中,則加入到失敗集合failedSends中
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        KafkaChannel channel = channelOrFail(connectionId, false);
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

//KafkaChannel#setSend
public void setSend(Send send) {
    // 若是還有數據沒有發送出去則報錯
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    // 保存下來
    this.send = send;
    // 添加對WRITE事件的監聽
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
複製代碼
  1. 調用Selector#poll,在第一步中已經對該channel註冊了WRITE事件的監聽,因此在當channel可寫時,會調用到pollSelectionKeys將數據真正的發送出去。
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍歷集合
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    // 移除當前元素,要否則下次poll又會處理一遍
    iterator.remove();
    // 獲得connect時建立的KafkaChannel
    KafkaChannel channel = channel(key);
   ...

    try {
        ...
 

        // 若是是WRITE事件
        if (channel.ready() && key.isWritable()) {
            // 真正的網絡寫
            Send send = channel.write();
            // 一個Send對象可能會被拆成幾回發送,write非空表明一個send發送完成
            if (send != null) {
                // completedSends表明已發送完成的集合
                this.completedSends.add(send);
                this.sensors.recordBytesSent(channel.id(), send.size());
            }
        }
		...
    } catch (Exception e) {
     ...
    } finally {
        maybeRecordTimePerConnection(channel, channelStartTimeNanos);
    }
}
}
複製代碼

當可寫時,會調用KafkaChannel#write方法,該方法中會進行真正的網絡IO:

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}
private boolean send(Send send) throws IOException {
    // 最終調用SocketChannel#write進行真正的寫
    send.writeTo(transportLayer);
    if (send.completed())
        // 若是寫完了,則移除對WRITE事件的監聽
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

    return send.completed();
}
複製代碼

接收數據

若是遠端有發送數據過來,那調用poll方法時,會對接收到的數據進行處理。

public void poll(long timeout) throws IOException {
...
// select方法是對java.nio.channels.Selector#select的一個簡單封裝
int readyKeys = select(timeout);
...
// 若是有就緒的事件或者immediatelyConnectedKeys非空
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
    // 對已就緒的事件進行處理,第2個參數爲false
    pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
    // 對immediatelyConnectedKeys進行處理。第2個參數爲true
    pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}

addToCompletedReceives();

...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍歷集合
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    // 移除當前元素,要否則下次poll又會處理一遍
    iterator.remove();
    // 獲得connect時建立的KafkaChannel
    KafkaChannel channel = channel(key);
   ...

    try {
        ...
 

        // 若是是READ事件
        if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
            NetworkReceive networkReceive;
            // read方法會從網絡中讀取數據,但可能一次只能讀取一個req的部分數據。只有讀到一個完整的req的狀況下,該方法才返回非null
            while ((networkReceive = channel.read()) != null)
                // 將讀到的請求存在stagedReceives中
                addToStagedReceives(channel, networkReceive);
        }
		...
    } catch (Exception e) {
     ...
    } finally {
        maybeRecordTimePerConnection(channel, channelStartTimeNanos);
    }
}
}

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}
複製代碼

在以後的addToCompletedReceives方法中會對該集合進行處理。

private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            // 對於client端來講該isMute返回爲false,server端則依靠該方法保證消息的順序
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}
private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    // 將每一個channel的第一個NetworkReceive加入到completedReceives
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}
複製代碼

讀出數據後,會先放到stagedReceives集合中,而後在addToCompletedReceives方法中對於每一個channel都會從stagedReceives取出一個NetworkReceive(若是有的話),放入到completedReceives中。

這樣作的緣由有兩點:

  1. 對於SSL的鏈接來講,其數據內容是加密的,因此不能精準的肯定本次須要讀取的數據大小,只能儘量的多讀,這樣會致使可能會比請求的數據讀的要多。那若是該channel以後沒有數據能夠讀,會致使多讀的數據將不會被處理。
  2. kafka須要確保一個channel上request被處理的順序是其發送的順序。所以對於每一個channel而言,每次poll上層最多隻能看見一個請求,當該請求處理完成以後,再處理其餘的請求。在sever端,每次poll後都會將該channel給mute掉,即再也不從該channel上讀取數據。當處理完成以後,纔將該channelunmute,即以後能夠從該socket上讀取數據。而client端則是經過InFlightRequests#canSendMore控制。

代碼中關於這段邏輯的註釋以下:

/* In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting, * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses. * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's * application buffer size. This means we might be reading additional bytes than the requested size. * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0 * and pop response and add to the completedReceives. * Atmost one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added * by SocketServer to the request queue may be processed by different request handler threads, requests on each * channel must be processed one-at-a-time to guarantee ordering. */
複製代碼

End

本文分析了kafka network層的實現,在閱讀kafka源碼時,若是不把network層搞清楚會比較迷,好比req/resp的順序保障機制、真正進行網絡IO的不是send方法等等。

相關文章
相關標籤/搜索