Java併發編程基礎(三)

線程間通訊

線程間通訊稱爲進程內通訊,多個線程實現互斥訪問共享資源時會互相發送信號貨這等待信號,好比線程等待數據到來的通知,線程收到變量改變的信號。多線程

線程阻塞(同步)和非阻塞(異步)

1.線程阻塞 每一個請求都會建立一個線程去處理請求,這樣的結果就致使了線程頻繁的建立消耗很大,沒法高吞吐的處理數據,大量的業務線程會頻繁的CPU上下文切換,下降了系統的處理能力.異步

2.線程非阻塞 服務有若干的線程,不須要每次頻繁的建立,在必定程度上減小了建立線程的消耗,線程不須要頻繁的在CPU上頻繁的切換(由於若干的線程數量是有限的),客戶端也不須要等待處理好結果後返回結果,提升了系統的吞吐量。可是客戶端想獲得處理結果需再次請求接口(這個可使用接口回調的方式解決)。this

單線程間的通訊

wait和notify

  1. wait有三個重載的方法,其中wait(0)表明永不超時。線程

  2. wait(long timeout)方法會致使當前線程進入阻塞,直到線程調用了Object的notify或者nitifyAll方法才能將其喚醒,或者阻塞時間到達了timeout時間而自動喚醒。code

  3. wait方法必須用於該對象monitor,也就是wait方法必需要同步的方法使用。對象

  4. 當前線程執行了該對象的wait方法以後,將會放棄對該monitor的全部全且進入與該對象關聯的wait set中,也就是說一旦線程執行了某個object的wait方法以後,他就會釋放對該對象的monitor的全部權,其餘線程也會有機會繼續爭奪該monitor的全部權。blog

例子:接口

public class EventQueueDemo {
	private final int max;
	static class Event {
	}
	private final LinkedList<Event> eventQueue = new LinkedList<>();
	private final static int DEFAULT_MAX_EVENT = 10;

	public EventQueueDemo() {
		this(DEFAULT_MAX_EVENT);
	}

