JDK源碼閱讀:InterruptibleChannel與可中斷IO,ig牛逼

Java傳統IO是不支持中斷的,因此若是代碼在read/write等操做阻塞的話,是沒法被中斷的。這就沒法和Thead的interrupt模型配合使用了。JavaNIO衆多的升級點中就包含了IO操做對中斷的支持。InterruptiableChannel表示支持中斷的Channel。咱們經常使用的FileChannel,SocketChannel,DatagramChannel都實現了這個接口。

InterruptibleChannel接口java

 

public interface InterruptibleChannel extends Channel
{
 
 /**
 * 關閉當前Channel
 * 
 * 任何當前阻塞在當前channel執行的IO操做上的線程,都會收到一個AsynchronousCloseException異常
 */
 public void close() throws IOException;
}

InterruptibleChannel接口沒有定義任何方法,其中的close方法是父接口就有的,這裏只是添加了額外的註釋。sql

AbstractInterruptibleChannel實現了InterruptibleChannel接口,並提供了實現可中斷IO機制的重要的方法,好比begin(),end()。架構

在解讀這些方法的代碼前,先了解一下NIO中,支持中斷的Channel代碼是如何編寫的。併發

第一個要求是要正確使用begin()和end()方法:異步

 

boolean completed = false;
try {
 begin();
 completed = ...; // 執行阻塞IO操做
 return ...; // 返回結果
} finally {
 end(completed);
}

NIO規定了,在阻塞IO的語句先後,須要調用begin()和end()方法,爲了保證end()方法必定被調用,要求放在finally語句塊中。分佈式

第二個要求是Channel須要實現java.nio.channels.spi.AbstractInterruptibleChannel#implCloseChannel這個方法。AbstractInterruptibleChannel在處理中斷時,會調用這個方法,使用Channel的具體實現來關閉Channel。函數

接下來咱們具體看一下begin()和end()方法是如何實現的。高併發

begin方法性能

 

// 保存中斷處理對象實例
private Interruptible interruptor;
// 保存被中斷線程實例
private volatile Thread interrupted;
 
protected final void begin() {
 // 初始化中斷處理對象,中斷處理對象提供了中斷處理回調
 // 中斷處理回調登記被中斷的線程,而後調用implCloseChannel方法,關閉Channel
 if (interruptor == null) {
 interruptor = new Interruptible() {
 public void interrupt(Thread target) {
 synchronized (closeLock) {
 // 若是當前Channel已經關閉,則直接返回
 if (!open)
 return;
 
 // 設置標誌位,同時登記被中斷的線程
 open = false;
 interrupted = target;
 try {
 // 調用具體的Channel實現關閉Channel
 AbstractInterruptibleChannel.this.implCloseChannel();
 } catch (IOException x) { }
 }
 }};
 }

// 登記中斷處理對象到當前線程學習

blockedOn(interruptor);

// 判斷當前線程是否已經被中斷,若是已經被中斷,可能登記的中斷處理對象沒有被執行,這裏手動執行一下

Thread me = Thread.currentThread();

if (me.isInterrupted())

interruptor.interrupt(me);

}

從begin()方法中,咱們能夠看出NIO實現可中斷IO操做的思路,是在Thread的中斷邏輯中,掛載自定義的中斷處理對象,這樣Thread對象在被中斷時,會執行中斷處理對象中的回調,這個回調中,執行關閉Channel的操做。這樣就實現了Channel對線程中斷的響應了。

接下來重點就是研究「Thread添加中斷處理邏輯」這個機制是如何實現的了,是經過blockedOn方法實現的:

 

static void blockedOn(Interruptible intr) { // package-private
 sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),intr);
}

blockedOn方法使用的是JavaLangAccess的blockedOn方法。

SharedSecrets是一個神奇而糟糕的類,爲啥說是糟糕呢,由於這個方法的存在,就是爲了訪問JDK類庫中一些由於類做用域限制而外部沒法訪問的類或者方法。JDK不少類與方法是私有或者包級別私有的,外部是沒法訪問的,可是JDK在自己實現的時候又存在互相依賴的狀況,因此爲了外部能夠不依賴反射訪問這些類或者方法,在sun包下,存在這麼一個類,提供了各類超越限制的方法。

SharedSecrets.getJavaLangAccess()方法返回JavaLangAccess對象。JavaLangAccess對象就和名稱所說的同樣,提供了java.lang包下一些非公開的方法的訪問。這個類在System初始化時被構造:

 

// java.lang.System#setJavaLangAccess
private static void setJavaLangAccess() {
 sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
 public void blockedOn(Thread t, Interruptible b) {
 t.blockedOn(b);
 }
 //...
 });
}

能夠看出,sun.misc.JavaLangAccess#blockedOn保證的就是java.lang.Thread#blockedOn這個包級別私有的方法:

 

/* The object in which this thread is blocked in an interruptible I/O
 * operation, if any. The blocker's interrupt method should be invoked
 * after setting this thread's interrupt status.
 */
