InterruptibleChannel接口java
public interface InterruptibleChannel extends Channel { /** * 關閉當前Channel * * 任何當前阻塞在當前channel執行的IO操做上的線程,都會收到一個AsynchronousCloseException異常 */ public void close() throws IOException; }
InterruptibleChannel接口沒有定義任何方法,其中的close方法是父接口就有的,這裏只是添加了額外的註釋。sql
AbstractInterruptibleChannel實現了InterruptibleChannel接口,並提供了實現可中斷IO機制的重要的方法,好比begin(),end()。架構
在解讀這些方法的代碼前,先了解一下NIO中,支持中斷的Channel代碼是如何編寫的。併發
第一個要求是要正確使用begin()和end()方法:異步
boolean completed = false; try { begin(); completed = ...; // 執行阻塞IO操做 return ...; // 返回結果 } finally { end(completed); }
NIO規定了,在阻塞IO的語句先後,須要調用begin()和end()方法,爲了保證end()方法必定被調用,要求放在finally語句塊中。分佈式
第二個要求是Channel須要實現java.nio.channels.spi.AbstractInterruptibleChannel#implCloseChannel這個方法。AbstractInterruptibleChannel在處理中斷時,會調用這個方法,使用Channel的具體實現來關閉Channel。函數
接下來咱們具體看一下begin()和end()方法是如何實現的。高併發
begin方法性能
// 保存中斷處理對象實例 private Interruptible interruptor; // 保存被中斷線程實例 private volatile Thread interrupted; protected final void begin() { // 初始化中斷處理對象,中斷處理對象提供了中斷處理回調 // 中斷處理回調登記被中斷的線程,而後調用implCloseChannel方法,關閉Channel if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread target) { synchronized (closeLock) { // 若是當前Channel已經關閉,則直接返回 if (!open) return; // 設置標誌位,同時登記被中斷的線程 open = false; interrupted = target; try { // 調用具體的Channel實現關閉Channel AbstractInterruptibleChannel.this.implCloseChannel(); } catch (IOException x) { } } }}; }
// 登記中斷處理對象到當前線程學習
blockedOn(interruptor);
// 判斷當前線程是否已經被中斷,若是已經被中斷,可能登記的中斷處理對象沒有被執行,這裏手動執行一下
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
從begin()方法中,咱們能夠看出NIO實現可中斷IO操做的思路,是在Thread的中斷邏輯中,掛載自定義的中斷處理對象,這樣Thread對象在被中斷時,會執行中斷處理對象中的回調,這個回調中,執行關閉Channel的操做。這樣就實現了Channel對線程中斷的響應了。
接下來重點就是研究「Thread添加中斷處理邏輯」這個機制是如何實現的了,是經過blockedOn方法實現的:
static void blockedOn(Interruptible intr) { // package-private sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),intr); }
blockedOn方法使用的是JavaLangAccess的blockedOn方法。
SharedSecrets是一個神奇而糟糕的類,爲啥說是糟糕呢,由於這個方法的存在,就是爲了訪問JDK類庫中一些由於類做用域限制而外部沒法訪問的類或者方法。JDK不少類與方法是私有或者包級別私有的,外部是沒法訪問的,可是JDK在自己實現的時候又存在互相依賴的狀況,因此爲了外部能夠不依賴反射訪問這些類或者方法,在sun包下,存在這麼一個類,提供了各類超越限制的方法。
SharedSecrets.getJavaLangAccess()方法返回JavaLangAccess對象。JavaLangAccess對象就和名稱所說的同樣,提供了java.lang包下一些非公開的方法的訪問。這個類在System初始化時被構造:
// java.lang.System#setJavaLangAccess private static void setJavaLangAccess() { sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){ public void blockedOn(Thread t, Interruptible b) { t.blockedOn(b); } //... }); }
能夠看出,sun.misc.JavaLangAccess#blockedOn保證的就是java.lang.Thread#blockedOn這個包級別私有的方法:
/* The object in which this thread is blocked in an interruptible I/O * operation, if any. The blocker's interrupt method should be invoked * after setting this thread's interrupt status. */ private volatile Interruptible blocker; private final Object blockerLock = new Object(); /* Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code */ void blockedOn(Interruptible b) { // 串行化blocker相關操做 synchronized (blockerLock) { blocker = b; } }
而這個方法也很是簡單,就是設置java.lang.Thread#blocker變量爲以前提到的中斷處理對象。並且從註釋中能夠看出,這個方法就是專門爲NIO設計的,註釋都很是直白的提到了,NIO的代碼會經過sun.misc.SharedSecrets調用到這個方法。。
接下來就是重頭戲了,看一下Thread在中斷時,如何調用NIO註冊的中斷處理器:
public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker;
// 若是NIO設置了中斷處理器,則只需Thread自己的中斷邏輯後,調用中斷處理器的回調函數
if (b != null) { interrupt0(); // 這一步會設置interrupt標誌位 b.interrupt(this); return; } }
// 若是沒有的話,就走普通流程
interrupt0();
}
end方法
begin()方法負責添加Channel的中斷處理器到當前線程。end()是在IO操做執行完/中斷完後的操做,負責判斷中斷是否發生,若是發生判斷是當前線程發生仍是別的線程中斷把當前操做的Channel給關閉了,對於不一樣的狀況,拋出不一樣的異常。
protected final void end(boolean completed) throws AsynchronousCloseException { // 清空線程的中斷處理器引用,避免線程一直存活致使中斷處理器沒法被回收 blockedOn(null); Thread interrupted = this.interrupted; if (interrupted != null && interrupted == Thread.currentThread()) { interrupted = null; throw new ClosedByInterruptException(); }
// 若是此次沒有讀取到數據,而且Channel被另一個線程關閉了,則排除Channel被異步關閉的異常
// 可是若是此次讀取到了數據,就不能拋出異常,由於此次讀取的數據是有效的,須要返回給用戶的(重要邏輯)
if (!completed && !open)
throw new AsynchronousCloseException();
}
經過代碼能夠看出,若是是當前線程被中斷,則拋出ClosedByInterruptException異常,表示Channel由於線程中斷而被關閉了,IO操做也隨之中斷了。
若是是當前線程發現Channel被關閉了,而且是讀取還未執行完畢的狀況,則拋出AsynchronousCloseException異常,表示Channel被異步關閉了。
end()邏輯的活動圖以下:
場景分析
併發的場景分析起來就是複雜,上面的代碼很少,可是場景不少,咱們以sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)爲例分析一下可能的場景:
sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)代碼以下:
public int read(ByteBuffer dst) throws IOException { ensureOpen(); // 1 if (!readable) // 2 throw new NonReadableChannelException(); synchronized (positionLock) { int n = 0; int ti = -1; try { begin(); ti = threads.add(); if (!isOpen()) return 0; // 3 do { n = IOUtil.read(fd, dst, -1, nd); // 4 } while ((n == IOStatus.INTERRUPTED) && isOpen()); return IOStatus.normalize(n); } finally { threads.remove(ti); end(n > 0); assert IOStatus.check(n); } } }
總結
在JavaIO時期,人們爲了中斷IO操做想了很多方法,核心操做就是關閉流,促使IO操做拋出異常,達到中斷IO的效果。NIO中,將這個操做植入了java.lang.Thread#interrupt方法,免去用戶本身編碼特定代碼的麻煩。使IO操做能夠像其餘可中斷方法同樣,在中斷時拋出ClosedByInterruptException異常,業務程序捕獲該異常便可對IO中斷作出響應。
歡迎工做一到五年的Java工程師朋友們加入Java架構開發 : 867748702 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、 Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper, Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料) 合理利用本身每一分每一秒的時間來學習提高本身, 不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!