本篇文章由導學寶轉自:http://www.cnblogs.com/freezee/archive/2012/02/29/2373958.html
html
What I write, what I lose.
之前有點時間, 重新熟悉了一下 Linux 進程間通信的東西.
於是想起之前項目中自己寫了個很簡單的線程池.
這次想重新寫一下.
主要目的是用進程間或者線程間通訊的阻塞/取消阻塞方法, 實現線程池中線程的等待作業和開始作業.
算是對這些代碼的一種實踐.
以上.
===================================================================
我對一個簡單線程池的一些理解.
1. 創建大量的線程.
2. 工作線程的執行體功能爲:
while(1)
{
//按照一定條件(A)阻塞.
//按照任務的參數設置開始執行任務.
}
3.控制線程的功能爲.
{
//接受新任務的參數, 一般爲回調函數+參數. (爲保持兼容, 我設置的格式爲 void* (*thread_task)(void*) + void* . 跟線程創建函數 pthread_create 的形式保持兼容.)
//按照必定規則查找空閒的線程.
//將接受的新任務參數賦給這條線程數據體.
//解除這條線程的阻塞條件.
}
===================================================================
common-thread-pool.c 線程池主要實現+一個簡單的測試代碼.
接口沒有拿出來.
thread-control.h 提供線程池線程的等待作業和開始作業接口.
thread-control-xxxxx.c thread-control.h 的接口實現. 可以使用多種方式.
common-thread-pool.c
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <pthread.h> #include <assert.h> #include <sys/types.h>#define DBGPRINTF_DEBUG printf#define DBGPRINTF_ERROR printf#define ASSERT assert #include "thread-control.h" typedef void*(*thread_task_func)(void* arg);/*線程執行任務的數據.*/ struct _thread_task_t { int taskid; /*任務id.*/ thread_task_func task_func; /*任務函數及參數*/ void* task_arg; }; typedef struct _thread_task_t thread_task_t;/*線程狀態.*/ typedef enum { ethread_status_unknown = , ethread_status_idle , ethread_status_running , ethread_status_terminel , ethread_status_cannotuse , }thread_status_e;/*線程數據.*/ struct _thread_data_t { int thread_id; pthread_t pid; thread_status_e status; thread_task_t thread_task; THREAD_CONTROL thread_control; }; typedef struct _thread_data_t thread_data_t;/*線程池數據.*/ struct _thread_pool_t { thread_data_t* thread_data_set; int num_thread; int taskid_base; pthread_mutex_t thread_pool_lock; }; typedef struct _thread_pool_t thread_pool_t; thread_pool_t g_thread_pool;/*設置線程狀態.*/ int thread_pool_setthreadstatus(thread_data_t* thread_data, thread_status_e status) { thread_pool_t* thread_pool = &g_thread_pool; pthread_mutex_lock(&(thread_pool->thread_pool_lock)); thread_data->status = status; pthread_mutex_unlock(&(thread_pool->thread_pool_lock)); return ; }/*線程池線程函數體.*/ void* thread_pool_func(void* arg) { sleep(1); //Wait pthread_t count. thread_data_t* thread_data = (thread_data_t*)arg; DBGPRINTF_DEBUG("Thread start run. Thread_id = %d, pid = 0x%x . \n", thread_data->thread_id, (unsigned int)thread_data->pid); /* Continue to wait the task, then based on new task_func and task_arg to perform this task. */ while(1) { thread_control_wait(thread_data->thread_control); //Need to lock? Yes. thread_pool_setthreadstatus(thread_data, ethread_status_running); DBGPRINTF_DEBUG("Task start. 
taskid = %d .\n", thread_data->thread_task.taskid); thread_data->thread_task.task_func(thread_data->thread_task.task_arg); DBGPRINTF_DEBUG("Task end. taskid = %d .\n", thread_data->thread_task.taskid); //Need to lock?Yes. thread_pool_setthreadstatus(thread_data, ethread_status_idle); } DBGPRINTF_DEBUG("Thread end run. Thread_id = %d, pid = 0x%x . \n", thread_data->thread_id, (unsigned int)thread_data->pid); }int thread_task_init(thread_task_t* thread_task) { thread_task->taskid = -1; thread_task->task_func = NULL; thread_task->task_arg = NULL; return ; }int thread_data_init(thread_data_t* thread_data) { thread_data->thread_id = -1; thread_data->pid = 0x0; thread_data->status = ethread_status_unknown , thread_task_init(&(thread_data->thread_task)); thread_control_init(&(thread_data->thread_control)); return ; }int thread_pool_create(int num_thread) { ASSERT(num_thread > && num_thread <= 10*1024); thread_pool_t* thread_pool = &g_thread_pool; int i = ; thread_pool->thread_data_set = (thread_data_t*)malloc(sizeof(thread_data_t) * num_thread); ASSERT(thread_pool->thread_data_set != NULL); thread_pool->num_thread = num_thread; thread_pool->taskid_base = -1; pthread_mutex_init(&(thread_pool->thread_pool_lock), NULL); for(i=; i<num_thread; i++) { thread_data_t* thread_data = thread_pool->thread_data_set+i; thread_data_init(thread_data); thread_data->thread_id = i; thread_data->status = ethread_status_idle; /* pthread_create set to detached. */ pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); int ret = pthread_create(&(thread_data->pid), &attr, thread_pool_func, thread_data); if(ret != ) { DBGPRINTF_DEBUG("pthread_create error[%d].\n", i); break; } } sleep(2); return ; }void* test_func(void* arg) { int t_sleep = (int)arg; DBGPRINTF_DEBUG("Test func. Sleep %d .\n", t_sleep); /* int a[2048*1024] = {0}; int i = 0; for(i=0; i<2028*1024; i++) { a[i] = i*i; } DBGPRINTF_DEBUG("a[0]=%d. 
\n", a[0]); */ sleep(t_sleep); DBGPRINTF_DEBUG("Test func finished. \n"); return NULL; }/*查詢可接收任務的線程.*/ int thread_pool_queryfree(thread_data_t** thread_data_found) { *thread_data_found = NULL; thread_pool_t* thread_pool = &g_thread_pool; pthread_mutex_lock(&(thread_pool->thread_pool_lock)); int i = ; for(i=; i<thread_pool->num_thread; i++) { thread_data_t* thread_data = thread_pool->thread_data_set+i; if(thread_data->status == ethread_status_idle) { *thread_data_found = thread_data; break; } } pthread_mutex_unlock(&(thread_pool->thread_pool_lock)); return ; }/*分配taskid.*/ int thread_pool_gettaskid(int* taskid) { thread_pool_t* thread_pool = &g_thread_pool; pthread_mutex_lock(&(thread_pool->thread_pool_lock)); thread_pool->taskid_base ++; *taskid = thread_pool->taskid_base; pthread_mutex_unlock(&(thread_pool->thread_pool_lock)); return ; }/*向線程池增長任務.*/ int thread_pool_addtask(thread_task_func task_func, void* arg) { /* Find a free thread. */ thread_data_t* thread_data_found = NULL; thread_pool_queryfree(&thread_data_found); if(thread_data_found != NULL) { DBGPRINTF_DEBUG("Thread [%d] perferm this task.\n", thread_data_found->thread_id); /* Set task data. */ thread_data_found->thread_task.task_func = task_func; thread_data_found->thread_task.task_arg = arg; thread_pool_gettaskid(&(thread_data_found->thread_task.taskid)); /* Start the task. */ thread_pool_setthreadstatus(thread_data_found, ethread_status_running); thread_control_start(thread_data_found->thread_control); DBGPRINTF_DEBUG("Thread [%d] Add task[%d] finished.\n", thread_data_found->thread_id, thread_data_found->thread_task.taskid); } else { DBGPRINTF_ERROR("Thread pool full. 
Task not added.\n"); } return ; }int main() { thread_pool_create(10); //thread_pool_create(10); thread_pool_addtask(test_func, (void*)(1<<)); thread_pool_addtask(test_func, (void*)(1<<1)); thread_pool_addtask(test_func, (void*)(1<<2)); thread_pool_addtask(test_func, (void*)(1<<3)); thread_pool_addtask(test_func, (void*)(1<<4)); thread_pool_addtask(test_func, (void*)(1<<5)); thread_pool_addtask(test_func, (void*)(1<<6)); thread_pool_addtask(test_func, (void*)(1<<7)); sleep(6); thread_pool_addtask(test_func, (void*)(1<<)); thread_pool_addtask(test_func, (void*)(1<<1)); thread_pool_addtask(test_func, (void*)(1<<2)); thread_pool_addtask(test_func, (void*)(1<<3)); thread_pool_addtask(test_func, (void*)(1<<4)); thread_pool_addtask(test_func, (void*)(1<<5)); thread_pool_addtask(test_func, (void*)(1<<6)); thread_pool_addtask(test_func, (void*)(1<<7)); sleep(100000); return ; }
thread-control.h
/*
 * thread-control.h
 *
 * Wait/start primitive used by the thread pool. One handle belongs to one
 * worker thread: the worker blocks in thread_control_wait() and another
 * thread releases it with thread_control_start(). The implementation behind
 * the handle is chosen at link time (condition variable, named pipe, ...).
 */
