Java的BIO,NIO,AIO

  Java中的IO操做可謂常見。在Java的IO體系中,常有些名詞容易讓人困惑不解。爲此,先通俗地介紹下這些名詞。  java

   1 什麼是同步?
  2 什麼是異步?
  3 什麼是阻塞?
  4 什麼是非阻塞?
  5 什麼是同步阻塞?
  6 什麼是同步非阻塞?
  7 什麼是異步阻塞?
  8 什麼是異步非阻塞?

   咱們先來弄明白什麼是同步,異步,阻塞,非阻塞,只有這幾個概念理解清楚了,而後再組合起來理解,就相對比較容易了。react

  同步和異步是針對應用程序和內核的交互方式而言的。
  阻塞和非阻塞是針對於進程在訪問數據的時候,根據IO操做的就緒狀態來採起的不一樣方式。其中,阻塞方式下讀取或者寫入函數將一直等待;而在非阻塞方式下,讀取或者寫入函數會當即返回一個狀態值。  編程

  具體來講以下:windows

1.同步指的是用戶進程觸發IO操做並等待或者輪詢地去查看IO操做是否就緒。
2.異步是指用戶進程觸發IO操做之後便開始作本身的事情,而當IO操做已經完成的時候會獲得IO完成的通知(異步的特色就是通知)。
3.阻塞是指, 當試圖對該文件描述符進行讀寫時, 若是當時沒有東西可讀,或者暫時不可寫, 線程就進入等待狀態, 直到有東西可讀或者可寫爲止。
4.非阻塞是指,若是沒有東西可讀, 或者不可寫, 讀寫函數立刻返回, 而不會等待。

   咱們有了上述基礎後,再接下來進一步學習。設計模式

  Java中的IO方式一般分爲幾種,即:同步阻塞的BIO、同步非阻塞的NIO、異步非阻塞的AIO。api

  IO爲同步阻塞形式,NIO爲同步非阻塞形式,NIO並無實現異步,在JDK1.7後升級NIO庫包,開始支持異步非阻塞(AIO)。數組

  1.BIO:同步阻塞式IO,服務器實現模式爲一個鏈接一個線程,即客戶端有鏈接請求時服務器端就須要啓動一個線程進行處理,若是這個鏈接不作任何事情會形成沒必要要的線程開銷,固然能夠經過線程池機制改善。緩存

  在JDK1.4出來以前,咱們創建網絡鏈接的時候採用BIO模式,須要先在服務端啓動一個ServerSocket,而後在客戶端啓動Socket來對服務端進行通訊。默認狀況下服務端須要對每一個請求創建一堆線程等待請求,而客戶端發送請求後,先諮詢服務端是否有線程響應,若是沒有則會一直等待或者遭到拒絕請求,若是有的話,客戶端線程會在等待請求結束後繼續執行。服務器

  2.NIO即non-blocking IO,是指jdk1.4 及以上版本里提供的新api(New IO) ,爲全部的原始類型(boolean類型除外)提供緩存支持的數據容器。網絡

  NIO(reactor模型,同步IO):同步非阻塞式IO,服務器實現模式爲一個請求一個線程,即客戶端發送的鏈接請求都會註冊到多路複用器上,多路複用器輪詢到鏈接有I/O請求時才啓動一個線程進行處理。

  NIO主要想解決的是BIO的大併發問題: 在使用同步I/O的網絡應用中,若是要同時處理多個客戶端請求,或是在客戶端要同時和多個服務器進行通信,就必須使用多線程來處理。也就是說,將每個客戶端請求分配給一個線程來單獨處理。這樣作雖然能夠達到咱們的要求,但同時又會帶來另一個問題:因爲每建立一個線程,就要爲這個線程分配必定的內存空間(也叫工做存儲器),而操做系統自己對線程的總數也會有必定的限制,若是客戶端的請求過多,服務端程序可能會由於不堪重負而拒絕客戶端的請求,服務器甚至也可能會所以而癱瘓。NIO基於Reactor,當socket有流可讀或可寫入socket時,操做系統會相應地通知應用程序進行處理,應用再將流讀取到緩衝區或寫入操做系統。    

  NIO的最重要的地方是當一個鏈接建立後,不須要對應一個線程,這個鏈接會被註冊到多路複用器上面,因此全部的鏈接只須要一個線程就能夠搞定,當這個線程中的多路複用器進行輪詢的時候,發現鏈接上有請求的話,纔開啓一個線程進行處理,也就是一個請求一個線程模式。

  NIO主要有三大核心部分:Channel(通道),Buffer(緩衝區), Selector。傳統IO基於字節流和字符流進行操做,而NIO基於Channel和Buffer(緩衝區)進行操做,數據老是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中。Selector(選擇區)用於監聽多個通道的事件(好比:鏈接打開,數據到達)。所以,單個線程能夠監聽多個數據通道。

  NIO和傳統IO(一下簡稱IO)之間第一個最大的區別是,IO是面向流的,NIO是面向緩衝區的。 Java IO面向流意味着每次從流中讀一個或多個字節,直至讀取全部字節,它們沒有被緩存在任何地方;此外,它不能先後移動流中的數據。若是須要先後移動從流中讀取的數據,須要先將它緩存到一個緩衝區。NIO的緩衝導向方法略有不一樣。數據讀取到一個它稍後處理的緩衝區,須要時可在緩衝區中先後移動。這就增長了處理過程當中的靈活性。可是,還須要檢查該緩衝區中是否包含全部你須要處理的數據。並且,需確保當更多的數據讀入緩衝區時,不要覆蓋緩衝區裏還沒有處理的數據。
  IO的各類流是阻塞的。這意味着,當一個線程調用read() 或 write()時,該線程被阻塞,直到有一些數據被讀取,或數據徹底寫入。該線程在此期間不能再幹任何事情了。 NIO的非阻塞模式,使一個線程從某通道發送請求讀取數據,可是它僅能獲得目前可用的數據,若是目前沒有數據可用時,就什麼都不會獲取。而不是保持線程阻塞,因此直至數據變的能夠讀取以前,該線程能夠繼續作其餘的事情。非阻塞寫也是如此。一個線程請求寫入一些數據到某通道,但不須要等待它徹底寫入,這個線程同時能夠去作別的事情。 線程一般將非阻塞IO的空閒時間用於在其它通道上執行IO操做,因此一個單獨的線程如今能夠管理多個輸入和輸出通道(channel)。

  NIO中的Channel和BIO中的Stream(流)基本上是一個等級的。只不過Stream是單向的,如InputStream, OutputStream;而Channel是雙向的,既能夠用來進行讀操做,又能夠用來進行寫操做。

  NIO中的Channel的主要實現有:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

  見名知義,上述Channel分別能夠對應文件IO、UDP和TCP(Server和Client)。

  NIO中的Buffer實現有:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,它們分別對應基本數據類型: byte, char, double, float, int, long, short。另外,NIO中還有MappedByteBuffer, HeapByteBuffer, DirectByteBuffer等。

  NIO中的Selector運行單線程處理多個Channel,若是你的應用打開了多個通道,但每一個鏈接的流量都很低,使用Selector就會很方便。例如在一個聊天服務器中。要使用Selector, 得向Selector註冊Channel,而後調用它的select()方法。這個方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,線程就能夠處理這些事件。

  案例1:傳統IO方式讀取文件:

