Netty 系列筆記之開篇

1、引言

❀ 衆所周知html

Netty 是一款基於 NIO 客戶、服務器端的 Java 開源編程框架,提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。

❀ 通俗來說java

Netty 一個很是好用的處理 Socket 的 Jar 包,能夠用它來開發服務器和客戶端。編程

2、爲何要學習 Netty

Netty 做爲一個優秀的網絡通訊框架,許多開源項目都使用它來構建通訊層。好比 Hadoop、Cassandra、Spark、Dubbo、gRPC、RocketMQ、Zookeeper甚至咱們經常使用的 Spring 等等。bootstrap

更重要的是,Netty 是開發高性能 Java 服務器的必學框架。api

能夠說做爲一個 Java 工程師,要了解 Java 服務器的高階知識,Netty 是一個必需要學習的東西。promise

3、Netty 的特性

一、設計
  • 爲不一樣的傳輸類型(阻塞和非阻塞)提供統一的 API
  • 基於靈活且可擴展的事件模型,可將關注點明確分離
  • 高度可定製的線程模型:單線程、一個或多個線程池
  • 可靠的無鏈接數據 Socket 支持(UDP)
二、易用
  • 完善的 JavaDoc ,用戶指南和樣例
  • 無需額外依賴,JDK 5 (Netty 3.x) 、JDK 6 (Netty 4.x)
三、性能
  • 更高的吞吐量,更低的延遲
  • 更省資源
  • 減小沒必要要的內存拷貝
四、安全
  • 完整的 SSL/TLS 和 STARTTLS 的支持
五、社區
  • 活躍的社區和衆多的開源貢獻者

4、初識 Netty

Talk is cheap, show me the code!
一、丟棄服務器

接下來從代碼中感覺一下 Netty,首先實現一個 discard(丟棄)服務器,即對收到的數據不作任何處理。安全

  • 實現 ChannelInBoundHandlerAdapter 首先咱們從 handler 的實現開始, Netty 使用 handler 來處理 I/O 事件。服務器

    public class DiscardServerHandler extends ChannelInboundHandlerAdapter { 
    
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
          // 丟棄收到的數據
          ((ByteBuf) msg).release();
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
          cause.printStackTrace();
          ctx.close();
        }
    }
    • 1 行,DiscardServerHandler 繼承自 ChannelInboundHandlerAdapter,這個類實現了 ChannelInboundHandler接口,ChannelInboundHandler 提供了許多事件處理的接口方法。
    • 4 行,當收到新的消息時,就會調用 chanelRead() 方法。
    • 6 行,ByteBuf 是一個引用計數對象,這個對象必須顯式地調用 release() 方法來釋放。處理器的職責是釋放全部傳遞處處理器的引用計數對象,下面是比較常見的 chanelRead() 方法實現:網絡

      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // Do something with msg
        } finally {
            ReferenceCountUtil.release(msg);
        }
      }
    • 10 行,exceptionCaught() 方法是在處理事件時發生異常調用的方法。
  • 啓動 Handler 實現 handler 後,咱們須要一個 main() 方法來啓動它。數據結構

    public class DiscardServer {
    
      private int port;
    
      public DiscardServer(int port) {
          this.port = port;
      }
    
      public void run() throws Exception {
          // 接收進來的鏈接
          EventLoopGroup boss = new NioEventLoopGroup();
          // 處理已經接收的鏈接
          EventLoopGroup worker = new NioEventLoopGroup();
          try {
              ServerBootstrap bootstrap = new ServerBootstrap();
              bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                  @Override
                  protected void initChannel(SocketChannel socketChannel) throws Exception {
                      // 添加自定義的 handler
                      socketChannel.pipeline().addLast(new DiscardServerHandler());
                  }
              }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
              // 綁定端口,開始接收進來的鏈接
              ChannelFuture channelFuture = bootstrap.bind(port).sync();
              // 關閉
              channelFuture.channel().closeFuture().sync();
          } finally {
              boss.shutdownGracefully();
              worker.shutdownGracefully();
          }
      }
    
      public static void main(String[] args) throws Exception {
          int port = 8080;
          new DiscardServer(port).run();
      }
    }
    • 11 行,EventLoopGroup 是用來處理 I/O 操做的多線程事件循環器,Netty 提供了許多不一樣的 EventLoopGroup 的實現用來處理不一樣的傳輸。在本例咱們實現了一個服務端應用,所以須要兩個 EventLoopGroup 。第一個用來接收進來的鏈接,常被稱做 boss ;第二個用來處理已經接收的鏈接,成爲 worker。一旦 boss 接收到一個新進來的鏈接,就會把鏈接的信息註冊到 worker 上面。
    • 15 行,ServerBootstrap 是一個啓動 NIO 服務的輔助啓動類。
    • 16 行,指定 NioServerSocketChannel 用來講明一個新的 Channel 如何接收進來的鏈接。
    • 20 行, ChannelInitializer 用來幫助使用者建立一個新的 channel ,同時可使用 pipline 指定一些特定的處理器。
    • 22 行,經過這兩個方法能夠指定新配置的 channel 的一些參數配置。
  • 查看接收到的數據 如此,一個基於 Netty 的服務端程序就完成了,可是如今啓動起來咱們看不到任何交互,因此咱們稍微修改一下 DiscardServerHandler 類的 channelRead() 方法,能夠查看到客戶端發來的消息。

    @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          ByteBuf byteBuf = (ByteBuf) msg;
          try {
              while (byteBuf.isReadable()) {
                  System.out.print((char) byteBuf.readByte());
                  System.out.flush();
              }
          } finally {
              ReferenceCountUtil.release(msg);
          }
      }
  • 測試 接下來咱們啓動 DiscardServer ,使用 telnet 來測試一下。

    image.png

    控制檯接收到了命令行發來的消息:

    image.png

    • *
