網絡編程之Reactor 模式

基本的架構是 epoll+線程池。react

這篇博文主要從如下幾個方面進行闡述:數組

(1)reactor模式的一個介紹:(只要是個人理解)安全

(2)關於線程池的說明。數據結構

(3)如何將epoll + 池結合起來實現一個羣聊 架構

一. reactor 模式:socket

從我我的的理解角度,所謂的reactor模式相似於:函數

場景:銀行, 和三個業務工做人員 ,一個接待,有不少人在等待。post

當你進去的時候,銀行的接待會給你一個編號,這就是你第幾個纔會被業務工做人員接待。spa

這個時候,你就進入了等待的狀態。直到輪流到你了,三個業務工做人員中的一個就會幫助你處理你的問題。操作系統

類比到計算機就是:做爲接待只是給你發了一個編號,她並不關心你要處理什麼業務,以後又去給後續進來的人發送編號。她就不在意何時你的業務纔會被處理到,至於你的業務被處理的時候,就屬於業務處理人員。固然業務處理人員也不會關心給進來的人發編號,他只關心你當前要處理的業務是什麼。

這種模式的優勢在於:主線程只監聽當前套接字是否可讀或者可寫,至於你要處理什麼事情,就交給工做線程了。

估計如今原本清晰的你,已經被我弄糊塗了,那就用圖說明:

 

IO模型實現reactor 模式的工做流程:

(1)主線程向epoll內核事件表內註冊socket上的可讀就緒事件。

(2)主線程調用epoll_wait()等待socket上有數據可讀

(3)當socket上有數據可讀,epoll_wait 通知主線程。主線程從socket可讀事件放入請求隊列。

(4)睡眠在請求隊列上的某個可讀工做線程被喚醒,從socket上讀取數據,處理客戶的請求。

而後向 epoll內核事件表裏註冊寫的就緒事件

(5)主線程調用epoll_wait()等待數據可寫 。

 

以上就是關於Reactor模式的一個說明,下面就來看線程池的說明。

(二)線程池:

之因此會出現池這個概念就是:單個任務處理事件比較短,須要處理的任務有比較多。使用線程池能夠減小在建立和撤銷線程上花費的時間以及系統資源的開銷。若是不使用線程池有可能建立大量的線程而消耗完系統資源以及過分的切換。

線程池的概念是從一個很簡單的模型開始的,那就是生產者和消費者思想,這個模型我相信不少學過操做系統的人,都知道。主線程就至關因而生產者,而咱們本身建立的大量的線程就至關於消費者(工做線程),生產者將本身須要處理的業務放到一個任務隊列中,而工做線程從任務隊列中取出任務,而後又加以處理。咱們建立的大量線程經過一個池的東西維護起來,這個池裏麪包含咱們建立的線程,還有那個工做的隊列,互斥鎖,條件變量等等不少的東西。

(三)epoll + 池的結合

#include <arpa/inet.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <pthread.h>
#include <fcntl.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include "thread_pool.h"
#include "thread_pool.c"
#define MAX_EVENT_NUMBER  1000 
#define SIZE    1024 
#define MAX     10 

//從主線程向工做線程數據結構
struct fd 
{
    int epollfd; 
    int sockfd ;
};
 
//用戶說明
struct user
{
    int  sockfd ;   //文件描述符
    char client_buf [SIZE]; //數據的緩衝區
};
struct user user_client[MAX];  //定義一個全局的客戶數據表


