linux內核級同步機制--futex

在面試中關於多線程同步,你必需要思考的問題 一文中,咱們知道glibc的pthread_cond_timedwait底層是用linux futex機制實現的。java

理想的同步機制應該是沒有鎖衝突時在用戶態利用原子指令就解決問題,而須要掛起等待時再使用內核提供的系統調用進行睡眠與喚醒。換句話說,在用戶態的自旋失敗時,能不能讓進程掛起,由持有鎖的線程釋放鎖時將其喚醒?
若是你沒有較深刻地考慮過這個問題,極可能想固然的認爲相似於這樣就好了(僞代碼):node

void lock(int lockval) {
	//trylock是用戶級的自旋鎖
	while(!trylock(lockval)) {
		wait();//釋放cpu,並將當期線程加入等待隊列,是系統調用
	}
}

boolean trylock(int lockval){
	int i=0; 
	//localval=1表明上鎖成功
	while(!compareAndSet(lockval,0,1)){
		if(++i>10){
			return false;
		}
	}
	return true;
}

void unlock(int lockval) {
	 compareAndSet(lockval,1,0);
	 notify();
}
複製代碼

上述代碼的問題是trylock和wait兩個調用之間存在一個窗口:
若是一個線程trylock失敗,在調用wait時持有鎖的線程釋放了鎖,當前線程仍是會調用wait進行等待,但以後就沒有人再喚醒該線程了。linux

爲了解決上述問題,linux內核引入了futex機制,futex主要包括等待和喚醒兩個方法:futex_waitfutex_wake,其定義以下git

//uaddr指向一個地址,val表明這個地址期待的值,當*uaddr==val時,纔會進行wait
int futex_wait(int *uaddr, int val);
//喚醒n個在uaddr指向的鎖變量上掛起等待的進程
int futex_wake(int *uaddr, int n);
複製代碼

futex在真正將進程掛起以前會檢查addr指向的地址的值是否等於val,若是不相等則會當即返回,由用戶態繼續trylock。不然將當期線程插入到一個隊列中去,並掛起。github

關於同步的一點思考-上文章中對futex的背景與基本原理有介紹,對futex不熟悉的人能夠先看下。面試

本文將深刻分析futex的實現,讓讀者對於鎖的最底層實現方式有直觀認識,再結合以前的兩篇文章(關於同步的一點思考-上關於同步的一點思考-下)能對操做系統的同步機制有個全面的理解。bash

下文中的進程一詞包括常規進程與線程多線程

futex_wait

在看下面的源碼分析前,先思考一個問題:如何確保掛起進程時,val的值是沒有被其餘進程修改過的?架構

代碼在kernel/futex.c中函數

static int futex_wait(u32 __user *uaddr, int fshared,
		      u32 val, ktime_t *abs_time, u32 bitset, int clockrt)
{
	struct hrtimer_sleeper timeout, *to = NULL;
	struct restart_block *restart;
	struct futex_hash_bucket *hb;
	struct futex_q q;
	int ret;

	...

	//設置hrtimer定時任務:在必定時間(abs_time)後,若是進程還沒被喚醒則喚醒wait的進程
	if (abs_time) {
	    ...
		hrtimer_init_sleeper(to, current);
		...
	}

retry:
	//該函數中判斷uaddr指向的值是否等於val,以及一些初始化操做
	ret = futex_wait_setup(uaddr, val, fshared, &q, &hb);
	//若是val發生了改變,則直接返回
	if (ret)
		goto out;

	//將當前進程狀態改成TASK_INTERRUPTIBLE,並插入到futex等待隊列,而後從新調度。
	futex_wait_queue_me(hb, &q, to);

	/* If we were woken (and unqueued), we succeeded, whatever. */
	ret = 0;
	//若是unqueue_me成功,則說明是超時觸發(由於futex_wake喚醒時,會將該進程移出等待隊列,因此這裏會失敗)
	if (!unqueue_me(&q))
		goto out_put_key;
	ret = -ETIMEDOUT;
	if (to && !to->task)
		goto out_put_key;

	/*
	 * We expect signal_pending(current), but we might be the
	 * victim of a spurious wakeup as well.
	 */
	if (!signal_pending(current)) {
		put_futex_key(fshared, &q.key);
		goto retry;
	}

	ret = -ERESTARTSYS;
	if (!abs_time)
		goto out_put_key;

	...

out_put_key:
	put_futex_key(fshared, &q.key);
out:
	if (to) {
		//取消定時任務
		hrtimer_cancel(&to->timer);
		destroy_hrtimer_on_stack(&to->timer);
	}
	return ret;
}
複製代碼

在將進程阻塞前會將當期進程插入到一個等待隊列中,須要注意的是這裏說的等待隊列實際上是一個相似Java HashMap的結構,全局惟一。

struct futex_hash_bucket {
	spinlock_t lock;
	//雙向鏈表
	struct plist_head chain;
};

static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];
複製代碼

着重看futex_wait_setup和兩個函數futex_wait_queue_me

