使用 Executors,ThreadPoolExecutor,建立線程池,源碼分析理解

以前建立線程的時候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor 這四個方法。
固然 Executors 也是用不一樣的參數去 new ThreadPoolExecutor 實現的,本文先分析前四種線程建立方式,後在分析 new ThreadPoolExecutor 建立方式java

使用 Executors 建立線程池

1.newFixedThreadPool()

因爲使用了LinkedBlockingQueue因此maximumPoolSize沒用,當corePoolSize滿了以後就加入到LinkedBlockingQueue隊列中。
每當某個線程執行完成以後就從LinkedBlockingQueue隊列中取一個。
因此這個是建立固定大小的線程池。git

源碼分析github

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
            nThreads,
            nThreads,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

2.newSingleThreadPool()

建立線程數爲1的線程池,因爲使用了LinkedBlockingQueue因此maximumPoolSize 沒用,corePoolSize爲1表示線程數大小爲1,滿了就放入隊列中,執行完了就從隊列取一個。數組

源碼分析緩存

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService
            (
                    new ThreadPoolExecutor(
                            1,
                            1,
                            0L,
                            TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>())
            );
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

3.newCachedThreadPool()

建立可緩衝的線程池。沒有大小限制。因爲corePoolSize爲0因此任務會放入SynchronousQueue隊列中,SynchronousQueue只能存放大小爲1,因此會馬上新起線程,因爲maxumumPoolSizeInteger.MAX_VALUE因此能夠認爲大小爲2147483647。受內存大小限制。多線程

源碼分析併發

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
            0,
            Integer.MAX_VALUE,
            60L,
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

使用 ThreadPoolExecutor 建立線程池

源碼分析 ,ThreadPoolExecutor 的構造函數函數

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

構造函數參數

一、corePoolSize 核心線程數大小,當線程數 < corePoolSize ,會建立線程執行 runnableoop

二、maximumPoolSize 最大線程數, 當線程數 >= corePoolSize的時候,會把 runnable 放入 workQueue中源碼分析

三、keepAliveTime 保持存活時間,當線程數大於corePoolSize的空閒線程能保持的最大時間。

四、unit 時間單位

五、workQueue 保存任務的阻塞隊列

六、threadFactory 建立線程的工廠

七、handler 拒絕策略

任務執行順序

一、當線程數小於 corePoolSize時,建立線程執行任務。

二、當線程數大於等於 corePoolSize而且 workQueue 沒有滿時,放入workQueue

三、線程數大於等於 corePoolSize而且當 workQueue 滿時,新任務新建線程運行,線程總數要小於 maximumPoolSize

四、當線程總數等於 maximumPoolSize 而且 workQueue 滿了的時候執行 handlerrejectedExecution。也就是拒絕策略。

JDK7提供了7個阻塞隊列。(也屬於併發容器)

  1. ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
  2. LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
  3. PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
  4. DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
  5. SynchronousQueue:一個不存儲元素的阻塞隊列。
  6. LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  7. LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

什麼是阻塞隊列?

阻塞隊列是一個在隊列基礎上又支持了兩個附加操做的隊列。

2個附加操做:

支持阻塞的插入方法:隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
支持阻塞的移除方法:隊列空時,獲取元素的線程會等待隊列變爲非空。

阻塞隊列的應用場景

阻塞隊列經常使用於生產者和消費者的場景,生產者是向隊列裏添加元素的線程,消費者是從隊列裏取元素的線程。簡而言之,阻塞隊列是生產者用來存放元素、消費者獲取元素的容器。

幾個方法

在阻塞隊列不可用的時候,上述2個附加操做提供了四種處理方法

方法處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用

JAVA裏的阻塞隊列

JDK 7 提供了7個阻塞隊列,以下

一、ArrayBlockingQueue 數組結構組成的有界阻塞隊列。

此隊列按照先進先出(FIFO)的原則對元素進行排序,可是默認狀況下不保證線程公平的訪問隊列,即若是隊列滿了,那麼被阻塞在外面的線程對隊列訪問的順序是不能保證線程公平(即先阻塞,先插入)的。

二、LinkedBlockingQueue一個由鏈表結構組成的有界阻塞隊列

此隊列按照先出先進的原則對元素進行排序

三、PriorityBlockingQueue支持優先級的無界阻塞隊列

四、DelayQueue支持延時獲取元素的無界阻塞隊列,便可以指定多久才能從隊列中獲取當前元素

五、SynchronousQueue不存儲元素的阻塞隊列,每個put必須等待一個take操做,不然不能繼續添加元素。而且他支持公平訪問隊列。

六、LinkedTransferQueue由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其餘阻塞隊列,多了tryTransfer和transfer方法

transfer方法

若是當前有消費者正在等待接收元素(take或者待時間限制的poll方法),transfer能夠把生產者傳入的元素馬上傳給消費者。若是沒有消費者等待接收元素,則將元素放在隊列的tail節點,並等到該元素被消費者消費了才返回。

tryTransfer方法

