JUC之ReentrantLock源碼分析

ReentrantLock:實現了Lock接口,是一個可重入鎖,而且支持線程公平競爭和非公平競爭兩種模式,默認狀況下是非公平模式。ReentrantLock算是synchronized的補充和替代方案。node

公平競爭:聽從先來後到的規則,先到先得
非公平競爭:正常狀況下是先到先得,可是容許見縫插針。即持有鎖的線程剛釋放鎖,等待隊列中的線程剛準備獲取鎖時,忽然半路殺出個程咬金,搶到了鎖,等待隊列中等待獲取鎖的線程只能乾瞪眼,接着等搶到鎖的線程釋放鎖安全

ReentrantLock與synchronized比較:
  一、ReentrantLock底層是經過將阻塞的線程保存在一個FIFO隊列中,synchronized底層是阻塞的線程保存在鎖對象的阻塞池中
  二、ReentrantLock是經過代碼機制進行加鎖,因此須要手動進行釋放鎖,synchronized是JAVA關鍵字,加鎖和釋放鎖有JVM進行實現
  三、ReentrantLock的加鎖和釋放鎖必須在方法體內執行,可是能夠不用同一個方法體,synchronized能夠在方法體內做爲方法塊,也能夠在方法聲明上
  四、synchronized進行加鎖時,若是獲取不到鎖就會直接進行線程阻塞,等待獲取到鎖後再往下執行。ReentrantLock既能夠阻塞線程等待獲取鎖,也能夠設置等待獲取鎖的時間,超過等待獲取時間就放棄獲取鎖,再也不阻塞線程多線程

ReentrantLock源碼分析:less

/** Synchronizer providing all implementation mechanics */
private final Sync sync;

