深刻淺出 Java Concurrency (35): 線程池 part 8 線程池的實現及原理 (3)[轉]

 

線程池任務執行結果html

這一節來探討下線程池中任務執行的結果以及如何阻塞線程、取消任務等等。java

1 package info.imxylz.study.concurrency.future;

3 public class SleepForResultDemo implements Runnable {

5     static boolean result = false;

7     static void sleepWhile(long ms) {
8         try {
9             Thread.sleep(ms);
10         } catch (Exception e) {}
11     }
12 
13     @Override
14     public void run() {
15         //do work
16         System.out.println("Hello, sleep a while.");
17         sleepWhile(2000L);
18         result = true;
19     }
20 
21     public static void main(String[] args) {
22         SleepForResultDemo demo = new SleepForResultDemo();
23         Thread t = new Thread(demo);
24         t.start();
25         sleepWhile(3000L);
26         System.out.println(result);
27     }
28 
29 }
30 

在沒有線程池的時代裏面,使用Thread.sleep(long)去獲取線程執行完畢的場景不少。顯然這種方式很笨拙,他須要你事先知道任務可能的執行時間,而且還會阻塞主線程,無論任務有沒有執行完畢。算法

1 package info.imxylz.study.concurrency.future;

3 public class SleepLoopForResultDemo implements Runnable {

5     boolean result = false;

7     volatile boolean finished = false;

9     static void sleepWhile(long ms) {
10         try {
11             Thread.sleep(ms);
12         } catch (Exception e) {}
13     }
14 
15     @Override
16     public void run() {
17         //do work
18         try {
19             System.out.println("Hello, sleep a while.");
20             sleepWhile(2000L);
21             result = true;
22         } finally {
23             finished = true;
24         }
25     }
26 
27     public static void main(String[] args) {
28         SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
29         Thread t = new Thread(demo);
30         t.start();
31         while (!demo.finished) {
32             sleepWhile(10L);
33         }
34         System.out.println(demo.result);
35     }
36 
37 }
38 

使用volatile與while死循環的好處就是等待的時間能夠稍微小一點,可是依然有CPU負載高而且阻塞主線程的問題。最簡單的下降CPU負載的方式就是使用Thread.join().數組

        SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
        Thread t = new Thread(demo);
        t.start();
        t.join();
        System.out.println(demo.result);

顯然這也是一種不錯的方式,另外還有本身寫鎖使用wait/notify的方式。其實join()從本質上講就是利用while和wait來實現的。安全

上面的方式中都存在一個問題,那就是會阻塞主線程而且任務不能被取消。爲了解決這個問題,線程池中提供了一個Future接口。網絡

ThreadPoolExecutor-Future

在Future接口中提供了5個方法。數據結構

  • V get() throws InterruptedException, ExecutionException: 等待計算完成,而後獲取其結果。
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException。最多等待爲使計算完成所給定的時間以後,獲取其結果(若是結果可用)。
  • boolean cancel(boolean mayInterruptIfRunning):試圖取消對此任務的執行。
  • boolean isCancelled():若是在任務正常完成前將其取消,則返回 true
  • boolean isDone():若是任務已完成,則返回 true。 可能因爲正常終止、異常或取消而完成,在全部這些狀況中,此方法都將返回 true

