Boost.Asio源碼閱讀(3): concurrency

本文基於Boost 1.69,在展現源代碼時刪減了部分deprecated或者不一樣配置的與本文主題無關的代碼塊。html

簡介

本期討論的是Asio中涉及的併發編程實踐,依舊是基於源代碼進行解析。react

多線程技術

scheduler多線程調度

scheduler操做隊列不可避免的要考慮多線程的問題:操做隊列與線程的關係,操做隊列的線程安全問題以及操做在多線程環境的執行。算法

工具類

call_stack and context。查看源代碼可知,call_stack包含一個tss_ptr<context>類型的靜態數據成員top_,其中tss_ptr爲thread specific storage指針,在Unix平臺經過::pthread_xxxxxx接口將某個地址與Thread-specific key綁定;contextcall_stack的嵌套類,有趣的是,context的構造函數是一個push操做,而析構函數是pop操做,操做對象是top_編程

conditionally_enabled_mutex and conditionally_enabled_event。基於std::condition_variable(或其它相似的實現),實現了一些常見的線程控制功能。conditionally_enabled_mutex額外包裝了一個數據成員enabled_,當enabled_等於false時,不進行相應的操做。安全

調度過程解析

調度過程從兩個角度去分析,(生產)用戶提交任務和(消費並生產)io_context的event processing loop。多線程

Asio提交任務的兩個典型的內部接口是scheduler::post_immediate_completion函數(用於提交通常性任務,查看boost::asio::post源碼可知)和reactor::start_op(用於提交io相關任務,查看basic_stream_socket源碼可知)方法。查看scheduler::post_immediate_completion源碼,涉及到併發的操做很簡單,加鎖,將任務放入scheduler數據成員op_queue_,解鎖並喚醒一個線程。併發

// file: <boost/asio/detail/impl/scheduler.ipp>
...
void scheduler::post_immediate_completion(
    scheduler::operation* op, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
...

查看reactor::start_op源碼。注意到RAII風格的互斥包裝器descriptor_lock獲取的是對於某個descriptor的鎖。針對不一樣的socket的reactor::start_op能夠並行執行。本文的主題是concurrency,因此reactor::start_op函數體這裏不作過多的介紹。注意末尾的scheduler_.work_started,該函數僅僅執行++outstanding_work_異步

// file: <boost/asio/detail/impl/epoll_reactor.ipp>
...
void epoll_reactor::start_op(int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  if (!descriptor_data)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      if (descriptor_data->try_speculative_[op_type])
      {
        if (reactor_op::status status = op->perform())
        {
          if (status == reactor_op::done_and_exhausted)
            if (descriptor_data->registered_events_ != 0)
              descriptor_data->try_speculative_[op_type] = false;
          descriptor_lock.unlock();
          scheduler_.post_immediate_completion(op, is_continuation);
          return;
        }
      }

      if (descriptor_data->registered_events_ == 0)
      {
        op->ec_ = boost::asio::error::operation_not_supported;
        scheduler_.post_immediate_completion(op, is_continuation);
        return;
      }

      if (op_type == write_op)
      {
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            op->ec_ = boost::system::error_code(errno,
                boost::asio::error::get_system_category());
            scheduler_.post_immediate_completion(op, is_continuation);
            return;
          }
        }
      }
    }
    else if (descriptor_data->registered_events_ == 0)
    {
      op->ec_ = boost::asio::error::operation_not_supported;
      scheduler_.post_immediate_completion(op, is_continuation);
      return;
    }
    else
    {
      if (op_type == write_op)
      {
        descriptor_data->registered_events_ |= EPOLLOUT;
      }

      epoll_event ev = { 0, { 0 } };
      ev.events = descriptor_data->registered_events_;
      ev.data.ptr = descriptor_data;
      epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}
...

