這裏舉一個栗子,咱們對一個資源進行加鎖,但是又要進行細粒度的控制,該如何實現呢?java
好比咱們開了了個餐館。餐館有一個廚房,服務員能夠通知廚房進行作菜,當前冰箱裏有菜時,廚房就會開始作菜,冰箱裏沒菜則會等待。dom
/** * Created by Anur IjuoKaruKas on 6/28/2018 */ @SuppressWarnings("Duplicates") public class Restaurant { private final Lock kitchen = new ReentrantLock(); private ConcurrentLinkedDeque<String> meetFridge = new ConcurrentLinkedDeque<>();// 肉冰箱 public Runnable cockMeet() { return new Runnable() { @Override public void run() { synchronized (kitchen) { System.out.println("通知廚房作肉"); if (meetFridge.isEmpty()) { try { System.out.println("冰箱沒有肉了,等待冰箱有肉"); kitchen.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } String meetNeedToCock = meetFridge.getFirst(); System.out.println("正在炒" + meetNeedToCock); } } }; } public Runnable buySomething() { return new Runnable() { @Override public void run() { synchronized (kitchen) { System.out.println("進貨了"); meetFridge.addLast("牛肉"); kitchen.notify(); } } }; } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); Restaurant restaurant = new Restaurant(); executorService.execute(restaurant.cockMeet()); executorService.execute(restaurant.cockMeet()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); executorService.execute(restaurant.cockMeet()); } }
運行一下main方法,能夠獲得如下輸出:ide
通知廚房作肉 冰箱沒有肉了,等待冰箱有肉 通知廚房作肉 冰箱沒有肉了,等待冰箱有肉 進貨了 正在炒牛肉 進貨了 正在炒牛肉 進貨了 通知廚房作肉 正在炒牛肉
到這裏是沒有什麼問題的。this
如今咱們既須要作肉,也須要作菜。 也就是說: 一、服務員通知了廚房,須要作一個肉和一個菜。這個時候廚房正好沒庫存,廚房進行了等待。 二、這時候某人去菜市場買了菜回來,廚房開始作菜。 三、過了一段時間 四、某人去菜市場買了肉回來,廚房開始作肉。線程
/** * Created by Anur IjuoKaruKas on 6/28/2018 */ @SuppressWarnings("Duplicates") public class Restaurant { private final Lock kitchen = new ReentrantLock(); private final Condition waitMeet = kitchen.newCondition(); private final Condition waitVege = kitchen.newCondition(); private ConcurrentLinkedDeque<String> meetFridge = new ConcurrentLinkedDeque<>();// 肉冰箱 private ConcurrentLinkedDeque<String> vegeFridge = new ConcurrentLinkedDeque<>();// 菜冰箱 public Runnable cockMeet() { return new Runnable() { @Override public void run() { kitchen.lock(); try { System.out.println("通知廚房作肉"); if (meetFridge.isEmpty()) { try { System.out.println("冰箱沒有肉了,等待冰箱有肉"); waitMeet.await(); // 直接調用condition的wait方法 } catch (InterruptedException e) { e.printStackTrace(); } } String meetNeedToCock = meetFridge.getFirst(); System.out.println("正在炒" + meetNeedToCock); } catch (Exception e) { e.printStackTrace(); } finally { kitchen.unlock(); } } }; } public Runnable cockVege() { return new Runnable() { @Override public void run() { kitchen.lock(); try { System.out.println("通知廚房作菜"); if (vegeFridge.isEmpty()) { try { System.out.println("冰箱沒有菜了,等待冰箱有菜"); waitVege.await(); // 直接調用condition的wait方法 } catch (InterruptedException e) { e.printStackTrace(); } } String meetNeedToCock = vegeFridge.getFirst(); System.out.println("正在炒" + meetNeedToCock); } catch (Exception e) { e.printStackTrace(); } finally { kitchen.unlock(); } } }; } public Runnable buySomething() { return new Runnable() { @Override public void run() { kitchen.lock(); try { Random random = new Random(); if (random.nextBoolean()) { System.out.println("肉進貨了"); meetFridge.addLast("牛肉"); waitMeet.signal(); } else { System.out.println("菜進貨了"); vegeFridge.addLast("苦瓜"); waitVege.signal(); } } catch (Exception e) { e.printStackTrace(); } finally { kitchen.unlock(); } } }; } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); Restaurant restaurant = new Restaurant(); executorService.execute(restaurant.cockMeet()); executorService.execute(restaurant.cockVege()); executorService.execute(restaurant.buySomething()); } }
最後輸出:rest
通知廚房作肉 冰箱沒有肉了,等待冰箱有肉 通知廚房作菜 冰箱沒有菜了,等待冰箱有菜 肉進貨了 正在炒牛肉
可見咱們能夠針對狀況對不一樣的行爲進行通知,這就是condition的力量。code
這裏就不瞎扯場景了,直接上代碼。隊列
這是仿kafka BufferPool的一種思路,(固然沒kafka實現的那麼複雜),它的思路是使用一個隊列來管理等待的線程。 每次線程進來sout(),都進行等待 知足必定的條件時,mission()會通知隊頭的一個線程進行操做。資源
/** * Created by Anur IjuoKaruKas on 6/25/2018 */ public class Task { private Deque<Condition> waiters = new ArrayDeque<>(); private Lock lock = new ReentrantLock(); private Integer count = 0; private void sout(String str) { this.lock.lock(); try { System.out.println("sout " + str + " get the lock"); Condition condition = this.lock.newCondition(); waiters.addLast(condition); condition.await(); Condition conditionFromWaiters = waiters.removeFirst(); if (conditionFromWaiters != condition) { System.out.println("???????"); } System.out.println("Test Task: " + str); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private void mission() { this.lock.lock(); try { System.out.println("mission get the lock"); while (count < 10) { count++; } Condition condition = waiters.peekFirst(); if (condition != null) { condition.signal(); } count = 0; } finally { lock.unlock(); } } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); final Task task = new Task(); for (int i = 0; i < 1000000; i++) { final int finalI = i; executorService.execute(new Runnable() { @Override public void run() { task.sout(finalI + ""); } }); executorService.execute(new Runnable() { @Override public void run() { task.mission(); } }); } } }