上一篇文章(Okio 源碼解析(一):數據讀取流程)分析了 Okio 數據讀取的流程,從中能夠看出 Okio 的便捷與高效。Okio 的另一個優勢是提供了超時機制,而且分爲同步超時與異步超時。本文具體分析這兩種超時的實現。java
回顧一下 Okio.source
的代碼:node
public static Source source(InputStream in) { // 生成一個 Timeout 對象 return source(in, new Timeout()); } private static Source source(final InputStream in, final Timeout timeout) { if (in == null) throw new IllegalArgumentException("in == null"); if (timeout == null) throw new IllegalArgumentException("timeout == null"); return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); if (byteCount == 0) return 0; try { // 超時檢測 timeout.throwIfReached(); Segment tail = sink.writableSegment(1); int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit); int bytesRead = in.read(tail.data, tail.limit, maxToCopy); if (bytesRead == -1) return -1; tail.limit += bytesRead; sink.size += bytesRead; return bytesRead; } catch (AssertionError e) { if (isAndroidGetsocknameError(e)) throw new IOException(e); throw e; } } @Override public void close() throws IOException { in.close(); } @Override public Timeout timeout() { return timeout; } @Override public String toString() { return "source(" + in + ")"; } }; }
在 Source
的構造方法中,傳入了一個 Timeout
對象。在下面建立的匿名的 Source
對象的 read
方法中,先調用了 timeout.throwIfReached()
,這裏顯然是判斷是否已經超時,代碼以下:segmentfault
public void throwIfReached() throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException("thread interrupted"); } if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) { throw new InterruptedIOException("deadline reached"); } }
這裏邏輯很簡單,若是超時了則拋出異常。在 TimeOut
中有幾個變量用於設定超時的時間:異步
private boolean hasDeadline; private long deadlineNanoTime; private long timeoutNanos;
因爲 throwIfReached
是在每次讀取數據以前調用而且與數據讀取在同一個線程,因此若是讀取操做阻塞,則沒法及時拋出異常。socket
異步超時與同步超時不一樣,其開了新的線程用於檢測是否超時,下面是 Socket 的例子。ide
Okio 能夠接受一個 Socket
對象構建 Source
,代碼以下:this
public static Source source(Socket socket) throws IOException { if (socket == null) throw new IllegalArgumentException("socket == null"); AsyncTimeout timeout = timeout(socket); Source source = source(socket.getInputStream(), timeout); // 返回 timeout 封裝的 source return timeout.source(source); }
相比於 InputStream
,這裏的額外操做是引入了 AsyncTimeout
來封裝 socket
。timeout
方法生成一個 AsyncTimeout
對象,看一下代碼:線程
private static AsyncTimeout timeout(final Socket socket) { return new AsyncTimeout() { @Override protected IOException newTimeoutException(@Nullable IOException cause) { InterruptedIOException ioe = new SocketTimeoutException("timeout"); if (cause != null) { ioe.initCause(cause); } return ioe; } // 超時後調用 @Override protected void timedOut() { try { socket.close(); } catch (Exception e) { logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e); } catch (AssertionError e) { if (isAndroidGetsocknameError(e)) { logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e); } else { throw e; } } } }; }
上面的代碼生成了一個匿名的 AsyncTimeout
,其中有個 timedout
方法,這個方法是在超時的時候被調用,能夠看出裏面的操做主要是關閉 socket
。有了 AsyncTimeout
以後,調用其 source
方法來封裝 socket
的 InputStream
。code
下面具體看看 AsyncTimeout
。對象
AsyncTimeout
繼承了 Timeout
,提供了異步的超時機制。每個 AsyncTimeout
對象包裝一個 source
,並與其它 AsyncTimeout
組成一個鏈表,根據超時時間的長短插入。AsyncTimeout
內部會新開一個叫作 WatchDog
的線程,根據超時時間依次處理 AsyncTimout
鏈表的節點。
下面是 AsyncTimeout
的一些內部變量:
// 鏈表頭結點 static @Nullable AsyncTimeout head; // 此節點是否在隊列中 private boolean inQueue; // 鏈表中下一個節點 private @Nullable AsyncTimeout next;
其中 head
是鏈表的頭結點,next
是下一個節點,inQueue
則標識此 AsyncTimeout
是否處於鏈表中。
在上面的 Okio.source(Socket socket)
中,最後返回的是 timeout.source(socket)
,下面是其代碼:
public final Source source(final Source source) { return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { boolean throwOnTimeout = false; // enter enter(); try { long result = source.read(sink, byteCount); throwOnTimeout = true; return result; } catch (IOException e) { throw exit(e); } finally { exit(throwOnTimeout); } } @Override public void close() throws IOException { boolean throwOnTimeout = false; try { source.close(); throwOnTimeout = true; } catch (IOException e) { throw exit(e); } finally { exit(throwOnTimeout); } } @Override public Timeout timeout() { return AsyncTimeout.this; } @Override public String toString() { return "AsyncTimeout.source(" + source + ")"; } }; }
AsyncTimtout#source
依然是返回一個匿名的 Source
對象,只不過是將參數中真正的 source
包裝了一下,在 source.read
以前添加了 enter
方法,在 catch
以及 finally
中添加了 exit
方法。enter
和 exit
是重點,其中 enter
中會將當前的 AsyncTimeout
加入鏈表,具體代碼以下:
public final void enter() { if (inQueue) throw new IllegalStateException("Unbalanced enter/exit"); long timeoutNanos = timeoutNanos(); boolean hasDeadline = hasDeadline(); if (timeoutNanos == 0 && !hasDeadline) { return; // No timeout and no deadline? Don't bother with the queue. } inQueue = true; scheduleTimeout(this, timeoutNanos, hasDeadline); } private static synchronized void scheduleTimeout( AsyncTimeout node, long timeoutNanos, boolean hasDeadline) { // 若是鏈表爲空,則新建一個頭結點,而且啓動 Watchdog線程 if (head == null) { head = new AsyncTimeout(); new Watchdog().start(); } long now = System.nanoTime(); if (timeoutNanos != 0 && hasDeadline) { node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now); } else if (timeoutNanos != 0) { node.timeoutAt = now + timeoutNanos; } else if (hasDeadline) { node.timeoutAt = node.deadlineNanoTime(); } else { throw new AssertionError(); } // 按時間將節點插入鏈表 long remainingNanos = node.remainingNanos(now); for (AsyncTimeout prev = head; true; prev = prev.next) { if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) { node.next = prev.next; prev.next = node; if (prev == head) { AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front. } break; } } }
真正插入鏈表的操做在 scheduleTimeout
中,若是 head
節點還不存在則新建一個頭結點,而且啓動 Watchdog
線程。接着就是計算超時時間,而後遍歷鏈表進行插入。若是插入在鏈表的最前面(head
節點後面的第一個節點),則主動進行喚醒 Watchdog
線程,從這裏能夠猜到 Watchdog
線程在等待超時的過程當中是調用了 AsyncTimeout.class
的 wait
進入了休眠狀態。那麼就來看看 WatchDog
線程的實際邏輯:
private static final class Watchdog extends Thread { Watchdog() { super("Okio Watchdog"); setDaemon(true); } public void run() { while (true) { try { AsyncTimeout timedOut; synchronized (AsyncTimeout.class) { timedOut = awaitTimeout(); // Didn't find a node to interrupt. Try again. if (timedOut == null) continue; // The queue is completely empty. Let this thread exit and let another watchdog thread // get created on the next call to scheduleTimeout(). if (timedOut == head) { head = null; return; } } // Close the timed out node. timedOut.timedOut(); } catch (InterruptedException ignored) { } } } }
WatchDog
主要是調用 awaitTimeout()
獲取一個已超時的 timeout
,若是不爲空而且是 head
節點,說明鏈表中已經沒有其它節點,能夠結束線程,不然調用 timedOut.timedOut()
, timeOut()
是一個空方法,由用戶實現超時後應該採起的操做。 awaitTimeout
是獲取超時節點的方法:
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException { // Get the next eligible node. AsyncTimeout node = head.next; // 隊列爲空的話等待有節點進入隊列或者達到超時IDLE_TIMEOUT_MILLIS的時間 if (node == null) { long startNanos = System.nanoTime(); AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS); return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS ? head // The idle timeout elapsed. : null; // The situation has changed. } // 計算等待時間 long waitNanos = node.remainingNanos(System.nanoTime()); // The head of the queue hasn't timed out yet. Await that. if (waitNanos > 0) { // Waiting is made complicated by the fact that we work in nanoseconds, // but the API wants (millis, nanos) in two arguments. long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); // 調用 wait AsyncTimeout.class.wait(waitMillis, (int) waitNanos); return null; } // 第一個節點超時,移除並返回這個節點 head.next = node.next; node.next = null; return node; }
與 enter
相反,exit
則是視狀況拋出異常而且移除鏈表中的節點,這裏就不放具體代碼了。
Okio 經過 Timeout
以及 AsyncTimeout
分別提供了同步超時和異步超時功能,同步超時是在每次讀取數據前判斷是否超時,異步超時則是將 AsyncTimeout
組成有序鏈表,而且開啓一個線程來監控,到達超時則觸發相關操做。
若是個人文章對您有幫助,不妨點個贊支持一下(^_^)