本文目的是在於對netty有一個大體印象,便於深刻了解netty前有一些基本概念java
sequenceDiagram Channel->>Buffer: 讀取 Buffer->>程序:讀取 程序->>Buffer:寫入 Buffer->>Channel: 寫入
- 在NIO中,全部的數據都是從Channel中讀取並緩存到Buffer中,用戶本身的代碼再從Buffer中讀取
- 要想寫數據,則必須先寫入Buffer中,而後在把Buffer寫入到Channnel中。
Channel 一個鏈接的抽象。全部的讀寫數據,最終都是經過Channel流通的。主要有NioSocketChannel、NioServerSocketChannelbootstrap
netty本身實現的一個緩存Buf,相比於JDK的ByteBuffer有一下優勢緩存
- 長度能夠動態拓展(在寫入的時候,判斷到capacity不夠,會從新開闢一段大容量的buf,而後把以前buf中的數據拷貝到新的buf中)
- 操做簡單。JDK的ByteBuffer只有一個位置指針position。netty的ByteBuf則有readIndex(度位置)、writeIndex(寫位置)。在對buf操做時候,不須要flip()和rewind();操做簡單。
ChannelHandler 從Channel中讀取到數據後,就須要把數據交給ChannelHandler進行處理app
記錄當前ChannelHandler的環境上下文,大體有如下信息。 每個ChannelHandler都會有一個ChannelHandlerContext與之對應(一對一關係)框架
- 記錄當前ChannelHandler對象
- 標識是inbound仍是outbound
- 所屬的ChannelPipeline
- 前一個ChannelHandlerContext、後一個ChannelHandlerContext。 這樣造成一個處理鏈,相似於SpringMVC中的攔截器鏈。
netty是事件驅動的。在獲取到不一樣的事件後(數據),會作不一樣的業務邏輯處理,這時候有的可能須要多個Handler協做完成,有的Handler可能對當前的事件不作關心,有的可能處理完了,不想要後面的Handler處理了。<br/> 這時候如何對事件進行傳播處理,這時候就須要用到ChannelPipeline了。ChannelPipeline中保存了頭部的ChannelHandlerContext(進來的類型事件會從頭開始)和尾部的ChannelHandlerContext(出去的類型事件會從尾部開始),他們是一個串行連接。socket
Channel和Channel之間都是經過ChannelHandlerContext聯繫起來的。ide
具體內容這篇博客講的很詳細oop
package io.netty.example.discard; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Discards any incoming data. */ public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscardServer(port).run(); } }
package io.netty.example.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Handles a server-side channel. */ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
package io.netty.example.time; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
package io.netty.example.time; public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }