ACE中的Proactor介紹和應用實例

把這兩天作Proactor的一些經驗和心得寫一下,可能會給一些人幫助。編程

    Proactor是異步模式的網絡處理器,ACE中叫作「前攝器」。服務器

    先講幾個概念:網絡

    前攝器(Proactor)-異步的事件多路分離器、處理器,是核心處理類。啓動後由3個線程組成(你不須要關心這三個線程,我只是讓你知道一下有這回事存在)。異步

    接受器(Acceptor)-用於服務端,監聽在一個端口上,接受用戶的請求。async

    鏈接器(Connector)-用於客戶端,去鏈接遠程的監聽。固然,若是遠程是ACE寫的,就是Acceptor。函數

    異步模式-即非阻塞模式。網絡的傳輸速度通常來說爲10Mbps、100Mbps、1000Mbps。拿千兆網來講,實際的傳輸速度爲1000Mbps/8大概爲128KB左右。咱們的CPU通常爲P4 3.0GHZ,若是是32位的處理器,一秒鐘大概能夠處理6G的字節,那麼,128KB的網絡速度是遠遠及不上處理器的速度的。網絡發送數據是一位一位發送出去的,若是CPU等在這裏,發送完成函數才結束,那麼,處理器浪費了大量時間在網絡傳輸上。this

    操做系統提供了異步的模式來傳輸網絡數據,工做模式即:應用程序把要發送的數據交給操做系統,操做系統把數據放在系統緩衝區後就告訴應用程序OK了,我幫你發,應用程序該幹嗎幹嗎去。操做系統發送完成後,會給應用系統一個回執,告訴應用程序:剛纔那個包發送完成了!操作系統

   舉個例子:你有幾封郵件和包裹要發,最有效率的辦法是什麼?你把郵件和包裹及交給總檯,總檯MM說,好了,你幫你發,你忙去吧!而後你去工做了。過了一會,總檯MM打電話告訴你:「剛纔我叫快遞公司的人來了,把你的包裹發出去了。郵局的人也來了,取走了郵件,放心好了」。一樣,若是你知道今天會有包裹來,好比你在淘寶上購物了,你能整天等在總檯?你應該告訴總檯MM:「今天可能有個人一個快遞,你幫我收一下,晚上請你肯德基!」。MM:「看在肯得基的面子上,幫你收了」。某個時間,MM打電話來了:「帥哥,你的包裹到了,我幫你簽收了,快來拿吧。」線程

   由於操做系統是頗有效率的,全部,他在後臺收發是很快的。應用程序也很簡單。Proactor就是這種異步模式的。Proactor就是總檯MM;ACE_Service_Handle就是總檯代爲收發郵件的公司流程。指針


咱們看一個實例:


//***********************************************************

class TPTCPAsynchServerImpl : public ACE_Service_Handler

{

public:

 TPTCPAsynchServerImpl(void);

 ~TPTCPAsynchServerImpl(void);


 virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block); 

 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);


 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);


 virtual void  handle_time_out (const ACE_Time_Value &tv, const void *act=0);


private:

 int initiate_read_stream (const ACE_Asynch_Read_Stream::Result &result);


 ACE_Asynch_Read_Stream rs_;

 ACE_Asynch_Write_Stream ws_;


};



這個例子從ACE_Service_Handler繼承過來,ACE_Service_Handle主要就是定義了一些回調函數。

一、 virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block);

  當有客戶端鏈接上來,鏈接創建成功後Proactor會調用這個方法。


二、 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);

當用戶要讀的數據讀好了後,調用這個方法


三、virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);

當用戶要寫的數據在網卡上發送成功後,Proactor會回調這個方法


四、 virtual void  handle_time_out (const ACE_Time_Value &tv, const void *act=0);

當用戶設定的時鐘到期了,這個方法會被調用。


這跟和總檯MM的聯絡方法是否是同樣的?


對還缺點東西,缺乏怎麼向總檯MM交待任務的方法。下面看看:


首先,建立一個監聽器。


 ACE_Asynch_Acceptor<TPTCPAsynchServerImpl> acceptor_;

看到沒,就是咱們剛纔寫的類,由於他繼承了回調接口,並實現了自已的代碼,模板中ACE_Asynch_Acceptor會在合適的時候回調這些方法。


//建立一個地址對象

 ACE_INET_Addr addr(port, ip);

acceptor_.open (addr, 8 * 1024, 1);

    Open後,就開始監聽了。其它的,向Proactor註冊一些事件的事模板類中都替你作了,你不須要作不少事。

    那麼,已經開始監聽了,個人程序從哪裏開始呢?對於一個服務程序來說,程序是被用戶的鏈接驅動的,一個用戶程序想和通信,必須先建立鏈接,就是Socket中的connect操做。這個操做Proactor會替咱們作一些工做,當鏈接建立完成後,上面講的Open方法會被調用,咱們看看Open方法中都有些什麼代碼:


