第27課 「共享狀態」及其管理者(std::future/std::shared_future)

一. 「共享狀態」ios

(一)「共享狀態」對象編程

 

  1. 用於保存線程函數及其參數、返回值以及新線程狀態等信息。該對象一般建立在堆上,由std::async、std::promise和std::package_task等提供(Provider),並交由future/shared_future管理。promise

  2. Provider將計算結果寫入「共享狀態」對象,而future/shared_future經過get()函數來讀取該結果。「共享狀態」做爲異步結果的傳輸通道future能夠從中方便地獲取線程函數的返回值多線程

  3. 「共享狀態」內部保存着一個引用計數,當引用計數爲0時會經過delete this來銷燬自身異步

// CLASS TEMPLATE _Associated_state
template <class _Ty>
class _Associated_state { // class for managing associated synchronous state
public:
    using _State_type = _Ty;
    using _Mydel      = _Deleter_base<_Ty>;

    _Associated_state(_Mydel* _Dp = nullptr)
        : _Refs(1), // non-atomic initialization
          _Exception(), _Retrieved(false), _Ready(false), _Ready_at_thread_exit(false), _Has_stored_result(false),
          _Running(false), _Deleter(_Dp) { // construct
        // TODO: _Associated_state ctor assumes _Ty is default constructible
    }

    virtual ~_Associated_state() noexcept { // 析構函數:注意並不會阻塞
        if (_Has_stored_result && !_Ready) { // registered for release at thread exit
            _Cond._Unregister(_Mtx);
        }
    }

    void _Retain() { // 增長引用計數
        _MT_INCR(_Refs);
    }

    void _Release() { // 減小引用計數,等於0時delete this
        if (_MT_DECR(_Refs) == 0) {
            _Delete_this();
        }
    }

private:
    _Atomic_counter_t _Refs;

public:
    virtual void _Wait() { // wait for signal
        unique_lock<mutex> _Lock(_Mtx);
        _Maybe_run_deferred_function(_Lock);
        while (!_Ready) {
            _Cond.wait(_Lock);
        }
    }

    struct _Test_ready { // _Associated_state包裝類
        _Test_ready(const _Associated_state* _St) : _State(_St) { // construct from associated state
        }

        bool operator()() const { // 判斷「共享狀態」是否就緒
            return _State->_Ready != 0;
        }
        const _Associated_state* _State;
    };

    template <class _Rep, class _Per>
    future_status _Wait_for(const chrono::duration<_Rep, _Per>& _Rel_time) { // wait for duration
        unique_lock<mutex> _Lock(_Mtx);
        if (_Has_deferred_function()) {
            return future_status::deferred; //若是是延遲任務,調用waitfor將返回deferred,而不是future_status::ready!
        }

        if (_Cond.wait_for(_Lock, _Rel_time, _Test_ready(this))) {
            return future_status::ready; //返回future_status::ready
        }

        return future_status::timeout; //返回超時
    }

    template <class _Clock, class _Dur>
    future_status _Wait_until(const chrono::time_point<_Clock, _Dur>& _Abs_time) { // wait until time point
        unique_lock<mutex> _Lock(_Mtx);
        if (_Has_deferred_function()) {
            return future_status::deferred;
        }

        if (_Cond.wait_until(_Lock, _Abs_time, _Test_ready(this))) {
            return future_status::ready;
        }

        return future_status::timeout;
    }

    virtual _Ty& _Get_value(bool _Get_only_once) { // return the stored result or throw stored exception
        unique_lock<mutex> _Lock(_Mtx);
        if (_Get_only_once && _Retrieved) { //_Get_only_once:true表示_Get_value只能調用一次,false表示可重複調用
            _Throw_future_error(make_error_code(future_errc::future_already_retrieved));
        }

        if (_Exception) {
            _Rethrow_future_exception(_Exception);
        }

        _Retrieved = true; //標記_Get_value()函數己被調用過
        _Maybe_run_deferred_function(_Lock);
        while (!_Ready) { //若是任務結束,則再也不等待。
            _Cond.wait(_Lock);
        }

        if (_Exception) {
            _Rethrow_future_exception(_Exception);
        }

        return _Result;
    }

