J.U.C FutureTask之源碼解析

      經過直接繼承Thread, 實現Runnable接口來建立線程。但這兩種方式都有一種缺陷:在執行完任務以後沒法得到執行結果。html

      若是須要得到執行結果,就必須經過共享變量或者使用線程通訊的方式來達到效果,這樣使用起來比較麻煩,而jdk中Callable和Future,經過他們能夠在任務執行完畢以後獲得任務執行結果。先看看他們之間的組織關係:java

     Callable:node

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

       源碼可知,它也是個一個接口,在他裏面也只是申明一個方法,只不過這個方法爲call(),call方法返回的就是該泛型傳遞進來的V類型,他怎麼使用呢?就是結合以前的ExecuteService:數據結構

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

      第一個submit方法裏面的參數類型就是Callable。多線程

      

     Future:併發

      Future就是對於具體的Runnable或者Callable任務的執行進度的查看,取消,查詢是否完成,獲取結果(正確完成時的結果,或異常)。必要時能夠經過get方法獲取執行的結果,該方法會阻塞直到任務返回結果,或經過指定阻塞時間的版本。dom

public interface Future<V> {

 
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

    其中cancel()方法用來取消任務,若是取消任務成功則返回true, 若是取消任務失敗則返回false。 參數mayInterruptIfRunning表示是否容許取消真在執行去沒有執行完畢的任務,若是設置true, 則表示能夠取消正在執行過程的任務。 當任務已經完成,或者已經被取消過了,或者由於別的緣由不能取消, 則返回false。 當取消時,該任務尚未開始執行,則該任務不會執行,而且老是返回true。ide

    

    FutureTask:this

 

public class FutureTask<V> implements RunnableFuture<V>

     FutureTask類實現了RunnableFuture接口,看一下RunnableFuture接口的定義:spa

public interface RunnableFuture<V> extends Runnable, Future<V>

     RunnableFuture接口接觸了Runnable接口和Future接口, 而FutureTask實現了RunnableFuture接口,因此它既可做爲Runnable被線程執行,也能夠做爲Future獲得Callable的返回值。

     構造器定義:

 public FutureTask(Callable<V> callable) 
 public FutureTask(Runnable runnable, V result) {

     再來看看第二個構造器中的參數怎麼變身Callable的:

this.callable = Executors.callable(runnable, result);

     調用Executors.callable方法:

 public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

    簡單實現Callable:

static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

 

流程:

     下面結合完整具體流程走一下FutureTask過程,並解析源碼,草圖以下:

        實例代碼以下:

 1 public class Test {
 2     public static void main(String[] args) {
 3         //第一種方式
 4         ExecutorService executor = Executors.newCachedThreadPool();
 5         Task task = new Task();
 6         FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
 7         executor.submit(futureTask);
 8         executor.shutdown();
15          
16         try {
17             Thread.sleep(1000);
18         } catch (InterruptedException e1) {
19             e1.printStackTrace();
20         }
21          
22         System.out.println("主線程在執行任務");
23          
24         try {
25             System.out.println("task運行結果"+futureTask.get());
26         } catch (InterruptedException e) {
27             e.printStackTrace();
28         } catch (ExecutionException e) {
29             e.printStackTrace();
30         }
31          
32         System.out.println("全部任務執行完畢");
33     }
34 }
35 class Task implements Callable<Integer>{
36     @Override
37     public Integer call() throws Exception {
38         System.out.println("子線程在進行計算");
39         Thread.sleep(3000);
40         int sum = 0;
41         for(int i=0;i<100;i++)
42             sum += i;
43         return sum;
44     }
45 }

     分析過程以前,先準備前準備知識,首先看一下FutureTask內部狀態,以及之間的轉變:

    private volatile int state; // volatile 內存可見性
    private static final int NEW          = 0; //該狀態爲new FutureTask()時設定,同時也表示內部成員callable已經成功賦值,一直到worker thread完成FutureTask中run().
    private static final int COMPLETING   = 1; //該狀態位worker thread完成task時設定的中間狀態,處於該狀態下,說明worker thread 真正準備設置result.
    private static final int NORMAL       = 2;  //當設置result結果完成後,FutureTask處於該狀態,表明過程結果,該狀態爲最終狀態final state,(正確完成的最終狀態)
    private static final int EXCEPTIONAL  = 3;  // 同上,只不過task執行過程出現異常,此時結果設值爲exception,也是final state
    private static final int CANCELLED    = 4;  //final state, 代表task被cancel(task尚未執行就被cancel的狀態).
    private static final int INTERRUPTING = 5;  // 中間狀態,task運行過程當中被interrupt時,設置的中間狀態;
    private static final int INTERRUPTED  = 6;   // final state, 中斷完畢的最終狀態,幾種狀況,下面具體分析。

     下面是狀態之間的轉變,貫穿主線:

   * Possible state transitions:
     1* NEW -> COMPLETING -> NORMAL   
     2* NEW -> COMPLETING -> EXCEPTIONAL
     3* NEW -> CANCELLED
     4* NEW -> INTERRUPTING -> INTERRUPTED
     */

      其餘重要的變量:

 /** The underlying callable; nulled out after running */
    private Callable<V> callable;   // 具體run運行時會調用其方法call(),並得到結果,結果時置爲null.
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes   不必爲votaile,由於其是伴隨state 進行讀寫,而state是FutureTask的主導因素。
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;   //具體的worker thread.
    /** Treiber stack of waiting threads */  
    private volatile WaitNode waiters;     //Treiber stack 併發stack數據結構,用於存放阻塞在該futuretask#get方法的線程。

     OK,構造new FutureTask開始:

  public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable; //底層callable賦值
        this.state = NEW;       // 初始狀態NEW,同時也標誌了callable的賦值,可見性
    }

