Netty實戰四之傳輸(全章)

項目地址:請點擊推文結尾閱讀原文(碼雲免費下載、wiki完整閱讀)java

流經網絡的數據老是具備相同的類型:字節(網絡傳輸——一個幫助咱們抽象底層數據傳輸機制的概念)編程

Netty爲它全部的傳輸實現提供了一個通用的API,即咱們能夠將時間花在其餘更有成效的事情上。設計模式

咱們將經過一個案例來對傳輸進行學習,應用程序只簡單地接收鏈接,向客戶端寫 「Hi!」 ,而後關閉鏈接。安全

一、不經過Netty使用OIO和NIO服務器

先介紹JDK API的應用程序的阻塞(OIO)版本和異步(NIO)版本。網絡

public class PlainNioServer {
    public void server(int port) throws IOException{        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);        ServerSocket ssocket = serverChannel.socket();        InetSocketAddress address = new InetSocketAddress(port);
        //將服務器綁定到選定的端口
        ssocket.bind(address);
        //打開Selector來處理Channel
        Selector selector = Selector.open();
        //將ServerSocket註冊到Selector已接受鏈接
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());        for(;;){            try {
                //等待須要處理的新事件;阻塞將一直持續到下一個傳入事件
                selector.select();
            }catch (IOException e){
                e.printStackTrace();
                //handle exception                break;
            }
            //獲取全部接收事件的SelectionKey實例            Set<SelectionKey> readyKeys = selector.selectedKeys();            Iterator<SelectionKey> iterator = readyKeys.iterator();            while (iterator.hasNext()){                SelectionKey key = iterator.next();                iterator.remove();                try {
                    //檢查事件是不是一個新的已經就緒能夠被接受的鏈接                    if (key.isAcceptable()){                        ServerSocketChannel server = (ServerSocketChannel)key.channel();                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        //接受客戶端,並將它註冊到選擇器
                        client.register(selector,SelectionKey.OP_WRITE | SelectionKey.OP_READ,msg.duplicate());                        System.out.println("Accepted connection from " + client);
                    }
                    //檢查套接字是否已經準備好寫數據                    if (key.isWritable()){                        SocketChannel client = (SocketChannel)key.channel();                        ByteBuffer buffer = (ByteBuffer)key.attachment();                        while(buffer.hasRemaining()){
                            //將數據寫到已鏈接的客戶端                            if (client.write(buffer) == 0){                                break;
                            }
                        }
                        //關閉鏈接
                        client.close();
                    }
                }catch (IOException e){
                    key.cancel();                    try {
                        key.channel().close();
                    }catch (IOException ex){
                        //ignore on close
                    }
                }
            }
        }
    }
}

這段代碼徹底能夠處理中等數量的併發客戶端,可是隨着應用程序變得流行起來,你會發現它並不能很好地伸縮到支撐成千上萬的併發連入鏈接。你決定改用異步網絡編程,可是很快就發現異步API是徹底不一樣的,以致於如今你不得不重寫你的應用程序。多線程

public class PlainOioServer {    public void server(int port) throws IOException{        //將服務器綁定到指定端口
        final ServerSocket socket = new ServerSocket(port);        try {            for (;;){                //接受鏈接
                final Socket clientSocket = socket.accept();
                System.out.println("Accepted connection from " + clientSocket);                //建立一個新的線程來處理該鏈接
                new Thread(new Runnable() {
                    @Override                    public void run() {
                        OutputStream out;                        try {                            out = clientSocket.getOutputStream();                            //將消息寫給已鏈接的客戶端
                            out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));                            out.flush();                            //關閉鏈接
                            clientSocket.close();
                        }catch (IOException e){
                            e.printStackTrace();
                        }finally {                            try {
                                clientSocket.close();
                            }catch (IOException e){                                //ignore on close
                            }
                        }
                    }                    //啓動線程
                }).start();
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }

}

雖然這段代碼所作的事情與以前的版本徹底相同,可是代碼卻大相徑庭,若是爲了用於非阻塞I/O而從新實現這個簡單的應用程序,都須要一次徹底重寫的話,那麼不難想象,移植真正複雜的應用程序須要付出什麼樣的努力!併發

二、經過Netty使用OIO和NIO框架

