As mentioned before, Kafka implements its own Selector to manage the lowest-level I/O: the selector drives the various events in a polling loop and notifies the network channels to perform their reads and writes.
The higher-level NetworkClient manages client requests to multiple nodes and drives the selector's work. So what exactly does it do?
[Figure: the NetworkClient workflow; the numbered steps map to the sections of this article: (1), (8) -> 3.3 ClientRequest listeners, adapters, and listener chains; (2), (3) -> 2. Managing the KafkaSelector; (4)-(7) -> 4. Follow-up handling of ClientRequest.]
/* the state of each node's connection */
private final ClusterConnectionStates connectionStates;
NetworkClient holds a connectionStates field that tracks the connection state of every node. Its implementation is fairly simple: it keeps a reconnect backoff time plus a Map from node id to connection state.
The backoff time prevents the client from hammering the same node with repeated connection attempts.
final class ClusterConnectionStates {
    private final long reconnectBackoffMs;
    private final Map<String, NodeConnectionState> nodeState;

    /**
     * The state of our connection to a node
     */
    private static class NodeConnectionState {
        ConnectionState state;
        long lastConnectAttemptMs;

        public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttempt;
        }

        public String toString() {
            return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
        }
    }
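Building on those two fields, the backoff gate is easy to picture. Below is a purely illustrative sketch, written as if it were an extra helper inside ClusterConnectionStates; the method name isEligibleForConnect is made up (Kafka's own class exposes an equivalent check), but the condition it evaluates is exactly what the prose above describes.

// Illustrative sketch only (hypothetical helper, not the real Kafka method):
// a node may be (re)connected to only if it is currently disconnected and the
// reconnect backoff window since its last attempt has already elapsed.
public boolean isEligibleForConnect(String nodeId, long now) {
    NodeConnectionState nodeConnectionState = nodeState.get(nodeId);
    if (nodeConnectionState == null)
        return true;                                   // never attempted before
    boolean disconnected = nodeConnectionState.state == ConnectionState.DISCONNECTED;
    boolean backoffElapsed = now - nodeConnectionState.lastConnectAttemptMs >= reconnectBackoffMs;
    return disconnected && backoffElapsed;             // otherwise keep backing off
}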
Opening a connection is also straightforward: the client simply asks the selector to connect. If the attempt fails the node is marked as disconnected; if it succeeds the node is marked as connected.
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        this.connectionStates.connecting(nodeConnectionId, now);
        selector.connect(nodeConnectionId,
                         new InetSocketAddress(node.host(), node.port()),
                         this.socketSendBuffer,
                         this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnected(nodeConnectionId, now);
        /* maybe the problem is our metadata, update it */
        metadataUpdater.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}
Closing a connection is even more blunt; the code is simple enough that it needs no further comment.
@Override
public void close(String nodeId) {
    selector.close(nodeId);
    for (ClientRequest request : inFlightRequests.clearAll(nodeId))
        metadataUpdater.maybeHandleDisconnection(request);
    connectionStates.remove(nodeId);
}
See also: Kafka Source Code Analysis, Network I/O: A Brief Look at KafkaChannel.
inFlightRequests: the ClientRequests that have been sent and are still in flight.
As the previous article (Kafka Source Code Analysis: the overall flow of the producer's message-sending pipeline) explained, Kafka's lowest-level unit of sending is the sendable object ClientRequest. Everything Kafka sends is eventually wrapped in a ClientRequest.
/**
 * A request being sent to the server. This holds both the network send as well as the client-level metadata.
 */
public final class ClientRequest {
    private final long createdTimeMs;
    private final boolean expectResponse;
    private final RequestSend request;
    private final RequestCompletionHandler callback;
    private final boolean isInitiatedByNetworkClient;
    private long sendTimeMs;
}
ClientRequest itself is fairly simple:
expectResponse indicates whether an ack is required.
RequestSend is Kafka's own data structure, somewhat like JSON; its implementation and usage are a little involved, but the idea is easy to grasp, so we won't dwell on it here.
RequestCompletionHandler is the callback, and it is one of the more important pieces of Kafka.
RequestCompletionHandler defines the callback that the KafkaClient invokes once the request has completed: its onComplete method is called with the ClientResponse.
public static class RequestFutureCompletionHandler extends RequestFuture<ClientResponse>
        implements RequestCompletionHandler {

    @Override
    public void onComplete(ClientResponse response) {
        if (response.wasDisconnected()) {
            ClientRequest request = response.request();
            RequestSend send = request.request();
            ApiKeys api = ApiKeys.forId(send.header().apiKey());
            int correlation = send.header().correlationId();
            log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                      api, request, correlation, send.destination());
            raise(DisconnectException.INSTANCE);
        } else {
            complete(response);
        }
    }
}
Pseudo-code: in this ClientRequest we simply print the response inside the callback. response.responseBody() returns a Struct object (again, think of it as something like JSON).
ClientRequest clientRequest = new ClientRequest(System.currentTimeMillis(), true,
        new RequestSend(null, null, null),
        new RequestCompletionHandler() {
            @Override
            public void onComplete(ClientResponse response) {
                System.out.println(response.responseBody());
            }
        });
networkClient.doSend(clientRequest, System.currentTimeMillis());
ClientRequest is actually a rather clever design. Let's imitate Kafka and build a simple listener + adapter ourselves (a minimal sketch of the pattern appears right after the compose and chain code below)!
Let's trace the whole flow by following a JoinGroup request sent from the consumer side:
RequestFuture<ByteBuffer> future = client.send(coordinator, ApiKeys.JOIN_GROUP, request)
        .compose(new JoinGroupResponseHandler());
Once we have the future:
future.addListener(new RequestFutureListener<ByteBuffer>() {
    @Override
    public void onSuccess(ByteBuffer value) {
        // handle join completion in the callback so that the callback will be invoked
        // even if the consumer is woken up before finishing the rebalance
        onJoinComplete(generation, memberId, protocol, value);
        needsJoinPrepare = true;
        heartbeatTask.reset();
    }

    @Override
    public void onFailure(RuntimeException e) {
        // we handle failures below after the request finishes. if the join completes
        // after having been woken up, the exception is ignored and we will rejoin
    }
});
It looks very simple, but here is what is actually going on:
1. First we send the request with client.send(coordinator, ApiKeys.JOIN_GROUP, request). In fact, every request sent this way uses the default RequestFutureCompletionHandler.
That is, once the request succeeds, the complete method of the Future<ClientResponse> gets called.
public static class RequestFutureCompletionHandler extends RequestFuture<ClientResponse>
        implements RequestCompletionHandler {

    @Override
    public void onComplete(ClientResponse response) {
        if (response.wasDisconnected()) {
            ClientRequest request = response.request();
            RequestSend send = request.request();
            ApiKeys api = ApiKeys.forId(send.header().apiKey());
            int correlation = send.header().correlationId();
            log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                      api, request, correlation, send.destination());
            raise(DisconnectException.INSTANCE);
        } else {
            complete(response);
        }
    }
}
2. When the default RequestFutureCompletionHandler completes the future, it fires all of the listeners registered on that future, which in turn triggers the onSuccess path added by our compose(new JoinGroupResponseHandler()).
private void fireSuccess() {
    for (RequestFutureListener<T> listener : listeners)
        listener.onSuccess(value);
}
Our compose adapts the Future<ClientResponse> into a new Future<ByteBuffer>, which we will call the JoinGroupFuture. So what happens when onSuccess fires on the path to the JoinGroupFuture?
public void onSuccess(ClientResponse clientResponse, RequestFuture<ByteBuffer> future) {
    try {
        this.response = clientResponse;
        JoinGroupResponse responseObj = parse(clientResponse);
        handle(responseObj, future);
    } catch (RuntimeException e) {
        if (!future.isDone()) {
            future.raise(e);
        }
    }
}
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future /* sendJoinGroupRequest#joinGroupFuture */) {
    Errors error = Errors.forCode(joinResponse.errorCode());
    if (error == Errors.NONE) {
        log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
        AbstractCoordinator.this.memberId = joinResponse.memberId();
        AbstractCoordinator.this.generation = joinResponse.generationId();
        // a normal JoinGroupResponse was received, so clear the rejoin flag
        AbstractCoordinator.this.rejoinNeeded = false;
        AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
        sensors.joinLatency.record(response.requestLatencyMs());
        if (joinResponse.isLeader()) {
            onJoinLeader(joinResponse).chain(future);
        } else {
            onJoinFollower().chain(future);
        }
3. Take onJoinFollower as the example: what does onJoinFollower().chain(future) do?
It sends another request and composes that one into a Future<ByteBuffer> as well, the SyncGroupFuture. And here is the important part: the JoinGroupFuture is chained onto the SyncGroupFuture.
chain simply means that when the SyncGroupFuture succeeds, complete is called on the JoinGroupFuture.
private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest request) {
    if (coordinatorUnknown()) {
        return RequestFuture.coordinatorNotAvailable();
    }
    return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
            .compose(new SyncGroupResponseHandler());
}
public void chain(final RequestFuture<T> future) {
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {
            future.complete(value);
        }

        @Override
        public void onFailure(RuntimeException e) {
            future.raise(e);
        }
    });
}
In other words, our JoinGroupFuture succeeds only after the SyncGroupFuture has succeeded.
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {

    @Override
    public SyncGroupResponse parse(ClientResponse response) {
        return new SyncGroupResponse(response.responseBody());
    }

    @Override
    public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
        Errors error = Errors.forCode(syncResponse.errorCode());
        if (error == Errors.NONE) {
            log.info("Successfully joined group {} with generation {}", groupId, generation);
            sensors.syncLatency.record(response.requestLatencyMs());
            future.complete(syncResponse.memberAssignment());
4. Finally, once the SyncGroupFuture succeeds, it completes the JoinGroupFuture, which brings us back to the listener we registered at the very beginning:
future.addListener(new RequestFutureListener<ByteBuffer>() {
    @Override
    public void onSuccess(ByteBuffer value) {
        // handle join completion in the callback so that the callback will be invoked
        // even if the consumer is woken up before finishing the rebalance
        onJoinComplete(generation, memberId, protocol, value);
        needsJoinPrepare = true;
        heartbeatTask.reset();
    }

    @Override
    public void onFailure(RuntimeException e) {
        // we handle failures below after the request finishes. if the join completes
        // after having been woken up, the exception is ignored and we will rejoin
    }
});
In other words, one JoinGroup operation actually spans two requests, and only when both of them have succeeded is the JoinGroupFuture's listener triggered.
All of this orchestration comes down to Kafka's flexible use and composition of listeners and adapters. Let's look at the two magic methods:
/**
 * Convert from a request future of one type to another type
 *
 * @param adapter The adapter which does the conversion
 * @param <S> The type of the future adapted to
 *
 * @return The new future
 */
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
    final RequestFuture<S> adapted = new RequestFuture<S>();
    addListener(new RequestFutureListener<T>() {
        // when the original future succeeds, the adapter's onSuccess is invoked,
        // which decides how (and whether) to complete the new future
        @Override
        public void onSuccess(T value) {
            adapter.onSuccess(value, adapted);
        }

        @Override
        public void onFailure(RuntimeException e) {
            adapter.onFailure(e, adapted);
        }
    });
    // return the newly created future
    return adapted;
}

public void chain(final RequestFuture<T> future) {
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {
            future.complete(value);
        }

        @Override
        public void onFailure(RuntimeException e) {
            future.raise(e);
        }
    });
}
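To make good on the earlier invitation, here is a minimal, self-contained toy version of the listener + adapter pattern. It is written from scratch rather than copied from Kafka: MiniFuture, MiniListener and MiniAdapter are invented names, failure handling is omitted, and the point is only to show how addListener, compose and chain cooperate.

import java.util.ArrayList;
import java.util.List;

// A toy future that only knows how to complete successfully and notify listeners.
class MiniFuture<T> {
    interface MiniListener<T> { void onSuccess(T value); }
    interface MiniAdapter<T, S> { void onSuccess(T value, MiniFuture<S> next); }

    private final List<MiniListener<T>> listeners = new ArrayList<>();
    private T value;
    private boolean done;

    public void addListener(MiniListener<T> listener) {
        if (done) listener.onSuccess(value);       // already completed: fire immediately
        else listeners.add(listener);
    }

    public void complete(T value) {
        this.value = value;
        this.done = true;
        for (MiniListener<T> l : listeners) l.onSuccess(value);
    }

    // compose: adapt this future's result into a future of another type.
    public <S> MiniFuture<S> compose(MiniAdapter<T, S> adapter) {
        MiniFuture<S> adapted = new MiniFuture<>();
        addListener(v -> adapter.onSuccess(v, adapted));
        return adapted;
    }

    // chain: when this future completes, complete the given future with the same value.
    public void chain(MiniFuture<T> target) {
        addListener(target::complete);
    }
}

public class MiniFutureDemo {
    public static void main(String[] args) {
        MiniFuture<String> raw = new MiniFuture<>();                    // plays the role of Future<ClientResponse>
        MiniFuture<Integer> parsed = raw.compose((s, next) -> next.complete(s.length())); // compose()
        parsed.addListener(len -> System.out.println("parsed length = " + len));

        MiniFuture<Integer> downstream = new MiniFuture<>();            // plays the role of the JoinGroupFuture
        downstream.addListener(len -> System.out.println("downstream also sees " + len));
        parsed.chain(downstream);                                        // like onJoinFollower().chain(future)

        raw.complete("hello kafka");                                     // triggers the whole chain
    }
}

Running the demo prints the parsed length first and then the chained value, mirroring how the SyncGroupFuture completes the JoinGroupFuture, whose listeners then fire.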
inFlightRequests can be understood, roughly, as a Map whose key is the node and whose value is a Deque<ClientRequest>. Every time we send a request, the ClientRequest gets tossed in:
this.inFlightRequests.add(request);
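To make the shape of this structure concrete, here is a simplified illustrative sketch. It is not Kafka's actual InFlightRequests class (the real one also offers completeLastSent, canSendMore and so on, and its exact deque ordering is glossed over here); the only external type it assumes is the Kafka ClientRequest class seen above.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.ClientRequest;

// Simplified sketch of an in-flight request registry keyed by node id.
final class SimpleInFlightRequests {
    private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();

    /** Register a request that has just been written to the given node. */
    public void add(String nodeId, ClientRequest request) {
        requests.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(request);
    }

    /** The most recently sent request still in flight for this node. */
    public ClientRequest lastSent(String nodeId) {
        Deque<ClientRequest> queue = requests.get(nodeId);
        return queue == null ? null : queue.peekFirst();
    }

    /** Remove and return the oldest in-flight request for this node (its response arrived). */
    public ClientRequest completeNext(String nodeId) {
        return requests.get(nodeId).pollLast();
    }

    /** How loaded a node currently is; this is what throttling and timeout scans rely on. */
    public int inFlightRequestCount(String nodeId) {
        Deque<ClientRequest> queue = requests.get(nodeId);
        return queue == null ? 0 : queue.size();
    }
}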
Once the send completes, a request that does not need an ack is simply finished at that point:
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // if no response is expected then when the send is completed, return it
    // completedSends only contains sends that are guaranteed to have gone out
    for (Send send : this.selector.completedSends()) {
        ClientRequest request = this.inFlightRequests.lastSent(send.destination());
        // check whether this request expects an ack; if not, add it to the response list right away
        if (!request.expectResponse()) {
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(new ClientResponse(request, now, false, null));
        }
    }
}
For requests that do need an ack, the received result is wrapped up into a ClientResponse:
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        // receive.payload() is a reference to the buffer that was read off the wire
        Struct body = parseResponse(receive.payload(), req.request().header());
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body)) {
            responses.add(new ClientResponse(req, now, false, body));
        }
    }
}
inFlightRequests is also used to gauge how loaded a node is, to cap the total number of outstanding requests, and so on.
When we send a message, the request is dropped into inFlightRequests. After a round of polling, the KafkaSelector sorts the various results into the corresponding lists: completedSends for completed sends, completedReceives for completed receives, disconnected for dropped connections, and so on.
After each poll, NetworkClient processes each of those lists differently; in addition, it also handles the requests in inFlightRequests that have timed out.
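Taken together, the tail of poll() is essentially a dispatch over those lists. The sketch below is a condensed paraphrase rather than a verbatim excerpt: handleCompletedSends and handleCompletedReceives are the methods shown above, while the names of the disconnection, connection and timeout handlers are assumptions following the same naming pattern.

// Condensed sketch of the post-poll dispatch inside NetworkClient.poll
// (the last three handler names are assumed, not quoted from the source).
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);     // sends that finished and expect no ack
handleCompletedReceives(responses, updatedNow);  // responses received for in-flight requests
handleDisconnections(responses, updatedNow);     // nodes the selector reported as disconnected
handleConnections();                             // connections that just finished establishing
handleTimedOutRequests(responses, updatedNow);   // in-flight requests that exceeded the timeout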
Finding the timed-out requests is actually very simple: take the oldest in-flight request of each node and check whether it has exceeded the timeout; if it has, NetworkClient closes that node's connection.
public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
    List<String> nodeIds = new LinkedList<String>();
    for (String nodeId : requests.keySet()) {
        if (inFlightRequestCount(nodeId) > 0) {
            ClientRequest request = requests.get(nodeId).peekLast();
            long timeSinceSend = now - request.sendTimeMs();
            if (timeSinceSend > requestTimeout) {
                nodeIds.add(nodeId);
            }
        }
    }
    return nodeIds;
}
Finally, NetworkClient invokes the callback of each of these ClientRequests, and that is exactly what triggers the listeners, adapters, and listener chains described in section 3.3 above.
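Concretely, once the ClientResponse list has been assembled, the callback dispatch is just a loop along the lines of the sketch below. The callback() accessor on ClientRequest is an assumption here (only the private field appears in the ClientRequest snippet earlier), and the null check stands in for requests sent without a callback.

// Illustrative sketch: fire each completed request's RequestCompletionHandler,
// which is what kicks off the listener/adapter chains from section 3.3.
for (ClientResponse response : responses) {
    RequestCompletionHandler callback = response.request().callback();  // assumed accessor
    if (callback != null)
        callback.onComplete(response);
}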
Reference books: 《Kafka技術內幕》 (Inside Kafka Technology) by 鄭奇煌; 《Apache Kafka源碼剖析》 (Apache Kafka Source Code Analysis) by 徐郡明.