Linux簡單高併發模型——Epoll + 線程池

版權聲明:本文爲博主原創文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處連接和本聲明。
本文連接:https://blog.csdn.net/qq_25425023/article/details/70199133
首先是一個locker.h的文件,封裝了信號量、互斥量、條件變量。ios

在線程池中的任務隊列須要互斥量的保護,當任務隊列中有任務到達時,須要喚醒一個等待pthread_cond_wait()的線程,線程池中止時,須要喚醒因此的線程,調用的是pthread_cond_broadcast()。數組

locker.h文件:併發

#ifndef _LOCKER_H_
#define _LOCKER_H_
 
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
 
/*信號量的類*/
class sem_locker
{
private:
    sem_t m_sem;
 
public:
    //初始化信號量
    sem_locker()
    {
    if(sem_init(&m_sem, 0, 0) != 0)
        printf("sem init error\n");
    }
    //銷燬信號量
    ~sem_locker()
    {
    sem_destroy(&m_sem);
    }
 
    //等待信號量
    bool wait()
    {
    return sem_wait(&m_sem) == 0;
    }
    //添加信號量
    bool add()
    {
    return sem_post(&m_sem) == 0;
    }
};
 
 
/*互斥 locker*/
class mutex_locker
{
private:
    pthread_mutex_t m_mutex;
 
public:
    mutex_locker()
    {
        if(pthread_mutex_init(&m_mutex, NULL) != 0)
        printf("mutex init error!");
    }
    ~mutex_locker()
    {
    pthread_mutex_destroy(&m_mutex);
    }
 
    bool mutex_lock()  //lock mutex
    {
    return pthread_mutex_lock(&m_mutex) == 0;
    }
    bool mutex_unlock()   //unlock
    {
    return pthread_mutex_unlock(&m_mutex) == 0;
    }
};
 
/*條件變量 locker*/
class cond_locker
{
private:
    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
 
public:
    // 初始化 m_mutex and m_cond
    cond_locker()
    {
    if(pthread_mutex_init(&m_mutex, NULL) != 0)
        printf("mutex init error");
    if(pthread_cond_init(&m_cond, NULL) != 0)
    {   //條件變量初始化是被,釋放初始化成功的mutex
        pthread_mutex_destroy(&m_mutex);
        printf("cond init error");
    }
    }
    // destroy mutex and cond
    ~cond_locker()
    {
    pthread_mutex_destroy(&m_mutex);
    pthread_cond_destroy(&m_cond);
    }
    //等待條件變量
    bool wait()
    {
    int ans = 0;
    pthread_mutex_lock(&m_mutex);
    ans = pthread_cond_wait(&m_cond, &m_mutex);
    pthread_mutex_unlock(&m_mutex);
    return ans == 0;
    }
    //喚醒等待條件變量的線程
    bool signal()
    {
    return pthread_cond_signal(&m_cond) == 0;
    }
 
    //喚醒all等待條件變量的線程
    bool broadcast()
    {
            return pthread_cond_broadcast(&m_cond) == 0;
    }
};
 
#endifapp


thread_pool.h文件。socket

建立threadnum個線程,並調用pthread_detach()分離線程,線程結束,自動回收資源。(前面的一篇博客的線程池有bug,不完整,線程池退出時,不能讓全部的線程正常退出)函數

#ifndef _PTHREAD_POOL_
#define _PTHREAD_POOL_
 
#include "locker.h"
#include <queue>
#include <stdio.h>
#include <exception>
#include <errno.h>
#include <pthread.h>
#include <iostream>
 
template<class T>
class threadpool
{
private:
    int thread_number;  //線程池的線程數
    //int max_task_number;  //任務隊列中的最大任務數
    pthread_t *all_threads;   //線程數組
    std::queue<T *> task_queue; //任務隊列
    mutex_locker queue_mutex_locker;  //互斥鎖
    //sem_locker queue_sem_locker;   //信號量
    cond_locker queue_cond_locker; //cond
    bool is_stop; //是否結束線程
public:
    threadpool(int thread_num = 20);
    ~threadpool();
    bool append_task(T *task);  //添加任務
    void start();              //線程池開啓
    void stop();               //線程池關閉
private:
    //線程運行的函數。執行run()函數
    static void *worker(void *arg);
    void run();
    T *getTask();   //獲取任務
};
 
