基於協程的併發框架orchid簡介

orchid簡介

什麼是orchid?

orchid是一個構建於boost庫基礎上的C++庫,相似於python下的gevent/eventlet,爲用戶提供基於協程的併發模型。 python

什麼是協程:

協程,即協做式程序,其思想是,一系列互相依賴的協程間依次使用CPU,每次只有一個協程工做,而其餘協程處於休眠狀態。協程在控制離開時暫停執行,當控制再次進入時只能從離開的位置繼續執行。 協程已經被證實是一種很是有用的程序組件,不只被python、lua、ruby等腳本語言普遍採用,並且被新一代面向多核的編程語言如golang rust-lang等採用做爲併發的基本單位。 ios

協程能夠被認爲是一種用戶空間線程,與傳統的搶佔式線程相比,有2個主要的優勢:

  • 與線程不一樣,協程是本身主動讓出CPU,並交付他指望的下一個協程運行,而不是在任什麼時候候都有可能被系統調度打斷。所以協程的使用更加清晰易懂,而且多數狀況下不須要鎖機制。
  • 與線程相比,協程的切換由程序控制,發生在用戶空間而非內核空間,所以切換的代價很是的小。

green化

術語「green化」來自於python下著名的協程庫greenlet,指改造IO對象以能和協程配合。某種意義上,協程與線程的關係相似與線程與進程的關係,多個協程會在同一個線程的上下文之中運行。所以,當出現IO操做的時候,爲了可以與協程相互配合,只阻塞當前協程而非整個線程,須要將io對象「green化」。目前orchid提供的green化的io對象包括: c++

  • tcp socket(還不支持udp)
  • descriptor(目前僅支持非文件類型文件描述符,如管道和標準輸入/輸出,文件類型的支持會在之後版本添加)
  • timer (定時器)
  • signal (信號)

chan:協程間通訊

chan這個概念引用自golang的chan。每一個協程是一個獨立的執行單元,爲了可以方便協程之間的通訊/同步,orchid提供了chan這種機制。chan本質上是一個阻塞消息隊列,後面咱們將看到,chan不只能夠用於同一個調度器上的協程之間的通訊,並且能夠用於不一樣調度器上的協程之間的通訊。 git

多核

建議使用的scheduler per cpu的的模型來支持多核的機器,即爲每一個CPU核心分配一個調度器,有多少核心就建立多少個調度器。不一樣調度器的協程之間也能夠經過chan來通訊。協程應該被建立在那個調度器裏由用戶本身決定。 github

預備知識

orchid的實現嚴重依賴於boost,依賴的主要子庫包括:boost.context boost.asio boost.iostreams shared_ptr boost.bind 等等。若是用戶對這些子庫,尤爲是boost.asio和boost.bind、shared_ptr具備必定的瞭解的話,會更加有利於瞭解和使用orchid。固然若是不瞭解也沒有關係,本文會在後面的例子中對涉及的相關知識進行簡單的介紹。 golang

編譯與安裝

orchid 自己是個只包含頭文件的模板庫,拷貝到指定的目錄便可,可是orchid依賴的boost庫須要編譯。並且在使用orchid的時須要連接 boost_context boost_iostreams boost_system boost_thread 等子庫(參見unit_test裏的CMakeLists.txt)。 編程

boost須要採用最新的svn裏的版本,由於1.52及如下版本缺乏boost.atomic等子庫。 bootstrap

MAC OS

git clone https://github.com/ryppl/boost-svn
cd boost-svn
./bootstrap.sh
./b2 toolset=clang cxxflags="-arch x86_64" linkflags="-arch x86_64" install
cd ..
git clone https://github.com/ioriiod0/orchid.git
cd orchid
cp -r orchid <安裝路徑>

LINUX

git clone https://github.com/ryppl/boost-svn
cd boost-svn
./bootstrap.sh
./b2 install
cd ..
git clone https://github.com/ioriiod0/orchid.git
cd orchid
cp -r orchid <安裝路徑>

