acl_cpp 非阻塞模塊的IPC通訊機制

      在 acl_cpp 的非阻塞框架的設計中,充分利用了操做系統平臺的高併發機制,同時簡化了異步編程的過程。可是,並非全部的操做都是非阻塞的,現實的程序應用中存在着大量的阻塞式行爲,acl_cpp 的非阻塞框架中設計了一種經過 ipc 模式使阻塞式函數與 acl_cpp 的非阻塞過程相結合的機制。便是說,在 acl_cpp 的主線程是非阻塞的,而把阻塞過程放在單獨的一個線程中運行,當阻塞線程運算完畢,會以 ipc 方式通知主線程運行結果,這樣就達到了阻塞與非阻塞相結合的模式。下圖展現了 ipc 類的繼承關係:ios

ipc 圖

 

      以上圖中有兩個類與 ipc 相關:ipc_server 及 ipc_client,其中 ipc_server 其實用於與監聽異步流相關,而ipc_client 與客戶端鏈接流相關。在 ipc_server 類中有一個 open 函數用來打開本地監聽地址,另有三個虛函數便於子類進行相關操做:編程

      1)on_accept:當監聽流得到一個客戶端鏈接時回調此函數,將得到的客戶端流傳遞給子類對象;服務器

      2)on_open:當用戶調用 ipc_server::open 函數,且監聽某一服務地址成功時經過該函數將實際的監聽地址傳遞給子類對象(由於在調用 open(addr) 時,addr 通常僅指定 IP 地址,同時將 端口賦 0 以便於操做系統自動分配本地端口號,因此經過 on_open 即可以將實際的 端口傳遞給子類對象);併發

      3)on_close:當監聽流關閉時調用此函數通知子類對象。app

 

      相對於 ipc_server 類,則 ipc_client 的接口就顯得比較多,主要的函數以下:框架

      1)open:共有四個 open 重載函數用鏈接監聽流服務地址,其中兩個是同步流方式,兩個是異步流方式,通常同步創建的流用在阻塞式線程中,異步創建的流用在非阻塞線程中;異步

      2)send_message:通常用來向非阻塞主線程發送消息;socket

      3)on_message:異步接收到阻塞線程發來的消息的回調函數;異步編程

      4)append_message:添加異步流想要接收的消息號。函數

      ipc_client 類比較特殊,其充當着雙重身份:1)做爲客戶端鏈接流鏈接監聽流的服務地址,通常用在阻塞式線程中;2)做爲監聽流接收到來自於客戶端流的鏈接請求而建立的與之對應的服務端異步流,通常用在非阻塞線程中。

 

      下面以一個具體的實例來講明若是使用 ipc_server 及 ipc_client 兩個類:

 

 

#include "lib_acl.h"
#include <iostream>
#include "aio_handle.hpp"
#include "ipc_server.hpp"
#include "ipc_client.hpp"

using namespace acl;

#define MSG_REQ		1
#define MSG_RES		2
#define MSG_STOP		3

// 消息客戶端鏈接類定義
class test_client1 : public ipc_client
{
public:
	test_client1()
	{

	}

	~test_client1()
	{

	}

	// 鏈接消息服務端地址成功後的回調函數
	virtual void on_open()
	{
		// 添加消息回調對象,接收消息服務器的
		// 此類消息
		this->append_message(MSG_RES);

		// 向消息服務器發送請求消息
		this->send_message(MSG_REQ, NULL, 0);

		// 異步等待來自於消息服務器的消息
		wait();
	}

	// 流關閉時的回調函數
	virtual void on_close()
	{
		delete this;
	}

	// 接收到消息服務器的消息時的回調函數,其中的消息號由
	// append_message 進行註冊
	virtual void on_message(int nMsg, void*, int)
	{
		std::cout << "test_client1 on message:" << nMsg << std::endl;

		// 向消息服務器發送消息,通知消息服務器中止
		this->send_message(MSG_STOP, NULL, 0);

		// 刪除在 on_open 中註冊的消息號
		this->delete_message(MSG_RES);

		// 本異步消息過程中止運行
		this->get_handle().stop();

		// 關閉本異步流對象
		this->close();
	}
protected:
private:
};

// 消息客戶端處理過程

static bool client_main(aio_handle* handle, const char* addr)
{
	// 建立消息鏈接
	ipc_client* ipc = new test_client1();
	
	// 鏈接消息服務器
	if (ipc->open(handle, addr, 0) == false)
	{
		std::cout << "open " << addr << " error!" << std::endl;
		delete ipc;
		return (false);
	}

	return (true);
}

// 子線程的入口函數
static void* thread_callback(void *ctx)
{
	const char* addr = (const char*) ctx;
	aio_handle handle;

	if (client_main(&handle, addr) == false)
	{
		handle.check();
		return (NULL);
	}

	// 子線程的異步消息循環
	while (true)
	{
		if (handle.check() == false)
			break;
	}

	// 最後清理一些可能未關閉的流鏈接
	handle.check();

	return (NULL);
}
// 消息服務器接收到的客戶端鏈接流類定義
class test_client2 : public ipc_client
{
public:
	test_client2()
	{

	}

