在文章《使用 acl::master_threads 類編寫多進程多線程服務器程序》和《使用 acl::master_proc 類編寫多進程服務器程序》中分別講述瞭如何編寫 LINUX 平臺下阻塞式服務器程序的多線程和多進程方式。雖然這兩種模式均可以處理併發任務,而且效率也不低,可是畢竟線程和進程資源是操做系統的寶貴資源,若是要支持很是高的併發請求,則會由於系統限制而不能建立更多的進程或線程。你們常見的 webserver Nginx 就是以支持高併發而聞名,Nginx 自己就是非阻塞設計的典型應用,固然還有不少其它服務器程序也是非阻塞的:ircd、Squid、Varnish、Bind 等。html
由於非阻塞程序編寫有必定難度,因此如今有人在寫非阻塞程序時不得不轉向 Erlang、Nodejs 等一些腳本式語言,要想用 C/C++ 來實現非阻塞程序仍是有些難度:系統級 API 過於底層、容易出錯、難於調試。雖然也有一些 C++ 庫(象ACE)提供了非阻塞庫,但這些庫的入門門檻仍是挺高的。根據本人多年在非阻塞編程方面的經驗,總結出一套比較實用的 C/C++ 非阻塞函數庫,本文就講述若是使用 acl_cpp 庫中的 master_aio 類來編寫非阻塞服務器程序(該服務器程序能夠規定啓動最大進程的個數,其中每一個進程是一個獨立的非阻塞進程)。在文章《非阻塞網絡編程實例講解》中給出了一個非阻塞網絡程序的示例,您能夠參考一下。web
1、類接口定義編程
master_aio 是一個純虛類,其中定義的接口須要子類實現,子類實例必須只能建立一個實例,接口以下:服務器
/** * 純虛函數:當接收到一個客戶端鏈接時調用此函數 * @param stream {aio_socket_stream*} 新接收到的客戶端異步流對象 * @return {bool} 該函數若是返回 false 則通知服務器框架再也不接收 * 遠程客戶端鏈接,不然繼續接收客戶端鏈接 */ virtual bool on_accept(aio_socket_stream* stream) = 0;
master_aio 提供了四個外部方法,以下:網絡
/** * 開始運行,調用該函數是指該服務進程是在 acl_master 服務框架 * 控制之下運行,通常用於生產機狀態 * @param argc {int} 從 main 中傳遞的第一個參數,表示參數個數 * @param argv {char**} 從 main 中傳遞的第二個參數 */ void run_daemon(int argc, char** argv); /** * 在單獨運行時的處理函數,用戶能夠調用此函數進行一些必要的調試工做 * @param addr {const char*} 服務監聽地址 * @param conf {const char*} 配置文件全路徑 * @param ht {aio_handle_type} 事件引擎的類型 * @return {bool} 監聽是否成功 */ bool run_alone(const char* addr, const char* conf = NULL, aio_handle_type ht = ENGINE_SELECT); /** * 得到異步IO的事件引擎句柄,經過此句柄,用戶能夠設置定時器等功能 * @return {aio_handle*} */ aio_handle* get_handle() const; /** * 在 run_alone 模式下,通知服務器框架關閉引擎,退出程序 */ void stop();
從上面兩個函數,能夠看出 master_aio 類當在生產環境下(由 acl_master 進程統一控制調度),用戶須要調用 run_daemon 函數;若是用戶在開發過程當中須要手工進行調試,則能夠調用 run_alone 函數。多線程
master_aio 的基類 master_base 的幾個虛接口以下:併發
/** * 當進程切換用戶身份前調用的回調函數,能夠在此函數中作一些 * 用戶身份爲 root 的權限操做 */ virtual void proc_pre_jail() {} /** * 當進程切換用戶身份後調用的回調函數,此函數被調用時,進程 * 的權限爲普通受限級別 */ virtual void proc_on_init() {} /** * 當進程退出前調用的回調函數 */ virtual void proc_on_exit() {} // 在 run_alone 狀態下運行前,調用此函數初始化一些配置
基類的這幾個虛函數用戶能夠根據須要調用。框架
另外,基類 master_base 還提供了幾個用來讀配置選項的函數:異步
/** * 設置 bool 類型的配置項 * @param table {master_bool_tbl*} */ void set_cfg_bool(master_bool_tbl* table); /** * 設置 int 類型的配置項 * @param table {master_int_tbl*} */ void set_cfg_int(master_int_tbl* table); /** * 設置 int64 類型的配置項 * @param table {master_int64_tbl*} */ void set_cfg_int64(master_int64_tbl* table); /** * 設置 字符串 類型的配置項 * @param table {master_str_tbl*} */ void set_cfg_str(master_str_tbl* table);
2、示例源程序socket
// master_aio.cpp : 定義控制檯應用程序的入口點。 // #include "stdafx.h" #include "lib_acl.hpp" static char *var_cfg_debug_msg; static acl::master_str_tbl var_conf_str_tab[] = { { "debug_msg", "test_msg", &var_cfg_debug_msg }, { 0, 0, 0 } }; static int var_cfg_debug_enable; static int var_cfg_keep_alive; static int var_cfg_send_banner; static acl::master_bool_tbl var_conf_bool_tab[] = { { "debug_enable", 1, &var_cfg_debug_enable }, { "keep_alive", 1, &var_cfg_keep_alive }, { "send_banner", 1, &var_cfg_send_banner }, { 0, 0, 0 } }; static int var_cfg_io_timeout; static acl::master_int_tbl var_conf_int_tab[] = { { "io_timeout", 120, &var_cfg_io_timeout, 0, 0 }, { 0, 0 , 0 , 0, 0 } }; static void (*format)(const char*, ...) = acl::log::msg1; using namespace acl; ////////////////////////////////////////////////////////////////////////// /** * 延遲讀回調處理類 */ class timer_reader: public aio_timer_reader { public: timer_reader(int delay) { delay_ = delay; format("timer_reader init, delay: %d\r\n", delay); } ~timer_reader() { } // aio_timer_reader 的子類必須重載 destroy 方法 void destroy() { format("timer_reader delete, delay: %d\r\n", delay_); delete this; } // 重載基類回調方法 virtual void timer_callback(unsigned int id) { format("timer_reader(%u): timer_callback, delay: %d\r\n", id, delay_); // 調用基類的處理過程 aio_timer_reader::timer_callback(id); } private: int delay_; }; /** * 延遲寫回調處理類 */ class timer_writer: public aio_timer_writer { public: timer_writer(int delay) { delay_ = delay; format("timer_writer init, delay: %d\r\n", delay); } ~timer_writer() { } // aio_timer_reader 的子類必須重載 destroy 方法 void destroy() { format("timer_writer delete, delay: %d\r\n", delay_); delete this; } // 重載基類回調方法 virtual void timer_callback(unsigned int id) { format("timer_writer(%u): timer_callback, delay: %u\r\n", id, delay_); // 調用基類的處理過程 aio_timer_writer::timer_callback(id); } private: int delay_; }; class timer_test : public aio_timer_callback { public: timer_test() {} ~timer_test() {} protected: // 基類純虛函數 virtual void timer_callback(unsigned int id) { format("id: %u\r\n", id); } virtual void destroy(void) { delete this; format("timer delete now\r\n"); } private: }; /** * 異步客戶端流的回調類的子類 */ class io_callback : public aio_callback { public: io_callback(aio_socket_stream* client) : client_(client) , i_(0) { } ~io_callback() { format("delete io_callback now ...\r\n"); } /** * 實現父類中的虛函數,客戶端流的讀成功回調過程 * @param data {char*} 讀到的數據地址 * @param len {int} 讀到的數據長度 * @return {bool} 返回 true 表示繼續,不然但願關閉該異步流 */ virtual bool read_callback(char* data, int len) { if (++i_ < 10) format(">>gets(i: %d): %s\r\n", i_, data); // 若是遠程客戶端但願退出,則關閉之 if (strncmp(data, "quit", 4) == 0) { client_->format("Bye!\r\n"); client_->close(); } // 若是遠程客戶端但願服務端也關閉,則停止異步事件過程 else if (strncmp(data, "stop", 4) == 0) { client_->format("Stop now!\r\n"); client_->close(); // 關閉遠程異步流 // 通知異步引擎關閉循環過程 client_->get_handle().stop(); } // 向遠程客戶端回寫收到的數據 int delay = 0; if (strncmp(data, "write_delay", strlen("write_delay")) == 0) { // 延遲寫過程 const char* ptr = data + strlen("write_delay"); delay = atoi(ptr); if (delay > 0) { format(">> write delay %d second ...\r\n", delay); timer_writer* timer = new timer_writer(delay); client_->write(data, len, delay * 1000000, timer); client_->gets(10, false); return true; } } else if (strncmp(data, "read_delay", strlen("read_delay")) == 0) { // 延遲讀過程 const char* ptr = data + strlen("read_delay"); delay = atoi(ptr); if (delay > 0) { client_->write(data, len); format(">> read delay %d second ...\r\n", delay); timer_reader* timer = new timer_reader(delay); client_->gets(10, false, delay * 1000000, timer); return true; } } client_->write(data, len); //client_->gets(10, false); return true; } /** * 實現父類中的虛函數,客戶端流的寫成功回調過程 * @return {bool} 返回 true 表示繼續,不然但願關閉該異步流 */ virtual bool write_callback() { return true; } /** * 實現父類中的虛函數,客戶端流的關閉回調過程 */ virtual void close_callback() { // 必須在此處刪除該動態分配的回調類對象以防止內存泄露 delete this; } /** * 實現父類中的虛函數,客戶端流的超時回調過程 * @return {bool} 返回 true 表示繼續,不然但願關閉該異步流 */ virtual bool timeout_callback() { format("Timeout ...\r\n"); return true; } private: aio_socket_stream* client_; int i_; }; ////////////////////////////////////////////////////////////////////////// class master_aio_test : public master_aio { public: master_aio_test() { timer_test_ = new timer_test(); } ~master_aio_test() { handle_->keep_timer(false); } protected: // 基類純虛函數:當接收到一個新的鏈接時調用此函數 virtual bool on_accept(aio_socket_stream* client) { // 建立異步客戶端流的回調對象並與該異步流進行綁定 io_callback* callback = new io_callback(client); // 註冊異步流的讀回調過程 client->add_read_callback(callback); // 註冊異步流的寫回調過程 client->add_write_callback(callback); // 註冊異步流的關閉回調過程 client->add_close_callback(callback); // 註冊異步流的超時回調過程 client->add_timeout_callback(callback); // 寫歡迎信息 if (var_cfg_send_banner) client->format("hello, you're welcome\r\n"); // 從異步流讀一行數據 client->gets(10, false); //client->read(); return true; } // 基類虛函數:服務進程切換用戶身份前調用此函數 virtual void proc_pre_jail() { format("proc_pre_jail\r\n"); // 只有當程序啓動後才能得到異步引擎句柄 handle_ = get_handle(); handle_->keep_timer(true); // 容許定時器被重複觸發 // 設置第一個定時任務,每隔1秒觸發一次,定時任務ID爲0 handle_->set_timer(timer_test_, 1000000, 0); } // 基類虛函數:服務進程切換用戶身份後調用此函數 virtual void proc_on_init() { format("proc init\r\n"); // 設置第二個定時任務,每隔2秒觸發一次,定時任務ID爲1 handle_->set_timer(timer_test_, 2000000, 1); } // 基類虛函數:服務進程退出前調用此函數 virtual void proc_on_exit() { format("proc exit\r\n"); } private: timer_test* timer_test_; aio_handle* handle_; }; ////////////////////////////////////////////////////////////////////////// int main(int argc, char* argv[]) { master_aio_test ma; // master_aio 要求只能起一個實例 // 設置配置參數表 ma.set_cfg_int(var_conf_int_tab); ma.set_cfg_int64(NULL); ma.set_cfg_str(var_conf_str_tab); ma.set_cfg_bool(var_conf_bool_tab); // 開始運行 if (argc >= 2 && strcmp(argv[1], "alone") == 0) { const char* addr = "127.0.0.1:8888"; if (argc >= 3) addr = argv[2]; format = (void (*)(const char*, ...)) printf; format("listen: %s now\r\n", addr); ma.run_alone(addr); // 單獨運行方式 } else ma.run_daemon(argc, argv); // acl_master 控制模式運行 return 0; }
該示例主要實現幾個功能:接收一行數據並回寫該數據、延遲迴寫所讀的數據、延遲讀下一行數據、設置定時器的定時任務。由於 master_aio 類是按非阻塞模式設計的(其實,該類主要對 acl 庫中的非阻塞框架庫用 C++ 進行了封裝),因此該例子能夠支持很是大的併發請求。能夠經過配置文件指定系統事件 API 選擇採用 select/poll/epoll 中的一種、規定進程的空閒退出時間、預先啓動子進程的數量等參數。該例子所在目錄:acl_cpp/samples/master_aio。
在這個例子中,當服務端接收到客戶端鏈接後,非阻塞框架會經過虛函數 on_accept 回調子類處理過程,子類須要在這個虛函數中須要將一個處理 IO 過程的類實例與這個非阻塞鏈接流進行綁定(經過 add_xxx_callback 方式),其中處理 IO 過程的類的基類定義爲:aio_callback,子類須要實現其中的虛方法。
3、配置文件及程序安裝
打開 acl_cpp/samples/master_aio/aio_echo.cf 配置文件,就其中幾個配置參數說明一下:
## 由 acl_master 用來控制服務進程池的配置項 # 爲 no 表示啓用該進程服務,爲 yes 表示禁止該服務進程 master_disable = no # 表示本服務器進程監聽 127.0.0.1 的 8888 端口 master_service = 127.0.0.1:8888 # 表示是 TCP 套接口服務類型 master_type = inet # 表示該服務進程池的最大進程數爲 2 master_maxproc = 2 # 須要預先啓動的進程數,該值不該大於 master_maxproc master_prefork = 2 # 進程程序名 master_command = aio_echo # 進程日誌記錄文件,其中 {install_path} 須要用實際的安裝路徑代替 master_log = {install_path}/var/log/aio_echo.log
# 用 select 進行循環時的時間間隔 # 單位爲秒 aio_delay_sec = 1 # 單位爲微秒 aio_delay_usec = 500 # 採用事件循環的方式: select(default), poll, kernel(epoll/devpoll/kqueue) aio_event_mode = select # 是否將 socket 接收與IO功能分開: yes/no, 若是爲 yes 能夠大大提升 accept() 速度 aio_accept_alone = yes # 線程池的最大線程數, 若是該值爲0則表示採用單線程非阻塞模式. aio_max_threads = 0 # 每一個線程的空閒時間. aio_thread_idle_limit = 60 # 容許訪問的客戶端IP地址範圍 aio_access_allow = 10.0.0.1:255.255.255.255, 127.0.0.1:127.0.0.1 # 當 acl_master 退出時,若是該值置1則該程序不等全部鏈接處理完畢便當即退出 aio_quick_abort = 1
例如當 acl_master 服務器框架程序的安裝目錄爲:/opt/acl,則:
/opt/acl/libexec: 該目錄存儲服務器程序(acl_master 程序也存放在該目錄下);
/opt/acl/conf:該目錄存放 acl_master 程序配置文件 main.cf;
/opt/acl/conf/service:該目錄存放服務子進程的程序配置文件,該路徑由 main.cf 文件指定;
/opt/acl/var/log:該目錄存放日誌文件;
/opt/acl/var/pid:該目錄存放進程號文件。
該程序編譯經過後,須要把可執行程序放在 /opt/acl/libexec 目錄下,把配置文件放在 /opt/acl/conf/service 目錄下。
在 /opt/acl/sh 下有啓動/中止 acl_master 服務進程的控制腳本;運行腳本:./start.sh,而後請用下面方法檢查服務是否已經啓動:
ps -ef|grep acl_master # 查看服務器控制進程是否已經啓動
netstat -nap|grep LISTEN|grep 5001 # 查看服務端口號是否已經被監聽
固然,您也能夠查看 /opt/acl/var/log/acl_master 日誌文件,查看服務進程的啓動過程及監聽服務是否正常監聽。
能夠命令行以下測試:telnet 127.0.0.1 8888
QQ 羣:242722074