smart-socket實現RPC

smart-socket實現RPC

RPC是目前被普遍應用於互聯網服務的一項技術,關於它的基本介紹你們可經過百度瞭解一下,此處再也不贅述。 正所謂讀萬卷書不如行萬里路,原理性的文章看的再多都不如親自實現一遍RPC,方可對其瞭解的更加透徹。 本文將以純技術視角,爲你們演示一下RPC的工做原理與實現方案。java

正式開始以前,先羅列一下實現RPC須要運用到的技術點:算法

  1. 通訊
  2. 序列化/反序列化
  3. 反射
  4. 動態代理

在具體實現上除了通訊部分咱們選用smart-socket來輔助,其他包括序列化/反序列化、反射、動態代理等部分咱們將採用JDK提供的解決方案,待您掌握RPC後可再嘗試結合第三方技術來重構RPC。數組

名詞解釋

  • Provider
    RPC服務提供者
  • Consumer
    RPC服務調用者

消息通訊

既然RPC是跨網絡通訊服務,那咱們先制定通訊規則,該部分的內容涉及到通訊、序列化/反序列化技術。網絡

通訊協議

通訊協議咱們採用最簡單的length+data模式,編解碼的實現算法以下。 做爲示例咱們假設readBuffer足夠容納一個完整的消息,協議中的data部分即是RPC服務序列化後的byte數組,Provider/Consumer則必須對byte數組完成反序列化後才能繼續RPC服務處理。session

public class RpcProtocol implements Protocol<byte[]> {
    private static final int INTEGER_BYTES = Integer.SIZE / Byte.SIZE;

    @Override
    public byte[] decode(ByteBuffer readBuffer, AioSession<byte[]> session, boolean eof) {
        int remaining = readBuffer.remaining();
        if (remaining < INTEGER_BYTES) {
            return null;
        }
        int messageSize = readBuffer.getInt(readBuffer.position());
        if (messageSize > remaining) {
            return null;
        }
        byte[] data = new byte[messageSize - INTEGER_BYTES];
        readBuffer.getInt();
        readBuffer.get(data);
        return data;
    }

    @Override
    public ByteBuffer encode(byte[] msg, AioSession<byte[]> session) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(msg.length + INTEGER_BYTES);
        byteBuffer.putInt(byteBuffer.capacity());
        byteBuffer.put(msg);
        byteBuffer.flip();
        return byteBuffer;
    }
}

RPC請求消息

RPC請求消息由Consumer發送,Consumer須要在請求消息中提供足夠信息以供Provider準確識別服務接口。核心要素包括:dom

  1. uuid
    請求消息惟一標識,用於關聯、識別響應消息。
  2. interfaceClass
    Consumer要調用的API接口名
  3. method
    Consumer要執行的API接口方法名
  4. paramClassList
    Consumer調用的方法入參類型,用於區分同方法名不一樣入參的狀況
  5. params
    Consumer執行方法傳入的參數值
public class RpcRequest implements Serializable {

    /**
     * 消息的惟一標識
     */
    private final String uuid = UUID.randomUUID().toString();

    /**
     * 接口名稱
     */
    private String interfaceClass;

    /**
     * 調用方法
     */
    private String method;

    /**
     * 參數類型字符串
     */
    private String[] paramClassList;

    /**
     * 入參
     */
    private Object[] params;

    getX/setX()
}

RPC響應消息

RPC響應消息爲Provider將接口執行結果響應給Consumer的載體。socket

  • uuid
    與RPC請求消息同值
  • returnObject
    RPC接口執行返回值
  • returnType
    RPC接口返回值類型
  • exception
    RPC執行異常信息,若是出現異常的話。
public class RpcResponse implements Serializable {
    /**
     * 消息的惟一標示,與對應的RpcRequest uuid值相同
     */
    private String uuid;
    /**
     * 返回對象
     */
    private Object returnObject;

    /**
     * 返回對象類型
     */
    private String returnType;

    /**
     * 異常
     */
    private String exception;
    

    public RpcResponse(String uuid) {
        this.uuid = uuid;
    }

   getX/setX()

}

經過上述內容便完成RPC通訊的消息設計,至於RpcRequest、RpcResponse如何轉化爲通訊協議要求的byte數組格式,咱們採用JDK提供的序列化方式(生產環境不建議使用)。ide

  • 序列化
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutput objectOutput = new ObjectOutputStream(byteArrayOutputStream);
objectOutput.writeObject(request);
aioSession.write(byteArrayOutputStream.toByteArray());
  • 反序列化
