基於Netty的RPC簡易實現

代碼地址以下:
http://www.demodashi.com/demo/13448.htmlhtml

能夠給你提供思路java

也可讓你學到Netty相關的知識數組

固然,這只是一種實現方式瀏覽器

需求

看下圖,其實這個項目就是爲了作這樣一件事。服務器

有一個公共服務ServerA,它提供了一個名爲getUserName的服務。網絡

如今有多個相似ServerB的Web應用服務器。app

當客戶想經過ServerB要請求getUserName服務時,因爲ServerB服務中由於沒有UserService的實現類,致使不能正常提供服務的問題。ide

rpc原理

預期結果

能夠看到,在Client項目中,UserService沒有實現類,可是返回了正常的結果。工具

運行結果

項目結構

整個項目分爲三個部分,Server端、Client端以及一個公共jar。post

下圖正是整個項目的目錄結構圖。

目錄結構

公共部分

公共部分的存在是由於我將服務器端和客戶端寫在了一個項目中,爲了避免讓代碼重複警告,因此提出來的一個公共模塊。主要是幾個實體類和一些序列化工具類。

  • MsgPackDecoder

    • 該類是一個MsgPack編碼器,主要做用將Object對象序列化成byte數組。
  • MsgPackEncoder

    • 該類是一個MsgPack解碼器,主要做用將byte數組反序列化成Object對象。
  • ObjectCodec

    • 該類是繼承了MessageToMessageCodec<ByteBuf, Object>。是自定義序列化類主要有兩個方法,encode和decode

      @Override
          protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) {
              // 調用工具類的序列化方法
              byte[] data = ObjectSerializerUtils.serilizer(msg);
              ByteBuf buf = Unpooled.buffer();
              buf.writeBytes(data);
              out.add(buf);
          }
      
          @Override
          protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
              // 調用工具類的反序列化方法
              byte[] bytes = new byte[msg.readableBytes()];
              msg.readBytes(bytes);
              Object deSerilizer = ObjectSerializerUtils.deSerilizer(bytes);
              out.add(deSerilizer);
          }
  • ObjectSerializerUtils

    • 序列化工具類。序列化方法能夠本身實現
  • MethodInvokeMeta

    • 重點類。這個類是一個實體類。用於在網絡中傳輸的類。主要有5個字段分別記錄了一個接口的類對象,調用接口的方法名,方法的參數列表(包含參數類型,和參數列表),方法的返回值類型。
    • 在客戶端中,這個類將調用方所要調用的方法封裝。
    • 在服務端中,這個類主要用於服務器反射調用方法。
    • 固然,也能夠用String來記錄這些元信息。
  • NullWritable

    • 這個類主要用於在網絡中傳輸null。當返回值爲null時,服務端會返回NullWritable對象。客戶端接收到NullWritable時進行null處理。
  • User

    • 實體對象。測試用例

客戶端

上面的目錄結構圖也有提到,客戶端中只有UserService接口,因此客戶端中若是不作處理是不能正常運行的。

客戶端中核心類有如下7個,其中與Netty相關的核心類與服務端同樣有3個

  • ClientChannelHandlerAdapter
  • CustomChannelInitializerClient
  • NettyClient
  • RpcProxyFactoryBean
  • NettyBeanScanner
  • PackageClassUtils
  • WrapMethodUtils

NettyClient

這個類是Netty客戶端的啓動類,這個類中與Netty服務端進行通訊

CustomChannelInitializerClient

這個類是用於初始化管道事件的類。主要添加了TCP粘包問題解決方案和自定義編解碼器工具

ClientChannelHandlerAdapter

這個類是Netty客戶端的處理類,主要經過這個類將調用信息寫給Netty服務端。

NettyBeanScanner

這個類實現了BeanFactoryPostProcessor接口。BeanFactoryPostProcessor是Spring初始化Bean時對外暴露的擴展點。

Spring IoC容器容許BeanFactoryPostProcessor在容器實例化任何bean以前讀取bean的定義(配置元數據),並能夠修改它。同時能夠定義多個BeanFactoryPostProcessor,經過設置'order'屬性來肯定各個BeanFactoryPostProcessor執行順序。

在postProcessBeanFactory方法中,調用PackageClassUtils.resolver方法,將UserService.class註冊到SpringBean工廠。

/**
     * 註冊Bean到Spring的bean工廠
     */
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
        // 加載遠程服務的接口
        List<String> resolverClass = PackageClassUtils.resolver(basePackage);
        for (String clazz : resolverClass) {
            String simpleName;
            if (clazz.lastIndexOf('.') != -1) {
                simpleName = clazz.substring(clazz.lastIndexOf('.') + 1);
            } else {
                simpleName = clazz;
            }
            BeanDefinitionBuilder gd = BeanDefinitionBuilder.genericBeanDefinition(RpcProxyFactoryBean.class);
            gd.addPropertyValue("interfaceClass", clazz);
            gd.addPropertyReference("nettyClient", clientName);
            this.beanFactory.registerBeanDefinition(simpleName, gd.getRawBeanDefinition());
        }
    }

