最近在一次壓測過程當中暴露出notifyclient的一個死鎖問題,發生死鎖的場景是消息的可靠異步發送,具體過程是:java
(生產者)消息發送線程拿到隊列鎖,當隊列未滿的時候寫入消息,釋放鎖,當隊列滿的時候,釋放鎖,等待隊列空條件。數據結構
(消費者)刷盤線程拿到隊列鎖,當隊列有數據的時候,取數據清空隊列,釋放鎖,再把取出來的消息數據刷盤持久化;沒數據的時候,釋放鎖,等待隊列非空條件。app
這是一個典型的多生產者-單消費者的問題。起初咱們經過review代碼來看,都以爲不會發生死鎖,由於在臨界區域裏面只用到了一把鎖,不會出現deadlyembrace類型的死鎖。異步
後來進一步瞭解到用戶對notifyclient的使用方式,發現他們的用法比較特殊,用戶會把Nms以內沒有完成消息發送的任務,強行cancel掉。也就是說生產者可能會在某個時刻檢測到interrupt標記位,響應interrupt,是否會產生死鎖必須把interrupt的因素也給考慮進去。ide
通常來講,當捕獲到InterruptedException以後,比較規範的作法是把InterruptedException拋給上層調用者;或者調用Thread.currentThread().interrupt(),從新把線程中斷標記置爲true(由於阻塞方法在拋出InterruptedException,會清除中斷標記位),暫不處理中斷,把中斷留給線程後續處理。基本原則就是要讓任務可以優雅地在一個合適的時機響應中斷,而不能對中斷絕不做爲。this
在這個案例裏面,生產者選擇了後者,暫不處理,經過Thread.currentThread().interrupt()從新設置中斷標記。線程
在大部分狀況下,這麼作是不會有問題的,可是在這種狀況下,問題很大。由於在enqueue裏面,會響應中斷的代碼是this.notEmpty.await(),而且是在一個循環裏,this.notEmpty.await()會在方法入口處檢測是否有中斷標記,若是有那麼就拋InterruptedException,這樣一來一旦拋出第一個InterruptedException,在catch方法塊裏執行Thread.currentThread().interrupt(),會致使在下一次循環裏繼續拋出InterruptedException。若是運氣好的話,可能在某個時刻this.nextWriteBatch!=null條件不成立,跳出循環。若是運氣很差的話,可能就是一個死循環。blog
在此次死鎖案例中,是屬於運氣很差的狀況,由於InterruptedException是在this.notEmpty.await()(Condition.await()會在執行過程當中釋放關聯的鎖)釋放鎖enqueueLock以前發生的,也就是說生產者在釋放鎖以前陷入中斷循環。惟一能讓this.nextWriteBatch!=null不成立的線程是消費者線程,消費者線程沒拿到鎖,沒機會執行this.nextWriteBatch=null。這樣一來這個中斷循環就成了死循環了,他永遠不會釋放鎖,其餘線程會一直阻塞等待鎖。隊列
鎖定義,如下代碼都在同一個實例裏rem
private final Lock enqueueLock = new ReentrantLock(); private final Condition notEmpty = this.enqueueLock.newCondition(); private final Condition empty = this.enqueueLock.newCondition();
生產者代碼
private WriteBatch enqueue(WriteCommand writeCommand, boolean sync) throws IOException { WriteBatch result = null; this.enqueueLock.lock(); try { // 若是沒有啓動,則先啓動appender線程 this.startAppendThreadIfNessary(); if (this.nextWriteBatch == null) { result = this.newWriteBatch(writeCommand); this.empty.signalAll(); } else { if (this.nextWriteBatch.canAppend(writeCommand)) { this.nextWriteBatch.append(writeCommand); result = this.nextWriteBatch; } else { while (this.nextWriteBatch != null) { try { this.notEmpty.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } result = this.newWriteBatch(writeCommand); this.empty.signalAll(); } } if (!sync) { InflyWriteData inflyWriteData = this.inflyWrites.get(writeCommand.bytesKey); switch (writeCommand.opItem.op) { case OpItem.OP_ADD: if (inflyWriteData == null) { this.inflyWrites.put(writeCommand.bytesKey, new InflyWriteData(writeCommand.data)); } else { // update and increase reference count; inflyWriteData.data = writeCommand.data; inflyWriteData.count++; } break; case OpItem.OP_DEL: // 無條件刪除 if (inflyWriteData != null) { this.inflyWrites.remove(writeCommand.bytesKey); } } } return result; } finally { this.enqueueLock.unlock(); } }
消費者代碼
public void processQueue() { while (true) { WriteBatch batch = null; this.enqueueLock.lock(); try { while (true) { if (this.nextWriteBatch != null) { batch = this.nextWriteBatch; this.nextWriteBatch = null; break; } if (this.shutdown) { return; } try { this.empty.await(); } catch (InterruptedException e) { break; } } this.notEmpty.signalAll(); } finally { this.enqueueLock.unlock(); } if (batch != null) { final DataFile dataFile = batch.dataFile; final LogFile logFile = batch.logFile; final List<WriteCommand> cmdList = batch.cmdList; try { this.writeDataAndLog(batch, dataFile, logFile, cmdList); this.proce***emove(batch, dataFile, logFile); } finally { batch.latch.countDown(); } } } }
其實在java技術規範裏面,並不推崇,也不提供簡單粗暴的任務中斷機制,強制要求一個任務立馬中止。由於若是一個任務在執行過程當中,被非正常取消的話,有可能會致使數據結構被破壞,數據不一致的狀況發生。因此java推崇的是經過協做的方式來終止一個任務,一個線程能夠向另一個線程發起終止信號,可是具體如何終止,應該由被終止的線程來決定。被終止的線程能夠經過檢查終止信號,在一個合適的時機優雅退出。
好比一個任務能夠設置一個volatile的cancel標記,當要終止這個任務的時候,咱們把cancel標記設置爲true,告訴任務咱們要終止了。任務在某個時候檢查到這個終止標記,知道要終止的,把數據結構維護好,在合適的時機退出。
儘管能夠經過設置用戶自定義的cancel標記來取消任務,可是也有可能任務調用了一些阻塞方法,好比Condition.await(),一旦阻塞可能就沒機會去檢測用戶自定義的cancel標記,這樣一來任務也就不能及時響應用戶的取消操做。所以jdk提供了線程內置的interrupt標記,能夠經過Thread.currentThread().interrupt()來設置,而且大部分的類庫阻塞方法都能會檢查這個中斷標記位,在中斷的時候拋出異常,以便讓任務及時響應用戶中斷。
固然也有一種多是任務沒有使用自定義cancel標記,也沒有調用可以拋出InterruptedException的方法,若是對這類任務調用Thread.currentThread().interrupt(),是不會產生預期效果的。所以調用方不該該隨意調用Thread.currentThread().interrupt()來取消任務,除非他知道這個任務的中斷策略。而做爲任務代碼編寫者,要保證程序健壯,應該考慮一個合適的中斷策略,可以在被中斷的時候,儘量及時響應中斷,優雅的退出。
從新回到上面的例子,結合業務場景,當用戶調用中斷的時候,是想取消發送消息任務。任務代碼在this.notEmpty.await()檢查到線程中斷標記,拋出InterruptedException。由於數據結構尚未被破壞,數據狀態是一致的,因此無需捕獲異常,直接往上層拋出InterruptedException,釋放鎖,以消息發送失敗了結;
private WriteBatch enqueue(WriteCommand writeCommand, boolean sync) throws IOException, InterruptedException { WriteBatch result = null; this.enqueueLock.lock(); try { // 若是沒有啓動,則先啓動appender線程 this.startAppendThreadIfNessary(); if (this.nextWriteBatch == null) { result = this.newWriteBatch(writeCommand); this.empty.signalAll(); } else { if (this.nextWriteBatch.canAppend(writeCommand)) { this.nextWriteBatch.append(writeCommand); result = this.nextWriteBatch; } else { while (this.nextWriteBatch != null) { this.notEmpty.await(); } result = this.newWriteBatch(writeCommand); this.empty.signalAll(); } } if (!sync) { InflyWriteData inflyWriteData = this.inflyWrites.get(writeCommand.bytesKey); switch (writeCommand.opItem.op) { case OpItem.OP_ADD: if (inflyWriteData == null) { this.inflyWrites.put(writeCommand.bytesKey, new InflyWriteData(writeCommand.data)); } else { // update and increase reference count; inflyWriteData.data = writeCommand.data; inflyWriteData.count++; } break; case OpItem.OP_DEL: // 無條件刪除 if (inflyWriteData != null) { this.inflyWrites.remove(writeCommand.bytesKey); } } } return result; } finally { this.enqueueLock.unlock(); } }
若是在拋出中斷異常的時候,數據結構處於不一致的狀態,那麼能夠先把中斷狀態保存下來,等數據結構維護好以後再退出。好比下面的任務,子任務A和子任務B必須都執行才能保證數據結構不被破壞。
策略1:process聲明中斷異常,上層調用者必須考慮如何處理中斷異常。Process在執行完A以後被中斷,直接撤銷A,拋出中斷異常,讓調用者來處理中斷。
public void process() throws InterruptedException{ doA(); try { Thread.sleep(1000); } catch (InterruptedException e) { undoA(); throw e; } doB(); }
策略2:process沒有聲明中斷異常,當中斷髮生時,選擇一不作二不休的策略。先保存中斷狀態,把A、B都作完了。而後從新標記中斷,把中斷信號傳遞下去,讓線程後面的邏輯來響應中斷。
public void process() { boolean cancel=false; doA(); try { Thread.sleep(1000); } catch (InterruptedException e) { cancel=true; } doB(); if (cancel) { Thread.currentThread().interrupt(); } }