❀ 衆所周知:html
Netty 是一款基於 NIO 客戶、服務器端的 Java 開源編程框架,提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
❀ 通俗來說:java
Netty 一個很是好用的處理 Socket 的 Jar 包,能夠用它來開發服務器和客戶端。編程
Netty 做爲一個優秀的網絡通訊框架,許多開源項目都使用它來構建通訊層。好比 Hadoop、Cassandra、Spark、Dubbo、gRPC、RocketMQ、Zookeeper甚至咱們經常使用的 Spring 等等。bootstrap
更重要的是,Netty 是開發高性能 Java 服務器的必學框架。api
能夠說做爲一個 Java 工程師,要了解 Java 服務器的高階知識,Netty 是一個必需要學習的東西。promise
Talk is cheap, show me the code!
接下來從代碼中感覺一下 Netty,首先實現一個 discard(丟棄)服務器,即對收到的數據不作任何處理。安全
實現 ChannelInBoundHandlerAdapter 首先咱們從 handler 的實現開始, Netty 使用 handler 來處理 I/O 事件。服務器
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 丟棄收到的數據 ((ByteBuf) msg).release(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
第 6 行,ByteBuf 是一個引用計數對象,這個對象必須顯式地調用 release() 方法來釋放。處理器的職責是釋放全部傳遞處處理器的引用計數對象,下面是比較常見的 chanelRead() 方法實現:網絡
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
啓動 Handler 實現 handler 後,咱們須要一個 main() 方法來啓動它。數據結構
public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { // 接收進來的鏈接 EventLoopGroup boss = new NioEventLoopGroup(); // 處理已經接收的鏈接 EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加自定義的 handler socketChannel.pipeline().addLast(new DiscardServerHandler()); } }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); // 綁定端口,開始接收進來的鏈接 ChannelFuture channelFuture = bootstrap.bind(port).sync(); // 關閉 channelFuture.channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new DiscardServer(port).run(); } }
EventLoopGroup
是用來處理 I/O 操做的多線程事件循環器,Netty 提供了許多不一樣的 EventLoopGroup 的實現用來處理不一樣的傳輸。在本例咱們實現了一個服務端應用,所以須要兩個 EventLoopGroup
。第一個用來接收進來的鏈接,常被稱做 boss ;第二個用來處理已經接收的鏈接,成爲 worker。一旦 boss 接收到一個新進來的鏈接,就會把鏈接的信息註冊到 worker 上面。ServerBootstrap
是一個啓動 NIO 服務的輔助啓動類。NioServerSocketChannel
用來講明一個新的 Channel 如何接收進來的鏈接。ChannelInitializer
用來幫助使用者建立一個新的 channel ,同時可使用 pipline 指定一些特定的處理器。查看接收到的數據 如此,一個基於 Netty 的服務端程序就完成了,可是如今啓動起來咱們看不到任何交互,因此咱們稍微修改一下 DiscardServerHandler
類的 channelRead()
方法,能夠查看到客戶端發來的消息。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; try { while (byteBuf.isReadable()) { System.out.print((char) byteBuf.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); } }
DiscardServer
,使用 telnet
來測試一下。控制檯接收到了命令行發來的消息:
咱們已經實現了服務器能夠接收客戶端發來的消息,一般服務器會對客戶端發來的請求做出迴應,下面就經過 ECHO 協議來實現對客戶端的消息響應。
ECHO 協議即會把客戶端發來的數據原樣返回,因此也戲稱「乒乓球」協議。
在上述代碼的基礎上面,咱們只需對 DiscardServerHandler
類的 channelRead()
方法稍加修改:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.write(msg); ctx.flush(); }
cxt.writeAndFlush(msg)
以達到一樣的目的。再次運行 telnet
命令,就會接受到你發送的信息。
接下來咱們基於 TIME 協議,實現構建和發送一個消息,而後在完成時關閉鏈接。和以前的例子不一樣的是在不接受任何請求時會發送一個含 32 位的整數的消息,而且一旦消息發送就會當即關閉鏈接。
TIME 協議能夠提供機器可讀的日期時間信息。
咱們會在鏈接建立時發送時間消息,因此須要覆蓋 channelActive()
方法:
public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 分配空間 final ByteBuf time = ctx.alloc().buffer(4); // 獲取 32 位時間戳並寫入 time.writeInt((int) (System.currentTimeMillis() / 1000L)); final ChannelFuture future = ctx.writeAndFlush(time); // 添加監聽器 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { assert future == channelFuture; // 關閉鏈接 ctx.close(); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
channelActive()
方法將會在鏈接被創建而且準備進行通訊時被調用。Buffer
時,在讀寫操做之間,須要調用 buffer.flip( )
方法設置指針位置。可是在在 Netty 中不須要這樣操做,緣由是 Netty 提供了兩個指針,一個讀指針和一個寫指針,在讀寫時二者不相互影響。不再用擔憂忘記調用 flip( )
方法時數據爲空或者數據錯誤啦。ctx.writeAndFlush(time)
後會返回一個 ChannelFuture
對象,表明着尚未發生的一次 I/O 操做。這意味着任何一個請求操做都不會立刻被執行,由於在 Netty 裏全部的操做都是異步的。這樣來看,咱們想完成消息發送後關閉鏈接,直接在後邊調用 ctx.close( )
可能不能馬上關閉鏈接。返回的 ChannelFuture
對象在操做完成後會通知它的監聽器,繼續執行操做完成後的動做。對於時間服務端不能直接用 telnet
的方式測試,由於不能靠人工把一個 32 位的二進制數據翻譯成時間,因此下面將實現一個時間客戶端。
與服務端的實現惟一不一樣的就是使用了不一樣的 Bootstrap 和 Channel 實現:
public class TimeClient { private String host; private int port; public TimeClient(String host, int port) { this.host = host; this.port = port; } public void run() throws Exception{ EventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); // 啓動 ChannelFuture future = bootstrap.connect(host, port).sync(); // 等待鏈接關閉 future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); } } public static void main(String[] args) throws Exception { TimeClient timeClient = new TimeClient("localhost", 8080); timeClient.run(); } }
再稍微改動一下 handler :
public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 在 TCP/IP 中,Netty 會把讀到的數據放入 ByteBuf 中 ByteBuf byteBuf = (ByteBuf) msg; try { long time = byteBuf.readUnsignedInt() * 1000L; System.out.println(new Date(time)); ctx.close(); }finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
分別啓動 TimeServer 和 TimeClient ,控制檯打印出了當前時間:
然而,屢次運行後處理器有時候會由於拋出 IndexOutOfBoundsException 而拒絕工做。帶着這個問題,繼續往下面看。
比較典型的基於流傳輸的 TCP/IP 協議,也就是說,應用層兩個不一樣的數據包,在 TCP/IP 協議傳輸時,可能會組合或者拆分應用層協議的數據。因爲兩個數據包之間並沒有邊界區分,可能致使消息的讀取錯誤。
不少資料也稱上述這種現象爲 TCP 粘包,而值得注意的是:
一、TCP 協議自己設計就是面向流的,提供可靠傳輸。 二、正由於面向流,對於應用層的數據包而言,沒有邊界區分。這就須要應用層主動處理不一樣數據包之間的組裝。 三、發生粘包現象不是 TCP 的缺陷,只是應用層沒有主動作數據包的處理。
回到上面程序,這也就是上述異常發生的緣由。一個 32 位整型是很是小的數據,它並不見得會被常常拆分到到不一樣的數據段內。然而,問題是它確實可能會被拆分到不一樣的數據段內。
比較常見的兩種解決方案就是基於長度或者基於終結符,繼續以上面的 TIME 協議程序爲基礎,着手解決這個問題。由於只發送一個 32 位的整形時間戳,咱們採用基於數據長度的方式:
最簡單的方案是構造一個內部的可積累的緩衝,直到4個字節所有接收到了內部緩衝。修改一下 TimeClientHandler
的代碼:
public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; private static final int CAPACITY = 4; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { buf = ctx.alloc().buffer(CAPACITY); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { buf.release(); buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; buf.writeBytes(byteBuf); byteBuf.release(); // 數據大於或等於 4 字節 if (buf.readableBytes() >= CAPACITY) { long time = buf.readUnsignedInt() * 1000L; System.out.println(new Date(time)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
其中覆蓋了 handler 生命週期的兩個方法:
handlerAdded()
:當檢測到新的鏈接以後,調用ch.pipeline().addLast(new LifeCycleTestHandler())
以後的回調,表示當前的channel中已經成功添加了一個邏輯處理器handlerRemoved()
:在鏈接關閉後把這條鏈接上的全部邏輯處理器所有移除掉。儘管上述方案已經解決了 TIME 客戶端的問題了,可是在處理器中增長了邏輯,咱們能夠把處理消息的部分抽取出來,成爲一個單獨的處理器,而且能夠增長多個 ChannelHandler 到 ChannelPipline ,每一個處理器各司其職,減小模塊的複雜度。
由此,拆分出一個 TimeDecoder 用於處理消息:
public class TimeDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= 4) { out.add(in.readBytes(4)); } } }
ByteToMessageDecoder
繼承自 ChannelInboundHandlerAdapter
,每當有新數據接收的時候,ByteToMessageDecoder
都會調用 decode()
方法來處理內部的那個累積緩衝。decode()
方法裏增長了一個對象到 out 對象裏,這意味着解碼器解碼消息成功。ByteToMessageDecoder
將會丟棄在累積緩衝裏已經被讀過的數據。最後,修改 TimeClient 的代碼,將 TimeDecoder 加入 ChannelPipline :
bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
除此以外,Netty還提供了更多開箱即用的解碼器使你能夠更簡單地實現更多的協議,幫助你避免開發一個難以維護的處理器實現,感興趣的小夥伴能夠自行了解。
上述的例子咱們一直在使用 ByteBuf 做爲協議消息的主要數據結構,可是實際使用中,須要傳輸的消息更加複雜,抽象爲對象來處理更加方便。繼續以 TIME 客戶端和服務器爲基礎,使用自定義的對象代替 ByteBuf 。
定義保存時間的對象 OurTime :
public class OurTime { private final long value; public OurTime() { this(System.currentTimeMillis() / 1000L); } public OurTime(long value) { this.value = value; } public long value() { return value; } @Override public String toString() { return new Date(value() * 1000L).toString(); } }
修改 TimeDecoder 類,返回 OurTime 類:
public class TimeDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= 4) { out.add(new OurTime(in.readUnsignedInt())); } } }
修改後的 TimeClientHandler 類,處理新消息更加簡潔:
public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { OurTime ourTime = (OurTime) msg; System.out.println(ourTime); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
而對於服務端來講,大同小異。
修改 TimeServerHandler 的代碼:
@Override public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }
如今,惟一缺乏的功能是一個編碼器,是ChannelOutboundHandler的實現,用來將 OurTime 對象從新轉化爲一個 ByteBuf。這是比編寫一個解碼器簡單得多,由於沒有須要處理的數據包編碼消息時拆分和組裝。
public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (OurTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt((int)m.value()); ctx.write(encoded, promise); // (1) } }
在這幾行代碼裏還有幾個重要的事情。第一,經過 ChannelPromise,當編碼後的數據被寫到了通道上 Netty 能夠經過這個對象標記是成功仍是失敗。第二, 咱們不須要調用 cxt.flush()。由於處理器已經單獨分離出了一個方法 void flush(ChannelHandlerContext cxt),若是像本身實現 flush() 方法內容能夠自行覆蓋這個方法。
進一步簡化操做,你可使用 MessageToByteEncode:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> { @Override protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { out.writeInt((int)msg.value()); } }
最後在 TimeServerHandler 以前把 TimeEncoder 插入到ChannelPipeline。
相信讀完這篇文章的從頭到尾,小夥伴們對使用 Netty 編寫一個客戶端和服務端有了大概的瞭解。後面咱們將繼續探究 Netty 的源碼實現,並結合其涉及的基礎知識進行了解、深刻。
❤ 轉載請註明本文地址或來源,謝謝合做 ❤