簡單Linux C線程池的實現

大多處網絡服務器,包括Web服務器都有一個特色,就是單位時間內要處理大量的請求,且處理的時間每每比較短。本文會分析一下多進程網絡服務器模型、多線程網絡服務器模型(包括線程池)之間的優缺點,並給出一個簡單的線程池模型。linux

多進程網絡服務器模型

多進程網絡服務器模型,其基本框架每每是,父進程用socket建立一個監聽套接字,而後bindIP以及port,接着開始listen該套接字,經過一個while循環來accept鏈接,對於每個鏈接,fork一個子進程來處理鏈接,並繼續accept。簡化後的代碼以下:web

int listenfd, clientfd;
struct sockadd_in servaddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
// ...初始化servaddr,填入addr,port等...
bind(listenfd, (struct sockadd \*)&servaddr, sizeof(servaddr));
listen(listenfd, LISTENQ);
pid_t pid;
while (1)
{
    clientfd = accept(listenfd, NULL, NULL);
    pid = fork();
    if (pid == -1)
    {
        // ...出錯處理...
        return 0;
    }
    if (pid == 0) // in child
    {
        close(listenfd);
        // handle(clientfd);
        return 0;
    }
    else if (pid > 0) // in parent
    {
        close(clientfd);
    }
}
複製代碼

這種模型的缺點也十分明顯,咱們都知道fork一個子進程的代價是很高的,表如今如下幾點:bash

1.每次進來一個鏈接,操做系統爲其建立一個進程,開銷太大。《APUE》書8.3節講到子進程是父進程的副本,父進程和子進程共享正文段,子進程得到父進數據空間、堆和棧的副本。即使如今如今不少實現經過寫時複製(Copy-On-Write,COW)技術來替代徹底拷貝,可是其中有一個複製父進程頁表的操做,這也是爲何在Linux下建立進程比建立線程開銷大的緣由,而全部線程都共享一個頁表。服務器

2.進程調度壓力大。當併發量上來以後,系統會有N多個進程,這時候操做系統將花費至關多的時間來調度進程以及執行進程的上下文切換。網絡

3.每一個進程都有本身獨立的地址空間,須要消耗必定的內存,太多的進程會形成內存的大量消耗。同時,高併發下父子進程之間的IPC也是一個問題。多線程

多線程網絡服務器模型

多線程網絡服務器模型大體同上,不一樣點在於把每次accept一個新鏈接是建立一個線程而不是進程來處理。然而咱們知道web服務器的一個特色就是短而高頻率的請求,表如今服務器端就是不停地建立線程,銷燬線程。因此該方法雖然在必定程度上解決了fork的開銷問題,可是一樣沒有辦法避免線程調度開銷問題以及內存問題。架構

一個改進的方法是改用線程池,讓線程的數量固定下來,這樣就解決了上述問題。其基本架構爲用一個loop來accept鏈接,以後把這個鏈接分配給線程池中的一個線程來處理,處理完了以後這個線程迴歸線程池等待下一次處理鏈接。從目前來看,該方法已經很好地解決了上面提到的各類問題,文末也會給出該方法的不足以及改進。下面給出一個較爲簡單的線程池模型。這份threadpool的實現並不是我原創的,看到代碼條理清楚,清晰易懂,就轉載過來了。併發

threadpool.h框架

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include <pthread.h>
typedef void *(*callback_func)(void*);
typedef struct job
{
    callback_func p_callback_func;          // 線程回調函數
    void *arg;
    struct job *next;
} job_t;
typedef struct threadpool
{
    int thread_num;                         // 線程池中開啓線程的個數
    int queue_max_num;                      // 隊列中最大job的個數
    job_t *head;                            // 指向job的頭指針
    job_t *tail;                            // 指向job的尾指針
    pthread_t *pthreads;                    // 線程池中全部線程的pthread_t
    pthread_mutex_t mutex;                  // 互斥信號量
    pthread_cond_t queue_empty;             // 隊列爲空的條件變量
    pthread_cond_t queue_not_empty;         // 隊列不爲空的條件變量
    pthread_cond_t queue_not_full;          // 隊列不爲滿的條件變量
    int queue_cur_num;                      // 隊列當前的job個數
    int queue_close;                        // 隊列是否已經關閉
    int pool_close;                         // 線程池是否已經關閉
} threadpool_t;
/*
 * pthreadpool_init - 初始化線程池
 * @thread_num - 線程池開啓的線程個數
 * @queue_max_num - 隊列的最大job個數
 * 返回 - 成功返回線程池地址 失敗返回NULL
 * */
