【netty in action】學習筆記-第四章

netty提供了統一的API進行傳輸數據,這個相比於JDK的方式方便不少。好比下面是一個不用netty而使用原生的阻塞IO進行傳輸的例子。數據庫

public class PlainOioServer {
    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);
        try {
            for(;;) {
                final Socket clientSocket = socket.accept();
                System.out.println(
                        "Accepted connection from " + clientSocket);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        OutputStream out;
                        try {
                            out = clientSocket.getOutputStream();
                            out.write("Hi!rn".getBytes(
                                    Charset.forName("UTF-8")));
                            out.flush();
                            clientSocket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                clientSocket.close();
                            } catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

代碼很好理解,爲每個新來的鏈接建立一個線程處理。這種方式有個比較大的問題是,客戶端鏈接數受限於服務器所能承受的線程數。爲了改進這個問題咱們可使用異步模式來重寫這段代碼,可是你會發現,幾乎全部的代碼都要重寫。原生的OIO和NIO的API幾乎徹底不能複用。不信你看看下面這段NIO的代碼,安全

public class PlainNioServer {
    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi!rn".getBytes());
        for (;;){
            try {
                selector.select();
            } catch (IOException ex) {
                ex.printStackTrace();
                //handle exception
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) {
                        ServerSocketChannel server =
                                (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE |
                                SelectionKey.OP_READ, msg.duplicate());
                        System.out.println(
                                "Accepted connection from " + client);
                    }
                    if (key.isWritable()) {
                        SocketChannel client =
                                (SocketChannel) key.channel();
                        ByteBuffer buffer =
                                (ByteBuffer) key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) {
                                break;
                            }
                        }
                        client.close();
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        // ignore on close
                    }
                }
            }
        }
    }
}

這個代碼不作過多解釋了,畢竟咱們的重點是netty不是JDK NIO。服務器

用netty實現一個OIO的程序是下面這樣的姿式:多線程

public class NettyOioServer {
    public void server(int port)
            throws Exception {
        final ByteBuf buf =
                Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!rn", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                                ch.pipeline().addLast(
                                    new ChannelInboundHandlerAdapter() {
                                        @Override
                                        public void channelActive(
                                                ChannelHandlerContext ctx)
                                                throws Exception {
                                            ctx.writeAndFlush(buf.duplicate())
                                                    .addListener(
                                                            ChannelFutureListener.CLOSE);
                                        }
                                    });
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

而後若是要改爲異步非阻塞的模式,只須要把OioEventLoopGroup改爲NioEventLoopGroup,把OioServerSocketChannel改爲NioServerSocketChannel,簡單到使人髮指。異步

下面是Channel的類關係圖,socket

image

從這幅圖看出ChannelConfigChannelPipeline都屬於Channel,在代碼中體現爲類的成員。ChannelPipeline其實前面咱們也講過了,它實現了責任鏈模式,把ChannelHandler一個個串起來。經過後者咱們能夠擁有包括但不限於以下的功能:ide

  • 數據的格式轉換
  • 異常通知
  • active或者inactive通知
  • EventLoop註冊或者註銷事件通知
  • 用戶自定義事件通知

下面列舉了一些Channle自己提供的重要方法。工具

方法名oop

解釋測試

eventLoop()

返回分配到channel上的eventloop

pipeline()

返回分配到channel上的channelpipeline

isActive()

返回到channel是否鏈接到一個遠程服務

localAddress()

返回本地綁定的socketAddress

remoteAddress()

返回遠程綁定的socketAddress

write()

寫入數據到遠程(客戶端或者服務端),數據會通過channelpipeline

有些方法咱們已經在前面的示例中見過了。來看下write()方法的使用示例:

Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere
        ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
        ChannelFuture cf = channel.writeAndFlush(buf);
        cf.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    System.out.println("Write successful");
                } else {
                    System.err.println("Write error");
                    future.cause().printStackTrace();
                }
            }
        });

簡單解釋下,

buf裏是要寫的數據,而後調用write方法寫入數據,返回一個寫入的future結果。前面已經說過這個future了,咱們給future添加一個監聽器,以便寫入成功後能夠經過回調獲得通知。

另外write這個方法也是線程安全的,下面是一個用多線程操做write方法的示例,

final Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere
        final ByteBuf buf = Unpooled.copiedBuffer("your data",
                CharsetUtil.UTF_8);
        Runnable writer = new Runnable() {
            @Override
            public void run() {
                channel.write(buf.duplicate());
            }
        };
        Executor executor = Executors.newCachedThreadPool();

        // write in one thread
        executor.execute(writer);

        // write in another thread
        executor.execute(writer);
        //...
    }

