非阻塞網絡編程實例講解

1、概述    ios

      acl 庫的 C 庫(lib_acl) 的 aio 模塊設計了完整的非阻塞異步 IO 通訊過程,在 acl 的C++庫(lib_acl_cpp) 中封裝並加強了異步通訊的功能,本文主要描述了 acl C++ 庫之非阻塞IO庫的設計及使用方法,該異步流的設計思路爲:異步流類與異步流接口類,其中異步流類對象完成網絡套接口監聽、鏈接、讀寫的操做,異步流接口類對象定義了網絡讀寫成功/超時回調、鏈接成功回調、接收客戶端鏈接回調等接口;用戶在進行異步編程時,首先必須實現接口類中定義的純方法,而後將接口類對象在異步流對象中進行註冊,這樣當知足接口類對象的回調條件時 acl_cpp 的異步框架便自動調用用戶定義的接口方法。編程

 

      在 acl_cpp 中異步流的類繼續關係以下圖所示:
異步流類繼承關係圖服務器

 

      由上圖能夠看出,基類 aio_stream 中定義了流關閉,註冊/取消流關閉回調和流超時回調等基礎方法;aio_istream 和 aio_ostream 分別定義了異步流讀及寫的基本方法,aio_istream 中包含添加/刪除流讀成功回調接口類對象的方法,aio_ostream 中包含添加/刪除流寫成功回調接口類對象的方法;aio_socket_stream 類對象爲鏈接服務器成功後的客戶端流,或服務器接收到客戶端鏈接建立的客戶端鏈接流,其中定義了作爲鏈接流時遠程鏈接的方法及添加鏈接成功回調接口的方法;aio_listen_stream 類爲監聽流類,其中定義了監聽某個網絡地址(或UNIX下的域套接口地址)方法,以及註冊接收成功接口的方法。網絡

 

      acl_cpp 異步流接口類繼承關係圖以下圖:框架

異步流接口類

      異步流接口類的設計中:aio_accept_callback 爲監聽流的回調接口類,用戶應繼承該類以得到外來客戶端鏈接流,同時還須要定義繼承於 aio_callback 的類,用於得到網絡讀寫操做等結果信息;aio_open_callback 只有當客戶端鏈接遠程服務器時,用戶須要實現其子類得到鏈接成功的結果。異步

 

2、實例socket

一、異步服務器異步編程

 

 

#include <iostream>
#include <assert.h>
#include "aio_handle.hpp"
#include "aio_istream.hpp"
#include "aio_listen_stream.hpp"
#include "aio_socket_stream.hpp"

using namespace acl;

/**
 * 異步客戶端流的回調類的子類
 */
class io_callback : public aio_callback
{
public:
	io_callback(aio_socket_stream* client)
		: client_(client)
		, i_(0)
	{
	}

	~io_callback()
	{
		std::cout << "delete io_callback now ..." << std::endl;
	}

	/**
	 * 實現父類中的虛函數,客戶端流的讀成功回調過程
	 * @param data {char*} 讀到的數據地址
	 * @param len {int} 讀到的數據長度
	 * @return {bool} 返回 true 表示繼續,不然但願關閉該異步流
	 */
	bool read_callback(char* data, int len)
	{
		i_++;
		if (i_ < 10)
			std::cout << ">>gets(i:" << i_ << "): " << data;

		// 若是遠程客戶端但願退出,則關閉之
		if (strncasecmp(data, "quit", 4) == 0)
		{
			client_->format("Bye!\r\n");
			client_->close();
		}

		// 若是遠程客戶端但願服務端也關閉,則停止異步事件過程
		else if (strncasecmp(data, "stop", 4) == 0)
		{
			client_->format("Stop now!\r\n");
			client_->close();  // 關閉遠程異步流

			// 通知異步引擎關閉循環過程
			client_->get_handle().stop();
		}

		// 向遠程客戶端回寫收到的數據

		client_->write(data, len);

		return (true);
	}

	/**
	 * 實現父類中的虛函數,客戶端流的寫成功回調過程
	 * @return {bool} 返回 true 表示繼續,不然但願關閉該異步流
	 */
	bool write_callback()
	{
		return (true);
	}

	/**
	 * 實現父類中的虛函數,客戶端流的超時回調過程
	 */
	void close_callback()
	{
		// 必須在此處刪除該動態分配的回調類對象以防止內存泄露
		delete this;
	}

	/**
	 * 實現父類中的虛函數,客戶端流的超時回調過程
	 * @return {bool} 返回 true 表示繼續,不然但願關閉該異步流
	 */
	bool timeout_callback()
	{
		std::cout << "Timeout ..." << std::endl;
		return (true);
	}

private:
	aio_socket_stream* client_;
	int   i_;
};

/**
 * 異步監聽流的回調類的子類
 */
