Java 併發編程 | 線程池詳解

原文: chenmingyu.top/concurrent-…java

線程池

線程池用來處理異步任務或者併發執行的任務編程

優勢:數組

  1. 重複利用已建立的線程,減小建立和銷燬線程形成的資源消耗
  2. 直接使用線程池中的線程,提升響應速度
  3. 提升線程的可管理性,由線程池同一管理

ThreadPoolExecutor

java中線程池使用ThreadPoolExecutor實現緩存

構造函數

ThreadPoolExecutor提供了四個構造函數,其餘三個構造函數最終調用的都是下面這個構造函數併發

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼

入參:框架

  1. corePoolSize:線程池的核心線程數量異步

    線程池維護的核心線程數量,當線程池初始化後,核心線程數量爲零,當有任務來到的時候纔會建立線程去執行任務,當線程池中的工做線程數量等於核心線程數量時,新到的任務就會放到緩存隊列中函數

  2. maximumPoolSize:線程池容許建立的最大線程數量oop

    當阻塞隊列滿了的時候,而且線程池中建立的線程數量小於maximumPoolSize,此時會建立新的線程執行任務ui

  3. keepAliveTime:線程活動保持時間

    只有當線程池數量大於核心線程數量時,keepAliveTime纔會有效,若是當前線程數量大於核心線程數量時,而且線程的空閒時間達到keepAliveTime,當前線程終止,直到線程池數量等於核心線程數

  4. unit:線程活動保持時間的單位

    keepAliveTime的單位,包括:TimeUnit.DAYS天,TimeUnit.HOURS小時,TimeUnit.MINUTES分鐘,TimeUnit.SECONDS秒,TimeUnit.MILLISECONDS毫秒,TimeUnit.MICROSECONDS微秒,TimeUnit.NANOSECONDS納秒

  5. workQueue:任務隊列,用來保存等待執行任務的阻塞隊列

    ArrayBlockingQueue:是一個基於數組結構的有界隊列

    LinkedBlockingQueue:是一個基於鏈表結構的阻塞隊列

    SynchronousQueue:不存儲元素的阻塞隊列,每個插入操做必須等到下一個線程調用移除操做,不然插入操做一直阻塞

    PriorityBlockingQueue:一個具備優先級的無線阻塞隊列

  6. threadFactory:用來建立線程的工廠

  7. handler:飽和策略,當線程池和隊列都滿了的時候,必需要採起一種策略處理新的任務,默認策略是AbortPolicy,根據本身需求選擇合適的飽和策略

    AbortPolicy:直接拋出異常

    CallerRunsPolicy:用調用者所在的線程來運行當前任務

    DiscardOldestPolicy:丟棄隊列裏面最近的一個任務,並執行當前任務

    DiscardPolicy:不處理,丟棄掉

    固然咱們也能夠經過實現RejectedExecutionHandler去自定義實現處理策略

入參不一樣,線程池的運行機制也不一樣,瞭解每一個入參的含義因爲咱們更透傳的理解線程池的實現原理

提交任務

線程池處理提交任務流程以下

處理流程

  1. 若是核心線程數量未滿,建立線程執行任務,不然添加到阻塞隊列中
  2. 若是阻塞隊列中未滿,將任務存到隊列裏
  3. 若是阻塞隊列滿了,看線程池數量是否達到了線程池最大數量,若是沒達到,建立線程執行任務
  4. 若是已經達到線程池最大數量,根據飽和策略進行處理

ThreadPoolExecutor使用execute(Runnable command)submit(Runnable task)向線程池中提交任務,在submit(Runnable task)方法中調用了execute(Runnable command),因此咱們只要瞭解execute(Runnable command)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 獲取線程池狀態,而且能夠經過ctl獲取到當前線程池數量及線程池狀態
    int c = ctl.get();
    // 若是工做線程數小於核心線程數量,則建立一個新線程執行任務
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 若是不符合上面條件,當前線程處於運行狀態而且寫入阻塞隊列成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 雙重檢查,再次獲取線程狀態,若是當前線程狀態變爲非運行狀態,則從隊列中移除任務,執行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 檢查工做線程數量是否爲0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //建立線程執行任務,若是添加失敗則執行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}
複製代碼

execute(Runnable command)方法中咱們比較關心的就是如何建立新的線程執行任務,就addWorker(command, true)方法

