boost.ASIO-多是下一代C++標準的網絡庫

曾幾什麼時候,Boost中有一個Socket庫,但後來沒有了下文,C++社區一直在翹首盼望一個標準網絡庫的出現,網絡上開源的網絡庫也有很多,例如Apache Portable Runtime就是比較著名的一個,也有像ACE這樣重量級的網絡框架。
去年,Boost將ASIO歸入了本身的體系,因爲Boost的影響力,ASIO有機會成爲標準網絡庫。做者Chris Kohlhoff以ASIO爲樣本向C++標準委員會提交了一個網絡庫建議書,裏面提到:
ASIO的覆蓋範圍:
 Networking using TCP and UDP, including support for multicast.
 Client and server applications.
 Scalability to handle many concurrent connections.
 Protocol independence between IPv4 and IPv6.
 Name resolution (i.e. DNS).
 Timers.
不在ASIO考慮範圍以內的:
 Protocol implementations such as HTTP, SMTP or FTP.
 Encryption (e.g. SSL, TLS).
 Operating system specific demultiplexing APIs.
 Support for realtime environments.
 QoS-enabled sockets.
 Other TCP/IP protocols such as ICMP.
 Functions and classes for enumerating network interfaces.
Boost.Asio支持如下平臺:
 Win32 using Visual C++ 7.1 and Visual C++ 8.0.
 Win32 using Borland C++Builder 6 patch 4.
 Win32 using MinGW.
 Win32 using Cygwin.
 Linux (2.4 or 2.6 kernels) using g++ 3.3 or later.
 Solaris using g++ 3.3 or later.
 Mac OS X 10.4 using g++ 3.3 or later.
 QNX Neutrino 6.3 using g++ 3.3 or later.
 FreeBSD using g++ 3.3 or later.
參考ACE的Proactor模式,ASIO採用異步通信機制,同時參考了Symbian C++ sockets API、Microsoft .NET socket classes和Open Group的Extended Sockets API。

 

usidc5 2011-01-18 23:01

Asio 是一個跨平臺的C++開發包用來處理網絡和低級I/O編程,經過先進的C++方法爲開發人員提供連續異步模型。
示例代碼:
  void handle_read(const asio::error_code& error,
      size_t bytes_transferred)
  {
    if (!error)
    {
      asio::async_write(socket_,
          asio::buffer(data_, bytes_transferred),
          make_custom_alloc_handler(allocator_,
            boost::bind(&session::handle_write,
              shared_from_this(),
              asio::placeholders::error)));
    }
  }

  void handle_write(const asio::error_code& error)
  {
    if (!error)
    {
      socket_.async_read_some(asio::buffer(data_),
          make_custom_alloc_handler(allocator_,
            boost::bind(&session::handle_read,
              shared_from_this(),
              asio::placeholders::error,
              asio::placeholders::bytes_transferred)));
    }
  }

 

usidc5 2011-06-25 15:36
boost真是個好東西,每次去逛總會有驚喜。此次的驚喜是發現了asio,一個跨平臺的支持異步I/O的網絡通信socket庫。

異步I/O是一種高效的I/O模型,在Windows平臺下這種機制的表明就是IOCP完成端口模型。事實上,asio在Windows平臺上的實現就是對IOCP的封裝。

其實在網絡通信這一塊,已經有許多成熟的框架了,最典型的就是ACE,一個網絡通信設計模式的集大成者。但ACE對我來講過重型了,並且其起源於90年代,與標準庫的集成不是太好,好比ACE就有本身的容器類。。。總而言之,ACE是一個龐然大物,威力無窮,但也顯得比較笨重。

C++發展到如今,庫的設計風格也愈來愈趨向於泛型,boost就是一個典型,並且boost社區跟C++標準委員會的密切關係,使得進入boost的程序庫有更大的機會加入下一代的C++標準庫。

所以無論從設計風格(我不否定我也喜歡追時髦;)),仍是從功利的角度看,學習asio都是一筆不錯的投資。

學習她,首先要安裝她。asio要求首先安裝boost,下面我把本身的安裝過程描述一遍,這確實仍是頗費心思的。

首先要先安裝boost,這但是一個漫長而又炎熱的夏天。。。萬幸的是我之前已經裝過了,嘿嘿。我裝的是boost_1_33_0,爲了完整說明,我這裏也簡單列了下boost的安裝步驟,這也是從網上找來的。

step1.從www.boost.org下載boost庫

step2 在 tools\build\jam_src目錄下 運行build.bat來生成jam

step3 設置環境變量(後面的%PATH%要加)

PATH=%boost的絕對路徑%\tools\build\jam_src\bin.ntx86;%PATH% 
PATH=%boost的絕對路徑%;%PATH%

For Visial Studio 6.0 
SET MSVC_ROOT="VC6的安裝路徑" 
SET VISUALC="VC6的安裝路徑" 
Example: 
SET MSVC_ROOT="c:\Program Files\Microsoft Visual Studio\VC98"

For Visual Studio.net 
SET VC7_ROOT="vs.NET安裝路徑" 
Example: 
SET VC7_ROOT="C:\Program Files\Microsoft Visual Studio .NET\VC7"

For Visual Studio.net 2003 
SET VC71_ROOT="vs.NET2003安裝路徑" 
Example: 
set VC71_ROOT="C:\Program Files\Microsoft Visual Studio .NET 2003\Vc7"

step 4 編譯boost庫 
bjam "-sTOOLS=%編譯器%" install
Visual Studio 6.0 %編譯器%=msvc 
Visual Studio .NET %編譯器%=vc7 
Visual Studio .NET 2003 %編譯器%=vc-7_1

我用的是VC7.1,照着這個指示,當時編譯了很久才完成。不過我在最後一步時,忘了加上install。這也沒什麼,你能夠在boost下新建一個lib文件夾,而後把bin目錄下全部的庫文件都拷貝進來,而後在你的編譯器裏進行適當的路徑設置(頭文件路徑,庫文件路徑),就能夠使用boost了。

安裝好boost後(我裝在了E:\boost_1_33_0),就能夠安裝asio了,先去http://asio.sourceforge.net下載,如今是最新版本0.3.8。
注意下載帶有boost前綴(boost_asio_0_3_8rc2.zip)的zip文件,解開後能夠看到兩個目錄:boost和libs。把boost裏面的全部文件拷貝到E:\boost_1_33_0\boost下面,注意裏面有個detail目錄不能直接覆蓋,而是要把其中的文件(identifier.hpp)拷貝到E:\boost_1_33_0\boost\detail中去;一樣把libs裏面的文件夾都拷貝到E:\boost_1_33_0\libs下,就能夠了。

好了,接下來就到了激動人心的時刻,讓咱們來開始編譯asio示例,我找了個asio自帶的最簡單的例子,關於同步定時器的,5秒後超時打印Hello, world!

#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

int main()
{
       boost::asio::io_service io;

       boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
       t.wait();

       std::cout << "Hello, world!\n";

       return 0;
}

先別管具體的代碼,開始編譯。。。果真不出所料,哪有那麼順暢的事情;)

正在編譯...
main.cpp
e:\boost_1_33_0\boost\asio\detail\push_options.hpp(103) : fatal error C1189: #error :       Multithreaded RTL must be selected.

哦,原來須要多線程庫,這好辦,在項目屬性裏:配置屬性 -> C/C++ -> 代碼生成 -> 運行時庫 ->多線程調試(/MTd),再試一下。

正在編譯...
main.cpp
Please define _WIN32_WINNT or _WIN32_WINDOWS appropriately
Assuming _WIN32_WINNT=0x0500 (i.e. Windows 2000 target)
正在連接...
LINK : fatal error LNK1104: 沒法打開文件「libboost_system-vc71-mt-sgd-1_33.lib」

好傢伙,一下出來倆。第一個是windows版本支持問題,我在項目屬性裏把宏_WIN32_WINNT加到預處理器中,如今編譯經過了,但連接仍是不行:

正在連接...
LINK : fatal error LNK1104: 沒法打開文件「libboost_system-vc71-mt-sgd-1_33.lib」

找不到system庫。我納悶了一會,由於asio主頁上明明寫着大部分功能只須要boost頭文件便可。所以我又照着asio主頁上的說明,把_BOOST_ALL_NO_LIB也加到預處理器中去,但仍是不行。

後來我又到下載的文件中去找,發現system是asio自帶的一個庫,要想使用asio,就必須先編譯這個庫,OMG~

我還沒單獨編譯過boost的一個庫,所以又去網上找了找,終於找到了,原來也不是很難。基本步驟仍是跟編譯整個boost同樣,只不過在最後一步時,要換成這樣:

bjam "-sTOOLS=%編譯器%" --with-system install

就能夠編譯system庫了。最後檢查下編譯器的頭文件和庫文件路徑是否正確,再從新試一遍,終於大功告成!

我懷着欣喜的心情開始測試asio自帶的tutorial程序,前面幾個關於定時器的運行的很正常,但到了後來測試daytime1的時候,連接又有問題了。

正在編譯...
main.cpp
f:\My-SmartWin-Demo\asio_demo\main.cpp(56) : warning C4267: 「參數」 : 從「size_t」轉換到「std::streamsize」,可能丟失數據
正在連接...
main.obj : error LNK2019: 沒法解析的外部符號 "public: static class boost::system::error_category __cdecl boost::system::error_code::new_category(int (__cdecl*)(class boost::system::error_code const &),class std::basic_string<char,struct std::char_traits<char>,class std::allocator<char> > (__cdecl*)(class boost::system::error_code const &),class std::basic_string<unsigned short,struct std::char_traits<unsigned short>,class std::allocator<unsigned short> > (__cdecl*)(class boost::system::error_code const &))" (?new_category@error_code@system@boost@@SA?AVerror_category@23@P6AHABV123@@ZP6A?AV?$basic_string@DU?$char_traits@D@std@@V?$allocator@D@2@@std@@0@ZP6A?AV?$basic_string@GU?$char_traits@G@std@@V?$allocator@G@2@@6@0@Z@Z) ,該符號在函數 _$E47 中被引用
Debug/asio_demo.exe : fatal error LNK1120: 1 個沒法解析的外部命令

前面那個警告無論,後面連接時說一個static函數new_category(...)只有聲明沒有實現,我找了下,這個函數在system庫裏error_code.hpp中有聲明,error_code.cpp也有實現,並且明明system庫我也編譯成功,並加入相關路徑中,怎麼仍是會出錯?

鬱悶了半天,後來乾脆把error_code.cpp加入到daytime1工程中一塊兒編譯,終於完全搞定了。

真是TMD不容易啊。

 

usidc5 2011-06-25 15:39

對於一個網絡程序的服務器端咱們須要提供的是服務器的address,和服務開放的端口號port。
在asio庫中首先咱們必須使用一個io_service類來支持全部的IO功能。須要注意到是咱們必須調用io_service_my.run()函數來開啓IO服務的事件循環以使功能都能被正常使用。
boost::asio::io_service io_service_my;
如今咱們能夠基於這個io_service_my來關聯構建一下幾個類:
1. boost::asio::ip::tcp::acceptor acceptor_my(io_service_my); 
由於LPD的實現是基於TCP傳輸協議,因此也使用了TCP的acceptor來接收client發來的鏈接。
2.  boost::asio::ip::tcp::resolver resolver_my(io_service_my);
boost::asio::ip::tcp::resolver::query query_my(address,port);
boost::asio::ip::tcp::endpoint endpoint_my = *resolver.resolve(query_my);
這幾個類主要是用來實現對地址的解析和綁定終端節點到相應的開放端口號上。首先構造一個關聯到io_service_my的解析器resolver_my。而後讓解析器resolver_my執行resolve
()函數來解析query_my指定的address和port到一個終端節點endpoint_my上。咱們會看到這個endpoint_my終端節點會被綁定到這個acceptor_my接收器上。
3. boost::asio::ip::tcp::socket socket_my(io_service_my);
定義一個基於TCP協議的socket關聯到io_service_my對象上。
在這些準備工做作完後咱們開始一些實際的動做:
/*
* 打開一個使用由endpoint_my指定的協議類型的接收器,這個protocol()函數會自動返回與endpoint_my關聯的協
* 議類型。
*/
acceptor_my.open(endpoint_my.protocol());

/*
* 設置選項容許socket綁定到一個已經正在被使用的地址。
*/
acceptor_my.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true);

/*
* 把接收器綁定到已經被設置過的endpoint_my。
*/
acceptor_my.bind(endpoint_my);

/*
* 接收器開始偵聽。
*/
acceptor_my.listen();

/*
* 以同步/異步方式開始接收鏈接。
*/
acceptor_my.accept(socket_my) //同步
acceptor_my.async_accept(socket_my,
  boost::bind(&handle_accept,boost::asio::placeholders::error));//異步
其中異步的偵聽函數原型是:
template<
    typename SocketService,
    typename AcceptHandler>
void async_accept(
    basic_socket< protocol_type, SocketService > & peer,
    AcceptHandler handler);
handler所對應的函數在新鏈接接收完成後會被調用。這種異步方式實現回調的方法也相似於使用boost::asio::io_service::post(boost::bind(&handle_accept));
注意到bind函數中的&handle_accept,這是函數handle_accept的入口地址,也就是在接收完成後會調用的函數在這裏咱們能夠繼續進行下一步的處理,從socket_my中讀取或者寫
入數據。

