從零講解搭建一個NIO消息服務端

本文首發於本博客,如需轉載,請申明出處.java

假設

假設你已經瞭解並實現過了一些OIO消息服務端,並對異步消息服務端更有興趣,那麼本文或許能帶你更好的入門,並瞭解JDK部分源碼的關係流程,正如題目所說,筆者將竟可能還原,以初學者能理解的角度,講訴並構建一個NIO消息服務端。git

啓動通道並註冊選擇器

啓動模式

感謝Java一直在持續更新,對應的各個API也作得愈來愈好了,咱們本次生成 服務端套接字通道 也是使用到JDK提供的一個方式 open ,咱們將啓動一個 ServerSocketChannel ,他是一個 支持同步異步模式 的 服務端套接字通道 。github

它是一個抽象類,官方給了推薦的方式 open 來開啓一個咱們須要的 服務端套接字通道實例 。(以下的官方源碼相關注釋)服務器

/** * A selectable channel for stream-oriented listening sockets. */ public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel { /** * Opens a server-socket channel. */ public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); } } 

那麼好了,咱們如今能夠肯定咱們第一步的代碼是什麼樣子的了!沒錯,和你想象中的同樣,這很簡單。網絡

public class NioServer { public void server(int port) throws IOException{ //一、打開服務器套接字通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); } } 

本節的重點是 啓動模式 ,那麼這意味着,咱們須要向 ServerSocketChannel 進行標識,那麼它是否提供了對用的方法設置 同步異步(阻塞非阻塞) 呢?框架

這很明顯,它是提供的,這也是它的核心功能之一,其實應該是它繼承的 父抽象類AbstractSelectableChannel 的實現方法: configgureBlocking(Boolean),這個方法將標識咱們的 服務端套接字通道 是否阻塞模式。(以下的官方源碼相關注釋)異步

/** * Base implementation class for selectable channels. */ public abstract class AbstractSelectableChannel extends SelectableChannel { /** * Adjusts this channel's blocking mode. */ public final SelectableChannel configureBlocking(boolean block) throws IOException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if (blocking == block) return this; if (block && haveValidKeys()) throw new IllegalBlockingModeException(); implConfigureBlocking(block); blocking = block; } return this; } } 

那麼,咱們如今能夠進行 啓動模式的配置 了,讀者很聰明。咱們的項目Demo能夠這樣寫: false爲非阻塞模式、true爲阻塞模式 。socket

