java NIO原理及實例

一、reactor(反應器)模式html

  使用單線程模擬多線程,提升資源利用率和程序的效率,增長系統吞吐量。下面例子比較形象的說明了什麼是反應器模式:java

  一個老闆經營一個飯店,react

  傳統模式 - 來一個客人安排一個服務員招呼,客人很滿意;(至關於一個鏈接一個線程)服務器

  後來客人愈來愈多,須要的服務員愈來愈多,資源條件不足以再請更多的服務員了,傳統模式已經不能知足需求。老闆之因此爲老闆天然有過人之處,老闆發現,服務員在爲客人服務時,當客人點菜的時候,服務員基本處於等待狀態,(阻塞線程,不作事)。網絡

  因而乎就讓服務員在客人點菜的時候,去爲其餘客人服務,當客人菜點好後再招呼服務員便可。 --反應器(reactor)模式誕生了多線程

  飯店的生意紅紅火火,幾個服務員就足以支撐大量的客流量,老闆用有限的資源賺了更多的money~~~~^_^app

 

二、NIO中的重要概念 通道、緩衝區、選擇器異步

  通道:相似於流,可是能夠異步讀寫數據(流只能同步讀寫),通道是雙向的,(流是單向的),通道的數據老是要先讀到一個buffer 或者 從一個buffer寫入,即通道與buffer進行數據交互。socket

  通道類型:  ide

    • FileChannel:從文件中讀寫數據。  
    • DatagramChannel:能經過UDP讀寫網絡中的數據。  
    • SocketChannel:能經過TCP讀寫網絡中的數據。  
    • ServerSocketChannel:能夠監聽新進來的TCP鏈接,像Web服務器那樣。對每個新進來的鏈接都會建立一個SocketChannel。  

  FileChannel比較特殊,它能夠與通道進行數據交互, 不能切換到非阻塞模式,套接字通道能夠切換到非阻塞模式;

  緩衝區 - 本質上是一塊能夠存儲數據的內存,被封裝成了buffer對象而已!

  緩衝區類型:

    • ByteBuffer  
    • MappedByteBuffer  
    • CharBuffer  
    • DoubleBuffer  
    • FloatBuffer  
    • IntBuffer  
    • LongBuffer  
    • ShortBuffer  

  經常使用方法:

    •   allocate() - 分配一塊緩衝區  
    •   put() -  向緩衝區寫數據
    •   get() - 向緩衝區讀數據  
    •   filp() - 將緩衝區從寫模式切換到讀模式  
    •     clear() - 從讀模式切換到寫模式,不會清空數據,但後續寫數據會覆蓋原來的數據,即便有部分數據沒有讀,也會被遺忘;  
    •       compact() - 從讀數據切換到寫模式,數據不會被清空,會將全部未讀的數據copy到緩衝區頭部,後續寫數據不會覆蓋,而是在這些數據以後寫數據
    •   mark() - 對position作出標記,配合reset使用
    •       reset() - 將position置爲標記值    

緩衝區的一些屬性:

    •     capacity - 緩衝區大小,不管是讀模式仍是寫模式,此屬性值不會變;
    •     position - 寫數據時,position表示當前寫的位置,每寫一個數據,會向下移動一個數據單元,初始爲0;最大爲capacity - 1

        切換到讀模式時,position會被置爲0,表示當前讀的位置

    •     limit - 寫模式下,limit 至關於capacity 表示最多能夠寫多少數據,切換到讀模式時,limit 等於原先的position,表示最多能夠讀多少數據。

  選擇器:至關於一個觀察者,用來監聽通道感興趣的事件,一個選擇器能夠綁定多個通道;

   通道向選擇器註冊時,須要指定感興趣的事件,選擇器支持如下事件:

    • SelectionKey.OP_CONNECT
    • SelectionKey.OP_ACCEPT
    • SelectionKey.OP_READ
    • SelectionKey.OP_WRITE  

   若是你對不止一種事件感興趣,那麼能夠用「位或」操做符將常量鏈接起來,以下:

     int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; 

   通道向選擇器註冊時,會返回一個 SelectionKey對象,具備以下屬性

    • interest集合
    • ready集合  
    • Channel  
    • Selector
    • 附加的對象(可選)  

  用「位與」操做interest 集合和給定的SelectionKey常量,能夠肯定某個肯定的事件是否在interest 集合中。

int interestSet = selectionKey.interestOps();  
 
boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

  ready 集合是通道已經準備就緒的操做的集合。在一次選擇(Selection)以後,你會首先訪問這個ready set。Selection將在下一小節進行解釋。能夠這樣訪問ready集合:
  int readySet = selectionKey.readyOps();

   也可使用如下四個方法獲取已就緒事件,返回值爲boolean:

selectionKey.isAcceptable();  
selectionKey.isConnectable();  
selectionKey.isReadable();  
selectionKey.isWritable();  

   能夠將一個對象或者更多信息附着到SelectionKey上,即記錄在附加對象上,方法以下:

selectionKey.attach(theObject);  
Object attachedObj = selectionKey.attachment();  

   能夠經過選擇器的select方法獲取是否有就緒的通道;

    • int select()  
    • int select(long timeout)  
    • int selectNow()

  返回值表示上次執行select以後,就緒通道的個數。 

  能夠經過selectedKeySet獲取已就緒的通道。返回值是SelectionKey 的集合,處理完相應的通道以後,須要removed 由於Selector不會本身removed

 

  select阻塞後,能夠用wakeup喚醒;執行wakeup時,若是沒有阻塞的select  那麼執行完wakeup後下一個執行select就會當即返回。

  調用close() 方法關閉selector

 下面是一個簡單的實例代碼,幫助理解上面的內容:

package com.pt.nio;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {
    public int id = 100001;
    public int bufferSize = 2048;
    @Override
    public void run() {
        // TODO Auto-generated method stub
        init();
    }

    public void init() {
        try {
            // 建立通道和選擇器
            ServerSocketChannel socketChannel = ServerSocketChannel.open();
            Selector selector = Selector.open();
            InetSocketAddress inetSocketAddress = new InetSocketAddress(
                    InetAddress.getLocalHost(), 4700);
            socketChannel.socket().bind(inetSocketAddress);
            // 設置通道非阻塞 綁定選擇器
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_ACCEPT).attach(
                    id++);
            System.out.println("Server started .... port:4700");
            listener(selector);

        } catch (Exception e) {
            // TODO: handle exception
        }
    }

    public void listener(Selector in_selector) {
        try {
            while (true) {
                Thread.sleep(1*1000);
                in_selector.select(); // 阻塞 直到有就緒事件爲止
                Set<SelectionKey> readySelectionKey = in_selector
                        .selectedKeys();
                Iterator<SelectionKey> it = readySelectionKey.iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = it.next();
                    // 判斷是哪一個事件
                    if (selectionKey.isAcceptable()) {// 客戶請求鏈接
                        System.out.println(selectionKey.attachment()
                                + " - 接受請求事件");
                        // 獲取通道 接受鏈接,
                        // 設置非阻塞模式(必須),同時須要註冊 讀寫數據的事件,這樣有消息觸發時才能捕獲
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey
                                .channel();
                        serverSocketChannel
                                .accept()
                                .configureBlocking(false)
                                .register(
                                        in_selector,
                                        SelectionKey.OP_READ
                                                | SelectionKey.OP_WRITE).attach(id++);
                        System.out
                                .println(selectionKey.attachment() + " - 已鏈接");

                        // 下面這種寫法是有問題的 不該該在serverSocketChannel上面註冊
                        /*
                         * serverSocketChannel.configureBlocking(false);
                         * serverSocketChannel.register(in_selector,
                         * SelectionKey.OP_READ);
                         * serverSocketChannel.register(in_selector,
                         * SelectionKey.OP_WRITE);
                         */
                    }
                    if (selectionKey.isReadable()) {// 讀數據
                        System.out.println(selectionKey.attachment()
                                + " - 讀數據事件");
                        SocketChannel clientChannel=(SocketChannel)selectionKey.channel();
                        ByteBuffer receiveBuf = ByteBuffer.allocate(bufferSize);
                        clientChannel.read(receiveBuf);
                        System.out.println(selectionKey.attachment()
                                + " - 讀取數據:" + getString(receiveBuf));
                    }
                    if (selectionKey.isWritable()) {// 寫數據
                        System.out.println(selectionKey.attachment()
                                + " - 寫數據事件");
                        SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
                        ByteBuffer sendBuf = ByteBuffer.allocate(bufferSize);
                        String sendText = "hello\n";
                        sendBuf.put(sendText.getBytes());
                        sendBuf.flip();        //寫完數據後調用此方法
                        clientChannel.write(sendBuf);
                    }
                    if (selectionKey.isConnectable()) {
                        System.out.println(selectionKey.attachment()
                                + " - 鏈接事件");
                    }
                    // 必須removed 不然會繼續存在,下一次循環還會進來,
                    // 注意removed 的位置,針對一個.next() remove一次
                    it.remove(); 
                }
            }
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println("Error - " + e.getMessage());
            e.printStackTrace();
        }

    }
    /**
     * ByteBuffer 轉換 String
     * @param buffer
     * @return
     */
    public static String getString(ByteBuffer buffer)
    {
        String string = "";
        try
        {
            for(int i = 0; i<buffer.position();i++){
                string += (char)buffer.get(i);
            }
            return string;
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
            return "";
        }
    }
}
NIO服務器端
package com.pt.bio;

