c++簡單線程池實現

線程池,簡單來講就是有一堆已經建立好的線程(最大數目必定),初始時他們都處於空閒狀態,當有新的任務進來,從線程池中取出一個空閒的線程處理任務,而後當任務處理完成以後,該線程被從新放回到線程池中,供其餘的任務使用,當線程池中的線程都在處理任務時,就沒有空閒線程供使用,此時,如有新的任務產生,只能等待線程池中有線程結束任務空閒才能執行,下面是線程池的工做原理圖:c++

咱們爲何要使用線程池呢?多線程

簡單來講就是線程自己存在開銷,咱們利用多線程來進行任務處理,單線程也不能濫用,無止禁的開新線程會給系統產生大量消耗,而線程原本就是可重用的資源,不須要每次使用時都進行初始化,所以能夠採用有限的線程個數處理無限的任務。函數

 

廢話少說,直接上代碼測試

首先是用條件變量和互斥量封裝的一個狀態,用於保護線程池的狀態ui

condition.h線程

複製代碼

#ifndef _CONDITION_H_
#define _CONDITION_H_

#include <pthread.h>

//封裝一個互斥量和條件變量做爲狀態
typedef struct condition
{
    pthread_mutex_t pmutex;
    pthread_cond_t pcond;
}condition_t;

//對狀態的操做函數
int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t *cond);
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
int condition_signal(condition_t* cond);
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);

#endif

複製代碼

condition.c指針

複製代碼

#include "condition.h"

//初始化
int condition_init(condition_t *cond)
{
    int status;
    if((status = pthread_mutex_init(&cond->pmutex, NULL)))
        return status;
    
    if((status = pthread_cond_init(&cond->pcond, NULL)))
        return status;
    
    return 0;
}

//加鎖
int condition_lock(condition_t *cond)
{
    return pthread_mutex_lock(&cond->pmutex);
}

//解鎖
int condition_unlock(condition_t *cond)
{
    return pthread_mutex_unlock(&cond->pmutex);
}

//等待
int condition_wait(condition_t *cond)
{
    return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}

//固定時間等待
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
    return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}

//喚醒一個睡眠線程
int condition_signal(condition_t* cond)
{
    return pthread_cond_signal(&cond->pcond);
}

//喚醒全部睡眠線程
int condition_broadcast(condition_t *cond)
{
    return pthread_cond_broadcast(&cond->pcond);
}

//釋放
int condition_destroy(condition_t *cond)
{
    int status;
    if((status = pthread_mutex_destroy(&cond->pmutex)))
        return status;
    
    if((status = pthread_cond_destroy(&cond->pcond)))
        return status;
        
    return 0;
}

複製代碼

而後是線程池對應的threadpool.h和threadpool.c對象

複製代碼

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

//線程池頭文件

#include "condition.h"

//封裝線程池中的對象須要執行的任務對象
typedef struct task
{
    void *(*run)(void *args);  //函數指針,須要執行的任務
    void *arg;              //參數
    struct task *next;      //任務隊列中下一個任務
}task_t;


//下面是線程池結構體
typedef struct threadpool
{
    condition_t ready;    //狀態量
    task_t *first;       //任務隊列中第一個任務
    task_t *last;        //任務隊列中最後一個任務
    int counter;         //線程池中已有線程數
    int idle;            //線程池中kongxi線程數
    int max_threads;     //線程池最大線程數
    int quit;            //是否退出標誌
}threadpool_t;


//線程池初始化
void threadpool_init(threadpool_t *pool, int threads);

//往線程池中加入任務
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);

//摧毀線程池
void threadpool_destroy(threadpool_t *pool);

#endif

複製代碼

複製代碼

#include "threadpool.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <time.h>

