##NioSelector和KafkaSelector有什麼區別?java
先說結論,KafkaSelector(org.apache.kafka.common.network.selector)是對NioSelector(java.nio.channels.Selector)的進一步封裝。回想一下NioSelector,它參與了IO中的哪些過程?node
一、建立一個通道,並將通道註冊到NioSelector上,咱們能夠獲得一個SelectionKey 二、輪詢NioSelector中的ready集合,拿到對應的SelectionKey,並根據這個SelectionKey所關注的事件去執行對應的操做apache
實際上,KafkaSelector也是在調用NioSelector去執行這些操做,待補充……網絡
##1、建立鏈接socket
KafkaSelector建立鏈接,和普通的NioSelector並無什麼不一樣,首先建立一個通道,並將其設置爲非阻塞式的長鏈接,設置完畢後,執行鏈接操做。ui
SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false);// 非阻塞模式 Socket socket = socketChannel.socket(); socket.setKeepAlive(true);// 設置爲長鏈接 if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) { socket.setSendBufferSize(sendBufferSize);// 設置SO_SNDBUF 大小 } if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) { socket.setReceiveBufferSize(receiveBufferSize);// 設置 SO_RCVBUF 大小 } socket.setTcpNoDelay(true); boolean connected; try { connected = socketChannel.connect(address);// 由於是非阻塞模式,因此方法可能會在鏈接正式鏈接以前返回 } catch (UnresolvedAddressException e) { socketChannel.close(); throw new IOException("Can't resolve address: " + address, e); } catch (IOException e) { socketChannel.close(); throw e; }
建立完通道後,將其註冊到NioSelector上,並關注OP_CONNECT,再以節點Id,SelectionKey來建立KafkaChannel,這裏先不詳細說明KafkaChannel,它是對通道的進一步封裝。在建立完KafkaChannel後,將KafkaChannel與SelectionKey、節點ID作進一步綁定。this
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 將當前這個socketChannel註冊到nioSelector上,並關注OP_CONNECT事件 KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 建立KafkaChannel key.attach(channel);// 將channel綁定到key上 this.channels.put(id, channel);// 將 nodeId 和 Channel綁定
這樣有一個好處,首先KafkaChannel中包含了節點ID與SelectionKey,而咱們也能夠根據節點ID來拿到KafkaChannel,一樣能夠根據SelectionKey來拿到KafkaChannel,這就意味着,咱們只要拿到了KafkaChannel、SelectionKey、節點ID中的任意一個,均可以經過這些引用關係拿到彼此,從而進行相關操做。.net
##2、預發送 實際上就是將要發送的ByteBuffer扔進KafkaChannel,此時並未進行IO操做,這裏的Send對象,實際上就是對ByteBuffer的進一步封裝,它主要包含了將要發往的節點ID、ByteBuffer大小、是否發送完畢等信息。咱們這裏根據節點ID,從咱們剛纔的channels中,取出KafkaChannel。code
public void send(Send send) { // 看看send要發的這個nodeId在不在 KafkaChannel channel = channelOrFail(send.destination()); try { // 把數據扔進KafkaChannel中(只能放一個,放多個會報錯),並關注write事件 channel.setSend(send); } catch (CancelledKeyException e) { // 失敗了加一條node_id的失敗記錄 this.failedSends.add(send.destination()); close(channel); } }
##3、進行IO操做 來到了咱們比較熟悉的輪詢環節,從NioSelector中取出全部SelectionKey進行輪詢。對象
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { pollSelectionKeys(this.nioSelector.selectedKeys(), false);// 處理I/O的核心方法 pollSelectionKeys(immediatelyConnectedKeys, true); } private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) { Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); // 建立鏈接時(connect)將kafkaChannel註冊到key上,就是爲了在這裏獲取 KafkaChannel channel = channel(key); ……………………
#####一、判斷一下key 鏈接好了沒有,由於咱們用的是非阻塞鏈接,因此到了輪詢階段,尚未完成鏈接是正常的。
if (isImmediatelyConnected || key.isConnectable()) { // finishConnect方法會先檢測socketChannel是否創建完成,創建後,會取消對OP_CONNECT事件關注,//TODO 並開始關注OP_READ事件 if (channel.finishConnect()) { this.connected.add(channel.id());// 將當前channel id 添加到已鏈接的集合中 this.sensors.connectionCreated.record(); } else { continue;// 表明鏈接未完成,則跳過對此Channel的後續處理 } }
#####二、身份驗證(略過) #####三、判斷KafkaChannel有沒有準備好,有沒有關注OP_READ,能不能讀之類的,並進行讀操做。 這裏有一個判斷,就是判斷當前的KafkaChannel是否是在StagedReceives裏。咱們日後看看,在從網絡上讀取數據時,咱們會將KafkaChannel扔進StagedReceives裏,也就是說,若是這個KafkaChannel已經在StagedReceives裏了,那麼表明它已經在讀數據了。
if (channel.ready() // 鏈接的三次握手完成,而且 todo 權限驗證經過 && key.isReadable() // key是否關注了read事件 && !hasStagedReceive(channel)) {// todo 這個通道不能是正在讀數據的,由於在讀的時候,會把這個channel扔進stagedReceives裏面 NetworkReceive networkReceive; /** * 實際上這裏就是分屢次去一個channel取數據,直到取完,並將其保存在key:channel value:new ArrayDeque<NetworkReceive> 中 */ while ((networkReceive = channel.read()) != null) { addToStagedReceives(channel, networkReceive); } }
#####四、判斷KafkaChannel有沒有準備好,有沒有關注OP_WRITE,並進行寫操做
if (channel.ready() && key.isWritable()) { Send send = channel.write(); // 這裏會將KafkaChannel的send字段發送出去, // 若是未完成發送,或者沒發完,則返回null // 發送成功則返回send對象 if (send != null) { this.completedSends.add(send);// 添加到completedSends集合 this.sensors.recordBytesSent(channel.id(), send.size()); } }
##4、關閉空閒鏈接 在每一次IO操做完畢後,KafkaSelector都會調用一個方法,去關閉掉那些沒怎麼用的鏈接,實際上它就是一個基於時間戳的斷連機制。 KafkaSelector中維護了一個哈希表,
LinkedHashMap<String, Long> lruConnections (new LinkedHashMap<>(16, .75F, true);
在每次進行IO操做時,將Key:節點ID,Value:當前時間戳扔進哈希表裏面,在IO操做進行完畢時,檢查一下,最大的那個節點,它的最後一次IO時間+connectionsMaxIdleNanos(建立KafkaSelector時指定),是否超過了當前的時間。 若是是,這個鏈接就會被關掉。
好比說connectionsMaxIdleNanos被指定成了1分鐘,那麼若是這個有序哈希表的最後一個節點的時間是一分鐘以前,那麼這個節點ID的通道將會被關掉。
private void maybeCloseOldestConnection() { if (currentTimeNanos > nextIdleCloseCheckTime) { if (lruConnections.isEmpty()) { nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; } else { Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet() .iterator() .next(); Long connectionLastActiveTime = oldestConnectionEntry.getValue(); nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; if (currentTimeNanos > nextIdleCloseCheckTime) { String connectionId = oldestConnectionEntry.getKey(); if (log.isTraceEnabled()) { log.trace("About to close the idle connection from " + connectionId + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis"); } disconnected.add(connectionId); close(connectionId); } } } }
參考: 《Apache Kafka 源碼剖析》 - 徐郡明著 Apache Kafka 源碼 0.10.0.1