線程池之刨根問底

線程池之刨根問底

前言

提及刨根問底,首先想到的居然是------小瀋陽。。。。。java

其實有關線程池的文章一搜都有一大堆,但。。。感受都不全(手動狗頭)android

什麼是線程?

先不說線程池的概念了,先說下什麼是線程。線程到底是什麼?百度這個問題的通常會獲得以下答案:數組

線程是指進程中的一個執行流程,一個進程中能夠運行多個線程。緩存

奧,線程是進程裏的,聽着這些名詞感受都很熟悉,感受一直在和它們打交道,但,什麼是進程呢?不就是一個內存中運行的應用程序嘛!並且有它本身獨立的一塊內存空間,一個程序至少有一個進程,一個進程至少有一個線程。ide

是否是感受挺繞,其實一點都不繞,安卓中不也有多進程嘛,直接在AndroidManifest給四大組件添加android:process屬性不得了,因此一個程序中並非只能有一個進程啊,能夠共同存在運行,一個進程中若是沒有線程還運行什麼呢,對不?性能

在Java中,每次程序運行至少啓動2個線程:一個是main線程,一個是垃圾收集線程。由於每當使用java命令執行一個類的時候,實際上都會啓動一個JVM,每個JVM實際上就是在操做系統中啓動了一個進程。測試

那JVM又是啥呢?JVM是虛擬機的英文簡稱。他是java運行環境的一部分。它是一個虛構出來的計算機,是經過在實際的計算機上仿真模擬各類計算機功能來實現的,JVM 中的內存能夠劃分爲若干個不一樣的數據區域,主要分爲:程序計數器、虛擬機棧、本地方法棧堆、方法區優化

好了好了,不能再說了,已經偏離主題了,如今說的是線程,從線程說到了進程,又說到了JVM。。。簡單總結下吧:進程(Process) 是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎,線程(thread) 是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。this

Java中的線程

在平時的代碼編寫中會遇到不少須要開啓線程來執行耗時的任務操做,由於安卓中的刷新機制,UI線程並不能執行耗時操做,因此要開啓子線程來執行,通常會有如下幾種方式:spa

  1. 直接繼承自Thread類,並重寫Thread中的run方法:

    public class TestThread extends Thread {
        @Override
        public void run() {
            //執行線程操做
        }
    }
    複製代碼
  2. 直接new一個Thread對象並start執行:

    new Thread(new Runnable() {
                @Override
                public void run() {
                    //執行線程操做
                }
            }).start();
    複製代碼
  3. 實現Runable接口,當咱們查看Thread源碼的時候會發現Thread類也是實現了Runable接口:

    public class MyRunnable implements Runnable{
            @Override
            public void run() {
    						//執行線程操做
            }
        }
    複製代碼

初識線程池

咱們都知道,Java 線程的建立以及上下文切換是比消耗性能的,因此引入了輕量級鎖、偏向鎖等優化,目的就是減小用戶態和核心態之間的切換頻率。既然建立和銷燬線程很是損耗性能,那可不能夠複用一些被建立好的線程呢?固然能夠了,用線程池啊。

終於說到線程池了。。。進入了今天的正題。。。

先放一張表示線程池體系的圖吧:

\[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-7jqkZ2K4-1590930342309)(/Users/zhujiang/Library/Application Support/typora-user-images/image-20200531094048614.png)\]

Executor

來看看線程池最頂層的接口:

public interface Executor {
    void execute(Runnable command);
}
複製代碼

能夠看到最頂層的Executor接口中只有一個execute方法,線程的建立、調度等細節由子類ExecutorService實現。

ExecutorService

接下來就看看ExecutorService:

