Netty入門學習總結

Netty 概述

原生 NIO 存在的問題

  1. NIO 的類庫與 API 繁雜,須要熟練掌握 Selector、ServerSocketChannel、SocketChannel、Bytebuffer 等。
  2. 要求熟悉 Java 多線程編程和網絡編程。
  3. 開發工做量和難度大,例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常流的處理等。

什麼是 Netty

  • Netty 是由 JBOSS 提供的一個 Java 開源框架。Netty 提供異步的、基於事件驅動的網絡應用程序框架,用以快速開發高性能、高可靠的網絡 I/O 程序。
  • Netty 能夠快速、簡單的開發一個網絡應用,至關於簡化和流程化了 NIO 的開發過程。
  • Netty 是目前最流行的 NIO 框架,在互聯網領域、大數據分佈式計算領域、遊戲行業、通訊行業等得到了普遍的應用,Elasticsearch、Dubbo 框架內部都採用了 Netty。

Netty 做爲業界最流行的 nio 框架之一,它的健壯性、功能、性能、可定製性、可擴展性都是數一數二的。html

優勢java

  1. API 使用簡單,開發門檻低。
  2. 功能強大,預置了多種編解碼功能,支持多種主流協議。
  3. 定製能力強,經過 channelHandler 對通訊框架進行靈活擴展。
  4. 高性能。
  5. 成熟,穩定,修復了全部的 NIO BUG.
  6. 社區活躍。
  7. 經歷了大規模的商業應用考驗,質量獲得驗證。

線程模型介紹

目前存在的線程模型有:react

  • 傳統阻塞 I/O 服務模型。
  • Reactor 模式。
  • 根據 Reactor 的數量和處理資源線程池的數量不一樣,有三種不一樣實現:web

    • 單 Reactor 單線程。
    • 單 Reactor 多線程。
    • 主從 Reactor 多線程。
  • Netty 線程模式主要基於主從 Reactor 多線程模型作了必定的改進,其中主從 Reactor 多線程模型有多個 Reactor。

傳統阻塞 I/O 服務模型

傳統阻塞I/O服務模型

模型特色編程

  1. 採用阻塞 I/O 獲取輸入的數據。
  2. 每一個鏈接都須要獨立的線程完成數據的輸入、業務處理、數據返回。

    問題分析bootstrap

  3. 當併發數很大時,會建立大量的線程,佔用很大的系統資源。
  4. 鏈接建立後,若是當前線程暫時沒有數據可讀,該線程會阻塞在 Read 操做上,形成線程資源浪費。

    解決方案瀏覽器

  5. 基於I/O複用模型:多個鏈接共用一個阻塞對象,應用程序只須要在一個阻塞對象等待,無需阻塞全部鏈接,當某個鏈接有新的數據能夠處理時,操做系統通知應用程序,線程從阻塞狀態返回,開始進行業務處理。
  6. 基於線程池複用線程資源:沒必要爲每個鏈接建立線程,將鏈接完成後的業務處理任務分配給線程進行處理,一個線程能夠處理多個鏈接的業務。

Reactor 模式

Reactor模式

  1. Reactor 模式,經過一個或多個輸入同時傳遞給服務器處理的模式(基於事件驅動)。
  2. 服務器端程序處理傳入的多個請求,並將它們同步分派到相應的處理線程。
  3. Reactor 模式使用了 I/O 複用監聽事件,受到事件後分發給某個線程(進程),網絡服務高併發處理的關鍵。

核心組成緩存

  1. Reactor:在一個單獨的線程中運行,負責監聽和分發事件,分發給適當的處理程序對 I/O 事件做出反應。
  2. Handlers:處理程序執行 I/O 事件要完成的實際事件。Reactor 經過調用適當的處理程序來響應 I/O 事件,處理程序非阻塞操做。

單 Reactor 單線程

單Reactor單線程

  • select 是 I/O 複用模型介紹的標準網絡編程 API,能夠實現應用程序經過一個阻塞對象監聽多路鏈接請求。
  • Reactor 對象經過 Select 監控客戶端請求事件,收到事件後經過 Dispatch 進行分發。
  • 若是創建鏈接請求事件,則由 Acceptor 經過 Accept 處理鏈接請求,而後建立一個 Handler 對象處理鏈接完成後的後續業務處理。
  • 若是不是創建鏈接事件,則 Reactor 會分發給調用鏈接對應的 Handler 來響應。
  • Handler 會完成 Read—>業務處理—>Send 的完整業務流程。

優缺點服務器

  • 優勢:模型簡單,無多線程、進程通訊、競爭的問題,所有由一個線程完成。
  • 缺點:性能問題,只有一個線程沒法發揮出多核 CPU 的性能,Handler 在處理某鏈接業務時,整個進程沒法處理其餘鏈接事件,容易致使性能瓶頸。
  • 缺點:可靠性問題,線程意外停止,或者進入死循環,會致使整個系統通訊模塊不可用,不能接收和處理外部信息,節點故障。
  • 使用場景:客戶端數量有限,業務處理快捷(例如 Redis 在業務處理的時間複雜度爲 O(1)的狀況)。

單 Reactor 多線程

單Reactor多線程

  • Reactor 經過 select 監控客戶端請求事件,收到事件後,經過 dispatch 進行分發。
  • 若是是創建鏈接的請求,則由 Acceptor 經過 accept 處理鏈接請求,同時建立一個 handler 處理完成鏈接後的後續請求。
  • 若是不是鏈接請求,則由 Reactor 分發調用鏈接對應的 handler 來處理。
  • Handler 只負責響應事件,不作具體的業務處理,經過 read 讀取數據後,會分發給後面的 worker 線程池中的某個線程處理業務。
  • Worker 線程池會分配獨立的線程處理真正的業務,並將結果返回給 Handler。
  • Handler 收到響應後,經過 send 方法將結果反饋給 Client。

