C++11多線程編程系列-相關庫函數使用

一、C++11 新標準中引入了多個頭文件來支持多線程編程,分別是<atomic> ,<thread>,<mutex>,<condition_variable><future>

<atomic>:該頭文主要聲明瞭兩個類, std::atomic 和 std::atomic_flag,另外還聲明瞭一套 C 風格的原子類型和與 C 兼容的原子操做的函數。
<thread>:該頭文件主要聲明瞭 std::thread 類,另外 std::this_thread 命名空間也在該頭文件中。
<mutex>:該頭文件主要聲明瞭與互斥量(mutex)相關的類,包括 std::mutex 系列類,std::lock_guard, std::unique_lock, 以及其餘的類型和函數
<condition_variable>:該頭文件主要聲明瞭與條件變量相關的類,包括 std::condition_variable 和 std::condition_variable_any
<future>:該頭文件主要聲明瞭 std::promise, std::package_task 兩個 Provider 類,以及 std::future 和 std::shared_future 兩個 Future 類,另外還有一些與之相關的類型和函數,std::async() 函數就聲明在此頭文件中。html

1、 std::thread

a. C++ 11中建立線程很是簡單,使用std::thread類就能夠,thread類定義於thread頭文件,構造thread對象時傳入一個可調用對象做爲參數(若是可調用對象有參數,把參數同時傳入),這樣構造完成後,新的線程立刻被建立,同時執行該可調用對象ios

b. 用std::thread默認的構造函數構造的對象不關聯任何線程;判斷一個thread對象是否關聯某個線程,使用joinable()接口,若是返回true,代表該對象關聯着某個線程(即便該線程已經執行結束);算法

c. "joinable"的對象析構前,必須調用join()接口等待線程結束,或者調用detach()接口解除與線程的關聯,不然會拋異常編程

d. 正在執行的線程從關聯的對象detach後會自主執行直至結束,對應的對象變成不關聯任何線程的對象,joinable()將返回false;promise

e. std::thread沒有拷貝構造函數和拷貝賦值操做符,所以不支持複製操做(可是能夠move),也就是說,沒有兩個 std::thread對象會表示同一執行線程;安全

f. 容易知道,以下幾種狀況下,std::thread對象是不關聯任何線程的(對這種對象調用join或detach接口會拋異常):數據結構

默認構造的thread對象;多線程

被移動後的thread對象;併發

detach 或 join 後的thread對象;
g. 線程函數不只支持普通函數,還能夠是類的成員函數lambda表達式app

h. 線程不像進程,一個進程中的線程之間是沒有父子之分的,都是平級關係。即線程都是同樣的, 退出了一個不會影響另一個。可是所謂的」主線程」main,其入口代碼是相似這樣的方式調用main的:exit(main(…))。main執行完以後, 會調用exit()。exit() 會讓整個進程over終止,那全部線程天然都會退出。

即:若是進程中的任一線程調用了exit,_Exit或者_exit,那麼整個進程就會終止。

i. move操做是將一個進程轉移給另外一個進程,注意進程只能被轉移不能被複制。也能夠用swap交換兩個線程。

 這個程序建立了兩個線程,分別對變量num進行了10000次++操做,因爲兩個線程同時運行,++num也沒有加鎖保護,因此最後的輸出結果在10000到20000之間,有必定隨機性,也證實了++num不是原子操做

2、std::mutex   (輕鬆實現互斥)

常作多線程編程的人必定對mutex(互斥)很是熟悉,C++ 11固然也支持mutex,經過mutex能夠方便的對臨界區域加鎖,std::mutex類定義於mutex頭文件,是用於保護共享數據避免從多個線程同時訪問的同步原語。它提供了lock,try_lock,unlock等幾個接口,功能以下:

  1. 調用方線程從成功調用lock()或try_lock()開始,到unlock()爲止佔有mutex對象;
  2. 線程佔有mutex時,全部其餘線程若試圖要求mutex的全部權,則將阻塞(對於 lock 的調用)或收到false返回值(對於 try_lock )
  3. 調用方線程在調用 lock 或 try_lock 前必須不佔有mutex。
  4. mutex和thread同樣,不可複製(拷貝構造函數和拷貝賦值操做符都被刪除),並且,mutex也不可移動

用mutex改寫上面的例子,達到兩個線程不會同時++num的目的,改寫以下:

 

 

 通過mutex對++語句的保護,使同一時刻,只可能有一個線程對num變量進行++操做,所以,這段程序的輸出必然是20000。

注意:

a.操做系統提供mutex能夠設置屬性,C++11根據mutext的屬性提供四種互斥量,分別是

std::mutex,最經常使用,廣泛的互斥量(默認屬性), 
std::recursive_mutex ,容許同一線程使用recursive_mutext屢次加鎖,而後使用相同次數的解鎖操做解鎖。mutex屢次加鎖會形成死鎖
std::timed_mutex,在mutex上增長了時間的屬性。增長了兩個成員函數try_lock_for(),try_lock_until(),分別接收一個時間範圍再給定的時間內若是互斥量被鎖主了,線程阻塞,超過期間,返回false
std::recursive_timed_mutex,增長遞歸和時間屬性

b.mutex成員函數加鎖解鎖

lock(),互斥量加鎖,若是互斥量已被加鎖,線程阻塞
bool try_lock(),嘗試加鎖,若是互斥量未被加鎖,則執行加鎖操做,返回true;若是互斥量已被加鎖,返回false,線程不阻塞
void unlock(),解鎖互斥量
c. mutex RAII式的加鎖解鎖

