Java之IO,BIO,NIO,AIO

參考文獻一

IO基礎知識回顧

java的核心庫java.io提供了全面的IO接口。包括:文件讀寫、標準設備輸出等。Java中IO是以流爲基礎進行輸入輸出的,全部數據被串行化寫入輸出流,或者從輸入流讀入。java

java.nio(java non-blocking IO),nio 是non-blocking的簡稱,是jdk1.4 及以上版本里提供的新api(New IO) ,爲全部的原始類型(boolean類型除外)提供緩存支持。Sun 官方標榜的特性以下: 爲全部的原始類型提供(Buffer)緩存支持。字符集編碼解碼解決方案。 Channel :一個新的原始I/O 抽象。 支持鎖和內存映射文件的文件訪問接口。 提供多路(non-bloking) 非阻塞式的高伸縮性網絡I/O 。react

IO流類圖結構express

這裏寫圖片描述

IO流簡單例子編程

實例一:後端

FileInputStream fis=null;
FileOutputStream fos=null;
try {
    fis = new FileInputStream(new File("D:\\a.txt"));
    fos = new FileOutputStream(new File("D:\\y.txt"));
    int ch;
    while((ch=fis.read()) != -1){
        System.out.println((char)ch);
        fos.write(ch);
    }
} catch (FileNotFoundException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}finally {
    if(null != fos){
        fos.close();
    }
    if(null != fis){
        fis.close();
    }
}

這裏寫圖片描述

實例二:字節流轉換成字符流api

public static void main(String[] args) throws Exception{
        BufferedReader br = null;
        BufferedWriter bw = null;
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream("D:\\a.txt")));
            bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("D:\\y.txt")));
            String s;
            StringBuilder sb = new StringBuilder();
            while((s=br.readLine())!=null){
                System.out.println(s);
                bw.write(s);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(null != bw){
                bw.close();
            }
            if(null != br){
                br.close();
            }
        }
    }

實例三:用轉換流從控制檯上讀入數據數組

public static void main(String[] args) throws Exception{
        BufferedReader br = null;
        try {
            br = new BufferedReader(new InputStreamReader(System.in));
            String s=br.readLine();
            System.out.println(s);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(null != br){
                br.close();
            }
        }
    }

BIO編程

傳統BIO通訊模型圖緩存

​ 傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啓動監聽端口;Socket負責發起鏈接操做。鏈接成功後,雙方經過輸入和輸出流進行同步阻塞式通訊。 服務端提供IP和監聽端口,客戶端經過鏈接操做想服務端監聽的地址發起鏈接請求,經過三次握手鍊接,若是鏈接成功創建,雙方就能夠經過套接字進行通訊。服務器

​ 簡單的描述一下BIO的服務端通訊模型:採用BIO通訊模型的服務端,一般由一個獨立的Acceptor線程負責監聽客戶端的鏈接,它接收到客戶端鏈接請求以後爲每一個客戶端建立一個新的線程進行鏈路處理沒處理完成後,經過輸出流返回應答給客戶端,線程銷燬。即典型的一請求一應答通宵模型。網絡

​ 傳統BIO通訊模型圖:

​ 01

​ 該模型最大的問題就是缺少彈性伸縮能力,當客戶端併發訪問量增長後,服務端的線程個數和客戶端併發訪問數呈1:1的正比關係Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹後,系統的性能將急劇降低,隨着訪問量的繼續增大,系統最終就死-掉-了

傳統BIO編程實例

傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啓動監聽端口;Socket負責發起鏈接操做。鏈接成功後,雙方經過輸入和輸出流進行同步阻塞式通訊。

package com.evada.de;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

/**
 * 描述:傳統BIO編程實例
 * @author Ay
 * @date 2017/6/27
 */
public final class AyTest extends BaseTest {

