深刻理解JAVA線程池

前言

多線程的異步執行方式,雖然可以最大限度發揮多核計算機的計算能力,可是若是不加控制,反而會對系統形成負擔。線程自己也要佔用內存空間,大量的線程會佔用內存資源而且可能會致使Out of Memory。即使沒有這樣的狀況,大量的線程回收也會給GC帶來很大的壓力。javascript

爲了不重複的建立線程,線程池的出現可讓線程進行復用。通俗點講,當有工做來,就會向線程池拿一個線程,當工做完成後,並非直接關閉線程,而是將這個線程歸還給線程池供其餘任務使用。java

接下來從整體到細緻的方式,來共同探討線程池。web

1. 整體的架構

來看Executor的框架圖:數組

img

接口:Executor,CompletionService,ExecutorService,ScheduledExecutorService緩存

抽象類:AbstractExecutorServicebash

實現類:ExecutorCompletionService,ThreadPoolExecutor,ScheduledThreadPoolExecutor多線程

從圖中就能夠看到主要的方法,本文主要討論的是ThreadPoolExecutor架構

2. 研讀ThreadPoolExecutor

看一下該類的構造器:併發

public ThreadPoolExecutor(int paramInt1, int paramInt2, long paramLong, TimeUnit paramTimeUnit, BlockingQueue<Runnable> paramBlockingQueue, ThreadFactory paramThreadFactory, RejectedExecutionHandler paramRejectedExecutionHandler) {
        this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
        this.mainLock = new ReentrantLock();
        this.workers = new HashSet();
        this.termination = this.mainLock.newCondition();
        if ((paramInt1 < 0) || (paramInt2 <= 0) || (paramInt2 < paramInt1) || (paramLong < 0L))
            throw new IllegalArgumentException();
        if ((paramBlockingQueue == null) || (paramThreadFactory == null) || (paramRejectedExecutionHandler == null))
            throw new NullPointerException();
        this.corePoolSize = paramInt1;
        this.maximumPoolSize = paramInt2;
        this.workQueue = paramBlockingQueue;
        this.keepAliveTime = paramTimeUnit.toNanos(paramLong);
        this.threadFactory = paramThreadFactory;
        this.handler = paramRejectedExecutionHandler;
    }
複製代碼

corePoolSize :線程池的核心池大小,在建立線程池以後,線程池默認沒有任何線程。框架

當有任務過來的時候纔會去建立建立線程執行任務。換個說法,線程池建立以後,線程池中的線程數爲0,當任務過來就會建立一個線程去執行,直到線程數達到corePoolSize 以後,就會被到達的任務放在隊列中。(注意是到達的任務)。換句更精煉的話:corePoolSize 表示容許線程池中容許同時運行的最大線程數。

若是執行了線程池的prestartAllCoreThreads()方法,線程池會提早建立並啓動全部核心線程。

maximumPoolSize :線程池容許的最大線程數,他表示最大能建立多少個線程。maximumPoolSize確定是大於等於corePoolSize。

keepAliveTime :表示線程沒有任務時最多保持多久而後中止。默認狀況下,只有線程池中線程數大於corePoolSize 時,keepAliveTime 纔會起做用。換句話說,當線程池中的線程數大於corePoolSize,而且一個線程空閒時間達到了keepAliveTime,那麼就是shutdown。

Unit: keepAliveTime 的單位。

workQueue :一個阻塞隊列,用來存儲等待執行的任務,當線程池中的線程數超過它的corePoolSize的時候,線程會進入阻塞隊列進行阻塞等待。經過workQueue,線程池實現了阻塞功能

threadFactory :線程工廠,用來建立線程。

handler :表示當拒絕處理任務時的策略。

2.1 任務緩存隊列

在前面咱們屢次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。

workQueue的類型爲BlockingQueue<Runnable>,一般能夠取下面三種類型:

1)有界任務隊列ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;

2)無界任務隊列LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE

3)直接提交隊列synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

2.2 拒絕策略

AbortPolicy:丟棄任務並拋出RejectedExecutionException

CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。顯然這樣作不會真的丟棄任務,可是,任務提交線程的性能極有可能會急劇降低。

DiscardOldestPolicy:丟棄隊列中最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務。

DiscardPolicy:丟棄任務,不作任何處理。

2.3 線程池的任務處理策略:

若是當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;

若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;若是當前線程池中的線程數目達到maximumPoolSize,則會採起任務拒絕策略進行處理;

若是線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。

2.4 線程池的關閉

ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()shutdownNow(),其中:

shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務

shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務

2.5 源碼分析

首先來看最核心的execute方法,這個方法在AbstractExecutorService中並無實現,從Executor接口,直到ThreadPoolExecutor才實現了改方法,

