第四章:Transports(傳輸)

本章內容java

Transports(傳輸)bootstrap

NIO(non-blocking IO,New IO), OIO(Old IO,blocking IO), Local(本地), Embedded(嵌入式)安全

Use-case(用例)服務器

APIs(接口)網絡

網絡應用程序都是以字節碼傳輸。Java開發網絡程序傳輸數據的過程和方式是被抽象了的,咱們不須要關注底層接口,只須要使用Java API或其餘網絡框架如Netty就能達到傳輸數據的目的。發送數據和接收數據都是字節碼。Nothing more,nothing less。多線程

咱們拿Netty的API和Java的API作比較來告訴你爲何Netty能夠更容易的使用?併發

4.1 案例研究:切換傳輸方式app

4.1.1 使用Java的I/O和NIO

  咱們將不用Netty實現這個例子,下面代碼是使用阻塞IO實現的例子:框架

package netty.in.action;  
import java.io.IOException;  
import java.io.OutputStream;  
import java.net.ServerSocket;  
import java.net.Socket;  
import java.nio.charset.Charset;  
/** 
 * Blocking networking without Netty 
 * @author c.k 
 * 
 */  
public class PlainOioServer {  
      
    public void server(int port) throws Exception {  
        //bind server to port  
        final ServerSocket socket = new ServerSocket(port);  
        try {  
            while(true){  
                //accept connection  
                final Socket clientSocket = socket.accept();  
                System.out.println("Accepted connection from " + clientSocket);  
                //create new thread to handle connection  
                new Thread(new Runnable() {  
                    @Override  
                    public void run() {  
                        OutputStream out;  
                        try{  
                            out = clientSocket.getOutputStream();  
                            //write message to connected client  
                            out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));  
                            out.flush();  
                            //close connection once message written and flushed  
                            clientSocket.close();  
                        }catch(IOException e){  
                            try {  
                                clientSocket.close();  
                            } catch (IOException e1) {  
                                e1.printStackTrace();  
                            }  
                        }  
                    }  
                }).start();//start thread to begin handling  
            }  
        }catch(Exception e){  
            e.printStackTrace();  
            socket.close();  
        }  
    }  
}

這種阻塞模式在大鏈接數的狀況就會有很嚴重的問題,如客戶端鏈接超時,服務器響應嚴重延遲。爲了解決這種狀況,咱們可使用異步網絡處理全部的併發鏈接。less

問題在於NIO和OIO的API是徹底不一樣的,因此一個用OIO開發的網絡應用程序想要使用NIO重構代碼幾乎是從新開發。

下面代碼是使用Java NIO實現的例子:

package netty.in.action;  
  
import java.net.InetSocketAddress;  
import java.net.ServerSocket;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.ServerSocketChannel;  
import java.nio.channels.SocketChannel;  
import java.util.Iterator;  
/** 
 * Asynchronous networking without Netty 
 * @author c.k 
 * 
 */  
public class PlainNioServer {  
    public void server(int port) throws Exception {  
        System.out.println("Listening for connections on port " + port);  
        //open Selector that handles channels  
        Selector selector = Selector.open();  
        //open ServerSocketChannel  
        ServerSocketChannel serverChannel = ServerSocketChannel.open();  
        //get ServerSocket  
        ServerSocket serverSocket = serverChannel.socket();  
        //bind server to port  
        serverSocket.bind(new InetSocketAddress(port));  
        //set to non-blocking  
        serverChannel.configureBlocking(false);  
        //register ServerSocket to selector and specify that it is interested in new accepted clients  
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);  
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());  
        while (true) {  
            //Wait for new events that are ready for process. This will block until something happens  
            int n = selector.select();  
            if (n > 0) {  
                //Obtain all SelectionKey instances that received events  
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();  
                while (iter.hasNext()) {  
                    SelectionKey key = iter.next();  
                    iter.remove();  
                    try {  
                        //Check if event was because new client ready to get accepted  
                        if (key.isAcceptable()) {  
                            ServerSocketChannel server = (ServerSocketChannel) key.channel();  
                            SocketChannel client = server.accept();  
                            System.out.println("Accepted connection from " + client);  
                            client.configureBlocking(false);  
                            //Accept client and register it to selector  
                            client.register(selector, SelectionKey.OP_WRITE, msg.duplicate());  
                        }  
                        //Check if event was because socket is ready to write data  
                        if (key.isWritable()) {  
                            SocketChannel client = (SocketChannel) key.channel();  
                            ByteBuffer buff = (ByteBuffer) key.attachment();  
                            //write data to connected client  
                            while (buff.hasRemaining()) {  
                                if (client.write(buff) == 0) {  
                                    break;  
                                }  
                            }  
                            client.close();//close client  
                        }  
                    } catch (Exception e) {  
                        key.cancel();  
                        key.channel().close();  
                    }  
                }  
            }  
        }  
    }  
}

