Java多線程框架源碼閱讀之---ReentrantLock非公平鎖

部分段落來自於http://javadoop.com/post/Abst...,他的文章至關不錯。java

ReentrantLock基於Sync內部類來完成鎖。Sync繼承於AbstractQueuedSynchronizer。Sync有兩個不一樣的子類NonfairSync和FairSync。node

ReentrantLock的大部分方法都是基於AbstractQueuedSynchronizer實現,大部分僅僅是對AbstractQueuedSynchronizer的轉發。所以,瞭解AbstractQueuedSynchronizer就很是重要。app

做爲AbstractQueuedSynchronizer的實現者須要實現isHeldExclusively,tryAcquire,tryRelease,(可選tryAcquireShared,tryReleaseShared)less


那麼咱們看看對於一個經常使用的套路,ReentrantLock是如何實現同步的oop

lock.lock();
try{
   i++;
}finally {
   lock.unlock();
}

lock.lock()內部實現爲調用了sync.lock(),以後又會調用NonfairSync或FairSync的lock(),你看果真重度使用了AQS吧,這裏咱們先記住這個位置,一會咱們還會回來分析。post

public void lock() {
    sync.lock();
}

先介紹一下AQS裏面的屬性,不復雜就4個主要的屬性:AQS裏面阻塞的節點是做爲隊列出現的,維護了一個head節點和tail節點,同時維護了一個阻塞狀態,若是state=0表示沒有鎖,若是state>0表示鎖被重入了幾回。
注意head是一個假節點,阻塞的節點是做爲head後面的節點出現的。
aqs-0.pngui

// 頭結點,你直接把它當作 當前持有鎖的線程 多是最好理解的
private transient volatile Node head;
// 阻塞的尾節點,每一個新的節點進來,都插入到最後,也就造成了一個隱視的鏈表
private transient volatile Node tail;
// 這個是最重要的,不過也是最簡單的,表明當前鎖的狀態,0表明沒有被佔用,大於0表明有線程持有當前鎖
// 之因此說大於0,而不是等於1,是由於鎖能夠重入嘛,每次重入都加上1
private volatile int state;
// 表明當前持有獨佔鎖的線程,舉個最重要的使用例子,由於鎖能夠重入
// reentrantLock.lock()能夠嵌套調用屢次,因此每次用這個來判斷當前線程是否已經擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer

接着看一下FairSync和NonfairSync的實現,FairSync和NonfairSync都繼承了Sync,並且Sync又繼承了AbstractQueuedSynchronizer。能夠看到FairSync和NonfairSync直接或間接的實現了isHeldExclusively,tryAcquire,tryRelease這三個方法。this

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
     //若是沒有鎖上,則設置爲鎖上並設置本身爲獨佔線程
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
     //若是鎖上了,並且獨佔線程是本身,那麼從新設置state+1,而且返回true
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
     //不然返回false
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
        return getState() != 0;
    }

    /**
     * Reconstitutes the instance from a stream (that is, deserializes it).
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        //若是沒有人鎖上,那麼就設置我本身爲獨佔線程,不然再acquire一次
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //調用到了AQS的acquire裏面
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

以前咱們說到回到ReentrantLock的lock()調用了sync.lock();如今咱們回來看看非公平鎖的邏輯是:若是搶到鎖了,則設置本身的線程爲佔有鎖的線程,不然調用acquire(1),這個是AQS的方法。spa

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire會調用tryAcquire,而這個是對於不一樣的實現是不同的,非公平鎖NonfairSync裏面的tryAcquire,而tryAcquire又會調用到Sync的nonfairTryAcquire。總之tryAcquire在非公平鎖場景下嘗試去獲取鎖,若是獲取上了,則置一下AQS狀態state,並設置本身爲獨佔線程,並支持重入鎖功能。線程

addWaiter方法用於建立一個節點(值爲當前線程)並維護一個雙向鏈表。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

如今說一下Node的結構,主要有用的field爲waitStatus,prev,next,thread。waitStatus目前僅要了解1,0,-1就夠了。 0是默認狀態,1表明爭取鎖取消,-1表示它的後繼節點對應的線程須要被喚醒。也就是說這個waitStatus其實表明的不是本身的狀態,而是後繼節點的狀態。能夠看見默認進隊的節點的waitStatus都是0

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // 標識節點當前在共享模式下
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    // 標識節點當前在獨佔模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的幾個int常量是給waitStatus用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 代碼此線程取消了爭搶這個鎖
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 官方的描述是,其表示當前node的後繼節點對應的線程須要被喚醒
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // 本文不分析condition,因此略過吧
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // 一樣的不分析,略過吧
    static final int PROPAGATE = -3;
    // =====================================================

    // 取值爲上面的一、-一、-二、-3,或者0(之後會講到)
    // 這麼理解,暫時只須要知道若是這個值 大於0 表明此線程取消了等待,
    // 也許就是說半天搶不到鎖,不搶了,ReentrantLock是能夠指定timeouot的。。。
    volatile int waitStatus;
    // 前驅節點的引用
    volatile Node prev;
    // 後繼節點的引用
    volatile Node next;
    // 這個就是線程本尊
    volatile Thread thread;
}

acquireQueued的做用是從等待隊列中嘗試去把入隊的那個節點去作park。另外當節點unpark之後,也會在循環中將本身設置成頭結點,而後本身拿到鎖

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //對於隊首節點,剛纔也許沒有搶到鎖,如今也許能搶到了,再試一次
                if (p == head && tryAcquire(arg)) {
                    //若是搶到了鎖,這個入隊的節點根本不須要park,直接能夠執行
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //若是不是隊首節點,或者是隊首可是沒有搶過其餘節點
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire。這個方法說的是:"當前線程沒有搶到鎖,是否須要掛起當前線程?第一個參數是前驅節點,第二個參數纔是表明當前線程的節點。注意由於默認加入的節點的狀態都是0,這個方法會進來兩次,第一次進來走到else分支裏面修改前置節點的waitStatus爲-1.第二次進來直接返回true。對於剛加入隊列的節點,修改head節點的waitStatus爲-1,對於後來加入的節點,修改它前一個節點的waitStatus爲-1。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt的代碼很簡單,這個this就是ReentrantLock類的實例。阻塞了當前線程。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

再來看看怎麼解鎖。

public void unlock() {
    sync.release(1);
}

調用到AQS裏面,若是鎖被徹底釋放了,那麼就unpark head的下一個

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease是由Sync覆蓋的。重置AQS裏面的state,返回鎖是否被徹底釋放了的判斷。

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //下面的代碼就是喚醒後繼節點,可是有可能後繼節點取消了等待(waitStatus==1)
        // 從隊尾往前找,找到waitStatus<=0的全部節點中排在最前面的 
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

等到unpark之後,parkAndCheckInterrupt的阻塞解除,將繼續for無限循環,由於是隊列裏是一個一個阻塞的,此時阻塞節點的前置依次都是head,所以if (p == head && tryAcquire(arg)) 這句話若是它醒來搶鎖成功了將執行成功,阻塞的線程獲取鎖並執行,將本身設置成head,同時也將本身從隊列中清除出去。 注意這裏是非公平鎖,所以在tryAcquire有可能尚未搶過其餘線程,那麼搶到的那個將會直接執行,而沒有搶到的,又在循環裏鎖住了。

相關文章
相關標籤/搜索