    public static void main(String[] args) throws InterruptedException {
        //啓動線程,運行服務器
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    ServerBetter.start();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        //避免客戶端先於服務器啓動前執行代碼
        Thread.sleep(100);

        //啓動線程,運行客戶端
        char operators[] = {'+', '-', '*', '/'};
        Random random = new Random(System.currentTimeMillis());
        new Thread(new Runnable() {
            @SuppressWarnings("static-access")
            @Override
            public void run() {
                while (true) {
                    //隨機產生算術表達式
                    String expression = random.nextInt(10) + "" + operators[random.nextInt(4)] + (random.nextInt(10) + 1);
                    Client.send(expression);
                    try {
                        Thread.currentThread().sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

}

class ServerBetter{

    //默認的端口號
    private static int DEFAULT_PORT = 12345;
    //單例的ServerSocket
    private static ServerSocket server;

    //根據傳入參數設置監聽端口,若是沒有參數調用如下方法並使用默認值
    public static void start() throws IOException {
        //使用默認值端口
        start(DEFAULT_PORT);
    }
    //這個方法不會被大量併發訪問,不太須要考慮效率,直接進行方法同步就好了
    public synchronized static void start(int port) throws IOException{
        if(server != null) return;
        try{
            //經過構造函數建立ServerSocket,若是端口合法且空閒,服務端就監聽成功
            server = new ServerSocket(port);
            System.out.println("服務器已啓動,端口號:" + port);
            //經過無線循環監聽客戶端鏈接,若是沒有客戶端接入,將阻塞在accept操做上。
            while(true){
                Socket socket = server.accept();
                //當有新的客戶端接入時,會執行下面的代碼
                //而後建立一個新的線程處理這條Socket鏈路
                new Thread(new ServerHandler(socket)).start();
            }
        }finally{
            //一些必要的清理工做
            if(server != null){
                System.out.println("服務器已關閉。");
                server.close();
                server = null;
            }
        }
    }

}


class ServerHandler implements Runnable{
    private Socket socket;
    public ServerHandler(Socket socket) {
        this.socket = socket;
    }
    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try{
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(),true);
            String expression;
            String result;
            while(true){
                //經過BufferedReader讀取一行
                //若是已經讀到輸入流尾部,返回null,退出循環
                //若是獲得非空值,就嘗試計算結果並返回
                if((expression = in.readLine())==null) break;
                System.out.println("服務器收到消息:" + expression);
                try{
                    result = "123";//Calculator.cal(expression).toString();
                }catch(Exception e){
                    result = "計算錯誤:" + e.getMessage();
                }
                out.println(result);
            }
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            //一些必要的清理工做
            if(in != null){
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                in = null;
            }
            if(out != null){
                out.close();
                out = null;
            }
            if(socket != null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                socket = null;
            }
        }
    }
}

class Client {
    //默認的端口號
    private static int DEFAULT_SERVER_PORT = 12345;
    //默認服務器Ip
    private static String DEFAULT_SERVER_IP = "127.0.0.1";

    public static void send(String expression){
        send(DEFAULT_SERVER_PORT,expression);
    }
    public static void send(int port,String expression){
        System.out.println("算術表達式爲:" + expression);
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try{
            socket = new Socket(DEFAULT_SERVER_IP,port);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(),true);
            out.println(expression);
            System.out.println("___結果爲:" + in.readLine());
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            //一下必要的清理工做
            if(in != null){
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                in = null;
            }
            if(out != null){
                out.close();
                out = null;
            }
            if(socket != null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                socket = null;
            }
        }
    }
}

僞異步I/O編程

咱們可使用線程池來管理這些線程(須要瞭解更多請參考前面提供的文章),實現1個或多個線程處理N個客戶端的模型(可是底層仍是使用的同步阻塞I/O),一般被稱爲「僞異步I/O模型「

僞異步I/O編程模型圖

這裏寫圖片描述

測試運行結果是同樣的。

​ 咱們知道,若是使用CachedThreadPool線程池(不限制線程數量,若是不清楚請參考文首提供的文章),其實除了能自動幫咱們管理線程(複用),看起來也就像是1:1的客戶端:線程數模型,而使用FixedThreadPool咱們就有效的控制了線程的最大數量,保證了系統有限的資源的控制,實現了N:M的僞異步I/O模型。

​ 可是,正由於限制了線程數量,若是發生大量併發請求,超過最大數量的線程就只能等待,直到線程池中的有空閒的線程能夠被複用。而對Socket的輸入流就行讀取時,會一直阻塞,直到發生:

  • ​ 有數據可讀
  • ​ 可用數據以及讀取完畢
  • ​ 發生空指針或I/O異常

​ 因此在讀取數據較慢時(好比數據量大、網絡傳輸慢等),大量併發的狀況下,其餘接入的消息,只能一直等待,這就是最大的弊端。

​ 然後面即將介紹的NIO,就能解決這個難題。

僞異步IO編程代碼

package com.anxpp.io.calculator.bio;  
import java.io.IOException;  
import java.net.ServerSocket;  
import java.net.Socket;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
/** 
 * BIO服務端源碼__僞異步I/O 
 * @author yangtao__anxpp.com 
 * @version 1.0 
 */  
public final class ServerBetter {  
    //默認的端口號  
    private static int DEFAULT_PORT = 12345;  
    //單例的ServerSocket  
    private static ServerSocket server;  
    //線程池 懶漢式的單例  
    private static ExecutorService executorService = Executors.newFixedThreadPool(60);  
    //根據傳入參數設置監聽端口,若是沒有參數調用如下方法並使用默認值  
    public static void start() throws IOException{  
        //使用默認值  
        start(DEFAULT_PORT);  
    }  
    //這個方法不會被大量併發訪問,不太須要考慮效率,直接進行方法同步就好了  
    public synchronized static void start(int port) throws IOException{  
        if(server != null) return;  
        try{  
            //經過構造函數建立ServerSocket  
            //若是端口合法且空閒,服務端就監聽成功  
            server = new ServerSocket(port);  
            System.out.println("服務器已啓動,端口號:" + port);  
            //經過無線循環監聽客戶端鏈接  
            //若是沒有客戶端接入,將阻塞在accept操做上。  
            while(true){  
                Socket socket = server.accept();  
                //當有新的客戶端接入時,會執行下面的代碼  
                //而後建立一個新的線程處理這條Socket鏈路  
                executorService.execute(new ServerHandler(socket));  
            }  
        }finally{  
            //一些必要的清理工做  
            if(server != null){  
                System.out.println("服務器已關閉。");  
                server.close();  
                server = null;  
            }  
        }  
    }  
}
  • 1,同步和異步是針對應用程序和內核的交互而言的。
  • 2,阻塞和非阻塞是針對於進程在訪問數據的時候,根據IO操做的就緒狀態來採起的不一樣方式,說白了是一種讀取或者寫入操做函數的實現方式,阻塞方式下讀取或者寫入函數將一直等待,而非阻塞方式下,讀取或者寫入函數會當即返回一個狀態值。

由上描述基本能夠總結一句簡短的話,同步和異步是目的,阻塞和非阻塞是實現方式

同步阻塞: 
在此種方式下,用戶進程在發起一個IO操做之後,必須等待IO操做的完成,只有當真正完成了IO操做之後,用戶進程才能運行。JAVA傳統的IO模型屬於此種方式。

同步非阻塞: 
在此種方式下,用戶進程發起一個IO操做之後邊可返回作其它事情,可是用戶進程須要時不時的詢問IO操做是否就緒,這就要求用戶進程不停的去詢問,從而引入沒必要要的CPU資源浪費。其中目前JAVA的NIO就屬於同步非阻塞IO。 
異步: 
此種方式下是指應用發起一個IO操做之後,不等待內核IO操做的完成,等內核完成IO操做之後會通知應用程序。

若是你想吃一份宮保雞丁蓋飯:

同步阻塞:你到飯館點餐,而後在那等着,還要一邊喊:好了沒啊!

同步非阻塞:在飯館點完餐,就去遛狗了。不過溜一下子,就回飯館喊一聲:好了沒啊!

異步阻塞:遛狗的時候,接到飯館電話,說飯作好了,讓您親自去拿。

異步非阻塞:飯館打電話說,咱們知道您的位置,一會給你送過來,安心遛狗就能夠了。

NIO 編程

簡介

Java NIO(New IO)是一個能夠替代標準Java IO API(從Java 1.4開始),Java NIO提供了與標準IO不一樣的IO工做方式。

Java NIO 由如下幾個核心部分組成:

  • Channels
  • Buffers
  • Selectors

雖然Java NIO 中除此以外還有不少類和組件,但Channel,Buffer 和 Selector 構成了核心的API。其它組件,如Pipe和FileLock,只不過是與三個核心組件共同使用的工具類。所以,我將集中精力在這三個組件上。其它組件會在單獨的章節中講到。

注意(每一個線程的處理流程大概都是讀取數據、解碼、計算處理、編碼、發送響應)

很是形象的實例

小量的線程如何同時爲大量鏈接服務呢,答案就是就緒選擇。這就比如到餐廳吃飯,每來一桌客人,都有一個服務員專門爲你服務,從你到餐廳到結賬走人,這樣方式的好處是服務質量好,一對一的服務,VIP啊,但是缺點也很明顯,成本高,若是餐廳生意好,同時來100桌客人,就須要100個服務員,那老闆發工資的時候得心痛死了,這就是傳統的一個鏈接一個線程的方式。

老闆是什麼人啊,精着呢。這老闆就得捉摸怎麼能用10個服務員同時爲100桌客人服務呢,老闆就發現,服務員在爲客人服務的過程當中並非一直都忙着,客人點完菜,上完菜,吃着的這段時間,服務員就閒下來了,但是這個服務員仍是被這桌客人佔用着,不能爲別的客人服務,用華爲領導的話說,就是工做不飽滿。那怎麼把這段閒着的時間利用起來呢。這餐廳老闆就想了一個辦法,讓一個服務員(前臺)專門負責收集客人的需求,登記下來,好比有客人進來了、客人點菜了,客人要結賬了,都先記錄下來按順序排好。每一個服務員到這裏領一個需求,好比點菜,就拿着菜單幫客人點菜去了。點好菜之後,服務員立刻回來,領取下一個需求,繼續爲別人客人服務去了。這種方式服務質量就不如一對一的服務了,當客人數據不少的時候可能須要等待。但好處也很明顯,因爲在客人正吃飯着的時候服務員不用閒着了,服務員這個時間內能夠爲其餘客人服務了,原來10個服務員最多同時爲10桌客人服務,如今可能爲50桌,10客人服務了。

這種服務方式跟傳統的區別有兩個:

一、增長了一個角色,要有一個專門負責收集客人需求的人。NIO裏對應的就是Selector。

二、由阻塞服務方式改成非阻塞服務了,客人吃着的時候服務員不用一直侯在客人旁邊了。傳統的IO操做,好比read(),當沒有數據可讀的時候,線程一直阻塞被佔用,直到數據到來。NIO中沒有數據可讀時,read()會當即返回0,線程不會阻塞。

NIO中,客戶端建立一個鏈接後,先要將鏈接註冊到Selector,至關於客人進入餐廳後,告訴前臺你要用餐,前臺會告訴你你的桌號是幾號,而後你就可能到那張桌子坐下了,SelectionKey就是桌號。當某一桌須要服務時,前臺就記錄哪一桌須要什麼服務,好比1號桌要點菜,2號桌要結賬,服務員從前臺取一條記錄,根據記錄提供服務,完了再來取下一條。這樣服務的時間就被最有效的利用起來了。

工做原理

這裏寫圖片描述

Java NIO和IO的主要區別

IO NIO
Stream oriented Buffer oriented
Blocking IO Non blocking IO
  Selectors

面向流與面向緩衝

Java NIO和IO之間第一個最大的區別是,IO是面向流的,NIO是面向緩衝區的

Java IO面向流意味着每次從流中讀一個或多個字節,直至讀取全部字節,它們沒有被緩存在任何地方。此外,它不能先後移動流中的數據。若是須要先後移動從流中讀取的數據,須要先將它緩存到一個緩衝區。 Java NIO的緩衝導向方法略有不一樣。數據讀取到一個它稍後處理的緩衝區,須要時可在緩衝區中先後移動。這就增長了處理過程當中的靈活性。可是,還須要檢查是否該緩衝區中包含全部您須要處理的數據。並且,需確保當更多的數據讀入緩衝區時,不要覆蓋緩衝區裏還沒有處理的數據。

阻塞與非阻塞IO

Java IO的各類流是阻塞的。這意味着,當一個線程調用read() 或 write()時,該線程被阻塞,直到有一些數據被讀取,或數據徹底寫入。該線程在此期間不能再幹任何事情了。 Java NIO的非阻塞模式,使一個線程從某通道發送請求讀取數據,可是它僅能獲得目前可用的數據,若是目前沒有數據可用時,就什麼都不會獲取。而不是保持線程阻塞,因此直至數據變的能夠讀取以前,該線程能夠繼續作其餘的事情。 非阻塞寫也是如此。一個線程請求寫入一些數據到某通道,但不須要等待它徹底寫入,這個線程同時能夠去作別的事情。 線程一般將非阻塞IO的空閒時間用於在其它通道上執行IO操做,因此一個單獨的線程如今能夠管理多個輸入和輸出通道(channel)。

NIO和IO如何影響應用程序的設計

不管您選擇IO或NIO工具箱,可能會影響您應用程序設計的如下幾個方面:

  • 對NIO或IO類的API調用。
  • 數據處理。
  • 用來處理數據的線程數。

通道 Channel

簡介

Channel 是對數據的源頭和數據目標點流經途徑的抽象,在這個意義上和 InputStream 和 OutputStream 相似。Channel能夠譯爲「通道、管 道」,而傳輸中的數據彷彿就像是在其中流淌的水。前面也提到了Buffer,Buffer和Channel相互配合使用,纔是Java的NIO。

Java NIO的通道與流區別

  • 既能夠從通道中讀取數據,又能夠寫數據到通道。但流的讀寫一般是單向的。

  • 通道能夠異步地讀寫。

  • 通道中的數據老是要先讀到一個Buffer,或者老是要從一個Buffer中寫入。

咱們對數據的讀取和寫入要經過Channel,它就像水管同樣,是一個通道。通道不一樣於流的地方就是通道是雙向的,能夠用於讀、寫和同時讀寫操做。 數據能夠從Channel讀到Buffer中,也能夠從Buffer 寫到Channel中。

這裏寫圖片描述

注意:通道必須結合Buffer使用,不能直接向通道中讀/寫數據

Channel主要分類

廣義上來講通道能夠被分爲兩類:File I/O和Stream I/O,也就是文件通道和套接字通道。若是分的更細緻一點則是:

  • FileChannel 從文件讀寫數據
  • SocketChannel 經過TCP讀寫網絡數據
  • ServerSocketChannel 能夠監聽新進來的TCP鏈接,並對每一個連接建立對應的SocketChannel
  • DatagramChannel 經過UDP讀寫網絡中的數據
  • Pipe

Channel的實現

這些是Java NIO中最重要的通道的實現:

  • FileChannel:從文件中讀寫數據。
  • DatagramChannel:能經過UDP讀寫網絡中的數據。
  • SocketChannel:能經過TCP讀寫網絡中的數據。
  • ServerSocketChannel:能夠監聽新進來的TCP鏈接,像Web服務器那樣。對每個新進來的鏈接都會建立一個SocketChannel。

打開FileChannel

在使用FileChannel以前,必須先打開它。可是,咱們沒法直接打開一個FileChannel,須要經過使用一個InputStream、OutputStream或RandomAccessFile來獲取一個FileChannel實例。下面是經過RandomAccessFile打開FileChannel的示例:

RandomAccessFile aFile = new RandomAccessFile("d://nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

從FileChannel讀取數據

調用多個read()方法之一從FileChannel中讀取數據。如:

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);

首先,分配一個Buffer。從FileChannel中讀取的數據將被讀到Buffer中。

而後,調用FileChannel.read()方法。該方法將數據從FileChannel讀取到Buffer中。read()方法返回的int值表示了有多少字節被讀到了Buffer中。若是返回-1,表示到了文件末尾。

向FileChannel寫數據

使用FileChannel.write()方法向FileChannel寫數據,該方法的參數是一個Buffer。如:

String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
    channel.write(buf);
}

注意FileChannel.write()是在while循環中調用的。由於沒法保證write()方法一次能向FileChannel寫入多少字節,所以須要重複調用write()方法,直到Buffer中已經沒有還沒有寫入通道的字節。

關閉FileChannel

用完FileChannel後必須將其關閉。如:

channel.close();

FileChannel的position方法

有時可能須要在FileChannel的某個特定位置進行數據的讀/寫操做。能夠經過調用position()方法獲取FileChannel的當前位置。

也能夠經過調用position(long pos)方法設置FileChannel的當前位置。

這裏有兩個例子:

long pos = channel.position();
channel.position(pos +123);

若是將位置設置在文件結束符以後,而後試圖從文件通道中讀取數據,讀方法將返回-1 —— 文件結束標誌。

若是將位置設置在文件結束符以後,而後向通道中寫數據,文件將撐大到當前位置並寫入數據。這可能致使「文件空洞」,磁盤上物理文件中寫入的數據間有空隙。

FileChannel的size方法

FileChannel實例的size()方法將返回該實例所關聯文件的大小。如:

long fileSize = channel.size();

FileChannel的truncate方法

可使用FileChannel.truncate()方法截取一個文件。截取文件時,文件將中指定長度後面的部分將被刪除。如:

channel.truncate(1024);

這個例子截取文件的前1024個字節。

FileChannel的force方法

FileChannel.force()方法將通道里還沒有寫入磁盤的數據強制寫到磁盤上。出於性能方面的考慮,操做系統會將數據緩存在內存中,因此沒法保證寫入到FileChannel裏的數據必定會即時寫到磁盤上。要保證這一點,須要調用force()方法。

force()方法有一個boolean類型的參數,指明是否同時將文件元數據(權限信息等)寫到磁盤上。

下面的例子同時將文件數據和元數據強制寫到磁盤上:

channel.force(true);

transferFrom()

FileChannel沒法設置爲非阻塞模式,它老是運行在阻塞模式下。

FileChannel的transferFrom()方法能夠將數據從源通道傳輸到FileChannel中(譯者注:這個方法在JDK文檔中的解釋爲將字節從給定的可讀取字節通道傳輸到此通道的文件中)。下面是一個簡單的例子:

//在使用FileChannel以前,必須先打開它。可是,咱們沒法直接打開一個FileChannel,
//須要經過使用一個InputStream、OutputStream或RandomAccessFile來獲取一個FileChannel實例。
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");  
FileChannel      fromChannel = fromFile.getChannel();  
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");  
FileChannel      toChannel = toFile.getChannel();   
long position = 0;  
long count = fromChannel.size();  
toChannel.transferFrom(position, count, fromChannel);

transferFrom 方法的輸入參數 position 表示從 position 處開始向目標文件寫入數據,count 表示最多傳輸的字節數。若是源通道的剩餘空間小於 count 個字節,則所傳輸的字節數要小於請求的字節數。

此外要注意,在 SoketChannel 的實現中,SocketChannel 只會傳輸此刻準備好的數據(可能不足count字節)。所以,SocketChannel 可能不會將請求的全部數據(count個字節)所有傳輸到 FileChannel 中。

transferTo()

transferTo()方法將數據從FileChannel傳輸到其餘的channel中。下面是一個簡單的例子:

RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");  
FileChannel      fromChannel = fromFile.getChannel();    
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");  
FileChannel      toChannel = toFile.getChannel();  
long position = 0;  
long count = fromChannel.size();  
fromChannel.transferTo(position, count, toChannel);

是否是發現這個例子和前面那個例子特別類似?除了調用方法的FileChannel對象不同外,其餘的都同樣。

上面所說的關於SocketChannel的問題在transferTo()方法中一樣存在。SocketChannel會一直傳輸數據直到目標buffer被填滿。

Channel簡單實例

下面是Channel的一個簡單的實例:

程序清單 1-1
RandomAccessFile aFile = new RandomAccessFile("d:\\ay.txt", "rw");
FileChannel fileChannel = aFile.getChannel();
//分配緩存區大小
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = fileChannel.read(buf);
while (bytesRead != -1) {
    System.out.println("Read " + bytesRead);
    //buf.flip()的調用,首先讀取數據到Buffer,而後反轉Buffer,接着再從Buffer中讀取數據(注:flip:空翻,反轉)
    buf.flip();
    //判斷是否有剩餘(注:Remaining:剩餘的)
    while(buf.hasRemaining()){
        System.out.print((char) buf.get());
    }
    buf.clear();
    bytesRead = fileChannel.read(buf);
}
aFile.close();

緩衝區 Buffer

緩衝區本質上是一塊能夠寫入數據,而後能夠從中讀取數據的內存。這塊內存被包裝成NIO Buffer對象,並提供了一組方法,用來方便的訪問該塊內存。

Buffer的基本用法

使用Buffer讀寫數據通常遵循如下四個步驟:

  • 寫入數據到Buffer
  • 調用flip()方法
  • 從Buffer中讀取數據
  • 調用clear()方法或者compact()方法

當向buffer寫入數據時,buffer會記錄下寫了多少數據。一旦要讀取數據,須要經過 flip() 方法將 Buffer 從寫模式切換到讀模式。在讀模式下,能夠讀取以前寫入到buffer的全部數據。

一旦讀完了全部的數據,就須要清空緩衝區,讓它能夠再次被寫入。有兩種方式能清空緩衝區:調用 clear() 或 compact() 方法。clear() 方法會清空整個緩衝區。compact() 方法只會清除已經讀過的數據。任何未讀的數據都被移到緩衝區的起始處,新寫入的數據將放到緩衝區未讀數據的後面。

程序清單 1-1
RandomAccessFile aFile = new RandomAccessFile("d:\\ay.txt", "rw");
FileChannel fileChannel = aFile.getChannel();
//分配緩存區大小
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = fileChannel.read(buf);
while (bytesRead != -1) {
    System.out.println("Read " + bytesRead);
    //buf.flip()的調用,首先讀取數據到Buffer,而後反轉Buffer,接着再從Buffer中讀取數據(注:flip:空翻,反轉)
    buf.flip();
    //判斷是否有剩餘(注:Remaining:剩餘的)
    while(buf.hasRemaining()){
        System.out.print((char) buf.get());
    }
    buf.clear();
    bytesRead = fileChannel.read(buf);
}
aFile.close();

Buffer的三個屬性

爲了理解Buffer的工做原理,須要熟悉它的三個屬性:

  • capacity:做爲一個內存塊,Buffer 有一個固定的大小值,也叫 「capacity」. 你只能往裏寫 capacity 個 byte、long,char 等類型。一旦 Buffer 滿了,須要將其清空(經過讀數據或者清除數據)才能繼續寫數據往裏寫數據。
  • position:當你寫數據到Buffer中時,position表示當前的位置。初始的position值爲0.當一個byte、long等數據寫到Buffer後, position會向前移動到下一個可插入數據的 Buffer 單元。position 最大可爲 capacity – 1。 當讀取數據時,也是從某個特定位置讀。當將 Buffer 從寫模式切換到讀模式,position會被重置爲 0。當從Buffer的 position 處讀取數據時,position 向前移動到下一個可讀的位置。
  • limit:在寫模式下,Buffer的limit表示你最多能往 Buffer 裏寫多少數據。 寫模式下,limit 等於 Buffer 的 capacity 。 當切換Buffer到讀模式時, limit 表示你最多能讀到多少數據。所以,當切換Buffer到讀模式時,limit 會被設置成寫模式下的 position 值。換句話說,你能讀到以前寫入的全部數據(limit被設置成已寫數據的數量,這個值在寫模式下就是 position )。

這裏寫圖片描述

Buffer的類型

Java NIO 有如下Buffer類型:

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

Buffer的分配

要想得到一個Buffer對象首先要進行分配。 每個Buffer類都有一個allocate方法。下面是一個分配48字節capacity的ByteBuffer的例子。

ByteBuffer buf = ByteBuffer.allocate(48);

這是分配一個可存儲1024個字符的CharBuffer:

CharBuffer buf = CharBuffer.allocate(1024);

Buffer寫數據

寫數據到Buffer有兩種方式:

  • 從Channel寫到Buffer。
  • 經過Buffer的put()方法寫到Buffer裏。

從Channel寫到Buffer,例如:

int bytesRead = inChannel.read(buf); //read into buffer

經過put方法寫Buffer的例子:

buf.put(127);

put方法有不少版本,容許你以不一樣的方式把數據寫入到Buffer中。例如, 寫到一個指定的位置,或者把一個字節數組寫入到Buffer。 更多Buffer實現的細節參考JavaDoc。

flip()方法

flip方法將Buffer從寫模式切換到讀模式。調用flip()方法會將position設回0,並將limit設置成以前position的值。

換句話說,position如今用於標記讀的位置,limit表示以前寫進了多少個byte、char等 —— 如今能讀取多少個byte、char等。

Buffer中讀取數據

從Buffer中讀取數據有兩種方式:

  • 從Buffer讀取數據到Channel。
  • 使用get()方法從Buffer中讀取數據。

從Buffer讀取數據到Channel的例子:

//read from buffer into channel.  
int bytesWritten = inChannel.write(buf);

使用get()方法從Buffer中讀取數據的例子 :

byte aByte = buf.get();

get方法有不少版本,容許你以不一樣的方式從Buffer中讀取數據。例如,從指定position讀取,或者從Buffer中讀取數據到字節數組。

rewind()方法

Buffer.rewind()將 position 設回0,因此你能夠重讀Buffer中的全部數據。limit 保持不變,仍然表示能從Buffer中讀取多少個元素(byte、char等)

clear()與compact()方法

一旦讀完Buffer中的數據,須要讓Buffer準備好再次被寫入。能夠經過clear()或compact()方法來完成。

若是調用的是 clear() 方法,position將被設回 0,limit被設置成 capacity 的值。換句話說,Buffer 被清空了。

若是Buffer中有一些未讀的數據,調用clear()方法,數據將「被遺忘」,意味着再也不有任何標記會告訴你哪些數據被讀過,哪些尚未。

若是Buffer中仍有未讀的數據,且後續還須要這些數據,可是此時想要先先寫些數據,那麼使用compact()方法。

compact()方法將全部未讀的數據拷貝到Buffer起始處。而後將position設到最後一個未讀元素正後面。limit 屬性依然像 clear() 方法同樣,設置成 capacity。如今Buffer準備好寫數據了,可是不會覆蓋未讀的數據。

mark()與reset()方法

經過調用Buffer.mark()方法,能夠標記Buffer中的一個特定position。以後能夠經過調用Buffer.reset()方法恢復到這個position。例如:

buffer.mark();  
//set position back to mark.  
buffer.reset();  
equals()與compareTo()方法

可使用equals()和compareTo()方法兩個Buffer。

equals()

當知足下列條件時,表示兩個Buffer相等:

  • 有相同的類型(byte、char、int等)。
  • Buffer中剩餘的 byte、char 等的個數相等。
  • Buffer中全部剩餘的byte、char等都相同。

如你所見,equals只是比較Buffer的一部分,不是每個在它裏面的元素都比較。實際上,它只比較Buffer中的剩餘元素。

compareTo()方法

compareTo()方法比較兩個Buffer的剩餘元素(byte、char等), 若是知足下列條件,則認爲一個Buffer 「小於」 另外一個Buffer:

  • 第一個不相等的元素小於另外一個Buffer中對應的元素。
  • 全部元素都相等,但第一個Buffer比另外一個先耗盡(第一個Buffer的元素個數比另外一個少)。

選擇器( Selector)

簡單介紹

Java NIO引入了選擇器的概念,選擇器用於監聽多個通道的事件(好比:鏈接打開,數據到達)。Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢註冊在其上的Channel,若是某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,而後經過SelectionKey能夠獲取就緒Channel的集合,進行後續的I/O操做。

一個Selector能夠同時輪詢多個Channel,由於JDK使用了epoll()代替傳統的select實現,因此沒有最大鏈接句柄1024/2048的限制。因此,只須要一個線程負責Selector的輪詢,就能夠接入成千上萬的客戶端。

這裏寫圖片描述

要使用Selector,得向 Selector 註冊 Channel ,而後調用它的 select() 方法。這個方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,線程就能夠處理這些事件,事件的例子好比新鏈接進來,數據接收等。

Selector的建立

經過調用Selector.open()方法建立一個Selector,以下:

Selector selector = Selector.open();

Selector註冊通道

爲了將 Channel 和 Selector 配合使用,必須將 channel 註冊到 selector 上。經過 SelectableChannel.register() 方法來實現,以下:

channel.configureBlocking(false);  
SelectionKey key = channel.register(selector,  Selectionkey.OP_READ);

與 Selector 一塊兒使用時,Channel 必須處於非阻塞模式下。這意味着不能將 FileChannel 與 Selector 一塊兒使用,由於 FileChannel 不能切換到非阻塞模式。而套接字通道均可以。

注意register()方法的第二個參數。這是一個「interest集合」,意思是在經過Selector監聽Channel時對什麼事件感興趣。能夠監聽四種不一樣類型的事件:

  • Connect
  • Accept
  • Read
  • Write

通道觸發了一個事件意思是該事件已經就緒。因此,某個channel成功鏈接到另外一個服務器稱爲「鏈接就緒」。一個 server socket channel 準備好接收新進入的鏈接稱爲「接收就緒」。一個有數據可讀的通道能夠說是「讀就緒」。等待寫數據的通道能夠說是「寫就緒」。

這四種事件用 SelectionKey 的四個常量來表示:

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

若是你對不止一種事件感興趣,那麼能夠用 「 位 或 」 操做符將常量鏈接起來,以下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

SelectionKey

在上一小節中,當向Selector註冊Channel時,register() 方法會返回一個SelectionKey對象。這個對象包含了一些你感興趣的屬性:

  • interest集合
  • ready集合
  • Channel
  • Selector
  • 附加的對象(可選)

下面我會描述這些屬性。

interest集合

就像向Selector註冊通道一節中所描述的,interest集合是你所選擇的感興趣的事件集合。能夠經過 SelectionKey 讀寫 interest 集合,像這樣:

int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

能夠看到,用「位與」操做interest 集合和給定的 SelectionKey 常量,能夠肯定某個肯定的事件是否在 interest 集合中

ready集合

ready 集合是通道已經準備就緒的操做的集合。在一次選擇(Selection)以後,你會首先訪問這個readySet。Selection將在下一小節進行解釋。能夠這樣訪問ready集合:

int readySet = selectionKey.readyOps();

能夠用像檢測 interest 集合那樣的方法,來檢測channel中什麼事件或操做已經就緒。可是,也可使用如下四個方法,它們都會返回一個布爾類型:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel + Selector

從SelectionKey訪問Channel和Selector很簡單。以下:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

附加的對象

能夠將一個對象或者更多信息附着到SelectionKey上,這樣就能方便的識別某個給定的通道。例如,能夠附加 與通道一塊兒使用的Buffer,或是包含彙集數據的某個對象。使用方法以下:

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

還能夠在用register()方法向Selector註冊Channel的時候附加對象。如:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

經過Selector選擇通道

一旦向Selector註冊了一或多個通道,就能夠調用幾個重載的select()方法。這些方法返回你所感興趣的事件(如鏈接、接受、讀或寫)已經準備就緒的那些通道。換句話說,若是你對「讀就緒」的通道感興趣,select()方法會返回讀事件已經就緒的那些通道。

下面是select()方法:

  • int select()
  • int select(long timeout)
  • int selectNow()

select()阻塞到至少有一個通道在你註冊的事件上就緒了。

select(long timeout) 和 select() 同樣,除了最長會阻塞 timeout 毫秒(參數)。

selectNow() 不會阻塞,無論什麼通道就緒都馬上返回(譯者注:此方法執行非阻塞的選擇操做。若是自從前一次選擇操做後,沒有通道變成可選擇的,則此方法直接返回零。)。

select()方法返回的int值表示有多少通道已經就緒。亦即,自上次調用select()方法後有多少通道變成就緒狀態。若是調用select()方法,由於有一個通道變成就緒狀態,返回了1,若再次調用select()方法,若是另外一個通道就緒了,它會再次返回1。若是對第一個就緒的channel沒有作任何操做,如今就有兩個就緒的通道,但在每次select()方法調用之間,只有一個通道就緒了。

selectedKeys()

一旦調用了select()方法,而且返回值代表有一個或更多個通道就緒了,而後能夠經過調用selector的selectedKeys()方法,訪問「已選擇鍵集(selected key set)」中的就緒通道。以下所示:

Set selectedKeys = selector.selectedKeys();

當向 Selector 註冊 Channel 時,Channel.register() 方法會返回一個 SelectionKey 對象。這個對象表明了註冊到該Selector的通道。能夠經過SelectionKey的selectedKeySet()方法訪問這些對象。

能夠遍歷這個已選擇的鍵集合來訪問就緒的通道。以下:

Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

這個循環遍歷已選擇鍵集中的每一個鍵,並檢測各個鍵所對應的通道的就緒事件。

注意每次迭代末尾的 keyIterator.remove() 調用。Selector不會本身從已選擇鍵集中移除 SelectionKey 實例。必須在處理完通道時本身移除。下次該通道變成就緒時,Selector會再次將其放入已選擇鍵集中。

SelectionKey.channel() 方法返回的通道須要轉型成你要處理的類型,如 ServerSocketChannel 或 SocketChannel 等。

wakeUp()

某個線程調用select()方法後阻塞了,即便沒有通道已經就緒,也有辦法讓其從select()方法返回。只要讓其它線程在第一個線程調用select()方法的那個對象上調用Selector.wakeup()方法便可。阻塞在select()方法上的線程會立馬返回。

若是有其它線程調用了wakeup()方法,但當前沒有線程阻塞在select()方法上,下個調用select()方法的線程會當即「醒來(wake up)」。

close()

用完 Selector 後調用其 close() 方法會關閉該 Selector,且使註冊到該Selector上的全部SelectionKey實例無效。通道自己並不會關閉。

完整的示例

這裏有一個完整的示例,打開一個Selector,註冊一個通道註冊到這個Selector上(通道的初始化過程略去),而後持續監控這個Selector的四種事件(接受,鏈接,讀,寫)是否就緒。

Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;
  Set selectedKeys = selector.selectedKeys();
  Iterator keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
  SelectionKey key = keyIterator.next();
  if(key.isAcceptable()) {
    // a connection was accepted by a ServerSocketChannel.
  } else if (key.isConnectable()) {
    // a connection was established with a remote server.
  } else if (key.isReadable()) {
    // a channel is ready for reading
  } else if (key.isWritable()) {
    // a channel is ready for writing
  }
    keyIterator.remove();
  }
}

