Guava集合處理是很強大的(這些在jdk8中都有些引入),但Guava發光的地方是併發。java
Monitor實現同步編程
/** * 經過Monitor的Guard進行條件阻塞 */ public class MonitorSample { private List<String> list = new ArrayList<String>(); private static final int MAX_SIZE = 10; private Monitor monitor = new Monitor(); private Monitor.Guard listBelowCapacity = new Monitor.Guard(monitor) { @Override public boolean isSatisfied() { return list.size() < MAX_SIZE; } }; public void addToList(String item) throws InterruptedException { monitor.enterWhen(listBelowCapacity); //Guard(形如Condition),不知足則阻塞,並且咱們並無在Guard進行任何通知操做 try { list.add(item); } finally { monitor.leave(); } } }
Monitor就像java本土的synchronized, ReentrantLock同樣,每次只運行一個線程佔用,且可重佔用,每一次佔用會對應一次退出佔用。併發
就如上面,咱們經過if條件來判斷是否可進入Monitor代碼塊,並再try/finally中釋放:app
if (monitor.enterIf(guardCondition)) { try { doWork(); } finally { monitor.leave(); } }
Monitor.enter //進入Monitor塊,將阻塞其餘線程知道Monitor.leave Monitor.tryEnter //嘗試進入Monitor塊,true表示能夠進入, false表示不能,而且不會一直阻塞 Monitor.tryEnterIf //根據條件嘗試進入Monitor塊
這些方法都有對應的限時版本。異步
jdk5以後有了Future這種異步執行的結構ide
ExecutorService executor = Executors.newCachedThreadPool(); Future<Integer> future = executor.submit(new Callable<Integer>(){ public Integer call() throws Exception{ return service.getCount(); } }); //Retrieve the value of computation Integer count = future.get();
ListenableFuture對Future進行了擴展,容許註冊一個回調函數,task執行完後自動調用。函數式編程
獲取ListableFuture對象。函數
正如咱們獲取Future對象要經過ExecutorService.submit(Callable)來獲取同樣,咱們能夠這樣建立ListenableFuture對象:工具
executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(NUM_THREADS)); //包裝Executors建立的線程池 ListenableFuture<String> listenableFuture = executorService.submit(new Callable<String>()...); //獲取ListableFuture對象 listenableFuture.addListener(new Runnable() { @Override public void run() { methodToRunOnFutureTaskCompletion(); } }, executorService); //註冊回調函數
FutureCallback定義了onSuccess和onFailure方法,onSuccess方法會接收一個Future對象,這樣咱們就能夠獲取Future的結果。ui
首先須要一個FutureCallback實現類。
/** * 定義一個FutureCallBack實現類 */ public class FutureCallbackImpl implements FutureCallback<String> { private StringBuilder builder = new StringBuilder(); @Override public void onSuccess(String result) { builder.append(result).append(" successfully"); } @Override public void onFailure(Throwable t) { builder.append(t.toString()); } public String getCallbackResult() { return builder.toString(); } }
使用實例:
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); ListenableFuture<String> futureTask = executorService.submit(new Callable<String>() { //建立ListenaleFuture對象 @Override public String call() throws Exception { return "Task completed"; } }); FutureCallbackImpl callback = new FutureCallbackImpl(); Futures.addCallback(futureTask, callback); //添加回調 callback.getCallbackResult(); //獲取結果
若是CallBack是一個耗時操做,你應該選擇另外一個註冊CallBack:
Futures.addCallback(futureTask,callback,executorService); //提供另外一個線程池來執行性回調
SettableFuture能夠用來設置要返回得值:
SettableFuture<String> sf = SettableFuture.create(); //Set a value to return sf.set("Success"); //Or set a failure Exception sf.setException(someException);
該接口與函數式編程密切相關, 相似Function, 但apply方法會轉換成一個ListenableFuture封裝的範型對象。
public class AsyncFuntionSample implements AsyncFunction<Long, String> { private ConcurrentMap<Long, String> map = Maps.newConcurrentMap(); private ListeningExecutorService listeningExecutorService; @Override public ListenableFuture<String> apply(final Long input) throws Exception { if (map.containsKey(input)) { SettableFuture<String> listenableFuture = SettableFuture.create(); //構建一個SettableFuture listenableFuture.set(map.get(input)); return listenableFuture; } else { return listeningExecutorService.submit(new Callable<String>() { @Override public String call() throws Exception { String retrieved = //compute to get the data; map.putIfAbsent(input, retrieved); return retrieved; } }); } } }
FutureFallback用於異常恢復的備份
/** * 當Future任務失敗後, 做爲備份的Future */ public class FutureFallbackImpl implements FutureFallback<String> { @Override public ListenableFuture<String> create(Throwable t) throws Exception { if (t instanceof FileNotFoundException) { SettableFuture<String> settableFuture = SettableFuture.create(); settableFuture.set("Not Found"); return settableFuture; } throw new Exception(t); } }
Futures類是有關Future實例的一個工具類。
ListenableFuture<Person> lf = Futures.transform(ListenableFuture<String> f,AsyncFunction<String,Person> af);
ListenableFuture<String> lf = Futures.withFallback(ListenableFuture<String> f,FutureFallback<String> fb);
RateLimiter限制訪問每秒訪問資源的線程數。有點相似信號量Semaphore。
RateLimiter limiter = RateLimiter.create(4.0); //每秒不超過4個任務被提交
limiter.acquire(); //請求RateLimiter, 超過permits會被阻塞 executor.submit(runnable); //提交任務
也有非阻塞式地嘗試:
if(limiter.tryAcquire()){ //未請求到limiter則當即返回false doSomething(); }else{ doSomethingElse(); }