Java併發編程筆記之FutureTask源碼分析

FutureTask可用於異步獲取執行結果或取消執行任務的場景。經過傳入Runnable或者Callable的任務給FutureTask,直接調用其run方法或者放入線程池執行,以後能夠在外部經過FutureTask的get方法異步獲取執行結果,所以,FutureTask很是適合用於耗時的計算,主線程能夠在完成本身的任務後,再去獲取結果。另外,FutureTask還能夠確保即便調用了屢次run方法,它都只會執行一次Runnable或者Callable任務,或者經過cancel取消FutureTask的執行等。java

類圖結構以下所示:異步

線程池使用 FutureTask 時候須要注意的一點事,FutureTask 使用不當可能會形成調用線程一直阻塞,如何避免?ide

線程池使用 FutureTask 的時候若是拒絕策略設置爲了 DiscardPolicyDiscardOldestPolicy而且在被拒絕的任務的 Future 對象上調用無參 get 方法那麼調用線程會一直被阻塞。函數

下面先經過一個簡單的例子來複現問題,代碼以下:this

public class FutureTest {

    //(1)線程池單個線程,線程池隊列元素個數爲1
        private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {

        //(2)添加任務one
        Future futureOne = executorService.submit(new Runnable() {
            @Override
            public void run() {

                System.out.println("start runable one");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        //(3)添加任務two
        Future futureTwo = executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("start runable two");
            }
        });

        //(4)添加任務three
        Future futureThree=null;
        try {
            futureThree = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("start runable three");
                }
            });
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
        }

        System.out.println("task one " + futureOne.get());//(5)等待任務one執行完畢
        System.out.println("task two " + futureTwo.get());//(6)等待任務two執行完畢
        System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任務three執行完畢

        executorService.shutdown();//(8)關閉線程池,阻塞直到全部任務執行完畢
 }

運行結果以下:spa

代碼 (1) 建立了一個單線程而且隊列元素個數爲 1 的線程池,而且拒絕策略設置爲了DiscardPolicy線程

代碼(2)向線程池提交了一個任務 one,那麼這個任務會使用惟一的一個線程進行執行,任務在打印 start runable one後會阻塞該線程 5s.3d

代碼(3)向線程池提交了一個任務 two,這時候會把任務 two 放入到阻塞隊列code

代碼(4)向線程池提交任務 three,因爲隊列已經滿了則會觸發拒絕策略丟棄任務 three, 從執行結果看在任務 one 阻塞的 5s 內,主線程執行到了代碼 (5) 等待任務 one 執行完畢,當任務 one 執行完畢後代碼(5)返回,主線程打印出 task one null。任務 one 執行完成後線程池的惟一線程會去隊列裏面取出任務 two 並執行因此輸出 start runable two 而後代碼(6)會返回,這時候主線程輸出 task two null,而後執行代碼(7)等待任務 three 執行完畢,從執行結果看代碼(7)會一直阻塞不會返回,至此問題產生,若是把拒絕策略修改成 DiscardOldestPolicy 也會存在有一個任務的 get 方法一直阻塞只是如今是任務 two 被阻塞。可是若是拒絕策略設置爲默認的 AbortPolicy 則會正常返回,而且會輸出以下結果:對象

 

要分析這個問題須要看下線程池的 submit 方法裏面作了什麼,submit 方法源碼以下:

public Future<?> submit(Runnable task) {
        ...
        //(1)裝飾Runnable爲Future對象
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        //(6)返回future對象
        return ftask;
}

 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
 }
public void execute(Runnable command) { ... //(2) 若是線程個數消息核心線程數則新增處理線程處理 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //(3)若是當前線程個數已經達到核心線程數則任務放入隊列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //(4)嘗試新增處理線程進行處理 else if (!addWorker(command, false)) reject(command);//(5)新增失敗則調用拒絕策略 }

代碼(1)裝飾 Runnable 爲 FutureTask 對象,而後調用線程池的 execute 方法。

代碼 (2) 若是線程個數消息核心線程數則新增處理線程處理

代碼(3)若是當前線程個數已經達到核心線程數則任務放入隊列

代碼(4)嘗試新增處理線程進行處理,失敗則進行代碼(5),否者直接使用新線程處理

代碼(5)執行具體拒絕策略,從這裏也能夠看出拒絕策略執行是使用的業務線程。

因此要分析上面例子中問題所在只須要看步驟(5)對被拒絕任務的影響,這裏先看下拒絕策略 DiscardPolicy 的源碼,以下:

public static class DiscardPolicy implements RejectedExecutionHandler {
  
public DiscardPolicy() { }
  
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
}

可知拒絕策略 rejectedExecution 方法裏面什麼都沒作,因此代碼(4)調用 submit 後會返回一個 future 對象,這裏有必要在從新說 future 是有狀態的,FutureTask 內部有一個state用來展現任務的狀態,而且是volatile修飾的,future 的狀態枚舉值以下:

/** Possible state transitions:
 * NEW -> COMPLETING -> NORMAL 正常的狀態轉移
 * NEW -> COMPLETING -> EXCEPTIONAL 異常
 * NEW -> CANCELLED 取消
 * NEW -> INTERRUPTING -> INTERRUPTED 中斷
 */
 
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

在代碼(1)的時候使用 newTaskFor 方法轉換 Runnable 任務爲 FutureTask,而 FutureTask 的構造函數裏面設置的狀態就是 New。FutureTask的構造函數源碼以下:

public FutureTask(Runnable runnable, V result) {
     this.callable = Executors.callable(runnable, result);
     this.state = NEW;       // ensure visibility of callable
}

 

把FutureTask提交到線程池或者線程執行start時候會調用run方法,源碼以下:

public void run() {

    //若是當前不是new狀態,或者當前cas設置當前線程失敗則返回,只有一個線程能夠成功。
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        //當前狀態爲new 則調用任務的call方法執行任務
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);完成NEW -> COMPLETING -> EXCEPTIONAL 狀態轉移
            }

            //執行任務成功則保存結果更新狀態,unpark全部等待線程。
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}


