Netty基礎概念

Boot

  1. BossGroupWorkerGroup: 前者負責接收客戶端的鏈接,後者負責網絡的讀寫。類型都是NioEventLoopGroup
  2. NioEventLoopGroup 至關於一個事件循環組, 這個組中含有多個事件循環 ,每個事件循環是 NioEventLoop
  3. NioEventLoop 表示一個不斷循環的執行處理任務的線程, 每一個 NioEventLoop 都有一個 selector , 用於監聽綁 定在其上的 socket 的網絡通信。
  4. 每一個 Boss NioEventLoop 循環執行的步驟有 3 步
    1. 輪詢 accept 事件
    2. 處理 accept 事件 , 與 client 創建鏈接 , 生成 NioScocketChannel , 並將其註冊到某個 worker NIOEventLoop 上 的 selector
    3. 處理任務隊列的任務
  5. 每一個 Worker NIOEventLoop 循環執行的步驟:
    1. 輪詢 read, write 事件
    2. 處理 i/o 事件, 即 read , write 事件,在對應 NioScocketChannel 處理
    3. 處理任務隊列的任務 , 即 runAllTasks

核心對象

  1. ChannelInitializer 通道初始化接口
  2. ChannelHandler 通道處理程序,Netty默認提供一些開箱即用的的處理器
  3. ChannelPipeline
  4. NioEventLoop 不知道咋描述,能夠視爲Reactor
    1. execute 提交一個任務到同步任務隊列中,會發生阻塞
    2. schedule 提交一個定時任務到scheduledTaskQueue異步隊列中,

CODE

Server

//建立 BossGroup 和 WorkerGroup
        //說明
        //1. 建立兩個線程組 bossGroup 和 workerGroup
        //2. bossGroup 只是處理鏈接請求 , 真正的和客戶端業務處理,會交給 workerGroup 完成
        //3. 兩個都是無限循環
        //4. bossGroup 和 workerGroup 含有的子線程(NioEventLoop)的個數
        // 默認實際 cpu 核數 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //建立服務器端的啓動對象,配置參數
        ServerBootstrap bootstrap = new ServerBootstrap();
        //使用鏈式編程來進行設置
        bootstrap.group(bossGroup, workerGroup) //設置兩個線程組
                .channel(NioServerSocketChannel.class) //使用 NioSocketChannel 做爲服務器的通道實現
                .option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列獲得鏈接個數
                .childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動鏈接狀態
                .childHandler(new ChannelInitializer<socketchannel>() {//建立一個通道測試對象(匿名對象)
                    //給 pipeline 設置處理器
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                    //咱們自定義一個 Handler 須要繼續 netty 規定好的某個 HandlerAdapter(規範)
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
                    }
                }); // 給咱們的 workerGroup 的 EventLoop 對應的管道設置處理器
        System.out.println(".....服務器 is ready...");
        //綁定一個端口而且同步, 生成了一個 ChannelFuture 對象
        //啓動服務器(並綁定端口)
        ChannelFuture cf = bootstrap.bind(6668).sync();
        //對關閉通道進行監聽
        cf.channel().closeFuture().sync();

Client

//客戶端須要一個事件循環組
        EventLoopGroup group = new NioEventLoopGroup();
        //建立客戶端啓動對象
        //注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap
        Bootstrap bootstrap = new Bootstrap();
        //設置相關參數
        bootstrap.group(group) //設置線程組
                .channel(NioSocketChannel.class) // 設置客戶端通道的實現類(反射)
                .handler(new ChannelInitializer<socketchannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                    //咱們自定義一個 Handler 須要繼續 netty 規定好的某個 HandlerAdapter(規範)
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); //加入本身的處理器
                    }
                });
        System.out.println("客戶端 ok..");
        //啓動客戶端去鏈接服務器端
        //關於 ChannelFuture 要分析,涉及到 netty 的異步模型
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();

ChannelHandler

ChannelInboundHandler

public interface ChannelInboundHandler extends ChannelHandler {
	// 當channel被註冊到EventLoop時被調用
    void channelRegistered(ChannelHandlerContext var1) throws Exception;
	
	// 當channel已經被建立,但還未註冊到EventLoop(或者從EventLoop中註銷)被調用
    void channelUnregistered(ChannelHandlerContext var1) throws Exception;
	
	// 當channel處於活動狀態(鏈接到遠程節點)被調用
    void channelActive(ChannelHandlerContext var1) throws Exception;

	// 當channel處於非活動狀態(沒有鏈接到遠程節點)被調用
    void channelInactive(ChannelHandlerContext var1) throws Exception;
	
	// 當從channel讀取數據時被調用
    void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
	
	// 當channel的上一個讀操做完成時被調用
    void channelReadComplete(ChannelHandlerContext var1) throws Exception;
	
	// 當ChannelInboundHandler.fireUserEventTriggered()方法被調用時被調用
    void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
	
	// 當channel的可寫狀態發生改變時被調用
    void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
	
	// 當處理過程當中發生異常時被調用
    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}

ChannelOutboundHandler

public interface ChannelOutboundHandler extends ChannelHandler {

	// 當請求將Channel綁定到一個地址時被調用
	// ChannelPromise是ChannelFuture的一個子接口,定義瞭如setSuccess(),setFailure()等方法
    void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
	
	// 當請求將Channel鏈接到遠程節點時被調用
    void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
	
	// 當請求將Channel從遠程節點斷開時被調用
    void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

	// 當請求關閉Channel時被調用
    void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

	// 當請求將Channel從它的EventLoop中註銷時被調用
    void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
	
	// 當請求從Channel讀取數據時被調用
    void read(ChannelHandlerContext var1) throws Exception;

	// 當請求經過Channel將數據寫到遠程節點時被調用
    void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
	
	// 當請求經過Channel將緩衝中的數據沖刷到遠程節點時被調用
    void flush(ChannelHandlerContext var1) throws Exception;
}
相關文章
相關標籤/搜索