       ThreadPoolExecutor.submit(Runnable),ThreadPoolExecutor裏面具體細節請見這裏,這裏就假設它直接new a thread來處理該任務了,由於FutureTask爲Runnable的子類,因此worker thread調用該類的run()方法:

        public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) //狀態檢測,和當前worker Thread的cas原子賦值,有一個不成立,就直接返回。什麼狀況下還沒run()呢?就不是NEW狀態了呢? return; //caller調用cancel了,此時狀態爲Interrupting,也說明了上面的cancel方法說明,task沒運行時,就interrupt,task得不到運行。老是返回 try { //true。
//再來看看這裏worker thread賦值爲何要用cas操做,有競爭racing? 競爭哪裏來?難道threadPoolExecutor線程池多個線程可能搶同一個 Callable
<V> c = callable; //任務?不可能 1:線程數 < coreThreadPool 時, 直接new thread, 2 : 大於 coreThreadpool時,放在blockingqueue裏,取的話只能一 if (c != null && state == NEW) { //線程。能想到就是caller那邊了,即多callers(多線程)提交同一FutureTask. V result; //多線程同時提交同一FutureTask,確保該FutureTask的run()只被調用一次, boolean ran; try { result = c.call(); //此處的if,1:當state == NEW(task沒完成,中斷) 而且 worker Thread爲null時,纔會獲得運行 ran = true; // 2: task已經完成了 或者 該任務已經有worker thread來執行時,直接返回不會運行。 } catch (Throwable ex) { //調用callable的call方法 result = null; //執行task時有異常 ran = false; //附異常 setException(ex); } if (ran) //正常完成,則賦值 set(result); } } finally { //注意!!什麼這裏吧runner置爲null,此時run()方法還沒運行完呢啊!如今置爲null,不怕併發調用run()嗎?注意此時state已經變化了(Comple runner = null; //teing或者interrupting了,run()一開始state != NEW 直接return,不會運行。能夠說經過state和 worker thread來一塊兒控制併發調用run int s = state; //必須再讀一次,防止worker thread == null後,遺漏的interrup信號,底下具體分析中斷的狀況。 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); //若是caller中斷信號有的話,則處理該interrupt. } //另外該任務是一致性任務,即state只要不爲NEW,該任務就不會在運行,運行結束或cancel後,就不能在運行了,由於state狀態在那不變哦! }

      請看下例子,三個提交線程(提交同一個FutureTask):

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        CountDownLatch latch = new CountDownLatch(1);
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        for (int i = 0 ; i < 3; i++) {
            new Thread(new Submit(executor,  futureTask, latch)).start();
        }
        try {
            Thread.sleep(3000);
            latch.countDown();
            Thread.sleep(20000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        } 
        System.out.println("全部任務執行完畢");
        executor.shutdown();
    }
    
}

