線性池的應用來源服務器
爲知足多客戶端可同時登錄的要求,服務器端必須實現併發工做方式。當服務器主進程持續等待客戶端鏈接時,每鏈接上一個客戶端都需一個單獨的進程或線程處理客戶端的任務。但考慮到多進程對系統資源消耗大,單一線程存在重複建立、銷燬等動做產生過多的調度開銷,故採用線性池的方法。多線程
線性池是一種多線程併發的處理形式,它就是由一堆已建立好的線程組成。有新任務 -> 取出空閒線程處理任務 -> 任務處理完成放入線程池等待。避免了處理短期任務時大量的線程重複建立、銷燬的代價,很是適用於連續產生大量併發任務的場合。併發
線行池實現原理過程:框架
1)初始設置任務隊列(鏈表)做爲緩衝機制,並初始化建立n個線程,加鎖去任務隊列取任務運行(多線程互斥)。函數
2)在處理任務過程當中,當任務隊列爲空時,全部線程阻塞(阻塞IO)處於空閒(wait)狀態;spa
3)當任務隊列加入新的任務時,隊列加鎖,而後使用條件變量喚醒(work)處於阻塞中的某一線程來執行任務;線程
4)執行完後再次返回線程池中成爲空閒(wait)狀態,依序或等待執行下一個任務。code
最後完成全部任務將線程池中的線程統一銷燬。blog
-----------------------------------------------------------------隊列
加入程序框架中:(運用了線程互斥、線程同步)
main主函數下: //1.初始化線程池
pool_init(5); // 等待鏈接
while(1) { new_fd = accept(sockfd,(struct sockaddr *)(&client_addr),&sin_size); //2.執行process,將process任務交給線程池
pool_add_task(process,new_fd); }
void pool_init (int max_thread_num) { //申請堆空間 pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
for (i = 0; i < max_thread_num; i++) //3
{ //建立線程 保存線程id pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL);
} }
void * thread_routine (void *arg)
{
while (1)
{
pthread_mutex_lock (&(pool->queue_lock)); //鎖互斥鎖
while (pool->cur_task_size == 0 && !pool->shutdown)
{ //1.當前沒有任務,且線程池處於非關閉
printf ("thread 0x%x is waiting\n", pthread_self ());
//等待條件成熟函數->線程等待
pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));
}
if (pool->shutdown) /*2.線程池要銷燬了*/
{
/*遇到break,continue,return等跳轉語句,千萬不要忘記先解鎖*/
pthread_mutex_unlock (&(pool->queue_lock)); //解互斥鎖
printf ("thread 0x%x will exit\n", pthread_self ());
pthread_exit (NULL); //線程退出函數
}
/*待處理任務減1,並取出鏈表中的頭元素*/ 3.處理任務
pool->cur_task_size--; //等待任務-1
Cthread_task *task = pool->queue_head; //取第一個任務處理
pool->queue_head = task->next; //鏈表頭指向下一個任務
pthread_mutex_unlock (&(pool->queue_lock)); //解互斥鎖
/*調用回調函數,執行任務*/
(*(task->process)) (task->arg);
free (task);
task = NULL;
}
pthread_exit (NULL);
}
int pool_add_task (void *(*process) (int arg), int arg) { /*構造一個新任務 初始化*/ Cthread_task *task = (Cthread_task *) malloc (sizeof (Cthread_task)); task->process = process; task->arg = arg; task->next = NULL; pthread_mutex_lock (&(pool->queue_lock)); ~~~~~~ pthread_mutex_unlock (&(pool->queue_lock)); pthread_cond_signal (&(pool->queue_ready)); //條件成熟函數->喚醒線程
return 0; } void * process(int arg) //讀取操做符,讀取對應的命令響應
{
if(cmd == 'Q')
{
/*關閉SSL*/
SSL_shutdown(ssl);
SSL_free(ssl);
close(tmp_fd);
break;
}
else
handle(cmd,ssl);
}