Dubbo源碼解析(八)遠程通訊——開篇

遠程通信——開篇

目標:介紹以後解讀遠程通信模塊的內容如何編排、介紹dubbo-remoting-api中的包結構設計以及最外層的的源碼解析。java

前言

服務治理框架中能夠大體分爲服務通訊和服務管理兩個部分,前面我先講到有關注冊中心的內容,也就是服務管理,固然dubbo的服務管理還包括監控中心、 telnet 命令,它們起到的是人工的服務管理做用,這個後續再介紹。接下來我要講解的就是跟服務通訊有關的部分,也就是遠程通信模塊。我在《dubbo源碼解析(一)Hello,Dubbo》的"(六)dubbo-remoting——遠程通訊模塊「中提到過一些內容。該模塊中提供了多種客戶端和服務端通訊的功能,而在對NIO框架選型上,dubbo交由用戶選擇,它集成了mina、netty、grizzly等各種NIO框架來搭建NIO服務器和客戶端,而且利用dubbo的SPI擴展機制可讓用戶自定義選擇。若是對SPI不太瞭解的朋友能夠查看《dubbo源碼解析(二)Dubbo擴展機制SPI》git

接下來咱們先來看看dubbo-remoting的包結構:github

remoting目錄

我接下來解讀遠程通信模塊的內容並非按照一個包一篇文章的編排,先來看看dubbo-remoting-api的包結構:segmentfault

dubbo-remoting-api

能夠看到,大篇幅的邏輯在dubbo-remoting-api中,因此我對於dubbo-remoting-api的解讀會分爲下面五個部分來講明,其中第五點會在本文介紹,其餘四點會分別用四篇文章來介紹:設計模式

  1. buffer包:緩衝在NIO框架中是很重要的存在,各個NIO框架都實現了本身相應的緩存操做。這個buffer包下包括了緩衝區的接口以及抽象
  2. exchange包:信息交換層,其中封裝了請求響應模式,在傳輸層之上從新封裝了 Request-Response 語義,爲了知足RPC的需求。這層能夠認爲專一在Request和Response攜帶的信息上。該層是RPC調用的通信基礎之一。
  3. telnet包:dubbo支持經過telnet命令來進行服務治理,該包下就封裝了這些通用指令的邏輯實現。
  4. transport包:網絡傳輸層,它只負責單向消息傳輸,是對 Mina, Netty, Grizzly 的抽象,它也能夠擴展 UDP 傳輸。該層是RPC調用的通信基礎之一。
  5. 最外層的源碼:該部分我會在下面之間給出介紹。

爲何我要把一個api分紅這麼多文章來說解,咱們先來看看下面的圖:api

dubbo-framework

咱們能夠看到紅框內的是遠程通信的框架,序列化我會在後面的主題中介紹,而Exchange層和Transport層在框架設計中起到了很重要的做用,也是支撐Remoting的核心,因此我要分開來介紹。緩存

除了上述的五點外,根據慣例,我仍是會分別介紹dubbo支持的實現客戶端和服務端通訊的七種方案,也就是說該遠程通信模塊我會用12篇文章詳細的講解。服務器

最外層源碼解析

(一)接口Endpoint

dubbo抽象出一個端的概念,也就是Endpoint接口,這個端就是一個點,而點對點之間是能夠雙向傳輸。在端的基礎上在衍生出通道、客戶端以及服務端的概念,也就是下面要介紹的Channel、Client、Server三個接口。在傳輸層,其實Client和Server的區別只是在語義上區別,並不區分請求和應答職責,在交換層客戶端和服務端也是一個點,可是已是有方向的點,因此區分了明確的請求和應答職責。二者都具有發送的能力,只是客戶端和服務端所關注的事情不同,這個在後面會分開介紹,而Endpoint接口抽象的方法就是它們共同擁有的方法。這也就是它們都能被抽象成端的緣由。網絡

來看一下它的源碼:框架

public interface Endpoint {

    // 得到該端的url
    URL getUrl();

    // 得到該端的通道處理器
    ChannelHandler getChannelHandler();
    
    // 得到該端的本地地址
    InetSocketAddress getLocalAddress();
    
    // 發送消息
    void send(Object message) throws RemotingException;
    
    // 發送消息,sent是是否已經發送的標記
    void send(Object message, boolean sent) throws RemotingException;
    
    // 關閉
    void close();
    
    // 優雅的關閉,也就是加入了等待時間
    void close(int timeout);
    
    // 開始關閉
    void startClose();
    
    // 判斷是否已經關閉
    boolean isClosed();

}
複製代碼
  1. 前三個方法是得到該端自己的一些屬性,
  2. 兩個send方法是發送消息,其中第二個方法多了一個sent的參數,爲了區分是不是第一次發送消息。
  3. 後面幾個方法是提供了關閉通道的操做以及判斷通道是否關閉的操做。

