不一樣於Spring中複雜的實現,在Kafka中,對於Listener和對應的Adaptor,都十分的簡單與簡潔。下面咱們就以Kafka的實現做爲參考,簡單說說如何去實現一個監聽器與適配器。java
咱們能夠看到,在這個RequestFuture類中,有一個 addListener 方法,這個方法實際上很是簡單,就是將一個RequestFutureListener<\T> 對象添加到維護的 List <\RequestFutureListener<\T>> listeners 中。json
public class RequestFuture<T> { private boolean isDone = false; private T value; private RuntimeException exception; private List<RequestFutureListener<T>> listeners = new ArrayList<>(); /** * Check whether the response is ready to be handled * * 接收是否已經準備好處理 * * @return true if the response is ready, false otherwise */ public boolean isDone() { return isDone; } /** * Get the value corresponding to this request (only available if the request succeeded) * * 獲取此請求對應的值(僅當請求成功) * * @return the value if it exists or null */ public T value() { return value; } /** * Check if the request succeeded; * * 檢查請求是否成功 * * @return true if the request completed and was successful */ public boolean succeeded() { return isDone && exception == null; } /** * Check if the request failed. * * 檢查請求是否失敗 * * @return true if the request completed with a failure */ public boolean failed() { return isDone && exception != null; } /** * Get the exception from a failed result (only available if the request failed) * * 獲取請求失敗的異常(僅失敗時) * * @return The exception if it exists or null */ public RuntimeException exception() { return exception; } /** * Complete the request successfully. After this call, {@link #succeeded()} will return true * and the value can be obtained through {@link #value()}. * * 請求成功完成。調用完這個方法後, succeeded會返回true,而且能夠經過 value() 方法來得到結果 * * @param value corresponding value (or null if there is none) */ public void complete(T value) { if (isDone) { throw new IllegalStateException("Invalid attempt to complete a request future which is already complete"); } this.value = value; this.isDone = true; // 調用 listener的onSuccess fireSuccess(); } /** * Raise The request will be marked as failed, and the caller can either * handle the exception or throw it. * * @param e corresponding exception to be passed to caller */ public void raise(RuntimeException e) { if (isDone) { throw new IllegalStateException("Invalid attempt to complete a request future which is already complete"); } this.exception = e; this.isDone = true; // 調用 listener的onFail fireFailure(); } /** * Raise an error. The request will be marked as failed. * * @param error corresponding error to be passed to caller */ public void raise(Errors error) { raise(error.exception()); } private void fireSuccess() { for (RequestFutureListener<T> listener : listeners) listener.onSuccess(value); } private void fireFailure() { for (RequestFutureListener<T> listener : listeners) listener.onFailure(exception); } /** * Add a listener which will be notified when the future completes */ public void addListener(RequestFutureListener<T> listener) { if (isDone) { if (exception != null) { listener.onFailure(exception); } else { listener.onSuccess(value); } } else { this.listeners.add(listener); } } }
而 listener 的調用也十分簡單,在listener上註冊的 onSuccess 和 onFailure 會分別在 fireSuccess 和 fireFailure 時調用。設計模式
private void fireSuccess() { for (RequestFutureListener<T> listener : listeners) listener.onSuccess(value); } private void fireFailure() { for (RequestFutureListener<T> listener : listeners) listener.onFailure(exception); }
而兩個 fireXxxx方法,則是 complete 與 raise 時調用。網絡
/** * Complete the request successfully. After this call, {@link #succeeded()} will return true * and the value can be obtained through {@link #value()}. * * 請求成功完成。調用完這個方法後, succeeded會返回true,而且能夠經過 value() 方法來得到結果 * * @param value corresponding value (or null if there is none) */ public void complete(T value) { if (isDone) { throw new IllegalStateException("Invalid attempt to complete a request future which is already complete"); } this.value = value; this.isDone = true; // 調用 listener的onSuccess fireSuccess(); } /** * Raise The request will be marked as failed, and the caller can either * handle the exception or throw it. * * @param e corresponding exception to be passed to caller */ public void raise(RuntimeException e) { if (isDone) { throw new IllegalStateException("Invalid attempt to complete a request future which is already complete"); } this.exception = e; this.isDone = true; // 調用 listener的onFail fireFailure(); }
這裏只說正向流程,總結一下,從compute方法開始。咱們無需管是誰,經過何種方式調用了這個方法。在compute中,咱們將會調用fireSuccess方法,而fireSuccess則十分簡單,就是將註冊過的監聽器列表循環一下,分別調用其onSuccess方法。異步
public void complete(T value) { if (isDone) { throw new IllegalStateException("Invalid attempt to complete a request future which is already complete"); } this.value = value; this.isDone = true; // 調用 listener的onSuccess fireSuccess(); }
咱們先無論RequestFuture裏面那個泛型ClientResponse是個什麼東西,咱們把它想象成一個Future,future裏面是一個Json對象。socket
假如咱們本來的請求構建是這樣的(僞代碼,和Kafka不要緊):tcp
public void Future request(String url, Param param) Future future = new Future(param); future.addListener(new Listener() { @Override public void onSuccess(Json json) { check(json); } @Override public void onFailure() { xxxxx; } });// 驗證返回是否有效 future.addListener(new Listener() { @Override public void onSuccess(Json json) { console(json); } @Override public void onFailure() { xxxxx; } });// 打印日誌 future.addListener(new Listener() { @Override public void onSuccess(Json json) { persist(json); } @Override public void onFailure() { xxxxx; } });// 記錄請求信息以進行統計 xxxxx......// 更多處理 sendRequest(url, future).fetch(() -> { void success(Json json){ future.complete(json); } });// 異步發送請求 return future; } ............................ Json json = future.value();// 獲取請求結果
咱們固然能夠爲新的Future<\Meat> 寫一個新的sendRequest方法,同時也要爲它寫一個request方法。 咱們首先爲Future對象新增泛型(固然也能夠直接用一個新的對象),而後爲其單獨寫一套request方法與sendRequest(這裏就不詳細寫了)方法。ide
public void Future<Meat> request4Meat(String url, Param param) Future<Meat> future = new Future<>(param); future.addListener(new Listener<Meat>() { @Override public void onSuccess(Meat meat) { check(meat);// 原future裏的驗證返回是否有效,重載的方法 checkMeat(meat);// 進一步校驗(好比說check只是校驗返回的格式對不對,而checkMeat // 要校驗肉是否新鮮,是前腿肉仍是五花肉等等) } @Override public void onFailure() { xxxxx; } });// 驗證返回是否有效 future.addListener(new Listener<Meat>() { @Override public void onSuccess(Meat meat) { console(meat);// 平常輸出 consoleMeat();// 和肉有關的輸出 } @Override public void onFailure() { xxxxx; } });// 打印日誌 future.addListener(new Listener<Meat>() { @Override public void onSuccess(Meat meat) { persist(meat);// 記錄普通的請求信息,好比請求發起時間,結束時間,請求了哪個url等等 persistMeat(meat);// 記錄肉質,肉的來源,哪裏生產的等等,和肉相關的信息 } @Override public void onFailure() { xxxxx; } });// 記錄請求信息以進行統計 xxxxx......// 更多處理 sendRequestMeat(url, future).fetch(() -> { void success(Meat meat){ future.complete(meat); } });// 異步發送請求,咱們這裏還要將返回來的對象封裝成Meat對象,而再也不是Json對象。 return future; } ............................ Meat meat = future.value();// 獲取請求結果
在這個例子中,很明顯的特色就是:<font color='red'>Future<\Json>的請求的過程和處理的流程是Future<\Meat>的子集。</font>fetch
若是每個新的Future咱們都爲其寫一套代碼,不只僅會形成代碼的臃腫。另外,若是傳輸的方式改變了,好比我想將tcp請求改爲socket請求,那麼全部的Future都必須修改。this
以前看到一篇文章裏很好的解釋(忘了在哪裏看到的,對不起原做者),咱們要取一個快遞,不管是咱們讓快遞小哥哥放在門口,仍是咱們本身去拿,仍是放在快遞櫃,首先都有一個快遞從發貨人開始發貨,而後交給快遞小哥的這個過程。中途的不管什麼處理,好比說快遞丟失處理,快遞被燒了,仍是快遞正常送達派件中心。這些操做都是相同的,咱們徹底沒有必要爲了要使用快遞櫃去接收而重寫一套完整的邏輯。
除了上面的大白話,這裏引用其餘博客對適配器模式的解釋:
適配器模式(Adapter Pattern)是做爲兩個不兼容的接口之間的橋樑。這種類型的設計模式屬於結構型模式,它結合了兩個獨立接口的功能。這種模式涉及到一個單一的類,該類負責加入獨立的或不兼容的接口功能。舉個真實的例子,讀卡器是做爲內存卡和筆記本之間的適配器。您將內存卡插入讀卡器,再將讀卡器插入筆記本,這樣就能夠經過筆記原本讀取內存卡。主要解決在軟件系統中,經常要將一些」現存的對象」放到新的環境中,而新環境要求的接口是現對象不能知足的。
大話設計模式中程傑老師給出的定義是,適配器模式:將一個類的接口轉換成客戶但願的另一個接口。適配器模式使得本來因爲接口不兼容而不能一塊兒工做的那些類能夠一塊兒工做。
系統數據和行爲都正確,但接口不符合時,咱們應該考慮用適配器,目的是使控制範圍以外的一個原有對象與某個接口匹配。
因而咱們引入
public abstract class RequestFutureAdapter<F, T> { public abstract void onSuccess(F value, RequestFuture<T> future); public void onFailure(RuntimeException e, RequestFuture<T> future) { future.raise(e); } }
Kafka在RequestFuture中加入了適配方法,這個適配方法十分簡單,實際上就是new了一個新的Future對象,而後在原來的future對象在success時調用適配器中,的onSuccess方法。
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) { final RequestFuture<S> adapted = new RequestFuture<S>(); addListener(new RequestFutureListener<T>() { // 實際上這裏就是讓原來的 future 在succeed 時,會調用 adapter 中的 onSuccess 方法 @Override public void onSuccess(T value) { adapter.onSuccess(value, adapted); } @Override public void onFailure(RuntimeException e) { adapter.onFailure(e, adapted); } }); // 返回的這個新的 future 對象 return adapted; }
在結合上面說的快遞過程,咱們能夠這麼作:
RequestFuture<快遞櫃> future = request(url, param).compose(new RequestFutureAdapter<Json, 快遞櫃>() { @Override public void onSuccess(Json json, RequestFuture<快遞櫃> future) { 快遞櫃 k = json.parse();// 將json(快遞)對象轉換爲快遞櫃對象,也就是扔到快遞櫃中 if (k.check()){// 若是扔到快遞櫃中的校驗都沒問題 future.complete(k);// 沒問題的話,能夠通知咱們新的RequestFuture<快遞櫃>對象正常了 } else { future.raise(new 快遞櫃Exception); } } // 失敗的就不寫了,這多是快遞在路上出問題了,快遞都沒到快遞小哥哥手上 });
而後咱們能夠在新的 RequestFuture<快遞櫃> 上添加一些和快遞櫃相關的操做。好比說,
future.addListener(new Listener<快遞櫃>() { @Override public void onSuccess(快遞櫃 快遞櫃對象) { sendSucceededMsg(快遞櫃對象.getPhoneNum);// 告訴我快遞到快遞櫃了 } @Override public void onFailure() { sout(e)// 因爲某種緣由失敗,好比說快遞櫃滿了。 } });
甚至能夠這樣:
RequestFuture<拿快遞>futureN(快遞櫃future).compose(new RequestFutureAdapter<快遞櫃, 拿快遞>() { @Override public void onSuccess(快遞櫃 快遞櫃對象, RequestFuture<拿快遞> future) { 拿快遞 n = 快遞櫃對象.parse(); if (n.check()){// 咱們來拿快遞了 future.complete(n);// 成功取走了快遞 } else { future.raise(new 滯留Exception); } } // 失敗的就不寫了 }); futureN.addListener(xxxx); // xxxx,成功拿了快遞後,要拆快遞,等等。 // 若是滯留了,要怎麼處理等等
首先,不使用適配器的狀況下,咱們固然能夠在原有的future上,經過添加監聽器,接着分支判斷的方式,來實現上述功能,不過比較噁心。
適配器 + 監聽器 的模式,能夠發現,咱們不只將一種對象適配成了另外一種對象。並且<font color='red'>適配前的狀態,和適配後Future的狀態,是隔離的。咱們經過適配器,將他們的狀態進行了隔離。</font>前面Future對象的success,並不表明適配後的對象就是success的。
好比說,快遞success抵達了配送中心,可能會在第一個適配器,也就是快遞轉快遞櫃適配器中被發現,這個快遞沒法通過check。好比快遞太大了,根本放不進快遞櫃。
這時候就會在快遞櫃適配器中,喚醒raise操做,而不會調用complete方法,也就是不會通知我來拿快遞,間接也不會在第二個適配器中,出現我去快遞櫃拿快遞這種操做了。在這種狀況下,就會調用Future<快遞櫃>的全部 onFailure方法,好比通知快遞小哥,快遞放不下;通知咱們,沒法使用快遞櫃寄存這種方式進行取快遞,須要本身來拿等等。
kafka在心跳處理中,首先拿到正常的clientResponse請求,將clientResponse轉換成了心跳對象(實際上就是 errorCode = struct.getShort(ERROR_CODE_KEY_NAME);),將clientResponse中struct(能夠看做是包體)的errorCode取出。
最後根據errorCode來判斷是否要從新加入消費組,協調器是否掛了等等。
緊接着
/** * 這個適配器並無作什麼特殊的邏輯處理,只是判斷請求成功或者失敗(失敗的各類類型)來進行各類補償操做, * 好比從新加入消費組之類的。 */ private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse/* 從這個 */, Void/* 到這個 */> { // 原 client.send(coordinator, ApiKeys.HEARTBEAT, req) 會返回的對象。 protected ClientResponse response; @Override public void onSuccess(ClientResponse clientResponse, RequestFuture<Void> future) { // 讓咱們看看在成功時都作了什麼 try { this.response = clientResponse; // 將clientResponse解析爲心跳包 HeartbeatResponse HeartbeatResponse responseObj = parse(clientResponse); // 處理心跳包 handle(responseObj, future); } catch (RuntimeException e) { if (!future.isDone()) { future.raise(e); } } } @Override public void onFailure(RuntimeException e, RequestFuture<Void> future) { // mark the coordinator as dead if (e instanceof DisconnectException) { coordinatorDead(); } future.raise(e); } @Override public HeartbeatResponse parse(ClientResponse response) { // 提取返回的body (Struct對象),new一個HeartbeatResponse return new HeartbeatResponse(response.responseBody()); } @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { sensors.heartbeatLatency.record(response.requestLatencyMs()); // 將heartbeatResponse中short類型的 errCode 轉爲 Error對象 Errors error = Errors.forCode(heartbeatResponse.errorCode()); if (error == Errors.NONE) { log.debug("Received successful heartbeat response for group {}", groupId); // 沒報錯,直接將這個引用置爲成功,成功後會調用 RequestFuture<Void> future 的 onSuccess方法 future.complete(null); } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE// 組協調器不可用 || error == Errors.NOT_COORDINATOR_FOR_GROUP) {// 沒有協調器 log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.", groupId, coordinator); coordinatorDead();// 標記協調器掛了 future.raise(error); } else if (error == Errors.REBALANCE_IN_PROGRESS) {// 當前組在重負載,因此須要重加入 log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId); // 表示須要從新join AbstractCoordinator.this.rejoinNeeded = true;// 表示當前協調器須要重加入 future.raise(Errors.REBALANCE_IN_PROGRESS); } else if (error == Errors.ILLEGAL_GENERATION) {// 世代錯誤 log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId); // 表示須要從新join AbstractCoordinator.this.rejoinNeeded = true;// 表示當前協調器須要重加入 future.raise(Errors.ILLEGAL_GENERATION); } else if (error == Errors.UNKNOWN_MEMBER_ID) {// 協調器不認識這個成員 log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId); memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; // 表示須要從新join AbstractCoordinator.this.rejoinNeeded = true;// 表示當前協調器須要重加入 future.raise(Errors.UNKNOWN_MEMBER_ID); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {// 組認證失敗 future.raise(new GroupAuthorizationException(groupId)); } else { future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message())); } } }