netty 4.x用戶使用指南

引言html

問題
  如今咱們使用通用的應用程序或庫來相互通訊。例如,咱們常用HTTP客戶機從web服務器檢索信息,並經過web服務調用遠程過程調用。然而,通用協議或其實現有時不能很好地進行擴展。這就像咱們不使用通用HTTP服務器來交換巨大的文件、電子郵件消息和近乎實時的消息(如財務信息和多人遊戲數據)同樣。所須要的是一個高度優化的協議實現,專門用於一個特殊目的。例如,您可能但願實現一個針對基於ajax的聊天應用程序、媒體流或大文件傳輸進行優化的HTTP服務器。您甚至能夠設計和實現一個徹底根據您的須要量身定製的全新協議。另外一個不可避免的狀況是,您必須處理遺留的專有協議,以確保與舊系統的互操做性。在這種狀況下,重要的是在不犧牲結果應用程序的穩定性和性能的狀況下,咱們能以多快的速度實現該協議。
 
解決方案
  Netty項目旨在提供異步事件驅動的網絡應用程序框架和工具,以快速開發可維護的高性能·高可伸縮性協議服務器和客戶端。
   換句話說,Netty是一個NIO客戶端服務器框架,它支持協議服務器和客戶端等網絡應用程序的快速輕鬆開發。它極大地簡化和簡化了TCP和UDP套接字服務器開發等網絡編程。
  「快速和簡單」並不意味着最終的應用程序將遭受可維護性或性能問題的影響。Netty是根據從許多協議(如FTP、SMTP、HTTP以及各類基於二進制和文本的遺留協議)的實現中學到的經驗精心設計的。所以,Netty成功地找到了一種無需妥協就能夠輕鬆實現開發、性能、穩定性和靈活性的方法。
  一些用戶可能已經發現了其餘聲稱具備相同優點的網絡應用程序框架,您可能想知道是什麼使Netty與他們如此不一樣。答案是它所創建的哲學。Netty旨在從一開始就爲您提供最溫馨的API和實現體驗。它不是什麼有形的東西,但你會意識到這種哲學將使你的生活更容易,當你讀這本指南和玩Netty。
 
準備開始
  本章將介紹Netty的核心結構,並經過簡單的示例讓您快速入門。當你在這一章結束的時候,你將可以當即在Netty上編寫一個客戶端和一個服務器。
  若是您更喜歡學習自頂向下的方法,那麼您可能但願從第2章——體系結構概述開始,而後回到這裏。
 
在開始以前
  運行本章示例的最低要求只有兩個;Netty和JDK 1.6或更高版本的最新版本。最新版本的Netty可在項目下載頁面得到。要下載正確版本的JDK,請參考您首選的JDK供應商的網站。
 
編寫Discard服務
  世界上最簡單的協議不是「你好,世界!」,而是 DISCARD。它是一種協議,在沒有任何響應的狀況下丟棄任何接收到的數據。
  要實現拋棄協議,唯一須要作的就是忽略全部接收到的數據。讓咱們直接從處理程序實現開始,它處理Netty生成的I/O事件。
package io.netty.example.discard;

import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Handles a server-side channel. */ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)  @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // Discard the received data silently.丟棄接收到的數據 ((ByteBuf) msg).release(); // (3)  } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // Close the connection when an exception is raised.  cause.printStackTrace(); ctx.close(); } }
注:一、DiscardServerHandler擴展了ChannelInboundHandlerAdapter,它是ChannelInboundHandler的一個實現。ChannelInboundHandler提供能夠覆蓋的各類事件處理程序方法。如今,只須要擴展ChannelInboundHandlerAdapter,而不是本身實現處理程序接口就足夠了。
       二、咱們在這裏重寫channelRead()事件處理程序方法。當從客戶機接收到新數據時,將使用接收到的消息調用此方法。在本例中,接收到的消息類型是ByteBuf。
       三、要實現丟棄協議,處理程序必須忽略接收到的消息。ByteBuf是一個引用計數對象,必須經過release()方法顯式地釋放它。請記住,處理程序有責任釋放傳遞給處理程序的任何引用計數對象。一般,channelRead()處理程序方法是這樣實現的:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
  四、異常捕獲()事件處理程序方法在Netty因爲I/O錯誤或處理程序實現因爲在處理事件時拋出異常而引起異常時被一次性調用。在大多數狀況下,應該對捕獲的異常進行日誌記錄,並關閉其關聯的通道,儘管此方法的實現可能因您但願如何處理異常狀況而有所不一樣。例如,您可能但願在關閉鏈接以前發送帶有錯誤代碼的響應消息。
  到目前爲止還不錯。咱們已經實現了廢棄服務器的前一半。如今剩下的是編寫main()方法,該方法使用DiscardServerHandler啓動服務器。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.丟掉全部進來的消息
 */