std::lock_guard,管理mutex的類。對象構建時傳入mutex,會自動對mutex加入,直到離開類的做用域,析構時完成解鎖。RAII式的棧對象能保證在異常情形下mutex能夠在lock_guard對象析構被解鎖
std::unique_lock 與 lock_guard功能相似,可是比lock_guard的功能更強大。好比std::unique_lock維護了互斥量的狀態,可經過bool owns_lock()訪問,當locked時返回true,不然返回false

3、std::lock_guard   (有做用域的mutex,讓程序更穩定,防止死鎖)

很容易想到,mutex的lock和unlock必須成對調用,lock以後忘記調用unlock將是很是嚴重的錯誤,再次lock時會形成死鎖(其餘線程就永遠沒法獲得鎖)。有時候一段程序中會有各類出口,如return,continue,break等等語句,在每一個出口前記得unlock已經加鎖的mutex是有必定負擔的,而假如程序段中有拋異常的狀況,就更爲隱蔽棘手,C++ 11提供了更好的解決方案,RAII。
類模板std::lock_guard是mutex封裝器,經過便利的RAII機制在其做用域內佔有mutex。

建立lock_guard對象時,它試圖接收給定mutex的全部權。當程序流程離開建立lock_guard對象的做用域時,lock_guard對象被自動銷燬並釋放mutex,lock_guard類也是不可複製的。

通常,須要加鎖的代碼段,咱們用{}括起來造成一個做用域,括號的開端建立lock_guard對象,把mutex對象做爲參數傳入lock_guard的構造函數便可,好比上面的例子加鎖的部分,咱們能夠改寫以下:

 

 

 進入做用域,臨時對象guard建立,獲取mutex控制權(構造函數裏調用了mutex的lock接口),離開做用域,臨時對象guard銷燬,釋放了mutex(析構函數裏調用了unlock接口),這是對mutex的更爲安全的操做方式(對異常致使的執行路徑改變也有效),你們在實踐中應該多多使用;

4、std::unique_guard 

std::lock_guard限制得太死了,只有構造和析構函數,無法經過它的成員函數加鎖和解鎖。爲此,C++11提供了靈活的std:unique_lock模板類。std::unique_lock提供lock和unlock函數,所以能夠在適當的時候加解鎖。這樣能夠下降鎖的粒度。默認狀況下,std::unique_lock的構造函數會對mutex進行加鎖,在析構的時候會對mutex進行解鎖

#include<mutex>
std::unique_lock<std::mutex> ul(g_mutex);//構造函數進行上鎖
......
ul.unlock();//解鎖,下降鎖的粒度
......
ul.lock();
......
//析構函數會進行解鎖
std::unique_lock<std::mutex> ul1(m_mutex, std::defer_lock); //延遲上鎖

  std::unique_lock<std::mutex> ul1(m_mutex, std::adopt_lock);//已經上鎖

延遲上鎖是指在構造函數裏面不須要給它上鎖已經上鎖是表示在構造以前已經上鎖了。  

std::lock_guard也是支持std::adopt_lock的,但不支持std::defer_lock,估計是由於std::lock_guard內部變量記錄鎖的狀態,它只知道在構造函數加鎖(或者由adopt_lock指明無需加鎖),在析構函數解鎖。

對於使用了std::defer_lock的std::unique_lock,之後手動加鎖時要經過std::unique_lock類的lock()函數,而不用std::mutex的lock()函數由於std::unique_lock須要記錄mutex的加鎖狀況

C++11提供了一個模板函數std::lock()使得很容易原子地多個鎖進行加鎖。std::lock函數只要求參數有lock操做便可,也就是說能夠傳一個std::mutex或者std::unique_lock變量給std::lock。std::lock_guard變量則不行,由於其沒有lock()函數

template <class Mutex1, class Mutex2, class... Mutexes>
void lock (Mutex1& a, Mutex2& b, Mutexes&... cde);
std::lock(ul1, ul2);//同時對多個鎖上鎖

5、std::unique_lock與std::lock_guard區別

C++多線程編程中一般會對共享的數據進行寫保護,以防止多線程在對共享數據成員進行讀寫時形成資源爭搶致使程序出現未定義的行爲。一般的作法是在修改共享數據成員的時候進行加鎖--mutex。在使用鎖的時候一般是在對共享數據進行修改以前進行lock操做,在寫完以後再進行unlock操做,常常會出現因爲疏忽致使因爲lock以後在離開共享成員操做區域時忘記unlock,致使死鎖。

針對以上的問題,C++11中引入了std::unique_lock與std::lock_guard兩種數據結構。經過對lock和unlock進行一次薄的封裝,實現自動unlock的功能。

std::mutex mut;  
  
void insert_data()  
{  
       std::lock_guard<std::mutex> lk(mut);  
       queue.push_back(data);  
}  
  
void process_data()  
{  
       std::unqiue_lock<std::mutex> lk(mut);  
       queue.pop();  
}

std::unique_lock 與std::lock_guard都能實現自動加鎖與解鎖功能,可是std::unique_lock要比std::lock_guard更靈活,可是更靈活的代價是佔用空間相對更大一點且相對更慢一點。 

std::unique_lock 的構造函數的數目相對來講比 std::lock_guard ,其中一方面也是由於 std::unique_lock 更加靈活,從而在構造 std::unique_lock 對象時能夠接受額外的參數。總地來講,std::unique_lock 構造函數以下:

default (1) unique_lock() noexcept;
locking (2) explicit unique_lock(mutex_type& m);
try-locking (3) unique_lock(mutex_type& m, try_to_lock_t tag);
deferred (4) unique_lock(mutex_type& m, defer_lock_t tag) noexcept;
adopting (5) unique_lock(mutex_type& m, adopt_lock_t tag);
locking for (6)


