transport_layer網絡傳輸層模塊源碼實現四
關於做者
前滴滴出行技術專家,現任OPPO文檔數據庫mongodb負責人,負責oppo千萬級峯值TPS/十萬億級數據量文檔數據庫mongodb內核研發及運維工做,一直專一於分佈式緩存、高性能服務端、數據庫、中間件等相關研發。後續持續分享《MongoDB內核源碼設計、性能優化、最佳運維實踐》,Github帳號地址:https://github.com/y123456yzgit
《mongodb內核源碼實現、性能調優、最佳運維實踐系列》文章有先後邏輯關係,請閱讀本篇文章前,提早閱讀以下模塊:github
mongodb網絡傳輸層模塊源碼實現二mongodb
- 說明
本文分析網絡傳輸層模塊中的最後一個子模塊:service_executor服務運行子模塊,即線程模型子模塊。在閱讀該文章前,請提早閱讀下<<Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計>>、<<transport_layer網絡傳輸層模塊源碼實現二>>、<<transport_layer網絡傳輸層模塊源碼實現三>>,這樣有助於快速理解本文分享的線程模型子模塊。緩存
線程模型設計在數據庫性能指標中起着很是重要的做用,所以本文將重點分析mongodb服務層線程模型設計,體驗mongodb如何經過優秀的工做線程模型來達到多種業務場景下的性能極致表現。該模塊主要代碼實現文件以下:性能優化
service_executor線程模型子模塊,在代碼實現中,把線程模型分爲兩種:synchronous線程模式和adaptive線程模型,這兩種線程模型中用於任務調度運行的線程統稱爲worker工做線程。Mongodb啓動的時候經過配置參數net.serviceExecutor來肯定採用那種線程模式運行mongo實例,配置方式以下:網絡
1.//synchronous同步線程模式配置,一個連接已給線程 2.net: 3. serviceExecutor: synchronous 4. 5.//動態線程池模式配置 6.net: 7. serviceExecutor: adaptive
2. synchronous同步線程模型(一個連接已給線程)設計原理及核心代碼實現session
Synchronous同步線程模型也就是每接收到一個連接,就建立一個線程專門負責該連接對應全部的客戶端請求,也就是該連接的全部訪問至始至終由同一個線程負責處理。多線程
2.1 核心代碼實現原理
該線程模型核心代碼實現由ServiceExecutorSynchronous類負責,該類注意成員變量和重要接口以下:
1.//同步線程模型對應ServiceExecutorSynchronous類 2.class ServiceExecutorSynchronous final : public ServiceExecutor { 3.public: 4. //ServiceExecutorSynchronous初始化 5. explicit ServiceExecutorSynchronous(ServiceContext* ctx); 6. //獲取系統CPU個數 7. Status start() override; 8. //shutdown處理 9. Status shutdown(Milliseconds timeout) override; 10. //線程管理及任務入隊處理 11. Status schedule(Task task, ScheduleFlags flags) override; 12. //同步線程模型對應mode 13. Mode transportMode() const override { 14. return Mode::kSynchronous; 15. } 16. //獲取該模型統計信息 17. void appendStats(BSONObjBuilder* bob) const override; 18. 19.private: 20. //私有線程隊列 21. static thread_local std::deque<Task> _localWorkQueue; 22. //遞歸深度 23. static thread_local int _localRecursionDepth; 24. //空閒線程數,例如某個連接當前沒有請求,則該線程阻塞在讀操做上面等待數據讀到來 25. static thread_local int64_t _localThreadIdleCounter; 26. //shutdown的時候設置爲false,連接沒關閉前一直爲true 27. AtomicBool _stillRunning{false}; 28. //當前conn線程數,參考ServiceExecutorSynchronous::schedul 29. AtomicWord<size_t> _numRunningWorkerThreads{0}; 30. //cpu個數 31. size_t _numHardwareCores{0}; 32.};
ServiceExecutorSynchronous類核心成員變量及其功能說明以下:
每一個連接對應的線程都有三個私有成員,分別是:線程隊列、遞歸深度、idle頻度,這三個線程私有成員的做用以下:
- _localWorkQueue:線程私有隊列,task任務入隊及出隊執行都是經過該隊列完成
- _localRecursionDepth:任務遞歸深度控制,避免堆棧溢出
- _localThreadIdleCounter:當線程運行多少次任務後,須要短暫的休息一下子,默認運行0xf次task任務就調用markThreadIdle()一次
同步線程模型子模塊最核心的代碼實現以下:
1.//ServiceStateMachine::_scheduleNextWithGuard 啓動新的conn線程 2.Status ServiceExecutorSynchronous::schedule(Task task, ScheduleFlags flags) { 3. //若是_stillRunning爲false,則直接返回 4. if (!_stillRunning.load()) { 5. return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"}; 6. } 7. //隊列不爲空,說明由任務須要運行,同步線程模型只有新鏈接第一次經過SSM進入該函數的時候爲空 8. //其餘狀況都不爲空 9. if (!_localWorkQueue.empty()) { 10. //kMayYieldBeforeSchedule標記當返回客戶端應答成功後,開始接收下一個新請求,這時候會設置該標記 11. if (flags & ScheduleFlags::kMayYieldBeforeSchedule) { 12. //也就是若是該連接對應的線程若是連續處理了0xf個請求,則須要休息一下子 13. if ((_localThreadIdleCounter++ & 0xf) == 0) { 14. //短暫休息會兒後再處理該連接的下一個用戶請求 15. //其實是調用TCMalloc MarkThreadTemporarilyIdle實現 16. markThreadIdle(); 17. } 18. //連接數即線程數超過了CPU個數,則每處理完一個請求,就yield一次 19. if (_numRunningWorkerThreads.loadRelaxed() > _numHardwareCores) { 20. stdx::this_thread::yield();//線程本次不參與CPU調度,也就是放慢腳步 21. } 22. } 23. //帶kMayRecurse標識,說明即將調度執行的是dealTask 24. //若是遞歸深度小於synchronousServiceExecutorRecursionLimit,則執行task 25. if ((flags & ScheduleFlags::kMayRecurse) && 26. (_localRecursionDepth < synchronousServiceExecutorRecursionLimit.loadRelaxed())) { 27. ++_localRecursionDepth; 28. //遞歸深度沒有超限,則直接執行task,不用入隊 29. task(); 30. } else { 31. //入隊,等待 32. _localWorkQueue.emplace_back(std::move(task)); 33. } 34. return Status::OK(); 35. } 36. //建立conn線程,線程名conn-xx(其實是從listener線程繼承過來的,這時候的Listener線程是父線程,在 37. //ServiceStateMachine::start中已經過線程守護ThreadGuard改成conn-xx),執行對應的task 38. Status status = launchServiceWorkerThread([ this, task = std::move(task) ] { 39. //說明來了一個新連接,線程數自增 40. int ret = _numRunningWorkerThreads.addAndFetch(1); 41. //新連接到來的第一個任務其實是readTask任務 42. _localWorkQueue.emplace_back(std::move(task)); 43. while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) { 44. //每次任務若是是經過線程私有隊列獲取運行,則恢復遞歸深度爲初始值1 45. _localRecursionDepth = 1; 46. //取出該線程擁有的私有隊列上的第一個任務運行 47. _localWorkQueue.front()(); 48. //該任務已經執行完畢,把該任務從隊列移除 49. _localWorkQueue.pop_front(); 50. } 51. //走到這裏說明線程異常了或者須要退出,如連接關閉,須要消耗線程 52. ...... 53. }); 54. return status; 55.}
從上面的代碼能夠看出,worker工做線程經過_localRecursionDepth控制task任務的遞歸深度,當遞歸深度超過最大深度synchronousServiceExecutorRecursionLimit值,則把任務到_localWorkQueue隊列,而後從隊列獲取task任務執行。
此外,爲了達到性能的極致發揮,在每次執行task任務的時候作了以下細節設計,這些細節設計在高壓力狀況下,能夠提高5%的性能提高:
- 每運行oxf次任務,就經過markThreadIdle()讓線程idle休息一下子
- 若是線程數大於CPU核數,則每執行一個任務前都讓線程yield()一次
2.2該模塊函數接口總結大全
synchronous同步線程模型全部接口及其功能說明以下表所示:
- Adaptive動態線程模型設計原理及核心代碼實現
adaptive動態線程模型,會根據當前系統的訪問負載動態的調整線程數,當線程CPU工做比較頻繁的時候,控制線程增長工做線程數;當線程CPU比較空閒後,本線程就會自動銷燬退出,整體worker工做線程數就會減小。
3.1 動態線程模型核心源碼實現
動態線程模型核心代碼實現由ServiceExecutorAdaptive負責完成,該類核心成員變量及核心函數接口以下:
1.class ServiceExecutorAdaptive : public ServiceExecutor { 2.public: 3. //初始化構造 4. explicit ServiceExecutorAdaptive(...); 5. explicit ServiceExecutorAdaptive(...); 6. ServiceExecutorAdaptive(...) = default; 7. ServiceExecutorAdaptive& operator=(ServiceExecutorAdaptive&&) = default; 8. virtual ~ServiceExecutorAdaptive(); 9. //控制線程及worker線程初始化建立 10. Status start() final; 11. //shutdown處理 12. Status shutdown(Milliseconds timeout) final; 13. //任務調度運行 14. Status schedule(Task task, ScheduleFlags flags) final; 15. //adaptive動態線程模型對應Mode 16. Mode transportMode() const final { 17. return Mode::kAsynchronous; 18. } 19. //統計信息 20. void appendStats(BSONObjBuilder* bob) const final; 21. //獲取runing狀態 22. int threadsRunning() { 23. return _threadsRunning.load(); 24. } 25. //新鍵一個worker線程 26. void _startWorkerThread(); 27. //worker工做線程主循環while{}處理 28. void _workerThreadRoutine(int threadId, ThreadList::iterator it); 29. //control控制線程主循環,主要用於控制何時增長線程 30. void _controllerThreadRoutine(); 31. //判斷隊列中的任務數和可用線程數大小,避免任務task飢餓 32. bool _isStarved() const; 33. //asio網絡庫io上下文 34. std::shared_ptr<asio::io_context> _ioContext; //早期ASIO中叫io_service 35. //TransportLayerManager::createWithConfig賦值調用 36. std::unique_ptr<Options> _config; 37. //線程列表及其對應的鎖 38. mutable stdx::mutex _threadsMutex; 39. ThreadList _threads; 40. //控制線程 41. stdx::thread _controllerThread; 42. 43. //TransportLayerManager::createWithConfig賦值調用 44. //時間嘀嗒處理 45. TickSource* const _tickSource; 46. //運行狀態 47. AtomicWord<bool> _isRunning{false}; 48. //kThreadsRunning表明已經執行過task的線程總數,也就是這些線程不是剛剛建立起來的 49. AtomicWord<int> _threadsRunning{0}; 50. //表明當前剛建立或者正在啓動的線程總數,也就是建立起來尚未執行task的線程數 51. AtomicWord<int> _threadsPending{0}; 52. //當前正在執行task的線程 53. AtomicWord<int> _threadsInUse{0}; 54. //當前入隊還沒執行的task數 55. AtomicWord<int> _tasksQueued{0}; 56. //當前入隊還沒執行的deferredTask數 57. AtomicWord<int> _deferredTasksQueued{0}; 58. //TransportLayerManager::createWithConfig賦值調用 59. //沒什麼實際做用 60. TickTimer _lastScheduleTimer; 61. //記錄這個退出的線程生命期內執行任務的總時間 62. AtomicWord<TickSource::Tick> _pastThreadsSpentExecuting{0}; 63. //記錄這個退出的線程生命期內運行的總時間(包括等待IO及運行IO任務的時間) 64. AtomicWord<TickSource::Tick> _pastThreadsSpentRunning{0}; 65. //完成線程級的統計 66. static thread_local ThreadState* _localThreadState; 67. 68. //總的入隊任務數 69. AtomicWord<int64_t> _totalQueued{0}; 70. //總執行的任務數 71. AtomicWord<int64_t> _totalExecuted{0}; 72. //從任務被調度入隊,到真正被執行這段過程的時間,也就是等待被調度的時間 73. AtomicWord<TickSource::Tick> _totalSpentQueued{0}; 74. 75. //shutdown的時候等待線程消耗的條件變量 76. stdx::condition_variable _deathCondition; 77. //條件變量,若是發現工做線程壓力大,爲了不task飢餓 78. //通知controler線程,通知見ServiceExecutorAdaptive::schedule,等待見_controllerThreadRoutine 79. stdx::condition_variable _scheduleCondition; 80.};
ServiceExecutorAdaptive類核心成員變量及其功能說明以下:
從上面的成員變量列表看出,隊列、線程這兩個大類能夠進一步細化爲不一樣的小類,以下:
- 線程:_threadsRunning、threadsPending、_threadsInUsed
- 隊列:_totalExecuted、_tasksQueued、deferredTasksQueued
從上面的ServiceExecutorAdaptive類中的核心接口函數代碼實現能夠概括爲以下三類:
- 時間計數相關核心代碼實現
- Worker工做線程建立及任務調度相關核心接口代碼實現
- controler控制線程設計原理及核心代碼實現
3.1.1 線程運行時間計算相關核心代碼實現
線程運行時間計算核心算法以下:
1.//線程運行時間統計,包含兩種類型時間統計 2.enum class ThreadTimer 3.{ 4. //線程執行task任務的時間+等待數據的時間 5. Running, 6. //只包含線程執行task任務的時間 7. Executing 8.}; 9. 10.//線程私有統計信息,記錄該線程運行時間,運行時間分爲兩種: 11.//1. 執行task任務的時間 2. 若是沒有客戶端請求,線程就會等待,這就是線程等待時間 12.struct ThreadState { 13. //構造初始化 14. ThreadState(TickSource* ts) : running(ts), executing(ts) {} 15. //線程一次循環處理的時間,包括IO等待和執行對應網絡事件對應task的時間 16. CumulativeTickTimer running; 17. //線程一次循環處理中執行task任務的時間,也就是真正工做的時間 18. CumulativeTickTimer executing; 19. //遞歸深度 20. int recursionDepth = 0; 21.}; 22. 23.//獲取指定which類型的工做線程相關運行時間, 24.//例如Running表明線程總運行時間(等待數據+任務處理) 25.//Executing只包含執行task任務的時間 26.TickSource::Tick ServiceExecutorAdaptive::_getThreadTimerTotal(ThreadTimer which) const { 27. //獲取一個時間嘀嗒tick 28. TickSource::Tick accumulator; 29. //先把已消耗的線程的數據統計出來 30. switch (which) { 31. //獲取生命週期已經結束的線程執行任務的總時間(只包括執行任務的時間) 32. case ThreadTimer::Running: 33. accumulator = _pastThreadsSpentRunning.load(); 34. break; 35. //獲取生命週期已經結束的線程整個生命週期時間(包括空閒時間+執行任務時間) 36. case ThreadTimer::Executing: 37. accumulator = _pastThreadsSpentExecuting.load(); 38. break; 39. } 40. //而後再把統計當前正在運行的worker線程的不一樣類型的統計時間統計出來 41. stdx::lock_guard<stdx::mutex> lk(_threadsMutex); 42. for (auto& thread : _threads) { 43. switch (which) { 44. //獲取當前線程池中全部工做線程執行任務時間 45. case ThreadTimer::Running: 46. accumulator += thread.running.totalTime(); 47. break; 48. //獲取當前線程池中全部工做線程整個生命週期時間(包括空閒時間+執行任務時間) 49. case ThreadTimer::Executing: 50. accumulator += thread.executing.totalTime(); 51. break; 52. } 53. } 54. //返回的時間計算包含了已銷燬的線程和當前正在運行的線程的相關統計 55. return accumulator; 56.}
Worker工做線程啓動後的時間能夠包含兩類:1. 線程運行task任務的時間;2.線程等待客戶端請求的時間。一個線程建立起來,若是沒有客戶端請求,則線程就會等待接收數據。若是有客戶端請求,線程就會經過隊列獲取task任務運行。這兩類時間分別表明線程」忙」和「空閒」。
線程總的「忙」狀態時間=全部線程運行task任務的時間,包括已經銷燬的線程。線程總的「空閒」時間=全部線程等待獲取任務執行的時間,也包括已銷燬的線程,線程空閒通常是沒有客戶端請求,或者客戶端請求不多。Worker工做線程對應while(){}循環每循環一次都會進行線程私有運行時間ThreadState計數,總的時間統計就是以該線程私有統計信息爲基準求和而來。
3.1.2 worker工做線程建立、銷燬及task任務處理
worker工做線程在以下狀況下建立或者銷燬:1. 線程池初始化;2. controler控制線程發現當前線程池中線程比較」忙」,則會動態建立新的工做線程;3. 工做線程在while體中每循環一次都會判斷當前線程池是否很」閒」,若是很」閒」則本線程直接銷燬退出。
Worker工做線程建立核心源碼實現以下:
1.Status ServiceExecutorAdaptive::start() { 2. invariant(!_isRunning.load()); 3. //running狀態 4. _isRunning.store(true); 5. //控制線程初始化建立,線程回調ServiceExecutorAdaptive::_controllerThreadRoutine 6. _controllerThread = stdx::thread(&ServiceExecutorAdaptive::_controllerThreadRoutine, this); 7. //啓動時候默認啓用CPU核心數/2個worker線程 8. for (auto i = 0; i < _config->reservedThreads(); i++) { 9. //建立一個工做線程 10. _startWorkerThread(); 11. } 12. return Status::OK(); }
worker工做線程默認初始化爲CPU/2個,初始工做線程數也能夠經過指定的命令行參數來配置:adaptiveServiceExecutorReservedThreads。此外,start()接口默認也會建立一個controler控制線程。
Task任務經過SSM狀態機調用ServiceExecutorAdaptive::schedule()接口入隊,該函數接口核心代碼實現以下:
1.Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task, ScheduleFlags flags) { 2. //獲取當前時間 3. auto scheduleTime = _tickSource->getTicks(); 4. //kTasksQueued: 普通tak,也就是dealTask 5. //_deferredTasksQueued: deferred task,也就是readTask 6. //defered task和普通task分開記錄 _totalQueued=_deferredTasksQueued+_tasksQueued 7. auto pendingCounterPtr = (flags & kDeferredTask) ? &_deferredTasksQueued : &_tasksQueued; 8. //相應隊列 9. pendingCounterPtr->addAndFetch(1); 10. ...... 11. //這裏面的task()執行後-task()執行前的時間纔是CPU真正工做的時間 12. auto wrappedTask = [ this, task = std::move(task), scheduleTime, pendingCounterPtr ] { 13. //worker線程回調會執行該wrappedTask, 14. pendingCounterPtr->subtractAndFetch(1); 15. auto start = _tickSource->getTicks(); 16. //從任務被調度入隊,到真正被執行這段過程的時間,也就是等待被調度的時間 17. //從任務被調度入隊,到真正被執行這段過程的時間 18. _totalSpentQueued.addAndFetch(start - scheduleTime); 19. //recursionDepth=0說明開始進入調度處理,後續有多是遞歸執行 20. if (_localThreadState->recursionDepth++ == 0) { 21. //記錄wrappedTask被worker線程調度執行的起始時間 22. _localThreadState->executing.markRunning(); 23. //當前正在執行wrappedTask的線程加1 24. _threadsInUse.addAndFetch(1); 25. } 26. //ServiceExecutorAdaptive::_workerThreadRoutine執行wrappedTask後會調用guard這裏的func 27. const auto guard = MakeGuard([this, start] { //改函數在task()運行後執行 28. //每執行一個任務完成,則遞歸深度自減 29. if (--_localThreadState->recursionDepth == 0) { 30. //wrappedTask任務被執行消耗的總時間 31. //_localThreadState->executing.markStopped()表明任務該task執行的時間 32. _localThreadState->executingCurRun += _localThreadState->executing.markStopped(); 33. //下面的task()執行完後,正在執行task的線程-1 34. _threadsInUse.subtractAndFetch(1); 35. } 36. //總執行的任務數,task每執行一次增長一次 37. _totalExecuted.addAndFetch(1); 38. }); 39. //運行任務 40. task(); 41. }; 42. //kMayRecurse標識的任務,會進行遞歸調用 dealTask進入調度的時候調由該標識 43. if ((flags & kMayRecurse) && //遞歸調用,任務仍是由本線程處理 44. //遞歸深度還沒達到上限,則仍是由本線程繼續調度執行wrappedTask任務 45. (_localThreadState->recursionDepth + 1 < _config->recursionLimit())) { 46. //本線程立馬直接執行wrappedTask任務,不用入隊到boost-asio全局隊列等待調度執行 47. //io_context::dispatch io_context::dispatch 48. _ioContext->dispatch(std::move(wrappedTask)); 49. } else { //入隊 io_context::post 50. //task入隊到schedule得全局隊列,等待工做線程調度 51. _ioContext->post(std::move(wrappedTask)); 52. } 53. // 54. _lastScheduleTimer.reset(); 55. //總的入隊任務數 56. _totalQueued.addAndFetch(1); 57. //kDeferredTask真正生效在這裏 58. //若是隊列中的任務數大於可用線程數,說明worker壓力過大,須要建立新的worker線程 59. if (_isStarved() && !(flags & kDeferredTask)) {//kDeferredTask真正生效在這裏 60. //條件變量,通知controler線程,通知_controllerThreadRoutine控制線程處理 61. _scheduleCondition.notify_one(); 62. } 63. return Status::OK(); 64.}
從上面的分析能夠看出,schedule()主要完成task任務入隊處理。若是帶有遞歸標識kMayRecurse,則經過_ioContext->dispatch()接口入隊,該接口再ASIO底層實現的時候實際上沒有真正把任務添加到全局隊列,而是直接當前線程繼續處理,這樣就實現了遞歸調用。若是沒有攜帶kMayRecurse遞歸標識,則task任務經過_ioContext->post()須要入隊到全局隊列。ASIO庫的dispatch接口和post接口的具體實現能夠參考:
<<Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計>>
若是任務入隊到全局隊列,則線程池中的worker線程就會經過全局鎖競爭從隊列中獲取task任務執行,該流程經過以下接口實現:
1.//建立線程的回掉函數,線程循環主體,從隊列獲取task任務執行 2.void ServiceExecutorAdaptive::_workerThreadRoutine( 3. int threadId, ServiceExecutorAdaptive::ThreadList::iterator state) { 4. //設置線程模 5. _localThreadState = &(*state); 6. { 7. //worker-N線程名 8. std::string threadName = str::stream() << "worker-" << threadId; 9. setThreadName(threadName); 10. } 11. //該線程第一次執行while中的任務的時候爲ture,後面都是false 12. //表示該線程是剛建立起來的,尚未執行任何一個task 13. bool stillPending = true; 14. 15. //線程退出的時候執行如下{},都是一些計數清理 16. const auto guard = MakeGuard([this, &stillPending, state] { 17. //該worker線程退出前的計數清理、信號通知處理 18. //...... 19. } 20. while (_isRunning.load()) { 21. ...... 22. //本次循環執行task的時間,不包括網絡IO等待時間 23. state->executingCurRun = 0; 24. try { 25. //經過_ioContext和入隊的任務聯繫起來 26. asio::io_context::work work(*_ioContext); 27. //記錄開始時間,也就是任務執行開始時間 28. state->running.markRunning(); 29. //執行ServiceExecutorAdaptive::schedule中對應的task 30. //線程第一次運行task任務,最多從隊列拿一個任務執行 31. //runTime.toSystemDuration()指定一次run最多運行多長時間 32. if (stillPending) { 33. //執行一個任務就會返回 34. _ioContext->run_one_for(runTime.toSystemDuration()); 35. } else { // Otherwise, just run for the full run period 36. //_ioContext對應的全部任務都執行完或者toSystemDuration超時後纔會返回 37. _ioContext->run_for(runTime.toSystemDuration()); //io_context::run_for 38. } 39. ...... 40. } 41. //該線程第一次執行while中的任務後設置ture,後面都是false 42. if (stillPending) { 43. _threadsPending.subtractAndFetch(1); 44. stillPending = false; 45. //當前線程數比初始線程數多 46. } else if (_threadsRunning.load() > _config->reservedThreads()) { 47. //表明本次循環該線程真正處理任務的時間與本次循環總時間(總時間包括IO等待和IO任務處理時間) 48. double executingToRunning = state->executingCurRun / static_cast<double>(spentRunning); 49. executingToRunning *= 100; 50. dassert(executingToRunning <= 100); 51. 52. int pctExecuting = static_cast<int>(executingToRunning); 53. //線程不少,超過了指定配置,而且知足這個條件,該worker線程會退出,線程比較空閒,退出 54. //若是線程真正處理任務執行時間佔比小於該值,則說明本線程比較空閒,能夠退出。 55. if (pctExecuting < _config->idlePctThreshold()) { 56. log() << "Thread was only executing tasks " << pctExecuting << "% over the last " 57. << runTime << ". Exiting thread."; 58. break; //退出線程循環,也就是線程自動銷燬了 59. } 60. } 61. } 62.}
線程主循環主要工做內容:1. 從ASIO庫的全局隊列獲取任務執行;2. 判斷本線程是否比較」閒」,若是是則直接銷燬退出。3. 線程建立起來進行初始線程名設置、線程主循環一些計數處理等。
3.2.3 controller控制線程核心代碼實現
控制線程用於判斷線程池是線程是否壓力很大,是否比較」忙」,若是是則增長線程數來減輕全局隊列中task任務積壓引發的延遲處理問題。控制線程核心代碼實現以下:
1.//controller控制線程 2.void ServiceExecutorAdaptive::_controllerThreadRoutine() { 3. //控制線程線程名設置 4. setThreadName("worker-controller"_sd); 5. ...... 6. //控制線程主循環 7. while (_isRunning.load()) { 8. //一次while結束的時候執行對應func ,也就是結束的時候計算爲起始時間 9. const auto timerResetGuard = 10. MakeGuard([&sinceLastControlRound] { sinceLastControlRound.reset(); }); 11. //等待工做線程通知,最多等待stuckThreadTimeout 12. _scheduleCondition.wait_for(fakeLk, _config->stuckThreadTimeout().toSystemDuration()); 13. ...... 14. double utilizationPct; 15. { 16. //獲取全部線程執行任務的總時間 17. auto spentExecuting = _getThreadTimerTotal(ThreadTimer::Executing); 18. //獲取全部線程整個生命週期時間(包括空閒時間+執行任務時間+建立線程的時間) 19. auto spentRunning = _getThreadTimerTotal(ThreadTimer::Running); 20. //也就是while中執行一次這個過程當中spentExecuting差值, 21. //也就是spentExecuting表明while一次循環的Executing time開始值, 22. //lastSpentExecuting表明一次循環對應的結束time值 23. auto diffExecuting = spentExecuting - lastSpentExecuting; 24. //也就是spentRunning表明while一次循環的running time開始值, 25. //lastSpentRunning表明一次循環對應的結束time值 26. auto diffRunning = spentRunning - lastSpentRunning; 27. if (spentRunning == 0 || diffRunning == 0) 28. utilizationPct = 0.0; 29. else { 30. lastSpentExecuting = spentExecuting; 31. lastSpentRunning = spentRunning; 32. 33. //一次while循環過程當中全部線程執行任務的時間和線程運行總時間的比值 34. utilizationPct = diffExecuting / static_cast<double>(diffRunning); 35. utilizationPct *= 100; 36. } 37. } 38. //也就是本while()執行一次的時間差值,也就是上次走到這裏的時間和本次走到這裏的時間差值大於該閥值 39. //也就是控制線程過久沒有判斷線程池是否夠用了 40. if (sinceLastControlRound.sinceStart() >= _config->stuckThreadTimeout()) { 41. //use中的線程數=線程池中總的線程數,說明線程池中線程太忙了 42. if ((_threadsInUse.load() == _threadsRunning.load()) && 43. (sinceLastSchedule >= _config->stuckThreadTimeout())) { 44. log() << "Detected blocked worker threads, " 45. << "starting new reserve threads to unblock service executor"; 46. //一次批量建立這麼多線程,若是咱們配置adaptiveServiceExecutorReservedThreads很是大 47. //這裏實際上有個問題,則這裏會一次性建立很是多的線程,可能反而會成爲系統瓶頸 48. //建議mongodb官方這裏最好作一下上限限制 49. for (int i = 0; i < _config->reservedThreads(); i++) { 50. //建立新的worker工做線程 51. _startWorkerThread(); 52. } 53. } 54. continue; 55. } 56. //當前的worker線程數 57. auto threadsRunning = _threadsRunning.load(); 58. //保證線程池中worker線程數最少都要reservedThreads個 59. if (threadsRunning < _config->reservedThreads()) { 60. //線程池中線程數最少數量不能比最低配置少 61. while (_threadsRunning.load() < _config->reservedThreads()) { 62. _startWorkerThread(); 63. } 64. } 65. //worker線程非空閒佔比小於該閥值,說明壓力不大,不須要增長worker線程數 66. if (utilizationPct < _config->idlePctThreshold()) { 67. continue; 68. } 69. //走到這裏,說明總體worker工做壓力仍是很大的 70. //咱們在這裏循環stuckThreadTimeout毫秒,直到咱們等待worker線程建立起來並正常運行task 71. //由於若是有正在建立的worker線程,咱們等待一小會,最多等待stuckThreadTimeout ms 72. //保證一次while循環時間爲stuckThreadTimeout 73. do { 74. stdx::this_thread::sleep_for(_config->maxQueueLatency().toSystemDuration()); 75. } while ((_threadsPending.load() > 0) && 76. (sinceLastControlRound.sinceStart() < _config->stuckThreadTimeout())); 77. //隊列中任務數多餘可用空閒線程數,說明壓力有點大,給線程池增長一個新的線程 78. if (_isStarved()) { 79. _startWorkerThread(); 80. } 81. } 82.}
Mongodb服務層有個專門的控制線程用於判斷線程池中工做線程的壓力狀況,以此來決定是否在線程池中建立新的工做線程來提高性能。
控制線程每過必定時間循環檢查線程池中的線程壓力狀態,實現原理就是簡單的實時記錄線程池中的線程當前運行狀況,爲如下兩類計數:總線程數_threadsRunning、
當前正在運行task任務的線程數_threadsInUse。若是_threadsRunning=_threadsRunning,說明全部工做線程當前都在處理task任務,這時候就會建立新的worker線程來減輕任務由於排隊引發的延遲。
2.1.4 adaptive線程模型函數接口大全
前面只分析了核心的幾個接口,下表列出了該模塊的完整接口功能說明:
- 總結
adaptive動態線程池模型,內核實現的時候會根據當前系統的訪問負載動態的調整線程數。當線程CPU工做比較頻繁的時候,控制線程增長工做線程數;當線程CPU比較空閒後,本線程就會自動消耗退出。下面一塊兒體驗adaptive線程模式下,mongodb是如何作到性能極致設計的。
3.1 synchronous同步線程模型總結
Sync線程模型也就是一個連接一個線程,實現比較簡單。該線程模型,listener線程每接收到一個連接就會建立一個線程,該連接上的全部數據讀寫及內部請求處理流程將一直由本線程負責,整個線程的生命週期就是這個連接的生命週期。
3.2 adaptive線程模型worker線程運行時間相關的幾個統計
3.6狀態機調度模塊中提到,一個完整的客戶端請求處理能夠轉換爲2個任務:經過asio庫接收一個完整mongodb報文、接收到報文後的後續全部處理(含報文解析、認證、引擎層處理、發送數據給客戶端等)。假設這兩個任務對應的任務名、運行時間分別以下表所示:
客戶端一次完整請求過程當中,mongodb內部處理過程=task1 + task2,整個請求過程當中mongodb內部消耗的時間T1+T2。
實際上若是fd上沒有數據請求,則工做線程就會等待數據,等待數據的過程就至關於空閒時間,咱們把這個時間定義爲T3。因而一個工做線程總運行時間=內部任務處理時間+空閒等待時間,也就是線程總時間=T1+T2+T3,只是T3是無用等待時間。
- 單個工做線程如何判斷本身處於」空閒」狀態
步驟2中提到,線程運行總時間=T1 + T2 +T3,其中T3是無用等待時間。若是T3的無用等待時間佔比很大,則說明線程比較空閒。
Mongodb工做線程每次運行完一次task任務後,都會判斷本線程的有效運行時間佔比,有效運行時間佔比=(T1+T2)/(T1+T2+T3),若是有效運行時間佔比小於某個閥值,則該線程自動退出銷燬,該閥值由adaptiveServiceExecutorIdlePctThreshold參數指定。該參數在線調整方式:
db.adminCommand( { setParameter: 1, adaptiveServiceExecutorIdlePctThreshold: 50} )
- 如何判斷線程池中工做線程「太忙」
Mongodb服務層有個專門的控制線程用於判斷線程池中工做線程的壓力狀況,以此來決定是否在線程池中建立新的工做線程來提高性能。
控制線程每過必定時間循環檢查線程池中的線程壓力狀態,實現原理就是簡單的實時記錄線程池中的線程當前運行狀況,爲如下兩類計數:總線程數_threadsRunning、
當前正在運行task任務的線程數_threadsInUse。若是_threadsRunning=_threadsRunning,說明全部工做線程當前都在處理task任務,這時候已經沒有多餘線程去asio庫中的全局任務隊列op_queue_中取任務執行了,這時候隊列中的任務就不會獲得及時的執行,就會成爲響應客戶端請求的瓶頸點。
- 如何判斷線程池中全部線程比較「空閒」
control控制線程會在收集線程池中全部工做線程的有效運行時間佔比,若是佔比小於指定配置的閥值,則表明整個線程池空閒。
前面已經說明一個線程的有效時間佔比爲:(T1+T2)/(T1+T2+T3),那麼全部線程池中的線程總的有效時間佔比計算方式以下:
全部線程的總有效時間TT1 = (線程池中工做線程1的有效時間T1+T2) + (線程池中工做線程2的有效時間T1+T2) + ..... + (線程池中工做線程n的有效時間T1+T2)
全部線程總運行時間TT2 = (線程池中工做線程1的有效時間T1+T2+T3) + (線程池中工做線程2的有效時間T1+T2+T3) + ..... + (線程池中工做線程n的有效時間T1+T2+T3)
線程池中全部線程的總有效工做時間佔比 = TT1/TT2
- control控制線程如何動態增長線程池中線程數
Mongodb在啓動初始化的時候,會建立一個線程名爲」worker-controller」的控制線程,該線程主要工做就是判斷線程池中是否有充足的工做線程來處理asio庫中全局隊列op_queue_中的task任務,若是發現線程池比較忙,沒有足夠的線程來處理隊列中的任務,則在線程池中動態增長線程來避免task任務在隊列上排隊等待。
control控制線程循環主體主要壓力判斷控制流程以下:
while { #等待工做線程喚醒條件變量,最長等待stuckThreadTimeout _scheduleCondition.wait_for(stuckThreadTimeout) ...... #獲取線程池中全部線程最近一次運行任務的總有效時間TT1 Executing = _getThreadTimerTotal(ThreadTimer::Executing); #獲取線程池中全部線程最近一次運行任務的總運行時間TT2 Running = _getThreadTimerTotal(ThreadTimer::Running); #線程池中全部線程的總有效工做時間佔比 = TT1/TT2 utilizationPct = Executing / Running; ...... #表明control線程過久沒有進行線程池壓力檢查了 if(本次循環到該行代碼的時間 > stuckThreadTimeout閥值) { #說明過久沒作壓力檢查,形成工做線程不夠用了 if(_threadsInUse == _threadsRunning) { #批量建立一批工做線程 for(; i < reservedThreads; i++) #建立工做線程 _startWorkerThread(); } #control線程繼續下一次循環壓力檢查 continue; } ...... #若是當前線程池中總線程數小於最小線程數配置 #則建立一批線程,保證最少工做線程數達到要求 if (threadsRunning < reservedThreads) { while (_threadsRunning < reservedThreads) { _startWorkerThread(); } } ...... #檢查上一次循環到本次循環這段時間範圍內線程池中線程的工做壓力 #若是壓力不大,則說明無需增長工做線程數,則繼續下一次循環 if (utilizationPct < idlePctThreshold) { continue; } ...... #若是發現已經有線程建立起來了,可是這些線程尚未運行任務 #這說明當前可用線程數可能足夠了,咱們休息sleep_for會兒在判斷一下 #該循環最多持續stuckThreadTimeout時間 do { stdx::this_thread::sleep_for(); } while ((_threadsPending.load() > 0) && (sinceLastControlRound.sinceStart() < stuckThreadTimeout) #若是tasksQueued隊列中的任務數大於工做線程數,說明任務在排隊了 #該擴容線程池中線程了 if (_isStarved()) { _startWorkerThread(); } }
1. 實時serviceExecutorTaskStats線程模型統計信息
本文分析的mongodb版本爲3.6.1,其network.serviceExecutorTaskStats網絡線程模型相關統計經過db.serverStatus().network.serviceExecutorTaskStats能夠查看,以下圖所示:
上表中各個字段的都有各自的意義,咱們須要注意這些參數的如下狀況:
- threadsRunning - threadsInUse的差值越大說明線程池中線程比較空閒,差值越小說明壓力越大
- threadsPending越大,表示線程池越空閒
- tasksQueued - totalExecuted的差值越大說明任務隊列上等待執行的任務越多,說明任務積壓現象越明顯
- deferredTasksQueued越大說明工做線程比較空閒,在等待客戶端數據到來
- totalTimeRunningMicros - totalTimeExecutingMicros差值越大說明越空閒
上面三個大類中的整體反映趨勢都是同樣的,任何一個差值越大就說明越空閒。
在後續mongodb最新版本中,去掉了部分重複統計的字段,同時也增長了如下字段,以下圖所示:
新版本增長的幾個統計項實際上和3.6.1大同小異,只是把狀態機任務按照不通類型進行了更加詳細的統計。新版本中,更重要的一個功能就是control線程在發現線程池壓力過大的時候建立新線程的觸發狀況也進行了統計,這樣咱們就能夠更加直觀的查看動態建立的線程是由於什麼緣由建立的。
- Mongodb-3.6早期版本control線程動態調整動態增長線程缺陷1例
從步驟6中能夠看出,control控制線程建立工做線程的第一個條件爲:若是該線程超過stuckThreadTimeout閥值都沒有作線程壓力控制檢查,而且線程池中線程數所有在處理任務隊列中的任務,這種狀況control線程一次性會建立reservedThreads個線程。reservedThreads由adaptiveServiceExecutorReservedThreads配置,若是沒有配置,則採用初始值CPU/2。
那麼問題來了,若是我提早經過命令行配置了這個值,而且這個值配置的很是大,例如一百萬,這裏豈不是要建立一百萬個線程,這樣會形成操做系統負載升高,更容易引發耗盡系統pid信息,這會引發嚴重的系統級問題。
不過,不用擔憂,最新版本的mongodb代碼,內核代碼已經作了限制,這種狀況下建立的線程數變爲了1,也就是這種狀況只建立一個線程。
3.3 adaptive線程模型實時參數調優
動態線程模設計的時候,mongodb設計者考慮到了不通應用場景的狀況,所以在覈心關鍵點增長了實時在線參數調整設置,主要包含以下7種參數,以下表所示:
命令行實時參數調整方法以下,以adaptiveServiceExecutorReservedThreads爲例,其餘參數調整方法相似:db.adminCommand( { setParameter: 1, adaptiveServiceExecutorReservedThreads: xx} )
Mongodb服務層的adaptive動態線程模型設計代碼實現很是優秀,有不少實現細節針對不一樣應用場景作了極致優化。
3.4 不一樣線程模型性能多場景PK
詳見:<<Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計>>
3.5 Asio網絡庫全局隊列鎖優化,性能進一步提高
經過<<Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計>>一文中的ASIO庫實現和adaptive動態線程模型實現,能夠看出爲了獲取全局任務隊列上的任務,須要進行全局鎖競爭,這其實是整個線程池從隊列獲取任務運行最大的一個瓶頸。
優化思路:咱們能夠經過優化隊列和鎖來提高總體性能,當前的隊列只有一個,咱們能夠把單個隊列調整爲多個隊列,每一個隊列一把鎖,任務入隊的時候經過把連接session散列到多個隊列,經過該優化,鎖競爭及排隊將會獲得極大的改善。
優化前隊列架構:
優化後隊列架構:
如上圖,把一個全局隊列拆分爲多個隊列,任務入隊的時候把session按照hash散列到各自的隊列,工做線程獲取任務的時候,同理經過hash的方式去對應的隊列獲取任務,經過這種方式減小鎖競爭,同時提高總體性能。
因爲篇幅緣由,本文只分析了主要核心接口源碼實現,更多接口的源碼實現能夠參考以下地址,詳見:mongodb adaptive動態線程模型源碼詳細分析