Java ThreadPoolExecutor 線程池

Executors

Executors 是一個Java中的工具類. 提供工廠方法來建立不一樣類型的線程池.java

clipboard.png

從上圖中也能夠看出, Executors的建立線程池的方法, 建立出來的線程池都實現了 ExecutorService接口. 經常使用方法有如下幾個:apache

  • newFixedThreadPool(int Threads): 建立固定數目線程的線程池, 超出的線程會在隊列中等待.
  • newCachedThreadPool(): 建立一個可緩存線程池, 若是線程池長度超過處理須要, 可靈活回收空閒線程(60秒), 若無可回收,則新建線程.
  • newSingleThreadExecutor(): 建立一個單線程化的線程池, 它只會用惟一的工做線程來執行任務, 保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行. 若是某一個任務執行出錯, 將有另外一個線程來繼續執行.
  • newScheduledThreadPool(int corePoolSize): 建立一個支持定時及週期性的任務執行的線程池, 多數狀況下可用來替代Timer類.

Executors 例子

newCachedThreadPool

線程最大數爲 Integer.MAX_VALUE, 當咱們往線程池添加了 n 個任務, 這 n 個任務都是一塊兒執行的.數組

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

使用 execute 方法添加任務時要注意: execute 提交的方式只能提交一個 Runnable 的對象, 且該方法的返回值是 void, 而且當線程的執行過程當中拋出了異常一般來講主線程也沒法獲取到異常的信息的, 只有經過 ThreadFactory 主動設置線程的異常處理類才能感知到提交的線程中的異常信息.緩存

若是想要獲取返回信息可使用 submit 方法.多線程

newFixedThreadPool

ExecutorService cachedThreadPool = Executors.newFixedThreadPool(1);
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

newScheduledThreadPool

三秒執行一次, 只有執行完這一次後, 纔會執行.ide

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(2000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

順序執行各個任務, 第一個任務執行完, 纔會執行下一個.函數

ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

Executors存在什麼問題

clipboard.png

在阿里巴巴Java開發手冊中提到,使用Executors建立線程池可能會致使OOM(OutOfMemory ,內存溢出),可是並無說明爲何,那麼接下來咱們就來看一下到底爲何不容許使用Executors?工具

咱們先來一個簡單的例子,模擬一下使用Executors致使OOM的狀況.this

/**
 * @author Hollis
 */
public class ExecutorsDemo {
    private static ExecutorService executor = Executors.newFixedThreadPool(15);
    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executor.execute(new SubThread());
        }
    }
}

class SubThread implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            //do nothing
        }
    }
}

經過指定JVM參數:-Xmx8m -Xms8m 運行以上代碼,會拋出OOM:spa

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)

以上代碼指出,ExecutorsDemo.java 的第16行,就是代碼中的 executor.execute(new SubThread());

Java中的 BlockingQueue 主要有兩種實現, 分別是 ArrayBlockingQueueLinkedBlockingQueue.

ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列, 必須設置容量.

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

LinkedBlockingQueue 是一個用鏈表實現的有界阻塞隊列, 容量能夠選擇進行設置, 不設置的話, 將是一個無邊界的阻塞隊列, 最大長度爲 Integer.MAX_VALUE.

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

這裏的問題就出在若是咱們不設置 LinkedBlockingQueue 的容量的話, 其默認容量將會是 Integer.MAX_VALUE.

newFixedThreadPool 中建立 LinkedBlockingQueue 時, 並未指定容量. 此時, LinkedBlockingQueue 就是一個無邊界隊列, 對於一個無邊界隊列來講, 是能夠不斷的向隊列中加入任務的, 這種狀況下就有可能由於任務過多而致使內存溢出問題.

newCachedThreadPoolnewScheduledThreadPool 這兩種方式建立的最大線程數多是Integer.MAX_VALUE, 而建立這麼多線程, 必然就有可能致使OOM.

ThreadPoolExecutor 建立線程池

避免使用 Executors 建立線程池, 主要是避免使用其中的默認實現, 那麼咱們能夠本身直接調用 ThreadPoolExecutor 的構造函數來本身建立線程池. 在建立的同時, 給 BlockQueue 指定容量就能夠了.

ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(10));

這種狀況下, 一旦提交的線程數超過當前可用線程數時, 就會拋出 java.util.concurrent.RejectedExecutionException, 這是由於當前線程池使用的隊列是有邊界隊列, 隊列已經滿了便沒法繼續處理新的請求.

除了本身定義 ThreadPoolExecutor 外. 還有其餘方法. 如apache和guava等.

四個構造函數

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

int corePoolSize => 該線程池中核心線程數最大值
線程池新建線程的時候,若是當前線程總數小於corePoolSize, 則新建的是核心線程, 若是超過corePoolSize, 則新建的是非核心線程

核心線程默認狀況下會一直存活在線程池中, 即便這個核心線程啥也不幹(閒置狀態).

若是指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 這個屬性爲 true, 那麼核心線程若是不幹活(閒置狀態)的話, 超過必定時間(時長下面參數決定), 就會被銷燬掉

很好理解吧, 正常狀況下你不幹活我也養你, 由於我總有用到你的時候, 但有時候特殊狀況(好比我本身都養不起了), 那你不幹活我就要把你幹掉了

