第26課 std::async異步任務

一. std::async函數模板ios

(一)std::async和std::thread的區別編程

  1. 二者最明顯的區別在於async採用默認啓動策略時並不必定建立新的線程。若是系統資源緊張,那麼std::thread建立線程可能失敗,系統報告異常,整個程序可能崩潰。而std::async通常則不會,它在沒法建立新線程時,會將任務分配給後續調用future.get()函數的線程,並以同步的方式執行(即不建立新線程)。promise

  2. std::async表現爲更高階的抽象,它把用戶從線程管理的細節解放出來,將這些責任轉交給C++標準庫的實現者。而std::thread要求自行處理線程耗盡、超訂、負載均衡以及新平臺適配問題併發

  3. std::thread未提供直接獲取線程函數返回值的方法。但std::async能夠經過future對象來獲取負載均衡

(二)std::async函數模板及分析 dom

  1. 「共享狀態」對象,用於保存線程函數(通常是可調用對象)及其參數、返回值以及新線程狀態等信息。該對象保存在堆中,由std::async、std::promise或std::package_task提供,並交由future或shared_future管理其生命期。被調方(一般指調用promise.set_value()的一方)將計算所得的結果寫入「共享狀態」,而調用方經過std::future的get()讀取該結果。異步

 

  2. 調用std::async是會建立一個「_Deferred_async_state」或_「Task_async_state」類的「共享狀態」對象,該對象是_Packaged_state的子類。注意,直接建立std::promise時,生成的是「_associated_state」類的共享狀態對象,而std::package_task建立的是「_Packaged_state」類的共享狀態對象async

  3. _Get_associated_state是個工廠函數,經過不一樣的策略建立不一樣的「共享狀態」對象,並將其交由future管理,負責其生命週期。future相似於std::unique_ptr,對「共享狀態」對象「獨佔」全部權。函數

  4. 與std::thread同樣,傳入std::async中的可調用對象及其參數會被按值以副本造成保存成一個tuple對象,而後再以右值的方式傳入線程函數中對應的參數。this

【編程實驗】建立異步任務

#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <vector>
#include <numeric> //for std::accumulate

using namespace std;

std::mutex mtx;

class Widget
{
public:
    void foo(int x, const std::string& s)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = "<<std::this_thread::get_id()<<
            " void Foo::foo(int, const std::string&): x = " <<  x << ", s = " << s<< endl;
    }

    void bar(const std::string& s)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()  
             <<" void Widget::bar(const std::string&): s = " << s << endl;
    }

    void operator()(double val)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()
             << " void Widget::operator(): val = " << val << endl;
    }
};

class NonCopyable //只移動對象
{
public:
    NonCopyable() {};

    NonCopyable(const NonCopyable&) = delete;
    NonCopyable& operator=(const NonCopyable&) = delete;

    NonCopyable(NonCopyable&&) = default;
    NonCopyable& operator=(NonCopyable&&) = default;

    double operator()(double d)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()
             << " void NonCopyable::operator(): d = " << d << endl;
        return d;
    }
};

//並行計算
template<typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
    auto len = end - beg;
    if (len < 1000)
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "thread id = " << std::this_thread::get_id()
            << " invoke parallel_sum()" << endl;
        return std::accumulate(beg, end, 0); //遍歷[beg,end)區別的每一個元素並累加。初始值爲0
    }
        

    RandomIt mid = beg + len / 2;
    auto handle = std::async(std::launch::async,  //子線程將[mid,end)元素進行累加
                            parallel_sum<RandomIt>, mid, end);

    int sum = parallel_sum(beg, mid);//本線程將[begin,mid)區間元素進行累加

    return sum + handle.get(); //返回兩個區間結果的累加和
}