如你所見,即便它們實現的功能是同樣,可是代碼徹底不一樣。下面咱們將用Netty來實現相同的功能。

4.1.2 Netty中使用I/O和NIO

下面代碼是使用Netty做爲網絡框架編寫的一個阻塞IO例子:

package netty.in.action;  
import java.net.InetSocketAddress;  
import io.netty.bootstrap.ServerBootstrap;  
import io.netty.buffer.ByteBuf;  
import io.netty.buffer.Unpooled;  
import io.netty.channel.Channel;  
import io.netty.channel.ChannelFuture;  
import io.netty.channel.ChannelFutureListener;  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.ChannelInboundHandlerAdapter;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.EventLoopGroup;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.oio.OioServerSocketChannel;  
import io.netty.util.CharsetUtil;  
public class NettyOioServer {  
  
    public void server(int port) throws Exception {  
        final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));  
        //事件循環組  
        EventLoopGroup group = new NioEventLoopGroup();  
        try {  
            //用來引導服務器配置  
            ServerBootstrap b = new ServerBootstrap();  
            //使用OIO阻塞模式  
            b.group(group).channel(OioServerSocketChannel.class).localAddress(new InetSocketAddress(port))  
            //指定ChannelInitializer初始化handlers  
                    .childHandler(new ChannelInitializer<Channel>() {  
                        @Override  
                        protected void initChannel(Channel ch) throws Exception {  
                            //添加一個「入站」handler到ChannelPipeline  
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {  
                                @Override  
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {  
                                    //鏈接後,寫消息到客戶端,寫完後便關閉鏈接  
                                    ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);  
                                }  
                            });  
                        }  
                    });  
            //綁定服務器接受鏈接  
            ChannelFuture f = b.bind().sync();  
            f.channel().closeFuture().sync();  
        } catch (Exception e) {  
            //釋放全部資源  
            group.shutdownGracefully();  
        }  
    }  
}

面代碼實現功能同樣,但結構清晰明瞭,這只是Netty的優點之一。

4.1.3 Netty中實現異步支持

下面代碼是使用Netty實現異步,能夠看出使用Netty由OIO切換到NIO是很是的方便。

package netty.in.action;  
  
import io.netty.bootstrap.ServerBootstrap;  
import io.netty.buffer.ByteBuf;  
import io.netty.buffer.Unpooled;  
import io.netty.channel.ChannelFuture;  
import io.netty.channel.ChannelFutureListener;  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.ChannelInboundHandlerAdapter;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.EventLoopGroup;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.SocketChannel;  
import io.netty.channel.socket.nio.NioServerSocketChannel;  
import io.netty.util.CharsetUtil;  
import java.net.InetSocketAddress;  
public class NettyNioServer {  
    public void server(int port) throws Exception {  
        final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));  
        // 事件循環組  
        EventLoopGroup group = new NioEventLoopGroup();  
        try {  
            // 用來引導服務器配置  
            ServerBootstrap b = new ServerBootstrap();  
            // 使用NIO異步模式  
            b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))  
            // 指定ChannelInitializer初始化handlers  
                    .childHandler(new ChannelInitializer<SocketChannel>() {  
                        @Override  
                        protected void initChannel(SocketChannel ch) throws Exception {  
                            // 添加一個「入站」handler到ChannelPipeline  
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {  
                                @Override  
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {  
                                    // 鏈接後,寫消息到客戶端,寫完後便關閉鏈接  
                                    ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);  
                                }  
                            });  
                        }  
                    });  
            // 綁定服務器接受鏈接  
            ChannelFuture f = b.bind().sync();  
            f.channel().closeFuture().sync();  
        } catch (Exception e) {  
            // 釋放全部資源  
            group.shutdownGracefully();  
        }  
    }  
}

