ThreadPool實現原理

本文主要分析java.util.concurrent.ThreadPoolExecutor的實現原理,首先看它的構造函數:java

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:線程池中穩定保存的線程數(一開始會小於這個數)
  • maximumPoolSize:線程池中最大線程數
  • keepAliveTime and unit:大於最小線程數的線程空閒後存活時間
  • workQueue:用於存聽任務的阻塞隊列
  • threadFactory:用於建立線程的工廠類
  • handler:當任務隊列滿了且線程數達到了最大時的飽和策略

對於IO密集型任務,線程數通常設爲CPU數*2,對於計算密集型任務,線程數通常設爲CPU數。函數

當調用execute方法時:ui

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

其流程如圖:this

clipboard.png

建立線程是經過addWorker建立內部Worker類,其中調用getThreadFactory().newThread(this)來建立執行本身的線程,以後在addWorker中start該線程,執行Worker run方法中的runWorker會不斷的從任務隊列中獲取任務或阻塞,而且每次執行任務前會執行beforeExecute,以後會afterExecute,能夠經過重寫beforeExecute方法來給執行線程重命名。atom

線程池狀態變化如圖:spa

  • RUNNING: Accept new tasks and process queued tasks
  • SHUTDOWN: Don't accept new tasks, but process queued tasks
  • STOP: Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
  • TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
  • TERMINATED: terminated() has completed

clipboard.png

shutdownNow終止線程的方法是經過調用Thread.interrupt()方法來實現的:線程

* <p> If this thread is blocked in an invocation of the {@link
 * Object#wait() wait()}, {@link Object#wait(long) wait(long)}, or {@link
 * Object#wait(long, int) wait(long, int)} methods of the {@link Object}
 * class, or of the {@link #join()}, {@link #join(long)}, {@link
 * #join(long, int)}, {@link #sleep(long)}, or {@link #sleep(long, int)},
 * methods of this class, then its interrupt status will be cleared and it
 * will receive an {@link InterruptedException}.
 *
 * <p> If this thread is blocked in an I/O operation upon an {@link
 * java.nio.channels.InterruptibleChannel InterruptibleChannel}
 * then the channel will be closed, the thread's interrupt
 * status will be set, and the thread will receive a {@link
 * java.nio.channels.ClosedByInterruptException}.
 *
 * <p> If this thread is blocked in a {@link java.nio.channels.Selector}
 * then the thread's interrupt status will be set and it will return
 * immediately from the selection operation, possibly with a non-zero
 * value, just as if the selector's {@link
 * java.nio.channels.Selector#wakeup wakeup} method were invoked.
 *
 * <p> If none of the previous conditions hold then this thread's interrupt
 * status will be set. </p>

能夠看到若是線程處於正常活動狀態,那麼會將該線程的中斷標誌設置爲true,而沒法中斷當前的線程。因此,shutdownNow並不表明線程池就必定當即就能退出,它也可能必需要等待全部正在執行的任務都執行完成了才能退出。code

相關文章
相關標籤/搜索