/*
* 調用異步讀函數,把接收的數據拷貝到buffer緩存中,其中buffer是由參數buffer_my構造,
* 而buffer_my自己能夠是一個容器例如boost::array<char,8192> buffer_my,表示申請了
* 一個8K個char字符型空間。也能夠使用例外一種方法實現buffer的構造例如
* boost::asio::buffer(data,size_t);其中data表示指向某種數據類型的指針,
* size_t則表示包含多少個該數據類型的元素。
*/
void handle_accept(boost::system::error_code error)
{
if(!error)
{
  boost::array<char, 8192> buffer_my;
  boost::asio::async_read(socket_my, boost::asio::buffer(buffer_my),
   boost::bind(&handle_read, boost::asio::placeholders::error));

}
相似的寫程序以下
boost::asio::async_write(socket_my, boost::asio::buffer(buffer_my),
   boost::bind(&handle_write, 
     boost::asio::placeholders::error,
     boost::asio::placeholders::bytes_transferred));

最後在全部鏈接完成以後或是服務器中止的時候別忘記關掉鏈接。例如
socket_my.close();
acceptor_my.close();
至此一個基於boost::asio庫的網絡程序的框架就出來了,至於具體的設計類實現能夠視需求而定。

 

usidc5 2011-06-25 15:40
摘要:本文經過形像而活潑的語言簡單地介紹了Boost::asio庫的使用,做爲asio的一個入門介紹是很是合適的,能夠給人一種新鮮的感受,同時也能讓體驗到asio的主要內容。本文來自網絡,原文在這裏。


目錄 [隱藏]
ASIO的同步方式
自我介紹
示例代碼
小結
ASIO的異步方式
自我介紹
示例代碼
小結
ASIO的「便民措施」
端點
超時
統一讀寫接口
基於流的操做
用ASIO編寫UDP通訊程序
用ASIO讀寫串行口
演示代碼
Boost.Asio是一個跨平臺的網絡及底層IO的C++編程庫,它使用現代C++手法實現了統一的異步調用模型。


ASIO的同步方式
ASIO庫可以使用TCP、UDP、ICMP、串口來發送/接收數據,下面先介紹TCP協議的讀寫操做。對於讀寫方式,ASIO支持同步和異步兩種方式,首先登場的是同步方式,下面請同步方式自我介紹一下。


自我介紹
你們好!我是同步方式!


個人主要特色就是執着!全部的操做都要完成或出錯纔會返回,不過偶的執着被你們稱之爲阻塞,實在是鬱悶~~(場下一片噓聲),其實這樣 也是有好處的,好比邏輯清晰,編程比較容易。


在服務器端,我會作個socket交給acceptor對象,讓它一直等客戶端連進來,連上之後再經過這個socket與客戶端通訊, 而全部的通訊都是以阻塞方式進行的,讀完或寫完纔會返回。


在客戶端也同樣,這時我會拿着socket去鏈接服務器,固然也是連上或出錯了才返回,最後也是以阻塞的方式和服務器通訊。


有人認爲同步方式沒有異步方式高效,其實這是片面的理解。在單線程的狀況下可能確實如此,我不能利用耗時的網絡操做這段時間作別的事 情,不是好的統籌方法。不過這個問題能夠經過多線程來避免,好比在服務器端讓其中一個線程負責等待客戶端鏈接,鏈接進來後把socket交給另外的線程去 和客戶端通訊,這樣與一個客戶端通訊的同時也能接受其它客戶端的鏈接,主線程也徹底被解放了出來。


個人介紹就有這裏,謝謝你們!


示例代碼
好,感謝同步方式的自我介紹,如今放出同步方式的演示代碼(起立鼓掌!)。


服務器端


#include <iostream>
#include <boost/asio.hpp>


int main(int argc, char* argv[])
{
        using namespace boost::asio;
        // 全部asio類都須要io_service對象
        io_service iosev;
        ip::tcp::acceptor acceptor(iosev, 
        ip::tcp::endpoint(ip::tcp::v4(), 1000));
        for(;;)
        {
                // socket對象
                ip::tcp::socket socket(iosev);
                // 等待直到客戶端鏈接進來
                acceptor.accept(socket);
                // 顯示鏈接進來的客戶端
                std::cout << socket.remote_endpoint().address() << std::endl;
                // 向客戶端發送hello world!
                boost::system::error_code ec;
                socket.write_some(buffer("hello world!"), ec);


                // 若是出錯,打印出錯信息
                if(ec)
                {
                        std::cout << 
                                boost::system::system_error(ec).what() << std::endl;
                        break;
                }
                // 與當前客戶交互完成後循環繼續等待下一客戶鏈接
        }
        return 0;
}
客戶端


#include <iostream>
#include <boost/asio.hpp>


int main(int argc, char* argv[])
{
        using namespace boost::asio;


        // 全部asio類都須要io_service對象
        io_service iosev;
        // socket對象
        ip::tcp::socket socket(iosev);
        // 鏈接端點,這裏使用了本機鏈接,能夠修改IP地址測試遠程鏈接
        ip::tcp::endpoint ep(ip::address_v4::from_string("127.0.0.1"), 1000);
        // 鏈接服務器
        boost::system::error_code ec;
        socket.connect(ep,ec);
        // 若是出錯,打印出錯信息
        if(ec)
        {
                std::cout << boost::system::system_error(ec).what() << std::endl;
                return -1;
        }
        // 接收數據
        char buf[100];
        size_t len=socket.read_some(buffer(buf), ec);
        std::cout.write(buf, len);


        return 0;
}
小結
從演示代碼能夠得知


ASIO的TCP協議經過boost::asio::ip名 空間下的tcp類進行通訊。
IP地址(address,address_v4,address_v6)、 端口號和協議版本組成一個端點(tcp:: endpoint)。用於在服務器端生成tcp::acceptor對 象,並在指定端口上等待鏈接;或者在客戶端鏈接到指定地址的服務器上。
socket是 服務器與客戶端通訊的橋樑,鏈接成功後全部的讀寫都是經過socket對 象實現的,當socket析 構後,鏈接自動斷 開。
ASIO讀寫所用的緩衝區用buffer函 數生成,這個函數生成的是一個ASIO內部使用的緩衝區類,它能把數組、指針(同時指定大 小)、std::vector、std::string、boost::array包裝成緩衝區類。
ASIO中的函數、類方法都接受一個boost::system::error_code類 型的數據,用於提供出錯碼。它能夠轉換成bool測試是否出錯,並經過boost::system::system_error類 得到詳細的出錯信息。另外,也能夠不向ASIO的函數或方法提供 boost::system::error_code,這時若是出錯的話就會直 接拋出異常,異常類型就是boost::system:: system_error(它是從std::runtime_error繼承的)。
ASIO的異步方式
嗯?異步方式好像有點坐不住了,那就請異步方式上場,你們歡迎...


自我介紹
你們好,我是異步方式


和同步方式不一樣,我歷來不花時間去等那些龜速的IO操做,我只是向系統說一聲要作什麼,而後就能夠作其它事去了。若是系統完成了操做, 系統就會經過我以前給它的回調對象來通知我。


在ASIO庫中,異步方式的函數或方法名稱前面都有「async_」 前綴,函數參數裏會要求放一個回調函數(或仿函數)。異步操做執行 後無論有沒有完成都會當即返回,這時能夠作一些其它事,直到回調函數(或仿函數)被調用,說明異步操做已經完成。


在ASIO中不少回調函數都只接受一個boost::system::error_code參數,在實際使用時確定是不夠的,因此通常 使用仿函數攜帶一堆相關數據做爲回調,或者使用boost::bind來綁定一堆數據。


另外要注意的是,只有io_service類的run()方法運行以後回調對象纔會被調用,不然即便系統已經完成了異步操做也不會有任 務動做。


示例代碼
好了,就介紹到這裏,下面是我帶來的異步方式TCP Helloworld 服務器端:


#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/smart_ptr.hpp>


using namespace boost::asio;
using boost::system::error_code;
using ip::tcp;


struct CHelloWorld_Service
{
        CHelloWorld_Service(io_service &iosev)
                :m_iosev(iosev),m_acceptor(iosev, tcp::endpoint(tcp::v4(), 1000))
        {}


        void start()
        {
                // 開始等待鏈接(非阻塞)
                boost::shared_ptr<tcp::socket> psocket(new tcp::socket(m_iosev));
                // 觸發的事件只有error_code參數,因此用boost::bind把socket綁定進去
                m_acceptor.async_accept(*psocket,
                        boost::bind(&CHelloWorld_Service::accept_handler, this, psocket, _1));
        }


        // 有客戶端鏈接時accept_handler觸發
        void accept_handler(boost::shared_ptr<tcp::socket> psocket, error_code ec)
        {
                if(ec) return;
                // 繼續等待鏈接
                start();
                // 顯示遠程IP
                std::cout << psocket->remote_endpoint().address() << std::endl;
                // 發送信息(非阻塞)
                boost::shared_ptr<std::string> pstr(new std::string("hello async world!"));
                psocket->async_write_some(buffer(*pstr),
                        boost::bind(&CHelloWorld_Service::write_handler, this, pstr, _1, _2));
        }


        // 異步寫操做完成後write_handler觸發
        void write_handler(boost::shared_ptr<std::string> pstr, error_code ec,
                size_t bytes_transferred)
        {
                if(ec)
                std::cout<< "發送失敗!" << std::endl;
                else
                std::cout<< *pstr << " 已發送" << std::endl;
        }


        private:
                io_service &m_iosev;
                ip::tcp::acceptor m_acceptor;
};


int main(int argc, char* argv[])
{
        io_service iosev;
        CHelloWorld_Service sev(iosev);
        // 開始等待鏈接
        sev.start();
        iosev.run();


        return 0;
}
小結
在這個例子中,首先調用sev.start()開 始接受客戶端鏈接。因爲async_accept調 用後當即返回,start()方 法 也就立刻完成了。sev.start()在 瞬間返回後iosev.run()開 始執行,iosev.run()方法是一個循環,負責分發異步回調事件,只 有全部異步操做所有完成纔會返回。


這裏有個問題,就是要保證start()方法中m_acceptor.async_accept操 做所用的tcp::socket對 象 在整個異步操做期間保持有效(不 然系統底層異步操做了一半忽然發現tcp::socket沒了,不是拿人家開涮嘛-_-!!!),並且客戶端鏈接進來後這個tcp::socket對象還 有用呢。這裏的解決辦法是使用一個帶計數的智能指針boost::shared_ptr,並把這個指針做爲參數綁定到回調函數上。


一旦有客戶鏈接,咱們在start()裏給的回調函數accept_handler就會被 調用,首先調用start()繼續異步等待其 它客戶端的鏈接,而後使用綁定進來的tcp::socket對象與當前客戶端通訊。


發送數據也使用了異步方式(async_write_some), 一樣要保證在整個異步發送期間緩衝區的有效性,因此也用boost::bind綁定了boost::shared_ptr。


對於客戶端也同樣,在connect和read_some方法前加一個async_前綴,而後加入回調便可,你們本身練習寫一寫。


ASIO的「便民措施」
asio中提供一些便利功能,如此能夠實現許多方便的操做。


端點
回到前面的客戶端代碼,客戶端的鏈接很簡單,主要代碼就是兩行:


...
// 鏈接
socket.connect(endpoint,ec);
...
// 通訊
socket.read_some(buffer(buf), ec);
不過鏈接以前咱們必須獲得鏈接端點endpoint,也就是服務器地址、端口號以及所用的協議版本。


前面的客戶端代碼假設了服務器使用IPv4協議,服務器IP地址爲127.0.0.1,端口號爲1000。實際使用的狀況是,咱們常常只能知道服務器網絡ID,提供的服務類型,這時咱們就得使用ASIO提供的tcp::resolver類來取得服務器的端點了。





好比咱們要取得163網站的首頁,首先就要獲得「www.163.com」服務器的HTTP端點:


io_service iosev;
ip::tcp::resolver res(iosev);
ip::tcp::resolver::query query("www.163.com","80"); //www.163.com 80端口
ip::tcp::resolver::iterator itr_endpoint = res.resolve(query);
這裏的itr_endpoint是一個endpoint的迭代器,服務器的同一端口上可能不止一個端點,好比同時有IPv4和IPv6 兩種。如今,遍歷這些端點,找到可用的:


// 接上面代碼
ip::tcp::resolver::iterator itr_end; //無參數構造生成end迭代器
ip::tcp::socket socket(iosev);
boost::system::error_code ec = error::host_not_found;
for(;ec && itr_endpoint!=itr_end;++itr_endpoint)
{
        socket.close();
        socket.connect(*itr_endpoint, ec);
}
若是鏈接上,錯誤碼ec被清空,咱們就能夠與服務器通訊了:


if(ec)
{
        std::cout << boost::system::system_error(ec).what() << std::endl;
        return -1;
}
// HTTP協議,取根路徑HTTP源碼
socket.write_some(buffer("GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 "));
for(;;)
{
        char buf[128];
        boost::system::error_code error;
        size_t len = socket.read_some(buffer(buf), error);
        // 循環取數據,直到取完爲止
        if(error == error::eof)
        break;
        else if(error)
        {
                std::cout << boost::system::system_error(error).what() << std::endl;
                return -1;
        }


        std::cout.write(buf, len);
}
當全部HTTP源碼下載了之後,服務器會主動斷開鏈接,這時客戶端的錯誤碼獲得boost::asio::error::eof,咱們 要根據它來斷定是否跳出循環。


ip::tcp::resolver::query的構造函數接受服務器名和服務名。前面的服務名咱們直接使用了端口號"80",有時 咱們也能夠使用別名,用記事本打開%windir%\system32\drivers\etc\services文件(Windows環境),能夠看到 一堆別名及對應的端口,如:


echo           7/tcp                 # Echo
ftp           21/tcp                 # File Transfer Protocol (Control)
telnet        23/tcp                 # Virtual Terminal Protocol
smtp          25/tcp                 # Simple Mail Transfer Protocol
time          37/tcp  timeserver     # Time
好比要鏈接163網站的telnet端口(若是有的話),能夠這樣寫:


ip::tcp::resolver::query query("www.163.com","telnet");
ip::tcp::resolver::iterator itr_endpoint = res.resolve(query);
超時
在網絡應用裏,經常要考慮超時的問題,否則鏈接後半天沒反應誰也受不了。


ASIO庫提供了deadline_timer類來支持定時觸發,它的用法是:


// 定義定時回調
void print(const boost::system::error_code& /*e*/)
{
        std::cout << "Hello, world! ";
}

deadline_timer timer;
// 設置5秒後觸發回調
timer.expires_from_now(boost::posix_time::seconds(5));
timer.async_wait(print);
這段代碼執行後5秒鐘時打印Hello World!


咱們能夠利用這種定時機制和異步鏈接方式來實現超時取消:


deadline_timer timer;
// 異步鏈接
socket.async_connect(my_endpoint, connect_handler/*鏈接回調*/);
// 設置超時
timer.expires_from_now(boost::posix_time::seconds(5));
timer.async_wait(timer_handler);
...
// 超時發生時關閉socket
void timer_handler()
{
        socket.close();
}
最後不要忘了io_service的run()方法。


統一讀寫接口
除了前面例子所用的tcp::socket讀寫方法(read_some, write_some等)之外,ASIO也提供了幾個讀寫函數,主要有這麼幾個:


read、write、read_until、write_until
固然還有異步版本的


async_read、async_write、async_read_until、async_write_until
這些函數能夠以統一的方式讀寫TCP、串口、HANDLE等類型的數據流。


咱們前面的HTTP客戶端代碼能夠這樣改寫:


...
//socket.write_some(buffer("GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 "));
write(socket,buffer("GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 "));
...
//size_t len = socket.read_some(buffer(buf), error);
size_t len = read(socket, buffer(buf), transfer_all() ,error);
if(len) std::cout.write(buf, len);
這個read和write有多個重載,一樣,有錯誤碼參數的不會拋出異常而無錯誤碼參數的若出錯則拋出異常。


本例中read函數裏的transfer_all()是一個稱爲CompletionCondition的對象,表示讀取/寫入直接緩 衝區裝滿或出錯爲止。另外一個可選的是transfer_at_least(size_t),表示至少要讀取/寫入多少個字符。


read_until和write_until用於讀取直到某個條件知足爲止,它接受的參數再也不是buffer,而是boost::asio:: streambuf。


好比咱們能夠把咱們的HTTP客戶端代碼改爲這樣:


boost::asio::streambuf strmbuf;
size_t len = read_until(socket,strmbuf," ",error);
std::istream is(&strmbuf);
is.unsetf(std::ios_base::skipws);
// 顯示is流裏的內容
std::copy(std::istream_iterator<char>(is),
    std::istream_iterator<char>(),
    std::ostream_iterator<char>(std::cout));
基於流的操做
對於TCP協議來講,ASIO還提供了一個tcp::iostream。用它能夠更簡單地實現咱們的HTTP客戶端:


ip::tcp::iostream stream("www.163.com", "80");
if(stream)
{
        // 發送數據
        stream << "GET <a href="http://www.163.com" title="http://www.163.com">http://www.163.com</a> HTTP/1.0 ";
        // 不要忽略空白字符
        stream.unsetf(std::ios_base::skipws);
        // 顯示stream流裏的內容
        std::copy(std::istream_iterator<char>(stream),
        std::istream_iterator<char>(),
        std::ostream_iterator<char>(std::cout));
}
用ASIO編寫UDP通訊程序
ASIO的TCP協議經過boost::asio::ip名空間下的tcp類進行通訊,舉一返三:ASIO的UDP協議經過boost::asio::ip名空間下的udp類進行通訊。


咱們知道UDP是基於數據報模式的,因此事先不須要創建鏈接。就象寄信同樣,要寄給誰只要寫上地址往門口的郵箱一丟,其它的事各級郵局 包辦;要收信用只要看看自家信箱裏有沒有信件就行(或問門口傳達室老大爺)。在ASIO裏,就是udp::socket的send_to和receive_from方法(異步版本是async_send_to和asnync_receive_from)。


下面的示例代碼是從ASIO官方文檔裏拿來的(實在想不出更好的例子了:-P):


服務器端代碼


//
// server.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2008 Christopher M. Kohlhoff 
// (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. 
// (See accompanying
// file LICENSE_1_0.txt or 
// copy at <a href="http://www.boost.org/LICENSE_1_0.txt" title="http://www.boost.org/LICENSE_1_0.txt">http://www.boost.org/LICENSE_1_0.txt</a>)
//


#include <ctime>
#include <iostream>
#include <string>
#include <boost/array.hpp>
#include <boost/asio.hpp>


using boost::asio::ip::udp;


std::string make_daytime_string()
{
        using namespace std; // For time_t, time and ctime;
        time_t now = time(0);
        return ctime(&now);
}


int main()
{
        try
        {
                boost::asio::io_service io_service;
                // 在本機13端口創建一個socket
                udp::socket socket(io_service, udp::endpoint(udp::v4(), 13));


                for (;;)
                {
                        boost::array<char, 1> recv_buf;
                        udp::endpoint remote_endpoint;
                        boost::system::error_code error;
                        // 接收一個字符,這樣就獲得了遠程端點(remote_endpoint)
                        socket.receive_from(boost::asio::buffer(recv_buf),
                        remote_endpoint, 0, error);


                        if (error && error != boost::asio::error::message_size)
                                throw boost::system::system_error(error);


                        std::string message = make_daytime_string();
                        // 向遠程端點發送字符串message(當前時間)    
                        boost::system::error_code ignored_error;
                        socket.send_to(boost::asio::buffer(message),
                        remote_endpoint, 0, ignored_error);
                }
        }
        catch (std::exception& e)
        {
                std::cerr << e.what() << std::endl;
        }


        return 0;
}
客戶端代碼


//
// client.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2008 Christopher M. Kohlhoff
// (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. 
// (See accompanying file LICENSE_1_0.txt or
//  copy at <a href="http://www.boost.org/LICENSE_1_0.txt" title="http://www.boost.org/LICENSE_1_0.txt">http://www.boost.org/LICENSE_1_0.txt</a>)
//


#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>


using boost::asio::ip::udp;


int main(int argc, char* argv[])
{
        try
        {
                if (argc != 2)
                {
                        std::cerr << "Usage: client <host>" << std::endl;
                        return 1;
                }


                boost::asio::io_service io_service;
                // 取得命令行參數對應的服務器端點
                udp::resolver resolver(io_service);
                udp::resolver::query query(udp::v4(), argv[1], "daytime");
                udp::endpoint receiver_endpoint = *resolver.resolve(query);


                udp::socket socket(io_service);
                socket.open(udp::v4());
                // 發送一個字節給服務器,讓服務器知道咱們的地址
                boost::array<char, 1> send_buf  = { 0 };
                socket.send_to(boost::asio::buffer(send_buf), receiver_endpoint);
                // 接收服務器發來的數據
                boost::array<char, 128> recv_buf;
                udp::endpoint sender_endpoint;
                size_t len = socket.receive_from(
                boost::asio::buffer(recv_buf), sender_endpoint);


                std::cout.write(recv_buf.data(), len);
        }
        catch (std::exception& e)
        {
                std::cerr << e.what() << std::endl;
        }


        return 0;
}
用ASIO讀寫串行口
ASIO不只支持網絡通訊,還能支持串口通訊。要讓兩個設備使用串口通訊,關鍵是要設置好正確的參數,這些參數是:波特率、奇偶校驗 位、中止位、字符大小和流量控制。兩個串口設備只有設置了相同的參數才能互相交談。


ASIO提供了boost::asio::serial_port類,它有一個set_option(const SettableSerialPortOption& option)方法就是用於設置上面列舉的這些參數的,其中的option能夠是:


serial_port::baud_rate 波特率,構造參數爲unsigned int
serial_port::parity 奇偶校驗,構造參數爲serial_port::parity::type,enum類型,能夠是none, odd, even。
serial_port::flow_control 流量控制,構造參數爲serial_port::flow_control::type,enum類型,能夠是none software hardware
serial_port::stop_bits 中止位,構造參數爲serial_port::stop_bits::type,enum類型,能夠是one onepointfive two
serial_port::character_size 字符大小,構造參數爲unsigned int
演示代碼
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>


using namespace std;
using namespace boost::asio;


int main(int argc, char* argv[])
{
        io_service iosev;
        // 串口COM1, Linux下爲「/dev/ttyS0」
        serial_port sp(iosev, "COM1");
        // 設置參數
        sp.set_option(serial_port::baud_rate(19200));
        sp.set_option(serial_port::flow_control(serial_port::flow_control::none));
        sp.set_option(serial_port::parity(serial_port::parity::none));
        sp.set_option(serial_port::stop_bits(serial_port::stop_bits::one));
        sp.set_option(serial_port::character_size(8));
        // 向串口寫數據
        write(sp, buffer("Hello world", 12));


        // 向串口讀數據
        char buf[100];
        read(sp, buffer(buf));


        iosev.run();
        return 0;
}
上面這段代碼有個問題,read(sp, buffer(buf))非得讀滿100個字符纔會返回,串口通訊有時咱們確實能知道對方發過來的字符長度,有時候是不能的。


若是知道對方發過來的數據裏有分隔符的話(好比空格做爲分隔),能夠使用read_until來讀,好比:


boost::asio::streambuf buf;
// 一直讀到遇到空格爲止
read_until(sp, buf, ' ');
copy(istream_iterator<char>(istream(&buf)>>noskipws),
        istream_iterator<char>(),
        ostream_iterator<char>(cout));
另一個方法是使用前面說過的異步讀寫+超時的方式,代碼以下:


#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>


using namespace std;
using namespace boost::asio;
void handle_read(char *buf,boost::system::error_code ec,
std::size_t bytes_transferred)
{
        cout.write(buf, bytes_transferred);
}


int main(int argc, char* argv[])
{
        io_service iosev;
        serial_port sp(iosev, "COM1");
        sp.set_option(serial_port::baud_rate(19200));
        sp.set_option(serial_port::flow_control());
        sp.set_option(serial_port::parity());
        sp.set_option(serial_port::stop_bits());
        sp.set_option(serial_port::character_size(8));


        write(sp, buffer("Hello world", 12));


        // 異步讀
        char buf[100];
        async_read(sp, buffer(buf), boost::bind(handle_read, buf, _1, _2));
        // 100ms後超時
        deadline_timer timer(iosev);
        timer.expires_from_now(boost::posix_time::millisec(100));
        // 超時後調用sp的cancel()方法放棄讀取更多字符
        timer.async_wait(boost::bind(&serial_port::cancel, boost::ref(sp)));


        iosev.run();
        return 0;
}


 

usidc5 2011-07-08 18:32

asio自帶的例子裏是用deadline_timer的async_wait方法來實現超時的,這種方法須要單獨寫一個回調函數,不利於把鏈接和超時封裝到單個函數裏。傳統的Winsock編程能夠先把socket設爲非阻塞,而後connect,再用select來判斷超時,asio也能夠這樣作,惟一「非主流」的是asio裏沒有一個相似select的函數,因此得調用原始的Winsock API,也就犧牲了跨平臺:前端

  1. #include <iostream>  
  2. #include <boost/asio.hpp>  
  3.    
  4. int main()  
  5. {  
  6.     boost::asio::io_service ios;  
  7.     boost::asio::ip::tcp::socket s(ios);  
  8.     boost::system::error_code ec;  
  9.    
  10.     s.open(boost::asio::ip::tcp::v4());  
  11.     // 設爲非阻塞  
  12.     s.io_control(boost::asio::ip::tcp::socket::non_blocking_io(true));  
  13.     // connect時必須指定error_code參數,不然會有異常拋出  
  14.     s.connect(  
  15.         boost::asio::ip::tcp::endpoint(  
  16.         boost::asio::ip::address::from_string("192.168.1.1"), 80)  
  17.         , ec);  
  18.     fd_set fdWrite;  
  19.     FD_ZERO(&fdWrite);  
  20.     FD_SET(s.native(), &fdWrite);  
  21.     timeval tv = {5};    // 5秒超時  
  22.     if (select(0, NULL, &fdWrite, NULL, &tv) <= 0   
  23.         || !FD_ISSET(s.native(), &fdWrite))  
  24.     {  
  25.         std::cout << "超時/出錯啦" << std::endl;  
  26.         s.close();  
  27.         return 0;  
  28.     }  
  29.     // 設回阻塞  
  30.     s.io_control(boost::asio::ip::tcp::socket::non_blocking_io(false));  
  31.     std::cout << "鏈接成功" << std::endl;  
  32.     s.close();  
  33.    
  34.     return 0;  

 

usidc5 2011-07-08 18:34

全部的 asio 類都只要包含頭文件:   "asio.hpp"


例子1:   使用一個同步的定時器

#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>   //使用時間間隔
int main()
{
//全部使用 asio 的程序都必須至少擁有一個 io_service 類型的對象. 
//它提供 I/O 功能. 
boost::asio::io_service io;
//建立一個定時器 deadline_timer 的對象. 
//這種定時器有兩種狀態: 超時 和 未超時.
//在超時的狀態下. 調用它的 wait() 函數會當即返回. 
//在未超時的狀況下則阻塞. 直到變爲超時狀態.
//它的構造函數參數爲: 一個 io_service 對象(asio中主要提供IO的類都用io_service對象作構造函數第一個參數).
//                     超時時間.
//從建立開始. 它就進入 "未超時"狀態. 且持續指定的時間. 轉變到"超時"狀態.
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
//這裏等待定時器 t 超時. 因此會阻塞5秒.
t.wait();
std::cout << "Hello, world!\n";
return 0;
}






例子2: 使用一個異步的定時器
//一個將被定時器異步調用的函數. 
void print(const boost::system::error_code& /*e*/)
{
std::cout << "Hello, world!\n";
}
int main()
{
boost::asio::io_service io;
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
//和例子1不一樣. 這裏調用 async_wait() 執行一個異步的等待. 它註冊一個可執行體(即此處的print函數).   //這裏不懂的是: print的參數怎麼傳入?
//實際上. 這個執行體被註冊到 deadline_timer 類的 io_service 成員上(即本例的 io 對象). 只有在之後調用 io.run() 時這些註冊的執行體纔會被真正執行. 
t.async_wait(print);
//調用 io對象的 run() 函數執行那些被註冊的執行體. 
//這個函數不會當即返回. 除非和他相關的定時器對象超時而且在定時器超時後執行完全部註冊的執行體. 以後才返回. 
//因此它在這裏阻塞一下子. 等t超時後執行完print. 才返回.
//這裏要注意的是. 調用 io.run() 能夠放在其它線程中. 那樣全部的回調函數都在別的線程上運行.
io.run();
return 0;
}




例子3: 向超時回調函數綁定參數

// 這個例子中. 每次 定時器超時後. 都修改定時器的狀態到"未超時". 再註冊回調函數. 這樣循環 5 次. 因此 print 會被執行 5 次.
void print(const boost::system::error_code& /*e*/,
    boost::asio::deadline_timer* t, int* count)
{
if (*count < 5)
{
    std::cout << *count << "\n";
    ++(*count);
    //能夠用 deadline_timer::expires_at() 來 獲取/設置 超時的時間點. 
    //在這裏咱們將超時的時間點向後推遲一秒. 
    t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
    //再次向 t 中的 io_service 對象註冊一個回掉函數. 
    // 注意這裏綁定時. 指定了綁定到 print 的第一個參數爲: boost::asio::placeholders::error //不懂. 這個error是什麼東西. 爲何在例子2中不要綁定它?
    t->async_wait(boost::bind(print,
          boost::asio::placeholders::error, t, count));
}
}
int main()
{
boost::asio::io_service io;
int count = 0;
boost::asio::deadline_timer t(io, boost::posix_time::seconds(1));
t.async_wait(boost::bind(print,
        boost::asio::placeholders::error, &t, &count));
io.run();
std::cout << "Final count is " << count << "\n";
return 0;
}



例子4: 多線程處理定時器的回掉函數. 同步的問題.
前面的例子都只在一個線程中調用 boost::asio::io_service::run() 函數. 
向定時器註冊的回掉函數都是在調用該 run() 的線程中執行.
但實際上這個 run() 函數能夠在多個線程中同時被調用. 例如:
boost::asio::io_service io; 
//兩個定時器
boost::asio::deadline_timer t1(io, boost::posix_time::seconds(1));
t1.async_wait(func1);   
boost::asio::deadline_timer t2(io, boost::posix_time::seconds(1));
t2.async_wait(func2); 

因爲向 io 註冊了多個cmd. 這裏爲了效率咱們想讓這些cmd並行執行:
boost::thread thread1(bind(&boost::asio::io_service::run, &io);
boost::thread thread2(bind(&boost::asio::io_service::run, &io);
thread1.join();
thread2.join();
這裏在多個線程中調用 io.run() 因此咱們註冊的cmd可能在任何一個線程中運行. 
這些線程會一直等待io對象相關的定時器超時並執行相關的 cmd. 直到全部的定時器都超時. run函數才返回. 線程才結束.
但這裏有一個問題: 咱們向定時器註冊的 func1 和 func2 . 它們可能會同時訪問全局的對象(好比 cout ). 
這時咱們但願對 func1 和 func2 的調用是同步的. 即執行其中一個的時候. 另外一個要等待.
這就用到了 boost::asio::strand 類. 它能夠把幾個cmd包裝成同步執行的. 例如前面咱們向定時器註冊 func1 和 func2 時. 能夠改成:
boost::asio::strand the_strand;
t1.async_wait(the_strand.wrap(func1));      //包裝爲同步執行的
t2.async_wait(the_strand.wrap(func2)); 
這樣就保證了在任什麼時候刻. func1 和 func2 都不會同時在執行.

 

usidc5 2011-07-08 18:35
 
 
  • // test.cpp : 定義控制檯應用程序的入口點。  
 
  • //  
 
  •   
 
  • #include "stdafx.h"  
 
  • #include <boost/asio.hpp>  
 
  • #include <boost/bind.hpp>  
 
  • #include <boost/date_time/posix_time/posix_time_types.hpp>  
 
  • #include <iostream>  
 
  •   
 
  • using namespace boost::asio;  
 
  • using boost::asio::ip::tcp;  
 
  •   
 
  • class connect_handler  
 
  • {  
 
  • public:  
 
  •     connect_handler(io_service& ios)  
 
  •         : io_service_(ios),  
 
  •         timer_(ios),  
 
  •         socket_(ios)  
 
  •     {  
 
  •         socket_.async_connect(  
 
  •             tcp::endpoint(boost::asio::ip::address_v4::loopback(), 3212),  
 
  •             boost::bind(&connect_handler::handle_connect, this,  
 
  •             boost::asio::placeholders::error));  
 
  •   
 
  •         timer_.expires_from_now(boost::posix_time::seconds(5));  
 
  •         timer_.async_wait(boost::bind(&connect_handler::close, this));  
 
  •     }  
 
  •   
 
  •     void handle_connect(const boost::system::error_code& err)  
 
  •     {  
 
  •         if (err)  
 
  •         {  
 
  •             std::cout << "Connect error: " << err.message() << "\n";  
 
  •         }  
 
  •         else  
 
  •         {  
 
  •             std::cout << "Successful connection\n";  
 
  •         }  
 
  •     }  
 
  •   
 
  •     void close()  
 
  •     {  
 
  •         socket_.close();  
 
  •     }  
 
  •   
 
  • private:  
 
  •     io_service& io_service_;  
 
  •     deadline_timer timer_;  
 
  •     tcp::socket socket_;  
 
  • };  
 
  •   
 
  • int main()  
 
  • {  
 
  •     try  
 
  •     {  
 
  •         io_service ios;  
 
  •         tcp::acceptor a(ios, tcp::endpoint(tcp::v4(), 32123), 1);  
 
  •   
 
  •         // Make lots of connections so that at least some of them will block.  
 
  •         connect_handler ch1(ios);  
 
  •         //connect_handler ch2(ios);  
 
  •         //connect_handler ch3(ios);  
 
  •         //connect_handler ch4(ios);  
 
  •         //connect_handler ch5(ios);  
 
  •         //connect_handler ch6(ios);  
 
  •         //connect_handler ch7(ios);  
 
  •         //connect_handler ch8(ios);  
 
  •         //connect_handler ch9(ios);  
 
  •   
 
  •         ios.run();  
 
  •     }  
 
  •     catch (std::exception& e)  
 
  •     {  
 
  •         std::cerr << "Exception: " << e.what() << "\n";  
 
  •     }  
 
  •   
 
  •     return 0;  
 
  • }  
 

 

usidc5 2011-07-08 19:49
服務器代碼:


Servier.cpp
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <iostream>
using boost::asio::ip::tcp;
#define max_len 1024
class clientSession
:public boost::enable_shared_from_this<clientSession>
{
public:
clientSession(boost::asio::io_service& ioservice)
:m_socket(ioservice)
{
memset(data_,‘\0′,sizeof(data_));
}
~clientSession()
{}
tcp::socket& socket()
{
return m_socket;
}
void start()
{
boost::asio::async_write(m_socket,
boost::asio::buffer(「link successed!」),
boost::bind(&clientSession::handle_write,shared_from_this(),
boost::asio::placeholders::error));
/*async_read跟客戶端同樣,仍是不能進入handle_read函數,若是你能找到問題所在,請告訴我,謝謝*/
// --已經解決,boost::asio::async_read(...)讀取的字節長度不能大於數據流的長度,不然就會進入
// ioservice.run()線程等待,read後面的就不執行了。
//boost::asio::async_read(m_socket,boost::asio::buffer(data_,max_len),


//         boost::bind(&clientSession::handle_read,shared_from_this(),


//         boost::asio::placeholders::error));
//max_len能夠換成較小的數字,就會發現async_read_some能夠連續接收未收完的數據


m_socket.async_read_some(boost::asio::buffer(data_,max_len),
boost::bind(&clientSession::handle_read,shared_from_this(),
boost::asio::placeholders::error));
}
private:
void handle_write(const boost::system::error_code& error)
{
if(error)
{
m_socket.close();
}
}
void handle_read(const boost::system::error_code& error)
{
if(!error)
{
std::cout << data_ << std::endl;
//boost::asio::async_read(m_socket,boost::asio::buffer(data_,max_len),


//     boost::bind(&clientSession::handle_read,shared_from_this(),


//     boost::asio::placeholders::error));


m_socket.async_read_some(boost::asio::buffer(data_,max_len),
boost::bind(&clientSession::handle_read,shared_from_this(),
boost::asio::placeholders::error));
}
else
{
m_socket.close();
}
}
private:
tcp::socket m_socket;
char data_[max_len];
};
class serverApp
{
typedef boost::shared_ptr<clientSession> session_ptr;
public:
serverApp(boost::asio::io_service& ioservice,tcp::endpoint& endpoint)
:m_ioservice(ioservice),
acceptor_(ioservice,endpoint)
{
session_ptr new_session(new clientSession(ioservice));
acceptor_.async_accept(new_session->socket(),
boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,
new_session));
}
~serverApp()
{
}
private:
void handle_accept(const boost::system::error_code& error,session_ptr& session)
{
if(!error)
{
std::cout << 「get a new client!」 << std::endl;
//實現對每一個客戶端的數據處理


session->start();
//在這就應該看出爲何要封session類了吧,每個session就是一個客戶端


session_ptr new_session(new clientSession(m_ioservice));
acceptor_.async_accept(new_session->socket(),
boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,
new_session));
}
}
private:
boost::asio::io_service& m_ioservice;
tcp::acceptor acceptor_;
};
int main(int argc , char* argv[])
{
boost::asio::io_service myIoService;
short port = 8100/*argv[1]*/;
//咱們用的是inet4


tcp::endpoint endPoint(tcp::v4(),port);
//終端(能夠看做sockaddr_in)完成後,就要accept了


serverApp sa(myIoService,endPoint);
//數據收發邏輯


myIoService.run();
return 0;
}
客戶端代碼:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
using boost::asio::ip::tcp;
class client
{
public:
client(boost::asio::io_service& io_service,tcp::endpoint& endpoint)
: socket(io_service)//這裏就把socket實例化了
{
//鏈接服務端 connect
socket.async_connect(endpoint,
boost::bind(&client::handle_connect,this,boost::asio::placeholders::error)
);
memset(getBuffer,‘\0′,1024);
}
~client()
{}
private:
void handle_connect(const boost::system::error_code& error)
{
if(!error)
{
//一連上,就向服務端發送信息
boost::asio::async_write(socket,boost::asio::buffer(「hello,server!」),
boost::bind(&client::handle_write,this,boost::asio::placeholders::error));
/**讀取服務端發下來的信息
*這裏很奇怪,用async_read根本就不能進入handle_read函數
**/
// --已經解決,boost::asio::async_read(...)讀取的字節長度不能大於數據流的長度,不然就會進入
// ioservice.run()線程等待,read後面的就不執行了。
//boost::asio::async_read(socket,
//     boost::asio::buffer(getBuffer,1024),
//     boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
//    );
socket.async_read_some(boost::asio::buffer(getBuffer,1024),
boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
);
}
else
{
socket.close();
}
}
void handle_read(const boost::system::error_code& error)
{
if(!error)
{
std::cout << getBuffer << std::endl;
//boost::asio::async_read(socket,
//         boost::asio::buffer(getBuffer,1024),
//         boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
//        );
//這樣就能夠實現循環讀取了,至關於while(1)
//固然,到了這裏,作過網絡的朋友就應該至關熟悉了,一些邏輯就能夠自行擴展了
//想作聊天室的朋友能夠用多線程來實現
socket.async_read_some(
boost::asio::buffer(getBuffer,1024),
boost::bind(&client::handle_read,this,boost::asio::placeholders::error)
);
}
else
{
socket.close();
}
}
void handle_write(const boost::system::error_code& error)
{
}
private:
tcp::socket socket;
char getBuffer[1024];
};
int main(int argc,char* argv[])
{
//if(argc != 3)
//{
// std::cerr << 「Usage: chat_client <host> <port>\n」;
//    return 1;
//}
//我覺IO_SERVICE是一個基本性的接口,基本上一般用到的類實例都須要經過它來構造
//功能咱們能夠看似socket
boost::asio::io_service io_service;
//這個終端就是服務器
//它的定義就能夠看做時sockaddr_in,咱們用它來定義IP和PORT
tcp::endpoint endpoint(boost::asio::ip::address_v4::from_string("192.168.1.119"/*argv[1]*/),8100/*argv[2]*/);
//既然socket和sockaddr_in已經定義好了,那麼,就能夠CONNECT了
//之因此爲了要把鏈接和數據處理封成一個類,就是爲了方便管理數據,這點在服務端就會有明顯的感受了
boost::shared_ptr<client> client_ptr(new client(io_service,endpoint));
//執行收發數據的函數
io_service.run();
return 0;
}
修改192.168.1.119爲127.0.0.1,而後先運行server,再運行client,一切ok.

 

usidc5 2011-07-08 19:50
理論基礎
許多應用程序以某種方式和外界交互,例如文件,網絡,串口或者終端。某些狀況下例如網絡,獨立IO操做須要很長時間才能完成,這對程序開發造成了一個特殊的挑戰。


Boost.Asio庫提供管理這些長時間操做的工具,而且不須要使用基於線程的併發模型和顯式的鎖。


Asio庫致力於以下幾點:


移植性


高負載


效率


基於已知API例如BSD sockets的模型概念


易於使用


做爲進一步抽象的基礎


雖然asio主要關注網絡,它的異步概念也擴展到了其餘系統資源,例如串口,文件等等。


主要概念和功能
基本架構(略)
Proactor設計模式:無需額外線程的併發機制(略)
這種模型感受很像aio或者iocp,而select,epoll則應該相似於Reactor。


線程和Asio


線程安全


通常來講,併發使用不一樣的對象是安全的。但併發使用同一個對象是不安全的。不過io_service等類型的併發使用是安全的。


線程池


多個線程能夠同時調用io_service::run,多個線程是平等的。


內部線程


爲了某些特定功能,asio內部使用了thread以模擬異步,這些thread對用戶而言是不可見的。它們都符合以下原則:


它們不會直接調用任何用戶代碼
他們不會被任何信號中斷。
注意,以下幾種狀況違背了原則1。


ip::basic_resolver::async_resolve() 全部平臺


basic_socket::async_connect() windows平臺


涉及null_buffers()的任何操做 windows平臺


以上是容易理解的,asio自己儘量不建立thread,某些狀況下,例如connect,因爲windows 2k平臺下並不提供異步connect,因此asio只能用select模擬,這種狀況下不得不建立新線程。windows xp下提供connectex,但考慮到兼容性,asio彷佛並未使用。


asio徹底保證而後異步完成函數都僅在運行io_service::run的線程中被調用。


同時,建立而且管理運行io_service::run的線程是用戶的責任。


Strands:使用多線程且無需顯式鎖


有3種方式能夠顯式或隱式使用鎖。


只在一個線程中調用io_service::run,那麼全部異步完成函數都會在該線程中串行化調用
應用邏輯保證
直接使用strand
strand::wrap能夠建立一個包裹handler用於post或其餘異步調用。


Buffers
Asio支持多個buffer同時用於讀寫,相似於WSARecv裏面的WSABUF數組。mutable_buffer和const_buffer相似於WSABUF,MutableBufferSequence和ConstBufferSequence相似於WSABUF的容器。


Buffer自己不分配釋放內存,該數據結構很簡單。


vc8及以上的編譯器在debug編譯時缺省支持檢查越界等問題。其餘編譯器能夠用BOOST_ASIO_DISABLE_BUFFER_DEBUGGING打開這個開關。


流,不徹底讀和不徹底寫
許多io對象是基於流的,這意味着:


沒有消息邊界,數據是連續的字節。
讀或者寫操做可能僅傳送了要求的部分字節,這稱之爲不徹底讀/寫。
read_some,async_read_some,write_some,async_write_some則爲這種不徹底讀/寫。


系統API通常均爲這種不徹底讀寫。例如WSASend,WSARecv等等。


通常來講都須要讀/寫特定的字節。能夠用read,async_read,write,async_write。這些函數在未完成任務以前會持續調用不徹底函數。


EOF
read,async_read,read_until,async_read_until在遇到流結束時會產生一個錯誤。這是很合理的,例如要求讀4個字節,但僅讀了1個字節socket就關閉了。在handle_read中error_code將提示一個錯誤。


Reactor類型的操做
有些應用程序必須集成第3方的庫,這些庫但願本身執行io操做。


這種操做相似於select,考察select和aio的區別,前者是獲得完成消息,而後再執行同步讀操做,aio是預發異步讀操做,在完成消息到來時,讀操做已經完成。


null_buffer設計用來實現這類操做。


ip::tcp::socket socket(my_io_service);
...
ip::tcp::socket::non_blocking nb(true);
socket.io_control(nb);
...
socket.async_read_some(null_buffers(), read_handler);
...
void read_handler(boost::system::error_code ec)
{
  if (!ec)
  {
    std::vector<char> buf(socket.available());
    socket.read_some(buffer(buf));
  }
}
注意通常asio的用法和這明顯不一樣。以上代碼很是相似select的方式。
常規代碼是:
boost::asio::async_read(socket_,boost::asio::buffer(data,length),handle_read);
void handle_read(){…}
行操做
許多應用協議都是基於行的,例如HTTP,SMTP,FTP。爲了簡化這類操做,Asio提供read_until以及async_read_until。
例如:
class http_connection
{
  ...


  void start()
  {
    boost::asio::async_read_until(socket_, data_, "/r/n",
        boost::bind(&http_connection::handle_request_line, this, _1));
  }


  void handle_request_line(boost::system::error_code ec)
  {
    if (!ec)
    {
      std::string method, uri, version;
      char sp1, sp2, cr, lf;
      std::istream is(&data_);
      is.unsetf(std::ios_base::skipws);
      is >> method >> sp1 >> uri >> sp2 >> version >> cr >> lf;
      ...
    }
  }


  ...


  boost::asio::ip::tcp::socket socket_;
  boost::asio::streambuf data_;
};
read_until,async_read_until支持的判斷類型能夠是char,string以及boost::regex,它還支持自定義匹配函數。
如下例子是持續讀,一直到讀到空格爲止:
typedef boost::asio::buffers_iterator<
    boost::asio::streambuf::const_buffers_type> iterator;


std::pair<iterator, bool>
match_whitespace(iterator begin, iterator end)
{
  iterator i = begin;
  while (i != end)
    if (std::isspace(*i++))
      return std::make_pair(i, true);
  return std::make_pair(i, false);
}
...
boost::asio::streambuf b;
boost::asio::read_until(s, b, match_whitespace);



如下例子是持續讀,直到讀到特定字符爲止:
class match_char
{
public:
  explicit match_char(char c) : c_(c) {}


  template <typename Iterator>
  std::pair<Iterator, bool> operator()(
      Iterator begin, Iterator end) const
  {
    Iterator i = begin;
    while (i != end)
      if (c_ == *i++)
        return std::make_pair(i, true);
    return std::make_pair(i, false);
  }


private:
  char c_;
};


namespace boost { namespace asio {
  template <> struct is_match_condition<match_char>
    : public boost::true_type {};
} } // namespace boost::asio
...
boost::asio::streambuf b;
boost::asio::read_until(s, b, match_char('a'));



自定義內存分配
Asio不少地方都須要複製拷貝handlers,缺省狀況下,使用new/delete,若是handlers提供


void* asio_handler_allocate(size_t, ...);
void asio_handler_deallocate(void*, size_t, ...);
則會調用這兩個函數來進行分配和釋放。
The implementation guarantees that the deallocation will occur before the associated handler is invoked, which means the memory is ready to be reused for any new asynchronous operations started by the handler.


若是在完成函數中再發起一個異步請求,那麼這塊內存能夠重用,也就是說,若是永遠僅有一個異步請求在未完成的狀態,那麼僅須要一塊內存就足夠用於asio的handler copy了。


The custom memory allocation functions may be called from any user-created thread that is calling a library function. The implementation guarantees that, for the asynchronous operations included the library, the implementation will not make concurrent calls to the memory allocation functions for that handler. The implementation will insert appropriate memory barriers to ensure correct memory visibility should allocation functions need to be called from different threads.


以上這段不很清楚,不明白多線程環境下,asio_handler_allocate是否要考慮同步問題。


Custom memory allocation support is currently implemented for all asynchronous operations with the following exceptions:


ip::basic_resolver::async_resolve() on all platforms.
basic_socket::async_connect() on Windows.
Any operation involving null_buffers() on Windows, other than an asynchronous read performed on a stream-oriented socket.

 

usidc5 2011-07-08 22:50
boost::asio是一個高性能的網絡開發庫,Windows下使用IOCP,Linux下使用epoll。與ACE不一樣的是,它並無提供一個網絡框架,而是採起組件的方式來提供應用接口。可是對於常見的狀況,採用一個好用的框架仍是可以簡化開發過程,特別是asio的各個異步接口的用法都至關相似。
  受到 SP Server 框架的影響,我使用asio大體實現了一個多線程的半異步半同步服務器框架,如下是利用它來實現一個Echo服務器:

1. 實現回調:

    static void onSessionStarted(RequestPtr const& request, ResponsePtr const& response) {   request->setReadMode(Session::READ_LN); // 設置爲行讀取}static void onSession(RequestPtr const& request, ResponsePtr const& response) {   print(request->message()); //打印收到的消息   response->addReply(request->message()); //回送消息   response->close();}

複製代碼
說明:close()是一個關閉請求,它並不立刻關閉Session,而是等待全部與該Session相關的異步操做所有結束後才關閉。

2. 一個單線程的Echo服務器:
    void server_main() {unsigned short port = 7;AsioService svc;AsioTcpServer tcp(svc, port);svc.callbacks().sessionStarted = &onSessionStarted;svc.callbacks().sessionHandle = &onSession;svc.run();}

複製代碼
3. 一個多線程的Echo服務器(半異步半同步:一個主線程,4個工做者線程)
    void server_main2() {unsigned short port = 7;int num_threads = 4;AsioService svc;AsioService worker(AsioService::HAS_WORK);AsioTcpServer tcp(svc, port);svc.callbacks().sessionStarted = worker.wrap(&onSessionStarted);svc.callbacks().sessionHandle = worker.wrap(&onSession);AsioThreadPool thr(worker, num_threads);svc.run();}

複製代碼
  有了這樣一個思路,實現起來就很容易了。重點是如下兩點:
  1。緩衝區的管理與內存池的使用
  2。爲了保證Session的線程安全,必需要設置一個掛起狀態。
      
     還有一個好處,就是徹底隔絕了asio的應用接口,不用再忍受asio漫長的編譯時間了。代碼就不貼在這裏了,有興趣的能夠經過email 探討。(說明,這裏只提出一個思路,再也不提供源代碼,請各位見諒)

 

usidc5 2011-07-08 22:54
2. 同步Timer
本章介紹asio如何在定時器上進行阻塞等待(blocking wait). 
實現,咱們包含必要的頭文件. 
全部的asio類能夠簡單的經過include "asio.hpp"來調用.
#include <iostream>
#include <boost/asio.hpp>
此外,這個示例用到了timer,咱們還要包含Boost.Date_Time的頭文件來控制時間.
#include <boost/date_time/posix_time/posix_time.hpp>
使用asio至少須要一個boost::asio::io_service對象.該類提供了訪問I/O的功能.咱們首先在main函數中聲明它.
int main()
{
    boost::asio::io_service io;
下一步咱們聲明boost::asio::deadline_timer對象.這個asio的核心類提供I/O的功能(這裏更確切的說是定時功能),老是把一個io_service對象做爲他的第一個構造函數,而第二個構造函數的參數設定timer會在5秒後到時(expired).
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
這個簡單的示例中咱們演示了定時器上的一個阻塞等待.就是說,調用boost::asio::deadline_timer::wait()的在建立後5秒內(注意:不是等待開始後),timer到時以前不會返回任何值. 
一個deadline_timer只有兩種狀態:到時,未到時. 
若是boost::asio::deadline_timer::wait()在到時的timer對象上調用,會當即return.
t.wait();
最後,咱們輸出理所固然的"Hello, world!"來演示timer到時了.
    std::cout << "Hello, world! ";
    return 0;
}
完整的代碼:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
int main()
{
    boost::asio::io_service io;
    boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
    t.wait();
    std::cout << "Hello, world! ";
    return 0;
}

3. 異步Timer
#include <iostream>
#include <asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
asio的異步函數會在一個異步操做完成後被回調.這裏咱們定義了一個將被回調的函數.
void print(const asio::error& /*e*/)
{
    std::cout << "Hello, world! ";
}
int main()
{
    asio::io_service io;
    asio::deadline_timer t(io, boost::posix_time::seconds(5));
這裏咱們調用asio::deadline_timer::async_wait()來異步等待
t.async_wait(print);
最後,咱們必須調用asio::io_service::run(). 
asio庫只會調用那個正在運行的asio::io_service::run()的回調函數. 
若是asio::io_service::run()不被調用,那麼回調永遠不會發生. 
asio::io_service::run()會持續工做到點,這裏就是timer到時,回調完成. 
別忘了在調用 asio::io_service::run()以前設置好io_service的任務.好比,這裏,若是咱們忘記先調用asio::deadline_timer::async_wait()則asio::io_service::run()會在瞬間return.
    io.run();
    return 0;
}
完整的代碼:
#include <iostream>
#include <asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
void print(const asio::error& /*e*/)
{
    std::cout << "Hello, world! ";
}
int main()
{
    asio::io_service io;
    asio::deadline_timer t(io, boost::posix_time::seconds(5));
    t.async_wait(print);
    io.run();
    return 0;
}
4. 回調函數的參數
這裏咱們將每秒回調一次,來演示如何回調函數參數的含義
#include <iostream>
#include <asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
首先,調整一下timer的持續時間,開始一個異步等待.顯示,回調函數須要訪問timer來實現週期運行,因此咱們再介紹兩個新參數
指向timer的指針
一個int*來指向計數器
void print(const asio::error& /*e*/,
    asio::deadline_timer* t, int* count)
{
咱們打算讓這個函數運行6個週期,然而你會發現這裏沒有顯式的方法來終止io_service.不過,回顧上一節,你會發現當 asio::io_service::run()會在全部任務完成時終止.這樣咱們當計算器的值達到5時(0爲第一次運行的值),再也不開啓一個新的異步等待就能夠了.
    if (*count < 5)
    {
        std::cout << *count << " ";
        ++(*count);
...
而後,咱們推遲的timer的終止時間.經過在原先的終止時間上增長延時,咱們能夠確保timer不會在處理回調函數所需時間內的到期. 
(原文:By calculating the new expiry time relative to the old, we can ensure that the timer does not drift away from the whole-second mark due to any delays in processing the handler.)
t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
而後咱們開始一個新的同步等待.如您所見,咱們用把print和他的多個參數用boost::bind函數合成一個的形爲void(const asio::error&)回調函數(準確的說是function object). 
在這個例子中, boost::bind的asio::placeholders::error參數是爲了給回調函數傳入一個error對象.當進行一個異步操做,開始 boost::bind時,你須要使用它來匹配回調函數的參數表.下一節中你會學到回調函數不須要error參數時能夠省略它.
     t->async_wait(boost::bind(print,
        asio::placeholders::error, t, count));
    }
}
int main()
{
    asio::io_service io;
    int count = 0;
    asio::deadline_timer t(io, boost::posix_time::seconds(1));
和上面同樣,咱們再一次使用了綁定asio::deadline_timer::async_wait()
t.async_wait(boost::bind(print,
    asio::placeholders::error, &t, &count));
io.run();
在結尾,咱們打印出的最後一次沒有設置timer的調用的count的值
    std::cout << "Final count is " << count << " ";
    return 0;
}
完整的代碼:
#include <iostream>
#include <asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
void print(const asio::error& /*e*/,
  bsp;     asio::deadline_timer* t, int* count)
{
    if (*count < 5)
    {
        std::cout << *count << " ";
        ++(*count);
        t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
        t->async_wait(boost::bind(print,
                    asio::placeholders::error, t, count));
    }
}
int main()
{
    asio::io_service io;
    int count = 0;
    asio::deadline_timer t(io, boost::posix_time::seconds(1));
    t.async_wait(boost::bind(print,
                asio::placeholders::error, &t, &count));
    io.run();
    std::cout << "Final count is " << count << " ";
    return 0;
}

5. 成員函數做爲回調函數
本例的運行結果和上一節相似
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
咱們先定義一個printer類
class printer
{
public:
//構造函數有一個io_service參數,而且在初始化timer_時用到了它.用來計數的count_這裏一樣做爲了成員變量
    printer(boost::asio::io_service& io)
        : timer_(io, boost::posix_time::seconds(1)),
            count_(0)
    {
boost::bind 一樣能夠出色的工做在成員函數上.衆所周知,全部的非靜態成員函數都有一個隱式的this參數,咱們須要把this做爲參數bind到成員函數上.和上一節相似,咱們再次用bind構造出void(const boost::asio::error&)形式的函數. 
注意,這裏沒有指定boost::asio::placeholders::error佔位符,由於這個print成員函數沒有接受一個error對象做爲參數.
timer_.async_wait(boost::bind(&printer::print, this));

在類的折構函數中咱們輸出最後一次回調的count的值
~printer()
{
    std::cout << "Final count is " << count_ << " ";
}


print函數於上一節的十分相似,可是用成員變量取代了參數.
    void print()
    {
        if (count_ < 5)
        {
            std::cout << count_ << " ";
            ++count_;
            timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
            timer_.async_wait(boost::bind(&printer::print, this));
        }
    }
private:
    boost::asio::deadline_timer timer_;
    int count_;
};

如今main函數清爽多了,在運行io_service以前只須要簡單的定義一個printer對象.
int main()
{
    boost::asio::io_service io;
    printer p(io);
    io.run();
    return 0;
}
完整的代碼:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
class printer
{
    public:
        printer(boost::asio::io_service& io)
            : timer_(io, boost::posix_time::seconds(1)),
            count_(0)
    {
        timer_.async_wait(boost::bind(&printer::print, this));
    }
        ~printer()
        {
            std::cout << "Final count is " << count_ << " ";
        }
        void print()
        {
            if (count_ < 5)
            {
                std::cout << count_ << " ";
                ++count_;
                timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
                timer_.async_wait(boost::bind(&printer::print, this));
            }
        }
    private:
        boost::asio::deadline_timer timer_;
        int count_;
};
int main()
{
    boost::asio::io_service io;
    printer p(io);
    io.run();
    return 0;
}


6. 多線程回調同步
本節演示了使用boost::asio::strand在多線程程序中進行回調同步(synchronise). 
先前的幾節闡明瞭如何在單線程程序中用boost::asio::io_service::run()進行同步.如您所見,asio庫確保 僅噹噹前線程調用boost::asio::io_service::run()時產生回調.顯然,僅在一個線程中調用 boost::asio::io_service::run() 來確保回調是適用於併發編程的. 
一個基於asio的程序最好是從單線程入手,可是單線程有以下的限制,這一點在服務器上尤爲明顯:
當回調耗時較長時,反應遲鈍.
在多核的系統上無能爲力
若是你發覺你陷入了這種困擾,能夠替代的方法是創建一個boost::asio::io_service::run()的線程池.然而這樣就容許回調函數併發執行.因此,當回調函數須要訪問一個共享,線程不安全的資源時,咱們須要一種方式來同步操做.
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
在上一節的基礎上咱們定義一個printer類,這次,它將並行運行兩個timer
class printer
{
public:
除了聲明瞭一對boost::asio::deadline_timer,構造函數也初始化了類型爲boost::asio::strand的strand_成員. 
boost::asio::strand 能夠分配的回調函數.它保證不管有多少線程調用了boost::asio::io_service::run(),下一個回調函數僅在前一個回調函數完成後開始,固然回調函數仍然能夠和那些不使用boost::asio::strand分配,或是使用另外一個boost::asio::strand分配的回調函數一塊兒併發執行.
printer(boost::asio::io_service& io)
    : strand_(io),
    timer1_(io, boost::posix_time::seconds(1)),
    timer2_(io, boost::posix_time::seconds(1)),
    count_(0)
{
當一個異步操做開始時,用boost::asio::strand來 "wrapped(包裝)"回調函數.boost::asio::strand::wrap()會返回一個由boost::asio::strand分配的新的handler(句柄),這樣,咱們能夠確保它們不會同時運行.
    timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
    timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
}
~printer()
{
    std::cout << "Final count is " << count_ << " ";
}


多線程程序中,回調函數在訪問共享資源前須要同步.這裏共享資源是std::cout 和count_變量. 
    void print1()
    {
        if (count_ < 10)
        {
            std::cout << "Timer 1: " << count_ << " ";
            ++count_;
            timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
            timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
        }
    }
    void print2()
    {
        if (count_ < 10)
        {
            std::cout << "Timer 2: " << count_ << " ";
            ++count_;
            timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
            timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
        }
    }
private:
    boost::asio::strand strand_;
    boost::asio::deadline_timer timer1_;
    boost::asio::deadline_timer timer2_;
    int count_;
};
main函數中boost::asio::io_service::run()在兩個線程中被調用:主線程、一個boost::thread線程. 
正如單線程中那樣,併發的boost::asio::io_service::run()會一直運行直到完成任務.後臺的線程將在全部異步線程完成後終結. 
int main()
{
    boost::asio::io_service io;
    printer p(io);
    boost::thread t(boost::bind(&boost::asio::io_service::run, &io));
    io.run();
    t.join();
    return 0;
}
完整的代碼:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
class printer
{
public:
        printer(boost::asio::io_service& io)
            : strand_(io),
            timer1_(io, boost::posix_time::seconds(1)),
            timer2_(io, boost::posix_time::seconds(1)),
            count_(0)
    {
        timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
        timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
    }
        ~printer()
        {
            std::cout << "Final count is " << count_ << " ";
        }
        void print1()
        {
            if (count_ < 10)
            {
                std::cout << "Timer 1: " << count_ << " ";
                ++count_;
                timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
                timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
            }
        }
        void print2()
        {
            if (count_ < 10)
            {
                std::cout << "Timer 2: " << count_ << " ";
                ++count_;
                timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
                timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
            }
        }
private:
        boost::asio::strand strand_;
        boost::asio::deadline_timer timer1_;
        boost::asio::deadline_timer timer2_;
        int count_;
};
int main()
{
    boost::asio::io_service io;
    printer p(io);
    boost::thread t(boost::bind(&boost::asio::io_service::run, &io));
    io.run();
    t.join();
    return 0;
}



7. TCP客戶端:對準時間
#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>
本程序的目的是訪問一個時間同步服務器,咱們須要用戶指定一個服務器(如time-nw.nist.gov),用IP亦可. 
(譯者注:日期查詢協議,這種時間傳輸協議不指定固定的傳輸格式,只要求按照ASCII標準發送數據。)
using boost::asio::ip::tcp;
int main(int argc, char* argv[])
{
    try
    {
        if (argc != 2)
        {
            std::cerr << "Usage: client <host>" << std::endl;
            return 1;
            }
用asio進行網絡鏈接至少須要一個boost::asio::io_service對象
boost::asio::io_service io_service;


咱們須要把在命令行參數中指定的服務器轉換爲TCP上的節點.完成這項工做須要boost::asio::ip::tcp::resolver對象
tcp::resolver resolver(io_service);


一個resolver對象查詢一個參數,並將其轉換爲TCP上節點的列表.這裏咱們把argv[1]中的sever的名字和要查詢字串daytime關聯.
tcp::resolver::query query(argv[1], "daytime");


節點列表能夠用 boost::asio::ip::tcp::resolver::iterator 來進行迭代.iterator默認的構造函數生成一個end iterator.
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
tcp::resolver::iterator end;
如今咱們創建一個鏈接的sockert,因爲得到節點既有IPv4也有IPv6的.因此,咱們須要依次嘗試他們直到找到一個能夠正常工做的.這步使得咱們的程序獨立於IP版本
tcp::socket socket(io_service);
boost::asio::error error = boost::asio::error::host_not_found;
while (error && endpoint_iterator != end)
{
    socket.close();
    socket.connect(*endpoint_iterator++, boost::asio::assign_error(error));
}
if (error)
    throw error;
鏈接完成,咱們須要作的是讀取daytime服務器的響應. 
咱們用boost::array來保存獲得的數據,boost::asio::buffer()會自動根據array的大小暫停工做,來防止緩衝溢出.除了使用boost::array,也能夠使用char [] 或std::vector.
for (;;)
{
    boost::array<char, 128> buf;
    boost::asio::error error;
    size_t len = socket.read_some(
        boost::asio::buffer(buf), boost::asio::assign_error(error));
當服務器關閉鏈接時,boost::asio::ip::tcp::socket::read_some()會用boost::asio::error::eof標誌完成, 這時咱們應該退出讀取循環了.
if (error == boost::asio::error::eof)
    break; // Connection closed cleanly by peer.
else if (error)
    throw error; // Some other error.
std::cout.write(buf.data(), len);

若是發生了什麼異常咱們一樣會拋出它
}
catch (std::exception& e)
{
    std::cerr << e.what() << std::endl;
}


運行示例:在windowsXP的cmd窗口下 
輸入:upload.exe time-a.nist.gov
輸出:54031 06-10-23 01:50:45 07 0 0 454.2 UTC(NIST) *
完整的代碼:
#include <iostream>
#include <boost/array.hpp>
#include <asio.hpp>
using asio::ip::tcp;
int main(int argc, char* argv[])
{
    try
    {
        if (argc != 2)
        {
            std::cerr << "Usage: client <host>" << std::endl;
            return 1;
        }
        asio::io_service io_service;
        tcp::resolver resolver(io_service);
        tcp::resolver::query query(argv[1], "daytime");
        tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        tcp::resolver::iterator end;
        tcp::socket socket(io_service);
        asio::error error = asio::error::host_not_found;
        while (error && endpoint_iterator != end)
        {
            socket.close();
            socket.connect(*endpoint_iterator++, asio::assign_error(error));
        }
        if (error)
            throw error;
        for (;;)
        {
            boost::array<char, 128> buf;
            asio::error error;
            size_t len = socket.read_some(
                    asio::buffer(buf), asio::assign_error(error));
            if (error == asio::error::eof)
                break; // Connection closed cleanly by peer.
            else if (error)
                throw error; // Some other error.
            std::cout.write(buf.data(), len);
        }
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

8. TCP同步時間服務器
#include <ctime>
#include <iostream>
#include <string>
#include <asio.hpp>
using asio::ip::tcp;
咱們先定義一個函數返回當前的時間的string形式.這個函數會在咱們全部的時間服務器示例上被使用.
std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}
int main()
{
    try
    {
        asio::io_service io_service;
新建一個asio::ip::tcp::acceptor對象來監聽新的鏈接.咱們監聽TCP端口13,IP版本爲V4
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));


這是一個iterative server,也就是說同一時間只能處理一個鏈接.創建一個socket來表示一個和客戶端的鏈接, 而後等待客戶端的鏈接.
for (;;)
{
    tcp::socket socket(io_service);
    acceptor.accept(socket);
當客戶端訪問服務器時,咱們獲取當前時間,而後返回它.
        std::string message = make_daytime_string();
        asio::write(socket, asio::buffer(message),
            asio::transfer_all(), asio::ignore_error());
    }
}
最後處理異常
catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;

運行示例:運行服務器,而後運行上一節的客戶端,在windowsXP的cmd窗口下 
輸入:client.exe 127.0.0.1 
輸出:Mon Oct 23 09:44:48 2006
完整的代碼:
#include <ctime>
#include <iostream>
#include <string>
#include <asio.hpp>
using asio::ip::tcp;
std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}
int main()
{
    try
    {
        asio::io_service io_service;
        tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));
        for (;;)
        {
            tcp::socket socket(io_service);
            acceptor.accept(socket);
            std::string message = make_daytime_string();
            asio::write(socket, asio::buffer(message),
                    asio::transfer_all(), asio::ignore_error());
        }
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

 

usidc5 2011-07-08 22:55


構造函數


構造函數的主要動做就是調用CreateIoCompletionPort建立了一個初始iocp。


Dispatch和post的區別


Post必定是PostQueuedCompletionStatus而且在GetQueuedCompletionStatus 以後執行。


Dispatch會首先檢查當前thread是否是io_service.run/runonce/poll/poll_once線程,若是是,則直接運行。


poll和run的區別


二者代碼幾乎同樣,都是首先檢查是否有outstanding的消息,若是沒有直接返回,不然調用do_one()。惟一的不一樣是在調用size_t do_one(bool block, boost::system::error_code& ec)時前者block = false,後者block = true。


該參數的做用體如今:


BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,


&completion_key, &overlapped, block ? timeout : 0);


所以能夠看出,poll處理的是已經完成了的消息,也即GetQueuedCompletionStatus馬上能返回的。而run則會致使等待。


poll 的做用是依次處理當前已經完成了的消息,直到全部已經完成的消息處理完成爲止。若是沒有已經完成了得消息,函數將退出。poll不會等待。這個函數有點相似於PeekMessage。鑑於PeekMessage不多用到,poll的使用場景我也有點疑惑。poll的一個應用場景是若是但願handler的處理有優先級,也即,若是消息完成速度很快,同時可能完成多個消息,而消息的處理過程可能比較耗時,那麼能夠在完成以後的消息處理函數中不真正處理數據,而是把handler保存在隊列中,而後按優先級統一處理。代碼以下:


while (io_service.run_one()) { 
    // The custom invocation hook adds the handlers to the priority queue 
    // rather than executing them from within the poll_one() call. 
    while (io_service.poll_one())      ;
    pri_queue.execute_all(); }


循環執行poll_one讓已經完成的消息的wrap_handler處理完畢,也即插入一個隊列中,而後再統一處理之。這裏的wrap_handler是一個class,在post的時候,用以下代碼:


io_service.post(pri_queue.wrap(0, low_priority_handler));或者 acceptor.async_accept(server_socket, pri_queue.wrap(100, high_priority_handler));


template <typename Handler> wrapped_handler<Handler> handler_priority_queue::wrap(int priority, Handler handler) 
{    return wrapped_handler<Handler>(*this, priority, handler); }


參見boost_asio/example/invocation/prioritised_handlers.cpp


這個sample也同時表現了wrap的使用場景。


也即把handler以及參數都wrap成一個object,而後把object插入一個隊列,在pri_queue.execute_all中按優先級統一處理。


run的做用是處理消息,若是有消息未完成將一直等待到全部消息完成並處理以後才退出。


reset和stop


文檔中reset的解釋是重置io_service以便下一次調用。


當 run,run_one,poll,poll_one是被stop掉致使退出,或者因爲完成了全部任務(正常退出)致使退出時,在調用下一次 run,run_one,poll,poll_one以前,必須調用此函數。reset不能在run,run_one,poll,poll_one正在運行時調用。若是是消息處理handler(用戶代碼)拋出異常,則能夠在處理以後直接繼續調用 io.run,run_one,poll,poll_one。 例如:


boost::asio::io_service io_service;  
...  
for (;;){  
  try 
  {  
    io_service.run();  
    break; // run() exited normally  
  }  
  catch (my_exception& e)  
  {  
    // Deal with exception as appropriate.  
  }  

在拋出了異常的狀況下,stopped_還沒來得及被asio設置爲1,因此無需調用reset。
reset函數的代碼僅有一行:


void reset()  
{  
::InterlockedExchange(&stopped_, 0);  

也即,當io.stop時,會設置stopped_=1。當完成全部任務時,也會設置。


總的來講,單線程狀況下,無論io.run是如何退出的,在下一次調用io.run以前調用一次reset沒有什麼壞處。例如:


for(;;)  
{  
try 
{  
io.run();  
}  
catch(…)  
{  
}  
io.reset();  
}  

若是是多線程在運行io.run,則應該當心,由於reset必須是全部的run,run_one,poll,poll_one退出後才能調用。


文檔中的stop的解釋是中止io_service的處理循環。


此函數不是阻塞函數,也即,它僅僅只是給iocp發送一個退出消息而並非等待其真正退出。由於poll和poll_one原本就不等待(GetQueuedCompletionStatus時timeout = 0),因此此函數對poll和poll_one無心義。對於run_one來講,若是該事件還未完成,則run_one會馬上返回。若是該事件已經完成,而且還在處理中,則stop並沒有特殊意義(會等待handler完成後天然退出)。對於run來講,stop的調用會致使run中的 GetQueuedCompletionStatus馬上返回。而且因爲設置了stopped = 1,此前完成的消息的handlers也不會被調用。考慮一下這種狀況:在io.stop以前,有1k個消息已經完成但還沒有處理,io.run正在依次從 GetQueuedCompletionStatus中得到信息而且調用handlers,調用io.stop設置stopped=1將致使後許 GetQueuedCompletionStatus返回的消息直接被丟棄,直到收到退出消息並退出io.run爲止。


void stop()  
{  
if (::InterlockedExchange(&stopped_, 1) == 0)  
{  
if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))  
{  
DWORD last_error = ::GetLastError();  
boost::system::system_error e(  
boost::system::error_code(last_error,  
boost::asio::error::get_system_category()),  
"pqcs");  
boost::throw_exception(e);  
}  
}  

注意除了讓當前代碼退出以外還有一個反作用就是設置了stopped_=1。這個反作用致使在stop以後若是不調用reset,全部run,run_one,poll,poll_one都將直接退出。


另外一個須要注意的是,stop會致使全部未完成的消息以及完成了但還沒有處理得消息都直接被丟棄,不會致使handlers倍調用。


注意這兩個函數都不會CloseHandle(iocp.handle_),那是析構函數乾的事情。


注意此處有個細節:一次PostQueuedCompletionStatus僅致使一次 GetQueuedCompletionStatus返回,那麼若是有多個thread此時都在io.run,而且block在 GetQueuedCompletionStatus時,調用io.stop將PostQueuedCompletionStatus而且致使一個 thread的GetQueuedCompletionStatus返回。那麼其餘的thread呢?進入io_service的do_one(由run 函數調用)代碼能夠看到,當GetQueuedCompletionStatus返回而且發現是退出消息時,會再發送一次 PostQueuedCompletionStatus。代碼以下:


else 
{  
    // Relinquish responsibility for dispatching timers. If the io_service  
    // is not being stopped then the thread will get an opportunity to  
    // reacquire timer responsibility on the next loop iteration.  
    if (dispatching_timers)  
    {  
      ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);  
    }  
    // The stopped_ flag is always checked to ensure that any leftover  
    // interrupts from a previous run invocation are ignored.  

    if (::InterlockedExchangeAdd(&stopped_, 0) != 0)  
    {  
      // Wake up next thread that is blocked on GetQueuedCompletionStatus.  
      if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))  
      {  
        last_error = ::GetLastError();  
        ec = boost::system::error_code(last_error,  
            boost::asio::error::get_system_category());  
        return 0;  
      }  
      ec = boost::system::error_code();  
      return 0;  
    }  
}  

Wrap


這個函數是一個語法糖。


Void func(int a);


io_service.wrap(func)(a);


至關於io_service.dispatch(bind(func,a));


能夠保存io_service.wrap(func)到g,以便在稍後某些時候調用g(a);


例如:


socket_.async_read_some(boost::asio::buffer(buffer_),      strand_.wrap( 
        boost::bind(&connection::handle_read, shared_from_this(), 
          boost::asio::placeholders::error, 
          boost::asio::placeholders::bytes_transferred)));


這是一個典型的wrap用法。注意async_read_some要求的參數是一個handler,在read_some結束後被調用。因爲但願真正被調用的handle_read是串行化的,在這裏再post一個消息給io_service。以上代碼相似於:


void A::func(error,bytes_transferred)  
{  
strand_.dispatch(boost::bind(handle_read,shared_from_this(),error,bytes_transferred);  
}  
socket_.async_read_some(boost::asio::buffer(buffer_), func); 
注意1點:


io_service.dispatch(bind(func,a1,…an)),這裏面都是傳值,沒法指定bind(func,ref(a1)…an)); 因此若是要用ref語義,則應該在傳入wrap時顯式指出。例如:


void func(int& i){i+=1;}  
void main()  
{  
int i = 0;  
boost::asio::io_service io;  
io.wrap(func)(boost::ref(i));  
io.run();  
printf("i=%d/n");  

固然在某些場合下,傳遞shared_ptr也是能夠的(也許更好)。


從handlers拋出的異常的影響


當handlers拋出異常時,該異常會傳遞到本線程最外層的io.run,run_one,poll,poll_one,不會影響其餘線程。捕獲該異常是程序員本身的責任。


例如:





boost::asio::io_service io_service;  

Thread1,2,3,4()  
{  
for (;;)  
{  
try 
{  
io_service.run();  
break; // run() exited normally  
}  
catch (my_exception& e)  
{  
// Deal with exception as appropriate.  
}  
}  
}  

Void func(void)  
{  
throw 1;  
}  

Thread5()  
{  
io_service.post(func);  

注意這種狀況下無需調用io_service.reset()。


這種狀況下也不能調用reset,由於調用reset以前必須讓全部其餘線程正在調用的io_service.run退出。(reset調用時不能有任何run,run_one,poll,poll_one正在運行)


Work


有些應用程序但願在沒有pending的消息時,io.run也不退出。好比io.run運行於一個後臺線程,該線程在程序的異步請求發出以前就啓動了。


能夠經過以下代碼實現這種需求:


main()  
{  
boost::asio::io_service io_service;  
boost::asio::io_service::work work(io_service);  
Create thread 
Getchar();  
}  

Thread()  
{  
Io_service.run();  

這種狀況下,若是work不被析構,該線程永遠不會退出。在work不被析構得狀況下就讓其退出,能夠調用io.stop。這將致使 io.run馬上退出,全部未完成的消息都將丟棄。已完成的消息(但還沒有進入handler的)也不會調用其handler函數(因爲在stop中設置了 stopped_= 1)。


若是但願全部發出的異步消息都正常處理以後io.run正常退出,work對象必須析構,或者顯式的刪除。


boost::asio::io_service io_service;  
auto_ptr<boost::asio::io_service::work> work(  
new boost::asio::io_service::work(io_service));  

...  

work.reset(); // Allow run() to normal exit. 
work是一個很小的輔助類,只支持構造函數和析構函數。(還有一個get_io_service返回所關聯的io_service)


代碼以下:


inline io_service::work::work(boost::asio::io_service& io_service)  
: io_service_(io_service)  
{  
io_service_.impl_.work_started();  
}  

inline io_service::work::work(const work& other)  
: io_service_(other.io_service_)  
{  
io_service_.impl_.work_started();  
}  

inline io_service::work::~work()  
{  
io_service_.impl_.work_finished();  
}  

void work_started()  
{  
::InterlockedIncrement(&outstanding_work_);  
}  

// Notify that some work has finished.  
void work_finished()  
{  
if (::InterlockedDecrement(&outstanding_work_) == 0)  
stop();  
}  
能夠看出構造一個work時,outstanding_work_+1,使得io.run在完成全部異步消息後判斷outstanding_work_時不會爲0,所以會繼續調用GetQueuedCompletionStatus並阻塞在這個函數上。


而析構函數中將其-1,並判斷其是否爲0,若是是,則post退出消息給GetQueuedCompletionStatus讓其退出。


所以work若是析構,則io.run會在處理完全部消息以後正常退出。work若是不析構,則io.run會一直運行不退出。若是用戶直接調用io.stop,則會讓io.run馬上退出。


特別注意的是,work提供了一個拷貝構造函數,所以能夠直接在任意地方使用。對於一個io_service來講,有多少個work實例關聯,則outstanding_work_就+1了多少次,只有關聯到同一個io_service的work全被析構以後,io.run纔會在全部消息處理結束以後正常退出。


strand


strand是另外一個輔助類,提供2個接口dispatch和post,語義和io_service的dispatch和post相似。區別在於,同一個strand所發出的dispatch和post絕對不會並行執行,dispatch和post所包含的handlers也不會並行。所以若是但願串行處理每個tcp鏈接,則在accept以後應該在該鏈接的數據結構中構造一個strand,而且全部dispatch/post(recv /send)操做都由該strand發出。strand的做用巨大,考慮以下場景:有多個thread都在執行async_read_some,那麼因爲線程調度,頗有可能後接收到的包先被處理,爲了不這種狀況,就只能收完數據後放入一個隊列中,而後由另外一個線程去統一處理。


void connection::start()   
{   
socket_.async_read_some(boost::asio::buffer(buffer_),   
strand_.wrap(   
boost::bind(&connection::handle_read, shared_from_this(),   
boost::asio::placeholders::error,   
boost::asio::placeholders::bytes_transferred)));   

不使用strand的處理方式:


前端tcp iocp收包,而且把同一個tcp鏈接的包放入一個list,若是list之前爲空,則post一個消息給後端vnn iocp。後端vnn iocp收到post的消息後循環從list中獲取數據,而且處理,直到list爲空爲止。處理結束後從新調用 GetQueuedCompletionStatus進入等待。若是前端tcp iocp發現list過大,意味着處理速度小於接收速度,則再也不調用iocpRecv,而且設置標誌,當vnn iocp thread處理完了當前全部積壓的數據包後,檢查這個標誌,從新調用一次iocpRecv。


使用strand的處理方式:


前端tcp iocp收包,收到包後直接經過strand.post(on_recved)發給後端vnn iocp。後端vnn iocp處理完以後再調用一次strand.async_read_some。


這兩種方式我沒看出太大區別來。若是對數據包的處理的確須要阻塞操做,例如db query,那麼使用後端iocp以及後端thread是值得考慮的。這種狀況下,前端iocp因爲僅用來異步收發數據,所以1個thread就夠了。在肯定使用2級iocp的狀況下,前者彷佛更爲靈活,也沒有增長什麼開銷。


值得討論的是,若是後端多個thread都處於db query狀態,那麼實際上此時依然沒有thread能夠提供數據處理服務,所以2級iocp意義其實就在於在這種狀況下,前端tcp iocp依然能夠accept,以及recv第一次數據,不會致使用戶connect不上的狀況。在後端thread空閒以後會處理這期間的recv到的數據並在此async_read_some。


若是是單級iocp(假定handlers沒有阻塞操做),多線程,那麼strand的做用很明顯。這種狀況下,很明顯應該讓一個tcp鏈接的數據處理過程串行化。


Strand的實現原理


Strand內部實現機制稍微有點複雜。每次發出strand請求(例如 async_read(strand_.wrap(funobj1))),strand再次包裹了一次成爲funobj2。在async_read完成時,系統調用funobj2,檢查是否正在執行該strand所發出的完成函數(檢查該strand的一個標誌位),若是沒有,則直接調用 funobj2。若是有,則檢查是否就是當前thread在執行,若是是,則直接調用funobj2(這種狀況可能發生在嵌套調用的時候,但並不產生同步問題,就像同一個thread能夠屢次進入同一個critical_session同樣)。若是不是,則把該funobj2插入到strand內部維護的一個隊列中。

 

usidc5 2011-07-13 18:18


最近在設計一個多線程分塊支持續傳的http的異步客戶端時, 測試部門常常發現http下載模
塊退出時偶爾會卡住, 在win7系統上由爲明顯. 反覆檢查代碼, 並未明顯問題, 因而專門寫
了一個反覆退出的單元測試, 當即發現問題, 並定位在io_service的析構函數中, 奇怪的是, 
個人投遞io的全部socket都早已經關閉, run線程也已經退出, 按理說, 這時的io_service的
outstanding_work_應該爲0纔是, 可我一看它倒是1, 因而始終在win_iocp_io_service.hpp的
shutdown_service裏一直循環調用GetQueuedCompletionStatus, 從而致使沒法正常退出...
很明顯, 這是asio對outstanding_work_計數維護的有問題, 爲了解決
問題, 因而我很快想到不使用iocp, 添加宏BOOST_ASIO_DISABLE_IOCP一切就正常了...
因爲本身使用的是boost.1.45版本, 因而換了個boost.1.46.1再試試, 結果同樣. 難道這麼嚴
重的bug跨在了這兩個很是重要的發行版本上而沒人知道?
在官方的郵件列表中細節檢查, 終於看到了某人的bug報告和我描述的狀況差很少, 並且
在那我的報告了bug的次日, asio做者就發佈了補丁, 但這個補丁並未更新到boost.1.45
和boost.1.46中, 唉, 這兩個版本但是大版本啊, 估計受害人很多...
不過幸運的是, 我在boost的主分枝中看到了修正的代碼.
下面是這個補丁內容:


From 81a6a51c0cb66de6bc77e1fa5dcd46b2794995e4 Mon Sep 17 00:00:00 2001
From: Christopher Kohlhoff <chris@kohlhoff.com>
Date: Wed, 23 Mar 2011 15:03:56 +1100
Subject: [PATCH] On Windows, ensure the count of outstanding work is decremented for
abandoned operations (i.e. operations that are being cleaned up within
the io_service destructor).


---
asio/include/asio/detail/impl/dev_poll_reactor.ipp |    2 ++
asio/include/asio/detail/impl/epoll_reactor.ipp    |    2 ++
asio/include/asio/detail/impl/kqueue_reactor.ipp   |    2 ++
asio/include/asio/detail/impl/select_reactor.ipp   |    2 ++
.../asio/detail/impl/signal_set_service.ipp        |    2 ++
asio/include/asio/detail/impl/task_io_service.ipp  |    7 +++++++
.../asio/detail/impl/win_iocp_io_service.ipp       |   11 +++++++++++
asio/include/asio/detail/task_io_service.hpp       |    4 ++++
asio/include/asio/detail/win_iocp_io_service.hpp   |    4 ++++
9 files changed, 36 insertions(+), 0 deletions(-)


diff --git a/asio/include/asio/detail/impl/dev_poll_reactor.ipp b/asio/include/asio/detail/impl/dev_poll_reactor.ipp
index 06d89ea..2a01993 100644
--- a/asio/include/asio/detail/impl/dev_poll_reactor.ipp
+++ b/asio/include/asio/detail/impl/dev_poll_reactor.ipp
@@ -63,6 +63,8 @@ void dev_poll_reactor::shutdown_service()
     op_queue_.get_all_operations(ops);

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);


// Helper class to re-register all descriptors with /dev/poll.
diff --git a/asio/include/asio/detail/impl/epoll_reactor.ipp b/asio/include/asio/detail/impl/epoll_reactor.ipp
index 22f567a..d08dedb 100644
--- a/asio/include/asio/detail/impl/epoll_reactor.ipp
+++ b/asio/include/asio/detail/impl/epoll_reactor.ipp
@@ -84,6 +84,8 @@ void epoll_reactor::shutdown_service()
   }

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);
}

void epoll_reactor::fork_service(asio::io_service::fork_event fork_ev)
diff --git a/asio/include/asio/detail/impl/kqueue_reactor.ipp b/asio/include/asio/detail/impl/kqueue_reactor.ipp
index f0cdf73..45aff60 100644
--- a/asio/include/asio/detail/impl/kqueue_reactor.ipp
+++ b/asio/include/asio/detail/impl/kqueue_reactor.ipp
@@ -74,6 +74,8 @@ void kqueue_reactor::shutdown_service()
   }

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);
}