由於Netty使用相同的API來實現每一個傳輸,它並不關心你使用什麼來實現。Netty經過操做Channel接口和ChannelPipeline、ChannelHandler來實現傳輸。

4.2 Transport API

 傳輸API的核心是Channel接口,它用於全部出站的操做。Channel接口的類層次結構以下

如上圖所示,每一個Channel都會分配一個ChannelPipeline和ChannelConfig。

ChannelConfig負責設置並存儲配置,並容許在運行期間更新它們。傳輸通常有特定的配置設置,只做用於傳輸,沒有其餘的實現。

ChannelPipeline容納了使用的ChannelHandler實例,這些ChannelHandler將處理通道傳遞的「入站」和「出站」數據。ChannelHandler的實現容許你改變數據狀態和傳輸數據。

如今咱們可使用ChannelHandler作下面一些事情:

    a. 傳輸數據時,將數據從一種格式轉換到另外一種格式

    b. 異常通知

    c. Channel變爲有效或無效時得到通知

    d. Channel被註冊或從EventLoop中註銷時得到通知

    e. 通知用戶特定事件

這些ChannelHandler實例添加到ChannelPipeline中,在ChannelPipeline中按順序逐個執行。它相似於一個鏈條,有使用過Servlet的讀者可能會更容易理解。

ChannelPipeline實現了攔截過濾器模式,這意味着咱們鏈接不一樣的ChannelHandler來攔截並處理通過ChannelPipeline的數據或事件。

能夠把ChannelPipeline想象成UNIX管道,它容許不一樣的命令鏈(ChannelHandler至關於命令)。

你還能夠在運行時根據須要添加ChannelHandler實例到ChannelPipeline或從ChannelPipeline中刪除,這能幫助咱們構建高度靈活的Netty程序。

此外,訪問指定的ChannelPipeline和ChannelConfig,你能在Channel自身上進行操做。

Channel提供了不少方法,以下列表:

    eventLoop(),返回分配給Channel的EventLoop

    pipeline(),返回分配給Channel的ChannelPipeline

    isActive(),返回Channel是否激活,已激活說明與遠程鏈接對等

    localAddress(),返回已綁定的本地SocketAddress

    remoteAddress(),返回已綁定的遠程SocketAddress

    write(),寫數據到遠程客戶端,數據經過ChannelPipeline傳輸過去

寫數據到遠程已鏈接客戶端能夠調用Channel.write()方法,以下代碼:

Channel channel = ...  
//Create ByteBuf that holds data to write  
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);  
//Write data  
ChannelFuture cf = channel.write(buf);  
//Add ChannelFutureListener to get notified after write completes  
cf.addListener(new ChannelFutureListener() {  
    @Override  
    public void operationComplete(ChannelFuture future) {  
        //Write operation completes without error  
        if (future.isSuccess()) {  
            System.out.println(.Write successful.);  
        } else {  
            //Write operation completed but because of error  
            System.err.println(.Write error.);  
            future.cause().printStacktrace();  
        }  
    }  
});

Channel是線程安全(thread-safe)的,它能夠被多個不一樣的線程安全的操做,在多線程環境下,全部的方法都是安全的。正由於Channel是安全的,咱們存儲對Channel的引用,並在學習的時候使用它寫入數據到遠程已鏈接的客戶端,使用多線程也是如此。

下面的代碼是一個簡單的多線程例子:

final Channel channel = ...  
//Create ByteBuf that holds data to write  
final ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8);  
//Create Runnable which writes data to channel  
Runnable writer = new Runnable() {  
    @Override  
    public void run() {  
        channel.write(buf.duplicate());  
    }  
};  
//Obtain reference to the Executor which uses threads to execute tasks  
Executor executor = Executors.newChachedThreadPool();  
// write in one thread  
//Hand over write task to executor for execution in thread  
executor.execute(writer);  
// write in another thread  
//Hand over another write task to executor for execution in thread  
executor.execute(writer);

此外,這種方法保證了寫入的消息以相同的順序經過寫入它們的方法。想了解全部方法的使用能夠參考Netty API文檔。

