Java線程池核心原理剖析

在系統開發時,咱們常常會遇到「池」的概念。使用池一種以空間換時間的作法,一般在內存中事先保存一系列整裝待命的對象,以供後期供其餘對象隨時調用。常見的池有:數據庫鏈接池,socket鏈接池,線程池等。今天咱們就來看一下線程池的概念。java


Executor
git

JDK爲咱們提供了一套Executor框架來方便咱們來管理和使用線程池。
打開java.util.concurrent.Executors類,咱們能夠發現JDK爲咱們提供了那麼多的方法來幫助咱們高效快捷的建立線程池:github

123456複製代碼
public static ExecutorService newFixedThreadPool(int nThreads);//建立一個固定數目的、可重用的線程池public static ExecutorService newSingleThreadExecutor();//建立一個單線程化的線程public static ExecutorService newCachedThreadPool();//建立一個可緩存線程池public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);//建立一個支持定時及週期性任務執行的線程池public static ScheduledExecutorService newSingleThreadScheduledExecutor() ;//建立一個支持定時及週期性任務執行的線程池public static ExecutorService newWorkStealingPool() ;//建立一個擁有多個任務隊列的線程池複製代碼

上方簡單列舉了幾個Executor框架爲咱們提供的建立線程池的方法,這些線程池擁有各類各樣的功能,我想當你剛剛開始使用線程的時候google如何使用線程池的時候大部分文章都是教你如何使用上方的一些方法建立一個線程池。可是若是你去查看他們的源碼就會發現他們最後構造的時候都調用了同一個構造方法。(除了newWorkStealingPool以外,這個咱們在下篇文章再討論)數據庫