void kqueue_reactor::fork_service(asio::io_service::fork_event fork_ev)
diff --git a/asio/include/asio/detail/impl/select_reactor.ipp b/asio/include/asio/detail/impl/select_reactor.ipp
index f4e0314..00fd9fc 100644
--- a/asio/include/asio/detail/impl/select_reactor.ipp
+++ b/asio/include/asio/detail/impl/select_reactor.ipp
@@ -81,6 +81,8 @@ void select_reactor::shutdown_service()
     op_queue_.get_all_operations(ops);

   timer_queues_.get_all_timers(ops);
+
+  io_service_.abandon_operations(ops);
}

void select_reactor::fork_service(asio::io_service::fork_event fork_ev)
diff --git a/asio/include/asio/detail/impl/signal_set_service.ipp b/asio/include/asio/detail/impl/signal_set_service.ipp
index f0f0e78..4cde184 100644
--- a/asio/include/asio/detail/impl/signal_set_service.ipp
+++ b/asio/include/asio/detail/impl/signal_set_service.ipp
@@ -145,6 +145,8 @@ void signal_set_service::shutdown_service()
       reg = reg->next_in_table_;
     }
   }
+
+  io_service_.abandon_operations(ops);
}

void signal_set_service::fork_service(
diff --git a/asio/include/asio/detail/impl/task_io_service.ipp b/asio/include/asio/detail/impl/task_io_service.ipp
index cb585d5..0a2c6fa 100644
--- a/asio/include/asio/detail/impl/task_io_service.ipp
+++ b/asio/include/asio/detail/impl/task_io_service.ipp
@@ -230,6 +230,13 @@ void task_io_service::post_deferred_completions(
   }
}

