netty學習之一 netty入門

在不少開源項目中都有netty的影子,好比阿里系的rocketmq和dubbo,大數據領elasticsearch,hbase,hadoop都是使用了netty做爲底層傳輸的。到底netty是個啥東東呢,其實 Netty是一個NIO client-server(客戶端服務器)框架,使用Netty能夠快速開發網絡應用,例如服務器和客戶端協議。Netty是業界最流行的NIO框架之一,它的健壯性、功能、性能、可定製性和可擴展性在同類框架中都是數一數二的,它已經獲得成百上千的商用項目驗證,例如Hadoop的RPC框架(Remote Procedure Call Protocol ,遠程過程調用協議,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議)avro使用Netty做爲底層通訊框架。不少其它業界主流的RPC框架,也使用Netty來構建高性能的異步通訊能力。經過對Netty的分析,咱們將它的優勢總結以下:java

    1. API使用簡單,開發門檻低;
    1. 功能強大,預置了多種編解碼功能,支持多種主流協議;
    1. 定製能力強,能夠經過ChannelHandler對通訊框架進行靈活的擴展;
    1. 性能高,經過與其它業界主流的NIO框架對比,Netty的綜合性能最優;
    1. 成熟、穩定,Netty修復了已經發現的全部JDK NIO BUG,業務開發人員不須要再爲NIO的BUG而煩惱;
    1. 社區活躍,版本迭代週期短,發現的BUG能夠被及時修復,同時,更多的新功能會被加入;
    1. 經歷了大規模的商業應用考驗,質量已經獲得驗證。在互聯網、大數據、網絡遊戲、企業應用、電信軟件等衆多行業獲得成功商用,證實了它能夠徹底知足不一樣行業的商業應用。
  • 說到netty不得不說IO,如今通訊方式無非就是三種通訊模式,BIO,NIO,AIO。git

    BIO: 同步並阻塞,服務器實現模式爲一個鏈接一個線程,即客戶端有鏈接請求時服務器端就須要啓動一個線程進行處理(一客戶端一線程)。該模型最大的問題就是缺少彈性伸縮能力,當客戶端併發訪問量增長後,服務端的線程數與客戶端併發訪問數呈1:1的關係,系統性能將急劇降低,隨着併發訪問量的繼續增長,系統會發生線程堆棧溢出、建立新線程失敗等問題,並最終致使宕機或僵死。github

    NIO:同步非阻塞,服務器實現模式爲一個請求一個線程,客戶端發送的鏈接請求都會註冊到多路複用器上,多路複用器輪詢到鏈接有I/O請求時才啓動一個線程進行處理。 對於NIO,有兩點須要強調的: (1)關於概念有兩種理解,New I/O(相對於以前的I/O庫是新增的)和Non-block I/O(非阻塞的)。因爲NIO的目標就是讓java支持非阻塞I/O,全部更多人喜歡用Non-block I/O。 (2)不少人喜歡將NIO稱爲異步非阻塞I/O,可是,若是按照嚴格的NUIX網絡編程模型和JDK的實現進行區分,實際上它只是非阻塞I/O,不能稱之爲異步非阻塞I/O。但因爲NIO庫支持非阻塞讀和寫,相對於以前的同步阻塞讀和寫,它是異步的,所以不少人習慣稱NIO爲異步非阻塞I/O。編程

AIO:JDK1.7升級了NIO庫,升級後的NIO庫被稱爲NIO2.0,正式引入了異步通道的概念。NIO2.0的異步套接字通道是真正的異步非阻塞I/O,此即AIO。其服務器實現模式爲一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務器應用去啓動線程進行處理。bootstrap

NIO 就是New IO , NIO和IO有相同的做用和目的,但實現方式不一樣,NIO主要用到的是塊,因此NIO的效率要比IO高不少。NIO主要是經過buffer和channel傳遞數據,而原始的io 主要是經過字節流輸入輸出數據。瀏覽器

Netty就是採用了NIO模型,說了這麼多,仍是先來個栗子看看吧:安全

首先構建服務端,SslContext主要是來構建安全套接字,不過這個不是重點, EventLoopGroup bossGroup = new NioEventLoopGroup(1)開始構建boss線程組, ServerBootstrap負責初始化netty服務器,而且開始監聽端口的socket請求。ServerBootstrap用一個ServerSocketChannelFactory 來實例化。ServerSocketChannelFactory 有兩種選擇,一種是NioServerSocketChannelFactory,一種是OioServerSocketChannelFactory。 前者使用NIO,後則使用普通的阻塞式IO。它們都須要兩個線程池實例做爲參數來初始化,一個是boss線程池,一個是worker線程池。服務器

