ThreadPoolExecutor-線程池開發的使用

很久沒有寫過筆記了,最近作的一個項目涉及打線程池和隊列的開發,以爲在這個項目中學習到的仍是挺多的,對線程安全,併發的知識有加深認知;固然,如今用過的東西並非表明之後還能嫺熟的使用,作好筆記很是重要;緩存

1:必須明白爲何要使用線程池:(這點很重要)安全

  a:手上項目所需,由於項目主要的目的是實現多線程的數據推送;須要建立多線程的話,那就要處理好線程安全的問題;由於項目須要,還涉及到排隊下載的功能,因此就選擇了線程池來管理線程以及線程池裏面的任務隊列workQueue來實現項目所需的功能;服務器

  b:在實際使用中,服務器在建立和銷燬線程上花費的時間和消耗的系統資源都至關大,甚至可能要比在處理實際的用戶請求的時間和資源要多的多。除了建立和銷燬線程的開銷以外,活動的線程也須要消耗系統資源。若是在一個jvm裏建立太多的線程,可能會使系統因爲過分消耗內存或「切換過分」而致使系統資源不足。爲了防止資源不足,服務器應用程序須要採起一些辦法來限制任何給定時刻處理的請求數目,儘量減小建立和銷燬線程的次數,特別是一些資源耗費比較大的線程的建立和銷燬,儘可能利用已有對象來進行服務,這就是「池化資源」技術產生的緣由。 線程池主要用來解決線程生命週期開銷問題和資源不足問題(這段是摘自網絡)網絡

2:如何建立一個線程池:多線程

  

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

 

 

 

這裏只是建立線程池其中的一個構造函數;其實其餘的構造函數最終仍是調用的這個構造函數;併發

說明一下這些參數的做用:jvm

corePoolSize:核心池的大小,在建立了線程池後,線程池中的線程數爲0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;ide

maximumPoolSize:線程池最大線程數,它表示在線程池中最多能建立多少個線程;這個參數是跟後面的阻塞隊列聯繫緊密的;只有當阻塞隊列滿了,若是還有任務添加到線程池的話,會嘗試new 一個Thread的進行救急處理,立馬執行對應的runnable任務;若是繼續添加任務到線程池,且線程池中的線程數已經達到了maximumPoolSize,那麼線程就會就會執行reject操做(這裏後面會說起到)函數

keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止;默認狀況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime纔會起做用;即當線程池中的線程數大於corePoolSize時,若是一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。可是若是調用了allowCoreThreadTimeOut(boolean)方法並設置了參數爲true,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程池中的阻塞隊列大小爲0;(這部分經過查看ThreadPoolExecutor的源碼分析--getTask()部分);源碼分析

unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性(時間單位)

workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,通常來講,這裏的阻塞隊列有如下幾種選擇  

  ArrayBlockingQueue;

  LinkedBlockingQueue;

  SynchronousQueue;

  ArrayBlockingQueue和PriorityBlockingQueue使用較少,通常使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

threadFactory:線程工廠,主要用來建立線程:默認值 DefaultThreadFactory;

handler:表示當拒絕處理任務時的策略,就是上面說起的reject操做;有如下四種取值:

  ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。(默認handle)

  ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。

  ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)

  ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

3:對線程池的基本使用及其部分源碼的分析(注意:這裏的源碼分析是基於jdk1.6;)

a:線程池的狀態 

volatile int runState;
static final int RUNNING = 0; 運行狀態
static final int SHUTDOWN = 1; 關閉狀態;SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢
static final int STOP = 2;中止狀態;此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務
static final int TERMINATED = 3;終止狀態;當線程池處於SHUTDOWN或STOP狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED狀態

b:參數再次說明。這是摘自網絡的解釋,我以爲他比喻的很好,因此這裏直接就用它的解釋

  這裏要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。

  corePoolSize在不少地方被翻譯成核心池大小,其實個人理解這個就是線程池的大小。舉個簡單的例子:

  假若有一個工廠,工廠裏面有10個工人,每一個工人同時只能作一件任務。

  所以只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人作;

  當10個工人都有任務在作時,若是還來了任務,就把任務進行排隊等待;

  若是說新任務數目增加的速度遠遠大於工人作任務的速度,那麼此時工廠主管可能會想補救措施,好比從新招4個臨時工人進來;

  而後就將任務也分配給這4個臨時工人作;

  若是說着14個工人作任務的速度仍是不夠,此時工廠主管可能就要考慮再也不接收新的任務或者拋棄前面的一些任務了。

  當這14個工人當中有人空閒時,而新任務增加的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。

  這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量忽然過大時的一種補救措施。

  不過爲了方便理解,在本文後面仍是將corePoolSize翻譯成核心池大小。

  largestPoolSize只是一個用來起記錄做用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關係。

c:添加線程池任務的入口就是execute();

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();//任務爲空時拋出異常
    //若是線程池線程大小小於核心線程,就新建一個線程加入任務並啓動線程
    //若是線程池線程大小大於核心線且且添加任務到線程失敗,就把任務添加到阻塞隊列
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//新建線程並啓動
        if (runState == RUNNING && workQueue.offer(command)) {//添加任務到隊列
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);//添加到隊列失敗或已滿,作拒接任務處理策略
        }
        //若阻塞隊列失敗或已滿;這裏新建一個線程並啓動作應急處理(這裏就是用到了maximumPoolSize參數)
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // 若線程池的線程超過了maximumPoolSize;就作拒絕處理任務策略
    }
}

