史詩級最強教科書式「NIO與Netty編程」

史詩級最強教科書式「NIO與Netty編程」java

 

1.1 概述編程

java.nio全稱java non-blocking IO,是指JDK1.4開始提供的新API。從JDK1.4開始,Java提供了一系列改進的輸入/輸出的新特性,也被稱爲NIO(既New IO),新增了許多用於處理輸入輸出的類,這些類都被放在java.nio包及子包下,而且對原包中的不少類進行改寫,新增類知足NIO的功能。
NIO和BIO有着相同的目的和做用,可是它們的實現方式徹底不一樣,BIO以流的方式處理數據,而NIO以塊的方式處理數據,塊I/O的效率比流I/O高不少。另外,NIO是非阻塞式的,這一點跟BIO也很不相同,使用它能夠提供非阻塞式的高伸縮性網絡。
NIO主要有三大核心部分 :Channel(通道),Buffer(緩衝區),Selector(選擇器)。傳統的BIO基於字節流和字符流進行操做,而NIO基於Channel和Buffer(緩衝區)進行操做,數據老是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中。Selector(選擇區)用於監聽多個通道的事件(好比 :鏈接打開,數據到達)。所以使用單個線程就能夠監聽多個數據管道。bootstrap

 

1.2 文件IO數組

1.2.1 概述和核心API服務器

緩衝區(Buffer):其實是一個容器,是一個特殊的數組,緩衝區對象內置流一些機制,可以跟蹤和記錄緩衝區的狀態變化狀況。Channel提供從文件、網絡讀取數據的渠道,可是讀取或寫入的數據都必須經由Buffer,以下圖所示 :網絡


在NIO中,Buffer是一個頂層父類,它是一個抽象類,經常使用的Buffer子類有:
ByteBuffer,存儲字節數據到緩衝區
ShortBuffer,存儲字符串數據到緩衝區
CharBuffer,存儲字符數據到緩衝區
IntBuffer,存儲整數數據到緩衝區
LongBuffer,存儲長整型數據到緩衝區
DoubleBuffer,存儲小數到緩衝區
FloatBuffer,存儲小數到緩衝區
對於Java中的基本數據類型,都有一個具體Buffer類型與之相對應,最經常使用的天然是ByteBuffer類(二進制數據),該類的主要方法以下所示 :
ByteBuffer類(二進制數據),該類的主要方法以下所示 :
public abstract ByteBuffer put(byte[] b);存儲字節數據到緩衝區
public abstract byte[] get();從緩衝區得到字節數據
public final byte[] array();把緩衝區數據轉換成字節數組
public static ByteBuffer allocate(int capacity);設置緩衝區的初始容量
public static ByteBuffer wrap(byte[] array);把一個現成的數組放到緩衝區中使用
public final Buffer flip();翻轉緩衝區,重置位置到初始位置
管道(Channel) :相似於BIO中的stream,例如FileInputStream對象,用來創建到目標(文件,網絡套接字,硬件設備等)的一個鏈接,可是須要注意 :BIO中的stream是單向的,例如FileInputStream對象只能進行讀取數據的操做,而NIO中的通道(Channel)是雙向的,既能夠用來進行讀操做,也能夠用來進行寫操做。經常使用的Channel類有:FileChannel、DatagramChannel、ServerSocketChannel和SocketChannel。FileChannel用於文件的數據讀寫,DatagramChannel、ServerSocketChannel和SocketChannel。FileChannel用於文件的數據讀寫,DatagramChannel用於UDP的數據讀寫,ServerSocketChannel和SocketChannel用於TCP的數據讀寫。多線程


FileChannel類,該類主要用來對本地文件進行IO操做,主要方法以下所示 :
public int read(ByteBuffer dst),讀取數據並放到緩衝區中
public int write(ByteBuffer src) , 把緩衝區的數據寫到通道中
public long transferFrom(ReadableByteChannel src,long position,long count),從目標通道中複製數據
public long transferTo(long position,long count,WritableByteChannel target),把數據從當前通道複製給目標通道架構

1.2.2 案例併發

1.往本地文件中寫數據框架

