We all know what a thread pool is used for. In Java, the core thread pool class is ThreadPoolExecutor.
Though we usually don't use this class directly, we still need to understand its basic mechanism.
As we know, a thread pool lets us reuse threads that have already been created, so it must have some basic policy to control the number of threads in the pool:
1. If fewer than corePoolSize threads are running, always try to start a new thread with the submitted task as its first task.
2. If corePoolSize or more threads are running, the pool tries to add the task to a blocking queue. If the task is queued successfully, we still need to double-check whether we should have added a thread (because existing ones died since the last check) or whether the pool shut down since entry into this method.
3. If we cannot queue the task (for example, because the queue is full), we try to add a new thread, as long as the pool stays within maximumPoolSize. If that fails, we know the pool is shut down or saturated, so the task is rejected.
4. We can define the rejection policy applied when the pool is saturated, for example: throw an exception, run the task directly in the calling thread, or silently discard the rejected task.
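The four rules above can be observed from the outside with a deliberately tiny pool. The following is a minimal sketch, not part of the JDK source: the class name PoolGrowthDemo and the sizing (1 core thread, at most 2 threads, queue capacity 1) are assumptions chosen only to make each rule fire once. It relies on the default rejection policy, which throws RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    // Appends the current thread count and queue length to the trace.
    private static void snapshot(ThreadPoolExecutor pool, StringBuilder trace) {
        trace.append("threads=").append(pool.getPoolSize())
             .append(" queued=").append(pool.getQueue().size())
             .append('\n');
    }

    // Submits four blocking tasks and returns a trace of how the pool reacted.
    static String run() {
        // Hypothetical sizing: 1 core thread, at most 2 threads, queue capacity 1.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        try {
            pool.execute(blocker); // rule 1: below corePoolSize, start a thread
            snapshot(pool, trace);
            pool.execute(blocker); // rule 2: core thread is busy, task is queued
            snapshot(pool, trace);
            pool.execute(blocker); // rule 3: queue is full, add a second thread
            snapshot(pool, trace);
            try {
                pool.execute(blocker); // rule 4: saturated, task is rejected
                trace.append("accepted\n");
            } catch (RejectedExecutionException e) {
                trace.append("rejected\n");
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```

The blocking tasks keep every worker occupied, so the thread count and queue length stay stable between submissions; with short-lived tasks the numbers would depend on timing.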
Each thread pool has two important conceptual fields:
1: workerCount: the effective number of threads in the pool.
2: runState: one of five states that indicate whether the pool is running, shutting down, etc.
The runState provides the main lifecycle control, taking on the values RUNNING, SHUTDOWN, STOP, TIDYING and TERMINATED.
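The runState itself is private, but part of the lifecycle is visible through the public ExecutorService methods. The sketch below (the class name LifecycleDemo is an assumption for illustration) shows that after shutdown() an already submitted task still completes, a new task is rejected, and the pool eventually reports termination.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {

    // Returns a trace of what the public API reports before and after shutdown().
    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        StringBuilder trace = new StringBuilder();

        Future<String> pending = pool.submit(() -> "done");
        trace.append("isShutdown=").append(pool.isShutdown()).append('\n');

        pool.shutdown(); // stop accepting new tasks, keep running submitted ones
        trace.append("isShutdown=").append(pool.isShutdown()).append('\n');

        try {
            pool.submit(() -> "late");
            trace.append("late task accepted\n");
        } catch (RejectedExecutionException e) {
            trace.append("late task rejected\n");
        }

        try {
            // The task submitted before shutdown() still produces its result.
            trace.append("pending=").append(pending.get()).append('\n');
            trace.append("terminated=")
                 .append(pool.awaitTermination(5, TimeUnit.SECONDS))
                 .append('\n');
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```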
Constructor:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // omit some checks
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
corePoolSize: the number of threads to keep in the pool, even if they are idle, unless 'allowCoreThreadTimeOut' is set.
maximumPoolSize: the maximum number of threads to allow in the pool.
keepAliveTime: when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
unit: the time unit for the keepAliveTime argument.
workQueue: the blocking queue used to hold tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute(Runnable runnable) method.
threadFactory: the factory used when the executor creates a new thread.
handler: the user-defined policy applied to a task that is rejected because the pool is saturated or shut down.
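To make the parameters concrete, here is a minimal sketch that calls the seven-argument constructor directly. The class name ConstructorDemo, the thread name prefix "demo-worker-" and all sizes are assumptions for illustration; CallerRunsPolicy is one of the rejection handlers shipped with the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConstructorDemo {

    // Builds a pool with every constructor argument spelled out.
    static ThreadPoolExecutor newPool() {
        AtomicInteger sequence = new AtomicInteger();
        // Hypothetical naming scheme so pool threads are easy to spot in logs.
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, "demo-worker-" + sequence.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        return new ThreadPoolExecutor(
                2,                                        // corePoolSize
                4,                                        // maximumPoolSize
                30L, TimeUnit.SECONDS,                    // keepAliveTime + unit
                new ArrayBlockingQueue<Runnable>(8),      // workQueue (bounded)
                factory,                                  // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy() // handler
        );
    }

    // Runs one task and returns the name of the thread that executed it.
    static String workerName() {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName());
    }
}
```

A bounded queue is used here on purpose: with an unbounded queue, rule 3 never triggers, so maximumPoolSize and the handler would rarely come into play while the pool is running.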
As mentioned above, a thread pool has two important fields, workerCount and runState. The JDK doesn't declare a separate field for each, though; it packs both into a single AtomicInteger named 'ctl'.
1: An int has 32 bits. runState is stored in the 3 high-order bits and workerCount in the 29 low-order bits. The bitwise OR operator '|' combines the two into one value.
2: The benefit of packing the two values into one variable is that both can be read and updated together in a single atomic operation, which avoids inconsistencies between them when multiple threads are involved.
// The initial state of the thread pool is RUNNING and workerCount is 0.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// The runState provides the main lifecycle control, taking on values:
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
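The packing scheme can be tried outside the executor. The sketch below copies the constants and helper methods shown above into a standalone class (the name CtlPackingDemo is an assumption) and prints the packed value for a running pool with five workers, so the 3 state bits and 29 count bits can be inspected directly.

```java
class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;    // 29 bits for the count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;

    // Keeps only the 3 high-order bits.
    static int runStateOf(int c)     { return c & ~CAPACITY; }
    // Keeps only the 29 low-order bits.
    static int workerCountOf(int c)  { return c & CAPACITY; }
    // Combines a run state and a worker count into one int.
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("packed bits : " + Integer.toBinaryString(c));
        System.out.println("workerCount : " + workerCountOf(c));
        System.out.println("is RUNNING  : " + (runStateOf(c) == RUNNING));
        // The states are ordered numerically, so comparisons such as
        // rs >= SHUTDOWN work on the packed value's state bits.
        System.out.println("RUNNING < SHUTDOWN < STOP : "
                + (RUNNING < SHUTDOWN && SHUTDOWN < STOP));
    }
}
```

Because RUNNING is the only negative state, a check for "still running" reduces to comparing the state against SHUTDOWN.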
Every task submitted to the thread pool ends up in the 'execute(Runnable command)' method, which runs the task:
public void execute(Runnable command) {
    // Cannot be null
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1. Fewer than corePoolSize threads: try to start a new thread for this task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. Otherwise try to queue the task, then re-check the pool state
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. Could not queue: try to add a thread up to maximumPoolSize, else reject
    else if (!addWorker(command, false))
        reject(command);
}
As the code shows, execute() relies on the addWorker(Runnable firstTask, boolean core) method to create and start the worker threads:
// Set containing all worker threads in the pool
private final HashSet<Worker> workers = new HashSet<Worker>();

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Pre-check: each time, make sure the pool is not shut down.
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty())) {
            return false;
        }

        for (;;) {
            int wc = workerCountOf(c);
            // Depending on the 'core' parameter, make sure workerCount does not exceed its bound
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c)) // workerCount + 1
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask); // create one Worker
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); // start the thread
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
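The inner loop of addWorker is a classic compare-and-set retry: read the current value, check the bound, and attempt the increment, starting over if another thread changed the value in between. The sketch below isolates that pattern with a plain AtomicInteger. It is a simplified illustration rather than the JDK code: the class BoundedCounterDemo and its methods are assumptions, and the run-state check is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounterDemo {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounterDemo(int limit) {
        this.limit = limit;
    }

    // Increments the counter unless the limit is reached; never overshoots.
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;                 // bound reached, give up
            if (count.compareAndSet(c, c + 1))
                return true;                  // our increment won
            // else another thread changed the value first; re-read and retry
        }
    }

    // Lets several threads compete and returns how many increments succeeded.
    static int race(int limit, int threads, int attemptsPerThread) {
        BoundedCounterDemo counter = new BoundedCounterDemo(limit);
        AtomicInteger successes = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++) {
                    if (counter.tryIncrement())
                        successes.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(race(100, 8, 1000));
    }
}
```

Doing the bound check and the increment against the same observed value is what keeps the count from exceeding the limit without taking a lock.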
As addWorker shows, each thread is wrapped in the inner class Worker. Part of its code is extracted here:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    // Thread this worker is running in
    final Thread thread;
    // Initial task to run; possibly null
    Runnable firstTask;
    // Per-thread task counter
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    // lock methods omitted
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Loop: after running its first task, this thread keeps taking
        // tasks from the blocking queue until getTask() returns null
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // hook
                Throwable thrown = null;
                try {
                    task.run(); // just execute the task
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
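The beforeExecute and afterExecute calls in runWorker are protected hooks that a subclass may override. A minimal sketch, assuming a hypothetical subclass named HookedPool that simply records the order in which the hooks and the task run:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookedPool extends ThreadPoolExecutor {
    // Synchronized because hooks run on worker threads.
    final List<String> events = Collections.synchronizedList(new ArrayList<String>());

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        events.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        events.add(thrown == null ? "after" : "after (failed)");
    }

    // Runs one task through the pool and returns the recorded event order.
    static List<String> run() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> pool.events.add("task"));
        pool.shutdown();
        try {
            // Wait for the worker to exit so that afterExecute has run.
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(pool.events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Both hooks run on the worker thread that executes the task, which is why the shared list is synchronized.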
That is the general flow of how a thread pool executes a task. As mentioned, we usually don't use ThreadPoolExecutor directly, because the JDK already provides the Executors utility class to create different kinds of thread pools for different requirements.
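A pool created through the utility class can still be inspected with the ThreadPoolExecutor getters. The sketch below assumes that Executors.newFixedThreadPool returns a ThreadPoolExecutor instance; that holds for the OpenJDK implementation as far as I know, but the return type only promises ExecutorService, so the code checks before casting. The class name FixedPoolInspection is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FixedPoolInspection {

    // Describes how a fixed pool of the given size is configured.
    static String describe(int nThreads) {
        ExecutorService service = Executors.newFixedThreadPool(nThreads);
        try {
            // Assumption: the factory hands back a plain ThreadPoolExecutor.
            if (!(service instanceof ThreadPoolExecutor)) {
                return "not a ThreadPoolExecutor";
            }
            ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
            return "core=" + pool.getCorePoolSize()
                    + " max=" + pool.getMaximumPoolSize()
                    + " queue=" + pool.getQueue().getClass().getSimpleName();
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(5));
    }
}
```

In a fixed pool the core and maximum sizes are equal, so under the rules described earlier the pool never grows beyond its core threads and extra tasks simply wait in the queue.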
A simple example that creates a fixed-size thread pool:
// Use the utility class to create a thread pool
ExecutorService executorService = Executors.newFixedThreadPool(5);
// Submit one asynchronous task
Future<String> future = executorService.submit(new Callable<String>() {
    @Override
    public String call() {
        System.out.println("Select * from tableName");
        // Mock wait
        LockSupport.parkNanos(1000);
        System.out.println("Select Done!");
        return "Query Data";
    }
});
// Get the result (blocks until the task has finished)
String returnStr = future.get();
System.out.println(returnStr);
// Shut down the pool
System.out.println(executorService.isShutdown());
executorService.shutdown();
System.out.println(executorService.isShutdown());
Henry
22:53:34