TaskCpp是c++11開發的一個跨平臺的並行task庫,它的設計思路來源於微軟的並行計算庫ppl和intel的並行計算庫tbb,關於ppl和tbb我在前面有介紹。既然已經有了這兩個大公司開發的並行計算庫,我爲何還要開發本身的並行計算庫。有兩個緣由:html
TaskCpp在接口設計上儘可能和ppl保持一致,由於我以爲ppl的接口很好很強大。所以,TaskCpp的接口用法和語義和ppl基本是一致的。由於TaskCpp是一個輕量級的task庫,總共也不過三百多行代碼,本着簡單夠用的原則,只提供了一些和ppl相似的經常使用用法, 有些不經常使用的特性不考慮支持。好比,不支持任務的取消,由於加入任務的取消會致使增長不少複雜性,而實際中用得比較少,因此不考慮支持,夠用就好。java
須要支持c++11的編譯器,建議編譯器:linux
使用TaskCpp僅僅須要包含頭文件便可,在程序中使用只須要包含#include <TaskCpp.h>和少許的boost頭文件便可。c++
TaskCpp提供一下功能:算法
Task會建立一個異步操做,這個異步操做發起方式是延遲加載方式發起的,即在調用Task的Wait或者Get時才真正發起異步操做。Task能夠經過std::function或者lambda表達式去建立,不支持直接原生函數建立,若是要用原生函數須要先經過lambda或者std::function包裝一下。Task的Wait接口只是等待異步操做結束。Task的Get接口接收參數並等待異步操做結束並返回結果。PPL中的get接口是不能接收參數的,TaskCpp的Get接口是能夠接受任意參數的,更靈活一點,算是較PPL的一個小優勢吧。下面是Task的基本用法:windows
#include <TaskCpp.h> using namespace Cosmos; void TestTask() { Task<void()> task([]{cout << 1 << endl; }); task.Wait(); Task<void()> task1 = []{cout << 1 << endl; }; task1.Wait(); Task<int()> task2 = []{cout << 1 << endl; return 1; }; cout << task2.Get() << endl; Task<int(int)> task3 = [](int i){cout << i << endl; return i; }; cout << task3.Get(3) << endl; }
WhenAll保證一個任務集合中全部的任務完成。WhenAll函數會生成一個任務,該任務可在完成一組任務以後完成。 此函數可返回一個std::vector 對象,該對象包含集合中每一個任務的結果。 如下基本示例使用WhenAll建立表示完成其餘三個任務的任務。下面是WhenAll的基本用法:併發
void PrintThread() { cout << std::this_thread::get_id() << endl; std::this_thread::sleep_for(std::chrono::milliseconds(5)); } void TestWhenAll() { vector<Task<int()>> v = { Task<int()>([]{PrintThread(); std::this_thread::sleep_for(std::chrono::seconds(5)); return 1; }), Task<int()>([]{PrintThread(); return 2; }), Task<int()>([]{PrintThread(); return 3; }), Task<int()>([]{PrintThread(); return 4; }) }; cout << "when all " << endl; WhenAll(v).Get(); }
注意:WhenAll是非阻塞的,它只是建立一個任務,在Wait或Get時才發起異步操做。傳遞給WhenAll的任務必須是統一的。 換言之,它們必須都返回相同類型。框架
WhenAny在任務集合中任意一個任務結束以後就返回。函數會生成一個任務,該任務可在完成一組任務的第一個任務以後完成。 此函數可返回一個 std::pair 對象,該對象包含已完成任務的結果和集合中任務的索引。下面是WhenAny的基本用法:異步
void TestWhenAny() { vector<Task<int()>> v = { Task<int()>([]{PrintThread(); std::this_thread::sleep_for(std::chrono::seconds(5)); return 1; }), Task<int()>([]{PrintThread(); return 2; }), Task<int()>([]{PrintThread(); return 3; }), Task<int()>([]{PrintThread(); return 4; }) }; cout << "when any " << endl; WhenAny(v).Then([](std::pair<int, int>& result) { cout << " index " << result.first << " result " << result.second << endl; return result.second; }).Then([](int result){cout << "any result: " << result << endl; }).Get(); }
注意:WhenAny是非阻塞的,它只是建立一個任務,在Wait或Get時才發起異步操做。傳遞給WhenAny的任務必須是統一的。 換言之,它們必須都返回相同類型。ide
TaskGroup能夠並行的處理一組任務,TaskGroup能夠接受多個task或者function,TaskGroup的Wait等待全部任務完成。下面是TaskGroup的基本用法:
void TestTaskGroup() { Task<int()> t1([]{PrintThread(); return 1; }); Task<double()> t2([]{PrintThread(); return 2.123; }); Task<void()> t3([]{PrintThread(); }); Task<string()> t4([]{PrintThread(); return "ok"; }); TaskGroup group; group.Run(t1); group.Run(t2); group.Run(t3); group.Run(t4); //若是你以爲這樣一個一個Run加入任務,你也能夠一塊兒Run group.Run(t1, t2, t3, []{PrintThread(); return 1; }); group.Wait(); }
PPL的task_group的任務只能是void()形式的,TaskCpp容許一些簡單類型的任務如int()、double()、string()等,其實任務的返回類型沒有實際意義,由於Wait沒有返回值,這裏支持多種返回類型的任務只不過是爲了減小一點限制,用起來稍微方便一點罷了。PPL加入任務只能一個一個Run,要加入多個任務時有點繁瑣,TaskCpp能夠一次Run多個任務,比PPL要方便一些。這兩點算是較PPL的兩個小優勢吧。
ParallelForeach算法與 STL std::for_each 算法相似,只是 parallel_for_each 算法併發執行任務。用法比較簡單:
bool check_prime(int x) // 爲了體現效果, 該函數故意沒有優化. { for (int i = 2; i < x; ++i) if (x % i == 0) return false; return true; } void TestParallelFor() { vector<int> v; for (int i = 0; i < 100000; i++) { v.push_back(i + 1); } boost::timer t; ParallelForeach(v.begin(), v.end(), check_prime); ParallelForeach(v.begin(), v.end(), check_prime); cout << "taskcpp: " << t.elapsed() << endl; }
ParallelInvoke算法並行執行一組任務。 在完成全部任務以前,此算法不會返回。 當您須要同時執行多個獨立的任務時,此算法頗有用。ParallelInvoke和TaskGroup的做用是同樣的。用法比較簡單:
void TestParaInvoke() { auto f = []{cout << "1" << endl; return 1; }; ParallelInvoke(f, []{cout << "2" << endl; }); }
ParallelReduce算法在實際應用中比較經常使用,有點相似於map-reduce,能夠並行的對一個集合進行reduce操做。ParallelReduce的用法稍微複雜一點,它的原型:
第一個參數是集合,第二個參數是算法的初始值,第三個參數rangeFunc是一個聲稱中間結果的函數,第四個參數是中間結果的匯聚函數。若是調用ParallelReduce(range,init, reduceFunc),則表示rangeFunc和reduceFunc是一個函數。
下面的例子是並行的計算100000000個整數的和:
void TestParallelSum() { vector<int> v; const int Size = 100000000; v.reserve(Size); for (int i = 0; i < Size; i++) { v.push_back(i + 1); } int i = 0; boost::timer t; auto r = ParallelReduce(v, i, [](const vector<int>::iterator& begin, vector<int>::iterator&end, int val) { return std::accumulate(begin, end, val); }); cout << t.elapsed() << " " << r << endl; }
下面是並行查找最長的字符串的例子:
void TestFindString() { vector<string> v; v.reserve(10000000); for (int i = 0; i < 10000000; i++) { v.emplace_back(std::to_string(i + 1)); } string init = ""; auto f = [](const vector<string>::iterator& begin, vector<string>::iterator&end, string& val) { return *std::max_element(begin, end, [](string& str1, string& str2){return str1.length()<str2.length(); }); }; boost::timer t; auto r = ParallelReduce(v, init, f, f); cout << t.elapsed() << " " << r << endl; }
用四個測試用例對比測試了tbb、ppl、TaskCpp和單線程的性能。下圖是測試對比的結果:
能夠看到TaskCpp的性能比單線程效率要高,整體上也優於ppl和tbb,其中ppl和tbb在某些場景下性能還不如單線程高,因此在使用時要以實際測試數據爲準,並非一用並行庫效率就能提升。
遵循LGPL(GNU General Public License)協議。
TaskCpp是一個任務庫,不是線程池,每啓動一個task就會建立一個線程,若是須要線程池能夠看這裏。
若是發現問題或者有什麼建議請給我留言或者發郵件qicosmos@163.com .
c++11 boost技術交流羣:296561497,歡迎你們來交流技術。
TaskCpp的開發和測試花費了我兩三週的時間,開發之初我就計劃將其開源,我但願更多的人能用起來並推廣TaskCpp,促進它的發展。曾經有人問我,爲何堅持發原創文章分享技術,是否是有什麼好處。我一會兒還真答不上來,由於我根本就沒有想過有啥好處,如今再想一下,好處嘛,分享的術也許對別人學習有幫助吧。再想一想我這樣作的緣由,一個緣由是興趣,這要感謝c++11,是c++11讓我以爲c++語言是很是有意思和有魅力的語言,總能帶給人驚喜,沒有c++11我也不可能完成TaskCpp庫。還有就是一點點分享快樂的精神,我分享我快樂。最重要的緣由是一點點夢想,c++中開源庫太少了,不少框架和基礎庫都還不夠,遠遠趕不上java,因此在使用和推廣上不如java。可是我有一點夢想:我但願經過本身的一點努力能讓c++的世界變得更加美好,能讓c++開發者的日子變得美好。是的,正是這個夢想促使我將我開發的大部分代碼都開源出來!也正是這個夢想促使我堅持寫原創博客分享技術!