class Submit implements Runnable {
    private CountDownLatch latch ;
    private ExecutorService es ;
    private FutureTask<Integer> task; 
    public Submit(ExecutorService es, FutureTask<Integer> task, CountDownLatch latch) {
         this.latch = latch;
         this.es = es;    
         this.task = task;
    }
    public void run() {
        
        
        try {
            latch.await();
            Future<?> future = (Future<?>) es.submit(task);
            System.out.println("Thread name : " + Thread.currentThread().getName() + "go!");
            future.get(3000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e1) {
            e1.printStackTrace();
        } catch (TimeoutException e2) {
            System.err.println("Thread name : " + Thread.currentThread().getName()  + " " + e2);
        }
    }    
}

class Task implements Callable<Integer>{
   
    public Integer call() throws Exception {
        System.out.println("thread name : " + Thread.currentThread().getName() + "do the work!");
        Thread.sleep(6000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

    顯示以下:

Thread name : Thread-1go!
Thread name : Thread-0go!
Thread name : Thread-2go!
thread name : pool-1-thread-1do the work!
Thread name : Thread-1 java.util.concurrent.TimeoutException
全部任務執行完畢

      結果很顯然,同一個任務屢次提交(併發提交),FutureTask保證只是啓一個線程來運行。

 

      想運行屢次(只要不cancel,和throw exception,由於他set(result),正常運行結束,state仍是new),用這個:

protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            
            runner = null;
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;   
    }

  再來看看setException()和set(result):

 protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // cas原子操做,失敗直接返回,成功的前提以前的狀態必須爲NEW.
            outcome = v;                                                    //可能和什麼衝突呢? caller已經cancel該task,狀態位Interrupting或者Interrpted(此次Interrupted表明interrupt完成,這set()
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state  // 不是在worker thread中調用的嘛,怎麼intterupt都完成了,怎麼worker thread還在運行呢?worker thread運行的代碼中沒有響
            finishCompletion();                                              //應interrupt的代碼。因此客戶端cancel操做,對運行中的worker thread,並不必定讓它停下來,不過此時即便運行完畢,也不能賦值。
        }
    }                                                                        //new -> Completing-> NORMAL 或者NEW ->Interrupting->Intterrpted.
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;    //同上,不過附異常。
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }                                                                   //new ->completing ->exception 或者 同上
    }

     finishCompletion()等會細聊,主要是沒說到get()阻塞呢!看看caller端線程調用cancel()和workerThread的handlePossibleCancellationInterrupt(int s)協調:

   public boolean cancel(boolean mayInterruptIfRunning) {
        if (state != NEW) 
            return false; //1:已經cancel(cancelled,Interruping, Interrupted)過了 2:正常完成 Completing(Completed) 3:異常完成completing(exception) 直接返回false; 
        if (mayInterruptIfRunning) { // flag : worker thread 已經啓動運行了,是否能夠中斷
            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) //再次檢查state狀態,完成的話(上面的三種),直接返回false;
                return false;
            Thread t = runner; 
            if (t != null)       // t == null對應Future task還沒啓動, 跳過thread.interrupt(),直接由interrpting -> interrupted,成功的話
                t.interrupt();   //調用worker thread的 interrupt() //mayInterrptIfRunning 爲true ,interrupt 狀態轉變 new -> interrupting -> interrupted.
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
        }
        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) //mayInterruptIfRunning 爲false,interrupt成功的 狀態轉變 new -> Cancelled
            return false;
        finishCompletion();
        return true;
    }

       由上面可知,客戶端cancel()中很多cas操做,主要來自兩方面的racing, 1:線程池worker Thread的完成(異常,正常)狀態設置; 2:同一futuretask,不一樣客戶端線程callers的cancel操做。

 private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

      當state處於Interrupting, 即caller即將調用worker thread.interrupt(), 因此worker thread自旋會,等會interrupt方法的調用,保留interrupt標誌。

      再來看看get()和帶參數的get(timeout):

 public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)  //結果未設定的狀況下
            s = awaitDone(false, 0L); //無條件等待
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //等到timeout時間內,沒完成,throws TimeoutException
            throw new TimeoutException();
        return report(s);
    }

        awaitDone():

   private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L; //計算出等待時間。
        WaitNode q = null;
        boolean queued = false;  //是否加入阻塞隊列的標誌
        for (;;) {
            if (Thread.interrupted()) { //阻塞該caller線程以前,caller線程被中斷,直接throw 異常
                removeWaiter(q);    //在阻塞隊列中移除該線程的封裝node.此處無心義
                throw new InterruptedException();
            }

            int s = state; //讀取state,阻塞前 recheck一下 是否完成?
            if (s > COMPLETING) { //完成了,直接返回,不要阻塞了
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield(); //等會,快完成了。
            else if (q == null)
                q = new WaitNode(); //當前阻塞線程鏈表的簡單封裝
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q); //設爲當前FutureTask阻塞鏈表(stack結構)的棧頂。
            else if (timed) {
                nanos = deadline - System.nanoTime(); //計算當前要阻塞的等待時間
                if (nanos <= 0L) {
                    removeWaiter(q); //小於0 直接返回,當前REMOVEWaiter無心義,並無加入stack中。
                    return state;
                }
                LockSupport.parkNanos(this, nanos);本地native方法,阻塞當前線程。
            }
            else
                LockSupport.park(this); //無時間條件阻塞
        }
    }

      無時間限制阻塞,有時間阻塞(阻塞時間大於task完成時間)會等到任務完成而給通知,喚醒該線程,即finishCompletion();而有時間阻塞(阻塞時間在task完成之間就已經結束的)會經過for()退出(退出前,刪除等待隊列中的節點)。 

      WaiterNode定義:

  static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); } //當前阻塞線程的引用
    }

       結合awaitDone()中的新阻塞節點加入順序,其定位stack結構(Treiber stack);

       removeWaiter():

 private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // 檢測競爭
                            continue retry; //發生重試
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

 

      finishCompletion():

 private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) { 獲得棧頂阻塞節點的引用
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread; //取得阻塞的線程引用,
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//解阻塞
                    }
                    WaitNode next = q.next; //遍歷解阻塞線程
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

      其實,前面的分析可知,多個caller線程併發提交同一個FutureTask, 而且所謂調用get()阻塞的話(阻塞在該FutureTask上),實際上也就一個caller線程阻塞,其餘線程在調用該FutureTask的run()開始條件檢查時,就直接return了,實際狀況:三個併發線程提交同一個future task,對應生成三份FutureTask(不一樣於以前),三份FutureTask中對應三分Callable,而這三份Callable含有相同的FutureTask(所謂的相同任務) ,向ThreadPoolExecutor.submit(Runnable)實際上提交了三份Runnable(即生成的三分FutureTask), FutureTask實現了Runnable接口, 而後ThreadPoolExecutor生成三個線程來執行這所謂的三個任務,這三個任務run()中都是調用對應內部的callable的call(), 而callable的call方法調用的是他們共同引用的FutureTask(同一個對像)的run()方法,而run方法, 咱們上面解析過了,經過cas和狀態檢測,只運行一個worker thread 調用run()(見上),另外兩個線程直接從共同底層FutureTask的run方法開始直接返回。

      暈了?從頭再來看看提交的過程:

     1:submit(FutureTask(Runnable)):AbstractExecutorService

  public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

      2:生成三個FutureTask(其中runnable就是同一個底層FutureTask任務):

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

      3:調用Executors.callable():

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
 static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() { //直接調用底層同一個FutureTask的run();
            task.run();
            return result;
        }
    }

 

      即三次提交,生成三份FutureTask,每份FutureTask調用Executors.callable()爲本身底層的callable賦值,而Executors.callable方法生成簡單的Callable實現,其中call(),調用底層共同FutureTask的run(), 也就受共同futureTask內部狀態(state, runThread)限制。因此,阻塞在底層共同FutureTask阻塞隊列中的只有一個線程,看下例:

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        int waitTime = 4000;
        CountDownLatch latch = new CountDownLatch(1);
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        for (int i = 0 ; i < 3; i++) {
            new Thread(new Submit(executor,  futureTask, latch, waitTime)).start();
        }
        try {
            Thread.sleep(3000);
            latch.countDown();
            Thread.sleep(8000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        } 
        System.out.println("全部任務執行完畢");
        executor.shutdown();
    }
    
}

