(原創)發佈一個c++11開發的輕量級的並行Task庫TaskCpp

TaskCpp簡介

  TaskCpp是c++11開發的一個跨平臺的並行task庫,它的設計思路來源於微軟的並行計算庫ppl和intel的並行計算庫tbb,關於ppl和tbb我在前面有介紹。既然已經有了這兩個大公司開發的並行計算庫,我爲何還要開發本身的並行計算庫。有兩個緣由:html

  1. ppl只能在windows上用不能跨平臺,tbb能跨平臺,可是受限於原始設計,tbb的task比較弱沒有ppl的強大,因此他們不能徹底知足個人要求;
  2. 我以爲能夠用c++11能夠開發出一個輕量級的好用的並行task庫。

  TaskCpp在接口設計上儘可能和ppl保持一致,由於我以爲ppl的接口很好很強大。所以,TaskCpp的接口用法和語義和ppl基本是一致的。由於TaskCpp是一個輕量級的task庫,總共也不過三百多行代碼,本着簡單夠用的原則,只提供了一些和ppl相似的經常使用用法, 有些不經常使用的特性不考慮支持。好比,不支持任務的取消,由於加入任務的取消會致使增長不少複雜性,而實際中用得比較少,因此不考慮支持,夠用就好。java

支持的平臺

須要支持c++11的編譯器,建議編譯器:linux

  • linux: GCC4.7+
  • windows: vs2012 nov ctp+, 最好是vs2013

庫的使用

  使用TaskCpp僅僅須要包含頭文件便可,在程序中使用只須要包含#include <TaskCpp.h>和少許的boost頭文件便可。c++

TaskCpp的功能

TaskCpp提供一下功能:算法

  • 並行任務:一種並行執行若干工做任務的機制。
    • 基本的異步任務
    • 延續的任務
    • 組合任務
      • WhenAll
      • WhenAny
    • 任務組
  • 並行算法:並行做用於數據集合的泛型算法。
    • ParallelForeach算法
    • ParallelInvoke算法
    • ParallelReduce算法

TaskCpp用法介紹

並行任務

基本的異步任務Task

  Task會建立一個異步操做,這個異步操做發起方式是延遲加載方式發起的,即在調用Task的Wait或者Get時才真正發起異步操做。Task能夠經過std::function或者lambda表達式去建立,不支持直接原生函數建立,若是要用原生函數須要先經過lambda或者std::function包裝一下。Task的Wait接口只是等待異步操做結束。Task的Get接口接收參數並等待異步操做結束並返回結果。PPL中的get接口是不能接收參數的,TaskCpp的Get接口是能夠接受任意參數的,更靈活一點,算是較PPL的一個小優勢吧。下面是Task的基本用法:windows

#include <TaskCpp.h>
using namespace Cosmos;

void TestTask()
{
    Task<void()> task([]{cout << 1 << endl; });
    task.Wait();

    Task<void()> task1 = []{cout << 1 << endl; };
    task1.Wait();

    Task<int()> task2 = []{cout << 1 << endl; return 1; };
    cout << task2.Get() << endl;

    Task<int(int)> task3 = [](int i){cout << i << endl; return i; };
    cout << task3.Get(3) << endl;
}
View Code

組合任務--WhenAll

  WhenAll保證一個任務集合中全部的任務完成。WhenAll函數會生成一個任務,該任務可在完成一組任務以後完成。 此函數可返回一個std::vector 對象,該對象包含集合中每一個任務的結果。 如下基本示例使用WhenAll建立表示完成其餘三個任務的任務。下面是WhenAll的基本用法:併發

void PrintThread()
{
    cout << std::this_thread::get_id() << endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
}

