Java-NIO 之 Selector 與 Pipe

關於阻塞與非阻塞:http://www.javashuo.com/article/p-qrzhqjkj-mh.htmlhtml

 

1、傳統的 IO 流都是阻塞式的

當一個線程調用 read() 或 write() 時,該線程被阻塞,直到有一些數據被讀取或寫入,該線程在此期間不能執行其餘任務。java

所以,在網絡通訊進行 IO 操做時,因爲線程會阻塞,因此服務器端必須爲每一個客戶端都提供一個獨立的線程進行處理,當服務器端須要處理大量客戶端時,性能急劇降低。服務器

package nio;

import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Scanner;


public class TestBlockingNIO {

    // 客戶端
    @Test
    public void client() throws IOException {
        // 獲取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

        // 分配指定大小的緩衝區
        ByteBuffer buf = ByteBuffer.allocate(1024);

        // 發送到服務端
        Scanner scan = new Scanner(System.in);
        while (scan.hasNext()) {
            String str = scan.next();
            buf.put(str.getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();

            if ("exit".equals(str)) {
                break;
            }
        }


        // 接收服務端的反饋
        sChannel.shutdownOutput();
        int len = 0;
        while ((len = sChannel.read(buf)) != -1) {
            buf.flip();
            System.out.println(new String(buf.array(), 0, len));
            buf.clear();
        }

        // 關閉通道
        sChannel.close();
    }

    // 服務端
    @Test
    public void server() throws IOException {
        // 獲取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();

        // 綁定鏈接
        ssChannel.bind(new InetSocketAddress(9898));

        retry:
        while (true) {
            // 獲取客戶端鏈接的通道
            SocketChannel sChannel = ssChannel.accept();

            // 分配指定大小的緩衝區
            ByteBuffer buf = ByteBuffer.allocate(1024);

            // 接收客戶端的數據
            while (sChannel.read(buf) != -1) {
                String str = new String(buf.array()).trim();
                if ("exit".equals(str)) {
                    break retry;
                }
                buf.flip();
                System.out.println(str);
                buf.clear();
            }

            // 發送反饋給客戶端
            buf.put("服務端接收數據成功!".getBytes());
            buf.flip();
            sChannel.write(buf);

            // 關閉通道
            sChannel.close();
        }

        // 關閉通道
        ssChannel.close();
    }
}
View Code

 

2、Java NIO 是非阻塞式的

當線程從某通道進行讀寫數據時,若沒有數據可用時,該線程能夠進行其餘任務。網絡

線程一般將非阻塞 IO 的空閒時間用於在其餘通道上執行 IO 操做,因此單獨的線程能夠管理多個輸入和輸出通道。ide

所以,NIO 可讓服務器端使用一個或有限幾個線程來同時處理鏈接到服務器端的全部客戶端。性能

注:NIO 的 IO 行爲仍是同步的。spa

/*
 * 使用 NIO 完成網絡通訊的三個核心:
 *
 * 1. 通道(Channel):負責鏈接
 *        java.nio.channels.Channel 接口:
 *             |--SelectableChannel
 *                 |--SocketChannel
 *                 |--ServerSocketChannel
 *                 |--DatagramChannel
 *
 *                 |--Pipe.SinkChannel
 *                 |--Pipe.SourceChannel
 *
 * 2. 緩衝區(Buffer):負責數據的存取
 *
 * 3. 選擇器(Selector):是 SelectableChannel 的多路複用器。用於監控 SelectableChannel 的 IO 情況
 * 能夠監聽的事件類型(可以使用 SelectionKey 的四個常量表示)
 *     讀: SelectionKey.OP_READ  (1)
 *     寫: SelectionKey.OP_WRITE    (4)
 *     鏈接: SelectionKey.OP_CONNECT(8)
 *     接收: SelectionKey.OP_ACCEPT  (16)
 *
 * Selector 經常使用方法
 * Set<SelectionKey> keys():全部的 SelectionKey 集合。表明註冊在該 Selector上的 Channel
 * selectedKeys():被選擇的 SelectionKey 集合。返回此Selector的已選擇鍵集
 * intselect():監控全部註冊的 Channel,當它們中間有須要處理的 IO 操做時,該方法返回,並將對應得的 SelectionKey 加入被選擇的 SelectionKey 集合中,該方法返回這些 Channel 的數量。
 * int select(long timeout):能夠設置超時時長的 select() 操做
 * int selectNow():執行一個當即返回的 select() 操做,該方法不會阻塞線程
 * Selector wakeup():使一個還未返回的 select() 方法當即返回
 * void close():關閉該選擇器
 */

1.TCP-SocketChannel

package nio;

import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

public class TestNonBlockingNIO {