public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) 該對象至關於Socket中使用一個線程專門用戶監聽一個socket端口,而後將監聽到的socket對象傳入另外一對象 EventLoopGroup workerGroup = new NioEventLoopGroup();// 該對象至關於Socket中對於每一個socket鏈接都都單獨開闢了一個線程進行數據解析出處理 try { ServerBootstrap b = new ServerBootstrap(); // (2)  b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4)  @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server.  f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new DiscardServer(port).run(); } }

 注:一、NioEventLoopGroup是一個處理I/O操做的多線程事件循環。Netty爲不一樣類型的傳輸提供了各類EventLoopGroup實現。在本例中,咱們正在實現一個服務器端應用程序,所以將使用兩個NioEventLoopGroup。第一個,一般被稱爲「老闆」,接受進入的鏈接。第二個一般稱爲「worker」,在boss接受鏈接並將接受的鏈接註冊給worker時,它將處理已接受鏈接的流量。使用多少線程以及如何將它們映射到建立的通道取決於EventLoopGroup實現,甚至能夠經過構造函數進行配置。java

        二、ServerBootstrap是一個設置服務器的助手類。您能夠直接使用Channel設置服務器。可是,請注意,這是一個冗長的過程,在大多數狀況下不須要這樣作。
        三、在這裏,咱們指定使用NioServerSocketChannel類,該類用於實例化一個新通道以接受傳入鏈接。
        四、這裏指定的處理程序將老是由新接受的通道進行計算。ChannelInitializer是一個用於幫助用戶配置新通道的特殊處理程序。您極可能但願經過添加一些處理程序(如DiscardServerHandler)來實現網絡應用程序來配置新通道的ChannelPipeline。隨着應用程序變得複雜,您可能會向管道中添加更多的處理程序,並最終將這個匿名類提取到頂級類中。
        五、您還能夠設置特定於通道實現的參數。咱們正在編寫一個TCP/IP服務器,因此咱們能夠設置套接字選項,如tcpNoDelay和keepAlive。請參考ChannelOption的apidocs以及特定的ChannelConfig實現,以得到關於支持的ChannelOptions的概述。
        六、你注意到option()和childOption()嗎?option()用於接受傳入鏈接的NioServerSocketChannel。childOption()是父ServerChannel接受的通道,在本例中是NioServerSocketChannel
        七、咱們準備好了。剩下的就是綁定到端口並啓動服務器。在這裏,咱們綁定到機器中全部NICs(網絡接口卡)的端口8080。如今,您能夠根據須要屢次調用bind()方法(使用不一樣的綁定地址)。
  恭喜你!你剛剛在Netty上完成了你的第一個服務器。
 
查看接收到的數據
  如今咱們已經編寫了第一個服務器,咱們須要測試它是否真正工做。測試它的最簡單方法是使用telnet命令。例如,您能夠在命令行中輸入telnet localhost 8080並輸入一些內容。可是,咱們能夠說服務器工做得很好嗎?咱們沒法真正知道這一點,由於它是一個丟棄服務器。你不會獲得任何迴應。爲了證實它確實有效,讓咱們修改服務器來打印它收到的內容。
咱們已經知道,每當接收到數據時都會調用channelRead()方法。讓咱們將一些代碼放入DiscardServerHandler的channelRead()方法中:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2)  } }
注: 一、這個低效的循環實際上能夠簡化爲:System.out.println(in.toString(io.net .util. charsetutil.us_ascii))
        二、或者,您能夠在這裏執行in.release()。
 
