在開發Linux網絡程序時,一般須要維護多個定時器,如維護客戶端心跳時間、檢查多個數據包的超時重傳等。若是採用Linux的SIGALARM信號實現,則會帶來較大的系統開銷,且不便於管理。網絡
本文在應用層實現了一個基於時間堆的高性能定時器,同時考慮到定時的粒度問題,因爲經過alarm系統調用設置的SIGALARM信號只能以秒爲單位觸發,所以須要採用其它手段實現更細粒度的定時操做,固然,這裏不考慮使用多線程+sleep的實現方法,理由性能過低。多線程
一般的作法還有采用基於升序的時間鏈表,但升序時間鏈表的插入操做效率較低,須要遍歷鏈表。所以本實現方案使用最小堆來維護多個定時器,插入O(logn)、刪除O(1)、查找O(1)的效率較高。併發
首先是每一個定時器的定義:函數
class heap_timer
{
public:
heap_timer( int ms_delay )
{
gettimeofday( &expire, NULL );
expire.tv_usec += ms_delay * 1000;
if ( expire.tv_usec > 1000000 )
{
expire.tv_sec += expire.tv_usec / 1000000;
expire.tv_usec %= 1000000;
}
}
public:
struct timeval expire;
void (*cb_func)( client_data* );
client_data* user_data;
~heap_timer()
{
delete user_data;
}
};性能
包括一個超時時間expire、超時回調函數cb_func以及一個user_data變量,user_data用於存儲與定時器相關的用戶數據,用戶數據能夠根據不一樣的應用場合進行修改,這裏實現的是一個智能博物館的網關,網關接收來自zigbee協調器的用戶數據,併爲每一個用戶維護一段等待時間T,在T到來以前,同一個用戶的全部數據都存放到user_data的target_list中,當T到來時,根據target_list列表選擇一個適當的target併發送到ip_address,同時刪除定時器(有點扯遠了=。=)。總之,要實現的功能就是給每一個用戶維護一個定時器,定時值到來時作一些操做。
class client_data
{
public:
client_data(char *address):target_count(0)
{
strcpy(ip_address,address);
}
private:
char ip_address[32];
target target_list[64];
int target_count;
......
};
如下是時間堆的類定義,包括了一些基本的堆操做:插入、刪除、擴容,還包括了定時器溢出時的操做函數tick().net
class time_heap
{
public:
time_heap( int cap = 1) throw ( std::exception )
: capacity( cap ), cur_size( 0 )
{
array = new heap_timer* [capacity];
if ( ! array )
{
throw std::exception();
}
for( int i = 0; i < capacity; ++i )
{
array[i] = NULL;
}
}
~time_heap()
{
for ( int i = 0; i < cur_size; ++i )
{
delete array[i];
}
delete [] array;
}
public:
int get_cursize()
{
return cur_size;
}
void add_timer( heap_timer* timer ) throw ( std::exception )
{
if( !timer )
{
return;
}
if( cur_size >= capacity )
{
resize();
}
int hole = cur_size++;
int parent = 0;
for( ; hole > 0; hole=parent )
{
parent = (hole-1)/2;
if ( timercmp( &(array[parent]->expire), &(timer->expire), <= ) )
{
break;
}
array[hole] = array[parent];
}
array[hole] = timer;
}
void del_timer( heap_timer* timer )
{
if( !timer )
{
return;
}
// lazy delelte
timer->cb_func = NULL;
}
int top(struct timeval &time_top) const
{
if ( empty() )
{
return 0;
}
time_top = array[0]->expire;
return 1;
}
void pop_timer()
{
if( empty() )
{
return;
}
if( array[0] )
{
delete array[0];
array[0] = array[--cur_size];
percolate_down( 0 );
}
}
void tick()
{
heap_timer* tmp = array[0];
struct timeval cur;
gettimeofday( &cur, NULL );
while( !empty() )
{
if( !tmp )
{
break;
}
if( timercmp( &cur, &(tmp->expire), < ) )
{
break;
}
if( array[0]->cb_func )
{
array[0]->cb_func( array[0]->user_data );
}
pop_timer();
tmp = array[0];
}
}
bool empty() const
{
return cur_size == 0;
}
heap_timer** get_heap_array()
{
return array;
}
private:
void percolate_down( int hole )
{
heap_timer* temp = array[hole];
int child = 0;
for ( ; ((hole*2+1) <= (cur_size-1)); hole=child )
{
child = hole*2+1;
if ( (child < (cur_size-1)) && timercmp( &(array[child+1]->expire), &(array[child]->expire), < ) )
{
++child;
}
if ( timercmp( &(array[child]->expire), &(temp->expire), < ) )
{
array[hole] = array[child];
}
else
{
break;
}
}
array[hole] = temp;
}
void resize() throw ( std::exception )
{
heap_timer** temp = new heap_timer* [2*capacity];
for( int i = 0; i < 2*capacity; ++i )
{
temp[i] = NULL;
}
if ( ! temp )
{
throw std::exception();
}
capacity = 2*capacity;
for ( int i = 0; i < cur_size; ++i )
{
temp[i] = array[i];
}
delete [] array;
array = temp;
}
private:
heap_timer** array;
int capacity;
int cur_size;
};
如何用epoll實現多個定時器的操做是本設計的關鍵,咱們知道,epoll_wait的最後一個參數是阻塞等待的時候,單位是毫秒。能夠這樣設計:線程
一、當時間堆中沒有定時器時,epoll_wait的超時時間T設爲-1,表示一直阻塞等待新用戶的到來;設計
二、當時間堆中有定時器時,epoll_wait的超時時間T設爲最小堆堆頂的超時值,這樣能夠保證讓最近觸發的定時器能得以執行;server
三、在epoll_wait阻塞等待期間,如有其它的用戶到來,則epoll_wait返回n>0,進行常規的處理,隨後應從新設置epoll_wait爲小頂堆堆頂的超時時間。blog
爲此,本實現對epoll_wait進行了封裝,名爲tepoll_wait,調用接口與epoll_wait差很少,但返回值有所不一樣:tepoll_wait不返回n=0的狀況(即超時),由於超時事件在tepoll_wait中進行處理,只有等到n>0(即在等待過程當中有用戶數據到來)或者n<0(出現錯誤)才進行返回。
廢話很少說,看代碼最清楚:
void timer_handler()
{
heap.tick();
//setalarm();
}
/* tselect - select with timers */
int tepoll_wait( int epollfd, epoll_event *events, int max_event_number )
{
struct timeval now;
struct timeval tv;
struct timeval *tvp;
//tevent_t *tp;
int n;
for ( ;; )
{
if ( gettimeofday( &now, NULL ) < 0 )
perror("gettimeofday");
struct timeval time_top;
if ( heap.top(time_top) )
{
tv.tv_sec = time_top.tv_sec - now.tv_sec;;
tv.tv_usec = time_top.tv_usec - now.tv_usec;
if ( tv.tv_usec < 0 )
{
tv.tv_usec += 1000000;
tv.tv_sec--;
}
tvp = &tv;
}
else
tvp = NULL;
if(tvp == NULL)
n = epoll_wait( epollfd, events, max_event_number, -1 );
else
n = epoll_wait( epollfd, events, max_event_number, tvp->tv_sec*1000 + tvp->tv_usec/1000 );
if ( n < 0 )
return -1;
if ( n > 0 )
return n;
timer_handler();
}
}
代碼一目瞭然,在tepoll_wait中,是個死循環,只有等到上述兩種狀況發生時,才進行返回,此時在調用方進行處理,處理過程跟epoll_wait同樣。
while( !stop_server )
{
number = tepoll_wait( epollfd, events, MAX_EVENT_NUMBER);
for ( i= 0; i < number; i++ )
{
int fd = events[i].data.fd;
if ( (events[i].events & EPOLLIN)&& (fd == uart_fd) )
{
//讀取用戶數據
if( (timer_id = find_exist_timer(ip_address)) != -1)
{
//add to the exist timer
heap_timer ** heap_array = heap.get_heap_array();
heap_array[timer_id]->user_data->add_target(RSSI,target_id);
continue;
}
//new timer
heap_timer *timer = new heap_timer(200);
timer->cb_func = cb_func;
timer->user_data = new client_data(ip_address);
timer->user_data->add_target(RSSI,target_id);
heap.add_timer(timer);
}
else if( ( fd == pipefd[0] ) && ( events[i].events & EPOLLIN ) )
{
//此處進行了統一信號源處理,經過雙向管道來獲取SIGTERM以及SIGINT的信號,在主循環中進行統一處理
char signals[1024];
ret = recv( pipefd[0], signals, sizeof( signals ), 0 );
if( ret == -1 )
{
continue;
}
else if( ret == 0 )
{
continue;
}
else
{
for( int i = 0; i < ret; ++i )
{
switch( signals[i] )
{
case SIGTERM:
case SIGINT:
{
stop_server = true;
}
}
}
}
}
}
}
實測效果很是不錯,歡迎分享!
———————————————— 版權聲明:本文爲CSDN博主「Issacluo」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處連接及本聲明。 原文連接:https://blog.csdn.net/gbjj123/article/details/25155501