+void task_io_service::abandon_operations(
+    op_queue<task_io_service::operation>& ops)
+{
+  op_queue<task_io_service::operation> ops2;
+  ops2.push(ops);
+}
+
std::size_t task_io_service::do_one(mutex::scoped_lock& lock,
     task_io_service::idle_thread_info* this_idle_thread)
{
diff --git a/asio/include/asio/detail/impl/win_iocp_io_service.ipp b/asio/include/asio/detail/impl/win_iocp_io_service.ipp
index ca3125e..7aaa6b8 100644
--- a/asio/include/asio/detail/impl/win_iocp_io_service.ipp
+++ b/asio/include/asio/detail/impl/win_iocp_io_service.ipp
@@ -262,6 +262,17 @@ void win_iocp_io_service::post_deferred_completions(
   }
}

+void win_iocp_io_service::abandon_operations(
+    op_queue<win_iocp_operation>& ops)
+{
+  while (win_iocp_operation* op = ops.front())
+  {
+    ops.pop();
+    ::InterlockedDecrement(&outstanding_work_);
+    op->destroy();
+  }
+}
+
void win_iocp_io_service::on_pending(win_iocp_operation* op)
{
   if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
diff --git a/asio/include/asio/detail/task_io_service.hpp b/asio/include/asio/detail/task_io_service.hpp
index 285d83e..654f83c 100644
--- a/asio/include/asio/detail/task_io_service.hpp
+++ b/asio/include/asio/detail/task_io_service.hpp
@@ -105,6 +105,10 @@ public:
   // that work_started() was previously called for each operation.
   ASIO_DECL void post_deferred_completions(op_queue<operation>& ops);

+  // Process unfinished operations as part of a shutdown_service operation.
+  // Assumes that work_started() was previously called for the operations.
+  ASIO_DECL void abandon_operations(op_queue<operation>& ops);
+
private:
   // Structure containing information about an idle thread.
   struct idle_thread_info;
diff --git a/asio/include/asio/detail/win_iocp_io_service.hpp b/asio/include/asio/detail/win_iocp_io_service.hpp
index a562834..b5d7f0b 100644
--- a/asio/include/asio/detail/win_iocp_io_service.hpp
+++ b/asio/include/asio/detail/win_iocp_io_service.hpp
@@ -126,6 +126,10 @@ public:
   ASIO_DECL void post_deferred_completions(
       op_queue<win_iocp_operation>& ops);

+  // Enqueue unfinished operation as part of a shutdown_service operation.
+  // Assumes that work_started() was previously called for the operations.
+  ASIO_DECL void abandon_operations(op_queue<operation>& ops);
+
   // Called after starting an overlapped I/O operation that did not complete
   // immediately. The caller must have already called work_started() prior to
   // starting the operation.
-- 
1.7.0.1


注: 在boost.asio中, 使用這個補丁時, 須要將ASIO_DECL 改爲 BOOST_ ASIO_DECL 


這是我第二次在使用asio的過程當中, 發現的比較嚴重的bug了, 不過幸運的是, 每一次都能在官方的論壇 
或郵件列表中找到解決方案. 


結論, 再牛的人寫的代碼也會有bug, 我的很是崇拜asio的做者. 

 

usidc5 2011-09-30 22:59
在win32平臺上,asio是基於IOCP技術實現的,我之前也用過IOCP,卻沒想到竟然能擴展成這樣,真是神奇!在其餘平臺下還會有別的方法去實現,具體見io_service類下面這部分的源碼:
  // The type of the platform-specific implementation.
#if defined(BOOST_ASIO_HAS_IOCP)
   typedef detail::win_iocp_io_service impl_type;
   friend class detail::win_iocp_overlapped_ptr;
#elif defined(BOOST_ASIO_HAS_EPOLL)
   typedef detail::task_io_service<detail::epoll_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_KQUEUE)
   typedef detail::task_io_service<detail::kqueue_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
   typedef detail::task_io_service<detail::dev_poll_reactor<false> > impl_type;
