Netty3文檔翻譯(一)

簡單找了下發現網上沒有關於Netty3比較完整的源碼解析的文章,因而我就去讀官方文檔,爲了增強記憶,翻譯成了中文,有適當的簡化。bootstrap

原文檔地址:Netty3文檔緩存

Chapter 1 開始

一、開始以前

運行demo的前提有兩個:最新版本的Netty3和JDK1.5以上安全

二、寫一個Discard Server

最簡單的協議就是Discard協議——忽略全部接收到的數據而且不做任何響應。咱們從Netty處理I/O事件的handler實現開始:服務器

public class DiscardServerHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {

        e.getCause().printStackTrace();

        Channel ch = e.getChannel();
        ch.close();
    }
}
  • DiscardServerHandler 繼承SimpleChannelHandler——ChannelHandler的一個實現;
  • messageReceived方法接收MessageEvent類型的參數,它包含接收的客戶端數據;
  • exceptionCaught方法在出現I/O錯誤或者處理事件時拋出錯誤時被調用,一般包含記錄錯誤信息和關閉通道的動做;

接下來寫一個main方法來開啓使用DiscardServerHandler的服務:網絡

public class DiscardServer {
    
    public static void main(String[] args) throws Exception {
        ChannelFactory factory =
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool());

        ServerBootstrap bootstrap = new ServerBootstrap(factory);

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(new DiscardServerHandler());
            }
        });
        
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        bootstrap.bind(new InetSocketAddress(8080));
    }
}
  • ChannelFactory是建立和管理Channel及其關聯資源的工廠,它負責處理全部I/O請求而且執行I/O生成ChannelEvent。可是它不是本身建立I/O線程,而是從調用構造方法時指定的線程池中獲取線程。服務端應用使用NioServerSocketChannelFactory;
  • ServerBootstrap是一個設置服務端的幫助類;
  • 當服務端接收到一個新的鏈接,指定的ChannelPipelineFactory就會建立一個新的ChannelPipeline,這個新的Pipeline包含一個DiscardServerHandler對象;
  • 你能夠給Channel實現設置具體的參數,選項帶"child."前綴表明應用在接收到的Channel上而不是服務端自己的ServerSocketChannel;
  • 剩下的就是綁定端口啓動服務,能夠綁定多個不一樣的端口。

三、處理接收到的數據

咱們能夠經過"telnet localhost 8080"命令去測試服務,但由於是Discard服務,咱們都不知道服務是否正常工做。因此咱們修改下服務,讓它打印出接收到的數據。數據結構

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    ChannelBuffer buf = (ChannelBuffer) e.getMessage();
    while(buf.readable()) {
        System.out.println((char) buf.readByte());
        System.out.flush();
    }
}
  • ChannelBuffer是Netty基本的存儲字節的數據結構,跟NIO的ByteBuffer相似,可是更容易使用更靈活。好比Netty容許你在儘可能少的內存複製次數的狀況下把多個ChannelBuffer組合成一個。

四、寫一個Echo服務

一個服務一般對請求是有響應的。接下來咱們嘗試寫一個實現Echo協議——將接收的數據原路返回給客戶端的服務:異步

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    Channel ch = e.getChannel();
    ch.write(e.getMessage());
}
  • MessageEvent繼承了ChannnelEvent,一個ChannnelEvent持有它相關的Channel的引用。咱們能夠獲取這個Channel而後調用寫方法寫入數據返回給客戶端。

五、寫一個時間服務