protected void set(V v) {
    //狀態從new->COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        //狀態從COMPLETING-》NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        //unpark全部等待線程。
        finishCompletion();
    }
}

 

因此使用 DiscardPolicy 策略提交任務後返回了一個狀態值爲NEW的future對象。那麼咱們下面就要看下當future的無參get()方法的時候,future變爲何狀態纔會返回,這時候就要看一下FutureTask的get方法的源碼,源碼以下:

  public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //當狀態值<=COMPLETING時候須要等待,否者調用report返回
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {

            //若是被中斷,則拋異常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            //組建單列表
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            else if (timed) {


                nanos = deadline - System.nanoTime();
                //超時則返回
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //否者設置park超時時間
                LockSupport.parkNanos(this, nanos);
            }
            else
                //直接掛起當前線程
                LockSupport.park(this);
        }
    }
    
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        //狀態值爲NORMAL正常返回
        if (s == NORMAL)
            return (V)x;
        //狀態值大於等於CANCELLED則拋異常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

也就是說當 future 的狀態 > COMPLETING 時候調用 get 方法纔會返回,而明顯 DiscardPolicy 策略在拒絕元素的時候並無設置該 future 的狀態,後面也沒有其餘機會能夠設置該 future 的狀態,因此 future 的狀態一直是 NEW,因此一直不會返回,同理 DiscardOldestPolicy 策略也是這樣的問題,最老的任務被淘汰時候沒有設置被淘汰任務對於 future 的狀態。、

在submit任務後還能夠調用futuretask的cancel來取消任務:

 

public boolean cancel(boolean mayInterruptIfRunning) {
        //只有任務是new的才能取消
        if (state != NEW)
            return false;
       //運行時容許中斷
        if (mayInterruptIfRunning) {
           //完成new->INTERRUPTING
            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
                return false;
            Thread t = runner;
            if (t != null)
                t.interrupt();
            //完成INTERRUPTING->INTERRUPTED
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
        }
       //不容許中斷則直接new->CANCELLED
        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            return false;
        finishCompletion();
        return true;
}

 

那麼默認的 AbortPolicy 策略爲啥沒問題呢?

也就是說當 future 的狀態 > COMPLETING 時候調用 get 方法纔會返回,而明顯 DiscardPolicy 策略在拒絕元素的時候並無設置該 future 的狀態,後面也沒有其餘機會能夠設置該 future 的狀態,因此 future 的狀態一直是 NEW,因此一直不會返回,同理 DiscardOldestPolicy 策略也是這樣的問題,最老的任務被淘汰時候沒有設置被淘汰任務對於 future 的狀態。

因此當使用 Future 的時候,儘可能使用帶超時時間的 get 方法,這樣即便使用了 DiscardPolicy 拒絕策略也不至於一直等待,等待超時時間到了會自動返回的,若是非要使用不帶參數的 get 方法則能夠重寫 DiscardPolicy 的拒絕策略在執行策略時候設置該 Future 的狀態大於 COMPLETING 便可,可是查看 FutureTask 提供的方法發現只有 cancel 方法是 public 的而且能夠設置 FutureTask 的狀態大於 COMPLETING,重寫拒絕策略具體代碼能夠以下:

/**
 * Created by cong on 2018/7/13.
 */
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
        if (!threadPoolExecutor.isShutdown()) {
            if(null != runnable && runnable instanceof FutureTask){
                ((FutureTask) runnable).cancel(true);
            }
        }
    }
}

使用這個策略時候因爲從 report 方法知道在 cancel 的任務上調用 get() 方法會拋出異常因此代碼(7)須要使用 try-catch 捕獲異常代碼(7)修改成以下:

package com.hjc;

import java.util.concurrent.*;

/**
 * Created by cong on 2018/7/13.
 */
public class FutureTest {

    //(1)線程池單個線程,線程池隊列元素個數爲1
    private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(1), new MyRejectedExecutionHandler());

    public static void main(String[] args) throws Exception {

        //(2)添加任務one
        Future futureOne = executorService.submit(new Runnable() {
           
            public void run() {

                System.out.println("start runable one");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        //(3)添加任務two
        Future futureTwo = executorService.submit(new Runnable() {
            
            public void run() {
                System.out.println("start runable two");
            }
        });

        //(4)添加任務three
        Future futureThree = null;
        try {
            futureThree = executorService.submit(new Runnable() {
                
                public void run() {
                    System.out.println("start runable three");
                }
            });
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
        }

        System.out.println("task one " + futureOne.get());//(5)等待任務one執行完畢
        System.out.println("task two " + futureTwo.get());//(6)等待任務two執行完畢
        try{
            System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任務three
        }catch(Exception e){
            System.out.println(e.getLocalizedMessage());
        }

        executorService.shutdown();//(8)關閉線程池,阻塞直到全部任務執行完畢
    }
}

 

運行結果以下:

固然這相比正常狀況下多了一個異常捕獲,其實最好的狀況是重寫拒絕策略時候設置 FutureTask 的狀態爲 NORMAL,可是這須要重寫 FutureTask 方法了,由於 FutureTask 並無提供接口進行設置。

相關文章
相關標籤/搜索