java併發編程——線程池的工做原理與源碼解讀

線程池的簡單介紹

基於多核CPU的發展,使得多線程開發日趨流行。然而線程的建立和銷燬,都涉及到系統調用,比較消耗系統資源,因此就引入了線程池技術,避免頻繁的線程建立和銷燬。html

在Java用有一個Executors工具類,能夠爲咱們建立一個線程池,其本質就是new了一個ThreadPoolExecutor對象。java

建議使用較爲方便的 Executors 工廠方法來建立線程池。緩存

  • Executors.newCachedThreadPool()(無界線程池,能夠進行自動線程回收)
  • Executors.newFixedThreadPool(int)(固定大小線程池)
  • Executors.newSingleThreadExecutor()(單個後臺線程)。
  • Executors.newScheduledThreadPool() (支持計劃任務的線程池)

ThreadPoolExecutor工做原理介紹

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  1. corePoolSize:線程池的核心線程數,說白了就是,即使是線程池裏沒有任何任務,也會有corePoolSize個線程在候着等任務。
  2. maximumPoolSize:最大線程數,無論你提交多少任務,線程池裏最多工做線程數就是maximumPoolSize。
  3. keepAliveTime:線程的存活時間。當線程池裏的線程數大於corePoolSize時,若是等了keepAliveTime時長尚未任務可執行,則線程退出。
  4. unit:這個用來指定keepAliveTime的單位,好比秒:TimeUnit.SECONDS。
  5. workQueue:一個阻塞隊列,提交的任務將會被放到這個隊列裏。
  6. threadFactory:線程工廠,用來建立線程,主要是爲了給線程起名字,默認工廠的線程名字:pool-1-thread-3。
  7. handler:拒絕策略,當線程池裏線程被耗盡,且隊列也滿了的時候會調用。

線程池的執行流程圖 多線程

任務被提交到線程池,會先判斷當前線程數量是否小於corePoolSize,若是小於則建立線程來執行提交的任務,不然將任務放入workQueue隊列,若是workQueue滿了,則判斷當前線程數量是否小於maximumPoolSize,若是小於則建立線程執行任務,不然就會調用handler,以表示線程池拒絕接收任務。框架

線程池使用介紹

newScheduledThreadPool的使用示例

public class SchedulePoolDemo {

    public static void main(String[] args){
        ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
        //若是前面的任務沒有完成, 調度也不會啓動
        service.scheduleAtFixedRate(()->{
         try {
             Thread.sleep(2000);// 每兩秒打印一次.
             System.out.println(System.currentTimeMillis()/1000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
        }, 0, 2, TimeUnit.SECONDS);
    }
}

潛在宕機風險

使用Executors來建立要注意潛在宕機風險.其返回的線程池對象的弊端以下:ide

  • FixedThreadPool和SingleThreadPoolPool : 容許的請求隊列長度爲 Integer.MAX_VALUE,可能會堆積大量的請求,從而致使 OOM.
  • CachedThreadPool和ScheduledThreadPool : 容許的建立線程數量爲 Integer.MAX_VALUE,可能會建立大量的線程,從而致使 OOM.

綜上所述, 在可能有大量請求的線程池場景中, 更推薦自定義ThreadPoolExecutor來建立線程池, 具體構造函數配置以下:函數

線程池大小配置

通常根據任務類型進行區分, 假設CPU爲N核工具

  • CPU密集型任務須要減小線程數量, 下降線程之間切換形成的開銷, 可配置線程池大小爲N + 1.
  • IO密集型任務則能夠加大線程數量, 可配置線程池大小爲 N * 2.
  • 混合型任務則能夠拆分爲CPU密集型與IO密集型, 獨立配置.

自定義阻塞隊列BlockingQueue

主要存放等待執行的線程, ThreadPoolExecutor中支持自定義該隊列來實現不一樣的排隊隊列.oop

  • ArrayBlockingQueue:先進先出隊列,建立時指定大小, 有界;
  • LinkedBlockingQueue:使用鏈表實現的先進先出隊列,默認大小爲Integer.MAX_VALUE;
  • SynchronousQueue:不保存提交的任務, 數據也不會緩存到隊列中, 用於生產者和消費者互等對方, 一塊兒離開.
  • PriorityBlockingQueue: 支持優先級的隊列

回調接口

線程池提供了一些回調方法, 具體使用以下所示.源碼分析

ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("準備執行任務: " + r.toString());
            }
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("結束任務: " + r.toString());
            }
            @Override
            protected void terminated() {
                System.out.println("線程池退出");
            }
        };

