阿里JAVA編碼規約,建議採用ThreadPoolExecutor建立線程池。json
private static ExecutorService simpleExecutorService = new ThreadPoolExecutor( 200, 300, 0L, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<Runnable>(10000), new ThreadPoolExecutor.DiscardPolicy());
public void doSomething(final String message) { simpleExecutorService.execute(new Runnable() { @Override public void run() { try { Thread.sleep(3000); System.out.println("step 2"); System.out.println("message=>" + message); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println("step 1"); }
ThreadUtil threadUtil = new ThreadUtil(); threadUtil.doSomething("a thread pool demo");
step 1 step 2 message=>a thread pool demo
在Spring3.x以後框架已經支持採用@Async註解進行異步執行了。多線程
被@Async修飾的方法叫作異步方法,這些異步方法會在新的線程中進行處理,不影響主線程的順序執行。併發
@Component @Slf4j public class AsyncTask { @Async public void dealNoReturnTask(){ log.info("Thread {} deal No Return Task start", Thread.currentThread().getName()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis()); } }
進行調用:框架
@SpringBootTest(classes = SpringbootApplication.class) @RunWith(SpringJUnit4ClassRunner.class) @Slf4j public class AsyncTest { @Autowired private AsyncTask asyncTask; @Test public void testDealNoReturnTask(){ asyncTask.dealNoReturnTask(); try { log.info("begin to deal other Task!"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }
@Async public Future<String> dealHaveReturnTask() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } JSONObject jsonObject = new JSONObject(); jsonObject.put("thread", Thread.currentThread().getName()); jsonObject.put("time", System.currentTimeMillis()); return new AsyncResult<String>(jsonObject.toJSONString()); }
判斷任務是否取消:異步
@Test public void testDealHaveReturnTask() throws Exception { Future<String> future = asyncTask.dealHaveReturnTask(); log.info("begin to deal other Task!"); while (true) { if(future.isCancelled()){ log.info("deal async task is Cancelled"); break; } if (future.isDone() ) { log.info("deal async task is Done"); log.info("return result is " + future.get()); break; } log.info("wait async task to end ..."); Thread.sleep(1000); } }
咱們能夠實現AsyncConfigurer接口,也能夠繼承AsyncConfigurerSupport類來實現 在方法getAsyncExecutor()中建立線程池的時候,必須使用 executor.initialize(), 否則在調用時會報線程池未初始化的異常。 若是使用threadPoolTaskExecutor()來定義bean,則不須要初始化async
@Configuration @EnableAsync @Slf4j public class AsyncConfig implements AsyncConfigurer { // @Bean // public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // executor.setCorePoolSize(10); // executor.setMaxPoolSize(100); // executor.setQueueCapacity(100); // return executor; // } @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(100); executor.setQueueCapacity(100); executor.setThreadNamePrefix("AsyncExecutorThread-"); executor.initialize(); //若是不初始化,致使找到不到執行器 return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncExceptionHandler(); } }
異步異常處理類:ide
@Slf4j public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params)); if (ex instanceof AsyncException) { AsyncException asyncException = (AsyncException) ex; log.info("asyncException:{}",asyncException.getErrorMessage()); } log.info("Exception :"); ex.printStackTrace(); } } @Data @AllArgsConstructor public class AsyncException extends Exception { private int code; private String errorMessage; }
須要結合Callable測試
public class CallableDemo implements Callable<Integer> { private int sum; @Override public Integer call() throws Exception { System.out.println("Callable子線程開始計算啦!"); Thread.sleep(2000); for(int i=0 ;i<5000;i++){ sum=sum+i; } System.out.println("Callable子線程計算結束!"); return sum; } }
//建立線程池 ExecutorService es = Executors.newSingleThreadExecutor(); //建立Callable對象任務 CallableDemo calTask = new CallableDemo(); //提交任務並獲取執行結果 Future<Integer> future = es.submit(calTask); //關閉線程池 es.shutdown(); try { System.out.println("主線程在執行其餘任務"); if (future.get() != null) { //輸出獲取到的結果 System.out.println("future.get()-->" + future.get()); } else { //輸出獲取到的結果 System.out.println("future.get()未獲取到結果"); } } catch (Exception e) { e.printStackTrace(); } System.out.println("主線程在執行完成");
//建立線程池 ExecutorService es = Executors.newSingleThreadExecutor(); //建立Callable對象任務 CallableDemo calTask = new CallableDemo(); //建立FutureTask FutureTask<Integer> future = new FutureTask<>(calTask); // future.run(); // 因爲FutureTask繼承於Runable,因此也能夠直接調用run方法執行 //執行任務 es.submit(future); // 效果同上面直接調用run方法 //關閉線程池 es.shutdown(); try { System.out.println("主線程在執行其餘任務"); if (future.get() != null) { //輸出獲取到的結果 System.out.println("future.get()-->" + future.get()); } else { //輸出獲取到的結果 System.out.println("future.get()未獲取到結果"); } } catch (Exception e) { e.printStackTrace(); } System.out.println("主線程在執行完成");
public class FutureDemo{ public static void main(String[] args) { Long start = System.currentTimeMillis(); //開啓多線程 ExecutorService exs = Executors.newFixedThreadPool(10); try { //結果集 List<Integer> list = new ArrayList<Integer>(); List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); //1.高速提交10個任務,每一個任務返回一個Future入list for(int i=0;i<10;i++){ futureList.add(exs.submit(new CallableTask(i+1))); } Long getResultStart = System.currentTimeMillis(); System.out.println("結果歸集開始時間="+new Date()); //2.結果歸集,遍歷futureList,高速輪詢(模擬實現了併發)獲取future狀態成功完成後獲取結果,退出當前循環 for (Future<Integer> future : futureList) { //CPU高速輪詢:每一個future都併發輪循,判斷完成狀態而後獲取結果,這一行,是本實現方案的精髓所在。即有10個future在高速輪詢,完成一個future的獲取結果,就關閉一個輪詢 while (true) { //獲取future成功完成狀態,若是想要限制每一個任務的超時時間,取消本行的狀態判斷+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超時異常使用便可。 if (future.isDone()&& !future.isCancelled()) { Integer i = future.get();//獲取結果 System.out.println("任務i="+i+"獲取完成!"+new Date()); list.add(i); break;//當前future獲取結果完畢,跳出while } else { //每次輪詢休息1毫秒(CPU納秒級),避免CPU高速輪循耗空CPU---》新手別忘記這個 Thread.sleep(1); } } } System.out.println("list="+list); System.out.println("總耗時="+(System.currentTimeMillis()-start)+",取結果歸集耗時="+(System.currentTimeMillis()-getResultStart)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } }
static class CallableTask implements Callable<Integer>{ Integer i; public CallableTask(Integer i) { super(); this.i=i; } @Override public Integer call() throws Exception { if(i==1){ Thread.sleep(3000);//任務1耗時3秒 }else if(i==5){ Thread.sleep(5000);//任務5耗時5秒 }else{ Thread.sleep(1000);//其它任務耗時1秒 } System.out.println("task線程:"+Thread.currentThread().getName()+"任務i="+i+",完成!"); return i; } } }