Java併發編程實戰(4)- 死鎖

在這篇文章中,咱們主要討論一下死鎖及其解決辦法。java

概述

在上一篇文章中,咱們討論瞭如何使用一個互斥鎖去保護多個資源,以銀行帳戶轉帳爲例,當時給出的解決方法是基於Class對象建立互斥鎖。後端

這樣雖然解決了同步的問題,可是能在現實中使用嗎?答案是不能夠,尤爲是在高併發的狀況下,緣由是咱們使用的互斥鎖的範圍太大,以轉帳爲例,咱們的作法會鎖定整個帳戶Class對象,這樣會致使轉帳操做只能串行進行,可是在實際場景中,大量的轉帳操做業務中的雙方是不相同的,直接在Class對象級別上加鎖是不能接受的。併發

那若是在對象實例級別上加鎖,使用細粒度鎖,會有什麼問題?可能會發生死鎖。app

咱們接下來看一下形成死鎖的緣由和可能的解決方案。高併發

死鎖案例

什麼是死鎖?性能

死鎖是指一組互相競爭資源的線程因互相等待,致使「永久」阻塞的現象。測試

通常來講,當咱們使用細粒度鎖時,它在提高性能的同時,也可能會致使死鎖。this

咱們仍是以銀行轉帳爲例,來看一下死鎖是如何發生的。線程

首先,咱們先定義個BankAccount對象,來存儲基本信息,代碼以下。code

public class BankAccount {
	private int id;
	private double balance;
	private String password;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public double getBalance() {
		return balance;
	}
	public void setBalance(double balance) {
		this.balance = balance;
	}
}

接下來,咱們使用細粒度鎖來嘗試完成轉帳操做,代碼以下。

public class BankTransferDemo {
	
	public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		synchronized(sourceAccount) {
			synchronized(targetAccount) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}
}

咱們用下面的代碼來作簡單測試。

public static void main(String[] args) throws InterruptedException {
		BankAccount sourceAccount = new BankAccount();
		sourceAccount.setId(1);
		sourceAccount.setBalance(50000);
		
		BankAccount targetAccount = new BankAccount();
		targetAccount.setId(2);
		targetAccount.setBalance(20000);
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

測試代碼中包含了2個線程,其中t1線程循環從sourceAccount向targetAccount轉帳,而t2線程會循環從targetAccount向sourceAccount轉帳。

從運行結果來看,t1線程中的循環在運行600次左右時,t2線程也建立好,開始循環轉帳了,這時就會發生死鎖,致使t1線程和t2線程都沒法繼續執行。

咱們能夠用下面的資源分配圖來更直觀的描述死鎖。

死鎖的緣由和預防

併發程序一旦死鎖,通常沒有特別好的辦法,不少時候咱們只能重啓應用,所以,解決死鎖問題的最好辦法是規避死鎖。

咱們先來看一下死鎖發生的條件,一個叫Coffman的牛人,於1971年在ACM Computing Surveys發表了一篇名爲System Deadlocks的文章,他總結了只有如下四個條件所有知足的狀況下,纔會發生死鎖:

  • 互斥,共享資源X和Y只能被一個線程佔用。
  • 佔有且等待,線程t1已經取得共享資源X,在等待共享資源Y的時候,不釋放共享資源X。
  • 不可搶佔,其餘線程不能強行搶佔線程t1佔有的資源。
  • 循環等待,線程t1等待線程t2佔有的資源,線程t2等待線程t1佔有的資源,就是循環等待。

經過上述描述,咱們可以推導出,只要破壞上面其中一個條件,就能夠避免死鎖的發生。

可是第一個條件互斥,是不能夠被破壞的,不然咱們就沒有用鎖的必要了,那麼咱們來看如何破壞其餘三個條件。

破壞佔用且等待條件

若是要破壞佔用且等待條件,咱們能夠嘗試一次性申請所有資源,這樣就不須要等待了。

在實現過程當中,咱們須要建立一個新的角色,負責同時申請和同時釋放所有資源,咱們能夠將其稱爲Allocator。

咱們來看一下具體的代碼實現。

public class Allocator {
	
	private volatile static Allocator instance;
	
	private Allocator() {}
	
	public static Allocator getInstance() {
		if (instance == null) {
			synchronized(Allocator.class) {
				if (instance == null) {
					instance = new Allocator();
				}
			}
		}
		
		return instance;
	}
	
	private Set<Object> lockObjs = new HashSet<Object>();
	
	public synchronized boolean apply(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				return false;
			}
		}
		for (Object obj : objs) {
			lockObjs.add(obj);
		}
		
		return true;
	}
	
	public synchronized void free(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				lockObjs.remove(obj);
			}
		}
	}
}

Allocator是一個單例模式,它會使用一個Set對象來保存全部須要處理的資源,而後使用apply()和free()來同時鎖定或者釋放全部資源,它們會接收不固定參數。

咱們來看一下新的transfer()方法應該怎麼寫。

