在不少開源項目中都有netty的影子,好比阿里系的rocketmq和dubbo,大數據領elasticsearch,hbase,hadoop都是使用了netty做爲底層傳輸的。到底netty是個啥東東呢,其實 Netty是一個NIO client-server(客戶端服務器)框架,使用Netty能夠快速開發網絡應用,例如服務器和客戶端協議。Netty是業界最流行的NIO框架之一,它的健壯性、功能、性能、可定製性和可擴展性在同類框架中都是數一數二的,它已經獲得成百上千的商用項目驗證,例如Hadoop的RPC框架(Remote Procedure Call Protocol ,遠程過程調用協議,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議)avro使用Netty做爲底層通訊框架。不少其它業界主流的RPC框架,也使用Netty來構建高性能的異步通訊能力。經過對Netty的分析,咱們將它的優勢總結以下:java
說到netty不得不說IO,如今通訊方式無非就是三種通訊模式,BIO,NIO,AIO。git
BIO: 同步並阻塞,服務器實現模式爲一個鏈接一個線程,即客戶端有鏈接請求時服務器端就須要啓動一個線程進行處理(一客戶端一線程)。該模型最大的問題就是缺少彈性伸縮能力,當客戶端併發訪問量增長後,服務端的線程數與客戶端併發訪問數呈1:1的關係,系統性能將急劇降低,隨着併發訪問量的繼續增長,系統會發生線程堆棧溢出、建立新線程失敗等問題,並最終致使宕機或僵死。github
NIO:同步非阻塞,服務器實現模式爲一個請求一個線程,客戶端發送的鏈接請求都會註冊到多路複用器上,多路複用器輪詢到鏈接有I/O請求時才啓動一個線程進行處理。 對於NIO,有兩點須要強調的: (1)關於概念有兩種理解,New I/O(相對於以前的I/O庫是新增的)和Non-block I/O(非阻塞的)。因爲NIO的目標就是讓java支持非阻塞I/O,全部更多人喜歡用Non-block I/O。 (2)不少人喜歡將NIO稱爲異步非阻塞I/O,可是,若是按照嚴格的NUIX網絡編程模型和JDK的實現進行區分,實際上它只是非阻塞I/O,不能稱之爲異步非阻塞I/O。但因爲NIO庫支持非阻塞讀和寫,相對於以前的同步阻塞讀和寫,它是異步的,所以不少人習慣稱NIO爲異步非阻塞I/O。編程
AIO:JDK1.7升級了NIO庫,升級後的NIO庫被稱爲NIO2.0,正式引入了異步通道的概念。NIO2.0的異步套接字通道是真正的異步非阻塞I/O,此即AIO。其服務器實現模式爲一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務器應用去啓動線程進行處理。bootstrap
NIO 就是New IO , NIO和IO有相同的做用和目的,但實現方式不一樣,NIO主要用到的是塊,因此NIO的效率要比IO高不少。NIO主要是經過buffer和channel傳遞數據,而原始的io 主要是經過字節流輸入輸出數據。瀏覽器
Netty就是採用了NIO模型,說了這麼多,仍是先來個栗子看看吧:安全
首先構建服務端,SslContext主要是來構建安全套接字,不過這個不是重點, EventLoopGroup bossGroup = new NioEventLoopGroup(1)開始構建boss線程組, ServerBootstrap負責初始化netty服務器,而且開始監聽端口的socket請求。ServerBootstrap用一個ServerSocketChannelFactory 來實例化。ServerSocketChannelFactory 有兩種選擇,一種是NioServerSocketChannelFactory,一種是OioServerSocketChannelFactory。 前者使用NIO,後則使用普通的阻塞式IO。它們都須要兩個線程池實例做爲參數來初始化,一個是boss線程池,一個是worker線程池。服務器
public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure SSL.構建安全套接字 final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { [@Override](https://my.oschina.net/u/1162528) public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new EchoServerHandler()); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
ServerBootstrap.bind(int)負責綁定端口,當這個方法執行後,ServerBootstrap就能夠接受指定端口上的socket鏈接了。一個ServerBootstrap能夠綁定多個端口。也就是說ServerBootstrap監聽的一個端口對應一個boss線程,它們一一對應,在boss線程接收了socket鏈接請求後,會產生一個channel(一個打開的socket對應一個打開的channel),並把這個channel交給ServerBootstrap初始化時指定的ServerSocketChannelFactory來處理,boss線程則繼續處理socket的請求。ServerSocketChannelFactory則會從worker線程池中找出一個worker線程來繼續處理這個請求。若是是OioServerSocketChannelFactory的話,那個這個channel上全部的socket消息,從開始到channel(socket)關閉,都只由這個特定的worker來處理,也就是說一個打開的socket對應一個指定的worker線程,這個worker線程在socket沒有關閉的狀況下,也只能爲這個socket處理消息,沒法服務其餘socket。網絡
若是是NioServerSocketChannelFactory的話則否則,每一個worker能夠服務不一樣的socket或者說channel,worker線程和channel再也不有一一對應的關係。 顯然,NioServerSocketChannelFactory只須要少許活動的worker線程及能很好的處理衆多的channel,而OioServerSocketChannelFactory則須要與打開channel等量的worker線程來服務。併發
線 程是一種資源,因此當netty服務器須要處理長鏈接的時候,最好選擇NioServerSocketChannelFactory,這樣能夠避免建立大 量的worker線程。在用做http服務器的時候,也最好選擇NioServerSocketChannelFactory,由於現代瀏覽器都會使用 http keepalive功能(可讓瀏覽器的不一樣http請求共享一個信道),這也是一種長鏈接。
worker線程池中的線程生命週期是怎麼樣的?當某個channel有消息到達或者有消息須要寫入socket的時候,worker線程就會從線程池中取出一個。在worker線程中,消息會通過設定好 的ChannelPipeline處理。ChannelPipeline就是一堆有順序的filter,它分爲兩部分:UpstreamHandler和 DownStreamHandler, 客戶端送入的消息會首先由許多UpstreamHandler依次處理,處理獲得的數據送入應用的業務邏輯handler,對 於Nio當messageReceived()方法執行後,若是沒有產生異常,worker線程就執行完畢了,它會被線程池回收。業務邏輯hanlder 會經過一些方法,把返回的數據交給指定好順序的DownStreamHandler處理,處理後的數據若是須要,會被寫入channel,進而經過綁定的 socket發送給客戶端。這個過程是由另一個線程池中的worker線程來完成的。 對於Oio來講,從始到終,都是由一個指定的worker來處理。
減小worker線程的處理佔用時間
worker 線程是由netty內部管理,統一調配的一種資源,因此最好應該儘快的把讓worker線程執行完畢,返回給線程池回收利用。worker線程的大部分時 間消耗在在ChannelPipeline的各類handler中,而在這些handler中,通常是負責應用程序業務邏輯摻入的那個handler最佔 時間,它一般是排在最後的UpstreamHandler。因此經過把這部分處理內容交給另一個線程來處理,能夠有效的減小worker線程的週期循環 時間。messageReceived()方法中開啓一個新的線程來處理業務邏輯,固然也可使用利用netty框架自帶的ExecutionHandler,這個之後在說。。
下面要說明下載本栗子中用於進行業務邏輯處理的EchoServerHandler,實際的開發中更能夠擁有多個handler作消息的處理,下面咱們來看下這個handler都作了那些工做。 EchoServerHandler主要是繼承ChannelInboundHandlerAdapter,而且複寫其中的某幾個方法就能夠實現本身定義的邏輯,例如咱們能夠複寫channelRead方法能夠獲取客戶端傳過來的消息。
public class EchoServerHandler extends ChannelInboundHandlerAdapter { [@Override] public void channelRead(ChannelHandlerContext ctx, Object msg) { try { ByteBuf buf = (ByteBuf)msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String response = new String(req,"utf-8"); System.out.println("收到客戶端服務器的請求消息"+response); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } ctx.writeAndFlush(Unpooled.copiedBuffer("服務端返回的消息hello".getBytes())) .addListener(ChannelFutureListener.CLOSE); } [@Override] public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } [@Override](https://my.oschina.net/u/1162528) public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); }
下面來看看客戶端是如何編寫的,首先構建客戶端線程組,只須要構建一個線程組便可,而且綁定服務器主機的ip和port,而且向pipeline中綁定本身的handler這個handler主要是客戶端中須要自定義的業務處理邏輯。
public final class EchoClient { static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception { // Configure SSL.git final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { [@Override](https://my.oschina.net/u/1162528) public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } }
下面看看handler的處理邏輯,只是在鏈接服務器的時候向服務器註冊完畢時候及channelActive發送條消息,而且獲得服務器返回消息的時候調用channelRead方法,獲得消息而且將消息打印。
public class EchoClientHandler extends ChannelInboundHandlerAdapter { private final ByteBuf firstMessage; /** * Creates a client-side handler. */ public EchoClientHandler() { firstMessage = Unpooled.buffer(EchoClient.SIZE); for (int i = 0; i < firstMessage.capacity(); i ++) { firstMessage.writeByte((byte) i); } } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { ByteBuf buf = (ByteBuf)msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String response = new String(req,"utf-8"); System.out.println("收到服務端返回的消息"+response); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
總結一下Netty的核心包含如下幾個部分:
bootstrap:應用啓動入口
buffer:數據交換的載體
handler:協議與事件的處理工具
channel下包含的核心概念有:
Channel:通訊通道,對應一個物理鏈接 ChannelPipeline:事件處理管道 ChannelHandler:事件處理器 ChannelHandlerContext:上下文環境,包含了handler的引用