本文將從源碼實現上對 libgo 的調度策略進行分析,主要涉及到上一篇文章中的三個結構體的定義:c++
三者的關係以下圖所示:安全
本文會列出類內的主要成員和主要函數作以分析。app
libgo/scheduler/scheduler.hide
class Scheduler{ public: /* * 建立一個調度器,初始化 libgo * 建立主線程的執行器,若是後續 STart 的時候沒有參數,默認只有一個執行器去作 * 當僅使用一個線程進行協程調度時, 協程地執行會嚴格地遵循其建立順序. * */ static Scheduler* Create(); /* * 建立一個協程 Task 對象,並添加到當前的執行器 processer 的任務隊列中, * 調度器的任務數 taskCount_ +1 * */ void CreateTask(TaskF const& fn, TaskOpt const& opt); /* 啓動調度器 * @minThreadNumber : 最小調度線程數, 爲0時, 設置爲cpu核心數. * @maxThreadNumber : 最大調度線程數, 爲0時, 設置爲minThreadNumber. * 若是maxThreadNumber大於minThreadNumber, 則當協程產生長時間阻塞時, * 能夠自動擴展調度線程數. * 喚醒定時器線程 * 每一個調度線程都會調用 Process 開始調度,最後開啓 id 爲 0 的調度線程 * 若是 maxThreadNumber_ > 1 的話,會開啓調度線程 DispatcherThread * */ void Start(int minThreadNumber = 1, int maxThreadNumber = 0); /* * 中止調度,中止後沒法恢復, 僅用於安全退出main函數 * 若是某個調度線程被協程阻塞, 必須等待阻塞結束才能退出. * */ void Stop(); private: /* * 調度線程,主要爲平衡多個 processer 的負載將高負載或阻塞的 p 中的協程 steal 給低負載的 p * 若是所有阻塞可是還有協程待執行,會起新線程,線程數不超過 maxThreadNumber_ * 會將阻塞 P 中的協程分攤給負載較少的 P * */ void DispatcherThread(); /* * 建立一個新的 Processer,並添加到雙端隊列 processers_ 中 * */ void NewProcessThread(); private: atomic_t<uint32_t> taskCount_{0}; // 用來統計協程數量 Deque<Processer*> processers_; // DispatcherThread雙端隊列,用來存放全部的執行器,每一個執行器都會單獨開一個線程去執行,線程中回調 Process() 方法。 LFLock started_; // libgo 提供的自選鎖 };
調度器負責管理 1~N 個調度線程,每一個調度線程一個執行器 Processer。調度器僅負責均衡各個執行器的負載,防止所有卡住的狀況,並不涉及協程的切換等工做。函數
ligbo提供了默認的協程調度器 co_sched學習
#define g_Scheduler ::co::Scheduler::getInstance() #define co_sched g_Scheduler
用戶也能夠建立本身的協程調度器ui
co::Scheduler* my_sched = co::Scheduler::Create();
啓動調度this
std::thread t([my_sched]{mysched->Start();}); t.detach();
schedule 負責整個系統的協程調度,協程的運行依賴於執行器 Processer(簡稱 P),所以在調度器初始化的時候會選擇建立 P 的數量(支持動態增加),全部的執行器會添加到雙端隊列中。主線程也做爲一個執行器,在建立 Scheduler 對象的時候建立,位於雙端隊列下標爲 0 的位置(注意:只是建立對象,並無開始運行);atom
當調用了 Start() 函數後,會正式開始運行。在 Start 函數內部,會建立指定數量的執行器 P,具體數量取決於參數,默認會建立 minThreadNumber 個,當所有執行器都阻塞以後,會動態擴展,最多 maxThreadNumber 個執行器。每一個執行器都會運行於一個單獨的線程,執行器負責該線程內部協程的切換和執行;spa
當建立協程時,會將協程添加到某一個處於活躍狀態的執行器,若是剛好都不活躍,也會添加到某一個 P 中,這並不影響執行器的正常工做,由於調度器的調度線程會去處理它;
Start 函數內部,除了上述執行器所在線程,還會開啓調度線程 DispatcherThread,調度線程會平衡各個 P 的協程數量和負載,進行 steal,若是全部 P 都阻塞,會根據 maxThreadNumber 動態增長 P 的數量,若是僅僅部分 P 阻塞,會將阻塞的 P 中的協程所有拿出(steal),均攤到負載最小的 P 中;
Schedule 也會選擇性開啓協程的定時器線程;
關於定時器以及時鐘的實現,會在以後的文章中討論。
libgo/scheduler/processer.h
每一個協程執行器對應一個線程,負責本線程的協程調度,但並不是線程安全的,是協程調度的核心。
class Processer { public: // 協程掛起標識,用於後續進行喚醒和超時判斷 struct SuspendEntry { // ... }; // 協程切出 ALWAYS_INLINE static void StaticCoYield(); // 掛起當前協程 static SuspendEntry Suspend(); // 掛起當前協程, 並在指定時間後自動喚醒 static SuspendEntry Suspend(FastSteadyClock::duration dur); // 喚醒協程 static bool Wakeup(SuspendEntry const& entry); private: /* * 執行器對協程的調度,也是執行器所在如今的主處理邏輯 * */ void Process(); /* * 從當前執行器中偷 n 個協程並返回 * n 爲0則所有偷出來,不然取出相應的個數 * */ SList<Task> Steal(std::size_t n); /* * 給當前執行器打標記,用於檢測協程是否阻塞 * */ void Mark(); private: int id_; // 線程 id,與 shcedule 中的 _processer 下標對應 Scheduler * scheduler_; // 該執行器依賴的調度器 volatile bool active_ = true; // 該執行器的活躍狀態,活躍代表該執行器未被阻塞,由調度器的調度線程控制 volatile int64_t markTick_ = 0; // mark 的時間戳 volatile uint64_t markSwitch_ = 0; // mark 的時候處於第幾回協程調度 volatile uint64_t switchCount_ = 0; // 協程調度的次數 // 當前正在運行的協程 Task* runningTask_{nullptr}; Task* nextTask_{nullptr}; // 協程隊列 typedef TSQueue<Task, true> TaskQueue; TaskQueue runnableQueue_; // 運行協程隊列 TaskQueue waitQueue_; // 等待協程隊列 TSQueue<Task, false> gcQueue_; // 待回收的協程隊列,協程運行完畢以後,會被添加到該隊列中,等待回收 TaskQueue newQueue_; // 新添加到該執行器中的協程,包括剛剛 steal 過來的協程,該隊列中的協程暫不會執行,會由 Process() 函數將該隊列中的協程不斷添加到 runnableQueue_ 中 volatile uint64_t switchCount_ = 0; // 協程調度的次數 // 執行器等待的條件變量 std::mutex cvMutex_; std::condition_variable cv_; std::atomic_bool waiting_{false}; };
執行器 Processer 維護了三個線程安全的協程隊列:
void Processer::Process() { GetCurrentProcesser() = this; bool & isStop = *stop_; while (!isStop) { runnableQueue_.front(runningTask_); // 獲取一個能夠運行對協程對象 if (!runningTask_) { if (AddNewTasks()) runnableQueue_.front(runningTask_); if (!runningTask_) { WaitCondition(); // 沒有能夠執行的協程,wait 條件變量 AddNewTasks(); continue; } } addNewQuota_ = 1; while (runningTask_ && !isStop) { runningTask_->state_ = TaskState::runnable; runningTask_->proc_ = this; ++switchCount_; runningTask_->SwapIn(); switch (runningTask_->state_) { case TaskState::runnable: { std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); auto next = (Task*)runningTask_->next; if (next) { runningTask_ = next; runningTask_->check_ = runnableQueue_.check_; break; } if (addNewQuota_ < 1 || newQueue_.emptyUnsafe()) { runningTask_ = nullptr; } else { lock.unlock(); if (AddNewTasks()) { runnableQueue_.next(runningTask_, runningTask_); -- addNewQuota_; } else { std::unique_lock<TaskQueue::lock_t> lock2(runnableQueue_.LockRef()); runningTask_ = nullptr; } } } break; case TaskState::block: { std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); runningTask_ = nextTask_; nextTask_ = nullptr; } break; case TaskState::done: default: { runnableQueue_.next(runningTask_, nextTask_); if (!nextTask_ && addNewQuota_ > 0) { if (AddNewTasks()) { runnableQueue_.next(runningTask_, nextTask_); -- addNewQuota_; } } DebugPrint(dbg_task, "task(%s) done.", runningTask_->DebugInfo()); runnableQueue_.erase(runningTask_); if (gcQueue_.size() > 16) // 執行完畢的協程,須要回收資源 GC(); gcQueue_.push(runningTask_); if (runningTask_->eptr_) { std::exception_ptr ep = runningTask_->eptr_; std::rethrow_exception(ep); } std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); runningTask_ = nextTask_; nextTask_ = nullptr; } break; } } } }
在調度器 Schedule 執行 Stop() 函數以前,執行器 P 會一直處於調度協程階段 Process()。在期間,執行器 P 會將運行隊列 runnableQueue 中的第一個協程獲取進行執行,若是可運行隊列爲空,執行器會嘗試將處於 newQueue 中的協程添加到可運行隊列中去,若是 newQueue_ 爲空,說明此時該執行器處於無協程可調度狀態,經過設置條件變量,將執行器設置爲等待狀態;
當獲取到一個可執行協程以後,會執行該協程的任務。協程的執行流程是經過狀態機來實現的。(協程有三個狀態:運行中,阻塞,執行完畢)
Processer 使用了 std::mutex,而且提供了條件變量用來喚醒。當調度器嘗試獲取下一個可運行的協程對象時,若此時無可用協程對象,就會主動去等待該條件變量,默認100毫秒的超時時間。
void Processer::WaitCondition() { GC(); std::unique_lock<std::mutex> lock(cvMutex_); waiting_ = true; cv_.wait_for(lock, std::chrono::milliseconds(100)); waiting_ = false; } void Processer::NotifyCondition() { cv_.notify_all(); }
當調度器向該執行器中增長了新的協程對象時,會喚醒該條件變量,繼續執行 Process 流程。使用條件變量喚醒的效率,要遠遠高於不斷去輪詢。
爲何在使用了條件變量後還要設置超時時間,定時輪詢,即便條件變量沒有被喚醒也但願它返回呢?
由於咱們不但願線程會在這裏阻塞,只要沒有新的協程加入,就一直在死等。咱們但願線程在等待的同時,也能夠定時跳出,執行一些其它的檢測工做等。
簡單來講,從執行器中取協程出來,就是從執行器維護的雙端隊列中獲取執行個數的結點。
爲何要取出來?前面提到過,要麼該執行器負載過大,要麼該執行器處於阻塞的狀態。
SList<Task> Processer::Steal(std::size_t n) { if (n > 0) { // steal 指定個數協程 newQueue_.AssertLink(); auto slist = newQueue_.pop_back(n); newQueue_.AssertLink(); if (slist.size() >= n) return slist; std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); bool pushRunningTask = false, pushNextTask = false; if (runningTask_) pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_); if (nextTask_) pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_); auto slist2 = runnableQueue_.pop_backWithoutLock(n - slist.size()); if (pushRunningTask) runnableQueue_.pushWithoutLock(runningTask_); if (pushNextTask) runnableQueue_.pushWithoutLock(nextTask_); lock.unlock(); slist2.append(std::move(slist)); if (!slist2.empty()) DebugPrint(dbg_scheduler, "Proc(%d).Stealed = %d", id_, (int)slist2.size()); return slist2; } else { // steal all newQueue_.AssertLink(); auto slist = newQueue_.pop_all(); newQueue_.AssertLink(); std::unique_lock<TaskQueue::lock_t> lock(runnableQueue_.LockRef()); bool pushRunningTask = false, pushNextTask = false; if (runningTask_) pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_); if (nextTask_) pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_); auto slist2 = runnableQueue_.pop_allWithoutLock(); if (pushRunningTask) runnableQueue_.pushWithoutLock(runningTask_); if (pushNextTask) runnableQueue_.pushWithoutLock(nextTask_); lock.unlock(); slist2.append(std::move(slist)); if (!slist2.empty()) DebugPrint(dbg_scheduler, "Proc(%d).Stealed all = %d", id_, (int)slist2.size()); return slist2; } }
首先,會從 newQueue 隊列中獲取協程結點,由於 newQueue 中的結點尚未添加到運行隊列中,所以能夠直接取出;若是 newQueue 中協程數量不足,會從 runnableQueue 隊列尾部中繼續獲取結點。因爲 runnableQueue 隊列中咱們記錄了正在執行的協程和下一次將執行的協程(runningTask & nextTask),須要特殊處理。在從 runnableQueue 偷協程以前,會將 runningTask & nextTask 從隊列刪除,待偷完結點以後再次添加到當前 runnableQueue_ 隊列中。
簡單說,偷協程的工做,不會從隊列中獲取到 runningTask & nextTask 標識的協程。
void Processer::Mark() { if (runningTask_ && markSwitch_ != switchCount_) { markSwitch_ = switchCount_; markTick_ = NowMicrosecond(); } } uint32_t cycle_timeout_us = 10 * 1000; bool Processer::IsBlocking() { if (!markSwitch_ || markSwitch_ != switchCount_) return false; return NowMicrosecond() > markTick_ + CoroutineOptions::getInstance().cycle_timeout_us; }
Mark 函數會在調度器的調度函數中被調用,須要注意的是,只有執行器處於活躍狀態時纔會調用。Mark 顧名思義,是給該執行打標記,會記錄mark的時間戳,並記錄下是在第多少次協程調度的過程當中作了標記,Mark 的做用是用來進行執行器的阻塞檢測。
處於活躍狀態的執行器,老是在執行着協程的切換,所以,會不斷自增 switchCount_ 的值,根據 IsBlocking 函數得知,當咱們此時標籤記錄的協程調度次數超過10ms沒有發生改變,咱們認爲該執行器發生阻塞,Scheduler 會進行 Steal 操做。
static SuspendEntry Suspend();
一種方式是直接掛起,會將該協程狀態轉換爲 TaskState::block,而後將該協程從 runnableQueue 中刪除,再添加到 waitQueue 中;
另一種方式是掛起以後(第一種方式執行完畢以後),容許配置一個時間段以後去自動喚醒該協程。
用於喚醒協程
喚醒協程要作的,就是講待喚醒的協程從 waitQueue_ 中刪除並從新添加到 newQueue_中去。
用於在一個執行器中切出當前協程
有兩種可能,一種是協程被阻塞須要掛起;另一種是協程執行完畢,主動切出。
具體實現是經過獲取當前執行器正在執行的協程 Task,調用 SwapOut() 方法實現。
ALWAYS_INLINE void Processer::StaticCoYield() { auto proc = GetCurrentProcesser(); if (proc) proc->CoYield(); } ALWAYS_INLINE void Processer::CoYield() { Task *tk = GetCurrentTask(); assert(tk); ++ tk->yieldCount_; #if ENABLE_DEBUGGER DebugPrint(dbg_yield, "yield task(%s) state = %s", tk->DebugInfo(), GetTaskStateName(tk->state_)); if (Listener::GetTaskListener()) Listener::GetTaskListener()->onSwapOut(tk->id_); #endif tk->SwapOut(); }
#define co_yield do { ::co::Processer::StaticCoYield(); } while (0)
# 協程狀態 enum class TaskState { runnable, // 可運行 block, // 阻塞 done, // 協程運行完畢 }; typedef std::function<void()> TaskF; // c++11提供的函數模板 struct Task { TaskState state_ = TaskState::runnable; uint64_t id_; // 當前調度器下協程編號,從0開始 TaskF fn_; // 協程運行的函數 uint64_t yieldCount_ = 0; // 協程切出的次數 Context ctx_; // 上下文信息 Processer* proc_ = nullptr; // 歸屬於哪一個執行器 // 提供了協程切入、切出、切換到指定線程三個函數 ALWAYS_INLINE void SwapIn(); ALWAYS_INLINE void SwapTo(Task* other); ALWAYS_INLINE void SwapOut(); private: static void StaticRun(intptr_t vp); // 參數爲 Task*,函數會去執行該 Task 的 fn_(),執行完畢後,協程狀態改成 TaskState::done,並在執行器 P 中切出 };
每一個 Task 對象是一個協程,在使用過程當中,建立一個協程實際就是建立了一個 Task 對象,再添加到對應的執行器 P 中。以前提到過,執行器進行協程調度是經過一個狀態機來實現的,這裏的 TaskState 就是協程狀態,協程函數 fn_ 會在 StaticRun 靜態方法中調用,該靜態方法註冊到了協程上下文 _ctx 中。
除此以外,Task 類內部,也提供了協程的切入切出方法,本質也是調用了上下文的切換。
控制協程的運行,內部調用了 Task::Run() 方法,會在協程函數 fn_ 執行完畢以後,將協程狀態轉換爲 TaskState::done,並將協程切出。
void Task::Run() { auto call_fn = [this]() { this->fn_(); this->fn_ = TaskF(); //讓協程function對象的析構也在協程中執行 }; \\ ... call_fn(); \\ ... state_ = TaskState::done; Processer::StaticCoYield(); } void Task::StaticRun(intptr_t vp) { Task* tk = (Task*)vp; tk->Run(); }
這裏就是對 libgo 調度相關實現的描述,本文跳過了對定時器和時鐘部分的實現,這個會在以後單獨敘述。本文涉及到的代碼在源碼目錄下的
libgo-master/libgo/scheduler/processer.cpp libgo-master/libgo/scheduler/processer.h libgo-master/libgo/scheduler/scheduler.cpp libgo-master/libgo/scheduler/scheduler.h
有興趣的讀者能夠對照源碼學習,歡迎討論學習