1. Maven配置
<dependency> <groupId>com.linkedin.parseq</groupId> <artifactId>parseq</artifactId> <version>2.6.3</version> </dependency> <dependency> <groupId>com.linkedin.parseq</groupId> <artifactId>parseq-http-client</artifactId> <version>2.6.3</version> </dependency>
2. 建立以及關閉線程池引擎Engine
private static ExecutorService taskService; private static ScheduledExecutorService timerService; private static Engine engine; private static JsonMapper mapper = new JsonMapper(); static { int numCores = Runtime.getRuntime().availableProcessors(); //可伸縮的線程池 taskService = new ThreadPoolExecutor(numCores, numCores * 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100), new CallerRunsPolicy()); timerService = Executors.newScheduledThreadPool(numCores); engine = new EngineBuilder().setTaskExecutor(taskService).setTimerScheduler(timerService).build(); } public static void shutdown() { try { if (engine != null) { log.info("shutdown engine"); engine.shutdown(); engine.awaitTermination(3, TimeUnit.SECONDS); } if (taskService != null) { log.info("shutdown taskService"); taskService.shutdown(); } if (timerService != null) { log.info("shutdown timerService"); timerService.shutdown(); } } catch (InterruptedException e) { e.printStackTrace(); } }
3. 建立簡單請求任務Task
/** * 建立Post任務 * * @param taskName * 任務名稱 * @param url * 目標連接 * @param params * 查詢參數 * @param headers * 報頭 * @param body * 報文 * @return */ public static Task<String> createPostTask(String taskName, String url, List<Param> params, Map<String, String> headers, String body) { //設置重試機制 Task<String> reusableTask = Task.withRetryPolicy(createRetryPolicy(), () -> { //final WrappedRequestBuilder builder = HttpClient.get(url); //Get請求 final WrappedRequestBuilder builder = HttpClient.post(url); if (body != null) builder.setBody(body); if (headers != null) headers.entrySet().forEach(entry -> builder.addHeader(entry.getKey(), entry.getValue())); if (params != null) builder.addQueryParams(params); return builder.task().map(taskName, Response::getResponseBody).withTimeout(5, TimeUnit.SECONDS); }).recover(e -> "{\"success:\":false}"); return reusableTask; }
4.合併簡單任務,以及任務結果處理
回調接口,用以合併處理請求結果java
public interface TaskResultHandler { //SpringMvc異步請求 default void setResult(DeferredResult<GwResult> deferredResult, String result) { deferredResult.setResult(result); } } //函數式接口註釋 @FunctionalInterface public interface TaskResultHandler1 extends TaskResultHandler { String handle(String result); } @FunctionalInterface public interface TaskResultHandler2 extends TaskResultHandler { String handle(String result1, String result2); } // ..................n個TaskResultHandler.................
任務的合併app
/** * 合併任務 * * @param taskName * 任務名稱 * @param task1 * @param task2 * @param handler * 任務結果處理 * @return */ public static Task<String> merge(String taskName, Task<String> task1, Task<String> task2, TaskResultHandler2 handler) { return Task.par(task1, task2) //合併處理兩個任務結果 .map(taskName, (result1, result2) -> handler.handle(result1, result2)); } public static Task<String> merge(String taskName, Task<String> task1, Task<String> task2, Task<String> task3, TaskResultHandler3 handler) { return Task.par(task1, task2, task3) //合併處理三個任務結果 .map(taskName,(result1, result2, result3) -> handler.handle(result1, result2, result3)); } // ..................n個merge方法.................
任務開跑!異步
public static void run(Task<String> task, TaskResultHandler1 handler, DeferredResult<GwResult> deferredResult) { engine.run(task.map("runTask1", (result) -> handler.handle(result)) .andThen(result -> { if (deferredResult != null) handler.setResult(deferredResult, result); else log.info(result); }).recover(e -> { //輸出系統錯誤結果 deferredResult.setResult(gwResult); return mapper.toJson(gwResult); })); } public static void run(Task<String> task1, Task<String> task2, TaskResultHandler2 handler, DeferredResult<GwResult> deferredResult) { Task<String> tasks = merge("runTask2", task1, task2, handler) //setResultElsePrintLog與上面的方法相同 .andThen(setResultElsePrintLog(handler, deferredResult)) ////recoverHandle與上面的方法相同 .recover(recoverHandle(deferredResult)); engine.run(tasks); } // ..................n個run方法.................
5.異常處理以及重試機制
private static RetryPolicy createRetryPolicy() { return new RetryPolicyBuilder().setTerminationPolicy(TerminationPolicy.limitAttempts(3)) // .setErrorClassifier(null) .build(); }