採用線程池進行異步任務處理

建立線程池

阿里JAVA編碼規約,建議採用ThreadPoolExecutor建立線程池。json

private static ExecutorService simpleExecutorService = new ThreadPoolExecutor(
            200,
            300,
            0L,
            TimeUnit.MICROSECONDS,
            new LinkedBlockingDeque<Runnable>(10000),
            new ThreadPoolExecutor.DiscardPolicy());

在同步操做過程當中經過線程池完成異步操做

public void doSomething(final String message) {
        simpleExecutorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    System.out.println("step 2");
                    System.out.println("message=>" + message);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        System.out.println("step 1");
    }

進行測試

ThreadUtil threadUtil = new ThreadUtil();
threadUtil.doSomething("a thread pool demo");

輸出結果

step 1
step 2
message=>a thread pool demo

@Async

在Spring3.x以後框架已經支持採用@Async註解進行異步執行了。多線程

被@Async修飾的方法叫作異步方法,這些異步方法會在新的線程中進行處理,不影響主線程的順序執行。併發

無返回值執行

@Component
@Slf4j
public class AsyncTask {

    @Async
    public void dealNoReturnTask(){
        log.info("Thread {} deal No Return Task start", Thread.currentThread().getName());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis());
    }
}

進行調用:框架

@SpringBootTest(classes = SpringbootApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
@Slf4j
public class AsyncTest {

    @Autowired
    private AsyncTask asyncTask;

    @Test
    public void testDealNoReturnTask(){
        asyncTask.dealNoReturnTask();
        try {
            log.info("begin to deal other Task!");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

有返回值執行

@Async
    public Future<String> dealHaveReturnTask() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("thread", Thread.currentThread().getName());
        jsonObject.put("time", System.currentTimeMillis());
        return new AsyncResult<String>(jsonObject.toJSONString());
    }

判斷任務是否取消:異步

@Test
    public void testDealHaveReturnTask() throws Exception {

        Future<String> future = asyncTask.dealHaveReturnTask();
        log.info("begin to deal other Task!");
        while (true) {
            if(future.isCancelled()){
                log.info("deal async task is Cancelled");
                break;
            }
            if (future.isDone() ) {
                log.info("deal async task is Done");
                log.info("return result is " + future.get());
                break;
            }
            log.info("wait async task to end ...");
            Thread.sleep(1000);
        }
    }

異步執行結果異常處理

咱們能夠實現AsyncConfigurer接口,也能夠繼承AsyncConfigurerSupport類來實現 在方法getAsyncExecutor()中建立線程池的時候,必須使用 executor.initialize(), 否則在調用時會報線程池未初始化的異常。 若是使用threadPoolTaskExecutor()來定義bean,則不須要初始化async

@Configuration
@EnableAsync
@Slf4j
public class AsyncConfig implements AsyncConfigurer {

//    @Bean
//    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
//        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//        executor.setCorePoolSize(10);
//        executor.setMaxPoolSize(100);
//        executor.setQueueCapacity(100);
//        return executor;
//    }

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("AsyncExecutorThread-");
        executor.initialize(); //若是不初始化,致使找到不到執行器
        return executor;
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }
}

異步異常處理類:ide

@Slf4j
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params));

        if (ex instanceof AsyncException) {
            AsyncException asyncException = (AsyncException) ex;
            log.info("asyncException:{}",asyncException.getErrorMessage());
        }

        log.info("Exception :");
        ex.printStackTrace();
    }
}

@Data
@AllArgsConstructor
public class AsyncException extends Exception {
    private int code;
    private String errorMessage;
}
  • 在無返回值的異步調用中,異步處理拋出異常,AsyncExceptionHandler的handleUncaughtException()會捕獲指定異常,原有任務還會繼續運行,直到結束。
  • 在有返回值的異步調用中,異步處理拋出異常,會直接拋出異常,異步任務結束,原有處理結束執行。

Future或FutureTask

須要結合Callable測試

public class CallableDemo implements Callable<Integer> {

    private int sum;

