併發編程專題五-AbstractQueuedSynchronizer源碼分析

PS:外號鴿子王不是白來的,鴿了好幾天,也是由於比較忙,時間太少了,這篇東西有點多,須要慢慢消化。不知不覺竟然寫了4個多小時....java

1、什麼是AQS

aqs是AbstractQueuedSynchronizer的簡稱,是用來構建鎖或者其餘同步組件(信號量、事件等)的基礎框架類。JDK中許多併發工具類的內部實現都依賴於AQS,如ReentrantLock, Semaphore, CountDownLatch等等。算法

2、AQS的設計模式

2.1模板方法設計模式

在學習原理和源碼以前,咱們先了解一中設計模式。模板方法設計模式。編程

模板方法設計模式定義一個操做中算法的骨架,而將一些步驟延遲到子類中,模板方法使得子類能夠不改變算法的結構便可重定義該算法的某些特定步驟。設計模式

通俗來講就是完成一件事情,有固定的數個步驟,可是每一個步驟根據對象的不一樣,而實現細節不一樣;就能夠在父類中定義一個完成該事情的總方法,按照完成事件須要的步驟去調用其每一個步驟的實現方法。每一個步驟的具體實現,由子類完成。微信

例如 發短信有如下有如下四個步驟: 數據結構

1,須要發送給的人;  2 編寫內容; 3,發送日期 4, 調用短信網關發送多線程

發郵件也一樣有如下四個步驟:併發

1,須要發送給的人;  2 編寫內容; ,發送日期 4, 調用郵箱服務發送框架

這時,能夠清楚地發現,不管是發短信仍是發郵件,它們的步驟幾乎是相同的: 1, 須要發送給的人;  2 發送內容 3,發送日期 4, 發送。只有具體到發送方法的時候,它們有些步驟纔不一樣。這時咱們就能夠把相同的步驟提取出來,生成一個模版,進行共用,而到具體的內容時,它們再有具本的實現。 具體到代碼時,模版就能夠用抽象類實現,具體的內容能夠到它的子類中實現。ide

代碼以下

import java.util.Date;

/**
 * @Auther: DarkKing
 * @Date: 2019/4/21 12:09
 * @Description:
 */
public abstract class SendTemplate {

	//發送給誰,具體發送給誰須要子類去實現
	public abstract void toUser();
	//發送內容,具體發送內容須要子類去實現
	public abstract void content();
	//發送日期,由於日期都是同樣的,因此父類就能夠實現掉
	public void date() {
		System.out.println(new Date());
	}
	//發送方法,不一樣的發送方式確定要實現不一樣的發送方法
	public abstract void send();
	
	//發送消息,框架方法-模板方法
	public void sendMessage() {
		toUser();
		content();
		date();
		send();
	}

}

/**
 * @Auther: DarkKing
 * @Date: 2019/4/21 12:09
 * @Description: 短信發送模板實現類
 */
public class SendSms extends SendTemplate {

	@Override
	public void toUser() {
		System.out.println("to Pistachio");
	}

	@Override
	public void content() {
		System.out.println(" I LOVE YOU ❤");

	}

	@Override
	public void send() {
		System.out.println("set sms");

	}
	
	public static void main(String[] args) {
		SendTemplate sendSms = new SendSms();
		sendSms.sendMessage();
	}

}

AQS就是採用了模板方法的設計模式,除了AQS,Spring加載配置的過程一樣也是使用了這種設計模式。具體之後Spring源碼專題在給你們詳細說明

2.2AQS中的方法

既然知道AQS所使用的是模板方法設計模式,那具體都有哪些方法,咱們如今列舉一下。

一、模板方法

獨佔式(獨佔鎖) 共享式(共享鎖) 方法描述
acquire acquireShared 獲取鎖方法,獲取同步狀態
acquireInterruptibly acquireSharedInterruptibly 獲取鎖方法,同acquire,但能夠響應中斷
tryAcquireNanos tryAcquireSharedNanos 獲取鎖方法
release releaseShared 釋放鎖
     

二、須要子類覆蓋的流程方法

獨佔式獲取  tryAcquire

獨佔式釋放  tryRelease

共享式獲取 tryAcquireShared

共享式釋放  tryReleaseShared

這些方法咱們能夠看到,AQS中只拋出了一個UnsupportedOperationException異常,因此須要咱們在子類中具體去實現。

三、其餘方法

isHeldExclusively:判斷同步器是否處於獨佔模式

除此以外,AQS還定義了一個volatile變量state,用於記錄鎖的狀態

getState:獲取當前的同步狀態

setState:設置當前同步狀態

compareAndSetState:使用CAS設置狀態,保證狀態設置的原子性

爲何Doug Lea大師要這樣設計AQS呢?AQS面向的是鎖的實現者,而咱們在使用Lock鎖的時候,只須要調用Lock對應的方法便可,屏蔽了實現細節。而對於鎖的實現者來講,簡化了鎖的實現方式,例如同步狀態管理,線程排隊等底層操做。隔離了鎖的實現者和鎖的使用者。從而進行了解耦,當咱們在使用ReentrantLock等鎖的時候,徹底感受不到AQS的存在。

