Java 併發編程-NIO 簡明教程

問題來源

在傳統的架構中,對於客戶端的每一次請求,服務器都會建立一個新的線程或者利用線程池複用去處理用戶的一個請求,而後返回給用戶結果,這樣作在高併發的狀況下會存在很是嚴重的性能問題:對於用戶的每一次請求都建立一個新的線程是須要必定內存的,同時線程之間頻繁的上下文切換也是一個很大的開銷。html

p.s: 本文涉及的完整實例代碼均可以在個人GitHub上面下載。java

什麼是Selector

NIO的核心就是Selector,讀懂了Selector就理解了異步機制的實現原理,下面先來簡單的介紹一下什麼是Selector。如今對於客戶端的每一次請求到來時咱們再也不當即建立一個線程進行處理,相反以epool爲例子當一個事件準備就緒以後經過回調機制將描述符加入到阻塞隊列中,下面只須要經過遍歷阻塞隊列對相應的事件進行處理就好了,經過這種回調機制整個過程都不須要對於每個請求都去建立一個線程去單獨處理。上面的解釋仍是有些抽象,下面我會經過具體的代碼實例來解釋,在這以前咱們先來了解一下NIO中兩個基礎概念Buffer和Channel。數組

若是你們對於多路IO複用好比select/epool徹底陌生的話,建議先讀一下個人這篇Linux下的五種IO模型 :-)服務器

Buffer

以ByteBuffer爲例子,咱們能夠經過ByteBuffer.allocate(n)來分配n個字節的緩衝區,對於緩衝區有四個重要的屬性:網絡

  1. capacity,緩衝區的容量,也就是咱們上面指定的n。
  2. position,當前指針指向的位置。
  3. mark,前一個位置,這裏咱們下面再解釋。
  4. limit,最大能讀取或者寫入的位置。

如上圖所示,Buffer實際上也是分爲兩種,一種用於寫數據,一種用於讀取數據。架構

put

經過直接閱讀ByteBuffer源碼能夠清晰看出put方法是把一個byte變量x放到緩衝區中去,同時position加1:併發

1
2
3
4
5
6
7
8
9
public ByteBuffer put( byte x) {
     hb[ix(nextPutIndex())] = x;
     return this ;
}
final int nextPutIndex() {
     if (position >= limit)
         throw new BufferOverflowException();
     return position++;
}

get

get方法是從緩衝區中讀取一個字節,同時position加一:異步

1
2
3
4
5
6
7
8
public byte get() {
     return hb[ix(nextGetIndex())];
}
final int nextGetIndex() {
     if (position >= limit)
         throw new BufferUnderflowException();
     return position++;
}

flip

若是咱們想將buffer從寫數據的狀況變成讀數據的狀況,能夠直接使用flip方法:socket

1
2
3
4
5
6
public final Buffer flip() {
     limit = position;
     position = 0 ;
     mark = - 1 ;
     return this ;
}

mark和reset

mark是記住當前的位置用的,也就是保存position的值:高併發

1
2
3
4
public final Buffer mark() {
     mark = position;
     return this ;
}

若是咱們在對緩衝區讀寫以前就調用了mark方法,那麼之後當position位置變化以後,想回到以前的位置能夠調用reset會將mark的值從新賦給position:

1
2
3
4
5
6
7
public final Buffer reset() {
     int m = mark;
     if (m < 0 )
         throw new InvalidMarkException();
     position = m;
     return this ;
}

Channel

利用NIO,當咱們讀取數據的時候,會先從buffer加載到channel,而寫入數據的時候,會先入到channel而後經過channel轉移到buffer中去。channel給咱們提供了兩個方法:經過channel.read(buffer)能夠將channel中的數據寫入到buffer中,而經過channel.write(buffer)則能夠將buffer中的數據寫入到到channel中。

Channel的話分爲四種:

  1. FileChannel從文件中讀寫數據。
  2. DatagramChannel以UDP的形式從網絡中讀寫數據。
  3. SocketChannel以TCP的形式從網絡中讀寫數據。
  4. ServerSocketChannel容許你監聽TCP鏈接。

由於今天咱們的重點是Selector,因此來看一下SocketChannel的用法。在下面的代碼利用SocketChannel模擬了一個簡單的server-client程序。

WebServer的代碼以下,和傳統的sock程序並無太多的差別,只是咱們引入了buffer和channel的概念:

1
2
3
4
5
6
7
8
9
10
11
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind( new InetSocketAddress( "127.0.0.1" , 5000 ));
SocketChannel socketChannel = ssc.accept();
ByteBuffer readBuffer = ByteBuffer.allocate( 128 );
socketChannel.read(readBuffer);
readBuffer.flip();
while (readBuffer.hasRemaining()) {
     System.out.println(( char )readBuffer.get());
}
socketChannel.close();
ssc.close();

