KafkaChannel負責基於socket的鏈接,認證,數據讀取發送。它包含TransportLayer和Authenticator兩個部分。TransportLayer負責數據交互,Authenticator負責安全驗證。java
ChannelBuilders提供了實例化ChannelBuilder的工廠方法,clientChannelBuilder和serverChannelBuilder數組
public class ChannelBuilders { // 這裏構造爲私有方法,代表這個類只提供類方法 private ChannelBuilders() { } // 實例化客戶端使用的ChannelBuilder public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProtocol, JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName, String clientSaslMechanism, boolean saslHandshakeRequestEnable) { return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, clientSaslMechanism, saslHandshakeRequestEnable, null); } // 實例化服務端使用的ChannelBuilder public static ChannelBuilder serverChannelBuilder(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config, CredentialCache credentialCache) { return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, null, true, credentialCache); } private static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName, String clientSaslMechanism, boolean saslHandshakeRequestEnable, CredentialCache credentialCache) { ....... ChannelBuilder channelBuilder; // 根據Protocol,選擇不一樣的channelBuidler switch (securityProtocol) { case SSL: // 基於ssl requireNonNullMode(mode, securityProtocol); channelBuilder = new SslChannelBuilder(mode); break; case SASL_SSL: case SASL_PLAINTEXT: // 基於sasl requireNonNullMode(mode, securityProtocol); JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs); channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable, credentialCache); break; case PLAINTEXT: case TRACE: // 沒有任何加密 channelBuilder = new PlaintextChannelBuilder(); break; default: throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol); } channelBuilder.configure(configs); return channelBuilder; }
ChannelBuidler是接口,實現其接口的有PlaintextChannelBuilder, SaslChannelBuilder,SslChannelBuilder。其中PlaintextChannelBuilder最爲簡單,因此這裏以它爲例。 ChannelBuidler中最主要的方法是buildChannel,它會建立transportLayer和authenticator,來實例化KafkaChannel。安全
public class PlaintextChannelBuilder implements ChannelBuilder { public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException { try { // 實例化TransportLayer PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key); // 實例化Authenticator Authenticator authenticator = new DefaultAuthenticator(); authenticator.configure(transportLayer, this.principalBuilder, this.configs); // 返回KafkaChannel return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize); } catch (Exception e) { log.warn("Failed to create channel due to ", e); throw new KafkaException(e); } } }
先回到Selector的pollSelectionKeys方法,它代表了KafkaChannel方法是什麼時候被調用app
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); KafkaChannel channel = channel(key); if (isImmediatelyConnected || key.isConnectable()) { // 調用channel的finishConnect方法,處理鏈接 if (channel.finishConnect()) { ...... } else continue; } if (channel.isConnected() && !channel.ready()) // 而後調用channel的prepare方法,作準備工做(好比ssl鏈接的握手過程) channel.prepare(); if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { // 當channel準備工做完成,調用channel的read方法,讀取請求 NetworkReceive networkReceive; while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); } } ....... }
KafkaChannel負責鏈接,數據讀取,發送框架
public class KafkaChannel { // 首先完成鏈接 public boolean finishConnect() throws IOException { boolean connected = transportLayer.finishConnect(); if (connected) state = ready() ? ChannelState.READY : ChannelState.AUTHENTICATE; return connected; } public boolean isConnected() { return transportLayer.isConnected(); } public void prepare() throws IOException { //而後握手 if (!transportLayer.ready()) transportLayer.handshake(); // 認證 if (transportLayer.ready() && !authenticator.complete()) authenticator.authenticate(); if (ready()) // 若是都完成,更新狀態 state = ChannelState.READY; } public boolean ready() { // 當transportLayer和authenticator都完成,channel才認爲狀態準備好了 return transportLayer.ready() && authenticator.complete(); } // channel的讀取請求 public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) { receive = new NetworkReceive(maxReceiveSize, id); } // 讀取請求 receive(receive); if (receive.complete()) { receive.payload().rewind(); result = receive; receive = null; } return result; } private long receive(NetworkReceive receive) throws IOException { // 調用NetworkReceive的readFrom方法 return receive.readFrom(transportLayer); } // 設置send,可是並不着急發送,等待transportLayer寫事件就緒 public void setSend(Send send) { if (this.send != null) // 只能一次發送一個Send throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); this.send = send; // 監聽寫事件 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); } // 若是沒有發送完,返回null。若是發送完,返回send。而且更新this.send爲null public Send write() throws IOException { Send result = null; // 調用send發送 if (send != null && send(send)) { result = send; send = null; } return result; } private boolean send(Send send) throws IOException { // 調用Send的writreTo方法 send.writeTo(transportLayer); if (send.completed()) transportLayer.removeInterestOps(SelectionKey.OP_WRITE); return send.completed(); }
NetworkReceive表示一個請求。數據格式爲socket
| size | data |ide
size 表示data的長度,爲4個字節的int類型 data則爲請求的數據,長度爲sizeui
public class NetworkReceive implements Receive { // channel的id,表示這個請求是屬於哪一個channel private final String source; // 只有4個字節,讀取請求的size private final ByteBuffer size; // 請求數據的最大長度 private final int maxSize; // 請求數據 private ByteBuffer buffer; public NetworkReceive(int maxSize, String source) { this.source = source; // 這裏只分配4個字節 this.size = ByteBuffer.allocate(4); this.buffer = null; this.maxSize = maxSize; } public long readFrom(ScatteringByteChannel channel) throws IOException { return readFromReadableChannel(channel); } public long readFromReadableChannel(ReadableByteChannel channel) throws IOException { int read = 0; // 檢查是否已經完成讀取size if (size.hasRemaining()) { // 讀取數據的前4個字節,表示請求數據的大小 int bytesRead = channel.read(size); if (bytesRead < 0) throw new EOFException(); read += bytesRead; if (!size.hasRemaining()) { // 若是讀取完成 size.rewind(); // 獲取請求數據的大小receiveSize int receiveSize = size.getInt(); // 檢查數據大小的合理 if (receiveSize < 0) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); if (maxSize != UNLIMITED && receiveSize > maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); // 根據receiveSize,分配buffer this.buffer = ByteBuffer.allocate(receiveSize); } } // buffer已經分配了,代表size讀取完 if (buffer != null) { int bytesRead = channel.read(buffer); if (bytesRead < 0) throw new EOFException(); read += bytesRead; } return read; } // 返回請求數據 public ByteBuffer payload() { return this.buffer; } // 當size和buffer都讀取玩,則返回true public boolean complete() { return !size.hasRemaining() && !buffer.hasRemaining(); }
NetworkSend只是繼承ByteBufferSend,增長了兩個類方法this
public class NetworkSend extends ByteBufferSend { public NetworkSend(String destination, ByteBuffer buffer) { //爲buffer添加sizeBuffer,而後初始化父類ByteBufferSend super(destination, sizeDelimit(buffer)); } // 爲buffer添加一個size的sizeBuffer,組成ByteBuffer數組 private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) { return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer}; } // 實例化4個字節的ByteBuffer,使用int初始化 private static ByteBuffer sizeBuffer(int size) { ByteBuffer sizeBuffer = ByteBuffer.allocate(4); sizeBuffer.putInt(size); sizeBuffer.rewind(); return sizeBuffer; } } public class ByteBufferSend implements Send { // 發送地址 private final String destination; // 響應數據的總大小 private final int size; protected final ByteBuffer[] buffers; // remaining表示buffer中未寫完的數據長度 private int remaining; // 表示是否channel中還有數據未發送 private boolean pending = false; public ByteBufferSend(String destination, ByteBuffer... buffers) { this.destination = destination; this.buffers = buffers; // 計算全部buffer的總大小 for (ByteBuffer buffer : buffers) remaining += buffer.remaining(); this.size = remaining; } @Override public boolean completed() { // 數據首先會從buffer中寫入到channel,而後channel再把數據寫入到真實的socket中 return remaining <= 0 && !pending; } @Override public long writeTo(GatheringByteChannel channel) throws IOException { // 寫入到channel中 long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); // 更新remaining remaining -= written; // 檢查pending狀態 pending = TransportLayers.hasPendingWrites(channel); return written; } }
上面NetworkReceive和NetworkSend調用了TransportLayer的方法, channel.write和channel.read。 TransportLayer是接口,PlaintextTransportLayer是實現TransportLayer的類之一,由於它比較簡單,因此這裏以它爲例。加密
public class PlaintextTransportLayer implements TransportLayer { private final SelectionKey key; private final SocketChannel socketChannel; public PlaintextTransportLayer(SelectionKey key) throws IOException { this.key = key; this.socketChannel = (SocketChannel) key.channel(); } //調用socketChannel的read方法 public long read(ByteBuffer[] dsts) throws IOException { return socketChannel.read(dsts); } //調用socketChannel的write方法 public int write(ByteBuffer src) throws IOException { return socketChannel.write(src); } }
類之間的關係。ChannelBuilders實例化ChannelBuilder,ChannelBuilder實例化TransportLayer和Authenticator, 而後實例化ChannelBuidler。ChannelBuidler而後實例化KafkaChannel,KafkaChannel使用NetworkSend表示發送數據,NetworkReceive表示接收數據。