問題編寫ECHO服務
  到目前爲止,咱們一直在使用數據,而沒有作出任何響應。然而,服務器一般應該響應請求。讓咱們學習如何經過實現ECHO協議向客戶端寫入響應消息,其中全部接收到的數據都被髮回。
  與咱們在前幾節中實現的Discards服務器的惟一區別是,它將接收到的數據發送回服務器,而不是將接收到的數據打印到控制檯。所以,再次修改channelRead()方法就足夠了: 
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); // (1) ctx.flush(); // (2) }
注: 一、ChannelHandlerContext對象提供各類操做,使您可以觸發各類I/O事件和操做。在這裏,咱們調用write(Object)以逐字記錄接收到的消息。請注意,咱們沒有釋放接收到的消息,這與咱們在丟棄示例中所作的不一樣。這是由於當它被寫在網上時,Netty會爲你釋放它。
        二、ctx.write(Object) 不會將消息寫到線路上。它在內部進行緩衝,而後經過ctx.flush()將其衝到電線上。或者,爲了簡潔起見,能夠調用ctx.writeAndFlush(msg)。

若是您再次運行telnet命令,您將看到服務器返回您發送給它的任何內容。git

echo服務器的完整源代碼位於發行版的io.net .example.echo包中。github

編寫一個時間服務
  本節中要實現的協議是TIME協議。與前面的示例不一樣的是,它發送一個包含32位整數的消息,而不接收任何請求,而且在消息發送後關閉鏈接。在本例中,您將學習如何構造和發送消息,以及如何在完成時關閉鏈接。由於咱們將忽略任何接收到的數據,可是在鏈接創建後當即發送消息,因此此次咱們不能使用channelRead()方法。相反,咱們應該重寫channelActive()方法。下面是實現:
package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4)  } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
注: 一、如前所述,將在創建鏈接並準備生成通訊量時調用channelActive()方法。讓咱們寫一個32位整數,表示這個方法中的當前時間。
        二、要發送新消息,咱們須要分配一個包含消息的新緩衝區。咱們要寫一個32位整數,所以咱們須要一個ByteBuf,它的容量至少是4字節。經過ChannelHandlerContext.alloc()獲取當前的ByteBufAllocator,並分配一個新的緩衝區。

        三、像往常同樣,咱們編寫構造好的消息。web

             可是等等,拋硬幣在哪裏?在使用NIO發送消息以前,咱們不是曾經調用java.nio.ByteBuffer.flip()嗎?ByteBuf沒有這樣的方法,由於它有兩個指針;一個用於讀操做,另外一個用於寫操做。當您向ByteBuf寫入內容時,寫入器索引會增長,而讀取器索引不會改變。閱讀器索引和寫入器索引分別表示消息開始和結束的位置。ajax

              相反,NIO緩衝區沒有提供一種乾淨的方法來肯定消息內容在哪裏開始和結束,而不調用flip方法。當您忘記翻轉緩衝區時,您將遇到麻煩,由於不會發送任何或不正確的數據。在Netty中不會發生這樣的錯誤,由於對於不一樣的操做類型,咱們有不一樣的指針。當你習慣了它,你會發現它讓你的生活變得更容易——一個沒有翻轉的生活!
編程

              要注意的另外一點是ChannelHandlerContext.write()(和writeAndFlush())方法返回ChannelFuture。ChannelFuture表示還沒有發生的I/O操做。這意味着,因爲Netty中的全部操做都是異步的,所以可能尚未執行任何請求的操做。例如,如下代碼可能會在發送消息以前關閉鏈接:bootstrap

Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
  所以,您須要在ChannelFuture完成以後調用close()方法,這個方法由write()方法返回,當寫操做完成時,它會通知它的偵聽器。請注意,close()也可能不會當即關閉鏈接,它將返回ChannelFuture。
        四、那麼,當寫請求完成時,咱們如何獲得通知?這就像在返回的通道將來中添加一個ChannelFutureListener同樣簡單。在這裏,咱們建立了一個新的匿名通道futurelistener,它在操做完成時關閉通道。
              或者,您可使用預約義的偵聽器簡化代碼:
f.addListener(ChannelFutureListener.CLOSE);

  要測試咱們的時間服務器是否按預期工做,您可使用UNIX rdate命令:api

$ rdate -o <port> -p <host>
  其中<port>是main()方法中指定的端口號,<host>一般是本地主機。
 
編寫一個時間客戶端
  與Discard服務器和ECHO服務器不一樣,咱們須要一個時間協議客戶機,由於人不能將32位二進制數據轉換爲日曆上的日期。在本節中,咱們將討論如何確保服務器正確工做,以及如何使用Netty編寫客戶機。
  在Netty中,服務器和客戶機之間最大也是惟一的區別是使用了不一樣的引導和通道實現。請查看如下代碼:
