一文帶你實現RPC框架

想要獲取更多文章能夠訪問個人博客 - 代碼無止境java

如今大部分的互聯網公司都會採用微服務架構,但具體實現微服務架構的方式有所不一樣,主流上分爲兩種,一種是基於Http協議的遠程調用,另一種是基於RPC方式的調用。兩種方式都有本身的表明框架,前者是著名的Spring Cloud,後者則是有阿里巴巴開源的Dubbo,兩者都被普遍的採用。今天這篇文章,咱們就一塊兒來了解一下RPC,而且和你們一塊兒動手實現一個簡單的RPC框架的Demo。git

什麼是RPC

RPC是一種遠程調用過程,是一種經過網絡遠程調用其餘服務的協議。通俗的說就是,A經過打電話的方式讓B幫忙辦一件事,B辦完過後將結果告知A。 咱們下面經過一張圖來大概瞭解一下在一個完整的RPC框架中存在的角色以及整個遠程調用的過程。github

Dubbo框架

經過上面的圖能夠看出來,在RPC框架中主要有如下4個角色:web

  • registry - 註冊中心,當服務提供者啓動時會向註冊中心註冊,而後註冊中心會告知全部的消費者有新的服務提供者。
  • provider - 服務提供者,遠程調用過程當中的被消費方。
  • consumer - 服務消費者,遠程調用過程當中的消費方。
  • monitor - 監視器,它主要負責統計服務的消費和調用狀況。

啓動服務提供者後,服務提供者會以異步的方式向註冊中心註冊。而後啓動服務消費者,它會訂閱註冊中心中服務提供者列表,當有服務提供者的信息發生改變時,註冊中心會通知全部的消費者。當消費者發起遠程調用時,會經過動態代理將須要請求的參數以及方法簽名等信息經過Netty發送給服務提供者,服務提供者收到調用的信息後調用對應的方法並將產生的結果返回給消費者,這樣就完成了一個完整的遠程調用。固然了這個過程當中可能還會將調用信息異步發送給monitor用於監控和統計。spring

閱讀過上面的內容後,你應該對RPC框架有了一個大概的認識。爲了更好更深刻的瞭解RPC框架的原理,下面咱們就一塊兒來動手實現一個簡單的RPC框架吧。bootstrap

框架核心部分

首先咱們要實現的是整個RPC框架的核心部分,這部分的主要包含如下內容:api

  1. RPC服務的註解的實現。
  2. 服務提供者初始化、註冊、以及響應遠程調用的實現。
  3. 服務消費者訂閱註冊中心、監聽服務提供者的變化的實現。
  4. 動態代理的實現。

整個核心部分將以一個Spring Boot Starter的形式實現,這樣咱們能夠很方便的在Spring Boot項目中使用它。數組

註解

咱們須要使用一個註解來標識服務提供者所提供服務的實現類,方便在初始化的時候將其交由Spring管理,也只有這樣咱們才能夠在遠程調用發生時能夠找到它們。瀏覽器

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {

    Class<?> value();

}

value屬性用來標記這個服務的實現類對應的接口,RPC框架中服務提供者和消費者之間會共同引用一個服務接口的包,當咱們須要遠程調用的時候實際上只須要調用接口中定義的方法便可。
除了一個標識服務實現類的註解以外,咱們還須要一個標識服務消費者注入服務實現的註解@RpcConsumer,被其修飾的屬性在初始化的時候都會被咱們設置上動態代理,這一點在後面會詳細講到,咱們先來看下它的具體實現吧。服務器

@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcConsumer {

    /**
     * 服務名稱
     * @return
     */
    String providerName();

}

服務提供者

服務提供者啓動的時候,咱們RPC框架須要作如下幾件事情:

  1. 掃描服務提供者中全部提供服務的類(被@RpcService修飾的類),並將其交由BeanFactory管理。
  2. 啓動Netty服務端,用來收到消費者的調用消息,而且返回調用結果。
  3. 向註冊中心註冊,本例中使用的註冊中心是Zookeeper。

這部分咱們定義了一個ProviderAutoConfiguration類來實現這幾個步驟,

