JAVA NIO學習三:NIO 的非阻塞式網絡通訊

緊接着上一章,咱們繼續來研究NIO,上一章中咱們講了NIO 中最多見的操做即文件通道的操做,但實際上NIO的主要用途仍是在於網絡通訊,那麼這個時候就會涉及到選擇器,這一章咱們就會對其進行講解操做。java

1、阻塞和非阻塞服務器

傳統的 IO 流都是阻塞式的。也就是說,當一個線程調用 read() 或 write()時,該線程被阻塞,直到有一些數據被讀取或寫入,該線程在此期間不能執行其餘任務。所以,在完成網絡通訊進行 IO 操做時,因爲線程會阻塞,因此服務器端必須爲每一個客戶端都提供一個獨立的線程進行處理,當服務器端須要處理大量客戶端時,性能急劇降低。
Java NIO 是非阻塞模式的。當線程從某通道進行讀寫數據時,若沒有數據可用時,該線程能夠進行其餘任務。線程一般將非阻塞 IO 的空閒時間用於在其餘通道上執行 IO 操做,因此單獨的線程能夠管理多個輸入和輸出通道。所以, NIO 可讓服務器端使用一個或有限幾個線程來同時處理鏈接到服務器端的全部客戶端。網絡

下面我看個例子來使用NIO 演示下阻塞式,即不採用選擇器狀況下:併發

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.junit.Test;

/*
 * 1、使用 NIO 完成網絡通訊的三個核心:
 * 
 * 1. 通道(Channel):負責鏈接
 *         
 *        java.nio.channels.Channel 接口:
 *             |--SelectableChannel
 *                 |--SocketChannel
 *                 |--ServerSocketChannel
 *                 |--DatagramChannel
 * 
 *                 |--Pipe.SinkChannel
 *                 |--Pipe.SourceChannel
 * 
 * 2. 緩衝區(Buffer):負責數據的存取
 * 
 * 3. 選擇器(Selector):是 SelectableChannel 的多路複用器。用於監控 SelectableChannel 的 IO 情況
 * 
 */
public class TestBlockingNIO {

    //客戶端
    @Test
    public void client() throws IOException{
        //1. 獲取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
        
        FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
        
        //2. 分配指定大小的緩衝區
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        //3. 讀取本地文件,併發送到服務端
        while(inChannel.read(buf) != -1){
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }
        
        //4. 關閉通道
        inChannel.close();
        sChannel.close();
    }
    
    //服務端
    @Test
    public void server() throws IOException{
        //1. 獲取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        
        FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
        
        //2. 綁定鏈接
        ssChannel.bind(new InetSocketAddress(9898));
        
        //3. 獲取客戶端鏈接的通道
        SocketChannel sChannel = ssChannel.accept();
        
        //4. 分配指定大小的緩衝區
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        //5. 接收客戶端的數據,並保存到本地
        while(sChannel.read(buf) != -1){
            buf.flip();
            outChannel.write(buf);
            buf.clear();
        }
        
        //6. 關閉通道
        sChannel.close();
        outChannel.close();
        ssChannel.close();
        
    }
    
}

那麼解決上面的方法,之前沒有選擇器的時候,對於阻塞狀況,咱們能夠採用下面的方法:(發送完成,自動本身關閉告知已發送完成)性能

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.junit.Test;

public class TestBlockingNIO2 {
    
    //客戶端
    @Test
    public void client() throws IOException{
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
        
        FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
        
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        while(inChannel.read(buf) != -1){
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }
        
        sChannel.shutdownOutput(); //接收服務端的反饋
        int len = 0;
        while((len = sChannel.read(buf)) != -1){
            buf.flip();
            System.out.println(new String(buf.array(), 0, len));
            buf.clear();
        }
        
        inChannel.close();
        sChannel.close();
    }
    
    //服務端
    @Test
    public void server() throws IOException{
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        
        FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
        
        ssChannel.bind(new InetSocketAddress(9898));
        
        SocketChannel sChannel = ssChannel.accept();
        
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        while(sChannel.read(buf) != -1){
            buf.flip();
            outChannel.write(buf);
            buf.clear();
        }
        
        //發送反饋給客戶端
        buf.put("服務端接收數據成功".getBytes());
        buf.flip();
        sChannel.write(buf);
        
        sChannel.close();
        outChannel.close();
        ssChannel.close();
    }

}