template <class T>
threadpool<T>::threadpool(int thread_num):
    thread_number(thread_num),is_stop(false), all_threads(NULL)
{       //構造函數
    if(thread_num <= 0)
    printf("threadpool can't init because thread_number = 0");
 
    all_threads = new pthread_t[thread_number];
    if(all_threads == NULL)
        printf("can't init threadpool because thread array can't new");
}
 
template <class T>
threadpool<T>::~threadpool()
{
    delete []all_threads;
    stop();
}
 
template <class T>
void threadpool<T>::stop() //線程池中止
{
        is_stop = true;
        //queue_sem_locker.add();
        queue_cond_locker.broadcast();
}
 
template <class T>
void threadpool<T>::start()  //線程池啓動
{
    for(int i = 0; i < thread_number; ++i)
    {
    //printf("create the %dth pthread\n", i);
    if(pthread_create(all_threads + i, NULL, worker, this) != 0)
    {//建立線程失敗,清除成功申請的資源並拋出異常
        delete []all_threads;
        throw std::exception();
    }
    if(pthread_detach(all_threads[i]))
    {//將線程設置爲脫離線程,失敗則清除成功申請的資源並拋出異常
        delete []all_threads;
        throw std::exception();
    }
    }
}
//添加任務進入任務隊列
template <class T>
bool threadpool<T>::append_task(T *task)   //添加任務
{   //獲取互斥鎖
    queue_mutex_locker.mutex_lock();
    
    bool is_signal = task_queue.empty();
    //添加進入隊列
    task_queue.push(task);
    queue_mutex_locker.mutex_unlock();
    //喚醒等待任務的線程
    if(is_signal)
    {
            queue_cond_locker.signal();
    }
    return true;
}
 
template <class T>
void *threadpool<T>::worker(void *arg)  //線程工做函數
{
    threadpool *pool = (threadpool *)arg;
    pool->run();
    return pool;
}
 
template <class T>
T* threadpool<T>::getTask()   //從任務隊列中獲取任務
{
    T *task = NULL;
    queue_mutex_locker.mutex_lock();
    if(!task_queue.empty())
    {
        task = task_queue.front();
        task_queue.pop();
    }
    queue_mutex_locker.mutex_unlock();
    return task;
}
 
template <class T>
void threadpool<T>::run()
{
    while(!is_stop){
        T *task = getTask();
        if(task == NULL)  //隊列爲空,等待
                queue_cond_locker.wait();
        else              //執行任務
                task->doit();
    }
    //for test
    //printf("exit%d\n", (unsigned long)pthread_self());
}
 
#endif
 
 post

封裝了epoll。
EpollServer.h中的BaseTask.h和Task.h應該放在另一個文件中的。這裏圖個方便,哈哈。測試

#ifndef _EPOLL_SERVER_H_
#define _EPOLL_SERVER_H_
 
#include <sys/socket.h>
#include <sys/types.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <sys/epoll.h>
//#include <pthread.h>
 
#include "thread_pool.h"
 
#define MAX_EVENT 1024   //epoll_events的最大個數
#define MAX_BUFFER 2048  //Buffer的最大字節
 
class BaseTask
{
public:
    virtual void doit() = 0;
};
 
class Task : public BaseTask
{
private:
    int sockfd;
    char order[MAX_BUFFER];
public:
    Task(char *str, int fd) : sockfd(fd)
    {
        memset(order, '\0', MAX_BUFFER);
        strcpy(order, str);
    }
    void doit()  //任務的執行函數
    {
        //do something of the order
        //printf("%s\n", order);
        snprintf(order, MAX_BUFFER - 1, "somedata\n");
        write(sockfd, order, strlen(order));
    }
};
 
class EpollServer
{
private:
    bool is_stop;   //是否中止epoll_wait的標誌
    int threadnum;   //線程數目
    int sockfd;     //監聽的fd
    int port;      //端口
    int epollfd;    //Epoll的fd
    threadpool<BaseTask> *pool;   //線程池的指針
    //char address[20];
    epoll_event events[MAX_EVENT];  //epoll的events數組
    struct sockaddr_in bindAddr;   //綁定的sockaddr
 
public://構造函數
    EpollServer()
    {}
    EpollServer(int ports, int thread) : is_stop(false) , threadnum(thread) ,
        port(ports), pool(NULL)
    {
    }
    ~EpollServer()  //析構
    {
        delete pool;
    }
 
    void init();
 
    void epoll();
 
    static int setnonblocking(int fd)  //將fd設置稱非阻塞
    {
        int old_option = fcntl(fd, F_GETFL);
        int new_option = old_option | O_NONBLOCK;
        fcntl(fd, F_SETFL, new_option);
        return old_option;
    }
 