public class NioServer { public void server(int port) throws IOException{ //一、打開服務器套接字通道 ServerSocketChannel serverSocketzhannel = ServerSocketChannel.open(); //二、設定爲非阻塞、調整此通道的阻塞模式。 serverSocketChannel.configureBlocking(false); } } 

若未配置阻塞模式,註冊選擇器 會報 java.nio.channels.IllegalBlockingModeException 異常,相關將於該小節大體講解說明。ide

套接字地址端口綁定

作過消息通信服務器的朋友應該都清楚,咱們須要向服務端 指定IP與端口 ,即便是NIO服務器也是同樣的,不然,咱們的客戶端會報 java.net.ConnectException: Connection refused: connect 異常學習

對於NIO的地址端口綁定,咱們也須要用到 ServerSocket服務器套接字 。咱們知道在寫OIO服務端的時候,咱們可能僅僅須要寫一句便可,以下。

//將服務器綁定到指定端口 final ServerSocket socket = new ServerSocket(port); 

固然,JDK在實現NIO的時候就已經想到了,一樣,咱們可使用 服務器套接字通道 來獲取一個 ServerSocket服務器套接字 。這時的它並無綁定端口,咱們須要對應綁定地址,這個類自身就有一個 bind 方法。(以下源碼相關注釋)

/** * This class implements server sockets. A server socket waits for * requests to come in over the network. It performs some operation * based on that request, and then possibly returns a result to the requester. */ public class ServerSocket implements java.io.Closeable { /** * * Binds the {@code ServerSocket} to a specific address * (IP address and port number). */ public void bind(SocketAddress endpoint) throws IOException { bind(endpoint, 50); } } 

經過源碼,咱們知道,綁定iP與端口 須要一個SocketAddress類,咱們僅須要將 IP與端口配置到對應的SocketAddress類 中便可。其實JDK中,已經有了一個更加方便且繼承了SocketAddress的類:InetSocketAddress

InetSocketAddress有一個須要一個port爲參數的構造方法,它將建立 一個ip爲通配符、端口爲指定值的套接字地址 。這很方便咱們的開發,對吧?(以下源碼相關注釋)

/** * * This class implements an IP Socket Address (IP address + port number) * It can also be a pair (hostname + port number), in which case an attempt * will be made to resolve the hostname. If resolution fails then the address * is said to be <I>unresolved</I> but can still be used on some circumstances * like connecting through a proxy. */ public class InetSocketAddress extends SocketAddress { /** * Creates a socket address where the IP address is the wildcard address * and the port number a specified value. */ public InetSocketAddress(int port) { this(InetAddress.anyLocalAddress(), port); } } 

好了,那麼接下來咱們的項目代碼能夠繼續添加綁定IP與端口了,我想聰明的你應該有所感受了。

public class NioServer { public void server(int port) throws IOException{ //一、打開服務器套接字通道 ServerSocketChannel serverSocketzhannel = ServerSocketChannel.open(); //二、設定爲非阻塞、調整此通道的阻塞模式。 serverSocketChannel.configureBlocking(false); //三、檢索與此通道關聯的服務器套接字。 ServerSocket serverSocket = serverSocketChannel.socket(); //四、此類實現 ip 套接字地址 (ip 地址 + 端口號)  InetSocketAddress address = new InetSocketAddress(port); //五、將服務器綁定到選定的套接字地址 serverSocket.bind(address); } } 

正如開頭咱們所說的,你的項目中不添加3-5環節的代碼並無問題,可是當客戶端接入時,則會報錯,由於客戶端將要 接入的地址是鏈接不到的 ,如會報這樣的錯誤。

java.net.ConnectException: Connection refused: connect at sun.nio.ch.Net.connect0(Native Method) at sun.nio.ch.Net.connect(Net.java:457) at sun.nio.ch.Net.connect(Net.java:449) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647) at com.github.myself.WebClient.main(WebClient.java:16) 

註冊選擇器

接下來會是 NIO實現的重點 ,可能有點難理解,若是但願你們能一次理解,徹底深刻有點難講明白,不過先大體點一下。

首先要先介紹如下JDK實現NIO的核心:多路複用器(Selector)——選擇器

先簡單並抽象的理解下,Java經過 選擇器來實現處理多個Channel連接 ,將空閒未進行數據操做的擱置,優先執行有需求的數據傳輸,即 經過一個選擇器來選擇誰須要誰不須要使用共享的線程 。

由此,理所固然,這樣的選擇器應該也有Java本身定義的獲取方法, 其自身的 open 就是啓動一個這樣的選擇器。(以下源碼相關注釋)

/** * A multiplexor of {@link SelectableChannel} objects. */ public abstract class Selector implements Closeable { /** * Opens a selector. */ public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); } } 

那麼如今,咱們還要考慮一件事情,咱們的 服務器套接字通道 要如何與 選擇器 相關聯呢?

ServerSocketChannel 有一個註冊的方法,這個方法就是將它們兩個進行了關聯,同時這個註冊方法 除了關聯選擇器外,還標識了註冊的狀態 ,讓咱們先看看源碼吧。

如下的 ServerSocketChannel 繼承 —》 AbstractSelectableChannel 繼承 —》 SelectableChannel

/** * A channel that can be multiplexed via a {@link Selector}. */ public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel { /** * Registers this channel with the given selector, returning a selection * key. */ public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException { return register(sel, ops, null); } } 

咱們通常須要將選擇器註冊上去,並將 ServerSocketChannel 標識爲 接受鏈接 的狀態。咱們先看看咱們的項目代碼應該如何寫。

public class NioServer { public void server(int port) throws IOException{ //一、打開服務器套接字通道 ServerSocketChannel serverSocketzhannel = ServerSocketChannel.open(); //二、設定爲非阻塞、調整此通道的阻塞模式。 serverSocketChannel.configureBlocking(false); //三、檢索與此通道關聯的服務器套接字。 ServerSocket serverSocket = serverSocketChannel.socket(); //四、此類實現 ip 套接字地址 (ip 地址 + 端口號)  InetSocketAddress address = new InetSocketAddress(port); //五、將服務器綁定到選定的套接字地址 serverSocket.bind(address); //六、打開Selector來處理Channel Selector selector = Selector.open(); //七、將ServerSocket註冊到Selector已接受鏈接,註冊會判斷是否爲非阻塞模式 SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); ByteBuffer readBuff = ByteBuffer.allocate(1024); final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); while(true){ //下方代碼..... } } } 

注意: 咱們前面說到,若是 ServerSocketChannel 沒有啓動非阻塞模式,那麼咱們在啓動的時候會報 java.lang.IllegalArgumentException 異常,這是爲何呢? 我想咱們可能須要更深刻底層去看看 register 這個方法(以下源碼註釋)

/** * Base implementation class for selectable channels. */ public abstract class AbstractSelectableChannel extends SelectableChannel { /** * Registers this channel with the given selector, returning a selection key. */ public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if ((ops & ~validOps()) != 0) throw new IllegalArgumentException(); if (blocking) throw new IllegalBlockingModeException(); SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } } } 

我想咱們終於真相大白了,原來註冊這個方法會對 ServerSocketChannel 的一系列參數進行 校驗,只有經過,才能註冊成功,因此咱們也明白了,爲何 非阻塞是false,同時咱們也能夠看到,它還對咱們所給的標識作了校驗,一點要優先註冊 接受鏈接(OP_ACCEPT) 這個狀態才行,否則依舊會報 java.lang.IllegalArgumentException 異常。

這裏解釋一下,之因此只接受 OP_ACCEPT ,是由於若是沒有一個接受其餘連接的主服務,那麼通訊根本無從提及,同時這樣的標識在咱們的NIO服務端中 只容許標識一次(一個ServerSocketChannel) 。

可能你們還會好奇有什麼標識,我想源碼的說明確實寫的很清楚了。

/** * Operation-set bit for read operations. */ public static final int OP_READ = 1 << 0; /** * Operation-set bit for write operations. */ public static final int OP_WRITE = 1 << 2; /** * Operation-set bit for socket-connect operations. */ public static final int OP_CONNECT = 1 << 3; /** * Operation-set bit for socket-accept operations. */ public static final int OP_ACCEPT = 1 << 4; 

好了,這裏給一個調試截圖,但願你們也能夠慢慢的摸索一下。

Image Text

注意這裏的服務端並無構建完成哦,咱們還須要下面的幾個步驟。

NIO選擇實例與興趣點

客戶端代碼

說到這裏,咱們暫時先休息下,轉頭看看 客戶端的代碼 吧,這裏就簡單的介紹下,咱們將創建 一個針對服務地址端口的鏈接 ,而後不停的循環 寫操做與讀操做 ,沒有對客戶端進行 關閉操做

你們若是有興趣的話,也能夠本身調試,並看看部分類的JDK源碼,以下給出本項目案例的客戶端代碼。

public class WebClient { public static void main(String[] args) throws IOException { try { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("0.0.0.0",8090)); ByteBuffer writeBuffer = ByteBuffer.allocate(32); ByteBuffer readBuffer = ByteBuffer.allocate(32); writeBuffer.put("hello".getBytes()); writeBuffer.flip(); while (true){ writeBuffer.rewind(); socketChannel.write(writeBuffer); readBuffer.clear(); socketChannel.read(readBuffer); readBuffer.flip(); System.out.println(new String(readBuffer.array())); } }catch (IOException e){ e.printStackTrace(); } } } 

