Okio 主要是替代 java.io 和 java.nio 那一套複雜的 api 調用框架,讓數據訪問、存儲和處理更加便捷。java
Okio 是 OkHttp 的基石,而 OkHttp 是 Retrofit 的基石。這三個框架合稱『Square 安卓平臺網絡層三板斧』node
參考 OkioTestgit
……
@Test public void readWriteFile() throws Exception {
File file = temporaryFolder.newFile();
BufferedSink sink = Okio.buffer(Okio.sink(file));
sink.writeUtf8("Hello, java.io file!");
sink.close();
assertTrue(file.exists());
assertEquals(20, file.length());
BufferedSource source = Okio.buffer(Okio.source(file));
assertEquals("Hello, java.io file!", source.readUtf8());
source.close();
}
……
複製代碼
經過 OkioTest 能夠大體明白 Okio 主要有 『讀』、『寫』兩大類操做。能夠操做的對象爲:github
1. 文件
2. 文件路徑描述類 Path
3. Socket
4. OutputStream
5. InputStream
複製代碼
Okio 經過 sink(xxx) 去寫一個對象,經過 source(xxx)去讀一個對象。api
經過 Okio.buffer() 得到用來讀寫的 BufferedSource、BufferedSink緩存
BufferedSink sink = Okio.buffer(Okio.sink(file));
BufferedSource source = Okio.buffer(Okio.source(file));
複製代碼
進一步查看 buffer() 方法網絡
public static BufferedSource buffer(Source source) {
return new RealBufferedSource(source);
}
public static BufferedSink buffer(Sink sink) {
return new RealBufferedSink(sink);
}
複製代碼
看下 RealBufferedSink 類框架
final class RealBufferedSink implements BufferedSink {
public final Buffer buffer = new Buffer();
public final Sink sink;
boolean closed;
RealBufferedSink(Sink sink) {
if (sink == null) throw new NullPointerException("sink == null");
this.sink = sink;
}
……
}
複製代碼
RealBufferedSink 實現了 BufferedSink 接口,BufferedSink 實現了 Sink 接口。異步
而 Sink 實現了 Closeable, Flushable 接口。socket
1. Flushable 接口只定義了一個方法 flush() ,用來實現把數據從緩存區寫入到底層輸入流。
2. Closeable 接口定義 close() ,用來關閉流。
3. Sink 接口又定義了一個 write(Buffer source, long byteCount) 和 timeout() 用來寫入數據和設置超時。
4. BufferedSink 接口則定義了一堆 wirteXXX(……) 用來操做不一樣類型的數據寫入。
複製代碼
在看 RealBufferedSink 的成員變量
public final Buffer buffer = new Buffer();
public final Sink sink;
boolean closed;
複製代碼
這裏出現了一個 Buffer 對象,一個從構造函數裏面傳入的 Sink 對象,以及一個用來記錄流是否關閉的 boolean 標誌。
RealBufferedSink 的各類 wirteXXX(……)大都以下
@Override public BufferedSink writeXXX(……) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeXXX(……);
return emitCompleteSegments();
}
複製代碼
可見寫入數據的方法,都是有 buffer 對象實現。而在 emitXXX() 和 flush() 方法中則調用了 sink.write(buffer, byteCount) 和 sink.flush()
RealBufferedSource 和 RealBufferedSink 相似
final class RealBufferedSource implements BufferedSource {
public final Buffer buffer = new Buffer();
public final Source source;
boolean closed;
RealBufferedSource(Source source) {
if (source == null) throw new NullPointerException("source == null");
this.source = source;
}
}
複製代碼
RealBufferedSource 實現了 BufferedSource 接口,BufferedSource 實現了 Source 接口。
Source 接口一樣也實現了 Closeable 接口。
1. Source 集成了 Closeable 接口,表示 Source 提供了一個 close 方法關閉讀取數據的流。
2. Source 定義了一個 read(Buffer sink, long byteCount) 用來讀取數據,一個 timeout() 方法用來設置讀取超時。
3. BufferedSource 定義了不少 readXXX(……) 用來讀取數據。
複製代碼
RealBufferedSource 中的 readXXX(……) 方法和 RealBufferedSink 中的 writeXXX(……) 相似,都是經過成員變量 buffer 和 構造對象時傳入的 Source 對象配合起來讀取數據。
總結一下整個讀寫框架的結構以下:
不管是 File 、Path、InputStream、OutputStream 、Socket ,在 Okio 框架中只要一個簡單的 Okio.sink(……) 方法便可得到對應的輸入流(RealBufferedSink)和輸出流(RealBufferedSource)
並且 Okio 還給輸入/輸出流的都提供一個額外參數:Timeout,用來設置讀寫的超時設置。
全部的 sink 方法,都會調用到
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
@Override public void flush() throws IOException {
out.flush();
}
@Override public void close() throws IOException {
out.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "sink(" + out + ")";
}
};
}
複製代碼
Okio.sink() 會建立一個匿名內部類的實例,實現了 write 方法,用來寫入數據到 OutputStream(File 、Path、Socket 都會被轉成成 OutputStream(),每次寫入數據的時候,都會檢測是否超時。(超時機制後面後講)
Okio.Source() 相似會建立一個實現 Source 接口的匿名內部類實例。實現 read 方法 ,負責從 InputStream 中讀取數據。
Okio 在讀寫數據的時候,裏面都會用用一個 Segment 對象。Segment 是 Okio 定義的一個***鏈表結構***的數據片斷,每一個 Segment 能夠最多存放 8K 的字節。
寫數據的時候 Okio 會先把數據寫到 buffer 中
BufferedSink sink = Okio.buffer(Okio.sink(file));
sink.writeUtf8("Hello, java.io file!");
sink.close();
複製代碼
Okio.buffer() 返回的是 RealBufferedSink
@Override public BufferedSink writeUtf8(String string) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeUtf8(string);
return emitCompleteSegments();
}
複製代碼
查看 writeUtf8
@Override public Buffer writeUtf8(String string) {
return writeUtf8(string, 0, string.length());
}
複製代碼
而後把 String 變成一個 Segment 鏈表
@Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
……
// Transcode a UTF-16 Java String to UTF-8 bytes.
for (int i = beginIndex; i < endIndex;) {
int c = string.charAt(i);
if (c < 0x80) {
Segment tail = writableSegment(1);
byte[] data = tail.data;
……
while (i < runLimit) {
c = string.charAt(i);
if (c >= 0x80) break;
data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
}
……
} else if (c < 0x800) {
// Emit a 11-bit character with 2 bytes.
writeByte(c >> 6 | 0xc0); // 110xxxxx
writeByte(c & 0x3f | 0x80); // 10xxxxxx
i++;
} ……
}
return this;
}
複製代碼
經過 writableSegment 是否是要開闢新的 Segment 到隊列尾部
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
if (head == null) {
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
Segment tail = head.prev;
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
複製代碼
在看 emitCompleteSegments()
@Override
public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
複製代碼
buffer.completeSegmentByteCount() 用來計算 Segment 的緩存的字節長度
public long completeSegmentByteCount() {
long result = size;
if (result == 0) return 0;
// Omit the tail if it's still writable.
Segment tail = head.prev;
if (tail.limit < Segment.SIZE && tail.owner) {
result -= tail.limit - tail.pos;
}
return result;
}
複製代碼
sink.write(buffer, byteCount) 就是以前傳入的集成的 Sink 匿名類。
總結一下整個流程
讀數據的時候 Buffer 起到的做用相似,直接貼一下流程圖
Okio 能夠給他 OutputStream 、InputStream 增長一個超市設置。讀寫文件時會設置一個默認的 TimeOut ,這個方法是個空實現。
在讀寫 Socket 的時候,Okio 給咱們展現一個如何設置一個異步的超時機制,用來在 Socket 讀寫超時時關閉流。
public static Sink sink(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
AsyncTimeout timeout = timeout(socket);
Sink sink = sink(socket.getOutputStream(), timeout);
return timeout.sink(sink);
}
複製代碼
private static AsyncTimeout timeout(final Socket socket) {
return new AsyncTimeout() {
@Override
protected IOException newTimeoutException(@Nullable IOException cause) {
……
}
@Override
protected void timedOut() {
try {
socket.close();
}……
}
};
}
複製代碼
這裏看出會返回一個 AsyncTimeout 的匿名對象,主要在 timeOut() 中關閉 Socket。
private static Sink sink(final OutputStream out, final Timeout timeout) {
……
return new Sink() {
@Override
public void write(Buffer source, long byteCount) throws IOException {
……
while (byteCount > 0) {
timeout.throwIfReached();
……
}
}
……
};
}
複製代碼
在看一下 throwIfReached 方法
public void throwIfReached() throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
複製代碼
若是 hasDeadline 是 true,而且 deadlineNanoTime 大於 System.nanoTime() 來判斷是否達超時。
public final Sink sink(final Sink sink) {
return new Sink() {
@Override
public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0L) {
// Count how many bytes to write. This loop guarantees we split on a segment boundary.
long toWrite = 0L;
for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
int segmentSize = s.limit - s.pos;
toWrite += segmentSize;
if (toWrite >= byteCount) {
toWrite = byteCount;
break;
}
}
// Emit one write. Only this section is subject to the timeout.
boolean throwOnTimeout = false;
enter();
try {
sink.write(source, toWrite);
byteCount -= toWrite;
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
}
@Override
public void flush() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.flush();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public void close() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.close();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public Timeout timeout() {
return AsyncTimeout.this;
}
@Override
public String toString() {
return "AsyncTimeout.sink(" + sink + ")";
}
};
}
複製代碼
能夠看出 timeout.sink(sink) 從新包裝了 Sink 給 Sink 的每一個方法都增長一個 enter() 方法
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
複製代碼
這裏會發現若是知足了條件,會執行 scheduleTimeout 方法。可是默認狀況下,條件不會被知足。
查看一下 SocketTimeoutTest
@Test
public void readWithoutTimeout() throws Exception {
Socket socket = socket(ONE_MB, 0);
BufferedSource source = Okio.buffer(Okio.source(socket));
source.timeout().timeout(5000, TimeUnit.MILLISECONDS);
source.require(ONE_MB);
socket.close();
}
複製代碼
這裏能夠看到,須要調用 source.timeout().timeout(5000, TimeUnit.MILLISECONDS)
public Timeout timeout(long timeout, TimeUnit unit) {
if (timeout < 0) throw new IllegalArgumentException("timeout < 0: " + timeout);
if (unit == null) throw new IllegalArgumentException("unit == null");
this.timeoutNanos = unit.toNanos(timeout);
return this;
}
複製代碼
這裏能夠看到 timeoutNanos 在這裏賦值了。因此設置 timeout(5000, TimeUnit.MILLISECONDS) 後會出發 scheduleTimeout(this, timeoutNanos, hasDeadline)
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
……
// Insert the node in sorted order.
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break;
}
}
}
複製代碼
這裏用到一個同步鎖、啓動一個 Watchdog 線程。而且根據 timeout 的超時時間,把 AsyncTimeout 添加到一個任務隊列中。
private static final class Watchdog extends Thread {
Watchdog() {
super("Okio Watchdog");
setDaemon(true);
}
public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();
// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;
// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {
head = null;
return;
}
}
// Close the timed out node.
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
複製代碼
Watchdog 線程會一直同步遍歷任務隊列執行 awaitTimeout()
static @Nullable
AsyncTimeout awaitTimeout() throws InterruptedException {
// Get the next eligible node.
AsyncTimeout node = head.next;
// The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
if (node == null) {
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // The idle timeout elapsed.
: null; // The situation has changed.
}
long waitNanos = node.remainingNanos(System.nanoTime());
// The head of the queue hasn't timed out yet. Await that.
if (waitNanos > 0) {
// Waiting is made complicated by the fact that we work in nanoseconds,
// but the API wants (millis, nanos) in two arguments.
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
return null;
}
// The head of the queue has timed out. Remove it.
head.next = node.next;
node.next = null;
return node;
}
複製代碼
} 這裏會根據隊列頭部的 AsyncTimeout 節點,計算出剩餘時間,而後執行 AsyncTimeout.class.wait(waitMillis, (int) waitNanos) 方法阻塞。
注意這個的 wait(timeout) 會被 AsyncTimeout.class.notify() 喚醒。
複製代碼
若是任務隊列爲空會執行 AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS) ,等待一分鐘。而後再判斷是否有新的任務。