[網絡通訊] Netty 入門實戰
簡介html
什麼是 Netty?讓咱們帶着問題來跟着官網的 Demo 教程先入個門。java
依賴git
實戰web
丟棄服務器面試
響應服務器shell
時間服務器編程
流數據傳輸bootstrap
對象序列化傳輸windows
關閉promise
小結
REFERENCES
Netty 是異步事件驅動的Java開源網絡應用程序框架,用於快速開發可維護的高性能協議服務器和客戶端。
-
Netty 項目旨在爲可維護的高性能和高可伸縮性協議服務器和客戶端的快速開發提供一個異步事件驅動的網絡應用框架和工具。 -
Netty 是一個 NIO 客戶機服務器框架,能夠快速簡單地開發網絡應用程序,如協議服務器和客戶機。它極大地簡化了網絡編程,如 TCP 和 UDP 套接字服務器的開發。 -
「快速和簡單」並不意味着產生的應用程序會受到可維護性或性能問題的影響。Netty 是根據實現許多協議(如 FTP、 SMTP、 HTTP 以及各類二進制和基於文本的遺留協議)的經驗而精心設計的。所以,Netty 成功地找到了一種方法來實現簡單的開發、性能、穩定性和靈活性。 -
一些用戶可能已經發現了其餘聲稱具備一樣優點的網絡應用程序框架,您可能想要問是什麼使 Netty 與他們如此不一樣。答案就是它所創建的哲學。Netty 的目的是從第一天開始就在 API 和實現方面爲您提供最溫馨的體驗。它不是什麼實實在在的東西,可是當你閱讀本指南和玩 Netty 的時候,你會意識到這種哲學會讓你的生活變得更加輕鬆。
依賴
dependencies {
implementation "io.netty:netty-all:4.1.56.Final"
}
實戰
世界上最簡單的協議實現不是發送
Hello World
消息,被服務器接受到返回相應的響應結果。而是服務器接收到消息後直接丟棄,不作任何響應。
丟棄服務器
要實現
DISCARD
協議,您須要作的惟一一件事就是忽略全部接收到的數據。讓咱們直接從處理程序實現開始,它處理 Netty 生成的I/O
事件。
// [1]
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
// [2]
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
// 不處理消息,直接釋放
// [3]
((ByteBuf) msg).release();
}
// [4]
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 當異常發生的時候關閉鏈接
cause.printStackTrace();
ctx.close();
}
}
-
ChannelInboundHandlerAdapter
實現了接口ChannelInboundHandler
。充當適配器的角色提供了各類能夠重寫的事件處理程序方法,經過適配器的標準實現方式,能夠避免咱們本身實現處理程序接口。 -
咱們能夠覆蓋 channelRead()
的事件處理器方法。只要從客戶機接收到新數據,就會使用接收到的消息調用此方法。 -
爲了實現 DISCARD
協議,處理程序必須忽略接收到的消息。ByteBuf
是一個引用計數的對象,必須經過release()
方法來進行釋放。一般的事項方式是這樣的
// [2]
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
// 不處理消息,直接釋放
// [3]
//((ByteBuf) msg).release();
try {
// 針對消息 msg 進行處理
} finally {
// 釋放引用
ReferenceCountUtil.release(msg);
}
}
-
exceptionCaught()
做爲異常處理,當 Netty 因爲I/O
錯誤或處理程序實現因爲處理事件時拋出的異常而引起異常時,使用 Throwable 調用事件處理程序方法。在大多數狀況下,被捕獲的異常應該被記錄,其相關的通道應該在這裏關閉,儘管這個方法的實現能夠根據您想要處理的異常狀況而有所不一樣。例如,您可能但願在關閉鏈接以前發送帶有錯誤代碼的響應消息。
啓動服務器
public class DiscardServer {
/**
* 端口
*/
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
// [1]
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// [2]
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
// [3]
.channel(NioServerSocketChannel.class)
// [4]
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
// [5]
.option(ChannelOption.SO_BACKLOG, 128)
// [6]
.childOption(ChannelOption.SO_KEEPALIVE, true);
// [7]
ChannelFuture cf = bootstrap.bind(port).sync();
cf.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
}
-
NioEventLoopGroup
是一個處理I/O
操做的多線程事件循環的處理器定義。例子中定義了2個處理器: -
第一個一般被稱爲「 boss」 ,接受一個傳入的鏈接。
-
第二個一般被稱爲「工人」 ,一旦老闆接受了鏈接並註冊了與工人接受的鏈接,就處理接受鏈接的通訊。
-
ServerBootstrap
是服務器構造的輔助類,通常不推薦此方式進行服務器的建立。 -
此處指定
NioServerSocketChannel
類,用於實例化一個新的Channel
來接受傳入的鏈接。 -
此處指定的處理程序將始終由新接受的
Channel
,ChannelInitializer
做爲特殊的處理程序,用於幫助用戶配置新的Channel
。每每適用於爲新的Channel
添加一些處理程序來實現更爲複雜的應用程序。 -
option
參數設置,支持設置特定的套接字選項。來知足特定的協議需求,如.option(ChannelOption.TCP_NODELAY, true)
來編寫TCP/IP 服務協議。 -
childOption
與option
不一樣之處在於: -
option
適用於NioServerSocketChannel
來接受傳入的鏈接。 -
childOption
適用於被父級的ServerChannel
接受的Channels
。 -
綁定到指定端口。
模擬通訊
-
調整代碼,打印接受的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
try {
// 針對消息 msg 進行處理
while (in.isReadable()) { // [4]
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg);
}
}
-
windows 環境下使用 powershell 輸入命令 telnet localhost 8080
,進行通訊。 -
powershell 終端輸入的字符會同步在控制檯打印出來。
響應服務器
目前爲止,咱們只接受可是沒有任何響應。一臺服務器,一般應該響應該請求。讓咱們學習如何經過實現
ECHO
協議向客戶端寫入響應消息,其中任何接收到的數據都被髮送回來。與前面部分實現的丟棄服務器的惟一區別在於它將接收到的數據發回,而不是將接收的數據輸出到控制檯。所以,再次修改
channelRead()
方法是足夠的:參考地址:https://netty.io/4.1/xref/io/netty/example/echo/package-summary.html
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 當異常發生的時候關閉鏈接
cause.printStackTrace();
ctx.close();
}
}
public class EchoServer {
/**
* 端口
*/
private int port;
public EchoServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture cf = bootstrap.bind(port).sync();
cf.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new EchoServer(port).run();
}
}
經過終端輸入telnet localhost 8080
後輸入英文字符會獲得響應,原字符返回。如依次輸入abc
,終端打印結果:
aabbcc
-
ChannelHandlerConetxt 提供了不少方法讓你去觸發 IO 事件或操做。這裏咱們調用 write(object)來逐字的寫入接受到的消息。注意,咱們不像 DISCARD 例子裏的那樣,咱們沒有釋放咱們收到的消息。這是由於當它被寫回到 wire 時,Netty 替咱們釋放它。 -
ctx.write(Object) 不會讓消息發送,它存在於內部緩衝區,經過調用 ctx.flush() 來把消息發送出去,或者,您能夠簡潔的調用 ctx.writeAndFlush(msg)。
時間服務器
接下來要實現的協議是 TIME 協議。它不一樣於前面的示例,由於它發送包含32位整數的消息,而不接收任何請求,並在消息發送後關閉鏈接。在本例中,您將學習如何構造和發送消息,以及如何在完成時關閉鏈接。
由於咱們將忽略任何接收到的數據,可是一旦創建鏈接就發送消息,因此此次不能使用 channelRead() 方法。相反,咱們應該重寫 channelActive()方法。代碼以下:
服務端
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // [1]
final ByteBuf time = ctx.alloc().buffer(4); // [2]
time.writeInt(89); //ASCII 10進制,對應 Y
time.writeInt(105); //ASCII 10進制,對應 i
final ChannelFuture f = ctx.writeAndFlush(time); // [3]
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // [4]
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 當異常發生的時候關閉鏈接
cause.printStackTrace();
ctx.close();
}
}
-
如前所述,當創建鏈接並準備生成通訊量時,將調用 channelActive ()方法。讓咱們編寫一個32位的整數,它表示此方法中的當前時間。
-
要發送一個新消息,咱們須要分配一個新的緩衝區,其中將包含消息。咱們要寫一個32位的整數,所以咱們須要一個容量至少爲4字節的 ByteBuf。經過 ChannelHandlerContext.alloc ()獲取當前 ByteBufAllocator 並分配一個新緩衝區。
-
像往常同樣,咱們寫入一條構造好的消息。可是,等等,哪裏冒險了?咱們之前不是叫 java.nio 嗎。在 NIO 中發送消息以前使用 ByteBuffer.flip () ?ByteBuf 沒有這樣的方法,由於它有兩個指針: 一個用於讀操做,另外一個用於寫操做。當您將某些內容寫入 ByteBuf 而讀取器索引不變時,寫入器索引會增長。讀者索引和寫者索引分別表示消息的開始和結束位置。
-
相比之下,NIO 緩衝區並不提供一種清晰的方法來肯定消息內容的開始和結束位置而不調用 flip 方法。當您忘記翻轉緩衝區時,您將遇到麻煩,由於不會發送任何內容或錯誤的數據。這種錯誤在 Netty 不會發生,由於咱們對不一樣的操做類型有不一樣的指針。當你習慣了它,你會發現它會讓你的生活變得更加輕鬆。
-
另外一點須要注意的是 ChannelHandlerContext.write () 和 writeAndFlush () 方法返回 ChannelFuture。ChannelFuture 表示還沒有發生的
I/O
操做。這意味着,任何請求的操做可能還沒有執行,由於全部操做在 Netty 都是異步的。例如,下面的代碼可能會在發送消息以前關閉鏈接:Channel ch = ...;
ch.writeAndFlush(message);
ch.close(); -
所以,您須要在 ChannelFuture 完成以後調用 close ()方法,該方法由 write ()方法返回,並在完成寫操做後通知其偵聽器。請注意,close () 也可能不會當即關閉鏈接,而是返回一個 ChannelFuture。
-
那麼,當寫請求完成時,咱們如何獲得通知呢?這很簡單,能夠添加一個
ChannelFutureListener
來監聽返回的結果ChannelFuture
。在這裏,咱們建立了一個新的匿名通道ChannelFutureListener
,當操做完成時它會關閉通道。
-
或者,您可使用預約義的偵聽器簡化代碼:
f.addListener(ChannelFutureListener.CLOSE);
-
要測試咱們的時間服務器是否正常工做,可使用 telnet localhost 8080
命令,終端在鏈接上後,打印消息後直接失去鏈接:
Yi
遺失對主機的鏈接。
客戶端
與 DISCARD 和 ECHO 服務器不一樣,咱們須要 TIME 協議的客戶端,由於人不能將32位二進制數據轉換爲日曆上的日期。在本節中,咱們將討論如何確保服務器正常工做,並學習如何使用 Netty 編寫客戶機。
-
調整服務端接受請求並返回時間戳
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // [1]
final ByteBuf time = ctx.alloc().buffer(4); // (2)
// 2208988800爲1900年1月1日00:00:00~1970年1月1日00:00:00的總秒數
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
-
客戶端接收服務端的響應並轉換爲時間格式輸出
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 當異常發生的時候關閉鏈接
cause.printStackTrace();
ctx.close();
}
}
public class TimeServer {
/**
* 端口
*/
private int port;
public TimeServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 在 Netty,服務器和客戶機之間最大也是惟一的區別是使用了不一樣的 Bootstrap 和 Channel 實現。請看下面的代碼:
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture cf = bootstrap.bind(port).sync();
cf.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new TimeServer(port).run();
}
}
客戶端接收到響應打印結果:
Tue Dec 29 12:01:58 CST 2020
流數據傳輸
在基於流的傳輸(如 TCP/IP)中,接收到的數據被存儲到套接字接收緩衝區中。不幸的是,基於流的傳輸的緩衝區不是一個包隊列,而是一個字節隊列。這意味着,即便您將兩條消息做爲兩個獨立的數據包發送,操做系統也不會將其視爲兩條消息,而只是將其視爲一堆字節。所以,不能保證您所讀到的內容與遠程對等方所寫的內容徹底一致。
例如,假設一個操做系統的 TCP/IP 協議棧已經接收了三個數據包:

