最近有朋友向我詢問一些Netty與SpringBoot整合的相關問題,這裏,我就總結了一下基本整合流程,也就是說,這篇文章 ,默認你們是對netty與Spring,SpringMVC的整合是沒有什麼問題的。如今,就進入正題吧。javascript
總的來講,服務端仍是比較簡單的,本身一共寫了三個核心類。分別是java
下面開始集成過程:web
在pom.xml中添加如下依賴spring
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
讓SpringBoot的啓動類實現CommandLineRunner接口並重寫run方法,好比個人啓動類是CloudApplication.javabootstrap
@SpringBootApplication public class CloudApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(CloudApplication.class, args); } @Override public void run(String... strings) { } }
建立類NettyServerListener.java服務器
// 讀取yml的一個配置類 import com.edu.hart.modules.constant.NettyConfig; // Netty鏈接信息配置類 import com.edu.hart.modules.constant.NettyConstant; // import com.edu.hart.rpc.util.ObjectCodec; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import javax.annotation.Resource; /** * 服務啓動監聽器 * * @author 葉雲軒 */ @Component public class NettyServerListener { /** * NettyServerListener 日誌輸出器 * * @author 葉雲軒 create by 2017/10/31 18:05 */ private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class); /** * 建立bootstrap */ ServerBootstrap serverBootstrap = new ServerBootstrap(); /** * BOSS */ EventLoopGroup boss = new NioEventLoopGroup(); /** * Worker */ EventLoopGroup work = new NioEventLoopGroup(); /** * 通道適配器 */ @Resource private ServerChannelHandlerAdapter channelHandlerAdapter; /** * NETT服務器配置類 */ @Resource private NettyConfig nettyConfig; /** * 關閉服務器方法 */ @PreDestroy public void close() { LOGGER.info("關閉服務器...."); //優雅退出 boss.shutdownGracefully(); work.shutdownGracefully(); } /** * 開啓及服務線程 */ public void start() { // 從配置文件中(application.yml)獲取服務端監聽端口號 int port = nettyConfig.getPort(); serverBootstrap.group(boss, work) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)); try { //設置事件處理 serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(nettyConfig.getMaxFrameLength() , 0, 2, 0, 2)); pipeline.addLast(new LengthFieldPrepender(2)); pipeline.addLast(new ObjectCodec()); pipeline.addLast(channelHandlerAdapter); } }); LOGGER.info("netty服務器在[{}]端口啓動監聽", port); ChannelFuture f = serverBootstrap.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { LOGGER.info("[出現異常] 釋放資源"); boss.shutdownGracefully(); work.shutdownGracefully(); } } }
建立類ServerChannelHandlerAdapter.java - 通道適配器markdown
// 記錄調用方法的元信息的類 import com.edu.hart.rpc.entity.MethodInvokeMeta; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 多線程共享 */ @Component @Sharable public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter { /** * 日誌處理 */ private Logger logger = LoggerFactory.getLogger(ServerChannelHandlerAdapter.class); /** * 注入請求分排器 */ @Resource private RequestDispatcher dispatcher; @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { MethodInvokeMeta invokeMeta = (MethodInvokeMeta) msg; // 屏蔽toString()方法 if (invokeMeta.getMethodName().endsWith("toString()") && !"class java.lang.String".equals(invokeMeta.getReturnType().toString())) logger.info("客戶端傳入參數 :{},返回值:{}", invokeMeta.getArgs(), invokeMeta.getReturnType()); dispatcher.dispatcher(ctx, invokeMeta); } }
RequestDispatcher.java數據結構
// 封裝的返回信息枚舉類 import com.edu.hart.modules.communicate.ResponseCodeEnum; // 封裝的返回信息實體類 import com.edu.hart.modules.communicate.ResponseResult; // 封裝的鏈接常量類 import com.edu.hart.modules.constant.NettyConstant; // 記錄元方法信息的實體類 import com.edu.hart.rpc.entity.MethodInvokeMeta; // 對於返回值爲空的一個處理 import com.edu.hart.rpc.entity.NullWritable; // 封裝的返回信息實體工具類 import com.edu.hart.rpc.util.ResponseResultUtil; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 請求分排器 */ @Component public class RequestDispatcher implements ApplicationContextAware { private ExecutorService executorService = Executors.newFixedThreadPool(NettyConstant.getMaxThreads()); private ApplicationContext app; /** * 發送 * * @param ctx * @param invokeMeta */ public void dispatcher(final ChannelHandlerContext ctx, final MethodInvokeMeta invokeMeta) { executorService.submit(() -> { ChannelFuture f = null; try { Class<?> interfaceClass = invokeMeta.getInterfaceClass(); String name = invokeMeta.getMethodName(); Object[] args = invokeMeta.getArgs(); Class<?>[] parameterTypes = invokeMeta.getParameterTypes(); Object targetObject = app.getBean(interfaceClass); Method method = targetObject.getClass().getMethod(name, parameterTypes); Object obj = method.invoke(targetObject, args); if (obj == null) { f = ctx.writeAndFlush(NullWritable.nullWritable()); } else { f = ctx.writeAndFlush(obj); } f.addListener(ChannelFutureListener.CLOSE); } catch (Exception e) { ResponseResult error = ResponseResultUtil.error(ResponseCodeEnum.SERVER_ERROR); f = ctx.writeAndFlush(error); } finally { f.addListener(ChannelFutureListener.CLOSE); } }); } /** * 加載當前application.xml * * @param ctx * @throws BeansException */ public void setApplicationContext(ApplicationContext ctx) throws BeansException { this.app = ctx; } }
application.yml文件中對於netty的一個配置多線程
netty: port: 11111
NettyConfig.javaapp
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * 讀取yml配置文件中的信息 * Created by 葉雲軒 on 2017/10/31 - 18:38 * Concat tdg_yyx@foxmail.com */ @Component @ConfigurationProperties(prefix = "netty") public class NettyConfig { private int port; public int getPort() { return port; } public void setPort(int port) { this.port = port; } }
NettyConstanct.java
import org.springframework.stereotype.Component; /** * Netty服務器常量 * Created by 葉雲軒 on 2017/10/31 - 17:47 * Concat tdg_yyx@foxmail.com */ @Component public class NettyConstant { /** * 最大線程量 */ private static final int MAX_THREADS = 1024; /** * 數據包最大長度 */ private static final int MAX_FRAME_LENGTH = 65535; public static int getMaxFrameLength() { return MAX_FRAME_LENGTH; } public static int getMaxThreads() { return MAX_THREADS; } }
至此,netty服務端算是與SpringBoot整合成功。那麼看一下啓動狀況吧。
Client我感受要比Server端要麻煩一點。這裏仍是先給出核心類吧。
在Client端裏。SpringBoot的啓動類要繼承SpringBootServletInitializer這個類,並覆蓋SpringApplicationBuilder方法
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.web.support.SpringBootServletInitializer; @SpringBootApplication public class OaApplication extends SpringBootServletInitializer { public static void main(String[] args) { SpringApplication.run(OaApplication.class, args); } @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) { return builder.sources(OaApplication.class); } }
NettyClient.java
// 記錄元方法信息的實體類 import com.edu.hart.rpc.entity.MethodInvokeMeta; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.management.MBeanServer; /** * 客戶端發送類 * Created by 葉雲軒 on 2017/6/16-16:58 * Concat tdg_yyx@foxmail.com */ public class NettyClient { private Logger logger = LoggerFactory.getLogger(MBeanServer.class); private Bootstrap bootstrap; private EventLoopGroup worker; private int port; private String url; private int MAX_RETRY_TIMES = 10; public NettyClient(String url, int port) { this.url = url; this.port = port; bootstrap = new Bootstrap(); worker = new NioEventLoopGroup(); bootstrap.group(worker); bootstrap.channel(NioSocketChannel.class); } public void close() { logger.info("關閉資源"); worker.shutdownGracefully(); } public Object remoteCall(final MethodInvokeMeta cmd, int retry) { try { CustomChannelInitializerClient customChannelInitializer = new CustomChannelInitializerClient(cmd); bootstrap.handler(customChannelInitializer); ChannelFuture sync = bootstrap.connect(url, port).sync(); sync.channel().closeFuture().sync(); Object response = customChannelInitializer.getResponse(); return response; } catch (InterruptedException e) { retry++; if (retry > MAX_RETRY_TIMES) { throw new RuntimeException("調用Wrong"); } else { try { Thread.sleep(100); } catch (InterruptedException e1) { e1.printStackTrace(); } logger.info("第{}次嘗試....失敗", retry); return remoteCall(cmd, retry); } } } }
ClientChannelHandlerAdapter.java
import com.edu.hart.rpc.entity.MethodInvokeMeta; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Created by 葉雲軒 on 2017/6/16-17:03 * Concat tdg_yyx@foxmail.com */ public class ClientChannelHandlerAdapter extends ChannelHandlerAdapter { private Logger logger = LoggerFactory.getLogger(ClientChannelHandlerAdapter.class); private MethodInvokeMeta methodInvokeMeta; private CustomChannelInitializerClient channelInitializerClient; public ClientChannelHandlerAdapter(MethodInvokeMeta methodInvokeMeta, CustomChannelInitializerClient channelInitializerClient) { this.methodInvokeMeta = methodInvokeMeta; this.channelInitializerClient = channelInitializerClient; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.info("客戶端出異常了,異常信息:{}", cause.getMessage()); cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (methodInvokeMeta.getMethodName().endsWith("toString") && !"class java.lang.String".equals(methodInvokeMeta.getReturnType().toString())) logger.info("客戶端發送信息參數:{},信息返回值類型:{}", methodInvokeMeta.getArgs(), methodInvokeMeta.getReturnType()); ctx.writeAndFlush(methodInvokeMeta); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { channelInitializerClient.setResponse(msg); } }
CustomChannelInitializerClient.java
import com.edu.hart.rpc.entity.MethodInvokeMeta; import com.edu.hart.rpc.entity.NullWritable; import com.edu.hart.rpc.util.ObjectCodec; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import org.slf4j.Logger; import org.slf4j.LoggerFactory;/**
private Logger logger = LoggerFactory.getLogger(CustomChannelInitializerClient.class); private MethodInvokeMeta methodInvokeMeta; private Object response; public CustomChannelInitializerClient(MethodInvokeMeta methodInvokeMeta) { if (!"toString".equals(methodInvokeMeta.getMethodName())) { logger.info("[CustomChannelInitializerClient] 調用方法名:{},入參:{},參數類型:{},返回值類型{}" , methodInvokeMeta.getMethodName() , methodInvokeMeta.getArgs() , methodInvokeMeta.getParameterTypes() , methodInvokeMeta.getReturnType()); } this.methodInvokeMeta = methodInvokeMeta; } public Object getResponse() { if (response instanceof NullWritable) { return null; } return response; } public void setResponse(Object response) { this.response = response; } @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldPrepender(2)); pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2)); pipeline.addLast(new ObjectCodec()); pipeline.addLast(new ClientChannelHandlerAdapter(methodInvokeMeta, this)); }
}
4. RPCProxyFactoryBean.java
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import com.edu.hart.rpc.util.WrapMethodUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* Created by 葉雲軒 on 2017/6/16-17:16
* Concat tdg_yyx@foxmail.com
*/
public class RPCProxyFactoryBean extends AbstractFactoryBean