LinkedIn的Parseq + ning.httpclient異步請求框架的使用

1. Maven配置

<dependency>
    <groupId>com.linkedin.parseq</groupId>
    <artifactId>parseq</artifactId>
    <version>2.6.3</version>
</dependency>
<dependency>
    <groupId>com.linkedin.parseq</groupId>
    <artifactId>parseq-http-client</artifactId>
    <version>2.6.3</version>
</dependency>

2. 建立以及關閉線程池引擎Engine

// Executor handed to the engine for running task work.
private static ExecutorService taskService;
// Scheduler handed to the engine as its timer.
private static ScheduledExecutorService timerService;
// Engine assembled from the two executors in the static initializer below.
private static Engine engine;
// JSON mapper used to serialize results.
private static JsonMapper mapper = new JsonMapper();

static {
    final int cores = Runtime.getRuntime().availableProcessors();

    // Elastic pool: `cores` core threads, at most 2 * cores, idle extras retire after 30s.
    // The queue holds up to 100 pending tasks; once it is full and the pool is at its
    // maximum, CallerRunsPolicy executes the task on the submitting thread.
    taskService = new ThreadPoolExecutor(
            cores,
            cores * 2,
            30,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(100),
            new CallerRunsPolicy());

    timerService = Executors.newScheduledThreadPool(cores);

    engine = new EngineBuilder()
            .setTaskExecutor(taskService)
            .setTimerScheduler(timerService)
            .build();
}

/**
 * Shuts down the engine, waits up to 3 seconds for it to terminate, and then
 * shuts down the task executor and the timer scheduler.
 *
 * <p>The two executors are shut down in a {@code finally} block so that they are
 * released even if the wait for engine termination is interrupted.
 */
