基於AQS的Condition接口

  • 簡介

Condition接口實現的功能與Object類中的wait/notify(等待/通知機制)相似,Object的wait和notify/notify是與對象監視器(java線程狀態操做和鎖與監視器)配合完成線程間的等待/通知機制,而Condition與Lock配合完成等待通知機制,前者是java底層級別的,後者是語言級別的,具備更高的可控制性和擴展性java


  • 區別
  1. Condition可以支持不響應中斷,而經過使用Object方式不支持;
  2. Condition可以支持多個等待隊列(new 多個Condition對象),而Object方式只能支持一個;
  3. Condition可以支持超時時間的設置,而Object不支持

  • 主要方法
  1. void await() 形成當前線程在接到信號或被中斷以前一直處於等待狀態
  2. boolean await(long time, TimeUnit unit) 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態
  3. void signal() 喚醒一個線程
  4. void signalAll() 喚醒所有線程
  • 線程必須在獲取到鎖的前提下調用await,圖中展現了線程獲取到鎖後的操做
  1. AQS同步隊列的頭結點(head)獲取到鎖後,從AQS同步隊列中被剔除,而且切斷了與後續節點的聯繫
  2. 由於Condition基於AQS,因此AQS同步隊列與Condition等待隊列用的Node類是同一個
  3. 將獲取到鎖的線程包裝成Node對象後加入到Condition等待隊列的尾部(lastWaiter)

// 與Object wait()同樣,調用await()方法的線程必須先獲取鎖
public final void await() throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	// 將線程封裝成Node,加入到Condition等待隊列的尾部
	Node node = addConditionWaiter();
	// 釋放當前線程所佔用的lock,在釋放的過程當中會喚醒同步隊列中的下一個節點
	// 與Object wait()同樣,await也會釋放當前獲取的鎖
	int savedState = fullyRelease(node);
	int interruptMode = 0;
	// 判斷當前node是否在AQS同步隊列中,若是不在就阻塞等待加入AQS同步隊列後喚醒
	while (!isOnSyncQueue(node)) {
		LockSupport.park(this);
		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
			break;
	}
	// 從AQS中獲取鎖後繼續執行
	// acquireQueued方法參考這篇博客
	// [AQS](https://my.oschina.net/kdy1994/blog/3022593 "AQS")
	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
		interruptMode = REINTERRUPT;
	if (node.nextWaiter != null) // clean up if cancelled
		unlinkCancelledWaiters();
	if (interruptMode != 0)
		reportInterruptAfterWait(interruptMode);
}

// 將線程封裝成Node對象,加入到Condition等待隊列的尾部
// firstWaiter 表示Conditon等待隊列的頭結點,lastWaiter是尾結點
private Node addConditionWaiter() {
	Node t = lastWaiter;
	// If lastWaiter is cancelled, clean out.
	if (t != null && t.waitStatus != Node.CONDITION) {
		unlinkCancelledWaiters();
		t = lastWaiter;
	}
	// Node.CONDITION = -2
	Node node = new Node(Thread.currentThread(), Node.CONDITION);
	if (t == null)
		firstWaiter = node;
	else
		t.nextWaiter = node;
	lastWaiter = node;
	return node;
}

public final void signal() {
	// 判斷是否獲取了鎖
	if (!isHeldExclusively())
		throw new IllegalMonitorStateException();
	// firstWaiter Condition等待隊列的頭結點
	Node first = firstWaiter;
	if (first != null)
		doSignal(first);
}

