上一篇文章 JAVA NIO編程入門(一)咱們學習了NIO編程的基礎知識,並經過一個小demo實戰幫助瞭解NIO編程的channel,buffer等概念。本文會繼續學習JAVA NIO編程,並經過一個小示例來幫助理解相關知識,經過本文你將能夠學習到html
分散(Scatter)示意圖java
從通道填充buffer,必須填充完前一個buffer纔會填充後面的buffer,這也意味着不能動態調整每一個buffer的接受大小。編程
彙集(Gather)示意圖bash
彙集和分散是相反的形式,從buffer寫入數據到通道,只會寫入buffer的positon位置到limit位置的內容,也就是意味着能夠動態的寫入內容到通道中。網絡
什麼是選擇器框架
Selector(選擇器)是Java NIO中可以檢測多個NIO通道,並可以知道通道是否爲諸如讀寫事件作好準備的組件。這樣,一個單獨的線程能夠管理多個channel,從而管理多個網絡鏈接,提升效率。socket
爲何要用選擇器分佈式
使用了選擇器就能夠用一個線程管理多個channel,若是多個channel由多個線程管理,線程以前的切換是消耗資源的,而單個線程就避免了線程之間切換的消耗。post
選擇器經常使用方法學習
方法名 | 功能 |
---|---|
register(Selector sel, int ops) | 向選擇器註冊通道,而且能夠選擇註冊指定的事件,目前事件分爲4種;1.Connect,2.Accept,3.Read,4.Write,一個通道能夠註冊多個事件 |
select() | 阻塞到至少有一個通道在你註冊的事件上就緒了 |
selectNow() | 不會阻塞,無論什麼通道就緒都馬上返回 |
select(long timeout) | 和select()同樣,除了最長會阻塞timeout毫秒(參數) |
selectedKeys() | 一旦調用了select()方法,而且返回值代表有一個或更多個通道就緒了,而後能夠經過調用selector的selectedKeys()方法,訪問「已選擇鍵集(selected key set)」中的就緒通道 |
wakeUp() | 能夠使調用select()阻塞的對象返回,不阻塞。 |
close() | 用完Selector後調用其close()方法會關閉該Selector,且使註冊到該Selector上的全部SelectionKey實例無效。通道自己並不會關閉 |
實戰需求說明
編碼客戶端和服務端,服務端能夠接受客戶端的請求,並返回一個報文,客戶端接受報文並解析輸出。
服務端代碼
try {
//建立一個服socket並打開
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//監聽綁定8090端口
serverSocketChannel.socket().bind(new InetSocketAddress(8090));
//設置爲非阻塞模式
serverSocketChannel.configureBlocking(false);
while(true){
//獲取請求鏈接
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel!=null){
ByteBuffer buf1 = ByteBuffer.allocate(1024);
socketChannel.read(buf1);
buf1.flip();
if(buf1.hasRemaining())
System.out.println(">>>服務端收到數據:"+new String(buf1.array()));
buf1.clear();
//構造返回的報文,分爲頭部和主體,實際狀況能夠構造複雜的報文協議,這裏只演示,不作特殊設計。
ByteBuffer header = ByteBuffer.allocate(6);
header.put("[head]".getBytes());
ByteBuffer body = ByteBuffer.allocate(1024);
body.put("i am body!".getBytes());
header.flip();
body.flip();
ByteBuffer[] bufferArray = { header, body };
socketChannel.write(bufferArray);
socketChannel.close();
}else{
Thread.sleep(1000);
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
複製代碼
服務端selector(選擇器版本)
try {
//打開選擇器
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8090));
serverSocketChannel.configureBlocking(false);
//向通道註冊選擇器,而且註冊接受事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//獲取已經準備好的通道數量
int readyChannels = selector.selectNow();
//若是沒準備好,重試
if (readyChannels == 0) continue;
//獲取準備好的通道中的事件集合
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = (SelectionKey) keyIterator.next();
if (key.isAcceptable()) {
//在本身註冊的事件中寫業務邏輯,
//我這裏註冊的是accept事件,
//這部分邏輯和上面非選擇器服務端代碼同樣。
ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel1.accept();
ByteBuffer buf1 = ByteBuffer.allocate(1024);
socketChannel.read(buf1);
buf1.flip();
if (buf1.hasRemaining())
System.out.println(">>>服務端收到數據:" + new String(buf1.array()));
buf1.clear();
ByteBuffer header = ByteBuffer.allocate(6);
header.put("[head]".getBytes());
ByteBuffer body = ByteBuffer.allocate(1024);
body.put("i am body!".getBytes());
header.flip();
body.flip();
ByteBuffer[] bufferArray = {header, body};
socketChannel.write(bufferArray);
socketChannel.close();
} else if (key.isConnectable()) {
} else if (key.isReadable()) {
} else if (key.isWritable()) {
}
//注意每次迭代末尾的keyIterator.remove()調用。
//Selector不會本身從已選擇鍵集中移除SelectionKey實例。必須在處理完通道時本身移除。
//下次該通道變成就緒時,Selector會再次將其放入已選擇鍵集中
keyIterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
複製代碼
客戶端代碼
try {
//打開socket鏈接,鏈接本地8090端口,也就是服務端
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8090));
//請求服務端,發送請求
ByteBuffer buf1 = ByteBuffer.allocate(1024);
buf1.put("來着客戶端的請求".getBytes());
buf1.flip();
if (buf1.hasRemaining())
socketChannel.write(buf1);
buf1.clear();
//接受服務端的返回,構造接受緩衝區,咱們定義頭6個字節爲頭部,後續其餘字節爲主體內容。
ByteBuffer header = ByteBuffer.allocate(6);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
socketChannel.read(bufferArray);
header.flip();
body.flip();
if (header.hasRemaining())
System.out.println(">>>客戶端接收頭部數據:" + new String(header.array()));
if (body.hasRemaining())
System.out.println(">>>客戶端接收body數據:" + new String(body.array()));
header.clear();
body.clear();
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
複製代碼
運行結果
服務端:
客戶端:
這裏給出了服務端代碼的兩個版本,一個是非選擇器的版本,一個是選擇器的版本。查看最後運行結果,發現客戶端根據雙方約定的協議格式,正確解析到了頭部和body的內容,其實這也是彙集和分散最主要的做用和應用場景,在網絡交互中,進行協議報文格式的定義和實現。後續學完NIO編程入門後咱們最後進行總結性的實戰,編寫一個RPC的demo框架,實現分佈式系統的遠程調用,有興趣的同窗能夠關注筆者和後續的文章。
《JAVA NIO》