Netty http client 編寫總結

Apache http client 有兩個問題,第一個是 apache http client 是阻塞式的讀取 Http request, 異步讀寫網絡數據性能更好些。第二個是當 client 到 server 的鏈接中斷時,http client 沒法感知到這件事的發生,須要開發者主動的輪訓校驗,發 keep alive 或者 heart beat 消息,而 netty 能夠設置回調函數,確保網絡鏈接中斷時有邏輯來 handlejava

使用 Netty 編寫 Http client,也有一些問題。首先是 netty 是事件驅動的,邏輯主要基於回調函數。數據包到來了也好,網絡鏈接中斷也好,都要經過寫回調函數肯定這些事件來臨後的後續操做。沒有人喜歡回調函數,Future 是 scala 裏討人喜歡的特性,它能把常規於語言裏經過回調才能解決的問題經過主動調用的方式來解決,配合 map, flatmap, for 甚至 async,scala 裏能夠作到徹底看不到回調函數。因此用 netty 作 client 第一個問題是如何把 回調函數搞成主動調用的函數。第二點是 長鏈接,一個 channel 不能發了一個消息就關閉了,每次發消息都要通過 http 三次握手四次揮手效率過低了,最好能重用 channel。第三個是 thread-safe,這個一開始並無考慮到,後來發現這個是最難解決的問題。固然 netty 做爲一個比較底層的包,用它來實現一些高層的接口是比較費時費力的,有不少事情都要手動去作。我花了四五天的時間,沒有解決這幾個問題,只留下一些經驗,供之後參考(見後面的 update)。git

回調函數變主動調用函數

netty 的操做都是基於回調函數的
消息到達時的邏輯github

protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {

        if (msg instanceof HttpContent) {
            HttpContent content = (HttpContent) msg;

            if (content instanceof HttpContent) {
                sendFullResponse(ctx, content);
            } else {
                log.error("content is not http content");
            }
        }
    }

到 server 的鏈接創建後建立 channel 的邏輯apache

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                p.addLast(new HttpClientCodec());
                p.addLast(new HttpContentDecompressor());
                p.addLast(new HttpObjectAggregator(512 * 1024));
                p.addLast(new ResponseHandler());
            }
        });

這是我就但願有一個像 scala Future/Promise 同樣的東西,幫我把回調函數轉成主動調用函數,這是 scala 的一個例子bootstrap

Promise promise = Promise[HttpContent]
    def channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
        HttpContent content = (HttpContent) msg
        promise.success(content)
    }
    
    //somewhere else
    promise.future.map(content => println("content has been recieved in client"))

能夠說有了 promise,咱們接收到 httpContent 之後的事情就都能用主動調用的方式來寫了,雖然不徹底像普通的 java 代碼那樣簡單,須要加一些組合子,可是已經夠好了。promise

Java 裏沒有 promise,須要本身實現,參考了別人的代碼,發現 CountDownLatch 是實現 promise 的關鍵。setComplete 和 await 是最重要的兩個函數,一個設置 CountDownLatch,一個等待 CountDownLatch。緩存

private boolean setComplete(ResultHolder holder) {
        log.info("set complete");

        if (isDone()) {
            return false;
        }

        synchronized (this) {
            if (isDone()) {
                return false;
            }

            this.result = holder;
            if (this.complteLatch != null) {

                log.info("set complete time: " + System.currentTimeMillis());
                this.complteLatch.countDown();
            } else {
                log.info("completeLatch is null at the time: " + System.currentTimeMillis());
            }
        }
        return true;
    }
    
    
    public TaskFuture await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        synchronized (this) {
            if (isDone()) {
                return this;
            }

            if (this.complteLatch == null) {
                log.info("await time: " + System.currentTimeMillis());
                this.complteLatch = new CountDownLatch(1);
            }
        }

        this.complteLatch.await();
        return this;
    }

有了 Promise 之後就能把回調函數轉爲主動調用的函數了。雖然沒有組合子,可是已經夠好了,起碼 await 函數可以保證開發者拿到 HttpContent 後可以像正常的 java 代碼同樣操縱這個值。安全

public TaskPromise executeInternal(HttpRequest httpRequest)

重用 channel

根據上面那一節,獲得了這個函數網絡