API看起來容易,來研究下異常吧。get()請求獲取一個結果會阻塞當前進程,而且可能拋出如下三種異常:併發

  • InterruptedException:執行任務的線程被中斷則會拋出此異常,此時不能知道任務是否執行完畢,所以其結果是無用的,必須處理此異常。
  • ExecutionException:任務執行過程當中(Runnable#run())方法可能拋出RuntimeException,若是提交的是一個java.util.concurrent.Callable<V>接口任務,那麼java.util.concurrent.Callable.call()方法有可能拋出任意異常。
  • CancellationException:實際上get()方法還可能拋出一個CancellationException的RuntimeException,也就是任務被取消了可是依然去獲取結果。

對於get(long timeout, TimeUnit unit)而言,除了get()方法的異常外,因爲有超時機制,所以還可能獲得一個TimeoutException。ide

boolean cancel(boolean mayInterruptIfRunning)方法比較複雜,各類狀況比較多:函數

  1. 若是任務已經執行完畢,那麼返回false。
  2. 若是任務已經取消,那麼返回false。
  3. 循環直到設置任務爲取消狀態,對於未啓動的任務將永遠再也不執行,對於正在運行的任務,將根據mayInterruptIfRunning是否中斷其運行,若是不中斷那麼任務將繼續運行直到結束。
  4. 此方法返回後任務要麼處於運行結束狀態,要麼處於取消狀態。isDone()將永遠返回true,若是cancel()方法返回true,isCancelled()始終返回true。

來看看Future接口的實現類java.util.concurrent.FutureTask<V>具體是如何操做的。

在FutureTask中使用了一個AQS數據結構來完成各類狀態以及加鎖、阻塞的實現。

在此AQS類java.util.concurrent.FutureTask.Sync中一個任務用4中狀態:

ThreadPoolExecutor-FutureTask-state

初始狀況下任務狀態state=0,任務執行(innerRun)後狀態變爲運行狀態RUNNING(state=1),執行完畢後變成運行結束狀態RAN(state=2)。任務在初始狀態或者執行狀態被取消後就變爲狀態CANCELLED(state=4)。AQS最擅長無鎖狀況下處理幾種簡單的狀態變動的。

        void innerRun() {
            if (!compareAndSetState(0, RUNNING))
                return;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING) // recheck after setting thread
                    innerSet(callable.call());
                else
                    releaseShared(0); // cancel
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }

執行一個任務有四步:設置運行狀態、設置當前線程(AQS須要)、執行任務(Runnable#run或者Callable#call)、設置執行結果。這裏也能夠看到,一個任務只能執行一次,由於執行完畢後它的狀態不在爲初始值0,要麼爲CANCELLED,要麼爲RAN。

取消一個任務(cancel)又是怎樣進行的呢?對比下前面取消任務的描述是否是很簡單,這裏無非利用AQS的狀態來改變任務的執行狀態,最終達到放棄未啓動或者正在執行的任務的目的。

boolean innerCancel(boolean mayInterruptIfRunning) {
    for (;;) {
        int s = getState();
        if (ranOrCancelled(s))
            return false;
        if (compareAndSetState(s, CANCELLED))
            break;
    }
    if (mayInterruptIfRunning) {
        Thread r = runner;
        if (r != null)
            r.interrupt();
    }
    releaseShared(0);
    done();
    return true;
}

到目前爲止咱們依然沒有說明究竟是如何阻塞獲取一個結果的。下面四段代碼描述了這個過程。

1     V innerGet() throws InterruptedException, ExecutionException {
2         acquireSharedInterruptibly(0);
3         if (getState() == CANCELLED)
4             throw new CancellationException();
5         if (exception != null)
6             throw new ExecutionException(exception);
7         return result;
8     }
9     //AQS#acquireSharedInterruptibly
10     public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
11         if (Thread.interrupted())
12             throw new InterruptedException();
13         if (tryAcquireShared(arg) < 0)
14             doAcquireSharedInterruptibly(arg); //park current Thread for result
15     }
16     protected int tryAcquireShared(int ignore) {
17         return innerIsDone()? 1 : -1;
18     }
19 
20     boolean innerIsDone() {
21         return ranOrCancelled(getState()) && runner == null;
22     }

當調用Future#get()的時候嘗試去獲取一個共享變量。這就涉及到AQS的使用方式了。這裏獲取一個共享變量的狀態是任務是否結束(innerIsDone()),也就是任務是否執行完畢或者被取消。若是不知足條件,那麼在AQS中就會doAcquireSharedInterruptibly(arg)掛起當前線程,直到知足條件。AQS前面講過,掛起線程使用的是LockSupport的park方式,所以性能消耗是很低的。

至於將Runnable接口轉換成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供了一個簡單實現。

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable  task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

延遲、週期性任務調度的實現

java.util.concurrent.ScheduledThreadPoolExecutor是默認的延遲、週期性任務調度的實現。