#else
   typedef detail::task_io_service<detail::select_reactor<false> > impl_type;
#endif
這部分代碼其實就在boost::asio::io_service類聲明中的最前面幾行,能夠看見在不一樣平臺下,io_service類的實現將會不一樣。很顯然,windows平臺下固然是win_iocp_io_service類爲實現了(不過我一開始還覺得win_iocp_io_service是直接拿出來用的呢,還在疑惑這樣怎麼有移植性呢?官方文檔也對該類隻字不提,其實我卡殼就是卡在這裏了,差點就直接用這個類了^_^!)。

那麼就分析一下win_iocp_io_service的代碼吧,這裏徹底是用IOCP來路由各類任務,你們使用post來委託任務,內部調用的實際上是IOCP的PostQueuedCompletionStatus函數,而後線程們用run來接受任務,內部實際上是阻塞在IOCP的GetQueuedCompletionStatus函數上,一旦有了任務就當即返回,執行完後再一個循環,繼續阻塞在這裏等待下一個任務的到來,這種設計思想堪稱神奇,對線程、服務以及任務徹底解耦,靈活度達到了如此高度,不愧爲boost庫的東西!我只能有拜的份了...

說一下整體的設計思想,其實io_service就像是勞工中介所,而一個線程就是一個勞工,而調用post的模塊至關於富人們,他們去中介所委託任務,而勞工們就聽候中介所的調遣去執行這些任務,任務的內容就寫在富人們給你的handler上,也就是函數指針,指針指向具體實現就是任務的實質內容。其實在整個過程當中,富人們都不知道是哪一個勞工幫他們作的工做,只知道是中介所負責完成這些就能夠了。這使得邏輯上的耦合降到了最低。不過這樣的比喻也有個不恰當的地方,若是硬要這樣比喻的話,我只能說:其實勞工裏面也有不少富人的^o^! 。不少勞工在完成任務的過程當中本身也託給中介所一些任務,而後這些任務極可能仍是本身去完成。這也難怪,運行代碼的老是這些線程,那麼調用post的確定也會有這些線程了,不過無論怎麼說,如此循環往復能夠解決問題就行,比喻不見得就得恰當,任何事物之間都不可能徹底相同,只要能闡述思想就行。

