BIO與AIO模型在JDK實現,Netty序章

BIO編程

回顧下Linux下阻塞IO模型:linux

clipboard.png

再看看Java的BIO編程模型:編程

clipboard.png

/**
 * 類說明:客戶端
 */
public class BioClient {

    public static void main(String[] args) throws InterruptedException,
            IOException {
        //經過構造函數建立Socket,而且鏈接指定地址和端口的服務端
        Socket socket =  new Socket(DEFAULT_SERVER_IP,DEFAULT_PORT);
        System.out.println("請輸入請求消息:");
        //啓動讀取服務端輸出數據的線程
        new ReadMsg(socket).start();
        PrintWriter pw = null;
        //容許客戶端在控制檯輸入數據,而後送往服務器
        while(true){
            pw = new PrintWriter(socket.getOutputStream());
            pw.println(new Scanner(System.in).next());
            pw.flush();
        }
    }

    //讀取服務端輸出數據的線程
    private static class ReadMsg extends Thread {
        Socket socket;

        public ReadMsg(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            //負責socket讀寫的輸入流
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()))){
                String line = null;
                //經過輸入流讀取服務端傳輸的數據
                //若是已經讀到輸入流尾部,返回null,退出循環
                //若是獲得非空值,就將結果進行業務處理
                while((line=br.readLine())!=null){
                    System.out.printf("%s\n",line);
                }
            } catch (SocketException e) {
                System.out.printf("%s\n", "服務器斷開了你的鏈接");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                clear();
            }
        }
        //必要的資源清理工做
        private void clear() {
            if (socket != null)
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 類說明:bio的服務端主程序
 */
public class BioServer {
    //服務器端必須
    private static ServerSocket server;
    //線程池,處理每一個客戶端的請求
    private static ExecutorService executorService
            = Executors.newFixedThreadPool(5);

    private static void start() throws IOException{

        try{
            //經過構造函數建立ServerSocket
            //若是端口合法且空閒,服務端就監聽成功
            server = new ServerSocket(DEFAULT_PORT);
            System.out.println("服務器已啓動,端口號:" + DEFAULT_PORT);
            while(true){

                Socket socket= server.accept();
                System.out.println("有新的客戶端鏈接----" );
                //當有新的客戶端接入時,打包成一個任務,投入線程池
                executorService.execute(new BioServerHandler(socket));
            }
        }finally{
            if(server!=null){
                server.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        start();
    }

}
/**
 * 類說明:
 */
public class BioServerHandler implements Runnable{
    private Socket socket;
    public BioServerHandler(Socket socket) {
        this.socket = socket;
    }
    public void run() {
        try(//負責socket讀寫的輸出、輸入流
            BufferedReader in = new BufferedReader(
                new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream(),
                    true)){
            String message;
            String result;
            //經過輸入流讀取客戶端傳輸的數據
            //若是已經讀到輸入流尾部,返回null,退出循環
            //若是獲得非空值,就將結果進行業務處理
            while((message = in.readLine())!=null){
                System.out.println("Server accept message:"+message);
                result = response(message);
                //將業務結果經過輸出流返回給客戶端
                out.println(result);
            }
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(socket != null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                socket = null;
            }
        }
    }

}

過程:windows

  1. 服務端提供IP和監聽端口
  2. 客戶端經過鏈接操做想服務端監聽的地址發起鏈接請求,經過三次握手鍊接
  3. 若是鏈接成功創建,雙方就能夠經過套接字進行通訊

最先的時候服務器端是針對一個鏈接新建一個線程來處理演變成服務端針對每一個客戶端鏈接把請求丟進線程池來處理任務
缺點:若高併發場景且處理時間稍長則許多請求會阻塞一直等待,嚴重影響性能.數組

AIO

先回顧下Linux下AIO模型:服務器

clipboard.png

原生JDK網絡編程AIO:網絡

clipboard.png

異步IO採用「訂閱-通知」模式:即應用程序向操做系統註冊IO監聽,而後繼續作本身的事情。當操做系統發生IO事件,而且準備好數據後,在主動通知應用程序,觸發相應的函數。
注意:異步IO裏面客戶端和服務端均採用這種「訂閱-通知」模式.併發

AIO編程幾個核心類:
:AsynchronousServerSocketChannel:相似BIO裏面的ServerSocket異步


②:AsynchronousSocketChannel :相似BIO裏面的socket用來通訊,有三個方法:connect():用於鏈接到指定端口,指定IP地址的服務器,read()、write():完成讀寫
注意點:socket

  • 1.這三個方法會執行就至關於上面圖解裏面的Subscrible函數向操做系統監聽線程。
  • 2.這幾個方法裏面有個參數,好比write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A>handler)的attachment,是附加到IO操做裏面的對象.

    Channel可看作JDK對IO的抽象,除了網絡通道,還有文件通道FileChannel。ide


③:CompletionHandler:源碼註釋是異步IO操做中用來處理消費的結果,其實也就是結果回調函數,鏈接丶讀寫都是異步操做都須要實現此接口。

而CompletionHandler接口中定義了兩個方法,

  1. completed(V result , A attachment):當IO完成時觸發該方法,該方法的第一個參數表明IO操做返回的對象,第二個參數表明發起IO操做時傳入的附加參數。
  2. faild(Throwable exc, A attachment):當IO失敗時觸發該方法,第一個參數表明IO操做失敗引起的異常或錯誤。

先上代碼
客戶端:

/**
 * 類說明:aio的客戶端主程序
 */
public class AioClient {

    //IO通訊處理器
    private static AioClientHandler clientHandle;

    public static void start(){
        if(clientHandle!=null)
            return;
        clientHandle = new AioClientHandler(DEFAULT_SERVER_IP,DEFAULT_PORT);
        //負責網絡通信的線程
        new Thread(clientHandle,"Client").start();
    }
    //向服務器發送消息
    public static boolean sendMsg(String msg) throws Exception{
        if(msg.equals("q")) return false;
        clientHandle.sendMessag(msg);
        return true;
    }

    public static void main(String[] args) throws Exception{
        AioClient.start();
        System.out.println("請輸入請求消息:");
        Scanner scanner = new Scanner(System.in);
        while(AioClient.sendMsg(scanner.nextLine()));
    }

}
/**
 * 類說明:IO通訊處理器,負責鏈接服務器,對外暴露對服務端發送數據的API
 */
public class AioClientHandler
        implements CompletionHandler<Void,AioClientHandler>,Runnable {

    private AsynchronousSocketChannel clientChannel;
    private String host;
    private int port;

    private CountDownLatch latch;//防止線程退出

    public AioClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            //建立一個實際異步的客戶端通道
            clientChannel = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        //建立CountDownLatch,由於是異步調用,下面的connect不會阻塞,
        // 那麼整個run方法會迅速結束,那麼負責網絡通信的線程也會迅速結束
        latch = new CountDownLatch(1);
        //發起異步鏈接操做,回調參數就是這個實例自己,
        // 若是鏈接成功會回調這個實例的completed方法
        clientChannel.connect(new InetSocketAddress(host,port),
                        null,this);
        try {

            latch.await();
            clientChannel.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //鏈接成功,這個方法會被系統調用
    @Override
    public void completed(Void result, AioClientHandler attachment) {
        System.out.println("已經鏈接到服務端。");
    }

    //鏈接失敗,這個方法會被系統調用
    @Override
    public void failed(Throwable exc, AioClientHandler attachment) {
        System.err.println("鏈接失敗。");
        exc.printStackTrace();
        latch.countDown();
        try {
            clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //對外暴露對服務端發送數據的API
    public void sendMessag(String msg){
        /*爲了把msg變成能夠在網絡傳輸的格式*/
        byte[] bytes = msg.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        writeBuffer.flip();

        /*進行異步寫,一樣的這個方法會迅速返回,
         須要提供一個接口讓系統在一次網絡寫操做完成後通知咱們的應用程序。
        因此咱們傳入一個實現了CompletionHandler的AioClientWriteHandler
        第1個writeBuffer,表示咱們要發送給服務器的數據;
        第2個writeBuffer,考慮到網絡寫有可能沒法一次性將數據寫完,須要進行屢次網絡寫,
        因此將writeBuffer做爲附件傳遞給AioClientWriteHandler。
        */
        clientChannel.write(writeBuffer,writeBuffer,
                new AioClientWriteHandler(clientChannel,latch));

    }
}


 /**
     * 類說明:網絡寫的處理器,CompletionHandler<Integer, ByteBuffer>中
     * Integer:本次網絡寫操做完成實際寫入的字節數,
     * ByteBuffer:寫操做的附件,存儲了寫操做須要寫入的數據
     */
    public class AioClientWriteHandler
            implements CompletionHandler<Integer, ByteBuffer> {
        private AsynchronousSocketChannel clientChannel;
        private CountDownLatch latch;
        public AioClientWriteHandler(AsynchronousSocketChannel clientChannel,
                                     CountDownLatch latch) {
            this.clientChannel = clientChannel;
            this.latch = latch;
        }
        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            //有可能沒法一次性將數據寫完,須要檢查緩衝區中是否還有數據須要繼續進行網絡寫
            if(buffer.hasRemaining()){
                clientChannel.write(buffer,buffer,this);
            }else{
                //寫操做已經完成,爲讀取服務端傳回的數據創建緩衝區
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                /*這個方法會迅速返回,須要提供一個接口讓
                系統在讀操做完成後通知咱們的應用程序。*/
                clientChannel.read(readBuffer,readBuffer,
                        new AioClientReadHandler(clientChannel,latch));
            }
        }
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            System.err.println("數據發送失敗...");
            try {
                clientChannel.close();
                latch.countDown();
            } catch (IOException e) {
            }
        }
    
    }
/**
 * 類說明:網絡讀的處理器
 * CompletionHandler<Integer, ByteBuffer>中
 * Integer:本次網絡讀操做實際讀取的字節數,
 * ByteBuffer:讀操做的附件,存儲了讀操做讀到的數據 *
 */
public class AioClientReadHandler
        implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel clientChannel;
    private CountDownLatch latch;

    public AioClientReadHandler(AsynchronousSocketChannel clientChannel,
                                CountDownLatch latch) {
        this.clientChannel = clientChannel;
        this.latch = latch;
    }

    @Override
    public void completed(Integer result,ByteBuffer buffer) {
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        String msg;
        try {
            msg = new String(bytes,"UTF-8");
            System.out.println("accept message:"+msg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void failed(Throwable exc,ByteBuffer attachment) {
        System.err.println("數據讀取失敗...");
        try {
            clientChannel.close();
            latch.countDown();
        } catch (IOException e) {
        }
    }

}

服務端

/**
 * 類說明:服務器主程序
 */
public class AioServer {
    private static AioServerHandler serverHandle;
    //統計客戶端個數
    public volatile static long clientCount = 0;

    public static void start(){
        if(serverHandle!=null)
            return;
        serverHandle = new AioServerHandler(DEFAULT_PORT);
        new Thread(serverHandle,"Server").start();
    }
    public static void main(String[] args){
        AioServer.start();
    }
}
/**
 * 類說明:處理用戶鏈接的處理器
 */
public class AioAcceptHandler
        implements CompletionHandler<AsynchronousSocketChannel,
        AioServerHandler> {
    @Override
    public void completed(AsynchronousSocketChannel channel,
                          AioServerHandler serverHandler) {
        AioServer.clientCount++;
        System.out.println("鏈接的客戶端數:" + AioServer.clientCount);
        //從新註冊監聽,讓別的客戶端也能夠鏈接
        serverHandler.channel.accept(serverHandler,this);
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        //1)ByteBuffer dst:接收緩衝區,用於從異步Channel中讀取數據包;
        //2)  A attachment:異步Channel攜帶的附件,通知回調的時候做爲入參使用;
        //3)  CompletionHandler<Integer,? super A>:系統回調的業務handler,進行讀操做
        channel.read(readBuffer,readBuffer,
                new AioReadHandler(channel));

    }

    @Override
    public void failed(Throwable exc, AioServerHandler serverHandler) {
        exc.printStackTrace();
        serverHandler.latch.countDown();
    }
}

/**
 * 類說明:讀數據的處理器
 */
public class AioReadHandler
        implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel channel;

    public AioReadHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

    //讀取到消息後的處理
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        //若是條件成立,說明客戶端主動終止了TCP套接字,這時服務端終止就能夠了
        if(result == -1) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return;
        }
        //flip操做
        attachment.flip();
        byte[] message = new byte[attachment.remaining()];
        attachment.get(message);
        try {
            System.out.println(result);
            String msg = new String(message,"UTF-8");
            System.out.println("server accept message:"+msg);
            String responseStr = response(msg);
            //向客戶端發送消息
            doWrite(responseStr);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //發送消息
    private void doWrite(String result) {
        byte[] bytes = result.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        writeBuffer.flip();
        //異步寫數據
        channel.write(writeBuffer, writeBuffer,
                new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if(attachment.hasRemaining()){
                    channel.write(attachment,attachment,this);
                }else{
                    //讀取客戶端傳回的數據
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    //異步讀數據
                    channel.read(readBuffer,readBuffer,
                            new AioReadHandler(channel));
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * 類說明:響應網絡操做的處理器
 */
public class AioServerHandler implements Runnable {

    public CountDownLatch latch;
    /*進行異步通訊的通道*/
    public AsynchronousServerSocketChannel channel;

    public AioServerHandler(int port) {
        try {
            //建立服務端通道
            channel = AsynchronousServerSocketChannel.open();
            //綁定端口
            channel.bind(new InetSocketAddress(port));
            System.out.println("Server is start,port:"+port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {

        latch = new CountDownLatch(1);
        //用於接收客戶端的鏈接,異步操做,
        // 須要實現了CompletionHandler接口的處理器處理和客戶端的鏈接操做
        channel.accept(this,new AioAcceptHandler());
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

疑難點1:

clipboard.png
怎麼理解這裏客戶端寫操做的處理器回調方法?

  1. 客戶端把ByteBuffer裏面的數據寫到AsynchronousSocketChannel這個管道上,
  2. 若是ByteBuffer裏面數據很大,超過了管道容量,這時會先完成寫操做,服務端收到數據回調這個completed方法
  3. 則須要ByteBuffer再寫入剩下的數據到管道里,每發完一次數據通知一次,這個管道容量取決於網卡的緩衝區。這個completed方法並非說ByteBuffer的數據寫完了,而是當前網卡這份數據寫完了.

疑難點2:
Buffer:

查看源碼可看到幾個重要屬性:
capacity:表示分配的內存大小
position:相似指針類的索引,讀取或寫入的位置標識符,下一個可寫入的初始位置/下一個可讀取的初始位置
limit:可讀或可寫的範圍,小於等於capacity,當小於capacity,limit到capaticy的最大容量值的這段空間不予寫入是放一些初始化值的.

ByteBuffer能夠理解爲放在內存中的一個數組。
好比圖中一開始是寫入模式,寫入五個字節,地址爲0-4,position在5,調用flip方法後切換到讀模式,position變爲0即開始序列,limit變爲5,這樣就能夠buffer開頭開始讀取了.

clipboard.png

應用場景:
能夠服務端用AIO模型,客戶端使用BIO簡化編程,本文的例子便可調試,啓動AioServer再啓動BioClient,通訊是沒問題的

AIO編程相對複雜,代碼中一些關鍵方法都有註釋,目前Linux下沒有真正意義上的AIO,其實是用了NIO裏面的epoll(true),底層原理仍是用了IO複用(NIO).windows實現了AIO,AIO是將來的方向,需待linux內核支持.

相關文章
相關標籤/搜索