有了整個線程池的實現,再回頭來看延遲、週期性任務調度的實現應該就很簡單了,由於所謂的延遲、週期性任務調度,無非添加一系列有序的任務隊列,而後按照執行順序的前後來處理整個任務隊列。若是是週期性任務,那麼在執行完畢的時候加入下一個時間點的任務便可。

因而可知,ScheduledThreadPoolExecutor和ThreadPoolExecutor的惟一區別在於任務是有序(按照執行時間順序)的,而且須要到達時間點(臨界點)才能執行,並非任務隊列中有任務就須要執行的。也就是說惟一不一樣的就是任務隊列BlockingQueue<Runnable> workQueue不同。ScheduledThreadPoolExecutor的任務隊列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,它是基於java.util.concurrent.DelayQueue<RunnableScheduledFuture>隊列的實現。

DelayQueue是基於有序隊列PriorityQueue實現的。PriorityQueue 也叫優先級隊列,按照天然順序對元素進行排序,相似於TreeMap/Collections.sort同樣。

一樣是有序隊列,DelayQueue和PriorityQueue區別在什麼地方?

因爲DelayQueue在獲取元素時須要檢測元素是否「可用」,也就是任務是否達到「臨界點」(指定時間點),所以加入元素和移除元素會有一些額外的操做。

典型的,移除元素須要檢測元素是否達到「臨界點」,增長元素的時候若是有一個元素比「頭元素」更早達到臨界點,那麼就須要通知任務隊列。所以這須要一個條件變量final Condition available 。

移除元素(出隊列)的過程是這樣的:

  • 老是檢測隊列的頭元素(順序最小元素,也是最早達到臨界點的元素)
  • 檢測頭元素與當前時間的差,若是大於0,表示還未到底臨界點,所以等待響應時間(使用條件變量available)
  • 若是小於或者等於0,說明已經到底臨界點或者已通過了臨界點,那麼就移除頭元素,而且喚醒其它等待任務隊列的線程。
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        long tl = available.awaitNanos(delay);
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;

                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

一樣加入元素也會有相應的條件變量操做。當前僅當隊列爲空或者要加入的元素比隊列中的頭元素還小的時候才須要喚醒「等待線程」去檢測元素。由於頭元素都沒有喚醒那麼比頭元素更延遲的元素就更加不會喚醒。

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(e);
            if (first == null || e.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

有了任務隊列後再來看Future在ScheduledThreadPoolExecutor中是如何操做的。

java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask<V>是繼承java.util.concurrent.FutureTask<V>的,區別在於執行任務是不是週期性的。

        private void runPeriodic() {
            boolean ok = ScheduledFutureTask.super.runAndReset();
            boolean down = isShutdown();
            // Reschedule if not cancelled and not shutdown or policy allows
            if (ok && (!down ||
                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                        !isStopped()))) {
                long p = period;
                if (p > 0)
                    time += p;
                else
                    time = now() - p;
                ScheduledThreadPoolExecutor.super.getQueue().add(this);
            }
            // This might have been the final executed delayed
            // task.  Wake up threads to check.
            else if (down)
                interruptIdleWorkers();
        }

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            if (isPeriodic())
                runPeriodic();
            else
                ScheduledFutureTask.super.run();
        }
    }

若是不是週期性任務調度,那麼就和java.util.concurrent.FutureTask.Sync的調度方式是同樣的。若是是週期性任務(isPeriodic())那麼就稍微有所不一樣的。

ScheduledThreadPoolExecutor-ScheduledFutureTask

