java 遠程調用 RPC

1. 概念

  RPC,全稱爲Remote Procedure Call,即遠程過程調用,它是一個計算機通訊協議。它容許像調用本地服務同樣調用遠程服務。它能夠有不一樣的實現方式。如RMI(遠程方法調用)、Hessian、Http invoker等。RPC是與語言無關的。直觀說法就是A經過網絡調用B的過程方法。也就是說兩臺服務器A,B,一個應用部署在A服務器上,想要調用B服務器上應用提供的函數/方法,因爲不在一個內存空間,不能直接調用,須要經過網絡來表達調用的語義和傳達調用的數據。php

 

一、首先要解決尋址的問題,也就是說,A服務器上的應用怎麼告訴底層的RPC框架,B服務器的IP,以及應用綁定的端口,還有方法的名稱,這樣才能完成調用java

二、方法的參數須要經過底層的網絡協議如TCP傳遞到B服務器,因爲網絡協議是基於二進制的,內存中的參數的值要序列化成二進制的形式node

三、在B服務器上完成尋址後,須要對參數進行反序列化,恢復爲內存中的表達方式,而後找到對應的方法進行本地調用,而後獲得返回值,python

四、返回值還要發送回服務器A上的應用,也要通過序列化的方式發送,服務器A接到後,再反序列化,恢復爲內存中的表達方式,交給應用git

1.1 框架

1.1.1 RMI

  java自帶遠程調用框架github

1.1.2 THrift

  thrift是一個軟件框架,用來進行可擴展且跨語言的服務的開發。它結合了功能強大的軟件堆棧和代碼生成引擎,以構建在 C++, Java, Go,Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 這些編程語言間無縫結合的、高效的服務。objective-c

1.1.3 Dubbo(Dubbox)  

  服務治理,也能夠作微服務編程

1.1.4 gRPC

  gRPC 是一個高性能、開源和通用的 RPC 框架,面向移動和 HTTP/2 設計。目前提供 C、Java 和 Go 語言版本,分別是:grpcgrpc-javagrpc-go. 其中 C 版本支持 CC++Node.jsPythonRubyObjective-CPHP 和 C# 支持.ruby

  開源中國組織翻譯的《gRPC 官方文檔中文版》:http://doc.oschina.net/grpc服務器

  gRPC 基於 HTTP/2 標準設計,帶來諸如雙向流、流控、頭部壓縮、單 TCP 鏈接上的多複用請求等特。這些特性使得其在移動設備上表現更好,更省電和節省空間佔用。

2. RPC的實現

  RPC可以讓本地應用簡單、高效地調用服務器中的過程(服務)。它主要應用在分佈式系統。如Hadoop中的IPC組件。但怎樣實現一個RPC框架呢?

  從下面幾個方面思考,僅供參考:

  1.通訊模型:假設通訊的爲A機器與B機器,A與B之間有通訊模型,在Java中通常基於BIO或NIO;。

  2.過程(服務)定位:使用給定的通訊方式,與肯定IP與端口及方法名稱肯定具體的過程或方法;

  3.遠程代理對象:本地調用的方法(服務)實際上是遠程方法的本地代理,所以可能須要一個遠程代理對象,對於Java而言,遠程代理對象可使用Java的動態對象實現,封裝了調用遠程方法調用;

  4.序列化,將對象名稱、方法名稱、參數等對象信息進行網絡傳輸須要轉換成二進制傳輸,這裏可能須要不一樣的序列化技術方案。如:protobuf,Arvo等。

2.1 實現技術方案

下面使用比較原始的方案實現RPC框架,採用Socket通訊、動態代理與反射與Java原生的序列化。

2.2 RPC框架架構

RPC架構分爲三部分:

1)服務提供者,運行在服務器端,提供服務接口定義與服務實現類。

2)服務中心,運行在服務器端,負責將本地服務發佈成遠程服務,管理遠程服務,提供給服務消費者使用。

3)服務消費者,運行在客戶端,經過遠程代理對象調用遠程服務。

2.3 Demo

2.3.1 服務提供者接口定義

HelloService.java

1 package com.loveincode.rpc;
2 
3 public interface HelloService {
4 
5     String hello(String name);
6 
7 }

2.3.2 服務提供者接口實現

HelloServiceImpl.java

 1 package com.loveincode.rpc;
 2 
 3 //HelloServices接口實現類:
 4 public class HelloServiceImpl implements HelloService {
 5      
 6     public String hello(String name) {
 7         return "hello, " + name;
 8     }
 9  
10 }

 

2.3.3 RPC框架服務中心