    void _Set_value(const _Ty& _Val, bool _At_thread_exit) { // store a result
        unique_lock<mutex> _Lock(_Mtx);
        _Set_value_raw(_Val, &_Lock, _At_thread_exit);
    }

    void _Set_value_raw(const _Ty& _Val, unique_lock<mutex>* _Lock,
        bool _At_thread_exit) { // store a result while inside a locked block
        if (_Has_stored_result) {
            _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
        }

        _Result = _Val;
        _Do_notify(_Lock, _At_thread_exit);
    }

    void _Set_value(_Ty&& _Val, bool _At_thread_exit) { // store a result
        unique_lock<mutex> _Lock(_Mtx);
        _Set_value_raw(_STD forward<_Ty>(_Val), &_Lock, _At_thread_exit);
    }

    void _Set_value_raw(_Ty&& _Val, unique_lock<mutex>* _Lock,
        bool _At_thread_exit) { // store a result while inside a locked block
        if (_Has_stored_result) {
            _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
        }

        _Result = _STD forward<_Ty>(_Val);
        _Do_notify(_Lock, _At_thread_exit);
    }

    void _Set_value(bool _At_thread_exit) { // store a (void) result
        unique_lock<mutex> _Lock(_Mtx);
        _Set_value_raw(&_Lock, _At_thread_exit);
    }

    void _Set_value_raw(
        unique_lock<mutex>* _Lock, bool _At_thread_exit) { // store a (void) result while inside a locked block
        if (_Has_stored_result) {
            _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
        }

        _Do_notify(_Lock, _At_thread_exit);
    }

    void _Set_exception(exception_ptr _Exc, bool _At_thread_exit) { // store a result
        unique_lock<mutex> _Lock(_Mtx);
        _Set_exception_raw(_Exc, &_Lock, _At_thread_exit);
    }

    void _Set_exception_raw(exception_ptr _Exc, unique_lock<mutex>* _Lock,
        bool _At_thread_exit) { // store a result while inside a locked block
        if (_Has_stored_result) {
            _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
        }

        _Exception = _Exc;
        _Do_notify(_Lock, _At_thread_exit);
    }

    bool _Is_ready() const { // return ready status
        return _Ready != 0;
    }

    bool _Is_ready_at_thread_exit() const { // return ready at thread exit status
        return _Ready_at_thread_exit;
    }

    bool _Already_has_stored_result() const { // return presence of stored result
        return _Has_stored_result;
    }

    bool _Already_retrieved() const { // return retrieved status
        return _Retrieved;
    }

    void _Abandon() { // abandon shared state
        unique_lock<mutex> _Lock(_Mtx);
        if (!_Has_stored_result) { // queue exception
            future_error _Fut(make_error_code(future_errc::broken_promise));
            _Set_exception_raw(_STD make_exception_ptr(_Fut), &_Lock, false);
        }
    }

protected:
    void _Make_ready_at_thread_exit() { // set ready status at thread exit
        if (_Ready_at_thread_exit) {
            _Ready = true;
        }
    }

    void _Maybe_run_deferred_function(unique_lock<mutex>& _Lock) {
        if (!_Running) { //延遲任務默認值爲false,只能調用該函數後,纔會變爲true
            _Running = true;
            _Run_deferred_function(_Lock); //執行延遲任務
        }
    }

public:
    _Ty _Result;
    exception_ptr _Exception;
    mutex _Mtx;
    condition_variable _Cond;
    bool _Retrieved; //用於標記_Get_value函數是否己被調用過,true表示己調用過,false爲未調用過
    int _Ready; //是否處於就緒狀態,用於喚醒等待線程。(有些任務作完線程就被置爲就緒狀態,而有些任務要等線程退出時才置就緒)
    bool _Ready_at_thread_exit;//是否在線程退出時才設爲就緒狀態
    bool _Has_stored_result; //調用_Do_notify時表示結果己計算出來,該值被置爲true。
    bool _Running; //用於標識線程是否正在運行(異步任務默認值爲true,延遲任務默認值爲false)
private:
    virtual bool _Has_deferred_function() const noexcept { // 被_Deferred_async_state子類override
        return false;
    }

