Java網絡編程 -- Netty入門

Netty簡介

Netty是一個高性能,高可擴展性的異步事件驅動的網絡應用程序框架,它極大的簡化了TCP和UDP客戶端和服務器端網絡開發。它是一個NIO框架,對Java NIO進行了良好的封裝。做爲一個異步NIO框架,Netty的全部IO操做都是異步非阻塞的,經過Future-Listener機制,用戶能夠方便的主動獲取或者經過通知機制得到IO操做結果。vue

Netty的特性

  • 統一的API,適用於不一樣的協議
  • 基於靈活、可擴展的事件驅動模型
  • 高度可定製的線程模型
  • 更好的吞吐量,低延遲
  • 更省資源,儘可能減小沒必要要的內存拷貝
  • 完整的SSL/TLS和STARTTLS的支持
  • 能在Applet與Android的限制環境運行良好
  • 再也不因過快、過慢或超負載鏈接致使OutOfMemoryError
  • 再也不有在高速網絡環境下NIO讀寫頻率不一致的問題

Netty核心內容

Netty中最核心的內容主要有如下四個方面:java

  • Reactor線程模型:一種高性能的多線程程序設計思路
  • Netty中本身定義的Channel概念:加強版的通道概念
  • ChannelPipeline職責鏈設計模式:事件處理機制
  • 內存管理:加強的ByteBuf緩衝區

Netty總體結構圖

img

Netty核心組件

EventLoop:EventLoop維護了一個線程和任務隊列,支持異步提交執行任務。EventLoop自身實現了Executor接口,當調用executor方法提交任務時,則判斷是否啓動,未啓動則調用內置的executor建立新線程來觸發run方法執行,其大體流程參考Netty源碼SingleThreadEventExecutor以下:bootstrap

img

EventLoopGroup: EventLoopGroup主要是管理eventLoop的生命週期,能夠將其看做是一個線程池,其內部維護了一組EventLoop,每一個eventLoop對應處理多個Channel,而一個Channel只能對應一個EventLoop設計模式

img

Bootstrap:BootStrap 是客戶端的引導類,主要用於客戶端鏈接遠程主機,有1個EventLoopGroup。Bootstrap 在調用 bind()(鏈接UDP)和 connect()(鏈接TCP)方法時,會新建立一個單獨的、沒有父 Channel 的 Channel 來實現全部的網絡交換。服務器

ServerBootstrap: ServerBootstrap 是服務端的引導類,主要用戶服務端綁定本地端口,有2個EventLoopGroup。ServerBootstarp 在調用 bind() 方法時會建立一個 ServerChannel 來接受來自客戶端的鏈接,而且該 ServerChannel 管理了多個子 Channel 用於同客戶端之間的通訊。markdown

Channel:Netty中的Channel是一個抽象的概念,能夠理解爲對Java NIO Channel的加強和擴展,增長了許多新的屬性和方法,如bing方法等。網絡

ChannelFuture:ChannelFuture可以註冊一個或者多個ChannelFutureListener 實例,當操做完成時,無論成功仍是失敗,均會被通知。ChannelFuture存儲了以後執行的操做的結果而且沒法預測操做什麼時候被執行,提交至Channel的操做按照被喚醒的順序被執行。多線程

ChannelHandler:ChannelHandler用來處理業務邏輯,分別有入站和出站的實現。app

ChannelPipeline: ChannelPipeline 提供了 ChannelHandler鏈的容器,並定義了用於在該鏈上傳播入站和出站事件流的API。框架

Netty線程模型

Netty的線程模型是基於Reactor模式的線程實現。關於Reactor模式能夠參考 Reactor模式 ,Netty中依據用戶的配置能夠支持單線程的Reactor模型,多線程的Reactor模型以及主從多Reactor的模型。在Netty中其大體流程以下以下:

img

Netty入門代碼示例

服務端代碼示例:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.nio.charset.Charset;

public class EchoServer {

  public static void main(String[] args) {
    // accept線程組,用來接受鏈接
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    // I/O線程組, 用於處理業務邏輯
    EventLoopGroup workerGroup = new NioEventLoopGroup(1);

    try {
      // 服務端啓動引導
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workerGroup) // 綁定兩個線程組
          .channel(NioServerSocketChannel.class) // 指定通道類型
          .option(ChannelOption.SO_BACKLOG, 100) // 設置TCP鏈接的緩衝區
          .handler(new LoggingHandler(LogLevel.INFO)) // 設置日誌級別
          .childHandler(
              new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                  ChannelPipeline pipeline = socketChannel.pipeline(); // 獲取處理器鏈
                  pipeline.addLast(new EchoServerHandler()); // 添加新的件處理器
                }
              });

      // 經過bind啓動服務
      ChannelFuture f = b.bind(8080).sync();
      // 阻塞主線程,知道網絡服務被關閉
      f.channel().closeFuture().sync();

    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      workerGroup.shutdownGracefully();
      bossGroup.shutdownGracefully();
    }
  }
}

class EchoServerHandler extends ChannelInboundHandlerAdapter {

  // 每當從客戶端收到新的數據時,這個方法會在收到消息時被調用
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    System.out.println("收到數據:" + ((ByteBuf) msg).toString(Charset.defaultCharset()));
    ctx.write(Unpooled.wrappedBuffer("Server message".getBytes()));
    ctx.fireChannelRead(msg);
  }

  // 數據讀取完後被調用
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
  }

  // 當Netty因爲IO錯誤或者處理器在處理事件時拋出的異常時被調用
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
  }
}

客戶端代碼示例:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.Charset;

public class EchoClient {

  public static void main(String[] args) {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      Bootstrap b = new Bootstrap();
      b.group(group)
          .channel(NioSocketChannel.class)
          .option(ChannelOption.TCP_NODELAY, true)
          .handler(
              new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                  ChannelPipeline p = ch.pipeline();
                  p.addLast(new EchoClientHandler());
                }
              });

      ChannelFuture f = b.connect("127.0.0.1", 8080).sync();
      f.channel().closeFuture().sync();

    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      group.shutdownGracefully();
    }
  }
}

class EchoClientHandler extends ChannelInboundHandlerAdapter {

  private final ByteBuf firstMessage;

  public EchoClientHandler() {
    firstMessage = Unpooled.buffer(256);
    for (int i = 0; i < firstMessage.capacity(); i++) {
      firstMessage.writeByte((byte) i);
    }
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    ctx.writeAndFlush(firstMessage);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    System.out.println("收到數據:" + ((ByteBuf) msg).toString(Charset.defaultCharset()));
    ctx.write(Unpooled.wrappedBuffer("Client message".getBytes()));
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) {
    ctx.flush();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
  }
}

 

相關文章
相關標籤/搜索