	~test_client2()
	{

	}

	virtual void on_close()
	{
		delete this;
	}

	// 接收到消息客戶端發來消息的回調函數
	virtual void on_message(int nMsg, void*, int)
	{
		std::cout << "test_client2 on message:" << nMsg << std::endl;

		// 若是收到消息客戶端要求退出的消息,則主線程了退出
		if (nMsg == MSG_STOP)
		{
			this->close();

			// 通知主線程的非阻塞引擎關閉
			this->get_handle().stop();
		}
		else
			// 迴應客戶端消息
			this->send_message(MSG_RES, NULL, 0);
	}
protected:
private:
};

// 主線程的消息服務器 ipc 服務監聽類定義
class test_server : public ipc_server
{
public:
	test_server()
	{

	}

	~test_server()
	{

	}

	// 消息服務器接收到消息客戶端鏈接時的回調函數	
	void on_accept(aio_socket_stream* client)
	{
		// 建立 ipc 鏈接對象
		ipc_client* ipc = new test_client2();

		// 打開異步IPC過程
		ipc->open(client);

		// 添加消息回調對象
		ipc->append_message(MSG_REQ);
		ipc->append_message(MSG_STOP);
		ipc->wait();
	}
protected:
private:
};

static void usage(const char* procname)
{
	printf("usage: %s -h[help] -t[use thread]\n", procname);
}

int main(int argc, char* argv[])
{
	int   ch;
	bool  use_thread = false;

	while ((ch = getopt(argc, argv, "ht")) > 0)
	{
		switch (ch)
		{
		case 'h':
			usage(argv[0]);
			return (0);
		case 't':
			use_thread = true;
			break;
		default:
			break;
		}
	}

	acl_init();

	aio_handle handle;

	ipc_server* server = new test_server();

	// 使消息服務器監聽 127.0.0.1 的地址
	if (server->open(&handle, "127.0.0.1:0") == false)
	{
		delete server;
		std::cout << "open server error!" << std::endl;
		getchar();
		return (1);
	}

	char  addr[256];
#ifdef WIN32
	_snprintf(addr, sizeof(addr), "%s", server->get_addr());
#else
	snprintf(addr, sizeof(addr), "%s", server->get_addr());
#endif

	if (use_thread)
	{
		// 使消息客戶端在子線程中單獨運行
		acl_pthread_t tid;
		acl_pthread_create(&tid, NULL, thread_callback, addr);
	}

	// 由於消息客戶端也是非阻塞過程,因此也能夠與消息服務器
	// 在同一線程中運行
	else
		client_main(&handle, addr);

	// 主線程的消息循環過程
	while (true)
	{
		if (handle.check() == false)
		{
			std::cout << "stop now!" << std::endl;
			break;
		}
	}

	delete server;
	handle.check();  // 清理一些可能未關閉的異步流對象

	std::cout << "server stopped!" << std::endl;
	getchar();
	return (0);
}

 

 

      以上例子相對簡單,其展現的消息服務器與消息客戶端均是非阻塞過程,其實將上面的異步消息客戶端稍微一改即可以改爲同步消息客戶端了,修改部分以下:

 

 

class test_client3 : public ipc_client
{
public:
	test_client3()
	{

	}

	~test_client3()
	{

	}

	virtual void on_open()
	{
		// 添加消息回調對象
		this->append_message(MSG_RES);

		// 向消息服務器發送請求消息
		this->send_message(MSG_REQ, NULL, 0);

		// 同步等待消息
		wait();
	}

	virtual void on_close()
	{
		delete this;
	}

	virtual void on_message(int nMsg, void*, int)
	{
		std::cout << "test_client3 on message:" << nMsg << std::endl;
		this->send_message(MSG_STOP, NULL, 0);
		this->delete_message(MSG_RES);
		this->close();
	}
protected:
private:
};

// 子線程處理過程

static bool client_main(const char* addr)
{
	// 建立消息客戶端對象
	ipc_client* ipc = new test_client3();
	
	// 同步方式鏈接消息服務器
	if (ipc->open(addr, 0) == false)
	{
		std::cout << "open " << addr << " error!" << std::endl;
		delete ipc;  // 當消息客戶端未成功建立時須要在此處刪除對象
		return (false);
	}

	return (true);
}

static void* thread_callback(void *ctx)
{
	const char* addr = (const char*) ctx;

	if (client_main(addr) == false)
		return (NULL);
	return (NULL);
}

 

   對比 test_client1 與 test_client_3 兩個消息客戶端,能夠發現兩者區別並不太大,關鍵在於調用 open 時是採用了異步仍是同步鏈接消息服務器,其決定了消息客戶端是異步的仍是同步的。

 

      示例代碼:samples/aio_ipc

 

      我的微博:http://weibo.com/zsxxsz

      acl_cpp 下載:http://sourceforge.net/projects/acl/

      原文地址:http://zsxxsz.iteye.com/blog/1495832

      更多文章:http://zsxxsz.iteye.com/

      QQ 羣:242722074

相關文章
相關標籤/搜索