能夠在回調接口中, 對線程池的狀態進行監控, 例如任務執行的最長時間, 平均時間, 最短期等等, 還有一些其餘的屬性以下:

  • taskCount:線程池須要執行的任務數量.
  • completedTaskCount:線程池在運行過程當中已完成的任務數量.小於或等於taskCount.
  • largestPoolSize:線程池曾經建立過的最大線程數量.經過這個數據能夠知道線程池是否滿過.如等於線程池的最大大小,則表示線程池曾經滿了.
  • getPoolSize:線程池的線程數量.若是線程池不銷燬的話,池裏的線程不會自動銷燬,因此這個大小隻增不減.
  • getActiveCount:獲取活動的線程數.

自定義拒絕策略

線程池滿負荷運轉後, 由於時間空間的問題, 可能須要拒絕掉部分任務的執行.

jdk提供了RejectedExecutionHandler接口, 並內置了幾種線程拒絕策略

  • AbortPolicy: 直接拒絕策略, 拋出異常.
  • CallerRunsPolicy: 調用者本身執行任務策略.
  • DiscardOldestPolicy: 捨棄最老的未執行任務策略. 使用方式也很簡單, 直接傳參給ThreadPool
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println("reject task: " + r.toString());
                    }
                });

自定義ThreadFactory

線程工廠用於建立池裏的線程. 例如在工廠中都給線程setDaemon(true), 這樣程序退出的時候, 線程自動退出.或者統一指定線程優先級, 設置名稱等等.

class NamedThreadFactory implements ThreadFactory {
    private static final AtomicInteger threadIndex = new AtomicInteger(0);
    private final String baseName;
    private final boolean daemon;

    public NamedThreadFactory(String baseName) {
        this(baseName, true);
    }

    public NamedThreadFactory(String baseName, boolean daemon) {
        this.baseName = baseName;
        this.daemon = daemon;
    }

    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement());
        thread.setDaemon(this.daemon);
        return thread;
    }
}

關閉線程池

跟直接new Thread不同, 局部變量的線程池, 須要手動關閉, 否則會致使線程泄漏問題.

默認提供兩種方式關閉線程池.

- shutdown: 等全部任務, 包括阻塞隊列中的執行完, 纔會終止, 可是不會接受新任務.
- shutdownNow: 當即終止線程池, 打斷正在執行的任務, 清空隊列.

ThreadPoolExecutor源碼分析

ThreadPoolExecutor中ctl屬性介紹

ctl是ThreadPoolExecutor的一個重要屬性,它記錄着ThreadPoolExecutor的線程數量和線程狀態。

//Integer有32位,其中前三位用於記錄線程狀態,後29位用於記錄線程的數量.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//表示用於記錄線程數量的位數
private static final int COUNT_BITS = Integer.SIZE - 3;
//將1左移COUNT_BITS位再減1,表示能表示的最大線程數
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//用ctl前三位分別表示線程池的狀態
//(前三位爲111)接受新任務而且處理已經進入隊列的任務
private static final int RUNNING    = -1 << COUNT_BITS;
//(前三位爲000)不接受新任務,可是處理已經進入隊列的任務
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//(前三位001)不接受新任務,不處理已經進入隊列的任務,而且中斷正在執行的任務
private static final int STOP       =  1 << COUNT_BITS;
//(前三位010)全部任務執行完成,workerCount爲0。線程轉到了狀態TIDYING會執行terminated()鉤子方法
private static final int TIDYING    =  2 << COUNT_BITS;
//(前三位011)任務已經執行完成
private static final int TERMINATED =  3 << COUNT_BITS;
//狀態值就是隻關心前三位的值,因此把後29位清0
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//線程數量就是隻關心後29位的值,因此把前3位清0
private static int workerCountOf(int c)  { return c & CAPACITY; }

//兩個數相或
private static int ctlOf(int rs, int wc) { return rs | wc; }

execute()方法解析

public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();
        int c = ctl.get();
       //判斷當前活躍線程數是否小於corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))//調用addWorker建立線程執行任務
                return;
            c = ctl.get();
        }
        //若是不小於corePoolSize,則將任務添加到workQueue隊列。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();//再次獲取ctl的狀態
            //若是不在運行狀態了,那麼就從隊列中移除任務
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //若是在運行階段,可是Worker數量爲0,調用addWorker方法
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //嘗試建立非核心線程若是建立失敗就會調用reject拒絕接受任務。
        else if (!addWorker(command, false))
            reject(command);
    }
//調用handler的rejectedExecution(command,this)方法。handler是RejectedExecutionHandler接口,默認實現是AbortPolicy
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

addWorker()方法解析

