ReentrantLock是一個可重入的互斥鎖,也被稱爲獨佔鎖。它支持公平鎖和非公平鎖兩種模式。
java
下面看一個最初級的例子:node
public class Test { //默認內部採用非公平實現 ReentrantLock lock=new ReentrantLock(); public void myMethor(){ lock.lock(); //須要加鎖的一些操做 //必定要確保unlock能被執行到,尤爲是在存在異常的狀況下 lock.unlock(); } }
在進入方法後,在須要加鎖的一些操做執行以前須要調用lock方法,在jdk文檔中對lock方法詳細解釋以下:less
得到鎖。
若是鎖沒有被另外一個線程佔用而且當即返回,則將鎖定計數設置爲1。 若是當前線程已經保持鎖定,則保持計數增長1,該方法當即返回。 若是鎖被另外一個線程保持,則當前線程將被禁用以進行線程調度,而且在鎖定已被獲取以前處於休眠狀態,此時鎖定保持計數被設置爲1。ide
這裏也很好的解釋了什麼是可重入鎖,若是一個線程已經持有了鎖,它再次請求獲取本身已經拿到的鎖,是可以獲取成功的,這就是可重入鎖。源碼分析
在須要加鎖的代碼執行完畢以後,就會調用unlock釋放掉鎖。在jdk文檔之中對,unlock的解釋以下:ui
嘗試釋放此鎖。
若是當前線程是該鎖的持有者,則保持計數遞減。 若是保持計數如今爲零,則鎖定被釋放。 若是當前線程不是該鎖的持有者,則拋出IllegalMonitorStateException 。this
在這裏有一個須要注意的地點,lock和unlock都反覆提到了一個計數,這主要是由於ReentrantLock是可重入的。每次獲取鎖(重入)就將計數器加一,每次釋放的時候的計數器減一,直到計數器爲0,就將鎖釋放掉了。線程
以上就是最基礎,最簡單的使用方法。其他的一些方法,都是一些拓展的功能,查看jdk文檔便可知道如何使用。3d
能夠看出ReentrantLock繼承自AQS並實現了Lock接口。它內部有公平鎖和非公平鎖兩種實現,這兩種實現都是繼承自Sync。根據ReentrantLock決定到底採用公平鎖仍是非公平鎖實現。code
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
public void lock() { sync.lock(); }
咱們以非公平鎖實現來看下面的下面的代碼。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
首先進來就是一個判斷,其中判斷的條件就是compareAndSetState(0, 1)
.毫無疑問這是一個CAS。它的意思時若是當前的state的值的爲0就將1與其交換(能夠理解爲將1賦值給0)並返回true。其實在這一步若是state的值修改爲功了,那麼鎖就獲取成功了。setExclusiveOwnerThread(Thread.currentThread())
這行代碼就是將當前線程設置爲該排他鎖的擁有者。
若是CAS失敗了,那麼就調用acquire(1);
qcquire(1)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這個方法進來首先第一步就是調用tryAcquire(arg)
.
那麼該方法是幹什麼的呢?
非公平鎖實際是調用了這個實現:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
它具體的實現是在nonfairTryAcquire(acquires)
中。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //獲取鎖的狀態state,這就是前面咱們CAS的操做對象 if (c == 0) { //c==0說明沒被其它獲取 if (compareAndSetState(0, acquires)) { //CAS修改state //CAS修改爲功,說明獲取鎖成功,將當前線程設置爲該排他鎖的擁有者 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //若是鎖已經被佔有,可是是被當前鎖佔有的(可重入的具體體現) //計數器加一 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //鎖被其它線程佔有,就返回false return false; }
void acquire(int arg)
首先嚐試獲取鎖,獲取成功就直接返回了,獲取失敗就會執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
進行排隊。addWaiter(Node.EXCLUSIVE)
一部分是acquireQueued
.咱們先看addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { //隊列已經初始化了,就直接入隊便可 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; //返回 } } //隊列沒有初始化,初始化隊列併入隊 enq(node); return node; }
初始化對立對入隊的具體實現以下:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //初始化隊列 if (compareAndSetHead(new Node())) tail = head; } else { //隊列初始化成功,進行入隊 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
這裏稍微補充一下這個AQS中的這個等待隊列。
boolean acquireQueued(final Node node, int arg)
方法了。final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //獲取當前節點的先驅節點 if (p == head && tryAcquire(arg)) { //若是當前節點的前一個節點是頭節點,就會執行tryAcquire(arg)再次嘗試獲取鎖 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //根據狀況進入park狀態 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
public void unlock() { sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //喚醒等待的線程,能夠拿鎖了 unparkSuccessor(h); return true; } return false; }
該方法首先就調用了tryRelease(arg)
方法,這個方法就是實現釋放資源的關鍵。釋放的具體操做,也印證了在jdk文檔之中的關於unlock和lock的說明。
protected final boolean tryRelease(int releases) { int c = getState() - releases; //計算釋放後的state的值 if (Thread.currentThread() != getExclusiveOwnerThread()) //若是當前線程沒有持有鎖,就拋異常 throw new IllegalMonitorStateException(); boolean free = false; //標記爲釋放失敗 if (c == 0) { //若是state爲0了,說沒沒有線程佔有該鎖了 //進行重置全部者 free = true; setExclusiveOwnerThread(null); } //重置state的值 setState(c); return free; }
boolean release(int arg)
看if (h != null && h.waitStatus != 0) //喚醒等待的線程,能夠拿鎖了 unparkSuccessor(h);
咱們使用synchronized的時候,能夠經過wait和notify來讓線程等待,和喚醒線程。在ReentrantLock中,咱們也可使用Condition中的await和signal來使線程等待和喚醒。
如下面這段代碼來解釋:
public class Test { static ReentrantLock lock=new ReentrantLock(); //獲取到condition static Condition condition=lock.newCondition(); public static class TaskA implements Runnable{ @Override public void run() { lock.lock(); System.out.println(Thread.currentThread().getName() + "開始執行"); try { System.out.println(Thread.currentThread().getName() + "準備釋放掉鎖並等待"); //在此等待,直到其它線程喚醒 condition.await(); System.out.println(Thread.currentThread().getName() + "從新拿到鎖並執行"); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } } public static class TaskB implements Runnable{ @Override public void run() { lock.lock(); System.out.println(Thread.currentThread().getName() + "開始執行"); System.out.println(Thread.currentThread().getName() + "開始喚醒等待的線程"); //喚醒等待的線程 condition.signal(); try { Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + "任務執行完畢"); }catch (InterruptedException e){ e.printStackTrace(); }finally { lock.unlock(); } } } public static void main(String[] args) { Thread taskA=new Thread(new TaskA(),"taskA"); Thread taskB=new Thread(new TaskB(),"taskB"); taskA.start(); taskB.start(); } }
輸出結果:
taskA開始執行 taskA準備釋放掉鎖並等待 taskB開始執行 taskB開始喚醒等待的線程 taskB任務執行完畢 taskA從新拿到鎖並執行
現象解釋:
首先taskA拿到鎖,並執行,到condition.await();
釋放鎖,並進入阻塞。taskB所以拿到剛纔taskA釋放掉的鎖,taskB開始執行。taskB執行到condition.signal();
喚醒了taskA,taskB繼續執行,taskA由於拿不到鎖,所以雖然已經被喚醒了,可是仍是要等到taskB執行完畢,釋放鎖後,纔有機會拿到鎖,執行本身的代碼。
那麼這個過程,源碼究竟是如何實現的呢?
具體的實現以下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //添加一個條件節點 int savedState = fullyRelease(node); //釋放掉全部的資源 int interruptMode = 0; while (!isOnSyncQueue(node)) { //若是當前線程不在等待隊列中,park阻塞 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; //線程被中斷就跳出循環 } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled //取消條件隊列中已經取消的等待節點的連接 unlinkCancelledWaiters(); if (interruptMode != 0) //等待結束後處理中斷 reportInterruptAfterWait(interruptMode); }
基本的步驟以下:
InterruptedException()
異常具體的實現代碼以下:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
這個方法中最重要的也就是doSignal(first)
.
它的實現以下:
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; //解除等待隊列中首節點的連接 } while (!transferForSignal(first) && //轉移入等待隊列 (first = firstWaiter) != null); }
該方法所作的事情就是從等待隊列中移除指定節點,並將其加入等待隊列中去。
轉移節點的方法實現以下:
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //CAS修改狀態失敗,說明節點被取消了,直接返回false return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); //加入節點到等待隊列 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //若是前節點被取消,說明當前爲最後一個等待線程,直接unpark喚醒, LockSupport.unpark(node.thread); return true; }
至此ReentrantLock的源碼分析就結束了!