1.主線程定義回調對象react
2.調用io object的操做windows
3.io object會另開線程,定義opertion op來執行操做,同時將回調對象加到op的do_complete上。進行操做網絡
4.完成操做加入完成隊列架構
5.io_service線程循環從完成隊列取事件,調用其事件對應的回調函數併發
還記得前面咱們在分析resolver的實現的時候,挖了一個關於operation的坑?爲了避免讓本身陷進去,如今來填吧;接下來咱們就來看看asio中的各類operation。app
和前面提到過的service的相似,這裏的operation也分爲兩大系:IOCP Enable和Disable系列。這裏咱們重點關注下圖中紅色部分表示的IOCP Enable系列operation。異步
從上圖能夠看到,全部IOCP Enable的operation,其基類都是struct OVERLAPPED結構,該結構是Win32進行交疊IO一個很是重要的結構,用以異步執行過程當中的參數傳遞。全部的operation直接從該結構繼承的結果,就是全部operation對象,能夠直接做爲OVERLAPPED結構在異步函數中進行傳遞。socket
例如在win_iocp_socket_service_base中,爲了啓動一個receive的異步操做, start_receive_op函數就直接把傳遞進來的operation指針做爲OVERLAPPED結構傳遞給::WSARecv函數,從而發起一個異步服務請求。async
void win_iocp_socket_service_base::start_receive_op(tcp win_iocp_socket_service_base::base_implementation_type& impl, WSABUF* buffers, std::size_t buffer_count, socket_base::message_flags flags, bool noop,operation* op) { update_cancellation_thread_id(impl); iocp_service_.work_started();
if (noop) iocp_service_.on_completion(op); else if (!is_open(impl)) iocp_service_.on_completion(op, boost::asio::error::bad_descriptor); else { DWORD bytes_transferred = 0; DWORD recv_flags = flags; int result = ::WSARecv(impl.socket_, buffers, static_cast<DWORD>(buffer_count), &bytes_transferred, &recv_flags, op, 0); DWORD last_error = ::WSAGetLastError(); if (last_error == ERROR_NETNAME_DELETED) last_error = WSAECONNRESET; else if (last_error == ERROR_PORT_UNREACHABLE) last_error = WSAECONNREFUSED; if (result != 0 && last_error != WSA_IO_PENDING) iocp_service_.on_completion(op, last_error, bytes_transferred); else iocp_service_.on_pending(op); } } |
關於operation對象的建立、傳遞,以及完成handler的執行序列等,使用下圖能夠清晰的描述。
下表反映了Windows環境下,部分的異步請求所對應的服務、win函數、operation等信息:
異步請求 |
服務 |
start op |
Win32函數 |
對應operation |
ip::tcp::socket::async_connect |
win_iocp_socket_service |
start_connect_op() |
::connect |
reactive_socket_connect_op |
ip::tcp::socket::async_read_some |
start_receive_op() |
::WSARecv |
win_iocp_socket_recv_op |
|
ip::tcp::socket::async_receive |
start_receive_op() |
::WSARecv |
win_iocp_socket_recv_op |
|
ip::tcp::socket::async_write_some |
start_send_op() |
::WSASend |
win_iocp_socket_send_op |
|
ip::tcp::socket::async_send |
start_send_op() |
::WSASend |
win_iocp_socket_send_op |
|
ip::tcp::acceptor::async_accept |
start_accept_op() |
::AcceptEx |
win_iocp_socket_accept_op |
|
|
|
|
|
|
ip::tcp::resolver::async_resolve |
resolver_service |
start_resolve_op() |
::getaddrinfo |
resolve_op |
不知你是否注意到,在operation的類圖中,全部從operation繼承的子類,都定義了一個do_complete()函數,然而該函數聲明爲static,這又是爲什麼呢?
咱們以win_iocp_socket_recv_op爲例來進行說明。該類中的do_complete是這樣聲明的:
staticvoid do_complete(io_service_impl* owner,
operation* base,
const boost::system::error_code& result_ec,
std::size_t bytes_transferred)
該類的構造函數,又把此函數地址傳遞給父類win_iocp_operation去初始化父類成員,這兩個類的構造函數分別以下,請注意加粗代碼:
win_iocp_socket_recv_op ::
win_iocp_socket_recv_op(socket_ops::state_type state,
socket_ops::weak_cancel_token_type cancel_token,
const MutableBufferSequence& buffers, Handler& handler)
:operation(&win_iocp_socket_recv_op::do_complete),
state_(state),
cancel_token_(cancel_token),
buffers_(buffers),
handler_(BOOST_ASIO_MOVE_CAST(Handler)(handler))
{
}
win_iocp_operation ::win_iocp_operation(func_type func)
: next_(0),
func_(func)
{
reset();
}
至此,咱們明白,將do_complete聲明爲static,能夠方便獲取函數指針,並在父類中進行回調。那麼,不只要問,既然兩個類存在繼承關係,那麼爲什麼不將do_complete聲明爲虛函數呢?
再回頭看看這些類的最頂層基類,就會明白的。最頂層的OVERLAPPED基類,使得將operation對象做爲OVERLAPPED對象在異步函數中進行傳遞成爲可能;若是將do_complete聲明爲虛函數,則多數編譯器會在對象起始位置放置vptr,這樣就改變了內存佈局,從而不能再把operation對象直接做爲OVERLAPPED對象進行傳遞了。
固然,必定要用虛函數的話,也不是不可能,只是在傳遞對象的時候,就須要考慮到vptr的存在,這會有兩個方面的問題:一是進行多態類型轉換時,效率上的損失;二是各家編譯器對vtpr的實現各不相同,跨平臺的asio就須要進行多種適配,這無疑又過於煩躁了。因而做者就採起了最爲簡單有效的方式——用static函數來進行回調——簡單,就美。
在Windows NT環境下(IOCP Enabled),win_iocp_io_service表明着io_service,是整個asio的運轉核心。本節開始來分析該類的實現。
從類的命名也能夠看出,IOCP是該實現的核心。IOCP(IO Completion Port, IOCP)在windows上,能夠說是效率最高的異步IO模型了,他使用有限的線程,處理儘量多的併發IO請求。該模型雖然說能夠應用於各類IO處理,但目前應用較多的仍是網絡IO方面。
咱們都知道,在Window是環境下使用IOCP,基本上須要這樣幾個步驟:
全部這些線程調用GetQueuedCompletionStatus()函數等待一個完成端口事件的到來;
那麼,這些步驟,是如何分散到asio中的呢?來吧,先從完成端口建立開始。
如上所述,完成端口的建立,須要調用CreateIoCompletionPort()函數,在win_iocp_io_service的構造函數中,就有這樣的操做:
win_iocp_io_service::win_iocp_io_service( boost::asio::io_service& io_service, size_tconcurrency_hint) : boost::asio::detail::service_base<win_iocp_io_service>(io_service), iocp_(), outstanding_work_(0), stopped_(0), stop_event_posted_(0), shutdown_(0), dispatch_required_(0) { BOOST_ASIO_HANDLER_TRACKING_INIT;
iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0)))); if (!iocp_.handle) { DWORD last_error = ::GetLastError(); boost::system::error_code ec(last_error, boost::asio::error::get_system_category()); boost::asio::detail::throw_error(ec, "iocp"); } } |
win_iocp_io_service的構造函數,負責建立一個完成端口,並把此完成端口對象的句柄交給一個auto_handle進行管理——auto_handle的惟一用途,就是在對象析構時,調用::CloseHandle()把windows句柄資源關閉,從而保證不會資源泄露。
咱們在windows環境下,聲明一個boost::asio::io_service對象,其內部就建立了一個win_iocp_io_service的實例;所以,一個io_service對象就對應着一個完成端口對象——這也就能夠解釋,爲何全部的IO Object都須要一個io_service參數了——這樣,你們就好公用外面定義好的完成端口對象。
除了io_service對象會建立一個完成端口對象,事實上,在asio中,另一個service也會建立一個,這就是boost::asio::ip::resolver_service。該類對應的detail實現boost::asio::detail::resolver_service中,有一個數據成員是: io_service,這樣就一樣建立了一個完成端口對象:
namespace boost {
namespace asio {
namespace detail {
class resolver_service_base
{
...
protected:
// Private io_service used for performing asynchronous host resolution.
scoped_ptr<boost::asio::io_service> work_io_service_;
...
至於該完成端口的用途如何,咱們在後續部分再來講明——搽,又開始挖坑了。
在建立了io對象後,例如socket,就須要將此對象和完成端口對象綁定起來,以指示操做系統將該io對象上後續全部的完成事件發送到某個完成端口上,該操做一樣是由CreateIoCompletionPort()函數完成,只是所使用的參數不一樣。
在win_iocp_io_service中,這個操做由下面的代碼完成——請注意參數的差異:
boost::system::error_code win_iocp_io_service::register_handle( HANDLE handle, boost::system::error_code& ec) { if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0)== 0) { DWORD last_error = ::GetLastError(); ec = boost::system::error_code(last_error, boost::asio::error::get_system_category()); } else { ec = boost::system::error_code(); } return ec; } |
經過代碼搜索,咱們發現函數win_iocp_socket_service_base::do_open()內部調用了register_handle();該函數的做用是打開一個socket(其中調用了socket函數socket()去建立一個socket),也就是說,在打開一個socket後,就把該socket綁定到指定的完成端口上,這樣,後續的事件就會發送到完成端口了。
此外還有另外的和assign相關的兩個函數也調用了register_handle(),再也不貼出其代碼了。
IOCP要求至少有一個線程進行服務,也能夠有一堆線程;io_service早就爲這些線程準備好了服務例程,即io_service::run()函數。
void server::run() { // Create a pool of threads to run all of the io_services. std::vector<boost::shared_ptr<boost::thread> > threads; for (std::size_t i = 0; i < thread_pool_size_; ++i) { boost::shared_ptr<boost::thread> thread( new boost::thread( boost::bind(&boost::asio::io_service::run, &io_service_) ) ); threads.push_back(thread); }
// Wait for all threads in the pool to exit. for (std::size_t i = 0; i < threads.size(); ++i) threads[i]->join(); } |
因爲io_service::run()又是委託win_iocp_io_service::run()來實現的,咱們來看看後者的實現:
size_t win_iocp_io_service::run(boost::system::error_code& ec) { if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { stop(); ec = boost::system::error_code(); return 0; }
win_iocp_thread_info this_thread; thread_call_stack::context ctx(this, this_thread);
size_t n = 0; while (do_one(true, ec)) if (n != (std::numeric_limits<size_t>::max)()) ++n; return n; } |
run()首先檢查是否有須要處理的操做,若是沒有,函數退出;win_iocp_io_service使用outstanding_work_來記錄當前須要處理的任務數。若是該數值不爲0,則委託do_one函數繼續處理——asio中,全部的髒活累活都在這裏處理了。
win_iocp_io_service::do_one函數較長,咱們只貼出核心代碼
size_t win_iocp_io_service::do_one(bool block, boost::system::error_code& ec) { for (;;) { // Try to acquire responsibility for dispatching timers and completed ops. if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)?#1 { mutex::scoped_lock lock(dispatch_mutex_);
// Dispatch pending timers and operations. op_queue<win_iocp_operation> ops; ops.push(completed_ops_); timer_queues_.get_ready_timers(ops); post_deferred_completions(ops);?#2 update_timeout(); }
// Get the next operation from the queue. DWORD bytes_transferred = 0; dword_ptr_t completion_key = 0; LPOVERLAPPED overlapped = 0; ::SetLastError(0); BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, &completion_key, &overlapped, block ? gqcs_timeout : 0);?#3 DWORD last_error = ::GetLastError();
if (overlapped) { win_iocp_operation* op =static_cast<win_iocp_operation*>(overlapped);?#4 boost::system::error_code result_ec(last_error, boost::asio::error::get_system_category());
// We may have been passed the last_error and bytes_transferred in the // OVERLAPPED structure itself. if (completion_key == overlapped_contains_result) { result_ec = boost::system::error_code(static_cast<int>(op->Offset), *reinterpret_cast<boost::system::error_category*>(op->Internal)); bytes_transferred = op->OffsetHigh; }
// Otherwise ensure any result has been saved into the OVERLAPPED // structure. else { op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category()); op->Offset = result_ec.value(); op->OffsetHigh = bytes_transferred; }
// Dispatch the operation only if ready. The operation may not be ready // if the initiating function (e.g. a call to WSARecv) has not yet // returned. This is because the initiating function still wants access // to the operation's OVERLAPPED structure. if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1) { // Ensure the count of outstanding work is decremented on block exit. work_finished_on_block_exit on_exit = { this };?#5 (void)on_exit;?#6
op->complete(*this, result_ec, bytes_transferred);?#7 ec = boost::system::error_code(); return 1; } } else if (!ok) { ... |
作一下簡要說明:
- #1:變量dispatch_required_記錄了因爲資源忙,而沒有成功投遞的操做數;全部這些操做都記錄在隊列completed_ops_中;
- #2:將全部須要投遞的操做,投遞出去;至於什麼樣的操做須要投遞,什麼時候投遞,以及爲先前會投遞失敗,失敗後如何處理等,咱們後續說明——再次挖坑了。
- #3:IOCP的核心操做函數GetQueuedCompletionStatus()出現了。該函數致使線程在完成端口上進行等待,直到超時或者某個完成端口數據包到來。
- #4:注意這裏將 OVERLAPPED結構直接轉換爲 operation對象。相關內容在前面的operation:OVERLAPPED基類部分已經有說明。
- #5:該變量保證在操做完成,return以後,win_iocp_io_service對象所記錄的任務數outstanding_work_會自動減1——是啊,辛辛苦苦作的事兒,能不記錄下來嘛!
- #6:這一行從功能上講沒有什麼特別的用途;不過有了這一行,能夠抑制有些編譯器針對 #5所聲明的變量沒有被使用的編譯器警告;
- #7:調用operation對象的complete()函數,從而調用到異步操做所設定的回調函數。具體流程參考operation:執行流程。
上述的線程函數,會在GetQueuedCompletionStatus()函數上進行等待,直到超時或者有完成端口數據包到來;
完成端口數據包,有兩個來源:一個是用戶所請求的異步操做完成,異步服務執行者(這裏是操做系統)向該完成端口投遞完成端口數據包;另一種狀況是,用戶本身使用IOCP的另一個核心函數PostQueuedCompletionStatus()向完成端口投遞數據包;
通常的異步操做請求,是不須要用戶本身主動向完成端口投遞數據的,例如async_read, asyn_write等操做;
有另一些操做,因爲沒有對應或者做者並無採用支持OVERLAPPED IO操做的Win32函數,就須要實現者本身管理完成事件,並進行完成端口數據包的投遞,好比:
另外還有一些io_service提供的操做,例如請求io_service執行代爲執行指定handler的操做:
全部這些須要本身投遞完成端口數據包的操做,基本上都是這樣一個投遞流程:
OK,至此,基本分析完了operation的投遞,總數填了一個前面挖下的坑。
前面說過,Resolver本身會建立一個IOCP,爲何會這樣呢?因爲Win32下面沒有提供對應於地址解析的overlapped版本的函數,爲了實現async_resolve操做,做者本身實現了這樣一個異步服務。在resolver_service內部,有一個io_service數據成員,該數據成員建立了一個IOCP;除此以外,該service內部還啓動一個工做線程來執行io_service::run(),使用此線程來模擬異步服務。
使用resolver進行async_resolve的詳細過程以下:
Main Thread (IOCP#1) |
Resolver Thread (IOCP #2) |
|
|
|
|
1.構建主io_service對象, IOCP#1被建立 |
|
|
|
|
|
2.構建 resolver對象, IOCP#2被建立,同時該resolver持有主io_service的引用 |
|
|
|
|
|
3.發起異步調用:resolver.async_resolve() |
|
|
|
|
|
4. resolve_op被建立 |
|
|
|
|
|
5. Resolver線程啓動,主線程開始等待 |
|
|
|
|
|
|
|
6.開始運行,激活等待事件,並在 IOCP#2上開始等待 |
|
|
|
7.線程恢復執行;將op投遞到 IOCP#2 |
|
|
|
|
|
|
|
8.執行op->do_complete()操做,地址解析完成後,將op再回投給IOCP#1 |
|
|
|
9. do_one()獲得從Resolver線程投遞迴來的op,開始執行op->do_complete()操做,此時回調async_resolve所設置的handler |
|
|
|
|
|
10.結束 |
|
|
請注意step8和 step9,執行一樣一個op->do_complete()函數,爲何操做不同呢?看其實現就知道,該函數內部,會判斷執行此函數時的owner,若是owner是主io_service對象,則說明是在主線程中執行,此時進行handler的回調;不然就說明在工做線程中,就去執行真正的地址解析工做;
針對socket上提交的異步請求,可使用cancel()函數來取消掉該socket上全部沒執行的異步請求。
使用該函數,在Windows Vista(不含)之前的版本,有幾點須要注意:
針對這些問題,另外的替代方案是:
在windows vista及後續版本中,cancel()函數在內部調用Win32函數 CancelIoEx(),該函數能夠取消來自任何線程的異步操做請求,不存在上述問題。
須要注意的是,即便異步請求被取消了,所指定的handler也會被執行,只是傳遞的error code 爲:boost::asio::error::operation_aborted。
該service提供了windows下全部socket相關的功能,是asio在windows環境中一個很是重要的角色,他所提供的函數主要分下面兩類:
不過關於該類的實現前面已經作了較多的涉及,再也不單獨詳述了。
如今咱們已經把Windows環境下所涉及到的關鍵部件都涉及到了,此刻咱們再回過頭來,從高層俯瞰一下asio的架構,看看是否會有不同的感覺呢?事實上,asio的文檔用下面的圖來講明asio的高層架構——前攝器模式,咱們也從這個圖開始:
呵呵,其實這張圖,從一開始就是爲了表達Proactor(前攝器)模式的,基本上它和asio沒半毛錢關係,只不過asio既支持同步IO,又支持異步IO,在異步IO部分,是參照Proactor模式來實現的。下面咱們來分別說說asio的前攝器模式中的各個組件:
僅僅從asio使用者的角度看,高層的stream_socket_service類就是一個這樣的處理器,由於從tcp::socket發送的異步操做都是由其完成處理的。可是從真正實現的角度看,這樣的異步操做在Windwos上,大部分由操做系統負責完成的,另一部分由asio本身負責處理,如resolver_service,所以Windows操做系統和asio一塊兒組成了異步操做處理器。
在Windows平臺上,io_service類經過win_iocp_io_service類的do_one()函數把每一個異步操做所設定的completion handler調用起來。
在Windows上,asio的完成事件隊列由操做系統負責管理;只不過該隊列中的數據有兩個來源,一個是Windows內部,另一個是asio中本身PostQueuedCompletionStatus()所提交的事件。
在Windows上,這一功能也是由操做系統完成的,具體來講,我認爲是由GetQueuedCompletionStatus完成的,而該函數時由do_one()調用的,所以,從高層的角度來看,這個分離器,也是由io_service負責的。
基於上述信息,咱們重繪practor模式架構圖以下: