謝寶友: 深刻理解RCU之五:玩具式實現

原創 謝寶友 Linux閱碼場 2018-01-26linux


本文簡介
本文介紹「玩具式」RCU實現。這些實現並不注重性能、實用性,也不能使用於生產環境中,而僅僅是爲了清晰的傳遞RCU的概念。即使如此,要理解這些玩具式的實現,也須要對硬件、RCU概念有深刻的理解。算法

做者簡介
謝寶友,在編程一線工做已經有20年時間,其中接近10年時間工做於Linux操做系統。
同時,他也是《深刻理解並行編程》一書的譯者。該書做者Paul E.McKeney是IBM Linux中心領導者,Linux RCU Maintainer。《深刻理解RCU》系列文章整理了Paul E.McKeney的相關著做,但願能幫助讀者更深入的理解Linux內核中很是難於理解的模塊:RCU。br/>目前,他在專心編寫Hot-Pot操做系統。並準備在2018年年中,開放這個操做系統的源代碼,而且編寫一本《Hot-Pot操做系統詳解--邁向工業級操做系統的實現》來詳細闡述這個操做系統。
聯繫方式:
mail:scxby@163.com
微信:linux-kernel編程

本文前序:
謝寶友: 深刻理解Linux RCU之一——從硬件提及
謝寶友:深刻理解Linux RCU:從硬件提及以內存屏障
謝寶友:深刻理解RCU之三:概念
謝寶友:深刻理解RCU之四:用法數組

一、基於鎖的RCU
也許最簡單的RCU實現就是用鎖了,以下圖所示。在該實現中,rcu_read_lock()獲取一把全局自旋鎖,rcu_read_unlock()釋放鎖,而synchronize_rcu()獲取自旋鎖,隨後將其釋放。
1 static void rcu_read_lock(void)
2 {
3 spin_lock(&rcu_gp_lock);
4 }
5
6 static void rcu_read_unlock(void)
7 {
8 spin_unlock(&rcu_gp_lock);
9 }
10
11 void synchronize_rcu(void)
12 {
13 spin_lock(&rcu_gp_lock);
14 spin_unlock(&rcu_gp_lock);
15 }緩存

基於鎖的RCU實現

由於synchronize_rcu()只有在獲取鎖(而後釋放)之後纔會返回,因此在全部以前發生的RCU讀端臨界區完成前,synchronize_rcu()是不會返回的,所以這符合RCU的語義,特別是存在擔保方面的語義。
可是,在這樣的實現中,一個讀端臨界區同時只能有一個RCU讀者進入,這基本上能夠說是和RCU的目的相反。並且,rcu_read_lock()和rcu_read_unlock()中的鎖操做開銷是極大的,讀端的開銷從Power5單核CPU上的100納秒到64核系統上的17微秒不等。更糟的是,使用同一把鎖使得rcu_read_lock(),可能會使得系統造成自旋鎖死鎖。這是由於:RCU的語義容許RCU讀端嵌套。因此,在這樣的實現中,RCU讀端臨界區不能嵌套。最後一點,原則上併發的RCU更新操做能夠共享一個公共的優雅週期,可是該實現將優雅週期串行化了,所以沒法共享優雅週期。
問題:這樣的死鎖情景會不會出現其餘RCU實現中?
問題:爲何不直接用讀寫鎖來實現這個RCU?
很難想象這種實現能用在任何一個產品中,可是這種實現有一點好處:能夠用在幾乎全部的用戶態程序上。不只如此,相似的使用每CPU鎖或者讀寫鎖的實現還曾經用於Linux 2.4內核中。
二、基於每線程鎖的RCU
下圖顯示了一種基於每線程鎖的實現。rcu_read_lock()和rcu_read_unlock()分別獲取和釋放當前線程的鎖。synchronize_rcu()函數按照次序逐一獲取和釋放每一個線程的鎖。這樣,全部在synchronize_rcu()開始時就已經執行的RCU讀端臨界區,必須在synchronize_rcu()結束前返回。
1 static void rcu_read_lock(void)
2 {
3 spinlock(& _get_thread_var(rcu_gp_lock));
4 }
5
6 static void rcu_read_unlock(void)
7 {
8 spinunlock(& _get_thread_var(rcu_gp_lock));
9 }
10
11 void synchronize_rcu(void)
12 {
13 int t;
14
15 for_each_running_thread(t) {
16 spin_lock(&per_thread(rcu_gp_lock, t));
17 spin_unlock(&per_thread(rcu_gp_lock, t));
18 }
19 }微信

基於鎖的每線程RCU實現

該實現的優勢在於:容許併發的RCU讀者,同時避免了使用單個全局鎖可能形成的死鎖。不只如此,讀端開銷雖然高達大概140納秒,可是無論CPU數目爲多少,始終保持在140納秒。不過,更新端的開銷則在從Power5單核上的600納秒到64核系統上的超過100微秒不等。
問題:若是在第15至18行看,先獲取全部鎖,而後再釋放全部鎖,這樣是否是更清晰一點呢?
問題:該實現可以避免死鎖嗎?若是能,爲何能?若是不能,爲何不能?
本方法在某些狀況下是頗有效的,尤爲是相似的方法曾在Linux 2.4內核中使用。
下面提到的基於計數的RCU實現,克服了基於鎖實現的某些缺點。
三、基於計數的簡單RCU實現
1 atomic_t rcu_refcnt;
2
3 static void rcu_read_lock(void)
4 {
5 atomic_inc(&rcu_refcnt);
6 smp_mb();
7 }
8
9 static void rcu_read_unlock(void)
10 {
11 smp_mb();
12 atomic_dec(&rcu_refcnt);
13 }
14
15 void synchronize_rcu(void)
16 {
17 smp_mb();
18 while (atomic_read(&rcu_refcnt) != 0) {
19 poll(NULL, 0, 10);
20 }
21 smp_mb();
22 }數據結構

