一個因中斷致使的死鎖分析

最近在一次壓測過程當中暴露出notifyclient的一個死鎖問題,發生死鎖的場景是消息的可靠異步發送,具體過程是:java

(生產者)消息發送線程拿到隊列鎖,當隊列未滿的時候寫入消息,釋放鎖,當隊列滿的時候,釋放鎖,等待隊列空條件。數據結構

(消費者)刷盤線程拿到隊列鎖,當隊列有數據的時候,取數據清空隊列,釋放鎖,再把取出來的消息數據刷盤持久化;沒數據的時候,釋放鎖,等待隊列非空條件。app

這是一個典型的多生產者-單消費者的問題。起初咱們經過review代碼來看,都以爲不會發生死鎖,由於在臨界區域裏面只用到了一把鎖,不會出現deadlyembrace類型的死鎖。異步

後來進一步瞭解到用戶對notifyclient的使用方式,發現他們的用法比較特殊,用戶會把Nms以內沒有完成消息發送的任務,強行cancel掉。也就是說生產者可能會在某個時刻檢測到interrupt標記位,響應interrupt,是否會產生死鎖必須把interrupt的因素也給考慮進去。ide

通常來講,當捕獲到InterruptedException以後,比較規範的作法是把InterruptedException拋給上層調用者;或者調用Thread.currentThread().interrupt(),從新把線程中斷標記置爲true(由於阻塞方法在拋出InterruptedException,會清除中斷標記位),暫不處理中斷,把中斷留給線程後續處理。基本原則就是要讓任務可以優雅地在一個合適的時機響應中斷,而不能對中斷絕不做爲。this

在這個案例裏面,生產者選擇了後者,暫不處理,經過Thread.currentThread().interrupt()從新設置中斷標記。線程

在大部分狀況下,這麼作是不會有問題的,可是在這種狀況下,問題很大。由於在enqueue裏面,會響應中斷的代碼是this.notEmpty.await(),而且是在一個循環裏,this.notEmpty.await()會在方法入口處檢測是否有中斷標記,若是有那麼就拋InterruptedException,這樣一來一旦拋出第一個InterruptedException,在catch方法塊裏執行Thread.currentThread().interrupt(),會致使在下一次循環裏繼續拋出InterruptedException。若是運氣好的話,可能在某個時刻this.nextWriteBatch!=null條件不成立,跳出循環。若是運氣很差的話,可能就是一個死循環。blog

在此次死鎖案例中,是屬於運氣很差的狀況,由於InterruptedException是在this.notEmpty.await()(Condition.await()會在執行過程當中釋放關聯的鎖)釋放鎖enqueueLock以前發生的,也就是說生產者在釋放鎖以前陷入中斷循環。惟一能讓this.nextWriteBatch!=null不成立的線程是消費者線程,消費者線程沒拿到鎖,沒機會執行this.nextWriteBatch=null。這樣一來這個中斷循環就成了死循環了,他永遠不會釋放鎖,其餘線程會一直阻塞等待鎖。隊列

鎖定義,如下代碼都在同一個實例裏rem

private final Lock enqueueLock = new ReentrantLock();
private final Condition notEmpty = this.enqueueLock.newCondition();
private final Condition empty = this.enqueueLock.newCondition();

生產者代碼