準備IO接入操做

這裏有點複雜,我也儘量的思考了表達的方式,首先咱們先明確一下,全部的鏈接都會被Selector所囊括,即咱們要獲取新接入的鏈接,也要經過Selector來獲取,咱們一開始啓動的 服務器套接字通道ServerSocketChannel 起到一個接入\入口(或許不夠準確)的做用,客戶端鏈接經過IP與端口進入後,會 被註冊的Selector所獲取 到,成爲 Selector 其中的一員。

可是這裏的一員 並不會包括一開始註冊並被標誌爲接收鏈接 的 ServerSocketChannel 。

Selector有這樣一個方法,它會自動去等待新的鏈接事件,若是沒有鏈接接入,那麼它將一直處於阻塞狀態。經過字面意思咱們能夠大體這樣寫代碼。

while(true){ try{ //一、等到須要處理的新事件:阻塞將一直持續到下一個傳入事件 selector.select(); }catch(IOException e){ e.printStackTrace(); break; } } 

那麼這樣寫好像有點像樣,畢竟異常咱們也捕獲了,同時也使用了剛剛 開啓並註冊完畢的選擇器Selector

讓咱們看看源碼中對於這個方法 select 的註釋吧。

/** * A multiplexor of {@link SelectableChannel} objects. */ public abstract class Selector implements Closeable { /** * Selects a set of keys whose corresponding channels are ready for I/O * operations. */ public abstract int select() throws IOException; } 

好的,看樣子是對的,它將返回一組套接字通道已經準備好執行I/O操做的鍵。那麼這個Key到底是什麼呢?

這裏可能直觀的感覺下會更好。以下圖是我調試下看到的key對象,我想你們應該能夠理解了,這個Key中也會 存放對應鏈接的Channel與Selector 。

Image Text

具體的內部更深層的就探討了。那麼這也解決了咱們接下來的 一個疑問 ,咱們要怎麼向Selector拿鏈接進來的實例呢?

答案很明顯,咱們僅須要 獲取到這個Keys 就行了。

選擇鍵集合操做

對於獲取Keys這個如今應該已經不是什麼問題了,經過上面章節的瞭解,我想你們也能夠想到這樣的大體語法。

//獲取全部接收事件的SelectionKey實例 Set<SelectionKey> readykeys = selector.selectedKeys(); 

你們或許會好奇,這裏的Key對象竟然是前面的 SelectionKey.OP_ACCEPT 對象,是的,這也是接下來要講的,這很奇妙,也很好玩。

前面說到的標識,這是每個Key自有的,而且是能夠 改變的狀態 ,在剛剛鏈接的時候,或許我應該大體的描述一下 一個新鏈接進入選擇器後的流程 :select方法將接受到新接入的鏈接事件,它會被Selector以Key的形式存儲,這時咱們須要 對其進行判斷 ,是不是已經就緒能夠被接受的鏈接,若是是,這時咱們須要 獲取這個鏈接 ,同時也將其設定爲 非阻塞的狀態 ,並將它 註冊到選擇器上(固然,這時的標識就不能是一開始的 OP_ACCEPT ),你能夠選擇性的 註冊它的標識 ,以後咱們能夠經過循環遍歷Keys來,讓 某一標識的鏈接去執行對應的操做 。

說到這裏,我想部分新手可能會有點模糊,我想我仍是把接下來的代碼都一塊兒放出來吧,你們先看看是否可以再次結合文本進行了解。

while (true){ try { //等到須要處理的新事件:阻塞將一直持續到下一個傳入事件 selector.select(); }catch (IOException e){ e.printStackTrace(); break; } //獲取全部接收事件的SelectionKey實例 Set<SelectionKey> readykeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readykeys.iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); try { //檢查事件是不是一個新的已經就緒能夠被接受的鏈接 if (key.isAcceptable()){ //channel:返回爲其建立此鍵的通道。 即便在取消密鑰後, 此方法仍將繼續返回通道。 ServerSocketChannel server = (ServerSocketChannel)key.channel(); //可選擇的通道, 用於面向流的鏈接插槽。 SocketChannel client = server.accept(); //設定爲非阻塞 client.configureBlocking(false); //接受客戶端,並將它註冊到選擇器,並添加附件 client.register(selector,SelectionKey.OP_WRITE | SelectionKey.OP_READ,msg.duplicate()); System.out.println("Accepted connection from " + client); } //檢查套接字是否已經準備好讀數據 if (key.isReadable()){ SocketChannel client = (SocketChannel)key.channel(); readBuff.clear(); client.read(readBuff); readBuff.flip(); System.out.println("received:"+new String(readBuff.array())); //將此鍵的興趣集設置爲給定的值。 OP_WRITE key.interestOps(SelectionKey.OP_WRITE); } //檢查套接字是否已經準備好寫數據 if (key.isWritable()){ SocketChannel client = (SocketChannel)key.channel(); //attachment : 檢索當前附件 ByteBuffer buffer = (ByteBuffer)key.attachment(); buffer.rewind(); client.write(buffer); //將此鍵的興趣集設置爲給定的值。 OP_READ key.interestOps(SelectionKey.OP_READ); } }catch (IOException e){ e.printStackTrace(); } } } 

提示:讀到此處,還請各位讀者能運行整個demo,並調試下,看看與本身理解的是否有差異。

流程效果

如下我簡單敘述一下,我在調試時的理解與效果。

  • 一、啓動服務端後,運行到 selector.select(); 後阻塞,由於沒有監聽到新的鏈接。

  • 二、啓動客戶端後,selector.select() 監聽到新鏈接,往下執行獲取到的Keys的size爲1,進入Key標識分支判斷

  • 三、key.isAcceptable() 首次接入爲true,設置爲非阻塞,並註釋到選擇器中修改標識爲 SelectionKey.OP_WRITE | SelectionKey.OP_READ ,同時添加附件信息 msg.duplicate() ,首次循環結束

  • 四、二次循環,鏈接未關閉,獲取到的Keys的size爲1,進入Key標識分支判斷。

  • 五、因爲第一次該Key標識改變,因此此次 key.isAcceptable() 爲false,而因爲改了標識,因此接下來的 key.isReadable() 、 key.isWritable() 都爲true,執行讀寫操做,循環結束。

  • 六、接下來的循環,基本上是key.isReadable() 、 key.isWritable() 都爲true,執行讀寫操做。

  • 七、設想一下,若是多加一條連接是什麼效果。

回顧

這裏給出幾個代碼的注意點,但願你們能夠本身去了解學習。

  • 一、關於 ByteBuffer 本文並不重點講解,你們能夠自行了解
  • 二、關於Key標識判斷的代碼,如下兩句的刪減是否會對代碼有所影響呢?
key.interestOps(SelectionKey.OP_WRITE); key.interestOps(SelectionKey.OP_READ); 
  • 三、若是刪除了2中的代碼,並把客戶端註冊選擇器並給標識的代碼改成如下,那麼項目運行效果怎麼樣呢?
client.register(selector, SelectionKey.OP_READ,msg.duplicate()); 
  • 四、若是改了3的代碼,但是不刪除2的代碼,那麼效果又是怎麼樣呢?

答案留給讀者去揭曉吧,若是你有答案,歡迎留言。

我的相關項目

Image text

InChat : 一個輕量級、高效率的支持多端(應用與硬件Iot)的異步網絡應用通信框架

相關文章
相關標籤/搜索