【手擼RPC框架】SpringBoot+Netty4實現RPC框架

【手擼RPC框架】SpringBoot+Netty4實現RPC框架

線程模型

Netty高性能架構設計java

簡單瞭解React線程模型,參考文章【五分鐘快速理解 Reactor 模型】react

舉例說明:Reactor的三種線程模型spring

線程模型1:傳統阻塞 I/O 服務模型

模型特色:shell

  • 採用阻塞IO模式獲取輸入的數據
  • 每一個連接都須要獨立的線程完成數據的輸入,業務處理、數據返回。

問題分析:編程

  • 當併發數很大,就會建立大量的線程,佔用很大系統資源
  • 鏈接建立後,若是當前線程暫時沒有數據可讀,該線程會阻塞在read操做,形成線程資源浪費。

線程模型2:Reactor 模式

針對傳統阻塞I/O服務模型的2個缺點,解決方案以下:bootstrap

  • 基於 I/O 複用模型:多個鏈接共用一個阻塞對象,應用程序只須要在一個阻塞對象等待,無需阻塞等待全部鏈接。當某個鏈接有新的數據能夠處理時,操做系統通知應用程序,線程從阻塞狀態返回,開始進行業務處理。Reactor 對應的叫法: 1. 反應器模式 2. 分發者模式(Dispatcher) 3. 通知者模式(notifier)
  • 基於線程池複用線程資源:沒必要再爲每一個鏈接建立線程,將鏈接完成後的業務處理任務分配給線程進行處理,一個線程能夠處理多個鏈接的業務。

單 Reactor 單線程

模型分析api

  • 優勢:模型簡單,沒有多線程、進程通訊、競爭的問題,所有都在一個線程中完成
  • 缺點:性能問題,只有一個線程,沒法徹底發揮多核 CPU 的性能。Handler 在處理某個鏈接上的業務時,整個進程沒法處理其餘鏈接事件,很容易致使性能瓶頸
  • 缺點:可靠性問題,線程意外終止,或者進入死循環,會致使整個系統通訊模塊不可用,不能接收和處理外部消息,形成節點故障
  • 使用場景:客戶端的數量有限,業務處理很是快速,好比 Redis在業務處理的時間複雜度 O(1) 的狀況

單 Reactor 多線程

模型分析緩存

  • 優勢:能夠充分的利用多核cpu 的處理能力
  • 缺點:多線程數據共享和訪問比較複雜, reactor 處理全部的事件的監聽和響應,在單線程運行, 在高併發場景容易出現性能瓶頸.

主從 Reactor 多線程

模型分析服務器

  • 優勢:父線程與子線程的數據交互簡單職責明確,父線程只須要接收新鏈接,子線程完成後續的業務處理。
  • 優勢:父線程與子線程的數據交互簡單,Reactor 主線程只須要把新鏈接傳給子線程,子線程無需返回數據
  • 缺點:編程複雜度較高
  • 結合實例:這種模型在許多項目中普遍使用,包括 Nginx 主從 Reactor 多進程模型,Memcached 主從多線程,Netty 主從多線程模型的支持

先實現簡單的Netty通訊

服務端示例

public static void main(String[] args) {
    //建立鏈接線程組,線程數爲1。只負責處理鏈接請求
    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    //建立工做線程組,線程數默認爲cpu核數*2。處理與客戶端的業務處理
    NioEventLoopGroup worker = new NioEventLoopGroup();
    //建立Server端的啓動對象
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    //配置線程組
    serverBootstrap.group(boss, worker)
        //使用 NioServerSocketChannel 做爲服務器的通道實現
        .channel(NioServerSocketChannel.class)
        //給worker線程組初始化處理器
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline()
                    //添加字符串的編解碼器
                    .addLast(new StringDecoder())
                    .addLast(new StringEncoder())
                    //添加對象的編解碼器,ClassResolvers.weakCachingConcurrentResolver設置弱引用WeakReferenceMap緩存類加載器,防止內存溢出
                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                    .addLast(new ObjectEncoder())
                    //添加自定義的業務處理器
                    .addLast(new SimpleChannelInboundHandler<Object>() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.info("客戶端鏈接啦。。。客戶端地址:{}", ctx.channel().remoteAddress());
                        }
                        @Override
                        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
                            log.info("服務端接收到的數據:{}", o.toString());
                            //價值1個億的AI代碼
                            String str = o.toString();
                            str = str.replace("嗎", "");
                            str = str.replace("?", "!");
                            str = str.replace("? ", "! ");
                            channelHandlerContext.writeAndFlush(str);
                        }
                    });
            }
        });
    //啓動而且監聽
    ChannelFuture channelFuture = serverBootstrap.bind(8888).syncUninterruptibly();
    //監聽關閉通道
    channelFuture.channel().closeFuture();
}

