歡迎你們關注 github.com/hsfxuebao/j… ,但願對你們有所幫助,要是以爲能夠的話麻煩給點一下Star哈java
在上一篇文章中,已經介紹了 Producer 的發送模型,Producer dosend()
方法中的第一步,就是獲取相關的 topic 的 metadata,但在上篇中並無深刻展開,由於這部分的內容比較多,因此本文單獨一篇文章進行介紹,本文主要來說述如下三個問題:node
Metadata 信息的內容能夠經過源碼看明白:git
// 這個類被 client 線程和後臺 sender 所共享,它只保存了全部 topic 的部分數據,當咱們請求一個它上面沒有的 topic meta 時,它會經過發送 metadata update 來更新 meta 信息,
// 若是 topic meta 過時策略是容許的,那麼任何 topic 過時的話都會被從集合中移除,
// 可是 consumer 是不容許 topic 過時的由於它明確地知道它須要管理哪些 topic
public final class Metadata {
private static final Logger log = LoggerFactory.getLogger(Metadata.class);
public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
private final long refreshBackoffMs; // metadata 更新失敗時,爲避免頻繁更新 meta,最小的間隔時間,默認 100ms
private final long metadataExpireMs; // metadata 的過時時間, 默認 60,000ms
private int version; // 每更新成功1次,version自增1,主要是用於判斷 metadata 是否更新
private long lastRefreshMs; // 最近一次更新時的時間(包含更新失敗的狀況)
private long lastSuccessfulRefreshMs; // 最近一次成功更新的時間(若是每次都成功的話,與前面的值相等, 不然,lastSuccessulRefreshMs < lastRefreshMs)
private Cluster cluster; // 集羣中一些 topic 的信息
private boolean needUpdate; // 是都須要更新 metadata
/* Topics with expiry time */
private final Map<String, Long> topics; // topic 與其過時時間的對應關係
private final List<Listener> listeners; // 事件監控者
private final ClusterResourceListeners clusterResourceListeners; //當接收到 metadata 更新時, ClusterResourceListeners的列表
private boolean needMetadataForAllTopics; // 是否強制更新全部的 metadata
private final boolean topicExpiryEnabled; // 默認爲 true, Producer 會定時移除過時的 topic,consumer 則不會移除
}
複製代碼
關於 topic 的詳細信息(leader 所在節點、replica 所在節點、isr 列表)都是在 Cluster
實例中保存的。github
// 並非一個全集,metadata的主要組成部分
public final class Cluster {
// 從命名直接就看出了各個變量的用途
private final boolean isBootstrapConfigured;
private final List<Node> nodes; // node 列表
private final Set<String> unauthorizedTopics; // 未認證的 topic 列表
private final Set<String> internalTopics; // 內置的 topic 列表
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; // partition 的詳細信息
private final Map<String, List<PartitionInfo>> partitionsByTopic; // topic 與 partition 的對應關係
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic; // 可用(leader 不爲 null)的 topic 與 partition 的對應關係
private final Map<Integer, List<PartitionInfo>> partitionsByNode; // node 與 partition 的對應關係
private final Map<Integer, Node> nodesById; // node 與 id 的對應關係
private final ClusterResource clusterResource;
}
// org.apache.kafka.common.PartitionInfo
// topic-partition: 包含 topic、partition、leader、replicas、isr
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
}
複製代碼
Cluster
實例主要是保存:apache
node
的對應關係;PartitionInfo
)的對應關係;node
與 partition (PartitionInfo
)的對應關係。Producer 在調用 dosend()
方法時,第一步就是經過 waitOnMetadata
方法獲取該 topic 的 metadata 信息.api
// 等待 metadata 的更新
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
metadata.add(topic);// 在 metadata 中添加 topic 後,若是 metadata 中沒有這個 topic 的 meta,那麼 metadata 的更新標誌設置爲了 true
Cluster cluster = metadata.fetch();
Integer partitionsCount = cluster.partitionCountForTopic(topic);// 若是 topic 已經存在 meta 中,則返回該 topic 的 partition 數,不然返回 null
// 當前 metadata 中若是已經有這個 topic 的 meta 的話,就直接返回
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
// 發送 metadata 請求,直到獲取了這個 topic 的 metadata 或者請求超時
do {
log.trace("Requesting metadata update for topic {}.", topic);
int version = metadata.requestUpdate();// 返回當前版本號,初始值爲0,每次更新時會自增,並將 needUpdate 設置爲 true
sender.wakeup();// 喚起 sender,發送 metadata 請求
try {
metadata.awaitUpdate(version, remainingWaitMs);// 等待 metadata 的更新
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");// 超時
if (cluster.unauthorizedTopics().contains(topic))// 認證失敗,對當前 topic 沒有 Write 權限
throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);// 不停循環,直到 partitionsCount 不爲 null(即直到 metadata 中已經包含了這個 topic 的相關信息)
if (partition != null && partition >= partitionsCount) {
throw new KafkaException(
String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
}
return new ClusterAndWaitTime(cluster, elapsed);
}
複製代碼
若是 metadata 中不存在這個 topic 的 metadata,那麼就請求更新 metadata,若是 metadata 沒有更新的話,方法就一直處在 do ... while
的循環之中,在循環之中,主要作如下操做:markdown
metadata.requestUpdate()
將 metadata 的 needUpdate
變量設置爲 true(強制更新),並返回當前的版本號(version),經過版本號來判斷 metadata 是否完成更新;數據結構
sender.wakeup()
喚醒 sender 線程,sender 線程又會去喚醒 NetworkClient
線程,NetworkClient
線程進行一些實際的操做(後面詳細介紹);app
metadata.awaitUpdate(version, remainingWaitMs)
等待 metadata 的更新。oop
// 更新 metadata 信息(根據當前 version 值來判斷) public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { if (maxWaitMs < 0) { throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); } long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) {// 不斷循環,直到 metadata 更新成功,version 自增 if (remainingWaitMs != 0) wait(remainingWaitMs);// 阻塞線程,等待 metadata 的更新 long elapsed = System.currentTimeMillis() - begin; if (elapsed >= maxWaitMs)// timeout throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); remainingWaitMs = maxWaitMs - elapsed; } }
在 Metadata.awaitUpdate()
方法中,線程會阻塞在 while
循環中,直到 metadata 更新成功或者 timeout。
從前面能夠看出,此時 Producer 線程會阻塞在兩個 while
循環中,直到 metadata 信息更新,那麼 metadata 是如何更新的呢?若是有印象的話,前面應該已經介紹過了,主要是經過 sender.wakeup()
來喚醒 sender 線程,間接喚醒 NetworkClient 線程,NetworkClient 線程來負責發送 Metadata 請求,並處理 Server 端的響應。
在 Kafka 源碼分析之 Producer 發送模型(一) 中介紹 Producer 發送模型時,在第五步 sender
線程會調用 NetworkClient.poll()
方法進行實際的操做,其源碼以下:
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);// 判斷是否須要更新 meta,若是須要就更新(請求更新 metadata 的地方)
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
handleCompletedSends(responses, updatedNow);// 經過 selector 中獲取 Server 端的 response
handleCompletedReceives(responses, updatedNow);// 在返回的 handler 中,會處理 metadata 的更新
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
try {
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
return responses;
}
複製代碼
在這個方法中,主要會如下操做:
metadataUpdater.maybeUpdate(now)
:判斷是否須要更新 Metadata,若是須要更新的話,先與 Broker 創建鏈接,而後發送更新 metadata 的請求;handleCompletedReceives(responses, updatedNow)
方法,它會處理 Server 端返回的 Metadata 結果。先看一下 metadataUpdater.maybeUpdate()
的具體實現:
public long maybeUpdate(long now) {
// should we update our metadata?
// metadata 是否應該更新
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);// metadata 下次更新的時間(須要判斷是強制更新仍是 metadata 過時更新,前者是立馬更新,後者是計算 metadata 的過時時間)
// 若是一條 metadata 的 fetch 請求還未從 server 收到恢復,那麼時間設置爲 waitForMetadataFetch(默認30s)
long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;
long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
if (metadataTimeout > 0) {// 時間未到時,直接返回下次應該更新的時間
return metadataTimeout;
}
Node node = leastLoadedNode(now);// 選擇一個鏈接數最小的節點
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
return reconnectBackoffMs;
}
return maybeUpdate(now, node); // 能夠發送 metadata 請求的話,就發送 metadata 請求
}
/**
* Add a metadata request to the list of sends if we can make one
*/
// 判斷是否能夠發送請求,能夠的話將 metadata 請求加入到發送列表中
private long maybeUpdate(long now, Node node) {
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId)) {// 通道已經 ready 而且支持發送更多的請求
this.metadataFetchInProgress = true; // 準備開始發送數據,將 metadataFetchInProgress 置爲 true
MetadataRequest.Builder metadataRequest; // 建立 metadata 請求
if (metadata.needMetadataForAllTopics())// 強制更新全部 topic 的 metadata(雖然默認不會更新全部 topic 的 metadata 信息,可是每一個 Broker 會保存全部 topic 的 meta 信息)
metadataRequest = MetadataRequest.Builder.allTopics();
else // 只更新 metadata 中的 topics 列表(列表中的 topics 由 metadata.add() 獲得)
metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()));
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);/ 發送 metadata 請求
return requestTimeoutMs;
}
// If there's any connection establishment underway, wait until it completes. This prevents
// the client from unnecessarily connecting to additional nodes while a previous connection
// attempt has not been completed.
if (isAnyNodeConnecting()) {// 若是 client 正在與任何一個 node 的鏈接狀態是 connecting,那麼就進行等待
// Strictly the timeout we should return here is "connect timeout", but as we don't
// have such application level configuration, using reconnect backoff instead.
return reconnectBackoffMs;
}
if (connectionStates.canConnect(nodeConnectionId, now)) {// 若是沒有鏈接這個 node,那就初始化鏈接
// we don't have a connection to this node right now, make one
log.debug("Initialize connection to node {} for sending metadata request", node.id());
initiateConnect(node, now);// 初始化鏈接
return reconnectBackoffMs;
}
return Long.MAX_VALUE;
}
// 發送 Metadata 請求
private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
String nodeConnectionId, long now) {
ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);// 建立 metadata 請求
doSend(clientRequest, true, now);
}
複製代碼
因此,每次 Producer 請求更新 metadata 時,會有如下幾種狀況:
而 KafkaProducer 線程以前是一直阻塞在兩個 while
循環中,直到 metadata 更新
poll()
方法時,初始化與 node 的鏈接;poll()
方法時,發送 Metadata
請求;poll()
方法時,獲取 metadataResponse
,並更新 metadata。通過上述 sender 線程三次調用 poll()
方法,所請求的 metadata 信息纔會獲得更新,此時 Producer 線程也不會再阻塞,開始發送消息。
NetworkClient
接收到 Server 端對 Metadata 請求的響應後,更新 Metadata 信息。
// 處理任何已經完成的接收響應
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
InFlightRequest req = inFlightRequests.completeNext(source);
AbstractResponse body = parseResponse(receive.payload(), req.header);
log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body);
if (req.isInternalRequest && body instanceof MetadataResponse)// 若是是 meta 響應
metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body); // 若是是其餘響應
else
responses.add(req.completed(body, now));
}
}
// 處理 Server 端對 Metadata 請求處理後的 response
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
this.metadataFetchInProgress = false;
Cluster cluster = response.cluster();
// check if any topics metadata failed to get updated
Map<String, Errors> errors = response.errors();
if (!errors.isEmpty())
log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
// created which means we will get errors and no nodes until it exists
if (cluster.nodes().size() > 0) {
this.metadata.update(cluster, now);// 更新 meta 信息
} else {// 若是 metadata 中 node 信息無效,則不更新 metadata 信息
log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
this.metadata.failedUpdate(now);
}
}
複製代碼
Producer Metadata 的更新策略
Metadata 會在下面兩種狀況下進行更新
lastRefreshMs
, lastSuccessfulRefreshMs
這2個字段來實現;Metadata.requestUpdate()
將 needUpdate
置成了 true 來強制更新。在 NetworkClient 的 poll()
方法調用時,就會去檢查這兩種更新機制,只要達到其中一種,就行觸發更新操做。
Metadata 的強制更新會在如下幾種狀況下進行:
initConnect
方法調用時,初始化鏈接;poll()
方法中對 handleDisconnections()
方法調用來處理鏈接斷開的狀況,這時會觸發強制更新;poll()
方法中對 handleTimedOutRequests()
來處理請求超時時;handleProduceResponse
),若是返回關於 Metadata 過時的異常,好比:沒有 topic-partition 的相關 meta 或者 client 沒有權限獲取其 metadata。強制更新主要是用於處理各類異常狀況。
轉自:Kafka 源碼分析系列