6.JDK併發包2 ......................................................................................................................1
1.線程池的基本使用 ......................................................................................................2
1.1. 爲何需要線程池...............................................................................................2
1.2. JDK爲我們提供了哪些支持 .................................................................................2
1.3. 線程池的使用.......................................................................................................2
1.3.1. 線程池的種類 ...............................................................................................2
1.3.2. 不同線程池的共同性 ...................................................................................2
1.4. 線程池使用的小例子...........................................................................................2
1.4.1. 簡單線程池 ...................................................................................................3
1.4.2. ScheduledThreadPool ....................................................................................3
2.擴展和加強線程池 .....................................................................................................3
2.1. 回調接口...............................................................................................................3
2.2. 拒絕策略...............................................................................................................3
2.3. 自定義ThreadFactory ...........................................................................................3
3.線程池及其核心代碼分析 .........................................................................................
4.ForkJoin ........................................................................................................................3
4.1. 思想.......................................................................................................................3
4.2. 使用接口...............................................................................................................4
4.2.1. RecursiveAction .............................................................................................4
4.2.2. RecursiveTask ................................................................................................4
4.3. 簡單例子...............................................................................................................4
4.4. 實現要素...............................................................................................................4
4.4.1. 工作竊取 .......................................................................................................
簡單例子
newCachedThreadPool 建立一個可緩存的線程池。默認情況下,如果線程池的大小超過了處理任務所需要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程;當任務數增加時,此線程池又可以智能地添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)能夠建立的最大線程數。
newFixedThreadPool 建立固定大小的線程池。每次提交一個任務就建立一個線程,線程可複用,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Fixed-size thread pool demo: the same task is submitted ten times to a pool of five threads.
 */
public class ThreadPollDemo {

    /**
     * Prints the current time and the id of the executing thread, then sleeps for one second
     * to simulate work.
     */
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt flag; restore it so the
                // thread running this task can still observe that it was interrupted.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Submits the task ten times and then requests an orderly shutdown of the pool.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyTask task = new MyTask();
        // Create a reusable pool with a fixed number of threads.
        ExecutorService pool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            pool.submit(task);
        }
        // shutdown() stops accepting new work; tasks already submitted still run.
        pool.shutdown();
    }
}
newSingleThreadExecutor 建立一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行全部任務。若是這個唯一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。
newScheduledThreadPool 建立一個核心線程數固定(由參數指定)的線程池。此線程池支持定時以及週期性執行任務的需求。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Scheduled thread pool demo: one task is re-run with a fixed delay between executions.
 */
public class ThreadPollDemo {

    /**
     * Prints the current time and the id of the executing thread, then sleeps for one second
     * to simulate work.
     */
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt flag; restore it so the
                // scheduler thread can still observe that it was interrupted.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Schedules the task with no initial delay and a three-second delay between runs.
     * The pool is intentionally left running so the periodic task keeps firing.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyTask task = new MyTask();
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
        // With a fixed delay, the next run is scheduled only after the previous run has
        // finished: the 3-second delay is counted from the END of one execution to the
        // START of the next, so runs never overlap.
        pool.scheduleWithFixedDelay(task, 0, 3, TimeUnit.SECONDS);
        // Alternative: run once, three seconds from now.
        // pool.schedule(task, 3, TimeUnit.SECONDS);
        // pool.shutdown();
    }
}
擴展和加強線程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates the ThreadPoolExecutor extension hooks: beforeExecute, afterExecute and terminated.
 */
public class ThreadPollDemo {

    /** A named task that prints which thread runs it and then sleeps for 100 ms. */
    public static class MyTask implements Runnable {
        public String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在執行" + ":Thread ID:" + Thread.currentThread().getId() + "task name= " + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt flag; restore it so the
                // worker thread can still observe that it was interrupted.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Runs five named tasks on a pool whose hooks log before and after each task and when the
     * pool terminates.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted between submissions
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            // The casts below rely on tasks being handed over with execute(); submit() would
            // wrap them in a FutureTask and the cast would fail.
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("準備執行" + ((MyTask) r).name);
                super.beforeExecute(t, r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                System.out.println("執行完成" + ((MyTask) r).name);
            }