使用單個全局引用計數的RCU實現

這是一種稍微複雜一點的RCU實現。本方法在第1行定義了一個全局引用計數rcu_refcnt。rcu_read_lock()原語自動增長計數,而後執行一個內存屏障,確保在原子自增以後才進入RCU讀端臨界區。一樣,rcu_read_unlock()先執行一個內存屏障,劃定RCU讀端臨界區的結束點,而後再原子自減計數。synchronize_rcu()原語不停自旋,等待引用計數的值變爲0,語句先後用內存屏障保護正確的順序。第19行的poll()只是純粹的延時,從純RCU語義的角度上看是能夠省略的。等synchronize_rcu()返回後,全部以前發生的RCU讀端臨界區都已經完成了。
與基於鎖的實現相比,咱們欣喜地發現:這種實現可讓讀者併發進入RCU讀端臨界區。與基於每線程鎖的實現相比,咱們又欣喜地發現:本節的實現可讓RCU讀端臨界區嵌套。另外,rcu_read_lock()原語不會進入死鎖循環,由於它既不自旋也不阻塞。
問題:可是若是你在調用synchronize_rcu()時持有一把鎖,而後又在RCU讀端臨界區中獲取同一把鎖,會發生什麼呢?
固然,這個實現仍是存在一些嚴重的缺點。首先,rcu_read_lock()和rcu_read_unlock()中的原子操做開銷是很是大的,讀端開銷從Power5單核CPU上的100納秒到64核系統上的40微秒不等。這意味着RCU讀端臨界區必須很是長,纔可以知足現實世界中的讀端併發請求。可是從另外一方面來講,當沒有讀者時,優雅週期只有差很少40納秒,這比Linux內核中的產品級實現要快上不少個數量級。
其次,若是存在多個併發的rcu_read_lock()和rcu_read_unlock()操做,由於出現大量高速緩衝未命中,對rcu_refcnt的內存訪問競爭將會十分激烈。
以上這兩個缺點極大地影響RCU的目標,即提供一種讀端低開銷的同步原語。
最後,在很長的讀端臨界區中的大量RCU讀者甚至會讓synchronize_rcu()沒法完成,由於全局計數可能永遠不爲0。這會致使RCU更新端的飢餓,這一點在產品級應用裏確定是不可接受的。
問題:當synchronize_rcu()等待時間過長了之後,爲何不能簡單地讓rcu_read_lock()暫停一下子呢?這種作法不能防止synchronize_rcu()飢餓嗎?
經過上述內容,很難想象本節的實現能夠在產品級應用中使用,雖然它比基於鎖的實現更有這方面的潛力,好比,做爲一種高負荷調試環境中的RCU實現。下面咱們將介紹一種對寫者更有利的引用計數RCU變體。
四、不會讓更新者飢餓的引用計數RCU
1 DEFINE_SPINLOCK(rcu_gp_lock);
2 atomic_t rcu_refcnt[2];
3 atomic_t rcu_idx;
4 DEFINE_PER_THREAD(int, rcu_nesting);
5 DEFINE_PER_THREAD(int, rcu_read_idx);併發

RCU全局引用計數對的數據定義

下圖展現了一種RCU實現的讀端原語,使用一對引用計數(rcu_refcnt[]),經過一個全局索引(rcu_idx)從這對計數中選出一個計數,一個每線程的嵌套計數rcu_nesting,一個每線程的全局索引快照(rcu_read_idx),以及一個全局鎖(rcu_gp_lock),上圖給出了上述定義。
1 static void rcu_readlock(void)
2 {
3 int i;
4 int n;
5
6 n =
_get_thread_var(rcu_nesting);
7 if (n == 0) {
8 i = atomic_read(&rcuidx);
9
_get_thread_var(rcu_read_idx) = i;
10 atomic_inc(&rcurefcnt[i]);
11 }
12
_get_thread_var(rcu_nesting) = n + 1;
13 smp_mb();
14 }
15
16 static void rcu_read_unlock(void)
17 {
18 int i;
19 int n;
20
21 smpmb();
22 n =
_get_thread_var(rcunesting);
23 if (n == 1) {
24 i =
_get_thread_var(rcu_read_idx);
25 atomic_dec(&rcu_refcnt[i]);
26 }
27 __get_thread_var(rcu_nesting) = n - 1;
28 }ide

使用全局引用計數對的RCU讀端原語

