Netty 介紹

本指南對Netty 進行了介紹並指出其意義所在。

1. 問題

如今,咱們使用適合通常用途的應用或組件來和彼此通訊。例如,咱們經常使用一個HTTP客戶端從遠程服務器獲取信息或者經過web services進行遠程方法的調用。
然而,一個適合普通目的的協議或其實現並不具有其規模上的擴展性。例如,咱們沒法使用一個普通的HTTP服務器進行大型文件,電郵信息的交互,或者處理金融信息和多人遊戲數據那種要求準實時消息傳遞的應用場景。所以,這些都要求使用一個適用於特殊目的並通過高度優化的協議實現。例如,你可能想要實現一個對基於AJAX的聊天應用,媒體流或大文件傳輸進行過特殊優化的HTTP服務器。你甚至可能想去設計和實現一個全新的,特定於你的需求的通訊協議。
另外一種沒法避免的場景是你可能不得不使用一種專有的協議和原有系統交互。在這種狀況下,你須要考慮的是如何可以快速的開發出這個協議的實現而且同時尚未犧牲最終應用的性能和穩定性。

2. 方案

Netty 是一個異步的,事件驅動的網絡編程框架和工具,使用Netty 能夠快速開發出可維護的,高性能、高擴展能力的協議服務及其客戶端應用。
也就是說,Netty 是一個基於NIO的客戶,服務器端編程框架,使用Netty 能夠確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。Netty至關簡化和流線化了網絡應用的編程開發過程,例如,TCP和UDP的socket服務開發。
「快速」和「簡單」並不意味着會讓你的最終應用產生維護性或性能上的問題。Netty 是一個吸取了多種協議的實現經驗,這些協議包括FTP,SMPT,HTTP,各類二進制,文本協議,並通過至關精心設計的項目,最終,Netty 成功的找到了一種方式,在保證易於開發的同時還保證了其應用的性能,穩定性和伸縮性。
一些用戶可能找到了某些一樣聲稱具備這些特性的編程框架,所以大家可能想問Netty 又有什麼不同的地方。這個問題的答案是Netty 項目的設計哲學。從創立之初,不管是在API仍是在其實現上Netty 都致力於爲你提供最爲溫馨的使用體驗。雖然這並非顯而易見的,但你終將會認識到這種設計哲學將令你在閱讀本指南和使用Netty 時變得更加得輕鬆和容易。
第一章. 開始

這一章節將圍繞Netty的核心結構展開,同時經過一些簡單的例子可讓你更快的瞭解Netty的使用。當你讀完本章,你將有能力使用Netty完成客戶端和服務端的開發。
若是你更喜歡自上而下式的學習方式,你能夠首先完成 第二章:架構總覽 的學習,而後再回到這裏。
1.1. 開始以前

運行本章示例程序的兩個最低要求是:最新版本的Netty程序以及JDK 1.5或更高版本。最新版本的Netty程序可在項目下載頁 下載。下載正確版本的JDK,請到你偏好的JDK站點下載。
這就已經足夠了嗎?實際上你會發現,這兩個條件已經足夠你完成任何協議的開發了。若是不是這樣,請聯繫Netty項目社區 ,讓咱們知道還缺乏了什麼。
最終但不是至少,當你想了解本章所介紹的類的更多信息時請參考API手冊。爲方便你的使用,這篇文檔中全部的類名均鏈接至在線API手冊。此外,若是本篇文檔中有任何錯誤信息,不管是語法錯誤,仍是打印排版錯誤或者你有更好的建議,請不要顧慮,當即聯繫Netty項目社區 。
1.2. 拋棄協議服務

在這個世界上最簡化的協議不是「Hello,world!」而是拋棄協議 。這是一種丟棄接收到的任何數據並不作任何迴應的協議。
實現拋棄協議(DISCARD protocol),你僅須要忽略接受到的任何數據便可。讓咱們直接從處理器(handler)實現開始,這個處理器處理Netty的全部I/O事件。
 
package org.jboss.netty.example.discard;  
@ChannelPipelineCoverage("all")1 
public class DiscardServerHandler extends SimpleChannelHandler {2 
 
    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {3 
    }  
 
 @Override 
 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {4 
     e.getCause().printStackTrace();  
      
     Channel ch = e.getChannel();  
     ch.close();  
 }  
 
 
 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {4 
        e.getCause().printStackTrace();  
         
        Channel ch = e.getChannel();  
        ch.close();  
    }  
     代碼說明
1)ChannelPipelineCoverage註解了一種處理器類型,這個註解標示了一個處理器是否可被多個Channel通道共享(同時關聯着ChannelPipeline)。DiscardServerHandler沒有處理任何有狀態的信息,所以這裏的註解是「all」。
2)DiscardServerHandler繼承了SimpleChannelHandler,這也是一個ChannelHandler 的實現。SimpleChannelHandler提供了多種你能夠重寫的事件處理方法。目前直接繼承SimpleChannelHandler已經足夠了,並不須要你完成一個本身的處理器接口。
3)咱們這裏重寫了messageReceived事件處理方法。這個方法由一個接收了客戶端傳送數據的MessageEvent事件調用。在這個例子中,咱們忽略接收到的任何數據,並以此來實現一個拋棄協議(DISCARD protocol)。
4)exceptionCaught 事件處理方法由一個ExceptionEvent異常事件調用,這個異常事件原由於Netty的I/O異常或一個處理器實現的內部異常。多數狀況下,捕捉到的異常應當被記錄下來,並在這個方法中關閉這個channel通道。固然處理這種異常狀況的方法實現可能因你的實際需求而有所不一樣,例如,在關閉這個鏈接以前你可能會發送一個包含了錯誤碼的響應消息。
 
目前進展不錯,咱們已經完成了拋棄協議服務器的一半開發工做。下面要作的是完成一個能夠啓動這個包含DiscardServerHandler處理器服務的主方法。
 
 
package org.jboss.netty.example.discard;  
 