void TPTCPAsynchServerImpl::open (ACE_HANDLE handle, ACE_Message_Block &message_block)

{

 ACE_DEBUG ((LM_DEBUG, "%N:%l:TPTCPAsynchServerImpl::open()... "));


 //構造讀流

 if (rs_.open (*this, handle) == -1)

 {

  ACE_ERROR ((LM_ERROR, "%N:%l: ", "TPTCPAsynchServerImpl::open() Error"));

  return;

    }


 //構造寫流

 if (ws_.open(*this, handle) == -1)

 {

  ACE_ERROR ((LM_ERROR, "%N:%l: ", "TPTCPAsynchServerImpl::open() Error"));

  return;

    }


 //獲取客戶端鏈接地址和端口

 ACE_INET_Addr addr; 

 ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(handle); 

 size_t addr_size=1; 

 ass.get_local_addrs(&addr,addr_size);


 this->server_->onClientConnect((int)handle, addr.get_ip_address(), addr.get_port_number());



 



 //若是客戶鏈接時同時提交了數據,須要僞造一個結果,而後呼叫讀事件

 if (message_block.length () != 0)

 {

 // ACE_DEBUG((LM_DEBUG, "message_block.length() != 0 "));

  // 複製消息塊

  ACE_Message_Block &duplicate =  *message_block.duplicate ();


  // 僞造讀結果,以便進行讀完成回調

  ACE_Asynch_Read_Stream_Result_Impl *fake_result =

        ACE_Proactor::instance ()->create_asynch_read_stream_result (this->proxy (),

                                                                     this->handle_,

                                                                     duplicate,

                                                                     1024,

                                                                     0,

                                                                     ACE_INVALID_HANDLE,

                                                                     0,

                                                                     0);


  size_t bytes_transferred = message_block.length ();


  // Accept事件處理完成,wr_ptr指針會被向前移動,將其移動到開始位置

  duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);


  // 這個方法將調用回調函數

  fake_result->complete (message_block.length (), 1, 0);


  // 銷燬僞造的讀結果

  delete fake_result;

 }



 // 不然,通知底層,準備讀取用戶數據

 //建立一個消息塊。這個消息塊將用於從套接字中異步讀 

 ACE_Message_Block *mb = 0;

  ACE_NEW (mb, ACE_Message_Block (_bufSize));


 if (rs_.read (*mb, mb->size () - 1) == -1)

 {

  delete mb;

  ACE_ERROR ((LM_ERROR, "%N:%l:open init read failed!"));

  return;

 }

}


 


咱們看到,首先建立了兩個流,就是前面類定義中定義的一個異步寫流,一個異步讀流。之後對網絡的讀和寫就經過這兩個流進行。我還給出了一段讀客戶端地址和端口的代碼。而後是讀取客戶Connect可能附帶的數據,那段代碼不用看懂,之後使用照抄就行。而後就是


 if (rs_.read (*mb, mb->size () - 1) == -1)

 {

  delete mb;

  ACE_ERROR ((LM_ERROR, "%N:%l:open init read failed!"));

  return;

 }


這段代碼使用讀流讀一段數據。這段代碼就是向總檯MM交待:我要收包裹,收好了叫我!

也就是說,這段代碼99%的多是讀不出數據的,只是向Proactor註冊讀的事件,具體的等待、讀取操做由Proactor讀,讀到了,就回調Handle_Read_Stream方法。ACE_Message_Block是消息塊,數據就是存放在消息塊中的。

下面看看Handle_Read_Stream方法的代碼:


void TPTCPAsynchServerImpl::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)

{

 result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '/0';


  ACE_DEBUG ((LM_DEBUG, "********************/n"));

  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "bytes_to_read", result.bytes_to_read ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "handle", result.handle ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "bytes_transfered", result.bytes_transferred ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "act", (u_long) result.act ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "success", result.success ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "completion_key", (u_long) result.completion_key ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d/n", "error", result.error ()));

  ACE_DEBUG ((LM_DEBUG, "********************/n"));


 result.message_block().release();


 if (this->initiate_read_stream (result) == -1)

 {

  ACE_ERROR((LM_ERROR, "%N:%l:read stream failed!connection closed, remove it:%d/n", result.handle()));

  closeConnection(result.handle());

 } 

}


 


這個函數被調用,就代表有數據已經讀好了,包裹已經在總檯了。Proactor比總檯MM還好,給你送上門了,數據就在Result裏,上面演示了Result中的數據。而後把消息塊釋放了,而後調用initiate_read_stream繼續監聽網絡上可能到來的數據。看看initiate_read_stream好了:


int TPTCPAsynchServerImpl::initiate_read_stream (const ACE_Asynch_Read_Stream::Result &result)

