Kafka源碼剖析 —— Kafka的 i/o 操做管理者 NetworkClient

以前說過,Kafka 本身實現了一個 Selector 來管理最底層的 i/o 操做,selector 以輪詢模式驅動不一樣事件,來通知網絡通道進行讀寫操做。java

而更加高層的 NetworkClient 則 管理多個節點的客戶端請求,來驅動 selector 進行工做。那麼它具體作些什麼呢?node

(1) =>3.3 ClientRequest的監聽器,適配器,與監聽器鏈json

(2) =>2、KafkaSelector的管理api

(3) =>2、KafkaSelector的管理網絡

(4) =>4、ClientRequest的後續處理數據結構

(5) =>4、ClientRequest的後續處理socket

(6) =>4、ClientRequest的後續處理ide

(7) =>4、ClientRequest的後續處理this

(8) =>3.3 ClientRequest的監聽器,適配器,與監聽器鏈.net

1、節點鏈接狀態管理

/* the state of each node's connection */
    private final ClusterConnectionStates connectionStates;

它有一個管理全部節點鏈接狀態的connectionStates,裏面的實現比較簡單,實際上就是維護一個退避時間以及一個管理全部節點鏈接狀態的Map。

退避時間用於避免屢次請求鏈接某個節點。

final class ClusterConnectionStates {

    private final long reconnectBackoffMs;

    private final Map<String, NodeConnectionState> nodeState;

    /**
     * The state of our connection to a node
     */
    private static class NodeConnectionState {

        ConnectionState state;

        long lastConnectAttemptMs;

        public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttempt;
        }

        public String toString() {
            return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
        }
    }

開啓一個鏈接也比較簡單,實際上就是讓selector去connect,若是失敗則將其標記爲鏈接失敗,成功則標記爲鏈接成功。

private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
            this.connectionStates.connecting(nodeConnectionId, now);
            selector.connect(nodeConnectionId,
                new InetSocketAddress(node.host(), node.port()),
                this.socketSendBuffer,
                this.socketReceiveBuffer);
        } catch (IOException e) {
            /* attempt failed, we'll try again after the backoff */
            connectionStates.disconnected(nodeConnectionId, now);
            /* maybe the problem is our metadata, update it */
            metadataUpdater.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
        }
    }

斷開鏈接則更加暴力,代碼很簡單,這裏就不說了。

@Override
    public void close(String nodeId) {
        selector.close(nodeId);
        for (ClientRequest request : inFlightRequests.clearAll(nodeId))
            metadataUpdater.maybeHandleDisconnection(request);
        connectionStates.remove(nodeId);
    }

2、KafkaSelector的管理

Kafka源碼剖析 —— 網絡I/O篇 —— 淺析KafkaChannel

3、inFlightRequests的管理、ClientRequest的分析與元數據的更新

inFlightRequests:發送中的ClientRequest。

3.1 簡單分析ClientRequest

上篇文章Kafka源碼剖析 —— 生產者消息發送流水線上的大致流程說到,Kafka底層消息的發送單元是基於可發送對象ClientRequest的。Kafka全部發送的東西,最終都會包裝成ClientRequest

/**
 * A request being sent to the server. This holds both the network send as well as the client-level metadata.
 */
public final class ClientRequest {

    private final long createdTimeMs;
    private final boolean expectResponse;
    private final RequestSend request;
    private final RequestCompletionHandler callback;
    private final boolean isInitiatedByNetworkClient;
    private long sendTimeMs;

}

ClientRequest 比較簡單:

expectResponse表明需不須要ack。

RequestSend能夠是kafka本身的數據結構,有點像json,實現有些複雜,用起來也有些複雜,可是實際上理解起來很簡單,這裏不過多闡述。

RequestCompletionHandler回調,是kafka中比較重要的部分。

RequestCompletionHandler,定義了請求成功後,KafkaClient將調用其complete方法。

public static class RequestFutureCompletionHandler
        extends RequestFuture<ClientResponse>
        implements RequestCompletionHandler {

        @Override
        public void onComplete(ClientResponse response) {
            if (response.wasDisconnected()) {
                ClientRequest request = response.request();
                RequestSend send = request.request();
                ApiKeys api = ApiKeys.forId(send.header()
                                                .apiKey());
                int correlation = send.header()
                                      .correlationId();
                log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                    api, request, correlation, send.destination());
                raise(DisconnectException.INSTANCE);
            } else {
                complete(response);
            }
        }
    }

3.2 如何新建一個ClientRequest

僞代碼:在這個ClientRequest中,咱們在回調中簡單的將response打印出來,response.responseBody()將得到一個Struct對象(將其理解成相似json就好了)。

ClientRequest clientRequest = new ClientRequest(System.currentTimeMillis(),
            true,
            new RequestSend(null, null, null),
            new RequestCompletionHandler() {

                @Override
                public void onComplete(ClientResponse response) {
                    System.out.println(response.responseBody());
                }
            });
        networkClient.doSend(clientRequest,System.currentTimeMillis());

