【Java】Java AIO使用

歡迎關注公衆號: nullobject
文章首發在我的博客 https://www.nullobject.cn,公衆號 nullobject同步更新。
這篇文章主要介紹Java AIO網絡編程。

1. AIO是什麼

本文所說的AIO特指Java環境下的AIOAIOjavaIO模型的一種,做爲NIO的改進和加強隨JDK1.7版本更新被集成在JDKnio包中,所以AIO也被稱做是NIO2.0。區別於傳統的BIO(Blocking IO,同步阻塞式模型,JDK1.4以前就存在於JDK中,NIOJDK1.4版本發佈更新)的阻塞式讀寫,AIO提供了從創建鏈接到讀、寫的全異步操做。AIO可用於異步的文件讀寫網絡通訊。本文將介紹如何使用AIO實現一個簡單的網絡通訊以及AIO的一些比較關鍵的API。java

2. 簡單的使用

首先以Server端爲例,須要建立一個AsynchronousServerSocketChannel示例並綁定監聽端口,接着開始監聽客戶端鏈接:編程

public class SimpleAIOServer {

    public static void main(String[] args) {
        try {
            final int port = 5555;
            //首先打開一個ServerSocket通道並獲取AsynchronousServerSocketChannel實例:
            AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
            //綁定須要監聽的端口到serverSocketChannel:  
            serverSocketChannel.bind(new InetSocketAddress(port));
            //實現一個CompletionHandler回調接口handler,
            //以後須要在handler的實現中處理鏈接請求和監聽下一個鏈接、數據收發,以及通訊異常。
            CompletionHandler<AsynchronousSocketChannel, Object> handler = new CompletionHandler<AsynchronousSocketChannel,
                    Object>() {
                @Override
                public void completed(final AsynchronousSocketChannel result, final Object attachment) {
                    // 繼續監聽下一個鏈接請求  
                    serverSocketChannel.accept(attachment, this);
                    try {
                        System.out.println("接受了一個鏈接:" + result.getRemoteAddress()
                                                              .toString());
                        // 給客戶端發送數據並等待發送完成
                        result.write(ByteBuffer.wrap("From Server:Hello i am server".getBytes()))
                              .get();
                        ByteBuffer readBuffer = ByteBuffer.allocate(128);
                        // 阻塞等待客戶端接收數據
                        result.read(readBuffer)
                              .get();
                        System.out.println(new String(readBuffer.array()));

                    } catch (IOException | InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void failed(final Throwable exc, final Object attachment) {
                    System.out.println("出錯了:" + exc.getMessage());
                }
            };
            serverSocketChannel.accept(null, handler);
            // 因爲serverSocketChannel.accept(null, handler);是一個異步方法,調用會直接返回,
            // 爲了讓子線程可以有時間處理監聽客戶端的鏈接會話,
            // 這裏經過讓主線程休眠一段時間(固然實際開發通常不會這麼作)以確保應用程序不會當即退出。
            TimeUnit.MINUTES.sleep(Integer.MAX_VALUE);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

其中result即表示當前接受的客戶端的鏈接會話,與客戶端的通訊都須要經過該鏈接會話進行。安全

Client端服務器

public class SimpleAIOClient {

    public static void main(String[] args) {
        try {
            // 打開一個SocketChannel通道並獲取AsynchronousSocketChannel實例
            AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
            // 鏈接到服務器並處理鏈接結果
            client.connect(new InetSocketAddress("127.0.0.1", 5555), null, new CompletionHandler<Void, Void>() {
                @Override
                public void completed(final Void result, final Void attachment) {
                    System.out.println("成功鏈接到服務器!");
                    try {
                        // 給服務器發送信息並等待發送完成
                        client.write(ByteBuffer.wrap("From client:Hello i am client".getBytes()))
                              .get();
                        ByteBuffer readBuffer = ByteBuffer.allocate(128);
                        // 阻塞等待接收服務端數據
                        client.read(readBuffer)
                              .get();
                        System.out.println(new String(readBuffer.array()));
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void failed(final Throwable exc, final Void attachment) {
                    exc.printStackTrace();
                }
            });
            TimeUnit.MINUTES.sleep(Integer.MAX_VALUE);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3. AIO主要API詳解

從第2節例子能夠看到,實現一個最簡單的AIO socket通訊serverclient,主要須要這些相關的類和接口:網絡

  • AsynchronousServerSocketChannel

    服務端Socket通道類,負責服務端Socket的建立和監聽;異步

  • AsynchronousSocketChannel

    客戶端Socket通道類,負責客戶端消息讀寫;socket

  • CompletionHandler<A,V>

    消息處理回調接口,是一個負責消費異步IO操做結果的消息處理器;ide

  • ByteBuffer

    負責承載通訊過程當中須要讀、寫的消息。this

此外,還有可選的用於異步通道資源共享的AsynchronousChannelGroup類,接下來將一一介紹這些類的主要接口及使用。線程

3.1.1 AsynchronousServerSocketChannel

AsynchronousServerSocketChannel是一個 流式監聽套接字的異步通道。

AsynchronousServerSocketChannel的使用須要通過三個步驟:建立/打開通道綁定地址和端口監聽客戶端鏈接請求

1、建立/打開通道:簡單地,能夠經過調用AsynchronousServerSocketChannel的靜態方法open()來建立AsynchronousServerSocketChannel實例:

try {
  AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
} catch (IOException e) {
  e.printStackTrace();
}

當打開通道失敗時,會拋出一個IOException異常。AsynchronousServerSocketChannel提供了設置通道分組(AsynchronousChannelGroup)的功能,以實現組內通道資源共享。能夠調用open(AsynchronousChannelGroup)重載方法建立指定分組的通道:

try {
  ExecutorService pool = Executors.newCachedThreadPool();
  AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 10);
  AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group);
} catch (IOException e) {
  e.printStackTrace();
}

AsynchronousChannelGroup封裝了處理由綁定到組的異步通道所觸發的I/O操做完成所需的機制。每一個AsynchronousChannelGroup關聯了一個被用於提交處理I/O事件分發消費在組內通道上執行的異步操做結果的completion-handlers的線程池。除了處理I/O事件,該線程池還有可能處理其餘一些用於支持完成異步I/O操做的任務。從上面例子能夠看到,經過指定AsynchronousChannelGroup的方式打開AsynchronousServerSocketChannel,能夠定製server channel執行的線程池。有關AsynchronousChannelGroup的詳細介紹能夠查看官方文檔註釋。若是不指定AsynchronousChannelGroup,則AsynchronousServerSocketChannel會歸類到一個默認的分組中。

2、綁定地址和端口:經過調用AsynchronousServerSocketChannel.bind(SocketAddress)方法來綁定監聽地址和端口:

// 構建一個InetSocketAddress實例以指定監聽的地址和端口,若是須要指定ip,則調用InetSocketAddress(ip,port)構造方法建立便可
serverSocketChannel.bind(new InetSocketAddress(port));

3、監聽和接收客戶端鏈接請求:

監聽客戶端鏈接請求,主要經過調用AsynchronousServerSocketChannel.accept()方法完成。accept()有兩個重載方法:

public abstract <A> void accept(A,CompletionHandler<AsynchronousSocketChannel,? super A>);
public abstract Future<AsynchronousSocketChannel> accept();

這兩個重載方法的行爲方式徹底相同,事實上,AIO的不少異步API都封裝了諸如此類的重載方法:提供CompletionHandle回調參數或者返回一個Future<T>類型變量。用過Feture接口的都知道,能夠調用Feture.get()方法阻塞等待調用結果。以第一個重載方法爲例,當接受一個新的客戶端鏈接,或者accept操做發生異常時,會經過CompletionHandler將結果返回給用戶處理:

serverSocketChannel
.accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel,
        AsynchronousServerSocketChannel>() {
          @Override
          public void completed(final AsynchronousSocketChannel result,
                                final AsynchronousServerSocketChannel attachment) {
            // 接收到新的客戶端鏈接時回調
            // result即和該客戶端的鏈接會話
            // 此時能夠經過result與客戶端進行交互
          }

          @Override
          public void failed(final Throwable exc, final AsynchronousServerSocketChannel attachment) {
            // accept失敗時回調
          }
        });

須要注意的是,AsynchronousServerSocketChannel是線程安全的,但在任什麼時候候同一時間內只能容許有一個accept操做。所以,必須得等待前一個accept操做完成以後才能啓動下一個accept

serverSocketChannel
.accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel,
        AsynchronousServerSocketChannel>() {
          @Override
          public void completed(final AsynchronousSocketChannel result,
                                final AsynchronousServerSocketChannel attachment) {
            // 接收到新的客戶端鏈接,此時本次accept已經完成
            // 繼續監聽下一個客戶端鏈接到來
            serverSocketChannel.accept(serverSocketChannel,this);
            // result即和該客戶端的鏈接會話
            // 此時能夠經過result與客戶端進行交互
          }
          ...
        });

此外,還能夠經過如下方法獲取和設置AsynchronousServerSocketChannelsocket選項:

// 設置socket選項
serverSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
// 獲取socket選項設置
boolean keepAlive = serverSocketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);

其中StandardSocketOptions類封裝了經常使用的socket設置選項。

獲取本地地址:

InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();

3.1.2 AsynchronousSocketChannel

AsynchronousSocketChannel是一個 流式鏈接套接字的異步通道。

AsynchronousSocketChannel表示服務端與客戶端之間的鏈接通道。客戶端能夠經過調用AsynchronousSocketChannel靜態方法open()建立,而服務端則經過調用AsynchronousServerSocketChannel.accept()方法後由AIO內部在合適的時候建立。下面以客戶端實現爲例,介紹AsynchronousSocketChannel

1、建立AsynchronousSocketChannel並鏈接到服務端:須要經過open()建立和打開一個AsynchronousSocketChannel實例,再調用其connect()方法鏈接到服務端,接着才能夠與服務端交互:

// 打開一個socket通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// 阻塞等待鏈接成功
socketChannel.connect(new InetSocketAddress(ip,port)).get();
// 鏈接成功,接下來能夠進行read、write操做

AsynchronousServerSocketChannelAsynchronousSocketChannel也提供了open(AsynchronousChannelGroup)方法用於指定通道分組和定製線程池。socketChannel.connect()也提供了CompletionHandler回調和Future返回值兩個重載方法,上面例子使用帶Future返回值的重載,並調用get()方法阻塞等待鏈接創建完成。

2、發送消息:

能夠構建一個ByteBuffer對象並調用socketChannel.write(ByteBuffer)方法異步發送消息,並經過CompletionHandler回調接收處理髮送結果:

ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes());
socketChannel.write(writeBuf, null, new CompletionHandler<Integer, Object>() {
  @Override
  public void completed(final Integer result, final Object attachment) {
    // 發送完成,result:總共寫入的字節數
  }

  @Override
  public void failed(final Throwable exc, final Object attachment) {
    // 發送失敗
  }
});

3、讀取消息:

構建一個指定接收長度的ByteBuffer用於接收數據,調用socketChannel.read()方法讀取消息並經過CompletionHandler處理讀取結果:

ByteBuffer readBuffer = ByteBuffer.allocate(128);
socketChannel.read(readBuffer, null, new CompletionHandler<Integer, Object>() {
  @Override
  public void completed(final Integer result, final Object attachment) {
    // 讀取完成,result:實際讀取的字節數。若是通道中沒有數據可讀則result=-1。
  }

  @Override
  public void failed(final Throwable exc, final Object attachment) {
    // 讀取失敗
  }
});

此外AsynchronousSocketChannel也封裝了設置/獲取socket選項的方法:

// 設置socket選項
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
// 獲取socket選項設置
boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);

3.1.3 CompletionHandler

CompletionHandler是一個用於 消費異步I/O操做結果的處理器。

AIO中定義的異步通道容許指定一個CompletionHandler處理器消費一個異步操做的結果。從上文中也能夠看到,AIO中大部分的異步I/O操做接口都封裝了一個帶CompletionHandler類型參數的重載方法,使用CompletionHandler能夠很方便地處理AIO中的異步I/O操做結果。CompletionHandler是一個具備兩個泛型類型參數的接口,聲明瞭兩個接口方法:

public interface CompletionHandler<V,A> {
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

其中,泛型V表示I/O操做的結果類型,經過該類型參數消費I/O操做的結果;泛型A爲附加到I/O操做中的對象類型,能夠經過該類型參數將須要的變量傳入到CompletionHandler實現中使用。所以,AIO中大部分的異步I/O操做都有一個相似這樣的重載方法:

<V,A> void ioOperate(params,A attachment,CompletionHandler<V,A> handler);

例如,AsynchronousServerSocketChannel.accept()方法:

public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);

AsynchronousSocketChannel.write()方法等:

public final <A> void write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A> handler)

當I/O操做成功完成時,會回調到completed方法,failed方法則在I/O操做失敗時被回調。須要注意的是:CompletionHandler的實現中應立即使處理操做結果,以免一直佔用調用線程而不能分發其餘的CompletionHandler處理器。

4 The End :)

相關文章
相關標籤/搜索