AdminClient 類提供了建立、刪除 topic 的 api。apache
在項目中建立了一個 AdminClient 對象,每次建立 topic 時,調用api
org.apache.kafka.clients.admin.AdminClient#createTopics
若是長時間不使用這個對象,客戶端與 broker 之間的鏈接會被關掉,相關的參數:this
connections.max.idle.ms
這個最大空閒參數在 broker 和 客戶端均可以配置,即 broker 和客戶端都會關閉空閒過久的鏈接。spa
org.apache.kafka.common.network.Selector#maybeCloseOldestConnection.net
private void maybeCloseOldestConnection(long currentTimeNanos) { if (idleExpiryManager == null) return; Map.Entry<String, Long> expiredConnection = idleExpiryManager.pollExpiredConnection(currentTimeNanos); if (expiredConnection != null) { String connectionId = expiredConnection.getKey(); KafkaChannel channel = this.channels.get(connectionId); if (channel != null) { if (log.isTraceEnabled()) log.trace("About to close the idle connection from {} due to being idle for {} millis", connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000); channel.state(ChannelState.EXPIRED); close(channel, CloseMode.GRACEFUL); } } }
org.apache.kafka.common.network.Selector.IdleExpiryManager#pollExpiredConnectioncode
lruConnections 是 LinkedHashMap 類型,能夠按照插入和訪問順序進行排序,這裏是按訪問順序進行排序,訪問過的順序放到雙向鏈表的結尾。對象
public Map.Entry<String, Long> pollExpiredConnection(long currentTimeNanos) { if (currentTimeNanos <= nextIdleCloseCheckTime) return null; if (lruConnections.isEmpty()) { nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; return null; } Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next(); Long connectionLastActiveTime = oldestConnectionEntry.getValue(); nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; if (currentTimeNanos > nextIdleCloseCheckTime) return oldestConnectionEntry; else return null; }