netty提供了統一的API進行傳輸數據,這個相比於JDK的方式方便不少。好比下面是一個不用netty而使用原生的阻塞IO進行傳輸的例子。數據庫
public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); try { for(;;) { final Socket clientSocket = socket.accept(); System.out.println( "Accepted connection from " + clientSocket); new Thread(new Runnable() { @Override public void run() { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!rn".getBytes( Charset.forName("UTF-8"))); out.flush(); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); } } catch (IOException e) { e.printStackTrace(); } } }
代碼很好理解,爲每個新來的鏈接建立一個線程處理。這種方式有個比較大的問題是,客戶端鏈接數受限於服務器所能承受的線程數。爲了改進這個問題咱們可使用異步模式來重寫這段代碼,可是你會發現,幾乎全部的代碼都要重寫。原生的OIO和NIO的API幾乎徹底不能複用。不信你看看下面這段NIO的代碼,安全
public class PlainNioServer { public void serve(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ss = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ss.bind(address); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi!rn".getBytes()); for (;;){ try { selector.select(); } catch (IOException ex) { ex.printStackTrace(); //handle exception break; } Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); System.out.println( "Accepted connection from " + client); } if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); while (buffer.hasRemaining()) { if (client.write(buffer) == 0) { break; } } client.close(); } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { // ignore on close } } } } } }
這個代碼不作過多解釋了,畢竟咱們的重點是netty不是JDK NIO。服務器
用netty實現一個OIO的程序是下面這樣的姿式:多線程
public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!rn", Charset.forName("UTF-8"))); EventLoopGroup group = new OioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group) .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new ChannelInboundHandlerAdapter() { @Override public void channelActive( ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buf.duplicate()) .addListener( ChannelFutureListener.CLOSE); } }); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } }
而後若是要改爲異步非阻塞的模式,只須要把OioEventLoopGroup
改爲NioEventLoopGroup
,把OioServerSocketChannel
改爲NioServerSocketChannel
,簡單到使人髮指。異步
下面是Channel
的類關係圖,socket
從這幅圖看出ChannelConfig
和ChannelPipeline
都屬於Channel
,在代碼中體現爲類的成員。ChannelPipeline
其實前面咱們也講過了,它實現了責任鏈模式,把ChannelHandler
一個個串起來。經過後者咱們能夠擁有包括但不限於以下的功能:ide
下面列舉了一些Channle
自己提供的重要方法。工具
方法名oop
解釋測試
eventLoop()
返回分配到channel上的eventloop
pipeline()
返回分配到channel上的channelpipeline
isActive()
返回到channel是否鏈接到一個遠程服務
localAddress()
返回本地綁定的socketAddress
remoteAddress()
返回遠程綁定的socketAddress
write()
寫入數據到遠程(客戶端或者服務端),數據會通過channelpipeline
有些方法咱們已經在前面的示例中見過了。來看下write()
方法的使用示例:
Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8); ChannelFuture cf = channel.writeAndFlush(buf); cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { System.out.println("Write successful"); } else { System.err.println("Write error"); future.cause().printStackTrace(); } } });
簡單解釋下,
buf裏是要寫的數據,而後調用write方法寫入數據,返回一個寫入的future結果。前面已經說過這個future了,咱們給future添加一個監聽器,以便寫入成功後能夠經過回調獲得通知。
另外write這個方法也是線程安全的,下面是一個用多線程操做write方法的示例,
final Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere final ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8); Runnable writer = new Runnable() { @Override public void run() { channel.write(buf.duplicate()); } }; Executor executor = Executors.newCachedThreadPool(); // write in one thread executor.execute(writer); // write in another thread executor.execute(writer); //... }
netty保證write方法線程安全的原理,是將用戶線程的操做封裝成Task放入消息隊列中,底層由同一個I/O線程負責執行,這樣就實現了局部無鎖化。
這部分要解釋清楚須要深刻到源碼底層,由於本篇系列是netty in action的筆記系列就很少說了。後面可能考慮寫一個源碼解析系列在深刻這一塊。
這是netty最多見的使用場景。當channel狀態變動時用戶能夠收到通知,有如下幾個狀態:
如上圖所示,netty內部其實也是封裝了JDK的NIO,使用selector來管理IO狀態的變動。在前面的章節裏咱們其實給過JDK NIO的代碼示例,這裏就不貼出來了。
netty NIO模型裏有一個不得不說的特性叫zero-file-copy
,不少地方翻譯成零拷貝。這種特性可讓咱們直接在文件系統和網卡傳輸數據,避免了數據從內核空間到用戶空間的拷貝。
OIO是在netty裏是一種折中的存在,阻塞的方式儘管應用場景不多,可是不表明不存在。好比經過jdbc調用數據庫,若是是異步的方案是不太合適的。
netty的OIO模型底層也是調用JDK,前面的筆記咱們也給過示例。這種模型就是用一個線程處理監聽(accetp),而後爲每一個成功的鏈接建立一個處理線程。這樣作的目的是防止對於某個鏈接的處理阻塞影響其它鏈接,畢竟I/O操做是很容易引發阻塞的。
既然是阻塞的模型,netty的封裝能作的工做也有限。netty只是給socket上加了SO_TIMEOUT
,這樣若是一個操做在超時時間內沒有完成,就會拋出SocketTimeoutException
,netty會捕獲這個異常,而後繼續後面的流程。而後就是下一個EventLoop執行,循環往復。這種處理方案弊端在於拋出異常的開銷,由於異常會佔用堆棧。
這個圖就是對上面的歸納,分配一個線程給socket,socket鏈接服務器而後讀數據,讀數據可能阻塞也可能成功。若是是前者捕獲異常後再次重試。
netty包含對本地傳輸的支持,這個傳輸實現使用相同的API用於虛擬機之間的通訊,傳輸是徹底異步的。
每一個Channel使用惟一的SocketAddress,客戶端經過使用SocketAddress進行鏈接,在服務器會被註冊爲長期運行,一旦通道關閉,它會自動註銷,客戶端沒法再使用它。
使用本地傳輸服務器的行爲與其餘的傳輸實現幾乎是相同的,須要注意的一個重點是隻能在本地的服務器和客戶端上使用它們。
Embedded transport可讓你更容易的在不一樣的ChannelHandler之間的交互,更多的時候它像是一個工具類。通常用於測試的場景。它自帶了一個具體的Channel實現,EmbeddedChannel
。好比下面是一個使用示例:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder { private final int frameLength; public FixedLengthFrameDecoder(int frameLength) { if (frameLength <= 0) { throw new IllegalArgumentException("frameLength must be positive integer: " + frameLength); } this.frameLength = frameLength; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while (in.readableBytes() >= frameLength) { ByteBuf buf = in.readBytes(frameLength); out.add(buf); } } }
@Test public void testFramesDecoded() { ByteBuf buf = Unpooled.buffer(); for (int i = 0; i < 9; i++) { buf.writeByte(i); } ByteBuf input = buf.duplicate(); EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3)); Assert.assertTrue(channel.writeInbound(input.retain())); Assert.assertTrue(channel.finish()); ByteBuf read = channel.readInbound(); Assert.assertEquals(buf.readSlice(3), read); read.release(); read = channel.readInbound(); Assert.assertEquals(buf.readSlice(3), read); read.release(); read = channel.readInbound(); Assert.assertEquals(buf.readSlice(3), read); read.release(); Assert.assertNull(channel.readInbound()); buf.release(); }
用到的幾個方法解釋下,
更多的使用細節能夠去網上了解下。