    static void addfd(int epollfd, int sockfd, bool oneshot)  //向Epoll中添加fd
    {//oneshot表示是否設置稱同一時刻,只能有一個線程訪問fd,數據的讀取都在主線程中,因此調用都設置成false
        epoll_event event;
        event.data.fd = sockfd;
        event.events = EPOLLIN | EPOLLET;
        if(oneshot)
        {
            event.events |= EPOLLONESHOT;
        }
        epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event); //添加fd
        EpollServer::setnonblocking(sockfd);
    }
 
};
 
void EpollServer::init()   //EpollServer的初始化
{
    bzero(&bindAddr, sizeof(bindAddr));
    bindAddr.sin_family = AF_INET;
    bindAddr.sin_port = htons(port);
    bindAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        //建立Socket
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0)
    {
        printf("EpollServer socket init error\n");
        return;
    }
    int ret = bind(sockfd, (struct sockaddr *)&bindAddr, sizeof(bindAddr));
    if(ret < 0)
    {
        printf("EpollServer bind init error\n");
        return;
    }
    ret = listen(sockfd, 10);
    if(ret < 0)
    {
        printf("EpollServer listen init error\n");
        return;
    }
        //create Epoll
    epollfd = epoll_create(1024);
    if(epollfd < 0)
    {
        printf("EpollServer epoll_create init error\n");
        return;
    }
    pool = new threadpool<BaseTask>(threadnum);  //建立線程池
}
 
void EpollServer::epoll()
{
    pool->start();   //線程池啓動
    //
    addfd(epollfd, sockfd, false);
    while(!is_stop)
    {//調用epoll_wait
        int ret = epoll_wait(epollfd, events, MAX_EVENT, -1);
        if(ret < 0)  //出錯處理
        {
            printf("epoll_wait error\n");
            break;
        }
        for(int i = 0; i < ret; ++i)
        {
            int fd = events[i].data.fd;
            if(fd == sockfd)  //新的鏈接到來
            {
                struct sockaddr_in clientAddr;
                socklen_t len = sizeof(clientAddr);
                int confd = accept(sockfd, (struct sockaddr *)
                    &clientAddr, &len);
 
                EpollServer::addfd(epollfd, confd, false);
            }
            else if(events[i].events & EPOLLIN)  //某個fd上有數據可讀
            {
                char buffer[MAX_BUFFER];
        readagain:    memset(buffer, 0, sizeof(buffer));
                int ret = read(fd, buffer, MAX_BUFFER - 1);
                if(ret == 0)  //某個fd關閉了鏈接,從Epoll中刪除並關閉fd
                {
                    struct epoll_event ev;
                    ev.events = EPOLLIN;
                    ev.data.fd = fd;
                    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
                    shutdown(fd, SHUT_RDWR);
                    printf("%d logout\n", fd);
                    continue;
                }
                else if(ret < 0)//讀取出錯,嘗試再次讀取
                {
                    if(errno == EAGAIN)    
                    {
                        printf("read error! read again\n");
                        goto readagain;
                            break;
                    }
                }
                else//成功讀取,向線程池中添加任務
                {
                    BaseTask *task = new Task(buffer, fd);
                    pool->append_task(task);
                }
            }
            else
            {
                printf("something else had happened\n");
            }
        }
    }
    close(sockfd);//結束。
 
    pool->stop();
}
 
#endifthis


接下來是簡單的Demo的測試。.net

#include "EpollServer.h"
 
int main(int argc, char const *argv[])
{
    if(argc != 3)
    {
        printf("usage %s port threadnum\n", argv[0]);
        return -1;
    }
    int port = atoi(argv[1]);
    if(port == 0)
    {
        printf("port must be Integer\n");
        return -1;
    }
    int threadnum = atoi(argv[2]);
    if(port == 0)
    {
        printf("threadnum must be Integer\n");
        return -1;
    }
    EpollServer *epoll = new EpollServer(port, threadnum);
 
    epoll->init();
 
    epoll->epoll();
    return 0;
}


代碼在Ubuntu中編譯經過。下次再來更新可以支持併發量的多少。

-------------------------------------------------------------------------------------------------

 ————————————————  版權聲明:本文爲CSDN博主「XD灬」的原創文章,遵循CC 4.0 by-sa版權協議,轉載請附上原文出處連接及本聲明。 原文連接:https://blog.csdn.net/qq_25425023/article/details/70199133

相關文章
相關標籤/搜索