二、應答服務器

咱們已經實現了服務器能夠接收客戶端發來的消息,一般服務器會對客戶端發來的請求做出迴應,下面就經過 ECHO 協議來實現對客戶端的消息響應。

ECHO 協議即會把客戶端發來的數據原樣返回,因此也戲稱「乒乓球」協議。

在上述代碼的基礎上面,咱們只需對 DiscardServerHandler 類的 channelRead() 方法稍加修改:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.write(msg);
        ctx.flush();
}
  • ChannelHandlerContext 對象提供了許多操做,使你可以觸發各類各樣的 I/O 事件和操做。這裏咱們調用了 write(Object) 方法來逐字地把接受到的消息寫入。請注意不一樣於 DISCARD 的例子咱們並無釋放接受到的消息,這是由於當寫入的時候 Netty 已經幫咱們釋放了。
  • ctx.write(Object) 方法不會使消息寫入到通道上,他被緩衝在了內部,你須要調用 ctx.flush() 方法來把緩衝區中數據強行輸出。或者能夠用更簡潔的 cxt.writeAndFlush(msg) 以達到一樣的目的。

再次運行 telnet 命令,就會接受到你發送的信息。


三、時間服務器

接下來咱們基於 TIME 協議,實現構建和發送一個消息,而後在完成時關閉鏈接。和以前的例子不一樣的是在不接受任何請求時會發送一個含 32 位的整數的消息,而且一旦消息發送就會當即關閉鏈接。

TIME 協議能夠提供機器可讀的日期時間信息。

咱們會在鏈接建立時發送時間消息,因此須要覆蓋 channelActive() 方法:

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 分配空間
        final ByteBuf time = ctx.alloc().buffer(4);
        // 獲取 32 位時間戳並寫入
        time.writeInt((int) (System.currentTimeMillis() / 1000L));
        final ChannelFuture future = ctx.writeAndFlush(time);
        // 添加監聽器
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                assert future == channelFuture;
                // 關閉鏈接
                ctx.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • 4 行,channelActive() 方法將會在鏈接被創建而且準備進行通訊時被調用。
  • 6 行,同 Java 的 NIO 相似,爲了構建一個消息,須要爲緩衝區分配空間。由於要發送一個 32 位的時間戳,因此至少 4 字節。
  • 8 行,消息構建完畢後,執行寫入。回想使用 Java NIO 的 Buffer 時,在讀寫操做之間,須要調用 buffer.flip( ) 方法設置指針位置。可是在在 Netty 中不須要這樣操做,緣由是 Netty 提供了兩個指針,一個讀指針和一個寫指針,在讀寫時二者不相互影響。不再用擔憂忘記調用 flip( ) 方法時數據爲空或者數據錯誤啦。
  • 11 行,在第 9 行執行完 ctx.writeAndFlush(time) 後會返回一個 ChannelFuture 對象,表明着尚未發生的一次 I/O 操做。這意味着任何一個請求操做都不會立刻被執行,由於在 Netty 裏全部的操做都是異步的。這樣來看,咱們想完成消息發送後關閉鏈接,直接在後邊調用 ctx.close( ) 可能不能馬上關閉鏈接。返回的 ChannelFuture 對象在操做完成後會通知它的監聽器,繼續執行操做完成後的動做。
    • *