優缺點網絡

  • 優勢:能夠充分的利用多核 CPU 的處理能力。
  • 缺點:多線程數據共享、訪問操做比較複雜,Reactor 處理全部的事件的監聽和響應,由於 Reactor 在單線程中運行,在高併發場景容易出現性能瓶頸。

主從 Reactor 多線程

主從Reactor多線程

  • Reactor 主線程 MainReactor 對象經過 select 監聽鏈接事件,收到事件後,經過 Acceptor 處理鏈接事件。
  • 當 Acceptor 處理鏈接事件後,MainReactor 將建立好的鏈接分配給 SubReactor。
  • SubReactor 將鏈接加入到鏈接隊列進行監聽,並建立 Handler 進行各類事件處理。
  • 當有新事件發生時,SubReactor 調用對應的 Handler 進行處理。
  • Handler 經過 read 讀取數據,分發給後面的 Worker 線程池處理。
  • Worker 線程池會分配獨立的 Worker 線程進行業務處理,並將結果返回。
  • Handler 收到響應結果後,經過 send 方法將結果返回給 Client。

優缺點:

  • 優勢:父線程和子線程的職責明確,父線程只須要接收新鏈接,子線程完成後續業務處理。
  • 優勢:父線程與子線程的數據交互簡單,Reactor 主線程是須要把新鏈接傳給子線程,子線程無需返回數據。
  • 缺點:編程複雜度較高。

Reactor 模式小結

單 Reactor 單線程:前臺接待員和服務員是同一我的,全程爲顧客服務。

單 Reactor 多線程:一個前臺接待員,多個服務員。

主從 Reactor 多線程:多個前臺接待員,多個服務員。

  1. 響應快,雖然 Reactor 自己是同步的,但沒必要爲單個同步事件所阻塞。
  2. 最大程度的避免了複雜的多線程及同步問題,避免了多線程/進程的切換開銷。
  3. 擴展性好,能夠方便的經過增長 Reactor 勢力個數充分利用 CPU 資源。
  4. 複用性好,Reactor 模型自己與具體事件處理邏輯無關,具備很高的複用性。

Netty 模型

Netty工做架構圖

服務端端包含 1 個 Boss NioEventLoopGroup 和 1 個 Worker NioEventLoopGroup。

NioEventLoopGroup 至關於 1 個事件循環組,這個組裏包含多個事件循環 NioEventLoop,每一個 NioEventLoop 包含 1 個 Selector 和 1 個事件循環線程。

每一個 Boss NioEventLoop 循環執行的任務包含 3 步:

  1. 輪訓 Accept 事件。
  2. 處理 Accept I/O 事件,與 Client 創建鏈接,生成 NioSocketChannel,並將 NioSocketChannel 註冊到某個 Worker NioEventLoop 的 Selector 上。
  3. 處理任務隊列中的任務,runAllTasks。任務隊列中的任務包括用戶調用 eventloop.execute 或 schedule 執行的任務,或者其它線程提交到該 eventloop 的任務。

每一個 Worker NioEventLoop 循環執行的任務包含 3 步:

  1. 輪詢 read、write 事件。
  2. 處理 I/O 事件,即 read、write 事件,在 NioSocketChannel 可讀、可寫事件發生時進行處理。
  3. 處理任務隊列中的任務,runAllTasks。
  4. 每一個 Worker NioEventLoop 處理業務時,會使用 PipeLine(管道),pipeline 中包含了 channel,即經過 pipeline 能夠獲取對應通道,通道中維護了不少處理器。

Netty 簡單通信代碼案例

/**
 * @author jack
 */
public class SimpleServer {

    public static void main(String[] args) {
        //建立bossGroup , 只負責鏈接請求
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        //建立workerGroup , 負責客戶端業務處理
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        //建立服務端啓動對象,配置參數.
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.group(bossGroup, workerGroup)//設置線程組
                    .channel(NioServerSocketChannel.class)//使用NioSocketChannel做爲服務端的通道實現
                    .option(ChannelOption.SO_BACKLOG, 128)//設置線程隊列獲得鏈接個數
                    .childOption(ChannelOption.SO_KEEPALIVE, true)//設置保持活動鏈接狀態
                    .childHandler(new ChannelInitializer<SocketChannel>() {//建立一個通道測試對象
                        //給pipeline設置處理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyServerHandler()); //自定義handler
                        }
                    });//workerGroup的EventLoop對應的管道設置處理器

            System.out.println("服務端準備就緒...");

            //綁定一個端口而且同步,生成了一個channelFuture對象
            ChannelFuture cf = serverBootstrap.bind(6667).sync();
            //對關閉通道進行監聽
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}
/**
 * 服務端自定義handler
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 讀取實際數據(這裏咱們能夠讀取客戶端發送的消息)
     *
     * @param ctx 上下文對象,含有管道pipeline,通道channel ,地址
     * @param msg 客戶端發送的內容
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客戶端發送: " + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客戶端地址爲:" + ctx.channel().remoteAddress());
    }

    /**
     * 讀取完成後
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,客戶端", CharsetUtil.UTF_8));
    }


    /**
     * 處理異常,通常是關閉通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * @author jack
 */
public class SimpleClient {

