以前看 libevent 後寫的一個簡單問答服務器(Linux)。
工作方式比較簡單:一個主線程和多個工作線程,主線程只接受連接並通知工作線程接管工作,工作線程接管連接然後接收消息並返回。也可以換成進程的方式。
主線程與工作線程之間的通訊使用的是 socket 對,採用 libevent 的事件。
//libevent server sample on linux /*socket*/ #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> /**/ #include <iostream> #include <err.h> //err() #include <string.h> //memset() #include <fcntl.h> //fcntl() #include <pthread.h> // #include <cstdlib> //calloc() #include <unistd.h> //close() #include <sys/queue.h> //鏈表 #include <errno.h> //errno,ENTIR #include <stdio.h> /*libevent*/ #include <event.h> using namespace std; #define SERVER_PORT 55555 #define LISTEN_NUM 32 #define THREAD_NUM 4 #define BUF_LEN 1024 #define WRITE_BUF_LEN 1024*4 struct event_accepted{ int fd;//socket fd int thread_id; //記錄所連客戶端socket所屬的工做線程編號 struct event* ev_read; struct event* ev_write; char* write_buf; int len; //write_buf的長度 int offset; //write_buf已寫入後的偏移 }; struct socket_pair{ int connecter; int accepter; socket_pair(){ connecter =-1; accepter =-1; } }; /*工做線程結構*/ struct workthread_info{ int thread_id; //線程編號 pthread_mutex_t g_clock; //線程鎖 volatile int count; //各個工做線程socket鏈接數 struct socket_pair socket_pairs; //通知工做線程註冊事件的socket對 struct event socket_pair_event; //用於工做線程監聽註冊事件 struct event_base *base; //工做線程的event_base實例 // TAILQ_ENTRY(event_accepted) entries;//工做線程上註冊的event的鏈表,用於回收event分配的內存 workthread_info(){ count = 0; } }; struct workthread_info work_info[THREAD_NUM]; pthread_t pthread_id[THREAD_NUM]; // int setnonblock(int fd); void on_accept(int fd, short ev, void *arg); void on_read(int fd, short ev, void *arg); void on_write(int fd, short ev, void *arg); void* work_thread(void* arg); int socketpair_init(); //初始化本地socket鏈接用於通知子線程註冊事件 void socketpair_read(int fd, short ev, void *arg); //工做線程接收已鏈接的socket,併爲socket註冊讀事件 void destroy(); int main(){ //初始化工做線程註冊通知socket if(socketpair_init() <0){ err(1,"init socketpair_init failed"); } /*初始化監聽事件*/ event_init(); struct event ev_accept; int thread_id[THREAD_NUM]={0}; /*初始化工做線程*/ for(int i=0;i<THREAD_NUM;i++){ thread_id[i]=i; //將工做線程編號傳到回調函數中,i在這裏是一個臨時變量,將i的地址傳到線程中去的話i的值何時使用是不肯定的,可能致使一些工做線程編號相同。 if(0 
!=pthread_create(&pthread_id[i],NULL,work_thread,&thread_id[i])){ err(1,"thread create failed"); } pthread_mutex_init(&work_info[i].g_clock,NULL); } /*初始化監聽socket*/ struct sockaddr_in listen_addr; int reuseaddr_on =1; int listen_fd =socket(AF_INET,SOCK_STREAM,0); if(listen_fd<0){ err(1,"listen failed"); } if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr_on,sizeof(reuseaddr_on)) <0){ err(1,"setsockopt failed"); } memset(&listen_addr,0,sizeof(listen_addr)); listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = INADDR_ANY; listen_addr.sin_port =htons(SERVER_PORT); if(bind(listen_fd,(struct sockaddr*)&listen_addr,sizeof(listen_addr))<0){ err(1,"bind failed"); } if(listen(listen_fd,LISTEN_NUM)<0){ err(1,"listen failed"); } if(setnonblock(listen_fd)<0){ err(1,"set server socket to non-blocking failed"); } /**/ event_set(&ev_accept,listen_fd,EV_READ|EV_PERSIST,on_accept,NULL); event_add(&ev_accept,NULL); event_dispatch(); return 0; } int setnonblock(int fd) { int flags; flags = fcntl(fd, F_GETFL); if (flags < 0) return flags; flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } void on_accept(int fd, short ev, void *arg){ int client_fd; struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); client_fd = accept(fd,(struct sockaddr*)&client_addr,&client_len); if(client_fd == -1){ warn("accept failed"); return; } if(setnonblock(client_fd)<0){ warn("failed to set client socket non-blocking"); } //socket鏈接數最少的一個工做線程 int min =work_info[0].count; int thread_num =0; for(int i=0;i<THREAD_NUM;i++){ if(work_info[i].count < min){ thread_num = i; min = work_info[i].count; } } //將已鏈接的socket文件描述符通知工做線程 send(work_info[thread_num].socket_pairs.connecter,&client_fd,sizeof(client_fd),0); work_info[thread_num].count++; } int socketpair_init(){ // int connecter[THREAD_NUM]={-1}; int listener = -1; listener = socket(AF_INET,SOCK_STREAM,0); if(listener<0){ err(1,"%d: init socketpair listener for insert event failed",__LINE__); } 
struct sockaddr_in listen_addr,connect_addr[THREAD_NUM]; int size; memset(&listen_addr,0,sizeof(listen_addr)); listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); listen_addr.sin_port = 0; /*kernel chooses port.*/ if(bind(listener,(struct sockaddr*)&listen_addr,sizeof(listen_addr)) == -1){ goto fail; } if(listen(listener,THREAD_NUM) == -1){ goto fail; } for(int i=0;i<THREAD_NUM;i++){ work_info[i].socket_pairs.connecter = socket(AF_INET,SOCK_STREAM,0); if(work_info[i].socket_pairs.connecter<0){ goto fail; } size = sizeof(connect_addr[i]); /* We want to find out the port number to connect to. */ if(getsockname(listener,(struct sockaddr*)&connect_addr[i],(socklen_t *)&size) == -1){ goto fail; } if(size != sizeof(connect_addr[i])){ goto fail; } if(connect(work_info[i].socket_pairs.connecter,(struct sockaddr*)&connect_addr[i],sizeof(connect_addr[i])) == -1){ printf("i:%d, %s\n",i,strerror(errno)); goto fail; } size = sizeof(listen_addr); work_info[i].socket_pairs.accepter = accept(listener,(struct sockaddr*)&listen_addr,(socklen_t*)&size); if(work_info[i].socket_pairs.accepter<0){ goto fail; } /* Now check we are talking to ourself by matching port and host on the two sockets. 
*/ if(getsockname(work_info[i].socket_pairs.connecter,(struct sockaddr*)&connect_addr[i],(socklen_t*)&size) == -1){ goto fail; } if(size != sizeof(connect_addr[i]) || listen_addr.sin_family != connect_addr[i].sin_family || listen_addr.sin_addr.s_addr != connect_addr[i].sin_addr.s_addr || listen_addr.sin_port != connect_addr[i].sin_port){ goto fail; } } /*close listen socket*/ close(listener); return 0; fail: for(int i=0;i<THREAD_NUM;i++){ if(work_info[i].socket_pairs.connecter >0){ close(work_info[i].socket_pairs.connecter); } if(work_info[i].socket_pairs.accepter >0){ close(work_info[i].socket_pairs.accepter); } } return -1; } void on_read(int fd, short ev, void *arg){ struct event_accepted *client = (event_accepted*)arg; char* buf = new char[BUF_LEN]; if(buf == NULL){ err(1,"malloc failed"); } int recv_len = read(fd,buf,BUF_LEN); if(recv_len == 0){ //客戶端斷開鏈接 cout<<"client disconnected"<<endl; close(fd); event_del(client->ev_read); event_del(client->ev_write); delete client->ev_write; delete client->ev_read; delete client->write_buf; work_info[client->thread_id].count--; free(client); delete[] buf; return; } else if(recv_len < 0){ cout<<"socekt failure,disconnecting client:"<<strerror(errno)<<endl; close(fd); event_del(client->ev_read); event_del(client->ev_write); delete client->ev_write; delete client->ev_read; delete client->write_buf; work_info[client->thread_id].count--; free(client); delete[] buf; return; } //TODO 1.須要處理數組越界問題;2.這裏只是一問一答的狀況,若是須要主動推送消息須要增長設計; //TODO 1.在同一個文件描述符上若是屢次註冊同一事件會發生什麼?這裏改用鏈表記錄須要發送的數據可能會好一點, memcpy(client->write_buf+client->len,buf,recv_len); client->len += recv_len; event_add(client->ev_write,NULL); delete[] buf; } void on_write(int fd, short ev, void *arg){ struct event_accepted* client = (event_accepted*)arg; int len =0; len = write(fd,client->write_buf+client->offset,client->len-client->offset); if(len == -1){ //寫操做被信號打斷或不能寫入,從新註冊寫事件 if(errno == EINTR || errno == EAGAIN){ event_add(client->ev_write,NULL); return; } else{ 
err(1,"write error"); } } else if((client->offset + len) <client->len){ //數據沒有徹底寫入 client->offset += len; event_add(client->ev_write,NULL); return; } else{ //寫入完成 client->offset = client->len = 0; } } void* work_thread(void* arg){ int thread_num = *(int*)arg; //獲取線程編號 struct workthread_info *p_work_info = &work_info[thread_num]; p_work_info->thread_id = thread_num; //記錄線程編號 struct event_base * base = p_work_info->base = event_base_new(); event_set(&p_work_info->socket_pair_event,p_work_info->socket_pairs.accepter,EV_READ|EV_PERSIST,socketpair_read,&p_work_info->socket_pair_event); event_base_set(base,&p_work_info->socket_pair_event); event_add(&p_work_info->socket_pair_event,NULL); event_base_dispatch(base); } void socketpair_read(int fd, short ev, void *arg){ struct event_accepted *client = (struct event_accepted*)calloc(1,sizeof(event_accepted)); if(NULL == client){ err(1,"on_accept malloc event_accepted failed"); } struct workthread_info *p_work_info; for(int i=0;i<THREAD_NUM;i++){ //找到所屬工做線程編號 if(work_info[i].socket_pairs.accepter == fd){ p_work_info = &work_info[i]; client->thread_id = i; //記錄所連客戶端socket所屬的線程編號 break; } } int client_fd = -1; int recv_len = recv(fd,&client_fd,4,0); if(recv_len != 4){ err(1,"socketpair read len not equal to 4"); } if(client_fd <= 0){ err(1,"socketpair recved an error socket fd"); } client->fd = client_fd; client->ev_write = new event; client->ev_read = new event; client->write_buf = new char[WRITE_BUF_LEN]; client->len = 0; client->offset = 0; if(client->ev_read == NULL){ err(1,"alloc read event failed"); } // // // TAILQ_INSERT_TAIL(p_work_info->entries,client,); //初始化事件 event_set(client->ev_read,client_fd,EV_READ|EV_PERSIST,on_read,client); event_base_set(p_work_info->base,client->ev_read); event_set(client->ev_write,client_fd,EV_WRITE,on_write,client); event_base_set(p_work_info->base,client->ev_write); //工做線程註冊讀事件,寫事件在須要時再註冊 event_add(client->ev_read,NULL); } void destroy(){ }