ReentrantLock加鎖過程簡單分析

什麼是可重入鎖?

Jdk api中的介紹

一個可重入的互斥鎖Lock,它具備與使用synchronized方法和語句所訪問的隱式監視器鎖相同的一些基本行爲和語義,但功能更強大。node

ReentrantLock將由最近成功得到鎖,而且尚未釋放該鎖的線程所擁有。當鎖沒有被另外一個線程所擁有時,調用lock的線程將成功獲取該鎖並返回。若是當前線程已經擁有該鎖,此方法將當即返回。api

怎麼用?

class X {
   private final ReentrantLock lock = new ReentrantLock();
   // ...

public void m() { 
     lock.lock();  // block until condition holds
     try {
       // ... method body
     } finally {
       lock.unlock()
     }
   }
 }

怎麼實現的?

ReentrantLock的鎖是經過它內部的一個同步器實現的。安全

/** Synchronizer providing all implementation mechanics */
private final Sync sync;

同步器的類型有兩個子類,公平同步器與非公平同步器,也即公平鎖與非公平鎖。這兩個子類是鎖的具體實現,咱們對鎖的操做也都是做用在這兩個類的實例上的。多線程

 

咱們在使用ReentrantLock時,經過無參構造器建立的lock是就是非公平的。併發

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

 若是想要使用公平鎖,咱們可使用有參構造器來建立lock,傳入true表示公平鎖,傳入false表示非公平鎖。 框架

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

 建立完成後,咱們使用鎖的方式就是調用lock方法,那lock方法又是怎麼實現的呢?咱們來看一下代碼。oop

/**
 * Acquires the lock.
 *
 * <p>Acquires the lock if it is not held by another thread and returns
 * immediately, setting the lock hold count to one.
 *
 * <p>If the current thread already holds the lock then the hold
 * count is incremented by one and the method returns immediately.
 *
 * <p>If the lock is held by another thread then the
 * current thread becomes disabled for thread scheduling
 * purposes and lies dormant until the lock has been acquired,
 * at which time the lock hold count is set to one.
 */
public void lock() {
    sync.lock();
}

經過代碼咱們能夠看到,ReentrantLock的lock方法是經過調用sync的lock方法來實現的。根據咱們前面的介紹,sync又是FairSync和NonfairSync中的一個,那咱們下面就來具體分析一下這兩個類中lock方法的具體實現。ui

準備工做

在開始以前,咱們先作點準備工做。從下面的類繼承關係圖中能夠看到,ReentrantLock中的公平鎖與非公平鎖都是從AQS(AbstractQueuedSynchronizer)擴展來的,因此在詳細介紹ReentrantLock以前,咱們先簡單的瞭解一下AQS。this

 

在API中,對AQS的解釋是:spa

爲實現依賴於先進先出(FIFO)等待隊列的阻塞鎖和相關同步器(信號量、事件、等等)提供一個框架。此類的設計目標是成爲依靠單個原子int值來表示狀態的大多數同步器的一個有用基礎。

經過這兩句話,咱們能夠看出,AQS中有兩個重要的概念,一個是表示狀態的單個int值(state),一個是FIFO的隊列。在Java併發包中的各類同步器就是經過對這兩個概念進行不一樣的操做而實現的。其中須要額外說明的就是,對錶示狀態的int值的操做,是經過CAS的方式完成的,這樣才能保證在多線程的環境下的安全性。這兩個概念具體的內容與相關操做,咱們等待用到的時候再說。

在ReentrantLock中表示狀態的int值表示的是鎖的數量。

非公平鎖(NonfairSync)

概覽

/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

lock方法

/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

經過代碼能夠看出,得到鎖的第一步是經過CAS的方式,嘗試將state從0變爲1。若是成功,就表示當前線程成功搶到了鎖,接着就會將當前線程設爲ExclusiveOwnerThread,即獨佔線程。若是失敗,就會接着調用acquire方法。經過這搶鎖的第一步,咱們能夠明顯感覺到,這就是一個赤裸裸的插隊行爲。當新線程搶奪鎖的時候,它不會關注此時到底有多少個線程在排隊等待,而是直接嘗試搶鎖。若是此時鎖正好被釋放了,那新線程就有了成功的可能。這就是不公平鎖第一處體現不公平的地方。