int maximumPoolSize
該線程池中線程總數最大值

線程總數 = 核心線程數 + 非核心線程數.

long keepAliveTime
該線程池中非核心線程閒置超時時長

一個非核心線程, 若是不幹活(閒置狀態)的時長超過這個參數所設定的時長, 就會被銷燬掉

若是設置 allowCoreThreadTimeOut = true, 則會做用於核心線程

TimeUnit unit

keepAliveTime的單位, TimeUnit是一個枚舉類型, 其包括:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒

BlockingQueue workQueue

一個阻塞隊列, 用來存儲等待執行的任務. 也就是說如今有10個任務, 核心線程 有四個, 非核心線程有六個, 那麼這六個線程會被添加到 workQueue 中, 等待執行.

這個參數的選擇也很重要, 會對線程池的運行過程產生重大影響, 通常來講, 這裏的阻塞隊列有如下幾種選擇:

SynchronousQueue: 這個隊列接收到任務的時候, 會直接提交給線程處理, 而不保留它, 若是全部線程都在工做怎麼辦? 那就*新建一個線程來處理這個任務!因此爲了保證不出現<線程數達到了maximumPoolSize而不能新建線程>的錯誤, 使用這個類型隊列的時候, maximumPoolSize 通常指定成 Integer.MAX_VALUE, 即無限大.

LinkedBlockingQueue: 這個隊列接收到任務的時候, 若是當前線程數小於核心線程數, 則核心線程處理任務; 若是當前線程數等於核心線程數, 則進入隊列等待. 因爲這個隊列最大值爲 Integer.MAX_VALUE , 即全部超過核心線程數的任務都將被添加到隊列中,這也就致使了 maximumPoolSize 的設定失效, 由於總線程數永遠不會超過 corePoolSize.

ArrayBlockingQueue: 能夠限定隊列的長度, 接收到任務的時候, 若是沒有達到 corePoolSize 的值, 則核心線程執行任務, 若是達到了, 則入隊等候, 若是隊列已滿, 則新建線程(非核心線程)執行任務, 又若是總線程數到了maximumPoolSize, 而且隊列也滿了, 則發生錯誤.

DelayQueue: 隊列內元素必須實現 Delayed 接口, 這就意味着你傳進去的任務必須先實現Delayed接口. 這個隊列接收到任務時, 首先先入隊, 只有達到了指定的延時時間, 纔會執行任務.

ThreadFactory threadFactory

它是ThreadFactory類型的變量, 用來建立新線程.

默認使用 Executors.defaultThreadFactory() 來建立線程. 使用默認的 ThreadFactory 來建立線程時, 會使新建立的線程具備相同的 NORM_PRIORITY 優先級而且是非守護線程, 同時也設置了線程的名稱.

RejectedExecutionHandler handler

表示當拒絕處理任務時的策略, 有如下四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常(默認).
ThreadPoolExecutor.DiscardPolicy:直接丟棄任務, 可是不拋出異常.
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務, 而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:用調用者所在的線程來執行任務.

submit 方法

返回一個 Future 對象, 這個 Future 對象表明這線程的執行結果. 當主線程調用 Futureget 方法的時候會獲取到從線程中返回的結果數據. 若是在線程的執行過程當中發生了異常, get 會獲取到異常的信息.

submit 提交的方式有以下三種狀況.

<T> Future<T> submit(Callable<T> task);

Callable 接口和 Runnable 接口的定義很相似, 只不過 Runnable 接口中是一個沒有返回值的 run 方法, 而 Callable接口中是一個有返回值的 call 方法.

Future<?> submit(Runnable task);

也能夠提交一個 Runable 接口的對象, 這樣當調用 get 方法的時候, 若是線程執行成功會直接返回 null, 若是線程執行異常會返回異常的信息.

<T> Future<T> submit(Runnable task, T result);

這個接口就比較有意思了, 除了 task 以外還有一個 result 對象, 當線程正常結束的時候調用 Futureget 方法會返回 result 對象, 當線程拋出異常的時候會獲取到對應的異常的信息.

值得注意的是:

submit 在執行過程當中與 execute 不同, 不會拋出異常而是把異常保存在成員變量中, 在 FutureTask.get 阻塞獲取的時候再把異常拋出來.

execute 直接拋出異常以後線程就死掉了, submit 保存異常線程沒有死掉, 所以 execute 的線程池可能會出現沒有意義的狀況, 由於線程沒有獲得重用. 而 submit 不會出現這種狀況.

線程池好處

"在線程池中執行任務" 比 "爲每一個任務分配一個線程" 優點更多. 經過重用現有的線程而不是建立新線程, 能夠在處理多個請求時分攤在線程建立和銷燬過程當中產生的巨大開銷.

另外一個額外的好處是, 當請求到達時, 工做線程一般已經存在, 所以不會因爲等待建立線程而延遲任務執行, 從而提升了響應性.

經過適當調整線程池打的大小, 能夠建立足夠多的線程以便使處理器保持忙碌狀態, 同時還能夠防止過多線程相互競爭資源而使應用程序耗盡內存或失敗.

相關文章
相關標籤/搜索