@Test public void contextLoads() throws Exception { String str = "hello,nio,我是谷"; // 建立輸出流  FileOutputStream fileOutputStream = new FileOutputStream("basic.txt"); // 從流中獲得一個通道  FileChannel fileChannel = fileOutputStream.getChannel(); // 提供一個緩衝區  ByteBuffer allocate = ByteBuffer.allocate(1024); // 往緩衝區中存入數據  allocate.put(str.getBytes()); // 當數據寫入到緩衝區中時,指針指向數據最後一行,那麼緩衝區寫入通道中輸出時,是從最後一行數據開始寫入,  // 這樣就會致使寫入1024的剩餘沒有數據的空緩衝區。因此須要翻轉緩衝區,重置位置到初始位置  allocate.flip(); // 把緩衝區寫到通道中,通道負責把數據寫入到文件中  fileChannel.write(allocate); // 關閉輸出流,由於通道是輸出流建立的,因此會一塊兒關閉  fileOutputStream.close(); }

NIO中通道是從輸出流對象裏經過getChannel方法獲取到的,該通道是雙向的,既能夠讀,又能夠寫。在往通道里寫數據以前,必須經過put方法把數據存到ByteBuffer中,而後經過通道write能夠寫數據。在write以前,須要調用flip方法翻轉緩衝區,把內部定位到初始位置,這樣在接下來寫數據時才能把全部數據寫到通道里。運行效果以下圖 :

@Test  // 從本地文件中讀取數據
public void test2() throws Exception {
	File file = new File("basic.text");
	// 1. 建立輸入流
	FileInputStream fis = new FileInputStream(file);
	// 2. 獲得一個通道
	FileChannel fc = fis.getChannel();
	// 3. 準備一個緩衝區
	ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
	// 4. 從通道里讀取數據並存到緩衝區中
	fc.read(buffer);
	System.out.println(new String(buffer.array));
	// 5.關閉
	fis.close();
}
@Test // 使用NIO實現文件複製
public void test3() throws Exception {
	//1. 建立兩個流
	FileInputStream fis = new FileInputStream("basic.text");
	FileOutputStream fos = new FileOutputStream("c:\\test\\basic.text");
	// 2. 獲得兩個通道
	FileChannel sourceFc = fis.getChannel();
	FileChannel destFc = fos.getChannel();
	//3. 複製
	destFc.transferFrom(sourceFc,0,sourceFc.size());
	//4.關閉
	fis.close();
	fos.close();
}

1.3 網絡IO

1.3.1 概述和核心API

前面在進行文件IO時用到的FileChannel並不支持非阻塞操做,學習NIO主要就是進行網絡IO,Java NIO中的網絡通道是非阻塞IO的實現,基於事件驅動,很是適用於服務器須要維持大量鏈接,可是數據交換量不大的狀況,例如一些即時通信的服務等待。
在Java中編寫Socket服務器,一般有如下幾種模式 :
一個客戶端鏈接用一個線程,優勢 :程序編寫簡單;缺點 :若是鏈接很是多,分配的線程也會很是多,服務器可能會由於資源耗盡而崩潰。
把每個客戶端鏈接交給一個擁有固定數量線程的鏈接池,優勢 : 程序編寫相對簡單,能夠處理大量的鏈接。缺點 :線程的開銷很是大,鏈接若是很是多,排到現象會比較嚴重。
使用Java的NIO,用非阻塞的IO方式處理。這種模式能夠用一個線程,處理大量的客戶端鏈接。
1。Selector,選擇器,可以檢測多個註冊的通道上是否有事件發生,便獲取事件而後針對每一個事件進行相應的響應處理。這樣就能夠只用一個單線程去管理多個通道,也就是管理多個鏈接。這樣使得只用在鏈接真正有讀寫事件發生時,纔會調用函數來進行讀寫,就大大地減小了系統開銷,而且沒必要爲每一個鏈接都建立一個線程,不用去維護多個線程,而且避免了多線程之間的上下文切換致使的開銷。
該類的經常使用方法以下所示 :
public static Selector open(),獲得一個選擇器對象
public int select(long timeout),監控全部註冊的channel,當其中有註冊的IO操做能夠進行時,將對應的SelectionKey加入到內部集團中並返回,參數用來設置超時時間
public Set selectedKeys(),從內部集合中獲得全部的SelectionKey
2. SelectionKey,表明了Selector和serverSocketChannel的註冊關係,一共四種 :
int OP_ACCEPT :有新的網絡鏈接能夠accept,值爲16
int OP_CONNECT : 表明鏈接已經創建,值爲8
int OP_READ和int OP_WRITE : 表明了讀、寫操做,值爲1和4
該類的經常使用方法以下所示 :
public abstract Selector selector(),獲得與之關聯的Selector對象
public abstract SelectorChannel channel(),獲得與之關聯的通道
public final Object attachment(),獲得與之關聯的共享數據
public abstract SelectionKey interestOps(int ops),設置或改變監聽事件
public final boolean isAcceptable(),是否能夠accept
public final boolean isReadable(),是否能夠讀
public final boolean isWritable(),是否能夠寫
3. ServerSocketChannel,用來在服務器端監聽新的客戶端Socket鏈接,經常使用方法以下所示 :
public static ServerSocketChannel open(),獲得一個ServerSocketChannel通道
public final ServerSocketChannel bind(SocketAddress local),設置服務器端端口號
public final SelectableChannel configureBlocking(boolean block),設置阻塞或非阻塞模式,取值false表示採用非阻塞模式
public SocketChannel accept(),接受一個鏈接,返回表明這個鏈接的通道對象
public final SelectionKey register(Selector sel,int ops),註冊一個選擇器並設置監聽事件
4. SocketChannel,網絡IO通道,具體負責進行讀寫操做。NIO老是把緩衝區的數據寫入通道,或者把通道里的數據讀出到緩衝區(buffer)。經常使用方法以下所示 :
public static SocketChannel open(),獲得一個SocketChannel通道
public final SelectableChannel configureBlocking(boolean block),設置阻塞或非阻塞模式,取值false表示採用非阻塞模式
public boolean connect(SocketAddress remote),鏈接服務器
public boolean finishConnect(),若是上面的方法鏈接失敗,接下來就要經過該方法完成鏈接操做
public int write(ByteBuffer src),往通道里寫數據
public int read(ByteBuffer dst),從通道里讀數據
public final SelectionKey register(Selector sel,int ops,Object att),註冊一個選擇器並設置監聽事件,最後一個參數能夠設置共享數據
public final void close(),關閉通道

3.4 AIO編程

JDK1.7引入了Asynchronous I/O,既AIO。再進行I/O編程中,經常使用到兩種模式 :Reactor和Proactor。Java的NIO就是Reactor,當有事件觸發時,服務器端獲得通知,進行相應的處理。
AIO即NIO2.0,叫作異步不阻塞的IO。AIO引入異步通道的概念,採用 了Proactor模式,簡化了程序編寫,一個有效的請求才啓動一個線程,它的特色是先有操做系統完成後才通知服務端程序啓動線程去處理,通常適用於鏈接數較多且鏈接時間長的應用。

3.5 IO對比總結

IO的方式一般分爲幾種,同步阻塞的BIO、同步非阻塞的NIO、異步非阻塞的AIO。
BIO方式適用於鏈接數目比較小且固定的架構,這種方式對服務器資源要求比較高,併發侷限於應用中,JDK1.4之前的惟一選擇,但程序直觀簡單易理解。
NIO方式適用於鏈接數據多且鏈接比較短(輕操做)的架構,好比聊天服務器,併發侷限於應用中,編程比較複雜,JDK1.4開始支持。
AIO方式適用於鏈接數目多且鏈接比較長(重操做)的架構,好比相冊服務器,充分調用OS參與併發操做,編程比較複雜,JDK7開始支持。

4.1 概述

Netty是由JBOSS提供的一個Java開源框架。Netty提供異步的、基於事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡IO程序。
Netty是一個基於NIO的網絡編程框架,使用Netty能夠幫助你快速、簡單的開發出一個網絡應用,至關於簡化和流程化了NIO的開發過程。做爲當前最流行的NIO框架,Netty在互聯網領域、大數據分佈式計算領域、遊戲行業、通訊行業等得到了普遍的應用,知名的Elasticsearch、Dubbo框架內部都採用了Netty。

 

4.2 Netty總體設計

4.2.1 線程模型

1.單線程模型


服務器端用一個線程經過多路複用搞定全部的IO操做(包括鏈接,讀、寫等),編碼簡單,清洗明瞭,可是若是客戶端鏈接數量較多,將沒法支撐,咋們前面的NIO案例就屬於這種模型。
2. 線程池模型
服務器端採用一個線程專門處理客戶端鏈接請求,採用一個線程組負責IO操做。在絕大數場景下,該模型都能知足使用。


3.Netty模型


Netty抽象出兩組線程池,BossGroup專門負責接收客戶端鏈接,WorkderGroup專門負責網絡讀寫操做。NioEventLoop表示一個不斷循環執行處理任務的線程,每一個NioEventLoop都有一個selector,用於監聽綁定在其上的socket網絡通道。NioEventLoop內部採用串行化設計,從消息的讀取-》解碼-》處理-》編碼-》發送,始終由IO線程NioEventLoop負責。
一個NioEventLoopGroup下包含多個NioEventLoop
每一個NioEventLoop中包含有一個Selector,一個taskQueue
每一個NioChannel只會綁定在惟一的NioEventLoop上
每一個NioChannel都綁定有一個本身的ChannelPipeline

4.2.2 異步模型

FUTURE、CALLBACK和HANDLER
Netty的異步模型是創建在future和callback的之上的。Future的核心思想是 :假設一個方法fun,計算過程可能很是耗時,等待fun返回顯然不適合。那麼能夠在調用fun的時候,立馬返回一個Future,後續能夠經過Future去監控方法fun的處理過程。
在使用Netty進行編程時,攔截操做和轉換出入站數據只須要提供callback或利用future便可。這使得鏈式操做簡單、高效,並有利於編寫可重用的、通用的代碼。Netty框架的目標就是讓你的業務邏輯從網絡基礎應用編碼中分離出來、解脫出來。

 

4.3 核心API

 

  • ChannelHandler及其實現類
    ChannelHandler接口定義了許多事件處理的方法,咱們能夠經過重寫這些方法區實現具體的業務邏輯。API關係以下圖所示:

 


自定義一個Handler類去繼承ChannelInboundHandlerAdapter,而後經過重寫相應方法實現業務邏輯,通常都須要重寫如下方法:
public void channelActive(ChannelHandlerContext ctx),通道就緒事件
public void channelRead(ChannelHandlerContext ctx,Object msg),通道讀取數據事件
public void channelReadComplete(ChannelHandlerContext ctx),數據讀取完畢事件
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause),通道發生異常事件

  • Pipeline和ChannelPipline
    ChannelPipeline是一個Handler的集合,它負責處理和攔截inbound或者outbound的事件和操做,至關於一個貫穿Netty的鏈。