//因爲epoll設置的EPOLLONESHOT模式,當出現errno =EAGAIN,就須要從新設置文件描述符(可讀)
void reset_oneshot (int epollfd , int fd)
{
    struct epoll_event event ;
    event.data.fd = fd ;
    event.events = EPOLLIN|EPOLLET|EPOLLONESHOT ;
    epoll_ctl (epollfd , EPOLL_CTL_MOD, fd , &event);

}
//向epoll內核事件表裏面添加可寫的事件
int addreadfd (int epollfd , int fd , int oneshot)
{
    struct epoll_event  event ;
    event.data.fd = fd ;
    event.events |= ~ EPOLLIN ; 
    event.events |= EPOLLOUT ;
    event.events |= EPOLLET;
    if (oneshot)
    {
        event.events |= EPOLLONESHOT ; //設置EPOLLONESHOT

    }
    epoll_ctl (epollfd , EPOLL_CTL_MOD ,fd , &event);

}
//羣聊函數
int groupchat (int epollfd , int sockfd , char *buf)
{
        
        int i = 0 ;
    for ( i  = 0 ; i < MAX ; i++)
        {
            if (user_client[i].sockfd == sockfd)
        {
                 continue ;
            }
            strncpy (user_client[i].client_buf ,buf , strlen (buf)) ;
            addreadfd (epollfd , user_client[i].sockfd , 1);

        }

}
//接受數據的函數,也就是線程的回調函數
int funcation (void *args)
{
    int sockfd = ((struct fd*)args)->sockfd ; 
    int epollfd =((struct fd*)args)->epollfd;
    char buf[SIZE];
        memset (buf , '\0', SIZE);
    
    printf ("start new thread to receive data on fd :%d\n", sockfd);
    
    //因爲我將epoll的工做模式設置爲ET模式,因此就要用一個循環來讀取數據,防止數據沒有讀完,而丟失。
    while (1)
    {
        int ret = recv (sockfd ,buf , SIZE-1 , 0);
        if (ret == 0)
        {
            close (sockfd);
            break;
        }
        else if (ret < 0)
        {
            if (errno == EAGAIN)
                {
                reset_oneshot (epollfd, sockfd);  //從新設置(上面已經解釋了)
                break;
            }
        }
        else
        {
            printf (" read data is %s\n", buf);
            sleep (5);
                groupchat (epollfd , sockfd, buf );
        }
        

    }
    printf ("end thread receive  data on fd : %d\n", sockfd);
    
}
//這是從新註冊,將文件描述符從可寫變成可讀
int addagainfd (int epollfd , int fd)
{
       struct epoll_event event;
       event.data.fd = fd ;
       event.events  |= ~EPOLLOUT ;
       event.events = EPOLLIN|EPOLLET|EPOLLONESHOT;
       epoll_ctl (epollfd , EPOLL_CTL_MOD , fd , &event);
}
//與前面的解釋同樣
int reset_read_oneshot (int epollfd , int sockfd)
{
    struct epoll_event  event;
    event.data.fd = sockfd ;
    event.events = EPOLLOUT |EPOLLET |EPOLLONESHOT ;
    epoll_ctl (epollfd, EPOLL_CTL_MOD , sockfd , &event);
    return 0 ;

}

//發送讀的數據
int readfun (void *args)
{
       int sockfd = ((struct fd *)args)->sockfd ;
       int epollfd= ((struct fd*)args)->epollfd ;
        
       int ret = send (sockfd, user_client[sockfd].client_buf , strlen (user_client[sockfd].client_buf), 0); //發送數據
       if (ret == 0 )
       {
           
           close (sockfd);
           printf ("發送數據失敗\n");
           return -1 ;
       }
       else if (ret == EAGAIN)
       {
           reset_read_oneshot (epollfd , sockfd);
           printf("send later\n");
           return -1;
       }
       memset (&user_client[sockfd].client_buf , '\0', sizeof (user_client[sockfd].client_buf));
       addagainfd (epollfd , sockfd);//從新設置文件描述符
     
}
//套接字設置爲非阻塞
int setnoblocking (int fd)
{
    int old_option = fcntl (fd, F_GETFL);
    int new_option = old_option|O_NONBLOCK;
    fcntl (fd , F_SETFL , new_option);
    return old_option ;
}

int addfd (int epollfd , int fd , int oneshot)
{
    struct epoll_event  event;
    event.data.fd = fd ;
    event.events = EPOLLIN|EPOLLET ;
    if (oneshot)
    {
        event.events |= EPOLLONESHOT ;

    }
    epoll_ctl (epollfd , EPOLL_CTL_ADD ,fd ,  &event);
    setnoblocking (fd);
    return 0 ;
}



