如何讓ThreadPoolExecutor更早地建立非核心線程

最近在項目中遇到一個須要用線程池來處理任務的需求,因而我用ThreadPoolExecutor來實現,可是在實現過程當中我發現提交大量任務時它的處理邏輯是這樣的(提交任務還有一個submit方法內部也調用了execute方法):java

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

註釋中已經寫的很是明白:瀏覽器

  1. 若是線程數量小於corePoolSize,直接建立新線程處理任務
  2. 若是線程數量等於corePoolSize,嘗試將任務放到等待隊列裏
  3. 若是等待隊列已滿,嘗試建立非核心線程處理任務(若是maximumPoolSIze > corePoolSize

可是在個人項目中一個線程啓動須要10s左右的時間(須要啓動一個瀏覽器對象),所以我但願實現一個更精細的邏輯提高資源的利用率:安全

  1. 線程池保持corePoolSize個線程確保有新任務到來時能夠當即獲得執行
  2. 當沒有空閒線程時,先把任務放到等待隊列中(由於開啓一個線程須要10s,因此若是在等待隊列比較小的時候,等待其餘任務完成比等待新線程建立更快)
  3. 當等待隊列的大小大於設定的閾值threshold時,說明堆積的任務已經太多了,這個時候開始建立非核心線程直到線程數量已經等於maximumPoolSize
  4. 當線程數量已經等於maximumPoolSize,再將新來的任務放回到任務隊列中等待(直到隊列滿後開始拒絕任務)
  5. 長時間空閒後退出非核心線程回收瀏覽器佔用的內存資源

當我研究了常見的CachedThreadPoolFixedThreadPool以及嘗試本身配置ThreadPoolExecutor的構造函數後,發現不管如何都不能實現上面提到的邏輯,由於默認的實現只有在workQueue達到容量上限後纔會開始建立非核心線程,所以須要經過繼承的方法實現一個新的類來完成需求。多線程

怎麼實如今workQueue到達容量上限前就建立非核心線程?還要回顧下execute函數的代碼ide

//嘗試將任務插入等待隊列,若是返回false
					//說明隊列已經到達容量上限,進入else if邏輯
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
//嘗試建立非核心線程
else if (!addWorker(command, false))
    reject(command);

那麼只要改變workQueue.offer()的邏輯,在線程數量還小於maximumPoolSize的時候就返回false拒絕插入,讓線程池調用addWoker,等不能再建立更多線程時再容許添加到隊列便可。函數

能夠經過子類重寫offer方法來實現添加邏輯的改變oop

@Override
public boolean offer(E e) {
    if (threadPoolExecutor == null) {
        throw new NullPointerException();
    }
    //當調用該方法時,已經肯定了workerCountOf(c) > corePoolSize
    //當數量小於threshold,在隊列裏等待
    if (size() < threshold) {
        return super.offer(e);
	//當數量大於等於threshold,說明堆積的任務太多,返回false
	//讓線程池來建立新線程處理
    } else {
        //此處可能會由於多線程致使錯誤的拒絕
        if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
            return false;
        //線程池中的線程數量已經到達上限,只能添加到任務隊列中
        } else {
            return super.offer(e);
        }
    }
}

這樣就實現了基本實現了我須要的功能,可是在寫代碼的過程當中我找到了一個可能出錯的地方:ThreadPoolExecutor線程安全的,那麼重寫的offer方法也可能遇到多線程調用的狀況測試

//設想當poolSize = maximumPoolSize-1時,兩個任務到達此處同時返回false
if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
	return false;
}

因爲添加到隊列返回falseexecute方法進入到else if (!addWorker(command, false))ui

if (isRunning(c) && workQueue.offer(command)) {
	int recheck = ctl.get();
	if (! isRunning(recheck) && remove(command))
		reject(command);
	else if (workerCountOf(recheck) == 0)
		addWorker(null, false);
}
//添加到隊列失敗後進入addWorker方法中
else if (!addWorker(command, false))
	reject(command);
}

