【本身讀源碼】Netty4.X系列(四) Netty中的異步調用

Netty中的異步調用

若是你們觀察仔細,會發現咱們以前所寫的代碼都是串行執行的,這是什麼意思?就是咱們看到代碼是什麼順序,最後程序就是按什麼順序執行的。java

可是Netty做爲一個高性能網絡框架,他的調用不少都是異步的,這樣,就能夠不等上一步作完,繼續行進下一步,達到多任務並行的做用。promise

實現概述

Netty是怎麼實現他的異步調用呢,大體總結了下由如下幾個核心部分
組成:網絡

  • 異步執行(executor)
  • 異步結果(future and promise)
  • Listener
  • 同步接口

首先,既然是異步調用,確定要有異步執行,同窗們這裏確定想到的是使用線程,沒錯,他的底層確實也是線程,只不過netty自身封裝成了executor,加強了線程的調度。框架

其次,是要能獲取到此次執行的結果,有的同窗可能會說使用callable,沒錯這確實是一種解決方案,可是netty並無使用這種,而是使用了一種更爲巧妙的設計(也就是經過promise對象來傳遞執行的結果)來完成這種操做,下面咱們會詳細說明。異步

最後就是promise對象提供的各類接口,好比Listener:能夠監聽執行的完成。或者是同步接口:保證異步執行的方法順序也是同步的。這篇文章中,咱們主要就講這兩,三個,其餘的各位童鞋能夠本身去看源碼。async

Executor實現

Netty中每一個Channel都有一個eventloop對象,實現還蠻複雜的,在這裏不是重點,因此咱們先實現一個,具備異步調用功能的exector。ide

自定義executor很簡單,只要實現Executor接口就行oop

public class MyNettyExecutor implements Executor {
private ThreadFactory factory;

public MyNettyExecutor(ThreadFactory factory) {
    this.factory = factory;
}

public void execute(Runnable command) {
    factory.newThread(command).start();
}
}

而後在須要使用的時候,實例化這個類,這裏爲了加強使用,咱們在類內部提供一個靜態初始化方法,並提供最簡單factory實現。性能

public static Executor newExecutor(){
        return new MyNettyExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });
    }

Promise詳解

對furture/promise的理解

我對future的認識最開始源於Java的FutureTask框架,簡單來講,FutureTask是Future接口的一種實現,Future則是異步執行的結果。
而promise,從接口註釋上來看,是一種可修改的Future測試

/**
 * Special {@link Future} which is writable.
 */

那麼如今來看,一個異步結果的程序主要有下面幾步

  1. 生成promise對象
  2. 具體調用的地方傳入promise參數
  3. 異步調用完成後,設置promise爲完成
  4. 返回future對象

其中,第三步是發生在異步調用裏的,因此咱們看到的順序其實就是1->2>4,讓咱們來畫一張圖。
圖片描述

這其實能夠用一個現實中的例子來說述。

今天是小明女友的生日,小明想給她一個驚喜,因而想到了訂一個蛋糕給她,因此小明打電話給蛋糕店預約,店員回覆他說:好的,咱們知道了,製做好了會通知你的。因而小明就開開心心的打遊戲去了。

在上面的例子中,預約蛋糕就是一個異步過程,我只要通知須要作這件事的人(execute),並拿到回覆(Future),而後就能夠作其餘事情了。而後過一段時間打電話詢問蛋糕作好沒(isDone),若是沒作好,那就請他作好的時候通知我(listener)

因此如今咱們有了異步執行,還須要什麼呢?

  1. Future和Promise的定義接口
  2. Promise實現

而後,咱們理一下須要哪些接口

  1. isDone 判斷任務是否完成
  2. addListener
  3. trySuccess 設置任務完成並通知全部listener
  4. sync 同步方法,等待任務完成

定義

首先定義接口

/*listener接口,提供complete方法**/
public interface MyFutureListener<F extends MyFuture<?>>  extends EventListener {