int main(int argc, char *argv[])
{
    struct sockaddr_in  address ;
    const char *ip = "127.0.0.1";
    int port  = 8086 ;

    memset (&address , 0 , sizeof (address));
    address.sin_family = AF_INET ;
    inet_pton (AF_INET ,ip , &address.sin_addr);
    address.sin_port =htons( port) ;


    int listenfd = socket (AF_INET, SOCK_STREAM, 0);
    assert (listen >=0);
    int reuse = 1;
        setsockopt (listenfd , SOL_SOCKET , SO_REUSEADDR , &reuse , sizeof (reuse)); //端口重用,由於出現過端口沒法綁定的錯誤
    int ret = bind (listenfd, (struct sockaddr*)&address , sizeof (address));
    assert (ret >=0 );

    ret = listen (listenfd , 5);
        assert (ret >=0);
        

        struct epoll_event events[MAX_EVENT_NUMBER];
        
    int epollfd = epoll_create (5); //建立內核事件描述符表
    assert (epollfd != -1);
    addfd (epollfd , listenfd, 0);
        
        thpool_t  *thpool ;  //線程池
    thpool = thpool_init (5) ; //線程池的一個初始化
        
        while (1)
    {
        int ret = epoll_wait (epollfd, events, MAX_EVENT_NUMBER , -1);//等待就緒的文件描述符,這個函數會將就緒的複製到events的結構體數組中。
        if (ret < 0)
        {
            printf ("poll failure\n");
            break ; 
        }
                int i =0  ;
        for ( i = 0 ; i < ret ; i++ )
        {
            int sockfd = events[i].data.fd ;

            if (sockfd == listenfd)
            {
                struct sockaddr_in client_address ;
                socklen_t  client_length = sizeof (client_address);
                int connfd = accept (listenfd , (struct sockaddr*)&client_address,&client_length);
                user_client[connfd].sockfd = connfd ;
                memset (&user_client[connfd].client_buf , '\0', sizeof (user_client[connfd].client_buf));
                addfd (epollfd , connfd , 1);//將新的套接字加入到內核事件表裏面。
            }
            else if (events[i].events & EPOLLIN) 
            {
                 struct fd    fds_for_new_worker ;
                 fds_for_new_worker.epollfd = epollfd ; 
                 fds_for_new_worker.sockfd = sockfd ;
                
                  thpool_add_work (thpool, (void*)funcation ,&fds_for_new_worker);//將任務添加到工做隊列中
                }else if (events[i].events & EPOLLOUT)
            {
                
                                 struct  fd   fds_for_new_worker ;
                 fds_for_new_worker.epollfd = epollfd ;
                 fds_for_new_worker.sockfd = sockfd ;
                                 thpool_add_work (thpool, (void*)readfun , &fds_for_new_worker );//將任務添加到工做隊列中
            }
                        
        }

    }
                            
    thpool_destory (thpool);
    close (listenfd);
        return EXIT_SUCCESS;
}

線程池的代碼在這裏:

#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>


#include "thread_pool.h"


static int thpool_keepalive = 1 ;   //線程池保持存活
pthread_mutex_t  mutex  = PTHREAD_MUTEX_INITIALIZER ;  //靜態賦值法初始化互斥鎖


thpool_t * thpool_init (int threadsN){
    thpool_t  *tp_p ; 

    if (!threadsN || threadsN < 1){
        threadsN = 1 ;
        
    }

    tp_p =  (thpool_t *)malloc (sizeof (thpool_t)) ;
    if (tp_p == NULL){
            fprintf (stderr ,"thpool_init (): could not allocate memory for thread pool\n");
        return NULL ;
    }
    tp_p->threads = (pthread_t *)malloc (threadsN * sizeof (pthread_t));
    if (tp_p->threads == NULL){
        fprintf( stderr , "could not allocation memory for thread id\n");
        return NULL;
    }
    tp_p->threadsN = threadsN ;
    

    if (thpool_jobqueue_init (tp_p) == -1){
        fprintf (stderr ,"could not allocate memory for job queue\n");
        return NULL;
        }

    /*初始化信號*/
    tp_p->jobqueue->queueSem = (sem_t *)malloc (sizeof (sem_t));

    /*定位一個匿名信號量,第二個參數是1表示。這個信號量將在進程內的線程是共享的,第三個參數是信號量的初始值*/
    sem_init (tp_p->jobqueue->queueSem, 0 , 0 );
        
    int  t ; 
    
       

    for (t = 0 ; t < threadsN ; t++){
        printf ("Create thread %d in pool\n", t);

        //第四個參數是傳遞給函數指針的一個參數,這個函數指針就是咱們所說的線程指針
        if (pthread_create (&(tp_p->threads[t]) , NULL , (void *) thpool_thread_do , (void *)tp_p)){
            free (tp_p->threads);
            
            free (tp_p->jobqueue->queueSem);
            free (tp_p->jobqueue);
            free (tp_p);
        } 
    }
    return  tp_p ;
}



/*
 * 初始化完線程應該處理的事情
 * 這裏存在兩個信號量,
 */

void thpool_thread_do (thpool_t *tp_p){
    while (thpool_keepalive)
    {
        if (sem_wait (tp_p->jobqueue->queueSem))  //若是工做隊列中沒有工做,那麼全部的線程都將在這裏阻塞,當他調用成功的時候,信號量-1
        {
            fprintf(stderr , "Waiting for semaphore\n");
            exit (1);
        }

        if (thpool_keepalive)
        {
            void *(*func_buff) (void *arg);
            void *arg_buff;
            thpool_job_t *job_p;

            pthread_mutex_lock (&mutex);
            job_p = thpool_jobqueue_peek (tp_p);
                func_buff = job_p->function ;
            arg_buff= job_p->arg ;
            thpool_jobqueue_removelast (tp_p);
            pthread_mutex_unlock (&mutex);
            
            func_buff (arg_buff);
        
            free (job_p);
        }
        else 
        {
            return ;
        }
    }
    return ;



    
}


