什麼是Netty框架?html
爲何要用Netty框架?java
怎麼用Netty框架?spring
Netty 是一個廣受歡迎的異步事件驅動的Java開源網絡應用程序框架,用於快速開發可維護的高性能協議服務器和客戶端。apache
Netty 是由 JBOSS 提供的一個 Java 開源框架。Netty 提供異步的、基於事件驅動的網絡應用程序框架,用以快速開發高性能、高可靠性的網絡 IO 程序。編程
Netty 是一個基於 NIO 的網絡編程框架,使用 Netty 能夠幫助你快速、簡單的開發出一個網絡應用,至關於簡化和流程化了 NIO 的開發過程。bootstrap
做爲當前最流行的 NIO 框架,Netty 在互聯網領域、大數據分佈式計算領域、遊戲行業、通訊行業等得到了普遍的應用,知名的 Elasticsearch 、Dubbo 框架內部都採用了 Netty。api
由於Netty 對 JDK 自帶的 NIO 的 API 進行了封裝,解決了JDK 原生 NIO 程序的問題。緩存
JDK 原生 NIO 程序的問題:安全
JDK 原生也有一套網絡應用程序 API,可是存在一系列問題,主要以下:服務器
1)NIO 的類庫和 API 繁雜,使用麻煩:你須要熟練掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
2)須要具有其餘的額外技能作鋪墊:例如熟悉 Java 多線程編程,由於 NIO 編程涉及到 Reactor 模式,你必須對多線程和網路編程很是熟悉,才能編寫出高質量的 NIO 程序。
3)可靠性能力補齊,開發工做量和難度都很是大:例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流的處理等等。NIO 編程的特色是功能開發相對容易,可是可靠性能力補齊工做量和難度都很是大。
4)JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它會致使 Selector 空輪詢,最終致使 CPU 100%。官方聲稱在 JDK 1.6 版本的 update 18 修復了該問題,可是直到 JDK 1.7 版本該問題仍舊存在,只不過該 Bug 發生機率下降了一些而已,它並無被根本解決。
Netty的主要特色有:
1)設計優雅:適用於各類傳輸類型的統一 API 阻塞和非阻塞 Socket;基於靈活且可擴展的事件模型,能夠清晰地分離關注點;高度可定製的線程模型 - 單線程,一個或多個線程池;真正的無鏈接數據報套接字支持(自 3.1 起)。
2)使用方便:詳細記錄的 Javadoc,用戶指南和示例;沒有其餘依賴項,JDK 5(Netty 3.x)或 6(Netty 4.x)就足夠了。
3)高性能、吞吐量更高:延遲更低;減小資源消耗;最小化沒必要要的內存複製。
4)安全:完整的 SSL/TLS 和 StartTLS 支持。
5)社區活躍、不斷更新:社區活躍,版本迭代週期短,發現的 Bug 能夠被及時修復,同時,更多的新功能會被加入。
Netty 常見的使用場景以下:
1)互聯網行業:在分佈式系統中,各個節點之間須要遠程服務調用,高性能的 RPC 框架必不可少,Netty 做爲異步高性能的通訊框架,每每做爲基礎通訊組件被這些 RPC 框架使用。典型的應用有:阿里分佈式服務框架 Dubbo 的 RPC 框架使用 Dubbo 協議進行節點間通訊,Dubbo 協議默認使用 Netty 做爲基礎通訊組件,用於實現各進程節點之間的內部通訊。
2)遊戲行業:不管是手遊服務端仍是大型的網絡遊戲,Java 語言獲得了愈來愈普遍的應用。Netty 做爲高性能的基礎通訊組件,它自己提供了 TCP/UDP 和 HTTP 協議棧。
很是方便定製和開發私有協議棧,帳號登陸服務器,地圖服務器之間能夠方便的經過 Netty 進行高性能的通訊。
3)大數據領域:經典的 Hadoop 的高性能通訊和序列化組件 Avro 的 RPC 框架,默認採用 Netty 進行跨界點通訊,它的 Netty Service 基於 Netty 框架二次封裝實現。
有興趣的讀者能夠了解一下目前有哪些開源項目使用了 Netty的Related Projects。
可參考學習 netty 官方API: http://netty.io/4.1/api/index.html
配置 pom.xml
1 <dependency> 2 <groupId>io.netty</groupId> 3 <artifactId>netty-all</artifactId> 4 <version>4.1.31.Final</version> 5 </dependency>
配置 ServerConnection.java
package com.example.demo.net; import java.net.InetSocketAddress; import org.apache.log4j.Logger; import com.example.demo.handler.ServerMsgHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class ServerConnection { Logger logger = Logger.getLogger(ServerConnection.class); private final int port ; private EventLoopGroup bossGroup ; private EventLoopGroup workerGroup ; public ServerConnection(int port) { this.port = port ; } /*** * NioEventLoopGroup 是用來處理I/O操做的多線程事件循環器, * Netty提供了許多不一樣的EventLoopGroup的實現用來處理不一樣傳輸協議。 在這個例子中咱們實現了一個服務端的應用, * 所以會有2個NioEventLoopGroup會被使用。 第一個常常被叫作‘boss’,用來接收進來的鏈接。 * 第二個常常被叫作‘worker’,用來處理已經被接收的鏈接, 一旦‘boss’接收到鏈接,就會把鏈接信息註冊到‘worker’上。 * 如何知道多少個線程已經被使用,如何映射到已經建立的Channels上都須要依賴於EventLoopGroup的實現, * 而且能夠經過構造函數來配置他們的關係。 */ public void run() { System.out.println("啓動服務端Netty鏈接"); bossGroup = new NioEventLoopGroup() ; workerGroup = new NioEventLoopGroup() ; /** * ServerBootstrap 是一個服務端啓動NIO服務的輔助啓動類 , 能夠在這個服務中直接使用Channel */ ServerBootstrap bootstrap = new ServerBootstrap() ; /** * 這一步是必須的,若是沒有設置group將會報java.lang.IllegalStateException: group not set異常 */ bootstrap = bootstrap.group(bossGroup, workerGroup) ; /*** * ServerSocketChannel以NIO的selector爲基礎進行實現的,用來接收新的鏈接 * 這裏告訴Channel如何獲取新的鏈接. */ bootstrap = bootstrap.channel(NioServerSocketChannel.class) ; /*** * 綁定端口 */ bootstrap = bootstrap.localAddress(new InetSocketAddress(port)) ; /*** * 你能夠設置這裏指定的通道實現的配置參數。 咱們正在寫一個TCP/IP的服務端, * 所以咱們被容許設置socket的參數選項好比tcpNoDelay和keepAlive。 * 請參考ChannelOption和詳細的ChannelConfig實現的接口文檔以此能夠對ChannelOptions的有一個大概的認識。 */ bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, 128) ; /*** * option()是提供給NioServerSocketChannel用來接收進來的鏈接。 * childOption()是提供給由父管道ServerChannel接收到的鏈接, * 在這個例子中也是NioServerSocketChannel。 */ bootstrap = bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true) ; /*** * 這裏的事件處理類常常會被用來處理一個最近的已經接收的Channel。 ChannelInitializer是一個特殊的處理類, * 目的是幫助使用者配置一個新的Channel。 * 也許你想經過增長一些處理類好比NettyServerHandler來配置一個新的Channel * 或者其對應的ChannelPipeline來實現你的網絡程序。 當你的程序變的複雜時,可能你會增長更多的處理類到pipline上, * 而後提取這些匿名類到最頂層的類上。 */ bootstrap = bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline() ; pipeline.addLast("decoder", new StringDecoder()) ; pipeline.addLast("encoder", new StringEncoder()) ; pipeline.addLast("handler", new ServerMsgHandler()) ; } }) ; bootstrap.bind().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if ( future.isSuccess() ) { System.out.println("服務端開始監聽") ; // logger.info("服務端開始監聽") ; }else { logger.error("服務端沒法使用監聽端口",future.cause()) ; } } }) ; } public void shutdown() { logger.info("關閉 Server 端口"); bossGroup.shutdownGracefully() ; workerGroup.shutdownGracefully() ; } }
配置 ServerMsgHandler.java
package com.example.demo.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ServerMsgHandler extends ChannelInboundHandlerAdapter { /** * 這裏咱們覆蓋了chanelRead()事件處理方法。 每當從客戶端收到新的數據時, 這個方法會在收到消息時被調用, * 這個例子中,收到的消息的類型是ByteBuf * * @param ctx * 通道處理的上下文信息 * @param msg * 接收的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服務端接收的消息:"+msg.toString()) ; //向客戶端發送消息 String str = msg.toString() ; if ( "高性能NIO框架——Netty".equals(str) ) { ctx.writeAndFlush( "客戶端 , 你好!") ; } // ctx.writeAndFlush(msg.toString()+"你好!") ; } /*** * 這個方法會在發生異常時觸發 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { /** * exceptionCaught() 事件處理方法是當出現 Throwable 對象纔會被調用,即當 Netty 因爲 IO * 錯誤或者處理器在處理事件時拋出的異常時。在大部分狀況下,捕獲的異常應該被記錄下來 而且把關聯的 channel * 給關閉掉。然而這個方法的處理方式會在遇到不一樣異常的狀況下有不 同的實現,好比你可能想在關閉鏈接以前發送一個錯誤碼的響應消息。 */ // 出現異常就關閉 cause.printStackTrace() ; ctx.close() ; } }
配置 ClientConnection.java
package com.example.demo.net; import java.net.InetSocketAddress; import org.apache.log4j.Logger; import com.example.demo.handler.ClientMsgHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class ClientConnection { Logger logger = Logger.getLogger(ClientConnection.class); private final int port ; private EventLoopGroup bossGroup ; private EventLoopGroup workerGroup ; private Channel channel ; public ClientConnection(int port) { this.port = port ; } public Channel getChannel() { return this.channel ; } /*** * NioEventLoopGroup 是用來處理I/O操做的多線程事件循環器, * Netty提供了許多不一樣的EventLoopGroup的實現用來處理不一樣傳輸協議。 在這個例子中咱們實現了一個服務端的應用, * 所以會有2個NioEventLoopGroup會被使用。 * 第一個常常被叫作‘boss’,用來接收進來的鏈接。 * 第二個常常被叫作‘worker’,用來處理已經被接收的鏈接, 一旦‘boss’接收到鏈接,就會把鏈接信息註冊到‘worker’上。 * 如何知道多少個線程已經被使用,如何映射到已經建立的Channels上都須要依賴於EventLoopGroup的實現, * 而且能夠經過構造函數來配置他們的關係。 * @throws InterruptedException */ public void run() { System.out.println("啓動客戶端Netty鏈接"); bossGroup = new NioEventLoopGroup() ; workerGroup = new NioEventLoopGroup() ; /** * Bootstrap 是客戶端一個啓動NIO服務的輔助啓動類 , 能夠在這個服務中直接使用Channel */ Bootstrap bootstrap = new Bootstrap() ; // ServerBootstrap bootstrap = new ServerBootstrap() ; /** * 這一步是必須的,若是沒有設置group將會報java.lang.IllegalStateException: group not set異常 */ // bootstrap.group(bossGroup, workerGroup) bootstrap.group(bossGroup) /*** * ServerSocketChannel以NIO的selector爲基礎進行實現的,用來接收新的鏈接 * 這裏告訴Channel如何獲取新的鏈接. */ // .channel(NioServerSocketChannel.class) .channel(NioSocketChannel.class) /*** * 綁定端口,等價於bootstrap.bind("127.0.0.1", port) ,若下面用了,就要把這個註釋掉,否則會報錯 */ // .localAddress(new InetSocketAddress(port)) .remoteAddress("127.0.0.1", port) /*** * 你能夠設置這裏指定的通道實現的配置參數。 咱們正在寫一個TCP/IP的服務端, * 所以咱們被容許設置socket的參數選項好比tcpNoDelay和keepAlive。 * 請參考ChannelOption和詳細的ChannelConfig實現的接口文檔以此能夠對ChannelOptions的有一個大概的認識。 */ // .option(ChannelOption.SO_BACKLOG, 128) /*** * option()是提供給NioServerSocketChannel用來接收進來的鏈接。 * childOption()是提供給由父管道ServerChannel接收到的鏈接, * 在這個例子中也是NioServerSocketChannel。 */ // .childOption(ChannelOption.SO_KEEPALIVE, true) /*** * 這裏的事件處理類常常會被用來處理一個最近的已經接收的Channel。 ChannelInitializer是一個特殊的處理類, * 目的是幫助使用者配置一個新的Channel。 * 也許你想經過增長一些處理類好比NettyServerHandler來配置一個新的Channel * 或者其對應的ChannelPipeline來實現你的網絡程序。 當你的程序變的複雜時,可能你會增長更多的處理類到pipline上, * 而後提取這些匿名類到最頂層的類上。 */ // .childHandler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline() ; pipeline.addLast("decoder", new StringDecoder()) ; pipeline.addLast("encoder", new StringEncoder()) ; // pipeline.addLast("handler", new ClientMsgHandler()) ; pipeline.addLast(new ClientMsgHandler()) ; } }) ; /* bootstrap.bind().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if ( future.isSuccess() ) { System.out.println("客戶端開始監聽") ; logger.info("客戶端開始監聽") ; }else { logger.error("客戶端沒法使用監聽端口",future.cause()) ; } } }) ; */ /* //綁定端口,開始接收進來的鏈接 ChannelFuture cFuture; try { // cFuture = bootstrap.connect(host, port).sync(); // cFuture = bootstrap.bind("127.0.0.1", port).sync(); cFuture = bootstrap.bind().sync() ; //在這裏拿到這個channel,是爲了 等下 測試消息發送 用的 channel = cFuture.channel(); } catch (InterruptedException e) { e.printStackTrace(); } */ ChannelFuture cf; try { cf = bootstrap.connect().sync(); channel = cf.channel(); channel.writeAndFlush("ClientConnection客戶端已成功啓動!"); cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if ( future.isSuccess() ) { System.out.println("客戶端開始監聽") ; // logger.info("客戶端開始監聽") ; }else { // logger.error("客戶端沒法使用監聽端口",future.cause()) ; } } }) ; } catch (InterruptedException e) { e.printStackTrace(); } } public void shutdown() { System.out.println("關閉 Client 端口"); logger.info("關閉 Client 端口"); bossGroup.shutdownGracefully() ; workerGroup.shutdownGracefully() ; } }
配置 ClientMsgHandler.java
package com.example.demo.handler; import org.apache.log4j.Logger; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ClientMsgHandler extends ChannelInboundHandlerAdapter { Logger logger = Logger.getLogger(ClientMsgHandler.class) ; /** * 這裏咱們覆蓋了chanelRead()事件處理方法。 每當從客戶端收到新的數據時, 這個方法會在收到消息時被調用, * 這個例子中,收到的消息的類型是ByteBuf * * @param ctx * 通道處理的上下文信息 * @param msg * 接收的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // super.channelRead(ctx, msg); System.out.println("客戶端接收的消息:"+msg.toString()) ; //向服務端發送消息 ctx.writeAndFlush("服務端 , 你好!") ; } /*** * 這個方法會在發生異常時觸發 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { /** * exceptionCaught() 事件處理方法是當出現 Throwable 對象纔會被調用,即當 Netty 因爲 IO * 錯誤或者處理器在處理事件時拋出的異常時。在大部分狀況下,捕獲的異常應該被記錄下來 而且把關聯的 channel * 給關閉掉。然而這個方法的處理方式會在遇到不一樣異常的狀況下有不 同的實現,好比你可能想在關閉鏈接以前發送一個錯誤碼的響應消息。 */ // 出現異常就關閉 cause.printStackTrace() ; ctx.close() ; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // super.channelActive(ctx); // logger.info("client channel active"); System.out.println("client channel active"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("client channel inactive"); ctx.close() ; } }
編寫測試類 DemoTest.java
1 package com.example.demo.netty; 2 3 import org.junit.runner.RunWith; 4 import org.springframework.boot.test.context.SpringBootTest; 5 import org.springframework.test.context.ActiveProfiles; 6 import org.springframework.test.context.junit4.SpringRunner; 7 8 import com.example.demo.DemoApplicationTests; 9 import com.example.demo.net.ClientConnection; 10 import com.example.demo.net.ServerConnection; 11 12 import io.netty.channel.Channel; 13 14 @RunWith(SpringRunner.class) 15 @SpringBootTest(classes = DemoApplicationTests.class) 16 @ActiveProfiles("test") 17 public class DemoTest { 18 19 public static void main(String[] args) { 20 int port = 2222 ; 21 Thread serverThread = new Thread( new Runnable() { 22 @Override 23 public void run() { 24 new ServerConnection(port).run() ; 25 } 26 } ) ; 27 serverThread.start() ; 28 ClientConnection clientConnection = new ClientConnection(port) ; 29 // Thread clientThread = new Thread( new Runnable() { 30 // @Override 31 // public void run() { 32 // new ClientConnection(port).run() ; 33 // } 34 // } ) ; 35 // clientThread.start() ; 36 37 clientConnection.run(); 38 Channel channel = clientConnection.getChannel() ; 39 channel.writeAndFlush("高性能NIO框架——Netty"); 40 41 // new Thread( ()->{ 42 // new ServerConnection(port) ; 43 // } ).start() ; 44 // new Thread( ()->{ 45 // new ClientConnection(port) ; 46 // } ).start() ; 47 48 } 49 }
服務端與客戶端的區別:
1. 在客戶端只建立了一個NioEventLoopGroup實例,由於客戶端並不須要使用I/O多路複用模型,須要有一個Reactor來接受請求。只須要單純的讀寫數據便可
2. 在客戶端只須要建立一個Bootstrap對象,它是客戶端輔助啓動類,功能相似於ServerBootstrap。
共同窗習,共同進步,如有補充,歡迎指出,謝謝!