因爲基於流的協議的這個通常屬性,在應用程序中頗有可能如下面的碎片形式讀取它們:

所以,接收部分,不管是服務器端仍是客戶端,都應該將接收到的數據碎片整理成一個或多個有意義的幀,應用程序邏輯能夠很容易地理解這些幀。在上面的例子中,接收到的數據應該以下所示:

第一個解決方案
如今讓咱們回到 TIME 客戶端示例。咱們這裏也有一樣的問題。32位整數是一個很是小的數據量,它不太可能常常被分段。然而,問題在於它多是支離破碎的,而且隨着流量的增長,支離破碎的可能性也會增長。
最簡單的解決方案是建立一個內部累積緩衝區,並等待全部4個字節都被接收到內部緩衝區。如下是修改後的 TimeClientHandler 實現,它解決了這個問題:
public class TimeClientWithBufferHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buff;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
buff = ctx.alloc().buffer(4);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
buff.release();
buff = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf m = (ByteBuf) msg; // (1)
buff.writeBytes(m);
m.release();
if (buff.readableBytes() >= 4) {
long currentTimeMillis = (buff.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 當異常發生的時候關閉鏈接
cause.printStackTrace();
ctx.close();
}
}
-
有兩種生命週期監聽方法: handlerAdded()
and 及handlerRemoved()
; -
您能夠執行任意初始化任務,只要它不長時間阻塞; -
首先,全部接收到的數據應該累積成 buff
; -
而後,處理程序必須檢查 buff
有足夠的數據,在這個例子中是4個字節,而後繼續進行實際的業務邏輯,當更多的數據到達時,這個函數會從新調用一個方法,最終全部的4個字節都會被累積;

