Java 多線程NIO學習

IO模型

  1. 阻塞IO 若是數據沒有準備就緒,就一直等待,直到數據準備就緒;整個進程會被阻塞。
  2. 非阻塞IO 需不斷詢問內核是否已經準備好數據,非阻塞雖然不用等待可是一直佔用CPU。
  3. 多路複用IO NIO 多路複用IO,會有一個線程不斷地去輪詢多個socket的狀態,當socket有讀寫事件的時候纔會調用IO讀寫操做。 用一個線程管理多個socket,是經過selector.select()查詢每一個通道是否有事件到達,若是沒有事件到達,則會一直阻塞在那裏,所以也會帶來線程阻塞問題。
  4. 信號驅動IO模型 在信號驅動IO模型中,當用戶發起一個IO請求操做時,會給對應的socket註冊一個信號函數,線程會繼續執行,當數據準備就緒的時候會給線程發送一個信號,線程接受到信號時,會在信號函數中進行IO操做。 非阻塞IO、多路複用IO、信號驅動IO都不會形成IO操做的第一步,查看數據是否準備就緒而帶來的線程阻塞,可是在第二步,對數據進行拷貝都會使線程阻塞。
  5. 異步IO jdk7AIO 異步IO是最理想的IO模型,當線程發出一個IO請求操做時,接着就去作本身的事情了,內核去查看數據是否準備就緒和準備就緒後對數據的拷貝,拷貝完之後內核會給線程發送一個通知說整個IO操做已經完成了,數據能夠直接使用了。 同步的IO操做在第二個階段,對數據的拷貝階段,都會形成線程的阻塞,異步IO則不會。

異步IO在IO操做的兩個階段,都不會使線程阻塞。 Java 的 I/O 依賴於操做系統的實現。html

Java NIO的工做原理

  1. 由一個專門的線程(Selector)來處理全部的IO事件,並負責分發。
  2. 事件驅動機制:事件到的時候觸發,而不是同步的去監視事件。
  3. 線程通信:線程之間經過 wait,notify 等方式通信。保證每次上下文切換都是有意義的。減小無謂的線程切換。

三大基本組件

Channel

  1. FileChannel, 從文件中讀寫數據。
  2. DatagramChannel,經過UDP讀寫網絡中的數據。
  3. SocketChannel,經過TCP讀寫網絡中的數據。
  4. ServerSocketChannel,能夠監聽新進來的TCP鏈接,對每個新進來的鏈接都會建立一個SocketChannel。

Java NIO 的通道相似流,但又有些不一樣:java

  1. 既能夠從通道中讀取數據,又能夠寫數據到通道。但流的讀寫一般是單向的。
  2. 通道能夠異步地讀寫。
  3. 通道中的數據老是要先讀到一個 Buffer,或者老是要從一個 Buffer 中寫入。

Buffer

關鍵的Buffer實現 ByteBuffer,CharBuffer,DoubleBuffer,FloatBuffer,IntBuffer,LongBuffer,ShortBuffer服務器

Buffer兩種模式、三個屬性: 網絡

capacity
做爲一個內存塊,Buffer有一個固定的大小值,也叫「capacity」.你只能往裏寫capacity個byte、long,char等類型。一旦Buffer滿了,須要將其清空(經過讀數據或者清除數據)才能繼續寫數據往裏寫數據。多線程

position
當你寫數據到Buffer中時,position表示當前的位置。初始的position值爲0.當一個byte、long等數據寫到Buffer後, position會向前移動到下一個可插入數據的Buffer單元。position最大可爲capacity – 1. 當讀取數據時,也是從某個特定位置讀。當將Buffer從寫模式切換到讀模式,position會被重置爲0. 當從Buffer的position處讀取數據時,position向前移動到下一個可讀的位置。併發

