[轉載] Java線程池框架源碼分析

轉載自http://www.linuxidc.com/Linux/2014-11/108791.htmlinux

相關類Executor,Executors,AbstractExecutorService,ExecutorService緩存

Executor:整個線程池執行者框架的頂層接口。定義了一個execute方法,整個線程執行者框架的核心方法。安全

public interface Executor {框架

    void execute(Runnable command);
}函數

ExecutorService:這是一個接口它繼承自Executor,定義了shutdown,shutdownNow,awaitTermination,submit,invokeAll等方法。this

AbstractExecutorService:實現了ExecutorService接口中的submit,invokeAll的方法。線程

  public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }設計

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }htm


    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }對象

在這裏,全部submit方法提交的任務最終仍是調用了execute方法,execute是接口Executor中定義的方法,AbstractExecutorService沒有實現它,

須要子類去實現這個方法,ThreadPoolExecutor繼承了AbstractExecutorService,它實現了execute方法。ScheduledThreadPoolExecutor繼承自

ThreadPoolExecutor,並覆蓋了ThreadPoolExecutor的execute方法。這個方法是線程執行框者架的核心邏輯,不一樣的線程池執行者有不一樣的實現邏輯。

AbstractExecutorService的功能較爲簡單,實現了不一樣參數的submit,invokeAll方法。

ThreadPoolExecutor線程池執行者:它有一個核心的成員變量:

private final HashSet<Worker> workers = new HashSet<Worker>();

workers能夠看作是ThreadPoolExecutor中用於運行任務的線程池。

worker是一個封裝了一個Thread對象並實現了Runnable接口的類。封裝Thread很容易理解,由於它要利用Thread去運行execute方法提交過來的runnable任務,可是爲何會繼承runnable接口呢?

下面是剔除了部分代碼的Worker源碼:

  private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
      final Thread thread;
        
        Runnable firstTask;

        Worker(Runnable firstTask) {
            setState(-1); 
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }


    }

Worker是ThreadPoolExecutor的一個內部類,Worker自己實現了Runnable接口,並封裝了一個Thread對象,最後在構造方法中獲取了一個Runnable對象,這個對象就是ThreadPoolExecutor經過execute提交過來的目標任務。

跟蹤runWorker(this)方法:

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();

                if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//在這裏直接調用了目標任務的run方法,並無將它傳給Thread對象。
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

回過頭來在看看Worker的構造方法:

Worker(Runnable firstTask) {
        setState(-1); 
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

它將本身傳給了本身的成員變量thread。目標任務被執行的步驟可能就是:Worker的成員變量thread啓動後調用worker的run方法,worker的run方法中將本身傳給runWorker,runWorker在調用目標執行對象的run方法。

那麼thread是什麼時候被執行的呢?

下面看看ThreadPoolExecutor中的一個其餘方法:

  private boolean addWorker(Runnable firstTask, boolean core) {
      ......
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;//這裏初始化一個Worker對象w,在將w的成員變量thread付給t
            if (t != null) {
                mainLock.lock();
                try {
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//在這裏調用t的start方法。
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

這裏爲何會設計的這麼繞,我想主要是Worker不只封裝了一個thread,並且對目標任務進行了封裝,在運行封裝事後的目標任務前,addWorker能夠作一些相關操做。

這裏僅僅介紹了ThreadPoolExecutor的線程池,那麼這個線程池是如何被維護的,下面介紹幾個關鍵的參數。

private volatile int corePoolSize;
  private volatile int maximumPoolSize;
  private final BlockingQueue<Runnable> workQueue;

這三個是ThreadPoolExecutor的成員變量,其中workQueue跟縣城池沒有關係。workQueue是一個線程安全的阻塞隊列。

corePoolSize是線程池的核心大小,maximumPoolSize是線程池的最大大小。

當提交新任務時,若是ThreadPoolExecutor中有線程在運行,而且線程的數量小於corePoolSize,那麼就會有新的線程被建立。若是當前運行的線程數大於corePoolSize,就會放到緩存隊列workQueue中。若是緩衝隊列也滿了,就繼續建立線程,直到線程的數量達到maximumPoolSize

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        //判斷若是當前運行的線程數小於 corePoolSize,添加新的線程(addWorker會添加一個新的線程,上面有介紹),方法直接返回。
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }


        //若是當前的運行的線程數大於或等於corePoolSize則新的任務會放到緩存隊列中。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
        //最後緩衝隊列添加失敗,則會繼續添加線程。若是添加新的線程失敗,則拒絕這個任務。
            reject(command);

    }

還有些其餘的參數:

private volatile ThreadFactory threadFactory //線程的工廠函數。
 private volatile RejectedExecutionHandler handler;//任務拒絕的處理類。
 private volatile long keepAliveTime;//任務等待的是將。

ThreadPoolExecutor有幾個構造方法來初始化這些參數。Executors類將這些參數簡化了來得到一個ExecutorService的引用。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

這四個方法中前兩個的核心線程數和最大線程數相同,全部可運行的線程數是固定的,<=nThreads。當任務數大於nThreads時,就是放入緩衝隊列中。  後兩個方法中,線程數是無邊界的,核心線程數是0,最大線程數是整型的最大值,而後若是有線程60秒內沒有任務運行的話就銷燬。每次有新的任務來,都會建立新的線程或使用之前建立的線程(60秒內沒有任務運行的線程)。你可能有疑問,既然核心線程數是0,那麼全部的任務不是都放到隊裏裏了嗎?那麼如今就來看看SynchronousQueue這個隊裏,能夠看看這裏的介紹 http://www.linuxidc.com/Linux/2014-11/108792.htm 。

回過頭來看看任務提交方法的源碼:

  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//這裏是在往隊列裏方任務,若是不成功就會添加Worker(封裝了線程對象)
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

上面連接裏提到:offer()往queue裏放一個element後當即返回,若是碰巧這個element被另外一個thread取走了,offer方法返回true,認爲offer成功;不然返回false。

試想一下,第一次提交任務的時候,核心線程數爲0,此時沒有線程因此沒有線程從workQueue中取東西,因此這裏的workQueue.offer(command)會返回false,那麼就會經過addWorker(command, false)建立一個新的線程。

相關文章
相關標籤/搜索