java編程思想之併發(線程之間的協做)

當你使用多線程來同時運行多個任務時,能夠經過使用鎖來同步兩個任務的行爲,從而使的一個任務不會干涉另外一個任務的資源。也就是說,若是兩個任務交替的步入某項共享資源,你可使用互斥來保證任什麼時候刻只有一個任務能夠訪問這項資源。java

線程之間的協做

上面的問題已經解決了,下一步是如何使得任務彼此之間能夠協做,使得多個任務能夠一塊兒工做去解決某個問題。如今的問題不是彼此之間的干涉,而是彼此之間的協做。解決這類問題的關鍵是某些部分必須在其餘部分被解決以前解決。編程

當任務協做時,關鍵問題是這些任務之間的握手。爲了實現握手,咱們使用了相同的基礎特性:互斥。在這種狀況下,互斥可以確保只有一個任務能夠響應某個信號,這樣就能根除任何可能的競爭條件。在互斥上,咱們爲任務添加了一種途徑,能夠將自身掛起,直至某些外部條件發生變化,表示是時候讓這個任務開始爲止。安全

wait() 與 notifyAll()

wait() 可使你等待某個條件發生變化,而改變這個條件一般是由另外一個任務來改變。你確定不想在你的任務測試這個條件的同時,不斷的進行空循環,這被稱爲忙等待,是一種不良的 cpu 使用方式。所以 wait() 會在外部條件發生變化的時候將任務掛起,而且只有在 notif() 或 notifAll() 發生時,這個任務纔會被喚醒並去檢查所發生的變化。所以,wait() 提供了一種在任務之間對活動同步的方式。bash

調用 sleep() 時候鎖並無被釋放,調用 yield() 也是同樣。當一個任務在方法裏遇到對 wait() 調用時,線程執行被掛起,對象的鎖被釋放。這就意味着另外一個任務能夠得到鎖,所以在改對象中的其餘 synchronized 方法能夠在 wait() 期間被調用。所以,當你在調用 wait() 時,就是在聲明:「我已經作完了全部的事情,可是我但願其餘的 synchronized 操做在條件什麼時候的狀況下可以被執行」。微信

有兩種形式的 wait():多線程

  • 第一種接受毫秒做爲參數:指再次暫停的時間。
    • 在 wait() 期間對象鎖是被釋放的。
    • 能夠經過 notif() 或 notifAll(),或者指令到期,從 wait() 中恢復執行。
  • 第二種不接受參數的 wait().
    • 這種 wait() 將無線等待下去,直到線程接收到 notif() 或 notifAll()。

wait()、notif()以及 notifAll() 有一個比較特殊的方面,那就是這些方法是基類 Object 的一部分,而不是屬於 Thread 類。僅僅做爲線程的功能卻成爲了通用基類的一部分。緣由是這些方法操做的鎖,也是全部對象的一部分。因此你能夠將 wait() 放進任何同步控制方法裏,而不用考慮這個類是繼承自 Thread 仍是 Runnable。實際上,只能在同步方法或者同步代碼塊裏調用 wait()、notif() 或者 notifAll()。若是在非同步代碼塊裏操做這些方法,程序能夠經過編譯,可是在運行時會獲得 IllegalMonitorStateException 異常。意思是,在調用 wait()、notif() 或者 notifAll() 以前必須擁有獲取對象的鎖。dom

好比,若是向對象 x 發送 notifAll(),那就必須在可以獲得 x 的鎖的同步控制塊中這麼作:ide

synchronized(x){
  x.notifAll();
}
複製代碼

咱們看一個示例:一個是將蠟塗到 Car 上,一個是拋光它。拋光任務在塗蠟任務完成以前,是不能執行其工做的,而塗蠟任務在塗另外一層蠟以前必須等待拋光任務完成。工具

public class Car {
  //塗蠟和拋光的狀態
	private boolean waxOn = false;
	//打蠟
	public synchronized void waxed() {
		waxOn = true;
		notifyAll();
	}

	//拋光
	public synchronized void buffed() {
		waxOn = false;
		notifyAll();
	}

	//拋光結束被掛起即將開始打蠟任務
	public synchronized void waitForWaxing() throws InterruptedException{
		while (waxOn == false) {
			wait();
		}
	}

