15分鐘讓你瞭解如何實現併發中的Barrier

說到Barrier,不少語言中已是標準庫中自帶的概念,通常狀況下,只須要直接使用就好了。而最近一些機緣巧合的機會,我須要在c++中使用這麼個玩意兒。可是c++標準庫裏尚未這個概念,只有boost裏面有這樣現成的東西,而我又不想爲了這麼一個小東西引入個boost。因此,我藉着這個機會研究了下,發現其實這些多線程/併發中的東西仍是蠻有意思的。ios

 

閱讀本文你可能須要以下的一些知識:c++

  1. 多線程編程的概念。編程

  2. c++的基本語法和有關多線程的語法。多線程

 

第二條可能也沒有那麼重要,由於若是理解了多線程的這些東西,什麼語言均可以實現其核心概念。好了,廢話少扯,進入正題。併發

 

1、什麼是Barrier?函數

 

首先,得介紹下Barrier的概念,Barrier從字面理解是屏障的意思,主要是用做集合線程,而後再一塊兒往下執行。再具體一點,在Barrier以前,若干個thread各自執行,而後到了Barrier的時候停下,等待規定數目的全部的其餘線程到達這個Barrier,以後再一塊兒經過這個Barrier各自幹本身的事情。性能

 

這個概念特別像小時候集體活動的過程,你們從各自的家裏到學校集合,待人數都到齊以後,以後再一塊兒坐車出去,到達指定地點後一塊兒行動或者各自行動。網站

 

而在計算機的世界裏,Barrier能夠解決的問題不少,好比,一個程序有若干個線程併發的從網站上下載一個大型xml文件,這個過程能夠相互獨立,由於一個文件的各個部分並不相關。而在處理這個文件的時候,可能須要一個完整的文件,因此,須要有一條虛擬的線讓這些併發的部分集合一下從而能夠拼接成爲一個完整的文件,多是爲了後續處理也多是爲了計算hash值來驗證文件的完整性。然後,再交由下一步處理。this

 

2、如何實現一個Barrier?spa

 

併發的不少東西都擁有一個壞處就是你很難證實某種實現不是錯誤的,由於不少時候確實狀況太多了,不管是死鎖,飢餓對於人腦都是太大的負擔。而反過來,對於我扯這篇文章,也是一個好處,正由於很難證實不是錯誤的,因此個人扯淡能夠更放心一點。

 

在研究Barrier的實現中,我查閱了蠻多的資料的。說實話,其實現方式挺多的。在剔除了一些我能明確證實其有多是錯誤的,我選擇了我本身以爲最容易理解的一種。

 

第一節說過,barrier很像是之前的班級集合,站在一個老師的角度,你須要知道的東西至少有這兩個:

  1. 班級有多少人。

  2. 目前已經到了多少人。

     

只有當目前已經到了的人等於班級人數以後才能出發。

 

因此若是按照這個類比,實現一個barrier至少須要如下的幾個變量:

  1. 須要同時在barrier等待的線程的個數。

  2. 當前到達barrier的線程的個數。

 

而按照barrier的邏輯,主要應該有這些操做:

  1. 當一個線程到達barrier的時候,增長計數。

  2. 若是個數不等於當前須要等待的線程個數,等待。

  3. 若是個數達到了須要等待的線程個數,通知/喚醒全部等待的進程,讓全部進程經過barrier。

 

在不考慮加鎖的狀況下,按照上面的邏輯,僞代碼大概應該像這樣:

thread_count = n; <-- n是須要一塊兒等待的線程的個數
arrived_count = 0; <-- 到達線程的個數
-------------------------------------------------------------
 以上是全局變量,只會初始化一次,如下是barrier開始的代碼
-------------------------------------------------------------
arrived_count += 1;
if(arrived_count == thread_count)
    notify_all_threads_and_unblok();
else
    block_and_wait();

而在多線程環境下,很明顯arrived_count這種全局變量更新須要加鎖。因此,對於這個代碼,綜合稍微再改動一下,僞代碼能夠更新下成爲這樣:

thread_count = n; <-- n是須要一塊兒等待的線程的個數
arrived_count = 0; <-- 到達線程的個數
-------------------------------------------------------------
 以上是全局變量,只會初始化一次,如下是barrier開始的代碼
-------------------------------------------------------------
lock();
    arrived_count += 1;
unlock();
if(arrived_count == thread_count)
    notify_all_threads_and_unblok();
else
    block_and_wait();

這裏,在有的語言中,鎖的粒度可能小了點,取決於notify_all_threads和wait在這個語言中的定義,可是做爲僞代碼,爲了可能展現起來比較方便。

 