    virtual void _Run_deferred_function(unique_lock<mutex>&) { // do nothing
    }

    virtual void _Do_notify(unique_lock<mutex>* _Lock, bool _At_thread_exit) { // 通知等待線程。
        _Has_stored_result = true; 
        if (_At_thread_exit) { //線程退出時,才喚醒等待線程
            _Cond._Register(*_Lock, &_Ready);
        } else { // 當即喚醒等待線程
            _Ready = true;
            _Cond.notify_all();
        }
    }

    void _Delete_this() { // delete this object
        if (_Deleter) {
            _Deleter->_Delete(this);
        } else {
            delete this; //刪除自身
        }
    }

    _Mydel* _Deleter;

public:
    _Associated_state(const _Associated_state&) = delete;  //不可複製
    _Associated_state& operator=(const _Associated_state&) = delete; //不可複製賦值
};

// CLASS TEMPLATE _Packaged_state
template <class>
class _Packaged_state;

template <class _Ret,
    class... _ArgTypes>
class _Packaged_state<_Ret(_ArgTypes...)>
    : public _Associated_state<_Ret> { //爲packaged_task準備的」共享狀態「
public:
    using _Mybase = _Associated_state<_Ret>;
    using _Mydel  = typename _Mybase::_Mydel;

    template <class _Fty2>
    _Packaged_state(const _Fty2& _Fnarg) : _Fn(_Fnarg) { // construct from function object
    }

#if _HAS_FUNCTION_ALLOCATOR_SUPPORT
    template <class _Fty2, class _Alloc>
    _Packaged_state(const _Fty2& _Fnarg, const _Alloc& _Al, _Mydel* _Dp)
        : _Mybase(_Dp), _Fn(allocator_arg, _Al, _Fnarg) { // construct from function object and allocator
    }
#endif // _HAS_FUNCTION_ALLOCATOR_SUPPORT

    template <class _Fty2>
    _Packaged_state(_Fty2&& _Fnarg) : _Fn(_STD forward<_Fty2>(_Fnarg)) { // construct from rvalue function object
    }

#if _HAS_FUNCTION_ALLOCATOR_SUPPORT
    template <class _Fty2, class _Alloc>
    _Packaged_state(_Fty2&& _Fnarg, const _Alloc& _Al, _Mydel* _Dp)
        : _Mybase(_Dp), _Fn(allocator_arg, _Al,
                            _STD forward<_Fty2>(_Fnarg)) { // construct from rvalue function object and allocator
    }
#endif // _HAS_FUNCTION_ALLOCATOR_SUPPORT

    //提供給Provider使用的,provider會經過set_value_at_thread_exit調用該函數來實現線程退出時喚醒等待線程。
    void _Call_deferred(_ArgTypes... _Args) { //這類延遲函數,在線程退出時將任務設爲就緒狀態,纔會喚醒其餘線程。。
        _TRY_BEGIN
        // call function object and catch exceptions
        this->_Set_value(_Fn(_STD forward<_ArgTypes>(_Args)...), true); //執行_Fn函數,並將返回值提供給_Set_value函數。t
        _CATCH_ALL                                                      //true表示線程退出時才喚醒等待線程
        // function object threw exception; record result
        this->_Set_exception(_STD current_exception(), true);
        _CATCH_END
    }
    
    //當即調用線程函數,執行完當即喚醒等待線程。好比std::async不論是同步或異步,都是在執行完當即喚醒等待線程。
    void _Call_immediate(_ArgTypes... _Args) {
        _TRY_BEGIN
        // call function object and catch exceptions
        this->_Set_value(_Fn(_STD forward<_ArgTypes>(_Args)...), false);//當即調用函數對象,false表示任務作完當即喚醒等待線程
        _CATCH_ALL
        // function object threw exception; record result
        this->_Set_exception(_STD current_exception(), false);
        _CATCH_END
    }

    const function<_Ret(_ArgTypes...)>& _Get_fn() { // return stored function object
        return _Fn;
    }

private:
    function<_Ret(_ArgTypes...)> _Fn;
};

