咱們要編寫一個Socket應用,監聽指定端口,實現數據包的接收和發送邏輯,這在早期系統間進行數據交互是常用的,這類接口一般須要考慮兩個問題:一個是避免線程阻塞,保證接收的數據儘快處理;二是:接口的穩定性和可靠性問題,數據包很複雜,接口服務的系統也不少,一旦守候線程出現異常就會致使Socket中止,這是很是危險的,那咱們有什麼辦法避免嗎?java
Java1.5版本之後在Thread類中增長了setUncaughtExceptionHandler方法,實現了線程異常的捕捉和處理。可能你們會有一個疑問:若是Socket應用出現了不可預測的異常是否能夠自動重啓呢?其實使用線程異常處理器很容易解決,咱們來看一個異常處理器應用實例,代碼以下: 安全
class TcpServer implements Runnable { // 建立後即運行 public TcpServer() { Thread t = new Thread(this); t.setUncaughtExceptionHandler(new TcpServerExceptionHandler()); t.start(); } @Override public void run() { for (int i = 0; i < 3; i++) { try { Thread.sleep(1000); System.out.println("系統正常運行:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } // 拋出異常 throw new RuntimeException(); } // 異常處理器 private static class TcpServerExceptionHandler implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { // 記錄線程異常信息 System.out.println("線程" + t.getName() + " 出現異常,自行重啓,請分析緣由。"); e.printStackTrace(); // 重啓線程,保證業務不中斷 new TcpServer(); } } }
這段代碼的邏輯比較簡單,在TcpServer類建立時即啓動一個線程,提供TCP服務,例如接收和發送文件,具體邏輯在run方法中實現。同時,設置了該線程出現運行期異常(也就是Uncaught Exception)時,由TcpServerExceptionHandler異常處理器來處理異常。那麼TcpServerExceptionHandler作什麼事呢?兩件事:服務器
有了這兩點,TcpServer就能夠穩定的運行了,即便出現異常也能自動重啓,客戶端代碼比較簡單,只須要new TcpServer()便可,運行結果以下:多線程
從運行結果上能夠看出,當Thread-0出現異常時,系統自動重啓了Thread-1線程,繼續提供服務,大大提升了系統的性能。框架
這段程序只是一個示例程序,若要在實際環境中應用,則須要注意如下三個方面:異步
volatile關鍵字比較少用,緣由無外乎兩點,一是在Java1.5以前該關鍵字在不一樣的操做系統上有不一樣的表現,所帶來的問題就是移植性較差;並且比較難設計,並且誤用較多,這也致使它的"名譽" 受損。ide
咱們知道,每一個線程都運行在棧內存中,每一個線程都有本身的工做內存(Working Memory,好比寄存器Register、高速緩衝存儲器Cache等),線程的計算通常是經過工做內存進行交互的,其示意圖以下圖所示:工具
從示意圖上咱們能夠看到,線程在初始化時從主內存中加載所需的變量值到工做內存中,而後在線程運行時,若是是讀取,則直接從工做內存中讀取,如果寫入則先寫到工做內存中,以後刷新到主內存中,這是JVM的一個簡答的內存模型,可是這樣的結構在多線程的狀況下有可能會出現問題,好比:A線程修改變量的值,也刷新到了主內存,但B、C線程在此時間內讀取的仍是本線程的工做內存,也就是說它們讀取的不是最"新鮮"的值,此時就出現了不一樣線程持有的公共資源不一樣步的狀況。oop
對於此類問題有不少解決辦法,好比使用synchronized同步代碼塊,或者使用Lock鎖來解決該問題,不過,Java可使用volatile更簡單地解決此類問題,好比在一個變量前加上volatile關鍵字,能夠確保每一個線程對本地變量的訪問和修改都是直接與內存交互的,而不是與本線程的工做內存交互的,保證每一個線程都能得到最"新鮮"的變量值,其示意圖以下:性能
明白了volatile變量的原理,那咱們思考一下:volatile變量是否可以保證數據的同步性呢?兩個線程同時修改一個volatile是否會產生髒數據呢?咱們看看下面代碼:
class UnsafeThread implements Runnable { // 共享資源 private volatile int count = 0; @Override public void run() { // 增長CPU的繁忙程度,沒必要關心其邏輯含義 for (int i = 0; i < 1000; i++) { Math.hypot(Math.pow(92456789, i), Math.cos(i)); } count++; } public int getCount() { return count; } }
上面的代碼定義了一個多線程類,run方法的主要邏輯是共享資源count的自加運算,並且咱們還爲count變量加上了volatile關鍵字,確保是從內存中讀取和寫入的,若是有多個線程運行,也就是多個線程執行count變量的自加操做,count變量會產生髒數據嗎?想一想看,咱們已經爲count加上了volatile關鍵字呀!模擬多線程的代碼以下:
public static void main(String[] args) throws InterruptedException { // 理想值,並做爲最大循環次數 int value = 1000; // 循環次數,防止形成無限循環或者死循環 int loops = 0; // 主線程組,用於估計活動線程數 ThreadGroup tg = Thread.currentThread().getThreadGroup(); while (loops++ < value) { // 共享資源清零 UnsafeThread ut = new UnsafeThread(); for (int i = 0; i < value; i++) { new Thread(ut).start(); } // 先等15毫秒,等待活動線程爲1 do { Thread.sleep(15); } while (tg.activeCount() != 1); // 檢查實際值與理論值是否一致 if (ut.getCount() != value) { // 出現線程不安全的狀況 System.out.println("循環到:" + loops + " 遍,出現線程不安全的狀況"); System.out.println("此時,count= " + ut.getCount()); System.exit(0); } } }
想讓volatite變量"出點醜",仍是須要花點功夫的。此段程序的運行邏輯以下:
運行結果以下:
循環到:40 遍,出現線程不安全的狀況
此時,count= 999
這只是一種可能的結果,每次執行都有可能產生不一樣的結果。這也說明咱們的count變量沒有實現數據同步,在多個線程修改的狀況下,count的實際值與理論值產生了誤差,直接說明了volatile關鍵字並不能保證線程的安全。
在解釋緣由以前,咱們先說一下自加操做。count++表示的是先取出count的值而後再加1,也就是count=count+1,因此,在某個緊鄰時間片斷內會發生以下神奇的事情:
(1)、第一個時間片斷
A線程得到執行機會,由於有關鍵字volatile修飾,因此它從主內存中得到count的最新值爲998,接下來的事情又分爲兩種類型:
(2)、第二個片斷
這兩個時間片斷執行完畢後,本來指望的結果爲1000,單運行後的值爲999,這表示出現了線程不安全的狀況。這也是咱們要說明的:volatile關鍵字並不能保證線程安全,它只能保證當前線程須要該變量的值時可以得到最新的值,而不能保證線程修改的安全性。
順便說一下,在上面的代碼中,UnsafeThread類的消耗CPU計算是必須的,其目的是加劇線程的負荷,以便出現單個線程搶佔整個CPU資源的情景,不然很難模擬出volatile線程不安全的狀況,你們能夠自行模擬測試。
多線程應用有兩種實現方式,一種是實現Runnable接口,另外一種是繼承Thread類,這兩個方法都有缺點:run方法沒有返回值,不能拋出異常(這兩個缺點歸根究竟是Runnable接口的缺陷,Thread類也實現了Runnable接口),若是須要知道一個線程的運行結果就須要用戶自行設計,線程類自己也不能提供返回值和異常。可是從Java1.5開始引入了一個新的接口Callable,它相似於Runnable接口,實現它就能夠實現多線程任務,Callable的接口定義以下:
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
實現Callable接口的類,只是代表它是一個可調用的任務,並不表示它具備多線程運算能力,仍是須要執行器來執行的,咱們先編寫一個任務類,代碼以下:
//稅款計算器 class TaxCalculator implements Callable<Integer> { // 本金 private int seedMoney; // 接收主線程提供的參數 public TaxCalculator(int _seedMoney) { seedMoney = _seedMoney; } @Override public Integer call() throws Exception { // 複雜計算,運行一次須要2秒 TimeUnit.MILLISECONDS.sleep(2000); return seedMoney / 10; } }
這裏模擬了一個複雜運算:稅款計算器,該運算可能要花費10秒鐘的時間,此時不能讓用戶一直等着吧,須要給用戶輸出點什麼,讓用戶知道系統還在運行,這也是系統友好性的體現:用戶輸入即有輸出,若耗時較長,則顯示運算進度。若是咱們直接計算,就只有一個main線程,是不可能有友好提示的,若是稅金不計算完畢,也不會執行後續動做,因此此時最好的辦法就是重啓一個線程來運算,讓main線程作進度提示,代碼以下:
public static void main(String[] args) throws InterruptedException, ExecutionException { // 生成一個單線程的異步執行器 ExecutorService es = Executors.newSingleThreadExecutor(); // 線程執行後的指望值 Future<Integer> future = es.submit(new TaxCalculator(100)); while (!future.isDone()) { // 尚未運算完成,等待200毫秒 TimeUnit.MICROSECONDS.sleep(200); // 輸出進度符號 System.out.print("*"); } System.out.println("\n計算完成,稅金是:" + future.get() + " 元 "); es.shutdown(); }
在這段代碼中,Executors是一個靜態工具類,提供了異步執行器的建立能力,如單線程異步執行器newSingleThreadExecutor、固定線程數量的執行器newFixedThreadPool等,通常它是異步計算的入口類。future關注的是線程執行後的結果,好比沒有運行完畢,執行結果是多少等。此段代碼的運行結果以下所示:
**********************************************......
計算完成,稅金是:10 元
執行時,"*"會依次遞增,表示系統正在運算,爲用戶提供了運算進度,此類異步計算的好處是:
在Java1.5以前,實現多線程比較麻煩,須要本身啓動線程,並關注同步資源,防止出現線程死鎖等問題,在1.5版本以後引入了並行計算框架,大大簡化了多線程開發。咱們知道一個線程有五個狀態:新建狀態(NEW)、可運行狀態(Runnable,也叫做運行狀態)、阻塞狀態(Blocked)、等待狀態(Waiting)、結束狀態(Terminated),線程的狀態只能由新建轉變爲了運行狀態後才能被阻塞或等待,最後終結,不可能產生本末倒置的狀況,好比把一個結束狀態的線程轉變爲新建狀態,則會出現異常,例如以下代碼會拋出異常:
public static void main(String[] args) throws InterruptedException { // 建立一個線程,新建狀態 Thread t = new Thread(new Runnable() { @Override public void run() { System.out.println("線程正在運行"); } }); // 運行狀態 t.start(); // 是不是運行狀態,若不是則等待10毫秒 while (!t.getState().equals(Thread.State.TERMINATED)) { TimeUnit.MICROSECONDS.sleep(10); } // 直接由結束轉變爲雲心態 t.start(); }
此段程序運行時會報java.lang.IllegalThreadStateException異常,緣由就是不能從結束狀態直接轉變爲運行狀態,咱們知道一個線程的運行時間分爲3部分:T1爲線程啓動時間,T2爲線程的運行時間,T3爲線程銷燬時間,若是一個線程不能被重複使用,每次建立一個線程都須要通過啓動、運行、銷燬時間,這勢必增大系統的響應時間,有沒有更好的辦法下降線程的運行時間呢?
T2是沒法避免的,只有經過優化代碼來實現下降運行時間。T1和T2均可以經過線程池(Thread Pool)來縮減時間,好比在容器(或系統)啓動時,建立足夠多的線程,當容器(或系統)須要時直接從線程池中得到線程,運算出結果,再把線程返回到線程池中___ExecutorService就是實現了線程池的執行器,咱們來看一個示例代碼:
public static void main(String[] args) throws InterruptedException { // 2個線程的線程池 ExecutorService es = Executors.newFixedThreadPool(2); // 屢次執行線程體 for (int i = 0; i < 4; i++) { es.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); } // 關閉執行器 es.shutdown(); }
此段代碼首先建立了一個包含兩個線程的線程池,而後在線程池中屢次運行線程體,輸出運行時的線程名稱,結果以下:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
本次代碼執行了4遍線程體,按照咱們以前闡述的" 一個線程不可能從結束狀態轉變爲可運行狀態 ",那爲何此處的2個線程能夠反覆使用呢?這就是咱們要搞清楚的重點。
線程池涉及如下幾個名詞:
咱們首先從線程池的建立提及,Executors.newFixedThreadPool(2)表示建立一個具備兩個線程的線程池,源代碼以下:
public class Executors { //生成一個最大爲nThreads的線程池執行器 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } }
這裏使用了LinkedBlockingQueue做爲隊列任務管理器,全部等待處理的任務都會放在該對列中,須要注意的是,此隊列是一個阻塞式的單端隊列。線程池創建好了,那就須要線程在其中運行了,線程池中的線程是在submit第一次提交任務時創建的,代碼以下:
public Future<?> submit(Runnable task) { //檢查任務是否爲null if (task == null) throw new NullPointerException(); //把Runnable任務包裝成具備返回值的任務對象,不過此時並無執行,只是包裝 RunnableFuture<Object> ftask = newTaskFor(task, null); //執行此任務 execute(ftask); //返回任務預期執行結果 return ftask; }
此處的代碼關鍵是execute方法,它實現了三個職責。
其中此處的關鍵是工做線程的建立,它也是經過new Thread方式建立的一個線程,只是它建立的並非咱們的任務線程(雖然咱們的任務實現了Runnable接口,但它只是起了一個標誌性的做用),而是通過包裝的Worker線程,代碼以下:
private final class Worker implements Runnable { // 運行一次任務 private void runTask(Runnable task) { /* 這裏的task纔是咱們自定義實現Runnable接口的任務 */ task.run(); /* 該方法其它代碼略 */ } // 工做線程也是線程,必須實現run方法 public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } // 任務隊列中得到任務 Runnable getTask() { /* 其它代碼略 */ for (;;) { return r = workQueue.take(); } } }
此處爲示意代碼,刪除了大量的判斷條件和鎖資源。execute方法是經過Worker類啓動的一個工做線程,執行的是咱們的第一個任務,而後改線程經過getTask方法從任務隊列中獲取任務,以後再繼續執行,但問題是任務隊列是一個BlockingQuene,是阻塞式的,也就是說若是該隊列的元素爲0,則保持等待狀態,直到有任務進入爲止,咱們來看LinkedBlockingQuene的take方法,代碼以下:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { try { // 若是隊列中的元素爲0,則等待 while (count.get() == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to a non-interrupted thread throw ie; } // 等待狀態結束,彈出頭元素 x = extract(); c = count.getAndDecrement(); // 若是隊列數量還多於一個,喚醒其它線程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); // 返回頭元素 return x; }
分析到這裏,咱們就明白了線程池的建立過程:建立一個阻塞隊列以容納任務,在第一次執行任務時建立作夠多的線程(不超過許可線程數),並處理任務,以後每一個工做線程自行從任務對列中得到任務,直到任務隊列中的任務數量爲0爲止,此時,線程將處於等待狀態,一旦有任務再加入到隊列中,即召喚醒工做線程進行處理,實現線程的可複用性。
使用線程池減小的是線程的建立和銷燬時間,這對於多線程應用來講很是有幫助,好比咱們經常使用的Servlet容器,每次請求處理的都是一個線程,若是不採用線程池技術,每次請求都會從新建立一個新的線程,這會致使系統的性能符合加大,響應效率降低,下降了系統的友好性。