Netty的經常使用API(二)

在使用Netty以前先介紹下Netty的經常使用API,對其有一個大概的瞭解。java

1、EventLoop和EventLoopGroup

EventLoop如同它的名字,它是一個無限循環(Loop),在循環中不斷處理接收到的事件(Event)。react

Netty線程模型的基石是創建在EventLoop上的,從設計上來看,EventLoop採用了一種協同設計,它創建在兩個基本的API之上:Concurrent和Channel,也就是併發和網絡。併發是由於它採用了線程池來管理大量的任務,而且這些任務能夠併發的執行。其繼承了EventExecutor接口,而EventExecutor就是一個事件的執行器。另外爲了與Channel的事件進行交互,EventLoop繼承了EventLoopGroup接口。一個詳細的EventLoop類繼承層次結構以下:程序員

一個Netty服務端啓動時,一般會有兩個NioEventLoopGroup:一個是監聽線程組,主要是監聽客戶端請求,另外一個是工做線程組,主要是處理與客戶端的數據通信。算法

Netty客戶端只有一個NioEventLoopGroup,就是用來處理與服務端通訊的線程組。編程

NioEventLoopGroup能夠理解爲一個線程池,內部維護了一組線程,每一個線程負責處理多個Channel上的事件,而一個Channel只對應於一個線程,這樣能夠迴避多線程下的數據同步問題。NioEventLoopGroup的類圖以下:設計模式

2、Bootstrap or ServerBootstrap

ServerBootStrap是Netty服務端啓動配置類,BootStrap是Netty客戶端啓動配置類。promise

1. ServerBootstrap緩存

  • 綁定線程組,設置react模式的主線程池 以及 IO 操做線程池

group(bossGroup, workerGroup)服務器

  • channel(Class<? extends C> channelClass)

設置通信模式,調用的是實現io.netty.channel.Channel接口的類。如:NioSocketChannel、NioServerSocketChannel,服務端通常能夠選NioServerSocketChannel。網絡

  • option / handler / attr 方法
    • option: 設置通道的選項參數, 對於服務端而言就是ServerSocketChannel, 客戶端而言就是SocketChannel;
    • handler: 設置主通道的處理器, 對於服務端而言就是ServerSocketChannel,也就是用來處理Acceptor的操做;對於客戶端的SocketChannel,主要是用來處理 業務操做;
    • attr: 設置通道的屬性;

option / handler / attr方法都定義在AbstractBootstrap中, 因此服務端和客戶端的引導類方法調用都是調用的父類的對應方法。

  • childHandler / childOption / childAttr 方法(只有服務端ServerBootstrap纔有child類型的方法)

對於服務端而言,有兩種通道須要處理, 一種是ServerSocketChannel:用於處理用戶鏈接的accept操做, 另外一種是SocketChannel,表示對應客戶端鏈接。而對於客戶端,通常都只有一種channel,也就是SocketChannel。

所以以child開頭的方法,都定義在ServerBootstrap中,表示處理或配置服務端接收到的對應客戶端鏈接的SocketChannel通道。

2. BootStrap

  • 綁定線程組,設置IO操做線程池

group(workGroup)

  • channel(Class<? extends C> channelClass)

設置通信模式,調用的是實現io.netty.channel.Channel接口的類。如:NioSocketChannel、NioServerSocketChannel,客戶端通常能夠選NioSocketChannel。

3. ChannelOption

ChannelOption的各類屬性在套接字選項中都有對應,下面簡單的總結一下ChannelOption的含義已及使用的場景。

(1) ChannelOption.SO_BACKLOG

ChannelOption.SO_BACKLOG對應的是tcp/ip協議listen函數中的backlog參數,函數listen(int socketfd,int backlog)用來初始化服務端可鏈接隊列,服務端處理客戶端鏈接請求是順序處理的,因此同一時間只能處理一個客戶端鏈接,多個客戶端來的時候,服務端將不能處理的客戶端鏈接請求放在隊列中等待處理,backlog參數指定了隊列的大小。

(2) ChannelOption.SO_REUSEADDR

ChanneOption.SO_REUSEADDR對應於套接字選項中的SO_REUSEADDR,這個參數表示容許重複使用本地地址和端口,好比,某個服務器進程佔用了TCP的80端口進行監聽,此時再次監聽該端口就會返回錯誤,使用該參數就能夠解決問題,該參數容許共用該端口,這個在服務器程序中比較常使用;好比某個進程非正常退出,該程序佔用的端口可能要被佔用一段時間才能容許其餘進程使用,並且程序死掉之後,內核一須要必定的時間纔可以釋放此端口,不設置SO_REUSEADDR就沒法正常使用該端口。

