acl_cpp 的 rpc 相關類整合阻塞及非阻塞過程

1、概述html

      非阻塞網絡編程無疑成了高併發、高性能編程的代名詞,但現實應用編程中並非每種應用都須要採用非阻塞編程模式,由於這將大大增長編程的複雜性、開發週期以及出錯率,因此咱們寫的絕大部分網絡程序程序都是阻塞的,通常是一個進程一個網絡鏈接或一個線程一個網絡鏈接。即然非阻塞模式能夠實現高併發網絡鏈接,阻塞模式能夠實現複雜的業務邏輯,那是否有辦法將兩者結合起來呢?答案是確定的,其中在 acl_cpp 庫中,ipc 目錄下的模塊就是爲了知足這種需求而設計的。git

      在文章《acl_cpp 非阻塞模塊的IPC通訊機制》和《ipc_service 類:阻塞與非阻塞混合編程》中分別以兩種方式介紹了 如何經過 acl_cpp IPC 通訊庫來達到阻塞與非阻塞兩種編程模式的結合目的,但這篇文章講的內容仍是有點麻煩,用戶用也很難上手。因而本人在那兩篇文章所講的庫的基礎上進一步抽象,實現了另一個阻塞與非阻塞結合的庫。程序員

 

2、接口設計github

      在 rpc.hpp 頭文件中有兩個類:rpc_service 和 rpc_request,其中 rpc_request 是一個純虛類,用戶須要繼承此類並實現該類中規定的純虛接口,從而實現本身的阻塞操做功能;rpc_service 是阻塞與非阻塞結合的粘合類,經過將 rpc_request 子類實例傳遞給 rpc_service 類實例的 rpc_fork 方法,實現將阻塞請求過程交給子線程處理的目的。數據庫

     一、rpc_service 類編程

     1.一、構造函數:服務器

 

/**
	 * 構造函數
	 * @param nthread {int} 若是該值 > 1 則內部自動採用線程池,不然
	 *  則是一個請求一個線程
	 * @param win32_gui {bool} 是不是窗口類的消息,若是是,則內部的
	 *  通信模式自動設置爲基於 WIN32 的消息,不然依然採用通用的套接
	 *  口通信方式
	 */
	rpc_service(int nthread, bool win32_gui = false)

 

      從參數 win32_gui 能夠看出,acl_cpp 的阻塞/非阻塞結合模式不只能夠用在一般的網絡編程中,同時還能夠用在阻塞過程與 WINDOWS 界面消息相結合的方面,這無非是常常用 MFC 進行界面編程的福音(例如,用戶 VC 寫了一個界面程序---固然這個界面窗口是基於 WIN32 消息的,但若是想進行一些數據庫操做或文件下載操做,對用戶而言阻塞式方法是很是容易實現這兩類需求的,但 WIN32 的界面過程不能堵在任何一個數據庫操做或文件下載過程,原來 VC 程序員一般的作法也是建立單獨的線程進行阻塞操做,而後經過給主窗口傳遞消息將操做結果通知至主線程,幸運的是 acl_cpp 的 rpc 相關類可使這一過程更爲方便快捷;再如,你寫的一個網絡服務器程序的主線程是非阻塞的,但其中你不得不調用別人提供的庫以實現用戶身份驗證的功能,同時這個用於用戶認證的庫又偏偏是阻塞的---通常也是如此,當然,你也許能夠費很周折實現這一過程,一樣,acl_cpp 的 rpc 相關類能夠幫你解決這類問題)。網絡

      1.二、將阻塞處理過程的對象交由子線程處理:併發

 

/**
	 * 主線程中運行:將請求任務放入子線程池的任務請求隊列中,當線程池
	 * 中的一個子線程接收到該任務後便調用 rpc_request::rpc_run 方法調
	 * 用子類的方法,當任務處理完畢後給主線程發消息,在主線程中再調用
	 * rpc_request::rpc_callback
	 * @param req {rpc_request*} rpc_request 子類實例,非空
	 */
	void rpc_fork(rpc_request* req);

 

       經過此接口,能夠將阻塞請求過程交給子線程處理,子線程處理完後再通知主線程。異步

 

      二、rpc_request 類

      在類 rpc_request 中有三個虛接口,用戶子類必須實現其中的兩個純虛接口:rpc_run 和 rpc_onover,同時用戶能夠根據須要實現另外一非純虛接口:rpc_wakeup。是當用戶調完 rpc_service::rpc_fork 且子線程接收到此請求任務後 rpc_request::rpc_run 方法會被調用(必定切記:rpc_fork 是在主線程中被調用的,而 rpc_run 是在子線程中被調用的);當 rpc_request::rpc_run 函數返回後,rpc_reuqest::rpc_onover 會在主線程中被調用以表示子線程已經處理完畢(一樣須要嚴重注意:rpc_request::rpc_onover 方法又回到主線程中被調用),這樣,經過這兩個過程就實現了將阻塞過程放在子線程中處理,主線程的非阻塞過程(非阻塞網絡事件或非阻塞的 WIN32 消息過程)異步地等待子線程完成任務。

 