客戶端示例

public static void main(String[] args) {
    //設置客戶端工做線程
    NioEventLoopGroup worker = new NioEventLoopGroup();
    //建立客戶端啓動對象
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(worker)
        //通道鏈接者
        .channel(NioSocketChannel.class)
        //給worker線程組初始化處理器
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline()
                    //添加字符串的編解碼器
                    .addLast(new StringDecoder())
                    .addLast(new StringEncoder())
                    //添加對象的編解碼器,ClassResolvers.weakCachingConcurrentResolver設置弱引用WeakReferenceMap緩存類加載器,防止內存溢出
                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                    .addLast(new ObjectEncoder())
                    //添加自定義的業務處理器
                    .addLast(new SimpleChannelInboundHandler<Object>() {

                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ctx.writeAndFlush("哈哈哈");
                        }

                        @Override
                        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
                            log.info("客戶端接收到的數據:{}", o.toString());
                        }
                    });
            }
        });

    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).syncUninterruptibly();
    //客戶端須要輸入信息,建立一個掃描器
    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNextLine()) {
        String msg = scanner.nextLine();
        //經過channel發送到服務器端
        channel.writeAndFlush(msg + "\r\n");
    }
    channelFuture.channel().closeFuture();
}

快啓動試試看把,不過須要注意的是,得先啓動服務端哦~多線程

SpringBoot + Netty4實現rpc框架

好了,接下來就讓咱們進入正題,讓咱們利用咱們所學的知識去實現本身一個簡單的rpc框架吧

簡單說下RPC(Remote Procedure Call)遠程過程調用,簡單的理解是一個節點請求另外一個節點提供的服務。讓兩個服務之間調用就像調用本地方法同樣。

RPC時序圖:

QQ截圖20210421170511.png

RPC流程:

  1. 【客戶端】發起調用
  2. 【客戶端】數據編碼
  3. 【客戶端】發送編碼後的數據到服務端
  4. 【服務端】接收客戶端發送的數據
  5. 【服務端】對數據進行解碼
  6. 【服務端】處理消息業務並返回結果值
  7. 【服務端】對結果值編碼
  8. 【服務端】將編碼後的結果值回傳給客戶端
  9. 【客戶端】接收結果值
  10. 【客戶端】解碼結果值
  11. 【客戶端】處理返回數據業務

引入依賴

<dependencies>
    <!-- SpringBoot依賴 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring容器上下文 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
    </dependency>
    <!-- Spring配置 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Netty4 -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.58.Final</version>
    </dependency>
    <!-- 工具 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.5.8</version>
    </dependency>
</dependencies>

編寫服務端

自定義消息協議:

/**
 * @author zc
 * @date 2021/3/1 17:43
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcMessage implements Serializable {
    private static final long serialVersionUID = 430507739718447406L;
    /**
     * interface接口名
     */
    private String name;
    /**
     * 方法名
     */
    private String methodName;
    /**
     * 參數類型
     */
    private Class<?>[] parTypes;
    /**
     * 參數
     */
    private Object[] pars;
    /**
     * 結果值
     */
    private Object result;
}

自定義Rpc註解:

