基於TCP的RPC簡單實現

所謂RPC就是遠程方法調用(Remote  Process Call ),簡單的來講就是經過MQ,TCP,HTTP或者本身寫的網絡協議來傳輸我要調用對方的什麼接口,對方處理以後再把結果返回給我.就這麼簡單的一個過程.在一個大型的項目以後基本上各模塊都是分開的,以提供服務的方式進行相互調用.若是可以提供智能負載均衡,可選擇的java對象編碼解碼協議,網絡傳輸協議,服務監控,服務版本控制等不少功能的話就是一個SOA架構了.java


前兩天實現了一個基於java Socket 實現的阻塞的RPC.其原理很是簡單數組

  1. 客戶端用一個TransportMessage類去包裝須要調用的接口,調用的方法,調用方法的參數類型,調用方法的參數值.服務器

  2. 客戶端用Socet鏈接服務端,序列化TransportMessage,傳輸給服務端.網絡

  3. 服務端循環接收請求,一旦受到請求就起一個線程扔到線程池去執行,執行的內容就是反序列化TransportMessage類,在servicePool池中獲取接口實現類,經過調用方法參數類型數組獲取Method對象.而後經過method.invoke去調用方法.架構

  4. 服務器端序列化結果,而後經過socket傳輸給客戶端.併發

  5. 客戶端收到結果,反序列化結果對象.負載均衡


具體代碼實現,(爲了節省篇幅,setter,getter就不放進來了):
socket

1.遠程調用信息封裝   TransportMessage.java函數

/**
 * @author Lubby
 * @date 2015年4月22日 下午1:06:18
 *	遠程調用信息封裝.
 *	包括    1.調用接口名稱  (包名+接口名)   2.調用方法名  3.調用參數Class類型數組  4.調用接口的參數數組
 */
public class TransportMessage implements Serializable {
	//包名+接口名稱  如com.lubby.rpc.service.MathService.
	private String interfaceName;
	//調用方法名   如 getSum
	private String methodName;
	//參數類型 按照接口參數順序  getSum(int a, int b, String name)方法就是int.class int.class String.class的數組
	private Class[] paramsTypes;
	//參數 按照接口參數順序 getSum(int a, int b, String name)方法就是 1,3,"Tom"的數組
	private Object[] parameters;

	public TransportMessage() {
		super();
		// TODO Auto-generated constructor stub
	}

	public TransportMessage(String interfaceName, String methodName,
			Class[] paramsTypes, Object[] parameters) {
		super();
		this.interfaceName = interfaceName;
		this.methodName = methodName;
		this.paramsTypes = paramsTypes;
		this.parameters = parameters;
	}

}

2.客戶端調用遠程方法類 RPCClient.java
測試

public class RPCClient {
	// 服務端地址
	private String serverAddress;
	// 服務端端口
	private int serverPort;
	// 線程池大小
	private int threadPoolSize = 10;
	// 線程池
	private ExecutorService executorService = null;

	public RPCClient() {
		super();
		// TODO Auto-generated constructor stub
	}

	/**
	 * @param serverAddress
	 *            TPC服務地址
	 * @param serverPort
	 *            TPC服務端口
	 * 
	 */
	public RPCClient(String serverAddress, int serverPort) {
		this.serverAddress = serverAddress;
		this.serverPort = serverPort;
		executorService = Executors.newFixedThreadPool(threadPoolSize);
	}