-
非 4 字節的數據會直接被丟棄掉。
第二種解決方案
儘管第一個解決方案已經解決了 TIME 客戶機的問題,可是修改後的處理程序看起來並不那麼幹淨。想象一個更復雜的協議,它由多個字段組成,好比一個可變長度的字段。您的 ChannelInboundHandler 實現將很快變得不可維護。
正如您可能已經注意到的,您能夠向 ChannelPipeline 添加多個 ChannelHandler,所以,您能夠將一個單片 ChannelHandler 分割爲多個模塊化的 ChannelHandler,以下降應用程序的複雜性。例如,你能夠將 TimeClientHandler 分紅兩個處理器:
-
TimeDecoder
處理碎片化問題 -
最初的簡單版本 TimeClientHandler
幸運的是,Netty 提供了一個可擴展的類,能夠幫助你寫出第一個開箱即用的類:
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
public class TimeClientWithDecoder {
public static void main(String[] args) throws Exception {
String host = "localhost";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new TimeDecoder()) // (5)
.addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
-
ByteToMessageDecoder
使得處理分裂問題變得容易; -
每當接收到新數據時, ByteToMessageDecoder
利用內部維護的累積緩衝區,調用decode方法來處理新數據; -
當累積緩衝區中沒有足夠的數據時 ByteToMessageDecoder
什麼都不會添加到out
緩衝區中。當收到更多的數據時會再次調用decode()
; -
若是 decode()
將一個數據添加到out
, 這意味着解碼器成功解碼了一條信息,將丟棄累積緩衝區的讀取部分。請記住,您不須要解碼多個消息,ByteToMessageDecoder
將繼續調用方法,直到它沒什麼數據能夠放入out
了; -
ChannelPipeline
添加處理程序TimeDecoder
來實現數據的分解。
還能夠經過如下方式進一步簡化解碼器:
public class TimeWithReplayingDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
out.add(in.readBytes(4));
}
}
// 一樣的,別忘了在 ChannelPipeline 中添加相應的處理程序
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//.addLast(new TimeDecoder())
.addLast(new TimeWithReplayingDecoder())
.addLast(new TimeClientHandler());
}
});
此外,Netty 提供了開箱即用的解碼器,使您可以很是容易地實現大多數協議,並幫助您避免最終獲得一個不可維護的總體處理程序實現。更詳細的例子請參考如下軟件包:
-
io.netty.example.factorial 二進制協議 -
io.netty.example.telnet 基於文本行的協議
對象序列化傳輸
到目前爲止,咱們討論的全部示例都使用 ByteBuf 做爲協議消息的主要數據結構。實際的網絡通訊過程遠比上面的時間協議實現的要更復雜,功能也要更增強大,好比咱們經常使用的 Json 序列化傳輸,若是用 Netty,可否直接傳輸對象呢?
在 ChannelHandlers 中使用 POJO 的優點是顯而易見的;經過分離從處理程序中提取 ByteBuf 信息的代碼,您的處理程序變得更加可維護和可重用。在 TIME 協議的客戶端和服務器示例中,咱們只讀取一個32位整數,直接使用 Bytebuf 並非一個主要問題。可是,您會發如今實現現實世界的協議時有必要進行分離。
首先,咱們將咱們要傳輸的時間戳封裝成一個簡單對象:
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
增長解碼器:
public class TimeDecoderWithPojo extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
//out.add(in.readBytes(4)); // (4)
out.add(new UnixTime(in.readUnsignedInt())); // (4)
}
}
增長處理器:
public class TimeClientHandlerWithPojo extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 當異常發生的時候關閉鏈接
cause.printStackTrace();
ctx.close();
}
}
和前面同樣,設置客戶端的處理器:
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//.addLast(new TimeDecoder())
.addLast(new TimeDecoderWithPojo())
.addLast(new TimeClientHandlerWithPojo());
}
});
響應結果以下:
經過更新的解碼器,``TimeClientHandler
再也不使用 ByteBuf。
更簡單和優雅,對不對?一樣的技術也能夠應用於服務器端。
首先是消息處理器,負責發送一個時間戳數據做爲響應結果:
public class TimeServerHandlerWithPojo extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
UnixTime unixTime = new UnixTime();
System.out.println("準備發送:"+ unixTime);
final ChannelFuture f = ctx.writeAndFlush(unixTime);
f.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 當異常發生的時候關閉鏈接
cause.printStackTrace();
ctx.close();
}
}
而後是編碼處理器,將Pojo
轉換爲ByteBuf
進行傳輸:
public class TimeServerEncoderHandlerWithPojo extends ChannelOutboundHandlerAdapter {
// 它是 ChannelOutboundHandler 的一個實現,它將 UnixTime 轉換回 ByteBuf。這比編寫解碼器要簡單得多,由於在對消息進行編碼時不須要處理數據包碎片和彙編。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int) m.value());
ctx.write(encoded, promise); // (1)
// 首先,咱們傳遞原始的 ChannelPromise as-is,這樣當編碼的數據實際寫入到連線時,Netty 將其標記爲成功或失敗。
// 其次,咱們沒有調用 ctx.flush ()。有一個單獨的處理程序方法 void flush (ChannelHandlerContext ctx) ,它旨在重寫 flush ()操做。
}
}
最後是服務端ChannelPipeline
程序設置:
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new TimeServerEncoderHandlerWithPojo())
.addLast(new TimeServerHandlerWithPojo());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
服務端接收到請求發送數據:
客戶端接收到請求的響應數據:

一樣的,Netty 也爲服務端的消息編碼定義了不少拆箱即用的工具類:
public class TimeServerMessageToByteEncoderHandler extends MessageToByteEncoder<UnixTime> {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int) m.value());
ctx.write(encoded, promise); // (1)
}
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception {
out.writeInt((int) msg.value());
}
}
// ChannelPipeline 設置
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//.addLast(new TimeServerEncoderHandlerWithPojo())
.addLast(new TimeServerMessageToByteEncoderHandler())
.addLast(new TimeServerHandlerWithPojo());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
關閉
關閉 Netty 應用程序一般很是簡單,只需關閉經過 shutdownly() 建立的全部 EventLoopGroups 便可。它返回一個 Future,當 EventLoopGroup 徹底終止而且屬於該組的全部通道都已關閉時,它會通知您。(前文示例已演示屢次,此處再也不贅述。)
源碼:https://gitee.com/zacsnz/architectrue-adventure/tree/master/netty-examples/netty-chapter-1
小結
Netty 做爲高性能的異步通訊框架,提供了不少不少好用的 API。
-
Channel: Channel 接口是 Netty 對網絡操做抽象類,它除了包括基本的 I/O 操做,如 bind()、connect()、read()、write() 等。比較經常使用的Channel接口實現類是NioServerSocketChannel(服務端)和NioSocketChannel(客戶端),這兩個 Channel 能夠和 BIO 編程模型中的ServerSocket以及Socket兩個概念對應上。Netty 的 Channel 接口所提供的 API,大大地下降了直接使用 Socket 類的複雜性。
-
EventLoop: 定義了 Netty 的核心抽象,用於處理鏈接的生命週期中所發生的事件。主要做用實際就是負責監聽網絡事件並調用事件處理器進行相關 I/O 操做的處理。那 Channel 和 EventLoop 直接有啥聯繫呢?Channel 爲 Netty 網絡操做(讀寫等操做)抽象類,EventLoop 負責處理註冊到其上的Channel 處理 I/O 操做,二者配合參與 I/O 操做。
-
ChannelFuture: Netty 是異步非阻塞的,全部的 I/O 操做都爲異步的。所以,咱們不能馬上獲得操做是否執行成功,可是,你能夠經過 ChannelFuture 接口的 addListener() 方法註冊一個 ChannelFutureListener,當操做執行成功或者失敗時,監聽就會自動觸發返回結果。而且,你還能夠經過ChannelFuture 的 channel() 方法獲取關聯的Channel。
-
ChannelHandler: 息的具體處理器。他負責處理讀寫操做、客戶端鏈接等事情。
-
ChannelPipeline: ChannelHandler 的鏈,提供了一個容器並定義了用於沿着鏈傳播入站和出站事件流的 API 。當 Channel 被建立時,它會被自動地分配到它專屬的ChannelPipeline。咱們能夠在 ChannelPipeline 上經過 addLast() 方法添加一個或者多個ChannelHandler ,由於一個數據或者事件可能會被多個 Handler 處理。當一個 ChannelHandler 處理完以後就將數據交給下一個 ChannelHandler 。
-
EventLoopGroup 包含多個 EventLoop(每個 EventLoop 一般內部包含一個線程),上面咱們已經說了 EventLoop 的主要做用實際就是負責監聽網絡事件並調用事件處理器進行相關 I/O 操做的處理。
-
Bootstrap 是客戶端的啓動引導類/輔助類。
-
ServerBootstrap 客戶端的啓動引導類/輔助類。
REFERENCES
-
新手入門:目前爲止最透徹的的Netty高性能原理和框架架構解析 -
線上API和源碼 -
官網入門使用說明 -
Socket和ServerSocket的簡單介紹及例子 -
Netty 4.1 Getting Start (翻譯) + Demo -
Netty實戰入門詳解——讓你完全記住什麼是Netty -
阿里大牛總結的Netty最全常見面試題,面試不再怕被問Netty了 - JavaAOE的文章 - 知乎

本文分享自微信公衆號 - 架構探險之道(zacsnz1314)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。