@PostConstruct
public void  init() {
    logger.info("rpc server start scanning provider service...");
    Map<String, Object> beanMap = this.applicationContext.getBeansWithAnnotation(RpcService.class);
    if (null != beanMap && !beanMap.isEmpty()) {
        beanMap.entrySet().forEach(one -> {
            initProviderBean(one.getKey(), one.getValue());
        });
    }
    logger.info("rpc server scan over...");
    // 若是有服務的話才啓動netty server
    if (!beanMap.isEmpty()) {
        startNetty(rpcProperties.getPort());
    }
}

看上面的代碼,首先咱們獲取到了全部被@RpcService註解修飾的實體,而且調用了initProviderBean方法逐一對其處理,而後咱們啓動了Netty。那麼咱們須要在initProviderBean方法中作些什麼呢?其實很簡單,就是逐一將其交由BeanFactory管理。

private void initProviderBean(String beanName, Object bean) {
    RpcService rpcService = this.applicationContext
                .findAnnotationOnBean(beanName, RpcService.class);
    BeanFactory.addBean(rpcService.value(), bean);
}

將服務實現類交由Spring管理以後,咱們還須要啓動Netty用來接收遠程調用信息,啓動Netty的代碼在這裏我就不所有粘出來了,你們能夠在源碼中查看。在Netty啓動成功以後,其實咱們還執行了下面的代碼,用來向ZK註冊。

new RegistryServer(rpcProperties.getRegisterAddress(),
                    rpcProperties.getTimeout(), rpcProperties.getServerName(),
                    rpcProperties.getHost(), port)
                    .register();

整個註冊的過程也很是容易理解,首先是建立了一個ZK鏈接,而後是判斷是否有/rpc的根節點,若是沒有的話就建立一個,最後就是在根節點下建立一個EPHEMERAL_SEQUENTIAL類型的節點,這種類型的節點在ZK重啓以後會自動清除,這樣能夠保證註冊中心重啓後會自動清除服務提供者的信息。而在節點中會存儲服務提供者的名稱,IP地址以及端口號的信息,這樣RPC框架就能夠根據這些信息順利的定位到服務提供者。

public void register() throws ZkConnectException {
    try {
        // 獲取zk鏈接
        ZooKeeper zooKeeper = new ZooKeeper(addr, timeout, event -> {
            logger.info("registry zk connect success...");
        });
        if (zooKeeper.exists(Constants.ZK_ROOT_DIR, false) == null) {
            zooKeeper.create(Constants.ZK_ROOT_DIR, Constants.ZK_ROOT_DIR.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
        zooKeeper.create(Constants.ZK_ROOT_DIR + "/" + serverName,
                (serverName + ","+ host + ":" + port).getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        logger.info("provider register success {}", serverName);
    } catch (Exception e) {
        throw new ZkConnectException("register to zk exception," + e.getMessage(), e.getCaus());
    }
}

就這樣咱們RPC框架與服務提供者相關的內容就完成了,接下來要完成的是服務消費者部分。

服務消費者

對於服務消費者,咱們框架須要對它的處理就是,爲全部的RPC服務(被@RpcConsumer修飾的屬性)設置上動態代理。具體的設置代碼以下所示(PS:這段代碼寫在ConsumerAutoConfiguration類中哦):

@Bean
public BeanPostProcessor beanPostProcessor() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName)
                throws BeansException {
            Class<?> objClz = bean.getClass();
            for (Field field : objClz.getDeclaredFields()) {
                RpcConsumer rpcConsumer = field.getAnnotation(RpcConsumer.class);
                if (null != rpcConsumer) {
                    Class<?> type = field.getType();
                    field.setAccessible(true);
                    try {
                        field.set(bean, rpcProxy.create(type, rpcConsumer.providerName()));
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } finally {
                        field.setAccessible(false);
                    }
                }
            }
            return bean;
        }
    };
}

BeanPostProcessor也稱爲Bean後置處理器,它是Spring中定義的接口,在Spring容器的建立過程當中(具體爲Bean初始化先後)會回調BeanPostProcessor中定義的兩個方法。上面實現的postProcessBeforeInitialization是在Bean初始化以前調用的,還有一個postProcessAfterInitialization方法是在Bean初始化以後調用的。
如上面代碼所示,咱們會在每個帶有@RpcConsumer的實例初始化以前利用反射機制爲其設置一個RpcProxy的代理,能夠看到咱們在建立這個動態代理的時候還須要服務提供者的名稱,這是由於在動態代理的實現裏面須要使用服務提供者的名稱來查詢服務提供者的地址信息。那麼這個動態代理的實現又是怎樣的呢?這就是咱們下一步須要作的事情。

