okio
是Square開源框架之一,它對java.io
和java.nio
作了補充,使訪問,存儲和數據處理變得更加容易。它最先是Okhttp
組件之一。 html
Okio
主要圍繞ByteString
與Buffer
這兩個類展開,其主要功能都封裝在這兩個類中:java
ByteString
:是一個相似String
的不可變類,它能夠很容易的在byte
與String
之間進行轉換。該類提供了編/解碼爲hex,md5,base64及UTF-8等方法。Buffer
:是一個可變的字節序列。 與ArrayList
同樣,無需提早調整緩衝區大小。 Buffer
內部維護了一個雙向鏈表,從鏈表尾部寫入數據,頭部讀取數據。 ByteString
和Buffer
作了一些節省CPU和內存的操做。 若是將一個字符串編碼爲ByteString
,ByteString
就會緩存對該字符串的引用(以空間換時間),這樣若是之後對其進行編/解碼等操做,則無需在byte
與String
之間進行轉換。node
//字符串對應的字節數據,避免再一次轉換
final byte[] data;
//字符串
transient String utf8; // Lazily computed.
複製代碼
Buffer
內部維護了一個以Segment
爲節點的雙向鏈表。 當數據從一個Buffer
移動到另外一個Buffer
時,僅須要進行一次數據拷貝,且它會從新分配Segment
的全部權,而不是從新建立Segment
對象。git
Okio
包含本身的流類型,稱爲Source
和Sink
,其工做方式雖然相似InputStream
和OutputStream
,但它與Java I/O相比具備如下優點(參考自Android學習筆記——Okio):github
Okio
實現了I/O讀寫的超時機制(Timeout
),防止讀寫出錯從而致使一直阻塞。OKio
精簡了輸入輸出流的類個數Segment
和SegmentPool
複用機制ByteString
處理不變byte
,Buffer
處理可變byte
。OKio
支持md五、sha、base64等數據處理 Source
、Sink
能夠與InputStream
、OutputStream
互相操做。咱們能夠將任何Source
視爲InputStream
,也能夠將任何InputStream
視爲Source
。一樣適用於Sink
和InputStream
。數組
前面簡單介紹了Okio
,下面就來看看如何使用。緩存
//okio實現圖片複製
public void copyImage(File sinkFile, File sourceFile) throws IOException {
//try裏面的代碼是Okio的標準寫法,不能改變
try (Sink sink = Okio.sink(sinkFile);
BufferedSink bufferedSink = Okio.buffer(sink);
//從文件讀取數據
Source source = Okio.source(sourceFile);
BufferedSource bufferedSource = Okio.buffer(source)) {
//圖片複製
bufferedSink.write(bufferedSource.readByteArray());
//設置超時時間爲1秒中,
sink.timeout().deadline(1, TimeUnit.SECONDS);
//寫入數據,將字符串以UTF-8格式寫入,Okio專門針對utf-8作了處理
bufferedSink.writeUtf8(entry.getKey())
.writeUtf8("=")
.writeUtf8(entry.getValue())
.writeUtf8("\n");
//讀取數據
String str=bufferedSource.readUtf8();
//讀取數據並返回一個ByteString
ByteStringstr=bufferedSource.readByteString();
}
}
複製代碼
正如前面所說的那樣,Okio
使用起來很是方便。因爲Java字符串採用的是UTF-16編碼,而通常開發中使用的都是UTF-8編碼,因此Okio
對字符串編碼作了特殊處理。框架
Source
的意思是水源,它對應着輸入流,在Okio
中經過Okio.source
方法來得到一個Source
對象。異步
//在Okio這個類中關於source重載的方法仍是蠻多的,這裏以文件爲例
public static Source source(File file) throws FileNotFoundException {
if (file == null) throw new IllegalArgumentException("file == null");
return source(new FileInputStream(file));
}
public static Source source(InputStream in) {
return source(in, new Timeout());
}
private static Source source(final InputStream in, final Timeout timeout) {
...
//這裏纔是真正讀去數據的地方
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
...
try {
//每次寫數據時都先檢查是否超時,默認未設置超時
timeout.throwIfReached();
//獲取鏈表的尾節點
Segment tail = sink.writableSegment(1);
//因爲每一個Segment的SIZE爲8KB,因此每一次拷貝不能超過這個值
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
//經過InputStream讀取數據
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
//數據讀取完畢
if (bytesRead == -1) return -1;
//可寫取位置日後移
tail.limit += bytesRead;
//讀取的總字節數
sink.size += bytesRead;
//返回當前讀取的字節數
return bytesRead;
} catch (AssertionError e) {
...
}
}
...
};
}
複製代碼
能夠發現,這個的Source
是一個匿名對象。獲得Source
對象後,經過Okio.buffer
方法將該對象傳遞給BufferedSource
,BufferedSource
是一個接口,它的具體實現類是RealBufferedSource
。socket
在上面例子中是調用RealBufferedSource
的readByteArray
方法來讀取數據,下面就來看這個方法的實現。
//RealBufferedSource對應的Buffer
public final Buffer buffer = new Buffer();
@Override public byte[] readByteArray() throws IOException {
//將數據寫入buffer
buffer.writeAll(source);
//將全部數據已字節數組形式返回
return buffer.readByteArray();
}
複製代碼
在readByteArray
方法中會首先將數據寫入到Buffer
中,並生成一個雙向鏈表。
@Override public long writeAll(Source source) throws IOException {
if (source == null) throw new IllegalArgumentException("source == null");
long totalBytesRead = 0;
//這裏的source就是前面在Okio中建立的匿名Source對象
for (long readCount; (readCount = source.read(this, Segment.SIZE)) != -1; ) {
totalBytesRead += readCount;
}
return totalBytesRead;
}
複製代碼
將數據寫入Buffer
後,調用Buffer
的readByteArray
方法生成一個字節數組並返回。
@Override
public byte[] readByteArray() {
try {
//在讀取數據時,就會獲得size的大小
return readByteArray(size);
} catch (EOFException e) {
throw new AssertionError(e);
}
}
@Override
public byte[] readByteArray(long byteCount) throws EOFException {
checkOffsetAndCount(size, 0, byteCount);
...
//建立一個大小爲size的byte數組
byte[] result = new byte[(int) byteCount];
//將讀取的數據寫入這個數組中
readFully(result);
return result;
}
@Override
public void readFully(byte[] sink) throws EOFException {
int offset = 0;
while (offset < sink.length) {
//不斷的將數據寫入sink數組中
int read = read(sink, offset, sink.length - offset);
if (read == -1) throw new EOFException();
offset += read;
}
}
@Override
public int read(byte[] sink, int offset, int byteCount) {
checkOffsetAndCount(sink.length, offset, byteCount);
Segment s = head;
if (s == null) return -1;
int toCopy = Math.min(byteCount, s.limit - s.pos);
//進行數據拷貝
System.arraycopy(s.data, s.pos, sink, offset, toCopy);
s.pos += toCopy;
size -= toCopy;
//釋放Segment並將其放入緩衝池
if (s.pos == s.limit) {
head = s.pop();
SegmentPool.recycle(s);
}
return toCopy;
}
複製代碼
這樣就將數據寫入到一個新的數組中,並將鏈表中的全部Segment
從新初始化並放入池中。
Sink
的意思是水槽,它對應着輸出流。經過Okio.sink
來獲取一個Sink
對象。
public static Sink sink(File file) throws FileNotFoundException {
if (file == null) throw new IllegalArgumentException("file == null");
return sink(new FileOutputStream(file));
}
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}
private static Sink sink(final OutputStream out, final Timeout timeout) {
...
//建立一個匿名Sink對象
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
//寫入數據
while (byteCount > 0) {
//每次寫數據時都先檢查是否超時,默認未設置超時
timeout.throwIfReached();
//獲取頭結點
Segment head = source.head;
//能copy的最小字節
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
//經過OutputStream來寫入數據
out.write(head.data, head.pos, toCopy);
//可讀取的位置向後移動
head.pos += toCopy;
//減小可寫入的字節數
byteCount -= toCopy;
//減小buffer中字節數
source.size -= toCopy;
//達到最大可寫的位置
if (head.pos == head.limit) {
//釋放節點
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
...
};
}
複製代碼
得到Sink
對象後,將該對象傳遞給BufferedSink
,BufferedSink
是一個接口,它的具體實現是RealBufferedSink
。
public static BufferedSink buffer(Sink sink) {
return new RealBufferedSink(sink);
}
複製代碼
在3.1節中講了經過InputStream
讀取數據並返回一個字節數組。這裏就將這個數組經過RealBufferedSink
的write
方法寫入到新的文件中。
@Override public BufferedSink write(byte[] source) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.write(source);
return emitCompleteSegments();
}
複製代碼
寫入數據跟讀取數據流程基本上同樣,須要先將數據寫入到Buffer
中。
@Override
public Buffer write(byte[] source) {
if (source == null) throw new IllegalArgumentException("source == null");
return write(source, 0, source.length);
}
@Override
public Buffer write(byte[] source, int offset, int byteCount) {
...
int limit = offset + byteCount;
while (offset < limit) {
Segment tail = writableSegment(1);
int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
//進行數據拷貝
System.arraycopy(source, offset, tail.data, tail.limit, toCopy);
offset += toCopy;
tail.limit += toCopy;
}
size += byteCount;
return this;
}
複製代碼
前面說過Buffer
維護的是一個鏈表,因此這裏也是將數據寫入一個鏈表中,因爲在數據讀取完畢後會將Segment
對象從新初始化並放入到池中,因此這裏就不用建立新的Segment
對象,直接從池中獲取便可。在寫入Buffer
成功後,再調用emitCompleteSegments
方法,該方法就是將數據從Buffer
寫入到新文件。
@Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
複製代碼
這裏的Sink
就是在Okio
中建立的匿名對象,在Sink
對象中經過OutputStream
將數據寫入到新文件。
整體流程以下。
Segment
是Okio
中很是重要的一環,它能夠說是Buffer
中數據的載體。容量是8kb,頭結點爲head。
final class Segment {
//Segment的容量,最大爲8kb
static final int SIZE = 8192;
//若是Segment中字節數 > SHARE_MINIMUM時(大Segment),就能夠共享,不能添加到SegmentPool
static final int SHARE_MINIMUM = 1024;
//存儲的數據
final byte[] data;
//下一次讀取的開始位置
int pos;
//寫入的開始位置
int limit;
//當前Segment是否能夠共享
boolean shared;
//data是否僅當前Segment獨有,不share
boolean owner;
//後繼節點
Segment next;
//前驅節點
Segment prev;
...
//移除當前Segment
public final @Nullable Segment pop() {
Segment result = next != this ? next : null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
return result;
}
//在當前節點後添加一個新的節點
public final Segment push(Segment segment) {
segment.prev = this;
segment.next = next;
next.prev = segment;
next = segment;
return segment;
}
//將當前Segment分裂成2個Segment結點。前面結點pos~limit數據範圍是[pos..pos+byteCount),後面結點pos~limit數據範圍是[pos+byteCount..limit)
public final Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
//若是字節數大於SHARE_MINIMUM則拆分紅共享節點
if (byteCount >= SHARE_MINIMUM) {
prefix = sharedCopy();
} else {
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}
//當前Segment結點和prev前驅結點合併成一個Segment,統一合併到prev,而後當前Segment結點從雙向鏈表移除並添加到SegmentPool複用。固然合併的前提是:2個Segment的字節總和不超過8K。合併後可能會移動pos、limit
public final void compact() {
if (prev == this) throw new IllegalStateException();
if (!prev.owner) return; // Cannot compact: prev isn't writable.
int byteCount = limit - pos;
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
writeTo(prev, byteCount);
pop();
SegmentPool.recycle(this);
}
//從當前節點移動byteCount個字節到sink中
public final void writeTo(Segment sink, int byteCount) {
if (!sink.owner) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
// We can't fit byteCount bytes at the sink's current position. Shift sink first.
if (sink.shared) throw new IllegalArgumentException();
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
}
複製代碼
SegmentPool
是一個Segment
池,內部維護了一個Segment
單向鏈表,容量爲64kb(8個Segment
),回收不用的Segment
對象。
final class SegmentPool {
//SegmentPool的最大容量
static final long MAX_SIZE = 64 * 1024; // 64 KiB.
//後繼節點
static Segment next;
//當前池內的總字節數
static long byteCount;
private SegmentPool() {
}
//從池中獲取一個Segment對象
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
//將Segment狀態初始化並放入池中
static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
}
複製代碼
當從InputStream
中讀數據時,讀取的數據會寫進以Segment
爲節點的雙向鏈表中。若是Segment
容量不夠(容量大於8kb),就會從SegmentPool
中take
一個Segment
對象並添加到雙向鏈表尾部。
當經過OutputStrem
寫數據時,會從雙向鏈表的head
節點開始讀取,當Segment
中的數據讀取完畢後,就會將該Segment
從雙向鏈表中移除,並回收到SegmentPool
中,等待下次複用。
Okio
的亮點之一就是增長了超時機制,防止由於意外致使I/O一直阻塞的問題,默認的超時機制是同步的。AsyncTimeout
是Okio
中異步超時機制的實現,它是一個單鏈表,結點按等待時間從小到大排序,head是一個頭結點,起佔位做用。使用了一個WatchDog
的後臺線程來不斷的遍歷全部節點,若是某個節點超時就會將該節點從鏈表中移除,並關閉Socket
。
AsyncTimeout
提供了3個方法enter
、exit
、timeout
,分別用於流操做開始、結束、超時三種狀況調用。
public class AsyncTimeout extends Timeout {
//頭結點,佔位使用
static
AsyncTimeout head;
//是否在鏈表中
private boolean inQueue;
//後繼節點
private
AsyncTimeout next;
//超時時間
private long timeoutAt;
//把當前AsyncTimeout對象加入節點
public final void enter() {
...
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
private static synchronized void scheduleTimeout( AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
//建立佔位頭結點並開啓子線程
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
...
//插入到鏈表中,按照時間長短進行排序,等待事件越長越靠後
for (AsyncTimeout prev = head; true; prev = prev.next) {
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break;
}
}
}
//從鏈表中移除節點
public final boolean exit() {
if (!inQueue) return false;
inQueue = false;
return cancelScheduledTimeout(this);
}
//執行真正的移除操做
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
// Remove the node from the linked list.
for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
if (prev.next == node) {
prev.next = node.next;
node.next = null;
return false;
}
}
// The node wasn't found in the linked list: it must have timed out!
return true;
}
//在子類中重寫了該方法,主要是進行socket的關閉
protected void timedOut() {
}
//監聽節點是否超時的子線程
private static final class Watchdog extends Thread {
Watchdog() {
super("Okio Watchdog");
setDaemon(true);
}
public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();
//表明頭結點的後繼節點已超時,
if (timedOut == null) continue;
//除頭結點外沒有任何其餘節點
if (timedOut == head) {
head = null;
return;
}
}
//關閉socket
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
static AsyncTimeout awaitTimeout() throws InterruptedException {
AsyncTimeout node = head.next;
//除了頭結點外沒有任何其餘節點
if (node == null) {
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // The idle timeout elapsed.
: null; // The situation has changed.
}
long waitNanos = node.remainingNanos(System.nanoTime());
//進行等待
if (waitNanos > 0) {
//等待
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
return null;
}
//表明node節點已超時
head.next = node.next;
node.next = null;
return node;
}
}
複製代碼
默認都是未設置超時時間的,須要咱們本身來設置,同步及異步的超時時間設置方式是同樣的,經過下面代碼便可。
sink.timeout().deadline(1, TimeUnit.SECONDS);
source.timeout().deadline(1,TimeUnit.MILLISECONDS);
複製代碼
在Okio
中可使用Pipe
來實現一個生產者/消費者模型。Pipe
維護了一個必定大小Buffer
。當該Buffer
容量達到最大時,線程就會等待直到該Buffer
有剩餘的空間。
public final class Pipe {
//Pipe的最大容量
final long maxBufferSize;
//Pipe對應的Buffer
final Buffer buffer = new Buffer();
boolean sinkClosed;
boolean sourceClosed;
//寫入流,對應着生產者
private final Sink sink = new PipeSink();
//讀取流,對應着消費者
private final Source source = new PipeSource();
public Pipe(long maxBufferSize) {
//最大容量不能小於1
if (maxBufferSize < 1L) {
throw new IllegalArgumentException("maxBufferSize < 1: " + maxBufferSize);
}
this.maxBufferSize = maxBufferSize;
}
...
//寫入數據到Pipe中
final class PipeSink implements Sink {
final Timeout timeout = new Timeout();
@Override public void write(Buffer source, long byteCount) throws IOException {
synchronized (buffer) {
...
while (byteCount > 0) {
...
long bufferSpaceAvailable = maxBufferSize - buffer.size();
if (bufferSpaceAvailable == 0) {
//buffer中,沒有剩餘空間,等待消費者消費
timeout.waitUntilNotified(buffer); // Wait until the source drains the buffer.
continue;
}
long bytesToWrite = Math.min(bufferSpaceAvailable, byteCount);
buffer.write(source, bytesToWrite);
byteCount -= bytesToWrite;
//通知buffer,有新的數據了,
buffer.notifyAll(); // Notify the source that it can resume reading.
}
}
}
...
}
//從Pipe中讀取數據
final class PipeSource implements Source {
final Timeout timeout = new Timeout();
@Override public long read(Buffer sink, long byteCount) throws IOException {
synchronized (buffer) {
...
while (buffer.size() == 0) {
if (sinkClosed) return -1L;
//Pipe中沒有數據,等待生產者寫入
timeout.waitUntilNotified(buffer); // Wait until the sink fills the buffer.
}
long result = buffer.read(sink, byteCount);
buffer.notifyAll(); // Notify the sink that it can resume writing.
return result;
}
}
...
}
}
複製代碼
Pipe
的代碼仍是比較少的。下面就來如何使用Pipe
。
public void pipe() throws IOException {
//設置Pipe的容量爲1024字節,即1kb
Pipe pipe = new Pipe(1024);
new Thread(new Runnable() {
@Override
public void run() {
try (BufferedSource bufferedSource = Okio.buffer(pipe.source())) {
//將Pipe中數據寫入env4.txt這個文件中
bufferedSource.readAll(Okio.sink(new File("file/env4.txt")));
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try (BufferedSink bufferedSink = Okio.buffer(pipe.sink())) {
//將env3.txt中數據寫入到Pipe中
bufferedSink.writeAll(Okio.source(new File("file/env3.txt")));
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
複製代碼
雖然沒有將Okio
的所有功能點一一講解(如GZip
),但通過前面梳理想必對Okio
有了一個比較全面的瞭解。這對在之後的開發中熟練的使用Okio
很是有幫助。須要注意的是,雖然Okio
很好用,但Okio
是在Java I/O、nio的基礎上作了封裝、優化,並不具有非阻塞I/O的特性。關於非阻塞I/O能夠去學習netty這個庫。
【參考資料】