Java網絡編程與NIO詳解2:JAVA NIO 一步步構建I/O多路複用的請求模型

<font color="#333333" face="PingFangSC, helvetica neue, hiragino sans gb, arial, microsoft yahei ui, microsoft yahei, simsun, sans-serif">微信公衆號【黃小斜】做者是螞蟻金服 JAVA 工程師,專一於 JAVA 後端技術棧:SpringBoot、SSM全家桶、MySQL、分佈式、中間件、微服務,同時也懂點投資理財,堅持學習和寫做,相信終身學習的力量!關注公衆號後回覆」架構師「便可領取 Java基礎、進階、項目和架構師等免費學習資料,更有數據庫、分佈式、微服務等熱門技術學習視頻,內容豐富,兼顧原理和實踐,另外也將贈送做者原創的Java學習指南、Java程序員面試指南等乾貨資源。</font>

[](https://github.com/jasonGeng8...

  1. jdk == 1.8

[](https://github.com/jasonGeng8...

git 地址:https://github.com/jasonGeng88/java-network-programmingjava

[](https://github.com/jasonGeng8...

  • nio 下 I/O 阻塞與非阻塞實現
  • SocketChannel 介紹
  • I/O 多路複用的原理
  • 事件選擇器與 SocketChannel 的關係
  • 事件監聽類型
  • 字節緩衝 ByteBuffer 數據結構

[](https://github.com/jasonGeng8...

接着上一篇中的站點訪問問題,若是咱們須要併發訪問10個不一樣的網站,咱們該如何處理?git

在上一篇中,咱們使用了java.net.socket類來實現了這樣的需求,以一線程處理一鏈接的方式,並配以線程池的控制,貌似獲得了當前的最優解。但是這裏也存在一個問題,鏈接處理是同步的,也就是併發數量增大後,大量請求會在隊列中等待,或直接異常拋出。程序員

爲解決這問題,咱們發現元兇處在「一線程一請求」上,若是一個線程能同時處理多個請求,那麼在高併發下性能上會大大改善。這裏就借住 JAVA 中的 nio 技術來實現這一模型。github

[](https://github.com/jasonGeng8... 的阻塞實現

關於什麼是 nio,從字面上理解爲 New IO,就是爲了彌補本來 I/O 上的不足,而在 JDK 1.4 中引入的一種新的 I/O 實現方式。簡單理解,就是它提供了 I/O 的阻塞與非阻塞的兩種實現方式(_固然,默認實現方式是阻塞的。_)。面試

下面,咱們先來看下 nio 以阻塞方式是如何處理的。數據庫

[](https://github.com/jasonGeng8...

有了上一篇 socket 的經驗,咱們的第一步必定也是創建 socket 鏈接。只不過,這裏不是採用 new socket() 的方式,而是引入了一個新的概念 SocketChannel。它能夠看做是 socket 的一個完善類,除了提供 Socket 的相關功能外,還提供了許多其餘特性,如後面要講到的向選擇器註冊的功能。後端

類圖以下: 性能優化

創建鏈接代碼實現:微信

<pre>// 初始化 socket,創建 socket 與 channel 的綁定關係
SocketChannel socketChannel = SocketChannel.open();
// 初始化遠程鏈接地址
SocketAddress remote = new InetSocketAddress(this.host, port);
// I/O 處理設置阻塞,這也是默認的方式,可不設置
socketChannel.configureBlocking(true);
// 創建鏈接
socketChannel.connect(remote);</pre>數據結構

[](https://github.com/jasonGeng8... socket 鏈接

由於是一樣是 I/O 阻塞的實現,因此後面的關於 socket 輸入輸出流的處理,和上一篇的基本相同。惟一差異是,這裏須要經過 channel 來獲取 socket 鏈接。

  • 獲取 socket 鏈接

<pre>Socket socket = socketChannel.socket();</pre>

  • 處理輸入輸出流

<pre>PrintWriter pw = getWriter(socketChannel.socket());
BufferedReader br = getReader(socketChannel.socket());</pre>

[](https://github.com/jasonGeng8...

<pre>package com.jason.network.mode.nio;

import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;

public class NioBlockingHttpClient {

private SocketChannel socketChannel;
private String host;

public static void main(String[] args) throws IOException {

    for (String host: HttpConstant.HOSTS) {

        NioBlockingHttpClient client = new NioBlockingHttpClient(host, HttpConstant.PORT);
        client.request();

    }

}

public NioBlockingHttpClient(String host, int port) throws IOException {
    this.host = host;
    socketChannel = SocketChannel.open();
    socketChannel.socket().setSoTimeout(5000);
    SocketAddress remote = new InetSocketAddress(this.host, port);
    this.socketChannel.connect(remote);
}

public void request() throws IOException {
    PrintWriter pw = getWriter(socketChannel.socket());
    BufferedReader br = getReader(socketChannel.socket());

    pw.write(HttpUtil.compositeRequest(host));
    pw.flush();
    String msg;
    while ((msg = br.readLine()) != null){
        System.out.println(msg);
    }
}

private PrintWriter getWriter(Socket socket) throws IOException {
    OutputStream out = socket.getOutputStream();
    return new PrintWriter(out);
}

private BufferedReader getReader(Socket socket) throws IOException {
    InputStream in = socket.getInputStream();
    return new BufferedReader(new InputStreamReader(in));
}

}</pre>

[](https://github.com/jasonGeng8... 的非阻塞實現

[](https://github.com/jasonGeng8...

nio 的阻塞實現,基本與使用原生的 socket 相似,沒有什麼特別大的差異。

下面咱們來看看它真正強大的地方。到目前爲止,咱們將的都是阻塞 I/O。何爲阻塞 I/O,看下圖:

咱們主要觀察圖中的前三種 I/O 模型,關於異步 I/O,通常須要依靠操做系統的支持,這裏不討論。

從圖中能夠發現,阻塞過程主要發生在兩個階段上:

  • 第一階段:等待數據就緒;
  • 第二階段:將已就緒的數據從內核緩衝區拷貝到用戶空間;

這裏產生了一個從內核到用戶空間的拷貝,主要是爲了系統的性能優化考慮。假設,從網卡讀到的數據直接返回給用戶空間,那勢必會形成頻繁的系統中斷,由於從網卡讀到的數據不必定是完整的,可能斷斷續續的過來。經過內核緩衝區做爲緩衝,等待緩衝區有足夠的數據,或者讀取完結後,進行一次的系統中斷,將數據返回給用戶,這樣就能避免頻繁的中斷產生。

瞭解了 I/O 阻塞的兩個階段,下面咱們進入正題。看看一個線程是如何實現同時處理多個 I/O 調用的。從上圖中的非阻塞 I/O 能夠看出,僅僅只有第二階段須要阻塞,第一階段的數據等待過程,咱們是不須要關心的。不過該模型是頻繁地去檢查是否就緒,形成了 CPU 無效的處理,反而效果很差。若是有一種相似的好萊塢原則— 「不要給咱們打電話,咱們會打給你」 。這樣一個線程能夠同時發起多個 I/O 調用,而且不須要同步等待數據就緒。在數據就緒完成的時候,會以事件的機制,來通知咱們。這樣不就實現了單線程同時處理多個 IO 調用的問題了嗎?即所說的「I/O 多路複用模型」。

    • *

廢話講了一大堆,下面就來實際操刀一下。

[](https://github.com/jasonGeng8...

由上面分析能夠,咱們得有一個選擇器,它能監聽全部的 I/O 操做,而且以事件的方式通知咱們哪些 I/O 已經就緒了。

代碼以下:

<pre>import java.nio.channels.Selector;

...

private static Selector selector;
static {

try {
    selector = Selector.open();
} catch (IOException e) {
    e.printStackTrace();
}

}
</pre>

[](https://github.com/jasonGeng8... I/O

下面,咱們來建立一個非阻塞的 SocketChannel,代碼與阻塞實現類型,惟一不一樣是socketChannel.configureBlocking(false)

注意:只有在socketChannel.configureBlocking(false)以後的代碼,纔是非阻塞的,若是socketChannel.connect()在設置非阻塞模式以前,那麼鏈接操做依舊是阻塞調用的。

<pre>SocketChannel socketChannel = SocketChannel.open();
SocketAddress remote = new InetSocketAddress(host, port);
// 設置非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.connect(remote);</pre>

[](https://github.com/jasonGeng8... socket 的關聯

選擇器與 socket 都建立好了,下一步就是將二者進行關聯,好讓選擇器和監聽到 Socket 的變化。這裏採用了以 SocketChannel 主動註冊到選擇器的方式進行關聯綁定,這也就解釋了,爲何不直接new Socket(),而是以SocketChannel的方式來建立 socket。

代碼以下:

<pre>socketChannel.register(selector,

SelectionKey.OP_CONNECT
                    | SelectionKey.OP_READ
                    | SelectionKey.OP_WRITE);</pre>

上面代碼,咱們將 socketChannel 註冊到了選擇器中,而且對它的鏈接、可讀、可寫事件進行了監聽。

具體的事件監聽類型以下:

操做類型 描述 所屬對象
OP_READ 1 << 0 讀操做 SocketChannel
OP_WRITE 1 << 2 寫操做 SocketChannel
OP_CONNECT 1 << 3 鏈接socket操做 SocketChannel
OP_ACCEPT 1 << 4 接受socket操做 ServerSocketChannel

[](https://github.com/jasonGeng8... socket 變化

如今,選擇器已經與咱們關心的 socket 進行了關聯。下面就是感知事件的變化,而後調用相應的處理機制。

這裏與 Linux 下的 selector 有點不一樣,nio 下的 selecotr 不會去遍歷全部關聯的 socket。咱們在註冊時設置了咱們關心的事件類型,每次從選擇器中獲取的,只會是那些符合事件類型,而且完成就緒操做的 socket,減小了大量無效的遍歷操做。

public void select() throws IOException {
    // 獲取就緒的 socket 個數
    while (selector.select() > 0){

        // 獲取符合的 socket 在選擇器中對應的事件句柄 key
        Set keys = selector.selectedKeys();

        // 遍歷全部的key
        Iterator it = keys.iterator();
        while (it.hasNext()){

            // 獲取對應的 key,並從已選擇的集合中移除
            SelectionKey key = (SelectionKey)it.next();
            it.remove();

            if (key.isConnectable()){
                // 進行鏈接操做
                connect(key);
            }
            else if (key.isWritable()){
                // 進行寫操做
                write(key);
            }
            else if (key.isReadable()){
                // 進行讀操做
                receive(key);
            }
        }
    }
}

注意:這裏的selector.select()是同步阻塞的,等待有事件發生後,纔會被喚醒。這就防止了 CPU 空轉的產生。固然,咱們也能夠給它設置超時時間,selector.select(long timeout)來結束阻塞過程。

[](https://github.com/jasonGeng8...

下面,咱們分別來看下,一個 socket 是如何來處理鏈接、寫入數據和讀取數據的(_這些操做都是阻塞的過程,只是咱們將等待就緒的過程變成了非阻塞的了_)。

處理鏈接代碼:

<pre>// SelectionKey 表明 SocketChannel 在選擇器中註冊的事件句柄
private void connect(SelectionKey key) throws IOException {

// 獲取事件句柄對應的 SocketChannel
SocketChannel channel = (SocketChannel) key.channel();

// 真正的完成 socket 鏈接

channel.finishConnect();

// 打印鏈接信息

InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
int port = remote.getPort();
System.out.println(String.format("訪問地址: %s:%s 鏈接成功!", host, port));

}</pre>

[](https://github.com/jasonGeng8...

<pre>// 字符集處理類
private Charset charset = Charset.forName("utf8");

private void write(SelectionKey key) throws IOException {

SocketChannel channel = (SocketChannel) key.channel();
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();

// 獲取 HTTP 請求,同上一篇
String request = HttpUtil.compositeRequest(host);

// 向 SocketChannel 寫入事件 
channel.write(charset.encode(request));

// 修改 SocketChannel 所關心的事件
key.interestOps(SelectionKey.OP_READ);

}</pre>

這裏有兩個地方須要注意:

  • 第一個是使用 channel.write(charset.encode(request)); 進行數據寫入。有人會說,爲何不能像上面同步阻塞那樣,經過PrintWriter包裝類進行操做。由於PrintWriter的 write() 方法是阻塞的,也就是說要等數據真正從 socket 發送出去後才返回。

這與咱們這裏所講的阻塞是不一致的,這裏的操做雖然也是阻塞的,但它發生的過程是在數據從用戶空間到內核緩衝區拷貝過程。至於系統將緩衝區的數據經過 socket 發送出去,這不在阻塞範圍內。也解釋了爲何要用 Charset 對寫入內容進行編碼了,由於緩衝區接收的格式是ByteBuffer

  • 第二,選擇器用來監聽事件變化的兩個參數是 interestOps 與 readyOps

    • interestOps:表示 SocketChannel 所關心的事件類型,也就是告訴選擇器,當有這幾種事件發生時,纔來通知我。這裏經過key.interestOps(SelectionKey.OP_READ);告訴選擇器,以後我只關心「讀就緒」事件,其餘的不用通知我了。
    • readyOps:表示 SocketChannel 當前就緒的事件類型。以key.isReadable()爲例,判斷依據就是:return (readyOps() & OP_READ) != 0;

[](https://github.com/jasonGeng8...

<pre>private void receive(SelectionKey key) throws IOException {

SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
String receiveData = charset.decode(buffer).toString();

// 當再沒有數據可讀時,取消在選擇器中的關聯,並關閉 socket 鏈接
if ("".equals(receiveData)) {
    key.cancel();
    channel.close();
    return;
}

System.out.println(receiveData);

}</pre>

這裏的處理基本與寫入一致,惟一要注意的是,這裏咱們須要自行處理去緩衝區讀取數據的操做。首先會分配一個固定大小的緩衝區,而後從內核緩衝區中,拷貝數據至咱們剛分配固定緩衝區上。這裏存在兩種狀況:

  • 咱們分配的緩衝區過大,那多餘的部分以0補充(_初始化時,其實會自動補0_)。
  • 咱們分配的緩衝去太小,由於選擇器會不停的遍歷。只要 SocketChannel 處理讀就緒狀態,那下一次會繼續讀取。固然,分配太小,會增長遍歷次數。

最後,將一下 ByteBuffer 的結構,它主要有 position, limit,capacity 以及 mark 屬性。以 buffer.flip(); 爲例,講下各屬性的做用(_mark 主要是用來標記以前 position 的位置,是在當前 postion 沒法知足的狀況下使用的,這裏不做討論_)。

從圖中看出,

  • 容量(capacity):表示緩衝區能夠保存的數據容量;
  • 極限(limit):表示緩衝區的當前終點,即寫入、讀取都不可超過該重點;
  • 位置(position):表示緩衝區下一個讀寫單元的位置;

[](https://github.com/jasonGeng8...

<pre>package com.jason.network.mode.nio;

import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

public class NioNonBlockingHttpClient {

private static Selector selector;
private Charset charset = Charset.forName("utf8");

static {
    try {
        selector = Selector.open();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

public static void main(String[] args) throws IOException {

    NioNonBlockingHttpClient client = new NioNonBlockingHttpClient();

    for (String host: HttpConstant.HOSTS) {

        client.request(host, HttpConstant.PORT);

    }

    client.select();

}

public void request(String host, int port) throws IOException {
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.socket().setSoTimeout(5000);
    SocketAddress remote = new InetSocketAddress(host, port);
    socketChannel.configureBlocking(false);
    socketChannel.connect(remote);
    socketChannel.register(selector,
                    SelectionKey.OP_CONNECT
                    | SelectionKey.OP_READ
                    | SelectionKey.OP_WRITE);
}

public void select() throws IOException {
    while (selector.select(500) > 0){
        Set keys = selector.selectedKeys();

        Iterator it = keys.iterator();

        while (it.hasNext()){

            SelectionKey key = (SelectionKey)it.next();
            it.remove();

            if (key.isConnectable()){
                connect(key);
            }
            else if (key.isWritable()){
                write(key);
            }
            else if (key.isReadable()){
                receive(key);
            }
        }
    }
}

private void connect(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    channel.finishConnect();
    InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
    String host = remote.getHostName();
    int port = remote.getPort();
    System.out.println(String.format("訪問地址: %s:%s 鏈接成功!", host, port));
}

private void write(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
    String host = remote.getHostName();

    String request = HttpUtil.compositeRequest(host);
    System.out.println(request);

    channel.write(charset.encode(request));
    key.interestOps(SelectionKey.OP_READ);
}

private void receive(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer);
    buffer.flip();
    String receiveData = charset.decode(buffer).toString();

    if ("".equals(receiveData)) {
        key.cancel();
        channel.close();
        return;
    }

    System.out.println(receiveData);
}

}
</pre>

[](https://github.com/jasonGeng8...

[](https://github.com/jasonGeng8...

本文從 nio 的阻塞方式講起,介紹了阻塞 I/O 與非阻塞 I/O 的區別,以及在 nio 下是如何一步步構建一個 IO 多路複用的模型的客戶端。文中須要理解的內容比較多,若是有理解錯誤的地方,歡迎指正~

補充1:基於NIO的多路複用客戶端(線程池版)

<pre>public static void main(String[] args) {

基於線程池的僞異步NIO模型 a = new 基於線程池的僞異步NIO模型();

a.startServer(); }
private Charset charset = Charset.forName("utf8"); class WriteThread implements Runnable {

private SelectionKey key;

public WriteThread(SelectionKey key) {

this.key = key;

}

@Override

public void run() {

SocketChannel socketChannel = (SocketChannel) key.channel();

Socket socket = socketChannel.socket();
try {

socketChannel.finishConnect();

} catch (IOException e) {

e.printStackTrace();

}

InetSocketAddress remote = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();

String host = remote.getHostName();
int port = remote.getPort();
System._out_.println(String.format("訪問地址: %s:%s 鏈接成功!", host, port)); }
}
class ReadThread implements Runnable {

private SelectionKey key;

public ReadThread(SelectionKey key) {

this.key = key;

}

@Override

public void run() {

SocketChannel socketChannel = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(1024);
try {

socketChannel.read(buffer);

} catch (IOException e) {

e.printStackTrace();

}

buffer.flip();

String receiveData = null;
try {

receiveData = new String(buffer.array(), "utf8");

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

if ("".equals(receiveData)) {
        key.cancel();

try {

socketChannel.close();

} catch (IOException e) {

e.printStackTrace();

}

return;

}

System._out_.println(receiveData);

}
}
class ConnectThread implements Runnable {

private SelectionKey key;

public ConnectThread(SelectionKey key) {

this.key = key;

}

@Override

public void run() {

SocketChannel socketChannel = (SocketChannel) key.channel();

ByteBuffer byteBuffer = charset.encode("hello world");
try {

socketChannel.write(byteBuffer);

System._out_.println("hello world");
} catch (IOException e) {

e.printStackTrace();

}

key.interestOps(SelectionKey._OP_READ_);

}
}
public void startServer() {

ExecutorService executorService = Executors.newFixedThreadPool(10);

try {

SocketChannel socketChannel = SocketChannel.open();

Selector selector = Selector.open(); socketChannel.configureBlocking(false);
InetSocketAddress inetAddress = new InetSocketAddress(1234); socketChannel.connect(inetAddress);
socketChannel.register(selector, SelectionKey._OP_CONNECT_ |

SelectionKey._OP_READ_ |
            SelectionKey._OP_WRITE_);   while (selector.select(500) > 0) {
        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

while (keys.hasNext()) {

SelectionKey key = keys.next();

if (key.isConnectable()) {

executorService.submit(new ConnectThread(key));

}else if(key.isReadable()) {

executorService.submit(new ReadThread(key));

}else {

executorService.submit(new WriteThread(key));

}

}
    }

} catch (IOException e) {
    e.printStackTrace();

}
}</pre>

[](https://github.com/jasonGeng8...:基於NIO的多路複用服務端

<pre>class NioNonBlockingHttpServer {

private static Selector _selector_;

private Charset charset = Charset.forName("utf8"); static {

try {
        _selector_ = Selector.open();

} catch (IOException e) {

e.printStackTrace();

}

}

public static void main(String[] args) throws IOException {

    NioNonBlockingHttpServer httpServer = new NioNonBlockingHttpServer();

httpServer.select(); }

public void request(int port) throws IOException {
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().setSoTimeout(5000);
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8383)); // serverSocketChannel.register(selector, // SelectionKey.OP_CONNECT // | SelectionKey.OP_READ // | SelectionKey.OP_WRITE);
}

public void select() throws IOException {
    while (_selector_.select(500) > 0) {
        Set keys = _selector_.selectedKeys();    Iterator it = keys.iterator();   while (it.hasNext()) {

            SelectionKey key = (SelectionKey) it.next();

it.remove(); if (key.isAcceptable()) {

accept(key);

} else if (key.isWritable()) {

write(key);

} else if (key.isReadable()) {

receive(key);

}

}
    }
}

private void accept(SelectionKey key) throws IOException {
    SocketChannel socketChannel;

ServerSocketChannel channel = (ServerSocketChannel) key.channel();
socketChannel = channel.accept();//接受鏈接請求
socketChannel.configureBlocking(false); socketChannel.register(_selector_, SelectionKey._OP_READ_ | SelectionKey._OP_WRITE_); InetSocketAddress local = (InetSocketAddress) channel.socket().getLocalSocketAddress();
String host = local.getHostName();
int port = local.getPort();
System._out_.println(String.format("請求地址: %s:%s 接收成功!", host, port)); }

private void write(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();    InetSocketAddress local = (InetSocketAddress) channel.socket().getRemoteSocketAddress();

String host = local.getHostName();
String msg = "hello Client";
channel.write(charset.encode(msg)); System._out_.println(msg);
key.interestOps(SelectionKey._OP_READ_);
}

private void receive(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
String receiveData = charset.decode(buffer).toString(); if ("".equals(receiveData)) {

key.cancel();

channel.close();
return; }

System._out_.println(receiveData);

}}</pre>

相關文章
相關標籤/搜索