前言java
上次提到要改進咱們的RPC框架,這周花時間研究一下JDK提供給咱們的原生NIO非阻塞式網絡編程思想。NIO 庫是在 JDK 1.4 中引入的。NIO 彌補了原來的 I/O 的不足,它在標準 Java 代碼中提供了高速的、面向塊的 I/O。編程
BIO與NIO的主要區別數組
1. 面向流和麪向緩衝
緩存
java NIO和BIO之間第一個最大的區別是,BIO是面向流的,NIO是面向緩衝區的。 Java IO面向流意味着每次從流中讀一個或多個字節,直至讀取全部字節,它們沒有被緩存在任何地方。此外,它不能先後移動流中的數據。若是須要先後移動從流中讀取的數據,須要先將它緩存到一個緩衝區。 Java NIO的緩衝導向方法略有不一樣。數據讀取到一個它稍後處理的緩衝區,須要時可在緩衝區中先後移動。這就增長了處理過程當中的靈活性。可是,還須要檢查是否該緩衝區中包含全部須要處理的數據。並且,需確保當更多的數據讀入緩衝區時,不要覆蓋緩衝區裏還沒有處理的數據。服務器
2. 阻塞與非阻塞網絡
Java BIO的各類流是阻塞的。這意味着,當一個線程調用read() 或 write()時,該線程被阻塞,直到有一些數據被讀取,或數據徹底寫入。該線程在此期間不能再幹任何事情了。併發
Java NIO的非阻塞模式,使一個線程從某通道發送請求讀取數據,可是它僅能獲得目前可用的數據,若是目前沒有數據可用時,就什麼都不會獲取。而不是保持線程阻塞,因此直至數據變的能夠讀取以前,該線程能夠繼續作其餘的事情。 非阻塞寫也是如此。一個線程請求寫入一些數據到某通道,但不須要等待它徹底寫入,這個線程同時能夠去作別的事情。 線程一般將非阻塞IO的空閒時間用於在其它通道上執行IO操做,因此一個單獨的線程如今能夠管理多個輸入和輸出通道(channel)。框架
3. NIO特有的Selector選擇器機制socket
Java NIO的選擇器容許一個單獨的線程來監視多個輸入通道,你能夠註冊多個通道使用一個選擇器,而後使用一個單獨的線程來「選擇」通道:這些通道里已經有能夠處理的輸入,或者選擇已準備寫入的通道。這種選擇機制,使得一個單獨的線程很容易來管理多個通道。ide
今天咱們就基於以上的理解,實現一個端對端的非阻塞式IO的網絡編程。
實戰設計
客戶端部分
/**
* @author andychen https://blog.51cto.com/14815984
* @description:NIO客戶端核心處理器
*/
public class NioClientHandler implements Runnable {
//服務端主機
private final String host;
//服務端口
private final int port;
/**定義NIO選擇器:用於註冊和監聽事件
* 選擇監聽的事件類型: OP_READ 讀事件 / OP_WRITE 寫事件
* OP_CONNECT 客戶端鏈接事件 / OP_ACCEPT 服務端接收通道鏈接事件
*/
private Selector selector = null;
//定義客戶端鏈接通道
private SocketChannel channel = null;
//運行狀態是否被激活
private volatile boolean activated=false;
public NioClientHandler(String host, int port) {
this.port = port;
this.host = host;
this.init();
}
/**
* 處理器初始化
* 負責創建鏈接準備工做
*/
private void init(){
try {
//建立並打開選擇器
this.selector = Selector.open();
//創建並打開監聽通道
this.channel = SocketChannel.open();
/**
* 設置通道通信模式爲非阻塞,NIO默認爲阻塞式的
*/
this.channel.configureBlocking(false);
//激活運行狀態
this.activated = true;
} catch (IOException e) {
e.printStackTrace();
this.stop();
}
}
/**
* 鏈接服務器
*/
private void connect(){
try {
/**
* 鏈接服務端:由於以前設置了通信模式爲非阻塞
* 這裏會當即返回TCP握手是否已創建
*/
if(this.channel.connect(new InetSocketAddress(this.host, this.port))){
//鏈接創建後,在通道上註冊讀事件關注,客戶端一接收到數據當即觸發處理
this.channel.register(this.selector, SelectionKey.OP_READ);
}
else{
//若鏈接握手未創建,則在通道上繼續關注鏈接事件,一旦鏈接創建繼續進行後續的處理邏輯
this.channel.register(this.selector, SelectionKey.OP_CONNECT);
}
} catch (IOException e) {
e.printStackTrace();
this.stop();
}
}
/**
* 選擇器事件迭代處理
* @param keys 選擇器事件KEY
*/
private void eventIterator(Set<SelectionKey> keys){
SelectionKey key = null;
//這裏採用迭代器,由於須要迭代時對key進行移除操做
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()){
key = it.next();
//這裏先移除事件key,避免屢次處理
it.remove();
//處理迭代事件
this.proccessEvent(key);
}
}
/**
* 處理髮生的事件
* @param key 選擇器事件KEY
*/
private void proccessEvent(SelectionKey key){
//只對有效的事件類型進行處理
if(key.isValid()){
try {
//在事件通道上處理
SocketChannel socketChannel = (SocketChannel) key.channel();
/**處理鏈接就緒事件
* */
if(key.isConnectable()){
//檢測鏈接是否完成,避免發生致使NotYetConnectedException異常
if(socketChannel.finishConnect()){
System.out.println("Has completed connection with server..");
/**
* 在通道上關注讀事件,NO的寫事件通常不特別關注,
* 緣由:寫緩衝區大部分時間被認爲是空閒的,會頻繁被選擇器選擇(會浪費CPU資源),
* 因此不該該頻繁被註冊;
* 只有在寫的數據超過寫緩衝區可用空間時,把一部分數據刷出緩衝區後,
* 有空間時再通知應用程序進行寫;
* 且應用程序寫完後,應當即關閉寫事件
*/
socketChannel.register(this.selector, SelectionKey.OP_READ);
}else{//這裏若鏈接仍未創建通常視爲網絡或其餘緣由,暫時退出
this.stop();
}
}
/**
* 處理讀事件
*/
if(key.isReadable()){
//開闢內存緩衝區,這裏用JVM堆內存
ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
//將通道中的數據讀到緩衝區
int length = socketChannel.read(buffer);
if(0 < length){
/**
* 進行讀寫轉換,NIO固定範式
*/
buffer.flip();
//獲取buffer可用空間
int size = buffer.remaining();
byte[] bytes = new byte[size];
//讀Buffer
buffer.get(bytes);
//獲取緩衝區數據
String result = new String(bytes,"utf-8");
System.out.println("Recevied server message: "+result);
}else if(0 > length){
//取消關注當前事件,關閉通道
key.cancel();
socketChannel.close();
}
}
} catch (Exception e) {
key.cancel();
if(null != key.channel()){
try {
key.channel().close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
e.printStackTrace();
}
}
}
/**
* 寫數據到對端
* @param data
*/
public void write(String data){
try {
byte[] bytes = data.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
//將數據放入寫緩衝區
buffer.put(bytes);
buffer.flip();
this.channel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 中止運行
*/
public void stop(){
this.activated = false;
System.exit(-1);
}
/**
* 客戶端通信業務覈實現
*/
@Override
public void run() {
//創建服務器鏈接
this.connect();
//持續監聽各類事件的發生
while (this.activated){
try {
//監聽事件是否發生,若發生直接返回;反之阻塞至事件發生
this.selector.select();
} catch (IOException e) {
e.printStackTrace();
this.stop();
}
//獲取發生事件的類型
Set<SelectionKey> keys = this.selector.selectedKeys();
//迭代處理事件
this.eventIterator(keys);
}
//關閉選擇器
if(null != this.selector){
try {
this.selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
this.stop();
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:NIO客戶端啓動器
*/
public class NioClientStarter {
private static NioClientHandler clientHandler = null;
/*啓動運行客戶端*/
public static void main(String[] args) {
try {
clientHandler = new NioClientHandler(Constant.SERV_HOST, Constant.SERV_PORT);
new Thread(clientHandler).start();
} catch (Exception e) {
e.printStackTrace();
}
/**
* 在控制檯發實時數據到對端
*/
Scanner scanner = new Scanner(System.in);
while (true){
String data = scanner.next();
if(null != data && !"".equals(data)){
clientHandler.write(data);
}
}
}
}
服務端部分
/**
* @author andychen https://blog.51cto.com/14815984
* @description:NIO服務端核心處理器
*/
public class NioServerHandler implements Runnable{
private final int port;
//定義選擇器
private Selector selector = null;
/**
* 定義服務端通道: 與客戶端相似的思路
*/
private ServerSocketChannel channel = null;
//服務器運行是否被激活
private volatile boolean activated = false;
public NioServerHandler(int port) {
this.port = port;
this.init();
}
/**
* 初始化處理器
* 負責作好運行監聽和接收以前的準備
*/
private void init(){
try {
//建立並打開選擇器
this.selector = Selector.open();
//建立並打開監聽通道
this.channel = ServerSocketChannel.open();
/**
* 設置通道通信模式爲非阻塞(NIO默認爲阻塞)
*/
this.channel.configureBlocking(false);
//綁定監聽的服務端口
this.channel.socket().bind(new InetSocketAddress(this.port));
/**
* 註冊在服務端通道上,首先關注的事件
*/
this.channel.register(this.selector, SelectionKey.OP_ACCEPT);
//設置運行狀態激活
this.activated = true;
} catch (IOException e) {
e.printStackTrace();
this.stop();
}
}
/**
* 中止服務
*/
public void stop(){
this.activated = false;
try {
//關閉選擇器
if(null != this.selector){
if(this.selector.isOpen()){
this.selector.close();
}
this.selector = null;
}
//關閉通道
if(null != this.channel){
if(this.channel.isOpen()){
this.channel.close();
}
this.channel = null;
}
} catch (IOException e) {
e.printStackTrace();
}
System.exit(-1);
}
/**
* 在迭代處理髮生的事件
* @param keys 發生的事件類型
*/
private void eventIterator(Set<SelectionKey> keys){
//SelectionKey key = null;
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()){
SelectionKey key = it.next();
/**
* 這裏先從迭代器移除,避免後面重複執行
*/
it.remove();
//處理事件
this.proccessEvent(key);
}
}
/**
*
* @param key 選擇執行的事件KEY
*/
private void proccessEvent(SelectionKey key){
//只對有效的事件KEY執行處理
if(key.isValid()){
try {
/**
* 處理通道接收數據事件
*/
if(key.isAcceptable()){
/**
* 注意這裏接收事件的通道是服務端通道
*/
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
//接收客戶端Socket
SocketChannel channel = serverChannel.accept();
//設置其爲非阻塞
channel.configureBlocking(false);
//而後註冊此通道的讀事件
channel.register(this.selector, SelectionKey.OP_READ);
System.out.println("Build connection with client..");
}
/**
* 處理讀事件
*/
if(key.isReadable()){
System.out.println("Reading client data...");
SocketChannel channel = (SocketChannel) key.channel();
//開闢內存空間,接收數據
ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
//將數據讀入緩衝區
int length = channel.read(buffer);
if(0 < length){
//讀寫切換
buffer.flip();
//更具緩衝區數據創建轉換的字節數組
byte[] bytes = new byte[buffer.remaining()];
//從緩衝區讀取字節數據
buffer.get(bytes);
//解碼數據
String data = new String(bytes, "utf-8");
System.out.println("Recevied data: "+data);
//向對端發送接收應答
String answer = "Server has recevied data:"+data;
this.reply(channel, answer);
}else if(0 > length){
//取消處理的事件
key.cancel();
channel.close();
}
}
/**
* 處理寫事件
*/
if(key.isWritable()){
SocketChannel channel = (SocketChannel) key.channel();
//拿到寫事件的buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
//若buffer中有數據,則刷到對端
if(buffer.hasRemaining()){
int length = channel.write(buffer);
System.out.println("Write data "+length+" byte to client.");
}else{
//若沒有數據,則繼續監聽讀事件
key.interestOps(SelectionKey.OP_READ);
}
}
} catch (IOException e) {
key.cancel();
e.printStackTrace();
}
}
}
/**
* 應答對端
* @param msg 應答消息
*/
private void reply(SocketChannel channel, String msg){
//消息編碼
byte[] bytes = msg.getBytes();
//開啓寫緩衝區
ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
//將數據寫入緩衝區
buffer.put(bytes);
//切換到讀事件
buffer.flip();
/**
* 這裏爲了避免出現寫空或寫溢出緩衝區狀況,創建寫事件監聽同時保留以前的讀監聽
* 做爲監聽的附件傳入寫操做的buffer
*/
try {
channel.register(this.selector, SelectionKey.OP_WRITE |SelectionKey.OP_READ, buffer);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
/**
* 服務端監聽運行核心業務實現
*/
@Override
public void run() {
while (this.activated){
try {
/**
* 運行到此方法阻塞,直到有事件發生再返回
* */
this.selector.select();
//獲取被監聽的事件
Set<SelectionKey> keys = this.selector.selectedKeys();
//在迭代器中,處理不一樣的事件
this.eventIterator(keys);
} catch (IOException e) {
e.printStackTrace();
this.stop();
}
}
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:NIO網絡編程服務端啓動類
*/
public class NioServerStart {
/**
* 運行服務端監聽
* @param args
*/
public static void main(String[] args) {
String serverTag = "server: "+Constant.SERV_PORT;
NioServerHandler serverHandler = null;
try {
serverHandler = new NioServerHandler(Constant.SERV_PORT);
new Thread(serverHandler, serverTag).start();
System.out.println("Starting "+serverTag+" listening...");
} catch (Exception e) {
e.printStackTrace();
if(null != serverHandler){
serverHandler.stop();
}
}
}
}
屢次驗證結果
總結
經過以上的實戰,咱們看到NIO網絡編程實現比BIO稍微要複雜一些。面向緩衝的機制確實比面向流的機制要靈活不少;服務運行的體驗也比阻塞式IO更加流暢;獨有的選擇器機制也讓NIO能夠支撐較大併發數,但學習和開發的成本稍微高一些,項目當中能夠有選擇地使用。
目前網絡編程這塊用得比較多的優秀IO框架非Netty莫屬了,不少優秀的RPC框架的底層也基於Netty擴展和開發。下次咱們就順帶給你們展現一下Netty的網絡編程之美。