public class NettyOioServer {    public void server(int port) throws Exception{        final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();        try {            //建立ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)                    //使用OioEventLoopGroup以容許阻塞模式(舊的I/O)
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))                    //指定ChannelInitializer,對於每一個已接受的鏈接都調用它
                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(                                //添加一個ChannelInboundHandlerAdapter以攔截和處理事件
                                new ChannelInboundHandlerAdapter(){                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        ctx.writeAndFlush(buf.duplicate())                                                //將消息寫到客戶端,並添加ChannelFutureListener,以便消息一被寫完就關閉鏈接
                                                .addListener(ChannelFutureListener.CLOSE);
                                    }
                                }
                            );
                        }
                    });            //綁定服務器以接受鏈接
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        }finally {            //釋放全部的資源
            group.shutdownGracefully().sync();
        }
    }
}

三、非阻塞的Netty版本異步

public class NettyNioServer {    public void server(int port) throws Exception{        final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));        //爲非阻塞模式使用NioEventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();        try {            //建立ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))                    //指定ChannelInitializer,對於每一個已接受的鏈接都調用它
                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(                                    //添加一個ChannelInboundHandlerAdapter以攔截和處理事件
                                    new ChannelInboundHandlerAdapter(){                                        @Override
                                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                            ctx.writeAndFlush(buf.duplicate())                                                    //將消息寫到客戶端,並添加ChannelFutureListener,以便消息一被寫完就關閉鏈接
                                                    .addListener(ChannelFutureListener.CLOSE);
                                        }
                                    }
                            );
                        }
                    });            //綁定服務器以接受鏈接
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        }finally {            //釋放全部的資源
            group.shutdownGracefully().sync();
        }
    }
}

由於Netty爲每種傳輸的實現都暴露了相同的API,因此不管選用哪種傳輸的實現,你的代碼都仍然幾乎不受影響,在全部的狀況下,傳輸的實現都依賴於interface Channel、ChannelPipeline和ChannelHandler。

四、傳輸API

傳輸API的核心是interface Channel ,它被用於全部的I/O操做。Channel類的層次結構如圖4-1(Channel接口的層次結構)所示。
Netty實戰四之傳輸(全章)
如圖所示,每一個Channel都將會將分配一個ChannelPipeline和ChannelConfig。ChannelConfig包含了該Channel的全部配置設置,而且支持熱更新。

因爲特定的傳輸可能具備獨特的設置,因此它可能會實現一個ChannelConfig的子類型。

因爲Channel是獨一無二的,因此爲了保證順序將Channel聲明爲java.lang.Comparable的子接口。所以,若是兩個不一樣的Channel實例都返回相同的散列碼,那麼AbstractChannel中的compareTo()方法的實現將會拋出一個Error。

ChannelPiepeline持有全部將應用於入站和出站數據以及事件的ChannelHandler實例,這些ChannelHandler實現了應用程序用於處理狀態變化以及數據處理的邏輯。

ChannelHandler的典型用途包括:

-將數據從一種格式轉換爲另外一種格式

-提供異常的通知

-提供Channel變爲活動的或者非活動的通知

-提供當Channel註冊到EventLoop或者從EventLoop註銷時的通知

-提供有關用戶自定義事件的通知

攔截過濾器 ChannelPipeline實現了一種常見的設計模式——攔截過濾器(InterceptingFilter)。UNIX管道是另一個熟悉的例子:多個命令被連接在一塊兒,其中一個命令的輸出端將鏈接到命令行中下一個命令的輸入端。

你也能夠根據須要經過添加或者移除ChannelHandler實例來修改ChannelPipeline。經過利用Netty的這項能力能夠構建出高度靈活的應用程序。例如,每當STARTTLS協議被請求時,你能夠簡單地經過向ChannelPipeline添加一個適當的ChannelHandler(SslHandler)來按需地支持STARTTLS協議。
Netty實戰四之傳輸(全章)
考慮一下寫數據並將其沖刷到遠程節點這樣的常規任務,代碼清單4-5演示了使用Channel.writeAndFlush()來實現這一目的。

Channel channel = ...        //建立持有要寫數據的ByteBuf
        ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);        //寫數據並沖刷它
        ChannelFuture cf = channel.writeAndFlush(buf);        //添加ChannelFutureListener以便在寫操做完成後接收通知
        cf.addListener(new ChannelFutureListener() {            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {                //寫操做完成,而且沒有錯誤發生
                if (channelFuture.isSuccess()){
                    System.out.println("Write successful");
                } else {                    //記錄錯誤
                    System.out.println("Write error");
                    channelFuture.cause().printStackTrace();
                }
            }
        });