// CLASS TEMPLATE _Deferred_async_state
template <class _Rx>
class _Deferred_async_state : public _Packaged_state<_Rx()> { //std::async建立的同步」共享狀態「
public:
    template <class _Fty2>
    _Deferred_async_state(const _Fty2& _Fnarg) : _Packaged_state<_Rx()>(_Fnarg) { // construct from function object
    }

    template <class _Fty2>
    _Deferred_async_state(_Fty2&& _Fnarg)
        : _Packaged_state<_Rx()>(_STD forward<_Fty2>(_Fnarg)) { // construct from rvalue function object
    }

private:
    virtual bool _Has_deferred_function() const
        noexcept { // this function is considered to be deferred until it's invoked
        return !this->_Running; //若是任務己被執行過,就不在是視爲延遲任務
    }

    virtual void _Run_deferred_function(unique_lock<mutex>& _Lock) { // run the deferred function
        _Lock.unlock();
        _Packaged_state<_Rx()>::_Call_immediate(); //注意,這裏不是調用_Call::deferred()!!!
        _Lock.lock();
    }
};

// CLASS TEMPLATE _Task_async_state
template <class _Rx>
class _Task_async_state : public _Packaged_state<_Rx()> { //std::async建立的異步」共享狀態「 
public:
    using _Mybase     = _Packaged_state<_Rx()>;
    using _State_type = typename _Mybase::_State_type;

    template <class _Fty2>
    _Task_async_state(_Fty2&& _Fnarg) : _Mybase(_STD forward<_Fty2>(_Fnarg)) { // construct from rvalue function object
        _Task = ::Concurrency::create_task([this]() { // do it now
            this->_Call_immediate();
        });

        this->_Running = true; //異步任務,線程一啓動就處於running狀態。
    }

    virtual ~_Task_async_state() noexcept { // destroy
        _Wait();   //異步「共享狀態」對象析構時,會被阻塞!!!
    }

    virtual void _Wait() { // wait for completion
        _Task.wait(); //重寫_Wait()。注意,這裏調用的是線程級別的wait,至關於對底層線程實施一次隱式join()。
    }

    virtual _State_type& _Get_value(bool _Get_only_once) { // return the stored result or throw stored exception
        _Task.wait(); //異步「共享狀態」對象,在調用Get_value時,也會被阻塞!
        return _Mybase::_Get_value(_Get_only_once);
    }

private:
    ::Concurrency::task<void> _Task;
};
「共享狀態」相關類的源碼摘要

(二)注意事項async

  1.std::promise建立_Assoicated_state類型的共享對象。這是一種最簡單的共享狀態對象,只能用於保存線程函數的返回值等信息。ide

  2. _Package_state類型「共享狀態」,除了能夠保存返回值外,還用於將可調用對象包裝成一個function對象通常由std::package_task建立函數

  3._Deffered_async_state或_Task_async_state類型,前者用於跟蹤std::launch::deferred類型的異步任務,然後者用於跟蹤std::launch::async類型的任務。std::async就是根據啓動策略來建立這兩種共享狀態之一。this

  4. _Task_async_state類型的「共享狀態」對象,在析構時會調用wait()函數來等待任務執行完畢。從效果來看,至關於實施一次隱式join(),正如std::thread同樣,C++標準委員會對這種行爲也曾經存在爭議。而其餘全部 「共享狀態」的類型都無此操做,這至關於實施一次隱式的detach()操做。atom

二.期值(「共享狀態」的管理器)

(一)future/shared_future源碼摘要

// CLASS TEMPLATE _State_manager
template <class _Ty>
class _State_manager { //「共享狀態」管理器
public:
    _State_manager() : _Assoc_state(nullptr) { // construct with no associated asynchronous state object
        _Get_only_once = false;  //注意,默認get()函數是可屢次調用的
    }

    _State_manager(_Associated_state<_Ty>* _New_state, bool _Get_once)
        : _Assoc_state(_New_state) { // construct with _New_state
        _Get_only_once = _Get_once;
    }

