Selector是對java nio中Selector的封裝,額外提供了對kafka請求或者響應的處理方法java
public interface Selectable { public void close(String id); // 發送數據 public void send(Send send); // 等待timeout時間,而後處理就緒的鏈接 public void poll(long timeout) throws IOException; // 返回已經成功發送的數據 public List<Send> completedSends(); // 返回已經成功接收的數據 public List<NetworkReceive> completedReceives(); // 返回關閉的鏈接,ChannelState表示鏈接關閉前最後的狀態,在KafkaChannel會有介紹 public Map<String, ChannelState> disconnected(); // 返回已經成功的鏈接 public List<String> connected(); // 取消監聽讀事件 public void mute(String id); // 開啓監聽讀事件 public void unmute(String id); // 鏈接是否已經就緒(好比ssl握手過程是否完成) public boolean isChannelReady(String id); }
public class Selector implements Selectable, AutoCloseable { // nioSelector負責鏈接的事件監聽器 private final java.nio.channels.Selector nioSelector; // channels集合 private final Map<String, KafkaChannel> channels; public void register(String id, SocketChannel socketChannel) throws ClosedChannelException { // 監聽鏈接的讀事件 SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ); // 構造KafkaChannel,而且綁定到該鏈接。KafkaChannel負責請求的解析和響應的發送 KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); key.attach(channel); // 更新channels this.channels.put(id, channel); } }
public void poll(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("timeout should be >= 0"); // 清除 clear(); // hasStagedReceives返回是否還有receive處理。若是有,則設置timeout爲0,表示不等待。 if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty()) timeout = 0; // 等待socket事件就緒 int readyKeys = select(timeout); if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { // 處理就緒的鏈接 pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); // 處理immediatelyConnectedKeys的鏈接 pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); } // 更新請求 addToCompletedReceives(); // we use the time at the end of select to ensure that we don't close any connections that // have just been processed in pollSelectionKeys maybeCloseOldestConnection(endSelect); }
public class Selector implements Selectable, AutoCloseable { // 完成發送的列表 private final List<Send> completedSends; // 已經成功接收完的請求列表 private final List<NetworkReceive> completedReceives; // 關閉的鏈接列表 private final Map<String, ChannelState> disconnected; // 已經創建的鏈接列表 private final List<String> connected; // 準備關閉的鏈接列表,當這個鏈接的請求處理完後,就會移除 private final Map<String, KafkaChannel> closingChannels; // 發送send失敗的channel列表。用來更新disconnected的狀態 private final List<String> failedSends; private void clear() { this.completedSends.clear(); this.completedReceives.clear(); this.connected.clear(); this.disconnected.clear(); // 遍歷準備關閉的鏈接,當異常關閉的鏈接,其請求已所有讀取完,就關閉 for (Iterator<Map.Entry<String, KafkaChannel>> it = closingChannels.entrySet().iterator(); it.hasNext(); ) { KafkaChannel channel =; Deque<NetworkReceive> deque = this.stagedReceives.get(channel); // 是否發送send失敗 boolean sendFailed = failedSends.remove(; if (deque == null || deque.isEmpty() || sendFailed) { // 關閉鏈接,而且從closingChannels刪除 doClose(channel, true); it.remove(); } } // 遍歷發送失敗的鏈接 for (String channel : this.failedSends) { KafkaChannel failedChannel = closingChannels.get(channel); if (failedChannel != null) // 更新channel的state failedChannel.state(ChannelState.FAILED_SEND); // 添加到disconnected列表裏 this.disconnected.put(channel, ChannelState.FAILED_SEND); } this.failedSends.clear(); } }
public class Selector implements Selectable, AutoCloseable { private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { // 遍歷Keys Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key =; iterator.remove(); // 獲取與key綁定的KafkaChannel KafkaChannel channel = channel(key); try { // connect事件 if (isImmediatelyConnected || key.isConnectable()) { if (channel.finishConnect()) { // 必須等到KafkaChannel完成鏈接,才加入到connected列表 this.connected.add(; } else continue; } // 若是channel尚未ready if (channel.isConnected() && !channel.ready()) channel.prepare(); //讀事件 if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { // channel已經準備好,而且尚未沒有讀取完的Receive NetworkReceive networkReceive; while ((networkReceive = != null) // channel.read若是返回null,表示一條請求的數據還沒接收完整,須要繼續監聽讀事件 // 加入到未處理的Receive隊列 addToStagedReceives(channel, networkReceive); } // 寫事件 if (channel.ready() && key.isWritable()) { // channel.write若是發送完,將會send。如沒有,則會返回null Send send = channel.write(); if (send != null) { // 更新completedSends列表 this.completedSends.add(send); } } if (!key.isValid()) //若是鏈接失效,則關閉鏈接 close(channel, true); } catch (Exception e) { // 關閉鏈接 close(channel, true); } finally { ....... } } }
private void addToCompletedReceives() { if (!this.stagedReceives.isEmpty()) { // 遍歷stagedReceives map, Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator(); while (iter.hasNext()) { Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry =; KafkaChannel channel = entry.getKey(); if (!channel.isMute()) { // 若是channel還在監聽讀事件,則將Receive添加到completedReceives隊列 Deque<NetworkReceive> deque = entry.getValue(); addToCompletedReceives(channel, deque); if (deque.isEmpty()) iter.remove(); } } } } // 從stagedDeque取出一個Receive,添加到completedReceives裏面 private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) { NetworkReceive networkReceive = stagedDeque.poll(); this.completedReceives.add(networkReceive); }
private void close(KafkaChannel channel, boolean processOutstanding) { // 關閉鏈接 channel.disconnect(); // 獲取channel有哪些未處理的Receive Deque<NetworkReceive> deque = this.stagedReceives.get(channel); if (processOutstanding && deque != null && !deque.isEmpty()) { // processOutstanding表示是否須要處理未完成的請求 if (!channel.isMute()) { // 若是鏈接還在監聽讀事件 addToCompletedReceives(channel, deque); if (deque.isEmpty()) this.stagedReceives.remove(channel); } // 添加到closingChannels列表,表示即將關閉 closingChannels.put(, channel); } else // 不然關閉channel doClose(channel, processOutstanding); this.channels.remove(; }
send 方法事件
public void send(Send send) { String connectionId = send.destination(); if (closingChannels.containsKey(connectionId)) this.failedSends.add(connectionId); else { KafkaChannel channel = channelOrFail(connectionId, false); try { // 調用channel setSend方法 channel.setSend(send); } catch (CancelledKeyException e) { this.failedSends.add(connectionId); close(channel, false); } } }
Processor會從Selector獲取 請求列表completedReceives,發送列表completedSends, disconnected 鏈接關閉列表.