NIO(三):Selector選擇器

一.堵塞式與非堵塞式

在傳統IO中,將數據由當前線程從客戶端傳入服務端,由服務端的內核進行判斷傳過來的數據是否合法,內核中是否存在數據。

 

若是不存在數據 ,而且數據並不合法,當前線程將會堵塞等待。當前線程將沒法進行下一步傳輸,進行排隊現象。下降系統性能。

爲了解決這一步問題,調用資源開闢多個線程傳輸。

 

 雖然線程的開闢解決了部分堵塞排隊的問題,但因爲並無治理根本堵塞的緣由,線程數量也是有限的。總會有堵塞的線程 ,造成排隊現象。

爲了根本解決堵塞的問題。NIO的非堵塞式成爲了主要的傳輸方式。

在客戶端和服務端之間將通道註冊到selector選擇器,由選擇器進行監聽channel是否進行什麼操做(read()or write())。

 

當數據就緒或者準備完成時,由selector進行分配到服務端的一個(或多個)線程上進行相關運行操做。

 

 在IO的堵塞後無腦調用線程下。NIO是在準備完成時,才被selector選擇分配到一個或者多個線程上傳輸並被複制到內核地址空間中,因爲數據已準備完成或者已就緒,內核就無須被堵塞。

 

 

二.Selector(選擇器)

也稱多路複用器,多條channel複用selector。channe經過註冊到selector ,使selector對channel進行監聽,

  實現儘量少的線程管理多個鏈接。減小了 線程的使用,下降了由於線程的切換引發的沒必要要額資源浪費和多餘的開銷。

  也是網絡傳輸非堵塞的核心組件。

  

三.Selector的使用

分爲客戶端和服務端兩部分:

先實現客戶端吧:

  流程: 獲取通道綁定主機端口 --> 切換非堵塞狀態  --> 開闢buffer容量  -->  將當前時間做爲數據寫入buffer待傳  --> 切換讀寫方式flip()  --> 寫入通道 -->清空並關閉

 1  /*
 2     * 客戶端發送數據 經過channel通道
 3     * */
 4     @Test
 5     public void Client() throws IOException {
 6 
 7         //獲取channel通道   並設置主機號和端口號
 8         SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8080));
 9 
10         //由於使用非阻塞NIO  因此必須切換爲非阻塞
11         socketChannel.configureBlocking(false);   //默認爲true 須要改成非堵塞的
12 
13         //開闢緩衝區進行存儲數據
14         ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
15 
16         //準備工做就緒後,準備發送數據給服務端
17         //打印當前日期轉爲Byte數據傳出
18         byteBuffer.put(new Date().toString().getBytes());
19         //切換讀寫模式
20         byteBuffer.flip();
21         //寫入通道
22         socketChannel.write(byteBuffer);
23         //完畢時,清除緩衝區內容
24         byteBuffer.clear();
25 
26     //====================
27         //關閉相關流
28         socketChannel.close();
29 
30     }

 

在獲取當前時間是用的new Date();還可使用java8的獲取時間的方法。

LocalDateTime.now().toString().getBytes()  //轉爲Byte字節

 

 由於是網絡傳輸的心形式,因此在獲取channel時,使用SocketChannel.open方法。實現方法:

 1   public static SocketChannel open(SocketAddress remote)
 2         throws IOException
 3     {
 4         SocketChannel sc = open();
 5         try {
 6             sc.connect(remote);   //打開一個新的channel時,綁定鏈接到主機和端口上
 7         } catch (Throwable x) {
 8             try {
 9                 sc.close();  //異常時關閉鏈接
10             } catch (Throwable suppressed) {
11                 x.addSuppressed(suppressed);
12             }
13             throw x;
14         }
15         assert sc.isConnected();
16         return sc;
17     }

 

 new InetSocketAddress實例建立主機和端口。

   */
    public InetSocketAddress(String hostname, int port) {
        checkHost(hostname);    //檢查主機號是否爲空 爲空返回異常。
        InetAddress addr = null;
        String host = null;
        try {
            addr = InetAddress.getByName(hostname);
        } catch(UnknownHostException e) {
            host = hostname;
        }
        holder = new InetSocketAddressHolder(host, addr, checkPort(port));  //檢查端口。
    }


//檢查端口方法  
private static int checkPort(int port) {
if (port < 0 || port > 0xFFFF)
throw new IllegalArgumentException("port out of range:" + port);
return port;
}

//檢查主機號方法
private static String checkHost(String hostname) {
if (hostname == null)
throw new IllegalArgumentException("hostname can't be null");
return hostname;
}
 

 

 

