淺談Java併發編程系列(六) —— 線程池的使用

線程池的做用

  1. 下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的資源浪費。java

  2. 提升響應速度。當任務到達時,不須要等到線程建立就能當即執行。數據庫

  3. 方便管理線程。線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠對線程進行統一的分配,優化及監控。安全

設置線程池大小

設置線程池的大小能夠從如下幾個方面分析入手:多線程

  • 系統中有多少個cpu?less

  • 多大的內存?ide

  • 任務是計算密集型、I/O密集集仍是兩者皆可?oop

  • 是否須要像JDBC鏈接這樣的稀缺資源?優化

對於計算密集型的任務,在擁有N個cpu的機器上,一般將線程池大小設置爲N+1時,可以實現最優的利用率。 對於包含I/O操做或者其餘阻塞操做的任務,因爲線程並不會一直執行,所以線程池的規模應該更大。
可經過以下公式進行估計:
$$N_{threads} = N_{cpu}*U_{cpu}*(1+\frac{W}{C})$$
其中:
$$ U_{cpu} = target\ CPU\ utilization, 0 \le U_{cpu} \le 1$$
$$ \frac{W}{C} = ration\ of\ wait\ time\ to\ compute\ time$$
能夠經過Rumtime來得到CUP的數目:ui

int N_CPUS = Runtime.getRuntime().availableProcessor();

固然,CPU週期並非惟一影響線程池大小的資源,還包括內存、文件句柄、套接字句柄和數據庫鏈接等。計算方法:計算每一個任務對該資源的需求量,而後用該資源的可用總量除以每一個任務的須要量,所得結果就是線程池的大小上限。this

線程池的實現原理

ThreadPoolExecutor

Java的線程池針對不一樣應用的場景,主要有固定長度類型、可變長度類型以及定時執行等幾種。針對這幾種類型的建立,java中有一個專門的Executors類提供了一系列的方法封裝了具體的實現。這些功能和用途不同的線程池主要依賴ThreadPoolExecutor, ScheduledThreadPoolExecutor等幾個類。如前面文章討論所說,這些類和相關類的主要結構以下:

Java線程池相關類

ThreadPoolExecutor是實現線程池最核心的類之一。在分析ThreadPoolExecutor的實現原理以前,讓來看看實現線程池須要考慮的點:
從線程池自己的定義來看,它是將一組事先建立好的線程放在一個資源池裏,當須要的時候就將該線程分配給具體的任務來執行。那麼,這個池子的大小如何肯定?線程池確定要面臨多個線程資源訪問的狀況,是否是自己的結構要保證線程安全呢?若是線程池建立好以後後續有若干任務使用了線程資源,當池裏面的資源使用完以後要如何安排?是給線程擴容,建立更多的線程資源,仍是增長一個隊列,讓一些任務先在裏面排隊呢?在一些極端的狀況下,好比任務數量實在太多線程池處理不過來,對於這些任務怎麼處理呢?線程執行的時候會碰到異常或都錯誤的狀況,這些異常要如何處理?如何保證這些異常的處理不會致使線程池其餘任務的正常運行不出錯呢?

總結一下,這些問題能夠概括爲以下幾點:

  1. 線程池的結構;

  2. 線程池的任務分配策略;

  3. 線程池的異常和錯誤處理機制;

下面結合ThreadPoolExecutor的實現源碼來詳細分析一下。

線程數量和線程池狀態

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl是線程池的主要控制狀態,它是一個AtomicInteger的數值,它表示了兩部份內容:

  1. workerCount, 表示有效線程數;

  2. runState, 表示線程池狀態,是否運行,中止等。

ctl是用一個integer(32)來包含線程池狀態和數量的表示,高三位爲線程池的狀態,後(2^29)-1爲線程數限制。 這就是爲何前面用一個Integer.SIZE-3來做爲位數。這樣這個整數的0-28位表示的就是線程的數目。而高位的部分,29-31位表示線程池的狀態。這裏定義的主要有5種狀態,分別對應值是從-1到3. 他們對應着線程的running, shutdown, stop, tidying, terminated這幾個狀態。

線程池的結構

除了以上部分外,線程池裏還有以下成員:

private final BlockingQueue<Runnable> workQueue;
    
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

workerQueue: 一個BlockingQueue<Runnable>隊列,自己的結構能夠保證訪問的線程安全。至關於一個排隊等待隊列。當線程池的線程數達到corePoolSize的時候,一些須要等待執行的線程就放到這個隊列裏等待。
worker: 一個HashSet<Worker> 集合。線程池裏全部能夠當即執行的線程都放在這個集合裏。
mainLock: 一個訪問workers所須要使用的鎖。從前面的workQueue,workers這兩個結構能夠看出,若是要往線程池裏面增長執行任務或者執行完畢一個任務,都要訪問這兩個結構。因此大多數狀況下爲了保證線程安全,就須要使用mainLock這個鎖。
corePoolSize:處於活躍狀態的最少worker數目。每一個worker會建立一個新的線程去執行任務。在建立了線程池後,默認狀況下,線程池中並無任何線程,而是等待有任務來才建立線程去執行,除非調用了prestartAllCoreThreads()或prestartCoreThread()方法。當線程池中的線程數達到corePoolSize後,就會把到達的任務放到workerQueue中去;
maximumPoolSize: 線程池的最大長度。當線程池裏面的線程數達到這個數字時就不能再往裏面加了,此時會根據設置的handler參數,即拒絕處理任務策略來處理新到來的任務。
keepAliveTime: 表示線程沒有任務執行時最多保持多久時間會終止。默認狀況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才起做用。當線程池中的線程數大於corePoolSize時,若是一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。可是若是調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程數爲0。
threadFactory: 線程工廠,主要用來建立線程;
largestPoolSize: 用來記錄線程池中曾經有過的最大線程數,跟線程池的容易沒有任何關係。
handler: 表示當拒絕處理任務時的策略,有以四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

任務的執行者——Worker

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
相關文章
相關標籤/搜索