java從誕生開始就明智的選擇了內置對多線程的支持,這使得java語言相比同一時期的其餘語言具備明顯的優點。線程做爲操做系統調度的最小單元,多個線程可以同時執行,這將顯著提高程序的性能,在多核環境中表現的更加明顯。可是,過多的建立線程和對線程的不當管理也容易形成問題。本章將着重介紹java併發編程的基礎知識,從啓動一個線程到線程間不一樣的通訊方式,最後經過簡單的線程池示例以及應用(簡單的Web服務器)來串聯本章所介紹的內容。html
1.線程簡介java
1.1 什麼是線程web
現代操做系統中在運行一個程序時,會爲其建立一個進程。例如,啓動一個java程序,操做系統就會建立一個java進程。如今操做系統調度的最小單元就是線程,也叫輕量級進程(Light Weight Process),在一個進程中能夠建立多個線程,這些線程都擁有各自的計數器、堆棧和局部變量等屬性,而且可以訪問共享的內存變量。處理器在這些線程上高速切換,讓使用者感受到這些線程在同時進行。算法
一個java程序從main()方法開始執行,而後按照既定的代碼邏輯執行,看似沒有其餘線程參與,但實際上java程序天生就是多線程程序,由於執行main()方法的是一個名稱爲main的線程,下面使用JMX來查看一個普通的java程序包含哪些線程sql
public static void main(String[] args) { //使用java線程管理MXbean ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); //不須要獲取同步的monitor和synchronizer信息,進獲取線程和線程堆棧信息 ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false); for (ThreadInfo threadInfo : threadInfos) { System.out.println("["+threadInfo.getThreadId()+"]"+threadInfo.getThreadName()); } } // [7]JDWP Command Reader // [6]JDWP Event Helper Thread // [5]JDWP Transport Listener: dt_socket // [4]Signal Dispatcher 分發處理髮送給jvm信號的線程 // [3]Finalizer 調用對象finalize方法的線程 // [2]Reference Handler 清除reference 的線程 // [1]main main線程,用戶程序入口
能夠看到,一個java程序的運行不只僅是面()方法的運行,而是main線程和多個其餘線程的同時執行。數據庫
1.2 爲何要使用多線程編程
執行一個簡單的「Hello word!」,卻啓動了那麼多「無關」線程,是否是把簡單的問題複雜化了?固然不是,由於真確的使用多線程,總可以給開發人員帶來顯著的好處,而使用多線程的緣由主要有如下幾點:瀏覽器
1.更多的處理器核心tomcat
隨着處理器上的核心數量愈來愈多,以及超線程技術的普遍運用,如今大多數計算機都比以往更加擅長並行計算,而處理器性能的提高方式,也從更高的主頻向更多的核心發展。如何利用好處理器上的核心也成了如今的主要問題安全
線程是大多數操做系統調度的基本單元,一個程序做爲一個進程來運行,程序運行過程當中可以建立多個線程,而一個線程在一個時刻只能運行在一個處理器核心上。試想一下,一個單線程程序在運行是隻能使用一個處理器核心,那麼再多的處理器核心加入也沒法顯著提高該程序的執行效率。相反,若是改程序使用多線程技術,將計算邏輯分配到多個處理器核心上,就會顯著減小程序的處理時間,而且隨着更多處理器核心的加入而變得更有效率。
2.更快的響應時間
有事咱們會編寫一些較爲複雜的代碼(這的複雜代碼不是說複雜的算法,而是複雜的業務邏輯)。例如,一筆訂單的建立,他包括插入訂單數據、生成訂單快照、發送郵件通知買家和記錄貨品銷售數量等。用戶從單擊「訂購」按鈕開始,就要等待這些操做所有完成才能看到定購成功的結果。可是這麼多業務操做,如何可以讓其跟快的完成呢?
在上面的場景中,可使用多線程技術,即將數據一致性不強的操做派發個其餘線程處理(也可使用消息隊列),如生成訂單快照、發送郵件等這樣作的好處就是想用用戶請求的線程可以儘量的處理完成,縮短了響應時間,提高了用戶體驗。
3.更好的編程模型
java爲多線程模型提供了良好、考究而且一致的編程模型,是開發人員可以更加專一於問題的解決,即爲所遇到的問題創建合適的模型,而不是絞盡腦汁的考慮如何將其多線程化。一旦開發人員創建好模型,稍作修改老是可以方便的映射到java提供的多線程編程模型上。
1.3 線程優先級
如今操做系統基本採用時分的形式調度運行的線程,操做系統會分出一個時間片,線程會分配到若干時間片,當線程的時間片用完了就發生線程的調度,並等待下次分配。線程分配到的時間片多少也就決定了線程使用處理器資源的多少,而線程優先級就是決定線程須要多或者少分配一些處理資源的線程屬性。
在java線程中,經過一個整型成員變量priority來控制優先級,優先級的範圍1~10,默認是5,在線程構建的時候能夠經過setPriority(int)方法來修改優先級,優先級高的線程分配時間片的數量要多於優先級低的線程。設置線程優先級是,針對頻繁阻塞(休眠或者I/O操做)的現場須要設置較高的優先級,而偏重計算(須要較多cpu時間或者片預算)的線程則設置較低的優先級,確保處理器不會被獨佔。在不一樣jvm以及操做系統上,線程規劃存在差別,有些操做系統甚至會忽略線程優先級的設定。
public class Priority { private static volatile boolean notStart = true; private static volatile boolean notEnd = true; public static void main(String[] args) throws InterruptedException { ArrayList<Job> jobs = new ArrayList<>(); for (int i = 0; i < 10; i++) { int priority = i < 5 ? Thread.MIN_PRIORITY : Thread.MAX_PRIORITY; Job job = new Job(priority); jobs.add(job); Thread thread = new Thread(job,"thread:"+i); thread.setPriority(priority); thread.start(); } notStart = false; TimeUnit.SECONDS.sleep(10); notEnd = false; for (Job job : jobs) { System.out.println("job priority :"+job.priority+",count : "+job.jobCount); } } static class Job implements Runnable{ private int priority; private long jobCount; public Job(int priority) { this.priority = priority; } @Override public void run() { while (notStart){ Thread.yield(); } while (notEnd){ Thread.yield(); jobCount++; } } } }
job priority :1,count : 9225351
job priority :1,count : 9103654
job priority :1,count : 9276681
job priority :1,count : 9197398
job priority :1,count : 9292870
job priority :10,count : 9330619
job priority :10,count : 9238517
job priority :10,count : 9389986
job priority :10,count : 9340567
job priority :10,count : 8984817
從輸出能夠看到線程的優先級沒有生效,優先級1和優先級10的Job計數器的結果很是先進,沒有明顯差距,這表示程序正確性不能依賴優先級高低。
1.4 線程的狀態
java線程在運行的生命週期可能處於的6中狀態,在給定的一個時刻,線程只能處於其中的一個狀態
public class ThreadState { public static void main(String[] args) { new Thread(new TIMEWaiting(),"timeWatingThread").start(); new Thread(new Waiting(),"watingThread").start(); //使用兩個block線程,一個獲取鎖成功,另外一個阻塞 new Thread(new Blocked(),"blockedThread-1").start(); new Thread(new Blocked(),"blockedThread-2").start(); } //該線程不斷地進行睡 static class TIMEWaiting implements Runnable{ @Override public void run() { while (true){ SleepUtils.second(100); } } } // 該線程在 waiting。class實例上等代 static class Waiting implements Runnable{ @Override public void run() { while (true){ synchronized (Waiting.class){ try { Waiting.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } // 改線成在Blocked。class 實例上加鎖後,不會釋放鎖 static class Blocked implements Runnable{ @Override public void run() { synchronized (Blocked.class){ while (true){ SleepUtils.second(100); } } } } static class SleepUtils{ public static final void second(long seconds){ try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { e.printStackTrace(); } } } }
運行改實例代開中斷或者命令提示符 輸入 jps
4067 Launcher
4068 ThreadState
4103 Jps
3983
jstack PID
線程建立以後,調用start()方法開始執行。當線程執行wait()方法以後,線程進入等待狀態。進入等待狀態的線程須要依靠其餘線程的通知纔可以返回到運行狀態,而超時等待狀態至關於在等待方法的基礎上增長了超時限制,也就是超時時間到達將會返回運行狀態。當線程調用同步方法時,在沒有獲取到鎖的狀況下,線程將會進入到阻塞狀態,線程在執行Runnable的run() 方法以後將進入到終止狀態,
1.5 Daemon線程
Daemon線程是一種支持線程,由於它主要被用做程序中後臺調度以及支持性工做,這意味着,當一個java虛擬機中不存在非Daemon線程的時候,java虛擬機將會推出,能夠經過Thread.setDeamon(true)將線程設置爲Daemon線程,當java虛擬機退出時,Daemon線程中的finally塊並不必定執行
public class Daemon { public static void main(String[] args) { Thread thread = new Thread(new DaemonRunner(), "daemonRunner"); thread.setDaemon(true); thread.start(); } static class DaemonRunner implements Runnable{ @Override public void run() { try { ThreadState.SleepUtils.second(10); }catch (Exception e){ System.out.println("daemonThread finally run ."); } } } }
運行Daemon程序,能夠看到在終端或者命令符上沒有任何的輸出。main線程(非Daemon線程)在啓動了DaemonRunner以後隨着main方法執行完畢而終止,而此時java虛擬機中已經沒有非Daemon線程,虛擬機須要退出。java虛擬機中的全部Daemon線程都須要當即終止,所以DaemonRunner當即終止,可是DaemonRunner中的finally塊並無執行。
在建立Daemon線程時,不能依靠Finally快中的內容來確保執行關閉或清理資源的邏輯
2.啓動和終止線程
2.1 構造線程
在運行線程以前手下要構造一個線程對象,線程對象在構造的時候須要提供線程所須要的屬性,如線程所屬的線程組,線程優先級,是不是Daemon線程等信息
private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { if (name == null) { throw new NullPointerException("name cannot be null"); } this.name = name; //當前線程就是該線程的父線程 Thread parent = currentThread(); this.group = g; //將Daemon。priority屬性設置爲父線程的對應屬性 this.daemon = parent.isDaemon(); this.priority = parent.getPriority(); if (security == null || isCCLOverridden(parent.getClass())) this.contextClassLoader = parent.getContextClassLoader(); else this.contextClassLoader = parent.contextClassLoader; this.inheritedAccessControlContext = acc != null ? acc : AccessController.getContext(); this.target = target; setPriority(priority); // 將父線程的inheritThreadLocal複製過來 if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); /* Stash the specified stack size in case the VM cares */ this.stackSize = stackSize; /* 分配一個線程id */ tid = nextThreadID(); }
一個新構造的線程對象是由其parent線程來進行空間分配的,而child線程繼承了parent是否爲Daemon、優先級和加載資源的contenxtClassLoader以及可繼承的ThreadLocal,同時還會分配一個惟一id來標識這個Child線程。至此一個可以運行的線程對象就初始化好了,在堆內存中等待運行。
2.2 啓動線程
線程對象在初始化完成以後,調用start()方法就能夠啓動這個線程。線程Start()方法的含義是:當前線程(即parent線程同步告知java虛擬機,只要線程劃器空閒,應當即啓動調用start()方法的線程)
2.3 理解中斷
中斷能夠理解爲線程的一個標識位屬性,他表示一個運行中的線程是否被其餘線程進行了中斷,中斷比如其餘線程對該線程打了一個招呼,其餘線程經過調用該線程的interupt()方法對其進行了中斷操做。
線程經過檢查自身時候被中斷來進行響應,線程經過方法isInterrupted()來進行判斷是否被中斷,也能夠調用靜態方法Thread.interrupted()對當前線程的中斷標識位進行復位,若是該線程已經處於終結狀態,即便線程被中斷過,即便該線程被中斷過,在調用該線程對象的isInterrupted()時依舊會返回false。
從java的API中能夠看到,許多聲明拋出InterruptedException的方法(例如Thread。sleep(Long millis))這些方法在拋出InterruptedException以前,java虛擬機會想講線程的中斷標識位清除,而後拋出InterruptedException,此時調用IsIterrupted()方法將會返回false。
public class Interrupted { public static void main(String[] args) throws InterruptedException { //SleepThread不停的嘗試失眠 Thread sleepThread = new Thread(new SleepRunner(), "SleepThread"); sleepThread.setDaemon(true); //busyThread 不停的運行 Thread busyThread = new Thread(new BusyRunner(), "busyThread"); busyThread.setDaemon(true); sleepThread.start(); busyThread.start(); //休眠5秒,讓sleepThread和busyThread充分運行 TimeUnit.SECONDS.sleep(5); sleepThread.interrupt(); busyThread.interrupt(); System.out.println("sleepThread interrupted is:"+sleepThread.isInterrupted()); System.out.println("busyThread interrupted is:"+busyThread.isInterrupted()); SleepUtils.second(2); } static class SleepRunner implements Runnable{ @Override public void run() { while (true){ SleepUtils.second(10); } } } static class BusyRunner implements Runnable{ @Override public void run() { while (true){} } } } //輸出結果 sleepThread interrupted is:false busyThread interrupted is:true java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at 多線程併發的藝術.併發編程的挑戰1.SleepUtils.second(SleepUtils.java:25) at 多線程併發的藝術.併發編程的挑戰1.Interrupted$SleepRunner.run(Interrupted.java:45) at java.lang.Thread.run(Thread.java:748)
從結果能夠看出,拋出InterruptedException 的線程SleepThread,其中斷標識位被清除了,而一直忙碌運做的線程BusyThread,中斷標識位沒有被清除
2.4 過時的suspend()、resume()、stop()
你們對於CD機確定不陌生,若是把它播放音樂比做一個線程運做,那麼對音樂的播放作出暫停、恢復和中止操做對應的線程Thread的API的supend()/resume()/和stop()。
public class Deprecated { public static void main(String[] args) throws InterruptedException { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss"); Thread thread = new Thread(new Runner(), "printThread"); thread.setDaemon(true); thread.start(); TimeUnit.SECONDS.sleep(3); //將thread 進行暫停,輸出內容工做中止 thread.suspend(); System.out.println("main supend thread at "+simpleDateFormat.format(new Date())); TimeUnit.SECONDS.sleep(3); thread.resume(); System.out.println("main resume thread at "+simpleDateFormat.format(new Date())); TimeUnit.SECONDS.sleep(3); thread.stop(); System.out.println("main stop thread at "+simpleDateFormat.format(new Date())); TimeUnit.SECONDS.sleep(3); } static class Runner implements Runnable{ @Override public void run() { DateFormat format = new SimpleDateFormat("HH:mm:ss"); while (true){ System.out.println(Thread.currentThread().getName()+"run at"+format.format(new Date())); SleepUtils.second(1); } } } } printThreadrun at22:17:37 printThreadrun at22:17:38 printThreadrun at22:17:39 main supend thread at 22:17:40 main resume thread at 22:17:43 printThreadrun at22:17:43 printThreadrun at22:17:44 printThreadrun at22:17:45 main stop thread at 22:17:46
不建議使用的緣由有:以suspend()方法爲例,在調用後,線程不會釋放已經佔有的資源(好比鎖),而是佔有着資源進入睡眠狀態,這樣容易已發死鎖問題。一樣,stop()方法在終結一個線程時不會保證線程的資源正常釋放,一般是沒有給予線程完成資源釋放工做的機會,所以會致使程序可能在不肯定狀態下。
2.5 安全的終止線程
2.3中提到的中斷狀態是線程的一個標識位,而中斷操做是一種簡便的線程間交互方式,而這種交互方式最適合用來取消或中止任務。除了中斷之外,還能夠利用一個Boolean變量來控制是否須要中止任務並終止任務
public class Shutdown {
public static void main(String[] args) throws InterruptedException { Runner one = new Runner(); Thread countThread = new Thread(one, "countThread"); countThread.start(); //睡眠一秒 mian 線程對CountThread 進行中斷,使countThread可以感知中斷而結束 TimeUnit.SECONDS.sleep(1); countThread.interrupt(); Runner two = new Runner(); countThread = new Thread(two, "countThread"); countThread.start(); TimeUnit.SECONDS.sleep(1); two.cancel(); } private static class Runner implements Runnable{ private long i; private volatile boolean on = true; @Override public void run() { while(on && !Thread.currentThread().isInterrupted()){ i++; } System.out.println("Count i ="+i); } public void cancel(){ on = false; } } } Count i =753158205
Count i =769424811
示例在執行過程當中,main線程經過中斷操做和cancel()方法都可使CountThread線程終止,這種經過標識或者中斷操做的方式可以使線程在終止時有機會去清理資源,而不是武斷色將線程中止,所以這種終止線程的作法顯得更加安全優雅。
3.線程間通訊
線程開始運行,擁有本身的棧空間,就如同一個腳本同樣,按照既定的代碼一步一步的執行,直至終止,可是每個運行的線程,若是多個線程可以相互配合完成工做,這將會帶來巨大價值。
3.1 volatile和synchronized關鍵字
java支持多個線程同時訪問一個對象或者對象的成員變量,因爲每一個線程能夠擁有這個變量的拷貝(雖然對象以及成員變量分配的內存是在共享內存中的,可是每一個執行的線程仍是能夠擁有一份拷貝,這樣作的目的是加速程序的執行,這是現代多核處理器的一個顯著特性),因此程序在執行過程當中,一個線程看到的變量並不必定是最新的。
關鍵字volatile能夠用來修飾字段(成員變量),就是告知程序任何對該變量的訪問均需喲啊從共享內存中獲取,而對他的改變必須同步刷新回共享內存,他能保證全部線程對變量訪問的可見性。
舉個列子,定義一個表示程序是否運行的成員變量Boolean on= true,那麼另外一線程能對他執行關閉動做(on = false),這裏涉及多個線程對變量的訪問,所以須要將其定義成爲 volatile Boolean on= true,這樣其餘線程對他進行改變時,可讓全部線程感知到變化,由於全部對on變量的訪問和修改都須要以共享內存爲準,可是,過多的使用volatile是沒必要要的,由於他會下降程序的執行效率。
關鍵字synchronized能夠修飾方法或者以同步塊的形式來進行使用,它主要確保多個線程在同一個時刻,只能有一個線程處於方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性。
public class Synchronized { public static void main(String[] args) { //對Synchronized class對象進行加鎖 synchronized (Synchronized.class){} m(); } private static synchronized void m() { } }
public static void main(java.lang.String[]); descriptor: ([Ljava/lang/String;)V flags: ACC_PUBLIC, ACC_STATIC Code: stack=2, locals=3, args_size=1 0: ldc #2 // class 多線程併發的藝術/併發編程的挑戰1/Synchronized 2: dup 3: astore_1 4: monitorenter //監視器進入,獲取鎖 5: aload_1 6: monitorexit //監視器退出,釋放鎖 7: goto 15 10: astore_2 11: aload_1 12: monitorexit 13: aload_2 14: athrow 15: invokestatic #3 // Method m:()V 18: return public static synchronized void m(); descriptor: ()V
//方法修飾符 表示 public static synchronized void m()
flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED Code: stack=0, locals=0, args_size=0 0: return
上面class信息中,對於同步代碼塊的實現使用了monitorenter和monitorexit指令,而同步方法則是依靠方法修飾符上的ACC_SYNCHRONIZED來完成的。不管採用哪一種方式,其本質是對一個對象的監視器(moniter)進行獲取,而這個獲取過程實排他的,也就是同一時刻只能有一個線程獲取到由synchronized鎖保護對象的監視器。
任意一個對象都擁有本身的監視器,當這個對象由同步塊或者這個對象的同步方法調用時,執行方法的線程必須先獲取到該對象的監視器才能進入同步塊或者同步方法,而沒有獲取到監視器(執行該方法)的線程將會被阻塞再同步代碼塊和同步方法的入口處,進入BLOCKED狀態。
該圖能夠看出,任意線程對Object的訪問,首先要得到Object的監視器,若是獲取失敗,該線程就進入同步狀態,線程狀態變爲BLOCKED,當Object的監視器佔有者釋放後,在同步隊列中得線程就會有機會從新獲取該監視器。
3.2 等待/通知機制
一個線程修改了一個對象的值,而另外一線程感知到了變化,而後進行相應的操做,整個過程開始於一個線程,而最終執行又是另外一個線程。前者是生產者,後者就是消費者,這種模式隔離了「作什麼(what)」和「怎麼作(How)」,在功能層面上實現瞭解耦,體系結構上具有了良好的伸縮性,可是在java語言中如何實現相似的功能呢?
簡單的方法是讓消費者線程不斷地循環檢查變量是否符合預期,以下面代碼所示,在while循環中設置不知足的條件,若是條件知足則退出while循環,從而完成消費者的工做.
while (value != desire){ Thread.sleep(1000); } doSomething();
上面這段僞代碼在條件不知足時就睡眠一段時間,這樣作的目的是爲了防止過快的"無效"嘗試,這種方式看似可以解實現所需的功能,可是存在的以下問題。
1)難以確保及時性。在睡眠時,基本不消耗處理器資源,可是若是睡的太久,就不能及時發現條件已經變化,也就是及時性難以保證。
2)難以下降開銷。若是下降睡眠時間,好比休眠1毫秒,這樣消費者能更加迅速地發現條件的變化,可是卻可能消耗更多的處理器資源,形成了無故的浪費。
以上兩個問題,看似矛盾難以調和,可是java經過內置的等待/通知機制可以很好地解決這個矛盾並實現了所需的工能。
等待/通知的相關方法是任意java對象都具有的,由於這些方法被定義在全部對象的超類java.lang.object上。
等待/通知機制,是指一個線程A調用對象O的wait()方法進入了等待狀態,而另外一個線程B調用了對象O的notify()或者notifyAll()方法,線程A收到通知後從對象O的wait()方法返回,進而執行後續操做。上述兩個線程經過對象O來完成交互,而對象的wait()和notify()/notifyAll()的關係就如同開關信號同樣,用來完成等待方和通知方之間的交互工做
穿建了兩個線程——WaiThread和NotifyThread,前者檢查flag值是否爲false,若是符合要求,進行後續操做,不然在lock上等待,後者在睡眠了一段時間後對lock進行通知
public class WaitNotify { static boolean flag = true; static Object lock = new Object(); public static void main(String[] args) { Thread waitThread = new Thread(new Wait(), "waitThread"); waitThread.start(); SleepUtils.second(1); Thread notifyThread = new Thread(new Notify(), "notifyThread"); notifyThread.start(); } static class Wait implements Runnable{ @Override public void run() { //加鎖 擁有lock的 Moniter synchronized (lock){ //當條件不知足時,繼續wait同時釋放lock鎖 while (flag){ try { System.out.println(Thread.currentThread()+ "flag is true. wait@"+new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //條件知足時,完成工做 System.out.println(Thread.currentThread()+"flag is false. running @"+ new SimpleDateFormat("HH:mm:ss").format(new Date())); } } } static class Notify implements Runnable{ @Override public void run() { //加鎖 擁有lock的 Moniter synchronized (lock){ //獲取lock的鎖,而後進行通知,通知時不會釋放lock的鎖 //直到當前線程釋放了lock後,waitThread才能從wait方法中返回 System.out.println(Thread.currentThread()+"hold lock .notify @"+ new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.notifyAll(); flag = false; SleepUtils.second(5); } //再次加鎖 synchronized (lock){ System.out.println(Thread.currentThread()+"hold lock again. sleep@"+ new SimpleDateFormat("HH:mm:ss").format(new Date())); SleepUtils.second(5); } } } }
Thread[waitThread,5,main]flag is true. wait@20:38:13
Thread[notifyThread,5,main]hold lock .notify @20:38:14
Thread[notifyThread,5,main]hold lock again. sleep@20:38:19
Thread[waitThread,5,main]flag is false. running @20:38:24
上述第3行和第4行輸出的順序可能會互換,而上述例子主要說明了調用wait(),notify()以及notifyAll()是須要注意的細節,以下:
1)使用wait(),notify()和notifyAll()時須要先對調用對象加鎖。
2)使用wait()方法後,線程的狀態由RUNNING變爲WAITING,並將當前線程放置到對象的等待隊列。
3)notify()和notifuAll()方法調用後,等待線程依舊不會從wait()返回,須要調用notify()或notifyAll()的線程釋放鎖以後,等待線程纔有機會wait()返回。
4)notify()方法將等待隊列中的一個等待線程從等待隊列中移到同步隊列中,而totifyAll()方法則是將等待隊列中全部線程所有移到同步隊列,被移動的線程狀態由WAITING變爲BLOCKED。
5)從wait()方法返回的前提是得到了調用對象的鎖。
從上述細節中能夠看到,等待/通知機制依託於同步機制,其目的就是確保等待線程從wait()方法返回時可以感知到通知線程對變量作出的修改、
WaitThread首先獲取了對象鎖,而後調用對象的wait()方法,從而放棄了鎖並進入了對象的等待隊列WaitQueue中,進入等待狀態。因爲WaitThread釋放了對象的鎖,NotifyThread隨後獲取了對象的鎖,並調用了對象的notify()方法,將WaitThread從WaitQueue移到SynchronizedQueue中,此時WaitThread的狀態變爲阻塞狀態。NotifyThread釋放了鎖以後,WaitThread再次獲取到鎖並從wait()方法返回繼續執行。
3.3 等待/通知經典案例
從3.2中的WaitNotify實例中能夠提煉出等待/通知的經典範式,該範式分爲兩部分,分別針對等待方(消費者)和通知這(生產者)
等待者遵循以下原則
1)獲取對象鎖
2)若是條件不知足,那麼調用對象的wait()方法,被通知後仍要檢查條件
3)條件知足則執行對應的邏輯
對應的僞代碼
synchronized(對象){
while(條件不知足){
對象.wait();
}
對應的處理邏輯
}
通知方遵循以下原則
1)獲取對象鎖
2)改變條件
3)通知全部等待的對象的線程
synchronized(對象){
改變條件
對象.notifyAll();
}
3.4 管道輸入/輸出流
管道輸入/輸出流和普通的文件輸入/輸出流或者網絡輸入/輸出流不一樣之處在於,它主要用於線程之間的數據傳輸,而傳輸的媒介爲內存。
管道輸入/輸出流主要包括了以下4種具體的實現:PipedOutputStream、PipedInputStream、PipedReader和PipedWriter,前兩種面相字節,然後兩中面相字符
建立了printThread,它用來接受main線程的輸入,任何main下城的輸入都可經過pipedWriter寫入,而printThread在另外一端經過oioedReader將內容讀出並打印。
public class Piped { public static void main(String[] args) throws IOException { PipedWriter out = new PipedWriter(); PipedReader in = new PipedReader(); //將輸出流和輸入流進行鏈接,不然在使用時會拋出 IOException out.connect(in); Thread printThread = new Thread(new Print(in), "printThread"); printThread.start(); int receive = 0; try { while ((receive = System.in.read())!= -1){ out.write(receive); } }finally { out.close(); } } static class Print implements Runnable{ private PipedReader in; public Print(PipedReader in) { this.in = in; } @Override public void run() { int receive = 0; try { while ((receive = in.read() )!= -1){ System.out.println((char) receive); } } catch (IOException e) { e.printStackTrace(); } } } }
運行該示例,輸入一組字符串,能夠看到被printThread進行了原樣輸出
Repeat my words
Repeat my words
對於piped類型的流,必須先要進行綁定,也就是調用connect()方法,若是沒有將輸入/輸出流綁定起來,對於該流的訪問將會拋出異常。
3.5 Thread.jion()
若是線程A 執行了thread.jion()語句,其含義是:當前線程A等待thread線程終止以後才從thread.jion() 返回。線程Thread除了提供jion()方法以外,還提供了jion(long millis)和jion(long millis,int nanos)兩個具有超時時間裏沒有終止,那麼將會從該超時方法中返回。
在代碼示例中,建立了10個線程,編號0~9,每一個線程調用前一個線程的join()方法,也就是線程0結束了,線程1才能從jion()方法中返回,而線程0須要等待main線程結束
public class Jion { public static void main(String[] args) throws InterruptedException { Thread previous = Thread.currentThread(); for (int i = 0; i < 10; i++) { //每一個線程擁有前一個線程的引用,須要等待前一個線程終止,才能從等待中返回 Thread thread = new Thread(new Domino(previous), String.valueOf(i)); thread.start(); previous = thread; } TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName()+"terminate"); } static class Domino implements Runnable{ private Thread thread; public Domino(Thread thread) { this.thread = thread; } @Override public void run() { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"terminate."); } } }
mainterminate
0terminate.
1terminate.
2terminate.
3terminate.
4terminate.
5terminate.
6terminate.
7terminate.
8terminate.
9terminate.
從上述輸出能夠看到,每個線程終止的前提是前驅線程的終止,每一個線程等待前驅線程終止後,才從join()方法返回,這裏涉及到了等待/通知機制(等待前驅線程結束,接收前驅線程結束通知)
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
當線程終止時,會調用線程自身的notifyAll()方法,會通知全部等待在該線程對象上的線程。能夠看到join()方法的邏輯結構與3.3節中描述的等待/通知經典範式一致,即加鎖、循環和處理邏輯3個步驟。
3.6 ThreadLocal的使用
ThreadLocal,即線程變量,是一個以threadLocal對象爲鍵、任意對象爲值的存儲結構。這個結構被附帶在線程上,也就是說一個線程能夠根據一個ThreadLocal對象查詢到綁定在這個線程上的一個值。
能夠經過set(T)方法來設置一個值,在當前線程下再經過get()方法獲取到原先設置的值。
在代碼清單4-15所示的例子中,構建一個經常使用的profiler類。它具備begin()和end()兩個方法,而end()方法返回begin()方法調用開始到end()方法調用溼的時間差,單位是毫秒。
public class Profiler { private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>(){ @Override protected Long initialValue(){ return System.currentTimeMillis(); } }; public static final void begin(){ TIME_THREADLOCAL.set(System.currentTimeMillis()); } public static final Long end(){ return System.currentTimeMillis()-TIME_THREADLOCAL.get(); } public static void main(String[] args) throws Exception { begin(); TimeUnit.SECONDS.sleep(1); System.out.println("cost: "+end()+"mills"); } }
cost: 1004mills
Profiler能夠被複用在方法調用耗時統計的功能上,再方法的入口前執行begin()方法,在方法調用後end() 方法,好處就是兩個方法的調用不用在一個方法或者類中,好比在AOP(面向方面編程)中,能夠在方法調用前的切入點執行begin()方法,而在方法調用後的切入點執行end()方法,這樣依舊能夠得到方法的執行耗時。
4.線程應用實例
4.1 等待超時模式
開發人員常常會遇到這樣的方法調用經常使用場景:調用一個方法時等待一段時間(通常來講是給定一個時間段),若是該方法可以在給定的時間段以內獲得結果,那麼將結果馬上返回,反之,超時返回默認結果。
前章介紹了等待、通知的經典範式,即加鎖、條件循環和處理邏輯3個步驟,而這種範式沒法作到超時等待。而超時等待的加入,只須要對經典範式作出很是小的改動,改動內容以下:
假設超時時間段是T,那麼能夠推斷出在當前時間now+T以後就會超時。
定義變量
1.等待持續時間 REMAINING=T
2.超時時間:FUTURE=now +T
這是僅須要wait(REMAINING)便可,在wait(REMAINING)返回以後將會執行:REMAINING = FUTURE -now,若是REMAINING小於等於0,表示已經超時,直接退出,不然將繼續執行wait(REMAINING)。
上述描述等待超時模式的僞代碼以下:
public synchronized Object get(long mills) throws InterruptedException { long future = System.currentTimeMillis() + mills; long remaining = mills; //當超時大於0而且result 返回值不知足要求 while((result == null) && remaining > 0){ wait(remaining); remaining = future - System.currentTimeMillis(); } return result; }
能夠看出,等待超時模式就是在等待/通知範式的基礎上增長了超時控制。這使得該模式相比原有範式更具備靈活性,由於即便方法執行時間過長,也不會"永久阻塞調用者",而時會按照調用者的要求「按時」返回
4.2 一個簡單的數據庫鏈接池示例
咱們使用等待超時模式來構造一個簡答的數據庫鏈接池,在示例模擬從鏈接池中獲取、使用和釋放鏈接的過程,而客戶端獲取鏈接的過程被設定爲超時等待的模式,也就是在1000毫秒內若是沒法獲取到可用的鏈接,將返回給客戶端一個null。設定鏈接池的大小爲10個,而後經過調節客戶端的線程數來模擬沒法獲取鏈接大場景。
首先看一下鏈接池的定義。他經過構造函數初始化鏈接的最大上限,經過一個雙向隊列來維護鏈接,調用fetchConnection(long)方法來指定在多少毫秒內超時獲取鏈接,當鏈接使用完成後,須要調用releaseConnection(Connection)方法將鏈接放回線程池。
public class ConnectionPool { private LinkedList<Connection> pool = new LinkedList<>(); public ConnectionPool(int initialSize) { if (initialSize > 0){ for (int i = 0; i < initialSize; i++) { pool.addLast(ConnectionDriver.createConnection()); } } } public void releaseConnection(Connection connection){ if(connection != null){ synchronized (pool){ // 鏈接釋放後須要進行通知,這樣其餘消費者可以感知到鏈接池中已經歸還了一個鏈接 pool.addLast(connection); pool.notifyAll(); } } } public Connection fetchConnection(long mills) throws InterruptedException { synchronized (pool){ if(mills <= 0){ while (pool.isEmpty()){ pool.wait(); } return pool.removeFirst(); }else { long future = System.currentTimeMillis() + mills; long remaining = mills; while (pool.isEmpty() && remaining >0){ pool.wait(remaining); remaining = future - System.currentTimeMillis(); } Connection result = null; if(!pool.isEmpty()){ result = pool.removeFirst(); } return result; } } } }
因爲java.sql.connection是一個接口,最終的實現是由數據庫驅動提供方來實現的,考慮到只是個示例,咱們經過動態代理構造了一個Connection,該connection,該connection的代理實現僅僅實在commit()方法調用時休眠100毫秒
public class ConnectionDriver { static class ConnectionHandler implements InvocationHandler{ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getName().equals("commit")){ TimeUnit.SECONDS.sleep(100); } return null; } } public static final Connection createConnection(){ return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),new Class<?>[]{Connection.class},new ConnectionHandler()); } }
下面經過一個示例來測試建議數據庫鏈接池的工做狀況,模擬客戶端ConnextionRunner獲取、使用、最後釋放鏈接的過程,當它使用鏈接將會增長後去到鏈接的數量,反之,將會增長未獲取到鏈接的數量
public class ConnectionPoolTest { static ConnectionPool connectionPool = new ConnectionPool(10); //保證全部connection 可以同時開始 static CountDownLatch start = new CountDownLatch(1); //main 線程將會等待全部connectionRunner 結束後才能繼續執行 static CountDownLatch end; public static void main(String[] args) throws InterruptedException { int threadCount = 10; end = new CountDownLatch(threadCount); int count = 20; AtomicInteger got = new AtomicInteger(); AtomicInteger notGot = new AtomicInteger(); for (int i = 0; i < threadCount; i++) { Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "connectionRunnerThread"); thread.start(); start.countDown(); end.await(); System.out.println("total invoke :"+(threadCount * count)); System.out.println("got connection:"+got); System.out.println("notgot connection :"+notGot); } } static class ConnectionRunner implements Runnable{ int count; AtomicInteger got; AtomicInteger notgot; public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notgot) { this.count = count; this.got = got; this.notgot = notgot; } @Override public void run() { try { start.await(); } catch (InterruptedException e) { e.printStackTrace(); } while (count > 0){ //從線程池中獲取鏈接,若是1000ms 內沒法獲取到,將會返回null //分別統計鏈接獲取的數量got和或獲取到的數量 notgot try { Connection connection = connectionPool.fetchConnection(100); if(connection != null){ try { connection.createStatement(); connection.commit(); }finally { connectionPool.releaseConnection(connection); got.incrementAndGet(); } }else { notgot.incrementAndGet(); } } catch (Exception e) { e.printStackTrace(); }finally { count --; } } end.countDown(); } } }
上述示例中使用了CountDownLatch來確保ConnectionRunnerThread可以同時開始執行,而且在所有結束以後,才使main線程從等待狀態中返回。當前設定的場景是10個線程同時運行獲取鏈接池(10個鏈接)中鏈接,經過調整線程數量來觀察未獲取到鏈接的狀況。線程數、總獲取次數、獲取到的數量、未獲取到的數量以及未獲取到的比率。
從表中的數據統計能夠看出,在資源必定的狀況下(鏈接池中的10個鏈接),隨着客戶端線程的逐步增長,客戶端出現超時沒法獲取鏈接的比率不斷升高。雖然客戶端線程在這種超時獲取的模式下回出現鏈接沒法獲取的狀況,可是他可以保證客戶端線程不會一直關在鏈接獲取的操做上,而是「按時」返回,並告知客戶端鏈接後去出現問題,是系統的一種自我保護機制。數據庫鏈接池的設計也能夠複用到其餘的資源獲取的場景,針對昂貴的資源(好比數據庫鏈接池)的獲取都應該加以超時限制。
4.3 線程技術及其示例
對於一個服務端的程序,常常面對的是客戶傳入的短小(執行時間短、工做內容較爲單一)任務,須要服務端快速處理並返回結果。若是服務端每次接受到一個任務,穿建一個線程。而後進行執行,這在原型階段是一個不錯的選擇,可是面對成千上萬的任務遞交進服務器時,若是仍是採用一個任務一個線程的方式,那麼將會建立數以萬記得線程,這不是一個好的選擇。由於這回使操做系統頻繁的進行上下文切換,無端增長系統的負載,而線程的建立和消亡都是須要消費系統資源的,也無疑浪費了系統資源。
線程池技術可以很好的解決這個問題,他預先建立了若干數量的線程,而且不能由用戶直接對線程的建立進行控制,在這個前提下重複使用固定或較爲固定數目的線程來完成任務的執行。這樣作的好處是,一方面消除了頻繁建立和消亡線程的系統資源的開銷,另外一方面,面對過量任務的提交可以平緩的劣化。
下面先看一個簡單的線程池接口定義
public interface ThreadPool<Job extends Runnable>{ //執行一個Job,這個須要實現Runner void execute(Job job); //關閉線程chi void shutdown(); //增長工做者線程 void addWorkers(int num); //減小工做者線程 void removeWorker(int num); //獲得正在等待執行的任務數量 int getJobSize(); }
客戶端能夠經過execute(Job)方法將Job提交入線程池執行,而客戶端自身不用等待Job的執行完成。除了execute(Job)方法之外,線程池接口提供了增大/減小工做者線程以及關閉線程池的方法。這裏工做者線程表明者一個重複執行Job的線程,而每一個有客戶段提交的Job都將進入一個工做隊列中等待工做者線程的處理。
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { //線程池最大限制數 private static final int MAX_WORKER_NUMBERS = 10; //線程池默認的數量 private static final int DEFAULT_WORKER_NUMBERS = 5; //線程池最小的數量 private static final int MIN_WORKER_NUMBERS = 1; //這裏是一個工做列表,將會向裏面插入工做 private final LinkedList<Job> jobs = new LinkedList<>(); //工做者列表 private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>()); //工做者線程的數量 private int workerNum = DEFAULT_WORKER_NUMBERS; //線程編號生成 private AtomicInteger threadNum = new AtomicInteger(); public DefaultThreadPool() { initializWokers(DEFAULT_WORKER_NUMBERS); } public DefaultThreadPool(int num) { workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS :num; initializWokers(workerNum); } //初始化線程工做者 private void initializWokers(int num) { for (int i = 0; i < num; i++) { Worker worker = new Worker(); workers.add(worker); Thread thread = new Thread(worker,"ThreadPool-Worker-"+threadNum.incrementAndGet()); thread.start(); } } @Override public void execute(Job job) { if(job != null){ //添加一個工做,而後進行通知 synchronized (jobs){ jobs.addLast(job); jobs.notify(); } } } @Override public void shutdown() { for (Worker worker : workers) { worker.shutdown(); } } @Override public void addWorkers(int num) { synchronized (jobs){ //限制新增的worker 數量不能超過最大值 if(num + this.workerNum > MAX_WORKER_NUMBERS){ num = MAX_WORKER_NUMBERS - this.workerNum; } initializWokers(num); this.workerNum += num; } } @Override public void removeWorker(int num) { synchronized (jobs){ if(num >= this.workerNum){ throw new IllegalArgumentException("beyond worknum"); } //按照給定數量中止worker int count = 0; while (count < num){ Worker worker = workers.get(count); if(workers.remove(worker)){ worker.shutdown(); count++; } } this.workerNum -= count; } } @Override public int getJobSize() { return jobs.size(); } //工做者,負責消費任務 class Worker implements Runnable{ //是否工做 private volatile boolean running = true; @Override public void run() { while (running){ Job job = null; synchronized (jobs){ while (jobs.isEmpty()){ try { jobs.wait(); } catch (InterruptedException e) { //感知到外部對workerThread的中斷操做,返回 Thread.currentThread().interrupt(); return; } } job = jobs.removeFirst(); } if(job != null){ try { job.run(); }catch (Exception e){ //忽略執行中的exception } } } } public void shutdown(){ running = false; } } }
從線程池的實現能夠看到,當客戶端調用execute(Job)方法時,會不斷地向任務列表jobs中添加job,而每一個工做者線程會不斷從Jobs上取出一個Job進行執行,當jobs爲空時,工做He線程進入等待狀態。
添加一個Job後,對工做隊列jobs調用了其notify() 方法,而不是notifyAll()方法,由於可以肯定有工做者線程被喚醒,這時使用notify()方法將會比notifyAll()方法得到更小的開銷(避免將等待對列中的線程所有移動到阻塞對列).
能夠看到,線程池的本質就是使用了一個線程安全的工做隊列鏈接工做者線程和客戶端線程,客戶端線程將任務放入工做隊列後便返回,而工做者線程則不段的從工做隊列上取出工做並執行。當工做隊列爲空時,全部的工做者線程均等待在工做隊列上,當有客戶段提交了一個任務以後會通知任意一個工做者線程,隨着大量的任務被提交,更多的工做者線程會被喚醒。
4.4 一個基於線程池技術的簡單web服務器
目前的瀏覽器都支持多線程訪問,好比說在請求一個HTML頁面的時候,頁面中包含的圖片資源、樣式資源會被瀏覽器發起併發的獲取,這樣用戶就不會遇到一直等到一個圖片徹底下載完成才能繼續查看文字內容的尷尬狀況
若是web服務器是單線程的,多線程的瀏覽器也沒有用武之地,由於服務端仍是一個請求一個請求的順序處理。所以,大部分web服務器都是支持併發訪問的。經常使用的java Web服務器,如tomcat、jetty,在其處理請求的過程當中都使用到了線程池技術。
下面經過使用前一節中的線程池來構造一個簡單的web服務器,這個web服務器用來處理HTTP請求,目前只能處理簡單的文本和JPG圖片內容。這個WEB服務器使用main線程不斷地接受客戶端Socket的鏈接,將鏈接以及請求提交給線程池處理,這樣使得web服務器可以同時處理多個客戶端請求,
public class SimpleHttpServer { //處理HttpRequest的線程池 static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<HttpRequestHandler>(1); //simpleHttpServer的根路徑 static String basePath; static ServerSocket serverSocket; static int port = 8080; public static void setPort(int port){ if (port > 0){ SimpleHttpServer.port = port; } } public static void setBasePath(String basePath){ if(basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()){ SimpleHttpServer.basePath = basePath; } } //啓動SimpleHttpserver public static void start() throws Exception { serverSocket = new ServerSocket(port); Socket socket = null; while ((socket = serverSocket.accept())!= null){ threadPool.execute(new HttpRequestHandler(socket)); } serverSocket.close(); } static class HttpRequestHandler implements Runnable{ private Socket socket; public HttpRequestHandler(Socket socket) { socket = socket; } @Override public void run() { String line = null; BufferedReader br = null; BufferedReader reader = null; PrintWriter out = null; InputStream in = null; try { reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String header = reader.readLine(); //由相對路徑計算出絕對路徑 String filePath = basePath + header.split(" ")[1]; out = new PrintWriter(socket.getOutputStream()); //若是請求資源的後綴爲jpg或者ico,則讀取資源並輸出 if(filePath.endsWith("jpg") || filePath.endsWith("ico")){ in = new FileInputStream(filePath); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int i = 0; while ((i = in.read()) != -1){ baos.write(i); } byte[] bytes = baos.toByteArray(); out.println("HTTP/1.1 200 OK"); out.println("Server : Molly"); out.println("content-Type: image/jpeg"); out.println("Content-Length: "+bytes.length); out.println(""); socket.getOutputStream().write(bytes,0,bytes.length); }else { br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); out = new PrintWriter(socket.getOutputStream()); out.println("HTTP/1.1 200 OK"); out.println("Server: Molly"); out.println("Content-Type: text/html; charset=UTF-8"); out.println(""); while ((line = br.readLine()) != null){ out.println(line); } } out.flush(); }catch (Exception e){ out.println("HTTP/1.1 500"); out.println(""); out.flush(); }finally { close(br,in,reader,out,socket); } } private void close(Closeable... closeables) { if (closeables != null){ for (Closeable closeable : closeables) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
在圖中,SimpleHttpeServer在創建了與客戶端的鏈接以後,並不會處理客戶端的請求,而是將其包裝成HttpRequestHandler並交由線程池處理。在線程池中的Worker處理客戶端請求的同時,SimpleHttpServer可以繼續完成後續客戶端鏈接的創建,不會阻塞後續客戶端的請求。
5.本章小結
本章從介紹多線程技術帶來的好處開始,講述瞭如何啓動和終止線程以及線程的狀態,詳細闡述了多線程之間進行通訊的基本方式和等待/通知經典範式。在線程因果給你示例中,使用了等待超時、數據庫鏈接池以及簡單的線程池3個不一樣案例。