在使用Netty以前先介紹下Netty的經常使用API,對其有一個大概的瞭解。java
EventLoop如同它的名字,它是一個無限循環(Loop),在循環中不斷處理接收到的事件(Event)。react
Netty線程模型的基石是創建在EventLoop上的,從設計上來看,EventLoop採用了一種協同設計,它創建在兩個基本的API之上:Concurrent和Channel,也就是併發和網絡。併發是由於它採用了線程池來管理大量的任務,而且這些任務能夠併發的執行。其繼承了EventExecutor接口,而EventExecutor就是一個事件的執行器。另外爲了與Channel的事件進行交互,EventLoop繼承了EventLoopGroup接口。一個詳細的EventLoop類繼承層次結構以下:程序員
一個Netty服務端啓動時,一般會有兩個NioEventLoopGroup:一個是監聽線程組,主要是監聽客戶端請求,另外一個是工做線程組,主要是處理與客戶端的數據通信。算法
Netty客戶端只有一個NioEventLoopGroup,就是用來處理與服務端通訊的線程組。編程
NioEventLoopGroup能夠理解爲一個線程池,內部維護了一組線程,每一個線程負責處理多個Channel上的事件,而一個Channel只對應於一個線程,這樣能夠迴避多線程下的數據同步問題。NioEventLoopGroup的類圖以下:設計模式
ServerBootStrap是Netty服務端啓動配置類,BootStrap是Netty客戶端啓動配置類。promise
1. ServerBootstrap緩存
group(bossGroup, workerGroup)服務器
設置通信模式,調用的是實現io.netty.channel.Channel接口的類。如:NioSocketChannel、NioServerSocketChannel,服務端通常能夠選NioServerSocketChannel。網絡
option / handler / attr方法都定義在AbstractBootstrap中, 因此服務端和客戶端的引導類方法調用都是調用的父類的對應方法。
對於服務端而言,有兩種通道須要處理, 一種是ServerSocketChannel:用於處理用戶鏈接的accept操做, 另外一種是SocketChannel,表示對應客戶端鏈接。而對於客戶端,通常都只有一種channel,也就是SocketChannel。
所以以child開頭的方法,都定義在ServerBootstrap中,表示處理或配置服務端接收到的對應客戶端鏈接的SocketChannel通道。
2. BootStrap
group(workGroup)
設置通信模式,調用的是實現io.netty.channel.Channel接口的類。如:NioSocketChannel、NioServerSocketChannel,客戶端通常能夠選NioSocketChannel。
3. ChannelOption
ChannelOption的各類屬性在套接字選項中都有對應,下面簡單的總結一下ChannelOption的含義已及使用的場景。
(1) ChannelOption.SO_BACKLOG
ChannelOption.SO_BACKLOG對應的是tcp/ip協議listen函數中的backlog參數,函數listen(int socketfd,int backlog)用來初始化服務端可鏈接隊列,服務端處理客戶端鏈接請求是順序處理的,因此同一時間只能處理一個客戶端鏈接,多個客戶端來的時候,服務端將不能處理的客戶端鏈接請求放在隊列中等待處理,backlog參數指定了隊列的大小。
(2) ChannelOption.SO_REUSEADDR
ChanneOption.SO_REUSEADDR對應於套接字選項中的SO_REUSEADDR,這個參數表示容許重複使用本地地址和端口,好比,某個服務器進程佔用了TCP的80端口進行監聽,此時再次監聽該端口就會返回錯誤,使用該參數就能夠解決問題,該參數容許共用該端口,這個在服務器程序中比較常使用;好比某個進程非正常退出,該程序佔用的端口可能要被佔用一段時間才能容許其餘進程使用,並且程序死掉之後,內核一須要必定的時間纔可以釋放此端口,不設置SO_REUSEADDR就沒法正常使用該端口。
(3) ChannelOption.SO_KEEPALIVE
Channeloption.SO_KEEPALIVE參數對應於套接字選項中的SO_KEEPALIVE,該參數用於設置TCP鏈接,當設置該選項之後,鏈接會測試連接的狀態,這個選項用於可能長時間沒有數據交流的鏈接。當設置該選項之後,若是在兩小時內沒有數據的通訊時,TCP會自動發送一個活動探測數據報文。
(4) ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF參數對應於套接字選項中的SO_SNDBUF,ChannelOption.SO_RCVBUF參數對應於套接字選項中的SO_RCVBUF這兩個參數用於操做接收緩衝區和發送緩衝區的大小,接收緩衝區用於保存網絡協議站內收到的數據,直到應用程序讀取成功,發送緩衝區用於保存發送數據,直到發送成功。
(5) ChannelOption.SO_LINGER
ChannelOption.SO_LINGER參數對應於套接字選項中的SO_LINGER,Linux內核默認的處理方式是當用戶調用close()方法的時候,函數返回,在可能的狀況下,儘可能發送數據,不必定保證會發生剩餘的數據,形成了數據的不肯定性,使用SO_LINGER能夠阻塞close()的調用時間,直到數據徹底發送
(6) ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY參數對應於套接字選項中的TCP_NODELAY,該參數的使用與Nagle算法有關Nagle算法是將小的數據包組裝爲更大的幀而後進行發送,而不是輸入一次發送一次,所以在數據包不足的時候會等待其餘數據的到了,組裝成大的數據包進行發送,雖然該方式有效提升網絡的有效負載,可是卻形成了延時,而該參數的做用就是禁止使用Nagle算法,使用於小數據即時傳輸,於TCP_NODELAY相對應的是TCP_CORK,該選項是須要等到發送的數據量最大的時候,一次性發送數據,適用於文件傳輸。
ChannelInitializer的類圖:
ChannelInitializer繼承於ChannelInboundHandler接口。
ChannelInitializer是一個抽象類,不能直接使用。
1. 抽象方法 initChannel
ChannelInitializer的實現類必需要重寫這個方法,這個方法在Channel被註冊到EventLoop的時候會被調用
2. ChannelInitializer的主要目的是爲程序員提供了一個簡單的工具,用於在某個Channel註冊到EventLoop後,對這個Channel執行一些初始化操做。ChannelInitializer雖然會在一開始會被註冊到Channel相關的pipeline裏,可是在初始化完成以後,ChannelInitializer會將本身從pipeline中移除,不會影響後續的操做。
ChannelPipeline = Channel + Pipeline,也就是說首先它與Channel綁定,而後它是起到相似於管道的做用:字節流在ChannelPipeline上流動,流動的過程當中被ChannelHandler修飾,最終輸出。
ChannelPipeline類圖:
ChannelPipeline只有兩個子類,直接一塊兒放上來好了,其中EmbeddedChannelPipeline主要用於測試。
每一個channel內部都會持有一個ChannelPipeline對象pipeline.
pipeline默認實現DefaultChannelPipeline內部維護了一個DefaultChannelHandlerContext鏈表。
ChannelPipeline能夠理解爲ChannelHandler的容器,全部ChannelHandler都會註冊到ChannelPipeline中,並按順序組織起來。
ChannelHandler相似於Servlet的Filter過濾器,負責對I/O事件或者I/O操做進行攔截和處理,它能夠選擇性地攔截和處理本身感興趣的事件,也能夠透傳和終止事件的傳遞。基於ChannelHandler接口,用戶能夠方便地進行業務邏輯定製,例如打印日誌、統一封裝異常信息、性能統計和消息編解碼等。
ChannelHandler支持註解,目前支持的註解有兩種。
1. ChannelInboundHandlerAdapter
ChannelInboundHandlerAdapter是ChannelInboundHandler的一個簡單實現,默認狀況下不會作任何處理,只是簡單的將操做經過fire*方法傳遞到ChannelPipeline中的下一個ChannelHandler中讓鏈中的下一個ChannelHandler去處理。
須要注意的是信息通過channelRead方法處理以後不會自動釋放(由於信息不會被自動釋放因此能將消息傳遞給下一個ChannelHandler處理)。
(1) SimpleChannelInboundHandler
SimpleChannelInboundHandler支持泛型的消息處理,默認狀況下消息處理完將會被自動釋放,沒法提供fire*方法傳遞給ChannelPipeline中的下一個ChannelHandler,若是想要傳遞給下一個ChannelHandler須要調用ReferenceCountUtil#retain方法。
通常用netty來發送和接收數據都會繼承SimpleChannelInboundHandler和ChannelInboundHandlerAdapter這兩個抽象類。
其實用這兩個抽象類是有講究的,在客戶端的業務Handler繼承的是SimpleChannelInboundHandler,而在服務器端繼承的是ChannelInboundHandlerAdapter。
最主要的區別就是SimpleChannelInboundHandler在接收到數據後會自動release掉數據佔用的Bytebuffer資源(自動調用Bytebuffer.release())。而爲什麼服務器端不能用呢,由於咱們想讓服務器把客戶端請求的數據發送回去,而服務器端有可能在channelRead方法返回前尚未寫完數據,所以不能讓它自動release。
2. ChannelOutboundHandlerAdapter
表示出去的動做,監聽本身的IO操做,好比connect,bind等,在重寫這個Adapter的方法時,記得執行super.xxxx,不然動做沒法執行。
Netty 中的事件分爲inbound 事件和outbound 事件。inbound 事件一般由I/O線程觸發,例如TCP 鏈路創建事件、鏈路關閉事件、讀事件、異常通知事件等。Outbound 事件一般是I/O 用戶主動發起的網絡I/O 操做,例如用戶發起的鏈接操做、綁定操做、消息發送等操做。
咱們經常使用的inbound事件有:
經常使用的outbound事件有:
ChannelPromise是ChannelFuture的擴展,容許設置I/O操做的結果,使ChannelFutureListener能夠執行相關操做。
1) 通道狀態主要包括:打開、關閉、鏈接
2) 通道主要的IO操做,讀(read)、寫(write)、鏈接(connect)、綁定(bind)。
3) 全部的IO操做都是異步的,調用諸如read,write方法後,並不保證IO操做完成,但會返回一個憑證,在IO操做成功,取消或失敗後會記錄在該憑證中。
4) channel有父子關係,SocketChannel是經過ServerSocketChannel接受建立的,故SocketChannel的parent()方法返回的就是ServerSocketChannel。
5) 在Channel使用完畢後,請調用close方法,釋放通道佔用的資源。
//返回全局惟一的channel id ChannelId id(); //返回該Channel註冊的線程模型,先理解爲Ractor模型的Ractor線程。 EventLoop eventLoop(); //返回該Channel由誰建立的,ServerSocketChannel返回null,SocketChannel返回建立它的ServerSocketChannel Channel parent(); //返回通道的配置信息 ChannelConfig config(); //通道是否打開 boolean isOpen(); //該通道是否已經註冊在事件模型中,此處先參考Nio編程模型,一個經過須要註冊在Register上 boolean isRegistered(); //通道是否激活 boolean isActive(); //通道是否支持 調用disconnect方法後,調用connect方法 ChannelMetadata metadata(); //返回綁定的地址,服務端的Channel返回監聽的地址,而客戶端的Channel返回鏈接到服務端的本地套接字。 SocketAddress localAddress(); //返回channel的遠程套接字。 SocketAddress remoteAddress(); //通道的關閉憑證(許可),這裏是多線程編程一種典型的設計模式,一個channle返回一個固定的 ChannelFuture closeFuture(); //是否可寫,若是通道的寫緩衝區未滿,即返回true,表示寫操做能夠當即操做緩衝區,而後返回。 boolean isWritable(); Unsafe unsafe(); //返回管道 ChannelPipeline pipeline(); //返回ByteBuf內存分配器 ByteBufAllocator alloc(); //諸如newPromise,newSuccessedFuture()方法,就是返回一個憑證,用來保存通知結果的 ChannelPromise newPromise(); ChannelProgressivePromise newProgressivePromise(); ChannelFuture newSucceededFuture(); ChannelFuture newFailedFuture(Throwable cause); ChannelPromise voidPromise(); //綁定 ChannelFuture bind(SocketAddress localAddress); //鏈接 ChannelFuture connect(SocketAddress remoteAddress); ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress); //斷開鏈接 ChannelFuture disconnect(); //關閉,釋放通道資源 ChannelFuture close(); ChannelFuture deregister(); ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise); ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise); ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); ChannelFuture disconnect(ChannelPromise promise); ChannelFuture close(ChannelPromise promise); ChannelFuture deregister(ChannelPromise promise); Channel read(); ChannelFuture write(Object msg); ChannelFuture write(Object msg, ChannelPromise promise); Channel flush(); ChannelFuture writeAndFlush(Object msg, ChannelPromise promise); ChannelFuture writeAndFlush(Object msg); interface Unsafe { RecvByteBufAllocator.Handle recvBufAllocHandle(); ChannelHandlerInvoker invoker(); SocketAddress localAddress(); SocketAddress remoteAddress(); void register(EventLoop eventLoop, ChannelPromise promise); void bind(SocketAddress localAddress, ChannelPromise promise); void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); void disconnect(ChannelPromise promise); void close(ChannelPromise promise); void closeForcibly(); void deregister(ChannelPromise promise); void beginRead(); void write(Object msg, ChannelPromise promise); void flush(); ChannelPromise voidPromise(); //返回通道的環形緩存區 ChannelOutboundBuffer outboundBuffer(); }
Channel類圖:
Netty的Future接口繼承了JDK的Future
接口,同時提供了更多的方法:
public interface Future<V> extends java.util.concurrent.Future<V> { boolean isSuccess(); Throwable cause(); Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> sync() throws InterruptedException; Future<V> await() throws InterruptedException; V getNow(); }
任務成功完成後isSuccess()返回true
任務執行過程當中有異常,cause()會返回異常對象
任務被取消執行,父接口方法isCancelled返回true
以上3種狀況isDone()均爲true:
//任務完成 if (task.isDone()) { if (task.isSuccess()) { // 成功 } else if (task.isCancelled()) { // 被取消 } else { // 異常 System.out.print(task.cause()) } }
wait和sync都會阻塞,並等待任務完成
getNow()不會阻塞,會當即返回,但任務還沒有執行完成時,會返回null
addListener方法在當前Future對象中添加監聽器,當任務完成時,會通知全部的監聽器。
1. ChannelFuture
ChannelFuture繼承了Netty的Future接口,表明 Netty channel的I/O操做的執行結果。在Netty中全部的I/O操做都是異步的,會當即返回一個表明I/O操做的結果,即ChannelFuture。
在得到執行結果時,推薦使用添加監聽器,監聽執行完成事件operaionCompleted,而不要使用await方法。在ChannelHandler中調用await,會形成死鎖。由於ChannelHandler中的方法一般是I/O線程調用的,再調用await會形成I/O阻塞。
//錯誤 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ChannelFuture future = ctx.channel().close(); future.awaitUninterruptibly(); // Perform post-closure operation // ... } // 正確 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ChannelFuture future = ctx.channel().close(); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { // Perform post-closure operation // ... } }); }
即便是經過添加ChannelFutureListener的方式獲取執行結果,但要注意的是:回調方法operationComplete也是由I/O線程調用的,因此也不能在其中執行耗時任務。如必須,則啓用線程池執行。
ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerInitializer()) .bind(8899) .sync();
bind方法是異步的,其返回值是ChannelFuture類型。須要調用sync()同步方法,等待綁定動做執行完成。
1. ByteToMessageDecoder
2. MessageToMessageDecoder
3. LengthFieldBasedFrameDecoder
4. MessageToByteEncoder
5. MessageToMessageEncoder
6. LengthFieldPrepender