2、選擇器(Selector)ui

選擇器(Selector) 是 SelectableChannle 對象的多路複用器, Selector 能夠同時監控多個 SelectableChannel 的 IO 情況,也就是說,利用 Selector可以使一個單獨的線程管理多個 Channel。 Selector 是非阻塞 IO 的核心。
SelectableChannle 的結構以下圖:spa

 選擇器(Selector)的應用
.net

建立 Selector :經過調用 Selector.open() 方法建立一個 Selector
線程

向選擇器註冊通道: SelectableChannel.register(Selector sel, int ops)
3d

選擇器(Selector)的應用

當調用 register(Selector sel, int ops) 將通道註冊選擇器時,選擇器對通道的監聽事件,須要經過第二個參數 ops 指定。
能夠監聽的事件類型(可以使用 SelectionKey 的四個常量表示):
 讀 : SelectionKey.OP_READ (1)
 寫 : SelectionKey.OP_WRITE (4)
 鏈接 : SelectionKey.OP_CONNECT (8)
 接收 : SelectionKey.OP_ACCEPT (16)
若註冊時不止監聽一個事件,則能夠使用「位或」操做符鏈接

例: 

SelectionKey 

SelectionKey: 表示 SelectableChannel 和 Selector 之間的註冊關係。每次向選擇器註冊通道時就會選擇一個事件(選擇鍵)。 選擇鍵包含兩個表示爲整數值的操做集。操做集的每一位都表示該鍵的通道所支持的一類可選擇操做 。

方 法 描 述
int interestOps() 獲取感興趣事件集合
int readyOps() 獲取通道已經準備就緒的操做的集合
SelectableChannel channel() 獲取註冊通道
Selector selector() 返回選擇器
boolean isReadable() 檢測 Channal 中讀事件是否就緒
boolean isWritable() 檢測 Channal 中寫事件是否就緒
boolean isConnectable() 檢測 Channel 中鏈接是否就緒
boolean isAcceptable() 檢測 Channel 中接收是否就緒

Selector 的經常使用方法

方 法 描 述
Set<SelectionKey> keys() 全部的 SelectionKey 集合。表明註冊在該Selector上的Channel
selectedKeys() 被選擇的 SelectionKey 集合。返回此Selector的已選擇鍵集
int select() 監控全部註冊的Channel,當它們中間有須要處理的 IO 操做時,
該方法返回,並將對應得的 SelectionKey 加入被選擇的
SelectionKey 集合中,該方法返回這些 Channel 的數量。
int select(long timeout) 能夠設置超時時長的 select() 操做
int selectNow() 執行一個當即返回的 select() 操做,該方法不會阻塞線程
Selector wakeup() 使一個還未返回的 select() 方法當即返回
void close() 關閉該選擇器

SocketChannel 

Java NIO中的SocketChannel是一個鏈接到TCP網絡套接字的通道。
操做步驟:
 打開 SocketChannel
 讀寫數據
 關閉 SocketChannel

Java NIO中的 ServerSocketChannel 是一個能夠監聽新進來的TCP鏈接的通道,就像標準IO中的ServerSocket同樣
代碼樣例;

package com.atguigu.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

import org.junit.Test;

/*
 * 1、使用 NIO 完成網絡通訊的三個核心:
 * 
 * 1. 通道(Channel):負責鏈接
 *         
 *        java.nio.channels.Channel 接口:
 *             |--SelectableChannel
 *                 |--SocketChannel
 *                 |--ServerSocketChannel
 *                 |--DatagramChannel
 * 
 *                 |--Pipe.SinkChannel
 *                 |--Pipe.SourceChannel
 * 
 * 2. 緩衝區(Buffer):負責數據的存取
 * 
 * 3. 選擇器(Selector):是 SelectableChannel 的多路複用器。用於監控 SelectableChannel 的 IO 情況
 * 
 */
public class TestNonBlockingNIO {
    