void TestWhenAll()
{
    vector<Task<int()>> v = {
        Task<int()>([]{PrintThread(); std::this_thread::sleep_for(std::chrono::seconds(5)); return 1; }),
        Task<int()>([]{PrintThread(); return 2; }),
        Task<int()>([]{PrintThread(); return 3; }),
        Task<int()>([]{PrintThread(); return 4; })
    };

    cout << "when all " << endl;
    WhenAll(v).Get();

}
View Code

  注意:WhenAll是非阻塞的,它只是建立一個任務,在Wait或Get時才發起異步操做。傳遞給WhenAll的任務必須是統一的。 換言之,它們必須都返回相同類型。框架

組合任務--WhenAny

  WhenAny在任務集合中任意一個任務結束以後就返回。函數會生成一個任務,該任務可在完成一組任務的第一個任務以後完成。 此函數可返回一個 std::pair 對象,該對象包含已完成任務的結果和集合中任務的索引。下面是WhenAny的基本用法:異步

void TestWhenAny()
{
    vector<Task<int()>> v = {
        Task<int()>([]{PrintThread(); std::this_thread::sleep_for(std::chrono::seconds(5)); return 1; }),
        Task<int()>([]{PrintThread(); return 2; }),
        Task<int()>([]{PrintThread(); return 3; }),
        Task<int()>([]{PrintThread(); return 4; })
    };

    cout << "when any " << endl;
    WhenAny(v).Then([](std::pair<int, int>& result)
    {
        cout << " index " << result.first << " result " << result.second << endl;
        return result.second;
    }).Then([](int result){cout << "any result: " << result << endl; }).Get();
}
View Code


  注意:WhenAny是非阻塞的,它只是建立一個任務,在Wait或Get時才發起異步操做。傳遞給WhenAny的任務必須是統一的。 換言之,它們必須都返回相同類型。ide

任務組--TaskGroup

  TaskGroup能夠並行的處理一組任務,TaskGroup能夠接受多個task或者function,TaskGroup的Wait等待全部任務完成。下面是TaskGroup的基本用法:

void TestTaskGroup()
{
    Task<int()> t1([]{PrintThread(); return 1; });
    Task<double()> t2([]{PrintThread(); return 2.123; });
    Task<void()> t3([]{PrintThread(); });
    Task<string()> t4([]{PrintThread(); return "ok"; });

    TaskGroup group;
    group.Run(t1); 
    group.Run(t2); 
    group.Run(t3);  
    group.Run(t4); 
    
    //若是你以爲這樣一個一個Run加入任務,你也能夠一塊兒Run
    group.Run(t1, t2, t3, []{PrintThread(); return 1; });

    group.Wait();
}
View Code

  PPL的task_group的任務只能是void()形式的,TaskCpp容許一些簡單類型的任務如int()、double()、string()等,其實任務的返回類型沒有實際意義,由於Wait沒有返回值,這裏支持多種返回類型的任務只不過是爲了減小一點限制,用起來稍微方便一點罷了。PPL加入任務只能一個一個Run,要加入多個任務時有點繁瑣,TaskCpp能夠一次Run多個任務,比PPL要方便一些。這兩點算是較PPL的兩個小優勢吧。

並行算法

ParallelForeach算法

  ParallelForeach算法與 STL std::for_each 算法相似,只是 parallel_for_each 算法併發執行任務。用法比較簡單:

bool check_prime(int x) // 爲了體現效果, 該函數故意沒有優化.
{
    for (int i = 2; i < x; ++i)
    if (x % i == 0)
        return false;
    return true;
}

void TestParallelFor()
{
    vector<int> v;
    for (int i = 0; i < 100000; i++)
    {
        v.push_back(i + 1);
    }

    boost::timer t;

    ParallelForeach(v.begin(), v.end(), check_prime);
    ParallelForeach(v.begin(), v.end(), check_prime);

    cout << "taskcpp: " << t.elapsed() << endl;
}
View Code

ParallelInvoke算法

  ParallelInvoke算法並行執行一組任務。 在完成全部任務以前,此算法不會返回。 當您須要同時執行多個獨立的任務時,此算法頗有用。ParallelInvoke和TaskGroup的做用是同樣的。用法比較簡單:

void TestParaInvoke()
{
    auto f = []{cout << "1" << endl; return 1; };
    ParallelInvoke(f, []{cout << "2" << endl; });
}

ParallelReduce算法

  ParallelReduce算法在實際應用中比較經常使用,有點相似於map-reduce,能夠並行的對一個集合進行reduce操做。ParallelReduce的用法稍微複雜一點,它的原型:

    • ParallelReduce(range,init, reduceFunc);
    • ParallelReduce(range,init, rangeFunc, reduceFunc);

  第一個參數是集合,第二個參數是算法的初始值,第三個參數rangeFunc是一個聲稱中間結果的函數,第四個參數是中間結果的匯聚函數。若是調用ParallelReduce(range,init, reduceFunc),則表示rangeFunc和reduceFunc是一個函數。

  下面的例子是並行的計算100000000個整數的和:

void TestParallelSum()
{
    vector<int> v;
    const int Size = 100000000;
    v.reserve(Size);
    for (int i = 0; i < Size; i++)
    {
        v.push_back(i + 1);
    }

    int i = 0;

    boost::timer t;
    auto r = ParallelReduce(v, i, [](const vector<int>::iterator& begin, 

vector<int>::iterator&end, int val)
    {
        return std::accumulate(begin, end, val);
    });
    cout << t.elapsed() << " " << r << endl;
}
View Code

  下面是並行查找最長的字符串的例子:

void TestFindString()
{
    vector<string> v;
    v.reserve(10000000);
    for (int i = 0; i < 10000000; i++)
    {
        v.emplace_back(std::to_string(i + 1));
    }

    string init = "";

    auto f = [](const vector<string>::iterator& begin, vector<string>::iterator&end, string& val)
    {
        return *std::max_element(begin, end, [](string& str1, string& str2){return str1.length()<str2.length(); });
    };

    boost::timer t;
    auto r = ParallelReduce(v, init, f, f);
    cout << t.elapsed() << " " << r << endl;
}
View Code

性能

  用四個測試用例對比測試了tbb、ppl、TaskCpp和單線程的性能。下圖是測試對比的結果:

  能夠看到TaskCpp的性能比單線程效率要高,整體上也優於ppl和tbb,其中ppl和tbb在某些場景下性能還不如單線程高,因此在使用時要以實際測試數據爲準,並非一用並行庫效率就能提升。

下載地址

TaskCpp 更新版本TaskCppV1.1

TaskCpp開源協議

  遵循LGPL(GNU General Public License)協議。

注意

TaskCpp是一個任務庫,不是線程池,每啓動一個task就會建立一個線程,若是須要線程池能夠看這裏

聯繫我

  若是發現問題或者有什麼建議請給我留言或者發郵件qicosmos@163.com . 

  c++11 boost技術交流羣:296561497,歡迎你們來交流技術。

後記

  TaskCpp的開發和測試花費了我兩三週的時間,開發之初我就計劃將其開源,我但願更多的人能用起來並推廣TaskCpp,促進它的發展。曾經有人問我,爲何堅持發原創文章分享技術,是否是有什麼好處。我一會兒還真答不上來,由於我根本就沒有想過有啥好處,如今再想一下,好處嘛,分享的術也許對別人學習有幫助吧。再想一想我這樣作的緣由,一個緣由是興趣,這要感謝c++11,是c++11讓我以爲c++語言是很是有意思和有魅力的語言,總能帶給人驚喜,沒有c++11我也不可能完成TaskCpp庫。還有就是一點點分享快樂的精神,我分享我快樂。最重要的緣由是一點點夢想,c++中開源庫太少了,不少框架和基礎庫都還不夠,遠遠趕不上java,因此在使用和推廣上不如java。可是我有一點夢想:我但願經過本身的一點努力能讓c++的世界變得更加美好,能讓c++開發者的日子變得美好。是的,正是這個夢想促使我將我開發的大部分代碼都開源出來!也正是這個夢想促使我堅持寫原創博客分享技術!

相關文章
相關標籤/搜索