Java併發編程之ReentrantLock源碼分析

ReentrantLock介紹

從JDK1.5以前,咱們都是使用synchronized關鍵字來對代碼塊加鎖,在JDK1.5引入了ReentrantLock鎖。在JDK1.6以前synchronized關鍵字性能比ReentrantLock鎖要差,JDK1.6以後性能基本是持平,但ReentrantLock鎖功能要比synchronized關鍵字功能強大。java

特色

synchronized關鍵字和ReentrantLock鎖都是重入鎖,可重入鎖是指當一個線程獲取到鎖後,此線程還可繼續得到這把鎖,在此線程釋放這把鎖前其餘線程則不可得到這邊鎖。相比synchronized關鍵字,ReentrantLock鎖具備鎖獲取超時和獲取鎖響應中斷的特色。ReentrantLock鎖還分公平鎖和非公平鎖,公平鎖模式是按線程調用加鎖的前後排隊順序獲取鎖,非公平鎖模式是已經在排隊中的線程按順序獲取鎖,可是新來的線程會和排隊中的線程進行競爭,並不保證先排先獲取鎖。node

ReentrantLock 源碼分析

ReentrantLock實現了java.util.concurrent.locks.Lock接口和java.io.Serializable接口,前者是對實現Java鎖的一種規範,後者說明ReentrantLock能夠序列化。 ReentrantLock定義了一個成員變量c#

private final Sync sync;

Sync類型是ReentrantLock的內部類,繼承至AbstractQueuedSynchronizer ,AbstractQueuedSynchronizer是一個帶空頭的雙向列表,爲ReentrantLock的鎖排隊提供了基礎支持。 ReentrantLock的UML關係圖以下源碼分析

下面咱們解析下ReentrantLock中幾個經常使用方法。性能

lock()方法源碼分析

lock()是ReentrantLock中最經常使用的方法,用來對代碼塊加鎖。lock()先是調用Sync的lock()的方法,Sync#lock()實現分爲非公平模式和公平模式,咱們對這2個模式分別講解ui

非公平模式

Sync#lock()非公平模式代碼以下:this

final void lock() {
		//用CAS方法設置枷鎖狀態
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
		//搶鎖失敗,進入後續邏輯。
                acquire(1);
        }

新來線程先調用compareAndSetState(0, 1)方法用CAS方法設置加鎖狀態,這裏是非公平模式實現要點,這樣作主要是爲了新來的線程和排隊中的線程競爭,排隊中的線程激活後也會用CAS方法設置加鎖狀態,就是看哪一個線程線程搶的快,哪一個能拿到鎖。若是設置加鎖狀態成功,則設置AbstractQueuedSynchronizer中的全局變量線程爲當前當前線程。若是設置加鎖狀態失敗即搶鎖失敗,則調用acquire(1)進入排隊邏輯。線程

AbstractQueuedSynchronizer#acquire(int arg)實現代碼以下:code

public final void acquire(int arg) {
	//先調用tryAcquire(arg)再試下能不能獲取到鎖,沒法獲取則調用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)進入排隊
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

先調用tryAcquire(arg)再試下能不能獲取到鎖,獲取成功則執行結束,沒法獲取則調用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)進入排隊,此方法返回參數爲是否中斷當前線程,排隊過程當中若是線程被中斷則會返回ture,此時調用selfInterrupt()中斷當前線程。blog

tryAcquire(arg)直接調用了非公平模式nonfairTryAcquire(acquires)方法咱們看下實現:

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
			//獲取鎖狀態
            int c = getState();
			//狀態未加鎖則嘗試獲取鎖
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
			//判斷是不是相同線程,若是是則表示當前線程的鎖重入了
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

調用getState()方法獲取加鎖狀態,若是爲0表示當前未被加鎖,嘗試CAS設置加鎖狀態獲取鎖,若是成功一樣設置AbstractQueuedSynchronizer中的全局變量線程爲當前當前線程。若是已被加鎖,這判斷當前線程和加鎖線程是不是同一線程,若是是同一線程則將獲取鎖的狀態加1返回獲取鎖成功,這裏就是可重入鎖實現的核心,狀態的值表示當前線程重入了多少次,以後的釋放鎖就要釋放相同的次數。

接下來咱們看下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,acquireQueued主要功能是對當前線程阻塞,阻塞到能被上個獲取到鎖線程釋放爲止,addWaiter(Node.EXCLUSIVE)則是將當前線程加入到排隊隊列中。 咱們先來看下addWaiter(Node.EXCLUSIVE)實現

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
		//CAS快速添加節點到尾部
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
		//若是尾節點不存在或者添加失敗走最大努力添加節點邏輯
        enq(node);
        return node;
    }
	
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
			//若是頭尾節點爲空則建立空節點當頭尾節點
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
			//CAS添加節點到尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

