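The Kafka client relies heavily on callbacks to process requests. The basic idea is to store the callback in a ClientRequest, which is in turn held in inFlightRequests. When a response comes back, the matching ClientRequest is read from inFlightRequests and the callback stored in that request is invoked to finish the processing.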
inFlightRequests is the bridge between request handling and response handling.
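To make the idea concrete, here is a minimal sketch of the pattern, not Kafka's actual code. All names in it (CompletionHandler, PendingRequest, InFlightSketch) are hypothetical, and keying the pending requests by correlation id in a plain map is a simplifying assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; none of these are Kafka's real classes.
interface CompletionHandler {
    void onComplete(String responseBody);
}

final class PendingRequest {
    final int correlationId;
    final CompletionHandler callback;

    PendingRequest(int correlationId, CompletionHandler callback) {
        this.correlationId = correlationId;
        this.callback = callback;
    }
}

final class InFlightSketch {
    // Requests that have been sent but not yet answered (assumption: keyed by correlation id).
    private final Map<Integer, PendingRequest> inFlight = new HashMap<>();

    // Sending side: remember the request, and with it the callback, until a response arrives.
    void send(PendingRequest request) {
        inFlight.put(request.correlationId, request);
    }

    // Receiving side: look up the pending request and run the callback it carries.
    void onResponse(int correlationId, String body) {
        PendingRequest request = inFlight.remove(correlationId);
        if (request == null)
            throw new IllegalStateException("No in-flight request with correlation id " + correlationId);
        request.callback.onComplete(body);
    }

    int pending() {
        return inFlight.size();
    }

    // Small demo: returns what the callbacks recorded, in invocation order.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        InFlightSketch client = new InFlightSketch();
        client.send(new PendingRequest(1, body -> seen.add("first:" + body)));
        client.send(new PendingRequest(2, body -> seen.add("second:" + body)));
        client.onResponse(1, "ok");
        client.onResponse(2, "ok");
        return seen;
    }
}
```

The sender never waits on the response directly. It only registers what should happen later, and the receiving path finds that callback again through the in-flight collection.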
In both the producer and the consumer, the callback classes implement the RequestCompletionHandler interface.
    public interface RequestCompletionHandler {
        public void onComplete(ClientResponse response);
    }
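The consumer's callback class not only implements RequestCompletionHandler but also extends RequestFuture. RequestFuture is a stateful class: its state is set while the response is being handled, so a caller can hold a reference to the RequestFuture and use it to check the state of the request.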
    public class RequestFuture<T> {
        private boolean isDone = false;
        private T value;
        private RuntimeException exception;
        private List<RequestFutureListener<T>> listeners = new ArrayList<>();
        // other methods omitted
    }
The producer creates its ClientRequest objects in the sender thread, as follows:
    private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
        List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
        for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
            requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
        return requests;
    }

    // Create the request
    private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
        Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
        final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
        for (RecordBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            produceRecordsByPartition.put(tp, batch.records.buffer());
            recordsByPartition.put(tp, batch);
        }
        ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
        RequestSend send = new RequestSend(Integer.toString(destination),
                                           this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                           request.toStruct());
        // The callback
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };
        // The callback is stored in the request, and the request is later stored in inFlightRequests
        return new ClientRequest(now, acks != 0, send, callback);
    }
At the end of NetworkClient#poll(..), the corresponding callbacks are invoked:
    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            // The response wraps the request, which carries the callback
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response); // invoke the callback
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }
        return responses;
    }
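The consumer uses callbacks in much the same way as the producer, but with some added complexity. As noted above, the consumer's callback class not only implements RequestCompletionHandler but also extends RequestFuture.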
    public static class RequestFutureCompletionHandler extends RequestFuture<ClientResponse>
            implements RequestCompletionHandler {
        @Override
        public void onComplete(ClientResponse response) {
            if (response.wasDisconnected()) {
                ClientRequest request = response.request();
                RequestSend send = request.request();
                ApiKeys api = ApiKeys.forId(send.header().apiKey());
                int correlation = send.header().correlationId();
                log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                        api, request, correlation, send.destination());
                raise(DisconnectException.INSTANCE);
            } else {
                complete(response); // Key step: complete() sets the state of the RequestFuture
            }
        }
    }

    // The methods below belong to RequestFuture

    public void complete(T value) { // Sets the RequestFuture state
        if (isDone)
            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
        this.value = value;
        this.isDone = true;
        fireSuccess(); // Invokes each listener registered on the RequestFuture
    }

    private void fireSuccess() {
        for (RequestFutureListener<T> listener : listeners)
            listener.onSuccess(value);
    }

    private void fireFailure() {
        for (RequestFutureListener<T> listener : listeners)
            listener.onFailure(exception);
    }
As with the producer, the request is placed in a map, although here the map is named unsent. See ConsumerNetworkClient#send(..):
    public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) {
        long now = time.milliseconds();
        RequestFutureCompletionHandler future = new RequestFutureCompletionHandler(); // The callback
        RequestHeader header = client.nextRequestHeader(api);
        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
        put(node, new ClientRequest(now, true, send, future)); // Put the request into unsent
        return future; // Return a reference to the callback object
    }
Immediately after calling ConsumerNetworkClient#send(..), the caller invokes RequestFuture#compose(..) on the result, as follows:
    private RequestFuture<Void> sendGroupCoordinatorRequest() {
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            return RequestFuture.noBrokersAvailable();
        } else {
            log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
            GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
            // send returns a RequestFuture, on which compose is then called
            return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
                    .compose(new RequestFutureAdapter<ClientResponse, Void>() {
                        @Override
                        public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
                            handleGroupMetadataResponse(response, future);
                        }
                    });
        }
    }
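RequestFuture#compose(..) serves two purposes: it returns a new RequestFuture, and it registers a listener on the original RequestFuture that sets the state of the new one once the response arrives: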
    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
        final RequestFuture<S> adapted = new RequestFuture<S>(); // The new RequestFuture that is returned
        addListener(new RequestFutureListener<T>() { // Added to the listeners of the original RequestFuture
            @Override
            public void onSuccess(T value) {
                // When the response arrives, the listeners are invoked, which sets the state of
                // the new RequestFuture. Callers can then use that new RequestFuture to check
                // how response handling went.
                adapter.onSuccess(value, adapted);
            }

            @Override
            public void onFailure(RuntimeException e) {
                adapter.onFailure(e, adapted);
            }
        });
        return adapted;
    }
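The chaining is easier to see in isolation. Below is a minimal sketch of a listener-based future with a compose method. MiniFuture, its nested Listener and Adapter interfaces, and ComposeDemo are hypothetical names for illustration and only loosely mirror RequestFuture. For brevity the sketch forwards failures straight to the adapted future instead of going through the adapter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a listener-based future; this is not Kafka's RequestFuture.
final class MiniFuture<T> {
    interface Listener<T> {
        void onSuccess(T value);
        void onFailure(RuntimeException e);
    }

    interface Adapter<F, S> {
        void onSuccess(F value, MiniFuture<S> adapted);
    }

    private boolean done = false;
    private T value;
    private RuntimeException exception;
    private final List<Listener<T>> listeners = new ArrayList<>();

    boolean isDone() { return done; }
    boolean succeeded() { return done && exception == null; }
    T value() { return value; }

    void complete(T value) {
        if (done) throw new IllegalStateException("already complete");
        this.value = value;
        this.done = true;
        for (Listener<T> listener : listeners) listener.onSuccess(value);
    }

    void raise(RuntimeException e) {
        if (done) throw new IllegalStateException("already complete");
        this.exception = e;
        this.done = true;
        for (Listener<T> listener : listeners) listener.onFailure(e);
    }

    void addListener(Listener<T> listener) {
        if (!done) {
            listeners.add(listener);
        } else if (exception == null) {
            listener.onSuccess(value);     // already completed: notify immediately
        } else {
            listener.onFailure(exception); // already failed: notify immediately
        }
    }

    // Returns a new future whose state is set by the adapter once this future completes.
    <S> MiniFuture<S> compose(final Adapter<T, S> adapter) {
        final MiniFuture<S> adapted = new MiniFuture<>();
        addListener(new Listener<T>() {
            @Override public void onSuccess(T v) { adapter.onSuccess(v, adapted); }
            @Override public void onFailure(RuntimeException e) { adapted.raise(e); }
        });
        return adapted;
    }
}

final class ComposeDemo {
    // The caller only ever holds the adapted future, yet completing the original drives it.
    static int run() {
        MiniFuture<String> original = new MiniFuture<>();
        MiniFuture<Integer> adapted =
                original.<Integer>compose((raw, out) -> out.complete(raw.length()));
        original.complete("hello");
        return adapted.value();
    }
}
```

The caller of compose never touches the original future again. Completing the original fires its listener, and the listener completes the adapted future that the caller holds.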
So after the ClientRequest has been put into the map, what the caller ultimately holds is the new RequestFuture created inside compose, as in AbstractCoordinator#ensureCoordinatorReady(..):
    public void ensureCoordinatorReady() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = sendGroupCoordinatorRequest(); // Ends up being the future returned by compose
            client.poll(future); // poll keeps checking the state of the future
            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                coordinatorDead();
                time.sleep(retryBackoffMs);
            }
        }
    }

    // ConsumerNetworkClient#poll(RequestFuture)
    public void poll(RequestFuture<?> future) {
        // Poll the future's state; handling the response invokes the callback,
        // which in turn sets the future's state.
        while (!future.isDone())
            poll(Long.MAX_VALUE);
    }
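The "poll until the future is done" loop can be tried out without a network. The sketch below is only an illustration under stated assumptions: PollFuture and FakeClient are hypothetical, and a queue of Runnable objects stands in for responses arriving from the broker.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-ins; the real client performs network I/O inside poll().
final class PollFuture {
    private boolean done = false;
    private String value;

    boolean isDone() { return done; }
    String value() { return value; }

    void complete(String value) {
        this.value = value;
        this.done = true;
    }
}

final class FakeClient {
    // Each entry simulates one response arriving; running it completes the matching future.
    private final Queue<Runnable> arrivals = new ArrayDeque<>();
    int polls = 0;

    PollFuture send(String request) {
        PollFuture future = new PollFuture();
        arrivals.add(() -> future.complete("response to " + request));
        return future;
    }

    // One poll handles at most one simulated arrival.
    void poll() {
        polls++;
        Runnable arrival = arrivals.poll();
        if (arrival != null)
            arrival.run();
    }

    // Keep polling until this particular future has been completed by its callback.
    void poll(PollFuture future) {
        while (!future.isDone()) {
            if (arrivals.isEmpty())
                throw new IllegalStateException("future can never complete: nothing left to poll");
            poll();
        }
    }
}
```

For example, after sending requests "a" and "b" and calling poll on the second future, both futures are done: the loop had to process the first arrival before it reached the second.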
The Kafka client uses callbacks extensively for request processing, so understanding callbacks is important. For background on callbacks, see:
http://www.cnblogs.com/set-cookie/p/8996951.html