/**
 * Base of synchronization control for this lock. Subclassed
 * into fair and nonfair versions below. Uses AQS state to
 * represent the number of holds on the lock.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    ....
}

/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    ....
}

/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    ....
}

 

 

 從省略細節的源碼中咱們能夠很清晰的看到,ReentrantLock內部定義了三個內部類,所有直接或者間接的繼承AbstractQueuedSynchronizer,ReentrantLock有一個屬性sync,默認狀況下爲NonfairSync類型,即爲非公平鎖。實際上ReentrantLock的全部操做都是有sync這個屬性進行的,ReentrantLock只是一層外皮。源碼分析

ReentrantLock既然實現類Lock接口,咱們就先以加鎖、解鎖進行分析:
一、加鎖(非公平):ui

 public void lock() {
        sync.lock();
}

就像上面說的同樣,ReentrantLock的功能都是靠底層sync進行實現,ReentrantLock的加鎖很簡單,就一句話使用sync的lock()方法,咱們直接看源碼this

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

sync的lock()方法也是比較簡單的,先經過CAS試圖將AQS的state由0變爲1。若是成功,表示該線程獲取鎖成功(設值時沒有線程在持有鎖),就將當前線程設置爲鎖的擁有者(這就是前面所說得見縫插針,後面還有);若是失敗,只表示設置值沒有成功,不表示該線程獲取鎖失敗(由於有多是重入加鎖),開始調用AQS的acquire(int)方法進行獲取鎖,咱們直接看acquire(int)方法spa

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這是AQS定義的模板方法,由AQS的子類去重寫tryAcquire(int) 方法,由AQS去調用,進行嘗試獲取鎖。若是獲取鎖成功,方法直接結束;若是獲取鎖失敗,就將當前線程進行阻塞並加入到FIFO的CLH隊列中。咱們先看ReentrantLock內部類是如何重寫tryAcquire(int)方法的線程

protected final boolean tryAcquire(int acquires) {
  // 直接調用父類nonfairTryAcquire(int)方法
return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

咱們直接看sync聲明的nonfairTryAcquire(int)方法,先獲取保存的state,若是值爲0,表示當前沒有線程持有鎖,處理和前面的一致,直接見縫插針,經過CAS嘗試直接設置值,設值失敗就表示獲取鎖失敗了,直接返回失敗結果;若是state不是0,表示鎖被線程持有,就比較下持有鎖的線程是不是當前線程,若是是,表示線程重入持有鎖,進行state值的累加。若是不是,直接返回持有鎖失敗結果。tryAcquire(int)方法獲取鎖失敗後,會去執行AQS聲明的acquireQueued(Node, int)方法將當前線程封裝到CLH隊列的節點Node中,並進行阻塞。咱們先看看AQS是將當前線程封裝到Node中都作了什麼操做code

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 將隊尾的node改成新建的node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

將當前線程裝進Node實例中,並設置改Node爲獨享模式。而後獲取隊尾的Node實例,由於走到這一步的時候有兩種獲取鎖失敗的場景:一、競爭鎖時,沒有線程持有鎖;二、競爭鎖時,已有別的線程持有鎖。因此會先判斷下隊尾Node是否爲null,若是爲空,表示是第一種場景獲取鎖失敗,若是不爲空,這是第二種場景獲取鎖失敗。先看第2種場景的處理流程,直接將隊尾的Node設置爲新建Node實例的prev(表示在隊列中的前一個Node節點),而後經過CAS嘗試將新建的Node節點設置爲隊尾(這個時候用CAS是由於有可能存在多線程競爭),若是設置隊尾成功,就將前任隊尾的next節點設置爲新建的node節點;若是設置隊尾失敗(多線程競爭纔會出現),和場景1進行相同的處理,先看源碼

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

處理很簡單,先獲取隊尾,若是獲取的隊尾爲null,表示上一步場景1過來的,經過CAS將隊首設置爲一個空的Node節點(該節點表示正在持有鎖的線程封裝的Node,僅僅是表明,沒有實值),並將隊尾和隊首指向同一個節點;若是獲取的隊尾不爲null,將隊尾設置爲參數Node的上一個節點,並經過CAS嘗試將參數Nodo設置爲隊尾,若是設置成功,將新隊尾設置爲前任隊尾的next節點,並直接返回;若是設置失敗,往復循環,直到成功爲止。

經過前面對addWaiter(Node)源碼的分析,咱們能夠清楚的瞭解到addWaiter方法將當前線程封裝到CLH隊列的獨享模式的Node節點中,並經過CAS將當前線程的Node節點設置爲隊尾。下面咱們接着看acquireQueued(final Node node, int arg)方法會作什麼處理

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 獲取前任節點
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

先獲取參數Node節點的前任節點,若是前任節點是隊首時,會去再次調用tryAcquire(int)方法去嘗試持有鎖,若是成功持有鎖,就將參數Node直接設置爲隊首,同時將前任隊首的next節點設置爲null(去除引用,利於GC),而後直接返回當前線程是否要進行中斷操做。若是前任節點不是隊首或者再次嘗試持有鎖失敗,會先調用shouldParkAfterFailedAcquire(Node pre, Node node)進行判斷是否須要進行線程阻塞,若是須要線程阻塞再調用parkAndCheckInterrupt()進行線程阻塞(該方法返回值表示該線程是不是中斷狀態)。線程阻塞後會等待前任節點釋放鎖時喚醒結束阻塞,線程結束阻塞後會循環再次去獲取鎖。可是若是結束阻塞後去獲取鎖時,有新的線程見縫插針直接獲取到鎖了,那就只能再次在隊列中進行阻塞了。其實shouldParkAfterFailedAcquire和parkAndCheckInterrupt看方法名稱就能猜個大概了,咱們仍是直接先看shouldParkAfterFailedAcquire(Node pre, Node node)方法的源碼

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 這個節點已經設置了狀態,請求釋放信號通知它,這樣它就能夠安全地進行阻塞了。
         */
        return true;
    if (ws > 0) {
        /*
         * 此時前任被取消了,跳過前任並重試。
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 等待狀態必須爲0或傳播。將節點狀態設置爲SIGNAL,告訴前任節點後面節點須要釋放信號通知,但先不進行阻塞。呼叫者將須要重試,以確保它不能在阻塞前得到鎖。
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

先獲取前任節點的waitStatus(等待狀態),1.若是前任節點的等待狀態爲Node.SIGNAL(表示後面的節點須要前任節點釋放鎖時進行通知,結束後面節點的阻塞),直接返回true,執行後面的阻塞流程;2.1若是前任節點的等待轉態值大於0,表示前任節點被取消了(爭奪鎖的線程因過了設定時間,獲取鎖失敗,從隊列中刪除節點的時候,Node節點會被設爲取消狀態),跳過前任節點,往前找,直到找到不是取消狀態的節點,直接將找到的有效前任節點的next節點設置爲當前節點;2.2若是前任節點不是取消狀態(多是初始值0、等待狀態或者傳播狀態),經過CAS嘗試將前任節點的waitStatus設置爲Node.SIGNAL(無論設置成功與否,都直接返回false,後面會再次循環執行,用來確保該節點的線程在阻塞前必定不會獲取到鎖,由於存在見縫插針去爭取持有鎖的線程)。shouldParkAfterFailedAcquire方法返回true,會進行線程阻塞,返回false,調用層會進行循環讓當前線程再次獲取一次鎖,失敗後再次被調用進行清洗已經取消的節點或者進行前任節點的等待狀態設置爲Node.SIGNAL。

下面咱們再來看看parkAndCheckInterrupt()方法的源碼

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

parkAndCheckInterrupt()方法的內容很簡單,使用LockSupport的park(Object blocker)方法進行線程阻塞,有興趣的同窗能夠自行去深刻了解,等待前任節點調用調用LockSupport的unpark(Thread thread)喚醒該線程。而後在該線程結束阻塞後返回該線程是不是中斷的狀態。

至此,非公平鎖獲取鎖的流程就分析完了,總結下非公平鎖加鎖流程

  1.無論線程有沒有被持有,先嚐試獲取鎖
  2.鎖未被持有,直接獲取鎖。2.1獲取鎖成功,結束;2.2獲取鎖失敗,封裝成CLH隊列的Node節點,並進行線程阻塞
  3.鎖被持有。3.1線程重入加鎖,進行state累加,等待釋放鎖時進行減除;3.2封裝成CLH隊列的Node節點,並進行線程阻塞

二、定長時間加鎖(非公平)

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

沒什麼可說的,依然是使用sync進行操做,咱們直接看sync的實現源碼

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

這裏直接先判斷當前線程是不是中斷狀態(多是由於ReentrantReadWriteLock中的WriteLock也會使用才進行判斷),若是是中斷狀態,直接拋異常。咱們這邊確定不會是中斷狀態的啦,接着往下走,調用tryAcquire(int)方法嘗試獲取鎖,忘了過程的同窗請往前翻。若是獲取鎖成功,那就直接結束。獲取鎖失敗時,執行doAcquireNanos(int arg, long nanosTimeout)方法以獨佔定時模式獲取鎖。咱們直接看源碼

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

一樣是先建立一個Node節點並放置到CLH隊列的隊尾,而後一樣的是開始進行循環處理,不一樣的地方是,定長獲取鎖使用了LockSupport.pargNanos(Object blocker, long nanos)進行阻塞一段時間,若是在線程阻塞未自動結束時,前一個節點釋放了鎖,該節點同樣會被解除阻塞去爭奪鎖,若是不幸被別的線程見縫插針搶去了鎖,那就接着去阻塞定長時間(這個定長時間是根據最初設定的時間和當前時間的差值),等待鎖的釋放。若是超過了定長時間仍是沒有獲取到鎖,就會調用cacelAcquire(Node node)去刪除該節點。

三、公平競爭加鎖

公平鎖對象FairSync的lock()方法直接調用了acquire(int)方法,前面咱們分析了,acquire(int)方法會先調用tryAcquire(int)方法去嘗試獲取鎖,根據返回結果去判斷是否須要加入到隊列中。下面咱們直接FairSync的源碼

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

源碼的邏輯比較清晰簡單,先判斷當前鎖是不是空閒狀態(state是不是0),若是是空閒的,就去嘗試獲取鎖,返回爭奪結果;若是不是空閒的,判斷是不是線程重入,若是是就累加state,返回成功,若是不是重入,直接返回失敗。若是tryAcquire方法返回了false,那麼就會將該線程封裝到CLH隊列的Node中並進行線程阻塞,後面的流程和非公平鎖時一致的。

總結下公平鎖加鎖的流程
  1.鎖未被持有,嘗試直接獲取鎖。2.1獲取鎖成功,結束;2.2獲取鎖失敗,封裝成CLH隊列的Node節點,並進行線程阻塞
  2.鎖被持有。2.1線程重入加鎖,進行state累加,等待釋放鎖時進行減除;2.2封裝成CLH隊列的Node節點,並進行線程阻塞

四、釋放鎖:

public void unlock() {
    sync.release(1);
}

ReentrantLock的釋放鎖直接調用的sync屬性的release(int)方法,實際是直接調用的AbstractQueuedSynchronizer的release(int)方法,咱們直接看源碼

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

依然是一樣的配方,調用子類重寫的tryRelease(int)方法去真正釋放鎖,若是釋放成功,並且隊列中有節點在等待隊首釋放鎖後進行通知,就會調用unparkSuccessor(Node node)去解除下一個節點的線程阻塞狀態,讓下一個線程去獲取鎖。

咱們先看看子類重寫的tryRelease(int)方法

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

源碼比較簡單,先比較當前線程是不是持有鎖的線程,若是不是,直接拋異常,說明調用者使用不規範,沒有先去獲取鎖。而後進行state的減除,先判斷此時state是否爲0,爲0表示線程徹底釋放了鎖。若是爲0,就將鎖的持有者變爲null。無論最後有沒有徹底釋放鎖都會將state設置成新值。

咱們再看看unparkSuccessor(Node node)都作了什麼

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

此時參數node表明的head,若是head的waitStatus小於0,表示後面節點須要在等待通知鎖被釋放的信號,先將head的waitStatus改成0,而後去看看head的next節點是存在而且next節點的waitStatus小於0,。若是next節點爲null或者waitStatus大於0,就從隊尾tail節點依次往前找,找到head節點後第一個waitStatus不大於0的節點,而後結束該節點的線程阻塞狀態;若是head的next節點存在而且waitStatus不大於0,直接解除head的next節點線程阻塞狀態。

總結下鎖釋放過程:
  1.先判斷當前線程是否能夠進行鎖釋放
  2.state減除,若是減除後的state爲0,就將鎖的持有者設爲空,並解除下一個等待節點的線程阻塞狀態

爲了加深印象,我專門還花了三個流程圖,一塊兒看看

到這裏基本上ReentrantLock的主要功能點都說完了,還有一個Condition功能沒說,接着搞起來。

ReentrantLock的Condition:

ReentrantLock方法:
public Condition newCondition() {
    return sync.newCondition();
}

Sync內部類方法:
final ConditionObject newCondition() {
    return new ConditionObject();
}

仍是一如既往的配方,ReentrantLock的newCondition依然用的是sync屬性去實現功能,Sync也簡單,直接就是建立一個AbstractQueuedSynchronizer的內部列ConditionObject的實例。咱們直接去看ConditionObject的源碼去分析

Condition主要是wait()、signal()兩個方法,咱們先來看wait()方法

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

分析以前先說明下,ConditionObject中一樣保存的有CLH隊列,和外部類AbstractQueuedSynchronizer類似。

咱們經過await()方法的源碼簡單分析下:
  一、首先調用了addConditionWaiter()方法將當前線程封裝到Node.CONDITION模式的Node中,放入到ConditionObject的CLH隊列中,順便去除了一些隊列中waitStatus不是Node.CONDITION的節點。
  二、調用fullyRelease(Node node)去徹底釋放掉鎖,並去解除AbstractQueuedSynchronizer中隊列的head節點,方法返回節點徹底釋放前的state值
  三、循環:對當前節點進行線程阻塞,直到被其餘線程使用signal()或者signalAll()方法解除線程阻塞狀態
  四、將節點加入到AbstractQueuedSynchronizer的CLH隊列中,等待爭奪鎖

總體上Condition的wait()方法的內容就這麼多,源碼也比較簡單,有興趣的能夠本身深刻看看。

如今看下signal()的源碼:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal()方法的主要工做仍是放在了doSignal(Node first)方法和transferForSignal(Node node)兩個方法上。在doSignal(Node first)方法中,獲取到ConditionObject的CLH隊列的隊首,而後調用transferForSignal(Node node)方法先將節點的waitStatus改成0,而後將節點放入到AbstractQueuedSynchronizer的CLH隊列隊尾,若是前任隊尾的waitStatus大於0或者將前任隊尾的waitStatus改成Node.SIGNAL失敗時,直接解除節點的線程阻塞狀態,結束wait()方法中的循環,調用acquireQueued(Node node, int savedState)去嘗試搶奪鎖,由於此時當前線程仍然持有鎖,因此節點最後仍是會被線程阻塞。由於此時節點node已經從ConditionObject的CLH隊列遷移到了AQS的CLH隊列隊尾,即便if條件不知足,不能解除node節點的線程阻塞狀態,等到前任隊尾節點釋放鎖時仍是會解除node節點的線程阻塞狀態。

Condition還有await(long time, TimeUnit unit)、signalAll()等等其它方法,原理差很少,這裏就不一一贅述了。至此,ReentrantLock的知識點基本上也說的差很少了。

相關文章
相關標籤/搜索