多線程編程已是如今網絡編程中經常使用的編程技術,設計一個良好的線程池庫顯得尤其重要。在 UNIX(WIN32下能夠採用相似的方法,acl 庫中的線程池是跨平臺的) 環境下設計線程池庫主要是如何用好以下系統 API:git
一、pthread_cond_signal/pthread_cond_broadcast:生產者線程通知線程池中的某個或一些消費者線程池,接收處理任務;github
二、pthread_cond_wait:線程池中的消費者線程等待線程條件變量被通知;編程
三、pthread_mutex_lock/pthread_mutex_unlock:線程互斥鎖的加鎖及解鎖函數。網絡
下面的代碼示例是你們常見的線程池的設計方式:多線程
// 線程任務類型定義 struct thread_job { struct thread_job *next; // 指向下一個線程任務 void (*func)(void*); // 應用回調處理函數 void *arg; // 回調函數的參數 ... }; // 線程池類型定義 struct thread_pool { int max_threads; // 線程池中最大線程數限制 int curr_threads; // 當前線程池中總的線程數 int idle_threads; // 當前線程池中空閒的線程數 pthread_mutex_t mutex; // 線程互斥鎖 pthread_cond_t cond; // 線程條件變量 thread_job *first; // 線程任務鏈表的表頭 thread_job *last; // 線程任務鏈表的表尾 ... } // 線程池中的消費者線程處理過程 static void *consumer_thread(void *arg) { struct thread_pool *pool = (struct thread_pool*) arg; struct thread_job *job; int status; // 該消費者線程須要先加鎖 pthread_mutex_lock(&pool->mutex); while (1) { if (pool->first != NULL) { // 有線程任務時,則取出並在下面進行處理 job = pool->first; pool->first = job->next; if (pool->last == job) pool->last = NULL; // 解鎖,容許其它消費者線程加鎖或生產者線程添加新的任務 pthread_mutex_unlock(&pool->mutex); // 回調應用的處理函數 job->func(job->arg); // 釋放動態分配的內存 free(job); // 從新去加鎖 pthread_mutex_lock(&pool->mutex); } else { pool->idle_threads++; // 在調用 pthread_cond_wait 等待線程條件變量被通知且自動解鎖 status = pthread_cond_wait(&pool->cond, &pool->mutex); pool->idle_threads--; if (status == 0) continue; // 等待線程條件變量異常,則該線程須要退出 pool->curr_threads--; pthread_mutex_unlock(&pool->mutex); break; } } return NULL; } // 生產者線程調用此函數添加新的處理任務 void add_thread_job(struct thread_pool *pool, void (*func)(void*), void *arg) { // 動態分配任務對象 struct thread_job *job = (struct thread_job*) calloc(1, sizeof(*job)); job->func = func; job->arg = arg; pthread_mutex_lock(&pool->mutex); // 將新任務添加進線程池的任務鏈表中 if (pool->first == NULL) pool->first = job; else pool->last->next = job; pool->last = job; job->next = NULL; if (pool->idle_threads > 0) { // 若是有空閒消費者線程,則通知空閒線程進行處理,同時須要解鎖 pthread_mutex_unlock(&pool->mutex); pthread_cond_signal(&pool->cond); } else if (pool->curr_threads < pool->max_threads) { // 若是未超過最大線程數限制,則建立一個新的消費者線程 pthread_t id; pthread_attr_t attr; pthread_attr_init(&attr); // 將線程屬性設爲分離模式,這樣當線程退出時其資源自動由系統回收 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // 建立一個消費者線程 if (pthread_create(&id, &attr, consumer_thread, pool) == 0) pool->curr_threads++; pthread_mutex_unlock(&pool->mutex); pthread_attr_destroy(&attr); } } // 建立線程池對象 struct thread_pool *create_thread_pool(int max_threads) { struct thread_pool *pool = (struct thread_pool*) calloc(1, sizeof(*pool)); pool->max_threads = max_threads; pthread_mutex_init(&pool->mutex); pthread_cond_init(&pool->cond); ... return pool; } /////////////////////////////////////////////////////////////////////////////////// // 使用上面線程池的示例以下: // 由消費者線程回調的處理過程 static void thread_callback(void* arg) { ... } void test(void) { struct thread_pool *pool = create_thread_pool(100); int i; // 循環添加 1000000 次線程處理任務 for (i = 0; i < 1000000; i++) add_thread_job(pool, thread_callback, NULL); }
乍一看去,彷佛也沒有什麼問題,象不少經典的開源代碼中也是這樣設計的,但有一個重要問題被忽視了:線程池設計中的驚羣現象。你們能夠看到,整個線程池只有一個線程條件變量和線程互斥鎖,生產者線程和消費者線程(即線程池中的子線程)正是經過這兩個變量進行同步的。生產者線程每添加一個新任務,都會調用 pthread_cond_signal 一次,由操做系統喚醒一個在線程條件變量等待的消費者線程,但若是查看 pthread_cond_signal API 的系統幫助,你會發現其中有一句話:調用此函數後,系統會喚醒在相同條件變量上等待的一個或多個線程。而正是這句模棱兩可的話沒有引發不少線程池設計者的注意,這也是整個線程池中消費者線程收到信號通知後產生驚羣現象的根源所在,而且是消費者線程數量越多,驚羣現象越嚴重----意味着 CPU 佔用越高,線程池的調度性能越低。併發
要想避免如上線程池設計中的驚羣問題,在仍然共用一個線程互斥鎖的條件下,給每個消費者線程建立一個線程條件變量,生產者線程在添加任務時,找到空閒的消費者線程,將任務置入該消費者的任務隊列中同時只通知 (pthread_cond_signal) 該消費者的線程條件變量,消費者線程與生產者線程雖然共用相同的線程互斥鎖(由於有全局資源及調用 pthread_cond_wait 所需),但線程條件變量的通知過程倒是定向通知的,未被通知的消費者線程不會被喚醒,這樣驚羣現象也就不會產生了。svn
固然,還有一些設計上的細節須要注意,好比:當沒有空閒消費者線程時,須要將任務添加進線程池的全局任務隊列中,消費者線程處理完本身的任務後須要查看一下線程池中的全局任務隊列中是否還有未處理的任務。函數
更多的線程池的設計細節請參考 acl (https://sourceforge.net/projects/acl/) 庫中 lib_acl/src/thread/acl_pthread_pool.c 中的代碼。高併發
參考:性能
acl 庫下載:https://sourceforge.net/projects/acl/
github:https://github.com/zhengshuxin/acl
svn:svn checkout svn://svn.code.sf.net/p/acl/code/trunk acl-code