ChannelPipeline addFirst(ChannelHandler…handlers),把一個業務處理類(handler)添加到鏈中的第一個位置
ChannelPipeline addLast(ChannelHandler…handlers),把一個業務處理類(handler)添加到鏈中的最後一個位置

  • ChannelHandlerContext
    這是事件處理器上下文對象,Pipeline鏈中的實際處理節點。每一個處理節點ChannelHandlerContext中包含一個具體的事件處理器ChannelHandler,同時ChannelHandlerContext中也綁定了對應的pipeline和Channel的信息,方便對ChannelHandler進行調用。
    經常使用方法以下所示 :
    ChannelFuture close(),關閉通道
    ChannelOutboundInvoker flush(),刷新
    ChannelFuture writeAndFlush(Object msg),將數據寫到ChannelPipeline中當前ChannelHandler的下一個ChanelHandler開始處理(出站)
  • ChannelOption
    Netty在建立Channel實例後,通常都須要設置ChannelOption參數。ChannelOption是Socket的標準參數,而非Netty首創的。經常使用的參數配置有 :

 

 

  1. ChannelOption.SO_BACKLOG
    對應TCP/IP協議listen函數中的backlog參數,用來初始化服務器可鏈接隊列大小。服務端處理客戶端鏈接請求是順序處理的,因此同一時間只能處理一個客戶端鏈接。多個客戶端來的時候,服務端將不能處理的客戶端鏈接請求放在隊列中等待處理,backlog參數指定了隊列d大小。
  2. ChannelOption.SO_KEEPALIVE
    一直保持鏈接活動狀態。

 

 

  • ChannelFuture
    表示Channel中異步I/O操做的結果,在Netty中全部的I/O操做都是異步的,I/O的調用會直接返回,調用者並不能馬上得到結果,可是能夠經過ChannelFuture來獲取I/O操做的處理狀態。經常使用方法以下所示 :
    Channel channel(),返回當前正在進行IO操做的通道
    ChannelFuture sync(),等待異步操做執行完畢
  • EventLoopGroup和其實現類NioEventLoopGroup
    EventLoopGroup是一組EventLoop的抽象,Netty爲了更好的利用多核CPU資源,通常會有多個EventLoop同時工做,每一個EventLoop維護着一個Selector實例。
    EventLoopGroup提供next接口,能夠從組裏面按照必定規則獲取其中一個EventLoop來處理任務。在Netty服務器端編程中,咱們通常都須要提供兩個EventLoopGroup,例如:BossEventLoopGroup和WorkderEventLoopGroup。
    一般一個服務端口即一個ServerSocketChannel對應一個Selector和一個EventLoop線程。BossEventLoop負責接收客戶端的鏈接並將SocketChannel交給WorkerEventLoopGroup來進行IO處理,以下圖所示 :