{

 ACE_DEBUG((LM_TRACE, "%N:%l:TPTCPAsynchServerImpl::initiate_read_stream() "));

 //建立一個消息塊。這個消息塊將用於從套接字中異步讀 

 ACE_Message_Block *mb = new ACE_Message_Block(_bufSize);

 if (mb == NULL)

 {

  ACE_DEBUG((LM_ERROR, "%N:%l:can't allock ACE_Message_Block.  ")); 

  return -1;

 }

 


 if (rs_.read (*mb, mb->size () - 1) == -1)

 {

  delete mb;

  ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:rs->read() failed, clientID=%d ", result.handle()),  -1);

 }


 return 0;

}


 


代碼很簡單,就是建立一個新的消息塊,而後使用讀流注冊一個讀消息就能夠了。


到此爲止,Proactor的讀流程很清楚了吧?


下面再說一個寫流程。


寫流程其實更簡單,在任意想向客戶端寫數據的地方,調用相應代碼就好了,好比,咱們提供了SendData方法來發送數據,在任意想發送數據的地方調用SendData就好了,SendData的代碼以下:


int TPTCPAsynchServerImpl::sendData(int clientID, const char *data, int dataLen, unsigned int &id)

{

 ACE_DEBUG((LM_DEBUG, "TPTCPAsynchServerImpl::sendData(void) "));


 ACE_Message_Block *mb; 

 ACE_NEW_RETURN(mb, ACE_Message_Block(dataLen + 1), -1);


 mb->wr_ptr((char*)data);                  

 ACE_OS::memcpy(mb->base(),(char*)data, dataLen);


 id = GlobleSingleton::instance()->getIndex();

 mb->msg_type((int)id);


 //向操做系統發送數據

 if (connection->ws->write (*mb , dataLen ) == -1)

 {

  ACE_ERROR_RETURN((LM_ERROR, "%N:%l:sendData failed! clientID=%d ", clientID),-1);

 }


 return 0;

}


 


簡單說,就是建立了一個消息塊,把用戶數據拷貝進來,而後調用寫流WS向Proactor發送一個Write事件就能夠了,發送成功後,Handle_write_handle會被調用,看一下:


void

TPTCPAsynchServerImpl::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)

{

  ACE_DEBUG ((LM_DEBUG,

              "handle_write_stream called "));


  // Reset pointers.

  result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());


  ACE_DEBUG ((LM_DEBUG, "******************** "));

  ACE_DEBUG ((LM_DEBUG, "%s = %d ", "bytes_to_write", result.bytes_to_write ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d ", "handle", result.handle ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d ", "bytes_transfered", result.bytes_transferred ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d ", "act", (u_long) result.act ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d ", "success", result.success ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d ", "completion_key", (u_long) result.completion_key ()));

  ACE_DEBUG ((LM_DEBUG, "%s = %d ", "error", result.error ()));

  ACE_DEBUG ((LM_DEBUG, "******************** "));

#if 0

  ACE_DEBUG ((LM_DEBUG, "%s = %s ", "message_block", result.message_block ().rd_ptr ()));

#endif


  // Release message block.

  result.message_block ().release ();

}


 


代碼中使用了result中發數據,而後把消息塊釋放了,就這麼簡單。



////////////////////////////////////////////////////////////////////////////////////////////////////


這是簡單的proactor用法,固然,複雜也基本就這樣用。所謂不基本的不是Proactor的內容,而是服務器編程自己的麻煩。好比說,多個鏈接的管理、重發機制、發送隊列等等,這都不是ACE的內容。這些要你們本身思考了,並添加。


在這裏,我要說幾個重要的問題:鏈接的管理。Acceptor是一個類,可是在每個鏈接,Proactor都用了某種辦法建立了一個實例,因此,鏈接管理的羣集類必定不能在Acceptor類中,否則獲得的結果就是始終只有一條記錄。由於每一個Acceptor都有一個實例,實例對應一個鏈接,羣集類也就每一個實例一個了。要採起的方法是一個全局的容器對象就能夠了。好比我這個類:


typedef ACE_Map_Manager <ACE_HANDLE, ConnectionBean *, ACE_Null_Mutex> ConnectionMap;

typedef ACE_Map_Iterator<ACE_HANDLE, ConnectionBean *, ACE_Null_Mutex> ConnectionIterator;

typedef ACE_Map_Entry   <ACE_HANDLE, ConnectionBean *> ConnectionEntry;


class Globle

{

public:

 Globle(void);

 ~Globle(void);


 ITPServer* server_;

 ConnectionMap _connections;


 unsigned getIndex(void); 

 long getTimerId(void);


private:

 unsigned int index_;


 long timerId_;

};


typedef ACE_Singleton<Globle, ACE_Null_Mutex> GlobleSingleton;


 


我使用ACE的Singleton模板建立這個類,每個Acceptor要使用ConnectionMap,都使用這裏的_connections,方法以下 :

  GlobleSingleton::instance()->connection.bind()......


這個問題但是我花費了2天時間找出來的,諸位同仁不可不戒啊,給點掌聲:)

相關文章
相關標籤/搜索