此次咱們實現一個時間協議——在不須要任何請求數據的狀況下返回一個32位整型數字而且在發送以後關閉鏈接。由於咱們忽略請求數據,只須要在鏈接創建的發送消息,因此此次不能使用messageReceived方法而是重寫channelConnected方法:socket

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    Channel ch = e.getChannel();

    ChannelBuffer time = ChannelBuffers.buffer(4);
    time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

    ChannelFuture f = ch.write(time);

    f.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            Channel ch = future.getChannel();
            ch.close();
        }
    });
}
  • channelConnected方法在鏈接創建的時間被調用,而後咱們寫入一個32位整型數字表明以秒爲單位的當前時間;
  • 咱們使用ChannelBuffers工具類分配了一個容量爲4字節的ChannelBuffer來存放這個32位整型數字;
  • 而後咱們把ChannelBuffer寫入Channel...等一下,flip方法哪裏去了?在NIO中咱們不是要在寫入通道前調用ByteBuffer的flip方法的嗎?ChannelBuffer沒有這個方法,由於它有兩個指針,一個用於讀操做一個用於寫操做。當數據寫入ChannelBuffer時寫索引增長而讀索引不變。讀索引和寫索引相互獨立。對比之下,Netty的ChannelBuffer比NIO的buffer更容易使用。
  • 另外須要注意的一點是ChannelBuffer的write方法返回的是一個ChannelFuture對象。它表示一個還未發生的I/O操做,由於Netty中全部操做都是異步的。因此咱們必須在ChannelFuture收到操做完成的通知以後才能關閉Channel。哦,對了,close方法也是返回ChannelFuture...
  • 那麼問題來了,咱們如何獲得操做完成的通知呢?只須要簡單得向返回的ChannelFuture對象中添加一個ChannelFutureListener,這裏咱們建立了一個ChannelFutureListener的匿名內部類,它在操做完成的時候會關閉Channel。

六、寫一個時間客戶端

咱們還須要一個遵照時間協議,即能把整型數字翻譯成日期的客戶端。Netty服務端和客戶端惟一的區別就是要求不一樣的Bootstrap和ChannelFactory:tcp

public static void main(String[] args) throws Exception {
    String host = args[0];
    int port = Integer.parseInt(args[1]);

    ChannelFactory factory =
            new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());

    ClientBootstrap bootstrap = new ClientBootstrap(factory);

    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            return Channels.pipeline(new TimeClientHandler());
        }
    });

    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("keepAlive", true);

    bootstrap.connect(new InetSocketAddress(host, port));
}
  • NioClientSocketChannelFactory,用來建立一個客戶端Channel;
  • ClientBootstrap是ServerBootStrap在客戶端的對應部分;
  • 須要注意的是設置參數時不須要"child."前綴,客戶端SocketChannel沒有父Channel;
  • 對應服務端的bind方法,這裏咱們須要調用connect方法。

另外咱們須要一個ChannelHandler實現,負責把接收到服務端返回的32位整型數字翻譯成日期並打印出來,而後斷開鏈接:ide

public class TimeClientHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        ChannelBuffer buf = (ChannelBuffer) e.getMessage();
        long currentTimeMillis = buf.readInt() * 1000L;
        System.out.println(new Date(currentTimeMillis));
        e.getChannel().close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        e.getChannel().close();
    }
}

看上去很簡單是吧?可是實際運行過程當中這個handler有時會拋出一個IndexOutOfBoundsException。下一節咱們會討論爲何會這樣。

七、處理基於流的傳輸

7.一、一個關於Socket Buffer的小警告

在像TCP/IP那樣基於流的傳輸中,接收數據保存在一個socket接收緩存中。可是這個緩存不是一個以包爲單位的隊列,而是一個以字節爲單位的隊列。這就意味着,即便發送兩個獨立的消息,操做系統會把他們視爲一個字節串。所以,不能保證你讀到的和另外一端寫入的同樣。因此,不論是客戶端仍是服務端,對於接收到的數據都須要整理成符合應用程序邏輯的結構。

7.二、第一種解決方式

回到前面的時間客戶端的問題,32位整型數字很小,可是它也是能夠拆分的,特別是當流量上升的時候,被拆分的可能性也隨之上升。
一個簡單的處理方式就是內部建立一個累計的緩存,直到接收滿4個字節才進行處理。

private final ChannelBuffer buf = dynamicBuffer();

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    ChannelBuffer m = (ChannelBuffer) e.getMessage();
    buf.writeBytes(m);
    if (buf.readableBytes() >= 4) {
        long currentTimeMillis = buf.readInt() * 1000L;
        System.out.println(new Date(currentTimeMillis));
        e.getChannel().close();
    }
}
  • ChannelBuffers.dynamicBuffer()返回一個自動擴容的ChannelBuffer;
  • 全部接收的數據都累積到這個動態緩存中;
  • handler須要檢查緩存是否滿4個字節,是的話才能繼續業務邏輯;不然,Netty會在數據繼續到達以後持續調用messageReceive。

7.三、第二種解決方案