例子

第一個栗子:一個複雜一些的hello world

國際慣例,讓咱們從hello world開始。在這個例子中,咱們將看到如何建立和執行一個協程。 安全

//函數簽名爲 void(orchid::coroutine_handle) 的函數
void f(orchid::coroutine_handle co) {
    std::cout<<"f:hello world"<<std::endl;
}

//函數簽名爲 void(orchid::coroutine_handle,const char*) 的函數
void f1(orchid::coroutine_handle co,const char* str) {
    std::cout<<str<<std::endl;
}

void stop(orchid::coroutine_handle co) {
    co -> get_scheduler().stop();
}

int main(int argc,char* argv[]) {
    orchid::scheduler sche;
    sche.spawn(f,orchid::coroutine::minimum_stack_size());
    sche.spawn(boost::bind(f1,_1,"f1:hello world"),orchid::coroutine::default_stack_size());
    sche.spawn(stop);
    sche.run();
    std::cout<<"done!"<<std::endl;
}

程序的輸出: ruby

f:hello world
f1:hello world
done!

在這個例子中,咱們首先聲明一個調度器sche,而後調用sche的spawn方法以2種方式建立了2個協程來輸出hello world,最後調用調度器的run方法來執行整個程序。當程序執行時,2個協程依次被執行。須要注意的是,在調用run方法以前,被建立的協助程序並不會被執行,只有調用了run方法以後,被建立的協程纔會被調度執行。 調用run方法的線程會被阻塞,直到調度器的stop方法被調用爲止。 (實際上,在這個例子中咱們並無對std::cout進行green化,所以每次調用std::cout的時候,整個調度器/線程都會被阻塞,在後面的介紹中咱們將看到如何將IO對象green化)。

spawn方法有2個參數:

template <typename F>
void spawn(const F& func,std::size_t stack_size = coroutine_type::default_stack_size())

第一個參數func是協程被執行時所執行的函數。能夠認爲是協程的main函數。func能夠是任何符合函數簽名

void(orchid::coroutine_handle)

的函數或函數對象,如上面例子中的f。其中,orchid::coroutine_handle是協程的句柄,表明了func自己所處的協程。 每每要完成協程所執行的任務,僅僅有orchid::coroutine_handle是不夠的,好比函數f1,f1須要額外傳入一個const char* 的參數,對於這種函數,咱們能夠經過boost.bind、std::bind(tr1或者C++11)、或者lambda表達式(c++11)將他們適配成要求的函數類型。如上例中的:

sche.spawn(boost::bind(f1,_1,"f1:hello world"),orchid::coroutine::default_stack_size())

boost::bind將f1從void(orchid::coroutine,const char*)適配成了void(orchid::coroutine_handle)類型。其中_1是佔位符,表示空出f1的第一個參數,而f1的第二個參數由"f1:hello world"填充。在後面的例子中將會大量出現這種用法,若是用戶對boost.bind並非很清楚,能夠先閱讀一下boost文檔中的相關章節。

第二個參數指定了協程棧空間的大小,orchid::coroutine提供了3個相關的函數:

std::size_t default_stack_size();//默認的棧大小,通常爲16個內存頁
std::size_t minimum_stack_size();//棧的最小值,通常爲2個內存頁面
std::size_t maximum_stack_size();//棧的最大值,通常與進程的棧的最大值(軟限制)相同。

用戶能夠根據本身的狀況指定所需棧空間的大小。當協程使用的棧空間超過了提供的棧空間的大小的時候,程序會異常結束, orchid並不會自動增長棧空間的大小,因此用戶必須預估出足夠的棧空間。

這個hello world的例子過於簡單,下面咱們將經過建立一個echo server的客戶端和服務器端來進一步瞭解orchid。

第二個栗子:echo server

