Boot
BossGroup
和WorkerGroup
: 前者負責接收客戶端的鏈接,後者負責網絡的讀寫。類型都是NioEventLoopGroup
NioEventLoopGroup
至關於一個事件循環組, 這個組中含有多個事件循環 ,每個事件循環是 NioEventLoop
NioEventLoop
表示一個不斷循環的執行處理任務的線程, 每一個 NioEventLoop 都有一個 selector
, 用於監聽綁 定在其上的 socket 的網絡通信。
- 每一個 Boss NioEventLoop 循環執行的步驟有 3 步
- 輪詢 accept 事件
- 處理 accept 事件 , 與 client 創建鏈接 , 生成
NioScocketChannel
, 並將其註冊到某個 worker NIOEventLoop
上 的 selector
- 處理任務隊列的任務
- 每一個 Worker NIOEventLoop 循環執行的步驟:
- 輪詢 read, write 事件
- 處理 i/o 事件, 即 read , write 事件,在對應 NioScocketChannel 處理
- 處理任務隊列的任務 , 即 runAllTasks
核心對象
ChannelInitializer
通道初始化接口
ChannelHandler
通道處理程序,Netty默認提供一些開箱即用的的處理器
ChannelPipeline
NioEventLoop
不知道咋描述,能夠視爲Reactor
- execute 提交一個任務到同步任務隊列中,會發生阻塞
- schedule 提交一個定時任務到
scheduledTaskQueue
異步隊列中,
CODE
Server
//建立 BossGroup 和 WorkerGroup
//說明
//1. 建立兩個線程組 bossGroup 和 workerGroup
//2. bossGroup 只是處理鏈接請求 , 真正的和客戶端業務處理,會交給 workerGroup 完成
//3. 兩個都是無限循環
//4. bossGroup 和 workerGroup 含有的子線程(NioEventLoop)的個數
// 默認實際 cpu 核數 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
//建立服務器端的啓動對象,配置參數
ServerBootstrap bootstrap = new ServerBootstrap();
//使用鏈式編程來進行設置
bootstrap.group(bossGroup, workerGroup) //設置兩個線程組
.channel(NioServerSocketChannel.class) //使用 NioSocketChannel 做爲服務器的通道實現
.option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列獲得鏈接個數
.childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動鏈接狀態
.childHandler(new ChannelInitializer<socketchannel>() {//建立一個通道測試對象(匿名對象)
//給 pipeline 設置處理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//咱們自定義一個 Handler 須要繼續 netty 規定好的某個 HandlerAdapter(規範)
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
}); // 給咱們的 workerGroup 的 EventLoop 對應的管道設置處理器
System.out.println(".....服務器 is ready...");
//綁定一個端口而且同步, 生成了一個 ChannelFuture 對象
//啓動服務器(並綁定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//對關閉通道進行監聽
cf.channel().closeFuture().sync();
Client
//客戶端須要一個事件循環組
EventLoopGroup group = new NioEventLoopGroup();
//建立客戶端啓動對象
//注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//設置相關參數
bootstrap.group(group) //設置線程組
.channel(NioSocketChannel.class) // 設置客戶端通道的實現類(反射)
.handler(new ChannelInitializer<socketchannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//咱們自定義一個 Handler 須要繼續 netty 規定好的某個 HandlerAdapter(規範)
ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); //加入本身的處理器
}
});
System.out.println("客戶端 ok..");
//啓動客戶端去鏈接服務器端
//關於 ChannelFuture 要分析,涉及到 netty 的異步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
ChannelHandler
ChannelInboundHandler
public interface ChannelInboundHandler extends ChannelHandler {
// 當channel被註冊到EventLoop時被調用
void channelRegistered(ChannelHandlerContext var1) throws Exception;
// 當channel已經被建立,但還未註冊到EventLoop(或者從EventLoop中註銷)被調用
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
// 當channel處於活動狀態(鏈接到遠程節點)被調用
void channelActive(ChannelHandlerContext var1) throws Exception;
// 當channel處於非活動狀態(沒有鏈接到遠程節點)被調用
void channelInactive(ChannelHandlerContext var1) throws Exception;
// 當從channel讀取數據時被調用
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
// 當channel的上一個讀操做完成時被調用
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
// 當ChannelInboundHandler.fireUserEventTriggered()方法被調用時被調用
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
// 當channel的可寫狀態發生改變時被調用
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
// 當處理過程當中發生異常時被調用
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}
ChannelOutboundHandler
public interface ChannelOutboundHandler extends ChannelHandler {
// 當請求將Channel綁定到一個地址時被調用
// ChannelPromise是ChannelFuture的一個子接口,定義瞭如setSuccess(),setFailure()等方法
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
// 當請求將Channel鏈接到遠程節點時被調用
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
// 當請求將Channel從遠程節點斷開時被調用
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 當請求關閉Channel時被調用
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 當請求將Channel從它的EventLoop中註銷時被調用
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 當請求從Channel讀取數據時被調用
void read(ChannelHandlerContext var1) throws Exception;
// 當請求經過Channel將數據寫到遠程節點時被調用
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
// 當請求經過Channel將緩衝中的數據沖刷到遠程節點時被調用
void flush(ChannelHandlerContext var1) throws Exception;
}