public final class EchoServer {
    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    public static void main(String[] args) throws Exception {
        // Configure SSL.構建安全套接字
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 [@Override](https://my.oschina.net/u/1162528)
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     p.addLast(new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

ServerBootstrap.bind(int)負責綁定端口,當這個方法執行後,ServerBootstrap就能夠接受指定端口上的socket鏈接了。一個ServerBootstrap能夠綁定多個端口。也就是說ServerBootstrap監聽的一個端口對應一個boss線程,它們一一對應,在boss線程接收了socket鏈接請求後,會產生一個channel(一個打開的socket對應一個打開的channel),並把這個channel交給ServerBootstrap初始化時指定的ServerSocketChannelFactory來處理,boss線程則繼續處理socket的請求。ServerSocketChannelFactory則會從worker線程池中找出一個worker線程來繼續處理這個請求。若是是OioServerSocketChannelFactory的話,那個這個channel上全部的socket消息,從開始到channel(socket)關閉,都只由這個特定的worker來處理,也就是說一個打開的socket對應一個指定的worker線程,這個worker線程在socket沒有關閉的狀況下,也只能爲這個socket處理消息,沒法服務其餘socket。網絡

若是是NioServerSocketChannelFactory的話則否則,每一個worker能夠服務不一樣的socket或者說channel,worker線程和channel再也不有一一對應的關係。 顯然,NioServerSocketChannelFactory只須要少許活動的worker線程及能很好的處理衆多的channel,而OioServerSocketChannelFactory則須要與打開channel等量的worker線程來服務。併發

線 程是一種資源,因此當netty服務器須要處理長鏈接的時候,最好選擇NioServerSocketChannelFactory,這樣能夠避免建立大 量的worker線程。在用做http服務器的時候,也最好選擇NioServerSocketChannelFactory,由於現代瀏覽器都會使用 http keepalive功能(可讓瀏覽器的不一樣http請求共享一個信道),這也是一種長鏈接。

worker線程池中的線程生命週期是怎麼樣的?當某個channel有消息到達或者有消息須要寫入socket的時候,worker線程就會從線程池中取出一個。在worker線程中,消息會通過設定好 的ChannelPipeline處理。ChannelPipeline就是一堆有順序的filter,它分爲兩部分:UpstreamHandler和 DownStreamHandler, 客戶端送入的消息會首先由許多UpstreamHandler依次處理,處理獲得的數據送入應用的業務邏輯handler,對 於Nio當messageReceived()方法執行後,若是沒有產生異常,worker線程就執行完畢了,它會被線程池回收。業務邏輯hanlder 會經過一些方法,把返回的數據交給指定好順序的DownStreamHandler處理,處理後的數據若是須要,會被寫入channel,進而經過綁定的 socket發送給客戶端。這個過程是由另一個線程池中的worker線程來完成的。 對於Oio來講,從始到終,都是由一個指定的worker來處理。

減小worker線程的處理佔用時間

worker 線程是由netty內部管理,統一調配的一種資源,因此最好應該儘快的把讓worker線程執行完畢,返回給線程池回收利用。worker線程的大部分時 間消耗在在ChannelPipeline的各類handler中,而在這些handler中,通常是負責應用程序業務邏輯摻入的那個handler最佔 時間,它一般是排在最後的UpstreamHandler。因此經過把這部分處理內容交給另一個線程來處理,能夠有效的減小worker線程的週期循環 時間。messageReceived()方法中開啓一個新的線程來處理業務邏輯,固然也可使用利用netty框架自帶的ExecutionHandler,這個之後在說。。

下面要說明下載本栗子中用於進行業務邏輯處理的EchoServerHandler,實際的開發中更能夠擁有多個handler作消息的處理,下面咱們來看下這個handler都作了那些工做。 EchoServerHandler主要是繼承ChannelInboundHandlerAdapter,而且複寫其中的某幾個方法就能夠實現本身定義的邏輯,例如咱們能夠複寫channelRead方法能夠獲取客戶端傳過來的消息。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    [@Override]
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            ByteBuf buf = (ByteBuf)msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String response = new String(req,"utf-8");
            System.out.println("收到客戶端服務器的請求消息"+response);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        ctx.writeAndFlush(Unpooled.copiedBuffer("服務端返回的消息hello".getBytes()))
                .addListener(ChannelFutureListener.CLOSE);
    }

    [@Override]
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
    [@Override](https://my.oschina.net/u/1162528)
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }

下面來看看客戶端是如何編寫的,首先構建客戶端線程組,只須要構建一個線程組便可,而且綁定服務器主機的ip和port,而且向pipeline中綁定本身的handler這個handler主要是客戶端中須要自定義的業務處理邏輯。

public final class EchoClient {
    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 [@Override](https://my.oschina.net/u/1162528)
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }

下面看看handler的處理邏輯,只是在鏈接服務器的時候向服務器註冊完畢時候及channelActive發送條消息,而且獲得服務器返回消息的時候調用channelRead方法,獲得消息而且將消息打印。

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        firstMessage = Unpooled.buffer(EchoClient.SIZE);
        for (int i = 0; i < firstMessage.capacity(); i ++) {
            firstMessage.writeByte((byte) i);
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            ByteBuf buf = (ByteBuf)msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String response = new String(req,"utf-8");
            System.out.println("收到服務端返回的消息"+response);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

總結一下Netty的核心包含如下幾個部分:

  • channel:通訊框架的核心抽象
  • bootstrap:應用啓動入口
  • buffer:數據交換的載體
  • handler:協議與事件的處理工具

channel下包含的核心概念有:

Channel:通訊通道,對應一個物理鏈接
ChannelPipeline:事件處理管道
ChannelHandler:事件處理器
ChannelHandlerContext:上下文環境,包含了handler的引用

完整代碼連接:https://github.com/winstonelei/Smt

相關文章
相關標籤/搜索