擁有兩個元素的rcu_refcnt[]數組讓更新者免於飢餓。這裏的關鍵點是synchronize_rcu()只須要等待已存在的讀者。若是在給定實例的synchronize_rcu()正在執行時,出現一個新的讀者,那麼synchronize_rcu()不須要等待那個新的讀者。在任意時刻,當給定的讀者經過經過rcu_read_lock()進入其RCU讀端臨界區時,它增長rcu_refcnt[]數組中由rcu_idx變量所表明下標的元素。當同一個讀者經過rcu_read_unlock()退出其RCU讀端臨界區,它減去其增長的元素,忽略對rcu_idx值任何可能的後續更改。
這種安排意味着synchronize_rcu()能夠經過修改rcu_idx的值來避免飢餓。假設rcu_idx的舊值爲零,所以修改後的新值爲1。在修改操做以後到達的新讀者將增長rcu_idx[1],而舊的讀者先前遞增的rcu_idx [0]將在它們退出RCU讀端臨界區時遞減。這意味着rcu_idx[0]的值將再也不增長,而是單調遞減。這意味着全部synchronize_rcu()須要作的是等待rcu_refcnt[0]的值達到零。
有了背景,咱們來好好看看實際的實現原語。
實現rcu_read_lock()原語自動增長由rcu_idx標出的rcu_refcnt[]成員的值,而後將索引保存在每線程變量rcu_read_idx中。rcu_read_unlock()原語自動減小對應的rcu_read_lock()增長的那個計數的值。不過,由於rcu_idx每一個線程只能設置爲rcu_idx設置一個值,因此還須要一些手段才能容許嵌套。方法是用每線程的rcu_nesting變量跟蹤嵌套。
爲了讓這種方法可以工做,rcu_read_lock()函數的第6行獲取了當前線程的rcu_nesting,若是第7行的檢查發現當前處於最外層的rcu_read_lock(),那麼第8至10行獲取變量rcu_idx的當前值,將其存到當前線程的rcu_read_idx中,而後增長被rcu_idx選中的rcu_refcnt元素的值。第12行無論如今的rcu_nesting值是多少,直接對其加1。第13行執行一個內存屏障,確保RCU讀端臨界區不會在rcu_read_lock()以前開始。
一樣,rcu_read_unlock()函數在第21行也執行一個內存屏障,確保RCU讀端臨界區不會在rcu_read_unlock()代碼以後還未完成。第22行獲取當前線程的rcu_nesting,若是第23行的檢查發現當前處於最外層的rcu_read_unlock(),那麼第24至25行獲取當前線程的rcu_read_idx(由最外層的rcu_read_lock()保存)而且原子減小被rcu_read_idx選擇的rcu_refcnt元素。不管當前嵌套了多少層,第27行都直接減小本線程的rcu_nesting值。
1 void synchronize_rcu(void)
2 {
3 int i;
4
5 smp_mb();
6 spin_lock(&rcu_gp_lock);
7 i = atomic_read(&rcu_idx);
8 atomic_set(&rcu_idx, !i);
9 smp_mb();
10 while (atomic_read(&rcu_refcnt[i]) != 0) {
11 poll(NULL, 0, 10);
12 }
13 smp_mb();
14 atomic_set(&rcu_idx, i);
15 smp_mb();
16 while (atomic_read(&rcu_refcnt[!i]) != 0) {
17 poll(NULL, 0, 10);
18 }
19 spin_unlock(&rcu_gp_lock);
20 smp_mb();
21 }函數

使用全局引用計數對的RCU更新端原語

上圖實現了對應的synchronize_rcu()。第6行和第19行獲取並釋放rcu_gp_lock,由於這樣能夠防止多於一個的併發synchronize_rcu()實例。第7至8行分別獲取rcu_idx的值,並對其取反,這樣後續的rcu_read_lock()實例將使用與以前的實例不一樣的rcu_idx值。而後第10至12行等待以前的由rcu_idx選出的元素變成0,第9行的內存屏障是爲了保證對rcu_idx的檢查不會被優化到對rcu_idx取反操做以前。第13至18行重複這一過程,第20行的內存屏障是爲了保證全部後續的回收操做不會被優化到對rcu_refcnt的檢查以前執行。
問題:爲何上圖中,在得到自旋鎖以前,synchronize_rcu()第5行還有一個內存屏障?
問題:爲何上圖的計數要檢查兩次?難道檢查一次還不夠嗎?
本節的實現避免了簡單計數實現可能發生的更新端飢餓問題。
討論不過這種實現仍然存在一些嚴重問題。首先,rcu_read_lock()和rcu_read_unlock()中的原子操做開銷很大。事實上,它們比上一個實現中的單個計數要複雜不少,讀端原語的開銷從Power5單核處理器上的150納秒到64核處理器上的40微秒不等。更新端synchronize_rcu()原語的開銷也變大了,從Power5單核CPU中的200納秒到64核處理器中的40微秒不等。這意味着RCU讀端臨界區必須很是長,纔可以知足現實世界的讀端併發請求。
其次,若是存在不少併發的rcu_read_lock()和rcu_read_unlock()操做,那麼對rcu_refcnt的內存訪問競爭將會十分激烈,這將致使耗費巨大的高速緩存未命中。這一點進一步延長了提供併發讀端訪問所須要的RCU讀端臨界區持續時間。這兩個缺點在不少狀況下都影響了RCU的目標。
第三,須要檢查rcu_idx兩次這一點爲更新操做增長了開銷,尤爲是線程數目不少時。
最後,儘管原則上併發的RCU更新能夠共用一個公共優雅週期,可是本節的實現串行化了優雅週期,使得這種共享沒法進行。
問題:既然原子自增和原子自減的開銷巨大,爲何不第10行使用非原子自增,在第25行使用非原子自減呢?
儘管有這樣那樣的缺點,這種RCU的變體仍是能夠運用在小型的多核系統上,也許能夠做爲一種節省內存實現,用於維護與更復雜實現之間的API兼容性。可是,這種方法在CPU增多時可擴展性不佳。
另外一種基於引用計數機制的RCU變體極大地改善了讀端性能和可擴展性。
五、可擴展的基於計數RCU實現
1 DEFINE_SPINLOCK(rcu_gp_lock);
2 DEFINE_PER_THREAD(int [2], rcu_refcnt);
3 atomic_t rcu_idx;
4 DEFINE_PER_THREAD(int, rcu_nesting);
5 DEFINE_PER_THREAD(int, rcu_read_idx);

