Netty 是一款異步的事件驅動的網絡應用程序框架,支持快速地開發可維護的高性能的面向協議的服務器和客戶端。咱們能夠很簡單的使用Netty 構建應用程序,你沒必要是一名網絡編程專家;並且Netty 比直接使用底層的Java API 容易得多,它推崇良好的設計實踐,能夠將你的應用程序邏輯和網絡層解耦。java
在咱們開始首次深刻地瞭解Netty 以前,請仔細審視表1-1 中所總結的關鍵特性。有些是技術性的,而其餘的更多的則是關於架構或設計哲學的。在本書的學習過程當中,咱們將不止一次地從新審視它們。編程
在深刻netty以前,咱們先來簡單說說NIO;咱們都知道,它和之前的普通I/O相比的最大優點在於它是非阻塞的。bootstrap
由於普通I/O的阻塞,之前咱們設計併發只能像下圖這樣,爲每一個I/O分配一個線程:安全
這顯然帶來了一些問題:服務器
Java 對於非阻塞I/O 的支持是在2002 年引入的,位於JDK 1.4 的java.nio 包中。下圖展現了一個非阻塞設計,其實際上消除了普通I/O的那些弊端。選擇器使得咱們可以經過較少的線程即可監視許多鏈接上的事件。
class java.nio.channels.Selector 是Java 的非阻塞I/O 實現的關鍵。它使用了事件通知API以肯定在一組非阻塞套接字中有哪些已經就緒可以進行I/O 相關的操做。由於能夠在任何的時間檢查任意的讀操做或者寫操做的完成狀態,因此如上圖 所示,一個單一的線程即可以處理多個併發的鏈接。網絡
整體來看,與阻塞I/O 模型相比,這種模型提供了更好的資源管理:多線程
在本節中我將要討論Netty 的主要構件塊:架構
這些構建塊表明了不一樣類型的構造:資源、邏輯以及通知。你的應用程序將使用它們來訪問網絡以及流經網絡的數據。併發
對於每一個組件來講,咱們都將提供一個基本的定義,而且在適當的狀況下,還會提供一個簡單的示例代碼來講明它的用法。框架
基本的I/O 操做(bind()、connect()、read()和write())依賴於底層網絡傳輸所提供的原語。在基於Java 的網絡編程中,其基本的構造是class Socket。Netty 的Channel 接口所提供的API,大大地下降了直接使用Socket 類的複雜性。
Channel 是Java NIO 的一個基本構造。它表明一個到實體(如一個硬件設備、一個文件、一個網絡套接字或者一個可以執行一個或者多個不一樣的I/O操做的程序組件)的開放鏈接,如讀操做和寫操做。
目前,能夠把Channel 看做是傳入(入站)或者傳出(出站)數據的載體。所以,它能夠被打開或者被關閉,鏈接或者斷開鏈接。
一個回調其實就是一個方法,一個指向已經被提供給另一個方法的方法的引用。這使得後者能夠在適當的時候調用前者。回調在普遍的編程場景中都有應用,並且也是在操做完成後通知相關方最多見的方式之一。
Future 提供了另外一種在操做完成時通知應用程序的方式。這個對象能夠看做是一個異步操做的結果的佔位符;它將在將來的某個時刻完成,並提供對其結果的訪問。
JDK 預置了interface java.util.concurrent.Future,可是其所提供的實現,只容許手動檢查對應的操做是否已經完成,或者一直阻塞直到它完成。這是很是繁瑣的,因此Netty提供了它本身的實現——ChannelFuture,用於在執行異步操做的時候使用。
ChannelFuture提供了幾種額外的方法,這些方法使得咱們可以註冊一個或者多個ChannelFutureListener實例。監聽器的回調方法operationComplete(),將會在對應的操做完成時被調用。而後監聽器能夠判斷該操做是成功地完成了仍是出錯了。若是是後者,咱們能夠檢索產生的Throwable。簡而言之,由ChannelFutureListener提供的通知機制消除了手動檢查對應的操做是否完成的必要。
下面展現了一個異步地鏈接到遠程節點,ChannelFuture 做爲一個I/O 操做的一部分返回的例子。這裏,connect()方法將會直接返回,而不會阻塞。
Channel channel = ...; ChannelFuture future = channel.connect( new InetSocketAddress("192.168.0.1", 25));
下面的代碼顯示瞭如何利用ChannelFutureListener。首先,要鏈接到遠程節點上。而後,要註冊一個新的ChannelFutureListener 到對connect()方法的調用所返回的ChannelFuture 上。當該監聽器被通知鏈接已經創建的時候,要檢查對應的狀態。若是該操做是成功的,那麼將數據寫到該Channel。不然,要從ChannelFuture 中檢索對應的Throwable。
Channel channel = ...; // 鏈接遠程節點 ChannelFuture future = channel.connect( new InetSocketAddress("192.168.0.1", 25)); //註冊一個ChannelFutureListener,以便在操做完成時得到通知 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { //狀態判斷 if (future.isSuccess()){ //若是操做是成功的,則建立一個ByteBuf 以持有數據 ByteBuf buffer = Unpooled.copiedBuffer( "Hello",Charset.defaultCharset()); //將數據異步地發送到遠程節點。返回一個ChannelFuture ChannelFuture wf = future.channel() .writeAndFlush(buffer); .... } else { //若是發生錯誤,則訪問描述緣由的Throwable Throwable cause = future.cause(); cause.printStackTrace(); } } });
若是你把ChannelFutureListener 看做是回調的一個更加精細的版本,那麼你是對的。事實上,回調和Future 是相互補充的機制;它們相互結合,構成了Netty 自己的關鍵構件塊之一。
Netty 使用不一樣的事件來通知咱們狀態的改變或者是操做的狀態。這使得咱們可以基於已經發生的事件來觸發適當的動做。這些動做多是:
Netty 是一個網絡編程框架,因此事件是按照它們與入站或出站數據流的相關性進行分類的。可能由入站數據或者相關的狀態更改而觸發的事件包括:
出站事件是將來將會觸發的某個動做的操做結果,這些動做包括:
從應用程序開發人員的角度來看,Netty 的主要組件是ChannelHandler,它充當了全部處理入站和出站數據的應用程序邏輯的容器。該組件實現了服務器對從客戶端接收的數據的處理。每一個事件均可以被分發給ChannelHandler 類中的某個用戶實現的方法。這是一個很好的將事件驅動範式直接轉換爲應用程序構件塊的例子。圖1-3 展現了一個事件是如何被一個這樣的ChannelHandler 鏈處理的。
Netty 的ChannelHandler 爲處理器提供了基本的抽象,如圖1-3 所示的那些。你能夠認爲每一個ChannelHandler 的實例都相似於一種爲了響應特定事件而被執行的回調。
下列代碼就是一個handler的示例:
@ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { //重寫了channelActive()方法,其將在一個鏈接創建時被調用 @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } //重寫了channelRead0()方法。每當接收數據時,都會調用這個方法。 //須要注意的是,由服務器發送的消息可能會被分塊接收。 // 也就是說,若是服務器發送了5 字節,那麼不能保證這5 字節會被一次性接收。 //即便是對於這麼少許的數據,channelRead0()方法也可能 // 會被調用兩次,第一次使用一個持有3 字節的ByteBuf(Netty 的字節容器) // 第二次使用一個持有2 字節的ByteBuf。 @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println( "Client received: " + in.toString(CharsetUtil.UTF_8)); } //發生異常時被調用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
channelHandler的主要抽象方法都定義於ChannelHandlerAdapter類中,咱們經過重寫適當的方法,來控制整個生命週期的重要節點的邏輯。
Netty 提供了大量預約義的能夠開箱即用的ChannelHandler 實現,包括用於各類協議(如HTTP 和SSL/TLS)的ChannelHandler。在內部,ChannelHandler 本身也使用了事件和Future,使得它們也成爲了你的應用程序將使用的相同抽象的消費者。
ChannelPipeline 提供了ChannelHandler 鏈的容器,並定義了用於在該鏈上傳播入站和出站事件流的API。當Channel 被建立時,它會被自動地分配到它專屬的ChannelPipeline。
ChannelHandler 安裝到ChannelPipeline 中的過程以下所示:
ServerBootstrap b = new ServerBootstrap(); //一個ChannelInitializer的實現被註冊到了ServerBootstrap中①; b.group(group) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { //當ChannelInitializer.initChannel()方法被調用時ChannelInitializer將在 //ChannelPipeline 中安裝一組自定義的ChannelHandler serverHandler; @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } });
EventLoop 定義了Netty 的核心抽象,用於處理鏈接的生命週期中所發生的事件。圖3-1
下圖在高層次上說明了Channel、EventLoop、Thread 以及EventLoopGroup 之間的關係。
這些關係是:
注意,在這種設計中,一個給定Channel 的I/O 操做都是由相同的Thread 執行的,實際
上消除了不一樣線程間對於同步的須要。
Netty 的引導類爲應用程序的網絡層配置提供了容器,這涉及將一個進程綁定到某個指定的端口(ServerBootstrap),或者將一個進程鏈接到另外一個運行在某個指定主機的指定端口上的進程(Bootstrap)。Netty提供兩種類型的引導,一種用於客戶端(簡單地稱爲Bootstrap),而另外一種(ServerBootstrap)用於服務器。不管你的應用程序使用哪一種協議或者處理哪一種類型的數據,惟一決定它使用哪一種引導類的是它是做爲一個客戶端仍是做爲一個服務器。表3-1 比較了這兩種類型的引導類。
這兩種類型的引導類之間的第一個區別已經討論過了:ServerBootstrap 將綁定到一個端口,由於服務器必需要監聽鏈接,而Bootstrap 則是由想要鏈接到遠程節點的客戶端應用程序所使用的。
第二個區別可能更加明顯。引導一個客戶端只須要一個EventLoopGroup,可是一個ServerBootstrap 則須要兩個(也能夠是同一個實例)。爲何呢?
由於服務器須要兩組不一樣的Channel。第一組將只包含一個ServerChannel,表明服務器自身的已綁定到某個本地端口的正在監聽的套接字。而第二組將包含全部已建立的用來處理傳入客戶端鏈接(對於每一個服務器已經接受的鏈接都有一個)的Channel。圖3-4 說明了這個模型,而且展現了爲什麼須要兩個不一樣的EventLoopGroup。
與ServerChannel 相關聯的EventLoopGroup 將分配一個負責爲 傳入鏈接請求 建立Channel 的EventLoop。一旦鏈接被接受,第二個EventLoopGroup 就會給它的Channel分配一個EventLoop。
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; @ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { //重寫了channelActive()方法,其將在一個鏈接創建時被調用 @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } //重寫了channelRead0()方法。每當接收數據時,都會調用這個方法。 //須要注意的是,由服務器發送的消息可能會被分塊接收。 // 也就是說,若是服務器發送了5 字節,那麼不能保證這5 字節會被一次性接收。 //即便是對於這麼少許的數據,channelRead0()方法也可能 // 會被調用兩次,第一次使用一個持有3 字節的ByteBuf(Netty 的字節容器) // 第二次使用一個持有2 字節的ByteBuf。 @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println( "Client received: " + in.toString(CharsetUtil.UTF_8)); } //發生異常時被調用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { //定義EventLoop EventLoopGroup group = new NioEventLoopGroup(); try { //Bootstrap類包提供包含豐富API的幫助類,可以很是方便的實現典型的服務器端和客戶端通道初始化功能。 Bootstrap b = new Bootstrap(); //綁定EventLoop b.group(group) //使用默認的channelFactory建立一個channel .channel(NioSocketChannel.class) //定義遠程地址 .remoteAddress(new InetSocketAddress(host, port)) //綁定自定義的EchoClientHandler到ChannelPipeline上 .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new EchoClientHandler()); } }); //同步式的連接 ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient("localhost", 8155).start(); } }
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.util.CharsetUtil; /** * 由於你的Echo 服務器會響應傳入的消息,因此它須要實現ChannelInboundHandler 接口,用 * 來定義響應入站事件的方法。 */ //標示一個ChannelHandler 能夠被多個Channel 安全地共享 @ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { //對於每一個傳入的消息都會被調用; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { //將消息記錄到控制檯 ByteBuf in = (ByteBuf) msg; System.out.println( "Server received: " + in.toString(CharsetUtil.UTF_8)); ctx.write(in); } //通知ChannelInboundHandler最後一次對channelRead() //的調用是當前批量讀取中的最後一條消息; @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } //在讀取操做期間,有異常拋出時會調用。 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.*; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { //設置端口值(若是端口參數的格式不正確,則拋出一個NumberFormatException) int port = 8155; new EchoServer(port).start(); } public void start() throws Exception { //定義EventLoop EventLoopGroup group = new NioEventLoopGroup(); try { //與Bootstrap類包包含豐富的客戶端API同樣,ServerBootstrap可以很是方便的實現典型的服務端。 ServerBootstrap b = new ServerBootstrap(); b.group(group) //指定所使用的NIO傳輸Channel .channel(NioServerSocketChannel.class) //使用指定的端口設置套接字地址 .localAddress(new InetSocketAddress(port)) //添加一個EchoServerHandler 到子Channel的ChannelPipeline .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); } }); //新建一個future實例,異步地綁定服務器;調用sync()方法阻塞等待直到綁定完成 ChannelFuture f = b.bind().sync(); //獲取Channel 的CloseFuture,而且阻塞當前線程直到它完成 //該應用程序將會阻塞等待直到服務器的Channel關閉(由於你在Channel 的CloseFuture 上調用了sync()方法)。 f.channel().closeFuture().sync(); } finally { //關閉EventLoopGroup,釋放全部的資源 group.shutdownGracefully().sync(); } } }
在本章中,咱們從技術和體系結構這兩個角度探討了理解Netty 的重要性。咱們也更加詳細地從新審視了以前引入的一些概念和組件,特別是ChannelHandler、ChannelPipeline和引導。
特別地,咱們討論了ChannelHandler 類的層次結構,並介紹了編碼器和解碼器,描述了它們在數據和網絡字節格式之間來回轉換的互補功能。下面的許多章節都將致力於深刻研究這些組件,而這裏所呈現的概覽應該有助於你對總體的把控。