    public static void main(String[] args) {
        //客戶端須要一個事件循環組
        NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup();
        //建立客戶端啓動對象
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(clientLoopGroup)//設置線程組
                    .channel(NioSocketChannel.class)//設置客戶端通道實現類
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());//加入自定義處理器
                        }
                    });

            System.out.println("客戶端已準備就緒");
            //鏈接服務器
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 6667).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            clientLoopGroup.shutdownGracefully();
        }
    }

}
/**
 * 客戶端自定義handler
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 通道準備就緒時調用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服務端!", CharsetUtil.UTF_8));
    }

    /**
     * 獲取客戶端回覆
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服務端回覆: " + buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 處理異常,通常是關閉通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

運行結果

服務端

客戶端

任務隊列中的 task 有 3 種使用場景

  1. 用戶自定義的普通任務

    ctx.channel().eventLoop().execute(() -> System.out.println("任務邏輯"));
  2. 用戶自定義的定時任務

    ctx.channel().eventLoop().schedule(() -> System.out.println("任務邏輯..."), 60, TimeUnit.SECONDS);
  3. 非當前 reactor 線程調用 channel 的各類方法

    例如在推送系統的業務線程裏面,根據用戶的標識,找到對應的 channel 引用,而後調用 write 類方法向該用戶推送消息,就會進入到這種場景。最終的 write 會提交到任務隊列中後被異步消費。

Netty 模型小結

  1. Netty 抽象出兩組線程池:BossGroup 專門負責接收客戶端的鏈接;WorkerGroup 專門負責網絡的讀寫。
  2. NioEventLoop 表示一個不斷循環的執行任務的線程,每一個 NioEventLoop 都有一個 selector,用於監聽綁定在其上的 socket 的網絡通道。
  3. NioEventLoop 內部採用串行化設計,從消息讀取->處理->編碼->發送始終由 I/O 線程 NioEventLoop 負責。
  4. NioEventLoopGroup 下包含多個 NioEventLoop。
  5. 每一個 NioEventLoop 中包含一個 Selector,一個 taskQueue。
  6. 每一個 NioEventLoop 的 Selector 能夠註冊監聽多個 NioChannel。
  7. 每一個 NioChannel 只會綁定惟一的 NioEventLoop。
  8. 每一個 NioChannel 都會綁定一個本身的 ChannelPipeLine。

Netty 核心組件

BootStrap、ServerBootStrap

一個 Netty 應用一般由一個 BootStrap 開始,主要做用是配置整個 Netty 程序,串聯各個組件,Netty 中的 BootStrap 類是客戶端程序的啓動引導類,ServerBootStrap 是服務端啓動引導類。

經常使用方法:

方法 含義
public ServerBootstrap group(EventLoopGroup parentGroup , EventLoopGroup childGroup) 做用於服務器端,用來設置兩個 EventLoop
public B group(EventLoopGroup group) 做用於客戶端,用來設置一個 EventLoopGroup
public B channel(Class<? extends C> channelClass) 用來設置一個服務端的通道實現
public <T> B option(ChannelOption<T> option, T value) 用來給 ServerChannel 添加配置
public <T> ServerBootStrap childOption (ChannelOption<T> childOption, T value) 用來給接收到的通道添加配置
public ServerBootstrap childHandler (ChannelHandler childHandler) 用來設置業務處理類(自定義 handler)
public B handler(ChannelHandler handler) Handler 則在服務器端自己 bossGroup 中使用
public ChannelFuture bind(int inetPort) 用於服務端,設置佔用的端口號
public ChannelFuture connect (String inetHost,int inetPort) 該方法用於客戶端,用來鏈接服務器

Future、ChannelFuture

Netty 中全部操做都是異步的,不能當即得知消息是否被正確處理,但能夠過一會等它執行完成或直接註冊一個監聽器,具體實現經過 Future 和 ChannelFuture,它們能夠註冊一個監聽,當操做執行成功或失敗時,監聽會自動觸發註冊的監聽事件。

經常使用方法:

方法 含義
Channel channel() 返回當前正在進行 I/O 操做的通道
ChannelFuture sync() 等待異步操做執行完畢

Channel

  1. Channel 是 Netty 網絡通訊組件,可以用於執行網絡 I/O 操做。
  2. 經過 Channel 可得到當前網絡鏈接的通道狀態、配置參數(好比緩衝區大小)。
  3. Channel 提供異步的網絡 I/O 操做(創建鏈接,讀寫,綁定端口),異步調用意味着任何 I/O 調用都將當即返回,但不保證在調用結束時所請求的 I/O 操做已完成。
  4. 調用當即返回一個 ChannelFuture 實例,經過註冊監聽器,能夠在 I/O 操做成功、失敗或取消時回調通知調用方。
  5. 支持關聯 I/O 操做與對應的處理程序。
  6. 不一樣協議、不一樣的阻塞類型的鏈接是不一樣的,Channel 類型與之對應。

經常使用的 Channel 類型有:

方法 含義
NioSocketChannel 異步的客戶端 TCP Socket 鏈接
NioServerSocketChannel 異步的服務端 TCP Socket 鏈接
NioDatagramChannel 異步的 UDP 鏈接
NioStcpChannel 異步的客戶端 Sctp 鏈接
NioSctpServerChannel 異步的服務端 Sctp 鏈接

Selector

  • Netty 基於 Selector 對象實現 I/O 多路複用,經過 Selector 一個線程能夠監聽多個鏈接的 Channel 事件。
  • 當向一個 Selector 中註冊 Channel 後,Selector 內部的機制就能夠自動不斷地查詢(select)這些 Channel 中是否有就緒的 I/O 事件(可讀、可寫、完成網絡鏈接等),這樣程序就能夠簡單地使用一個線程高效地管理多個 Channel。

ChannelHandler

  • ChannelHandler 是一個接口,處理 I/O 事件或攔截 I/O 操做,並將其轉發到其 ChannelPipeline(業務處理鏈)中的下一個處理程序。
  • ChannelHandler 自己並無提供不少方法,由於這個接口有許多的方法須要實現,方便使用期間,能夠繼承他的子類。

相關子接口和實現類

  • ChannelInboundHandler : 用於處理 Channel 入站 I/O 事件。
  • ChannelOutBoundHandler:用於處理 Channel 出站 I/O 操做。

適配器:

  • ChannelInboundHandlerAdapter:用於處理出站 I/O 操做。
  • ChanneInboundHandlerAdapter:用於處理入站 I/O 操做。
  • ChannelDuplexHandler:用於處理入站和出站事件。
以客戶端應用程序爲例:若是事件運動方向是客戶端服務器,咱們稱之爲「出站」,即客戶端發送的數據會經過 pipeline 中的一系列 ChannelOutboundHandler,並被這些 Handler 處理,反之稱爲「入站」。

Pipeline、ChannelPipeline

ChannelPipeline 是一個重點:

  1. ChannelPipeline 是一個 Handler 的集合,它負責處理和攔截 Inbound 或者 outbound 的事件和操做。
  2. ChannelPipeline 實現了一種高級形式的攔截過濾器模式,使用戶能夠徹底控制事件的處理方式,以及 Channel 中各個的 ChannelHandler 如何相互交互。
  3. 在 Netty 中每一個 Channel 都有且僅有一個 ChannelPipeline 與之對應,它們的組成關係以下:

ChannelPipeline

  1. 一個 Channel 包含了一個 ChannelPipeline,而 ChannelPipeline 中又維護了一個由 ChannelHandlerContext 組成的雙向鏈表,而且每一個 ChannelHandlerContext 中又關聯了一個 ChannelHandler。
  2. 入站事件和出站事件在一個雙向鏈表中,入站事件會從鏈表 head 日後傳遞到最後一個入站的 Handeler,出站事件會從鏈表 tail 往前傳遞到最前一個出站的 Handler,兩種類型的 Handler 互不干擾。

經常使用方法:

方法 含義
ChannelPipeline addFirst(ChannelHandler... handlers) 把一個業務處理類,放到鏈表中頭結點的位置
ChannelPipeline addLast(ChannelHandler... handlers) 把一個業務處理類,放到鏈表中尾結點的位置

ChannelHandlerContext

  • 保存 Channel 相關的全部上下文信息,同時關聯一個 ChannelHandler 對象。
  • 即 ChannelHandlerContext 中包含一個具體的事件處理器 ChannelHandler,同時 ChannelHandlerContext 中也綁定了對應的 pipeline 和 Channel 的信息,方便對 ChannelHandler 進行調用。

經常使用方法:

方法 含義
ChannelFuture close() 關閉通道
ChannelOutboundInvoker flush() 刷新
ChannelFuture writeAndFlush(Object msg) 將數據寫入到 ChannelPipeline 中當前 ChannelHandler 的下一個 ChannelHandler 開始處理(出站)

ChannelOption

  1. Netty 在建立 Channel 實例後,通常須要經過 ChannelOption 參數來配置 channel 的相關屬性。
  2. ChannelOption 參數以下:

    • ChannelOption.SO_BACKLOG:對應 TCP/IP 協議 listen 函數中的 backlog 參數,用來初始化服務器可鏈接隊列大小。服務端處理客戶端鏈接請求是順序處理的,因此同一時間只能處理一個客戶端鏈接,多個客戶端來的時候,服務端將不能處理的客戶端鏈接請求放在隊列中等待處理,backlog 參數指定了隊列的大小。
    • ChannelOption.SO_KEEPALIVE:一直保持鏈接活動狀態。

EventLoopGroup、以及實現類 NioEventLoopGroup

  1. EventLoopGroup 本質上是一個接口(interface),繼承了 EventExecutorGroup,經過繼承關係分析,能夠發現 EventLoopGroup 的實現子類是 MultithreadEventLoopGroup 下的 NioEventLoopGroup。
  2. EventLoopGroup 是一組 EventLoop 的抽象,Netty 爲了更好的利用多核 CPU 資源,通常會有多個 EventLoop 同時工做,每一個 EventLoop 維護了一個 selector 實例。
  3. EventLoopGroup 提供next接口,能夠從組裏按照必定規則獲取其中一個 EventLoop 來處理任務。在 Netty 服務器端編程中,咱們通常都須要提供兩個 EventLoopGroup,例如 BossEventLoopGroup 和 WorkerEventLoopGroup。
  4. 一般一個服務端口(ServerSocketChannel)對應一個 Selector 和一個 EventLoop 線程。BossEventLoopGroup 負責接收客戶端鏈接並將 SocketChannel 交給 WorkerEventLoopGroup 進行 I/O 處理。

  1. BossEventLoopGroup 一般是一個單線程的 EventLoop,EventLoop 維護了一個註冊了 ServerSocketChannel 的 Selector 實例。BossEventLoopGroup 不斷輪詢 Selector 將鏈接事件分離出來。
  2. 一般是 OP_ACCEPT 事件,而後將接收的 SocketChannel 交給 WorkerEventLoopGroup。
  3. WorkerEventLoopGroup 會由 next 選擇其中一個 EventLoop 將這個 SocketChannel 註冊到其維護的 Selector 並對其後續的 I/O 事件進行處理。

Unpooled

  1. Unpolled 類是 Netty 提供的專門用於操做緩衝區(即 Netty 的數據容器)的工具類。
  2. 經過給定的數據和字符編碼返回一個 ByteBuf 對象:經常使用方法:public static ByteBuf copierBuffer(CharSequence string, Charset charset)。
ByteBuf buffer = Unpooled.buffer(10);
ByteBuf buf =Unpooled.copiedBuffer("你好", CharsetUtil.UTF_8);

在 Netty 的 buffer 中,讀取 buffer 中的數據不須要經過 flip()方法進行狀態切換,其底層維護了 readerIndex 和 writerIndex

  • 0 ——> readerIndex :已讀區域。
  • readerIndex ——> writerIndex:未讀但可讀區域。
  • writerIndex ——> capacity:可寫區域。
  1. 每調用一次 byteBuf.readByte()讀取數據,byteBuf 的 readerIndex 便減小 1;調用 byteBuf.getByte()則不會引發 readerIndex 的變化。
  2. public abstract CharSequence getCharSequence(int index, int length, Charset charset) :的做用是按照某一個範圍進行數據的讀取,index 表示起始位置,length 表示讀取長度,charset 表示字符編碼格式。

Netty 實現羣聊系統

  1. 服務器端:檢測用戶上線、離線、轉發客戶端消息。
  2. 客戶端:經過 channel 能夠無阻塞發送消息給其餘客戶,同時能夠接收其餘客戶端發送的消息(服務器轉發獲得)。

Server 端

public class Server {

    private static final int port = 6667;

    public static void main(String[] args) {
        run();
    }

    /**
     * 處理客戶端請求
     */
    public static void run() {
        //建立兩個線程組
        NioEventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.group(bossLoopGroup, workerLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    //增長解碼器
                                    .addLast("decoder", new StringDecoder())
                                    //增長編碼器
                                    .addLast("encoder", new StringEncoder())
                                    //加入自定義業務處理器
                                    .addLast(new ServerHandler());

                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }

    }

}