分散(Scatter)/彙集(Gather)

分散概念

分散(scatter):從Channel中讀取是指在讀操做時將讀取的數據寫入多個buffer中。所以,Channel將從Channel中讀取的數據「分散(scatter)」到多個Buffer中。

這裏寫圖片描述

程序清單 1-1
ByteBuffer header = ByteBuffer.allocate(128);  
ByteBuffer body   = ByteBuffer.allocate(1024);  
ByteBuffer[] bufferArray = { header, body };  
channel.read(bufferArray);

注意buffer首先被插入到數組,而後再將數組做爲 channel.read() 的輸入參數。read() 方法按照 buffer 在數組中的順序將從 channel 中讀取的數據寫入到buffer,當一個 buffer 被寫滿後,channel 緊接着向另外一個 buffer 中寫。

Scattering Reads在移動下一個buffer前,必須填滿當前的buffer,這也意味着它不適用於動態消息(譯者注:消息大小不固定)。換句話說,若是存在消息頭和消息體,消息頭必須完成填充(例如 128byte),Scattering Reads才能正常工做。

彙集概念

這裏寫圖片描述

彙集(gather):寫入Channel是指在寫操做時將多個buffer的數據寫入同一個Channel,所以,Channel 將多個Buffer中的數據「彙集(gather)」後發送到Channel。

