kafka-網絡層Selector

Selector介紹

Selector是對java nio中Selector的封裝,額外提供了對kafka請求或者響應的處理方法java

框架圖

輸入圖片說明

Selectable接口

Selector實現了Selectable接口。下面列舉它的定義框架

public interface Selectable {
    public void close(String id);

    // 發送數據
    public void send(Send send);

    // 等待timeout時間,而後處理就緒的鏈接
    public void poll(long timeout) throws IOException;

    // 返回已經成功發送的數據
    public List<Send> completedSends();

    // 返回已經成功接收的數據
    public List<NetworkReceive> completedReceives();

    // 返回關閉的鏈接,ChannelState表示鏈接關閉前最後的狀態,在KafkaChannel會有介紹
    public Map<String, ChannelState> disconnected();

    // 返回已經成功的鏈接
    public List<String> connected();

    // 取消監聽讀事件
    public void mute(String id);

    // 開啓監聽讀事件
    public void unmute(String id);

    // 鏈接是否已經就緒(好比ssl握手過程是否完成)
    public boolean isChannelReady(String id);
}

Selector類

register方法由Processor調用,負責註冊該鏈接socket

public class Selector implements Selectable, AutoCloseable {
    // nioSelector負責鏈接的事件監聽器
    private final java.nio.channels.Selector nioSelector;
    
    // channels集合
    private final Map<String, KafkaChannel> channels;

    public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
        // 監聽鏈接的讀事件
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
        // 構造KafkaChannel,而且綁定到該鏈接。KafkaChannel負責請求的解析和響應的發送
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        key.attach(channel);
        // 更新channels
        this.channels.put(id, channel);
    }
}

poll方法,是Selector最重要的方法。它負責監聽事件,並作相應的處理ui

public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
        // 清除
        clear();
        // hasStagedReceives返回是否還有receive處理。若是有,則設置timeout爲0,表示不等待。
        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;
        // 等待socket事件就緒
        int readyKeys = select(timeout);

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            //  處理就緒的鏈接
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            // 處理immediatelyConnectedKeys的鏈接
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }
        // 更新請求
        addToCompletedReceives();

        // we use the time at the end of select to ensure that we don't close any connections that
        // have just been processed in pollSelectionKeys
        maybeCloseOldestConnection(endSelect);
    }

clear方法,清除上一次的鏈接列表this

public class Selector implements Selectable, AutoCloseable {
    
    // 完成發送的列表
    private final List<Send> completedSends;
    // 已經成功接收完的請求列表
    private final List<NetworkReceive> completedReceives;
    // 關閉的鏈接列表
    private final Map<String, ChannelState> disconnected;
    // 已經創建的鏈接列表
    private final List<String> connected;
    // 準備關閉的鏈接列表,當這個鏈接的請求處理完後,就會移除
    private final Map<String, KafkaChannel> closingChannels;
    // 發送send失敗的channel列表。用來更新disconnected的狀態
    private final List<String> failedSends;

   private void clear() {
        this.completedSends.clear();
        this.completedReceives.clear();
        this.connected.clear();
        this.disconnected.clear();
            
        // 遍歷準備關閉的鏈接,當異常關閉的鏈接,其請求已所有讀取完,就關閉
        for (Iterator<Map.Entry<String, KafkaChannel>> it = closingChannels.entrySet().iterator(); it.hasNext(); ) {
            KafkaChannel channel = it.next().getValue();
            Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
            // 是否發送send失敗
            boolean sendFailed = failedSends.remove(channel.id());
            if (deque == null || deque.isEmpty() || sendFailed) {
                // 關閉鏈接,而且從closingChannels刪除
                doClose(channel, true);
                it.remove();
            }
        }
        // 遍歷發送失敗的鏈接
        for (String channel : this.failedSends) {
            KafkaChannel failedChannel = closingChannels.get(channel);
            if (failedChannel != null)
                // 更新channel的state
                failedChannel.state(ChannelState.FAILED_SEND);
            // 添加到disconnected列表裏
            this.disconnected.put(channel, ChannelState.FAILED_SEND);
        }
        this.failedSends.clear();
    }
}

pollSelectionKeys方法,處理鏈接相關的事件code

public class Selector implements Selectable, AutoCloseable {
   private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        // 遍歷Keys
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
         while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            // 獲取與key綁定的KafkaChannel
            KafkaChannel channel = channel(key);
            try {
                // connect事件
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        // 必須等到KafkaChannel完成鏈接,才加入到connected列表
                        this.connected.add(channel.id());
                    } else
                        continue;
                }
                // 若是channel尚未ready 
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();
                //讀事件
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    // channel已經準備好,而且尚未沒有讀取完的Receive
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        // channel.read若是返回null,表示一條請求的數據還沒接收完整,須要繼續監聽讀事件
                        // 加入到未處理的Receive隊列
                        addToStagedReceives(channel, networkReceive);
                }

                // 寫事件
                if (channel.ready() && key.isWritable()) {
                    // channel.write若是發送完,將會send。如沒有,則會返回null
                    Send send = channel.write();
                    if (send != null) {
                        // 更新completedSends列表
                        this.completedSends.add(send);
                    }
                }
                if (!key.isValid())
                    //若是鏈接失效,則關閉鏈接
                    close(channel, true);
            } catch (Exception e) {
                // 關閉鏈接
                close(channel, true);
            } finally {
                .......
            }
    }
}

addToCompletedReceives方法,從stagedReceives取出Receive到completedReceives列表裏接口

private void addToCompletedReceives() {
        if (!this.stagedReceives.isEmpty()) {
            // 遍歷stagedReceives map,
            Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
                KafkaChannel channel = entry.getKey();
                if (!channel.isMute()) {
                    // 若是channel還在監聽讀事件,則將Receive添加到completedReceives隊列
                    Deque<NetworkReceive> deque = entry.getValue();
                    addToCompletedReceives(channel, deque);
                    if (deque.isEmpty())
                        iter.remove();
                }
            }
        }
    }
    
    // 從stagedDeque取出一個Receive,添加到completedReceives裏面
    private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
        NetworkReceive networkReceive = stagedDeque.poll();
        this.completedReceives.add(networkReceive);
    }

close方法隊列

private void close(KafkaChannel channel, boolean processOutstanding) {
        // 關閉鏈接
        channel.disconnect();

       // 獲取channel有哪些未處理的Receive
        Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
        if (processOutstanding && deque != null && !deque.isEmpty()) {
            // processOutstanding表示是否須要處理未完成的請求
            if (!channel.isMute()) {
                // 若是鏈接還在監聽讀事件
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    this.stagedReceives.remove(channel);
            }
            // 添加到closingChannels列表,表示即將關閉
            closingChannels.put(channel.id(), channel);
        } else
            // 不然關閉channel
            doClose(channel, processOutstanding);
        this.channels.remove(channel.id());
    }

send 方法事件

public void send(Send send) {
        String connectionId = send.destination();
        if (closingChannels.containsKey(connectionId))
            this.failedSends.add(connectionId);
        else {
            KafkaChannel channel = channelOrFail(connectionId, false);
            try {
                // 調用channel setSend方法
                channel.setSend(send);
            } catch (CancelledKeyException e) {
                this.failedSends.add(connectionId);
                close(channel, false);
            }
        }
    }

歸納

Selector實現了Selectable的接口。圖片

Processor會從Selector獲取 請求列表completedReceives,發送列表completedSends, disconnected 鏈接關閉列表.

當鏈接被意外關掉後,Selector仍然會處理完全部的請求

相關文章
相關標籤/搜索