(二)接口Channel

該接口是通道接口,通道是通信的載體。仍是用自動販賣機的例子,自動販賣機就比如是一個通道,消息發送端會往通道輸入消息,而接收端會從通道讀消息。而且接收端發現通道沒有消息,就去作其餘事情了,不會形成阻塞。因此channel能夠讀也能夠寫,而且能夠異步讀寫。channel是client和server的傳輸橋樑。channel和client是一一對應的,也就是一個client對應一個channel,可是channel和server是多對一對關係,也就是一個server能夠對應多個channel。

public interface Channel extends Endpoint {

    // 得到遠程地址
    InetSocketAddress getRemoteAddress();

    // 判斷通道是否鏈接
    boolean isConnected();

    // 判斷是否有該key的值
    boolean hasAttribute(String key);

    // 得到該key對應的值
    Object getAttribute(String key);

    // 添加屬性
    void setAttribute(String key, Object value);

    // 移除屬性
    void removeAttribute(String key);

}
複製代碼

能夠看到Channel繼承了Endpoint,也就是端抽象出來的方法也一樣是channel所須要的。上面的幾個方法很好理解,我就很少介紹了。

(三)接口ChannelHandler

@SPI
public interface ChannelHandler {

    // 鏈接該通道
    void connected(Channel channel) throws RemotingException;

    // 斷開該通道
    void disconnected(Channel channel) throws RemotingException;

    // 發送給這個通道消息
    void sent(Channel channel, Object message) throws RemotingException;

    // 從這個通道內接收消息
    void received(Channel channel, Object message) throws RemotingException;

    // 從這個通道內捕獲異常
    void caught(Channel channel, Throwable exception) throws RemotingException;

}
複製代碼

該接口是負責channel中的邏輯處理,而且能夠看到這個接口有註解@SPI,是個可擴展接口,到時候都會在下面介紹各種NIO框架的時候會具體講到它的實現類。

(四)接口Client

public interface Client extends Endpoint, Channel, Resetable {
    
    // 重連
    void reconnect() throws RemotingException;

    // 重置,不推薦使用
    @Deprecated
    void reset(com.alibaba.dubbo.common.Parameters parameters);

}
複製代碼

客戶端接口,能夠看到它繼承了Endpoint、Channel和Resetable接口,繼承Endpoint的緣由上面我已經提到過了,客戶端和服務端其實只是語義上的不一樣,客戶端就是一個點。繼承Channel是由於客戶端跟通道是一一對應的,因此作了這樣的設計,還繼承了Resetable接口是爲了實現reset方法,該方法,不過已經打上@Deprecated註解,不推薦使用。除了這些客戶端就只須要關注一個重連的操做。

這裏插播一個公共模塊下的接口Resetable:

public interface Resetable {

    // 用於根據新傳入的 url 屬性,重置本身內部的一些屬性
    void reset(URL url);

}
複製代碼

該方法就是根據新的url來重置內部的屬性。

(五)接口Server

public interface Server extends Endpoint, Resetable {

    // 判斷是否綁定到本地端口,也就是該服務器是否啓動成功,可以鏈接、接收消息,提供服務。
    boolean isBound();

    // 得到鏈接該服務器的通道
    Collection<Channel> getChannels();

    // 經過遠程地址得到該地址對應的通道
    Channel getChannel(InetSocketAddress remoteAddress);

    @Deprecated
    void reset(com.alibaba.dubbo.common.Parameters parameters);

}
複製代碼

該接口是服務端接口,繼承了Endpoint和Resetable,繼承Endpoint是由於服務端也是一個點,繼承Resetable接口是爲了繼承reset方法。除了這些之外,服務端獨有的是檢測是否啓動成功,還有事得到鏈接該服務器上全部通道,這裏得到全部通道其實就意味着得到了全部鏈接該服務器的客戶端,由於客戶端和通道是一一對應的。

(六)接口Codec && Codec2

這兩個都是編解碼器,那麼什麼叫作編解碼器,在網絡中只是講數據當作是原始的字節序列,可是咱們的應用程序會把這些字節組織成有意義的信息,那麼網絡字節流和數據間的轉化就是很常見的任務。而編碼器是講應用程序的數據轉化爲網絡格式,解碼器則是講網絡格式轉化爲應用程序,同時具有這兩種功能的單一組件就叫編解碼器。在dubbo中Codec是老的編解碼器接口,而Codec2是新的編解碼器接口,而且dubbo已經用CodecAdapter把Codec適配成Codec2了。因此在這裏我就介紹Codec2接口,畢竟人總要往前看。

@SPI
public interface Codec2 {
  	//編碼
    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
    //解碼
    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;

    enum DecodeResult {
        // 須要更多輸入和忽略一些輸入
        NEED_MORE_INPUT, SKIP_SOME_INPUT
    }

}
複製代碼

