Guava(併發)

併發:

  • Guava集合處理是很強大的(這些在jdk8中都有些引入),但Guava發光的地方是併發java

Monitor

  • Monitor實現同步編程

/**
 * 經過Monitor的Guard進行條件阻塞
 */
public class MonitorSample {
    private List<String> list = new ArrayList<String>();
    private static final int MAX_SIZE = 10;
    private Monitor monitor = new Monitor();
     
    private Monitor.Guard listBelowCapacity = new Monitor.Guard(monitor) {
        @Override
        public boolean isSatisfied() {
            return list.size() < MAX_SIZE;
        }
    };
 
    public void addToList(String item) throws InterruptedException {
        monitor.enterWhen(listBelowCapacity); //Guard(形如Condition),不知足則阻塞,並且咱們並無在Guard進行任何通知操做
        try {
            list.add(item);
        } finally {
            monitor.leave();
        }
    }
}
  • Monitor就像java本土的synchronizedReentrantLock同樣,每次只運行一個線程佔用,且可重佔用,每一次佔用會對應一次退出佔用。併發

Monitor最佳實踐:

  • 就如上面,咱們經過if條件來判斷是否可進入Monitor代碼塊,並再try/finally中釋放:app

if (monitor.enterIf(guardCondition)) {
        try {
              doWork();
    } finally {
           monitor.leave();
       }
}

其餘的Monitor訪問方法:

Monitor.enter //進入Monitor塊,將阻塞其餘線程知道Monitor.leave
Monitor.tryEnter //嘗試進入Monitor塊,true表示能夠進入, false表示不能,而且不會一直阻塞
Monitor.tryEnterIf //根據條件嘗試進入Monitor塊

這些方法都有對應的限時版本。異步

ListenableFuture類

  • jdk5以後有了Future這種異步執行的結構ide

ExecutorService executor = Executors.newCachedThreadPool();
   Future<Integer> future = executor.submit(new Callable<Integer>(){
                                public Integer call() throws Exception{
                                   return service.getCount();
} });
//Retrieve the value of computation
Integer count = future.get();
  • ListenableFuture對Future進行了擴展,容許註冊一個回調函數,task執行完後自動調用。函數式編程

  • 獲取ListableFuture對象。函數

正如咱們獲取Future對象要經過ExecutorService.submit(Callable)來獲取同樣,咱們能夠這樣建立ListenableFuture對象:工具

executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(NUM_THREADS)); //包裝Executors建立的線程池
ListenableFuture<String> listenableFuture = executorService.submit(new Callable<String>()...); //獲取ListableFuture對象
listenableFuture.addListener(new Runnable() {
      @Override
      public void run() {
          methodToRunOnFutureTaskCompletion();
      }
}, executorService); //註冊回調函數

FutureCallback類

  • FutureCallback定義了onSuccessonFailure方法,onSuccess方法會接收一個Future對象,這樣咱們就能夠獲取Future的結果。ui

  • 首先須要一個FutureCallback實現類。

/**
 * 定義一個FutureCallBack實現類
 */
public class FutureCallbackImpl implements FutureCallback<String> {
    private StringBuilder builder = new StringBuilder();
 
    @Override
    public void onSuccess(String result) {
        builder.append(result).append(" successfully");
    }
 
    @Override
    public void onFailure(Throwable t) {
        builder.append(t.toString());
    }
 
    public String getCallbackResult() {
        return builder.toString();
    }
}

使用實例:

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> futureTask = executorService.submit(new Callable<String>() { //建立ListenaleFuture對象
                    @Override
                    public String call() throws Exception {
                        return "Task completed";
                    }
                });
FutureCallbackImpl callback = new FutureCallbackImpl();
Futures.addCallback(futureTask, callback); //添加回調
callback.getCallbackResult(); //獲取結果

若是CallBack是一個耗時操做,你應該選擇另外一個註冊CallBack:

Futures.addCallback(futureTask,callback,executorService); //提供另外一個線程池來執行性回調

SettableFuture類:

SettableFuture能夠用來設置要返回得值:

SettableFuture<String> sf = SettableFuture.create();
//Set a value to return
sf.set("Success");
//Or set a failure Exception
sf.setException(someException);

AsyncFunction:

  • 該接口與函數式編程密切相關, 相似Function, 但apply方法會轉換成一個ListenableFuture封裝的範型對象。

public class AsyncFuntionSample implements AsyncFunction<Long, String> {
    private ConcurrentMap<Long, String> map = Maps.newConcurrentMap();
    private ListeningExecutorService listeningExecutorService;
 
    @Override
    public ListenableFuture<String> apply(final Long input) throws Exception {
        if (map.containsKey(input)) {
            SettableFuture<String> listenableFuture = SettableFuture.create(); //構建一個SettableFuture
            listenableFuture.set(map.get(input));
            return listenableFuture;
        } else {
            return listeningExecutorService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    String retrieved = //compute to get the data;
                    map.putIfAbsent(input, retrieved);
                    return retrieved;
                }
            });
        }
    }
}

FutureFallback類:

  • FutureFallback用於異常恢復的備份

/**
 * 當Future任務失敗後, 做爲備份的Future
 */
public class FutureFallbackImpl implements FutureFallback<String> {
    @Override
    public ListenableFuture<String> create(Throwable t) throws Exception {
        if (t instanceof FileNotFoundException) {
            SettableFuture<String> settableFuture = SettableFuture.create();
            settableFuture.set("Not Found");
            return settableFuture;
        }
        throw new Exception(t);
    }
}

Futures類:

  • Futures類是有關Future實例的一個工具類。

異步轉換:

ListenableFuture<Person> lf = Futures.transform(ListenableFuture<String> f,AsyncFunction<String,Person> af);

使用FutureFallbacks:

ListenableFuture<String> lf = Futures.withFallback(ListenableFuture<String> f,FutureFallback<String> fb);

RateLimiter:

  • RateLimiter限制訪問每秒訪問資源的線程數。有點相似信號量Semaphore。

RateLimiter limiter = RateLimiter.create(4.0); //每秒不超過4個任務被提交
limiter.acquire();  //請求RateLimiter, 超過permits會被阻塞
executor.submit(runnable); //提交任務

也有非阻塞式地嘗試:

if(limiter.tryAcquire()){ //未請求到limiter則當即返回false
    doSomething();
}else{
    doSomethingElse();
}
相關文章
相關標籤/搜索