Linux C++線程池實例

想作一個多線程服務器測試程序,所以參考了github的一些實例,而後本身動手寫了相似的代碼來加深理解。linux

目前瞭解的線程池實現有2種思路:ios

第一種:git

主進程建立必定數量的線程,並將其所有掛起,此時線程狀態爲idle,並將running態計數爲0,等到任務能夠執行了,就喚醒線程,此時線程狀態爲running,計數增長,若是計數達到最大線程數,就再建立一組空閒線程,等待新任務,上一組線程執行完退出,如此交替。github

第二種:服務器

採用生成者-消費者模式,主進程做爲生成者,建立FIFO隊列,在任務隊列尾部添加任務,線程池做爲消費者在隊列頭部取走任務執行,這之間有人會提到無鎖環形隊列,在單生成者單消費者的模式下是有效的,可是線程池確定是多消費者同時去隊列取任務,環形隊列會形成掛死。多線程

個人實例採用第二種方式實現,在某些應用場景下,容許必定時間內,任務排隊的狀況,重複利用已有線程會比較合適。ide

代碼比較佔篇幅,所以摺疊在下面。post

 task.h:測試

 1 #ifndef TASK_H
 2 #define TASK_H
 3 
 4 #include <list>
 5 #include <pthread.h>
 6 
 7 using std::list;
 8 
 9 struct task {
10    void (*function) (void *);
11    void *arguments;
12    int id;
13 };
14 
15 struct work_queue {
16    work_queue(){
17         pthread_mutex_init(&queue_lock, NULL);
18         pthread_mutex_init(&queue_read_lock, NULL);
19         pthread_cond_init(&queue_read_cond, NULL);
20         qlen = 0;
21    }
22 
23    ~work_queue() {
24        queue.clear();
25        pthread_mutex_destroy(&queue_read_lock);
26        pthread_mutex_destroy(&queue_lock);
27        pthread_cond_destroy(&queue_read_cond);
28    }
29 
30    void push(task *tsk);
31    task *pull();
32    void post();
33    void wait();
34 
35 private:
36    int qlen;
37    list< task * > queue;
38    pthread_mutex_t queue_lock;
39    pthread_mutex_t queue_read_lock;
40    pthread_cond_t  queue_read_cond;
41 };
42 
43 #endif
View Code

 task.cppthis

#include "task.h"

void work_queue::push(task *tsk) {
   pthread_mutex_lock(&queue_lock);
   queue.push_back(tsk);
   qlen++;

   pthread_cond_signal(&queue_read_cond);
   pthread_mutex_unlock(&queue_lock);
}

task* work_queue::pull() {
    wait();

    pthread_mutex_lock(&queue_lock);
    task* tsk = NULL;
    if (qlen > 0) {
        tsk = *(queue.begin());
        queue.pop_front();
        qlen--;

        if (qlen > 0)
            pthread_cond_signal(&queue_read_cond);
    }
    pthread_mutex_unlock(&queue_lock);
    return tsk;
}

void work_queue::post() {
    pthread_mutex_lock(&queue_read_lock);
    pthread_cond_broadcast(&queue_read_cond);
    pthread_mutex_unlock(&queue_read_lock);
}

void work_queue::wait() {
    pthread_mutex_lock(&queue_read_lock);
    pthread_cond_wait(&queue_read_cond, &queue_read_lock);
    pthread_mutex_unlock(&queue_read_lock);
}
View Code

threadpool.h

 1 #ifndef THREAD_POOL_H
 2 #define THREAD_POOL_H
 3 
 4 #include "task.h"
 5 #include <vector>
 6 
 7 using std::vector;
 8 
 9 #define safe_delete(p)  if (p) { delete p;  p = NULL; }
10 
11 struct threadpool {
12     threadpool(int size) : pool_size(size)
13                          , thread_list(size, pthread_t(0))
14                          , queue(NULL)
15                          , finish(false)
16                          , ready(0) {
17 
18         pthread_mutex_init(&pool_lock, NULL);
19     }
20 
21     ~threadpool() {
22         thread_list.clear();
23         safe_delete(queue);
24         pthread_mutex_destroy(&pool_lock);
25     }
26 
27     void init();
28     void destroy();
29     static void* thread_run(void *tp);
30 
31     void incr_ready();
32     void decr_ready();
33     bool close() const;
34     
35     work_queue *queue;
36     
37 private:
38     int pool_size;
39     int ready;
40     bool finish;
41     pthread_mutex_t pool_lock;
42     vector <pthread_t> thread_list;
43 };
44 
45 
46 #endif
View Code

threadpool.cpp

 1 /*
 2  * threadpool.cpp
 3  *
 4  *  Created on: 2017年3月27日
 5  *      Author: Administrator
 6  */
 7 
 8 #include "threadpool.h"
 9 
10 
11 void* threadpool::thread_run(void *tp) {
12     threadpool *pool = (threadpool *) tp;
13     pool->incr_ready();
14 
15     while(1) {
16         task* tsk = pool->queue->pull();
17         if (tsk) {
18             (tsk->function)(tsk->arguments);
19             delete tsk;
20             tsk = NULL;
21         }
22 
23         if (pool->close())
24             break;
25     }
26 
27     pool->decr_ready();
28 
29     return NULL;
30 }
31 
32 void threadpool::incr_ready() {
33     pthread_mutex_lock(&pool_lock);
34     ready++;
35     pthread_mutex_unlock(&pool_lock);
36 }
37 
38 void threadpool::decr_ready() {
39     pthread_mutex_lock(&pool_lock);
40     ready--;
41     pthread_mutex_unlock(&pool_lock);
42 }
43 
44 bool threadpool::close() const {
45     return finish;
46 }
47 
48 void threadpool::init() {
49     queue = new work_queue;
50     if (!queue) {
51         return;
52     }
53 
54     for(int i; i<pool_size; i++) {
55         pthread_create(&thread_list[i], NULL, threadpool::thread_run, (void *)this);
56     }
57 
58     while(ready != pool_size) {}
59 }
60 
61 void threadpool::destroy() {
62     finish = true;
63 
64     while(ready) {
65         if(queue) {
66             queue->post();
67         }
68     }
69 }
View Code

main.cpp

 1 //============================================================================
 2 // Name        : thread_pool.cpp
 3 // Author      : dancy
 4 // Version     :
 5 // Copyright   : Your copyright notice
 6 // Description : Hello World in C++, Ansi-style
 7 //============================================================================
 8 
 9 #include <iostream>
10 #include "threadpool.h"
11 #include <pthread.h>
12 #include <stdio.h>
13 #include <stdlib.h>
14 
15 using namespace std;
16 
17 void job(void *tsk){
18     printf("job %-2d working on Thread #%u\n", ((task *)tsk)->id, (int)pthread_self());
19 }
20 
21 task *make_task(void (*func) (void *), int id) {
22     task *tsk = new task;
23     if (!tsk)
24         return NULL;
25     
26     tsk->function = func;
27     tsk->arguments = (void *)tsk;
28     tsk->id = id;
29     
30     return tsk;
31 }
32 
33 int main() {
34     threadpool tp(4);
35     tp.init();
36 
37     for(int i=0; i<40; i++) 
38         tp.queue->push(make_task(&job, i+1));
39     
40     tp.destroy();
41     printf("all task has completed\n");
42     return 0;
43 }
View Code

 

以上代碼須要在linux下編譯,mingw下封裝的pthread_t,多了一個void *指針,若是要適配還須要本身再封裝一次。

相關文章
相關標籤/搜索