關於做者
前滴滴出行技術專家,現任OPPO文檔數據庫mongodb負責人,負責oppo千萬級峯值TPS/十萬億級數據量文檔數據庫mongodb研發和運維工做,一直專一於分佈式緩存、高性能服務端、數據庫、中間件等相關研發。Github帳號地址:https://github.com/y123456yzgit
1. 說明
在以前的<<Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計>>和<<transport_layer網絡傳輸層模塊源碼實現二>>一文中分析瞭如何閱讀百萬級大工程源碼、Asio網絡庫實現、線程模型、transport_layer套接字處理及傳輸層管理子模塊、session會話子模塊、Ticket數據收發子模塊、service_entry_point服務入口點子模塊。github
本文將繼續分析網絡傳輸層模塊中service_state_machine狀態機調度子模塊內核源碼實現。mongodb
2. service_state_machine狀態機調度子模塊
service_state_machine狀態機處理模塊主要複雜一次完整請求的狀態轉換,確保請求能夠按照指定的流程順利進行,最終實現客戶端的正常mongodb訪問。該模塊核心代碼實現主要由如下三個源碼文件實現(test爲測試相關,能夠忽略):數據庫
2.1 核心代碼實現
在service_entry_point服務入口點子模塊分析中,當接收到一個新的連接後,在ServiceEntryPointImpl::startSession(...)回調函數中會構造一個ServiceStateMachine ssm類,從而實現了新連接、session、ssm的一一映射關係。其中,ServiceStateMachine 類實現對ThreadGuard(線程守護)有較多的依賴,所以本文從這兩個類核心代碼實現來分析整個狀態機調度模塊的內部設計細節。緩存
2.1.1 ThreadGuard線程守護類核心代碼實現
ThreadGuard也就是」線程守護」類,該類主要用於工做線程名的管理維護、ssm歸屬管理、該ssm對應session連接的回收處理等。該類核心成員及接口實現以下:網絡
1.class ServiceStateMachine::ThreadGuard { 2. ...... 3.public: 4. // create a ThreadGuard which will take ownership of the SSM in this thread. 5. //ThreadGuard初始化,標記ssm全部權屬於本線程 6. explicit ThreadGuard(ServiceStateMachine* ssm) : _ssm{ssm} { 7. //獲取ssm狀態機全部權 8. auto owned = _ssm->_owned.compareAndSwap(Ownership::kUnowned, Ownership::kOwned); 9. //若是是一個連接一個線程模型,則條件知足 10. if (owned == Ownership::kStatic) { 11. //一個連接一個線程模型,因爲連接始終由同一個線程處理,所以該連接對應的ssm始終歸屬於同一個線程 12. dassert(haveClient()); 13. dassert(Client::getCurrent() == _ssm->_dbClientPtr); 14. //標識歸屬權已肯定 15. _haveTakenOwnership = true; 16. return; 17. } 18. ...... 19. //adaptive 動態線程模式走下面的模式 20. 21. //把當前線程的線程名零時保存起來 22. auto oldThreadName = getThreadName(); 23. //ssm保存的線程名和當前線程名不一樣 24. if (oldThreadName != _ssm->_threadName) { 25. //即將修改線程名了,把修改前的線程名保存到_oldThreadName 26. _ssm->_oldThreadName = getThreadName().toString(); 27. //把運行本ssm狀態機的線程名改成conn-x線程 28. setThreadName(_ssm->_threadName); //把當前線程更名爲_threadName 29. } 30. 31. //設置本線程對應client信息,一個連接對應一個client,標識本client當前歸屬於本線程處理 32. Client::setCurrent(std::move(_ssm->_dbClient)); 33. //本狀態機ssm全部權有了,歸屬於運行本ssm的線程 34. _haveTakenOwnership = true; 35. } 36. ...... 37. //從新賦值 38. ThreadGuard& operator=(ThreadGuard&& other) { 39. if (this != &other) { 40. _ssm = other._ssm; 41. _haveTakenOwnership = other._haveTakenOwnership; 42. //原來的other全部權失效 43. other._haveTakenOwnership = false; 44. } 45. //返回 46. return *this; 47. }; 48. 49. //析構函數 50. ~ThreadGuard() { 51. //ssm全部權已肯定,則析構的時候,調用release處理,恢復線程原有線程名 52. if (_haveTakenOwnership) 53. release(); 54. } 55. 56. //一個連接一個線程模型,標記_owned爲kStatic,也就是線程名永遠不會改變 57. void markStaticOwnership() { 58. dassert(static_cast<bool>(*this)); 59. _ssm->_owned.store(Ownership::kStatic); 60. } 61. 62. //恢復原有線程名,同時把client信息從調度線程歸還給狀態機 63. void release() { 64. auto owned = _ssm->_owned.load(); 65. //adaptive異步線程池模式知足if條件,表示SSM固定歸屬於某個線程 66. if (owned != Ownership::kStatic) { 67. //本線程擁有currentClient信息,因而把它歸還給SSM狀態機 68. if (haveClient()) { 69. _ssm->_dbClient = Client::releaseCurrent(); 70. } 71. //恢復到之前的線程名 72. if (!_ssm->_oldThreadName.empty()) { 73. //恢復到老線程名 74. setThreadName(_ssm->_oldThreadName); 75. } 76. } 77. //狀態機狀態進入end,則調用對應回收hook處理 78. if (_ssm->state() == State::Ended) { 79. //連接關閉的回收處理 ServiceStateMachine::setCleanupHook 80. auto cleanupHook = std::move(_ssm->_cleanupHook); 81. if (cleanupHook) 82. cleanupHook(); 83. return; 84. } 85. 86. //歸屬權失效 87. _haveTakenOwnership = false; 88. //歸屬狀態變爲未知 89. if (owned == Ownership::kOwned) { 90. _ssm->_owned.store(Ownership::kUnowned); 91. } 92. } 93. 94.private: 95. //本線程守護當前對應的ssm 96. ServiceStateMachine* _ssm; 97. //默認false,標識該狀態機ssm不歸屬於任何線程 98. bool _haveTakenOwnership = false; 99.}
從上面的代碼分析能夠看出線程守護類做用比較明確,就是守護當前線程的歸屬狀態,並記錄狀態機ssm不一樣狀態變化先後的線程名。此外,狀態機ssm對應的session連接若是進入end狀態,則該連接的資源回收釋放也由該類完成。session
查看mongod或者mongos實例,若是啓動實例的時候配置了」serviceExecutor: adaptive」會發現這些進程下面有不少線程名爲」conn-x」和」worker-x」線程,同時同一個線程線程名可能發生改變,這個過程就是由ThreadGuard線程守護類來實現。synchronous一個連接一個線程模型只有」conn-x」線程,adaptive線程模型將同時存在有線程名爲」conn-x」和」worker-x」的線程,具體原理講在後面繼續分析,以下圖:多線程
說明:synchronous線程模式對應worker初始線程名爲」conn-x」,adaptive線程模型對應worker初始線程名爲」worker-x」。運維
ThreadGuard線程守護類和狀態機ssm(service_state_machine)關聯緊密,客戶端請求處理的內部ssm狀態轉換也和該類密切關聯,請看後續分析。異步
2.1.2 ServiceStateMachine 類核心代碼實現
service_state_machine狀態機處理模塊核心代碼實現經過ServiceStateMachine類完成,該類的核心結構成員和函數接口以下:
1.//一個新連接對應一個ServiceStateMachine保存到ServiceEntryPointImpl._sessions中 2.class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> { 3. ...... 4.public: 5. ...... 6. static std::shared_ptr<ServiceStateMachine> create(...); 7. ServiceStateMachine(...); 8. //狀態機所屬狀態 9. enum class State { 10. //ServiceStateMachine::ServiceStateMachine構造函數初始狀態 11. Created, 12. //ServiceStateMachine::_runNextInGuard開始進入接收網絡數據狀態 13. //標記本次客戶端請求已完成(已經發送DB獲取的數據給客戶端),等待調度進行該連接的 14. //下一輪請求,該狀態對應處理流程爲_sourceMessage 15. Source, 16. //等待獲取客戶端的數據 17. SourceWait, 18. //接收到一個完整mongodb報文後進入該狀態 19. Process, 20. //等待數據發送成功 21. SinkWait, 22. //接收或者發送數據異常、連接關閉,則進入該狀態 _cleanupSession 23. EndSession, 24. //session回收處理進入該狀態 25. Ended 26. }; 27. //全部權狀態,主要用來判斷是否須要在狀態轉換中跟新線程名,只對動態線程模型生效 28. enum class Ownership { 29. //該狀態表示本狀態機SSM處於非活躍狀態 30. kUnowned, 31. //該狀態標識本狀態機SSM歸屬於某個工做worker線程,處於活躍調度運行狀態 32. kOwned, 33. //表示SSM固定歸屬於某個線程 34. kStatic 35. }; 36. 37. ...... 38.private: 39. //ThreadGuard能夠理解爲線程守護,後面在ThreadGuard類中單獨說明 40. class ThreadGuard; 41. friend class ThreadGuard; 42. 43. ...... 44. //獲取session信息 45. const transport::SessionHandle& _session() 46. //如下兩個接口爲任務task調度相關接口 47. void _scheduleNextWithGuard(...); 48. void _runNextInGuard(ThreadGuard guard); 49. //接收到一個完整mongodb報文後的處理 50. inline void _processMessage(ThreadGuard guard); 51. //如下四個接口完成底層數據讀寫及其對應回調處理 52. void _sourceCallback(Status status); 53. void _sinkCallback(Status status); 54. void _sourceMessage(ThreadGuard guard); 55. void _sinkMessage(ThreadGuard guard, Message toSink); 56. 57. //一次客戶端請求,當前mongodb服務端所處的狀態 58. AtomicWord<State> _state{State::Created}; 59. //服務入口,ServiceEntryPointMongod ServiceEntryPointMongos mongod及mongos入口點 60. ServiceEntryPoint* _sep; 61. //synchronous及adaptive模式,也就是線程模型是一個連接一個線程仍是動態線程池 62. transport::Mode _transportMode; 63. //ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)服務上下文 64. ServiceContext* const _serviceContext; 65. //也就是本ssm對應的session信息,默認對應ASIOSession 66. transport::SessionHandle _sessionHandle; 67. //根據session構造對應client信息,ServiceStateMachine::ServiceStateMachine賦值 68. //也就是本次請求對應的客戶端信息 69. ServiceContext::UniqueClient _dbClient; 70. //指向上面的_dbClient 71. const Client* _dbClientPtr; 72. //該SSM當前處理線程的線程名,由於adaptive線程模型一次請求中的不一樣狀態會修改線程名 73. const std::string _threadName; 74. //修改線程名以前的線程名稱 75. std::string _oldThreadName; 76. //ServiceEntryPointImpl::startSession->ServiceStateMachine::setCleanupHook中設置賦值 77. //session連接回收處理 78. stdx::function<void()> _cleanupHook; 79. //接收處理的message信息 一個完整的報文就記錄在該msg中 80. Message _inMessage; 81. //默認初始化kUnowned,標識本SSM狀態機處於非活躍狀態, 82. //主要用來判斷是否須要在狀態轉換中跟新線程名,只對動態線程模型生效 83. AtomicWord<Ownership> _owned{Ownership::kUnowned}; 84.}
該類核心成員功能說明以下表:
成員名 |
功能說明 |
_state |
SSM狀態機當前所處狀態 |
_sep |
服務入口,mongod對應ServiceEntryPointMongod;mongos對應 ServiceEntryPointMongos |
_transportMode |
synchronous及adaptive模式,也就是線程模型是一個連接一個線程仍是動態線程池 |
_serviceContext |
服務上下文,mongod和mongos分別對應:ServiceContextMongoD、ServiceContextNoop |
_sessionHandle |
本ssm對應的session信息,默認對應ASIOSession |
_dbClient |
也就是本次請求對應的客戶端信息 |
_dbClientPtr |
指向上面的_dbClient |
_threadName |
SSM所屬線程的線程名,初始化爲」conn-n」,adaptive線程模型一次請求中的不一樣狀態會修改線程名 |
_oldThreadName |
修改線程名以前的線程名稱 |
_cleanupHook |
session連接回收處理 |
_inMessage |
接收處理的message信息 一個完整的報文就記錄在該msg中 |
_owned |
默認初始化kUnowned,標識本SSM狀態機處於非活躍狀態,主要用來判斷是否須要在狀態轉換中跟新線程名,只對動態線程模型生效 |
咱們知道,連接、session、SSM狀態機一一對應,他們也擁有對應的歸屬權,這裏的歸屬權指的就是當前SSM歸屬於那個線程,也就是當前SSM狀態機調度模塊由那個線程實現。歸屬權經過Ownership類標記,該類保護以下幾種狀態,每種狀態功能說明以下:
歸屬碼 |
功能說明 |
kUnowned |
未知狀態,也就是SSM當前不歸屬與具體的線程 |
kOwned |
針對adaptive線程模型,表示當前SSM狀態轉換處理由具體的線程負責,也就是歸屬權明確 |
kStatic |
針對synchronous線程模型,也就是一個連接一個線程模型。因爲一個連接始終由同一個線程處理,所以該連接對應SSM也就始終歸屬於同一個線程,整個運行狀態不會改變。 |
Mongodb服務端接收到客戶端請求後的數據接收、協議解析、從db層獲取數據、發送數據給客戶端都是經過SSM狀態機進行有序的狀態轉換處理,SSM調度處理過程當中保護多個狀態,每一個狀態對應一個狀態碼,具體狀態碼及其功能說明以下表所示:
狀態碼 |
功能說明 |
Created |
ServiceStateMachine::ServiceStateMachine()構造函數初始狀態 |
Source |
下面兩種狀況會進入該狀態:1. 新鏈接到來第一次進入SSM處理;2. 客戶端請求已完成(已經發送DB獲取的數據給客戶端),等待調度進行該連接的下一輪請求 |
SourceWait |
等待獲取客戶端的數據 |
Process |
接收到一個完整mongodb報文後進入該狀態,開始進行協議解析、db層數據訪問 |
SinkWait |
等待數據發送成功,發送數據給客戶端過程當中 |
EndSession |
接收或者發送數據異常、連接關閉,session對應客戶端,同時進入Ended狀態 |
Ended |
session回收處理,進入EndSession後立馬經過_cleanupSession進入該狀態 |
以上是SSM處理請求過程的狀態碼信息,狀態轉換的具體實現過程請參考後面的核心代碼分析。listerner線程接收到新的客戶端連接後會調用經過service_entry_point服務入口點子模塊的ssm->start()接口進入SSM狀態機調度模塊,該接口相關的源碼實現以下:
1.//ServiceEntryPointImpl::startSession中執行 啓動SSM狀態機 2.void ServiceStateMachine::start(Ownership ownershipModel) { 3. //直接調用_scheduleNextWithGuard接口 4. _scheduleNextWithGuard( 5. //listener線程暫時性的變爲conn線程名,在_scheduleNextWithGuard中任 6. //務入隊完成後,在下面的_scheduleNextWithGuard調用guard.release()恢復listener線程名 7. ThreadGuard(this), transport::ServiceExecutor::kEmptyFlags, ownershipModel); 8.} 9. 10.void ServiceStateMachine::_scheduleNextWithGuard(...) { 11. //該任務func實際上由worker線程運行,worker線程從asio庫的全局隊列獲取任務調度執行 12. auto func = [ ssm = shared_from_this(), ownershipModel ] { 13. //構造ThreadGuard 14. ThreadGuard guard(ssm.get()); 15. //說明是sync mode,即一個連接一個線程模式, 歸屬權明確,屬於指定線程 16. if (ownershipModel == Ownership::kStatic) 17. guard.markStaticOwnership(); 18. //對應:ServiceStateMachine::_runNextInGuard,在這裏面完成狀態調度轉換 19. ssm->_runNextInGuard(std::move(guard)); 20. }; 21. //恢復以前的線程名,若是該SSM進入Ended狀態,則開始資源回收處理 22. guard.release(); 23. //ServiceExecutorAdaptive::schedule(adaptive) ServiceExecutorSynchronous::schedule(synchronous) 24. //第一次進入該函數的時候在這裏面建立新線程,不是第一次則把task任務入隊調度 25. Status status = _serviceContext->getServiceExecutor()->schedule(std::move(func), flags); 26. if (status.isOK()) { 27. return; 28. } 29. //異常處理 30. ...... 31.}
ServiceStateMachine::start()接口調用ServiceStateMachine::_scheduleNextWithGuard()來啓動狀態機運行。_scheduleNextWithGuard()接口最核心的做用就是調用service_executor服務運行子模塊(線程模型子模塊)的schedule()接口來把狀態機調度任務入隊到ASIO網絡庫的一個全局隊列(adaptive動態線程模型),若是是一個連接一個線程模型,則任務入隊到線程級私有隊列。
adaptive線程模型,任務入隊以及工做線程調度任務執行的流程將在後續的線程模型子模塊中分析,也能夠參考:<<Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計>>
此外,_scheduleNextWithGuard()入隊到全局隊列的任務就是本模塊後面須要分析的SSM狀態機任務,這些task任務經過本函數接口的func (...)進行封裝,而後經過線程模型子模塊入隊到一個全局隊列。Func(...)這個task任務中會直接調用_runNextInGuard()接口來進行狀態轉換處理,該接口也就是入隊到ASIO全局隊列的任務,核心代碼功能以下:
1.void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) { 2. //獲取當前SSM狀態 3. auto curState = state(); 4. // If this is the first run of the SSM, then update its state to Source 5. //若是是第一次運行該SSM,則狀態爲Created,到這裏標記能夠準備接收數據了 6. if (curState == State::Created) { 7. //進入Source等待接收數據 8. curState = State::Source; 9. _state.store(curState); 10. } 11. //各狀態對應處理 12. try { 13. switch (curState) { 14. //接收數據 15. case State::Source: 16. _sourceMessage(std::move(guard)); 17. break; 18. //以及接收到完整的一個mongodb報文,能夠內部處理(解析+命令處理+應答客戶端) 19. case State::Process: 20. _processMessage(std::move(guard)); 21. break; 22. //連接異常或者已經關閉,則開始回收處理 23. case State::EndSession: 24. _cleanupSession(std::move(guard)); 25. break; 26. default: 27. MONGO_UNREACHABLE; 28. } 29. return; 30. } catch (...) { 31. //異常打印 32. } 33. //異常處理 34. ...... 35. //進入EndSession狀態 36. _state.store(State::EndSession); 37. _cleanupSession(std::move(guard)); 38.}
從上面的代碼實現能夠看出,真正入隊到全局隊列中的任務類型只有三個,分別是:
- 接收mongodb數據的task任務,簡稱爲readTask。
- 接收到一個完整mongodb數據後的後續處理(包括協議解析、命令處理、DB獲取數據、發送數據給客戶端等),簡稱爲dealTask。
- 接收或者發送數據異常、連接關閉等引發的後續資源釋放,簡稱爲cleanTask。
下面針對這三種task任務核心代碼實現進行分析:
- readTask任務核心代碼實現
readTask任務核心代碼實現由_sourceMessage()接口實現,具體代碼以下:
1.//接收客戶端數據 2.void ServiceStateMachine::_sourceMessage(ThreadGuard guard) { 3. ...... 4. //獲取本session接收數據的ticket,也就是ASIOSourceTicket 5. auto ticket = _session()->sourceMessage(&_inMessage); 6. //進入等等接收數據狀態 7. _state.store(State::SourceWait); 8. //release恢復worker線程原有的線程名,synchronous線程模型爲"conn-xx",adaptive對應worker線程名爲"conn-xx" 9. guard.release(); 10. //線程模型默認同步方式,也就是一個連接一個線程 11. if (_transportMode == transport::Mode::kSynchronous) { 12. //同步方式,讀取到一個完整mongodb報文後執行_sourceCallback回調 13. _sourceCallback([this](auto ticket) { 14. MONGO_IDLE_THREAD_BLOCK; 15. return _session()->getTransportLayer()->wait(std::move(ticket)); 16. }(std::move(ticket))); 17. } else if (_transportMode == transport::Mode::kAsynchronous) { 18. //adaptive線程模型,異步讀取一個mongodb報文後執行_sourceCallback回調 19. _session()->getTransportLayer()->asyncWait( 20. ////TransportLayerASIO::ASIOSourceTicket::_bodyCallback讀取到一個完整報文後執行該回調 21. std::move(ticket), [this](Status status) { _sourceCallback(status); }); 22. } 23.} 24. 25.//接收到一個完整mongodb報文後的回調處理 26.void ServiceStateMachine::_sourceCallback(Status status) { 27. //構造ThreadGuard,修改執行本SSM接口的線程名爲conn-xx 28. ThreadGuard guard(this); 29. 30. //狀態檢查 31. dassert(state() == State::SourceWait); 32. //獲取連接session遠端信息 33. auto remote = _session()->remote(); 34. if (status.isOK()) { 35. //等待調度,進入處理消息階段 _processMessage 36. _state.store(State::Process); 37. //注意kMayRecurse標識State::Process階段的處理仍是由本線程執行,這是一個遞歸標記 38. return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kMayRecurse); 39. } 40. ...... 41. //異常流程調用 42. _runNextInGuard(std::move(guard)); 43.}
SSM調度的第一個任務就是readTask任務,從上面的源碼分析能夠看出,該任務就是經過ticket數據分發模塊從ASIO網絡庫讀取一個完整長度的mongodb報文,而後執行_sourceCallback回調。進入該回調函數後,即刻設置SSM狀態爲State::Process狀態,而後調用_scheduleNextWithGuard(...)把dealTask任務入隊到ASIO的全局隊列(adaptive線程模型),或者入隊到線程級私有隊列(synchronous線程模型)等待worker線程調度執行。
這裏有個細節,在把dealTask入隊的時候,攜帶了kMayRecurse標記,該標記標識該任務能夠遞歸調用,也就是該任務能夠由當前線程繼續執行,這樣也就能夠保證同一個請求的taskRead任務和dealTask任務由同一個線程處理。任務遞歸調度,能夠參考後面的線程模型子模塊源碼實現。
2. dealTask任務核心代碼實現
當讀取到一個完整長度的mongodb報文後,就會把dealTask任務入隊到全局隊列,而後由worker線程調度執行該task任務。dealTask任務的核心代碼實現以下:
1.//dealTask處理 2.void ServiceStateMachine::_processMessage(ThreadGuard guard) { 3. ...... 4. //入口流量計數 5. networkCounter.hitLogicalIn(_inMessage.size()); 6. //獲取一個惟一的UniqueOperationContext,一個客戶端對應一個UniqueOperationContext 7. auto opCtx = Client::getCurrent()->makeOperationContext(); 8. //ServiceEntryPointMongod::handleRequest ServiceEntryPointMongos::handleRequest請求處理 9. //command處理、DB訪問後的數據經過dbresponse返回 10. DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage); 11. //釋放opCtx,這樣currentop就看不到了 12. opCtx.reset(); 13. //須要發送給客戶端的應答數據 14. Message& toSink = dbresponse.response; 15. //應答數據存在 16. if (!toSink.empty()) { 17. ...... 18. //發送數據 ServiceStateMachine::_sinkMessage() 19. _sinkMessage(std::move(guard), std::move(toSink)); 20. 21. } else { 22. //若是不須要應答客戶端的處理 23. ...... 24. } 25.} 26. 27.//調用Sinkticket發送數據 28.void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) { 29. //獲取發送數據的ASIOSinkTicket 30. auto ticket = _session()->sinkMessage(toSink); 31. //進入sink發送等待狀態 32. _state.store(State::SinkWait); 33. //恢復原有的worker線程名 34. guard.release(); 35. //synchronous線程模型,同步發送 36. if (_transportMode == transport::Mode::kSynchronous) { 37. //最終在ASIOSinkTicket同步發送數據成功後執行_sinkCallback 38. _sinkCallback(_session()->getTransportLayer()->wait(std::move(ticket))); 39. } else if (_transportMode == transport::Mode::kAsynchronous) { 40. //最終在ASIOSinkTicket異步發送數據成功後執行_sinkCallback 41. _session()->getTransportLayer()->asyncWait( 42. std::move(ticket), [this](Status status) { _sinkCallback(status); }); 43. } 44.} 45. 46.//sink數據發送 47.void ServiceStateMachine::_sinkCallback(Status status) { 48. //SSM歸屬於新的guard,同時修改當前線程名爲conn-xx 49. ThreadGuard guard(this); 50. //狀態檢查 51. dassert(state() == State::SinkWait); 52. if (!status.isOK()) { 53. //進入EndSession狀態 54. _state.store(State::EndSession); 55. //異常狀況調用 56. return _runNextInGuard(std::move(guard)); 57. } else if (_inExhaust) { //_inExhaust方式 58. //注意這裏的狀態是process _processMessage 還須要繼續進行Process處理 59. _state.store(State::Process); 60. } else { 61. //正常流程始終進入該分支 _sourceMessage 這裏繼續進行遞歸接收數據處理 62. //注意這裏的狀態是Source,繼續接收客戶端請求 63. _state.store(State::Source); 64. } 65. //本連接對應的一次mongo訪問已經應答完成,須要繼續要一次調度了 66. return _scheduleNextWithGuard(std::move(guard), 67. ServiceExecutor::kDeferredTask | 68. ServiceExecutor::kMayYieldBeforeSchedule); 69.}
readTask經過ticket數據分發子模塊讀取一個完整長度的mongodb報文後,開始dealTask任務邏輯,該任務也就是_processMessage(...)。該接口中核心實現就是調用mongod和mongos實例對應的服務入口類的handleRequest(...)接口來完成後續的command命令處理、DB層數據訪問等,訪問到的數據存儲到DbResponse中,最終在經過_sinkMessage(...)把數據發送出去。
真正的mongodb內部處理流程實際上就是經過該dealTask任務完成,該任務也是處理整個請求中資源耗費最重的一個環節。在該task任務中,當數據成功發送給客戶端後,該session連接對應的SSM狀態機進入State::Source狀態,繼續等待worker線程調度來完成後續該連接的新請求。
3. cleanTask任務
在數據讀寫過程、客戶端連接關閉、訪問DB數據層等任何一個環節異常,則會進入State::EndSession狀態。該狀態對應得任務實現相對比較簡單,具體代碼實現以下:
1.//session對應連接回收處理 2.void ServiceStateMachine::_cleanupSession(ThreadGuard guard) { 3. //進入這個狀態後在~ThreadGuard::release中進行ssm _cleanupHook處理,該hook在ServiceEntryPointImpl::startSession 4. _state.store(State::Ended); 5. //清空message buffer 6. _inMessage.reset(); 7. //釋放連接對應client資源 8. Client::releaseCurrent(); 9.}
進入該狀態後直接由本線程進行session資源回收和client資源釋放處理,而無需狀態機調度worker線程來回收。
2.2 關於worker線程名和guardthread線程守護類
前面得分析咱們知道,當線程模型爲adaptive動態線程模型的時候,mongod和mongos實例對應的子線程中有不少名爲「conn-xx」和」worker-xx」的線程,並且同一個線程可能一下子線程名爲「conn-xx」,下一次又變爲了」worker-xx」。這個線程名的初始命名和線程名更改與ServiceStateMachine狀態機調度類、guardthread線程守護類、worker線程模型等都有關係。
Worker線程由ServiceExecutor線程模型子模塊建立,請參考後續線程模型子模塊相關章節。默認初始化線程名爲」conn-x」,初始化代碼實現以下:
1.//ServiceStateMachine::create調用,ServiceStateMachine類初始化構造 2.ServiceStateMachine::ServiceStateMachine(...) 3. ...... 4. //線程名初始化:conn-xx,xx代碼session id 5. _threadName{str::stream() << "conn-" << _session()->id()} {} 6.} 7. 8.class Session { 9. ...... 10. //sessionID,自增生成 11. const Id _id; 12.} 13. 14.//全局sessionIdCounter計數器,初始化爲0 15.AtomicUInt64 sessionIdCounter(0); 16. 17.//session id自增 18.Session::Session() : _id(sessionIdCounter.addAndFetch(1)) {}
SSM狀態處理過程當中,會把一個完整的請求過程 = readTask任務 + dealTask任務,這兩個任務都是經過SSM狀態機和ServiceExecutor線程模型子模塊的worker線程配合調度完成,在任務處理過程當中處理同一個任務的線程可能會有屢次線程名更改,這個就是結合guardthread線程守護類來完成,以一個線程名切換更改僞代碼實現爲例:
1.worker_thread_run_task(...) 2.{ 3. //若是是adaptive線程模型,當前worker線程名爲"worker-xx" 4. print(threadName) 5. //業務邏輯處理1 6. ...... 7. 8. //初始化構造ThreadGuard,這裏面修改線程名爲_ssm->_threadName,也就是"conn-xx", 9. //同時保存原來的線程名"worker-xx"到_ssm->_oldThreadName中 10. ThreadGuard guard(this); 11. //若是是adaptive線程模型,線程名打印內容爲"conn-xx" 12. print(threadName) 13. //業務邏輯處理2 14. ...... 15. //恢復_ssm->_oldThreadName保存的線程名"worker-xx" 16. guard.release(); 17. 18. //若是是adaptive線程模型,線程名恢復爲"worker-xx" 19. print(threadName) 20.}
從上面的僞代碼能夠看出,adaptive線程模型對應worker線程名爲」worker」,在進入ThreadGuard guard(this)流程後,線程名更改成」conn-xx」線程,當guard.release()釋放後恢復原有」worker-xx」線程名。
結合前面的SSM狀態處理流程,adaptive線程模型能夠獲得以下總結:底層網絡IO數據讀寫過程,worker線程名會改成」worker-xx」,其餘非網絡IO的mongodb內部邏輯處理線程名爲」conn-xx」。因此,若是查看mongod或者mongos進程全部線程名的時候,若是發現線程名爲」worker-xx」,說明當前線程在處理網絡IO;若是發現線程名爲」conn-xx」,則說明當前線程在處理內部邏輯處理,對於mongod實例能夠理解爲主要處理磁盤IO。
因爲synchronous同步線程模型,同一連接對應的全部客戶端請求至始至終都有同一線程處理,因此整個處理線程名不會改變,也不必修改線程名,整個過程都是」conn-xx」線程名。
2.3 該模塊函數接口總結大全
前面分析了主要核心接口源碼實現,不少其餘接口沒有一一列舉詳細分析,該模塊u全部接口功能總結以下,更多接口代碼實現詳見Mongodb內核源碼詳細註釋分析:
類名 |
函數接口 |
功能說明 |
ThreadGuard |
ThreadGuard(...) |
線程守護初始化,把worker線程名改成」conn-xx」 |
ThreadGuard |
operator=(...) |
類賦值 |
ThreadGuard |
~ThreadGuard() |
類析構,析構函數中調用release()接口 |
ThreadGuard |
operator bool() |
獲取全部權標識 |
ThreadGuard |
markStaticOwnership() |
synchronous線程模型默認歸屬權爲kStatic |
ThreadGuard |
release() |
|
ServiceStateMachine |
ServiceStateMachine(...) |
ServiceStateMachine類初始化構造賦值 |
ServiceStateMachine |
_session() |
獲取當前ssm對應的session信息 |
ServiceStateMachine |
_sourceMessage(...) |
等待經過ASIO庫接收網絡IO數據 |
ServiceStateMachine |
_sinkMessage(...) |
等待經過ASIO庫發送網絡IO數據 |
ServiceStateMachine |
_sourceCallback(...) |
接收到一個完整長度mongodb報文的回調處理 |
ServiceStateMachine |
_sinkCallback(...) |
發送一個完整應答報文後的回調處理 |
ServiceStateMachine |
_processMessage(...) |
開始內部處理,如協議解析、命令處理、DB層數據訪問等 |
ServiceStateMachine |
_runNextInGuard(...) |
SSM狀態機調度運行 |
ServiceStateMachine |
start(...) |
接收到一個新連接經過start()接口啓用SSM狀態機 |
ServiceStateMachine |
_scheduleNextWithGuard(...) |
readTask和dealTask交由worker線程處理 |
ServiceStateMachine |
terminate(...) |
調用TransportLayerASIO::end()進行套接字回收處理 |
ServiceStateMachine |
setCleanupHook(...) |
設置clean hook,也就是回收處理 |
ServiceStateMachine |
state() |
獲取SSM當前狀態 |
ServiceStateMachine |
_terminateAndLogIfError(...) |
打印回收日誌並進行terminate回收處理 |
ServiceStateMachine |
_cleanupSession(...) |
session會話回收處理 |
3. 總結
本文主要分析了service_state_machine狀態機子模塊,該模塊把session對應的客戶端請求轉換爲readTask任務、dealTask任務和cleanTask任務,前兩個任務經過worker線程完成調度處理,cleanTask任務在內部處理異常或者連接關閉的時候由本線程直接執行,而不是經過worker線程調度執行。
這三個任務處理過程會分別對應到Created、Source、SourceWait、Process、SinkWait、EndSession、Ended七種狀態的一種或者多種,具體詳見前面的狀態碼分析。一個正常的客戶端請求狀態轉換過程以下:
- 連接剛創建的第一次請求狀態轉換過程:
Created->Source -> SourceWait -> Process -> SinkWait -> Source
2. 該連接後續的請求狀態轉換過程:
Source -> SourceWait -> Process -> SinkWait -> Source
此外,SSM狀態機調度模塊經過ServiceStateMachine::_scheduleNextWithGuard(...)接口和線程模型子模塊關聯起來。SSM經過該接口完成worker線程初始建立、task任務入隊處理,下期將分析<<網絡線程模型子模塊>>詳細源碼實現。
說明:
該模塊更多接口實現細節詳見Mongodb內核源碼註釋:Mongodb內核源碼詳細註釋分析