ReentrantLock源碼自讀 -- AQS

ReentrantLock公平鎖加鎖過程

測試代碼

public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock(true);
    new Thread(() -> {
        System.out.println("11111 try get lock");
        lock.lock();
        System.out.println("11111");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        lock.unlock();
    }).start();

    try {
        TimeUnit.MILLISECONDS.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    new Thread(() -> {
        System.out.println("22222 try get lock");
        lock.lock();
        System.out.println("22222");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        lock.unlock();
    }).start();

    try {
        TimeUnit.MILLISECONDS.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    new Thread(() -> {
        System.out.println("33333 try get lock");
        lock.lock();
        System.out.println("33333");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        lock.unlock();
    }).start();

    try {
        TimeUnit.MILLISECONDS.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("main");
}
這裏開啓 3個線程,而且讓 3個線程依次執行。

第一個線程加鎖

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

先來看看上面這段代碼中的tryAcquire方法。java

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
由於是第一個線程來獲取鎖,因此 getState()會獲得 state默認值 0,接下來進入第一個 if代碼塊,再看 hasQueuedPredecessors方法。
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
一樣的由於是第一個線程,因此 tailhead也是默認初始值 null,也就是說 return語句塊中的 h != t將返回 false,再回到 tryAcquire方法中,這時 !hasQueuedPredecessors()true,因此緊接着執行 compareAndSetState(0, acquires),這句代碼的意思是將 state的值由 0變爲 1,表明上鎖成功,而後將當前線程置爲獨佔全部者。這時上鎖完成。
那麼下面的 else if中的代碼是什麼意思呢?沒錯,正如你想,是重入邏輯。

至此第一個線程的加鎖邏輯完成,有沒有發現目前跟AQS沒有一毛錢的關係。node

第二個線程加鎖

毫無疑問,咱們仍是從 tryAcquire方法開始。

在不考慮重入的狀況下,這時執行getState()方法獲得的值爲1。因此第一個if代碼塊不會進,同理,當前獨佔線程也不是咱們所謂的第二個線程,因此else if代碼塊也不會進,那麼直接返回false。這時!tryAcquire(arg)true,那麼執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。app

先看看 addWaiter(Node.EXCLUSIVE)方法。
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
能夠看到該方法一開始就 new了一個 Node,初始化該 Node包含咱們所謂的第二個線程。注意這裏開始涉及 AQS(AbstractQueuedSynchronizer)了,其實就是一個鏈表隊列。
言歸正傳,代碼緊接着獲取隊列的尾部節點,由於第一個線程並無涉及到隊列,因此這裏毫無覺得 tail節點爲 null,所以執行 enq(node)方法。

老規矩,看看代碼。測試

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
一進來就看到個死循環。一樣的獲取尾部節點,沒的說,鐵定爲 null。那麼執行 if代碼塊, new了一個空節點給頭部節點,而後又把頭部節點賦值給尾部節點(這時頭部和尾部指向同一內存地址)。而後第一次循環結束,由於沒有返回值,因此執行第二次循環,這時候尾部節點已經不是 null了(哪怕它是一個空節點),那麼進入 else代碼塊。
先把傳進來的 node(上個方法中 new出來的包含第二個線程的 node)的上一個節點設爲第一次循環設置的空節點。
而後把傳進來的 node給尾部節點。
最後把空節點的下一個節點設置爲傳進來的 node
這樣就造成了擁有兩個節點的雙向鏈表。
有點難理解,畫個幼兒園水平的圖來看看。
首先定義一個抽象的空節點。

clipboard.png

而後定義addWaiter方法new出來的節點,賜名t2節點。

clipboard.png

再而後定義一個enq方法new出來的空節點。

clipboard.png

最後看看節點間的關聯關係

執行tail = head;ui

clipboard.png

執行Node t = tail;this

clipboard.png

執行node.prev = t;spa

clipboard.png

執行compareAndSetTail(t, node)線程

clipboard.png

執行t.next = node;3d

clipboard.png

最後造成AQS隊列code

clipboard.png

圖畫完了,僅表明我的理解。
最後看 acquireQueued方法。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
一進來又是一個死循環。第一次循環 if (p == head && tryAcquire(arg))這句代碼,由於咱們設定的場景,因此確定加鎖失敗,因此直接看 shouldParkAfterFailedAcquire方法。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
這時候由於尚未進入等待鎖的線程,因此 waitStatus0,那麼執行 else中的代碼將 waitStatus置爲 -1,值得注意的是這裏操做的是上一個節點的 waitStatus。緊接着第二次循環一樣會加鎖失敗(由於咱們設定的場景),再一次進入 shouldParkAfterFailedAcquire方法,這時候 waitStatus已經被置爲 -1了,因此返回 true。而後執行 parkAndCheckInterrupt方法。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
這時候直接調用 native方法 park(),使第二個線程進入等待。下面先說第三個線程,再看第二個線程是如何加鎖成功的。

第三個線程加鎖

再次強調咱們設定的場景緻使第三次加鎖鐵定失敗,因此直接看 addWaiter方法。
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
這個時候 pred確定不是 null了,因此將第三個線程節點的上一個節點設置爲第二個線程節點,而後將第三個線程節點設置爲尾節點。而後再看 acquireQueued方法。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
這個時候 p節點確定不是 head節點(實際上是第二個線程節點),全部執行第二個 if代碼塊,後面的邏輯就和第二個線程走的同樣了。

ReentrantLock公平鎖解鎖過程

第一個線程解鎖

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
直接看 tryRelease方法
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
仍是在不考慮重入的狀況下, getState等於 1,因此 c等於 0,將獨佔線程置爲 null,最後將 state置爲 0
接下來,由於第一個線程更 AQS隊列沒有半毛錢關係,因此直接執行 unparkSuccessor方法而且返回 true,解鎖完成。
那麼再來看看 unparkSuccessor方法。
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
上面說過當有線程進入 AQS隊列時會將 waitStatus置爲 -1,因此進入第一個 if代碼塊,將 waitStatus置爲 0,而後拿到隊列中的第二個節點( 也就是第二個線程節點),將其喚醒。
而後再說說第二個線程節點被喚醒之候幹些什麼?
先看看第二個線程進入等待的代碼。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
當第二個線程被喚醒以後,將執行 return語句,這時由於咱們外部並無打斷線程,因此返回 false。而後繼續開始循環執行 if (p == head && tryAcquire(arg))語句,這時候知足 p == head,再看 tryAcquire方法。
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
這個時候由於第一個線程解鎖,將 state置爲了 0,因此進入 hasQueuedPredecessors()方法。
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
由於這個時候隊列裏有三個節點(第一個空節點,第二個線程節點,第三個線程節點),因此 h != t知足條件, (s = h.next) == null也知足條件,所以返回 true,而後再反過來看,執行 compareAndSetState(0, acquires)方法,將 state置爲 1,表示第二個線程上鎖成功,而且將獨佔線程置爲第二個線程,最後返回 true
繼續往上層看,這時執行 if (p == head && tryAcquire(arg))中的方法。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
將第二個線程節點設置爲頭節點,並將頭節點置爲空節點。
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
第二個線程加鎖完成。
第三個線程加鎖第二個線程相同。不說了。

ReentrantLock非公平鎖加鎖過程

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
能夠看到,非公平鎖一上來就是嘗試去改變 state狀態。失敗了走與公平鎖同樣的路。因此,非公平鎖加鎖失敗以後仍是老老實實排隊。
相關文章
相關標籤/搜索