public TaskPromise executeInternal(HttpRequest httpRequest) {
        final TaskPromise promise = new DefaultTaskPromise();

        log.info("new created promise hashcode is " + promise.hashCode());

        Channel channel = channelFuture.channel();
        channel.pipeline().get(ResponseHandler.class).setResponseHandler(promise);

        channel.writeAndFlush(httpRequest).addListener((ChannelFutureListener) future -> {
            if(future.isSuccess()) {
                log.info("write success");
             }
        });

public class ResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {

    Logger log = LoggerFactory.getLogger(getClass());

    private TaskPromise promise;

    public void setResponseHandler(TaskPromise promise) {
        this.promise = promise;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
        log.info("channel read0 returned");
        promise.setSuccess(new NettyHttpResponse(ctx, msg));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
        log.info("exception caught in response handler");
        this.promise.setFailure(cause);
    }

}

每次調用 executeInternal 都建立一個 promise 將此 promise 放到 ResponseHandler 註冊一下,而後將 promise 句柄當作返回值。channel.pipeline().get(xxx).set(yyy) 是在 SO 找到的,看起來像個黑科技。這個函數看起來能夠知足需求了。
實際上否則,它不是線程安全的。當兩個線程同時調用 executeInternal 時,可能會同時 setResponseHandler,致使第一個 promise 被沖掉,而後兩個線程持有同一個 promise,一個 promise 只能被 setComplete 一次,第二次時會 exception。假如把 executeInernal 寫成同步的,線程安全問題仍在,由於只要是在一個請求返回來以前設置了 promise,第一個 promise 老是會被沖掉的。看起來這是一個解決不了的問題。併發

在 github 看了不少別人的代碼,發現你們都沒認真研究線程安全的問題,或者一個 channel 只發一個消息。查閱了一些資料,瞭解到InboundHandler 的執行是原子的,不用擔憂線程安全問題,但這對我也沒什麼幫助。找到 AsyncRestTemplate 的底層實現, Netty4ClientHttpRequest,我以爲它想作的事情跟我很像,但不過它好像是每一個 channel 只發一個消息。由於每次發新的消息,Bootstrap 都會調用 connect 函數。

@Override
    protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) throws IOException {
        final SettableListenableFuture<ClientHttpResponse> responseFuture =
                new SettableListenableFuture<ClientHttpResponse>();

        ChannelFutureListener connectionListener = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    Channel channel = future.channel();
                    channel.pipeline().addLast(new RequestExecuteHandler(responseFuture));
                    FullHttpRequest nettyRequest = createFullHttpRequest(headers);
                    channel.writeAndFlush(nettyRequest);
                }
                else {
                    responseFuture.setException(future.cause());
                }
            }
        };

        this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener);

        return responseFuture;
    }

若是 bootstrap 可以緩存住之前的鏈接,那麼他就是我想要的東西了,可是我循環了 executeInternal 十次,發現創建了十個到 Server 的鏈接,也就說它並無重用 channel

update:

上一次寫總結時還卡在一個解決不了的併發問題上,當初的併發問題實際上能夠寫成 how concurrent response mapping to request. 在 Stackoverflow 和中文論壇上有人討論過這個問題,從他們的討論中看的結論是:

在 Netty 裏,channel 是 multiplex 的,可是返回的 Response 不會自動映射到發出的 Request 上,Netty 自己沒有這種能力,爲了達到這個效果,須要在應用層作一些功夫。通常有兩種作法

  • 若是 Client, Server 都由開發者掌控,那麼 client 和 server 能夠在交互協議上添加 requestId field, request 和 response 都有 requestId 標識。client 端每發送一個 request 後,就在本地記錄 (requestId, Future[Response]) 這麼一個 pair, 當 response 返回後,根據 requestId 找到對應的 future, 填充 future
  • 當 server 端不禁開發者掌控時,channel 只能被動接受沒有狀態的 response,沒有其餘信息可供 client 分辨它對應的是那個 request, 此時就只能使用 sync 模式發送消息了,這樣可以保證 response 對應着的就是正在等待它的那個 request. 使用這種方法就失掉了併發的特性,可是能夠建立一個 channel pool, 提供必定的併發性

對於有些不須要 response, request 對應關係的服務,channel 的寫法能夠保持原始的回調函數,好比 heartbeat 服務就能夠能夠這麼寫。

源碼連接https://github.com/sangszhou/NettyHttpClient

作了個簡單的 benchmark, 發現比 apache http client 慢了 2~3 倍,目前還不肯定性能瓶頸的位置。

相關文章
相關標籤/搜索