SOFA 源碼分析 —— 服務引用過程

前言

在前面的 SOFA 源碼分析 —— 服務發佈過程 文章中,咱們分析了 SOFA 的服務發佈過程,一個完整的 RPC 除了發佈服務,固然還須要引用服務。 So,今天就一塊兒來看看 SOFA 是如何引用服務的。實際上,基礎邏輯和咱們以前用 Netty 寫的 RPC 小 demo 相似。有興趣能夠看看這個 demo—— 本身用 Netty 實現一個簡單的 RPCjava

示例代碼

ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
    .setInterfaceId(HelloService.class.getName()) // 指定接口
    .setProtocol("bolt") // 指定協議
    .setDirectUrl("bolt://127.0.0.1:9696"); // 指定直連地址

HelloService helloService = consumerConfig.refer();

while (true) {
    System.out.println(helloService.sayHello("world"));
    try {
        Thread.sleep(2000);
    } catch (Exception e) {
    }
}
複製代碼

一樣的,代碼例子來自 SOFA-RPC 源碼,位於com.alipay.sofa.rpc.quickstart.QuickStartClient網絡

很簡單,建立一個消費者配置類,而後使用這個配置引用一個代理對象,調用 代理對象的方法,實際上就是調用了遠程服務。app

咱們就經過上面這個簡單的例子,看看 SOFA 是如何進行服務引用的。注意:咱們今天的目的是看看主流程,有些細節可能暫時就一帶而過了,好比負載均衡,錯誤重試啥的,咱們之後再詳細分析,實際上,Client 相對有 Server,仍是要複雜一些的,由於它要考慮更多的狀況。負載均衡

好,開始吧!框架

源碼分析

首先看這個 ConsumerConfig 類,和前面的 ProviderConfig 相似,甚至於實現的接口和繼承的抽象類都是同樣的。異步

上面的例子設置了一些屬性,好比接口名稱,協議,直連地址。ide

關鍵點來了, refer 方法。源碼分析

這個方法就是返回了一個代理對象,代理對象中包含了以後遠程調用中須要的全部信息,好比過濾器,負載均衡等等。性能

而後,調用動態代理的方法,進行遠程調用,若是是 JDK 的動態代理的話,就是一個實現了 InvocationHandler 接口的類。這個類的 invoke 方法會攔截並進行遠程調用,天然就是使用 Netty 的客戶端對服務端發起調用並獲得數據啦。ui

先看看 refer 方法。

從 refer 方法開始源碼分析

該方法類套路和 provider 的套路相似,都是使用了一個 BootStrap 引導。即單一職責。

public T refer() {
    if (consumerBootstrap == null) {
        consumerBootstrap = Bootstraps.from(this);
    }
    return consumerBootstrap.refer();
}
複製代碼

ConsumerBootstrap 是個抽象類,SOFA 基於他進行擴展,目前有 2 個擴展點,bolt 和 rest。默認是 bolt。而 bolt 的實現則是 BoltConsumerBootstrap,目前來看 bolt 和 rest 並無什麼區別,都是使用的一個父類 DefaultConsumerBootstrap。

因此,來看看 DefaultConsumerBootstrap 的 refer 方法。代碼我就不貼了,由於很長。基本上引用服務的邏輯所有在這裏了,相似 Spring 的 refresh 方法。邏輯以下:

  1. 根據 ConsumerConfig 建立 key 和 appName。檢查參數合法性。對調用進行計數。
  2. 建立一個 Cluster 對象,這個對象很是重要,該對象管理着核心部分的信息。詳細的後面會說,而構造該對象的參數則是 BootStrap。
  3. 設置一些監聽器。
  4. 初始化 Cluster。其中包括設置路由,地址初始化,鏈接管理,過濾器鏈構造,重連線程等。
  5. 建立一個 proxyInvoker 執行對象,也就是初始調用對象,做用是注入到動態代理的攔截類中,以便動態代理今後處開始調用。構造參數也是 BootStrap。
  6. 最後,建立一個動態代理對象,目前動態代理有 2 個擴展點,分別是 JDK,javassist。默認是 JDK,但彷佛 javassist 的性能會更好一點。若是是 JDK 的話,攔截器則是 JDKInvocationHandler 類,構造方法須要代理類和剛剛建立的 proxyInvoker 對象。proxyInvoker 的做用就是從這裏開始鏈式調用。