threadpool_t *threadpool_init(int thread_num, int queue_max_num);
/*
 * threadpool_add_job - 想線程池中添加任務
 * @pool - 線程池地址
 * @callback_function - 回調函數
 * @arg - 回調函數參數
 * 返回 - 成功返回0 失敗返回-1
 * */
int threadpool_add_job(threadpool_t *pool, callback_func p_callback_fun, void *arg);
/*
 * threadpool_destory - 銷燬線程池
 * @pool - 線程池地址
 * 返回 - 永遠返回0
 * */
int threadpool_destory(threadpool_t *pool);
/*
 * threadpool_function - 線程池中線程函數
 * @arg - 線程池地址
 * */
void *threadpool_function(void *arg);
#endif /* _THREAD_POOL_H_ */

複製代碼

threadpool.csocket

#include "threadpool.h"
#include "common.h"
threadpool_t *threadpool_init(int thread_num, int queue_max_num)
{
    threadpool_t *pool = NULL;
    pool = malloc(sizeof(threadpool_t));
    do
    {
        if (NULL == pool)
        {
            bug("failed to malloc threadpool\n");
            break;
        }
        pool->thread_num = thread_num;
        pool->queue_max_num = queue_max_num;
        pool->queue_cur_num = 0;
        pool->head = NULL;
        pool->tail = NULL;
        if (pthread_mutex_init(&(pool->mutex), NULL))
        {
            bug("pthread_mutex_init\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_empty), NULL))
        {
            bug("pthread_cond_init\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_not_empty), NULL))
        {
            bug("pthread_cond_init\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_not_full), NULL))
        {
            bug("pthread_cond_init\n");
            break;
        }
        pool->pthreads = malloc(sizeof(pthread_t) * thread_num); 
        if (NULL == pool->pthreads)
        {
            bug("malloc error\n");
            break;
        }
        
        pool->queue_close = 0;
        pool->pool_close = 0;
        int i;
        for (i = 0; i < pool->thread_num; ++i)
        {
            if (pthread_create(&(pool->pthreads[i]), NULL, threadpool_function, (void *)pool) < 0)
                bug("pthread_create\n");
        }
        return pool;
    } while (0);
    return NULL;
}
int threadpool_add_job(threadpool_t *pool, callback_func p_callback_func, void *arg)
{
    if (pool == NULL || p_callback_func == NULL)
        return -1;
    pthread_mutex_lock(&(pool->mutex));
    while ((pool->queue_cur_num == pool->queue_max_num) && !(pool->pool_close || pool->queue_close))
    {
        // 等待threadpool_function發送queue_not_full信號
        pthread_cond_wait(&(pool->queue_not_full), &(pool->mutex)); // 隊列滿的時候就等待
    }
    if (pool->queue_close || pool->pool_close) // 隊列關閉或者線程池關閉就退出
    {
        pthread_mutex_unlock(&(pool->mutex));
        return -1;
    }
    job_t *pjob = (job_t *)malloc(sizeof(job_t));
    if (NULL == pjob)
    {
        pthread_mutex_unlock(&(pool->mutex));
        return -1;
    }
    pjob->p_callback_func = p_callback_func;
    pjob->arg = arg;
    pjob->next = NULL;
    if (pool->head == NULL)
    {
        pool->head = pool->tail = pjob;
        pthread_cond_broadcast(&(pool->queue_not_empty)); // 隊列空的時候,有任務來了,就通知線程池中的線程:隊列非空
    }
    else
    {
        pool->tail->next = pjob;
        pool->tail = pjob; // 把任務插入到隊列的尾部
    }
    pool->queue_cur_num++;
    pthread_mutex_unlock(&(pool->mutex));
    return 0;
}
void *threadpool_function(void *arg)
{
    threadpool_t *pool = (threadpool_t *)arg;
    job_t *pjob = NULL;
    while (1)
    {
        pthread_mutex_lock(&(pool->mutex));
        while ((pool->queue_cur_num == 0) && !pool->pool_close) // 隊列爲空,就等待隊列非空
        {
            // 等待threadpool_add_job函數發送queue_not_empty信號
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex));
        }
        if (pool->pool_close) // 線程池關閉,線程就退出
        {
            pthread_mutex_unlock(&(pool->mutex));
            pthread_exit(NULL);
        }
        pool->queue_cur_num--;
        pjob = pool->head;
        if (pool->queue_cur_num == 0)
        {
            pool->head = pool->tail = NULL;
        }
        else
        {
            pool->head = pjob->next;
        }
        if (pool->queue_cur_num == 0)
        {
            pthread_cond_signal(&(pool->queue_empty)); // 通知destory函數能夠銷燬線程池了
        }
        else if (pool->queue_cur_num <= pool->queue_max_num - 1)
        {
            // 向threadpool_add_job發送queue_not_full信號
            pthread_cond_broadcast(&(pool->queue_not_full));
        }
        pthread_mutex_unlock(&(pool->mutex));
        (*(pjob->p_callback_func))(pjob->arg); // 線程真正要作的工做,調用回調函數
        free(pjob);
        pjob = NULL;
    }
}
int threadpool_destory(threadpool_t *pool)
{
    if (pool == NULL)
        return 0;
    pthread_mutex_lock(&(pool->mutex));
    if (pool->queue_close && pool->pool_close) // 線程池已經退出了,就直接返回
    {
        pthread_mutex_unlock(&(pool->mutex));
        return 0;
    }
    pool->queue_close = 1; // 關閉任務隊列,不接受新的任務了
    while (pool->queue_cur_num != 0)
    {
        pthread_cond_wait(&(pool->queue_empty), &(pool->mutex)); // 等待隊列爲空
    }
    pool->pool_close = 1; // 線程池關閉
    pthread_mutex_unlock(&(pool->mutex));
    pthread_cond_broadcast(&(pool->queue_not_empty)); // 喚醒線程池中正在阻塞的線程
    pthread_cond_broadcast(&(pool->queue_not_full)); // 喚醒添加任務的threadpool_add_job函數
    int i;
    for (i = 0; i < pool->thread_num; ++i)
    {
        pthread_join(pool->pthreads[i], NULL); // 等待線程池的全部線程執行完畢
    }
    pthread_mutex_destroy(&(pool->mutex)); // 清理資源
    pthread_cond_destroy(&(pool->queue_empty)); 
    pthread_cond_destroy(&(pool->queue_not_empty)); 
    pthread_cond_destroy(&(pool->queue_not_full)); 
    free(pool->pthreads);
    job_t *pjob;
    while (pool->head != NULL)
    {
        pjob = pool->head;
        pool->head = pjob->next;
        free(pjob);
    }
    free(pool);
    return 0;
}
複製代碼

剩餘問題

線程池的方案雖然看起來很不錯,但在實際狀況中,不少鏈接都是長鏈接(在一個TCP鏈接上進行屢次通訊),一個線程在受到任務之後,處理完第一批來的數據,此時會再次調用read,可是客戶端下一次發送數據過來的時機是不肯定的,因而這個線程就被這個read給阻塞住了(socket fd默認是blocking的),直到1.這個fd可讀者;2.對方已經關閉鏈接;3.TCP超時這3個狀況之一發生以前什麼都不能幹,那麼併發量上來以後仍是會發生部分鏈接沒法被即便處理的狀況。

一個比較好的解決方案是把fd 設置爲non-blocking,並經過事件驅動(Event-driven)的方法來處理鏈接,在linux下能夠經過epoll實現。

小結

請你們多多關注小夥,您的關注與點贊是小夥更新的動力。

關於更多epoll問題能夠加小夥QQ,更多福利資源等你領取喲,QQ 758810390

相關文章
相關標籤/搜索