    _State_manager(const _State_manager& _Other, bool _Get_once = false)
        : _Assoc_state(nullptr) { // construct from _Other
        _Copy_from(_Other);
        _Get_only_once = _Get_once;
    }

    _State_manager(_State_manager&& _Other, bool _Get_once = false)
        : _Assoc_state(nullptr) { // construct from rvalue _Other
        _Move_from(_Other);
        _Get_only_once = _Get_once;
    }

    ~_State_manager() noexcept { // destroy
        if (_Assoc_state != nullptr) {
            _Assoc_state->_Release(); //_State_manager對象析構時,會同時將管理的「共享狀態」的引用計數自減1
        }
    }

    _State_manager& operator=(const _State_manager& _Other) { // assign from _Other
        _Copy_from(_Other);
        return *this;
    }

    _State_manager& operator=(_State_manager&& _Other) { // assign from rvalue _Other
        _Move_from(_Other);
        return *this;
    }

     //檢查當前的 std::future 對象是否有效,即釋放與某個共享狀態相關聯
    _NODISCARD bool valid() const noexcept {
        return _Assoc_state != nullptr && !(_Get_only_once && _Assoc_state->_Already_retrieved());
    }

    //等待與當前std::future 對象相關聯的共享狀態的標誌變爲 ready.
    void wait() const { // wait for signal
        if (!valid()) {
            _Throw_future_error(make_error_code(future_errc::no_state));
        }

        _Assoc_state->_Wait(); //等待條件變量
    }

    template <class _Rep, class _Per>
    future_status wait_for(const chrono::duration<_Rep, _Per>& _Rel_time) const { // wait for duration
        if (!valid()) {
            _Throw_future_error(make_error_code(future_errc::no_state));
        }

        return _Assoc_state->_Wait_for(_Rel_time);
    }

    template <class _Clock, class _Dur>
    future_status wait_until(const chrono::time_point<_Clock, _Dur>& _Abs_time) const { // wait until time point
        if (!valid()) {
            _Throw_future_error(make_error_code(future_errc::no_state));
        }

        return _Assoc_state->_Wait_until(_Abs_time);
    }

    _Ty& _Get_value() const { // return the stored result or throw stored exception
        if (!valid()) {
            _Throw_future_error(make_error_code(future_errc::no_state));
        }

        return _Assoc_state->_Get_value(_Get_only_once);
    }

    void _Set_value(const _Ty& _Val, bool _Defer) { // store a result
        if (!valid()) {
            _Throw_future_error(make_error_code(future_errc::no_state));
        }

        _Assoc_state->_Set_value(_Val, _Defer);
    }

    void _Set_value(_Ty&& _Val, bool _Defer) { // store a result
        if (!valid()) {
            _Throw_future_error(make_error_code(future_errc::no_state));
        }

        _Assoc_state->_Set_value(_STD forward<_Ty>(_Val), _Defer);
    }

    void _Abandon() { // abandon shared state
        if (_Assoc_state) {
            _Assoc_state->_Abandon();
        }
    }

    void _Set_exception(exception_ptr _Exc, bool _Defer) { // store a result
        if (!valid()) {
            _Throw_future_error(make_error_code(future_errc::no_state));
        }

        _Assoc_state->_Set_exception(_Exc, _Defer);
    }

    void _Swap(_State_manager& _Other) { // exchange with _Other
        _STD swap(_Assoc_state, _Other._Assoc_state);
    }

    _Associated_state<_Ty>* _Ptr() const { // return pointer to stored associated asynchronous state object
        return _Assoc_state;
    }

    void _Copy_from(const _State_manager& _Other) { // copy stored associated asynchronous state object from _Other
        if (this != _STD addressof(_Other)) { // different, copy
            if (_Assoc_state) {
                _Assoc_state->_Release();
            }

            if (_Other._Assoc_state == nullptr) {
                _Assoc_state = nullptr;
            } else { // do the copy
                _Other._Assoc_state->_Retain();
                _Assoc_state   = _Other._Assoc_state;
                _Get_only_once = _Other._Get_only_once;
            }
        }
    }