/**
 * @author zc
 * @date 2021/3/2 15:36
 */
@Target(value = {ElementType.TYPE, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RpcServer {
}

定義ServerHandle業務處理器:

/**
 * Netty Server端Handle處理類,消息體RpcMessage
 * 實現ApplicationContextAware接口:該接口能夠加載獲取到全部的 spring bean。
 * 實現了這個接口的bean,當spring容器初始化的時候,會自動的將ApplicationContext注入進來
 *
 * @author ZC
 * @date 2021/3/1 22:15
 */
@Slf4j
@ChannelHandler.Sharable
public class ServerHandle extends SimpleChannelInboundHandler<RpcMessage> implements ApplicationContextAware {
    private Map<String, Object> serviceMap;

    /**
     * 在類被Spring容器加載時會自動執行setApplicationAware
     *
     * @param applicationContext Spring上下文
     * @throws BeansException 異常信息
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //從Spring容器中獲取到全部擁有@RpcServer註解的Beans集合,Map<Name(對象類型,對象全路徑名),實例對象>
        Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RpcServer.class);
        log.info("被@RpcServer註解加載的Bean: {}", beansWithAnnotation);
        if (beansWithAnnotation.size() > 0) {
            Map<String, Object> map = new ConcurrentHashMap<>(16);
            for (Object o : beansWithAnnotation.values()) {
                //獲取該實例對象實現的接口Class
                Class<?> anInterface = o.getClass().getInterfaces()[0];
                //獲取該接口類名,做爲Key,實例對象做爲Value
                map.put(anInterface.getName(), o);
            }
            //使用變量接住map
            serviceMap = map;
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客戶端鏈接了: {}", ctx.channel().remoteAddress());
        super.channelActive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("異常信息");
        cause.printStackTrace();
        super.exceptionCaught(ctx, cause);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
        log.info("客戶端發送的消息:{}", rpcMessage);
        //從Map中獲取實例對象
        Object service = serviceMap.get(rpcMessage.getName());
        //獲取調用方法
        Method method = service.getClass().getMethod(rpcMessage.getMethodName(), rpcMessage.getParTypes());
        method.setAccessible(true);
        //反射調用實例對象方法,獲取返回值
        Object result = method.invoke(service, rpcMessage.getPars());
        rpcMessage.setResult(JSONUtil.toJsonStr(result));
        log.info("回給客戶端的消息:{}", rpcMessage);
        //Netty服務端將數據寫會Channel併發送給客戶端,同時添加一個監聽器,當全部數據包發送完成後,關閉通道
        channelHandlerContext.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE);
    }
}

定義NettyServer端:

/**
 * Netty服務端
 *
 * @author zc
 * @date 2021/2/24 13:23
 **/
@Slf4j
public class NettyServer {

    /**
     * server端處理器
     */
    private final ServerHandle serverHandle;
    /**
     * 服務端通道
     */
    private Channel channel;

    /**
     * 構造器
     *
     * @param serverHandle server處理器
     */
    public NettyServer(ServerHandle serverHandle) {
        this.serverHandle = serverHandle;
    }

    /**
     * 啓動
     *
     * @param port 啓動端口
     */
    public void start(int port) {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                                    .addLast(new ObjectEncoder())
                                    .addLast(serverHandle);
                        }
                    });

            final ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
            log.info("服務端啓動-端口: {}", port);
            channel = channelFuture.channel();
            channel.closeFuture().syncUninterruptibly();
        } catch (Exception e) {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    /**
     * 關閉當前通道
     */
    public void stop() {
        channel.close();
    }
}

自定義rpc配置屬性類:

/**
 * @author zc
 * @date 2021/3/4 23:38
 */
@Component
@ConfigurationProperties(prefix = "netty")
@Data
public class NettyRpcProperties {
    private int serverPort;
}`

建立Server端啓動配置類:

/**
 * NettyServer服務端配置類
 *
 * @author zc
 * @date 2021/3/1 18:24
 */
@Slf4j
@Configuration
@EnableConfigurationProperties(NettyRpcProperties.class)
public class ServerBeanConfig {

    private final NettyRpcProperties nettyRpcProperties;

    @Autowired
    public ServerBeanConfig(NettyRpcProperties nettyRpcProperties) {
        this.nettyRpcProperties = nettyRpcProperties;
    }

    /**
     * 配置ServerHandle
     *
     * @return ServerHandle處理類
     */
    @Bean
    public ServerHandle serverHandle() {
        return new ServerHandle();
    }

    /**
     * 配置NettyServer
     *
     * @param handle ServerHandle處理類
     * @return NettyServer
     */
    @Bean
    public NettyServer nettyServer(ServerHandle handle) {
        NettyServer nettyServer = new NettyServer(handle);
//        nettyServer.start(nettyRpcProperties.getServerPort());
        return nettyServer;
    }

    /**
     * 解決SpringBoot端口沒法監聽問題
     */
    @Component
    static class NettyServerStart implements ApplicationRunner {
        private final NettyServer nettyServer;
        private final NettyRpcProperties properties;

        @Autowired
        NettyServerStart(NettyServer nettyServer, NettyRpcProperties properties) {
            this.nettyServer = nettyServer;
            this.properties = properties;
        }

        @Override
        public void run(ApplicationArguments args) throws Exception {
            log.info("===============ApplicationRunner");
            if (nettyServer != null) {
                nettyServer.start(properties.getServerPort());
            }
        }
    }
}

注入Spring容器

此時有兩種方式讓該配置自動注入Spring容器生效:

  1. 自動注入

    在resource目錄下建立META-INF目錄,建立spring.factories文件

    在該文件裏寫上

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=${包路徑:xxx.xxx.xxx}.${配置類:ServerBeanConfig}

    配置好以後,在SpringBoot啓動時會自動加載該配置類。

  2. 經過註解注入

    /**
     * 自定義SpringBoot啓動註解
     * 注入ServerBeanConfig配置類
     *
     * @author ZC
     * @date 2021/3/1 23:48
     */
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @ImportAutoConfiguration({ServerBeanConfig.class})
    public @interface EnableNettyServer {
    }

編寫客戶端


建立客戶端處理器`ClientHandle

/**
 * @author zc
 * @date 2021/3/2 15:19
 */
@Slf4j
@ChannelHandler.Sharable
public class ClientHandle extends SimpleChannelInboundHandler<RpcMessage> {
    /**
     * 定義消息Map,將鏈接通道Channel做爲key,消息返回值做爲value
     */
    private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap;

    public ClientHandle(ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap) {
        this.rpcMessageConcurrentMap = rpcMessageConcurrentMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
        log.info("客戶端收到服務端消息:{}", rpcMessage);
        rpcMessageConcurrentMap.put(channelHandlerContext.channel(), rpcMessage);
    }
}

建立客戶端啓動類NettyClient

/**
 * @author ZC
 * @date 2021/3/1 23:30
 */
@Slf4j
public class NettyClient {

    private Channel channel;
    /**
     * 存放請求編號與響應對象的映射關係
     */
    private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap = new ConcurrentHashMap<>();

    public RpcMessage send(int port, final RpcMessage rpcMessage) {
        //客戶端須要一個事件循環組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                                    .addLast(new ObjectEncoder())
                                    .addLast(new ClientHandle(rpcMessageConcurrentMap));
                        }
                    });
            final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly();
            log.info("鏈接服務端成功: " + channelFuture.channel().remoteAddress());
            channel = channelFuture.channel();
            channel.writeAndFlush(rpcMessage);
            log.info("發送數據成功:{}", rpcMessage);
            channel.closeFuture().syncUninterruptibly();
            return rpcMessageConcurrentMap.get(channel);
        } catch (Exception e) {
            log.error("client exception", e);
            return null;
        } finally {
            group.shutdownGracefully();
            //移除請求編號和響應對象直接的映射關係
            rpcMessageConcurrentMap.remove(channel);
        }
    }

    public void stop() {
        channel.close();
    }
}

