史詩級最強教科書式「NIO與Netty編程」java
1.1 概述編程
java.nio全稱java non-blocking IO,是指JDK1.4開始提供的新API。從JDK1.4開始,Java提供了一系列改進的輸入/輸出的新特性,也被稱爲NIO(既New IO),新增了許多用於處理輸入輸出的類,這些類都被放在java.nio包及子包下,而且對原http://java.io包中的不少類進行改寫,新增類知足NIO的功能。
NIO和BIO有着相同的目的和做用,可是它們的實現方式徹底不一樣,BIO以流的方式處理數據,而NIO以塊的方式處理數據,塊I/O的效率比流I/O高不少。另外,NIO是非阻塞式的,這一點跟BIO也很不相同,使用它能夠提供非阻塞式的高伸縮性網絡。
NIO主要有三大核心部分 :Channel(通道),Buffer(緩衝區),Selector(選擇器)。傳統的BIO基於字節流和字符流進行操做,而NIO基於Channel和Buffer(緩衝區)進行操做,數據老是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中。Selector(選擇區)用於監聽多個通道的事件(好比 :鏈接打開,數據到達)。所以使用單個線程就能夠監聽多個數據管道。bootstrap
1.2 文件IO數組
1.2.1 概述和核心API服務器
緩衝區(Buffer):其實是一個容器,是一個特殊的數組,緩衝區對象內置流一些機制,可以跟蹤和記錄緩衝區的狀態變化狀況。Channel提供從文件、網絡讀取數據的渠道,可是讀取或寫入的數據都必須經由Buffer,以下圖所示 :網絡
在NIO中,Buffer是一個頂層父類,它是一個抽象類,經常使用的Buffer子類有:
ByteBuffer,存儲字節數據到緩衝區
ShortBuffer,存儲字符串數據到緩衝區
CharBuffer,存儲字符數據到緩衝區
IntBuffer,存儲整數數據到緩衝區
LongBuffer,存儲長整型數據到緩衝區
DoubleBuffer,存儲小數到緩衝區
FloatBuffer,存儲小數到緩衝區
對於Java中的基本數據類型,都有一個具體Buffer類型與之相對應,最經常使用的天然是ByteBuffer類(二進制數據),該類的主要方法以下所示 :
ByteBuffer類(二進制數據),該類的主要方法以下所示 :
public abstract ByteBuffer put(byte[] b);存儲字節數據到緩衝區
public abstract byte[] get();從緩衝區得到字節數據
public final byte[] array();把緩衝區數據轉換成字節數組
public static ByteBuffer allocate(int capacity);設置緩衝區的初始容量
public static ByteBuffer wrap(byte[] array);把一個現成的數組放到緩衝區中使用
public final Buffer flip();翻轉緩衝區,重置位置到初始位置
管道(Channel) :相似於BIO中的stream,例如FileInputStream對象,用來創建到目標(文件,網絡套接字,硬件設備等)的一個鏈接,可是須要注意 :BIO中的stream是單向的,例如FileInputStream對象只能進行讀取數據的操做,而NIO中的通道(Channel)是雙向的,既能夠用來進行讀操做,也能夠用來進行寫操做。經常使用的Channel類有:FileChannel、DatagramChannel、ServerSocketChannel和SocketChannel。FileChannel用於文件的數據讀寫,DatagramChannel、ServerSocketChannel和SocketChannel。FileChannel用於文件的數據讀寫,DatagramChannel用於UDP的數據讀寫,ServerSocketChannel和SocketChannel用於TCP的數據讀寫。多線程
FileChannel類,該類主要用來對本地文件進行IO操做,主要方法以下所示 :
public int read(ByteBuffer dst),讀取數據並放到緩衝區中
public int write(ByteBuffer src) , 把緩衝區的數據寫到通道中
public long transferFrom(ReadableByteChannel src,long position,long count),從目標通道中複製數據
public long transferTo(long position,long count,WritableByteChannel target),把數據從當前通道複製給目標通道架構
1.2.2 案例併發
1.往本地文件中寫數據框架
@Test public void contextLoads() throws Exception { String str = "hello,nio,我是谷"; // 建立輸出流 FileOutputStream fileOutputStream = new FileOutputStream("basic.txt"); // 從流中獲得一個通道 FileChannel fileChannel = fileOutputStream.getChannel(); // 提供一個緩衝區 ByteBuffer allocate = ByteBuffer.allocate(1024); // 往緩衝區中存入數據 allocate.put(str.getBytes()); // 當數據寫入到緩衝區中時,指針指向數據最後一行,那麼緩衝區寫入通道中輸出時,是從最後一行數據開始寫入, // 這樣就會致使寫入1024的剩餘沒有數據的空緩衝區。因此須要翻轉緩衝區,重置位置到初始位置 allocate.flip(); // 把緩衝區寫到通道中,通道負責把數據寫入到文件中 fileChannel.write(allocate); // 關閉輸出流,由於通道是輸出流建立的,因此會一塊兒關閉 fileOutputStream.close(); }
NIO中通道是從輸出流對象裏經過getChannel方法獲取到的,該通道是雙向的,既能夠讀,又能夠寫。在往通道里寫數據以前,必須經過put方法把數據存到ByteBuffer中,而後經過通道write能夠寫數據。在write以前,須要調用flip方法翻轉緩衝區,把內部定位到初始位置,這樣在接下來寫數據時才能把全部數據寫到通道里。運行效果以下圖 :
@Test // 從本地文件中讀取數據
public void test2() throws Exception {
File file = new File("basic.text");
// 1. 建立輸入流
FileInputStream fis = new FileInputStream(file);
// 2. 獲得一個通道
FileChannel fc = fis.getChannel();
// 3. 準備一個緩衝區
ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
// 4. 從通道里讀取數據並存到緩衝區中
fc.read(buffer);
System.out.println(new String(buffer.array));
// 5.關閉
fis.close();
}
@Test // 使用NIO實現文件複製
public void test3() throws Exception {
//1. 建立兩個流
FileInputStream fis = new FileInputStream("basic.text");
FileOutputStream fos = new FileOutputStream("c:\\test\\basic.text");
// 2. 獲得兩個通道
FileChannel sourceFc = fis.getChannel();
FileChannel destFc = fos.getChannel();
//3. 複製
destFc.transferFrom(sourceFc,0,sourceFc.size());
//4.關閉
fis.close();
fos.close();
}
1.3 網絡IO
1.3.1 概述和核心API
前面在進行文件IO時用到的FileChannel並不支持非阻塞操做,學習NIO主要就是進行網絡IO,Java NIO中的網絡通道是非阻塞IO的實現,基於事件驅動,很是適用於服務器須要維持大量鏈接,可是數據交換量不大的狀況,例如一些即時通信的服務等待。
在Java中編寫Socket服務器,一般有如下幾種模式 :
一個客戶端鏈接用一個線程,優勢 :程序編寫簡單;缺點 :若是鏈接很是多,分配的線程也會很是多,服務器可能會由於資源耗盡而崩潰。
把每個客戶端鏈接交給一個擁有固定數量線程的鏈接池,優勢 : 程序編寫相對簡單,能夠處理大量的鏈接。缺點 :線程的開銷很是大,鏈接若是很是多,排到現象會比較嚴重。
使用Java的NIO,用非阻塞的IO方式處理。這種模式能夠用一個線程,處理大量的客戶端鏈接。
1。Selector,選擇器,可以檢測多個註冊的通道上是否有事件發生,便獲取事件而後針對每一個事件進行相應的響應處理。這樣就能夠只用一個單線程去管理多個通道,也就是管理多個鏈接。這樣使得只用在鏈接真正有讀寫事件發生時,纔會調用函數來進行讀寫,就大大地減小了系統開銷,而且沒必要爲每一個鏈接都建立一個線程,不用去維護多個線程,而且避免了多線程之間的上下文切換致使的開銷。
該類的經常使用方法以下所示 :
public static Selector open(),獲得一個選擇器對象
public int select(long timeout),監控全部註冊的channel,當其中有註冊的IO操做能夠進行時,將對應的SelectionKey加入到內部集團中並返回,參數用來設置超時時間
public Set selectedKeys(),從內部集合中獲得全部的SelectionKey
2. SelectionKey,表明了Selector和serverSocketChannel的註冊關係,一共四種 :
int OP_ACCEPT :有新的網絡鏈接能夠accept,值爲16
int OP_CONNECT : 表明鏈接已經創建,值爲8
int OP_READ和int OP_WRITE : 表明了讀、寫操做,值爲1和4
該類的經常使用方法以下所示 :
public abstract Selector selector(),獲得與之關聯的Selector對象
public abstract SelectorChannel channel(),獲得與之關聯的通道
public final Object attachment(),獲得與之關聯的共享數據
public abstract SelectionKey interestOps(int ops),設置或改變監聽事件
public final boolean isAcceptable(),是否能夠accept
public final boolean isReadable(),是否能夠讀
public final boolean isWritable(),是否能夠寫
3. ServerSocketChannel,用來在服務器端監聽新的客戶端Socket鏈接,經常使用方法以下所示 :
public static ServerSocketChannel open(),獲得一個ServerSocketChannel通道
public final ServerSocketChannel bind(SocketAddress local),設置服務器端端口號
public final SelectableChannel configureBlocking(boolean block),設置阻塞或非阻塞模式,取值false表示採用非阻塞模式
public SocketChannel accept(),接受一個鏈接,返回表明這個鏈接的通道對象
public final SelectionKey register(Selector sel,int ops),註冊一個選擇器並設置監聽事件
4. SocketChannel,網絡IO通道,具體負責進行讀寫操做。NIO老是把緩衝區的數據寫入通道,或者把通道里的數據讀出到緩衝區(buffer)。經常使用方法以下所示 :
public static SocketChannel open(),獲得一個SocketChannel通道
public final SelectableChannel configureBlocking(boolean block),設置阻塞或非阻塞模式,取值false表示採用非阻塞模式
public boolean connect(SocketAddress remote),鏈接服務器
public boolean finishConnect(),若是上面的方法鏈接失敗,接下來就要經過該方法完成鏈接操做
public int write(ByteBuffer src),往通道里寫數據
public int read(ByteBuffer dst),從通道里讀數據
public final SelectionKey register(Selector sel,int ops,Object att),註冊一個選擇器並設置監聽事件,最後一個參數能夠設置共享數據
public final void close(),關閉通道
3.4 AIO編程
JDK1.7引入了Asynchronous I/O,既AIO。再進行I/O編程中,經常使用到兩種模式 :Reactor和Proactor。Java的NIO就是Reactor,當有事件觸發時,服務器端獲得通知,進行相應的處理。
AIO即NIO2.0,叫作異步不阻塞的IO。AIO引入異步通道的概念,採用 了Proactor模式,簡化了程序編寫,一個有效的請求才啓動一個線程,它的特色是先有操做系統完成後才通知服務端程序啓動線程去處理,通常適用於鏈接數較多且鏈接時間長的應用。
3.5 IO對比總結
IO的方式一般分爲幾種,同步阻塞的BIO、同步非阻塞的NIO、異步非阻塞的AIO。
BIO方式適用於鏈接數目比較小且固定的架構,這種方式對服務器資源要求比較高,併發侷限於應用中,JDK1.4之前的惟一選擇,但程序直觀簡單易理解。
NIO方式適用於鏈接數據多且鏈接比較短(輕操做)的架構,好比聊天服務器,併發侷限於應用中,編程比較複雜,JDK1.4開始支持。
AIO方式適用於鏈接數目多且鏈接比較長(重操做)的架構,好比相冊服務器,充分調用OS參與併發操做,編程比較複雜,JDK7開始支持。
4.1 概述
Netty是由JBOSS提供的一個Java開源框架。Netty提供異步的、基於事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡IO程序。
Netty是一個基於NIO的網絡編程框架,使用Netty能夠幫助你快速、簡單的開發出一個網絡應用,至關於簡化和流程化了NIO的開發過程。做爲當前最流行的NIO框架,Netty在互聯網領域、大數據分佈式計算領域、遊戲行業、通訊行業等得到了普遍的應用,知名的Elasticsearch、Dubbo框架內部都採用了Netty。
4.2 Netty總體設計
4.2.1 線程模型
1.單線程模型
服務器端用一個線程經過多路複用搞定全部的IO操做(包括鏈接,讀、寫等),編碼簡單,清洗明瞭,可是若是客戶端鏈接數量較多,將沒法支撐,咋們前面的NIO案例就屬於這種模型。
2. 線程池模型
服務器端採用一個線程專門處理客戶端鏈接請求,採用一個線程組負責IO操做。在絕大數場景下,該模型都能知足使用。
3.Netty模型
Netty抽象出兩組線程池,BossGroup專門負責接收客戶端鏈接,WorkderGroup專門負責網絡讀寫操做。NioEventLoop表示一個不斷循環執行處理任務的線程,每一個NioEventLoop都有一個selector,用於監聽綁定在其上的socket網絡通道。NioEventLoop內部採用串行化設計,從消息的讀取-》解碼-》處理-》編碼-》發送,始終由IO線程NioEventLoop負責。
一個NioEventLoopGroup下包含多個NioEventLoop
每一個NioEventLoop中包含有一個Selector,一個taskQueue
每一個NioChannel只會綁定在惟一的NioEventLoop上
每一個NioChannel都綁定有一個本身的ChannelPipeline
4.2.2 異步模型
FUTURE、CALLBACK和HANDLER
Netty的異步模型是創建在future和callback的之上的。Future的核心思想是 :假設一個方法fun,計算過程可能很是耗時,等待fun返回顯然不適合。那麼能夠在調用fun的時候,立馬返回一個Future,後續能夠經過Future去監控方法fun的處理過程。
在使用Netty進行編程時,攔截操做和轉換出入站數據只須要提供callback或利用future便可。這使得鏈式操做簡單、高效,並有利於編寫可重用的、通用的代碼。Netty框架的目標就是讓你的業務邏輯從網絡基礎應用編碼中分離出來、解脫出來。
4.3 核心API
自定義一個Handler類去繼承ChannelInboundHandlerAdapter,而後經過重寫相應方法實現業務邏輯,通常都須要重寫如下方法:
public void channelActive(ChannelHandlerContext ctx),通道就緒事件
public void channelRead(ChannelHandlerContext ctx,Object msg),通道讀取數據事件
public void channelReadComplete(ChannelHandlerContext ctx),數據讀取完畢事件
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause),通道發生異常事件
ChannelPipeline addFirst(ChannelHandler…handlers),把一個業務處理類(handler)添加到鏈中的第一個位置
ChannelPipeline addLast(ChannelHandler…handlers),把一個業務處理類(handler)添加到鏈中的最後一個位置
BossEventLoopGroup一般是一個單線程的EventLoop,EventLoop維護着一個註冊了ServerSocketChannel的Selector實例,BossEventLoop不斷輪詢Selector將鏈接事件分離出來,一般是OP_ACCEPT事件,而後將接收到的SocketChannel交給WorkderEventLoopGroup,WorkderEventLoopGroup會由next選擇其中一個EventLoopGroup來將這個SocketChannel註冊到其維護的Selector並對其後續的IO事件進行處理。
經常使用方法以下所示 :
public NioEventLoopGroup(),構造方法
public Future<?> shutdownGracefully(),斷開鏈接,關閉線程
入門案例 :
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
package com.example.testdemo.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 1. 建立一個線程組,接收客戶端鏈接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 2. 建立一個線程組,處理網絡操做
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 3. 建立服務器端啓動助手來配置參數
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 4. 設置兩個線程組
serverBootstrap.group(bossGroup, workerGroup)
// 5。 使用NioServerSocketChannel做爲服務器端通道的實現
.channel(NioServerSocketChannel.class)
// 6. 設置線程隊列中等待鏈接的個數
.option(ChannelOption.SO_BACKLOG, 128)
// 7. 保持活躍鏈接狀態
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 8。 建立一個通道初始化對象
.childHandler(new ChannelInitializer<SocketChannel>() {
// 9。往pipline鏈中添加自定義的handler類
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("...Server ready....");
// 10. 綁定端口 ,bind方法是異步的,sync同步阻塞
ChannelFuture sync = serverBootstrap.bind(9999).sync();
System.out.println("...server start");
// 11。 關閉通道,關閉線程組
sync.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
package com.example.testdemo.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 讀取數據事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server : " + ctx);
ByteBuf byteBuf = (ByteBuf)msg;
System.out.println("客戶端發來的消息 :" + byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 數據讀取完畢事件
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("就是沒錢", CharsetUtil.UTF_8));
}
}
package com.example.testdemo.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 1. 建立一個線程組
EventLoopGroup group = new NioEventLoopGroup();
// 2。建立客戶端的啓動助手,完成香港配置
Bootstrap bootstrap = new Bootstrap();
// 3.設置線程組
bootstrap.group(group)
// 4. 設置客戶端通道的實現類
.channel(NioSocketChannel.class)
// 5. 建立一個通道初始化對象
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 6. 往pipline鏈中添加自定義handler
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
// 7. 啓動客戶端去鏈接服務器 異步非阻塞,connect是異步的,它會立馬返回一個future對象,sync是同步阻塞的用於等待主線程
System.out.println("...Client is ready ...");
ChannelFuture sync = bootstrap.connect("127.0.0.1", 9999).sync();
// 8. 關閉鏈接 異步非阻塞
sync.channel().closeFuture().sync();
}
}
package com.example.testdemo.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 客戶端業務處理類
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf)msg;
System.out.println("服務器端發來的消息 : " + byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 通道就緒事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client : " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("老闆,還錢吧", CharsetUtil.UTF_8));
}
}
聊天案例 :
package com.example.testdemo.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* 聊天程序服務器端
*/
public class ChatServer {
/**
* 服務器端端口號
*/
private int port;
public ChatServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
// 往pipeline鏈中添加一個解碼器
.addLast("decoder", new StringDecoder())
// 往pipeline鏈中添加一個編碼器
.addLast("encoder", new StringEncoder())
// 往pipline鏈中添加自定義的handler(業務處理類)
.addLast(new ChatServerHandler());
}
});
System.out.println("Netty chat Server啓動。。。");
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("Netty chat Server關閉。。。");
}
}
public static void main(String[] args) throws Exception {
new ChatServer(9999).run();
}
}
package com.example.testdemo.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.ArrayList;
import java.util.List;
/**
* 自定義一個服務器端業務處理類
*/
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
private static List<Channel> channels = new ArrayList<>();
/**
* 讀取數據
*
* @param channelHandlerContext
* @param s
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
Channel inChannel = channelHandlerContext.channel();
channels.forEach(channel -> {
if (channel != inChannel) {
channel.writeAndFlush("[" + inChannel.remoteAddress().toString().substring(1) + "]" + "說 :" + s + "\n");
}
});
}
/**
* 通道就緒
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channels.add(channel);
System.out.println("[Server] : " + channel.remoteAddress().toString().substring(1) + "上線");
}
/**
* 通道未就緒
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channels.remove(channel);
System.out.println("[Server] : " + channel.remoteAddress().toString().substring(1) + "離線");
}
}
package com.example.testdemo.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
/**
* 聊天程序客戶端
*/
public class ChatClient {
/**
* 服務器端IP地址
*/
private final String host;
/**
* 服務器端端口和
*/
private final int port;
public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 往pipeline鏈中添加一個解碼器
pipeline.addLast("decoder", new StringDecoder());
// 往pipeline鏈中添加一個編碼器
pipeline.addLast("encoder", new StringEncoder());
// 往pipeline鏈中添加自定義的handler(業務處理類)
pipeline.addLast(new ChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
System.out.println("----" + channel.localAddress().toString().substring(1) + "----");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String nextLine = scanner.nextLine();
channel.writeAndFlush(nextLine + "\r\n");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
new ChatClient("127.0.0.1", 9999).run();
}
}
package com.example.testdemo.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 自定義一個客戶端業務處理類
*/
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s.trim());
}
}
4.6 編碼和解碼
4.6.1 概述
在編寫網絡應用程序的時候須要注意codec(編解碼器),由於數據在網絡中傳輸的都是二進制字節嗎數據,而咱們拿到的目標數據每每不是字節嗎數據。所以在發送數據時就須要編碼,接收數據時須要解碼。
codec的組成部分有兩個 :decoder(解碼器)和encoder(編碼器)。encoder負責把業務數據轉換成字節碼數據,decoder負責把字節碼數據轉換成業務數據。
其實java的序列化技術就能夠做爲codec去使用,可是它的硬傷太多 :
4.6.2 Google的Protobuf
Protobuf是Google發佈的開源項目,全稱Google Protocol Buffers,特定以下 :
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.9.1</version>
</dependency>
經過protoc.exe根據該文件生成java類,以下操做所示 :
在protoc.exe根目錄運行命令 :protoc --java_out=. Book.pro
5.1 自定義RPC
概述:
RPC(Remote Procedure Call),即遠程過程調用,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡實現的技術。常見的RPC框架有 :Dubbo,Grpc。
5.2 設計與實現
5.2.1 結果設計
package com.example.testdemo.rpc;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class NettyRPCServer {
private int port;
public NettyRPCServer(int port) {
this.port = port;
}
public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 編碼器
pipeline.addLast("encoder", new ObjectEncoder());
// 解碼器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
// 服務器端業務處理類
pipeline.addLast(new InvokerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println("...Server is ready...");
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new NettyRPCServer(9999).start();
}
}
package com.example.testdemo.rpc;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
public class NettyRPCProxy {
/**
* 根據接口建立代理對象
*
* @param target
* @return
*/
public static Object create(Class target) {
return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 封裝classinfo
ClassInfo classInfo = new ClassInfo();
classInfo.setClassName(getClass().getName());
classInfo.setMethodName(method.getName());
classInfo.setObjects(args);
classInfo.setTypes(method.getParameterTypes());
// 開始用Netty發送數據
EventLoopGroup group = new NioEventLoopGroup();
ResultHandler resultHandler = new ResultHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 編碼器
pipeline.addLast("encoder", new ObjectEncoder());
// 解碼器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
// 客戶端業務處理類
pipeline.addLast("handler", resultHandler);
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
future.channel().writeAndFlush(classInfo).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
return resultHandler.getResponse();
}
});
}
}
package com.example.testdemo.rpc;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.reflections.Reflections;
import java.lang.reflect.Method;
import java.util.Set;
/**
* 服務器端業務處理類
*
*/
public class InvokerHandler extends ChannelInboundHandlerAdapter {
/**
* 獲得某接口下某個實現類的名字
*
* @param classInfo
* @return
* @throws Exception
*/
private String getImplClassName(ClassInfo classInfo) throws Exception {
// 服務方接口和實現類所在的包路徑
String interfacePath = "com.example.testdemo.rpc";
int lastDot = classInfo.getClassName().lastIndexOf(".");
String interfaceName = classInfo.getClassName().substring(lastDot);
Class supperClass = Class.forName(interfacePath + interfaceName);
Reflections reflection = new Reflections(interfacePath);
// 獲得某接口下的全部實現類
Set<Class> implClassSet = reflection.getSubTypesOf(supperClass);
if (implClassSet.size() == 0) {
System.out.println("未找到實現類");
return null;
} else if (implClassSet.size() > 1) {
System.out.println("找個多個實現類,未明確使用哪個");
return null;
} else {
// 把集合轉換爲數組
Class[] classes = implClassSet.toArray(new Class[0]);
// 獲得實現類的名字
return classes[0].getName();
}
}
/**
* 讀取客戶端發來的數據並經過反射調用實現類的方法
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ClassInfo classInfo = (ClassInfo)msg;
Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
Method method = classInfo.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
// 經過反射調用實現類的方法
Object result = method.invoke(clazz, classInfo.getObjects());
ctx.writeAndFlush(result);
}
}
package com.example.testdemo.rpc;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* 客戶端業務處理類
*/
public class ResultHandler extends ChannelInboundHandlerAdapter {
private Object response;
public Object getResponse() {
return response;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
ctx.close();
}
}
package com.example.testdemo.rpc;
import java.io.Serializable;
public class ClassInfo implements Serializable {
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getTypes() {
return types;
}
public void setTypes(Class<?>[] types) {
this.types = types;
}
public Object[] getObjects() {
return objects;
}
public void setObjects(Object[] objects) {
this.objects = objects;
}
private static final long serialVersionUID = 1L;
/**
* 類名
*/
private String className;
/**
* 方法名
*/
private String methodName;
/**
* 參數類型
*/
private Class<?>[] types;
/**
* 參數列表
*/
private Object[] objects;
}