package io.netty.example.time;

public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed.  f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
注:一、Bootstrap相似於ServerBootstrap,只是它用於非服務器通道,好比客戶端通道或無鏈接通道。
    二、若是隻指定一個EventLoopGroup,它將做爲boss組和工做者組使用。可是,老闆員工並不用於客戶端
    三、與NioServerSocketChannel不一樣,NioSocketChannel用於建立客戶端通道。
    四、注意,這裏咱們不像使用ServerBootstrap那樣使用childOption(),由於客戶端SocketChannel沒有父節點。
    五、咱們應該調用connect()方法,而不是bind()方法。
如您所見,它與服務器端代碼並無什麼不一樣。那麼ChannelHandler實現呢?它應該從服務器接收一個32位的整數,將其轉換爲人類可讀的格式,打印翻譯後的時間,並關閉鏈接:
package io.netty.example.time;

import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
注:一、在TCP/IP中,Netty將從對等點發送的數據讀入ByteBuf。
  它看起來很是簡單,與服務器端示例沒有任何不一樣。然而,這個處理程序有時會拒絕啓動IndexOutOfBoundsException。咱們將在下一節討論爲何會發生這種狀況。
 
處理基於流的傳輸
  套接字緩衝區的一個小警告
  在基於流的傳輸(如TCP/IP)中,接收到的數據存儲在套接字接收緩衝區中。不幸的是,基於流的傳輸的緩衝區不是包的隊列,而是字節的隊列。這意味着,即便您將兩個消息做爲兩個獨立的信息包發送,操做系統也不會將它們視爲兩個消息,而只是一堆字節。所以,不能保證您所閱讀的內容就是您的遠程對等者所寫的內容。例如,假設一個操做系統的
TCP/IP棧接收了三個包:
   因爲基於流的協議的通常特性,在您的應用程序中頗有可能以如下片斷形式閱讀它們:
  所以,不管接收部分是服務器端仍是客戶端,都應該將接收到的數據碎片整理成應用程序邏輯能夠容易理解的一個或多個有意義的幀,應用程序邏輯能夠很容易地理解這些幀。對於上面的例子,接收到的數據應該像下面這樣構造:
第一個解決方案:
  如今讓咱們回到 TIME客戶端示例。咱們在這裏遇到一樣的問題。32位整數是很是少許的數據,而且不太可能常常被分段。然而,問題在於它多是碎片化的,而且隨着流量的增長,碎片化的可能性將增長。
  簡單的解決方案是建立一個內部累積緩衝區,並等待全部4個字節都被接收到內部緩衝區。如下是 TimeClientHandler修復此問題的修改實現:
package io.netty.example.time;

import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1)  } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2)  m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
注:一、ChannelHandler有兩個生命週期監聽器方法:handlerAdded()和handlerRemoved()。您能夠執行任意(de)初始化任務,只要它不會長時間阻塞。
    二、首先,應將全部收到的數據累積到buf。
    三、而後,處理程序必須檢查buf是否有足夠的數據,在此示例中爲4個字節,而後繼續執行實際的業務邏輯。不然,channelRead()當更多數據到達時,Netty將再次調用該方法,最終將累計全部4個字節。
 
第二種解決方案
  雖然第一個解決方案已經解決了 TIME客戶端的問題,但修改後的處理程序看起來並不乾淨。想象一個更復雜的協議,它由多個字段組成,例如可變長度字段。您的 ChannelInboundHandler實施將很快變得沒法維護。
  您可能已經注意到,您能夠 ChannelHandler爲a 添加多個 ChannelPipeline,所以,您能夠將一個單片拆分 ChannelHandler爲多個模塊化,以下降應用程序的複雜性。例如,您能夠拆分 TimeClientHandler爲兩個處理程序:
  • TimeDecoder 它涉及碎片問題,以及
  • 最初的簡單版本TimeClientHandler
  幸運的是,Netty提供了一個可擴展的類,能夠幫助您編寫第一個開箱即用的類:
