Java併發4:鎖

Lock接口

在Lock接口出現以前,Java程序依靠 synchronized 關鍵字實現鎖的功能。鎖提供了相似的同步功能,只是在使用時須要顯式獲取和釋放鎖,同時還擁有了鎖的獲取釋放的操做性、可中斷的獲取鎖以及超時獲取鎖等多種 synchronized 不具有的同步特性。java

Lock接口提供的 synchronized 接口不具有的特性:node

  • 嘗試非阻塞的獲取鎖:當前線程嘗試獲取鎖,若是這一時刻鎖沒有被其餘線程獲取到,則成功獲取並持有鎖
  • 能被中斷的獲取鎖:與 synchronized 不一樣,獲取到鎖的線程能響應中斷,當獲取到鎖的線程被中斷,中斷異常被拋出,同時釋放鎖
  • 超時獲取鎖:在指定的截止時間以前獲取鎖,若是時間到了仍然沒法獲取鎖,返回

Lock的經常使用方法:編程

  • void lock() 獲取鎖,獲取鎖後返回
  • void lockInterruptibly throws InterruptedException可中斷的獲取鎖,在鎖的獲取中能夠中斷當前線程
  • boolean tryLock()嘗試非阻塞的獲取鎖,調用後當即返回,獲取了返回true,不然返回false
  • boolean tryLock(long time,TimeUnit unit) throws InterruptedException超時獲取鎖。當前線程在時間內得到了鎖,返回true;當前線程在時間內被中斷,拋出異常;超出時間,返回false
  • void unlock()釋放鎖
  • Condition newCondition()獲取等待通知組件,該組件與當前的鎖綁定。當前線程獲取了鎖,才能調用該組件的wait()方法,調用後,當前線程將釋放鎖

隊列同步器AQS

隊列同步器(AbstractQueuedSynchronizer)是用來構建鎖或者其餘同步組件的基礎框架,它使用了一個int成員變量表示同步狀態,經過內置的FIFO隊列完成資源獲取線程的排隊工做。安全

AQS 接口

同步器的時機是基於模板方法模式,使用者須要繼承同步器並重寫特定方法。重寫時,使用如下三個方法訪問或修改同步狀態:併發

  • getState()獲取當前同步狀態
  • setState(int newState)設置當前同步狀態
  • compareAndSetState(int expect,int update)使用CAS設置當前狀態,該方法能保證狀態設置的原子性

除此以外,同步器提供了模板方法,分爲三類:獨佔式獲取與釋放同步狀態,共享式獲取與釋放同步狀態,查詢同步隊列中的等待線程狀況。框架

AQS實現分析

同步隊列

AQS 內部依賴一個同步的 FIFO 雙向隊列來完成同步狀態的管理。當前線程獲取同步狀態失敗時,將當前線程以及等待狀態等信息構形成一個節點並將其加入同步隊列,同時阻塞該線程;當同步狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態。ui

同步隊列中,一個幾點表明一個線程,保存着線程的引用、等待狀態、前驅和後繼節點。this

  • int waitStatus 等待狀態
  • Node prev 前驅
  • Node next 後繼
  • Node nextWaiter 等待隊列中的後繼節點
  • Thread thread 獲取同步狀態的線程

等待狀態包含以下:spa

  • CANCELLED: 在同步隊列中等待的線程超時或被中斷,須要從同步隊列中取消等待,節點進入該狀態將不會變化
  • SIGNAL: 後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者取消,將會通知後繼節點,使得後繼節點的線程得以運行
  • CONDITION:節點在等待隊列中,節點線程等待在 Condition 上,當其餘線程對 Condition 調用了 signal() 方法後,該節點會從等待隊列轉移到同步隊列中
  • PROPAGATE:表示下一次共享式同步狀態獲取將無條件傳播下去
  • INITIAL:初始狀態

入隊

同步器提供了一個基於CAS的設置尾節點的方法compareAndSetTail(Node expect,Node update),傳遞的兩個參數是當前線程認爲的尾節點和當前的節點,只有設置成功後,當前節點才正式與以前的尾節點創建關聯。線程

出隊

同步隊列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的線程在釋放同步狀態時,會喚醒後繼節點,後繼節點將會在獲取同步狀態成功時將本身設置爲首節點。

獨佔式同步狀態獲取與釋放

獨佔式也就是同一時刻僅有一個線程持有同步狀態。

同步狀態獲取

獨佔式同步狀態獲取採用acquire(int arg)方法。該方法對中斷不敏感,因爲線程獲取同步狀態失敗加入到同步隊列中,後序對線程進行中斷操做時,線程不會從隊列中移除。

