我的技術博客:www.zhenganwen.topjava
熟悉Java的人都能很容易地寫出以下代碼:linux
public static class MyThread extends Thread {
@Override
public void run() {
System.out.println("MyThread is running...");
}
}
public static void main(String[] args) {
Thread t = new MyThread();
t.start();
}
複製代碼
這是一個面試常問的基礎問題,你應該確定的回答線程只有五種狀態,分別是:新建狀態、就緒狀態、執行狀態、阻塞狀態、終止狀態。web
因爲Scheduler(調度器)的時間片分配算法,每一個Running的線程會執行多長時間是未知的,所以線程可以在Runnable和Running之間來回轉換。阻塞狀態的線程必須先進入就緒狀態才能進入執行狀態。面試
Running線程在主動調用Thread.sleep()
、obj.wait()
、thread.join()
時會進入TIMED-WAITING
或WAITING
狀態並主動讓出CPU執行權。若是是TIMED-WAITING
,那麼在通過必定的時間以後會主動返回並進入Runnable狀態等待時間片的分配。算法
thread.join()
的底層就是當前線程不斷輪詢thread
是否存活,若是存活就不斷地wait(0)
。shell
Running線程在執行過程當中若是遇到了臨界區(synchronized
修飾的方法或代碼塊)而且須要獲取的鎖正在被其餘線程佔用,那麼他會主動將本身掛起並進入BLOCKED
狀態。數據庫
若是持有鎖的線程退出臨界區,那麼在該鎖上等待的線程都會被喚醒並進入就緒狀態,但只有搶到鎖的線程會進入執行狀態,其餘沒有搶到鎖的線程仍將進入阻塞狀態。編程
若是某個線程調用了obj
的notify/notifyAll
方法,那麼在該線程退出臨界區時(調用wait/notify
必須先經過synchronized
獲取對象的鎖),被喚醒的等待在obj.wait
上的線程纔會從阻塞狀態進入就緒狀態獲取obj
的monitor
,而且只有搶到monitor
的線程纔會從obj.wait
返回,而沒有搶到的線程仍舊會阻塞在obj.wait
上設計模式
在執行狀態下的線程執行完run
方法或阻塞狀態下的線程被interrupt
時會進入終止狀態,隨後會被銷燬。安全
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {}
}
}
private native void start0();
複製代碼
start
方法主要作了三件事:
start0
,這是一個native
方法,在往期文章《Java線程是如何實現的?》一文中談到線程的調度將交給LWP,這裏的啓動新建線程一樣屬於此範疇。所以咱們可以猜到此JNI(Java Native Interface)調用將會新建一個線程(LWP)並執行該線程對象的run
方法started
狀態置爲true
表示已被啓動過。正如初學線程時老師所講的,線程的start
只能被調用一次,重複調用會報錯就是經過這個變量實現的。咱們將經過Thread
來模擬這樣一個場景:銀行多窗口叫號。從而思考已經有Thread
了爲何還要引入Runnable
首先咱們須要一個窗口線程模擬叫號(窗口叫號,相應號碼的顧客到對應窗口辦理業務)的過程:
public class TicketWindow extends Thread {
public static final Random RANDOM = new Random(System.currentTimeMillis());
private static final int MAX = 20;
private int counter;
private String windowName;
public TicketWindow(String windowName) {
super(windowName);
counter = 0;
this.windowName = windowName;
}
@Override
public void run() {
System.out.println(windowName + " start working...");
while (counter < MAX){
System.out.println(windowName + ": It's the turn to number " + counter++);
//simulate handle the business
try {
Thread.sleep(RANDOM.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
複製代碼
而後編寫一個叫號客戶端模擬四個窗口同時叫號:
public class WindowThreadClient {
public static void main(String[] args) {
Stream.of("Window-1","Window-2","Window-3","Window-4").forEach(
windowName -> new TicketWindow(windowName).start()
);
}
}
複製代碼
你會發現同一個號碼被叫了四次,顯然這不是咱們想要的。正常狀況下應該是四個窗口共享一個叫號系統,窗口只負責辦理業務而叫號則應該交給叫號系統,這是典型的OOP中的單一職責原則。
咱們將線程和要執行的任務耦合在了一塊兒,所以出現瞭如上所述的尷尬狀況。線程的職責就是執行任務,它有它本身的運行時狀態,咱們不該該將要執行的任務的相關狀態(如本例中的counter
、windowName
)將線程耦合在一塊兒,而應該將業務邏輯單獨抽取出來做爲一個邏輯執行單元,當須要執行時提交給線程便可。因而就有了Runnable
接口:
public interface Runnable {
public abstract void run();
}
複製代碼
所以咱們能夠將以前的多窗口叫號改造一下:
public class TicketWindowRunnable implements Runnable {
public static final Random RANDOM = new Random(System.currentTimeMillis());
private static final int MAX = 20;
private int counter = 0;
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " start working...");
while (counter < MAX){
System.out.println(Thread.currentThread().getName()+ ": It's the turn to number " + counter++);
//simulate handle the business
try {
Thread.sleep(RANDOM.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
複製代碼
測試類:
public class WindowThreadClient {
public static void main(String[] args) {
TicketWindowRunnable ticketWindow = new TicketWindowRunnable();
Stream.of("Window-1", "Window-2", "Window-3", "Window-4").forEach(
windowName -> new Thread(ticketWindow, windowName).start()
);
}
}
複製代碼
如此你會發現沒有重複的叫號了。可是這個程序並非線程安全的,由於有多個線程同時更改windowRunnable
中的counter
變量,因爲本節主要闡述Runnable
的做用,所以暫時不對此展開討論。
將Thread
中的run
經過接口的方式暴露出來還有一個好處就是對策略模式和函數式編程友好。
首先簡單介紹一下策略模式,假設咱們如今須要計算一個員工的我的所得稅,因而咱們寫了以下工具類,傳入基本工資和獎金便可調用calculate
得出應納稅額:
public class TaxCalculator {
private double salary;
private double bonus;
public TaxCalculator(double base, double bonus) {
this.salary = base;
this.bonus = bonus;
}
public double calculate() {
return salary * 0.03 + bonus * 0.1;
}
}
複製代碼
這樣寫有什麼問題?咱們將應納稅額的計算寫死了:salary * 0.03 + bonus * 0.1
,而稅率並不是一層不變的,客戶提出需求變更也是常有的事!難道每次需求變動咱們都要手動更改這部分代碼嗎?
這時策略模式來幫忙:當咱們的需求的輸入是不變的,但輸出須要根據不一樣的策略作出相應的調整時,咱們能夠將這部分的邏輯抽取成一個接口:
public interface TaxCalculateStrategy {
public double calculate(double salary, double bonus);
}
複製代碼
具體策略實現:
public class SimpleTaxCalculateStrategy implements TaxCalculateStrategy {
@Override
public double calculate(double salary, double bonus) {
return salary * 0.03 + bonus * 0.1;
}
}
複製代碼
而業務代碼僅調用接口:
public class TaxCalculator {
private double salary;
private double bonus;
private TaxCalculateStrategy taxCalculateStrategy;
public TaxCalculator(double base, double bonus, TaxCalculateStrategy taxCalculateStrategy) {
this.salary = base;
this.bonus = bonus;
this.taxCalculateStrategy = taxCalculateStrategy;
}
public double calculate() {
return taxCalculateStrategy.calculate(salary, bonus);
}
}
複製代碼
將Thread
中的邏輯執行單元run
抽取成一個接口Runnable
有着殊途同歸之妙。由於實際業務中,須要提交給線程執行的任務咱們是沒法預料的,抽取成一個接口以後就給咱們的應用程序帶來了很大的靈活性。
另外在JDK1.8中引入了函數式編程和lambda表達式,使用策略模式對這個特性也是很友好的。仍是藉助上面這個例子,若是計算規則變成了(salary + bonus) * 1.5
,可能咱們須要新增一個策略類:
public class AnotherTaxCalculatorStrategy implements TaxCalculateStrategy {
@Override
public double calculate(double salary, double bonus) {
return (salary + bonus) * 1.5;
}
}
複製代碼
在JDK增長內部類語法糖以後,可使用匿名內部類省去建立新類的開銷:
public class TaxCalculateTest {
public static void main(String[] args) {
TaxCalculator taxCalaculator = new TaxCalculator(5000,1500, new TaxCalculateStrategy(){
@Override
public double calculate(double salary, double bonus) {
return (salary + bonus) * 1.5;
}
});
}
}
複製代碼
可是在JDK新增函數式編程後,能夠更加簡潔明瞭:
public class TaxCalculateTest {
public static void main(String[] args) {
TaxCalculator taxCalaculator = new TaxCalculator(5000, 1500, (salary, bonus) -> (salary + bonus) * 1.5);
}
}
複製代碼
這對只有一個抽象方法run
的Runnable
接口來講是一樣適用的。
查看Thread
的構造方法,追溯到init
方法(略有刪減):
Thread parent = currentThread();
if (g == null) {
if (g == null) {
g = parent.getThreadGroup();
}
}
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
this.target = target;
setPriority(priority);
this.stackSize = stackSize;
tid = nextThreadID();
複製代碼
g
是當前對象的ThreadGroup
,2~8
就是在設置當前對象所屬的線程組,若是在new Thread
時沒有顯式指定,那麼默認將父線程(當前執行new Thread
的線程)線程組設置爲本身的線程組。
9~10
行,從父線程中繼承兩個狀態:是不是守護線程、優先級是多少。固然了,在new Thread
以後能夠經過thread.setDeamon
或thread.setPriority
進行自定義
12
行,若是是經過new Thread(Runnable target)
方式建立的線程,那麼取得傳入的Runnable target
,線程啓動時調用的run
中會執行不空的target
的run
方法。理論上來說建立線程有三種方式:
Runnable
接口MyRunnable
,經過new Thread(myRunnable)
執行MyRunnable
中的run
Thread
並重寫run
,經過new MyThread()
執行重寫的run
Thread
並重寫run
,仍可向構造方法傳入Runnable
實現類實例:new MyThread(myRunnable)
,可是隻會執行MyThread
中重寫的run
,不會受myRunnable
的任何影響。這種建立線程的方式有很大的歧義,除了面試官可能會拿來爲難你一下,不建議這樣使用設置線程優先級,一共有10個優先級別對應取值[0,9]
,取值越大優先級越大。但這一參數具備平臺依賴性,這意味着可能在有的操做系統上可能有效,而在有的操做系統上可能無效,由於Java線程是直接映射到內核線程的,所以具體的調度仍要看操做系統。
設置棧大小。這個大小指的是棧的內存大小而非棧所能容納的最大棧幀數目,每個方法的調用和返回對應一個棧幀從線程的虛擬機棧中入棧到出棧的過程,在下一節中會介紹這個參數。虛擬機棧知識詳見《深刻理解Java虛擬機(第二版)》第二章。
設置線程的ID
,是線程的惟一標識,好比偏向鎖偏向線程時會在對象頭的Mark Word
中存入該線程的ID(偏向鎖可見《併發編程的藝術》和《深刻理解Java虛擬機》第五章)。
經過nextThreadID
會發現是一個static synchronized
方法,原子地取得線程序列號threadSeqNumber
自增後的值:
public static void main(String[] args) {
new Thread(() -> {
System.out.println(Thread.currentThread().getId()); //11
}).start();
}
複製代碼
爲何main
中建立的第一個線程的ID是11(意味着他是JVM啓動後建立的第11個線程)呢?這由於在JVM在執行main
時會啓動JVM進程的第一個線程(叫作main
線程),而且會啓動一些守護線程,好比GC線程。
這裏要注意的是每一個線程都有一個私有的虛擬機棧。全部線程的棧都存放在JVM運行時數據區域的虛擬機棧區域中。
Thread
提供了一個能夠設置stackSize
的重載構造方法:
public Thread(ThreadGroup group, Runnable target, String name, long stackSize) 複製代碼
官方文檔對該參數的描述以下:
The stack size is the approximate number of bytes of address space that the virtual machine is to allocate for this thread's stack. The effect of the stackSize parameter, if any, is highly platform dependent.
你能經過指定stackSize
參數近似地指定虛擬機棧的內存大小(注意:是內存大小即字節數而不是棧中所能容納的最大棧幀數目,並且這個大小指的是該線程的棧大小而並不是是整個虛擬機棧區的大小)。且該參數具備高度的平臺依賴性,也就是說在各個操做系統上,一樣的參數表現出來的效果有所不一樣。
On some platforms, specifying a higher value for the
stackSize
parameter may allow a thread to achieve greater recursion depth before throwing aStackOverflowError
. Similarly, specifying a lower value may allow a greater number of threads to exist concurrently without throwing anOutOfMemoryError
(or other internal error). The details of the relationship between the value of thestackSize
parameter and the maximum recursion depth and concurrency level are platform-dependent. On some platforms, the value of the stackSize parameter may have no effect whatsoever.
在一些平臺上,爲stackSize
指定一個較大的值,可以容許線程在拋出棧溢出異常前達到較大的遞歸深度(由於方法棧幀的大小在編譯期可知,以局部變量表爲例,基本類型變量中只有long
和double
佔8個字節,其他的做4個字節處理,引用類型根據虛擬機是32位仍是64位而佔4個字節或8個字節。如此的話棧越大,棧所能容納的最大棧幀數目也即遞歸深度也就越大)。相似的,指定一個較小的stackSize
可以讓更多的線程共存而避免OOM異常(有的讀者可能會異或,棧較小怎麼還不容易拋出OOM異常了呢?不是應該棧較小,內存更不夠用,更容易OOM嗎?其實單線程環境下,只可能發生棧溢出而不會發生OOM,由於每一個方法對應的棧幀大小在編譯器就可知了,線程啓動時會從虛擬機棧區劃分一塊內存做爲棧的大小,所以不管是壓入的棧幀太多仍是將要壓入的棧幀太大都只會致使棧沒法繼續容納棧幀而拋出棧溢出。那麼何時回拋出OOM呢。對於虛擬機棧區來講,若是沒有足夠的內存劃分出來做爲新建線程的棧內存時,就會拋出OOM了。這就不難理解了,有限的進程內存除去堆內存、方法區、JVM自身所需內存以後剩下的虛擬機棧是有限的,分配給每一個棧的越少,可以並存的線程天然就越多了)。最後,在一些平臺上,不管將stackSize
設置爲多大均可能不會起到任何做用。
The virtual machine is free to treat the
stackSize
parameter as a suggestion. If the specified value is unreasonably low for the platform, the virtual machine may instead use some platform-specific minimum value; if the specified value is unreasonably high, the virtual machine may instead use some platform-specific maximum. Likewise, the virtual machine is free to round the specified value up or down as it sees fit (or to ignore it completely).
虛擬機會將stackSize
視爲一種建議,在棧大小的設置上仍有必定的話語權。若是給定的值過小,虛擬機會將棧大小設置爲平臺對應的最小棧大小;相應的若是給定的值太大,則會設置成平臺對應的最大棧大小。又或者,虛擬機可以按照給定的值向上或向下取捨以設置一個合適的棧大小(甚至虛擬機會忽略它)。
Due to the platform-dependent nature of the behavior of this constructor, extreme care should be exercised in its use. The thread stack size necessary to perform a given computation will likely vary from one JRE implementation to another. In light of this variation, careful tuning of the stack size parameter may be required, and the tuning may need to be repeated for each JRE implementation on which an application is to run.
因爲此構造函數的平臺依賴特性,在使用時須要格外當心。線程棧的實際大小的計算規則會由於JVM的不一樣實現而有不一樣的表現。鑑於這種變化,可能須要仔細調整堆棧大小參數,而且對於應用程序使用的不一樣的JVM實現須要有不一樣的調整。
Implementation note: Java platform implementers are encouraged to document their implementation's behavior with respect to the
stackSize
parameter.
public class StackSizeTest {
public static int counter = 0;
public static void main(String[] args) {
new Thread(() -> {
try {
count();
} catch (StackOverflowError e) {
System.out.println(counter); // result -> 35473
}
}).start();
}
public static void count() {
counter++;
count();
}
}
複製代碼
顯式指定stackSize
以後顯著地影響了線程棧的大小,調用深度由原來的35473
變成了296
:
public class StackSizeTest {
public static int counter = 0;
public static void main(String[] args) {
new Thread(null,() -> {
try {
count();
} catch (StackOverflowError e) {
System.out.println(counter);
}
},"test-stack-size",10 * 1024).start(); //stackSize -> 10KB result -> 296
}
public static void count() {
counter++;
count();
}
}
複製代碼
要想改變棧幀的大小,經過增長局部變量便可實現。如下經過增長多個long
變量(一個佔8個字節),較上一次的測試,方法調用深度又有明顯的減少:
public class StackSizeTest {
public static int counter = 0;
public static void main(String[] args) {
new Thread(null,() -> {
try {
count();
} catch (StackOverflowError e) {
System.out.println(counter);
}
},"test-stack-size",10 * 1024).start(); //stackSize -> 10KB result -> 65
}
public static void count() {
long a,b,c,d,e,f,g,h,j,k,l,m,n,o,p,q;
counter++;
count();
}
}
複製代碼
經過thread.setDaemon(true)
可將新建後的線程設置爲守護線程,必須在線程啓動前(thread.start
)設置纔有效。
集羣架構中,一般須要心跳檢測機制。若是應用程序開一條非守護線程來作心跳檢測,那麼可能會出現應用主程序都終止運行了但心跳檢測線程仍在工做的狀況,這時JVM會由於仍有非守護線程在工做而繼續佔用系統的CPU、內存資源,這顯然是不該該的。
下列代碼簡單模仿了這一場景:
public class HeartCheck {
public static void main(String[] args) {
// worker thread
new Thread(()->{
// start the heart-check thread first
Thread heartCheck = new Thread(()->{
// do interval-automatic heart check and notify the parent thread when heart check has error
while (true) {
System.out.println("do heart check");
try {
Thread.sleep(100); //interval
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
heartCheck.setDaemon(true);
heartCheck.start();
// simulate work
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
複製代碼
直接上源碼:
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
複製代碼
若是調用某個線程thread
的join()
,會分發到join(0)
,執行上述的第10~12
行,只要當前線程獲取到了CPU執行權就會輪詢thread
的執行狀態(isAlive
是個native
方法,但咱們可以猜到它的做用就是檢測thread
是否存活,即不是Terminated
狀態),一旦發現thread
仍然存活就會釋放CPU執行權(經過wait(0)
的方式),等下一輪的輪詢,直到thread
進入終止狀態,那麼當前線程將從thread.join()
返回。
必定要區分清楚,調用thread.join()
阻塞的是當前線程,不會對thread
線程形成任何影響。
join
提供了一個重載的限時等待方法(這是一個經典的超時等待模型:只有當條件知足或者已超過等待時限時才返回),這也是爲了不當前線程陷入永久等待的困境,可以在等待一段時間發現目標線程仍未執行完後自動返回。
join
有一個比較好玩的地方就是若是線程調用它本身的join
方法,那麼該線程將無限wait
下去,由於:Thread.currentThread().join()
會等待當前線程執行完,而當前線程正在調用當前線程的join
即等當前線程執行完……就讓他自個兒去慢慢玩兒吧~
好比電商網站中的用戶行爲日誌,可能須要通過聚合、篩選、分析、歸類等步驟加工,最後再存入數據庫。而且這些步驟的執行必須是循序漸進的層層加工,那麼一個步驟就必須等到上一個步驟結束後拿到結果在開始,這時就能夠利用join
作到這點。
下列代碼簡單模仿了此場景:
public class StepByStep {
public static void main(String[] args) throws InterruptedException {
Thread step1 = new Thread(() -> {
System.out.println("start capture data...");
//simulate capture data
try {
Thread.sleep(1000);
System.out.println("capture done.");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
step1.start();
Thread step2 = new Thread(() -> {
try {
step1.join();
System.out.println("start screen out the data...");
Thread.sleep(1000);
System.out.println("screen out done.");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
step2.start();
Thread step3 = new Thread(() -> {
try {
step2.join();
System.out.println("start analyze the data...");
Thread.sleep(1000);
System.out.println("analyze done.");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
step3.start();
Thread step4 = new Thread(() -> {
try {
step3.join();
System.out.println("start classify the data");
Thread.sleep(1000);
System.out.println("classify done.");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
step4.start();
step4.join();
System.out.println("write into database");
}
}
複製代碼
值得注意的是,若是調用未啓動線程的join
,將會當即返回:
public class StepByStep {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
});
t.join();
}
}
複製代碼
有時任務量太大且任務是可分的(子任務之間沒有上例的依賴關係),那麼咱們不妨將任務拆分紅互不相干的子任務(這一步叫作Fork
),分別爲各個子任務分配一個單獨線程從而實現子任務並行執行,提升執行效率,最後將個子任務的結果整合起來作最後的加工(主線程就可使用join
來等待各個子任務線程的執行結果,從而最後作一個彙總)。JDK8提供的Stream
和ForkJoin
框架都有此模型的身影。
咱們能夠經過join
的重載方法提供的限時等待,在目標任務執行時間過長時自動返回,從而採起其餘彌補策略,而不至於總是傻傻地等着。
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
複製代碼
這裏有一個細節,interrupt
首先會設置線程的中斷標誌位,而後再打斷它。
查看官方文檔:
If this thread is blocked in an invocation of the
wait()
,wait(long)
, orwait(long, int)
methods of theObject
class, or of thejoin()
,join(long)
,join(long, int)
,sleep(long)
, orsleep(long, int)
, methods of this class, then its interrupt status will be cleared and it will receive anInterruptedException
.If none of the previous conditions hold then this thread's interrupt status will be set.
Interrupting a thread that is not alive need not have any effect.
由此咱們能夠提取三點信息:
Timed-Waiting/Waiting
中的線程被打斷後首先會清除它的中斷標誌位,而後再拋出InterruptedException
。所以被中斷的線程進入Runnable/Running
)下的線程不會被打斷,可是其中斷標誌位會被設置,即調用它的isInterrupted
將返回true
interrupt
不會產生任何效果。Tests whether this thread has been interrupted. The interrupted status of the thread is unaffected by this method.
A thread interruption ignored because a thread was not alive at the time of the interrupt will be reflected by this method returning false.
測試線程是否被中斷過,該方法的調用不會改變線程的中斷標誌位。對一個終止狀態下的線程調用過interrupt
並不會致使該方法返回true
。
因而咱們可使用isInterrupted
來測試一下上面提取的3個結論:
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
t1.interrupt();
System.out.println(t1.isInterrupted()); //true
Thread.sleep(1000);
System.out.println(t1.isInterrupted()); //false
}
複製代碼
上述代碼在t1.interrupt
後立刻檢查t1
的中斷標誌位,因爲interrupt
是先設置中斷標誌位,再中斷,所以17
行的輸出檢測到了中斷標誌位返回true
;接着18~19
行先等t1
在拋出InterruptedException
時清除標誌位,再檢測其中斷標誌位發現返回false
證實告終論1:拋出InterruptedException
以前會先清除其中斷標誌位。
static volatile boolean flag = true;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
while (flag) {
}
});
t1.start();
t1.interrupt();
System.out.println(t1.isInterrupted()); //true
flag = false;
t1.join();
System.out.println(t1.isInterrupted()); //false
}
複製代碼
interrupted
不會中斷正在運行的線程,但會設置其中斷標誌位,所以第10
行返回true
。由第13
行的輸出咱們還能夠的處一個新的結論:對終止狀態的線程調用isInterrupted
始終會返回false
。
這是一個靜態方法,用來檢測當前線程是否被中斷過,但與isInterrupted
不一樣,它的調用會致使當前線程的中斷標誌位被清除且isInterrupted
是實例方法。也就是說若是連續兩次調用Thread.interrupted
,第二次必定會返回false
。
static volatile boolean flag = true;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
while (flag) {
}
System.out.println(Thread.currentThread().isInterrupted()); //true
System.out.println(Thread.interrupted()); //true
System.out.println(Thread.interrupted()); //false
});
t1.start();
t1.interrupt();
flag = false;
}
複製代碼
Thread
有一個棄用的方法stop
,棄用的緣由是這個方法是相似於linux
中kill -9
的方式強制當即終止線程,不給線程任何喘息的機會,這意味着執行了一半的程序忽然沒後文了,若是線程打開了I/O、數據庫鏈接等資源時將沒法及時釋放他們。
守護線程在其父線程終結時也會隨之終結,所以咱們能夠經過將線程設置爲守護線程,經過控制其父線程的終結時間來間接終結他:
public class ThreadService {
private Thread executeThread;
private volatile boolean finished;
public void execute(Runnable task) {
executeThread =new Thread(() -> {
Thread t = new Thread(() -> {
task.run();
});
t.setDaemon(true);
t.start();
try {
t.join();
finished = true;
} catch (InterruptedException e) {
System.out.println("task execution was interrupted");
}
});
executeThread.start();
}
public void shutdown(long millis) {
long base = System.currentTimeMillis();
long now = 0;
while (!finished) {
now = System.currentTimeMillis() - base;
if (now >= millis) {
System.out.println("task execution time out, kill it now");
executeThread.interrupt();
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
System.out.println("was interrupted when shutdown");
}
}
finished = true;
}
}
複製代碼
在上述代碼中,能夠經過給shutdown
傳入一個task
執行時限,要求它在millis
時間內執行完,若是超出這個時間則視爲任務執行異常,經過終止其父線程來終止它。若是它執行正常,在millis
時間內返回了,那也會致使父線程的結束,shutdown
也能經過輪詢finished
狀態來感知任務執行結束。
public class ThreadCloseGraceful implements Runnable{
private volatile boolean stop = false;
@Override
public void run() {
while (true) {
if (stop) {
break;
}
// to do here
}
}
public void shutdown() {
stop = true;
}
}
複製代碼
這種方式的要點是,共享狀態變量必須聲明爲volatile
,這樣執行線程才能及時感知到shutdown
命令。
經過輪詢線程的中斷標誌位來感知外界的中斷命令。
public class ThreadCloseGraceful extends Thread{
@Override
public void run() {
while (true) {
if (Thread.interrupted()) {
break;
}
// to do here
}
}
public void shutdown() {
this.interrupt();
}
}
複製代碼
resume/suspend
被棄用的主要緣由是由於suspend
將線程掛起時並不會釋放其所持有的共享資源,若是一個線程持有一個甚至多個鎖後執行suspend
,那麼將會致使全部等待該鎖或這些鎖釋放的線程陷入長久的阻塞狀態。若是碰巧將要resume
這個被掛起線程的線程事先也有獲取這些鎖的需求,那麼resume
線程也會被阻塞,這可能致使suspend
線程將無人喚醒,這些線程都將陷入永久阻塞。
所以在併發場景下,對於臨界區來講,suspend
和resume
是線程對立的,不管是誰先進入臨界區,都將致使這兩個線程甚至是多個線程陷入死鎖。
synchronized
可以保證被同步的代碼在多線程環境下的執行是串行化的。
this
對象的monitor
(也就是咱們一般所說的鎖,術語是管程),一個monitor
同一個時刻只能被一個線程持有,獲取失敗將陷入阻塞狀態(BLOCKED),直到該鎖被釋放(持有鎖的線程退出該方法/臨界區)後該線程將加入到新一輪的鎖爭取之中Class
對象的monitor
,鎖獲取-釋放邏輯和實例方法的相同。synchronized
括號後顯式指定一個同步對象,鎖獲取-釋放邏輯依然相同獲取鎖失敗時陷入阻塞、鎖釋放時相應阻塞在該鎖上的線程會被喚醒,這會引發線程由用戶態到內核態的切換,時間開銷較大,甚至大於臨界區代碼的實際執行開銷。所以原則上要減小synchronized
的使用,可是隨着JDK的升級,自旋鎖、適應性自旋、鎖消除、鎖粗化、偏向鎖、輕量級鎖等優化的引入(詳見《深刻理解Java虛擬機(第二版)》高併發章節),synchronized
的開銷實際上也沒那麼大了。
可重入,若是當前線程已持有某個對象的monitor
,在再次進入須要該monitor
的臨界區時,可直接進入而無需通過鎖獲取這一步。
一個線程可同時持有多個monitor
。注意,這一操做容易致使死鎖的發生,如下代碼就模仿了這一情景:
public class DeadLock {
public static Object lock1 = new Object();
public static Object lock2 = new Object();
public static void main(String[] args) {
IntStream.rangeClosed(0,19).forEach(i->{
if (i % 2 == 0) {
new Thread(() -> m1()).start();
} else {
new Thread(() -> m2()).start();
}
});
}
public static void m1() {
synchronized (lock1) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock2) {
System.out.println(Thread.currentThread().getName());
}
}
}
public static void m2() {
synchronized (lock2) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock1) {
System.out.println(Thread.currentThread().getName());
}
}
}
}
複製代碼
上述代碼有很大的概率陷入死鎖,可是並不會有任何提示信息。咱們能夠經過jps/jstack
查看一下線程狀態:
C:\Users\zaw>jps
2864
5664 Jps
4072 Launcher
2172 DeadLock
C:\Users\zaw>jstack 2172
"Thread-1" #12 prio=5 os_prio=0 tid=0x0000000018c71800 nid=0x8f0 waiting for monitor entry [0x00000000196cf000]
java.lang.Thread.State: BLOCKED (on object monitor)
at deadlock.DeadLock.m2(DeadLock.java:47)
- waiting to lock <0x00000000d6081098> (a java.lang.Object)
- locked <0x00000000d60810a8> (a java.lang.Object)
at deadlock.DeadLock.lambda$null$1(DeadLock.java:21)
at deadlock.DeadLock$$Lambda$3/1989780873.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
"Thread-0" #11 prio=5 os_prio=0 tid=0x0000000018c70800 nid=0x944 waiting for monitor entry [0x00000000195cf000]
java.lang.Thread.State: BLOCKED (on object monitor)
at deadlock.DeadLock.m1(DeadLock.java:34)
- waiting to lock <0x00000000d60810a8> (a java.lang.Object)
- locked <0x00000000d6081098> (a java.lang.Object)
at deadlock.DeadLock.lambda$null$0(DeadLock.java:19)
at deadlock.DeadLock$$Lambda$2/999966131.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
複製代碼
筆者省去了其餘線程的狀態,分析清楚這一對線程死鎖的緣由以後,剩下的18個線程是相似的。首先第9
和18
兩行代表兩個線程代表線程由於獲取不到對象的鎖而陷入BLOCKED
狀態。11~12
行詳細的指出Thread-1
正在等待獲取內存地址爲0x00000000d6081098
的一個對象的鎖,且已持有了內存地址爲0x00000000d60810a8
的對象的鎖。20~21
行一樣的指出Thread-0
等在0x00000000d60810a8
對象上,而已獲取了0x00000000d6081098
對象的鎖。可見他們都在無腦阻塞地等待對方釋放鎖,因而就陷入了死鎖。
在jstack
羅列JVM各個線程狀態以後還爲咱們分析了死鎖:
Found one Java-level deadlock:
=============================
"Thread-19":
waiting to lock monitor 0x0000000018c5a398 (object 0x00000000d60810a8, a java.lang.Object),
which is held by "Thread-1"
"Thread-1":
waiting to lock monitor 0x0000000018c58d98 (object 0x00000000d6081098, a java.lang.Object),
which is held by "Thread-0"
"Thread-0":
waiting to lock monitor 0x0000000018c5a398 (object 0x00000000d60810a8, a java.lang.Object),
which is held by "Thread-1"
複製代碼
咱們還可使用JDK內置的JVM性能監控工具JConsole更直觀地分析線程狀態:
C:\Users\zaw>jps
2864
6148 Jps
4072 Launcher
2172 DeadLock
C:\Users\zaw>jconsole 2172
複製代碼
打開的工具窗口會詢問一下是否信任不安全的鏈接,點擊是方可進入。進入後經過線程面板可以查看各線程狀態,點擊死鎖分析,它會爲咱們分析出當前JVM進程中哪些線程陷入了死鎖以及緣由是什麼:
要想了解爲何線程在執行臨界區(包括同步方法和同步代碼塊)時會有鎖獲取-釋放這一機制,那咱們就要知道這個關鍵字在編譯後生成了怎樣的JVM指令。
首先咱們分別編寫一個同步方法和同步塊,分別測試synchronized
在字節碼層面會產生什麼樣的效果:
public class SynchronizedTest{
public synchronized void m1(){
System.out.println("sync method");
}
Object lock = new Object();
public void m2(){
synchronized(lock){
System.out.println("sync block");
}
}
}
複製代碼
而後使用javac
編譯,因爲編譯後的字節碼文件是二進制字節流,咱們查看不方便(JVM查看方便),所以還須要使用javap
將其轉換成咱們能看懂的友好內容(字節碼格式詳見《深刻理解Java虛擬機(第二版)》中的Class文件格式),爲了照顧對這部分不熟悉的讀者,筆者作了刪減,僅關注synchronized
產生的效果:
C:\Users\zaw>cd Desktop
C:\Users\zaw\Desktop>javac SynchronizedTest.java
C:\Users\zaw\Desktop>javap -verbose SynchronizedTest.class
public class SynchronizedTest
minor version: 0
major version: 52
flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
# 這裏省去了常量池部分
{
java.lang.Object lock;
descriptor: Ljava/lang/Object;
flags:
public synchronized void m1();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=2, locals=1, args_size=1
0: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
3: ldc #5 // String sync method
5: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
8: return
LineNumberTable:
line 4: 0
line 5: 8
public void m2();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=3, args_size=1
0: aload_0
1: getfield #3 // Field lock:Ljava/lang/Object;
4: dup
5: astore_1
6: monitorenter
7: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
10: ldc #7 // String sync block
12: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
15: aload_1
16: monitorexit
17: goto 25
20: astore_2
21: aload_1
22: monitorexit
23: aload_2
24: athrow
25: return
Exception table:
from to target type
7 17 20 any
20 23 20 any
}
SourceFile: "SynchronizedTest.java"
複製代碼
儘管上述代碼看起來很長,可是咱們只須要關注兩個點:
20
行和33
行,會發現同步方法m1
比非同步方法m2
的flags
多了一個ACC_SYNCHRONIZED
,所以線程在進入同步方法時,若檢測到該方法的flags
包含ACC_SYNCHRONIZED
,那麼該線程將嘗試獲取this
或該方法所在類的Class
實例(這取決於方法是實例方法仍是靜態方法),即同步方法的synchronized
語義是經過方法標誌位ACC_SYNCHRONIZED
來實現的,同步過程是隱式的(同步對象由JVM來指定,鎖釋放由JVM來完成)40~49
行,發現它給咱們的同步塊內的內容System.out.println("sync block")
的先後分別加上了一個monitorenter
和一個monitorexit
,這就對應鎖獲取-釋放,這種同步語義是顯式的,同步對象和臨界區由咱們來控制,相對同步方法靈活一些。還有一點值得注意的是上述的第49
行代碼爲何又出現了一個monitorexit
?這是爲了保證在同步代碼塊執行過程當中若是拋出了異常,線程持有的鎖也可以在異常拋出前被釋放掉(不至於影響到其餘正在等待鎖獲取的線程)。
通過上述的分析,對於鎖的理解應該有了更深入的認識。那麼如何避免死鎖呢?陷入死鎖的線程既不會工做還要持續佔用系統資源,咱們的應用程序應當避免發生這種狀況。
Lock
的tryLock(millis)
超時等待機制,一旦發現等待時間過長,那麼就不必一直等下去,能夠先去完成其餘任務以後再來嘗試獲取鎖。後面咱們將針對這種狀況手寫一個等待超時就能自動返回的鎖。棄用suspend/resume
以後,官方建議使用wait/notify
代替。與suspend/resume
的定位不一樣,wait/notify
實現於Object
,是全部對象都可以調用的方法。且調用對象的wait/notify
前必須先獲取該對象的monitor
。
如下是官方對wait(millis)
給出的說明:
* This method causes the current thread (call it <var>T</var>) to
* place itself in the wait set for this object and then to relinquish
* any and all synchronization claims on this object. Thread <var>T</var>
* becomes disabled for thread scheduling purposes and lies dormant
* until one of four things happens: notify, notifyAll, interrupt, time out
複製代碼
調用一個對象obj
的wait
方法將會致使當前執行線程被放入obj
的等待隊列中(wait set
,線程休息室),而且釋放該線程經過synchronized
已持有的全部鎖,而後釋放CPU的執行權陷入等待,直到被notify/notifyAll
通知到、被其餘線程調用interrupt
中斷或者等待時間已超過所設置的時限時纔會進入就緒狀態從新爭取CPU執行權。
這裏須要注意的是並不是線程被notify/notifyAll
喚醒了就能當即從wait
返回,被喚醒後只會使線程進入就緒狀態爭取CPU執行權,只有獲取到CPU執行權而且獲取到全部wait
前釋放的鎖後才能從wait
返回,不然線程仍將阻塞在wait
上。
使用wait/notify
,咱們可以實現線程間的通訊。
官方給出了wait/notify
使用的經典範式:
synchronized (obj) {
while (<condition does not hold>)
obj.wait();
... // Perform action appropriate to condition
}
複製代碼
使用while
而不使用if
的緣由就是被喚醒並從wait
返回的線程應該不斷檢查它所關注的條件,由於被喚醒可能並非因爲另外一個線程爲了通知該線程而有針對性的喚醒該線程,這一點從notify
的隨機喚醒、notifyAll
喚醒所有、被喚醒的線程在同一時刻只有一個可以搶到鎖,能夠看出真正可以從wait
返回的線程具備很大的不肯定性。因爲每一個線程的關注的條件不一樣,因此須要輪詢判斷條件是否成立,方可從while
中退出來。
由此咱們能夠利用wait/notify
實現生產者-消費者通訊模型:
public class ClassicForm {
private static String message;
private static Object lock = new Object();
public static void main(String[] args) {
Thread consumer = new Thread(() -> {
while(true){
synchronized (lock) {
while (message == null) { // wait for producing
try {
lock.wait();
} catch (InterruptedException e) {
System.out.println("consumer was broken");
return;
}
}
System.out.println("CONSUMER receive message : " + message);
message = null;
lock.notify();
}
}
});
Thread producer = new Thread(() -> {
synchronized (lock) {
for(int i = 0 ; i < 100 ; i++){
while (message != null) { // wait for consuming
try {
lock.wait();
} catch (InterruptedException e) {
System.out.println("producer was broken");
return;
}
}
message = "please the order, order-id is " + i;
lock.notify();
System.out.println("PRODUCER send the message : " + message);
}
}
});
consumer.start();
producer.start();
}
}
複製代碼
你會發現這裏的message
即便沒有加volatile
,生產者每次所作的更改消費者都能準確獲取到。這是由synchronized
的unlock
指令和JMM(Java內存模型)共同決定的,JMM將在後文中詳細展開。
上述代碼有一個明顯的缺陷,那就是若是生產者生產消息很慢,那麼消費者就會一直wait
直到有新的消息到來。這樣就沒有充分利用消費者線程所佔用的資源。可否爲消費者的等待設置一個限時?在等待時長超過限時以後就不wait
了,先去處理其餘任務,稍後再來監聽生產者生產的消息。下段代碼簡單模擬了這一場景:
public class WaitTimeoutModel {
private static String message;
private static Object lock = new Object();
private static final long MAX_WAIT_LIMIT = 1000;
public static void main(String[] args) {
Thread consumer = new Thread(() -> {
synchronized (lock) {
while (true) {
long base = System.currentTimeMillis();
long now = 0;
while (message == null) {
now = System.currentTimeMillis() - base;
if (now >= MAX_WAIT_LIMIT) {
break; // exit wait
}
try {
lock.wait(MAX_WAIT_LIMIT);
} catch (InterruptedException e) {
System.out.println("consumer was broken");
}
}
if (message == null) {
System.out.println("CONSUMER exit wait, and do other things");
try { // simulate do other thing
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println("CONSUMER receive the message : " + message);
message = null;
}
}
}
});
Thread producer = new Thread(() -> {
// prepare message is very slowly
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// notify consumer
synchronized (lock) {
message = "please handle the order, order-id is 5454656465";
lock.notify();
System.out.println("PRODUCER send the message : " + message);
}
});
consumer.start();
producer.start();
}
}
複製代碼
要點就是在經典範式的基礎之上,在輪詢狀態變量的過程當中增長了一個等待時長判斷(第14~17
行),若是發現超過了給定的時限(這裏是MAX_WAIT_LIMIT
),那麼就再也不等待,去作其餘事情(第25~30
行),相反若是在wait(MAX_WAIT_LIMIT)
期間因爲生產者的提醒被喚醒,那麼一樣會跳出輪詢(生產者一般生產出消息後才喚醒消費者)進入到第32~33
行去消費消息。但不管是哪種狀況,都算是消費者一個邏輯執行單元的結束。因爲消費者一般是24小時運行監聽的(while(true)
),所以在每個執行單元結束後將重置用來計算等待時長的base
和now
(第11~12
行)。
運行效果以下:
CONSUMER exit wait, and do other things
PRODUCER send the message : please handle the order, order-id is 5454656465
CONSUMER receive the message : please handle the order, order-id is 5454656465
CONSUMER exit wait, and do other things
CONSUMER exit wait, and do other things
CONSUMER exit wait, and do other things
...
複製代碼
超時等待模型被普遍用於併發設計模式以及JUC包,須要好好理解。
wait
是Object
中的實例方法且調用前須要獲取實例對象的鎖,sleep
是Thread
中的靜態方法可直接調用sleep
不會釋放當前線程所持有的鎖,而wait
則會釋放當前線程持有的全部鎖sleep
和wait
都會使線程進入TIMED-WAITING
狀態釋放CPU執行權,但調用sleep
的線程在設定的時限後可以自動返回,而wait(millis)
在超時後須要先獲取對象的鎖才能返回、wait(0)
更是須要等待被喚醒並獲取到鎖後才能返回。下段代碼模擬了生產者-消費者模型下兩個生產者和兩個消費者同時工做致使程序假死的一個案例
public class ProducerConsumer {
private String message;
public synchronized void produce() {
while (message != null) {
try {
this.wait();
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was broken");
return;
}
}
message = "time is " + new Date(System.currentTimeMillis());
this.notify();
System.out.println(Thread.currentThread().getName() + " send the message : " + message);
}
public synchronized void consume() {
while (message == null) {
try {
this.wait();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was broken");
return;
}
}
System.out.println(Thread.currentThread().getName() + " recv the message : " + message);
message = null;
this.notify();
}
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
Stream.of("p1", "p2").forEach(name -> {
new Thread(() -> {
while (true) {
pc.produce();
}
}, name).start();
});
Stream.of("c1", "c2").forEach(name -> {
new Thread(() -> {
while (true) {
pc.consume();
}
}, name).start();
});
}
}
複製代碼
輸出以下:
p1 send the message : time is Fri Feb 01 14:06:26 CST 2019
c2 recv the message : time is Fri Feb 01 14:06:26 CST 2019
p2 send the message : time is Fri Feb 01 14:06:26 CST 2019
c2 recv the message : time is Fri Feb 01 14:06:26 CST 2019
p1 send the message : time is Fri Feb 01 14:06:27 CST 2019
# 至此,四個線程陷入永久wait
複製代碼
筆者也曾異或良久,一個Producer生產了消息會通知一個Consumer消費,且後者消費完後又會通知一個等待生產的Producer,沒問題啊!怎麼會都陷入wait
呢?
這是由於咱們陷入了一個慣性思惟,學生產者-消費者模式學着學着就總覺得生產者生產了消息會通知消費者、消費者消費完了會通知生產者。咱們忘記了notify
的本質:notify
會從對象的wait set
中隨機選取一個線程喚醒。咱們再來理性地分析一下上述代碼:第17
行的notify
必定會喚醒對象的wait set
上的一個消費者線程嗎?不必定吧!假設某一時刻p1
搶到了鎖,而p2,c1,c2
均阻塞在wait
上,那麼p1
生產消息後調用的notify
有沒有可能喚醒的是p2
呢(如此的話,被喚醒的p2
發現p1
生產的消息沒有被消費仍然會陷入wait
,這樣的話四個線程就都陷入wait
了,沒有其餘線程來喚醒他們。相似的,消費者消費完消息後喚醒的也多是另外一個在wait
的消費者,這樣的喚醒作的是無用功)。就是由於notify
的不肯定性,從而致使上述代碼並無按照生產者-消費者的套路來,最後四個線程都陷入了wait
且沒有線程去喚醒他們。
可是若是將第17,34
行的notify
改爲notifyAll
就不會死鎖了。這是由於notifyAll
會喚醒全部阻塞在該對象的wait
上的線程。所以p1
生產消息後若是調用的是notifyAll
,那麼p2,c1,c2
都會被喚醒並爭取該對象的monitor
,這時即便p2
先搶到了,它也會因爲消息未被消費而進入wait
進而釋放鎖並喚醒等待該鎖的c1,c2
,因此p1
的notifyAll
最終必定會致使其中一個消費者從wait
返回,這樣即便是多Producer多Consumer,程序也能跑通了。
p2 send the message : time is Fri Feb 01 14:30:39 CST 2019
c1 recv the message : time is Fri Feb 01 14:30:39 CST 2019
p1 send the message : time is Fri Feb 01 14:30:39 CST 2019
c2 recv the message : time is Fri Feb 01 14:30:39 CST 2019
p2 send the message : time is Fri Feb 01 14:30:40 CST 2019
c1 recv the message : time is Fri Feb 01 14:30:40 CST 2019
p1 send the message : time is Fri Feb 01 14:30:41 CST 2019
c2 recv the message : time is Fri Feb 01 14:30:41 CST 2019
p2 send the message : time is Fri Feb 01 14:30:42 CST 2019
c1 recv the message : time is Fri Feb 01 14:30:42 CST 2019
...
複製代碼
多線程下的生產者-消費者模型,要使用
notifyAll
上文說到synchronized
有一個嚴重的缺陷就是,若是持有鎖的線程遲遲不釋放鎖(臨界區的執行時間過長),那麼等待該鎖的其餘線程就會一直阻塞住,直到該鎖被釋放。那麼可否實現這樣一種機制呢:給等待鎖釋放的線程設置一個時限,若是超過了該時限,那麼就認爲鎖一時半會兒不會被釋放,因而可讓線程利用這段空閒執行其餘的任務而非一直阻塞着什麼事都不作。
如今咱們可使用wait/notify
的經典範式實現synchronized
語義,使用其超時等待模型實現限時等待語義。首先定義一個同步對象接口,即Lock
:
public interface Lock {
void lock() throws InterruptedException;
void unlock();
void lock(long millis) throws InterruptedException, TimeoutException;
Collection<Thread> getBlockedThread();
int getBlockedCount();
}
複製代碼
接着實現一個簡單的用一個布爾變量表示同步狀態的BooleanLock
:
public class BooleanLock implements Lock {
private volatile boolean isSync = false; //represent whether the lock is held or not. true is held, false is not held
private Thread currentThread; //current thread which hold the lock
private Collection<Thread> waitQueue;
public BooleanLock() {
this.isSync = false;
this.currentThread = null;
this.waitQueue = new ArrayList<>();
}
@Override
public synchronized void lock() throws InterruptedException {
waitQueue.add(Thread.currentThread());
while (isSync) { // lock is held by other thread
this.wait();
}
// get the lock successfully
waitQueue.remove(Thread.currentThread());
currentThread = Thread.currentThread();
isSync = true; //indicate the lock is held
System.out.println(Thread.currentThread().getName() + " get the lock");
}
@Override
public void unlock() {
// check the operator is the thread which is holding the lock
if (Thread.currentThread() != currentThread) {
return;
}
synchronized (this) {
currentThread = null;
isSync = false;
this.notifyAll();
System.out.println(Thread.currentThread().getName() + " release the lock");
}
}
@Override
public synchronized void lock(long millis) throws InterruptedException, TimeoutException {
long base = System.currentTimeMillis();
long now = 0;
waitQueue.add(Thread.currentThread());
while (isSync) {
now = System.currentTimeMillis() - base;
if (now >= millis) {
throw new TimeoutException();
}
this.wait(millis);
}
waitQueue.remove(Thread.currentThread());
currentThread = Thread.currentThread();
isSync = true;
System.out.println(Thread.currentThread().getName() + " get the lock");
}
@Override
public Collection<Thread> getBlockedThread() {
return Collections.unmodifiableCollection(waitQueue);
}
@Override
public int getBlockedCount() {
return waitQueue.size();
}
}
複製代碼
synchronized
語義:public static void main(String[] args) {
BooleanLock lock = new BooleanLock();
Stream.of("t1", "t2", "t3", "t4", "t5").forEach(name -> {
new Thread(() -> {
try {
lock.lock();
Thread.sleep(50); // to do thing
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
printBlockedThread(lock);
lock.unlock();
}
}, name).start();
});
}
private static void printBlockedThread(BooleanLock lock) {
System.out.print("There are " + lock.getBlockedCount() + " threads waiting on the lock: ");
lock.getBlockedThread().forEach(thread -> System.out.print(thread.getName() + " "));
System.out.println();
}
複製代碼
運行結果:
t1 get the lock
There are 4 threads waiting on the lock: t4 t3 t2 t5
t1 release the lock
t5 get the lock
There are 3 threads waiting on the lock: t4 t3 t2
t5 release the lock
t4 get the lock
There are 2 threads waiting on the lock: t3 t2
t4 release the lock
t2 get the lock
There are 1 threads waiting on the lock: t3
t2 release the lock
t3 get the lock
There are 0 threads waiting on the lock:
t3 release the lock
複製代碼
須要注意的是unlock
必須寫在finally
中確保鎖必定會被釋放,而synchronized
同步塊執行時拋出異常JVM會經過異常表(詳見《深刻理解Java虛擬機(第二版)》Class文件結構一章中的方法表的描述)在異常拋出時釋放當前線程所持有的所有的鎖。
上例只是實現了與synchronized
一樣的功能,接着咱們測試一下限時獲取鎖的功能,這是synchronized
沒法作到的。
public static void main(String[] args) {
BooleanLock lock = new BooleanLock();
Stream.of("t1", "t2", "t3", "t4", "t5").forEach(name -> {
new Thread(() -> {
try {
lock.lock(1000);
Thread.sleep(2000); // the task is very time-consuming
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was interrupted");
} catch (TimeoutException e) {
System.out.println(Thread.currentThread().getName() + " get lock time out, so do other thing first and then get the lock again");
} finally {
lock.unlock();
}
}, name).start();
});
}
複製代碼
輸出以下:
t1 get the lock
t2 get lock time out, so do other thing first and then get the lock again
t3 get lock time out, so do other thing first and then get the lock again
t4 get lock time out, so do other thing first and then get the lock again
t5 get lock time out, so do other thing first and then get the lock again
t1 release the lock
複製代碼
在使用一些開源框架時,好比Tomcat
,在關閉時仍會有些日誌打印出來,這些日誌一般是釋放應用程序資源的信息。也就是說咱們點擊terminate
的事件被應用程序捕獲到後,應用程序並不是直接終止而是先釋放一些珍貴資源。這就是經過設置鉤子函數作到的,它會在應用程序主線程終止前被調用。對應API
是Runtime.getRuntime().addShutdownHook(thread)
。
下面我將在linux
上演示鉤子函數的用處。MyApp.java
表示個人應用程序:
public class MyApp{
public static void main(String[] args){
Runtime.getRuntime().addShutdownHook(
new Thread(() -> {
//release resource here, like socket,connection etc
System.out.println("releasing resources...");
})
);
while(true){
// start a service
}
}
}
複製代碼
經過addShutdownHook
設置的線程將在main
線程被外界中斷時調用,好比我在運行java MyApp
時按下了CTRL C
[root@izm5ecexclrsy1gmkl4bgdz ~]# javac MyApp.java
[root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp
^Creleasing resources...
複製代碼
又好比後臺運行MyApp
,經過kill pid
終止它:
[root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp &
[1] 14230
[root@izm5ecexclrsy1gmkl4bgdz ~]# jps
14240 Jps
14230 MyApp
[root@izm5ecexclrsy1gmkl4bgdz ~]# kill 14230
[root@izm5ecexclrsy1gmkl4bgdz ~]# releasing resources...
複製代碼
可是kill -9
則不會觸發鉤子程序:
[root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp &
[1] 14264
[root@izm5ecexclrsy1gmkl4bgdz ~]# ps aux|grep java
root 14264 96.3 1.4 2460724 27344 pts/0 Sl 16:03 0:09 java MyApp
root 14275 0.0 0.0 112660 964 pts/0 R+ 16:03 0:00 grep --color=auto java
[root@izm5ecexclrsy1gmkl4bgdz ~]# kill -9 14264
[root@izm5ecexclrsy1gmkl4bgdz ~]# ps aux|grep java
root 14277 0.0 0.0 112660 964 pts/0 R+ 16:03 0:00 grep --color=auto java
[1]+ Killed java MyApp
複製代碼
Thread.currentThread().getStackTracke()
獲取當前線程執行到當前方法時棧中的全部棧幀信息,返回StackTraceElement[]
,一個元素就表明一個方法棧幀,能夠經過它得知方法所屬的類、方法名、方法執行到了第幾行
public static void main(String[] args) {
m1();
}
public static void m1() {
m2();
}
private static void m2() {
m3();
}
private static void m3() {
Arrays.asList(Thread.currentThread().getStackTrace()).stream()
.filter(
//過濾掉native方法
stackTraceElement -> !stackTraceElement.isNativeMethod()
).forEach(
stackTraceElement -> {
System.out.println(stackTraceElement.getClassName() + ":" +
stackTraceElement.getMethodName() + "():" +
stackTraceElement.getLineNumber());
}
);
}
複製代碼
因爲Runnable
接口的run
方法並未聲明拋出任何異常,所以在重寫run
時,全部checked exception
都須要咱們手動解決。可是若是拋出unchecked exception
呢,1/0
就是典型的例子,咱們如何捕獲他?
經過thread.setUncheckedExceptionHandler()
可以作到這一點:
public static final int A = 1;
public static final int B = 0;
public static void main(String[] args) {
Thread thread = new Thread(() -> {
int i = A / B;
});
thread.setUncaughtExceptionHandler((t, e) -> {
// t -> the ref of the thread, e -> exception
System.out.println(e.getMessage()); /// by zero
});
thread.start();
}
複製代碼
線程組表明一個線程的集合,一個線程組也能夠包含其餘線程組,線程組能夠以樹形結構展開。
在JVM啓動時,會建立一個名爲main
的線程運行main
函數和一個名爲main
的線程組,main
線程的線程組是main
線程組:
public static void main(String[] args) {
System.out.println(Thread.currentThread().getName()); //main
System.out.println(Thread.currentThread().getThreadGroup().getName()); //main
}
複製代碼
建立線程時,若是沒有爲該線程顯式指定線程組,那麼該線程將會拿他的父線程的線程組做爲本身的線程組。
若是建立線程組時沒有顯式指定其父線程組,將會拿當前線程的線程組做爲其父線程組
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
//
});
System.out.println(t1.getThreadGroup().getName()); //main
ThreadGroup threadGroup = new ThreadGroup("MyThreadGroup");
Thread t2 = new Thread(threadGroup, () -> {
//
});
System.out.println(t2.getThreadGroup().getName()); //MyThreadGroup
System.out.println(t2.getThreadGroup().getParent().getName()); //main
}
複製代碼
threadGroup.list()
方法可以打印線程組中存活線程的信息,可用於debug
ThreadGroup threadGroup = new ThreadGroup("MyThreadGroup");
new Thread(threadGroup, () -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(threadGroup, () -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
threadGroup.list();
java.lang.ThreadGroup[name=MyThreadGroup,maxpri=10]
Thread[Thread-0,5,MyThreadGroup]
Thread[Thread-1,5,MyThreadGroup]
複製代碼
更多API
你們可查看官方文檔。
工做線程應該不斷輪詢任務隊列是否有任務可作,有則拿來執行,無則等待外界提交。而後還要爲外界提供終止當前線程的stop
,其採用的是利用共享狀態變量的方式並使用volatile
修飾使得外界的終止操做當即對當前工做線程可見。
public class Worker implements Runnable {
private volatile boolean stop;
private LinkedList<Runnable> taskQueue;
private Thread currentThread;
public Worker(LinkedList<Runnable> taskQueue) {
this.taskQueue = taskQueue;
}
@Override
public void run() {
currentThread = Thread.currentThread();
Runnable task = null;
OUTER:
while (!stop) {
synchronized (taskQueue) {
while (taskQueue.isEmpty()) {
try {
taskQueue.wait();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" has been interrupted");
break OUTER;
}
}
task = taskQueue.removeFirst();
taskQueue.notifyAll();
}
if (task != null) {
task.run();
}
}
}
public void interrupt() {
if (currentThread != null) {
currentThread.interrupt();
}
}
public void stop() {
stop = true;
}
}
複製代碼
public class ThreadPool {
private static final int DEFAULT_THREAD_COUNT = 10;
private int threadCount;
private LinkedList<Worker> workQueue;
private LinkedList<Runnable> taskQueue;
public ThreadPool() {
this(DEFAULT_THREAD_COUNT);
}
public ThreadPool(int size) {
this.threadCount = size;
this.workQueue = new LinkedList<>();
this.taskQueue = new LinkedList<>();
init(size);
}
//建立並啓動count個線程
private void init(int count) {
if (count <= 0) {
throw new IllegalArgumentException("thread pool size must greater than zero");
}
for (int i = 0; i < count; i++) {
Worker worker = new Worker(taskQueue);
Thread thread = new Thread(worker, "ThreadPool-" + i);
thread.start();
workQueue.add(worker);
}
}
public void execute(Runnable task) {
synchronized (taskQueue) {
taskQueue.add(task);
taskQueue.notifyAll();
}
}
public int getThreadCount() {
return threadCount;
}
public int getTaskCount() {
return taskQueue.size();
}
//對wait中的線程調用stop,他也沒法輪詢該變量而退出循環
//所以對於wait中的工做線程直接中斷它,而正在執行的線程則等他本身輪詢到stop而退出
public void shutdown() {
synchronized (taskQueue) {
for (Worker worker : workQueue) {
worker.stop();
worker.interrupt();
}
}
System.out.println("thread pool destroyed");
}
}
複製代碼
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
ThreadPool threadPool = new ThreadPool();
for (int i = 0; i < 40; i++) {
int number = i;
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + "start execute task-" + number);
try {
Thread.sleep(new Random(System.currentTimeMillis()).nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Thread.sleep(5000);
threadPool.shutdown();
}
}
複製代碼
線程池的工做隊列不該該無限大,若是不注意的或可能會致使OOM,所以在任務隊列中的任務數到達必定數目時應對提交的任務採起拒絕策略。
這裏應該用策略模式,策略接口:
public interface RefusePolicy {
void refuse() throws Exception;
}
複製代碼
簡單任務數過大就拋異常的策略:
public class DiscardRefusePolicy implements RefusePolicy {
public class TaskExceededException extends Exception {
public TaskExceededException(String message) {
super(message);
}
}
@Override
public void refuse() throws TaskExceededException {
throw new TaskExceededException("task has exceeded the taskSize of thread poll");
}
}
複製代碼
改造execute
方法:
private static final int DEFAULT_THREAD_COUNT = 10;
private static final RefusePolicy DEFAULT_REFUSE_POLICY = new DiscardRefusePolicy();
private static final int DEFAULT_TASK_SIZE = 200;
private int threadCount;
private LinkedList<Worker> workQueue;
private LinkedList<Runnable> taskQueue;
private int maxTaskSize;
private RefusePolicy refusePolicy;
public ThreadPool() {
this(DEFAULT_THREAD_COUNT, DEFAULT_TASK_SIZE, DEFAULT_REFUSE_POLICY);
}
public ThreadPool(int size, int maxTaskSize, RefusePolicy refusePolicy) {
this.threadCount = size;
this.maxTaskSize = maxTaskSize;
this.workQueue = new LinkedList<>();
this.taskQueue = new LinkedList<>();
this.refusePolicy = refusePolicy;
init(size);
}
public void execute(Runnable task) throws Exception {
synchronized (taskQueue) {
if (taskQueue.size() >= maxTaskSize) {
refusePolicy.refuse();
return;
}
taskQueue.add(task);
taskQueue.notifyAll();
}
}
複製代碼
public static void main(String[] args) throws InterruptedException {
ThreadPool threadPool = new ThreadPool();
for (int i = 0; i < 300; i++) {
int number = i;
try {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "start execute task-" + number);
try {
Thread.sleep(new Random(System.currentTimeMillis()).nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} catch (Exception e) {
System.out.println("task-" + i + " execution error : " + e.getMessage());
}
}
Thread.sleep(5000);
threadPool.shutdown();
}
複製代碼