第二個栗子,讓咱們從網絡編程屆的hello world:echo server開始。echo server首先必需要處理鏈接事件,在orchid中,咱們建立一個協程來專門處理鏈接事件:

typedef boost::shared_ptr<orchid::socket> socket_ptr;

//處理ACCEPT事件的協程
void handle_accept(orchid::coroutine_handle co) {
    try {
        orchid::acceptor acceptor(co -> get_scheduler().get_io_service());
        acceptor.bind_and_listen("5678",true);
        for(;;) {
            socket_ptr sock(new orchid::socket(co -> get_scheduler().get_io_service()));
            acceptor.accept(*sock,co);
            co -> get_scheduler().spawn(boost::bind(handle_io,_1,sock),orchid::minimum_stack_size());
        }
    }
    catch(boost::system::system_error& e) {
        cerr<<e.code()<<" "<<e.what()<<endl;
    }
}

在上面的代碼中,咱們建立了一個green化的acceptor,並讓它監聽5678端口,而後在"阻塞"等待鏈接到來,當鏈接事件發生時,建立一個新的協程來服務新獲得的socket。green化的socket被包裹在智能指針中以參數形式傳遞給處理socket io事件的協程。處理套接字IO的協程以下:

//處理SOCKET IO事件的協程
void handle_io(orchid::coroutine_handle co,socket_ptr sock) {
    orchid::tcp_ostream out(*sock,co);
    orchid::tcp_istream in(*sock,co);
    for(std::string str;std::getline(in, str) && out;)
    {
        out<<str<<endl;
    }

}

協程首先在傳入的套接字上建立了一個輸入流和一個輸出流,分別表明了TCP的輸入和輸出。而後不斷地從輸入流中讀取一行,並輸出到輸出流當中。當socket上的TCP鏈接斷開時,輸入流和輸出流的eof標誌爲會被置位,所以循環結束,協程退出。

orchid可使用戶以流的形式來操做套接字。輸入流和輸出流分別提供了std::istream和std::ostream的接口;輸入流和輸出流是帶緩衝的,若是用戶須要無緩衝的讀寫socket或者自建緩衝,能夠直接調用orchid::socket的read和write函數。可是須要注意這兩個函數會拋出boost::system_error異常來表示錯誤(參見benchmark_orchid_client和benchmar_orchid_server)。

最後是main函數:

int main() {
    orchid::scheduler sche;
    sche.spawn(handle_accept,orchid::coroutine::minimum_stack_size());//建立協程
    sche.run();
}

而後咱們來看客戶端的代碼,在客戶端中,咱們建立100個併發的TCP鏈接不斷的向echo server發送hello world。

首先是處理socket io的協程:

void handle_io(orchid::coroutine_handle co) {
    orchid::descriptor stdout(co -> get_scheduler().get_io_service(),STDOUT_FILENO);
    orchid::socket sock_(co -> get_scheduler().get_io_service());
    try {
        sock_.connect("127.0.0.1","5678",co);
        orchid::tcp_istream in(sock_,co);
        orchid::tcp_ostream out(sock_,co);
        orchid::descriptor_ostream console(stdout,co);
        out << "hello world !!!!" <<endl;
        for (string str;std::getline(in,str);) {
            console << str << endl;
            out << "hello world !!!!" <<endl;
        }
    } catch (const boost::system::system_error& e) {
        cerr<<e.code()<<" "<<e.what()<<endl;
    }
}

處理socket io的協程分別建立了一個green化的socket和一個green話的標準輸出,而後鏈接到echo server上,不斷執行 輸出 -> 接收 -> 打印 這個流程。

爲了可以從外部打斷client的執行,咱們還須要一個協程來處理中斷信號,這樣咱們就能夠用ctrl+c來正確的中斷程序的執行:

void handle_sig(orchid::coroutine_handle co) {
    orchid::signal sig(co -> get_scheduler().get_io_service());
    try {
        sig.add(SIGINT);
        sig.add(SIGTERM);
        sig.wait(co);
        co->get_scheduler().stop();

    } catch (const boost::system::system_error& e) {
        cerr<<e.code()<<" "<<e.what()<<endl;
    }
}

在這個協程中,協程「阻塞」在SIGINT 和 SIGTERM信號上,當信號發生時,調用調度器的stop方法來中斷程序的執行,並安全的回收資源。

int main() {
    orchid::scheduler sche;
    sche.spawn(handle_sig,orchid::coroutine::minimum_stack_size());
    for (int i=0;i<100;++i) {
        sche.spawn(handle_io);
    }
    sche.run();
}

在客戶端的main函數中,咱們建立100個協程,同時向服務器發送hello world。

在上面這個echo server的例子中,咱們採用了一種 coroutine per connection 的編程模型,與傳統的 thread per connection 模型同樣的簡潔清晰,可是整個程序實際上運行在同一線程當中。 因爲協程的切換和內存開銷遠遠小於線程,所以咱們能夠輕易的同時啓動上千協程來同時服務上千鏈接,這是 thread per connection的模型很難作到的; 在性能方面,整個green化的IO系統其實是使用boost.asio這種高性能的異步io庫實現的,與原始的asio相比,orchid的性能損耗很是小,性能基本持平。 所以經過orchid,咱們能夠在保持同步IO模型簡潔性的同時,得到近似於異步IO模型的高性能。

第三個栗子:生產者-消費者

在這個例子中,咱們將主要介紹orchid提供的協程間的通訊機制:chan。chan這個概念引用自golang的chan。chan表現爲一個阻塞消息隊列。 orchid提供的chan只支持 單生產者-單消費者 和 多生產者-單消費者 這兩種模型(在其餘模型,如多生產者-多消費者中,也能夠工做,單可能會出現某些消費者餓死的現象)。

在下面的例子中,代碼orchid::chan < int > ch(10) 表示建立一個大小爲10,裝載類型爲int的chan。 chan 有3個重要的接口:

// 向chan中發送一個對象。t爲要發送的對象,co爲當前協程的協程句柄。
// 當隊列爲空時,co會阻塞,當隊列不在爲空時,阻塞的協程會被喚醒;
// 發送成功返回true,不然false。
bool send(const U& t,coroutine_pointer co);

// 從chan中接收一對象。
// 當隊列滿時,co會阻塞,當隊列再也不滿時,阻塞的協程會被喚醒。
// 接收成功返回true,不然false。
bool recv(U& t,coroutine_pointer co);

// 關閉一個chan,當chan關閉後,再調用send和recv都會直接返回false;而且全部阻塞在chan中的協程都會被喚醒
// 被喚醒的協程中 send/recv 返回false。
void close();

下面是一個簡單的生產者和消費者的例子:

//生產者,不斷髮送本身的ID給消費者
void sender(orchid::coroutine_handle co,int id,orchid::chan<int>& ch) {
    for (;;) {
        ch.send(id,co);
    }
}
//消費者,不斷接收生產者發送的ID並打印ID.
void receiver(orchid::coroutine_handle co,orchid::chan<int>& ch) {
    orchid::descriptor stdout(co -> get_scheduler().get_io_service(),STDOUT_FILENO);
    orchid::descriptor_ostream console(stdout,co);
    int id;
    for (;;) {
        ch.recv(id,co);
        console<<"receiver receive: "<<id<<std::endl;
    }
}

//生產者和消費者運行在同一個調度器中。
void test_one_scheduler() {
    orchid::scheduler sche;
    orchid::chan<int> ch(10);
    for (int i=0;i<100;++i) {
        sche.spawn(boost::bind(sender,_1,i,boost::ref(ch)));
    }
    sche.spawn(boost::bind(receiver,_1,boost::ref(ch)));
    sche.run();
}