limit
在寫模式下,Buffer的limit表示你最多能往Buffer裏寫多少數據。 寫模式下,limit等於Buffer的capacity。 當切換Buffer到讀模式時, limit表示你最多能讀到多少數據。所以,當切換Buffer到讀模式時,limit會被設置成寫模式下的position值。換句話說,你能讀到以前寫入的全部數據(limit被設置成已寫數據的數量,這個值在寫模式下就是position)app

參考連接:Buffer原理 www.cnblogs.com/chenpi/p/64…異步

Selector

Selector(選擇器)是Java NIO中可以檢測一到多個NIO通道,並可以知曉通道是否爲諸如讀寫事件作好準備的組件。這樣,一個單獨的線程能夠管理多個channel,從而管理多個網絡鏈接。socket

監聽四種事件ide

  1. SelectionKey.OP_CONNECT
  2. SelectionKey.OP_ACCEPT
  3. SelectionKey.OP_READ
  4. SelectionKey.OP_WRITE

select()方法
select()阻塞到至少有一個通道在你註冊的事件上就緒了。 select(long timeout)和select()同樣,除了最長會阻塞timeout毫秒(參數)。
selectedKeys()方法
調用selector的selectedKeys()方法,訪問「已選擇鍵集(selected key set)」中的就緒通道。

參考連接:操做系統層面分析Selector原理 zhhphappy.iteye.com/blog/203289…

NIO實現

服務端

public class NIOServerSocket {
 
    //存儲SelectionKey的隊列
    private static List<SelectionKey> writeQueue = new ArrayList<SelectionKey>();
    private static Selector selector = null;
 
    //添加SelectionKey到隊列
    public static void addWriteQueue(SelectionKey key){
        synchronized (writeQueue) {
            writeQueue.add(key);
            //喚醒主線程
            selector.wakeup();
        }
    }
 
    public static void main(String[] args) throws IOException {
 
        // 1.建立ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 2.綁定端口
        serverSocketChannel.bind(new InetSocketAddress(60000));
        // 3.設置爲非阻塞
        serverSocketChannel.configureBlocking(false);
        // 4.建立通道選擇器
        selector = Selector.open();
        /* * 5.註冊事件類型 * * sel:通道選擇器 * ops:事件類型 ==>SelectionKey:包裝類,包含事件類型和通道自己。四個常量類型表示四種事件類型 * SelectionKey.OP_ACCEPT 獲取報文 SelectionKey.OP_CONNECT 鏈接 * SelectionKey.OP_READ 讀 SelectionKey.OP_WRITE 寫 */
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            System.out.println("服務器端:正在監聽60000端口");
            // 6.獲取可用I/O通道,得到有多少可用的通道
            int num = selector.select();
            if (num > 0) { // 判斷是否存在可用的通道
                // 得到全部的keys
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                // 使用iterator遍歷全部的keys
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                // 迭代遍歷當前I/O通道
                while (iterator.hasNext()) {
                    // 得到當前key
                    SelectionKey key = iterator.next();
                    // 調用iterator的remove()方法,並非移除當前I/O通道,標識當前I/O通道已經處理。
                    iterator.remove();
                    // 判斷事件類型,作對應的處理
                    if (key.isAcceptable()) {
                        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = ssChannel.accept();
 
                        System.out.println("處理請求:"+ socketChannel.getRemoteAddress());
                        // 獲取客戶端的數據
                        // 設置非阻塞狀態
                        socketChannel.configureBlocking(false);
                        // 註冊到selector(通道選擇器)
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        System.out.println("讀事件");
                        //取消讀事件的監控
                        key.cancel();
                        //調用讀操做工具類
                        NIOHandler.read(key);
                    } else if (key.isWritable()) {
                        System.out.println("寫事件");
                        //取消讀事件的監控
                        key.cancel();
                        //調用寫操做工具類
                        NIOHandler.write(key);
                    }
                }
            }else{
                synchronized (writeQueue) {
                    while(writeQueue.size() > 0){
                        SelectionKey key = writeQueue.remove(0);
                        //註冊寫事件
                        SocketChannel channel = (SocketChannel) key.channel();
                        Object attachment = key.attachment();
                        channel.register(selector, SelectionKey.OP_WRITE,attachment);
                    }
                }
            }
        }
    }
 
}
複製代碼