(3) ChannelOption.SO_KEEPALIVE

Channeloption.SO_KEEPALIVE參數對應於套接字選項中的SO_KEEPALIVE,該參數用於設置TCP鏈接,當設置該選項之後,鏈接會測試連接的狀態,這個選項用於可能長時間沒有數據交流的鏈接。當設置該選項之後,若是在兩小時內沒有數據的通訊時,TCP會自動發送一個活動探測數據報文。

(4) ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

ChannelOption.SO_SNDBUF參數對應於套接字選項中的SO_SNDBUF,ChannelOption.SO_RCVBUF參數對應於套接字選項中的SO_RCVBUF這兩個參數用於操做接收緩衝區和發送緩衝區的大小,接收緩衝區用於保存網絡協議站內收到的數據,直到應用程序讀取成功,發送緩衝區用於保存發送數據,直到發送成功。

(5) ChannelOption.SO_LINGER

ChannelOption.SO_LINGER參數對應於套接字選項中的SO_LINGER,Linux內核默認的處理方式是當用戶調用close()方法的時候,函數返回,在可能的狀況下,儘可能發送數據,不必定保證會發生剩餘的數據,形成了數據的不肯定性,使用SO_LINGER能夠阻塞close()的調用時間,直到數據徹底發送

(6) ChannelOption.TCP_NODELAY

ChannelOption.TCP_NODELAY參數對應於套接字選項中的TCP_NODELAY,該參數的使用與Nagle算法有關Nagle算法是將小的數據包組裝爲更大的幀而後進行發送,而不是輸入一次發送一次,所以在數據包不足的時候會等待其餘數據的到了,組裝成大的數據包進行發送,雖然該方式有效提升網絡的有效負載,可是卻形成了延時,而該參數的做用就是禁止使用Nagle算法,使用於小數據即時傳輸,於TCP_NODELAY相對應的是TCP_CORK,該選項是須要等到發送的數據量最大的時候,一次性發送數據,適用於文件傳輸。

3、ChannelInitializer

 ChannelInitializer的類圖:

ChannelInitializer繼承於ChannelInboundHandler接口。

ChannelInitializer是一個抽象類,不能直接使用。

1. 抽象方法 initChannel

ChannelInitializer的實現類必需要重寫這個方法,這個方法在Channel被註冊到EventLoop的時候會被調用

2. ChannelInitializer的主要目的是爲程序員提供了一個簡單的工具,用於在某個Channel註冊到EventLoop後,對這個Channel執行一些初始化操做。ChannelInitializer雖然會在一開始會被註冊到Channel相關的pipeline裏,可是在初始化完成以後,ChannelInitializer會將本身從pipeline中移除,不會影響後續的操做。

4、ChannelPipeline

ChannelPipeline = Channel + Pipeline,也就是說首先它與Channel綁定,而後它是起到相似於管道的做用:字節流在ChannelPipeline上流動,流動的過程當中被ChannelHandler修飾,最終輸出。

ChannelPipeline類圖:

ChannelPipeline只有兩個子類,直接一塊兒放上來好了,其中EmbeddedChannelPipeline主要用於測試。

每一個channel內部都會持有一個ChannelPipeline對象pipeline.
pipeline默認實現DefaultChannelPipeline內部維護了一個DefaultChannelHandlerContext鏈表。

ChannelPipeline能夠理解爲ChannelHandler的容器,全部ChannelHandler都會註冊到ChannelPipeline中,並按順序組織起來。

5、ChannelHandler

ChannelHandler相似於Servlet的Filter過濾器,負責對I/O事件或者I/O操做進行攔截和處理,它能夠選擇性地攔截和處理本身感興趣的事件,也能夠透傳和終止事件的傳遞。基於ChannelHandler接口,用戶能夠方便地進行業務邏輯定製,例如打印日誌、統一封裝異常信息、性能統計和消息編解碼等。

ChannelHandler支持註解,目前支持的註解有兩種。

  • @Sharable:多個ChannelPipeline共用同一個ChannelHandler;
  • @Skip:被Skip註解的方法不會被調用,直接被忽略。

1. ChannelInboundHandlerAdapter

 ChannelInboundHandlerAdapter是ChannelInboundHandler的一個簡單實現,默認狀況下不會作任何處理,只是簡單的將操做經過fire*方法傳遞到ChannelPipeline中的下一個ChannelHandler中讓鏈中的下一個ChannelHandler去處理。

