十分鐘寫個RPC框架

互聯網時代,各類RPC框架盛行,細看各類框架,應用層面有各類變化,可是萬變不離其宗,RPC最核心的部分基本都是一致的。
那就是跟遠程的服務器進行通訊,像調用本地服務同樣調用遠程服務。
而後在這些基礎上可能會附加一些諸如,服務自動註冊和發現,負載均衡,就近路由,調用鏈路記錄,遠程mock等等功能。
今天想給你們分享的是,若是不考慮性能,API易用性,服務發現,負載均衡,環境隔離等其餘因素,其實作一個基本功能的RPC框架,幾分鐘就能搞定。
我的認爲RPC功能的基石有下面幾個 1、序列化協議 2、遠程通訊報文協議 3、協議報文處理實現。
序列化協議JDK爲咱們提供了一套自帶的序列化協議,雖然不能跨語言,壓縮率不高,不過咱們只爲展現RPC技術,沒有考慮其餘,若是以爲很差可使用Hessian ,protobuf,甚至是諸如fastjson這些json序列化的開源框架。
遠程通訊JDK有一套socket和輸入輸出流,雖然是同步的,性能不怎麼樣,可是隻爲展現RPC技術原理,不考慮其餘,爲了性能和吞吐量咱們能夠選擇netty進行改造。
通訊協議,我只作了一個簡單的 MAGIC_NUM+兩個字節報文長+序列化對象字節流 的協議,協議上能夠增長不少東西,好比版本號,心跳包,狀態碼,可擴展的報文頭等等,不過一樣的,這裏只爲展現RPC原理,不考慮其餘的。
協議報文處理部分只是經過報文體裏面攜帶的 類名 方法名,方法參數,作了個簡單的反射處理,這部分其實能夠擴展的部分不少,好比預先作方法緩存,方法簽名使用短命名註冊等等,或者想更快還能經過字節碼注入的方式自動生成一些模板代碼的方式,將反射變成直接的方法調用。

下面直接展現代碼吧
首先是傳輸的對象都是java可序列化對象:java

public class RpcCommand implements Serializable{
    String className ;
    String methodName ;
    String[] argumetsType ;

    Object[] params ;
    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public String[] getArgumetsType() {
        return argumetsType;
    }

    public void setArgumetsType(String[] argumetsType) {
        this.argumetsType = argumetsType;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }


}
public class RpcResponse implements Serializable{
    boolean isException;
    Object result ;
    Exception exception;
    public boolean isException() {
        return isException;
    }

    public void setException(boolean exception) {
        isException = exception;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

    public Exception getException() {
        return exception;
    }

    public void setException(Exception exception) {
        this.exception = exception;
    }

}

其次是請求對象的處理部分git

package com.shock.rpc;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * ${DESCRIPTION}
 * com.shock.rpc.${CLASS_NAME}
 * Created by zhengdong.lzd on 2016/11/29 0029.
 */
public class RpcHandler {

    ConcurrentHashMap<String, Object> registered = new ConcurrentHashMap<String, Object>(128);

    public RpcResponse handler(RpcCommand commond) {
        String className = commond.getClassName();
        RpcResponse response = new RpcResponse();
        try {
            Object obj = registered.get(className);
            String[] argTypes = commond.getArgumetsType();
            Class aClass = Class.forName(className);
            List<Class> argsTypeList = new ArrayList<Class>(argTypes.length);
            for (String s : argTypes) {
                argsTypeList.add(Class.forName(s));
            }
            Method method = aClass.getMethod(commond.getMethodName(),
                argsTypeList.toArray(new Class[argsTypeList.size()]));
            Object object = method.invoke(obj, commond.getParams());
            response.setResult(object);
        } catch (Exception e) {
            e.printStackTrace();
            response.setException(true);
            response.setException(e);
        }
        return response;
    }