import java.net.InetSocketAddress;  
import java.util.concurrent.Executors;  
 
public class DiscardServer {  
 
    public static void main(String[] args) throws Exception {  
        ChannelFactory factory =  
            new NioServerSocketChannelFactory (  
                    Executors.newCachedThreadPool(),  
                    Executors.newCachedThreadPool());  
 
        ServerBootstrap bootstrap = new ServerBootstrap (factory);  
 
        DiscardServerHandler handler = new DiscardServerHandler();  
        ChannelPipeline pipeline = bootstrap.getPipeline();  
        pipeline.addLast("handler", handler);  
 
        bootstrap.setOption("child.tcpNoDelay", true);  
        bootstrap.setOption("child.keepAlive", true);  
 
        bootstrap.bind(new InetSocketAddress(8080));  
    }  
     代碼說明

1)ChannelFactory 是一個建立和管理Channel通道及其相關資源的工廠接口,它處理全部的I/O請求併產生相應的I/O ChannelEvent通道事件。Netty 提供了多種 ChannelFactory 實現。這裏咱們須要實現一個服務端的例子,所以咱們使用NioServerSocketChannelFactory實現。另外一件須要注意的事情是這個工廠並本身不負責建立I/O線程。你應當在其構造器中指定該工廠使用的線程池,這樣作的好處是你得到了更高的控制力來管理你的應用環境中使用的線程,例如一個包含了安全管理的應用服務。
2)ServerBootstrap 是一個設置服務的幫助類。你甚至能夠在這個服務中直接設置一個Channel通道。然而請注意,這是一個繁瑣的過程,大多數狀況下並不須要這樣作。
3)這裏,咱們將DiscardServerHandler處理器添加至默認的ChannelPipeline通道。任什麼時候候當服務器接收到一個新的鏈接,一個新的ChannelPipeline管道對象將被建立,而且全部在這裏添加的ChannelHandler對象將被添加至這個新的 ChannelPipeline管道對象。這很像是一種淺拷貝操做(a shallow-copy operation);全部的Channel通道以及其對應的ChannelPipeline實例將分享相同的DiscardServerHandler 實例。
4)你也能夠設置咱們在這裏指定的這個通道實現的配置參數。咱們正在寫的是一個TCP/IP服務,所以咱們運行設定一些socket選項,例如 tcpNoDelay和keepAlive。請注意咱們在配置選項裏添加的"child."前綴。這意味着這個配置項僅適用於咱們接收到的通道實例,而不是ServerSocketChannel實例。所以,你能夠這樣給一個ServerSocketChannel設定參數:
bootstrap.setOption("reuseAddress", true);
5)咱們繼續。剩下要作的是綁定這個服務使用的端口而且啓動這個服務。這裏,咱們綁定本機全部網卡(NICs,network interface cards)上的8080端口。固然,你如今也能夠對應不一樣的綁定地址屢次調用綁定操做。
大功告成!如今你已經完成你的第一個基於Netty的服務端程序。
1.3. 查看接收到的數據

如今你已經完成了你的第一個服務端程序,咱們須要測試它是否能夠真正的工做。最簡單的方法是使用telnet 命令。例如,你能夠在命令行中輸入「telnet localhost 8080 」或其餘類型參數。
然而,咱們能夠認爲服務器在正常工做嗎?因爲這是一個丟球協議服務,因此實際上咱們沒法真正的知道。你最終將收不到任何迴應。爲了證實它在真正的工做,讓咱們修改代碼打印其接收到的數據。
咱們已經知道當完成數據的接收後將產生MessageEvent消息事件,而且也會觸發messageReceived處理方法。因此讓我在DiscardServerHandler處理器的messageReceived方法內增長一些代碼。
@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
    ChannelBuffer  buf = (ChannelBuffer) e.getMessage();  
    while(buf.readable()) {  
        System.out.println((char) buf.readByte());  
    }  
     代碼說明
1) 基本上咱們能夠假定在socket的傳輸中消息類型老是ChannelBuffer。ChannelBuffer是Netty的一個基本數據結構,這個數據結構存儲了一個字節序列。ChannelBuffer相似於NIO的ByteBuffer,可是前者卻更加的靈活和易於使用。例如,Netty容許你建立一個由多個ChannelBuffer構建的複合ChannelBuffer類型,這樣就能夠減小沒必要要的內存拷貝次數。
2) 雖然ChannelBuffer有些相似於NIO的ByteBuffer,但強烈建議你參考Netty的API手冊。學會如何正確的使用ChannelBuffer是無障礙使用Netty的關鍵一步。
 
若是你再次運行telnet命令,你將會看到你所接收到的數據。
拋棄協議服務的全部源代碼均存放在在分發版的org.jboss.netty.example.discard包下。

1.4. 響應協議服務

目前,咱們雖然使用了數據,但最終卻未做任何迴應。然而通常狀況下,一個服務都須要迴應一個請求。讓咱們實現ECHO協議 來學習如何完成一個客戶請求的迴應消息,ECHO協議規定要返回任何接收到的數據。
與咱們上一節實現的拋棄協議服務惟一不一樣的地方是,這裏須要返回全部的接收數據而不是僅僅打印在控制檯之上。所以咱們再次修改messageReceived方法就足夠了。
 
@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
    Channel  ch = e.getChannel();  
    ch.write(e.getMessage());  
代碼說明
1) 一個ChannelEvent通道事件對象自身存有一個和其關聯的Channel對象引用。這個返回的Channel通道對象表明了這個接收 MessageEvent消息事件的鏈接(connection)。所以,咱們能夠經過調用這個Channel通道對象的write方法向遠程節點寫入返回數據。
如今若是你再次運行telnet命令,你將會看到服務器返回的你所發送的任何數據。
相應服務的全部源代碼存放在分發版的org.jboss.netty.example.echo包下。
1.5. 時間協議服務

