簡單實現LINUX C下的線程池.

本篇文章由導學寶轉自: http://www.cnblogs.com/freezee/archive/2012/02/29/2373958.html

What I write, what I lose.

 

之前有點時間, 重新熟悉了一下Linux進程間通信的東西.

於是想起之前項目中自己寫了個很簡單的線程池.

這次想重新寫一下.

主要目的是用進程間或者線程間通訊的阻塞/解除阻塞方法, 實現線程池中線程的等待作業和開始作業.

算是對這些代碼的一種實踐.

以上.

===================================================================

我對一個簡單線程池的一些理解.

1.建立大量的線程.

2.工作線程的執行體功能爲:

while(1)

{

//按照一定條件(A)阻塞.

 

//按照任務的參數設置開始執行任務.

}

3.控制線程的功能爲.

{

//接受新任務的參數, 一般爲回調函數+參數. (爲保持兼容, 我設置的格式爲 void* (*thread_task)(void*) + void* , 跟線程創建函數 pthread_create 的形式保持兼容.)

//按照必定規則查找空閒的線程.

//將接受的新任務參數賦給這條線程數據體.

//解除這條線程的阻塞條件.

}

 

===================================================================

common-thread-pool.c     線程池主要實現+一個簡單的測試代碼.

   接口沒有拿出來.

thread-control.h      提供線程池線程的等待作業和開始作業接口.

thread-control-xxxxx.c     thread-control.h的接口實現. 可以使用多種方式.

 

 

common-thread-pool.c 

View Code
/* Expose the POSIX declarations (sleep, pthreads) under strict -std=c99/c11. */
#define _POSIX_C_SOURCE 200809L

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <stdint.h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>

#define DBGPRINTF_DEBUG printf
#define DBGPRINTF_ERROR printf
#define ASSERT  assert

#include "thread-control.h"

/* Largest pool size accepted by thread_pool_create(). */
#define THREAD_POOL_MAX_THREADS         (10 * 1024)

/*
 * Grace period (seconds) after spawning the workers, giving them time to
 * block in thread_control_wait() before the first task is submitted.  It is
 * only needed by thread-control back ends whose start signal is not latched.
 */
#define THREAD_POOL_STARTUP_DELAY_SEC   2

/* Number of tasks submitted per batch by the demo in main(). */
#define TEST_BATCH_SIZE                 8

/* Task entry point; same shape as a pthread start routine. */
typedef void*(*thread_task_func)(void* arg);

/* Work item handed to a pool thread. */
struct _thread_task_t
{
    int taskid;                     /* Task id, allocated from taskid_base. */
    thread_task_func task_func;     /* Function to run ...                  */
    void* task_arg;                 /* ... and the argument passed to it.   */
};
typedef struct _thread_task_t thread_task_t;

/* Worker thread state. */
typedef enum {
    ethread_status_unknown = 0,
    ethread_status_idle ,
    ethread_status_running ,
    ethread_status_terminel ,
    ethread_status_cannotuse ,
}thread_status_e;

/* Per-worker bookkeeping. */
struct _thread_data_t
{
    int thread_id;                  /* Index of this worker in the pool.            */
    pthread_t pid;                  /* Handle written by pthread_create().          */
    thread_status_e status;         /* Guarded by thread_pool_t.thread_pool_lock.   */

    thread_task_t thread_task;      /* Task currently assigned to this worker.      */
    THREAD_CONTROL thread_control;  /* Opaque wait/start handle (thread-control.h). */
};
typedef struct _thread_data_t thread_data_t;

/* Pool-wide state. */
struct _thread_pool_t
{
    thread_data_t* thread_data_set; /* Array of num_thread workers.             */
    int num_thread;                 /* Number of workers actually running.      */
    int taskid_base;                /* Last task id handed out (-1 = none yet). */

    pthread_mutex_t thread_pool_lock;   /* Guards status fields and taskid_base. */
};
typedef struct _thread_pool_t thread_pool_t;

thread_pool_t g_thread_pool;

/*
 * Set the status of one worker under the pool lock.
 * Always returns 0.
 */
int thread_pool_setthreadstatus(thread_data_t* thread_data, thread_status_e status)
{
    thread_pool_t* thread_pool = &g_thread_pool;

    pthread_mutex_lock(&(thread_pool->thread_pool_lock));
    thread_data->status = status;
    pthread_mutex_unlock(&(thread_pool->thread_pool_lock));

    return 0;
}