class io_accept_callback : public aio_accept_callback
{
public:
	io_accept_callback() {}
	~io_accept_callback()
	{
		printf(">>io_accept_callback over!\n");
	}

	/**
	 * 基類虛函數,當有新鏈接到達後調用此回調過程
	 * @param client {aio_socket_stream*} 異步客戶端流
	 * @return {bool} 返回 true 以通知監聽流繼續監聽
	 */
	bool accept_callback(aio_socket_stream* client)
	{
		// 建立異步客戶端流的回調對象並與該異步流進行綁定
		io_callback* callback = new io_callback(client);

		// 註冊異步流的讀回調過程
		client->add_read_callback(callback);

		// 註冊異步流的寫回調過程
		client->add_write_callback(callback);

		// 註冊異步流的關閉回調過程
		client->add_close_callback(callback);

		// 註冊異步流的超時回調過程
		client->add_timeout_callback(callback);

		// 從異步流讀一行數據
		client->gets(10, false);
		return (true);
	}
};

int main(int argc, char* argv[])
{
	// 初始化ACL庫(尤爲是在WIN32下必定要調用此函數,在UNIX平臺下可不調用)
	acl_cpp_init();

	// 構建異步引擎類對象
	aio_handle handle(ENGINE_KERNEL);

	// 建立監聽異步流
	aio_listen_stream* sstream = new aio_listen_stream(&handle);
	const char* addr = "127.0.0.1:9001";

	// 監聽指定的地址
	if (sstream->open(addr) == false)
	{
		std::cout << "open " << addr << " error!" << std::endl;
		sstream->close();
		// XXX: 爲了保證能關閉監聽流,應在此處再 check 一下
		handle.check();

		getchar();
		return (1);
	}

	// 建立回調類對象,當有新鏈接到達時自動調用此類對象的回調過程
	io_accept_callback callback;
	sstream->add_accept_callback(&callback);
	std::cout << "Listen: " << addr << " ok!" << std::endl;

	while (true)
	{
		// 若是返回 false 則表示再也不繼續,須要退出
		if (handle.check() == false)
		{
			std::cout << "aio_server stop now ..." << std::endl;
			break;
		}
	}

	// 關閉監聽流並釋放流對象
	sstream->close();

	// XXX: 爲了保證能關閉監聽流,應在此處再 check 一下
	handle.check();

	return (0);
}

 

 

      簡要說明一下,上面代碼的基本思路是:函數

a) 建立異步通訊框架對象 aio_handle --> 建立異步監聽流 aio_listen_stream 並註冊回調類對象 io_accept_callback-->進入異步通訊框架的事件循環中;ui

b) 當接收到客戶端鏈接後,異步框架回調 io_accept_callback 類對象的 accept_callback 接口並將客戶端異步流輸入-->建立異步流接口類對象,並將該對象註冊至客戶端異步流對象中;

c) 當客戶端異步流收到數據時回調異步流接口中的 read_callback 方法 --> 回寫收到數據至客戶端;當客戶端流鏈接關閉時回調異步流接口中的close_callback --> 若是該接口類對象是動態建立的則須要手工 delete 掉;當接收客戶端數據超時時會回調異步流接口中的 time_callback,該函數若是返回 true 則表示但願異步框架不關閉該客戶端異步流,不然則關閉。

 

      異步監聽流的接口類的純虛函數:virtual bool accept_callback(aio_socket_stream* client)  須要子類實現,子類在該函數中得到客戶端鏈接異步流對象。

      客戶端異步流接口類 aio_callback 有四個虛函數:

      a) virtual bool read_callback(char* data, int len)  當客戶端異步流讀到數據時的回調虛函數;

      b) virtual bool write_callback() 當客戶端異步流寫數據成功後的回調虛函數;

      c) virtual void close_callback() 當異步流(客戶端流或監聽流)關閉時的回調虛函數;

      d) virtual bool timeout_callback() 當異步流(客戶端流在讀寫超時或監聽流在監聽超時)超時時的回調函數虛函數。

 

二、異步客戶端

 

#include <iostream>
#include <assert.h>
#include "string.hpp"
#include "util.hpp"
#include "aio_handle.hpp"
#include "acl_cpp_init.hpp"
#include "aio_socket_stream.hpp"

#ifdef WIN32
# ifndef snprintf
#  define snprintf _snprintf
# endif
#endif

using namespace acl;

typedef struct
{
	char  addr[64];
	aio_handle* handle;
	int   connect_timeout;
	int   read_timeout;
	int   nopen_limit;
	int   nopen_total;
	int   nwrite_limit;
	int   nwrite_total;
	int   nread_total;
	int   id_begin;
	bool  debug;
} IO_CTX;

static bool connect_server(IO_CTX* ctx, int id);