BossEventLoopGroup一般是一個單線程的EventLoop,EventLoop維護着一個註冊了ServerSocketChannel的Selector實例,BossEventLoop不斷輪詢Selector將鏈接事件分離出來,一般是OP_ACCEPT事件,而後將接收到的SocketChannel交給WorkderEventLoopGroup,WorkderEventLoopGroup會由next選擇其中一個EventLoopGroup來將這個SocketChannel註冊到其維護的Selector並對其後續的IO事件進行處理。
經常使用方法以下所示 :
public NioEventLoopGroup(),構造方法
public Future<?> shutdownGracefully(),斷開鏈接,關閉線程

  • ServerBootstrap和Bootstrap
    ServerBootStrap是Netty中的服務端端啓動助手,經過它能夠完成服務器端的各類配置;Bootstrap是Netty中的客戶端啓動助手,經過它能夠完成客戶端的各類配置。經常使用方法以下所示 :
    public ServerBootstrap group(EventLoopGroup parentGroup,EventLoopGroup childGroup),該方法用於服務器端,用來設置兩個EventLoop
    public B group(EventLoopGroup group),該方法用於客戶端,用來設置一個EventLoop
    public B channel(Class<? extends C> channelClass),該方法用來設置一個服務器端的通道實現
    public B option(ChannelOption option,T value),用來給ServerChannel添加配置
    public ServerBootstrap childOption(ChannelOption childOption,T value),用來給接收到的通道添加配置
    public ServerBootstrap childHandler(ChannelHandler childHandler),該方法用來設置業務處理類(自定義的handler)
    public ChannelFuture bind(int inetPort),該方法用於服務器端,用來設置佔用的端口號
    public ChannelFuture connect(String inetHost,int inetPort),該方法用於客戶端,用來鏈接服務器端
  • Unpooled類
    這是Netty提供的一個專門用來操做緩衝區的工具類,經常使用方法以下所示 :
    public static ByteBuf copiedBuffer(CharSequence string, Charset charset),經過給定的數據和字符編碼返回一個ByteBuf對象(相似於Nio中的ByteBuffer對象)

 

 

