[轉]使用 C++11 編寫 Linux 多線程程序

前言

在這個多核時代,如何充分利用每一個 CPU 內核是一個繞不開的話題,從須要爲成千上萬的用戶同時提供服務的服務端應用程序,到須要同時打開十幾個頁面,每一個頁面都有幾十上百個連接的 web 瀏覽器應用程序,從保持着幾 t 甚或幾 p 的數據的數據庫系統,到手機上的一個有良好用戶響應能力的 app,爲了充分利用每一個 CPU 內核,都會想到是否可使用多線程技術。這裏所說的「充分利用」包含了兩個層面的意思,一個是使用到全部的內核,再一個是內核不空閒,不讓某個內核長時間處於空閒狀態。在 C++98 的時代,C++標準並無包含多線程的支持,人們只能直接調用操做系統提供的 SDK API 來編寫多線程程序,不一樣的操做系統提供的 SDK API 以及線程控制能力不盡相同,到了 C++11,終於在標準之中加入了正式的多線程的支持,從而咱們可使用標準形式的類來建立與執行線程,也使得咱們可使用標準形式的鎖、原子操做、線程本地存儲 (TLS) 等來進行復雜的各類模式的多線程編程,並且,C++11 還提供了一些高級概念,好比 promise/future,packaged_task,async 等以簡化某些模式的多線程編程。linux

多線程可讓咱們的應用程序擁有更加出色的性能,同時,若是沒有用好,多線程又是比較容易出錯的且難以查找錯誤所在,甚至可讓人們以爲本身陷進了泥潭,但願本文可以幫助您更好地使用 C++11 來進行 Linux 下的多線程編程。ios

認識多線程

首先咱們應該正確地認識線程。維基百科對線程的定義是:線程是一個編排好的指令序列,這個指令序列(線程)能夠和其它的指令序列(線程)並行執行,操做系統調度器將線程做爲最小的 CPU 調度單元。在進行架構設計時,咱們應該多從操做系統線程調度的角度去考慮應用程序的線程安排,而不只僅是代碼。c++

當只有一個 CPU 內核可供調度時,多個線程的運行示意以下:web

圖 一、單個 CPU 內核上的多個線程運行示意圖

圖 一、單個 CPU 內核上的多個線程運行示意圖

咱們能夠看到,這時的多線程本質上是單個 CPU 的時間分片,一個時間片運行一個線程的代碼,它能夠支持併發處理,可是不能說是真正的並行計算。編程

當有多個 CPU 或者多個內核可供調度時,能夠作到真正的並行計算,多個線程的運行示意以下:promise

圖 二、雙核 CPU 上的多個線程運行示意圖

圖 二、雙核 CPU 上的多個線程運行示意圖

從上述兩圖,咱們能夠直接獲得使用多線程的一些常見場景:緩存

  • 進程中的某個線程執行了一個阻塞操做時,其它線程能夠依然運行,好比,等待用戶輸入或者等待網絡數據包的時候處理啓動後臺線程處理業務,或者在一個遊戲引擎中,一個線程等待用戶的交互動做輸入,另一個線程在後臺合成下一幀要畫的圖像或者播放背景音樂等。
  • 將某個任務分解爲小的能夠並行進行的子任務,讓這些子任務在不一樣的 CPU 或者內核上同時進行計算,而後彙總結果,好比歸併排序,或者分段查找,這樣子來提升任務的執行速度。

須要注意一點,由於單個 CPU 內核下多個線程並非真正的並行,有些問題,好比 CPU 緩存不一致問題,不必定能表現出來,一旦這些代碼被放到了多核或者多 CPU 的環境運行,就極可能會出現「在開發測試環境一切沒有問題,到了實施現場就莫名其妙」的狀況,因此,在進行多線程開發時,開發與測試環境應該是多核或者多 CPU 的,以免出現這類狀況。網絡

C++11 的線程類 std::thread

C++11 的標準類 std::thread 對線程進行了封裝,它的聲明放在頭文件 thread 中,其中聲明瞭線程類 thread, 線程標識符 id,以及名字空間 this_thread,按照 C++11 規範,這個頭文件至少應該兼容以下內容:

清單 1.例子 thread 頭文件主要內容
namespace std{
 struct thread{
 // native_handle_type 是鏈接 thread 類和操做系統 SDK API 之間的橋樑。
 typedef implementation-dependent native_handle_type;
 native_handle_type native_handle();
 //
 struct id{
 id() noexcept;
 // 能夠由==, < 兩個運算衍生出其它大小關係運算。
 bool operator==(thread::id x, thread::id y) noexcept;
 bool operator<(thread::id x, thread::id y) noexcept;
 template<class charT, class traits>
 basic_ostream<charT, traits>&
 operator<<(basic_ostream<charT, traits>&out, thread::id id);
 // 哈希函數
 template <class T> struct hash;
 template <> struct hash<thread::id>;
 };
 id get_id() const noexcept;
 // 構造與析構
 thread() noexcept;
 template<class F, class… Args> explicit thread(F&f, Args&&… args);
 ~thread();
 thread(const thread&) = delete;
 thread(thread&&) noexcept;
 thread& operator=( const thread&) = delete;
 thread& operator=(thread&&) noexcept;
 //
 void swap(thread&) noexcept;
 bool joinable() const noexcept;
 void join();
 void detach();
 // 獲取物理線程數目
 static unsigned hardware_concurrency() noexcept;
 }
 namespace this_thead{
 thread::id get_id();
 void yield();
 template<class Clock, class Duration>
 void sleep_until(const chrono::time_point<Clock, Duration>& abs_time);
 template<class Rep, class Period>
 void sleep_for(const chromo::duration<Rep, Period>& rel_time);
 }
}

  

和有些語言中定義的線程不一樣,C++11 所定義的線程是和操做系的線程是一一對應的,也就是說咱們生成的線程都是直接接受操做系統的調度的,經過操做系統的相關命令(好比 ps -M 命令)是能夠看到的,一個進程所能建立的線程數目以及一個操做系統所能建立的總的線程數目等都由運行時操做系統限定。

native_handle_type 是鏈接 thread 類和操做系統 SDK API 之間的橋樑,在 g++(libstdc++) for Linux 裏面,native_handle_type 其實就是 pthread 裏面的 pthread_t 類型,當 thread 類的功能不能知足咱們的要求的時候(好比改變某個線程的優先級),能夠經過 thread 類實例的 native_handle() 返回值做爲參數來調用相關的 pthread 函數達到目的。thread::id 定義了在運行時操做系統內惟一可以標識該線程的標識符,同時其值還能指示所標識的線程的狀態,其默認值 (thread::id()) 表示不存在可控的正在執行的線程(即空線程,好比,調用 thead() 生成的沒有指定入口函數的線程類實例),當一個線程類實例的 get_id() 等於默認值的時候,即 get_id() == thread::id(),表示這個線程類實例處於下述狀態之一:

  • 還沒有指定運行的任務
  • 線程運行完畢
  • 線程已經被轉移 (move) 到另一個線程類實例
  • 線程已經被分離 (detached)

空線程 id 字符串表示形式依具體實現而定,有些編譯器爲 0x0,有些爲一句語義解釋。

有時候咱們須要在線程執行代碼裏面對當前調用者線程進行操做,針對這種狀況,C++11 裏面專門定義了一個名字空間 this_thread,其中包括 get_id() 函數可用來獲取當前調用者線程的 id,yield() 函數能夠用來將調用者線程跳出運行狀態,從新交給操做系統進行調度,sleep_until 和 sleep_for 函數則可讓調用者線程休眠若干時間。get_id() 函數其實是經過調用 pthread_self() 函數得到調用者線程的標識符,而 yield() 函數則是經過調用操做系統 API sched_yield() 進行調度切換。

如何建立和結束一個線程

和 pthread_create 不一樣,使用 thread 類建立線程可使用一個函數做爲入口,也能夠是其它的 Callable 對象,並且,能夠給入口傳入任意個數任意類型的參數:

清單 2.例子 thread_run_func_var_args.cc
int funcReturnInt(const char* fmt, ...){
 va_list ap;
 va_start(ap, fmt);
 vprintf( fmt, ap );
 va_end(ap);
 return 0xabcd;
}
void threadRunFunction(void){
 thread* t = new thread(funcReturnInt, "%d%s\n", 100, "\%");
 t->join();
 delete t;
}
咱們也能夠傳入一個 Lambda 表達式做爲入口,好比:

  

清單 3.例子 thread_run_lambda.cc
void threadRunLambda(void){
 int a = 100,
 b = 200;
 thread* t = new thread( [](int ia, int ib){
 cout << (ia + ib) << endl;
 },
 a,
 b );
 t->join();
 delete t;
}

  

一個類的成員函數也能夠做爲線程入口:

清單 4.例子 thread_run_member_func.cc
struct God{
 void create(const char* anything){
 cout << "create " << anything << endl;
 }
};
void threadRunMemberFunction(void){
 God god;
 thread* t = new thread( &God::create, god, "the world" );
 t->join();
 delete t;
}

  

雖然 thread 類的初始化能夠提供這麼豐富和方便的形式,其實現的底層依然是建立一個 pthread 線程並運行之,有些實現甚至是直接調用 pthread_create 來建立。

建立一個線程以後,咱們還須要考慮一個問題:該如何處理這個線程的結束?一種方式是等待這個線程結束,在一個合適的地方調用 thread 實例的 join() 方法,調用者線程將會一直等待着目標線程的結束,當目標線程結束以後調用者線程繼續運行;另外一個方式是將這個線程分離,由其本身結束,經過調用 thread 實例的 detach() 方法將目標線程置於分離模式。一個線程的 join() 方法與 detach() 方法只能調用一次,不能在調用了 join() 以後又調用 detach(),也不能在調用 detach() 以後又調用 join(),在調用了 join() 或者 detach() 以後,該線程的 id 即被置爲默認值(空線程),表示不能繼續再對該線程做修改變化。若是沒有調用 join() 或者 detach(),那麼,在析構的時候,該線程實例將會調用 std::terminate(),這會致使整個進程退出,因此,若是沒有特別須要,通常都建議在生成子線程後調用其 join() 方法等待其退出,這樣子最起碼知道這些子線程在何時已經確保結束。

在 C++11 裏面沒有提供 kill 掉某個線程的能力,只能被動地等待某個線程的天然結束,若是咱們要主動中止某個線程的話,能夠經過調用 Linux 操做系統提供的 pthread_kill 函數給目標線程發送信號來實現,示例以下:

清單 5.例子 thread_kill.cc
static void on_signal_term(int sig){
 cout << "on SIGTERM:" << this_thread::get_id() << endl;
 pthread_exit(NULL); 
}
void threadPosixKill(void){
 signal(SIGTERM, on_signal_term);
 thread* t = new thread( [](){
 while(true){
 ++counter;
 }
 });
 pthread_t tid = t->native_handle();
 cout << "tid=" << tid << endl;
 // 確保子線程已經在運行。
 this_thread::sleep_for( chrono::seconds(1) );
 pthread_kill(tid, SIGTERM);
 t->join();
 delete t;
 cout << "thread destroyed." << endl;
}

  

上述例子還能夠用來給某個線程發送其它信號,具體的 pthread_exit 函數調用的約定依賴於具體的操做系統的實現,因此,這個方法是依賴於具體的操做系統的,並且,由於在 C++11 裏面沒有這方面的具體約定,用這種方式也是依賴於 C++編譯器的具體實現的。

線程類 std::thread 的其它方法和特色

thread 類是一個特殊的類,它不能被拷貝,只能被轉移或者互換,這是符合線程的語義的,不要忘記這裏所說的線程是直接被操做系統調度的。線程的轉移使用 move 函數,示例以下:

清單 6.例子 thread_move.cc
void threadMove(void){
 int a = 1;
 thread t( [](int* pa){
 for(;;){
 *pa = (*pa * 33) % 0x7fffffff;
 if ( ( (*pa) >> 30) & 1) break;
 }
 }, &a);
 thread t2 = move(t);   // 改成 t2 = t 將不能編譯。
 t2.join();
 cout << "a=" << a << endl;
}

  