//生產者和消費者運行再不一樣的調度器中。
void test_scheduler_group() {
    orchid::scheduler_group group(2);
    orchid::chan<int> ch(10);
    for (int i=0;i<100;++i) {
        group[i%2].spawn(boost::bind(sender,_1,i,boost::ref(ch)));
    }
    group[0].spawn(boost::bind(receiver,_1,boost::ref(ch)));
    group.run();
}

經過scheduler_group類咱們能夠方便的建立一組調度器,每一個調度器運行在一個單獨的線程中。能夠經過下標來訪問某個調度器;經過調用其run方法同時啓動多個調度器;經過調用其stop方法,同時中止多個調度器。

第四個栗子:chat server

此次咱們來一個複雜一些的例子:chat server 和 chat client。從這個例子中咱們將看到一些有用的技巧,好比如何使用boost::shared_from_this來管理協程間共享對象的生命週期;如何利用boost.variant在一個chan中接收多種類型消息。

先從較爲簡單的chat clent開始:在chat client中咱們將建立兩個協程,一個不斷從本機的標準輸入讀取輸入,而後發送到chat server;另外一個則不斷從chat server接受消息併發送到本機的標準輸出上。

const static std::size_t STACK_SIZE = 64*1024;

//客戶端類
class chat_client {
public:
    chat_client(const string& ip,const string& port)
        :sche_(),
        stdin_(sche_.get_io_service(),STDIN_FILENO),//green化的標準輸入
        stdout_(sche_.get_io_service(),STDOUT_FILENO),//green化的標準輸出
        sock_(sche_.get_io_service()),
        is_logined_(false),ip_(ip),port_(port) {

    }
    ~chat_client() {

    }
public:
    //建立協程並啓動調度器
    void run() {
        sche_.spawn(boost::bind(&chat_client::handle_console,this,_1),STACK_SIZE);
        sche_.run();
    }
    //中止調度器
    void stop() {
        sche_.stop();
    }

private:
    // 不斷從chat server接收消息並打印到標準輸出上。
    void receive_msg(orchid::coroutine_handle co) {
        string str;
        orchid::descriptor_ostream out(stdout_,co);
        orchid::tcp_istream in(sock_,co);
        for (string str;std::getline(in, str);) {
            out<<str<<endl;
        }
    }

    //不斷從標準輸入接收用戶輸入,並處理用戶輸入,發送消息chat server。
    //登錄用 /l username
    //發送消息用 /s xxxxxxx
    //退出用 /q
    void handle_console(orchid::coroutine_handle co) {
        orchid::descriptor_istream in(stdin_,co);
        orchid::tcp_ostream out(sock_,co);
        //首先鏈接chat server
        try {
            sock_.connect(ip_,port_,co);
        } catch (boost::system::system_error& e) {
            cerr<<e.code()<<" "<<e.what()<<endl;
            return;
        }
        //鏈接成功則啓動接受消息的協程。
        sche_.spawn(boost::bind(&chat_client::receive_msg,this,_1), STACK_SIZE);
        //不斷讀取標準輸入並進行處理。
        for(string str;std::getline(in,str);) {
            if(str.empty()) continue;
            // 退出 「/q」
            if(str.size() >= 2 && str[0] == '/' && str[1] == 'q') { 
                sock_.close();
                user_.clear();
                is_logined_ = false;
                cerr<<"closed"<<endl;
                stop();
            }
            // 發送消息 /s message
            else if(str.size() >= 4 && str[0] == '/' && str[1] =='s') {
                if(!is_logined_) {
                    cerr<<"login first"<<endl;
                } else {
                    out<<user_<<" : "<<str.substr(3)<<endl;
                }
            }
            // 登錄 「/l username」
            else if(str.size() >= 4 && str[0] == '/' && str[1] == 'l') {
                if (!is_logined_) {
                    user_ = str.substr(3);
                    is_logined_ = true;
                } else {
                    cerr<<"err: already logined!"<<endl;
                }
            } else {
                print_err();
            }
        }

    }