ExecutorService中的submit(),invokeAll(),invokeAny()都是調用的execute方法,因此execute是核心中的核心,源碼分析將圍繞它逐步展開。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * 若是正在運行的線程數小於corePoolSize,那麼將調用addWorker 方法來建立一個新的線程,並將該任務做爲新線程的第一個任務來執行。 &emsp;&emsp;&emsp;&emsp;&emsp;&emsp; 固然,在建立線程以前會作原子性質的檢查,若是條件不容許,則不建立線程來執行任務,並返回false.&emsp;&emsp; * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 若是一個任務成功進入阻塞隊列,那麼咱們須要進行一個雙重檢查來確保是咱們已經添加一個線程(由於存在着一些線程在上次檢查後他已經死亡)或者 &emsp;&emsp;&emsp;&emsp;&emsp;&emsp; 當咱們進入該方法時,該線程池已經關閉。因此,咱們將從新檢查狀態,線程池關閉的狀況下則回滾入隊列,線程池沒有線程的狀況則建立一個新的線程。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. &emsp;&emsp;&emsp;&emsp;&emsp;&emsp; 若是任務沒法入隊列(隊列滿了),那麼咱們將嘗試新開啓一個線程(從corepoolsize到擴充到maximum),若是失敗了,那麼能夠肯定緣由,要麼是 &emsp;&emsp;&emsp;&emsp;&emsp;&emsp; 線程池關閉了或者飽和了(達到maximum),因此咱們執行拒絕策略。 */
&emsp;&emsp;&emsp;&emsp;
&emsp;&emsp;&emsp;&emsp;// 1.當前線程數量小於corePoolSize,則建立並啓動線程。
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;// 成功,則返回
return;
            c = ctl.get();
        }
&emsp;&emsp;&emsp;&emsp;// 2.步驟1失敗,則嘗試進入阻塞隊列,
        if (isRunning(c) && workQueue.offer(command)) {
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;// 入隊列成功,檢查線程池狀態,若是狀態部署RUNNING並且remove成功,則拒絕任務
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;// 若是當前worker數量爲0,經過addWorker(null, false)建立一個線程,其任務爲null
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
&emsp;&emsp;&emsp;&emsp;// 3. 步驟1和2失敗,則嘗試將線程池的數量有corePoolSize擴充至maxPoolSize,若是失敗,則拒絕任務
        else if (!addWorker(command, false))
            reject(command);
    }[![複製代碼](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0);
複製代碼

相信看了代碼也是一臉懵,接下來用一個流程圖來說一講,他究竟幹了什麼事:

img

結合上面的流程圖來逐行解析,首先前面進行空指針檢查,

wonrkerCountOf()方法可以取得當前線程池中的線程的總數,取得當前線程數與核心池大小比較,

  • 若是小於,將經過addWorker()方法調度執行。
  • 若是大於核心池大小,那麼就提交到等待隊列。
  • 若是進入等待隊列失敗,則會將任務直接提交給線程池。
  • 若是線程數達到最大線程數,那麼就提交失敗,執行拒絕策略。

excute()方法中添加任務的方式是使用addWorker()方法,看一下源碼,一塊兒學習一下。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
&emsp;&emsp;&emsp;&emsp; // 外層循環,用於判斷線程池狀態
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp; // 內層的循環,任務是將worker數量加1
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
&emsp;&emsp;&emsp;&emsp;// worker加1後,接下來將woker添加到HashSet<Worker>中,並啓動worker
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;// 若是往HashSet<Worker>添加成功,則啓動該線程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

addWorker(Runnable firstTask, boolean core)的主要任務是建立並啓動線程。

他會根據當前線程的狀態和給定的值(core or maximum)來判斷是否能夠建立一個線程。

addWorker共有四種傳參方式。execute使用了其中三種,分別爲:

1.addWorker(paramRunnable, true)

線程數小於corePoolSize時,放一個須要處理的task進Workers Set。若是Workers Set長度超過corePoolSize,就返回false.

2.addWorker(null, false)

放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task爲空的worker在線程執行的時候會去任務隊列裏拿任務,這樣就至關於建立了一個新的線程,只是沒有立刻分配任務。

3.addWorker(paramRunnable, false)

當隊列被放滿時,就嘗試將這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。若是線程池也滿了的話就返回false.

還有一種狀況是execute()方法沒有使用的

addWorker(null, true)

這個方法就是放一個null的task進Workers Set,並且是在小於corePoolSize時,若是此時Set中的數量已經達到corePoolSize那就返回false,什麼也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來爲線程池預先啓動corePoolSize個worker等待從workQueue中獲取任務執行。

執行流程:

一、判斷線程池當前是否爲能夠添加worker線程的狀態,能夠則繼續下一步,不能夠return false: A、線程池狀態>shutdown,可能爲stop、tidying、terminated,不能添加worker線程 B、線程池狀態==shutdown,firstTask不爲空,不能添加worker線程,由於shutdown狀態的線程池不接收新任務 C、線程池狀態==shutdown,firstTask==null,workQueue爲空,不能添加worker線程,由於firstTask爲空是爲了添加一個沒有任務的線程再從workQueue獲取task,而workQueue爲     空,說明添加無任務線程已經沒有意義 二、線程池當前線程數量是否超過上限(corePoolSize 或 maximumPoolSize),超過了return false,沒超過則對workerCount+1,繼續下一步 三、在線程池的ReentrantLock保證下,向Workers Set中添加新建立的worker實例,添加完成後解鎖,並啓動worker線程,若是這一切都成功了,return true,若是添加worker入Set失敗或啓動失敗,調用addWorkerFailed()邏輯

3. 常見的四種線程池

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    }
public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
    return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
}
複製代碼

