顯示鎖 java
Lock接口是Java 5.0新增的接口,該接口的定義以下:sql
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
與內置加鎖機制不一樣的是,Lock提供了一種無條件的、可輪詢的、定時的以及可中斷的鎖獲取操做,全部加鎖和解鎖的方法都是顯示的。ReentrantLock實現了Lock接口,與內置鎖相比,ReentrantLock有如下優點:能夠中斷獲取鎖操做,獲取鎖時候能夠設置超時時間。如下代碼給出了Lock接口的標準使用形式:安全
Lock lock = new ReentrantLock(); ... lock.lock(); try{ ... } finally { lock.unlock();
1.一、輪詢鎖與定時鎖併發
可定時的與可輪詢的鎖獲取方式是由tryLock方法實現的,與無條件的鎖獲取方式相比,它具備跟完善的錯誤回覆機制。tryLock方法的說明以下:app
boolean tryLock():僅在調用時鎖爲空閒狀態才獲取該鎖。若是鎖可用,則獲取鎖,並當即返回值 true。若是鎖不可用,則此方法將當即返回值 false。 boolean tryLock(long time, TimeUnit unit) throws InterruptedException: 若是鎖在給定的等待時間內空閒,而且當前線程未被中斷,則獲取鎖。 若是鎖可用,則此方法將當即返回值 true。若是鎖不可用,出於線程調度目的,將禁用當前線程,而且在發生如下三種狀況之一前,該線程將一直處於休眠狀態: 鎖由當前線程得到;或者 其餘某個線程中斷當前線程,而且支持對鎖獲取的中斷;或者 已超過指定的等待時間 若是得到了鎖,則返回值 true。 若是當前線程: 在進入此方法時已經設置了該線程的中斷狀態;或者 在獲取鎖時被中斷,而且支持對鎖獲取的中斷, 則將拋出 InterruptedException,並會清除當前線程的已中斷狀態。 若是超過了指定的等待時間,則將返回值 false。若是 time 小於等於 0,該方法將徹底不等待。
在內置鎖中,死鎖是一個嚴重的問題,恢復程序的惟一方法是從新啓動程序,而防止死鎖的惟一方法就是在構造程序時避免出現不一致的鎖順序,可定時的與可輪詢的鎖提供了另外一種選擇:先用tryLock()嘗試獲取全部的鎖,若是不能獲取全部須要的鎖,那麼釋放已經獲取的鎖,而後從新嘗試獲取全部的鎖,如下例子演示了使用tryLock避免死鎖的方法:先用tryLock來獲取兩個鎖,若是不能同時獲取,那麼就回退並從新嘗試。異步
public boolean transferMoney(Account fromAcct, Account toAcct, DollarAmount amount, long timeout, TimeUnit unit) throws InsufficientFundsException, InterruptedException { long fixedDelay = 1; long randMod = 2; long stopTime = System.nanoTime() + unit.toNanos(timeout); while (true) { if (fromAcct.lock.tryLock()) { try { if (toAcct.lock.tryLock()) { try { if (fromAcct.getBalance().compareTo(amount) < 0) throw new InsufficientFundsException(); else { fromAcct.debit(amount); toAcct.credit(amount); return true; } } finally { toAcct.lock.unlock(); } } } finally { fromAcct.lock.unlock(); } } if (System.nanoTime() < stopTime) return false; NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod); } }
1.二、可中斷的鎖獲取操做工具
lockInterruptibly方法可以在得到鎖的同時保持對中斷的響應,該方法說明以下:ui
void lockInterruptibly() throws InterruptedException: 若是當前線程未被中斷,則獲取鎖。 若是鎖可用,則獲取鎖,並當即返回。 若是鎖不可用,出於線程調度目的,將禁用當前線程,而且在發生如下兩種狀況之一之前,該線程將一直處於休眠狀態: 鎖由當前線程得到;或者 其餘某個線程中斷當前線程,而且支持對鎖獲取的中斷。 若是當前線程: 在進入此方法時已經設置了該線程的中斷狀態;或者 在獲取鎖時被中斷,而且支持對鎖獲取的中斷, 則將拋出 InterruptedException,並清除當前線程的已中斷狀態。
1.三、讀-寫鎖this
Java 5除了增長了Lock接口,還增長了ReadWriteLock接口,即讀寫鎖,該接口定義以下:spa
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
讀寫鎖容許多個讀線程併發執行,可是不容許寫線程與讀線程併發執行,也不容許寫線程與寫線程併發執行。下面的例子使用了ReentrantReadWriteLock包裝Map,從而使他可以在多個線程之間安全的共享:
public class ReadWriteMap <K,V> { private final Map<K, V> map; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock r = lock.readLock(); private final Lock w = lock.writeLock(); public ReadWriteMap(Map<K, V> map) { this.map = map; } public V put(K key, V value) { w.lock(); try { return map.put(key, value); } finally { w.unlock(); } } public V remove(Object key) { w.lock(); try { return map.remove(key); } finally { w.unlock(); } } public void putAll(Map<? extends K, ? extends V> m) { w.lock(); try { map.putAll(m); } finally { w.unlock(); } } public void clear() { w.lock(); try { map.clear(); } finally { w.unlock(); } } public V get(Object key) { r.lock(); try { return map.get(key); } finally { r.unlock(); } } public int size() { r.lock(); try { return map.size(); } finally { r.unlock(); } } public boolean isEmpty() { r.lock(); try { return map.isEmpty(); } finally { r.unlock(); } } public boolean containsKey(Object key) { r.lock(); try { return map.containsKey(key); } finally { r.unlock(); } } public boolean containsValue(Object value) { r.lock(); try { return map.containsValue(value); } finally { r.unlock(); } } }
同步工具類
2.一、閉鎖
閉鎖是一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。
用給定的計數初始化 CountDownLatch。因爲調用了 countDown() 方法,因此在當前計數到達零以前,await 方法會一直受阻塞。以後,會釋放全部等待的線程,await 的全部後續調用都將當即返回。這種現象只出現一次——計數沒法被重置。若是須要重置計數,請考慮使用 CyclicBarrier。
下例給出了閉鎖的常見用法,TestHarness建立必定數量的線程,利用它們併發的執行指定的任務,它使用兩個閉鎖,分別表示"起始門"和"結束門"。每一個線程首先要作的就是在啓動門上等待,從而確保全部線程都就緒後纔開始執行,而每一個線程要作的最後一件事是將調用結束門的countDown方法減1,這能使主線程高效地等待直到全部工做線程都執行完畢,所以能夠統計所消耗的時間:
public class TestHarness { public long timeTasks(int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread() { public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException ignored) { } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; } }
2.二、FutureTask
FutureTask表示可取消的異步計算。利用開始和取消計算的方法、查詢計算是否完成的方法和獲取計算結果的方法,此類提供了對 Future 的基本實現。僅在計算完成時才能獲取結果;若是計算還沒有完成,則阻塞 get 方法。一旦計算完成,就不能再從新開始或取消計算。FutureTask的方法摘要以下:
boolean cancel(boolean mayInterruptIfRunning) 試圖取消對此任務的執行。 protected void done() 當此任務轉換到狀態 isDone(不論是正常地仍是經過取消)時,調用受保護的方法。 V get() throws InterruptedException, ExecutionException 若有必要,等待計算完成,而後獲取其結果。 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 若有必要,最多等待爲使計算完成所給定的時間以後,獲取其結果(若是結果可用)。 boolean isCancelled() 若是在任務正常完成前將其取消,則返回 true。 boolean isDone() 若是任務已完成,則返回 true。 void run() 除非已將此 Future 取消,不然將其設置爲其計算的結果。 protected boolean runAndReset() 執行計算而不設置其結果,而後將此 Future 重置爲初始狀態,若是計算遇到異常或已取消,則該操做失敗。 protected void set(V v) 除非已經設置了此 Future 或已將其取消,不然將其結果設置爲給定的值。 protected void setException(Throwable t) 除非已經設置了此 Future 或已將其取消,不然它將報告一個 ExecutionException,並將給定的 throwable 做爲其緣由。
FutureTask能夠用來表示一些時間較長的計算,這些計算能夠在使用計算結果以前啓動,如下代碼就是模擬一個高開銷的計算,咱們能夠先調用start()方法開始計算,而後在須要結果時,再調用get獲得結果:
public class Preloader { ProductInfo loadProductInfo() throws DataLoadException { return null; } private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>( new Callable<ProductInfo>() { public ProductInfo call() throws DataLoadException { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } public ProductInfo get() throws DataLoadException, InterruptedException { try { return future.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof DataLoadException) throw (DataLoadException) cause; else throw new RuntimeException(e); } } interface ProductInfo { } } class DataLoadException extends Exception { }
2.三、信號量
從概念上講,信號量維護了一個許可集。若有必要,在許可可用前會阻塞每個 acquire(),而後等待獲取許可。每一個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。可是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,並採起相應的行動。
Semaphore 一般用於限制能夠訪問某些資源(物理或邏輯的)的線程數目。例如,下面的類使用信號量控制對內容池的訪問:
class Pool { private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } // Not a particularly efficient data structure; just for demo protected Object[] items = ... whatever kinds of items being managed protected boolean[] used = new boolean[MAX_AVAILABLE]; protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; // not reached } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; } }
得到一項前,每一個線程必須從信號量獲取許可,從而保證可使用該項。該線程結束後,將項返回到池中並將許可返回到該信號量,從而容許其餘線程獲取該項。注意,調用 acquire() 時沒法保持同步鎖,由於這會阻止將項返回到池中。信號量封裝所需的同步,以限制對池的訪問,這同維持該池自己一致性所需的同步是分開的。
將信號量初始化爲 1,使得它在使用時最多隻有一個可用的許可,從而可用做一個相互排斥的鎖。這一般也稱爲二進制信號量,由於它只能有兩種狀態:一個可用的許可,或零個可用的許可。按此方式使用時,二進制信號量具備某種屬性(與不少 Lock 實現不一樣),便可以由線程釋放「鎖」,而不是由全部者(由於信號量沒有全部權的概念)。在某些專門的上下文(如死鎖恢復)中這會頗有用。
Semaphore的構造方法可選地接受一個公平 參數。當設置爲 false 時,此類不對線程獲取許可的順序作任何保證。特別地,闖入 是容許的,也就是說能夠在已經等待的線程前爲調用 acquire() 的線程分配一個許可,從邏輯上說,就是新線程將本身置於等待線程隊列的頭部。當公平設置爲 true時,信號量保證對於任何調用獲取方法的線程而言,都按照處理它們調用這些方法的順序(即先進先出;FIFO)來選擇線程、得到許可。注意,FIFO 排序必然應用到這些方法內的指定內部執行點。因此,可能某個線程先於另外一個線程調用了acquire,可是卻在該線程以後到達排序點,而且從方法返回時也相似。還要注意,非同步的tryAcquire 方法不使用公平設置,而是使用任意可用的許可。
一般,應該將用於控制資源訪問的信號量初始化爲公平的,以確保全部線程均可訪問資源。爲其餘的種類的同步控制使用信號量時,非公平排序的吞吐量優點一般要比公平考慮更爲重要。
Semaphore還提供便捷的方法來同時 acquire 和釋放多個許可。當心,在未將公平設置爲 true 時使用這些方法會增長不肯定延期的風險。
內存一致性效果:線程中調用「釋放」方法(好比 release())以前的操做 happen-before 另外一線程中緊跟在成功的「獲取」方法(好比 acquire())以後的操做。
2.四、柵欄
CyclicBarrier是一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier頗有用。由於該 barrier在釋放等待線程後能夠重用,因此稱它爲循環的barrier。
CyclicBarrier支持一個可選的Runnable命令,在一組線程中的最後一個線程到達以後(但在釋放全部線程以前),該命令只在每一個屏障點運行一次。若在繼續全部參與線程以前更新共享狀態,此屏障操做頗有用。
示例用法:下面是一個在並行分解設計中使用barrier的例子:
class Solver { final int N; final float[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run() { while (!done()) { processRow(myRow); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } public Solver(float[][] matrix) { data = matrix; N = matrix.length; barrier = new CyclicBarrier(N, new Runnable() { public void run() { //mergeRows(...); } }); for (int i = 0; i < N; ++i) new Thread(new Worker(i)).start(); waitUntilDone(); } }
在這個例子中,每一個 worker 線程處理矩陣的一行,在處理完全部的行以前,該線程將一直在屏障處等待。處理完全部的行以後,將執行所提供的 Runnable 屏障操做,併合並這些行。若是合併者肯定已經找到了一個解決方案,那麼 done() 將返回 true,全部的 worker 線程都將終止。
若是屏障操做在執行時不依賴於正掛起的線程,則線程組中的任何線程在得到釋放時都能執行該操做。爲方便此操做,每次調用 await() 都將返回能到達屏障處的線程的索引。而後,您能夠選擇哪一個線程應該執行屏障操做.
對於失敗的同步嘗試,CyclicBarrier 使用了一種要麼所有要麼全不 (all-or-none) 的破壞模式:若是由於中斷、失敗或者超時等緣由,致使線程過早地離開了屏障點,那麼在該屏障點等待的其餘全部線程也將經過 BrokenBarrierException以反常的方式離開。
內存一致性效果:線程中調用 await() 以前的操做 happen-before 那些是屏障操做的一部份的操做,後者依次 happen-before 緊跟在從另外一個線程中對應 await() 成功返回的操做。