一、背景:
重構了一個項目,將之前散亂的多線程和定時任務線程作統一管理,減小代碼量並提高代碼可讀性。java
二、直接上代碼 - 可直接COPY使用 ----------------------------緩存
-、新建一個線程池構造工廠類 ExecutorServiceFactory:安全
package com.aaa.bbb.ccc.ddd;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Function : 線程池構造工廠
* @Author & @Date : lynn_ - 2018年6月14日
*/
public class ExecutorServiceFactory {
// 實例化ExecutorServiceFactory
private static ExecutorServiceFactory executorServiceFactory = new ExecutorServiceFactory();
// 定時任務線程池
private ExecutorService executorService;
/**
* 默認無參構造
*/
private ExecutorServiceFactory() { }
/**
* @Function: 獲取ExecutorServiceFactory
*/
public static ExecutorServiceFactory getInstance() {
return executorServiceFactory;
}
/**
* @Function: 建立一個定長的線程池 - 它可安排在給定延遲後運行命令或者按期地執行
*/
public ExecutorService createScheduledThreadPool() {
// CPU個數
int availableProcessors = Runtime.getRuntime().availableProcessors();
// 建立
executorService = Executors.newScheduledThreadPool(availableProcessors * 10, getThreadFactory());
return executorService;
}
/**
* @Function: 建立一個單線程化的Executor,即只建立惟一的工做者線程來執行任務,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行
* Executor,以無界隊列方式來運行該線程。(注意,若是由於在關閉前的執行期間出現失敗而終止了此單個線程,
* 那麼若是須要,一個新線程將代替它執行後續的任務 )。可保證順序地執行各個任務,
* 而且在任意給定的時間不會有多個線程是活動的。與其餘等效的 newFixedThreadPool(1)
* 不一樣,可保證無需從新配置此方法所返回的執行程序便可使用其餘的線程。
*/
public ExecutorService createSingleThreadExecutor() {
// 建立
executorService = Executors.newSingleThreadExecutor(getThreadFactory());
return executorService;
}
/**
* @Function: 建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程
* 對於執行不少短時間異步任務的程序而言,這些線程池一般可提升程序性能。調用 execute將重用之前構造的線程(若是線程可用)。
* 若是現有線程沒有可用的,則建立一個新線程並添加到池中。 終止並從緩存中移除那些已有 60 秒鐘未被使用的線程。
* 所以,長時間保持空閒的線程池不會使用任何資源。注意,可使用 ThreadPoolExecutor
* 構造方法建立具備相似屬性但細節不一樣(例如超時參數)的線程池。
*/
public ExecutorService createCachedThreadPool() {
// 建立
executorService = Executors.newCachedThreadPool(getThreadFactory());
return executorService;
}
/**
* @Function: 建立一個指定工做線程數量的線程池。每當提交一個任務就建立一個工做線程,
* 若是工做線程數量達到線程池初始的最大數,則將提交的任務存入到池隊列中。
* 可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程。在任意點,在大多數 nThreads
* 線程會處於處理任務的活動狀態。若是在全部線程處於活動狀態時提交附加任務,則在有可用線程以前,附加任務將在隊列中等待。
* 若是在關閉前的執行期間因爲失敗而致使任何線程終止,那麼一個新線程將代替它執行後續的任務(若是須要)。
* 在某個線程被顯式地關閉以前,池中的線程將一直存在。
*/
public ExecutorService createFixedThreadPool(int count) {
// 建立
executorService = Executors.newFixedThreadPool(count, getThreadFactory());
return executorService;
}
/**
* @Function: 獲取線程池工廠
*/
private ThreadFactory getThreadFactory() {
return new ThreadFactory() {
// AtomicInteger是一個提供原子操做的Integer類,經過線程安全的方式操做加減
AtomicInteger sn = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
// 安全管理器
SecurityManager s = System.getSecurityManager();
// 全部線程都隸屬於一個線程組
ThreadGroup group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
Thread t = new Thread(group, r);
t.setName("任務線程 - " + sn.incrementAndGet());
return t;
}
};
}
}
複製代碼
--、新建一個線程處理類 ExecutorProcessPool:多線程
package com.aaa.bbb.ccc.ddd;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
* @Function : 線程處理類
* @Author & @Date : lin.li - 2018年6月14日
*/
public class ExecutorProcessPool {
//線程池接口
private ExecutorService executor;
//實例化當前類
private static ExecutorProcessPool pool = new ExecutorProcessPool();
/**
* Creates a new instance of ExecutorProcessPool
*/
private ExecutorProcessPool() {
executor = ExecutorServiceFactory.getInstance().createCachedThreadPool();
}
/**
* @Function: 獲取 ExecutorProcessPool
*/
public static ExecutorProcessPool getInstance() {
return pool;
}
/**
* @Function: 關閉線程池,這裏要說明的是:調用關閉線程池方法後,線程池會執行完隊列中的全部任務才退出
*/
public void shutdown(){
executor.shutdown();
}
/**
* @Function: 提交任務到線程池,能夠接收線程返回值 - Future模式
*/
public Future<?> submit(Runnable task) {
return executor.submit(task);
}
public Future<?> submit(Callable<?> task) {
return executor.submit(task);
}
/**
* @Function: 直接提交任務到線程池,無返回值
*/
public void execute(Runnable task){
executor.execute(task);
}
}
複製代碼
---、核心代碼就這點了,下面是測試類 ExecutorTest:dom
package com.goldpac.ito.system.interceptor;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* @Function : 線程池測試類
* @Author & @Date : lin.li - 2018年6月11日
*/
public class ExecutorTest {
public static void main(String[] args) {
//獲取實例
ExecutorProcessPool pool = ExecutorProcessPool.getInstance();
//for循環添加多個線程 -
for (int i = 0; i < 200; i++) {
Future<?> future = pool.submit(new ExcuteTask1(i + ""));
try {
//若是接收線程返回值,future.get() 會阻塞,若是這樣寫就是一個線程一個線程執行。因此非特殊狀況不建議使用接收返回值的。
System.out.println(future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
//沒有返回值。能夠執行任務,但沒法判斷任務是否成功完成。
//execute(Runnable x);
//for (int i = 0; i < 200; i++) {
// pool.execute(new ExcuteTask2(i + ""));
//}
// 關閉線程池,若是是須要長期運行的線程池,不用調用該方法。
// 監聽程序退出的時候最好執行一下。
pool.shutdown();
}
/**
* 執行任務1,實現Callable方式
*/
static class ExcuteTask1 implements Callable<String> {
private String taskName;
public ExcuteTask1(String taskName) {
this.taskName = taskName;
}
@Override
public String call() throws Exception {
try {
// Java 6/7最佳的休眠方法爲TimeUnit.MILLISECONDS.sleep(100);
// 最好不要用 Thread.sleep(100);
// 1000毫秒之內的隨機數,模擬業務邏輯處理
TimeUnit.MILLISECONDS.sleep((int) (Math.random() * 1000));
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("-------------這裏執行業務邏輯,Callable TaskName = " + taskName + "-------------");
return ">>>>>>>>>>>>>線程返回值,Callable TaskName = " + taskName + "<<<<<<<<<<<<<<";
}
}
/**
* 執行任務2,實現Runable方式
*/
static class ExcuteTask2 implements Runnable {
private String taskName;
public ExcuteTask2(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
try {
// 1000毫秒之內的隨機數,模擬業務邏輯處理
TimeUnit.MILLISECONDS.sleep((int) (Math.random() * 1000));
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("-------------這裏執行業務邏輯,Runnable TaskName = " + taskName + "-------------");
}
}
}複製代碼