    void print_err() {
           cerr<<"err: bad cmd!"<<endl
            <<"usage:"<<endl
            <<"login: /l username"<<endl
            <<"exit: /q"<<endl
            <<"send: /s xxxxxxxxxxxx"<<endl;
    }

private:
    orchid::scheduler sche_;
    orchid::descriptor stdin_;
    orchid::descriptor stdout_;
    orchid::socket sock_;
    string user_;
    bool is_logined_;
    string ip_;
    string port_;

};

int main(int argc,char* argv[]) {
    string ip = argv[1];
    string port = argv[2];
    chat_client client(ip,port);
    client.run();
}

在上面的代碼中,咱們利用了boost.bind的強大能力,使用類的成員函數建立了協程。

而後是chat server:

chat server中有2個類,server和client。類server實現了chat server的主要邏輯,類client則是客戶端代理類,負責從客戶端處接收和發送數據。

server類的職責包括:維護客戶端列表,廣播某個客戶端的發來的消息。 所以server處理的消息有2類,第一類是控制消息(下面代碼中的ctrl_t類型),表明了客戶端的到來和離開事件; 另一類是文本消息,server類須要向全部的客戶端轉發、廣播該類消息。不論是處理第一種類型的消息仍是處理第二種類型的消息,都須要訪問到其內部維護的客戶端列表。爲了同步這些訪問,咱們須要在同一個chan中接收這兩種消息。

const static std::size_t STACK_SIZE = 64*1024;

template <typename Client>
struct server {
    ///////////////typedef/////////////
    typedef server<Client> self_type;
    typedef boost::shared_ptr<Client> client_sp_type; 
    typedef std::list<client_sp_type> client_list_type;// 客戶端列表類型,
                                                        // 保存的是智能指針。
    enum {REGISTER,UNREGISTER};
    struct ctrl_t {
        int cmd_;
        client_sp_type client_;
    };
    typedef boost::variant<string,ctrl_t> msg_type;//消息類型

    //////////////////變量///////////
    orchid::scheduler_group schedulers_;
    orchid::acceptor acceptor_;
    std::string port_;
    orchid::chan<msg_type> msg_ch_;//server的消息隊列
    client_list_type clients_;//客戶端列表
    ///////////////////////////

    server(std::size_t size,const string& port)
        :schedulers_(size),acceptor_(schedulers_[0].get_io_service()),port_(port),msg_ch_(512) {
    }
    ~server() {
    }

    //handle_msg是消息處理協程。不斷的從消息隊列中讀取消息,而後判斷消息類型,並處理。
    void handle_msg(orchid::coroutine_handle co) {
        msg_type msg;
        for (;;) {
            msg_ch_.recv(msg,co);
            if(msg.which() == 0) {// 若是是string類型,即文本消息,則向全部的客戶端代理廣播。
                for(typename client_list_type::iterator it = clients_.begin(); it != clients_.end(); ++it) {
                    //向客戶端代理的chan中發送消息。
                    (*it) -> ch_.send(boost::get<string>(msg),co);
                }
            } else if(msg.which() == 1) {//若是是ctrl_t類型,即控制消息,則修改客戶端列表。
                if(boost::get<ctrl_t>(msg).cmd_ == REGISTER) {//註冊消息
                    clients_.push_back(boost::get<ctrl_t>(msg).client_);
                } else if(boost::get<ctrl_t>(msg).cmd_ == UNREGISTER) {//反註冊消息
                    clients_.remove(boost::get<ctrl_t>(msg).client_);
                } else {
                    throw std::runtime_error("unkonw cmd! should never hanppened!");
                }
            } else {
                throw std::runtime_error("unkonw msg! should never hanppened!");
            }
        }
    }

