1 package com.study1; 2 3 import java.util.Random; 4 import java.util.concurrent.Callable; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.locks.LockSupport; 7 8 import com.google.common.util.concurrent.FutureCallback; 9 import com.google.common.util.concurrent.Futures; 10 import com.google.common.util.concurrent.ListenableFuture; 11 import com.google.common.util.concurrent.ListeningExecutorService; 12 import com.google.common.util.concurrent.MoreExecutors; 13 /** 14 * GUAVA ListenableFuture 15 * @author gaojy 16 * 17 */ 18 public class TestListenableFuture { 19 //定義一個線程池,用於處理全部任務 20 final static ListeningExecutorService service 21 = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); 22 23 public static void main(String[] args){ 24 Long t1 = System.currentTimeMillis(); 25 // 任務1 26 ListenableFuture<Boolean> booleanTask = service.submit(new Callable<Boolean>() { 27 @Override 28 public Boolean call() throws Exception { 29 Thread.sleep(10000); 30 return true; 31 } 32 }); 33 34 /** 35 * 36 */ 37 Futures.addCallback(booleanTask, new FutureCallback<Boolean>() { 38 @Override 39 public void onSuccess(Boolean result) { 40 System.err.println("BooleanTask: " + result); 41 } 42 43 @Override 44 public void onFailure(Throwable t) { 45 } 46 }); 47 48 // 任務2 49 ListenableFuture<String> stringTask = service.submit(new Callable<String>() { 50 @Override 51 public String call() throws Exception { 52 Thread.sleep(10000); 53 return "Hello World"; 54 } 55 }); 56 57 Futures.addCallback(stringTask, new FutureCallback<String>() { 58 @Override 59 public void onSuccess(String result) { 60 System.err.println("StringTask: " + result); 61 } 62 63 @Override 64 public void onFailure(Throwable t) { 65 } 66 }); 67 68 // 任務3 69 ListenableFuture<Integer> integerTask = service.submit(new Callable<Integer>() { 70 @Override 71 public Integer call() throws Exception { 72 Thread.sleep(10000); 73 return new Random().nextInt(100); 74 } 75 }); 76 77 Futures.addCallback(integerTask, new FutureCallback<Integer>() { 78 @Override 79 public void onSuccess(Integer result) { 80 try { 81 Thread.sleep(5000); 82 } catch (InterruptedException e) { 83 e.printStackTrace(); 84 } 85 System.err.println("IntegerTask: " + result); 86 } 87 88 @Override 89 public void onFailure(Throwable t) { 90 } 91 }); 92 93 // 執行時間 94 System.err.println("time: " + (System.currentTimeMillis() - t1)); 95 96 } 97 }
在26行中ListeningExecutorService的submit()方法:java
public ListenableFuture submit(Runnable task, Object result)
{
// 初始化了ListenableFutureTask對象
ListenableFutureTask ftask = ListenableFutureTask.create(task, result); //執行task,實際上調用了Callable對象的call()方法 execute(ftask); return ftask; }
再來查看一下Futures.addCallback的方法:mysql
public static void addCallback(ListenableFuture future, FutureCallback callback, Executor executor)
{
//對回調task進行檢查
Preconditions.checkNotNull(callback); //建立一個新的Runnable對象,並放到一個指定的線程中,執行。 //這個Runnable對象主要任務就是得到future的結果,並根據結果調用回調函數的相應方法 Runnable callbackListener = new Runnable(future, callback) { public void run() { Object value; try { //獲取future執行結果,內部調用future.get(),產生堵塞,而不影響main主線程的執行,當獲取到value時,就調用callback的onSuccess()方法 value = Uninterruptibles.getUninterruptibly(future); } catch(ExecutionException e) { callback.onFailure(e.getCause()); return; } catch(RuntimeException e) { callback.onFailure(e); return; } catch(Error e) { callback.onFailure(e); return; } callback.onSuccess(value); } final ListenableFuture val$future; final FutureCallback val$callback; { future = listenablefuture; callback = futurecallback; super(); } } ; future.addListener(callbackListener, executor); }