acquire方法

/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

當第一步插隊失敗後,會調用require方法,並傳入1。接着會調用tryAcquire方法再次搶鎖。在非公平鎖中,tryAcquire方法會直接調用nonfairTryAcquire方法。

nonfairTryAcquire方法

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

在此方法中,首先會查看當前的鎖數量,若是數量爲0,表示當前無鎖,那麼就會再使用一次插隊搶鎖。若是搶鎖成功,就會直接返回true,這就是不公平鎖第二處體現不公平的地方。若是搶鎖失敗,就會看當前持鎖的線程是否是搶鎖的線程,若是是的話,鎖數nextc+1,並返回true。這也就是爲何叫可重入鎖的緣由,持鎖線程可重複的獲取鎖。其餘狀況則會返回false,回到acquire方法中。

當前面的操做都不能成功搶到鎖的時候,就會將線程加入到等待隊列中。

addWaiter方法,線程進入等待隊列

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

 咱們前面提到過AQS中的兩個重要概念,以前已經用到一個state了,如今是FIFO隊列登場的時候了。

在AQS內部有一條雙向鏈表來存放等待線程,鏈表中的每一個節點都是一個Node對象。Node對象中維護了線程,等待狀態以及先後Node的指針。線程在加入隊列以前須要包裝成Node對象,而包裝線程的方法就是addWaiter。

Node有個表示共享模式的字段,在包裝時須要提供,這個值是由參數傳來的,默認是獨佔模式。當把線程包裝成Node對象之後,就須要入隊了。若是隊列不爲空,那麼此方法就會嘗試着快速入隊,若是快速入隊失敗或者隊列爲空,那麼就會使用正常的入隊方法。快速入隊是使用CAS的方式將新建的node放到隊尾,由於在多線程的環境下,有可能在放入隊尾的時候被其餘線程捷足先登,因此後面會有個正常入隊的操做。正常入隊操做就是循環入隊,直到成功爲止。入隊操做以後,須要作的就是阻塞等待線程。

acquireQueued方法,阻塞等待線程

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

首先經過node的predecessor方法獲取到前一個節點。若是前一個節點是頭結點(頭結點是獲取了鎖的線程),說明隊列中只有一個節點(有誤),說明此節點是即將運行的節點,那麼就會直接開始自旋搶鎖。若是前節點不是頭結點或搶鎖失敗,那麼就會調用shouldParkAfterFailedAcquire方法。

shouldParkAfterFailedAcquire方法

/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

這個方法會檢查當前節點前節點的等待狀態,若是是SIGNAL,則表示前節點已經park了,即已經暫停了,那麼當前線程也就能夠安全暫定等待喚醒了。若是等待狀態大於0,則表示前節點已經被取消了,那麼會從當前節點往前遍歷,去掉已經被取消的節點。其餘狀況則將前節點的等待狀態改成SIGNAL。由於這個方法外部是一個死循環,因此最終會返回true 的。若是返回true,那就會執行parkAndCheckInterrupt方法。

parkAndCheckInterrupt方法

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
} 

這個方法比較簡單,就是阻塞當前線程,以防外層死循環消耗系統資源並等待被前節點喚醒。

總結

至此,非公平鎖獲取鎖的過程就結束了。簡單總結一下搶鎖的過程:

1. 經過CAS的方式將state(鎖數量)從0設置爲1(第一次插隊)。

  1.1 若是設置成功,設置當前線程爲獨佔鎖的線程。

  1.2 若是沒有成功,那麼會再獲取一次鎖數量。

    1.2.1 若是鎖數量爲0,那麼會再次用經過CAS的方式將state(鎖數量)從0設置爲1(第二次插隊)。

    1.2.2 若是鎖數量不爲0,或者第二次搶鎖失敗,那麼會查看獨佔鎖線程是不是當前線程。

      1.2.2.1 若是是,則將當前鎖數量+1。

      1.2.2.2 若是不是,則將當前線程包裝爲一個Node對象,並打入到阻等待隊列中去,等待被前節點喚醒。

相關文章
相關標籤/搜索