AIO編程

AIO簡介

咱們知道NIO是同步非阻塞,服務器實現模式爲一個請求一個線程,即客戶端發送的鏈接請求都會註冊到多路複用器上,多路複用器輪詢到鏈接有I/O請求時才啓動一個線程進行處理。
而AIO則是則是異步非阻塞的,而且提供了異步文件通道和異步套接字通道的實現。主要經過兩種方式獲取操做的結果:java

  • 經過Future類來表示異步操做的結果
  • 在執行異步操做的時候傳入一個java.nio.channels
    AIO的異步套接字通道是真正的異步非阻塞I/O,對應於UNIX網絡編程模型中的事件驅動I/O,他不須要經過多路複用器對註冊的通道進行輪詢操做便可實現異步讀寫,從而簡化了NIO編程模型。

AIO建立的TimeServer

public class TimeServer {
    public static void main(String[] args){
        int port = 8080;
        if (args != null && args.length >0){
            try{
                port = Integer.parseInt(args[0]);
            }catch (NumberFormatException e){

            }
        }
        AsyncTimeServerHandler timeServerHandler = new AsyncTimeServerHandler(port);
        new Thread(timeServerHandler,"AIO-AsyncTimeServerHandler-001").start();
    }
}

首先建立異步的時間服務處理器,而後啓動線程將異步時間服務Handler拉起編程

public class AsyncTimeServerHandler implements Runnable{

    private int port;
    CountDownLatch countDownLatch;
    AsynchronousServerSocketChannel channel;

    public AsyncTimeServerHandler(int port) {
        this.port = port;
        try {
            channel = AsynchronousServerSocketChannel.open();
            channel.bind(new InetSocketAddress(port));
        }catch (IOException e){
            e.printStackTrace();
        }

    }

    public void run() {
        countDownLatch = new CountDownLatch(1);
        doAccept();
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void doAccept(){
        channel.accept(this,new AcceptCompletionHandler());
    }
}

在構造方法中,咱們建立了一個一步的Channel,而後調用bind方法綁定了監聽的端口。
在run方法中咱們初始化了一個CountDownLatch對象,是爲了在完成一組正在執行的操做以前,線程一直阻塞在那兒
在doAccept方法中接收客戶端的鏈接,咱們能夠傳遞一個handler示例接受accept操做成功的通知消息,其代碼以下:服務器

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler>{

    @Override
    public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
        attachment.channel.accept(attachment,this);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        result.read(buffer,buffer,new ReadCompltetionHandler(result));
    }

    @Override
    public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
        exc.printStackTrace();
        attachment.countDownLatch.countDown();
    }
}

其中,咱們在complete方法中繼續調用了accept方法,是爲了有新的客戶端接入成功,由於一個AsynchronousServerSocketChannel能夠接受成千上萬個客戶端
而鏈路創建成功之後,服務端能夠接受客戶端的請求消息了,經過read方法進行異步讀操做,其中傳入了一個Handler,接受通知回調業務。其代碼以下網絡

public class ReadCompltetionHandler implements CompletionHandler<Integer,ByteBuffer>{

    private AsynchronousSocketChannel channel;

    public ReadCompltetionHandler(AsynchronousSocketChannel channel) {
        if (channel == null)
            this.channel = channel;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        attachment.flip();
        byte[] body = new byte[attachment.remaining()];
        attachment.get(body);
        try {
            String req = new String(body,"UTF-8");
            String curentTime = "QUERY TIME ORDER".equalsIgnoreCase(req)?new Date(
                    System.currentTimeMillis()
            ).toString():"BAD ORDER";
            doWrite(curentTime);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    private void doWrite(String currentTime){
        if (!StringUtil.isNullOrEmpty(currentTime)){
            byte[] bytes = currentTime.getBytes();
            ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
            buffer.put(bytes);
            buffer.flip();
            channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    if (buffer.hasRemaining()){
                        channel.write(buffer,buffer,this);
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

其中具體的處理邏輯和NIO的TimeServer相同,不作詳細分析了異步

AIO的TimeClient

public class TimeClient {

    public static void main(String[] args){
        int port = 8080;
        if (args != null && args.length >0){
            try{
                port = Integer.parseInt(args[0]);
            }catch (NumberFormatException e){

            }
        }

        AsyncTimeClientHandler timeClientHandler = new AsyncTimeClientHandler("127.0.0.1",port);
        new Thread(timeClientHandler,"AIO-AsyncTimeClientHandler-001").start();
    }
}

在其中咱們經過一個I/O線程建立一步時間服務器客戶端Handler,具體代碼以下:ide

public class AsyncTimeClientHandler implements CompletionHandler<Void,AsyncTimeClientHandler>,Runnable{

    private AsynchronousSocketChannel client;
    private String host;
    private int port;
    private CountDownLatch countDownLatch;

    public AsyncTimeClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            client = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void completed(Void result, AsyncTimeClientHandler attachment) {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(req.length);
        buffer.put(req);
        buffer.flip();
        client.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()){
                    client.write(attachment,attachment,this);
                }else{
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            byte[] bytes = new byte[attachment.remaining()];
                            attachment.get(bytes);
                            try {
                                String body = new String(bytes,"UTF-8");
                                System.out.print(body);
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            try {
                                client.close();
                                countDownLatch.countDown();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    client.close();
                    countDownLatch.countDown();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
        exc.printStackTrace();
        try {
            client.close();
            countDownLatch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        countDownLatch = new CountDownLatch(1);
        client.connect(new InetSocketAddress(host,port),this,this);
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
相關文章
相關標籤/搜索