    void _Move_from(_State_manager& _Other) { // move stored associated asynchronous state object from _Other
        if (this != _STD addressof(_Other)) { // different, move
            if (_Assoc_state) {
                _Assoc_state->_Release();
            }

            _Assoc_state        = _Other._Assoc_state;
            _Other._Assoc_state = nullptr;
            _Get_only_once      = _Other._Get_only_once;
        }
    }

    bool _Is_ready() const { // return ready status
        return _Assoc_state && _Assoc_state->_Is_ready();
    }

    bool _Is_ready_at_thread_exit() const { // return ready at thread exit status
        return _Assoc_state && _Assoc_state->_Is_ready_at_thread_exit();
    }

private:
    _Associated_state<_Ty>* _Assoc_state;
    bool _Get_only_once;
};

// CLASS TEMPLATE future
template <class _Ty>
class shared_future;

template <class _Ty>
class future : public _State_manager<_Ty> { // class that defines a non-copyable asynchronous return object
                                            // that holds a value
    using _Mybase = _State_manager<_Ty>;

public:
    future() noexcept { // construct
    }

    future(future&& _Other) noexcept : _Mybase(_STD move(_Other), true) { // true表示get只能被調用一次。
    }

    future& operator=(future&& _Right) noexcept { // assign from rvalue future object
        _Mybase::operator=(_STD move(_Right));
        return *this;
    }

    future(const _Mybase& _State, _Nil) : _Mybase(_State, true) { // construct from associated asynchronous state object
    }

    ~future() noexcept { // destroy
    }

    _Ty get() { // block until ready then return the stored result or
                // throw the stored exception
        future _Local{_STD move(*this)}; //注意,移動操做,將使調用get()函數後,future將失去與「共享狀態」的關聯,valid()變爲無效。
        return _STD move(_Local._Get_value());
    }

    _NODISCARD shared_future<_Ty> share() noexcept { // return state as shared_future
        return shared_future<_Ty>(_STD move(*this));
    }

    future(const future&) = delete;
    future& operator=(const future&) = delete;
};

// CLASS TEMPLATE shared_future
template <class _Ty>
class shared_future : public _State_manager<_Ty> { // class that defines a copyable asynchronous return object
                                                   // that holds a value
    using _Mybase = _State_manager<_Ty>;

public:
    shared_future() noexcept { // _Mybase中將_Get_Only_once默認值設爲false,表示get()可屢次調用。
    }

    shared_future(const shared_future& _Other) noexcept : _Mybase(_Other) { //拷貝構造
    }

    shared_future& operator=(const shared_future& _Right) noexcept { // 拷貝賦值

        _Mybase::operator=(_Right);
        return *this;
    }

    shared_future(future<_Ty>&& _Other) noexcept
        : _Mybase(_STD forward<_Mybase>(_Other)) { // 移動構造
    }

    shared_future(shared_future&& _Other) noexcept
        : _Mybase(_STD move(_Other)) { // construct from rvalue shared_future object
    }

    shared_future& operator=(shared_future&& _Right) noexcept { // assign from shared_future rvalue object

        _Mybase::operator=(_STD move(_Right));
        return *this;
    }

    ~shared_future() noexcept { // destroy
    }