定義Netty客戶端Bean後置處理器

/**
 * Netty客戶端Bean後置處理器
 * 實現Spring後置處理器接口:BeanPostProcessor
 * 在Bean對象在實例化和依賴注入完畢後,在顯示調用初始化方法的先後添加咱們本身的邏輯。注意是Bean實例化完畢後及依賴注入完成後觸發的
 *
 * @author ZC
 * @date 2021/3/2 23:00
 */
@Slf4j
public class NettyClientBeanPostProcessor implements BeanPostProcessor {

    private final NettyClient nettyClient;

    public NettyClientBeanPostProcessor(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    /**
     * 實例化、依賴注入完畢,在調用顯示的初始化以前完成一些定製的初始化任務
     * 注意:方法返回值不能爲null
     * 若是返回null那麼在後續初始化方法將報空指針異常或者經過getBean()方法獲取不到Bean實例對象
     * 由於後置處理器從Spring IoC容器中取出bean實例對象沒有再次放回IoC容器中
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, @Nullable String beanName) throws BeansException {
        //獲取實例Class
        Class<?> beanClass = bean.getClass();
        do {
            //獲取該類全部字段
            Field[] fields = beanClass.getDeclaredFields();
            for (Field field : fields) {
                //判斷該字段是否擁有@RpcServer
                if (field.getAnnotation(RpcServer.class) != null) {
                    field.setAccessible(true);
                    try {
                        //經過JDK動態代理獲取該類的代理對象
                        Object o = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class[]{field.getType()}, new ClientInvocationHandle(nettyClient));
                        //將代理類注入該字段
                        field.set(bean, o);
                        log.info("建立代理類 ===>>> {}", beanName);
                    } catch (IllegalAccessException e) {
                        log.error(e.getMessage());
                    }
                }
            }
        } while ((beanClass = beanClass.getSuperclass()) != null);
        return bean;
    }

    /**
     * 實例化、依賴注入、初始化完畢時執行
     * 注意:方法返回值不能爲null
     * 若是返回null那麼在後續初始化方法將報空指針異常或者經過getBean()方法獲取不到Bean實例對象
     * 由於後置處理器從Spring IoC容器中取出bean實例對象沒有再次放回IoC容器中
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 能夠根據beanName不一樣執行不一樣的處理操做
        return bean;
    }

    /**
     * JDK動態代理處理器
     */
    static class ClientInvocationHandle implements InvocationHandler {
        private final NettyClient nettyClient;

        public ClientInvocationHandle(NettyClient nettyClient) {
            this.nettyClient = nettyClient;
        }

        /**
         * 代理方法調用
         *
         * @param proxy  代理類
         * @param method 方法
         * @param args   參數
         * @return 返回值
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            //組裝Netty參數
            RpcMessage rpcMessage = RpcMessage.builder()
                    .name(method.getDeclaringClass().getName())
                    .methodName(method.getName())
                    .parTypes(method.getParameterTypes())
                    .pars(args)
                    .build();
            //調用Netty,發送數據
            RpcMessage send = nettyClient.send(1111, rpcMessage);
            log.info("接收到服務端數據:{}, 返回結果值 ====》》》》{}", send, send.getResult());
            return send.getResult();
        }
    }
}

定義客戶端配置類

/**
 * @author zc
 * @date 2021/3/1 18:24
 */
@Configuration
public class ClientBeanConfig {

    @Bean
    public NettyClient nettyClient() {
        return new NettyClient();
    }

    @Bean
    public NettyClientBeanPostProcessor nettyClientBeanPostProcessor(NettyClient nettyClient) {
        return new NettyClientBeanPostProcessor(nettyClient);
    }
}

最後和服務端同樣,注入Spring容器

/**
 * @author ZC
 * @date 2021/3/1 23:48
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@ImportAutoConfiguration({ClientBeanConfig.class})
public @interface EnableNettyClient {
}

至此咱們的SpringBoot + Netty4的就已經實現了最最簡單的rpc框架模式了;而後咱們就能夠引用咱們本身的rpc依賴了。

最後再執行一下maven命令

mvn install

netty-rpc-examples例子

接口服務

pom裏啥也沒有。。。

定義一個接口

/**
 * @author zc
 * @date 2021/3/1 17:55
 */
public interface Test1Api {

    void test();

    void test(int id, String name);

    String testStr(int id);

    Object testObj();
}

rpc-server服務端

正常的SpringBoot工程

引入pom

<!-- 自定義rpc依賴 -->
<dependency>
    <groupId>cn.happyloves.rpc</groupId>
    <artifactId>netty-rpc</artifactId>
    <version>0.0.1</version>
</dependency>
<!-- 接口依賴 -->
<dependency>
    <groupId>cn.happyloves.netty.rpc.examples.api</groupId>
    <artifactId>rpc-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

配置屬性

# 應用名稱
spring.application.name=rpc-server
# 應用服務 WEB 訪問端口
server.port=8080
netty.server-port=1111

建立一個實體類

/**
 * @author ZC
 * @date 2021/3/2 23:59
 */
@Data
public class Account implements Serializable {
    private static final long serialVersionUID = 667178018106218163L;
    private Integer id;

    private String name;
    private String username;
    private String password;
}

建立Server實現Test1Api接口

/**
 * @author ZC
 * @date 2021/3/2 23:59
 */
@Slf4j
@Service
@RpcServer
public class TestServiceImpl implements Test1Api {
    @Override
    public void test() {
        log.info("111111111");
    }

    @Override
    public void test(int id, String name) {
        log.info("222222222,{},{}", id, name);
    }

    @Override
    public String testStr(int id) {
        log.info("33333333333333333,{}", id);
        return "33333333333333333 " + id;
    }

    @Override
    public Object testObj() {
        log.info("444444444444444444");
        Account account = new Account();
        account.setName("張三");
        return account;
    }
}

最後在SpringBoot啓動類上加上@EnableNettyServer

/**
 * @author ZC
 * @date 2021/3/2 23:55
 */
@EnableNettyServer
@SpringBootApplication
public class RpcServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(RpcServerApplication.class, args);
    }
}

rpc-server客戶端

引入pom依賴

<dependency>
    <groupId>cn.happyloves.rpc</groupId>
    <artifactId>netty-rpc</artifactId>
    <version>0.0.1</version>
</dependency>
<dependency>
    <groupId>cn.happyloves.netty.rpc.examples.api</groupId>
    <artifactId>rpc-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

建立Controller

/**
 * @author ZC
 * @date 2021/3/3 0:04
 */
@RestController
public class ClientController {
    @RpcServer
    private Test1Api testServiceImpl;

    @GetMapping("/test1")
    public void test() {
        testServiceImpl.test();
    }

    @GetMapping("/test2")
    public void test(int id, String name) {
        testServiceImpl.test(id, name);
    }

    @GetMapping("/test3")
    public String testStr(int id) {
        return testServiceImpl.testStr(id);
    }

    @GetMapping("/test4")
    public Object testObj() {
        return testServiceImpl.testObj();
    }
}

最後在啓動類上加上註解@EnableNettyClient

@EnableNettyClient
@SpringBootApplication
public class RpcClientApplication {
    public static void main(String[] args) {
        SpringApplication.run(RpcClientApplication.class, args);
    }
}

先運行服務端,在運行客戶端,而後在調用客戶端接口就能夠看到服務端可以接收到客戶端發來的消息,而後服務端處理並返回,客戶端接收並返回。。。

至此,一個小demo就完成了。

固然啦,後續還有不少需求須要處理的,比方說當前demo中客戶端每次通訊都須要建立一個實例去鏈接、服務的註冊、客戶端和服務端是同一個應用等等,這個後面再慢慢完善吧
趙小胖我的博客:https://zc.happyloves.cn:4443/wordpress/

相關文章
相關標籤/搜索