workQueue.offer(command)方法是用來向阻塞隊列中添加任務的

reject(command)方法會根據建立線程池時傳入的飽和策略對任務進行處理,例如默認的AbortPolicy,查看源碼後知道就是直接拋了個RejectedExecutionException異常,其餘的飽和策略的源碼也是特別簡單

關於線程池狀態與工做線程的數量是如何表示的

ThreadPoolExecutor中使用一個AtomicInteger類型變量表示

/** * ctl表示兩個信息,一個是線程池的狀態(高3位表示),一個是當前線程池的數量(低29位表示),這個跟咱們前面 * 說過的讀寫鎖的state變量是同樣的,以一個變量記錄兩個信息,都是以利用int的32個字節,高十六位表述讀,低十 * 六位表示寫鎖 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//低29位保存線程池數量
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程池最大容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 運行狀態存儲在高3位
// 運行狀態
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
複製代碼

addWorker(command, boolean)建立工做線程,執行任務

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 線程池狀態
        int rs = runStateOf(c);
        // 判斷線程池狀態,以及阻塞隊列是否爲空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 獲取線程工做線程數量
            int wc = workerCountOf(c);
            // 判斷是否大於最大容量,以及根據傳入的core判斷是否大於核心線程數量仍是最大線程數量
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 增長工做線程數量
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //若是線程池狀態改變,則重試
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 建立Worker,內部建立了一個新的線程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
				// 線程池狀態判斷
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 將建立的線程添加到線程池
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //執行任務,首先會執行Worker對象的firstTask
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //若是任務執行失敗
        if (! workerStarted)
            //移除worker
            addWorkerFailed(w);
    }
    return workerStarted;
}
複製代碼
關閉線程池

ThreadPoolExecutor中關閉線程池使用shutdown()shutdownNow()方法,原理都是經過遍歷線程池中的線程,對線程進行中斷

for (Worker w : workers) {
	Thread t = w.thread;
	if (!t.isInterrupted() && w.tryLock()) {
		try {
			t.interrupt();
		} catch (SecurityException ignore) {
		} finally {
			w.unlock();
		}
	}
	if (onlyOne)
		break;
	}
複製代碼
Executor框架

Executor框架將任務的提交與任務的執行進行分離

Executors提供了一系列工廠方法用於創先線程池,返回的線程池都實現了 ExecutorService 接口

工廠方法:

  1. newFixedThreadPool:用於建立固定數目線程的線程池
  2. newCachedThreadPool:用於建立一個可緩存的線程池,調用execute將重用之前構造的線程,若是現有線程沒有可用的,則建立一個新線 程並添加到池中。終止並從緩存中移除那些已有 60 秒鐘未被使用的線程
  3. newSingleThreadExecutor:用於建立只有一個線程的線程池
  4. newScheduledThreadPool:用於建立一個支持定時及週期性的任務執行的線程池

在阿里巴巴手冊中強制要求禁止使用Executors提供的工廠方法建立線程池

這個確實是一個很嚴重的問題,咱們部門曾經就出現過使用FixedThreadPool線程池,致使OOM,這是由於線程執行任務的時候被阻塞或耗時很長時間,致使阻塞隊列一直在添加任務,直到內存被打滿,報OOM

因此咱們在使用線程池的時候要使用ThreadPoolExecutor的構造函數去建立線程池,根據本身的任務類型來肯定核心線程數和最大線程數,選擇適合阻塞隊列和阻塞隊列的長度

合理配置線程池

合理的配置線程池須要分析一下任務的性質(使用ThreadPoolExecutor建立線程池):

  1. CPU密集型任務應配置竟可能小的線程,好比 cpu數量+1

  2. IO密集型任務並非一直在執行任務,應該配置儘量多的線程,好比 cpu數量x2

    可經過Runtime.getRuntime().availableProcessors()獲取cpu數量

  3. 執行的任務有調用外部接口比較費時的時候,這時cup空閒的時間就越長,能夠將線程池數量設置大一些,這樣cup空閒的時間就能夠去執行別的任務

  4. 建議使用有界隊列,可根據須要將長度設置大一些,防止OOM

參考:java併發編程的藝術

推薦閱讀

java併發編程 | 線程詳解

java併發編程 | 鎖詳解:AQS,Lock,ReentrantLock,ReentrantReadWriteLock

相關文章
相關標籤/搜索