int thpool_add_work (thpool_t *tp_p ,void * (*function_p )(void *), void *arg_p){

    thpool_job_t   *newjob ;

    newjob = (thpool_job_t *)malloc (sizeof (thpool_job_t));
    if (newjob == NULL)
    {
        fprintf (stderr,"couldnot allocate memory for new job\n");
        exit (1);
    }
    newjob->function = function_p ;
    newjob->arg = arg_p ;

    pthread_mutex_lock (&mutex);
    thpool_jobqueue_add (tp_p ,newjob);
    pthread_mutex_unlock (&mutex);
    return 0 ;
}


void thpool_destory (thpool_t *tp_p){
    int    t ;

    thpool_keepalive = 0 ;  //讓全部的線程運行的線程都退出循環

    for (t = 0 ; t < (tp_p->threadsN) ; t++ ){

        //sem_post 會使在這個線程上阻塞的線程,再也不阻塞
        if (sem_post (tp_p->jobqueue->queueSem) ){
            fprintf (stderr,"thpool_destory () : could not bypass sem_wait ()\n");
        }
        
    }
    if (sem_destroy (tp_p->jobqueue->queueSem)!= 0){
        fprintf (stderr, "thpool_destory () : could not destroy semaphore\n");
    }

    for (t = 0 ; t< (tp_p->threadsN) ; t++)
    {
        pthread_join (tp_p->threads[t], NULL);
    }
    thpool_jobqueue_empty (tp_p);
    free (tp_p->threads);
    free (tp_p->jobqueue->queueSem);
    free (tp_p->jobqueue);
    free (tp_p);



}


int thpool_jobqueue_init (thpool_t *tp_p)
{
    tp_p->jobqueue = (thpool_jobqueue *)malloc (sizeof (thpool_jobqueue));
    if (tp_p->jobqueue == NULL)
    {
        fprintf (stderr ,"thpool_jobqueue malloc is error\n");
        return -1 ;
    }
    tp_p->jobqueue->tail = NULL ;
    tp_p->jobqueue->head = NULL ;
    tp_p->jobqueue->jobsN = 0 ;
    return 0 ;

}

void thpool_jobqueue_add (thpool_t *tp_p , thpool_job_t *newjob_p){
    newjob_p->next = NULL ;
    newjob_p->prev = NULL ;

    thpool_job_t   *oldfirstjob ;
    oldfirstjob = tp_p->jobqueue->head;


    switch (tp_p->jobqueue->jobsN)
    {
        case 0 : 
             tp_p->jobqueue->tail = newjob_p;
             tp_p->jobqueue->head = newjob_p;
             break;
        default :
             oldfirstjob->prev= newjob_p ;
             newjob_p->next = oldfirstjob ;
             tp_p->jobqueue->head= newjob_p;
             break;
    }

        (tp_p->jobqueue->jobsN)++ ;
    sem_post (tp_p->jobqueue->queueSem);  //原子操做,信號量增長1 ,保證線程安全

    int sval ;
    sem_getvalue (tp_p->jobqueue->queueSem , &sval);   //sval表示當前正在阻塞的線程數量

}

int thpool_jobqueue_removelast (thpool_t *tp_p){
    thpool_job_t *oldlastjob  , *tmp;
    oldlastjob = tp_p->jobqueue->tail ;


    switch (tp_p->jobqueue->jobsN)
    {
        case 0 :
            return -1 ;
            break;
        case 1 :
            tp_p->jobqueue->head = NULL ;
            tp_p->jobqueue->tail = NULL ;
            break;
        default :
            tmp = oldlastjob->prev ;
            tmp->next = NULL ;
            tp_p->jobqueue->tail = oldlastjob->prev;

    }
    (tp_p->jobqueue->jobsN) -- ;
    int sval ;
    sem_getvalue (tp_p->jobqueue->queueSem, &sval);
    return 0 ;
}
thpool_job_t * thpool_jobqueue_peek (thpool_t *tp_p){
    return tp_p->jobqueue->tail ;
}


void thpool_jobqueue_empty (thpool_t *tp_p)
{
    thpool_job_t *curjob;
    curjob = tp_p->jobqueue->tail ;
    while (tp_p->jobqueue->jobsN){
        tp_p->jobqueue->tail = curjob->prev ;
        free (curjob);
        curjob = tp_p->jobqueue->tail ;
        tp_p->jobqueue->jobsN -- ;
    }
    tp_p->jobqueue->tail = NULL ;
    tp_p->jobqueue->head = NULL ;
}