class Submit implements Runnable {
    private CountDownLatch latch ;
    private ExecutorService es ;
    private FutureTask<Integer> task; 
    private int waitTime ;
    public Submit(ExecutorService es, FutureTask<Integer> task, CountDownLatch latch, int waitTime) {
         this.latch = latch;
         this.es = es;    
         this.task = task;
         this.waitTime = waitTime;
    }
    public void run() {
        try {
            latch.await();
            Future<?> future =  es.submit(task);
            System.out.println("Thread name : " + Thread.currentThread().getName() + " go!");
            waitTime = new Random().nextInt(waitTime);
            System.out.println("Thread name : " + Thread.currentThread().getName() + " , The wait time : =  " + waitTime );
            future.get(waitTime, TimeUnit.MILLISECONDS);
            System.out.println("Thread name : " + Thread.currentThread().getName() + " run over!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e1) {
            e1.printStackTrace();
        } catch (TimeoutException e2) {
            System.err.println("Thread name : " + Thread.currentThread().getName()  + " " + e2);
        }
    }    
}

class Task implements Callable<Integer>{
   
    public Integer call() throws Exception {
        System.out.println("thread name : " + Thread.currentThread().getName() + " do the work!");
        Thread.sleep(4000);
        int sum = 0;
        for(int i=0;i<20;i++)
            sum += i;
        return sum;
    }
}

class Task1 implements Runnable{
    int sum = 0;
    @Override
    public void run() {
        System.out.println("Thread Name : " + Thread.currentThread().getName() + "do the work!");
        try {
            Thread.sleep(6000);
            
            for(int i=0;i<100;i++)
                sum += i;
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}

   顯示結果:

Thread name : Thread-2 go!
Thread name : Thread-0 go!
Thread name : Thread-0 , The wait time : =  2738
thread name : pool-1-thread-1 do the work!
Thread name : Thread-1 go!
Thread name : Thread-2 , The wait time : =  284
Thread name : Thread-1 , The wait time : =  678
Thread name : Thread-2 run over!
Thread name : Thread-0 run over!
Thread name : Thread-1 java.util.concurrent.TimeoutException
全部任務執行完畢

       三個線程都是阻塞一段時間,可是隻有一個超時,另外兩個運行完畢,(他兩實際工做那部分沒運行,處理各自FutureTask那部分代碼,因此只能看到線程池只有一個線程處理底層FutureTask);

但,若是直接併發提交Callable,或者Runnable,線程池會啓動三個線程來分別處理這三個不一樣任務,朋友能夠自行試驗demo下。而FutureTask是本身的自身的限制。

      後話,通常調用ThreadPoolExecutor.submit()提交的是Callable<T>和Runnable, 返回的Future<T>, Future<?>(返回Null,或者不要求返回值),提交FutureTask用不着,因此實際中不會碰見這種狀況。

      另外,本文源碼基於jdk1.7,與網上1.7以前源碼不一樣(1.6經過AQS實現)。

相關文章
相關標籤/搜索