近期公司經過
TCP
鏈接的的方式接了一個硬件設備,用了最基礎的ServerSocket
類,參考的oracle的文檔 。 實現的比較簡單,放在github 上,不過這裏應該用Netty
纔是正解。因此,過一下Netty
的入門文檔。java本文demogit
咱們通常會用Http客戶端庫來調用web服務,獲取數據。若是一個東西是出於通常性目的設計出來的,那麼他在某些方面可能就不是最合適的。好比獲取大文件,收發郵件,展現實時的金融數據,遊戲數據傳輸等。爲了實現這些需求,須要一個爲其高度優化的特定協議。還有一個沒法避免的問題是你可能須要調用老系統的數據,可是他的協議又是特定。重點來了,如何在不犧牲可靠性
和性能
的前提下快速
實現這麼一個系統。github
用Netty。用Netty。用Netty。重要的事情說3遍。web
Netty是一個異步 事件驅動 網絡框架 ,能夠用來快速開發易維護,高性能,可擴展的服務端/客戶端。換句話說他簡化了TCP和UDP 等服務的網絡開發。bootstrap
容易開發或者快速開發並不意味着他會犧牲可維護性或者是面臨性能問題。Netty吸收了大量用於實現FTP,SMTP,HTTP協議的經驗,而且仔細當心謹慎的設計。因此,他在易於開發,追求性能,確保穩定性和靈活性上並無對任何一點有所妥協。api
有人可能會說別的框架他們也這麼說本身,那Netty到底或者爲何和他們不同。答案是他的設計理念。Netty提供的API用起來就很是舒服。如今可能不是那麼直觀,可是當你使用的時候就會體會到。數組
這節會圍繞Netty的核心構建過程,用幾個例子來讓你快速上手。學完這節你會能夠在Netty框架的基礎上學會寫client和server。promise
若是你想學的深刻一點,瞭解一下他的底層實現,第二節,架構概覽是個不錯的起點。瀏覽器
這節須要兩個東西,新版的Netty和jdk1.6+。Netty下載地址。
<dependencies> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.49.Final</version> </dependency> </dependencies>
隨着你不斷往下看,你能會對這節引入的類有疑惑,你能夠隨時經過API文檔來了解更多。類名都是帶連接的,能夠直接點過去。
世界上最簡單的協議並非輸出Hello world,而是Discard,就是過來什麼都直接丟棄,而且不給任何回覆。下面讓咱們直接從Netty提供的handler實現來處理IO事件。
package io.netty.example.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class DiscardServerHandler extends ChannelInboundHandlerAdapter {//1 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//2 // super.channelRead(ctx, msg); ((ByteBuf) msg).release();//3 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//4 // super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } }
有如下幾點:
咱們寫一個DiscardServerHandler
類繼承自ChannelInboundHandlerAdapter,這個ChannelInboundHandlerAdapter
繼承自抽象類ChannelHandlerAdapter
而且實現了接口ChannelInboundHandler
。ChannelInboundHandler
提供了各類各樣的可重寫的事件handler方法。這裏只要使用ChannelInboundHandlerAdapter
對ChannelInboundHandler
的默認實現就好,不須要本身去實現全部的ChannelInboundHandler
方法。
channelRead
方法咱們重寫掉了,這個方法會在收到客戶端消息的時候調用。例子中,消息msg
的類型爲ByteBuf。ByteBuf
是對byte[]
的一種抽象,可讓咱們訪問數組內容。
咱們這裏須要實現的是Discard協議,就是丟棄協議,因此須要忽略收到的全部消息。ByteBuf是一種reference-counted
的對象(能夠簡單理解指針之類的東西),必須經過顯式調用其release
方法來釋放。一般,咱們的channelRead
方法是下面這樣的
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try{ //對msg作一些處理 }finally { ReferenceCountUtil.release(msg); } }
Netty在處理IO的遇到exception就會進入exceptionCaught
方法。一般,須要作一下日誌記錄,而後把相關的channel(通道)關閉。這裏作法也不是固定的,你能夠先發一個帶code的Response而後再關閉。
到這一步,咱們已經實現了Discard服務的前半部分,剩下的就是寫一個main
方法而後來啓動這個DiscardServerHandler
服務。
package io.netty.example.discard; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup();//1 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap();//2 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)//3 .childHandler(new ChannelInitializer<SocketChannel>() {//4 protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128)//5 最大鏈接數128 .childOption(ChannelOption.SO_KEEPALIVE, true);//6 //綁定端口啓動服務 ChannelFuture f = b.bind(port).sync();//7 //server關閉的時候調用。由於這裏是Discard 服務,因此永遠不會調用。 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { int port = 8080; if (args.length > 0) port = Integer.parseInt(args[0]); new DiscardServer(port).run(); } }
有如下幾點:
NioEventLoopGroup是一個多線程的event loop(事件環?)。Netty針對不一樣的狀況提供了多(18)種EventLoopGroup
的實現,由於這裏是一個服務端應用,因此使用NioEventLoopGroup
。new出來兩個對象,一般第一個叫boss,接收進來的鏈接。第二個,一般叫worker,由於當boss接收了鏈接以後會把連接註冊給worker,讓worker來處理後面的通訊。每一個EventLoopGroup
使用線程數以及他們如何被映射到Channel由EventLoopGroup
的實現決定,而且可能能夠經過構造函數來指定。
ServerBootstarp是一個配置server的幫助類,你可使用Channel本身來配置,可是會比較枯燥,因此,大多數狀況下直接使用這個ServerBootstrap
就好。
NioServerSocketChannel
是一個Channel的實例,用來處理進來的鏈接(上面說的channel的功能)。
ChannelInitializer是一個特殊的Handler,做用是幫助用戶配置Channel。一般的做用是把ChannelHandler放到ChannelPipeline(管道)裏面,請求會進入到Pipeline,處理就按照這個Pipeline配置的Handler來。DiscardServerHandler
就是一種Handler。
用來配置Channel的參數。順道看一下ServerBootstrap
的定義,這個ServerBootstrap
是用來啓動ServerChannel
,ServerChannel
實際上就是一個Channel
。咱們這裏實現的是一個TCP/IP server,因此,能夠設置tcpNoDelay
和keepAlive
等參數。具體設置看文檔。
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>{}
option的爲接收鏈接的配置,也就是給boss用,後面的childOption爲worker配置選項。
萬事俱備,只欠把綁定端口配置上去而後啓動服務。main
方法裏面。
恭喜,搞定。用個tcp 客戶端鏈接試試~~能夠看到鏈接成功,發送了3字節,而後由於是Discard,因此沒有返回。
讓咱們稍微修改一下代碼,以便看看咱們收到的數據。按照以前的例子,須要再channelRead方法裏面作修改。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { final ByteBuf in = (ByteBuf) msg; try { System.out.println(in.toString(CharsetUtil.US_ASCII)); } finally { in.release(); } }
msg能夠直接轉換成ByteBuf
對象,而後用ByteBuf的toString方法,設置ASCII參數裝成string打印出來。
運行起來而後能夠直接在瀏覽器輸入localhost:8080訪問,就能看到傳過來的數據。
咱們寫一個Echo服務,客戶端輸入什麼,咱們就回復什麼。
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.write(msg);//1 ctx.flush();//2 } }
wirte
方法,Netty會幫咱們釋放。ctx.write(Object)
會把內容寫到緩衝區,在調用flush後再輸出出去。能夠用writeAndFlush
方法代替。測試一下,發送3個字節,收到3個字節的回覆。
這個例子用來實現一個Time協議。經過實現這個協議,咱們能夠了解Netty如何構造和發送數據。根據RFC868協議,Time協議有這麼幾步
這裏服務端忽略收到的任何客戶端數據,而是當客戶端一創建鏈接就返回數據,因此這裏不使用channelRead
方法,而是channelActive
方法。
package io.netty.example.time; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception {//1 final ByteBuf timeBuf = ctx.alloc().buffer(4);//2 timeBuf.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture channelFuture = ctx.writeAndFlush(timeBuf);//3 channelFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { assert channelFuture == future; ctx.close(); } });//4 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
重寫的是channelActive
方法,這個方法會在鏈接進來的時候調用。
由於要返回一個int值,因此須要4個字節,經過ChannelHandlerContext
分配,而後writeAndFlush方法寫入併發送。
把數據發送給非阻塞IO流的時候不須要調用java.nio.ByteBuffer.flip()
方法,Netty的ByteBuf
沒有提供這個方法,由於他不須要。ByteBuf
內部有兩個指針,一個用於讀,一個用於寫。write的時候讀指針移動,寫指針不動,反之同理。在使用ByteBuffer
的時候若是沒有flip,數據就會亂。
Netty裏面全部的IO操做都是異步的,這樣可能會致使write沒有開始(或者沒有完成)以前就鏈接就close掉了。好比下面的代碼:
Channel ch = ...; ch.writeAndFlush(message); ch.close();//這也不是立馬關閉,也是一個ChannelFuture對象
write(writeAndFlush)
返回的是一個ChannelFuture
對象,來大體看下這個對象的解釋。
繼承自Future,表示一個Channel的IO操做的結果,不過他還沒完成,只是表示已經建立。【詳細的之後再講。】
如何能知道這個IO操做的結果呢?咱們能夠給這個ChannelFuture增長一個ChannelFutureListener
的實例(接口),而後實現它的operationComplete
方法。這裏面的方法比較簡單,就是close掉這個ChannelHandlerContext
,因此,可使用定義好的ChannelFutureListener.CLOSE
方法。像下面這樣
channelFuture.addListener(ChannelFutureListener.CLOSE);
用rdate 測試一下。測試經過。
寫完server以後就要寫client了。client程序和server程序最大的不一樣在於選擇的Bootstrap
和Channel
的實現類的差別。
package io.netty.example.time; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient { public static void main(String[] args) throws InterruptedException { int port = 37; String host = "192.168.1.181"; EventLoopGroup workerGroup = new NioEventLoopGroup(); try { final Bootstrap bootstrap = new Bootstrap();//1 bootstrap.group(workerGroup);//2 bootstrap.channel(NioSocketChannel.class);//3 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);//4 bootstrap.handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler());//6 } }); final ChannelFuture connectFuture = bootstrap.connect(host, port).sync();//5 connectFuture.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
Bootstrap和前面的ServerBootstrap相似,不過這個不是給non-server
非服務器用,而是給客戶端或者connectionless
非鏈接的用。
客戶端就不須要boss EventLoopGroup
了。其實前面的Server中group能用同一個。
serverBootstrap.group(workGroup, workGroup);//同一個group
channel也須要換成NioSocketChannel
,而不是NioServerSocketChannel
。
這裏直接用option方法,而不是childOption和option,由於對應client,沒有childOption的概念。
client須要去connect,而不是bind來監聽。
看一下TimeClientHander.java
package io.netty.example.time; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { final ByteBuf m = (ByteBuf) msg;//1 try { final long currentTimeMills = (m.readUnsignedInt() - 2208988800L) * 1000L;//2 System.out.println(new Date(currentTimeMills)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
ByteBuf
。readUnsignedInt()
方法讀取數值。代碼看起來比較簡單,可是必定可能性(小几率)會有IndexOutOfBoundsException
異常,咱們下節討論。
TCP/IP
是一個典型的stream-based協議,接收數據而後放到socket buffer裏面。可是這個buffer隊列存的是byte,而不是packet數據包。因此,就算髮了兩個packet,在系統看來,他就是一堆byte。因此,沒有辦法保證你讀取到的東西和發過來的必定同樣。
舉個例子,假設收到了3個數據包,ABC,DEF,GHI
有可能收到的是下面這樣的
因此,server和client須要一種規則來劃分數據包,而後對方就知道每一個包究竟是啥樣的。
其實道理上來講由於int數據包也就4個字節,因此不太會被分片,不太容易出現IndexOutOfBoundsException
異常。可是,隨着數據包變大,分片的可能性就會增長,到時候異常出現的機率就會增大。
由於咱們知道收到的數據是4個字節,因此,咱們能夠分配一個4本身的空間,等到一滿,咱們就知道已經收到該有的數據包了,就直接處理就好。來改一下咱們的TimeClientHandler
package io.netty.example.time; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class TimeClientHandler2 extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { buf = ctx.alloc().buffer(4);//1 } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { buf.release();//1 buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { final ByteBuf m = (ByteBuf) msg; buf.writeBytes(m);//2 m.release(); if (buf.readableBytes() >= 4) {//3 final long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
handlerAdded
和handlerRemoved
方法,在這兩個方法裏面初始化或者銷燬buf對象。只要這兩個方法不會阻塞太長時間,是沒有關係的。雖然上面的問題是解決了,可是由於咱們曉得發過來的數據是4個字節的(就一個字段),因此比較好處理。可是,若是這個對象是一個比較複雜的業務對象,那麼要維護這個類就會比較麻煩。
咱們能夠對這個TimeClientHandler2
的功能拆解成2部分。
TimeDecoder
專門處理數據包分片的問題。TimeClientHandler2
仍是保持簡單。package io.netty.example.time; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class TimeDecoder extends ByteToMessageDecoder {//1 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//2 if (in.readableBytes()<4) return;//3 out.add(in.readBytes(4));//4 } }
ChannelHandler
對象。他專門用來處理分片問題。ByteToMessageDecoder
會在有新的數據進來的時候調用decode方法,內部維護一個buffer。ByteToMessageDecoder
能夠根據本身的業務邏輯來執行。前面的例子讀寫數據的核心都是ByteBuf
類,在ChannelHandler
裏面直接把object msg 轉成ByteBuf,而後操做。若是咱們能經過POJO來操做,那麼,代碼的可維護性明顯會高一些。讓咱們來改造一下咱們的代碼。
第一步,定義一個UnixTime
類,來表示咱們要處理的對象。
package io.netty.example.time2; import java.util.Date; public class UnixTime { private final long value; public UnixTime() { this(System.currentTimeMillis() / 1000L + 2208988800L); } public UnixTime(long value) { this.value = value; } public long getValue() { return value; } @Override public String toString() { return "轉換出來的時間是:"+ new Date((getValue() - 2208988800L) * 1000L).toString(); } }
第二步,咱們改一下咱們的TimeDecoder
來產生一個UnixTime對象。
package io.netty.example.time2.client; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class TimeDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes()<4) return; out.add(new UnixTime(in.readUnsignedInt())); } }
第三步,在ChannelHandler
裏面咱們直接按照UnixTime
對象操做。
package io.netty.example.time2.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { UnixTime time = (UnixTime) msg; System.out.println(time); ctx.close(); } }
第四步,同理,server端也能夠相似的修改。
package io.netty.example.time2.server; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ChannelFuture channelFuture = ctx.writeAndFlush(new UnixTime()); channelFuture.addListener(ChannelFutureListener.CLOSE); } }
相較於之前的分配空間的操做,明顯簡單了許多。
第五步,如今,還缺一個東西,一個encoder,用來把UnixTime
轉成ByteBuf
,這個是逃不開的,哈哈。
package io.netty.example.time2.server; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { UnixTime m = (UnixTime) msg; ByteBuf buffer = ctx.alloc().buffer(4); buffer.writeInt((int)m.getValue()); ctx.write(buffer,promise);//1 } }
flush
方法,由於ChannelOutboundHandlerAdapter
有個flush
會自動調用。其實這個用MessageToByteEncoder
package io.netty.example.time2.server; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class TimeEncoder2 extends MessageToByteEncoder<UnixTime> { @Override protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception { out.writeInt((int) msg.getValue()); } }
MessageToByteEncoder
是一個ChannelOutboundHandlerAdapter
的實現抽象類,專門負責把POJO對象轉成ByteBuf。
最後一步,把Encoder像以前Decoder同樣加到ChannelPipeline
裏面。你懂的。
client.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());//客戶端解碼 加進去 } });
server.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeEncoder2(), new TimeServerHandler());//服務端直接操做UnixTime,須要編碼,加進去 } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true);
代碼結構看起來是這樣的
關閉比較簡單,調用shutdownGracefully()
便可,而後會返回一個Future對象。
強烈建議看看官方的例子。