	//打蠟結束被掛起即將開始拋任務
	public synchronized void waitForBuffing() throws InterruptedException{
		while (waxOn == true) {
			wait();
		}
	}


}

複製代碼

開始打蠟的任務:測試

public class WaxOn implements Runnable{
	private Car car;

	protected WaxOn(Car car) {
		super();
		this.car = car;
	}


	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				System.out.println("Wax one");
				TimeUnit.MICROSECONDS.sleep(200);
				//開始打蠟
				car.waxed();
				//當前任務被掛起
				car.waitForBuffing();
			}
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println(" Exiting via interrupt");
		}
		System.out.println("Ending wax on task");
	}

}

複製代碼

開始拋光的任務:

public class WaxOff implements Runnable{
	private Car car;

	protected WaxOff(Car car) {
		super();
		this.car = car;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		try {
			while (!Thread.interrupted()) {
				//若是仍是在打蠟就掛起
				car.waitForWaxing();
				System.out.println("Wax off");
				TimeUnit.MICROSECONDS.sleep(200);
				//開始拋光
				car.buffed();
			}
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println("Wxtiing via interrupt");
		}
		System.out.println("Ending wax off task");
	}


}

複製代碼

測試類:

public class WaxOmatic {

	public static void main(String[] args) throws Exception{
		// TODO Auto-generated method stub
		Car car = new Car();
		ExecutorService service = Executors.newCachedThreadPool();
		service.execute(new WaxOff(car));
		service.execute(new WaxOn(car));
		//暫停2秒鐘
		TimeUnit.SECONDS.sleep(1);
		//關閉全部的任務
		service.shutdownNow();
	}

}

複製代碼

執行結果:

/.....
Wax one
Wax off
Wax one
Wax off
Wax one
Wax off
 Exiting via interrupt
Wxtiing via interrupt
Ending wax on task
Ending wax off task
複製代碼

在 waitForWaxing() 中檢查 WaxOn 標誌,若是它是 false,那麼這個調用任務將會被掛起。這個行爲發生在 synchronized 方法中這一點很重要。由於在這個方法中任務已經得到了鎖。當你調用 wait() 時,線程被掛起,而鎖被釋放。釋放鎖是本質所在,由於爲了安全的改變對象的狀態,其餘某個任務就必須可以得到這個鎖。

WaxOn.run() 表示給汽車打蠟的第一個步驟,它執行他的操做:調用 sleep() 模擬打蠟的時間,而後告知汽車打蠟結束,而且調用 waitForWaxing(),這個方法會調用 wait() 掛起當前打蠟的任務。直到 WaxOff 任務調用這兩車的 buffed(),從而改變狀態而且調用 notfiAll() 從新喚醒爲止。翻過來也是同樣的,在運行程序時,你能夠看到控制權在兩個任務之間來回的傳遞,這兩個步驟過程在不斷的重複。

錯失的信號

當兩個線程使用 notif()/wait() 或者 notifAll()/wait() 進行協做時,有可能會錯過某個信號。假設線程 T1 是通知 T2 的線程,而這兩個線程都使用下面的方式實現:

T1:
synchronized(X){
  //設置 T2 的一個條件
  <setup condition for T2>
  x.notif();
}

T2:
while(someCondition){
  //Potit
  synchronized(x){
    x.wait();
  }
}
複製代碼

以上的例子假設 T2 對 someCondition 發現其爲 true()。在執行 Potit 其中線程調度器可能切換到了 T1。而 T1 將會執行從新設置 condition,而且調用喚醒。當 T2 繼續執行時,以致於不能意識到條件已經發生變化,所以會盲目的進入 wait()。此時喚醒在以前已經調用過了,而 T2 將無限的等待下去喚醒的信號。

解決該問題的方案是防止 someCondition 變量上產生競爭條件:

synchronized(x){
  while(someCondition){
    x.wait();
  }
}

複製代碼

notif() 與 notifAll()

可能有多個任務在單個 Car 對象上被掛起處於 wait() 狀態,所以調用 notifyAll() 比調用 notify() 更安全。使用 notify() 而不是 notifyAll() 是一種優化。使用 notify() 時,在衆多等待同一個鎖的任務中只有一個被喚醒,所以若是你但願使用 notify(),就必須保證被喚醒的是恰當的任務。另外使用 notify() ,全部任務都必須等待相同的條件,由於若是你有多個任務在等待不一樣的條件,那你就不會知道是否喚醒了恰當的任務。若是使用 notfiy(),當條件發生變化時,必須只有一個任務能從中收益。最後,這些限制對全部可能存在的子類都必須總起做用。若是這些規則任何一條不知足都必須使用 notifyAll()。

