JAVA NIO詳解

前言

本篇主要講解Java中的IO機制和網絡通信中處理高併發的NIOcss

分爲兩塊:
第一塊講解多線程下的IO機制
第二塊講解如何在IO機制下優化CPU資源的浪費(New IO)java

Echo服務器

單線程下的socket機制就不用我介紹了,不懂得能夠去查閱下資料
那麼多線程下,若是進行套接字的使用呢?
咱們使用最簡單的echo服務器來幫助你們理解編程

首先,來看下多線程下服務端和客戶端的工做流程圖:數組

clipboard.png

能夠看到,多個客戶端同時向服務端發送請求
服務端作出的措施是開啓多個線程來匹配相對應的客戶端
而且每一個線程去獨自完成他們的客戶端請求緩存

原理講完了咱們來看下是如何實現的
在這裏我寫了一個簡單的服務器
用到了線程池的技術來建立線程(具體代碼做用我已經加了註釋):服務器

public class MyServer { private static ExecutorService executorService = Executors.newCachedThreadPool(); //建立一個線程池 private static class HandleMsg implements Runnable{ //一旦有新的客戶端請求,建立這個線程進行處理 Socket client; //建立一個客戶端 public HandleMsg(Socket client){ //構造傳參綁定 this.client = client; } @Override public void run() { BufferedReader bufferedReader = null; //建立字符緩存輸入流 PrintWriter printWriter = null; //建立字符寫入流 try { bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream())); //獲取客戶端的輸入流 printWriter = new PrintWriter(client.getOutputStream(),true); //獲取客戶端的輸出流,true是隨時刷新 String inputLine = null; long a = System.currentTimeMillis(); while ((inputLine = bufferedReader.readLine())!=null){ printWriter.println(inputLine); } long b = System.currentTimeMillis(); System.out.println("此線程花費了:"+(b-a)+"秒!"); } catch (IOException e) { e.printStackTrace(); }finally { try { bufferedReader.close(); printWriter.close(); client.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String[] args) throws IOException { //服務端的主線程是用來循環監聽客戶端請求 ServerSocket server = new ServerSocket(8686); //建立一個服務端且端口爲8686 Socket client = null; while (true){ //循環監聽 client = server.accept(); //服務端監聽到一個客戶端請求 System.out.println(client.getRemoteSocketAddress()+"地址的客戶端鏈接成功!"); executorService.submit(new HandleMsg(client)); //將該客戶端請求經過線程池放入HandlMsg線程中進行處理 } } }

上述代碼中咱們使用一個類編寫了一個簡單的echo服務器
在主線程中用死循環來開啓端口監聽網絡

簡單客戶端

有了服務器,咱們就能夠對其進行訪問,而且發送一些字符串數據
服務器的功能是返回這些字符串,而且打印出線程佔用時間多線程

下面來寫個簡單的客戶端來響應服務端:併發

public class MyClient { public static void main(String[] args) throws IOException { Socket client = null; PrintWriter printWriter = null; BufferedReader bufferedReader = null; try { client = new Socket(); client.connect(new InetSocketAddress("localhost",8686)); printWriter = new PrintWriter(client.getOutputStream(),true); printWriter.println("hello"); printWriter.flush(); bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream())); //讀取服務器返回的信息並進行輸出 System.out.println("來自服務器的信息是:"+bufferedReader.readLine()); } catch (IOException e) { e.printStackTrace(); }finally { printWriter.close(); bufferedReader.close(); client.close(); } } }

代碼中,咱們用字符流發送了一個hello字符串過去,若是代碼沒問題
服務器會返回一個hello數據,而且打印出咱們設置的日誌信息異步

echo服務器結果展現

咱們來運行:
1.打開server,開啓循環監聽:

clipboard.png

2.打開一個客戶端:

clipboard.png

能夠看到客戶端打印出了返回結果

3.查看服務端日誌:

clipboard.png

很好,一個簡單的多線程套接字編程就實現了

可是試想一下:
若是一個客戶端請求中,在IO寫入到服務端過程當中加入Sleep,
使每一個請求佔用服務端線程10秒
而後有大量的客戶端請求,每一個請求都佔用那麼長時間
那麼服務端的並能能力就會大幅度降低
這並非由於服務端有多少繁重的任務,而僅僅是由於服務線程在等待IO(由於accept,read,write都是阻塞式的)
讓高速運行的CPU去等待及其低效的網絡IO是很是不合算的行爲

這時候該怎麼辦?

NIO

New IO成功的解決了上述問題,它是怎樣解決的呢?
IO處理客戶端請求的最小單位是線程
而NIO使用了比線程還小一級的單位:通道(Channel)
能夠說,NIO中只須要一個線程就能完成全部接收,讀,寫等操做

要學習NIO,首先要理解它的三大核心
Selector,選擇器
Buffer,緩衝區
Channel,通道

博主不才,畫了張醜圖給你們加深下印象 ^ . ^

clipboard.png

再給一張TCP下的NIO工做流程圖(好難畫的線條...)

clipboard.png

你們大體看懂就行,咱們一步步來

Buffer

首先要知道什麼是Buffer
在NIO中數據交互再也不像IO機制那樣使用流
而是使用Buffer(緩衝區)

博主以爲圖纔是最容易理解的
因此...

clipboard.png

能夠看出Buffer在整個工做流程中的位置

buffer其實是一個容器,一個連續數組,它經過幾個變量來保存這個數據的當前位置狀態:
1.capacity:容量,緩衝區能容納元素的數量
2.position:當前位置,是緩衝區中下一次發生讀取和寫入操做的索引,當前位置經過大多數讀寫操做向前推動
3.limit:界限,是緩衝區中最後一個有效位置以後下一個位置的索引
如圖:

clipboard.png

幾個經常使用方法:

.flip() //將limit設置爲position,而後position重置爲0,返回對緩衝區的引用 .clear() //清空調用緩衝區並返回對緩衝區的引用

來點實際點的,上面圖中的具體代碼以下:

1.首先給Buffer分配空間,以字節爲單位

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

建立一個ByteBuffer對象而且指定內存大小

2.向Buffer中寫入數據:

1).數據從Channel到Buffer:channel.read(byteBuffer); 2).數據從Client到Buffer:byteBuffer.put(...);