public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		Allocator allocator = Allocator.getInstance();
		while(!allocator.apply(sourceAccount, targetAccount));
		try {
			synchronized(sourceAccount) {
				synchronized(targetAccount) {
					if (sourceAccount.getBalance() > amount) {
						System.out.println("Start transfer.");
						System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
						sourceAccount.setBalance(sourceAccount.getBalance() - amount);
						targetAccount.setBalance(targetAccount.getBalance() + amount);
						System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					}
				}
			}
		}
		finally {
			allocator.free(sourceAccount, targetAccount);
		}
	}

咱們能夠看到,transfer()方法中,首先獲取Allocator實例,而後調用apply(),傳入sourceAccount和targetAccount實例,請注意這裏使用了while循環,即直到apply()返回true,纔會退出循環,此時,Allocator已經鎖定了sourceAccount和targetAccount,接下來,咱們使用synchronized關鍵字來鎖定sourceAccount和targetAccount,而後執行轉帳的業務邏輯。這裏並非必需要用synchronized,可是這樣作能夠避免其餘操做來影響轉帳操做,例如若是轉帳的過程當中對sourceAccount實例進行取錢操做,若是不用synchronized,就有可能引起併發問題。

下面是測試代碼。

public static void main(String[] args) throws InterruptedException {
		BankAccount sourceAccount = new BankAccount();
		sourceAccount.setId(1);
		sourceAccount.setBalance(50000);
		
		BankAccount targetAccount = new BankAccount();
		targetAccount.setId(2);
		targetAccount.setBalance(20000);
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

程序是能夠正常執行的,結果和咱們預期一致。

在這裏,咱們須要保證鎖對象的不可變性,對於BankAccount對象來講,id屬性能夠看作是其主鍵,id相同的BankAccount實例,從業務角度來講,指向的都是同一個帳戶,可是對於鎖對象來講,id相同的不一樣實例,會產生不一樣的鎖,從而引起併發問題。

咱們來看下面修改後的測試代碼。

public static void main(String[] args) throws InterruptedException {
		
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				// 這裏應該從後端獲取帳戶實例,此處只作演示。
				BankAccount sourceAccount = new BankAccount();
				sourceAccount.setId(1);
				sourceAccount.setBalance(50000);
				
				BankAccount targetAccount = new BankAccount();
				targetAccount.setId(2);
				targetAccount.setBalance(20000);
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				// 這裏應該從後端獲取帳戶實例,此處只作演示。
				BankAccount sourceAccount = new BankAccount();
				sourceAccount.setId(1);
				sourceAccount.setBalance(50000);
				
				BankAccount targetAccount = new BankAccount();
				targetAccount.setId(2);
				targetAccount.setBalance(20000);
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

上述代碼中,每次轉帳都建立新的BankAccount實例,而後將其傳入Allocator,這樣作,是不可以正常處理的,由於每次使用的互斥鎖都做用在不一樣的實例上,這一點,須要特別注意。

破壞不可搶佔條件

破壞不可搶佔條件很簡單,解決的關鍵在於可以主動釋放它佔有的資源,可是synchronized是不能作到這一點的。

synchronized申請資源的時候,若是申請失敗,線程會直接進入阻塞狀態,什麼都不能作,已經鎖定的資源也沒法釋放。

咱們可使用java.util.concurrent包中的Lock對象來實現這一點,相關代碼以下。

private Lock lock = new ReentrantLock();
	
    public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
        try {
            lock.lock();
            if (sourceAccount.getBalance() > amount) {
                System.out.println("Start transfer.");
                System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
                sourceAccount.setBalance(sourceAccount.getBalance() - amount);
                targetAccount.setBalance(targetAccount.getBalance() + amount);
                System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
            }
        }
        finally {
            lock.unlock();
        }
    }

破壞循環條件

破壞循環條件,須要對資源進行排序,而後按序申請資源。

咱們來看下面的代碼。

public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		BankAccount left = sourceAccount;
		BankAccount right = targetAccount;
		if (sourceAccount.getId() > targetAccount.getId()) {
			left = targetAccount;
			right = sourceAccount;
		}
		synchronized(left) {
			synchronized(right) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}

在這裏,咱們假設BankAccount中的id是主鍵,咱們按照id對sourceAccount和targetAccount進行排序,以後按照id從小到大申請資源,這樣就不會有死鎖發生了。

咱們在解決併發問題的時候,可能會有多種方式,咱們須要評估一下各個解決方案,從中選擇一個成本最低的方案。

對於咱們一直談論的轉帳示例,破壞循環條件多是一個比較好的解決方法。

使用等待-通知機制

咱們上面在破壞佔用且等待條件時,使用了以下的死循環:

while(!allocator.apply(sourceAccount, targetAccount));

在併發量不高的狀況下,這樣寫沒有問題,可是在高併發的狀況下,這樣寫可能須要循環太屢次才能拿到鎖,太消耗CPU了,屬於蠻幹型。

在這種狀況下,一種合理的方案是:若是線程要求的條件不知足,那麼線程阻塞本身,進入等待狀態,當線程要求的條件知足後,通知等待的線程從新執行,這裏線程阻塞就避免了循環消耗CPU的問題。

這就是咱們要討論的等待-通知機制。

Java中的等待-通知機制

Java中的等待-通知機制流程是怎樣的?

線程首先獲取互斥鎖,當線程要求的條件不知足時,釋放互斥鎖,進入等待狀態;當要求的條件知足時,通知等待的線程,從新獲取互斥鎖。

Java使用synchronized關鍵字配合wait()、notify()、notifyAll()三個方法實現等待-通知機制。

在併發程序中,當一個線程進入臨界區後,因爲某些條件沒有知足,須要進入等待狀態,Java對象的wait()方法可以實現這一點。當線程要求的條件知足時,Java對象的notify()和notifyAll()方法就能夠通知等待的線程,它會告訴線程,你須要的條件曾經知足過,之因此說曾經,是由於notify()只能保證在通知的那一時刻,條件是知足的,而被通知線程的執行時刻和通知時刻通常不會重合,因此在線程開始執行的時候,可能條件又不知足了。

另外須要注意,被通知的線程從新執行時,還須要獲取互斥鎖,由於以前在調用wait()方法時,互斥鎖已經被釋放了。

wait()、notify()和notifyAll()三個方法可以被調用的前提是已經獲取了響應的互斥鎖,因此這三個方法都是在synchronized{}內部被調用的。

下面咱們來看一下修改後的Allocator,其中apply()和free()方法的代碼以下。

public synchronized void apply(Object... objs) {
		for (Object obj : objs) {
			while (lockObjs.contains(obj)) {
				try {
					this.wait();
				} catch (InterruptedException e) {
					System.out.println(e.getMessage());
				}
			}
		}
		for (Object obj : objs) {
			lockObjs.add(obj);
		}
	}
	
	public synchronized void free(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				lockObjs.remove(obj);
			}
		}
		this.notifyAll();
	}