建立已當前線程爲基礎的節點,先走快速添加到尾部邏輯,獲取尾節點若是尾節點存在,將當前節點和尾節點相連,並用CAS方式將當前節點設置爲尾節點,這邊使用CAS方式考慮了多個線程同時操做尾節點的狀況,因此若是尾節點已經變動則快速添加節點操做失敗,調用enq(node)方法走最大努力添加節點的邏輯。enq(node)最大努力添加邏輯就是一直添加節點直到添加節點到尾部成功。

下面看下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的實現

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued裏有個循環,這個循環的主要做用就是在線程激活後重試獲取鎖直到獲取鎖。node.predecessor()獲取當前線程節點的前一個節點,若是是頭節點,則當前線程嘗試獲取鎖,獲取鎖成功設置當前節點爲頭節點。若是獲取失敗或者非頭節點則調用shouldParkAfterFailedAcquire(p, node)判斷是否須要阻塞等待,若是須要阻塞等待則調用parkAndCheckInterrupt()阻塞當前線程並讓出cup資源直到被前一個節點激活,繼續循環邏輯。

咱們先來看下shouldParkAfterFailedAcquire(p, node)的實現

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

先獲取前個節點的狀態,狀態分如下4類

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

除了CANCELLED關閉狀態是非正常,其餘狀態均正常狀態。判斷當前狀態是不是SIGNAL正常狀態,若是是就返回成功,這樣當前線程就能夠阻塞安心的等待上個節點的激活。若是狀態爲CANCELLED關閉狀態則刪除全部當前節點以前狀態爲CANCELLED的節點,返回失敗讓當前線程重試獲取鎖,若是是初始化0狀態則CAS方式設置狀態爲SIGNAL。

接下來看下阻塞方法parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

方法很簡單調用LockSupport.park(this)阻塞當前線程,這裏要講下方法返回時調用Thread.interrupted()判斷當前線程是否被中斷,若是被中斷的話,當前線程獲取到鎖後會調用Thread.currentThread().interrupt()中斷線程。

公平模式

公平模式和非公平模式大部分代碼相同,主要是獲取鎖的邏輯不一樣,咱們就講下代碼不一樣的部分 lock()代碼以下

final void lock() {
       acquire(1);
}

非公平模式模式先嚐試設置狀態來獲取鎖,而公平模式則直接調用acquire(1)去走排隊邏輯。

嘗試獲取鎖的方法tryAcquire(int acquires)也不同代碼以下

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
}

該方法跟非公平鎖基本都同樣,只是在獲取鎖的時候加了hasQueuedPredecessors()判斷,這個方法主要判斷了當前線程是否在頭節點的下個節點,這樣保證了獲取鎖的順序性。

unlock()方法源碼分析

unlock()方法比較簡單,直接調用sync.release(1)方法。 release(1)代碼以下

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

先嚐試釋放鎖,若是釋放產品這判斷當前節點是否爲0不爲0調用unparkSuccessor(h)方法激活下個節點的線程,不然直接返回。這裏會有個疑問爲何h.waitStatus爲0不去激活下個節點的線程,若是不激活下個節點的線程是否一直阻塞的,答案是否認的。這樣作主要是爲了釋放鎖的效率。waitStatus爲0是初始化的值,這個值還沒被下個節點線程調用shouldParkAfterFailedAcquire(p, node)方法設置成SIGNAL狀態,也就說明下個節點線程還沒被阻塞,此時若是下個節點線程調用此方法並設置成SIGNAL狀態,勢必它會從新獲取鎖,從而獲取到鎖避免了上述的問題。

下面來看下tryRelease(arg)方法

protected final boolean tryRelease(int releases) {
 		//重入次數減1
            int c = getState() - releases
		//非持有線程拋異常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
		//若是釋放了全部的重入次則清理持有線程爲空
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
		//設置當前剩餘的重入次數
            setState(c);
            return free;
        }

由於鎖可重入,所以調用getState()獲取狀態的值並減去一次重入次數,獲得的c就是剩餘重入的次數,而後判斷當前釋放的線程是不是當前佔有鎖的線程,若是不是拋出異常,不然先判斷c是否爲0表示當前線程持有的鎖是否釋放徹底,若是是則設置持有鎖的線程的變量爲空,並設置鎖狀態爲0,不然設置剩餘的c到鎖的狀態。

接下來看下unparkSuccessor(h)的實現

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
			//查找下個正常狀態的節點去激活
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
		    //激活線程
            LockSupport.unpark(s.thread);
    }

獲取當前節點狀態,設置若是當前節點正常狀況則設置成0,而後取當前節點的下個節點,若是下個節點狀態非正常即CANCELLED狀態,則從隊列的尾部開始查找查到最靠近當前的節點且狀態正常的節點,而後調用LockSupport.unpark(s.thread)通知此節點中止阻塞。這邊會有個疑問若是調用LockSupport.unpark(s.thread)方法後,此節點才調用LockSupport.park(this)去阻塞,這樣會不會發生此節點永久阻塞的問題,答案是否認的,LockSupport.unpark(s.thread)方法的實現實際上是爲線程設置了一個信號量,LockSupport.park(this)就算後調,若是線程相同也會收到此信號從而激活線程,這裏的實現原理就不展開講。

相關文章
相關標籤/搜索