四、時間客戶端

對於時間服務端不能直接用 telnet 的方式測試,由於不能靠人工把一個 32 位的二進制數據翻譯成時間,因此下面將實現一個時間客戶端。

與服務端的實現惟一不一樣的就是使用了不一樣的 Bootstrap 和 Channel 實現:

public class TimeClient {

    private String host;

    private int port;

    public TimeClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
            // 啓動
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // 等待鏈接關閉
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        TimeClient timeClient = new TimeClient("localhost", 8080);
        timeClient.run();
    }

}
  • 13 行,對比 server 端只指定了一個 EventLoopGroup ,它即會做爲 boss group 也會做爲 worker group,儘管客戶端不須要使用到 boss group。
  • 15 行,Bootstrap 和 ServerBootstrap 相似,Bootstrap 面向於服務端的 channel ,好比客戶端和無鏈接傳輸模式的 channel。

再稍微改動一下 handler :

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 在 TCP/IP 中,Netty 會把讀到的數據放入 ByteBuf 中
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            long time = byteBuf.readUnsignedInt() * 1000L;
            System.out.println(new Date(time));
            ctx.close();
        }finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

分別啓動 TimeServer 和 TimeClient ,控制檯打印出了當前時間:

image.png

然而,屢次運行後處理器有時候會由於拋出 IndexOutOfBoundsException 而拒絕工做。帶着這個問題,繼續往下面看。

五、處理基於流的傳輸

比較典型的基於流傳輸的 TCP/IP 協議,也就是說,應用層兩個不一樣的數據包,在 TCP/IP 協議傳輸時,可能會組合或者拆分應用層協議的數據。因爲兩個數據包之間並沒有邊界區分,可能致使消息的讀取錯誤。

不少資料也稱上述這種現象爲 TCP 粘包,而值得注意的是:

一、TCP 協議自己設計就是面向流的,提供可靠傳輸。 二、正由於面向流,對於應用層的數據包而言,沒有邊界區分。這就須要應用層主動處理不一樣數據包之間的組裝。 三、發生粘包現象不是 TCP 的缺陷,只是應用層沒有主動作數據包的處理。

回到上面程序,這也就是上述異常發生的緣由。一個 32 位整型是很是小的數據,它並不見得會被常常拆分到到不一樣的數據段內。然而,問題是它確實可能會被拆分到不一樣的數據段內。

比較常見的兩種解決方案就是基於長度或者基於終結符,繼續以上面的 TIME 協議程序爲基礎,着手解決這個問題。由於只發送一個 32 位的整形時間戳,咱們採用基於數據長度的方式:

❀ 解決方案一

