最近看hadoop底層通訊,都是經過RPC實現的。java
RPC(Remote Procedure Call Protocol)遠程調用: 遠程過程調用是一種經常使用的分佈式網絡通訊協議,它容許運行於 一臺計算機的程序調用另外一臺計算機的子程序,同時將網絡的通訊細節隱藏起來, 使得用戶無須額外地爲這個交互做用編程。分佈式系統之間的通訊大都經過RPC實現編程
要寫一個RPC框架,須要哪些組成部分?bash
服務接口:服務器
//計算學生年齡和的接口
public interface CalculateService {
String cal(Student sta, Student stb);
}
public class CalculateServiceImpl implements CalculateService {
@Override
public String cal(Student sta, Student stb) {
return "學生年齡之和:" + (sta.getAge() + stb.getAge());
}
}
複製代碼
服務發佈網絡
public class PublishUtilI {
//服務接口集合
private static List<Object> serviceList;
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10));
public static void publish(int port,Object... services) throws IOException {
serviceList= Arrays.asList(services);
ServerSocket server = new ServerSocket(port);
Socket client;
while (true) {
//阻塞等待請求
client = server.accept();
//使用線程池處理請求
executor.submit(new ServerHandler(client, serviceList));
}
}
}
複製代碼
反射調用服務架構
public class ServerHandler implements Runnable {
private Socket client = null;
private List<Object> serviceList = null;
public ServerHandler(Socket client, List<Object> service) {
this.client = client;
this.serviceList = service;
}
@Override
public void run() {
try (
ObjectInputStream input = new ObjectInputStream(client.getInputStream());
ObjectOutputStream output = new ObjectOutputStream(client.getOutputStream())
) {
// 讀取客戶端要訪問那個service
Class serviceClass = (Class) input.readObject();
// 找到該服務類
Object obj = findService(serviceClass);
if (obj == null) {
output.writeObject(serviceClass.getName() + "服務未發現");
} else {
//利用反射調用該方法,返回結果
String methodName = input.readUTF(); //讀取UTF編碼的String字符串
//讀取參數類型
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
//讀取參數
Object[] arguments = (Object[]) input.readObject();
Method method = obj.getClass().getMethod(methodName, parameterTypes);
//反射執行方法
Object result = method.invoke(obj, arguments);
output.writeObject(result);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private Object findService(Class serviceClass) {
for (Object obj : serviceList) {
boolean isFather = serviceClass.isAssignableFrom(obj.getClass());
if (isFather) {
return obj;
}
}
return null;
}
}
複製代碼
public class Client {
public static void main(String[] args) {
CallProxyHandler handler = new CallProxyHandler("127.0.0.1", 1111);
CalculateService calculateService = handler.getService(CalculateService.class);
Student sta = new Student(1);
Student stb = new Student(2);
String result = calculateService.cal(sta, stb);
System.out.println(result);
}
}
複製代碼
建立代理類遠程調用服務端發佈的服務併發
public class CallProxyHandler implements InvocationHandler {
private String ip;
private int port;
public CallProxyHandler(String ip, int port) {
this.ip = ip;
this.port = port;
}
/**
* 獲取代理對象
* @param clazz
* @param <T>
* @return
*/
@SuppressWarnings("all")
public <T> T getService(Class<T> clazz) {
return (T) Proxy.newProxyInstance(CallProxyHandler.class.getClassLoader(),
new Class<?>[] {clazz}, this);
}
/**
* 將須要調用服務的方法名,參數類型,參數按照必定格式封裝發送至服務端
* 讀取服務端返回的結果
* @param proxy
* @param method
* @param args
* @return
* @throws Throwable
*/
@SuppressWarnings("all")
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try (
Socket socket = new Socket(ip, port);
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream input = new ObjectInputStream(socket.getInputStream())
) {
output.writeObject(proxy.getClass().getInterfaces()[0]);
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
output.flush();
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable) result;
}
return result;
}
}
}
複製代碼
至此,一個簡單的RPC服務調用框架完成。可是存在不少問題:框架