boost.asio源碼剖析(三) ---- 流程分析

* 常見流程分析之一(Tcp異步鏈接)react

 

咱們用一個簡單的demo分析Tcp異步鏈接的流程:ios

 1 #include <iostream>
 2 #include <boost/asio.hpp>
 3 
 4 // 異步鏈接回調函數
 5 void on_connect(boost::system::error_code ec)
 6 {
 7     if (ec)  // 鏈接失敗, 輸出錯誤碼
 8         std::cout << "async connect error:" << ec.message() << std::endl;
 9     else  // 鏈接成功
10         std::cout << "async connect ok!" << std::endl;
11 }
12 
13 int main()
14 {
15     boost::asio::io_service ios;  // 建立io_service對象
16     boost::asio::ip::tcp::endpoint addr(
17         boost::asio::ip::address::from_string("127.0.0.1"), 12345);  // server端地址
18     boost::asio::ip::tcp::socket conn_socket(ios);  // 建立tcp協議的socket對象
19     conn_socket.async_connect(addr, &on_connect);  // 發起異步鏈接請求
20     ios.run();  // 調用io_service::run, 等待異步操做結果
21 
22     std::cin.get();
23     return 0;
24 }

 


這段代碼中的異步鏈接請求在asio源碼中的序列圖以下:
windows

 

      其中,basic_socket是個模板類,tcp協議中的socket的定義以下:
            typedef basic_socket<tcp> socket;異步


      reactor的定義以下:
      #if defined(BOOST_ASIO_WINDOWS_RUNTIME)
            typedef class null_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_IOCP)
            typedef class select_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_EPOLL)
            typedef class epoll_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_KQUEUE)
            typedef class kqueue_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_DEV_POLL)
            typedef class dev_poll_reactor reactor;
      #else
            typedef class select_reactor reactor;
      #endifsocket

      在這個序列圖中最值得注意的一點是:在windows平臺下,異步鏈接請求不是由Iocp處理的,而是由select模型處理的,這是與異步讀寫數據最大的不一樣之處。async

 


* 常見流程分析之二(Tcp異步接受鏈接)tcp

 

咱們用一個簡單的demo分析Tcp異步鏈接的流程:函數

 1 #include <iostream>
 2 #include <boost/asio.hpp>
 3 #include <boost/bind.hpp>
 4 
 5 // 異步鏈接回調函數
 6 void on_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket * socket_ptr)
 7 {
 8     if (ec)  // 鏈接失敗, 輸出錯誤碼
 9         std::cout << "async accept error:" << ec.message() << std::endl;
10     else  // 鏈接成功
11         std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;
12 
13     // 斷開鏈接, 釋放資源.
14     socket_ptr->close(), delete socket_ptr;
15 }
16 
17 int main()
18 {
19     boost::asio::io_service ios;  // 建立io_service對象
20     boost::asio::ip::tcp::endpoint addr(
21         boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址
22     boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 建立acceptor對象
23     boost::asio::ip::tcp::socket * socket_ptr = new boost::asio::ip::tcp::socket(ios);
24     acceptor.async_accept(*socket_ptr
25         , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 調用異步accept請求
26     ios.run();  // 調用io_service::run, 等待異步操做結果
27 
28     std::cin.get();
29     return 0;
30 }

 

這段代碼中的異步鏈接請求在asio源碼中的序列圖以下:
spa

 


* 常見流程分析之三(Tcp異步讀寫數據)操作系統

 

咱們依然以上一節的例子爲基礎,擴展一個簡單的demo分析Tcp異步讀寫數據的流程:

 1 #include <iostream>
 2 #include <boost/asio.hpp>
 3 #include <boost/bind.hpp>
 4 #include <boost/shared_ptr.hpp>
 5 #include <boost/array.hpp>
 6 
 7 typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t;
 8 typedef boost::array<char, 128> buffer_t;
 9 typedef boost::shared_ptr<buffer_t> buffer_ptr_t;
10 
11 // 異步讀數據回調函數
12 void on_read(boost::system::error_code ec
13     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
14 {
15     if (ec)
16         std::cout << "async write error:" << ec.message() << std::endl;
17     else
18     {
19         std::cout << "async read size:" << len;
20         std::cout << " info:" << std::string((char*)buffer_ptr->begin(), len) << std::endl;
21 
22         // auto release socket and buffer.
23     }
24 }
25 
26 // 異步寫數據回調函數
27 void on_write(boost::system::error_code ec
28     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
29 {
30     if (ec)
31         std::cout << "async write error:" << ec.message() << std::endl;
32     else
33     {
34         std::cout << "async write size:" << len << std::endl;
35         socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
36             , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
37                 , socket_ptr, buffer_ptr));
38     }
39 }
40 
41 // 異步鏈接回調函數
42 void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)
43 {
44     if (ec)  // 鏈接失敗, 輸出錯誤碼
45     {
46         std::cout << "async accept error:" << ec.message() << std::endl;
47     }
48     else  // 鏈接成功
49     {
50         std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;
51         buffer_ptr_t buffer_ptr(new buffer_t);
52         strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");
53         socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))
54             , boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
55                 , socket_ptr, buffer_ptr));
56     }
57 }
58 
59 int main()
60 {
61     boost::asio::io_service ios;  // 建立io_service對象
62     boost::asio::ip::tcp::endpoint addr(
63         boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址
64     boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 建立acceptor對象
65     socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));
66     acceptor.async_accept(*socket_ptr
67         , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 調用異步accept請求
68     ios.run();  // 調用io_service::run, 等待異步操做結果
69 
70     std::cout << "press enter key...";
71     std::cin.get();
72     return 0;
73 }    

 

