ReentrantLock實現原理分析

ReentrantLock主要利用CAS+CLH隊列來實現。它支持公平鎖和非公平鎖,二者的實現相似。java

  • CAS:Compare and Swap,比較並交換。CAS有3個操做數:內存值V、預期值A、要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改成B,不然什麼都不作。該操做是一個原子操做,被普遍的應用在Java的底層實現中。在Java中,CAS主要是由sun.misc.Unsafe這個類經過JNI調用CPU底層指令實現。node

  • CLH隊列:帶頭結點的雙向非循環鏈表(以下圖所示):安全

圖片描述

ReentrantLock的基本實現能夠歸納爲:先經過CAS嘗試獲取鎖。若是此時已經有線程佔據了鎖,那就加入CLH隊列而且被掛起。當鎖被釋放以後,排在CLH隊列隊首的線程會被喚醒,而後CAS再次嘗試獲取鎖。在這個時候,若是:函數

        1.非公平鎖:若是同時還有另外一個線程進來嘗試獲取,那麼有可能會讓這個線程搶先獲取;性能

        2. 公平鎖:若是同時還有另外一個線程進來嘗試獲取,當它發現本身不是在隊首的話,就會排到隊尾,由隊首的線程獲取到鎖。ui

ReentrantLock是java concurrent包提供的一種鎖實現。不一樣於synchronized,ReentrantLock是從代碼層面實現同步的。 
這裏寫圖片描述 
圖1 reentrantLock的類層次結構圖this

Lock定義了鎖的接口規範。 
ReentrantLock實現了Lock接口。 
AbstractQueuedSynchronizer中以隊列的形式實現線程之間的同步。 
ReentrantLock的方法都依賴於AbstractQueuedSynchronizer的實現。spa

Lock接口定義了以下方法: 
這裏寫圖片描述
圖2 lock接口規範線程

一、lock()方法的實現 
進入lock()方法,發現其內部調用的是sync.lock();設計

    public void lock() {
        sync.lock();
    }

sync是在ReentrantLock的構造函數中實現的。其中fair參數的不一樣可實現公平鎖和非公平鎖。因爲在鎖釋放的階段,鎖處於無線程佔有的狀態,此時其餘線程和在隊列中等待的線程均可以搶佔該鎖,從而出現公平鎖和非公平鎖的區別。 
非公平鎖:當鎖處於無線程佔有的狀態,此時其餘線程和在隊列中等待的線程均可以搶佔該鎖。 
公平鎖:當鎖處於無線程佔有的狀態,在其餘線程搶佔該鎖的時候,都須要先進入隊列中等待。 
本文以非公平鎖NonfairSync的sync實例進行分析。

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
    }