示例1-1
ByteBuffer header = ByteBuffer.allocate(128);  
ByteBuffer body   = ByteBuffer.allocate(1024);  
//write data into buffers  
ByteBuffer[] bufferArray = { header, body };  
channel.write(bufferArray);

buffer的一個數組被傳遞給了 write() 方法,這個方法寫他們在數組中遇到的接下來的 buffer 的內容。只是這些數據在 buffer 的 position 和 limit 直接被寫。所以,若是一個buffer有一個128字節的容量,可是隻包含了58個字節,只有58個字節能夠從 buffer 中寫到 channel 。所以,一個彙集寫操做經過動態可變大小的消息部分會工做的很好,跟分散讀取正好相反。

分散/彙集的應用

scatter / gather常常用於須要將傳輸的數據分開處理的場合。例如,您可能在編寫一個使用消息對象的網絡應用程序,每個消息被劃分爲固定長度的頭部和固定長度的正文。您能夠建立一個恰好能夠容納頭部的緩衝區和另外一個恰好能夠容納正文的緩衝區。當您將它們放入一個數組中並使用分散讀取來向它們讀入消息時,頭部和正文將整齊地劃分到這兩個緩衝區中。

咱們從緩衝區所獲得的方便性對於緩衝區數組一樣有效。由於每個緩衝區都跟蹤本身還能夠接受多少數據,因此分散讀取會自動找到有空間接受數據的第一個緩衝區。在這個緩衝區填滿後,它就會移動到下一個緩衝區。

