咱們能夠看到,在這個RequestFuture類中,有一個 addListener 方法,這個方法實際上很是簡單,就是將一個RequestFutureListener<\T> 對象添加到維護的 List <\RequestFutureListener<\T>> listeners 中。json
public class RequestFuture<T> { private boolean isDone = false; private T value; private RuntimeException exception; private List<RequestFutureListener<T>> listeners = new ArrayList<>(); /** * Check whether the response is ready to be handled * * 接收是否已經準備好處理 * * @return true if the response is ready, false otherwise */ public boolean isDone() { return isDone; } /** * Get the value corresponding to this request (only available if the request succeeded) * * 獲取此請求對應的值(僅當請求成功) * * @return the value if it exists or null */ public T value() { return value; } /** * Check if the request succeeded; * * 檢查請求是否成功 * * @return true if the request completed and was successful */ public boolean succeeded() { return isDone && exception == null; } /** * Check if the request failed. * * 檢查請求是否失敗 * * @return true if the request completed with a failure */ public boolean failed() { return isDone && exception != null; } /** * Get the exception from a failed result (only available if the request failed) * * 獲取請求失敗的異常(僅失敗時) * * @return The exception if it exists or null */ public RuntimeException exception() { return exception; } /** * Complete the request successfully. After this call, {@link #succeeded()} will return true * and the value can be obtained through {@link #value()}. * * 請求成功完成。調用完這個方法後, succeeded會返回true,而且能夠經過 value() 方法來得到結果 * * @param value corresponding value (or null if there is none) */ public void complete(T value) { if (isDone) { throw new IllegalStateException("Invalid attempt to complete a request future which is already complete"); } this.value = value; this.isDone = true; // 調用 listener的onSuccess fireSuccess(); } /** * Raise The request will be marked as failed, and the caller can either * handle the exception or throw it. * * @param e corresponding exception to be passed to caller */ public void raise(RuntimeException e) { if (isDone) { throw new IllegalStateException("Invalid attempt to complete a request future which is already complete"); } this.exception = e; this.isDone = true; // 調用 listener的onFail fireFailure(); } /** * Raise an error. The request will be marked as failed. * * @param error corresponding error to be passed to caller */ public void raise(Errors error) { raise(error.exception()); } private void fireSuccess() { for (RequestFutureListener<T> listener : listeners) listener.onSuccess(value); } private void fireFailure() { for (RequestFutureListener<T> listener : listeners) listener.onFailure(exception); } /** * Add a listener which will be notified when the future completes */ public void addListener(RequestFutureListener<T> listener) { if (isDone) { if (exception != null) { listener.onFailure(exception); } else { listener.onSuccess(value); } } else { this.listeners.add(listener); } } }
而 listener 的調用也十分簡單,在listener上註冊的 onSuccess 和 onFailure 會分別在 fireSuccess 和 fireFailure 時調用。設計模式
而兩個 fireXxxx方法,則是 complete 與 raise 時調用。網絡
/** * Complete the request successfully. After this call, {@link #succeeded()} will return true * and the value can be obtained through {@link #value()}. * * 請求成功完成。調用完這個方法後, succeeded會返回true,而且能夠經過 value() 方法來得到結果 * * @param value corresponding value (or null if there is none) */ public void complete(T value) { if (isDone) { throw new IllegalStateException("Invalid attempt to complete a request future which is already complete"); } this.value = value; this.isDone = true; // 調用 listener的onSuccess fireSuccess(); } /** * Raise The request will be marked as failed, and the caller can either * handle the exception or throw it. * * @param e corresponding exception to be passed to caller */ public void raise(RuntimeException e) { if (isDone) { throw new IllegalStateException("Invalid attempt to complete a request future which is already complete"); } this.exception = e; this.isDone = true; // 調用 listener的onFail fireFailure(); }
public void Future request(String url, Param param) Future future = new Future(param); future.addListener(new Listener() { @Override public void onSuccess(Json json) { check(json); } @Override public void onFailure() { xxxxx; } });// 驗證返回是否有效 future.addListener(new Listener() { @Override public void onSuccess(Json json) { console(json); } @Override public void onFailure() { xxxxx; } });// 打印日誌 future.addListener(new Listener() { @Override public void onSuccess(Json json) { persist(json); } @Override public void onFailure() { xxxxx; } });// 記錄請求信息以進行統計 xxxxx......// 更多處理 sendRequest(url, future).fetch(() -> { void success(Json json){ future.complete(json); } });// 異步發送請求 return future; } ............................ Json json = future.value();// 獲取請求結果
咱們固然能夠爲新的Future<\Meat> 寫一個新的sendRequest方法,同時也要爲它寫一個request方法。 咱們首先爲Future對象新增泛型(固然也能夠直接用一個新的對象),而後爲其單獨寫一套request方法與sendRequest(這裏就不詳細寫了)方法。ide
public void Future<Meat> request4Meat(String url, Param param) Future<Meat> future = new Future<>(param); future.addListener(new Listener<Meat>() { @Override public void onSuccess(Meat meat) { check(meat);// 原future裏的驗證返回是否有效,重載的方法 checkMeat(meat);// 進一步校驗(好比說check只是校驗返回的格式對不對,而checkMeat // 要校驗肉是否新鮮,是前腿肉仍是五花肉等等) } @Override public void onFailure() { xxxxx; } });// 驗證返回是否有效 future.addListener(new Listener<Meat>() { @Override public void onSuccess(Meat meat) { console(meat);// 平常輸出 consoleMeat();// 和肉有關的輸出 } @Override public void onFailure() { xxxxx; } });// 打印日誌 future.addListener(new Listener<Meat>() { @Override public void onSuccess(Meat meat) { persist(meat);// 記錄普通的請求信息,好比請求發起時間,結束時間,請求了哪個url等等 persistMeat(meat);// 記錄肉質,肉的來源,哪裏生產的等等,和肉相關的信息 } @Override public void onFailure() { xxxxx; } });// 記錄請求信息以進行統計 xxxxx......// 更多處理 sendRequestMeat(url, future).fetch(() -> { void success(Meat meat){ future.complete(meat); } });// 異步發送請求,咱們這裏還要將返回來的對象封裝成Meat對象,而再也不是Json對象。 return future; } ............................ Meat meat = future.value();// 獲取請求結果
在這個例子中,很明顯的特色就是:<font color='red'>Future<\Json>的請求的過程和處理的流程是Future<\Meat>的子集。</font>fetch
適配器模式(Adapter Pattern)是做爲兩個不兼容的接口之間的橋樑。這種類型的設計模式屬於結構型模式,它結合了兩個獨立接口的功能。這種模式涉及到一個單一的類,該類負責加入獨立的或不兼容的接口功能。舉個真實的例子,讀卡器是做爲內存卡和筆記本之間的適配器。您將內存卡插入讀卡器,再將讀卡器插入筆記本,這樣就能夠經過筆記原本讀取內存卡。主要解決在軟件系統中,經常要將一些」現存的對象」放到新的環境中,而新環境要求的接口是現對象不能知足的。
public abstract class RequestFutureAdapter<F, T> { public abstract void onSuccess(F value, RequestFuture<T> future); public void onFailure(RuntimeException e, RequestFuture<T> future) { future.raise(e); } }
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) { final RequestFuture<S> adapted = new RequestFuture<S>(); addListener(new RequestFutureListener<T>() { // 實際上這裏就是讓原來的 future 在succeed 時,會調用 adapter 中的 onSuccess 方法 @Override public void onSuccess(T value) { adapter.onSuccess(value, adapted); } @Override public void onFailure(RuntimeException e) { adapter.onFailure(e, adapted); } }); // 返回的這個新的 future 對象 return adapted; }
RequestFuture<快遞櫃> future = request(url, param).compose(new RequestFutureAdapter<Json, 快遞櫃>() { @Override public void onSuccess(Json json, RequestFuture<快遞櫃> future) { 快遞櫃 k = json.parse();// 將json(快遞)對象轉換爲快遞櫃對象,也就是扔到快遞櫃中 if (k.check()){// 若是扔到快遞櫃中的校驗都沒問題 future.complete(k);// 沒問題的話,能夠通知咱們新的RequestFuture<快遞櫃>對象正常了 } else { future.raise(new 快遞櫃Exception); } } // 失敗的就不寫了,這多是快遞在路上出問題了,快遞都沒到快遞小哥哥手上 });
而後咱們能夠在新的 RequestFuture<快遞櫃> 上添加一些和快遞櫃相關的操做。好比說,
future.addListener(new Listener<快遞櫃>() { @Override public void onSuccess(快遞櫃 快遞櫃對象) { sendSucceededMsg(快遞櫃對象.getPhoneNum);// 告訴我快遞到快遞櫃了 } @Override public void onFailure() { sout(e)// 因爲某種緣由失敗,好比說快遞櫃滿了。 } });
RequestFuture<拿快遞>futureN(快遞櫃future).compose(new RequestFutureAdapter<快遞櫃, 拿快遞>() { @Override public void onSuccess(快遞櫃 快遞櫃對象, RequestFuture<拿快遞> future) { 拿快遞 n = 快遞櫃對象.parse(); if (n.check()){// 咱們來拿快遞了 future.complete(n);// 成功取走了快遞 } else { future.raise(new 滯留Exception); } } // 失敗的就不寫了 }); futureN.addListener(xxxx); // xxxx,成功拿了快遞後,要拆快遞,等等。 // 若是滯留了,要怎麼處理等等
適配器 + 監聽器 的模式,能夠發現,咱們不只將一種對象適配成了另外一種對象。並且<font color='red'>適配前的狀態,和適配後Future的狀態,是隔離的。咱們經過適配器,將他們的狀態進行了隔離。</font>前面Future對象的success,並不表明適配後的對象就是success的。
這時候就會在快遞櫃適配器中,喚醒raise操做,而不會調用complete方法,也就是不會通知我來拿快遞,間接也不會在第二個適配器中,出現我去快遞櫃拿快遞這種操做了。在這種狀況下,就會調用Future<快遞櫃>的全部 onFailure方法,好比通知快遞小哥,快遞放不下;通知咱們,沒法使用快遞櫃寄存這種方式進行取快遞,須要本身來拿等等。
kafka在心跳處理中,首先拿到正常的clientResponse請求,將clientResponse轉換成了心跳對象(實際上就是 errorCode = struct.getShort(ERROR_CODE_KEY_NAME);),將clientResponse中struct(能夠看做是包體)的errorCode取出。
/** * 這個適配器並無作什麼特殊的邏輯處理,只是判斷請求成功或者失敗(失敗的各類類型)來進行各類補償操做, * 好比從新加入消費組之類的。 */ private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse/* 從這個 */, Void/* 到這個 */> { // 原 client.send(coordinator, ApiKeys.HEARTBEAT, req) 會返回的對象。 protected ClientResponse response; @Override public void onSuccess(ClientResponse clientResponse, RequestFuture<Void> future) { // 讓咱們看看在成功時都作了什麼 try { this.response = clientResponse; // 將clientResponse解析爲心跳包 HeartbeatResponse HeartbeatResponse responseObj = parse(clientResponse); // 處理心跳包 handle(responseObj, future); } catch (RuntimeException e) { if (!future.isDone()) { future.raise(e); } } } @Override public void onFailure(RuntimeException e, RequestFuture<Void> future) { // mark the coordinator as dead if (e instanceof DisconnectException) { coordinatorDead(); } future.raise(e); } @Override public HeartbeatResponse parse(ClientResponse response) { // 提取返回的body (Struct對象),new一個HeartbeatResponse return new HeartbeatResponse(response.responseBody()); } @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { sensors.heartbeatLatency.record(response.requestLatencyMs()); // 將heartbeatResponse中short類型的 errCode 轉爲 Error對象 Errors error = Errors.forCode(heartbeatResponse.errorCode()); if (error == Errors.NONE) { log.debug("Received successful heartbeat response for group {}", groupId); // 沒報錯,直接將這個引用置爲成功,成功後會調用 RequestFuture<Void> future 的 onSuccess方法 future.complete(null); } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE// 組協調器不可用 || error == Errors.NOT_COORDINATOR_FOR_GROUP) {// 沒有協調器 log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.", groupId, coordinator); coordinatorDead();// 標記協調器掛了 future.raise(error); } else if (error == Errors.REBALANCE_IN_PROGRESS) {// 當前組在重負載,因此須要重加入 log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId); // 表示須要從新join AbstractCoordinator.this.rejoinNeeded = true;// 表示當前協調器須要重加入 future.raise(Errors.REBALANCE_IN_PROGRESS); } else if (error == Errors.ILLEGAL_GENERATION) {// 世代錯誤 log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId); // 表示須要從新join AbstractCoordinator.this.rejoinNeeded = true;// 表示當前協調器須要重加入 future.raise(Errors.ILLEGAL_GENERATION); } else if (error == Errors.UNKNOWN_MEMBER_ID) {// 協調器不認識這個成員 log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId); memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; // 表示須要從新join AbstractCoordinator.this.rejoinNeeded = true;// 表示當前協調器須要重加入 future.raise(Errors.UNKNOWN_MEMBER_ID); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {// 組認證失敗 future.raise(new GroupAuthorizationException(groupId)); } else { future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message())); } } }