boost::asio的io_service處理過程

1.主線程定義回調對象react

2.調用io object的操做windows

3.io object會另開線程,定義opertion op來執行操做,同時將回調對象加到op的do_complete上。進行操做網絡

4.完成操做加入完成隊列架構

5.io_service線程循環從完成隊列取事件,調用其事件對應的回調函數併發

 

 

 

 

 

Operation

還記得前面咱們在分析resolver的實現的時候,挖了一個關於operation的坑?爲了避免讓本身陷進去,如今來填吧;接下來咱們就來看看asio中的各類operationapp

 

和前面提到過的service的相似,這裏的operation也分爲兩大系:IOCP EnableDisable系列。這裏咱們重點關注下圖中紅色部分表示的IOCP Enable系列operation異步

 


  

OVERLAPPED基類

從上圖能夠看到,全部IOCP Enableoperation,其基類都是struct OVERLAPPED結構,該結構是Win32進行交疊IO一個很是重要的結構,用以異步執行過程當中的參數傳遞。全部的operation直接從該結構繼承的結果,就是全部operation對象,能夠直接做爲OVERLAPPED結構在異步函數中進行傳遞。socket

 

例如在win_iocp_socket_service_base中,爲了啓動一個receive的異步操做, start_receive_op函數就直接把傳遞進來的operation指針做爲OVERLAPPED結構傳遞給::WSARecv函數,從而發起一個異步服務請求。async

void win_iocp_socket_service_base::start_receive_op(tcp

   win_iocp_socket_service_base::base_implementation_type& impl,

   WSABUF* buffers, std::size_t buffer_count,

   socket_base::message_flags flags, bool noop,operation* op)

{

 update_cancellation_thread_id(impl);

 iocp_service_.work_started();

 

 if (noop)

   iocp_service_.on_completion(op);

 else if (!is_open(impl))

   iocp_service_.on_completion(op, boost::asio::error::bad_descriptor);

 else

 {

   DWORD bytes_transferred = 0;

   DWORD recv_flags = flags;

   int result = ::WSARecv(impl.socket_, buffers, static_cast<DWORD>(buffer_count),

       &bytes_transferred, &recv_flags, op, 0);

   DWORD last_error = ::WSAGetLastError();

   if (last_error == ERROR_NETNAME_DELETED)

     last_error = WSAECONNRESET;

   else if (last_error == ERROR_PORT_UNREACHABLE)

     last_error = WSAECONNREFUSED;

   if (result != 0 && last_error != WSA_IO_PENDING)

     iocp_service_.on_completion(op, last_error, bytes_transferred);

   else

     iocp_service_.on_pending(op);

 }

}

 

執行流程

關於operation對象的建立、傳遞,以及完成handler的執行序列等,使用下圖能夠清晰的描述。

 


  

下表反映了Windows環境下,部分的異步請求所對應的服務、win函數、operation等信息:

 

異步請求

服務

start op

Win32函數

對應operation

ip::tcp::socket::async_connect

win_iocp_socket_service

start_connect_op()

::connect

reactive_socket_connect_op

ip::tcp::socket::async_read_some

start_receive_op()

::WSARecv

win_iocp_socket_recv_op

ip::tcp::socket::async_receive

start_receive_op()

::WSARecv

win_iocp_socket_recv_op

ip::tcp::socket::async_write_some

start_send_op()

::WSASend

win_iocp_socket_send_op

ip::tcp::socket::async_send

start_send_op()

::WSASend

win_iocp_socket_send_op

ip::tcp::acceptor::async_accept

start_accept_op()

::AcceptEx

win_iocp_socket_accept_op

 

 

 

 

 

ip::tcp::resolver::async_resolve

resolver_service

start_resolve_op()

::getaddrinfo

resolve_op

 

 

靜態的do_complete

不知你是否注意到,在operation的類圖中,全部從operation繼承的子類,都定義了一個do_complete()函數,然而該函數聲明爲static,這又是爲什麼呢?

 

咱們以win_iocp_socket_recv_op爲例來進行說明。該類中的do_complete是這樣聲明的:

     staticvoid do_complete(io_service_impl* owner,

         operation* base,

         const boost::system::error_code& result_ec,

         std::size_t bytes_transferred)

 

該類的構造函數,又把此函數地址傳遞給父類win_iocp_operation去初始化父類成員,這兩個類的構造函數分別以下,請注意加粗代碼:

win_iocp_socket_recv_op ::

win_iocp_socket_recv_op(socket_ops::state_type state,

     socket_ops::weak_cancel_token_type cancel_token,

     const MutableBufferSequence& buffers, Handler& handler)

   :operation(&win_iocp_socket_recv_op::do_complete),

         state_(state),

         cancel_token_(cancel_token),

         buffers_(buffers),

         handler_(BOOST_ASIO_MOVE_CAST(Handler)(handler))

   {

   }

 

win_iocp_operation ::win_iocp_operation(func_type func)

         : next_(0),

         func_(func)

   {

         reset();

   }

 

至此,咱們明白,將do_complete聲明爲static,能夠方便獲取函數指針,並在父類中進行回調。那麼,不只要問,既然兩個類存在繼承關係,那麼爲什麼不將do_complete聲明爲虛函數呢?

 

再回頭看看這些類的最頂層基類,就會明白的。最頂層的OVERLAPPED基類,使得將operation對象做爲OVERLAPPED對象在異步函數中進行傳遞成爲可能;若是將do_complete聲明爲虛函數,則多數編譯器會在對象起始位置放置vptr,這樣就改變了內存佈局,從而不能再把operation對象直接做爲OVERLAPPED對象進行傳遞了。

 

固然,必定要用虛函數的話,也不是不可能,只是在傳遞對象的時候,就須要考慮到vptr的存在,這會有兩個方面的問題:一是進行多態類型轉換時,效率上的損失;二是各家編譯器對vtpr的實現各不相同,跨平臺的asio就須要進行多種適配,這無疑又過於煩躁了。因而做者就採起了最爲簡單有效的方式——用static函數來進行回調——簡單,就美。

 

win_iocp_io_service的實現

Windows NT環境下(IOCP Enabled),win_iocp_io_service表明着io_service,是整個asio的運轉核心。本節開始來分析該類的實現。

 

從類的命名也能夠看出,IOCP是該實現的核心。IOCPIO Completion Port IOCP)在windows上,能夠說是效率最高的異步IO模型了,他使用有限的線程,處理儘量多的併發IO請求。該模型雖然說能夠應用於各類IO處理,但目前應用較多的仍是網絡IO方面。

 

咱們都知道,在Window是環境下使用IOCP,基本上須要這樣幾個步驟:

  1. 使用Win函數CreateIoCompletionPort()建立一個完成端口對象;
  2. 建立一個IO對象,如用於listensocket對象;
  3. 再次調用CreateIoCompletionPort()函數,分別在參數中指定第二步建立的IO對象和第一步建立的完成端口對象。因爲指定了這兩個參數,這一次的調用,只是告訴系統,後續該IO對象上全部的完成事件,都投遞到綁定的完成端口上。
  4. 建立一個線程或者線程池,用以服務完成端口事件;

全部這些線程調用GetQueuedCompletionStatus()函數等待一個完成端口事件的到來;

  1. 進行異步調用,例如WSASend()等操做。
  2. 在系統執行完異步操做並把事件投遞到端口上,或者客戶本身調用了PostQueuedCompletionStatus()函數,使得在完成端口上等待的一個線程甦醒,執行後續的服務操做。

 

那麼,這些步驟,是如何分散到asio中的呢?來吧,先從完成端口建立開始。

完成端口的建立

如上所述,完成端口的建立,須要調用CreateIoCompletionPort()函數,在win_iocp_io_service的構造函數中,就有這樣的操做:

win_iocp_io_service::win_iocp_io_service(

   boost::asio::io_service& io_service, size_tconcurrency_hint)

 : boost::asio::detail::service_base<win_iocp_io_service>(io_service),

   iocp_(),

   outstanding_work_(0),

   stopped_(0),

   stop_event_posted_(0),

   shutdown_(0),

   dispatch_required_(0)

