在這篇文章中,咱們主要討論一下死鎖及其解決辦法。java
在上一篇文章中,咱們討論瞭如何使用一個互斥鎖去保護多個資源,以銀行帳戶轉帳爲例,當時給出的解決方法是基於Class對象建立互斥鎖。後端
這樣雖然解決了同步的問題,可是能在現實中使用嗎?答案是不能夠,尤爲是在高併發的狀況下,緣由是咱們使用的互斥鎖的範圍太大,以轉帳爲例,咱們的作法會鎖定整個帳戶Class對象,這樣會致使轉帳操做只能串行進行,可是在實際場景中,大量的轉帳操做業務中的雙方是不相同的,直接在Class對象級別上加鎖是不能接受的。併發
那若是在對象實例級別上加鎖,使用細粒度鎖,會有什麼問題?可能會發生死鎖。app
咱們接下來看一下形成死鎖的緣由和可能的解決方案。高併發
什麼是死鎖?性能
死鎖是指一組互相競爭資源的線程因互相等待,致使「永久」阻塞的現象。測試
通常來講,當咱們使用細粒度鎖時,它在提高性能的同時,也可能會致使死鎖。this
咱們仍是以銀行轉帳爲例,來看一下死鎖是如何發生的。線程
首先,咱們先定義個BankAccount對象,來存儲基本信息,代碼以下。code
public class BankAccount { private int id; private double balance; private String password; public int getId() { return id; } public void setId(int id) { this.id = id; } public double getBalance() { return balance; } public void setBalance(double balance) { this.balance = balance; } }
接下來,咱們使用細粒度鎖來嘗試完成轉帳操做,代碼以下。
public class BankTransferDemo { public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) { synchronized(sourceAccount) { synchronized(targetAccount) { if (sourceAccount.getBalance() > amount) { System.out.println("Start transfer."); System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); sourceAccount.setBalance(sourceAccount.getBalance() - amount); targetAccount.setBalance(targetAccount.getBalance() + amount); System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); } } } } }
咱們用下面的代碼來作簡單測試。
public static void main(String[] args) throws InterruptedException { BankAccount sourceAccount = new BankAccount(); sourceAccount.setId(1); sourceAccount.setBalance(50000); BankAccount targetAccount = new BankAccount(); targetAccount.setId(2); targetAccount.setBalance(20000); BankTransferDemo obj = new BankTransferDemo(); Thread t1 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { obj.transfer(sourceAccount, targetAccount, 1); } }); Thread t2 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { obj.transfer(targetAccount, sourceAccount, 1); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Finished."); }
測試代碼中包含了2個線程,其中t1線程循環從sourceAccount向targetAccount轉帳,而t2線程會循環從targetAccount向sourceAccount轉帳。
從運行結果來看,t1線程中的循環在運行600次左右時,t2線程也建立好,開始循環轉帳了,這時就會發生死鎖,致使t1線程和t2線程都沒法繼續執行。
咱們能夠用下面的資源分配圖來更直觀的描述死鎖。
併發程序一旦死鎖,通常沒有特別好的辦法,不少時候咱們只能重啓應用,所以,解決死鎖問題的最好辦法是規避死鎖。
咱們先來看一下死鎖發生的條件,一個叫Coffman的牛人,於1971年在ACM Computing Surveys發表了一篇名爲System Deadlocks的文章,他總結了只有如下四個條件所有知足的狀況下,纔會發生死鎖:
經過上述描述,咱們可以推導出,只要破壞上面其中一個條件,就能夠避免死鎖的發生。
可是第一個條件互斥,是不能夠被破壞的,不然咱們就沒有用鎖的必要了,那麼咱們來看如何破壞其餘三個條件。
若是要破壞佔用且等待條件,咱們能夠嘗試一次性申請所有資源,這樣就不須要等待了。
在實現過程當中,咱們須要建立一個新的角色,負責同時申請和同時釋放所有資源,咱們能夠將其稱爲Allocator。
咱們來看一下具體的代碼實現。
public class Allocator { private volatile static Allocator instance; private Allocator() {} public static Allocator getInstance() { if (instance == null) { synchronized(Allocator.class) { if (instance == null) { instance = new Allocator(); } } } return instance; } private Set<Object> lockObjs = new HashSet<Object>(); public synchronized boolean apply(Object... objs) { for (Object obj : objs) { if (lockObjs.contains(obj)) { return false; } } for (Object obj : objs) { lockObjs.add(obj); } return true; } public synchronized void free(Object... objs) { for (Object obj : objs) { if (lockObjs.contains(obj)) { lockObjs.remove(obj); } } } }
Allocator是一個單例模式,它會使用一個Set對象來保存全部須要處理的資源,而後使用apply()和free()來同時鎖定或者釋放全部資源,它們會接收不固定參數。
咱們來看一下新的transfer()方法應該怎麼寫。
public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) { Allocator allocator = Allocator.getInstance(); while(!allocator.apply(sourceAccount, targetAccount)); try { synchronized(sourceAccount) { synchronized(targetAccount) { if (sourceAccount.getBalance() > amount) { System.out.println("Start transfer."); System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); sourceAccount.setBalance(sourceAccount.getBalance() - amount); targetAccount.setBalance(targetAccount.getBalance() + amount); System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); } } } } finally { allocator.free(sourceAccount, targetAccount); } }
咱們能夠看到,transfer()方法中,首先獲取Allocator實例,而後調用apply(),傳入sourceAccount和targetAccount實例,請注意這裏使用了while循環,即直到apply()返回true,纔會退出循環,此時,Allocator已經鎖定了sourceAccount和targetAccount,接下來,咱們使用synchronized關鍵字來鎖定sourceAccount和targetAccount,而後執行轉帳的業務邏輯。這裏並非必需要用synchronized,可是這樣作能夠避免其餘操做來影響轉帳操做,例如若是轉帳的過程當中對sourceAccount實例進行取錢操做,若是不用synchronized,就有可能引起併發問題。
下面是測試代碼。
public static void main(String[] args) throws InterruptedException { BankAccount sourceAccount = new BankAccount(); sourceAccount.setId(1); sourceAccount.setBalance(50000); BankAccount targetAccount = new BankAccount(); targetAccount.setId(2); targetAccount.setBalance(20000); BankTransferDemo obj = new BankTransferDemo(); Thread t1 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { obj.transfer(sourceAccount, targetAccount, 1); } }); Thread t2 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { obj.transfer(targetAccount, sourceAccount, 1); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Finished."); }
程序是能夠正常執行的,結果和咱們預期一致。
在這裏,咱們須要保證鎖對象的不可變性,對於BankAccount對象來講,id屬性能夠看作是其主鍵,id相同的BankAccount實例,從業務角度來講,指向的都是同一個帳戶,可是對於鎖對象來講,id相同的不一樣實例,會產生不一樣的鎖,從而引起併發問題。
咱們來看下面修改後的測試代碼。
public static void main(String[] args) throws InterruptedException { BankTransferDemo obj = new BankTransferDemo(); Thread t1 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { // 這裏應該從後端獲取帳戶實例,此處只作演示。 BankAccount sourceAccount = new BankAccount(); sourceAccount.setId(1); sourceAccount.setBalance(50000); BankAccount targetAccount = new BankAccount(); targetAccount.setId(2); targetAccount.setBalance(20000); obj.transfer(sourceAccount, targetAccount, 1); } }); Thread t2 = new Thread(() ->{ for (int i = 0; i < 10000; i++) { // 這裏應該從後端獲取帳戶實例,此處只作演示。 BankAccount sourceAccount = new BankAccount(); sourceAccount.setId(1); sourceAccount.setBalance(50000); BankAccount targetAccount = new BankAccount(); targetAccount.setId(2); targetAccount.setBalance(20000); obj.transfer(targetAccount, sourceAccount, 1); } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("Finished."); }
上述代碼中,每次轉帳都建立新的BankAccount實例,而後將其傳入Allocator,這樣作,是不可以正常處理的,由於每次使用的互斥鎖都做用在不一樣的實例上,這一點,須要特別注意。
破壞不可搶佔條件很簡單,解決的關鍵在於可以主動釋放它佔有的資源,可是synchronized是不能作到這一點的。
synchronized申請資源的時候,若是申請失敗,線程會直接進入阻塞狀態,什麼都不能作,已經鎖定的資源也沒法釋放。
咱們可使用java.util.concurrent包中的Lock對象來實現這一點,相關代碼以下。
private Lock lock = new ReentrantLock(); public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) { try { lock.lock(); if (sourceAccount.getBalance() > amount) { System.out.println("Start transfer."); System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); sourceAccount.setBalance(sourceAccount.getBalance() - amount); targetAccount.setBalance(targetAccount.getBalance() + amount); System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); } } finally { lock.unlock(); } }
破壞循環條件,須要對資源進行排序,而後按序申請資源。
咱們來看下面的代碼。
public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) { BankAccount left = sourceAccount; BankAccount right = targetAccount; if (sourceAccount.getId() > targetAccount.getId()) { left = targetAccount; right = sourceAccount; } synchronized(left) { synchronized(right) { if (sourceAccount.getBalance() > amount) { System.out.println("Start transfer."); System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); sourceAccount.setBalance(sourceAccount.getBalance() - amount); targetAccount.setBalance(targetAccount.getBalance() + amount); System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); } } } }
在這裏,咱們假設BankAccount中的id
是主鍵,咱們按照id
對sourceAccount和targetAccount進行排序,以後按照id
從小到大申請資源,這樣就不會有死鎖發生了。
咱們在解決併發問題的時候,可能會有多種方式,咱們須要評估一下各個解決方案,從中選擇一個成本最低的方案。
對於咱們一直談論的轉帳示例,破壞循環條件多是一個比較好的解決方法。
咱們上面在破壞佔用且等待條件時,使用了以下的死循環:
while(!allocator.apply(sourceAccount, targetAccount));
在併發量不高的狀況下,這樣寫沒有問題,可是在高併發的狀況下,這樣寫可能須要循環太屢次才能拿到鎖,太消耗CPU了,屬於蠻幹型。
在這種狀況下,一種合理的方案是:若是線程要求的條件不知足,那麼線程阻塞本身,進入等待狀態,當線程要求的條件知足後,通知等待的線程從新執行,這裏線程阻塞就避免了循環消耗CPU的問題。
這就是咱們要討論的等待-通知機制。
Java中的等待-通知機制流程是怎樣的?
線程首先獲取互斥鎖,當線程要求的條件不知足時,釋放互斥鎖,進入等待狀態;當要求的條件知足時,通知等待的線程,從新獲取互斥鎖。
Java使用synchronized關鍵字配合wait()、notify()、notifyAll()三個方法實現等待-通知機制。
在併發程序中,當一個線程進入臨界區後,因爲某些條件沒有知足,須要進入等待狀態,Java對象的wait()方法可以實現這一點。當線程要求的條件知足時,Java對象的notify()和notifyAll()方法就能夠通知等待的線程,它會告訴線程,你須要的條件曾經知足過,之因此說曾經,是由於notify()只能保證在通知的那一時刻,條件是知足的,而被通知線程的執行時刻和通知時刻通常不會重合,因此在線程開始執行的時候,可能條件又不知足了。
另外須要注意,被通知的線程從新執行時,還須要獲取互斥鎖,由於以前在調用wait()方法時,互斥鎖已經被釋放了。
wait()、notify()和notifyAll()三個方法可以被調用的前提是已經獲取了響應的互斥鎖,因此這三個方法都是在synchronized{}內部被調用的。
下面咱們來看一下修改後的Allocator,其中apply()和free()方法的代碼以下。
public synchronized void apply(Object... objs) { for (Object obj : objs) { while (lockObjs.contains(obj)) { try { this.wait(); } catch (InterruptedException e) { System.out.println(e.getMessage()); } } } for (Object obj : objs) { lockObjs.add(obj); } } public synchronized void free(Object... objs) { for (Object obj : objs) { if (lockObjs.contains(obj)) { lockObjs.remove(obj); } } this.notifyAll(); }
對應的transfer()方法的代碼以下。
public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) { Allocator allocator = Allocator.getInstance(); allocator.apply(sourceAccount, targetAccount); try { synchronized(sourceAccount) { synchronized(targetAccount) { if (sourceAccount.getBalance() > amount) { System.out.println("Start transfer."); System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); sourceAccount.setBalance(sourceAccount.getBalance() - amount); targetAccount.setBalance(targetAccount.getBalance() + amount); System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance())); } } } } finally { allocator.free(sourceAccount, targetAccount); } }
運行結果和咱們指望是一致的。
在上述代碼中,咱們能夠發現,apply()方法中的判斷條件以前是if,如今改爲了while, while (lockObjs.contains(obj))
,這樣作能夠解決條件曾經知足的問題。
由於當wait()返回時,有可能條件已經發生了變化,曾經條件知足,可是如今已經不知足了,因此要從新檢驗條件是否知足。
這是一種範式,是一種經典的作法。
notify()和notifyAll()有什麼區別?
notify()會隨機的通知等待隊列中的一個線程, 而notifyAll()會通知等待隊列中的全部線程。
咱們儘可能使用notifyAll()方法,由於notify()可能會致使某些線程永遠不會被通知到。
假設咱們有一個實例,它有資源 A、B、C、D,咱們使用實例對象來建立互斥鎖。
wait()方法與sleep()方法的不一樣之處在於,wait()方法會釋放對象的「鎖標誌」。當調用某一對象的wait()方法後,會使當前線程暫停執行,並將當前線程放入對象等待池中,直到調用了notify()方法後,將從對象等待池中移出任意一個線程並放入鎖標誌等待池中,只有鎖標誌等待池中的線程能夠獲取鎖標誌,它們隨時準備爭奪鎖的擁有權。當調用了某個對象的notifyAll()方法,會將對象等待池中的全部線程都移動到該對象的鎖標誌等待池。
sleep()方法須要指定等待的時間,它可讓當前正在執行的線程在指定的時間內暫停執行,進入阻塞狀態,該方法既可讓其餘同優先級或者高優先級的線程獲得執行的機會,也可讓低優先級的線程獲得執行機會。可是sleep()方法不會釋放「鎖標誌」,也就是說若是有synchronized同步塊,其餘線程仍然不能訪問共享數據。
總結一下,wait()和sleep()區別以下。
wait()和sleep()都會讓渡CPU執行時間,等待再次調度!