This article is based on Boost 1.69. When quoting source code, blocks that are deprecated, belong to other configurations, or are unrelated to the topic have been trimmed.

This installment discusses the concurrent programming practices found in Asio, again working from the source code.
scheduler
An operation queue inevitably has to deal with multithreading: the relationship between the operation queue and threads, the thread safety of the queue itself, and the execution of operations in a multithreaded environment.
`call_stack` and `context`. Reading the source shows that `call_stack` holds a static data member `top_` of type `tss_ptr<context>`, where `tss_ptr` is a thread-specific-storage pointer; on Unix platforms it binds an address to a thread-specific key through the `::pthread_xxxxxx` interfaces. `context` is a nested class of `call_stack`. Interestingly, `context`'s constructor is a push operation and its destructor a pop operation, both acting on `top_`.
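To make the mechanism concrete, here is a simplified sketch of the `call_stack`/`context` idea (assumed names, using C++11 `thread_local` in place of Asio's `tss_ptr`; not Asio's exact code): each thread keeps a linked stack of frames, a `context` pushes a frame on construction and pops it on destruction, and `contains` answers "is this key somewhere on the current thread's stack?" with a cheap thread-local lookup.

```cpp
// Simplified sketch of the call_stack idea, not Asio's exact code.
template <typename Key, typename Value>
class call_stack_sketch
{
public:
  class context
  {
  public:
    context(Key* key, Value& value)
      : key_(key), value_(&value), next_(top_)
    {
      top_ = this;  // constructor == push onto the thread's stack
    }

    ~context() { top_ = next_; }  // destructor == pop

  private:
    friend class call_stack_sketch;
    Key* key_;
    Value* value_;
    context* next_;
  };

  // Returns the associated value if key is on the current thread's stack.
  static Value* contains(Key* key)
  {
    for (context* c = top_; c; c = c->next_)
      if (c->key_ == key)
        return c->value_;
    return nullptr;
  }

private:
  static thread_local context* top_;  // Asio stores this in a tss_ptr<context>
};

template <typename Key, typename Value>
thread_local typename call_stack_sketch<Key, Value>::context*
    call_stack_sketch<Key, Value>::top_ = nullptr;
```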
`conditionally_enabled_mutex` and `conditionally_enabled_event`. Built on `std::condition_variable` (or a similar implementation), they provide the usual thread-control facilities. `conditionally_enabled_mutex` additionally wraps a data member `enabled_`; when `enabled_` is false, the corresponding operations become no-ops.
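A minimal sketch of the idea behind `conditionally_enabled_mutex` (assumed simplified interface, not Asio's exact code): when the mutex is disabled, e.g. because a concurrency hint promises single-threaded use, locking costs nothing.

```cpp
#include <mutex>

// Sketch: a mutex whose operations are skipped when disabled.
class conditional_mutex
{
public:
  explicit conditional_mutex(bool enabled) : enabled_(enabled) {}

  void lock()   { if (enabled_) mutex_.lock(); }
  void unlock() { if (enabled_) mutex_.unlock(); }

private:
  const bool enabled_;
  std::mutex mutex_;
};
```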
The scheduling process can be analyzed from two angles: users submitting tasks (producing), and the `io_context` event processing loop (consuming and producing).
The two typical internal interfaces through which Asio submits tasks are the `scheduler::post_immediate_completion` function (for general tasks, as the source of `boost::asio::post` shows) and the `reactor::start_op` method (for I/O-related tasks, as the source of `basic_stream_socket` shows). Looking at the `scheduler::post_immediate_completion` source, the concurrency-related work is simple: take the lock, push the task onto the `scheduler` data member `op_queue_`, then unlock and wake one thread.
```cpp
// file: <boost/asio/detail/impl/scheduler.ipp>
...
void scheduler::post_immediate_completion(
    scheduler::operation* op, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
...
```
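For orientation, a minimal producer-side usage sketch (a hypothetical toy program, not from Asio's sources): user code calls `boost::asio::post`, which ultimately funnels into `post_immediate_completion` above.

```cpp
#include <boost/asio.hpp>
#include <iostream>

int main()
{
  boost::asio::io_context io;

  // post() wraps the lambda in an operation and hands it to the scheduler,
  // which locks mutex_, pushes it onto op_queue_ and wakes one thread.
  boost::asio::post(io, []{ std::cout << "posted task\n"; });

  io.run();  // the event processing loop consumes op_queue_
}
```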
Now look at the `reactor::start_op` source. Note that the RAII-style mutex wrapper `descriptor_lock` acquires a per-descriptor lock, so `reactor::start_op` calls on different sockets can run in parallel. Since this article's topic is concurrency, the body of `reactor::start_op` is not covered in detail here. Note the trailing `scheduler_.work_started()`, which merely executes `++outstanding_work_`.
```cpp
// file: <boost/asio/detail/impl/epoll_reactor.ipp>
...
void epoll_reactor::start_op(int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  if (!descriptor_data)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      if (descriptor_data->try_speculative_[op_type])
      {
        if (reactor_op::status status = op->perform())
        {
          if (status == reactor_op::done_and_exhausted)
            if (descriptor_data->registered_events_ != 0)
              descriptor_data->try_speculative_[op_type] = false;
          descriptor_lock.unlock();
          scheduler_.post_immediate_completion(op, is_continuation);
          return;
        }
      }

      if (descriptor_data->registered_events_ == 0)
      {
        op->ec_ = boost::asio::error::operation_not_supported;
        scheduler_.post_immediate_completion(op, is_continuation);
        return;
      }

      if (op_type == write_op)
      {
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            op->ec_ = boost::system::error_code(errno,
                boost::asio::error::get_system_category());
            scheduler_.post_immediate_completion(op, is_continuation);
            return;
          }
        }
      }
    }
    else if (descriptor_data->registered_events_ == 0)
    {
      op->ec_ = boost::asio::error::operation_not_supported;
      scheduler_.post_immediate_completion(op, is_continuation);
      return;
    }
    else
    {
      if (op_type == write_op)
      {
        descriptor_data->registered_events_ |= EPOLLOUT;
      }

      epoll_event ev = { 0, { 0 } };
      ev.events = descriptor_data->registered_events_;
      ev.data.ptr = descriptor_data;
      epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}
...
```
Next comes the `io_context` event processing loop, which is responsible for "consuming and producing". The loop mainly calls its member's `scheduler::run`. Starting from `scheduler::run`, the operation queue is driven as follows:

1. Create the local variable `this_thread` (with member `private_op_queue`).
2. Push the `scheduler` address and the address of the local `this_thread` onto the call stack.
3. Lock `mutex_`, where `mutex_` is a `scheduler` data member.
4. Loop on `do_run_one`, re-locking `mutex_` between iterations.
5. On exit, pop the `scheduler` address and the local `this_thread` address off the call stack.

Now let's analyze the execution of `scheduler::do_run_one`:
When the scheduler's operation queue `op_queue_` is not empty:

- Take the front element `o` of `op_queue_` and pop it.
- If `o` equals `&task_operation_`: if there are more handlers and more than one thread, `unlock_and_signal_one`; otherwise just `unlock`. What remains can run concurrently with other threads:
  - construct a `task_cleanup` instance;
  - call `reactor::run`, passing the thread-private queue as the operation queue;
  - on scope exit, the `task_cleanup` instance destructs and performs cleanup (analyzed below).
- If `o` is not `&task_operation_`: if there are more handlers and more than one thread, `wake_one_thread_and_unlock`; otherwise just `unlock`. What remains can run concurrently with other threads:
  - construct a `work_cleanup` instance;
  - call `o->complete` (completing the operation that was at the front of the queue);
  - on scope exit, the `work_cleanup` instance destructs and performs cleanup.

When the scheduler's operation queue `op_queue_` is empty:

- `wakeup_event_` clear and wait, until another thread wakes this one.

A word about the `task_cleanup` class: inspecting the source shows its only member function is the destructor, which also carries its main functionality:

- increment `scheduler_->outstanding_work_` by `this_thread_->private_outstanding_work` (the latter a non-atomic type);
- perform `scheduler_->op_queue_.push(this_thread_->private_op_queue)` and related operations (re-inserting `task_operation_` at the back of the queue).

`work_cleanup` differs slightly; readers are invited to consult the source.
```cpp
// file: <boost/asio/detail/impl/scheduler.ipp>
...
std::size_t scheduler::run(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}
...
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
      }
      else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);

        return 1;
      }
    }
    else
    {
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}
...
~task_cleanup()
{
  if (this_thread_->private_outstanding_work > 0)
  {
    boost::asio::detail::increment(
        scheduler_->outstanding_work_,
        this_thread_->private_outstanding_work);
  }
  this_thread_->private_outstanding_work = 0;

  // Enqueue the completed operations and reinsert the task at the end of
  // the operation queue.
  lock_->lock();
  scheduler_->task_interrupted_ = true;
  scheduler_->op_queue_.push(this_thread_->private_op_queue);
  scheduler_->op_queue_.push(&scheduler_->task_operation_);
}
...
```
Studying the `scheduler` source, its concurrency characteristics are as follows:

- Operations on the `scheduler` data member `op_queue_` must hold the scheduler's own lock; they cannot run concurrently.
- Operations on the `scheduler` data member `outstanding_work_` (an atomic type) are atomic.
- The queue argument to `reactor::run` is a thread-private queue; the `epoll_wait` inside it executes concurrently.
- `reactor::start_op` needs the per-descriptor lock; calls on different descriptors can run concurrently.

Notably, almost every operation on `op_queue_` must be performed under the mutex, which does not sound very "concurrent". Boost has a lock-free queue implementation; although it avoids locks, such algorithms usually perform worse in practice than lock-based ones. Moreover, the `scheduler` lock is held only for the short span of fetching the front element (a pointer) and popping it; executing the user's handler requires no lock, so overall the concurrency is not bad.
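For comparison, this is roughly what the lock-free alternative mentioned above looks like (a toy sketch using Boost.Lockfree, not something Asio actually does): progress is made through atomic compare-and-swap loops instead of a mutex.

```cpp
#include <boost/lockfree/queue.hpp>
#include <atomic>
#include <thread>

int main()
{
  boost::lockfree::queue<int> queue(1024);  // lock-free queue, 1024 preallocated nodes
  std::atomic<int> consumed{0};

  std::thread producer([&]
  {
    for (int i = 0; i < 10000; ++i)
      while (!queue.push(i))  // retry until the node is enqueued
        ;
  });

  std::thread consumer([&]
  {
    int value;
    while (consumed.load() < 10000)
      if (queue.pop(value))   // no mutex involved; internally CAS-based
        consumed.fetch_add(1);
  });

  producer.join();
  consumer.join();
}
```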
When the user's operations must be mutually exclusive, this can be accomplished with a strand. Mutually exclusive operations are submitted via `strand::dispatch`, implemented by `detail::strand_executor_service::dispatch`, which proceeds as follows:

1. Call `strand_executor_service::enqueue` and save the return value in `first`.
2. If `first` is true, dispatch the strand implementation wrapped by the `invoker` class.

```cpp
// file: <boost/asio/strand.hpp>
...
template <typename Function, typename Allocator>
void dispatch(BOOST_ASIO_MOVE_ARG(Function) f, const Allocator& a) const
{
  detail::strand_executor_service::dispatch(impl_,
      executor_, BOOST_ASIO_MOVE_CAST(Function)(f), a);
}
...
// file: <boost/asio/detail/impl/strand_executor_service.hpp>
...
template <typename Executor, typename Function, typename Allocator>
void strand_executor_service::dispatch(const implementation_type& impl,
    Executor& ex, BOOST_ASIO_MOVE_ARG(Function) function, const Allocator& a)
{
  typedef typename decay<Function>::type function_type;

  // If we are already in the strand then the function can run immediately.
  if (call_stack<strand_impl>::contains(impl.get()))
  {
    // Make a local, non-const copy of the function.
    function_type tmp(BOOST_ASIO_MOVE_CAST(Function)(function));

    fenced_block b(fenced_block::full);
    boost_asio_handler_invoke_helpers::invoke(tmp, tmp);
    return;
  }

  // Allocate and construct an operation to wrap the function.
  typedef executor_op<function_type, Allocator> op;
  typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(function), a);

  BOOST_ASIO_HANDLER_CREATION((impl->service_->context(), *p.p,
        "strand_executor", impl.get(), 0, "dispatch"));

  // Add the function to the strand and schedule the strand if required.
  bool first = enqueue(impl, p.p);
  p.v = p.p = 0;
  if (first)
    ex.dispatch(invoker<Executor>(impl, ex), a);
}
...
```
Continuing from above, the key functions are `strand_executor_service::enqueue` and `invoker::operator()`. Specifically:

- `strand_executor_service::enqueue` pushes the operation onto the queue under the lock, and by testing and setting a bool determines whether the caller is the first to acquire the strand "lock".
- `invoker::operator()`:
  1. Pushes the `strand_impl` onto `call_stack<strand_impl>`.
  2. Executes all operations in `ready_queue_`. Note that thanks to `call_stack<strand_impl>`, if an operation calls `dispatch` on the same `strand_impl` during its execution, the dispatched operation runs immediately.
  3. Invokes the `on_invoker_exit` destructor, which:
     - moves the members of `waiting_queue_` into `ready_queue_`;
     - if `ready_queue_` is empty, clears `locked_` (indicating that, as the "current first" holder of the lock, the related work is done);
     - if `ready_queue_` is not empty, posts the `invoker`.
```cpp
// file: <boost/asio/detail/impl/strand_executor_service.ipp>
...
bool strand_executor_service::enqueue(const implementation_type& impl,
    scheduler_operation* op)
{
  impl->mutex_->lock();
  if (impl->shutdown_)
  {
    impl->mutex_->unlock();
    op->destroy();
    return false;
  }
  else if (impl->locked_)
  {
    // Some other function already holds the strand lock. Enqueue for later.
    impl->waiting_queue_.push(op);
    impl->mutex_->unlock();
    return false;
  }
  else
  {
    // The function is acquiring the strand lock and so is responsible for
    // scheduling the strand.
    impl->locked_ = true;
    impl->mutex_->unlock();
    impl->ready_queue_.push(op);
    return true;
  }
}
...
// file: <boost/asio/detail/impl/strand_executor_service.hpp>
~on_invoker_exit()
{
  this_->impl_->mutex_->lock();
  this_->impl_->ready_queue_.push(this_->impl_->waiting_queue_);
  bool more_handlers = this_->impl_->locked_ =
    !this_->impl_->ready_queue_.empty();
  this_->impl_->mutex_->unlock();

  if (more_handlers)
  {
    Executor ex(this_->work_.get_executor());
    recycling_allocator<void> allocator;
    ex.post(BOOST_ASIO_MOVE_CAST(invoker)(*this_), allocator);
  }
}
...
void operator()()
{
  // Indicate that this strand is executing on the current thread.
  call_stack<strand_impl>::context ctx(impl_.get());

  // Ensure the next handler, if any, is scheduled on block exit.
  on_invoker_exit on_exit = { this };
  (void)on_exit;

  // Run all ready handlers. No lock is required since the ready queue is
  // accessed only within the strand.
  boost::system::error_code ec;
  while (scheduler_operation* o = impl_->ready_queue_.front())
  {
    impl_->ready_queue_.pop();
    o->complete(impl_.get(), ec, 0);
  }
}
...
```
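A minimal usage sketch of the strand discussed above (a hypothetical toy program, not from Asio's sources): handlers posted through the same strand never run concurrently, even when several threads run the `io_context`.

```cpp
#include <boost/asio.hpp>
#include <thread>
#include <vector>

int main()
{
  boost::asio::io_context io;
  boost::asio::strand<boost::asio::io_context::executor_type>
      strand(io.get_executor());

  int counter = 0;  // deliberately unsynchronized; the strand serializes access

  for (int i = 0; i < 1000; ++i)
    boost::asio::post(strand, [&counter]{ ++counter; });

  std::vector<std::thread> pool;
  for (int i = 0; i < 4; ++i)
    pool.emplace_back([&io]{ io.run(); });
  for (auto& t : pool)
    t.join();

  // counter == 1000: the increments were executed strictly one after another.
}
```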
In short, with a strand multiple threads acquire the strand lock and enqueue their operations, and some single thread dispatches the strand (as an op that itself contains ops). Let's look at the strand's impact on concurrency:

(todo: since the author's understanding of Asio is not yet deep enough, this part remains unfinished)
Recall the source of `executor_op::do_complete`: before invoking `handler` it constructs a `fenced_block` instance; this is concurrency-related code. The std version of `fenced_block` is shown below. The class is simple: it mainly calls (or does not call) `std::atomic_thread_fence` on construction and destruction. That function establishes memory synchronization ordering. Searching the Asio sources shows that the various `xxxxxxx_op` classes construct `fenced_block b(fenced_block::half)` before executing `complete`, while the `dispatch` member functions of `io_context::executor_type`, `strand_service`, and `thread_pool` may execute the operation directly, constructing `fenced_block b(fenced_block::full)` before doing so. Abstractly, a fence constrains the ordering of memory operations on either side of it, given that the CPU or compiler may reorder them for optimization; this way, other threads observe this thread's memory side effects in a certain order.
To explain the role of fences in Asio, some related background is needed.

implicit strand, quoting the official documentation:
Where there is a single chain of asynchronous operations associated with a connection (e.g. in a half duplex protocol implementation like HTTP) there is no possibility of concurrent execution of the handlers. This is an implicit strand.
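A sketch of such a single chain (the canonical half-duplex echo pattern; the class and member names here are illustrative): each handler starts at most one new asynchronous operation, so no two handlers of the same session can ever run concurrently, even on a multithreaded `io_context`.

```cpp
#include <boost/asio.hpp>
#include <array>
#include <cstddef>
#include <memory>

using boost::asio::ip::tcp;

// One chain of asynchronous operations per connection: read -> write -> read.
// At most one handler of this session is outstanding at any time, so the
// chain forms an implicit strand and needs no explicit synchronization.
class echo_session : public std::enable_shared_from_this<echo_session>
{
public:
  explicit echo_session(tcp::socket socket) : socket_(std::move(socket)) {}

  void start() { do_read(); }

private:
  void do_read()
  {
    auto self = shared_from_this();
    socket_.async_read_some(boost::asio::buffer(data_),
        [this, self](boost::system::error_code ec, std::size_t n)
        {
          if (!ec)
            do_write(n);  // next link in the chain; nothing runs concurrently
        });
  }

  void do_write(std::size_t n)
  {
    auto self = shared_from_this();
    boost::asio::async_write(socket_, boost::asio::buffer(data_, n),
        [this, self](boost::system::error_code ec, std::size_t)
        {
          if (!ec)
            do_read();
        });
  }

  tcp::socket socket_;
  std::array<char, 1024> data_;
};
```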
Concurrency hints. When the concurrency hint is BOOST_ASIO_CONCURRENCY_HINT_UNSAFE, some mutexes are not used, yet running multiple threads is still possible; in that case the user needs extra measures to keep the io_context's internal state safe. The official documentation says:
BOOST_ASIO_CONCURRENCY_HINT_UNSAFE — This special concurrency hint disables locking in both the scheduler and reactor I/O. This hint has the following restrictions:
— Care must be taken to ensure that all operations on the io_context and any of its associated I/O objects (such as sockets and timers) occur in only one thread at a time.
— Asynchronous resolve operations fail with operation_not_supported.
— If a signal_set is used with the io_context, signal_set objects cannot be used with any other io_context in the program.
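For illustration, a minimal sketch of passing this hint (a toy single-threaded program; the `io_context` constructor overload taking a concurrency hint is documented):

```cpp
#include <boost/asio.hpp>

int main()
{
  // Promise single-threaded use so Asio can skip internal locking.
  boost::asio::io_context io(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE);

  boost::asio::post(io, []{ /* all handlers run on this one thread */ });

  io.run();  // the only thread allowed to touch io and its I/O objects
}
```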
When the scheduler and reactor have their mutexes disabled, and the user's operations form an implicit strand, how does Asio guarantee that data modified by handler A on thread 1 is visible to a subsequent handler B running on thread 2? Consider a possible execution of handler A and handler B:

1. Thread 1 runs handler A inside a `fenced_block b(fenced_block::half);`.
2. When the `fenced_block` destructs, it executes `std::atomic_thread_fence(std::memory_order_release);`.
3. Thread 1 then performs an atomic write, e.g. to `outstanding_work_`, or inside the `xxxx_cleanup` destructor that follows the previous task.
4. Thread 2 performs an atomic read of that variable before running handler B.

Note the order: handler A's writes → fence at destruction → atomic write; atomic read → handler B's reads. This is exactly fence-atomic synchronization, which guarantees that B sees the data written by A.
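A self-contained sketch of this fence-atomic synchronization (the names `data` and `outstanding` are illustrative, not Asio's; the acquire load stands in for the atomic read performed on thread 2):

```cpp
#include <atomic>
#include <cassert>
#include <thread>

int data = 0;                     // plain data written by "handler A"
std::atomic<int> outstanding{0};  // stands in for outstanding_work_

void thread1_handler_a()
{
  data = 42;                                            // handler A's write
  std::atomic_thread_fence(std::memory_order_release);  // ~fenced_block (half)
  outstanding.fetch_add(1, std::memory_order_relaxed);  // atomic write after the fence
}

void thread2_handler_b()
{
  while (outstanding.load(std::memory_order_acquire) == 0)
    ;                  // spin until the atomic read observes the write
  assert(data == 42);  // guaranteed by fence-atomic synchronization
}

int main()
{
  std::thread t1(thread1_handler_a), t2(thread2_handler_b);
  t1.join();
  t2.join();
}
```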
(todo) We will further consider the application of `fenced_block b(fenced_block::full);` in Asio.
```cpp
// file: <boost/asio/detail/executor_op.hpp>
...
static void do_complete(void* owner, Operation* base,
    const boost::system::error_code& /*ec*/,
    std::size_t /*bytes_transferred*/)
{
  ...
  // Make the upcall if required.
  if (owner)
  {
    fenced_block b(fenced_block::half);
    BOOST_ASIO_HANDLER_INVOCATION_BEGIN(());
    boost_asio_handler_invoke_helpers::invoke(handler, handler);
    BOOST_ASIO_HANDLER_INVOCATION_END;
  }
}
...
// file: <boost/asio/detail/std_fenced_block.hpp>
...
class std_fenced_block
  : private noncopyable
{
public:
  enum half_t { half };
  enum full_t { full };

  // Constructor for a half fenced block.
  explicit std_fenced_block(half_t)
  {
  }

  // Constructor for a full fenced block.
  explicit std_fenced_block(full_t)
  {
    std::atomic_thread_fence(std::memory_order_acquire);
  }

  // Destructor.
  ~std_fenced_block()
  {
    std::atomic_thread_fence(std::memory_order_release);
  }
};
...
// file: <boost/asio/impl/io_context.hpp>
template <typename Function, typename Allocator>
void io_context::executor_type::dispatch(
    BOOST_ASIO_MOVE_ARG(Function) f, const Allocator& a) const
{
  typedef typename decay<Function>::type function_type;

  // Invoke immediately if we are already inside the thread pool.
  if (io_context_.impl_.can_dispatch())
  {
    // Make a local, non-const copy of the function.
    function_type tmp(BOOST_ASIO_MOVE_CAST(Function)(f));

    detail::fenced_block b(detail::fenced_block::full);
    boost_asio_handler_invoke_helpers::invoke(tmp, tmp);
    return;
  }

  // Allocate and construct an operation to wrap the function.
  ....
}
...
```
The `io_context` constructor accepts a parameter called the concurrency hint, which affects the concurrency characteristics of the `io_context`. See the official documentation for details. From this we can summarize the division of labor over thread safety:

What Asio guarantees:

- the thread safety of the `io_context`'s internal state, or, in the other configurations, telling the user how to ensure that safety.

What the user is responsible for:

- following those rules, e.g. under BOOST_ASIO_CONCURRENCY_HINT_UNSAFE keeping all operations on the `io_context` and its I/O objects in one thread at a time, so that the `io_context`'s internal state stays safe.