addWorker方法用於建立線程,而且經過core參數表示該線程是不是核心線程,若是返回true則表示建立成功,不然失敗。addWorker的代碼以下所示:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//獲得線程池的運行狀態

            // rs>=SHUTDOWN爲false,即線程池處於RUNNING狀態.
            // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()這個條件爲true,也就意味着三個條件同時知足,即線程池狀態爲SHUTDOWN且firstTask爲null且隊列不爲空,這種狀況爲處理隊列中剩餘任務。上面提到過當處於SHUTDOWN狀態時,不接受新任務,可是會處理完隊列裏面的任務。若是firstTask不爲null,那麼就屬於添加新任務;若是firstTask爲null,而且隊列爲空,那麼就不須要再處理了。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                //若是建立的是非核心線程(core=false)時,則須要判斷當前線程數wc>=maximumPoolSize,若是返回false,建立非核心線程失敗。
                //若是建立的是核心線程(core=true)時,則須要判斷當前線程數wc>=corePoolSize,若是返回false,建立核心線程失敗。
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//worker+1執行成功,那麼跳出外循環
                    break retry;
                c = ctl.get();
                if (runStateOf(c) != rs)//再次判斷當前狀態,若是新獲取的狀態和當前狀態不一致,則再次進入外循環
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }


/*
一旦跳出外循環,表示能夠建立建立線程,這裏具體是Worker對象,Worker實現了Runnable接口而且繼承AbstractQueueSynchronizer,內部維持一個Runnable的隊列。try塊中主要就是建立Worker對象,而後將其保存到workers中,workers是一個HashSet,表示工做線程的集合。而後若是添加成功,則開啓Worker所在的線程。若是開啓線程失敗,則調用addWorkerFailed方法,addWokerFailed用於回滾worker線程的建立。
*/
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //以firstTask做爲Worker的第一個任務建立Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//對整個線程池加鎖
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//啓動啓動這個線程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorkerFailed()方法解析

private void addWorkerFailed(Worker w) {
        //對整個線程成績加鎖
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //移除Worker對象
            if (w != null)
                workers.remove(w);
            //減少worker數量
            decrementWorkerCount();
            //檢查termination狀態
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

addWorkerFailed首先從workers集合中移除線程,而後將wokerCount減1,最後檢查終結。

tryTerminate()方法解析

tryTerminate()方法用於檢查是否有必要將線程池狀態轉移到TERMINATED。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /*
                狀態判斷,若是有符合如下條件之一。則跳出循環
               (1)線程池處於RUNNING狀態
               (2)線程池狀態處於TIDYING狀態
               (3)線程池狀態處於SHUTDOWN狀態而且隊列不爲空
若是不知足上述的狀況,那麼目前狀態屬於SHUTDOWN切隊列爲空,或者狀態屬於STOP,那麼調用interruptIdleWorkers方法中止一個Worker線程,而後退出。
            */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
/*
若是沒有退出循環的話,那麼就首先將狀態設置成TIDYING,而後調用terminated方法,最後設置狀態爲TERMINATED。terminated方法是個空實現,用於當線程池終結時處理一些事情。
*/
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

構造函數Worker(firstTask)解析

Worker繼承自AbstractQueuedSynchronizer並實現Runnbale接口。AbstractQueuedSynchronizer提供了一個實現阻塞鎖和其餘同步工具,好比信號量、事件等依賴於等待隊列的框架。Worker的構造方法中會使用threadFactory構造線程變量並持有run方法調用了runWorker方法,將線程委託給主循環線程。

Worker(Runnable firstTask) {
    setState(-1);
    this.firstTask = firstTask;//設置該線程的
    this.thread = getThreadFactory().newThread(this);//建立一個線程
}

//當我咱們啓動一個線程時就會觸發Worker中的此方法
public void run() {
    runWorker(this);
}

runWorker()方法解析

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//首次任務是建立Worker時添加的任務
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           //線程調用runWoker,會while循環調用getTask方法從workerQueue裏讀取任務,而後執行任務。只要getTask方法不返回null,此線程就不會退出。
            while (task != null || (task = getTask()) != null) {
                w.lock();//對Worker加鎖
                //若是線程池中止了,那麼中斷線程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//空實現,任務運行以前的操做
                    Throwable thrown = null;
                    try {
                        task.run();//執行任務
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//空實現,任務運行以後的操做
                    }
                } finally {
                    task = null;//任務執行完畢後,將task設置爲null
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask()方法解析

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //必要時檢查隊列是否爲空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            //判斷是否容許超時,wc>corePoolSize則是判斷當前線程數是否大於corePoolSize。
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //若是當前線程數大於corePoolSize,
                //則會調用workQueue的poll方法獲取任務,超時時間是keepAliveTime。
                //若是超過keepAliveTime時長,poll返回了null,
                //上邊提到的while循序就會退出,線程也就執行完了。
                //若是當前線程數小於corePoolSize,
                //則會調用workQueue的take方法阻塞當前線程,不會退出
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

參考地址:

相關文章
相關標籤/搜索