你好,我是 N0tExpectErr0r,一名熱愛技術的 Android 開發java
個人我的博客:blog.N0tExpectErr0r.cnnode
從前面的 OkHttp 源碼解析中咱們能夠知道,OkHttp 中的 I/O 都不是經過咱們平時所使用的 Input/Output Stream 來實現,而是使用了 Okio 這個第三方庫,那它與尋常的 IOStream 有什麼區別呢?讓咱們來分析一下它的源碼。數組
Okio 中有兩個很是重要的接口——Sink
以及 Source
,它們都繼承了 Closeable
,其中 Sink
對應了咱們原來所使用的 OutputStream
,而 Source
則對應了咱們原來所使用的 InputStream
。緩存
Okio 的入口就是Okio
類,它是一個工廠類,能夠經過它內部的一些 static 方法來建立 Sink
、Source
等對象。安全
Sink
實際上只是一個接口,讓咱們看看 Sink
中有哪些方法:bash
public interface Sink extends Closeable, Flushable {
void write(Buffer source, long byteCount) throws IOException;
@Override void flush() throws IOException;
Timeout timeout();
@Override void close() throws IOException;
}
複製代碼
能夠看到,它主要包含了 write
、flush
、timeout
、close
這幾個方法,咱們能夠經過 Okio.sink
方法基於 OutputStream
獲取一個 Sink
:less
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
@Override public void flush() throws IOException {
out.flush();
}
@Override public void close() throws IOException {
out.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "sink(" + out + ")";
}
};
}
複製代碼
這裏構建並實現了一個 Sink
的匿名內部類並返回,主要實現了它的 write
方法,剩餘方法都是簡單地轉調到 OutputStream
的對應方法。異步
在 write
方法中,首先進行了一些狀態檢驗,這裏貌似在 Timeout
類中實現了對超時的處理,咱們稍後再分析。以後從 Buffer
中獲取了一個 Segment
,並從中取出數據,計算出寫入的量後將其寫入 Sink
所對應的 OutputStream
。ide
Segment
採用了一種相似鏈表的形式進行鏈接,看來 Buffer
中維護了一個 Segment
鏈表,表明了數據的其中一段。這裏將 Buffer
中的數據分段取出並寫入了 OutputStream
中。oop
最後,經過 SegmentPool.recycle
方法對當前 Segment
進行回收。
從上面的代碼中咱們能夠獲取到以下信息:
Buffer
其實就是內存中的一段數據的抽象,其中經過 Segment
以鏈表的形式保存用於存儲數據。Segment
存儲數據採用了分段的存儲方式,所以獲取數據時須要分段從 Segment
中獲取數據。SegmentPool
池用於實現 Segment
的複用。Segment
的使用有點相似鏈表。Source
與 Sink
同樣,也僅僅是一個接口:
public interface Source extends Closeable {
long read(Buffer sink, long byteCount) throws IOException;
Timeout timeout();
@Override void close() throws IOException;
}
複製代碼
在 Okio
中能夠經過 source
方法根據 InputStream
建立一個 Source
:
private static Source source(final InputStream in, final Timeout timeout) {
if (in == null) {
throw new IllegalArgumentException("in == null");
} else if (timeout == null) {
throw new IllegalArgumentException("timeout == null");
} else {
return new Source() {
public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0L) {
throw new IllegalArgumentException("byteCount < 0: " + byteCount);
} else if (byteCount == 0L) {
return 0L;
} else {
try {
timeout.throwIfReached();
Segment tail = sink.writableSegment(1);
int maxToCopy = (int)Math.min(byteCount, (long)(8192 - tail.limit));
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) {
return -1L;
} else {
tail.limit += bytesRead;
sink.size += (long)bytesRead;
return (long)bytesRead;
}
} catch (AssertionError var7) {
if (Okio.isAndroidGetsocknameError(var7)) {
throw new IOException(var7);
} else {
throw var7;
}
}
}
}
public void close() throws IOException {
in.close();
}
public Timeout timeout() {
return timeout;
}
public String toString() {
return "source(" + in + ")";
}
};
}
}
複製代碼
這裏構建並實現了 Source
的一個匿名內部類並返回,應該就是 Source
的默認實現了。
它除了 read
方法其餘都只是簡單地調用了 InputStream
的對應方法,咱們重點看 read
方法:
首先它進行了一些相關的狀態檢測,以後經過 sink.writeableSegment
獲取到了一個能夠寫入的 Segment
。以後從 InputStream
中讀取數據向 Segment
中寫入,讀取的大小被限制爲了 8192 個字節。
Buffer
在 Sink
及 Source
中都擔任了一個十分重要的地位,它對應了咱們內存中存儲的數據,對這些數據進行了抽象。下面讓咱們對 Buffer
進行分析:
Buffer
雖然是咱們內存中數據的抽象,但數據實際上並非存儲在 Buffer
中的,它在內部維護了一個 Segment
的循環鏈表,Segment
纔是真正存儲數據的地方。它經過 Segment
將數據分紅了幾段,經過鏈表進行鏈接。在 Buffer
內部封裝了許多 I/O 操做,都是在對 Segment
中的數據進行處理。
爲何要使用 Segment
對數據進行分段存儲而不直接存儲整個數據呢?因爲數據是分段存放的,這些段中的某一部分可能與另外一個 Buffer
中的數據剛好是相同的,此時就體現出了 Segment
的靈活性,咱們不須要將數據拷貝到另外一個 Buffer
中,只須要將其 Segment
指向這個重複段的 Segment
便可。同時,對於一些如將數據從 Source
轉移到 Sink
中這種狀況,也不須要進行拷貝,只須要將鏈表指向咱們的 Segment
便可,極大地提升了效率,同時節省了內存空間。
咱們首先看一下存儲數據的 Segment
,它表明了數據中的一段,是一個雙向的循環鏈表,主要有如下的參數:
final class Segment {
// Segment 存儲數據的大小
static final int SIZE = 8192;
// 進行數據共享的最小字節數
static final int SHARE_MINIMUM = 1024;
// 存儲數據的字節數組
final byte[] data;
// 用戶讀取數據的下一個起始位置
int pos;
// 能夠被寫入的下一個起始位置
int limit;
// 數據是否已被共享
boolean shared;
// 該字節數組是否屬於該Segment
boolean owner;
// 鏈表指針
Segment next;
// 鏈表指針
Segment prev;
// ...
}
複製代碼
能夠看到,其中 pos
表明了下一次讀取的起始位置,而 limit
表明了下一次寫入的起始位置,咱們能夠根據它們兩個值將整個 Segment
的空間分爲如圖的三段:
其中已讀區域的數據咱們之後都不會再用到,已寫入區域的數據正在等待讀取,而空閒區域尚未填入數據,能夠進行寫入。
同時,Segment
還支持了對數據的共享,經過 shared
及 owner
字段分別代表了數據是否已被共享以及其是否屬於當前 Segment
。同時它提供了兩種拷貝方式: sharedCopy
以及 unsharedCopy
。
unsharedCopy
返回了一個新的 Segment
,並將 data
數組經過 clone
方法拷貝到了新 Segment
中:
/** Returns a new segment that its own private copy of the underlying byte array. */
final Segment unsharedCopy() {
return new Segment(data.clone(), pos, limit, false, true);
}
複製代碼
而 sharedCopy
一樣返回了一個新的 Segment
,但其 data
數組是與新 Segment
進行共享的:
/**
* Returns a new segment that shares the underlying byte array with this. Adjusting pos and limit
* are safe but writes are forbidden. This also marks the current segment as shared, which
* prevents it from being pooled.
*/
final Segment sharedCopy() {
shared = true;
return new Segment(data, pos, limit, true, false);
}
複製代碼
同時經過註釋咱們能夠看到,當數據共享後,爲了保證安全性,禁止了寫入操做。同時將被拷貝的 Segment
也標記爲了 shared
,從而防止其被回收。
這樣的設計一樣是爲了減小拷貝,從而提升 I/O 的效率。
Segment
還支持了與前一個 Segment
的合併以及對自身的分割操做,從而使得使用者可以更靈活地操做。
合併操做會在當前 Segment
與它的前一個節點都沒有超過其大小的一半時,將兩者的數據進行合併,並將當前 Segment
進行回收,從而增大內存的利用效率:
/**
* Call this when the tail and its predecessor may both be less than half
* full. This will copy data so that segments can be recycled.
*/
public final void compact() {
if (prev == this) throw new IllegalStateException();
// 上一個節點的數據不是能夠寫入的(是共享數據),取消合併
if (!prev.owner) return;
// 計算當前節點與前一個節點的剩餘空間
int byteCount = limit - pos;
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
// 沒有足夠的寫入空間時,不進行合併å
if (byteCount > availableByteCount) return;
// 進行合併,將當前節點數據寫入前一節點
writeTo(prev, byteCount);
// 從鏈表中刪除當前節點,並進行回收
pop();
SegmentPool.recycle(this);
}
複製代碼
而分割操做則會將 Segment
中的數據分割爲 [pos, pos+byteCount)
及 [pos+byteCount, limit)
的兩段:
public final Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
// We have two competing performance goals:
// - Avoid copying data. We accomplish this by sharing segments.
// - Avoid short shared segments. These are bad for performance because they are readonly and
// may lead to long chains of short segments.
// To balance these goals we only share segments when the copy will be large.
if (byteCount >= SHARE_MINIMUM) {
// 若是拷貝量大於1024字節,經過共享的形式
prefix = sharedCopy();
} else {
// 拷貝量低於1024字節,經過arrayCopy進行拷貝
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
// 對limit及pos進行修改
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}
複製代碼
這裏首先對不一樣數據段的數據進行了處理,若是數據段大於了 1024 字節,則將數據經過共享交給了分割的前一個節點,兩端 Segment
公用同一個 data
數組,不然經過拷貝的形式構建一個新的 Segment
。
爲何這裏須要對數據大小的不一樣採用不一樣的處理方式呢?咱們能夠看到上面的註釋,裏面給出了答案:首先,爲了不拷貝數據帶來的性能開銷,加入了共享 Segment
的功能。可是因爲共享的數據是隻讀的,若是有不少很短的數據段的話,使用的表現並不會很好,所以只有當拷貝的數據量比較大時,纔會進行 Segment
的共享。
以後,將兩者的 pos
及 limit
都進行了設置。因爲 pos
以前的部分及 limit
以後的部分都不會影響到咱們正常的讀取和寫入,所以咱們能夠不用關心它們目前的狀態,不必再對它們進行一些如填充零之類的操做。
同時,Okio 還使用了 SegmentPool
來實現一個對象池,從而避免 Segment
頻繁地建立及銷燬所帶來的性能開銷。
SegmentPool
的實現十分簡單,它內部維護了一個單鏈表,用於存儲被回收存在池中的 Segment
,其最大容量被限制在了 64 k。
當須要 Segment
時,能夠經過 take
方法來獲取一個被回收的對象:
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock. } 複製代碼
它會在單鏈表中找到一個空閒的 Segment
並初始化後返回。若當前鏈表中沒有對象,則會建立一個新的 Segment
。
當 Segment
使用完畢時,首先能夠經過 Segment
的 pop
操做將其從鏈表中移除,以後能夠調用 SegmentPool.recycle
方法對其進行回收:
static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
複製代碼
回收 Segment
時,不會對只讀的 Segment
進行回收,若 Segment
個數超過了上限,則不會對該 Segment
進行回收。
Okio 與 java.io 有個很大的不一樣,體如今 Buffer
的數據轉移上,咱們能夠經過其 copyTo
方法來完成數據的轉移。之因此叫轉移,由於它相對於複製來講,是有很大的數據提高的。例如咱們能夠看到兩個 Buffer
之間的數據轉移是如何進行的:
public final Buffer copyTo(Buffer out, long offset, long byteCount) {
if (out == null) throw new IllegalArgumentException("out == null");
checkOffsetAndCount(size, offset, byteCount);
if (byteCount == 0) return this;
out.size += byteCount;
// 跳過不進行拷貝的 Segment
Segment s = head;
for (; offset >= (s.limit - s.pos); s = s.next) {
offset -= (s.limit - s.pos);
}
for (; byteCount > 0; s = s.next) {
// 經過 sharedCopy 將數據拷貝到 copy 中
Segment copy = s.sharedCopy();
copy.pos += offset;
copy.limit = Math.min(copy.pos + (int) byteCount, copy.limit);
// 插入 Segment
if (out.head == null) {
out.head = copy.next = copy.prev = copy;
} else {
out.head.prev.push(copy);
}
byteCount -= copy.limit - copy.pos;
offset = 0;
}
return this;
}
複製代碼
從上面的代碼中能夠看出,實際上這個過程是經過了 Segment
共享實現的,所以不須要進行拷貝,極大地提升了數據轉移的效率。
咱們能夠經過 Okio.buffer
方法對一個普通的 Source
進行包裝,獲取一個具備緩衝能力的 BufferSource
,它是一個接口,定義了一系列讀取的方法:
public interface BufferedSource extends Source, ReadableByteChannel {
@Deprecated
Buffer buffer();
Buffer getBuffer();
boolean exhausted() throws IOException;
void require(long byteCount) throws IOException;
boolean request(long byteCount) throws IOException;
byte readByte() throws IOException;
short readShort() throws IOException;
short readShortLe() throws IOException;
// ...一系列讀取方法
long indexOf(byte b, long fromIndex) throws IOException;
long indexOf(byte b, long fromIndex, long toIndex) throws IOException;
long indexOf(ByteString bytes) throws IOException;
long indexOf(ByteString bytes, long fromIndex) throws IOException;
long indexOfElement(ByteString targetBytes) throws IOException;
long indexOfElement(ByteString targetBytes, long fromIndex) throws IOException;
boolean rangeEquals(long offset, ByteString bytes) throws IOException;
boolean rangeEquals(long offset, ByteString bytes, int bytesOffset, int byteCount)
throws IOException;
BufferedSource peek();
InputStream inputStream();
}
複製代碼
它主要有兩個實現類:Buffer
與 RealBufferedSource
。其中 RealBufferedSource
顯然是咱們經過 buffer
方法包裝後獲得的類,而 Buffer
實際上對 BufferSource
也進行了實現,經過一系列 read
方法能夠從 Segment
中讀取處對應的數據。而咱們的 RealBufferedSource
則是 Source
的一個包裝類,而且其維護了一個 Buffer
,從而提升 Input 的效率。咱們先分析其思路,再來討論爲何這樣能提升 Input 的效率。
咱們能夠首先看到 RealBufferedSource
的讀取方法,這裏以 readByteArray
方法舉例:
@Override public byte[] readByteArray(long byteCount) throws IOException {
require(byteCount);
return buffer.readByteArray(byteCount);
}
複製代碼
這裏首先調用了 require
方法,以後再從 buffer
中將數據讀出,看來在 require
中將數據先讀取到了 buffer
中。
咱們看到 require
方法:
@Override public void require(long byteCount) throws IOException {
if (!request(byteCount)) throw new EOFException();
}
複製代碼
它實際上轉調到了 request
方法:
@Override public boolean request(long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
while (buffer.size < byteCount) {
if (source.read(buffer, Segment.SIZE) == -1) return false;
}
return true;
}
複製代碼
request
方法中,不斷地向 buffer
中讀取,每次讀取 Segment.SIZE
也就是 8192 個字節。也就是它讀取的量是 byteCount
以 8192 字節向上取整。爲何它不恰好讀取 byteCount
個字節,要讀滿 8192 個字節呢?
這就是一種預取思想,由於 I/O 操做每每是很是頻繁的,若是進行了一次讀取,那就頗有可能還會進行下一次讀取,所以咱們預先把它下一次可能讀取的部分一塊兒讀取出來,這樣下次讀取時,就不須要再對系統進行請求以獲取數據了,能夠直接從咱們的 buffer
中拿到。這就是爲何說加入 buffer
提升了咱們 I/O 的效率。
可能還有人會問,爲何這樣能提升 I/O 效率呢,不都是讀了同樣的量麼?這個就涉及到一些操做系統的知識了。在現代的操做系統中,咱們的程序每每運行在用戶態,而用戶態其實是沒有進行 I/O 的權限的,所以每每都是向操做系統發起請求,切換到內核態,再進行 I/O,完成後再次回到用戶態。這樣的用戶態及內核態的切換其實是很是耗時的,而且這個過程當中也伴隨着拷貝。所以採用上面的 buffer
能夠有效地減小咱們的這種系統 I/O 調用,加快咱們的效率。
咱們一樣能夠經過 Okio.buffer
方法對一個普通的 Sink
進行包裝,從而獲取一個帶有 buffer 緩衝能力的 BufferedSink
。BufferedSink
也是一個接口 ,內部定義了一系列寫入的方法:
public interface BufferedSink extends Sink, WritableByteChannel {
Buffer buffer();
BufferedSink write(ByteString byteString) throws IOException;
BufferedSink write(byte[] source) throws IOException;
BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
long writeAll(Source source) throws IOException;
BufferedSink write(Source source, long byteCount) throws IOException;
// ...一些對 write 的封裝
@Override void flush() throws IOException;
BufferedSink emit() throws IOException;
BufferedSink emitCompleteSegments() throws IOException;
OutputStream outputStream();
}
複製代碼
BufferedSink
一樣有兩個實現類:Buffer
和 RealBufferedSink
,咱們能夠先看到 RealBufferedSink
,它是一個 Sink
的包裝類,而且內部維護了一個 Buffer
。
咱們先看看其寫入方法:
@Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.write(source, offset, byteCount);
return emitCompleteSegments();
}
複製代碼
這裏拿了一個簡單的寫入 byte[]
的方法進行了舉例,它首先將數據寫入了 buffer
中,以後調用了 emitCompleteSegments
方法。能夠看到這裏並無對 sink
真正進行寫入,那寫入到底是在哪裏進行的呢?咱們看看 emitCompleteSegments
方法中作了什麼:
@Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
複製代碼
這裏首先調用 buffer.completeSegmentByteCount
方法獲取到了 buffer
中已寫入但未被讀取的部分字節數(只包括已經被寫滿了的 Segment
中的),以後調用 sink.write
將其寫入到了 sink
中。
這裏其實很奇怪,按道理來講 buffer
的做用是經過緩存來進行一些優化,但這個方法將數據寫入 buffer
後,數據又當即被寫入到了 sink
中。這樣相比直接寫入到 sink
中,反而會帶來性能的損耗啊。這裏爲何要這樣作呢?
我看到這裏時對這段也比較奇怪,但考慮到 Okio
的總體設計來講,應該是把 Buffer
當作了一個數據統一的中轉站,將讀寫的優化統一放在了 Buffer
中進行,所以考慮到總體的一致性,將 RealBufferedSink
也採用了經過 Buffer
中轉的方式編寫,應該算是一種妥協吧。而且採用 Buffer
還有好處就是,一份數據既能夠用於讀也能夠用於寫。
RealBufferedSink
還支持了 flush
操做,經過 flush
方法能夠將緩衝區的全部數據寫入 sink
中:
@Override public void flush() throws IOException {
if (closed) throw new IllegalStateException("closed");
if (buffer.size > 0) {
sink.write(buffer, buffer.size);
}
sink.flush();
}
複製代碼
RealBufferedSink
還具備 emit
功能,分別是 emitCompleteSegments
方法及 emit
方法,前者是將全部已填滿的 Segment
中已寫入未讀取的數據寫入 sink
,後者則是將 buffer
中全部已寫入未讀取數據寫入 sink
(相似 flush
):
@Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
@Override public BufferedSink emit() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.size();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
複製代碼
Okio 中經過 Timeout
類實現了 Sink
與 Source
的超時機制,在 Sink
的寫入與 Source
的讀取時對超時進行判斷,若是超時則中斷寫入等操做。其中對於包裝了普通 InputStream
/ OutputStream
的使用了普通的 Timeout
,而對於對 Socket
進行了包裝的則使用 AsyncTimeout
。
咱們先對普通的 Timeout
進行研究,Timeout
中主要有兩個值,timeout
與 deadline
,分別表明了 wait
的最大等待時間與完成某個工做的超時時間。
對於 deadline
,咱們能夠經過 deadline
方法進行設定:
/** Set a deadline of now plus {@code duration} time. */
public final Timeout deadline(long duration, TimeUnit unit) {
if (duration <= 0) throw new IllegalArgumentException("duration <= 0: " + duration);
if (unit == null) throw new IllegalArgumentException("unit == null");
return deadlineNanoTime(System.nanoTime() + unit.toNanos(duration));
}
複製代碼
以後,在每一個須要檢查超時的地方須要調用該 Timeout
的 throwIfReached
方法(如 Sink
的 write
方法):
public void throwIfReached() throws IOException {
if (Thread.interrupted()) {
Thread.currentThread().interrupt(); // Retain interrupted status.
throw new InterruptedIOException("interrupted");
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
複製代碼
這裏很簡單,就是進行時間的校驗,若達到了設定的時間,則拋出異常從而中斷後續操做。
同時,Timeout
還實現了對 monitor
進行 wait
的超時機制,經過 waitUntilNotified
方法能夠等待 monitor
被 notify
,若等待的過程超過了 Timeout
所設定的時間或當前線程被中斷,則會拋出異常,從而避免一直進行等待。而且,該方法須要在 synchronized
代碼塊中調用,以保證線程安全。
在咱們構造了一個 Timeout
後,可使用 timeout
方法對其 wait
超時時間進行設定:
public Timeout timeout(long timeout, TimeUnit unit) {
if (timeout < 0) throw new IllegalArgumentException("timeout < 0: " + timeout);
if (unit == null) throw new IllegalArgumentException("unit == null");
this.timeoutNanos = unit.toNanos(timeout);
return this;
}
複製代碼
這裏主要是將 timeoutNanos
設置爲了對應的值。接着咱們看到 waitUntilNotified
方法:
/**
* Waits on {@code monitor} until it is notified. Throws {@link InterruptedIOException} if either
* the thread is interrupted or if this timeout elapses before {@code monitor} is notified. The
* caller must be synchronized on {@code monitor}.
*/
public final void waitUntilNotified(Object monitor) throws InterruptedIOException {
try {
boolean hasDeadline = hasDeadline();
long timeoutNanos = timeoutNanos();
// 沒有 timeout 的設定的話,直接調用 monitor 的 wait 方法
if (!hasDeadline && timeoutNanos == 0L) {
monitor.wait(); // There is no timeout: wait forever.
return;
}
// 計算咱們要 wait 的時間
long waitNanos;
long start = System.nanoTime();
// 下面主要就是等待 timeout 與 deadline 中最小的那個
if (hasDeadline && timeoutNanos != 0) {
long deadlineNanos = deadlineNanoTime() - start;
waitNanos = Math.min(timeoutNanos, deadlineNanos);
} else if (hasDeadline) {
waitNanos = deadlineNanoTime() - start;
} else {
waitNanos = timeoutNanos;
}
// wait 相應時間
long elapsedNanos = 0L;
if (waitNanos > 0L) {
long waitMillis = waitNanos / 1000000L;
monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L));
elapsedNanos = System.nanoTime() - start;
}
// 若是尚未 notify,則拋出異常
if (elapsedNanos >= waitNanos) {
throw new InterruptedIOException("timeout");
}
} catch (InterruptedException e) {
// 線程若是在這個過程當中被 interrupt,則拋出異常
Thread.currentThread().interrupt();
throw new InterruptedIOException("interrupted");
}
}
複製代碼
AsyncTimeout
是 Timeout
的子類,接下來咱們看看 AsyncTimeout
是如何對 Socket
中的超時進行處理的。
首先能夠看到 AsyncTimeout
中保存了一個 head
及一個 next
引用,顯然這裏是有一個鏈表存儲的 AsyncTimeout
隊列的:
// AsyncTimeout 隊列的頭部
static @Nullable AsyncTimeout head;
// 當前節點是否在隊列中
private boolean inQueue;
// 下一個節點
private @Nullable AsyncTimeout next;
複製代碼
這裏感受與 MessageQueue
有點類似,猜想 AsyncTimeout
會根據超時的時間按序存儲在隊列中。
而且從 AsyncTimeout
的 JavaDoc 中能夠看到,它須要使用者在異步的事件開始時調用 enter
方法,結束時調用 exit
方法。同時它在背後開闢了一個線程對超時進行定時檢查。
讓咱們先看到 enter
方法:
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline
// 時間到了就不加入隊列了
if (timeoutNanos == 0 && !hasDeadline) {
return;
}
inQueue = true;
// 開啓線程對超時進行檢查
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
複製代碼
上面主要是將其 inQueue
設置爲了 true,以後調用 scheduleTimeout
方法對超時進行定時檢查。咱們暫時先不關注 scheduleTimeout
的具體實現。
接着咱們看到 exit
方法:
/** Returns true if the timeout occurred. */
public final boolean exit() {
if (!inQueue) return false;
inQueue = false;
return cancelScheduledTimeout(this);
}
複製代碼
這裏也很是簡單,就是將 inQueue
設置爲了 false,並調用 cancelScheduledTimeout
方法中止前面的定時校驗線程。
咱們接下來看看這個定時校驗的具體實現,咱們先看到 scheduleTimeout
方法:
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// 若是隊列中尚未節點,構造一個頭節點並啓動 Watchdog
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
long now = System.nanoTime();
// 計算具體超時時間,主要是取 timeout 與 deadline 的最小值
if (timeoutNanos != 0 && hasDeadline) {
// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
// Math.min() is undefined for absolute values, but meaningful for relative ones.
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if (timeoutNanos != 0) {
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime();
} else {
throw new AssertionError();
}
long remainingNanos = node.remainingNanos(now);
// 按剩餘時間從小到大插入到隊列中
for (AsyncTimeout prev = head; true; prev = prev.next) {
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
// 插入在隊列頭部,進行 notify
AsyncTimeout.class.notify();
}
break;
}
}
}
複製代碼
上面的邏輯主要分爲如下幾步:
Watchdog
,Watchdog
是一個 Thread
的子類,也就是咱們的定時掃描線程。Timeout
的超時時間,取了 timeout
與 deadline
的最小值timeout
按剩餘時間從小到大的順序插入隊列中notify
(這裏還沒法瞭解到意圖,咱們能夠日後看看)接着咱們看看 cancelScheduledTimeout
作了些什麼:
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
// Remove the node from the linked list.
for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
if (prev.next == node) {
prev.next = node.next;
node.next = null;
return false;
}
}
// The node wasn t found in the linked list: it must have timed out!
return true;
}
複製代碼
這裏很簡單,就是將該 AsyncTimeout
從隊列中移除,若返回 true,則表明超時已經發生,若返回 false,則表明超時還未發生,該 Timeout
被移除。這個返回值一樣反映到了咱們的 exit
方法返回值中。
接着咱們看看 Watchdog
到底是如何對超時進行檢測的:
private static final class Watchdog extends Thread {
public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();
// 找不到要 interrupt 的節點,繼續尋找
if (timedOut == null) continue;
// 隊列已空,中止線程
if (timedOut == head) {
head = null;
return;
}
}
// 調用 timeout 方法通知超時
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
複製代碼
Wachdog
中不斷調用 awaitTimeout
方法嘗試獲取一個能夠中止的 Timeout
,以後調用了其 timeOut
方法通知外部已超時。
咱們能夠看看 awaitTimeout
作了什麼:
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
AsyncTimeout node = head.next;
// 隊列爲空,wait 直到有新的節點加入
if (node == null) {
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // The idle timeout elapsed.
: null; // The situation has changed.
}
long waitNanos = node.remainingNanos(System.nanoTime());
// 計算該節點須要 wait 的時間
if (waitNanos > 0) {
// wait 對應的時間
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
return null;
}
// wait 事後已超時,將其移出隊列
head.next = node.next;
node.next = null;
return node;
}
複製代碼
這裏主要有如下幾步:
wait
直到有新的節點加入隊列wait
的時間並 wait
對應時間經過這裏的代碼,咱們就知道爲何前面在鏈表頭部加入節點時須要進行一次 notify
了,主要有兩個目的:
notify
通知此處有新元素加入隊列。wait
來計算該新的 Timeout
所須要的等待時間,並對其進行超時處理。這裏的處理和 Android 中 MessageQueue
的設計仍是有殊途同歸之妙的,咱們能夠學習一波。
AsyncTimeout
還實現了 sink
及 source
方法來實現了支持 AsyncTimeout
超時機制的 Sink
及 Source
,主要是經過在其各類操做先後分別調用 enter
及 exit
。下面以 Sink
爲例:
public final Sink sink(final Sink sink) {
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0L) {
// Count how many bytes to write. This loop guarantees we split on a segment boundary.
long toWrite = 0L;
for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
int segmentSize = s.limit - s.pos;
toWrite += segmentSize;
if (toWrite >= byteCount) {
toWrite = byteCount;
break;
}
}
// Emit one write. Only this section is subject to the timeout.
boolean throwOnTimeout = false;
enter();
try {
sink.write(source, toWrite);
byteCount -= toWrite;
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
}
@Override public void flush() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.flush();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override public void close() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.close();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override public Timeout timeout() {
return AsyncTimeout.this;
}
@Override public String toString() {
return "AsyncTimeout.sink(" + sink + ")";
}
}
複製代碼
比較簡單,這裏就不作太多解釋了。
Okio 是一套基於 java.io 進行了一系列優化的十分優秀的 I/O 庫,它經過引入了 Segment
機制大大下降了數據遷移的成本,減小了拷貝的次數,而且對 java.io 繁瑣的體系進行了簡化,使得整個庫更易於使用。在 Okio
中還實現了不少有其它功能的 Source
及 Sink
,感興趣的讀者能夠自行翻閱一下源碼。同時各位能夠去回顧一下前面的 OkHttp 源碼解析中,OkHttp 是如何使用 Okio 進行 Socket 的數據寫入及讀取的。