static void bioMethod() {
        InputStream in = null;
        try {
            in = new BufferedInputStream(new FileInputStream("src/com/itszt/test6/io.txt"));
            byte[] buf = new byte[1024];
            int bytesRead = in.read(buf);
            while (bytesRead != -1) {
                for (int i = 0; i < bytesRead; i++){
                    System.out.print((char) buf[i]);
                }
                bytesRead = in.read(buf);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

   案例2:經過NIO的方式來處理上述文件讀取問題:

static void nioMethod() {
        RandomAccessFile aFile = null;
        try {
            aFile = new RandomAccessFile("src/com/itszt/test6/io.txt", "rw");
            FileChannel fileChannel = aFile.getChannel();
            ByteBuffer buf = ByteBuffer.allocate(1024);

            int bytesRead = fileChannel.read(buf);//返回字符的數量
            System.out.println(bytesRead);

            while (bytesRead != -1) {
                buf.flip();
                while (buf.hasRemaining()) {
                    System.out.print((char) buf.get());
                }

                buf.compact();
                bytesRead = fileChannel.read(buf);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (aFile != null) {
                    aFile.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

   經過案例2,能夠總結出使用Buffer時,通常遵循下面幾個步驟:

  • 分配空間(ByteBuffer buf = ByteBuffer.allocate(1024); )
  • 寫入數據到Buffer(int bytesRead = fileChannel.read(buf);)
  • 調用filp()方法( buf.flip();)
  • 從Buffer中讀取數據(System.out.print((char)buf.get());)
  • 調用clear()方法或者compact()方法

  Buffer顧名思義是緩衝區的意思,它其實是一個容器,至關於一個連續數組。Channel提供從文件、網絡讀取數據的渠道,可是讀寫的數據都必須通過Buffer。

  向Buffer中寫數據:

  • 從Channel寫到Buffer (fileChannel.read(buf))
  • 經過Buffer的put()方法 (buf.put(…))

  從Buffer中讀取數據:

  • 從Buffer讀取到Channel (channel.write(buf))
  • 使用get()方法從Buffer中讀取數據 (buf.get())

  能夠把Buffer簡單地理解爲一組基本數據類型的元素列表,它經過幾個變量來保存這個數據的當前位置狀態:capacity, position, limit, mark:

索引 說明
capacity 緩衝區數組的總長度
position 下一個要操做的數據元素的位置
limit 緩衝區數組中不可操做的下一個元素的位置:limit<=capacity
mark 用於記錄當前position的前一個位置或者默認是0

  處理大文件,通常用BufferedReader,BufferedInputStream這類帶緩衝的IO類,不過若是文件超大的話,更快的方式是採用MappedByteBuffer。

  MappedByteBuffer是NIO引入的文件內存映射方案,讀寫性能極高。NIO最主要的就是實現了對異步操做的支持。其中一種經過把一個套接字通道(SocketChannel)註冊到一個選擇器(Selector)中,不時調用後者的選擇(select)方法就能返回知足的選擇鍵(SelectionKey),鍵中包含了SOCKET事件信息。這就是select模型。

  SocketChannel的讀寫是經過一個類叫ByteBuffer來操做的。ByteBuffer有兩種模式:直接/間接。間接模式最典型的就是HeapByteBuffer,即操做堆內存 (byte[]),可是內存畢竟有限,若是發送一個超大空間的文件(如1GB),這時就必須使用」直接」模式,即 MappedByteBuffer,文件映射。

  咱們先來看操做系統的內存管理。通常操做系統的內存分兩部分:物理內存;虛擬內存。虛擬內存通常使用的是頁面映像文件,即硬盤中的某個(某些)特殊文件。操做系統負責頁面文件內容的讀寫,這個過程叫」頁面中斷/切換」。 MappedByteBuffer 是一種特殊的ByteBuffer,是ByteBuffer的子類。 MappedByteBuffer 將文件直接映射到內存(這裏的內存指的是虛擬內存,並非物理內存)。一般,能夠映射整個文件,若是文件比較大的話能夠分段進行映射,只要指定文件的哪一個部分便可。

  FileChannel提供了map方法把文件影射爲內存映像文件:
  MappedByteBuffer map(int mode,long position,long size); 能夠把文件從position開始的size大小的區域映射爲內存映像文件,mode指出了 可訪問該內存映像文件的方式:  

  • READ_ONLY,(只讀): 試圖修改獲得的緩衝區將致使拋出 ReadOnlyBufferException.(MapMode.READ_ONLY)
  • READ_WRITE(讀/寫): 對獲得的緩衝區的更改最終將傳播到文件;該更改對映射到同一文件的其餘程序不必定是可見的。 (MapMode.READ_WRITE)
  • PRIVATE(專用): 對獲得的緩衝區的更改不會傳播到文件,而且該更改對映射到同一文件的其餘程序也不是可見的;相反,會建立緩衝區已修改部分的專用副本。 (MapMode.PRIVATE)

  MappedByteBuffer是ByteBuffer的子類,其擴充了三個方法:

  • force():緩衝區是READ_WRITE模式下,此方法對緩衝區內容的修改強行寫入文件;
  • load():將緩衝區的內容載入內存,並返回該緩衝區的引用;
  • isLoaded():若是緩衝區的內容在物理內存中,則返回真,不然返回假;

  案例3:採用MappedByteBuffer讀取文件:

static void mapNIOMethod() {
        RandomAccessFile aFile = null;
        FileChannel fc = null;
        try {
            aFile = new RandomAccessFile("src/com/itszt/test6/io.txt",
                    "rw");
            fc = aFile.getChannel();
            long timeBegin = System.currentTimeMillis();
            MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_ONLY, 0, aFile.length());
            long timeEnd = System.currentTimeMillis();
            System.out.println("Read time: " + (timeEnd - timeBegin) + "ms");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (aFile != null) {
                    aFile.close();
                }
                if (fc != null) {
                    fc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

   咱們再看下scatter和gather。

  分散(scatter)從Channel中讀取是指在讀操做時將讀取的數據寫入多個buffer中。所以,Channel將從Channel中讀取的數據「分散(scatter)」到多個Buffer中。

  彙集(gather)寫入Channel是指在寫操做時將多個buffer的數據寫入同一個Channel。所以,Channel 將多個Buffer中的數據「彙集(gather)」後發送到Channel。

  scatter / gather常常用於將須要傳輸的數據分開處理的場合,例如傳輸一個由消息頭和消息體組成的消息,你可能會將消息體和消息頭分散到不一樣的buffer中,這樣你能夠方便的處理消息頭和消息體。

static void gather() {
        ByteBuffer header = ByteBuffer.allocate(10);
        ByteBuffer body = ByteBuffer.allocate(10);

        byte[] b1 = {'0', '1'};
        byte[] b2 = {'2', '3'};
        header.put(b1);
        body.put(b2);

        ByteBuffer[] buffs = {header, body};

        try {
            FileOutputStream os = new FileOutputStream("src/com/itszt/test6/io.txt");
            FileChannel channel = os.getChannel();
            channel.write(buffs);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

   FileChannel的transferFrom()方法能夠將數據從源通道傳輸到FileChannel中:

static void transferFrom() {
        RandomAccessFile fromFile = null;
        RandomAccessFile toFile = null;
        try {
            fromFile = new RandomAccessFile("src/com/itszt/test6/fromFile.xml", "rw");
            FileChannel fromChannel = fromFile.getChannel();
            toFile = new RandomAccessFile("src/com/itszt/test6/toFile.txt", "rw");
            FileChannel toChannel = toFile.getChannel();

            long position = 0;
            long count = fromChannel.size();
            System.out.println(count);
            toChannel.transferFrom(fromChannel, position, count);

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (fromFile != null) {
                    fromFile.close();
                }
                if (toFile != null) {
                    toFile.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

   方法的輸入參數position表示從position處開始向目標文件寫入數據,count表示最多傳輸的字節數。若是源通道的剩餘空間小於 count 個字節,則所傳輸的字節數要小於請求的字節數。此外要注意,在SoketChannel的實現中,SocketChannel只會傳輸此刻準備好的數據(可能不足count字節)。所以,SocketChannel可能不會將請求的全部數據(count個字節)所有傳輸到FileChannel中。

  transferTo()方法將數據從FileChannel傳輸到其餘的channel中:

static void transferTo() {
        RandomAccessFile fromFile = null;
        RandomAccessFile toFile = null;
        try {
            fromFile = new RandomAccessFile("src/com/itszt/test6/fromFile.xml", "rw");
            FileChannel fromChannel = fromFile.getChannel();
            toFile = new RandomAccessFile("src/com/itszt/test6/toFile.txt", "rw");
            FileChannel toChannel = toFile.getChannel();

            long position = 0;
            long count = fromChannel.size();
            System.out.println(count);
            fromChannel.transferTo(position, count, toChannel);

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (fromFile != null) {
                    fromFile.close();
                }
                if (toFile != null) {
                    toFile.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

   NIO 管道Pipe是2個線程之間的單向數據鏈接。Pipe有一個source通道和一個sink通道。數據會被寫到sink通道,從source通道讀取。以下:

static void pipeMethod() {
        Pipe pipe = null;
        ExecutorService exec = Executors.newFixedThreadPool(2);
        try {
            pipe = Pipe.open();
            final Pipe pipeTemp = pipe;

            exec.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    Pipe.SinkChannel sinkChannel = pipeTemp.sink();//向通道中寫數據
                    while (true) {
                        TimeUnit.SECONDS.sleep(1);
                        String newData = "Pipe Test At Time " +
                                System.currentTimeMillis();
                        ByteBuffer buf = ByteBuffer.allocate(1024);
                        buf.clear();
                        buf.put(newData.getBytes());
                        buf.flip();
                        while (buf.hasRemaining()) {
                            System.out.println(buf);
                            sinkChannel.write(buf);
                        }
                    }
                }
            });

            exec.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    Pipe.SourceChannel sourceChannel = pipeTemp.source();//向通道中讀數據
                    while (true) {
                        TimeUnit.SECONDS.sleep(1);
                        ByteBuffer buf = ByteBuffer.allocate(1024);
                        buf.clear();
                        int bytesRead = sourceChannel.read(buf);
                        System.out.println("bytesRead=" + bytesRead);
                        while (bytesRead > 0) {
                            buf.flip();
                            byte b[] = new byte[bytesRead];
                            int i = 0;
                            while (buf.hasRemaining()) {
                                b[i] = buf.get();
                                System.out.printf("%X", b[i]);
                                i++;
                            }
                            String s = new String(b);
                            System.out.println("=================||" + s);
                            bytesRead = sourceChannel.read(buf);
                        }
                    }
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            exec.shutdown();
        }
    }

   NIO中的DatagramChannel是一個能收發UDP包的通道。由於UDP是無鏈接的網絡協議,因此不能像其它通道那樣讀取和寫入。它發送和接收的是數據包。以下:

static void recive() {
        DatagramChannel channel = null;
        try {
            channel = DatagramChannel.open();
            channel.socket().bind(new InetSocketAddress(8888));
            ByteBuffer buf = ByteBuffer.allocate(1024);
            buf.clear();
            channel.receive(buf);
            buf.flip();
            while (buf.hasRemaining()) {
                System.out.print((char) buf.get());
            }
            System.out.println();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static void send() {
        DatagramChannel channel = null;
        try {
            channel = DatagramChannel.open();
            String info = "I'm the Sender!";
            ByteBuffer buf = ByteBuffer.allocate(1024);
            buf.clear();
            buf.put(info.getBytes());
            buf.flip();

            int bytesSent = channel.send(buf, new InetSocketAddress("127.0.0.1",
                    8888));
            //System.out.println(bytesSent);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        exec.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                while (true){
                    recive();
                }
            }
        });
        exec.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                while (true){
                    send();
                }
            }
        });

    }

   咱們再來看TCP的NIO實現。

  NIO的強大功能部分來自於Channel的非阻塞特性,套接字的某些操做可能會無限期地阻塞。例如,對accept()方法的調用可能會由於等待一個客戶端鏈接而阻塞;對read()方法的調用可能會由於沒有數據可讀而阻塞,直到鏈接的另外一端傳來新的數據。總的來講,建立/接收鏈接或讀寫數據等I/O調用,均可能無限期地阻塞等待。慢速的,有損耗的網絡,或僅僅是簡單的網絡故障均可能致使任意時間的延遲。然而不幸的是,在調用一個方法以前沒法知道其是否阻塞。NIO的channel抽象的一個重要特徵就是能夠經過配置它的阻塞行爲,以實現非阻塞式的信道。

channel.configureBlocking(false)

   在非阻塞式信道上調用一個方法老是會當即返回。這種調用的返回值指示了所請求的操做完成的程度。例如,在一個非阻塞式ServerSocketChannel上調用accept()方法,若是有鏈接請求來了,則返回客戶端SocketChannel,不然返回null。

  Selector類能夠用於避免使用阻塞式客戶端中很浪費資源的「忙等」方法。例如,考慮一個IM服務器。像QQ或者旺旺這樣的,可能有幾萬甚至幾千萬個客戶端同時鏈接到了服務器,但在任什麼時候刻都只是很是少許的消息。

  這就須要一種方法阻塞等待,直到至少有一個信道能夠進行I/O操做,並指出是哪一個信道。NIO的選擇器就實現了這樣的功能。
  一個Selector實例能夠同時檢查一組信道的I/O狀態,它像是一個多路開關選擇器,由於一個選擇器可以管理多個信道上的I/O操做。
  若是用傳統的方式來處理衆多客戶端,使用的方法是循環地一個一個地去檢查全部的客戶端是否有I/O操做,若是當前客戶端有I/O操做,則可能把當前客戶端扔給一個線程池去處理,若是沒有I/O操做則進行下一個輪詢,當全部的客戶端都輪詢過了又接着從頭開始輪詢。這種方法是很是耗時並且浪費資源的,由於大部分客戶端是沒有I/O操做的。
  Selector就不同了,它在內部能夠同時管理多個I/O,當一個信道有I/O操做的時候,它會通知Selector,因而,Selector就記住這個信道有I/O操做,而且知道是何種I/O操做(讀,寫,或者接受新的鏈接)。所以,若是使用Selector,它返回的結果只有兩種,一種是0,即在你調用的時刻沒有任何客戶端須要I/O操做;另外一種結果是一組須要I/O操做的客戶端。這種通知的方式比主動輪詢的方式要高效得多。

NIO的通訊步驟:
①建立ServerSocketChannel,爲其配置非阻塞模式。
②綁定監聽,配置TCP參數,錄入backlog大小等。
③建立一個獨立的IO線程,用於輪詢多路複用器Selector。
④建立Selector,將以前建立的ServerSocketChannel註冊到Selector上,並設置監聽標識位SelectionKey.OP_ACCEPT。
⑤啓動IO線程,在循環體中執行Selector.select()方法,輪詢就緒的通道。
⑥當輪詢處處於就緒狀態的通道時,須要進行操做位判斷,若是是ACCEPT狀態,說明是新的客戶端接入,則調用accept方法接收新的客戶端。
⑦設置新接入客戶端的一些參數,如非阻塞,並將其繼續註冊到Selector上,設置監聽標識位等。
⑧若是輪詢的通道標識位是READ,則進行讀取,構造Buffer對象等。
⑨更細節的問題還有數據沒發送完成繼續發送的問題...... 

  要使用選擇器(Selector),須要建立一個Selector實例(使用靜態工廠方法open())並將其註冊(register)到想要監控的信道上(注意,這要經過channel的方法實現,而不是使用selector的方法)。最後,調用選擇器的select()方法。該方法會阻塞等待,直到有一個或更多的信道準備好了I/O操做或等待超時。select()方法將返回可進行I/O操做的信道數量。如今,在一個單獨的線程中,經過調用select()方法就能檢查多個信道是否準備好進行I/O操做。若是通過一段時間後仍然沒有信道準備好,select()方法就會返回0,並容許程序繼續執行其餘任務。  

package com.itszt.test6;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * TCP,NIO
 */
public class TCPNIOtest {
    private static final int BUF_SIZE = 1024;
    private static final int PORT = 8080;
    private static final int TIMEOUT = 3000;

    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        /*exec.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                client();
                return null;
            }
        });*/
        exec.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                serverSelector();
                return null;
            }
        });

        /*Callable<String> callable = new Callable<String>() {

            @Override
            public String call() throws Exception {

                return "haha";
            }
        };
        try {
            String call = callable.call();
            System.out.println("call = " + call);
        } catch (Exception e) {
            e.printStackTrace();
        }*/

    }

    static void client() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("127.0.0.1", PORT));

            if (socketChannel.finishConnect()) {
                int i = 0;
                while (true) {
                    TimeUnit.SECONDS.sleep(1);
                    String info = "I'm " + i++ + "-th information from client";
                    buffer.clear();
                    buffer.put(info.getBytes());
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        System.out.println(buffer);
                        socketChannel.write(buffer);
                    }
                }
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if (socketChannel != null) {
                    socketChannel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssChannel.accept();
        sc.configureBlocking(false);
        sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));
    }

    public static void handleRead(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        long bytesRead = sc.read(buf);
        while (bytesRead > 0) {
            buf.flip();
            while (buf.hasRemaining()) {
                System.out.print((char) buf.get());
            }
            System.out.println();
            buf.clear();
            bytesRead = sc.read(buf);
        }
        if (bytesRead == -1) {
            sc.close();
        }
    }

    public static void handleWrite(SelectionKey key) throws IOException {
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.flip();
        SocketChannel sc = (SocketChannel) key.channel();
        while (buf.hasRemaining()) {
            sc.write(buf);
        }
        buf.compact();
    }

    public static void serverSelector() {
        Selector selector = null;
        ServerSocketChannel ssc = null;
        try {
            selector = Selector.open();
            ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress(PORT));
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                if (selector.select(TIMEOUT) == 0) {
                    System.out.println("==");
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    if (key.isAcceptable()) {
                        handleAccept(key);
                    }
                    if (key.isReadable()) {
                        handleRead(key);
                    }
                    if (key.isWritable() && key.isValid()) {
                        handleWrite(key);
                    }
                    if (key.isConnectable()) {
                        System.out.println("isConnectable = true");
                    }
                    iter.remove();
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (selector != null) {
                    selector.close();
                }
                if (ssc != null) {
                    ssc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

   總的來講,NIO主要經過通道和緩衝區來操做數據,期間使用Native函數庫直接分配堆外內存(這個堆外內存不在JVM虛擬機定義的內存範圍),而後經過一個存儲於Java堆中的DirectByteBuffer對象做爲這塊內存的引用來對數據進行操做。

  其本質就是阻塞和非阻塞的區別。

  同步時,應用程序會直接參與IO讀寫操做,而且咱們的應用程序會直接阻塞到某一個方法上,直到數據準備就緒;或者採用輪訓的策略實時檢查數據的就緒狀態,若是就緒則獲取數據.
  異步時,則全部的IO讀寫操做交給操做系統,與咱們的應用程序沒有直接關係,咱們程序不須要關係IO讀寫,當操做系統完成了IO讀寫操做時,會給咱們應用程序發送通知,咱們的應用程序直接拿走數據極便可。 

  3.AIO(proactor模型):異步非阻塞式IO,服務器實現模式爲一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務器應用去啓動線程進行處理。

  與NIO不一樣,當進行讀寫操做時,只須直接調用API的read或write方法便可。這兩種方法均爲異步的,對於讀操做而言,當有流可讀取時,操做系統會將可讀的流傳入read方法的緩衝區,並通知應用程序;對於寫操做而言,當操做系統將write方法傳遞的流寫入完畢時,操做系統主動通知應用程序。  便可以理解爲,read/write方法都是異步的,完成後會主動調用回調函數。

  Java.nio.channels包下增長了下面四個異步信道:

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

  總的來講,BIO,NIO,AIO能夠簡述以下:

  BIO是同步並阻塞,服務器實現模式爲一個鏈接一個線程,即客戶端有鏈接請求時服務器端就須要啓動一個線程進行處理,若是這個鏈接不作任何事情會形成沒必要要的線程開銷,固然能夠經過線程池機制改善。

  NIO是同步非阻塞,服務器實現模式爲一個請求一個線程,即客戶端發送的鏈接請求都會註冊到多路複用器上,多路複用器輪詢到鏈接有I/O請求時才啓動一個線程進行處理。

  AIO是異步非阻塞,服務器實現模式爲一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務器應用去啓動線程進行處理。

 BIO、NIO、AIO適用場景分析:

  • BIO方式適用於鏈接數目比較小且固定的架構,這種方式對服務器資源要求比較高,併發侷限於應用中,是JDK1.4之前的惟一選擇,程序直觀簡單易理解。

  • NIO方式適用於鏈接數目多且鏈接比較短(輕操做)的架構,好比聊天服務器,併發侷限於應用中,編程比較複雜,JDK1.4開始支持。

  • AIO方式使用於鏈接數目多且鏈接比較長(重操做)的架構,好比相冊服務器,充分調用OS參與併發操做,編程比較複雜,JDK7開始支持。

  通常來講,I/O屬於底層操做,須要操做系統支持,併發也須要操做系統的支持,因此性能方面不一樣的操做系統差別會比較明顯。在高性能的I/O設計中,有兩個比較著名的模式Reactor和Proactor模式,其中Reactor模式用於同步I/O,而Proactor運用於異步I/O操做。

  Reactor 和 Proactor 是基於事件驅動,在網絡編程中常常用到這兩種設計模式。

  Reactor,即反應堆。Reactor 的通常工做過程是首先在 Reactor 中註冊(Reactor)感興趣事件,並在註冊時指定某個已定義的回調函數(callback);當客戶端發送請求時,在 Reactor 中會觸發剛纔註冊的事件,並調用對應的處理函數。

  Reactor包含以下角色:

  • Handle 句柄;用來標識socket鏈接或是打開文件;
  • Synchronous Event Demultiplexer:同步事件多路分解器:由操做系統內核實現的一個函數;用於阻塞等待發生在句柄集合上的一個或多個事件;
  • Event Handler:事件處理接口
  • Concrete Event HandlerA:實現應用程序所提供的特定事件處理邏輯;
  • Reactor:反應器,定義一個接口,實現如下功能:
    1)供應用程序註冊和刪除關注的事件句柄;
    2)運行事件循環;
    3)有就緒事件到來時,分發事件到以前註冊的回調函數上處理。

  Proactor藉助操做系統的異步讀寫,在調用的時候能夠傳遞迴調函數或者回送信號,當異步操做完畢,內核會自動調用回調函數或者發送信號,因此很依賴操做系統。

  Proactor模式包含以下角色

  • Handle 句柄;用來標識socket鏈接或是打開文件;
  • Asynchronous Operation Processor:異步操做處理器;負責執行異步操做,通常由操做系統內核實現;
  • Asynchronous Operation:異步操做
  • Completion Event Queue:完成事件隊列;異步操做完成的結果放到隊列中等待後續使用
  • Proactor:主動器;爲應用程序進程提供事件循環;從完成事件隊列中取出異步操做的結果,分發調用相應的後續處理邏輯;
  • Completion Handler:完成事件接口;通常是由回調函數組成的接口;
  • Concrete Completion Handler:完成事件處理邏輯;實現接口定義特定的應用處理邏輯;

  Reactor與Proactor對比:

  (1)以主動寫爲例:
  Reactor將handle放到select(),等待可寫就緒,而後調用write()寫入數據;寫完處理後續邏輯;
  Proactor調用aoi_write後馬上返回,由內核負責寫操做,寫完後調用相應的回調函數處理後續邏輯。

  能夠看出,Reactor被動的等待指示事件的到來並作出反應;它有一個等待的過程,作什麼都要先放入到監聽事件集合中等待handler可用時再進行操做;。
  Proactor直接調用異步讀寫操做,調用完後馬上返回。

  (2)Reactor實現了一個被動的事件分離和分發模型,服務等待請求事件的到來,再經過不受間斷的同步處理事件,從而作出反應。

  Proactor實現了一個主動的事件分離和分發模型;這種設計容許多個任務併發的執行,從而提升吞吐量;並可執行耗時長的任務(各個任務間互不影響)。

  (3)優勢:

  Reactor實現相對簡單,對於耗時短的處理場景處理高效; 操做系統能夠在多個事件源上等待,而且避免了多線程編程相關的性能開銷和編程複雜性; 事件的串行化對應用是透明的,能夠順序的同步執行而不須要加鎖; 在事務分離上,能夠將與應用無關的多路分解和分配機制和與應用相關的回調函數分離開來。

  Proactor性能更高,可以處理耗時長的併發場景。

  (4)缺點:

  Reactor處理耗時長的操做會形成事件分發的阻塞,影響到後續事件的處理。

  Proactor實現邏輯複雜;依賴操做系統對異步的支持,目前實現了純異步操做的操做系統少,實現優秀的如windows IOCP,但因爲其windows系統用於服務器的侷限性,目前應用範圍較小;而Unix/Linux系統對純異步的支持有限,應用事件驅動的主流仍是經過select/epoll來實現。

  (5)適用場景。

  Reactor:同時接收多個服務請求,而且依次同步的處理它們的事件驅動程序;。  Proactor:異步接收和同時處理多個服務請求的事件驅動程序。

相關文章
相關標籤/搜索