ObjectInputStream objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
RpcResponse resp = (RpcResponse) objectInput.readObject();

RPC服務實現

經過上文方案咱們解決了RPC的通訊問題,接下來便得根據通訊消息實現服務能力。測試

Consumer

因爲RPC的Consumer端只有接口,沒有具體實現,但在使用上咱們又指望跟本地服務有一樣的使用體驗。 所以咱們須要將接口實例化成對象,並使其具有跨應用服務能力,此處便運用到動態代理。 當Consumer調用RPC接口時,代理類內部發送請求消息至Provider並獲取結果。ui

obj = (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{remoteInterface},
        new InvocationHandler() {

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                RpcRequest req = new RpcRequest();
                req.setInterfaceClass(remoteInterface.getName());
                req.setMethod(method.getName());
                Class<?>[] types = method.getParameterTypes();
                if (!ArrayUtils.isEmpty(types)) {
                    String[] paramClass = new String[types.length];
                    for (int i = 0; i < types.length; i++) {
                        paramClass[i] = types[i].getName();
                    }
                    req.setParamClassList(paramClass);
                }
                req.setParams(args);

                RpcResponse rmiResp = sendRpcRequest(req);
                if (StringUtils.isNotBlank(rmiResp.getException())) {
                    throw new RuntimeException(rmiResp.getException());
                }
                return rmiResp.getReturnObject();
            }
        });

Provider

Provider可將其提供的RPC服務維護在集合裏,採用Map存儲便可,key爲暴露的接口名,value爲接口的具體實現。 一旦Provider接受到RPC的請求消息,只需根據請求消息內容找到並執行對應的服務,最後將返回結果以消息的形式返回至Consumer便可。

ObjectInputStream objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
RpcRequest req = (RpcRequest) objectInput.readObject();

RpcResponse resp = new RpcResponse(req.getUuid());
try {
    String[] paramClassList = req.getParamClassList();
    Object[] paramObjList = req.getParams();
    // 獲取入參類型
    Class<?>[] classArray = null;
    if (paramClassList != null) {
        classArray = new Class[paramClassList.length];
        for (int i = 0; i < classArray.length; i++) {
            Class<?> clazz = primitiveClass.get(paramClassList[i]);
            if (clazz == null) {
                classArray[i] = Class.forName(paramClassList[i]);
            } else {
                classArray[i] = clazz;
            }
        }
    }
    // 調用接口
    Object impObj = impMap.get(req.getInterfaceClass());
    if (impObj == null) {
        throw new UnsupportedOperationException("can not find interface: " + req.getInterfaceClass());
    }
    Method method = impObj.getClass().getMethod(req.getMethod(), classArray);
    Object obj = method.invoke(impObj, paramObjList);
    resp.setReturnObject(obj);
    resp.setReturnType(method.getReturnType().getName());
} catch (InvocationTargetException e) {
    LOGGER.error(e.getMessage(), e);
    resp.setException(e.getTargetException().getMessage());
} catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
    resp.setException(e.getMessage());
}
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
objectOutput = new ObjectOutputStream(byteArrayOutputStream);
objectOutput.writeObject(resp);
session.write(byteArrayOutputStream.toByteArray());

測試RPC服務

服務端定義接口DemoApi,並將其實現示例DemoApiImpl註冊至Provider中。

public class Provider {
    public static void main(String[] args) throws IOException {
        RpcProviderProcessor rpcProviderProcessor = new RpcProviderProcessor();
        AioQuickServer<byte[]> server = new AioQuickServer<>(8888, new RpcProtocol(), rpcProviderProcessor);
        server.start();

        rpcProviderProcessor.publishService(DemoApi.class, new DemoApiImpl());
    }
}

Consumer調用RPC接口testsum得到執行結果。

public class Consumer {

    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {

        RpcConsumerProcessor rpcConsumerProcessor = new RpcConsumerProcessor();
        AioQuickClient<byte[]> consumer = new AioQuickClient<>("localhost", 8888, new RpcProtocol(), rpcConsumerProcessor);
        consumer.start();

        DemoApi demoApi = rpcConsumerProcessor.getObject(DemoApi.class);
        System.out.println(demoApi.test("smart-socket"));
        System.out.println(demoApi.sum(1, 2));
    }

}

總結

本文簡要描述了RPC服務實現的關鍵部分,可是提供穩定可靠的RPC服務還有不少細節須要考慮,有興趣的朋友可自行研究。 本文示例的完整代碼可從smart-socket項目中獲取。

相關文章
相關標籤/搜索