入門案例 :

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>
package com.example.testdemo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        // 1. 建立一個線程組,接收客戶端鏈接
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        // 2. 建立一個線程組,處理網絡操做
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // 3. 建立服務器端啓動助手來配置參數
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 4. 設置兩個線程組
        serverBootstrap.group(bossGroup, workerGroup)
                // 5。 使用NioServerSocketChannel做爲服務器端通道的實現
                .channel(NioServerSocketChannel.class)
                // 6. 設置線程隊列中等待鏈接的個數
                .option(ChannelOption.SO_BACKLOG, 128)
                // 7. 保持活躍鏈接狀態
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 8。 建立一個通道初始化對象
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    // 9。往pipline鏈中添加自定義的handler類
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline().addLast(new NettyServerHandler());
                    }
                });
        System.out.println("...Server ready....");
        // 10. 綁定端口 ,bind方法是異步的,sync同步阻塞
        ChannelFuture sync = serverBootstrap.bind(9999).sync();
        System.out.println("...server start");
        // 11。 關閉通道,關閉線程組
        sync.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
package com.example.testdemo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 讀取數據事件
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server : " + ctx);
        ByteBuf byteBuf = (ByteBuf)msg;
        System.out.println("客戶端發來的消息 :" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 數據讀取完畢事件
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("就是沒錢", CharsetUtil.UTF_8));
    }
}
package com.example.testdemo.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {

    public static void main(String[] args) throws InterruptedException {
        // 1. 建立一個線程組
        EventLoopGroup group = new NioEventLoopGroup();
        // 2。建立客戶端的啓動助手,完成香港配置
        Bootstrap bootstrap = new Bootstrap();
        // 3.設置線程組
        bootstrap.group(group)
                // 4. 設置客戶端通道的實現類
                .channel(NioSocketChannel.class)
                // 5. 建立一個通道初始化對象
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 6. 往pipline鏈中添加自定義handler
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
        // 7. 啓動客戶端去鏈接服務器 異步非阻塞,connect是異步的,它會立馬返回一個future對象,sync是同步阻塞的用於等待主線程
        System.out.println("...Client is ready ...");
        ChannelFuture sync = bootstrap.connect("127.0.0.1", 9999).sync();
        // 8. 關閉鏈接 異步非阻塞
        sync.channel().closeFuture().sync();
    }
}
package com.example.testdemo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * 客戶端業務處理類
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf)msg;
        System.out.println("服務器端發來的消息 : " + byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 通道就緒事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client : " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("老闆,還錢吧", CharsetUtil.UTF_8));
    }
}

