簡述RPC原理實現

 

 

 

前言

架構的改變,每每是由於業務規模的擴張。java

隨着業務規模的擴張,爲了知足業務對技術的要求,技術架構須要從單體應用架構升級到分佈式服務架構,來下降公司的技術成本,更好的適應業務的發展。git

分佈式服務架構的諸多優點,這裏就不一一列舉了,今天圍繞的話題是服務框架,爲了推行服務化,必然須要一套易用的服務框架,來支撐業務技術架構升級。 github

 

服務框架

服務架構的核心是服務調用,分佈式服務架構中的服務分佈在不一樣主機的不一樣進程上,服務的調用跟單體應用進程內方法調用的本質區別就是須要藉助網絡來進行通訊。算法

 

 

RPC Demo實現思路

原做者梁飛,在此記錄下他很是簡潔的rpc實現思路。apache

 

核心框架類

  /*
   * Copyright 2011 Alibaba.com All right reserved. This software is the
   * confidential and proprietary information of Alibaba.com ("Confidential
   * Information"). You shall not disclose such Confidential Information and shall
   * use it only in accordance with the terms of the license agreement you entered
   * into with Alibaba.com.
   */
  package com.alibaba.study.rpc.framework;
  ​
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;
  import java.lang.reflect.InvocationHandler;
  import java.lang.reflect.Method;
  import java.lang.reflect.Proxy;
  import java.net.ServerSocket;
  import java.net.Socket;
  ​
  /**
   * RpcFramework
   * 
   * @author william.liangf
   */
  public class RpcFramework {
  ​
      /**
       * 暴露服務
       * 
       * @param service 服務實現
       * @param port 服務端口
       * @throws Exception
       */
      public static void export(final Object service, int port) throws Exception {
          if (service == null)
              throw new IllegalArgumentException("service instance == null");
          if (port <= 0 || port > 65535)
              throw new IllegalArgumentException("Invalid port " + port);
          System.out.println("Export service " + service.getClass().getName() + " on port " + port);
          ServerSocket server = new ServerSocket(port);
          for(;;) {
              try {
                  final Socket socket = server.accept();
                  new Thread(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              try {
                                  ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                  try {
                                      String methodName = input.readUTF();
                                      Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                      Object[] arguments = (Object[])input.readObject();
                                      ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                                      try {
                                          Method method = service.getClass().getMethod(methodName, parameterTypes);
                                          Object result = method.invoke(service, arguments);
                                          output.writeObject(result);
                                      } catch (Throwable t) {
                                          output.writeObject(t);
                                      } finally {
                                          output.close();
                                      }
                                  } finally {
                                      input.close();
                                  }
                              } finally {
                                  socket.close();
                              }
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                      }
                  }).start();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
  ​
      /**
       * 引用服務
       * 
       * @param <T> 接口泛型
       * @param interfaceClass 接口類型
       * @param host 服務器主機名
       * @param port 服務器端口
       * @return 遠程服務
       * @throws Exception
       */
      @SuppressWarnings("unchecked")
      public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
          if (interfaceClass == null)
              throw new IllegalArgumentException("Interface class == null");
          if (! interfaceClass.isInterface())
              throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
          if (host == null || host.length() == 0)
              throw new IllegalArgumentException("Host == null!");
          if (port <= 0 || port > 65535)
              throw new IllegalArgumentException("Invalid port " + port);
          System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
          return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {
              public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                  Socket socket = new Socket(host, port);
                  try {
                      ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                      try {
                          output.writeUTF(method.getName());
                          output.writeObject(method.getParameterTypes());
                          output.writeObject(arguments);
                          ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                          try {
                              Object result = input.readObject();
                              if (result instanceof Throwable) {
                                  throw (Throwable) result;
                              }
                              return result;
                          } finally {
                              input.close();
                          }
                      } finally {
                          output.close();
                      }
                  } finally {
                      socket.close();  
                }  
            }  
        });  
    }  
​  
}

 

 