在 Java 的線程機制中,有一個描述是這樣的:notifyAll() 將喚醒全部正在等待的任務。這是否意味着在程序中任何地方,任何處於 wait() 狀態中的任務都將被任何對 notifyAll() 的調用喚醒呢?在下面的實例中說明了狀況並不是如此,當 notifyAll() 因某個特定鎖被調用時,只有等待這個鎖的任務纔會被喚醒:

public class Blocker {
	 synchronized void waitingCall() {
	    try {
	      while(!Thread.interrupted()) {
	        wait();
	        System.out.print(Thread.currentThread() + " ");
	      }
	    } catch(InterruptedException e) {
	      // OK to exit this way
	    }
	  }
	  synchronized void prod() {
		  notify();
	  }

	  synchronized void prodAll() {
		  notifyAll();
	  }
}

複製代碼

建立任務 Task:

public class Task implements Runnable {
	  static Blocker blocker = new Blocker();
	  public void run() { blocker.waitingCall(); }
}

複製代碼

建立任務 Task2:

public class Task2 implements Runnable {
	  // A separate Blocker object:
	  static Blocker blocker = new Blocker();
	  public void run() { blocker.waitingCall(); }
}

複製代碼

測試類:

public class NotifyVsNotifyAll {
	public static void main(String[] args) throws Exception{
		ExecutorService service = Executors.newCachedThreadPool();
		for (int i = 0; i < 3; i++) {
			service.execute(new Task());
		}
			service.execute(new Task2());
			Timer timer = new Timer();
			timer.scheduleAtFixedRate(new TimerTask() {
				boolean prod = true;
				@Override
				public void run() {
					// TODO Auto-generated method stub
					if (prod) {
						System.out.println("notify");
						Task.blocker.prod();
						prod = false;
					}else {
						System.out.println("notifyAll");
						Task.blocker.prodAll();
						prod = true;
					}
				}
			}, 400, 400);
			TimeUnit.SECONDS.sleep(5);
			timer.cancel();
			System.out.println("Time cancle");
			TimeUnit.MILLISECONDS.sleep(500);
		    System.out.println("Task2.blocker.prodAll() ");
		    Task2.blocker.prodAll();
		    TimeUnit.MILLISECONDS.sleep(500);
		    System.out.println("\nShutting down");
		    service.shutdownNow(); // Interrupt all tasks

	}
}

複製代碼

測試結果:

notify
Thread[pool-1-thread-2,5,main] notifyAll
Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] notify
Thread[pool-1-thread-2,5,main] notifyAll
Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-3,5,main] notify
Thread[pool-1-thread-2,5,main] notifyAll
Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] notify
Thread[pool-1-thread-2,5,main] notifyAll
Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-3,5,main] notify
Thread[pool-1-thread-2,5,main] notifyAll
Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] notify
Thread[pool-1-thread-2,5,main] notifyAll
Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-3,5,main] Time cancle
Task2.blocker.prodAll()
Thread[pool-1-thread-4,5,main]
Shutting down
複製代碼

從上面輸出的結果能夠看出,咱們啓動了三個 Task 任務線程,一個 Task2 線程。使用 timer 作了一個定時器,每間隔 4 毫秒就輪換啓動 Task.blocker 的 notify() 和 notifyAll()方法。咱們看到 Task 和 Task2 都有 Blocker 對象,他們調用 Blocker 對象的時候都會被阻塞。咱們看到當調用 Task.prod() 的時候只有一個在等待鎖的任務被喚醒,其他兩個繼續掛起。當調用 Task.prodAll() 的時候等待的三個線程都會被喚醒。當調用 Task2。prodAll() 的時候 只有 Task2 的線程任務被喚醒。其他的三個 Task 任務繼續掛起。

生產者與消費者

請考慮這樣一種狀況,在飯店有一個廚師和一個服務員。這個服務員必須等待廚師作好膳食。當廚師準備好時會通知服務員,以後服務員上菜,而後返回繼續等待。這是一個任務協做示例:廚師表明生產者,而服務員表明消費者。兩個任務必須在膳食被生產和消費時進行握手,而系統必須是以有序的方式關閉。