/*
 * Worker thread body.
 *
 * arg is the worker's thread_data_t.  The worker blocks in
 * thread_control_wait() until a task is assigned, runs it, marks itself idle
 * and waits again.  It only leaves the loop if waiting fails, in which case
 * it marks itself ethread_status_cannotuse so it is never selected again.
 */
void* thread_pool_func(void* arg)
{
    thread_data_t* thread_data = (thread_data_t*)arg;

    /*
     * Print pthread_self() instead of thread_data->pid: the creating thread
     * may still be storing pid when this thread starts running, so reading
     * it here would be a data race (the original hid that with a sleep(1)).
     * The cast assumes pthread_t is an integer type, as it is on Linux.
     */
    DBGPRINTF_DEBUG("Thread start run. Thread_id = %d, pid = 0x%lx . \n",
            thread_data->thread_id, (unsigned long)pthread_self());

    /* Wait for a task, run it with its task_func/task_arg, repeat. */
    for(;;)
    {
        if(thread_control_wait(thread_data->thread_control) != 0)
        {
            DBGPRINTF_ERROR("Thread [%d] wait failed.\n", thread_data->thread_id);
            thread_pool_setthreadstatus(thread_data, ethread_status_cannotuse);
            break;
        }

        /* The submitter filled thread_task before calling thread_control_start(). */
        thread_task_t task = thread_data->thread_task;

        thread_pool_setthreadstatus(thread_data, ethread_status_running);
        DBGPRINTF_DEBUG("Task start. taskid = %d .\n", task.taskid);

        if(task.task_func != NULL)
        {
            task.task_func(task.task_arg);
        }

        DBGPRINTF_DEBUG("Task end. taskid = %d .\n", task.taskid);
        thread_pool_setthreadstatus(thread_data, ethread_status_idle);
    }

    DBGPRINTF_DEBUG("Thread end run. Thread_id = %d, pid = 0x%lx . \n",
            thread_data->thread_id, (unsigned long)pthread_self());

    return NULL;
}

/* Reset a task slot to "no task".  Always returns 0. */
int thread_task_init(thread_task_t* thread_task)
{
    thread_task->taskid             = -1;
    thread_task->task_func          = NULL;
    thread_task->task_arg           = NULL;

    return 0;
}

/*
 * Reset one worker record and create its wait/start handle.
 * Returns 0 on success, -1 if the handle could not be created.
 */
int thread_data_init(thread_data_t* thread_data)
{
    thread_data->thread_id = -1;
    memset(&(thread_data->pid), 0, sizeof(thread_data->pid));
    thread_data->status = ethread_status_unknown;
    thread_data->thread_control = NULL;

    thread_task_init(&(thread_data->thread_task));

    if(thread_control_init(&(thread_data->thread_control)) != 0
            || thread_data->thread_control == NULL)
    {
        return -1;
    }

    return 0;
}

/*
 * Create the global pool with up to num_thread detached workers.
 *
 * num_thread must be in 1..THREAD_POOL_MAX_THREADS.
 * Returns 0 if at least one worker is running (num_thread in the pool is set
 * to the number actually created), -1 if the argument is invalid or no
 * worker could be created.
 */
