java 線程池

參考:《實戰java高併發程序設計》 3.2.3 刨根問底:核心線程池的內部是實現 java

java api 提供了幾種工廠方法建立線程池 算法

public class ExecutorServiceTest {
    public static void main(String[] args) {
        // 可根據實際狀況調整線程數量的線程池
        ExecutorService executorService1 = Executors.newCachedThreadPool();
        //固定大小的線程池
        ExecutorService executorService2 = Executors.newFixedThreadPool(1);
        // 線程池大小爲1 
        ExecutorService executorService3 = Executors.newSingleThreadExecutor();
    }
}

查看底層代碼的話,發現其均使用了ThreadPoolExecutor實現。api

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }


    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }


 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

  

因此好好來看一下ThreadPoolExecutor的代碼吧,下面是ThreadPoolExecutor的一個構造函數。併發

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {}

corePoolSize  :指定了線程中的核心線程數量函數

maximumPoolSize:指定了線程池中的最大線程數量高併發

keepAliveTime:當線程池中線程數量超過corePoolSize時,多餘的空閒線程的存活時間。即超過corePoolSize的空閒線程,在多長時間,會被銷燬。性能

unit: keepAliveTime 的單位。ui

workQueue:任務隊列,被提交但還沒有被執行的任務。this

threadFactory:線程工廠,用於建立線程,通常用默認的便可。atom

handler:拒絕策略,當任務太多來不及處理的時候,如何拒絕任務。

參數詳解

workQueue 指被提交但未執行的任務隊列,它是一個BlockingQueue接口的對象,僅用於存放Runnable對象,如下介紹幾種BlockingQueue。

直接提交的隊列:SynchronousQueue 對象提供。SynchronousQueue是一個特殊的BlockingQueue。SynchronousQueue沒有容量,每個插入操做都要等待一個相應的刪除操做,反之,每個刪除操做都要等待一個插入操做。若是使用SynchronousQueue,提交的任務不會被真實的保存,而老是將新任務提交給線程執行,若是沒有空閒的線程,則嘗試建立新的線程,若是線程數量已經達到最大值,則執行拒絕策略。所以,若是使用SynchronousQueue,一般要設置很大的maximumPoolSize值,不然很容易執行拒絕策略。

有界的隊列:有界的隊列可使用ArrayBlockingQueue實現。ArrayBlockingQueue的構造函數必須帶一個容量參數,表示該隊列的最大容量。當使用有界隊列時,如有新的任務執行,若是線程數量小於corePoolSize ,則優先建立新的線程,若大於corePoolSize,則會將任務加入隊列中。若隊列已滿,沒法加入,則在總線程數不大於maximumPoolSize的前提下建立新的線程執行任務,若大於maximumPoolSize時,則執行拒絕策略。可見,有界隊列僅當在任務裝滿時,纔可能將線程數量提高到corePoolSize以上。

無界的隊列:無界的隊列能夠經過LinkedBlockingQueue 實現,與有界隊列相比,除非系統資源耗盡,不然無界隊列不存在任務入隊失敗的狀況,當有新的任務到來,系統的線程數小於corePoolSize時,線程池會生成新的線程執行任務,但當系統的線程數達到corePoolSize後,就不會繼續增長。若後續仍有新的任務加入,而又有空閒的線程資源,則任務直接進入隊列等待。若任務建立和處理的速度差別很大,誤解隊列會保持快速增加,直到耗盡系統內存。

優先任務隊列:優先隊列是帶有執行優先隊列級的隊列,它經過PriorityBlockingQueue實現,能夠控制任務執行的前後順序。它是一個無界隊列。不管是有界隊列ArrayBlockingQueue,仍是未指定大小的LinkedBlockingQueue都是按照先進先出算法處理任務的。而PriorityBlockQueue則能夠根據任務自身的優先級順序執行,在確保系統性能的同時,也能有很好的質量保證。

 

拒絕策略:

AbortPolicy:該策略會拋出異常,阻止系統正常工做。

CallerRunsPolicy: 只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。顯然這樣作不會真的丟棄任務,可是,任務提交線程的性能極有可能會急劇降低。

DiscardOledestPolicy:該策略將丟棄一個最老的請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務,

DiscardPolicy:該策略默默丟棄沒法處理的任務,不予任何處理。若是容許任務丟失,這多是最好的一種處理方案了。

 

ThreadPoolExecutor 線程池的核心調度代碼

  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

 

是否小於corePoolSize,若是小於,分配線程執行。若是大於corePoolSize,則提交到隊列中,若是提交到隊列失敗,則將任務直接提交給線程池。若是當前線程數已經達到maximumPoolSize,則提交失敗,直接拒絕策略。

在這裏有一個疑問,當隊列已滿時,提交的任務直接提交給線程池執行,那麼這樣的話是不是 後面提交的任務比先提交的任務先執行了嗎?

 

阿里巴巴java開發手冊強制不容許使用Executors去建立線程池,就是因爲Executors建立的線程中對 線程的最大數量或者工做隊列的長度設置爲Integer.MAX_VALUE,會致使OOM。

相關文章
相關標籤/搜索