template <class Rep, class Period>

unique_lock(mutex_type& m, const chrono::duration<Rep,Period>& rel_time);

locking until (7)


template <class Clock, class Duration>

unique_lock(mutex_type& m, const chrono::time_point<Clock,Duration>& abs_time);

copy [deleted] (8) unique_lock(const unique_lock&) = delete;
move (9) unique_lock(unique_lock&& x);

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

下面咱們來分別介紹以上各個構造函數:

(1) 默認構造函數

新建立的 unique_lock 對象無論理任何 Mutex 對象。

(2) locking 初始化

新建立的 unique_lock 對象管理 Mutex 對象 m,並嘗試調用 m.lock() 對 Mutex 對象進行上鎖,若是此時另外某個 unique_lock 對象已經管理了該 Mutex 對象 m,則當前線程將會被阻塞

(3) try-locking 初始化

新建立的 unique_lock 對象管理 Mutex 對象 m,並嘗試調用 m.try_lock() 對 Mutex 對象進行上鎖,但若是上鎖不成功,並不會阻塞當前線程。

(4) deferred 初始化

新建立的 unique_lock 對象管理 Mutex 對象 m,可是在初始化的時候並不鎖住 Mutex 對象。 m 應該是一個沒有當前線程鎖住的 Mutex 對象。

(5) adopting 初始化

新建立的 unique_lock 對象管理 Mutex 對象 m, m 應該是一個已經被當前線程鎖住的 Mutex 對象。(而且當前新建立的 unique_lock 對象擁有對鎖(Lock)的全部權)。

(6) locking 一段時間(duration)

新建立的 unique_lock 對象管理 Mutex 對象 m,並試圖經過調用 m.try_lock_for(rel_time)鎖住 Mutex 對象一段時間(rel_time)。

(7) locking 直到某個時間點(time point)

新建立的 unique_lock 對象管理 Mutex 對象m,並試圖經過調用 m.try_lock_until(abs_time)在某個時間點(abs_time)以前鎖住 Mutex 對象。

(8) 拷貝構造 [被禁用]

unique_lock 對象不能被拷貝構造。

(9) 移動(move)構造

新建立的 unique_lock 對象得到了由 x 所管理的 Mutex 對象的全部權(包括當前 Mutex 的狀態)。調用 move 構造以後, x 對象如同經過默認構造函數所建立的,就再也不管理任何 Mutex 對象了。

綜上所述,由 (2) 和 (5) 建立的 unique_lock 對象一般擁有 Mutex 對象的鎖。而經過 (1) 和 (4) 建立的則不會擁有鎖。經過 (3),(6) 和 (7) 建立的 unique_lock 對象,則在 lock 成功時得到鎖。

6、condition_variable

條件變量的詳細介紹見https://www.cnblogs.com/GuoXinxin/p/11675053.html

C++裏面使用條件變量實現和信號量相同的功能。下面代碼是一個經典的生產者消費者模型:

#include<thread>
#include<iostream>
#include<mutex>
#include<list>
#include<condition_variable>

std::mutex g_mutex;
std::condition_variable cond;

std::list<int> alist;

void threadFun1()
{
    std::unique_lock<std::mutex> ul(g_mutex);
    while (alist.empty())
    {
        cond.wait(ul);
    }

    std::cout << "threadFun1 get the value : " << alist.front() << std::endl;
    alist.pop_front();
}

void threadFun2()
{
    std::lock_guard<std::mutex> lg(g_mutex);
    alist.push_back(13);

    cond.notify_one();
}

int main()
{
    std::thread th1(threadFun1);
    std::thread th2(threadFun2);

    th1.join();
    th2.join();

    return 0;
}

上面例子之因此用一個while循環而不是if,是由於存在虛假喚醒情景。當notify激活了多個線程以後,若是某個線程率先拿到鎖將數據取空,其餘線程應該再次檢查一下數據是否爲空。  

std::condition_variable 提供了兩種 wait() 函數。當前線程調用 wait() 後將被阻塞(此時當前線程應該得到了鎖(mutex),不妨設得到鎖 lck),直到另外某個線程調用 notify_* 喚醒了當前線程
在線程被阻塞時,該函數會自動調用 lck.unlock() 釋放鎖,使得其餘被阻塞在鎖競爭上的線程得以繼續執行。另外,一旦當前線程得到通知(notified,一般是另外某個線程調用 notify_* 喚醒了當前線程),wait() 函數也是自動調用 lck.lock(),使得 lck 的狀態和 wait 函數被調用時相同

7、future

目的是爲了得到線程函數的返回值,若是使用join的方法,主線程等待次線程結束後,再去讀取全局變量便可。可是join是等待次線程結束,而結束有不少種緣由,好比正常結束和拋異常提早終止。對於後者,並不能保證join返回後,讀取全局變量獲得的就是所要的值。

 

#include<thread>
#include<iostream>
#include<mutex>
#include<vector>
#include<future>
#include<numeric>

void threadFun(const std::vector<int> &big_vec, std::promise<double> prom)
{
    double sum = std::accumulate(big_vec.begin(), big_vec.end(), 0.0);

    double avg = 0;
    if (!big_vec.empty())
        avg = sum / big_vec.size();

    prom.set_value(avg);
}

int main()
{
    std::promise<double> prom;
    std::future<double> fu = prom.get_future();

    std::vector<int> vec{ 1, 2, 3, 4, 5, 6 };
    //以右值引用的方式進行傳遞,本線程中的prom對象轉移給了子線程,保證主線程不會一直阻塞。
    std::thread th(threadFun, std::ref(vec), std::move(prom));
    th.detach();

    double avg = fu.get();//阻塞一直到次線程調用set_value

    std::cout << "avg = " << avg << std::endl;

    return 0;
}

