獲取多線程的方法,咱們都知道有三種,還有一種是實現Callable接口java
當咱們實現Ruannable接口的時候,須要重寫run方法,也就是線程啓動的時候,會自動調用的方法。算法
同理,咱們實現Callable接口,也須要實現call方法,可是這個時候咱們還須要有返回值。這個Callable接口的應用場景通常在於批處理業務,好比轉帳的時候,須要給返回結果的狀態碼回來,表明本次操做成功仍是失敗。數據庫
public class MyThread implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("進入了Callable方法"); return 1024; } }
而後經過Thread線程,將MyThread實現Callable接口的類包裝起來。小程序
這裏須要用到FutureTask
類,他實現了Runnable接口,類的構造函數須要傳遞一個實現Callable接口的類。緩存
public static void main(String[] args){ // 一、實例化FutureTask類,傳進去一個實現了Callable的類 FutureTask<Integer> futureTask = new FutureTask<>(new MyThread()); // 二、而後在用Thread進行實例化,傳入實現Runnable接口的FutureTask的類 Thread t1 = new Thread(futureTask, "A線程"); t1.start(); // 三、最後經過 futureTask.get() 獲取到返回值 System.out.println("FutureTask的返回值爲:" + futureTask.get()); }
原來咱們的方式是一個main方法冰糖葫蘆似的串下去,引入Callable後,對於執行比較久的線程,能夠單獨新開一個線程進行執行,最後再進行彙總輸出。服務器
若是咱們用
get()
獲取Callable的計算結果,可是若是並無計算完成,會致使阻塞,直到計算完成爲止。也就是說,futureTask.get()
須要放在最後執行,這樣不會致使主線程阻塞。網絡//也可使用下面算法,使用相似自旋鎖的方式來進行判斷是否運行完畢 while(!futureTask.isDone()){ }
多個線程執行一個FutureTask對象的時候,只會計算一次。多線程
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread()); // 開啓兩個線程計算futureTask new Thread(futureTask, "A線程").start(); new Thread(futureTask, "B線程").start();
若是咱們須要兩個線程同時計算任務的話,那麼須要定義兩個FutureTask對象架構
FutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread); FutureTask<Integer> futureTask2 = new FutureTask<>(new MyThread); // 開啓兩個線程計算futureTask new Thread(futureTask1, "A線程").start(); new Thread(futureTask2, "B線程").start();
線程池作的主要工做就是控制運行的線程的數量,處理過程當中,將任務放入到阻塞隊列中,而後線程建立後,啓動這些任務,若是線程數量超過了最大數量的線程排隊等候,等其它線程執行完畢,再從隊列中取出任務來執行。併發
它的主要特色爲:線程複用、控制最大併發數、管理線程
說白了就是一個線程集合(workerSet)和一個阻塞隊列(workQueue)。當用戶向線程池提交一個任務時,線程池會先將任務放入阻塞隊列中。線程集合中的線程會不斷的從阻塞隊列中獲取任務執行,當阻塞隊列中沒有任務的時候,就會阻塞,直到隊列中有任務了就取出來繼續執行。
任務:客戶。 線程集合:辦理窗口。 阻塞隊列:候客區。
Java中線程池是經過Executor框架實現的,該框架中用到了Executor
,Executors
(輔助工具類),ExecutorService
,ThreadPoolExecutor
這幾個類。
ExecutorService threadPool = Executors.newFixedThreadPool(5);
ExecutorService threadPool = Executors.newSingleThreadExecutor();
ExecutorService threadPool = Executors.newCacheThreadPool();
ExecutorService threadPool = Executors.newFixedThreadPool(5); // 模擬10個用戶來辦理業務,每一個用戶就是一個來自外部請求線程 try { // 循環十次,模擬業務辦理,讓5個線程處理這10個請求 for (int i = 0; i < 10; i++) { final int tempInt = i; threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + " 給用戶:" + tempInt + "辦理業務"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); // 用池化技術,必定要記得關閉 }
結果:
咱們可以看到,一共有5個線程,在給10個用戶辦理業務。
咱們經過查看源碼,發現底層都是使用了ThreadPoolExecutor。
由於在單線程池和固定線程池中,線程數量是有限的,所以提交的任務須要在隊列中等待空閒的線程。
而在緩存線程池中,線程數量幾乎無限,所以提交的任務在 SynchronousQueue 隊列中同步給空餘線程便可。
Tips:SynchronousQueue是一個沒有存儲空間的隊列,生產者進行 put 操做時,必需要等待消費者的 take 操做。
線程池在建立的時候,一共有7大參數:
workQueue:任務隊列,被提交的但未被執行的任務(相似於銀行裏面的候客區)
整個線程池的工做就像銀行辦理業務同樣。
ps:臨時增長的業務窗口,會先處理那些後面來的,沒位置坐的客戶。(候客區客戶os:憑什麼= =)
如下全部拒絕策略都實現了RejectedExecutionHandler接口
RejectedExcutionException
異常,阻止系統正常運行工做中應該用哪個方法來建立線程池呢?答案是一個都不用,咱們生產上只能使用自定義的。
阿里巴巴開發手冊:【強制】線程池不容許使用Executors去建立,而是經過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。
弊端以下:
- FixedThreadPool 和 SingleThreadPool 使用了無界隊列(LinkedBlockingQueue),可能會堆積大量的請求,從而致使OOM
- CacheThreadPool 和 ScheduledThreadPool 的最大線程爲 Integer.MAX_VALUE,可能會建立大量的線程,從而致使OOM
ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),//候客區3個座位 Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
//而後使用for循環,模擬10個用戶來進行請求 for (int i = 0; i < 10; i++) { final int tempInt = i; threadPool.execute(() -> { System.out.println(Thread.currentThread().getName()+" 給用戶:"+tempInt+"辦理業務"); }); }
可是用戶執行到第九個的時候,觸發了異常,程序中斷。
這是由於觸發了AbortPolicy的拒絕策略:直接報異常。
觸發條件是,請求的線程大於 阻塞隊列大小 + 最大線程數 = 8的時候,也就是說第9個線程來獲取線程池中的線程時,就會拋出異常。
也稱爲回退策略,就是用調用者所在的線程處理任務。
咱們看運行結果:
咱們發現,輸出的結果裏面出現了main線程,由於線程池觸發了拒絕策略,把任務回退到main線程,而後main線程對任務進行處理。
DiscardPolicy拒絕策略、DiscardOldestPolicy拒絕策略
這兩種策略都是把任務丟棄。
前者丟棄的是,進來排隊排不上的任務。
後者丟棄的是當前隊列中最老的任務,即排隊下一個就到你了,可是由於有人進來,致使你被丟棄了(爲何這麼慘?)
。處理邏輯以下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); //用 poll() 移除隊列中的首個元素 e.execute(r); //執行當前任務 } }
生產環境中如何配置 corePoolSize 和 maximumPoolSize?。這個是根據具體業務來配置的,分爲CPU密集型和IO密集型。
//能夠看CPU是幾核的 Runtime.getRuntime().availableProcessors();
CPU密集型也叫計算密集型,指的是系統的硬盤、內存性能比CPU要好,CPU的IO操做很快,可是CPU還有不少運算要處理,致使系統的CPU大部分都是 100%。
須要儘量少的線程數量,通常爲:CPU核數 + 1
即該任務須要大量的IO,即大量的阻塞。IO包括:數據庫交互,文件上傳下載,網絡傳輸等
IO密集型指的是系統的CPU性能相對硬盤、內存要好不少。此時大部分的情況是CPU在等IO操做,則應配置儘量多的線程,如 CPU核數 * 2。
參考公式:CPU核數 / (1 - 阻塞系統)。通常阻塞係數是0.9,好比 8 核CPU,應該配置 8 / 0.1 = 80 個線程數
若是執行方式是 execute 時,會看到堆棧異常的輸出。
當執行方式是 submit 時,堆棧異常沒有輸出。而且調用 Future.get() 方法時,能夠捕獲到異常。不會影響線程池裏面其餘線程的正常執行,線程池會把這個異常的線程移除掉,並建立一個新的線程放入線程池中。?