轉 http://www.cnblogs.com/Seapeak/archive/2010/04/08/1707807.html html
在linux平臺上使用c開發網絡程序的同志們通常狀況下都對鼎鼎大名的libevent很是的熟悉了。可是一些新進入此領域的new new people們對此都是一頭霧水。本來的迷茫再加上開源軟件一向的「幫助文件」缺失做風,讓咱們這些新手們顯的很是的無助。幸虧有一些熱心的朋友們幫忙,才化險爲夷啊!linux
前幾天一直在開發一個locker server,雖然公司現有的locker server能很好的運轉,可是畢竟是net的,通用性不廣,當咱們要在linux上開發多集羣系統的時候現有的locker server就未免顯得有點捉襟見肘了。正是在開發locker server的過程當中使用到了libevent。apache
整體上,libevent是很好用的。一兩個函數就能搞定你複雜的網絡通信工做了。固然了,這句話得用在你使用的是「單線程」的狀況下。雖然在linux系統中,進程的資源和window系統中進程的資源相比輕量級不少,代價也至關的沒有那麼昂貴,因此不少的軟件都是使用「多進程」方式實現的,好比大名鼎鼎的apache。可是在咱們的系統中,咱們使用了「單進程多線程」的方式,這樣,咱們就能在單機上啓動多個進程,以達到「僞分佈式」的效果來達到測試的目的。安全
那麼這個時候就要注意libevent的使用了,由於對於event_base來講,不是線程安全的。也就是說多線程不能share同一個event_base,就算是加鎖操做也不行。那麼這個時候就只能採起「單線程單event_base」的策略了。個人作法是作一個task pool(任務對象池),每一個任務會被一個thread執行,固然了,thread確定也是從thread pool拿出來的,而在task pool初始化的時候,我就給每一個task中的event_base初始化了對象,這樣,萬事大吉了。網絡
這個地方注意了之後,就開始說網絡通信了。在使用libevent的時候,觸發事件是在接收到網絡鏈接(或者timeout事件超時)的時候。因此你須要在事件處理函數中判斷時間源,其次libevent接收網絡通信的字節流時是使用了libevnet中自帶的緩衝的,因此當你接收的時候必定要注意累加,而且屢次loop或者註冊 event_event中的事件。因此在個人task中,會有接收的data。固然了若是你的協議是分爲header和body的,一般header比較短,body比較長,並且在client,header和body一般是連續發送的,這樣,在使用libevent的時候,header和body是同時被接收到的,這點必定要注意,因此提醒你在接收數據的函數中,須要區分接收header部分仍是body部分;當body很是長,超過libevent的緩衝時,是須要屢次屢次觸發接收函數的,這點也要注意,就是讓你須要在接收的時候除了區分header和body之外,還要注意一次接收不徹底的狀況下,對於數據須要累加。多線程
當你在使用libevent時,event_set事件時,只要不是使用EV_PERSIST註冊的事件是不須要在接收完一次數據後屢次event_add的,只有當你不使用EV_PERSIST時,你的事件才須要屢次event_add到event_base中;固然了,使用了EV_PERSIST註冊的函數在event_base被task pool回收時是要顯式的event_del該註冊事件的,沒有使用EV_PERSIST註冊的事件是不須要顯式的使用event_del刪除該事件的。less
static void read_buffer(int client_socket_fd,short event_type,void *arg) { if(NULL == arg) { log_error("File:"__FILE__",Line:%d.event base arg is NULL.",__LINE__); return; } task_info_t *task_info = (task_info_t *) arg; if(event_type == EV_TIMEOUT) /* 這個地方注意須要判斷是否超時 由於我event_add事件的時候沒有使用ev_persist 因此當超時時須要再add一次事件到event_base的loop中 */ { if(0 != event_add(&task_info->on_read,&task_info->timeout)) { log_error("File:"__FILE__",Line:%d.repeart add read header event to event_base is error."); close(task_info->on_read.ev_fd); task_pool_push(task_info); } return; } int bytes; /* 這個地方就是開始接收頭部 接收頭部時,可能分爲好幾回從緩衝中取得,因此須要一個while累加 */ while(header == task_info->read_type)//recv header { bytes = recv(client_socket_fd,task_info->header_buffer+task_info->offset,REQUEST_LENGTH -task_info->offset,0); if(0 > bytes ) { if (errno == EAGAIN || errno == EWOULDBLOCK) { if(0 != event_add(&task_info->on_read, &task_info->timeout)) { close(task_info->on_read.ev_fd); task_pool_push(task_info); log_error("File: "__FILE__", line: %d, "\ "event_add fail.", __LINE__); return; } } else { log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s", __LINE__, errno, strerror(errno)); close(task_info->on_read.ev_fd); task_pool_push(task_info); } return; } else if(0 == bytes) { log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.", __LINE__); close(task_info->on_read.ev_fd); task_pool_push(task_info); return; } if(REQUEST_LENGTH > bytes+task_info->offset) { log_warning("File:"__FILE__",Line:%d.recv header is not over.",__LINE__); task_info->offset += bytes; if(0 != event_add(&task_info->on_read, &task_info->timeout)) { close(task_info->on_read.ev_fd); task_pool_push(task_info); log_error("File: "__FILE__", line: %d, "\ "event_add fail.", __LINE__); return; } } else { task_info->read_type = body; deal_request_header(task_info); task_info->body_buffer = (char *) malloc(task_info->request_info.length); if(NULL == task_info->body_buffer) { log_error("File:"__FILE__",Line:%d.alloc mem to task_info data is error.",__LINE__); close(client_socket_fd); task_pool_push(task_info); return; } memset(task_info->body_buffer,0,task_info->request_info.length); task_info->offset = 0;//set recv body buffer offset to 0 break; } } /* 這個地方就是開始接收body, 和header同樣,也要考慮body屢次接收累加的狀況。 */ while(body == task_info->read_type) { bytes = recv(client_socket_fd,task_info->body_buffer+task_info->offset,task_info->request_info.length-task_info->offset,0); if(0 > bytes ) { if (errno == EAGAIN || errno == EWOULDBLOCK) { if(0 != event_add(&task_info->on_read, &task_info->timeout)) { close(task_info->on_read.ev_fd); task_pool_push(task_info); log_error("File: "__FILE__", line: %d, "\ "event_add fail.", __LINE__); return; } } else { log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s", __LINE__, errno, strerror(errno)); close(task_info->on_read.ev_fd); task_pool_push(task_info); } return; } else if(0 == bytes) { log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.", __LINE__); close(task_info->on_read.ev_fd); task_pool_push(task_info); return; } if(task_info->request_info.length-task_info->offset > bytes) { log_warning("File:"__FILE__",Line:%d.recv body is not over.",__LINE__); task_info->offset += bytes; if(0 != event_add(&task_info->on_read, &task_info->timeout)) { close(task_info->on_read.ev_fd); task_pool_push(task_info); log_error("File: "__FILE__", line: %d, "\ "event_add fail.", __LINE__); return; } } else { task_info->read_type = unspecified; break; } } deal_request_body(client_socket_fd,task_info); return; }
void deal_working_thread(void *arg) { log_info("debug to this."); int client_socket_fd = (int) arg; if(0 > client_socket_fd) { log_error("File:"__FILE__",Line:%d.the arg means client socket filedesc is less 0!",__LINE__); return; } /* 設置網絡爲非阻塞,libevent必須的 */ if(!set_nonblocking(client_socket_fd)) { log_error("File:"__FILE__",Line:%d.set client socket filedesc is error.error info is %s!", __LINE__,strerror(errno)); close(client_socket_fd); return; } task_info_t *task_info; task_info = task_pool_pop(); /* 對event_base註冊事件回調函數, 注意沒有使用EV_PERSIST */ do { task_info->read_type = header; event_set(&task_info->on_read,client_socket_fd,EV_READ,read_buffer,(void *) task_info); if(0 != event_base_set(task_info->event_base,&task_info->on_read)) { log_error("File:"__FILE__",Line:%d.Associate the read header event to event_base is error.",__LINE__); task_info->read_type = unspecified; close(client_socket_fd); task_pool_push(task_info); break; } event_set(&task_info->on_write,client_socket_fd,EV_WRITE,response_handle,(void *) task_info); if(0 != event_base_set(task_info->event_base,&task_info->on_write)) { log_error("File:"__FILE__",Line:%d.Associate the write hander to event_base is error.",__LINE__); task_info->read_type = unspecified; close(client_socket_fd); task_pool_push(task_info); break; } if(0 != event_add(&task_info->on_read,&task_info->timeout)) { log_error("File:"__FILE__",Line:%d.add the read header event to event_base is error.",__LINE__); task_info->read_type = unspecified; close(client_socket_fd); task_pool_push(task_info); break; } event_base_loop(task_info->event_base,EVLOOP_NONBLOCK); }while(false); return; }