對應的transfer()方法的代碼以下。

public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
	Allocator allocator = Allocator.getInstance();
	allocator.apply(sourceAccount, targetAccount);
	try {
		synchronized(sourceAccount) {
			synchronized(targetAccount) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}
	finally {
		allocator.free(sourceAccount, targetAccount);
	}
}

運行結果和咱們指望是一致的。

條件曾經知足

在上述代碼中,咱們能夠發現,apply()方法中的判斷條件以前是if,如今改爲了while, while (lockObjs.contains(obj)),這樣作能夠解決條件曾經知足的問題。

由於當wait()返回時,有可能條件已經發生了變化,曾經條件知足,可是如今已經不知足了,因此要從新檢驗條件是否知足。

這是一種範式,是一種經典的作法。

notify() vs notifyAll()

notify()和notifyAll()有什麼區別?

notify()會隨機的通知等待隊列中的一個線程, 而notifyAll()會通知等待隊列中的全部線程。

咱們儘可能使用notifyAll()方法,由於notify()可能會致使某些線程永遠不會被通知到。

假設咱們有一個實例,它有資源 A、B、C、D,咱們使用實例對象來建立互斥鎖。

  • 線程t1申請到了A、B
  • 線程t2申請到了C、D
  • 線程t3試圖申請A、B,失敗,進入等待隊列
  • 線程t4試圖申請C、D,失敗,進入等待隊列
  • 此時,線程t1執行結束,釋放鎖
  • 線程t1調用實例的notify()來通知等待隊列中的線程,有可能被通知的是線程t4,但線程t4申請的是C、D還被線程t2佔用,因此線程t4只能繼續等待
  • 此時,線程t2執行結束,釋放鎖
  • 線程t2調用實例的notify()來通知等待隊列中的線程,t3或者t4只能有1個被喚醒並正常執行,另外1個則再也沒有機會被喚醒

wait()和sleep()的區別

wait()方法與sleep()方法的不一樣之處在於,wait()方法會釋放對象的「鎖標誌」。當調用某一對象的wait()方法後,會使當前線程暫停執行,並將當前線程放入對象等待池中,直到調用了notify()方法後,將從對象等待池中移出任意一個線程並放入鎖標誌等待池中,只有鎖標誌等待池中的線程能夠獲取鎖標誌,它們隨時準備爭奪鎖的擁有權。當調用了某個對象的notifyAll()方法,會將對象等待池中的全部線程都移動到該對象的鎖標誌等待池。

sleep()方法須要指定等待的時間,它可讓當前正在執行的線程在指定的時間內暫停執行,進入阻塞狀態,該方法既可讓其餘同優先級或者高優先級的線程獲得執行的機會,也可讓低優先級的線程獲得執行機會。可是sleep()方法不會釋放「鎖標誌」,也就是說若是有synchronized同步塊,其餘線程仍然不能訪問共享數據。

總結一下,wait()和sleep()區別以下。

  • wait()釋放資源,sleep()不釋放資源
  • wait()須要被喚醒,sleep()不須要
  • wait()是object頂級父類的方法,sleep()則是Thread的方法

wait()和sleep()都會讓渡CPU執行時間,等待再次調度!

相關文章
相關標籤/搜索