private volatile Interruptible blocker;
private final Object blockerLock = new Object();
 
/* Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code
 */
void blockedOn(Interruptible b) {
 // 串行化blocker相關操做
 synchronized (blockerLock) {
 blocker = b;
 }
}

而這個方法也很是簡單,就是設置java.lang.Thread#blocker變量爲以前提到的中斷處理對象。並且從註釋中能夠看出,這個方法就是專門爲NIO設計的,註釋都很是直白的提到了,NIO的代碼會經過sun.misc.SharedSecrets調用到這個方法。。

接下來就是重頭戲了,看一下Thread在中斷時,如何調用NIO註冊的中斷處理器:

 

public void interrupt() {
 if (this != Thread.currentThread())
 checkAccess();
 
 synchronized (blockerLock) {
 Interruptible b = blocker;

// 若是NIO設置了中斷處理器,則只需Thread自己的中斷邏輯後,調用中斷處理器的回調函數

if (b != null) {
 interrupt0(); // 這一步會設置interrupt標誌位
 b.interrupt(this);
 return;
 }
 }

// 若是沒有的話,就走普通流程

interrupt0();

}

JDK源碼閱讀:InterruptibleChannel與可中斷IO,ig牛逼

 

end方法

begin()方法負責添加Channel的中斷處理器到當前線程。end()是在IO操做執行完/中斷完後的操做,負責判斷中斷是否發生,若是發生判斷是當前線程發生仍是別的線程中斷把當前操做的Channel給關閉了,對於不一樣的狀況,拋出不一樣的異常。

 

protected final void end(boolean completed) throws AsynchronousCloseException
{
 // 清空線程的中斷處理器引用,避免線程一直存活致使中斷處理器沒法被回收
 blockedOn(null);
 Thread interrupted = this.interrupted;
 
 if (interrupted != null && interrupted == Thread.currentThread()) {
 interrupted = null;
 throw new ClosedByInterruptException();
 }

// 若是此次沒有讀取到數據,而且Channel被另一個線程關閉了,則排除Channel被異步關閉的異常

// 可是若是此次讀取到了數據,就不能拋出異常,由於此次讀取的數據是有效的,須要返回給用戶的(重要邏輯)

if (!completed && !open)

throw new AsynchronousCloseException();

}

經過代碼能夠看出,若是是當前線程被中斷,則拋出ClosedByInterruptException異常,表示Channel由於線程中斷而被關閉了,IO操做也隨之中斷了。

若是是當前線程發現Channel被關閉了,而且是讀取還未執行完畢的狀況,則拋出AsynchronousCloseException異常,表示Channel被異步關閉了。

end()邏輯的活動圖以下:

JDK源碼閱讀:InterruptibleChannel與可中斷IO,ig牛逼

 

場景分析

併發的場景分析起來就是複雜,上面的代碼很少,可是場景不少,咱們以sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)爲例分析一下可能的場景:

  1. A線程read,B線程中斷A線程:A線程拋出ClosedByInterruptException異常
  2. A,B線程read,C線程中斷A線程
  3. A被中斷時,B剛剛進入read方法:A線程拋出ClosedByInterruptException異常,B線程ensureOpen方法拋出ClosedChannelException異常
  4. A被中斷時,B阻塞在底層read方法中:A線程拋出ClosedByInterruptException異常,B線程底層方法拋出異常返回,end方法中拋出AsynchronousCloseException異常
  5. A被中斷時,B已經讀取到數據:A線程拋出ClosedByInterruptException異常,B線程正常返回

sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)代碼以下:

 

public int read(ByteBuffer dst) throws IOException {
 ensureOpen(); // 1
 if (!readable) // 2
 throw new NonReadableChannelException();
 synchronized (positionLock) {
 int n = 0;
 int ti = -1;
 try { 
 begin();
 ti = threads.add();
 if (!isOpen())
 return 0; // 3
 do {
 n = IOUtil.read(fd, dst, -1, nd); // 4
 } while ((n == IOStatus.INTERRUPTED) && isOpen());
 return IOStatus.normalize(n);
 } finally {
 threads.remove(ti);
 end(n > 0);
 assert IOStatus.check(n);
 }
 }
}

總結

在JavaIO時期,人們爲了中斷IO操做想了很多方法,核心操做就是關閉流,促使IO操做拋出異常,達到中斷IO的效果。NIO中,將這個操做植入了java.lang.Thread#interrupt方法,免去用戶本身編碼特定代碼的麻煩。使IO操做能夠像其餘可中斷方法同樣,在中斷時拋出ClosedByInterruptException異常,業務程序捕獲該異常便可對IO中斷作出響應。

歡迎工做一到五年的Java工程師朋友們加入Java架構開發 : 867748702 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、 Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper, Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料) 合理利用本身每一分每一秒的時間來學習提高本身, 不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!

相關文章
相關標籤/搜索