我所理解的AIO

AIO也叫NIO2.0或者NIO.2,隨JDK1.7發佈。NIO的API比較麻煩,易錯,開發效率低。AIO通過回調函數的方式來表示異步通訊,API相對簡單一些。

AIO在Windows系統中底層使用IOCP這樣系統級的支持,比NIO的性能要好。但Java的服務端程序很少將Windows系統作為生產服務器。而在Linux系統上(內核2.6以上),AIO的底層使用的依然是epoll技術,與NIO一樣,只不過封裝成異步IO的樣子,簡化了API而已。

接下來通過一個客戶端與服務端通訊的例子,來學習使用AIO。客戶端每隔1秒向服務端發送請求,服務端響應並返回數據。可以與上一篇的NIO對比學習。

服務端:

package cn.testAio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

/** 
 * @Description : TODO
 * @Author : houshuiqiang@163.com, 2017年10月3日 下午1:38:39
 * @Modified :houshuiqiang@163.com, 2017年10月3日
 */
public class AioDemoServer {

    public static void main(String[] args) {
        AioServer aioServer = new AioServer(8181);
        new Thread(aioServer, "aio-server-test").start();
    }
}

/**
 * Asynchronous (NIO.2) server used by the demo.
 *
 * <p>The constructor opens and binds the listening channel; {@link #run()} registers the
 * accept callback and then parks the calling thread on a latch. The latch is released when
 * an accept fails; the thread also leaves {@code run()} when it is interrupted. In both cases
 * the listening channel is closed before {@code run()} returns so the port is released.
 */
class AioServer implements Runnable {

    /** Size, in bytes, of the buffer handed to the first read on each connection (1 KB). */
    private static final int READ_BUFFER_SIZE = 1024;

    private final AsynchronousServerSocketChannel assChannel;
    private final CountDownLatch cdl = new CountDownLatch(1);

    /**
     * Opens the listening channel and binds it to {@code port}.
     * If opening or binding fails, the stack trace is printed and the JVM exits with status 1.
     *
     * @param port TCP port to listen on
     */
    public AioServer (int port) {
        AsynchronousServerSocketChannel opened = null;
        try {
            opened = AsynchronousServerSocketChannel.open();
            opened.bind(new InetSocketAddress(port));
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        assChannel = opened;
    }

    /**
     * Registers the accept callback and blocks until an accept fails or the thread is
     * interrupted, then closes the listening channel.
     */
    @Override
    public void run() {

        assChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AioServer>() {

            @Override
            public void completed(AsynchronousSocketChannel result, AioServer attachment) {
                // Re-register first so the next connection can be accepted
                // while this one is being served.
                assChannel.accept(attachment, this);

                // 1 KB buffer; a request larger than this needs further reads.
                ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
                result.read(buffer, buffer, new ReadsCompletionHandler(result));
            }

            @Override
            public void failed(Throwable exc, AioServer attachment) {
                // When run() has already closed the channel, the pending accept is expected
                // to fail; only report failures that happen while the server is still open.
                if (assChannel.isOpen()) {
                    exc.printStackTrace();
                }
                attachment.cdl.countDown();
            }

        });

        try {
            cdl.await(); // park this thread so the server keeps running
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owner of this thread can still observe it.
            Thread.currentThread().interrupt();
        } finally {
            closeServerChannel();
        }
    }