{

 BOOST_ASIO_HANDLER_TRACKING_INIT;

 

 iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,

     static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0))));

 if (!iocp_.handle)

 {

   DWORD last_error = ::GetLastError();

   boost::system::error_code ec(last_error,

       boost::asio::error::get_system_category());

   boost::asio::detail::throw_error(ec, "iocp");

 }

}

 

win_iocp_io_service的構造函數,負責建立一個完成端口,並把此完成端口對象的句柄交給一個auto_handle進行管理——auto_handle的惟一用途,就是在對象析構時,調用::CloseHandle()windows句柄資源關閉,從而保證不會資源泄露。

 

咱們在windows環境下,聲明一個boost::asio::io_service對象,其內部就建立了一個win_iocp_io_service的實例;所以,一個io_service對象就對應着一個完成端口對象——這也就能夠解釋,爲何全部的IO Object都須要一個io_service參數了——這樣,你們就好公用外面定義好的完成端口對象。

 

除了io_service對象會建立一個完成端口對象,事實上,在asio中,另一個service也會建立一個,這就是boost::asio::ip::resolver_service。該類對應的detail實現boost::asio::detail::resolver_service中,有一個數據成員是: io_service,這樣就一樣建立了一個完成端口對象:

    namespace boost {

namespace asio {

namespace detail {

 

class resolver_service_base

{

...

protected:

// Private io_service used for performing asynchronous host resolution.

            scoped_ptr<boost::asio::io_service> work_io_service_;

...

 

至於該完成端口的用途如何,咱們在後續部分再來講明——搽,又開始挖坑了。

完成端口的綁定

在建立了io對象後,例如socket,就須要將此對象和完成端口對象綁定起來,以指示操做系統將該io對象上後續全部的完成事件發送到某個完成端口上,該操做一樣是由CreateIoCompletionPort()函數完成,只是所使用的參數不一樣。

 

win_iocp_io_service中,這個操做由下面的代碼完成——請注意參數的差異:

boost::system::error_code win_iocp_io_service::register_handle(

   HANDLE handle, boost::system::error_code& ec)

{

 if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0)== 0)

 {

   DWORD last_error = ::GetLastError();

   ec = boost::system::error_code(last_error,

       boost::asio::error::get_system_category());

 }

 else

 {

   ec = boost::system::error_code();

 }

 return ec;

}

 

經過代碼搜索,咱們發現函數win_iocp_socket_service_base::do_open()內部調用了register_handle();該函數的做用是打開一個socket(其中調用了socket函數socket()去建立一個socket),也就是說,在打開一個socket後,就把該socket綁定到指定的完成端口上,這樣,後續的事件就會發送到完成端口了。

 

此外還有另外的和assign相關的兩個函數也調用了register_handle(),再也不貼出其代碼了。

 

線程函數

IOCP要求至少有一個線程進行服務,也能夠有一堆線程;io_service早就爲這些線程準備好了服務例程,即io_service::run()函數。

  • 若是應用只打算使用一個線程進行服務,那麼在主線程中準備好了異步請求後,調用io_service::run()便可。注意,必須先發起一個異步請求,而後才能調用run()。參考一下run()的實現就會明白。
  • 若是打算用多個線程進行服務,能夠建立多個線程,指定io_service::run()做爲線程函數便可。一個最簡單的示例是:

void server::run()

{

 // Create a pool of threads to run all of the io_services.

 std::vector<boost::shared_ptr<boost::thread> > threads;

 for (std::size_t i = 0; i < thread_pool_size_; ++i)

 {

boost::shared_ptr<boost::thread>

   thread(

       new boost::thread(

               boost::bind(&boost::asio::io_service::run, &io_service_)

           )

       );

   threads.push_back(thread);

 }

 

 // Wait for all threads in the pool to exit.

 for (std::size_t i = 0; i < threads.size(); ++i)

   threads[i]->join();

}

 

因爲io_service::run()又是委託win_iocp_io_service::run()來實現的,咱們來看看後者的實現:

size_t win_iocp_io_service::run(boost::system::error_code& ec)

{

 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)

 {

   stop();

   ec = boost::system::error_code();

   return 0;

 }

 

 win_iocp_thread_info this_thread;

 thread_call_stack::context ctx(this, this_thread);

 

 size_t n = 0;

  while (do_one(true, ec))

   if (n != (std::numeric_limits<size_t>::max)())

     ++n;

 return n;

}

 

run()首先檢查是否有須要處理的操做,若是沒有,函數退出;win_iocp_io_service使用outstanding_work_來記錄當前須要處理的任務數。若是該數值不爲0,則委託do_one函數繼續處理——asio中,全部的髒活累活都在這裏處理了。

 

win_iocp_io_service::do_one函數較長,咱們只貼出核心代碼

size_t win_iocp_io_service::do_one(bool block, boost::system::error_code& ec)

{

 for (;;)

 {

   // Try to acquire responsibility for dispatching timers and completed ops.

   if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)?#1

   {

     mutex::scoped_lock lock(dispatch_mutex_);

 

     // Dispatch pending timers and operations.

     op_queue<win_iocp_operation> ops;

     ops.push(completed_ops_);

     timer_queues_.get_ready_timers(ops);

     post_deferred_completions(ops);?#2

     update_timeout();

   }

 

   // Get the next operation from the queue.

   DWORD bytes_transferred = 0;

   dword_ptr_t completion_key = 0;

   LPOVERLAPPED overlapped = 0;

::SetLastError(0);

   BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,

       &completion_key, &overlapped, block ? gqcs_timeout : 0);?#3

   DWORD last_error = ::GetLastError();

 

   if (overlapped)

   {

     win_iocp_operation* op =static_cast<win_iocp_operation*>(overlapped);?#4

     boost::system::error_code result_ec(last_error,

         boost::asio::error::get_system_category());

 

     // We may have been passed the last_error and bytes_transferred in the

     // OVERLAPPED structure itself.

     if (completion_key == overlapped_contains_result)

     {

       result_ec = boost::system::error_code(static_cast<int>(op->Offset),

           *reinterpret_cast<boost::system::error_category*>(op->Internal));

       bytes_transferred = op->OffsetHigh;

     }

 

     // Otherwise ensure any result has been saved into the OVERLAPPED

     // structure.

     else

     {

       op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());

       op->Offset = result_ec.value();

       op->OffsetHigh = bytes_transferred;

     }

 

     // Dispatch the operation only if ready. The operation may not be ready

     // if the initiating function (e.g. a call to WSARecv) has not yet

     // returned. This is because the initiating function still wants access

     // to the operation's OVERLAPPED structure.

     if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)

     {

       // Ensure the count of outstanding work is decremented on block exit.

       work_finished_on_block_exit on_exit = { this };?#5

       (void)on_exit;?#6

 

       op->complete(*this, result_ec, bytes_transferred);?#7

       ec = boost::system::error_code();

       return 1;

     }

   }

   else if (!ok)

