深刻ReentrantLock 源碼剖析(JDK1.8)

ReentrantLock簡單使用demo以下:java

Lock lock = new ReentrantLock(); 
lock.lock();
try {
    //業務邏輯  
} finally {
  lock.unlock();
}

注:獲取的鎖代碼要放到try塊以外,防止得到鎖代碼異常,拋出異常的同時,也會致使一次鎖的釋放。釋放代碼必定要放到finally塊中。node

** AQS **
瞭解java中的鎖,首先的瞭解AQS。
AQS(AbstractQueuedSynchronizer)隊列同步器。是用來構建鎖或者其它同步組件的基礎框架,他實現了一個int成員變量標識同步狀態(更改這個變量值來獲取和釋放鎖),經過內置的FIFO雙向隊列來完成資源獲取線程排隊的工做。
AQS能夠實現獨佔鎖和共享鎖,RenntrantLock實現的是獨佔鎖,ReentrantReadWriteLock實現的是獨佔鎖和共享鎖,CountDownLatch實現的是共享鎖。安全

ReentrantLock 類結構信息以下圖:併發

  • ReentrantLock 實現 Lock 和 Serializable 接口
  • RentrantLock 有三個內部類 Sync、NonfairSync 和 FairSync 類
  • Sync 繼承 AbstractQueuedSynchronizer 抽象類
  • NonfairSync(非公平鎖) 繼承 Sync 抽象類
  • FairSync(公平鎖) 繼承 Sync 抽象類

** 公平鎖和非公平鎖 **框架

ReentrantLock 有兩種實現方式,公平鎖和非公平鎖。ui

  • 公平鎖:當前線程不馬上得到鎖,而是先直進入等待隊列中隊尾進行排隊獲取鎖。this

  • 非公平鎖:當前線程首先嚐試獲取一下鎖(僅僅嘗試一下),若是獲取不到,則乖乖的進入到等待隊列中去排隊。線程

ReentrantLock實現公平鎖和非公平鎖代碼以下:code

/**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

** 獲取非公平鎖 **orm

/**
        * Performs lock.  Try immediate barge, backing up to normal
        * acquire on failure.
        */
       final void lock() {
           if (compareAndSetState(0, 1))
               setExclusiveOwnerThread(Thread.currentThread());
           else
               acquire(1);
       }
  1. 首先經過CAS更新AQS中的state變量來得到鎖(第一次得到鎖),若是獲取成功則把當前線程設置爲獨佔鎖
  2. 若是是設置失敗,進入到acquire方法
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  1. 首先執行tryAcquire方法,嘗試得到鎖。
  2. 若是獲取失敗則進入addWaiter方法,構造同步節點,將該節點添加到同步隊列尾部,並返回此節點,進入acquireQueued方法。
  3. acquireQueued方法,這個新節點死是循環的方式獲取同步狀態,若是獲取不到則阻塞節點中的線程,阻塞後的節點等待前驅節點來喚醒。
/**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

tryAcquire調用nonfairTryAcquire方法來第二次嘗試得到鎖

  1. 若是state變量爲0,則進行CAS嘗試更新state來得到鎖,並把該線程設置成獨佔鎖,並返回true。
  2. 若是state變量不爲0,則判斷當前線程是否爲獨佔鎖,若是是,則當前state+1(可重入鎖),表示獲取鎖成功,更新state值,並返回true。這裏更新state變量,不須要CAS更新,由於,當前線程已經得到鎖。
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

將構造的同步節點加入到同步隊列中

  1. 使用鏈表的方式把該Node節點添加到隊列尾部,若是tail的前驅節點不爲空(隊列不爲空),則進行CAS添加到隊列尾部。
  2. 若是更新失敗(存在併發競爭更新),則進入enq方法進行添加
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

該方法使用CAS自旋的方式來保證向隊列中添加Node(同步節點簡寫Node)

  1. 若是隊列爲空,則把當前Node設置成頭節點
  2. 若是隊列不爲空,則向隊列尾部添加Node
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在acquireQueued方法中,當前線程在死循環中嘗試獲取同步狀態,

  1. 若是當前節點的前驅節點頭節點才能嘗試得到鎖,若是得到成功,則把當前線程設置成頭結點,把以前的頭結點從隊列中移除,等待垃圾回收(沒有對象引用)
  2. 若是獲取鎖失敗則進入shouldParkAfterFailedAcquire方法中檢測當前節點是否能夠被安全的掛起(阻塞),若是能夠安全掛起則進入parkAndCheckInterrupt方法,把當前線程掛起,並檢查剛線程是否執行了interrupted方法。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 嘗試將當前節點的前驅節點的等待狀態設爲SIGNAL
             * 1/這爲何用CAS,如今已經入隊成功了,前驅節點就是pred,除了node外應該沒有別的線程在操做這個節點了,那爲何還要用CAS?而不直接賦值呢?
             * (解釋:由於pred能夠本身將本身的狀態改成cancel,也就是pred的狀態可能同時會有兩條線程(pred和node)去操做)
             * 2/既然前驅節點已經設爲SIGNAL了,爲何最後還要返回false
             * (由於CAS可能會失敗,這裏無論失敗與否,都返回false,下一次執行該方法的以後,pred的等待狀態就是SIGNAL了)
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

waitStatus狀態值

狀態 說明
CANCELLED 1 等待超時或者中斷,須要從同步隊列中取消
SIGNAL -1 後繼節點出於等待狀態,當前節點釋放鎖後將會喚醒後繼節點
CONDITION -2 節點在等待隊列中,節點線程等待在Condition上,其它線程對Condition調用signal()方法後,該節點將會從等待同步隊列中移到同步隊列中,而後等待獲取鎖。
PROPAGATE -3 表示下一次共享式同步狀態獲取將會無條件地傳播下去
INITIAL 0 初始狀態
  1. 首先獲取前驅節點的等待狀態ws
  2. 若是ws爲SIGNAL則表示能夠被前驅節點喚醒,當前線程就能夠掛起,等待前驅節點喚醒,返回true(能夠掛起)
  3. 若是ws>0說明,前驅節點取消了,並循環查找此前驅節點以前全部連續取消的節點。並返回false(不能掛起)。
  4. 嘗試將當前節點的前驅節點的等待狀態設爲SIGNAL
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

把當前線程掛起,並檢查剛線程是否執行了interrupted方法,並返回true、false。

** 公平鎖 **

公平鎖和非公平鎖實現方式是同樣的,惟一不一樣的是tryAcquire方法的實現,下面是公平鎖tryAcquire方法實現:

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
  1. 首先獲取當前鎖狀態,若是當前state==0(無鎖),則進行獲取鎖操做
  2. hasQueuedPredecessors方法判斷頭結點是否當前線程,若是是當前線程則進行CAS更新得到鎖,獲取成功,把當前線程設置成獨佔鎖。
  3. 若是不是頭結點或獲取鎖失敗則,則判斷當前線程是否爲獨佔鎖,若是是,則當前state+1(可重入鎖),表示獲取鎖成功,更新state值,並返回true。這裏更新state變量,不須要CAS更新,由於,當前線程已經得到鎖。
相關文章
相關標籤/搜索