主要有三部分組成,threadpool,scheduler,task。linux
三者關係如上圖示,pplx只着重實現了task部分功能,scheduler跟threadpool只是簡略實現。app
threadpool主要依賴boost.asio達到跨平臺的目標,cpprestsdk的 io操做同時也依賴這個threadpool。async
pplx提供了兩個版本的scheduler,分別是函數
linux_scheduler依賴boost.asio.threadpool。oop
window_schedule依賴win32 ThreadPool。this
默認的scheduler只是簡單地將work投遞到threadpool進行分派。spa
用戶能夠根據本身須要,實現scheduler_interface,提供複雜的調度。線程
每一個task關聯着一個_Task_impl實現體,一個_TaskCollection_t(喚醒事件,後繼任務隊列,這個隊列的任務之間的關係是並列的),還有一個_PPLTaskHandle代碼執行單元。設計
task,並行執行的單位任務。經過scheduler將代碼執行單元調度到線程去執行。3d
task提供相似activeobject模式的功能,能夠看做是一個future,經過get()同步阻塞等待執行結果。
task提供拓撲模型,經過then()建立後續task,並做爲後繼執行任務。注意的是每一個task能夠接受不限數量的then(),這些後繼任務之間並不串行。例 task().then().then()串行,(task1.then(), task1.then())並行。一個任務在執行完成時,會將結果傳遞給它的全部直接後繼執行任務。
此外,task拓撲除了then()函數外,還能夠在執行lambda中添加並行分支,而後能夠在後繼任務中同步這些分支。
也就是說後繼任務同步本來task拓撲外的task拓撲才能繼續執行。
1 auto fork0 =
2 task([]()->task<void>{
3 auto fork1 =
4 task([]()->task<void>{ 5 auto fork2 = 6 task([](){ 7 // do your fork2 work 8 9 }); 10 // do your fork1 work 11 12 return fork2; 13 }).then([](task<void>& frk2){ frk2.wait(); }); // will sync fork2 14 // do your fork0 work 15 16 return fork1; 17 }).then([](task<void>& frk1){ frk1.wait(); }); // will sync fork1 18 fork0.wait(); // sync fork1, fork2
上面的方式有一個問題,若是裏層的fork先完成,將不要阻塞線程,可是外層fork先完成就不得不阻塞線程等待內層fork完成。
因此能夠用when_all
task<task<void> >([]()->task<void> {
std::vector<task<void> > forks; forks.push_back( task([]() { /* do fork0 work */ }) ); forks.push_back( task([]() { /* do fork1 work */ }) ); forks.push_back( task([]() { /* do fork2 work */ }) ); forks.push_back( task([]() { /* do fork3 work */ }) ); return when_all(std::begin(forks), std::end(forks)); }).then([](task<void> forks){ forks.wait(); }).wait();
經過上面的方式,也能夠在lambda中,將其它task拓撲插入到你原來的task拓撲。
在include/cpprest/atreambuf.h實現的_do_while就是這樣一個例子
template<class F, class T = bool> pplx::task<T> _do_while(F func) { pplx::task<T> first = func(); return first.then([=](bool guard) -> pplx::task<T> { if (guard) return pplx::details::_do_while<F, T>(func); else return first; }); }
若是func不返回一個false,就會無限地在first.then()這兩任務拓撲結束前,再插入多一個first.then()任務拓撲,無限地順序地執行下去,如loop同樣地進行。
task結束,分兩種狀況,完成以及取消。取消執行,只能在執行代碼時經過拋出異常,task並無提供取消的接口。任務在執行過程當中拋出的異常,就會被task捕捉,並暫存異常,而後取消執行。異常在wait()時從新拋出。下面的時序分析能夠看到全過程 。
值得注意的是,PPL中task本來的設計是的有Async與Inline之分的。在_Task_impl_base::_Wait()有一小段註釋說明
// If this task was created from a Windows Runtime async operation, do not attempt to inline it. The // async operation will take place on a thread in the appropriate apartment Simply wait for the completed // event to be set.
也就是task除了由scheduler調度到線程池分派執行,還能夠強制在wait()函數內分派執行,後繼task也沒必要再次調度而能夠在當前線程繼續分派執行。可是pplx沒有實現
class _TaskCollectionImpl
{
...
void _Cancel() { // No cancellation support } void _RunAndWait() { // No inlining support yet _Wait(); }
如今再來比較 task<_ReturnType> 與 task< task<_ReturnType > >,當一個前驅任務拋出異常停止後,若是前驅任務是task<_ReturnType>的話,後續任務的lambda參數就是_ReturnType,由後續任務執行_Continue時代爲執行了前驅任務的get(),這時就會rethrow異常,而後就直接停止後續任務。可是若是後續任務的lambda參數是task<_ReturnType>的話,用戶的lambda就有機會處理前驅任務的錯誤異常。因此就有了 task_from_result<_ReturnType>跟task_from_exception兩個函數,將結果或異常轉化成task,以符合後續任務的lambda的參數要求。
下面是對task的時序分析。
開始的task建立_InitialTaskHandle, 一種只能用於始首的Handle執行單元。
經過then()添加的task,建立_ContinuationTaskHandle,(一種能夠入鏈的後繼執行單元),並暫存起來。
當一個任務在線程池中分派結束時,就會將全部經過then()添加到它結尾的後繼任務一次過向scheduler調度出去。
任務只能經過拋出異常從而本身停止執行,task並暫存異常(及錯誤信息)。
後繼任務被調度到線程池繼續分派執行。
這裏順便討論一個開銷,在window版本中,每一個task都有一個喚醒事件,使用事件內核對象,都要建立釋放一個內核對象,在高並行任務時,可能會消耗過多內核對象,消耗句柄數。
而且continuation後繼任務,在默認scheduler調度下,不會在同一線程中分派,全部後繼任務都會簡單投遞到線程池。由線程池去決定分派的線程。因此由then()串行起來的任務可能會由不一樣的線程順序分派,從而產生開銷。由於pplx並無實現 Inline功能,全部task都會視做Async從新調度到線程池。