static int futex_wait_setup(u32 __user *uaddr, u32 val, int fshared,
			   struct futex_q *q, struct futex_hash_bucket **hb)
{
	u32 uval;
	int ret;
retry:
	q->key = FUTEX_KEY_INIT;
	//初始化futex_q
	ret = get_futex_key(uaddr, fshared, &q->key, VERIFY_READ);
	if (unlikely(ret != 0))
		return ret;

retry_private:
	//得到自旋鎖
	*hb = queue_lock(q);
	//原子的將uaddr的值設置到uval中
	ret = get_futex_value_locked(&uval, uaddr);

   ... 
	//若是當期uaddr指向的值不等於val,即說明其餘進程修改了
	//uaddr指向的值,等待條件再也不成立,不用阻塞直接返回。
	if (uval != val) {
		//釋放鎖
		queue_unlock(q, *hb);
		ret = -EWOULDBLOCK;
	}

   ...
	return ret;
}
複製代碼

函數futex_wait_setup中主要作了兩件事,一是得到自旋鎖,二是判斷*uaddr是否爲預期值。

static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
				struct hrtimer_sleeper *timeout)
{
	//設置進程狀態爲TASK_INTERRUPTIBLE,cpu調度時只會選擇
	//狀態爲TASK_RUNNING的進程
	set_current_state(TASK_INTERRUPTIBLE);
	//將當期進程(q封裝)插入到等待隊列中去,而後釋放自旋鎖
	queue_me(q, hb);

	//啓動定時任務
	if (timeout) {
		hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
		if (!hrtimer_active(&timeout->timer))
			timeout->task = NULL;
	}

	/*
	 * If we have been removed from the hash list, then another task
	 * has tried to wake us, and we can skip the call to schedule().
	 */
	if (likely(!plist_node_empty(&q->list))) {
		 
		 //若是沒有設置過時時間 || 設置了過時時間且還沒過時
		if (!timeout || timeout->task)
			//系統從新進行進程調度,這個時候cpu會去執行其餘進程,該進程會阻塞在這裏
			schedule();
	}
	//走到這裏說明又被cpu選中運行了
	__set_current_state(TASK_RUNNING);
}
複製代碼

futex_wait_queue_me中主要作幾件事:

  1. 將當期進程插入到等待隊列
  2. 啓動定時任務
  3. 從新調度進程

如何保證條件與等待之間的原子性

futex_wait_setup方法中會加自旋鎖;在futex_wait_queue_me中將狀態設置爲TASK_INTERRUPTIBLE,調用queue_me將當期線程插入到等待隊列中,而後才釋放自旋鎖。也就是說檢查uaddr的值的過程跟進程掛起的過程放在同一個臨界區中。當釋放自旋鎖後,這時再更改addr地址的值已經沒有關係了,由於當期進程已經加入到等待隊列中,能被wake喚醒,不會出現本文開頭提到的沒人喚醒的問題。

futex_wait小結

總結下futex_wait流程:

  1. 加自旋鎖
  2. 檢測*uaddr是否等於val,若是不相等則會當即返回
  3. 將進程狀態設置爲TASK_INTERRUPTIBLE
  4. 將當期進程插入到等待隊列中
  5. 釋放自旋鎖
  6. 建立定時任務:當超過必定時間還沒被喚醒時,將進程喚醒
  7. 掛起當前進程

futex_wake

futex_wake

static int futex_wake(u32 __user *uaddr, int fshared, int nr_wake, u32 bitset)
{
	struct futex_hash_bucket *hb;
	struct futex_q *this, *next;
	struct plist_head *head;
	union futex_key key = FUTEX_KEY_INIT;
	int ret;

	...
	//根據uaddr的值填充&key的內容
	ret = get_futex_key(uaddr, fshared, &key, VERIFY_READ);
	if (unlikely(ret != 0))
		goto out;
	//根據&key得到對應uaddr所在的futex_hash_bucket
	hb = hash_futex(&key);
	//對該hb加自旋鎖
	spin_lock(&hb->lock);
	head = &hb->chain;
	//遍歷該hb的鏈表,注意鏈表中存儲的節點是plist_node類型,而而這裏的this倒是futex_q類型,這種類型轉換是經過c中的container_of機制實現的
	plist_for_each_entry_safe(this, next, head, list) {
		if (match_futex (&this->key, &key)) {
			...
			//喚醒對應進程
			wake_futex(this);
			if (++ret >= nr_wake)
				break;
		}
	}
	//釋放自旋鎖
	spin_unlock(&hb->lock);
	put_futex_key(fshared, &key);
out:
	return ret;
}
複製代碼

futex_wake流程以下:

  1. 找到uaddr對應的futex_hash_bucket,即代碼中的hb
  2. 對hb加自旋鎖
  3. 遍歷fb的鏈表,找到uaddr對應的節點
  4. 調用wake_futex喚起等待的進程
  5. 釋放自旋鎖

wake_futex中將制定進程狀態設置爲TASK_RUNNING並加入到系統調度列表中,同時將進程從futex的等待隊列中移除掉,具體代碼就不分析了,有興趣的能夠自行研究。

End

Java中的ReentrantLock,Object.wait和Thread.sleep等等底層都是用futex進行線程同步,理解futex的實現能幫助你更好的理解與使用這些上層的同步機制。另外因篇幅與精力有限,涉及到進程調度的相關內容沒有具體分析,不過並不妨礙理解文章內容。

原文:Java架構筆記

相關文章
相關標籤/搜索