這一節須要實現的協議是TIME協議 。這是一個與先前所介紹的不一樣的例子。這個例子裏,服務端返回一個32位的整數消息,咱們不接受請求中包含的任何數據而且當消息返回完畢後當即關閉鏈接。經過這個例子你將學會如何構建和發送消息,以及當完成處理後如何主動關閉鏈接。
由於咱們會忽略接收到的任何數據而只是返回消息,這應當在創建鏈接後就當即開始。所以此次咱們再也不使用messageReceived方法,取而代之的是使用channelConnected方法。下面是具體的實現:
 
 
package org.jboss.netty.example.time;  
 
@ChannelPipelineCoverage("all")  
public class TimeServerHandler extends SimpleChannelHandler {  
 
    @Override 
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {  
        Channel ch = e.getChannel();  
         
        ChannelBuffer time = ChannelBuffers.buffer(4);  
        time.writeInt(System.currentTimeMillis() / 1000);  
         
        ChannelFuture f = ch.write(time);  
         
        f.addListener(new ChannelFutureListener() {  
            public void operationComplete(ChannelFuture future) {  
                Channel ch = future.getChannel();  
                ch.close();  
            }  
        });  
    }  
 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
        e.getCause().printStackTrace();  
        e.getChannel().close();  
    }  
代碼說明
1) 正如咱們解釋過的,channelConnected方法將在一個鏈接創建後當即觸發。所以讓咱們在這個方法裏完成一個表明當前時間(秒)的32位整數消息的構建工做。
2) 爲了發送一個消息,咱們須要分配一個包含了這個消息的buffer緩衝。由於咱們將要寫入一個32位的整數,所以咱們須要一個4字節的 ChannelBuffer。ChannelBuffers是一個能夠建立buffer緩衝的幫助類。除了這個buffer方法,ChannelBuffers還提供了不少和ChannelBuffer相關的實用方法。更多信息請參考API手冊。
另外,一個很不錯的方法是使用靜態的導入方式:
import static org.jboss.netty.buffer.ChannelBuffers.*;
...
ChannelBuffer dynamicBuf = dynamicBuffer(256);
ChannelBuffer ordinaryBuf = buffer(1024);
3) 像一般同樣,咱們須要本身構造消息。
可是打住,flip在哪?過去咱們在使用NIO發送消息時不是經常須要調用 ByteBuffer.flip()方法嗎?實際上ChannelBuffer之因此不須要這個方法是由於 ChannelBuffer有兩個指針;一個對應讀操做,一個對應寫操做。當你向一個 ChannelBuffer寫入數據的時候寫指針的索引值便會增長,但與此同時讀指針的索引值不會有任何變化。讀寫指針的索引值分別表明了這個消息的開始、結束位置。
與之相應的是,NIO的buffer緩衝沒有爲咱們提供如此簡潔的一種方法,除非你調用它的flip方法。所以,當你忘記調用flip方法而引發發送錯誤時,你便會陷入困境。這樣的錯誤不會再Netty中發生,由於咱們對應不一樣的操做類型有不一樣的指針。你會發現就像你已習慣的這樣過程變得更加容易— 一種沒有flippling的體驗!
另外一點須要注意的是這個寫方法返回了一個ChannelFuture對象。一個ChannelFuture 對象表明了一個還沒有發生的I/O操做。這意味着,任何已請求的操做均可能是沒有被當即執行的,由於在Netty內部全部的操做都是異步的。例如,下面的代碼可能會關閉一 個鏈接,這個操做甚至會發生在消息發送以前:
Channel ch = ...;
ch.write(message);
ch.close();
所以,你須要這個write方法返回的ChannelFuture對象,close方法須要等待寫操做異步完成以後的ChannelFuture通知/監聽觸發。須要注意的是,關閉方法仍舊不是當即關閉一個鏈接,它一樣也是返回了一個ChannelFuture對象。
4) 在寫操做完成以後咱們又如何獲得通知?這個只須要簡單的爲這個返回的ChannelFuture對象增長一個ChannelFutureListener 便可。在這裏咱們建立了一個匿名ChannelFutureListener對象,在這個ChannelFutureListener對象內部咱們處理了異步操做完成以後的關閉操做。
另外,你也能夠經過使用一個預約義的監聽類來簡化代碼。
f.addListener(ChannelFutureListener.CLOSE);

1.6. 時間協議服務客戶端

不一樣於DISCARD和ECHO協議服務,咱們須要一個時間協議服務的客戶端,由於人們沒法直接將一個32位的二進制數據轉換一個日曆時間。在這一節咱們將學習如何確保服務器端工做正常,以及如何使用Netty完成客戶端的開發。
使用Netty開發服務器端和客戶端代碼最大的不一樣是要求使用不一樣的Bootstrap及ChannelFactory。請參照如下的代碼:
 
package org.jboss.netty.example.time;  
 
import java.net.InetSocketAddress;  
import java.util.concurrent.Executors;  
 
public class TimeClient {  
 
    public static void main(String[] args) throws Exception {  
        String host = args[0];  
        int port = Integer.parseInt(args[1]);  
 
        ChannelFactory factory =  
            new NioClientSocketChannelFactory (  
                    Executors.newCachedThreadPool(),  
                    Executors.newCachedThreadPool());  
 
        ClientBootstrap bootstrap = new ClientBootstrap (factory);  
 
        TimeClientHandler handler = new TimeClientHandler();  
        bootstrap.getPipeline().addLast("handler", handler);  
         
        bootstrap.setOption("tcpNoDelay" , true);  
        bootstrap.setOption("keepAlive", true);  
 
        bootstrap.connect (new InetSocketAddress(host, port));  
    }  
 
代碼說明
1) 使用NioClientSocketChannelFactory而不是NioServerSocketChannelFactory來建立客戶端的Channel通道對象。
2) 客戶端的ClientBootstrap對應ServerBootstrap。
3) 請注意,這裏不存在使用「child.」前綴的配置項,客戶端的SocketChannel實例不存在父級Channel對象。
4) 咱們應當調用connect鏈接方法,而不是以前的bind綁定方法。
 
