深刻剖析ThreadPool的運行原理

線程在執行任務時,正常的狀況是這樣的:編程

Thread  t=new Thread(new  Runnable() {            
            @Override
            public void run() {
                // TODO Auto-generated method stub    
            }
        });
        
        t.start();

  Thread 在初始化的時候傳入一個Runnable,之後就沒有機會再傳入一個Runable了。那麼,woker做爲一個已經啓動的線程。是如何不斷獲取Runnable的呢?
這個時候可使用一個包裝器,將線程包裝起來,在Run方法內部獲取任務。多線程

public final class Worker implements Runnable {
    Thread thread = null;
    Runnable task;
    private BlockingQueue<Runnable> queues;
    public Worker(Runnable task, BlockingQueue<Runnable> queues) {
        this.thread = new Thread(this);
        this.task = task;
        this.queues = queues;
    }
    public void run() {
        if (task != null) {
            task.run();
        } 
            try {
                while (true) {
                    task = queues.take();
                    if (task != null) {
                        task.run();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    public void start() {
        this.thread.start();
    }
}

public class Main {
    public static void main(String[] args) {
        BlockingQueue<Runnable> queues=new ArrayBlockingQueue<Runnable>(100);
        Worker  worker=new Worker(new Runnable() {
            public void run() {
                System.out.println("hello!!! ");
                try {
                    Thread.currentThread().sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }            
            }
        }, queues);
        worker.start();
        for(int i=0;i<100;i++){
            queues.offer(new Runnable() {
                public void run() {
                    System.out.println("hello!!! ");
                    try {
                        Thread.currentThread().sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

}

  這樣咱們就簡單地實現了一個「線程池」(能夠將這個「線程池」改形成官方的模式,不過能夠本身嘗試一下)。ThreadPool的這種實現模式是併發編程中經典的Cyclic Work Distribution模式。
  那麼,這種實現的線程池性能如何呢?
  因爲其任務隊列使用的是阻塞隊列,在隊列內部是自旋的。Reeteenlok是改進的CLH隊列。自旋鎖會耗費必定CPU的資源,在擁有大量任務執行下的狀況下比較有效。並且,線程池中的線程並無睡眠,而是進入了自旋狀態。併發

CPU的線程與關係

  若是是不支持超線程的CPU,在同一時刻的確只能處理2個線程,可是並不意味着雙核的CPU只能處理兩個線程,它能夠經過切換上下文來執行多個線程。好比我只有一個大腦,可是我要處理5我的提交的任務,我能夠處理完A的事情後,把事情的中間結果保存下,而後再處理B的,而後再讀取A的中間結果,處理A的事情。ide

JDK中的線程池實現分析

  Woker自身繼承了Runnable,並對Thread作了一個包裝。Woker代碼以下所示:函數

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        private static final long serialVersionUID = 6138294804551838833L;

    
        Runnable firstTask;
   
        volatile long completedTasks;

 
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        public void run() {
            runWorker(this);
        }
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

  execute(Runnable command)方法內部是這樣的:oop

public void execute(Runnable command) {
       if (command == null)
           throw new NullPointerException();
     
       int c = ctl.get();
       if (workerCountOf(c) < corePoolSize) {
           if (addWorker(command, true))
               return;
           c = ctl.get();
       }
       if (isRunning(c) && workQueue.offer(command)) {
           int recheck = ctl.get();
           if (! isRunning(recheck) && remove(command))
               reject(command);
           else if (workerCountOf(recheck) == 0)
               addWorker(null, false);
       }
       else if (!addWorker(command, false))
           reject(command);
   }

  ctl一個合併類型的值。將當前線程數和線程池狀態經過數學運算合併到了一個值。具體是如何合併的能夠參看一下源碼,這裏就不敘述了。繼續向下走:性能

if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

  能夠看到,若是當前線程數量小於了核心線程數量corePoolSize,就直接增長線程處理任務。與隊列沒有關係。可是緊接着又檢查了一遍狀態,由於在這個過程當中,別的線程也可能在添加任務。繼續向下走:ui

if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

  能夠看到若是線程池是運行態的,就把線程添加到任務隊列。workQueue是構造函數傳遞過來的,能夠是有界隊列,也能夠是無界隊列。能夠看出來,隊列若是是無界的,直接往隊列裏面添加任務,這個時候,線程池中的線程也不會增長,一直會等於核心線程數。
  若是隊列是有界的,就嘗試直接新增線程處理任務,若是添加任務失敗,就調用reject方法來處理添加失敗的任務:this

else if (!addWorker(command, false))
            reject(command);

  來看看addWorker是如何實現的,邏輯流程已經直接在註釋中說明了。線程

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //若是狀態大於SHUTDOWN,再也不接受新的任務,直接返回
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

/**根據core來判斷,若是當前線程數量大於corePoolSize或者最大線程數,直接返回。添加任務失敗。
**若是隊列是有界的或者任務添加到隊列失敗(參數core是false),那麼就會新開一個線程處理業務,但若是線程已經大於了maximumPoolSize,就會出現添加失敗,返回false。
*/

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;

                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  若是建立失敗的狀況下會調用addWorkerFailed方法,從而將減小實際線程數。

addWorker中for循環的意義

  在addWorker中有這麼一段代碼,表示爲當前線程數加1:

private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

  因爲多線程可能同時操做。expect值可能會變化。僅僅一次的操做compareAndIncrementWorkerCount可能一次並不會成功,並且,一個線程在執行addWork的過程當中間,另一個線程假設直接shotdown這個線程池。for循環的存在能夠保證狀態必定是一致的。

任務的執行

在Worker中間其實是調用的runWorker方法來執行的具體業務:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

  能夠看到while循環不斷的從隊列中取出任務執行。若是task==null 而且getTask()等於null的話,那麼就會跳出循環,進入到processWorkerExit,run方法執行完畢之後,這個線程也被銷燬了。可是爲何在各自的線程執行,爲何還須要加鎖呢?答案是由於要線程池須要判斷這個線程是否在執行任務。在interruptIdleWorkers方法中,要中斷那寫目前空閒的線程,經過當前Worker是否得到了鎖就能判斷這個worker是不是空閒的:

private void interruptIdleWorkers(boolean onlyOne) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
          for (Worker w : workers) {
              Thread t = w.thread;
              if (!t.isInterrupted() && w.tryLock()) {
                  try {
                      t.interrupt();
                  } catch (SecurityException ignore) {
                  } finally {
                      w.unlock(); //中斷不起做用。interrupt()對於自旋鎖是不起做用的。只是邏輯上被阻塞,
                  }
              }
              if (onlyOne)
                  break;
          }
      } finally {
          mainLock.unlock();
      }
  }

  能夠看到,若是w.tryLock()能夠獲取到鎖,那麼就意味着當前的 Woker並無處理任務(沒有進入到循環裏面或者被getTask方法所阻塞,沒法獲取鎖)。
Work之因此繼承AbstractQueuedSynchronizer,而不去使用ReentrantLock。是由於ReentrantLock是可重入鎖,在調用lock方法獲取鎖以後,再調用tryLock()仍是會返回true。

public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        System.out.println(lock.tryLock());        
    }

輸出結果是true,因此使用ReentrantLock則難以判斷當前Worker是否在執行任務。

線程超時allowCoreThreadTimeOut、keepAliveTime以及線程死亡

  在上面的interruptIdleWorkers方法中,線程被中斷。普通的線程被中斷會致使線程繼續執行,從而run方法運行完畢,線程退出。

對於一個沒有被阻塞的線程,中斷是不起做用的。中斷在以下線程被阻塞的方法中起做用:
the wait(),
wait(long),
wait(long, int)
join(),
join(long),
join(long, int),
sleep(long),
or sleep(long, int)
LockSupport.park(Object object);
LockSupport.park();

  ,若是喚醒這些被阻塞的線程,從而能使得run方法繼續執行,當run方法執行完畢,那麼線程也就終結死亡。可是對於ReentrantLock和AbstractQueuedSynchronizer這種自旋+CAS實現的「邏輯鎖」,是不起做用的。
並且runWork自己也是While循環,靠中斷是沒法退出循環的。

  可是在ThreadPoolExecutor的構造函數中,有一個容許設置線程超時allowCoreThreadTimeOut參數的方法。若是容許超時,多於corePoolSize的線程將會在處在空閒狀態以後存活keepAliveTime時長後終止。所以有了一個allowCoreThreadTimeOut方法:

public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }

   正如上面提到的同樣,容許allowCoreThreadTimeOut而且調用interruptIdleWorkers方法並不能使線程退出。那麼線程池又如何殺掉這個線程呢?
  沒錯,就是getTask方法。只有當getTask返回null的時候才能跳出While循環,run方法運行完畢,那麼線程天然而然就死亡了。getTask方法以下所示:

private Runnable getTask() {
      boolean timedOut = false; // Did the last poll() time out?
      for (;;) {
          int c = ctl.get();
          int rs = runStateOf(c);

          // Check if queue empty only if necessary.
          if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
              decrementWorkerCount();
              return null;
          }

          int wc = workerCountOf(c);
          // Are workers subject to culling?
          boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
          if ((wc > maximumPoolSize || (timed && timedOut))
              && (wc > 1 || workQueue.isEmpty())) {
              if (compareAndDecrementWorkerCount(c))
                  return null;
              continue;
          }
          try {
              Runnable r = timed ?
                  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                  workQueue.take();
              if (r != null)
                  return r;
              timedOut = true;
          } catch (InterruptedException retry) {
              timedOut = false;
          }
      }
  }

  能夠看到,若是線程池狀態大於SHUTDOWN而且隊列空,返回null,從而結束循環。(線程死亡)

  或者狀態大於SHUTDOWN而且線程大於STOP(STOP必定大於SHUTDOWN,因此能夠直接說線程大於STOP)返回null,從而結束循環。(線程死亡)
再往下能夠看到若是超過了maximumPoolSize,返回null,從而結束循環。(線程死亡)
超過keepAliveTime時間,任務對列沒有數據而返回null。從而結束循環。(線程死亡)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;保證了線程池至少留下corePoolSize個線程。

預約義的拒接處理協議

  在execute方法中,若是線程池拒絕添加任務,就會有一個鉤子方法來處理被拒絕的任務。
能夠本身定義,也可使用線城池中默認的拒接處理協議。

AbortPolicy :直接拋出RejectedExecutionException異常;

CallerRunsPolicy:誰調用的execute方法,誰就執行這個任務;

DiscardPolicy:直接丟棄,什麼也不作;

DiscardOldestPolicy:丟棄對列中間最老的任務,執行新任務。

有什麼問題或者建議,能夠加入小密圈和我一塊兒討論,或者在簡書留言,歡迎喜歡和打賞。

最後向你們安利一本我寫的關於Java併發的書籍:Java併發編程系統與模型,我的以爲寫得不錯,比較通俗易懂,很是適合初學者,百度閱讀能夠下載電子書。

相關文章
相關標籤/搜索