RPC筆記之初探RPC:DIY簡單RPC框架

 

1、什麼是RPC

RPC(Remote Procedure Call)即遠程過程調用,簡單的說就是在A機器上去調用B機器上的某個方法,在分佈式系統中極其經常使用。html

rpc原理其實很簡單,比較容易理解,在rpc中有兩個角色,rpc server和rpc client,先從client開始討論,由於client要像調用普通類的方法那樣調用server端的方法,好比client要調用server端的Foo#bar()方法,因此它必須首先獲取到Foo的實例,可是又不能直接new,由於直接new的話仍是至關於在本地調用,因此這個時候就必須有個什麼機制可以把Foo包一下,使得表面上看起來和Foo徹底同樣可是調用它的bar方法時底層替換爲去調用server的bar,這個機制就是代理,代理提供了一種相似於攔截的機制,能夠把調用bar方法替換成爲本身的實現,好比本地調用bar方法大體執行過程(粗糙歸納):java

execute bar()
return result

代理替換以後的bar方法(粗糙歸納):json

call rpc server execute bar()
get result from server
return result

第一步的call rpc server execute bar(),client如何告訴server本身要調用哪一個方法呢,這個方法就比較多了,比較常見的是約定一種協議,好比第幾個字節是表示的嘛意思,而後server接收後解析按照指令執行就能夠了,這樣網絡傳輸的數據比較少,或者不太講究的直接將現成的協議拿過來用,好比經過socket直接傳json、傳xml、傳對象流等等,再或者甚至用http請求的,反正可以把本身要調用哪一個方法告訴server,同時還有調用方法時須要傳遞的參數,而後等待server執行完獲取到其結果就能夠了。網絡

而後就是server端的處理,若是是使用socket傳輸數據的話,server應該啓動一個服務監聽在約定的端口(不約定好的話客戶端不知道去連誰啊),一個while循環不斷地等待客戶端的鏈接,每來一個客戶端就啓動一個新的線程去處理(此處沒有考慮高併發狀況下的負載和優化,只是基本的實現),在新線程中讀取socket流看客戶端要調用哪一個方法,而後調用本地的此方法,調用的時候將client傳過來的參數傳入進去,待方法執行完再傳回給client,傳回的方法和client傳數據過來相同,無非是走socket自定義協議、xml、json、http等等,再而後client讀取到結果返回,一次rpc調用就完成了。至此,一個簡單的rpc框架的雛形已經完成。併發

2、DIY簡單RPC框架

這一章節基於上面討論的rpc調用的過程,實現一個簡單的rpc框架,其中代理使用JDK提供的代理實現,傳輸層使用Java的ObjectInputStream和ObjectOutputStream實現。app

定義一個工具類,提供兩個方法,分別用於服務端啓動rpc server和客戶端獲取相關serviceProvider的代理對象。框架

RpcServiceProviderUtil.java:dom

package cc11001100.diySimpleRpcFramework.util;

import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;

import static cc11001100.diySimpleRpcFramework.util.RpcLogUtil.info;

/**
 * @author CC11001100
 */
public class RpcServiceProviderUtil {