正如你所看到的,這與服務端的啓動過程是徹底不同的。ChannelHandler又該如何實現呢?它應當負責接收一個32位的整數,將其轉換爲可讀的格式後,打印輸出時間,並關閉這個鏈接。
 
 
 
 
 package org.jboss.netty.example.time;  
 
  
 
 import java.util.Date;  
 
  
 
 @ChannelPipelineCoverage("all")  
 
 public class TimeClientHandler extends SimpleChannelHandler {  
 
  
 
     @Override 
 
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
 
         ChannelBuffer buf = (ChannelBuffer) e.getMessage();  
 
         long currentTimeMillis = buf.readInt() * 1000L;  
 
         System.out.println(new Date(currentTimeMillis));  
 
         e.getChannel().close();  
 
     }  
 
  
 
     @Override 
 
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
 
         e.getCause().printStackTrace();  
 
         e.getChannel().close();  
 
     }  
 
 
 
這看起來非常簡單,與服務端的實現也並未有什麼不一樣。然而,這個處理器卻時常會由於拋出IndexOutOfBoundsException異常而拒絕工做。咱們將在下一節討論這個問題產生的緣由。
1.7. 流數據的傳輸處理
 
1.7.1. Socket Buffer的缺陷

對於例如TCP/IP這種基於流的傳輸協議實現,接收到的數據會被存儲在socket的接受緩衝區內。不幸的是,這種基於流的傳輸緩衝區並非一個包隊列,而是一個字節隊列。這意味着,即便你以兩個數據包的形式發送了兩條消息,操做系統卻不會把它們當作是兩條消息,而僅僅是一個批次的字節序列。所以,在這種狀況下咱們就沒法保證收到的數據剛好就是遠程節點所發送的數據。例如,讓咱們假設一個操做系統的TCP/IP堆棧收到了三個數據包:
 
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
 
因爲這種流傳輸協議的廣泛性質,在你的應用中有較高的可能會把這些數據讀取爲另一種形式:
 
+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+
 
所以對於數據的接收方,無論是服務端仍是客戶端,應當重構這些接收到的數據,讓其變成一種可以讓你的應用邏輯易於理解的更有意義的數據結構。在上面所述的這個例子中,接收到的數據應當重構爲下面的形式:
 
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
 
1.7.2. 第一種方案

如今讓咱們回到時間協議服務客戶端的例子中。咱們在這裏遇到了一樣的問題。一個32位的整數是一個很是小的數據量,所以它經常不會被切分在不一樣的數據段內。然而,問題是它確實能夠被切分在不一樣的數據段內,而且這種可能性隨着流量的增長而提升。
最簡單的方案是在程序內部建立一個可準確接收4字節數據的累積性緩衝。下面的代碼是修復了這個問題後的TimeClientHandler實現。
 
 
package org.jboss.netty.example.time;  
 
import static org.jboss.netty.buffer.ChannelBuffers.*;  
 
import java.util.Date;  
 
@ChannelPipelineCoverage("one")  
public class TimeClientHandler extends SimpleChannelHandler {  
 
    private final ChannelBuffer buf = dynamicBuffer();  
 
    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
        ChannelBuffer m = (ChannelBuffer) e.getMessage();  
        buf.writeBytes(m);  
         
        if (buf.readableBytes() >= 4) {  
            long currentTimeMillis = buf.readInt() * 1000L;  
            System.out.println(new Date(currentTimeMillis));  
            e.getChannel().close();  
        }  
    }  
 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
        e.getCause().printStackTrace();  
        e.getChannel().close();  
    }  
 
 
     代碼說明 
1) 這一次咱們使用「one」作爲ChannelPipelineCoverage的註解值。這是因爲這個修改後的TimeClientHandler不在不在內部保持一個buffer緩衝,所以這個TimeClientHandler實例不能夠再被多個Channel通道或ChannelPipeline共享。不然這個內部的buffer緩衝將沒法緩衝正確的數據內容。
2) 動態的buffer緩衝也是ChannelBuffer的一種實現,其擁有動態增長緩衝容量的能力。當你沒法預估消息的數據長度時,動態的buffer緩衝是一種頗有用的緩衝結構。
3) 首先,全部的數據將會被累積的緩衝至buf容器。
4) 以後,這個處理器將會檢查是否收到了足夠的數據而後再進行真實的業務邏輯處理,在這個例子中須要接收4字節數據。不然,Netty將重複調用messageReceived方法,直至4字節數據接收完成。
這裏還有另外一個地方須要進行修改。你是否還記得咱們把TimeClientHandler實例添加到了這個ClientBootstrap實例的默認ChannelPipeline管道里?這意味着同一個TimeClientHandler實例將被多個Channel通道共享,所以接受的數據也將受到破壞。爲了給每個Channel通道建立一個新的TimeClientHandler實例,咱們須要實現一個 ChannelPipelineFactory管道工廠:
 
package org.jboss.netty.example.time;  
 
public class TimeClientPipelineFactory implements ChannelPipelineFactory {  
 
    public ChannelPipeline getPipeline() {  
        ChannelPipeline pipeline = Channels.pipeline();  
        pipeline.addLast("handler", new TimeClientHandler());  
        return pipeline;  
    }  
}
如今,咱們須要把TimeClient下面的代碼片斷:
 
TimeClientHandler handler = new TimeClientHandler();  
bootstrap.getPipeline().addLast("handler", handler); 
 
替換爲:
 
