ThreadPoolExecutor 建立Worker Thread的策略

ThreadPoolExecutor建立新線程的條件:

  • coreThreadCount < 已建立線程數 < maxThreadCount, 或者 已建立線程數 < maxThreadCount 且 阻塞隊列已滿

所以若是 使用無界阻塞隊列,那麼線程數將只會達到 coreThreadCount(若是coreThreadCount == 0,那麼會建立一個線程),而不會繼續建立到 maxThreadCount.java

例如:
ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

事實上這個pool只會建立一個線程,這個pool事實上是串行執行的.

能夠參見 ThreadPoolExecutor.execute(Runnable)方法:ide

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

java 1.6 之後能夠如此解決:

// 注意,這裏 coreThreadCount 設置爲和 maxThreadCount 同樣大
ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

// 這個的做用是使得coreThread和非coreThread採用同樣的idle超時destroy策略.
pool.allowCoreThreadTimeOut(true);

注意: 這樣作的特色是ui

  • 提交一個task後,只要當前的 線程數(pool size)小於 core size,那麼就會建立一個新的Worker Thread(緣由參見上面列出的execute方法實現).
  • 就上面的例子來看,假設目前線程池裏面已有4個線程,那麼再次提交一個任務後(execute或者submit),線程池都會建立一個新的Worker Thread,即便那4個線程目前處於idle狀態.若是這不符合你的要求,能夠採用下面的方案: java 1.5 及之前
  • allowCoreThreadTimeOut(true) 的做用是當線程空閒時間超過設置的idle時間後,就會銷燬該線程,即便當前線程數小於core size.所以空閒一段時間後,線程池中的線程將會減小到0.

java 1.5 及之前:

private static class SpecialBlockingQueue<T> extends LinkedBlockingQueue<T> {
	private ThreadPoolExecutor executor;
	
	public void setExecutor(ThreadPoolExecutor executor) {
	    synchronized (this) {
	        if(this.executor != null) {
	            throw new IllegalStateException("You can only call setExecutor() once!");
	        }
	        if(executor == null) {
	            throw new NullPointerException("executor argument can't be null!");
	        }
	        this.executor = executor;
	    }
	}
	
	@Override
	public boolean offer(T t) {
	    synchronized (this) {
	        if(executor == null) {
	            throw new NullPointerException("you must call setExecutor() before use it!");
	        }
	
	        int poolSize = executor.getPoolSize();
	        int activeSize = executor.getActiveCount();
	        int maxSize = executor.getMaximumPoolSize();
	
	        if(activeSize == poolSize && poolSize < maxSize) {
	            return false; // let ThreadPoolExecutor create a new Thread
	        }
	    }
	    return super.offer(t);
	}
}

// 注意,這裏 coreThreadCount 就能夠直接設置爲 0
SpecialBlockingQueue<Runnable> specialBlockingQueue = new SpecialBlockingQueue<>();
ThreadPoolExecutor tpe = new ThreadPoolExecutor(0, 5, 2, TimeUnit.SECONDS, specialBlockingQueue, new ThreadFactory() {
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("ThreadPoolExecutor-" + t.getId() + "-" + System.identityHashCode(t));
        return t;
    }
}, new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            e.printStackTrace();// will never reach here
        }
    }
});
specialBlockingQueue.setExecutor(tpe);

備註

  • 若是線程池使用比較頻繁,那麼可能會出現Thread剛被銷燬就須要建立的狀況,能夠考慮使用Executors.newFixedThreadPool()
  • 若是但願線程池中的線程數能夠降爲0.(非 daemon 的Thread將會阻止JVM退出)那麼能夠考慮上面講的方案或者Executors.newCachedThreadPool()
相關文章
相關標籤/搜索