GUAVA-ListenableFuture實現回調

隨着軟件開發的不斷進步,在實際的開發應用中,可能一次請求須要查詢若干次數據庫或者調用若干次第三方,按照傳統的串行執行的話,會大大增長響應時間,沒法知足業務需求,更沒法知足用戶迫切須要響應迅速的願望。對此,咱們須要針對網絡請求或內部調用中包含的「多任務」進行異步處理,並行去執行這些「任務」,這樣就就會大大減少響應時間。本文是基於guava中的ListenableFuture來實現的。

 測試代碼:

 1 package com.study1;
 2 
 3 import java.util.Random;
 4 import java.util.concurrent.Callable;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.locks.LockSupport;
 7 
 8 import com.google.common.util.concurrent.FutureCallback;
 9 import com.google.common.util.concurrent.Futures;
10 import com.google.common.util.concurrent.ListenableFuture;
11 import com.google.common.util.concurrent.ListeningExecutorService;
12 import com.google.common.util.concurrent.MoreExecutors;
13 /**
14  * GUAVA  ListenableFuture
15  * @author gaojy
16  *
17  */
18 public class TestListenableFuture {
19     //定義一個線程池,用於處理全部任務
20     final static ListeningExecutorService service 
21                 = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
22 
23     public static void main(String[] args){
24         Long t1 = System.currentTimeMillis();
25         // 任務1  
26         ListenableFuture<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
27             @Override
28             public Boolean call() throws Exception {
29                 Thread.sleep(10000);
30                 return true;
31             }
32         });
33 
34         /**
35          * 
36          */
37         Futures.addCallback(booleanTask, new FutureCallback<Boolean>() {
38             @Override
39             public void onSuccess(Boolean result) {
40                 System.err.println("BooleanTask: " + result);
41             }
42 
43             @Override
44             public void onFailure(Throwable t) {
45             }
46         });
47 
48         // 任務2
49         ListenableFuture<String> stringTask = service.submit(new Callable<String>() {
50             @Override
51             public String call() throws Exception {
52                 Thread.sleep(10000);
53                 return "Hello World";
54             }
55         });
56 
57         Futures.addCallback(stringTask, new FutureCallback<String>() {
58             @Override
59             public void onSuccess(String result) {
60                 System.err.println("StringTask: " + result);
61             }
62 
63             @Override
64             public void onFailure(Throwable t) {
65             }
66         });
67 
68         // 任務3
69         ListenableFuture<Integer> integerTask = service.submit(new Callable<Integer>() {
70             @Override
71             public Integer call() throws Exception {
72                 Thread.sleep(10000);
73                 return new Random().nextInt(100);
74             }
75         });
76 
77         Futures.addCallback(integerTask, new FutureCallback<Integer>() {
78             @Override
79             public void onSuccess(Integer result) {
80                 try {
81                     Thread.sleep(5000);
82                 } catch (InterruptedException e) {
83                     e.printStackTrace();
84                 }
85                 System.err.println("IntegerTask: " + result);
86             }
87 
88             @Override
89             public void onFailure(Throwable t) {
90             }
91         });
92 
93         // 執行時間
94         System.err.println("time: " + (System.currentTimeMillis() - t1));
95         
96     }
97 }

 

測試結果:

源碼分析:

在26行中ListeningExecutorService的submit()方法:java

 public ListenableFuture submit(Runnable task, Object result)
    {
        // 初始化了ListenableFutureTask對象
        ListenableFutureTask ftask = ListenableFutureTask.create(task, result); //執行task,實際上調用了Callable對象的call()方法  execute(ftask); return ftask; }

再來查看一下Futures.addCallback的方法:mysql

 public static void addCallback(ListenableFuture future, FutureCallback callback, Executor executor)
    {
      //對回調task進行檢查
 Preconditions.checkNotNull(callback); //建立一個新的Runnable對象,並放到一個指定的線程中,執行。 //這個Runnable對象主要任務就是得到future的結果,並根據結果調用回調函數的相應方法 Runnable callbackListener = new Runnable(future, callback) { public void run() { Object value; try { //獲取future執行結果,內部調用future.get(),產生堵塞,而不影響main主線程的執行,當獲取到value時,就調用callback的onSuccess()方法 value = Uninterruptibles.getUninterruptibly(future); } catch(ExecutionException e) { callback.onFailure(e.getCause()); return; } catch(RuntimeException e) { callback.onFailure(e); return; } catch(Error e) { callback.onFailure(e); return; } callback.onSuccess(value); } final ListenableFuture val$future; final FutureCallback val$callback; { future = listenablefuture; callback = futurecallback; super(); } } ; future.addListener(callbackListener, executor); }
相關文章
相關標籤/搜索