接下來是負責「消費和生產」的io_context的event processing loop。loop主要調用其成員的scheduler::run。從scheduler::run入手瞭解操做隊列的調用過程:socket

  1. 聲明本地變量this_thread(成員private_op_queue
  2. scheduler地址和local變量this_thread地址入棧
  3. lock mutex_,其中mutex_scheduler數據成員
  4. 調用do_run_one,lock mutex_,循環
  5. RAII,scheduler地址和local變量this_thread地址出棧

scheduler::do_run_one。如今來分析scheduler::do_run_one的執行過程:async

  1. scheduler的操做隊列op_queue_不爲空時

    1. 複製op_queue_頂部成員o並pop op_queue_
    2. 若是o等於&task_operation_

      1. 若是還有更多任務而且多線程的狀況下unlock_and_signal_one,不然unlock。剩下的部分能夠併發執行:
      2. 初始化task_cleanup實例
      3. 執行reactor::run,傳入的操做隊列爲線程私有隊列
      4. task_cleanup實例析構,cleanup(下文解析)
    3. 若是o不等於&task_operation_

      1. 若是還有更多任務而且多線程的狀況下unlock_and_signal_one,不然unlock。剩下的部分能夠併發執行:
      2. 初始化work_cleanup實例
      3. 執行o->complete(完成操做隊列首位的操做)
      4. work_cleanup實例析構,cleanup
  2. scheduler的操做隊列op_queue_爲空時

    1. wakeup_event_ clear and wait,等待其餘線程喚醒本線程

介紹一下task_cleanup類,查看源碼發現task_cleanup惟一的成員函數爲析構函數,主要功能也由其實現:

  1. 對(原子類型)scheduler_->outstanding_work_進行increment(非原子類型)this_thread_->private_outstanding_work操做。
  2. 加鎖並執行scheduler_->op_queue_.push(this_thread_->private_op_queue)等操做。

work_cleanup略微不一樣,讀者請自行閱讀源碼瞭解。

// file: <boost/asio/detail/impl/scheduler.ipp>
...
std::size_t scheduler::run(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}
...
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
      }
      else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);

        return 1;
      }
    }
    else
    {
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}
...
  ~task_cleanup()
  {
    if (this_thread_->private_outstanding_work > 0)
    {
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue.
    lock_->lock();
    scheduler_->task_interrupted_ = true;
    scheduler_->op_queue_.push(this_thread_->private_op_queue);
    scheduler_->op_queue_.push(&scheduler_->task_operation_);
  }
...

總結

學習scheduler源碼發現,其併發特性以下:

  1. 全部針對scheduler數據成員op_queue_的操做必須獲取scheduler自身的鎖來完成,沒法併發
  2. 針對scheduler數據成員(原子類型)outstanding_work_的操做爲原子操做
  3. reactor::run 的隊列參數爲線程私有隊列,其內部epoll_wait併發執行。
  4. reactor::start_op 須要獲取descriptor的鎖,不一樣descriptor之間能夠併發執行。

值得注意的是關於op_queue_的幾乎全部操做都須要在加鎖互斥的狀況下完成,這聽上去有些不怎麼「併發」。Boost有一個lockfree隊列實現,雖然能夠避免鎖的使用,然而這種算法在實際運用中一般比基於鎖的算法表現更差。並且scheduler鎖只是在op_queue_獲取元素(指針)及pop元素的這一個較短的時間段內持有,用戶操做的執行並不須要鎖,綜合來看併發能力也不算差。

strand

當咱們要求用戶的多個操做互斥時,能夠經過strand完成。咱們能夠經過strand::dispatch提交互斥操做,具體實現爲detail::strand_executor_service::dispatch,其執行過程以下:

  1. 判斷是否在strand內,若是是直接執行操做並返回
  2. 包裝操做並strand_executor_service::enqueue,將返回值保存於first
  3. first爲真則dispatch被invoker類包裝的strand implementation。
// file: <boost/asio/strand.hpp>
...
  template <typename Function, typename Allocator>
  void dispatch(BOOST_ASIO_MOVE_ARG(Function) f, const Allocator& a) const
  {
    detail::strand_executor_service::dispatch(impl_,
        executor_, BOOST_ASIO_MOVE_CAST(Function)(f), a);
  }
...
// file: <boost/asio/detail/impl/strand_executor_service.hpp>
...
template <typename Executor, typename Function, typename Allocator>
void strand_executor_service::dispatch(const implementation_type& impl,
    Executor& ex, BOOST_ASIO_MOVE_ARG(Function) function, const Allocator& a)
{
  typedef typename decay<Function>::type function_type;

  // If we are already in the strand then the function can run immediately.
  if (call_stack<strand_impl>::contains(impl.get()))
  {
    // Make a local, non-const copy of the function.
    function_type tmp(BOOST_ASIO_MOVE_CAST(Function)(function));

    fenced_block b(fenced_block::full);
    boost_asio_handler_invoke_helpers::invoke(tmp, tmp);
    return;
  }

  // Allocate and construct an operation to wrap the function.
  typedef executor_op<function_type, Allocator> op;
  typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(function), a);

  BOOST_ASIO_HANDLER_CREATION((impl->service_->context(), *p.p,
        "strand_executor", impl.get(), 0, "dispatch"));

  // Add the function to the strand and schedule the strand if required.
  bool first = enqueue(impl, p.p);
  p.v = p.p = 0;
  if (first)
    ex.dispatch(invoker<Executor>(impl, ex), a);
}
...