bootstrap.setPipelineFactory(new TimeClientPipelineFactory()); 
 
雖然這看上去有些複雜,而且因爲在TimeClient內部咱們只建立了一個鏈接(connection),所以咱們在這裏確實不必引入TimeClientPipelineFactory實例。
然而,當你的應用變得愈來愈複雜,你就總會須要實現本身的ChannelPipelineFactory,這個管道工廠將會令你的管道配置變得更加具備靈活性。
1.7.3. 第二種方案
 
雖然第二種方案解決了時間協議客戶端遇到的問題,可是這個修改後的處理器實現看上去卻再也不那麼簡潔。設想一種更爲複雜的,由多個可變長度字段組成的協議。你的ChannelHandler實現將變得愈來愈難以維護。
正如你已注意到的,你能夠爲一個ChannelPipeline添加多個ChannelHandler,所以,爲了減少應用的複雜性,你能夠把這個臃腫的ChannelHandler切分爲多個獨立的模塊單元。例如,你能夠把TimeClientHandler切分爲兩個獨立的處理器:
 TimeDecoder,解決數據分段的問題。
 TimeClientHandler,原始版本的實現。
幸運的是,Netty提供了一個可擴展的類,這個類能夠直接拿過來使用幫你完成TimeDecoder的開發:
 
package org.jboss.netty.example.time;  
 
 
public class TimeDecoder extends FrameDecoder {  
 
    @Override 
    protected Object decode(  
            ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)  {  
             
        if (buffer.readableBytes() < 4) {  
            return null;   
        }  
         
        return buffer.readBytes(4);  
    }  
 
代碼說明
1) 這裏再也不須要使用ChannelPipelineCoverage的註解,由於FrameDecoder老是被註解爲「one」。
2) 當接收到新的數據後,FrameDecoder會調用decode方法,同時傳入一個FrameDecoder內部持有的累積型buffer緩衝。
3) 若是decode返回null值,這意味着尚未接收到足夠的數據。當有足夠數量的數據後FrameDecoder會再次調用decode方法。
4) 若是decode方法返回一個非空值,這意味着decode方法已經成功完成一條信息的解碼。FrameDecoder將丟棄這個內部的累計型緩衝。請注意你不須要對多條消息進行解碼,FrameDecoder將保持對decode方法的調用,直到decode方法返回非空對象。
若是你是一個敢於嘗試的人,你或許應當使用ReplayingDecoder,ReplayingDecoder更加簡化了解碼的過程。爲此你須要查看API手冊得到更多的幫助信息。
 
package org.jboss.netty.example.time;  
 
public class TimeDecoder extends ReplayingDecoder<VoidEnum> {  
 
    @Override 
    protected Object decode(  
            ChannelHandlerContext ctx, Channel channel,  
            ChannelBuffer buffer, VoidEnum state) {  
             
        return buffer.readBytes(4);  
    }  
此外,Netty還爲你提供了一些能夠直接使用的decoder實現,這些decoder實現不只可讓你很是容易的實現大多數協議,而且還會幫你避免某些臃腫、難以維護的處理器實現。請參考下面的代碼包得到更加詳細的實例:
org.jboss.netty.example.factorial for a binary protocol, and
 org.jboss.netty.example.telnet for a text line-based protocol
1.8. 使用POJO代替ChannelBuffer

目前爲止全部的實例程序都是使用ChannelBuffer作爲協議消息的原始數據結構。在這一節,咱們將改進時間協議服務的客戶/服務端實現,使用POJO 而不是ChannelBuffer作爲協議消息的原始數據結構。
在你的ChannelHandler實現中使用POJO的優點是很明顯的;從你的ChannelHandler實現中分離從 ChannelBuffer獲取數據的代碼,將有助於提升你的ChannelHandler實現的可維護性和可重用性。在時間協議服務的客戶/服務端代碼中,直接使用ChannelBuffer讀取一個32位的整數並非一個主要的問題。然而,你會發現,當你試圖實現一個真實的協議的時候,這種代碼上的分離是頗有必要的。
首先,讓咱們定義一個稱之爲UnixTime的新類型。
 
package org.jboss.netty.example.time;  
 
import java.util.Date;  
 
public class UnixTime {  
    private final int value;  
     
    public UnixTime(int value) {  
        this.value = value;  
    }  
     
    public int getValue() {  
        return value;  
    }  
     