WebClient的代碼以下:

1
2
3
4
5
6
7
8
SocketChannel socketChannel = null ;
socketChannel = SocketChannel.open();
socketChannel.connect( new InetSocketAddress( "127.0.0.1" , 5000 ));
ByteBuffer writeBuffer = ByteBuffer.allocate( 128 );
writeBuffer.put( "hello world" .getBytes());
writeBuffer.flip();
socketChannel.write(writeBuffer);
socketChannel.close();

Scatter / Gather

在上面的client程序中,咱們也能夠同時將多個buffer中的數據放入到一個數組後而後統一放入到channel後傳遞給服務器:

1
2
3
4
5
6
7
8
ByteBuffer buffer1 = ByteBuffer.allocate( 128 );
ByteBuffer buffer2 = ByteBuffer.allocate( 16 );
buffer1.put( "hello " .getBytes());
buffer2.put( "world" .getBytes());
buffer1.flip();
buffer2.flip();
ByteBuffer[] bufferArray = {buffer1, buffer2};
socketChannel.write(bufferArray);

Selector

經過使用selector,咱們能夠經過一個線程來同時管理多個channel,省去了建立線程以及線程之間進行上下文切換的開銷。

建立一個selector

經過調用selector類的靜態方法open咱們就能夠建立一個selector對象:

1
Selector selector = Selector.open();

註冊channel

爲了保證selector可以監聽多個channel,咱們須要將channel註冊到selector當中:

1
2
channel.configureBlocking( false );
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

咱們能夠監聽四種事件:

  1. SelectionKey.OP_CONNECT:當客戶端的嘗試鏈接到服務器
  2. SelectionKey.OP_ACCEPT:當服務器接受來自客戶端的請求
  3. SelectionKey.OP_READ:當服務器能夠從channel中讀取數據
  4. SelectionKey.OP_WRITE:當服務器能夠向channel中寫入數據

對SelectorKey調用channel方法能夠獲得key對應的channel:

1
Channel channel = key.channel();

而key自身感興趣的監聽事件也能夠經過interestOps來得到:

1
int interestSet = selectionKey.interestOps();

對selector調用selectedKeys()方法咱們能夠獲得註冊的全部key:

1
Set<SelectionKey> selectedKeys = selector.selectedKeys();

實戰

服務器的代碼以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind( new InetSocketAddress( "127.0.0.1" , 5000 ));
ssc.configureBlocking( false );
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer readBuff = ByteBuffer.allocate( 128 );
ByteBuffer writeBuff = ByteBuffer.allocate( 128 );
writeBuff.put( "received" .getBytes());
writeBuff.flip(); // make buffer ready for reading
while ( true ) {
     selector.select();
     Set<SelectionKey> keys = selector.selectedKeys();
     Iterator<SelectionKey> it = keys.iterator();
     while (it.hasNext()) {
         SelectionKey key = it.next();
         it.remove();
         if (key.isAcceptable()) {
             SocketChannel socketChannel = ssc.accept();
             socketChannel.configureBlocking( false );
             socketChannel.register(selector, SelectionKey.OP_READ);
         } else if (key.isReadable()) {
             SocketChannel socketChannel = (SocketChannel) key.channel();
             readBuff.clear(); // make buffer ready for writing
             socketChannel.read(readBuff);
             readBuff.flip(); // make buffer ready for reading
             System.out.println( new String(readBuff.array()));
             key.interestOps(SelectionKey.OP_WRITE);
         } else if (key.isWritable()) {
                 writeBuff.rewind(); // sets the position back to 0
                 SocketChannel socketChannel = (SocketChannel) key.channel();
                 socketChannel.write(writeBuff);
                 key.interestOps(SelectionKey.OP_READ);
         }
     }
}

客戶端程序的代碼以下,各位讀者能夠同時在終端下面多開幾個程序來同時模擬多個請求,而對於多個客戶端的程序咱們的服務器始終只用一個線程來處理多個請求。一個很常見的應用場景就是多個用戶同時往服務器上傳文件,對於每個上傳請求咱們不在單獨去建立一個線程去處理,同時利用Executor/Future咱們也能夠不用阻塞在IO操做中而是當即返回用戶結果。

1
2
3
4
5
6
7
8
9
10
11
12
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect( new InetSocketAddress( "127.0.0.1" , 5000 ));
ByteBuffer writeBuffer = ByteBuffer.allocate( 32 );
ByteBuffer readBuffer = ByteBuffer.allocate( 32 );
writeBuffer.put( "hello" .getBytes());
writeBuffer.flip(); // make buffer ready for reading
while ( true ) {
     writeBuffer.rewind(); // sets the position back to 0
     socketChannel.write(writeBuffer); // hello
     readBuffer.clear(); // make buffer ready for writing
     socketChannel.read(readBuffer); // recieved
}
相關文章
相關標籤/搜索