/**
 * 客戶端異步鏈接流回調函數類
 */
class client_io_callback : public aio_open_callback
{
public:
	/**
	 * 構造函數
	 * @param ctx {IO_CTX*}
	 * @param client {aio_socket_stream*} 異步鏈接流
	 * @param id {int} 本流的ID號
	 */
	client_io_callback(IO_CTX* ctx, aio_socket_stream* client, int id)
		: client_(client)
		, ctx_(ctx)
		, nwrite_(0)
		, id_(id)
	{
	}

	~client_io_callback()
	{
		std::cout << ">>>ID: " << id_ << ", io_callback deleted now!" << std::endl;
	}

	/**
	 * 基類虛函數, 當異步流讀到所要求的數據時調用此回調函數
	 * @param data {char*} 讀到的數據地址
	 * @param len {int} 讀到的數據長度
	 * @return {bool} 返回給調用者 true 表示繼續,不然表示須要關閉異步流
	 */
	bool read_callback(char* data, int len)
	{
		(void) data;
		(void) len;

		ctx_->nread_total++;

		if (ctx_->debug)
		{
			if (nwrite_ < 10)
				std::cout << "gets(" << nwrite_ << "): " << data;
			else if (nwrite_ % 2000 == 0)
				std::cout << ">>ID: " << id_ << ", I: "
					<< nwrite_ << "; "<<  data;
		}

		// 若是收到服務器的退出消息,則也應退出
		if (acl::strncasecmp_(data, "quit", 4) == 0)
		{
			// 向服務器發送數據
			client_->format("Bye!\r\n");
			// 關閉異步流鏈接
			client_->close();
			return (true);
		}

		if (nwrite_ >= ctx_->nwrite_limit)
		{
			if (ctx_->debug)
				std::cout << "ID: " << id_
					<< ", nwrite: " << nwrite_
					<< ", nwrite_limit: " << ctx_->nwrite_limit
					<< ", quiting ..." << std::endl;

			// 向服務器發送退出消息
			client_->format("quit\r\n");
			client_->close();
		}
		else
		{
			char  buf[256];
			snprintf(buf, sizeof(buf), "hello world: %d\n", nwrite_);
			client_->write(buf, (int) strlen(buf));

			// 向服務器發送數據
			//client_->format("hello world: %d\n", nwrite_);
		}

		return (true);
	}

	/**
	 * 基類虛函數, 當異步流寫成功時調用此回調函數
	 * @return {bool} 返回給調用者 true 表示繼續,不然表示須要關閉異步流
	 */
	bool write_callback()
	{
		ctx_->nwrite_total++;
		nwrite_++;

		// 從服務器讀一行數據
		client_->gets(ctx_->read_timeout, false);
		return (true);
	}

	/**
	 * 基類虛函數, 當該異步流關閉時調用此回調函數
	 */
	void close_callback()
	{
		if (client_->is_opened() == false)
		{
			std::cout << "Id: " << id_ << " connect "
				<< ctx_->addr << " error: "
				<< acl::last_serror();

			// 若是是第一次鏈接就失敗,則退出
			if (ctx_->nopen_total == 0)
			{
				std::cout << ", first connect error, quit";
				/* 得到異步引擎句柄,並設置爲退出狀態 */
				client_->get_handle().stop();
			}
			std::cout << std::endl;
			delete this;
			return;
		}

		/* 得到異步引擎中受監控的異步流個數 */
		int nleft = client_->get_handle().length();
		if (ctx_->nopen_total == ctx_->nopen_limit && nleft == 1)
		{
			std::cout << "Id: " << id_ << " stop now! nstream: "
				<< nleft << std::endl;
			/* 得到異步引擎句柄,並設置爲退出狀態 */
			client_->get_handle().stop();
		}

		// 必須在此處刪除該動態分配的回調類對象以防止內存泄露
		delete this;
	}

	/**
	 * 基類虛函數,當異步流超時時調用此函數
	 * @return {bool} 返回給調用者 true 表示繼續,不然表示須要關閉異步流
	 */
	bool timeout_callback()
	{
		std::cout << "Connect " << ctx_->addr << " Timeout ..." << std::endl;
		client_->close();
		return (false);
	}