    @Override 
    public String toString() {  
        return new Date(value * 1000L).toString();  
    }  
如今讓咱們從新修改TimeDecoder實現,讓其返回一個UnixTime,而不是一個ChannelBuffer。
 
@Override 
protected Object decode(  
        ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {  
    if (buffer.readableBytes() < 4) {  
        return null;  
    }  
 
    return new UnixTime(buffer.readInt());  
}
FrameDecoder和ReplayingDecoder容許你返回一個任何類型的對象。若是它們僅容許返回一個ChannelBuffer類型的對象,咱們將不得不插入另外一個能夠從ChannelBuffer對象轉換 爲UnixTime對象的ChannelHandler實現。

有了這個修改後的decoder實現,這個TimeClientHandler便不會再依賴ChannelBuffer。
 
@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
    UnixTime m = (UnixTime) e.getMessage();  
    System.out.println(m);  
    e.getChannel().close();  
}
更加簡單優雅了,不是嗎?一樣的技巧也能夠應用在服務端,讓咱們如今更新TimeServerHandler的實現:
 
@Override 
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {  
    UnixTime time = new UnixTime(System.currentTimeMillis() / 1000);  
    ChannelFuture f = e.getChannel().write(time);  
    f.addListener(ChannelFutureListener.CLOSE);  
如今剩下的惟一須要修改的部分是這個ChannelHandler實現,這個ChannelHandler實現須要把一個UnixTime對象從新轉換爲一個ChannelBuffer。但這卻已經是至關簡單了,由於當你對消息進行編碼的時候你再也不須要處理數據包的拆分及組裝。
 
package org.jboss.netty.example.time;  
     
import static org.jboss.netty.buffer.ChannelBuffers.*;  
 
@ChannelPipelineCoverage("all")  
public class TimeEncoder extends SimpleChannelHandler {  
 
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent  e) {  
        UnixTime time = (UnixTime) e.getMessage();  
         
        ChannelBuffer buf = buffer(4);  
        buf.writeInt(time.getValue());  
         
        Channels.write(ctx, e.getFuture(), buf);  
    }  
     代碼說明
1) 由於這個encoder是無狀態的,因此其使用的ChannelPipelineCoverage註解值是「all」。實際上,大多數encoder實現都是無狀態的。
2) 一個encoder經過重寫writeRequested方法來實現對寫操做請求的攔截。不過請注意雖然這個writeRequested方法使用了和 messageReceived方法同樣的MessageEvent參數,可是它們卻分別對應了不一樣的解釋。一個ChannelEvent事件能夠既是一個上升流事件(upstream event)也能夠是一個降低流事件(downstream event),這取決於事件流的方向。例如:一個MessageEvent消息事件能夠做爲一個上升流事件(upstream event)被messageReceived方法調用,也能夠做爲一個降低流事件(downstream event)被writeRequested方法調用。請參考API手冊得到上升流事件(upstream event)和降低流事件(downstream event)的更多信息。
3) 一旦完成了POJO和ChannelBuffer轉換,你應當確保把這個新的buffer緩衝轉發至先前的 ChannelDownstreamHandler處理,這個降低通道的處理器由某個ChannelPipeline管理。Channels提供了多個能夠建立和發送ChannelEvent事件的幫助方法。在這個例子中,Channels.write(...)方法建立了一個新的 MessageEvent事件,並把這個事件發送給了先前的處於某個ChannelPipeline內的 ChannelDownstreamHandler處理器。
另外,一個很不錯的方法是使用靜態的方式導入Channels類:
import static org.jboss.netty.channel.Channels.*;
...
ChannelPipeline pipeline = pipeline();
write(ctx, e.getFuture(), buf);
fireChannelDisconnected(ctx);
 
 
最後的任務是把這個TimeEncoder插入服務端的ChannelPipeline,這是一個很簡單的步驟。
1.9. 關閉你的應用

若是你運行了TimeClient,你確定能夠注意到,這個應用並無自動退出而只是在那裏保持着無心義的運行。跟蹤堆棧記錄你能夠發現,這裏有一些運行狀態的I/O線程。爲了關閉這些I/O線程並讓應用優雅的退出,你須要釋放這些由ChannelFactory分配的資源。
一個典型的網絡應用的關閉過程由如下三步組成:
關閉負責接收全部請求的server socket。
關閉全部客戶端socket或服務端爲響應某個請求而建立的socket。
釋放ChannelFactory使用的全部資源。
爲了讓TimeClient執行這三步,你須要在TimeClient.main()方法內關閉惟一的客戶鏈接以及ChannelFactory使用的全部資源,這樣作即可以優雅的關閉這個應用。
 
package org.jboss.netty.example.time;  
 
public class TimeClient {  
    public static void main(String[] args) throws Exception {  
        ...  
        ChannelFactory factory = ...;  
        ClientBootstrap bootstrap = ...;  
        ...  
        ChannelFuture future  = bootstrap.connect(...);  
        future.awaitUninterruptible();  
        if (!future.isSuccess()) {  
            future.getCause().printStackTrace();  
        }  
        future.getChannel().getCloseFuture().awaitUninterruptibly();  
        factory.releaseExternalResources();  
    }  
 
代碼說明
1) ClientBootstrap對象的connect方法返回一個ChannelFuture對象,這個ChannelFuture對象將告知這個鏈接操做的成功或失敗狀態。同時這個ChannelFuture對象也保存了一個表明這個鏈接操做的Channel對象引用。
2) 阻塞式的等待,直到ChannelFuture對象返回這個鏈接操做的成功或失敗狀態。
3) 若是鏈接失敗,咱們將打印鏈接失敗的緣由。若是鏈接操做沒有成功或者被取消,ChannelFuture對象的getCause()方法將返回鏈接失敗的緣由。
4) 如今,鏈接操做結束,咱們須要等待而且一直到這個Channel通道返回的closeFuture關閉這個鏈接。每個Channel均可得到本身的closeFuture對象,所以咱們能夠收到通知並在這個關閉時間點執行某種操做。
而且即便這個鏈接操做失敗,這個closeFuture仍舊會收到通知,由於這個表明鏈接的 Channel對象將會在鏈接操做失敗後自動關閉。
5) 在這個時間點,全部的鏈接已被關閉。剩下的惟一工做是釋放ChannelFactory通道工廠使用的資源。這一步僅須要調用 releaseExternalResources()方法便可。包括NIO Secector和線程池在內的全部資源將被自動的關閉和終止。
 
關閉一個客戶端應用是很簡單的,但又該如何關閉一個服務端應用呢?你須要釋放其綁定的端口並關閉全部接受和打開的鏈接。爲了作到這一點,你須要使用一種數據結構記錄全部的活動鏈接,但這卻並非一件容易的事。幸運的是,這裏有一種解決方案,ChannelGroup。
ChannelGroup是Java 集合 API的一個特有擴展,ChannelGroup內部持有全部打開狀態的Channel通道。若是一個Channel通道對象被加入到 ChannelGroup,若是這個Channel通道被關閉,ChannelGroup將自動移除這個關閉的Channel通道對象。此外,你還能夠對一個ChannelGroup對象內部的全部Channel通道對象執行相同的操做。例如,當你關閉服務端應用時你能夠關閉一個ChannelGroup 內部的全部Channel通道對象。
爲了記錄全部打開的socket,你須要修改你的TimeServerHandler實現,將一個打開的Channel通道加入全局的ChannelGroup對象,TimeServer.allChannels:
 