int main()
{
    Widget w;

    cout << "main thread id = " << std::this_thread::get_id() << endl;
    //1. 參數傳遞
    auto fut1 = std::async(&Widget::foo, &w, 42, "hello"); //傳入this指針:&w
    auto fut2 = std::async(&Widget::bar, w, "goodbye"); //傳入x的副本如tmp。 tmp.bar(...)

    auto fut3 = std::async(Widget(), 3.14159); //傳入Widget臨時對象,調用operator()
    auto fut4 = std::async(std::ref(w), 2.718);  //傳入w的引用,調用operator();

    NonCopyable mo;    //只移動對象
    auto fut5 = std::async(std::move(mo),3.14159); //mo是隻移動對象,必須被轉爲右值

    //2. 同步、異步
    auto fut6 = std::async(std::launch::async, Widget(), 1.2); //在新線程上運行,operator()
    auto fut7 = std::async(std::launch::deferred, &Widget::bar, &w, "deferred"); //線程延遲到調用get或wait才執行

    auto fut8 = std::async(std::launch::async | std::launch::deferred, //等價於默認啓動策略
                         &Widget::bar, &w, "async | deferred");

    fut7.get(); //主線程阻塞,等待fut7子線程。(子線程延遲到這時才執行)。

    //3. 並行計算
    std::vector<int> vec(10000, 1); //10000個1
    int res = parallel_sum(vec.begin(), vec.end());
    
    {
        std::lock_guard<std::mutex> lk(mtx);
        cout << "The sum is: " << res << endl;

        cout << "main thread  end." << endl;
    }

    return 0;
}
/*輸出結果
main thread id = 16756
thread id = 1928 void Foo::foo(int, const std::string&): x = 42, s = hello
thread id = 16756 void Widget::bar(const std::string&): s = deferred  //注意,由主線程執行
thread id = 13216 void Widget::bar(const std::string&): s = goodbye
thread id = 7940 void Widget::operator(): val = 3.14159
thread id = 16080 void Widget::operator(): val = 2.718
thread id = 11492 void NonCopyable::operator(): d = 3.14159
thread id = 1928 void Widget::operator(): val = 1.2
thread id = 13216 void Widget::bar(const std::string&): s = async | deferred
thread id = 16756 invoke parallel_sum()
thread id = 7940 invoke parallel_sum()
thread id = 16080 invoke parallel_sum()
thread id = 11492 invoke parallel_sum()
thread id = 1928 invoke parallel_sum()
thread id = 13216 invoke parallel_sum()
thread id = 1928 invoke parallel_sum()
thread id = 7636 invoke parallel_sum()
thread id = 5816 invoke parallel_sum()
thread id = 15856 invoke parallel_sum()
thread id = 15832 invoke parallel_sum()
thread id = 7636 invoke parallel_sum()
thread id = 15400 invoke parallel_sum()
thread id = 16968 invoke parallel_sum()
thread id = 15856 invoke parallel_sum()
thread id = 15476 invoke parallel_sum()
The sum is: 10000
main thread  end.
*/

二. std::async的啓動策略

(一)std::async的啓動策略

  1. 三種啓動策略(std::async經過指定不一樣的啓動策略來決定建立是「共享狀態」對象)

  (1)異步方式(std::launch::async):會建立一個「_Task_async_state」類的共享狀態對象。使用該策略時異味着線程函數必須以異步的方式運行,即在另外一個線程之上執行

  (2)同步方式(std::launch::deferred):會建立一個「_Deferred_async_state」類的共享狀態對象。使用該策略意味着線程函數延遲到調用future的get/wait時才得以運行,並且二者是在同一線程上以同步的方式運行。即調用future的一方會阻塞至線程函數運行結束爲止。若是get/wait沒有獲得調用,則線程函數不會被執行。

  (3)默認啓動策略std::launch::async|std::launch::deferred)即二者或運算的結果,這意味着任務可能以異步或同步的方式被運行。也就是說是否建立新線程來運行任務,取決於系統資源是否緊張,由標準庫的線程管理組件承擔線程建立和銷燬、避免超訂以及負載均衡的責任。

(二)默認啓動策略

  1. 帶來的問題

  (1)用戶沒法預知是異步仍是同步運行,由於線程函數可能被調度爲延遲執行。

  (2)沒法預知線程函數是否與調用future的get/wait函數線程是否在同一線程運行。若是此時線程函數會讀取線程局部存儲(thread_local storage, TLS),那麼也就沒法預知會取到哪一個線程的局部存儲

  (3)有時甚至連線程函數是否會運行,這件起碼的事情都是沒法預知的這是所以沒法保證在程序的每條路徑上future的get或wait都會得以調用。

  2. 注意事項:

  (1)默認啓動策略能正常工做須要知足如下全部條件

    ①任務不須要與調用get/wait的線程併發執行。

    ②讀/寫哪一個線程的thread_local變量可有可無。

    ③能夠保證在std::async返回的future上調用get/wait,或者能夠接受任務可能永不執行。

    ④用戶已清楚使用wait_for或wait_unitil的代碼任務可能被推遲執行,這種可能性己被歸入考量。

  (2)只要其中一個條件不知足,就必須手動指定啓動策略以保證任務以異步或同步的方式運行。

【編程實驗】默認啓動策略問題的解決

#include <iostream>
#include <future>

using namespace std;
using namespace literals; //for duration suffixes(時長後綴,如1s)

void func()
{
    std::this_thread::sleep_for(1s);
}

//reallyAsync函數模板:用於保證任務被異步執行
template<typename Func, typename ...Args>
inline auto reallyAsync(Func&& f, Args... args)
{
    return std::async(std::launch::async,
                      std::forward<Func>(func),
                      std::forward<Args>(args)...);
}

int main()
{
    //wait_for函數必須可慮任務是同步或異步運行

    auto fut1 = std::async(func); //默認啓動策略,沒法預估任務是被同步仍是異步運行

    //解決方案1:wait_for(0s)
    if (fut1.wait_for(0s) == std::future_status::deferred){ //同步運行,wait_for(0s)
        fut1.get(); //等待結果
    }else { //異步運行
        while (fut1.wait_for(100ms) != std::future_status::ready) { //輪詢子線程是否結束
            //...   //併發作其餘任務
        }

        //... //fut is ready
    }

    //解決方案2:確實以異步運行任務
    auto fut2 = reallyAsync(func);
    while (fut2.wait_for(100ms) != std::future_status::ready) //異步方式,確保wait_for返回ready的結果
    {                                                         //從而消除future_status::deferred的可能

    }

    return 0;
}
相關文章
相關標籤/搜索