接上文,關鍵函數爲strand_executor_service::enqueueinvoker::operator()。其中:

  • strand_executor_service::enqueue負責在加鎖狀態下操做入列,並經過對一個bool變量的斷定和賦值來肯定是否第一個獲取鎖
  • invoker::operator()

    1. strand_impl入棧call_stack<strand_impl>
    2. 按順序執行ready_queue_內全部操做,注意因爲call_stack<strand_impl>的使用,若是一個操做在執行過程調用了同一個strand_impl的dispatch,則被dispatch的操做會當即執行
    3. 調用on_invoker_exit析構函數:

      1. 加鎖
      2. waiting_queue_的成員移動到ready_queue_
      3. ready_queue_爲空則清除locked_(代表做爲"當前第一個"獲取鎖的線程,相關工做已經完成)
      4. 釋放鎖
      5. 若是(加鎖狀態下)剛纔判斷ready_queue_不爲空則post invoker
// file: <boost/asio/detail/impl/strand_executor_service.ipp>
...
bool strand_executor_service::enqueue(const implementation_type& impl,
    scheduler_operation* op)
{
  impl->mutex_->lock();
  if (impl->shutdown_)
  {
    impl->mutex_->unlock();
    op->destroy();
    return false;
  }
  else if (impl->locked_)
  {
    // Some other function already holds the strand lock. Enqueue for later.
    impl->waiting_queue_.push(op);
    impl->mutex_->unlock();
    return false;
  }
  else
  {
    // The function is acquiring the strand lock and so is responsible for
    // scheduling the strand.
    impl->locked_ = true;
    impl->mutex_->unlock();
    impl->ready_queue_.push(op);
    return true;
  }
}
...
// file: <boost/asio/detail/impl/strand_executor_service.hpp>
~on_invoker_exit()
{
    this_->impl_->mutex_->lock();
    this_->impl_->ready_queue_.push(this_->impl_->waiting_queue_);
    bool more_handlers = this_->impl_->locked_ =
    !this_->impl_->ready_queue_.empty();
    this_->impl_->mutex_->unlock();

    if (more_handlers)
    {
    Executor ex(this_->work_.get_executor());
    recycling_allocator<void> allocator;
    ex.post(BOOST_ASIO_MOVE_CAST(invoker)(*this_), allocator);
    }
}
...
  void operator()()
  {
    // Indicate that this strand is executing on the current thread.
    call_stack<strand_impl>::context ctx(impl_.get());

    // Ensure the next handler, if any, is scheduled on block exit.
    on_invoker_exit on_exit = { this };
    (void)on_exit;

    // Run all ready handlers. No lock is required since the ready queue is
    // accessed only within the strand.
    boost::system::error_code ec;
    while (scheduler_operation* o = impl_->ready_queue_.front())
    {
      impl_->ready_queue_.pop();
      o->complete(impl_.get(), ec, 0);
    }
  }
...

總結

strand簡單來講就是多個線程獲取strand鎖而後將操做加入隊列,由某一個線程來dispatch strand (as op and contains op)。咱們來看看strand對併發的影響:

  • 對於運行在strand內(即操做保存在strand的隊列)的操做來講,顯然的,是按順序執行。
  • 鎖。運行strand不可避免的增長了額外的鎖的操做,因爲strand包含兩個隊列('ready', 'wait')的多線程執行邏輯,持有鎖的時間略微增長,但主要規律與上文相同,即只是在處理隊列(且隊列成員類型爲指針,開銷小)時加鎖,操做在執行過程當中不須要加鎖。

memory_order

(todo 因爲筆者對於Asio理解不夠深刻,這部份內容處於未完成狀態)

回顧一下executor_op::do_complete的源碼,在調用handler以前構造了一個fenced_block實例,這是與併發相關的代碼。std版本的fenced_block源碼以下,類的代碼比較簡單,其主要在構造和析構時調用(或不調用)函數std::atomic_thread_fence。該函數用於創建內存同步順序。全局搜索Asio源碼發現,xxxxxxx_op在執行complete以前構造fenced_block b(fenced_block::half),而io_context::executor_type, strand_service, thread_pool的成員函數dispatch可能直接執行操做,執行以前構造fenced_block b(fenced_block::full)。抽象的來講,fence的做用在於對fence先後的memory operations的順序進行某些限制,考慮到cpu或者編譯器可能爲了優化而打亂順序。這樣,其餘線程在觀察本線程對內存產生的side effect具備必定的順序。

爲了講解fence在Asio的做用,介紹一下其餘相關內容

