【分佈式】RPC初探

事先聲明:本文代碼參考自Dubbo做者的博客

RPC(Remote Procedure Call)遠程過程調用,是分佈式系統當中必不可少的一個玩意。好比說在單機系統當中,我想調用某個方法,直接調就能夠了對吧,可是當環境變成多機分佈式系統時,A機器上想調用B機器上的某個方法時,就須要用到RPC了。RPC的原理在知乎這個問答上有很清楚的解釋。html

簡單點來講,就是客戶端利用了socket把但願調用的方法的信息(方法名、方法須要的參數等)傳給服務器端,服務器端把這些信息反序列化以後利用反射去調用指定的方法,並把返回值再經過socket返回給客戶端。下面是代碼示例,關鍵部分我寫了本身理解的註釋。java

代碼主要用到了socket通訊和JDK的動態代理,這兩部分我在以前的博客中也都有涉及。服務器

RPCServer.java

public class RPCServer {
    private static final int PORT = 8000;
    /**
     * 暴露服務
     *
     * @param service 服務的對象實例
     * */
    public static void open(final Object service) throws Exception {
        if (service == null) {
            throw new IllegalArgumentException();
        }
        System.out.println("Service is opening for " + service.getClass().getName() + " at port: " + PORT);
        //開啓ServerSocket監聽8000端口
        final ServerSocket server = new ServerSocket(PORT);
        for (;;) {
            try {
                //接收到一個客戶端請求
                final Socket client = server.accept();
                //開一個線程處理
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                            try {
                                ObjectInputStream input = new ObjectInputStream(client.getInputStream());
                                try {
                                    String methodName = input.readUTF();
                                    System.out.println(">>>>methodName: " + methodName);
                                    Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                                    Object[] arguments = (Object[]) input.readObject();
                                    System.out.println(">>>>arguments: " + arguments.toString());
                                    ObjectOutputStream out = new ObjectOutputStream(client.getOutputStream());
                                    try {
                                        //利用反射獲取到方法對象
                                        Method method = service.getClass().getMethod(methodName, parameterTypes);
                                        //調用方法並獲取返回值
                                        Object result = method.invoke(service, arguments);
                                        //把返回值寫入socket,返回給客戶端
                                        out.writeObject(result);
                                        //                                out.flush();
                                    } catch (Throwable t) {
                                        out.writeObject(t);
                                    } finally {
                                        out.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 指定在遠程主機上的服務
     *
     * @param <T> 接口泛型
     * @param interfaceClass 接口
     * @param host 遠程主機IP
    * */

    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host) {
        if (interfaceClass == null) {
            throw new IllegalArgumentException("invalid interface");
        }
        if (host == null || "".equals(host)) {
            throw new IllegalArgumentException("invalid host");
        }
        System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + PORT);
        //動態代理返回對象實例,而且利用泛型轉成服務類型
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = new Socket(host, PORT);
                        try {
                            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                            try {
                                //發送方法名
                                out.writeUTF(method.getName());
                                //發生方法參數列表
                                out.writeObject(method.getParameterTypes());
                                //發生方法須要的參數
                                out.writeObject(args);
                                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                try {
                                    //獲取返回值
                                    Object result = input.readObject();
                                    if (result instanceof Throwable) {
                                        throw (Throwable) result;
                                    }
                                    return result;
                                }finally {
                                    input.close();
                                }
                            }finally {
                                out.close();
                            }
                        } finally {
                            socket.close();
                        }
                    }
                });
    }
}

接口 HelloService.java

public interface HelloService {
    public String show(String name);
}

接口實現 HelloServiceImpl.java

public class HelloServiceImpl implements HelloService {
    @Override
    public String show(String name) {
        System.out.println(name);
        return "name: " + name;
    }
}

測試:

服務端測試代碼 ServerTest.java

public class ServerTest {
    public static void main(String[] args) throws Exception {
        HelloService helloService = new HelloServiceImpl();
        //開啓RPC服務,而且綁定一個對象實例,指定服務器上的服務類型
        RPCServer.open(helloService);
    }
}

客戶端測試代碼 ClientTest.java併發

public class ClientTest {
    public static void main(String[] args) {
        try {
            //調用指定IP的遠程主機上的指定服務
            HelloService service = RPCServer.refer(HelloService.class, "127.0.0.1");
            System.out.println(service.show("hello"));
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
}

結果以下:

服務端:socket

客戶端:分佈式

思考

關於這段示例代碼,有哪些改進的地方呢?首先我想到的是把TCP通訊模型改爲NIO通訊,不要用BIO這種低併發的模型;其次是傳輸的信息能夠用其餘方式進行壓縮或者叫序列化,減小傳輸的大小從而下降服務器壓力和提升傳輸速度;還有就是這段代碼使用的動態代理是JDK自帶的方法,有個很大的缺點是必需要接口,以前的文章也提到了,能夠採用CGlib來改善一下。目前能想到的就這三點了,找時間我再來完善一下。ide

同時也能夠去看看Dubbo源碼。測試

相關文章
相關標籤/搜索