Java BIO:同步並阻塞
(傳統阻塞型),服務器實現模式爲一個鏈接一個線程,即客戶端有鏈接請求時服務器端就須要啓動一個線程進行處理,若是這個鏈接不做任何事情會形成沒必要要的線程開銷。java
Java NIO:同步非阻塞
,服務器實現模式爲一個線程處理多個請求(鏈接),即客戶端發送的鏈接請求會被註冊到多路複用器上,多路複用器輪詢到有 I/O 請求就會進行處理。程序員
Java AIO:異步非阻塞
,AIO 引入了異步通道的概念,採用了 Proactor 模式,簡化了程序編寫,有效的請求才啓動線程,它的特色是先由操做系統完成後才通知服務端程序啓動線程去處理,通常適用於鏈接數較多且鏈接時間較長的應用。shell
同步阻塞
:你到飯館點餐,而後在那等着,還要一邊喊:好了沒啊!同步非阻塞
:在飯館點完餐,就去遛狗了。不過溜一下子,就回飯館喊一聲:好了沒啊!異步阻塞
:遛狗的時候,接到飯館電話,說飯作好了,讓您親自去拿。異步非阻塞
:飯館打電話說,咱們知道您的位置,一會給你送過來,安心遛狗就能夠了。鏈接數比較小且固定
的架構,這種方式對服務器資源要求比較高,併發侷限於應用中,JDK1.4 以前惟一的選擇,程序較爲簡單容易理解。鏈接數目多且鏈接比較短
的架構,好比聊天服務器,彈幕系統,服務器間通信等,編程比較複雜,JDK1.4 開始支持。鏈接數目多且鏈接比較長
的架構,好比相冊服務器,充分調用 OS 參與併發操做,變成比較複雜,JDK7 開始支持。同步阻塞
,服務器實現模式爲一個鏈接一個線程,即客戶端有鏈接請求時,服務器就會須要啓動一個線程來進行處理。若是這個鏈接不做任何事情就會形成沒必要要的開銷,能夠經過線程池機制改善。public class Server { public static void main(String[] args) throws IOException { //建立線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //建立serverSocket ServerSocket serverSocket = new ServerSocket(6666); for (; ; ) { System.out.println("等待鏈接中..."); //監聽,等待客戶端鏈接 Socket socket = serverSocket.accept(); System.out.println("鏈接到一個客戶端"); executorService.execute(() -> handler(socket)); } } //編寫一個handler方法,和客戶端通信 public static void handler(Socket socket) { byte[] bytes = new byte[1024]; System.out.println("當前線程信息: " + Thread.currentThread().getName()); try { //經過socket獲取輸入流 InputStream inputStream = socket.getInputStream(); //循環讀取客戶端發送的數據 while (inputStream.read(bytes) != -1) { System.out.println(Thread.currentThread().getName()+ " : 發送信息爲 :"+ new String(bytes, 0, bytes.length)); } } catch (IOException e) { e.printStackTrace(); } finally { System.out.println("關閉鏈接"); try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
使用終端命令編程
telnet 127.0.0.1 6666
建立大量線程來處理鏈接
,系統資源佔用較大。同步非阻塞
的。Channel(管道)
、Buffer(緩衝區)
、Selector(選擇器)
。緩衝區
編程的。數據讀取到了一個它稍微處理的緩衝區,須要時可在緩衝區中先後移動,這就增長了處理過程當中的靈活性,使用它能夠提供非阻塞的高伸縮性網絡。public class BufferTest { public static void main(String[] args) { //同理對應的還有:ByteBuffer,IntBuffer,FloatBuffer,CharBuffer,ShortBuffer,DoubleBuffer,LongBuffer //建立一個Buffer,大小爲5 IntBuffer buffer = IntBuffer.allocate(5); //存放數據 for (int i = 0; i < buffer.capacity(); i++) { buffer.put(i); } //切換成讀模式. 讀寫切換 buffer.flip(); while (buffer.hasRemaining()) { System.out.println(buffer.get()); // 0 1 2 3 4 } } }
使用單個線程就能夠監聽多個客戶端通道
。
說明:數組
事件
決定的(Event)。flip()
切換讀寫模式。而 BIO 是單向的,要麼輸入流要麼輸出流。
Buffer(緩衝區)基本介紹服務器
緩衝區本質上是一個能夠讀寫數據的內存塊,能夠理解爲是一個容器對象(含數組)
,該對象提供了一組方法
,能夠更輕鬆地使用內存塊,緩衝區對象內置了一些機制,可以跟蹤和記錄緩衝區的狀態變化狀況。網絡
Channel 提供從文件、網絡讀取數據的渠道,可是讀取或者都必須通過 Buffer。多線程
在 Buffer 子類中維護着一個對應類型的數組,用來存放數據:架構
public abstract class IntBuffer extends Buffer implements Comparable<IntBuffer> { // These fields are declared here rather than in Heap-X-Buffer in order to // reduce the number of virtual method invocations needed to access these // values, which is especially costly when coding small buffers. // final int[] hb; // Non-null only for heap buffers final int offset; boolean isReadOnly; // Valid only for heap buffers // Creates a new buffer with the given mark, position, limit, capacity, // backing array, and array offset // IntBuffer(int mark, int pos, int lim, int cap, // package-private int[] hb, int offset) { super(mark, pos, lim, cap); this.hb = hb; this.offset = offset; } // Creates a new buffer with the given mark, position, limit, and capacity // IntBuffer(int mark, int pos, int lim, int cap) { // package-private this(mark, pos, lim, cap, null, 0); }
Buffer 經常使用子類 | 描述 |
---|---|
ByteBuffer | 存儲字節數據到緩衝區 |
ShortBuffer | 存儲字符串數據到緩衝區 |
CharBuffer | 存儲字符數據到緩衝區 |
IntBuffer | 存儲整數數據據到緩衝區 |
LongBuffer | 存儲長整型數據到緩衝區 |
DoubleBuffer | 存儲浮點型數據到緩衝區 |
FloatBuffer | 存儲浮點型數據到緩衝區 |
Buffer 中定義了四個屬性來提供所其包含的數據元素。併發
// Invariants: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity;
屬性 | 描述 |
---|---|
capacity | 容量,便可以容納的最大數據量;在緩衝區被建立時候就被指定,沒法修改 |
limit | 表示緩衝區的當前終點,不能對緩衝區超過極限的位置進行讀寫操做,但極限是能夠修改的 |
position | 當前位置,下一個要被讀或者寫的索引,每次讀寫緩衝區數據都會改變該值,爲下次讀寫作準備 |
Mark | 標記當前 position 位置,當 reset 後回到標記位置。 |
NIO 的通道相似於流,但有以下區別:
經常使用的 Channel 有:FileChannel、DatagramChannel、SocketChannel、SocketServerChannel。
FileChannel 類
FileChannel 主要用來對本地文件進行 IO 操做,常見的方法有:
public class NIOFileChannel { public static void main(String[] args) throws IOException { String str = "Hello,Java菜鳥程序員"; //建立一個輸出流 FileOutputStream fileOutputStream = new FileOutputStream("hello.txt"); //獲取通道 FileChannel channel = fileOutputStream.getChannel(); //建立緩衝區 ByteBuffer byteBuffer = ByteBuffer.allocate(100); //寫入byteBuffer byteBuffer.put(str.getBytes()); //切換模式 byteBuffer.flip(); //寫入通道 channel.write(byteBuffer); //關閉 channel.close(); fileOutputStream.close(); } }
public class NIOFileChannel { public static void main(String[] args) throws IOException { FileInputStream fileInputStream = new FileInputStream("hello.txt"); FileChannel channel = fileInputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(100); channel.read(byteBuffer); System.out.println(new String(byteBuffer.array(), 0, byteBuffer.limit())); //Hello,Java菜鳥程序員 channel.close(); fileInputStream.close(); } }
public class NIOFileChannel03 { public static void main(String[] args) throws IOException { FileInputStream fileInputStream = new FileInputStream("hello.txt"); FileOutputStream fileOutputStream = new FileOutputStream("world.txt"); FileChannel inChannel = fileInputStream.getChannel(); FileChannel outChannel = fileOutputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1); while (inChannel.read(byteBuffer) != -1) { byteBuffer.flip(); outChannel.write(byteBuffer); //清空重置 byteBuffer.clear(); } fileOutputStream.close(); fileInputStream.close(); } }
public class NIOFileChannel04 { public static void main(String[] args) throws IOException { FileInputStream fileInputStream = new FileInputStream("hello.txt"); FileOutputStream fileOutputStream = new FileOutputStream("world.txt"); FileChannel inChannel = fileInputStream.getChannel(); FileChannel outChannel = fileOutputStream.getChannel(); //從哪拷貝,從幾開始到幾結束 對應的還有transferTo()方法. outChannel.transferFrom(inChannel, 0, inChannel.size()); outChannel.close(); inChannel.close(); fileOutputStream.close(); fileInputStream.close(); } }
NIO 還支持經過多個 Buffer(即 Buffer 數組)完成讀寫操做,即Scattering(分散)和 Gathering(彙集)。
Scattering(分散)
:在向緩衝區寫入數據時,可使用 Buffer 數組依次寫入,一個 Buffer 數組寫滿後,繼續寫入下一個 Buffer 數組。Gathering(彙集)
:從緩衝區讀取數據時,能夠依次讀取,讀完一個 Buffer 再按順序讀取下一個。Netty 的 I/O 線程 NioEventLoop 聚合了 Selector(選擇器 / 多路複用器),能夠併發處理成百上千個客戶端鏈接。
當線程從某客戶端 Socket 通道進行讀寫時,若沒有數據可用,該線程能夠進行其餘任務。
線程一般將非阻塞 I/O 的空閒時間用於其餘通道上執行 I/O 操做,因此單獨的線程能夠管理多個輸入輸出通道。
因爲讀寫操做都是非阻塞的,就能夠充分提升 I/O 線程的運行效率,避免因爲頻繁 I/O 阻塞致使的線程掛起。
一個 I/O 線程能夠併發處理 N 個客戶端鏈接和讀寫操做,這從根本上解決了傳統同步阻塞 I/O 一鏈接一線程模型,架構性能、彈性伸縮能力和可靠性都獲得極大地提高。
public abstract class Selector implement Closeable{ public static Selector open(); //獲得一個選擇器對象 public int select(long timeout); //監控全部註冊的通道,當其中的IO操做能夠進行時,將對應的selectionkey加入內部集合並返回,參數設置超時時間 public Set<SelectionKey> selectionKeys(); //從內部集合中獲得全部的SelectionKey }
selector.select()
://若未監聽到註冊管道中有事件,則持續阻塞selector.select(1000)
://阻塞 1000 毫秒,1000 毫秒後返回selector.wakeup()
://喚醒 selectorselector.selectNow()
: //不阻塞,當即返回SelectionKey 中定義了四個操做標誌位:OP_READ
表示通道中發生讀事件;OP_WRITE
—表示通道中發生寫事件;OP_CONNECT
—表示創建鏈接;OP_ACCEPT
—請求新鏈接。
public class Server { public static void main(String[] args) throws IOException { //建立serverSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //綁定端口 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //設置爲非阻塞 serverSocketChannel.configureBlocking(false); //獲得Selector對象 try (Selector selector = Selector.open()) { //把ServerSocketChannel註冊到selector,事件爲OP_ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //若是返回的>0,表示已經獲取到關注的事件 while (selector.select() > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { //得到到一個事件 SelectionKey next = iterator.next(); //若是是OP_ACCEPT,表示有新的客戶端鏈接 if (next.isAcceptable()) { //給該客戶端生成一個SocketChannel SocketChannel accept = serverSocketChannel.accept(); accept.configureBlocking(false); //將當前的socketChannel註冊到selector,關注事件爲讀事件,同時給socket Channel關聯一個buffer accept.register(selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024)); System.out.println("獲取到一個客戶端鏈接"); //若是是讀事件 } else if (next.isReadable()) { //經過key 反向獲取到對應的channel SocketChannel channel = (SocketChannel) next.channel(); //獲取到該channel關聯的buffer ByteBuffer buffer = (ByteBuffer) next.attachment(); while (channel.read(buffer) != -1) { buffer.flip(); System.out.println(new String(buffer.array(), 0, buffer.limit())); buffer.clear(); } } iterator.remove(); } } } } }
public class Client { public static void main(String[] args) throws IOException { //獲得一個網絡通道 SocketChannel socketChannel = SocketChannel.open(); //設置爲非阻塞 socketChannel.configureBlocking(false); //提供服務器端的IP和端口 InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666); //鏈接服務器 if (!socketChannel.connect(inetSocketAddress)) { while (!socketChannel.finishConnect()) { System.out.println("鏈接須要時間,客戶端不會阻塞...先去吃個宵夜"); } } //鏈接成功,發送數據 String str = "hello,Java菜鳥程序員"; ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes()); socketChannel.write(byteBuffer); socketChannel.close(); System.out.println("客戶端退出"); } }
方法 | 描述 |
---|---|
public abstract Selector selector(); | 獲得與之關聯的 Selector 對象 |
public abstract SelectableChannel channel(); | 獲得與之關聯的通道 |
public final Object attachment() | 獲得與之關聯的共享數據 |
public abstract SelectionKey interestOps(int ops); | 設置或改變監聽的事件類型 |
public final boolean isReadable(); | 通道是否可讀 |
public final boolean isWritable(); | 通道是否可寫 |
public final boolean isAcceptable(); | 是否能夠創建鏈接 ACCEPT |
public class GroupChatClient { private static final String HOST = "127.0.0.1"; private static final int PORT = 6667; private Selector selector; private SocketChannel socketChannel; private String username; public GroupChatClient() { try { selector = Selector.open(); //鏈接服務器 socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT)); //設置非阻塞 socketChannel.configureBlocking(false); //註冊 socketChannel.register(selector, SelectionKey.OP_READ); username = socketChannel.getLocalAddress().toString().substring(1); System.out.println("客戶端: " + username + ",準備就緒..."); } catch (IOException e) { e.printStackTrace(); } } /** * 向服務器發送數據 * * @param info */ public void sendInfo(String info) { info = username + "說: " + info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); } catch (IOException e) { e.printStackTrace(); } } /** * 讀取服務端回覆的消息 */ public void readInfo() { try { //有可用通道 if (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isReadable()) { //獲得相關的通道 SocketChannel sc = (SocketChannel) key.channel(); //獲得一個buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取 sc.read(buffer); //把讀取到的緩衝區數據轉成字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); } iterator.remove(); //刪除當前的selectionKey,防止重複操做 } } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { //啓動客戶端 GroupChatClient chatClient = new GroupChatClient(); //啓動一個線程,每隔3秒,讀取從服務器端發送的數據 new Thread(() -> { while (true) { chatClient.readInfo(); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); //發送數據給服務器 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { chatClient.sendInfo(scanner.nextLine()); } } }
public class GroupChatServer { //定義屬性 private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT = 6667; public GroupChatServer() { try { //得到選擇器 selector = Selector.open(); //listenChannel listenChannel = ServerSocketChannel.open(); //綁定端口 listenChannel.socket().bind(new InetSocketAddress(PORT)); //設置非阻塞模式 listenChannel.configureBlocking(false); //將該listenChannel註冊到Selector listenChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { //建立一個服務器對象 GroupChatServer groupChatServer = new GroupChatServer(); //監聽 groupChatServer.listen(); } /** * 監聽 */ public void listen() { try { //若是返回的>0,表示已經獲取到關注的事件 while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); //判斷是否有事件 while (iterator.hasNext()) { //得到事件 SelectionKey key = iterator.next(); //若是是OP_ACCEPT,表示有新的客戶端鏈接 if (key.isAcceptable()) { SocketChannel socketChannel = listenChannel.accept(); //設置爲非阻塞 socketChannel.configureBlocking(false); //註冊到Selector socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("獲取到一個客戶端鏈接 : " + socketChannel.getRemoteAddress() + " 上線!"); } else if (key.isReadable()) { //若是是讀事件,就讀取數據 readData(key); } iterator.remove(); } } } catch (IOException e) { e.printStackTrace(); } finally { } } /** * 讀取客戶端消息 */ private void readData(SelectionKey key) { SocketChannel channel = null; try { //獲得channel channel = (SocketChannel) key.channel(); //建立buffer ByteBuffer buffer = ByteBuffer.allocate(1024); if (channel.read(buffer) != -1) { String msg = new String(buffer.array()); System.out.println(msg); // 轉發消息給其它客戶端(排除本身) sendInfoOtherClients(msg, channel); } } catch (Exception e) { try { System.out.println(channel.getRemoteAddress() + " 下線了!"); // 關閉通道 key.cancel(); channel.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } finally { } } /** * 轉發消息給其它客戶端(排除本身) */ private void sendInfoOtherClients(String msg, SocketChannel self) throws IOException { //服務器轉發消息 System.out.println("服務器轉發消息中..."); //遍歷全部註冊到selector的socketChannel並排除自身 for (SelectionKey key : selector.keys()) { //反向獲取通道 Channel targetChannel = key.channel(); //排除自身 if (targetChannel instanceof SocketChannel && targetChannel != self) { //轉型 SocketChannel dest = (SocketChannel) targetChannel; //將msg存儲到buffer中 ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); //將buffer中的數據寫入通道 dest.write(buffer); } } } }
JDK 7 引入了 Asynchronous I/O,即 AIO。在進行 I/O 編程中,一般用到兩種模式:Reactor 和 Proactor 。Java 的 NIO 就是 Reactor,當有事件觸發時,服務器端獲得通知,進行相應的處理。
AIO 叫作異步非阻塞
的 I/O,引入了異步通道的概念,採用了 Proactor 模式,簡化了程序編寫,有效的請求才會啓動線程,特色就是先由操做系統完成後才通知服務端程序啓動線程去處理,通常用於鏈接數較多且鏈接時長較長的應用。
Reactor 與 Proactor
因爲 AIO 目前應用並不普遍,因此本文只是講述 AIO 基本介紹。