ServerHandler

public class ServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 定義一個channel 組,管理全部的channel , GlobalEventExecutor.INSTANCE是全局事件執行器,單例模式
     */
    private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    /**
     * 鏈接創建調用,將當前channel加入channelGroup
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //提示其餘客戶端當前客戶端已上線
        channels.writeAndFlush("[客戶端]" + channel.remoteAddress() + "加入聊天!\n");

        channels.add(channel);
    }

    /**
     * 表示channel處於活動狀態,提示上線
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + ":已上線!");
    }

    /**
     * 非活動狀態提示 離線
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + ":已離線!");
    }

    /**
     * 斷開鏈接
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //提示其餘客戶端當前客戶端已斷開鏈接
        channels.writeAndFlush("[客戶端]" + channel.remoteAddress() + "斷開鏈接!\n");
    }

    /**
     * 讀取客戶端消息並轉發
     * @param channelHandlerContext
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        Channel channel = channelHandlerContext.channel();
        channels.forEach(ch -> {
            if (channel != ch) {
                ch.writeAndFlush("[客戶]: " + channel.remoteAddress() + sdf.format(new Date()) +" 說:" + msg + "\n");
            } else {
                ch.writeAndFlush(sdf.format(new Date())+" 你說:" + msg + "\n");
            }
        });
    }

    /**
     * 異常關閉
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

Client 端

public class Client {

    private static final String HOST = "127.0.0.1";
    private static final int PORT = 6667;


    public static void main(String[] args) {
        run();
    }

    public static void run() {
        NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(clientLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    //增長解碼器
                                    .addLast("decoder", new StringDecoder())
                                    //增長編碼器
                                    .addLast("encoder", new StringEncoder())
                                    .addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
            Channel channel = future.channel();
            System.out.println("客戶端:" + channel.localAddress() + " 準備就緒");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String msg = scanner.nextLine();
                //經過channel發送到服務器端
                channel.writeAndFlush(msg + "\r\n");
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            clientLoopGroup.shutdownGracefully();
        }
    }
}

ClientHandler

public class ClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

運行結果

Netty 心跳監測機制案例

客戶端同用上面的便可。記得端口對應

Server 端

public class Server {

    public static void main(String[] args) {
        //建立兩個線程組
        NioEventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossLoopGroup, workerLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))//在bossLoopGroup 增長日誌處理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 加入 IdleStateHandler
                            // 第一個參數 多長時間沒讀 就發送心跳監測包看是否鏈接
                            // 第二個參數 多長時間沒寫 就發送心跳監測包看是否鏈接
                            // 第三個參數 多長時間沒有讀寫 就發送心跳監測包看是否鏈接
                            // 第四個參數 時間單位
                            //當 觸發後 會傳遞給管道中的下一個handler來處理,調用下一個handler的userEventTriggered
                            pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
                            //加入空閒檢測處理的handler
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(7000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }
    }

}

ServerHandler

public class ServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * @param ctx 上下文
     * @param evt 事件
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            //將evt轉型
            IdleStateEvent event = (IdleStateEvent) evt;
            SocketAddress socketAddress = ctx.channel().remoteAddress();
            switch (event.state()){
                case READER_IDLE:
                    System.out.println(socketAddress + "發生讀空閒");
                    break;
                case WRITER_IDLE:
                    System.out.println(socketAddress + "發生寫空閒");
                    break;
                case ALL_IDLE:
                    System.out.println(socketAddress + "發生讀寫空閒");
                    break;
            }

        }
    }
}

運行結果

WebSocket

Http 短鏈接和長鏈接

  • Http 短鏈接即 TCP 短鏈接,即客戶端和服務器經過「三次握手」創建鏈接後,進行一次 HTTP 操做之後,便斷開鏈接。所以,瀏覽器每打開一個 web 資源,便建立了一個新的 http 會話。
  • Http 長鏈接即 TCP 長鏈接,即客戶端和服務器創建鏈接後保持必定的時間,即便用戶在進行某次操做後將瀏覽器(或客戶端)關閉,但只要在保持時間內又一次訪問該服務器,則默認使用已經建立好的鏈接。
  • Http1.0 默認支持短鏈接,Http1.1 默認支持長鏈接。

Http 鏈接無狀態

  • Http 協議無狀態是指協議對於事務處理沒有記憶性,即某一次打開一個服務器的網頁和上一次打開這個服務器的網頁之間沒有關係。

WebSocket 簡介

  • WebSocket 是一種能夠在單個 TCP 鏈接上實現全雙工通訊的通訊協議,HTTP 協議只能實現客戶端請求,服務端響應的單向通訊,而 webSocket 則能夠實現服務端主動向客戶端推送消息。
  • WebSocket 複用了 HTTP 的握手通道,客戶端和服務器的數據交換則遵守升級後的協議進行:WebSocket 相關的業務處理器能夠將 HTTP 協議升級爲 ws 協議,其核心功能之一爲保持穩定的長鏈接。

代碼案例

  • 實現基於 webSocket 的長鏈接全雙工交互。
  • 改變 HTTP 協議屢次請求的約束,實現長鏈接,服務器能夠發送消息給瀏覽器。
  • 客戶端和服務器會相互感知。若服務器關閉,客戶端會感知;一樣客戶端關閉,服務器也會感知。

Server 端

public class Server {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline pipeline = ch.pipeline();

                            //由於基於http協議,故使用http的編解碼器
                            pipeline.addLast(new HttpServerCodec());
                            //過程當中以塊的方式寫,添加 ChunkedWriteHandler 處理器
                            pipeline.addLast(new ChunkedWriteHandler());
                            /**
                             * 說明
                             * 一、http數據在傳輸過程當中是分段的,HttpObjectAggregator 能夠將多個數據段整合起來
                             * 二、所以,當瀏覽器發送大量數據時,就會發出屢次http請求
                             * */
                            pipeline.addLast(new HttpObjectAggregator(8192));
                            /**
                             * 說明
                             * 一、對於 WebSocket,它的數據以 幀(Frame)的形式傳遞
                             * 二、能夠看到 WebSocketFrame 下面有6個子類
                             * 三、瀏覽器請求時 ws://localhost:7000/xxx 表示請求的uri
                             * 四、WebSocketServerProtocolHandler 會把 http 協議升級爲ws協議
                             *      即保持長鏈接----------核心功能
                             * 五、如何升級——經過狀態瑪切換101
                             */
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));

                            //自定義的 handler 處理業務邏輯
                            pipeline.addLast(new TextWebSocketFrameHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

Handler

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        System.out.println("服務器收到消息:" + textWebSocketFrame.text());
        //回覆消息
        channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame("服務器時間:" + LocalDateTime.now() + " " + textWebSocketFrame.text()));
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded 被調用:" + ctx.channel().id().asLongText());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved 被調用:" + ctx.channel().id().asLongText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("異常發生"+cause.getMessage());
        ctx.close();
    }
}