定義服務接口

 /*
   * Copyright 2011 Alibaba.com All right reserved. This software is the
   * confidential and proprietary information of Alibaba.com ("Confidential
   * Information"). You shall not disclose such Confidential Information and shall
   * use it only in accordance with the terms of the license agreement you entered
   * into with Alibaba.com.
   */
  package com.alibaba.study.rpc.test;
  ​
  /**
   * HelloService
   * 
   * @author william.liangf
   */
  public interface HelloService {
  ​
      String hello(String name);
  ​
  }

 

 

實現服務

 1   /*
 2    * Copyright 2011 Alibaba.com All right reserved. This software is the
 3    * confidential and proprietary information of Alibaba.com ("Confidential
 4    * Information"). You shall not disclose such Confidential Information and shall
 5    * use it only in accordance with the terms of the license agreement you entered
 6    * into with Alibaba.com.
 7    */
 8   package com.alibaba.study.rpc.test;
 9 10   /**
11    * HelloServiceImpl
12    * 
13    * @author william.liangf
14    */
15   public class HelloServiceImpl implements HelloService {
16 17       public String hello(String name) {
18           return "Hello " + name;
19       }
20 21   }

 

 

暴露服務

 /*
   * Copyright 2011 Alibaba.com All right reserved. This software is the
   * confidential and proprietary information of Alibaba.com ("Confidential
   * Information"). You shall not disclose such Confidential Information and shall
   * use it only in accordance with the terms of the license agreement you entered
   * into with Alibaba.com.
   */
  package com.alibaba.study.rpc.test;
  ​
  import com.alibaba.study.rpc.framework.RpcFramework;
  ​
  /**
   * RpcProvider
   * 
   * @author william.liangf
   */
  public class RpcProvider {
  ​
      public static void main(String[] args) throws Exception {
          HelloService service = new HelloServiceImpl();
          RpcFramework.export(service, 1234);
      }
  ​
  }

 

 

引用服務

 1   /*
 2    * Copyright 2011 Alibaba.com All right reserved. This software is the
 3    * confidential and proprietary information of Alibaba.com ("Confidential
 4    * Information"). You shall not disclose such Confidential Information and shall
 5    * use it only in accordance with the terms of the license agreement you entered
 6    * into with Alibaba.com.
 7    */
 8   package com.alibaba.study.rpc.test;
 9 10   import com.alibaba.study.rpc.framework.RpcFramework;
11 12   /**
13    * RpcConsumer
14    * 
15    * @author william.liangf
16    */
17   public class RpcConsumer {
18       
19       public static void main(String[] args) throws Exception {
20           HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
21           for (int i = 0; i < Integer.MAX_VALUE; i ++) {
22               String hello = service.hello("World" + i);
23               System.out.println(hello);
24               Thread.sleep(1000);
25           }
26       }
27       
28   }

 

 

小結

梁飛大大的博客使用原生的jdk api就展示給各位讀者一個生動形象的rpc demo,實在是強。api

這個簡單的例子的實現思路是:數組

  • 使用阻塞的socket IO流來進行server和client的通訊,也就是rpc應用中服務提供方和服務消費方。而且是端對端的,用端口號來直接進行通訊 安全

  • 方法的遠程調用使用的是jdk的動態代理服務器

  • 參數的序列化也是使用的最簡單的objectStream網絡

 


 

服務框架

服務框架的核心是服務調用,分佈式服務架構中的服務分佈在不一樣主機的不一樣進程上,服務的調用跟單體應用進程內方法調用的本質區別就是須要藉助網絡來進行通訊。

下圖是服務框架的架構圖,主流的服務框架的實現都是這套架構,如 Dubbo、SpringCloud 等。

 

 

  • Invoker 是服務的調用方

  • Provider 是服務的提供方

  • Registry 是服務的註冊中心

  • Monitor 是服務的監控模塊

Invoker 和 Provider 分別做爲服務的調用和被調用方,這點很明確。