RCU每線程引用計數對的數據定義

下圖是一種RCU實現的讀端原語,其中使用了每線程引用計數。本實現與前一個實現十分相似,惟一的區別在於rcu_refcnt成了一個每線程變量。使用這個兩元素數組是爲了防止讀者致使寫者飢餓。使用每線程rcu_refcnt[]數組的另外一個好處是,rcu_read_lock()和rcu_read_unlock()原語不用再執行原子操做。
1 static void rcu_readlock(void)
2 {
3 int i;
4 int n;
5
6 n =
_get_thread_var(rcu_nesting);
7 if (n == 0) {
8 i = atomic_read(&rcuidx);
9
_get_thread_var(rcu_readidx) = i;
10
_get_thread_var(rcurefcnt)[i]++;
11 }
12
_get_thread_var(rcu_nesting) = n + 1;
13 smp_mb();
14 }
15
16 static void rcu_read_unlock(void)
17 {
18 int i;
19 int n;
20
21 smpmb();
22 n =
_get_thread_var(rcunesting);
23 if (n == 1) {
24 i =
_get_thread_var(rcu_readidx);
25
_get_thread_var(rcurefcnt)[i]--;
26 }
27
_get_thread_var(rcu_nesting) = n - 1;
28 }

使用每線程引用計數對的RCU讀端原語

問題:別忽悠了!我在rcu_read_lock()裏看見atomic_read()原語了!爲何你想僞裝rcu_read_lock()裏沒有原子操做?
1 static void flip_counter_and_wait(int i)
2 {
3 int t;
4
5 atomic_set(&rcu_idx, !i);
6 smp_mb();
7 for_each_thread(t) {
8 while (per_thread(rcu_refcnt, t)[i] != 0) {
9 poll(NULL, 0, 10);
10 }
11 }
12 smp_mb();
13 }
14
15 void synchronize_rcu(void)
16 {
17 int i;
18
19 smp_mb();
20 spin_lock(&rcu_gp_lock);
21 i = atomic_read(&rcu_idx);
22 flip_counter_and_wait(i);
23 flip_counter_and_wait(!i);
24 spin_unlock(&rcu_gp_lock);
25 smp_mb();
26 }

使用每線程引用計數對的RCU更新端原語

下圖是synchronize_rcu()的實現,還有一個輔助函數flipcounter and_wait()。synchronize_rcu()函數和前一個實現基本同樣,除了原來的重複檢查計數過程被替換成了第22至23行的輔助函數。
新的flip_counter_and_wait()函數在第5行更新rcu_idx變量,第6行執行內存屏障,而後第7至11行循環檢查每一個線程對應的rcu_refcnt元素,等待該值變爲0。一旦全部元素都變爲0,第12行執行另外一個內存屏障,而後返回。
本RCU實現對軟件環境有所要求,(1)可以聲明每線程變量,(2)每一個線程均可以訪問其餘線程的每線程變量,(3)可以遍歷全部線程。絕大多數軟件環境都知足上述要求,可是一般對線程數的上限有所限制。更復雜的實現能夠避開這種限制,好比,使用可擴展的哈希表。這種實現可以動態地跟蹤線程,好比,在線程第一次調用rcu_read_lock()時將線程加入哈希表。
問題:好極了,若是我有N個線程,那麼我要等待2N*10毫秒(每一個flip_counter_and_wait()調用消耗的時間,假設咱們每一個線程只等待一次)。咱們難道不能讓優雅週期再快一點完成嗎?
不過本實現還有一些缺點。首先,須要檢查rcu_idx兩次,這爲更新端帶來一些開銷,特別是線程數不少時。
其次,synchronize_rcu()必須檢查的變量數隨着線程增多而線性增加,這給線程數不少的應用程序帶來必定的開銷。
第三,和以前同樣,雖然原則上併發的RCU更新能夠共用一個公共優雅週期,可是本節的實現串行化了優雅週期,使得這種共享沒法進行。
最後,本節曾經提到的軟件環境需求,在某些環境下每線程變量和遍歷線程可能存在問題。
讀端原語的擴展性很是好,無論是在單核系統仍是64核系統都只須要115納秒左右。Synchronize_rcu()原語的擴展性不佳,開銷在單核Power5系統上的1微秒到64核系統上的200微秒不等。整體來講,本節的方法能夠算是一種初級的產品級用戶態RCU實現了。
下面介紹一種可以讓併發的RCU更新更有效的算法。
六、可擴展的基於計數RCU實現,能夠共享優雅週期
1 DEFINE_SPINLOCK(rcu_gp_lock);
2 DEFINE_PER_THREAD(int [2], rcu_refcnt);
3 long rcu_idx;
4 DEFINE_PER_THREAD(int, rcu_nesting);
5 DEFINE_PER_THREAD(int, rcu_read_idx);

使用每線程引用計數對和共享更新數據的數據定義

下圖是一種使用每線程引用計數RCU實現的讀端原語,可是該實現容許更新端共享優雅週期。本節的實現和前面的實現惟一的區別是,rcu_idx如今是一個long型整數,能夠自由增加,因此第8行用了一個掩碼屏蔽了最低位。咱們還將atomic_read()和atomic_set()改爲了ACCESS_ONCE()。上圖中的數據定義和前例也很類似,只是rcu_idx如今是long類型而非以前的atomic_t類型。
1 static void rcu_readlock(void)
2 {
3 int i;
4 int n;
5
6 n =
_get_thread_var(rcu_nesting);
7 if (n == 0) {
8 i = ACCESS_ONCE(rcuidx) & 0x1;
9
_get_thread_var(rcu_readidx) = i;
10
_get_thread_var(rcurefcnt)[i]++;
11 }
12
_get_thread_var(rcu_nesting) = n + 1;
13 smp_mb();
14 }
15
16 static void rcu_read_unlock(void)
17 {
18 int i;
19 int n;
20
21 smpmb();
22 n =
_get_thread_var(rcunesting);
23 if (n == 1) {
24 i =
_get_thread_var(rcu_readidx);
25
_get_thread_var(rcurefcnt)[i]--;
26 }
27
_get_thread_var(rcu_nesting) = n - 1;
28 }

