dubbo協議下的單一長鏈接與多線程併發如何協同工做

dubbo協議下的單一長鏈接與多線程併發如何協同工做

上班的路上忽然就冒出了這麼個問題:既然在dubbo中描述消費者和提供者之間採用的是單一長鏈接,那麼若是消費者端是高併發多線程模型的web應用,單一長鏈接如何解決多線程併發請求問題呢?

其實若是不太瞭解socket或者多線程編程的相關知識,不太容易理解這個問題。傳統的最簡單的RPC方式,應該是爲每次遠程調用請求建立一個對應的線程,咱們先不說這種方式的缺點。至少優勢很明顯,就是簡單。簡單體如今哪兒?html

通訊雙方一對一(相比NIO來講)。java

通俗點來講,socket通訊的雙方發送和接受數據不會被其它(線程)干擾,這種干擾不一樣於數數據包的「粘包問題」。其實說白了就至關於電話線路的場景:git

試想一下若是多我的同時對着同一個話筒大喊,對方接受到的聲音就會是重疊且雜亂的。github

對於單一的socket通道來講,若是發送方多線程的話,不加控制就會致使通道中的數據亂七八糟,接收端沒法區分數據的單位,也就沒法正確的處理請求。web

乍一看,彷佛dubbo協議所說的單一長鏈接與客戶端多線程併發請求之間,是水火不容的。但其實稍加設計,就可讓它們和諧相處。編程

socket中的粘包問題是怎麼解決的?用的最多的實際上是定義一個定長的數據包頭,其中包含了完整數據包的長度,以此來完成服務器端拆包工做。服務器

那麼解決多線程使用單一長鏈接併發請求時包乾擾的方法也有點雷同,就是給包頭中添加一個標識id,服務器端響應請求時也要攜帶這個id,供客戶端多線程領取對應的響應數據提供線索。多線程

其實若是不考慮性能的話,dubbo徹底也能夠爲每一個客戶端線程建立一個對應的服務器端線程,但這是海量高併發場景所不能接受的~~併發

那麼腦補一張圖:socket

下面我們試圖從代碼中找到痕跡。

一路追蹤,咱們來到這個類:com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.java,先來看看其中的request方法,大概在第101行左右:

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);

    //這個future就是前面咱們提到的:客戶端併發請求線程阻塞的對象
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try{
        channel.send(req);  //非阻塞調用
    }catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

注意這個方法返回的ResponseFuture對象,當前處理客戶端請求的線程在通過一系列調用後,會拿到ResponseFuture對象,最終該線程會阻塞在這個對象的下面這個方法調用上,以下:

public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    if (! isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (! isDone()) {    //無限連
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (! isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}

上面我已經看到請求線程已經阻塞,那麼又是如何被喚醒的呢?再看一下com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.java,其實全部實現了ChannelHandler接口的類都被設計爲裝飾器模式,因此你能夠看到相似這樣的代碼:

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(
            new HeartbeatHandler(
                    ExtensionLoader.getExtensionLoader(Dispather.class).getAdaptiveExtension().dispath(handler, url)
            ));
}

如今來仔細看一下HeaderExchangeHandler類的定義,先看一下它定義的received方法,下面是代碼片斷:

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
          .....
        } else if (message instanceof Response) {   
            //這裏就是做爲消費者的dubbo客戶端在接收到響應後,觸發通知對應等待線程的起點
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
           .....
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

咱們主要看中間的那個條件分支,它是用來處理響應消息的,也就是說當dubbo客戶端接收到來自服務端的響應後會執行到這個分支,它簡單的調用了handleResponse方法,咱們追過去看看:

static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {  //排除心跳類型的響應
        DefaultFuture.received(channel, response);
    }
}

熟悉的身影:DefaultFuture,它是實現了咱們上面說的ResponseFuture接口類型,實際上細心的童鞋應該能夠看到,上面request方法中其實實例化的就是這個DefaultFutrue對象:

DefaultFuture future = new DefaultFuture(channel, req, timeout);

那麼咱們能夠繼續來看一下DefaultFuture.received方法的實現細節:

public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at " 
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                        + ", response " + response 
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                            + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

留一下咱們以前提到的id的做用,這裏能夠看到它已經開始發揮做用了。經過idDefaultFuture.FUTURES能夠拿到具體的那個DefaultFuture對象,它就是上面咱們提到的,阻塞請求線程的那個對象。好,找到目標後,調用它的doReceived方法,這就是標準的java多線程編程知識了:

private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}

這樣咱們就能夠證明上圖中左邊的綠色箭頭所標註的兩點。


接下來咱們再來看看右邊綠色箭頭提到的兩點是如何實現的?其實dubbo在NIO的實現上默認依賴的是netty,也就是說真正在長鏈接兩端發包和接包的苦力是netty。因爲哥們我對netty不是很熟悉,因此暫時咱們就直接把netty當作黑箱,只須要知道它能夠很好的完成NIO通訊便可。

相關文章
相關標籤/搜索