kafka-網絡層KafkaChannel

KafkaChannel介紹

KafkaChannel負責基於socket的鏈接,認證,數據讀取發送。它包含TransportLayer和Authenticator兩個部分。TransportLayer負責數據交互,Authenticator負責安全驗證。java

框架圖

輸入圖片說明

ChannelBuilders

ChannelBuilders提供了實例化ChannelBuilder的工廠方法,clientChannelBuilder和serverChannelBuilder數組

public class ChannelBuilders {
    // 這裏構造爲私有方法,代表這個類只提供類方法
    private ChannelBuilders() { }
    
    // 實例化客戶端使用的ChannelBuilder
    public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProtocol,
            JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName,
            String clientSaslMechanism, boolean saslHandshakeRequestEnable) {
        return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName,                         
clientSaslMechanism, saslHandshakeRequestEnable, null);
    }

    // 實例化服務端使用的ChannelBuilder
    public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
                               SecurityProtocol securityProtocol, AbstractConfig config,
                                CredentialCache credentialCache) {
        return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, null,  true, credentialCache);
    }

    private static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode,
                                         JaasContext.Type contextType, AbstractConfig config,
                                         ListenerName listenerName, String clientSaslMechanism,
                                         boolean saslHandshakeRequestEnable,  CredentialCache credentialCache) {
        .......
        ChannelBuilder channelBuilder;
        // 根據Protocol,選擇不一樣的channelBuidler
        switch (securityProtocol) {
            case SSL:
                // 基於ssl
                requireNonNullMode(mode, securityProtocol);
                channelBuilder = new SslChannelBuilder(mode);
                break;
            case SASL_SSL:
            case SASL_PLAINTEXT:
                // 基於sasl
                requireNonNullMode(mode, securityProtocol);
                JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
                channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol,
                        clientSaslMechanism, saslHandshakeRequestEnable, credentialCache);
                break;
            case PLAINTEXT:
            case TRACE:
                // 沒有任何加密
                channelBuilder = new PlaintextChannelBuilder();
                break;
            default:
                throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
        }

        channelBuilder.configure(configs);
        return channelBuilder;
    }

PlaintextChannelBuilder類

ChannelBuidler是接口,實現其接口的有PlaintextChannelBuilder, SaslChannelBuilder,SslChannelBuilder。其中PlaintextChannelBuilder最爲簡單,因此這裏以它爲例。 ChannelBuidler中最主要的方法是buildChannel,它會建立transportLayer和authenticator,來實例化KafkaChannel。安全

public class PlaintextChannelBuilder implements ChannelBuilder {
    
    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
        try {
            // 實例化TransportLayer
            PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
            // 實例化Authenticator
            Authenticator authenticator = new DefaultAuthenticator();
            authenticator.configure(transportLayer, this.principalBuilder, this.configs);
            // 返回KafkaChannel
            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
        } catch (Exception e) {
            log.warn("Failed to create channel due to ", e);
            throw new KafkaException(e);
        }
    }
}

Selector回顧

先回到Selector的pollSelectionKeys方法,它代表了KafkaChannel方法是什麼時候被調用app

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            KafkaChannel channel = channel(key);
            if (isImmediatelyConnected || key.isConnectable()) {
                    // 調用channel的finishConnect方法,處理鏈接
                    if (channel.finishConnect()) {
                        ......
                    } else
                        continue;
                }

                
                if (channel.isConnected() && !channel.ready())
                    // 而後調用channel的prepare方法,作準備工做(好比ssl鏈接的握手過程)
                    channel.prepare();

               
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    // 當channel準備工做完成,調用channel的read方法,讀取請求
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }
            }
            

        .......
    }

KafkaChannel

KafkaChannel負責鏈接,數據讀取,發送框架

public class KafkaChannel {
    // 首先完成鏈接
    public boolean finishConnect() throws IOException {
        boolean connected = transportLayer.finishConnect();
        if (connected)
            state = ready() ? ChannelState.READY : ChannelState.AUTHENTICATE;
        return connected;
    }

    public boolean isConnected() {
        return transportLayer.isConnected();
    }

    public void prepare() throws IOException {
        //而後握手
        if (!transportLayer.ready())
            transportLayer.handshake();
        // 認證
        if (transportLayer.ready() && !authenticator.complete())
            authenticator.authenticate();
        if (ready())
            // 若是都完成,更新狀態
            state = ChannelState.READY;
    }
    
    public boolean ready() {
        // 當transportLayer和authenticator都完成,channel才認爲狀態準備好了
        return transportLayer.ready() && authenticator.complete();
    }
    