服務端:

  流程:使用ServerSocketChannel 的方法獲取服務端額channel  --> 切換爲堵塞狀態 --> 爲buffer分配容量 --> 綁定端口號 --> 獲取selector選擇器 --> channel註冊進選擇器中,並進行監聽 -->  選擇器進行輪詢,進行下一步讀寫操做。

 1  /*
 2     * 服務端接收客戶端傳來的數據
 3     * */
 4     @Test
 5     public void server() throws IOException {
 6 
 7         //獲取channel通道
 8         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
 9         //切換爲非堵塞狀態
10         serverSocketChannel.configureBlocking(false);
11         //分配服務端的緩衝區
12         ByteBuffer serverByteBuffer = ByteBuffer.allocate(1024);
13         //將客戶端的InetSocketAddress綁定到通道,不綁定 不統一將獲取不到數據
14         serverSocketChannel.bind(new InetSocketAddress(8080));
15         //獲取選擇器
16         Selector selector = Selector.open();
17         //將通道註冊到選擇器中,而且制定監聽方式
18         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
19         //進行輪詢選擇器上就緒成功的事件  當存在就緒成功的及進行下一步
20         while (selector.select() > 0){
21             //對已存在的就緒事件進行迭代
22             Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
23 
24             //有元素就進行下一步
25             while (selectionKeyIterator.hasNext()){
26                 //獲取到就緒事件
27                 SelectionKey next = selectionKeyIterator.next();
28 
29                 //對獲取到的就緒事件判斷是何種類型
30                 if (next.isAcceptable()){
31 
32                     //獲取鏈接
33                     SocketChannel accept = serverSocketChannel.accept();
34 
35                     //將獲取到的鏈接切換爲非堵塞模式
36                     accept.configureBlocking(false);
37 
38                     //將獲取到的連接 註冊金selector
39                     accept.register(selector,SelectionKey.OP_READ);
40 
41                     //判斷是否準備好讀
42                 }else if (next.isReadable()){
43 
44                     //獲取已就緒的通道
45                     SocketChannel channel = (SocketChannel) next.channel();
46 
47                     //分配緩衝區
48                     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
49 
50                     //讀取數據
51                     int length = 0 ;
52                     while ((length = channel.read(byteBuffer)) > 0){
53                         byteBuffer.flip();
54                         System.out.println(new String(byteBuffer.array(),0,length));
55                         byteBuffer.clear();
56                     }
57 
58 
59                 }
60 
61                 //完成傳輸須要取消選擇鍵,防止下次出問題
62                 selectionKeyIterator.remove();
63 
64             }
65         }
66 
67 
68     }

 

如何獲取選擇器?

Selector selector = Selector.open();

 

 

實現過程:

 public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }



//首先進入此方法判斷是否存在選擇器
 public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)  //第一次爲false
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }


//false時 跳入以下方法。
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
 

 

隨後將獲取到的通道註冊到獲取到的選擇器中,在註冊時給定監聽方式:

 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  //可多選監聽操做項

selectionKey中定義了四個可操做項:

  • OP_READ  可讀就緒

  • OP_WRITE  可寫就緒

  • OP_CONNECT  鏈接就緒

  • OP_ACCEPT  接收就緒

 

迭代key中已就緒的元素。

Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();

 

獲取到當前就緒事件叢迭代器中獲取。

selectionKeyIterator.next()

 

 

selectionKey包含四個方法:

  • isReadable():測試此選擇鍵是否可讀  

  • isWritable():測試此選擇鍵是否可寫

  • isConnectable():測試此選擇鍵是否完成

  • isAcceptable():測試此選擇鍵是否能夠接受一個新的鏈接

 經過這些相應的方法,單獨判斷是否能夠讀寫,和進行操做。

 

最後取消選擇鍵,防止下次獲取出現異常狀況。(第一次判斷可能會爲true)

selectionKeyIterator.remove();

 

 

四.附加

在上面的例子中,把客戶端的代碼進行稍微改寫一下,使之可以無限輸入,並經過傳輸打印在服務端中。

public static void main(String[] args) throws IOException {
        //獲取channel通道   並設置主機號和端口號
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8080));

        //由於使用非阻塞NIO  因此必須切換爲非阻塞
        socketChannel.configureBlocking(false);

        //開闢緩衝區進行存儲數據
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        //附加輸入:
        Scanner scanner = new Scanner(System.in); //經過控制檯鍵入數據 while (scanner.hasNext()){ String str = scanner.next(); //準備工做就緒後,準備發送數據給服務端 //打印當前日期轉爲Byte數據傳出 byteBuffer.put((new Date().toString()+":--->"+str).getBytes()); //切換讀寫模式 byteBuffer.flip(); //寫入通道 socketChannel.write(byteBuffer); //完畢時,清除緩衝區內容  byteBuffer.clear(); }

    }

 

因爲掃描流(scanner)不能用於測試類,因此在main方法下進行測試:

每次輸入的內容都會被轉爲Byte字節進行傳輸。

客戶端輸入結果:

 

 服務端輸出結果:

 

 每輸入一次便傳輸一次。

//完成傳輸須要取消選擇鍵,防止下次出問題
selectionKeyIterator.remove();
相關文章
相關標籤/搜索