IO | NIO |
---|---|
阻塞式 | 非阻塞式、選擇器selectors |
面向流:單向流動,直接將數據從一方流向另外一方 | 面向緩存:將數據放到緩存區中進行存取,經通道進行數據的傳輸 |
根據數據類型的不一樣,提供了對應的類型緩衝區(boolean類型除外),每個Buffer類都是Buffer接口的一個實例。經過Buffer類.allocate()方法獲取緩衝區;對緩衝區的數據進行操做可使用put方法和get方法。java
四個核心屬性程序員
// Invariants: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity;
經常使用方法(以ByteBuffer爲例)數組
public static ByteBuffer allocateDirect(int capacity):分配一個直接緩衝區public static ByteBuffer allocate(int capacity):分配一個間接緩衝區緩存
當分配一個緩衝區時,capacity=capacity,mark=-1, position=0, limit=capacity,源碼分析以下:服務器
public static ByteBuffer allocate(int capacity) { ... return new HeapByteBuffer(capacity, capacity); } // class HeapByteBuffer extends ByteBuffer HeapByteBuffer(int cap, int lim) { // 調用ByteBuffer的構造函數傳入默認參數:mark=-1, position=0, limit=capacity super(-1, 0, lim, cap, new byte[cap], 0); }; // public abstract class ByteBuffer extends Buffer ByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) { super(mark, pos, lim, cap); this.hb = hb; // final byte[] hb; this.offset = offset; // final int offset; } Buffer(int mark, int pos, int lim, int cap) { ... this.capacity = cap; limit(lim); // 設置limit position(pos); // 設置position if (mark >= 0) { ... this.mark = mark; } }
public final ByteBuffer put(byte[] src):將一個字節數組放入緩衝區。
每當放置一個字節時,position將會+1,保證position的值就是下一個可插入數據的buffer單元位置。源碼分析以下:網絡
public final ByteBuffer put(byte[] src) { return put(src, 0, src.length); } // 由allocate方法調用分配緩衝區可知,返回的是Buffer的實現類HeapByteBuffer對象 public ByteBuffer put(byte[] src, int offset, int length) { checkBounds(offset, length, src.length); // 檢查是否下標越界 if (length > remaining()) // 檢查是否超出了可操做的數據範圍= limit-position throw new BufferOverflowException(); System.arraycopy(src, offset, hb, ix(position()), length); position(position() + length); // 重設position return this; }
public ByteBuffer get(byte[] dst):從緩衝區中讀取數據到 dst中。應在 flip() 方法後調用。
獲取數據,是在緩衝區字節數組中的position位置處開始,讀取一次完畢後,並會記錄當前讀取的位置,即position,以便於下一次調用get方法繼續讀取。app
public ByteBuffer get(byte[] dst) { return get(dst, 0, dst.length); } // 調用HeapByteBuffer對象的get方法 public ByteBuffer get(byte[] dst, int offset, int length) { ... // 從緩衝區的字節數組final byte[] hb中,拷貝從 hb的 offset+position(注:offset=0) 處的長度爲length的數據到 dst中 System.arraycopy(hb, ix(position()), dst, offset, length); position(position() + length); // 設置position return this; }
經過源碼分析可知,當put操做後,position記錄的是下一個可用的buffer單元,而get會從position位置處開始獲取數據,這顯然是沒法得到的,所以須要從新設置 position, 即 flip()方法。dom
public final Buffer flip() :翻轉緩衝區,在一個通道讀取或PUT操做序列以後,調用此方法以準備一個通道寫入或相對獲取操做的序列
將此通道的緩衝區的界限設置爲當前position,保證了有可操做的數據。異步
public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }
public final Buffer mark():標記當前position
可用於在put操做轉get操做時標記當前的position位置,以便於調用reset方法從該位置繼續操做socket
public final Buffer mark() { mark = position; return this; }
public final Buffer reset():回到mark標記的位置
public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; }
public final Buffer clear():清除緩衝,重置初始化原始狀態
public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }
public final Buffer rewind():倒回,用於從新讀取數據
public final Buffer rewind() { position = 0; mark = -1; return this; }
直接緩衝區與間接緩衝區
間接緩衝:經過allocate方法分配的緩衝區。當程序發起read請求獲取磁盤文件時,該文件首先被OS讀取到內核地址空間中,並copy一份原始數據傳入JVM用戶地址空間,再傳給應用程序。增長了一個copy操做,致使效率下降。
直接緩衝:經過allocateDirecr方法分配的緩衝區,此緩衝區創建在物理內存中。直接在兩個空間中開闢內存空間,建立映射文件,去除了在內核地址空間和用戶地址空間中的copy操做,使得直接經過物理內存傳輸數據。雖然有效提升了效率,可是分配和銷燬該緩衝區的成本高於間接緩衝,且對於緩衝區中的數據將交付給OS管理,程序員沒法控制。
用於源節點與目標節點之間的鏈接,負責對緩衝區中的數據提供傳輸服務。
經常使用類
FileChannel:用於讀取、寫入、映射和操做文件的通道。
SocketChannel:經過 TCP 讀寫網絡中的數據。
ServerSocketChannerl:經過 UDP 讀寫網絡中的數據通道。
DatagramChannel:經過 UDP 讀寫網絡中的數據通道。
本地IO:FileInputStream、FileOutputStream、RandomAccessFile
網絡IO:Socket、ServerSocket、DatagramSocket
獲取Channel方式(以FileChannel爲例)
1. Files.newByteChannel工具類靜態方法
2. getChannel方法:經過對象動態獲取,使用間接緩衝區。
FileInputStream fis = new FileInputStream(ORIGINAL_FILE); FileOutputStream fos = new FileOutputStream(OUTPUT_FILE); // 獲取通道 FileChannel inChannel = fis.getChannel(); FileChannel outChannel = fos.getChannel(); // 提供緩衝區(間接緩衝區) ByteBuffer buffer = ByteBuffer.allocate(1024); while (inChannel.read(buffer) != -1) { buffer.flip(); outChannel.write(buffer); buffer.clear(); }
3. 靜態open方法:使用open獲取到的Channel通道,使用直接緩衝區。
FileChannel inChannel = FileChannel.open(Paths.get(ORIGINAL_FILE), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get(OUTPUT_FILE), StandardOpenOption.READ, StandardOpenOption.CREATE, StandardOpenOption.WRITE); // 使用物理內存 內存映射文件 MappedByteBuffer inBuffer = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size()); MappedByteBuffer outBuffer = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size()); byte[] dst = new byte[inBuffer.limit()]; inBuffer.get(dst); outBuffer.put(dst);
// 使用DMA 直接存儲器存儲 inChannel.transferTo(0, inChannel.size(), outChannel); outChannel.transferFrom(inChannel, 0, inChannel.size());
public static FileChannel open(Path path, OpenOption... options):從path路徑中以某種方式獲取文件的Channel
StandardOpenOption | 描述 |
---|---|
CREATE | 建立一個新的文件,若是存在,則覆蓋。 |
CREATE_NEW | 建立一個新的文件,若是該文件已經存在則失敗。 |
DELETE_ON_CLOSE | 關閉時刪除。 |
DSYNC | 要求將文件內容的每次更新都與底層存儲設備同步寫入。 |
READ | 讀方式 |
SPARSE | 稀疏文件 |
SYNC | 要求將文件內容或元數據的每次更新都同步寫入底層存儲設備。 |
TRUNCATE_EXISTING | 若是文件已經存在,而且打開 wirte訪問,則其長度將截斷爲0。 |
WRITE | 寫方式 |
APPEND | 若是文件以wirte訪問打開,則字節將被寫入文件的末尾而不是開頭。 |
public abstract MappedByteBuffer map(MapMode mode, long position, long size):將通道的文件區域映射到內從中。當操做較大的文件時,將數據映射到物理內存中才是值得的,由於映射到內存是須要開銷的。
FileChannel.MapMode | 描述 |
---|---|
PRIVATE | 專用映射模式(寫入時拷貝) |
READ_ONLY | 只讀模式 |
READ_WRIT | 讀寫模式 |
public abstract long transferFrom(ReadableByteChannel src, long position, long count):從給定的可讀取通道src,傳輸到本通道中。直接使用直接存儲器(DMA)對數據進行存儲。public abstract long transferTo(long position, long count, WritableByteChannel target):將本通道的文件傳輸到可寫入的target通道中。
分散(Scatter)與彙集(Gather)
分散讀取:將通道中的數據分散到多個緩衝區中。 public final long read(ByteBuffer[] dsts)
彙集寫入:將多個緩衝區中的數據彙集到一個Channel通道中。public final long write(ByteBuffer[] srcs)
字符集(Charset)
public final ByteBuffer encode(CharBuffer cb):編碼public final CharBuffer decode(ByteBuffer bb):解碼
阻塞是相對網絡傳輸而言的。傳統的IO流都是阻塞的,在網絡通訊中,因爲 IO 阻塞,須要爲每個客戶端建立一個獨立的線程來進行數據傳輸,性能大大下降;而NIO是非阻塞的,當存在空閒線程時,能夠轉去操做其餘通道,所以沒必要非要建立一個獨立的線程來服務每個客戶端請求。
選擇器(Selector)
SelectableChannle對象的多路複用器,可同時對多個SelectableChannle對象的 IO 狀態監聽,每當建立一個Channel時,就向Selector進行註冊,交由Selector進行管理,只有Channel準備就緒時,Selector可會將任務分配給一個或多個線程去執行。Selector能夠同時管理多個Channel,是非阻塞 IO 的核心。
NIO 阻塞式
服務器Server不斷監聽客戶端Client的請求,當創建了一個Channel時,服務器進行read操做,接收客戶端發送的數據,只有當客戶端斷開鏈接close,或者執行shutdownOutput操做時,服務器才知曉沒有數據了,不然會一直進行read操做;當客戶端在read操做獲取服務器的反饋時,若服務器沒有關閉鏈接或者shutdownInput時也會一直阻塞。示例代碼以下:
static final String ORIGINAL_FILE = "F:/1.png"; static final String OUTPUT_FILE = "F:/2.jpg";
public void server() throws Exception { // 打開TCP通道,綁定端口監聽 ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress(9988)); ByteBuffer buf = ByteBuffer.allocate(1024); // 獲取鏈接 SocketChannel accept = null; while ((accept= serverChannel.accept()) != null) { FileChannel fileChannel = FileChannel.open( Paths.get(OUTPUT_FILE), StandardOpenOption.CREATE, StandardOpenOption.WRITE); // 讀取客戶端的請求數據 while (accept.read(buf) != -1) { buf.flip(); fileChannel.write(buf); buf.clear(); } // 發送執行結果 buf.put("成功接收".getBytes()); buf.flip(); accept.write(buf); buf.clear(); fileChannel.close(); // 關閉鏈接,不然客戶端會一直等待讀取致使阻塞,可以使用shutdownInput,但任務已結束,該close accept.close(); } serverChannel.close(); }
public void client() throws Exception { // 打開一個socket通道 SocketChannel clientChannel = SocketChannel.open( new InetSocketAddress("127.0.0.1", 9988)); // 建立緩衝區和文件傳輸通道 FileChannel fileChannel = FileChannel.open(Paths.get(ORIGINAL_FILE), StandardOpenOption.READ); ByteBuffer buf = ByteBuffer.allocate(1024); while ( fileChannel.read(buf) != -1) { buf.flip(); clientChannel.write(buf); buf.clear(); } // 關閉輸出(不關閉通道),告知服務器已經發送完畢,去掉下面一行代碼服務區將一直讀取致使阻塞 clientChannel.shutdownOutput(); int len = 0; while ((len = clientChannel.read(buf)) != -1) { buf.flip(); System.out.println(new String(buf.array(), 0, len)); buf.clear(); } fileChannel.close(); clientChannel.close(); }
NIO 非阻塞式
經過在通道Channel中調用configureBlocking將blocking設置爲false,讓Channel能夠進行異步 I/O 操做。
public void client() throws Exception { // 打開一個socket通道 SocketChannel clientChannel = SocketChannel.open( new InetSocketAddress("127.0.0.1", 9988)); ByteBuffer buf = ByteBuffer.allocate(1024); // 告知服務器,已經發送完畢 // clientChannel.shutdownOutput(); // 設置非阻塞 clientChannel.configureBlocking(Boolean.FALSE); buf.put("哈哈".getBytes()); buf.flip(); clientChannel.write(buf); clientChannel.close(); }
public void server() throws Exception { // 打開TCP通道,綁定端口監聽 ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(Boolean.FALSE); serverChannel.bind(new InetSocketAddress(9988)); // 建立一個Selector用於管理Channel Selector selector = Selector.open(); // 將服務器的Channel註冊到selector中,並添加 OP_ACCEPT 事件,讓selector監聽通道的請求 serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 一直判斷是否有已經準備就緒的Channel while (selector.select() > 0) { // 存在一個已經準備就緒的Channel,獲取SelectionKey集合中獲取觸發該事件的全部key Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey sk = keys.next(); SocketChannel accept = null; ByteBuffer buffer = null; // 針對不一樣的狀態進行操做 if (sk.isAcceptable()) { // 可被鏈接,設置非阻塞並註冊到selector中 accept = serverChannel.accept(); accept.configureBlocking(Boolean.FALSE); accept.register(selector, SelectionKey.OP_READ); } else if (sk.isReadable()) { // 可讀,獲取該選擇器上的 Channel進行讀操做 accept = (SocketChannel) sk.channel(); buffer = ByteBuffer.allocate(1024); int len = 0; while ((len = accept.read(buffer)) != -1) { buffer.flip(); System.out.println(new String(buffer.array(), 0, len)); buffer.clear(); } } } // 移除本次操做的SelectionKey keys.remove(); } serverChannel.close(); }
方法使用說明
Pipe管道
Channel都是雙向通道傳輸,而Pipe就是爲了實現單向管道傳送的通道對,有一個source通道(Pipe.SourceChannel)和一個sink通道(Pipe.SinkChannel)。sink用於寫數據,source用於讀數據。直接使用Pipe.open()獲取Pipe對象,操做和FileChannel同樣。