Netty入門簡介

前言

Netty是一個高性能、異步事件驅動的NIO框架,它提供了對TCP、UDP和文件傳輸的支持,做爲一個異步NIO框架,Netty的全部IO操做都是異步非阻塞的,經過Future-Listener機制,用戶能夠方便的主動獲取或者經過通知機制得到IO操做結果。html

做爲當前最流行的NIO框架,Netty在互聯網領域、大數據分佈式計算領域、遊戲行業、通訊行業等得到了普遍的應用,一些業界著名的開源組件也基於Netty的NIO框架構建。react

因此,Netty這個ZB技能必需要學會,不熟悉NIO的能夠先看看深刻淺出NIO Socket。api

Reactor模型

Netty中的Reactor模型主要由多路複用器(Acceptor)、事件分發器(Dispatcher)、事件處理器(Handler)組成,能夠分爲三種。安全

一、單線程模型:全部I/O操做都由一個線程完成,即多路複用、事件分發和處理都是在一個Reactor線程上完成的。網絡


Reactor單線程模型


對於一些小容量應用場景,可使用單線程模型。可是對於高負載、大併發的應用卻不合適,主要緣由以下:多線程

  • 一個線程同時處理成百上千的鏈路,性能上沒法支撐,即使CPU負荷達到100%,也沒法知足海量消息的編碼、解碼、讀取和發送;
  • 當負載太重後,處理速度將變慢,這會致使大量客戶端鏈接超時,超時以後每每會進行重發,最終會致使大量消息積壓和處理超時,成爲系統的性能瓶頸;
  • 一旦單線程意外跑飛,或者進入死循環,會致使整個系統通訊模塊不可用,不能接收和處理外部消息,形成節點故障,可靠性不高。

二、多線程模型:爲了解決單線程模型存在的一些問題,演化而來的Reactor線程模型。架構


Reactor多線程模型


多線程模型的特色:併發

  • 有專門一個Acceptor線程用於監聽服務端,接收客戶端的TCP鏈接請求;
  • 網絡IO的讀寫操做由一個NIO線程池負責,線程池能夠採用標準的JDK線程池實現,包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、解碼、編碼和發送;
  • 一個NIO線程能夠同時處理多條鏈路,可是一個鏈路只能對應一個NIO線程,防止發生併發操做問題。

在絕大多數場景下,Reactor多線程模型均可以知足性能需求;可是,在極特殊應用場景中,一個NIO線程負責監聽和處理全部的客戶端鏈接可能會存在性能問題。例如百萬客戶端併發鏈接,或者服務端須要對客戶端的握手消息進行安全認證,認證自己很是損耗性能。在這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,爲了解決性能問題,產生了第三種Reactor線程模型-主從Reactor多線程模型。框架

三、主從多線程模型:採用多個reactor,每一個reactor都在本身單獨的線程裏執行。若是是多核,則能夠同時響應多個客戶端的請求,一旦鏈路創建成功就將鏈路註冊到負責I/O讀寫的SubReactor線程池上。異步


主從多線程模型

事實上,Netty的線程模型並不是固定不變,在啓動輔助類中建立不一樣的EventLoopGroup實例並經過適當的參數配置,就能夠支持上述三種Reactor線程模型。正是由於Netty對Reactor線程模型的支持提供了靈活的定製能力,因此能夠知足不一樣業務場景的性能需求。

示例代碼

如下是server和client的示例代碼,其中使用的是 Netty 4.x,先看看如何實現,後續會針對各個模塊進行深刻分析。

server 代碼實現

public class EchoServer {
    private final int port;
    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup();  // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(
                             //new LoggingHandler(LogLevel.INFO),
                             new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(port).sync(); // (5)

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new EchoServer(port).run();
    }
}

 

EchoServerHandler 實現

public class EchoServerHandler extends ChannelInboundHandlerAdapter {  

    private static final Logger logger = Logger.getLogger(  
            EchoServerHandler.class.getName());  

    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        ctx.write(msg);  
    }  

    @Override  
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
        ctx.flush();  
    }  

    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        // Close the connection when an exception is raised.  
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);  
        ctx.close();  
    }  
}

 

一、NioEventLoopGroup 是用來處理I/O操做的線程池,Netty對 EventLoopGroup 接口針對不一樣的傳輸協議提供了不一樣的實現。在本例子中,須要實例化兩個NioEventLoopGroup,一般第一個稱爲「boss」,用來accept客戶端鏈接,另外一個稱爲「worker」,處理客戶端數據的讀寫操做。
二、ServerBootstrap 是啓動服務的輔助類,有關socket的參數能夠經過ServerBootstrap進行設置。
三、這裏指定NioServerSocketChannel類初始化channel用來接受客戶端請求。
四、一般會爲新SocketChannel經過添加一些handler,來設置ChannelPipeline。ChannelInitializer 是一個特殊的handler,其中initChannel方法能夠爲SocketChannel 的pipeline添加指定handler。
五、經過綁定端口8080,就能夠對外提供服務了。

client 代碼實現

public class EchoClient {  

    private final String host;  
    private final int port;  
    private final int firstMessageSize;  

    public EchoClient(String host, int port, int firstMessageSize) {  
        this.host = host;  
        this.port = port;  
        this.firstMessageSize = firstMessageSize;  
    }  

    public void run() throws Exception {  
        // Configure the client.  
        EventLoopGroup group = new NioEventLoopGroup();  
        try {  
            Bootstrap b = new Bootstrap();  
            b.group(group)  
             .channel(NioSocketChannel.class)  
             .option(ChannelOption.TCP_NODELAY, true)  
             .handler(new ChannelInitializer<SocketChannel>() {  
                 @Override  
                 public void initChannel(SocketChannel ch) throws Exception {  
                     ch.pipeline().addLast(  
                             //new LoggingHandler(LogLevel.INFO),  
                             new EchoClientHandler(firstMessageSize));  
                 }  
             });  

            // Start the client.  
            ChannelFuture f = b.connect(host, port).sync();  

            // Wait until the connection is closed.  
            f.channel().closeFuture().sync();  
        } finally {  
            // Shut down the event loop to terminate all threads.  
            group.shutdownGracefully();  
        }  
    }  

    public static void main(String[] args) throws Exception {  
        final String host = args[0];  
        final int port = Integer.parseInt(args[1]);  
        final int firstMessageSize;  
        if (args.length == 3) {  
            firstMessageSize = Integer.parseInt(args[2]);  
        } else {  
            firstMessageSize = 256;  
        }  

        new EchoClient(host, port, firstMessageSize).run();  
    }  
}

 

EchoClientHandler 實現

public class EchoClientHandler extends ChannelInboundHandlerAdapter {  

    private static final Logger logger = Logger.getLogger(  
            EchoClientHandler.class.getName());  

    private final ByteBuf firstMessage;  

    /** 
     * Creates a client-side handler. 
     */  
    public EchoClientHandler(int firstMessageSize) {  
        if (firstMessageSize <= 0) {  
            throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);  
        }  
        firstMessage = Unpooled.buffer(firstMessageSize);  
        for (int i = 0; i < firstMessage.capacity(); i ++) {  
            firstMessage.writeByte((byte) i);  
        }  
    }  

    @Override  
    public void channelActive(ChannelHandlerContext ctx) {  
        ctx.writeAndFlush(firstMessage);  
    }  

    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        ctx.write(msg);  
    }  

    @Override  
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
       ctx.flush();  
    }  

    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        // Close the connection when an exception is raised.  
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);  
        ctx.close();  
    }  
}
相關文章
相關標籤/搜索