jdk提供
synchronized
實現線程同步,但有些場景下並不靈活,如多個同步方法,每次只能有一個線程訪問;而Lock則能夠很是靈活的在代碼中實現同步機制java
在以前學習阻塞隊列中,較多地方使用
ReadWriteLock
,Condition
,接下來在探究實現原理以前,先研究下鎖的使用node
Lock 接口的定義安全
public interface Lock { // 獲取鎖,若當前lock被其餘線程獲取;則此線程阻塞等待lock被釋放 // 若是採用Lock,必須主動去釋放鎖,而且在發生異常時,不會自動釋放鎖 void lock(); // 獲取鎖,若當前鎖不可用(被其餘線程獲取); // 則阻塞線程,等待獲取鎖,則這個線程可以響應中斷,即中斷線程的等待狀態 void lockInterruptibly() throws InterruptedException; // 來嘗試獲取鎖,若是獲取成功,則返回true; // 若是獲取失敗(即鎖已被其餘線程獲取),則返回false // 也就是說,這個方法不管如何都會當即返回 boolean tryLock(); // 在拿不到鎖時會等待必定的時間 // 等待過程當中,能夠被中斷 // 超過期間,依然獲取不到,則返回false;不然返回true boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 釋放鎖 void unlock(); // 返回一個綁定該lock的Condtion對象 // 在Condition#await()以前,鎖會被該線程持有 // Condition#await() 會自動釋放鎖,在wait返回以後,會自動獲取鎖 Condition newCondition(); }
ReentrantLock
可重入鎖。jdk中ReentrantLock是惟一實現了Lock接口的類多線程
可重入的意思是一個線程擁有鎖以後,能夠再次獲取鎖,併發
最基本的使用場景,就是利用lock和unlock來實現線程同步框架
以輪班爲實例進行說明,要求一我的下班以後,另外一我的才能上班,即不能兩我的同時上班,具體實現能夠以下ide
public class LockDemo { private Lock lock = new ReentrantLock(); private void workOn() { System.out.println(Thread.currentThread().getName() + ":上班!"); } private void workOff() { System.out.println(Thread.currentThread().getName() + ":下班"); } public void work() { try { lock.lock(); workOn(); System.out.println(Thread.currentThread().getName() + "工做中!!!!"); Thread.sleep(100); workOff(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { LockDemo lockDemo = new LockDemo(); int i = 0; List<Thread> list = new ArrayList<>(30); do { Thread a = new Thread(new Runnable() { @Override public void run() { lockDemo.work(); } }, "小A_" + i); Thread b = new Thread(new Runnable() { @Override public void run() { lockDemo.work(); } }, "小B_" + i); list.add(a); list.add(b); } while (i++ < 10); list.parallelStream().forEach(Thread::start); Thread.sleep(3000); System.out.println("main over!"); } }
上面的示例,主要給出了lock()
和 unlock()
的配套使用,當一個線程在上班遷嘗試獲取鎖,若是獲取到,則只有在下班以後纔會釋放鎖,保證在其上班的過程當中,不會有線程也跑來上崗搶飯碗,輸出以下源碼分析
小A_3:上班! 小A_3工做中!!!! 小A_3:下班 小A_1:上班! 小A_1工做中!!!! 小A_1:下班 // .... 省略部分 小B_7:上班! 小B_7工做中!!!! 小B_7:下班 小B_5:上班! 小B_5工做中!!!! 小B_5:下班 main over!
從基本的使用中,肯定lock的使用姿式通常以下:學習
Lock lock = new ReentrantLock()
lock.lock()
嘗試獲取鎖,若被其餘線程佔用,則阻塞lock.unlock()
; 通常來說,把釋放鎖的邏輯,放在須要線程同步的代碼包裝外的finally
塊中即常見的使用姿式應該是測試
try { lock.lock(); // ..... } finally { lock.unlock(); }
一個疑問,若沒被加鎖,僅只執行lock.unlock()
是否會有問題?
測試以下
@Test public void testLock() { Lock lock = new ReentrantLock(); // lock.lock(); // lock.unlock(); lock.unlock(); System.out.println("123"); }
執行以後,發現拋了異常(去掉上面的註釋,即加鎖一次,釋放鎖兩次也會拋下面異常)
另外一個疑問,代碼塊中屢次上鎖,釋放鎖只一次,是否會有問題?
將前面的TestDemo方法稍稍改動一下
public void work() { try { lock.lock(); workOn(); lock.lock(); System.out.println(Thread.currentThread().getName() + "工做中!!!!"); Thread.sleep(100); workOff(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
再次執行,發現其餘線程都沒法再次獲取到鎖了,運行的gif圖以下
所以能夠得出結論:
lock()
,lock()
連續調用的狀況,即二者之間沒有釋放鎖unlock()
的顯示調用在JDK的阻塞隊列中,不少地方就利用了Condition和Lock來實現出隊入隊的併發安全性,以 ArrayBlockingQueue
爲例
內部定義了鎖,非空條件,非滿條件
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { // ... 省略 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
出隊,入隊的實現以下(屏蔽一些與鎖無關邏輯)
// 入隊邏輯 public void put(E e) throws InterruptedException { // ... final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 若是隊列已滿,則執行Condtion notFull的等待方法 // 本線程會釋放鎖,等待其餘線程出隊以後,執行 notFull.singal()方法 notFull.await(); enqueue(e); } finally { // 釋放鎖 lock.unlock(); } } private void enqueue(E x) { // 入隊,notEmpty 條件執行,喚醒被 notEmpty.await() 阻塞的出隊線程 notEmpty.signal(); } // 出隊 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 隊列爲空,線程執行notEmpty.wait(),阻塞並釋放鎖 // 等待其餘入隊線程執行 notEmpty.signal(); 後被喚醒 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // ... // 出隊,notFull 條件執行,喚醒被 notFull.await() 阻塞的入隊線程 notFull.signal(); }
下面看下Condition的定義
public interface Condition { // 使當前線程處於等待狀態,釋放與Condtion綁定的lock鎖 // 直到 singal()方法被調用後,被喚醒(若中斷,就game over了) // 喚醒後,該線程會再次獲取與條件綁定的 lock鎖 void await() throws InterruptedException; // 相比較await()而言,不響應中斷 void awaitUninterruptibly(); // 在wait()的返回條件基礎上增長了超時響應,返回值表示當前剩餘的時間 // < 0 ,則表示超時 long awaitNanos(long nanosTimeout) throws InterruptedException; // 同上,只是時間參數不一樣而已 boolean await(long time, TimeUnit unit) throws InterruptedException; // 同上,只是時間參數不一樣而已 boolean awaitUntil(Date deadline) throws InterruptedException; // 表示條件達成,喚醒一個被條件阻塞的線程 void signal(); // 喚醒全部被條件阻塞的線程。 void signalAll(); }
經過上面的註釋,也就是說Condtion通常是與Lock配套使用,應用在多線程協同工做的場景中;即一個線程的執行,指望另外一個線程執行完畢以後才完成
針對這種方式,咱們寫個測試類,來實現累加,要求以下:
上面這種狀況下,線程3的執行,要求線程1和線程2都執行完畢
說明:下面實現只是爲了演示Condition和Lock的使用,上面這種場景有更好的選擇,如Thread.join()或者利用Fork/Join都更加優雅
public class LockCountDemo { private int start = 10; private int middle = 90; private int end = 200; private volatile int tmpAns1 = 0; private volatile int tmpAns2 = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private AtomicInteger count = new AtomicInteger(0); private int add(int i, int j) { try { lock.lock(); int sum = 0; for (int tmp = i; tmp < j; tmp++) { sum += tmp; } return sum; } finally { atomic(); lock.unlock(); } } private int sum() throws InterruptedException { try { lock.lock(); condition.await(); return tmpAns1 + tmpAns2; } finally { lock.unlock(); } } private void atomic() { if (2 == count.addAndGet(1)) { condition.signal(); } } public static void main(String[] args) throws InterruptedException { LockCountDemo demo = new LockCountDemo(); Thread thread1 = new Thread(() -> { System.out.println(Thread.currentThread().getName() + " : 開始執行"); demo.tmpAns1 = demo.add(demo.start, demo.middle); System.out.println(Thread.currentThread().getName() + " : calculate ans: " + demo.tmpAns1); }, "count1"); Thread thread2 = new Thread(() -> { System.out.println(Thread.currentThread().getName() + " : 開始執行"); demo.tmpAns2 = demo.add(demo.middle, demo.end + 1); System.out.println(Thread.currentThread().getName() + " : calculate ans: " + demo.tmpAns2); }, "count2"); Thread thread3 = new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " : 開始執行"); int ans = demo.sum(); System.out.println("the total result: " + ans); } catch (Exception e) { e.printStackTrace(); } }, "sum"); thread3.start(); thread1.start(); thread2.start(); Thread.sleep(3000); System.out.println("over"); } }
輸出以下
sum : 開始執行 count2 : 開始執行 count1 : 開始執行 count1 : calculate ans: 3960 the total result: 20055 count2 : calculate ans: 16095 over
小結Condition的使用:
Lock#newConditin()
進行實例化Condition#await()
會釋放lock,線程阻塞;直到線程中斷or Condition#singal()
被執行,喚醒阻塞線程,並從新獲取lockAQS是一個用於構建鎖和同步容器的框架。事實上concurrent包內許多類都是基於AQS構建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解決了在實現同步容器時設計的大量細節問題
AQS使用一個FIFO的隊列表示排隊等待鎖的線程,隊列頭節點稱做「哨兵節點」或者「啞節點」,它不與任何線程關聯。其餘的節點與等待線程關聯,每一個節點維護一個等待狀態waitStatus
private transient volatile Node head; private transient volatile Node tail; private volatile int state; static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; //取值爲 CANCELLED, SIGNAL, CONDITION, PROPAGATE 之一 volatile int waitStatus; volatile Node prev; volatile Node next; // Link to next node waiting on condition, // or the special value SHARED volatile Thread thread; Node nextWaiter; }
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
源碼分析:
tyrAcquire: 嘗試獲取鎖,非阻塞當即返回
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 沒有線程佔用鎖 if (compareAndSetState(0, acquires)) { // 佔用鎖成功,設置爲獨佔 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 本線程以前已經獲取到鎖 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 更新重入次數 setState(nextc); return true; } return false; }
非公平鎖tryAcquire的流程是:
addWaiter
: 入隊
private Node addWaiter(Node mode) { // 建立一個節點,並將其掛在鏈表的最後 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾節點爲空,說明隊列還未初始化,須要初始化head節點併入隊新節點 enq(node); return node; }
acquireQueued
掛起
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // /標記線程是否被中斷過 boolean interrupted = false; for (;;) { //獲取前驅節點 final Node p = node.predecessor(); // 若該節點爲有效的隊列頭(head指向的Node內部實際爲空) // 嘗試獲取鎖 if (p == head && tryAcquire(arg)) { setHead(node); // 獲取成功,將當前節點設置爲head節點 p.next = null; // help GC failed = false; //返回是否被中斷過 return interrupted; } // 判斷獲取失敗後是否能夠掛起,若能夠則掛起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 線程若被中斷,設置interrupted爲true interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
線程掛起的邏輯
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前驅節點的狀態 if (ws == Node.SIGNAL) // 前驅節點狀態爲signal,返回true return true; if (ws > 0) { // 從隊尾向前尋找第一個狀態不爲CANCELLED的節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 將前驅節點的狀態設置爲SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
線程入隊後可以掛起的前提是,它的前驅節點的狀態爲SIGNAL,它的含義是「Hi,前面的兄弟,若是你獲取鎖而且出隊後,記得把我喚醒!」。
shouldParkAfterFailedAcquire
會先判斷當前節點的前驅狀態是否符合要求:
小結下lock()流程
acquire()
acquireQueued
, 掛起以前,會先嚐試獲取鎖,值有確認失敗以後,則掛起鎖,並設置前置Node的狀態爲SIGNAL(以保障在釋放鎖的時候,能夠保證喚醒Node的後驅節點線程)嘗試釋放鎖,成功,須要清楚各類狀態(計數,釋放獨佔鎖)
此外還須要額外判斷隊列下個節點是否須要喚醒,而後決定喚醒被掛起的線程;
public final boolean release(int arg) { if (tryRelease(arg)) { // 嘗試釋放鎖 Node h = head; if (h != null && h.waitStatus != 0) // 查看頭結點的狀態是否爲SIGNAL,若是是則喚醒頭結點的下個節點關聯的線程 unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; // 計算釋放後state值 if (Thread.currentThread() != getExclusiveOwnerThread()) // 若是不是當前線程佔用鎖,那麼拋出異常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 鎖被重入次數爲0,表示釋放成功,清空獨佔線程 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
Lock lock = new ReentrantLock()
lock.lock()
嘗試獲取鎖,若被其餘線程佔用,則阻塞lock.unlock()
; 通常來說,把釋放鎖的邏輯,放在須要線程同步的代碼包裝外的finally
塊中lock()
,lock()
連續調用的狀況,即二者之間沒有釋放鎖unlock()
的顯示調用Lock#newConditin()
進行實例化Condition#await()
會釋放lock,線程阻塞;直到線程中斷or Condition#singal()
被執行,喚醒阻塞線程,並從新獲取lockReentrantLock#lock
的流程圖大體以下