HTML

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <title>Title</title>
  </head>
  <body>
    <script>
      var socket;
      //判斷當前瀏覽器是否支持webSocket編程
      if (window.WebSocket) {
        //go on
        socket = new WebSocket("ws://localhost:7000/hello");
        //至關於channelRead0,收到服務器端回送的消息
        socket.onmessage = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = rt.value + "\n" + ev.data;
        };
        //至關於鏈接開啓
        socket.onopen = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = "鏈接開啓";
        };

        socket.onclose = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = rt.value + "\n" + "鏈接關閉";
        };
      } else {
        alert("當前瀏覽器不支持webSocket");
      }
      //發送消息到服務器
      function send(message) {
        if (!window.socket) {
          //先判斷socket是否建立好了
          return;
        }
        if (socket.readyState == WebSocket.OPEN) {
          //經過socket發送消息
          socket.send(message);
        } else {
          alert("鏈接沒有開啓");
        }
      }
    </script>
    <form onsubmit="return false">
      <textarea name="message" style="height: 300px; width: 300px"></textarea>
      <input
        type="button"
        value="發送消息"
        onclick="send(this.form.message.value)"
      />
      <textarea
        id="responseText"
        style="height: 300px; width: 300px"
      ></textarea>
      <input
        type="button"
        value="清空內容"
        onclick="document.getElementById('responseText').value=''"
      />
    </form>
  </body>