#ifndef THREAD_CONTROL_H
#define THREAD_CONTROL_H

/*
 * Opaque handle. A typedef (rather than a "void*" macro) so that
 * "THREAD_CONTROL a, b;" declares two pointers.
 */
typedef void *THREAD_CONTROL;

/* Create a handle and store it in *thread_control. Returns 0 on success. */
int thread_control_init(THREAD_CONTROL *thread_control);

/* Release the handle in *thread_control and reset it to NULL. Returns 0. */
int thread_control_deinit(THREAD_CONTROL *thread_control);

/* Block the calling thread until the handle is started. Returns 0 on success. */
int thread_control_wait(THREAD_CONTROL thread_control);

/* Release the thread blocked on the handle. Returns 0 on success. */
int thread_control_start(THREAD_CONTROL thread_control);

#endif /* THREAD_CONTROL_H */
thread-control.h 的接口實現. 可以使用多種方式.
只要進程間通訊/線程間通訊中存在阻塞等待/解除阻塞等待的機制, 都可以拿來做實驗.
比如: 條件變量.
thread-control-condition.c
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <pthread.h> #include <assert.h> #include <sys/types.h>#define DBGPRINTF_DEBUG printf#define DBGPRINTF_ERROR printf#define ASSERT assert #include "thread-control.h" struct _thread_control_cond_t { pthread_mutex_t lock; pthread_cond_t condition; }; typedef struct _thread_control_cond_t thread_control_cond_t;int thread_control_init(THREAD_CONTROL* thread_control) { *thread_control = NULL; thread_control_cond_t* cond = (thread_control_cond_t*)malloc(sizeof(thread_control_cond_t)); assert(cond != NULL); pthread_mutex_init(&(cond->lock), NULL); pthread_cond_init(&(cond->condition), NULL); *thread_control = cond; return ; }int thread_control_deinit(THREAD_CONTROL* thread_control) { thread_control_cond_t* cond = (thread_control_cond_t*)(*thread_control); pthread_mutex_destroy(&(cond->lock)); pthread_cond_destroy(&(cond->condition)); free(cond); *thread_control = NULL; return ; }int thread_control_wait(THREAD_CONTROL thread_control) { thread_control_cond_t* cond = (thread_control_cond_t*)(thread_control); //Wait pthread condition. pthread_mutex_lock(&(cond->lock)); pthread_cond_wait(&(cond->condition), &(cond->lock)); pthread_mutex_unlock(&(cond->lock)); return ; }int thread_control_start(THREAD_CONTROL thread_control) { thread_control_cond_t* cond = (thread_control_cond_t*)(thread_control); //start pthread condition. pthread_mutex_lock(&(cond->lock)); pthread_cond_signal(&(cond->condition)); pthread_mutex_unlock(&(cond->lock)); return ; }
比如: 有名管道.
thread-control-fifopipe.c
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <pthread.h> #include <assert.h> #include <fcntl.h> #include <sys/types.h> #include <sys/stat.h>#define DBGPRINTF_DEBUG printf#define DBGPRINTF_ERROR printf#define ASSERT assert #include "thread-control.h" static int path_index = ;#define LEN_CMD_PATH 10struct _fifopipe_control_t { char fifopipe_cmd_path[LEN_CMD_PATH]; }; typedef struct _fifopipe_control_t fifopipe_control_t;int thread_control_init(THREAD_CONTROL* thread_control) { *thread_control = NULL; fifopipe_control_t* fifopipe_control = (fifopipe_control_t*)malloc(sizeof(fifopipe_control_t)); assert(fifopipe_control != NULL); path_index ++; snprintf(fifopipe_control->fifopipe_cmd_path, LEN_CMD_PATH, "./xxx%d", path_index); int ret = mkfifo(fifopipe_control->fifopipe_cmd_path, 0666/*(O_CREAT | O_RDWR)*/); assert(ret == ); *thread_control = fifopipe_control; return ; }int thread_control_deinit(THREAD_CONTROL* thread_control) { fifopipe_control_t* fifopipe_control = (fifopipe_control_t*)(*thread_control); free(fifopipe_control); *thread_control = NULL; return ; }int thread_control_wait(THREAD_CONTROL thread_control) { fifopipe_control_t* fifopipe_control = (fifopipe_control_t*)(thread_control); int fd = open(fifopipe_control->fifopipe_cmd_path, O_RDONLY, ); assert(fd>); char tmp = ; read(fd, &tmp, 1); return ; }int thread_control_start(THREAD_CONTROL thread_control) { fifopipe_control_t* fifopipe_control = (fifopipe_control_t*)(thread_control); int fd = open(fifopipe_control->fifopipe_cmd_path, O_WRONLY, ); assert(fd>); char tmp = ; write(fd, &tmp, 1); return ; }
比如: 管道, 消息隊列, socket, while(condition?){sleep} 等等.
以上代碼中, 註釋的比較少.
差不多就這樣. 其實我都有點不知道自己在寫什麼.