FutureTask詳解

CAS:http://huangyunbin.iteye.com/blog/1942369併發

Future:http://blog.csdn.net/liulipuo/article/details/39029643異步

知識儲備CAS

CAS基於衝突檢測的無鎖併發策略,性能也較高。CAS操做有3個操做數,內存值M,預期值E,新值U,若是M==E,則將內存值修改成B,不然啥都不作。性能

public class TestUnsafe {

  private static Unsafe unsafe;
  static{
    try {
      //經過反射獲取rt.jar下的Unsafe類
      Field field = Unsafe.class.getDeclaredField("theUnsafe");
      field.setAccessible(true);  
      unsafe = (Unsafe) field.get(null); 
    } catch (Exception e) {
      System.out.println("Get Unsafe instance occur error"+ e);  
    }
  } 
  public static void main(String[] args) throws Exception  
  {  
      Class clazz = Target.class;  
      Field[] fields = clazz.getDeclaredFields();  
      System.out.println("fieldName:fieldOffset");  
      for (Field f : fields) {  
          // 獲取屬性偏移量,能夠經過這個偏移量給屬性設置  
          System.out.println(f.getName() + ":" + unsafe.objectFieldOffset(f));  
      }  
      Target target = new Target();  
      Field intFiled  =  clazz.getDeclaredField("intParam")  ;  
      int a=(Integer)intFiled.get(target ) ;  
      System.out.println("原始值是:"+a);  
      //intParam的字段偏移是12 原始值是3 咱們要改成10  
      System.out.println(unsafe.compareAndSwapInt(target, 12, 3, 10));  
      int b=(Integer)intFiled.get(target) ;  
      System.out.println("改變以後的值是:"+b);  

      //這個時候已經改成10了,因此會返回false  
      System.out.println(unsafe.compareAndSwapInt(target, 12, 3, 10));  //false
      //判斷target對象的偏移量24位置,即strParam,若是該變量值=null,則給strParam賦值5
      System.out.println(unsafe.compareAndSwapObject(target, 24, null, "5"));     //true
  }  
}  
  

class Target{
  int intParam=3;  
  long longParam;  
  String strParam;  
  String strParam2;  
}

 測試結果:測試

FutureTask

futureTask的異步計算功能就不介紹了,主要分析源碼,下圖是futureTask的結構:this

 

NEW                            新建            0
COMPLETING             執行中        1
NORMAL                    正常            2
EXCEPTIONAL            異常            3
CANCELLED                取消            4
INTERRUPTING        中斷中        5
INTERRUNPED            被中斷        6
state的狀態變化能夠有四種方式
NEW->COMPLETING->NORMAL                            正常完成的流程
NEW->COMPLETING->EXCEPTIONAL                出現異常的流程
NEW->CANCELED                                                  被取消
NEW->INTERRUNPING->INTERRRUNPTED        被中斷spa

建立FutureTask.net

 public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        //state初始化NEW狀態
        this.state = NEW;       // ensure visibility of callable
    }

run方法:線程

 1  public void run() {
 2 //若是當前state不是建立狀態,且已經在運行。則直接return。
 3         if (state != NEW ||
 4             !UNSAFE.compareAndSwapObject(this, runnerOffset,
 5                                          null, Thread.currentThread()))
 6             return;
 7 //開始執行任務
 8         try {
 9             Callable<V> c = callable;
10   //建立狀態,且callable不爲空,開始執行調用call()
11             if (c != null && state == NEW) {
12                 V result;
13                 boolean ran;
14                 try {
15                     result = c.call();
16                     ran = true;
17                 } catch (Throwable ex) {
18                     result = null;
19                     ran = false;
20                     setException(ex);
21                 }
22 //成功調用任務,執行set()方法
23                 if (ran)
24                     set(result);
25             }
26         } finally {
27             // runner must be non-null until state is settled to
28             // prevent concurrent calls to run()
29 //不管是否成功,都要把runner設置爲null
30             runner = null;
31             // state must be re-read after nulling runner to prevent
32             // leaked interrupts
33             int s = state;
34             if (s >= INTERRUPTING)
35                 handlePossibleCancellationInterrupt(s);
36         }
37     }

接下來看一下24行的set方法:3d

 1 protected void set(V v) {
 2 //CAS設置state  NEW  ->  COMPLETING
 3         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
 4 //結果賦值outcome
 5             outcome = v;
 6 //CAS將state設置爲NORMAL
 7             UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
 8 //喚醒全部的等待線程,同時設置callable=null,調用done(),具體實現後面詳細介紹
 9             finishCompletion();
10         }
11     }

cancel()取消code

 1     public boolean cancel(boolean mayInterruptIfRunning) {
 2 //若是state已經不是NEW 或者NEW不能設置成INTERRUPTING , 則直接返回false 。
 3         if (!(state == NEW &&
 4               UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
 5                   mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
 6             return false;
 7         try {    // in case call to interrupt throws exception
 8 //若是是可中斷 那麼就 調用系統中斷方法 而後把狀態設置成INTERRUPTED
 9             if (mayInterruptIfRunning) {
10                 try {
11                     Thread t = runner;
12                     if (t != null)
13 //中斷執行線程
14                         t.interrupt();
15                 } finally { // final state
16 //最後設置state爲INTERRUPTED
17                     UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
18                 }
19             }
20         } finally {
21             finishCompletion();
22         }
23         return true;
24     }

再來看一下get操做:

 public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
//若是還沒完成,而且等待timeout也沒有完成,則拋出超時異常
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

awaitDone()操做來如何實現超時,已經完成但未超時,則結束等待返回結果,看具體代碼:

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
//超時時間點
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
//循環執行
        for (;;) {
//中斷處理
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
//首先判斷是否完成,若是完成,直接返回state結果
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
//未完成執行,則將當前線程(主線程)設置成等待節點並放入到等待隊列中
            else if (q == null)
                q = new WaitNode();
//將waiters設置成q的next節點,並替換,改操做只作一次,將main線程加入到隊列中,只加一次
else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //若是啓動了超時等待 else if (timed) { //計算剩餘超時時間 nanos = deadline - System.nanoTime(); //超時,移除隊列中的等待線程,返回state,上層判斷state,並報出超時異常 if (nanos <= 0L) { removeWaiter(q); return state; } //還未超時,則等待nanos時間,然而若是task執行接受,並不會一直等待,在run()中的set()中,會執行finishCompleted(),將對待隊列線性喚醒,調用LockSupport.unpark()方法來喚醒等待線程 LockSupport.parkNanos(this, nanos); } else //未啓用超時功能,在未完成callable,則無限期等待 LockSupport.park(this); } }

 最後看一下finishCompletion()方法,調用callable,是如何喚醒main線程

private void finishCompletion() {
        // assert state > COMPLETING;
//遍歷等待隊列
        for (WaitNode q; (q = waiters) != null;) {
//將waiters的頭結點中線程設置null,即移除某個等待線程
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
//若是等待線程不爲null  調用LockSupport.unpark(t)喚醒,而後操做下一個等待線程
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

至此,FutureTask的源碼解析已經完成,接下來看一下開源對於異步處理的解決方案。

 

觸類旁通:

ChannelFuture

ListenableFuture

相關文章
相關標籤/搜索