</html>

運行結果

html

服務端

編碼和解碼

  1. 數據在網絡中是以二進制字節碼的形式流動,而咱們在接收或發送的數據形式則各類各樣(文本、圖片、音視頻等),所以須要在發送端對數據進行編碼,在接收端對收到的數據解碼;
  2. codec(編解碼器)的組成部分——Encoder(編碼器)負責將業務數據轉換爲二進制字節碼;Decoder(解碼器)負責將二進制字節碼轉換爲業務數據。
  3. Netty 編碼機制——StringEncoder / StringDecoder負責字符串數據對象的編解碼;ObjectEncoder / ObjectDecoder負責 java 對象的編解碼。
  4. Netty 自帶的 ObjectEncoder 和 ObjectDecoder 能夠用於實現 POJO 對象或其餘業務對象的編解碼,其底層使用的還是 java 的序列化技術,存在如下問題:

    • 沒法實現客戶端與服務器端的跨語言。
    • 序列化體積過大,是二進制字節碼的 5 倍多。
    • 序列化性能相對較低。

ProtoBuf 概述

  • ProtoBuf 是 Google 發佈的開源項目,全稱 Google Protocol Buffers,ProtoBuf 是一種平臺無關、語言無關的、可擴展且輕便高效的序列化數據結構的協議,適合用於數據存儲和 RPC(遠程過程調用)數據交換格式。
  • ProtoBuf 是以Message的方式來管理數據的。
  • 所謂「平臺無關、語言無關」,即客戶端和服務器可使用不一樣的編程語言進行開發。
  • ProtoBuf 具備更高的性能和可靠性。
  • 使用 ProtoBuf 編譯器能夠自動生成代碼,ProtoBuf 是把類的定義使用.proto文件描述出來,在經過 proto.exe 將.proto 文件編譯爲.java 文件。

protoBuf

ProtoBuf 使用

第一步:idea 加入插件 protoc

第二步:加入 maven 依賴

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>3.6.1</version>
</dependency>

第三步:編寫 proto 文件

syntax = "proto2";  //版本
option java_outer_classname = "StudentPOJO"; //生成的外部類名稱,同時文件名
//protobuf以message的形式管理數據
message Student{ //會在 studentPOJO 外部類生成一個內部類 Student,它是真正發送的POJO對象
  required int32 id = 1; //表示 Student 類中有一個屬性 名字爲id,類型爲 int32(protoType),1表示屬性的序號
  required string name = 2;
}

根據網上教程安裝 protobuf。生成 StudnetPOJO 文件,這裏就不展現代碼了,比較長。

Server 端

public class Server {

    public static void main(String[] args) throws InterruptedException {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline pipeline = ch.pipeline();
                            //在pipeline中加入ProtoBufferDecoder
                            //指定對哪種對象進行解碼
                            pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            ChannelFuture cf = serverBootstrap.bind(6668).sync();
            //給 cf 添加監聽器,監聽感興趣的事件
            cf.addListener((ChannelFutureListener) future -> {
                if (cf.isSuccess()) {
                    System.out.println("綁定端口 6668 成功");
                } else {
                    System.out.println(cf.cause());
                }
            });
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

ServerHandler

public class ServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端~", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
      System.out.println("客戶端發送: id = " + msg.getId() + " 名字 = " + msg.getName());
    }

}