-->>繼續跟蹤代碼到addIfUnderCorePoolSize(Runnable firstTask):函數名稱就能夠看出來這個函數要執行的什麼;若是線程池的線程小於核心線程數corePoolSize就新建線程加入任務並啓動線程【在從此的開發中儘可能把須要作的功能在函數名體現出來】

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;//獲取當前線程池的鎖
        mainLock.lock();//加鎖
        try {
            /*
            這裏線程池線程大小還須要判斷一次;前面的判斷過程當中並無加鎖,所以可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完以後,在其餘線程中又向線程池提交了任務,就可能致使poolSize不小於corePoolSize了,因此須要在這個地方繼續判斷
            */
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);//新建線程
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();//若建立線程超過,就啓動線程池的線程 return true;
    }
    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);//worker:ThreadPoolExecutor的內部類;
        Thread t = threadFactory.newThread(w);//使用線程工廠建立一個線程
        if (t != null) {
            w.thread = t;
            workers.add(w);//保存線程池正在運行的線程
            int nt = ++poolSize;//線程池的線程數加1
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

-->>接下來定位worker類,看看線程池裏的線程是如何執行的

上面的addIfUnderCorePoolSize(..)已經把線程啓動了;如今就直接查看worker 的run()方法了

public void run() {
    try {
        Runnable task = firstTask;//該線程的第一個任務,執行完後就從阻塞隊列取任務執行
        firstTask = null;
        while (task != null || (task = getTask()) != null) {//getTask()從隊列去任務執行
            runTask(task);//線程執行任務
            task = null;
        }
    } finally {
        workerDone(this);//若任務所有執行完,就開始嘗試去中止線程池;這部分代碼就再也不追蹤下去,有興趣的讀者能夠本身打開源碼分析,沒必要懼怕,學習大神們的編碼方式,看源碼能讓你學習到不少
    }
}
 private void runTask(Runnable task) {
    final ReentrantLock runLock = this.runLock;
    runLock.lock();
    try {
         //屢次檢查線程池有沒有關閉
        if (runState < STOP &&
            Thread.interrupted() &&
            runState >= STOP)
            thread.interrupt();
            
        boolean ran = false;
        //這裏就能夠繼承ThreadPoolExecutor,並覆蓋beforeExecute(...)該方法,來作一些執行任務以前的統計工做或者用來保存正在執行的任務
        beforeExecute(thread, task);
        try {
            task.run();
            ran = true;
            //這裏就能夠繼承ThreadPoolExecutor,並覆蓋beforeExecute(...)該方法,來作一些執行任務完成以後的統計工做或者用來保存正在執行的任務
            afterExecute(task, null);
            ++completedTasks;//統計總共執行的任務數
        } catch (RuntimeException ex) {
            if (!ran)
                afterExecute(task, ex);
            throw ex;
        }
    } finally {
        runLock.unlock();
    }
}

至此線程池基本的流程完了;

再說說我在項目中的使用:

MyExtendThreadPoolExecutor 繼承了 ThreadPoolExecutor,並覆蓋了其中的一些方法
public class MyExtendThreadPoolExecutor extends ThreadPoolExecutor{
    public static Logger logger=LoggerFactory.getLogger(MyExtendThreadPoolExecutor.class);
    /**
     * 記錄運行中任務
     */
    private LinkedBlockingQueue<Runnable> workBlockingQueue=new  LinkedBlockingQueue<Runnable>();
    
    public MyExtendThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        workBlockingQueue.add((GtdataBreakpointResumeDownloadThread)r);//保存在運行的任務
        logger.info("Before the task execution");
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        workBlockingQueue.remove((GtdataBreakpointResumeDownloadThread)r);//移除關閉的任務
        logger.info("After the task execution");
    }
    /**
     * 
    * Description: 正在運行的任務
    * @return LinkedBlockingQueue<Runnable><br>
    * @author lishun 
     */
    public LinkedBlockingQueue<Runnable> getWorkBlockingQueue() {
        return workBlockingQueue;
    }
}

MyExtendThreadPoolExecutor pool = new MyExtendThreadPoolExecutor(3, 3,60L,TimeUnit.SECONDS,new LinkedBlockingQueue <Runnable>()); //建立線程池

public void addToThreadPool(DownloadRecord downloadRecord){
    BlockingQueue<Runnable> waitThreadQueue = pool.getQueue();//Returns the task queue 
    LinkedBlockingQueue<Runnable> workThreadQueue =pool.getWorkBlockingQueue();//Returns the running work
    GtdataBreakpointResumeDownloadThread downloadThread = 
            new GtdataBreakpointResumeDownloadThread(downloadRecord);//須要執行的任務線程
    
    if (!waitThreadQueue.contains(downloadThread)&&!workThreadQueue.contains(downloadThread)) {//判斷任務是否存在正在運行的線程或存在阻塞隊列,不存在的就加入線程池(這裏的比較要重寫equals())
        Timestamp recordtime = new Timestamp(System.currentTimeMillis());
        logger.info("a_workThread:recordId="+downloadRecord.getId()+",name="+downloadRecord.getName()+" add to  workThreadQueue");
        downloadThread.setName("th_"+downloadRecord.getName());
        pool.execute(downloadThread);//添加到線程池
    }else{
        logger.info("i_workThread:recordId="+downloadRecord.getId()+",name="+downloadRecord.getName()+" in  waitThreadQueue or workThreadQueue");
    }
}
相關文章
相關標籤/搜索