            @Override
            protected void terminated() {
                super.terminated();
                System.out.println("線程池退出");
            }
        };
        try {
            for (int i = 0; i < 5; i++) {
                MyTask task = new MyTask("TASK-GEYM" + i);
                pool.execute(task);
                Thread.sleep(10);
            }
        } finally {
            // Shut down even if the submit loop is interrupted, so the non-daemon workers
            // do not keep the JVM alive.
            pool.shutdown();
        }
    }
}
準備執行TASK-GEYM0 正在執行:Thread ID:10task name= TASK-GEYM0 準備執行TASK-GEYM1 正在執行:Thread ID:11task name= TASK-GEYM1 準備執行TASK-GEYM2 正在執行:Thread ID:12task name= TASK-GEYM2 準備執行TASK-GEYM3 正在執行:Thread ID:13task name= TASK-GEYM3 準備執行TASK-GEYM4 正在執行:Thread ID:14task name= TASK-GEYM4 執行完成TASK-GEYM0 執行完成TASK-GEYM1 執行完成TASK-GEYM2 執行完成TASK-GEYM3 執行完成TASK-GEYM4 線程池退出
拒絕策略
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates a custom rejection policy: a five-thread pool backed by a SynchronousQueue
 * logs every task it cannot accept instead of throwing.
 */
public class ThreadPollDemo {

