線程就是,在同一程序同一時間內容許執行不一樣函數的離散處理隊列。 這使得一個長時間去進行某種特殊運算的函數在執行時不阻礙其餘的函數變得十分重要。 線程實際上容許同時執行兩種函數,而這兩個函數沒必要相互等待。php
一旦一個應用程序啓動,它僅包含一個默認線程。 此線程執行 main()
函數。 在 main()
中被調用的函數則按這個線程的上下文順序地執行。 這樣的程序稱爲單線程程序。html
反之,那些建立新的線程的程序就是多線程程序。 他們不只能夠在同一時間執行多個函數,並且這在現在多核盛行的時代顯得尤其重要。 既然多核容許同時執行多個函數,這就使得對開發人員相應地使用這種處理能力提出了要求。 然而線程一直被用來當併發地執行多個函數,開發人員如今不得不仔細地構建應用來支持這種併發。 多線程編程知識也所以在多核系統時代變得愈來愈重要。ios
本章將介紹C++ Boost庫 Boost.Thread,它能夠開發獨立於平臺的多線程應用程序。編程
在這個庫最重要的一個類就是 boost::thread
,它是在 boost/thread.hpp
裏定義的,用來建立一個新線程。下面的示例來講明如何運用它。數組
#include <boost/thread.hpp> #include <iostream> void wait(int seconds) { boost::this_thread::sleep(boost::posix_time::seconds(seconds)); } void thread() { for (int i = 0; i < 5; ++i) { wait(1); std::cout << i << std::endl; } } int main() { boost::thread t(thread); t.join(); }
新建線程裏執行的那個函數的名稱被傳遞到 boost::thread
的構造函數。 一旦上述示例中的變量t 被建立,該 thread()
函數就在其所在線程中被當即執行。 同時在 main()
裏也併發地執行該 thread()
。安全
爲了防止程序終止,就須要對新建線程調用 join()
方法。 join()
方法是一個阻塞調用:它能夠暫停當前線程,直到調用 join()
的線程運行結束。 這就使得main()
函數一直會等待到 thread()
運行結束。多線程
正如在上面的例子中看到,一個特定的線程能夠經過諸如 t 的變量訪問,經過這個變量等待着它的使用 join()
方法終止。 可是,即便 t 越界或者析構了,該線程也將繼續執行。 一個線程老是在一開始就綁定到一個類型爲 boost::thread
的變量,可是一旦建立,就不在取決於它。 甚至還存在着一個叫 detach()
的方法,容許類型爲boost::thread
的變量從它對應的線程裏分離。 固然了,像 join()
的方法以後也就不能被調用,由於這個變量再也不是一個有效的線程。併發
任何一個函數內能夠作的事情也能夠在一個線程內完成。 歸根結底,一個線程只不過是一個函數,除了它是同時執行的。 在上述例子中,使用一個循環把5個數字寫入標準輸出流。 爲了減緩輸出,每個循環中調用wait()
函數讓執行延遲了一秒。 wait()
能夠調用一個名爲sleep()
的函數,這個函數也來自於 Boost.Thread,位於 boost::this_thread
名空間內。dom
sleep()
要麼在預計的一段時間或一個特定的時間點後時才讓線程繼續執行。 經過傳遞一個類型爲boost::posix_time::seconds
的對象,在這個例子裏咱們指定了一段時間。boost::posix_time::seconds
來自於 Boost.DateTime 庫,它被 Boost.Thread 用來管理和處理時間的數據。函數
雖然前面的例子說明了如何等待一個不一樣的線程,但下面的例子演示瞭如何經過所謂的中斷點讓一個線程中斷。
#include <boost/thread.hpp> #include <iostream> void wait(int seconds) { boost::this_thread::sleep(boost::posix_time::seconds(seconds)); } void thread() { try { for (int i = 0; i < 5; ++i) { wait(1); std::cout << i << std::endl; } } catch (boost::thread_interrupted&) { } } int main() { boost::thread t(thread); wait(3); t.interrupt(); t.join(); }
在一個線程對象上調用 interrupt()
會中斷相應的線程。 在這方面,中斷意味着一個類型爲boost::thread_interrupted
的異常,它會在這個線程中拋出。 而後這隻有在線程達到中斷點時纔會發生。
若是給定的線程不包含任何中斷點,簡單調用 interrupt()
就不會起做用。 每當一個線程中斷點,它就會檢查interrupt()
是否被調用過。 只有被調用過了, boost::thread_interrupted
異常纔會相應地拋出。
Boost.Thread定義了一系列的中斷點,例如 sleep()
函數。 因爲 sleep()
在這個例子裏被調用了五次,該線程就檢查了五次它是否應該被中斷。 然而 sleep()
之間的調用,卻不能使線程中斷。
一旦該程序被執行,它只會打印三個數字到標準輸出流。 這是因爲在main裏3秒後調用 interrupt()
方法。 所以,相應的線程被中斷,並拋出一個boost::thread_interrupted
異常。 這個異常在線程內也被正確地捕獲, catch
處理雖然是空的。 因爲 thread()
函數在處理程序後返回,線程也被終止。 這反過來也將終止整個程序,由於main()
等待該線程使用join()終止該線程。
Boost.Thread定義包括上述 sleep()
函數十個中斷。 有了這些中斷點,線程能夠很容易及時中斷。 然而,他們並不老是最佳的選擇,由於中斷點必須事前讀入以檢查boost::thread_interrupted
異常。
爲了提供一個對 Boost.Thread 裏提供的多種函數的總體概述,下面的例子將會再介紹兩個。
#include <boost/thread.hpp> #include <iostream> int main() { std::cout << boost::this_thread::get_id() << std::endl; std::cout << boost::thread::hardware_concurrency() << std::endl; }
使用 boost::this_thread
命名空間,能提供獨立的函數應用於當前線程,好比前面出現的 sleep()
。 另外一個是 get_id()
:它會返回一個當前線程的ID號。 它也是由boost::thread
提供的。
boost::thread
類提供了一個靜態方法 hardware_concurrency()
,它可以返回基於CPU數目或者CPU內核數目的刻在同時在物理機器上運行的線程數。 在經常使用的雙核機器上調用這個方法,返回值爲2。 這樣的話就能夠肯定在一個多核程序能夠同時運行的理論最大線程數。
雖然多線程的使用能夠提升應用程序的性能,但也增長了複雜性。 若是使用線程在同一時間執行幾個函數,訪問共享資源時必須相應地同步。 一旦應用達到了必定規模,這涉及至關一些工做。 本段介紹了Boost.Thread提供同步線程的類。
#include <boost/thread.hpp> #include <iostream> void wait(int seconds) { boost::this_thread::sleep(boost::posix_time::seconds(seconds)); } boost::mutex mutex; void thread() { for (int i = 0; i < 5; ++i) { wait(1); mutex.lock(); std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; mutex.unlock(); } } int main() { boost::thread t1(thread); boost::thread t2(thread); t1.join(); t2.join(); }
多線程程序使用所謂的互斥對象來同步。 Boost.Thread提供多個的互斥類,boost::mutex
是最簡單的一個。 互斥的基本原則是當一個特定的線程擁有資源的時候防止其餘線程奪取其全部權。 一旦釋放,其餘的線程能夠取得全部權。 這將致使線程等待至另外一個線程完成處理一些操做,從而相應地釋放互斥對象的全部權。
上面的示例使用一個類型爲 boost::mutex
的 mutex 全局互斥對象。thread()
函數獲取此對象的全部權纔在 for
循環內使用lock()
方法寫入到標準輸出流的。 一旦信息被寫入,使用 unlock()
方法釋放全部權。
main()
建立兩個線程,同時執行 thread ()
函數。 利用 for
循環,每一個線程數到5,用一個迭代器寫一條消息到標準輸出流。 不幸的是,標準輸出流是一個全局性的被全部線程共享的對象。 該標準不提供任何保證std::cout 能夠安全地從多個線程訪問。 所以,訪問標準輸出流必須同步:在任什麼時候候,只有一個線程能夠訪問 std::cout。
因爲兩個線程試圖在寫入標準輸出流前得到互斥體,實際上只能保證一次只有一個線程訪問 std::cout。 無論哪一個線程成功調用 lock()
方法,其餘全部線程必須等待,直到 unlock()
被調用。
獲取和釋放互斥體是一個典型的模式,是由Boost.Thread經過不一樣的數據類型支持。 例如,不直接地調用 lock()
和 unlock()
,使用 boost::lock_guard
類也是能夠的。
#include <boost/thread.hpp> #include <iostream> void wait(int seconds) { boost::this_thread::sleep(boost::posix_time::seconds(seconds)); } boost::mutex mutex; void thread() { for (int i = 0; i < 5; ++i) { wait(1); boost::lock_guard<boost::mutex> lock(mutex); std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; } } int main() { boost::thread t1(thread); boost::thread t2(thread); t1.join(); t2.join(); }
boost::lock_guard
在其內部構造和析構函數分別自動調用 lock()
和 unlock()
。 訪問共享資源是須要同步的,由於它顯示地被兩個方法調用。boost::lock_guard
類是另外一個出如今 第 2 章 智能指針的RAII用語。
除了boost::mutex
和 boost::lock_guard
以外,Boost.Thread也提供其餘的類支持各類同步。 其中一個重要的就是boost::unique_lock
,相比較 boost::lock_guard
而言,它提供許多有用的方法。
#include <boost/thread.hpp> #include <iostream> void wait(int seconds) { boost::this_thread::sleep(boost::posix_time::seconds(seconds)); } boost::timed_mutex mutex; void thread() { for (int i = 0; i < 5; ++i) { wait(1); boost::unique_lock<boost::timed_mutex> lock(mutex, boost::try_to_lock); if (!lock.owns_lock()) lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(1)); std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; boost::timed_mutex *m = lock.release(); m->unlock(); } } int main() { boost::thread t1(thread); boost::thread t2(thread); t1.join(); t2.join(); }
上面的例子用不一樣的方法來演示 boost::unique_lock
的功能。 固然了,這些功能的用法對給定的情景不必定適用;boost::lock_guard
在上個例子的用法仍是挺合理的。 這個例子就是爲了演示boost::unique_lock
提供的功能。
boost::unique_lock
經過多個構造函數來提供不一樣的方式得到互斥體。 這個指望得到互斥體的函數簡單地調用了lock()
方法,一直等到得到這個互斥體。 因此它的行爲跟 boost::lock_guard
的那個是同樣的。
若是第二個參數傳入一個 boost::try_to_lock
類型的值,對應的構造函數就會調用 try_lock()
方法。 這個方法返回 bool
型的值:若是可以得到互斥體則返回true
,不然返回false
。 相比 lock()
函數,try_lock()
會當即返回,並且在得到互斥體以前不會被阻塞。
上面的程序向 boost::unique_lock
的構造函數的第二個參數傳入boost::try_to_lock。 而後經過owns_lock()
能夠檢查是否可得到互斥體。 若是不能, owns_lock()
返回 false
。 這也用到boost::unique_lock
提供的另一個函數: timed_lock()
等待必定的時間以得到互斥體。 給定的程序等待長達1秒,應較足夠的時間來獲取更多的互斥。
其實這個例子顯示了三個方法獲取一個互斥體:lock()
會一直等待,直到得到一個互斥體。 try_lock()
則不會等待,但若是它只會在互斥體可用的時候才能得到,不然返回 false
。 最後,timed_lock()
試圖得到在必定的時間內獲取互斥體。 和try_lock()
同樣,返回bool
類型的值意味着成功是否。
雖然 boost::mutex
提供了 lock()
和try_lock()
兩個方法,可是 boost::timed_mutex
只支持timed_lock()
,這就是上面示例那麼使用的緣由。 若是不用 timed_lock()
的話,也能夠像之前的例子那樣用 boost::mutex
。
就像 boost::lock_guard
同樣, boost::unique_lock
的析構函數也會相應地釋放互斥量。此外,能夠手動地用 unlock()
釋放互斥量。也能夠像上面的例子那樣,經過調用release()
解除boost::unique_lock
和互斥量之間的關聯。然而在這種狀況下,必須顯式地調用unlock()
方法來釋放互斥量,由於 boost::unique_lock
的析構函數再也不作這件事情。
boost::unique_lock
這個所謂的獨佔鎖意味着一個互斥量同時只能被一個線程獲取。 其餘線程必須等待,直到互斥體再次被釋放。 除了獨佔鎖,還有非獨佔鎖。 Boost.Thread裏有個boost::shared_lock
的類提供了非獨佔鎖。 正以下面的例子,這個類必須和boost::shared_mutex
型的互斥量一塊兒使用。
#include <boost/thread.hpp> #include <iostream> #include <vector> #include <cstdlib> #include <ctime> void wait(int seconds) { boost::this_thread::sleep(boost::posix_time::seconds(seconds)); } boost::shared_mutex mutex; std::vector<int> random_numbers; void fill() { std::srand(static_cast<unsigned int>(std::time(0))); for (int i = 0; i < 3; ++i) { boost::unique_lock<boost::shared_mutex> lock(mutex); random_numbers.push_back(std::rand()); lock.unlock(); wait(1); } } void print() { for (int i = 0; i < 3; ++i) { wait(1); boost::shared_lock<boost::shared_mutex> lock(mutex); std::cout << random_numbers.back() << std::endl; } } int sum = 0; void count() { for (int i = 0; i < 3; ++i) { wait(1); boost::shared_lock<boost::shared_mutex> lock(mutex); sum += random_numbers.back(); } } int main() { boost::thread t1(fill); boost::thread t2(print); boost::thread t3(count); t1.join(); t2.join(); t3.join(); std::cout << "Sum: " << sum << std::endl; }
boost::shared_lock
類型的非獨佔鎖能夠在線程只對某個資源讀訪問的狀況下使用。 一個線程修改的資源須要寫訪問,所以須要一個獨佔鎖。 這樣作也很明顯:只須要讀訪問的線程不須要知道同一時間其餘線程是否訪問。 所以非獨佔鎖能夠共享一個互斥體。
在給定的例子, print()
和 count()
均可以只讀訪問random_numbers 。 雖然 print()
函數把 random_numbers 裏的最後一個數寫到標準輸出,count()
函數把它統計到sum 變量。 因爲沒有函數修改 random_numbers,全部的均可以在同一時間用 boost::shared_lock
類型的非獨佔鎖訪問它。
在 fill()
函數裏,須要用一個 boost::unique_lock
類型的非獨佔鎖,由於它插入了一個新的隨機數到random_numbers。 在 unlock()
顯式地調用 unlock()
來釋放互斥量以後, fill()
等待了一秒。 相比於以前的那個樣子, 在for
循環的尾部調用 wait()
以保證容器裏至少存在一個隨機數,能夠被print()
或者count()
訪問。 對應地,這兩個函數在 for
循環的開始調用了wait()
。
考慮到在不一樣的地方每一個單獨地調用 wait()
,一個潛在的問題變得很明顯:函數調用的順序直接受CPU執行每一個獨立進程的順序決定。 利用所謂的條件變量,能夠同步哪些獨立的線程,使數組的每一個元素都被不一樣的線程當即添加到random_numbers 。
#include <boost/thread.hpp> #include <iostream> #include <vector> #include <cstdlib> #include <ctime> boost::mutex mutex; boost::condition_variable_any cond; std::vector<int> random_numbers; void fill() { std::srand(static_cast<unsigned int>(std::time(0))); for (int i = 0; i < 3; ++i) { boost::unique_lock<boost::mutex> lock(mutex); random_numbers.push_back(std::rand()); cond.notify_all(); cond.wait(mutex); } } void print() { std::size_t next_size = 1; for (int i = 0; i < 3; ++i) { boost::unique_lock<boost::mutex> lock(mutex); while (random_numbers.size() != next_size) cond.wait(mutex); std::cout << random_numbers.back() << std::endl; ++next_size; cond.notify_all(); } } int main() { boost::thread t1(fill); boost::thread t2(print); t1.join(); t2.join(); }
這個例子的程序刪除了 wait()
和 count()
。線程不用在每一個循環迭代中等待一秒,而是儘量快地執行。此外,沒有計算總額;數字徹底寫入標準輸出流。
爲確保正確地處理隨機數,須要一個容許檢查多個線程之間特定條件的條件變量來同步不每一個獨立的線程。
正如上面所說, fill()
函數用在每一個迭代產生一個隨機數,而後放在 random_numbers 容器中。 爲了防止其餘線程同時訪問這個容器,就要相應得使用一個排它鎖。 不是等待一秒,實際上這個例子卻用了一個條件變量。 調用notify_all()
會喚醒每一個哪些正在分別經過調用wait()
等待此通知的線程。
經過查看 print()
函數裏的 for
循環,能夠看到相同的條件變量被wait()
函數調用了。 若是這個線程被 notify_all()
喚醒,它就會試圖這個互斥量,但只有在fill()
函數徹底釋放以後才能成功。
這裏的竅門就是調用 wait()
會釋放相應的被參數傳入的互斥量。 在調用 notify_all()
後, fill()
函數會經過 wait()
相應地釋放線程。 而後它會阻止和等待其餘的線程調用 notify_all()
,一旦隨機數已寫入標準輸出流,這就會在print()
裏發生。
注意到在 print()
函數裏調用 wait()
事實上發生在一個單獨while
循環裏。 這樣作的目的是爲了處理在 print()
函數裏第一次調用wait()
函數以前隨機數已經放到容器裏。 經過比較 random_numbers裏元素的數目與預期值,發現這成功地處理了把隨機數寫入到標準輸出流。
線程本地存儲(TLS)是一個只能由一個線程訪問的專門的存儲區域。 TLS的變量能夠被看做是一個只對某個特定線程而非整個程序可見的全局變量。 下面的例子顯示了這些變量的好處。
#include <boost/thread.hpp> #include <iostream> #include <cstdlib> #include <ctime> void init_number_generator() { static bool done = false; if (!done) { done = true; std::srand(static_cast<unsigned int>(std::time(0))); } } boost::mutex mutex; void random_number_generator() { init_number_generator(); int i = std::rand(); boost::lock_guard<boost::mutex> lock(mutex); std::cout << i << std::endl; } int main() { boost::thread t[3]; for (int i = 0; i < 3; ++i) t[i] = boost::thread(random_number_generator); for (int i = 0; i < 3; ++i) t[i].join(); }
該示例建立三個線程,每一個線程寫一個隨機數到標準輸出流。 random_number_generator()
函數將會利用在C++標準裏定義的std::rand()
函數建立一個隨機數。 可是用於 std::rand()
的隨機數產生器必須先用 std::srand()
正確地初始化。 若是沒作,程序始終打印同一個隨機數。
隨機數產生器,經過 std::time()
返回當前時間, 在 init_number_generator()
函數裏完成初始化。 因爲這個值每次都不一樣,能夠保證產生器老是用不一樣的值初始化,從而產生不一樣的隨機數。 由於產生器只要初始化一次,init_number_generator()
用了一個靜態變量 done 做爲條件量。
若是程序運行了屢次,寫入的三分之二的隨機數顯然就會相同。 事實上這個程序有個缺陷:std::rand()
所用的產生器必須被各個線程初始化。 所以init_number_generator()
的實現其實是不對的,由於它只調用了一次 std::srand()
。使用TLS,這一缺陷能夠獲得糾正。
#include <boost/thread.hpp> #include <iostream> #include <cstdlib> #include <ctime> void init_number_generator() { static boost::thread_specific_ptr<bool> tls; if (!tls.get()) tls.reset(new bool(false)); if (!*tls) { *tls = true; std::srand(static_cast<unsigned int>(std::time(0))); } } boost::mutex mutex; void random_number_generator() { init_number_generator(); int i = std::rand(); boost::lock_guard<boost::mutex> lock(mutex); std::cout << i << std::endl; } int main() { boost::thread t[3]; for (int i = 0; i < 3; ++i) t[i] = boost::thread(random_number_generator); for (int i = 0; i < 3; ++i) t[i].join(); }
用一個TLS變量 tls 代替靜態變量 done,是基於用 bool
類型實例化的boost::thread_specific_ptr
。 原則上, tls 工做起來就像done :它能夠做爲一個條件指明隨機數發生器是否被初始化。 可是關鍵的區別,就是 tls 存儲的值只對相應的線程可見和可用。
一旦一個 boost::thread_specific_ptr
型的變量被建立,它能夠相應地設置。 不過,它指望獲得一個bool
型變量的地址,而非它自己。使用 reset()
方法,能夠把它的地址保存到tls 裏面。 在給出的例子中,會動態地分配一個 bool
型的變量,由 new
返回它的地址,並保存到 tls 裏。 爲了不每次調用 init_number_generator()
都設置tls ,它會經過 get()
函數檢查是否已經保存了一個地址。
因爲 boost::thread_specific_ptr
保存了一個地址,它的行爲就像一個普通的指針。 所以,operator*()
和operator->()
都被被重載以方便使用。 這個例子用 *tls
檢查這個條件當前是 true
仍是 false
。 再根據當前的條件,隨機數生成器決定是否初始化。
正如所見, boost::thread_specific_ptr
容許爲當前進程保存一個對象的地址,而後只容許當前進程得到這個地址。 然而,當一個線程已經成功保存這個地址,其餘的線程就會可能就失敗。
若是程序正在執行時,它可能會使人感到奇怪:儘管有了TLS的變量,生成的隨機數仍然相等。 這是由於,三個線程在同一時間被建立,從而形成隨機數生成器在同一時間初始化。 若是該程序執行了幾回,隨機數就會改變,這就代表生成器初始化正確了。
You can buy solutions to all exercises in this book as a ZIP file.
重構下面的程序用兩個線程來計算總和。因爲如今許多處理器有兩個內核,應利用線程減小執行時間。
#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/cstdint.hpp> #include <iostream> int main() { boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time(); boost::uint64_t sum = 0; for (int i = 0; i < 1000000000; ++i) sum += i; boost::posix_time::ptime end = boost::posix_time::microsec_clock::local_time(); std::cout << end - start << std::endl; std::cout << sum << std::endl; }
經過利用處理器儘量同時執行多的線程,把例1通常化。 例如,若是處理器有四個內核,就應該利用四個線程。
修改下面的程序,在 main()
中本身的線程中執行 thread()
。 程序應該可以計算總和,而後把結果輸入到標準輸出兩次。 但能夠更改 calculate()
,print()
和thread()
的實現,每一個函數的接口仍需保持一致。 也就是說每一個函數應該仍然沒有任何參數,也不須要返回一個值。
#include <iostream> int sum = 0; void calculate() { for (int i = 0; i < 1000; ++i) sum += i; } void print() { std::cout << sum << std::endl; } void thread() { calculate(); print(); } int main() { thread(); }