可是僅有這二者仍是不夠的,由於做爲調用方須要知道服務部署在哪,去哪調用服務,因此有了 Registry 模塊,它的功能是給服務提供方註冊服務,給服務調用方發現服務。

Monitor 做爲服務的監控模塊,負責服務的調用統計以及鏈路分析功能,也是服務治理重要的一環。

 

核心模塊

下圖是服務框架的流程圖,咱們分服務註冊、發現、調用三個方面來進行流程分解。

 

 

服務註冊是服務提供方向註冊中心註冊服務信息;當提供服務應用下線時,負責將服務註冊信息從註冊中心刪去。

服務發現是服務調用方從註冊中心訂閱服務,獲取服務提供方的相關信息;當服務註冊信息有變動時,註冊中心負責通知到服務調用方。

服務調用是服務調用方經過從註冊中心拿到服務提供方的信息,向服務提供方發起服務調用,獲取調用結果。

對照上述流程圖,咱們按照請求的具體過程進行分析。

做爲服務調用方 Invoker 的具體流程是:

  1. Request 從下往上,因爲服務調用方只能拿到服務提供方提供的 API 接口或者 API 接口的 JAR 包,因此服務調用方須要通過一層代理 Proxy 來假裝服務的實現;

  2. 通過代理 Proxy 以後,會通過路由 Router、負載均衡 LoadBalance 模塊,目的是從一堆從註冊中心拿到的服務提供方信息中選出最合適的服務提供方機器進行調用。另外,還會通過 Monitor 監控等模塊;

  3. 接着會通過服務編碼 Codec 模塊,這個模塊的目的是由於請求在網絡傳輸前須要按照通訊協議以及對象的序列化方式,對傳輸的請求進行編解碼;

  4. 最終會通過網絡通訊 Transporter 模塊,這個模塊將 Codec 編碼好的請求進行傳輸。

 

做爲服務提供方 Provider 的具體流程是:

  1. Request 從上往下,通過網絡通訊 Transporter 模塊,獲取到的是由調用方發送的Request字節數組。

  2. 接着通過服務編碼 Codec 模塊,根據通訊協議解出一個完整的請求包,而後使用具體的序列化方式反序列化成請求對象。

  3. 緊接着會通過監控、限流、鑑權等模塊。

  4. 最終會執行服務的真正業務實現 ServiceImpl,執行完後,結果按原路返回。

     

按照上述流程分解一個服務框架的相關工做,再去看一些開源的服務框架也就不難理解了。

通常服務框架的核心模塊應該有註冊中心、網絡通訊、服務編碼(通訊協議、序列化)、服務路由、負載均衡,服務鑑權,可用性保障(服務降級、服務限流、服務隔離)、服務監控(Metrics、Trace)、配置中心、服務治理平臺等。

 

註冊中心

註冊中心是用來註冊和發現服務的,須要具有的基本功能有註冊服務、下線服務、發現服務、通知服務變動等。

當前使用比較多的開源註冊中心有 ZookeeperETCDEureka 等。

Zookeeper 與 ETCD 在總體架構上都比較相似,使用方式很是便捷,應用比較普遍。

這兩套系統按照 CAP 理論,屬於 CP 系統,可用性會差一點,可是做爲中小規模服務註冊中心,仍是遊刃有餘,並無某些人說的那麼差勁。 Eureka 是 Spring Cloud Netflix 微服務套件中的一部分,很不幸的是 Eureka 2.0 開源工做宣告中止。

 

網絡通訊

服務的調用方和提供方都來自不一樣的主機的不一樣的進程,因此要進行調用,必然少不了網絡通訊。能夠說網絡通訊是分佈式系統的重中之重,網絡通訊框架的好壞直接影響服務框架的性能。從零實現一套性能高,穩定性強的通訊框架仍是很是難的,好在目前已經有不少開源的高性能的網絡通訊框架。 針對 Java 生態有 Mina、Netty 等,目前使用最普遍的也當屬 Netty。Netty 使用的是 per thread one eventloop 線程模型,這點與 Nginx 等其餘高性能網絡框架相似。另外,Netty 很是易用,因此網絡通訊選擇 Netty 框架天然是毫無疑問的。

 

