實現本身的 RPC 框架(二)

前段時間本身搞了個 RPC 的輪子,不過相對來講比較簡單,最近在原來的基礎上加以改造,使用 Zookeeper 實現了 provider 自動尋址以及消費者的簡單負載均衡,對以前的感興趣的請轉 造個輪子---RPC動手實現java

RPC 模型

在原來使用 TCP 直連的基礎上實現基於 Zookeeper 的服務的註冊與發現,改造後的依賴關係是這樣的。git

child-rpc

怎麼用

話很少說,咱們來看下如何發佈和引用服務。 服務端咱們將服務的 IP 和端口號基礎信息註冊到 Zookeeper 上。github

/** * @author wuhaifei 2019-08-02 */
public class ZookeeperServerMainTest {

    public static void main(String[] args) {
        ServerConfig serverConfig = new ServerConfig();
        serverConfig.setSerializer(AbstractSerializer.SerializeEnum.HESSIAN.serializer)
                .setHost("172.16.30.114")
                .setPort(5201)
                .setRef(HelloServiceImpl.class.getName())
                .setRegister(true)
                .setInterfaceId(HelloService.class.getName());

        RegistryConfig registryConfig = new RegistryConfig().setAddress("127.0.0.1:2181")
                .setSubscribe(true)
                .setRegister(true)
                .setProtocol(RpcConstants.ZOOKEEPER);
        ServerProxy serverProxy = new ServerProxy(new NettyServerAbstract())
                .setServerConfig(serverConfig)
                .setRegistryConfig(registryConfig);
        try {
            serverProxy.export();
            while (true){

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
複製代碼

經過 Zookeeper 引用註冊在其上的服務。算法

/** * @author wuhaifei 2019-08-02 */
public class ZookeeperClientMainTest {
    public static void main(String[] args) {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setProtocol(RpcConstants.ZOOKEEPER)
                .setTimeoutMillis(100000)
                .setSerializer(AbstractSerializer.SerializeEnum.HESSIAN.serializer);

        RegistryConfig registryConfig = new RegistryConfig()
                .setAddress("127.0.0.1:2181")
                .setProtocol(RpcConstants.ZOOKEEPER)
                .setRegister(true)
                .setSubscribe(true);
        ClientProxy<HelloService> clientProxy = new ClientProxy(clientConfig, new NettyClientAbstract(), HelloService.class)
                .setRegistryConfig(registryConfig);
        for (int i = 0; i < 10; i++) {
            HelloService helloService = clientProxy.refer();
            System.out.println(helloService.sayHi());
        }
    }
}
複製代碼

運行結果就不一一貼出了,感興趣的小夥伴能夠查看樓主傳到 github 上的源碼這是一個rpc的輪子app

服務的發佈與訂閱

樓主在原來代碼的基礎上添加了 Zookeeper 的註冊的邏輯,原來的代碼相關介紹請轉 造個輪子---RPC動手實現負載均衡

服務的發佈

/** * 發佈服務 */
public void export() {
    try {
        Object serviceBean = Class.forName((String) serverConfig.getRef()).newInstance();
        RpcInvokerHandler.serviceMap.put(serverConfig.getInterfaceId(), serviceBean);
        this.childServer.start(this.getServerConfig());

        if (serverConfig.isRegister()) {
            // 將服務註冊到zookeeper
            register();
        }
    } catch (Exception e) {
        // 取消服務註冊
        unregister();
        if (e instanceof ChildRpcRuntimeException) {
            throw (ChildRpcRuntimeException) e;
        } else {
            throw new ChildRpcRuntimeException("Build provider proxy error!", e);
        }
    }
    exported = true;
}

/** * 註冊服務 */
protected void register() {
    if (serverConfig.isRegister()) {
        Registry registry = RegistryFactory.getRegistry(this.getRegistryConfig());
        registry.init();
        registry.start();
        try {
            registry.register(this.serverConfig);
        } catch (ChildRpcRuntimeException e) {
            throw e;
        } catch (Throwable e) {
            String appName = serverConfig.getInterfaceId();
            LOGGER.info(appName, "Catch exception when register to registry: "
                    + registryConfig.getId(), e);
        }
    }
}

複製代碼

服務的訂閱

/** * 服務的引用. */
public T refer() {
    try {
        if (config.isSubscribe()) {
            subscribe();
        }
        childClient.init(this.clientConfig);
        return invoke();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}

/** * 訂閱zk的服務列表. */
private void subscribe() {
    Registry registry = RegistryFactory.getRegistry(this.getRegistryConfig());
    registry.init();
    registry.start();

    this.clientConfig = (ClientConfig) config;
    List<String> providerList = registry.subscribe(this.clientConfig);

    if (null == providerList) {
        throw new ChildRpcRuntimeException("無可用服務供訂閱!");
    }

    // 使用隨機算法,隨機選擇一個provider
    int index = ThreadLocalRandom.current().nextInt(providerList.size());
    String providerInfo = providerList.get(index);
    String[] providerArr = providerInfo.split(":");
    clientConfig = (ClientConfig) this.config;
    clientConfig.setHost(providerArr[0]);
    clientConfig.setPort(Integer.parseInt(providerArr[1]));
}
複製代碼

上面代碼比較簡單,就是在原來直連的基礎上添加 zk 的操做,在發佈服務的時候將 provider 的 IP 和端口號基礎信息註冊到 zk 上,在引用服務的時候使用隨機算法從 zk 上選取可用的 provider 信息,而後進行 invoke 調用。框架

小結

RPC(Remote procedure call)底層邏輯相對來講比較簡單,樓主在實現的過程當中參考了其餘 RPC 框架的部分代碼,受益不淺~dom

相關文章
相關標籤/搜索