java nio 學習

     1,nio概述

     NIO主要有三大核心部分:Channel(通道),Buffer(緩衝區), Selector。傳統IO基於字節流和字符流進行操做,而NIO基於Channel和Buffer(緩衝區)進行操做,數據老是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中。Selector(選擇區)用於監聽多個通道的事件(好比:鏈接打開,數據到達)。所以,單個線程能夠監聽多個數據通道。
 
NIO和傳統IO(一下簡稱IO)之間第一個最大的區別是,IO是面向流的,NIO是面向緩衝區的。 Java IO面向流意味着每次從流中讀一個或多個字節,直至讀取全部字節,它們沒有被緩存在任何地方。此外,它不能先後移動流中的數據。若是須要先後移動從流中讀取的數據,須要先將它緩存到一個緩衝區。NIO的緩衝導向方法略有不一樣。數據讀取到一個它稍後處理的緩衝區,須要時可在緩衝區中先後移動。這就增長了處理過程當中的靈活性。可是,還須要檢查是否該緩衝區中包含全部您須要處理的數據。並且,需確保當更多的數據讀入緩衝區時,不要覆蓋緩衝區裏還沒有處理的數據。
 
IO的各類流是阻塞的。這意味着,當一個線程調用read() 或 write()時,該線程被阻塞,直到有一些數據被讀取,或數據徹底寫入。該線程在此期間不能再幹任何事情了。 NIO的非阻塞模式,使一個線程從某通道發送請求讀取數據,可是它僅能獲得目前可用的數據,若是目前沒有數據可用時,就什麼都不會獲取。而不是保持線程阻塞,因此直至數據變的能夠讀取以前,該線程能夠繼續作其餘的事情。 非阻塞寫也是如此。一個線程請求寫入一些數據到某通道,但不須要等待它徹底寫入,這個線程同時能夠去作別的事情。 線程一般將非阻塞IO的空閒時間用於在其它通道上執行IO操做,因此一個單獨的線程如今能夠管理多個輸入和輸出通道(channel)。
 

     2,nio相關類

     buffer

     nio中的數據讀寫操做都要在buffer中進行,buffer是一個線性的有序的數據集,只能容納某種特定的數據類型.
     buffer中有三個變量在讀取的時候會涉及到,position,limit,capacity
 

 

示例代碼以下
        IntBuffer buf = IntBuffer.allocate(10) ;    // 準備出10個大小的緩衝區
        System.out.print("一、寫入數據以前的position、limit和capacity:") ;
        System.out.println("position = " + buf.position() + ",limit = " + buf.limit() + ",capacty = " + buf.capacity()) ;
        int temp[] = {5,7,9} ;// 定義一個int數組
        buf.put(3) ;    // 設置一個數據
        buf.put(temp) ;    // 此時已經存放了四個記錄
        System.out.print("二、寫入數據以後的position、limit和capacity:") ;
        System.out.println("position = " + buf.position() + ",limit = " + buf.limit() + ",capacty = " + buf.capacity()) ;
 
        buf.flip() ;    // 重設緩衝區
        // postion = 0 ,limit = 本來position
        System.out.print("三、準備輸出數據時的position、limit和capacity:") ;
        System.out.println("position = " + buf.position() + ",limit = " + buf.limit() + ",capacty = " + buf.capacity()) ;
        System.out.print("緩衝區中的內容:") ;
        while(buf.hasRemaining()){
            int x = buf.get() ;
            System.out.print(x + "、") ;
        }

 

buffer示意圖以下
 

 

 

     channel

     channel能夠用來讀取和寫入數據,相似於以前的inputstream和outputstream,可是操做的數據都是先讀寫到buffer中的,或者從buffer中來讀寫.,傳統的流操做分爲input和output,而channel既能夠輸入也能夠輸出.下圖展現了channel的類體系圖,其中經常使用的紅框標註
     

 

 
     filechannel的例子
     
        File file1 = new File("d:" + File.separator + "note.txt") ;
        File file2 = new File("d:" + File.separator + "outnote.txt") ;
        FileInputStream input = null ;
        FileOutputStream output = null ;
        output = new FileOutputStream(file2) ;
        input = new FileInputStream(file1) ;
        FileChannel fout = null ;    // 聲明FileChannel對象
        FileChannel fin = null ;    // 定義輸入的通道
        fout = output.getChannel() ;    // 獲得輸出的通道
        fin = input.getChannel() ;    // 獲得輸入的通道
        ByteBuffer buf = ByteBuffer.allocate(1024) ;
 
        int temp = 0 ;
        while((temp=fin.read(buf))!=-1){
            buf.flip() ;
            fout.write(buf) ;
            buf.clear() ;    // 清空緩衝區,全部的狀態變量的位置恢復到原點
        }

 

 

     selector

     在nio中selector是最重要的一個概念,在原來使用io和scoket的構造網絡服務的時候,全部的網絡服務將使用阻塞的方法來進行客戶端的連接,若是使用了selector就能夠構造一個非阻塞的網絡服務.
     在構造非阻塞網絡服務的時候,須要使用selectablechannel類想selector類註冊,並且在新io中實現網絡服務須要依靠serverscoketchannel和scoketchannel這兩個類都是selectablechannel的子類,這個父類提供了註冊selector和阻塞模式的選擇.
     

 

    
     

     3,scoket網絡應用中的nio實現

     下面經過代碼來實現三種如今網絡中使用的網絡服務提供模式(時間服務器)

          傳統的bio

          傳統的bio模型,服務端通常都是一個獨立的acceptor線程負責監聽客戶端的鏈接,收到客戶端的請求後,爲每個client都開啓一個線程去進行處理處理完成後經過outputstream返回給client,線程銷燬.以下圖

 

 