動態代理

在這個RPC框架裏面動態代理主要實現的內容就是,當服務消費者調用服務提供者提供的接口時,將調用信息經過Netty發送給對應的服務調用者,而後由服務提供者完成相關的處理而且將處理結果返回給服務消費者。下面咱們就一塊兒來看一下RpcProxy的是如何實現這部分功能的。

@Component
public class RpcProxy {

    @Autowired
    private ServiceDiscovery serviceDiscovery;

    public <T> T create(Class<?> interfaceClass, String providerName) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass},
                (proxy, method, args) -> {
            // 經過netty向Rpc服務發送請求。
            // 構建一個請求。
            RpcRequest request = new RpcRequest();
            request.setRequestId(UUID.randomUUID().toString())
                    .setClassName(method.getDeclaringClass().getName())
                    .setMethodName(method.getName())
                    .setParamTypes(method.getParameterTypes())
                    .setParams(args);
            // 獲取一個服務提供者。
            ProviderInfo providerInfo = serviceDiscovery.discover(providerName);
            // 解析服務提供者的地址信息,數組第一個元素爲ip地址,第二個元素爲端口號。
           String[] addrInfo = providerInfo.getAddr().split(":");
            String host = addrInfo[0];
            int port = Integer.parseInt(addrInfo[1]);
            RpcClient rpcClient = new RpcClient(host, port);
            // 使用Netty向服務提供者發送調用消息,並接收請求結果。
            RpcResponse response = rpcClient.send(request);
            if (response.isError()) {
                throw response.getError();
            } else {
                return response.getResult();
            }
        });
    }
}

其實在代理裏面首先咱們會構造請求信息實體,而後會根據服務提供者的名稱獲取一個服務提供者的地址,最後再將請求信息發送給服務提供者並接收調用結果。獲取服務提供者的方法會在後面消費者和提供者的通用配置裏面講解。咱們在這裏重點來看一下發送調用信息並接收調用結果的實現。

public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
    
    ... 此處省略對象屬性信息,可查看源碼。

    public RpcResponse send(RpcRequest request){
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ... 此處省略Netty相關配置,可查看源碼。
            // 鏈接服務器
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            channelFuture.channel().writeAndFlush(request).sync();
            future = new CompletableFuture<>();
            future.get();
            if (response != null) {
                // 關閉netty鏈接。
                channelFuture.channel().closeFuture().sync();
            }
            return response;
        } catch (Exception e) {
            logger.error("client send msg error,", e);
            return null;
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                RpcResponse rpcResponse) throws Exception {
        logger.info("client get request result,{}", rpcResponse);
        this.response = rpcResponse;
        future.complete("");
    }
}

經過上面的代碼能夠看出向服務提供者發送消息是異步的,咱們經過CompletableFutureget()方法阻塞當前線程,直到接收到調用結果(PS:咱們在channelRead0方法中收到返回結果後會將其設置成完成狀態)。看到這裏,你可能會問服務提供者收到調用請求信息後如何處理的呢?具體的處理邏輯咱們寫在了ServerHandler這個類中,能夠看出在channelRead0方法收到一條調用信息以後,調用handle方法來處理具體的調用過程,在handle方法中會使用反射機制找到所調用方法的具體實現,而後執行調用過程並獲取結果,最後再使用Netty將結果返回給消費者服務。

public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                RpcRequest request) throws Exception {
        logger.info("provider accept request,{}", request);
        // 返回的對象。
        RpcResponse rpcResponse = new RpcResponse();
        // 將請求id原路帶回
        rpcResponse.setRequestId(request.getRequestId());
        try {
            Object result = handle(request);
            rpcResponse.setResult(result);
        } catch (Exception e) {
            rpcResponse.setError(e);
        }
        channelHandlerContext.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
    }

    private Object handle(RpcRequest request) throws Exception {
        String className = request.getClassName();
        Class<?> objClz = Class.forName(className);
        Object o = BeanFactory.getBean(objClz);
        // 獲取調用的方法名稱。
        String methodName = request.getMethodName();
        // 參數類型
        Class<?>[] paramsTypes = request.getParamTypes();
        // 具體參數。
        Object[] params = request.getParams();
        // 調用實現類的指定的方法並返回結果。
        Method method = objClz.getMethod(methodName, paramsTypes);
        Object res = method.invoke(o, params);
        return res;
    }
}

