Netty是一個Java的開源框架。提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。java
Netty是一個NIO客戶端,服務端框架。容許快速簡單的開發網絡應用程序。例如:服務端和客戶端之間的協議,它簡化了網絡編程規範。算法
一、NIO類庫和API複雜,使用麻煩。編程
二、須要具有Java多線程編程能力(涉及到Reactor模式)。後端
三、客戶端斷線重連、網絡不穩定、半包讀寫、失敗緩存、網絡阻塞和異常碼流等問題處理難度很是大數組
四、存在部分BUG緩存
NIO進行服務器開發的步驟:安全
一、建立ServerSocketChannel,配置爲非阻塞模式;服務器
二、綁定監聽,配置TCP參數;網絡
三、建立一個獨立的IO線程,用於輪詢多路複用器Selector;多線程
四、建立Selector,將以前建立的ServerSocketChannel註冊到Selector上,監聽Accept事件;
五、啓動IO線程,在循環中執行Select.select()方法,輪詢就緒的Channel;
六、當輪詢處處於就緒狀態的Channel時,須要對其進行判斷,若是是OP_ACCEPT狀態,說明有新的客戶端接入,則調用ServerSocketChannel.accept()方法接受新的客戶端;
七、設置新接入的客戶端鏈路SocketChannel爲非阻塞模式,配置TCP參數;
八、將SocketChannel註冊到Selector上,監聽READ事件;
九、若是輪詢的Channel爲OP_READ,則說明SocketChannel中有新的準備就緒的數據包須要讀取,則構造ByteBuffer對象,讀取數據包;
十、若是輪詢的Channel爲OP_WRITE,則說明還有數據沒有發送完成,須要繼續發送。
一、API使用簡單,開發門檻低;
二、功能強大,預置了多種編解碼功能,支持多種主流協議;
三、定製功能強,能夠經過ChannelHandler對通訊框架進行靈活的擴展;
四、性能高,經過與其餘業界主流的NIO框架對比,Netty綜合性能最優;
五、成熟、穩定,Netty修復了已經發現的NIO全部BUG;
六、社區活躍;
七、經歷了不少商用項目的考驗。
/** * 服務端 */ public class TimeServer { public static void main(String[] args) throws Exception { int port=8080; //服務端默認端口 new TimeServer().bind(port); } public void bind(int port) throws Exception{ //1用於服務端接受客戶端的鏈接 EventLoopGroup acceptorGroup = new NioEventLoopGroup(); //2用於進行SocketChannel的網絡讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Netty用於啓動NIO服務器的輔助啓動類 ServerBootstrap sb = new ServerBootstrap(); //將兩個NIO線程組傳入輔助啓動類中 sb.group(acceptorGroup, workerGroup) //設置建立的Channel爲NioServerSocketChannel類型 .channel(NioServerSocketChannel.class) //配置NioServerSocketChannel的TCP參數 .option(ChannelOption.SO_BACKLOG, 1024) //設置綁定IO事件的處理類 .childHandler(new ChannelInitializer<SocketChannel>() { //建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件 @Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new TimeServerHandler()); } }); //綁定端口,同步等待成功(sync():同步阻塞方法,等待bind操做完成才繼續) //ChannelFuture主要用於異步操做的通知回調 ChannelFuture cf = sb.bind(port).sync(); System.out.println("服務端啓動在8080端口。"); //等待服務端監聽端口關閉 cf.channel().closeFuture().sync(); } finally { //優雅退出,釋放線程池資源 acceptorGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
/** * 服務端channel */ public class TimeServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; //buf.readableBytes():獲取緩衝區中可讀的字節數; //根據可讀字節數建立數組 byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); //將待發送的消息放到發送緩存數組中 ctx.writeAndFlush(resp); } }
/** * 客戶端 */ public class TimeClient { public static void main(String[] args) throws Exception { int port=8080; //服務端默認端口 new TimeClient().connect(port, "127.0.0.1"); } public void connect(int port, String host) throws Exception{ //配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bs = new Bootstrap(); bs.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override //建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件 protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new TimeClientHandler()); } }); //發起異步鏈接操做 ChannelFuture cf = bs.connect(host, port).sync(); //等待客戶端鏈路關閉 cf.channel().closeFuture().sync(); } finally { //優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } }
/** * 客戶端channel */ public class TimeClientHandler extends ChannelHandlerAdapter { @Override //向服務器發送指令 public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 1; i++) { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuf firstMessage = Unpooled.buffer(req.length); firstMessage.writeBytes(req); ctx.writeAndFlush(firstMessage); } } @Override //接收服務器的響應 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; //buf.readableBytes():獲取緩衝區中可讀的字節數; //根據可讀字節數建立數組 byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("Now is : "+body); } @Override //異常處理 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //釋放資源 ctx.close(); } }
TCP是一個「流」協議,所謂流,就是沒有界限的一串數據。能夠想象爲河流中的水,並無分界線。TCP底層並不瞭解上層業務數據的具體含義,它會根據TCP緩衝區的實際狀況進行包的劃分,因此在業務上認爲,一個完整的包可能會被TCP拆分紅多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題。
TCP粘包拆包問題示例圖:
假設客戶端分別發送了兩個數據包D1和D2給服務端,因爲服務端一次讀取到的字節數是不肯定的,可能存在如下4種狀況。
一、服務端分兩次讀取到了兩個獨立的數據包,分別是D1和D2,沒有粘包和拆包;
二、服務端一次接收到了兩個數據包,D1和D2粘合在一塊兒,被稱爲TCP粘包;
三、服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的D1包和D2包的部份內容,第二次讀取到了D2包的剩餘部份內容,這被稱爲TCP拆包;
四、服務端分兩次讀取到了兩個數據包,第一次讀取到了D1包的部份內容D1_1,第二次讀取到了D1包的剩餘內容D1_1和D2包的完整內容;
若是此時服務器TCP接收滑窗很是小,而數據包D1和D2比較大,頗有可能發生第五種狀況,既服務端分屢次才能將D1和D2包接收徹底,期間發生屢次拆包;
問題的解決策略
因爲底層的TCP沒法理解上層的業務數據,因此在底層是沒法保證數據包不被拆分和重組的,這個問題只能經過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案可概括以下:
一、消息定長,例如每一個報文的大小爲固定長度200字節,若是不夠,空位補空格;
二、在包尾增長回車換行符進行分割,例如FTP協議;
三、將消息分爲消息頭和消息體,消息頭中包含消息總長度(或消息體總長度)的字段,一般設計思路爲消息頭的第一個字段使用int32來表示消息的總程度;
四、更復雜的應用層協議;
LineBasedFrameDecoder
爲了解決TCP粘包/拆包致使的半包讀寫問題,Netty默認提供了多種編解碼器用於處理半包。
LinkeBasedFrameDecoder的工做原理是它一次遍歷ByteBuf中的可讀字節,判斷看是否有「\n」、「\r\n」,若是有,就一次位置爲結束位置,從可讀索引到結束位置區間的字節就組成一行。它是以換行符爲結束標誌的編解碼,支持攜帶結束符或者不攜帶結束符兩種解碼方式,同時支持配置單行的最大長度。若是連續讀取到最大長度後任然沒有發現換行符,就會拋出異常,同時忽略掉以前讀到的異常碼流。
/** * 服務端 */ public class TimeServer { public static void main(String[] args) throws Exception { int port=8080; //服務端默認端口 new TimeServer().bind(port); } public void bind(int port) throws Exception{ //Reactor線程組 //1用於服務端接受客戶端的鏈接 EventLoopGroup acceptorGroup = new NioEventLoopGroup(); //2用於進行SocketChannel的網絡讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Netty用於啓動NIO服務器的輔助啓動類 ServerBootstrap sb = new ServerBootstrap(); //將兩個NIO線程組傳入輔助啓動類中 sb.group(acceptorGroup, workerGroup) //設置建立的Channel爲NioServerSocketChannel類型 .channel(NioServerSocketChannel.class) //配置NioServerSocketChannel的TCP參數 .option(ChannelOption.SO_BACKLOG, 1024) //設置綁定IO事件的處理類 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel arg0) throws Exception { //處理粘包/拆包問題 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); arg0.pipeline().addLast(new StringDecoder()); arg0.pipeline().addLast(new TimeServerHandler()); } }); //綁定端口,同步等待成功(sync():同步阻塞方法) //ChannelFuture主要用於異步操做的通知回調 ChannelFuture cf = sb.bind(port).sync(); //等待服務端監聽端口關閉 cf.channel().closeFuture().sync(); } finally { //優雅退出,釋放線程池資源 acceptorGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
/** * 服務端channel */ public class TimeServerHandler extends ChannelHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // ByteBuf buf = (ByteBuf) msg; // //buf.readableBytes():獲取緩衝區中可讀的字節數; // //根據可讀字節數建立數組 // byte[] req = new byte[buf.readableBytes()]; // buf.readBytes(req); // String body = new String(req, "UTF-8"); String body = (String) msg; System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); //將待發送的消息放到發送緩存數組中 ctx.writeAndFlush(resp); } }
/** * 客戶端 */ public class TimeClient { public static void main(String[] args) throws Exception { int port=8080; //服務端默認端口 new TimeClient().connect(port, "127.0.0.1"); } public void connect(int port, String host) throws Exception{ //配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bs = new Bootstrap(); bs.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override //建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件 protected void initChannel(SocketChannel arg0) throws Exception { //處理粘包/拆包問題 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); arg0.pipeline().addLast(new StringDecoder()); arg0.pipeline().addLast(new TimeClientHandler()); } }); //發起異步鏈接操做 ChannelFuture cf = bs.connect(host, port).sync(); //等待客戶端鏈路關閉 cf.channel().closeFuture().sync(); } finally { //優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } }
/** * 客戶端channel */ public class TimeClientHandler extends ChannelHandlerAdapter { private int counter; private byte[] req; @Override //向服務器發送指令 public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf message=null; //模擬一百次請求,發送重複內容 for (int i = 0; i < 200; i++) { req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes(); message=Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override //接收服務器的響應 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // ByteBuf buf = (ByteBuf) msg; // //buf.readableBytes():獲取緩衝區中可讀的字節數; // //根據可讀字節數建立數組 // byte[] req = new byte[buf.readableBytes()]; // buf.readBytes(req); // String body = new String(req, "UTF-8"); String body = (String) msg; System.out.println("Now is : "+body+". the counter is : "+ ++counter); } @Override //異常處理 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //釋放資源 ctx.close(); } }
DelimiterBasedFrameDecoder
實現自定義分隔符做爲消息的結束標誌,完成解碼。
/** * 服務端 */ public class TimeServer { public static void main(String[] args) throws Exception { int port=8080; //服務端默認端口 new TimeServer().bind(port); } public void bind(int port) throws Exception{ //Reactor線程組 //1用於服務端接受客戶端的鏈接 EventLoopGroup acceptorGroup = new NioEventLoopGroup(); //2用於進行SocketChannel的網絡讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Netty用於啓動NIO服務器的輔助啓動類 ServerBootstrap sb = new ServerBootstrap(); //將兩個NIO線程組傳入輔助啓動類中 sb.group(acceptorGroup, workerGroup) //設置建立的Channel爲NioServerSocketChannel類型 .channel(NioServerSocketChannel.class) //配置NioServerSocketChannel的TCP參數 .option(ChannelOption.SO_BACKLOG, 1024) //設置綁定IO事件的處理類 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel arg0) throws Exception { //處理粘包/拆包問題 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); arg0.pipeline().addLast(new StringDecoder()); arg0.pipeline().addLast(new TimeServerHandler()); } }); //綁定端口,同步等待成功(sync():同步阻塞方法) //ChannelFuture主要用於異步操做的通知回調 ChannelFuture cf = sb.bind(port).sync(); //等待服務端監聽端口關閉 cf.channel().closeFuture().sync(); } finally { //優雅退出,釋放線程池資源 acceptorGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
/** * 服務端channel */ public class TimeServerHandler extends ChannelHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; currentTime += "$_"; ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); //將待發送的消息放到發送緩存數組中 ctx.writeAndFlush(resp); } }
/** * 客戶端 */ public class TimeClient { public static void main(String[] args) throws Exception { int port=8080; //服務端默認端口 new TimeClient().connect(port, "127.0.0.1"); } public void connect(int port, String host) throws Exception{ //配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bs = new Bootstrap(); bs.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override //建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件 protected void initChannel(SocketChannel arg0) throws Exception { //處理粘包/拆包問題 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); arg0.pipeline().addLast(new StringDecoder()); arg0.pipeline().addLast(new TimeClientHandler()); } }); //發起異步鏈接操做 ChannelFuture cf = bs.connect(host, port).sync(); //等待客戶端鏈路關閉 cf.channel().closeFuture().sync(); } finally { //優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } }
/** * 客戶端channel */ public class TimeClientHandler extends ChannelHandlerAdapter { private int counter; private byte[] req; @Override //向服務器發送指令 public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf message=null; //模擬一百次請求,發送重複內容 for (int i = 0; i < 200; i++) { req = ("QUERY TIME ORDER"+"$_").getBytes(); message=Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override //接收服務器的響應 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("Now is : "+body+". the counter is : "+ ++counter); } @Override //異常處理 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //釋放資源 ctx.close(); } }
FixedLengthFrameDecoder
是固定長度解碼器,可以按照指定的長度對消息進行自動解碼,開發者不須要考慮TCP的粘包/拆包問題。
在IO編程過程當中,當須要同時處理多個客戶端接入請求時,能夠利用多線程或者IO多路複用技術進行處理。IO多路複用技術經過把多個IO的阻塞複用到同一個Selector的阻塞上,從而使得系統在單線程的狀況下能夠同時處理多個客戶端請求。與傳統的多線程/多進程模型相比,IO多路複用的最大優點是系統開銷小,系統不須要建立新的額外進程或者線程,也不須要維護這些進程和線程的運行,下降了系統的維護工做量,節省了系統資源。
Netty的IO線程NioEventLoop因爲聚合了多路複用器Selector,能夠同時併發處理成百上千個客戶端SocketChannel。因爲讀寫操做都是非阻塞的,這就能夠充分提高IO線程的運行效率,避免由頻繁的IO阻塞致使的線程掛起。另外,因爲Netty採用了異步通訊模式,一個IO線程能夠併發處理N個客戶端鏈接和讀寫操做,這從根本上解決了傳統同步阻塞IO中 一鏈接一線程模型,架構的性能、彈性伸縮能力和可靠性都獲得了極大的提高。
經常使用的Reactor線程模型有三種,分別以下:
1.Reactor單線程模型;
2.Reactor多線程模型;
3.主從Reactor多線程模型;
Reactor單線程模型,指的是全部的IO操做都在同一個NIO線程上面完成,NIO線程職責以下:
一、做爲NIO服務端,接收客戶端的TCP鏈接;
二、做爲NIO客戶端,向服務端發起TCP鏈接;
三、讀取通訊對端的請求或者應答消息;
四、向通訊對端發送請求消息或者應答消息;
因爲Reactor模式使用的是異步非阻塞IO,全部的IO操做都不會致使阻塞,理論上一個線程能夠獨立處理全部IO相關操做。從架構層面看,一個NIO線程確實能夠完成其承擔的職責。例如,經過Acceptor接收客戶端的TCP鏈接請求消息,鏈路創建成功以後,經過Dispatch將對應的ByteBuffer派發到指定的Handler上進行消息編碼。用戶Handler能夠經過NIO線程將消息發送給客戶端。
對於一些小容量應用場景,可使用單線程模型,可是對於高負載、大併發的應用卻不合適,主要緣由以下:
一、一個NIO線程同時處理成百上千的鏈路,性能上沒法支撐。即使NIO線程的CPU負荷達到100%,也沒法知足海量消息的編碼、解碼、讀取和發送;
二、當NIO線程負載太重後,處理速度將變慢,這會致使大量客戶端鏈接超時,超時以後每每會進行重發,這更加劇了NIO線程的負載,最終會致使大量消息積壓和處理超時,NIO線程會成爲系統的性能瓶頸;
三、可靠性問題。一旦NIO線程意外進入死循環,會致使整個系統通訊模塊不可用,不能接收和處理外部消息,形成節點故障。
爲了解決這些問題,從而演進出了Reactor多線程模型。
Reactor多線程模型與單線程模型最大的區別就是有一組NIO線程處理IO操做,特色以下:
一、有一個專門的NIO線程——Acceptor線程用於監聽服務端,接收客戶端TCP鏈接請求;
二、網絡IO操做——讀、寫等由一個NIO線程池負責,線程池能夠採用標準的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、編碼、解碼和發送;
三、1個NIO線程能夠同時處理N條鏈路,可是1個鏈路只對應1個NIO線程,防止發生併發操做問題。
在絕大多數場景下,Reactor多線程模型均可以知足性能需求;可是,在極特殊應用場景中,一個NIO線程負責監聽和處理全部的客戶端鏈接可能會存在性能問題。例如百萬客戶端併發鏈接,或者服務端須要對客戶端的握手消息進行安全認證,認證自己很是損耗性能。在這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,爲了解決性能問題,產生了第三種Reactor線程模型——主從Reactor多線程模型。
主從Reactor線程模型的特色是:服務端用於接收客戶端鏈接的再也不是一個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP鏈接請求處理完成後(可能包含接入認證等),將新建立的SocketChannel註冊到IO線程池(subReactor線程池)的某個IO線程上,由它負責SocketChannel的讀寫和編解碼工做。Acceptor線程池只用於客戶端的登陸、握手和安全認證,一旦鏈路創建成功,就將鏈路註冊到後端subReactor線程池的IO線程上,由IO線程負責後續的IO操做。
利用主從NIO線程模型,能夠解決1個服務端監聽線程沒法有效處理全部客戶端鏈接的性能不足問題。Netty官方推薦使用該線程模型。它的工做流程總結以下:
一、從主線程池中隨機選擇一個Reactor線程做爲Acceptor線程,用於綁定監聽端口,接收客戶端鏈接;
二、Acceptor線程接收客戶端鏈接請求以後,建立新的SocketChannel,將其註冊到主線程池的其餘Reactor線程上,由其負責接入認證、IP黑白名單過濾、握手等操做;
三、而後也業務層的鏈路正式創建成功,將SocketChannel從主線程池的Reactor線程的多路複用器上摘除,從新註冊到Sub線程池的線程上,用於處理IO的讀寫操做。
在大多數場景下,並行多線程處理能夠提高系統的併發性能。可是,若是對於共享資源的併發訪問處理不當,會帶來嚴重的鎖競爭,這最終會致使性能的降低。爲了儘量地避免鎖競爭帶來的性能損耗,能夠經過串行化設計,既消息的處理儘量在同一個線程內完成,期間不進行線程切換,這樣就避免了多線程競爭和同步鎖。
爲了儘量提高性能,Netty採用了串行無鎖化設計,在IO線程內部進行串行操做,避免多線程競爭致使的性能降低。表面上看,串行化設計彷佛CPU利用率不高,併發程度不夠。可是,經過調整NIO線程池的線程參數,能夠同時啓動多個串行化的線程並行運行,這種局部無鎖化的串行線程設計相比一個隊列——多個工做線程模型性能更優。
Netty串行化設計工做原理圖以下:
Netty的NioEventLoop讀取到消息後,直接調用ChannelPipeline的fireChannelRead(Object msg),只要用戶不主動切換線程,一直會由NioEventLoop調用到用戶的Handler,期間不進行線程切換。這種串行化處理方式避免了多線程致使的鎖競爭,從性能角度看是最優的。
Netty中高效併發編程主要體現:
一、volatile的大量、正確使用;
二、CAS和原子類的普遍使用;
三、線程安全容器的使用;
四、經過讀寫鎖提高併發性能。
影響序列化性能的關鍵因素總結以下:
一、序列化後的碼流大小(網絡寬帶的佔用);
二、序列化與反序列化的性能(CPU資源佔用);
三、是否支持跨語言(異構系統的對接和開發語言切換)。
Netty默認提供了對GoogleProtobuf的支持,經過擴展Netty的編解碼接口,用戶能夠實現其餘的高性能序列化框架
Netty的「零拷貝」主要體如今三個方面:
1)、Netty的接收和發送ByteBuffer採用DIRECT BUFFERS,使用堆外直接內存進行Socket讀寫,不須要進行字節緩衝區的二次拷貝。若是使用傳統的堆內存(HEAP BUFFERS)進行Socket讀寫,JVM會將堆內存Buffer拷貝一份到直接內存中,而後才寫入Socket中。相比於堆外直接內存,消息在發送過程當中多了一次緩衝區的內存拷貝。
2)、第二種「零拷貝 」的實現CompositeByteBuf,它對外將多個ByteBuf封裝成一個ByteBuf,對外提供統一封裝後的ByteBuf接口。
3)、第三種「零拷貝」就是文件傳輸,Netty文件傳輸類DefaultFileRegion經過transferTo方法將文件發送到目標Channel中。不少操做系統直接將文件緩衝區的內容發送到目標Channel中,而不須要經過循環拷貝的方式,這是一種更加高效的傳輸方式,提高了傳輸性能,下降了CPU和內存佔用,實現了文件傳輸的「零拷貝」。
隨着JVM虛擬機和JIT即時編譯技術的發展,對象的分配和回收是個很是輕量級的工做。可是對於緩衝區Buffer,狀況卻稍有不一樣,特別是對於堆外直接內存的分配和回收,是一件耗時的操做。爲了儘可能重用緩衝區,Netty提供了基於內存池的緩衝區重用機制。
Netty在啓動輔助類中能夠靈活的配置TCP參數,知足不一樣的用戶場景。合理設置TCP參數在某些場景下對於性能的提高能夠起到的顯著的效果,總結一下對性能影響比較大的幾個配置項:
1)、SO_RCVBUF和SO_SNDBUF:一般建議值爲128KB或者256KB;
2)、TCP_NODELAY:NAGLE算法經過將緩衝區內的小封包自動相連,組成較大的封包,阻止大量小封包的發送阻塞網絡,從而提升網絡應用效率。可是對於時延敏感的應用場景須要關閉該優化算法;
3)、軟中斷:若是Linux內核版本支持RPS(2.6.35以上版本),開啓RPS後能夠實現軟中斷,提高網絡吞吐量。RPS根據數據包的源地址,目的地址以及目的和源端口,計算出一個hash值,而後根據這個hash值來選擇軟中斷運行的CPU。從上層來看,也就是說將每一個鏈接和CPU綁定,並經過這個hash值,來均衡軟中斷在多個CPU上,提高網絡並行處理性能。