其中,關鍵的對象是 Cluster。該對象須要重點關注。

Cluster 是個抽象類,也是個擴展點,實現了 Invoker, ProviderInfoListener, Initializable, Destroyable 等接口。而他目前的具體擴展點有 2 個: FailFastCluster(快速失敗), FailoverCluster(故障轉移和重試)。默認是後者。固然,還有一個抽象父類,AbstractCluster。

該類有個重要的方法, init 方法,初始化 Cluster,Cluster 能夠理解成客戶端,封裝了集羣模式、長鏈接管理、服務路由、負載均衡等抽象類。

init 方法代碼以下,很少。

public synchronized void init() {
    if (initialized) { // 已初始化
        return;
    }
    // 構造Router鏈
    routerChain = RouterChain.buildConsumerChain(consumerBootstrap);
    // 負載均衡策略 考慮是否可動態替換?
    loadBalancer = LoadBalancerFactory.getLoadBalancer(consumerBootstrap);
    // 地址管理器
    addressHolder = AddressHolderFactory.getAddressHolder(consumerBootstrap);
    // 鏈接管理器
    connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap);
    // 構造Filter鏈,最底層是調用過濾器
    this.filterChain = FilterChain.buildConsumerChain(this.consumerConfig,
        new ConsumerInvoker(consumerBootstrap));
    // 啓動重連線程
    connectionHolder.init();
    // 獲得服務端列表
    List<ProviderGroup> all = consumerBootstrap.subscribe();
    if (CommonUtils.isNotEmpty(all)) {
        // 初始化服務端鏈接(創建長鏈接)
        updateAllProviders(all);
    }
    // 啓動成功
    initialized = true;
    // 若是check=true表示強依賴
    if (consumerConfig.isCheck() && !isAvailable()) {
    }
}
複製代碼

能夠看到,乾的活不少。每一步都值得花不少時間去看,但看完全部不是咱們今天的任務,咱們今天關注一下調用,上面的代碼中,有一段構建 FilterChain 的代碼是值得咱們今天注意的。

建立了一個 ConsumerInvoker 對象,做爲最後一個過濾器調用,關於過濾器的設計,咱們以前已經研究過了,再也不贅述,詳情 SOFA 源碼分析 —— 過濾器設計

咱們主要看看 ConsumerInvoker 類,該類是離 Netty 最近的過濾器。實際上,他也是擁有了一個 BootStrap,但,注意,擁有了 BootStrap ,至關於挾天子以令諸侯,啥都有了,在他的 invoke 方法中,會直接獲取 Boostrap 的 Cluster 向 Netty 發送數據。

代碼以下:

return consumerBootstrap.getCluster().sendMsg(providerInfo, sofaRequest);
複製代碼

厲害吧。

那麼,Cluster 是如何進行 sendMsg 的呢?若是是 bolt 類型的 Cluster 的話,就直接使用 bolt 的 RpcClient 進行調用了,而 RpcClient 則是使用的 Netty 的 Channel 的 writeAndFlush 方法發送數據。若是是同步調用的話,就阻塞等待數據。

總的流程就是這樣,具體細節留到之後慢慢分析。

下面看看拿到動態代理對象後,如何進行調用。

動態代理如何調用?

當咱們調用的時候,確定會被 JDKInvocationHandler 攔截。攔截方法則是 invoke 方法。方法很簡單,主要就是使用咱們以前注入的 proxyInvoker 的 invoke 方法。咱們以前說了,proxyInvoker 的做用其實就是一個鏈表的頭。而他主要了代理了真正的主角 Cluster,因此,你能夠想到,他的 invoke 方法確定是調用了 Cluster 的 invoke 方法。