膳食類:

public class Meal {
	private final int orderNum;
	public Meal(int orderNum) { this.orderNum = orderNum; }
    public String toString() { return "Meal " + orderNum; }
}

複製代碼

服務生類:

public class WaitPerson implements Runnable {
	  private Restaurant restaurant;
	  public WaitPerson(Restaurant r) {
		  restaurant = r;
	  }

	  public void run() {
	    try {
	      while(!Thread.interrupted()) {
	        synchronized(this) {
	          while(restaurant.meal == null)
	            wait(); // ... for the chef to produce a meal
	        }
	        Print.print("Waitperson got " + restaurant.meal);
	        synchronized(restaurant.chef) {
	          restaurant.meal = null;
	          restaurant.chef.notifyAll(); // Ready for another
	        }
	      }
	    } catch(InterruptedException e) {
	    	Print.print("WaitPerson interrupted");
	    }
	  }
}

複製代碼

廚師類:

public class Chef implements Runnable {
	  private Restaurant restaurant;
	  private int count = 0;
	  public Chef(Restaurant r) {
		  restaurant = r;
	  }
	  public void run() {
	    try {
	      while(!Thread.interrupted()) {
	        synchronized(this) {
	          while(restaurant.meal != null)
	            wait(); // ... for the meal to be taken
	        }
	        if(++count == 10) {
	        	Print.print("Out of food, closing");
	          restaurant.exec.shutdownNow();
	        }
	        Print.printnb("Order up! ");
	        synchronized(restaurant.waitPerson) {
	          restaurant.meal = new Meal(count);
	          restaurant.waitPerson.notifyAll();
	        }
	        TimeUnit.MILLISECONDS.sleep(100);
	      }
	    } catch(InterruptedException e) {
	    	Print.print("Chef interrupted");
	    }
	  }
}

複製代碼

測試類:

public class Restaurant {
	  Meal meal;
	  ExecutorService exec = Executors.newCachedThreadPool();
	  WaitPerson waitPerson = new WaitPerson(this);
	  Chef chef = new Chef(this);
	  public Restaurant() {
	    exec.execute(chef);
	    exec.execute(waitPerson);
	  }
	  public static void main(String[] args) {
	    new Restaurant();
	  }
}

複製代碼

執行結果:

Order up! Waitperson got Meal 1
Order up! Waitperson got Meal 2
Order up! Waitperson got Meal 3
Order up! Waitperson got Meal 4
Order up! Waitperson got Meal 5
Order up! Waitperson got Meal 6
Order up! Waitperson got Meal 7
Order up! Waitperson got Meal 8
Order up! Waitperson got Meal 9
Out of food, closing
Order up! WaitPerson interrupted
Chef interrupted

複製代碼

**使用顯示的 Lock 和 Condition 對象

在 java SE5 的類庫中還有額外的顯示工具。咱們來重寫咱們的打蠟和拋光類。使用互斥並容許任務掛起的基本類是 Condition,你能夠經過在 Condition 上調用 await() 來掛起一個任務。當外部條件發生變化時,意味着某個任務應該繼續執行,你能夠經過調用 signal() 來通知這個任務,從而喚醒一個任務,或者調用 signalAll() 來喚醒全部在這個 Condition 上被掛起的任務。(signalAll() 比 notifAll() 是更安全的方式)

下面是重寫版本:

class Car {
  private Lock lock = new ReentrantLock();
  private Condition condition = lock.newCondition();
  private boolean waxOn = false;
  public void waxed() {
    lock.lock();
    try {
      waxOn = true; // Ready to buff
      condition.signalAll();
    } finally {
      lock.unlock();
    }
  }
  public void buffed() {
    lock.lock();
    try {
      waxOn = false; // Ready for another coat of wax
      condition.signalAll();
    } finally {
      lock.unlock();
    }
  }
  public void waitForWaxing() throws InterruptedException {
    lock.lock();
    try {
      while(waxOn == false)
        condition.await();
    } finally {
      lock.unlock();
    }
  }
  public void waitForBuffing() throws InterruptedException{
    lock.lock();
    try {
      while(waxOn == true)
        condition.await();
    } finally {
      lock.unlock();
    }
  }
}

