Java按時間梯度實現異步回調接口

1. 背景java

  在業務處理完以後,須要調用其餘系統的接口,將相應的處理結果通知給對方,如果同步請求,假如調用的系統出現異常或是宕機等事件,會致使自身業務受到影響,事務會一直阻塞,數據庫鏈接不夠用等異常現象,能夠經過異步回調來防止阻塞,但異步的狀況還存在一個問題,若調用一次不成功的話接下來怎麼處理?這個地方就須要按時間梯度回調,好比前期按10s間隔回調,回調3次,若不成功按30s回調,回調2次,再不成功按分鐘回調,依次類推……至關於給了對方系統恢復的時間,不可能一直處於異常或宕機等異常狀態,如果再不成功能夠再經過人工干預的手段去處理了,具體業務具體實現。git

2. 技術實現github

  大致實現思路以下圖,此過程用到兩個隊列,當前隊列和Next隊列,當前隊列用來存放第一次須要回調的數據對象,若是調用不成功則放入Next隊列,按照制定的時間策略再繼續回調,直到成功或最終持久化後人工接入處理。數據庫

 

  用到的技術以下:json

  • http請求庫,retrofit2
  • 隊列,LinkedBlockingQueue
  • 調度線程池,ScheduledExecutorService

 

3. 主要代碼說明api

3.1 回調時間梯度的策略設計異步

採用枚舉來對策略規則進行處理,便於代碼上的維護,該枚舉設計三個參數,級別、回調間隔、回調次數;ide

/**
 * 回調策略
 */
public enum CallbackType {

    //等級1,10s執行3次
    SECONDS_10(1, 10, 3),
    //等級2,30s執行2次
    SECONDS_30(2, 30, 2),
    //等級3,60s執行2次
    MINUTE_1(3, 60, 2),
    //等級4,5min執行1次
    MINUTE_5(4, 300, 1),
    //等級5,30min執行1次
    MINUTE_30(5, 30*60, 1),
    //等級6,1h執行2次
    HOUR_1(6, 60*60, 1),
    //等級7,3h執行2次
    HOUR_3(7, 60*60*3, 1),
    //等級8,6h執行2次
    HOUR_6(8, 60*60*6, 1);

    //級別
    private int level;
    //回調間隔時間 秒
    private int intervalTime;
    //回調次數
    private int count;
}

3.2 數據傳輸對象設計ui

聲明抽象父類,便於其餘對象調用傳輸繼承。spa

/**
 * 消息對象父類
 */
public abstract class MessageInfo {

    //開始時間
    private long startTime;
    //更新時間
    private long updateTime;
    //是否回調成功
    private boolean isSuccess=false;
    //回調次數
    private int count=0;
    //回調策略
    private CallbackType callbackType;
}

要傳輸的對象,繼承消息父類;

/**
 * 工單回調信息
 */
public class WorkOrderMessage extends MessageInfo {

    //車架號
    private String vin;
    //工單號
    private String workorderno;
    //工單狀態
    private Integer status;
    //工單緣由
    private String reason;
    //操做用戶
    private Integer userid;
}

3.3 調度線程池的使用

//聲明線程池,大小爲16
private ScheduledExecutorService pool = Executors.newScheduledThreadPool(16);

