Future模擬

1. 疑惑java

    Future f = executor.submit(callable);異步

    線程池執行任務的時候, 爲了高效,使用的都是異步執行方式;上面這種形式是異步執行的,可是使用       f.get()方法獲取結果時又是阻塞的, 這是怎麼實現的呢?ide

2. 分析this

    從使用方式上咱們能夠推測出幾點:線程

           進入線程池中的任務(callable), 應該被放到Future中執行了;code

           Future執行任務是在新的線程中執行(這樣纔不會阻塞原來的線程);orm

           get方法調用時,阻塞主線程, 當任務執行完成後, 才取消主線程的阻塞;get

 

3. 實現it

/**
 *  future有幾種狀態:new(新建立), completing(正在執行), normal(執行完畢)
 */
public class MyFuture<T> implements Runnable {

    private int state;
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;

    private Callable<T> callable;
    private Object result;
    private WaitNode waiters;

    public MyFuture(Callable<T> callable) {
        this.callable = callable;
        this.state = NEW;
    }

    public T get() throws InterruptedException, ExecutionException{
        int s = state;
        //若不是‘完成’狀態, 則阻塞當前線程
        if (s <= COMPLETING) {
            s = await();
        }
        return report(s);
    }

    private int await() {
        WaitNode q = null;
        boolean queued = false;
        for(;;) {
            if (state > COMPLETING) {
                //若是是完成狀態, 則直接返回
                return state;
            } else if (q == null) {
                q = new WaitNode();
            } else if (!queued) {
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            }
            else {
                //阻塞當前線程
                LockSupport.park(this);
            }
        }
    }

    private T report(int s) throws ExecutionException{
        Object x = result;
        if (s == NORMAL) {
            return (T)x;
        }
        throw new ExecutionException((Throwable)x);
    }

    @Override
    public void run() {
        //此方法是有線程池中線程調用, 主線程阻塞, 不會影響這個方法的執行
        try {
            if (callable != null && state == NEW) {
                //任務的執行
                T v = callable.call();
                setResult(v);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void setResult(T result) {
        if (UNSAFE.compareAndSwapLong(this, stateOffset, state, COMPLETING)) {
            this.result = result;
            UNSAFE.compareAndSwapLong(this, stateOffset, state, NORMAL);
            finishComplete();
        }
    }

    private void finishComplete() {
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //取消阻塞
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null;
                    q = next;
                }
                break;
            }
        }
        callable = null;
    }

   static final class WaitNode {
       Thread thread;
       WaitNode next;
       WaitNode() {
           this.thread = Thread.currentThread();
       }
   }

    private static sun.misc.Unsafe UNSAFE;
    private static long stateOffset;
    private static long waitersOffset;
    static {

        try {
            Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (Unsafe)f.get(null);

            Class myFutureClass = MyFuture.class;
            stateOffset = UNSAFE.objectFieldOffset(myFutureClass.getDeclaredField("state"));
            waitersOffset = UNSAFE.objectFieldOffset(myFutureClass.getDeclaredField("waiters"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
相關文章
相關標籤/搜索