3.3 ClientRequest的監聽器,適配器,與監聽器鏈

ClientRequest 其實是個很巧妙的設計,仿照Kafka,實現一個簡單的監聽器 + 適配器吧!

咱們從生產者發送一個JoinGroup請求來看看這一系列的流程:

RequestFuture<ByteBuffer> future = client.send(coordinator, ApiKeys.JOIN_GROUP, request)
                     .compose(new JoinGroupResponseHandler());

拿到future之後:

future.addListener(new RequestFutureListener<ByteBuffer>() {

                @Override
                public void onSuccess(ByteBuffer value) {
                    // handle join completion in the callback so that the callback will be invoked
                    // even if the consumer is woken up before finishing the rebalance
                    onJoinComplete(generation, memberId, protocol, value);
                    needsJoinPrepare = true;
                    heartbeatTask.reset();
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // we handle failures below after the request finishes. if the join completes
                    // after having been woken up, the exception is ignored and we will rejoin
                }
            });

看起來十分簡單,但其實是這樣的:

一、首先咱們發送了一個client.send(coordinator, ApiKeys.JOIN_GROUP, request)請求,實際上全部的請求都會使用默認的RequestFutureCompletionHandler。

也就是成功後,會調用Future<ClientResponse>的complete方法。

public static class RequestFutureCompletionHandler
        extends RequestFuture<ClientResponse>
        implements RequestCompletionHandler {

        @Override
        public void onComplete(ClientResponse response) {
            if (response.wasDisconnected()) {
                ClientRequest request = response.request();
                RequestSend send = request.request();
                ApiKeys api = ApiKeys.forId(send.header()
                                                .apiKey());
                int correlation = send.header()
                                      .correlationId();
                log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                    api, request, correlation, send.destination());
                raise(DisconnectException.INSTANCE);
            } else {
                complete(response);
            }
        }
    }

二、默認的RequestFutureCompletionHandler的complete方法後,會觸發針對這個future一系列的監聽器,也就是會觸發到咱們compose(new JoinGroupResponseHandler())的這個onSuccess方法。

private void fireSuccess() {
        for (RequestFutureListener<T> listener : listeners)
            listener.onSuccess(value);
    }

咱們的compose將Future<ClientResponse>適配成了一個新的Future<ByteBuffer>,咱們將其稱爲JoinGroupFuture,觸發JoinGroupFuture的onSuccess會發生什麼呢?