最後還要說明的一點就是:委託的任務其實能夠設定執行的時間的,很不錯的設定,內部實現則是經過定時器原理,GetQueuedCompletionStatus有一個等待時間的參數彷佛被用在這方面,還有源碼中的定時器線程我並無過多的去理解,總之大致原理已基本掌握,剩下的就是使勁的用它了!!!

另外爲了方便人交流,在這裏插入一些代碼可能更容易讓人理解吧,
下面這個是啓動服務時的代碼:
void ServerFramework::run()
{
     boost::thread_group workers;
    for (uint32 i = 0; i < mWorkerCount; ++i)
         workers.create_thread(
             boost::bind(&boost::asio::io_service::run, &mIoService));
     workers.join_all();
}

在打開前就得分配好任務,不然線程們運行起來就退出了,阻塞不住,任務的分配就交給open函數了,它是分配了監聽端口的任務,一旦有了鏈接就會拋出一個任務,其中一個線程就會開始行動啦。
void ServerFramework::open(const String& address, const String& port, uint32 nWorkers /*= DEFAULT_WORKER_COUNT*/)
{
    // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
     boost::asio::ip::tcp::resolver resolver(mIoService);
     boost::asio::ip::tcp::resolver::query query(address, port);
     boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);

     mAcceptor.open(endpoint.protocol());
     mAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
     mAcceptor.bind(endpoint);
     mAcceptor.listen();

     mNextConnection = new Connection(this);
     mAcceptor.async_accept(mNextConnection->getSocket(),
         boost::bind(&ServerFramework::__onConnect, this,
         boost::asio::placeholders::error));

     mWorkerCount = nWorkers;
    if (mWorkerCount == DEFAULT_WORKER_COUNT)
    {
         mWorkerCount = 4;
     }
}