    // channel的讀取請求
    public NetworkReceive read() throws IOException {
        NetworkReceive result = null;

        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id);
        }
        // 讀取請求
        receive(receive);
        if (receive.complete()) {
            receive.payload().rewind();
            result = receive;
            receive = null;
        }
        return result;
    }

    private long receive(NetworkReceive receive) throws IOException {
        // 調用NetworkReceive的readFrom方法
        return receive.readFrom(transportLayer);
    }
    
    // 設置send,可是並不着急發送,等待transportLayer寫事件就緒
    public void setSend(Send send) {
        if (this.send != null)
            // 只能一次發送一個Send
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        // 監聽寫事件
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

    // 若是沒有發送完,返回null。若是發送完,返回send。而且更新this.send爲null
    public Send write() throws IOException {
        Send result = null;
        // 調用send發送
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

    private boolean send(Send send) throws IOException {
        // 調用Send的writreTo方法
        send.writeTo(transportLayer);
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        return send.completed();
    }

NetworkReceive

NetworkReceive表示一個請求。數據格式爲socket

| size | data |ide

size 表示data的長度,爲4個字節的int類型 data則爲請求的數據,長度爲sizeui

public class NetworkReceive implements Receive {
     // channel的id,表示這個請求是屬於哪一個channel
    private final String source;
    // 只有4個字節,讀取請求的size
    private final ByteBuffer size;
    // 請求數據的最大長度
    private final int maxSize;
    // 請求數據
    private ByteBuffer buffer;

    public NetworkReceive(int maxSize, String source) {
        this.source = source;
        // 這裏只分配4個字節
        this.size = ByteBuffer.allocate(4);
        this.buffer = null;
        this.maxSize = maxSize;
    }

    public long readFrom(ScatteringByteChannel channel) throws IOException {
        return readFromReadableChannel(channel);
    }
    
    
    public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
        int read = 0;
        // 檢查是否已經完成讀取size
        if (size.hasRemaining()) {
            // 讀取數據的前4個字節,表示請求數據的大小
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {
                // 若是讀取完成
                size.rewind();
                // 獲取請求數據的大小receiveSize
                int receiveSize = size.getInt();
                // 檢查數據大小的合理
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                // 根據receiveSize,分配buffer
                this.buffer = ByteBuffer.allocate(receiveSize);
            }
        }
        // buffer已經分配了,代表size讀取完
        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }

        return read;
    }
    
    // 返回請求數據
    public ByteBuffer payload() {
        return this.buffer;
    }
    
    // 當size和buffer都讀取玩,則返回true
    public boolean complete() {
        return !size.hasRemaining() && !buffer.hasRemaining();
    }

NetworkSend

NetworkSend只是繼承ByteBufferSend,增長了兩個類方法this

public class NetworkSend extends ByteBufferSend {

    public NetworkSend(String destination, ByteBuffer buffer) {
        //爲buffer添加sizeBuffer,而後初始化父類ByteBufferSend
        super(destination, sizeDelimit(buffer));
    }
    
    // 爲buffer添加一個size的sizeBuffer,組成ByteBuffer數組
    private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
        return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
    }
    
    // 實例化4個字節的ByteBuffer,使用int初始化
    private static ByteBuffer sizeBuffer(int size) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        sizeBuffer.putInt(size);
        sizeBuffer.rewind();
        return sizeBuffer;
    }

}

public class ByteBufferSend implements Send {
    // 發送地址
    private final String destination;
    // 響應數據的總大小
    private final int size;
    protected final ByteBuffer[] buffers;
    // remaining表示buffer中未寫完的數據長度
    private int remaining;
    // 表示是否channel中還有數據未發送
    private boolean pending = false;
    
    public ByteBufferSend(String destination, ByteBuffer... buffers) {
        this.destination = destination;
        this.buffers = buffers;
        // 計算全部buffer的總大小
        for (ByteBuffer buffer : buffers)
            remaining += buffer.remaining();
        this.size = remaining;
    }

    @Override
    public boolean completed() {
         // 數據首先會從buffer中寫入到channel,而後channel再把數據寫入到真實的socket中
        return remaining <= 0 && !pending;
    }

    @Override
    public long writeTo(GatheringByteChannel channel) throws IOException {
        // 寫入到channel中
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        // 更新remaining
        remaining -= written;
        // 檢查pending狀態
        pending = TransportLayers.hasPendingWrites(channel);
        return written;
    }
}

PlaintextTransportLayer

上面NetworkReceive和NetworkSend調用了TransportLayer的方法, channel.write和channel.read。 TransportLayer是接口,PlaintextTransportLayer是實現TransportLayer的類之一,由於它比較簡單,因此這裏以它爲例。加密

public class PlaintextTransportLayer implements TransportLayer {

    private final SelectionKey key;
    private final SocketChannel socketChannel;

    public PlaintextTransportLayer(SelectionKey key) throws IOException {
        this.key = key;
        this.socketChannel = (SocketChannel) key.channel();
    }
    //調用socketChannel的read方法
    public long read(ByteBuffer[] dsts) throws IOException {
        return socketChannel.read(dsts);
    }
    //調用socketChannel的write方法
    public int write(ByteBuffer src) throws IOException {
        return socketChannel.write(src);
    }
}

歸納

類之間的關係。ChannelBuilders實例化ChannelBuilder,ChannelBuilder實例化TransportLayer和Authenticator, 而後實例化ChannelBuidler。ChannelBuidler而後實例化KafkaChannel,KafkaChannel使用NetworkSend表示發送數據,NetworkReceive表示接收數據。

相關文章
相關標籤/搜索