Netty高性能架構設計java
簡單瞭解React線程模型,參考文章【五分鐘快速理解 Reactor 模型】react
舉例說明:Reactor的三種線程模型spring
模型特色:shell
IO
模式獲取輸入的數據問題分析:編程
read
操做,形成線程資源浪費。針對傳統阻塞I/O服務模型的2個缺點,解決方案以下:bootstrap
I/O
複用模型:多個鏈接共用一個阻塞對象,應用程序只須要在一個阻塞對象等待,無需阻塞等待全部鏈接。當某個鏈接有新的數據能夠處理時,操做系統通知應用程序,線程從阻塞狀態返回,開始進行業務處理。Reactor
對應的叫法: 1. 反應器模式 2. 分發者模式(Dispatcher
) 3. 通知者模式(notifier
)模型分析api
模型分析緩存
cpu
的處理能力reactor
處理全部的事件的監聽和響應,在單線程運行, 在高併發場景容易出現性能瓶頸.模型分析服務器
public static void main(String[] args) { //建立鏈接線程組,線程數爲1。只負責處理鏈接請求 NioEventLoopGroup boss = new NioEventLoopGroup(1); //建立工做線程組,線程數默認爲cpu核數*2。處理與客戶端的業務處理 NioEventLoopGroup worker = new NioEventLoopGroup(); //建立Server端的啓動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //配置線程組 serverBootstrap.group(boss, worker) //使用 NioServerSocketChannel 做爲服務器的通道實現 .channel(NioServerSocketChannel.class) //給worker線程組初始化處理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() //添加字符串的編解碼器 .addLast(new StringDecoder()) .addLast(new StringEncoder()) //添加對象的編解碼器,ClassResolvers.weakCachingConcurrentResolver設置弱引用WeakReferenceMap緩存類加載器,防止內存溢出 .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))) .addLast(new ObjectEncoder()) //添加自定義的業務處理器 .addLast(new SimpleChannelInboundHandler<Object>() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("客戶端鏈接啦。。。客戶端地址:{}", ctx.channel().remoteAddress()); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { log.info("服務端接收到的數據:{}", o.toString()); //價值1個億的AI代碼 String str = o.toString(); str = str.replace("嗎", ""); str = str.replace("?", "!"); str = str.replace("? ", "! "); channelHandlerContext.writeAndFlush(str); } }); } }); //啓動而且監聽 ChannelFuture channelFuture = serverBootstrap.bind(8888).syncUninterruptibly(); //監聽關閉通道 channelFuture.channel().closeFuture(); }
public static void main(String[] args) { //設置客戶端工做線程 NioEventLoopGroup worker = new NioEventLoopGroup(); //建立客戶端啓動對象 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(worker) //通道鏈接者 .channel(NioSocketChannel.class) //給worker線程組初始化處理器 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() //添加字符串的編解碼器 .addLast(new StringDecoder()) .addLast(new StringEncoder()) //添加對象的編解碼器,ClassResolvers.weakCachingConcurrentResolver設置弱引用WeakReferenceMap緩存類加載器,防止內存溢出 .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))) .addLast(new ObjectEncoder()) //添加自定義的業務處理器 .addLast(new SimpleChannelInboundHandler<Object>() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush("哈哈哈"); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { log.info("客戶端接收到的數據:{}", o.toString()); } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).syncUninterruptibly(); //客戶端須要輸入信息,建立一個掃描器 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); //經過channel發送到服務器端 channel.writeAndFlush(msg + "\r\n"); } channelFuture.channel().closeFuture(); }
快啓動試試看把,不過須要注意的是,得先啓動服務端哦~多線程
好了,接下來就讓咱們進入正題,讓咱們利用咱們所學的知識去實現本身一個簡單的rpc框架吧
簡單說下RPC(Remote Procedure Call)遠程過程調用,簡單的理解是一個節點請求另外一個節點提供的服務。讓兩個服務之間調用就像調用本地方法同樣。
RPC時序圖:
RPC流程:
- 【客戶端】發起調用
- 【客戶端】數據編碼
- 【客戶端】發送編碼後的數據到服務端
- 【服務端】接收客戶端發送的數據
- 【服務端】對數據進行解碼
- 【服務端】處理消息業務並返回結果值
- 【服務端】對結果值編碼
- 【服務端】將編碼後的結果值回傳給客戶端
- 【客戶端】接收結果值
- 【客戶端】解碼結果值
- 【客戶端】處理返回數據業務
<dependencies> <!-- SpringBoot依賴 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- Spring容器上下文 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </dependency> <!-- Spring配置 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- Netty4 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.58.Final</version> </dependency> <!-- 工具 --> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.8</version> </dependency> </dependencies>
自定義消息協議:
/** * @author zc * @date 2021/3/1 17:43 */ @Data @Builder @NoArgsConstructor @AllArgsConstructor public class RpcMessage implements Serializable { private static final long serialVersionUID = 430507739718447406L; /** * interface接口名 */ private String name; /** * 方法名 */ private String methodName; /** * 參數類型 */ private Class<?>[] parTypes; /** * 參數 */ private Object[] pars; /** * 結果值 */ private Object result; }
自定義Rpc註解:
/** * @author zc * @date 2021/3/2 15:36 */ @Target(value = {ElementType.TYPE, ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface RpcServer { }
定義ServerHandle
業務處理器:
/** * Netty Server端Handle處理類,消息體RpcMessage * 實現ApplicationContextAware接口:該接口能夠加載獲取到全部的 spring bean。 * 實現了這個接口的bean,當spring容器初始化的時候,會自動的將ApplicationContext注入進來 * * @author ZC * @date 2021/3/1 22:15 */ @Slf4j @ChannelHandler.Sharable public class ServerHandle extends SimpleChannelInboundHandler<RpcMessage> implements ApplicationContextAware { private Map<String, Object> serviceMap; /** * 在類被Spring容器加載時會自動執行setApplicationAware * * @param applicationContext Spring上下文 * @throws BeansException 異常信息 */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { //從Spring容器中獲取到全部擁有@RpcServer註解的Beans集合,Map<Name(對象類型,對象全路徑名),實例對象> Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RpcServer.class); log.info("被@RpcServer註解加載的Bean: {}", beansWithAnnotation); if (beansWithAnnotation.size() > 0) { Map<String, Object> map = new ConcurrentHashMap<>(16); for (Object o : beansWithAnnotation.values()) { //獲取該實例對象實現的接口Class Class<?> anInterface = o.getClass().getInterfaces()[0]; //獲取該接口類名,做爲Key,實例對象做爲Value map.put(anInterface.getName(), o); } //使用變量接住map serviceMap = map; } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("客戶端鏈接了: {}", ctx.channel().remoteAddress()); super.channelActive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("異常信息"); cause.printStackTrace(); super.exceptionCaught(ctx, cause); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception { log.info("客戶端發送的消息:{}", rpcMessage); //從Map中獲取實例對象 Object service = serviceMap.get(rpcMessage.getName()); //獲取調用方法 Method method = service.getClass().getMethod(rpcMessage.getMethodName(), rpcMessage.getParTypes()); method.setAccessible(true); //反射調用實例對象方法,獲取返回值 Object result = method.invoke(service, rpcMessage.getPars()); rpcMessage.setResult(JSONUtil.toJsonStr(result)); log.info("回給客戶端的消息:{}", rpcMessage); //Netty服務端將數據寫會Channel併發送給客戶端,同時添加一個監聽器,當全部數據包發送完成後,關閉通道 channelHandlerContext.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE); } }
定義NettyServer
端:
/** * Netty服務端 * * @author zc * @date 2021/2/24 13:23 **/ @Slf4j public class NettyServer { /** * server端處理器 */ private final ServerHandle serverHandle; /** * 服務端通道 */ private Channel channel; /** * 構造器 * * @param serverHandle server處理器 */ public NettyServer(ServerHandle serverHandle) { this.serverHandle = serverHandle; } /** * 啓動 * * @param port 啓動端口 */ public void start(int port) { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))) .addLast(new ObjectEncoder()) .addLast(serverHandle); } }); final ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly(); log.info("服務端啓動-端口: {}", port); channel = channelFuture.channel(); channel.closeFuture().syncUninterruptibly(); } catch (Exception e) { boss.shutdownGracefully(); worker.shutdownGracefully(); } } /** * 關閉當前通道 */ public void stop() { channel.close(); } }
自定義rpc配置屬性類:
/** * @author zc * @date 2021/3/4 23:38 */ @Component @ConfigurationProperties(prefix = "netty") @Data public class NettyRpcProperties { private int serverPort; }`
建立Server端啓動配置類:
/** * NettyServer服務端配置類 * * @author zc * @date 2021/3/1 18:24 */ @Slf4j @Configuration @EnableConfigurationProperties(NettyRpcProperties.class) public class ServerBeanConfig { private final NettyRpcProperties nettyRpcProperties; @Autowired public ServerBeanConfig(NettyRpcProperties nettyRpcProperties) { this.nettyRpcProperties = nettyRpcProperties; } /** * 配置ServerHandle * * @return ServerHandle處理類 */ @Bean public ServerHandle serverHandle() { return new ServerHandle(); } /** * 配置NettyServer * * @param handle ServerHandle處理類 * @return NettyServer */ @Bean public NettyServer nettyServer(ServerHandle handle) { NettyServer nettyServer = new NettyServer(handle); // nettyServer.start(nettyRpcProperties.getServerPort()); return nettyServer; } /** * 解決SpringBoot端口沒法監聽問題 */ @Component static class NettyServerStart implements ApplicationRunner { private final NettyServer nettyServer; private final NettyRpcProperties properties; @Autowired NettyServerStart(NettyServer nettyServer, NettyRpcProperties properties) { this.nettyServer = nettyServer; this.properties = properties; } @Override public void run(ApplicationArguments args) throws Exception { log.info("===============ApplicationRunner"); if (nettyServer != null) { nettyServer.start(properties.getServerPort()); } } } }
注入Spring容器
此時有兩種方式讓該配置自動注入Spring容器生效:
自動注入
在resource目錄下建立META-INF目錄,建立spring.factories文件
在該文件裏寫上
org.springframework.boot.autoconfigure.EnableAutoConfiguration=${包路徑:xxx.xxx.xxx}.${配置類:ServerBeanConfig}
配置好以後,在SpringBoot啓動時會自動加載該配置類。
經過註解注入
/** * 自定義SpringBoot啓動註解 * 注入ServerBeanConfig配置類 * * @author ZC * @date 2021/3/1 23:48 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @ImportAutoConfiguration({ServerBeanConfig.class}) public @interface EnableNettyServer { }
建立客戶端處理器`ClientHandle
/** * @author zc * @date 2021/3/2 15:19 */ @Slf4j @ChannelHandler.Sharable public class ClientHandle extends SimpleChannelInboundHandler<RpcMessage> { /** * 定義消息Map,將鏈接通道Channel做爲key,消息返回值做爲value */ private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap; public ClientHandle(ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap) { this.rpcMessageConcurrentMap = rpcMessageConcurrentMap; } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception { log.info("客戶端收到服務端消息:{}", rpcMessage); rpcMessageConcurrentMap.put(channelHandlerContext.channel(), rpcMessage); } }
建立客戶端啓動類NettyClient
/** * @author ZC * @date 2021/3/1 23:30 */ @Slf4j public class NettyClient { private Channel channel; /** * 存放請求編號與響應對象的映射關係 */ private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap = new ConcurrentHashMap<>(); public RpcMessage send(int port, final RpcMessage rpcMessage) { //客戶端須要一個事件循環組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))) .addLast(new ObjectEncoder()) .addLast(new ClientHandle(rpcMessageConcurrentMap)); } }); final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly(); log.info("鏈接服務端成功: " + channelFuture.channel().remoteAddress()); channel = channelFuture.channel(); channel.writeAndFlush(rpcMessage); log.info("發送數據成功:{}", rpcMessage); channel.closeFuture().syncUninterruptibly(); return rpcMessageConcurrentMap.get(channel); } catch (Exception e) { log.error("client exception", e); return null; } finally { group.shutdownGracefully(); //移除請求編號和響應對象直接的映射關係 rpcMessageConcurrentMap.remove(channel); } } public void stop() { channel.close(); } }
定義Netty客戶端Bean後置處理器
/** * Netty客戶端Bean後置處理器 * 實現Spring後置處理器接口:BeanPostProcessor * 在Bean對象在實例化和依賴注入完畢後,在顯示調用初始化方法的先後添加咱們本身的邏輯。注意是Bean實例化完畢後及依賴注入完成後觸發的 * * @author ZC * @date 2021/3/2 23:00 */ @Slf4j public class NettyClientBeanPostProcessor implements BeanPostProcessor { private final NettyClient nettyClient; public NettyClientBeanPostProcessor(NettyClient nettyClient) { this.nettyClient = nettyClient; } /** * 實例化、依賴注入完畢,在調用顯示的初始化以前完成一些定製的初始化任務 * 注意:方法返回值不能爲null * 若是返回null那麼在後續初始化方法將報空指針異常或者經過getBean()方法獲取不到Bean實例對象 * 由於後置處理器從Spring IoC容器中取出bean實例對象沒有再次放回IoC容器中 */ @Override public Object postProcessBeforeInitialization(Object bean, @Nullable String beanName) throws BeansException { //獲取實例Class Class<?> beanClass = bean.getClass(); do { //獲取該類全部字段 Field[] fields = beanClass.getDeclaredFields(); for (Field field : fields) { //判斷該字段是否擁有@RpcServer if (field.getAnnotation(RpcServer.class) != null) { field.setAccessible(true); try { //經過JDK動態代理獲取該類的代理對象 Object o = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class[]{field.getType()}, new ClientInvocationHandle(nettyClient)); //將代理類注入該字段 field.set(bean, o); log.info("建立代理類 ===>>> {}", beanName); } catch (IllegalAccessException e) { log.error(e.getMessage()); } } } } while ((beanClass = beanClass.getSuperclass()) != null); return bean; } /** * 實例化、依賴注入、初始化完畢時執行 * 注意:方法返回值不能爲null * 若是返回null那麼在後續初始化方法將報空指針異常或者經過getBean()方法獲取不到Bean實例對象 * 由於後置處理器從Spring IoC容器中取出bean實例對象沒有再次放回IoC容器中 */ @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // 能夠根據beanName不一樣執行不一樣的處理操做 return bean; } /** * JDK動態代理處理器 */ static class ClientInvocationHandle implements InvocationHandler { private final NettyClient nettyClient; public ClientInvocationHandle(NettyClient nettyClient) { this.nettyClient = nettyClient; } /** * 代理方法調用 * * @param proxy 代理類 * @param method 方法 * @param args 參數 * @return 返回值 */ @Override public Object invoke(Object proxy, Method method, Object[] args) { //組裝Netty參數 RpcMessage rpcMessage = RpcMessage.builder() .name(method.getDeclaringClass().getName()) .methodName(method.getName()) .parTypes(method.getParameterTypes()) .pars(args) .build(); //調用Netty,發送數據 RpcMessage send = nettyClient.send(1111, rpcMessage); log.info("接收到服務端數據:{}, 返回結果值 ====》》》》{}", send, send.getResult()); return send.getResult(); } } }
定義客戶端配置類
/** * @author zc * @date 2021/3/1 18:24 */ @Configuration public class ClientBeanConfig { @Bean public NettyClient nettyClient() { return new NettyClient(); } @Bean public NettyClientBeanPostProcessor nettyClientBeanPostProcessor(NettyClient nettyClient) { return new NettyClientBeanPostProcessor(nettyClient); } }
最後和服務端同樣,注入Spring容器
/** * @author ZC * @date 2021/3/1 23:48 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @ImportAutoConfiguration({ClientBeanConfig.class}) public @interface EnableNettyClient { }
至此咱們的SpringBoot + Netty4的就已經實現了最最簡單的rpc框架模式了;而後咱們就能夠引用咱們本身的rpc依賴了。
最後再執行一下maven命令
mvn install
pom裏啥也沒有。。。
定義一個接口
/** * @author zc * @date 2021/3/1 17:55 */ public interface Test1Api { void test(); void test(int id, String name); String testStr(int id); Object testObj(); }
正常的SpringBoot工程
引入pom
<!-- 自定義rpc依賴 --> <dependency> <groupId>cn.happyloves.rpc</groupId> <artifactId>netty-rpc</artifactId> <version>0.0.1</version> </dependency> <!-- 接口依賴 --> <dependency> <groupId>cn.happyloves.netty.rpc.examples.api</groupId> <artifactId>rpc-api</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
配置屬性
# 應用名稱 spring.application.name=rpc-server # 應用服務 WEB 訪問端口 server.port=8080 netty.server-port=1111
建立一個實體類
/** * @author ZC * @date 2021/3/2 23:59 */ @Data public class Account implements Serializable { private static final long serialVersionUID = 667178018106218163L; private Integer id; private String name; private String username; private String password; }
建立Server實現Test1Api接口
/** * @author ZC * @date 2021/3/2 23:59 */ @Slf4j @Service @RpcServer public class TestServiceImpl implements Test1Api { @Override public void test() { log.info("111111111"); } @Override public void test(int id, String name) { log.info("222222222,{},{}", id, name); } @Override public String testStr(int id) { log.info("33333333333333333,{}", id); return "33333333333333333 " + id; } @Override public Object testObj() { log.info("444444444444444444"); Account account = new Account(); account.setName("張三"); return account; } }
最後在SpringBoot啓動類上加上@EnableNettyServer
/** * @author ZC * @date 2021/3/2 23:55 */ @EnableNettyServer @SpringBootApplication public class RpcServerApplication { public static void main(String[] args) { SpringApplication.run(RpcServerApplication.class, args); } }
引入pom依賴
<dependency> <groupId>cn.happyloves.rpc</groupId> <artifactId>netty-rpc</artifactId> <version>0.0.1</version> </dependency> <dependency> <groupId>cn.happyloves.netty.rpc.examples.api</groupId> <artifactId>rpc-api</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
建立Controller
/** * @author ZC * @date 2021/3/3 0:04 */ @RestController public class ClientController { @RpcServer private Test1Api testServiceImpl; @GetMapping("/test1") public void test() { testServiceImpl.test(); } @GetMapping("/test2") public void test(int id, String name) { testServiceImpl.test(id, name); } @GetMapping("/test3") public String testStr(int id) { return testServiceImpl.testStr(id); } @GetMapping("/test4") public Object testObj() { return testServiceImpl.testObj(); } }
最後在啓動類上加上註解@EnableNettyClient
@EnableNettyClient @SpringBootApplication public class RpcClientApplication { public static void main(String[] args) { SpringApplication.run(RpcClientApplication.class, args); } }
先運行服務端,在運行客戶端,而後在調用客戶端接口就能夠看到服務端可以接收到客戶端發來的消息,而後服務端處理並返回,客戶端接收並返回。。。
至此,一個小demo就完成了。
固然啦,後續還有不少需求須要處理的,比方說當前demo中客戶端每次通訊都須要建立一個實例去鏈接、服務的註冊、客戶端和服務端是同一個應用等等,這個後面再慢慢完善吧
趙小胖我的博客:https://zc.happyloves.cn:4443/wordpress/