RpcFramework.java

  1 package com.loveincode.rpc;
  2 
  3 import java.io.ObjectInputStream;
  4 import java.io.ObjectOutputStream;
  5 import java.lang.reflect.InvocationHandler;
  6 import java.lang.reflect.Method;
  7 import java.lang.reflect.Proxy;
  8 import java.net.ServerSocket;
  9 import java.net.Socket;
 10 
 11 public class RpcFramework {
 12     /**
 13      * 暴露服務
 14      * 
 15      * @param service
 16      *            服務實現
 17      * @param port
 18      *            服務端口
 19      * @throws Exception
 20      */
 21     public static void export(final Object service, int port) throws Exception {
 22         if (service == null)
 23             throw new IllegalArgumentException("service instance == null");
 24         if (port <= 0 || port > 65535)
 25             throw new IllegalArgumentException("Invalid port " + port);
 26         System.out.println("Export service " + service.getClass().getName() + " on port " + port);
 27         ServerSocket server = new ServerSocket(port);// 前面都是驗證調用是否符合規則,這裏在被調用端開一個服務。下面就是死循環運行。
 28         for (;;) {
 29             try {
 30                 final Socket socket = server.accept();
 31                 new Thread(new Runnable() {// 對每個請求new一個線程,匿名類
 32                     @Override
 33                     public void run() {
 34                         try {
 35                             try {
 36                                 ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
 37                                 try {
 38                                     String methodName = input.readUTF();
 39                                     Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
 40                                     Object[] arguments = (Object[]) input.readObject();
 41                                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());// 接收客戶端傳來的方法名、參數類型、參數
 42                                     try {
 43                                         Method method = service.getClass().getMethod(methodName, parameterTypes);// 在本地生成對應的方法,
 44                                         Object result = method.invoke(service, arguments);// 調用
 45                                         output.writeObject(result);// 返回結果
 46                                     } catch (Throwable t) {
 47                                         output.writeObject(t);
 48                                     } finally {
 49                                         output.close();
 50                                     }
 51                                 } finally {
 52                                     input.close();
 53                                 }
 54                             } finally {
 55                                 socket.close();
 56                             }
 57                         } catch (Exception e) {
 58                             e.printStackTrace();
 59                         }
 60                     }
 61                 }).start();
 62             } catch (Exception e) {
 63                 e.printStackTrace();
 64             }
 65         }
 66     }
 67 
 68     /**
 69      * 引用服務
 70      * 
 71      * @param <T>
 72      *            接口泛型
 73      * @param interfaceClass
 74      *            接口類型
 75      * @param host
 76      *            服務器主機名
 77      * @param port
 78      *            服務器端口
 79      * @return 遠程服務
 80      * @throws Exception
 81      */
 82     @SuppressWarnings("unchecked")
 83     public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
 84         if (interfaceClass == null)
 85             throw new IllegalArgumentException("Interface class == null");
 86         if (!interfaceClass.isInterface())
 87             throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
 88         if (host == null || host.length() == 0)
 89             throw new IllegalArgumentException("Host == null!");
 90         if (port <= 0 || port > 65535)
 91             throw new IllegalArgumentException("Invalid port " + port);
 92         System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
 93         return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass },
 94                 new InvocationHandler() {// 用動態代理的方法進行包裝,看起來是在調用一個方法,其實在內部經過socket通訊傳到服務器,並接收運行結果
 95                     public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
 96                         Socket socket = new Socket(host, port);
 97                         try {
 98                             ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
 99                             try {
100                                 output.writeUTF(method.getName());
101                                 output.writeObject(method.getParameterTypes());
102                                 output.writeObject(arguments);
103                                 ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
104                                 try {
105                                     Object result = input.readObject();
106                                     if (result instanceof Throwable) {
107                                         throw (Throwable) result;
108                                     }
109                                     return result;// 返回結果
110                                 } finally {
111                                     input.close();
112                                 }
113                             } finally {
114                                 output.close();
115                             }
116                         } finally {
117                             socket.close();
118                         }
119                     }
120                 });
121     }
122 }

 

2.3.4 服務提供者

RpcProvider.java

1 package com.loveincode.rpc;
2 
3 public class RpcProvider {
4     public static void main(String[] args) throws Exception {
5         HelloService service = new HelloServiceImpl();
6         RpcFramework.export(service, 1234);
7     }
8 }

 

2.3.5 服務消費者

RpcConsumer.java

package com.loveincode.rpc;

public class RpcConsumer {
    public static void main(String[] args) throws Exception {
        HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            String hello = service.hello("World " + i);
            System.out.println(hello);
            Thread.sleep(1000);
        }
    }
}

 

運行結果

執行RpcProvider 結果:

Export service com.loveincode.rpc.HelloServiceImpl on port 1234

 

執行RpcConsumer 結果:

Get remote service com.loveincode.rpc.HelloService from server 127.0.0.1:1234
hello, World 0
hello, World 1
hello, World 2
hello, World 3
hello, World 4
hello, World 5
hello, World 6
hello, World 7
hello, World 8
hello, World 9...
相關文章
相關標籤/搜索