由圖1可知,NonfairSync繼承自Sync,所以也繼承了AbstractQueuedSynchronizer中的全部方法實現。接着進入NonfairSync的lock()方法。

 final void lock() {
            // 利用cas置狀態位,若是成功,則表示佔有鎖成功
            if (compareAndSetState(0, 1))
                // 記錄當前線程爲鎖擁有者
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

在lock方法中,利用cas實現ReentrantLock的狀態置位(cas即compare and swap,它是CPU的指令,所以賦值操做都是原子性的)。若是成功,則表示佔有鎖成功,並記錄當前線程爲鎖擁有者。當佔有鎖失敗,則調用acquire(1)方法繼續處理。

    public final void acquire(int arg) {
        //嘗試得到鎖,若是失敗,則加入到隊列中進行等待
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire()是AbstractQueuedSynchronizer的方法。它首先會調用tryAcquire()去嘗試得到鎖,若是得到鎖失敗,則將當前線程加入到CLH隊列中進行等待。tryAcquire()方法在NonfairSync中有實現,但最終調用的仍是Sync中的nonfairTryAcquire()方法。

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 得到狀態
            int c = getState();
            // 若是狀態爲0,則表示該鎖未被其餘線程佔有
            if (c == 0) {
                // 此時要再次利用cas去嘗試佔有鎖
                if (compareAndSetState(0, acquires)) {
                    // 標記當前線程爲鎖擁有者
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 若是當前線程已經佔有了,則state + 1,記錄佔有次數
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 此時無需利用cas去賦值,由於該鎖確定被當前線程佔有
                setState(nextc);
                return true;
            }
            return false;
        }

在nonfairTryAcquire()中,首先會去得到鎖的狀態,若是爲0,則表示鎖未被其餘線程佔有,此時會利用cas去嘗試將鎖的狀態置位,並標記當前線程爲鎖擁有者;若是鎖的狀態大於0,則會判斷鎖是否被當前線程佔有,若是是,則state + 1,這也是爲何lock()的次數要和unlock()次數對等;若是佔有鎖失敗,則返回false。 
在nonfairTryAcquire()返回false的狀況下,會繼續調用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,將當前線程加入到隊列中繼續嘗試得到鎖。

  private Node addWaiter(Node mode) {
        // 建立當前線程的節點
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 若是尾節點不爲空
        if (pred != null) {
            // 則將當前線程的節點加入到尾節點以後,成爲新的尾節點
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        // CAS方法有可能失敗,所以要循環調用,直到當前線程的節點加入到隊列中
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header,頭節點爲虛擬節點
                h.next = node;
                node.prev = h;
                    if (compareAndSetHead(h)) {
                    tail = node;  
                    return h;
                }
            }
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

 

addWaiter()是AbstactQueuedSynchronizer的方法,會以節點的形式來標記當前線程,並加入到尾節點中。enq()方法是在節點加入到尾節點失敗的狀況下,經過for(;;)循環反覆調用cas方法,直到節點加入成功。因爲enq()方法是非線程安全的,因此在增長節點的時候,須要使用cas設置head節點和tail節點。此時添加成功的結點狀態爲Node.EXCLUSIVE。 
在節點加入到隊列成功以後,會接着調用acquireQueued()方法去嘗試得到鎖。

 final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                // 得到前一個節點
                final Node p = node.predecessor();
                // 若是前一個節點是頭結點,那麼直接去嘗試得到鎖
                // 由於其餘線程有可能隨時會釋放鎖,不必Park等待
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

在acquireQueued()方法中,會利用for (;;)一直去得到鎖,若是前一個節點爲head節點,則表示能夠直接嘗試去得到鎖了,由於佔用鎖的線程隨時都有可能去釋放鎖而且該線程是被unpark喚醒的CLH隊列中的第一個節點,得到鎖成功後返回。 
若是該線程的節點在CLH隊列中比較靠後或者得到鎖失敗,即其餘線程依然佔用着鎖,則會接着調用shouldParkAfterFailedAcquire()方法來阻塞當前線程,以讓出CPU資源。在阻塞線程以前,會執行一些額外的操做以提升CLH隊列的性能。因爲隊列中前面的節點有可能在等待過程當中被取消掉了,所以當前線程的節點須要提早,並將前一個節點置狀態位爲SIGNAL,表示能夠阻塞當前節點。所以該函數在判斷到前一個節點爲SIGNAL時,直接返回true便可。此處雖然存在對CLH隊列的同步操做,但因爲局部變量節點確定是不同的,因此對CLH隊列操做是線程安全的。因爲在compareAndSetWaitStatus(pred, ws, Node.SIGNAL)執行以前可能發生pred節點搶佔鎖成功或pred節點被取消掉,所以此處須要返回false以容許該節點能夠搶佔鎖。 
當shouldParkAfterFailedAcquire()返回true時,會進入parkAndCheckInterrupt()方法。parkAndCheckInterrupt()方法最終調用safe.park()阻塞該線程,以避免該線程在等待過程當中無線循環消耗cpu資源。至此,當前線程便被park了。那麼線程什麼時候被unpark,這將在unlock()方法中進行。 
這裏有一個小細節須要注意,在線程被喚醒以後,會調用Thread.interrupted()將線程中斷狀態置位爲false,而後記錄下中斷狀態並返回上層函數去拋出異常。我想這樣設計的目的是爲了可讓該線程能夠完成搶佔鎖的操做,從而可使當前節點稱爲CLH的虛擬頭節點。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            return true;

        if (ws > 0) {
            // 若是前面的節點是CANCELLED狀態,則一直提早
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        } 
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }

二、unlock()方法的實現 
同lock()方法,unlock()方法依然調用的是sync.release(1)。

 public final boolean release(int arg) {
        // 釋放鎖
        if (tryRelease(arg)) {
            Node h = head;
            // 此處有個疑問,爲何須要判斷h.waitStatus != 0
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

能夠看到,tryRelease()方法實現了鎖的釋放,邏輯上便是將鎖的狀態置爲0。當釋放鎖成功以後,一般狀況下不須要喚醒隊列中線程,所以隊列中老是有一個線程處於活躍狀態。

總結: 
         ReentrantLock的鎖資源以state狀態描述,利用CAS則實現對鎖資源的搶佔,並經過一個CLH隊列阻塞全部競爭線程,在後續則逐個喚醒等待中的競爭線程。ReentrantLock繼承AQS徹底從代碼層面實現了java的同步機制,相對於synchronized,更容易實現對各種鎖的擴展。同時,AbstractQueuedSynchronizer中的Condition配合ReentrantLock使用,實現了wait/notify的功能。

 

 

相關文章
相關標籤/搜索