public interface ExecutorService extends Executor {
		void shutdown();
		List<Runnable> shutdownNow();
		boolean isShutdown();
		boolean isTerminated();
		boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼

上面就是ExecutorService的源碼,我把裏面的註釋去掉了,你們想看的話直接點進去查看便可,能夠看到ExecutorService 繼承並拓展了 Executor,在 ExecutorService 內部提供了更全面的任務提交機制以及線程池關閉方法。

ScheduledExecutorService

再來看下ScheduledExecutorService的源碼吧:

public interface ScheduledExecutorService extends ExecutorService {
			public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
  		public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
  		public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
 			public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}
複製代碼

能夠看到ScheduledExecutorService繼承自 ExecutorService,增長了四個定時任務相關方法。

ScheduledThreadPoolExecutor

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {}
複製代碼

ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor,並實現了 ScheduledExecutorService 接口。

ForkJoinPool

public class ForkJoinPool extends AbstractExecutorService {}
複製代碼

ForkJoinPool 是一種支持任務分解的線程池,通常要配合可分解任務接口 ForkJoinTask 來使用。

ThreadPoolExecutor

ThreadPoolExecutor 是 ExecutorService 的默認實現,所謂的線程池機制也大多封裝在此類當中。

瞭解線程池

經過上面的介紹你們應該對線程池已經有了大概瞭解,那麼。。。咱們該怎樣使用呢?上面也提到了ThreadPoolExecutor 是 ExecutorService 的默認實現,那麼直接看看 ThreadPoolExecutor 的構造方法不就得了嘛!來,看看:

<img src=">

廢話很少說,先來看第一個構造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
複製代碼

代碼很簡單,外部傳入參數後直接調用其餘構造方法,來看一下須要傳入的參數都是啥意思吧:

  • corePoolSize:即便處於空閒狀態依然保留在池中的線程數(核心),除非設置了allowCoreThreadTimeOut,當 allowCoreThreadTimeOut 設置爲 true 時,核心線程超時後也會被銷燬。
  • maximumPoolSize:池中容許的最大線程數;
  • keepAliveTime:線程池空閒時線程的存活時長;
  • unit:keepAliveTime的時間單位;
  • workQueue:存聽任務的隊列,使用的是阻塞隊列;

上面的幾個參數都挺好理解,可是workQueue阻塞隊列不太好理解,下面來講一下吧:

  • ArrayBlockingQueue:有界隊列,一個用數組實現的有界阻塞隊列,按FIFO排序量。
  • LinkedBlockingQueue:可設置容量的隊列,基於鏈表結構的阻塞隊列,按FIFO排序任務,容量能夠選擇進行設置,不設置的話,將是一個無邊界的阻塞隊列,最大長度爲Integer.MAX_VALUE,吞吐量一般要高於ArrayBlockingQuene;newFixedThreadPool線程池使用了這個隊列(這裏有坑)。
  • DelayQueue:延遲隊列,是一個任務定時週期的延遲執行的隊列。根據指定的執行時間從小到大排序,不然根據插入到隊列的前後排序。newScheduledThreadPool線程池使用了這個隊列。
  • PriorityBlockingQueue:優先級隊列,具備優先級的無界阻塞隊列。
  • SynchronousQueue:同步隊列,一個不存儲元素的阻塞隊列,每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQuene,newCachedThreadPool線程池使用了這個隊列。

再來看看第二個:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
複製代碼

能夠看到第二個構造方法和第一個方法相比只多了一個參數,那就來講下多的這個參數的意思吧:

  • threadFactory:執行程序建立新線程時要使用的工廠;

第三個構造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
複製代碼

OK,又多了一個參數,來看看吧:

  • 在隊列(workQueue)和線程池達到最大線程數(maximumPoolSize)均滿時仍有任務的狀況下的處理方式;

這裏須要詳細說一下了,拒絕策略是線程池的一種保護機制,目的就是當這種無節制的線程資源申請發生時,拒絕新的任務保護線程池。默認拒絕策略會直接報異常,可是 JDK 中一共提供了 4 種保護策略,以下圖所示:

<img src=">

最後一個構造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼

這個方法的參數和第三個同樣,只是進行了一些空值判斷、異常的拋出以及參數的賦值操做。

使用

構造方法有了,來看看怎麼使用吧:

ThreadPoolExecutor mExecutor = new ThreadPoolExecutor(corePoolSize,// 核心線程數
                        maximumPoolSize, // 最大線程數
                        keepAliveTime, // 閒置線程存活時間
                        TimeUnit.MILLISECONDS,// 時間單位
                        new LinkedBlockingDeque<Runnable>(),// 線程隊列
                        Executors.defaultThreadFactory(),// 線程工廠
                        new ThreadPoolExecutor.AbortPolicy()// 隊列已滿,並且當前線程數已經超過最大線程數時的異常處理策略
複製代碼

先來建立一個線程池,而後直接使用就能夠了:

mExecutor.execute(runnable);
複製代碼

是否是很簡單啊?不不不,還有更簡單的,爲了方便開發者能夠更方便的使用線程池,JDK 中給咱們提供了一個線程池的工廠類—Executors。在 Executors 中定義了多個靜態方法,用來建立不一樣配置的線程池。常見有如下幾種。

newSingleThreadExecutor

顧名思義,這是一個單線程化的線程池,只會用惟一的工做線程來執行任務,保證全部任務按先進先出的順序執行。

private void one(){
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //執行線程操做
            }
        });
    }