	/**
	 * 在指定的端口上啓動rpc服務,用於server端啓動rpc服務
	 *
	 * @param port
	 * @throws IOException
	 */
	public static <T> void startService(T object, int port) throws IOException {
		ServerSocket serverSocket = new ServerSocket(port);
		while (true) {
			final Socket socket = serverSocket.accept();
			info(socket, "start");
			new Thread(() -> {
				ObjectInputStream ois = null;
				ObjectOutputStream oos = null;
				try {
					// 從輸入流中讀取要調用的方法名和調用時傳入的參數
					ois = new ObjectInputStream(socket.getInputStream());
					String methodName = ois.readUTF();
					Class[] parameterTypes = (Class[]) ois.readObject();
					Object[] parameterValues = (Object[]) ois.readObject();
					Method method = object.getClass().getMethod(methodName, parameterTypes);

					// 調用方法執行
					info(socket, "begin invoke method ", methodName);
					long start = System.currentTimeMillis();
					Object invoke = method.invoke(object, parameterValues);
					long cost = System.currentTimeMillis() - start;
					info(socket, "exec method ", methodName, " done, cost=", cost, "ms");

					// 將執行結果傳回調用端
					oos = new ObjectOutputStream(socket.getOutputStream());
					oos.writeObject(invoke);
				} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
					// 若是發生了異常,反饋給調用端
					try {
						info(socket, "exec exception, e=", e.getClass(), ", cause=", e.getCause());
						oos = new ObjectOutputStream(socket.getOutputStream());
						oos.writeObject(e);
					} catch (IOException e1) {
						e1.printStackTrace();
					}
				} catch (IOException e) {
					e.printStackTrace();
				} finally {
					close(ois);
					close(oos);
					close(socket);
				}
				info(socket, "end");
			}).start();
		}
	}

	private static void close(Closeable closeable) {
		if (closeable != null) {
			try {
				closeable.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * 用於客戶端獲取rpc的endpoint
	 *
	 * @param clazz
	 * @param remoteHost
	 * @param remotePort
	 * @param <T>
	 * @return
	 */
	public static <T> T wrap(Class<T> clazz, String remoteHost, int remotePort) {
		return (T) Proxy.newProxyInstance(clazz.getClassLoader(), clazz.getInterfaces(), (proxy, method, args) -> {
			// 獲取要調用的方法的名字
			String methodName = method.getName();
			Class<?>[] parameterTypes = method.getParameterTypes();

			// 調用rpc server端去執行
			Socket socket = new Socket(remoteHost, remotePort);
			ObjectOutputStream oos = null;
			ObjectInputStream ois = null;
			Object result = null;
			try {
				oos = new ObjectOutputStream(socket.getOutputStream());
				oos.writeUTF(methodName);
				oos.writeObject(parameterTypes);
				oos.writeObject(args);

				// 讀取rpc server端執行結果
				ois = new ObjectInputStream(socket.getInputStream());
				result = ois.readObject();

				// 此處不catch,執行時出了異常儘管拋出
			} finally {
				close(ois);
				close(oos);
				close(socket);
			}

			// 檢測server端執行是否拋了異常
			if (result != null && result instanceof Throwable) {
				throw (Throwable) result;
			}

			return result;
		});
	}

}

RpcLogUtil.java:socket

package cc11001100.diySimpleRpcFramework.util;

import java.net.Socket;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * @author CC11001100
 */
public class RpcLogUtil {

	private static String now() {
		return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
	}

	public static void info(Socket socket, Object... messages) {
		String remoteAddress = socket.getRemoteSocketAddress().toString();
		StringBuilder sb = new StringBuilder();
		sb.append("[").append(now()).append("]")
				.append(" - ").append(remoteAddress).append(":").append(" - ");
		for (Object msg : messages) {
			sb.append(msg.toString());
		}
		System.out.println(sb.toString());
	}

}

由於代理是使用JDK提供的代理機制實現的,這種代理方式要求必需要定義一個接口而後實現它,因此首先定義一個接口:分佈式

package cc11001100.diySimpleRpcFramework.rpcServiceProvider;

public interface FooRpcServiceProvider {

	int add(int a, int b);

}

而後實現它:

package cc11001100.diySimpleRpcFramework.rpcServiceProvider;

/**
 * @author CC11001100
 */
public class FooRpcServiceProviderImpl implements FooRpcServiceProvider {

	@Override
	public int add(int a, int b) {
		return a + b;
	}

}

測試一下此RPC server是否可用,先啓動一個rpc server:

package cc11001100.diySimpleRpcFramework.test;

import cc11001100.diySimpleRpcFramework.rpcServiceProvider.FooRpcServiceProviderImpl;
import cc11001100.diySimpleRpcFramework.util.RpcServiceProviderUtil;

import java.io.IOException;

/**
 * 啓動rpc server端
 *
 * @author CC11001100
 */
public class FooRpcServiceProviderServerTest {

	public static void main(String[] args) throws IOException {

		RpcServiceProviderUtil.startService(new FooRpcServiceProviderImpl(), 10086);

	}

}

而後啓動client去調用server:

package cc11001100.diySimpleRpcFramework.test;

import cc11001100.diySimpleRpcFramework.rpcServiceProvider.FooRpcServiceProvider;
import cc11001100.diySimpleRpcFramework.rpcServiceProvider.FooRpcServiceProviderImpl;
import cc11001100.diySimpleRpcFramework.util.RpcServiceProviderUtil;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * rpc客戶端調用
 *
 * @author CC11001100
 */
public class FooRpcServiceProviderClientTest {

	public static void main(String[] args) throws InterruptedException {

		FooRpcServiceProvider foo = RpcServiceProviderUtil.wrap(FooRpcServiceProviderImpl.class, "localhost", 10086);
		Random random = new Random();
		while (true) {
			int a = random.nextInt(10);
			int b = random.nextInt(10);
			int result = foo.add(a, b);
			System.out.printf("%d + %d = %d\n", a, b, result);
			TimeUnit.MILLISECONDS.sleep(random.nextInt(900) + 100);
		}

	}

}

控制檯輸出:

image image

我寫的rpc server精通10之內加法,這點client能夠做證。

 

.

相關文章
相關標籤/搜索