消息處理

public class NIOHandler {
 
    //構造線程池
    private static ExecutorService executorService  = Executors.newFixedThreadPool(10);
 
    public static void read(final SelectionKey key){
        //得到線程並執行
        executorService.submit(new Runnable() {
 
            @Override
            public void run() {
                try {
                    SocketChannel readChannel = (SocketChannel) key.channel();
                    // I/O讀數據操做
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    int len = 0;
                    while (true) {
                        buffer.clear();
                        len = readChannel.read(buffer);
                        if (len == -1) break;
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            baos.write(buffer.get());
                        }
                    }
                    System.out.println("服務器端接收到的數據:"+ new String(baos.toByteArray()));
                    //將數據添加到key中
                    key.attach(baos);
                    //將註冊寫操做添加到隊列中
                    NIOServerSocket.addWriteQueue(key);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
 
    public static void write(final SelectionKey key) {
        //拿到線程並執行
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // 寫操做
                    SocketChannel writeChannel = (SocketChannel) key.channel();
                    //拿到客戶端傳遞的數據
                    ByteArrayOutputStream attachment = (ByteArrayOutputStream)key.attachment();
                    System.out.println("客戶端發送來的數據:"+new String(attachment.toByteArray()));
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    String message = "你好,我是服務器!!";
                    buffer.put(message.getBytes());
                    buffer.flip();
                    writeChannel.write(buffer);
                    writeChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
複製代碼

客戶端

public class NIOClientSocket {
 
    public static void main(String[] args) throws IOException {
        //使用線程模擬用戶 併發訪問
        for (int i = 0; i < 1; i++) {
            new Thread(){
                public void run() {
                    try {
                        //1.建立SocketChannel
                        SocketChannel socketChannel=SocketChannel.open();
                        //2.鏈接服務器
                        socketChannel.connect(new InetSocketAddress("localhost",60000));
                        //寫數據
                        String msg="我是客戶端"+Thread.currentThread().getId();
                        ByteBuffer buffer=ByteBuffer.allocate(1024);
                        buffer.put(msg.getBytes());
                        buffer.flip();
                        socketChannel.write(buffer);
                        socketChannel.shutdownOutput();
                        //讀數據
                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
                        int len = 0;
                        while (true) {
                            buffer.clear();
                            len = socketChannel.read(buffer);
                            if (len == -1)
                                break;
                            buffer.flip();
                            while (buffer.hasRemaining()) {
                                bos.write(buffer.get());
                            }
                        }
                        System.out.println("客戶端收到:"+new String(bos.toByteArray()));
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                };
            }.start();
        }
    }
}
複製代碼

多線程NIO Tips

  1. 示例代碼僅供學習參考。對於一個已經被監聽到的事件,處理前先取消事件(SelectionKey .cancel())監控。不然selector.selectedKeys()會一直獲取到該事件,但該方法比較粗暴,而且後續register會產生多個SelectionKey。推薦使用selectionKey.interestOps()改變感興趣事件。
  2. Selector.select()和Channel.register()需同步。
  3. 當Channel設置爲非阻塞(Channel.configureBlocking(false))時,SocketChannel.read 沒讀到數據也會返回,返回參數等於0。
  4. OP_WRITE事件,寫緩衝區在絕大部分時候都是有空閒空間的,因此若是你註冊了寫事件,這會使得寫事件一直處於就就緒,選擇處理現場就會一直佔用着CPU資源。參考下面的第二個連接。
  5. 粘包問題。

參考連接:SocketChannel.read blog.csdn.net/cao47820824…
參考連接:NIO坑 www.jianshu.com/p/1af407c04…

相關文章
相關標籤/搜索