深刻分析ReentrantLock

ReentrantLock是java concurrent包提供的一種鎖實現。不一樣於synchronized,ReentrantLock是從代碼層面實現同步的。 java

圖1 reentrantLock的類層次結構圖node

Lock定義了鎖的接口規範。 程序員

ReentrantLock實現了Lock接口。 編程

AbstractQueuedSynchronizer中以隊列的形式實現線程之間的同步。 安全

ReentrantLock的方法都依賴於AbstractQueuedSynchronizer的實現。多線程

Lock接口定義了以下方法: 併發

圖2 lock接口規範函數

一、lock()方法的實現 性能

進入lock()方法,發現其內部調用的是sync.lock();優化

    public void lock() {
        sync.lock();
    }

sync是在ReentrantLock的構造函數中實現的。其中fair參數的不一樣可實現公平鎖和非公平鎖。因爲在鎖釋放的階段,鎖處於無線程佔有的狀態,此時其餘線程和在隊列中等待的線程均可以搶佔該鎖,從而出現公平鎖和非公平鎖的區別。 

非公平鎖:當鎖處於無線程佔有的狀態,此時其餘線程和在隊列中等待的線程均可以搶佔該鎖。 

公平鎖:當鎖處於無線程佔有的狀態,在其餘線程搶佔該鎖的時候,都須要先進入隊列中等待。 

本文以非公平鎖NonfairSync的sync實例進行分析。

    public ReentrantLock() {
        sync = new NonfairSync();
    }

public ReentrantLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
    }

