libevent多線程使用事項

轉 http://www.cnblogs.com/Seapeak/archive/2010/04/08/1707807.html  html

 

在linux平臺上使用c開發網絡程序的同志們通常狀況下都對鼎鼎大名的libevent很是的熟悉了。可是一些新進入此領域的new new people們對此都是一頭霧水。本來的迷茫再加上開源軟件一向的「幫助文件」缺失做風,讓咱們這些新手們顯的很是的無助。幸虧有一些熱心的朋友們幫忙,才化險爲夷啊!linux

    前幾天一直在開發一個locker server,雖然公司現有的locker server能很好的運轉,可是畢竟是net的,通用性不廣,當咱們要在linux上開發多集羣系統的時候現有的locker server就未免顯得有點捉襟見肘了。正是在開發locker server的過程當中使用到了libevent。apache

    整體上,libevent是很好用的。一兩個函數就能搞定你複雜的網絡通信工做了。固然了,這句話得用在你使用的是「單線程」的狀況下。雖然在linux系統中,進程的資源和window系統中進程的資源相比輕量級不少,代價也至關的沒有那麼昂貴,因此不少的軟件都是使用「多進程」方式實現的,好比大名鼎鼎的apache。可是在咱們的系統中,咱們使用了「單進程多線程」的方式,這樣,咱們就能在單機上啓動多個進程,以達到「僞分佈式」的效果來達到測試的目的。安全

      那麼這個時候就要注意libevent的使用了,由於對於event_base來講,不是線程安全的。也就是說多線程不能share同一個event_base,就算是加鎖操做也不行。那麼這個時候就只能採起「單線程單event_base」的策略了。個人作法是作一個task pool(任務對象池),每一個任務會被一個thread執行,固然了,thread確定也是從thread pool拿出來的,而在task pool初始化的時候,我就給每一個task中的event_base初始化了對象,這樣,萬事大吉了。網絡

      這個地方注意了之後,就開始說網絡通信了。在使用libevent的時候,觸發事件是在接收到網絡鏈接(或者timeout事件超時)的時候。因此你須要在事件處理函數中判斷時間源,其次libevent接收網絡通信的字節流時是使用了libevnet中自帶的緩衝的,因此當你接收的時候必定要注意累加,而且屢次loop或者註冊 event_event中的事件。因此在個人task中,會有接收的data。固然了若是你的協議是分爲header和body的,一般header比較短,body比較長,並且在client,header和body一般是連續發送的,這樣,在使用libevent的時候,header和body是同時被接收到的,這點必定要注意,因此提醒你在接收數據的函數中,須要區分接收header部分仍是body部分;當body很是長,超過libevent的緩衝時,是須要屢次屢次觸發接收函數的,這點也要注意,就是讓你須要在接收的時候除了區分header和body之外,還要注意一次接收不徹底的狀況下,對於數據須要累加。多線程

      當你在使用libevent時,event_set事件時,只要不是使用EV_PERSIST註冊的事件是不須要在接收完一次數據後屢次event_add的,只有當你不使用EV_PERSIST時,你的事件才須要屢次event_add到event_base中;固然了,使用了EV_PERSIST註冊的函數在event_base被task pool回收時是要顯式的event_del該註冊事件的,沒有使用EV_PERSIST註冊的事件是不須要顯式的使用event_del刪除該事件的。less

      