@Override 
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {  
    TimeServer.allChannels.add(e.getChannel());  
 
代碼說明
是的,ChannelGroup是線程安全的。
 
如今,全部活動的Channel通道將被自動的維護,關閉一個服務端應用有如關閉一個客戶端應用同樣簡單。
 
package org.jboss.netty.example.time;  
 
public class TimeServer {  
 
    static final ChannelGroup allChannels = new DefaultChannelGroup("time-server" );  
 
    public static void main(String[] args) throws Exception {  
        ...  
        ChannelFactory factory = ...;  
        ServerBootstrap bootstrap = ...;  
        ...  
        Channel channel  = bootstrap.bind(...);  
        allChannels.add(channel);  
        waitForShutdownCommand();  
        ChannelGroupFuture future = allChannels.close();  
        future.awaitUninterruptibly();  
        factory.releaseExternalResources();  
    }  
 
代碼說明
1) DefaultChannelGroup須要一個組名做爲其構造器參數。這個組名僅是區分每一個ChannelGroup的一個標示。
2) ServerBootstrap對象的bind方法返回了一個綁定了本地地址的服務端Channel通道對象。調用這個Channel通道的close()方法將釋放這個Channel通道綁定的本地地址。
3) 無論這個Channel對象屬於服務端,客戶端,仍是爲響應某一個請求建立,任何一種類型的Channel對象都會被加入ChannelGroup。所以,你儘可在關閉服務時關閉全部的Channel對象。
4) waitForShutdownCommand()是一個想象中等待關閉信號的方法。你能夠在這裏等待某個客戶端的關閉信號或者JVM的關閉回調命令。
5) 你能夠對ChannelGroup管理的全部Channel對象執行相同的操做。在這個例子裏,咱們將關閉全部的通道,這意味着綁定在服務端特定地址的 Channel通道將解除綁定,全部已創建的鏈接也將異步關閉。爲了得到成功關閉全部鏈接的通知,close()方法將返回一個 ChannelGroupFuture對象,這是一個相似ChannelFuture的對象。
 
1.10. 總述

在這一章節,咱們快速瀏覽並示範瞭如何使用Netty開發網絡應用。下一章節將涉及更多的問題。同時請記住,爲了幫助你以及可以讓Netty基於你的回饋獲得持續的改進和提升,Netty社區 將永遠歡迎你的問題及建議。
第二章. 架構總覽
 
在這個章節,咱們將闡述Netty提供的核心功能以及在此基礎之上如何構建一個完備的網絡應用。
2.1. 豐富的緩衝實現

Netty使用自建的buffer API,而不是使用NIO的ByteBuffer來表明一個連續的字節序列。與ByteBuffer相比這種方式擁有明顯的優點。Netty使用新的 buffer類型ChannelBuffer,ChannelBuffer被設計爲一個可從底層解決ByteBuffer問題,並可知足平常網絡應用開發須要的緩衝類型。這些很酷的特性包括:
 
若是須要,容許使用自定義的緩衝類型。
複合緩衝類型中內置的透明的零拷貝實現。
開箱即用的動態緩衝類型,具備像StringBuffer同樣的動態緩衝能力。
再也不須要調用的flip()方法。
正常狀況下具備比ByteBuffer更快的響應速度。
更多信息請參考:org.jboss.netty.buffer package description
2.2. 統一的異步 I/O API

傳統的Java I/O API在應對不一樣的傳輸協議時須要使用不一樣的類型和方法。例如:java.net.Socket 和 java.net.DatagramSocket它們並不具備相同的超類型,所以,這就須要使用不一樣的調用方式執行socket操做。
這種模式上的不匹配使得在更換一個網絡應用的傳輸協議時變得繁雜和困難。因爲(Java I/O API)缺少協議間的移植性,當你試圖在不修改網絡傳輸層的前提下增長多種協議的支持,這時便會產生問題。而且理論上講,多種應用層協議可運行在多種傳輸層協議之上例如TCP/IP,UDP/IP,SCTP和串口通訊。
讓這種狀況變得更糟的是,Java新的I/O(NIO)API與原有的阻塞式的I/O(OIO)API並不兼容,NIO.2(AIO)也是如此。因爲全部的API不管是在其設計上仍是性能上的特性都與彼此不一樣,在進入開發階段,你經常會被迫的選擇一種你須要的API。
例如,在用戶數較小的時候你可能會選擇使用傳統的OIO(Old I/O) API,畢竟與NIO相比使用OIO將更加容易一些。然而,當你的業務呈指數增加而且服務器須要同時處理成千上萬的客戶鏈接時你便會遇到問題。這種狀況下你可能會嘗試使用NIO,可是複雜的NIO Selector編程接口又會耗費你大量時間並最終會阻礙你的快速開發。
Netty有一個叫作Channel的統一的異步I/O編程接口,這個編程接口抽象了全部點對點的通訊操做。也就是說,若是你的應用是基於 Netty的某一種傳輸實現,那麼一樣的,你的應用也能夠運行在Netty的另外一種傳輸實現上。Netty提供了幾種擁有相同編程接口的基本傳輸實現:
 
NIO-based TCP/IP transport (See org.jboss.netty.channel.socket.nio),
OIO-based TCP/IP transport (See org.jboss.netty.channel.socket.oio),
OIO-based UDP/IP transport, and
Local transport (See org.jboss.netty.channel.local).
切換不一樣的傳輸實現一般只需對代碼進行幾行的修改調整,例如選擇一個不一樣的ChannelFactory實現。
此外,你甚至能夠利用新的傳輸實現沒有寫入的優點,只需替換一些構造器的調用方法便可,例如串口通訊。並且因爲核心API具備高度的可擴展性,你還能夠完成本身的傳輸實現。
2.3. 基於攔截鏈模式的事件模型

