Netty高併發原理

    Netty是一個高性能 事件驅動的異步的非堵塞的IO(NIO)框架,用於創建TCP等底層的鏈接,基於Netty能夠創建高性能的Http服務器。支持HTTP、 WebSocket 、Protobuf、 Binary TCP |和UDP,Netty已經被不少高性能項目做爲其Socket底層基礎,如HornetQ Infinispan Vert.x
Play Framework Finangle和 Cassandra。其競爭對手是:Apache MINA和 Grizzly。css

    傳統堵塞的IO讀取以下:html

InputStream is = new FileInputStream("input.bin");
int byte = is.read(); // 當前線程等待結果到達直至錯誤
 

而使用NIO以下:

 
while (true) {
 selector.select(); // 從多個通道請求事件
 Iterator it = selector.selectedKeys().iterator();
 while (it.hasNext()) {
  SelectorKey key = (SelectionKey) it.next();
  handleKey(key);
  it.remove();
 }
 
堵塞與非堵塞原理

傳統硬件的堵塞以下,從內存中讀取數據,而後寫到磁盤,而CPU一直等到磁盤寫完成,磁盤的寫操做是慢的,這段時間CPU被堵塞不能發揮效率。安全

使用非堵塞的DMA以下圖:CPU只是發出寫操做這樣的指令,作一些初始化工做,DMA具體執行,從內存中讀取數據,而後寫到磁盤,當完成寫後發出一箇中斷事件給CPU。這段時間CPU是空閒的,能夠作別的事情。這個原理稱爲Zero.copy零拷貝。服務器

 

Netty底層基於上述Java NIO的零拷貝原理實現:框架

 

比較
  • Tomcat是一個Web服務器,它是採起一個請求一個線程,當有1000客戶端時,會耗費不少內存。一般一個線程將花費 256kb到1mb的stack空間。
  • Node.js是一個線程服務於全部請求,在錯誤處理上有限制
  • Netty是一個線程服務於不少請求,以下圖,當從Java NIO得到一個Selector事件,將激活通道Channel。

演示

Netty的使用代碼以下:異步

Channel channel = ...
ChannelFuture cf = channel.write(data);
cf.addListener(
  new ChannelFutureListener() {
   @Override
   public void operationComplete(ChannelFuture future) throws Exception {
     if(!future.isSuccess() {
        future.cause().printStacktrace();
        ...
     }
     ...
   }
});
...
cf.sync();

 

經過引入觀察者監聽,當有數據時,將自動激活監聽者中的代碼運行。socket

咱們使用Netty創建一個服務器代碼:ide

public class EchoServer {
 
    private final int port;
 
    public EchoServer(int port) { 
        this.port = port; 
    }
 
    public void run() throws Exception { 
        // Configure the server. 
        EventLoopGroup bossGroup = new NioEventLoopGroup(); 
        EventLoopGroup workerGroup = new NioEventLoopGroup(); 
        try { 
            ServerBootstrap b = new ServerBootstrap(); 
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100) 
                   .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() { 
                       @Override 
                       public void initChannel(SocketChannel ch) throws Exception { 
                           ch.pipeline().addLast( 
                           // new LoggingHandler(LogLevel.INFO), 
                                   new EchoServerHandler()); 
                       } 
                   });
 
            // Start the server. 
            ChannelFuture f = b.bind(port).sync();
 
            // Wait until the server socket is closed. 
            f.channel().closeFuture().sync(); 
        } finally { 
            // Shut down all event loops to terminate all threads. 
            bossGroup.shutdownGracefully(); 
            workerGroup.shutdownGracefully(); 
        } 
    }
 }

這段代碼調用:在9999端口啓動oop

new EchoServer(9999).run();
 
咱們須要完成的代碼是EchoServerHandler:
 
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
 
    private static final Logger logger = Logger.getLogger(EchoServerHandler.class.getName());
 
    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
        ctx.write(msg); 
    }
 
    @Override 
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
        ctx.flush(); 
    }
 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
        // Close the connection when an exception is raised. 
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause); 
        ctx.close(); 
    } 
}
 
原理

    一個Netty服務器的原理以下:性能

    圖中每次請求的讀取是經過UpStream來實現,而後激活咱們的服務邏輯如EchoServerHandler,而服務器向外寫數據,也就是響應是經過DownStream實現的。每一個通道Channel包含一對UpStream和DownStream,以及咱們的handlers(EchoServerHandler),以下圖,這些都是經過channel pipeline封裝起來的,數據流在管道里流動,每一個Socket對應一個ChannelPipeline。

    CHANNELPIPELINE是關鍵,它相似Unix的管道,有如下做用:

  • 爲每一個Channel 保留 ChannelHandlers ,如EchoServerHandler
  • 全部的事件都要經過它
  • 不斷地修改:相似unix的SH管道: echo "Netty is shit...." | sed -e 's/is /is the /'
  • 一個Channel對應一個 ChannelPipeline
  • 包含協議編碼解碼 安全驗證SSL/TLS和應用邏輯
客戶端代碼

前面咱們演示了服務器端代碼,下面是客戶端代碼:

public class EchoClient { 
    private final String host; 
    private final int port; 
    private final int firstMessageSize;
 
    public EchoClient(String host, int port, int firstMessageSize) { 
        this.host = host; 
        this.port = port; 
        this.firstMessageSize = firstMessageSize; 
    }
 
    public void run() throws Exception { 
        // Configure the client. 
        EventLoopGroup group = new NioEventLoopGroup(); 
        try { 
            Bootstrap b = new Bootstrap(); 
           b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { 
                @Override 
                public void initChannel(SocketChannel ch) throws Exception { 
                   ch.pipeline().addLast( 
                   // new LoggingHandler(LogLevel.INFO), 
                           new EchoClientHandler(firstMessageSize)); 
                } 
            });
 
            // Start the client. 
            ChannelFuture f = b.connect(host, port).sync();
 
            // Wait until the connection is closed. 
            f.channel().closeFuture().sync(); 
        } finally { 
            // Shut down the event loop to terminate all threads. 
            group.shutdownGracefully(); 
        } 
    } 
}

客戶端的應用邏輯EchoClientHandler

public class EchoClientHandler extends ChannelInboundHandlerAdapter {
 
    private static final Logger logger = Logger.getLogger(EchoClientHandler.class.getName());
 
    private final ByteBuf firstMessage;
 
    /** 
     * Creates a client-side handler. 
     */ 
    public EchoClientHandler(int firstMessageSize) { 
        if (firstMessageSize <= 0) { 
            throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize); 
        } 
        firstMessage = Unpooled.buffer(firstMessageSize); 
        for (int i = 0; i < firstMessage.capacity(); i++) { 
            firstMessage.writeByte((byte) i); 
        } 
    }
 
    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
        ctx.writeAndFlush(firstMessage); 
        System.out.print("active"); 
    }
 
    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
        ctx.write(msg); 
        System.out.print("read"); 
    }
 
    @Override 
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
        ctx.flush(); 
        System.out.print("readok"); 
    }
 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
        // Close the connection when an exception is raised. 
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause); 
        ctx.close(); 
    }
 
}

 

轉載自:http://www.jdon.com/concurrent/netty.html

相關文章
相關標籤/搜索