固定大小的線程池,能夠指定線程池的大小,該線程池corePoolSize和maximumPoolSize相等,阻塞隊列使用的是LinkedBlockingQueue,大小爲整數最大值。該線程池中的線程數量始終不變,當有新任務提交時,線程池中有空閒線程則會當即執行,若是沒有,則會暫存到阻塞隊列。對於固定大小的線程池,不存在線程數量的變化。同時使用無界的LinkedBlockingQueue來存放執行的任務。當任務提交十分頻繁的時候,LinkedBlockingQueue迅速增大,存在着耗盡系統資源的問題。並且在線程池空閒時,即線程池中沒有可運行任務時,它也不會釋放工做線程,還會佔用必定的系統資源,須要shutdown。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
    }
複製代碼

單個線程線程池,只有一個線程的線程池,阻塞隊列使用的是LinkedBlockingQueue,如有多餘的任務提交到線程池中,則會被暫存到阻塞隊列,待空閒時再去執行。按照先入先出的順序執行任務。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
    }
複製代碼

緩存線程池,緩存的線程默認存活60秒。線程的核心池corePoolSize大小爲0,核心池最大爲Integer.MAX_VALUE,阻塞隊列使用的是SynchronousQueue。是一個直接提交的阻塞隊列, 他總會迫使線程池增長新的線程去執行新的任務。在沒有任務執行時,當線程的空閒時間超過keepAliveTime(60秒),則工做線程將會終止被回收,當提交新任務時,若是沒有空閒線程,則建立新線程執行任務,會致使必定的系統開銷。若是同時又大量任務被提交,並且任務執行的時間不是特別快,那麼線程池便會新增出等量的線程池處理任務,這極可能會很快耗盡系統的資源。

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int var0) {
        return new ScheduledThreadPoolExecutor(var0);
    }

    public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
        return new ScheduledThreadPoolExecutor(var0, var1);
    }
複製代碼

定時線程池,該線程池可用於週期性地去執行任務,一般用於週期性的同步數據。

scheduleAtFixedRate:是以固定的頻率去執行任務,週期是指每次執行任務成功執行之間的間隔。

schedultWithFixedDelay:是以固定的延時去執行任務,延時是指上一次執行成功以後和下一次開始執行的以前的時間。

4. 使用實例

newFixedThreadPool實例:

public class FixPoolDemo {

    private static Runnable getThread(final int i) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i);
            }
        };
    }

    public static void main(String args[]) {
        ExecutorService fixPool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            fixPool.execute(getThread(i));
        }
        fixPool.shutdown();
    }
}
複製代碼

newCachedThreadPool實例:

public class CachePool {
    private static Runnable getThread(final int i){
        return new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                }catch (Exception e){

                }
                System.out.println(i);
            }
        };
    }

    public static void main(String args[]){
        ExecutorService cachePool = Executors.newCachedThreadPool();
        for (int i=1;i<=10;i++){
            cachePool.execute(getThread(i));
        }
    }
}
複製代碼

這裏沒用調用shutDown方法,這裏能夠發現過60秒以後,會自動釋放資源。

newSingleThreadExecutor實例:

public class SingPoolDemo {
    private static Runnable getThread(final int i){
        return new Runnable() {
            @Override
            public void run() {
                try {

                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i);
            }
        };
    }

    public static void main(String args[]) throws InterruptedException {
        ExecutorService singPool = Executors.newSingleThreadExecutor();
        for (int i=0;i<10;i++){
            singPool.execute(getThread(i));
        }
        singPool.shutdown();
    }
複製代碼

這裏須要注意一點,newSingleThreadExecutor和newFixedThreadPool同樣,在線程池中沒有任務可執行時,也不會釋放系統資源的,因此須要shudown。

newScheduledThreadPool實例:

public class ScheduledExecutorServiceDemo {
    public static void main(String args[]) {

        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(4000);
                    System.out.println(Thread.currentThread().getId() + "執行了");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, 0, 2, TimeUnit.SECONDS);
    }
}
複製代碼