第一種方案有不少問題,好比一個複雜的協議,由多個可變長度的域組成,這種狀況下第一種方案的handler就沒法支持了。
你會發現你能夠添加多個ChannelHandler到ChannelPipeline中,利用這個特性,你能夠把一個臃腫的ChannelHandler拆分到多個模塊化的ChannelHandler中,這樣能夠下降應用程序的複雜度。好比,你能夠把TimeClientHandler拆分紅兩個handler:

  • TimeDecoder,負責分段問題;
  • 最初那個簡版的TimeClientHandler.

Netty提供了可擴展的類幫助你實現TimeDecoder:

public class TimeDecoder extends FrameDecoder {

    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {

        if (buffer.readableBytes() < 4) {
            return null;
        }

        return buffer.readBytes(4);
    }
}
  • FrameDecoder是ChannelHandler的一種實現,專門用來處理分段問題;
  • FrameDecoder在每次接收到新的數據時調用decode方法,攜帶一個內部維持的累積緩存;
  • 若是返回null,說明目前數據接收的還不夠,當數據量足夠時FrameDecoder會再次調用方法;
  • 若是返回非null對象,表明解碼成功,FrameDecoder會丟棄累積緩存中剩餘的數據。你無需提供批量解碼,FrameDecoder會繼續調用decode方法直到返回null。

拆分以後,咱們須要修改TimeClient的ChannelPipelineFactory實現:

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        return Channels.pipeline(
                new TimeDecoder(),
                new TimeClientHandler());
    }
});

Netty還提供了進一步簡化解碼的ReplayingDecoder:

public class TimeDecoder extends ReplayingDecoder<VoidEnum> {
    @Override
    protected Object decode(
            ChannelHandlerContext ctx, Channel channel,
            ChannelBuffer buffer, VoidEnum state) {
        return buffer.readBytes(4);
    }
 }

此外,Netty提供了一批開箱即用的解碼器,讓你能夠簡單得實現大多數協議:

  • org.jboss.netty.example.factorial 用於二進制協議;
  • org.jboss.netty.example.telnet 用於基於行的文本協議.

八、用POJO替代ChannelBuffer

上面的demo咱們都是用ChannelBuffer做爲協議化消息的基本數據結構,這一節咱們用POJO替代ChannelBuffer。將從ChannelBuffer提取信息的代碼跟handler分離開,會使handler變得更加可維護的和可重用的。從上面的demo裏不容易看出這個優點,可是實際應用中分離頗有必要。
首先,咱們定義一個類型UnixTime:

public class UnixTime {

    private final int value;

    public UnixTime(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }

    @Override
    public String toString() {
        return new Date(value * 1000L).toString();
    }
}

如今咱們能夠修改TimeDecoder讓它返回一個UnixTime而不是ChannelBuffer:

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {

    if (buffer.readableBytes() < 4) {
        return null;
    }

    return new UnixTime(buffer.readInt());
}

編碼器改了,那麼相應的TimeClientHandler就不會繼續使用ChannelBuffer:

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    UnixTime m = (UnixTime) e.getMessage();
    System.out.println(m);
    e.getChannel().close();
}

一樣的技術也能夠應用到服務端的TimeServerHandler上:

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    UnixTime time = new UnixTime((int)(System.currentTimeMillis() / 1000));
    ChannelFuture f = e.getChannel().write(time);
    f.addListener(ChannelFutureListener.CLOSE);
}

能這樣運用的前提是有一個編碼器,能夠把UnixTime對象翻譯成ChannelBuffer:

public class TimeEncoder extends SimpleChannelHandler {

    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
        UnixTime time = (UnixTime) e.getMessage();
        ChannelBuffer buf = buffer(4);
        buf.writeInt(time.getValue());
        Channels.write(ctx, e.getFuture(), buf);
    }
}
  • 一個編碼器重寫writeRequested方法攔截一個寫請求。這裏須要注意的一點是,儘管這裏的writeRequested方法參數裏也有一個MessageEvent對象,客戶端TimeClientHandler的messageReceived的參數裏也有一個,可是它們的解讀是徹底不一樣的。一個ChannelEvent能夠是upstream也能夠是downstream事件,這取決於事件的流向。messageReceived方法裏的MessageEvent是一個upstream事件,而writeRequested方法裏的是downstream事件。
  • 當把POJO類轉化爲ChannelBuffer後,你須要把ChannelBuffer轉發到以前在ChannelPipeline內的ChannelDownstreamHandler,也就是TimeServerHandler。Channels提供了多個幫助方法建立和發送ChanenlEvent。