Netty的Channel實現是線程安全的,所以你能夠存儲一個到Channel的引用,而且每當你須要向遠程節點寫數據時,均可以使用它,即便當時許多線程都在使用它。代碼清單4-6展現了一個多線程寫數據的簡單例子,須要注意的是,消息將會被保證按順序發送的。

final Channel channel = ...        //建立持有要寫數據的ByteBuf
        final ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8).retain();        //建立將數據寫到Channel的Runable
        Runnable writer = new Runnable() {            @Override
            public void run() {
                channel.writeAndFlush(buf.duplicate());
            }
        };        //獲取到線程池Executor的引用
        Executor executor = Executors.newCachedThreadPool();        //write in one thread
        //遞交寫任務給線程池以便在某個線程中執行
        executor.execute(writer);        //write in another thread
        //遞交另外一個寫任務以便在另外一個線程中執行
        executor.execute(writer);

五、內置的傳輸

Netty內置了一些可開箱即用的傳輸,由於並非它們全部的傳輸都支持每一種協議,因此你必須選擇一個和你的應用程序所使用的協議相容的傳輸。 下表顯示了全部Netty提供的傳輸

Netty實戰四之傳輸(全章)

六、NIO——非阻塞I/O

NIO提供了一個全部I/O操做的全異步的實現。它利用了自NIO子系統被引入JDK1.4時即可用的基於選擇器的API。

選擇器背後的基本概念是充當一個註冊表,在那裏你將能夠請求在Channel的狀態發生變化時獲得通知。

-新的Channel已被接受而且就緒

-Channel鏈接已經完成

-Channel有已經就緒的可供讀取的數據

-Channel可用於寫數據

選擇器運行在一個檢查狀態變化並對其作出相應響應的線程上,在應用程序對狀態的改變作出響應以後,選擇器將會被重置,並將重複這個過程。

下表中的常量值表明了由class java.nio.channels.SelectionKey定義的位模式。這些位模式能夠組合起來定義一組應用程序正在請求通知的狀態變化集。
Netty實戰四之傳輸(全章)
對於全部Netty的傳輸實現都共有的用戶級別API徹底地隱藏了這些NIO的內部細節。下圖展現了該處理流程。
Netty實戰四之傳輸(全章)

零拷貝:

零拷貝(zero-copy)是一種目前只有在使用NIO和Epoll傳輸時纔可以使用的特性。它使你能夠快速高效地將數據從文件系統移動到網絡接口,而不須要將其從內核空間複製到用戶空間,其在像FTP或者HTTP這樣的協議中能夠顯著地提高性能。可是,並非全部的操做系統都支持這一特性。特別地,它對於實現了數據加密或者壓縮的文件系統是不可用的——只能傳輸文件的原始內容。反過來講,傳輸已被加密的文件則不是問題。

七、Epoll——用於Linux的本地非阻塞傳輸

Linux做爲高性能網絡編程的平臺,其重要性與日俱增,這催生了大量先進特性的開發,其中包括Epoll——一個高度可擴展的I/O事件通知特性,這個API自Linux內核版本2.5.44被引入,提供了比舊的POSIX select和poll系統調用更好的性能,同時如今也是Linux上非阻塞網絡編程的事實標準。Linux JDK NIO API使用了這些epoll調用。

Netty爲Linux提供了一組NIO API,其以一種和它自己的設計更加一致的方式使用epoll,而且以一種更加輕量的方式使用中斷,若是你的應用程序旨在運行於Linux系統,那麼請考慮利用這個版本的傳輸,你將發如今高負載下它的性能要優於JDK的NIO實現。

八、OIO——舊的阻塞I/O

Netty的OIO傳輸實現表明了一種折中:它能夠經過常規的傳輸API使用,可是因爲它是創建在java.net包的阻塞實現上的,因此他不是異步的。

例如,你可能須要移植使用了一些進行阻塞調用的庫(如JDBC)的遺留代碼,而將邏輯轉換爲非阻塞的可能也是不切實際。相反,你能夠在短時間內使用Netty的OIO傳輸,而後再將你的代碼移植到純粹的異步傳輸上。

