非阻塞編程主要解決了網絡通信中高併發的問題,採用非阻塞方式,服務器沒必要爲每一個鏈接啓動單獨的進程或線程,從而大大地減小了系統資源的浪費;可是現實網絡應用中,阻塞應用又是不可避免的,如咱們對數據庫編程時使用的數據操做的客戶端庫自己就是阻塞的。所以,單純的非阻塞模式或阻塞模式均不能很好地勝任互聯應用,若是可以將一些必要的阻塞過程融合進非阻塞過程當中將會是一個現實的需求。本文主要介紹瞭如何保證在使用 acl_cpp 的非阻塞框架的同時,能夠把阻塞的過程與非阻塞過程進行整合。關於非阻塞編程,能夠參考 acl_cpp開發--非阻塞網絡編程 中的章節。ios
本文討論的內容是創建在 acl_cpp 非阻塞模塊的IPC通訊機制 內容的基礎之上,有兩個基礎類:ipc_service 類用來粘合阻塞與非阻塞過程,ipc_request 作爲阻塞調用過程的基礎類,提供了須要子線程中進行阻塞式過程的虛方法。數據庫
ipc_service 類的繼承關係圖以下:編程
能夠看出,ipc_service 是從 ipc_server 類繼承而來,在 ipc_service 中定義了受保護的成員方法: request(ipc_request*) ,它將被子類對象調用向子線程或線程池發送任務請求;同時在 ipc_service 中還專門針對基於 win32 消息窗口定義了虛方法:win32_proc,該虛方法被子類繼承後用於主窗口過程接收來自於子線程的結果處理消息;另外,在該圖中還專門封裝了三個實際應用的子類 http_service、db_service、dns_service。在 ipc_service 的構造函數能夠指定任務線程池中最大線程數,同時還能夠指定是否採用基於 win32 窗口消息的 IPC 通訊機制。服務器
ipc_request 類的繼承關係圖以下:網絡
ipc_request 的類對象在獨立的子線程中運行,運行結果經過 ipc_client 通道給非阻塞的主線程發送消息(對於win2的窗口過程,能夠發送窗口消息,此時 ipc_service 的類對象將會經過 win32_proc 虛方法接收消息)。另外,ipc_request 的兩個重載方法: run(ipc_client*) 和 run(HWND) 都是在子線程中運行的。併發
程序執行的流程爲:app
1)在主線程中建立 ipc_service 對象實例,並監聽某一端口:ipc_service::open(aio_handle*, const char* addr="127.0.0.1:0") ---> 主線程進行異步事件循環過程:while (true) { aio_handle::check(); }框架
2)在主線程中建立 ipc_request 對象實例,並建立須要在子線程中處理的阻塞式任務 ---> 在主線程 ipc_service 的對象中調用 ipc_service 的異步請求方法 request(ipc_request*) 將阻塞請求任務 ipc_request 傳遞給子線程dom
3)在子線程中調用 ipc_request::run 阻塞過程處理用戶的請求異步
4)在子線程中調用 ipc_client::send_message 方法將結果數據以 ipc 消息的方式通知主線程
5)在主線程中接收到來自於子線程的結果處理消息後,將結果傳遞給用戶過程
如下以 dns_service 爲例,詳細講解 ipc_service 與 ipc_request 在主線程和子線程中的行爲過程:
1)主線程中 dns_service 的主要代碼以下:
dns_service::dns_service(int nthread /* = 1 */, bool win32_gui /* = false */) : ipc_service(nthread, win32_gui) { } dns_service::~dns_service() { } // 主線程的 dns_service 對象接收到子線程消息的 ipc 鏈接請求時的回調函數 void dns_service::on_accept(aio_socket_stream* client) { // 建立 ipc_client 消息通道,同時用異步鏈接流做爲通信對象 ipc_client* ipc = NEW dns_ipc(this); ipc->open(client); // 添加消息回調對象 ipc->append_message(IPC_RES); // 異步等待來自於子線程的消息 ipc->wait(); } #ifdef WIN32 // 當接收 WIN32 窗口消息時的回調過程 void dns_service::win32_proc(HWND hWnd, UINT nMsg, WPARAM wParam, LPARAM lParam) { if (nMsg != IPC_RES + WM_USER) { logger_error("invalid nMsg(%d)", nMsg); return; } else if (lParam == 0) { logger_error("lParam invalid"); return; } DNS_IPC_DATA* dat = (DNS_IPC_DATA*) lParam; dns_res* res = dat->res; on_result(*res); delete res; // 在採用 WIN32 消息時該對象空間是動態分配的,因此須要釋放 acl_myfree(dat); } #endif // 用戶調用此函數查詢某個域名,須要查詢的域名及域名查詢結果 // 均在 dns_result_callback 對象中記錄 void dns_service::lookup(dns_result_callback* callback) { // 若是發現有針對相同域名的查詢過程,只須要將本查詢過程 // 下相同域名的查詢過程合併,加入原來的查詢過程的列表中, // 減小針對同一域名的查詢次數 std::list<dns_result_callback*>::iterator it; const char* domain = callback->get_domain().c_str(); for (it= callbacks_.begin(); it != callbacks_.end(); it++) { if ((*it)->get_domain() == domain) { callbacks_.push_back(callback); return; } } callbacks_.push_back(callback); ipc_request* req = NEW dns_request(domain); // 調用基類 ipc_service 請求過程 request(req); } // 查詢結果的回調函數 void dns_service::on_result(const dns_res& res) { std::list<dns_result_callback*>::iterator it, next; it= callbacks_.begin(); for (; it != callbacks_.end();) { next = it; next++; if ((*it)->get_domain() == res.domain_.c_str()) { // 通知請求對象的解析結果 (*it)->on_result((*it)->get_domain(), res); (*it)->destroy(); // 調用請求對象的銷燬過程 callbacks_.erase(it); it = next; } else it++; } }
2)在 dns_service::on_accept 函數中建立 ipc_client 通道的子類定義以下:
class dns_ipc : public ipc_client { public: dns_ipc(dns_service* server) : server_(server) { } ~dns_ipc() { } // 在主線程接收到來自於子線程的消息時的回調函數 // 由於在 dns_service::on_accept 中將該類對象 // 與相應的消息號進行了綁定 virtual void on_message(int nMsg acl_unused, void* data, int dlen acl_unused) { if (nMsg != IPC_RES) { logger_error("invalid nMsg(%d)", nMsg); this->close(); return; } // 轉換子線程傳來的數據內容 DNS_IPC_DATA* dat = (DNS_IPC_DATA*) data; dns_res* res = dat->res; // 調用主線程中的結果接收過程 server_->on_result(*res); delete res; } protected: virtual void on_close() { // 由於該類對象在 dns_service::on_accept 中是動態建立的, // 所須要當 ipc_client 通道關閉時須要自行釋放所佔內存 delete this; } private: dns_service* server_; };
3)dns_request 類定義以下(該類對象是在主線程中建立的,但其中的阻塞方法:run 倒是在子線程中運行的):
// 由子線程傳遞給主線程的數據類型 struct DNS_IPC_DATA { dns_res* res; }; // 由子線程傳遞給主線程的消息號 typedef enum { IPC_RES }; /////////////////////////////////////////////////////////////////////// class dns_request : public ipc_request { public: dns_request(const char* domain) : domain_(domain) { } ~dns_request() { } // 當 dns_request 類對象在 dns_service::lookup 方法中經過 // request(ipc_request) 過程傳遞給子線程後,子線程便會鏈接 // 主線程中 dns_service 監聽的消息服務器地址,鏈接成功後調用 // dns_request::run 虛方法,同時將 ipc_client 通道傳入 virtual void run(ipc_client* ipc) { // 阻塞式查詢域名 ACL_DNS_DB* db = acl_gethostbyname(domain_.c_str(), NULL); // 將查詢結果放在自定義的結構中 data_.res = NEW dns_res(domain_.c_str()); if (db != NULL) { ACL_ITER iter; acl_foreach(iter, db) { ACL_HOSTNAME* hn = (ACL_HOSTNAME*) iter.data; data_.res->ips_.push_back(hn->ip); } acl_netdb_free(db); } // 向主線程發送結果 ipc->send_message(IPC_RES, &data_, sizeof(data_)); // 銷燬本類對象,由於其是動態分配的 delete this; } #ifdef WIN32 // 基類虛接口,使子線程能夠在執行完任務後向主線程發送 WIN32 窗口消息 virtual void run(HWND hWnd) { ACL_DNS_DB* db = acl_gethostbyname(domain_.c_str(), NULL); DNS_IPC_DATA* data = (DNS_IPC_DATA*) acl_mymalloc(sizeof(DNS_IPC_DATA)); data->res = NEW dns_res(domain_.c_str()); if (db != NULL) { ACL_ITER iter; acl_foreach(iter, db) { ACL_HOSTNAME* hn = (ACL_HOSTNAME*) iter.data; data->res->ips_.push_back(hn->ip); } acl_netdb_free(db); } // 向主線程發送結果 ::PostMessage(hWnd, IPC_RES + WM_USER, 0, (LPARAM) data); // 銷燬本類對象,由於其是動態分配的 delete this; } #endif private: string domain_; DNS_IPC_DATA data_; };
經過以上三個類的實現便完成了主線程的非阻塞過程與子線程的阻塞過程的結合。下面是一個具體的使用如上 dns_service 類的例子:
#ifdef WIN32 #include "acl/lib_acl.h" #else #include "lib_acl.h" #include <getopt.h> #endif #include <iostream> #include "aio_handle.hpp" #include "dns_service.hpp" using namespace acl; static void usage(const char* procname) { printf("usage: %s -h[help] -t[use thread]\n", procname); } static int __ncount = 0; class dns_result : public dns_result_callback { public: dns_result(const char* domain) : dns_result_callback(domain) { } ~dns_result() { } virtual void destroy() { delete this; __ncount--; } virtual void on_result(const char* domain, const dns_res& res) { std::cout << "result: domain: " << domain; if (res.ips_.size() == 0) { std::cout << ": null" << std::endl; return; } std::cout << std::endl; std::list<string>::const_iterator cit = res.ips_.begin(); for (; cit != res.ips_.end(); cit++) std::cout << "\t" << (*cit).c_str(); std::cout << std::endl; } }; int main(int argc, char* argv[]) { int ch, nthreads = 2; while ((ch = getopt(argc, argv, "ht:")) > 0) { switch (ch) { case 'h': usage(argv[0]); return (0); case 't': nthreads = atoi(optarg); break; default: break; } } acl_init(); aio_handle handle; const char* domain = "www.baidu.com"; dns_service* server = new dns_service(nthreads); // 使消息服務器監聽 127.0.0.1 的地址 if (server->open(&handle) == false) { delete server; std::cout << "open server error!" << std::endl; getchar(); return (1); } // 建立查詢結果接收對象,並進行查詢 dns_result* result = new dns_result(domain); server->lookup(result); __ncount++; result = new dns_result(domain); server->lookup(result); __ncount++; result = new dns_result(domain); server->lookup(result); __ncount++; domain = "www.sina.com"; result = new dns_result(domain); server->lookup(result); __ncount++; domain = "www.51iker.com"; result = new dns_result(domain); server->lookup(result); __ncount++; domain = "www.hexun.com"; result = new dns_result(domain); server->lookup(result); __ncount++; domain = "www.com.dummy"; result = new dns_result(domain); server->lookup(result); __ncount++; result = new dns_result(domain); server->lookup(result); __ncount++; result = new dns_result(domain); server->lookup(result); __ncount++; // 異步消息循環過程 while (true) { if (handle.check() == false) { std::cout << "stop now!" << std::endl; break; } if (__ncount == 0) break; } // 銷燬 dns_service 動態對象 delete server; // 作最後的清理工做,以釋放延遲釋放的鏈接對象 handle.check(); std::cout << "server stopped!" << std::endl; getchar(); return (0); }
示例代碼:samples/aio_dns
acl_cpp 下載:http://sourceforge.net/projects/acl/
原文地址:http://zsxxsz.iteye.com/blog/1482208
更多文章:http://zsxxsz.iteye.com/
QQ 羣:242722074