    //客戶端
    @Test
    public void client() throws IOException{
        //1. 獲取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
        
        //2. 切換非阻塞模式
        sChannel.configureBlocking(false);
        
        //3. 分配指定大小的緩衝區
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        //4. 發送數據給服務端
        Scanner scan = new Scanner(System.in);
        
        while(scan.hasNext()){
            String str = scan.next();
            buf.put((new Date().toString() + "\n" + str).getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }
        
        //5. 關閉通道
        sChannel.close();
    }

    //服務端
    @Test
    public void server() throws IOException{
        //1. 獲取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        
        //2. 切換非阻塞模式
        ssChannel.configureBlocking(false);
        
        //3. 綁定鏈接
        ssChannel.bind(new InetSocketAddress(9898));
        
        //4. 獲取選擇器
        Selector selector = Selector.open();
        
        //5. 將通道註冊到選擇器上, 而且指定「監聽接收事件」
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        //6. 輪詢式的獲取選擇器上已經「準備就緒」的事件
        while(selector.select() > 0){
            
            //7. 獲取當前選擇器中全部註冊的「選擇鍵(已就緒的監聽事件)」
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            
            while(it.hasNext()){
                //8. 獲取準備「就緒」的是事件
                SelectionKey sk = it.next();
                
                //9. 判斷具體是什麼事件準備就緒
                if(sk.isAcceptable()){
                    //10. 若「接收就緒」,獲取客戶端鏈接
                    SocketChannel sChannel = ssChannel.accept();
                    
                    //11. 切換非阻塞模式
                    sChannel.configureBlocking(false);
                    
                    //12. 將該通道註冊到選擇器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                }else if(sk.isReadable()){
                    //13. 獲取當前選擇器上「讀就緒」狀態的通道
                    SocketChannel sChannel = (SocketChannel) sk.channel();
                    
                    //14. 讀取數據
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    
                    int len = 0;
                    while((len = sChannel.read(buf)) > 0 ){
                        buf.flip();
                        System.out.println(new String(buf.array(), 0, len));
                        buf.clear();
                    }
                }
                
                //15. 取消選擇鍵 SelectionKey
                it.remove();
            }
        }
    }
}

DatagramChannel

Java NIO中的DatagramChannel是一個能收發UDP包的通道。
 操做步驟:
 打開 DatagramChannel
 接收/發送數據

樣例;

package com.atguigu.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

import org.junit.Test;

public class TestNonBlockingNIO2 {
    
    @Test
    public void send() throws IOException{
        DatagramChannel dc = DatagramChannel.open();
        
        dc.configureBlocking(false);
        
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        Scanner scan = new Scanner(System.in);
        
        while(scan.hasNext()){
            String str = scan.next();
            buf.put((new Date().toString() + ":\n" + str).getBytes());
            buf.flip();
            dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
            buf.clear();
        }
        
        dc.close();
    }
    
    @Test
    public void receive() throws IOException{
        DatagramChannel dc = DatagramChannel.open();
        
        dc.configureBlocking(false);
        
        dc.bind(new InetSocketAddress(9898));
        
        Selector selector = Selector.open();
        
        dc.register(selector, SelectionKey.OP_READ);
        
        while(selector.select() > 0){
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            
            while(it.hasNext()){
                SelectionKey sk = it.next();
                
                if(sk.isReadable()){
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    
                    dc.receive(buf);
                    buf.flip();
                    System.out.println(new String(buf.array(), 0, buf.limit()));
                    buf.clear();
                }
            }
            
            it.remove();
        }
    }

}

管道 (Pipe)

Java NIO 管道是2個線程之間的單向數據鏈接。Pipe有一個source通道和一個sink通道。數據會被寫到sink通道,從source通道讀取。

向管道寫數據

 

從管道讀取數據

從讀取管道的數據,須要訪問source通道。

調用source通道的read()方法來讀取數據

代碼樣例:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

import org.junit.Test;

public class TestPipe {

    @Test
    public void test1() throws IOException{
        //1. 獲取管道
        Pipe pipe = Pipe.open();
        
        //2. 將緩衝區中的數據寫入管道
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        Pipe.SinkChannel sinkChannel = pipe.sink();
        buf.put("經過單向管道發送數據".getBytes());
        buf.flip();
        sinkChannel.write(buf);
        
        //3. 讀取緩衝區中的數據
        Pipe.SourceChannel sourceChannel = pipe.source();
        buf.flip();
        int len = sourceChannel.read(buf);
        System.out.println(new String(buf.array(), 0, len));
        
        sourceChannel.close();
        sinkChannel.close();
    }
    
}

 

參考資料:

《尚硅谷》視頻

相關文章
相關標籤/搜索