    const _Ty& get() const { // 阻塞,直到任務就緒。
        return this->_Get_value(); //返回值是個只讀對象!
    }
};
【「共享狀態」管理器】相關類的源碼摘要

  1. std::thread對應着系統的一個底層線程,期值和std::thread對象同樣,均可以視爲底層線程的句柄,也就是一個線程對象,管理着對應的底層線程。

  2. 調用get()時,會阻塞等待直至任務完成。但在future中,get()函數是經過移動語義將異步結果從future中轉移給get的返回值,所以該函數只能被調用一次,同時也意味着這個future對象也不可再使用(valid()爲false)。而shared_future的get()函數只是簡單地返回異步結果的引用因此能夠屢次被調用

  3. std::future是隻移動類型,而std::shared_future既可移動也可複製

  (1)二者的關係,就如同unique_ptr和shared_ptr。future獨佔「共享狀態」的全部權,而shared_future會共享全部權當調用future的share()函數時,將建立一個shared_future對象,同時原來的future將失去對「共享狀態」對象的全部權,這意味着該future對象不可再使用(其valid()爲false)。

  (2)shared_future可複製,多線程可共享「共享狀態」對象,可用於線程間的通訊。此外,在容器中保存期值時,通常使用shared_future類型而不是future類型,首先由於shared_future是可複製的,其次是由於future對象在使用get函數後將變成無效,會致使容器中保存着失效的future。

  4. future/shared_future析構時,會將其關聯的「共享狀態」對象的引用計數自減一次。當引用計數爲0時,會同時銷燬「共享狀態」對象。

  5. waitfor()函數返回值的三種狀態:

  (1)future_status::ready:共享狀態的標誌已經變爲 ready,即 Provider在共享狀態上設置了值或者異常。

  (2)future_status::timeout:超時,即在規定的時間內共享狀態的標誌沒有變爲ready。

  (3)future_status::deferred:共享狀態包含一個 deferred函數。當使用std::async建立std::launch::deferred任務時,waitfor函數的返回值不是ready,而是deferred!!!

(二)期值的析構行爲分析

  1. std::thread對象析構時,若是仍處於可聯結的狀態(未顯式調用join()或detach()),那麼程序會終止。而future/shared_future對象在析構,有時像是隱式join(),有時像隱式detach(),有時像是兩者都沒有執行

  2.期值的兩種析構行爲:

  (1)常規行爲:析構期值對象自己的成員變量並將其管理的「共享狀態」對象的引用計數自減一次注意,儘管期值的構析行爲有時相似於隱式join或detach,但實際上它並不對任何東西實施join或detach所以,當future配合std::thread使用時,仍須要顯式調用std::thread的join或detach以防止程序被終止。這從使用std::thread的角度看也很合理,必須顯式調用二者之一。

  (2)特殊行爲析構期值時會阻塞線程。根據前面的分析,只有當「共享狀態」對象爲_Task_async_state類型時,纔會阻塞線程直至異步任務結束。

  3. 結論:

  (1)只有持有_Task_async_state類型的「共享狀態」的期值纔會阻塞線程。所以,只有同時知足如下三個條件的期值纔會表現出特殊行爲,任何一個條件不知足時均只表現爲常規行爲。

    ①期值所指向的「共享狀態」由調用std::async函數建立

    ②任務以std::launch::async策略建立。

    ③該期值是最後一個指向該共享狀態的期值。對於std::future而言,因爲獨佔「共享狀態」對象的全部權,所以這一點老是成立。對於std::shared_future而言,「共享狀態」對象的全部權被shared_future共享,因此只有最後一個指向該對象的shared_future才表現出這種特殊行爲。

  (2)對於那些由std::async以std::launch::deferred建立的任務,在最後一個期值析構時仍沒調用get或wait,則被推遲的任務將不會再有機會運行了。由於最後一個期值將會把「共享狀態」對象銷燬。

【編程實驗】std::future和std::shared_future

#include <iostream>
#include <thread>
#include <future>
#include <chrono>
#include <mutex>
#include <vector>

using namespace std;

std::mutex g_mtx;

int func(int tm, std::promise<int>& pr)
{
    auto dura = std::chrono::milliseconds(tm * 100);
    std::this_thread::sleep_for(dura);
    pr.set_value(tm);

    return tm;
}

bool check_prime(int x) //判斷是不是質數
{
    for (int i = 2; i < x; ++i){
        if (x % i == 0)
            return false;
    }

    return true;
}

int calcValue(int x, int y)
{
    std::lock_guard<std::mutex> lck(g_mtx);
    cout<<"calcValue: x = " << x << ", y = " << y << endl;

    return x * x + y * y;
}