須要注意的是信息通過channelRead方法處理以後不會自動釋放(由於信息不會被自動釋放因此能將消息傳遞給下一個ChannelHandler處理)。

(1) SimpleChannelInboundHandler

SimpleChannelInboundHandler支持泛型的消息處理,默認狀況下消息處理完將會被自動釋放,沒法提供fire*方法傳遞給ChannelPipeline中的下一個ChannelHandler,若是想要傳遞給下一個ChannelHandler須要調用ReferenceCountUtil#retain方法。

通常用netty來發送和接收數據都會繼承SimpleChannelInboundHandler和ChannelInboundHandlerAdapter這兩個抽象類。

其實用這兩個抽象類是有講究的,在客戶端的業務Handler繼承的是SimpleChannelInboundHandler,而在服務器端繼承的是ChannelInboundHandlerAdapter。

最主要的區別就是SimpleChannelInboundHandler在接收到數據後會自動release掉數據佔用的Bytebuffer資源(自動調用Bytebuffer.release())。而爲什麼服務器端不能用呢,由於咱們想讓服務器把客戶端請求的數據發送回去,而服務器端有可能在channelRead方法返回前尚未寫完數據,所以不能讓它自動release。

 2. ChannelOutboundHandlerAdapter

表示出去的動做,監聽本身的IO操做,好比connect,bind等,在重寫這個Adapter的方法時,記得執行super.xxxx,不然動做沒法執行。

Netty 中的事件分爲inbound 事件和outbound 事件。inbound 事件一般由I/O線程觸發,例如TCP 鏈路創建事件、鏈路關閉事件、讀事件、異常通知事件等。Outbound 事件一般是I/O 用戶主動發起的網絡I/O 操做,例如用戶發起的鏈接操做、綁定操做、消息發送等操做。

咱們經常使用的inbound事件有:

  • channelRegistered(ChannelHandlerContext) //channel註冊事件
  • channelActive(ChannelHandlerContext)//通道激活時觸發,當客戶端connect成功後,服務端就會接收到這個事件,從而能夠把客戶端的Channel記錄下來,供後面複用
  • exceptionCaught(ChannelHandlerContext, Throwable)//出錯時會觸發,作一些錯誤處理
  • userEventTriggered(ChannelHandlerContext, Object)//用戶自定義事件
  • channelRead(ChannelHandlerContext, Object) //當收到對方發來的數據後,就會觸發,參數msg就是發來的信息,能夠是基礎類型,也能夠是序列化的複雜對象。

經常使用的outbound事件有:

  • bind(ChannelHandlerContext ctx, SocketAddress localAddress,ChannelPromise promise) //服務端執行bind時,會進入到這裏,咱們能夠在bind前及bind後作一些操做
  • connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) //客戶端執行connect鏈接服務端時進入
  • write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)//發送事件
  • flush(ChannelHandlerContext ctx) //刷新事件

ChannelPromise是ChannelFuture的擴展,容許設置I/O操做的結果,使ChannelFutureListener能夠執行相關操做。

6、Channel

