Callbacks in Kafka

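The Kafka client handles many of its requests through callbacks. The basic idea: a callback is stored in a ClientRequest, and the ClientRequest is stored in inFlightRequests. When a response comes back, the matching ClientRequest is read from inFlightRequests and the callback held by that request is invoked to finish processing.
inFlightRequests is the bridge between request handling and response handling.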
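
The flow described above can be sketched in a few lines. This is a minimal illustration, not Kafka's code: CompletionHandler, PendingRequest, and InFlightSketch are hypothetical stand-ins, and keying the pending requests by correlation id is a simplification made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a completion callback (not the Kafka interface).
interface CompletionHandler {
    void onComplete(String responseBody);
}

// Hypothetical stand-in for a request that carries its own callback.
class PendingRequest {
    final int correlationId;
    final CompletionHandler callback;

    PendingRequest(int correlationId, CompletionHandler callback) {
        this.correlationId = correlationId;
        this.callback = callback;
    }
}

class InFlightSketch {
    // Requests that have been sent and are still waiting for a response.
    // Assumption for this sketch: one entry per correlation id.
    private final Map<Integer, PendingRequest> inFlight = new HashMap<>();

    void send(PendingRequest request) {
        // Remember the request (and therefore its callback) until the response arrives.
        inFlight.put(request.correlationId, request);
    }

    void onResponse(int correlationId, String responseBody) {
        // Look up the request this response belongs to and run its callback.
        PendingRequest request = inFlight.remove(correlationId);
        if (request == null)
            throw new IllegalStateException("No in-flight request with correlation id " + correlationId);
        if (request.callback != null)
            request.callback.onComplete(responseBody);
    }

    int pendingCount() {
        return inFlight.size();
    }
}
```

The code that sends a request and the code that receives a response never call each other directly; the only thing they share is the in-flight collection.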

1. Interface and base class

For both the producer and the consumer, the callback classes implement the RequestCompletionHandler interface.

public interface RequestCompletionHandler {
    public void onComplete(ClientResponse response);
}

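The consumer's callback class not only implements RequestCompletionHandler but also extends RequestFuture. RequestFuture is a stateful class: its state is set while the response is handled, so a caller can hold a reference to the RequestFuture and use it to check the state of the request.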

public class RequestFuture<T> {

    private boolean isDone = false;
    private T value;
    private RuntimeException exception;
    private List<RequestFutureListener<T>> listeners = new ArrayList<>(); 
    // other methods omitted
}

2. producer

The producer creates its ClientRequest objects in the sender thread, as follows:

private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
        List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
        for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
            requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
        return requests;
}

// build the request
 private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
        Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
        final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
        for (RecordBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            produceRecordsByPartition.put(tp, batch.records.buffer());
            recordsByPartition.put(tp, batch);
        }
        ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
        RequestSend send = new RequestSend(Integer.toString(destination),
                                           this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                           request.toStruct());

        // the callback
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };
        
        // the callback is stored in the request; the request is later stored in inFlightRequests
        return new ClientRequest(now, acks != 0, send, callback);
}

At the end of NetworkClient#poll(..), the callbacks of the completed requests are invoked:

public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) { // each response wraps its request, which holds the callback
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response); // invoke the callback
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
    }
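
One detail of the loop above is the try/catch around each callback: an exception thrown by one callback is logged and does not stop the callbacks that follow. The sketch below isolates that pattern; CallbackLoopSketch and invokeAll are hypothetical names, and plain Runnable stands in for the Kafka callback type.

```java
import java.util.List;

class CallbackLoopSketch {
    // Runs every callback in order and returns how many of them threw.
    // A failure in one callback must not prevent the remaining ones from running.
    static int invokeAll(List<Runnable> callbacks) {
        int failures = 0;
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (Exception e) {
                failures++;
                System.err.println("Uncaught error in request completion: " + e);
            }
        }
        return failures;
    }
}
```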

3. Consumer

The consumer uses callbacks much like the producer does, but with somewhat more machinery. As noted above, the consumer's callback class not only implements RequestCompletionHandler but also extends RequestFuture.

public static class RequestFutureCompletionHandler
            extends RequestFuture<ClientResponse>
            implements RequestCompletionHandler {

        @Override
        public void onComplete(ClientResponse response) {
            if (response.wasDisconnected()) {
                ClientRequest request = response.request();
                RequestSend send = request.request();
                ApiKeys api = ApiKeys.forId(send.header().apiKey());
                int correlation = send.header().correlationId();
                log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                        api, request, correlation, send.destination());
                raise(DisconnectException.INSTANCE);
            } else {
                complete(response); // the key step: complete() sets the state of the RequestFuture
            }
        }
    }

public void complete(T value) { // sets the state of the RequestFuture
        if (isDone)
            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
        this.value = value;
        this.isDone = true;
        fireSuccess(); // iterate over the listeners registered on the RequestFuture
    }

private void fireSuccess() {
        for (RequestFutureListener<T> listener : listeners)
            listener.onSuccess(value);
    }

    private void fireFailure() {
        for (RequestFutureListener<T> listener : listeners)
            listener.onFailure(exception);
    }
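
To see the state change and the listener notification together, here is a small self-contained sketch. Listener and SimpleFuture are hypothetical, simplified stand-ins for RequestFutureListener and RequestFuture; notifying a listener immediately when it is added after completion is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a future listener.
interface Listener<T> {
    void onSuccess(T value);
    void onFailure(RuntimeException e);
}

// Hypothetical, simplified future: holds a result or an exception and notifies listeners.
class SimpleFuture<T> {
    private boolean done = false;
    private T value;
    private RuntimeException exception;
    private final List<Listener<T>> listeners = new ArrayList<>();

    boolean isDone() { return done; }
    boolean failed() { return done && exception != null; }
    T value() { return value; }

    // Success path: record the value, flip the state, then notify every listener.
    void complete(T value) {
        if (done)
            throw new IllegalStateException("Invalid attempt to complete a future which is already complete");
        this.value = value;
        this.done = true;
        for (Listener<T> listener : listeners)
            listener.onSuccess(value);
    }

    // Failure path: record the exception, flip the state, then notify every listener.
    void raise(RuntimeException e) {
        if (done)
            throw new IllegalStateException("Invalid attempt to complete a future which is already complete");
        this.exception = e;
        this.done = true;
        for (Listener<T> listener : listeners)
            listener.onFailure(e);
    }

    // Sketch assumption: a listener added after completion is notified right away.
    void addListener(Listener<T> listener) {
        if (!done)
            listeners.add(listener);
        else if (exception != null)
            listener.onFailure(exception);
        else
            listener.onSuccess(value);
    }
}
```

A future can be completed only once; a second call to complete or raise throws, which guards against one response being handled twice.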

As with the producer, the request is stored in a map, here named unsent. See ConsumerNetworkClient#send(..):

public RequestFuture<ClientResponse> send(Node node,
                                              ApiKeys api,
                                              AbstractRequest request) {
        long now = time.milliseconds();
        RequestFutureCompletionHandler future = new RequestFutureCompletionHandler(); // the callback
        RequestHeader header = client.nextRequestHeader(api);
        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
        put(node, new ClientRequest(now, true, send, future)); // put the request into unsent
        return future; // and return a reference to the callback object
    }

Right after ConsumerNetworkClient#send(..) returns, RequestFuture#compose(..) is called on the result, as follows:

private RequestFuture<Void> sendGroupCoordinatorRequest() {
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            return RequestFuture.noBrokersAvailable();
        } else {
            log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
            GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
            return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest) // send returns a RequestFuture, on which compose is then called
                    .compose(new RequestFutureAdapter<ClientResponse, Void>() {
                        @Override
                        public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
                            handleGroupMetadataResponse(response, future);
                        }
                    });
        }
    }

RequestFuture#compose(..) does two things:

  1. It adds a listener to the original RequestFuture.
  2. It returns a new RequestFuture, which is then used to check the state.

public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
        final RequestFuture<S> adapted = new RequestFuture<S>(); // the new RequestFuture that is returned
        addListener(new RequestFutureListener<T>() { // added to the listeners of the original RequestFuture
            @Override
            public void onSuccess(T value) {
                // Runs once the response has arrived and the listeners fire. The adapter sets the
                // state of the new RequestFuture, which callers use to check how handling went.
                adapter.onSuccess(value, adapted);
            }

            @Override
            public void onFailure(RuntimeException e) {
                adapter.onFailure(e, adapted);
            }
        });
        return adapted;
    }
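
The chaining can be tried out with a stripped-down future. In the sketch below ChainFuture is a hypothetical stand-in for RequestFuture, a BiConsumer plays the role of the adapter's onSuccess, and forwarding a failure unchanged to the new future is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, simplified future that supports compose-style chaining.
class ChainFuture<T> {
    private boolean done = false;
    private T value;
    private RuntimeException exception;
    // Actions to run once this future finishes, successfully or not.
    private final List<Runnable> listeners = new ArrayList<>();

    boolean isDone() { return done; }
    boolean failed() { return done && exception != null; }
    T value() { return value; }

    void complete(T value) {
        finish(value, null);
    }

    void raise(RuntimeException e) {
        finish(null, e);
    }

    private void finish(T value, RuntimeException e) {
        if (done)
            throw new IllegalStateException("already complete");
        this.value = value;
        this.exception = e;
        this.done = true;
        for (Runnable listener : listeners)
            listener.run();
    }

    // Returns a NEW future. The adapter decides how the new future is completed
    // once this (original) future has succeeded.
    <S> ChainFuture<S> compose(BiConsumer<T, ChainFuture<S>> onSuccess) {
        ChainFuture<S> adapted = new ChainFuture<>();
        listeners.add(() -> {
            if (exception != null)
                adapted.raise(exception);         // sketch assumption: failures pass through unchanged
            else
                onSuccess.accept(value, adapted); // the adapter completes or fails the new future
        });
        return adapted;
    }
}
```

A caller holding only the adapted future never sees the raw response; it sees the result of the adapter's handling, which is why the returned future is the one worth keeping.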

So after the ClientRequest has been put into the map, what the caller ends up holding is the RequestFuture created in compose, as in AbstractCoordinator#ensureCoordinatorReady(..):

public void ensureCoordinatorReady() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = sendGroupCoordinatorRequest(); // the future returned by compose
            client.poll(future); // poll keeps checking the state of the future

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                coordinatorDead();
                time.sleep(retryBackoffMs);
            }

        }
    }

public void poll(RequestFuture<?> future) {
        while (!future.isDone()) // keep polling; when a response is handled its callback runs and sets the state of the future
            poll(Long.MAX_VALUE);
    }
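
The blocking loop can be simulated without any network. In this sketch DoneFlag and PollingClientSketch are hypothetical: each queued Runnable stands for the work one poll would do, such as handing one response to its callback.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical minimal future: only tracks whether it is done.
class DoneFlag {
    private boolean done = false;
    boolean isDone() { return done; }
    void complete() { done = true; }
}

class PollingClientSketch {
    // Each entry stands for the work a single poll would perform.
    private final Queue<Runnable> pendingEvents = new ArrayDeque<>();
    int pollCount = 0;

    void enqueue(Runnable event) {
        pendingEvents.add(event);
    }

    // One round of I/O: process at most one pending event.
    void poll() {
        pollCount++;
        Runnable event = pendingEvents.poll();
        if (event != null)
            event.run();
    }

    // Block until the future is done. The future only changes state as a side
    // effect of a callback that runs inside poll().
    void poll(DoneFlag future) {
        while (!future.isDone())
            poll();
    }
}
```

If no queued event ever completes the future, this loop never returns. The sketch has no timeout or disconnect handling; it relies on something eventually completing or failing the future.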

Summary

The Kafka client relies heavily on callbacks for request handling, so it is worth understanding how they work. A link on callback functions:
http://www.cnblogs.com/set-cookie/p/8996951.html