最簡單的方案是構造一個內部的可積累的緩衝,直到4個字節所有接收到了內部緩衝。修改一下 TimeClientHandler 的代碼:

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private ByteBuf buf;

    private static final int CAPACITY = 4;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        buf = ctx.alloc().buffer(CAPACITY);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        buf.release();
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        buf.writeBytes(byteBuf);
        byteBuf.release();
        // 數據大於或等於 4 字節
        if (buf.readableBytes() >= CAPACITY) {
            long time = buf.readUnsignedInt() * 1000L;
            System.out.println(new Date(time));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

其中覆蓋了 handler 生命週期的兩個方法:

  • 8 行,handlerAdded():當檢測到新的鏈接以後,調用ch.pipeline().addLast(new LifeCycleTestHandler())以後的回調,表示當前的channel中已經成功添加了一個邏輯處理器
  • 13 行,handlerRemoved():在鏈接關閉後把這條鏈接上的全部邏輯處理器所有移除掉。
❀ 解決方案二

儘管上述方案已經解決了 TIME 客戶端的問題了,可是在處理器中增長了邏輯,咱們能夠把處理消息的部分抽取出來,成爲一個單獨的處理器,而且能夠增長多個 ChannelHandler 到 ChannelPipline ,每一個處理器各司其職,減小模塊的複雜度。

由此,拆分出一個 TimeDecoder 用於處理消息:

public class TimeDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= 4) {
            out.add(in.readBytes(4));
        }
    }
}
  • ByteToMessageDecoder 繼承自 ChannelInboundHandlerAdapter ,每當有新數據接收的時候,ByteToMessageDecoder 都會調用 decode() 方法來處理內部的那個累積緩衝。
  • 若是在 decode() 方法裏增長了一個對象到 out 對象裏,這意味着解碼器解碼消息成功。ByteToMessageDecoder 將會丟棄在累積緩衝裏已經被讀過的數據。

最後,修改 TimeClient 的代碼,將 TimeDecoder 加入 ChannelPipline :

bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);

除此以外,Netty還提供了更多開箱即用的解碼器使你能夠更簡單地實現更多的協議,幫助你避免開發一個難以維護的處理器實現,感興趣的小夥伴能夠自行了解。

六、將消息解碼爲自定義對象

上述的例子咱們一直在使用 ByteBuf 做爲協議消息的主要數據結構,可是實際使用中,須要傳輸的消息更加複雜,抽象爲對象來處理更加方便。繼續以 TIME 客戶端和服務器爲基礎,使用自定義的對象代替 ByteBuf 。

  • 定義保存時間的對象 OurTime :

    public class OurTime {
    
      private final long value;
    
      public OurTime() {
          this(System.currentTimeMillis() / 1000L);
      }
    
      public OurTime(long value) {
          this.value = value;
      }
    
      public long value() {
          return value;
      }
    
      @Override
      public String toString() {
          return new Date(value() * 1000L).toString();
      }
    }
  • 修改 TimeDecoder 類,返回 OurTime 類:

    public class TimeDecoder extends ByteToMessageDecoder {
    
      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
          if (in.readableBytes() >= 4) {
              out.add(new OurTime(in.readUnsignedInt()));
          }
      }
    }
  • 修改後的 TimeClientHandler 類,處理新消息更加簡潔:

    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          OurTime ourTime = (OurTime) msg;
          System.out.println(ourTime);
          ctx.close();
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          cause.printStackTrace();
          ctx.close();
      }
    }
    • *

而對於服務端來講,大同小異。

修改 TimeServerHandler 的代碼:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
 }

如今,惟一缺乏的功能是一個編碼器,是ChannelOutboundHandler的實現,用來將 OurTime 對象從新轉化爲一個 ByteBuf。這是比編寫一個解碼器簡單得多,由於沒有須要處理的數據包編碼消息時拆分和組裝。

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (OurTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}

在這幾行代碼裏還有幾個重要的事情。第一,經過 ChannelPromise,當編碼後的數據被寫到了通道上 Netty 能夠經過這個對象標記是成功仍是失敗。第二, 咱們不須要調用 cxt.flush()。由於處理器已經單獨分離出了一個方法 void flush(ChannelHandlerContext cxt),若是像本身實現 flush() 方法內容能夠自行覆蓋這個方法。

進一步簡化操做,你可使用 MessageToByteEncode:

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
        @Override
        protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
            out.writeInt((int)msg.value());
        }
    }

最後在 TimeServerHandler 以前把 TimeEncoder 插入到ChannelPipeline。

5、總結

相信讀完這篇文章的從頭到尾,小夥伴們對使用 Netty 編寫一個客戶端和服務端有了大概的瞭解。後面咱們將繼續探究 Netty 的源碼實現,並結合其涉及的基礎知識進行了解、深刻。

❤ 轉載請註明本文地址或來源,謝謝合做 ❤


BLe36I.png

相關文章
相關標籤/搜索