簡單小例子

RandomAccessFile raf1=new RandomAccessFile("d:\\ay.txt", "rw");
//獲取通道
FileChannel channel1 = raf1.getChannel();
//設置緩衝區
ByteBuffer buf1=ByteBuffer.allocate(50);
ByteBuffer buf2=ByteBuffer.allocate(1024);
//分散讀取的時候緩存區應該是有序的,因此把幾個緩衝區加入數組中
ByteBuffer[] bufs={buf1,buf2};
//通道進行傳輸
channel1.read(bufs);
//查看緩衝區中的內容
for (int i = 0; i < bufs.length; i++) {
   //切換爲讀模式
   bufs[i].flip();
}
System.out.println(new String(bufs[0].array(),0,bufs[0].limit()));
System.out.println();
System.out.println(new String(bufs[1].array(),0,bufs[1].limit()));
//彙集寫入
RandomAccessFile  raf2=new RandomAccessFile("d:\\al.txt", "rw");
FileChannel channel2 = raf2.getChannel();
//只能經過通道來進行寫入
channel2.write(bufs);

其餘通道

文件通道

Socket管道

Java NIO中的 SocketChannel 是一個鏈接到 TCP 網絡套接字的通道。能夠經過如下2種方式建立 SocketChannel:

  • 打開一個SocketChannel並鏈接到互聯網上的某臺服務器。
  • 一個新鏈接到達 ServerSocketChannel 時,會建立一個 SocketChannel。