int main()
{
    //1.future和shared_future的使用
    //shared_future:用於讓多個線程同時等待信號
    std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
    std::shared_future<void> ready_future = ready_promise.get_future();

    std::chrono::time_point<std::chrono::high_resolution_clock> start;

    //注意,ready_future按值捕獲
    auto fun1 = [&, ready_future]()->std::chrono::duration<double, std::milli>
    {
        t1_ready_promise.set_value();
        ready_future.wait();  //等待,接受來自main()的信號
        return std::chrono::high_resolution_clock::now() - start; //返回start後多少時間才收到信號
    };

    auto fun2 = [&, ready_future]()->std::chrono::duration<double, std::milli>
    {
        t2_ready_promise.set_value();
        ready_future.wait(); //等待,接受來自main()的信號
        return std::chrono::high_resolution_clock::now() - start; //返回start後多少時間才收到信號
    };

    auto result1 = std::async(std::launch::async, fun1);
    auto result2 = std::async(std::launch::async, fun2);

    //等待子線程啓動完成:確保兩個子線程都己經運行,以防止主線程先發通知後子線程才運行。
    //兩個t1/t2_ready_promise經過set_value通知主線程,兩個子線程己所有啓動,並進入等待主線程的通知。
    t1_ready_promise.get_future().wait(); 
    t2_ready_promise.get_future().wait(); 

    start = std::chrono::high_resolution_clock::now();

    //向子線程發送信號
    ready_promise.set_value();
    std::cout << "Thread 1 received the signal " << result1.get().count() << " ms after start." << endl;
    std::cout << "Thread 2 received the signal " << result2.get().count() << " ms after start." << endl;

    //1.2 用容器保存std::shared_future。
    vector<std::shared_future<int>> vec;
    auto fut1 = std::async(std::launch::async, [](int a, int b) {return a + b;}, 2, 3);
    vec.push_back(fut1.share());
    std::cout << "The shared_future result is " <<vec[0].get() << endl;

    ////2. wait_for的返回值類型
    auto fut2 = std::async(std::launch::deferred, calcValue, 1, 2); //延時函數,同步任務
    auto fut3 = std::async(std::launch::async, []() {std::this_thread::sleep_for(1s); });//異步任務

    if (fut2.wait_for(0s) == std::future_status::deferred) {
        cout <<"fut2 is a deferred task!" << endl;
    }

    cout << "waiting";
    while (fut3.wait_for(20ms) != std::future_status::ready) { // std::future_status::timeout
        cout << ".";
    }
    cout << endl;

    //3. 期值的析構行爲
    //3.1常規行爲:期值析構時,並不對任何東西實施join或detach
    {
        std::promise<int> pr;
        std::future<int> fut = pr.get_future();

        std::thread th(func, 10, std::ref(pr)); 
        th.detach(); //必須確保在th析構前,調用join或detach

        auto res = fut.get();

        cout << res << endl;
    }

    {
        std::packaged_task<int(int, int)> pt(calcValue);
        auto fut = pt.get_future();
        std::thread th(std::move(pt), 1, 2); //pt是隻移動類型
        cout << fut2.get() << endl;
        th.join(); //th析構前,必須調用join()或detach(),可放入get以前或以後。
    }

    //3.2 特殊行爲
    {
        //由async建立的std::launch::async類型的任務,會在fut離開做用域阻塞等待任何完成(相似於隱式join)
        auto fut1 = std::async(std::launch::async, calcValue, 3, 4);
 
        //std::launch::deferred類型的任務,在fut2離開做用域時並不會阻塞等待(普通析構行爲!)。
        auto fut2 = std::async(std::launch::deferred, check_prime, 123456);

        //若是不調用fut2.get(),任務永遠不會被執行。
        if (fut2.get()) {//阻塞等待子線程結束,並獲取任務的結果。
            cout << "123456 is prime." << endl;
        }
        else {
            cout << "123456 is not prime." << endl;
        }
    } 

    return 0;
}
/*輸出結果
Thread 1 received the signal 0.0199 ms after start.
Thread 2 received the signal 0.0139 ms after start.
The shared_future result is 5
fut2 is a deferred task!
waiting...............................................
10
calcValue: x = 1, y = 2
5
calcValue: x = 1, y = 2
123456 is not prime.
calcValue: x = 3, y = 4
*/
相關文章
相關標籤/搜索