public static void shutdown() {
    try {
        if (engine != null) {
            log.info("shutdown engine");
            engine.shutdown();
            engine.awaitTermination(3, TimeUnit.SECONDS);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can observe the interruption.
        Thread.currentThread().interrupt();
        log.error("interrupted while waiting for engine termination", e);
    } finally {
        if (taskService != null) {
            log.info("shutdown taskService");
            taskService.shutdown();
        }
        if (timerService != null) {
            log.info("shutdown timerService");
            timerService.shutdown();
        }
    }
}

3. 建立簡單請求任務Task

/**
 * Builds an HTTP POST task that yields the response body as a string.
 *
 * <p>The request is wrapped with the retry policy from {@code createRetryPolicy()},
 * each attempt is limited to 5 seconds, and any failure that survives the retries
 * is recovered into the JSON string {@code {"success":false}}.
 *
 * @param taskName
 *            name given to the step that extracts the response body
 * @param url
 *            target URL
 * @param params
 *            query parameters; skipped when {@code null}
 * @param headers
 *            request headers; skipped when {@code null}
 * @param body
 *            request body; skipped when {@code null}
 * @return a task producing the response body, or the failure JSON on error
 */
public static Task<String> createPostTask(String taskName, String url, List<Param> params, Map<String, String> headers, String body) {
    // The lambda is the task supplier handed to the retry wrapper.
    return Task.withRetryPolicy(createRetryPolicy(), () -> {
        // For a GET request, use HttpClient.get(url) here instead.
        final WrappedRequestBuilder builder = HttpClient.post(url);
        if (body != null) {
            builder.setBody(body);
        }
        if (headers != null) {
            headers.forEach((name, value) -> builder.addHeader(name, value));
        }
        if (params != null) {
            builder.addQueryParams(params);
        }
        return builder.task().map(taskName, Response::getResponseBody).withTimeout(5, TimeUnit.SECONDS);
    }).recover(e -> "{\"success\":false}"); // key was previously "success:" (stray colon inside the key)
}

4.合併簡單任務,以及任務結果處理

回調接口,用以合併處理請求結果

/**
 * Base callback interface for task-result handlers. It only supplies the default
 * logic for publishing a result string to a Spring MVC {@code DeferredResult}.
 */
public interface TaskResultHandler {
    /**
     * Publishes {@code result} to the given Spring MVC asynchronous result holder.
     *
     * NOTE(review): {@code deferredResult} is typed {@code DeferredResult<GwResult>} but a
     * {@code String} is passed to {@code setResult} - confirm the intended conversion.
     *
     * @param deferredResult holder that receives the result; dereferenced without a null check
     * @param result result string to publish
     */
    default void setResult(DeferredResult<GwResult> deferredResult, String result) {
        deferredResult.setResult(result);
    }
}
/**
 * Handler for the result of a single task.
 * Marked with the functional-interface annotation so it can be supplied as a lambda.
 */
@FunctionalInterface
public interface TaskResultHandler1 extends TaskResultHandler {
    /**
     * Processes one task result.
     *
     * @param result result string of the task
     * @return the processed result string
     */
    String handle(String result);
}
/**
 * Handler that combines the results of two tasks into one string.
 */
@FunctionalInterface
public interface TaskResultHandler2 extends TaskResultHandler {
    /**
     * Processes two task results together.
     *
     * @param result1 result string of the first task
     * @param result2 result string of the second task
     * @return the combined result string
     */
    String handle(String result1, String result2);
}

// ..................n個TaskResultHandler.................

任務的合併

/**
 * Combines two tasks via {@code Task.par} and feeds both results to a handler.
 *
 * @param taskName
 *            name given to the combining step
 * @param task1
 *            first task
 * @param task2
 *            second task
 * @param handler
 *            callback that merges the two results into one string
 * @return a task yielding the handler's merged result
 */
public static Task<String> merge(String taskName, Task<String> task1, Task<String> task2, TaskResultHandler2 handler) {
    return Task.par(task1, task2).map(taskName, (first, second) -> {
        // Delegate the combination of both results to the caller-supplied handler.
        return handler.handle(first, second);
    });
}

/**
 * Combines three tasks via {@code Task.par} and feeds all three results to a handler.
 *
 * @param taskName
 *            name given to the combining step
 * @param task1
 *            first task
 * @param task2
 *            second task
 * @param task3
 *            third task
 * @param handler
 *            callback that merges the three results into one string
 * @return a task yielding the handler's merged result
 */
public static Task<String> merge(String taskName, Task<String> task1, Task<String> task2, Task<String> task3,
        TaskResultHandler3 handler) {
    return Task.par(task1, task2, task3).map(taskName, (first, second, third) -> {
        // Delegate the combination of all three results to the caller-supplied handler.
        return handler.handle(first, second, third);
    });
}

// ..................n個merge方法.................

任務開跑!

/**
 * Runs a single task on the engine, passes its result through {@code handler},
 * and publishes the outcome.
 *
 * <p>On success the handled result is set on {@code deferredResult}, or logged when
 * {@code deferredResult} is {@code null}. On failure the error is logged, the
 * system-error result {@code gwResult} is set on {@code deferredResult} (when
 * present), and its JSON form becomes the task's value.
 *
 * NOTE(review): {@code gwResult} is not defined in this snippet; it is expected to be
 * provided by the enclosing class - confirm.
 *
 * @param task task to run
 * @param handler callback that processes the task result
 * @param deferredResult asynchronous result holder; may be {@code null}
 */
public static void run(Task<String> task, TaskResultHandler1 handler, DeferredResult<GwResult> deferredResult) {
    engine.run(task.map("runTask1", (result) -> handler.handle(result))
            .andThen(result -> {
                if (deferredResult != null) {
                    handler.setResult(deferredResult, result);
                } else {
                    log.info(result);
                }
            }).recover(e -> {
                // Keep the failure cause visible instead of dropping it.
                log.error("runTask1 failed", e);
                // Emit the system-error result; deferredResult may legitimately be null
                // (the success branch above already allows for that).
                if (deferredResult != null) {
                    deferredResult.setResult(gwResult);
                }
                return mapper.toJson(gwResult);
            }));
}

/**
 * Merges two tasks with {@code handler}, attaches result publishing and failure
 * recovery, and runs the resulting task on the engine.
 *
 * @param task1 first task
 * @param task2 second task
 * @param handler callback that merges the two results
 * @param deferredResult asynchronous result holder passed to the publishing and recovery steps
 */
public static void run(Task<String> task1, Task<String> task2, TaskResultHandler2 handler,
        DeferredResult<GwResult> deferredResult) {
    final Task<String> merged = merge("runTask2", task1, task2, handler);
    // setResultElsePrintLog does the same as the success branch of the single-task run method.
    final Task<String> published = merged.andThen(setResultElsePrintLog(handler, deferredResult));
    // recoverHandle does the same as the recovery branch of the single-task run method.
    final Task<String> guarded = published.recover(recoverHandle(deferredResult));
    engine.run(guarded);
}

// ..................n個run方法.................

5.異常處理以及重試機制

/**
 * Creates the retry policy used for HTTP tasks: termination is limited to 3 attempts.
 * No custom error classifier is configured.
 *
 * @return the built retry policy
 */
private static RetryPolicy createRetryPolicy() {
    final TerminationPolicy atMostThreeAttempts = TerminationPolicy.limitAttempts(3);
    return new RetryPolicyBuilder()
            .setTerminationPolicy(atMostThreeAttempts)
            .build();
}
相關文章
相關標籤/搜索