    /** A named task that prints which thread runs it and then sleeps for 100 ms. */
    public static class MyTask implements Runnable {
        public String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在執行" + ":Thread ID:" + Thread.currentThread().getId() + "task name= " + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt flag; restore it so the
                // worker thread can still observe that it was interrupted.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Submits a new task every 10 ms; tasks arriving while all five workers are busy are
     * passed to the rejection handler, which only logs them.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted between submissions
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // submit() wraps the task, so r is a FutureTask here, not a MyTask.
                        System.out.println(r.toString()+" is discard");
                    }
                });
        try {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                MyTask task = new MyTask("TASK-GEYM" + i);
                pool.submit(task);
                Thread.sleep(10);
            }
        } finally {
            // Core threads never time out; without a shutdown they would keep the JVM alive
            // after the loop ends or is interrupted.
            pool.shutdown();
        }
    }
}
正在執行:Thread ID:10task name= TASK-GEYM0 正在執行:Thread ID:11task name= TASK-GEYM1 正在執行:Thread ID:12task name= TASK-GEYM2 正在執行:Thread ID:13task name= TASK-GEYM3 正在執行:Thread ID:14task name= TASK-GEYM4 java.util.concurrent.FutureTask@7b23ec81 is discard java.util.concurrent.FutureTask@6acbcfc0 is discard java.util.concurrent.FutureTask@5f184fc6 is discard java.util.concurrent.FutureTask@3feba861 is discard 正在執行:Thread ID:10task name= TASK-GEYM9 正在執行:Thread ID:11task name= TASK-GEYM10 正在執行:Thread ID:12task name= TASK-GEYM11 正在執行:Thread ID:13task name= TASK-GEYM12 正在執行:Thread ID:14task name= TASK-GEYM13 java.util.concurrent.FutureTask@5b480cf9 is discard java.util.concurrent.FutureTask@6f496d9f is discard java.util.concurrent.FutureTask@723279cf is discard java.util.concurrent.FutureTask@10f87f48 is discard 正在執行:Thread ID:10task name= TASK-GEYM18 正在執行:Thread ID:11task name= TASK-GEYM19
自定義線程池名稱
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPollDemo { public static class MyTask implements Runnable { public String name; public MyTask(String name) { this.name = name; } @Override public void run() { System.out.println("正在執行" + ":Thread ID:" + Thread.currentThread().getId() + "task name= " + name); System.out.println(Thread.currentThread().getName()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new MssThreadFactory("個人專屬線程池"), // Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is discard"); } } ); for (int i = 0; i < Integer.MAX_VALUE; i++) { MyTask task = new MyTask("TASK-GEYM" + i); pool.submit(task); Thread.sleep(10); } // pool.shutdown(); } }
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Thread factory that names its threads {@code <prefix>-<n>}, with {@code n} counting up
 * from 1, and always produces non-daemon threads at normal priority.
 *
 * @author: dawn.he QQ: 905845006
 * @email: dawn.he@cloudwise.com
 * @email: 905845006@qq.com
 * @date: 2019/10/4 5:00 PM
 */
public class MssThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    /**
     * @param namePrefix text placed before the "-" and sequence number in every thread name
     */
    MssThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix + "-";
    }

    /**
     * Creates a new, not yet started thread for the given runnable.
     *
     * @param r the work the thread will run
     * @return a non-daemon, normal-priority thread named with this factory's prefix
     */
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
        // A new thread inherits daemon status and priority from the thread that creates it;
        // reset both so pool threads behave the same no matter who triggered their creation.
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}
ForkJoin
發送消息 RecursiveAction 無返回值
/**
 * description: send messages (RecursiveAction, no return value)
 *
 * @author: dawn.he QQ: 905845006
 * @email: dawn.he@cloudwise.com
 * @email: 905845006@qq.com
 * @date: 2019/10/4 5:12 PM
 */
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class ForkJoinPoolDemo {

    /**
     * Prints the list elements in the half-open index range [start, end). Ranges longer than
     * THRESHOLD are split in half and the two halves are processed as subtasks.
     */
    class SendMsgTask extends RecursiveAction {
        private static final int THRESHOLD = 10;
        private final int start;
        private final int end;
        private final List<String> list;

        /**
         * @param start first index to print (inclusive)
         * @param end   index after the last one to print (exclusive)
         * @param list  the messages; only read by this task
         */
        public SendMsgTask(int start, int end, List<String> list) {
            this.start = start;
            this.end = end;
            this.list = list;
        }

        @Override
        protected void compute() {
            if ((end - start) <= THRESHOLD) {
                for (int i = start; i < end; i++) {
                    System.out.println(Thread.currentThread().getName() + ": " + list.get(i));
                }
            } else {
                // start + (end - start) / 2 avoids the int overflow of (start + end) / 2.
                int middle = start + (end - start) / 2;
                // Submit both halves as a batch and wait for both to finish.
                invokeAll(new SendMsgTask(start, middle, list), new SendMsgTask(middle, end, list));
            }
        }
    }

    /**
     * Prints the numbers 1..123 through a ForkJoinPool and then shuts the pool down.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 123; i++) {
            list.add(String.valueOf(i + 1));
        }

        ForkJoinPool pool = new ForkJoinPool();
        try {
            // invoke() blocks until the whole task tree has completed.
            pool.invoke(new ForkJoinPoolDemo().new SendMsgTask(0, list.size(), list));
        } finally {
            // shutdown() must come BEFORE awaitTermination(): a pool that was never shut
            // down cannot terminate, so waiting first always burns the full timeout.
            pool.shutdown();
        }
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
計算求和
/**
 * description: summation (RecursiveTask, has a return value)
 *
 * @author: dawn.he QQ: 905845006
 * @email: dawn.he@cloudwise.com
 * @email: 905845006@qq.com
 * @date: 2019/10/4 5:25 PM
 */
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTaskDemo {

    /**
     * Sums arr over the half-open index range [start, end). Ranges longer than THRESHOLD are
     * split in half; shorter ranges are summed directly.
     */
    private static class SumTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 1000;
        private final int[] arr;
        private final int start;
        private final int end;

        /**
         * @param arr   the values to sum; only read by this task
         * @param start first index to include
         * @param end   index after the last one to include
         */
        public SumTask(int[] arr, int start, int end) {
            this.arr = arr;
            this.start = start;
            this.end = end;
        }

        /**
         * Sums this task's range sequentially and prints the subtotal.
         */
        private Integer subtotal() {
            // Primitive accumulator: an Integer here would box on every iteration.
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            System.out.println(Thread.currentThread().getName() + ": ∑(" + start + "~" + end + ")=" + sum);
            return sum;
        }

        @Override
        protected Integer compute() {
            if ((end - start) <= THRESHOLD) {
                return subtotal();
            }
            // start + (end - start) / 2 avoids the int overflow of (start + end) / 2.
            int middle = start + (end - start) / 2;
            SumTask left = new SumTask(arr, start, middle);
            SumTask right = new SumTask(arr, middle, end);
            // Fork one half and compute the other in the current worker, rather than
            // forking both and leaving this worker idle until they are joined.
            left.fork();
            int rightSum = right.compute();
            return left.join() + rightSum;
        }
    }

    /**
     * Sums the integers 1..10000 on a ForkJoinPool and prints the result.
     *
     * @param args unused
     * @throws ExecutionException   if the summation task failed
     * @throws InterruptedException if interrupted while waiting for the result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int[] arr = new int[10000];
        for (int i = 0; i < 10000; i++) {
            arr[i] = i + 1;
        }

        ForkJoinPool pool = new ForkJoinPool();
        try {
            // Submit the task to the pool.
            ForkJoinTask<Integer> result = pool.submit(new SumTask(arr, 0, arr.length));
            // get() only waits for the submitted task; invoke() here could start executing
            // the same task a second time on the calling thread.
            System.out.println("最終計算結果: " + result.get());
        } finally {
            pool.shutdown();
        }
    }
}