打開 SocketChannel

下面是SocketChannel的打開方式:

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://jenkov.com",80));

從 SocketChannel 讀取數據

要從SocketChannel中讀取數據,調用一個read()的方法之一。如下是例子:

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);

首先,分配一個Buffer。從SocketChannel讀取到的數據將會放到這個Buffer中。

而後,調用SocketChannel.read()。該方法將數據從SocketChannel 讀到Buffer中。read()方法返回的int值表示讀了多少字節進Buffer裏。若是返回的是-1,表示已經讀到了流的末尾(鏈接關閉了)。

寫入 SocketChannel

寫數據到SocketChannel用的是SocketChannel.write()方法,該方法以一個Buffer做爲參數。示例以下:

String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
    channel.write(buf);
}

注意SocketChannel.write()方法的調用是在一個while循環中的。Write()方法沒法保證能寫多少字節到SocketChannel。因此,咱們重複調用write()直到Buffer沒有要寫的字節爲止。

非阻塞模式

能夠設置 SocketChannel 爲非阻塞模式(non-blocking mode).設置以後,就能夠在異步模式下調用connect(), read() 和write()了。

connect()

若是SocketChannel在非阻塞模式下,此時調用connect(),該方法可能在鏈接創建以前就返回了。爲了肯定鏈接是否創建,能夠調用finishConnect()的方法。像這樣:

socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
while(! socketChannel.finishConnect() ){
    //wait, or do something else...
}

write()

非阻塞模式下,write()方法在還沒有寫出任何內容時可能就返回了。因此須要在循環中調用write()。前面已經有例子了,這裏就不贅述了。

read()

非阻塞模式下,read()方法在還沒有讀取到任何數據時可能就返回了。因此須要關注它的int返回值,它會告訴你讀取了多少字節。

不錯的小例子

一下是來自網絡的一個小例子,我的以爲很不錯,就貼到這裏。

class NioClient {
    //管道管理器
    private Selector selector;

    public NioClient init(String serverIp, int port) throws IOException{
        //獲取socket通道
        SocketChannel channel = SocketChannel.open();

        channel.configureBlocking(false);
        //得到通道管理器
        selector=Selector.open();

        //客戶端鏈接服務器,須要調用channel.finishConnect();才能實際完成鏈接。
        channel.connect(new InetSocketAddress(serverIp, port));
        //爲該通道註冊SelectionKey.OP_CONNECT事件
        channel.register(selector, SelectionKey.OP_CONNECT);
        return this;
    }

