java中的AIO

簡介

jdk7中新增了一些與文件(網絡)I/O相關的一些api。這些API被稱爲NIO.2,或稱爲AIO(Asynchronous I/O)。AIO最大的一個特性就是異步能力,這種能力對socket與文件I/O都起做用。AIO實際上是一種在讀寫操做結束以前容許進行其餘操做的I/O處理。AIO是對JDK1.4中提出的同步非阻塞I/O(NIO)的進一步加強。html

關於NIO,以前的一篇文章能夠看看:java中的NIOjava

jdk7主要增長了三個新的異步通道:react

  • AsynchronousFileChannel: 用於文件異步讀寫;linux

  • AsynchronousSocketChannel: 客戶端異步socket;git

  • AsynchronousServerSocketChannel: 服務器異步socket。github

由於AIO的實施需充分調用OS參與,IO須要操做系統支持、併發也一樣須要操做系統的支持,因此性能方面不一樣操做系統差別會比較明顯。編程

前提概念

在具體看AIO以前,咱們須要知道一些必要的前提概念。segmentfault

Unix中的I/O模型

Unix定義了五種I/O模型設計模式

  • 阻塞I/Oapi

  • 非阻塞I/O

  • I/O複用(select、poll、linux 2.6種改進的epoll)

  • 信號驅動IO(SIGIO)

  • 異步I/O(POSIX的aio_系列函數)

一個戲謔的例子:

若是你想吃一份宮保雞丁蓋飯:

  • 同步阻塞:你到飯館點餐,而後在那等着,還要一邊喊:好了沒啊!

  • 同步非阻塞:在飯館點完餐,就去遛狗了。不過溜一下子,就回飯館喊一聲:好了沒啊!

  • 異步阻塞:遛狗的時候,接到飯館電話,說飯作好了,讓您親自去拿。

  • 異步非阻塞:飯館打電話說,咱們知道您的位置,一會給你送過來,安心遛狗就能夠了。

詳情參見文章末尾的他山之石-Unix下五種IO模型。

Reactor與Proactor

  • 兩種IO多路複用方案:Reactor and Proactor。

  • Reactor模式是基於同步I/O的,而Proactor模式是和異步I/O相關的。

  • reactor:能收了你跟俺說一聲。proactor: 你給我收十個字節,收好了跟俺說一聲。

詳情參見文章末尾的他山之石-IO設計模式:Reactor和Proactor對比。

異步的處理

異步無非是通知系統作一件事情。而後忘掉它,本身作其餘事情去了。不少時候系統作完某一件事情後須要一些後續的操做。怎麼辦?這時候就是告訴異步調用如何作後續處理。一般有兩種方式:

  • 未來式: 當你但願主線程發起異步調用,並輪詢等待結果的時候使用未來式;

  • 回調式: 常說的異步回調就是它。

以文件讀取爲例

未來式

未來式異步讀取

未來式用現有的Java.util.concurrent技術聲明一個Future,用來保存異步操做的處理結果。一般用Future get()方法(帶或不帶超時參數)在異步IO操做完成時獲取其結果。

AsynchronousFileChannel會關聯線程池,它的任務是接收IO處理事件,並分發給負責處理通道中IO操做結果的結果處理器。跟通道中發起的IO操做關聯的結果處理器確保是由線程池中的某個線程產生。

未來式例子

Path path = Paths.get("/data/code/github/java_practice/src/main/resources/1log4j.properties");
    AsynchronousFileChannel channel = AsynchronousFileChannel.open(path);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    Future<Integer> future = channel.read(buffer,0);
//        while (!future.isDone()){
//            System.out.println("I'm idle");
//        }
    Integer readNumber = future.get();

    buffer.flip();
    CharBuffer charBuffer = CharBuffer.allocate(1024);
    CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
    decoder.decode(buffer,charBuffer,false);
    charBuffer.flip();
    String data = new String(charBuffer.array(),0, charBuffer.limit());
    System.out.println("read number:" + readNumber);
    System.out.println(data);

回調式

回調式異步讀取

回調式所採用的事件處理技術相似於Swing UI編程採用的機制。基本思想是主線程會派一個偵查員CompletionHandler到獨立的線程中執行IO操做。這個偵查員將帶着IO的操做的結果返回到主線程中,這個結果會觸發它本身的completed或failed方法(要重寫這兩個方法)。在異步IO活動結束後,接口java.nio.channels.CompletionHandler會被調用,其中V是結果類型,A是提供結果的附着對象。此時必須已經有了該接口completed(V,A)和failed(V,A)方法的實現,你的程序才能知道異步IO操做成功或失敗時該如何處理。

回調式例子

Path path = Paths.get("/data/code/github/java_practice/src/main/resources/1log4j.properties");
    AsynchronousFileChannel channel = AsynchronousFileChannel.open(path);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            System.out.println(Thread.currentThread().getName() + " read success!");
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            System.out.println("read error");
        }
    });

    while (true){
        System.out.println(Thread.currentThread().getName() + " sleep");
        Thread.sleep(1000);
    }

異步socket client操做

AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
    channel.connect(new InetSocketAddress("127.0.0.1",8888)).get();
    ByteBuffer buffer = ByteBuffer.wrap("中文,你好".getBytes());
    Future<Integer> future = channel.write(buffer);

    future.get();
    System.out.println("send ok");

異步socket server操做

final AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel
            .open()
            .bind(new InetSocketAddress("0.0.0.0",8888));
    channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
        @Override
        public void completed(final AsynchronousSocketChannel client, Void attachment) {
            channel.accept(null, this);

            ByteBuffer buffer = ByteBuffer.allocate(1024);
            client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result_num, ByteBuffer attachment) {
                    attachment.flip();
                    CharBuffer charBuffer = CharBuffer.allocate(1024);
                    CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
                    decoder.decode(attachment,charBuffer,false);
                    charBuffer.flip();
                    String data = new String(charBuffer.array(),0, charBuffer.limit());
                    System.out.println("read data:" + data);
                    try{
                        client.close();
                    }catch (Exception e){}
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    System.out.println("read error");
                }
            });
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            System.out.println("accept error");
        }
    });

    while (true){
        Thread.sleep(1000);
    }

他山之石

相關文章
相關標籤/搜索