4.3 Netty包含的傳輸實現

Netty中的傳輸方式有以下幾種:

    NIO,io.netty.channel.socket.nio,基於java.nio.channels的工具包,使用選擇器做爲基礎的方法。

    OIO,io.netty.channel.socket.oio,基於java.net的工具包,使用阻塞流。

    Local,io.netty.channel.local,用來在虛擬機之間本地通訊。

    Embedded,io.netty.channel.embedded,嵌入傳輸,它容許在沒有真正網絡的運輸中使用ChannelHandler,能夠很是有用的來測試ChannelHandler的實現。

4.3.1 NIO - Nonblocking I/O

NIO傳輸是目前最經常使用的方式,它經過使用選擇器提供了徹底異步的方式操做全部的I/O,NIO從Java 1.4才被提供。NIO中,咱們能夠註冊一個通道或得到某個通道的改變的狀態,通道狀態有下面幾種改變:

    一個新的Channel被接受並已準備好

    Channel鏈接完成

    Channel中有數據並已準備好讀取

    Channel發送數據出去

處理完改變的狀態後需從新設置他們的狀態,用一個線程來檢查是否有已準備好的Channel,若是有則執行相關事件。在這裏可能只同時一個註冊的事件而忽略其餘的。選擇器所支持的操做在SelectionKey中定義,具體以下:

    OP_ACCEPT,有新鏈接時獲得通知

    OP_CONNECT,鏈接完成後獲得通知

    OP_READ,準備好讀取數據時獲得通知

    OP_WRITE,寫入數據到通道時獲得通知

Netty中的NIO傳輸就是基於這樣的模型來接收和發送數據,經過封裝將本身的接口提供給用戶使用,這徹底隱藏了內部實現。如前面所說,Netty隱藏內部的實現細節,將抽象出來的API暴露出來供使用,下面是處理流程圖:

NIO在處理過程也會有必定的延遲,若鏈接數不大的話,延遲通常在毫秒級,可是其吞吐量依然比OIO模式的要高。Netty中的NIO傳輸是「zero-file-copy」,也就是零文件複製,這種機制可讓程序速度更快,更高效的從文件系統中傳輸內容,零複製就是咱們的應用程序不會將發送的數據先複製到JVM堆棧在進行處理,而是直接從內核空間操做。接下來咱們將討論OIO傳輸,它是阻塞的。

4.3.2 OIO - Old blocking I/O

OIO就是java中提供的Socket接口,java最開始只提供了阻塞的Socket,阻塞會致使程序性能低。下面是OIO的處理流程圖,若想詳細瞭解,能夠參閱其餘相關資料。

4.3.3 Local - In VM transport

Netty包含了本地傳輸,這個傳輸實現使用相同的API用於虛擬機之間的通訊,傳輸是徹底異步的。每一個Channel使用惟一的SocketAddress,客戶端經過使用SocketAddress進行鏈接,在服務器會被註冊爲長期運行,一旦通道關閉,它會自動註銷,客戶端沒法再使用它。

鏈接到本地傳輸服務器的行爲與其餘的傳輸實現幾乎是相同的,須要注意的一個重點是隻能在本地的服務器和客戶端上使用它們。Local未綁定任何Socket,值提供JVM進程之間的通訊。

4.3.4 Embedded transport

Netty還包括嵌入傳輸,與以前講述的其餘傳輸實現比較,它是否是一個真的傳輸呢?若不是一個真的傳輸,咱們用它能夠作什麼呢?Embedded transport容許更容易的使用不一樣的ChannelHandler之間的交互,這也更容易嵌入到其餘的ChannelHandler實例並像一個輔助類同樣使用它們。它通常用來測試特定的ChannelHandler實現,也能夠在ChannelHandler中從新使用一些ChannelHandler來進行擴展,爲了實現這樣的目的,它自帶了一個具體的Channel實現,即:EmbeddedChannel。

4.4 每種傳輸方式在何時使用?

很少加贅述,看下面列表:

    OIO,在低鏈接數、須要低延遲時、阻塞時使用

    NIO,在高鏈接數時使用

    Local,在同一個JVM內通訊時使用

    Embedded,測試ChannelHandler時使用

相關文章
相關標籤/搜索