一樣,TimeEncoder也須要加入到服務端的ChannelPipeline中:

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        return Channels.pipeline(
                new TimeServerHandler(),
                new TimeEncoder());
    }
});

九、關閉你的應用程序

爲了關閉I/O線程讓應用程序優雅得退出,咱們須要釋放ChannelFactory分配的資源。
一個典型網絡應用程序的關閉過程分爲三步:

  1. 關閉全部服務端socket鏈接;
  2. 關閉全部非服務端socket鏈接(包括客戶端socket和服務端接收到的socket);
  3. 釋放ChannelFactory使用的全部資源。

應用到TimeClient上:

ChannelFuture future = bootstrap.connect(...);
future.awaitUninterruptibly();
if (!future.isSuccess()) {
    future.getCause().printStackTrace();
}
future.getChannel().getCloseFuture().awaitUninterruptibly();
factory.releaseExternalResources();
  • CilentBootStrap的connect方法返回一個ChannelFuture,當鏈接嘗試成功或者失敗時會通知到ChannelFuture。它還持有鏈接嘗試關聯的Channel的引用;
  • ChannelFuture.awaitUninterruptibly()等待ChannelFuture肯定鏈接是否嘗試成功;
  • 若是鏈接失敗,咱們打印出失敗的緣由。ChannelFuture.getCause()會在鏈接即沒有成功也沒有取消的狀況下返回失敗的緣由;
  • 鏈接嘗試的狀況處理以後,咱們還須要等待鏈接關閉。每一個Channel有它本身的closeFuture,用來通知你鏈接關閉而後你能夠針對關閉作一些動做。即便鏈接嘗試失敗了,closeFuture仍然會被通知,由於Channel會在鏈接失敗後自動關閉;
  • 全部鏈接關閉以後,剩下的就是釋放ChannelFactory使用的資源了。釋放過程很簡單,調用它的releaseExternalResources方法,全部相關的NIO Selector和線程池將會自動關閉。

關閉一個客戶端很簡單,那服務端呢?你須要從端口解綁並關閉全部接收到的鏈接。前提是你須要一個保持跟蹤活躍鏈接的數據結構,Netty提供了ChannelGroup。

ChannelGroup是Java集合API一個特殊的的擴展,它表明一組打開的Channel。若是一個Channel被添加到ChannelGroup,而後這個Channel被關閉了,它會從ChannelGroup中自動移除。你能夠對同一ChannelGroup中的Channel作批量操做,好比在關閉服務的時候關閉全部Channel。

要跟蹤打開的socket,你須要修改TimeServerHandler,把新打開的Channel添加到全局的ChannelGroup變量中。ChannelGroup是線程安全的。

@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
    TimeServer.allChannels.add(e.getChannel());
}

如今咱們自動維持了一個包含全部活躍Channel的列表,關閉服務端就像關閉客戶端同樣容易了。

public class TimeServer {
    static final ChannelGroup allChannels = new DefaultChannelGroup("time-server");

    public static void main(String[] args) throws Exception {
        ...
        ChannelFactory factory = ...;
        ...
        ServerBootstrap bootstrap = ...;
        ...
        Channel channel = bootstrap.bind(new InetSocketAddress(8080));
        allChannels.add(channel);
        waitForShutdownCommand();
        ChannelGroupFuture future = allChannels.close();
        future.awaitUninterruptibly();
        factory.releaseExternalResources();
    }
}
  • DefaultChannelGroup構造方法接收組名爲參數,組名是它的惟一標識;
  • ServerBootstrap的bind方法返回一個服務端的綁定指定本地地址的Channel,調用Channel的close方法將會使它與本地地址解綁;
  • 全部Channel類型均可以被添加到ChannelGroup中,不論是客戶端、服務端或是服務端接收的。由於你能夠在服務器關閉時同時關閉綁定的Channel和接收到的Channel;
  • waitForShutdownCommand()是一個等待關閉信號的虛構方法。
  • 咱們能夠對ChannelGroup中的Channel進行統一操做,這裏咱們調用close方法,至關於解綁服務端Channel而且異步關閉全部接收到的Channel。close方法返回一個功能和ChannelFuture相近的ChannelGroupFuture,在全部鏈接都成功關閉通知咱們。

十、總結

這一節咱們快速瀏覽了Netty,示範瞭如何用Netty寫一個能正常工做的網絡應用。下一節將介紹Netty的更多細節。

相關文章
相關標籤/搜索