淺析Java高併發下ScheduleThreadPoolExecutor延時任務

Java中的計劃任務Timer工具類提供了以計時器或計劃任務的功能來實現按指定時間或時間間隔執行任務,但因爲Timer工具類並非以池pool方式實現的,而是以隊列的方式來管理線程的,因此在高併發的狀況下運行效率較低,在JDK 1.5版本之後提供了ScheduledExecutorService對象來解決效率與定時任務的性能問題。java

這篇文章咱們主要討論ScheduledExecutorService的使用技巧以及一些經常使用的線程池操做方法,後面的文章會繼續對執行器進行深刻的交流探討。程序員

image

Executors 工具類提供了兩個經常使用的ScheduledThreadPoolExecutor併發

這兩個經常使用的ScheduledThreadPoolExecutor:SingleThreadScheduledExecutor(單線程的線程池)、ScheduledThreadPool(線程數量固定的線程池),下面是 Executors 對應的源代碼。dom

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory arg) {
    return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, arg));
}

public static ScheduledExecutorService newScheduledThreadPool(int arg) {
    return new ScheduledThreadPoolExecutor(arg);
}

public static ScheduledExecutorService newScheduledThreadPool(int arg, ThreadFactory arg0) {
    return new ScheduledThreadPoolExecutor(arg, arg0);
}

ScheduledExecutorService是一個接口,繼承於ExecutorService,支持線程池的全部功能,同時也提供了四個用於計劃任務調度的核心方法。ide

下面我將介紹這四個方法的使用和一些經常使用的線程池方法:高併發

一、schedule runnable工具

帶延遲時間的調度,只執行一次,返回值爲實現Future接口的對象,可調用Future.get()方法阻塞直到任務執行完畢性能

/**
 * 建立並執行在給定延遲後啓用的一次性操做
 *
 * @param command 要執行的任務 
 * @param delay 從如今開始延遲執行的時間 
 * @param unit 延時參數的時間單位 
 * @return 表示任務等待完成,而且其的ScheduledFuture get()方法將返回 null 
 * @throws RejectedExecutionException 若是任務沒法安排執行 
 * @throws NullPointerException 若是命令爲空 
 */
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

schedule runnable使用示例this

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture<?> future = scheduled.schedule(() -> {
        try {
            System.out.println("開始執行任務");
            TimeUnit.SECONDS.sleep(3);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("執行完畢");
    }, 1000, TimeUnit.MILLISECONDS);
    System.out.println("阻塞開始");
    System.out.println(future.get() + "");
    System.out.println("阻塞結束");
}

執行結果以下:spa

阻塞開始
開始執行任務
執行完畢
null
阻塞結束

schedule runnable,這個方法是不提供返回值的,因此調用future.get()方法返回的是null

二、schedule callable

帶延遲時間的調度,只執行一次,返回值爲實現Future接口的對象,調用Future.get()方法阻塞直到任務完成,能夠獲取到返回結果

/**
 * 建立並執行在給定延遲後啓用的ScheduledFuture
 *
 * @param callable 執行的功能 
 * @param delay 從如今開始延遲執行的時間 
 * @param unit 延遲參數的時間單位 
 * @param <V> the 可調用結果的類型 
 * @return一個可用於提取結果或取消的ScheduledFuture 
 * @throws RejectedExecutionException 若是該任務沒法安排執行 
 * @throws NullPointerException 若是callable爲空 
 */
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

schedule Callable 使用示例

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture<String> future = scheduled.schedule(() -> {
        try {
            System.out.println("開始執行任務");
            TimeUnit.SECONDS.sleep(3);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("執行完畢");
        return "success";
    }, 1000, TimeUnit.MILLISECONDS);
    System.out.println("阻塞開始");
    System.out.println(future.get() + "");
    System.out.println("阻塞結束");
}

執行結果:

阻塞開始
開始執行任務
執行完畢
success
阻塞結束

schedule callable 是帶返回值的,經過future.get()獲取

三、scheduleAtFixedRate

建立並執行一個在給定初始延遲後的按期操做,也就是將在 initialDelay 後開始執行,而後在initialDelay+period 後下一個任務執行,接着在 initialDelay + 2 * period 後執行,依此類推 ,也就是隻在第一次任務執行時有延時。