...略
while (true){ //從隊列獲取數據,交給定時器執行 try { WorkOrderMessage message = MessageQueue.getMessageFromNext(); long excueTime = message.getUpdateTime()+message.getCallbackType().getIntervalTime()* 1000; long t = excueTime - System.currentTimeMillis(); if (t/1000 < 5) {//5s以內將要執行的數據提交給調度線程池 System.out.println("MessageHandleNext-知足定時器執行條件"+JSONObject.toJSONString(message)); pool.schedule(new Callable<Boolean>() { @Override public Boolean call() throws Exception { remoteCallback(message); return true; } }, t, TimeUnit.MILLISECONDS); }else { MessageQueue.putMessageToNext(message); } } catch (InterruptedException e) { System.out.println(e); } }

 

3.4 retrofit2的使用,方便好用。

具體可查看官網相關文檔進行了解,用起來仍是比較方便的。http://square.github.io/retrofit/

retrofit初始化:

import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;

public class RetrofitHelper {

    private static final String HTTP_URL = "http://baidu.com/";
    private static Retrofit retrofit;

    public static Retrofit instance(){
        if (retrofit == null){
            retrofit = new Retrofit.Builder()
                    .baseUrl(HTTP_URL)
                    .addConverterFactory(GsonConverterFactory.create())
                    .build();
        }
        return retrofit;
    }
}

若是須要修改超時時間,鏈接時間等能夠這樣初始話,Retrofit採用OkHttpClient

import okhttp3.OkHttpClient;
import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;

import java.util.concurrent.TimeUnit;

public class RetrofitHelper {

    private static final String HTTP_URL = "http://baidu.com/";
    private static Retrofit retrofit;

    public static Retrofit instance(){
        if (retrofit == null){
            retrofit = new Retrofit.Builder()
                    .baseUrl(HTTP_URL)
                    .client(new OkHttpClient.Builder()
                            .connectTimeout(30, TimeUnit.SECONDS)//鏈接時間
                            .readTimeout(30, TimeUnit.SECONDS)//讀時間
                            .writeTimeout(30, TimeUnit.SECONDS)//寫時間
                            .build())
                    .addConverterFactory(GsonConverterFactory.create())
                    .build();
        }
        return retrofit;
    }
}

Retrofit使用經過接口調用,要先聲明一個接口;

import com.alibaba.fastjson.JSONObject;
import com.woasis.callbackdemo.bean.WorkOrderMessage;
import retrofit2.Call;
import retrofit2.http.Body;
import retrofit2.http.POST;

public interface WorkOrderMessageInterface {

    @POST("/api")
    Call<JSONObject> updateBatteryInfo(@Body WorkOrderMessage message);

}

接口和實例對象準備好了,接下來就是調用;

private void remoteCallback(WorkOrderMessage message){
        //實例接口對象
        WorkOrderMessageInterface workOrderMessageInterface = RetrofitHelper.instance().create(WorkOrderMessageInterface.class);
        
        //調用接口方法
        Call<JSONObject> objectCall = workOrderMessageInterface.updateBatteryInfo(message);
        System.out.println("遠程調用執行:"+new Date());

        //異步調用執行
        objectCall.enqueue(new Callback<JSONObject>() {
            @Override
            public void onResponse(Call<JSONObject> call, Response<JSONObject> response) {
                System.out.println("MessageHandleNext****調用成功"+Thread.currentThread().getId());
                message.setSuccess(true);
                System.out.println("MessageHandleNext-回調成功"+JSONObject.toJSONString(message));
            }

            @Override
            public void onFailure(Call<JSONObject> call, Throwable throwable) {
                System.out.println("MessageHandleNext++++調用失敗"+Thread.currentThread().getId());
                //失敗後再將數據放入隊列
                try {
                    //對回調策略初始化
                    long currentTime = System.currentTimeMillis();
                    message.setUpdateTime(currentTime);
                    message.setSuccess(false);
                    CallbackType callbackType = message.getCallbackType();
                    //獲取等級
                    int level = CallbackType.getLevel(callbackType);
                    //獲取次數
                    int count = CallbackType.getCount(callbackType);
                    //若是等級已經最高,則再也不回調
                    if (CallbackType.HOUR_6.getLevel() == callbackType.getLevel() && count == message.getCount()){
                        System.out.println("MessageHandleNext-等級最高,再也不回調, 線下處理:"+JSONObject.toJSONString(message));
                    }else {
                        //看count是否最大,count次數最大則增長level
                        if (message.getCount()<callbackType.getCount()){
                            message.setCount(message.getCount()+1);
                        }else {//若是不小,則增長level
                            message.setCount(1);
                            level += 1;
                            message.setCallbackType(CallbackType.getTypeByLevel(level));
                        }
                        MessageQueue.putMessageToNext(message);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    System.out.println("MessageHandleNext-放入隊列數據失敗");
                }
            }
        });
    }

3.5結果實現

4.總結

本次實現了按照時間梯度去相應其餘系統的接口,再也不致使自己業務因其餘系統的異常而阻塞。請大佬們看實現有沒有不合理的地方,歡迎批評指正。

源碼:https://github.com/liuzwei/callback-demo

相關文章
相關標籤/搜索