聊天案例 :

package com.example.testdemo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 聊天程序服務器端
 */
public class ChatServer {

    /**
     * 服務器端端口號
     */
    private int port;

    public ChatServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    // 往pipeline鏈中添加一個解碼器
                                    .addLast("decoder", new StringDecoder())
                                    // 往pipeline鏈中添加一個編碼器
                                    .addLast("encoder", new StringEncoder())
                                    // 往pipline鏈中添加自定義的handler(業務處理類)
                                    .addLast(new ChatServerHandler());
                        }
                    });
            System.out.println("Netty chat Server啓動。。。");
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println("Netty chat Server關閉。。。");
        }
    }

    public static void main(String[] args) throws Exception {
        new ChatServer(9999).run();
    }

}
package com.example.testdemo.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.ArrayList;
import java.util.List;

/**
 * 自定義一個服務器端業務處理類
 */
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

    private static List<Channel> channels = new ArrayList<>();

    /**
     * 讀取數據
     *
     * @param channelHandlerContext
     * @param s
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel inChannel = channelHandlerContext.channel();
        channels.forEach(channel -> {
            if (channel != inChannel) {
                channel.writeAndFlush("[" + inChannel.remoteAddress().toString().substring(1) + "]" + "說 :" + s + "\n");
            }
        });
    }

    /**
     * 通道就緒
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.add(channel);
        System.out.println("[Server] : " + channel.remoteAddress().toString().substring(1) + "上線");
    }

    /**
     * 通道未就緒
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.remove(channel);
        System.out.println("[Server] : " + channel.remoteAddress().toString().substring(1) + "離線");
    }
}
package com.example.testdemo.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
 * 聊天程序客戶端
 */
public class ChatClient {
    /**
     * 服務器端IP地址
     */
    private final String host;

    /**
     * 服務器端端口和
     */
    private final int port;

    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 往pipeline鏈中添加一個解碼器
                            pipeline.addLast("decoder", new StringDecoder());
                            // 往pipeline鏈中添加一個編碼器
                            pipeline.addLast("encoder", new StringEncoder());
                            // 往pipeline鏈中添加自定義的handler(業務處理類)
                            pipeline.addLast(new ChatClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            System.out.println("----" + channel.localAddress().toString().substring(1) + "----");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String nextLine = scanner.nextLine();
                channel.writeAndFlush(nextLine  + "\r\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new ChatClient("127.0.0.1", 9999).run();
    }
}
package com.example.testdemo.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * 自定義一個客戶端業務處理類
 */
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s.trim());
    }
}