int thread_pool_create(int num_thread)
{
    thread_pool_t* thread_pool = &g_thread_pool;

    if(num_thread <= 0 || num_thread > THREAD_POOL_MAX_THREADS)
    {
        DBGPRINTF_ERROR("Invalid thread count [%d].\n", num_thread);
        return -1;
    }

    thread_pool->thread_data_set = calloc((size_t)num_thread, sizeof(*(thread_pool->thread_data_set)));
    if(thread_pool->thread_data_set == NULL)
    {
        DBGPRINTF_ERROR("Thread pool allocation failed.\n");
        return -1;
    }
    thread_pool->num_thread = 0;
    thread_pool->taskid_base = -1;

    /* The lock must exist before any worker starts: workers use it. */
    if(pthread_mutex_init(&(thread_pool->thread_pool_lock), NULL) != 0)
    {
        free(thread_pool->thread_data_set);
        thread_pool->thread_data_set = NULL;
        return -1;
    }

    /* All workers are created detached; nobody ever joins them. */
    pthread_attr_t attr;
    if(pthread_attr_init(&attr) != 0)
    {
        pthread_mutex_destroy(&(thread_pool->thread_pool_lock));
        free(thread_pool->thread_data_set);
        thread_pool->thread_data_set = NULL;
        return -1;
    }
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    int created = 0;
    for(created = 0; created < num_thread; created++)
    {
        thread_data_t* thread_data = thread_pool->thread_data_set + created;

        if(thread_data_init(thread_data) != 0)
        {
            DBGPRINTF_ERROR("thread_data_init error[%d].\n", created);
            break;
        }
        thread_data->thread_id = created;
        thread_data->status = ethread_status_idle;

        int ret = pthread_create(&(thread_data->pid), &attr, thread_pool_func, thread_data);
        if(ret != 0)
        {
            DBGPRINTF_DEBUG("pthread_create error[%d].\n", created);
            /* No thread owns this slot: release its handle and retire it. */
            thread_control_deinit(&(thread_data->thread_control));
            thread_data->status = ethread_status_cannotuse;
            break;
        }
    }
    pthread_attr_destroy(&attr);

    /* Only slots that really have a worker are visible to the scheduler. */
    thread_pool->num_thread = created;

    if(created == 0)
    {
        pthread_mutex_destroy(&(thread_pool->thread_pool_lock));
        free(thread_pool->thread_data_set);
        thread_pool->thread_data_set = NULL;
        return -1;
    }

    sleep(THREAD_POOL_STARTUP_DELAY_SEC);

    return 0;
}

/*
 * Demo task: interprets arg as a number of seconds (smuggled through the
 * pointer value) and sleeps that long.  Returns NULL.
 */
void* test_func(void* arg)
{
    /* Go through intptr_t so the pointer->int conversion is well defined. */
    int t_sleep = (int)(intptr_t)arg;

    DBGPRINTF_DEBUG("Test func. Sleep %d .\n", t_sleep);
    sleep(t_sleep);
    DBGPRINTF_DEBUG("Test func finished. \n");

    return NULL;
}

/*
 * Look up an idle worker.
 *
 * *thread_data_found receives the first worker whose status is idle, or NULL
 * if there is none.  The worker is NOT reserved: by the time the caller acts
 * on the result another thread may have taken it.  Always returns 0.
 */
int thread_pool_queryfree(thread_data_t** thread_data_found)
{
    thread_pool_t* thread_pool = &g_thread_pool;

    *thread_data_found = NULL;
    pthread_mutex_lock(&(thread_pool->thread_pool_lock));

    int i = 0;
    for(i = 0; i < thread_pool->num_thread; i++)
    {
        thread_data_t* thread_data = thread_pool->thread_data_set + i;
        if(thread_data->status == ethread_status_idle)
        {
            *thread_data_found = thread_data;
            break;
        }
    }

    pthread_mutex_unlock(&(thread_pool->thread_pool_lock));

    return 0;
}

/*
 * Allocate the next task id under the pool lock and store it in *taskid.
 * Always returns 0.
 */
int thread_pool_gettaskid(int* taskid)
{
    thread_pool_t* thread_pool = &g_thread_pool;

    pthread_mutex_lock(&(thread_pool->thread_pool_lock));
    thread_pool->taskid_base ++;
    *taskid = thread_pool->taskid_base;
    pthread_mutex_unlock(&(thread_pool->thread_pool_lock));

    return 0;
}

/*
 * Find an idle worker, reserve it and load the task into it, all inside one
 * critical section so that two concurrent submitters can never pick the same
 * worker.  Returns the reserved worker (and its new id in *taskid), or NULL
 * if every worker is busy.
 */
static thread_data_t* thread_pool_claimfree(thread_task_func task_func, void* arg, int* taskid)
{
    thread_pool_t* thread_pool = &g_thread_pool;
    thread_data_t* claimed = NULL;

    pthread_mutex_lock(&(thread_pool->thread_pool_lock));

    for(int i = 0; i < thread_pool->num_thread; i++)
    {
        thread_data_t* candidate = thread_pool->thread_data_set + i;
        if(candidate->status != ethread_status_idle)
        {
            continue;
        }

        candidate->status                   = ethread_status_running;
        candidate->thread_task.task_func    = task_func;
        candidate->thread_task.task_arg     = arg;
        candidate->thread_task.taskid       = ++thread_pool->taskid_base;
        *taskid = candidate->thread_task.taskid;
        claimed = candidate;
        break;
    }

    pthread_mutex_unlock(&(thread_pool->thread_pool_lock));

    return claimed;
}