implicit strand引用官網的說明:

Where there is a single chain of asynchronous operations associated with a connection (e.g. in a half duplex protocol implementation like HTTP) there is no possibility of concurrent execution of the handlers. This is an implicit strand.

Concurrency Hints。Concurrency Hints爲BOOST_ASIO_CONCURRENCY_HINT_UNSAFE時不使用部分mutex,但多線程運行仍然是可能的,這時用戶須要額外的操做來保證io_context內部狀態的安全性,官網說明以下

BOOST_ASIO_CONCURRENCY_HINT_UNSAFE

This special concurrency hint disables locking in both the scheduler and reactor I/O. This hint has the following restrictions:

— Care must be taken to ensure that all operations on the io_context and any of its associated I/O objects (such as sockets and timers) occur in only one thread at a time.

— Asynchronous resolve operations fail with operation_not_supported.

— If a signal_set is used with the io_context, signal_set objects cannot be used with any other io_context in the program.

當scheduler和reactor不啓用mutex,用戶的操做又符合implicit strand的狀況下,Asio如何保證handler A在thread 1修改數據後能被隨後的運行在thread 2的handler B看到呢?咱們來考慮一下handler A與handler B可能的執行過程

  1. [thread 1] fenced_block b(fenced_block::half);
  2. [thread 1] handler A執行寫入變量x
  3. [thread 1] handler A提交一個異步read加上回調handler B的操做
  4. [thread 1] fence析構std::atomic_thread_fence(std::memory_order_release);
  5. [thread 1] xxxx_cleanup析構函數(包含原子操做)
  6. [thread 1] .....
  7. [thread 2] 假如剛開始執行io_context::run,讀取原子對象outstanding_work_;或者執行前一個任務以後的xxxx_cleanup析構函數
  8. [thread 2] handler B讀取handler A寫入的變量x

注意這個順序:handler A的寫操做->fence析構->atomic write;atomic read->handler B的讀操做。這剛好是Fence-atomic synchronization。保證了B可以看到A寫入x的數據。

(todo)咱們再來考慮fenced_block b(fenced_block::full);在Asio的應用。

// file: <boost/asio/detail/executor_op.hpp>
...
  static void do_complete(void* owner, Operation* base,
      const boost::system::error_code& /*ec*/,
      std::size_t /*bytes_transferred*/)
  {
...
    // Make the upcall if required.
    if (owner)
    {
      fenced_block b(fenced_block::half);
      BOOST_ASIO_HANDLER_INVOCATION_BEGIN(());
      boost_asio_handler_invoke_helpers::invoke(handler, handler);
      BOOST_ASIO_HANDLER_INVOCATION_END;
    }
  }
...
// file: <boost/asio/detail/std_fenced_block.hpp>
...
class std_fenced_block
  : private noncopyable
{
public:
  enum half_t { half };
  enum full_t { full };

  // Constructor for a half fenced block.
  explicit std_fenced_block(half_t)
  {
  }

  // Constructor for a full fenced block.
  explicit std_fenced_block(full_t)
  {
    std::atomic_thread_fence(std::memory_order_acquire);
  }

  // Destructor.
  ~std_fenced_block()
  {
    std::atomic_thread_fence(std::memory_order_release);
  }
};
...
// file: <boost/asio/impl/io_context.hpp>
template <typename Function, typename Allocator>
void io_context::executor_type::dispatch(
    BOOST_ASIO_MOVE_ARG(Function) f, const Allocator& a) const
{
  typedef typename decay<Function>::type function_type;

  // Invoke immediately if we are already inside the thread pool.
  if (io_context_.impl_.can_dispatch())
  {
    // Make a local, non-const copy of the function.
    function_type tmp(BOOST_ASIO_MOVE_CAST(Function)(f));

    detail::fenced_block b(detail::fenced_block::full);
    boost_asio_handler_invoke_helpers::invoke(tmp, tmp);
    return;
  }

  // Allocate and construct an operation to wrap the function.
....
}
...

Concurrency Hints

io_context的構造函數接受一個名爲Concurrency Hints的參數,這個參數會影響io_context的併發特性。具體說明見官方,由此咱們能夠總結一下線程安全問題的分工:

  • Asio保證的是:

    1. 默認狀況下確保io_context內部狀態的線程安全,或者在其餘狀況下告知用戶如何確保這種安全性
    2. 實現strand(包括implicit strand)
  • 用戶的責任是:

    1. 保證操做在並行運行下的安全性,或者利用"strand"來避免操做並行運行
    2. 某些狀況下須要接受額外的限制來保證io_context內部狀態的安全
相關文章
相關標籤/搜索