等待並轉化future
)@Test public void testThen() throws ExecutionException, InterruptedException { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { return "zero"; }, executor); CompletableFuture<Integer> f2 = f1.thenApply(new Function<String, Integer>() { @Override public Integer apply(String t) { System.out.println(2); return Integer.valueOf(t.length()); } }); CompletableFuture<Double> f3 = f2.thenApply(r -> r * 2.0); System.out.println(f3.get()); }
監聽future完成
)/** * future完成處理,可獲取結果 */ @Test public void testThenAccept(){ CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { return "zero"; }, executor); f1.thenAccept(e -> { System.out.println("get result:"+e); }); } /** * future完成處理 */ @Test public void testThenRun(){ CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { return "zero"; }, executor); f1.thenRun(new Runnable() { @Override public void run() { System.out.println("finished"); } }); }
flatMap future
)/** * compose至關於flatMap,避免CompletableFuture<CompletableFuture<String>>這種 * @throws ExecutionException * @throws InterruptedException */ @Test public void testThenCompose() throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(5); CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { return "zero"; }, executor); CompletableFuture<CompletableFuture<String>> f4 = f1.thenApply(CompletableFutureTest::calculate); System.out.println("f4.get:"+f4.get().get()); CompletableFuture<String> f5 = f1.thenCompose(CompletableFutureTest::calculate); System.out.println("f5.get:"+f5.get()); System.out.println(f1.get()); } public static CompletableFuture<String> calculate(String input) { ExecutorService executor = Executors.newFixedThreadPool(5); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println(input); return input + "---" + input.length(); }, executor); return future; }
thenCombine(組合兩個future,有返回值
)html
/** * thenCombine用於組合兩個併發的任務,產生新的future有返回值 * @throws ExecutionException * @throws InterruptedException */ @Test public void testThenCombine() throws ExecutionException, InterruptedException { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { System.out.println("f1 start to sleep at:"+System.currentTimeMillis()); Thread.sleep(1000); System.out.println("f1 finish sleep at:"+System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } return "zero"; }, executor); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { System.out.println("f2 start to sleep at:"+System.currentTimeMillis()); Thread.sleep(3000); System.out.println("f2 finish sleep at:"+System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }, executor); CompletableFuture<String> reslutFuture = f1.thenCombine(f2, new BiFunction<String, String, String>() { @Override public String apply(String t, String u) { System.out.println("f3 start to combine at:"+System.currentTimeMillis()); return t.concat(u); } }); System.out.println(reslutFuture.get());//zerohello System.out.println("finish combine at:"+System.currentTimeMillis()); }
thenAcceptBoth(組合兩個future,沒有返回值
)java
/** * thenAcceptBoth用於組合兩個併發的任務,產生新的future沒有返回值 * @throws ExecutionException * @throws InterruptedException */ @Test public void testThenAcceptBoth() throws ExecutionException, InterruptedException { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { System.out.println("f1 start to sleep at:"+System.currentTimeMillis()); TimeUnit.SECONDS.sleep(1); System.out.println("f1 stop sleep at:"+System.currentTimeMillis()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return "zero"; }, executor); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { System.out.println("f2 start to sleep at:"+System.currentTimeMillis()); TimeUnit.SECONDS.sleep(3); System.out.println("f2 stop sleep at:"+System.currentTimeMillis()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return "hello"; }, executor); CompletableFuture<Void> reslutFuture = f1.thenAcceptBoth(f2, new BiConsumer<String, String>() { @Override public void accept(String t, String u) { System.out.println("f3 start to accept at:"+System.currentTimeMillis()); System.out.println(t + " over"); System.out.println(u + " over"); } }); System.out.println(reslutFuture.get()); System.out.println("finish accept at:"+System.currentTimeMillis()); }
applyToEither(取2個future中最早返回的,有返回值
)編程
/** * 當任意一個CompletionStage 完成的時候,fn 會被執行,它的返回值會當作新的CompletableFuture<U>的計算結果 * @throws ExecutionException * @throws InterruptedException */ @Test public void testApplyToEither() throws ExecutionException, InterruptedException { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { System.out.println("f1 start to sleep at:"+System.currentTimeMillis()); TimeUnit.SECONDS.sleep(5); System.out.println("f1 stop sleep at:"+System.currentTimeMillis()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return "fromF1"; }, executor); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { System.out.println("f2 start to sleep at:"+System.currentTimeMillis()); TimeUnit.SECONDS.sleep(2); System.out.println("f2 stop sleep at:"+System.currentTimeMillis()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return "fromF2"; }, executor); CompletableFuture<String> reslutFuture = f1.applyToEither(f2,i -> i.toString()); System.out.println(reslutFuture.get()); //should not be null , wait for complete }
acceptEither(取2個future中最早返回的,無返回值
)併發
/** * 取其中返回最快的一個 * 當任意一個CompletionStage 完成的時候,action 這個消費者就會被執行。這個方法返回 CompletableFuture<Void> */ @Test public void testAcceptEither() throws ExecutionException, InterruptedException { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { System.out.println("f1 start to sleep at:"+System.currentTimeMillis()); TimeUnit.SECONDS.sleep(3); System.out.println("f1 stop sleep at:"+System.currentTimeMillis()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return "zero"; }, executor); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { System.out.println("f2 start to sleep at:"+System.currentTimeMillis()); TimeUnit.SECONDS.sleep(5); System.out.println("f2 stop sleep at:"+System.currentTimeMillis()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return "hello"; }, executor); CompletableFuture<Void> reslutFuture = f1.acceptEither(f2,r -> { System.out.println("quicker result:"+r); }); reslutFuture.get(); //should be null , wait for complete }
allOf(等待全部future返回
)app
/** * 等待多個future返回 */ @Test public void testAllOf() throws InterruptedException { List<CompletableFuture<String>> futures = IntStream.range(1,10) .mapToObj(i -> longCost(i)).collect(Collectors.toList()); final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})); allCompleted.thenRun(() -> { futures.stream().forEach(future -> { try { System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); }); Thread.sleep(100000); //wait }
anyOf(取多個future當中最快的一個返回
)ide
/** * 等待多個future當中最快的一個返回 * @throws InterruptedException */ @Test public void testAnyOf() throws InterruptedException { List<CompletableFuture<String>> futures = IntStream.range(1,10) .mapToObj(i -> longCost(i)).collect(Collectors.toList()); final CompletableFuture<Object> firstCompleted = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{})); firstCompleted.thenAccept((Object result) -> { System.out.println("get at:"+System.currentTimeMillis()+",first result:"+result); }); } private CompletableFuture<String> longCost(long i){ return CompletableFuture.supplyAsync(() -> { try { System.out.println("f"+i+" start to sleep at:"+System.currentTimeMillis()); Thread.sleep(3000); System.out.println("f"+i+" stop sleep at:"+System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } return String.valueOf(i); },executor); }