Table of Contents

BIO
    server side
    client side
NIO
    single-threaded model
    code implementation
AIO
    single-threaded AIO implementation
BIO

When you know the number of client connections will be small, BIO is still perfectly usable: it is simple and hard to get wrong. Low efficiency does not mean it has no place.

BIO's accept, read, and write are all blocking calls. A thread just sits there blocked, doing no real work, yet wearing an "I'm busy" sign on its forehead. That is extremely inefficient, and even with a thread pool you simply end up with a pile of threads all standing around "busy".
server side:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket();
        ss.bind(new InetSocketAddress("127.0.0.1", 8888));
        while (true) {
            Socket s = ss.accept(); // blocking call: waits here until a client connects
            new Thread(() -> handle(s)).start(); // one thread per connection
        }
    }

    static void handle(Socket s) {
        try {
            byte[] bytes = new byte[1024];
            int len = s.getInputStream().read(bytes); // blocking read
            System.out.println(new String(bytes, 0, len));
            s.getOutputStream().write(bytes, 0, len); // echo back to the client
            s.getOutputStream().flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
client side:

import java.io.IOException;
import java.net.Socket;

public class Client {
    public static void main(String[] args) throws IOException {
        Socket s = new Socket("127.0.0.1", 8888);
        s.getOutputStream().write("HelloServer".getBytes());
        s.getOutputStream().flush();
        //s.getOutputStream().close();
        System.out.println("write over, waiting for msg back...");
        byte[] bytes = new byte[1024];
        int len = s.getInputStream().read(bytes); // blocks until the echo arrives
        System.out.println(new String(bytes, 0, len));
        s.close();
    }
}
NIO

The selector is the big housekeeper whose one job is listening: it listens for connect, read, and write events.

A single selector is all it takes to handle client connections, client reads, and client writes; the selector deals with everything.

The design pattern at work here is the Observer pattern.
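As a rough illustration of that analogy (a toy sketch with invented names like EventSource and ConnectionListener, not the JDK's actual internals): registering interest with a selector is like subscribing an observer, and the select loop is like the subject notifying its observers.

import java.util.ArrayList;
import java.util.List;

interface ConnectionListener {      // hypothetical observer interface
    void onEvent(String event);
}

class EventSource {                 // hypothetical subject, standing in for the selector's event source
    private final List<ConnectionListener> listeners = new ArrayList<>();

    void register(ConnectionListener l) { // analogous to channel.register(selector, ops)
        listeners.add(l);
    }

    void fire(String event) {             // analogous to selector.select() delivering ready keys
        for (ConnectionListener l : listeners) {
            l.onEvent(event);
        }
    }
}

public class ObserverDemo {
    public static void main(String[] args) {
        EventSource source = new EventSource();
        source.register(e -> System.out.println("handle " + e));
        source.fire("OP_ACCEPT"); // prints: handle OP_ACCEPT
    }
}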
The selector says: "I am interested in certain events. Which ones? Client connects, reads, and writes, all of them!" Every so often it checks whether any of these events has happened on the server. If a client wants to connect, the selector helps the client and the server establish the connection, one connection per arriving client. Besides handling connections, the selector also keeps an eye on the already-connected client channels for data that needs reading or writing: when there is something to read it reads it in, and when there is something to write it writes it out.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8888));
        ssc.configureBlocking(false); // non-blocking mode is required for use with a selector

        System.out.println("server started, listening on :" + ssc.getLocalAddress());

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT); // watch for incoming connections

        while (true) {
            selector.select(); // blocks until at least one registered event is ready
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove(); // must remove, or the key is processed again next round
                handle(key);
            }
        }
    }

    private static void handle(SelectionKey key) {
        if (key.isAcceptable()) {
            try {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                sc.register(key.selector(), SelectionKey.OP_READ); // now watch this client for reads
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else if (key.isReadable()) {
            SocketChannel sc = null;
            try {
                sc = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(512);
                buffer.clear();
                int len = sc.read(buffer);
                if (len != -1) {
                    System.out.println(new String(buffer.array(), 0, len));
                }
                ByteBuffer bufferToWrite = ByteBuffer.wrap("HelloClient".getBytes());
                sc.write(bufferToWrite);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (sc != null) {
                    try {
                        sc.close(); // one-shot demo: reply once, then close the connection
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
With a thread pool introduced, the selector now leads a crew of workers instead of doing everything single-handedly; it has become a foreman rather than a one-man band.

The selector is the boss and only handles client connections; the thread pool provides the workers. Whenever there is reading or writing to do, it is handed to whichever worker happens to be idle, instead of everything being done on the one selector thread as in the single-threaded NIO model.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolServer {

    ExecutorService pool = Executors.newFixedThreadPool(50); // the workers
    private Selector selector;                               // the boss

    public static void main(String[] args) throws IOException {
        PoolServer server = new PoolServer();
        server.initServer(8000);
        server.listen();
    }

    public void initServer(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(port));
        this.selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("server started!");
    }

    public void listen() throws IOException {
        // poll the selector forever
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                if (key.isAcceptable()) {
                    // the boss only accepts connections...
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false);
                    channel.register(this.selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // ...and hands reads to the pool; drop OP_READ first so the
                    // selector does not fire again while a worker owns the channel
                    key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
                    pool.execute(new ThreadHandlerChannel(key));
                }
            }
        }
    }
}

class ThreadHandlerChannel extends Thread {
    private SelectionKey key;

    ThreadHandlerChannel(SelectionKey key) {
        this.key = key;
    }

    @Override
    public void run() {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            int size = 0;
            while ((size = channel.read(buffer)) > 0) { // drain everything currently readable
                buffer.flip();
                baos.write(buffer.array(), 0, size);
                buffer.clear();
            }
            baos.close();
            // echo the accumulated bytes back to the client
            byte[] content = baos.toByteArray();
            ByteBuffer writeBuf = ByteBuffer.allocate(content.length);
            writeBuf.put(content);
            writeBuf.flip();
            channel.write(writeBuf);
            if (size == -1) {
                channel.close(); // client closed the connection
            } else {
                // re-arm OP_READ and wake the boss so it sees the new interest set
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                key.selector().wakeup();
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

AIO
NIO has to keep polling, asking the server over and over whether any event has occurred.

AIO goes roughly like this: when a client wants to connect, the connecting is delegated to the operating system. As soon as the OS has the client connected, it sends the housekeeping "selector" a message saying "someone wants to connect, shall I let them in?". There can be several of these managers, but usually there is one, and it is only responsible for client connections. Unlike in NIO, it just sits there waiting to be notified; it does not spin around polling the server.

On Linux, AIO and NIO are both implemented on top of the OS's epoll system call. epoll is polling, and NIO is polling too, which is why Netty simply wrapped NIO directly, while packaging its API to look more like AIO.

Windows implements true asynchronous AIO, and it is more efficient there than on Linux; but most servers run Linux rather than Windows.
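To see what that feels like in practice, here is a minimal Netty echo server; this is a sketch assuming Netty 4.x on the classpath, not code from the original post. The transport underneath is NIO, yet your handler is a callback that Netty invokes when data arrives, which is what gives the API its AIO flavour.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);   // accepts connections
        EventLoopGroup worker = new NioEventLoopGroup();  // handles read/write
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker)
             .channel(NioServerSocketChannel.class)       // NIO transport underneath
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                         @Override
                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
                             ctx.writeAndFlush(msg); // callback style: echo whatever arrives
                         }
                     });
                 }
             });
            b.bind(8888).sync().channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

On Linux you could swap the NioEventLoopGroup/NioServerSocketChannel pair for EpollEventLoopGroup/EpollServerSocketChannel from netty-transport-native-epoll without touching the handler.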
The design pattern AIO relies on is the hook function (template method).
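A minimal sketch of that pattern (class and method names invented for illustration): the framework owns the invariant skeleton and calls back into a hook you override, just as the AIO runtime below drives the completed/failed hooks of a CompletionHandler.

abstract class IoOperation {
    // template method: the invariant skeleton, final so subclasses cannot change the flow
    public final void execute() {
        byte[] data = performIo();  // fixed step, owned by the framework
        onCompleted(data);          // hook: filled in by the user
    }

    private byte[] performIo() {
        return "payload".getBytes();
    }

    protected abstract void onCompleted(byte[] data); // the hook
}

public class HookDemo {
    public static void main(String[] args) {
        new IoOperation() {
            @Override
            protected void onCompleted(byte[] data) {
                System.out.println("got " + data.length + " bytes");
            }
        }.execute();
    }
}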
More threads is not automatically better: too many threads increase the cost of thread context switching.
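As a rough rule of thumb (my assumption, not a prescription from this post), a CPU-bound pool is often sized near the core count, since extra threads mostly buy context switches rather than throughput.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizing {
    public static void main(String[] args) {
        // one thread per core keeps switching overhead low for CPU-bound work
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        System.out.println("pool sized to " + cores + " threads");
        pool.shutdown();
    }
}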
All IO is ultimately carried out by the operating system. Otherwise your program could scribble whatever it liked onto the disk, and going over the OS boss's head like that would be far too presumptuous.
Single-threaded AIO implementation:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class Server {
    public static void main(String[] args) throws Exception {
        final AsynchronousServerSocketChannel serverChannel =
                AsynchronousServerSocketChannel.open()
                        .bind(new InetSocketAddress(8888));

        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Object attachment) {
                serverChannel.accept(null, this); // re-arm: accept the next client
                try {
                    System.out.println(client.getRemoteAddress());
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            attachment.flip();
                            System.out.println(new String(attachment.array(), 0, result));
                            client.write(ByteBuffer.wrap("HelloClient".getBytes()));
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            exc.printStackTrace();
                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        while (true) {
            Thread.sleep(1000); // keep the main thread alive; all work happens in callbacks
        }
    }
}
And a variant that supplies its own thread pool through an AsynchronousChannelGroup:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ServerWithThreadGroup {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // run the completion handlers on our own thread pool
        AsynchronousChannelGroup threadGroup =
                AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
        final AsynchronousServerSocketChannel serverChannel =
                AsynchronousServerSocketChannel.open(threadGroup)
                        .bind(new InetSocketAddress(8888));

        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Object attachment) {
                serverChannel.accept(null, this); // re-arm: accept the next client
                try {
                    System.out.println(client.getRemoteAddress());
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            attachment.flip();
                            System.out.println(new String(attachment.array(), 0, result));
                            client.write(ByteBuffer.wrap("HelloClient".getBytes()));
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            exc.printStackTrace();
                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        while (true) {
            Thread.sleep(1000); // keep the main thread alive; all work happens in callbacks
        }
    }
}