再來看一下addWorker方法的代碼,這裏只截取須要的一部分this

for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY ||
    	//兩個線程都認爲還能夠建立再建立一個新線程
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
        //兩個線程同時調用cas方法只有一個可以成功
        //成功的線程break retry;進入後面的建立線程的邏輯
        //失敗的線程從新回到上面的檢查並返回false
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();  // Re-read ctl
    if (runStateOf(c) != rs)
        continue retry;
    // else CAS failed due to workerCount change; retry inner loop
}

最終,在競爭中失敗的線程因爲addWorker方法返回了false最終調用了reject(command)。在前面寫的要實現的邏輯裏提到了,只有在等待隊列容量達到上限沒法再插入時才拒絕任務,可是因爲多線程的緣由,這裏只是超過了threshold但沒有超過capacity的時候就拒絕任務了,因此要對拒絕策略的觸發作出修改:第一次觸發Reject時,嘗試從新添加到任務隊列中(不進行poolSize的檢測),若是仍然不能添加,再拒絕任務
這裏經過對execute方法進行重寫來實現重試

@Override
public void execute(Runnable command) {
    try {
        super.execute(command);
    } catch (RejectedExecutionException e) {
    	/*
    	這裏參考源碼中將任務添加到任務隊列的實現
    	可是其中經過(workerCountOf(recheck) == 0)
    	檢查當任務添加到隊列後是否還有線程存活的部分
    	因爲是private權限的,沒法實現相似的邏輯,所以須要作必定的特殊處理
		if (isRunning(c) && workQueue.offer(command)) {
		     int recheck = ctl.get();
		     if (! isRunning(recheck) && remove(command))
		         reject(command);
		     else if (workerCountOf(recheck) == 0)
		         addWorker(null, false);
		 }
		*/
        if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
            if (this.isShutdown() && remove(command))
                //二次檢查
                realRejectedExecutionHandler.rejectedExecution(command, this);
            } else {
                //插入失敗,隊列已經滿了
                realRejectedExecutionHandler.rejectedExecution(command, this);
            }
        }
    }
}

這裏有兩個小問題:

  1. 初始化線程池傳入的RejectedExecutionHandler不必定會拋出異常(事實上,ThreadPoolExecutor本身實現的4中拒絕策略中只有AbortPolicy可以拋出異常並被捕捉到),所以須要在初始化父類時傳入AbortPolicy拒絕策略並將構造函數中傳入的自定義拒絕策略保存下來,在重試失敗後才調用本身的rejectedExecution
  2. corePoolSize = 0 的極端狀況下,可能出現一個任務剛被插入隊列的同時,全部的線程都結束任務而後被銷燬了,此使這個被加入的任務就沒法被執行,在ThreadPoolExecutor中是經過
    else if (workerCountOf(recheck) == 0)
    	addWorker(null, false);
    在添加後再檢查工做線程是否爲0來確保任務能夠被執行,可是其中使用的方法是私有的,沒法在子類中實現相似的邏輯,所以在初始化時只能強制corePoolSize至少爲1來解決這個問題。

所有代碼以下

public class MyThreadPool extends ThreadPoolExecutor {

    private RejectedExecutionHandler realRejectedExecutionHandler;

    public MyThreadPool(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        int queueCapacity) {
        this(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                queueCapacity,
                new AbortPolicy());
    }

    public MyThreadPool(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        int queueCapacity,
                        RejectedExecutionHandler handler) {
        super(corePoolSize == 0 ? 1 : corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                new MyLinkedBlockingQueue<>(queueCapacity),
                new AbortPolicy());
        ((MyLinkedBlockingQueue)this.getQueue()).setThreadPoolExecutor(this);
        realRejectedExecutionHandler = handler;
    }