/**
	 * 在子線程中被調用,子類必須實現此接口,用於處理具體任務
	 */
	virtual void rpc_run(void) = 0;

	/**
	 * 在主線程中被調用,子類必須實現此接口,
	 * 當子線程處理完請求任務後該接口將被調用,因此該類對象只能
	 * 是當本接口調用後才能被釋放,禁止在調用本接口前釋放本類對象
	 */
	virtual void rpc_onover(void) = 0;

 

       另外,在 rpc_request 類中還有一個非純虛方法:rpc_wakeup,這是作什麼用的呢?能夠假設這種應用場景,在子線程中調用 rpc_request::rpc_run 方法內部過程當中,用戶若是須要通知主線程一些中間狀態(好比,文件下載的進度)該怎麼辦?那就在 rpc_run 方法內先調用 rpc_request::rpc_signal 通知主線程子線程處理的中間狀態,則在主線程中用戶實現的 rpc_request::rpc_wakeup 虛接口就會被調用。下面是 rpc_signal 和 rpc_wakeup 的接口說明:

 

/**
	 * 在子線程中被調用,內部自動支持套接口或 WIN32 窗口消息
	 * 子類實例的 rpc_run 方法中能夠屢次調用此方法向主線程的
	 * 本類實例發送消息,主線程中調用本對象 rpc_wakeup 方法
	 * @param ctx {void*} 傳遞的參數指針,通常應該是動態地址
	 *  比較好,這樣能夠避免同一個參數被重複覆蓋的問題
	 */
	void rpc_signal(void* ctx);

	/**
	 * 虛接口:當子線程調用本對象的 rpc_signal 時,在主線程中會
	 * 調用本接口,通知在任務未完成前(即調用 rpc_onover 前)收到
	 * 子線程運行的中間狀態信息;內部自動支持套接口或 WIN32 窗口
	 * 消息;應用場景,例如,對於 HTTP 下載應用,在子線程中能夠
	 * 一邊下載,一邊向主線程發送(調用 rpc_signal 方法)下載進程,
	 * 則主線程會調用本類實例的此方法來處理此消息
	 */
	virtual void rpc_wakeup(void* ctx) { (void) ctx; }

 

       緊接着這個應用場景,假設在子線程調用 rpc_run 的內部經過 rpc_signal 通知主線程的中間狀態後,但願主線程能收到此通知消息而且但願獲得主線程下一步但願執行的指令纔會進一步繼續執行。因而便有了 rpc_request::cond_wait 和 rpc_request::cond_signal 兩個方法的產生,即子線程經過 cond_wait 阻塞地等待主線程的下一步操做指令,主線程則調用 cond_signal 通知子線程下步指令,下面是這兩個方法的說明:

 

/**
	 * 當子線程調用 rpc_signal 給主線程後,調用本方法能夠等待
	 * 主線程發來下一步指令
	 * @param timeout {int} 等待超時時間(毫秒),當該值爲 0 時
	 *  則採用非阻塞等待模式,當該值爲 < 0 時,則採用徹底阻塞
	 *  等待模式(即一直等到主線程發送 cond_signal 通知),當該
	 *  值 > 0 時,則等待的最大超時時間爲 timeout 毫秒
	 * @return {bool} 返回 true 表示收到主線程發來的通知信號,
	 *  不然,須要調用 cond_wait_timeout 判斷是不是超時引發的
	 */
	bool cond_wait(int timeout = -1);

	/**
	 * 在子線程中被調用,內部自動支持套接口或 WIN32 窗口消息
	 * 子類實例的 rpc_run 方法中能夠屢次調用此方法向主線程的
	 * 本類實例發送消息,主線程中調用本對象 rpc_wakeup 方法
	 * @param ctx {void*} 傳遞的參數指針,通常應該是動態地址
	 *  比較好,這樣能夠避免同一個參數被重複覆蓋的問題
	 */
	void rpc_signal(void* ctx);

 

 

3、示例

      若是您能大致明白上面有關 rpc_service 和 rpc_request 類的功能說明,相信下面的例子您也必定能看明白:

 

// rpc_download.cpp : 定義控制檯應用程序的入口點。
//

#include "stdafx.h"
#include <assert.h>
#include "lib_acl.hpp"

using namespace acl;

typedef enum
{
	CTX_T_CONTENT_LENGTH,
	CTX_T_PARTIAL_LENGTH,
	CTX_T_END
} ctx_t;

struct DOWN_CTX 
{
	ctx_t type;
	int length;
};

static int __download_count = 0;