private void doSignal(Node first) {
	do {
		if ( (firstWaiter = first.nextWaiter) == null)
			lastWaiter = null;
		// 將頭結點從等待隊列中移除
		first.nextWaiter = null;
	} while (!transferForSignal(first) && (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
	//將node waitStatus設置爲0
	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
		return false;
	// 將node加入AQS同步隊列尾部,加入後await()的while循環條件不成立了
	Node p = enq(node);
	int ws = p.waitStatus;
	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
		LockSupport.unpark(node.thread);
	return true;
}

  • 圖中主要想表達的是AQS同步隊列與Condition等待隊列的頭和尾的變化狀況
  1. AQS同步隊列的尾(tail)變成了Condition等待隊列的頭(firstWaiter)node

  2. Condition等待隊列的firstWaiter變成了以前firstWaiter的nextWaiter(doSigna()方法的if判斷)ui

    if ( (firstWaiter = first.nextWaiter) == null)this

  3. Condition等待隊列的firstWaiter與next的聯繫被中斷了(doSignal()方法中.net

    first.nextWaiter = null線程

  • Condition的await與signal流程
  • awaitThread
  1. awaitThread獲取鎖成功,則執行await()方法進入condition等待隊列尾部,調用LockSupport.park(this)阻塞線程
  2. awaitThread獲取鎖失敗,則進入AQS同步隊列尾部,等待其餘線程釋放鎖後,從新競爭鎖成功後執行上一步
  3. signalThread執行signal()後,awaitThread從Condition等待隊列中加入到了AQS同步隊列尾部後,當await線程unPark後,while循環條件不成立跳出循環while (!isOnSyncQueue(node)) 接下來執行acquireQueued(node, savedState),從AQS同步隊列中獲取鎖後結束退出
  • signalThread
  1. signalThread獲取鎖成功,則執行signal()將Condition等待隊列頭結點(firstWaiter )加入到AQS同步隊列尾部(tail),結束退出
  2. signalThread獲取鎖失敗,則進入AQS同步隊列尾部,等待其餘線程釋放鎖後,從新競爭鎖成功後執行上一步

  • 基於Condition的寫入與消費示例
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[5];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();    //獲取鎖
        try {
            // 若是「緩衝已滿」,則等待;直到「緩衝」不是滿的,纔將x添加到緩衝中。
            while (count == items.length)
                notFull.await();
            // 將x添加到緩衝中
            items[putptr] = x;
            // 將「put統計數putptr+1」;若是「緩衝已滿」,則設putptr爲0。
            if (++putptr == items.length) putptr = 0;
            // 將「緩衝」數量+1
            ++count;
            // 喚醒take線程,由於take線程經過notEmpty.await()等待
            notEmpty.signal();

            // 打印寫入的數據
            System.out.println(Thread.currentThread().getName() + " put  " + (Integer) x);
        } finally {
            lock.unlock();    // 釋放鎖
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();    //獲取鎖
        try {
            // 若是「緩衝爲空」,則等待;直到「緩衝」不爲空,纔將x從緩衝中取出。
            while (count == 0)
                notEmpty.await();
            // 將x從緩衝中取出
            Object x = items[takeptr];
            // 將「take統計數takeptr+1」;若是「緩衝爲空」,則設takeptr爲0。
            if (++takeptr == items.length) takeptr = 0;
            // 將「緩衝」數量-1
            --count;
            // 喚醒put線程,由於put線程經過notFull.await()等待
            notFull.signal();

            // 打印取出的數據
            System.out.println(Thread.currentThread().getName() + " take " + (Integer) x);
            return x;
        } finally {
            lock.unlock();    // 釋放鎖
        }
    }
}

public class ConditionTest {
    private static BoundedBuffer bb = new BoundedBuffer();

    public static void main(String[] args) {
        // 啓動10個「寫線程」,向BoundedBuffer中不斷的寫數據(寫入0-9);
        // 啓動10個「讀線程」,從BoundedBuffer中不斷的讀數據。
        for (int i = 0; i < 10; i++) {
            new PutThread("p" + i, i).start();
            new TakeThread("t" + i).start();
        }
    }

    static class PutThread extends Thread {
        private int num;

        public PutThread(String name, int num) {
            super(name);
            this.num = num;
        }

        public void run() {
            try {
                Thread.sleep(1);    // 線程休眠1ms
                bb.put(num);        // 向BoundedBuffer中寫入數據
            } catch (InterruptedException e) {
            }
        }
    }

    static class TakeThread extends Thread {
        public TakeThread(String name) {
            super(name);
        }

        public void run() {
            try {
                Thread.sleep(10);                    // 線程休眠1ms
                Integer num = (Integer) bb.take();    // 從BoundedBuffer中取出數據
            } catch (InterruptedException e) {
            }
        }
    }
相關文章
相關標籤/搜索