/*
 * Submit a task to the pool.
 *
 * task_func/arg are run on the first idle worker.  Tasks are not queued.
 * Returns 0 if a worker accepted the task, -1 if task_func is NULL, the pool
 * is full, or the worker could not be woken.
 */
int thread_pool_addtask(thread_task_func task_func, void* arg)
{
    if(task_func == NULL)
    {
        DBGPRINTF_ERROR("Task function is NULL. Task not added.\n");
        return -1;
    }

    int taskid = -1;
    thread_data_t* thread_data_found = thread_pool_claimfree(task_func, arg, &taskid);
    if(thread_data_found == NULL)
    {
        DBGPRINTF_ERROR("Thread pool full. Task not added.\n");
        return -1;
    }

    DBGPRINTF_DEBUG("Thread [%d] perferm this task.\n", thread_data_found->thread_id);

    /* Wake the worker.  Done outside the pool lock because it may block. */
    if(thread_control_start(thread_data_found->thread_control) != 0)
    {
        DBGPRINTF_ERROR("Thread [%d] start failed. Task not added.\n", thread_data_found->thread_id);
        thread_pool_setthreadstatus(thread_data_found, ethread_status_idle);
        return -1;
    }

    /* Use the local taskid: the worker record may already be reused. */
    DBGPRINTF_DEBUG("Thread [%d] Add task[%d] finished.\n",
            thread_data_found->thread_id, taskid);

    return 0;
}

/* Submit TEST_BATCH_SIZE demo tasks that sleep 1, 2, 4, ... seconds. */
static void submit_test_batch(void)
{
    for(int shift = 0; shift < TEST_BATCH_SIZE; shift++)
    {
        thread_pool_addtask(test_func, (void*)(intptr_t)(1 << shift));
    }
}

/*
 * Demo: 10 workers, two batches of 8 tasks six seconds apart.  Part of the
 * second batch is rejected because the long sleepers are still busy.
 */
int main(void)
{
    if(thread_pool_create(10) != 0)
    {
        return 1;
    }

    submit_test_batch();

    sleep(6);
    submit_test_batch();

    /* Workers are detached; keep the process alive while they run. */
    sleep(100000);

    return 0;
}


thread-control.h

View Code
#ifndef THREAD_CONTROL_H
#define THREAD_CONTROL_H

/*
 * Blocking "wait for work" / "start work" primitive used by the thread pool.
 * Several interchangeable implementations exist (condition variable, named
 * FIFO, ...); link exactly one of them.
 */

/* Opaque handle to one wait/start channel; owned by the implementation. */
#define THREAD_CONTROL  void*

/* Create a channel and store its handle in *thread_control. */
int thread_control_init(THREAD_CONTROL* thread_control);

/* Destroy the channel in *thread_control and reset the handle to NULL. */
int thread_control_deinit(THREAD_CONTROL* thread_control);

/* Block the calling thread until thread_control_start() is called on the channel. */
int thread_control_wait(THREAD_CONTROL thread_control);

/* Release the thread blocked in thread_control_wait() on the channel. */
int thread_control_start(THREAD_CONTROL thread_control);

#endif /* THREAD_CONTROL_H */

 

thread-control.h的接口實現. 可以使用多種方式.

只要進程間通訊/線程間通訊中存在阻塞等待/解除阻塞等待的機制, 都可以拿來作實驗.

好比:條件變量.
thread-control-condition.c   

View Code
/* Expose the POSIX declarations under strict -std=c99/c11. */
#define _POSIX_C_SOURCE 200809L

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>

#define DBGPRINTF_DEBUG printf
#define DBGPRINTF_ERROR printf
#define ASSERT  assert

#include "thread-control.h"

/*
 * thread-control.h implemented with a mutex + condition variable.
 *
 * The condition variable alone carries no state: a signal sent while nobody
 * is waiting is lost, and pthread_cond_wait() may wake spuriously.  The
 * `pending` counter is the predicate that makes start requests latched.
 */
struct _thread_control_cond_t
{
    pthread_mutex_t lock;       /* Guards `pending`.                          */
    pthread_cond_t condition;   /* Signalled whenever `pending` is raised.    */
    unsigned int pending;       /* Start requests not yet consumed by a wait. */
};
typedef struct _thread_control_cond_t thread_control_cond_t;

/*
 * Report an unrecoverable error and abort.  Replaces the original assert()
 * so the check is not compiled out under NDEBUG.
 */
