線程池,簡單來講就是有一堆已經建立好的線程(最大數目必定),初始時他們都處於空閒狀態,當有新的任務進來,從線程池中取出一個空閒的線程處理任務,而後當任務處理完成以後,該線程被從新放回到線程池中,供其餘的任務使用,當線程池中的線程都在處理任務時,就沒有空閒線程供使用,此時,如有新的任務產生,只能等待線程池中有線程結束任務空閒才能執行,下面是線程池的工做原理圖:c++
咱們爲何要使用線程池呢?多線程
簡單來講就是線程自己存在開銷,咱們利用多線程來進行任務處理,單線程也不能濫用,無止禁的開新線程會給系統產生大量消耗,而線程原本就是可重用的資源,不須要每次使用時都進行初始化,所以能夠採用有限的線程個數處理無限的任務。函數
廢話少說,直接上代碼測試
首先是用條件變量和互斥量封裝的一個狀態,用於保護線程池的狀態ui
condition.h線程
#ifndef _CONDITION_H_ #define _CONDITION_H_ #include <pthread.h> //封裝一個互斥量和條件變量做爲狀態 typedef struct condition { pthread_mutex_t pmutex; pthread_cond_t pcond; }condition_t; //對狀態的操做函數 int condition_init(condition_t *cond); int condition_lock(condition_t *cond); int condition_unlock(condition_t *cond); int condition_wait(condition_t *cond); int condition_timedwait(condition_t *cond, const struct timespec *abstime); int condition_signal(condition_t* cond); int condition_broadcast(condition_t *cond); int condition_destroy(condition_t *cond); #endif
condition.c指針
#include "condition.h" //初始化 int condition_init(condition_t *cond) { int status; if((status = pthread_mutex_init(&cond->pmutex, NULL))) return status; if((status = pthread_cond_init(&cond->pcond, NULL))) return status; return 0; } //加鎖 int condition_lock(condition_t *cond) { return pthread_mutex_lock(&cond->pmutex); } //解鎖 int condition_unlock(condition_t *cond) { return pthread_mutex_unlock(&cond->pmutex); } //等待 int condition_wait(condition_t *cond) { return pthread_cond_wait(&cond->pcond, &cond->pmutex); } //固定時間等待 int condition_timedwait(condition_t *cond, const struct timespec *abstime) { return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime); } //喚醒一個睡眠線程 int condition_signal(condition_t* cond) { return pthread_cond_signal(&cond->pcond); } //喚醒全部睡眠線程 int condition_broadcast(condition_t *cond) { return pthread_cond_broadcast(&cond->pcond); } //釋放 int condition_destroy(condition_t *cond) { int status; if((status = pthread_mutex_destroy(&cond->pmutex))) return status; if((status = pthread_cond_destroy(&cond->pcond))) return status; return 0; }
而後是線程池對應的threadpool.h和threadpool.c對象
#ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ //線程池頭文件 #include "condition.h" //封裝線程池中的對象須要執行的任務對象 typedef struct task { void *(*run)(void *args); //函數指針,須要執行的任務 void *arg; //參數 struct task *next; //任務隊列中下一個任務 }task_t; //下面是線程池結構體 typedef struct threadpool { condition_t ready; //狀態量 task_t *first; //任務隊列中第一個任務 task_t *last; //任務隊列中最後一個任務 int counter; //線程池中已有線程數 int idle; //線程池中kongxi線程數 int max_threads; //線程池最大線程數 int quit; //是否退出標誌 }threadpool_t; //線程池初始化 void threadpool_init(threadpool_t *pool, int threads); //往線程池中加入任務 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg); //摧毀線程池 void threadpool_destroy(threadpool_t *pool); #endif
#include "threadpool.h" #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <time.h> //建立的線程執行 void *thread_routine(void *arg) { struct timespec abstime; int timeout; printf("thread %d is starting\n", (int)pthread_self()); threadpool_t *pool = (threadpool_t *)arg; while(1) { timeout = 0; //訪問線程池以前須要加鎖 condition_lock(&pool->ready); //空閒 pool->idle++; //等待隊列有任務到來 或者 收到線程池銷燬通知 while(pool->first == NULL && !pool->quit) { //不然線程阻塞等待 printf("thread %d is waiting\n", (int)pthread_self()); //獲取從當前時間,並加上等待時間, 設置進程的超時睡眠時間 clock_gettime(CLOCK_REALTIME, &abstime); abstime.tv_sec += 2; int status; status = condition_timedwait(&pool->ready, &abstime); //該函數會解鎖,容許其餘線程訪問,當被喚醒時,加鎖 if(status == ETIMEDOUT) { printf("thread %d wait timed out\n", (int)pthread_self()); timeout = 1; break; } } pool->idle--; if(pool->first != NULL) { //取出等待隊列最前的任務,移除任務,並執行任務 task_t *t = pool->first; pool->first = t->next; //因爲任務執行須要消耗時間,先解鎖讓其餘線程訪問線程池 condition_unlock(&pool->ready); //執行任務 t->run(t->arg); //執行完任務釋放內存 free(t); //從新加鎖 condition_lock(&pool->ready); } //退出線程池 if(pool->quit && pool->first == NULL) { pool->counter--;//當前工做的線程數-1 //若線程池中沒有線程,通知等待線程(主線程)所有任務已經完成 if(pool->counter == 0) { condition_signal(&pool->ready); } condition_unlock(&pool->ready); break; } //超時,跳出銷燬線程 if(timeout == 1) { pool->counter--;//當前工做的線程數-1 condition_unlock(&pool->ready); break; } condition_unlock(&pool->ready); } printf("thread %d is exiting\n", (int)pthread_self()); return NULL; } //線程池初始化 void threadpool_init(threadpool_t *pool, int threads) { condition_init(&pool->ready); pool->first = NULL; pool->last =NULL; pool->counter =0; pool->idle =0; pool->max_threads = threads; pool->quit =0; } //增長一個任務到線程池 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg) { //產生一個新的任務 task_t *newtask = (task_t *)malloc(sizeof(task_t)); newtask->run = run; newtask->arg = arg; newtask->next=NULL;//新加的任務放在隊列尾端 //線程池的狀態被多個線程共享,操做前須要加鎖 condition_lock(&pool->ready); if(pool->first == NULL)//第一個任務加入 { pool->first = newtask; } else { pool->last->next = newtask; } pool->last = newtask; //隊列尾指向新加入的線程 //線程池中有線程空閒,喚醒 if(pool->idle > 0) { condition_signal(&pool->ready); } //當前線程池中線程個數沒有達到設定的最大值,建立一個新的線性 else if(pool->counter < pool->max_threads) { pthread_t tid; pthread_create(&tid, NULL, thread_routine, pool); pool->counter++; } //結束,訪問 condition_unlock(&pool->ready); } //線程池銷燬 void threadpool_destroy(threadpool_t *pool) { //若是已經調用銷燬,直接返回 if(pool->quit) { return; } //加鎖 condition_lock(&pool->ready); //設置銷燬標記爲1 pool->quit = 1; //線程池中線程個數大於0 if(pool->counter > 0) { //對於等待的線程,發送信號喚醒 if(pool->idle > 0) { condition_broadcast(&pool->ready); } //正在執行任務的線程,等待他們結束任務 while(pool->counter) { condition_wait(&pool->ready); } } condition_unlock(&pool->ready); condition_destroy(&pool->ready); }
測試代碼:blog
#include "threadpool.h" #include <unistd.h> #include <stdlib.h> #include <stdio.h> void* mytask(void *arg) { printf("thread %d is working on task %d\n", (int)pthread_self(), *(int*)arg); sleep(1); free(arg); return NULL; } //測試代碼 int main(void) { threadpool_t pool; //初始化線程池,最多三個線程 threadpool_init(&pool, 3); int i; //建立十個任務 for(i=0; i < 10; i++) { int *arg = malloc(sizeof(int)); *arg = i; threadpool_add_task(&pool, mytask, arg); } threadpool_destroy(&pool); return 0; }
輸出結果:隊列
能夠看出程序前後建立了三個線程進行工做,當沒有任務空閒時,等待2s直接退出銷燬線程