ListenableFuture in Guava

ListenableFuture的說明 

  併發編程是一個難題,可是一個強大而簡單的抽象能夠顯著的簡化併發的編寫。出於這樣的考慮,Guava 定義了 ListenableFuture接口並繼承了JDK concurrent包下的Future 接口,ListenableFuture 容許你註冊回調方法(callbacks),在運算(多線程執行)完成的時候進行調用,  或者在運算(多線程執行)完成後當即執行。這樣簡單的改進,使得能夠明顯的支持更多的操做,這樣的功能在JDK concurrent中的Future是不支持的。 在高併發而且須要大量Future對象的狀況下,推薦儘可能使用ListenableFuture來代替..java

  ListenableFuture 中的基礎方法是addListener(Runnable, Executor), 該方法會在多線程運算完的時候,在Executor中執行指定的Runnable。編程

ListenableFuture的建立和使用

  對應JDK中的 ExecutorService.submit(Callable) 提交多線程異步運算的方式,Guava 提供了ListeningExecutorService 接口, 該接口返回 ListenableFuture, 而相應的ExecutorService 返回普通的 Future。將 ExecutorService 轉爲 ListeningExecutorService,可使用MoreExecutors.listeningDecorator(ExecutorService)進行裝飾。舉例說明:多線程

  

ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

而後咱們能夠向這個ListeningExecutorService提交Callable任務併發

複製代碼
final ListenableFuture<String> future =  pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000*3);
                return     "Task done !";
            }
        });
複製代碼

而後咱們添加Listener:異步

複製代碼
future.addListener(new Runnable() {
                @Override
                public void run() {
                    try {
                        final String contents = future.get();
                        System.out.println(contents);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            }, MoreExecutors.sameThreadExecutor());
複製代碼

咱們看看上面的代碼,確實不怎麼優雅,咱們須要處理拋出的異常,須要本身經過future.get()得到前面計算的值。有沒有更加簡便的方法呢?固然有,Guava提供了一個簡便方法來替代上面的寫法:ide

複製代碼
Futures.addCallback(future, new FutureCallback<String>() {
                @Override
                public void onSuccess(String result) {
                    System.out.println(result);
                }

                @Override
                public void onFailure(Throwable t) {
                    t.printStackTrace();
                }
            });
複製代碼

完成代碼以下:高併發

複製代碼
package concurrency;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
 * Created by hupeng on 2014/9/24.
 */
public class ListenableFutureTest {


    public static void main(String[] args) throws InterruptedException {
        ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

        final ListenableFuture<String> future = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000 * 2);
                return "Task done !";
            }
        });

//            future.addListener(new Runnable() {
//                @Override
//                public void run() {
//                    try {
//                        final String contents = future.get();
//                        System.out.println(contents);
//                    } catch (InterruptedException e) {
//                        e.printStackTrace();
//                    } catch (ExecutionException e) {
//                        e.printStackTrace();
//                    }
//                }
//            }, MoreExecutors.sameThreadExecutor());

        Futures.addCallback(future, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                System.out.println(result);
            }

            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        });

        Thread.sleep(5 * 1000);  //wait for task done

        pool.shutdown();
    }
}
複製代碼
相關文章
相關標籤/搜索