    @Override
    public Integer call() throws Exception {
        System.out.println("Callable子線程開始計算啦!");
        Thread.sleep(2000);

        for(int i=0 ;i<5000;i++){
            sum=sum+i;
        }
        System.out.println("Callable子線程計算結束!");
        return sum;
    }
}

Future模式

//建立線程池
        ExecutorService es = Executors.newSingleThreadExecutor();
        //建立Callable對象任務
        CallableDemo calTask = new CallableDemo();

        //提交任務並獲取執行結果
        Future<Integer> future = es.submit(calTask);

        //關閉線程池
        es.shutdown();

        try {
            System.out.println("主線程在執行其餘任務");

            if (future.get() != null) {
                //輸出獲取到的結果
                System.out.println("future.get()-->" + future.get());
            } else {
                //輸出獲取到的結果
                System.out.println("future.get()未獲取到結果");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主線程在執行完成");

FutureTask模式

//建立線程池
        ExecutorService es = Executors.newSingleThreadExecutor();
        //建立Callable對象任務
        CallableDemo calTask = new CallableDemo();

        //建立FutureTask
        FutureTask<Integer> future = new FutureTask<>(calTask);
        // future.run();  // 因爲FutureTask繼承於Runable,因此也能夠直接調用run方法執行
        //執行任務
        es.submit(future); // 效果同上面直接調用run方法

        //關閉線程池
        es.shutdown();

        try {
            System.out.println("主線程在執行其餘任務");

            if (future.get() != null) {
                //輸出獲取到的結果
                System.out.println("future.get()-->" + future.get());
            } else {
                //輸出獲取到的結果
                System.out.println("future.get()未獲取到結果");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主線程在執行完成");

歸併異步執行結果

public class FutureDemo{
 
     public static void main(String[] args)  {
         Long start = System.currentTimeMillis();
         //開啓多線程
         ExecutorService exs = Executors.newFixedThreadPool(10);
         try {
             //結果集
             List<Integer> list = new ArrayList<Integer>();
             List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
             //1.高速提交10個任務,每一個任務返回一個Future入list
             for(int i=0;i<10;i++){
                 futureList.add(exs.submit(new CallableTask(i+1)));
             }

             Long getResultStart = System.currentTimeMillis();
             System.out.println("結果歸集開始時間="+new Date());
             //2.結果歸集,遍歷futureList,高速輪詢(模擬實現了併發)獲取future狀態成功完成後獲取結果,退出當前循環
             for (Future<Integer> future : futureList) {
                  //CPU高速輪詢:每一個future都併發輪循,判斷完成狀態而後獲取結果,這一行,是本實現方案的精髓所在。即有10個future在高速輪詢,完成一個future的獲取結果,就關閉一個輪詢
                 while (true) {
                    //獲取future成功完成狀態,若是想要限制每一個任務的超時時間,取消本行的狀態判斷+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超時異常使用便可。 
                    if (future.isDone()&& !future.isCancelled()) {
                         Integer i = future.get();//獲取結果
                         System.out.println("任務i="+i+"獲取完成!"+new Date());
                         list.add(i);
                         break;//當前future獲取結果完畢,跳出while
                     } else {
                          //每次輪詢休息1毫秒(CPU納秒級),避免CPU高速輪循耗空CPU---》新手別忘記這個
                         Thread.sleep(1);
                     }
                 }
             }

             System.out.println("list="+list);

             System.out.println("總耗時="+(System.currentTimeMillis()-start)+",取結果歸集耗時="+(System.currentTimeMillis()-getResultStart));
            
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
             exs.shutdown();
         }
     }
static class CallableTask implements Callable<Integer>{
         Integer i;
         
         public CallableTask(Integer i) {
             super();
             this.i=i;
         }
 
         @Override
         public Integer call() throws Exception {
             if(i==1){
                 Thread.sleep(3000);//任務1耗時3秒
             }else if(i==5){
                 Thread.sleep(5000);//任務5耗時5秒
             }else{
                 Thread.sleep(1000);//其它任務耗時1秒
             }
             System.out.println("task線程:"+Thread.currentThread().getName()+"任務i="+i+",完成!");  
             return i;
         }
     }
 }
相關文章
相關標籤/搜索