public void onSuccess(ClientResponse clientResponse, RequestFuture<ByteBuffer> future) {
            try {
                this.response = clientResponse;
                JoinGroupResponse responseObj = parse(clientResponse);
                handle(responseObj, future);
            } catch (RuntimeException e) {
                if (!future.isDone()) {
                    future.raise(e);
                }
            }
        }
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future/* sendJoinGroupRequest#joinGroupFuture */) {
            Errors error = Errors.forCode(joinResponse.errorCode());
            if (error == Errors.NONE) {
                log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
                AbstractCoordinator.this.memberId = joinResponse.memberId();
                AbstractCoordinator.this.generation = joinResponse.generationId();

                // 正常收到JoinGroupResponse的相應,將rejoin設置爲false
                AbstractCoordinator.this.rejoinNeeded = false;
                AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                sensors.joinLatency.record(response.requestLatencyMs());
                if (joinResponse.isLeader()) {
                    onJoinLeader(joinResponse).chain(future);
                } else {
                    onJoinFollower().chain(future);
                }

三、以onJoinFollower爲例,onJoinFollower().chain(future);發生了什麼?

又發送了一個新的request,並將其compose成了Future<ByteBuffer> SyncGroupFuture,重要的是!這裏將JoinGroupFuture Chain 到了 SyncGroupFuture中。

chain就是在 SyncGroupFuture Success時,調用JoinGroupFuture的Complete。

private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest request) {
        if (coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }
        return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
                     .compose(new SyncGroupResponseHandler());
    }
public void chain(final RequestFuture<T> future) {
        addListener(new RequestFutureListener<T>() {

            @Override
            public void onSuccess(T value) {
                future.complete(value);
            }

            @Override
            public void onFailure(RuntimeException e) {
                future.raise(e);
            }
        });
    }

也就是說,咱們的JoinGroupFuture會在SyncGroupFuture Success後才Success。

private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {

        @Override
        public SyncGroupResponse parse(ClientResponse response) {
            return new SyncGroupResponse(response.responseBody());
        }

        @Override
        public void handle(SyncGroupResponse syncResponse,
            RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(syncResponse.errorCode());
            if (error == Errors.NONE) {
                log.info("Successfully joined group {} with generation {}", groupId, generation);
                sensors.syncLatency.record(response.requestLatencyMs());
                future.complete(syncResponse.memberAssignment());

四、最後,當SyncGroupFuture Success後,調用JoinGroupFuture的Success,來到了咱們最開始的

future.addListener(new RequestFutureListener<ByteBuffer>() {

                @Override
                public void onSuccess(ByteBuffer value) {
                    // handle join completion in the callback so that the callback will be invoked
                    // even if the consumer is woken up before finishing the rebalance
                    onJoinComplete(generation, memberId, protocol, value);
                    needsJoinPrepare = true;
                    heartbeatTask.reset();
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // we handle failures below after the request finishes. if the join completes
                    // after having been woken up, the exception is ignored and we will rejoin
                }
            });

也就是說,一個JoinGroupRequest經歷了兩個請求,只有在兩個請求都成功之後,纔會觸發JoinGroupFuture的Listener

future.addListener(new RequestFutureListener<ByteBuffer>() {

                @Override
                public void onSuccess(ByteBuffer value) {
                    // handle join completion in the callback so that the callback will be invoked
                    // even if the consumer is woken up before finishing the rebalance
                    onJoinComplete(generation, memberId, protocol, value);
                    needsJoinPrepare = true;
                    heartbeatTask.reset();
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // we handle failures below after the request finishes. if the join completes
                    // after having been woken up, the exception is ignored and we will rejoin
                }
            });

完成這一系列的控制,都要歸功於Kafka 監聽器+適配器的靈活使用與組合,讓咱們看看這兩個神奇的方法:

/**
     * Convert from a request future of one type to another type
     *
     * @param adapter The adapter which does the conversion
     * @param <S> The type of the future adapted to
     *
     * @return The new future
     */
    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
        final RequestFuture<S> adapted = new RequestFuture<S>();
        addListener(new RequestFutureListener<T>() {

            // 實際上這裏就是讓原來的 future 在succeed 時,會調用 新future 的 onSuccess 方法
            @Override
            public void onSuccess(T value) {
                adapter.onSuccess(value, adapted);
            }

            @Override
            public void onFailure(RuntimeException e) {
                adapter.onFailure(e, adapted);
            }
        });

        // 返回的這個新的 future 對象
        return adapted;
    }
	
    public void chain(final RequestFuture<T> future) {
        addListener(new RequestFutureListener<T>() {

            @Override
            public void onSuccess(T value) {
                future.complete(value);
            }

            @Override
            public void onFailure(RuntimeException e) {
                future.raise(e);
            }
        });
    }

3.4 inFlightRequests

inFlightRequests能夠簡單理解爲一個Map,Key爲node,value則是Deque<ClientRequest>,每當咱們發送一個請求,就把ClientRequest扔進去,

this.inFlightRequests.add(request);

在發送完成後,不須要ack的,發送完成就直接結束了

private void handleCompletedSends(List<ClientResponse> responses, long now) {
        // if no response is expected then when the send is completed, return it
        // 這個 completedSends 是能保證消息必定發送出去了的
        for (Send send : this.selector.completedSends()) {
            ClientRequest request = this.inFlightRequests.lastSent(send.destination());

            // 判斷一下這個request需不須要ack,若是不須要ack,添加到返回列表中
            if (!request.expectResponse()) {
                this.inFlightRequests.completeLastSent(send.destination());
                responses.add(new ClientResponse(request, now, false, null));
            }
        }
    }

而須要ack的,則將結果封裝起來,扔到ClientResponse裏

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {

            String source = receive.source();
            ClientRequest req = inFlightRequests.completeNext(source);

            // Receives,payload 是拿到 接收到的buffer的引用
            Struct body = parseResponse(receive.payload(), req.request()
                                                              .header());
            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body)) {
                responses.add(new ClientResponse(req, now, false, body));
            }
        }
    }

inFlightRequests 還可用於判斷節點的負載情況,控制總請求數等等。

4、ClientRequest的後續處理

咱們在發送消息時,會將消息扔進inFlightRequests裏面。而進行一次輪詢後,KafkaSelector會將各類消息對應地扔進發送完成的列表completedSends,接收完成的列表completedReceives,斷開鏈接的列表disconnected之類的。

在一次輪詢後,NetworkClient會根據上面列表的不一樣採起不一樣的處理。另外,還會額外處理inFlightRequests中超時的請求。

獲取超時的請求實際上十分簡單,就是將全部node的最先的一個request拿出來,看看是否是超時了,若是超時了,NetworkClient會斷開它的鏈接。

public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
        List<String> nodeIds = new LinkedList<String>();
        for (String nodeId : requests.keySet()) {
            if (inFlightRequestCount(nodeId) > 0) {
                ClientRequest request = requests.get(nodeId).peekLast();
                long timeSinceSend = now - request.sendTimeMs();
                if (timeSinceSend > requestTimeout) {
                    nodeIds.add(nodeId);
                }
            }
        }

        return nodeIds;
    }

最後,NetworkClient會分別調用這些ClientRequest的回調,也就是上面 3.3 ClientRequest的監聽器,適配器,與監聽器鏈的觸發條件。


參考書籍: 《Kafka技術內幕》 鄭奇煌著 《Apache Kafka源碼剖析》 徐郡明著

相關文章
相關標籤/搜索