	/**
	 * 基類虛函數, 當異步鏈接成功後調用此函數
	 * @return {bool} 返回給調用者 true 表示繼續,不然表示須要關閉異步流
	 */
	bool open_callback()
	{
		// 鏈接成功,設置IO讀寫回調函數
		client_->add_read_callback(this);
		client_->add_write_callback(this);
		ctx_->nopen_total++;

		acl::assert_(id_ > 0);
		if (ctx_->nopen_total < ctx_->nopen_limit)
		{
			// 開始進行下一個鏈接過程
			if (connect_server(ctx_, id_ + 1) == false)
				std::cout << "connect error!" << std::endl;
		}

		// 異步向服務器發送數據
		//client_->format("hello world: %d\n", nwrite_);
		char  buf[256];
		snprintf(buf, sizeof(buf), "hello world: %d\n", nwrite_);
		client_->write(buf, (int) strlen(buf));

		// 異步從服務器讀取一行數據
		client_->gets(ctx_->read_timeout, false);

		// 表示繼續異步過程
		return (true);
	}

protected:
private:
	aio_socket_stream* client_;
	IO_CTX* ctx_;
	int   nwrite_;
	int   id_;
};

static bool connect_server(IO_CTX* ctx, int id)
{
	// 開始異步鏈接遠程服務器
	aio_socket_stream* stream = aio_socket_stream::open(ctx->handle,
			ctx->addr, ctx->connect_timeout);
	if (stream == NULL)
	{
		std::cout << "connect " << ctx->addr << " error!" << std::endl;
		std::cout << "stoping ..." << std::endl;
		if (id == 0)
			ctx->handle->stop();
		return (false);
	}

	// 建立鏈接後的回調函數類
	client_io_callback* callback = new client_io_callback(ctx, stream, id);

	// 添加鏈接成功的回調函數類
	stream->add_open_callback(callback);

	// 添加鏈接失敗後回調函數類
	stream->add_close_callback(callback);

	// 添加鏈接超時的回調函數類
	stream->add_timeout_callback(callback);
	return (true);
}

int main(int argc, char* argv[])
{
	bool use_kernel = false;
	int   ch;
	IO_CTX ctx;

	memset(&ctx, 0, sizeof(ctx));
	ctx.connect_timeout = 5;
	ctx.nopen_limit = 10;
	ctx.id_begin = 1;
	ctx.nwrite_limit = 10;
	ctx.debug = false;
	snprintf(ctx.addr, sizeof(ctx.addr), "127.0.0.1:9001");

	acl_cpp_init();

	aio_handle handle(ENGINE_KERNEL);
	ctx.handle = &handle;

	if (connect_server(&ctx, ctx.id_begin) == false)
	{
		std::cout << "enter any key to exit." << std::endl;
		getchar();
		return (1);
	}

	std::cout << "Connect " << ctx.addr << " ..." << std::endl;

	while (true)
	{
		// 若是返回 false 則表示再也不繼續,須要退出
		if (handle.check() == false)
			break;
	}

	return (0);
}

 

     異步客戶端的基本流程爲:

     a) 建立異步框架對象 aio_handle --> 異步鏈接遠程服務器,建立鏈接成功/失敗/超時的異步接口類對象並註冊至異步鏈接流中 --> 異步框架進行事件循環中;

     b) 鏈接成功後,異步接口類對象中的 open_callback 被調用,啓動下一個異步鏈接過程(未達限制鏈接數前) --> 添加異步讀及異步寫的回調接口類  --> 異步寫入數據,同時開始異步讀數據過程;

     c) 當客戶端異步流收到數據時回調異步流接口中的 read_callback 方法 --> 回寫收到數據至客戶端;當客戶端流鏈接關閉時回調異步流接口中的 close_callback --> 若是該接口類對象是動態建立的則須要手工 delete 掉;當接收客戶端數據超時時會回調異步流接口中的 time_callback,該函數若是返回 true 則表示但願異步框架不關閉該客戶端異步流,不然則關閉。

 

    客戶端異步鏈接流的接口類 aio_open_callback 的純虛函數 virtual bool open_callback() 須要子類實現,在鏈接服務器成功後調用此函數,容許子類在該函數中作進一步操做,如:註冊客戶端流的異步讀回調接口類對象及異步寫回調類對象;若是鏈接超時或鏈接失敗而致使的關閉,則基礎接口類中的 timeout_callback() 或 close_callback() 將會被調用,以通知用戶應用程序。

 

3、小結

      以上的示例演示了基本的非阻塞異步流的監聽、讀寫、鏈接等過程,類的設計中也提供了基本的操做方法,爲了應對實踐中的多樣性及複雜性,acl_cpp 的異步流還設計了更多的接口和方法,如:延遲讀寫操做(這對於限流的服務器比較有用處)、定時器操做等。用戶能夠從以下地址下載完整的 acl 框架庫的拷貝:https://sourceforge.net/projects/acl/

      更多例子參見:lib_acl_cpp/samples/aio/ 目錄

      我的微博:http://weibo.com/zsxxsz

      acl 庫下載:http://sourceforge.net/projects/acl/

      原文地址:http://zsxxsz.iteye.com/blog/1474009

      更多文章:http://zsxxsz.iteye.com/

      QQ 羣:242722074

相關文章
相關標籤/搜索