一個定義良好並具備擴展能力的事件模型是事件驅動開發的必要條件。Netty具備定義良好的I/O事件模型。因爲嚴格的層次結構區分了不一樣的事件類型,所以Netty也容許你在不破壞現有代碼的狀況下實現本身的事件類型。這是與其餘框架相比另外一個不一樣的地方。不少NIO框架沒有或者僅有有限的事件模型概念;在你試圖添加一個新的事件類型的時候經常須要修改已有的代碼,或者根本就不容許你進行這種擴展。
在一個ChannelPipeline內部一個ChannelEvent被一組ChannelHandler處理。這個管道是攔截過濾器 模式的一種高級形式的實現,所以對於一個事件如何被處理以及管道內部處理器間的交互過程,你都將擁有絕對的控制力。例如,你能夠定義一個從socket讀取到數據後的操做:
 
public class MyReadHandler implements SimpleChannelHandler {  
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) {  
        Object message = evt.getMessage();  
        // Do something with the received message. 
        ...  
 
        // And forward the event to the next handler. 
        ctx.sendUpstream(evt);  
    }  
同時你也能夠定義一種操做響應其餘處理器的寫操做請求:
 
public class MyWriteHandler implements SimpleChannelHandler {  
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt) {  
        Object message = evt.getMessage();  
        // Do something with the message to be written. 
        ...  
 
        // And forward the event to the next handler. 
        ctx.sendDownstream(evt);  
    }  
有關事件模型的更多信息,請參考API文檔ChannelEvent和ChannelPipeline部分。
2.4. 適用快速開發的高級組件

上述所說起的核心組件已經足夠實現各類類型的網絡應用,除此以外,Netty也提供了一系列的高級組件來加速你的開發過程。
2.4.1. Codec框架

就像「1.8. 使用POJO代替ChannelBuffer」一節所展現的那樣,從業務邏輯代碼中分離協議處理部分老是一個很不錯的想法。然而若是一切從零開始便會遭遇到實現上的複雜性。你不得不處理分段的消息。一些協議是多層的(例如構建在其餘低層協議之上的協議)。一些協議過於複雜以至難以在一臺主機(single state machine)上實現。
所以,一個好的網絡應用框架應該提供一種可擴展,可重用,可單元測試而且是多層的codec框架,爲用戶提供易維護的codec代碼。
Netty提供了一組構建在其核心模塊之上的codec實現,這些簡單的或者高級的codec實現幫你解決了大部分在你進行協議處理開發過程會遇到的問題,不管這些協議是簡單的仍是複雜的,二進制的或是簡單文本的。
2.4.2. SSL / TLS 支持

不一樣於傳統阻塞式的I/O實現,在NIO模式下支持SSL功能是一個艱難的工做。你不能只是簡單的包裝一下流數據並進行加密或解密工做,你不得不借助於javax.net.ssl.SSLEngine,SSLEngine是一個有狀態的實現,其複雜性不亞於SSL自身。你必須管理全部可能的狀態,例如密碼套件,密鑰協商(或從新協商),證書交換以及認證等。此外,與一般指望狀況相反的是SSLEngine甚至不是一個絕對的線程安全實現。
在Netty內部,SslHandler封裝了全部艱難的細節以及使用SSLEngine可能帶來的陷阱。你所作的僅是配置並將該 SslHandler插入到你的ChannelPipeline中。一樣Netty也容許你實現像StartTlS 那樣所擁有的高級特性,這很容易。
2.4.3. HTTP實現

HTTP無疑是互聯網上最受歡迎的協議,而且已經有了一些例如Servlet容器這樣的HTTP實現。所以,爲何Netty還要在其核心模塊之上構建一套HTTP實現?
與現有的HTTP實現相比Netty的HTTP實現是至關不同凡響的。在HTTP消息的低層交互過程當中你將擁有絕對的控制力。這是由於Netty的 HTTP實現只是一些HTTP codec和HTTP消息類的簡單組合,這裏不存在任何限制——例如那種被迫選擇的線程模型。你能夠爲所欲爲的編寫那種能夠徹底按照你指望的工做方式工做的客戶端或服務器端代碼。這包括線程模型,鏈接生命期,快編碼,以及全部HTTP協議容許你作的,全部的一切,你都將擁有絕對的控制力。
因爲這種高度可定製化的特性,你能夠開發一個很是高效的HTTP服務器,例如:
要求持久化連接以及服務器端推送技術的聊天服務(e.g. Comet )
須要保持連接直至整個文件下載完成的媒體流服務(e.g. 2小時長的電影)
須要上傳大文件而且沒有內存壓力的文件服務(e.g. 上傳1GB文件的請求)
支持大規模mash-up應用以及數以萬計鏈接的第三方web services異步處理平臺
2.4.4. Google Protocol Buffer 整合

Google Protocol Buffers 是快速實現一個高效的二進制協議的理想方案。經過使用ProtobufEncoder和ProtobufDecoder,你能夠把Google Protocol Buffers 編譯器 (protoc)生成的消息類放入到Netty的codec實現中。請參考「LocalTime 」實例,這個例子也同時顯示出開發一個由簡單協議定義 的客戶及服務端是多麼的容易。
2.5. 總述
在這一章節,咱們從功能特性的角度回顧了Netty的總體架構。Netty有一個簡單卻不失強大的架構。這個架構由三部分組成——緩衝(buffer),通道(channel),事件模型(event model)——全部的高級特性都構建在這三個核心組件之上。一旦你理解了它們之間的工做原理,你便不難理解在本章簡要說起的更多高級特性。
你可能對Netty的總體架構以及每一部分的工做原理仍舊存有疑問。若是是這樣,最好的方式是告訴咱們 應該如何改進這份指南。
相關文章
相關標籤/搜索