public final void acquire(int arg) {

        if (!tryAcquire(arg) &&//獲取同步狀態
        //首先生成節點加入隊列
        //而後等待前驅節點成爲頭節點並獲取同步狀態
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼
  • tryAcquire:嘗試獲取鎖,獲取成功則設置鎖狀態並返回true,不然返回false
  • addWaiter:若是獲取鎖失敗,調用該方法將當前線程加入到同步隊列尾部
  • acquireQueued:當前線程根據公平性原則進行阻塞等待(自旋,也就是死循環),直到獲取鎖爲止。返回當前線程在等待過程當中有沒有中斷過。
  • selfInterrupt:產生一箇中斷
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
複製代碼

addWaiter(Node mode) 用來構造節點以及加入同步隊列。經過使用compareAndSetTail(Node expect,Node update)來確保節點能被線程安全添加。在enq(final Node node)方法中,使用死循環保證節點的正確添加。在死循環中,只有經過CAS將節點設置爲尾節點以後,當前線程才能從該方法返回。

此時,節點進入同步隊列以後,進入了一個自選的過程。當條件知足,得到了鎖,退出隊列。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //當前驅是頭結點,嘗試獲取同步狀態
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

複製代碼

在該方法中,只有前驅是頭節點才能嘗試獲取同步狀態。緣由有兩個:1. 頭節點是成功獲取到同步狀態的節點,頭節點線程釋放鎖,喚醒其後繼節點;2. 維護同步隊列的FIFO原則。

總體流程以下圖所示:

同步狀態釋放

當前線程獲取同步狀態並執行了相應邏輯後,須要釋放同步狀態,使得後續節點能繼續獲取同步狀態。調用release(int arg)方法釋放同步狀態,在釋放同步狀態以後,會喚醒其後繼節點。

public final boolean release(int arg) {
        if (tryRelease(arg)) {//釋放鎖
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//喚醒頭節點的後繼節點
            return true;
        }
        return false;
    }
複製代碼

總結:獲取同步狀態時,維護一個同步隊列,獲取狀態失敗的線程加入隊列並進行自旋;移出隊列的條件是前驅爲頭節點且成功獲取同步狀態。釋放時,先釋放同步狀態,而後喚醒頭節點的後繼節點。

共享式同步狀態獲取與釋放

共享式與獨佔式的最主要區別在於同一時刻獨佔式只能有一個線程獲取同步狀態,而共享式在同一時刻能夠有多個線程獲取同步狀態。例如讀操做能夠有多個線程同時進行,而寫操做同一時刻只能有一個線程進行寫操做,其餘操做都會被阻塞。

同步狀態獲取

AQS提供了acquireShared(int arg)共享式獲取同步狀態

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //前驅是頭結點,獲取同步狀態
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

acquireShared(int arg)方法中,首先調用tryAcquireShared(int arg)嘗試獲取同步狀態,返回一個int,當返回值大於等於0,表示能獲取到同步狀態。不然,獲取失敗調用doAcquireShared(int arg)自旋獲取同步狀態。

同步狀態釋放

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
複製代碼

該方法釋放同步狀態後,將會喚醒後續處於等待狀態的節點。由於可能會存在多個線程同時進行釋放同步狀態資源,因此須要確保同步狀態安全地成功釋放,通常都是經過CAS和循環來完成的。

獨佔式超時獲取同步狀態

AQS提供了tryAcquireNanos(int arg,long nanos)方法,是acquireInterruptibly(int arg)的加強。除了響應中斷以外,還有超時控制。

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
複製代碼

其中,doAcquireNanos(arg, nanosTimeout)用來超時獲取同步狀態。

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                //獲取同步狀態成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                //獲取失敗
                //從新計算須要的休眠時間
                nanosTimeout = deadline - System.nanoTime();
                //超時返回
                if (nanosTimeout <= 0L)
                    return false;
                //未超時,等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //中斷處理
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

與獨佔式獲取同步狀態的區別在於未獲取到同步狀態時的處理邏輯。獨佔式在未獲取到同步狀態時,會使當前線程一致處於等待狀態,而超時獲取會使當前線程等待nanosTimeout納秒,若是沒有獲取到,將返回。

可重入鎖ReentrantLock

可重入鎖也就是支持從新進入的鎖,它表示該鎖能支持一個線程對資源的重複加鎖。它能夠等同於 synchronized 的使用(synchronized 隱式支持重入),可是提供了比 synchronized 更強大更靈活的鎖機制,能夠減小死鎖發生的機率。

ReentrantLock 還提供了公平鎖和非公平鎖的選擇,構造方法中接受一個可選的公平參數,默認是非公平的。公平鎖也就是等待時間最長的線程最優先獲取鎖。可是公平鎖的效率每每沒有非公平的機率高。

ReentrantLock 有一個內部類 Sync,Sync 繼承AQS,有兩個子類:公平鎖 FairSync 和非公平鎖 NonfairSync。

獲取鎖

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //獲取同步狀態
            int c = getState();
            //鎖處於空閒狀態
            if (c == 0) {
            //獲取鎖成功,設置爲當前線程全部
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //判斷鎖持有的線程是不是當前線程
            //若是是持有鎖的線程再次請求,將同步狀態值進行增長並返回true
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
複製代碼

首先判斷同步狀態,若是爲0說明尚未被線程持有,經過CAS獲取同步狀態,若是成功返回true。不然,判斷當前線程是否爲獲取鎖的線程,若是是則獲取鎖,成功返回true。成功過去鎖的線程再次獲取鎖,並將同步狀態值增長。

釋放鎖

protected final boolean tryRelease(int releases) {
        //減掉releases
        int c = getState() - releases;
        //若是釋放的不是持有鎖的線程,拋出異常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        //state == 0 表示已經釋放徹底了,其餘線程能夠獲取同步狀態了
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
複製代碼

將同步狀態是否爲0做爲最終釋放的條件,當同步狀態爲0時,將佔有線程設置爲null,並返回true,表示釋放成功。

公平鎖

若是一個鎖是公平的,那麼按照請求的絕對時間順序也就是FIFO進行鎖的獲取。公平鎖的獲取方法以下:

protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
複製代碼

與非公平鎖獲取鎖的方法不一樣,二者區別在於公平鎖在獲取同步狀態時多了一個限制條件:hasQueuedPredecessors()。該方法用來判斷當前線程是否位於CLH同步隊列中的第一個,是則返回true,不然返回false。

公平鎖保證了鎖的獲取按照FIFO原則,代價是進行大量的線程切換。非公平鎖雖然可能形成線程「飢餓」,可是極少的線程切換,保證了其更大的吞吐量。

與 synchronized 比較

  1. synchronized 是JVM實現的, ReentrantLock 是JDK實現的。
  2. 當持有鎖的線程長期不釋放鎖,正在等待的線程能夠選擇放棄等待,ReentrantLock 能夠中斷,synchronized 不可中斷。
  3. synchronized 是非公平鎖,ReentrantLock 默認是非公平,可是能夠設置爲公平鎖。
  4. ReentrantLock還提供了條件Condition,對線程的等待、喚醒操做更加詳細和靈活,在多個條件變量和高度競爭鎖的地方,ReentrantLock更加適合。
  5. 除非要使用 ReentrantLock 的高級功能,不然優先使用 synchronized。這是由於 synchronized 是 JVM 實現的一種鎖機制,JVM 原生地支持它,而 ReentrantLock 不是全部的 JDK 版本都支持。而且使用 synchronized 不用擔憂沒有釋放鎖而致使死鎖問題,由於 JVM 會確保鎖的釋放。

讀寫鎖

ReentrantLock 是排他鎖,同一時刻只容許一個線程進行訪問,而讀寫鎖在同一時刻能夠容許多個讀線程訪問,寫線程訪問時,其餘的線程均被阻塞。經過讀鎖和寫鎖分離,使得併發性相比通常的排他鎖有了很大提高。

讀寫鎖的主要特性:

  1. 公平性:支持公平性鎖和非公平鎖
  2. 重進入:支持重入。讀線程獲取讀鎖後能夠再次獲取讀鎖。寫線程獲取寫鎖後能再次獲取寫鎖,也能獲取讀鎖。
  3. 鎖降級:遵循獲取寫鎖,獲取讀鎖再釋放寫鎖的次序,寫鎖能降級成爲讀鎖

接口示例

ReentrantReadWriteLock 實現了接口 ReadWriteLock,維護了一對相關的鎖。

public interface ReadWriteLock {
    Lock readLock();//返回讀鎖
    Lock writeLock();//返回寫鎖
}
複製代碼

ReentrantReadWriteLock 使用了三個內部類

/** 內部類 讀鎖 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 內部類 寫鎖 */
private final ReentrantReadWriteLock.WriteLock writerLock;
    
/** 同步器 */
final Sync sync;
複製代碼

構造方法以下:

/** 使用默認(非公平)的排序屬性建立一個新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /** 使用給定的公平策略建立一個新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
複製代碼

讀寫鎖一樣依賴AQS實現同步功能,讀寫狀態就是其同步器的同步狀態。所以其同步器須要在同步狀態(一個整形變量)上維護多個讀線程和一個寫線程的狀態。

在 ReentrantLock 中使用一個 int 類型的 state 表示同步狀態,表示鎖被一個線程重複獲取的次數。讀寫鎖須要用一個變量維護多種狀態,因此採用了「按位切割使用」的方式維護這個變量。讀寫鎖將變量切分爲兩個部分,高16位表示讀,低16位表示寫。分割以後,讀寫鎖經過位運算肯定讀和寫各自的狀態。假設當前狀態爲S,寫狀態等於 S&0X0000FFFF (抹去高16位),讀狀態等於 S>>>16 (無符號補0右移16位)。寫狀態增長1,S = S+1,讀狀態加1,S = S + ( 1 << 16 )。

寫鎖

寫鎖是一個支持重進入的排它鎖。

獲取

protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        //當前同步狀態
        int c = getState();
        //寫鎖狀態
        int w = exclusiveCount(c);
        if (c != 0) {
            //c != 0 && w == 0 表示存在讀鎖
            //當前線程不是已經獲取寫鎖的線程
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            //超出最大範圍
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        //是否須要阻塞
        if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
            return false;
        //設置獲取鎖的線程爲當前線程
        setExclusiveOwnerThread(current);
        return true;
    }
複製代碼

首先獲取同步狀態和寫鎖的同步狀態。若是存在讀鎖,或者當前線程不是持有寫鎖的線程,不能得到寫鎖。若是能獲取寫鎖,且未超出最大範圍,則更新同步狀態並返回true。

釋放

寫鎖的釋放與 ReentrantLock 釋放相似,每次釋放減小寫狀態,當寫狀態爲0表示寫鎖已經被釋放。

// WriteLock類提供了unlock()釋放寫鎖
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {//調用AQS方法釋放鎖
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

複製代碼
//這是定義在Sync類中的方法
protected final boolean tryRelease(int releases) {
    //釋放的線程不爲鎖的持有者
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    //若寫鎖的新線程數爲0,則將鎖的持有者設置爲null
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
複製代碼

讀鎖

讀鎖是一個支持重進入的共享鎖,能被多個線程同時獲取。

在寫狀態爲0時,讀鎖總會被成功獲取,而後增長讀狀態。若是當前線程已經獲取了讀鎖,則增長讀狀態;若是獲取讀鎖時,寫鎖已經被其餘線程獲取,則進入等待狀態。讀狀態時全部線程獲取讀鎖次數的總和,而每一個線程各自獲取讀鎖的次數只能保存在 ThreadLocal 中。

讀鎖的每次釋放均減小讀狀態。

鎖降級

鎖降級指的是寫鎖降級爲讀鎖。鎖降級是指把持有的寫鎖把持住,再獲取到讀鎖,而後釋放擁有的寫鎖的過程。也就是須要遵循先獲取寫鎖、獲取讀鎖再釋放寫鎖的次序才能夠稱爲鎖降級。

鎖降級中讀鎖的獲取是必要的,主要是爲了保證數據的可見性。若是當前線程A不獲取讀鎖而是直接釋放寫鎖,此刻另外一線程B獲取了寫鎖並修改了數據,那麼當前線程A沒法感知線程B的數據更新。若是當前線程A遵循鎖降級的規則,則線程B會被阻塞,直到當前線程A使用數據並釋放讀鎖以後,線程B才能獲取寫鎖進行數據更新。

Condition 接口

在 synchronized 控制同步時,配合 Object 的 wait(), notify(), notifyAll() 等方法實現等待/通知模式。Lock 提供了條件 Condition 接口,二者配合實現了等待/通知模式。

Condition使用

Condition 定義了等待/通知兩種類型的方法,線程調用這些方法時,須要提早獲取 Condition 關聯的鎖。Condition 對象是由 Lock 對象建立出來的。

public class ConditionCase {
    Lock lock=new ReentrantLock();
    Condition condition=lock.newCondition();

    public void conditionWait() throws InterruptedException {
        lock.lock();
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }

    public void conditionSignal(){
        lock.lock();
        try{
            condition.signal();
        }finally {
            lock.unlock();
        }
    }
}
複製代碼

將 Condition 做爲成員變量,調用 await() 方法形成當前線程在接到信號或被中斷以前一直處於等待狀態。調用signal()或者signalAll()方法會喚醒一個或全部的等待線程,可以從等待方法返回的線程必須獲取與 Condition 相關的鎖。

Condition的實現

經過 Lock 的newCondition()方法獲取 Condition。Condition 是一個接口,其爲一個的實現類是 ConditionObject,且是同步器AQS的內部類。

等待隊列

每一個 ConditionObject 包含一個FIFO的隊列,在隊列中的每一個節點都包含了一個線程引用,該線程就是在 Condition 對象上等待的線程。當一個線程調用 await()方法,那麼該線程將釋放鎖,構形成節點加入等待隊列並進入等待狀態。此處的節點依然是AQS的內部類 Node。

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;

    //頭節點
    private transient Node firstWaiter;
    //尾節點
    private transient Node lastWaiter;

    public ConditionObject() {
    }

    /** 省略方法 **/
}
複製代碼

Condition 擁有頭節點和尾節點的引用。當一個線程調用await()方法,將該線程構形成一個幾點,加入隊列尾部。

在 Object 的監視器模型上,一個對象擁有一個同步隊列和一個等待隊列;並罰保中的同步器擁有一個同步隊列和多個等待隊列,每一個 Condition 對應一個等待隊列。

等待

調用 Condition 的await()系列方法將使當前線程釋放鎖並進入等待狀態。當從該方法返回時,當前線程必定獲取了 Condition 相關的鎖。從隊列的角度看,當調用該方法時,至關於同步隊列的首節點(也就是獲取了鎖的節點)移動到 Condition 的等待隊列中。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //將當前線程加入等待隊列
            Node node = addConditionWaiter();
            //釋放鎖
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //檢查該節點的線程是否在同步隊列上,若是不在,還不具有競爭鎖的資格,繼續等待
            while (!isOnSyncQueue(node)) {
                //掛起線程
                LockSupport.park(this);
                //線程已經中斷則退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //競爭同步狀態
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
複製代碼

調用該方法的線程是同步隊列中的首節點(獲取鎖的線程)。該方法將當前線程構形成節點加入等待隊列,釋放同步狀態,喚醒後繼節點,而後當前線程進入等待狀態。而後不斷監測該節點表明的線程是否出如今同步隊列中(也就是收到了signal信號),若是不是,則掛起;不然開始競爭同步狀態。

通知

調用 Condition 的 signal()方法,將會喚醒在等待隊列的首節點。在喚醒節點以前,將節點移入同步隊列中。

public final void signal() {
        //檢測當前線程是否爲擁有鎖的獨
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //頭節點,喚醒條件隊列中的第一個節點
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);    //喚醒
    }
複製代碼

該方法首先會判斷當前線程是否已經獲取了鎖,而後喚醒等待隊列中的頭節點,具體來講,先將條件隊列中的頭節點移出,而後調用AQS的enq(Node node)方法將其安全地移到CLH同步隊列中。當節點移動到同步隊列中,當前線程再喚醒該節點的線程。

總結

一個線程獲取鎖後,經過調用Condition的await()方法,會將當前線程先加入到條件隊列中,而後釋放鎖,最後經過isOnSyncQueue(Node node)方法不斷自檢看節點是否已經在CLH同步隊列了,若是是則嘗試獲取鎖,不然一直掛起。當線程調用signal()方法後,程序首先檢查當前線程是否獲取了鎖,而後經過doSignal(Node first)方法喚醒CLH同步隊列的首節點。被喚醒的線程,將從await()方法中的while循環中退出來,而後調用acquireQueued()方法競爭同步狀態。

示例

//生產者-消費者問題
public class ConditionCase {
    private LinkedList<String> buffer;
    private int maxSize;
    private Lock lock;
    private Condition fullCondition;
    private Condition notFullCondition;

    public ConditionCase(int maxSize){
        this.maxSize=maxSize;
        buffer=new LinkedList<>();
        lock=new ReentrantLock();
        fullCondition=lock.newCondition();
        notFullCondition=lock.newCondition();
    }

    public void set(String string) throws InterruptedException {
        lock.lock();
        try {
            while(maxSize==buffer.size()){
                notFullCondition.await();
            }
            buffer.add(string);
            fullCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    public String set() throws InterruptedException {
        String string;
        try {
            lock.lock();
            while(buffer.size()==0){
                fullCondition.await();
            }
            string=buffer.poll();
            notFullCondition.signal();
        } finally {
            lock.unlock();
        }
        return string;
    }
}

複製代碼

參考資料

相關文章
相關標籤/搜索