1) 通道狀態主要包括:打開、關閉、鏈接
2) 通道主要的IO操做,讀(read)、寫(write)、鏈接(connect)、綁定(bind)。
3) 全部的IO操做都是異步的,調用諸如read,write方法後,並不保證IO操做完成,但會返回一個憑證,在IO操做成功,取消或失敗後會記錄在該憑證中。
4) channel有父子關係,SocketChannel是經過ServerSocketChannel接受建立的,故SocketChannel的parent()方法返回的就是ServerSocketChannel。
5) 在Channel使用完畢後,請調用close方法,釋放通道佔用的資源。

    //返回全局惟一的channel id
    ChannelId id();
    //返回該Channel註冊的線程模型,先理解爲Ractor模型的Ractor線程。
    EventLoop eventLoop();
    //返回該Channel由誰建立的,ServerSocketChannel返回null,SocketChannel返回建立它的ServerSocketChannel
    Channel parent();
    //返回通道的配置信息
    ChannelConfig config();
    //通道是否打開
    boolean isOpen();
    //該通道是否已經註冊在事件模型中,此處先參考Nio編程模型,一個經過須要註冊在Register上
    boolean isRegistered();
    //通道是否激活
    boolean isActive();
    //通道是否支持 調用disconnect方法後,調用connect方法
    ChannelMetadata metadata();
    //返回綁定的地址,服務端的Channel返回監聽的地址,而客戶端的Channel返回鏈接到服務端的本地套接字。
    SocketAddress localAddress();
    //返回channel的遠程套接字。
    SocketAddress remoteAddress();
    //通道的關閉憑證(許可),這裏是多線程編程一種典型的設計模式,一個channle返回一個固定的
    ChannelFuture closeFuture();
    //是否可寫,若是通道的寫緩衝區未滿,即返回true,表示寫操做能夠當即操做緩衝區,而後返回。
    boolean isWritable();
    Unsafe unsafe();
    //返回管道
    ChannelPipeline pipeline();
    //返回ByteBuf內存分配器
    ByteBufAllocator alloc();
    //諸如newPromise,newSuccessedFuture()方法,就是返回一個憑證,用來保存通知結果的
    ChannelPromise newPromise();
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);
    ChannelPromise voidPromise();
    //綁定
    ChannelFuture bind(SocketAddress localAddress);
    //鏈接
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
    //斷開鏈接
    ChannelFuture disconnect();
    //關閉,釋放通道資源
    ChannelFuture close();
    ChannelFuture deregister();
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture disconnect(ChannelPromise promise);
    ChannelFuture close(ChannelPromise promise);
    ChannelFuture deregister(ChannelPromise promise);
    Channel read();
    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);
    Channel flush();
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);
    interface Unsafe {
        RecvByteBufAllocator.Handle recvBufAllocHandle();
        ChannelHandlerInvoker invoker();
        SocketAddress localAddress();
        SocketAddress remoteAddress();
        void register(EventLoop eventLoop, ChannelPromise promise);
        void bind(SocketAddress localAddress, ChannelPromise promise);
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        void disconnect(ChannelPromise promise);
        void close(ChannelPromise promise);
        void closeForcibly();
        void deregister(ChannelPromise promise);
        void beginRead();
        void write(Object msg, ChannelPromise promise); 
        void flush();
        ChannelPromise voidPromise();
        //返回通道的環形緩存區
        ChannelOutboundBuffer outboundBuffer();
    }
View Code

Channel類圖:

7、Future or ChannelFuture

 Netty的Future接口繼承了JDK的Future接口,同時提供了更多的方法:

public interface Future<V> extends java.util.concurrent.Future<V> {
    boolean isSuccess();
    Throwable cause();
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> sync() throws InterruptedException;
    Future<V> await() throws InterruptedException;
    V getNow();
}

任務成功完成後isSuccess()返回true
任務執行過程當中有異常,cause()會返回異常對象
任務被取消執行,父接口方法isCancelled返回true
以上3種狀況isDone()均爲true:

//任務完成
 if (task.isDone()) {
    if (task.isSuccess()) {
        // 成功
    } else if (task.isCancelled()) {
        // 被取消
    } else {
        // 異常
        System.out.print(task.cause())
    }
 }

wait和sync都會阻塞,並等待任務完成
getNow()不會阻塞,會當即返回,但任務還沒有執行完成時,會返回null
addListener方法在當前Future對象中添加監聽器,當任務完成時,會通知全部的監聽器。

1. ChannelFuture

ChannelFuture繼承了Netty的Future接口,表明 Netty channel的I/O操做的執行結果。在Netty中全部的I/O操做都是異步的,會當即返回一個表明I/O操做的結果,即ChannelFuture。

在得到執行結果時,推薦使用添加監聽器,監聽執行完成事件operaionCompleted,而不要使用await方法。在ChannelHandler中調用await,會形成死鎖。由於ChannelHandler中的方法一般是I/O線程調用的,再調用await會形成I/O阻塞。

 //錯誤
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
   ChannelFuture future = ctx.channel().close();
   future.awaitUninterruptibly();
   // Perform post-closure operation
   // ...
 }

 // 正確
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
   ChannelFuture future = ctx.channel().close();
   future.addListener(new ChannelFutureListener() {
       public void operationComplete(ChannelFuture future) {
           // Perform post-closure operation
           // ...
       }
   });
 }

即便是經過添加ChannelFutureListener的方式獲取執行結果,但要注意的是:回調方法operationComplete也是由I/O線程調用的,因此也不能在其中執行耗時任務。如必須,則啓用線程池執行。

ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerInitializer())
                .bind(8899)
                .sync();

bind方法是異步的,其返回值是ChannelFuture類型。須要調用sync()同步方法,等待綁定動做執行完成。

8、其餘

1. ByteToMessageDecoder

2. MessageToMessageDecoder

3. LengthFieldBasedFrameDecoder

4. MessageToByteEncoder

5. MessageToMessageEncoder

6. LengthFieldPrepender

相關文章
相關標籤/搜索