Socket網絡通訊——IO、NIO、AIO介紹以及區別

一 基本概念

Socket又稱」套接字「,應用程序一般經過」套接字「向網路發出請求或者應答網絡請求。java

Socket和ServerSocket類位於java.net包中。ServerSocket用於服務器端,Socket是創建網絡鏈接時使用的。在鏈接成功時,應用程序兩端都會產生一個Socket實例,操做這個實例,完成所需的會話,對於一個網絡鏈接來講,套接字是平等的,不由於在服務器端或在客戶端而產生不一樣級別,無論是Socket仍是ServerSocket它們的工做都是經過SocketImpl類及其子類完成的。編程

套接字之間的鏈接過程能夠分爲四個步驟:服務器監聽、客戶端請求服務、服務器確認、客戶端確認、進行通訊。數組

  1. 服務器監聽: 是服務端套接字並不定位具體的客戶端套接字,而是處於等待鏈接的狀態,實時監控網絡狀態
  2. 客戶端請求:是指由客戶端的套接字發出鏈接請求,要鏈接的目標是服務器的套接字。爲此,客戶端的套接字必須首先描述它要鏈接的服務器的套接字,指出服務器端套接字的地址和端口號,而後就向服務器套接字提出鏈接請求。
  3. 服務器端鏈接確認:是指當服務器套接字監聽或者說接收到客戶端套接字的鏈接請求,它就響應客戶端套接字的請求,創建一個新的線程,把服務器套接字的描述發給客戶端。
  4. 客戶端鏈接確認:一旦客戶確認了此描述,鏈接就創建好了,雙方開始創建通訊,有服務器套接字繼續處於監聽狀態,繼續接收其餘客戶端套接字的鏈接請求。

1.1 同步阻塞式I/O編程

網絡編程的基本模型是Client/Server模型,也就是兩個進程直接進行相互通訊,其中服務端提供配置信息(綁定的IP地址和監聽端口),客戶端經過鏈接操做向服務器端監聽的地址發起鏈接請求,經過三次握手創建鏈接,若是鏈接成功,則雙方便可以進行通訊(通訊套接字socket)緩存

public class Server {

