orchid是一個構建於boost庫基礎上的C++庫,相似於python下的gevent/eventlet,爲用戶提供基於協程的併發模型。 python
協程,即協做式程序,其思想是,一系列互相依賴的協程間依次使用CPU,每次只有一個協程工做,而其餘協程處於休眠狀態。協程在控制離開時暫停執行,當控制再次進入時只能從離開的位置繼續執行。 協程已經被證實是一種很是有用的程序組件,不只被python、lua、ruby等腳本語言普遍採用,並且被新一代面向多核的編程語言如golang rust-lang等採用做爲併發的基本單位。 ios
術語「green化」來自於python下著名的協程庫greenlet,指改造IO對象以能和協程配合。某種意義上,協程與線程的關係相似與線程與進程的關係,多個協程會在同一個線程的上下文之中運行。所以,當出現IO操做的時候,爲了可以與協程相互配合,只阻塞當前協程而非整個線程,須要將io對象「green化」。目前orchid提供的green化的io對象包括: c++
chan這個概念引用自golang的chan。每一個協程是一個獨立的執行單元,爲了可以方便協程之間的通訊/同步,orchid提供了chan這種機制。chan本質上是一個阻塞消息隊列,後面咱們將看到,chan不只能夠用於同一個調度器上的協程之間的通訊,並且能夠用於不一樣調度器上的協程之間的通訊。 git
建議使用的scheduler per cpu的的模型來支持多核的機器,即爲每一個CPU核心分配一個調度器,有多少核心就建立多少個調度器。不一樣調度器的協程之間也能夠經過chan來通訊。協程應該被建立在那個調度器裏由用戶本身決定。 github
orchid的實現嚴重依賴於boost,依賴的主要子庫包括:boost.context boost.asio boost.iostreams shared_ptr boost.bind 等等。若是用戶對這些子庫,尤爲是boost.asio和boost.bind、shared_ptr具備必定的瞭解的話,會更加有利於瞭解和使用orchid。固然若是不瞭解也沒有關係,本文會在後面的例子中對涉及的相關知識進行簡單的介紹。 golang
orchid 自己是個只包含頭文件的模板庫,拷貝到指定的目錄便可,可是orchid依賴的boost庫須要編譯。並且在使用orchid的時須要連接 boost_context boost_iostreams boost_system boost_thread 等子庫(參見unit_test裏的CMakeLists.txt)。 編程
boost須要採用最新的svn裏的版本,由於1.52及如下版本缺乏boost.atomic等子庫。 bootstrap
git clone https://github.com/ryppl/boost-svn cd boost-svn ./bootstrap.sh ./b2 toolset=clang cxxflags="-arch x86_64" linkflags="-arch x86_64" install cd .. git clone https://github.com/ioriiod0/orchid.git cd orchid cp -r orchid <安裝路徑>
git clone https://github.com/ryppl/boost-svn cd boost-svn ./bootstrap.sh ./b2 install cd .. git clone https://github.com/ioriiod0/orchid.git cd orchid cp -r orchid <安裝路徑>
國際慣例,讓咱們從hello world開始。在這個例子中,咱們將看到如何建立和執行一個協程。 安全
//函數簽名爲 void(orchid::coroutine_handle) 的函數 void f(orchid::coroutine_handle co) { std::cout<<"f:hello world"<<std::endl; } //函數簽名爲 void(orchid::coroutine_handle,const char*) 的函數 void f1(orchid::coroutine_handle co,const char* str) { std::cout<<str<<std::endl; } void stop(orchid::coroutine_handle co) { co -> get_scheduler().stop(); } int main(int argc,char* argv[]) { orchid::scheduler sche; sche.spawn(f,orchid::coroutine::minimum_stack_size()); sche.spawn(boost::bind(f1,_1,"f1:hello world"),orchid::coroutine::default_stack_size()); sche.spawn(stop); sche.run(); std::cout<<"done!"<<std::endl; }
程序的輸出: ruby
f:hello world f1:hello world done!
在這個例子中,咱們首先聲明一個調度器sche,而後調用sche的spawn方法以2種方式建立了2個協程來輸出hello world,最後調用調度器的run方法來執行整個程序。當程序執行時,2個協程依次被執行。須要注意的是,在調用run方法以前,被建立的協助程序並不會被執行,只有調用了run方法以後,被建立的協程纔會被調度執行。 調用run方法的線程會被阻塞,直到調度器的stop方法被調用爲止。 (實際上,在這個例子中咱們並無對std::cout進行green化,所以每次調用std::cout的時候,整個調度器/線程都會被阻塞,在後面的介紹中咱們將看到如何將IO對象green化)。
spawn方法有2個參數:
template <typename F> void spawn(const F& func,std::size_t stack_size = coroutine_type::default_stack_size())
第一個參數func是協程被執行時所執行的函數。能夠認爲是協程的main函數。func能夠是任何符合函數簽名
void(orchid::coroutine_handle)
的函數或函數對象,如上面例子中的f。其中,orchid::coroutine_handle是協程的句柄,表明了func自己所處的協程。 每每要完成協程所執行的任務,僅僅有orchid::coroutine_handle是不夠的,好比函數f1,f1須要額外傳入一個const char* 的參數,對於這種函數,咱們能夠經過boost.bind、std::bind(tr1或者C++11)、或者lambda表達式(c++11)將他們適配成要求的函數類型。如上例中的:
sche.spawn(boost::bind(f1,_1,"f1:hello world"),orchid::coroutine::default_stack_size())
boost::bind將f1從void(orchid::coroutine,const char*)適配成了void(orchid::coroutine_handle)類型。其中_1是佔位符,表示空出f1的第一個參數,而f1的第二個參數由"f1:hello world"填充。在後面的例子中將會大量出現這種用法,若是用戶對boost.bind並非很清楚,能夠先閱讀一下boost文檔中的相關章節。
第二個參數指定了協程棧空間的大小,orchid::coroutine提供了3個相關的函數:
std::size_t default_stack_size();//默認的棧大小,通常爲16個內存頁 std::size_t minimum_stack_size();//棧的最小值,通常爲2個內存頁面 std::size_t maximum_stack_size();//棧的最大值,通常與進程的棧的最大值(軟限制)相同。
用戶能夠根據本身的狀況指定所需棧空間的大小。當協程使用的棧空間超過了提供的棧空間的大小的時候,程序會異常結束, orchid並不會自動增長棧空間的大小,因此用戶必須預估出足夠的棧空間。
這個hello world的例子過於簡單,下面咱們將經過建立一個echo server的客戶端和服務器端來進一步瞭解orchid。
第二個栗子,讓咱們從網絡編程屆的hello world:echo server開始。echo server首先必需要處理鏈接事件,在orchid中,咱們建立一個協程來專門處理鏈接事件:
typedef boost::shared_ptr<orchid::socket> socket_ptr; //處理ACCEPT事件的協程 void handle_accept(orchid::coroutine_handle co) { try { orchid::acceptor acceptor(co -> get_scheduler().get_io_service()); acceptor.bind_and_listen("5678",true); for(;;) { socket_ptr sock(new orchid::socket(co -> get_scheduler().get_io_service())); acceptor.accept(*sock,co); co -> get_scheduler().spawn(boost::bind(handle_io,_1,sock),orchid::minimum_stack_size()); } } catch(boost::system::system_error& e) { cerr<<e.code()<<" "<<e.what()<<endl; } }
在上面的代碼中,咱們建立了一個green化的acceptor,並讓它監聽5678端口,而後在"阻塞"等待鏈接到來,當鏈接事件發生時,建立一個新的協程來服務新獲得的socket。green化的socket被包裹在智能指針中以參數形式傳遞給處理socket io事件的協程。處理套接字IO的協程以下:
//處理SOCKET IO事件的協程 void handle_io(orchid::coroutine_handle co,socket_ptr sock) { orchid::tcp_ostream out(*sock,co); orchid::tcp_istream in(*sock,co); for(std::string str;std::getline(in, str) && out;) { out<<str<<endl; } }
協程首先在傳入的套接字上建立了一個輸入流和一個輸出流,分別表明了TCP的輸入和輸出。而後不斷地從輸入流中讀取一行,並輸出到輸出流當中。當socket上的TCP鏈接斷開時,輸入流和輸出流的eof標誌爲會被置位,所以循環結束,協程退出。
orchid可使用戶以流的形式來操做套接字。輸入流和輸出流分別提供了std::istream和std::ostream的接口;輸入流和輸出流是帶緩衝的,若是用戶須要無緩衝的讀寫socket或者自建緩衝,能夠直接調用orchid::socket的read和write函數。可是須要注意這兩個函數會拋出boost::system_error異常來表示錯誤(參見benchmark_orchid_client和benchmar_orchid_server)。
最後是main函數:
int main() { orchid::scheduler sche; sche.spawn(handle_accept,orchid::coroutine::minimum_stack_size());//建立協程 sche.run(); }
而後咱們來看客戶端的代碼,在客戶端中,咱們建立100個併發的TCP鏈接不斷的向echo server發送hello world。
首先是處理socket io的協程:
void handle_io(orchid::coroutine_handle co) { orchid::descriptor stdout(co -> get_scheduler().get_io_service(),STDOUT_FILENO); orchid::socket sock_(co -> get_scheduler().get_io_service()); try { sock_.connect("127.0.0.1","5678",co); orchid::tcp_istream in(sock_,co); orchid::tcp_ostream out(sock_,co); orchid::descriptor_ostream console(stdout,co); out << "hello world !!!!" <<endl; for (string str;std::getline(in,str);) { console << str << endl; out << "hello world !!!!" <<endl; } } catch (const boost::system::system_error& e) { cerr<<e.code()<<" "<<e.what()<<endl; } }
處理socket io的協程分別建立了一個green化的socket和一個green話的標準輸出,而後鏈接到echo server上,不斷執行 輸出 -> 接收 -> 打印 這個流程。
爲了可以從外部打斷client的執行,咱們還須要一個協程來處理中斷信號,這樣咱們就能夠用ctrl+c來正確的中斷程序的執行:
void handle_sig(orchid::coroutine_handle co) { orchid::signal sig(co -> get_scheduler().get_io_service()); try { sig.add(SIGINT); sig.add(SIGTERM); sig.wait(co); co->get_scheduler().stop(); } catch (const boost::system::system_error& e) { cerr<<e.code()<<" "<<e.what()<<endl; } }
在這個協程中,協程「阻塞」在SIGINT 和 SIGTERM信號上,當信號發生時,調用調度器的stop方法來中斷程序的執行,並安全的回收資源。
int main() { orchid::scheduler sche; sche.spawn(handle_sig,orchid::coroutine::minimum_stack_size()); for (int i=0;i<100;++i) { sche.spawn(handle_io); } sche.run(); }
在客戶端的main函數中,咱們建立100個協程,同時向服務器發送hello world。
在上面這個echo server的例子中,咱們採用了一種 coroutine per connection 的編程模型,與傳統的 thread per connection 模型同樣的簡潔清晰,可是整個程序實際上運行在同一線程當中。 因爲協程的切換和內存開銷遠遠小於線程,所以咱們能夠輕易的同時啓動上千協程來同時服務上千鏈接,這是 thread per connection的模型很難作到的; 在性能方面,整個green化的IO系統其實是使用boost.asio這種高性能的異步io庫實現的,與原始的asio相比,orchid的性能損耗很是小,性能基本持平。 所以經過orchid,咱們能夠在保持同步IO模型簡潔性的同時,得到近似於異步IO模型的高性能。
在這個例子中,咱們將主要介紹orchid提供的協程間的通訊機制:chan。chan這個概念引用自golang的chan。chan表現爲一個阻塞消息隊列。 orchid提供的chan只支持 單生產者-單消費者 和 多生產者-單消費者 這兩種模型(在其餘模型,如多生產者-多消費者中,也能夠工做,單可能會出現某些消費者餓死的現象)。
在下面的例子中,代碼orchid::chan < int > ch(10) 表示建立一個大小爲10,裝載類型爲int的chan。 chan 有3個重要的接口:
// 向chan中發送一個對象。t爲要發送的對象,co爲當前協程的協程句柄。 // 當隊列爲空時,co會阻塞,當隊列不在爲空時,阻塞的協程會被喚醒; // 發送成功返回true,不然false。 bool send(const U& t,coroutine_pointer co); // 從chan中接收一對象。 // 當隊列滿時,co會阻塞,當隊列再也不滿時,阻塞的協程會被喚醒。 // 接收成功返回true,不然false。 bool recv(U& t,coroutine_pointer co); // 關閉一個chan,當chan關閉後,再調用send和recv都會直接返回false;而且全部阻塞在chan中的協程都會被喚醒 // 被喚醒的協程中 send/recv 返回false。 void close();
下面是一個簡單的生產者和消費者的例子:
//生產者,不斷髮送本身的ID給消費者 void sender(orchid::coroutine_handle co,int id,orchid::chan<int>& ch) { for (;;) { ch.send(id,co); } } //消費者,不斷接收生產者發送的ID並打印ID. void receiver(orchid::coroutine_handle co,orchid::chan<int>& ch) { orchid::descriptor stdout(co -> get_scheduler().get_io_service(),STDOUT_FILENO); orchid::descriptor_ostream console(stdout,co); int id; for (;;) { ch.recv(id,co); console<<"receiver receive: "<<id<<std::endl; } } //生產者和消費者運行在同一個調度器中。 void test_one_scheduler() { orchid::scheduler sche; orchid::chan<int> ch(10); for (int i=0;i<100;++i) { sche.spawn(boost::bind(sender,_1,i,boost::ref(ch))); } sche.spawn(boost::bind(receiver,_1,boost::ref(ch))); sche.run(); } //生產者和消費者運行再不一樣的調度器中。 void test_scheduler_group() { orchid::scheduler_group group(2); orchid::chan<int> ch(10); for (int i=0;i<100;++i) { group[i%2].spawn(boost::bind(sender,_1,i,boost::ref(ch))); } group[0].spawn(boost::bind(receiver,_1,boost::ref(ch))); group.run(); }
經過scheduler_group類咱們能夠方便的建立一組調度器,每一個調度器運行在一個單獨的線程中。能夠經過下標來訪問某個調度器;經過調用其run方法同時啓動多個調度器;經過調用其stop方法,同時中止多個調度器。
此次咱們來一個複雜一些的例子:chat server 和 chat client。從這個例子中咱們將看到一些有用的技巧,好比如何使用boost::shared_from_this來管理協程間共享對象的生命週期;如何利用boost.variant在一個chan中接收多種類型消息。
先從較爲簡單的chat clent開始:在chat client中咱們將建立兩個協程,一個不斷從本機的標準輸入讀取輸入,而後發送到chat server;另外一個則不斷從chat server接受消息併發送到本機的標準輸出上。
const static std::size_t STACK_SIZE = 64*1024; //客戶端類 class chat_client { public: chat_client(const string& ip,const string& port) :sche_(), stdin_(sche_.get_io_service(),STDIN_FILENO),//green化的標準輸入 stdout_(sche_.get_io_service(),STDOUT_FILENO),//green化的標準輸出 sock_(sche_.get_io_service()), is_logined_(false),ip_(ip),port_(port) { } ~chat_client() { } public: //建立協程並啓動調度器 void run() { sche_.spawn(boost::bind(&chat_client::handle_console,this,_1),STACK_SIZE); sche_.run(); } //中止調度器 void stop() { sche_.stop(); } private: // 不斷從chat server接收消息並打印到標準輸出上。 void receive_msg(orchid::coroutine_handle co) { string str; orchid::descriptor_ostream out(stdout_,co); orchid::tcp_istream in(sock_,co); for (string str;std::getline(in, str);) { out<<str<<endl; } } //不斷從標準輸入接收用戶輸入,並處理用戶輸入,發送消息chat server。 //登錄用 /l username //發送消息用 /s xxxxxxx //退出用 /q void handle_console(orchid::coroutine_handle co) { orchid::descriptor_istream in(stdin_,co); orchid::tcp_ostream out(sock_,co); //首先鏈接chat server try { sock_.connect(ip_,port_,co); } catch (boost::system::system_error& e) { cerr<<e.code()<<" "<<e.what()<<endl; return; } //鏈接成功則啓動接受消息的協程。 sche_.spawn(boost::bind(&chat_client::receive_msg,this,_1), STACK_SIZE); //不斷讀取標準輸入並進行處理。 for(string str;std::getline(in,str);) { if(str.empty()) continue; // 退出 「/q」 if(str.size() >= 2 && str[0] == '/' && str[1] == 'q') { sock_.close(); user_.clear(); is_logined_ = false; cerr<<"closed"<<endl; stop(); } // 發送消息 /s message else if(str.size() >= 4 && str[0] == '/' && str[1] =='s') { if(!is_logined_) { cerr<<"login first"<<endl; } else { out<<user_<<" : "<<str.substr(3)<<endl; } } // 登錄 「/l username」 else if(str.size() >= 4 && str[0] == '/' && str[1] == 'l') { if (!is_logined_) { user_ = str.substr(3); is_logined_ = true; } else { cerr<<"err: already logined!"<<endl; } } else { print_err(); } } } void print_err() { cerr<<"err: bad cmd!"<<endl <<"usage:"<<endl <<"login: /l username"<<endl <<"exit: /q"<<endl <<"send: /s xxxxxxxxxxxx"<<endl; } private: orchid::scheduler sche_; orchid::descriptor stdin_; orchid::descriptor stdout_; orchid::socket sock_; string user_; bool is_logined_; string ip_; string port_; }; int main(int argc,char* argv[]) { string ip = argv[1]; string port = argv[2]; chat_client client(ip,port); client.run(); }
在上面的代碼中,咱們利用了boost.bind的強大能力,使用類的成員函數建立了協程。
而後是chat server:
chat server中有2個類,server和client。類server實現了chat server的主要邏輯,類client則是客戶端代理類,負責從客戶端處接收和發送數據。
server類的職責包括:維護客戶端列表,廣播某個客戶端的發來的消息。 所以server處理的消息有2類,第一類是控制消息(下面代碼中的ctrl_t類型),表明了客戶端的到來和離開事件; 另一類是文本消息,server類須要向全部的客戶端轉發、廣播該類消息。不論是處理第一種類型的消息仍是處理第二種類型的消息,都須要訪問到其內部維護的客戶端列表。爲了同步這些訪問,咱們須要在同一個chan中接收這兩種消息。
const static std::size_t STACK_SIZE = 64*1024; template <typename Client> struct server { ///////////////typedef///////////// typedef server<Client> self_type; typedef boost::shared_ptr<Client> client_sp_type; typedef std::list<client_sp_type> client_list_type;// 客戶端列表類型, // 保存的是智能指針。 enum {REGISTER,UNREGISTER}; struct ctrl_t { int cmd_; client_sp_type client_; }; typedef boost::variant<string,ctrl_t> msg_type;//消息類型 //////////////////變量/////////// orchid::scheduler_group schedulers_; orchid::acceptor acceptor_; std::string port_; orchid::chan<msg_type> msg_ch_;//server的消息隊列 client_list_type clients_;//客戶端列表 /////////////////////////// server(std::size_t size,const string& port) :schedulers_(size),acceptor_(schedulers_[0].get_io_service()),port_(port),msg_ch_(512) { } ~server() { } //handle_msg是消息處理協程。不斷的從消息隊列中讀取消息,而後判斷消息類型,並處理。 void handle_msg(orchid::coroutine_handle co) { msg_type msg; for (;;) { msg_ch_.recv(msg,co); if(msg.which() == 0) {// 若是是string類型,即文本消息,則向全部的客戶端代理廣播。 for(typename client_list_type::iterator it = clients_.begin(); it != clients_.end(); ++it) { //向客戶端代理的chan中發送消息。 (*it) -> ch_.send(boost::get<string>(msg),co); } } else if(msg.which() == 1) {//若是是ctrl_t類型,即控制消息,則修改客戶端列表。 if(boost::get<ctrl_t>(msg).cmd_ == REGISTER) {//註冊消息 clients_.push_back(boost::get<ctrl_t>(msg).client_); } else if(boost::get<ctrl_t>(msg).cmd_ == UNREGISTER) {//反註冊消息 clients_.remove(boost::get<ctrl_t>(msg).client_); } else { throw std::runtime_error("unkonw cmd! should never hanppened!"); } } else { throw std::runtime_error("unkonw msg! should never hanppened!"); } } } //處理鏈接到來事件。當鏈接到來時,發送表示註冊的控制消息到消息隊列中。 void handle_accept(orchid::coroutine_handle co) { try { int index = 1; acceptor_.bind_and_listen(port_); for (;;) { if(index >= schedulers_.size()) index = 0; boost::shared_ptr<Client> c(new Client(schedulers_[index++],*this)); acceptor_.accept(c->sock_,co); c -> start(); ctrl_t msg; msg.cmd_ = REGISTER; msg.client_ = c; msg_ch_.send(msg,co); } } catch (boost::system::system_error& e) { cout<<e.code()<<" "<<e.what()<<endl; } } void run() { schedulers_[0].spawn(boost::bind(&self_type::handle_accept,this,_1),STACK_SIZE); schedulers_[0].spawn(boost::bind(&self_type::handle_msg,this,_1),STACK_SIZE); schedulers_.run(); } };
客戶端代理類的主要職責是:接收客戶端socket上的數據,併發送到server類中;接收server發來的消息,併發送到客戶端socket中;當客戶端斷開鏈接時,通知server。 不難看出,一個客戶端代理類對象會同時被3個協程訪問到:server的消息處理協程,client的sender和receiver協程。爲了安全的釋放客戶端代理類的對象,咱們使用引用計數的方案。引用計數經過boost::enable_shared_from_this來實現。
//客戶端代理類。 struct client:public boost::enable_shared_from_this<client> { orchid::scheduler& sche_; server<client>& server_; orchid::socket sock_; orchid::chan<string> ch_; client(orchid::scheduler& sche,server<client>& s) :sche_(sche),server_(s), sock_(sche_.get_io_service()),ch_(32) { } ~client() { } // 啓動發送和接收協程。注意這個地方的 this -> shared_from_this(), // 啓動的協程中保存了當前對象的智能指針。 // 當協程結束後,智能指針會釋放,並在引用計數爲0時,釋放當前對象。 void start() { sche_.spawn(boost::bind(&client::sender,this -> shared_from_this(),_1),STACK_SIZE); sche_.spawn(boost::bind(&client::receiver,this -> shared_from_this(),_1),STACK_SIZE); } //不斷從client的消息隊列中接收消息,並經過socket發送。 //chan被關閉後退出循環,協程結束。 void sender(orchid::coroutine_handle& co) { string str; orchid::tcp_ostream out(sock_,co); while(ch_.recv(str,co)) { out<<str<<endl; } } // 不斷從客戶端接收消息,直到客戶端斷開鏈接。 // 當鏈接斷開後,關閉自身的chan併發送反註冊信息到server的消息隊列中。 void receiver(orchid::coroutine_handle& co) { orchid::tcp_istream in(sock_,co); //客戶端斷開鏈接後會退出循環。 for (string str;std::getline(in,str);) { //向server的chan中發送消息。 server_.msg_ch_.send(str,co); } ch_.close(); server<client>::ctrl_t ctrl_msg; ctrl_msg.cmd_ = server<client>::UNREGISTER; ctrl_msg.client_ = this -> shared_from_this(); server_.msg_ch_.send(ctrl_msg, co); } }; int main(int argc,char* argv[]) { string port = argv[1]; server<client> s(4,port); s.run(); }
須要注意的是,對同一個套接字進行讀寫的協程應該創建在同一個調度器上。由於實際上從green化的IO對象的構造函數便可以看出:green化的IO對象是與調度器中io_service對象綁定的。當不一樣調度器上的協程訪問同一個IO對象的時,orchid不能保證其行爲的正確性。