使用每線程引用計數對和共享更新數據的RCU讀端原語

1 static void flip_counter_and_wait(int ctr)
2 {
3 int i;
4 int t;
5
6 ACCESS_ONCE(rcu_idx) = ctr + 1;
7 i = ctr & 0x1;
8 smp_mb();
9 for_each_thread(t) {
10 while (per_thread(rcu_refcnt, t)[i] != 0)
{
11 poll(NULL, 0, 10);
12 }
13 }
14 smp_mb();
15 }
16
17 void synchronize_rcu(void)
18 {
19 int ctr;
20 int oldctr;
21
22 smp_mb();
23 oldctr = ACCESS_ONCE(rcu_idx);
24 smp_mb();
25 spin_lock(&rcu_gp_lock);
26 ctr = ACCESS_ONCE(rcu_idx);
27 if (ctr - oldctr >= 3) {
28 spin_unlock(&rcu_gp_lock);
29 smp_mb();
30 return;
31 }
32 flip_counter_and_wait(ctr);
33 if (ctr - oldctr < 2)
34 flip_counter_and_wait(ctr + 1);
35 spin_unlock(&rcu_gp_lock);
36 smp_mb();
37 }

使用每線程引用計數對的RCU共享更新端原語

上圖是synchronize_rcu()及其輔助函數flip_counter_and_wait()的實現。flip_counter_and_wait()的變化在於:
1.第6行使用ACCESS_ONCE()代替了atomic_set(),用自增替代取反。
2.新增了第7行,將計數的最低位掩去。
synchronize_rcu()的區別要多一些:
1.新增了一個局部變量oldctr,存儲第23行的獲取每線程鎖以前的rcu_idx值。
2.第26行用ACCESS_ONCE()代替atomic_read()。
3.第27至30行檢查在鎖已獲取時,其餘線程此時是否在循環檢查3個以上的計數,若是是,釋放鎖,執行一個內存屏障而後返回。在本例中,有兩個線程在等待計數變爲0,因此其餘的線程已經作了全部必作的工做。
4.在第33至34行,在鎖已被獲取時,若是當前檢查計數是否爲0的線程不足2個,那麼flip_counter_and_wait()會被調用兩次。另外一方面,若是有兩個線程,另外一個線程已經完成了對計數的檢查,那麼只需再有一個就能夠。
在本方法中,若是有任意多個線程併發調用synchronize_rcu(),一個線程對應一個CPU,那麼最多隻有3個線程在等待計數變爲0。
儘管有這些改進,本節的RCU實現仍然存在一些缺點。首先,和上一節同樣,須要檢查rcu_idx兩次爲更新端帶來開銷,尤爲是線程不少時。
其次,本實現須要每CPU變量和遍歷全部線程的能力,這在某些軟件環境多是有問題的。
最後,在32位機器上,因爲rcu_idx溢出而致使須要作一些額外的檢查。
本實現的讀端原語擴展性極佳,無論CPU數爲多少,開銷大概爲115納秒。synchronize_rcu()原語的開銷仍然昂貴,從1微秒到15微秒不等。然而這比前面的200微秒的開銷已經好多了。因此,儘管存在這些缺點,本節的RCU實現已經能夠在真實世界中的產品中使用了。
問題:全部這些玩具式的RCU實現都要麼在rcu_read_lock()和rcuread unlock()中使用了原子操做,要麼讓synchronize_rcu()的開銷與線程數線性增加。那麼究竟在哪一種環境下,RCU的實現既可讓上述三個原語的實現簡單,又能擁有O(1)的開銷和延遲呢?
從新審視代碼,咱們看到了對一個全局變量的訪問和對不超過4個每線程變量的訪問。考慮到在POSIX線程中訪問每線程變量的開銷相對較高,咱們能夠將三個每線程變量放進單個結構體中,讓rcu_read_lock()和rcu_read_unlock()用單個每線程變量存儲類來訪問各自的每線程變量。
可是,下面將會介紹一種更好的辦法,能夠減小訪問每線程變量的次數到一次。
七、基於自由增加計數的RCU
1 DEFINE_SPINLOCK(rcu_gp_lock);
2 long rcu_gp_ctr = 0;
3 DEFINE_PER_THREAD(long, rcu_reader_gp);
4 DEFINE_PER_THREAD(long, rcu_reader_gp_snap);

使用自由增加計數的數據定義

下圖是一種基於單個全局free-running計數的RCU實現,該計數只對偶數值進行計數,相關的數據定義見上圖。rcu_read_lock()的實現極其簡單。第3行向全局free-running變量rcu_gp_ctr加1,將相加後的奇數值存儲在每線程變量rcu_reader_gp中。第4行執行一個內存屏障,防止後續的RCU讀端臨界區內容「泄漏」。
1 static void rcu_readlock(void)
2 {
3
_get_thread_var(rcu_reader_gp) = rcu_gp_ctr + 1;
4 smp_mb();
5 }
6
7 static void rcu_read_unlock(void)
8 {
9 smpmb();
10
_get_thread_var(rcu_reader_gp) = rcu_gp_ctr;
11 }
12
13 void synchronize_rcu(void)
14 {
15 int t;
16
17 smp_mb();
18 spin_lock(&rcu_gp_lock);
19 rcu_gp_ctr += 2;
20 smp_mb();
21 for_each_thread(t) {
22 while ((per_thread(rcu_reader_gp, t) & 0x1) &&
23 ((per_thread(rcu_reader_gp, t) -
24 rcu_gp_ctr) < 0)) {
25 poll(NULL, 0, 10);
26 }
27 }
28 spin_unlock(&rcu_gp_lock);
29 smp_mb();
30 }