複製代碼

那麼。。。它是怎麼實現只有一個工做線程呢?來看看源碼不得了:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
複製代碼

很簡單吧,只是把核心線程數和最大線程數都設置爲了1,緩存隊列爲LinkedBlockingQueue(可設置容量隊列)

newCachedThreadPool

這個是建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。

private void two(){
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
								//執行線程操做
            }
        });
    }
複製代碼

再來看看可緩存的線程池是怎麼實現的:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
複製代碼

能夠看到可緩存的線程池是沒有核心線程的,可是最大線程數爲Integer.MAX_VALUE(2147483647),緩存隊列爲SynchronousQueue(同步隊列)

newFixedThreadPool

這個方法是建立一個固定數目的、可重用的線程池。

private void three(){
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //執行線程操做
            }
        });
    }
複製代碼

來看下固定數目的線程池的實現吧:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
複製代碼

很簡單,只是傳入一個數字,核心線程數和最大線程數都設置爲輸入的值的大小,緩存隊列爲LinkedBlockingQueue(可設置容量隊列)

newScheduledThreadPool

建立一個定時線程池,支持定時及週期性任務執行。

private void four(){
        ExecutorService executorService = Executors.newScheduledThreadPool(3);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //執行線程操做
            }
        });
    }
複製代碼

最後再看一下定時線程池的實現:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
複製代碼

上面提到過,定時線程池的實現是ScheduledThreadPoolExecutor,那咱就再來看下ScheduledThreadPoolExecutor的構造方法:

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }
複製代碼

核心線程數也是傳進的值,最大線程是也是Integer.MAX_VALUE(2147483647),緩存隊列爲DelayedWorkQueue(延遲隊列)

小總結

是否是發現Exectors中的這幾個靜態方法很方便啊,特別省事,直接調用便可使用,比本身來建立ThreadPoolExecutor方便的多。

可是。。。

凡事就怕可是,先來看一段代碼:

private static void three(){
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10000000; i++) {
            final int taskId = i;
            System.out.println(taskId);
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                   try{
                       System.out.println("線程:"+Thread.currentThread().getName()+" 正在執行:"+taskId);
                       Thread.sleep(1000);
                   }catch (Exception e){
                       e.printStackTrace();
                   }
                }
            });
        }

    }
複製代碼

上面說過LinkedBlockingQueue若是容量不設置的話將是一個無邊界的阻塞隊列,最大長度爲Integer.MAX_VALUE,源碼中也沒有進行設置,因此。。。。會OOM。。此處即爲上面提到的坑。。

因爲newFixedThreadPool和newSingleThreadExecutor實現基本同樣,就再也不測試newSingleThreadExecutor。

下面來看下newCachedThreadPool:

private void two(){
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10000000; i++) {
            final int taskId = i;
            System.out.println(taskId);
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try{
                        System.out.println("線程:"+Thread.currentThread().getName()+" 正在執行:"+taskId);
                        Thread.sleep(1000);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            });
        }
    }
複製代碼

一樣會報OOM,緣由很簡單,它的最大線程數爲Integer.MAX_VALUE(2147483647),沒有限制,因此。。。。

CachedThreadPool 和 ScheduledThreadPool都容許的建立線程數量爲 Integer.MAX_VALUE,此處再也不測試ScheduledThreadPool。

因此。。。

有了上面的可是,也就有了下面的因此,相信阿里的開發規範你們都或多或少的看過,裏面有這麼一條:

線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。

那麼。。。我到底該怎麼作?

很簡單,經過ThreadPoolExecutor的方式本身建立線程池,根據業務邏輯選擇阻塞隊列、拒絕策略等。

固然安卓開發其實無需考慮那麼多,使用Kotlin的朋友應該知道一個更香的東西------協程,直接使用協程不得了,也不用考慮這麼多亂七八糟的東西。。。谷歌大法好。。協程的使用在這裏我就不現眼了,扔物線的課程中講的明明白白的。

總結

原本感受沒多少東西,但爲何寫了這麼久。。。哎。喜歡的幫忙點點關注,點點贊啊。

我的編寫,不免出錯,文中若有錯誤請指出,萬分感激。下次再見。

相關文章
相關標籤/搜索