[轉]:TBOX中的線程池設計和實現

TBOX的線程池經過在每一個worker中批量一次拉取多個task,對鎖的競爭進行了優化。git

因爲每一個task的函數實現不會太多,因此能夠根據每一個task的函數地址作hash,統計出每一個task執行所花費的平均時間。而後根據這個平均值來動態計算每一個worker一次拉取的task的數量,TBOX裏面默認每一個worker一次拉取10s的task量,這樣能夠儘量的避免worker間鎖的頻繁搶佔。github

全部從等待隊列被拉取出來的task,都會被放到pending隊列中去,若是等待隊列中的task都被取完了,某個worker處於了空閒狀態,就會嘗試去pending中,從新拉取其餘worker尚未執行到的task, 這樣能夠解決某些task耗時太長,將worker中剩餘的task阻塞住的問題。函數

從新從pending隊列中拉取其餘worker的task,並無經過鎖來維護,而是經過原子操做判斷task的狀態來維護的,因此性能上仍是能夠保證的。post

整個線程池,只用到了一個鎖來維護內部的幾個隊列,每一個worker在大部分狀況都是獨立運行的,只有在本身的全部task都執行完空閒時,纔回去全局等待隊列中取task,而且上層接口也提供了批量投遞任務的接口,來最小化對鎖的使用。性能

下面看下簡單的使用例子:優化

static tb_void_t tb_demo_task_time_done(tb_cpointer_t priv)
{
    tb_msleep((tb_size_t)(priv));
}

static tb_void_t tb_demo_task_time_exit(tb_cpointer_t priv)
{
}

/* 投遞一個60s的任務到全局線程池
 *
 * tb_thread_pool(): 全局線程池實例,若是不想用全局的,也能夠本身建立個線程池
 * "60000ms": 指定的一個任務名,常量字符串
 * tb_demo_task_time_done: 任務函數地址
 * tb_demo_task_time_exit: 任務被執行完或取消的時候的清理函數,能夠用於釋放一些自有數據,這個是可選的,不用直接傳tb_null
 * (tb_cpointer_t)60000: 傳遞的私有數據指針,這裏簡單的傳了個等待時間值進去
 * tb_false: 是否爲緊急任務, 若是爲tb_true, 則這個任務會盡量第一時間優先唄執行
 */
tb_thread_pool_task_post(tb_thread_pool(), "60000ms", tb_demo_task_time_done, tb_demo_task_time_exit, (tb_cpointer_t)60000, tb_false);

// 投遞一個10s的緊急任務
tb_thread_pool_task_post(tb_thread_pool(), "10000ms", tb_demo_task_time_done, tb_null, (tb_cpointer_t)10000, tb_true);

// 批量投遞兩個任務
tb_thread_pool_task_t list[2] = {0};
list[0].name = "60000ms";
list[0].done = tb_demo_task_time_done;
list[0].exit = tb_demo_task_time_exit;
list[0].priv = (tb_pointer_t)60000;
list[0].urgent = tb_false;
list[1].name = "10000ms";
list[1].done = tb_demo_task_time_done;
list[1].exit = tb_null;
list[1].priv = (tb_pointer_t)10000;
list[1].urgent = tb_true;
tb_thread_pool_task_post_list(tb_thread_pool(), list, 2);

// 初始化而且投遞一個10s的緊急任務, 返回一個有效句柄
tb_thread_pool_task_ref_t task = tb_thread_pool_task_init(tb_thread_pool(), "10000ms", tb_demo_task_time_done, tb_null, (tb_cpointer_t)10000, tb_true);
if (task)
{
    // 取消這個任務,若是這個任務已經在執行中了,就無法取消了
    tb_thread_pool_task_kill(tb_thread_pool(), task);
    
    // 等待任務取消或完成,超時值:-1:無限等待
    tb_thread_pool_task_wait(tb_thread_pool(), task, -1);
    
    // 釋放這個任務
    tb_thread_pool_task_exit(tb_thread_pool(), task);
}

若是不想用全局線程池,能夠本身初始化一個:.net

/* 初始化線程池
 *
 * 8:最大worker的數量,上限值,若是傳0就是使用默認值
 * 0: 每一個worker線程的堆棧大小,若是傳0就是使用默認值
 */
tb_thread_pool_ref_t thread_pool = tb_thread_pool_init(8, 0);
if (thread_pool)
{
    // 投遞一個10s的緊急任務
    tb_thread_pool_task_post(thread_pool, "10000ms", tb_demo_task_time_done, tb_null, (tb_cpointer_t)10000, tb_true);
    
    // 若是的調試模式下,能夠dump整個線程池的狀態和全部處理中的任務狀態
#ifdef __tb_debug__
    tb_thread_pool_dump(thread_pool);
#endif

    // 等待全部任務執行完成或被取消
    tb_thread_pool_task_wait_all(thread_pool, -1);
    
    // 退出線程池
    tb_thread_pool_exit(thread_pool);
}

相關文章
相關標籤/搜索