併發編程使人困惑的一個主要緣由:使用併發時須要解決的問題有多個,而實現併發的方法也有多種,而且在這二者之間沒有明顯的映射關係。java
速度問題初聽起來很簡單:若是你須要一個程序運行得更快,那麼能夠將起斷開爲多個片斷,在單個處理器上運行每一個片斷。
併發一般是提升運行在單個處理器上的程序的性能,但在單個處理器上運行的併發程序開銷確實應該比該程序全部部分都順序執行開銷大,由於其中增長了所謂的上下文切換的代價。
若是沒有任務會阻塞,那麼在單處理器上使用併發就沒有任何意義。
在單處理器系統中的性能提升常見示例是事件驅動的編程。
Java採起的是在順序語言的基礎上提供對線程的支持。與在多任務操做系統中分叉進程不一樣,線程機制是在由執行程序表示的單一進程中建立任務。程序員
協做多線程:Java的線程機制是搶佔式的,這表示調度機制週期性的中斷線程,將上下文切換到另外一個線程,從而爲每一個線程都提供時間片,使得每一個線程都會分配到數量合理得時間去驅動它得任務。在協做式系統中,每一個任務都會自動得放棄控制,這要求程序員要有意識得插入某種類型得讓步語句。協做式系統得優點是雙重得:上下文切換的開銷一般比搶佔式要少得多,而且對能夠同時執行的線程數量在理論上沒有任何限制。npm
經過使用多線程機制,這些獨立任務中的每個將由執行線程來驅動,一個線程就是在進程中的一個單一順序控制流,當個進程能夠擁有多個併發執行的任務。編程
線程能夠驅動任務,所以你須要一種描述任務的方式,這能夠由Runnable接口來提供。要想定義任務,只需實現Runnable接口並編寫run(0方法,使得該任務能夠執行你的命令。設計模式
public class LiftOff implements Runnable { protected int countDown = 10; // Default private static int taskCount = 0; private final int id = taskCount++; public LiftOff() {} public LiftOff(int countDown) { this.countDown = countDown; } public String status() { return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; } public void run() { while(countDown-- > 0) { System.out.print(status()); Thread.yield(); } } } ///:~
Thread.yield()的調用是對線程調度器的之後在哪一個建議,它聲明:我已經執行完生命週期中最重要的部分了,此刻正是切換給其餘任務執行一段時間的時機了。安全
public class MainThread { public static void main(String[] args) throws InterruptedException { LiftOff launch = new LiftOff(); launch.run(); } } /* Output: #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), *///:~
當從Runnable導出一個類時,它必須具備run()方法,可是這個方法並沒有特殊之處——它不會產生內在的線程能力。要實現線程行爲,你必須顯示的將一個任務附着在線程上。多線程
將Runnable對象轉變爲工做任務的傳統方式是把它提交給一個Thread構造器:併發
public class BasicThreads { public static void main(String[] args) { Thread t = new Thread(new LiftOff()); t.start(); System.out.println("Waiting for LiftOff"); } } /* Output: (90% match) Waiting for LiftOff #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), *///:~
能夠添加更多的線程去驅動更多的任務。app
public class MoreBasicThreads { public static void main(String[] args) { for(int i = 0; i < 5; i++) new Thread(new LiftOff()).start(); System.out.println("Waiting for LiftOff"); } } /* Output: (Sample) Waiting for LiftOff #0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7), #2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5), #4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2), #1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1), #0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!), *///:~
當main()建立Thread對象時,它並無捕獲任何對這些對象的引用。每一個Thread都註冊了它本身,所以確實有一個對它的引用,並且在它的任務推出其run()並死亡以前,垃圾回收期沒法清除它。dom
Java SE5的jav.util.concurrent包中的執行器(Executor)將爲你管理Thread對象,從而簡化了併發編程。Executor在客戶端和任務執行之間提供了一個間接層;與客戶端直接執行任務不一樣,這個中介對象將執行任務。Executor容許你管理異步任務的執行,而無須顯示的管理線程的聲明週期。
咱們可使用Executor來代替Thread對象。
import java.util.concurrent.*; public class CachedThreadPool { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } } /* Output: (Sample) #0(9), #0(8), #1(9), #2(9), #3(9), #4(9), #0(7), #1(8), #2(8), #3(8), #4(8), #0(6), #1(7), #2(7), #3(7), #4(7), #0(5), #1(6), #2(6), #3(6), #4(6), #0(4), #1(5), #2(5), #3(5), #4(5), #0(3), #1(4), #2(4), #3(4), #4(4), #0(2), #1(3), #2(3), #3(3), #4(3), #0(1), #1(2), #2(2), #3(2), #4(2), #0(Liftoff!), #1(1), #2(1), #3(1), #4(1), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!), *///:~
單個的Executor被用來建立和管理系統中全部任務。
對shutdown()方法的調用能夠防止新任務被提交給這個Executor,當前線程將繼續運行在shutdown()被調用以前提交全部任務。
FixedThreadPool使用了有限的線程集來執行所提交的任務:
import java.util.concurrent.*; public class SingleThreadExecutor { public static void main(String[] args) { ExecutorService exec = Executors.newSingleThreadExecutor(); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } } /* Output: #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!), *///:~
有了FixedThreadPool,就能夠一次性預先執行代價高昂的線程分配,於是也就能夠限制線程的數量了。
CachedThreadPool在程序執行過程當中一般會建立於所徐數量相同的線程,而後再它回收舊線程時中止建立新的線程,所以它是合理的Executor首選。只有當這種方式會引起問題時,才須要切換到FixedThreadPool。
SingleThreadExecutor就像是線程數量爲1的FixedThreadPool。
SingleThreadExecutor會序列化全部提交給它的任務,並會維護它本身的懸掛任務隊列。
import java.util.concurrent.*; public class SingleThreadExecutor { public static void main(String[] args) { ExecutorService exec = Executors.newSingleThreadExecutor(); for(int i = 0; i < 5; i++) exec.execute(new LiftOff()); exec.shutdown(); } } /* Output: #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!), *///:~
Runnable是執行工做的獨立任務,可是它不返回任何值。若是你但願任務再完成時可以返回一個值,能夠實現Callable接口而不是Runnable接口。
//: concurrency/CallableDemo.java import java.util.concurrent.*; import java.util.*; class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id) { this.id = id; } public String call() { return "result of TaskWithResult " + id; } } public class CallableDemo { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); ArrayList<Future<String>> results = new ArrayList<Future<String>>(); for(int i = 0; i < 10; i++) results.add(exec.submit(new TaskWithResult(i)));//將產生Future對象 for(Future<String> fs : results) try { // get() blocks until completion: System.out.println(fs.get()); } catch(InterruptedException e) { System.out.println(e); return; } catch(ExecutionException e) { System.out.println(e); } finally { exec.shutdown(); } } } /* Output: result of TaskWithResult 0 result of TaskWithResult 1 result of TaskWithResult 2 result of TaskWithResult 3 result of TaskWithResult 4 result of TaskWithResult 5 result of TaskWithResult 6 result of TaskWithResult 7 result of TaskWithResult 8 result of TaskWithResult 9 *///:~
還可使用isDone判斷是否執行完成,若是不調用isDone,那個若是沒有完成,get會被阻塞。
影響任務行爲的一種簡單方式是調用sleep(),這將使任務停止執行給定的時間。
//: concurrency/SleepingTask.java // Calling sleep() to pause for a while. import java.util.concurrent.*; public class SleepingTask extends LiftOff { public void run() { try { while(countDown-- > 0) { System.out.print(status()); // Old-style: Thread.sleep(100); // Java SE5/6-style: //TimeUnit.MILLISECONDS.sleep(100); } } catch(InterruptedException e) { System.err.println("Interrupted"); } } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new SleepingTask()); exec.shutdown(); } } /* Output: #0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7), #2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5), #4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2), #1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1), #0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!), *///:~
異常不能跨線程傳播回main(),因此你必須在本地處理全部在任務內部產生的異常。
線程的優先級將該線程的重要性傳遞給調度器。
優先級較低的線程僅僅是執行的頻率較低。
在對大多數時間裏,全部線程都應該以默認的優先級運行,試圖操做線程的優先級一般是一種錯誤。
//: concurrency/SimplePriorities.java // Shows the use of thread priorities. import java.util.concurrent.*; public class SimplePriorities implements Runnable { private int countDown = 5; private volatile double d; // No optimization private int priority; public SimplePriorities(int priority) { this.priority = priority; } public String toString() { return Thread.currentThread() + ": " + countDown; } public void run() { Thread.currentThread().setPriority(priority);//設置當前線程優先級,使用getPriority獲取當前優先級 while(true) { // An expensive, interruptable operation: for(int i = 1; i < 100000; i++) { d += (Math.PI + Math.E) / (double)i; if(i % 1000 == 0) Thread.yield(); } System.out.println(this); if(--countDown == 0) return; } } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute( new SimplePriorities(Thread.MIN_PRIORITY)); exec.execute( new SimplePriorities(Thread.MAX_PRIORITY)); exec.shutdown(); } } /* Output: (70% match) Thread[pool-1-thread-6,10,main]: 5 Thread[pool-1-thread-6,10,main]: 4 Thread[pool-1-thread-6,10,main]: 3 Thread[pool-1-thread-6,10,main]: 2 Thread[pool-1-thread-6,10,main]: 1 Thread[pool-1-thread-3,1,main]: 5 Thread[pool-1-thread-2,1,main]: 5 Thread[pool-1-thread-1,1,main]: 5 Thread[pool-1-thread-5,1,main]: 5 Thread[pool-1-thread-4,1,main]: 5 ... *///:~
當調用yieId()時,你也是在建議具備相同優先級的其餘線程能夠運行。
大致上,對於任何重要的控制或在調用整個應用時,都不能依賴yieId(),實際上,yieId()常常被誤用。
所謂後臺線程,是指在程序運行的時候在後臺提供一種通用服務的線程,而且這種線程並不屬於程序中不可或缺的部分。當全部非後臺線程結束時,程序也就終止了,同時會殺死進程中的全部後臺線程。
//: concurrency/SimpleDaemons.java // Daemon threads don't prevent the program from ending. import java.util.concurrent.*; import static net.mindview.util.Print.*; public class SimpleDaemons implements Runnable { public void run() { try { while(true) { TimeUnit.MILLISECONDS.sleep(100); print(Thread.currentThread() + " " + this); } } catch(InterruptedException e) { print("sleep() interrupted"); } } public static void main(String[] args) throws Exception { for(int i = 0; i < 10; i++) { Thread daemon = new Thread(new SimpleDaemons()); daemon.setDaemon(true); // 必須在線程被調用以前設置setDaemon daemon.start(); } print("All daemons started"); TimeUnit.MILLISECONDS.sleep(175); } } /* Output: (Sample) All daemons started Thread[Thread-0,5,main] SimpleDaemons@530daa Thread[Thread-1,5,main] SimpleDaemons@a62fc3 Thread[Thread-2,5,main] SimpleDaemons@89ae9e Thread[Thread-3,5,main] SimpleDaemons@1270b73 Thread[Thread-4,5,main] SimpleDaemons@60aeb0 Thread[Thread-5,5,main] SimpleDaemons@16caf43 Thread[Thread-6,5,main] SimpleDaemons@66848c Thread[Thread-7,5,main] SimpleDaemons@8813f2 Thread[Thread-8,5,main] SimpleDaemons@1d58aae Thread[Thread-9,5,main] SimpleDaemons@83cc67 ... *///:~
必須在線程啓動以前調用setDaemom()方法,才能把它設置爲後臺線程。
經過編寫定製的ThreadFactory能夠定製由Executor建立的線程的屬性:
package net.mindview.util; import java.util.concurrent.ThreadFactory; public class DaemonThreadFactory implements ThreadFactory { public DaemonThreadFactory() { } public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }
//: concurrency/DaemonFromFactory.java // Using a Thread Factory to create daemons. import java.util.concurrent.*; import net.mindview.util.*; import static net.mindview.util.Print.*; public class DaemonFromFactory implements Runnable { public void run() { try { while(true) { TimeUnit.MILLISECONDS.sleep(100); print(Thread.currentThread() + " " + this); } } catch(InterruptedException e) { print("Interrupted"); } } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool( new DaemonThreadFactory()); for(int i = 0; i < 10; i++) exec.execute(new DaemonFromFactory()); print("All daemons started"); TimeUnit.MILLISECONDS.sleep(500); // Run for a while } } /* (Execute to see output) *///:~
能夠經過調用isDaemon()方法來肯定線程是不是一個後臺線程。若是是一個後臺線程,那麼它建立的任何線程將被自動設置成後臺線程:
// Using a Thread Factory to create daemons. import java.util.concurrent.*; import net.mindview.util.*; import static net.mindview.util.Print.*; public class DaemonFromFactory implements Runnable { public void run() { try { while(true) { TimeUnit.MILLISECONDS.sleep(100); print(Thread.currentThread() + " " + this); } } catch(InterruptedException e) { print("Interrupted"); } } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool( new DaemonThreadFactory()); for(int i = 0; i < 10; i++) exec.execute(new DaemonFromFactory()); print("All daemons started"); TimeUnit.MILLISECONDS.sleep(500); // Run for a while } } /* (Execute to see output) *///:~
後臺進程在不執行finaiiy子句的狀況下就會終止其run()方法:
//: concurrency/DaemonsDontRunFinally.java // Daemon threads don't run the finally clause import java.util.concurrent.*; import static net.mindview.util.Print.*; class ADaemon implements Runnable { public void run() { try { print("Starting ADaemon"); TimeUnit.SECONDS.sleep(1); } catch(InterruptedException e) { print("Exiting via InterruptedException"); } finally { print("This should always run?"); } } } public class DaemonsDontRunFinally { public static void main(String[] args) throws Exception { Thread t = new Thread(new ADaemon()); t.setDaemon(true); t.start(); } } /* Output: Starting ADaemon *///:~
若是你註釋調對setDaemon()的調用,就會看到finally子句將會執行。
當最後一個非後臺線程終止時,後臺線程會忽然終止。所以一旦main()退出,JVM就會當即關閉全部後臺線程。由於不能以優雅的方式來關閉後臺線程,因此它們幾乎不是一種好的思想。非後臺的Executor一般是一種更好的方法,它控制的全部任務均可以同時被關閉,關閉將以有序的方式執行。
使用直接從Thread繼承這種可替代的方式:
//: concurrency/SimpleThread.java // Inheriting directly from the Thread class. public class SimpleThread extends Thread { private int countDown = 5; private static int threadCount = 0; public SimpleThread() { // Store the thread name: super(Integer.toString(++threadCount)); start(); } public String toString() { return "#" + getName() + "(" + countDown + "), "; } public void run() { while(true) { System.out.print(this); if(--countDown == 0) return; } } public static void main(String[] args) { for(int i = 0; i < 5; i++) new SimpleThread(); } } /* Output: #1(5), #1(4), #1(3), #1(2), #1(1), #2(5), #2(4), #2(3), #2(2), #2(1), #3(5), #3(4), #3(3), #3(2), #3(1), #4(5), #4(4), #4(3), #4(2), #4(1), #5(5), #5(4), #5(3), #5(2), #5(1), *///:~
經過調用適當的Thread構造器爲Thread對象賦予具體的名稱,這個名稱能夠經過使用GetName()的toString()中得到。
慣用法是自管理的Runnable:
public class SelfManaged implements Runnable { private int countDown = 5; private Thread t = new Thread(this);//傳入當前對象 public SelfManaged() { t.start(); } public String toString() { return Thread.currentThread().getName() + "(" + countDown + "), "; } public void run() { while(true) { System.out.print(this); if(--countDown == 0) return; } } public static void main(String[] args) { for(int i = 0; i < 5; i++) new SelfManaged(); } } /* Output: Thread-0(5), Thread-0(4), Thread-0(3), Thread-0(2), Thread-0(1), Thread-1(5), Thread-1(4), Thread-1(3), Thread-1(2), Thread-1(1), Thread-2(5), Thread-2(4), Thread-2(3), Thread-2(2), Thread-2(1), Thread-3(5), Thread-3(4), Thread-3(3), Thread-3(2), Thread-3(1), Thread-4(5), Thread-4(4), Thread-4(3), Thread-4(2), Thread-4(1), *///:~
這裏實現接口使得你能夠繼承另外一個不一樣的類。
經過使用內部類來將線程代碼隱藏在類中:
//: concurrency/ThreadVariations.java // Creating threads with inner classes. import java.util.concurrent.*; import static net.mindview.util.Print.*; // Using a named inner class: class InnerThread1 {//建立一個擴展自Thread的匿名內部類 private int countDown = 5; private Inner inner; private class Inner extends Thread { Inner(String name) { super(name); start(); } public void run() { try { while (true) { print(this); if (--countDown == 0) return; sleep(10); } } catch (InterruptedException e) { print("interrupted"); } } public String toString() { return getName() + ": " + countDown; } } public InnerThread1(String name) {//建立這個內部類的實例 inner = new Inner(name); } } // Using an anonymous inner class: class InnerThread2 { private int countDown = 5; private Thread t; public InnerThread2(String name) {//可替換方式:在構造器中建立了一個匿名的Thread子類,而且將其向上轉型爲Thread引用t。 t = new Thread(name) { public void run() { try { while (true) { print(this); if (--countDown == 0) return; sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return getName() + ": " + countDown; } }; t.start(); } } // Using a named Runnable implementation: class InnerRunnable1 { private int countDown = 5; private Inner inner; private class Inner implements Runnable { Thread t; Inner(String name) { t = new Thread(this, name); t.start(); } public void run() { try { while (true) { print(this); if (--countDown == 0) return; TimeUnit.MILLISECONDS.sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return t.getName() + ": " + countDown; } } public InnerRunnable1(String name) { inner = new Inner(name); } } // Using an anonymous Runnable implementation: class InnerRunnable2 { private int countDown = 5; private Thread t; public InnerRunnable2(String name) { t = new Thread(new Runnable() { public void run() { try { while (true) { print(this); if (--countDown == 0) return; TimeUnit.MILLISECONDS.sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return Thread.currentThread().getName() + ": " + countDown; } }, name); t.start(); } } // A separate method to run some code as a task: class ThreadMethod {//在方法內部建立線程 private int countDown = 5; private Thread t; private String name; public ThreadMethod(String name) { this.name = name; } public void runTask() { if (t == null) { t = new Thread(name) { public void run() { try { while (true) { print(this); if (--countDown == 0) return; sleep(10); } } catch (InterruptedException e) { print("sleep() interrupted"); } } public String toString() { return getName() + ": " + countDown; } }; t.start(); } } } public class ThreadVariations { public static void main(String[] args) { new InnerThread1("InnerThread1"); new InnerThread2("InnerThread2"); new InnerRunnable1("InnerRunnable1"); new InnerRunnable2("InnerRunnable2"); new ThreadMethod("ThreadMethod").runTask(); } } /* (Execute to see output) *///:~
你對Thread類實際沒有任何控制權。你建立任務,並經過某種方式將一個線程附着到任務上,以使得這個線程能夠驅動任務。
Java的線程機制基於來自C的低級的p線程方式,這是一種你必須深刻研究,而且須要徹底理解其全部細節的方式。
一個線程能夠在其餘線程上調用join()方法,其效果是等待一段時間知道第二個線程結束才繼續執行。
若是某個線程在另外一個線程t上調用t.join(),此線程將被掛起,知道目標線程t結束才恢復。
也能夠調用join()時帶上一個超時參數,這樣若是目標線程在這段時間到期時尚未結束的話,join()方式總能返回。
對join()方法的調用能夠被中斷,作法時在調用線程上調用interrupt()方法。
//: concurrency/Joining.java // Understanding join(). import static net.mindview.util.Print.*; class Sleeper extends Thread { private int duration; public Sleeper(String name, int sleepTime) { super(name); duration = sleepTime; start(); System.out.println(name); } public void run() { try { sleep(duration); } catch(InterruptedException e) { print(getName() + " was interrupted. " + "isInterrupted(): " + isInterrupted()); return; } print(getName() + " has awakened"); } } class Joiner extends Thread { private Sleeper sleeper; public Joiner(String name, Sleeper sleeper) { super(name); this.sleeper = sleeper; start(); System.out.println(name); } public void run() { try { sleeper.join(); } catch(InterruptedException e) { print("Interrupted"); } print(getName() + " join completed"); } } public class Joining { public static void main(String[] args) { Sleeper sleepy = new Sleeper("Sleepy", 1500), grumpy = new Sleeper("Grumpy", 1500); Joiner dopey = new Joiner("Dopey", sleepy), doc = new Joiner("Doc", grumpy); grumpy.interrupt(); } } /* Output: Grumpy was interrupted. isInterrupted(): false Doc join completed Sleepy has awakened Dopey join completed *///:~
Joiner線程將經過在Sleeper對象上調用join()方法來等待Sleeper醒來。在main()裏面,每一個Sleeper都有一個Joiner,這個能夠在輸出中發現,若是Sleeper被中斷或者是正常結束,Joiner將和Sleeper一同結束。
使用線程的動機之一就是創建有響應的用戶界面:
//: concurrency/ResponsiveUI.java // User interface responsiveness. // {RunByHand} class UnresponsiveUI { private volatile double d = 1; public UnresponsiveUI() throws Exception { while(d > 0) d = d + (Math.PI + Math.E) / d; System.in.read(); // Never gets here } } public class ResponsiveUI extends Thread { private static volatile double d = 1; public ResponsiveUI() { setDaemon(true); start(); } public void run() { while(true) { d = d + (Math.PI + Math.E) / d; } } public static void main(String[] args) throws Exception { //new UnresponsiveUI(); // Must kill this process new ResponsiveUI();//做爲後臺運行的同時,還在等待用戶的輸入 System.in.read(); System.out.println("aaaaaa"); System.out.println(d); // Shows progress } } ///:~
線程組持有一個線程集合
因爲線程的本質特性,使得你不能捕獲從線程中逃逸的異常。一旦異常逃出任務的run()方法,它就會向外傳播到控制檯,除非你採起特殊的步驟捕獲這種錯誤的異常。
下面的程序老是會拋出異常:
//: concurrency/ExceptionThread.java // {ThrowsException} import java.util.concurrent.*; public class ExceptionThread implements Runnable { public void run() { throw new RuntimeException(); } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); } } ///:~
在main中放入try carth並不能抓住異常:
import java.util.concurrent.*; public class NaiveExceptionHandling { public static void main(String[] args) { try { ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); } catch(RuntimeException ue) { // This statement will NOT execute! System.out.println("Exception has been handled!"); } } } ///:~
咱們須要修改Executor產生線程的方式。Thread.UncaughtExceptionHandler是Java SE5中的新接口,它容許在每一個Thread對象上都附着一個異常處理器。Thread.UncaughtExceptionHandler.uncaughtException()會在線程由於捕獲的異常而臨近死亡時被調用,爲了使用它,咱們建立一個新類型ThreadFactory,它將在每一個新建立的Thread對象上附着一個Thread.UncaughtExceptionHandler:
//: concurrency/CaptureUncaughtException.java import java.util.concurrent.*; class ExceptionThread2 implements Runnable { public void run() { Thread t = Thread.currentThread(); System.out.println("run() by " + t); System.out.println( "eh = " + t.getUncaughtExceptionHandler()); throw new RuntimeException(); } } class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { public void uncaughtException(Thread t, Throwable e) { System.out.println("caught " + e); } } class HandlerThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { System.out.println(this + " creating new Thread"); Thread t = new Thread(r); System.out.println("created " + t); t.setUncaughtExceptionHandler( new MyUncaughtExceptionHandler()); System.out.println( "eh = " + t.getUncaughtExceptionHandler()); return t; } } public class CaptureUncaughtException { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool( new HandlerThreadFactory()); exec.execute(new ExceptionThread2()); } } /* Output: (90% match) HandlerThreadFactory@de6ced creating new Thread created Thread[Thread-0,5,main] eh = MyUncaughtExceptionHandler@1fb8ee3 run() by Thread[Thread-0,5,main] eh = MyUncaughtExceptionHandler@1fb8ee3 caught java.lang.RuntimeException *///:~
在Thread類中設置一個靜態域,並將這個處理器設置爲默認的爲捕獲異常處理器:
import java.util.concurrent.*; public class SettingDefaultHandler { public static void main(String[] args) { Thread.setDefaultUncaughtExceptionHandler( new MyUncaughtExceptionHandler()); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); } } /* Output: caught java.lang.RuntimeException *///:~
下面的任務產生一個偶數,而其餘任何消費這些數字。消費者任何惟一工做就是檢查偶數的有效性。
public abstract class IntGenerator { private volatile boolean canceled = false; public abstract int next(); // Allow this to be canceled: public void cancel() { canceled = true; }//修改canceled標識 public boolean isCanceled() { return canceled; }//查看該對象是否被取消 } ///:~
import java.util.concurrent.*; public class EvenChecker implements Runnable {//消費者任務 private IntGenerator generator; private final int id; public EvenChecker(IntGenerator g, int ident) { generator = g; id = ident; } public void run() { while(!generator.isCanceled()) { int val = generator.next(); if(val % 2 != 0) {//程序將檢查是不是偶數,若是是奇數,那麼就是另外一個線程尚未執行完next()就調用了檢查判斷。 System.out.println(val + " not even!"); generator.cancel(); // Cancels all EvenCheckers } } } // Test any type of IntGenerator: public static void test(IntGenerator gp, int count) { System.out.println("Press Control-C to exit"); ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < count; i++) exec.execute(new EvenChecker(gp, i)); exec.shutdown(); } // Default value for count: public static void test(IntGenerator gp) { test(gp, 10); } } ///:~
共享公共資源的任務能夠觀察該資源的終止信號。這能夠消除所謂競爭條件,即兩個或更多的任務競爭響應某個條件,所以產生的衝突或以一致結果:
public class EvenGenerator extends IntGenerator { private volatile int currentEvenValue = 0; public int next() {//一個任務可能在另外一個任務執行第一個對currentEvenValue遞增操做以後,但沒有執行第二個操做以前,調用next()方法。 ++currentEvenValue; // Danger point here! ++currentEvenValue; return currentEvenValue; } public static void main(String[] args) { EvenChecker.test(new EvenGenerator()); } } /* Output: (Sample) Press Control-C to exit 89476993 not even! 89476993 not even! *///:~
遞增也不是原子操做,因此,必需要保護任務。
使用線程時的一個基本問題:你永遠都不知道一個線程什麼時候在運行。
對於併發,你須要某種方式來防止兩個任務訪問相同的資源。
防止這種衝突的方法就是當資源被一個任務使用時,在其上加鎖。
基本全部併發模式在解決線程衝突問題的時候,都是採用序列化訪問共享資源的方案。這意味着在給定時刻只容許一個任務訪問共享資源。一般這是經過在代碼前面加上一條鎖語句來實現的,這就使得在一段時間內只有一個任務能夠運行這段代碼。由於鎖語句產生了一種互相排斥的效果,全部這種機制被稱爲互斥量。
Java提供關鍵字synchronized的形式,爲防止資源衝突提供了內置支持。當任務要執行被synchronized關鍵字保護的代碼片斷的時候,它將檢查鎖是否可用,而後獲取鎖,執行代碼,釋放鎖。
要控制對共享資源的訪問,得先把它包裝進一個對象,而後把全部要訪問這個資源的方法標記爲synchronized。
synchronized void f(){}
全部對象都自動含有單一的鎖(也稱爲監視器)。當對象上調用其任意synchronized方法的時候,此對象都被加鎖。
在使用併發時,將域設置爲private是很是重要的,不然,synchronized關鍵字就不能防止其餘任務直接訪問域,這樣就會產生衝突。
一個任務能夠獲取多個鎖。
JVM負責跟蹤對象被加鎖的次數。若是一個對象被解鎖,其計數變爲0。在任務第一個給對象加鎖的時候,計數變爲1.每當這個相同的任務在這個對象上獲取鎖,計數都會遞增。只有首先得到鎖的任務才能容許繼續獲取多個鎖。每當任務離開一個synchronized方法,計數遞減,當計數爲0的時候,鎖被徹底釋放,此時別的任務就可使用此資源。
針對每一個類,也有一個鎖,全部synchronized static方法能夠在類的範圍內防止對static數據的併發訪問。
你應該在何時同步,能夠運用Brian的同步規則:
若是你在寫一個變量,它接下來將被另外一個線程讀取,或者正在讀取一個上一次已經被另外一個線程寫過的變量,那麼你必須使用同步,而且,讀寫線程都必須使用相同的監視器同步。
每一個訪問臨界共享資源的方法都必須被同步,不然它們就不會正確的工做。
public class SynchronizedEvenGenerator extends IntGenerator { private int currentEvenValue = 0; public synchronized int next() { ++currentEvenValue; Thread.yield(); // Cause failure faster ++currentEvenValue; return currentEvenValue; } public static void main(String[] args) { EvenChecker.test(new SynchronizedEvenGenerator()); } } ///:~
第一個進入next()的任務將得到鎖,任何試圖獲取鎖的任務都將從其開始嘗試之時被組賽,直到第一個任務釋放鎖。經過這種方式,任什麼時候刻只有一個任務能夠經過由互斥量看護的代碼。
Lock對象必須被顯示的建立,鎖定和釋放。
對於解決某些類型的問題來講,它更靈活。
import java.util.concurrent.locks.*; public class MutexEvenGenerator extends IntGenerator { private int currentEvenValue = 0; private Lock lock = new ReentrantLock(); public int next() { lock.lock(); try { ++currentEvenValue; Thread.yield(); // Cause failure faster ++currentEvenValue; return currentEvenValue; } finally { lock.unlock(); } } public static void main(String[] args) { EvenChecker.test(new MutexEvenGenerator()); } } ///:~
添加一個被互斥調用的鎖,並使用lock和unlock方法在next()內建立臨界資源
當你使用synchronized關鍵字時,須要寫的代碼量更少,而且用戶錯誤出現的可能性也會下降,所以一般只有在解決特殊問題時,才能顯示使用Lock對象。例如:使用synchronized關鍵字不能嘗試獲取鎖且最終獲取鎖會失敗,或者嘗試着獲取鎖一段時間,而後放棄它,要實現這些,你必須使用concurrent類庫:
//: concurrency/AttemptLocking.java // Locks in the concurrent library allow you // to give up on trying to acquire a lock. import java.util.concurrent.*; import java.util.concurrent.locks.*; public class AttemptLocking { private ReentrantLock lock = new ReentrantLock();//ReentrantLock可讓你嘗試獲取鎖,但最終沒有獲取到鎖 public void untimed() { boolean captured = lock.tryLock(); try { System.out.println("tryLock(): " + captured); } finally { if(captured) lock.unlock(); } } public void timed() { boolean captured = false; try { captured = lock.tryLock(2, TimeUnit.SECONDS);//嘗試獲取鎖,在2秒後失敗 } catch(InterruptedException e) { throw new RuntimeException(e); } try { System.out.println("tryLock(2, TimeUnit.SECONDS): " + captured); } finally { if(captured) lock.unlock(); } } public static void main(String[] args) { final AttemptLocking al = new AttemptLocking(); al.untimed(); // True -- lock is available al.timed(); // True -- lock is available // Now create a separate task to grab the lock: new Thread() { { setDaemon(true); } public void run() { al.lock.lock(); System.out.println("acquired"); } }.start(); Thread.yield(); // Give the 2nd task a chance al.untimed(); // False -- lock grabbed by task al.timed(); // False -- lock grabbed by task } } /* Output: tryLock(): true tryLock(2, TimeUnit.SECONDS): true acquired tryLock(): false tryLock(2, TimeUnit.SECONDS): false *///:~
一個常不正確的知識是「原子操做不須要進行同步控制」
經過Goetz測試你就可使用原子性:若是你能夠編寫用於現代微處理器的高性能JVM,那麼就有資格去考慮是否能夠避免同步。
使用volatile關鍵字,就會得到(簡單的賦值和返回操做)原子性。
在多處理器系統上,相對於單處理系統而言,可視性問題遠比原子性問題多得多。一個任務作出的修改,即便在不中斷的意義上講是原子性的,對其餘任務也多是不可視的,所以不一樣的任務對應用的狀態有不一樣的視圖。另外一方面,同步機制強制在處理系統中,一個任務作出的修改必須在應用中是可視的。若是沒有同步機制,那麼修改時可視將沒法肯定。
volatile關鍵字還確保了應用中的可視性。若是你將一個域聲明爲volatile的,那麼只要對這個域產生了寫操做,那麼全部的讀操做就均可以看到這個修改。
原子性和易變性是不一樣的概念。在非volatile域上的原子操做沒必要刷新到主存中去,所以其它讀取該域的任務也沒必要看到這個新值。若是多個任務在同時訪問某個域,那麼這個域就應該是volatile的,不然,這個域應該只能經由同步來訪問。同步也會致使向主存中刷新,所以若是一個域徹底由syschronized方法或語句來防禦,那就沒必要將其設置爲是volatile的。
一個任務所做的任務寫入操做對這個任務來講都是可視的,所以若是它只須要在這個任務內部可視,那麼你就不須要將其設置爲volatile的。
當一個域的值依賴於它以前的值時,volatile就沒法工做了。
使用volatile而不是synchronized的惟一徹底狀況是類中只有一個可變域。全部,通常,你的第一選擇應該是synchronized。
不要盲目的應用原子性:
import java.util.concurrent.*; public class AtomicityTest implements Runnable { private int i = 0; public int getValue() { return i; } private synchronized void evenIncrement() { i++; i++; } public void run() { while(true) evenIncrement(); } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); AtomicityTest at = new AtomicityTest(); exec.execute(at); while(true) { int val = at.getValue(); if(val % 2 != 0) { System.out.println(val); System.exit(0); } } } }
儘管return i確實是原子性操做,可是缺乏同步使得其數值能夠在處於不穩定的中間狀態被讀取。除此以外,因爲i也不是volatile的,所以還存在可視性問題。
一個產生序列數的類每當nextSerialNumber()被調用時,它必須調用者返回惟一的值:
public class SerialNumberGenerator { private static volatile int serialNumber = 0; public static int nextSerialNumber() { return serialNumber++; // Not thread-safe } }
若是一個域可能會被多個任務同時訪問,或者這些任務中至少有一個是寫入任務,那麼你就應該將這個域設置爲volatile。將一個域定義爲volatile,那麼它就會告訴編譯器不要執行任務移除讀取和寫入操做的優化,這些操做的目的是用線程中的局部變量維護對這個域的精確同步。
import java.util.concurrent.*; // Reuses storage so we don't run out of memory: class CircularSet { private int[] array; private int len; private int index = 0; public CircularSet(int size) { array = new int[size]; len = size; // Initialize to a value not produced // by the SerialNumberGenerator: for(int i = 0; i < size; i++) array[i] = -1; } public synchronized void add(int i) { array[index] = i; // Wrap index and write over old elements: index = ++index % len; } public synchronized boolean contains(int val) { for(int i = 0; i < len; i++) if(array[i] == val) return true; return false; } } public class SerialNumberChecker { private static final int SIZE = 10; private static CircularSet serials = new CircularSet(1000); private static ExecutorService exec = Executors.newCachedThreadPool(); static class SerialChecker implements Runnable { public void run() { while(true) { int serial = SerialNumberGenerator.nextSerialNumber(); if(serials.contains(serial)) { System.out.println("Duplicate: " + serial); System.exit(0); } serials.add(serial); } } } public static void main(String[] args) throws Exception { for(int i = 0; i < SIZE; i++) exec.execute(new SerialChecker()); // Stop after n seconds if there's an argument: if(args.length > 0) { TimeUnit.SECONDS.sleep(new Integer(args[0])); System.out.println("No duplicates detected"); System.exit(0); } } }
上面這個程序,最終會獲得重複的序列數。若是要解決這個問題,須要在nextSerialNumber()前面加上synchronized關鍵字。
Java SE5引入了諸如AtomicIntger,AtomicLong,AtomicReference等特殊的原子性變量類,它們提供下面形式的原子性條件更新操做:
booleean compareAndSet(expectedValue,updateValue)
咱們可使用AtomicInteger來重寫:
import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; public class AtomicIntegerTest implements Runnable { private AtomicInteger i = new AtomicInteger(0); public int getValue() { return i.get(); } private void evenIncrement() { i.addAndGet(2); } public void run() { while(true) evenIncrement(); } public static void main(String[] args) { new Timer().schedule(new TimerTask() { public void run() { System.err.println("Aborting"); System.exit(0); } }, 5000); // Terminate after 5 seconds ExecutorService exec = Executors.newCachedThreadPool(); AtomicIntegerTest ait = new AtomicIntegerTest(); exec.execute(ait); while(true) { int val = ait.getValue(); if(val % 2 != 0) { System.out.println(val); System.exit(0); } } } }
只是但願防止多個線程同時訪問方法內部的部分代碼而不是防止訪問整個方法。經過這種方式分離出來的代碼段被稱爲臨界區,它也使用synchronized關鍵字創建,synchronized被用來指定某個對象,此對象的鎖被用來對花括號內代碼進行同步控制:
synchronized(syncObject){}
這也被稱爲同步控制塊;在進入此段代碼前,必須獲得syncObject對象的鎖。若是其餘線程已經獲得這個鎖,那麼就得等到鎖被釋放之後,才能進入臨界區。
若是把一個非保護類型的類,在其餘類的保護和控制下,應用於多線程環境:
//: concurrency/CriticalSection.java // Synchronizing blocks instead of entire methods. Also // demonstrates protection of a non-thread-safe class // with a thread-safe one. import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.*; class Pair { // Not thread-safe private int x, y; public Pair(int x, int y) { this.x = x; this.y = y; } public Pair() { this(0, 0); } public int getX() { return x; } public int getY() { return y; } public void incrementX() { x++; } public void incrementY() { y++; } public String toString() { return "x: " + x + ", y: " + y; } public class PairValuesNotEqualException extends RuntimeException { public PairValuesNotEqualException() { super("Pair values not equal: " + Pair.this); } } // Arbitrary invariant -- both variables must be equal: public void checkState() { if(x != y) throw new PairValuesNotEqualException(); } } // Protect a Pair inside a thread-safe class: abstract class PairManager {//持有一個Pair對象,並控制一切對它的訪問 AtomicInteger checkCounter = new AtomicInteger(0); protected Pair p = new Pair(); private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>()); public synchronized Pair getPair() { // Make a copy to keep the original safe: return new Pair(p.getX(), p.getY()); } // Assume this is a time consuming operation protected void store(Pair p) { storage.add(p); try { TimeUnit.MILLISECONDS.sleep(50); } catch(InterruptedException ignore) {} } public abstract void increment(); } // Synchronize the entire method: class PairManager1 extends PairManager { public synchronized void increment() { p.incrementX(); p.incrementY(); store(getPair()); } } // Use a critical section: class PairManager2 extends PairManager { public void increment() { Pair temp; synchronized(this) { p.incrementX(); p.incrementY(); temp = getPair(); } store(temp); } } class PairManipulator implements Runnable { private PairManager pm; public PairManipulator(PairManager pm) { this.pm = pm; } public void run() { while(true) pm.increment(); } public String toString() { return "Pair: " + pm.getPair() + " checkCounter = " + pm.checkCounter.get(); } } class PairChecker implements Runnable { private PairManager pm; public PairChecker(PairManager pm) { this.pm = pm; } public void run() { while(true) { pm.checkCounter.incrementAndGet(); pm.getPair().checkState(); } } } public class CriticalSection { // Test the two different approaches: static void testApproaches(PairManager pman1, PairManager pman2) { ExecutorService exec = Executors.newCachedThreadPool(); PairManipulator pm1 = new PairManipulator(pman1), pm2 = new PairManipulator(pman2); PairChecker pcheck1 = new PairChecker(pman1), pcheck2 = new PairChecker(pman2); exec.execute(pm1); exec.execute(pm2); exec.execute(pcheck1); exec.execute(pcheck2); try { TimeUnit.MILLISECONDS.sleep(500); } catch(InterruptedException e) { System.out.println("Sleep interrupted"); } System.out.println("pm1: " + pm1 + "\npm2: " + pm2); System.exit(0); } public static void main(String[] args) { PairManager pman1 = new PairManager1(), pman2 = new PairManager2(); testApproaches(pman1, pman2); } } /* Output: (Sample) pm1: Pair: x: 15, y: 15 checkCounter = 272565 pm2: Pair: x: 16, y: 16 checkCounter = 3956974 *///:~
交給你的一個非線程安全的Pair類,你須要在一個線程環境中使用它。經過建立PairManager類就能夠實現,PairManager類持有一個Pair對象並控制對它的一切訪問。
PairManager類結構,它的一些功能在基類中實現,而且一個或多個抽象方法在派生類中定義,這種結構在設計模式中稱爲模板方法。
對於PairChecker的檢查頻率,PairManager1.increment()不容許有PairManager2.increment()那樣多。後者採用同步控制塊進行控制的典型緣由:使得其餘線程能更多的訪問。
使用顯示的Lock對象來建立臨界區:
//: concurrency/ExplicitCriticalSection.java // Using explicit Lock objects to create critical sections. import java.util.concurrent.locks.*; // Synchronize the entire method: class ExplicitPairManager1 extends PairManager { private Lock lock = new ReentrantLock(); public synchronized void increment() { lock.lock(); try { p.incrementX(); p.incrementY(); store(getPair()); } finally { lock.unlock(); } } } // Use a critical section: class ExplicitPairManager2 extends PairManager { private Lock lock = new ReentrantLock(); public void increment() { Pair temp; lock.lock(); try { p.incrementX(); p.incrementY(); temp = getPair(); } finally { lock.unlock(); } store(temp); } } public class ExplicitCriticalSection { public static void main(String[] args) throws Exception { PairManager pman1 = new ExplicitPairManager1(), pman2 = new ExplicitPairManager2(); CriticalSection.testApproaches(pman1, pman2); } } /* Output: (Sample) pm1: Pair: x: 15, y: 15 checkCounter = 174035 pm2: Pair: x: 16, y: 16 checkCounter = 2608588 *///:~
synchronized塊必須給定一個在其上進行同步的對象,而且最合理的方式是,使用其方法正在被調用的當前對象:synchronized(this)。
若是得到了synchronized塊上的鎖,那麼改對象其餘的synchronized方法和臨界區就不能被調用了,所以,若是在this上同步,臨界區的效果就會直接縮小到同步的範圍內。
兩個任務能夠同時進入同一個對象,只要這個對象上的方法是在不一樣的鎖上同步的便可:
//: concurrency/SyncObject.java // Synchronizing on another object. import javax.xml.crypto.Data; import java.text.SimpleDateFormat; import java.util.Date; import static net.mindview.util.Print.*; class DualSynch { private Object syncObject = new Object(); public synchronized void f() {//同步整個方法,在this上同步。 for(int i = 0; i < 5; i++) { print("f():"+new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss:SSS").format(new Date())); Thread.yield(); } } public void g() {//在syncObject對象上同步 synchronized(syncObject) { for(int i = 0; i < 5; i++) { print("g():"+new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss:SSS").format(new Date())); Thread.yield(); } } } } public class SyncObject { public static void main(String[] args) { final DualSynch ds = new DualSynch(); new Thread() { public void run() { ds.f(); } }.start(); ds.g(); } } /* Output: (Sample) g():2018/05/31-10:36:32:635 f():2018/05/31-10:36:32:635 f():2018/05/31-10:36:32:637 g():2018/05/31-10:36:32:637 f():2018/05/31-10:36:32:637 g():2018/05/31-10:36:32:637 f():2018/05/31-10:36:32:637 g():2018/05/31-10:36:32:637 f():2018/05/31-10:36:32:638 g():2018/05/31-10:36:32:638 *///:~
防止任務在共享資源上產生衝突的第二種方式是根除對變量的共享。線程本地存儲是一種自動化機制,能夠爲使用相同變量的每一個不一樣的線程都建立不一樣的存儲。
//: concurrency/ThreadLocalVariableHolder.java // Automatically giving each thread its own storage. import java.util.concurrent.*; import java.util.*; class Accessor implements Runnable { private final int id; public Accessor(int idn) { id = idn; } public void run() { while(!Thread.currentThread().isInterrupted()) { ThreadLocalVariableHolder.increment(); System.out.println(this); Thread.yield(); } } public String toString() { return "#" + id + ": " + ThreadLocalVariableHolder.get(); } } public class ThreadLocalVariableHolder { private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() { private Random rand = new Random(47); protected synchronized Integer initialValue() { return rand.nextInt(10000); } }; public static void increment() { value.set(value.get() + 1); } public static int get() { return value.get(); } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new Accessor(i)); TimeUnit.SECONDS.sleep(3); // Run for a while exec.shutdownNow(); // All Accessors will quit } } /* Output: (Sample) #0: 9259 #1: 556 #2: 6694 #3: 1862 #4: 962 #0: 9260 #1: 557 #2: 6695 #3: 1863 #4: 963 ... *///:~
ThreadLoca;對象一般看成靜態域存儲。
每一個單獨的線程都被分配了本身的存儲,由於它們每一個都須要跟蹤本身的計數值。
下面演示一個終止問題,並且仍是一個資源共享的示例
獲取天天進入公園的總人數。在公園的任何一個門口都有計數器能夠遞增。
//: concurrency/OrnamentalGarden.java import java.util.concurrent.*; import java.util.*; import static net.mindview.util.Print.*; class Count { private int count = 0; private Random rand = new Random(47); // Remove the synchronized keyword to see counting fail: public synchronized int increment() { int temp = count; if(rand.nextBoolean()) // Yield half the time Thread.yield(); return (count = ++temp); } public synchronized int value() { return count; } } class Entrance implements Runnable { private static Count count = new Count(); private static List<Entrance> entrances = new ArrayList<Entrance>(); private int number = 0; // Doesn't need synchronization to read: private final int id; private static volatile boolean canceled = false; // Atomic operation on a volatile field: public static void cancel() { canceled = true; } public Entrance(int id) { this.id = id; // Keep this task in a list. Also prevents // garbage collection of dead tasks: entrances.add(this); } public void run() { while(!canceled) { synchronized(this) { ++number; } print(this + " Total: " + count.increment()); try { TimeUnit.MILLISECONDS.sleep(100); } catch(InterruptedException e) { print("sleep interrupted"); } } print("Stopping " + this); } public synchronized int getValue() { return number; } public String toString() { return "Entrance " + id + ": " + getValue(); } public static int getTotalCount() { return count.value(); } public static int sumEntrances() { int sum = 0; for(Entrance entrance : entrances) sum += entrance.getValue(); return sum; } } public class OrnamentalGarden { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) exec.execute(new Entrance(i)); // Run for a while, then stop and collect the data: TimeUnit.SECONDS.sleep(3); Entrance.cancel(); exec.shutdown(); if(!exec.awaitTermination(250, TimeUnit.MILLISECONDS)) print("Some tasks were not terminated!"); print("Total: " + Entrance.getTotalCount()); print("Sum of Entrances: " + Entrance.sumEntrances()); } } /* Output: (Sample) Entrance 0: 1 Total: 1 Entrance 2: 1 Total: 3 Entrance 1: 1 Total: 2 Entrance 4: 1 Total: 5 Entrance 3: 1 Total: 4 Entrance 2: 2 Total: 6 Entrance 4: 2 Total: 7 Entrance 0: 2 Total: 8 ... Entrance 3: 29 Total: 143 Entrance 0: 29 Total: 144 Entrance 4: 29 Total: 145 Entrance 2: 30 Total: 147 Entrance 1: 30 Total: 146 Entrance 0: 30 Total: 149 Entrance 3: 30 Total: 148 Entrance 4: 30 Total: 150 Stopping Entrance 2: 30 Stopping Entrance 1: 30 Stopping Entrance 0: 30 Stopping Entrance 3: 30 Stopping Entrance 4: 30 Total: 150 Sum of Entrances: 150 *///:~
sleep()的一種狀況,它使任務從執行狀態變爲被阻塞狀態,而有時你必須終止被阻塞的任務。
一個線程能夠處於如下四種狀態之一:
一個任務進入阻塞狀態,可能有以下緣由:
查看的問題:但願可以終止處於阻塞狀態的任務。
在任務的run()方法中間打斷,更像是拋出的異常,所以在Java線程種的這種類型的異常中斷種用到了異常。
Thread類包含interrupt()方法,所以你能夠終止被阻塞的任務,這個方法將設置線程的中斷狀態。若是一個線程已經被阻塞,或者試圖執行一個阻塞操做,那麼設置這個線程的中斷狀態將拋出InterruptedException。
在Executor上調用shutdownNow(),那麼它將發送一個interrupt()調用給它啓動的全部線程。
使用Executor,那麼經過調用submin()而不是executor()來啓動任務,就能夠持有改任務的上下文。submit()將返回一個泛型Future<?>,其中有一個未修飾的參數,由於你永遠不會調用上面的get()——持有這種Future的關鍵在於你能夠在其上調用cancel(),並所以可使用它來中斷某個特定的任務。
Executor展現了基本的interrupt()用法:
//: concurrency/Interrupting.java // Interrupting a blocked thread. import java.util.concurrent.*; import java.io.*; import static net.mindview.util.Print.*; class SleepBlocked implements Runnable { public void run() { try { TimeUnit.SECONDS.sleep(10000); } catch (InterruptedException e) {//被中斷時拋出異常 print("InterruptedException"); } print("Exiting SleepBlocked.run()"); } } class IOBlocked implements Runnable { private InputStream in; public IOBlocked(InputStream is) { in = is; } public void run() { try { print("Waiting for read():"); in.read();//不可被中斷 } catch (IOException e) { if (Thread.currentThread().isInterrupted()) { print("Interrupted from blocked I/O"); } else { throw new RuntimeException(e); } } print("Exiting IOBlocked.run()"); } } class SynchronizedBlocked implements Runnable { public synchronized void f() {//不可被中斷 while (true) // Never releases lock Thread.yield(); } public SynchronizedBlocked() { new Thread() { public void run() { f(); // Lock acquired by this thread } }.start(); } public void run() { print("Trying to call f()"); f(); print("Exiting SynchronizedBlocked.run()"); } } public class Interrupting { private static ExecutorService exec = Executors.newCachedThreadPool(); static void test(Runnable r) throws InterruptedException { Future<?> f = exec.submit(r); TimeUnit.MILLISECONDS.sleep(100); print("Interrupting " + r.getClass().getName()); f.cancel(true); // Interrupts if running print("Interrupt sent to " + r.getClass().getName()); } public static void main(String[] args) throws Exception { //test(new SleepBlocked()); //test(new IOBlocked(System.in)); test(new SynchronizedBlocked()); TimeUnit.SECONDS.sleep(3); print("Aborting with System.exit(0)"); System.exit(0); // ... since last 2 interrupts failed } } /* Output: (95% match) Interrupting SleepBlocked InterruptedException Exiting SleepBlocked.run() Interrupt sent to SleepBlocked Waiting for read(): Interrupting IOBlocked Interrupt sent to IOBlocked Trying to call f() Interrupting SynchronizedBlocked Interrupt sent to SynchronizedBlocked Aborting with System.exit(0) *///:~
你可以中斷對sleep()的調用,可是不能中斷正在試圖獲取synchronized鎖或者試圖執行I/O操做的線程。
對於這類問題,有一個笨拙的解決方案,即關閉任務在其上發生阻塞的底層資源:
//: concurrency/CloseResource.java // Interrupting a blocked task by // closing the underlying resource. // {RunByHand} import java.net.*; import java.util.concurrent.*; import java.io.*; import static net.mindview.util.Print.*; public class CloseResource { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); ServerSocket server = new ServerSocket(8080); InputStream socketInput = new Socket("localhost", 8080).getInputStream(); exec.execute(new IOBlocked(socketInput));//線程中斷在關閉socket的時候 exec.execute(new IOBlocked(System.in));//線程沒有中斷 TimeUnit.MILLISECONDS.sleep(100); print("Shutting down all threads"); exec.shutdownNow(); TimeUnit.SECONDS.sleep(1); print("Closing " + socketInput.getClass().getName()); socketInput.close(); // Releases blocked thread TimeUnit.SECONDS.sleep(1); print("Closing " + System.in.getClass().getName()); System.in.close(); // Releases blocked thread } } /* Output: (85% match) Waiting for read(): Waiting for read(): Shutting down all threads Closing java.net.SocketInputStream Interrupted from blocked I/O Exiting IOBlocked.run() Closing java.io.BufferedInputStream Exiting IOBlocked.run() *///:~
可是各類nio類提供了更人性化的I/O中斷。被阻塞的nio通道回自動的響應中斷:
//: concurrency/NIOInterruption.java // Interrupting a blocked NIO channel. import java.net.*; import java.nio.*; import java.nio.channels.*; import java.util.concurrent.*; import java.io.*; import static net.mindview.util.Print.*; class NIOBlocked implements Runnable { private final SocketChannel sc; public NIOBlocked(SocketChannel sc) { this.sc = sc; } public void run() { try { print("Waiting for read() in " + this); sc.read(ByteBuffer.allocate(1)); } catch(ClosedByInterruptException e) { print("ClosedByInterruptException"); } catch(AsynchronousCloseException e) { print("AsynchronousCloseException"); } catch(IOException e) { throw new RuntimeException(e); } print("Exiting NIOBlocked.run() " + this); } } public class NIOInterruption { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); ServerSocket server = new ServerSocket(8080); InetSocketAddress isa = new InetSocketAddress("localhost", 8080); SocketChannel sc1 = SocketChannel.open(isa); SocketChannel sc2 = SocketChannel.open(isa); Future<?> f = exec.submit(new NIOBlocked(sc1)); exec.execute(new NIOBlocked(sc2)); exec.shutdown(); TimeUnit.SECONDS.sleep(1); // Produce an interrupt via cancel: f.cancel(true); TimeUnit.SECONDS.sleep(1); // Release the block by closing the channel: sc2.close(); } } /* Output: (Sample) Waiting for read() in NIOBlocked@7a84e4 Waiting for read() in NIOBlocked@15c7850 ClosedByInterruptException Exiting NIOBlocked.run() NIOBlocked@15c7850 AsynchronousCloseException Exiting NIOBlocked.run() NIOBlocked@7a84e4 *///:~
若是你嘗試着在一個對象上調用其synchronized方法,而這個對象的所已經被其餘任務得到,那麼調用任務將被掛起,直至這個鎖可得到。
示例說明了同一個互斥能夠如何能被同一個任務屢次得到:
import static net.mindview.util.Print.*; public class MultiLock { public synchronized void f1(int count) { if(count-- > 0) { print("f1() calling f2() with count " + count); f2(count); } } public synchronized void f2(int count) { if(count-- > 0) { print("f2() calling f1() with count " + count); f1(count); } } public static void main(String[] args) throws Exception { final MultiLock multiLock = new MultiLock(); new Thread() { public void run() { multiLock.f1(10); } }.start(); } } /* Output: f1() calling f2() with count 9 f2() calling f1() with count 8 f1() calling f2() with count 7 f2() calling f1() with count 6 f1() calling f2() with count 5 f2() calling f1() with count 4 f1() calling f2() with count 3 f2() calling f1() with count 2 f1() calling f2() with count 1 f2() calling f1() with count 0 *///:~
一個任務應該可以調用在同一個對象種的其餘synchronized方法,而這個任務已經持有鎖。
Java SE5併發類種添加了一個特性,即在ReentrantLock上阻塞的任務具有能夠被中斷的能力,這與在synchronized方法或臨界區上阻塞的任務不一樣:
//: concurrency/Interrupting2.java // Interrupting a task blocked with a ReentrantLock. import java.util.concurrent.*; import java.util.concurrent.locks.*; import static net.mindview.util.Print.*; class BlockedMutex { private Lock lock = new ReentrantLock(); public BlockedMutex() { // Acquire it right away, to demonstrate interruption // of a task blocked on a ReentrantLock: lock.lock();//獲取鎖 } public void f() { try { // This will never be available to a second task lock.lockInterruptibly(); // Special call print("lock acquired in f()"); } catch(InterruptedException e) { print("Interrupted from lock acquisition in f()"); } } } class Blocked2 implements Runnable { BlockedMutex blocked = new BlockedMutex(); public void run() { print("Waiting for f() in BlockedMutex"); blocked.f(); print("Broken out of blocked call"); } } public class Interrupting2 { public static void main(String[] args) throws Exception { Thread t = new Thread(new Blocked2()); t.start(); TimeUnit.SECONDS.sleep(1); System.out.println("Issuing t.interrupt()"); t.interrupt(); } } /* Output: Waiting for f() in BlockedMutex Issuing t.interrupt() Interrupted from lock acquisition in f() Broken out of blocked call *///:~