用 acl::master_aio 類編寫高併發非阻塞服務器程序

      在文章《使用 acl::master_threads 類編寫多進程多線程服務器程序》和《使用 acl::master_proc 類編寫多進程服務器程序》中分別講述瞭如何編寫 LINUX 平臺下阻塞式服務器程序的多線程和多進程方式。雖然這兩種模式均可以處理併發任務,而且效率也不低,可是畢竟線程和進程資源是操做系統的寶貴資源,若是要支持很是高的併發請求,則會由於系統限制而不能建立更多的進程或線程。你們常見的 webserver Nginx 就是以支持高併發而聞名,Nginx 自己就是非阻塞設計的典型應用,固然還有不少其它服務器程序也是非阻塞的:ircd、Squid、Varnish、Bind 等。html

      由於非阻塞程序編寫有必定難度,因此如今有人在寫非阻塞程序時不得不轉向 Erlang、Nodejs 等一些腳本式語言,要想用 C/C++ 來實現非阻塞程序仍是有些難度:系統級 API 過於底層、容易出錯、難於調試。雖然也有一些 C++ 庫(象ACE)提供了非阻塞庫,但這些庫的入門門檻仍是挺高的。根據本人多年在非阻塞編程方面的經驗,總結出一套比較實用的 C/C++ 非阻塞函數庫,本文就講述若是使用 acl_cpp 庫中的 master_aio 類來編寫非阻塞服務器程序(該服務器程序能夠規定啓動最大進程的個數,其中每一個進程是一個獨立的非阻塞進程)。在文章《非阻塞網絡編程實例講解》中給出了一個非阻塞網絡程序的示例,您能夠參考一下。web

      1、類接口定義編程

      master_aio 是一個純虛類,其中定義的接口須要子類實現,子類實例必須只能建立一個實例,接口以下:服務器

 

/**
		 * 純虛函數:當接收到一個客戶端鏈接時調用此函數
		 * @param stream {aio_socket_stream*} 新接收到的客戶端異步流對象
		 * @return {bool} 該函數若是返回 false 則通知服務器框架再也不接收
		 *  遠程客戶端鏈接,不然繼續接收客戶端鏈接
		 */
		virtual bool on_accept(aio_socket_stream* stream) = 0;

 

      master_aio 提供了四個外部方法,以下:網絡

 

/**
		 * 開始運行,調用該函數是指該服務進程是在 acl_master 服務框架
		 * 控制之下運行,通常用於生產機狀態
		 * @param argc {int} 從 main 中傳遞的第一個參數,表示參數個數
		 * @param argv {char**} 從 main 中傳遞的第二個參數
		 */
		void run_daemon(int argc, char** argv);

		/**
		 * 在單獨運行時的處理函數,用戶能夠調用此函數進行一些必要的調試工做
		 * @param addr {const char*} 服務監聽地址
		 * @param conf {const char*} 配置文件全路徑
		 * @param ht {aio_handle_type} 事件引擎的類型
		 * @return {bool} 監聽是否成功
		 */
		bool run_alone(const char* addr, const char* conf = NULL,
			aio_handle_type ht = ENGINE_SELECT);

		/**
		 * 得到異步IO的事件引擎句柄,經過此句柄,用戶能夠設置定時器等功能
		 * @return {aio_handle*}
		 */
		aio_handle* get_handle() const;

		/**
		 * 在 run_alone 模式下,通知服務器框架關閉引擎,退出程序
		 */
		void stop();

 

        從上面兩個函數,能夠看出 master_aio 類當在生產環境下(由 acl_master 進程統一控制調度),用戶須要調用 run_daemon 函數;若是用戶在開發過程當中須要手工進行調試,則能夠調用 run_alone 函數。多線程

        master_aio 的基類 master_base 的幾個虛接口以下:併發

 

/**
		 * 當進程切換用戶身份前調用的回調函數,能夠在此函數中作一些
		 * 用戶身份爲 root 的權限操做
		 */
		virtual void proc_pre_jail() {}

		/**
		 * 當進程切換用戶身份後調用的回調函數,此函數被調用時,進程
		 * 的權限爲普通受限級別
		 */
		virtual void proc_on_init() {}

		/**
		 * 當進程退出前調用的回調函數
		 */
		virtual void proc_on_exit() {}

		// 在 run_alone 狀態下運行前,調用此函數初始化一些配置

 

      基類的這幾個虛函數用戶能夠根據須要調用。框架

      另外,基類 master_base 還提供了幾個用來讀配置選項的函數:異步

 