若是在析構std::promise變量時,還沒對std::pormise變量進行設置,那麼析構函數就會爲其關聯的std::future存儲一個std::future_error異常。此時,std::future的get()函數會拋出一個std::futre_error異常。

std::future是一次性的。std::promise只能調用一次get_future,std::future也只能調用一次get()。 若是想在多個線程中共享一個std::promise的設置值,可使用std::shared_future。

有了std::packaged_task,線程函數就能夠直接返回一個值。這樣顯得更加天然。從下面例子也能夠看到,std::packaged_task並不是必定要做爲std::thread的參數,它徹底能夠在主線程中調用。

#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>

// unique function to avoid disambiguating the std::pow overload set
int f(int x, int y) { return std::pow(x,y); }

void task_lambda()
{
    std::packaged_task<int(int,int)> task([](int a, int b) {
        return std::pow(a, b); 
    });
    std::future<int> result = task.get_future();

    task(2, 9);

    std::cout << "task_lambda:\t" << result.get() << '\n';
}

void task_bind()
{
    std::packaged_task<int()> task(std::bind(f, 2, 11));
    std::future<int> result = task.get_future();

    task();

    std::cout << "task_bind:\t" << result.get() << '\n';
}

void task_thread()
{
    std::packaged_task<int(int,int)> task(f);
    std::future<int> result = task.get_future();

    std::thread task_td(std::move(task), 2, 10);
    task_td.join();

    std::cout << "task_thread:\t" << result.get() << '\n';
}

int main()
{
    task_lambda();
    task_bind();
    task_thread();
}

再用async進行一層封裝:

#include<thread>
#include<iostream>
#include<vector>
#include<future>
#include<numeric>

double calcAvg(const std::vector<int> &vec)
{
    double sum = std::accumulate(vec.begin(), vec.end(), 0.0);
    double avg = 0;
    if (!vec.empty())
        avg = sum / vec.size();

    return avg;
}