1234567複製代碼
ThreadPoolExecutor(int corePoolSize,//線程池線程數量                           int maximumPoolSize,//線程中最大的線程數量                           long keepAliveTime,//線程池線程數量超過corePoolSize的空閒線程的存活時間                           TimeUnit unit,//keepAliveTime時間單位                           BlockingQueue<Runnable> workQueue,//被提交還沒執行的任務存放在這裏                           ThreadFactory threadFactory,//線程工廠                           RejectedExecutionHandler handler)//任務過多時的拒絕策略複製代碼

上方的4個參數我想你看到了就會明白了,如今咱們着重來說一下下面的三個參數。緩存


WorkQueue
bash

參數workQueue是用來存放已提交但還未執行的任務,JDK爲咱們提供了一下實現:框架

直接提交隊列SynchronousQueuesocket

12345678910複製代碼
當新任務過來的時候它是這樣處理的:if(有空閒線程){    處理}else{    if(當前線程數<maximumPoolSize){        建立新線程處理    }else{        執行拒絕策略    }}複製代碼

所以使用這個隊列時必定要設置很大的maximumPoolSizeide

有界的任務隊列ArrayBlockingQueueui

12345678910111213複製代碼
if(當前線程數<corePoolSize){    建立新線程執行}else{    if(任務隊列是否已滿){       if(當前線程<maximumPoolSize){          建立新線程處理       }else{          執行拒絕策略        }    }else{       放到任務隊列    }}複製代碼

無界的任務隊列LinkedBlockingDeque

12345678910111213複製代碼
if(當前線程數<corePoolSize){    建立新線程執行}else{    放入任務隊列,等待執行,直到系統資源耗盡}優先任務隊列PriorityBlockingQueue根據任務的優先級將任務存放在任務隊列特定位置if(當前線程數<corePoolSize){    建立新線程執行}else{    等待執行,直到系統資源耗盡}複製代碼


線程工廠

第六個參數threadFactory是爲線程池中建立線程的,咱們使用Executor框架建立的線程就是有threadFactory提供的。咱們看一下JDK提供的默認的threadFactory:

1234567891011121314151617181920212223242526複製代碼
static class DefaultThreadFactory implements ThreadFactory {        private static final AtomicInteger poolNumber = new AtomicInteger(1);        private final ThreadGroup group;        private final AtomicInteger threadNumber = new AtomicInteger(1);        private final String namePrefix;        DefaultThreadFactory() {            SecurityManager s = System.getSecurityManager();            group = (s != null) ? s.getThreadGroup() :                                  Thread.currentThread().getThreadGroup();            namePrefix = "pool-" +                          poolNumber.getAndIncrement() +                         "-thread-";        }        public Thread newThread(Runnable r) {            Thread t = new Thread(group, r,                                  namePrefix + threadNumber.getAndIncrement(),                                  0);            if (t.isDaemon())                t.setDaemon(false);            if (t.getPriority() != Thread.NORM_PRIORITY)                t.setPriority(Thread.NORM_PRIORITY);            return t;        }    }複製代碼

重點關注一下其中的newThread方法,看到這個我想你就明白了爲何你使用線程池建立出來的線程打印的時候名字的來源,還有是不是守護線程和優先級等屬性的來源了。


拒絕策略

看到剛剛的幾種任務隊列咱們發現當任務過多時是須要指定拒絕策略來進行拒絕呢,那麼JDK又爲咱們提供了哪些拒絕策略呢。

1234複製代碼
AbortPolicy直接拋出異常。CallerRunsPolicy:若是線程池未關閉,則在調用者線程中運行當前任務DiscardOldestPolicy:丟棄即將執行的任務,而後再嘗試提交當前任務DiscardPolicy:丟棄此任務複製代碼


線程池的擴展

ThreadPoolExecutor不單單可以建立各類各樣的線程來幫助咱們實行功能,它還預留了三個接口來供咱們進行擴展。

在runWorker方法中調用線程進行執行以前調用了beforeExecute方法,執行以後調用了afterExecute()方法

123456789101112131415161718192021222324252627282930313233343536373839複製代碼
final void runWorker(Worker w) {       Thread wt = Thread.currentThread();       Runnable task = w.firstTask;       w.firstTask = null;       w.unlock();        boolean completedAbruptly = true;       try {           while (task != null || (task = getTask()) != null) {               w.lock();               if ((runStateAtLeast(ctl.get(), STOP) ||                    (Thread.interrupted() &&                     runStateAtLeast(ctl.get(), STOP))) &&                   !wt.isInterrupted())                   wt.interrupt();               try {                   beforeExecute(wt, task);//線程執行前                   Throwable thrown = null;                   try {                       task.run();                   } catch (RuntimeException x) {                       thrown = x; throw x;                   } catch (Error x) {                       thrown = x; throw x;                   } catch (Throwable x) {                       thrown = x; throw new Error(x);                   } finally {                       afterExecute(task, thrown);//線程執行後                   }               } finally {                   task = null;                   w.completedTasks++;                   w.unlock();               }           }           completedAbruptly = false;       } finally {           processWorkerExit(w, completedAbruptly);       }   }複製代碼

這兩個方法在ThreadPoolExecutor類中是沒有實現的,咱們想要監控線程運行先後的數據就能夠經過繼承ThreadPoolExecutor類來實現這個擴展。
另外還有一個terminated()方法是在整個線程池退出的時候調用的,咱們這裏一併擴展。

public class ThreadPoolExecutorDemo extends ThreadPoolExecutor {
    //注意這裏由於ThreadPoolExecutor沒有無參的構造,因此還須要重寫一下構造方法。
    //這裏限於篇幅就不貼了
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println(Thread.currentThread().getId()+"執行完成");

    }
    @Override
    protected void terminated() {
        System.out.println("線程池退出");
    }
}

//使用這個demo就能夠驗證咱們擴展的結果了。

public class ThreadPoolDemo {
    static class ThreadDemo extends Thread {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID is:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutorDemo threadPoolExecutorDemo=  new ThreadPoolExecutorDemo(5,5,0,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>());
        ThreadDemo threadDemo = new ThreadDemo();
        for (int i = 0; i < 20; i++) {
            threadPoolExecutorDemo.submit(threadDemo);
        }
        threadPoolExecutorDemo.shutdown();
    }
}
複製代碼

本文全部源碼:github.com/shiyujun/sy…



相關文章
相關標籤/搜索