由於是編解碼器,因此有兩個方法分別是編碼和解碼,上述有如下幾個關注的:

  1. Codec2是一個可擴展的接口,由於有@SPI註解。
  2. 用到了Adaptive機制,首先去url中尋找key爲codec的value,來加載url攜帶的配置中指定的codec的實現。
  3. 該接口中有個枚舉類型DecodeResult,由於解碼過程當中,須要解決 TCP 拆包、粘包的場景,因此增長了這兩種解碼結果,關於TCP 拆包、粘包的場景我就很少解釋,不懂得朋友能夠google一下。

(七)接口Decodeable

public interface Decodeable {

    //解碼
    public void decode() throws Exception;

}
複製代碼

該接口是可解碼的接口,該接口有兩個做用,第一個是在調用真正的decode方法實現的時候會有一些校驗,判斷是否能夠解碼,而且對解碼失敗會有一些消息設置;第二個是被用來message覈對用的。後面看具體的實現會更瞭解該接口的做用。

(八)接口Dispatcher

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    // 調度
    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    // The last two parameters are reserved for compatibility with the old configuration
    ChannelHandler dispatch(ChannelHandler handler, URL url);

}
複製代碼

該接口是調度器接口,dispatch是線程池的調度方法,這邊有幾個注意點:

  1. 該接口是一個可擴展接口,而且默認實現AllDispatcher,也就是全部消息都派發到線程池,包括請求,響應,鏈接事件,斷開事件,心跳等。
  2. 用了Adaptive註解,也就是按照URL中配置來加載實現類,後面兩個參數是爲了兼容老版本,若是這是三個key對應的值都爲空,就選擇AllDispatcher來實現。

(九)接口Transporter

@SPI("netty")
public interface Transporter {
    
    // 綁定一個服務器
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;
    
    // 鏈接一個服務器,即建立一個客戶端
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}
複製代碼

該接口是網絡傳輸接口,有如下幾個注意點:

  1. 該接口是一個可擴展的接口,而且默認實現NettyTransporter。
  2. 用了dubbo SPI擴展機制中的Adaptive註解,加載對應的bind方法,使用url攜帶的server或者transporter屬性值,加載對應的connect方法,使用url攜帶的client或者transporter屬性值,不瞭解SPI擴展機制的能夠查看《dubbo源碼解析(二)Dubbo擴展機制SPI》

(十)Transporters

public class Transporters {

    static {
        // check duplicate jar package
        // 檢查重複的 jar 包
        Version.checkDuplicate(Transporters.class);
        Version.checkDuplicate(RemotingException.class);
    }

    private Transporters() {
    }

    public static Server bind(String url, ChannelHandler... handler) throws RemotingException {
        return bind(URL.valueOf(url), handler);
    }

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        // 建立handler
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // 調用Transporter的實現類對象的bind方法。
        // 例如實現NettyTransporter,則調用NettyTransporter的connect,而且返回相應的server
        return getTransporter().bind(url, handler);
    }

    public static Client connect(String url, ChannelHandler... handler) throws RemotingException {
        return connect(URL.valueOf(url), handler);
    }

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // 調用Transporter的實現類對象的connect方法。
        // 例如實現NettyTransporter,則調用NettyTransporter的connect,而且返回相應的client
        return getTransporter().connect(url, handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

}
複製代碼
  1. 該類用到了設計模式的外觀模式,經過該類的包裝,咱們就不會看到內部具體的實現細節,這樣下降了程序的複雜度,也提升了程序的可維護性。好比這個類,包裝了調用各類實現Transporter接口的方法,經過getTransporter來得到Transporter的實現對象,具體實現哪一個實現類,取決於url中攜帶的配置信息,若是url中沒有相應的配置,則默認選擇@SPI中的默認值netty。
  2. bind和connect方法分別有兩個重載方法,其中的操做只是把把字符串的url轉化爲URL對象。
  3. 靜態代碼塊中檢測了一下jar包是否有重複。

(十一)RemotingException && ExecutionException && TimeoutException

這三個類是遠程通訊的異常類:

  1. RemotingException繼承了Exception類,是遠程通訊的基礎異常。
  2. ExecutionException繼承了RemotingException類,ExecutionException是遠程通訊的執行異常。
  3. TimeoutException繼承了RemotingException類,TimeoutException是超時異常。

爲了避免影響篇幅,這三個類源碼我就不介紹了,由於比較簡單。

後記

該部分相關的源碼解析地址:github.com/CrazyHZM/in…

該文章講解了dubbo-remoting-api中的包結構設計以及最外層的的源碼解析,其中關鍵的是理解端的概念,明白在哪一層才區分了發送和接收的職責,後續文章會按照我上面的編排去寫。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見。

相關文章
相關標籤/搜索