而若是你有併發編程的知識,你應該敏感的認識到notify_all_threads_and_unblock,block_and_wait這種在這裏雖然是簡單的幾個單詞,可是其包含的操做步驟明顯不止一個,更別說背後的機器指令了。因此做爲一個併發概念下運行的程序,不能夠簡單的就放這樣一個操做在這裏,若是都是任何函數,指令,代碼都是自帶原子性的,那麼寫多線程/併發程序也沒有啥好研究的了。因此對於這兩個操做,咱們必須具體的擴展下。

 

對於notify_all_threads_and_unblock和block_and_wait包含至關多的操做,因此下面,得把這兩個操做具體的展開。

 1 thread_count = n; <-- n是須要一塊兒等待的線程的個數
 2 arrived_count = 0; <-- 到達線程的個數
 3 could_release = false; 
 4 -------------------------------------------------------------
 5  以上是全局變量,只會初始化一次,如下是barrier開始的代碼
 6 -------------------------------------------------------------
 7 lock();
 8     if(arrived_count == 0)
 9        could_release = false; 
10     
11     arrived_count += 1;
12 unlock();
13 if(arrived_count == thread_count)
14     could_realse = true;    
15     arrived_count = 0; 
16 else
17     while(could_release == false)
18         spin()

這裏多了一個變量could_release完成上面說的兩個操做。原理也很簡單,若是等待的個數沒有到達指定數目,這個值始終是false,在代碼中使用循環讓線程阻塞在spin處(固然,假設spin是原子性的)。若是到達了thread_count,改變could_release的值,這樣循環條件不知足,代碼能夠繼續執行。而在13行的if裏面把arrived_count從新設置爲0是由於若是不這樣作,那麼這個barrier就只能用一次,由於沒有地方再把這個表示到達線程數目變量的初始值從新設置了。

 

我以爲這裏須要停一下,來思一下上面的代碼,首先,這個代碼有不少看起來很像有問題的地方。好比對於could_release和arrived_count的重置處,這都是賦值,而在併發程序中,任何寫操做都須要仔細思考是否須要加鎖,在這裏,加鎖固然沒問題。可是盲目的加鎖會致使性能損失。

 

多線程程序最可怕的就是陷入細節,因此,我通常都是總體的思考下是否是有問題。對於一個barrier,錯誤就是指沒有等全部的線程都到達了就中止了等待,人沒來齊就發車了。而怎麼會致使這樣的狀況呢?只有當arrived_count值在兩個線程不一樣步纔會致使錯誤。秉承這個原則,看看上面的代碼,arrived_count的更新是加鎖的,因此在到達if以前其值是能夠信賴的。而if這段判斷自己是讀操做,其判斷就是能夠信賴的,由於arrived_count的值更新是可靠的,因此進來的線程要麼進入if,要麼進入else。不存在線程1更新了arrived_count的值而線程2讀到了arrived_count的值而致使沒有到thread_count就更新了could_release的狀況。

 

沒辦法,這類的程序就是很繞,因此我通常都不陷入細節。

 

如今看起來,一切都很完美,可是多線程程序最噁心的地方就是可能的死鎖,飢餓等等。而這些又很難證實,而上面這段代碼,在某些狀況下就是會致使死鎖。考慮thread_count等於2,也就是這個barrier須要等待兩個線程一塊兒經過。

 

如今有兩個線程,t1和t2,t1先執行直到17行,卡住,這時候t2得到寶貴的cpu機會。很明顯,這時會進入14行,更新could_release的值。若是這個時候t1得到執行機會,萬事大吉,t1會離開while區域,繼續執行。直到下次再次到達這個barrier。

 

可是若是這個時候t1並無得到執行機會,t2一直執行,雖然喚醒了could_relase,可是t1會一直停留在18行。要知道,這個含有barrier的代碼多是在一個循環之中,若是t2再次到達barrier的區域,這時候arrived_count等於0(由於arrived_count在上一次t2進入13行以後重置了),這個時候could_relase會變成false。如今t1,t2都在18行了,沒有人有機會去更新could_relase的值,線程死鎖了。

 

怎麼辦?仔細思考下,是喚醒機制有問題,很明顯,若是可以在喚醒的時候原子式的喚醒全部的線程,那麼上面所說的問題就不存在了。在不少語言裏都有這樣的方法能夠完成上面說的原子性的喚醒全部線程,好比c++裏面的notify_all。可是,若是沒有這個函數,該如何實現呢?

 

