我開始一直搞不懂協程是什麼,網上搜一搜,(尤爲是Golang的goroutine)感受從概念上聽起來有點像線程池,尤爲是相似Java的ExcutorService相似的東西java
package helloworld; import java.util.Calendar; import java.util.Date; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CallMe { static class Call implements Callable<String>{ @Override public String call() throws Exception { Date d = Calendar.getInstance().getTime(); return d.toString(); } } public static void main(String[] args) throws Exception{ ExecutorService pool = Executors.newSingleThreadExecutor(); Future<String> str = pool.submit(new Call()); pool.shutdown(); String ret = str.get(); System.out.println(ret); } }
package main import ( "fmt" "time" ) func CallMe(pipe chan string) { t := time.Now() pipe <- t.String() } func main() { pipe := make(chan string, 1) defer close(pipe) go CallMe(pipe) select { case v, ok := <-pipe: if !ok { fmt.Println("Read Error") } fmt.Println(v) } }
是的,協程除了它要解決的問題上,其餘能夠說就是線程。ios
那麼協程要解決什麼問題呢?程序員
這要從協程爲何火起來講起。線程池很好,但線程是由操做系統調度的,而且線程切換代價太大,每每須要耗費數千個CPU週期。golang
在同步阻塞的編程模式下,
當併發量很大、IO密集時,每每一個任務剛進入線程池就阻塞在IO,就可能(由於線程切換是不可控的)須要切換線程,這時線程切換的代價就不能夠忽視了。sql
後來人們發現異步非阻塞的模型能解決這個問題,當被IO阻塞時,直接調用非阻塞接口,註冊一個回調函數,當前線程繼續進行,也就不用切換線程了。但理想很豐滿,現實很骨感,異步的回調各類嵌套讓程序員的人生更加悲慘。數據庫
因而協程應運重生。編程
協程就是由程序員控制跑在線程裏的「微線程」。它能夠由程序員調度,切換協程時代價小(切換根據實現不一樣,消耗的CPU週期從幾十到幾百不等),建立時耗費資源小。十分適用IO密集的場景。api
boost的Coroutine2不一樣於Goroutine,golang的協程調度是由Go語言完成,而boost::coroutine2的協程須要本身去調度。promise
#include <boost\coroutine2\all.hpp> #include <cstdlib> #include <iostream> using namespace boost; using namespace std; class X { public: X() { cout << "X()\n"; } ~X() { cout << "~X()\n"; system("pause"); } }; void foo(boost::coroutines2::coroutine<void>::pull_type& pull) { X x; cout << "a\n"; pull(); cout << "b\n"; pull(); cout << "c\n"; } int main() { coroutines2::coroutine<void>::push_type push(foo); cout << "1\n"; push(); cout << "2\n"; push(); cout << "3\n"; push(); return 0; }
調用push_type和pull_type的operator()就會讓出當前執行流程給對應的協程。push_type能夠給pull_type傳遞參數,而pull_type經過調用get來獲取。多線程
你也能夠寫成這樣
boost::coroutines2::coroutine<void>::pull_type pull([](coroutine<void>::push_type &push){...})
它和上面的區別是,新建的pull_type會當即進入綁定的函數中(哪裏能夠調用push(),哪一個協程先執行)
那若是在main結束以前,foo裏沒有執行完,那foo裏的X會析構嗎?
Boost文檔裏說會的,這個叫作Stack unwinding
。
咱們不妨把main函數裏最後一個push();
去掉,這樣後面就不會切換到foo的context了。會發現雖然foo中的"c"沒有輸出,但X仍是析構了的。
在實際生產中,咱們更適合用fiber來解決問題。fiber有調度器,使用簡單,不須要手動控制執行流程。
#include <boost\fiber\all.hpp> #include <chrono> #include <string> #include <ctime> #include <iostream> #include <cstdlib> using namespace std; using namespace boost; void callMe(fibers::buffered_channel<string>& pipe) { std::time_t result = std::time(nullptr); string timestr = std::asctime(std::localtime(&result)); pipe.push(timestr); } int main() { fibers::buffered_channel<string> pipe(2); fibers::fiber f([&]() {callMe(pipe); }); f.detach(); string str; pipe.pop(str); cout << str << "\n"; system("pause"); return 0; }
boost::fibers是一個擁有調度器的協程。看上去fiber已經和goroutine徹底同樣了。在fiber裏不能調用任何阻塞線程的接口,由於一旦當前fiber被阻塞,那意味着當前線程的全部fiber都被阻塞了。所以全部跟協程相關的阻塞接口都須要本身實現一套協程的包裝,好比this_fiber::sleep_for()
。這也意味着數據庫之類的操做沒辦法被fiber中直接使用。但好在fiber提供了一系列方法去解決這個問題。
int read_chunk( NonblockingAPI & api, std::string & data, std::size_t desired) { int error; while ( EWOULDBLOCK == ( error = api.read( data, desired) ) ) { boost::this_fiber::yield(); } return error; }
主要思想就是,當前fiber調用非阻塞api輪詢,一旦發現該接口會阻塞,就調用boost::this_fiber::yield()
讓出執行權限給其餘協程,知道下次得到執行權限,再次查看是否阻塞。
std::pair< AsyncAPI::errorcode, std::string > read_ec( AsyncAPI & api) { typedef std::pair< AsyncAPI::errorcode, std::string > result_pair; boost::fibers::promise< result_pair > promise; boost::fibers::future< result_pair > future( promise.get_future() ); // We promise that both 'promise' and 'future' will survive until our lambda has been called. // Need C++14 api.init_read([promise=std::move( promise)]( AsyncAPI::errorcode ec, std::string const& data) mutable { promise.set_value( result_pair( ec, data) ); }); return future.get(); }
這種實現方法主要是利用了異步IO不會阻塞當前fiber,在異步的回調中給fibers::promise
設值。當異步操做未返回時,若是依賴到異步的結果,在調用future.get()
時就會讓出執行權限給其餘協程。
同步IO不能夠直接應用到fiber中,由於會阻塞當前線程而致使線程全部的fiber都阻塞。
若是一個接口只有同步模式,好比官方的Mysql Connector,那咱們只能先利用多線程模擬一個異步接口,而後再把它當作異步IO去處理。
如何用多線程把同步接口包裝成異步接口呢?
以下,包裝好後就能夠利用上面異步IO的方法再包裝一個fiber可使用的IO接口。
#include <boost/asio.hpp> #include <boost/fiber/all.hpp> #include <string> #include <thread> #include <cstdlib> #include <stdio.h> using namespace std; using namespace boost; class AsyncWrapper { public: void async_read(const string fileName, function<void (const string &)> callback) { auto fun = [=]() { FILE* fp = fopen(fileName.c_str(), "r"); char buff[1024]; string tmp; while (nullptr != fgets(buff, 1024, fp)) { tmp += buff; } fclose(fp); callback(tmp); }; asio::post(pool, fun); } static void wait() { pool.join(); } protected: static asio::thread_pool pool; }; asio::thread_pool AsyncWrapper::pool(2); int main() { AsyncWrapper wrap; string file = "./temp.txt"; wrap.async_read(file, [](const string& result) {printf("%s\n", result.c_str()); }); AsyncWrapper::wait(); std::system("pause"); return 0; }
golang的好處就在於它已經幫你完成了上述的封裝過程,它把全部的IO操做都封裝成了阻塞同步調用模式,無非也是經過上面兩種方法。這樣程序員調用的時候 感受本身在寫同步的代碼,但卻能享受異步/非阻塞帶來的好處。
例如
package main import ( "log" "os" "sync" "time" ) var wg sync.WaitGroup func blockSleep() { log.Printf("blockSleep Before bock----------") time.Sleep(1 * time.Second) log.Printf("blockSleep After bock----------") wg.Done() } func writeFile() { log.Printf("writeFile Before bock+++++++++++") f, _ := os.Create("./temp.txt") defer f.Close() log.Printf("writeFile Before Write+++++++++++") f.WriteString("Hello World") log.Printf("writeFile After bock+++++++++++") wg.Done() } func main() { go writeFile() wg.Add(1) for i := 1; i < 5; i++ { go blockSleep() wg.Add(1) } wg.Wait() }
這段代碼time.Sleep
和os.Create
都會形成當前協程讓出CPU。其輸出以下
2018/05/31 13:53:19 blockSleep Before bock---------- 2018/05/31 13:53:19 blockSleep Before bock---------- 2018/05/31 13:53:19 writeFile Before bock+++++++++++ 2018/05/31 13:53:19 blockSleep Before bock---------- 2018/05/31 13:53:19 writeFile Before Write+++++++++++ 2018/05/31 13:53:19 writeFile After bock+++++++++++ 2018/05/31 13:53:19 blockSleep Before bock---------- 2018/05/31 13:53:20 blockSleep After bock---------- 2018/05/31 13:53:20 blockSleep After bock---------- 2018/05/31 13:53:20 blockSleep After bock---------- 2018/05/31 13:53:20 blockSleep After bock----------