	/**
	 * 同步的請求和接收結果
	 * 
	 * @param transportMessage
	 * @return
	 */
	public Object sendAndReceive(TransportMessage transportMessage) {
		Object result = null;
		Socket socket = null;
		try {
			 socket = new Socket(serverAddress, serverPort);
			 
			 //反序列化 TransportMessage對象
			ObjectOutputStream objectOutpusStream = new ObjectOutputStream(
					socket.getOutputStream());
			objectOutpusStream.writeObject(transportMessage);

			ObjectInputStream objectInputStream = new ObjectInputStream(
					socket.getInputStream());
			//阻塞等待讀取結果並反序列化結果對象
			result = objectInputStream.readObject();
			socket.close();
		} catch (UnknownHostException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}finally{
			try {
				//最後關閉socket
				socket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return result;
	}
}

3.服務器處理類 RPCServer.java

public class RPCServer {
	private int threadSize = 10;
	private ExecutorService threadPool;
	private Map<String, Object> servicePool;
	private int port = 4321;

	public RPCServer() {
		super();
		synchronized (this) {
			threadPool = Executors.newFixedThreadPool(this.threadSize);
		}
	}

	/**
	 * 
	 * @param threadSize
	 *            內部處理線程池大小
	 * @param port
	 *            當前TPC服務的端口號
	 * 
	 */

	public RPCServer(int threadSize, int port) {
		this.threadSize = threadSize;
		this.port = port;
		synchronized (this) {
			threadPool = Executors.newFixedThreadPool(this.threadSize);
		}
	}

	/**
	 * 
	 * 
	 * @param servicePool
	 *            裝有service對象的Map, Key爲全限定接口名,Value爲接口實現類對象
	 * @param threadSize
	 *            內部處理線程池大小
	 * @param port
	 *            當前TPC服務的端口號
	 * 
	 */
	public RPCServer(Map<String, Object> servicePool, int threadSize, int port) {
		this.threadSize = threadSize;
		this.servicePool = servicePool;
		this.port = port;
		synchronized (this) {
			threadPool = Executors.newFixedThreadPool(this.threadSize);
		}
	}

	/**
	 * RPC服務端處理函數 監聽指定TPC端口,每次有請求過來的時候調用服務,放入線程池中處理.
	 * 
	 * @throws IOException
	 */
	public void service() throws IOException {
		ServerSocket serverSocket = new ServerSocket(port);
		while (true) {
			Socket receiveSocket = serverSocket.accept();
			final Socket socket = receiveSocket;
			threadPool.execute(new Runnable() {

				public void run() {
					try {
						process(socket);

					} catch (ClassNotFoundException e) {
						e.printStackTrace();
					} catch (NoSuchMethodException e) {
						e.printStackTrace();
					} catch (SecurityException e) {
						e.printStackTrace();
					} catch (IllegalAccessException e) {
						e.printStackTrace();
					} catch (IllegalArgumentException e) {
						e.printStackTrace();
					} catch (InvocationTargetException e) {
						e.printStackTrace();
					} catch (InstantiationException e) {
						e.printStackTrace();
					} catch (IOException e) {
						e.printStackTrace();
					} finally {
						try {
							socket.close();
						} catch (IOException e) {
							e.printStackTrace();
						}
					}

				}
			});
		}

	}

	/**
	 * 調用服務 經過TCP Socket返回結果對象
	 * 
	 * @param receiveSocket
	 *            請求Socket
	 * @throws IOException
	 * @throws ClassNotFoundException
	 * @throws NoSuchMethodException
	 * @throws SecurityException
	 * @throws IllegalAccessException
	 * @throws IllegalArgumentException
	 * @throws InvocationTargetException
	 * @throws InstantiationException
	 */
	private void process(Socket receiveSocket) throws IOException,
			ClassNotFoundException, NoSuchMethodException, SecurityException,
			IllegalAccessException, IllegalArgumentException,
			InvocationTargetException, InstantiationException {

		/*
		 * try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO
		 * Auto-generated catch block e.printStackTrace(); }
		 */
		ObjectInputStream objectinputStream = new ObjectInputStream(
				receiveSocket.getInputStream());
		TransportMessage message = (TransportMessage) objectinputStream
				.readObject();

		// 調用服務
		Object result = call(message);

		ObjectOutputStream objectOutputStream = new ObjectOutputStream(
				receiveSocket.getOutputStream());
		objectOutputStream.writeObject(result);
		objectinputStream.close();
		objectOutputStream.close();
	}

	/**
	 * 服務處理函數 經過包名+接口名在servicePool中找到對應服務 經過調用方法參數類型數組獲取Method對象
	 * 經過Method.invoke(對象,參數)調用對應服務
	 * 
	 * @return
	 * @throws ClassNotFoundException
	 * @throws SecurityException
	 * @throws NoSuchMethodException
	 * @throws InvocationTargetException
	 * @throws IllegalArgumentException
	 * @throws IllegalAccessException
	 * @throws InstantiationException
	 */
	private Object call(TransportMessage message)
			throws ClassNotFoundException, NoSuchMethodException,
			SecurityException, IllegalAccessException,
			IllegalArgumentException, InvocationTargetException,
			InstantiationException {
		if (servicePool == null) {
			synchronized (this) {
				servicePool = new HashMap<String, Object>();
			}
		}
		String interfaceName = message.getInterfaceName();
		Object service = servicePool.get(interfaceName);
		Class<?> serviceClass = Class.forName(interfaceName);
		// 檢查servicePool中對象,若沒有着生產對象
		if (service == null) {
			synchronized (this) {
				service = serviceClass.newInstance();
				servicePool.put(interfaceName, service);
			}
		}
		Method method = serviceClass.getMethod(message.getMethodName(),
				message.getParamsTypes());
		Object result = method.invoke(service, message.getParameters());
		return result;

	}
}

4.爲了方便測試寫了個接口和其實現類 MathService 和 MathServiceImpl

public interface MathService {
	public int getSum(int a, int b, String name);
}
public class MathServiceImpl implements MathService {
	public int getSum(int a, int b, String name) {
		System.out.println(name);
		return a + b;
	}

}

5.服務器端測試代碼

public class ServerTest {
	
	public static void main(String[] args){
		Map<String,Object> servicePool = new  HashMap<String, Object>();
		servicePool.put("com.lubby.rpc.service.MathService", new MathServiceImpl());
		RPCServer server = new RPCServer(servicePool,4, 4321);
		try {
			server.service();
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}

}

6.客戶端測試代碼

public class ClientTest {
	public static void main(String[] args) {
		String serverAddress = "127.0.0.1";
		int serverPort = 4321;
		
		final RPCClient client = new RPCClient(serverAddress, serverPort);
		final TransportMessage transportMessage = buildTransportMessage();
		
		for (int i = 0; i < 1000; i++) {
			final int waitTime = i * 10;
			new Thread(new Runnable() {
				public void run() {
					Object result = client.sendAndReceive(transportMessage);
					System.out.println(result);
				}
			}).start();
		}
	}

	private static TransportMessage buildTransportMessage() {

		String interfaceName = "com.lubby.rpc.service.MathService";
		Class[] paramsTypes = { int.class, int.class, String.class };
		Object[] parameters = { 1, 3, "Lubby" };
		String methodName = "getSum";

		TransportMessage transportMessage = new TransportMessage(interfaceName,
				methodName, paramsTypes, parameters);

		return transportMessage;
	}

}


7.併發問題

因爲ServerSocket是阻塞的,因此在ServerSocket.accept()方法同一時刻只能有一個線程進入,雖然以後的處理都另起一個線程,可是有瓶頸的,

我在用400個線程併發鏈接服務端的時候基本沒問題,可是500個線程併發鏈接服務端的時候就會有部分線程鏈接不到服務器端.後面看下NIO回頭用NIO來改一下.

相關文章
相關標籤/搜索