阻塞式ThreadPoolExecutor

最近在作一個大文件批量數據導入數據庫的時候遇到個問題,在使用ThreadPoolExecutor提交任務的時候,發如今線程池滿的時候,不能達到阻塞線程的做用。致使的後果就是文件被不斷讀取到內存,而後丟給ThreadPoolExecutor執行,因爲消費速度跟不上生產速度,致使內存不斷增加,最後OOM。java

因而開始研究ThreadPoolExecutor如何實如今任務滿的狀況下阻塞線程。數據庫

ThreadPoolExecutor類提供了多個參數用於定製化本身的線程池,經常使用的有corePoolSize,maximumPoolSize,workQueue等幾個,以下面構造函數:多線程

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

爲了實現阻塞式線程池,workQueue參數須要傳一個有界的BlockQueue,默認Executors.newFixedThreadPool()傳入的無參數LinkedBlockingQueue邊界爲Integer.MAX_VALUE,不能起到Block的效果。less

傳了有界BlockQueue以後,ThreadPoolExecutor在線程隊列Blcok的時候不會阻塞線程提交,而是調用RejectedExecutionHandler,拋出RejectedExecutionException異常。ide

JDK默認提供了4種失敗策略: 
    AbortPolicy(停止)、CallersRunPolicy(調用者運行)、DiscardPolicy(丟棄)、DiscardOldestPolicy(丟棄最舊的)
函數

JDK默認使用了AbortPolicy(停止)策略,這個能夠經過handler參數來設置。工具


這裏收集了幾種阻塞線程池提交的方法:ui

1、經過CallersRunPolicy調用策略實現this

其中CallersRunPolicy(調用者運行)方法,在線程池隊列滿了後會調用主線程來執行任務,一樣能夠達到阻塞線程提交的目的。這樣作有兩個缺點:spa

    一、執行任務的線程會是size+1個(主線程),這在有些資源敏感的場景是不被容許的

    二、因爲主線程被用於執行任務,若是這個任務比較大,會長時間阻塞主線程的執行,致使其餘線程空閒時候也不能接受新的任務,造成資源浪費

實例代碼以下:

new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy())


2、經過自定義RejectedExecutionHandler實現

經過自定義RejectedExecutionHandler,顯示調用queue.put()阻塞方法來實現線程池阻塞。這種方法可以避免CallersRunPolicy方法的兩個缺點。

示例代碼以下:

new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                });

3、經過其餘多線程工具輔助控制

好比經常使用的能夠經過信號量來控制,在提交任務的時候acquire,任務執行完後release。

這種方法的缺點是會侵入任務的執行過程

示例代碼以下:

public static void main(String[] args) throws InterruptedException, ExecutionException {
//        ExecutorService executorService = Executors.newFixedThreadPool(5);
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(10));
        final Semaphore semaphore = new Semaphore(5);
        final AtomicInteger counter = new AtomicInteger(0);
        int i = 0;
        while (true) {
            semaphore.acquire();
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        int count = counter.addAndGet(1);
                        System.out.println(Thread.currentThread() + "start, counter: " + count);
                        try {
                            Thread.sleep(1000 * 2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread() + "end, counter: " + count);
                    } finally {
                        semaphore.release();
                    }
                }
            });
            if (++i > 20) {
                break;
            }
            System.out.println("now it is " + i);
        }

        System.out.println("shotdown...");
        ExecutorUtils.shutdownAndWait(executorService, executorService.toString());
        System.out.println("Test ends.");
    }

這裏注意線程池的shutdown過程,沒有使用簡單的shutdown,由於這樣會致使部分task沒有執行完成

ExecutorUtils.shutdownAndWait方法代碼以下:

    public static void shutdownAndWait(ExecutorService executor, String name) {
        log.info("Shutting down " + name);
        executor.shutdown();
        awaitTermination(executor, name);
    }
    private static void awaitTermination(ExecutorService executor, String name) {
        try {
            while (!executor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT, TimeUnit.SECONDS)) {
                log.info("Waiting for all tasks complete execution in " + name);
            }
            log.info(name + " is shut down.");
        } catch (InterruptedException e) {
            log.error("Shutting down " + name + " failed.", e);
            Thread.currentThread().interrupt();
        }
    }
相關文章
相關標籤/搜索