由圖1可知,NonfairSync繼承自Sync,所以也繼承了AbstractQueuedSynchronizer中的全部方法實現。接着進入NonfairSync的lock()方法。

 final void lock() {
            // 利用cas置狀態位,若是成功,則表示佔有鎖成功
            if (compareAndSetState(0, 1))
                // 記錄當前線程爲鎖擁有者
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

在lock方法中,利用cas實現ReentrantLock的狀態置位(cas即compare and swap,它是CPU的指令,所以賦值操做都是原子性的)。若是成功,則表示佔有鎖成功,並記錄當前線程爲鎖擁有者。當佔有鎖失敗,則調用acquire(1)方法繼續處理。

    public final void acquire(int arg) {
        //嘗試得到鎖,若是失敗,則加入到隊列中進行等待
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire()是AbstractQueuedSynchronizer的方法。它首先會調用tryAcquire()去嘗試得到鎖,若是得到鎖失敗,則將當前線程加入到CLH隊列中進行等待。tryAcquire()方法在NonfairSync中有實現,但最終調用的仍是Sync中的nonfairTryAcquire()方法。

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 得到狀態
            int c = getState();
            // 若是狀態爲0,則表示該鎖未被其餘線程佔有
            if (c == 0) {
                // 此時要再次利用cas去嘗試佔有鎖
                if (compareAndSetState(0, acquires)) {
                    // 標記當前線程爲鎖擁有者
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 若是當前線程已經佔有了,則state + 1,記錄佔有次數
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 此時無需利用cas去賦值,由於該鎖確定被當前線程佔有
                setState(nextc);
                return true;
            }
            return false;
        }

在nonfairTryAcquire()中,首先會去得到鎖的狀態,若是爲0,則表示鎖未被其餘線程佔有,此時會利用cas去嘗試將鎖的狀態置位,並標記當前線程爲鎖擁有者;若是鎖的狀態大於0,則會判斷鎖是否被當前線程佔有,若是是,則state + 1,這也是爲何lock()的次數要和unlock()次數對等;若是佔有鎖失敗,則返回false。 

在nonfairTryAcquire()返回false的狀況下,會繼續調用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,將當前線程加入到隊列中繼續嘗試得到鎖。

    private Node addWaiter(Node mode) {
        // 建立當前線程的節點
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 若是尾節點不爲空
        if (pred != null) {
            // 則將當前線程的節點加入到尾節點以後,成爲新的尾節點
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

enq(node);
        return node;
    }

private Node enq(final Node node) {
        // CAS方法有可能失敗,所以要循環調用,直到當前線程的節點加入到隊列中
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header,頭節點爲虛擬節點
                h.next = node;
                node.prev = h;
                    if (compareAndSetHead(h)) {
                    tail = node; 
                    return h;
                }
            }
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter()是AbstactQueuedSynchronizer的方法,會以節點的形式來標記當前線程,並加入到尾節點中。enq()方法是在節點加入到尾節點失敗的狀況下,經過for(;;)循環反覆調用cas方法,直到節點加入成功。因爲enq()方法是非線程安全的,因此在增長節點的時候,須要使用cas設置head節點和tail節點。此時添加成功的結點狀態爲Node.EXCLUSIVE。 

在節點加入到隊列成功以後,會接着調用acquireQueued()方法去嘗試得到鎖。

    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                // 得到前一個節點
                final Node p = node.predecessor();
                // 若是前一個節點是頭結點,那麼直接去嘗試得到鎖
                // 由於其餘線程有可能隨時會釋放鎖,不必Park等待
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

在acquireQueued()方法中,會利用for (;;)一直去得到鎖,若是前一個節點爲head節點,則表示能夠直接嘗試去得到鎖了,由於佔用鎖的線程隨時都有可能去釋放鎖而且該線程是被unpark喚醒的CLH隊列中的第一個節點,得到鎖成功後返回。 

若是該線程的節點在CLH隊列中比較靠後或者得到鎖失敗,即其餘線程依然佔用着鎖,則會接着調用shouldParkAfterFailedAcquire()方法來阻塞當前線程,以讓出CPU資源。在阻塞線程以前,會執行一些額外的操做以提升CLH隊列的性能。因爲隊列中前面的節點有可能在等待過程當中被取消掉了,所以當前線程的節點須要提早,並將前一個節點置狀態位爲SIGNAL,表示能夠阻塞當前節點。所以該函數在判斷到前一個節點爲SIGNAL時,直接返回true便可。此處雖然存在對CLH隊列的同步操做,但因爲局部變量節點確定是不同的,因此對CLH隊列操做是線程安全的。因爲在compareAndSetWaitStatus(pred, ws, Node.SIGNAL)執行以前可能發生pred節點搶佔鎖成功或pred節點被取消掉,所以此處須要返回false以容許該節點能夠搶佔鎖。 

當shouldParkAfterFailedAcquire()返回true時,會進入parkAndCheckInterrupt()方法。parkAndCheckInterrupt()方法最終調用safe.park()阻塞該線程,以避免該線程在等待過程當中無線循環消耗cpu資源。至此,當前線程便被park了。那麼線程什麼時候被unpark,這將在unlock()方法中進行。 

這裏有一個小細節須要注意,在線程被喚醒以後,會調用Thread.interrupted()將線程中斷狀態置位爲false,而後記錄下中斷狀態並返回上層函數去拋出異常。我想這樣設計的目的是爲了可讓該線程能夠完成搶佔鎖的操做,從而可使當前節點稱爲CLH的虛擬頭節點。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            return true;

if (ws > 0) {
            // 若是前面的節點是CANCELLED狀態,則一直提早
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }

二、unlock()方法的實現 

同lock()方法,unlock()方法依然調用的是sync.release(1)。

    public final boolean release(int arg) {
        // 釋放鎖
        if (tryRelease(arg)) {
            Node h = head;
            // 此處有個疑問,爲何須要判斷h.waitStatus != 0
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

能夠看到,tryRelease()方法實現了鎖的釋放,邏輯上便是將鎖的狀態置爲0。當釋放鎖成功以後,一般狀況下不須要喚醒隊列中線程,所以隊列中老是有一個線程處於活躍狀態。

總結: 

ReentrantLock的鎖資源以state狀態描述,利用CAS則實現對鎖資源的搶佔,並經過一個CLH隊列阻塞全部競爭線程,在後續則逐個喚醒等待中的競爭線程。ReentrantLock繼承AQS徹底從代碼層面實現了java的同步機制,相對於synchronized,更容易實現對各種鎖的擴展。同時,AbstractQueuedSynchronizer中的Condition配合ReentrantLock使用,實現了wait/notify的功能。

自旋鎖可使線程在沒有取得鎖的時候,不被掛起,而轉去執行一個空循環,(即所謂的自旋,就是本身執行空循環),若在若干個空循環後,線程若是能夠得到鎖,則繼續執行。若線程依然不能得到鎖,纔會被掛起。

使用自旋鎖後,線程被掛起的概率相對減小,線程執行的連貫性相對增強。所以,對於那些鎖競爭不是很激烈,鎖佔用時間很短的併發線程,具備必定的積極意義,但對於鎖競爭激烈,單線程鎖佔用很長時間的併發程序,自旋鎖在自旋等待後,每每毅然沒法得到對應的鎖,不只僅白白浪費了CPU時間,最終仍是免不了被掛起的操做 ,反而浪費了系統的資源。

可能引發的問題:

1.過多佔據CPU時間:若是鎖的當前持有者長時間不釋放該鎖,那麼等待者將長時間的佔據cpu時間片,致使CPU資源的浪費,所以能夠設定一個時間,當鎖持有者超過這個時間不釋放鎖時,等待者會放棄CPU時間片阻塞;

2.死鎖問題:試想一下,有一個線程連續兩次試圖得到自旋鎖(好比在遞歸程序中),第一次這個線程得到了該鎖,當第二次試圖加鎖的時候,檢測到鎖已被佔用(實際上是被本身佔用),那麼這時,線程會一直等待本身釋放該鎖,而不能繼續執行,這樣就引發了死鎖。所以遞歸程序使用自旋鎖應該遵循如下原則:遞歸程序決不能在持有自旋鎖時調用它本身,也決不能在遞歸調用時試圖得到相同的自旋鎖。

 

可重入鎖,也叫作遞歸鎖,指的是同一線程 外層函數得到鎖以後 ,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響。

在JAVA環境下 ReentrantLock 和Synchronized都是可重入鎖

Synchronized表明一種聲明式編程思惟, 程序員更多的是表達一種同步聲明 , 由]ava系統負責具體實現, 程序員不知道其實現細節; 顯式鎖表明一種命令式編程思惟, 程序員 實現全部細節.聲明式編程的好處除了簡單 , 還在於性能 , 在較新版本的Java上 , ReentrantLock 和Synchronized的性能是接近的 , Java編譯器和虛擬機可 以 不斷優化Synchronized的實現, 好比自動分析Synchronized的使用 , 對於沒有鎖競爭的場景, 自動省略對鎖獲取/釋放的調用.

簡單總結下 , 能用 Synchronized就用Synchronized, 不知足要求 時再考慮ReentrantLock

 

互斥鎖, 指的是一次最多隻能有一個線程持有的鎖。如Java的Lock

 

引入偏向鎖是爲了在無多線程競爭的狀況下儘可能減小沒必要要的輕量級鎖執行路徑,由於輕量級鎖的獲取及釋放依賴屢次CAS原子指令,而偏向鎖只須要在置換ThreadID的時候依賴一次CAS原子指令(因爲一旦出現多線程競爭的狀況就必須撤銷偏向鎖,因此偏向鎖的撤銷操做的性能損耗必須小於節省下來的CAS原子指令的性能消耗)

相關文章
相關標籤/搜索