使用自由增加計數的RCU實現

rcu_read_unlock()實現也很相似。第9行執行一個內存屏障,防止前一個RCU讀端臨界區「泄漏」。第10行將全局變量rcu_gp_ctr的值複製給每線程變量rcu_reader_gp,將此每線程變量的值變爲偶數值,這樣當前併發的synchronize_rcu()實例就知道忽略該每線程變量了。
問題:若是任何偶數值均可以讓synchronize_rcu()忽略對應的任務,那麼第10行爲何不直接給rcu_reader_gp賦值爲0?
synchronize_rcu()會等待全部線程的rcu_reader_gp變量變爲偶數值。可是,由於synchronize_rcu()只須要等待「在調用synchronize_rcu()以前就已存在的」RCU讀端臨界區,因此徹底能夠有更好的方法。第17行執行一個內存屏障,防止以前操縱的受RCU保護的數據結構被亂序(由編譯器或者是CPU)放到第17行以後執行。爲了防止多個synchronize_rcu()實例併發執行,第18行獲取rcu_gp_lock鎖(第28釋放鎖)。而後第19行給全局變量rcu_gp_ctr加2。回憶一下,rcu_reader_gp的值爲偶數的線程不在RCU讀端臨界區裏,因此第21至27行掃描rcu_reader_gp的值,直到全部值要麼是偶數(第22行),要麼比全局變量rcu_gp_ctr的值大(第23至24行)。第25行阻塞一小段時間,等待一個以前已經存在的RCU讀端臨界區退出,若是對優雅週期的延遲很敏感的話,也能夠用自旋鎖來代替。最後,第29行的內存屏障保證全部後續的銷燬工做不會被亂序到循環以前進行。
問題:爲何須要第17和第29行的內存屏障?難道第18行和第28行的鎖原語自帶的內存屏障還不夠嗎?
本節方法的讀端性能很是好,無論CPU數目多少,帶來的開銷大概是63納秒。更新端的開銷稍大,從Power5單核的500納秒到64核的超過100微秒不等。
這個實現除了剛纔提到的更新端的開銷較大之外,還有一些嚴重缺點。首先,該實現不容許RCU讀端臨界區嵌套。其次若是讀者在第3行獲取rcu_gp_ctr以後,存儲到rcu_reader_gp以前被搶佔,而且若是rcu_gp_ctr計數的值增加到最大值的一半以上,但沒有達到最大值時,那麼synchronize_rcu()將會忽略後續的RCU讀端臨界區。第三也是最後一點,本實現須要軟件環境支持每線程變量和對全部線程遍歷。
問題:第3行的讀者被搶佔問題是一個真實問題嗎?換句話說,這種致使問題的事件序列可能發生嗎?若是不能,爲何不能?若是能,事件序列是什麼樣的,咱們該怎樣處理這個問題?
八、基於自由增加計數的可嵌套RCU
1 DEFINE_SPINLOCK(rcu_gp_lock);
2 #define RCU_GP_CTR_SHIFT 7
3 #define RCU_GP_CTR_BOTTOM_BIT (1 <<
RCU_GP_CTR_SHIFT)
4 #define RCU_GP_CTR_NEST_MASK
(RCU_GP_CTR_BOTTOM_BIT - 1)
5 long rcu_gp_ctr = 0;
6 DEFINE_PER_THREAD(long, rcu_reader_gp);

基於自由增加計數的可嵌套RCU的數據定義

下圖是一種基於單個全局free-running計數的RCU實現,可是容許RCU讀端臨界區的嵌套。這種嵌套能力是經過讓全局變量rcu_gp_ctr的低位記錄嵌套次數實現的,定義在上圖中。該方法保留低位來記錄嵌套深度。爲了作到這一點,定義了兩個宏,RCU_GP_CTR_NEST_MASK和RCU_GP_CTR_BOTTOM_BIT。兩個宏之間的關係是:RCU_GP_CTR_NEST_MASK=RCUGP CTR_BOTTOM_BIT - 1。RCU_GP_CTR_BOTTOM_BIT宏是用於記錄嵌套那一位以前的一位,RCU_GP_CTR_NEST_MASK宏則包含rcu_gp_ctr中全部用於記錄嵌套的位。顯然,這兩個宏必須保留足夠多的位來記錄容許的最大RCU讀端臨界區嵌套深度,在本實現中保留了7位,這樣,容許最大RCU讀端臨界區嵌套深度爲127,這足夠絕大多數應用使用。
1 static void rcu_readlock(void)
2 {
3 long tmp;
4 long *rrgp;
5
6 rrgp = &
_get_thread_var(rcu_reader_gp);
7 tmp = rrgp;
8 if ((tmp & RCU_GP_CTR_NEST_MASK) == 0)
9 tmp = rcu_gp_ctr;
10 tmp++;
11
rrgp = tmp;
12 smp_mb();
13 }
14
15 static void rcu_read_unlock(void)
16 {
17 long tmp;
18
19 smpmb();
20
_get_thread_var(rcu_reader_gp)--;
21 }
22
23 void synchronize_rcu(void)
24 {
25 int t;
26
27 smp_mb();
28 spin_lock(&rcu_gp_lock);
29 rcu_gp_ctr += RCU_GP_CTR_BOTTOM_BIT;
30 smp_mb();
31 for_each_thread(t) {
32 while (rcu_gp_ongoing(t) &&
33 ((per_thread(rcu_reader_gp, t) -
34 rcu_gp_ctr) < 0)) {
35 poll(NULL, 0, 10);
36 }
37 }
38 spin_unlock(&rcu_gp_lock);
39 smp_mb();
40 }