open函數中給io_service的一個任務就是在有連接訪問服務器端口的狀況下執行ServerFramework::__onConnect函數,有一點須要格外注意的,io_service必須時刻都有任務存在,不然線程io_service::run函數將返回,因而線程都會結束並銷燬,程序將退出,因此,你必須保證不管什麼時候都有任務存在,這樣線程們即便空閒了也仍是會繼續等待,不會銷燬。因此,我在ServerFramework::__onConnect函數中又一次給了io_service相同的任務,即:繼續監聽端口,有連接了仍是調用ServerFramework::__onConnect函數。若是你在ServerFramework::__onConnect執行完了尚未給io_service任務的話,那麼一切都晚了...... 代碼以下:
void ServerFramework::__onConnect(const BoostSysErr& e)
{
    if (e)
    {
         MOELOG_DETAIL_WARN(e.message().c_str());
     }

     Connection* p = mNextConnection;
     mNextConnection = new Connection(this);

    // 再次進入監聽狀態
     mAcceptor.async_accept(mNextConnection->getSocket(),
         boost::bind(&ServerFramework::__onConnect, this,
         boost::asio::placeholders::error));

    // 處理當前連接
     __addConnection(p);
     p->start();
}


最後,展現一下這個類的全部成員變量吧:react

 

    // 用於線程池異步處理的核心對象
     boost::asio::io_service mIoService;

    // 網絡連接的接收器,用於接收請求進入的連接
     boost::asio::ip::tcp::acceptor mAcceptor;

    // 指向下一個將要被使用的連接對象
     Connection* mNextConnection;

    // 存儲服務器連接對象的容器
     ConnectionSet mConnections;

    //// 爲連接對象容器準備的strand,防止並行調用mConnections
    //boost::asio::io_service::strand mStrand_mConnections;

    // 爲連接對象容器準備的同步鎖,防止並行調用mConnections
     boost::mutex mMutex4ConnSet;

    // 爲控制檯輸出流準備的strand,防止並行調用std::cout
     AsioService::strand mStrand_ConsoleIostream;

    // 工做線程的數量
     uint32 mWorkerCount;

 

usidc5 2013-10-07 16:41
花了足足3天時間,外加1天心情休整,終於在第5天編寫出了一個能運行的基於asio和thread_group的框架,差點沒氣暈過去,把源碼都看懂了才感受會用了。
測試了一下,debug下一萬次迴應耗時800+毫秒,release下是200+毫秒,機器配置雙核2.5G英特爾,4個線程並行工做,無錯的感受真好,不再用擔憂iocp出一些奇怪的問題啦,由於是巨人們寫的實現,呵呵。


進入正題,簡要說一下asio的實現原理吧。在win32平臺上,asio是基於IOCP技術實現的,我之前也用過IOCP,卻沒想到竟然能擴展成這樣,真是神奇!在其餘平臺下還會有別的方法去實現,具體見io_service類下面這部分的源碼:
  // The type of the platform-specific implementation.
#if defined(BOOST_ASIO_HAS_IOCP)
  typedef detail::win_iocp_io_service impl_type;
  friend class detail::win_iocp_overlapped_ptr;
#elif defined(BOOST_ASIO_HAS_EPOLL)
  typedef detail::task_io_service<detail::epoll_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_KQUEUE)
  typedef detail::task_io_service<detail::kqueue_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
  typedef detail::task_io_service<detail::dev_poll_reactor<false> > impl_type;
#else
  typedef detail::task_io_service<detail::select_reactor<false> > impl_type;
#endif


這部分代碼其實就在boost::asio::io_service類聲明中的最前面幾行,能夠看見在不一樣平臺下,io_service類的實現將會不一樣。很顯然,windows平臺下固然是win_iocp_io_service類爲實現了(不過我一開始還覺得win_iocp_io_service是直接拿出來用的呢,還在疑惑這樣怎麼有移植性呢?官方文檔也對該類隻字不提,其實我卡殼就是卡在這裏了,差點就直接用這個類了^_^!)。


那麼就分析一下win_iocp_io_service的代碼吧,這裏徹底是用IOCP來路由各類任務,你們使用post來委託任務,內部調用的實際上是IOCP的PostQueuedCompletionStatus函數,而後線程們用run來接受任務,內部實際上是阻塞在IOCP的GetQueuedCompletionStatus函數上,一旦有了任務就當即返回,執行完後再一個循環,繼續阻塞在這裏等待下一個任務的到來,這種設計思想堪稱神奇,對線程、服務以及任務徹底解耦,靈活度達到了如此高度,不愧爲boost庫的東西!我只能有拜的份了...


說一下整體的設計思想,其實io_service就像是勞工中介所,而一個線程就是一個勞工,而調用post的模塊至關於富人們,他們去中介所委託任務,而勞工們就聽候中介所的調遣去執行這些任務,任務的內容就寫在富人們給你的handler上,也就是函數指針,指針指向具體實現就是任務的實質內容。其實在整個過程當中,富人們都不知道是哪一個勞工幫他們作的工做,只知道是中介所負責完成這些就能夠了。這使得邏輯上的耦合降到了最低。不過這樣的比喻也有個不恰當的地方,若是硬要這樣比喻的話,我只能說:其實勞工裏面也有不少富人的^o^! 。不少勞工在完成任務的過程當中本身也託給中介所一些任務,而後這些任務極可能仍是本身去完成。這也難怪,運行代碼的老是這些線程,那麼調用post的確定也會有這些線程了,不過無論怎麼說,如此循環往復能夠解決問題就行,比喻不見得就得恰當,任何事物之間都不可能徹底相同,只要能闡述思想就行。


最後還要說明的一點就是:委託的任務其實能夠設定執行的時間的,很不錯的設定,內部實現則是經過定時器原理,GetQueuedCompletionStatus有一個等待時間的參數彷佛被用在這方面,還有源碼中的定時器線程我並無過多的去理解,總之大致原理已基本掌握,剩下的就是使勁的用它了!!!


另外爲了方便人交流,在這裏插入一些代碼可能更容易讓人理解吧,
下面這個是啓動服務時的代碼:
void ServerFramework::run()
{
    boost::thread_group workers;
    for (uint32 i = 0; i < mWorkerCount; ++i)
        workers.create_thread(
            boost::bind(&boost::asio::io_service::run, &mIoService));
    workers.join_all();
}


在打開前就得分配好任務,不然線程們運行起來就退出了,阻塞不住,任務的分配就交給open函數了,它是分配了監聽端口的任務,一旦有了鏈接就會拋出一個任務,其中一個線程就會開始行動啦。
void ServerFramework::open(const String& address, const String& port, uint32 nWorkers /*= DEFAULT_WORKER_COUNT*/)
{
    // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
    boost::asio::ip::tcp::resolver resolver(mIoService);
    boost::asio::ip::tcp::resolver::query query(address, port);
    boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);


    mAcceptor.open(endpoint.protocol());
    mAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
    mAcceptor.bind(endpoint);
    mAcceptor.listen();


    mNextConnection = new Connection(this);
    mAcceptor.async_accept(mNextConnection->getSocket(),
        boost::bind(&ServerFramework::__onConnect, this,
        boost::asio::placeholders::error));


    mWorkerCount = nWorkers;
    if (mWorkerCount == DEFAULT_WORKER_COUNT)
    {
        mWorkerCount = 4;
    }
}


open函數中給io_service的一個任務就是在有連接訪問服務器端口的狀況下執行ServerFramework::__onConnect函數,有一點須要格外注意的,io_service必須時刻都有任務存在,不然線程io_service::run函數將返回,因而線程都會結束並銷燬,程序將退出,因此,你必須保證不管什麼時候都有任務存在,這樣線程們即便空閒了也仍是會繼續等待,不會銷燬。因此,我在ServerFramework::__onConnect函數中又一次給了io_service相同的任務,即:繼續監聽端口,有連接了仍是調用ServerFramework::__onConnect函數。若是你在ServerFramework::__onConnect執行完了尚未給io_service任務的話,那麼一切都晚了...... 代碼以下:
void ServerFramework::__onConnect(const BoostSysErr& e)
{
    if (e)
    {
        MOELOG_DETAIL_WARN(e.message().c_str());
    }


    Connection* p = mNextConnection;
    mNextConnection = new Connection(this);


    // 再次進入監聽狀態
    mAcceptor.async_accept(mNextConnection->getSocket(),
        boost::bind(&ServerFramework::__onConnect, this,
        boost::asio::placeholders::error));


    // 處理當前連接
    __addConnection(p);
    p->start();
}


最後,展現一下這個類的全部成員變量吧:
    // 用於線程池異步處理的核心對象
    boost::asio::io_service mIoService;


    // 網絡連接的接收器,用於接收請求進入的連接
    boost::asio::ip::tcp::acceptor mAcceptor;


    // 指向下一個將要被使用的連接對象
    Connection* mNextConnection;


    // 存儲服務器連接對象的容器
    ConnectionSet mConnections;


    //// 爲連接對象容器準備的strand,防止並行調用mConnections
    //boost::asio::io_service::strand mStrand_mConnections;


    // 爲連接對象容器準備的同步鎖,防止並行調用mConnections
    boost::mutex mMutex4ConnSet;


    // 爲控制檯輸出流準備的strand,防止並行調用std::cout
    AsioService::strand mStrand_ConsoleIostream;


    // 工做線程的數量
    uint32 mWorkerCount;

 

usidc5 2013-10-07 16:41
boost的官方例子,有單線程的網絡框架,httpserver2是線程池的。下面參照網上某人的代碼修改了一點(忘了哪位大仙的代碼了)
測試工具,適用stressmark,測試效果很是好, 9000個/s
複製代碼
#include <stdio.h>
#include "AuthenHandle.h"
#include "configure.h"
#ifdef WIN32 //for windows nt/2000/xp


#include <winsock.h>
#include <windows.h>
#include "gelsserver.h"
#pragma comment(lib,"Ws2_32.lib")
#else         //for unix




#include <sys/socket.h>
//    #include <sys/types.h>


//    #include <sys/signal.h>


//    #include <sys/time.h>


#include <netinet/in.h>     //socket


//    #include <netdb.h>


#include <unistd.h>            //gethostname


// #include <fcntl.h>


#include <arpa/inet.h>


#include <string.h>            //memset


typedef int SOCKET;
typedef struct sockaddr_in SOCKADDR_IN;
typedef struct sockaddr SOCKADDR;
#ifdef M_I386
typedef int socklen_t;
#endif


#define BOOL             int
#define INVALID_SOCKET    -1
#define SOCKET_ERROR     -1
#define TRUE             1
#define FALSE             0
#endif        //end #ifdef WIN32








static int count111 = 0;
static time_t oldtime = 0, nowtime = 0;




#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>


using namespace std;
using boost::asio::ip::tcp;


class io_service_pool
    : public boost::noncopyable
{
public:


    explicit io_service_pool(std::size_t pool_size)
        : next_io_service_(0)
    { 
        for (std::size_t i = 0; i < pool_size; ++ i)
        {
            io_service_sptr io_service(new boost::asio::io_service);
            work_sptr work(new boost::asio::io_service::work(*io_service));
            io_services_.push_back(io_service);
            work_.push_back(work);
        }
    }


    void start()
    { 
        for (std::size_t i = 0; i < io_services_.size(); ++ i)
        {
            boost::shared_ptr<boost::thread> thread(new boost::thread(
                boost::bind(&boost::asio::io_service::run, io_services_)));
            threads_.push_back(thread);
        }
    }


    void join()
    {
        for (std::size_t i = 0; i < threads_.size(); ++ i)
        {
            threads_->join();
        } 
    }


    void stop()
    { 
        for (std::size_t i = 0; i < io_services_.size(); ++ i)
        {
            io_services_->stop();
        }
    }


    boost::asio::io_service& get_io_service()
    {
        boost::mutex::scoped_lock lock(mtx);
        boost::asio::io_service& io_service = *io_services_[next_io_service_];
        ++ next_io_service_;
        if (next_io_service_ == io_services_.size())
        {
            next_io_service_ = 0;
        }
        return io_service;
    }


private:
    typedef boost::shared_ptr<boost::asio::io_service> io_service_sptr;
    typedef boost::shared_ptr<boost::asio::io_service::work> work_sptr;
    typedef boost::shared_ptr<boost::thread> thread_sptr;


    boost::mutex mtx;


    std::vector<io_service_sptr> io_services_;
    std::vector<work_sptr> work_;
    std::vector<thread_sptr> threads_; 
    std::size_t next_io_service_;
};


boost::mutex cout_mtx;
int packet_size = 0;
enum {MAX_PACKET_LEN = 4096};


class session
{
public:
    session(boost::asio::io_service& io_service)
        : socket_(io_service)
        , recv_times(0)
    {
    }


    virtual ~session()
    {
        boost::mutex::scoped_lock lock(cout_mtx);
    }


    tcp::socket& socket()
    {
        return socket_;
    }


    inline void start()
    {


        socket_.async_read_some(boost::asio::buffer(data_, MAX_PACKET_LEN),
            boost::bind(&session::handle_read, this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
    }


    void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
    {
        if (!error)
        {
            ++ recv_times;




            count111 ++;


            struct tm *today;
            time_t ltime;
            time( &nowtime );


            if(nowtime != oldtime){
                printf("%d\n", count111);
                oldtime = nowtime;
                count111 = 0;
            }




            boost::asio::async_write(socket_, boost::asio::buffer(data_, bytes_transferred),
                boost::bind(&session::handle_write, this, boost::asio::placeholders::error));






        }
        else
        {
            delete this;
        }
    }


    void handle_write(const boost::system::error_code& error)
    {
        if (!error)
        {
            start();
        }
        else
        {
            delete this;
        }
    }


private:
    tcp::socket socket_;
    char data_[MAX_PACKET_LEN];
    int recv_times;
};


class server
{
public:
    server(short port, int thread_cnt)
        : io_service_pool_(thread_cnt)
        , acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))
    {
        session* new_session = new session(io_service_pool_.get_io_service());
        acceptor_.async_accept(new_session->socket(),
            boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));
    }


    void handle_accept(session* new_session, const boost::system::error_code& error)
    {
        if (!error)
        {
            new_session->start();
        }
        else
        {
            delete new_session;
        }


        new_session = new session(io_service_pool_.get_io_service());
        acceptor_.async_accept(new_session->socket(),
            boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));
    }


    void run()
    {
        io_service_pool_.start();
        io_service_pool_.join();
    }


private:


    io_service_pool io_service_pool_;
    tcp::acceptor acceptor_;
};






int main()
{


    //boost


    server s(port, 50);
    s.run();


    while(true)
    {
        sleep(1000);




     }


    return 0;
}
複製代碼


 

usidc5 2013-10-07 16:42
網上大部分人都講boost.asio用完成端口實現,而且實現了線程池,因此效率很是的高。
      我在應用asio的時候發現完成端口是有,可是線程池確並不存在,並且在現有的架構下,要想用線程池來實現對數據的處理,可能寫出來不是很好看。
asio經過開啓線程調用io_service::run再調用win_iocp_io_service::run來處理收到的事件。
  size_t run(boost::system::error_code& ec)
  {
    if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
    {
      ec = boost::system::error_code();
      return 0;
    }

    call_stack<win_iocp_io_service>::context ctx(this);

    size_t n = 0;
    while (do_one(true, ec))
      if (n != (std::numeric_limits<size_t>::max)())
        ++n;
    return n;
  }
do_one裏面爲
BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
          &completion_key, &overlapped, block ? timeout : 0);
        operation* op = static_cast<operation*>(overlapped);
        op->do_completion(last_error, bytes_transferred);
實際上若是op->do_completion裏面有時間比較長的操做,這個線程一樣爲死在這個地方。
由於只有一個線程在驅動前面的run函數。
固然你也能夠經過同時啓動幾個線程來調用run函數,這樣是可行的,可是這種手法確很笨拙,由於你可能一下啓動10個線程,卻只有一個線程比較忙,
或者你的10個線程根本就忙不過來,這根有沒有使用iocp徹底沒什麼兩樣。
     作事情要力求完美,不要覺得NB的大師不提供的東西,你就不能自已弄一個。其實我以爲asio裏面c++的運用,很是的完美,可是從實用的角度來講,
還不如我之前一個同事寫的iocp寫得好。

咱們怎麼對asio這部分進行改良,讓他支持線程池的方式呢。
實際上咱們只須要對win_iocp_io_service進行一些加工便可。
在do_one裏面
op->do_completion(last_error, bytes_transferred);
以前auto_work work(*this);
這個地方,實際上就是來計算當前有多少工做要作,
這個地方調用work_started
  ::InterlockedIncrement(&outstanding_work_);