package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
 @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) if (in.readableBytes() < 4) { return; // (3)  } out.add(in.readBytes(4)); // (4)  } }
注:一、ByteToMessageDecoder是一種實現ChannelInboundHandler,能夠很容易地處理碎片問題。
       二、ByteToMessageDecoderdecode()每當收到新數據時,都會使用內部維護的累積緩衝區調用該方法。
       三、decode()能夠決定不向累積緩衝區中沒有足夠數據的地方添加任何內容。當接收到更多數據時,ByteToMessageDecoder將再次調用decode()。
      四、若是decode()向out添加一個對象,則表示解碼器成功解碼一條消息。ByteToMessageDecoder將丟棄累積緩衝區的讀取部分。請記住,您不須要解碼多個消息。ByteToMessageDecoder將繼續調用decode()方法,直到它沒有向out添加任何內容爲止。
  如今咱們有另外一個處理程序插入到ChannelPipeline中,咱們應該修改TimeClient中的ChannelInitializer實現:
b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } });

  若是你是一個喜歡冒險的人,你可能想試試ReplayingDecoder,這將解碼器變得更加簡單。不過,您須要參考API參考以得到更多信息。promise

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { out.add(in.readBytes(4)); } }
  另外,Netty提供了開箱即用的解碼器,它使您可以很是容易地實現大多數協議,並幫助您避免最終出現難以維護的單塊處理程序實現。更詳細的例子請參考如下包:   
用POJO代替ByteBuf
  到目前爲止,咱們所審查的全部示例都使用了 ByteBuf做爲協議消息的主要數據結構。在本節中,咱們將改進 TIME協議客戶端和服務器示例以使用POJO而不是 ByteBuf
在你的 ChannelHandler中使用POJO的優點是顯而易見的; 經過分離 ByteBuf從處理程序中提取信息的代碼,您的處理程序變得更易於維護和重用。在 TIME客戶端和服務器示例中,咱們只讀取一個32位整數,這不是 ByteBuf直接使用的主要問題。可是,您會發如今實現真實世界協議時必須進行分離。
  首先,讓咱們定義一個名爲的新類型 UnixTime
package io.netty.example.time;

import java.util.Date; public class UnixTime { private final long value; public UnixTime() { this(System.currentTimeMillis() / 1000L + 2208988800L); } public UnixTime(long value) { this.value = value; } public long value() { return value; } @Override public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }

  咱們如今能夠修改它TimeDecoder來產生一個UnixTime而不是一個ByteBuf

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 4) { return; } out.add(new UnixTime(in.readUnsignedInt())); }

  使用更新的解碼器,TimeClientHandler再也不使用ByteBuf

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { UnixTime m = (UnixTime) msg; System.out.println(m); ctx.close(); }

  更簡單,更優雅,對吧?能夠在服務器端應用相同的技術。咱們TimeServerHandler此次更新第一次:

@Override
public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }

  如今,惟一缺乏的部分是一個編碼器,它的實現ChannelOutboundHandler將一個UnixTime轉換爲一個ByteBuf。它比編寫解碼器簡單得多,由於編碼消息時無需處理數據包碎片和彙編。

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt((int)m.value()); ctx.write(encoded, promise); // (1)  } }
注:一、這一行中有不少重要的事情。
            首先,咱們按原樣傳遞原始文件ChannelPromise,以便當編碼數據實際寫入線路時,Netty將其標記爲成功或失敗。
            第二,咱們沒有調用ctx.flush()。有一個單獨的處理程序方法void flush(ChannelHandlerContext ctx),用於覆蓋flush()操做。
  爲了進一步簡化,您可使用 MessageToByteEncoder
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { out.writeInt((int)msg.value()); } }
  最後一個任務是在TimeServerHandler以前將一個TimeEncoder插入到服務器端ChannelPipeline中,這只是一個簡單的練習。
 
關閉你的應用程序
  關閉一個Netty應用程序一般與關閉經過shutdowndowns()建立的全部EventLoopGroups同樣簡單。當EventLoopGroup徹底終止而且屬於該組的全部通道都已關閉時通知您,它返回一個 Future
 
概要
  在本章中,咱們快速瀏覽了Netty,並演示瞭如何在Netty上編寫完整的網絡應用程序。
  在接下來的章節中有關於Netty的更多詳細信息。咱們還鼓勵您查看 io.netty.example包中的Netty示例。
  另請注意,社區始終在等待您的問題和想法,以幫助您並根據您的反饋不斷改進Netty及其文檔。
相關文章
相關標籤/搜索