/**
 * @param command 要執行的任務 
 * @param initialDelay 首次執行的延遲時間
 * @param period 連續執行之間的週期
 * @param unit initialDelay和period參數的時間單位 
 * @return 一個ScheduledFuture表明待完成的任務,其 get()方法將在取消時拋出異常 
 * @throws RejectedExecutionException 若是任務沒法安排執行 
 * @throws NullPointerException 若是命令爲空 
 * @throws IllegalArgumentException 若是period小於或等於零 
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

scheduleAtFixedRate使用示例

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(5);
    ScheduledFuture<?> future = scheduled.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("開始執行任務");
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("執行完畢");
        }
    }, 1000L, 1000L, TimeUnit.MILLISECONDS);
    System.out.println("阻塞開始");
    System.out.println(future.get() + "");
    System.out.println("阻塞結束");    
}

打印結果以下:

阻塞開始
開始執行任務
執行完畢
開始執行任務
執行完畢
開始執行任務
執行完畢
....

四、scheduleWithFixedDelay

建立並執行一個在給定初始延遲後首次啓用的按期操做,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲,即總時間是(initialDelay + period)* n

/**
 * @param command 要執行的任務 
 * @param initialDelay 首次執行的延遲時間
 * @param delay 一次執行終止和下一次執行開始之間的延遲
 * @param unit initialDelay和delay參數的時間單位
 * @return 表示掛起任務完成的ScheduledFuture,而且其get()方法在取消後將拋出異常
 * @throws RejectedExecutionException 若是任務不能安排執行 
 * @throws NullPointerException 若是command爲null
 * @throws IllegalArgumentException 若是delay小於等於0
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

scheduledWithFixedDelay使用示例

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(5);
    ScheduledFuture<?> future = scheduled.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("開始執行任務");
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("執行完畢");
        }
    }, 1000L, 1000L, TimeUnit.MILLISECONDS);
    System.out.println("阻塞開始");
    System.out.println(future.get() + "");
    System.out.println("阻塞結束");
}

打印結果以下:

阻塞開始
開始執行任務
執行完畢
開始執行任務
執行完畢
開始執行任務
執行完畢
....

scheduleAtFixedRate和scheduleWithFixedDelay的區別在於,scheduleAtFixedRate()爲固定頻率,scheduleWithFixedDelay()爲固定延遲。固定頻率是相對於任務執行的開始時間,而固定延遲是相對於任務執行的結束時間,這就是他們最根本的區別!

五、線程池關閉,shutdown()和shutdownNow()的使用

兩個關閉線程池的方法,一旦線程池被關閉,就會拒絕之後提交的全部任務

使用shutdown()可使用awaitTermination等待全部線程執行完畢當前任務。在shutdown之前已提交任務的執行中發起一個有序的關閉,可是不接受新任務。

使用shutdownNow()嘗試中止全部正在執行的任務、暫停等待任務的處理,並返回等待執行的任務列表。對於正在運行,嘗試經過中斷該線程來結束線程。對於還沒有運行的任務,則都再也不執行。

class PrintThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "PrintThreadFactory");
    }
}
public static void main(String[] args) {
    final AtomicInteger count = new AtomicInteger(0);
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1, new PrintThreadFactory());
    Runnable runnable = () -> {
        System.out.println("print " + count.getAndIncrement());
        if (count.get() == 3) {
            countDownLatch.countDown();
            System.out.println("任務繼續...");
            try {
                Thread.sleep(3000L);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("任務結束");
        }
    };
    schedule.scheduleAtFixedRate(runnable, 0L, 2L, TimeUnit.SECONDS);
    try {
        countDownLatch.await();
        schedule.shutdown();  //平滑中止線程,不處理新任務,完成正在執行的任務
//      schedule.shutdownNow();  // 嘗試強制中止線程,讓終止的線程去設置休眠會拋出異常

        if (schedule.isShutdown()) {
            System.out.println("Scheduled is shutdown");
        }
        if (schedule.awaitTermination(10L, TimeUnit.SECONDS)) {
            System.out.println("termination");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

經過Future.cancel()取消運行任務

cancel()方法接收參數是布爾型的,傳入true會中斷線程中止任務,傳入false則會讓線程正常執行至完成。這裏傳入false既然不會中斷線程,那麼這個cancel方法不就沒有意義了?

class PrintThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "PrintThreadFactory");
    }
}
public static void main(String[] args) {
    final AtomicInteger count = new AtomicInteger(0);
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1, new PrintThreadFactory());
    Runnable runnable = () -> {
        System.out.println("print " + count.getAndIncrement());
        if (count.get() == 3) {
            countDownLatch.countDown();
        }
    };
    Future future = schedule.scheduleAtFixedRate(runnable, 0L, 2L, TimeUnit.SECONDS);
    try {
        countDownLatch.await();
        future.cancel(true);
        if (future.isCancelled()) {
            System.out.println("is Cancelled");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

簡單來講傳入false參數只能取消還沒開始的任務,若任務已經開始了,就由其運行下去。因此對於已經開始的任務,若是想要中止的話,須要給cancel方法的參數設置爲true。

六、ScheduledThreadPoolExecutor參數使用

image

continueExistingPeriodicTasksAfterShutdown,對於經過scheduleAtFixedRate、scheduleWithFixedDelay 提交的週期任務有效 默認值爲false,設置爲true表示當執行器調用shutdown後,繼續執行延時任務;

與之對應的get和set方法

void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
boolean getContinueExistingPeriodicTasksAfterShutdownPolicy()

executeExistingDelayedTasksAfterShutdown,對於經過schedule()方法提交的延時任務有效,默認爲true,設置爲false表示當執行器調用shutdown後,再也不繼續執行現有延遲任務;

與之對應的get和set方法

void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy()

removeOnCancel,默認爲false,設置爲true則從隊列中刪除執行任務;

與之對應的get和set方法

void setRemoveOnCancelPolicy(boolean value)
boolean getRemoveOnCancelPolicy()

使用示例

public static void main(String[] args) {

        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);

        Runnable runnable = () -> System.out.println(Thread.currentThread().getName() + "_1");

//        ScheduledFuture<?> future = executor.schedule(runnable, 3, TimeUnit.SECONDS);
        // 對於經過schedule()方法提交的延時任務有效,默認爲true,設置爲false表示當執行器調用shutdown後,再也不繼續執行現有延遲任務
//        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);

//        System.out.println(executor.getQueue().size());
//        // 默認爲false,設置爲true則從隊列中刪除執行任務
//        executor.setRemoveOnCancelPolicy(false);
//        future.cancel(true);
//        System.out.println(executor.getQueue().size());

        executor.scheduleAtFixedRate(runnable, 1L, 1L, TimeUnit.SECONDS);
//        //對於經過scheduleAtFixedRate、scheduleWithFixedDelay 提交的週期任務有效 默認值爲false,設置爲true表示當執行器調用shutdown後,繼續執行延時任務
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);

        executor.shutdown();

        System.out.println("線程池中止");
    }

七、其餘

零延時的 execute()、submit() 方法

execute()、submit() 方法都被重寫了,本質上調用的仍是 schedule() 方法;從下面的源碼能夠看出,這兩個方法提交的任務都是延時爲0的 「實時任務」;

public void execute(Runnable arg0) {
    this.schedule(arg0, 0L, TimeUnit.NANOSECONDS);
}
public Future<?> submit(Runnable arg0) {
    return this.schedule(arg0, 0L, TimeUnit.NANOSECONDS);
}

封裝計劃任務線程池工具類

下面是使用單例模式封裝的工具類

public final class MyScheduledExecutor {

    // 全局用於處理接收Future對象的集合
    private ConcurrentHashMap<String, Future> futureMap = new ConcurrentHashMap<>();

    // 計劃執行任務
    private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5);

    private MyScheduledExecutor() {
    }

    // 設計爲單例模式
    private static final class InnerExecutorService {
        private static final MyScheduledExecutor INSTANCE = new MyScheduledExecutor();
    }
    public static MyScheduledExecutor getInstance() {
        return InnerExecutorService.INSTANCE;
    }

    public ConcurrentHashMap<String, Future> getFutureMap() {
        return futureMap;
    }

    public void shutdown() {
        executorService.shutdown();
    }

    /**
     * 執行任務
     * @param runnable {@code Runnable}
     */
    public void execute(Runnable runnable) {
        executorService.execute(runnable);
    }

    /**
     * 執行延時任務
     *
     * @param runnable {@code Runnable}
     * @param delay    延遲時間
     * @param timeUnit 時間單位
     */
    public void scheduler(Runnable runnable, long delay, TimeUnit timeUnit) {
        executorService.schedule(runnable, delay, timeUnit);
    }

    /**
     * 執行延時週期性任務scheduleAtFixedRate
     *
     * @param runnable     {@code ScheduledExecutorService.JobRunnable}
     * @param initialDelay 延遲時間
     * @param period       週期時間
     * @param timeUnit     時間單位
     * @param <T>          {@code ScheduledExecutorService.JobRunnable}
     */
    public <T extends JobRunnable> void scheduleAtFixedRate(T runnable, long initialDelay, long period, TimeUnit timeUnit) {
        Future future = executorService.scheduleAtFixedRate(runnable, initialDelay, period, timeUnit);
        futureMap.put(runnable.getJobId(), future);
    }

    /**
     * 執行延時週期性任務scheduleWithFixedDelay
     *
     * @param runnable     {@code ScheduledExecutorService.JobRunnable}
     * @param initialDelay 延遲時間
     * @param period       週期時間
     * @param timeUnit     時間單位
     * @param <T>          {@code ScheduledExecutorService.JobRunnable}
     */
    public <T extends JobRunnable> void scheduleWithFixedDelay(T runnable, long initialDelay, long period, TimeUnit timeUnit) {
        Future future = executorService.scheduleWithFixedDelay(runnable, initialDelay, period, timeUnit);
        futureMap.put(runnable.getJobId(), future);
    }

    public static abstract class JobRunnable implements Runnable {
        private String jobId;

        public JobRunnable(String jobId) {
            this.jobId = jobId;
        }

        public void terminal() {
            try {
                Future future = MyScheduledExecutor.getInstance().getFutureMap().remove(jobId);
                future.cancel(true);
            } finally {
                System.out.println("jobId " + jobId + " had cancel");
            }
        }

        public String getJobId() {
            return jobId;
        }
    }
}

