手寫RPC框架

1、RPC簡介

最近看hadoop底層通訊,都是經過RPC實現的。java

RPC(Remote Procedure Call Protocol)遠程調用: 遠程過程調用是一種經常使用的分佈式網絡通訊協議,它容許運行於 一臺計算機的程序調用另外一臺計算機的子程序,同時將網絡的通訊細節隱藏起來, 使得用戶無須額外地爲這個交互做用編程。分佈式系統之間的通訊大都經過RPC實現編程

2、RPC請求過程

  1. client發起服務調用請求
  2. client stub代理程序將調用的方法,參數按照必定格式封裝,經過服務方的地址,發起網絡請求
  3. 消息經過網絡發送到服務端,server stub接收到消息,進行解包,反射調用本地對應的服務
  4. 本地服務執行將結果返回給server stub,而後server stub會將結果消息打包返回到客戶端
  5. client stub接收消息解碼,獲得最終結果.

3、RPC框架架構

要寫一個RPC框架,須要哪些組成部分?bash

  1. 序列化方式。序列化主要做用是將結構化對象轉爲字節流以便於經過網絡進行傳輸或寫入持久存儲。
  2. 遠程代理對象,通常使用jdk動態代理或者cglib代理
  3. 服務暴露 設置註冊中心Zookeeper
  4. 網絡通訊,基於事件驅動的Reactor模式

4、RPC框架示例

  1. 服務提供者,運行在服務器端,提供服務接口定義與服務實現類
  2. 服務發佈者,運行在服務器端,負責將本地服務發佈成遠程服務,管理遠程服務,提供給服務消費者使用
  3. 服務消費者,運行在客戶端,經過遠程代理對象調用遠程服務

服務端代碼

服務接口:服務器

//計算學生年齡和的接口
public interface CalculateService {
    String cal(Student sta, Student stb);
}

public class CalculateServiceImpl implements CalculateService {
    @Override
    public String cal(Student sta, Student stb) {
        return "學生年齡之和:" + (sta.getAge() + stb.getAge());
    }
}
複製代碼

服務發佈網絡

public class PublishUtilI {
    //服務接口集合
    private static List<Object> serviceList;
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,10, TimeUnit.SECONDS,
                                                    new LinkedBlockingQueue<Runnable>(10));

    public static void publish(int port,Object... services) throws IOException {
        serviceList= Arrays.asList(services);
        ServerSocket server = new ServerSocket(port);
        Socket client;
        while (true) {
            //阻塞等待請求
            client = server.accept();
            //使用線程池處理請求
            executor.submit(new ServerHandler(client, serviceList));
        }

    }
}
複製代碼

反射調用服務架構

  1. 讀取客戶端發送的服務名
  2. 判斷服務是否發佈
  3. 若是發佈,反射調用服務端對應服務
  4. 返回結果給客戶端
public class ServerHandler implements Runnable {
    private Socket client = null;

    private List<Object> serviceList = null;

    public ServerHandler(Socket client, List<Object> service) {
        this.client = client;
        this.serviceList = service;
    }

    @Override
    public void run() {
        try (
                ObjectInputStream input = new ObjectInputStream(client.getInputStream());
                ObjectOutputStream output = new ObjectOutputStream(client.getOutputStream())
        ) {
            // 讀取客戶端要訪問那個service
            Class serviceClass = (Class) input.readObject();

            // 找到該服務類
            Object obj = findService(serviceClass);
            if (obj == null) {
                output.writeObject(serviceClass.getName() + "服務未發現");
            } else {
                //利用反射調用該方法,返回結果
                String methodName = input.readUTF(); //讀取UTF編碼的String字符串
                //讀取參數類型
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                //讀取參數
                Object[] arguments = (Object[]) input.readObject();
                Method method = obj.getClass().getMethod(methodName, parameterTypes);
                //反射執行方法
                Object result = method.invoke(obj, arguments);
                output.writeObject(result);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Object findService(Class serviceClass)  {
        for (Object obj : serviceList) {
            boolean isFather = serviceClass.isAssignableFrom(obj.getClass());
            if (isFather) {
                return obj;
            }
        }
        return null;
    }
}
複製代碼

客戶端代碼


public class Client {
    public static void main(String[] args) {
        CallProxyHandler handler = new CallProxyHandler("127.0.0.1", 1111);
        CalculateService calculateService = handler.getService(CalculateService.class);
        Student sta = new Student(1);
        Student stb = new Student(2);
        String result = calculateService.cal(sta, stb);
        System.out.println(result);
    }
}
複製代碼

建立代理類遠程調用服務端發佈的服務併發

public class CallProxyHandler implements InvocationHandler {

    private String ip;
    private int port;

    public CallProxyHandler(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * 獲取代理對象
     * @param clazz
     * @param <T>
     * @return
     */
    @SuppressWarnings("all")
    public <T> T getService(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(CallProxyHandler.class.getClassLoader(),
                new Class<?>[] {clazz}, this);
    }
    
    /**
     * 將須要調用服務的方法名,參數類型,參數按照必定格式封裝發送至服務端
     * 讀取服務端返回的結果
     * @param proxy
     * @param method
     * @param args
     * @return
     * @throws Throwable
     */
    @SuppressWarnings("all")
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        try (
                Socket socket = new Socket(ip, port);
                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                ObjectInputStream input = new ObjectInputStream(socket.getInputStream())
        ) {
            output.writeObject(proxy.getClass().getInterfaces()[0]);
            output.writeUTF(method.getName());
            output.writeObject(method.getParameterTypes());
            output.writeObject(args);
            output.flush();
            Object result = input.readObject();
            if (result instanceof Throwable) {
                throw (Throwable) result;
            }
            return result;
        }
    }
}
複製代碼

至此,一個簡單的RPC服務調用框架完成。可是存在不少問題:框架

  1. 使用java自帶的序列化,效率不高,可使用Hadoop Avro與protobuf
  2. 使用BIO方式進行網絡傳輸,高併發狀況沒法應對,使用Netty框架進行網絡通訊
  3. 缺乏註冊中心,服務註冊可使用Zookeeper進行管理。
相關文章
相關標籤/搜索