使用自由增加計數的可嵌套RCU實現

rcu_read_lock()的實現仍然十分簡單。第6行將指向本線程rcu_reader_gp實例的指針放入局部變量rrgp中,將代價昂貴的訪問phtread每線程變量API的數目降到最低。第7行記錄rcu_reader_gp的值放入另外一個局部變量tmp中,第8行檢查低位字節是否爲0,代表當前的rcu_read_lock()是最外層的。若是是,第9行將全局變量rcu_gp_ctr的值存入tmp,由於第7行以前存入的值可能已通過期了。若是不是,第10行增長嵌套深度,若是你能記得,它存放在計數的最低7位。第11行將更新後的計數值從新放入當前線程的rcu_reader_gp實例中,而後,也是最後,第12行執行一個內存屏障,防止RCU讀端臨界區泄漏到rcu_read_lock()以前的代碼裏。
換句話說,除非當前調用的rcu_read_lock()的代碼位於RCU讀端臨界區中,不然本節實現的rcu_read_lock()原語會獲取全局變量rcu_gp_ctr的一個副本,而在嵌套環境中,rcu_read_lock()則去獲取rcu_reader_gp在當前線程中的實例。在兩種狀況下,rcu_read_lock()都會增長獲取到的值,代表嵌套深度又增長了一層,而後將結果儲存到當前線程的rcu_reader_gp實例中。
有趣的是,rcu_read_unlock()的實現和前面的實現如出一轍。第19行執行一個內存屏障,防止RCU讀端臨界區泄漏到rcu_read_unlock()以後的代碼中去,而後第20行減小當前線程的rcu_reader_gp實例,這將減小rcu_reader_gp最低幾位包含的嵌套深度。rcu_read_unlock()原語的調試版本將會在減小嵌套深度以前檢查rcu_reader_gp的最低幾位是否爲0。
synchronize_rcu()的實現與前面十分相似。不過存在兩點不一樣。第一,第29行將RCU_GP_CTR_BOTTOM_BIT增長到全局變量rcu_gp_ctr,而不是直接加常數2。第二,第32行的比較被剝離成一個函數,檢查RCU_GP_CTR_BOTTOM_BIT指示的位,而非無條件地檢查最低位。
本節方法的讀端性能與前面的實現幾乎同樣,無論CPU數目多少,開銷大概爲65納秒。更新端的開銷仍然較大,從Power5單核的600納秒到64核的超過100微秒。
問題:爲何不像上一節那樣,直接用一個單獨的每線程變量來表示嵌套深度,反而用複雜的位運算來表示?
除了解決了RCU讀端臨界區嵌套問題之外,本節的實現有着和前面實現同樣的缺點。另外,在32位系統上,本方法會減小全局變量rcu_gp_ctr變量溢出所需的時間。隨後將介紹一種能大大延長溢出所需時間,同時又極大地下降了讀端開銷的方法。
問題:怎樣才能將全局變量rcu_gp_ctr溢出的時間延長一倍?
問題:溢出是致命的嗎?爲何?爲何不是?若是是致命的,有什麼辦法能夠解決它?
九、基於靜止狀態的RCU
1 DEFINE_SPINLOCK(rcu_gp_lock);
2 long rcu_gp_ctr = 0;
3 DEFINE_PER_THREAD(long, rcu_reader_qs_gp);

基於quiescent-state的RCU的數據定義

下圖是一種基於靜止狀態的用戶態級RCU實現的讀端原語。數據定義在上圖。從圖中第1至7行能夠看出,rcu_read_lock()和rcu_read_unlock()原語不作任何事情,就和Linux內核同樣,這種空函數會成爲內聯函數,而後被編譯器優化掉。之因此是空函數,是由於基於靜止狀態的RCU實現用以前提到的靜止狀態來大體的做爲RCU讀端臨界區的長度,這種狀態包括第9至15行的rcu_quiescent_state()調用。進入擴展的靜止狀態(好比當發生阻塞時)的線程能夠分別用thread_offline()和thread_online() API,來標記擴展的靜止狀態的開始和結尾。這樣,thread_online()就成了對rcu_read_lock()的模仿,thread_offline()就成了對rcu_read_unlock()的模仿。此外,rcu_quiescent_state()能夠被認爲是一個rcu_thread_online()緊跟一個rcu_thread_offline()。從RCU讀端臨界區中調用rcu_quiescent_state()、rcu_thread_offline()或rcu_thread_online()是非法的。
1 static void rcu_read_lock(void)
2 {
3 }
4
5 static void rcu_read_unlock(void)
6 {
7 }
8
9 rcu_quiescent_state(void)
10 {
11 smpmb();
12
_get_thread_var(rcu_reader_qs_gp) =
13 ACCESS_ONCE(rcu_gp_ctr) + 1;
14 smp_mb();
15 }
16
17 static void rcu_thread_offline(void)
18 {
19 smpmb();
20
_get_thread_var(rcu_reader_qs_gp) =
21 ACCESS_ONCE(rcu_gp_ctr);
22 smp_mb();
23 }
24
25 static void rcu_thread_online(void)
26 {
27 rcu_quiescent_state();
28 }