先從功能/結構上分析下。第一種狀況假設提交的任務每次執行花費10s,間隔(delay/period)爲20s,對於scheduleAtFixedRate而言,每次執行開始時間20s,對於scheduleWithFixedDelay來講每次執行開始時間30s。第二種狀況假設提交的任務每次執行時間花費20s,間隔(delay/period)爲10s,對於scheduleAtFixedRate而言,每次執行開始時間10s,對於scheduleWithFixedDelay來講每次執行開始時間30s。(具體分析能夠參考這裏

也就是說scheduleWithFixedDelay的執行開始時間爲(delay+cost),而對於scheduleAtFixedRate來講執行開始時間爲max(period,cost)。

回頭再來看上面源碼runPeriodic()就很容易了。但特別要提醒的,若是任務的任何一個執行遇到異常,則後續執行都會被取消,這從runPeriodic()就能看出。要強調的第二點就是同一個週期性任務不會被同時執行。就好比說盡管上面第二種狀況的scheduleAtFixedRate任務每隔10s執行到達一個時間點,可是因爲每次執行時間花費爲20s,所以每次執行間隔爲20s,只不過執行的任務次數會多一點。但從本質上講就是每隔20s執行一次,若是任務隊列不取消的話。

爲何不會同時執行?

這是由於ScheduledFutureTask執行的時候會將任務從隊列中移除來,執行完畢之後纔會添加下一個同序列的任務,所以任務隊列中其實最多隻有同序列的任務的一份副本,因此永遠不會同時執行(儘管要執行的時間在過去)。

 

ScheduledThreadPoolExecutor使用一個無界(容量無限,整數的最大值)的容器(DelayedWorkQueue隊列),根據ThreadPoolExecutor的原理,只要當容器滿的時候纔會啓動一個大於corePoolSize的線程數。所以實際上ScheduledThreadPoolExecutor是一個固定線程大小的線程池,固定大小爲corePoolSize,構造函數裏面的Integer.MAX_VALUE實際上是不生效的(儘管PriorityQueue使用數組實現有PriorityQueue大小限制,若是你的任務數超過了2147483647就會致使OutOfMemoryError,這個參考PriorityQueue的grow方法)。

 

再回頭看scheduleAtFixedRate等方法就容易多了。無非就是往任務隊列中添加一個將來某一時刻的ScheduledFutureTask任務,若是是scheduleAtFixedRate那麼period/delay就是正數,若是是scheduleWithFixedDelay那麼period/delay就是一個負數,若是是0那麼就是一次性任務。直接調用父類ThreadPoolExecutor的execute/submit等方法就至關於period/delay是0,而且initialDelay也是0。

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        if (initialDelay < 0) initialDelay = 0;
        long triggerTime = now() + unit.toNanos(initialDelay);
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Object>(command,
                                            null,
                                            triggerTime,
                                            unit.toNanos(period)));
        delayedExecute(t);
        return t;
    }

另外須要補充說明的一點,前面說過java.util.concurrent.FutureTask.Sync任務只能執行一次,那麼在runPeriodic()裏面怎麼又將執行過的任務加入隊列中呢?這是由於java.util.concurrent.FutureTask.Sync提供了一個innerRunAndReset()方法,此方法不只執行任務還將任務的狀態還原成0(初始狀態)了,因此此任務就能夠重複執行。這就是爲何runPeriodic()裏面調用runAndRest()的緣故。

        boolean innerRunAndReset() {
            if (!compareAndSetState(0, RUNNING))
                return false;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING)
                    callable.call(); // don't set result
                runner = null;
                return compareAndSetState(RUNNING, 0);
            } catch (Throwable ex) {
                innerSetException(ex);
                return false;
            }
        }

 

後話

整個併發實踐原理和實現(源碼)上的東西都講完了,後面幾個小節是一些總結和掃尾的工做,包括超時機制、異常處理等一些細節問題。也就是說大部分只須要搬出一些理論和最佳實踐知識出來就行了,不會有大量費腦筋的算法分析和原理、思想探討之類的。後面的章節也會加快一些進度。

老實說從剛開始的好奇到中間的興奮,再到如今的徹悟,收穫仍是不少,我的以爲這是最認真、最努力也是自我最滿意的一次技術研究和探討,同時在這個過程當中將不少技術細節都串聯起來了,慢慢就有了那種技術相通的感受。原來有了理論之後再去實踐、再去分析問題、解決問題和那種純解決問題獲得的經驗徹底不同。整個專輯下來不只僅是併發包這一點點知識,設計到硬件、軟件、操做系統、網絡、安全、性能、算法、理論等等,總的來講這也算是一次比較成功的研究切入點,這比Guice那次探討要深刻和持久的多。

--

相關文章
相關標籤/搜索