int main()
{
    std::vector<int> vec{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    std::future<double> fu = std::async(calcAvg, std::ref(vec));

    double avg = fu.get();
    std::cout << "avg = " << avg << std::endl;

    return 0;
}

 8、atomic

我的理解,原子操做在性能上優與互斥量,當對某一種數據類型執行原子操做時,它會更多地使用物理機器上提供的原子操做,避免線程阻塞。若是不能夠的話,可能內部會用自旋鎖來解決同步問題。(原子操做,要麼該線程執行完該操做,要麼該操做都不執行)

最基本的是std::atomic_flag ,不過使用更多的是std::atomic,這還針對整型和指針作了模板特化。

在使用atomic時,會涉及到內存模型的概念。順序一致性模型不只在共享存儲系統上適用,在多處理器和多線程環境下也一樣適用。而在多處理器和多線程環境下理解順序一致性包括兩個方面,(1). 從多個線程平行角度來看,程序最終的執行結果至關於多個線程某種交織執行的結果,(2)從單個線程內部執行順序來看,該線程中的指令是按照程序事先已規定的順序執行的(即不考慮運行時 CPU 亂序執行和 Memory Reorder)。

咱們在運行咱們的代碼時,首先會通過編譯器優化(可能會生成打亂順序的彙編語言),CPU也可能會亂序執行指令以實現優化。內存模型對編譯器和 CPU 做出必定的約束才能合理正確地優化你的程序。

9、自旋鎖

互斥鎖得不到鎖時,線程會進入休眠,這類同步機制都有一個共性就是 一旦資源被佔用都會產生任務切換,任務切換涉及不少東西的(保存原來的上下文,按調度算法選擇新的任務,恢復新任務的上下文,還有就是要修改cr3寄存器會致使cache失效)這些都是須要大量時間的,所以用互斥之類來同步一旦涉及到阻塞代價是十分昂貴的。

一個互斥鎖來控制2行代碼的原子操做,這個時候一個CPU正在執行這個代碼,另外一個CPU也要進入, 另外一個CPU就會產生任務切換。爲了短短的兩行代碼 就進行任務切換執行大量的代碼,對系統性能不利,另外一個CPU還不如直接有條件的死循環,等待那個CPU把那兩行代碼執行完。

當鎖被其餘線程佔有時,獲取鎖的線程便會進入自旋,不斷檢測自旋鎖的狀態。一旦自旋鎖被釋放,線程便結束自旋,獲得自旋鎖的線程即可以執行臨界區的代碼。對於臨界區的代碼必須短小,不然其餘線程會一直受到阻塞,這也是要求鎖的持有時間儘可能短的緣由!

10、讀寫鎖

讀寫鎖和互斥量(互斥鎖)很相似,是另外一種線程同步機制,但不屬於POSIX標準,能夠用來同步同一進程中的各個線程。固然若是一個讀寫鎖存放在多個進程共享的某個內存區中,那麼還能夠用來進行進程間的同步,

和互斥量不一樣的是:互斥量會把試圖進入已保護的臨界區的線程都阻塞;然而讀寫鎖會視當前進入臨界區的線程和請求進入臨界區的線程的屬性來判斷是否容許線程進入。

相對互斥量只有加鎖和不加鎖兩種狀態,讀寫鎖有三種狀態:讀模式下的加鎖,寫模式下的加鎖,不加鎖。

讀寫鎖的使用規則:

只要沒有寫模式下的加鎖,任意線程均可以進行讀模式下的加鎖;
只有讀寫鎖處於不加鎖狀態時,才能進行寫模式下的加鎖;
讀寫鎖也稱爲共享-獨佔(shared-exclusive)鎖,當讀寫鎖以讀模式加鎖時,它是以共享模式鎖住,當以寫模式加鎖時,它是以獨佔模式鎖住。讀寫鎖很是適合讀數據的頻率遠大於寫數據的頻率從的應用中。這樣能夠在任什麼時候刻運行多個讀線程併發的執行,給程序帶來了更高的併發度。

11、線程同步

std::mutex mtx_syn;
std::condition_variable cv_syn;
std::condition_variable cv_syn_1;
bool ready = false;
void threadA(int id) {
	while (1)
	{
		std::unique_lock<std::mutex> lck(mtx_syn);
		while (!ready) cv_syn.wait(lck);
		// ...
		std::cout << "thread " << id << '\n';
		Sleep(500);
		cv_syn.notify_all();   //cpu 輪詢執行 全部被喚醒的線程。
		cv_syn.wait(lck);
	}
	
}
void threadB(int id) {
	while (1)
	{
//新建立的 unique_lock 對象管理 Mutex 對象 m,並嘗試調用 m.lock() 對 Mutex 對象進行上鎖,若是此時另外某個 unique_lock 對象已經管理了該 Mutex 對象 m,則當前線程將會被阻塞
		std::unique_lock<std::mutex> lck(mtx_syn);
		while (!ready) cv_syn.wait(lck);
		// ...
		std::cout << "thread " << id << '\n';
		Sleep(500);
		cv_syn.notify_all();
		cv_syn.wait(lck);
	}
}
 
 
void threadC(int id) {
	while (1)
	{
		std::unique_lock<std::mutex> lck(mtx_syn);
		while (!ready) cv_syn_1.wait(lck);
		// ...
		std::cout << "thread " << id << '\n';
		Sleep(500);
		cv_syn_1.notify_all();
		cv_syn_1.wait(lck);
	}
}
 
 
void go()
{
	std::unique_lock<std::mutex> lck(mtx_syn);
	ready = true;
	cv_syn.notify_one();
}
//線程同步
	std::thread threads[5];
	// spawn 10 threads:
	//for (int i = 0; i<5; ++i)
	//	threads[i] = std::thread(print_id, i);
	threads[0] = std::thread(threadA, 0);
	threads[1] = std::thread(threadB, 1);
	threads[2] = std::thread(threadC, 2);  //該線程 與 0, 1 無關,不影響 0,1 線程的同步,由於用的不是一個 condition_variable
	std::cout << "2 threads ready to race...\n";
	go();                       // go!
 
	for (auto& th : threads) th.join();

  

12、thread 使用

#include <iostream>
 
#include <thread>
 
std::thread::id main_thread_id = std::this_thread::get_id();
 
void hello()  
{
    std::cout << "Hello Concurrent World\n";
    if (main_thread_id == std::this_thread::get_id())
        std::cout << "This is the main thread.\n";
    else
        std::cout << "This is not the main thread.\n";
}
 
void pause_thread(int n) {
    std::this_thread::sleep_for(std::chrono::seconds(n));
    std::cout << "pause of " << n << " seconds ended\n";
}
 
int main() {
    std::thread t(hello);
    std::cout << t.hardware_concurrency() << std::endl;//能夠併發執行多少個(不許確)
    std::cout << "native_handle " << t.native_handle() << std::endl;//能夠併發執行多少個(不許確)
    t.join();
    std::thread a(hello);
    a.detach();
    std::thread threads[5];                         // 默認構造線程
 
    std::cout << "Spawning 5 threads...\n";
    for (int i = 0; i < 5; ++i)
        threads[i] = std::thread(pause_thread, i + 1);   // move-assign threads
    std::cout << "Done spawning threads. Now waiting for them to join:\n";
    for (auto &thread : threads)
        thread.join();
    std::cout << "All threads joined!\n";
}

十3、多線程應用實例

#include <iostream>
#include <opencv2/opencv.hpp>
#include "../data300w/util/Util.h"
#include "cunpd.hpp"
#include <ctime>
#include <io.h>
#include <direct.h>  
#include <thread>
#include <condition_variable>
#include <mutex>
#include <Windows.h>
 
 
using namespace std;
using namespace cv;
using namespace glasssix;
 
 
#define ZOOM_ 1.0
#define SIZE 96
 
 
extern void  splitString(const string& s, vector<string>& v, const string& c);
 
 
template <class Type>
Type stringToNum(const string& str)
{
	istringstream iss(str);
	Type num;
	iss >> num;
	return num;
}
void writeHistoFile(std::string filePath, vector<int> & densi_data)
{
	if (filePath == "" || densi_data.size() == 0)
	{
		return;
	}
	ofstream in;
	in.open(filePath, ios::app);   //ios::trunc
	int length = densi_data.size();
	for (int i = 0; i < length; i++)
	{
		string dataline = to_string(densi_data[i]);
		in << dataline << "\n";
	}
	in.close();
}
 
 
float getHorizontal(LandMark &landmark)
{
	float tan_theta = (landmark.points[1].y - landmark.points[0].y) / (landmark.points[1].x - landmark.points[0].x);
	float theta = atan(tan_theta);
	return theta * 180 / 3.1415926;
}
void showHistogram(vector<float> & hor_data)
{
	int densi[60] = { 0 };
	int length = hor_data.size();
	for (int i = 0; i < length; i++)
	{
		if (floor((hor_data[i] + 30)) >= 0 && floor((hor_data[i] + 30)) < 60)
		{
			densi[(int)floor((hor_data[i] + 30))]++;
		}
 
 
		if (floor((hor_data[i] + 30)) < 0)
		{
			densi[0]++;
		}
		else if (floor((hor_data[i] + 30)) >= 60)
		{
			densi[60]++;
		}
	}
	string density_text = "D:\\UMD\\density_text.txt";
	vector<int>density_data(densi, densi + 60);
	writeHistoFile(density_text, density_data);
 
 
	Mat histImg;
	histImg.create(1000, 1600, CV_8UC3);
	histImg.setTo(0);
	int offset = 10;
	for (int i = 0; i < 60; i++)
	{
		double tmpCount = densi[i];
		rectangle(histImg, Point2f(offset + i * 25, 1000), Point2f(offset + i * 25, 1000 - tmpCount / 15.0), Scalar::all(255), -1);  //畫出直方圖  
		putText(histImg, to_string(i - 29), Point2f(offset + i * 25 + 3, 1000 - 3), 0.3, 0.3, Scalar(0, 0, 255));
		Point2f pt0;
		pt0.x = offset + i * 25;
		pt0.y = 1000 - densi[i] / 15.0;
 
 
		Point2f pt1;
		pt1.x = offset + (i + 1) * 25;
		pt1.y = 1000 - densi[i + 1] / 15.0;
		line(histImg, pt0, pt1, Scalar(255, 0, 0), 1); //鏈接直方圖的頂點  
	}
	imshow("hist", histImg);
	waitKey(0);
}
void getDatahor(string file1, vector<float> & hor_data)
{
	int mark_num = 5;
	DataPrepareUtil dpu;
	vector<LandMark> data;
	dpu.readFileData(file1, data, mark_num);
	int length = data.size();
	for (int i = 0; i < length; i++)
	{
		float hor = getHorizontal(data[i]);
		hor_data.emplace_back(hor);
	}
}
 
 
void rotation(float theta, Mat &img, Mat &dst, Size img_size, LandMark &landmark, int mark_num)
{
	//rotation
	Mat mat = img;
	Point2f center(img_size.width / 2, img_size.height / 2);
	double angle = theta;
 
 
	Mat rot = getRotationMatrix2D(center, angle, 1);
	Rect bbox = RotatedRect(center, mat.size(), angle).boundingRect();
 
 
	cv::warpAffine(mat, dst, rot, bbox.size());
 
 
	for (int j = 0; j < mark_num; j++)
	{
		float theta = -3.1415926 / (180 / angle);
		float x1 = landmark.points[j].x - rot.at<double>(1, 2);
		float y1 = landmark.points[j].y - rot.at<double>(0, 2);
		landmark.points[j].x = x1 * cos(theta) - y1 * sin(theta);
		landmark.points[j].y = x1 * sin(theta) + y1 * cos(theta);
 
 
		//circle(dst, Point(x, y), 2, Scalar(255, 0, 0));
	}
	//cv::imshow("dst", dst);
	//cv::waitKey(0);
}
 
 
void augment_data(string img_path, string img_text, string result_path, string result_text)
{
	DataPrepareUtil dpu;
	int mark_num = 5;
	srand((unsigned)time(NULL));
 
 
	vector<LandMark> data;
	dpu.readFileData(img_text, data, mark_num);
 
 
	vector<LandMark> rotation_data;
	vector<float> hor_data;
	getDatahor(img_text, hor_data);
	int length = hor_data.size();
	for (int i = 0; i < length; i++)
	{
		if (hor_data[i] > 0 && hor_data[i] < 3)
		{
			Mat dst;
			Mat img = imread(img_path + data[i].fileName);
			LandMark landmark(data[i]);
			rotation(25, img, dst, Size(96, 96), landmark, mark_num);
			rotation_data.push_back(landmark);
		}
	}
 
 
}
 
 
bool getFaceRect(cunpd &pd, int model_id, Mat &dstImg, LandMark & landmark, Rect & rect)
{
	const int widths = dstImg.cols;
	const int heights = dstImg.rows;
	vector<FaceInfomation>  face = pd.detect(dstImg, model_id, 48);
	int length = face.size();
	if (length == 0)
	{
		cout << "not found face ." << endl;
	}
	for (int j = 0; j < length; j++)
	{
		if (face[j].score > 15)
		{
			rect = face[j].rect;
 
 
			if (landmark.points[0].x > rect.x && landmark.points[0].x < rect.x + rect.width
				&& landmark.points[0].y > rect.y && landmark.points[0].y < rect.y + rect.height
				&&landmark.points[12].x > rect.x && landmark.points[12].x < rect.x + rect.width
				&& landmark.points[12].y > rect.y && landmark.points[12].y < rect.y + rect.height
				&&landmark.points[16].x > rect.x && landmark.points[16].x < rect.x + rect.width
				&& landmark.points[16].y > rect.y && landmark.points[16].y < rect.y + rect.height
				&&landmark.points[20].x > rect.x && landmark.points[20].x < rect.x + rect.width
				&& landmark.points[20].y > rect.y && landmark.points[20].y < rect.y + rect.height
				&& (abs(landmark.points[7].y - landmark.points[17].y) > (rect.height / 6.0)))
			{
				int rect_w = rect.width;
				int rect_h = rect.height;
				rect.width = rect_w * ZOOM_;
				rect.height = rect_h * ZOOM_;
				rect.x = max(rect.x - (ZOOM_ - 1.0) * rect_w / 2.0, 0.0);
				rect.y = max(rect.y - (ZOOM_ - 1.0) * rect_h / 2.0, 0.0);
 
 
				if (rect.x + rect.width > widths)
				{
					rect.width = widths - rect.x;
				}
				if (rect.y + rect.height > heights)
				{
					rect.height = heights - rect.y;
				}
				return true;
			}
		}
	}
	return false;
 
 
}
 
 
void getoffsetRect(Rect & rect, vector<Rect> & all_rect, int cols, int rows, int max_offset)
{
	srand((unsigned)time(NULL));
	Rect rect0(rect), rect1(rect);
	int offsetx = rand() % max_offset + 1;
	int offsety = rand() % max_offset + 1;
 
 
	if (rect.x > offsetx && rect.y > offsety)
	{
		rect0.x = rect.x - offsetx;
		rect0.y = rect.y - offsety;
	}
 
 
	offsetx = rand() % max_offset + 1;
	offsety = rand() % max_offset + 1;
 
 
	if (rect.x + rect.width + offsetx < cols && rect.y + rect.height + offsety < rows)
	{
		rect1.x = rect.x + offsetx;
		rect1.y = rect.y + offsety;
	}
	all_rect.push_back(rect0);
	all_rect.push_back(rect1);
}
 
 
#define NEED_LANDMARK 5
#define CURRENT_LANDMARK 21
const int five_points[5] = { 7, 10, 14, 17, 19 };
string search_base = "H:\\UMD\\";
string search_dir_[] = { search_base + "umdfaces_batch1", search_base + "umdfaces_batch2", search_base + "umdfaces_batch3" };
 
 
string text_file[] = { search_base + "umdfaces_batch1\\umdfaces_batch1_ultraface.csv", search_base + "umdfaces_batch2\\umdfaces_batch2_ultraface.csv", search_base + "umdfaces_batch3\\umdfaces_batch3_ultraface.csv" };
string text_pre[] = { "batch1_aug_", "batch2_aug_", "batch3_aug_" };
string tail_[] = { ".jpg", ".jpg" , ".jpg" };
 
 
string base = search_base + "landmark_5\\augment_img\\";
string result_img = base + "result_img_" + to_string(SIZE) + "\\";
string result_txt = base + "landmark_" + to_string(SIZE) + "_5.txt";
const int theta_offset = 5;
const int theta_max = 20;
vector<LandMark> rotation_point;
int countNum = 0;
bool ready = false;
std::mutex mtx_syn;
std::condition_variable cv_syn;
void roll_yaw_pitch_data(LandMark result_mark, int temp, cunpd &pd, int model_id, DataPrepareUtil &dpu)
{
 
 
	float roll = result_mark.direct[2];
 
 
	string img_path = search_dir_[temp] + "\\" + result_mark.fileName;
	if (_access(img_path.c_str(), 0) == -1)
	{
		cout << "coun't found filename" << img_path << endl;
		return;
	}
	Mat img = imread(img_path);
	Mat dstImg; //dstImg.create(heights, widths, CV_8UC1);
	cvtColor(img, dstImg, CV_BGR2GRAY);
	//yaw 加強 pitch 加強
	for (int j = 0; j < 2; j++)
	{
		if (result_mark.direct[j] > -theta_offset && result_mark.direct[j] < theta_offset)
		{
			Rect rect;
			LandMark landmark(result_mark);
			bool success = getFaceRect(pd, model_id, dstImg, landmark, rect);
			if (success)
			{
				vector<Rect> all_rect;
				getoffsetRect(rect, all_rect, img.cols, img.rows, 4);
				for (int i = 0; i < 2; i++)
				{
					LandMark dst_landmark;
					//vector<string> filenames;
					//splitString(landmark.fileName, filenames, "/");
					//string filename = filenames[filenames.size()-1];
					
					std::unique_lock<std::mutex> lck(mtx_syn);
					dst_landmark.fileName = text_pre[temp] + to_string(countNum++) + ".png";
					lck.unlock();
					//cout << img.rows << " " << rotat_img.cols << " " << rect.x << " " << rect.y << " " << rect.width << " " << rect.height << endl;
 
 
					Mat roi_face = img(all_rect[i]);
					cv::resize(roi_face, roi_face, Size(SIZE, SIZE));
					//座標轉換
					for (int k = 0; k < 5; k++)
					{
						dst_landmark.visible[k] = landmark.visible[five_points[k]];
						dst_landmark.points[k].x = ((float)SIZE / all_rect[i].width) * (landmark.points[five_points[k]].x - all_rect[i].x);
						dst_landmark.points[k].y = ((float)SIZE / all_rect[i].height) * (landmark.points[five_points[k]].y - all_rect[i].y);
					}
					imwrite(result_img + dst_landmark.fileName, roi_face);
 
 
					std::unique_lock<std::mutex> lck1(mtx_syn);
					rotation_point.push_back(dst_landmark);
					lck1.unlock();
				}
			}
 
 
		}
	}
	// roll 加強
	if (roll > -theta_offset && roll < theta_offset)
	{
		for (int i = -1; i < 2; i = i + 2)
		{
			Mat rotat_img;
			LandMark landmark(result_mark);
			int theta = (rand() % theta_max + theta_offset) * i;
			rotation(theta, img, rotat_img, Size(SIZE, SIZE), landmark, CURRENT_LANDMARK);
 
 
			Mat dstImg; //dstImg.create(heights, widths, CV_8UC1);
			cvtColor(rotat_img, dstImg, CV_BGR2GRAY);
 
 
			//for (int j = 0; j < CURRENT_LANDMARK; j++)
			//{
			//	circle(rotat_img, Point(landmark.points[j]), 2, Scalar(255, 0, 0));
			//}
			//imshow("img", rotat_img);
			//waitKey(0);
 
 
			LandMark dst_landmark;
 
 
			//vector<string> filenames;
			//splitString(landmark.fileName, filenames, "/");
			//string filename = filenames[filenames.size()-1];
			std::unique_lock<std::mutex> lck(mtx_syn);
			dst_landmark.fileName = text_pre[temp] + to_string(countNum++) + ".png";
			lck.unlock();
			Rect rect;
			bool success = getFaceRect(pd, model_id, dstImg, landmark, rect);
			if (success)
			{
				//cout << rotat_img.rows << " " << rotat_img.cols << " " << rect.x << " " << rect.y << " " << rect.width << " " << rect.height << endl;
				Mat roi_face = rotat_img(rect);
				cv::resize(roi_face, roi_face, Size(SIZE, SIZE));
				//座標轉換
				for (int k = 0; k < 5; k++)
				{
					dst_landmark.visible[k] = landmark.visible[five_points[k]];
					dst_landmark.points[k].x = ((float)SIZE / rect.width) * (landmark.points[five_points[k]].x - rect.x);
					dst_landmark.points[k].y = ((float)SIZE / rect.height) * (landmark.points[five_points[k]].y - rect.y);
				}
				imwrite(result_img + dst_landmark.fileName, roi_face);
 
 
				std::unique_lock<std::mutex> lck(mtx_syn);
				rotation_point.push_back(dst_landmark);
 
 
				if (rotation_point.size() > 500)
				{
					dpu.writePointVisibletoFile(result_txt, rotation_point, NEED_LANDMARK);
					rotation_point.clear();
				}
				if (countNum % 500 == 0)
				{
					cout << "prepare data:" << countNum << endl;
				}
				lck.unlock();
			}
		}
	}
 
 
 
 
 
 
}
 
 
vector<LandMark> result_point;   //注意 使用多線程 時共同處理的 變量用 全局變量。
void deal_thread(int temp, int model_id, DataPrepareUtil &dpu, cunpd &pd)
{
	while (true)
	{
		std::unique_lock<std::mutex> lck(mtx_syn);
		while (!ready) {
			cv_syn.wait(lck);
		}
		//
		auto itor = result_point.begin();
		auto itor2 = result_point.end();
		if (itor == itor2)
		{
			break;
		}
		LandMark landmark(result_point[0]);
		result_point.erase(itor);
//		cout << "landmark.fileName is:"<<landmark.fileName<< "thread id"<< this_thread::get_id()<< endl;
		lck.unlock();
 
 
		roll_yaw_pitch_data(landmark, temp, pd, model_id, dpu);
		
	}
 
 
}
 
 
void go()
{
	std::unique_lock<std::mutex> lck(mtx_syn);
	ready = true;
	cv_syn.notify_all();
}
 
 
 
int main()
{
 
	cunpd pd;
	int model_id = pd.AddNpdModel(0);
 
	/*string img_path = "D:\\UMD\\result_img_96\\";
	string result_path = "D:\\UMD\\arguement_data\\";
	string img_text = img_path + "shutter_96_5_train.txt";
	string result_text = result_path + "augment_96_5_train.txt";
	augment_data(img_path, img_text, result_path, result_text);*/
 
	string base_dir = base;
	if (_access(base_dir.c_str(), 0) == -1)
	{
		_mkdir(base_dir.c_str());
	}
	string dir = result_img;
	if (_access(dir.c_str(), 0) == -1)
	{
		_mkdir(dir.c_str());
	}
 
 
	srand((unsigned)time(NULL));
	DataPrepareUtil dpUtil;
 
	dpUtil.clearFileData(result_txt);
 
	long count = 0;
 
	vector<LandMark> rotation_point;
	for (int temp = 0; temp < 3; temp++)
	{
		long countNum = 0;
		//vector<LandMark> result_point;
 
 
		dpUtil.readFileData(text_file[temp], result_point, CURRENT_LANDMARK);
 
 
		std::thread threads[4];
		for (int i = 0; i < 4; i++)
		{
			threads[i] = std::thread(deal_thread, temp, model_id, dpUtil, pd);
			//threads[i] = std::thread(threadA, i, result_point, temp, model_id, dpUtil, pd);
		}
		cout << "temp start:" << temp << endl;
		go();
		for (auto &th : threads) {
			th.join();
		}
		cout << "temp end:" << temp << endl;
		
		}
		if (rotation_point.size() > 0)
		{
			dpUtil.writePointVisibletoFile(result_txt, rotation_point, NEED_LANDMARK);
			rotation_point.clear();
		}
		system("PAUSE");
		return 0;
}

  

 十4、Future使用

void test_thread() {
    //1.
    std::thread t(foo, "hello");
    t.join();
 
    //2.
    std::packaged_task<int(int)> task([](int a) {std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "my task" << std::endl; return a; });
    std::future<int> result = task.get_future();
    std::thread(std::move(task), 2).detach();
    std::cout << "Waiting...." << std::endl;
    //result.wait();
 
    //result.get會阻塞,直到對應線程完成
    std::cout << "Done  result is:" << result.get() << std::endl;
 
    //3.
    std::packaged_task<string(string)> task1(foo);
    std::future<string> result1 = task1.get_future();
    string str = "liu";
    std::thread(std::move(task1), str).detach();
    //result1.get會阻塞,直到對應線程完成
    std::cout << "task1:" << result1.get() << std::endl;
}

  

 

 

整理於:

https://blog.csdn.net/u011808673/article/details/80811998

https://blog.csdn.net/zhougb3/article/details/79538750

相關文章
相關標籤/搜索