消費者和提供者的通用配置

除了ProviderAutoConfigurationConsumerAutoConfiguration兩個配置類,咱們還定義了一個RpcAutoConfiguration類來配置一些其餘的東西,以下所示。

public class RpcAutoConfiguration {
    ...

    @Bean
    @ConditionalOnMissingBean
    public ServiceDiscovery serviceDiscovery() {
        ServiceDiscovery serviceDiscovery =
                null;
        try {
            serviceDiscovery = new ServiceDiscovery(rpcProperties.getRegisterAddress());
        } catch (ZkConnectException e) {
            logger.error("zk connect failed:", e);
        }
        return serviceDiscovery;
    }

    @Bean
    @ConditionalOnMissingBean
    public RpcProxy rpcProxy() {
        RpcProxy rpcProxy = new RpcProxy();
        rpcProxy.setServiceDiscovery(serviceDiscovery());
        return rpcProxy;
    }
}

在這個配置類裏面,主要初始化了一個ServiceDiscovery的對象以及一個RpcProxy的對象。其中RpcProxy是動態代理,在上面咱們已經詳細瞭解過了。那麼這裏就來着重瞭解一下ServiceDiscovery是幹啥的吧。
你們還記得咱們在文章開始的時候貼出來的那張圖片嗎?在服務消費者初始化的時候會去訂閱服務提供者內容的變化,ServiceDiscovery的主要功能就是這個,其主要代碼以下所示(若是你須要完整的代碼,能夠查看本文源碼)。

public class ServiceDiscovery {

    // 存儲服務提供者的信息。
    private volatile List<ProviderInfo> dataList = new ArrayList<>();

    public ServiceDiscovery(String registoryAddress) throws ZkConnectException {
        try {
            // 獲取zk鏈接。
            ZooKeeper zooKeeper = new ZooKeeper(registoryAddress, 2000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    logger.info("consumer connect zk success!");
                }
            });
            watchNode(zooKeeper);
        } catch (Exception e) {
            throw new ZkConnectException("connect to zk exception," + e.getMessage(), e.getCause());
        }
    }

    /**
     * 監聽服務提供者的變化
     */
    public void watchNode(final ZooKeeper zk) {
        ...
    }

    /**
     * 獲取一個服務提供者
     */
    public ProviderInfo discover(String providerName) {
        ....
    }
}

在這個類的構造方法裏面,咱們和ZK註冊中心創建了一個鏈接,而且在watchNode方法中監聽服務提供者節點的變化,當有服務提供者信息有變化時會去修改dataList裏的內容,這樣能夠保證在服務本地維持一份可用的服務提供者的信息。而在遠程調用發生的時候咱們會經過discover方法(PS:前面有見到過哦)去dataList裏面尋找一個可用的服務提供者來提供服務。

Starter的配置

咱們還須要在resources目錄下新建一個META-INF目錄,而後在該目錄下新建一個spring.factories文件,裏面的內容以下面代碼所示。它主要是用來指定在Spring Boot項目啓動的時候須要加載的其餘配置。若是你有不明白的地方能夠查詢一下Spring Boot自定義Stater的相關內容。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.itweknow.sbrpccorestarter.config.RpcAutoConfiguration,\
cn.itweknow.sbrpccorestarter.config.ProviderAutoConfiguration,\
cn.itweknow.sbrpccorestarter.config.ConsumerAutoConfiguration

到這一步咱們框架的核心部分就完成了,它將會以一個Spring Boot Stater的形式提供給服務提供者和服務消費者使用,接下來咱們就將分別定義一個服務提供者和一個消費者來測試咱們本身實現的RPC框架。

建立服務提供者

在建立服務提供者以前,咱們須要新建一個與服務消費者之間共享的服務接口。由於前面提到過,在服務消費者眼裏的遠程調用實際上就是調用本地的接口方法而已。在這個項目裏咱們就建立了一個HelloRpcService.java的接口,以下所示:

public interface HelloRpcService {
    String sayHello();
}

