所以若是 使用無界阻塞隊列,那麼線程數將只會達到 coreThreadCount(若是coreThreadCount == 0,那麼會建立一個線程),而不會繼續建立到 maxThreadCount.java
例如: ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); 事實上這個pool只會建立一個線程,這個pool事實上是串行執行的.
能夠參見 ThreadPoolExecutor.execute(Runnable)方法:ide
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
// 注意,這裏 coreThreadCount 設置爲和 maxThreadCount 同樣大 ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); // 這個的做用是使得coreThread和非coreThread採用同樣的idle超時destroy策略. pool.allowCoreThreadTimeOut(true);
注意: 這樣作的特色是ui
即便那4個線程目前處於idle狀態
.若是這不符合你的要求,能夠採用下面的方案: java 1.5 及之前
private static class SpecialBlockingQueue<T> extends LinkedBlockingQueue<T> { private ThreadPoolExecutor executor; public void setExecutor(ThreadPoolExecutor executor) { synchronized (this) { if(this.executor != null) { throw new IllegalStateException("You can only call setExecutor() once!"); } if(executor == null) { throw new NullPointerException("executor argument can't be null!"); } this.executor = executor; } } @Override public boolean offer(T t) { synchronized (this) { if(executor == null) { throw new NullPointerException("you must call setExecutor() before use it!"); } int poolSize = executor.getPoolSize(); int activeSize = executor.getActiveCount(); int maxSize = executor.getMaximumPoolSize(); if(activeSize == poolSize && poolSize < maxSize) { return false; // let ThreadPoolExecutor create a new Thread } } return super.offer(t); } } // 注意,這裏 coreThreadCount 就能夠直接設置爲 0 SpecialBlockingQueue<Runnable> specialBlockingQueue = new SpecialBlockingQueue<>(); ThreadPoolExecutor tpe = new ThreadPoolExecutor(0, 5, 2, TimeUnit.SECONDS, specialBlockingQueue, new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("ThreadPoolExecutor-" + t.getId() + "-" + System.identityHashCode(t)); return t; } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { e.printStackTrace();// will never reach here } } }); specialBlockingQueue.setExecutor(tpe);
Executors.newFixedThreadPool()
上面講的方案
或者Executors.newCachedThreadPool()