4.6 編碼和解碼

4.6.1 概述

在編寫網絡應用程序的時候須要注意codec(編解碼器),由於數據在網絡中傳輸的都是二進制字節嗎數據,而咱們拿到的目標數據每每不是字節嗎數據。所以在發送數據時就須要編碼,接收數據時須要解碼。
codec的組成部分有兩個 :decoder(解碼器)和encoder(編碼器)。encoder負責把業務數據轉換成字節碼數據,decoder負責把字節碼數據轉換成業務數據。


其實java的序列化技術就能夠做爲codec去使用,可是它的硬傷太多 :

 

  1. 沒法跨語言,這應該是Java序列化最致命的問題了。
  2. 序列化後體積太大,是二進制編碼的5倍多。
  3. 序列化性能過低。
    因爲Java序列化硬傷太多,所以Netty自身提供了一些codec,以下所示 :
    Netty提供的解碼器 :
  4. StringDecoder,對字符串數據解碼
  5. ObjectDecoder,對Java對象進行解碼
    Netty提供的解碼器:
  6. StringEncoder,對字符串數據進行編碼
  7. ObjectEncoder,對Java對象進行編碼
    Netty自帶的ObjectDecoder和ObjectEncoder能夠用來實現POJP對象或各類業務對象的編碼和解碼,但其內部使用的還是Java序列化技術,因此不建議使用。

 

4.6.2 Google的Protobuf

Protobuf是Google發佈的開源項目,全稱Google Protocol Buffers,特定以下 :

 

  • 支持跨平臺、多語言(支持目前絕大多數語言,例如C++、C#、Java、Python等)
  • 高性能,高可靠性
  • 使用protobuf編譯器能自動生成代碼,Protpbuf是將類的定義使用.protp文件進行描述,而後經過protoc.exe編譯器根據.proto自動生成.java文件
    目前在使用Netty開發時,常常會結合Protobuf做爲codec(編解碼器)去使用,具體用法以下所示 :

 

<!--  -->

<dependency>

<groupId>com.google.protobuf</groupId>

<artifactId>protobuf-java</artifactId>

<version>3.9.1</version>

</dependency>

 


經過protoc.exe根據該文件生成java類,以下操做所示 :


在protoc.exe根目錄運行命令 :protoc --java_out=. Book.pro

5.1 自定義RPC

概述:
RPC(Remote Procedure Call),即遠程過程調用,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡實現的技術。常見的RPC框架有 :Dubbo,Grpc。

 

 

  1. 服務消費房(client)以本地調用方式調用服務
  2. client stub 接收到調用後負責將方法、參數等封裝成可以進行傳輸的消息體
  3. client stub 將消息進行編碼併發送到服務端
  4. server stub 收到消息後進行解碼
  5. server stub 根據解碼結果調用本地的服務
  6. 本地服務執行並將結果返回給server stub
  7. server stub將返回導入結果進行編碼併發送至消費方
  8. client stub接收到消息並進行解碼
  9. 服務消費方(client)獲得結果
    RPC的目標就是將2-8這些步驟都封裝起來,用戶無需關係這些細節,能夠像調用本地方法同樣便可完成遠程服務調用。

 

5.2 設計與實現

5.2.1 結果設計

 

 

 

  • Client(服務的調用方) :兩個接口 + 一個包含main方法的測試類
  • Client Stub :一個客戶端代理類 + 一個客戶端業務處理類
  • Server(服務的提供方) :兩個接口 + 兩個實現類
  • Server Stub :一個網絡處理服務器 + 一個服務器業務處理類
    注意 :服務調用方的接口必須跟服務提供方的接口保持一致(包路徑能夠不一致)
    最終要實現的目標是 :在TestNettyRPC中遠程調用HelloRPCImpl或HelloNettyImpl中的方法
package com.example.testdemo.rpc;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class NettyRPCServer {
    private int port;
    public NettyRPCServer(int port) {
        this.port = port;
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .localAddress(port)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 編碼器
                            pipeline.addLast("encoder", new ObjectEncoder());
                            // 解碼器
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            // 服務器端業務處理類
                            pipeline.addLast(new InvokerHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("...Server is ready...");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyRPCServer(9999).start();
    }
}
package com.example.testdemo.rpc;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class NettyRPCProxy {

    /**
     * 根據接口建立代理對象
     *
     * @param target
     * @return
     */
    public static Object create(Class target) {
        return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 封裝classinfo
                ClassInfo classInfo = new ClassInfo();
                classInfo.setClassName(getClass().getName());
                classInfo.setMethodName(method.getName());
                classInfo.setObjects(args);
                classInfo.setTypes(method.getParameterTypes());

                // 開始用Netty發送數據
                EventLoopGroup group = new NioEventLoopGroup();
                ResultHandler resultHandler = new ResultHandler();
                try {
                    Bootstrap bootstrap = new Bootstrap();
                    bootstrap.group(group)
                            .channel(NioSocketChannel.class)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel ch) throws Exception {
                                    ChannelPipeline pipeline = ch.pipeline();
                                    // 編碼器
                                    pipeline.addLast("encoder", new ObjectEncoder());
                                    // 解碼器
                                    pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                    // 客戶端業務處理類
                                    pipeline.addLast("handler", resultHandler);
                                }
                            });
                    ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
                    future.channel().writeAndFlush(classInfo).sync();
                    future.channel().closeFuture().sync();
                } finally {
                    group.shutdownGracefully();
                }

                return resultHandler.getResponse();
            }
        });
    }

}
package com.example.testdemo.rpc;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.reflections.Reflections;