    final static int PORT = 8763;
    public static void main(String[] args) {
        ServerSocket server = null;
        try {
            server = new ServerSocket(PORT);
            System.out.println("server start...");
            Socket socket = server.accept();
            //新建一個線程執行客戶端任務
            new Thread(new ServerHandler(socket)).start();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

public class Client {

    final static String ADDRESS = "127.0.0.1";
    final static int PORT = 8763;
    
    public static void main(String[] args) {
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            socket = new Socket(ADDRESS, PORT);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            
            //向服務器端發出數據
            out.println("接收到客戶端的請求數據...");
            String response = in.readLine();
            System.out.println("Client : " + response);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (out != null) {
                out.flush();
                out.close();
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            socket = null;
        }
    }
}


public class ServerHandler implements Runnable{

    private Socket socket;
    
    public ServerHandler(Socket socket) {
        super();
        this.socket = socket;
    }

    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String body = null;
            while (true) {
                body = in.readLine();
                if (body == null) break;
                System.out.println("Server : " + body);
                out.println("服務器端回送響應的數據");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (out != null) {
                out.flush();
                out.close();
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
}

運行結果:
Server端:  
server start...
Server : 接收到客戶端的請求數據...
Client端:
Client : 服務器端回送響應的數據...服務器

1.2 僞異步IO實現

JDK 1.5以前,採用線程池和任務隊列能夠實現一種僞異步的IO通訊。
將客戶端的Socket封裝成一個task任務(實現runnable接口的類),而後投遞到線程池中去,配置相應的隊列進行實現。網絡

public class Server {

    final static int PORT = 8763;
    public static void main(String[] args) {
        ServerSocket server = null;
        try {
            server = new ServerSocket(PORT);
            System.out.println("server start...");
            Socket socket = null;
            HandlerExecutorPool handlerExecutorPool = new HandlerExecutorPool(50, 100);
            while (true) {
                socket = server.accept();
                handlerExecutorPool.execute(new ServerHandler(socket));
            }
            
            //新建一個線程執行客戶端任務
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

Client.java 和 ServerHandler.java不變

public class HandlerExecutorPool {

    private ExecutorService executor;
    public HandlerExecutorPool(int maxPoolSize, int queueSize) {
        this.executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                maxPoolSize,
                120L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize));
    }
    
    public void execute(Runnable task) {
        this.executor.execute(task);
    }
}

1.3 基於NIO的同步非阻塞編程

Buffer(緩衝區)、Channel(管道、通道)、Selector(選擇器、多路複用器)
異步

  1. Buffer緩衝區:Buffer是一個對象,它 包含一些要寫入或者要讀取的數據。在NIO類庫中加入Buffer對象,體現了新庫與原庫IO的重要區別。在面向流的IO中,能夠將數據直接寫入或者讀取到Stream對象中,==在NIO庫中,全部數據都是用緩衝區處理的(讀寫)==。緩衝區實質上是一個數組,一般他是一個字節數組(ByteBuffer),也可使用其餘類型的數組。這個數組爲緩衝區提供了數據的訪問讀寫等操做屬性,如位置、容量、上限等概念,參考API文檔。
    Buffer類型:咱們最經常使用的就是ByteBuffer,實際上每一種java基本類型都對應了一種緩存區(除了Boolean),好比:byteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer
public class TestBuffer {

    public static void main(String[] args) {
        IntBuffer intBuffer = IntBuffer.allocate(10);
        intBuffer.put(20);
        intBuffer.put(13);
        intBuffer.put(22);
        System.out.println("獲取下標爲1的元素:" + intBuffer.get(1));
        System.out.println(intBuffer);
        intBuffer.flip();//使用flip給position進行復位,每次put以後都要flip,不然get()時會報錯
        System.out.println("使用flip後:" + intBuffer);
        System.out.println("get(index)方法,position位置不變:" + intBuffer);
        intBuffer.put(0, 11); //替換了index爲0的元素
        System.out.println("put(index, value)方法,position位置不變:" + intBuffer);
        for (int i = 0; i < intBuffer.limit(); i++) {
            System.out.print(intBuffer.get() + "\t");
        }
        
        int[] arr = new int[]{1,2,3,4,5};
        IntBuffer intBuffer2 = IntBuffer.wrap(arr);
        System.out.println("\n" + intBuffer2);
        intBuffer2.position(1);
        System.out.println("當前position位置:" + intBuffer2 + "\t當前可獲取元素數量:" + intBuffer2.remaining());
        IntBuffer intBuffer3 = IntBuffer.wrap(arr, 0, 2);
        System.out.println(intBuffer3);
        for (int i = 0; i < intBuffer3.limit(); i++) {
            System.out.print(intBuffer3.get() + "\t");
        }
    }
}

運行結果:
獲取下標爲1的元素:13
java.nio.HeapIntBuffer[pos=3 lim=10 cap=10]
使用flip後:java.nio.HeapIntBuffer[pos=0 lim=3 cap=10]
get(index)方法,position位置不變:java.nio.HeapIntBuffer[pos=0 lim=3 cap=10]
put(index, value)方法,position位置不變:java.nio.HeapIntBuffer[pos=0 lim=3 cap=10]
11 13 22
java.nio.HeapIntBuffer[pos=0 lim=5 cap=5]
當前position位置:java.nio.HeapIntBuffer[pos=1 lim=5 cap=5] 當前可獲取元素數量:4
java.nio.HeapIntBuffer[pos=0 lim=2 cap=5]
1 2socket

  1. 待添加
public class Server implements Runnable{

    private Selector selector;
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);
    private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
    public Server(int port) {
        try {
            this.selector = Selector.open();
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress(port));
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server start, port :" + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void run() {
        while (true) {
            try {
                this.selector.select();
                Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isValid()) {
                        if (key.isAcceptable())
                            this.accept(key);
                        if (key.isReadable())
                            this.read(key);
                        if (key.isWritable())
                            this.write(key);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    private void write(SelectionKey key) throws ClosedChannelException {
        //ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
        //ssc.register(this.selector, SelectionKey.OP_WRITE);
    }
    
    private void read(SelectionKey key) {
        try {
            this.readBuf.clear();
            SocketChannel sc = (SocketChannel)key.channel();
            int count = sc.read(readBuf);
            if (count == -1) {
                key.channel().close();
                key.cancel();
                return;
            }
            this.readBuf.flip();
            byte[] bytes = new byte[this.readBuf.remaining()];
            this.readBuf.get(bytes);
            String body = new String(bytes).trim();
            System.out.println("Server :" + body);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    private void accept (SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
            SocketChannel sc = ssc.accept();
            sc.configureBlocking(false);
            sc.register(this.selector,  SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        new Thread(new Server(8765)).start();
    }
}

public class Client {

    public static void main(String[] args) {
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765);
        SocketChannel sc = null;
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            sc = SocketChannel.open();
            sc.connect(address);
            while (true) {
                byte[] bytes = new byte[1024];
                System.in.read(bytes);
                buf.put(bytes);
                buf.flip();
                sc.write(buf);
                buf.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (sc != null) {
                try {
                    sc.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
}

NIO的本質就是避免原始的TCP創建鏈接使用3次握手的操做,減小鏈接開銷

1.4 基於NIO 2.0的異步非阻塞AIO編程

AIO編程,在NIO基礎上引入了異步通道的概念,並提升了異步文件和異步套接字通道的實現,從而在真正意義上實現了異步阻塞,以前咱們學習的NIO只是非阻塞而並不是異步。而AIO它不須要經過多路複用器對註冊的通道進行輪詢操做便可實現異步讀寫,從而簡化了NIO編程模型。也能夠稱之爲NIO 2.0,這種模式才真正的屬於咱們異步非阻塞的模型。async

AsynchronousServerSocketChannel

AsynchronousSocketChannel

public class Server {

    private ExecutorService executorService;
    //線程池
    private AsynchronousChannelGroup threadGroup;
    //服務器通道
    public AsynchronousServerSocketChannel assc;
    
    public Server (int port) {
        try {
            executorService = Executors.newCachedThreadPool();
            //建立線程池
            threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
            //建立線程組
            assc = AsynchronousServerSocketChannel.open(threadGroup);
            //建立服務器通道
            assc.bind(new InetSocketAddress(port));
            //綁定
            System.out.println("Server start, port : " + port);
            assc.accept(this, new ServerCompletionHandler());
            //進行堵塞
            Thread.sleep(Integer.MAX_VALUE);
            //一直阻塞,不讓服務器中止
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        Server server = new Server(8765);
    }
}

public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {

    @Override
    public void completed(AsynchronousSocketChannel asc, Server attachment) {
        // 當有下一個客戶端進入的時候,直接調用Server的accept方法,這樣反覆下去,保證多個客戶端能夠阻塞
        attachment.assc.accept(attachment, this);
        read(asc);
    }
    
    @Override
    public void failed(Throwable exc, Server attachment) {
        exc.printStackTrace();
        
    }

    private void read(final AsynchronousSocketChannel asc) {
        // 讀取數據
        ByteBuffer buf = ByteBuffer.allocate(2014);
        asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                //進行讀取以前,重置標示符
                attachment.flip();
                //獲取讀取的字節數
                System.out.println("Server -> " + "收到客戶端的數據長度爲: " + result);
                //讀取獲取的數據
                String resultData = new String(attachment.array()).trim();
                System.out.println("Server -> 收到客戶端的數據信息爲: " + resultData);
                String response = "服務器響應,收到了客戶端發來的數據:" + resultData;
                write(asc, response);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();  
            }   
        });
    }

    protected void write(AsynchronousSocketChannel asc, String response) {
        try {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            buf.put(response.getBytes());
            buf.flip();
            asc.write(buf).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }

}

public class Client implements Runnable{

    private AsynchronousSocketChannel asc;
    
    public Client() throws IOException {
        asc = AsynchronousSocketChannel.open();
    }
    
    public void connect() {
        asc.connect(new InetSocketAddress("127.0.0.1", 8765));
    }
    
    public void write(String request) {
        try {
            asc.write(ByteBuffer.wrap(request.getBytes())).get();
            read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public void read() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            asc.read(buf).get();
            buf.flip();
            byte[] respBuf = new byte[buf.remaining()];
            buf.get(respBuf);
            System.out.println(new String(respBuf, "utf-8").trim());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void run() {
        while (true) {
            
        }
    }
    
    public static void main(String[] args) throws IOException, InterruptedException {
        Client c1 = new Client();
        c1.connect();
        Client c2 = new Client();
        c2.connect();
        Client c3 = new Client();
        c3.connect();
        
        new Thread(c1, "c1").start();
        new Thread(c2, "c2").start();
        new Thread(c3, "c3").start();
        
        Thread.sleep(1000);
        
        c1.write("c1 AAA");
        c2.write("c2 BBB");
        c3.write("c3 CCC");
    }
}

運行結果:
Server端:
Server start, port : 8765
Server -> 收到客戶端的數據長度爲: 6
Server -> 收到客戶端的數據信息爲: c1 AAA
Server -> 收到客戶端的數據長度爲: 6
Server -> 收到客戶端的數據信息爲: c2 BBB
Server -> 收到客戶端的數據長度爲: 6
Server -> 收到客戶端的數據信息爲: c3 CCC
Client端:
服務器響應,收到了客戶端發來的數據:c1 AAA
服務器響應,收到了客戶端發來的數據:c2 BBB
服務器響應,收到了客戶端發來的數據:c3 CCCtcp

二 區別

2.1 IO(BIO)和NIO的區別

其本質就是阻塞和非阻塞的區別

  1. 阻塞概念:應用程序在獲取網絡數據的時候,若是網絡傳輸數據很慢,那麼程序就一直等着,直到傳輸完畢爲止。
  2. 非阻塞概念:應用程序直接能夠獲取已經準備就緒好的數據,無需等待。
  3. IO爲同步阻塞形式,NIO爲同步非阻塞形式。NIO並無實現異步,在JDK 1.7以後,升級了NIO庫包,支持異步非阻塞通訊模型即NIO 2.0(AIO)

2.2 同步和異步

同步和異步通常是面向操做系統與應用程序對IO操做的層面上來區別的

  1. 同步時,應用程序會直接參與IO讀寫操做,而且 咱們的應用程序會直接阻塞到某一個方法上,直到數據準備就緒;或者採用輪詢的策略實時檢查數據的就緒狀態,若是就緒則獲取數據。
  2. 異步時,則全部的IO讀寫操做都交給操做系統處理,與咱們的應用程序沒有直接關係,咱們程序不須要關心IO讀寫,當操做系統完成了IO讀寫操做時,就會給咱們應用程序發送通知,咱們的應用程序直接拿走數據便可。

阻塞說的是具體的技術,接收數據的方式(io, nio)

同步說的是你的server服務器的執行方式

相關文章
相關標籤/搜索