鎖像synchronized同步塊同樣,是一種線程同步機制。讓自Java 5開始,java.util.concurrent.locks包提供了另外一種方式實現線程同步機制——Lock。那麼問題來了既然均可以經過synchronized來實現同步訪問了,那麼爲何還須要提供Lock呢?這個問題咱們下面討論java.util.concurrent.locks包中包含了一些鎖的實現,因此咱們不須要重複造輪子了。可是咱們仍然須要去了解怎樣使用這些鎖,且瞭解這些實現背後的理論也是頗有用處的。html
本文將從下面幾個方面介紹java
在學習或者使用Java的過程當中進程會遇到各類各樣的鎖的概念:公平鎖、非公平鎖、自旋鎖、可重入鎖、偏向鎖、輕量級鎖、重量級鎖、讀寫鎖、互斥鎖等待。下邊總結了對各類鎖的解釋算法
公平鎖是指多個線程在等待同一個鎖時按照申請鎖的前後順序來獲取鎖。相反的非公平鎖是指多個線程獲取鎖的順序並非按照申請鎖的順序,有可能後申請的線程比先申請的線程優先獲取鎖。編程
公平鎖的好處是等待鎖的線程不會餓死,可是總體效率相對低一些;非公平鎖的好處是總體效率相對高一些,可是有些線程可能會餓死或者說很早就在等待鎖,但要等好久纔會得到鎖。其中的緣由是公平鎖是嚴格按照請求所的順序來排隊得到鎖的,而非公平鎖時能夠搶佔的,即若是在某個時刻有線程須要獲取鎖,而這個時候恰好鎖可用,那麼這個線程會直接搶佔,而這時阻塞在等待隊列的線程則不會被喚醒。緩存
對於Java ReentrantLock
而言,經過構造函數指定該鎖是不是公平鎖,默認是非公平鎖。例:new ReentrantLock(true)是公平鎖
對於Synchronized
而言,也是一種非公平鎖。因爲其並不像ReentrantLock
是經過AQS的來實現線程調度,因此並無任何辦法使其變成公平鎖。多線程
也叫遞歸鎖,是指在外層函數得到鎖以後,內層遞歸函數仍然能夠獲取到該鎖。即線程能夠進入任何一個它已經擁有鎖的代碼塊。在JAVA環境下 ReentrantLock 和synchronized 都是可重入鎖。可重入鎖最大的做用是避免死鎖。併發
具體區別下文闡述。app
在Java中,自旋鎖是指嘗試獲取鎖的線程不會當即阻塞,而是採用循環的方式去嘗試獲取鎖,這樣的好處是減小線程上下文切換的消耗,缺點是循環會消耗CPU。框架
JDK6中已經變爲默認開啓自旋鎖,而且引入了自適應的自旋鎖。自適應意味着自旋的時間不在固定了,而是由前一次在同一個鎖上的自旋時間及鎖的擁有者的狀態來決定。自旋是在輕量級鎖中使用的,在重量級鎖中,線程不使用自旋。dom
這三種鎖是指鎖的狀態,而且是針對Synchronized
。在Java 5後經過引入鎖升級的機制來實現高效Synchronized。這三種鎖的狀態是經過對象監視器在對象頭中的字段來代表的。以下圖
這裏的無鎖和偏向鎖在對象頭的倒數第三bit中分別採用0和1標記
樂觀鎖與悲觀鎖不是指具體的什麼類型的鎖,而是指看待併發同步的角度
獨佔鎖/共享鎖就是一種廣義的說法,互斥鎖/讀寫鎖就是具體的實現。
ReentrantLock,可重入鎖,是一種遞歸無阻塞的同步機制。它能夠等同於synchronized的使用,可是ReentrantLock提供了比synchronized更強大、靈活的鎖機制,能夠減小死鎖發生的機率。
ReentrantLock還提供了公平鎖和非公平鎖的選擇,構造方法接受一個可選的公平參數(默認非公平鎖),當設置爲true時,表示公平鎖,不然爲非公平鎖。
通常使用以下方式獲取鎖
ReentrantLock lock = new ReentrantLock(); lock.lock();
lock方法:
public void lock() { sync.lock(); }
Sync爲Sync爲ReentrantLock裏面的一個內部類,它繼承AQS。關於AQS的相關知識能夠自行補充一下。Sync有兩個子類分別是FairSync(公平鎖)和 NofairSync(非公平鎖)。默認使用NofairSync,下面是ReentrantLock的構造類
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
下邊是一個簡單的重入鎖使用案例
1 public class ReentrantLockDemo implements Runnable { 2 public static final Lock lock = new ReentrantLock(); 3 public static int i = 0; 4 5 @Override 6 public void run() { 7 for (int j = 0; j < 1000000; j++) { 8 lock.lock(); 9 try { 10 i++; 11 } finally { 12 lock.unlock(); 13 } 14 } 15 } 16 17 public static void main(String[] args) throws InterruptedException { 18 ReentrantLockDemo demo = new ReentrantLockDemo(); 19 Thread t1 = new Thread(demo); 20 Thread t2 = new Thread(demo); 21 t1.start(); 22 t2.start(); 23 t1.join(); 24 t2.join(); 25 System.out.println(i); 26 } 27 }
上述代碼的第8~12行,使用了重入鎖保護了臨界區資源i,確保了多線程對i的操做。輸出結果爲2000000。能夠看到與synchronized相比,重入鎖必選手動指定在什麼地方加鎖,什麼地方釋放鎖,因此更加靈活。
要注意是,再退出臨界區的時候,須要釋放鎖,不然其餘線程就沒法訪問臨界區了。這裏爲啥叫可重入鎖是由於這種鎖是能夠被同一個線程反覆進入的。好比上述代碼的使用鎖部分能夠寫成這樣
lock.lock(); lock.lock(); try { i++; } finally { lock.unlock(); lock.unlock(); }
在這種狀況下,一個線程聯連續兩次獲取同一把鎖,這是容許的。可是須要注意的是,若是同一個線程屢次獲的鎖,那麼在釋放是也要釋放相同次數的鎖。若是釋放的鎖少了,至關於該線程依然持有這個鎖,那麼其餘線程就沒法訪問臨界區了。釋放的次數多了也會拋出java.lang.IllegalMonitorStateException異常。
除了使用上的靈活,ReentrantLock還提供了一些高級功能如中斷。限時等待等。
對用synchrozide來講,若是一個線程在等待,那麼結果只有兩種狀況,要麼得到這把鎖繼續執行下去要麼一直等待下去。而使用重入鎖,提供了另一種可能,那就是線程能夠被中斷。也就是說在這裏能夠取消對鎖的請求。這種狀況對解決死鎖是有必定幫組的。
下面代碼產生了一個死鎖,可是咱們能夠經過鎖的中斷,解決這個死鎖。
public class ReentrantLockDemo implements Runnable { //重入鎖ReentrantLock public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public ReentrantLockDemo(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1) { lock1.lockInterruptibly(); Thread.sleep(500); lock2.lockInterruptibly(); System.out.println("this is thread 1"); } else { lock2.lockInterruptibly(); Thread.sleep(500); lock1.lockInterruptibly(); System.out.println("this is thread 2"); } } catch (Exception e) { //e.printStackTrace(); } finally { if (lock1.isHeldByCurrentThread()) { lock1.unlock();//釋放鎖 } if (lock2.isHeldByCurrentThread()) { lock2.unlock(); } System.out.println(Thread.currentThread().getId() + ":線程退出"); } } public static void main(String[] args) throws InterruptedException { ReentrantLockDemo r1 = new ReentrantLockDemo(1); ReentrantLockDemo r2 = new ReentrantLockDemo(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); Thread.sleep(1000); //t2線程被中斷,放棄鎖申請,釋放已得到的lock2,這個操做使得t1線程順利得到lock2繼續執行下去; //若沒有此段代碼,t2線程沒有中斷,那麼會出現t1獲取lock1,請求lock2,而t2獲取lock2,請求lock1的相互等待死鎖狀況 t2.interrupt(); } }
線程t1和t2啓動後,t1先佔用lock1而後在請求lock2;t2先佔用lock2,而後請求lock1,所以很容易造成線程之間的相互等待。着這裏使用的是ReenTrantLock提供了一種可以中斷等待鎖的線程的機制,經過lock.lockInterruptibly()來實現這個機制。
最後因爲t2線程被中斷,t2會放棄對lock1的1請求,同時釋放lock2。這樣可使t1繼續執行下去,結果以下圖
除了等待通知之外,避免死鎖還有另一種方式,那就是限時等待。經過給定一個等待時間,讓線程自動放棄。
public class TimeLockDemo implements Runnable { private static ReentrantLock reentrantLock = new ReentrantLock(); @Override public void run() { try { if (reentrantLock.tryLock(5, TimeUnit.SECONDS)) { Thread.sleep(6000); } else { System.out.println("Gets lock failed"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if (reentrantLock.isHeldByCurrentThread()){ reentrantLock.unlock(); } } } public static void main(String[] args) { TimeLockDemo demo1 = new TimeLockDemo(); TimeLockDemo demo2 = new TimeLockDemo(); Thread t1 = new Thread(demo1); Thread t2 = new Thread(demo2); t1.start(); t2.start(); } }
tryLock有兩個參數,一個表示等待時長,另外一個表示計時單位。在這裏就是經過lock.tryLock(5,TimeUnit.SECONDS)來設置鎖申請等待限時,此例就是限時等待5秒獲取鎖。在這裏的鎖請求最多爲5秒,若是超過5秒未得到鎖請求,則會返回fasle,若是成功得到鎖就會返回true。此案例中第一個線程會持有鎖長達6秒,因此另一個線程沒法在5秒內得到鎖 故案例輸出結果爲Gets lock failed
另外tryLock方法也能夠不帶參數之直接運行,在這種狀況下,當前線程會嘗試得到鎖,若是鎖並未被其餘線程佔用,則申請鎖直接成功,當即返回true,不然當前線程不會進行等待,而是當即返回false。這種模式不會引發線程等待,所以也不會產生死鎖。
下邊展現了這種使用方式
public class ReentrantLockDemo implements Runnable { //重入鎖ReentrantLock public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public ReentrantLockDemo(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1) { while (true) { if (lock1.tryLock()) { try { Thread.sleep(1000); } finally { lock1.unlock(); } } if (lock2.tryLock()) { try { System.out.println("thread " + Thread.currentThread().getId() + " 執行完畢"); return; } finally { lock2.unlock(); } } } } else { while (true) { if (lock2.tryLock()) { try { Thread.sleep(1000); } finally { lock2.unlock(); } } if (lock1.tryLock()) { try { System.out.println("thread " + Thread.currentThread().getId() + " 執行完畢"); return; } finally { lock1.unlock(); } } } } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ReentrantLockDemo r1 = new ReentrantLockDemo(1); ReentrantLockDemo r2 = new ReentrantLockDemo(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); } }
使用了tryLock後,線程不會傻傻的等待,而是不一樣的嘗試獲取鎖,所以,只要執行足夠長的時間,線程老是會得到全部須要的資源。從而正常執行。下邊展現了運行結果。表示兩個線程運行都正常。
在大多數狀況下。鎖的申請都是非公平的。也就是說系統只是會從等待鎖的隊列裏隨機挑選一個,因此不能保證其公平性。可是公平鎖的實現成本很高,性能也相對低下。所以若是沒有特別要求,也不須要使用公平鎖。
對上邊ReentrantLock幾個重要的方法整理以下。
Conditon和ReentrantLock的組合可讓線程在合適的時間等待,或者在某一個特定的時間獲得通知,繼續執行。在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統線程的通訊方式,Condition均可以實現,這裏注意,Condition是被綁定到Lock上的,要建立一個Lock的Condition必須用newCondition()方法。
await()
的返回條件基礎上增長了超時響應,返回值表示當前剩餘的時間,若是在nanosTimeout以前被喚醒,返回值 = nanosTimeout - 實際消耗的時間,返回值 <= 0表示超時;await()
的返回條件基礎上增長了超時響應,與上一接口不一樣的是能夠自定義超時時間單位; 返回值返回true/false,在time以前被喚醒,返回true,超時返回false。使用案例以下
public class ConditionDemo { static class NumberWrapper { public int value = 1; } public static void main(String[] args) { //初始化可重入鎖 final Lock lock = new ReentrantLock(); //第一個條件當屏幕上輸出到3 final Condition reachThreeCondition = lock.newCondition(); //第二個條件當屏幕上輸出到6 final Condition reachSixCondition = lock.newCondition(); //NumberWrapper只是爲了封裝一個數字,一邊能夠將數字對象共享,並能夠設置爲final //注意這裏不要用Integer, Integer 是不可變對象 final NumberWrapper num = new NumberWrapper(); //初始化A線程 Thread threadA = new Thread(new Runnable() { @Override public void run() { //須要先得到鎖 lock.lock(); try { System.out.println("threadA start write"); //A線程先輸出前3個數 while (num.value <= 3) { System.out.println(num.value); num.value++; } //輸出到3時要signal,告訴B線程能夠開始了 reachThreeCondition.signal(); } finally { lock.unlock(); } lock.lock(); try { //等待輸出6的條件 while(num.value <= 6) { reachSixCondition.await(); } System.out.println("threadA start write"); //輸出剩餘數字 while (num.value <= 9) { System.out.println(num.value); num.value++; } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }); Thread threadB = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); while (num.value <= 3) { //等待3輸出完畢的信號 reachThreeCondition.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } try { lock.lock(); //已經收到信號,開始輸出4,5,6 System.out.println("threadB start write"); while (num.value <= 6) { System.out.println(num.value); num.value++; } //4,5,6輸出完畢,告訴A線程6輸出完了 reachSixCondition.signal(); } finally { lock.unlock(); } } }); //啓動兩個線程 threadB.start(); threadA.start(); } }
結果以下
這樣看來,Condition和傳統的線程通訊沒什麼區別,Condition的強大之處在於它能夠爲多個線程間創建不一樣的Condition,下面引入API中的一段代碼,加以說明。
class BoundedBuffer { final Lock lock = new ReentrantLock();//鎖對象 final Condition notFull = lock.newCondition();//寫線程條件 final Condition notEmpty = lock.newCondition();//讀線程條件 final Object[] items = new Object[100];//緩存隊列 int putptr/*寫索引*/, takeptr/*讀索引*/, count/*隊列中存在的數據個數*/; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length)//若是隊列滿了 notFull.await();//阻塞寫線程 items[putptr] = x;//賦值 if (++putptr == items.length) putptr = 0;//若是寫索引寫到隊列的最後一個位置了,那麼置爲0 ++count;//個數++ notEmpty.signal();//喚醒讀線程 } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0)//若是隊列爲空 notEmpty.await();//阻塞讀線程 Object x = items[takeptr];//取值 if (++takeptr == items.length) takeptr = 0;//若是讀索引讀到隊列的最後一個位置了,那麼置爲0 --count;//個數-- notFull.signal();//喚醒寫線程 return x; } finally { lock.unlock(); } } }
這個示例中BoundedBuffer是一個固定長度的集合,這個在其put操做時,若是發現長度已經達到最大長度,那麼要等待notFull信號才能繼續put,若是獲得notFull信號會像集合中添加元素,而且put操做會發出notEmpty的信號,而在其take方法中若是發現集合長度爲空,那麼會等待notEmpty的信號,接受到notEmpty信號才能繼續take,同時若是拿到一個元素,那麼會發出notFull的信號。
信號量(Semaphore)爲多線程協做提供了更爲強大的控制用法。不管是內部鎖Synchronized仍是ReentrantLock,一次都只容許一個線程訪問資源,而信號量能夠多個線程訪問同一資源。Semaphore是用來保護一個或者多個共享資源的訪問,Semaphore內部維護了一個計數器,其值爲能夠訪問的共享資源的個數。一個線程要訪問共享資源,先得到信號量,若是信號量的計數器值大於1,意味着有共享資源能夠訪問,則使其計數器值減去1,再訪問共享資源。若是計數器值爲0,線程進入休眠。當某個線程使用完共享資源後,釋放信號量,並將信號量內部的計數器加1,以前進入休眠的線程將被喚醒並再次試圖得到信號量。
信號量的UML的類圖以下,能夠看出和ReentrantLock同樣,Semaphore也包含了sync對象,sync是Sync類型;並且,Sync是一個繼承於AQS的抽象類。Sync包括兩個子類:"公平信號量"FairSync 和 "非公平信號量"NonfairSync。sync是"FairSync的實例",或者"NonfairSync的實例";默認狀況下,sync是NonfairSync(即,默認是非公平信號量)
信號量主要提供瞭如下構造函數
Semaphore(int num) Semaphore(int num,boolean how)
這裏,num指定初始許可計數。所以,它指定了一次能夠訪問共享資源的線程數。若是是1,則任什麼時候候只有一個線程能夠訪問該資源。默認狀況下,全部等待的線程都以未定義的順序被授予許可。經過設置how爲true,能夠確保等待線程按其請求訪問的順序被授予許可。信號量的主要邏輯方法以下
// 今後信號量獲取一個許可,在提供一個許可前一直將線程阻塞,不然線程被中斷。 void acquire() // 今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被中斷。 void acquire(int permits) // 今後信號量中獲取許可,在有可用的許可前將其阻塞。 void acquireUninterruptibly() // 今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。 void acquireUninterruptibly(int permits) // 返回此信號量中當前可用的許可數。 // 釋放一個許可,將其返回給信號量。 void release() // 釋放給定數目的許可,將其返回到信號量。 // 僅在調用時此信號量存在一個可用許可,才從信號量獲取許可。 boolean tryAcquire() // 僅在調用時此信號量中有給定數目的許可時,才今後信號量中獲取這些許可。 boolean tryAcquire(int permits) // 若是在給定的等待時間內此信號量有可用的全部許可,而且當前線程未被中斷,則今後信號量獲取給定數目的許可。 boolean tryAcquire(int permits, long timeout, TimeUnit unit) // 若是在給定的等待時間內,此信號量有可用的許可而且當前線程未被中斷,則今後信號量獲取一個許可。
實例以下:這裏咱們模擬10我的去銀行存款,可是該銀行只有兩個辦公櫃檯,有空位則上去存錢,沒有空位則只能去排隊等待。最後輸出銀行總額
public class SemaphoreThread { private int customer; public SemaphoreThread() { customer = 0; } /** * 銀行存錢類 */ class Bank { private int account = 100; public int getAccount() { return account; } public void save(int money) { account += money; } } /** * 線程執行類,每次存10塊錢 */ class NewThread implements Runnable { private Bank bank; private Semaphore semaphore; public NewThread(Bank bank, Semaphore semaphore) { this.bank = bank; this.semaphore = semaphore; } @Override public void run() { int tempCustomer = customer++; if (semaphore.availablePermits() > 0) { System.out.println("客戶" + tempCustomer + "啓動,進入銀行,有位置當即去存錢"); } else { System.out.println("客戶" + tempCustomer + "啓動,進入銀行,無位置,去排隊等待等待"); } try { semaphore.acquire(); bank.save(10); System.out.println(tempCustomer + "銀行餘額爲:" + bank.getAccount()); Thread.sleep(1000); System.out.println("客戶" + tempCustomer + "存錢完畢,離開銀行"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 創建線程,調用內部類,開始存錢 */ public void useThread() { Bank bank = new Bank(); // 定義2個新號量 Semaphore semaphore = new Semaphore(2); // 創建一個緩存線程池 ExecutorService es = Executors.newCachedThreadPool(); // 創建10個線程 for (int i = 0; i < 10; i++) { // 執行一個線程 es.submit(new Thread(new NewThread(bank, semaphore))); } // 關閉線程池 es.shutdown(); // 從信號量中獲取兩個許可,而且在得到許可以前,一直將線程阻塞 semaphore.acquireUninterruptibly(2); System.out.println("到點了,工做人員要吃飯了"); // 釋放兩個許可,並將其返回給信號量 semaphore.release(2); } public static void main(String[] args) { SemaphoreThread test = new SemaphoreThread(); test.useThread(); } }
ReentrantReadWriteLock是Lock的另外一種實現方式,咱們已經知道了ReentrantLock是一個排他鎖,同一時間只容許一個線程訪問,而ReentrantReadWriteLock容許多個讀線程同時訪問(也就是讀操做),但不容許寫線程和讀線程、寫線程和寫線程同時訪問。約束以下
相對於排他鎖,提升了併發性。在實際應用中,大部分狀況下對共享數據(如緩存)的訪問都是讀操做遠多於寫操做,這時ReentrantReadWriteLock可以提供比排他鎖更好的併發性和吞吐量。
看一下官方案例
lass CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public void processCachedData() { rwl.readLock().lock();//1 if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock();//2 rwl.writeLock().lock();//3 try { // Recheck state because another thread might have,acquired write lock and changed state before we did. if (!cacheValid) { data = ... cacheValid = true; } // 在釋放寫鎖以前經過獲取讀鎖降級寫鎖(注意此時尚未釋放寫鎖) rwl.readLock().lock();//4 } finally { // 釋放寫鎖而此時已經持有讀鎖 rwl.writeLock().unlock();//5 } } try { use(data); } finally { rwl.readLock().unlock();//6 } } }
若是不使用鎖降級功能,如先釋放寫鎖,而後得到讀鎖,在這個get過程當中,可能會有其餘線程競爭到寫鎖 或者是更新數據 則得到的數據是其餘線程更新的數據,可能會形成數據的污染,即產生髒讀的問題
1 public class ReadAndWriteLock { 2 private static ReentrantLock lock = new ReentrantLock(); 3 private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); 4 private static Lock readLock = readWriteLock.readLock(); 5 private static Lock writeLock = readWriteLock.writeLock(); 6 7 public ReadAndWriteLock setValue(int value) { 8 this.value = value; 9 return this; 10 } 11 12 private int value; 13 14 public Object handleRead(Lock lock) throws InterruptedException { 15 try { 16 //模擬讀操做 17 lock.lock(); 18 System.out.println("thread:" + Thread.currentThread().getId() + " value:" + value); 19 Thread.sleep(1000); 20 return value; 21 } finally { 22 lock.unlock(); 23 } 24 } 25 26 public Object handleWrite(Lock lock, int index) throws InterruptedException { 27 try { 28 //模擬寫操做 29 lock.lock(); 30 value = index; 31 Thread.sleep(1000); 32 System.out.println("thread:" + Thread.currentThread().getId() + " value:" + value); 33 return value; 34 35 } finally { 36 lock.unlock(); 37 } 38 } 39 40 public static void main(String[] args) throws InterruptedException { 41 final ReadAndWriteLock demo = new ReadAndWriteLock(); 42 demo.setValue(0); 43 Runnable readRunnable = new Runnable() { 44 @Override 45 public void run() { 46 try { 47 //讀鎖 48 demo.handleRead(readLock); 49 //可重入鎖 50 //demo.handleRead(lock); 51 52 } catch (InterruptedException e) { 53 e.printStackTrace(); 54 } 55 56 } 57 }; 58 59 Runnable writeRunnable = new Runnable() { 60 @Override 61 public void run() { 62 try { 63 //寫鎖 64 demo.handleWrite(readLock, (int) (Math.random() * 1000)); 65 //可重入鎖 66 //demo.handleWrite(lock, (int) (Math.random() * 1000)); 67 } catch (InterruptedException e) { 68 e.printStackTrace(); 69 } 70 71 } 72 }; 73 ExecutorService exec = new ThreadPoolExecutor(0, 200, 74 0, TimeUnit.SECONDS, 75 new SynchronousQueue<Runnable>()); 76 ; 77 long startTime = System.currentTimeMillis(); 78 79 for (int i = 0; i < 18; i++) { 80 exec.execute(readRunnable); 81 } 82 83 for (int i = 0; i < 18; i++) { 84 exec.execute(writeRunnable); 85 } 86 exec.shutdown(); 87 exec.awaitTermination(60, TimeUnit.MINUTES); 88 long endTime = System.currentTimeMillis(); //獲取結束時間 89 System.out.println("程序運行時間: " + (endTime - startTime) + "ms"); 90 91 } 92 }
在這裏讀線程徹底並行,而寫會阻塞讀。程序執行時間以下
將上述案例中的讀寫鎖改爲可重入鎖,即將第行代碼註釋掉那麼全部的讀和寫線程都必須相互等待,程序執行時間以下所示
CountDownLatch是java1.5版本以後util.concurrent提供的工具類。這裏簡單介紹一下CountDownLatch,能夠將其當作是一個計數器,await()方法能夠阻塞至超時或者計數器減至0,其餘線程當完成本身目標的時候能夠減小1,利用這個機制咱們能夠將其用來作併發。 好比有一個任務A,它要等待其餘4個任務執行完畢以後才能執行,此時就能夠利用CountDownLatch來實現這種功能了。
CountDownLatch類只提供了一個構造器,該構造器接受一個整數做爲參數,即當前這個計數器的計數個數 。
public CountDownLatch(int count) { }; //參數count爲計數值
使用場景:好比對於馬拉松比賽,進行排名計算,參賽者的排名,確定是跑完比賽以後,進行計算得出的,翻譯成Java識別的預發,就是N個線程執行操做,主線程等到N個子線程執行完畢以後,在繼續往下執行。
public class CountDownLatchTest { public static void main(String[] args){ int threadCount = 10; final CountDownLatch latch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { new Thread(new Runnable() { @Override public void run() { System.out.println("線程" + Thread.currentThread().getId() + "開始出發"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程" + Thread.currentThread().getId() + "已到達終點"); latch.countDown(); } }).start(); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("10個線程已經執行完畢!開始計算排名"); } }
結果以下
線程12開始出發
線程14開始出發
線程15開始出發
線程17開始出發
線程13開始出發
線程16開始出發
線程18開始出發
線程19開始出發
線程20開始出發
線程21開始出發
線程16已到達終點
線程13已到達終點
線程19已到達終點
線程18已到達終點
線程17已到達終點
線程14已到達終點
線程15已到達終點
線程12已到達終點
線程21已到達終點
線程20已到達終點
10個線程已經執行完畢!開始計算排名
CountDownLatch在並行化應用中也是比較經常使用。經常使用的並行化框架OpenMP中也是借鑑了這種思想。好比有這樣的一個需求,在你淘寶訂單的時候,這筆訂單可能還須要查,用戶信息,折扣信息,商家信息,商品信息等,用同步的方式(也就是串行的方式)流程以下。
設想一下這5個查詢服務,平均每次消耗100ms,那麼本次調用至少是500ms,咱們這裏假設,在這個這五個服務其實並無任何數據依賴,誰先獲取誰後獲取均可以,那麼咱們能夠想辦法並行化這五個服務。
這裏可使用CountDownLatch來實現這個效果。
public class CountDownDemo { private static final int CORE_POOL_SIZE = 4; private static final int MAX_POOL_SIZE = 8; private static final long KEEP_ALIVE_TIME = 5L; private final static int QUEUE_SIZE = 1600; protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE)); public static void main(String[] args) throws InterruptedException { // 新建一個爲5的計數器 CountDownLatch countDownLatch = new CountDownLatch(5); OrderInfo orderInfo = new OrderInfo(); THREAD_POOL.execute(() -> { System.out.println("當前任務Customer,線程名字爲:" + Thread.currentThread().getName()); orderInfo.setCustomerInfo(new CustomerInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("當前任務Discount,線程名字爲:" + Thread.currentThread().getName()); orderInfo.setDiscountInfo(new DiscountInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("當前任務Food,線程名字爲:" + Thread.currentThread().getName()); orderInfo.setFoodListInfo(new FoodListInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("當前任務Tenant,線程名字爲:" + Thread.currentThread().getName()); orderInfo.setTenantInfo(new TenantInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("當前任務OtherInfo,線程名字爲:" + Thread.currentThread().getName()); orderInfo.setOtherInfo(new OtherInfo()); countDownLatch.countDown(); }); countDownLatch.await(1, TimeUnit.SECONDS); System.out.println("主線程:" + Thread.currentThread().getName()); } }
創建一個線程池(具體配置根據具體業務,具體機器配置),進行併發的執行咱們的任務(生成用戶信息,菜品信息等),最後利用await方法阻塞等待結果成功返回。
字面意思循環柵欄,柵欄就是一種障礙物。這裏就是內存屏障。經過它能夠實現讓一組線程等待至某個狀態以後再所有同時執行。叫作迴環是由於當全部等待線程都被釋放之後,CyclicBarrier能夠被重用。CyclicBarrier比CountDownLatch 功能更強大一些,CyclicBarrier能夠接受一個參數做爲barrierAction。所謂barrierAction就是當計算器一次計數完成後,系統會執行的動做。CyclicBarrier強調的是n個線程,你們相互等待,只要有一個沒完成,全部人都得等着。(這種思想在高性能計算最爲常見,GPU計算中關於也有相似內存屏障的用法)。構造函數以下,其中parties表示計數總數,也就是參與的線程總數。
public CyclicBarrier(int parties, Runnable barrierAction) { } public CyclicBarrier(int parties) { }
案例10我的去旅行,規定達到一個地點後才能繼續前行.代碼以下
class CyclicBarrierWorker implements Runnable { private int id; private CyclicBarrier barrier; public CyclicBarrierWorker(int id, final CyclicBarrier barrier) { this.id = id; this.barrier = barrier; } @Override public void run() { try { Thread.sleep(Math.abs(new Random().nextInt()%10000)); System.out.println(id + " th people wait"); barrier.await(); // 你們等待最後一個線程到達 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } public class TestCyclicBarrier { public static void main(String[] args) { int num = 10; CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() { @Override public void run() { System.out.println("go on together!"); } }); for (int i = 1; i <= num; i++) { new Thread(new CyclicBarrierWorker(i, barrier)).start(); } } }
從上面輸出結果能夠看出,每一個線程執行本身的操做以後,就在等待其餘線程執行操做完畢。當全部線程線程執行操做完畢以後,全部線程就繼續進行後續的操做了。
參考資料
《Java高併發編程設計》
https://www.cnblogs.com/-new/p/7256297.html
https://www.cnblogs.com/dolphin0520/p/3923167.html