class WaxOn implements Runnable {
  private Car car;
  public WaxOn(Car c) { car = c; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        printnb("Wax On! ");
        TimeUnit.MILLISECONDS.sleep(200);
        car.waxed();
        car.waitForBuffing();
      }
    } catch(InterruptedException e) {
      print("Exiting via interrupt");
    }
    print("Ending Wax On task");
  }
}

class WaxOff implements Runnable {
  private Car car;
  public WaxOff(Car c) { car = c; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        car.waitForWaxing();
        printnb("Wax Off! ");
        TimeUnit.MILLISECONDS.sleep(200);
        car.buffed();
      }
    } catch(InterruptedException e) {
      print("Exiting via interrupt");
    }
    print("Ending Wax Off task");
  }
}

public class WaxOMatic2 {
  public static void main(String[] args) throws Exception {
    Car car = new Car();
    ExecutorService exec = Executors.newCachedThreadPool();
    exec.execute(new WaxOff(car));
    exec.execute(new WaxOn(car));
    TimeUnit.SECONDS.sleep(5);
    exec.shutdownNow();
  }
}
複製代碼

在 Car 的構造器中單個的 Lock 將產生一個 Condition 對象,這個對象被用來管理任務之間的通訊。可是這個 Condition 不包含任何有關處理狀態的信息,所以你須要額外的表示處理狀態的信息,即 Boolean waxOn。

生產者消費者與隊列

wait() 和 notifAll() 方法以一種很是低級的方式解決了任務的互操做的問題,即每次交互時都握手。許多時候咱們可使用同步隊列來解決協做的問題,同步隊列在任什麼時候刻只容許一個任務插入或移除元素。在 Java.util.concurrent.BlockingQueue 接口中提供了這個隊列,這個接口有大量的標準實現。可使用 LinkedBlockingQueue 他是一個無界隊列,還可使用 ArrayBlockingQueue,它具備固定的尺寸,能夠在它被阻塞以前向其中放置有限數量的元素。

若是消費者任務試圖從隊列中獲取對象,而該隊列爲空時,那麼這些隊列就能夠掛起這些任務,而且當有更多的元素可用時恢復這些消費任務。阻塞隊列能夠解決很是大的問題,而其方式與 wait() 和 notifyAll() 相比,則簡單切可靠。

下面是一個簡單的測試,它將多個 LiftOff 對象執行串行化。消費者 LiftOffRunner 將每一個 LiftOff 對象從 BlockIngQueue 中推出並直接運行。它經過顯示的調用 run() 而是用本身的線程來運行,而不是爲每一個任務啓動一個線程。

首先把以前寫過的 LiftOff 類貼出來:

public class LiftOff implements Runnable{
	  protected int countDown = 10; // Default
	  private static int taskCount = 0;
	  private final int id = taskCount++;
	  public LiftOff() {}
	  public LiftOff(int countDown) {
	    this.countDown = countDown;
	  }
	  public String status() {
	    return "#" + id + "(" +
	      (countDown > 0 ? countDown : "Liftoff!") + "), ";
	  }
	  public void run() {
	    while(countDown-- > 0) {
	      System.out.print(status());
	      Thread.yield();
	    }
	  }

}

複製代碼

LiftOffRunner 類:

public class LiftOffRunner implements Runnable{
	private BlockingQueue<LiftOff> rockets;

	protected LiftOffRunner(BlockingQueue<LiftOff> rockets) {
		super();
		this.rockets = rockets;
	}
    public void add(LiftOff lo) {
		try {
			rockets.put(lo);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println("添加失敗");
		}
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		try {
			while (!Thread.interrupted()) {
				LiftOff rocket = rockets.take();
				rocket.run();
			}
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println("運行中斷");
		}
		System.out.println("退出運行");
	}

}

複製代碼

最後是測試類:

