Java併發(5)- ReentrantLock與AQS

引言

synchronized未優化以前,咱們在編碼中使用最多的同步工具類應該是ReentrantLock類,ReentrantLock擁有優化後synchronized關鍵字的性能,又提供了更多的靈活性。相比synchronized,他在功能上更增強大,具備等待可中斷,公平鎖以及綁定多個條件等synchronized不具有的功能,是咱們開發過程當中必需要重點掌握的一個關鍵併發類。java

ReentrantLock在JDK併發包中舉足輕重,不只是由於他自己的使用頻度,同時他也爲大量JDK併發包中的併發類提供底層支持,包括CopyOnWriteArrayLitCyclicBarrierLinkedBlockingDeque等等。既然ReentrantLock如此重要,那麼瞭解他的底層實現原理對咱們在不一樣場景下靈活使用ReentrantLock以及查找各類併發問題就很關鍵。這篇文章就帶領你們一步步剖析ReentrantLock底層的實現邏輯,瞭解實現邏輯以後又應該怎麼更好的使用ReentrantLocknode

ReentrantLock與AbstractQueuedSynchronizer的關係

在使用ReentrantLock類時,第一步就是對他進行實例化,也就是使用new ReentrantLock(),來看看他的實例化的源碼:面試

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
複製代碼

在代碼中能夠看到,ReentrantLock提供了2個實例化方法,未帶參數的實例化方法默認用NonfairSync()初始化了sync字段,帶參數的實例化方法經過參數區用NonfairSync()FairSync()初始化sync字段。安全

經過名字看出也就是咱們經常使用的非公平鎖與公平鎖的實現,公平鎖須要經過排隊FIFO的方式來獲取鎖,非公平鎖也就是說能夠插隊,默認狀況下ReentrantLock會使用非公平鎖的實現。那麼是sync字段的實現邏輯是什麼呢?看下sync的代碼:多線程

private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {......}

static final class NonfairSync extends Sync {......}

static final class FairSync extends Sync {......}
複製代碼

到這裏就發現了AbstractQueuedSynchronizer類,公平鎖和非公平鎖其實都是在AbstractQueuedSynchronizer的基礎上實現的,也就是AQS。AQS提供了ReentrantLock實現的基礎。併發

ReentrantLock的lock()方法

分析了ReentrantLock的實例化以後,來看看他是怎麼實現鎖這個功能的:app

//ReentrantLock的lock方法
public void lock() {
    sync.lock();
}

//調用了Sync中的lock抽象方法
abstract static class Sync extends AbstractQueuedSynchronizer {
    ......
    /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */
    abstract void lock();
    ......
}
複製代碼

調用了synclock()方法,Sync類的lock()方法是一個抽象方法,NonfairSync()FairSync()分別對lock()方法進行了實現。工具

//非公平鎖的lock實現
static final class NonfairSync extends Sync {
    ......
    /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */
    final void lock() {
        if (compareAndSetState(0, 1)) //插隊操做,首先嚐試CAS獲取鎖,0爲鎖空閒
            setExclusiveOwnerThread(Thread.currentThread()); //獲取鎖成功後設置當前線程爲佔有鎖線程
        else
            acquire(1);
    }
    ......
}

//公平鎖的lock實現
static final class FairSync extends Sync {
    ......
    final void lock() {
        acquire(1);
    }
    ......
}
複製代碼

注意看他們的區別,NonfairSync()會先進行一個CAS操做,將一個state狀態從0設置到1,這個也就是上面所說的非公平鎖的「插隊」操做,前面講過CAS操做默認是原子性的,這樣就保證了設置的線程安全性。這是非公平鎖和公平鎖的第一點區別。性能

那麼這個state狀態是作什麼用的呢?從0設置到1又表明了什麼呢?再來看看跟state有關的源碼:優化

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

/** * The synchronization state. */
private volatile int state;

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}
複製代碼

首先state變量是一個volatile修飾的int類型變量,這樣就保證了這個變量在多線程環境下的可見性。從變量的註釋「The synchronization state」能夠看出state表明了一個同步狀態。再回到上面的lock()方法,在設置成功以後,調用了setExclusiveOwnerThread方法將當前線程設置給了一個私有的變量,這個變量表明瞭當前獲取鎖的線程,放到了AQS的父類AbstractOwnableSynchronizer類中實現。

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    ......

    /** * The current owner of exclusive mode synchronization. */
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
複製代碼