RpcProxyFactoryBean

重點類,這個類繼承了AbstractFactoryBean 實現了InvocationHandler。是一個代理類。

使用jdk動態代理的方式建立代理對象。

Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, this);

在invoke方法中,

調用WrapMethodUtils工具類中的方法,將代理對象方法封裝成MethodInvokeMeta對象。

而後經過NettyClient傳輸給NettyServer端,進行RPC調用,並將結果返回。

至此,客戶端的核心類介紹完畢。

服務端

服務端主要的核心類有三個

  • ServerChannelHandlerAdapter
  • RequestDispatcher
  • NettyServer
NettyServer

這個類主要有兩個方法,一個是啓動Netty服務的方法,一個是關閉服務器的方法。核心代碼以下:

/**
     * 啓動netty服務的方法
     */
    public void start() {
        // 服務器監聽端口號
        int port = serverConfig.getPort();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // BACKLOG用於構造服務端套接字ServerSocket對象,
                // 標識當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度。
                // 若是未設置或所設置的值小於1,Java將使用默認值50
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO));
        try {
            // 設置事件處理
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    // 自定義長度解碼器解決TCP黏包問題
                    // maxFrameLength 最大包字節大小,超出報異常
                    // lengthFieldOffset 長度字段的誤差
                    // lengthFieldLength 長度字段佔的字節數
                    // lengthAdjustment 添加到長度字段的補償值
                    // initialBytesToStrip 從解碼幀中第一次去除的字節數
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(65535
                            , 0, 2, 0, 2));
                    // LengthFieldPrepender編碼器,它能夠計算當前待發送消息的二進制字節長度,將該長度添加到ByteBuf的緩衝區頭中
                    pipeline.addLast(new LengthFieldPrepender(2));
                    // 序列化工具
                    pipeline.addLast(new ObjectCodec());
                    pipeline.addLast(channelHandlerAdapter);
                }
            });
            ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
ServerChannelHandlerAdapter

這個類主要重寫了ChannelRead方法,在ChannelRead方法中調用了RequestDispatcher類中的dispatcher方法來處理消息。

RequestDispatcher

這個類的做用爲,將ChannelRead方法中讀到的數據(也能夠說命令),經過反射來調用,執行對應方法,將執行後的結果寫回通道,供客戶端使用。

/**
     * 分發命令
     *
     * @param channelHandlerContext channelHandlerContext
     * @param invokeMeta            invokeMeta
     */
    public void dispatcher(final ChannelHandlerContext channelHandlerContext, final MethodInvokeMeta invokeMeta) {
        ChannelFuture f = null;
        try {
            // 獲取客戶端準備調用的接口類
            Class<?> interfaceClass = invokeMeta.getInterfaceClass();
            // 獲取準備調用的方法名稱
            String name = invokeMeta.getMethodName();
            // 獲取方法對應的參數列表
            Object[] args = invokeMeta.getArgs();
            // 獲取參數類型
            Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
            // 經過Spring獲取對象
            Object targetObject = app.getBean(interfaceClass);
            // 獲取待調用方法
            Method method = targetObject.getClass().getMethod(name, parameterTypes);
            // 執行方法
            Object obj = method.invoke(targetObject, args);
            if (obj == null) {
                // 若是方法結果爲空,將NULL結果寫給客戶端
                f = channelHandlerContext.writeAndFlush(NullWritable.nullWritable());
            } else {
                // 寫給客戶端結果
                f = channelHandlerContext.writeAndFlush(obj);
            }
            // 釋放通道,不是關閉鏈接
            f.addListener(ChannelFutureListener.CLOSE);
        } catch (Exception e) {
            // 出現異常後的處理
            f = channelHandlerContext.writeAndFlush(e.getMessage());
        } finally {
            if (f != null) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

使用方法

  1. 啓動ServerApplication
  2. 啓動ClientApplication
  3. 打開Chrome瀏覽器,輸入:http://localhost:8080/client/get/name 或者 http://localhost:8080/client/get/info 便可看到結果。

若是想整合到現有項目中,請直接留言或者聯繫做者,這次並無提供集成版本,但若是此篇文章已理解,那麼本身能夠手動的去集成到本身的項目中。基於Netty的RPC簡易實現

代碼地址以下:
http://www.demodashi.com/demo/13448.html

注:本文著做權歸做者,由demo大師代發,拒絕轉載,轉載須要做者受權

相關文章
相關標籤/搜索