爲了更好地控制多線程,JDK提供了一套線程框架Executor,幫助開發人員有效的進行線程控制。它們都在java.util.concurrent包中,是JDK併發包的核心。其中有一個比較重要的類:Executors,它扮演着線程工廠的角色,咱們經過Executors能夠建立特定功能的線程池。
Executors建立線程池方法:java
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
使用例子:緩存
class Temp extends Thread { @Override public void run() { System.out.println("run..."); } } public class ScheduledJob { public static void main(String[] args) { Temp command = new Temp(); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); scheduledExecutorService.scheduleWithFixedDelay(command, 1, 3, TimeUnit.SECONDS); //1秒後執行線程,以後每隔3秒輪詢 } }
運行結果:
run...
run...
run...
run...
run...
run...
run...
...多線程
若Executors工廠類沒法知足咱們的需求,能夠本身去建立自定義的線程池。其實Executors工廠類裏面的建立線程方法其內部實現均是用了ThreadPoolExecutor這個類,這個類能夠自定義線程,構造方法以下:併發
public ThreadPoolExecutor(int corePoolSize, //表示當前建立的核心線程數 int maximumPoolSize, //表示最大線程數 long keepAliveTime, //線程池空閒時存活時間 TimeUnit unit, //指定時間單位 BlockingQueue<Runnable> workQueue, //緩存隊列 ThreadFactory threadFactory, // RejectedExecutionHandler handler) { //拒絕執行的方法 ... ... }
這個構造方法對於隊列是什麼類型的比較關鍵:框架
public class UserThreadPoolExecutor { public static void main(String[] args) { /** * 使用有界任務隊列時:如有新的任務須要執行,若是線程池實際線程數小於corePoolSize,則優先建立線程; * 若大於corePoolSize,則會將任務加入隊列, * 若隊列已滿,則在總線程數不大於maximumPoolSize的前提下,建立新的線程, * 若線程數大於maximumPoolSize,則執行拒絕策略。 */ ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, //coreSize 2, //maxSize 60, //無效時間 TimeUnit.SECONDS, //單位 new ArrayBlockingQueue<Runnable >(3) //有界隊列 ); pool.execute(new MyTask(1, "任務1")); pool.execute(new MyTask(2, "任務2")); pool.execute(new MyTask(3, "任務3")); pool.execute(new MyTask(4, "任務4")); pool.execute(new MyTask(5, "任務5")); pool.execute(new MyTask(6, "任務6")); pool.shutdown(); } }
運行結果:
run taskId = 1
run taskId = 5
Exception in thread "main"
java.util.concurrent.RejectedExecutionException: Task Thread[Thread-5,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@70dea4e[Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at Executor.UserThreadPoolExecutor.main(UserThreadPoolExecutor.java:28)
run taskId = 2
run taskId = 3
run taskId = 4
分析:
任務數大於coreSize(目前爲1),則有任務加入隊列,又隊列(隊列容量爲3)已滿,則建立一個線程(目前coreSize爲2),因爲maxSize爲2,全部最多隻能再建立一個線程到線程池(目前coreSize+queue=5小於任務數6),沒法再建立線程,執行拒絕策略ide
public class UserThreadPoolExecutor2 implements Runnable{ /** * 當有新任務到來,系統線程數小於corePoolSize時,則新建線程執行任務, * 當達到corePoolSize後,就不會繼續增長, * 若後續仍有新的任務加入,而又沒有空閒的線程資源,則任務直接進入隊列等待。 * 若任務建立和處理的速度差別很大,無界隊列會保持快速增加,知道耗盡系統內存 */ private static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); ThreadPoolExecutor pool = new ThreadPoolExecutor( 5, 10, 120L, //2分鐘 TimeUnit.SECONDS, queue ); for (int i = 0; i < 20; i++) { pool.execute(new UserThreadPoolExecutor2()); } Thread.sleep(1000); System.out.println("queue size : " + queue.size()); Thread.sleep(2000); //pool.shutdown(); } @Override public void run() { try { int num = count.incrementAndGet(); System.out.println("任務" + num); Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
運行結果:
任務2
任務4
任務1
任務3
任務5
queue size : 15
任務6
任務8
任務7
任務9
任務10
任務11
任務12
任務13
任務14
任務15
任務16
任務18
任務19
任務17
任務20線程
JDK拒絕策略:日誌
若是須要自定義拒絕策略,能夠實現RejectedExecutionHandle接口:code
public class MyReject implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("自定義處理..."); System.out.println("當前被拒絕任務爲:" + r.toString()); //記錄日誌,等待其餘時間處理 } } public class UserThreadPoolExecutor1 { public static void main(String[] args) throws InterruptedException { /** * 使用有界任務隊列時:如有新的任務須要執行,若是線程池實際線程數小於corePoolSize,則優先建立線程; * 若大於corePoolSize,則會將任務加入隊列, * 若隊列已滿,則在總線程數不大於maximumPoolSize的前提下,建立新的線程, * 若線程數大於maximumPoolSize,則執行拒絕策略。 */ ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, //coreSize 2, //maxSize 60, //無效時間 TimeUnit.SECONDS, //單位 new ArrayBlockingQueue<Runnable>(3), //有界隊列 new MyReject() ); pool.execute(new MyTask(1, "任務1")); pool.execute(new MyTask(2, "任務2")); pool.execute(new MyTask(3, "任務3")); pool.execute(new MyTask(4, "任務4")); pool.execute(new MyTask(5, "任務5")); pool.execute(new MyTask(6, "任務6")); pool.shutdown(); } }
運行結果:
自定義處理...
當前被拒絕任務爲:Thread[Thread-5,5,main]
run taskId = 5
run taskId = 1
run taskId = 2
run taskId = 3
run taskId = 4對象