若是設置state成功,lock()方法執行完畢,表明獲取了鎖。能夠看出state狀態就是用來管理是否獲取到鎖的一個同步狀態,0表明鎖空閒,1表明獲取到了鎖。那麼若是設置state狀態不成功呢?接下來會調用acquire(1)方法,公平鎖則直接調用acquire(1)方法,不會用CAS操做進行插隊。acquire(1)方法是實如今AQS中的一個方法,看下他的源碼:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
複製代碼

這個方法很重要也很簡單理解,有幾步操做,首先調用tryAcquire嘗試獲取鎖,若是成功,則執行完畢,若是獲取失敗,則調用addWaiter方法添加當前線程到等待隊列,同時添加後執行acquireQueued方法掛起線程。若是掛起等待中須要中斷則執行selfInterrupt將線程中斷。下面來具體看看這個流程執行的細節,首先看看tryAcquire方法:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

//NonfairSync
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { //鎖空閒
        if (compareAndSetState(0, acquires)) { //再次cas操做獲取鎖
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { //當前線程重複獲取鎖,也就是鎖重入
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

//FairSync
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() && //判斷隊列中是否已經存在等待線程,若是存在則獲取鎖失敗,須要排隊
            compareAndSetState(0, acquires)) { //不存在等待線程,再次cas操做獲取鎖
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { //當前線程重複獲取鎖,也就是鎖重入
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

//AQS中實現,判斷隊列中是否已經存在等待線程
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
複製代碼

AQS沒有提供具體的實現,ReentrantLock中公平鎖和非公平鎖分別有本身的實現。非公平鎖在鎖空閒的狀態下再次CAS操做嘗試獲取鎖,保證線程安全。若是當前鎖非空閒,也就是state狀態不爲0,則判斷是不是重入鎖,也就是同一個線程屢次獲取鎖,是重入鎖則將state狀態+1,這也是ReentrantLock`支持鎖重入的邏輯。

公平鎖和非公平鎖在這上面有第二點區別,公平鎖在鎖空閒時首先會調用hasQueuedPredecessors方法判斷鎖等待隊列中是否存在等待線程,若是存在,則不會去嘗試獲取鎖,而是走接下來的排隊流程。至此非公平鎖和公平鎖的區別你們應該清楚了。若是面試時問道公平鎖和非公平鎖的區別,相信你們能夠很容易答出來了。

經過tryAcquire獲取鎖失敗以後,會調用acquireQueued(addWaiter),先來看看addWaiter方法:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);   //用EXCLUSIVE模式初始化一個Node節點,表明是一個獨佔鎖節點
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) { //若是尾節點不爲空,表明等待隊列中已經有線程節點在等待
        node.prev = pred; //將當前節點的前置節點指向尾節點
        if (compareAndSetTail(pred, node)) { //cas設置尾節點爲當前節點,將當前線程加入到隊列末尾,避免多線程設置致使數據丟失
            pred.next = node;
            return node;
        }
    }
    enq(node); //若是隊列中無等待線程,或者設置尾節點不成功,則循環設置尾節點
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node())) //空隊列,初始化頭尾節點都爲一個空節點
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) { //重複addWaiter中的設置尾節點,也是cas的經典操做--自旋,避免使用Synchronized關鍵字致使的線程掛起
                t.next = node;
                return t;
            }
        }
    }
}

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node(); //共享模式
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;  //獨佔模式

    ......
}
複製代碼

addWaiter方法首先初始化了一個EXCLUSIVE模式的Node節點。Node節點你們應該很熟悉,我寫的集合系列文章裏面介紹了不少鏈式結構都是經過這種方式來實現的。AQS中的Node也不例外,他的隊列結構也是經過實現一個Node內部類來實現的,這裏實現的是一個雙向隊列。Node節點分兩種模式,一種SHARED共享鎖模式,一種EXCLUSIVE獨佔鎖模式,ReentrantLock使用的是EXCLUSIVE獨佔鎖模式,所用用EXCLUSIVE來初始化。共享鎖模式後面的文章咱們再詳細講解。

初始化Node節點以後就是將節點加入到隊列之中,這裏有一點要注意的是多線程環境下,若是CAS設置尾節點不成功,須要自旋進行CAS操做來設置尾節點,這樣即保證了線程安全,又保證了設置成功,這是一種樂觀的鎖模式,固然你能夠經過synchronized關鍵字鎖住這個方法,但這樣效率就會降低,是一種悲觀鎖模式。

設置節點的過程我經過下面幾張圖來描述下,讓你們有更形象的理解:

將當前線程加入等待隊列以後,須要調用acquireQueued掛起當前線程:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); //獲取當前節點的前置節點
            if (p == head && tryAcquire(arg)) { //若是前置節點是頭節點,說明當前節點是第一個掛起的線程節點,再次cas嘗試獲取鎖
                setHead(node); //獲取鎖成功設置當前節點爲頭節點,當前節點佔有鎖
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && //非頭節點或者獲取鎖失敗,檢查節點狀態,查看是否須要掛起線程
                parkAndCheckInterrupt())  //掛起線程,當前線程阻塞在這裏!
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

能夠看到這個方法是一個自旋的過程,首先獲取當前節點的前置節點,若是前置節點爲頭結點則再次嘗試獲取鎖,失敗則掛起阻塞,阻塞被取消後自旋這一過程。是否能夠阻塞經過shouldParkAfterFailedAcquire方法來判斷,阻塞經過parkAndCheckInterrupt方法來執行。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) //表明繼任節點須要掛起
        /* * This node has already set status asking a release * to signal it, so it can safely park. */
        return true;
    if (ws > 0) { //表明前置節點已經退出(超時或中斷等狀況) 
        /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0); //前置節點退出,循環設置到最近的一個未退出節點
        pred.next = node;
    } else { //非可掛起狀態或退出狀態則嘗試設置爲Node.SIGNAL狀態
        /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//掛起當前線程
    return Thread.interrupted();
}
複製代碼

只有當節點處於SIGNAL狀態時才能夠掛起線程,Node的waitStatus有4個狀態分別是:

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */
static final int PROPAGATE = -3;
複製代碼

註釋寫的很清楚,這裏就不詳細解釋着四種狀態了。到這裏整個Lock的過程咱們就所有說完了,公平鎖和非公平鎖的區別從Lock的過程當中咱們也很容易發現,非公平鎖同樣要進行排隊,只不過在排隊以前會CAS嘗試直接獲取鎖。說完了獲取鎖,下面來看下釋放鎖的過程。

ReentrantLock的unLock()方法

unLock()方法比較好理解,由於他不須要考慮多線程的問題,若是unLock()的不是以前lock的線程,直接退出就能夠了。看看unLock()的源碼:

public class ReentrantLock implements Lock, java.io.Serializable {
    ......
    public void unlock() {
        sync.release(1);
    }
    ......
}

public abstract class AbstractQueuedSynchronizer {
    ......
    public final boolean release(int arg) {
        if (tryRelease(arg)) { //嘗試釋放鎖
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); //釋放鎖成功後啓動後繼線程
            return true;
        }
        return false;
    }
    ......
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    ......
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread()) //釋放鎖必需要是獲取鎖的線程,不然退出,保證了這個方法只能單線程訪問
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { //獨佔鎖爲0後表明鎖釋放,不然爲重入鎖,不釋放
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    ......
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    ......
    private void unparkSuccessor(Node node) {
        /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); //掛起當前線程
    }
    ......
}
複製代碼

lock()方法同樣,會調用AQS的release方法,首先調用tryRelease嘗試釋放,首先必需要是當前獲取鎖的線程,以後判斷是否爲重入鎖,非重入鎖則釋放當前線程的鎖。鎖釋放以後調用unparkSuccessor方法啓動後繼線程。

總結

ReentrantLock的獲取鎖和釋放鎖到這裏就講完了,總的來講仍是比較清晰的一個流程,經過AQS的state狀態來控制鎖獲取和釋放狀態,AQS內部用一個雙向鏈表來維護掛起的線程。在AQS和ReentrantLock之間經過狀態和行爲來分離,AQS用管理各類狀態,並內部經過鏈表管理線程隊列,ReentrantLock則對外提供鎖獲取和釋放的功能,具體實現則在AQS中。下面我經過兩張流程圖總結了公平鎖和非公平鎖的流程。

非公平鎖:

公平鎖:
相關文章
相關標籤/搜索