    /** Closes the listening channel, printing (not propagating) any I/O error. */
    private void closeServerChannel() {
        try {
            assChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * Completion handler for reads on one client connection.
 *
 * <p>Each completed read is decoded as one request, a reply is written back, and once the
 * reply has been fully written a new read is registered on the same channel. A read that
 * reports end-of-stream (negative byte count), a failed read or write, or an
 * {@link IOException} while handling the request closes the channel.
 *
 * <p>Note: the bytes delivered by a single read are treated as one whole message; there is
 * no framing, so a message longer than the read buffer would be handled in pieces.
 */
class ReadsCompletionHandler implements CompletionHandler<Integer, ByteBuffer>{

    /** Charset used for both decoding requests and encoding replies. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /** Size, in bytes, of the buffer used for each follow-up read (1 KB). */
    private static final int READ_BUFFER_SIZE = 1024;

    private final AsynchronousSocketChannel channel;

    /**
     * @param channel the client connection this handler reads from and replies on
     */
    public ReadsCompletionHandler(AsynchronousSocketChannel channel){
        this.channel = channel;
    }

    /**
     * Handles a finished read.
     *
     * @param result     number of bytes read, or a negative value at end-of-stream
     * @param attachment the buffer the bytes were read into
     */
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        if (result < 0) {
            // The peer closed the connection: there is nothing to answer,
            // and re-registering a read would just complete with -1 again.
            closeQuietly(channel);
            return;
        }

        try {
            String body = getBody(attachment);              // decode the request
            String resultBody = handlerBody(channel, body); // simulate processing
            write2Client(channel, resultBody);              // send the reply
        } catch (IOException e) {
            e.printStackTrace();
            // No read is pending at this point, so leaving the channel open would strand it.
            closeQuietly(channel);
        }
    }

    /** Closes the connection when a read fails. */
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        closeQuietly(this.channel);
    }

    /** Flips the buffer and decodes everything that was read into it. */
    private String getBody(ByteBuffer byteBuffer){
        byteBuffer.flip();
        byte[] body = new byte[byteBuffer.remaining()];
        byteBuffer.get(body);
        return new String(body, CHARSET);
    }

    /**
     * Simulates request processing: logs the request and builds the reply text.
     *
     * @throws IOException if the remote address cannot be obtained
     */
    private String handlerBody(AsynchronousSocketChannel channel, String body) throws IOException{
        // String concatenation tolerates a null address (channel not connected).
        System.out.println("message from client : " + channel.getRemoteAddress() + ", content: " + body);

        return "server received message: " + body;
    }

    /**
     * Writes the reply; once it has been written completely, registers the next read.
     */
    private void write2Client(final AsynchronousSocketChannel channel, String resultBody){
        ByteBuffer reply = ByteBuffer.wrap(resultBody.getBytes(CHARSET));
        channel.write(reply, reply, new CompletionHandler<Integer, ByteBuffer>(){

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    // Partial write: keep writing the rest of the same buffer.
                    channel.write(attachment, attachment, this);
                    return;
                }
                // Reply fully sent: wait for the client's next request.
                ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
                channel.read(buffer, buffer, new ReadsCompletionHandler(channel));
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                closeQuietly(channel);
            }

        });
    }

    /** Closes the given channel; an error while closing is only printed. */
    private static void closeQuietly(AsynchronousSocketChannel target) {
        try {
            target.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客戶端:

package cn.testAio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Demo entry point for the AIO client: starts an {@link AioClient} on its own thread,
 * queues five messages ({@code time0} .. {@code time4}) one second apart, then stops the client.
 *
 * @author houshuiqiang@163.com, 2017-10-03 15:16:10 (last modified 2017-10-03)
 */
public class AioDemoClient {

    /**
     * Runs the demo.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        AioClient client = new AioClient("192.168.10.47", 8181);
        new Thread(client, "aio-client-test").start();

        int sent = 0;
        while (sent < 5) {
            // Hand the message to the client through its queue, then pause for a second.
            client.getQueue().offer("time" + sent);
            Thread.sleep(1000);
            sent++;
        }

        client.stop();
    }
}

/**
 * Asynchronous (NIO.2) client used by the demo.
 *
 * <p>{@link #run()} starts the connect. After that the exchange is driven by completion
 * callbacks: take a message from the queue, write it, read the reply, print it, and take the
 * next message. Messages are supplied by other threads through {@link #getQueue()}.
 *
 * <p>Note: taking from the queue blocks, and it is called from the completion callbacks, so
 * the callback thread waits there until a message is queued.
 */
class AioClient implements Runnable{

    /** Charset used for both encoding requests and decoding replies. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /** Size, in bytes, of the buffer used to read each reply (1 KB). */
    private static final int READ_BUFFER_SIZE = 1024;

    private final int port;

    private final String address;

    private final AsynchronousSocketChannel channel;

    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();

    /**
     * Opens (but does not yet connect) the client channel.
     * If the channel cannot be opened, the stack trace is printed and the JVM exits with status 1.
     *
     * @param address server host name or IP address
     * @param port    server TCP port
     */
    public AioClient(String address, int port){
        this.address = address;
        this.port = port;
        AsynchronousSocketChannel opened = null;
        try {
            opened = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        channel = opened;
    }

    /** Closes the channel if it is still open; an error while closing is only printed. */
    public void stop(){
        if (channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Starts the asynchronous connect; the first request is sent once the connect completes. */
    @Override
    public void run() {

        channel.connect(new InetSocketAddress(address, port), this, new CompletionHandler<Void, AioClient>(){

            @Override
            public void completed(Void result, AioClient attachment) {
                sendNextRequest();
            }

            @Override
            public void failed(Throwable exc, AioClient attachment) {
                exc.printStackTrace();
                // The connect failed, so this channel is of no further use.
                stop();
            }

        });

    }

    /**
     * @return the queue from which outgoing messages are taken
     */
    public BlockingQueue<String> getQueue(){
        return queue;
    }

    /** Calls {@link #sendRequest()}; if interrupted while waiting, restores the interrupt flag. */
    private void sendNextRequest() {
        try {
            sendRequest();
        } catch (InterruptedException e) {
            // Keep the interruption visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Takes the next message from the queue (blocking until one is available) and writes it.
     *
     * @throws InterruptedException if interrupted while waiting for a message
     */
    private void sendRequest() throws InterruptedException{

        String requestBody = queue.take();

        ByteBuffer request = ByteBuffer.wrap(requestBody.getBytes(CHARSET));

        channel.write(request, request, new CompletionHandler<Integer, ByteBuffer>(){

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    // Partial write: keep writing the rest of the same buffer.
                    channel.write(attachment, attachment, this);
                } else {
                    readResult(); // request fully sent: wait for the server's reply
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                stop();
            }

        });
    }

    /** Registers a read for the server's reply; prints it and then sends the next request. */
    private void readResult(){
        ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        channel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>(){

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (result < 0) {
                    // End-of-stream: the server closed the connection, so there is no reply
                    // to print and nothing further can be sent.
                    stop();
                    return;
                }
                attachment.flip();
                byte[] bytes = new byte[attachment.remaining()];
                attachment.get(bytes);
                String body = new String(bytes, CHARSET);
                System.out.println("The msg from Server is :" + body);
                sendNextRequest(); // reply handled: continue with the next queued message
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                stop();
            }
        });
    }
}
相關文章
相關標籤/搜索