/**
		 * 設置 bool 類型的配置項
		 * @param table {master_bool_tbl*}
		 */
		void set_cfg_bool(master_bool_tbl* table);

		/**
		 * 設置 int 類型的配置項
		 * @param table {master_int_tbl*}
		 */
		void set_cfg_int(master_int_tbl* table);

		/**
		 * 設置 int64 類型的配置項
		 * @param table {master_int64_tbl*}
		 */
		void set_cfg_int64(master_int64_tbl* table);

		/**
		 * 設置 字符串 類型的配置項
		 * @param table {master_str_tbl*}
		 */
		void set_cfg_str(master_str_tbl* table);

       2、示例源程序socket

 

// master_aio.cpp : 定義控制檯應用程序的入口點。
//

#include "stdafx.h"
#include "lib_acl.hpp"

static char *var_cfg_debug_msg;

static acl::master_str_tbl var_conf_str_tab[] = {
	{ "debug_msg", "test_msg", &var_cfg_debug_msg },

	{ 0, 0, 0 }
};

static int  var_cfg_debug_enable;
static int  var_cfg_keep_alive;
static int  var_cfg_send_banner;

static acl::master_bool_tbl var_conf_bool_tab[] = {
	{ "debug_enable", 1, &var_cfg_debug_enable },
	{ "keep_alive", 1, &var_cfg_keep_alive },
	{ "send_banner", 1, &var_cfg_send_banner },

	{ 0, 0, 0 }
};

static int  var_cfg_io_timeout;

static acl::master_int_tbl var_conf_int_tab[] = {
	{ "io_timeout", 120, &var_cfg_io_timeout, 0, 0 },

	{ 0, 0 , 0 , 0, 0 }
};

static void (*format)(const char*, ...) = acl::log::msg1;

using namespace acl;

//////////////////////////////////////////////////////////////////////////
/**
 * 延遲讀回調處理類
 */
class timer_reader: public aio_timer_reader
{
public:
	timer_reader(int delay)
	{
		delay_ = delay;
		format("timer_reader init, delay: %d\r\n", delay);
	}

	~timer_reader()
	{
	}

	// aio_timer_reader 的子類必須重載 destroy 方法
	void destroy()
	{
		format("timer_reader delete, delay: %d\r\n", delay_);
		delete this;
	}

	// 重載基類回調方法
	virtual void timer_callback(unsigned int id)
	{
		format("timer_reader(%u): timer_callback, delay: %d\r\n", id, delay_);

		// 調用基類的處理過程
		aio_timer_reader::timer_callback(id);
	}

private:
	int   delay_;
};

/**
 * 延遲寫回調處理類
 */
class timer_writer: public aio_timer_writer
{
public:
	timer_writer(int delay)
	{
		delay_ = delay;
		format("timer_writer init, delay: %d\r\n", delay);
	}

	~timer_writer()
	{
	}

	// aio_timer_reader 的子類必須重載 destroy 方法
	void destroy()
	{
		format("timer_writer delete, delay: %d\r\n", delay_);
		delete this;
	}

	// 重載基類回調方法
	virtual void timer_callback(unsigned int id)
	{
		format("timer_writer(%u): timer_callback, delay: %u\r\n", id, delay_);

		// 調用基類的處理過程
		aio_timer_writer::timer_callback(id);
	}

private:
	int   delay_;
};

class timer_test : public aio_timer_callback
{
public:
	timer_test() {}
	~timer_test() {}
protected:
	// 基類純虛函數
	virtual void timer_callback(unsigned int id)
	{
		format("id: %u\r\n", id);
	}

	virtual void destroy(void)
	{
		delete this;
		format("timer delete now\r\n");
	}
private:
};
/**
 * 異步客戶端流的回調類的子類
 */
class io_callback : public aio_callback
{
public:
	io_callback(aio_socket_stream* client)
		: client_(client)
		, i_(0)
	{
	}

	~io_callback()
	{
		format("delete io_callback now ...\r\n");
	}

