Java IO編程全解(五)——AIO編程

  轉載請註明出處:http://www.cnblogs.com/Joanna-Yan/p/7794151.html html

  前面講到:Java IO編程全解(四)——NIO編程java

  NIO2.0引入了新的異步通道的概念,並提供了異步文件通道和異步套接字通道的實現。異步通道提供兩種方式獲取操做結果。編程

  • 經過java.util.concurrent.Future類來表示異步操做的結果;
  • 在執行異步操做的時候傳入一個java.nio.channels。

  CompletionHandler接口的實現類做爲操做完成的回調。服務器

  NIO2.0的異步套接字通道是真正的異步非阻塞I/O,它對UNIX網絡編程中的事件驅動I/O(AIO),它不須要經過多路複用器(Selector)對註冊的通道進行輪詢操做便可實現異步讀寫,從而簡化了NIO的編程模型。微信

  下面經過代碼來熟悉NIO2.0 AIO的相關類庫,仍舊以時間服務器爲例程進行講解。網絡

 1.AIO建立的TimeServer源碼分析

package joanna.yan.aio;

public class TimeServer {

    public static void main(String[] args) {
        int port=9090;
        if(args!=null&&args.length>0){
            try {
                port=Integer.valueOf(args[0]);
            } catch (Exception e) {
                // 採用默認值
            }
        }
        
        AsyncTimeServerHandler timeServer=new AsyncTimeServerHandler(port);
        new Thread(timeServer,"AIO-AsyncTimeServerHandler-001").start();
    }
}
package joanna.yan.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;

public class AsyncTimeServerHandler implements Runnable{
    private int port;
    CountDownLatch latch;
    AsynchronousServerSocketChannel asynchronousServerSocketChannel;
    
