從CompletableFuture學習Dubbo 2.7.x 全鏈路異步

CompletableFuture學習的小例子html

CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "1";
        });
// objectCompletableFuture.thenApply(r -> {
// System.out.println(r);
// return "2";
// });
// objectCompletableFuture.thenApply(r -> {
// System.out.println(r);
// return "2";
// });
        objectCompletableFuture.thenApply(r -> {
            System.out.println(r);
            return "3";
        }).whenComplete((result, t) -> {
            System.out.println(result);
        });
        Thread.sleep(20000L);
複製代碼

任務執行

下面的分析都是假設CompletableFuture線程未執行完的狀況
調用CompletableFuture#supplyAsync方法java

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
   	return asyncSupplyStage(asyncPool, supplier); // 默認線程池
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();// new一個CompletableFuture
    e.execute(new AsyncSupply<U>(d, f));// 執行一個新線程
    return d;
}

static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep; Supplier<T> fn;
     AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
         this.dep = dep; this.fn = fn;
     }

     public final Void getRawResult() { return null; }
     public final void setRawResult(Void v) {}
     public final boolean exec() { run(); return true; }

     public void run() {
         CompletableFuture<T> d; Supplier<T> f;
         if ((d = dep) != null && (f = fn) != null) {
             dep = null; fn = null;
             if (d.result == null) {
                 try {
                     d.completeValue(f.get()); // completeValue方法會把f的執行結果賦值到CompletableFuture#result
                 } catch (Throwable ex) {
                     d.completeThrowable(ex);
                 }
             }
             d.postComplete(); // 通知完成(先跳過這步,假設在上邊sleep了)
         }
     }
 }
複製代碼

假設run方法沒有跑完,則CompletableFuture#supplyAsync方法直接放回一個新CompletableFuture對象。
此時調用CompletableFuture#thenApply方法。apache

public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage( Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d =  new CompletableFuture<V>(); // 又new一個CompletableFuture
    if (e != null || !d.uniApply(this, f, null)) { // e默認爲空,後面的uniApply會判斷this(也就是supplyAsync方法new的第一個CompletableFuture)是否執行完(result是否null),爲空就走這個if
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);// new一個UniApply對象,把this和新new的CompletableFuture封裝一下
        push(c);// push到this的stack中
        c.tryFire(SYNC);
    }
    return d;// 返回新new的CompletableFuture
}
複製代碼

此時的supplyAsync返回的CompletableFuture結構。CompletableFuture#stack存了封裝了兩個CompletableFuture對象的UniApply對象。
類圖只是爲了展現結構,不標準的哈 異步

在這裏插入圖片描述
thenApply返回新new的CompletableFuture,再調用CompletableFuture#whenComplete方法。和CompletableFuture#supplyAsync方法相似,只是封裝類變成了UniWhenComplete。

public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}
private CompletableFuture<T> uniWhenCompleteStage( Executor e, BiConsumer<? super T, ? super Throwable> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<T> d = new CompletableFuture<T>();
    if (e != null || !d.uniWhenComplete(this, f, null)) {
        UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}
複製代碼

類圖只是爲了展現結構,不標準的哈
async

在這裏插入圖片描述
如今把文章開頭的註釋打開,這樣會把新new的CompletableFuture的封裝類添加到CompletableFuture.Completion#next中,變成一個鏈表。
在這裏插入圖片描述
next存儲的是使用同一個對象執行thenApply等方法造成的鏈表,而dep存儲的是使用每一個thenApply等方法返回的CompletableFuture造成的stack?(看起來像鏈表)。

異步任務執行結果回調

回到CompletableFuture#supplyAsync方法execute的Runnable,當任務執行完會調用CompletableFuture#postComplete方法。
這個方法邏輯比較繞,不想看能夠跳過,功能是執行上面添加的全部的lambda回調ide

final void postComplete() {
    /* * 在每一個步驟中,變量f將當前依賴項保存爲彈出和運行。 它一次只沿一條路徑擴展,推進其餘路徑以免無限制的遞歸。 */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null || // h臨時存放了this stack
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (f.casStack(h, t = h.next)) { // 將next鏈表cas到stack中
            if (t != null) {
                if (f != this) { // 若是f不等於this,則將添加到當前的stack中
                    pushStack(h); // 這樣操做會使得,以前多級結構,變成同一個Stack
                    continue;
                }
                h.next = null;    // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d; // tryFire會執行上述添加的全部lambda回調
        }
    }
}
複製代碼

Dubbo 2.7.x 全鏈路異步

dubbo.apache.org/zh-cn/docs/…post

NettyServerHandler#channelRead方法,Netty IO線程接收到請求。學習

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        handler.received(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
複製代碼

通過幾層調用後會調用到AllChannelHandler#received方法。會把請求分發到Dubbo內部的Executor。直接返回釋放Netty的IO線程。this

public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService executor = getExecutorService();
    try {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        // 異常處理,省略
    }
}
複製代碼

dubbo內部線程執行後,再通過幾層調用後會調用HeaderExchangeHandler#handleRequest方法。url

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    /** * 參數校驗,省略 */
    Object msg = req.getData();
    try {
        // handle data.
        CompletableFuture<Object> future = handler.reply(channel, msg); // 最終會調用本身實現的Service
        if (future.isDone()) {
            res.setStatus(Response.OK);
            res.setResult(future.get());
            channel.send(res);
            return;
        }
        future.whenComplete((result, t) -> {
            try {
                if (t == null) {
                    res.setStatus(Response.OK);
                    res.setResult(result);
                } else {
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            } finally {
                // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}
複製代碼

若是Service實現了CompletableFuture,則能夠把業務處理放到業務線程,釋放掉Dubbo線程。

public class AsyncServiceImpl implements AsyncService {
    @Override
    public CompletableFuture<String> sayHello(String name) {
        RpcContext savedContext = RpcContext.getContext();
        // 建議爲supplyAsync提供自定義線程池,避免使用JDK公用線程池
        return CompletableFuture.supplyAsync(() -> {
            System.out.println(savedContext.getAttachment("consumer-key1"));
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "async response from provider.";
        });
    }
}
複製代碼

優秀!~

相關文章
相關標籤/搜索