最近在作一個大文件批量數據導入數據庫的時候遇到個問題,在使用ThreadPoolExecutor提交任務的時候,發如今線程池滿的時候,不能達到阻塞線程的做用。致使的後果就是文件被不斷讀取到內存,而後丟給ThreadPoolExecutor執行,因爲消費速度跟不上生產速度,致使內存不斷增加,最後OOM。java
因而開始研究ThreadPoolExecutor如何實如今任務滿的狀況下阻塞線程。數據庫
ThreadPoolExecutor類提供了多個參數用於定製化本身的線程池,經常使用的有corePoolSize,maximumPoolSize,workQueue等幾個,以下面構造函數:多線程
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
爲了實現阻塞式線程池,workQueue參數須要傳一個有界的BlockQueue,默認Executors.newFixedThreadPool()傳入的無參數LinkedBlockingQueue邊界爲Integer.MAX_VALUE,不能起到Block的效果。less
傳了有界BlockQueue以後,ThreadPoolExecutor在線程隊列Blcok的時候不會阻塞線程提交,而是調用RejectedExecutionHandler,拋出RejectedExecutionException異常。ide
JDK默認提供了4種失敗策略:
AbortPolicy(停止)、CallersRunPolicy(調用者運行)、DiscardPolicy(丟棄)、DiscardOldestPolicy(丟棄最舊的)函數
JDK默認使用了AbortPolicy(停止)策略,這個能夠經過handler參數來設置。工具
這裏收集了幾種阻塞線程池提交的方法:ui
1、經過CallersRunPolicy調用策略實現this
其中CallersRunPolicy(調用者運行)方法,在線程池隊列滿了後會調用主線程來執行任務,一樣能夠達到阻塞線程提交的目的。這樣作有兩個缺點:spa
一、執行任務的線程會是size+1個(主線程),這在有些資源敏感的場景是不被容許的
二、因爲主線程被用於執行任務,若是這個任務比較大,會長時間阻塞主線程的執行,致使其餘線程空閒時候也不能接受新的任務,造成資源浪費
實例代碼以下:
new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy())
2、經過自定義RejectedExecutionHandler實現
經過自定義RejectedExecutionHandler,顯示調用queue.put()阻塞方法來實現線程池阻塞。這種方法可以避免CallersRunPolicy方法的兩個缺點。
示例代碼以下:
new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { throw new RuntimeException(e); } } });
3、經過其餘多線程工具輔助控制
好比經常使用的能夠經過信號量來控制,在提交任務的時候acquire,任務執行完後release。
這種方法的缺點是會侵入任務的執行過程
示例代碼以下:
public static void main(String[] args) throws InterruptedException, ExecutionException { // ExecutorService executorService = Executors.newFixedThreadPool(5); ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10)); final Semaphore semaphore = new Semaphore(5); final AtomicInteger counter = new AtomicInteger(0); int i = 0; while (true) { semaphore.acquire(); executorService.submit(new Runnable() { @Override public void run() { try { int count = counter.addAndGet(1); System.out.println(Thread.currentThread() + "start, counter: " + count); try { Thread.sleep(1000 * 2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + "end, counter: " + count); } finally { semaphore.release(); } } }); if (++i > 20) { break; } System.out.println("now it is " + i); } System.out.println("shotdown..."); ExecutorUtils.shutdownAndWait(executorService, executorService.toString()); System.out.println("Test ends."); }
這裏注意線程池的shutdown過程,沒有使用簡單的shutdown,由於這樣會致使部分task沒有執行完成
ExecutorUtils.shutdownAndWait方法代碼以下:
public static void shutdownAndWait(ExecutorService executor, String name) { log.info("Shutting down " + name); executor.shutdown(); awaitTermination(executor, name); } private static void awaitTermination(ExecutorService executor, String name) { try { while (!executor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT, TimeUnit.SECONDS)) { log.info("Waiting for all tasks complete execution in " + name); } log.info(name + " is shut down."); } catch (InterruptedException e) { log.error("Shutting down " + name + " failed.", e); Thread.currentThread().interrupt(); } }