相信你們一直有聽過線程池, 可是不必定都知道這究竟是個什麼東西,是如何實現的;git
當一個服務器接受到大量短小線程的請求時, 使用線程池技術是很是合適的, 它能夠大大減小線程的建立和銷燬次數, 提升服務器的工做效率. 可是線程要求的運行時間比較長, 則不適用.github
線程池通常有如下三個功能:服務器
以上是對外的三個接口方法;數據結構
本次實現的線程池對外有四個接口:併發
struct sl_thread_pool *sl_thread_pool_create(unsigned int core_td_num, unsigned int max_td_num, int alive_time); void sl_thread_pool_destory(struct sl_thread_pool *pool); void sl_thread_pool_destory_now(struct sl_thread_pool *pool); int sl_thread_pool_push_task(struct sl_thread_pool *pool, void *(*task_fun)(void *arg), void *arg);
在銷燬線程的時候我作了個功能細化,分爲兩種: 一種是當即銷燬線程池, 一種是執行完任務隊列中的任務再銷燬線程池,兩種方式都是爲阻塞方式;函數
struct sl_thread_pool *sl_thread_pool_create(unsigned int core_td_num, unsigned int max_td_num, int alive_time);
core_td_num: 初始化線程數
max_td_num: 最大線程數目(線程數量是動態分配)
alive_time: 線程空閒時存活的時間,單位:毫秒
return: 返回線程池句柄
該接口主要是用於建立一個線程池, 筆者寫的該線程池能夠動態的伸縮因此加入了最大線程數限制和存活時間.oop
void sl_thread_pool_destory(struct sl_thread_pool *pool);
調用該接口時,線程池不會立馬被註銷而是處理完任務隊列中的全部任務才註銷;ui
void sl_thread_pool_destory_now(struct sl_thread_pool *pool);
調用該接口時,當即註銷線程池;spa
int sl_thread_pool_push_task(struct sl_thread_pool *pool, void *(*task_fun)(void *arg), void *arg);
向線程池中添加一個新任務, 形參task_fun
爲任務的函數指針, arg
爲函數指針的參數;線程
筆者寫的該線程池有兩個重要的鏈表:一個是線程鏈表,一個是任務鏈表,還有一個重要的線程:manager線程,用於管理線程的銷燬和建立;
struct sl_thread_pool *sl_thread_pool_create(unsigned int core_td_num, unsigned int max_td_num, int alive_time) { struct sl_thread_pool *pstp = NULL; struct sl_thread *thread = NULL; int create_ret = -1; pstp = (struct sl_thread_pool*)malloc(sizeof(struct sl_thread_pool)); ① if (pstp == NULL) { ERR("%s: malloc error for creat pool", __FUNCTION__); goto malloc_pool_err; } create_ret = sl_create_thread_key(destructor_fun); ② if (create_ret != 0) { ERR("%s: create thread key error", __FUNCTION__); goto create_key_err; } /* 建立manager*/ create_manager_looper(pstp); thread = sl_thread_create(sl_thread_manager_do, pstp); ③ if (thread == NULL) { ERR("%s: malloc error for create pool", __FUNCTION__); goto create_manager_err; } else { pstp->thread_manager = thread; pthread_setname_np(thread->thread_id, "manager_thread"); } /* 初始化線程池鏈表 */ list_head_init(&pstp->thread_head); list_head_init(&pstp->task_queue.task_head); /* 初始化線程池計數 */ pstp->core_threads_num = core_td_num; pstp->max_threads_num = max_td_num; pstp->keep_alive_time = alive_time; pstp->alive_threads_num = 0; pstp->destory = 0; pthread_mutex_init(&pstp->thread_mutex, NULL); // pthread_cond_init(&pstp->thread_run_signal, NULL); /* 初始化工做鎖 */ pthread_mutex_init(&pstp->task_queue.task_mutex, NULL); /* 初始化工做隊列同步條件 */ pthread_cond_init(&pstp->task_queue.task_ready_signal, NULL); /* 建立核心線程 */ for (int i = 0; i < pstp->core_threads_num; i++) { ④ thread = sl_thread_create(sl_thread_do, pstp); if (thread != NULL) { list_add(&pstp->thread_head, &thread->thread_list); pthread_setname_np(thread->thread_id, "core_thread"); } else { i--; } } /* 等待覈心線程建立完成 */ while (pstp->alive_threads_num != pstp->core_threads_num); return pstp; create_manager_err: pthread_key_delete(g_key); create_key_err: free(pstp); malloc_pool_err: return NULL; }
①: 爲線程池分配空間.
②: 建立線程私有數據.
③: 建立manager線程.
④: 建立核心線程,這必定數量的線程是不會被釋放的.
線程池的數據結構以下:
struct sl_thread_pool { struct list_head thread_head; /* 線程鏈表 */ struct sl_task_queue task_queue; /* 任務鏈表 */ unsigned int core_threads_num; /* 初始化須要建立的線程數 */ unsigned int max_threads_num; /* 建立線程最大上限數 */ unsigned int alive_threads_num; /* 當前建立線程數量 */ pthread_mutex_t thread_mutex; pthread_cond_t thread_run_signal; /* 線程run信號 */ int keep_alive_time; /* 空閒線程保持存活時間 unit: ms */ struct sl_thread *thread_manager; /* 領導 */ unsigned int destory; };
static void *sl_thread_manager_do(void *arg) { struct sl_thread_pool *pstp = (struct sl_thread_pool *)arg; int next_poll_time = -1; int keep_alive_time = -1; if (pstp == NULL) { ERR("%s: pool is NULL", __FUNCTION__); return NULL; } do { usleep(100); } while(pstp->thread_manager == NULL); while (pstp->thread_manager->thread_status != THREAD_QUIT) { keep_alive_time = poll_event(pstp, next_poll_time); next_poll_time = get_next_poll_time(keep_alive_time); } INFO("sl_thread_manager_do quit"); return NULL; }
manager線程主要是epoll
來輪詢事件,而後作出相應的處理;主要的事件有三個:
static int poll_event(struct sl_thread_pool *pool, int time_out) { ... struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int event_count = epoll_wait(g_epoll_fd, eventItems, EPOLL_MAX_EVENTS, time_out); ... // Check for poll timeout. if (event_count == 0) { ① list_for_each(plh, &pstp->thread_head) { pst = sl_list_entry(plh, struct sl_thread, thread_list); DEBUG("%s: pstp->alive_threads_num = %d, %ld thread status %s", __FUNCTION__, pstp->alive_threads_num, pst->thread_id, get_status(pst->thread_status)); if (pstp->alive_threads_num > pstp->core_threads_num) { if (pst->thread_status == THREAD_SUPPEND) { pst->thread_status = THREAD_QUIT; sl_notify_all(&pstp->task_queue); delete_when_each(plh); pthread_join(pst->thread_id, NULL); free(pst); keep_time = 50; // 50ms再檢測一次 break; } } else { keep_time = -1; break; } } return keep_time; } // despatch for poll event for (int i = 0; i < event_count; i++) { fd = eventItems[i].data.fd; epoll_events = eventItems[i].events; if ((fd == g_wake_read_pip_fd) && (epoll_events & EPOLLIN)) { /* thread和task同時來臨只處理thread */ ret_event = sl_get_event(); switch(ret_event) { case EVENT_THREAD: ② DEBUG("EVENT_THREAD"); if (pstp->alive_threads_num > pstp->core_threads_num) { keep_time = pstp->keep_alive_time; } else { keep_time = -1; } break; case EVENT_TASK: ③ DEBUG("EVENT_TASK"); /* 判斷當前線程的消息和當前運行線程比例 */ pstq = &pstp->task_queue; if(pstq->num_tasks_alive >= (pstp->alive_threads_num * 2) && (pstp->alive_threads_num <= pstp->max_threads_num)) { /* 建立線程 */ pst = sl_thread_create(sl_thread_do, pstp); if (pst != NULL) { list_add(&pstp->thread_head, &pst->thread_list); pthread_setname_np(pst->thread_id, "other_thread"); } } break; case EVENT_SHUTDOWN: ④ DEBUG("EVENT_SHUTDOWN"); /* 執行完任務對列中的任務才shutdown */ pstp->core_threads_num = 0; pool->destory = 1; break; default: break; } } } return keep_time; }
①: wait超時的處理,通常進入超時狀態都是準備註銷線程, 線程空閒時則註銷.
②: 線程狀態變化處理,判斷當前線程是否多餘核心線程,若是是則設置存活時間爲下一輪的wait超時時間.
③: 發送任務事件後,主要是判斷當前任務數量,線程池是否處理的過來,不然建立新線程.
④: 註銷事件,核心線程數設置爲0,等待任務鏈表中的任務處理完再註銷;
事件的輪詢主要是藉助epoll
監控管道的變化實現,想了解的能夠詳細看下代碼;
static void *sl_thread_do(void *arg) { struct sl_thread_pool *pstp = (struct sl_thread_pool *)arg; struct sl_thread_task *pstt = NULL; struct sl_task_queue *pstq = NULL; if (pstp == NULL) { ERR("%s: pool is NULL", __FUNCTION__); return NULL; } pstq = &pstp->task_queue; pthread_mutex_lock(&pstp->thread_mutex); pstp->alive_threads_num++; pthread_mutex_unlock(&pstp->thread_mutex); sl_save_thread_self(pstp); while (sl_get_thread_self()->thread_status != THREAD_QUIT) { pstt = sl_task_pull(pstq); ① if (pstt != NULL) { sl_update_thread_status(THREAD_WORKING); pstt->task_fun(&pstt->arg); ② free(pstt); } } pthread_mutex_lock(&pstp->thread_mutex); pstp->alive_threads_num--; pthread_mutex_unlock(&pstp->thread_mutex); sl_update_thread_status(THREAD_IDLE); sl_clear_thread_self(); ③ INFO("thread_run_task %ld quit, currten threads count %d, currten tasks count %d\n", pthread_self(), pstp->alive_threads_num, pstq->num_tasks_alive); return NULL; }
①: 從任務對列中取出一個任務, 沒有則休眠;
②: 執行任務
③: 清除私有數據中存放的值
這在說明一點,用線程的私有數據進行存儲, 主要是爲了更新線程的狀態方便;
int sl_thread_pool_push_task(struct sl_thread_pool *pool, void *(*task_fun)(void *arg), void *arg) { struct sl_task_queue *pstq = NULL; struct sl_thread_task *pstt = NULL; if (pool == NULL || task_fun == NULL || pool->destory == 1) { ERR("%s: pool or task_fun is NULL or is destory status", __FUNCTION__); return -1; } pstq = &pool->task_queue; pstt = (struct sl_thread_task*)malloc(sizeof(struct sl_thread_task)); if (pstt == NULL) { ERR("%s: malloc error for creat a task", __FUNCTION__); return -1; } pstt->task_fun = task_fun; pstt->arg = arg; return sl_task_push(pstq, pstt); }
該接口主要分配了一個空間初始化傳進來的任務,往下看:
static int sl_task_push(struct sl_task_queue *_stq, struct sl_thread_task *new_task) { struct sl_task_queue *pstq = _stq; struct sl_thread_task *pstt = new_task; if (pstq == NULL || pstt == NULL) { ERR("%s: pstq or pstt is NULL", __FUNCTION__); return -1; } pthread_mutex_lock(&pstq->task_mutex); list_add(&pstq->task_head, &pstt->task_list); pstq->num_tasks_alive++; pthread_mutex_unlock(&pstq->task_mutex); sl_notify_one(pstq); sl_update_task_queue_info(); return pstq->num_tasks_alive; }
將剛纔保存的任務添加進任務對列併發送通知;
筆者寫這個線程池,主要涉及到這個點有: 同步變量, 鎖, 線程私有數據, 管道, epoll和雙向隊列;
代碼已經放到個人github上了: thread pool