Cluster 是真正的主角(注意:默認的 Cluster 是 FailoverCluster),那麼他的調用確定是一連串的過濾器。目前默認有兩個過濾器:ConsumerExceptionFilter, RpcReferenceContextFilter。最後一個過濾器則是咱們以前說的,離 Netty 最近的過濾器 —— ConsumerInvoker。

ConsumerInvoker 會調用 Cluster 的 sendMsg 方法,Cluster 內部包含一個 ClientTransport ,這個 ClientTransport 就是個膠水類,融合 bolt 的 RpcClient。因此,你能夠想到,當 ConsumerInvoker 調用 sendMsg 方法的時候,最後會調用 RpcClient 的 invokeXXX 方法,多是異步,也多是同步的,bolt 支持多種調用方式。

而 RpcClient 最後則是調用 Netty 的 Channel 的 writeAndFlush 方法向服務提供者發送數據。

取一段 RpcClietn 中同步(默認)執行的代碼看看:

protected RemotingCommand invokeSync(final Connection conn, final RemotingCommand request, final int timeoutMillis) throws RemotingException, InterruptedException {
    final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
    conn.addInvokeFuture(future);
    conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
            if (!f.isSuccess()) {
                conn.removeInvokeFuture(request.getId());
                future.putResponse(commandFactory.createSendFailedResponse(
                    conn.getRemoteAddress(), f.cause()));
            }
        }
    });
    // 阻塞等待
    RemotingCommand response = future.waitResponse(timeoutMillis);
    return response;
}
複製代碼

經過 Netty 的 Channel 的 writeAndFlush 方法發送數據,並添加一個監聽器,若是失敗了,就向 future 中注入一個失敗的對象。

在異步執行後,主線程開始等待,內部使用 countDownLatch 進行阻塞。而 countDownLatch 的初始化參數爲 1。何時喚醒 countDownLatch 呢?

在 putResponse 方法中,會喚醒 countDownLatch。

而 putResponse 方法則會被不少地方使用。好比在 bolt 的 RpcResponseProcessor 的 doProcess 方法中就會調用。而這個方法則是在 RpcHandler 的 channelRead 方法中間接調用。

因此,若是 writeAndFlush 失敗了,會 putResponse ,沒有失敗的話,正常執行,則會在 channelRead 方法後簡介調用 putResponse.

總結一下調用的邏輯吧,樓主畫了一張圖,你們能夠看看,畫的很差,還請見諒。

image.png

紅色線條是調用鏈,從 JDKInvocationHandler 開始,直到 Netty。綠色部分是 Cluster,和 Client 的核心。大紅色部分是 bolt 和 Netty。

好了,關於 SOFA 的服務引用主流程咱們今天就差很少介紹完了,固然,有不少精華尚未研究到。咱們之後會慢慢的去研究。

總結

看完了 SOFA 的服務發佈和服務引用,相比較 SpringCloud 而言,真心以爲很輕量。上面的一幅圖基本就能展現大部分調用過程,這在 Spring 和 Tomcat 這種框架中,是不可想象的。而 bolt 的隔離也讓 RPC 框架有了更多的選擇,經過不一樣的擴展點實現,你可使用不一樣的網絡通訊框架。這時候,有必要上一張 SOFA 官方的圖了:

image.png

從上圖能夠看出,咱們今天比較熟悉的地方,好比 Cluster,包含了過濾器,負載均衡,路由,而後調用 remoting 的遠程模塊,也就是 bolt。 經過 sendMsg 方法。

而 Cluster 的外部模塊,咱們今天就沒有仔細看了,這個確定是要留到從此看的。好比地址管理,鏈接管理等等。

好啦,今天就到這裏。若有不對之處,還請指正。

相關文章
相關標籤/搜索