在咱們進行一次網絡的讀寫處理過程當中會涉及到幾個步驟,好比客戶端和服務器進行Socket通訊:java
關於read和write函數不懂的可使用man read
或man write
查看。linux
當發生read或write系統調用後,用戶空間被會被一直阻塞,直到read或write對應的內核空間返回結果。在Java中,Socket和ServerSocket類的IO操做就是典型的阻塞IO。
阻塞IO的大體交互以下: 從上面的圖中能夠發現,在用戶空間從發起read系統調用到拿到結果這個過程是阻塞的,在內核空間中內核緩衝區等待數據,內核緩衝區複製到用戶緩衝區這兩個過程也是阻塞的。咱們能夠經過java來實現一個BIO程序:編程
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
Socket socket = serverSocket.accept();
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter writer = new PrintWriter(socket.getOutputStream());
String value = reader.readLine();
System.out.println("客戶端發送的消息:" + value);
writer.write(value);
writer.flush();
}
}
複製代碼
以上代碼中 serverSocket.accept();
形成系統調用發生阻塞,在Linux系統中經過man accept
能夠查看關於accept的詳細說明(我這裏只複製部分信息):數組
[root@izbp1hvx6s6h8yr3sgj333z ~]# man accept
ACCEPT(2) Linux Programmer's Manual ACCEPT(2)
NAME
accept, accept4 - accept a connection on a socket
DESCRIPTION
The accept() system call is used with connection-based socket types (SOCK_STREAM, SOCK_SEQPACKET). It extracts the first connection request on
the queue of pending connections for the listening socket, sockfd, creates a new connected socket, and returns a new file descriptor referring
to that socket. The newly created socket is not in the listening state. The original socket sockfd is unaffected by this call.
複製代碼
accept() 系統調用與基於鏈接的套接字類型(SOCK_STREAM、SOCK_SEQPACKET)一塊兒使用。它提取第一個鏈接請求 偵聽套接字 sockfd 的掛起鏈接隊列建立一個新的鏈接套接字,並返回一個新的文件描述符引用 到那個插座。新建立的套接字未處於偵聽狀態。原始套接字 sockfd 不受此調用的影響。服務器
對於BIO模型存在的最大的問題是,每次監聽鏈接、讀寫操做都會對用戶線程形成阻塞,這個線程什麼都不能幹,只能等着,對於咱們追求高併發的系統來講有很大的限制,基於這種思想,咱們能夠藉助與多線程和線程池技術以異步的思想去將問題避開,可是同時又會帶來新的問題:多線程的頻繁上下文切換、受操做系統線程數的限制。markdown
上面的同步阻塞IO咱們發現問題的本質是阻塞,而同步非阻塞的優點就在於他能夠在不使用多線程異步的狀況下讓咱們的用戶線程不阻塞(注意:對於內核空間來講,仍是會有阻塞的,可是他不會影響到用戶線程);實現思路是,調用read函數若是數據尚未到達用戶緩衝區的話,直接返回,不阻塞,過一會來查一下read的狀態有麼有執行成功,若是沒有就再返回;不停的重複這個動做,若是調用read函數發現數據已經到達內核緩衝區了,那麼就會進行用戶緩衝區的複製,這個過程是阻塞的。網絡
咱們能夠經過Java來實現一個同步非阻塞模型的程序:多線程
public static void main(String[] args) throws IOException {
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(9000)).configureBlocking(false);
List<SocketChannel> list = new ArrayList<>();
while (true) {
SocketChannel socket = server.accept();
if (socket != null) {
socket.configureBlocking(false);
System.out.println("已有客戶端接入...");
list.add(socket);
}
Iterator<SocketChannel> iterator = list.iterator();
while (iterator.hasNext()) {
SocketChannel channel = iterator.next();
ByteBuffer buffer = ByteBuffer.allocate(32);
int read = channel.read(buffer);
if (read > 0) {
System.out.println("收到消息:" + new String(buffer.array()));
} else if(read == -1) {
System.out.println("斷開鏈接");
iterator.remove();
}
}
}
}
複製代碼
上面設置了configureBlocking(false)
,因此在server.accept();
和channel.read(buffer);
的時候不會阻塞,用戶線程不停的進行IO系統調用,輪詢判斷數據有沒有準備好,可是這麼作存在的問題是:帶來大量的CPU的空輪詢的開銷,同時也沒法知足高併發的狀況。併發
在基於非阻塞的思想之上作了一次升級,客戶端不須要死循環去調用read函數,也不須要判斷read的數據有沒有拷貝到用戶空間,而是對於每一個Socket鏈接都添加一個事件監聽,當事件被觸發的時候,客戶端再去執行對應的操做。好比我如今要去read,可是我不直接去read,由於我不知道數據有沒有準備好,我先註冊一個監聽器,讓監聽器去監聽數據有沒有讀取完成,一旦有數據讀取完成,那麼監聽器就會告訴我數據好了你能夠去read了,此時我客戶端再調用read函數去拿數據。而這個監聽器就是多路複用器,他還能夠同時綁定多個事件。app
下面是一個多路複用模型在java中的實現:
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
serverSocket.configureBlocking(false);
// 1. 這個selector就是文章中提到的監聽器,也就是多路複用器
Selector selector = Selector.open();
// 2. 將ServerSocket綁定到selector並告訴他幫我監聽一下accept事件
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 3. 等待事件被觸發,若是沒有事件則會阻塞,由於沒有事件你再往下執行也沒意義啊
selector.select();
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeySet.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
// 若是觸發的是accept事件,說明有客戶端接入了
ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 若是觸發的是read事件,說明內核緩衝區中有數據了,能夠去讀了
SocketChannel socketChannel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = socketChannel.read(buffer);
if (read > 0) {
System.out.println("客戶端發來的數據:" + new String(buffer.array()));
} else if (read == -1){
System.out.println("客戶端斷開鏈接");
}
}
iterator.remove();
}
}
}
複製代碼
多路複用的代碼和同步非阻塞的代碼差很少,可是多了一個Selector對象,在selector對象上註冊了一個ServerSocketChannel的accept事件和SocketChannel的read事件,經過這種事件回調的方式可使一個線程來處理不少個IO操做,這個特性是依賴於Selector.select()方法,然而這個方法的底層是調用OS的select/poll/epoll函數 的這種模式雖然能夠解決阻塞的問題,可是卻多了一個問題:進行一次IO操做要發起兩次系統調用:第1次是select調用,第2次是read調用。也就是說多路複用IO不必定比BIO性能高,由於自己多路複用也會存在阻塞問題,但BIO存在的根本問題是沒法支持高併發,而在多路複用IO中能夠解決這個問題,換言之,若是個人系統不是高併發的系統的話直接使用BIO還好點,由於只涉及一次系統調用。若是要支持高併發那就可使用多路複用IO模型。
發生一次系統調用後,會有一個新的線程經過事件回調的方式將數據回傳進來,注意這裏與多路複用IO不一樣的是:多路複用IO是事件監聽,AIO是事件回調;事件監聽是說我關心的事件被觸發以後,我本身去處理,而事件回調指的是我關心的事件被觸發以後,會有一個新的線程經過回調方法將數據傳給我,不須要我本身再去拿。
在java的nio包中也提供了對AIO的支持:
public static void main(String[] args) throws Exception {
final AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000));
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
try {
System.out.println("2--"+Thread.currentThread().getName());
// 再此接收客戶端鏈接,若是不寫這行代碼後面的客戶端鏈接連不上服務端
serverChannel.accept(attachment, this);
System.out.println(socketChannel.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
System.out.println("3--"+Thread.currentThread().getName());
buffer.flip();
System.out.println(new String(buffer.array(), 0, result));
socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
exc.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
System.out.println("1--"+Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE);
}
複製代碼
AIO這種編程纔是真正意義上的異步,可是在linux系統中仍是使用epoll的方式去作的,因此這種方式就不多用了。
關於怎麼編譯openjdk能夠參考這篇文章blog.csdn.net/qq_35559877…
下面經過openjdk的源碼分析一下nio底層是怎麼實現多路複用的,其中很關鍵的代碼是以下3步:
1.Selector selector = Selector.open();
2.serverSocket.register(selector, SelectionKey.OP_ACCEPT);
3.selector.select();
複製代碼
咱們先看一下Selector.open()
作了哪些事情,點進去源碼。發現是調用DefaultSelectorProvider.create()方法,而這個類在window和linux各實現了一個版本,咱們找到openjdk的源碼到linux實現的版本中找到這個類
最終會調到EPollSelectorProvider.openSeletor()方法建立EPollSelectorImpl對象 在EPollSelectorImpl內部維護了一個EPollArrayWrapper對象,在建立EPollArrayWrapper的時候調用了epollCreate()方法,這個方法是native的,咱們找到jvm的底層實現:EPollArrayWrapper.Java_sun_nio_ch_EPollArrayWrapper_epollCreate
發現這裏是調用了epoll_create系統函數,那麼epoll_create()是幹嗎的呢,epoll_create是Linux OS的系統函數,建立一個epoll對象去實現操做系統層面的多路複用機制:
[root@izbp1hvx6s6h8yr3sgj333z ~]# man epoll_create
EPOLL_CREATE(2) Linux Programmer's Manual EPOLL_CREATE(2)
NAME
epoll_create, epoll_create1 - open an epoll file descriptor
SYNOPSIS
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_create1(int flags);
DESCRIPTION
epoll_create() creates an epoll(7) instance. Since Linux 2.6.8, the size argument is ignored, but must be greater than zero; see NOTES below.
epoll_create() returns a file descriptor referring to the new epoll instance. This file descriptor is used for all the subsequent calls to the
epoll interface. When no longer required, the file descriptor returned by epoll_create() should be closed by using close(2). When all file
descriptors referring to an epoll instance have been closed, the kernel destroys the instance and releases the associated resources for reuse.
epoll_create1()
If flags is 0, then, other than the fact that the obsolete size argument is dropped, epoll_create1() is the same as epoll_create(). The follow‐
ing value can be included in flags to obtain different behavior:
EPOLL_CLOEXEC
Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor. See the description of the O_CLOEXEC flag in open(2) for reasons why
this may be useful.
RETURN VALUE
On success, these system calls return a nonnegative file descriptor. On error, -1 is returned, and errno is set to indicate the error.
複製代碼
看到這裏發現Selector.open()無非就是調用epoll_create()建立一個epoll對象;這個epoll對象對應的就是Java裏的Selector對象。接着往下看:serverSocket.register(selector, SelectionKey.OP_ACCEPT);
點進去,跳過套娃的代碼,到openjdk源碼中找到linux版本的實現:EPollSelectorImpl.implRegister() 第164行代碼是很關鍵的一步,上面在建立epoll對象的過程當中順帶建立了EPollArrayWrapper對象,在這裏會將epoll的channel對應的文件描述符放進去,也就是說每個須要註冊的channel都會被放到EPollArrayWrapper裏面。接着看
selector.select();
底層會調用到EPollArrayWrapper.poll()方法再調用到updateRegistrations()方法:
updateRegistrations()裏面又會調用一個native方法epollCtl() 使用man epoll_ctl命令查看其DESCRIPTION部分信息:
This system call performs control operations on the epoll(7) instance referred to by the file descriptor epfd. It requests that the operation
op be performed for the target file descriptor, fd.
複製代碼
翻譯一下:
該系統調用對文件描述符 epfd 引用的 epoll(7) 實例執行控制操做。它要求操做對目標文件描述符 fd 執行op。
也就是說epoll_ctl函數纔是真正的將channel與所關心的op綁定在一塊兒,緊接着是最核心的一步,在執行完updateRegistrations()後執行了epollWait本地方法
這個方法確定是調用了OS的epoll_wait函數。然而這個函數就是用來監聽epoll上所註冊的事件。返回值對應的就是Java的SelectionKey。
上面一通分析以後,作一個小小的總結:其實對於IO程序來講,jdk只是把操做系統作了一層封裝,並無本身去實現(想實現也實現不了啊,IO涉及到硬件接口,Java進程處在用戶態只能調操做系統),在調用系統函數的時候涉及到幾個函數:
epoll_create()
: 建立一個epoll對象;
epoll_ctl()
:將channel與op綁定在一塊兒;
epoll_wait()
:等待事件被觸發;
在Linux的多路複用實現中,除了epoll之外還有select和poll,這也是java nio包剛出來的時候使用的。在select的底層實現是用數組,當有事件發生的時候會將數組中的全部文件描述符都循環一遍,時間複雜度爲O(n);假設我如今一共有1w個鏈接,可是每次會觸發IO操做的只有10個,就會存在9990次無效的循環,而且因爲他是經過數組實現的,因此他支持的鏈接數是有限的。poll在select的基礎上稍稍作了一點改進,將實現方式改成鏈表,沒有鏈接上限,可是查詢方式仍是基於循環去作的也是O(n)。而epoll是使用哈希表,當有事件發生時經過水平觸發的方式對fd進行回調,時間複雜度爲O(1)。
含淚播種的人必定能含笑收穫。