static void thread_control_fatal(const char* what)
{
    DBGPRINTF_ERROR("thread-control(cond): %s failed.\n", what);
    abort();
}

/*
 * Create a channel and store its handle in *thread_control.
 * Aborts if the channel cannot be created; otherwise returns 0.
 */
int thread_control_init(THREAD_CONTROL* thread_control)
{
    *thread_control = NULL;

    thread_control_cond_t* cond = malloc(sizeof(*cond));
    if(cond == NULL)
    {
        thread_control_fatal("malloc");
    }

    if(pthread_mutex_init(&(cond->lock), NULL) != 0)
    {
        thread_control_fatal("pthread_mutex_init");
    }
    if(pthread_cond_init(&(cond->condition), NULL) != 0)
    {
        thread_control_fatal("pthread_cond_init");
    }
    cond->pending = 0;

    *thread_control = cond;

    return 0;
}

/*
 * Destroy the channel in *thread_control and reset the handle to NULL.
 * A NULL handle is a no-op.  No thread may be waiting on the channel.
 * Always returns 0.
 */
int thread_control_deinit(THREAD_CONTROL* thread_control)
{
    thread_control_cond_t* cond = (thread_control_cond_t*)(*thread_control);

    if(cond == NULL)
    {
        return 0;
    }

    pthread_mutex_destroy(&(cond->lock));
    pthread_cond_destroy(&(cond->condition));

    free(cond);
    *thread_control = NULL;

    return 0;
}

/*
 * Block until a start request is available, then consume exactly one.
 * Returns immediately if thread_control_start() was already called.
 * Returns 0 on success, -1 if the handle is NULL.
 */
int thread_control_wait(THREAD_CONTROL thread_control)
{
    thread_control_cond_t* cond = (thread_control_cond_t*)(thread_control);

    if(cond == NULL)
    {
        return -1;
    }

    pthread_mutex_lock(&(cond->lock));
    /* Re-check the predicate in a loop: wakeups may be spurious. */
    while(cond->pending == 0)
    {
        pthread_cond_wait(&(cond->condition), &(cond->lock));
    }
    cond->pending--;
    pthread_mutex_unlock(&(cond->lock));

    return 0;
}

/*
 * Post one start request and wake a waiter, if any.  The request is kept
 * until a later thread_control_wait() consumes it.
 * Returns 0 on success, -1 if the handle is NULL.
 */
int thread_control_start(THREAD_CONTROL thread_control)
{
    thread_control_cond_t* cond = (thread_control_cond_t*)(thread_control);

    if(cond == NULL)
    {
        return -1;
    }

    pthread_mutex_lock(&(cond->lock));
    cond->pending++;
    pthread_cond_signal(&(cond->condition));
    pthread_mutex_unlock(&(cond->lock));

    return 0;
}


好比:有名管道.

thread-control-fifopipe.c

View Code
/* Expose the POSIX declarations (mkfifo, S_ISFIFO, ssize_t) under strict -std=c99/c11. */
#define _POSIX_C_SOURCE 200809L

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <assert.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>

#define DBGPRINTF_DEBUG printf
#define DBGPRINTF_ERROR printf
#define ASSERT  assert

#include "thread-control.h"

/*
 * thread-control.h implemented with one named FIFO per channel.
 * wait() reads one byte from the FIFO, start() writes one byte to it.
 */

/* Counter used to give every channel its own FIFO name.  Not thread-safe:
 * thread_control_init() must be called from a single thread. */
static int path_index = 0;

/* Room for "./xxx" + any int + NUL (the original 10 bytes truncated
 * five-digit indexes, making different channels share one FIFO). */
#define LEN_CMD_PATH    32

struct _fifopipe_control_t
{
    char fifopipe_cmd_path[LEN_CMD_PATH];   /* Path of this channel's FIFO. */
};
typedef struct _fifopipe_control_t fifopipe_control_t;

/*
 * Report an unrecoverable error (with errno) and abort.  Replaces the
 * original assert() so the check is not compiled out under NDEBUG.
 */
static void fifopipe_fatal(const char* what)
{
    DBGPRINTF_ERROR("thread-control(fifo): %s failed: %s.\n", what, strerror(errno));
    abort();
}