netty保證write方法線程安全的原理,是將用戶線程的操做封裝成Task放入消息隊列中,底層由同一個I/O線程負責執行,這樣就實現了局部無鎖化。

這部分要解釋清楚須要深刻到源碼底層,由於本篇系列是netty in action的筆記系列就很少說了。後面可能考慮寫一個源碼解析系列在深刻這一塊。

支持的四種傳輸方式

NIO

這是netty最多見的使用場景。當channel狀態變動時用戶能夠收到通知,有如下幾個狀態:

  • 新的channel被accept
  • channel鏈接成功
  • channel收到數據
  • channel發送數據

image

如上圖所示,netty內部其實也是封裝了JDK的NIO,使用selector來管理IO狀態的變動。在前面的章節裏咱們其實給過JDK NIO的代碼示例,這裏就不貼出來了。

netty NIO模型裏有一個不得不說的特性叫zero-file-copy,不少地方翻譯成零拷貝。這種特性可讓咱們直接在文件系統和網卡傳輸數據,避免了數據從內核空間到用戶空間的拷貝。

OIO

OIO是在netty裏是一種折中的存在,阻塞的方式儘管應用場景不多,可是不表明不存在。好比經過jdbc調用數據庫,若是是異步的方案是不太合適的。

netty的OIO模型底層也是調用JDK,前面的筆記咱們也給過示例。這種模型就是用一個線程處理監聽(accetp),而後爲每一個成功的鏈接建立一個處理線程。這樣作的目的是防止對於某個鏈接的處理阻塞影響其它鏈接,畢竟I/O操做是很容易引發阻塞的。

既然是阻塞的模型,netty的封裝能作的工做也有限。netty只是給socket上加了SO_TIMEOUT,這樣若是一個操做在超時時間內沒有完成,就會拋出SocketTimeoutException,netty會捕獲這個異常,而後繼續後面的流程。而後就是下一個EventLoop執行,循環往復。這種處理方案弊端在於拋出異常的開銷,由於異常會佔用堆棧。

image

這個圖就是對上面的歸納,分配一個線程給socket,socket鏈接服務器而後讀數據,讀數據可能阻塞也可能成功。若是是前者捕獲異常後再次重試。

Local In VM transport

netty包含對本地傳輸的支持,這個傳輸實現使用相同的API用於虛擬機之間的通訊,傳輸是徹底異步的。

每一個Channel使用惟一的SocketAddress,客戶端經過使用SocketAddress進行鏈接,在服務器會被註冊爲長期運行,一旦通道關閉,它會自動註銷,客戶端沒法再使用它。

使用本地傳輸服務器的行爲與其餘的傳輸實現幾乎是相同的,須要注意的一個重點是隻能在本地的服務器和客戶端上使用它們。

Embedded transport

Embedded transport可讓你更容易的在不一樣的ChannelHandler之間的交互,更多的時候它像是一個工具類。通常用於測試的場景。它自帶了一個具體的Channel實現,EmbeddedChannel。好比下面是一個使用示例:

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= frameLength) {
            ByteBuf buf = in.readBytes(frameLength);
            out.add(buf);
        }
    }
}
@Test
    public void testFramesDecoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
        Assert.assertTrue(channel.writeInbound(input.retain()));
        Assert.assertTrue(channel.finish());

        ByteBuf read = channel.readInbound();
        Assert.assertEquals(buf.readSlice(3), read);
        read.release();

        read = channel.readInbound();
        Assert.assertEquals(buf.readSlice(3), read);
        read.release();

        read = channel.readInbound();
        Assert.assertEquals(buf.readSlice(3), read);
        read.release();

        Assert.assertNull(channel.readInbound());
        buf.release();
    }

用到的幾個方法解釋下,

  • writeInbound 將入站消息寫到EmbeddedChannel中。若是能夠經過readInbound方法從EmbeddedChannel中讀取數據,則返回true
  • readInbound 從EmbeddedChannel中讀取入站消息。任何返回東西都通過整個ChannelPipeline。若是沒有任何可供讀取的,則返回null
  • finish 將EmbeddedChannel標記爲完成,若是有可讀取的入站或出站數據,則返回true。這個方法還將會調用EmbeddedChannel上的close方法

更多的使用細節能夠去網上了解下。

相關文章
相關標籤/搜索