這段代碼中的異步鏈接請求在asio源碼中的序列圖以下:

 


* 常見流程分析之四(Tcp強制關閉鏈接)

咱們依然以上一節的例子爲基礎,擴展一個簡單的demo分析Tcp強制關閉鏈接的流程:

 1 #include <iostream>
 2 #include <boost/asio.hpp>
 3 #include <boost/bind.hpp>
 4 #include <boost/shared_ptr.hpp>
 5 #include <boost/array.hpp>
 6 
 7 typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t;
 8 typedef boost::array<char, 128> buffer_t;
 9 typedef boost::shared_ptr<buffer_t> buffer_ptr_t;
10 
11 // 異步讀數據回調函數
12 void on_read(boost::system::error_code ec
13     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
14 {
15     if (ec)  // 鏈接失敗, 輸出錯誤碼
16     {
17         std::cout << "async read error:" << ec.message() << std::endl;
18     }
19 }
20 
21 // 異步寫數據回調函數
22 void on_write(boost::system::error_code ec
23     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
24 {
25     if (ec)  // 鏈接失敗, 輸出錯誤碼
26     {
27         std::cout << "async write error:" << ec.message() << std::endl;
28     }
29 }
30 
31 // 異步鏈接回調函數
32 void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)
33 {
34     if (ec)  // 鏈接失敗, 輸出錯誤碼
35     {
36         std::cout << "async accept error:" << ec.message() << std::endl;
37     }
38     else  // 鏈接成功
39     {
40         std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;
41 
42         {
43             buffer_ptr_t buffer_ptr(new buffer_t);
44             strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");
45             socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))
46                 , boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
47                 , socket_ptr, buffer_ptr));
48         }
49 
50         {
51             buffer_ptr_t buffer_ptr(new buffer_t);
52             socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
53                 , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
54                 , socket_ptr, buffer_ptr));
55         }
56 
57         /// 強制關閉鏈接
58         socket_ptr->close(ec);
59         if (ec)
60             std::cout << "close error:" << ec.message() << std::endl;
61     }
62 }
63 
64 int main()
65 {
66     boost::asio::io_service ios;  // 建立io_service對象
67     boost::asio::ip::tcp::endpoint addr(
68         boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址
69     boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 建立acceptor對象
70     socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));
71     acceptor.async_accept(*socket_ptr
72         , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 調用異步accept請求
73     socket_ptr.reset();
74     ios.run();  // 調用io_service::run, 等待異步操做結果
75 
76     std::cout << "press enter key...";
77     std::cin.get();
78     return 0;
79 } 

這個例子中,接受到客戶端的鏈接後,當即發起異步讀請求和異步寫請求,而後當即強制關閉socket。

其中,強制關閉socket的請求在asio源碼中的序列圖以下:

 