//建立的線程執行
void *thread_routine(void *arg)
{
    struct timespec abstime;
    int timeout;
    printf("thread %d is starting\n", (int)pthread_self());
    threadpool_t *pool = (threadpool_t *)arg;
    while(1)
    {
        timeout = 0;
        //訪問線程池以前須要加鎖
        condition_lock(&pool->ready);
        //空閒
        pool->idle++;
        //等待隊列有任務到來 或者 收到線程池銷燬通知
        while(pool->first == NULL && !pool->quit)
        {
            //不然線程阻塞等待
            printf("thread %d is waiting\n", (int)pthread_self());
            //獲取從當前時間,並加上等待時間, 設置進程的超時睡眠時間
            clock_gettime(CLOCK_REALTIME, &abstime);  
            abstime.tv_sec += 2;
            int status;
            status = condition_timedwait(&pool->ready, &abstime);  //該函數會解鎖,容許其餘線程訪問,當被喚醒時,加鎖
            if(status == ETIMEDOUT)
            {
                printf("thread %d wait timed out\n", (int)pthread_self());
                timeout = 1;
                break;
            }
        }
        
        pool->idle--;
        if(pool->first != NULL)
        {
            //取出等待隊列最前的任務,移除任務,並執行任務
            task_t *t = pool->first;
            pool->first = t->next;
            //因爲任務執行須要消耗時間,先解鎖讓其餘線程訪問線程池
            condition_unlock(&pool->ready);
            //執行任務
            t->run(t->arg);
            //執行完任務釋放內存
            free(t);
            //從新加鎖
            condition_lock(&pool->ready);
        }
        
        //退出線程池
        if(pool->quit && pool->first == NULL)
        {
            pool->counter--;//當前工做的線程數-1
            //若線程池中沒有線程,通知等待線程(主線程)所有任務已經完成
            if(pool->counter == 0)
            {
                condition_signal(&pool->ready);
            }
            condition_unlock(&pool->ready);
            break;
        }
        //超時,跳出銷燬線程
        if(timeout == 1)
        {
            pool->counter--;//當前工做的線程數-1
            condition_unlock(&pool->ready);
            break;
        }
        
        condition_unlock(&pool->ready);
    }
    
    printf("thread %d is exiting\n", (int)pthread_self());
    return NULL;
    
}


//線程池初始化
void threadpool_init(threadpool_t *pool, int threads)
{
    
    condition_init(&pool->ready);
    pool->first = NULL;
    pool->last =NULL;
    pool->counter =0;
    pool->idle =0;
    pool->max_threads = threads;
    pool->quit =0;
    
}


//增長一個任務到線程池
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
{
    //產生一個新的任務
    task_t *newtask = (task_t *)malloc(sizeof(task_t));
    newtask->run = run;
    newtask->arg = arg;
    newtask->next=NULL;//新加的任務放在隊列尾端
    
    //線程池的狀態被多個線程共享,操做前須要加鎖
    condition_lock(&pool->ready);
    
    if(pool->first == NULL)//第一個任務加入
    {
        pool->first = newtask;
    }        
    else    
    {
        pool->last->next = newtask;
    }
    pool->last = newtask;  //隊列尾指向新加入的線程
    
    //線程池中有線程空閒,喚醒
    if(pool->idle > 0)
    {
        condition_signal(&pool->ready);
    }
    //當前線程池中線程個數沒有達到設定的最大值,建立一個新的線性
    else if(pool->counter < pool->max_threads)
    {
        pthread_t tid;
        pthread_create(&tid, NULL, thread_routine, pool);
        pool->counter++;
    }
    //結束,訪問
    condition_unlock(&pool->ready);
}

//線程池銷燬
void threadpool_destroy(threadpool_t *pool)
{
    //若是已經調用銷燬,直接返回
    if(pool->quit)
    {
    return;
    }
    //加鎖
    condition_lock(&pool->ready);
    //設置銷燬標記爲1
    pool->quit = 1;
    //線程池中線程個數大於0
    if(pool->counter > 0)
    {
        //對於等待的線程,發送信號喚醒
        if(pool->idle > 0)
        {
            condition_broadcast(&pool->ready);
        }
        //正在執行任務的線程,等待他們結束任務
        while(pool->counter)
        {
            condition_wait(&pool->ready);
        }
    }
    condition_unlock(&pool->ready);
    condition_destroy(&pool->ready);
}

複製代碼

測試代碼:blog

複製代碼

#include "threadpool.h"
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>

void* mytask(void *arg)
{
    printf("thread %d is working on task %d\n", (int)pthread_self(), *(int*)arg);
    sleep(1);
    free(arg);
    return NULL;
}

//測試代碼
int main(void)
{
    threadpool_t pool;
    //初始化線程池,最多三個線程
    threadpool_init(&pool, 3);
    int i;
    //建立十個任務
    for(i=0; i < 10; i++)
    {
        int *arg = malloc(sizeof(int));
        *arg = i;
        threadpool_add_task(&pool, mytask, arg);
        
    }
    threadpool_destroy(&pool);
    return 0;
}

複製代碼

輸出結果:隊列

能夠看出程序前後建立了三個線程進行工做,當沒有任務空閒時,等待2s直接退出銷燬線程

標籤: 線程池c++

相關文章
相關標籤/搜索