private WriteBatch enqueue(WriteCommand writeCommand, boolean sync) throws IOException {
WriteBatch result = null;
this.enqueueLock.lock();
try {
// 若是沒有啓動,則先啓動appender線程
this.startAppendThreadIfNessary();
if (this.nextWriteBatch == null) {
result = this.newWriteBatch(writeCommand);
this.empty.signalAll();
}
else {
if (this.nextWriteBatch.canAppend(writeCommand)) {
this.nextWriteBatch.append(writeCommand);
result = this.nextWriteBatch;
}
else {
while (this.nextWriteBatch != null) {
try {
this.notEmpty.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
result = this.newWriteBatch(writeCommand);
this.empty.signalAll();
}
}
if (!sync) {
InflyWriteData inflyWriteData = this.inflyWrites.get(writeCommand.bytesKey);
switch (writeCommand.opItem.op) {
case OpItem.OP_ADD:
if (inflyWriteData == null) {
this.inflyWrites.put(writeCommand.bytesKey, new InflyWriteData(writeCommand.data));
}
else {
// update and increase reference count;
inflyWriteData.data = writeCommand.data;
inflyWriteData.count++;
}
break;
case OpItem.OP_DEL:
// 無條件刪除
if (inflyWriteData != null) {
this.inflyWrites.remove(writeCommand.bytesKey);
}
}
}
return result;
}
finally {
this.enqueueLock.unlock();
}
}

消費者代碼

public void processQueue() {
while (true) {
WriteBatch batch = null;
this.enqueueLock.lock();
try {
while (true) {
if (this.nextWriteBatch != null) {
batch = this.nextWriteBatch;
this.nextWriteBatch = null;
break;
}
if (this.shutdown) {
return;
}
try {
this.empty.await();
}
catch (InterruptedException e) {
break;
}
}
this.notEmpty.signalAll();
}
finally {
this.enqueueLock.unlock();
}
if (batch != null) {
final DataFile dataFile = batch.dataFile;
final LogFile logFile = batch.logFile;
final List<WriteCommand> cmdList = batch.cmdList;
try {
this.writeDataAndLog(batch, dataFile, logFile, cmdList);
this.proce***emove(batch, dataFile, logFile);
}
finally {
batch.latch.countDown();
}
}
}
}

其實在java技術規範裏面,並不推崇,也不提供簡單粗暴的任務中斷機制,強制要求一個任務立馬中止。由於若是一個任務在執行過程當中,被非正常取消的話,有可能會致使數據結構被破壞,數據不一致的狀況發生。因此java推崇的是經過協做的方式來終止一個任務,一個線程能夠向另一個線程發起終止信號,可是具體如何終止,應該由被終止的線程來決定。被終止的線程能夠經過檢查終止信號,在一個合適的時機優雅退出。

好比一個任務能夠設置一個volatile的cancel標記,當要終止這個任務的時候,咱們把cancel標記設置爲true,告訴任務咱們要終止了。任務在某個時候檢查到這個終止標記,知道要終止的,把數據結構維護好,在合適的時機退出。

儘管能夠經過設置用戶自定義的cancel標記來取消任務,可是也有可能任務調用了一些阻塞方法,好比Condition.await(),一旦阻塞可能就沒機會去檢測用戶自定義的cancel標記,這樣一來任務也就不能及時響應用戶的取消操做。所以jdk提供了線程內置的interrupt標記,能夠經過Thread.currentThread().interrupt()來設置,而且大部分的類庫阻塞方法都能會檢查這個中斷標記位,在中斷的時候拋出異常,以便讓任務及時響應用戶中斷。

固然也有一種多是任務沒有使用自定義cancel標記,也沒有調用可以拋出InterruptedException的方法,若是對這類任務調用Thread.currentThread().interrupt(),是不會產生預期效果的。所以調用方不該該隨意調用Thread.currentThread().interrupt()來取消任務,除非他知道這個任務的中斷策略。而做爲任務代碼編寫者,要保證程序健壯,應該考慮一個合適的中斷策略,可以在被中斷的時候,儘量及時響應中斷,優雅的退出。

從新回到上面的例子,結合業務場景,當用戶調用中斷的時候,是想取消發送消息任務。任務代碼在this.notEmpty.await()檢查到線程中斷標記,拋出InterruptedException。由於數據結構尚未被破壞,數據狀態是一致的,因此無需捕獲異常,直接往上層拋出InterruptedException,釋放鎖,以消息發送失敗了結;

private WriteBatch enqueue(WriteCommand writeCommand, boolean sync) throws IOException, InterruptedException {
WriteBatch result = null;
this.enqueueLock.lock();
try {
// 若是沒有啓動,則先啓動appender線程
this.startAppendThreadIfNessary();
if (this.nextWriteBatch == null) {
result = this.newWriteBatch(writeCommand);
this.empty.signalAll();
}
else {
if (this.nextWriteBatch.canAppend(writeCommand)) {
this.nextWriteBatch.append(writeCommand);
result = this.nextWriteBatch;
}
else {
while (this.nextWriteBatch != null) {
this.notEmpty.await();
}
result = this.newWriteBatch(writeCommand);
this.empty.signalAll();
}
}
if (!sync) {
InflyWriteData inflyWriteData = this.inflyWrites.get(writeCommand.bytesKey);
switch (writeCommand.opItem.op) {
case OpItem.OP_ADD:
if (inflyWriteData == null) {
this.inflyWrites.put(writeCommand.bytesKey, new InflyWriteData(writeCommand.data));
}
else {
// update and increase reference count;
inflyWriteData.data = writeCommand.data;
inflyWriteData.count++;
}
break;
case OpItem.OP_DEL:
// 無條件刪除
if (inflyWriteData != null) {
this.inflyWrites.remove(writeCommand.bytesKey);
}
}
}
return result;
}
finally {
this.enqueueLock.unlock();
}
}

若是在拋出中斷異常的時候,數據結構處於不一致的狀態,那麼能夠先把中斷狀態保存下來,等數據結構維護好以後再退出。好比下面的任務,子任務A和子任務B必須都執行才能保證數據結構不被破壞。

策略1:process聲明中斷異常,上層調用者必須考慮如何處理中斷異常。Process在執行完A以後被中斷,直接撤銷A,拋出中斷異常,讓調用者來處理中斷。

public void process() throws InterruptedException{
doA();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
undoA();
throw e;
}
doB();
}

策略2:process沒有聲明中斷異常,當中斷髮生時,選擇一不作二不休的策略。先保存中斷狀態,把A、B都作完了。而後從新標記中斷,把中斷信號傳遞下去,讓線程後面的邏輯來響應中斷。

public void process() {
boolean cancel=false;
doA();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
cancel=true;
}
doB();
if (cancel) {
Thread.currentThread().interrupt();
}
}
相關文章
相關標籤/搜索