後篇:
JAVA多線程-基礎Lock Condition 併發集合
JAVA多線程-交互計算 Future Callable Promise java
讀懂代碼,首先要懂得thread的幾個狀態,以及它們之間的轉換. Java thread的狀態有new, runnable, sleep, blocked,wait, interrupt, dead. 多線程
t = new Thread() -> new
t.start() -> runnable
Synchronized(this) or lock.lock() -> blocked
wait() -> waiting
notify() ->runnable
sleep() -> sleep
t.interrupt() -> interrupt 併發
wait()和notify()必須在一個lock片斷或者synchronized片斷才能夠使用. Interrupt exception只有當thread在sleep或者waiting的時候纔會拋出. app
瞭解以上概念,請看下面代碼. dom
這段代碼雖爲多線程,但沒有使用併發集合,也沒有使用Lock和Condition.所有使用Synchronized來完成的. 這段代碼能夠看出,若是不使用併發集合,也不使用lock和condition,code書寫起來要麻煩不少.因此,儘可能使用併發集合.
代碼的邏輯:
1)SProducer不停的產生number到queue中.
2)3個carrier不停的取出queue中的number.
3)若是queue中存在10個剩餘number時,SProducer會停下來等Carrier消費一些後再生產.
4)若是Carrier發現queue中沒有number時,會等待.
5)若是Carrier取出的數字末尾爲4, 則會挑起罷工事件.
6)Carrier罷工會引起一個Negotiation線程進行談判.
7)罷工階段SProducer和全部Carrier均停工.
8)Negotiation若是發現取出的number首位爲3或者7,將引起談判失敗.
9)若是談判成功,則恢復工做,若是談判失敗,則破產,全部線程均退出. fetch
/* * This program does not use Concurrent collections, nor Lock or Condition. It only uses Synchronized. * What does this program do? The logic is: * 1) There are 1 SProducer and 3 Carriers. Producer produces numbers into a queue. * Carriers fetch numbers from queue. * 2) Producers will have a relax when size of the queue exceeds 10. * 3) Carrier will wait when queue is empty. * 4) If Carrier fetches a number ends with 4, Carrier will start a strike. * 5) The strike will initiate a Negotiation. * 6) During Negotiation, Producer and Carriers won't work. * 7) The Negotiation result depends on the Number fetched which leads to strike. If the number begins * with 3 or 7, Negotiation fails. Otherwise, Negotiation succeeds. * 8) Producer and Carriers go back to work if result of Negotiation is successful. * Otherwise, Process stops. */ package concurrency; import java.util.ArrayList; public class SProducer extends Thread { // Note: ArrayList is not thread-safe collection. private final static ArrayList<String> numbers = new ArrayList<String>(); private final static ArrayList<Thread> threads = new ArrayList<Thread>(); private volatile boolean negotiating = false; private class Negotiation implements Runnable { private String number; private Negotiation(String number) { this.number = number; } public void run() { try { System.out.println("Start negotiation..."); sleep(5000); if (number.startsWith("7") || number.startsWith("3")) { System.out.println("Negotiation broken."); for (Thread t : threads) { t.interrupt(); } return; } System.out.println("Negotiation succeeds."); } catch (InterruptedException e) { System.out.println("Middle man is killed."); } synchronized(Negotiation.class) { negotiating = false; Negotiation.class.notifyAll(); } } } private class Carrier implements Runnable { private String name; private Carrier(String name) { this.name = name; } private boolean negotiating(boolean atBeginning) { synchronized (Negotiation.class) { while (negotiating) { try { System.out.println("Carrier ["+name+"] join stricks."); Negotiation.class.wait(); } catch (InterruptedException e) { System.out.println("Negotiation fails. Carrier [" + name + "] retires."); return false; } } } return true; } public void run() { while(true) { if(negotiating && !this.negotiating(true))return; synchronized(numbers) { while (numbers.size() == 0) { try { System.out.println("List empty. Carrier [" + name + "] waiting."); numbers.wait(); } catch (InterruptedException e) { System.out.println("Negotiation fails. Carrier [" + name + "] retires."); return; } } } String number; synchronized(numbers) { number = numbers.remove(0); System.out.println("Carrier [" + name + "] carries "+ number +" out of List;"); numbers.notifyAll(); } if (number.endsWith("4")) { synchronized (Negotiation.class) { if(negotiating && !this.negotiating(false))return; try { sleep(1000); } catch (Exception e){ } negotiating = true; System.out.println("Stricks happen on number:"+number); new Thread(new Negotiation(number)).start(); if(!this.negotiating(true))return; } } } } } public void run() { Thread a = new Thread(new Carrier("a")); Thread b = new Thread(new Carrier("b")); Thread c = new Thread(new Carrier("c")); threads.add(this); threads.add(a); threads.add(b); threads.add(c); a.start(); b.start(); c.start(); this.produceNumbers(); } private void produceNumbers() { while (true) { synchronized (Negotiation.class) { while (negotiating) { try { System.out.println("Stricking... Producer waiting for negotiation result."); Negotiation.class.wait(); System.out.println("Negotiation succeeds. Producer happy."); } catch (InterruptedException e) { System.out.println("Negotiation fails. Producer breaks up."); return; } } } String number = ""+new java.util.Random().nextInt(47); // Need a lock for non-thread-safe list. synchronized(numbers) { numbers.add(number); System.out.println("Produce number " + number + " into List;"); numbers.notifyAll(); } synchronized(numbers) { while (numbers.size() > 10) { try { numbers.wait(); } catch (InterruptedException e) { System.out.println("Negotiation fails. Producer breaks up."); return; } } } } } public static void main(String[] args) { new SProducer().start(); } }