5. 手動建立線程池有幾個注意點

  1. **任務獨立。**如何任務依賴於其餘任務,那麼可能產生死鎖。例如某個任務等待另外一個任務的返回值或執行結果,那麼除非線程池足夠大,不然將發生線程飢餓死鎖。
  2. **合理配置阻塞時間過長的任務。**若是任務阻塞時間過長,那麼即便不出現死鎖,線程池的性能也會變得很糟糕。在Java併發包裏可阻塞方法都同時定義了限時方式和不限時方式。例如Thread.join,BlockingQueue.put,CountDownLatch.await等,若是任務超時,則標識任務失敗,而後停止任務或者將任務放回隊列以便隨後執行,這樣,不管任務的最終結果是否成功,這種辦法都可以保證任務總能繼續執行下去。
  3. 設置合理的線程池大小。只須要避免過大或者太小的狀況便可,上文的公式線程池大小=NCPU *UCPU(1+W/C)
  4. **選擇合適的阻塞隊列。**newFixedThreadPool和newSingleThreadExecutor都使用了無界的阻塞隊列,無界阻塞隊列會有消耗很大的內存,若是使用了有界阻塞隊列,它會規避內存佔用過大的問題,可是當任務填滿有界阻塞隊列,新的任務該怎麼辦?在使用有界隊列是,須要選擇合適的拒絕策略,隊列的大小和線程池的大小必須一塊兒調節。對於很是大的或者無界的線程池,可使用SynchronousQueue來避免任務排隊,以直接將任務從生產者提交到工做者線程。

下面是Thrift框架處理socket任務所使用的一個線程池,能夠看一下FaceBook的工程師是如何自定義線程池的。

private static ExecutorService createDefaultExecutorService(Args args) {
        SynchronousQueue executorQueue = new SynchronousQueue();

        return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
                executorQueue);
    }
複製代碼

6. 總結

6.1 如何選擇線程池數量

線程池的大小決定着系統的性能,過大或者太小的線程池數量都沒法發揮最優的系統性能。

固然線程池的大小也不須要作的太過於精確,只須要避免過大和太小的狀況。通常來講,肯定線程池的大小須要考慮CPU的數量,內存大小,任務是計算密集型仍是IO密集型等因素

NCPU = CPU的數量

UCPU = 指望對CPU的使用率 0 ≤ UCPU ≤ 1

W/C = 等待時間與計算時間的比率

若是但願處理器達到理想的使用率,那麼線程池的最優大小爲:

線程池大小=NCPU *UCPU(1+W/C)

在Java中使用

int ncpus = Runtime.getRuntime().availableProcessors();
複製代碼

獲取CPU的數量。

6.2 線程池工廠

Executors的線程池若是不指定線程工廠會使用Executors中的DefaultThreadFactory,默認線程池工廠建立的線程都是非守護線程。

使用自定義的線程工廠能夠作不少事情,好比能夠跟蹤線程池在什麼時候建立了多少線程,也能夠自定義線程名稱和優先級。若是將

新建的線程都設置成守護線程,當主線程退出後,將會強制銷燬線程池。

下面這個例子,記錄了線程的建立,並將全部的線程設置成守護線程。

public class ThreadFactoryDemo {
    public static class MyTask1 implements Runnable{

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis()+"Thrad ID:"+Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args){
          MyTask1 task = new MyTask1();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MICROSECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                System.out.println("建立線程"+t);
                return  t;
            }
        });
        for (int i = 0;i<=4;i++){
           es.submit(task);
        }
    }
}
複製代碼

6.3 擴展線程池

ThreadPoolExecutor是能夠拓展的,它提供了幾個能夠在子類中改寫的方法:beforeExecute,afterExecute和terimated。

在執行任務的線程中將調用beforeExecute和afterExecute,這些方法中還能夠添加日誌,計時,監視或統計收集的功能,

還能夠用來輸出有用的調試信息,幫助系統診斷故障。下面是一個擴展線程池的例子:

public class ThreadFactoryDemo {
    public static class MyTask1 implements Runnable{

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis()+"Thrad ID:"+Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args){
          MyTask1 task = new MyTask1();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MICROSECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                System.out.println("建立線程"+t);
                return  t;
            }
        });
        for (int i = 0;i<=4;i++){
           es.submit(task);
        }
    }
}
複製代碼

線程池的正確使用

如下阿里編碼規範裏面說的一段話:

線程池不容許使用Executors去建立,而是經過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。 說明:Executors各個方法的弊端:

  1. newFixedThreadPool和newSingleThreadExecutor:

    主要問題是堆積的請求處理隊列可能會耗費很是大的內存,甚至OOM。

  2. newCachedThreadPool和newScheduledThreadPool:

    主要問題是線程數最大數是Integer.MAX_VALUE,可能會建立數量很是多的線程,甚至OOM。

相關文章
相關標籤/搜索