    public void listen() throws IOException{
        System.out.println("客戶端啓動");
        //輪詢訪問selector
        while(true){
            //選擇註冊過的io操做的事件(第一次爲SelectionKey.OP_CONNECT)
            selector.select();
            Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
            while(ite.hasNext()){
                SelectionKey key = ite.next();
                //刪除已選的key,防止重複處理
                ite.remove();
                if(key.isConnectable()){
                    SocketChannel channel=(SocketChannel)key.channel();

                    //若是正在鏈接,則完成鏈接
                    if(channel.isConnectionPending()){
                        channel.finishConnect();
                    }

                    channel.configureBlocking(false);
                    //向服務器發送消息
                    channel.write(ByteBuffer.wrap(new String("send message to server.").getBytes()));

                    //鏈接成功後,註冊接收服務器消息的事件
                    channel.register(selector, SelectionKey.OP_READ);
                    System.out.println("客戶端鏈接成功");
                }else if(key.isReadable()){ //有可讀數據事件。
                    SocketChannel channel = (SocketChannel)key.channel();

                    ByteBuffer buffer = ByteBuffer.allocate(10);
                    channel.read(buffer);
                    byte[] data = buffer.array();
                    String message = new String(data);

                    System.out.println("recevie message from server:, size:" + buffer.position() + " msg: " + message);
//                    ByteBuffer outbuffer = ByteBuffer.wrap(("client.".concat(msg)).getBytes());
//                    channel.write(outbuffer);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new NioClient().init("127.0.0.1", 9981).listen();
    }
}



class NioServer {
    //通道管理器
    private Selector selector;

    //獲取一個ServerSocket通道,並初始化通道
    public NioServer init(int port) throws IOException{
        //獲取一個ServerSocket通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(port));
        //獲取通道管理器
        selector=Selector.open();
        //將通道管理器與通道綁定,併爲該通道註冊SelectionKey.OP_ACCEPT事件,
        //只有當該事件到達時,Selector.select()會返回,不然一直阻塞。
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        return this;
    }

    public void listen() throws IOException{
        System.out.println("服務器端啓動成功");

        //使用輪詢訪問selector
        while(true){
            //當有註冊的事件到達時,方法返回,不然阻塞。
            selector.select();

            //獲取selector中的迭代器,選中項爲註冊的事件
            Iterator<SelectionKey> ite=selector.selectedKeys().iterator();

            while(ite.hasNext()){
                SelectionKey key = ite.next();
                //刪除已選key,防止重複處理
                ite.remove();
                //客戶端請求鏈接事件
                if(key.isAcceptable()){
                    ServerSocketChannel server = (ServerSocketChannel)key.channel();
                    //得到客戶端鏈接通道
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false);
                    //向客戶端發消息
                    channel.write(ByteBuffer.wrap(new String("send message to client").getBytes()));
                    //在與客戶端鏈接成功後,爲客戶端通道註冊SelectionKey.OP_READ事件。
                    channel.register(selector, SelectionKey.OP_READ);

                    System.out.println("客戶端請求鏈接事件");
                }else if(key.isReadable()){//有可讀數據事件
                    //獲取客戶端傳輸數據可讀取消息通道。
                    SocketChannel channel = (SocketChannel)key.channel();
                    //建立讀取數據緩衝器
                    ByteBuffer buffer = ByteBuffer.allocate(10);
                    int read = channel.read(buffer);
                    byte[] data = buffer.array();
                    String message = new String(data);

                    System.out.println("receive message from client, size:" + buffer.position() + " msg: " + message);
//                    ByteBuffer outbuffer = ByteBuffer.wrap(("server.".concat(msg)).getBytes());
//                    channel.write(outbuffer);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new NioServer().init(9981).listen();
    }
}

Datagram 通道

Java NIO中的DatagramChannel是一個能收發UDP包的通道。由於UDP是無鏈接的網絡協議,因此不能像其它通道那樣讀取和寫入。它發送和接收的是數據包。

Datagram 通道就做爲你們自學的內容。

管道(Pipe)

Java NIO 管道是2個線程之間的單向數據鏈接。Pipe有一個source通道和一個sink通道。數據會被寫到sink通道,從source通道讀取。

這裏寫圖片描述

建立管道

經過Pipe.open()方法打開管道。例如:

Pipe pipe = Pipe.open();

向管道寫數據

要向管道寫數據,須要訪問sink通道。像這樣:

Pipe.SinkChannel sinkChannel = pipe.sink();

經過調用SinkChannel的write()方法,將數據寫入SinkChannel,像這樣:

String newData = "New String to write to file..." + System.currentTimeMillis();  
ByteBuffer buf = ByteBuffer.allocate(48);  
buf.clear();  
buf.put(newData.getBytes());  
buf.flip();  
while(buf.hasRemaining()) {  
   <b>sinkChannel.write(buf);</b>  
}

從管道讀取數據

從讀取管道的數據,須要訪問source通道,像這樣:

Pipe.SourceChannel sourceChannel = pipe.source();

調用source通道的read()方法來讀取數據,像這樣:

ByteBuffer buf = ByteBuffer.allocate(48);  
int bytesRead = inChannel.read(buf);

read()方法返回的int值會告訴咱們多少字節被讀進了緩衝區。

簡單完整實例

//獲取管道
Pipe pipe = Pipe.open();
//獲取Sink 管道
Pipe.SinkChannel sinkChannel = pipe.sink();
//須要寫入數據
String newData = "New String to write to file..." + System.currentTimeMillis();
//新建緩存區
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
//緩存區存放數據
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
    sinkChannel.write(buf);
}
//獲取Source 管道
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buf2 = ByteBuffer.allocate(48);
int bytesRead = sourceChannel.read(buf2);
while (bytesRead != -1) {
    System.out.println("Read " + bytesRead);
    //buf.flip()的調用,首先讀取數據到Buffer,而後反轉Buffer,接着再從Buffer中讀取數據(注:flip:空翻,反轉)
    buf.flip();
    //判斷是否有剩餘(注:Remaining:剩餘的)
    while(buf.hasRemaining()){
        System.out.print((char) buf.get());
    }
    buf.clear();
    bytesRead = sourceChannel.read(buf);
}
sourceChannel.close();
sinkChannel.close();

AIO編程

AIO的特色

  • 讀完了再通知我

  • 不會加快IO,只是在讀完後進行通知

  • 使用回調函數,進行業務處理

AIO的相關代碼:

//AsynchronousServerSocketChannel類
server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT));

使用server上的accept方法

public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);

CompletionHandler爲回調接口,當有客戶端accept以後,就作handler中的事情。

NIO與AIO區別

  • NIO是同步非阻塞的,AIO是異步非阻塞的
  • 因爲NIO的讀寫過程依然在應用線程裏完成,因此對於那些讀寫過程時間長的,NIO就不太適合。而AIO的讀寫過程完成後才被通知,因此AIO可以勝任那些重量級,讀寫過程長的任務。

參考文獻二

1、看圖

網上不少IO資料,對新手來講,越看越暈。根據本身的理解,總結對比了一下BIO、NIO、AIO。

BIO:線程發起IO請求,無論內核是否準備好IO操做,從發起請求起,線程一直阻塞,直到操做完成。以下圖:

NIO(reactor模型):線程發起IO請求,當即返回;內核在作好IO操做的準備以後,經過調用註冊的回調函數通知線程作IO操做,線程開始阻塞,直到操做完成。以下圖:

AIO(proactor模型):線程發起IO請求,當即返回;內存作好IO操做的準備以後,作IO操做,直到操做完成或者失敗,經過調用註冊的回調函數通知線程作IO操做完成或者失敗。以下圖:

2、詳解

 

一、BIO

     在JDK1.4出來以前,咱們創建網絡鏈接的時候採用BIO模式,須要先在服務端啓動一個ServerSocket,而後在客戶端啓動Socket來對服務端進行通訊,默認狀況下服務端須要對每一個請求創建一堆線程等待請求,而客戶端發送請求後,先諮詢服務端是否有線程相應,若是沒有則會一直等待或者遭到拒絕請求,若是有的話,客戶端會線程會等待請求結束後才繼續執行。

二、NIO

    NIO自己是基於事件驅動思想來完成的,其主要想解決的是BIO的大併發問題: 在使用同步I/O的網絡應用中,若是要同時處理多個客戶端請求,或是在客戶端要同時和多個服務器進行通信,就必須使用多線程來處理。也就是說,將每個客戶端請求分配給一個線程來單獨處理。這樣作雖然能夠達到咱們的要求,但同時又會帶來另一個問題。因爲每建立一個線程,就要爲這個線程分配必定的內存空間(也叫工做存儲器),並且操做系統自己也對線程的總數有必定的限制。若是客戶端的請求過多,服務端程序可能會由於不堪重負而拒絕客戶端的請求,甚至服務器可能會所以而癱瘓。