    public AsyncTimeServerHandler(int port){
        this.port=port;
        try {
            //建立一個異步的服務端通道AsynchronousServerSocketChannel
            asynchronousServerSocketChannel=AsynchronousServerSocketChannel.open();
            asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("The time server is start in port: "+port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        /*
         *初始化CountDownLatch對象。 
         *它的做用是,在完成一組正在執行的操做以前,容許當前的線程一直阻塞。
         *在本例中,咱們讓線程在此阻塞,防止服務器執行完成退出。
         *在實際項目應用中,不須要啓動獨立的線程來處理AsynchronousServerSocketChannel,這裏僅僅是個demo演示。
         */
        latch=new CountDownLatch(1);
        doAccept();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 接收客戶端的鏈接。
     * 因爲這裏是異步操做。咱們能夠傳遞一個CompletionHandler<AsynchronousSocketChannel,? super A>類型
     * 的handler實例接收accept操做成功的通知消息
     */
    public void doAccept() {
        asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
    }
}
package joanna.yan.aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
 * 接收accept操做成功的通知消息
 * @author Administrator
 *
 */
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler>{

    @Override
    public void completed(AsynchronousSocketChannel result,
            AsyncTimeServerHandler attachment) {
        /*
         * 疑惑:既然已經接收客戶端成功了,爲何還要再次調用accept方法呢?
         * 緣由:當咱們調用AsynchronousServerSocketChannel的accept方法後,若是有新的客戶端鏈接接入,
         * 系統將回調咱們傳入的CompletionHandler實例的completed方法,表示新的客戶端已經接入成功,
         * 由於一個AsynchronousServerSocketChannel能夠接收成千上萬個客戶端,因此咱們須要繼續調用它的accep方法,
         * 接收其餘的客戶端鏈接,最終造成一個循環。
         * 每當接收一個客戶讀鏈接成功以後,再異步接收新的客戶端鏈接。
         */
        attachment.asynchronousServerSocketChannel.accept(attachment, this);
        ByteBuffer buffer=ByteBuffer.allocate(1024);
        /*
         * 參數一:接收緩衝區,用於從異步Channel中讀取數據包;
         * 參數二:異步Channel攜帶的附件,通知回調的時候做爲入參使用;
         * 參數三:接收通知回調的業務handler
         */
        result.read(buffer, buffer, new ReadCompletionHandler(result));
    }

    @Override
    public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
        exc.printStackTrace();
        attachment.latch.countDown();
    } 
}
package joanna.yan.aio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Date;
/**
 * 主要用於讀取半包消息和發送應答。
 * 本例不對半包讀寫進行具體說明,在後面的Netty半包處理中會介紹。
 * @author Administrator
 *
 */
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer>{
    
    private AsynchronousSocketChannel channel;
    
    public ReadCompletionHandler(AsynchronousSocketChannel channel){
        if(this.channel==null){
            this.channel=channel;
        }
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        //爲後續從緩衝區讀取數據作準備
        attachment.flip();
        byte[] body=new byte[attachment.remaining()];
        attachment.get(body);
        
        try {
            String req=new String(body, "UTF-8");
            System.out.println("The time server receive order : "+req);
            String currentTime="QUERY TIME ORDER".equalsIgnoreCase(req) ? 
                    new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
            doWrite(currentTime);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 發生異常的時候調用。
     * 對異常Throwable進行判斷,若是是I/O異常,就關閉鏈路,釋放資源;
     * 若是是其它異常,按照業務本身的邏輯進行處理。
     * 本例做爲簡單demo,沒有對異常進行分類判斷,只要發生了讀寫異常,就關閉鏈路,釋放資源。
     */
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    private void doWrite(String currentTime) {
        if(currentTime!=null&&currentTime.trim().length()>0){
            byte[] bytes=currentTime.getBytes();
            ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            
            channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {

                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                    //若是沒有發送完成,繼續發送
                    if(buffer.hasRemaining()){
                        channel.write(buffer, buffer, this);
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        //ingnore on close
                    }
                }
            });
        }
    }
}

2.AIO建立的TimeClient源碼分析

package joanna.yan.aio;

public class TimeClient {
    public static void main(String[] args) {
        int port=9090;
        if(args!=null&&args.length>0){
            try {
                port=Integer.valueOf(args[0]);
            } catch (Exception e) {
                // 採用默認值
            }
        }
        
        /*
         * 經過一個獨立的I/O線程建立異步時間服務器客戶端handler。
         * 在實際項目中,咱們不須要獨立的線程建立異步鏈接對象,由於底層都是經過JDK的系統回調實現的。
         */
        new Thread(new AsyncTimeClientHandler("127.0.0.1", port),"AIO-AsyncTimeClientHandle-001").start();
    
        /*
         * 須要指出的是,正如以前的NIO例程,咱們並無完整的處理網絡的半包讀寫,在對例程進行功能測試的是尚未問題,
         * 可是,若是對代碼稍加改造,進行壓力或者性能測試,就會發現輸出結果存在問題。
         * 這裏只集中將NIO的入門知識,後面會詳細講到半包讀寫
         */
    }
}
package joanna.yan.aio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>,Runnable{
    
    private AsynchronousSocketChannel client;
    private String host;
    private int port;
    private CountDownLatch latch;
    
    public AsyncTimeClientHandler(String host,int port){
        this.host=host;
        this.port=port;
        try {
            client=AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void completed(Void result, AsyncTimeClientHandler attachment) {
        byte[] req="QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer=ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {//用於寫操做完成後的回調

            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                if(buffer.hasRemaining()){
                    client.write(buffer, buffer, this);
                }else{
                    ByteBuffer readBuffer=ByteBuffer.allocate(1024);
                    client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>(){//當讀取完成被JDK回調時,構造應答消息。

                        @Override
                        public void completed(Integer result,
                                ByteBuffer buffer) {
                            buffer.flip();
                            byte[] bytes=new byte[buffer.remaining()];
                            buffer.get(bytes);
                            String body;
                            try {
                                body=new String(bytes, "UTF-8");
                                System.out.println("Now is : "+body);
                                latch.countDown();
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            try {
                                client.close();
                                //讓AsyncTimeClientHandler線程執行完畢,客戶端退出執行
                                latch.countDown();
                            } catch (IOException e) {
                                //ingnore on close
                            }
                        }   
                    });
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    client.close();
                    latch.countDown();
                } catch (IOException e) {
                    //ingnore on close
                }
            }
        });
    }

    @Override
    public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
        exc.printStackTrace();
        try {
            client.close();
            latch.countDown();
        } catch (IOException e) {
            //ingnore on close
        }
    }

    @Override
    public void run() {
        //建立CountDownLatch進行等待,防止異步操做沒有執行完成線程就退出。
        latch=new CountDownLatch(1);
        /*
         * 參數二:AsynchronousSocketChannel的附件,用於回調通知時做爲入參被傳遞,調用者能夠自定義
         * 參數三:異步參數回調通知接口,由調用者實現。
         */
        client.connect(new InetSocketAddress(host, port), this, this);
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

  異步SocketChannel是被動執行對象,咱們不須要像NIO編程那樣建立一個獨立I/O線程來處理讀寫操做。對於AsynchronousServerSocketChannel和 AsynchronousSocketChannel,它們都由JDK底層的線程池負責回調並驅動讀寫操做。正由於如此,基於NIO2.0新的異步非阻塞Channel進行編程比NIO編程更爲簡單。異步

  後面,咱們將對前面講到的4種I/O進行概念澄清和比較,讓你們從總體上掌握這些I/O模型的差別。以便在將來的工做中可以根據產品的實際狀況選擇合適的I/O模型。async

 Java IO編程全解(六)——4種I/O的對比與選型ide

若是此文對您有幫助,微信打賞我一下吧~源碼分析

相關文章
相關標籤/搜索