在Reactor和Proactor模型一文中講到,Reactor模型提供了一個比較理想的I/O編程框架,讓程序更有結構,用戶使用起來更加方便,比裸API調用開發效率要高。另一方面,若是但願每一個事件通知以後,作的事情能有機會被代理到某個線程裏面去單獨運行,而線程完成的狀態又能通知回主任務,那麼「異步」的機制就必須被引入。本文以boost.Asio庫(其設計模式爲Proactor)爲基礎,講解爲何須要異步編程以及異步編程的實現。html
設想你是一位體育老師,須要測驗100位同窗的400米成績。你固然不會讓100位同窗一塊兒起跑,由於當同窗們返回終點時,你根原本不及掐表記錄各位同窗的成績。react
若是你每次讓一位同窗起跑並等待他回到終點你記下成績後再讓下一位起跑,直到全部同窗都跑完。恭喜你,你已經掌握了同步阻塞模式。你設計了一個函數,傳入參數是學生號和起跑時間,返回值是到達終點的時間。你調用該函數100次,就能完成此次測驗任務。這個函數是同步的,由於只要你調用它,就能獲得結果;這個函數也是阻塞的,由於你一旦調用它,就必須等待,直到它給你結果,不能去幹其餘事情。git
若是你一邊每隔10秒讓一位同窗起跑,直到全部同窗出發完畢;另外一邊每有一個同窗回到終點就記錄成績,直到全部同窗都跑完。恭喜你,你已經掌握了異步非阻塞模式。你設計了兩個函數,其中一個函數記錄起跑時間和學生號,該函數你會主動調用100次;另外一個函數記錄到達時間和學生號,該函數是一個事件驅動的callback函數,當有同窗到達終點時,你會被動調用。你主動調用的函數是異步的,由於你調用它,它並不會告訴你結果;這個函數也是非阻塞的,由於你一旦調用它,它就立刻返回,你不用等待就能夠再次調用它。但僅僅將這個函數調用100次,你並無完成你的測驗任務,你還須要被動等待調用另外一個函數100次。github
固然,你立刻就會意識到,同步阻塞模式的效率明顯低於異步非阻塞模式。那麼,誰還會使用同步阻塞模式呢?不錯,異步模式效率高,但更麻煩,你一邊要記錄起跑同窗的數據,一邊要記錄到達同窗的數據,並且同窗們回到終點的次序與起跑的次序並不相同,因此你還要不停地在你的成績冊上查找學生號。忙亂之中你每每會張冠李戴。你可能會想出更聰明的辦法:你帶了不少塊秒錶,讓同窗們分組互相測驗。恭喜你!你已經掌握了多線程同步模式!編程
每一個拿秒錶的同窗均可以獨立調用你的同步函數,這樣既不容易出錯,效率也大大提升,只要秒錶足夠多,同步的效率也能達到甚至超過異步。設計模式
能夠理解,你現的問題多是:既然多線程同步既快又好,異步模式還有存在的必要嗎?api
很遺憾,異步模式依然很是重要,由於在不少狀況下,你拿不出不少秒錶。你須要通訊的對端系統可能只容許你創建一個SOCKET鏈接,不少金融、電信行業的大型業務系統都如此要求。安全
如下部分主要來自於:http://www.javashuo.com/article/p-xgjicfxa-bo.html
依據微軟的MSDN上的解說:
(1)、同步函數:當一個函數是同步執行時,那麼當該函數被調用時不會當即返回,直到該函數所要作的事情全都作完了才返回。
(2)、異步函數:若是一個異步函數被調用時,該函數會當即返回儘管該函數規定的操做任務尚未完成。
(3)、在一個線程中分別調用上述兩種函數會對調用線程有何影響呢?網絡
(4)、一個調用了異步函數的線程如何與異步函數的執行結果同步呢?多線程
咱們是否已經發現了一個有趣的地方呢?!就是咱們可使用等待函數將一個異步執行的函數封裝成一個同步函數。
操做系統發展到今天已經十分精巧,線程就是其中一個傑做。操做系統把 CPU 處理時間劃分紅許多短暫時間片,在時間 T1 執行一個線程的指令,到時間 T2 又執行下一線程的指令,各線程輪流執行,結果好象是全部線程在並肩前進。這樣,編程時能夠建立多個線程,在同一期間執行,各線程能夠「並行」完成不一樣的任務。
在單線程方式下,計算機是一臺嚴格意義上的馮·諾依曼式機器,一段代碼調用另外一段代碼時,只能採用同步調用,必須等待這段代碼執行完返回結果後,調用方纔能繼續往下執行。有了多線程的支持,能夠採用異步調用,調用方和被調方能夠屬於兩個不一樣的線程,調用方啓動被調方線程後,不等對方返回結果就繼續執行後續代碼。被調方執行完畢後,經過某種手段通知調用方:結果已經出來,請酌情處理。
計算機中有些處理比較耗時。調用這種處理代碼時,調用方若是站在那裏苦苦等待,會嚴重影響程序性能。例如,某個程序啓動後若是須要打開文件讀出其中的數據,再根據這些數據進行一系列初始化處理,程序主窗口將遲遲不能顯示,讓用戶感到這個程序怎麼等半天也不出來,太差勁了。藉助異步調用能夠把問題輕鬆化解:把整個初始化處理放進一個單獨線程,主線程啓動此線程後接着往下走,讓主窗口瞬間顯示出來。等用戶盯着窗口犯呆時,初始化處理就在背後悄悄完成了。程序開始穩定運行之後,還能夠繼續使用這種技巧改善人機交互的瞬時反應。用戶點擊鼠標時,所激發的操做若是較費時,再點擊鼠標將不會當即反應,整個程序顯得很沉重。藉助異步調用處理費時的操做,讓主線程隨時恭候下一條消息,用戶點擊鼠標時感到輕鬆快捷,確定會對軟件產生好感。
異步調用用來處理從外部輸入的數據特別有效。假如計算機須要從一臺低速設備索取數據,而後是一段冗長的數據處理過程,採用同步調用顯然很不合算:計算機先向外部設備發出請求,而後等待數據輸入;而外部設備向計算機發送數據後,也要等待計算機完成數據處理後再發出下一條數據請求。雙方都有一段等待期,拉長了整個處理過程。其實,計算機能夠在處理數據以前先發出下一條數據請求,而後當即去處理數據。若是數據處理比數據採集快,要等待的只有計算機,外部設備能夠連續不停地採集數據。若是計算機同時鏈接多臺輸入設備,能夠輪流向各臺設備發出數據請求,並隨時處理每臺設備發來的數據,整個系統能夠保持連續高速運轉。編程的關鍵是把數據索取代碼和數據處理代碼分別歸屬兩個不一樣的線程。數據處理代碼調用一個數據請求異步函數,而後徑自處理手頭的數據。待下一組數據到來後,數據處理線程將收到通知,結束 wait 狀態,發出下一條數據請求,而後繼續處理數據。
異步調用時,調用方不等被調方返回結果就轉身離去,所以必須有一種機制讓被調方有告終果時能通知調用方。在同一進程中有不少手段能夠利用,筆者經常使用的手段是回調、event 對象和消息。
回調:回調方式很簡單:調用異步函數時在參數中放入一個函數地址,異步函數保存此地址,待有告終果後回調此函數即可以向調用方發出通知。若是把異步函數包裝進一個對象中,能夠用事件取代回調函數地址,經過事件處理例程向調用方發通知。
event : event 是 Windows 系統提供的一個經常使用同步對象,以在異步處理中對齊不一樣線程之間的步點。若是調用方暫時無事可作,能夠調用 wait 函數等在那裏,此時 event 處於 nonsignaled 狀態。當被調方出來結果以後,把 event 對象置於 signaled 狀態,wait 函數便自動結束等待,使調用方從新動做起來,從被調方取出處理結果。這種方式比回調方式要複雜一些,速度也相對較慢,但有很大的靈活性,能夠搞出不少花樣以適應比較複雜的處理系統。
消息:藉助 Windows 消息發通知是個不錯的選擇,既簡單又安全。程序中定義一個用戶消息,並由調用方準備好消息處理例程。被調方出來結果以後當即向調用方發送此消息,並經過 WParam 和 LParam 這兩個參數傳送結果。消息老是與窗口 handle 關聯,所以調用方必須藉助一個窗口才能接收消息,這是其不方便之處。另外,經過消息聯絡會影響速度,須要高速處理時回調方式更有優點。
若是調用方和被調方分屬兩個不一樣的進程,因爲內存空間的隔閡,通常是採用 Windows 消息發通知比較簡單可靠,被調方能夠藉助消息自己向調用方傳送數據。event 對象也能夠經過名稱在不一樣進程間共享,但只能發通知,自己沒法傳送數據,須要藉助 Windows 消息和 FileMapping 等內存共享手段或藉助 MailSlot 和 Pipe 等通訊手段。
若是你的服務端的客戶端數量多,你的服務端就採用異步的,可是你的客戶端能夠用同步的,客戶端通常功能比較單一,收到數據後才能執行下面的工做,因此弄成同步的在那等。
同步異步指的是通訊模式,而阻塞和非阻塞指的是在接收和發送時是否等待動做完成才返回。
首先是通訊的同步,主要是指客戶端在發送請求後,必須得在服務端有迴應後才發送下一個請求。因此這個時候的全部請求將會在服務端獲得同步。
其次是通訊的異步,指客戶端在發送請求後,沒必要等待服務端的迴應就能夠發送下一個請求,這樣對於全部的請求動做來講將會在服務端獲得異步,這條請求的鏈路就象是一個請求隊列,全部的動做在這裏不會獲得同步的。
阻塞和非阻塞只是應用在請求的讀取和發送。
在實現過程當中,若是服務端是異步的話,客戶端也是異步的話,通訊效率會很高,但若是服務端在請求的返回時也是返回給請求的鏈路時,客戶端是能夠同步的,這種狀況下,服務端是兼容同步和異步的。相反,若是客戶端是異步而服務端是同步的也不會有問題,只是處理效率低了些。
阻塞 block 是指,你撥通某人的電話,可是此人不在,因而你拿着電話等他回來,其間不能再用電話。同步大概和阻塞差很少。
非阻塞 nonblock 是指,你撥通某人的電話,可是此人不在,因而你掛斷電話,待會兒再打。至於到時候他回來沒有,只有打了電話才知道。即所謂的「輪詢 / poll」。
異步是指,你撥通某人的電話,可是此人不在,因而你叫接電話的人告訴那人(leave a message),回來後給你打電話(call back)。
1、同步阻塞模式
在這個模式中,用戶空間的應用程序執行一個系統調用,並阻塞,直到系統調用完成爲止(數據傳輸完成或發生錯誤)。
2、同步非阻塞模式
同步阻塞 I/O 的一種效率稍低的。非阻塞的實現是 I/O 命令可能並不會當即知足,須要應用程序調用許屢次來等待操做完成。這可能效率不高,由於在不少狀況下,當內核執行這個命令時,應用程序必需要進行忙碌等待,直到數據可用爲止,或者試圖執行其餘工做。由於數據在內核中變爲可用到用戶調用 read 返回數據之間存在必定的間隔,這會致使總體數據吞吐量的下降。但異步非阻塞因爲是多線程,效率仍是高。
/* create the connection by socket * means that connect "sockfd" to "server_addr" * 同步阻塞模式 */ if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) == -1) { perror("connect"); exit(1); } /* 同步非阻塞模式 */ while (send(sockfd, snd_buf, sizeof(snd_buf), MSG_DONTWAIT) == -1) { sleep(1); printf("sleep\n"); }
前面說了那麼多,如今終於能夠回到咱們的正題,介紹異步編程了。就像以前所說的,同步編程比異步編程簡單不少。這是由於,線性的思考是很簡單的(調用A,調用A結束,調用B,調用B結束,而後繼續,這是以事件處理的方式來思考)。後面你會碰到這種狀況,好比:五件事情,你不知道它們執行的順序,也不知道他們是否會執行!這部分主要參考:https://mmoaay.gitbooks.io/boost-asio-cpp-network-programming-chinese/content/Chapter2.html
儘管異步編程更難,可是你會更傾向於選擇使用它,好比:寫一個須要處理不少併發訪問的服務端。併發訪問越多,異步編程就比同步編程越簡單。
假設:你有一個須要處理1000個併發訪問的應用,從客戶端發給服務端的每一個信息都會再返回給客戶端,以‘\n’結尾。
同步方式的代碼,1個線程:
using namespace boost::asio; struct client { ip::tcp::socket sock; char buff[1024]; // 每一個信息最多這麼大 int already_read; // 你已經讀了多少 }; std::vector<client> clients; void handle_clients() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) on_read(clients[i]); } void on_read(client & c) { int to_read = std::min( 1024 - c.already_read, c.sock.available()); c.sock.read_some( buffer(c.buff + c.already_read, to_read)); c.already_read += to_read; if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) { int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff; std::string msg(c.buff, c.buff + pos); std::copy(c.buff + pos, c.buff + 1024, c.buff); c.already_read -= pos; on_read_msg(c, msg); } } void on_read_msg(client & c, const std::string & msg) { // 分析消息,而後返回 if ( msg == "request_login") c.sock.write( "request_ok\n"); else if ... }
有一種狀況是在任何服務端(和任何基於網絡的應用)都須要避免的,就是代碼無響應的狀況。在咱們的例子裏,咱們須要handle_clients()方法儘量少的阻塞。若是方法在某個點上阻塞,任何進來的信息都須要等待方法解除阻塞才能被處理。
爲了保持響應,只在一個套接字有數據的時候咱們纔讀,也就是說,if ( clients[i].sock.available() ) on_read(clients[i])。在on_read時,咱們只讀當前可用的;調用read_until(c.sock, buffer(...), '\n')會是一個很是糟糕的選擇,由於直到咱們從一個指定的客戶端讀取了完整的消息以前,它都是阻塞的(咱們永遠不知道它何時會讀取到完整的消息)
這裏的瓶頸就是on_read_msg()方法;當它執行時,全部進來的消息都在等待。一個良好的on_read_msg()方法實現會保證這種狀況基本不會發生,可是它仍是會發生(有時候向一個套接字寫入數據,緩衝區滿了時,它會被阻塞)
同步方式的代碼,10個線程
using namespace boost::asio; struct client { // ... 和以前同樣 bool set_reading() { boost::mutex::scoped_lock lk(cs_); if ( is_reading_) return false; // 已經在讀取 else { is_reading_ = true; return true; } } void unset_reading() { boost::mutex::scoped_lock lk(cs_); is_reading_ = false; } private: boost::mutex cs_; bool is_reading_; }; std::vector<client> clients; void handle_clients() { for ( int i = 0; i < 10; ++i) boost::thread( handle_clients_thread); } void handle_clients_thread() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) if ( clients[i].set_reading()) { on_read(clients[i]); clients[i].unset_reading(); } } void on_read(client & c) { // 和以前同樣 } void on_read_msg(client & c, const std::string & msg) { // 和以前同樣 }
爲了使用多線程,咱們須要對線程進行同步,這就是set_reading()和set_unreading()所作的。set_reading()方法很是重要,好比你想要一步實現「判斷是否在讀取而後標記爲讀取中」。但這是有兩步的(「判斷是否在讀取」和「標記爲讀取中」),你可能會有兩個線程同時爲一個客戶端判斷是否在讀取,而後你會有兩個線程同時爲一個客戶端調用on_read,結果就是數據衝突甚至致使應用崩潰。
你會發現代碼變得極其複雜。
同步編程有第三個選擇,就是爲每一個鏈接開闢一個線程。可是當併發的線程增長時,這就成了一種災難性的狀況。
而後,讓咱們來看異步編程。咱們不斷地異步讀取。當一個客戶端請求某些東西時,on_read被調用,而後迴應,而後等待下一個請求(而後開始另一個異步的read操做)。
異步方式的代碼,10個線程
using namespace boost::asio; io_service service; struct client { ip::tcp::socket sock; streambuf buff; // 從客戶端取回結果 } std::vector<client> clients; void handle_clients() { for ( int i = 0; i < clients.size(); ++i) async_read_until(clients[i].sock, clients[i].buff, '\n', boost::bind(on_read, clients[i], _1, _2)); for ( int i = 0; i < 10; ++i) boost::thread(handle_clients_thread); } void handle_clients_thread() { service.run(); } void on_read(client & c, const error_code & err, size_t read_bytes) { std::istream in(&c.buff); std::string msg; std::getline(in, msg); if ( msg == "request_login") c.sock.async_write( "request_ok\n", on_write); else if ... ... // 等待同一個客戶端下一個讀取操做 async_read_until(c.sock, c.buff, '\n', boost::bind(on_read, c, _1, _2)); }
發現代碼變得有多簡單了吧?client結構裏面只有兩個成員,handle_clients()僅僅調用了async_read_until,而後它建立了10個線程,每一個線程都調用service.run()。這些線程會處理全部來自客戶端的異步read操做,而後分發全部向客戶端的異步write操做。另外須要注意的一件事情是:on_read()一直在爲下一次異步read操做作準備(看最後一行代碼)。
再一次說明,若是有等待執行的操做,run()會一直執行,直到你手動調用io_service::stop()。爲了保證io_service一直執行,一般你添加一個或者多個異步操做,而後在它們被執行時,你繼續一直不停地添加異步操做,好比下面代碼:
using namespace boost::asio; io_service service; ip::tcp::socket sock(service); char buff_read[1024], buff_write[1024] = "ok"; void on_read(const boost::system::error_code &err, std::size_t bytes); void on_write(const boost::system::error_code &err, std::size_t bytes) { sock.async_read_some(buffer(buff_read), on_read); } void on_read(const boost::system::error_code &err, std::size_t bytes) { // ... 處理讀取操做 ... sock.async_write_some(buffer(buff_write,3), on_write); } void on_connect(const boost::system::error_code &err) { sock.async_read_some(buffer(buff_read), on_read); } int main(int argc, char* argv[]) { ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001); sock.async_connect(ep, on_connect); service.run(); }
假設你須要作下面的操做:
io_service service; ip::tcp::socket sock(service); char buff[512]; ... read(sock, buffer(buff));
在這個例子中,sock和buff的存在時間都必須比read()調用的時間要長。也就是說,在調用read()返回以前,它們都必須有效。這就是你所指望的;你傳給一個方法的全部參數在方法內部都必須有效。當咱們採用異步方式時,事情會變得比較複雜。
io_service service; ip::tcp::socket sock(service); char buff[512]; void on_read(const boost::system::error_code &, size_t) {} ... async_read(sock, buffer(buff), on_read);
在這個例子中,sock和buff的存在時間都必須比read()操做自己時間要長,可是read操做持續的時間咱們是不知道的,由於它是異步的。
當使用socket緩衝區的時候,你會有一個buffer實例在異步調用時一直存在(使用boost::shared_array<>)。在這裏,咱們可使用一樣的方式,經過建立一個類並在其內部管理socket和它的讀寫緩衝區。而後,對於全部的異步操做,傳遞一個包含智能指針的boost::bind仿函數給它:
using namespace boost::asio; io_service service; struct connection : boost::enable_shared_from_this<connection> { typedef boost::system::error_code error_code; typedef boost::shared_ptr<connection> ptr; connection() : sock_(service), started_(true) {} void start(ip::tcp::endpoint ep) { sock_.async_connect(ep, boost::bind(&connection::on_connect, shared_from_this(), _1)); } void stop() { if ( !started_) return; started_ = false; sock_.close(); } bool started() { return started_; } private: void on_connect(const error_code & err) { // 這裏你決定用這個鏈接作什麼: 讀取或者寫入 if ( !err) do_read(); else stop(); } void on_read(const error_code & err, size_t bytes) { if ( !started() ) return; std::string msg(read_buffer_, bytes); if ( msg == "can_login") do_write("access_data"); else if ( msg.find("data ") == 0) process_data(msg); else if ( msg == "login_fail") stop(); } void on_write(const error_code & err, size_t bytes) { do_read(); } void do_read() { sock_.async_read_some(buffer(read_buffer_), boost::bind(&connection::on_read, shared_from_this(), _1, _2)); } void do_write(const std::string & msg) { if ( !started() ) return; // 注意: 由於在作另一個async_read操做以前你想要發送多個消息, // 因此你須要多個寫入buffer std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some(buffer(write_buffer_, msg.size()), boost::bind(&connection::on_write, shared_from_this(), _1, _2)); } void process_data(const std::string & msg) { // 處理服務端來的內容,而後啓動另一個寫入操做 } private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; }; int main(int argc, char* argv[]) { ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001); connection::ptr(new connection)->start(ep); }
在全部異步調用中,咱們傳遞一個boost::bind仿函數看成參數。這個仿函數內部包含了一個智能指針,指向connection實例。只要有一個異步操做等待時,Boost.Asio就會保存boost::bind仿函數的拷貝,這個拷貝保存了指向鏈接實例的一個智能指針,從而保證connection實例保持活動。問題解決!
固然,connection類僅僅是一個框架類;你須要根據你的需求對它進行調整(它看起來會和當前服務端例子的狀況至關不一樣)。
你須要注意的是建立一個新的鏈接是至關簡單的:connection::ptr(new connection)- >start(ep)。這個方法啓動了到服務端的(異步)鏈接。當你須要關閉這個鏈接時,調用stop()。
當實例被啓動時(start()),它會等待客戶端的鏈接。當鏈接發生時。on_connect()被調用。若是沒有錯誤發生,它啓動一個read操做(do_read())。當read操做結束時,你就能夠解析這個消息;固然你應用的on_read()看起來會各類各樣。而當你寫回一個消息時,你須要把它拷貝到緩衝區,而後像我在do_write()方法中所作的同樣將其發送出去,由於這個緩衝區一樣須要在這個異步寫操做中一直存活。最後須要注意的一點——當寫回時,你須要指定寫入的數量,不然,整個緩衝區都會被髮送出去。
網絡api實際上要繁雜得多,這個章節只是作爲一個參考,當你在實現本身的網絡應用時能夠回過頭來看看。
Boost.Asio實現了端點的概念,你能夠認爲是IP和端口。若是你不知道準確的IP,你可使用resolver對象將主機名,例如www.yahoo.com轉換爲一個或多個IP地址。
咱們也能夠看到API的核心——socket類。Boost.Asio提供了TCP、UDP和 ICMP的實現。並且你還能夠用你本身的協議來對它進行擴展;固然,這個工做不適合缺少勇氣的人。
異步編程是剛需。你應該已經明白爲何有時候須要用到它,尤爲在寫服務端的時候。調用service.run()來實現異步循環就已經可讓你很知足,可是有時候你須要更進一步,嘗試使用run_one()、poll()或者poll_one()。
當實現異步時,你能夠異步執行你本身的方法;使用service.post()或者service.dispatch()。
最後,爲了使socket和緩衝區(read或者write)在整個異步操做的生命週期中一直活動,咱們須要採起特殊的防禦措施。你的鏈接類須要繼承自enabled_shared_from_this,而後在內部保存它須要的緩衝區,並且每次異步調用都要傳遞一個智能指針給this操做。