    NIO基於Reactor,當socket有流可讀或可寫入socket時,操做系統會相應的通知引用程序進行處理,應用再將流讀取到緩衝區或寫入操做系統。  也就是說,這個時候,已經不是一個鏈接就要對應一個處理線程了,而是有效的請求,對應一個線程,當鏈接沒有數據時,是沒有工做線程來處理的。

   BIO與NIO一個比較重要的不一樣,是咱們使用BIO的時候每每會引入多線程,每一個鏈接一個單獨的線程;而NIO則是使用單線程或者只使用少許的多線程,每一個鏈接共用一個線程。

      NIO的最重要的地方是當一個鏈接建立後,不須要對應一個線程,這個鏈接會被註冊到多路複用器上面,因此全部的鏈接只須要一個線程就能夠搞定,當這個線程中的多路複用器進行輪詢的時候,發現鏈接上有請求的話,纔開啓一個線程進行處理,也就是一個請求一個線程模式。

      在NIO的處理方式中,當一個請求來的話,開啓線程進行處理,可能會等待後端應用的資源(JDBC鏈接等),其實這個線程就被阻塞了,當併發上來的話,仍是會有BIO同樣的問題。

  HTTP/1.1出現後,有了Http長鏈接,這樣除了超時和指明特定關閉的http header外,這個連接是一直打開的狀態的,這樣在NIO處理中能夠進一步的進化,在後端資源中能夠實現資源池或者隊列,當請求來的話,開啓的線程把請求和請求數據傳送給後端資源池或者隊列裏面就返回,而且在全局的地方保持住這個現場(哪一個鏈接的哪一個請求等),這樣前面的線程仍是能夠去接受其餘的請求,然後端的應用的處理只須要執行隊列裏面的就能夠了,這樣請求處理和後端應用是異步的.當後端處理完,到全局地方獲得現場,產生響應,這個就實現了異步處理。

三、AIO

     與NIO不一樣,當進行讀寫操做時,只須直接調用API的read或write方法便可。這兩種方法均爲異步的,對於讀操做而言,當有流可讀取時,操做系統會將可讀的流傳入read方法的緩衝區,並通知應用程序;對於寫操做而言,當操做系統將write方法傳遞的流寫入完畢時,操做系統主動通知應用程序。  便可以理解爲,read/write方法都是異步的,完成後會主動調用回調函數。  在JDK1.7中,這部份內容被稱做NIO.2,主要在Java.nio.channels包下增長了下面四個異步通道:

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

其中的read/write方法,會返回一個帶回調函數的對象,當執行完讀取/寫入操做後,直接調用回調函數。

BIO是一個鏈接一個線程。

NIO是一個請求一個線程。

AIO是一個有效請求一個線程。

先來個例子理解一下概念,以銀行取款爲例: 

  • 同步 : 本身親自出馬持銀行卡到銀行取錢(使用同步IO時,Java本身處理IO讀寫);
  • 異步 : 委託一小弟拿銀行卡到銀行取錢,而後給你(使用異步IO時,Java將IO讀寫委託給OS處理,須要將數據緩衝區地址和大小傳給OS(銀行卡和密碼),OS須要支持異步IO操做API);
  • 阻塞 : ATM排隊取款,你只能等待(使用阻塞IO時,Java調用會一直阻塞到讀寫完成才返回);
  • 非阻塞 : 櫃檯取款,取個號,而後坐在椅子上作其它事,等號廣播會通知你辦理,沒到號你就不能去,你能夠不斷問大堂經理排到了沒有,大堂經理若是說還沒到你就不能去(使用非阻塞IO時,若是不能讀寫Java調用會立刻返回,當IO事件分發器會通知可讀寫時再繼續進行讀寫,不斷循環直到讀寫完成)

Java對BIO、NIO、AIO的支持:

  • Java BIO : 同步並阻塞,服務器實現模式爲一個鏈接一個線程,即客戶端有鏈接請求時服務器端就須要啓動一個線程進行處理,若是這個鏈接不作任何事情會形成沒必要要的線程開銷,固然能夠經過線程池機制改善。

  • Java NIO : 同步非阻塞,服務器實現模式爲一個請求一個線程,即客戶端發送的鏈接請求都會註冊到多路複用器上,多路複用器輪詢到鏈接有I/O請求時才啓動一個線程進行處理。

  • Java AIO(NIO.2) : 異步非阻塞,服務器實現模式爲一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務器應用去啓動線程進行處理,

BIO、NIO、AIO適用場景分析:

  • BIO方式適用於鏈接數目比較小且固定的架構,這種方式對服務器資源要求比較高,併發侷限於應用中,JDK1.4之前的惟一選擇,但程序直觀簡單易理解。

  • NIO方式適用於鏈接數目多且鏈接比較短(輕操做)的架構,好比聊天服務器,併發侷限於應用中,編程比較複雜,JDK1.4開始支持。

  • AIO方式使用於鏈接數目多且鏈接比較長(重操做)的架構,好比相冊服務器,充分調用OS參與併發操做,編程比較複雜,JDK7開始支持。

另外,I/O屬於底層操做,須要操做系統支持,併發也須要操做系統的支持,因此性能方面不一樣操做系統差別會比較明顯。

在高性能的I/O設計中,有兩個比較著名的模式Reactor和Proactor模式,其中Reactor模式用於同步I/O,而Proactor運用於異步I/O操做。

    在比較這兩個模式以前,咱們首先的搞明白幾個概念,什麼是阻塞和非阻塞,什麼是同步和異步,同步和異步是針對應用程序和內核的交互而言的,同步指的是用戶進程觸發IO操做並等待或者輪詢的去查看IO操做是否就緒,而異步是指用戶進程觸發IO操做之後便開始作本身的事情,而當IO操做已經完成的時候會獲得IO完成的通知。而阻塞和非阻塞是針對於進程在訪問數據的時候,根據IO操做的就緒狀態來採起的不一樣方式,說白了是一種讀取或者寫入操做函數的實現方式,阻塞方式下讀取或者寫入函數將一直等待,而非阻塞方式下,讀取或者寫入函數會當即返回一個狀態值。

 通常來講I/O模型能夠分爲:同步阻塞,同步非阻塞,異步阻塞,異步非阻塞IO

同步阻塞IO:在此種方式下,用戶進程在發起一個IO操做之後,必須等待IO操做的完成,只有當真正完成了IO操做之後,用戶進程才能運行。JAVA傳統的IO模型屬於此種方式!

同步非阻塞IO:在此種方式下,用戶進程發起一個IO操做之後邊可返回作其它事情,可是用戶進程須要時不時的詢問IO操做是否就緒,這就要求用戶進程不停的去詢問,從而引入沒必要要的CPU資源浪費。其中目前JAVA的NIO就屬於同步非阻塞IO。

異步阻塞IO:此種方式下是指應用發起一個IO操做之後,不等待內核IO操做的完成,等內核完成IO操做之後會通知應用程序,這其實就是同步和異步最關鍵的區別,同步必須等待或者主動的去詢問IO是否完成,那麼爲何說是阻塞的呢?由於此時是經過select系統調用來完成的,而select函數自己的實現方式是阻塞的,而採用select函數有個好處就是它能夠同時監聽多個文件句柄,從而提升系統的併發性!

 異步非阻塞IO:在此種模式下,用戶進程只須要發起一個IO操做而後當即返回,等IO操做真正的完成之後,應用程序會獲得IO操做完成的通知,此時用戶進程只須要對數據進行處理就行了,不須要進行實際的IO讀寫操做,由於真正的IO讀取或者寫入操做已經由內核完成了。

相關文章
相關標籤/搜索