    @Override
    public void execute(Runnable command) {
        try {
            super.execute(command);
        } catch (RejectedExecutionException e) {
            if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) {
                if (this.isShutdown() && remove(command))
                    //二次檢查
                    realRejectedExecutionHandler.rejectedExecution(command, this);
            } else {
                //插入失敗,隊列已經滿了
                realRejectedExecutionHandler.rejectedExecution(command, this);
            }
        }
    }
}


public class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    private int threshold = 20;

    private ThreadPoolExecutor threadPoolExecutor = null;

    public MyLinkedBlockingQueue(int queueCapacity) {
        super(queueCapacity);
    }

    public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }

    @Override
	public boolean offer(E e) {
	    if (threadPoolExecutor == null) {
	        throw new NullPointerException();
	    }
	    //當調用該方法時,已經肯定了workerCountOf(c) > corePoolSize
	    //當數量小於threshold,在隊列裏等待
	    if (size() < threshold) {
	        return super.offer(e);
		//當數量大於等於threshold,說明堆積的任務太多,返回false
		//讓線程池來建立新線程處理
	    } else {
	        //此處可能會由於多線程致使錯誤的拒絕
	        if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) {
	            return false;
	        //線程池中的線程數量已經到達上限,只能添加到任務隊列中
	        } else {
	            return super.offer(e);
	        }
	    }
	}

    public boolean offerWithoutCheck(E e) {
        return super.offer(e);
    }
}

最後進行簡單的測試

corePoolSize:2
maximumPoolSize:5
queueCapacity:10
threshold:7
任務2
線程數量:2
等待隊列大小:0
等待隊列大小小於閾值,繼續等待。
任務3
線程數量:2
等待隊列大小:1
等待隊列大小小於閾值,繼續等待。
任務4
線程數量:2
等待隊列大小:2
等待隊列大小小於閾值,繼續等待。
任務5
線程數量:2
等待隊列大小:3
等待隊列大小小於閾值,繼續等待。
任務6
線程數量:2
等待隊列大小:4
等待隊列大小小於閾值,繼續等待。
任務7
線程數量:2
等待隊列大小:5
等待隊列大小小於閾值,繼續等待。
任務8
線程數量:2
等待隊列大小:6
等待隊列大小小於閾值,繼續等待。
任務9
線程數量:2
等待隊列大小:7
等待隊列大小大於等於閾值,線程數量小於MaximumPoolSize,建立新線程處理。
任務10
線程數量:3
等待隊列大小:7
等待隊列大小大於等於閾值,線程數量小於MaximumPoolSize,建立新線程處理。
任務11
線程數量:4
等待隊列大小:7
等待隊列大小大於等於閾值,線程數量小於MaximumPoolSize,建立新線程處理。
任務12
線程數量:5
等待隊列大小:7
等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。
任務13
線程數量:5
等待隊列大小:8
等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。
任務14
線程數量:5
等待隊列大小:9
等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。
任務15
線程數量:5
等待隊列大小:10
等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。
隊列已滿
任務16
線程數量:5
等待隊列大小:10
等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。
隊列已滿

再從新複習一遍要實現的功能:

  1. 線程池保持corePoolSize個線程確保有新任務到來時能夠當即獲得執行
  2. 當沒有空閒線程時,先把任務放到等待隊列中(由於開啓一個線程須要10s,因此若是在等待隊列比較小的時候,等待其餘任務完成比等待新線程建立更快)
  3. 當等待隊列的大小大於設定的閾值threshold時,說明堆積的任務已經太多了,這個時候開始建立非核心線程直到線程數量已經等於maximumPoolSize
  4. 當線程數量已經等於maximumPoolSize,再將新來的任務放回到任務隊列中等待(直到隊列滿後開始拒絕任務)
  5. 長時間空閒後退出非核心線程回收瀏覽器佔用的內存資源

能夠看出,線程池運行的邏輯和要實現的目標是相同的。

相關文章
相關標籤/搜索