上一篇文章JAVA NIO 編程入門(二)學習了NIO的彙集和分散,以及選擇器的使用,並經過一個小demo進行實戰,演示瞭如何進行分散和彙集,以及其主要使用場景,本文將是NIO編程入門最後一篇,進行一個RPC簡單小demo框架的實現,對前面的知識進行總結性的實戰,因爲只是演示性質的demo,因此RPC功能並無考慮很完善,也不涉及到性能等問題考慮。編程
RPC 英文全稱 Remote Procedure Calls,翻譯過來就是遠程過程調用,是分佈式系統中不一樣節點間流行的通訊方式。舉例:假設有A服務和B服務分別位於不一樣的服務器,A服務想調用B服務像調用本地方法同樣,這個時候就須要藉助RPC方式進行調用。json
RPC由三個主要部分組成,服務提供者,服務消費者,服務註冊中心,服務註冊中心提供服務提供者註冊服務。客戶端和服務端的交互協議採用json的形式,方便演示,同時考慮到複雜性,本次RPC不利用匯集和分散進行協議設計。服務器
@Data public class RpcRquest { /**請求id*/ private String requestId; /**請求接口名*/ private String interfaceName; /**服務版本**/ private String serviceVersion; /**方法名*/ private String methodName; /**參數類型*/ private Class<?>[] parameterTypes; /**參數*/ private Object[] parameters; }
@Data public class RpcResponse { /**請求流水號*/ private String requestId; /**異常*/ private Exception exception; /**返回結果**/ private Object result; }
public class RpcRegister { /**存儲註冊的服務提供實現類*/ private HashMap<String, Object> registMap = new HashMap<>(); private static RpcRegister register=new RpcRegister(); public static RpcRegister buildRegist(){ return register; } public RpcRegister regist(String interfaceName,Object obj){ registMap.put(interfaceName,obj); return this; } public Object findServier(String interfaceName){ return registMap.get(interfaceName); } }
這裏利用一個map存提供服務的實例,後續再在服務端只須要經過接口就能夠查找到對應的實現類。框架
public class ProviderServer implements Runnable { /** * 服務提供端口 */ private int port; public ProviderServer(int port) { this.port = port; } @Override public void run() { try { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { int readyChannels = selector.selectNow(); if (readyChannels == 0) continue; Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = (SelectionKey) keyIterator.next(); if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = serverSocketChannel1.accept(); ByteBuffer buf1 = ByteBuffer.allocate(2048); socketChannel.read(buf1); buf1.flip(); String reciveStr = new String(buf1.array()); if (buf1.hasRemaining()) { System.out.println(">>>服務端收到數據:" + reciveStr); //判斷接受的內容是否有結束符,若是有,說明是一個請求結束。 if (reciveStr.contains(RpcConstant.PROTOCOL_END)) { RpcRquest req = JSONObject.parseObject(reciveStr.replace(RpcConstant.PROTOCOL_END, ""), RpcRquest.class); RpcResponse res = new RpcResponse(); res.setRequestId(req.getRequestId()); System.out.println(req.toString()); Class<?> remoteInterface = Class.forName(req.getInterfaceName()); Method method = remoteInterface.getMethod(req.getMethodName(), req.getParameterTypes()); if (null != method) { Object obj = method.invoke(RpcRegister.buildRegist().findServier(req.getInterfaceName()), req.getParameters()); res.setException(null); res.setResult(obj); } buf1.clear(); buf1.put(JSONObject.toJSON(res).toString().getBytes()); buf1.flip(); socketChannel.write(buf1); } } socketChannel.close(); } else if (key.isConnectable()) { } else if (key.isReadable()) { } else if (key.isWritable()) { } keyIterator.remove(); } } } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } } }
這部分源碼在《JAVA NIO 編程入門(二)》的基礎上增長了反射的部份內容,主要根據接口調用協議,生成客戶端須要調用的方法,進行調用,而後將結果返回。dom
public class RpcInitFactory { /** * 客戶端鏈接遠程ip地址 **/ private String ip; /***遠程端口*/ private int port; public RpcInitFactory(String ip, int port) { this.ip = ip; this.port = port; } }
@Data public class CommonClient { private RpcInitFactory factory; public CommonClient(RpcInitFactory factory) { this.factory = factory; } public <T> T invoke(RpcRquest req) { RpcResponse response = null; req.setRequestId(UUID.randomUUID().toString()); try { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress(factory.getIp(), factory.getPort())); ByteBuffer buf1 = ByteBuffer.allocate(2048); buf1.put(JSONObject.toJSON(req).toString().getBytes()); buf1.put(RpcConstant.PROTOCOL_END.getBytes()); buf1.flip(); if (buf1.hasRemaining()) socketChannel.write(buf1); buf1.clear(); ByteBuffer body = ByteBuffer.allocate(2048); socketChannel.read(body); body.flip(); if (body.hasRemaining()) { response = JSONObject.parseObject(new String(body.array()), RpcResponse.class); } body.clear(); socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } return (T) response; }
public interface Idemo { /***加法**/ public Integer add(Integer i,Integer j); }
public class DemoRemoteImpl implements Idemo { private CommonClient client; public DemoRemoteImpl(CommonClient client) { this.client = client; } @Override public Integer add(Integer i, Integer j) { //構造rpc請求實體類 RpcRquest rpcRquest=new RpcRquest(); //設置版本號 rpcRquest.setServiceVersion("123"); //設置調用的接口名稱 rpcRquest.setInterfaceName(Idemo.class.getName()); //設置調用方法名稱 rpcRquest.setMethodName("add"); //設置參數 rpcRquest.setParameters(new Integer[] {i,j}); //設置參數類型 rpcRquest.setParameterTypes(new Class[] {Integer.class,Integer.class}); //進行遠程調用 RpcResponse response= client.invoke(rpcRquest); if (null!=response){ return Integer.parseInt(response.getResult().toString()); } return null; } }
public class DemoImp implements Idemo{ @Override public Integer add(Integer i, Integer j) { return i+j; } }
測試socket
public static void main(String[] args) { ProviderServer server = new ProviderServer(8090); RpcRegister.buildRegist().regist(Idemo.class.getName(), new DemoImp()); new Thread(server).start(); }
public static void main(String[] args) { RpcInitFactory initFactory= new RpcInitFactory("127.0.0.1",8090); Idemo demo = new DemoRemoteImpl(new CommonClient(initFactory)); System.out.println(demo.add(2, 1)); }
到這裏RPC的小demo功能實現完畢,實際上的RPC框架要比這個複雜的多,真正的RPC框架要考慮性能,高可用,半包,粘包等問題,這裏只是給出了一個RPC框架的實現原理,便於理解RPC框架的實現,並不能真正用於生產環境。分佈式
《Java鎖之ReentrantLock(二)》post