線程池的.h文件

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <pthread.h>
#include <semaphore.h>

/*Individual job*/
typedef struct thpool_job_t {
    void   (*function)(void* arg);    //函數指針
    void                    *arg ;    //函數的參數
    struct tpool_job_t     *next ;    //指向下一個任務
    struct tpool_job_t     *prev ;    //指向前一個任務
}thpool_job_t ;

/*job queue as doubly linked list*/
typedef struct thpool_jobqueue {
    thpool_job_t    *head ;           //隊列的頭指針
    thpool_job_t    *tail;            //對列的尾指針
    int             jobsN;           //隊列中工做的個數
        sem_t           *queueSem;        //原子信號量
}thpool_jobqueue;

/*thread pool*/

typedef struct thpool_t {
    pthread_t          *threads ;        //線程的ID 
    int                 threadsN ;        //線程的數量
    thpool_jobqueue    *jobqueue;        //工做隊列的指針
    

}thpool_t;


/*線程池中的線程都須要互斥鎖和指向線程池的一個指針*/
typedef struct thread_data{
    pthread_mutex_t     *mutex_p ;
    thpool_t            *tp_p ;
}thread_data;



/*
 *    初始化線程池
 *    爲線程池, 工做隊列, 申請內存空間,信號等申請內存空間
 *    @param  :將被使用的線程ID 
 *    @return :成功返回的線程池結構體,錯誤返回null
 */

thpool_t   *thpool_init (int threadsN);

/*
 * 每一個線程要作的事情
 * 這是一個無止境循環,當撤銷這線程池的時候,這個循環纔會被中斷
 *@param: 線程池
 *@return:不作任何的事情
 */

void  thpool_thread_do (thpool_t *tp_p);

/* 
 *向工做隊列裏面添加任何
 *採用來了一個行爲和他的參數,添加到線程池的工做對列中去,
 * 若是你想添加工做函數,須要更多的參數,經過傳遞一個指向結構體的指針,就能夠實現一個接口
 * ATTENTION:爲了避免引發警告,你不得不將函數和參數都帶上
 *
 * @param: 添加工做的線程線程池
 * @param: 這個工做的處理函數
 * @param:函數的參數
 * @return : int 
 */

int thpool_t_add_work (thpool_t *tp_p ,void* (*function_p) (void *), void* arg_p );


/*
 *摧毀線程池
 *
 *這將撤銷這個線程池和釋放所申請的內存空間,當你在調用這個函數的時候,存在有的線程還在運行中,那麼
 *中止他們如今所作的工做,而後他們被撤銷掉
 * @param:你想要撤銷的線程池的指針
 */


void thpool_destory (thpool_t  *tp_p);

/*-----------------------Queue specific---------------------------------*/



/*
 * 初始化隊列
 * @param: 指向線程池的指針
 * @return :成功的時候返回是 0 ,分配內存失敗的時候,返回是-1
 */
int thpool_jobqueue_init (thpool_t *tp_p);


/*
 *添加任務到隊列
 *一個新的工做任務將被添加到隊列,在使用這個函數或者其餘向別的相似這樣
 *函數 thpool_jobqueue_empty ()以前,這個新的任務要被申請內存空間
 *
 * @param: 指向線程池的指針
 * @param:指向一個已經申請內存空間的任務
 * @return   nothing 
 */
void thpool_jobqueue_add (thpool_t * tp_p , thpool_job_t *newjob_p);

/*
 * 移除對列的最後一個任務
 *這個函數將不會被釋放申請的內存空間,因此要保證
 *
 *@param :指向線程池的指針
 *@return : 成功返回0 ,若是對列是空的,就返回-1 
 */
int thpool_jobqueue_removelast (thpool_t *tp_p);


/*
 *對列的最後一個任務
 *在隊列裏面獲得最後一個任務,即便隊列是空的,這個函數依舊可使用
 *
 *參數:指向線程池結構體的指針
 *返回值:獲得隊列中最後一個任務的指針,或者在對列是空的狀況下,返回是空
 */
thpool_job_t * thpool_jobqueue_peek (thpool_t *tp_p);

/*
 *移除和撤銷這個隊列中的全部任務
 *這個函數將刪除這個隊列中的全部任務,將任務對列恢復到初始化狀態,所以隊列的頭和對列的尾都設置爲NULL ,此時隊列中任務= 0
 *
 *參數:指向線程池結構體的指針
 *
 */
void thpool_jobqueue_empty (thpool_t *tp_p);

#endif
相關文章
相關標籤/搜索