在這個例子中,若是將 t2.join() 改成 t.join() 將會致使整個進程被結束,由於忘記了調用 t2 也就是被轉移的線程的 join() 方法,從而致使整個進程被結束,而 t 則由於已經被轉移,其 id 已被置空。

線程實例互換使用 swap 函數,示例以下:

清單 7.例子 thread_swap.cc
void threadSwap(void){
 int a = 1;
 thread t( [](int* pa){
 for(;;){
 *pa = (*pa * 33) % 0x7fffffff;
 if ( ( (*pa) >> 30) & 1) break;
 }
 }, &a);
 thread t2;
 cout << "before swap: t=" << t.get_id() 
 << ", t2=" << t2.get_id() << endl;
 swap(t, t2);
 cout << "after swap : t=" << t.get_id() 
 << ", t2=" << t2.get_id() << endl;
 t2.join();
 cout << "a=" << a << endl;
}

  

互換和轉移很相似,可是互換僅僅進行實例(以 id 做標識)的互換,而轉移則在進行實例標識的互換以前,還進行了轉移目的實例(以下例的t2)的清理,若是 t2 是可聚合的(joinable() 方法返回 true),則調用 std::terminate(),這會致使整個進程退出,好比下面這個例子:

清單 8.例子 thread_move_term.cc
void threadMoveTerm(void){
 int a = 1;
 thread t( [](int* pa){
 for(;;){
 *pa = (*pa * 33) % 0x7fffffff;
 if ( ( (*pa) >> 30) & 1) break;
 }
 }, &a);
 thread t2( [](){
 int i = 0;
 for(;;)i++;
 } );
 t2 = move(t);  // 將會致使 std::terminate()
 cout << "should not reach here" << endl;
 t2.join();
}

  

因此,在進行線程實例轉移的時候,要注意判斷目的實例的 id 是否爲空值(即 id())。

若是咱們繼承了 thread 類,則還須要禁止拷貝構造函數、拷貝賦值函數以及賦值操做符重載函數等,另外,thread 類的析構函數並非虛析構函數。示例以下:

清單 9.例子 thread_inherit.cc
class MyThread : public thread{
public:
 MyThread() noexcept : thread(){};
 template<typename Callable, typename... Args>
 explicit
 MyThread(Callable&& func, Args&&... args) : 
 thread( std::forward<Callable>(func), 
 std::forward<Args>(args)...){
 }
 ~MyThread() { thread::~thread(); }
 // disable copy constructors
 MyThread( MyThread& ) = delete;
 MyThread( const MyThread& ) = delete;
 MyThread& operator=(const MyThread&) = delete;
};

  

由於 thread 類的析構函數不是虛析構函數,在上例中,須要避免出現下面這種狀況:

MyThread* tc = new MyThread(...);

...

thread* tp = tc;

...

delete tp;

這種狀況會致使 MyThread 的析構函數沒有被調用。

線程的調度

咱們能夠調用 this_thread::yield() 將當前調用者線程切換到從新等待調度,可是不能對非調用者線程進行調度切換,也不能讓非調用者線程休眠(這是操做系統調度器乾的活)。

清單 10.例子 thread_yield.cc
void threadYield(void){
 unsigned int procs = thread::hardware_concurrency(), // 獲取物理線程數目
 i = 0;
 thread* ta = new thread( [](){
 struct timeval t1, t2;
 gettimeofday(&t1, NULL);
 for(int i = 0, m = 13; i < COUNT; i++, m *= 17){
 this_thread::yield();
 }
 gettimeofday(&t2, NULL);
 print_time(t1, t2, " with yield");
 } );
 thread** tb = new thread*[ procs ];
 for( i = 0; i < procs; i++){
 tb[i] = new thread( [](){
 struct timeval t1, t2;
 gettimeofday(&t1, NULL);
 for(int i = 0, m = 13; i < COUNT; i++, m *= 17){
 do_nothing();
 }
 gettimeofday(&t2, NULL);
 print_time(t1, t2, "without yield");
 });
 }
 ta->join();
 delete ta;
 for( i = 0; i < procs; i++){
 tb[i]->join();
 delete tb[i];
 };
 delete tb;
}

  ta 線程由於須要常常切換去從新等待調度,它運行的時間要比 tb 要多,好比在做者的機器上運行獲得以下結果:

$time ./a.out
without yield elapse 0.050199s
without yield elapse 0.051042s
without yield elapse 0.05139s
without yield elapse 0.048782s
 with yield elapse 1.63366s
real    0m1.643s
user    0m1.175s
sys 0m0.611s

  

ta 線程即便扣除系統調用運行時間 0.611s 以後,它的運行時間也遠大於沒有進行切換的線程。

C++11 沒有提供調整線程的調度策略或者優先級的能力,若是須要,只能經過調用相關的 pthread 函數來進行,須要的時候,能夠經過調用 thread 類實例的 native_handle() 方法或者操做系統 API pthread_self() 來得到 pthread 線程 id,做爲 pthread 函數的參數。

線程間的數據交互和數據爭用 (Data Racing)

同一個進程內的多個線程之間可能是免不了要有數據互相來往的,隊列和共享數據是實現多個線程之間的數據交互的經常使用方式,封裝好的隊列使用起來相對來講不容易出錯一些,而共享數據則是最基本的也是較容易出錯的,由於它會產生數據爭用的狀況,即有超過一個線程試圖同時搶佔某個資源,好比對某塊內存進行讀寫等,以下例所示:

清單 11.例子 thread_data_race.cc
static void
inc(int *p ){
 for(int i = 0; i < COUNT; i++){
 (*p)++;
 }
}
void threadDataRacing(void){
 int a = 0;
 thread ta( inc, &a);
 thread tb( inc, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

  

這是簡化了的極端狀況,咱們能夠一眼看出來這是兩個線程在同時對&a 這個內存地址進行寫操做,可是在實際工做中,在代碼的海洋中發現它並不必定容易。從表面看,兩個線程執行完以後,最後的 a 值應該是 COUNT * 2,可是實際上並不是如此,由於簡單如 (*p)++這樣的操做並非一個原子動做,要解決這個問題,對於簡單的基本類型數據如字符、整型、指針等,C++提供了原子模版類 atomic,而對於複雜的對象,則提供了最經常使用的鎖機制,好比互斥類 mutex,門鎖 lock_guard,惟一鎖 unique_lock,條件變量 condition_variable 等。

如今咱們使用原子模版類 atomic 改造上述例子獲得預期結果:

清單 12.例子 thread_atomic.cc
static void
inc(atomic<int> *p ){
 for(int i = 0; i < COUNT; i++){
 (*p)++;
 }
}
void threadDataRacing(void){
 atomic<int> a(0) ;
 thread ta( inc, &a);
 thread tb( inc, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

  

咱們也可使用 lock_guard,lock_guard 是一個範圍鎖,本質是 RAII(Resource Acquire Is Initialization),在構建的時候自動加鎖,在析構的時候自動解鎖,這保證了每一次加鎖都會獲得解鎖。即便是調用函數發生了異常,在清理棧幀的時候也會調用它的析構函數獲得解鎖,從而保證每次加鎖都會解鎖,可是咱們不能手工調用加鎖方法或者解鎖方法來進行更加精細的資源佔用管理,使用 lock_guard 示例以下:

清單 13.例子 thread_lock_guard.cc
static mutex g_mutex;
static void
inc(int *p ){
 for(int i = 0; i < COUNT; i++){
 lock_guard<mutex> _(g_mutex);
 (*p)++;
 }
}
void threadLockGuard(void){
 int a = 0;
 thread ta( inc, &a);
 thread tb( inc, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

  

若是要支持手工加鎖,能夠考慮使用 unique_lock 或者直接使用 mutex。unique_lock 也支持 RAII,它也能夠一次性將多個鎖加鎖;若是使用 mutex 則直接調用 mutex 類的 lock, unlock, trylock 等方法進行更加精細的鎖管理:

清單 14.例子 thread_mutex.cc
static mutex g_mutex;
static void
inc(int *p ){
 thread_local int i; // TLS 變量
 for(; i < COUNT; i++){
 g_mutex.lock();
 (*p)++;
 g_mutex.unlock();
 }
}
void threadMutex(void){
 int a = 0;
 thread ta( inc, &a);
 thread tb( inc, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

  

在上例中,咱們還使用了線程本地存儲 (TLS) 變量,咱們只須要在變量前面聲明它是 thread_local 便可。TLS 變量在線程棧內分配,線程棧只有在線程建立以後才生效,在線程退出的時候銷燬,須要注意不一樣系統的線程棧的大小是不一樣的,若是 TLS 變量佔用空間比較大,須要注意這個問題。TLS 變量通常不能跨線程,其初始化在調用線程第一次使用這個變量時進行,默認初始化爲 0。

對於線程間的事件通知,C++11 提供了條件變量類 condition_variable,可視爲 pthread_cond_t 的封裝,使用條件變量可讓一個線程等待其它線程的通知 (wait,wait_for,wait_until),也能夠給其它線程發送通知 (notify_one,notify_all),條件變量必須和鎖配合使用,在等待時由於有解鎖和從新加鎖,因此,在等待時必須使用能夠手工解鎖和加鎖的鎖,好比 unique_lock,而不能使用 lock_guard,示例以下:

清單 15.例子 thread_cond_var.cc
#include <thread>
#include <iostream>
#include <condition_variable>
using namespace std;
mutex m;
condition_variable cv;
void threadCondVar(void){
# define THREAD_COUNT 10
 thread** t = new thread*[THREAD_COUNT];
 int i;
 for(i = 0; i < THREAD_COUNT; i++){
 t[i] = new thread( [](int index){
 unique_lock<mutex> lck(m);
 cv.wait_for(lck, chrono::hours(1000));
 cout << index << endl;
 }, i );
 this_thread::sleep_for( chrono::milliseconds(50));
 }
 for(i = 0; i < THREAD_COUNT; i++){
 lock_guard<mutex> _(m);
 cv.notify_one();
 }
 for(i = 0; i < THREAD_COUNT; i++){
 t[i]->join();
 delete t[i];
 }
 delete t;
}

  

幾個高級概念

C++11 提供了若干多線程編程的高級概念:promise/future, packaged_task, async,來簡化多線程編程,尤爲是線程之間的數據交互比較簡單的狀況下,讓咱們能夠將注意力更多地放在業務處理上。

promise/future 能夠用來在線程之間進行簡單的數據交互,而不須要考慮鎖的問題,線程 A 將數據保存在一個 promise 變量中,另一個線程 B 能夠經過這個 promise 變量的 get_future() 獲取其值,當線程 A 還沒有在 promise 變量中賦值時,線程 B 也能夠等待這個 promise 變量的賦值:

清單 16.例子 thread_promise_future.cc
promise<string> val;
static void
threadPromiseFuture(){
 thread ta([](){
 future<string> fu = val.get_future();
 cout << "waiting promise->future" << endl;
 cout << fu.get() << endl;
 });
 thread tb([](){
 this_thread::sleep_for( chrono::milliseconds(100) );
 val.set_value("promise is set");
 });
 ta.join();
 tb.join();
}

  

一個 future 變量只能調用一次 get(),若是須要屢次調用 get(),可使用 shared_future,經過 promise/future 還能夠在線程之間傳遞異常。

若是將一個 callable 對象和一個 promise 組合,那就是 packaged_task,它能夠進一步簡化操做:

清單 17.例子 thread_packaged_task.cc
static mutex g_mutex;
static void
threadPackagedTask(){
 auto run = [=](int index){ 
 {
 lock_guard<mutex> _(g_mutex);
 cout << "tasklet " << index << endl;
 }
 this_thread::sleep_for( chrono::seconds(10) );
 return index * 1000;
 };
 packaged_task<int(int)> pt1(run);
 packaged_task<int(int)> pt2(run);
 thread t1([&](){pt1(2);} );
 thread t2([&](){pt2(3);} );
 int f1 = pt1.get_future().get();
 int f2 = pt2.get_future().get();
 cout << "task result=" << f1 << endl;
 cout << "task result=" << f2 << endl;
 t1.join();
 t2.join();
}

  

咱們還能夠試圖將一個 packaged_task 和一個線程組合,那就是 async() 函數。使用 async() 函數啓動執行代碼,返回一個 future 對象來保存代碼返回值,不須要咱們顯式地建立和銷燬線程等,而是由 C++11 庫的實現決定什麼時候建立和銷燬線程,以及建立幾個線程等,示例以下:

清單 18.例子 thread_async.cc
static long
do_sum(vector<long> *arr, size_t start, size_t count){
 static mutex _m;
 long sum = 0;
 for(size_t i = 0; i < count; i++){
 sum += (*arr)[start + i];
 }
 {
 lock_guard<mutex> _(_m);
 cout << "thread " << this_thread::get_id() 
 << ", count=" << count
 << ", sum=" << sum << endl;
 }
 return sum;
}
static void
threadAsync(){
# define COUNT 1000000
 vector<long> data(COUNT);
 for(size_t i = 0; i < COUNT; i++){
 data[i] = random() & 0xff;
 }
 //
 vector< future<long> > result;
 size_t ptc = thread::hardware_concurrency() * 2;
 for(size_t batch = 0; batch < ptc; batch++){
 size_t batch_each = COUNT / ptc;
 if (batch == ptc - 1){
 batch_each = COUNT - (COUNT / ptc * batch);
 }
 result.push_back(async(do_sum, &data, batch * batch_each, batch_each));
 }
 long total = 0;
 for(size_t batch = 0; batch < ptc; batch++){
 total += result[batch].get();
 }
 cout << "total=" << total << endl;
}

  

若是是在多核或者多 CPU 的環境上面運行上述例子,仔細觀察輸出結果,可能會發現有些線程 ID 是重複的,這說明重複使用了線程,也就是說,經過使用 async() 還可達到一些線程池的功能。

幾個須要注意的地方

thread 同時也是棉線、毛線、絲線等意思,我想你們都能體會面對一團亂麻不知從何處查找頭緒的感覺,不要忘了,線程不是靜態的,它是不斷變化的,請想像一下面對一團會動態變化的亂麻的情景。因此,使用多線程技術的首要準則是咱們本身要十分清楚咱們的線程在哪裏?線頭(線程入口和出口)在哪裏?先安排好線程的運行,注意不一樣線程的交叉點(訪問或者修改同一個資源,包括內存、I/O 設備等),儘可能減小線程的交叉點,要知道幾條線堆在一塊兒最怕的是互相打結。

當咱們的確須要不一樣線程訪問一個共同的資源時,通常都須要進行加鎖保護,不然極可能會出現數據不一致的狀況,從而出現各類時現時不現的莫名其妙的問題,加鎖保護時有幾個問題須要特別注意:一是一個線程內連續屢次調用非遞歸鎖 (non-recursive lock) 的加鎖動做,這極可能會致使異常;二是加鎖的粒度;三是出現死鎖 (deadlock),多個線程互相等待對方釋放鎖致使這些線程所有處於罷工狀態。

第一個問題只要根據場景調用合適的鎖便可,當咱們可能會在某個線程內重複調用某個鎖的加鎖動做時,咱們應該使用遞歸鎖 (recursive lock),在 C++11 中,能夠根據須要來使用 recursive_mutex,或者 recursive_timed_mutex。

第二個問題,即鎖的粒度,原則上應該是粒度越小越好,那意味着阻塞的時間越少,效率更高,好比一個數據庫,給一個數據行 (data row) 加鎖固然比給一個表 (table) 加鎖要高效,可是同時複雜度也會越大,越容易出錯,好比死鎖等。

對於第三個問題咱們須要先看下出現死鎖的條件:

  1. 資源互斥,某個資源在某一時刻只能被一個線程持有 (hold);
  2. 吃着碗裏的還看着鍋裏的,持有一個以上的互斥資源的線程在等待被其它進程持有的互斥資源;
  3. 不可搶佔,只有在某互斥資源的持有線程釋放了該資源以後,其它線程才能去持有該資源;
  4. 環形等待,有兩個或者兩個以上的線程各自持有某些互斥資源,而且各自在等待其它線程所持有的互斥資源。

咱們只要不讓上述四個條件中的任意一個不成當即可。在設計的時候,很是有必要先分析一下會否出現知足四個條件的狀況,特別是檢查有無試圖去同時保持兩個或者兩個以上的鎖,當咱們發現試圖去同時保持兩個或者兩個以上的鎖的時候,就須要特別警戒了。下面咱們來看一個簡化了的死鎖的例子:

清單 19.例子 thread_deadlock.cc
static mutex g_mutex1, g_mutex2;
static void
inc1(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex1.lock();
 (*p)++;
 g_mutex2.lock();
 // do something.
 g_mutex2.unlock();
 g_mutex1.unlock();
 }
}
static void
inc2(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex2.lock();
 g_mutex1.lock();
 (*p)++;
 g_mutex1.unlock();
 // do other thing.
 g_mutex2.unlock();
 }
}
void threadMutex(void){
 int a = 0;
 thread ta( inc1, &a);
 thread tb( inc2, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

  

在這個例子中,g_mutex1 和 g_mutex2 都是互斥的資源,任意時刻都只有一個線程能夠持有(加鎖成功),並且只有持有線程調用 unlock 釋放鎖資源的時候其它線程才能去持有,知足條件 1 和 3,線程 ta 持有了 g_mutex1 以後,在釋放 g_mutex1 以前試圖去持有 g_mutex2,而線程 tb 持有了 g_mutex2 以後,在釋放 g_mutex2 以前試圖去持有 g_mutex1,知足條件 2 和 4,這種狀況之下,當線程 ta 試圖去持有 g_mutex2 的時候,若是 tb 正持有 g_mutex2 而試圖去持有 g_mutex1 時就發生了死鎖。在有些環境下,可能要屢次運行這個例子纔出現死鎖,實際工做中這種偶現特性讓查找問題變難。要破除這個死鎖,咱們只要按以下代碼所示破除條件 3 和 4 便可:

清單 20.例子 thread_break_deadlock.cc
static mutex g_mutex1, g_mutex2;
static voi
inc1(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex1.lock();
 (*p)++;
 g_mutex1.unlock();
 g_mutex2.lock();
 // do something.
 g_mutex2.unlock();
 }
}
static void
inc2(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex2.lock();
 // do other thing.
 g_mutex2.unlock();
 g_mutex1.lock();
 (*p)++;
 g_mutex1.unlock();
 }
}
void threadMutex(void){
 int a = 0;
 thread ta( inc1, &a);
 thread tb( inc2, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

  

在一些複雜的並行編程場景,如何避免死鎖是一個很重要的話題,在實踐中,當咱們看到有兩個鎖嵌套加鎖的時候就要特別提升警戒,它極有可能知足了條件 2 或者 4。

結束語

上述例子在 CentOS 6.5,g++ 4.8.1/g++4.9 以及 clang 3.5 下面編譯經過,在編譯的時候,請注意下述幾點:

  • 設置 -std=c++11;
  • 連接的時候設置 -pthread;
  • 使用 g++編譯連接時設置 -Wl,--no-as-needed 傳給連接器,有些版本的 g++須要這個設置;
  • 設置宏定義 -D_REENTRANT,有些庫函數是依賴於這個宏定義來肯定是否使用多線程版本的。

具體能夠參考本文所附的代碼中的 Makefile 文件。

在用 gdb 調試多線程程序的時候,能夠輸入命令 info threads 查看當前的線程列表,經過命令 thread n 切換到第 n 個線程的上下文,這裏的 n 是 info threads 命令輸出的線程索引數字,例如,若是要切換到第 2 個線程的上下文,則輸入命令 thread 2。

聰明地使用多線程,擁抱多線程吧。

 

相關主題

相關文章
相關標籤/搜索