RPC是目前被普遍應用於互聯網服務的一項技術,關於它的基本介紹你們可經過百度瞭解一下,此處再也不贅述。 正所謂讀萬卷書不如行萬里路,原理性的文章看的再多都不如親自實現一遍RPC,方可對其瞭解的更加透徹。 本文將以純技術視角,爲你們演示一下RPC的工做原理與實現方案。java
正式開始以前,先羅列一下實現RPC須要運用到的技術點:算法
在具體實現上除了通訊部分咱們選用smart-socket來輔助,其他包括序列化/反序列化、反射、動態代理等部分咱們將採用JDK提供的解決方案,待您掌握RPC後可再嘗試結合第三方技術來重構RPC。數組
既然RPC是跨網絡通訊服務,那咱們先制定通訊規則,該部分的內容涉及到通訊、序列化/反序列化技術。網絡
通訊協議咱們採用最簡單的length+data模式,編解碼的實現算法以下。 做爲示例咱們假設readBuffer足夠容納一個完整的消息,協議中的data部分即是RPC服務序列化後的byte數組,Provider/Consumer則必須對byte數組完成反序列化後才能繼續RPC服務處理。session
public class RpcProtocol implements Protocol<byte[]> { private static final int INTEGER_BYTES = Integer.SIZE / Byte.SIZE; @Override public byte[] decode(ByteBuffer readBuffer, AioSession<byte[]> session, boolean eof) { int remaining = readBuffer.remaining(); if (remaining < INTEGER_BYTES) { return null; } int messageSize = readBuffer.getInt(readBuffer.position()); if (messageSize > remaining) { return null; } byte[] data = new byte[messageSize - INTEGER_BYTES]; readBuffer.getInt(); readBuffer.get(data); return data; } @Override public ByteBuffer encode(byte[] msg, AioSession<byte[]> session) { ByteBuffer byteBuffer = ByteBuffer.allocate(msg.length + INTEGER_BYTES); byteBuffer.putInt(byteBuffer.capacity()); byteBuffer.put(msg); byteBuffer.flip(); return byteBuffer; } }
RPC請求消息由Consumer發送,Consumer須要在請求消息中提供足夠信息以供Provider準確識別服務接口。核心要素包括:dom
public class RpcRequest implements Serializable { /** * 消息的惟一標識 */ private final String uuid = UUID.randomUUID().toString(); /** * 接口名稱 */ private String interfaceClass; /** * 調用方法 */ private String method; /** * 參數類型字符串 */ private String[] paramClassList; /** * 入參 */ private Object[] params; getX/setX() }
RPC響應消息爲Provider將接口執行結果響應給Consumer的載體。socket
public class RpcResponse implements Serializable { /** * 消息的惟一標示,與對應的RpcRequest uuid值相同 */ private String uuid; /** * 返回對象 */ private Object returnObject; /** * 返回對象類型 */ private String returnType; /** * 異常 */ private String exception; public RpcResponse(String uuid) { this.uuid = uuid; } getX/setX() }
經過上述內容便完成RPC通訊的消息設計,至於RpcRequest、RpcResponse如何轉化爲通訊協議要求的byte數組格式,咱們採用JDK提供的序列化方式(生產環境不建議使用)。ide
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); ObjectOutput objectOutput = new ObjectOutputStream(byteArrayOutputStream); objectOutput.writeObject(request); aioSession.write(byteArrayOutputStream.toByteArray());
ObjectInputStream objectInput = new ObjectInputStream(new ByteArrayInputStream(msg)); RpcResponse resp = (RpcResponse) objectInput.readObject();
經過上文方案咱們解決了RPC的通訊問題,接下來便得根據通訊消息實現服務能力。測試
因爲RPC的Consumer端只有接口,沒有具體實現,但在使用上咱們又指望跟本地服務有一樣的使用體驗。 所以咱們須要將接口實例化成對象,並使其具有跨應用服務能力,此處便運用到動態代理。 當Consumer調用RPC接口時,代理類內部發送請求消息至Provider並獲取結果。ui
obj = (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{remoteInterface}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest req = new RpcRequest(); req.setInterfaceClass(remoteInterface.getName()); req.setMethod(method.getName()); Class<?>[] types = method.getParameterTypes(); if (!ArrayUtils.isEmpty(types)) { String[] paramClass = new String[types.length]; for (int i = 0; i < types.length; i++) { paramClass[i] = types[i].getName(); } req.setParamClassList(paramClass); } req.setParams(args); RpcResponse rmiResp = sendRpcRequest(req); if (StringUtils.isNotBlank(rmiResp.getException())) { throw new RuntimeException(rmiResp.getException()); } return rmiResp.getReturnObject(); } });
Provider可將其提供的RPC服務維護在集合裏,採用Map存儲便可,key爲暴露的接口名,value爲接口的具體實現。 一旦Provider接受到RPC的請求消息,只需根據請求消息內容找到並執行對應的服務,最後將返回結果以消息的形式返回至Consumer便可。
ObjectInputStream objectInput = new ObjectInputStream(new ByteArrayInputStream(msg)); RpcRequest req = (RpcRequest) objectInput.readObject(); RpcResponse resp = new RpcResponse(req.getUuid()); try { String[] paramClassList = req.getParamClassList(); Object[] paramObjList = req.getParams(); // 獲取入參類型 Class<?>[] classArray = null; if (paramClassList != null) { classArray = new Class[paramClassList.length]; for (int i = 0; i < classArray.length; i++) { Class<?> clazz = primitiveClass.get(paramClassList[i]); if (clazz == null) { classArray[i] = Class.forName(paramClassList[i]); } else { classArray[i] = clazz; } } } // 調用接口 Object impObj = impMap.get(req.getInterfaceClass()); if (impObj == null) { throw new UnsupportedOperationException("can not find interface: " + req.getInterfaceClass()); } Method method = impObj.getClass().getMethod(req.getMethod(), classArray); Object obj = method.invoke(impObj, paramObjList); resp.setReturnObject(obj); resp.setReturnType(method.getReturnType().getName()); } catch (InvocationTargetException e) { LOGGER.error(e.getMessage(), e); resp.setException(e.getTargetException().getMessage()); } catch (Exception e) { LOGGER.error(e.getMessage(), e); resp.setException(e.getMessage()); } ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); objectOutput = new ObjectOutputStream(byteArrayOutputStream); objectOutput.writeObject(resp); session.write(byteArrayOutputStream.toByteArray());
服務端定義接口DemoApi
,並將其實現示例DemoApiImpl
註冊至Provider中。
public class Provider { public static void main(String[] args) throws IOException { RpcProviderProcessor rpcProviderProcessor = new RpcProviderProcessor(); AioQuickServer<byte[]> server = new AioQuickServer<>(8888, new RpcProtocol(), rpcProviderProcessor); server.start(); rpcProviderProcessor.publishService(DemoApi.class, new DemoApiImpl()); } }
Consumer調用RPC接口test
、sum
得到執行結果。
public class Consumer { public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { RpcConsumerProcessor rpcConsumerProcessor = new RpcConsumerProcessor(); AioQuickClient<byte[]> consumer = new AioQuickClient<>("localhost", 8888, new RpcProtocol(), rpcConsumerProcessor); consumer.start(); DemoApi demoApi = rpcConsumerProcessor.getObject(DemoApi.class); System.out.println(demoApi.test("smart-socket")); System.out.println(demoApi.sum(1, 2)); } }
本文簡要描述了RPC服務實現的關鍵部分,可是提供穩定可靠的RPC服務還有不少細節須要考慮,有興趣的朋友可自行研究。 本文示例的完整代碼可從smart-socket項目中獲取。