{

...

 

作一下簡要說明:

-  #1變量dispatch_required_記錄了因爲資源忙,而沒有成功投遞的操做數;全部這些操做都記錄在隊列completed_ops_中;

-  #2將全部須要投遞的操做,投遞出去;至於什麼樣的操做須要投遞,什麼時候投遞,以及爲先前會投遞失敗,失敗後如何處理等,咱們後續說明——再次挖坑了。

-  #3IOCP的核心操做函數GetQueuedCompletionStatus()出現了。該函數致使線程在完成端口上進行等待,直到超時或者某個完成端口數據包到來。

-  #4注意這裏將 OVERLAPPED結構直接轉換爲 operation對象。相關內容在前面的operation:OVERLAPPED基類部分已經有說明。

-  #5該變量保證在操做完成,return以後,win_iocp_io_service對象所記錄的任務數outstanding_work_會自動減1——是啊,辛辛苦苦作的事兒,能不記錄下來嘛!

-  #6這一行從功能上講沒有什麼特別的用途;不過有了這一行,能夠抑制有些編譯器針對 #5所聲明的變量沒有被使用的編譯器警告;

-  #7調用operation對象的complete()函數,從而調用到異步操做所設定的回調函數。具體流程參考operation:執行流程

 

任務投遞

上述的線程函數,會在GetQueuedCompletionStatus()函數上進行等待,直到超時或者有完成端口數據包到來;

 

完成端口數據包,有兩個來源:一個是用戶所請求的異步操做完成,異步服務執行者(這裏是操做系統)向該完成端口投遞完成端口數據包;另一種狀況是,用戶本身使用IOCP的另一個核心函數PostQueuedCompletionStatus()向完成端口投遞數據包;

 

通常的異步操做請求,是不須要用戶本身主動向完成端口投遞數據的,例如async_read, asyn_write等操做;

 

有另一些操做,因爲沒有對應或者做者並無採用支持OVERLAPPED IO操做的Win32函數,就須要實現者本身管理完成事件,並進行完成端口數據包的投遞,好比:

  • async_resolve:因爲系統沒有提供對應的OVERLAPPED IO操做,須要實現者本身管理,因此其本身進行投遞
  • async_connect:因爲做者並無採用支持OVERLAPPED IOConnectEx()版本的鏈接函數,而是採用了標準的socket函數connect()進行鏈接,因此也須要本身進行投遞

 

另外還有一些io_service提供的操做,例如請求io_service執行代爲執行指定handler的操做:

  • dispatch(handler)
  • post(handler)

 

全部這些須要本身投遞完成端口數據包的操做,基本上都是這樣一個投遞流程:

  • 調用win_iocp_io_service::post_immediate_completion(op)
    • 調用work_started()outstanding_work_ 1
    • 調用post_deferred_completion(op)
      • 因爲自行管理,主動將op->ready_置爲 1,代表op就緒
      • 調用PostQueuedCompletionStatus(op)進行投遞
      • 若是投遞失敗,則把該op放置到等待dispatch的隊列completed_ops_ 中,待下一次do_one()執行時,再次投遞

 

OK,至此,基本分析完了operation的投遞,總數填了一個前面挖下的坑。

Resolver本身的IOCP

前面說過,Resolver本身會建立一個IOCP,爲何會這樣呢?因爲Win32下面沒有提供對應於地址解析的overlapped版本的函數,爲了實現async_resolve操做,做者本身實現了這樣一個異步服務。在resolver_service內部,有一個io_service數據成員,該數據成員建立了一個IOCP;除此以外,該service內部還啓動一個工做線程來執行io_service::run(),使用此線程來模擬異步服務。

 

使用resolver進行async_resolve的詳細過程以下:

Main Thread (IOCP#1)

 

Resolver Thread (IOCP #2)

 

 

 

1.構建io_service對象, IOCP#1被建立

 

 

 

 

 

2.構建 resolver對象, IOCP#2被建立,同時該resolver持有io_service的引用

 

 

 

 

 

3.發起異步調用:resolver.async_resolve()

 

 

 

 

 

4. resolve_op被建立

 

 

 

 

 

5. Resolver線程啓動,主線程開始等待

 

 

 

 

 

 

 

6.開始運行,激活等待事件,並在 IOCP#2上開始等待

 

 

 

7.線程恢復執行;將op投遞到 IOCP#2

 

 

 

 

 

 

 

8.執行op->do_complete()操做,地址解析完成後,將op再回投給IOCP#1

 

 

 

9. do_one()獲得Resolver線程投遞迴來的op開始執行op->do_complete()操做,此時回調async_resolve所設置的handler

 

 

 

 

 

10.結束

 

 

 

請注意step8 step9執行一樣一個op->do_complete()函數,爲何操做不同呢?看其實現就知道,該函數內部,會判斷執行此函數時的owner,若是owner是主io_service對象,則說明是在主線程中執行,此時進行handler的回調;不然就說明在工做線程中,就去執行真正的地址解析工做;

任務的取消

針對socket上提交的異步請求,可使用cancel()函數來取消掉該socket上全部沒執行的異步請求。

 

使用該函數,在Windows Vista(不含)之前的版本,有幾點須要注意:

  • 須要定義BOOST_ASIO_ENABLE_CANCELIO來啓用該功能
  • cancel()函數在內部調用Win32函數 CancelIo()
  • 該函數只能取消來自當前線程的異步請求
  • 對於正在執行的異步操做,則要看異步服務提供者是如何實現的了,可能會被取消,也可能不會;

針對這些問題,另外的替代方案是:

  • Window是上定義BOOST_ASIO_DISABLE_IOCP來禁用IOCP,使用老式的reactor模式(及select IO)。
  • 或者使用close()來關閉socket,如此一來全部未被執行的請求則都會被取消掉。

 

windows vista及後續版本中,cancel()函數在內部調用Win32函數 CancelIoEx(),該函數能夠取消來自任何線程的異步操做請求,不存在上述問題。

 

須要注意的是,即便異步請求被取消了,所指定的handler也會被執行,只是傳遞的error code 爲:boost::asio::error::operation_aborted

win_iocp_socket_service實現

service提供了windows下全部socket相關的功能,是asiowindows環境中一個很是重要的角色,他所提供的函數主要分下面兩類:

  • XXXXX(), async_XXXXX()對某個操做的同步、異步函數接口;主要被上層服務調用;例如connect(), async_connect()等;
  • start_XXXXX_op() :windows發出對應的異步操做請求,例如WSARecv

 

不過關於該類的實現前面已經作了較多的涉及,再也不單獨詳述了。

前攝器模式

如今咱們已經把Windows環境下所涉及到的關鍵部件都涉及到了,此刻咱們再回過頭來,從高層俯瞰一下asio的架構,看看是否會有不同的感覺呢?事實上,asio的文檔用下面的圖來講明asio的高層架構——前攝器模式,咱們也從這個圖開始:

boost.asio 學習筆記05——asio的windows實現 - 地線 - 別再讓虛假消息充斥這本已混亂的世界

 

 

呵呵,其實這張圖,從一開始就是爲了表達Proactor(前攝器)模式的,基本上它和asio沒半毛錢關係,只不過asio既支持同步IO,又支持異步IO,在異步IO部分,是參照Proactor模式來實現的。下面咱們來分別說說asio的前攝器模式中的各個組件:

  • Initiator,(初始化器?)中文名還真不清楚,不過其實就是客戶代碼,甚至能夠簡單理解到main函數,全部的是是非非,都是從這兒開始的。
  • Asynchronous Operation,定義的一系列異步操做,對應到Windows平臺,諸如AcceptExWSASendWSARecv等函數。在asio中,這些函數封裝在win_iocp_socket_service resolver_service類中。
  • Asynchronous Operation Processor,異步操做處理器,他負責執行異步操做,並在操做完成後,把完成事件投放到完成事件隊列上。

僅僅從asio使用者的角度看,高層的stream_socket_service類就是一個這樣的處理器,由於從tcp::socket發送的異步操做都是由其完成處理的。可是從真正實現的角度看,這樣的異步操做在Windwos上,大部分由操做系統負責完成的,另一部分由asio本身負責處理,如resolver_service,所以Windows操做系統和asio一塊兒組成了異步操做處理器。

  • Completion Handler,完成事件處理器;這是由用戶本身定義的一個函數(函數對象),在異步操做完成後,由前攝器負責把該函數調用起來。

Windows平臺上,io_service類經過win_iocp_io_service類的do_one()函數把每一個異步操做所設定的completion handler調用起來。

  • Completion Event Queue完成事件隊列,存儲由異步操做處理器發送過來的完成事件,當異步事件多路分離器將其中一個事件取走以後,該事件從隊列中刪除;

Windows上,asio的完成事件隊列由操做系統負責管理;只不過該隊列中的數據有兩個來源,一個是Windows內部,另一個是asio中本身PostQueuedCompletionStatus()所提交的事件。

  • Asynchronous Event Demultiplexer,異步事件多路分離器,他的做用就是在完成事件隊列上等待,一旦有事件到來,他就把該事件返回給調用者。

Windows上,這一功能也是由操做系統完成的,具體來講,我認爲是由GetQueuedCompletionStatus完成的,而該函數時由do_one()調用的,所以,從高層的角度來看,這個分離器,也是由io_service負責的。

  • Proactor,前攝器,負責調度異步事件多路分離器去幹活,並在異步操做完成時,調度所對應的Completion Handler。在asio中,這部分由io_service來作,具體Windows就是win_iocp_io_service

 

基於上述信息,咱們重繪practor模式架構圖以下:

 

 

  

相關文章
相關標籤/搜索