Netty實踐學習案例,是Netty初學者及核心技術鞏固的最佳實踐。
能夠見個人netty學習工程:
https://github.com/sanshengshui/netty-learning-example

 

服務編碼

內存對象要通過網絡傳輸前須要作兩件事:第一是肯定好通訊協議,第二序列化。

 

通訊協議

通訊協議說白了在發送數據前按照必定的格式來處理數據,而後進行發送,保證接收方拿到數據知道按照什麼樣的格式進行處理。

有些同窗可能不理解,爲何須要通訊協議,不是有 TCP、UDP 協議了嗎?這裏說的不是傳輸層的通訊協議,應該是應用層的協議相似 HTTP。

由於的 TCP 協議雖然已經保證了可靠有序的傳輸,可是若是沒有一套應用層的協議,就不知道發過來的字節數據是否是一個完整的數據請求,或者說是多個請求的字節數據都在一塊兒,沒法拆分,這就是是所謂的粘包,須要按照協議進行拆包,拆成一個個完整的請求包進行處理。

協議的實現上通常大廠或者開源的服務框架選擇自建協議,更偏向服務領域。如 Dubbo,固然也有些框架直接使用 HTTP,HTTP/2,好比 GRPC 使用的就是 HTTP/2。

序列化

因爲向網絡層發送的數據必須是字節數據,不可能直接將一個對象發送到網絡,因此在發送對象數據前,通常須要將對象序列化成字節數據,而後進行傳輸。

在服務方收到網絡的字節數據時,須要通過反序列化拿到相關的對象。

序列化的實現目前現成比較多,如 Hessian、JSON、Thrift、ProtoBuf 等。Thrift 和 ProtoBuf 能支持跨語言,性能比較好,不過使用時須要編寫 IDL 文件,有點麻煩。Hessian、JSON 使用起來比較友好,可是性能會差一點。


 

服務路由

服務路由指的是向服務提供方發起調用時,須要根據必定的算法從註冊中心拿到的服務方地址信息中選擇其中的一批機器進行調用。

路由的算法通常是根據場景來進行選擇的,好比有些公司實施兩地三中心這種高可用部署,可是因爲兩地的網絡延時比較大,那這時就能夠實施同地區路由策略,好比上海的調用方請求會優先選擇上海的服務進行調用,來下降網絡延時致使的服務端到端的調用耗時。

還有些框架支持腳本配置來進行定向路由策略。


 

負載均衡

負載均衡是緊接着服務路由的模塊,負載均衡負責將發送請求均勻合理的發送到服務提供方的節點上,而備選機器,通常就是通過路由模塊選擇出來的。

負載均衡的算法有不少,如 RoundRobin、Random、LeastActive、ConsistentHash 等。

並且這些算法通常都是基於權重的加強版本,由於須要根據權重來調節每臺服務節點的流量。


 

服務鑑權

服務鑑權是服務安全調用的基礎,雖然絕大部分服務都是公司內部服務,可是對於敏感度較高的數據仍是須要進行鑑權的。

鑑權的服務須要對服務的調用方進行受權,未經受權的調用方是不可以調用該服務的。

關於服務鑑權的實現大都是基於 token 的認證方案,如 JWT(JSON Web Token) 認證。


 

可用性保障

可用性保障模塊是服務高可用的一個重要保證。

服務在交互中主要分紅調用方和提供方兩種角色,做爲服務調用方,能夠經過服務降級提高可用性。做爲服務提供方,能夠經過服務限流、服務隔離來保證可用性。

服務降級

服務降級指的是當依賴的服務不可用時,使用預設的值來替代服務調用。

