Java 實現簡單的 RPC 框架

RPC 簡介

RPC,全稱爲 Remote Procedure Call,即遠程過程調用,它是一個計算機通訊協議。它容許像調用本地服務同樣調用遠程服務。它能夠有不一樣的實現方式,而不須要了解底層網絡技術的協議。 RPC 協議假定某些傳輸協議的存在,如 TCP 或 UDP,爲通訊程序之間攜帶信息數據。如 RMI(遠程方法調用)、Hessian、Http invoker 等。java

怎樣實現一個 RPC 框架

RPC 可以讓本地應用簡單、高效地調用服務器中的過程。它主要應用在分佈式系統。如 Hadoop 中的 IPC 組件。但怎樣實現一個 RPC 框架呢?
能夠從下面幾個方面思考:bash

  • 通訊模型:假設通訊的爲 A 機器與 B 機器,A 與 B 之間有通訊模型,在 Java 中通常基於 BIO 或 NIO。
  • 過程(服務)定位:使用給定的通訊方式,與肯定 IP 與端口及方法名稱肯定具體的過程或方法;
  • 遠程代理對象:本地調用的方法(服務)實際上是遠程方法的本地代理,所以可能須要一個遠程代理對象,對於 Java 而言,遠程代理對象能夠使用 Java 的動態對象實現,封裝了調用遠程方法調用;
  • 序列化,將對象名稱、方法名稱、參數等對象信息進行網絡傳輸須要轉換成二進制傳輸,這裏可能須要不一樣的序列化技術方案。如:protobuf,Arvo 等。

RPC 框架架構

RPC 架構分爲三部分:服務器

  • 服務提供者,運行在服務器端,提供服務接口定義與服務實現類。
  • 服務中心,運行在服務器端,負責將本地服務發佈成遠程服務,管理遠程服務,提供給服務消費者使用。
  • 服務消費者,運行在客戶端,經過遠程代理對象調用遠程服務。

RPC 框架的簡單實現

這裏我只介紹服務提供者和客戶端的實現方式。網絡

服務提供者

服務提供者 IHello 接口定義:架構

public interface IHello {
    String sayHello(String string);
}

服務提供者 IHello 接口實現:框架

public class HelloImpl implements IHello {
    @Override
    public String sayHello(String string) {
        return "Hello:" + string;
    }
}

服務端 RpcProxyServer 類:socket

public class RpcProxyServer {

    ExecutorService executorService = Executors.newCachedThreadPool();

    public void publisher(Object service, int port) {
        ServerSocket serverSocket = null;
        try {
            // 啓動 socket 服務
            serverSocket = new ServerSocket(port);
            while (true) {
                Socket socket = serverSocket.accept();
                executorService.execute(new ProcessorHandler(service, socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}  

服務端 RpcRequest 類:分佈式

public class RpcRequest implements Serializable {
    private static final long serialVersionUID = 383378368319625542L;
    private String className;
    private String methodName;
    private Object[] params;

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }

    @Override
    public String toString() {
        return "RpcRequest{" +
                "className='" + className + '\'' +
                ", methodName='" + methodName + '\'' +
                ", params=" + Arrays.toString(params) +
                '}';
    }
}

服務端 ProcessorHandler 類:ide

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;

public class ProcessorHandler implements Runnable {
    Socket socket;
    Object service;

    public ProcessorHandler(Object service, Socket socket) {
        this.socket = socket;
        this.service = service;
    }

    @Override
    public void run() {
        System.out.println("begin processor handler!");
        ObjectInputStream objectInputStream = null;
        try {
            objectInputStream = new ObjectInputStream(socket.getInputStream());
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            Object restlt = invoke(rpcRequest);

            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(restlt);
            objectOutputStream.flush();

            objectInputStream.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private Object invoke(RpcRequest request) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Object[] args = request.getParams();
        Class<?>[] types = new Class[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        Method method = service.getClass().getMethod(request.getMethodName(), types);
        return method.invoke(service, args);
    }
}

服務端主類 RpcServerMain:oop

public class RpcServerMain {
    public static void main(String[] args) {
        IHello hello = new HelloImpl();
        RpcProxyServer rpcProxyServer = new RpcProxyServer();
        rpcProxyServer.publisher(hello, 8080);
        System.out.println(hello.sayHello("charles"));
    }
}

客戶端

客戶端 IHello 類:

public interface IHello {
    String sayHello(String string);
}

客戶端 RpcClientProxy 類:

public class RpcClientProxy {
    public <T> T clientProxy(Class<T> interfaceCls, String host, int port) {
        return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));
    }
}

客戶端 RpcRequest 類:

public class RpcRequest implements Serializable {
    private static final long serialVersionUID = 383378368319625542L;
    private String className;
    private String methodName;
    private Object[] params;

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }

    @Override
    public String toString() {
        return "RpcRequest{" +
                "className='" + className + '\'' +
                ", methodName='" + methodName + '\'' +
                ", params=" + Arrays.toString(params) +
                '}';
    }
}

客戶端 RpcNetTransport 類:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.net.Socket;

public class RpcNetTransport {

    String host;
    int port;

    public RpcNetTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }

    private Socket createSocket() {
        System.out.println("Begin create socket connect!");
        Socket socket = null;

        try {
            socket = new Socket(host, port);
        } catch (Exception e) {
            throw new RuntimeException("build connect failed.");
        }
        return socket;
    }

    public Object send(RpcRequest rpcRequest) {
        Socket socket = null;


        try {
            socket = createSocket();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();

            // 返回結果接收

            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            Object resultObject = objectInputStream.readObject();// 反序列化 對象
            objectInputStream.close();
            objectOutputStream.close();

            return resultObject;
        } catch (Exception e) {
            throw new RuntimeException("send request exception:" + e);
        } finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

客戶端 RemoteInvocationHandler 類:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class RemoteInvocationHandler implements InvocationHandler {
    String host;
    int port;

    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParams(args);

        RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
        return rpcNetTransport.send(rpcRequest);
    }
}

客戶端主類 RpcClientMain:

public class RpcClientMain {
    public static void main(String[] args) {
        RpcClientProxy rpcClientProxy = new RpcClientProxy();
        IHello hello = rpcClientProxy.clientProxy(IHello.class, "localhost", 8080);
        System.out.println(hello.sayHello("charles"));
    }
}

項目啓動後客戶端向服務端發送了一條消息,分別運行兩個項目後輸出結果以下

服務端:

begin processor handler!

客戶端:

Begin create socket connect!
Hello:charles

總結

RPC 本質爲消息處理模型,RPC 屏蔽了底層不一樣主機間的通訊細節,讓進程調用遠程的服務就像是本地的服務同樣。

相關文章
相關標籤/搜索