Client 端

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        //發送一個 student 對象到服務器
        StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1000).setName("Jack").build();
        ctx.writeAndFlush(student);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服務器回送消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服務器端地址:" + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ClientHandler

public class Client {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //在pipeline中加入ProtoBufferEncoder
                            ChannelPipeline pipeline = ch.pipeline();
                            //編碼
                            pipeline.addLast("encoder", new ProtobufEncoder());
                            pipeline.addLast(new ClientHandler());

                        }
                    });
            System.out.println("客戶端已準備就緒");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }

}

運行結果

服務端

客戶端

handler 調用機制

  • ChannelHandler 充當了處理入站和出站數據的應用程序邏輯的容器。例如:實現 ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter),能夠接收入站事件和數據,這些數據將被業務邏輯處理;當給客戶端回送響應時,也能夠經過 ChannelInboundHandler 沖刷數據。業務邏輯一般寫在一個或多個 ChannelInboundHandler 中。
  • ChannelOutboundHandler 與之相似,只不過是用來處理出站數據的。
  • ChannelPipeline 提供了 ChannelHandler 鏈的容器(pipeline.addLast()能夠將一系列的 handler 以鏈表的形式添加),以客戶端應用程序爲例,若是事件運動方向爲客戶端->服務器,稱之爲「出站」,即客戶端發送給服務器的數據經過 pipeline 中的一系列 ChannelOutboundHandler,並被這些 handler 處理。反之則稱爲「入站」。

編碼解碼器

  1. 當 Netty 發送或者接受一個消息的時候,就將會發生一次數據轉換。入站消息會被解碼:從字節碼轉換到另外一種格式(好比 Java)。若是是出站消息,它會被編碼成字節。
  2. Netty 提供一系列使用的編解碼器,它們都實現了 CHannelInboundHandler 或者 ChannelOutboundHandler 接口。在這些類中,channelRead 方法已經被重寫。

    以入站爲例,對於每一個從入站 Channel 讀取的消息,這個方法會被調用。隨後,他將調用由解碼器所提供的 decode()方法進行解碼,並將已經解碼的字節轉發給 ChannelPipeline 中的下一個 ChannelInboundHandler。

消息入站後,會通過 ChannelPipeline 中的一系列 ChannelHandler 處理,這些 handler 中有 Netty 已經實現的,也有咱們從新實現的自定義 handler,但它們都須要實現 ChannelInboundHandler 接口;即消息入站後所通過的 handler 鏈是由一系列 ChannelInboundHandler 組成的,其中第一個通過的 handler 就是解碼器 Decoder;消息出站與入站相似,但消息出站須要通過一系列 ChannelOutboundHandler 的實現類,最後一個通過的 handler 是編碼器 Encoder。

解碼器 — ByteToMessageDecoder

關係繼承圖

因爲不知道遠程節點是否會發送一個完整的信息,TCP 可能出現粘包和拆包的問題。ByteToMessageDecoder 的做用就是對入站的數據進行緩衝,直至數據準備好被處理。

ByteToMessageDecoder 示例分析:

public class ToIntgerDecoder extends ByteToMessageDecoder{
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception{
                if (in.readableBytes() >= 4) {
                         out.add(in.readint());
                }
        }
}

在此實例中,假設經過 Socket 發送了 8 字節數據,每次入站從 ByteBuf 中讀取個 4 字節,將其解碼爲一個 int,並加入一個 List 中。當沒有更多的元素能夠被添加到該 List 中時,表明這次發送的數據已發送完成,List 中的全部內容會被髮送給下一個 ChannelInboundHandler。Int 在被添加到 List 中時,會被自動裝箱爲 Intger,調用 readInt()方法前必須驗證所輸入的 ByteBuf 是否有足夠的數據。

代碼示例:

  • 使用自定義的編碼解碼器
  • 客戶端能夠發送一個 Long 類型的數據給服務器。

Server 端

public class Server {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        try {
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ServerInitializer()); //自定義初始化類
            ChannelFuture future = serverBootstrap.bind(7000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

ServerInitializer 自定義初始化類

public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //入站的handler解碼
        pipeline.addLast(new ByteToLongDecoder()).addLast(new ServerInboundHandler());
    }
}

ByteToLongDecoder 自定義解碼器

public class ByteToLongDecoder extends ByteToMessageDecoder {