試想一下,假設調用一個非關鍵路徑上的服務(也就是說該調用獲取的結果是否實時,是否正確不是特別重要)出現問題,致使調用超時、失敗等,在沒有降級措施的狀況下,會直接應用服務調用方業務。

所以,有些非關鍵路徑上服務調用,能夠經過服務降級實現有損服務,柔性可用。 開源的降級組件有 Netflix 的 Hystrix,Hystrix 使用比較普遍。

服務限流

服務降級保護的是服務的調用方,也就是服務的依賴方。而服務的提供方呢,如何保證服務的可用性呢? 服務限流指的是對服務調用流量的限制,限制其調用頻次,來保護服務。

在高併發的場景中,很容易出現流量太高,致使服務被打垮。這裏就須要限流來保證服務自身的穩定運行。 Hystrix 也是能夠用來限流的,可是用的比較多的有 guava 的 RateLimiter,其使用的是令牌桶算法,可以保證平滑限流。

服務隔離

除了服務限流對服務提供方進行保護,就夠了嗎? 可能還不夠,考慮一下這樣的場景,假設某一個有問題的方法出現問題,處理很是耗時,這樣會堵住整個服務處理線程,致使正常的服務方法也不可以正常調用。所以還須要服務隔離。 服務隔離指的是對服務執行的方法進行線程池隔離,保證異常耗時方法不會對正常的方法調用產生干擾,進而保護服務的穩定運行,提高可用性。


 

服務監控

服務監控是高可用系統不可或缺的重要支撐。

服務監控不只包括服務調用等業務統計信息 Metrics,還包括分佈式鏈路追蹤 Trace。

分佈式系統監控比單體應用要複雜的多,須要將大量的監控信息進行聚合展現,尤爲是在分佈式鏈路追蹤方面,因爲服務調用過程當中涉及到多個分佈在不一樣機器上的服務,須要一個調用鏈路展現系統方便查看調用鏈路中耗時和出問題的環節。

Metrics

Metrics 監控主要是服務調用的一些統計報表,包括服務調用次數、成功數、失敗數,以及服務方法的調用耗時,如平均耗時,耗時99線,999線等。全方位展現服務的可用性以及性能等信息。

目前開源的 Metrics 監控有美團點評的 Cat、SoundCloud 的 Prometheus 以及基於 OpenTracking 的 SkyWalking。

Trace

Trace 監控是對分佈式服務調用過程當中的總體鏈路展現和分析。方便查看鏈路上各個環境的性能問題。

分佈式鏈路追蹤的原理大都是基於 Google 的論文 Dapper, a Large-Scale Distributed Systems Tracing Infrastructure。 開源的分佈式鏈路追蹤系統有美團點評的 Cat,基於 OpenTracking 的SkyWalking、Twitter 的 ZipKin。


 

配置中心

配置中心不光是常見的系統須要,服務框架也須要,它可以對系統中使用的配置進行管理,也可以針對修改配置動態通知到應用系統。 一套完善的服務框架,必然少不了配置,如一些動態開關、降級配置、限流配置、鑑權配置等。

開源的配置中心有阿里的 Diamond,攜程的 Apollo。


 

治理平臺

治理平臺指的是對服務進行管理的平臺。

微服務微了以後,必然會致使服務數量的上升,若是沒有一個完善的治理平臺,服務規模擴大以後,很難去維護,也必然致使故障頻頻,而且極度影響開發效率。

治理平臺主要是服務功能的相關操做平臺,包括服務權重修改、服務下線、鑑權降級等配置修改等。 治理平臺跟服務框架的耦合比較強,因此開源的比較少。

其餘

關於RPC原理實現詳解到這裏就結束了。

原創不易,若是感受不錯,但願給個推薦!您的支持是我寫做的最大動力!

版權聲明:

做者:穆書偉

博客園出處:www.cnblogs.com/sanshengshu…

github出處:github.com/sanshengshu…    

我的博客出處:sanshengshui.github.io/

相關文章
相關標籤/搜索