本文主要講述Netty框架的一些特性以及重要組件,但願看完以後能對Netty框架有一個比較直觀的感覺,但願能幫助讀者快速入門Netty,減小一些彎路。html
官方的介紹:java
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.linux
Netty是 一個異步事件驅動的網絡應用程序框架,用於快速開發可維護的高性能協議服務器和客戶端。程序員
從官網上介紹,Netty是一個網絡應用程序框架,開發服務器和客戶端。也就是用於網絡編程的一個框架。既然是網絡編程,Socket就不談了,爲何不用NIO呢?web
對於這個問題,以前我寫了一篇文章《NIO入門》對NIO有比較詳細的介紹,NIO的主要問題是:算法
相對地,Netty的優勢有不少:編程
上面這張圖就是在官網首頁的架構圖,咱們從上到下分析一下。bootstrap
綠色的部分Core核心模塊,包括零拷貝、API庫、可擴展的事件模型。api
橙色部分Protocol Support協議支持,包括Http協議、webSocket、SSL(安全套接字協議)、谷歌Protobuf協議、zlib/gzip壓縮與解壓縮、Large File Transfer大文件傳輸等等。安全
紅色的部分Transport Services傳輸服務,包括Socket、Datagram、Http Tunnel等等。
以上可看出Netty的功能、協議、傳輸方式都比較全,比較強大。
首先搭建一個HelloWord工程,先熟悉一下API,還有爲後面的學習作鋪墊。如下面這張圖爲依據:
使用的版本是4.1.20,相對比較穩定的一個版本。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
複製代碼
public class MyServer {
public static void main(String[] args) throws Exception {
//建立兩個線程組 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//建立服務端的啓動對象,設置參數
ServerBootstrap bootstrap = new ServerBootstrap();
//設置兩個線程組boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//設置服務端通道實現類型
.channel(NioServerSocketChannel.class)
//設置線程隊列獲得鏈接個數
.option(ChannelOption.SO_BACKLOG, 128)
//設置保持活動鏈接狀態
.childOption(ChannelOption.SO_KEEPALIVE, true)
//使用匿名內部類的形式初始化通道對象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//給pipeline管道設置處理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});//給workerGroup的EventLoop對應的管道設置處理器
System.out.println("java技術愛好者的服務端已經準備就緒...");
//綁定端口號,啓動服務端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//對關閉通道進行監聽
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
複製代碼
/** * 自定義的Handler須要繼承Netty規定好的HandlerAdapter * 才能被Netty框架所關聯,有點相似SpringMVC的適配器模式 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//獲取客戶端發送過來的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客戶端" + ctx.channel().remoteAddress() + "發送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//發送消息給客戶端
ctx.writeAndFlush(Unpooled.copiedBuffer("服務端已收到消息,並給你發送一個問號?", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//發生異常,關閉通道
ctx.close();
}
}
複製代碼
public class MyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//建立bootstrap對象,配置參數
Bootstrap bootstrap = new Bootstrap();
//設置線程組
bootstrap.group(eventExecutors)
//設置客戶端的通道實現類型
.channel(NioSocketChannel.class)
//使用匿名內部類初始化通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加客戶端通道的處理器
ch.pipeline().addLast(new MyClientHandler());
}
});
System.out.println("客戶端準備就緒,隨時能夠起飛~");
//鏈接服務端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//對通道關閉進行監聽
channelFuture.channel().closeFuture().sync();
} finally {
//關閉線程組
eventExecutors.shutdownGracefully();
}
}
}
複製代碼
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//發送消息到服務端
ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~馬來西亞~", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收服務端發送過來的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到服務端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
}
複製代碼
先啓動服務端,再啓動客戶端,就能夠看到結果:
MyServer打印結果:
MyClient打印結果:
若是Handler處理器有一些長時間的業務處理,能夠交給taskQueue異步處理。怎麼用呢,請看代碼演示:
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//獲取到線程池eventLoop,添加線程,執行
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
//長時間操做,不至於長時間的業務操做致使Handler阻塞
Thread.sleep(1000);
System.out.println("長時間的業務處理");
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
複製代碼
咱們打一個debug調試,是能夠看到添加進去的taskQueue有一個任務。
延時任務隊列和上面介紹的任務隊列很是類似,只是多了一個可延遲必定時間再執行的設置,請看代碼演示:
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
//長時間操做,不至於長時間的業務操做致使Handler阻塞
Thread.sleep(1000);
System.out.println("長時間的業務處理");
} catch (Exception e) {
e.printStackTrace();
}
}
},5, TimeUnit.SECONDS);//5秒後執行
複製代碼
依然打開debug進行調試查看,咱們能夠有一個scheduleTaskQueue任務待執行中
在搭建HelloWord工程的時候,咱們看到有一行這樣的代碼:
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);
複製代碼
不少操做都返回這個ChannelFuture對象,究竟這個ChannelFuture對象是用來作什麼的呢?
ChannelFuture提供操做完成時一種異步通知的方式。通常在Socket編程中,等待響應結果都是同步阻塞的,而Netty則不會形成阻塞,由於ChannelFuture是採起相似觀察者模式的形式進行獲取結果。請看一段代碼演示:
//添加監聽器
channelFuture.addListener(new ChannelFutureListener() {
//使用匿名內部類,ChannelFutureListener接口
//重寫operationComplete方法
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//判斷是否操做成功
if (future.isSuccess()) {
System.out.println("鏈接成功");
} else {
System.out.println("鏈接失敗");
}
}
});
複製代碼
Bootstrap和ServerBootStrap是Netty提供的一個建立客戶端和服務端啓動器的工廠類,使用這個工廠類很是便利地建立啓動類,根據上面的一些例子,其實也看得出來能大大地減小了開發的難度。首先看一個類圖:
能夠看出都是繼承於AbstractBootStrap抽象類,因此大體上的配置方法都相同。
通常來講,使用Bootstrap建立啓動器的步驟可分爲如下幾步:
在上一篇文章《Reactor模式》中,咱們就講過服務端要使用兩個線程組:
通常建立線程組直接使用如下new就完事了:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
複製代碼
有點好奇的是,既然是線程組,那線程數默認是多少呢?深刻源碼:
//使用一個常量保存
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
//NettyRuntime.availableProcessors() * 2,cpu核數的兩倍賦值給常量
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//若是不傳入,則使用常量的值,也就是cpu核數的兩倍
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
複製代碼
經過源碼能夠看到,默認的線程數是cpu核數的兩倍。假設想自定義線程數,可使用有參構造器:
//設置bossGroup線程數爲1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//設置workerGroup線程數爲16
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
複製代碼
這個方法用於設置通道類型,當創建鏈接後,會根據這個設置建立對應的Channel實例。
使用debug模式能夠看到
通道類型有如下:
NioSocketChannel: 異步非阻塞的客戶端 TCP Socket 鏈接。
NioServerSocketChannel: 異步非阻塞的服務器端 TCP Socket 鏈接。
經常使用的就是這兩個通道類型,由於是異步非阻塞的。因此是首選。
OioSocketChannel: 同步阻塞的客戶端 TCP Socket 鏈接。
OioServerSocketChannel: 同步阻塞的服務器端 TCP Socket 鏈接。
稍微在本地調試過,用起來和Nio有一些不一樣,是阻塞的,因此API調用也不同。由於是阻塞的IO,幾乎沒什麼人會選擇使用Oio,因此也很難找到例子。我稍微琢磨了一下,通過幾回報錯以後,總算調通了。代碼以下:
//server端代碼,跟上面幾乎同樣,只需改三個地方
//這個地方使用的是OioEventLoopGroup
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)//只須要設置一個線程組boosGroup
.channel(OioServerSocketChannel.class)//設置服務端通道實現類型
//client端代碼,只需改兩個地方
//使用的是OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
//通道類型設置爲OioSocketChannel
bootstrap.group(eventExecutors)//設置線程組
.channel(OioSocketChannel.class)//設置客戶端的通道實現類型
複製代碼
NioSctpChannel: 異步的客戶端 Sctp(Stream Control Transmission Protocol,流控制傳輸協議)鏈接。
NioSctpServerChannel: 異步的 Sctp 服務器端鏈接。
本地沒啓動成功,網上看了一些網友的評論,說是隻能在linux環境下才能夠啓動。從報錯信息看:SCTP not supported on this platform,不支持這個平臺。由於我電腦是window系統,因此網友說的有點道理。
首先說一下這兩個的區別。
option()設置的是服務端用於接收進來的鏈接,也就是boosGroup線程。
childOption()是提供給父管道接收到的鏈接,也就是workerGroup線程。
搞清楚了以後,咱們看一下經常使用的一些設置有哪些:
SocketChannel參數,也就是childOption()經常使用的參數:
SO_RCVBUF Socket參數,TCP數據接收緩衝區大小。 TCP_NODELAY TCP參數,當即發送數據,默認值爲Ture。 SO_KEEPALIVE Socket參數,鏈接保活,默認值爲False。啓用該功能時,TCP會主動探測空閒鏈接的有效性。
ServerSocketChannel參數,也就是option()經常使用參數:
SO_BACKLOG Socket參數,服務端接受鏈接的隊列長度,若是隊列已滿,客戶端鏈接將被拒絕。默認值,Windows爲200,其餘爲128。
因爲篇幅限制,其餘就不列舉了,你們能夠去網上找資料看看,瞭解一下。
ChannelPipeline是Netty處理請求的責任鏈,ChannelHandler則是具體處理請求的處理器。實際上每個channel都有一個處理器的流水線。
在Bootstrap中childHandler()方法須要初始化通道,實例化一個ChannelInitializer,這時候須要重寫initChannel()初始化通道的方法,裝配流水線就是在這個地方進行。代碼演示以下:
//使用匿名內部類的形式初始化通道對象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//給pipeline管道設置自定義的處理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});
複製代碼
處理器Handler主要分爲兩種:
ChannelInboundHandlerAdapter(入站處理器)、ChannelOutboundHandler(出站處理器)
入站指的是數據從底層java NIO Channel到Netty的Channel。
出站指的是經過Netty的Channel來操做底層的java NIO Channel。
ChannelInboundHandlerAdapter處理器經常使用的事件有:
註冊事件 fireChannelRegistered。
鏈接創建事件 fireChannelActive。
讀事件和讀完成事件 fireChannelRead、fireChannelReadComplete。
異常通知事件 fireExceptionCaught。
用戶自定義事件 fireUserEventTriggered。
Channel 可寫狀態變化事件 fireChannelWritabilityChanged。
鏈接關閉事件 fireChannelInactive。
ChannelOutboundHandler處理器經常使用的事件有:
端口綁定 bind。
鏈接服務端 connect。
寫事件 write。
刷新時間 flush。
讀事件 read。
主動斷開鏈接 disconnect。
關閉 channel 事件 close。
還有一個相似的handler(),主要用於裝配parent通道,也就是bossGroup線程。通常狀況下,都用不上這個方法。
提供用於服務端或者客戶端綁定服務器地址和端口號,默認是異步啓動。若是加上sync()方法則是同步。
有五個同名的重載方法,做用都是用於綁定地址端口號。不一一介紹了。
//釋放掉全部的資源,包括建立的線程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
複製代碼
會關閉全部的child Channel。關閉以後,釋放掉底層的資源。
Channel是什麼?不妨看一下官方文檔的說明:
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind
翻譯大意:一種鏈接到網絡套接字或能進行讀、寫、鏈接和綁定等I/O操做的組件。
若是上面這段說明比較抽象,下面還有一段說明:
A channel provides a user:
the current state of the channel (e.g. is it open? is it connected?), the configuration parameters of the channel (e.g. receive buffer size), the I/O operations that the channel supports (e.g. read, write, connect, and bind), and the ChannelPipeline which handles all I/O events and requests associated with the channel.
翻譯大意:
channel爲用戶提供:
通道當前的狀態(例如它是打開?仍是已鏈接?)
channel的配置參數(例如接收緩衝區的大小)
channel支持的IO操做(例如讀、寫、鏈接和綁定),以及處理與channel相關聯的全部IO事件和請求的ChannelPipeline。
boolean isOpen(); //若是通道打開,則返回true
boolean isRegistered();//若是通道註冊到EventLoop,則返回true
boolean isActive();//若是通道處於活動狀態而且已鏈接,則返回true
boolean isWritable();//當且僅當I/O線程將當即執行請求的寫入操做時,返回true。
複製代碼
以上就是獲取channel的四種狀態的方法。
獲取單條配置信息,使用getOption(),代碼演示:
ChannelConfig config = channel.config();//獲取配置參數
//獲取ChannelOption.SO_BACKLOG參數,
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);
//由於我啓動器配置的是128,因此我這裏獲取的soBackLogConfig=128
複製代碼
獲取多條配置信息,使用getOptions(),代碼演示:
ChannelConfig config = channel.config();
Map<ChannelOption<?>, Object> options = config.getOptions();
for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
/** SO_REUSEADDR : false WRITE_BUFFER_LOW_WATER_MARK : 32768 WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536) SO_BACKLOG : 128 如下省略... */
複製代碼
寫操做,這裏演示從服務端寫消息發送到客戶端:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("這波啊,這波是肉蛋蔥雞~", CharsetUtil.UTF_8));
}
複製代碼
客戶端控制檯:
//收到服務端/127.0.0.1:6666的消息:這波啊,這波是肉蛋蔥雞~
複製代碼
鏈接操做,代碼演示:
ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//通常使用啓動器,這種方式不經常使用
複製代碼
經過channel獲取ChannelPipeline,並作相關的處理:
//獲取ChannelPipeline對象
ChannelPipeline pipeline = ctx.channel().pipeline();
//往pipeline中添加ChannelHandler處理器,裝配流水線
pipeline.addLast(new MyServerHandler());
複製代碼
在NioEventLoop中,有一個成員變量selector,這是nio包的Selector,在以前《NIO入門》中,我已經講過Selector了。
Netty中的Selector也和NIO的Selector是同樣的,就是用於監聽事件,管理註冊到Selector中的channel,實現多路複用器。
在前面介紹Channel時,咱們知道能夠在channel中裝配ChannelHandler流水線處理器,那一個channel不可能只有一個channelHandler處理器,確定是有不少的,既然是不少channelHandler在一個流水線工做,確定是有順序的。
因而pipeline就出現了,pipeline至關於處理器的容器。初始化channel時,把channelHandler按順序裝在pipeline中,就能夠實現按序執行channelHandler了。
在一個Channel中,只有一個ChannelPipeline。該pipeline在Channel被建立的時候建立。ChannelPipeline包含了一個ChannelHander造成的列表,且全部ChannelHandler都會註冊到ChannelPipeline中。
在Netty中,Handler處理器是有咱們定義的,上面講過經過集成入站處理器或者出站處理器實現。這時若是咱們想在Handler中獲取pipeline對象,或者channel對象,怎麼獲取呢。
因而Netty設計了這個ChannelHandlerContext上下文對象,就能夠拿到channel、pipeline等對象,就能夠進行讀寫等操做。
經過類圖,ChannelHandlerContext是一個接口,下面有三個實現類。
實際上ChannelHandlerContext在pipeline中是一個鏈表的形式。看一段源碼就明白了:
//ChannelPipeline實現類DefaultChannelPipeline的構造器方法
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
//設置頭結點head,尾結點tail
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
複製代碼
下面我用一張圖來表示,會更加清晰一點:
咱們先看一下EventLoopGroup的類圖:
其中包括了經常使用的實現類NioEventLoopGroup。OioEventLoopGroup在前面的例子中也有使用過。
從Netty的架構圖中,能夠知道服務器是須要兩個線程組進行配合工做的,而這個線程組的接口就是EventLoopGroup。
每一個EventLoopGroup裏包括一個或多個EventLoop,每一個EventLoop中維護一個Selector實例。
咱們不妨看一段DefaultEventExecutorChooserFactory的源碼:
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
@Override
public EventExecutor next() {
//idx.getAndIncrement()至關於idx++,而後對任務長度取模
return executors[idx.getAndIncrement() & executors.length - 1];
}
複製代碼
這段代碼能夠肯定執行的方式是輪詢機制,接下來debug調試一下:
它這裏還有一個判斷,若是線程數不是2的N次方,則採用取模算法實現。
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
複製代碼
參考Netty官網文檔:API文檔
創做不易,以爲有用就點個贊吧。
我不要下次必定,但願此次必定素質三連,感謝!
想第一時間看到我更新的文章,能夠微信搜索公衆號「java技術愛好者
」,拒絕作一條鹹魚,我是一個努力讓你們記住的程序員。咱們下期再見!!!
能力有限,若是有什麼錯誤或者不當之處,請你們批評指正,一塊兒學習交流!