本節通過案例介紹 Spring Boot 與 Netty 的集成。
第一步:新建 Spring Initializr 項目。
我這裏選擇 Gradle 項目,也可選擇 Maven 項目。
(注意:最好選擇自己下載的 Gradle,如下圖)
然後修改 build.gradle 文件,加入依賴(需要安裝 Lombok 插件):
// Build script for the Spring Boot + Netty demo.
plugins {
    id 'org.springframework.boot' version '2.1.5.RELEASE'
    id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'com.spring.netty'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
targetCompatibility = '1.8'

repositories {
    mavenCentral()
}

dependencies {
    // Libraries the application needs at compile time and at runtime.
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'io.netty:netty-all:4.1.36.Final'

    // Lombok only generates code during compilation. It must be on the
    // annotation-processor path for annotations such as @Slf4j to be processed,
    // and it is not needed on the runtime classpath.
    compileOnly 'org.projectlombok:lombok:1.18.8'
    annotationProcessor 'org.projectlombok:lombok:1.18.8'
    testCompileOnly 'org.projectlombok:lombok:1.18.8'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.8'

    // Test-only libraries are kept out of the packaged application.
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'junit:junit:4.12'
}
接下來編寫服務端程序:
package com.spring.netty.springbootnettydemo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Netty TCP server managed as a Spring bean.
 *
 * <p>{@link #start()} binds the listening socket; {@link #destroy()} is invoked by Spring on
 * context shutdown and releases the channel and both event loop groups.
 *
 * @since 2019/05/21
 */
@Component
public class NettyTcpServer {

    private static final Logger log = LoggerFactory.getLogger(NettyTcpServer.class);

    /**
     * Boss event loop group: handles accept events only, so a single thread is configured.
     */
    private final EventLoopGroup boss = new NioEventLoopGroup(1);

    /**
     * Worker event loop group: runs the channel handlers, i.e. the I/O reads and writes.
     * The no-arg constructor leaves the thread count at Netty's default.
     */
    private final EventLoopGroup worker = new NioEventLoopGroup();

    @Autowired
    ServerChannelInitializer serverChannelInitializer;

    // NOTE(review): the listening port is read from "netty.tcp.client.port" while the client
    // bean reads "netty.tcp.server.port" - confirm both keys are meant to hold the same value.
    @Value("${netty.tcp.client.port}")
    private Integer port;

    /** The bound server channel; {@code null} until {@link #start()} has succeeded. */
    private Channel channel;

    /**
     * Connected client channels. Key: client IP, value: the client's {@link Channel}.
     */
    public static final Map<String, Channel> map = new ConcurrentHashMap<String, Channel>();

    /**
     * Starts the Netty TCP server and blocks until the bind attempt has finished.
     *
     * @return the future of the bind operation
     */
    public ChannelFuture start() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)
                // Channel implementation used for the listening socket.
                .channel(NioServerSocketChannel.class)
                // Initializer applied to every accepted child channel.
                .childHandler(serverChannelInitializer)
                // Maximum length of the queue of connections waiting to be accepted.
                .option(ChannelOption.SO_BACKLOG, 1024)
                // Enable TCP keep-alive probes on accepted connections.
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        // Bind the port and wait for the result. syncUninterruptibly() rethrows the cause of a
        // failed bind, so the else branch below is only a defensive guard.
        ChannelFuture bindFuture = serverBootstrap.bind(port).syncUninterruptibly();
        if (bindFuture != null && bindFuture.isSuccess()) {
            channel = bindFuture.channel();
            log.info("Netty tcp server start success, port = {}", port);
        } else {
            log.error("Netty tcp server start fail");
        }
        return bindFuture;
    }

    /**
     * Stops the Netty TCP server: closes the server channel and shuts down both event loop
     * groups, waiting for them to terminate.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt flag is restored and
     * the method returns; both groups have already been asked to shut down at that point.
     */
    @PreDestroy
    public void destroy() {
        if (channel != null) {
            channel.close();
        }

        // Request both shutdowns before waiting on either one, so that an interrupt during the
        // first wait cannot leave the second group running.
        Future<?> workerShutdown = worker.shutdownGracefully();
        Future<?> bossShutdown = boss.shutdownGracefully();

        try {
            boolean workerStopped = awaitShutdown(workerShutdown, "workerGroup");
            boolean bossStopped = awaitShutdown(bossShutdown, "bossGroup");
            if (workerStopped && bossStopped) {
                log.info("Netty tcp server shutdown success");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever is driving the shutdown.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for Netty tcp server shutdown", e);
        }
    }

    /**
     * Waits for one event loop group to terminate and logs the failure cause, if any.
     *
     * @param shutdown  the future returned by {@code shutdownGracefully()}
     * @param groupName name used in the log message
     * @return {@code true} if the group terminated successfully
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static boolean awaitShutdown(Future<?> shutdown, String groupName) throws InterruptedException {
        shutdown.await();
        if (shutdown.isSuccess()) {
            return true;
        }
        // Throwable passed as the last argument so SLF4J prints the stack trace.
        log.error("netty tcp {} shutdown fail", groupName, shutdown.cause());
        return false;
    }
}
package com.spring.netty.springbootnettydemo;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * Channel initializer for accepted connections: installs the handlers of the server pipeline.
 *
 * @since 2019/05/21
 */
@Component
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    ServerChannelHandler serverChannelHandler;

    /**
     * Builds the pipeline of a newly accepted channel.
     *
     * @param socketChannel the channel being initialized
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        final ChannelPipeline handlers = socketChannel.pipeline();
        handlers
                // Idle detection: 15 minutes without a read triggers userEventTriggered().
                .addLast("idleStateHandler", new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES))
                // String decoder and encoder.
                .addLast(new StringDecoder(), new StringEncoder())
                // Application handler.
                .addLast("serverChannelHandler", serverChannelHandler);
    }
}
package com.spring.netty.springbootnettydemo;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

/**
 * Server-side business handler: answers every inbound message, keeps
 * {@link NettyTcpServer#map} in sync with the connected clients and disconnects idle ones.
 *
 * @since 2019/05/21
 */
@Component
@ChannelHandler.Sharable
@Slf4j
public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {

    /**
     * Handles one inbound message and sends the fixed response back.
     *
     * @param ctx the handler context
     * @param msg the received message
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Netty tcp server receive msg : " + msg);
        // This method runs on the channel's event loop, so the write must not be waited on
        // here: blocking on a future that is not yet complete from its own event loop makes
        // Netty throw BlockingOperationException. A failed write is reported to
        // exceptionCaught() through the listener instead.
        ctx.channel().writeAndFlush(" response msg ")
                .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
    }

    /**
     * Called once the connection is established; registers the channel in the channel map.
     *
     * @param ctx the handler context
     * @throws Exception if the superclass implementation fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        log.info("tcp client {} connect success", getRemoteAddress(ctx));
        NettyTcpServer.map.put(getIPString(ctx), ctx.channel());
    }

    /**
     * Called when the connection is lost; removes the channel from the channel map.
     *
     * @param ctx the handler context
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Remove the entry only if it still refers to this channel, so that a newer connection
        // from the same IP is not dropped from the map by an older one closing.
        NettyTcpServer.map.remove(getIPString(ctx), ctx.channel());
        ctx.close();
    }

    /**
     * Logs the failure together with its cause and closes the connection.
     *
     * <p>The exception is handled here and deliberately not forwarded further down the
     * pipeline.
     *
     * @param ctx   the handler context
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("引擎 {} 的通道發生異常,即將斷開鏈接", getRemoteAddress(ctx), cause);
        ctx.close();
    }

    /**
     * Idle handling: disconnects the client when an {@link IdleStateEvent} arrives.
     * Any other user event is passed on unchanged.
     *
     * @param ctx the handler context
     * @param evt the user event
     * @throws Exception if forwarding the event fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        String timeoutLabel = describe(((IdleStateEvent) evt).state());
        if (timeoutLabel == null) {
            return;
        }
        log.info("Client: {} {}", getRemoteAddress(ctx), timeoutLabel);
        ctx.disconnect();
    }

    /**
     * Maps an idle state to the label used in the log message.
     *
     * @param state the idle state reported by the event
     * @return the label, or {@code null} for a state that is not handled
     */
    private static String describe(IdleState state) {
        if (state == IdleState.READER_IDLE) {
            return "READER_IDLE 讀超時";
        }
        if (state == IdleState.WRITER_IDLE) {
            return "WRITER_IDLE 寫超時";
        }
        if (state == IdleState.ALL_IDLE) {
            return "ALL_IDLE 總超時";
        }
        return null;
    }

    /**
     * Returns the textual form of the client's remote address (ip + port).
     *
     * @param ctx the handler context
     * @return the remote address as a string, or an empty string if the channel has none
     */
    public String getRemoteAddress(ChannelHandlerContext ctx) {
        SocketAddress remote = ctx.channel().remoteAddress();
        return remote == null ? "" : remote.toString();
    }

    /**
     * Returns the client's IP address.
     *
     * <p>The value is taken from the {@link InetSocketAddress} itself rather than parsed out of
     * its string form, so IPv6 addresses and addresses that carry a host name are handled too.
     *
     * @param ctx the handler context
     * @return the IP address; the host string if the address is unresolved; the address's
     *         string form if it is not an internet socket address; an empty string if the
     *         channel has no remote address
     */
    public String getIPString(ChannelHandlerContext ctx) {
        SocketAddress remote = ctx.channel().remoteAddress();
        if (remote == null) {
            return "";
        }
        if (remote instanceof InetSocketAddress) {
            InetSocketAddress inetRemote = (InetSocketAddress) remote;
            InetAddress address = inetRemote.getAddress();
            return address != null ? address.getHostAddress() : inetRemote.getHostString();
        }
        return remote.toString();
    }
}
編寫客戶端代碼:
package com.spring.netty.springbootnettydemo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * description: * @since 2019/05/21 **/ @Component public class NettyTcpClient { private static final Logger log = LoggerFactory.getLogger(NettyTcpClient.class); @Value(("${netty.tcp.server.host}")) String HOST; @Value("${netty.tcp.server.port}") int PORT; @Autowired ClientChannelInitializer clientChannelInitializer; //與服務端創建鏈接後獲得的通道對象 private Channel channel; /** * 初始化 `Bootstrap` 客戶端引導程序 * * @return */ private final Bootstrap getBootstrap() { Bootstrap b = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(); b.group(group) .channel(NioSocketChannel.class)//通道鏈接者 .handler(clientChannelInitializer)//通道處理者 .option(ChannelOption.SO_KEEPALIVE, true);//心跳報活 return b; } /** * 創建鏈接,獲取鏈接通道對象 * * @return */ public void connect() { ChannelFuture channelFuture = getBootstrap().connect(HOST, PORT).syncUninterruptibly(); if (channelFuture != null && channelFuture.isSuccess()) { channel = channelFuture.channel(); log.info("connect tcp server host = {}, port = {} success", HOST, PORT); } else { log.error("connect tcp server host = {}, port = {} fail", HOST, PORT); } } /** * 向服務器發送消息 * * @param msg * @throws Exception */ public void sendMsg(Object msg) throws Exception { if (channel != null) { channel.writeAndFlush(msg).sync(); } else { log.warn("消息發送失敗,鏈接還沒有創建!"); } } }
package com.spring.netty.springbootnettydemo;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * Channel initializer for the client connection: installs the handlers of the client pipeline.
 *
 * @since 2019/05/21
 */
@Component
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    ClientChannelHandler clientChannelHandler;

    /**
     * Builds the pipeline of the client channel.
     *
     * @param socketChannel the channel being initialized
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        final ChannelPipeline handlers = socketChannel.pipeline();
        handlers
                // Idle detection: 15 minutes without a read triggers userEventTriggered().
                .addLast("idleStateHandler", new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES))
                // String decoder and encoder.
                .addLast(new StringDecoder(), new StringEncoder())
                // Application handler.
                .addLast("clientChannelHandler", clientChannelHandler);
    }
}
package com.spring.netty.springbootnettydemo;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.stereotype.Component;

/**
 * Client-side business handler: prints every message received from the server.
 *
 * @since 2019/05/21
 */
@Component
@ChannelHandler.Sharable
public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {

    /**
     * Prints one message received from the server to standard output.
     *
     * @param ctx the handler context
     * @param msg the received message
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String line = "Netty tcp client receive msg : " + msg;
        System.out.println(line);
    }
}
最後修改 Spring Boot 啓動程序:
package com.spring.netty.springbootnettydemo; import io.netty.channel.ChannelFuture; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringbootNettyDemoApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(SpringbootNettyDemoApplication.class, args); } @Autowired NettyTcpServer nettyTcpServer; @Autowired NettyTcpClient nettyTcpClient; @Override public void run(String... args) throws Exception { //啓動服務端 ChannelFuture start = nettyTcpServer.start(); //啓動客戶端,發送數據 nettyTcpClient.connect(); for (int i = 0; i < 10; i++) { nettyTcpClient.sendMsg("hello world" + i); } //服務端管道關閉的監聽器並同步阻塞,直到channel關閉,線程纔會往下執行,結束進程 start.channel().closeFuture().syncUninterruptibly(); } }
運行測試效果如下:
本節我們通過一個案例將 Spring Boot 與 Netty 結合,實現了異步通訊;下節我們將詳細介紹 Netty 的相關知識。