只須要在這按照你的需求加入一個線程就能夠了。
算法自已想吧,還存在work_finished函數,能夠用來減小線程。
須要給win_iocp_io_service類增長一個thread_group成員變量,供上面使用。
改良的方式不是很好,也比較很差看,
唉,完美只存在內心,適可而止吧。

 

usidc5 2013-10-07 16:43
正如其名字,asio是一個異步網絡庫。但第一次使用它倒是把它做爲一個線程池的實現。下面是一段實驗代碼。


#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
void foo() {
  sleep(1);
  printf("foo: %d\n", (int)pthread_self());
}




class TaskPool {
  typedef boost::shared_ptr<boost::thread> Thread;
  public:
  TaskPool(std::size_t num_workers) : num_workers_(num_workers) {
  }
  void Start() {
    manage_thread_.reset(new boost::thread(boost::bind(TaskPool::_Start, this)));
  }
  void Post() {
    ios_.post(foo);
  }
  private:
    static void _Start(TaskPool* pool) {
    for (std::size_t i = 0; i < pool->num_workers_; ++i) {
      pool->workers_.push_back(Thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &pool->ios_))));
    }
    for (std::size_t i = 0; i < pool->workers_.size(); ++i) {
      pool->workers_->join();
    }


  }
  private:
  std::size_t num_workers_;
  boost::asio::io_service ios_;
  std::vector<Thread> workers_;
  Thread manage_thread_;
};


int main()
{
  TaskPool pool(8);
  pool.Start();
  for (int i = 0; i < 1000; ++i) {
  pool.Post();
  }
  printf("post finished\n");
  sleep(10);
  return 0;
}

 

usidc5 2013-11-16 22:52
ACE是一個很成熟的中間件產品,爲自適應通信環境,但它過於宏大,一堆的設計模式,架構是一層又一層,對初學者來講,有點困難。
ASIO是基本Boost開發的異步IO庫,封裝了Socket,簡化基於socket程序的開發。


最近分析ASIO的源代碼,讓我無不驚呀於它設計。在ACE中開發中的內存管理一直讓人頭痛,ASIO的出現,讓我看到新的曙光,成爲我新的好夥伴。簡單地與ACE作個比較。


1.層次架構:
ACE底層是C風格的OS適配層,上一層基於C++的wrap類,再上一層是一些框架(Accpetor, Connector,Reactor等),最上一層是框架上服務。
ASIO與之相似,底層是OS的適配層,上一層一些模板類,再上一層模板類的參數化(TCP/UDP),再上一層是服務,它只有一種框架爲io_service。


2.涉及範圍:
ACE包含了日誌,IPC,線程,共享內存,配置服務等。
ASIO只涉及到Socket,提供簡單的線程操做。


3.設計模式:
ACE主要應用了Reactor,Proactor等。
而ASIO主要應用了Proactor。


4.線程調度:
ACE的Reactor是單線程調度,Proactor支持多線程調度。
ASIO支持單線程與多線程調度。


5.事件分派處理:
ACE主要是註冊handler類,當事件分派時,調用其handler的虛掛勾函數。實現ACE_Handler/ACE_Svc_Handler/ACE_Event_handler等類的虛函數。
ASIO是基於函數對象的hanlder事件分派。任何函數均可能成功hanlder,少了一堆虛表的維護,調度上優於ACE。


6.發佈方式:
ACE是開源免費的,不依賴於第3方庫, 通常應用使用它時,以動態連接的方式發佈動態庫。
ASIO是開源免費的,依賴Boost,應用使用時只要include頭文件,不需動態庫。


7.可移植性:
ACE支持多種平臺,可移植性不存在問題,聽說socket編程在linux下有很多bugs。
ASIO支持多種平臺,可移植性不存在問題。


8.開發難度:
基於ACE開發應用,對程序員要求比較高,要用好它,必須很是瞭解其框架。在其框架下開發,每每new出一個對象,不知在什麼地方釋放好。
基於ASIO開發應用,要求程序員熟悉函數對象,函數指針,熟悉boost庫中的boost::bind。內存管理控制方便。




我我的以爲,若是應用socket編程,使用ASIO開發比較好,開發效率比較高。ACE適合於理論研究,它原本就是源於Douglas的學術研究。

 

usidc5 2013-11-16 22:53
在使用IOCP時,最重要的幾個API就是GetQueueCompeltionStatus、WSARecv、WSASend,數據的I/O及其完成狀態經過這幾個接口獲取並進行後續處理。


GetQueueCompeltionStatus attempts to dequeue an I/O completion packet from the specified I/O completion port. If there is no completion packet queued, the function waits for a pending I/O operation associated with the completion port to complete.


BOOL WINAPI GetQueuedCompletionStatus(
  __in   HANDLE CompletionPort,
  __out  LPDWORD lpNumberOfBytes,
  __out  PULONG_PTR lpCompletionKey,
  __out  LPOVERLAPPED *lpOverlapped,
  __in   DWORD dwMilliseconds
);
If the function dequeues a completion packet for a successful I/O operation from the completion port, the return value is nonzero. The function stores information in the variables pointed to by the lpNumberOfBytes, lpCompletionKey, and lpOverlapped parameters.


除了關心這個API的in & out(這是MSDN開頭的幾行就能夠告訴咱們的)以外,咱們更加關心不一樣的return & out意味着什麼,由於因爲各類已知或未知的緣由,咱們的程序並不老是有正確的return & out。


If *lpOverlapped is NULL and the function does not dequeue a completion packet from the completion port, the return value is zero. The function does not store information in the variables pointed to by the lpNumberOfBytes and lpCompletionKey parameters. To get extended error information, call GetLastError. If the function did not dequeue a completion packet because the wait timed out, GetLastError returns WAIT_TIMEOUT.


假設咱們指定dwMilliseconds爲INFINITE。


這裏常見的幾個錯誤有:


WSA_OPERATION_ABORTED (995): Overlapped operation aborted.


因爲線程退出或應用程序請求,已放棄I/O 操做。


MSDN: An overlapped operation was canceled due to the closure of the socket, or the execution of the SIO_FLUSH command in WSAIoctl. Note that this error is returned by the operating system, so the error number may change in future releases of Windows.


成因分析:這個錯誤通常是因爲peer socket被closesocket或者WSACleanup關閉後,針對這些socket的pending overlapped I/O operation被停止。


解決方案:針對socket,通常應該先調用shutdown禁止I/O操做後再調用closesocket關閉。


嚴重程度:輕微易處理。


WSAENOTSOCK (10038): Socket operation on nonsocket.


MSDN: An operation was attempted on something that is not a socket. Either the socket handle parameter did not reference a valid socket, or for select, a member of an fd_set was not valid.


成因分析:在一個非套接字上嘗試了一個操做。


使用closesocket關閉socket以後,針對該invalid socket的任何操做都會得到該錯誤。


解決方案:若是是多線程存在對同一socket的操做,要保證對socket的I/O操做邏輯上的順序,作好socket的graceful disconnect。


嚴重程度:輕微易處理。


WSAECONNRESET (10054): Connection reset by peer.


遠程主機強迫關閉了一個現有的鏈接。


MSDN: An existing connection was forcibly closed by the remote host. This normally results if the peer application on the remote host is suddenly stopped, the host is rebooted, the host or remote network interface is disabled, or the remote host uses a hard close (see setsockopt for more information on the SO_LINGER option on the remote socket). This error may also result if a connection was broken due to keep-alive activity detecting a failure while one or more operations are in progress. Operations that were in progress fail with WSAENETRESET. Subsequent operations fail with WSAECONNRESET.


成因分析:在使用WSAAccpet、WSARecv、WSASend等接口時,若是peer application忽然停止(緣由如上所述),往其對應的socket上投遞的operations將會失敗。


解決方案:若是是對方主機或程序意外停止,那就只有各安天命了。但若是這程序是你寫的,而你只是hard close,那就由不得別人了。至少,你要知道這樣的錯誤已經出現了,就不要再費勁的繼續投遞或等待了。


嚴重程度:輕微易處理。


WSAECONNREFUSED (10061): Connection refused.


因爲目標機器積極拒絕,沒法鏈接。


MSDN: No connection could be made because the target computer actively refused it. This usually results from trying to connect to a service that is inactive on the foreign host—that is, one with no server application running.


成因分析:在使用connect或WSAConnect時,服務器沒有運行或者服務器的監聽隊列已滿;在使用WSAAccept時,客戶端的鏈接請求被condition function拒絕。


解決方案:Call connect or WSAConnect again for the same socket. 等待服務器開啓、監聽空閒或查看被拒絕的緣由。是否是長的醜或者錢沒給夠,要不就是服務器拒絕接受天價薪酬自主創業去了?


嚴重程度:輕微易處理。


WSAENOBUFS (10055): No buffer space available.


因爲系統緩衝區空間不足或列隊已滿,不能執行套接字上的操做。


MSDN: An operation on a socket could not be performed because the system lacked sufficient buffer space or because a queue was full.


成因分析:這個錯誤是我查看錯誤日誌後,最在乎的一個錯誤。由於服務器對於消息收發有明確限制,若是緩衝區不足應該早就處理了,不可能待到send/recv失敗啊。並且這個錯誤在以前的版本中幾乎沒有出現過。這也是這篇文章的主要內容。像connect和accept由於緩衝區空間不足均可以理解,並且危險不高,但若是send/recv形成擁堵並惡性循環下去,麻煩就大了,至少說明以前的驗證邏輯有疏漏。


WSASend失敗的緣由是:The Windows Sockets provider reports a buffer deadlock. 這裏提到的是buffer deadlock,顯然是因爲多線程I/O投遞不當引發的。


解決方案:在消息收發前,對最大掛起的消息總的數量和容量進行檢驗和控制。


嚴重程度:嚴重。


本文主要參考MSDN。

 

usidc5 2013-11-16 22:59
1:在IOCP中投遞WSASend返回WSA_IO_PENDING的時候,表示異步投遞已經成功,可是稍後發送纔會完成。這其中涉及到了三個緩衝區。
網卡緩衝區,TCP/IP層緩衝區,程序緩衝區。
狀況一:調用WSASend發送正確的時候(即當即返回,且沒有錯誤),TCP/IP將數據從程序緩衝區中拷貝到TCP/IP層緩衝區中,而後不鎖定該程序緩衝區,由上層程序本身處理。TCP/IP層緩衝區在網絡合適的時候,將其數據拷貝到網卡緩衝區,進行真正的發送。
狀況二:調用WSASend發送錯誤,可是錯誤碼是WSA_IO_PENDING的時候,表示此時TCP/IP層緩衝區已滿,暫時沒有剩餘的空間將程序緩衝區的數據拷貝出來,這時系統將鎖定用戶的程序緩衝區,按照書上說的WSASend指定的緩衝區將會被鎖定到系統的非分頁內存中。直到TCP/IP層緩衝區有空餘的地方來接受拷貝咱們的程序緩衝區數據才拷貝走,並將給IOCP一個完成消息。
狀況三:調用WSASend發送錯誤,可是錯誤碼不是WSA_IO_PENDING,此時應該是發送錯誤,應該釋放該SOCKET對應的全部資源。


2:在IOCP中投遞WSARecv的時候,狀況類似。
狀況一:調用WSARecv正確,TCP/IP將數據從TCP/IP層緩衝區拷貝到緩衝區,而後由咱們的程序自行處理了。清除TCP/IP層緩衝區數據。
狀況二:調用WSARecv錯誤,可是返回值是WSA_IO_PENDING,此時是由於TCP/IP層緩衝區中沒有數據可取,系統將會鎖定咱們投遞的WSARecv的buffer,直到TCP/IP層緩衝區中有新的數據到來。
狀況三:調用WSARecv錯誤,錯誤值不是WSA_IO_PENDING,此時是接收出錯,應該釋放該SOCKET對應的全部資源。


在以上狀況中有幾個很是要注意的事情:
系統鎖定非分頁內存的時候,最小的鎖定大小是4K(固然,這個取決於您系統的設置,也能夠設置小一些,在註冊表裏面能夠改,固然我想這些數值微軟應該比咱們更知道什麼合適了),因此當咱們投遞了不少WSARecv或者WSASend的時候,無論咱們投遞的Buffer有多大(0除外),系統在出現IO_PENGDING的時候,都會鎖定咱們4K的內存。這也就是常常有開發者出現WSANOBUF的狀況緣由了。


咱們在解決這個問題的時候,要針對WSASend和WSARecv作處理
1:投遞WSARecv的時候,能夠採用一個巧妙的設計,先投遞0大小Buf的WSARecv,若是返回,表示有數據能夠接收,咱們開啓真正的recv將數據從TCP/IP層緩衝區取出來,直到WSA_IO_PENGDING.
2:對投遞的WSARecv以及WSASend進行計數統計,若是超過了咱們預約義的值,就不進行WSASend或者WSARecv投遞了。
3:如今咱們應該就能夠明白爲何WSASend會返回小於咱們投遞的buffer空間數據值了,是由於TCP/IP層緩衝區小於咱們要發送的緩衝區,TCP/IP只會拷貝他剩餘可被Copy的緩衝區大小的數據走,而後給咱們的WSASend的已發送緩衝區設置爲移走的大小,下一次投遞的時候,若是TCP/IP層還未被髮送,將返回WSA_IO_PENGDING。
4:在不少地方有提到,能夠關閉TCP/IP層緩衝區,能夠提升一些效率和性能,這個從上面的分析來看,有這個可能,要實際的網絡狀況去實際分析了。







==================


關於數據包在應用層亂序問題就很少說了(IOCP荒廢了TCP在傳輸層辛辛苦苦保證的有序)。


這可有可無,由於iocp要管理上千個SOCKET,每一個SOCKET的讀請求、寫請求分別保證串行便可。





=============


關於GetQueuedCompletionStatus的返回值判斷:


我給超時值傳的是0,直接測試,無須等待。


這裏咱們關心這幾個值:


第二個參數所傳回的byte值


第三個參數所傳回的complete key值 ——PER HANDLE DATA


第四個參數所傳回的OVERLAPPED結構指針 ——PER IO DATA


系統設置的ERROR值。





在超時狀況下,byte值返回0,per handle data值是-1,per io data爲NULL





1.若是返回FALSE


    one : iocp句柄在外部被關閉。


   WSAGetLastError返回6(無效句柄),byte值返回0,per handle data值是-1,per io data爲NULL





    two: 咱們主動close一個socket句柄,或者CancelIO(socket)(且此時有未決的操做)


    WSAGetLastError返回995(因爲線程退出或應用程序請求,已放棄 I/O 操做)


   byte值爲0,


   per handle data與per io data正確傳回。





   three:對端強退(且此時本地有未決的操做)


   WSAGetLastError返回64(指定的網絡名再也不可用)


  byte值爲0,per handle data與per io data正確傳回 





2.若是返回TRUE【此時必定獲得了你投遞的OVERLAP結構】


    one:  我接收到對端數據,而後準備再投遞接收請求;但此期間,對端關閉socket。


   WSARecv返回錯誤碼10054:遠程主機強迫關閉了一個現有的鏈接。


TODO TODO


   從網上搜到一個作法,感受很不錯:


若是返回FALSE, 那麼:若是OVERLAP爲空,那必定是發生了錯誤(注意:請排除TIMEOUT錯誤);


若是OVERLAP不爲空,有可能發生錯誤。不用管它,這裏直接投遞請求;若是有錯,WSARecv將返回錯誤。關閉鏈接便可。








============


關於closesocket操做:





The closesocket function will initiate cancellation on the outstanding I/O operations, but that does not mean that an application will receive I/O completion for these I/O operations by the time the closesocket function returns. Thus, an application should not cleanup any resources (WSAOVERLAPPED structures, for example) referenced by the outstanding I/O requests until the I/O requests are indeed completed.





在IOCP模式下,若是調用closesocket時有未決的pending   IO,將致使socket被重置,因此有時會出現數據丟失。正統的解決方式是使用shutdown函數(指定SD_SEND標誌),注意這時可能有未完成的發送pengding   IO,因此你應該監測是否該鏈接的全部是否已完成(也許你要用一個計數器來跟蹤這些pending   IO),僅在全部send   pending   IO完成後調用shutdown。





MSDN推薦的優雅關閉socket:




Call WSAAsyncSelect to register for FD_CLOSE notification.
Call shutdown with how=SD_SEND.
When FD_CLOSE received, call recv until zero returned, or SOCKET_ERROR.
Call closesocket.



FD_CLOSE being posted after all data is read from a socket. An application should check for remaining data upon receipt of FD_CLOSE to avoid any possibility of losing data.


























對每一個使用AcceptEx接受的鏈接套結字使用setsockopt設置SO_UPDATE_ACCEPT_CONTEXT選項,這個選項原義是把listen套結字一些屬性(包括socket內部接受/發送緩存大小等等)拷貝到新創建的套結字,卻能夠使後續的shutdown調用成功。



/* SO_UPDATE_ACCEPT_CONTEXT is required for shutdown() to work fine*/
       setsockopt( sockClient,
                            SOL_SOCKET,
                            SO_UPDATE_ACCEPT_CONTEXT,
                            (char*)&m_sockListen,
                            sizeof(m_sockListen) ) ;
若是是調用AcceptEX接收的鏈接 不設置該選項的話,隨後的shutdown調用
將返回失敗, WSAGetLastError() returns 10057 -- WSANOTCONN 




2012.10.24


用智能指針重構了網絡庫,替換了裸指針。


可是發現IOCP以下一個問題:




1. 收到14字節數據
2012-10-25[02_02_05_906[DBG]:OnRecv : Worker thread [6268], socket = 11256, bytes = 14


2.再次投遞RECV請求,發生錯誤,由於對端已經關閉
2012-10-25[02_02_05_906[DBG]:Fatal error when post recv, error 10054, socket = 11256
2012-10-25[02_02_05_906[DBG]:Socket is set invalid 11256


3.因而準備回收資源,結束RECV請求;
2012-10-25[02_02_05_906[DBG]:EndRecv : Worker thread [6268], socket = 11256


4.但此時overlap結構仍然是掛起狀態?
2012-10-25[02_02_05_906[DBG]:2EndRecv socket 11256, now recv overlappe is complete ? 0
相關文章
相關標籤/搜索