3.從Buffer中讀取數據:

1).數據從Buffer到Channel:channel.write(byteBuffer); 2).數據從Buffer到Server:byteBuffer.get(...);

Selector

選擇器是NIO的核心,它是channel的管理者
經過執行select()阻塞方法,監聽是否有channel準備好
一旦有數據可讀,此方法的返回值是SelectionKey的數量

因此服務端一般會死循環執行select()方法,直到有channl準備就緒,而後開始工做
每一個channel都會和Selector綁定一個事件,而後生成一個SelectionKey的對象
須要注意的是:
channel和Selector綁定時,channel必須是非阻塞模式
而FileChannel不能切換到非阻塞模式,由於它不是套接字通道,因此FileChannel不能和Selector綁定事件

在NIO中一共有四種事件:
1.SelectionKey.OP_CONNECT:鏈接事件
2.SelectionKey.OP_ACCEPT:接收事件
3.SelectionKey.OP_READ:讀事件
4.SelectionKey.OP_WRITE:寫事件

Channel

共有四種通道:
FileChannel:做用於IO文件流
DatagramChannel:做用於UDP協議
SocketChannel:做用於TCP協議
ServerSocketChannel:做用於TCP協議

本篇文章經過經常使用的TCP協議來說解NIO

咱們以ServerSocketChannel爲例:

打開一個ServerSocketChannel通道

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

關閉ServerSocketChannel通道:

serverSocketChannel.close();

循環監聽SocketChannel:

while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); }

clientChannel.configureBlocking(false);語句是將此通道設置爲非阻塞,也就是異步
自由控制阻塞或非阻塞即是NIO的特性之一

SelectionKey

SelectionKey是通道和選擇器交互的核心組件
好比在SocketChannel上綁定一個Selector,並註冊爲鏈接事件:

SocketChannel clientChannel = SocketChannel.open(); clientChannel.configureBlocking(false); clientChannel.connect(new InetSocketAddress(port)); clientChannel.register(selector, SelectionKey.OP_CONNECT);

核心在register()方法,它返回一個SelectionKey對象
來檢測channel事件是那種事件可使用如下方法:

selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();

服務端即是經過這些方法 在輪詢中執行相對應操做

固然經過Channel與Selector綁定的key也能夠反過來拿到他們

Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();

在Channel上註冊事件時,咱們也能夠順帶綁定一個Buffer:

clientChannel.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(1024));

或者綁定一個Object:

selectionKey.attach(Object); Object anthorObj = selectionKey.attachment();

NIO的TCP服務端

講了這麼多,都是理論
咱們來看下最簡單也是最核心的代碼(加那麼多註釋很不優雅,但方便你們看懂):