調用示例

public static void main(String[] args) throws Exception {

    MyScheduledExecutor service = MyScheduledExecutor.getInstance();
    service.execute(() -> System.out.println("execute"));
//    service.scheduler(new Runnable() {
//        @Override
//        public void run() {
//            for (Map.Entry<String, Future> next : service.getFutureMap().entrySet()) {
//                String key = next.getKey();
//                int i = Integer.parseInt(key.substring(3));
//                // 中止部分線程
//                if (i % 2 == 0) {
//                    next.getValue().cancel(true);
//                }
//            }
//        }
//    }, 20, TimeUnit.SECONDS);

    for (int i = 0; i < 5; i++) {
        int num = new Random().nextInt(500);
        service.scheduleAtFixedRate(new MyScheduledExecutor.JobRunnable("scheduleAtFixedRate" + num) {
            @Override
            public void run() {
                System.out.println(num);
            }
        }, 10, 2, TimeUnit.SECONDS);
    }
    Thread.sleep(15000);
    for (Map.Entry<String, Future> next : service.getFutureMap().entrySet()) {
        String key = next.getKey();
        int i = Integer.parseInt(key.substring(3));
        // 中止部分線程
        if (i % 2 == 0) {
            next.getValue().cancel(true);
        }
    }
    Thread.sleep(20000);
    service.shutdown();
}

總結

須要注意,經過ScheduledExecutorService執行的週期任務,若是任務執行過程當中拋出了異常,那麼ScheduledExecutorService就會中止執行任務,且也不會再週期地執行該任務了。因此若是想保住任務都一直被週期執行,那麼catch一切可能的異常。

若是文章的內容對你有幫助,歡迎關注公衆號:優享JAVA(ID:YouXiangJAVA),那裏有更多的技術乾貨,並精心準備了一份程序員書單。期待你的到來!

相關文章
相關標籤/搜索