在 java.net API中,你一般會有一個用來接受到達正在監聽的ServerSocket的新鏈接的線程。會建立一個新的和遠程節點進行交互的套接字,而且會分配一個新的用於處理響應通訊流量的線程。這是必需的,由於某個指定套接字上的任何I/O操做在任意的時間點上均可能會阻塞。使用單個線程來處理多個套接字,很容易致使一個套接字上的阻塞操做也捆綁了全部其餘的套接字。

Netty是如何可以使用和用於異步傳輸相同的API支持OIO的呢?Netty利用了SO_TIMEOUT這個Socket標誌,它指定了等待一個I/O操做完成的最大毫秒數。若是操做在指定的時間間隔內沒有完成,則將會拋出一個SocketTimeout Exception。Netty將捕獲這個異常並繼續處理循環。在EventLoop下一次運行時,它將再次嘗試,這也是Netty這樣的異步框架可以支持OIO的惟一方式。
Netty實戰四之傳輸(全章)

九、用於JVM內部通訊的Local傳輸

Netty提供了一個Local傳輸,用於在同一個JVM中運行的客戶端和服務器程序之間的異步通訊,且也支持對於全部Netty傳輸實現都共同的API。

在這個傳輸中,和服務器Channel相關聯的SocketAddress並無綁定物理網絡地址;相反,只要服務器還在運行,它就會被存儲在註冊表裏,並在Channel關閉時註銷。由於這個傳輸並不接受真正的網絡流量,因此它並不可以和其餘傳輸實現進行互操做。所以,客戶端但願鏈接到(在同一個JVM中)使用了這個傳輸的服務器端時也必須使用它。除了這個限制,它的使用方式和其餘傳輸如出一轍。

十、Embedded傳輸

Netty提供了一種額外的傳輸,使得你能夠將一組ChannelHandler做爲幫助器類嵌入到其餘的ChannelHandler內部。經過這種方式,你將能夠擴展一個CHannelHandler的功能,而又不須要修改其內部代碼。

十一、傳輸的用例

在Linux上啓用SCTP

SCTP須要內核的支持,而且須要安裝用戶庫

例如,對於Ubuntn,可使用下面的命令

sudo apt-get install libsctpl
對於Fedora,可使用yum

sudo yum install kernel-modules-extra.x86_64 lksctp-tools.x86_64
有關如何啓用SCTP的詳細信息,請參考你的Linux發行版的文檔。

雖然只有SCTP傳輸有這些特殊要求,可是其餘傳輸可能也有它們本身的配置選項須要考慮。此外,若是隻是爲了支持更高的併發鏈接數,服務器平臺可能須要配置得和客戶端不同。

——非阻塞代碼庫:若是你的代碼庫中沒有阻塞調用(或者你可以限制它們的範圍),那麼在Linux上使用NIO或者epoll始終是個好主意。雖然NIO/Epoll旨在處理大量的併發鏈接,可是在處理較小數目的併發鏈接時,它也能很好地工做,尤爲是考慮到它在鏈接之間共享線程的方式。

——阻塞代碼庫:若是你的代碼庫嚴重地依賴於阻塞I/O,並且你的應用程序也有一個相應的設計,那麼在你嘗試將其直接轉換爲Netty的NIO傳輸時,你將可能會遇到和阻塞操做相關的問題。不要爲此而重寫你的代碼,能夠考慮分階段遷移:先從OIO開始,等你的代碼修改好以後,在遷移到NIO(或者EPoll,若是你在使用Linux)

——在同一個JVM內部的通訊:同一個JVM內部的通訊,不須要經過網絡暴露服務,是Local傳輸的完美用例。這將消除全部真實網絡操做的開銷,同時仍然使用你的Netty代碼庫。若是隨後須要經過網絡暴露服務,那麼你將只須要把傳輸改成NIO或者OIO便可。

——測試你的ChannelHandler實現:若是你想要爲本身的ChannelHandler實現編寫單元測試,那麼請考慮使用Embedded傳輸。這既便於測試你的代碼,而又不須要建立大量的模擬對象。你的類將仍然符合常規API事件流,保證該Channelhandler在和真實的傳輸一塊兒使用時可以正確地工做。

Netty實戰四之傳輸(全章)

相關文章
相關標籤/搜索