/* open() that retries when interrupted by a signal; aborts on other errors. */
static int fifopipe_open(const char* path, int flags)
{
    int fd = -1;

    do
    {
        fd = open(path, flags);
    } while(fd < 0 && errno == EINTR);

    if(fd < 0)      /* 0 is a valid descriptor, so test for < 0, not <= 0. */
    {
        fifopipe_fatal("open");
    }

    return fd;
}

/*
 * Create a channel (and its FIFO, "./xxx<N>" in the current directory) and
 * store its handle in *thread_control.
 * Aborts if the channel cannot be created; otherwise returns 0.
 */
int thread_control_init(THREAD_CONTROL* thread_control)
{
    *thread_control = NULL;

    fifopipe_control_t* fifopipe_control = malloc(sizeof(*fifopipe_control));
    if(fifopipe_control == NULL)
    {
        fifopipe_fatal("malloc");
    }

    path_index ++;
    int len = snprintf(fifopipe_control->fifopipe_cmd_path, LEN_CMD_PATH, "./xxx%d", path_index);
    if(len < 0 || len >= LEN_CMD_PATH)
    {
        fifopipe_fatal("snprintf");
    }

    if(mkfifo(fifopipe_control->fifopipe_cmd_path, 0666) != 0)
    {
        /* A FIFO left behind by an earlier run is reused; anything else is fatal. */
        struct stat st;
        if(errno != EEXIST
                || stat(fifopipe_control->fifopipe_cmd_path, &st) != 0
                || !S_ISFIFO(st.st_mode))
        {
            errno = EEXIST;
            fifopipe_fatal("mkfifo");
        }
    }

    *thread_control = fifopipe_control;

    return 0;
}

/*
 * Destroy the channel in *thread_control: remove its FIFO from the file
 * system, free the handle and reset it to NULL.  A NULL handle is a no-op.
 * Always returns 0.
 */
int thread_control_deinit(THREAD_CONTROL* thread_control)
{
    fifopipe_control_t* fifopipe_control = (fifopipe_control_t*)(*thread_control);

    if(fifopipe_control == NULL)
    {
        return 0;
    }

    /* Best effort: a missing FIFO is not an error during teardown. */
    (void)unlink(fifopipe_control->fifopipe_cmd_path);

    free(fifopipe_control);
    *thread_control = NULL;

    return 0;
}

/*
 * Block until thread_control_start() writes a byte to the channel's FIFO.
 * Returns 0 on success, -1 if the handle is NULL; aborts on I/O errors.
 */
int thread_control_wait(THREAD_CONTROL thread_control)
{
    fifopipe_control_t* fifopipe_control = (fifopipe_control_t*)(thread_control);

    if(fifopipe_control == NULL)
    {
        return -1;
    }

    /* Opening a FIFO read-only blocks until a writer opens the other end. */
    int fd = fifopipe_open(fifopipe_control->fifopipe_cmd_path, O_RDONLY);

    char tmp = 0;
    ssize_t got = 0;
    do
    {
        got = read(fd, &tmp, 1);
    } while(got < 0 && errno == EINTR);

    int saved_errno = errno;
    close(fd);      /* The original leaked one descriptor per wait. */

    if(got != 1)
    {
        errno = (got < 0) ? saved_errno : EPIPE;
        fifopipe_fatal("read");
    }

    return 0;
}

/*
 * Release the thread blocked in thread_control_wait() by writing one byte
 * to the channel's FIFO.  Blocks until a reader has the FIFO open.
 * Returns 0 on success, -1 if the handle is NULL; aborts on I/O errors.
 */
int thread_control_start(THREAD_CONTROL thread_control)
{
    fifopipe_control_t* fifopipe_control = (fifopipe_control_t*)(thread_control);

    if(fifopipe_control == NULL)
    {
        return -1;
    }

    /* Opening a FIFO write-only blocks until a reader opens the other end. */
    int fd = fifopipe_open(fifopipe_control->fifopipe_cmd_path, O_WRONLY);

    char tmp = 0;
    ssize_t put = 0;
    do
    {
        put = write(fd, &tmp, 1);
    } while(put < 0 && errno == EINTR);

    int saved_errno = errno;
    close(fd);      /* The original leaked one descriptor per start. */

    if(put != 1)
    {
        errno = saved_errno;
        fifopipe_fatal("write");
    }

    return 0;
}

 

好比:管道, 消息隊列, socket, while(condition?){sleep}等等. 

 

以上代碼中, 註釋比較少.

差不多就這樣. 其實我都有點不知道自己在寫什麼.

相關文章
相關標籤/搜索