	public EventQueueDemo(int max) {
		this.max = max;
	}
	public void offer(Event event) {
		synchronized (eventQueue) {
			if (eventQueue.size() >= max) {
				try {
					console("the queue is full");
					eventQueue.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			console("the new event is submitted");
			//若是沒有滿就將生產一個,將他放入隊列的末尾。
			eventQueue.addLast(event);
			//告知消費者線程去消費
			eventQueue.notify();
		}
	}
	public Event take() {
		synchronized (eventQueue) {
			if (eventQueue.isEmpty()) {
				try {
					console("the queue is empty.");
					eventQueue.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			//消費獲取隊頭的實例
			Event event = eventQueue.removeFirst();
			//通知生產這線程去生產
			this.eventQueue.notify();
			console("the event " + event + " is handle.");
			return event;
		}
	}
	private void console(String message) {
		System.out.printf("%s:%s\n", currentThread().getName(), message);
	}
	public static void main(String[] args) {
		final EventQueueDemo eventQueue = new EventQueueDemo();
		//生產線程
		new Thread(() -> {
			while (true)
			eventQueue.offer(new EventQueueDemo.Event());
		}, "Producer").start();
		//消費線程
		new Thread(() -> {
			while (true) {
				eventQueue.take();
				try {
					TimeUnit.MILLISECONDS.sleep(10);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}, "Consumer").start();
	}
}

執行結果:隊列

使用wait和notify的注意事項進程

  • wait方法是可中斷的方法,這也就意味着,當前線程一旦調用了wait方法進入阻塞狀態,其餘線程是可使用interrupt方法將其打斷的,可中斷的方法被打斷以後會收到中斷一次InterruptedException,同時interrupt標識也會被擦出。

  • 線程執行了某個對象的wait方法之後,會加入與之對應的wait set中,沒個對象的monitor都有一個與之關聯的wait set

  • 當線程的進入了wait set以後,notify方法就能夠將其喚醒,也就是從wait set中彈出,同時中斷wait中的線程也會欸喚醒。

  • 必須在同步的方法重視那個使用wait方法和notify方法,由於執行了wait和notify的前提添加是必須持有同步的方法的monitor的全部權,不然將會收到IllegalMonitorStateException。

  • 同步代碼的monitor必須與執行wait notify 方法的對象一致,簡單的說就是用哪一個對象的monitor進行同步,就只能用哪一個對象進行wait和notify操做.

wait和sleep的區別

wait和sleep均可以使用當前的線程進入阻塞的狀態,可是這兩個方法仍是有本質上的區別的。

相同點:

1.wait和sleep均可以使得當前的線程進入阻塞狀態.

2.wait和sleep方法軍事可中孤單的方法,被中斷後會收到中孤單異常。

不一樣點:

1.wait事Object的方法,而sleep是Thread特有的方法.

2.wait方法的執行必須是在同步方法中執行的,sleep能夠不須要。

3.線程執行sleep的時候,並不會釋放monitor的鎖,而執行wait的方法會釋放monitor的鎖。

4.sleep方法短暫休眠以後會主訂的退出阻塞,而wait方法(沒有指定的wait的時間)則須要被氣壓線程中斷後纔可以退出阻塞。

多線程間通訊

生產,消費

多線程間的通訊須要使用到Object的notifyAll方法,該方法與notify比較相似,均可以喚醒因爲調用了wait方法而阻塞的線程,可是notify方法只能喚醒其中的一個線程,而notifyAll方法能夠同時喚醒全國不的則色的線程,統一被喚醒的線程讓然須要繼續爭奪monitor的鎖。

須要注意的地方。 上面的例子只適合再單個生產着和單個消費者的場景使用,在多個生產着和多個消費者的時候會出校生產的大於最大值了還在生產,或者隊列爲空了還再消費。

須要改進就只須要講判斷的地方該成循環的。

改進的例子:

public class EventQueueMultiThreadDemo {

	private final int max;

	static class Event {

	}

	private final LinkedList<Event> eventQueue = new LinkedList<>();
	private final static int DEFAULT_MAX_EVENT = 10;

	public EventQueueMultiThreadDemo() {
		this(DEFAULT_MAX_EVENT);
	}

	public EventQueueMultiThreadDemo(int max) {
		this.max = max;
	}

	public void offer(Event event) {
		synchronized (eventQueue) {
			while (eventQueue.size() >= max) {
				try {
					console("the queue is full");
					eventQueue.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			console("the new event is submitted");
			eventQueue.addLast(event);
			eventQueue.notify();
		}
	}

	public Event take() {
		synchronized (eventQueue) {
			while (eventQueue.isEmpty()) {
				try {
					console("the queue is empty.");
					eventQueue.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			Event event = eventQueue.removeFirst();
			this.eventQueue.notify();
			console("the event " + event + " is handle.");
			return event;
		}
	}

	private void console(String message) {
		System.out.printf("%s:%s\n", currentThread().getName(), message);
	}

	public static void main(String[] args) {
		final EventQueueMultiThreadDemo eventQueue = new EventQueueMultiThreadDemo();
		new Thread(() -> {
			while (true)
			eventQueue.offer(new EventQueueMultiThreadDemo.Event());
		}, "Producer").start();

		new Thread(() -> {
			//這裏該成循環判斷,避免同一類的線程沒有判斷就
			//直接又往下跑了。
			while (true) {
				eventQueue.take();
				try {
					TimeUnit.MILLISECONDS.sleep(10);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}

		}, "Consumer").start();

		new Thread(() -> {
			//這裏該成循環判斷,避免同一類的線程沒有判斷就
			//直接又往下跑了。
			while (true) {
				eventQueue.take();
				try {
					TimeUnit.MILLISECONDS.sleep(10);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}

		}, "Consumer2").start();
	}
相關文章
相關標籤/搜索