    //客戶端
    @Test
    public void client() throws IOException {
        // 獲取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

        // 切換非阻塞模式
        sChannel.configureBlocking(false);

        // 分配指定大小的緩衝區
        ByteBuffer buf = ByteBuffer.allocate(1024);

        // 發送數據給服務端
        Scanner scan = new Scanner(System.in);

        while (scan.hasNext()) {
            String str = scan.next();
            buf.put((new Date().toString() + "\n" + str).getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }

        // 關閉通道
        sChannel.close();
    }

    //服務端
    @Test
    public void server() throws IOException {
        // 獲取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();

        // 切換非阻塞模式
        ssChannel.configureBlocking(false);

        // 綁定鏈接
        ssChannel.bind(new InetSocketAddress(9898));

        // 獲取選擇器
        Selector selector = Selector.open();

        // 將通道註冊到選擇器上, 而且指定「監聽接收事件」
        ssChannel.register(selector, SelectionKey.OP_ACCEPT | SelectionKey.OP_READ);

        // 輪詢式的獲取選擇器上已經「準備就緒」的事件
        while (selector.select() > 0) {

            // 獲取當前選擇器中全部註冊的「選擇鍵(已就緒的監聽事件)」
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();

            while (it.hasNext()) {
                // 獲取準備「就緒」的是事件
                SelectionKey sk = it.next();

                // 判斷具體是什麼事件準備就緒
                if (sk.isAcceptable()) {
                    // 若「接收就緒」,獲取客戶端鏈接
                    SocketChannel sChannel = ssChannel.accept();

                    // 切換非阻塞模式
                    sChannel.configureBlocking(false);

                    // 將該通道註冊到選擇器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                } else if (sk.isReadable()) {
                    // 獲取當前選擇器上「讀就緒」狀態的通道
                    SocketChannel sChannel = (SocketChannel) sk.channel();

                    // 讀取數據
                    ByteBuffer buf = ByteBuffer.allocate(1024);

                    int len = 0;
                    while ((len = sChannel.read(buf)) > 0) {
                        buf.flip();
                        System.out.println(new String(buf.array(), 0, len));
                        buf.clear();
                    }
                }

                // 移除當前 SelectionKey
                it.remove();
            }
        }
    }
}
View Code

2.UDP-DatagramChannel

package nio;

import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

public class TestNonBlockingNIO2 {

    @Test
    public void send() throws IOException{
        DatagramChannel dc = DatagramChannel.open();

        dc.configureBlocking(false);

        ByteBuffer buf = ByteBuffer.allocate(1024);

        Scanner scan = new Scanner(System.in);

        while(scan.hasNext()){
            String str = scan.next();
            buf.put((new Date().toString() + ":\n" + str).getBytes());
            buf.flip();
            dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
            buf.clear();
        }

        dc.close();
    }

    @Test
    public void receive() throws IOException{
        DatagramChannel dc = DatagramChannel.open();

        dc.configureBlocking(false);

        dc.bind(new InetSocketAddress(9898));

        Selector selector = Selector.open();

        dc.register(selector, SelectionKey.OP_READ);

        while(selector.select() > 0){
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();

            while(it.hasNext()){
                SelectionKey sk = it.next();

                if(sk.isReadable()){
                    ByteBuffer buf = ByteBuffer.allocate(1024);

                    dc.receive(buf);
                    buf.flip();
                    System.out.println(new String(buf.array(), 0, buf.limit()));
                    buf.clear();
                }
            }

            it.remove();
        }
    }
}
View Code

 

3、Pipe(管道)

Java NIO 管道是 2 個線程之間的單向數據鏈接。Pipe 有一個 source 通道和一個 sink 通道。數據會被寫到 sink 通道,從 source 通道讀取。.net

package nio;

import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class TestPipe {

    @Test
    public void test() throws IOException {
        // 獲取管道
        Pipe pipe = Pipe.open();

        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.put("經過單向管道發送數據".getBytes());
        buf.flip();

        // 將緩衝區中的數據寫入管道
        Pipe.SinkChannel sinkChannel = pipe.sink();
        sinkChannel.write(buf);


        ByteBuffer buf2 = ByteBuffer.allocate(1024);
        // 讀取緩衝區中的數據
        Pipe.SourceChannel sourceChannel = pipe.source();
        int len = sourceChannel.read(buf2);
        System.out.println(new String(buf2.array(), 0, len));

        sourceChannel.close();
        sinkChannel.close();
    }
}
View Code

 


https://mp.weixin.qq.com/s?__biz=Mzg3MjA4MTExMw==&mid=2247484746&idx=1&sn=c0a7f9129d780786cabfcac0a8aa6bb7線程

相關文章
相關標籤/搜索