同步阻塞
式IO,服務端建立一個ServerSocket,而後客戶端用一個Socket去鏈接那個ServerSocket,而後ServerSocket接收到一個Socket的鏈接請求就建立一個Socket和一個線程去跟那個Socket進行通訊。linux
public class BioServer { public static void main(String[] args) { // 服務端開啓一個端口進行監聽 int port = 8080; ServerSocket serverSocket = null; //服務端 Socket socket; //客戶端 InputStream in = null; OutputStream out = null; try { serverSocket = new ServerSocket(port); //經過構造函數建立ServerSocket,指定監聽端口,若是端口合法且空閒,服務器就會監聽成功 // 經過無限循環監聽客戶端鏈接,若是沒有客戶端接入,則會阻塞在accept操做 while (true) { System.out.println("Waiting for a new Socket to establish" + " ," + new Date().toString()); socket = serverSocket.accept();//阻塞 三次握手 in = socket.getInputStream(); byte[] buffer = new byte[1024]; int length = 0; while ((length = in.read(buffer)) > 0) {//阻塞 System.out.println("input is:" + new String(buffer, 0, length) + " ," + new Date().toString()); out = socket.getOutputStream(); out.write("success".getBytes()); System.out.println("Server end" + " ," + new Date().toString()); } } } catch (Exception e) { e.printStackTrace(); } finally { // 必要的清理活動 if (serverSocket != null) { try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (out != null) { try { out.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
同步非阻塞
包括Selector,這是多路複用器,selector會不斷輪詢註冊的channel,若是某個channel上發生了讀寫事件,selector就會將這些channel獲取出來,咱們經過SelectionKey獲取有讀寫事件的channel,就能夠進行IO操做。一個Selector就經過一個線程,就能夠輪詢成千上萬的channel,這就意味着你的服務端能夠接入成千上萬的客戶端。服務器
public class NioDemo implements Runnable { public int id = 100001; public int bufferSize = 2048; @Override public void run() { init(); } public void init() { try { // 建立通道和選擇器 ServerSocketChannel socketChannel = ServerSocketChannel.open(); Selector selector = Selector.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress( InetAddress.getLocalHost(), 4700); socketChannel.socket().bind(inetSocketAddress); // 設置通道非阻塞 綁定選擇器 socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_ACCEPT).attach( id++); System.out.println("Server started .... port:4700"); listener(selector); } catch (Exception e) { } } public void listener(Selector in_selector) { try { while (true) { Thread.sleep(1 * 1000); in_selector.select(); // 阻塞 直到有就緒事件爲止 Set<SelectionKey> readySelectionKey = in_selector .selectedKeys(); Iterator<SelectionKey> it = readySelectionKey.iterator(); while (it.hasNext()) { SelectionKey selectionKey = it.next(); // 判斷是哪一個事件 if (selectionKey.isAcceptable()) {// 客戶請求鏈接 System.out.println(selectionKey.attachment() + " - 接受請求事件"); // 獲取通道 接受鏈接, // 設置非阻塞模式(必須),同時須要註冊 讀寫數據的事件,這樣有消息觸發時才能捕獲 ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey .channel(); serverSocketChannel .accept() .configureBlocking(false) .register( in_selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE).attach(id++); System.out .println(selectionKey.attachment() + " - 已鏈接"); // 下面這種寫法是有問題的 不該該在serverSocketChannel上面註冊 /* * serverSocketChannel.configureBlocking(false); * serverSocketChannel.register(in_selector, * SelectionKey.OP_READ); * serverSocketChannel.register(in_selector, * SelectionKey.OP_WRITE); */ } if (selectionKey.isReadable()) {// 讀數據 System.out.println(selectionKey.attachment() + " - 讀數據事件"); SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); ByteBuffer receiveBuf = ByteBuffer.allocate(bufferSize); clientChannel.read(receiveBuf); System.out.println(selectionKey.attachment() + " - 讀取數據:" + getString(receiveBuf)); } if (selectionKey.isWritable()) {// 寫數據 System.out.println(selectionKey.attachment() + " - 寫數據事件"); SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); ByteBuffer sendBuf = ByteBuffer.allocate(bufferSize); String sendText = "hello\n"; sendBuf.put(sendText.getBytes()); sendBuf.flip(); //寫完數據後調用此方法 clientChannel.write(sendBuf); } if (selectionKey.isConnectable()) { System.out.println(selectionKey.attachment() + " - 鏈接事件"); } // 必須removed 不然會繼續存在,下一次循環還會進來, // 注意removed 的位置,針對一個.next() remove一次 it.remove(); } } } catch (Exception e) { System.out.println("Error - " + e.getMessage()); e.printStackTrace(); } } /** * ByteBuffer 轉換 String * * @param buffer * @return */ public static String getString(ByteBuffer buffer) { String string = ""; try { for (int i = 0; i < buffer.position(); i++) { string += (char) buffer.get(i); } return string; } catch (Exception ex) { ex.printStackTrace(); return ""; } } }
異步非阻塞
每一個鏈接發送過來的請求,都會綁定一個buffer,而後通知操做系統去異步完成讀,此時你的程序是會去幹別的事兒的,等操做系統完成數據讀取以後,就會回調你的接口,給你操做系統異步讀完的數據。網絡
public class AIOServer { public final static int PORT = 9888; private AsynchronousServerSocketChannel server; public AIOServer() throws IOException { server = AsynchronousServerSocketChannel.open().bind( new InetSocketAddress(PORT)); } public void startWithFuture() throws InterruptedException, ExecutionException, TimeoutException { while (true) {// 循環接收客戶端請求 Future<AsynchronousSocketChannel> future = server.accept(); AsynchronousSocketChannel socket = future.get();// get() 是爲了確保 accept 到一個鏈接 handleWithFuture(socket); } } public void handleWithFuture(AsynchronousSocketChannel channel) throws InterruptedException, ExecutionException, TimeoutException { ByteBuffer readBuf = ByteBuffer.allocate(2); readBuf.clear(); while (true) {// 一次可能讀不完 //get 是爲了確保 read 完成,超時時間能夠有效避免DOS攻擊,若是客戶端一直不發送數據,則進行超時處理 Integer integer = channel.read(readBuf).get(10, TimeUnit.SECONDS); System.out.println("read: " + integer); if (integer == -1) { break; } readBuf.flip(); System.out.println("received: " + Charset.forName("UTF-8").decode(readBuf)); readBuf.clear(); } } public void startWithCompletionHandler() throws InterruptedException, ExecutionException, TimeoutException { server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { public void completed(AsynchronousSocketChannel result, Object attachment) { server.accept(null, this);// 再此接收客戶端鏈接 handleWithCompletionHandler(result); } @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } }); } public void handleWithCompletionHandler(final AsynchronousSocketChannel channel) { try { final ByteBuffer buffer = ByteBuffer.allocate(4); final long timeout = 10L; channel.read(buffer, timeout, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() { @Override public void completed(Integer result, Object attachment) { System.out.println("read:" + result); if (result == -1) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } return; } buffer.flip(); System.out.println("received message:" + Charset.forName("UTF-8").decode(buffer)); buffer.clear(); channel.read(buffer, timeout, TimeUnit.SECONDS, null, this); } @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } public static void main(String args[]) throws Exception { // new AIOServer().startWithFuture(); new AIOServer().startWithCompletionHandler(); Thread.sleep(100000); } }
一種多路複用的技術,能夠解決以前poll和select大量併發鏈接狀況下cpu利用率太高,以及須要遍歷整個被偵聽的描述符集的問題。epoll只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就好了。併發
把一個磁盤文件映射到內存裏來,而後把映射到內存裏來的數據經過socket發送出去
有一種mmap技術,也就是內存映射,直接將磁盤文件數據映射到內核緩衝區,這個映射的過程是基於DMA引擎拷貝的,同時用戶緩 衝區是跟內核緩衝區共享一塊映射數據的,創建共享映射以後,就不須要從內核緩衝區拷貝到用戶緩衝區了
光是這一點,就能夠避免一次拷貝了,可是這個過程當中仍是會用戶態切換到內核態去進行映射拷貝,接着再次從內核態切換到用戶態, 創建用戶緩衝區和內核緩衝區的映射
接着把數據經過Socket發送出去,仍是要再次切換到內核態
接着直接把內核緩衝區裏的數據拷貝到Socket緩衝區裏去,而後再拷貝到網絡協議引擎裏,發送出去就能夠了,最後切換回用戶態
減小一次拷貝,可是並不減小切換次數,一共是4次切換,3次拷貝異步
linux提供了sendfile,也就是零拷貝技術
這個零拷貝技術,就是先從用戶態切換到內核態,在內核態的狀態下,把磁盤上的數據拷貝到內核緩衝區,同時從內核緩衝區拷貝一些 offset和length到Socket緩衝區;接着從內核態切換到用戶態,從內核緩衝區直接把數據拷貝到網絡協議引擎裏去
同時從Socket緩衝區裏拷貝一些offset和length到網絡協議引擎裏去,可是這個offset和length的量不多,幾乎能夠忽略
只要2次切換,2次拷貝,就能夠了socket
select
,poll
實現須要本身不斷輪詢全部fd集合,直到設備就緒,期間可能要睡眠和喚醒屢次交替。epoll
也須要調用epoll_wait不斷輪詢就緒鏈表,期間也可能屢次睡眠和喚醒交替,可是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,可是select和poll在「醒着」的時候要遍歷整個fd集合,而epoll在「醒着」的時候只要判斷一下就緒鏈表是否爲空就好了,這節省了大量的CPU時間。這就是回調機制帶來的性能提高。ide
select
,poll
每次調用都要把fd集合從用戶態往內核態拷貝一次,而且要把current往設備等待隊列中掛一次,而epoll
只要一次拷貝,並且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,注意這裏的等待隊列並非設備等待隊列,只是一個epoll內部定義的等待隊列)。這也能節省很多的開銷。函數