上面死鎖問題的誕生在於一個線程不恰當的更新了全局的could_relase,致使所有的判斷條件跟着錯誤的改變。解決這樣的問題,須要的是一個只有每一個線程各自能看到,能夠獨立更新,互相不干擾而又能被使用的變量。幸虧,在設計多線程概念時,有一個概念叫作thread local,恰好可以知足這個要求。而運用這樣的變量,上述的概念能夠表述成爲:

 1 thread_count = n; <-- n是須要一塊兒等待的線程的個數
 2 arrived_count = 0; <-- 到達線程的個數
 3 could_release = false;
 4 thread_local_flag = could_release; <-- 線程局部變量,每一個線程獨立更新 
 5 -------------------------------------------------------------
 6  以上是全局變量,只會初始化一次,如下是barrier開始的代碼
 7 -------------------------------------------------------------
 8 thread_local_flag = !thread_local_flag
 9 lock();
10     arrived_count += 1;
11 unlock();
12 if(arrived_count == thread_count)
13     could_realse = thread_local_flag;    
14     arrived_count = 0; 
15 else
16     while(could_release != thread_local_flag)
17         spin()

這裏要着重解釋下,爲何不會死鎖,因爲thread_local_flag是每一個線程獨立更新的,因此很明顯,其是不用加鎖的。其他代碼和上面的僞代碼相似,不一樣的是,若是發生上面同樣的狀況,t2更新thread_local_flag的時候,只有其局部的變量會被置反而不會影響其他的線程的變量,而由於could_realse是全局變量,在t2第一次執行到13行的時候已經設置成thread_local_flag同樣的值了。這個時候, 哪怕t2再次執行到16行也會由於其內部變量已經被置反而阻塞在這個while循環之中。而t1只要得到執行機會,就能夠經過這個barrier。

 

有點繞,可是仔細想一想仍是蠻有意思的。

 

3、如何運用c++實現Barrier?

 

雖然上面說了那麼多,可是c++中實現Barrier不須要這麼複雜,這要感謝c++ 11中已經自帶了不少原子性的操做,好比上面說的notify_all。因此,代碼就沒有那麼複雜了,固然,c++也有thread_local,若是不畏勞苦,能夠真的從最基礎的寫起。

#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>using namespace std;
​
class TestBarrier{
public:
    TestBarrier(int nThreadCount):
        m_threadCount(nThreadCount),
        m_count(0),
        m_release(0)
    {}
​
    void wait1(){
        unique_lock<mutex> lk(m_lock);
        if(m_count == 0){
            m_release = 0;
        }
        m_count++;
        if(m_count == m_threadCount){
            m_count = 0;
            m_release = 1;
            m_cv.notify_all();
        }
        else{
            m_cv.wait(lk, [&]{return m_release == 1;});
        } 
    }
​
private:
    mutex m_lock;
    condition_variable m_cv;
    unsigned int m_threadCount;
    unsigned int m_count; 
    unsigned int m_release;
};

這裏多虧了c++標準庫中引進的condition_variable,使得上面的概念能夠簡單高效而又放心的實現,你也不須要操心什麼線程局部量。而關於c++併發相關的種種知識可能須要專門的若干篇幅才能說清楚,若是你並不熟悉c++,能夠跳過這些不知所云的部分。驗證上述代碼可使用以下代碼:

unsigned int threadWaiting = 5;
TestBarrier barrier(5);
​
void func1(){
    this_thread::sleep_for(chrono::seconds(3));
    cout<<"func1"<<endl;
    barrier.wait1();
    cout<<"func1 has awakended!"<<endl;
}
​
void func2(){
    cout<<"func2"<<endl;
    barrier.wait1();
    cout<<"func2 has awakended!"<<endl;
}
​
void func3(){
    this_thread::sleep_for(chrono::seconds(1));
    cout<<"func3"<<endl;
    barrier.wait1();
    cout<<"func3 has awakended!"<<endl;
}
​
int main(){
    for(int i = 0; i < 5; i++){
        thread t1(func1);
        thread t2(func3);
        thread t3(func2);
        thread t4(func3);
        thread t5(func2);
        t1.join();
        t2.join();
        t3.join();
        t4.join();
        t5.join();
    }
}

好了,在我機器上的運行結果是這樣的,因爲輸出沒有同步,因此輸出可能並無想象的那麼整潔。可是不影響總體結果,能夠看到,全部線程到齊以後才各自執行各自後面的代碼:


這篇文章也在個人公衆號同步發表,個人這個公衆號嘛,佛系更新,固然,本質上是想到一個話題不容易(懶的好藉口),歡迎關注哦:

相關文章
相關標籤/搜索