ThreadPoolExecutor原理及使用

你們先從ThreadPoolExecutor的整體流程入手: 

針對ThreadPoolExecutor代碼,咱們來看下execute方法:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
	//poolSize大於等於corePoolSize時不增長線程,反之新初始化線程
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
	    //線程執行狀態外爲執行,同時能夠添加到隊列中
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
	    //poolSize大於等於corePoolSize時,新初始化線程
            else if (!addIfUnderMaximumPoolSize(command))
		//沒法添加初始化執行線程,怎麼執行reject操做(調用RejectedExecutionHandler)
                reject(command); // is shutdown or saturated
        }
    }

 咱們再看下真正的線程執行者(Worker):

private final class Worker implements Runnable {
	/**
         * Runs a single task between before/after methods.
         */
        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                /*
                 * If pool is stopping ensure thread is interrupted;
                 * if not, ensure thread is not interrupted. This requires
                 * a double-check of state in case the interrupt was
                 * cleared concurrently with a shutdownNow -- if so,
                 * the interrupt is re-enabled.
                 */
		 //當線程池的執行狀態爲關閉等,則執行當前線程的interrupt()操做
                if ((runState >= STOP ||
                    (Thread.interrupted() && runState >= STOP)) &&
                    hasRun)
                    thread.interrupt();
                /*
                 * Track execution state to ensure that afterExecute
                 * is called only if task completed or threw
                 * exception. Otherwise, the caught runtime exception
                 * will have been thrown by afterExecute itself, in
                 * which case we don't want to call it again.
                 */
                boolean ran = false;
                beforeExecute(thread, task);
                try {
		    //任務執行
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

        /**
         * Main run loop
         */
        public void run() {
            try {
                hasRun = true;
                Runnable task = firstTask;
                firstTask = null;
		//判斷是否存在須要執行的任務
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
		//若是沒有,則將工做線程移除,當poolSize爲0是則嘗試關閉線程池
                workerDone(this);
            }
        }
    }

    /* Utilities for worker thread control */

    /**
     * Gets the next task for a worker thread to run.  The general
     * approach is similar to execute() in that worker threads trying
     * to get a task to run do so on the basis of prevailing state
     * accessed outside of locks.  This may cause them to choose the
     * "wrong" action, such as trying to exit because no tasks
     * appear to be available, or entering a take when the pool is in
     * the process of being shut down.  These potential problems are
     * countered by (1) rechecking pool state (in workerCanExit)
     * before giving up, and (2) interrupting other workers upon
     * shutdown, so they can recheck state. All other user-based state
     * changes (to allowCoreThreadTimeOut etc) are OK even when
     * performed asynchronously wrt getTask.
     *
     * @return the task
     */
    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
		//當線程池大於corePoolSize,同時,存在執行超時時間,則等待相應時間,拿出隊列中的線程
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
		//阻塞等待隊列中能夠取到新線程
                    r = workQueue.take();
                if (r != null)
                    return r;
		//判斷線程池運行狀態,若是大於corePoolSize,或者線程隊列爲空,也或者線程池爲終止的工做線程能夠銷燬
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

     /**
     * Performs bookkeeping for an exiting worker thread.
     * @param w the worker
     */
     //記錄執行任務數量,將工做線程移除,當poolSize爲0是則嘗試關閉線程池
    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize == 0)
                tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

 

 經過上述代碼,總結下四個關鍵字的用法

  • corePoolSize 核心線程數量

線程保有量,線程池總永久保存執行線程的數量 java

  • maximumPoolSize 最大線程數量

最大線程量,線程最多不能超過此屬性設置的數量,當大於線程保有量後,會新啓動線程來知足線程執行。 緩存

  • 線程存活時間

獲取隊列中任務的超時時間,當閾值時間內沒法獲取線程,則會銷燬處理線程,前提是線程數量在corePoolSize 以上 多線程

  • 執行隊列

執行隊列是針對任務的緩存,任務在提交至線程池時,都會壓入到執行隊列中。因此這裏你們最好設置下隊列的上限,防止溢出 app

 

ThreadPoolExecuter的幾種實現

 

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  •  CachedThreadPool 執行線程不固定,
     好處:能夠把新增任務所有緩存在一塊兒,
     壞處:只能用在短期完成的任務 (佔用時間較長的操做能夠致使線程數無限增大,系統資源耗盡)
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  •  單線程線程池
       好處:針對單cpu,單線程避免系統資源的搶奪
       壞處:多cpu多線程時,不能徹底利用cpu資源
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
  •     固定長度線程池
        好處:線程數量固定,不會存在線程重複初始化
        壞處:沒有對隊列大小進行限制,線程初始化後,不再能回收線程資源
相關文章
相關標籤/搜索