import java.io.*;
import java.net.*;

public class BioServer implements Runnable {

    @Override
    public void run() {
        // TODO Auto-generated method stub
        System.out.println("Hello Server!!");

        try {
            ServerSocket server = null;
            try {
                server = new ServerSocket(4700);
                // 建立一個ServerSocket在端口4700監聽客戶請求
            } catch (Exception e) {
                System.out.println("can not listen to:" + e);
                // 出錯,打印出錯信息
            }
            Socket socket = null;
            try {
                socket = server.accept();
                // 使用accept()阻塞等待客戶請求,有客戶
                // 請求到來則產生一個Socket對象,並繼續執行
            } catch (Exception e) {
                System.out.println("Error." + e);
                // 出錯,打印出錯信息
            }
            String line;
            BufferedReader is = new BufferedReader(new InputStreamReader(
                    socket.getInputStream()));
            // 由Socket對象獲得輸入流,並構造相應的BufferedReader對象
            
            // 由Socket對象獲得輸出流,並構造PrintWriter對象
//            BufferedReader sin = new BufferedReader(new InputStreamReader(
//                    System.in));
            // 由系統標準輸入設備構造BufferedReader對象
            System.out.println("Client:" + is.readLine());
            PrintWriter os = new PrintWriter(socket.getOutputStream());
            // 在標準輸出上打印從客戶端讀入的字符串
            line = "hello";
            // 從標準輸入讀入一字符串
//            while (!line.equals("bye")) {
                // 若是該字符串爲 "bye",則中止循環
                os.println(line);
                // 向客戶端輸出該字符串
                os.flush();
                // 刷新輸出流,使Client立刻收到該字符串
//                System.out.println("Server:" + line);
                // 在系統標準輸出上打印讀入的字符串
//                System.out.println("Client:" + is.readLine());
                // 從Client讀入一字符串,並打印到標準輸出上
//                line = sin.readLine();
                // 從系統標準輸入讀入一字符串
//            } // 繼續循環
//            os.close(); // 關閉Socket輸出流
            is.close(); // 關閉Socket輸入流
            socket.close(); // 關閉Socket
            server.close(); // 關閉ServerSocket
        } catch (Exception e) {
            System.out.println("Error." + e);
            // 出錯,打印出錯信息
        }

    }

}
BIO服務器端
package com.pt;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

import org.junit.Test;

import com.pt.bio.BioServer;
import com.pt.nio.Reactor;

public class TestReactor {

    @Test
    public void testConnect() throws Exception{
        Socket socket=new Socket("192.168.82.35",4700);//BIO 阻塞
        System.out.println("鏈接成功");
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         
        //下面這種寫法,不用關閉客戶端,服務器端也是能夠收到的
        {
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
            printWriter.println("hi");
            printWriter.flush();
        }
        //這種寫法必須關閉客戶端,服務器端才能夠收到 NIO不用
        {
//        socket.getOutputStream().write(new byte[]{'h','i'});
//        socket.getOutputStream().flush();
        //必須關閉BIO服務器才能收到消息.NIO服務器不須要關閉
        //socket.close();
        }
        byte[] buf = new byte[2048];
        System.out.println("準備讀取數據~~");
        
        while(true){
            try {
                //兩種讀取數據方式
                int count = socket.getInputStream().read(buf);        //會阻塞
                //String readFromServer = bufferedReader.readLine();//能夠讀取到數據 會阻塞,直到碰見\n
                //System.out.println("方式二: 讀取數據" + readFromServer);    
                System.out.println("方式一: 讀取數據" + new String(buf) + " count = " + count);
                Thread.sleep(1*1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            //break;
        }
        
    }
    
    @Test 
    public void testNioServer(){
        Thread server = new Thread(new Reactor());
        server.start();

        while(true){
            try {
                Thread.sleep(3*1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
    @Test
    public void testBioServer(){
        Thread server = new Thread(new BioServer());
        server.start();

        while(true){
            try {
                Thread.sleep(3*1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}
BIO客戶端及測試類

其中 testNioServer()方法,是啓動NIO服務器端;

testBioServer()方法是啓動BIO服務器端

testConnect()是BIO的一個鏈接

基於NIO實現的時鐘服務器:http://www.cnblogs.com/tengpan-cn/p/6529628.html

 

 

一篇寫的比較詳細的JAVA NIO的文章:http://www.iteye.com/magazines/132-Java-NIO

相關文章
相關標籤/搜索