Java併發——線程間通訊與同步技術

      傳統的線程間通訊與同步技術爲Object上的wait()、notify()、notifyAll()等方法,Java在顯示鎖上增長了Condition對象,該對象也能夠實現線程間通訊與同步。本文會介紹有界緩存的概念與實現,在一步步實現有界緩存的過程當中引入線程間通訊與同步技術的必要性。首先先介紹一個有界緩存的抽象基類,全部具體實現都將繼承自這個抽象基類:java

public abstract class BaseBoundedBuffer<V> {
	private final V[] buf;
	private int tail;
	private int head;
	private int count;

	protected BaseBoundedBuffer(int capacity) {
		this.buf = (V[]) new Object[capacity];
	}

	protected synchronized final void doPut(V v) {
		buf[tail] = v;
		if (++tail == buf.length)
			tail = 0;
		++count;
	}

	protected synchronized final V doTake() {
		V v = buf[head];
		buf[head] = null;
		if (++head == buf.length)
			head = 0;
		--count;
		return v;
	}

	public synchronized final boolean isFull() {
		return count == buf.length;
	}

	public synchronized final boolean isEmpty() {
		return count == 0;
	}
}

      在向有界緩存中插入或者提取元素時有個問題,那就是若是緩存已滿還須要插入嗎?若是緩存爲空,提取的元素又是什麼?如下幾種具體實現將分別回答這個問題。緩存

一、將異常傳遞給調用者併發

      最簡單的實現方式是:若是緩存已滿,向緩存中添加元素,咱們就拋出異常:this

public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
	public GrumpyBoundedBuffer() {
		this(100);
	}

	public GrumpyBoundedBuffer(int size) {
		super(size);
	}

	public synchronized void put(V v) throws BufferFullException {
		if (isFull())
			throw new BufferFullException();
		doPut(v);
	}

	public synchronized V take() throws BufferEmptyException {
		if (isEmpty())
			throw new BufferEmptyException();
		return doTake();
	}
}

  這種方法實現簡單,可是使用起來卻不簡單,由於每次put()與take()時都必須準備好捕捉異常,這或許知足某些需求,可是有些人仍是但願插入時檢測到已滿的話,能夠阻塞在那裏,等隊列不滿時插入對象。spa

二、經過輪詢與休眠實現簡單的阻塞操作系統

      當隊列已滿插入數據時,咱們能夠不拋出異常,而是讓線程休眠一段時間,而後重試,此時可能隊列已經不是已滿狀態:線程

public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
	int SLEEP_GRANULARITY = 60;

	public SleepyBoundedBuffer() {
		this(100);
	}

	public SleepyBoundedBuffer(int size) {
		super(size);
	}

	public void put(V v) throws InterruptedException {
		while (true) {
			synchronized (this) {
				if (!isFull()) {
					doPut(v);
					return;
				}
			}
			Thread.sleep(SLEEP_GRANULARITY);
		}
	}

	public V take() throws InterruptedException {
		while (true) {
			synchronized (this) {
				if (!isEmpty())
					return doTake();
			}
			Thread.sleep(SLEEP_GRANULARITY);
		}
	}
}

  這種實現方式最大的問題是,咱們很難肯定合適的休眠間隔,若是休眠間隔過長,那麼程序的響應性會變差,若是休眠間隔太短,那麼會浪費大量CPU時間。對象

三、使用條件隊列實現有界緩存blog

      使用休眠的方式會有響應性問題,由於咱們沒法保證當隊列爲非滿狀態時線程就會馬上sleep結束而且檢測到,因此,咱們但願能有另外一種實現方式,當緩存非滿時,會主動喚醒線程,而不是須要線程去輪詢緩存狀態,Object對象上的wait()與notifyAll()可以實現這個需求。當調用wait()方法時,線程會自動釋放鎖,並請求請求操做系統掛起當前線程;當其餘線程檢測到條件知足時,會調用notifyAll()方法喚醒掛起線程,實現線程間通訊與同步:繼承

public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
	public BoundedBuffer() {
		this(100);
	}

	public BoundedBuffer(int size) {
		super(size);
	}

	public synchronized void put(V v) throws InterruptedException {
		while (isFull())
			wait();
		doPut(v);
		notifyAll();
	}

	public synchronized V take() throws InterruptedException {
		while (isEmpty())
			wait();
		V v = doTake();
		notifyAll();
		return v;
	}

	public synchronized void alternatePut(V v) throws InterruptedException {
		while (isFull())
			wait();
		boolean wasEmpty = isEmpty();
		doPut(v);
		if (wasEmpty)
			notifyAll();
	}
}

  注意,上面的例子中咱們使用了notifyAll()喚醒線程而不是notify()喚醒線程,若是咱們改用notify()喚醒線程的話,將致使錯誤的,notify()會在等待隊列中隨機選擇一個線程喚醒,而notifyAll()會喚醒全部等待線程。對於上面的例子,若是如今是非滿狀態,咱們使用notify()喚醒線程,因爲只能喚醒一個線程,那麼咱們喚醒的多是在等待非空狀態的線程,將致使信號丟失。只有同時知足如下兩個條件時,才能用單一的notify而不是notifyAll:

  1. 全部等待線程的類型都相同。只有一個條件謂詞與條件隊列相關,而且每一個線程在從wait返回後將執行相同的操做。
  2. 單進單出。在條件變量上的每次通知,最多隻能喚醒一個線程來執行。

四、使用顯示的Condition實現有界緩存     

      內置條件隊列存在一些缺陷,每一個內置鎖都只能有一個相關聯的條件隊列,於是像上個例子,多個線程都要在同一個條件隊列上等待不一樣的條件謂詞,若是想編寫一個帶有多個條件謂詞的併發對象,就可使用顯示的鎖和Condition,與內置鎖不一樣的是,每一個顯示鎖能夠有任意數量的Condition對象。如下代碼給出了有界緩存的另外一種實現,即便用兩個Condition,分別爲notFull和notEmpty,用於表示"非滿"與"非空"兩個條件謂詞。

public class ConditionBoundedBuffer<T> {
	protected final Lock lock = new ReentrantLock();
	private final Condition notFull = lock.newCondition();
	private final Condition notEmpty = lock.newCondition();
	private static final int BUFFER_SIZE = 100;
	private final T[] items = (T[]) new Object[BUFFER_SIZE];
	private int tail, head, count;

	public void put(T x) throws InterruptedException {
		lock.lock();
		try {
			while (count == items.length)
				notFull.await();
			items[tail] = x;
			if (++tail == items.length)
				tail = 0;
			++count;
			notEmpty.signal();
		} finally {
			lock.unlock();
		}
	}

	public T take() throws InterruptedException {
		lock.lock();
		try {
			while (count == 0)
				notEmpty.await();
			T x = items[head];
			items[head] = null;
			if (++head == items.length)
				head = 0;
			--count;
			notFull.signal();
			return x;
		} finally {
			lock.unlock();
		}
	}
}

      注意,在上面的例子中,因爲使用了兩個Condition對象,咱們的喚醒方法調用的是signal()方法,而不是signalAll()方法。

      使用條件隊列時,須要特別注意鎖、條件謂詞和條件變量之間的三元關係:在條件謂詞中包含的變量必須由鎖保護,在檢查條件謂詞以及調用wait和notify(或者await和signal)時,必須持有鎖對象。

相關文章
相關標籤/搜索