	/**
	 * 實現父類中的虛函數,客戶端流的讀成功回調過程
	 * @param data {char*} 讀到的數據地址
	 * @param len {int} 讀到的數據長度
	 * @return {bool} 返回 true 表示繼續,不然但願關閉該異步流
	 */
	virtual bool read_callback(char* data, int len)
	{
		if (++i_ < 10)
			format(">>gets(i: %d): %s\r\n", i_, data);

		// 若是遠程客戶端但願退出,則關閉之
		if (strncmp(data, "quit", 4) == 0)
		{
			client_->format("Bye!\r\n");
			client_->close();
		}

		// 若是遠程客戶端但願服務端也關閉,則停止異步事件過程
		else if (strncmp(data, "stop", 4) == 0)
		{
			client_->format("Stop now!\r\n");
			client_->close();  // 關閉遠程異步流

			// 通知異步引擎關閉循環過程
			client_->get_handle().stop();
		}

		// 向遠程客戶端回寫收到的數據

		int   delay = 0;

		if (strncmp(data, "write_delay", strlen("write_delay")) == 0)
		{
			// 延遲寫過程

			const char* ptr = data + strlen("write_delay");
			delay = atoi(ptr);
			if (delay > 0)
			{
				format(">> write delay %d second ...\r\n", delay);
				timer_writer* timer = new timer_writer(delay);
				client_->write(data, len, delay * 1000000, timer);
				client_->gets(10, false);
				return true;
			}
		}
		else if (strncmp(data, "read_delay", strlen("read_delay")) == 0)
		{
			// 延遲讀過程

			const char* ptr = data + strlen("read_delay");
			delay = atoi(ptr);
			if (delay > 0)
			{
				client_->write(data, len);
				format(">> read delay %d second ...\r\n", delay);
				timer_reader* timer = new timer_reader(delay);
				client_->gets(10, false, delay * 1000000, timer);
				return true;
			}
		}

		client_->write(data, len);
		//client_->gets(10, false);
		return true;
	}

	/**
	 * 實現父類中的虛函數,客戶端流的寫成功回調過程
	 * @return {bool} 返回 true 表示繼續,不然但願關閉該異步流
	 */
	virtual bool write_callback()
	{
		return true;
	}

	/**
	 * 實現父類中的虛函數,客戶端流的關閉回調過程
	 */
	virtual void close_callback()
	{
		// 必須在此處刪除該動態分配的回調類對象以防止內存泄露
		delete this;
	}

	/**
	 * 實現父類中的虛函數,客戶端流的超時回調過程
	 * @return {bool} 返回 true 表示繼續,不然但願關閉該異步流
	 */
	virtual bool timeout_callback()
	{
		format("Timeout ...\r\n");
		return true;
	}

private:
	aio_socket_stream* client_;
	int   i_;
};

//////////////////////////////////////////////////////////////////////////

class master_aio_test : public master_aio
{
public:
	master_aio_test() { timer_test_ = new timer_test(); }

	~master_aio_test() { handle_->keep_timer(false); }

protected:
	// 基類純虛函數:當接收到一個新的鏈接時調用此函數
	virtual bool on_accept(aio_socket_stream* client)
	{
		// 建立異步客戶端流的回調對象並與該異步流進行綁定
		io_callback* callback = new io_callback(client);

		// 註冊異步流的讀回調過程
		client->add_read_callback(callback);

		// 註冊異步流的寫回調過程
		client->add_write_callback(callback);

		// 註冊異步流的關閉回調過程
		client->add_close_callback(callback);

		// 註冊異步流的超時回調過程
		client->add_timeout_callback(callback);

		// 寫歡迎信息
		if (var_cfg_send_banner)
			client->format("hello, you're welcome\r\n");

		// 從異步流讀一行數據
		client->gets(10, false);
		//client->read();
		return true;
	}

	// 基類虛函數:服務進程切換用戶身份前調用此函數
	virtual void proc_pre_jail()
	{
		format("proc_pre_jail\r\n");
		// 只有當程序啓動後才能得到異步引擎句柄
		handle_ = get_handle();
		handle_->keep_timer(true); // 容許定時器被重複觸發
		// 設置第一個定時任務,每隔1秒觸發一次,定時任務ID爲0
		handle_->set_timer(timer_test_, 1000000, 0);
	}

	// 基類虛函數:服務進程切換用戶身份後調用此函數
	virtual void proc_on_init()
	{
		format("proc init\r\n");
		// 設置第二個定時任務,每隔2秒觸發一次,定時任務ID爲1
		handle_->set_timer(timer_test_, 2000000, 1);
	}

	// 基類虛函數:服務進程退出前調用此函數
	virtual void proc_on_exit()
	{
		format("proc exit\r\n");
	}
private:
	timer_test* timer_test_;
	aio_handle* handle_;
};
//////////////////////////////////////////////////////////////////////////

