關於做者
前滴滴出行技術專家,現任OPPO文檔數據庫mongodb負責人,負責oppo千萬級峯值TPS/十萬億級數據量文檔數據庫mongodb內核研發及運維工做,一直專一於分佈式緩存、高性能服務端、數據庫、中間件等相關研發。後續持續分享《MongoDB內核源碼設計、性能優化、最佳運維實踐》,Github帳號地址:https://github.com/y123456yzgit
- 背景
<<transport_layer網絡傳輸層模塊源碼實現>>中分享了mongodb內核底層網絡IO處理相關實現,包括套接字初始化、一個完整mongodb報文的讀取、獲取到DB數據發送給客戶端等。Mongodb支持多種增、刪、改、查、聚合處理、cluster處理等操做,每一個操做在內核實現中對應一個command,每一個command有不一樣的功能,mongodb內核如何進行command源碼處理將是本文分析的重點github
此外,mongodb提供了mongostat工具來監控當前集羣的各類操做統計。Mongostat監控統計以下圖所示:mongodb
其中,insert、delete、update、query這四項統計比較好理解,分別對應增、刪、改、查。可是,comand、getmore不是很好理解,command表明什麼統計?getMore表明什麼統計?,這兩項相對比較難理解。數據庫
此外,經過本文字分析,咱們將搞明白這六項統計的具體含義,同時弄清這六項統計由那些操做進行計數。緩存
Command命令處理模塊分爲:mongos操做命令、mongod操做命令、mongodb集羣內部命令,具體定義以下:性能優化
- mongos操做命令,客戶端能夠經過mongos訪問集羣相關的命令。
- mongod操做命令:客戶端能夠經過mongod複製集和cfg server訪問集羣的相關命令。
- mongodb集羣內部命令:mongos、mongod、mongo-cfg集羣實例之間交互的命令。
Command命令處理模塊核心代碼實現以下:網絡
《command命令處理模塊源碼實現》相關文章重點分析命令處理模塊核心代碼實現,也就是上面截圖中的命令處理源碼文件實現。session
2. <<transport_layer網絡傳輸層模塊源碼實現>>銜接回顧app
<<transport_layer網絡傳輸層模塊源碼實現三>>一文中,咱們對service_state_machine狀態機調度子模塊進行了分析,該模塊中的dealTask任務進行mongodb內部業務邏輯處理,其核心實現以下:less
1.//dealTask處理 2.void ServiceStateMachine::_processMessage(ThreadGuard guard) { 3. ...... 4. //command處理、DB訪問後的數據經過dbresponse返回 5. DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage); 6. ...... 7.}
上面的_sep對應mongod或者mongos實例的服務入口實現,該_seq成員分別在以下代碼中初始化爲ServiceEntryPointMongod和ServiceEntryPointMongod類實現。SSM狀態機的_seq成員初始化賦值核心代碼實現以下:
1.//mongos實例啓動初始化 2.static ExitCode runMongosServer() { 3. ...... 4. //mongos實例對應sep爲ServiceEntryPointMongos 5. auto sep = stdx::make_unique<ServiceEntryPointMongos>(getGlobalServiceContext()); 6. getGlobalServiceContext()->setServiceEntryPoint(std::move(sep)); 7. ...... 8.} 9. 10.//mongod實例啓動初始化 11.ExitCode _initAndListen(int listenPort) { 12. ...... 13. //mongod實例對應sep爲ServiceEntryPointMongod 14. serviceContext->setServiceEntryPoint( 15. stdx::make_unique<ServiceEntryPointMongod>(serviceContext)); 16. ...... 17.} 18. 19.//SSM狀態機初始化 20.ServiceStateMachine::ServiceStateMachine(...) 21. : _state{State::Created}, 22. //mongod和mongos實例的服務入口經過這裏賦值給_seq成員變量 23. _sep{svcContext->getServiceEntryPoint()}, 24. ...... }
經過上面的幾個核心接口實現,把mongos和mongod兩個實例的服務入口與狀態機SSM(ServiceStateMachine)聯繫起來,最終和下面的command命令處理模塊關聯。
dealTask進行一次mongodb請求的內部邏輯處理,該處理由_sep->handleRequest()接口實現。因爲mongos和mongod服務入口分別由ServiceEntryPointMongos和ServiceEntryPointMongod兩個類實現,所以dealTask也就演變爲以下接口處理:
- mongos實例:ServiceEntryPointMongos::handleRequest(...)
- Mongod實例::ServiceEntryPointMongod::handleRequest(...)
這兩個接口入參都是OperationContext和Message,分別對應操做上下文、請求原始數據內容。下文會分析Message解析實現、OperationContext服務上下文實現將在後續章節分析。
Mongod和mongos實例服務入口類都繼承自網絡傳輸模塊中的ServiceEntryPointImpl類,以下圖所示:
Tips: mongos和mongod服務入口類爲什麼要繼承網絡傳輸模塊服務入口類?
緣由是一個請求對應一個連接session,該session對應的請求又和SSM狀態機惟一對應。全部客戶端請求對應的SSM狀態機信息所有保存再ServiceEntryPointImpl._sessions成員中,而command命令處理模塊爲SSM狀態機任務中的dealTask任務,經過該繼承關係,ServiceEntryPointMongod和ServiceEntryPointMongos子類也就能夠和狀態機及任務處理關聯起來,同時也能夠獲取當前請求對應的session連接信息。
3. Mongodb協議解析
在《transport_layer網絡傳輸層模塊源碼實現二》中的數據收發子模塊完成了一個完整mongodb報文的接收,一個mongodb報文由Header頭部+opCode包體組成,以下圖所示:
上圖中各個字段說明以下表:
opCode取值比較多,早期版本中OP_INSERT、OP_DELETE、OP_UPDATE、OP_QUERY分別針對增刪改查請求,Mongodb從3.6版本開始默認使用OP_MSG操做做爲默認opCode,是一種可擴展的消息格式,旨在包含其餘操做碼的功能,新版本讀寫請求協議都對應該操做碼。本文以OP_MSG操做碼對應協議爲例進行分析,其餘操做碼協議分析過程相似,OP_MSG請求協議格式以下:
1.OP_MSG { 2. //mongodb報文頭部 3. MsgHeader header; 4. //位圖,用於標識報文是否須要校驗 是否須要應答等 5. uint32 flagBits; // message flags 6. //報文內容,例如find write等命令內容經過bson格式存在於該結構中 7. Sections[] sections; // data sections 8. //報文CRC校驗 9. optional<uint32> checksum; // optional CRC-32C checksum }
OP_MSG各個字段說明以下表:
一個完整OP_MSG請求格式以下:
除了通用頭部header外,客戶端命令請求實際上都保存於sections字段中,該字段存放的是請求的原始bson格式數據。BSON是由10gen開發的一個數據格式,目前主要用於MongoDB中,是MongoDB的數據存儲格式。BSON基於JSON格式,選擇JSON進行改造的緣由主要是JSON的通用性及JSON的schemaless的特性。BSON相比JSON具備如下特性:
- Lightweight(更輕量級)
- Traversable(易操做)
- Efficient(高效性能)
本文重點不是分析bson協議格式,bson協議實現細節將在後續章節分享。bson協議更多設計細節詳見:http://bsonspec.org/
總結:一個完整mongodb報文由header+body組成,其中header長度固定爲16字節,body長度等於messageLength-16。Header部分協議解析由message.cpp和message.h兩源碼文件實現,body部分對應的OP_MSG類請求解析由op_msg.cpp和op_msg.h兩源碼文件實現。
3. mongodb報文通用頭部解析及封裝源碼實現
Header頭部解析由src/mongo/util/net目錄下message.cpp和message.h兩文件完成,該類主要完成通用header頭部和body部分的解析、封裝。所以報文頭部核心代碼分爲如下兩類:
- 報文頭部內容解析及封裝(MSGHEADER命名空間實現)
- 頭部和body內容解析及封裝(MsgData命名空間實現)
3.1 mongodb報文頭部解析及封裝核心代碼實現
mongodb報文頭部解析由namespace MSGHEADER {...}實現,該類主要成員及接口實現以下:
1.namespace MSGHEADER { 2.//header頭部各個字段信息 3.struct Layout { 4. //整個message長度,包括header長度和body長度 5. int32_t messageLength; 6. //requestID 該請求id信息 7. int32_t requestID; 8. //getResponseToMsgId解析 9. int32_t responseTo; 10. //操做類型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等 11. int32_t opCode; 12.}; 13. 14.//ConstView實現header頭部數據解析 15.class ConstView { 16.public: 17. ...... 18. //初始化構造 19. ConstView(const char* data) : _data(data) {} 20. //獲取_data地址 21. const char* view2ptr() const { 22. return data().view(); 23. } 24. //TransportLayerASIO::ASIOSourceTicket::_headerCallback調用 25. //解析header頭部的messageLength字段 26. int32_t getMessageLength() const { 27. return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength)); 28. } 29. //解析header頭部的requestID字段 30. int32_t getRequestMsgId() const { 31. return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID)); 32. } 33. //解析header頭部的getResponseToMsgId字段 34. int32_t getResponseToMsgId() const { 35. return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo)); 36. } 37. //解析header頭部的opCode字段 38. int32_t getOpCode() const { 39. return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode)); 40. } 41. 42.protected: 43. //mongodb報文數據起始地址 44. const view_type& data() const { 45. return _data; 46. } 47.private: 48. //數據部分 49. view_type _data; 50.}; 51. 52.//View填充header頭部數據 53.class View : public ConstView { 54.public: 55. ...... 56. //構造初始化 57. View(char* data) : ConstView(data) {} 58. //header起始地址 59. char* view2ptr() { 60. return data().view(); 61. } 62. //如下四個接口進行header填充 63. //填充header頭部messageLength字段 64. void setMessageLength(int32_t value) { 65. data().write(tagLittleEndian(value), offsetof(Layout, messageLength)); 66. } 67. //填充header頭部requestID字段 68. void setRequestMsgId(int32_t value) { 69. data().write(tagLittleEndian(value), offsetof(Layout, requestID)); 70. } 71. //填充header頭部responseTo字段 72. void setResponseToMsgId(int32_t value) { 73. data().write(tagLittleEndian(value), offsetof(Layout, responseTo)); 74. } 75. //填充header頭部opCode字段 76. void setOpCode(int32_t value) { 77. data().write(tagLittleEndian(value), offsetof(Layout, opCode)); 78. } 79.private: 80. //指向header起始地址 81. view_type data() const { 82. return const_cast<char*>(ConstView::view2ptr()); 83. } 84.}; 85.}
從上面的header頭部解析、填充的實現類能夠看出,header頭部解析由MSGHEADER::ConstView實現;header頭部填充由MSGHEADER::View完成。實際上代碼實現上,經過offsetof來進行移位,從而快速定位到頭部對應字段。
3.2 mongodb報文頭部+body解析封裝核心代碼實現
Namespace MSGHEADER{...}命名空間只負責header頭部的處理,namespace MsgData{...}命名空間相對MSGHEADER命名空間更加完善,除了處理頭部解析封裝外,還負責body數據起始地址維護、body數據封裝、數據長度檢查等。MsgData命名空間核心代碼實現以下:
1.namespace MsgData { 2.struct Layout { 3. //數據填充組成:header部分 4. MSGHEADER::Layout header; 5. //數據填充組成: body部分,body先用data佔位置 6. char data[4]; 7.}; 8. 9.//解析header字段信息及body其實地址信息 10.class ConstView { 11.public: 12. //初始化構造 13. ConstView(const char* storage) : _storage(storage) {} 14. //獲取數據起始地址 15. const char* view2ptr() const { 16. return storage().view(); 17. } 18. 19. //如下四個接口間接執行前面的MSGHEADER中的頭部字段解析 20. //填充header頭部messageLength字段 21. int32_t getLen() const { 22. return header().getMessageLength(); 23. } 24. //填充header頭部requestID字段 25. int32_t getId() const { 26. return header().getRequestMsgId(); 27. } 28. //填充header頭部responseTo字段 29. int32_t getResponseToMsgId() const { 30. return header().getResponseToMsgId(); 31. } 32. //獲取網絡數據報文中的opCode字段 33. NetworkOp getNetworkOp() const { 34. return NetworkOp(header().getOpCode()); 35. } 36. //指向body起始地址 37. const char* data() const { 38. return storage().view(offsetof(Layout, data)); 39. } 40. //messageLength長度檢查,opcode檢查 41. bool valid() const { 42. if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize)) 43. return false; 44. if (getNetworkOp() < 0 || getNetworkOp() > 30000) 45. return false; 46. return true; 47. } 48. ...... 49.protected: 50. //獲取_storage 51. const ConstDataView& storage() const { 52. return _storage; 53. } 54. //指向header起始地址 55. MSGHEADER::ConstView header() const { 56. return storage().view(offsetof(Layout, header)); 57. } 58.private: 59. //mongodb報文存儲在這裏 60. ConstDataView _storage; 61.}; 62. 63.//填充數據,包括Header和body 64.class View : public ConstView { 65.public: 66. //構造初始化 67. View(char* storage) : ConstView(storage) {} 68. ...... 69. //獲取報文起始地址 70. char* view2ptr() { 71. return storage().view(); 72. } 73. 74. //如下四個接口間接執行前面的MSGHEADER中的頭部字段構造 75. //如下四個接口完成msg header賦值 76. //填充header頭部messageLength字段 77. void setLen(int value) { 78. return header().setMessageLength(value); 79. } 80. //填充header頭部messageLength字段 81. void setId(int32_t value) { 82. return header().setRequestMsgId(value); 83. } 84. //填充header頭部messageLength字段 85. void setResponseToMsgId(int32_t value) { 86. return header().setResponseToMsgId(value); 87. } 88. //填充header頭部messageLength字段 89. void setOperation(int value) { 90. return header().setOpCode(value); 91. } 92. 93. using ConstView::data; 94. //指向data 95. char* data() { 96. return storage().view(offsetof(Layout, data)); 97. } 98.private: 99. //也就是報文起始地址 100. DataView storage() const { 101. return const_cast<char*>(ConstView::view2ptr()); 102. } 103. //指向header頭部 104. MSGHEADER::View header() const { 105. return storage().view(offsetof(Layout, header)); 106. } 107.}; 108. 109....... 110.//Value爲前面的Layout,減4是由於有4字節填充data,因此這個就是header長度 111.const int MsgDataHeaderSize = sizeof(Value) - 4; 112. 113.//除去頭部後的數據部分長度 114.inline int ConstView::dataLen() const { 115. return getLen() - MsgDataHeaderSize; 116.} 117.} // namespace MsgData
和MSGHEADER命名空間相比,MsgData這個namespace命名空間接口實現和前面的MSGHEADER命名空間實現大同小異。MsgData不只僅處理header頭部的解析組裝,還負責body部分數據頭部指針指向、頭部長度檢查、opCode檢查、數據填充等。其中,MsgData命名空間中header頭部的解析構造底層依賴MSGHEADER實現。
3.3 Message/DbMessage核心代碼實現
在《transport_layer網絡傳輸層模塊源碼實現二》中,從底層ASIO庫接收到的mongodb報文是存放在Message結構中存儲,最終存放在ServiceStateMachine._inMessage成員中。
在前面第2章咱們知道mongod和mongso實例的服務入口接口handleRequest(...)中都帶有Message入參,也就是接收到的Message數據經過該接口處理。Message類主要接口實現以下:
1.//DbMessage._msg成員爲該類型 2.class Message { 3.public: 4. //message初始化 5. explicit Message(SharedBuffer data) : _buf(std::move(data)) {} 6. //頭部header數據 7. MsgData::View header() const { 8. verify(!empty()); 9. return _buf.get(); 10. } 11. //獲取網絡數據報文中的op字段 12. NetworkOp operation() const { 13. return header().getNetworkOp(); 14. } 15. //_buf釋放爲空 16. bool empty() const { 17. return !_buf; 18. } 19. //獲取報文總長度messageLength 20. int size() const { 21. if (_buf) { 22. return MsgData::ConstView(_buf.get()).getLen(); 23. } 24. return 0; 25. } 26. //body長度 27. int dataSize() const { 28. return size() - sizeof(MSGHEADER::Value); 29. } 30. //buf重置 31. void reset() { 32. _buf = {}; 33. } 34. // use to set first buffer if empty 35. //_buf直接使用buf空間 36. void setData(SharedBuffer buf) { 37. verify(empty()); 38. _buf = std::move(buf); 39. } 40. //把msgtxt拷貝到_buf中 41. void setData(int operation, const char* msgtxt) { 42. setData(operation, msgtxt, strlen(msgtxt) + 1); 43. } 44. //根據operation和msgdata構造一個完整mongodb報文 45. void setData(int operation, const char* msgdata, size_t len) { 46. verify(empty()); 47. size_t dataLen = len + sizeof(MsgData::Value) - 4; 48. _buf = SharedBuffer::allocate(dataLen); 49. MsgData::View d = _buf.get(); 50. if (len) 51. memcpy(d.data(), msgdata, len); 52. d.setLen(dataLen); 53. d.setOperation(operation); 54. } 55. ...... 56. //獲取_buf對應指針 57. const char* buf() const { 58. return _buf.get(); 59. } 60. 61.private: 62. //存放接收數據的buf 63. SharedBuffer _buf; 64.};
Message是操做mongodb收發報文最直接的實現類,該類主要完成一個完整mongodb報文封裝。有關mongodb報文頭後面的body更多的解析實如今DbMessage類中完成,DbMessage類包含Message類成員_msg。實際上,Message報文信息在handleRequest(...)實例服務入口中賦值給DbMessage._msg,報文後續的body處理繼續由DbMessage類相關接口完成處理。DbMessage和Message類關係以下:
1.class DbMessage { 2. ...... 3. //包含Message成員變量 4. const Message& _msg; 5. //mongodb報文起始地址 6. const char* _nsStart; 7. //報文結束地址 8. const char* _theEnd; 9.} 10. 11.DbMessage::DbMessage(const Message& msg) : _msg(msg), 12. _nsStart(NULL), _mark(NULL), _nsLen(0) { 13. //一個mongodb報文(header+body)數據的結束地址 14. _theEnd = _msg.singleData().data() + _msg.singleData().dataLen(); 15. //報文起始地址 [_nextjsobj, _theEnd ]之間的數據就是一個完整mongodb報文 16. _nextjsobj = _msg.singleData().data(); 17. ...... 18.}
DbMessage._msg成員爲DbMessage 類型,DbMessage的_nsStart和_theEnd成員分別記錄完整mongodb報文的起始地址和結束地址,經過這兩個指針就能夠獲取一個完整mongodb報文的所有內容,包括header和body。
注意:DbMessage是早期mongodb版本(version<3.6)中用於報文body解析封裝的類,這些類針對opCode=[dbUpdate, dbDelete]這個區間的操做。在mongodb新版本(version>=3.6)中,body解析及封裝由op_msg.h和op_msg.cpp代碼文件中的clase OpMsgRequest{}完成處理。
3.4 OpMsg報文解析封裝核心代碼實現
Mongodb從3.6版本開始默認使用OP_MSG操做做爲默認opCode,是一種可擴展的消息格式,旨在包含其餘操做碼的功能,新版本讀寫請求協議都對應該操做碼。OP_MSG對應mongodb報文body解析封裝處理由OpMsg類相關接口完成,OpMsg::parse(Message)從Message中解析出報文body內容,其核心代碼實現以下:
1.struct OpMsg { 2. ...... 3. //msg解析賦值見OpMsg::parse 4. //各類命令(insert update find等)都存放在該body中 5. BSONObj body; 6. //sequences用法暫時沒看懂,感受沒什麼用?先跳過 7. std::vector<DocumentSequence> sequences; //賦值見OpMsg::parse 8.} 1.//從message中解析出OpMsg信息 2.OpMsg OpMsg::parse(const Message& message) try { 3. //message不能爲空,而且opCode必須爲dbMsg 4. invariant(!message.empty()); 5. invariant(message.operation() == dbMsg); 6. //獲取flagBits 7. const uint32_t flags = OpMsg::flags(message); 8. //flagBits有效性檢查,bit 0-15中只能對第0和第1位操做 9. uassert(ErrorCodes::IllegalOpMsgFlag, 10. str::stream() << "Message contains illegal flags value: Ob" 11. << std::bitset<32>(flags).to_string(), 12. !containsUnknownRequiredFlags(flags)); 13. 14. //校驗碼默認4字節 15. constexpr int kCrc32Size = 4; 16. //判斷該mongo報文body內容是否啓用了校驗功能 17. const bool haveChecksum = flags & kChecksumPresent; 18. //若是有啓用校驗功能,則報文末尾4字節爲校驗碼 19. const int checksumSize = haveChecksum ? kCrc32Size : 0; 20. //sections字段內容 21. BufReader sectionsBuf(message.singleData().data() + sizeof(flags), 22. message.dataSize() - sizeof(flags) - checksumSize); 23. 24. //默認先設置位false 25. bool haveBody = false; 26. OpMsg msg; 27. //解析sections對應命令請求數據 28. while (!sectionsBuf.atEof()) { 29. //BufReader::read讀取kind內容,一個字節 30. const auto sectionKind = sectionsBuf.read<Section>(); 31. //kind爲0對應命令請求body內容,內容經過bson報錯 32. switch (sectionKind) { 33. //sections第一個字節是0說明是body 34. case Section::kBody: { 35. //默認只能有一個body 36. uassert(40430, "Multiple body sections in message", !haveBody); 37. haveBody = true; 38. //命令請求的bson信息保存在這裏 39. msg.body = sectionsBuf.read<Validated<BSONObj>>(); 40. break; 41. } 42. 43. //DocSequence暫時沒看明白,用到的地方不多,跳過,後續等 44. //該系列文章主流功能分析完成後,從頭再回首分析 45. case Section::kDocSequence: { 46. ...... 47. } 48. } 49. } 50. //OP_MSG必須有body內容 51. uassert(40587, "OP_MSG messages must have a body", haveBody); 52. //body和sequence去重判斷 53. for (const auto& docSeq : msg.sequences) { 54. ...... 55. } 56. return msg; 57.}
OpMsg類被OpMsgRequest類繼承,OpMsgRequest類中核心接口就是解析出OpMsg.body中的庫信息和表信息,OpMsgRequest類代碼實現以下:
1.//協議解析得時候會用到,見runCommands 2.struct OpMsgRequest : public OpMsg { 3. ...... 4. //構造初始化 5. explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {} 6. //opMsgRequestFromAnyProtocol->OpMsgRequest::parse 7. //從message中解析出OpMsg所需成員信息 8. static OpMsgRequest parse(const Message& message) { 9. //OpMsg::parse 10. return OpMsgRequest(OpMsg::parse(message)); 11. } 12. //根據db body extraFields填充OpMsgRequest 13. static OpMsgRequest fromDBAndBody(... { 14. OpMsgRequest request; 15. request.body = ([&] { 16. //填充request.body 17. ...... 18. }()); 19. return request; 20. } 21. //從body中獲取db name 22. StringData getDatabase() const { 23. if (auto elem = body["$db"]) 24. return elem.checkAndGetStringData(); 25. uasserted(40571, "OP_MSG requests require a $db argument"); 26. } 27. //find insert 等命令信息 body中的第一個elem就是command 名 28. StringData getCommandName() const { 29. return body.firstElementFieldName(); 30. } 31.};
OpMsgRequest經過OpMsg::parse(message)解析出OpMsg信息,從而獲取到body內容,GetCommandName()接口和getDatabase()則分別從body中獲取庫DB信息、命令名信息。經過該類相關接口,命令名(find、write、update等)和DB庫都獲取到了。
OpMsg模塊除了OP_MSG相關報文解析外,還負責OP_MSG報文組裝填充,該模塊接口功能大全以下表:
4. Mongod實例服務入口核心代碼實現
Mongod實例服務入口類ServiceEntryPointMongod繼承ServiceEntryPointImpl類,mongod實例的報文解析處理、命令解析、命令執行都由該類負責處理。ServiceEntryPointMongod核心接口能夠細分爲:opCode解析及回調處理、命令解析及查找、命令執行三個子模塊。
4.1 opCode解析及回調處理
OpCode操做碼解析及其回調處理由ServiceEntryPointMongod::handleRequest(...)接口實現,核心代碼實現以下:
1.//mongod服務對於客戶端請求的處理 2.//經過狀態機SSM模塊的以下接口調用:ServiceStateMachine::_processMessage 3.DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) { 4. //獲取opCode,3.6版本對應客戶端默認使用OP_MSG 5. NetworkOp op = m.operation(); 6. ...... 7. //根據message構造DbMessage 8. DbMessage dbmsg(m); 9. //根據操做上下文獲取對應的client 10. Client& c = *opCtx->getClient(); 11. ...... 12. //獲取庫.表信息,注意只有dbUpdate<opCode<dbDelete的opCode請求才經過dbmsg直接獲取庫和表信息 13. const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL; 14. const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString(); 15. .... 16. //CurOp::debug 初始化opDebug,慢日誌相關記錄 17. OpDebug& debug = currentOp.debug(); 18. //慢日誌閥值 19. long long logThresholdMs = serverGlobalParams.slowMS; 20. //時mongodb將記錄此次慢操做,1爲只記錄慢操做,即操做時間大於了設置的配置,2表示記錄全部操做 21. bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1)); 22. DbResponse dbresponse; 23. if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) { 24. //新版本op=dbMsg,所以走這裏 25. //從DB獲取數據,獲取到的數據經過dbresponse返回 26. dbresponse = runCommands(opCtx, m); 27. } else if (op == dbQuery) { 28. ...... 29. //早期mongodb版本查詢走這裏 30. dbresponse = receivedQuery(opCtx, nsString, c, m); 31. } else if (op == dbGetMore) { 32. //早期mongodb版本查詢走這裏 33. dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug); 34. } else { 35. ...... 36. //早期版本增 刪 改走這裏處理 37. if (op == dbInsert) { 38. receivedInsert(opCtx, nsString, m); //插入操做入口 新版本CmdInsert::runImpl 39. } else if (op == dbUpdate) { 40. receivedUpdate(opCtx, nsString, m); //更新操做入口 41. } else if (op == dbDelete) { 42. receivedDelete(opCtx, nsString, m); //刪除操做入口 43. } 44. } 45. //獲取runCommands執行時間,也就是內部處理時間 46. debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()); 47. ...... 48. //慢日誌記錄 49. if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) { 50. Locker::LockerInfo lockerInfo; 51. //OperationContext::lockState LockerImpl<>::getLockerInfo 52. opCtx->lockState()->getLockerInfo(&lockerInfo); 53. 54. //OpDebug::report 記錄慢日誌到日誌文件 55. log() << debug.report(&c, currentOp, lockerInfo.stats); 56. } 57. //各類統計信息 58. recordCurOpMetrics(opCtx); 59.}
Mongod的handleRequest()接口主要完成如下工做:
- 從Message中獲取OpCode,早期版本每一個命令又對應取值,例如增刪改查早期版本分別對應:dbInsert、dbDelete、dbUpdate、dbQuery;Mongodb 3.6開始,默認請求對應OpCode都是OP_MSG,本文默認只分析OpCode=OP_MSG相關的處理。
- 獲取本操做對應的Client客戶端信息。
- 若是是早期版本,經過Message構造DbMessage,同時解析出庫.表信息。
- 根據不一樣OpCode執行對應回調操做,OP_MSG對應操做爲runCommands(...),獲取的數據經過dbresponse返回。
- 獲取到db層返回的數據後,進行慢日誌判斷,若是db層數據訪問超過閥值,記錄慢日誌。
- 設置debug的各類統計信息。
4.2 命令解析及查找
從上面的分析能夠看出,接口最後調用runCommands(...),該接口核心代碼實現以下所示:
1.//message解析出對應command執行 2.DbResponse runCommands(OperationContext* opCtx, const Message& message) { 3. //獲取message對應的ReplyBuilder,3.6默認對應OpMsgReplyBuilder 4. //應答數據經過該類構造 5. auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message)); 6. [&] { 7. OpMsgRequest request; 8. try { // Parse. 9. //協議解析 根據message獲取對應OpMsgRequest 10. request = rpc::opMsgRequestFromAnyProtocol(message); 11. } 12. } 13. try { // Execute. 14. //opCtx初始化 15. curOpCommandSetup(opCtx, request); 16. //command初始化爲Null 17. Command* c = nullptr; 18. //OpMsgRequest::getCommandName查找 19. if (!(c = Command::findCommand(request.getCommandName()))) { 20. //沒有找到相應的command的後續異常處理 21. ...... 22. } 23. //執行command命令,獲取到的數據經過replyBuilder.get()返回 24. execCommandDatabase(opCtx, c, request, replyBuilder.get()); 25. } 26. //OpMsgReplyBuilder::done對數據進行序列化操做 27. auto response = replyBuilder->done(); 28. //responseLength賦值 29. CurOp::get(opCtx)->debug().responseLength = response.header().dataLen(); 30. // 返回 31. return DbResponse{std::move(response)}; 32.}
RunCommands(...)接口從message中解析出OpMsg信息,而後獲取該OpMsg對應的command命令信息,最後執行該命令對應的後續處理操做。主要功能說明以下:
- 獲取該OpCode對應replyBuilder,OP_MSG操做對應builder爲OpMsgReplyBuilder。
- 根據message解析出OpMsgRequest數據,OpMsgRequest來中包含了真正的命令請求bson信息。
- opCtx初始化操做。
- 經過request.getCommandName()返回命令信息(如「find」、「update」等字符串)。
- 經過Command::findCommand(command name)從CommandMap這個map表中查找是否支持該command命令。若是沒找到說明不支持,若是找到說明支持。
- 調用execCommandDatabase(...)執行該命令,並獲取命令的執行結果。
- 根據command執行結果構造response並返回
4.3 命令執行
1.void execCommandDatabase(...) { 2. ...... 3. //獲取dbname 4. const auto dbname = request.getDatabase().toString(); 5. ...... 6. //mab表存放從bson中解析出的elem信息 7. StringMap<int> topLevelFields; 8. //body elem解析 9. for (auto&& element : request.body) { 10. //獲取bson中的elem信息 11. StringData fieldName = element.fieldNameStringData(); 12. //若是elem信息重複,則異常處理 13. ...... 14. } 15. //若是是help命令,則給出help提示 16. if (Command::isHelpRequest(helpField)) { 17. //給出help提示 18. Command::generateHelpResponse(opCtx, replyBuilder, *command); 19. return; 20. } 21. //權限認證檢查,檢查該命令執行權限 22. uassertStatusOK(Command::checkAuthorization(command, opCtx, request)); 23. ...... 24. 25. //該命令執行次數統計 db.serverStatus().metrics.commands能夠獲取統計信息 26. command->incrementCommandsExecuted(); 27. //真正的命令執行在這裏面 28. retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime); 29. //該命令執行失敗次數統計 30. if (!retval) { 31. command->incrementCommandsFailed(); 32. } 33. ...... 34.}
execCommandDatabase(...)最終調用RunCommandImpl(...)進行對應命令的真正處理,該接口核心代碼實現以下:
1.bool runCommandImpl(...) { 2. //獲取命令請求內容body 3. BSONObj cmd = request.body; 4. //獲取請求中的DB庫信息 5. const std::string db = request.getDatabase().toString(); 6. //ReadConcern檢查 7. Status rcStatus = waitForReadConcern( 8. opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmd)); 9. //ReadConcern檢查不經過,直接異常提示處理 10. if (!rcStatus.isOK()) { 11. //異常處理 12. return; 13. } 14. if (!command->supportsWriteConcern(cmd)) { 15. //命令不支持WriteConcern,可是對應的請求中卻帶有WriteConcern配置,直接報錯不支持 16. if (commandSpecifiesWriteConcern(cmd)) { 17. //異常處理"Command does not support writeConcern" 18. ...... 19. return result; 20. } 21. //調用Command::publicRun執行不一樣命令操做 22. result = command->publicRun(opCtx, request, inPlaceReplyBob); 23. } 24. //提取WriteConcernOptions信息 25. auto wcResult = extractWriteConcern(opCtx, cmd, db); 26. //提取異常,直接異常處理 27. if (!wcResult.isOK()) { 28. //異常處理 29. ...... 30. return result; 31. } 32. ...... 33. //執行對應的命令Command::publicRun,執行不一樣命令操做 34. result = command->publicRun(opCtx, request, inPlaceReplyBob); 35. ...... 36.}
RunCommandImpl(...)接口最終調用該接口入參的command,執行 command->publicRun(...)接口,也就是命令模塊的公共publicRun。
4.4 總結
Mongod服務入口首先從message中解析出opCode操做碼,3.6版本對應客戶端默認操做碼爲OP_MSQ,解析出該操做對應OpMsgRequest信息。而後從message原始數據中解析出command命令字符串後,繼續經過全局Map表種查找是否支持該命令操做,若是支持則執行該命令;若是不支持,直接異常打印,同時返回。
5. Mongos實例服務入口核心代碼實現
mongos服務入口核心代碼實現過程和mongod服務入口代碼實現流程幾乎相同,mongos實例message解析、OP_MSG操做碼處理、command命令查找等流程和上一章節mongod實例處理過程相似,本章節不在詳細分析。Mongos實例服務入口處理調用流程以下:
ServiceEntryPointMongos::handleRequest(...)->Strategy::clientCommand(...)-->runCommand(...)->execCommandClient(...)
最後的接口核心代碼實現以下:
1.void runCommand(...) { 2. ...... 3. //獲取請求命令name 4. auto const commandName = request.getCommandName(); 5. //從全局map表中查找 6. auto const command = Command::findCommand(commandName); 7. //沒有對應的command存在,拋異常說明不支持該命令 8. if (!command) { 9. ...... 10. return; 11. } 12. ...... 13. //執行命令 14. execCommandClient(opCtx, command, request, builder); 15. ...... 16.} 17. 18.void execCommandClient(...) 19.{ 20. ...... 21. //認證檢查,是否有操做該command命令的權限,沒有則異常提示 22. Status status = Command::checkAuthorization(c, opCtx, request); 23. if (!status.isOK()) { 24. Command::appendCommandStatus(result, status); 25. return; 26. } 27. //該命令的執行次數自增,代理上面也是要計數的 28. c->incrementCommandsExecuted(); 29. //若是須要command統計,則加1 30. if (c->shouldAffectCommandCounter()) { 31. globalOpCounters.gotCommand(); 32. } 33. ...... 34. //有部分命令不支持writeconcern配置,報錯 35. bool supportsWriteConcern = c->supportsWriteConcern(request.body); 36. //不支持writeconcern又帶有該參數的請求,直接異常處理"Command does not support writeConcern" 37. if (!supportsWriteConcern && !wcResult.getValue().usedDefault) { 38. ...... 39. return; 40. } 41. //執行本命令對應的公共publicRun接口,Command::publicRun 42. ok = c->publicRun(opCtx, request, result); 43. ...... 44.}
- Tips: mongos和mongod實例服務入口核心代碼實現的一點小區別
- Mongod實例opCode操做碼解析、OpMsg解析、command查找及對應命令調用處理都由class ServiceEntryPointMongod{...}類一塊兒完成。
- mongos實例則把opCode操做碼解析交由class ServiceEntryPointMongos{...}類實現,OpMsg解析、command查找及對應命令調用處理放到了clase Strategy{...}類來處理。
6. 總結
Mongodb報文解析及組裝流程總結
- 一個完整mongodb報文由通用報文header頭部+body部分組成。
- Body部份內容,根據報文頭部的opCode來決定不一樣的body內容。
- 3.6版本對應客戶端請求opCode默認爲OP_MSG,該操做碼對應body部分由flagBits + sections + checksum組成,其中sections 中存放的是真正的命令請求信息,已bson數據格式保存。
- Header頭部和body報文體封裝及解析過程由class Message {...}類實現
- Body中對應command命令名、庫名、表名的解析在mongodb(version<3.6)低版本協議中由class DbMessage {...}類實現
- Body中對應command命令名、庫名、表名的解析在mongodb(version<3.6)低版本協議中由struct OpMsgRequest{...}結構和struct OpMsg {...}類實現
Mongos和mongod實例的服務入口處理流程大同小異,總體處理流程以下:
- 從message解析出opCode操做碼,根據不一樣操做碼執行對應操做碼回調。
- 根據message解析出OpMsg request信息,mongodb報文的命令信息就存儲在該body中,該body已bson格式存儲。
- 從body中解析出command命令字符串信息(如「insert」、「update」等)。
- 從全局_commands map表中查找是否支持該命令,若是支持則執行該命令處理,若是不支持則直接報錯提示。
- 最終找到對應command命令後,執行command的功能run接口。
圖形化總結以下:
說明:第3章的協議解析及封裝過程實際上應該算是網絡處理模塊範疇,本文爲了分析command命令處理模塊方便,把該部分實現概括到了命令處理模塊,這樣方便理解。
Tips: 下期繼續分享不一樣command命令執行細節。
7. 遺留問題
第1章節中的統計信息,將在command模塊核心代碼分析完畢後揭曉答案,《mongodb command命令處理模塊源碼實現二》中繼續分析,敬請關注。