    void operationComplete(F future);
}
/*Future接口**/
public interface MyFuture<V> {

    boolean isDone();

    MyFuture<V> sync() throws InterruptedException ;

    MyFuture<V> addListener(MyFutureListener<? extends MyFuture<? super V>> listener);


}
/*promise接口**/
public interface MyPromise<V> extends MyFuture<V>{

    boolean trySuccess();

    @Override
    MyPromise<V> addListener(MyFutureListener<? extends MyFuture<? super V>> listener);


}

isDone

咱們假設只有完成和未完成兩個狀態,Promise內維護着這個狀態值(初始爲null),那麼判斷是否完成只須要判斷這個值不爲空就好了。

private volatile Object result = null;

    @Override
    public boolean isDone() {
        return result != null;
    }

trySucess

那麼最簡單的success實現就是給這個對象賦值

@Override
    public boolean trySuccess() {
        result = new Object();
        return true;
    }

固然,這裏很不嚴謹,咱們後面再說。

Listener接口實現

上面咱們定義了listener接口,這裏要實現addListener方法

private List<MyFutureListener<? extends MyFuture<?>>> listeners;

@Override
    public MyPromise<Void> addListener(MyFutureListener<? extends MyFuture<? super Void>> listener) {
        synchronized (this) {
            if(listeners == null){
                listeners = new ArrayList<MyFutureListener<? extends MyFuture<?>>>();
                listeners.add(listener);
            }else {
                listeners.add(listener);
            }
        }
        if (isDone()){
            for (MyFutureListener f: listeners
                    ) {
                f.operationComplete(this);
            }
        }
        return this;
    }

而後完善下success方法,成功的時候調用每個listener的complete方法。

@Override
    public boolean trySuccess() {
        result = new Object();

        for (MyFutureListener f: listeners
             ) {
            f.operationComplete(this);
        }

        return true;
    }

同步接口實現

同步也很簡單,就是先判斷任務是否完成,沒有完成就wait一下。注意,wait以前咱們要保持同步,引入synchronized原語。

@Override
    public MyFuture<Void> sync() throws InterruptedException {
        if (isDone()){
            return this;
        }
       
         synchronized (this){
            while (!isDone()) {
            waiters++;
                try {
                    wait();
                }finally {
                    waiters--;
                }
            }
        }
      
        return this;
    }

同理,須要有地方去喚醒它,咱們繼續完善success方法,最終咱們的trySuccess方法以下

private synchronized void checkNotify(){
        if (waiters > 0){
            notifyAll();
        }
    }

 @Override
    public boolean trySuccess() {
        result = new Object();
        checkNotify();
        for (MyFutureListener f: listeners
             ) {
            f.operationComplete(this);
        }

        return true;
    }

Demo

輪子造好了,是時候寫個demo測試一下

public class MyExecutorDemo {
    public static void main(String[] args) {

        MyFuture future = asyncHello().addListener((MyFutureListener<MyPromise<Void>>) future1 -> System.out.println("監聽到完成"));
        if (future.isDone()){
            System.out.println("異步執行完成");
        }else{
            try {
                future.sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static MyFuture asyncHello(){
        Executor executor = MyNettyExecutor.newExecutor();
        final DefaultPromise promise = new DefaultPromise();
        executor.execute(() -> {
            System.out.println("Hello Async");
            try {
                //模擬一些操做
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            promise.trySuccess();
        });
        return promise;
    }
}

警告

不可用於生產,這個Future/promise的設計僅僅爲了說明異步執行和結果,距離netty中的異步框架還缺乏不少。

  1. NULL值檢查,整個設計中均沒有對對象作NULL的檢查,容易引發NullPointException。
  2. 異常處理缺失,對可能失敗的地方作異常處理(這也是是否能用於生產的合格檢驗)
  3. 非徹底異步,listener的通知沒有使用異步
  4. 待補充(以我如今的水平,暫時想不到)
相關文章
相關標籤/搜索