int main(int argc, char* argv[])
{
	master_aio_test ma;  // master_aio 要求只能起一個實例

	// 設置配置參數表
	ma.set_cfg_int(var_conf_int_tab);
	ma.set_cfg_int64(NULL);
	ma.set_cfg_str(var_conf_str_tab);
	ma.set_cfg_bool(var_conf_bool_tab);

	// 開始運行

	if (argc >= 2 && strcmp(argv[1], "alone") == 0)
	{
		const char* addr = "127.0.0.1:8888";

		if (argc >= 3)
			addr = argv[2];

		format = (void (*)(const char*, ...)) printf;
		format("listen: %s now\r\n", addr);
		ma.run_alone(addr);  // 單獨運行方式
	}
	else
		ma.run_daemon(argc, argv);  // acl_master 控制模式運行
	return 0;
}

 

       該示例主要實現幾個功能:接收一行數據並回寫該數據、延遲迴寫所讀的數據、延遲讀下一行數據、設置定時器的定時任務。由於 master_aio 類是按非阻塞模式設計的(其實,該類主要對 acl 庫中的非阻塞框架庫用 C++ 進行了封裝),因此該例子能夠支持很是大的併發請求。能夠經過配置文件指定系統事件 API 選擇採用 select/poll/epoll 中的一種、規定進程的空閒退出時間、預先啓動子進程的數量等參數。該例子所在目錄:acl_cpp/samples/master_aio。

      在這個例子中,當服務端接收到客戶端鏈接後,非阻塞框架會經過虛函數  on_accept 回調子類處理過程,子類須要在這個虛函數中須要將一個處理 IO 過程的類實例與這個非阻塞鏈接流進行綁定(經過 add_xxx_callback 方式),其中處理 IO 過程的類的基類定義爲:aio_callback,子類須要實現其中的虛方法。

 

      3、配置文件及程序安裝

      打開 acl_cpp/samples/master_aio/aio_echo.cf 配置文件,就其中幾個配置參數說明一下:

 

## 由 acl_master 用來控制服務進程池的配置項
# 爲 no 表示啓用該進程服務,爲 yes 表示禁止該服務進程
master_disable = no

# 表示本服務器進程監聽 127.0.0.1 的 8888 端口
master_service = 127.0.0.1:8888

# 表示是 TCP 套接口服務類型
master_type = inet

# 表示該服務進程池的最大進程數爲 2
master_maxproc = 2

# 須要預先啓動的進程數,該值不該大於 master_maxproc
master_prefork = 2

# 進程程序名
master_command = aio_echo

# 進程日誌記錄文件,其中 {install_path} 須要用實際的安裝路徑代替
master_log = {install_path}/var/log/aio_echo.log

 

# 用 select 進行循環時的時間間隔
# 單位爲秒
aio_delay_sec = 1
# 單位爲微秒
aio_delay_usec = 500
# 採用事件循環的方式: select(default), poll, kernel(epoll/devpoll/kqueue)
aio_event_mode = select
# 是否將 socket 接收與IO功能分開: yes/no, 若是爲 yes 能夠大大提升 accept() 速度
aio_accept_alone = yes
# 線程池的最大線程數, 若是該值爲0則表示採用單線程非阻塞模式.
aio_max_threads = 0
# 每一個線程的空閒時間.
aio_thread_idle_limit = 60
# 容許訪問的客戶端IP地址範圍
aio_access_allow = 10.0.0.1:255.255.255.255, 127.0.0.1:127.0.0.1
# 當 acl_master 退出時,若是該值置1則該程序不等全部鏈接處理完畢便當即退出
aio_quick_abort = 1

 

      例如當 acl_master 服務器框架程序的安裝目錄爲:/opt/acl,則:

      /opt/acl/libexec: 該目錄存儲服務器程序(acl_master 程序也存放在該目錄下);

      /opt/acl/conf:該目錄存放 acl_master 程序配置文件 main.cf;

      /opt/acl/conf/service:該目錄存放服務子進程的程序配置文件,該路徑由 main.cf 文件指定;

      /opt/acl/var/log:該目錄存放日誌文件;

      /opt/acl/var/pid:該目錄存放進程號文件。

      該程序編譯經過後,須要把可執行程序放在 /opt/acl/libexec 目錄下,把配置文件放在 /opt/acl/conf/service 目錄下。

      在 /opt/acl/sh 下有啓動/中止 acl_master 服務進程的控制腳本;運行腳本:./start.sh,而後請用下面方法檢查服務是否已經啓動:

      ps -ef|grep acl_master # 查看服務器控制進程是否已經啓動

      netstat -nap|grep LISTEN|grep 5001 # 查看服務端口號是否已經被監聽

      固然,您也能夠查看 /opt/acl/var/log/acl_master 日誌文件,查看服務進程的啓動過程及監聽服務是否正常監聽。

 

      能夠命令行以下測試:telnet 127.0.0.1 8888

       原文地址

       acl_cpp 下載

       acl_cpp 的編譯與使用

       更多文章

       我的微博:http://weibo.com/zsxxsz

       QQ 羣:242722074

相關文章
相關標籤/搜索