FutureTask解析源碼

1:使用java

package com.huiguan.facade.mobile;

import java.util.concurrent.*;

/**
 * @author chengwei
 * @since $$Revision:1.0.0, $$Date: 2018/4/26 11:39 $$
 */
public class FutureDemo {
    public static void main(String[] args) {
        try {
            //建立線程池,此線程池建立爲非推薦方式
            ExecutorService threadPool = Executors.newCachedThreadPool();
            //task是線程
            Task task = new Task();
            FutureTask<String> futureTask = new FutureTask<String>(task);
            //submit運行有返回值的線程,execute運行無返回值的線程
            threadPool.submit(futureTask);
            //get方法會阻塞,直到task線程運行結束,纔會獲取返回值
            String s = futureTask.get();
            System.out.println(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }
}
package com.huiguan.facade.mobile;

import java.util.concurrent.Callable;

/**
 * @author chengwei
 * @since $$Revision:1.0.0, $$Date: 2018/4/26 11:39 $$
 */
public class Task implements Callable {
    @Override
    public String call() throws Exception {
        //線程休眠1s,方便觀察FutureTask.get的阻塞現象
        Thread.sleep(1000);
        return "qqqqqqq";
    }
}

 

 

2. 解析FutureTask.get()源碼算法

2.1 原理安全

   當futuretask任務線程未運行結束時,調用get方法時會把futuretask放入鏈表中,使用LockSupport.park將當前主線程置爲休眠狀態;當futuretask任務線程運行結束,會遍歷鏈表,使用LockSupport.unpark喚醒主線程,主線程獲取futuretask線程返回的值。ide

2.2 源碼ui

Futureta各類狀態this

 * Possible state transitions:

//Futuretask運行時各類狀態轉化
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;

//建立狀態
private static final int NEW          = 0;

//運行中
private static final int COMPLETING   = 1;

//正常結束
private static final int NORMAL       = 2;

//異常結束
private static final int EXCEPTIONAL  = 3;

//取消
private static final int CANCELLED    = 4;

//中斷
private static final int INTERRUPTING = 5;

//中斷結束
private static final int INTERRUPTED  = 6;

主線程運行邏輯

java.util.concurrent.FutureTask#get()spa

public V get() throws InterruptedException, ExecutionException {
    int s = state;

    //當是建立或者運行中狀態時,休眠,等待task運行結束
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);

    //運行結束,獲取返回值
    return report(s);
}

java.util.concurrent.FutureTask#awaitDone線程

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {

       //線程中斷,拋出異常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;

        //非建立和運行中狀態,直接返回狀態值
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }

       //當是運行中狀態時,讓出cpu,由本身和其餘線程從新競爭
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();

        //新建一個等待鏈表,等待鏈表是任務線程處於運行中,把主線程放入等待鏈表,等待任務線程運行結束
        else if (q == null)
            q = new WaitNode();

//用CAS算法,將當前主線程放入等待鏈表waiters中,CAS算法:compareAndSwapObject(this,vOffset,A,B)內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改成B,不然什麼都不作。能夠保證線程安全。
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }

       //LockSupport.park相似於wait,使當前線程等待
        else
            LockSupport.park(this);
    }
}

java.util.concurrent.FutureTask#report 獲取返回值code

/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes



private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)

        //正常結束,獲取返回值
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

 

任務線程運行邏輯

java.util.concurrent.FutureTask#run內存

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)

                //運行結束,返回返回值,喚醒等待的線程
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

java.util.concurrent.FutureTask#set

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

        //設置返回值
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

        //喚醒等待線程
        finishCompletion();
    }
}

java.util.concurrent.FutureTask#finishCompletion

private void finishCompletion() {
    // assert state > COMPLETING;

    //遍歷waiters鏈表
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;

                    //喚醒等待線程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}
相關文章
相關標籤/搜索