* 常見流程分析之五(Tcp優雅地關閉鏈接)

咱們依然以第三節的例子爲基礎,擴展一個簡單的demo分析Tcp優雅地關閉鏈接的流程:

 1 #include <iostream>
 2 #include <boost/asio.hpp>
 3 #include <boost/bind.hpp>
 4 #include <boost/shared_ptr.hpp>
 5 #include <boost/array.hpp>
 6 
 7 typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t;
 8 typedef boost::array<char, 32> buffer_t;
 9 typedef boost::shared_ptr<buffer_t> buffer_ptr_t;
10 
11 
12 // 異步讀數據回調函數
13 void on_read(boost::system::error_code ec
14     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
15 {
16     static int si = 0;
17     if (ec)  // 鏈接失敗, 輸出錯誤碼
18     {
19         std::cout << "async read(" << si++ << ") error:" << ec.message() << std::endl;
20         socket_ptr->shutdown(boost::asio::socket_base::shutdown_receive, ec);
21         socket_ptr->close(ec);
22         if (ec)
23             std::cout << "close error:" << ec.message() << std::endl;
24     }
25     else
26     {
27         std::cout << "read(" << si++ << ") len:" << len << std::endl;
28 
29         socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
30             , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
31             , socket_ptr, buffer_ptr));
32     }
33 }
34 
35 // 異步寫數據回調函數
36 void on_write(boost::system::error_code ec
37     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
38 {
39     if (ec)  // 鏈接失敗, 輸出錯誤碼
40     {
41         std::cout << "async write error:" << ec.message() << std::endl;
42     }
43     else
44     {
45         /// 優雅地關閉鏈接
46         socket_ptr->shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
47         if (ec)
48             std::cout << "shutdown send error:" << ec.message() << std::endl;
49     }
50 }
51 
52 // 異步鏈接回調函數
53 void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)
54 {
55     if (ec)  // 鏈接失敗, 輸出錯誤碼
56     {
57         std::cout << "async accept error:" << ec.message() << std::endl;
58     }
59     else  // 鏈接成功
60     {
61         std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;
62 
63         {
64             buffer_ptr_t buffer_ptr(new buffer_t);
65             socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
66                 , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
67                 , socket_ptr, buffer_ptr));
68         }
69 
70         {
71             buffer_ptr_t buffer_ptr(new buffer_t);
72             strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");
73             socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))
74                 , boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
75                 , socket_ptr, buffer_ptr));
76         }
77     }
78 }
79 
80 int main()
81 { 
82     boost::asio::io_service ios;  // 建立io_service對象
83     boost::asio::ip::tcp::endpoint addr(
84         boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址
85     boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 建立acceptor對象
86     socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));
87     acceptor.async_accept(*socket_ptr
88         , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 調用異步accept請求
89     socket_ptr.reset();
90     ios.run();  // 調用io_service::run, 等待異步操做結果
91 
92     std::cout << "press enter key...";
93     std::cin.get();
94     return 0;
95 }

 

      在這個例子中,接收到客戶端的鏈接並向客戶端發送數據之後,先關閉socket的發送通道,而後等待socket接收緩衝區中的數據所有read出來之後,再關閉socket的接收通道。此時,socket的接收和發送通道均以關閉,任何進程都沒法使用此socket收發數據,但其所佔用的系統資源並未釋放,底層發送緩衝區中的數據也不保證已所有發出,須要在此以後執行close操做以便釋放系統資源。
      若在釋放系統資源前但願底層發送緩衝區中的數據依然能夠發出,則需在socket的linger屬性中設置一個等待時間,以便有時間等待發送緩衝區中的數據發送完畢。但linger中的值絕對不是越大越好,這是由於其原理是操做系統幫忙保留socket的資源以等待其發送緩衝區中的數據發送完畢,若是遠端socket的一直未能接收數據便會致使本地socket一直等待下去,這對系統資源是極大的浪費。所以,在須要處理大量鏈接的服務端,linger的值必定不可過大。

 

 

因爲本文會實時根據讀者反饋的寶貴意見更新,爲防其餘讀者看到過期的文章,所以本系列專題謝絕轉載!

相關文章
相關標籤/搜索