所謂RPC就是遠程方法調用(Remote Process Call ),簡單的來講就是經過MQ,TCP,HTTP或者本身寫的網絡協議來傳輸我要調用對方的什麼接口,對方處理以後再把結果返回給我.就這麼簡單的一個過程.在一個大型的項目以後基本上各模塊都是分開的,以提供服務的方式進行相互調用.若是可以提供智能負載均衡,可選擇的java對象編碼解碼協議,網絡傳輸協議,服務監控,服務版本控制等不少功能的話就是一個SOA架構了.java
前兩天實現了一個基於java Socket 實現的阻塞的RPC.其原理很是簡單數組
客戶端用一個TransportMessage類去包裝須要調用的接口,調用的方法,調用方法的參數類型,調用方法的參數值.服務器
客戶端用Socet鏈接服務端,序列化TransportMessage,傳輸給服務端.網絡
服務端循環接收請求,一旦受到請求就起一個線程扔到線程池去執行,執行的內容就是反序列化TransportMessage類,在servicePool池中獲取接口實現類,經過調用方法參數類型數組獲取Method對象.而後經過method.invoke去調用方法.架構
服務器端序列化結果,而後經過socket傳輸給客戶端.併發
客戶端收到結果,反序列化結果對象.負載均衡
具體代碼實現,(爲了節省篇幅,setter,getter就不放進來了):
socket
1.遠程調用信息封裝 TransportMessage.java函數
/** * @author Lubby * @date 2015年4月22日 下午1:06:18 * 遠程調用信息封裝. * 包括 1.調用接口名稱 (包名+接口名) 2.調用方法名 3.調用參數Class類型數組 4.調用接口的參數數組 */ public class TransportMessage implements Serializable { //包名+接口名稱 如com.lubby.rpc.service.MathService. private String interfaceName; //調用方法名 如 getSum private String methodName; //參數類型 按照接口參數順序 getSum(int a, int b, String name)方法就是int.class int.class String.class的數組 private Class[] paramsTypes; //參數 按照接口參數順序 getSum(int a, int b, String name)方法就是 1,3,"Tom"的數組 private Object[] parameters; public TransportMessage() { super(); // TODO Auto-generated constructor stub } public TransportMessage(String interfaceName, String methodName, Class[] paramsTypes, Object[] parameters) { super(); this.interfaceName = interfaceName; this.methodName = methodName; this.paramsTypes = paramsTypes; this.parameters = parameters; } }
2.客戶端調用遠程方法類 RPCClient.java
測試
public class RPCClient { // 服務端地址 private String serverAddress; // 服務端端口 private int serverPort; // 線程池大小 private int threadPoolSize = 10; // 線程池 private ExecutorService executorService = null; public RPCClient() { super(); // TODO Auto-generated constructor stub } /** * @param serverAddress * TPC服務地址 * @param serverPort * TPC服務端口 * */ public RPCClient(String serverAddress, int serverPort) { this.serverAddress = serverAddress; this.serverPort = serverPort; executorService = Executors.newFixedThreadPool(threadPoolSize); } /** * 同步的請求和接收結果 * * @param transportMessage * @return */ public Object sendAndReceive(TransportMessage transportMessage) { Object result = null; Socket socket = null; try { socket = new Socket(serverAddress, serverPort); //反序列化 TransportMessage對象 ObjectOutputStream objectOutpusStream = new ObjectOutputStream( socket.getOutputStream()); objectOutpusStream.writeObject(transportMessage); ObjectInputStream objectInputStream = new ObjectInputStream( socket.getInputStream()); //阻塞等待讀取結果並反序列化結果對象 result = objectInputStream.readObject(); socket.close(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); }finally{ try { //最後關閉socket socket.close(); } catch (IOException e) { e.printStackTrace(); } } return result; } }
3.服務器處理類 RPCServer.java
public class RPCServer { private int threadSize = 10; private ExecutorService threadPool; private Map<String, Object> servicePool; private int port = 4321; public RPCServer() { super(); synchronized (this) { threadPool = Executors.newFixedThreadPool(this.threadSize); } } /** * * @param threadSize * 內部處理線程池大小 * @param port * 當前TPC服務的端口號 * */ public RPCServer(int threadSize, int port) { this.threadSize = threadSize; this.port = port; synchronized (this) { threadPool = Executors.newFixedThreadPool(this.threadSize); } } /** * * * @param servicePool * 裝有service對象的Map, Key爲全限定接口名,Value爲接口實現類對象 * @param threadSize * 內部處理線程池大小 * @param port * 當前TPC服務的端口號 * */ public RPCServer(Map<String, Object> servicePool, int threadSize, int port) { this.threadSize = threadSize; this.servicePool = servicePool; this.port = port; synchronized (this) { threadPool = Executors.newFixedThreadPool(this.threadSize); } } /** * RPC服務端處理函數 監聽指定TPC端口,每次有請求過來的時候調用服務,放入線程池中處理. * * @throws IOException */ public void service() throws IOException { ServerSocket serverSocket = new ServerSocket(port); while (true) { Socket receiveSocket = serverSocket.accept(); final Socket socket = receiveSocket; threadPool.execute(new Runnable() { public void run() { try { process(socket); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (SecurityException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }); } } /** * 調用服務 經過TCP Socket返回結果對象 * * @param receiveSocket * 請求Socket * @throws IOException * @throws ClassNotFoundException * @throws NoSuchMethodException * @throws SecurityException * @throws IllegalAccessException * @throws IllegalArgumentException * @throws InvocationTargetException * @throws InstantiationException */ private void process(Socket receiveSocket) throws IOException, ClassNotFoundException, NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, InstantiationException { /* * try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO * Auto-generated catch block e.printStackTrace(); } */ ObjectInputStream objectinputStream = new ObjectInputStream( receiveSocket.getInputStream()); TransportMessage message = (TransportMessage) objectinputStream .readObject(); // 調用服務 Object result = call(message); ObjectOutputStream objectOutputStream = new ObjectOutputStream( receiveSocket.getOutputStream()); objectOutputStream.writeObject(result); objectinputStream.close(); objectOutputStream.close(); } /** * 服務處理函數 經過包名+接口名在servicePool中找到對應服務 經過調用方法參數類型數組獲取Method對象 * 經過Method.invoke(對象,參數)調用對應服務 * * @return * @throws ClassNotFoundException * @throws SecurityException * @throws NoSuchMethodException * @throws InvocationTargetException * @throws IllegalArgumentException * @throws IllegalAccessException * @throws InstantiationException */ private Object call(TransportMessage message) throws ClassNotFoundException, NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, InstantiationException { if (servicePool == null) { synchronized (this) { servicePool = new HashMap<String, Object>(); } } String interfaceName = message.getInterfaceName(); Object service = servicePool.get(interfaceName); Class<?> serviceClass = Class.forName(interfaceName); // 檢查servicePool中對象,若沒有着生產對象 if (service == null) { synchronized (this) { service = serviceClass.newInstance(); servicePool.put(interfaceName, service); } } Method method = serviceClass.getMethod(message.getMethodName(), message.getParamsTypes()); Object result = method.invoke(service, message.getParameters()); return result; } }
4.爲了方便測試寫了個接口和其實現類 MathService 和 MathServiceImpl
public interface MathService { public int getSum(int a, int b, String name); }
public class MathServiceImpl implements MathService { public int getSum(int a, int b, String name) { System.out.println(name); return a + b; } }
5.服務器端測試代碼
public class ServerTest { public static void main(String[] args){ Map<String,Object> servicePool = new HashMap<String, Object>(); servicePool.put("com.lubby.rpc.service.MathService", new MathServiceImpl()); RPCServer server = new RPCServer(servicePool,4, 4321); try { server.service(); } catch (IOException e) { e.printStackTrace(); } } }
6.客戶端測試代碼
public class ClientTest { public static void main(String[] args) { String serverAddress = "127.0.0.1"; int serverPort = 4321; final RPCClient client = new RPCClient(serverAddress, serverPort); final TransportMessage transportMessage = buildTransportMessage(); for (int i = 0; i < 1000; i++) { final int waitTime = i * 10; new Thread(new Runnable() { public void run() { Object result = client.sendAndReceive(transportMessage); System.out.println(result); } }).start(); } } private static TransportMessage buildTransportMessage() { String interfaceName = "com.lubby.rpc.service.MathService"; Class[] paramsTypes = { int.class, int.class, String.class }; Object[] parameters = { 1, 3, "Lubby" }; String methodName = "getSum"; TransportMessage transportMessage = new TransportMessage(interfaceName, methodName, paramsTypes, parameters); return transportMessage; } }
7.併發問題
因爲ServerSocket是阻塞的,因此在ServerSocket.accept()方法同一時刻只能有一個線程進入,雖然以後的處理都另起一個線程,可是有瓶頸的,
我在用400個線程併發鏈接服務端的時候基本沒問題,可是500個線程併發鏈接服務端的時候就會有部分線程鏈接不到服務器端.後面看下NIO回頭用NIO來改一下.