參考:https://www.jianshu.com/p/f5941bcf3a2djava
將JDK封裝的IO操做再進行一層封裝數組
好處:安全
(1)使用了segement片斷管理的方式管理數據,以片斷做爲IO操做的單位,使IO操做吞吐率增長架構
(2)使用鏈表將Segment片斷聯繫起來管理,對於移動指針就能夠對數據進行管理,擴容方便異步
(3)使用SegmentPool對廢棄的片斷回收、複用和內存共享,從而減小內存的申請和減小了GC的頻率,性能獲得優化。socket
(4)封裝了輸出輸入的超時處理,着在JDK原生io操做中沒有ide
(5)具有了不少從字節到特定編碼格式或者數據類型的轉化方法,很方便函數
OKIO寫流程圖:性能
public static void main(String[] args) { File file = new File("hello.txt"); try { readString(new FileInputStream(file)); } catch (IOException e) { e.printStackTrace(); } } public static void readString(InputStream in) throws IOException { BufferedSource source = Okio.buffer(Okio.source(in)); //建立RealBufferedSource輸入流 String s = source.readUtf8(); //以UTF-8讀,就是經過執行回調(Okio.source(in)的read方法)往Buffer中寫source輸入流的數據,再返回Buffer存儲size的所有字節 System.out.println(s); source.close(); } public static void writeString(OutputStream out){ BufferedSink sink = Okio.buffer(Okio.sink(out)); //建立RealBufferedSink輸出流 sink.writeLone("test");//裏面往Buffer裏面寫「test」,再經過執行回調(Okio.sink(out)的write方法)將Buffer裏面的數據寫到輸出流sink裏 sink.close(); }
3.okio架構優化
兩接口分別定義了read()和wriete(),表明着輸入流和輸出流,
BufferedSource、BufferedSink接口:
分別繼承了Source和Sink,維護者Buffer,分別定義了一堆針對各類類型的讀和寫操做方法。
至關於一個操做的中介代理,內部都維護着一個Buffer,分別定義了進行具體操做。
存儲最大長度爲8K字節數組,該數組爲不可變字節序列,pos表明開始能夠讀的字節序號,limit表明能夠寫的字節序號,boolean shared表明該片斷是否共享,boolean owner表明本身是否能夠操做本片斷(與shared互斥),經過per、next造成鏈表(Buffer構建出雙向鏈表),方法固然有push()、pop()
因爲存儲的結果就是Head-new1-new2-new3-這樣子,新建new的前提僅僅是上一片斷(例如new3)的limit到Segment_size_max小於要存儲的字節數,若是很長時間會有一個segment只存了小部分數據,可是有不少segment,形成內存浪費。所以須要壓縮方法compact(),具體作法:先將上一片斷的數據從pos往前移,使pos從序號0開始,而後將本片斷的數據寫到上一片斷上,從limit開始,而後將本片斷拋出。
public void compact() { if (prev == this) throw new IllegalStateException();//若是上一節點是本身表明沒有上一節點 if (!prev.owner) return; // Cannot compact: prev isn't writable.//若是上一節點不能夠操做,返回 int byteCount = limit - pos;//記錄當前片斷存儲的字節長度L1 int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);//計算上一節點沒有存儲數據的長度L2(pos以前的長度+limit以後的長度,若是perv是共享的,那麼就不能加入pos以前的長度) if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.//若是L1>L2,那麼就不夠位置存,返回 writeTo(prev, byteCount);//將本部分數據寫入上一節點 pop();//彈出本節點 SegmentPool.recycle(this); }
/** Moves {@code byteCount} bytes from this segment to {@code sink}. */ public void writeTo(Segment sink, int byteCount) {//往sink部分寫入this(Segment)長度爲byteCount的數據 if (!sink.owner) throw new IllegalArgumentException(); if (sink.limit + byteCount > SIZE) { // We can't fit byteCount bytes at the sink's current position. Shift sink first. if (sink.shared) throw new IllegalArgumentException(); if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException(); System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);//先將sink本來存儲的數據往前移,pos設爲1 sink.limit -= sink.pos; sink.pos = 0; } System.arraycopy(data, pos, sink.data, sink.limit, byteCount);//將this(Segment)數據寫入sink中,從limit開始 sink.limit += byteCount; pos += byteCount; }
原理:將一個Segment分爲[pos...pos+byteCount]和[pos+byteCount...limit]兩部分,該方法只用在Buffer.wirte()上,注意的是byteCount參數的意義是兩個Segment相同的部分,若是相同的部分超多SHARE_MINIMUM=1024字節,就共享一個Segment,看起來咱們是new了一個Segment,可是裏面的data[]引用仍是原來的Segment,所以可以減小數據複製帶來的內存開銷,從而提升性能
public Segment split(int byteCount) { if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException(); Segment prefix; // We have two competing performance goals: // - Avoid copying data. We accomplish this by sharing segments. // - Avoid short shared segments. These are bad for performance because they are readonly and // may lead to long chains of short segments. // To balance these goals we only share segments when the copy will be large. if (byteCount >= SHARE_MINIMUM) { prefix = new Segment(this); } else { prefix = SegmentPool.take(); System.arraycopy(data, pos, prefix.data, 0, byteCount); } prefix.limit = prefix.pos + byteCount; pos += byteCount; prev.push(prefix); return prefix; }
該池經過一個segment變量將所有segment聯繫起來,存儲最多8個segment的片斷池,若是該池還未滿,一旦segment廢棄了就放到該池中,減小了GC。若是io操做須要新建segment的話就不用new申請內存空間了,直接從片斷池中獲取。
具體讀寫操做的執行類,內部維護着一個Segment head,經過該片斷就能夠獲取整個鏈表的全部數據,long size表明已經存儲的總字節數。實現了三個接口,讀、寫和Clone。
實現及好處:循環的雙向鏈表實現,鏈表節點爲Segment片斷,而Segment維護着固定長度的字節數組。這樣採起分片的連接方式,片中又經過數組存儲,兼具讀的連續性和寫的可插入性,對比於單一的用鏈表或者數組,是比較折中的方式。還有個好處就是根據需求來改動分片的大小來權衡讀寫的業務操做。
/** Returns a deep copy of this buffer. */ @Override public Buffer clone() { Buffer result = new Buffer(); if (size == 0) return result; result.head = new Segment(head); result.head.next = result.head.prev = result.head;//1 for (Segment s = head.next; s != head; s = s.next) { result.head.prev.push(new Segment(s)); } result.size = size; return result; }
Segment(Segment shareFrom) { this(shareFrom.data, shareFrom.pos, shareFrom.limit); shareFrom.shared = true; } Segment(byte[] data, int pos, int limit) { this.data = data; this.pos = pos; this.limit = limit; this.owner = false; this.shared = true; }
該Clone拷貝方法爲淺拷貝,可是咱們注意到雖然拷貝新建了一個Buffer,可是該Buffer的head變量的構造由new Segment(head)構建,細看該構造器中最重要的字節數組data引用指向了新的data,沒有從新分配新的內存空間,性能獲得優化。Segment的shared共享標示參數設爲true,結合Segment類不少設計到shared的方法,有不少好處。
同時注意到語句1,result.head.next = result.head.prev = result.head,這樣就造成了循環的鏈表中循環的特性了,以前分析Segment類不管增長數據add Segment或者讀取數據 popSegment都沒有造成循環閉環的特性,原來閉環(循環)的特性在Buffer類中造成。
除此以外,Buffer進行寫操做每次都必須先獲取segment,語句2也是造成閉環的緣由
Segment writableSegment(int minimumCapacity) { if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException(); if (head == null) { head = SegmentPool.take(); // Acquire a first segment. return head.next = head.prev = head;//2 } Segment tail = head.prev; if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {//須要的長度若是從limit上再追加大於了Segment的最大大小了 tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.//使得head.prev每次都指向新push的segment } return tail; }
具體分爲int(4)、long(8)、short(2)的等等數據類型的讀和寫,須要注意的是各類數據類型須要多少個字節存儲,讀和寫的參數分爲直接是字節數組或者是source、sink、輸入流、輸出流等等,大同小異。
若是爲String數據類型做爲輸入或者輸出時,編碼格式能夠選擇,utf-8
以md五、sha一、sha256數字摘要輸出Buffer裏面存儲的數據。
其實就是一個不可變的字節數組,以特定的編碼格式進行解碼,支持的解碼方式有utf-8,base64和hex。
該字段其實就是不可變的意思,在Segment存儲data仍是ByteString裏面data都設了該字段。
定義:
類不被拓展(繼承或者實現)
全部域都是final的,全部域都是private
沒有任何提供改變該類狀態的方法
好處:固然是線程安全的,不用考慮同步的問題(final的語義:當構造函數結束時,final定義的值被保證當其餘線程訪問該對象時,該值是可見的)
壞處:須要建立大量類的對象(。。。)
超時機制:意義在於防止IO操做阻塞在某個異常上,okio的超時機制的基礎就是採起同步超時TimeOut。
同步:TimeOut類
使用在以任何inputStream或者outPutStream做爲輸入流輸出流參數構建時,使用同步超時機制。
其實經過對OKIO的簡單使用,咱們知道讀寫操做的具體操做都在構建輸入輸出流的的回調方法。
如下sink構建輸出流代碼爲例(source(final InPutStream in)構建輸入流在超時處理上也同樣執行了語句1,timeOut.throwInReached(),不拓展了)
OKIO:
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}
private static Sink sink(final OutputStream out, final Timeout timeout) { if (out == null) throw new IllegalArgumentException("out == null"); if (timeout == null) throw new IllegalArgumentException("timeout == null"); return new Sink() { @Override public void write(Buffer source, long byteCount) throws IOException { checkOffsetAndCount(source.size, 0, byteCount); while (byteCount > 0) { timeout.throwIfReached();//1 Segment head = source.head; int toCopy = (int) Math.min(byteCount, head.limit - head.pos); out.write(head.data, head.pos, toCopy); head.pos += toCopy; byteCount -= toCopy; source.size -= toCopy; if (head.pos == head.limit) { source.head = head.pop(); SegmentPool.recycle(head); } } } @Override public void flush() throws IOException { out.flush(); } @Override public void close() throws IOException { out.close(); } @Override public Timeout timeout() { return timeout; } @Override public String toString() { return "sink(" + out + ")"; } }; }
TimeOut:
public void throwIfReached() throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException("thread interrupted"); } if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) { throw new InterruptedIOException("deadline reached"); } }
異步:AsyncTimeOut,
使用在socket做爲輸入輸出流時,使用異步超時處理,之因此在socket上使用異步超時處理,由於socket自身性質決定,socket經常由於自身阻塞致使往下不執行。
okio:與其餘inputStream不一樣的是,他不直接返回source,而是做爲AsyncTime.source()的參數再進行一次封裝,返回source
public static Source source(Socket socket) throws IOException { if (socket == null) throw new IllegalArgumentException("socket == null"); AsyncTimeout timeout = timeout(socket); Source source = source(socket.getInputStream(), timeout); return timeout.source(source); }
AsyncTimeOut:咱們發現這裏纔是執行的準確位置,語句1就是涉及到異步超時處理機制。
在開始讀或者寫以前執行enter(),該方法就是開啓一個WatchDog線程,待分析。。。
public final Source source(final Source source) { return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { boolean throwOnTimeout = false; enter();//1 try { long result = source.read(sink, byteCount); throwOnTimeout = true; return result; } catch (IOException e) { throw exit(e); } finally { exit(throwOnTimeout); } } @Override public void close() throws IOException { boolean throwOnTimeout = false; try { source.close(); throwOnTimeout = true; } catch (IOException e) { throw exit(e); } finally { exit(throwOnTimeout); } } @Override public Timeout timeout() { return AsyncTimeout.this; } @Override public String toString() { return "AsyncTimeout.source(" + source + ")"; } }; }