    //處理鏈接到來事件。當鏈接到來時,發送表示註冊的控制消息到消息隊列中。
    void handle_accept(orchid::coroutine_handle co) {
    try {
        int index = 1;
        acceptor_.bind_and_listen(port_);
        for (;;) {
            if(index >= schedulers_.size()) index = 0;
            boost::shared_ptr<Client> c(new Client(schedulers_[index++],*this));
            acceptor_.accept(c->sock_,co);
            c -> start();
            ctrl_t msg;
            msg.cmd_ = REGISTER;
            msg.client_ = c;
            msg_ch_.send(msg,co);
        }
    } catch (boost::system::system_error& e) {
        cout<<e.code()<<" "<<e.what()<<endl;
    }
}

    void run() {
        schedulers_[0].spawn(boost::bind(&self_type::handle_accept,this,_1),STACK_SIZE);
        schedulers_[0].spawn(boost::bind(&self_type::handle_msg,this,_1),STACK_SIZE);
        schedulers_.run();
    }

};

客戶端代理類的主要職責是:接收客戶端socket上的數據,併發送到server類中;接收server發來的消息,併發送到客戶端socket中;當客戶端斷開鏈接時,通知server。 不難看出,一個客戶端代理類對象會同時被3個協程訪問到:server的消息處理協程,client的sender和receiver協程。爲了安全的釋放客戶端代理類的對象,咱們使用引用計數的方案。引用計數經過boost::enable_shared_from_this來實現。

//客戶端代理類。
struct client:public boost::enable_shared_from_this<client> {
    orchid::scheduler& sche_;
    server<client>& server_;
    orchid::socket sock_;
    orchid::chan<string> ch_;

    client(orchid::scheduler& sche,server<client>& s)
        :sche_(sche),server_(s),
        sock_(sche_.get_io_service()),ch_(32) {

    }
    ~client() {

    }

    // 啓動發送和接收協程。注意這個地方的 this -> shared_from_this(),
    // 啓動的協程中保存了當前對象的智能指針。
    // 當協程結束後,智能指針會釋放,並在引用計數爲0時,釋放當前對象。
    void start() {
         sche_.spawn(boost::bind(&client::sender,this -> shared_from_this(),_1),STACK_SIZE);
         sche_.spawn(boost::bind(&client::receiver,this -> shared_from_this(),_1),STACK_SIZE);
    }

    //不斷從client的消息隊列中接收消息,並經過socket發送。
    //chan被關閉後退出循環,協程結束。
    void sender(orchid::coroutine_handle& co) {
        string str;
        orchid::tcp_ostream out(sock_,co);
        while(ch_.recv(str,co)) {
            out<<str<<endl;
        }
    }

    // 不斷從客戶端接收消息,直到客戶端斷開鏈接。
    // 當鏈接斷開後,關閉自身的chan併發送反註冊信息到server的消息隊列中。
    void receiver(orchid::coroutine_handle& co) {
        orchid::tcp_istream in(sock_,co);
        //客戶端斷開鏈接後會退出循環。
        for (string str;std::getline(in,str);) {
            //向server的chan中發送消息。
            server_.msg_ch_.send(str,co);
        }
        ch_.close();
        server<client>::ctrl_t ctrl_msg;
        ctrl_msg.cmd_ = server<client>::UNREGISTER;
        ctrl_msg.client_ = this -> shared_from_this();
        server_.msg_ch_.send(ctrl_msg, co);

    }
};

int main(int argc,char* argv[]) {
    string port = argv[1];
    server<client> s(4,port);
    s.run();
}

須要注意的是,對同一個套接字進行讀寫的協程應該創建在同一個調度器上。由於實際上從green化的IO對象的構造函數便可以看出:green化的IO對象是與調度器中io_service對象綁定的。當不一樣調度器上的協程訪問同一個IO對象的時,orchid不能保證其行爲的正確性。

相關文章
相關標籤/搜索