package cn.blog.test.NioTest; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; public class MyNioServer { private Selector selector; //建立一個選擇器 private final static int port = 8686; private final static int BUF_SIZE = 10240; private void initServer() throws IOException { //建立通道管理器對象selector this.selector=Selector.open(); //建立一個通道對象channel ServerSocketChannel channel = ServerSocketChannel.open(); channel.configureBlocking(false); //將通道設置爲非阻塞 channel.socket().bind(new InetSocketAddress(port)); //將通道綁定在8686端口 //將上述的通道管理器和通道綁定,併爲該通道註冊OP_ACCEPT事件 //註冊事件後,當該事件到達時,selector.select()會返回(一個key),若是該事件沒到達selector.select()會一直阻塞 SelectionKey selectionKey = channel.register(selector,SelectionKey.OP_ACCEPT); while (true){ //輪詢 selector.select(); //這是一個阻塞方法,一直等待直到有數據可讀,返回值是key的數量(能夠有多個) Set keys = selector.selectedKeys(); //若是channel有數據了,將生成的key訪入keys集合中 Iterator iterator = keys.iterator(); //獲得這個keys集合的迭代器 while (iterator.hasNext()){ //使用迭代器遍歷集合 SelectionKey key = (SelectionKey) iterator.next(); //獲得集合中的一個key實例 iterator.remove(); //拿到當前key實例以後記得在迭代器中將這個元素刪除,很是重要,不然會出錯 if (key.isAcceptable()){ //判斷當前key所表明的channel是否在Acceptable狀態,若是是就進行接收 doAccept(key); }else if (key.isReadable()){ doRead(key); }else if (key.isWritable() && key.isValid()){ doWrite(key); }else if (key.isConnectable()){ System.out.println("鏈接成功!"); } } } } public void doAccept(SelectionKey key) throws IOException { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); System.out.println("ServerSocketChannel正在循環監聽"); SocketChannel clientChannel = serverChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(key.selector(),SelectionKey.OP_READ); } public void doRead(SelectionKey key) throws IOException { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE); long bytesRead = clientChannel.read(byteBuffer); while (bytesRead>0){ byteBuffer.flip(); byte[] data = byteBuffer.array(); String info = new String(data).trim(); System.out.println("從客戶端發送過來的消息是:"+info); byteBuffer.clear(); bytesRead = clientChannel.read(byteBuffer); } if (bytesRead==-1){ clientChannel.close(); } } public void doWrite(SelectionKey key) throws IOException { ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE); byteBuffer.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); while (byteBuffer.hasRemaining()){ clientChannel.write(byteBuffer); } byteBuffer.compact(); } public static void main(String[] args) throws IOException { MyNioServer myNioServer = new MyNioServer(); myNioServer.initServer(); } } 

我打印了監聽channel,告訴你們ServerSocketChannel是在何時開始運行的
若是配合NIO客戶端的debug,就能很清楚的發現,進入select()輪詢前
雖然已經有了ACCEPT事件的KEY,但select()默認並不會去調用
而是要等待有其它感興趣事件被select()捕獲以後,纔會去調用ACCEPT的SelectionKey
這時候ServerSocketChannel纔開始進行循環監聽

也就是說一個Selector中,始終保持着ServerSocketChannel的運行
serverChannel.accept();真正作到了異步(在initServer方法中的channel.configureBlocking(false);)
若是沒有接受到connect,會返回一個null
若是成功鏈接了一個SocketChannel,則此SocketChannel會註冊寫入(READ)事件
而且設置爲異步

NIO的TCP客戶端

有服務端一定有客戶端
其實若是能徹底理解了服務端
客戶端的代碼大同小異

package cn.blog.test.NioTest; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; public class MyNioClient { private Selector selector; //建立一個選擇器 private final static int port = 8686; private final static int BUF_SIZE = 10240; private static ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE); private void initClient() throws IOException { this.selector = Selector.open(); SocketChannel clientChannel = SocketChannel.open(); clientChannel.configureBlocking(false); clientChannel.connect(new InetSocketAddress(port)); clientChannel.register(selector, SelectionKey.OP_CONNECT); while (true){ selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if (key.isConnectable()){ doConnect(key); }else if (key.isReadable()){ doRead(key); } } } } public void doConnect(SelectionKey key) throws IOException { SocketChannel clientChannel = (SocketChannel) key.channel(); if (clientChannel.isConnectionPending()){ clientChannel.finishConnect(); } clientChannel.configureBlocking(false); String info = "服務端你好!!"; byteBuffer.clear(); byteBuffer.put(info.getBytes("UTF-8")); byteBuffer.flip(); clientChannel.write(byteBuffer); //clientChannel.register(key.selector(),SelectionKey.OP_READ); clientChannel.close(); } public void doRead(SelectionKey key) throws IOException { SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.read(byteBuffer); byte[] data = byteBuffer.array(); String msg = new String(data).trim(); System.out.println("服務端發送消息:"+msg); clientChannel.close(); key.selector().close(); } public static void main(String[] args) throws IOException { MyNioClient myNioClient = new MyNioClient(); myNioClient.initClient(); } } 

輸出結果

這裏我打開一個服務端,兩個客戶端:

clipboard.png

接下來,你能夠試下同時打開一千個客戶端,只要你的CPU夠給力,服務端就不可能由於阻塞而下降性能

以上即是Java NIO的基礎詳解
謝謝閱讀和關注~

相關文章
相關標籤/搜索