    public void regist(Class interfa, Object object) {
        registered.put(interfa.getName(), object);
    }
}

代碼裏面只有很粗暴的反射實現

第三個是服務端啓動和服務端協議處理代碼github

package com.shock.rpc;

import com.shock.rpc.demo.IDemoImpl;
import com.shock.rpc.demo.IDemoInterface;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * ${DESCRIPTION}
 * com.shock.rpc.${CLASS_NAME}
 * Created by zhengdong.lzd on 2016/11/29 0029.
 */
public class RpcServer {
    int port;

    public RpcServer(int port, RpcHandler handler) {
        this.port = port;
        this.handler = handler;
    }

    RpcHandler      handler;

    ExecutorService executorService = Executors.newFixedThreadPool(20);

    public void start() {
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            while (true) {
                Socket socket = serverSocket.accept();
                executorService.submit(new WorkThread(socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public class WorkThread implements Runnable {
        Socket socket;

        WorkThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                InputStream inputStream = socket.getInputStream();
                OutputStream outputStream = socket.getOutputStream();
                while (true) {
                    int magic = inputStream.read();
                    //魔數
                    if (magic == 0x5A) {
                        //兩個字節用來計算長度數據長度,服務傳送的數據過大可能會出現截斷問題
                        int length1 = inputStream.read();
                        int length2 = inputStream.read();
                        int length = (length1 << 8) + length2;
                        ByteArrayOutputStream bout = new ByteArrayOutputStream(length);
                        int sum = 0;
                        byte[] bs = new byte[length];
                        while (true) {
                            int readLength = inputStream.read(bs, 0, length - sum);
                            if (readLength > 0) {
                                bout.write(bs, 0, readLength);
                                sum += readLength;
                            }
                            if (sum >= length) {
                                break;
                            }
                        }
                        ObjectInputStream objectInputStream = new ObjectInputStream(
                            new ByteArrayInputStream(bout.toByteArray()));
                        try {
                            RpcCommand commond = (RpcCommand) objectInputStream.readObject();
                            RpcResponse response = handler.handler(commond);
                            ByteArrayOutputStream objectout = new ByteArrayOutputStream(length);
                            ObjectOutputStream objectOutputStream = new ObjectOutputStream(objectout);
                            objectOutputStream.writeObject(response);
                            objectOutputStream.flush();
                            byte[] commondBytes = objectout.toByteArray();
                            int len = commondBytes.length;
                            outputStream.write(0x5A);
                            outputStream.write(len >> 8);
                            outputStream.write(len & 0x00FF);
                            outputStream.write(commondBytes);
                            outputStream.flush();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("和客戶端鏈接斷開了");
            } finally {
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        RpcHandler rpcHandler = new RpcHandler();
        rpcHandler.regist(IDemoInterface.class, new IDemoImpl());
        RpcServer servcer = new RpcServer(8081, rpcHandler);
        servcer.start();
    }
}

代碼實現也很簡單,就是根據前面說的傳輸報文協議讀取傳輸的報文,反序列化出請求對象RpcCommand,給處理類進行處理,若是作好兼容加上版本和不一樣協議的話,能夠增長不一樣的處理實現。
 json

最後是客戶端傳輸和協議處理代碼api

package com.shock.rpc;

import com.shock.rpc.demo.IDemoInterface;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * ${DESCRIPTION}
 * com.shock.rpc.${CLASS_NAME}
 * Created by zhengdong.lzd on 2016/11/29 0029.
 */
public class RpcClient {

    String       host;
    int          port;
    Socket       socket;
    InputStream  inputStream;
    OutputStream outputStream;

    public RpcClient(String host, int port) {
        try {
            socket = new Socket();
            socket.connect(new InetSocketAddress(host, port));
            inputStream = socket.getInputStream();
            outputStream = socket.getOutputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //這個不能併發請求,不然會出現數據流亂的狀況
    public synchronized RpcResponse invoke(RpcCommand commond) {
        RpcResponse response = new RpcResponse();
        try {
            ByteArrayOutputStream objectout = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(objectout);
            objectOutputStream.writeObject(commond);
            objectOutputStream.flush();
            byte[] commondBytes = objectout.toByteArray();
            outputStream.write(0x5A);
            int len = commondBytes.length;

            outputStream.write(len >> 8);
            outputStream.write(0x00FF & len);
            outputStream.write(commondBytes);
            outputStream.flush();
            while (true) {
                int magic = inputStream.read();
                if (magic == 0x5A) {
                    int length1 = inputStream.read();
                    int length2 = inputStream.read();
                    int length = (length1 << 8) + length2;
                    ByteArrayOutputStream bout = new ByteArrayOutputStream(length);
                    int sum = 0;
                    byte[] bs = new byte[length];
                    while (true) {
                        int readLength = inputStream.read(bs, 0, length - sum);
                        if (readLength > 0) {
                            bout.write(bs, 0, readLength);
                            sum += readLength;
                        }
                        if (sum >= length) {
                            break;
                        }
                    }
                    ObjectInputStream objectInputStream = new ObjectInputStream(
                        new ByteArrayInputStream(bout.toByteArray()));
                    RpcResponse response1 = (RpcResponse) objectInputStream.readObject();
                    return response1;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return response;
    }

    public static void main(String[] args) {
        RpcClient client = new RpcClient("localhost", 8081);
        RpcCommand command = new RpcCommand();
        command.setClassName(IDemoInterface.class.getName());
        command.setMethodName("noArgument");
        command.setArgumetsType(new String[0]);
        RpcResponse response = client.invoke(command);

        RpcCommand command2 = new RpcCommand();
        command2.setClassName(IDemoInterface.class.getName());
        command2.setMethodName("withReturn");
        command2.setArgumetsType(new String[] { "java.lang.String" });
        command2.setParams(new String[] { "shocklee" });
        RpcResponse response2 = client.invoke(command2);
        System.out.println(response.getResult());
        System.out.println(response2.getResult());
    }
}

至此整個框架部分已經完成,暫時尚未作常見的rpc客戶端api包裝,好比包裝成從某個容器裏面根據接口取出一個遠程對象,直接調用遠程對象的方法。
最後貼個測試類和接口緩存

package com.shock.rpc.demo;

/**
 * ${DESCRIPTION}
 * com.shock.rpc.demo.${CLASS_NAME}
 * Created by zhengdong.lzd on 2016/11/29 0029.
 */
public interface IDemoInterface {

    public String withReturn(String name);
    public void noReturn(String name);
    public String noArgument();
}
package com.shock.rpc.demo;

/**
 * ${DESCRIPTION}
 * com.shock.rpc.demo.${CLASS_NAME}
 * Created by zhengdong.lzd on 2016/11/29 0029.
 */
public class IDemoImpl implements IDemoInterface {
    @Override
    public String withReturn(String name) {
        System.out.println("withReturn "+name);
        return "hello " + name;
    }

    @Override
    public void noReturn(String name) {
        System.out.println("noReturn "+name);
    }

    @Override
    public String noArgument() {
        System.out.println("noArgument");
        return "noArgument";
    }
}

整個RPC功能已經都貼出來了,代碼沒有作過整理,沒有把序列化/反序列代碼抽象,協議部分也沒作抽象,只是想寫的快點,可以在短期內寫出來和標題十分鐘對應上,因此可能代碼難看點,不過總體已經展現出來了,關鍵代碼是不須要使用任何第三方框架和工具包的。服務器

歡迎你們進行拍磚。

另外打個廣告吧,本人寫了一個稍微複雜點的RPC放在github上,有木有同窗想一塊兒進行寫着玩的的,趕忙約起啊,代碼地址是
https://github.com/shocklee6315/simpleRpcServer併發

相關文章
相關標籤/搜索