2.3自定義鎖的實現

既然咱們瞭解了AQS中的一些方法,那咱們就經過實現父類中的方法,來本身實現一個鎖,加深下對模板方法的理解。

本身實現鎖的話,首先咱們實現Lock接口,Lock共有6個接口,以前咱們都已經講過了,你們若是感興趣,個人併發編程專題裏查看哈~

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @Auther: DarkKing
 * @Date: 2019/4/21 12:09
 * @Description:
 */
public class SelfLock implements Lock{
	
	//aqs中state 表示獲取到鎖的狀態 state=1 獲取到了鎖,state=0,表示這個鎖當前沒有線程拿到

	//定義一個內部類,實現AQS模板方法
	private static class Sync extends AbstractQueuedSynchronizer{
		
		//是否佔用
		protected boolean isHeldExclusively() {
			return getState()==1;
		}
		//嘗試獲取鎖
		protected boolean tryAcquire(int arg) {
			//CAS操做,首先對比鎖是否被獲取到,獲取到的話就將鎖的狀態置爲1,不然獲取鎖失敗
			if(compareAndSetState(0,1)) {
				//設置鎖的擁有者爲當前線程
				setExclusiveOwnerThread(Thread.currentThread());
				return true;
			}
			return false;
		}
		//嘗試釋放鎖
		protected boolean tryRelease(int arg) {
			//若是鎖的狀態爲0,則不須要釋放,拋出異常
			if(getState()==0) {
				throw new UnsupportedOperationException();
			}
			//設置鎖的擁有者爲null,而且狀態設置爲0
			setExclusiveOwnerThread(null);
			setState(0);
			return true;
		}
		//Condition對象,用於對鎖的對象喚醒和等待
		Condition newCondition() {
			return new ConditionObject();
		}
	}
	
	private final Sync sycn = new Sync();

	//獲取鎖。若是鎖已被其餘線程獲取,則進行等待
	@Override
	public void lock() {
		sycn.acquire(1);
		
	}
	//能夠直接調用父類的方法。該方法和lock的區別就是能夠響應中斷。
	@Override
	public void lockInterruptibly() throws InterruptedException {
		sycn.acquireInterruptibly(1);
		
	}
	//它表示用來嘗試獲取鎖,若是獲取成功,則返回true,若是獲取失敗返回false
	@Override
	public boolean tryLock() {
		return sycn.tryAcquire(1);
	}
	//帶時間戳的獲取鎖,若是等待時間超過,則獲取失敗
	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		return sycn.tryAcquireNanos(1, unit.toNanos(time));
	}
    //釋放鎖
	@Override
	public void unlock() {
		sycn.release(1);
		
	}
	//返回Condition對象
	@Override
	public Condition newCondition() {
		return sycn.newCondition();
	}


}

以上方法咱們實現了獲取鎖和釋放鎖的接口。從實現中咱們發現獲取鎖的時候使用了CAS操做,但釋放鎖的時候沒有進行CAS操做。這樣寫會不會出現問題呢?接下來寫個測試類。測試下鎖可否正常使用

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.xiangxue.tools.SleepTools;

/**
 * @Auther: DarkKing
 * @Date: 2019/4/21 12:09
 * @Description:
 */
public class TestMyLock {
    public void test() {
        final Lock lock = new SelfLock();
        
        class Worker extends Thread {
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                    	SleepTools.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepTools.second(1);
                    } finally {
                        lock.unlock();
                    }
                    SleepTools.second(2);
                }
            }
        }
        // 啓動10個子線程
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.setDaemon(true);
            w.start();
        }
        // 主線程每隔1秒換行
        for (int i = 0; i < 10; i++) {
        	SleepTools.second(1);
            System.out.println();
        }
    }

    public static void main(String[] args) {
        TestMyLock testMyLock = new TestMyLock();
        testMyLock.test();
    }
}

咱們定義了10個線程,打印出當前獲取鎖的線程。從打印咱們能夠看出來,每次只會打印出一個線程名。說明咱們寫的鎖是起做用的。那爲何釋放的時候不須要CAS操做呢。當咱們一個線程獲取到鎖,其餘線程又都在作什麼呢。接下來咱們走進AQS源碼中,進行學習。

3、AQS源碼解析

咱們上面的測試用例中。當咱們一個線程獲取到鎖,其餘線程再去獲取鎖的時候,咱們返回了false,線程進入的等待狀態,那既然線程進入了等待狀態,等待鎖被釋放的時候了進行喚醒。那麼必然要有個地方存儲咱們進行等待的線程的一個數據結構。

3.一、AQS中的數據結構-節點和同步隊列

AQS的中的等待的線程所有存儲在一個同步隊列裏,它是一個先進先出的數據結構,先進來的線程會先被喚醒。同時這個隊列仍是個雙向列表。上一個節點指向下一個節點。還有頭指示器指向第一個節點。尾指示器指向最後一個節點。

