1. 背景java
在業務處理完以後,須要調用其餘系統的接口,將相應的處理結果通知給對方,如果同步請求,假如調用的系統出現異常或是宕機等事件,會致使自身業務受到影響,事務會一直阻塞,數據庫鏈接不夠用等異常現象,能夠經過異步回調來防止阻塞,但異步的狀況還存在一個問題,若調用一次不成功的話接下來怎麼處理?這個地方就須要按時間梯度回調,好比前期按10s間隔回調,回調3次,若不成功按30s回調,回調2次,再不成功按分鐘回調,依次類推……至關於給了對方系統恢復的時間,不可能一直處於異常或宕機等異常狀態,如果再不成功能夠再經過人工干預的手段去處理了,具體業務具體實現。git
2. 技術實現github
大致實現思路以下圖,此過程用到兩個隊列,當前隊列和Next隊列,當前隊列用來存放第一次須要回調的數據對象,若是調用不成功則放入Next隊列,按照制定的時間策略再繼續回調,直到成功或最終持久化後人工接入處理。數據庫
用到的技術以下:json
3. 主要代碼說明api
3.1 回調時間梯度的策略設計異步
採用枚舉來對策略規則進行處理,便於代碼上的維護,該枚舉設計三個參數,級別、回調間隔、回調次數;ide
/** * 回調策略 */ public enum CallbackType { //等級1,10s執行3次 SECONDS_10(1, 10, 3), //等級2,30s執行2次 SECONDS_30(2, 30, 2), //等級3,60s執行2次 MINUTE_1(3, 60, 2), //等級4,5min執行1次 MINUTE_5(4, 300, 1), //等級5,30min執行1次 MINUTE_30(5, 30*60, 1), //等級6,1h執行2次 HOUR_1(6, 60*60, 1), //等級7,3h執行2次 HOUR_3(7, 60*60*3, 1), //等級8,6h執行2次 HOUR_6(8, 60*60*6, 1); //級別 private int level; //回調間隔時間 秒 private int intervalTime; //回調次數 private int count; }
3.2 數據傳輸對象設計ui
聲明抽象父類,便於其餘對象調用傳輸繼承。spa
/** * 消息對象父類 */ public abstract class MessageInfo { //開始時間 private long startTime; //更新時間 private long updateTime; //是否回調成功 private boolean isSuccess=false; //回調次數 private int count=0; //回調策略 private CallbackType callbackType; }
要傳輸的對象,繼承消息父類;
/** * 工單回調信息 */ public class WorkOrderMessage extends MessageInfo { //車架號 private String vin; //工單號 private String workorderno; //工單狀態 private Integer status; //工單緣由 private String reason; //操做用戶 private Integer userid; }
3.3 調度線程池的使用
//聲明線程池,大小爲16 private ScheduledExecutorService pool = Executors.newScheduledThreadPool(16);
...略 while (true){ //從隊列獲取數據,交給定時器執行 try { WorkOrderMessage message = MessageQueue.getMessageFromNext(); long excueTime = message.getUpdateTime()+message.getCallbackType().getIntervalTime()* 1000; long t = excueTime - System.currentTimeMillis(); if (t/1000 < 5) {//5s以內將要執行的數據提交給調度線程池 System.out.println("MessageHandleNext-知足定時器執行條件"+JSONObject.toJSONString(message)); pool.schedule(new Callable<Boolean>() { @Override public Boolean call() throws Exception { remoteCallback(message); return true; } }, t, TimeUnit.MILLISECONDS); }else { MessageQueue.putMessageToNext(message); } } catch (InterruptedException e) { System.out.println(e); } }
3.4 retrofit2的使用,方便好用。
具體可查看官網相關文檔進行了解,用起來仍是比較方便的。http://square.github.io/retrofit/
retrofit初始化:
import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory; public class RetrofitHelper { private static final String HTTP_URL = "http://baidu.com/"; private static Retrofit retrofit; public static Retrofit instance(){ if (retrofit == null){ retrofit = new Retrofit.Builder() .baseUrl(HTTP_URL) .addConverterFactory(GsonConverterFactory.create()) .build(); } return retrofit; } }
若是須要修改超時時間,鏈接時間等能夠這樣初始話,Retrofit採用OkHttpClient
import okhttp3.OkHttpClient; import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory; import java.util.concurrent.TimeUnit; public class RetrofitHelper { private static final String HTTP_URL = "http://baidu.com/"; private static Retrofit retrofit; public static Retrofit instance(){ if (retrofit == null){ retrofit = new Retrofit.Builder() .baseUrl(HTTP_URL) .client(new OkHttpClient.Builder() .connectTimeout(30, TimeUnit.SECONDS)//鏈接時間 .readTimeout(30, TimeUnit.SECONDS)//讀時間 .writeTimeout(30, TimeUnit.SECONDS)//寫時間 .build()) .addConverterFactory(GsonConverterFactory.create()) .build(); } return retrofit; } }
Retrofit使用經過接口調用,要先聲明一個接口;
import com.alibaba.fastjson.JSONObject; import com.woasis.callbackdemo.bean.WorkOrderMessage; import retrofit2.Call; import retrofit2.http.Body; import retrofit2.http.POST; public interface WorkOrderMessageInterface { @POST("/api") Call<JSONObject> updateBatteryInfo(@Body WorkOrderMessage message); }
接口和實例對象準備好了,接下來就是調用;
private void remoteCallback(WorkOrderMessage message){ //實例接口對象 WorkOrderMessageInterface workOrderMessageInterface = RetrofitHelper.instance().create(WorkOrderMessageInterface.class); //調用接口方法 Call<JSONObject> objectCall = workOrderMessageInterface.updateBatteryInfo(message); System.out.println("遠程調用執行:"+new Date()); //異步調用執行 objectCall.enqueue(new Callback<JSONObject>() { @Override public void onResponse(Call<JSONObject> call, Response<JSONObject> response) { System.out.println("MessageHandleNext****調用成功"+Thread.currentThread().getId()); message.setSuccess(true); System.out.println("MessageHandleNext-回調成功"+JSONObject.toJSONString(message)); } @Override public void onFailure(Call<JSONObject> call, Throwable throwable) { System.out.println("MessageHandleNext++++調用失敗"+Thread.currentThread().getId()); //失敗後再將數據放入隊列 try { //對回調策略初始化 long currentTime = System.currentTimeMillis(); message.setUpdateTime(currentTime); message.setSuccess(false); CallbackType callbackType = message.getCallbackType(); //獲取等級 int level = CallbackType.getLevel(callbackType); //獲取次數 int count = CallbackType.getCount(callbackType); //若是等級已經最高,則再也不回調 if (CallbackType.HOUR_6.getLevel() == callbackType.getLevel() && count == message.getCount()){ System.out.println("MessageHandleNext-等級最高,再也不回調, 線下處理:"+JSONObject.toJSONString(message)); }else { //看count是否最大,count次數最大則增長level if (message.getCount()<callbackType.getCount()){ message.setCount(message.getCount()+1); }else {//若是不小,則增長level message.setCount(1); level += 1; message.setCallbackType(CallbackType.getTypeByLevel(level)); } MessageQueue.putMessageToNext(message); } } catch (InterruptedException e) { e.printStackTrace(); System.out.println("MessageHandleNext-放入隊列數據失敗"); } } }); }
3.5結果實現
4.總結
本次實現了按照時間梯度去相應其餘系統的接口,再也不致使自己業務因其餘系統的異常而阻塞。請大佬們看實現有沒有不合理的地方,歡迎批評指正。