請戳GitHub原文: github.com/wangzhiwubi…java
Usergit
User-stubgithub
RPCRuntimeweb
Server-stub編程
Serverjson
這 5 個部分的關係以下圖所示數組
這裏 user 就是 client 端,當 user 想發起一個遠程調用時,它實際是經過本地調用user-stub。user-stub 負責將調用的接口、方法和參數經過約定的協議規範進行編碼並經過本地的 RPCRuntime 實例傳輸到遠端的實例。遠端 RPCRuntime 實例收到請求後交給 server-stub 進行解碼後發起本地端調用,調用結果再返回給 user 端。bash
粗粒度的 RPC 實現概念結構,這裏咱們進一步細化它應該由哪些組件構成,以下圖所示。服務器
RPC 服務方經過 RpcServer 去導出(export)遠程接口方法,而客戶方經過 RpcClient 去引入(import)遠程接口方法。客戶方像調用本地方法同樣去調用遠程接口方法,RPC 框架提供接口的代理實現,實際的調用將委託給代理RpcProxy 。代理封裝調用信息並將調用轉交給RpcInvoker 去實際執行。在客戶端的RpcInvoker 經過鏈接器RpcConnector 去維持與服務端的通道RpcChannel,並使用RpcProtocol 執行協議編碼(encode)並將編碼後的請求消息經過通道發送給服務方。網絡
RPC 服務端接收器 RpcAcceptor 接收客戶端的調用請求,一樣使用RpcProtocol 執行協議解碼(decode)。解碼後的調用信息傳遞給RpcProcessor 去控制處理調用過程,最後再委託調用給RpcInvoker 去實際執行並返回調用結果。以下是各個部分的詳細職責:
1. RpcServer
負責導出(export)遠程接口
2. RpcClient
負責導入(import)遠程接口的代理實現
3. RpcProxy
遠程接口的代理實現
4. RpcInvoker
客戶方實現:負責編碼調用信息和發送調用請求到服務方並等待調用結果返回
服務方實現:負責調用服務端接口的具體實現並返回調用結果
5. RpcProtocol
負責協議編/解碼
6. RpcConnector
負責維持客戶方和服務方的鏈接通道和發送數據到服務方
7. RpcAcceptor
負責接收客戶方請求並返回請求結果
8. RpcProcessor
負責在服務方控制調用過程,包括管理調用線程池、超時時間等
9. RpcChannel
數據傳輸通道
複製代碼
目前經常使用的RPC框架以下:
1. Thrift:thrift是一個軟件框架,用來進行可擴展且跨語言的服務的開發。它結合了功能強大的軟件堆棧和代碼生成引擎,以構建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 這些編程語言間無縫結合的、高效的服務。
2. Dubbo:Dubbo是一個分佈式服務框架,以及SOA治理方案。其功能主要包括:高性能NIO通信及多協議集成,服務動態尋址與路由,軟負載均衡與容錯,依賴分析與降級等。 Dubbo是阿里巴巴內部的SOA服務化治理方案的核心框架,Dubbo自2011年開源後,已被許多非阿里系公司使用。
3. Spring Cloud:Spring Cloud由衆多子項目組成,如Spring Cloud Config、Spring Cloud Netflix、Spring Cloud Consul 等,提供了搭建分佈式系統及微服務經常使用的工具,如配置管理、服務發現、斷路器、智能路由、微代理、控制總線、一次性token、全局鎖、選主、分佈式會話和集羣狀態等,知足了構建微服務所需的全部解決方案。Spring Cloud基於Spring Boot, 使得開發部署極其簡單。
在架構上,RPC和Message的差別點是,Message有一箇中間結點Message Queue,能夠把消息存儲。 消息的特色
RPC框架實現的幾個核心技術點:
(1)服務暴露:
遠程提供者須要以某種形式提供服務調用相關的信息,包括但不限於服務接口定義、數據結構、或者中間態的服務定義文件。例如Facebook的Thrift的IDL文件,Web service的WSDL文件;服務的調用者須要經過必定的途徑獲取遠程服務調用相關的信息。
目前,大部分跨語言平臺 RPC 框架採用根據 IDL 定義經過 code generator 去生成 stub 代碼,這種方式下實際導入的過程就是經過代碼生成器在編譯期完成的。代碼生成的方式對跨語言平臺 RPC 框架而言是必然的選擇,而對於同一語言平臺的 RPC 則能夠經過共享接口定義來實現。這裏的導入方式本質也是一種代碼生成技術,只不過是在運行時生成,比靜態編譯期的代碼生成看起來更簡潔些。
java 中還有一種比較特殊的調用就是多態,也就是一個接口可能有多個實現,那麼遠程調用時到底調用哪一個?這個本地調用的語義是經過 jvm 提供的引用多態性隱式實現的,那麼對於 RPC 來講跨進程的調用就無法隱式實現了。若是前面DemoService 接口有 2 個實現,那麼在導出接口時就須要特殊標記不一樣的實現須要,那麼遠程調用時也須要傳遞該標記才能調用到正確的實現類,這樣就解決了多態調用的語義問題。
(2)遠程代理對象:
服務調用者用的服務實際是遠程服務的本地代理。說白了就是經過動態代理來實現。
java 裏至少提供了兩種技術來提供動態代碼生成,一種是 jdk 動態代理,另一種是字節碼生成。動態代理相比字節碼生成使用起來更方便,但動態代理方式在性能上是要遜色於直接的字節碼生成的,而字節碼生成在代碼可讀性上要差不少。二者權衡起來,我的認爲犧牲一些性能來得到代碼可讀性和可維護性顯得更重要。
(3)通訊:
RPC框架與具體的協議無關。RPC 可基於 HTTP 或 TCP 協議,Web Service 就是基於 HTTP 協議的 RPC,它具備良好的跨平臺性,但其性能卻不如基於 TCP 協議的 RPC。
TCP/HTTP:衆所周知,TCP 是傳輸層協議,HTTP 是應用層協議,而傳輸層較應用層更加底層,在數據傳輸方面,越底層越快,所以,在通常狀況下,TCP 必定比 HTTP 快。
消息ID:RPC 的應用場景實質是一種可靠的請求應答消息流,和 HTTP 相似。所以選擇長鏈接方式的 TCP 協議會更高效,與 HTTP 不一樣的是在協議層面咱們定義了每一個消息的惟一 id,所以能夠更容易的複用鏈接。
IO方式:爲了支持高併發,傳統的阻塞式 IO 顯然不太合適,所以咱們須要異步的 IO,即 NIO。Java 提供了 NIO 的解決方案,Java 7 也提供了更優秀的 NIO.2 支持。
多鏈接:既然使用長鏈接,那麼第一個問題是到底 client 和 server 之間須要多少根鏈接?實際上單鏈接和多鏈接在使用上沒有區別,對於數據傳輸量較小的應用類型,單鏈接基本足夠。單鏈接和多鏈接最大的區別在於,每根鏈接都有本身私有的發送和接收緩衝區,所以大數據量傳輸時分散在不一樣的鏈接緩衝區會獲得更好的吞吐效率。因此,若是你的數據傳輸量不足以讓單鏈接的緩衝區一直處於飽和狀態的話,那麼使用多鏈接並不會產生任何明顯的提高,反而會增長鏈接管理的開銷。
心跳:鏈接是由 client 端發起創建並維持。若是 client 和 server 之間是直連的,那麼鏈接通常不會中斷(固然物理鏈路故障除外)。若是 client 和 server 鏈接通過一些負載中轉設備,有可能鏈接一段時間不活躍時會被這些中間設備中斷。爲了保持鏈接有必要定時爲每一個鏈接發送心跳數據以維持鏈接不中斷。心跳消息是 RPC 框架庫使用的內部消息,在前文協議頭結構中也有一個專門的心跳位,就是用來標記心跳消息的,它對業務應用透明。
(4)序列化:
兩方面會直接影響 RPC 的性能,一是傳輸方式,二是序列化。
1. 序列化方式:畢竟是遠程通訊,須要將對象轉化成二進制流進行傳輸。不一樣的RPC框架應用的場景不一樣,在序列化上也會採起不一樣的技術。 就序列化而言,Java 提供了默認的序列化方式,但在高併發的狀況下,這種方式將會帶來一些性能上的瓶頸,因而市面上出現了一系列優秀的序列化框架,好比:Protobuf、Kryo、Hessian、Jackson 等,它們能夠取代 Java 默認的序列化,從而提供更高效的性能。
2. 編碼內容:出於效率考慮,編碼的信息越少越好(傳輸數據少),編碼的規則越簡單越好(執行效率高)。以下是編碼需具有的信息:
-- 調用編碼 --
1. 接口方法
包括接口名、方法名
2. 方法參數
包括參數類型、參數值
3. 調用屬性
包括調用屬性信息,例如調用附件隱式參數、調用超時時間等
-- 返回編碼 --
1. 返回結果
接口方法中定義的返回值
2. 返回碼
異常返回碼
3. 返回異常信息
調用異常信息
複製代碼
除了以上這些必須的調用信息,咱們可能還須要一些元信息以方便程序編解碼以及將來可能的擴展。這樣咱們的編碼消息裏面就分紅了兩部分,一部分是元信息、另外一部分是調用的必要信息。若是設計一種 RPC 協議消息的話,元信息咱們把它放在協議消息頭中,而必要信息放在協議消息體中。下面給出一種概念上的 RPC 協議消息設計格式:
-- 消息頭 --
magic : 協議魔數,爲解碼設計
header size: 協議頭長度,爲擴展設計
version : 協議版本,爲兼容設計
st : 消息體序列化類型
hb : 心跳消息標記,爲長鏈接傳輸層心跳設計
ow : 單向消息標記,
rp : 響應消息標記,不置位默認是請求消息
status code: 響應消息狀態碼
reserved : 爲字節對齊保留
message id : 消息 id
body size : 消息體長度
-- 消息體 --
採用序列化編碼,常見有如下格式
xml : 如 webservie soap
json : 如 JSON-RPC
binary: 如 thrift; hession; kryo 等
複製代碼
(1).服務端
服務端提供客戶端所期待的服務,通常包括三個部分:服務接口,服務實現以及服務的註冊暴露三部分,以下:服務接口
public interface HelloService {
String hello(String name);
String hi(String msg);
}
複製代碼
服務實現
public class HelloServiceImpl implements HelloService{
@Override
public String hello(String name) {
return "Hello " + name;
}
@Override
public String hi(String msg) {
return "Hi, " + msg;
}
}
複製代碼
服務暴露:只有把服務暴露出來,才能讓客戶端進行調用,這是RPC框架功能之一。
public class RpcProvider {
public static void main(String[] args) throws Exception {
HelloService service = new HelloServiceImpl();
// RPC框架將服務暴露出來,供客戶端消費
RpcFramework.export(service, 1234);
}
}
複製代碼
(2).客戶端
客戶端消費服務端所提供的服務,通常包括兩個部分:服務接口和服務引用兩個部分,以下:服務接口:與服務端共享同一個服務接口
public interface HelloService {
String hello(String name);
String hi(String msg);
}
複製代碼
服務引用:消費端經過RPC框架進行遠程調用,這也是RPC框架功能之一
public class RpcConsumer {
public static void main(String[] args) throws Exception {
// 由RpcFramework生成的HelloService的代理
HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
String hello = service.hello("World");
System.out.println("客戶端收到遠程調用的結果 : " + hello);
}
}
複製代碼
(3).RPC框架原型實現
RPC框架主要包括兩大功能:一個用於服務端暴露服務,一個用於客戶端引用服務。服務端暴露服務
/**
* 暴露服務
*
* @param service 服務實現
* @param port 服務端口
* @throws Exception
*/
public static void export(final Object service, int port) throws Exception {
if (service == null) {
throw new IllegalArgumentException("service instance == null");
}
if (port <= 0 || port > 65535) {
throw new IllegalArgumentException("Invalid port " + port);
}
System.out.println("Export service " + service.getClass().getName() + " on port " + port);
// 創建Socket服務端
ServerSocket server = new ServerSocket(port);
for (; ; ) {
try {
// 監聽Socket請求
final Socket socket = server.accept();
new Thread(new Runnable() {
@Override
public void run() {
try {
try {
/* 獲取請求流,Server解析並獲取請求*/
// 構建對象輸入流,從源中讀取對象到程序中
ObjectInputStream input = new ObjectInputStream(
socket.getInputStream());
try {
System.out.println("\nServer解析請求 : ");
String methodName = input.readUTF();
System.out.println("methodName : " + methodName);
// 泛型與數組是不兼容的,除了通配符做泛型參數之外
Class<?>[] parameterTypes = (Class<?>[])input.readObject();
System.out.println(
"parameterTypes : " + Arrays.toString(parameterTypes));
Object[] arguments = (Object[])input.readObject();
System.out.println("arguments : " + Arrays.toString(arguments));
/* Server 處理請求,進行響應*/
ObjectOutputStream output = new ObjectOutputStream(
socket.getOutputStream());
try {
// service類型爲Object的(能夠發佈任何服務),故只能經過反射調用處理請求
// 反射調用,處理請求
Method method = service.getClass().getMethod(methodName,
parameterTypes);
Object result = method.invoke(service, arguments);
System.out.println("\nServer 處理並生成響應 :");
System.out.println("result : " + result);
output.writeObject(result);
} catch (Throwable t) {
output.writeObject(t);
} finally {
output.close();
}
} finally {
input.close();
}
} finally {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
複製代碼
從該RPC框架的簡易實現來看,RPC服務端邏輯是:首先建立ServerSocket負責監聽特定端口並接收客戶鏈接請求,而後使用Java原生的序列化/反序列化機制來解析獲得請求,包括所調用方法的名稱、參數列表和實參,最後反射調用服務端對服務接口的具體實現並將獲得的結果回傳至客戶端。至此,一次簡單PRC調用的服務端流程執行完畢。客戶端引用服務
/**
* 引用服務
*
* @param <T> 接口泛型
* @param interfaceClass 接口類型
* @param host 服務器主機名
* @param port 服務器端口
* @return 遠程服務,返回代理對象
* @throws Exception
*/
@SuppressWarnings("unchecked")
public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
throws Exception {
if (interfaceClass == null) {
throw new IllegalArgumentException("Interface class == null");
}
// JDK 動態代理的約束,只能實現對接口的代理
if (!interfaceClass.isInterface()) {
throw new IllegalArgumentException(
"The " + interfaceClass.getName() + " must be interface class!");
}
if (host == null || host.length() == 0) {
throw new IllegalArgumentException("Host == null!");
}
if (port <= 0 || port > 65535) {
throw new IllegalArgumentException("Invalid port " + port);
}
System.out.println(
"Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
// JDK 動態代理
T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[] {interfaceClass}, new InvocationHandler() {
// invoke方法本意是對目標方法的加強,在這裏用於發送RPC請求和接收響應
@Override
public Object invoke(Object proxy, Method method, Object[] arguments)
throws Throwable {
// 建立Socket客戶端,並與服務端創建連接
Socket socket = new Socket(host, port);
try {
/* 客戶端像服務端進行請求,並將請求參數寫入流中*/
// 將對象寫入到對象輸出流,並將其發送到Socket流中去
ObjectOutputStream output = new ObjectOutputStream(
socket.getOutputStream());
try {
// 發送請求
System.out.println("\nClient發送請求 : ");
output.writeUTF(method.getName());
System.out.println("methodName : " + method.getName());
output.writeObject(method.getParameterTypes());
System.out.println("parameterTypes : " + Arrays.toString(method
.getParameterTypes()));
output.writeObject(arguments);
System.out.println("arguments : " + Arrays.toString(arguments));
/* 客戶端讀取並返回服務端的響應*/
ObjectInputStream input = new ObjectInputStream(
socket.getInputStream());
try {
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable)result;
}
System.out.println("\nClient收到響應 : ");
System.out.println("result : " + result);
return result;
} finally {
input.close();
}
} finally {
output.close();
}
} finally {
socket.close();
}
}
});
return proxy;
}
複製代碼
從該RPC框架的簡易實現來看,RPC客戶端邏輯是:首先建立Socket客戶端並與服務端創建連接,而後使用Java原生的序列化/反序列化機制將調用請求發送給客戶端,包括所調用方法的名稱、參數列表將服務端的響應返回給用戶便可。至此,一次簡單PRC調用的客戶端流程執行完畢。特別地,從代碼實現來看,實現透明的PRC調用的關鍵就是 動態代理,這是RPC框架實現的靈魂所在。RPC原型實現
public class RpcFramework {
/**
* 暴露服務
*
* @param service 服務實現
* @param port 服務端口
* @throws Exception
*/
public static void export(final Object service, int port) throws Exception {
if (service == null) {
throw new IllegalArgumentException("service instance == null");
}
if (port <= 0 || port > 65535) {
throw new IllegalArgumentException("Invalid port " + port);
}
System.out.println("Export service " + service.getClass().getName() + " on port " + port);
// 創建Socket服務端
ServerSocket server = new ServerSocket(port);
for (; ; ) {
try {
// 監聽Socket請求
final Socket socket = server.accept();
new Thread(new Runnable() {
@Override
public void run() {
try {
try {
/* 獲取請求流,Server解析並獲取請求*/
// 構建對象輸入流,從源中讀取對象到程序中
ObjectInputStream input = new ObjectInputStream(
socket.getInputStream());
try {
System.out.println("\nServer解析請求 : ");
String methodName = input.readUTF();
System.out.println("methodName : " + methodName);
// 泛型與數組是不兼容的,除了通配符做泛型參數之外
Class<?>[] parameterTypes = (Class<?>[])input.readObject();
System.out.println(
"parameterTypes : " + Arrays.toString(parameterTypes));
Object[] arguments = (Object[])input.readObject();
System.out.println("arguments : " + Arrays.toString(arguments));
/* Server 處理請求,進行響應*/
ObjectOutputStream output = new ObjectOutputStream(
socket.getOutputStream());
try {
// service類型爲Object的(能夠發佈任何服務),故只能經過反射調用處理請求
// 反射調用,處理請求
Method method = service.getClass().getMethod(methodName,
parameterTypes);
Object result = method.invoke(service, arguments);
System.out.println("\nServer 處理並生成響應 :");
System.out.println("result : " + result);
output.writeObject(result);
} catch (Throwable t) {
output.writeObject(t);
} finally {
output.close();
}
} finally {
input.close();
}
} finally {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 引用服務
*
* @param <T> 接口泛型
* @param interfaceClass 接口類型
* @param host 服務器主機名
* @param port 服務器端口
* @return 遠程服務,返回代理對象
* @throws Exception
*/
@SuppressWarnings("unchecked")
public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
throws Exception {
if (interfaceClass == null) {
throw new IllegalArgumentException("Interface class == null");
}
// JDK 動態代理的約束,只能實現對接口的代理
if (!interfaceClass.isInterface()) {
throw new IllegalArgumentException(
"The " + interfaceClass.getName() + " must be interface class!");
}
if (host == null || host.length() == 0) {
throw new IllegalArgumentException("Host == null!");
}
if (port <= 0 || port > 65535) {
throw new IllegalArgumentException("Invalid port " + port);
}
System.out.println(
"Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
// JDK 動態代理
T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[] {interfaceClass}, new InvocationHandler() {
// invoke方法本意是對目標方法的加強,在這裏用於發送RPC請求和接收響應
@Override
public Object invoke(Object proxy, Method method, Object[] arguments)
throws Throwable {
// 建立Socket客戶端,並與服務端創建連接
Socket socket = new Socket(host, port);
try {
/* 客戶端像服務端進行請求,並將請求參數寫入流中*/
// 將對象寫入到對象輸出流,並將其發送到Socket流中去
ObjectOutputStream output = new ObjectOutputStream(
socket.getOutputStream());
try {
// 發送請求
System.out.println("\nClient發送請求 : ");
output.writeUTF(method.getName());
System.out.println("methodName : " + method.getName());
output.writeObject(method.getParameterTypes());
System.out.println("parameterTypes : " + Arrays.toString(method
.getParameterTypes()));
output.writeObject(arguments);
System.out.println("arguments : " + Arrays.toString(arguments));
/* 客戶端讀取並返回服務端的響應*/
ObjectInputStream input = new ObjectInputStream(
socket.getInputStream());
try {
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable)result;
}
System.out.println("\nClient收到響應 : ");
System.out.println("result : " + result);
return result;
} finally {
input.close();
}
} finally {
output.close();
}
} finally {
socket.close();
}
}
});
return proxy;
}
}
複製代碼
以上是簡易RPC框架實現的簡易完整代碼。
(1).RPC框架如何作到透明化遠程服務調用?
如何封裝通訊細節才能讓用戶像以本地調用方式調用遠程服務呢?就Java而言,動態代理恰是解決之道。Java動態代理有JDK動態代理和CGLIB動態代理兩種方式。儘管字節碼生成方式實現的代理更爲強大和高效,但代碼維護不易,所以RPC框架的大部分實現仍是選擇JDK動態代理的方式。在上面的例子中,RPCFramework實現中的invoke方法封裝了與遠端服務通訊的細節,消費方首先從RPCFramework得到服務提供方的接口,當執行helloService.hi(「Panda」)方法時就會調用invoke方法。
(2).如何發佈本身的服務?
如何讓別人使用咱們的服務呢?難道就像咱們上面的代碼同樣直接寫死服務的IP以及端口就能夠了嗎?事實上,在實際生產實現中,使用人肉告知的方式是不現實的,由於實際生產中服務機器上/下線太頻繁了。若是你發現一臺機器提供服務不夠,要再添加一臺,這個時候就要告訴調用者我如今有兩個IP了,大家要輪詢調用來實現負載均衡;調用者咬咬牙改了,結果某天一臺機器掛了,調用者發現服務有一半不可用,他又只能手動修改代碼來刪除掛掉那臺機器的ip。這必然是至關痛苦的!
有沒有一種方法能實現自動告知,即機器的上線/下線對調用方透明,調用者再也不須要寫死服務提供方地址?固然能夠,生產中的RPC框架都採用的是自動告知的方式,好比,阿里內部使用的RPC框架HSF是經過ConfigServer來完成這項任務的。此外,Zookeeper也被普遍用於實現服務自動註冊與發現功能。無論具體採用何種技術,他們大都採用的都是 發佈/訂閱模式。
(3).序列化與反序列化
咱們知道,Java對象是沒法直接在網絡中進行傳輸的。那麼,咱們的RPC請求如何發給服務端,客戶端又如何接收來自服務端的響應呢?答案是,在傳輸Java對象時,首先對其進行序列化,而後在相應的終端進行反序列化還原對象以便進行處理。事實上,序列化/反序列化技術也有不少種,好比Java的原生序列化方式、JSON、阿里的Hessian和ProtoBuff序列化等,它們在效率上存在差別,但又有各自的特色。