Using the Futures utility class

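Futures is a utility class provided by Guava; its fully qualified name is com.google.common.util.concurrent.Futures. It works very well together with MoreExecutors.

The main methods are as follows:

1. The addCallback() method:

public static void addCallback(ListenableFuture future, FutureCallback callback, Executor executor): registers a callback on a ListenableFuture instance. It is built on the instance's addListener(Runnable listener, Executor executor) method, but the callback additionally receives the result (onSuccess) or the failure (onFailure).

Example (imports are omitted here and in the later examples; the classes come from com.google.common.util.concurrent, com.google.common.collect, java.util and java.util.concurrent):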

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

    // submit(Runnable, T): the future completes with the fixed result 1 once the Runnable returns.
    ListenableFuture<Integer> future = listeningExecutorService.submit(() -> {
        try {
            Thread.sleep(4000);
            System.out.println(Thread.currentThread().getName() + "@666");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, 1);

    Futures.addCallback(future, new FutureCallback<Integer>() {
        @Override
        public void onSuccess(Integer result) {
            System.out.println(Thread.currentThread().getName() + "@" + result);
        }

        @Override
        public void onFailure(Throwable t) {
            System.out.println(Thread.currentThread().getName() + "@" + t.getMessage());
        }
    }, threadPoolExecutor);

    // Printed first: addCallback() does not block the main thread.
    System.out.println(Thread.currentThread().getName() + "@888");

    // Note: the pool's core threads are non-daemon, so the JVM keeps running
    // until threadPoolExecutor.shutdown() is called after the work is done.
}

ExecutorService corresponds to Future; ListeningExecutorService corresponds to ListenableFuture.

2. Two overloads of allAsList():

public static ListenableFuture<List<V>> allAsList(ListenableFuture<V>... futures)

public static ListenableFuture<List<V>> allAsList(Iterable<ListenableFuture<V>> futures)

Example:

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

    ListenableFuture<List<Integer>> mergedListenableFuture = Futures.allAsList(
            Lists.newArrayList(
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(4000);
                            System.out.println(Thread.currentThread().getName() + "@666");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }, 1),
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(2000);
                            System.out.println(Thread.currentThread().getName() + "@888");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }, 2)
            )
    );

    try {
        List<Integer> resultList = mergedListenableFuture.get();
        System.out.println(resultList);
    } catch (Exception e) {
        e.printStackTrace();
    }

    Futures.addCallback(mergedListenableFuture, new FutureCallback<List<Integer>>() {
        @Override
        public void onSuccess(List<Integer> result) {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + ", success callback");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onFailure(Throwable t) {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + ", " + t.getMessage());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, threadPoolExecutor);
}

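allAsList() merges several ListenableFuture instances into a single ListenableFuture. The combined future's get() returns a list whose elements are the results of the input futures, in the same order as the futures were passed to allAsList(). A success callback added to the combined future runs only after all of the input futures have completed. If the task behind any input throws, the combined future fails as well, without waiting for the remaining inputs, and its get() throws.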
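The ordering and failure behaviour can be checked without a thread pool. The following is a minimal sketch, assuming Guava is on the classpath; the class name AllAsListSketch and its helper methods are made up for illustration, and Futures.immediateFuture() / Futures.immediateFailedFuture() stand in for real asynchronous tasks so the outcome is deterministic.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class, not part of Guava.
public class AllAsListSketch {

    // Combines already-completed futures; the result list follows argument order.
    static List<Integer> combineInOrder() throws Exception {
        ListenableFuture<List<Integer>> merged = Futures.allAsList(
                Futures.immediateFuture(1),
                Futures.immediateFuture(2),
                Futures.immediateFuture(3));
        return merged.get();
    }

    // Returns the cause reported by get() when one of the inputs has failed.
    static Throwable causeWhenOneInputFails() throws InterruptedException {
        ListenableFuture<List<Integer>> merged = Futures.allAsList(
                Futures.immediateFuture(1),
                Futures.<Integer>immediateFailedFuture(new IllegalStateException("boom")));
        try {
            merged.get();
            return null; // not reached: the combined future has failed
        } catch (ExecutionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(combineInOrder());
        System.out.println(causeWhenOneInputFails());
    }
}
```

As with any Future, the task's exception arrives wrapped in an ExecutionException, so callers normally inspect getCause().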

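3. Two overloads of successfulAsList():

public static ListenableFuture<List<V>> successfulAsList(ListenableFuture<V>... futures)

public static ListenableFuture<List<V>> successfulAsList(Iterable<ListenableFuture<V>> futures)

successfulAsList() differs from allAsList() in one respect: the combined future does not fail when the task behind one of the inputs throws. Instead, the list returned by get() holds null at the corresponding position, which also means a failed task cannot be told apart from one that legitimately returned null. In the extreme case, get() returns a list containing nothing but nulls.

Example: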

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

    ListenableFuture<List<Integer>> mergedListenableFuture = Futures.successfulAsList(
            Lists.newArrayList(
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(4000);
                            System.out.println(Thread.currentThread().getName() + "@666");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        // Throws IndexOutOfBoundsException on purpose.
                        System.out.println(Lists.newArrayList(1, 2).get(3));
                    }, 1),
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(2000);
                            System.out.println(Thread.currentThread().getName() + "@888");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        // Throws StringIndexOutOfBoundsException on purpose.
                        System.out.println("".substring(2));
                    }, 2)
            )
    );

    try {
        List<Integer> resultList = mergedListenableFuture.get();
        System.out.println(resultList);
    } catch (Exception e) {
        e.printStackTrace();
    }

    Futures.addCallback(mergedListenableFuture, new FutureCallback<List<Integer>>() {
        @Override
        public void onSuccess(List<Integer> result) {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + ", success callback");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onFailure(Throwable t) {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + ", " + t.getMessage());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, threadPoolExecutor);
}

4. Two overloads of whenAllComplete():

public static FutureCombiner<V> whenAllComplete(ListenableFuture<V>... futures)

public static FutureCombiner<V> whenAllComplete(Iterable<ListenableFuture<V>> futures)

Use whenAllComplete() to do something once all of the ListenableFuture instances have finished. It does not matter if the tasks behind some of them threw; the follow-up work still runs.

The returned FutureCombiner has three instance methods, all of which return a ListenableFuture. This also makes chained asynchronous operations possible: async step 1 finishes, then async step 2 runs, then async step 3, and the chain can go on for as long as needed.

Commonly used FutureCombiner instance methods:

public ListenableFuture<C> call(Callable<C> combiner, Executor executor)

public ListenableFuture<C> callAsync(AsyncCallable<C> combiner, Executor executor)

public ListenableFuture<?> run(Runnable combiner, Executor executor)

Example:

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

    ListenableFuture<Integer> listenableFuture1 = Futures.whenAllComplete(
            listeningExecutorService.submit(() -> {
                try {
                    Thread.sleep(4000);
                    System.out.println(Thread.currentThread().getName() + "@666");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Lists.newArrayList(1, 2).get(3));
            }, 1),
            listeningExecutorService.submit(() -> {
                try {
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + "@888");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("".substring(2));
            }, 2)
    ).call(() -> {
        System.out.println(123);
        return 1;
    }, threadPoolExecutor);

    // Chain a second step that runs after the first combiner has finished.
    Futures.whenAllComplete(listenableFuture1).call(() -> {
        System.out.println(456);
        return 2;
    }, threadPoolExecutor);
}

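5. Two overloads of whenAllSucceed():

public static FutureCombiner<V> whenAllSucceed(ListenableFuture<V>... futures)

public static FutureCombiner<V> whenAllSucceed(Iterable<ListenableFuture<V>> futures)

whenAllSucceed() differs from whenAllComplete() in one respect: if the task behind any input future throws, the task passed to the FutureCombiner's call() or run() method is not executed. Nothing is thrown in the calling thread; the failure is only visible through the ListenableFuture returned by call() or run(), which fails with the input's exception.

Example: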

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

    // Parameterized instead of the raw FutureCombiner type.
    Futures.FutureCombiner<Integer> futureCombiner = Futures.whenAllSucceed(
            listeningExecutorService.submit(() -> {
                try {
                    Thread.sleep(4000);
                    System.out.println(Thread.currentThread().getName() + "@666");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, 1),
            listeningExecutorService.submit(() -> {
                try {
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + "@888");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Throws StringIndexOutOfBoundsException on purpose.
                System.out.println("".substring(2));
            }, 2)
    );

    futureCombiner.call(() -> {
        System.out.println(123);
        return 1;
    }, threadPoolExecutor);

    try {
        Thread.sleep(5000);
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println(456);
}

In this example the task behind the second ListenableFuture passed to whenAllSucceed() throws, so the task given to the FutureCombiner's call() method is not executed and 123 is never printed.
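The difference between the two combiners can also be seen from the future that call() returns. The sketch below is illustrative only: the class WhenAllSucceedSketch and the method describe() are invented names, already-completed futures replace real tasks, and MoreExecutors.directExecutor() is used so that everything runs synchronously.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of Guava.
public class WhenAllSucceedSketch {

    // Combines one successful and one failed input, then reports what happened.
    static String describe(boolean requireSuccess) throws InterruptedException {
        ListenableFuture<Integer> ok = Futures.immediateFuture(1);
        ListenableFuture<Integer> failed =
                Futures.immediateFailedFuture(new IllegalStateException("boom"));
        AtomicBoolean combinerRan = new AtomicBoolean(false);

        Futures.FutureCombiner<Integer> combiner;
        if (requireSuccess) {
            combiner = Futures.whenAllSucceed(ok, failed);
        } else {
            combiner = Futures.whenAllComplete(ok, failed);
        }

        ListenableFuture<String> result = combiner.call(() -> {
            combinerRan.set(true);
            return "combined";
        }, MoreExecutors.directExecutor());

        String outcome;
        try {
            outcome = "value=" + result.get();
        } catch (ExecutionException e) {
            // The input's exception surfaces here, wrapped in an ExecutionException.
            outcome = "failed with " + e.getCause().getClass().getSimpleName();
        }
        return "ran=" + combinerRan.get() + ", " + outcome;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("whenAllComplete: " + describe(false));
        System.out.println("whenAllSucceed:  " + describe(true));
    }
}
```

So with whenAllSucceed() the failure is not lost: it can be observed by calling get() on the returned future or by attaching a callback with Futures.addCallback().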

6. The two methods whose names start with catching:

public static ListenableFuture<V> catching(ListenableFuture<V> input, Class<X> exceptionType, Function<X, V> fallback, Executor executor)

Note that the Function here is not the JDK's java.util.function.Function but Guava's Function from the base subpackage; its fully qualified name is com.google.common.base.Function.

public static ListenableFuture<V> catchingAsync(ListenableFuture<V> input, Class<X> exceptionType, AsyncFunction<X, V> fallback, Executor executor)

When the task behind the ListenableFuture throws and the exception is of the specified type, a plan B (the fallback) can be executed instead.

Example:

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

    ListenableFuture<List<Integer>> mergedListenableFuture = Futures.allAsList(
            Lists.newArrayList(
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(4000);
                            System.out.println(Thread.currentThread().getName() + "@666");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }, 1),
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(2000);
                            System.out.println(Thread.currentThread().getName() + "@888");
                            // Throws StringIndexOutOfBoundsException on purpose.
                            System.out.println(Thread.currentThread().getName() + "".substring(2));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }, 2)
            )
    );

    ListenableFuture<List<Integer>> withFallbackListenableFuture = Futures.catching(
            mergedListenableFuture,
            StringIndexOutOfBoundsException.class,
            input -> getBackUpList(),
            threadPoolExecutor
    );

    try {
        System.out.println(withFallbackListenableFuture.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private static List<Integer> getBackUpList() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return new ArrayList<>();
}

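The fallback runs only when input fails with the specified exception type. If the fallback itself throws, the get() method of the resulting ListenableFuture throws.

The third parameter of catchingAsync() is an AsyncFunction. AsyncFunction is also a functional interface, but its method must return a ListenableFuture, so it is less convenient to use than catching().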
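For comparison, a catchingAsync() call might look like the sketch below. It is not taken from the original article: the class name CatchingAsyncSketch, the method recover() and the fallback value -1 are illustrative assumptions, and the fallback simply returns an already-completed future where a real program would start another asynchronous call.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

// Hypothetical demo class, not part of Guava.
public class CatchingAsyncSketch {

    // Recovers from an IllegalStateException with a fallback that itself returns a future.
    static int recover() throws Exception {
        ListenableFuture<Integer> failing =
                Futures.immediateFailedFuture(new IllegalStateException("primary source down"));

        ListenableFuture<Integer> withFallback = Futures.catchingAsync(
                failing,
                IllegalStateException.class,
                // AsyncFunction: must return a ListenableFuture rather than a plain value.
                e -> Futures.immediateFuture(-1),
                MoreExecutors.directExecutor());

        return withFallback.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(recover());
    }
}
```

The asynchronous variant is mainly worthwhile when the plan B is itself a slow operation that already produces a ListenableFuture, because the fallback then does not have to block a thread while waiting for it.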

7. public static ImmutableList<ListenableFuture<T>> inCompletionOrder(Iterable<ListenableFuture<T>> futures)

Returns an immutable list of ListenableFuture instances whose order matches the order in which the input futures complete.

Example:

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

    ImmutableList<ListenableFuture<Integer>> listenableFutureList = Futures.inCompletionOrder(
            Lists.newArrayList(
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(4000);
                            System.out.println(Thread.currentThread().getName() + "@666");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }, 1),
                    listeningExecutorService.submit(() -> {
                        try {
                            Thread.sleep(2000);
                            System.out.println(Thread.currentThread().getName() + "@888");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }, 2)
            )
    );

    // The 2-second task finishes first, so its result (2) comes first in the list.
    listenableFutureList.forEach(p -> {
        try {
            System.out.println(Thread.currentThread().getName() + p.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}

8. The three transform() related methods:

public static ListenableFuture<O> transform(ListenableFuture<I> input, Function<I, O> function, Executor executor)

public static ListenableFuture<O> transformAsync(ListenableFuture<I> input, AsyncFunction<I, O> function, Executor executor)

public static Future<O> lazyTransform(Future<I> input, Function<I, O> function)

transform() and transformAsync() return a ListenableFuture whose result is computed by applying the given function to the result of the input ListenableFuture. If the task behind the input future throws, the returned future fails with the same exception and the function is not executed. Cancellation is kept in sync in both directions: if the input future is cancelled, the returned future is cancelled too, and if the returned future is cancelled, it attempts to cancel the input future.
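A small sketch of both methods follows. The class TransformSketch and its methods are hypothetical names chosen for this illustration; the inputs are already-completed futures and MoreExecutors.directExecutor() is used so that the functions run synchronously in the calling thread.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of Guava.
public class TransformSketch {

    // transform() maps the value; transformAsync() maps it to another future.
    static String chain() throws Exception {
        ListenableFuture<Integer> input = Futures.immediateFuture(21);
        ListenableFuture<Integer> doubled =
                Futures.transform(input, n -> n * 2, MoreExecutors.directExecutor());
        ListenableFuture<String> labelled = Futures.transformAsync(
                doubled,
                n -> Futures.immediateFuture("value=" + n),
                MoreExecutors.directExecutor());
        return labelled.get();
    }

    // Returns true if a failed input skips the function and passes its exception on.
    static boolean functionSkippedOnFailure() throws InterruptedException {
        AtomicBoolean functionRan = new AtomicBoolean(false);
        ListenableFuture<Integer> failed =
                Futures.immediateFailedFuture(new IllegalStateException("boom"));
        ListenableFuture<Integer> output = Futures.transform(failed, n -> {
            functionRan.set(true);
            return n + 1;
        }, MoreExecutors.directExecutor());
        try {
            output.get();
            return false;
        } catch (ExecutionException e) {
            return !functionRan.get() && e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(chain());
        System.out.println(functionSkippedOnFailure());
    }
}
```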

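lazyTransform() is a special case: the function is not applied eagerly. It runs only when get() is called on the returned Future, and it then runs in (and blocks) the thread that calls get(). For that reason this method is not very practical.

Example: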

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
    ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

    ListenableFuture<Integer> oriListenableFuture = listeningExecutorService.submit(() -> {
        try {
            Thread.sleep(4000);
            System.out.println(Thread.currentThread().getName() + "@666");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, 1);

    // Parameterized instead of the raw Future type.
    Future<String> future = Futures.lazyTransform(oriListenableFuture, input -> {
        try {
            Thread.sleep(1000);
            // Prints the name of the thread that calls get(), here the main thread.
            System.out.println(Thread.currentThread().getName() + "@888");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return String.valueOf(input);
    });

    future.get();
    System.out.println(456);
}

9. Two overloads of scheduleAsync(): run a task after the given delay. The task runs only once.

public static ListenableFuture<O> scheduleAsync(AsyncCallable<O> callable, long delay, TimeUnit timeUnit, ScheduledExecutorService executorService)

public static ListenableFuture<O> scheduleAsync(AsyncCallable<O> callable, Duration delay, ScheduledExecutorService executorService)

AsyncCallable is also a functional interface: it takes no arguments and returns a ListenableFuture instance.

Example:

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
    ScheduledExecutorService scheduledExecutorService =
            new ScheduledThreadPoolExecutor(5, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

    Futures.scheduleAsync(() -> {
        ListenableFuture sf = MoreExecutors.listeningDecorator(scheduledExecutorService).submit(() -> {
            System.out.println(Thread.currentThread().getName() + "@" + System.currentTimeMillis());
        });
        return sf;
    }, 5, TimeUnit.SECONDS, scheduledExecutorService);
}

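In the example above, the line is printed once after 5 s, and only once.

What about a genuinely periodic task, for example one that has to print every 5 s?

Example: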
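One possible approach, given here as a sketch rather than as the article's own solution, is to skip scheduleAsync() and decorate the scheduler with MoreExecutors.listeningDecorator(), which for a ScheduledExecutorService returns a ListeningScheduledExecutorService, and then call scheduleAtFixedRate(). The class FixedRateSketch, the method runPeriodically() and the CountDownLatch that stops the demo after a few runs are assumptions made for this illustration.

```java
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Guava.
public class FixedRateSketch {

    // Runs a periodic task until it has fired `times` times, then cancels it.
    // Returns true if all runs happened before the safety timeout.
    static boolean runPeriodically(int times, long periodMillis) throws InterruptedException {
        ListeningScheduledExecutorService scheduler = MoreExecutors.listeningDecorator(
                new ScheduledThreadPoolExecutor(1,
                        new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build()));
        CountDownLatch latch = new CountDownLatch(times);

        ListenableScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            System.out.println(Thread.currentThread().getName() + "@" + System.currentTimeMillis());
            latch.countDown();
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            // Wait for the requested number of runs, with some slack.
            return latch.await(periodMillis * (times + 5), TimeUnit.MILLISECONDS);
        } finally {
            handle.cancel(false); // stop the periodic task
            scheduler.shutdown(); // let the JVM exit
        }
    }

    public static void main(String[] args) throws InterruptedException {
        runPeriodically(3, 5000); // prints every 5 s, three times
    }
}
```

The returned ListenableScheduledFuture never completes normally for a periodic task; it only completes when the task is cancelled or throws, so a listener on it is best suited to reacting to the task stopping.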

 

10. Two overloads of withTimeout():
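As a rough illustration of what withTimeout() does, the sketch below wraps a future that never completes and waits for the timeout to fire. It assumes the overload that takes a delay, a TimeUnit and a ScheduledExecutorService; the class WithTimeoutSketch, the method timesOut() and the 100 ms limit are made-up values for the demo.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of Guava.
public class WithTimeoutSketch {

    // Returns true if the wrapped future fails with a TimeoutException.
    static boolean timesOut() throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // A future that is never completed stands in for a task that hangs.
        SettableFuture<Integer> neverCompletes = SettableFuture.create();

        ListenableFuture<Integer> limited =
                Futures.withTimeout(neverCompletes, 100, TimeUnit.MILLISECONDS, timer);
        try {
            limited.get();
            return false; // not reached: the delegate never produces a value
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(timesOut());
    }
}
```

The timeout is reported as a TimeoutException wrapped in an ExecutionException, and Guava also cancels the delegate future when the limit expires, so a hung task does not keep running unnoticed.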