class http_download : public rpc_request
{
public:
	http_download(aio_handle& handle, const char* addr, const char* url)
		: handle_(handle)
		, addr_(addr)
		, url_(url)
		, error_(false)
		, total_read_(0)
		, content_length_(0)
	{}
	~http_download() {}
protected:

	// 子線程處理函數
	void rpc_run()
	{
		http_request req(addr_);  // HTTP 請求對象

		// 設置 HTTP 請求頭信息
		req.request_header().set_url(url_.c_str())
			.set_content_type("text/html")
			.set_host(addr_.c_str())
			.set_method(HTTP_METHOD_GET);

		// 測試用,顯示 HTTP 請求頭信息內容
		string header;
		req.request_header().build_request(header);
		printf("request: %s\r\n", header.c_str());

		// 發送 HTTP 請求數據
		if (req.request(NULL, 0) == false)
		{
			printf("send request error\r\n");
			error_ = false;
			return;
		}

		// 得到 HTTP 請求的鏈接對象
		http_client* conn = req.get_client();
		assert(conn);
		DOWN_CTX* ctx = new DOWN_CTX;
		ctx->type = CTX_T_CONTENT_LENGTH;

		// 得到 HTTP 響應數據的數據體長度
		ctx->length = (int) conn->body_length();
		content_length_ = ctx->length;

		// 通知主線程
		rpc_signal(ctx);

		char buf[8192];
		while (true)
		{
			// 讀 HTTP 響應數據體
			int ret = req.get_body(buf, sizeof(buf));
			if (ret <= 0)
			{
				ctx = new DOWN_CTX;
				ctx->type = CTX_T_END;
				ctx->length = ret;
				// 通知主線程
				rpc_signal(ctx);
				break;
			}
			ctx = new DOWN_CTX;
			ctx->type = CTX_T_PARTIAL_LENGTH;
			ctx->length = ret;
			// 通知主線程
			rpc_signal(ctx);
		}
	}

	// 主線程處理過程,收到子線程任務完成的消息
	void rpc_onover()
	{
		printf("%s: read over now, total read: %d, content-length: %d\r\n",
			addr_.c_str(), total_read_, content_length_);

		// 當 HTTP 響應都完成時,通知主線程中止事件循環過程
		__download_count--;
		if (__download_count == 0)
			handle_.stop();
	}

	// 主線程處理過程,收到子線程的通知消息
	void rpc_wakeup(void* ctx)
	{
		DOWN_CTX* down_ctx = (DOWN_CTX*) ctx;
		switch (down_ctx->type)
		{
		case CTX_T_CONTENT_LENGTH:
			printf("%s: content-length: %d\r\n",
				addr_.c_str(), down_ctx->length);
			break;
		case CTX_T_PARTIAL_LENGTH:
			total_read_ += down_ctx->length;
			printf("%s: partial-length: %d, total read: %d\r\n",
				addr_.c_str(), down_ctx->length, total_read_);
			break;
		case CTX_T_END:
			printf("%s: read over\r\n", addr_.c_str());
			break;
		default:
			printf("%s: ERROR\r\n", addr_.c_str());
			break;
		}
		delete down_ctx;
	}
private:
	aio_handle& handle_;
	string addr_;
	string url_;
	bool  error_;
	int   total_read_;
	int   content_length_;
};

static void run(void)
{
	aio_handle handle;
	rpc_service* service = new rpc_service(10);  // 建立 rpc 服務對象

	// 打開消息服務器
	if (service->open(&handle) == false)
	{
		printf("open service error: %s\r\n", last_serror());
		return;
	}

	// 下載頁面內容

	http_download down1(handle, "www.sina.com.cn:80", "http://www.sina.com.cn/");
	service->rpc_fork(&down1);  // 發起一個阻塞會話過程
	__download_count++;

	http_download down2(handle, "www.hexun.com:80", "/");
	service->rpc_fork(&down2);  // 發起第二個阻塞會話過程
	__download_count++;

	// 異步事件循環過程
	while (true)
	{
		if (handle.check() == false)
			break;
	}

	delete service;
	handle.check(); // 保證釋放全部延遲關閉的異步對象
}

int main(void)
{
#ifdef WIN32
	acl_cpp_init();
#endif

	run();
	printf("Enter any key to continue\r\n");
	getchar();
	return 0;
}

 

我的微博:http://weibo.com/zsxxsz

參考:

1)acl_cpp 非阻塞模塊的IPC通訊機制

2)ipc_service 類:阻塞與非阻塞混合編程

3)更多文章:http://zsxxsz.iteye.com/

4)acl 下載:

http://sourceforge.net/projects/acl/

svn:svn checkout svn://svn.code.sf.net/p/acl/code/trunk acl-code

github:https://github.com/zhengshuxin/acl

5)  QQ 羣:242722074

相關文章
相關標籤/搜索