在接口定義完成以後,咱們就來建立咱們的服務提供者,而且實現上面定義的HelloRpcService接口。在服務提供者服務裏還須要依賴RPC框架的核心Starter以及服務接口包,咱們須要在pom.xml中添加下面的依賴。

<dependency>
    <groupId>cn.itweknow</groupId>
    <artifactId>sb-rpc-core-starter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>cn.itweknow</groupId>
    <artifactId>sb-rpc-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

添加完依賴後,咱們就來看下HelloRpcService的具體實現吧:

@RpcService(HelloRpcService.class)
public class HelloRpcServiceImpl implements HelloRpcService {
    
    @Override
    public String sayHello() {
        return "Hello RPC!";
    }
}

其實現很簡單,主要是要須要在實現類上加上@RpcService註解,這樣在項目啓動的時候RPC框架纔會掃描到它,並將其交給BeanFactory管理。接下來還須要配置的是一些RPC框架須要的配置項,包括服務名稱,ZK的地址以及Netty啓動的端口等信息。這些信息在框架是經過RpcProperties這個配置類來讀取的,有興趣的同窗能夠在源碼中找到它。

spring.rpc.host=localhost
# netty服務的端口號
spring.rpc.port=21810
# zk地址
spring.rpc.register-address=localhost:2181
spring.rpc.server-name=provider
# 鏈接zk的超時時間
spring.rpc.timeout=2000

建立服務消費者

服務消費者一樣也須要RPC核心框架的Starter以及服務接口的依賴,和RPC框架的一些基礎配置項,和服務提供者相似,這裏就不粘出來了。這裏須要說明的一點是,爲了方便測試,服務消費者是一個Web服務,因此它還添加了spring-boot-starter-web的依賴。下面咱們就一塊兒來看下服務消費者是如何調用遠程服務的吧。

@RestController
@RequestMapping("/hello-rpc")
public class HelloRpcController {


    @RpcConsumer(providerName = "provider")
    private HelloRpcService helloRpcService;

    @GetMapping("/hello")
    public String hello() {
        return helloRpcService.sayHello();
    }
}

咱們在消費者服務中寫了一個hello的接口,在接口裏面調用了HelloRpcService接口裏的sayHello()方法,看過前面內容的同窗應該知道,被@RpcConsumer修飾的helloRpcService屬性在初始化的時候會爲其設置一個動態代理,當咱們調用這個接口裏面的方法時,會經過Netty向服務提供者發送調用信息,而後由服務提供者調用相應方法並返回結果。
到這一步,咱們能夠說完成了一個簡單的RPC框架以及其使用,下面咱們就一塊兒來驗證一下結果吧。

測試

在測試以前咱們須要在本身本地電腦上安裝Zookeeper,具體的安裝方式很是簡單。能夠參考這篇文章。
安裝好Zookeeper後,咱們須要完成如下幾個步驟:

  1. 啓動Zookeeper。
  2. 啓動服務提供者。
  3. 啓動服務消費者。

第一次啓動服務消費者的過程當中,你的控制檯能夠能會報一個找不到/rpc節點的錯誤,產生這個錯誤的緣由是咱們在第一次啓動的時候ZK裏面並不存在/rpc這個節點,可是若是你仔細研究源碼的話,會發現當這個節點不存在的時候,咱們會建立一個。因此直接忽略這個異常便可。完成以上幾步以後,咱們只須要在瀏覽器中訪問http://127.0.0.1:8080/hello-rpc/hello,若是你看到了下面的結果,那麼恭喜你,整個RPC框架完美的運行成功了。

遠程調用結果

結束語

本文的主要內容是和你們一塊兒完成了一個Demo版的RPC框架,其主要目的是讓你們更深入的理解RPC的原理以及其調用過程。固然因爲文章篇幅的緣由,不少代碼沒有直接在文中給出,您能夠在Github上找到完整的實現。若是您有什麼問題能夠在Github上提交Issue或者發送郵件到個人郵箱(gancy.programmer@gmail.com),若是您以爲這篇文章寫的還行的話,但願您能給我個Star,這是對我最好的鼓勵。

PS:學習不止,碼不停蹄!若是您喜歡個人文章,就關注我吧!

掃碼關注「代碼無止境」
相關文章
相關標籤/搜索