public class TestBlockingQueues {
	static void getkey(){
		try {
			new BufferedReader(new InputStreamReader(System.in)).readLine();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	static void getkey(String message) {
	    Print.print(message);
	    getkey();
	  }

	static void test(String msg,BlockingQueue<LiftOff> queue){
		LiftOffRunner runner = new LiftOffRunner(queue);
		Thread thread = new Thread(runner);
		thread.start();
		//啓動了,可是內容是空的,就一直掛起,等待有新的內容進去
		for (int i = 0; i < 5; i++) {
			runner.add(new LiftOff(5));
		}
		getkey("Press Enter "+ msg);
		thread.interrupt();

	}

	public static void main(String[] args) {
		test("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>());
		test("ArrayBlockingQueue", new ArrayBlockingQueue<>(3));
		test("SynchronousQueue", new SynchronousQueue<>());
	}
}

複製代碼

吐司 BlockingQueue

下面是一個示例,每一臺機器都有三個任務:一個只作吐司、一個給吐司抹黃油、另外一個在塗抹黃油的吐司上抹果醬。咱們來示例若是使用 BlockIngQueue 來運行這個示例:

class Toast {
  public enum Status { DRY, BUTTERED, JAMMED }
  private Status status = Status.DRY;
  private final int id;
  public Toast(int idn) { id = idn; }
  public void butter() { status = Status.BUTTERED; }
  public void jam() { status = Status.JAMMED; }
  public Status getStatus() { return status; }
  public int getId() { return id; }
  public String toString() {
    return "Toast " + id + ": " + status;
  }
}

class ToastQueue extends LinkedBlockingQueue<Toast> {}

class Toaster implements Runnable {
  private ToastQueue toastQueue;
  private int count = 0;
  private Random rand = new Random(47);
  public Toaster(ToastQueue tq) { toastQueue = tq; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        TimeUnit.MILLISECONDS.sleep(
          100 + rand.nextInt(500));
        // Make toast
        Toast t = new Toast(count++);
        print(t);
        // Insert into queue
        toastQueue.put(t);
      }
    } catch(InterruptedException e) {
      print("Toaster interrupted");
    }
    print("Toaster off");
  }
}

// Apply butter to toast:
class Butterer implements Runnable {
  private ToastQueue dryQueue, butteredQueue;
  public Butterer(ToastQueue dry, ToastQueue buttered) {
    dryQueue = dry;
    butteredQueue = buttered;
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = dryQueue.take();
        t.butter();
        print(t);
        butteredQueue.put(t);
      }
    } catch(InterruptedException e) {
      print("Butterer interrupted");
    }
    print("Butterer off");
  }
}

// Apply jam to buttered toast:
class Jammer implements Runnable {
  private ToastQueue butteredQueue, finishedQueue;
  public Jammer(ToastQueue buttered, ToastQueue finished) {
    butteredQueue = buttered;
    finishedQueue = finished;
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = butteredQueue.take();
        t.jam();
        print(t);
        finishedQueue.put(t);
      }
    } catch(InterruptedException e) {
      print("Jammer interrupted");
    }
    print("Jammer off");
  }
}

// Consume the toast:
class Eater implements Runnable {
  private ToastQueue finishedQueue;
  private int counter = 0;
  public Eater(ToastQueue finished) {
    finishedQueue = finished;
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = finishedQueue.take();
        // Verify that the toast is coming in order,
        // and that all pieces are getting jammed:
        if(t.getId() != counter++ ||
           t.getStatus() != Toast.Status.JAMMED) {
          print(">>>> Error: " + t);
          System.exit(1);
        } else
          print("Chomp! " + t);
      }
    } catch(InterruptedException e) {
      print("Eater interrupted");
    }
    print("Eater off");
  }
}

public class ToastOMatic {
  public static void main(String[] args) throws Exception {
    ToastQueue dryQueue = new ToastQueue(),
               butteredQueue = new ToastQueue(),
               finishedQueue = new ToastQueue();
    ExecutorService exec = Executors.newCachedThreadPool();
    exec.execute(new Toaster(dryQueue));
    exec.execute(new Butterer(dryQueue, butteredQueue));
    exec.execute(new Jammer(butteredQueue, finishedQueue));
    exec.execute(new Eater(finishedQueue));
    TimeUnit.SECONDS.sleep(5);
    exec.shutdownNow();
  }
}
複製代碼

這個示例中沒有任何顯示的同步,由於同步隊列和系統的設計隱式的管理了每片 Toast 在任什麼時候刻都只有一個任務在操做。由於隊列的阻塞,使得處理過程將被自動掛起和恢復。

個人博客

個人微信公號:Android開發吹牛皮

掃碼關注可免費得到全套的 Java 編程思想筆記

相關文章
相關標籤/搜索