用來試探生產者傳入的元素可否直接傳給消費者。,若是沒有消費者在等待,則返回false。和上述方法的區別是該方法不管消費者是否接收,方法當即返回。而transfer方法是必須等到消費者消費了才返回。

七、LinkedBlockingDeque鏈表結構的雙向阻塞隊列,優點在於多線程入隊時,減小一半的競爭。

四個拒絕策略

ThreadPoolExecutor默認有四個拒絕策略:

一、ThreadPoolExecutor.AbortPolicy() 直接拋出異常RejectedExecutionException

二、ThreadPoolExecutor.CallerRunsPolicy() 直接調用run方法而且阻塞執行

三、ThreadPoolExecutor.DiscardPolicy() 直接丟棄後來的任務

四、ThreadPoolExecutor.DiscardOldestPolicy() 丟棄在隊列中隊首的任務

固然能夠本身繼承RejectedExecutionHandler來寫拒絕策略.

TestThreadPoolExecutor 示例

TestThreadPoolExecutor.java

package io.ymq.thread.TestThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 描述:
 *
 * @author yanpenglei
 * @create 2017-10-12 15:39
 **/
public class TestThreadPoolExecutor {
    public static void main(String[] args) {

        long currentTimeMillis = System.currentTimeMillis();

        // 構造一個線程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3)
        );

        for (int i = 1; i <= 10; i++) {
            try {
                String task = "task=" + i;
                System.out.println("建立任務並提交到線程池中:" + task);
                threadPool.execute(new ThreadPoolTask(task));

                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        try {
            //等待全部線程執行完畢當前任務。
            threadPool.shutdown();

            boolean loop = true;
            do {
                //等待全部線程執行完畢當前任務結束
                loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒
            } while (loop);

            if (loop != true) {
                System.out.println("全部線程執行完畢");
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("耗時:" + (System.currentTimeMillis() - currentTimeMillis));
        }


    }
}

ThreadPoolTask.java

package io.ymq.thread.TestThreadPoolExecutor;

import java.io.Serializable;

/**
 * 描述:
 *
 * @author yanpenglei
 * @create 2017-10-12 15:40
 **/
public class ThreadPoolTask implements Runnable, Serializable {

    private Object attachData;

    ThreadPoolTask(Object tasks) {
        this.attachData = tasks;
    }

    public void run() {

        try {

            System.out.println("開始執行任務:" + attachData + "任務,使用的線程池,線程名稱:" + Thread.currentThread().getName());

            System.out.println();

        } catch (Exception e) {
            e.printStackTrace();
        }
        attachData = null;
    }

}

遇到java.util.concurrent.RejectedExecutionException

第一

你的線程池 ThreadPoolExecutor 顯示的 shutdown() 以後,再向線程池提交任務的時候。 若是你配置的拒絕策略是 AbortPolicy 的話,這個異常就會拋出來。

第二

當你設置的任務緩存隊列太小的時候,或者說, 你的線程池裏面全部的線程都在幹活(線程數== maxPoolSize),而且你的任務緩存隊列也已經充滿了等待的隊列, 這個時候,你再向它提交任務,則會拋出這個異常。

響應

能夠看到線程 pool-1-thread-1 到5 循環使用

建立任務並提交到線程池中:task=1
開始執行任務:task=1任務,使用的線程池,線程名稱:pool-1-thread-1

建立任務並提交到線程池中:task=2
開始執行任務:task=2任務,使用的線程池,線程名稱:pool-1-thread-2

建立任務並提交到線程池中:task=3
開始執行任務:task=3任務,使用的線程池,線程名稱:pool-1-thread-3

建立任務並提交到線程池中:task=4
開始執行任務:task=4任務,使用的線程池,線程名稱:pool-1-thread-4

建立任務並提交到線程池中:task=5
開始執行任務:task=5任務,使用的線程池,線程名稱:pool-1-thread-5

建立任務並提交到線程池中:task=6
開始執行任務:task=6任務,使用的線程池,線程名稱:pool-1-thread-1

建立任務並提交到線程池中:task=7
開始執行任務:task=7任務,使用的線程池,線程名稱:pool-1-thread-2

建立任務並提交到線程池中:task=8
開始執行任務:task=8任務,使用的線程池,線程名稱:pool-1-thread-3

建立任務並提交到線程池中:task=9
開始執行任務:task=9任務,使用的線程池,線程名稱:pool-1-thread-4

建立任務並提交到線程池中:task=10
開始執行任務:task=10任務,使用的線程池,線程名稱:pool-1-thread-5

全部線程執行完畢
耗時:1015

測試代碼

github https://github.com/souyunku/ymq-example/tree/master/ymq-thread

Contact

  • 做者:鵬磊
  • 出處:http://www.ymq.io
  • Email:admin@souyunku.com
  • 版權歸做者全部,轉載請註明出處
  • Wechat:關注公衆號,搜雲庫,專一於開發技術的研究與知識分享

關注公衆號-搜雲庫

相關文章
相關標籤/搜索