缺點就是缺少彈性伸縮能力,當client端的併發量上去以後,server這邊的線程個數也上去了,慢慢就會致使系統性能降低,堆棧溢出,建立新線程失敗,系統宕機等.
實例代碼
     
public class TimeServer {
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
            }
        }
        ServerSocket server = null;
        try {
            server = new ServerSocket(port);
            System.out.println("The time server is start in port : " + port);
            Socket socket = null;
            while (true) {
                socket = server.accept();
                new Thread(new TimeServerHandler(socket)).start();
            }
        } finally {
            if (server != null) {
                System.out.println("The time server close");
                server.close();
                server = null;
            }
        }
    }
}
 
public class TimeServerHandler implements Runnable {
    private Socket socket;
    public TimeServerHandler(Socket socket) {
        this.socket = socket;
    }
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(
                    this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String currentTime = null;
            String body = null;
            while (true) {
                body = in.readLine();
                if (body == null)
                    break;
                System.out.println("The time server receive order : " + body);
                currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
                        System.currentTimeMillis()).toString() : "BAD ORDER";
                out.println(currentTime);
            }
 
        } catch (Exception e) {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if (out != null) {
                out.close();
                out = null;
            }
            if (this.socket != null) {
                try {
                    this.socket.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                this.socket = null;
            }
        }
    }
}

 

 
 

          pio

          僞異步io模式,僞瞭解決上面的那種問題,對她進行優化,在server端,使用線程池的來處理client的連接,這樣就能夠防止由於建立了太多的線程吧系統給拖垮的問題,雖然解決了這個,可是底層採用的依然是阻塞的網絡服務模型,當client數量上來的時候,或者一個線程的處理時間過長,線程池一直處於滿載的狀態,後續的client會一直進不來,鏈接超時. 下面是實例
     
public class TimeServer {
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
            }
        }
        ServerSocket server = null;
        try {
            server = new ServerSocket(port);
            System.out.println("The time server is start in port : " + port);
            Socket socket = null;
            TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(
                    50, 10000);// 建立IO任務線程池
            while (true) {
                socket = server.accept();
                singleExecutor.execute(new TimeServerHandler(socket));
            }
        } finally {
            if (server != null) {
                System.out.println("The time server close");
                server.close();
                server = null;
            }
        }
    }
}
 
 
public class TimeServerHandlerExecutePool {
 
    private ExecutorService executor;
 
    public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
    executor = new ThreadPoolExecutor(Runtime.getRuntime()
        .availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<java.lang.Runnable>(queueSize));
    }
 
    public void execute(java.lang.Runnable task) {
    executor.execute(task);
    }
}

 

 

          nio

      通俗來將,其實就是,我服務端有個監聽8080端口的serverscoket,我吧這個它和一個serverscoketchannel關聯到一塊兒,之後給她的信息也好,他要輸出的信息也好,都去操做這個channel,而後,我把這個channel註冊到一個selector上,而且選擇一個事件,(on_accpet),這樣相似的也能夠在這個selector上註冊其餘的一些channel並選擇好對應的事件類型,而後,這個selector它本身會去無限輪詢全部的channel,而後看看每個channel是否已經達到了那個事件的狀態,好比,個人那個serverscoket,若是有一個client連接過來了,那麼這個channel就已是on_accpet狀態了,當selector輪詢到這個channel的時候,就要開始處理我這個channel了,針對這種serverscoket的處理就是,先經過selector拿到當狀態的selectkey,從selectkey中拿到當狀態的channel,而後從這個serverscoketchannel中拿到client端對應的scoketchannel(普通的io是話是拿到一個scoket),拿到這個scoketchannel後,立馬把這個channel也註冊到selector裏面,設置對應事件類型爲on_read意思就是,client若是傳輸數據過來的狀態,而後selector記錄作輪詢,又一個client連接過來,就又有一個scoketchannel出來,病註冊到selector裏,輪詢的時候,若是發現有on_read事件了,那麼就能夠拿到這個scoketchannel,而後read裏面的信息,並作處理,並write對應是信息,若是處理完了,就把對應的該關的都關掉.   這樣一來不論client有多少個,server這邊均可以處理的來,由於server這邊其實就是一個selector線程在作輪詢處理而已,  若是還須要優化,那就是在這個的基礎上,吧selector也作成線程池模型.  總體理解下來就是這樣.   這樣就確定不會有client連接不過來的狀況. 下面看書本上咋說

 

     

 


 

 
 
 
下面是實例代碼 能夠研究研究
public class TimeServer {
 
    public static void main(String[] args) throws IOException {
    int port = 8080;
    if (args != null && args.length > 0) {
        try {
        port = Integer.valueOf(args[0]);
        } catch (NumberFormatException e) {
        // 採用默認值
        }
    }
    MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
    new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}
 
 
public class MultiplexerTimeServer implements Runnable {
    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
    public void run() {
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
        // 多路複用器關閉後,全部註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,因此不須要重複釋放資源
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    }
    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // 處理新接入的請求消息
            if (key.isAcceptable()) {
                // Accept the new connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                // Add the new connection to the selector
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                // Read the data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : "
                            + body);
                    String currentTime = "QUERY TIME ORDER"
                            .equalsIgnoreCase(body) ? new java.util.Date(
                            System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // 對端鏈路關閉
                    key.cancel();
                    sc.close();
                } else
                    ; // 讀到0字節,忽略
            }
        }
    }
    private void doWrite(SocketChannel channel, String response)
            throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
    public void stop() {
        this.stop = true;
    }
}
 

 

 

          aio     

     java nio2.0引入了異步通道的概念提供了異步文件通道和異步套接字通道的實現,上面說的那個nio實際上是同步非阻塞io,aio纔是真正的異步非阻塞io,尚未研究........
相關文章
相關標籤/搜索