static void  read_buffer(int client_socket_fd,short event_type,void *arg)
{
	if(NULL == arg)
	{
		log_error("File:"__FILE__",Line:%d.event base arg is NULL.",__LINE__);
		return;
	}
	task_info_t *task_info = (task_info_t *) arg;

	if(event_type == EV_TIMEOUT)
	/*
	這個地方注意須要判斷是否超時
	由於我event_add事件的時候沒有使用ev_persist
	因此當超時時須要再add一次事件到event_base的loop中
	*/
	{
		if(0 != event_add(&task_info->on_read,&task_info->timeout))
		{
			log_error("File:"__FILE__",Line:%d.repeart add read header event to event_base is error.");
			close(task_info->on_read.ev_fd);
			task_pool_push(task_info);

		}
		return;
	}

	int bytes;
	/*
	這個地方就是開始接收頭部
	接收頭部時,可能分爲好幾回從緩衝中取得,因此須要一個while累加
	*/
	while(header == task_info->read_type)//recv header
	{
		bytes = recv(client_socket_fd,task_info->header_buffer+task_info->offset,REQUEST_LENGTH -task_info->offset,0);
		if(0 > bytes )
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
			{
				if(0 != event_add(&task_info->on_read, &task_info->timeout))
				{
					close(task_info->on_read.ev_fd);
					task_pool_push(task_info);

					log_error("File: "__FILE__", line: %d, "\
						"event_add fail.", __LINE__);
					return;
				}
			}
			else
			{
				log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s",
					__LINE__, errno, strerror(errno));

				close(task_info->on_read.ev_fd);
				task_pool_push(task_info);
			}
			return;
		}
		else if(0 == bytes)
		{
			log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.",
					__LINE__);
			close(task_info->on_read.ev_fd);
			task_pool_push(task_info);
			return;
		}

		if(REQUEST_LENGTH > bytes+task_info->offset)
		{
			log_warning("File:"__FILE__",Line:%d.recv header is not over.",__LINE__);
			task_info->offset += bytes;
			if(0 != event_add(&task_info->on_read, &task_info->timeout))
			{
				close(task_info->on_read.ev_fd);
				task_pool_push(task_info);
				log_error("File: "__FILE__", line: %d, "\
					"event_add fail.", __LINE__);
				return;
			}
		}
		else
		{
			task_info->read_type = body;
			deal_request_header(task_info);
			task_info->body_buffer = (char *) malloc(task_info->request_info.length);
			if(NULL == task_info->body_buffer)
			{
				log_error("File:"__FILE__",Line:%d.alloc mem to task_info data is error.",__LINE__);
				close(client_socket_fd);
				task_pool_push(task_info);
				return;
			}
			memset(task_info->body_buffer,0,task_info->request_info.length);
			task_info->offset = 0;//set recv body buffer offset to 0
			break;
		}
	}

	/*
	這個地方就是開始接收body,
	和header同樣,也要考慮body屢次接收累加的狀況。
	*/
	while(body == task_info->read_type)
	{
		bytes = recv(client_socket_fd,task_info->body_buffer+task_info->offset,task_info->request_info.length-task_info->offset,0);
		if(0 > bytes )
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
			{
				if(0 != event_add(&task_info->on_read, &task_info->timeout))
				{
					close(task_info->on_read.ev_fd);
					task_pool_push(task_info);

					log_error("File: "__FILE__", line: %d, "\
						"event_add fail.", __LINE__);
					return;
				}
			}
			else
			{
				log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s",
					__LINE__, errno, strerror(errno));

				close(task_info->on_read.ev_fd);
				task_pool_push(task_info);
			}
			return;
		}
		else if(0 == bytes)
		{
			log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.",
					__LINE__);
			close(task_info->on_read.ev_fd);
			task_pool_push(task_info);
			return;
		}

		if(task_info->request_info.length-task_info->offset > bytes)
		{
			log_warning("File:"__FILE__",Line:%d.recv body is not over.",__LINE__);
			task_info->offset += bytes;
			if(0 != event_add(&task_info->on_read, &task_info->timeout))
			{
				close(task_info->on_read.ev_fd);
				task_pool_push(task_info);
				log_error("File: "__FILE__", line: %d, "\
					"event_add fail.", __LINE__);
				return;
			}
		}
		else
		{
			task_info->read_type = unspecified;
			break;
		}
	}
	deal_request_body(client_socket_fd,task_info);
	return;
}

 

void deal_working_thread(void *arg)
{
	log_info("debug to this.");
	int client_socket_fd = (int) arg;
	if(0 > client_socket_fd)
	{
		log_error("File:"__FILE__",Line:%d.the arg means client socket filedesc is less 0!",__LINE__);
		return;
	}
	/*
	設置網絡爲非阻塞,libevent必須的
	*/
	if(!set_nonblocking(client_socket_fd))
	{
		log_error("File:"__FILE__",Line:%d.set client socket filedesc is error.error info is %s!",
				__LINE__,strerror(errno));
		close(client_socket_fd);
		return;
	}

	task_info_t *task_info;
	task_info = task_pool_pop();
	/*
	對event_base註冊事件回調函數,
	注意沒有使用EV_PERSIST
	*/
	do
	{
		task_info->read_type = header;
		event_set(&task_info->on_read,client_socket_fd,EV_READ,read_buffer,(void *) task_info);
		if(0 != event_base_set(task_info->event_base,&task_info->on_read))
		{
			log_error("File:"__FILE__",Line:%d.Associate the read header event to  event_base is error.",__LINE__);
			task_info->read_type = unspecified;
			close(client_socket_fd);
			task_pool_push(task_info);
			break;
		}

		event_set(&task_info->on_write,client_socket_fd,EV_WRITE,response_handle,(void *) task_info);
		if(0 != event_base_set(task_info->event_base,&task_info->on_write))
		{
			log_error("File:"__FILE__",Line:%d.Associate the write hander to event_base is error.",__LINE__);
			task_info->read_type = unspecified;
			close(client_socket_fd);
			task_pool_push(task_info);
			break;
		}


		if(0 != event_add(&task_info->on_read,&task_info->timeout))
		{
			log_error("File:"__FILE__",Line:%d.add the read header event to  event_base is error.",__LINE__);
			task_info->read_type = unspecified;
			close(client_socket_fd);
			task_pool_push(task_info);
			break;
		}

		event_base_loop(task_info->event_base,EVLOOP_NONBLOCK);
	}while(false);
	return;
}
相關文章
相關標籤/搜索