3.2 同步隊列節點屬性

打開AQS的源碼,找到Node類,全部屬性以下

字段名 屬性值 描述
CANCELLED 1 線程等待超時或者被中斷了,須要從隊列中移走
SIGNAL -1 後續的節點等待狀態,當前節點若是完成則通知後面的節點去運行
CONDITION -2 當前節點處於等待隊列
PROPAGATE -3 共享,表示狀態要日後面的節點傳播
  0 初始狀態
waitStatus   標識當前節點狀態,就是上面的那幾個常量字段。
Node prev   表示當前節點的前驅節點
Node next   表示當前節點的後驅節點
Thread thread   表示當前節點存放的線程
Node nextWaiter   指向等待隊列節點

3.3 獲取鎖的過程

當咱們執行獲取acquire鎖的方法時

若是獲取失敗,則會執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,該方法首先會執行addWaiter(Node.EXCLUSIVE), arg),

該方法是將線程包裝成Node節點,經過CAS操做,將節點加入到同步隊列中。

尾節點變化如圖所示

由於多線程的緣由,爲了保持原子性,因此這裏須要使用CAS操做。否則可能會出現尾部節點數據丟失等問題。若是CAS對比操做失敗,則執行enq方法,將該操做放入循環中。直到添加成功。(CAS的基本操做)

當Node已經放入了同步隊列以後,那麼這個線程說明須要進行等待。排隊排到個人時候我要去獲取鎖。因此acquireQueued方法中,會一直循環獲取鎖。

首先獲取前置節點,若是前置節點是頭節點,則直接去嘗試獲取鎖。當獲取鎖成功以後,將該節點設置爲頭節點,以前的頭結點設置爲null,脫鉤幫助GC。而後returen false;說明獲取鎖成功。不須要在進行等待。

若是獲取鎖失敗,則會執行parkAndCheckInterrupt()方法,將本身阻塞。這是整個獲取鎖的一個過程。

3.4 釋放鎖的過程

釋放鎖的方法爲咱們定義的unlock();實際調用的AQS中的release方法。

該方法首先執行咱們實現的TryRelease嘗試釋放鎖,若是鎖釋放成功,則獲取頭結點,而後執行unparkSuccessor(h)方法。

這個方法主要就是獲取頭結點的下一個節點,若是下個節點爲null,則將下個節點的前一個節點設置爲尾部,不然的話而後進行喚醒。

首節點的變化如圖所示

當一個節點釋放鎖以後,修改頭結點時同時只會有一個線程去操做,因此不須要CAS操做,直接將頭節點設爲null,而後修改同步器頭部指向下一個節點。

總體獲取和釋放鎖的過程以下所示

3.五、AQS中的數據結構-節點和等待隊列

每個鎖都有一個Condition對象,該對象主要實現await和signal等方法。用於線程的等待和喚醒。所以每一個Condition對象中確定也存在一個等待隊列。

等待隊列的數據結構

等待隊列和同步隊列的區別

一、等待隊列裏的節點和同步隊列節點都是同一個屬性。惟一的不一樣就是等待隊列是一個單向鏈表,而非雙向鏈表。

二、一個同步器(鎖)裏面,只會有一個同步隊列,但能夠有多個等待隊列。以下圖所示。

一個鎖能夠建立個多個Condition,每個Condition下都會有一個等待隊列。

private Lock lock = new ReentrantLock(); private Condition keCond = lock.newCondition(); private Condition siteCond = lock.newCondition();

3.六、await方法過程

await表示使當前線程進入等待狀態。

首先調用addConditionWaiter方法將該線程包裝成Node加入到阻塞隊列中去,而後調用fullyRelease方法,將該線程所持有的鎖進行釋放。在while中判斷線程是否被喚醒,若是沒有,則進行阻塞。

await方法過程

3.七、single方法過程

single喚醒等待的一個線程。

當執行single的時候,首先須要從等待隊列中取出一個節點,若是不爲null,則執行doSignal方法。

若是等待隊列的下一個節點爲null,則把末尾節點設爲null

而後調用transferForSignal方法,判斷當前節點的狀態,若是不能修改值,則取消,若是修改爲功,則嗲用enq。將該節點移動到同步隊列尾部,並設置waitStatus爲SIGNAL。知足條件後喚醒線程。

single流程圖以下

由於每一個condition對象都會有一個同步機制,並且調用single會指定喚醒對應等待隊列的線程,不會丟失信息。因此建議使用single方法喚醒,而不是調用singleAll,並且每次調用singleAll還要將全部等待隊列的節點所有移動到同步隊列中。

大致上學完AQS,要了解模板方法設計,會本身手動實現鎖,瞭解獲取鎖和釋放鎖,以及基於鎖的等待和喚醒機制。你們若是還有什麼問題,能夠加我微信一塊兒討論哈。

其餘閱讀   併發編程專題

相關文章
相關標籤/搜索