    /**
     * @param channelHandlerContext 上下文對象
     * @param byteBuf               入站的ByteBuf
     * @param list                  List集合,將解碼後的數據傳給下一個Handler
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Long 大於 8個字節
        if (byteBuf.readableBytes() >= 8) {
            list.add(byteBuf.readLong());
        }
    }
}

ServerInboundHandler 自定義 handler,處理業務

public class ServerInboundHandler extends SimpleChannelInboundHandler<Long> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Long aLong) throws Exception {

        System.out.println("從客戶端讀取:" + aLong);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Client 端

public class Client {

    public static void main(String[] args) {
        NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(clientLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ClientInitializer());//自定義初始化類
            ChannelFuture future = bootstrap.connect("127.0.0.1", 7000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            clientLoopGroup.shutdownGracefully();
        }
    }
}

ClientInitializer 客戶端自定義初始化類

public class ClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //出站,數據進行編碼
        pipeline.addLast(new LongToByteEncoder()).addLast(new ClientHandler());
    }
}

LongToByteEncoder 編碼器

public class LongToByteEncoder extends MessageToByteEncoder<Long> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Long aLong, ByteBuf byteBuf) throws Exception {
        System.out.println("開始編碼,msg = " + aLong);
        byteBuf.writeLong(aLong);
    }
}

ClientHandler 自定義 handler,處理邏輯

public class ClientHandler extends SimpleChannelInboundHandler<Long> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println("服務器的ip : " + ctx.channel().remoteAddress());
        System.out.println("收到服務器數據 : " + msg);
    }

    /**
     * 發送數據
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client發送數據");
        ctx.writeAndFlush(12345678L);
    }

}

運行結果

客戶端

服務端

其餘解碼器

  1. LineBasedFrameDecoder:它使用行尾控制字符(\n 或\r\n)做爲分割符來解析數據;
  2. DelimiterBasedFrameDecoder:使用自定義的特殊字符做爲分隔符;
  3. HttpObjectDecoder:一個 HTTP 數據的解碼器;
  4. LengthFieldBasedFrameDecoder:經過指定長度來標識整包信息,這樣就能夠自動的處理粘包和半包信息

TCP 粘包和拆包基本介紹

  • TCP 是面向鏈接,面向流,提供高可靠性服務。在消息收發過程當中,須要在發送端和接收端創建對應的 Socket,發送端不會一有數據就進行發送,而是將屢次間隔較小的,數據量較小的數據合併成必定長度的數據包總體發送。這樣能夠提升效率,但會給接收方分辨單個數據消息增長難度,由於面向流的通訊是沒有消息保護邊界的。
  • TCP 粘包與拆包,是指發送端在發送多個數據消息時出現的不一樣情形。因爲數據在發送前須要先轉換爲二進制字節碼,當多個數據消息的字節碼被合併成一個數據包發送時,稱爲粘包;當某個數據消息的字節碼被劃分到幾個數據包內發送時,稱爲拆包粘包拆包可能使接收端解碼數據包時出現錯誤。
  • TCP 粘包和拆包的解決方案:使用自定義協議+編解碼器解決,只要接收端可以知道每次讀取數據的長度,就能夠按位讀取,避免出現讀取錯誤。咱們須要作的就是使接收端知道每次讀取數據的長度。

TCP粘包、拆包圖解

TCP 粘包拆包代碼演示

Server 端

public class Server {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        try {
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ServerInitializer()); //自定義初始化類
            ChannelFuture future = serverBootstrap.bind(7000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

ServerInitializer

public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new ServerHandler());
    }
}

ServerHandler

public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
        byte[] buffer = new byte[buf.readableBytes()];
        buf.readBytes(buffer);

        //將buffer轉換成字符串
        String str = new String(buffer, CharsetUtil.UTF_8);
        System.out.println("服務端接收到數據:" + str);
        System.out.println("服務端接收次數:" + ++count);

        ByteBuf byteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(), CharsetUtil.UTF_8);
        ctx.writeAndFlush(byteBuf);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Client 端

public class Client {

    public static void main(String[] args) {
        NioEventLoopGroup clientLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(clientLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ClientInitializer());//自定義初始化類
            ChannelFuture future = bootstrap.connect("127.0.0.1", 7000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            clientLoopGroup.shutdownGracefully();
        }
    }
}

ClientInitializer

public class ClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new ClientHandler());
    }
}

ClientHandler

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        String str = new String(bytes, CharsetUtil.UTF_8);
        System.out.println("客戶端接收到數據: " + str);
        System.out.println("客戶端接收次數:" + ++count);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //發送十條數據
        for (int i = 0; i < 10; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer("hello,server" + i, CharsetUtil.UTF_8);
            ctx.writeAndFlush(byteBuf);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

運行結果

能夠看到在第一個客戶端沒有發生問題,啓動第二個客戶端後就發生了拆包問題。

服務端

自定義協議解決粘包拆包

  1. 要求客戶端發送 5 個 message 對象,客戶端每次發送一個 message 對象。
  2. 服務器端每次接收一個 message,分 5 次進行解碼,每讀取一個 message,會回送一個 message 對象給客戶端。

使用自定義協議+編解碼器實現具體功能:

具體代碼

客戶端與服務器主程序與以前相同

MessageProtocol 自定義協議

public class MessageProtocol {

    private int length;  //關鍵
    private byte[] context;

    public int getLength() {
        return length;
    }

    public byte[] getContext() {
        return context;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public void setContext(byte[] context) {
        this.context = context;
    }

MessageEncoder 自定義編碼器

public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MessageEncoder encode方法被調用");
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContext());

    }
}

MessageDecoder.自定義解碼器

public class MessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MessageDecoder decode方法被調用");
        //將獲得的二進制字節碼轉換爲 MessageProtocol 數據包
        int length = in.readInt();
        byte[] content = new byte[length];

        in.readBytes(content);

        //封裝成MessageProtocol對象,放入out中交給下一個handler處理
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLength(length);
        messageProtocol.setContext(content);

        out.add(messageProtocol);
    }
}

在 ServerInitializer 和 ClientInitializer 中增長 addList()編解碼器

ServerHandler

public class ServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    private int count;


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {

        //接收數據並處理
        int len = msg.getLength();
        byte[] context = msg.getContext();

        System.out.println("服務端接收到信息以下");
        System.out.println("數據長度:"+len);
        System.out.println("內容:"+new String(context, CharsetUtil.UTF_8));

        System.out.println("服務器接收到協議包數量 = "+(++this.count));

        //回覆消息
        String response = UUID.randomUUID().toString();
        int responseLen = response.getBytes("utf-8").length;
        byte[] responseBytes = response.getBytes("utf-8");
        //構建一個協議包
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLength(responseLen);
        messageProtocol.setContext(responseBytes);

        ctx.writeAndFlush(messageProtocol);
    }
}

ClientHandler

public class ClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    private int count;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客戶端循環發送10條數據

        for (int i=0;i<5;i++){
            String mes = "今天下雨,出門帶傘";
            byte[] content = mes.getBytes(Charset.forName("utf-8"));

            int length = mes.getBytes(Charset.forName("utf-8")).length;

            //建立協議包
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLength(length);
            messageProtocol.setContext(content);

            ctx.writeAndFlush(messageProtocol);
        }
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("異常消息 = "+cause.getMessage());
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        int len = msg.getLength();
        byte[] msgContext = msg.getContext();

        System.out.println("客戶端接收的消息以下:");
        System.out.println("消息長度 = "+len);
        System.out.println("消息內容 = "+new String(msgContext, CharsetUtil.UTF_8));

        System.out.println("客戶端接收消息的數量 = "+(++this.count));
    }
}
相關文章
相關標籤/搜索