基於靜止狀態的RCU讀端原語

在rcu_quiescent_state()中,第11行執行一個內存屏障,防止在靜止狀態以前的代碼亂序到靜止狀態以後執行。第12至13行獲取全局變量rcu_gp_ctr的副本,使用ACCESS_ONCE()來保證編譯器不會啓用任何優化措施讓rcu_gp_ctr被讀取超過一次。而後對取來的值加1,儲存到每線程變量rcu_reader_qs_gp中,這樣任何併發的synchronize_rcu()實例都只會看見奇數值,所以就知道新的RCU讀端臨界區開始了。正在等待老的讀端臨界區的synchronize_rcu()實例所以也知道忽略新產生的讀端臨界區。最後,第14行執行一個內存屏障,這會阻止後續代碼(包括可能的RCU讀端臨界區)對第12至13行的從新排序。
問題:第14行多餘的內存屏障會不會顯著增長rcu_quiescent_state()的開銷?
有些應用程序可能只是偶爾須要用RCU,可是一旦它們開始用,那必定是處處都在用。這種應用程序能夠在開始用RCU時調用rcu_thread_online(),在再也不使用RCU時調用rcu_thread_offline()。在調用rcu_thread_offline()和下一個調用rcuthread online()之間的時間成爲擴展的靜止狀態,在這段時間RCU不會顯式地註冊靜止狀態。
rcu_thread_offline()函數直接將每線程變量rcu_reader_qs_gp賦值爲rcu_gp_ctr的當前值,該值是一個偶數。這樣全部併發的synchronize_rcu()實例就知道忽略這個線程。
問題:爲何須要第19行和第22行的內存屏障?
rcu_thread_online()函數直接調用rcu_quiescent_state(),這也表示延長靜止狀態的結束。
1 void synchronize_rcu(void)
2 {
3 int t;
4
5 smp_mb();
6 spin_lock(&rcu_gp_lock);
7 rcu_gp_ctr += 2;
8 smp_mb();
9 for_each_thread(t) {
10 while (rcu_gp_ongoing(t) &&
11 ((per_thread(rcu_reader_qs_gp, t) -
12 rcu_gp_ctr) < 0)) {
13 poll(NULL, 0, 10);
14 }
15 }
16 spin_unlock(&rcu_gp_lock);
17 smp_mb();
18 }

基於靜止狀態的RCU更新端原語

下圖是synchronize_rcu()的實現,和前一個實現很相像。
本節實現的讀端原語快得驚人,調用rcu_read_lock()和rcu_read_unlock()的開銷一共大概50皮秒(10的負12次方秒)。synchronize_rcu()的開銷從Power5單核上的600納秒到64核上的超過100微秒不等。
問題:能夠肯定的是,ca-2008Power系統的時鐘頻率至關高,但是即便是5GHz的時鐘頻率,也不足以讓讀端原語在50皮秒執行完畢。這裏究竟發生了什麼?
不過,本節的實現要求每一個線程要麼週期性地調用rcu_quiescent_state(),要麼爲擴展的靜止狀態調用rcu_thread_offline()。週期性調用這些函數的要求在某些狀況下會讓實現變得困難,好比某種類型的庫函數。
另外,本節的實現不容許併發的synchronize_rcu()調用來共享同一個優雅週期。不過,徹底能夠基於這個RCU版本寫一個產品級的RCU實現。
十、關於玩具式RCU實現的總結
若是你看到這裏,恭喜!你如今不只對RCU自己有了更清晰的瞭解,並且對其所須要的軟件和應用環境也更熟悉了。想要更進一步瞭解RCU的讀者,請自行閱讀在各類產品中大量採用的RCU實現。
以前的章節列出了各類RCU原語的理想特性。下面咱們將整理一個列表,供有意實現本身的RCU實現的讀者作參考。
1.必須有讀端原語(好比rcu_read_lock()和rcu_read_unlock())和優雅週期原語(好比synchronize_rcu()和call_rcu()),任何在優雅週期開始前就存在的RCU讀端臨界區必須在優雅週期結束前執行完畢。
2.RCU讀端原語應該有最小的開銷。特別是應該避免如高速緩存未命中、原子操做、內存屏障和條件分支之類的操做。
3.RCU讀端原語應該有O(1)的時間複雜度,能夠用於實時用途。(這意味着讀者能夠與更新者併發運行。)
4.RCU讀端原語應該在全部上下文中均可以使用(在Linux內核中,只有空的死循環時不能使用RCU讀端原語)。一個重要的特例是RCU讀端原語必須能夠在RCU讀端臨界區中使用,換句話說,必須容許RCU讀端臨界區嵌套。
5.RCU讀端原語不該該有條件判斷,不會返回失敗。這個特性十分重要,由於錯誤檢查會增長複雜度,讓測試和驗證變得更復雜。
6.除了靜止狀態之外的任何操做都能在RCU讀端原語裏執行。好比像I/O這樣的操做也該容許。
7.應該容許在RCU讀端臨界區中執行的同時更新一個受RCU保護的數據結構。
8.RCU讀端和更新端的原語應該在內存分配器的設計和實現上獨立。
9.RCU優雅週期不該該被在RCU讀端臨界區以外阻塞的線程而阻塞。

全部這些目標,都被Linux內核RCU實現所知足。後續將分析Linux內核中RCU實現代碼。

相關文章
相關標籤/搜索