import java.lang.reflect.Method;
import java.util.Set;

/**
 * 服務器端業務處理類
 *
 */
public class InvokerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 獲得某接口下某個實現類的名字
     *
     * @param classInfo
     * @return
     * @throws Exception
     */
    private String getImplClassName(ClassInfo classInfo) throws Exception {
        // 服務方接口和實現類所在的包路徑
        String interfacePath = "com.example.testdemo.rpc";
        int lastDot = classInfo.getClassName().lastIndexOf(".");
        String interfaceName = classInfo.getClassName().substring(lastDot);
        Class supperClass = Class.forName(interfacePath + interfaceName);
        Reflections reflection = new Reflections(interfacePath);
        // 獲得某接口下的全部實現類
        Set<Class> implClassSet = reflection.getSubTypesOf(supperClass);
        if (implClassSet.size() == 0) {
            System.out.println("未找到實現類");
            return null;
        } else if (implClassSet.size() > 1) {
            System.out.println("找個多個實現類,未明確使用哪個");
            return null;
        } else {
            // 把集合轉換爲數組
            Class[] classes = implClassSet.toArray(new Class[0]);
            // 獲得實現類的名字
            return classes[0].getName();
        }
    }

    /**
     * 讀取客戶端發來的數據並經過反射調用實現類的方法
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ClassInfo classInfo = (ClassInfo)msg;
        Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
        Method method = classInfo.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
        // 經過反射調用實現類的方法
        Object result = method.invoke(clazz, classInfo.getObjects());
        ctx.writeAndFlush(result);
    }
}
package com.example.testdemo.rpc;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 客戶端業務處理類
 */
public class ResultHandler extends ChannelInboundHandlerAdapter {

    private Object response;

    public Object getResponse() {
        return response;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
        ctx.close();
    }
}
package com.example.testdemo.rpc;

import java.io.Serializable;

public class ClassInfo implements Serializable {

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getTypes() {
        return types;
    }

    public void setTypes(Class<?>[] types) {
        this.types = types;
    }

    public Object[] getObjects() {
        return objects;
    }

    public void setObjects(Object[] objects) {
        this.objects = objects;
    }

    private static final long serialVersionUID = 1L;

    /**
     * 類名
     */
    private String className;

    /**
     * 方法名
     */
    private String methodName;

    /**
     * 參數類型
     */
    private Class<?>[] types;

    /**
     * 參數列表
     */
    private Object[] objects;


}
相關文章
相關標籤/搜索