mongodb內核源碼實現、性能調優、最佳運維實踐系列-command命令處理模塊源碼實現一

關於做者

      前滴滴出行技術專家,現任OPPO文檔數據庫mongodb負責人,負責oppo千萬級峯值TPS/十萬億級數據量文檔數據庫mongodb內核研發及運維工做,一直專一於分佈式緩存、高性能服務端、數據庫、中間件等相關研發。後續持續分享《MongoDB內核源碼設計、性能優化、最佳運維實踐》,Github帳號地址:https://github.com/y123456yzgit

  1. 背景

      <<transport_layer網絡傳輸層模塊源碼實現>>中分享了mongodb內核底層網絡IO處理相關實現,包括套接字初始化、一個完整mongodb報文的讀取、獲取到DB數據發送給客戶端等。Mongodb支持多種增、刪、改、查、聚合處理、cluster處理等操做,每一個操做在內核實現中對應一個command,每一個command有不一樣的功能,mongodb內核如何進行command源碼處理將是本文分析的重點github

      此外,mongodb提供了mongostat工具來監控當前集羣的各類操做統計。Mongostat監控統計以下圖所示:mongodb

      其中,insert、delete、update、query這四項統計比較好理解,分別對應增、刪、改、查。可是,comand、getmore不是很好理解,command表明什麼統計?getMore表明什麼統計?,這兩項相對比較難理解。數據庫

      此外,經過本文字分析,咱們將搞明白這六項統計的具體含義,同時弄清這六項統計由那些操做進行計數。緩存

      Command命令處理模塊分爲:mongos操做命令、mongod操做命令、mongodb集羣內部命令,具體定義以下:性能優化

  • mongos操做命令,客戶端能夠經過mongos訪問集羣相關的命令。
  • mongod操做命令:客戶端能夠經過mongod複製集和cfg server訪問集羣的相關命令。
  • mongodb集羣內部命令:mongos、mongod、mongo-cfg集羣實例之間交互的命令。

      Command命令處理模塊核心代碼實現以下:網絡

     《command命令處理模塊源碼實現》相關文章重點分析命令處理模塊核心代碼實現,也就是上面截圖中的命令處理源碼文件實現。session

2. <<transport_layer網絡傳輸層模塊源碼實現>>銜接回顧app

      <<transport_layer網絡傳輸層模塊源碼實現三>>一文中,咱們對service_state_machine狀態機調度子模塊進行了分析,該模塊中的dealTask任務進行mongodb內部業務邏輯處理,其核心實現以下:less

1.//dealTask處理  
2.void ServiceStateMachine::_processMessage(ThreadGuard guard) {  
3.    ......
4.    //command處理、DB訪問後的數據經過dbresponse返回  
5.    DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);  
6.    ......
7.}

      上面的_sep對應mongod或者mongos實例的服務入口實現,該_seq成員分別在以下代碼中初始化爲ServiceEntryPointMongod和ServiceEntryPointMongod類實現。SSM狀態機的_seq成員初始化賦值核心代碼實現以下:

1.//mongos實例啓動初始化  
2.static ExitCode runMongosServer() {  
3.    ......  
4.    //mongos實例對應sep爲ServiceEntryPointMongos  
5.    auto sep = stdx::make_unique<ServiceEntryPointMongos>(getGlobalServiceContext());  
6.    getGlobalServiceContext()->setServiceEntryPoint(std::move(sep));  
7.    ......  
8.}  
9.  
10.//mongod實例啓動初始化  
11.ExitCode _initAndListen(int listenPort) {  
12.    ......  
13.    //mongod實例對應sep爲ServiceEntryPointMongod  
14.    serviceContext->setServiceEntryPoint(  
15.        stdx::make_unique<ServiceEntryPointMongod>(serviceContext));  
16.    ......  
17.}  
18.  
19.//SSM狀態機初始化  
20.ServiceStateMachine::ServiceStateMachine(...)  
21.    : _state{State::Created},  
22.      //mongod和mongos實例的服務入口經過這裏賦值給_seq成員變量  
23.      _sep{svcContext->getServiceEntryPoint()},  
24.      ......  
} 

         經過上面的幾個核心接口實現,把mongos和mongod兩個實例的服務入口與狀態機SSM(ServiceStateMachine)聯繫起來,最終和下面的command命令處理模塊關聯。

     dealTask進行一次mongodb請求的內部邏輯處理,該處理由_sep->handleRequest()接口實現。因爲mongos和mongod服務入口分別由ServiceEntryPointMongosServiceEntryPointMongod兩個類實現,所以dealTask也就演變爲以下接口處理:

  • mongos實例:ServiceEntryPointMongos::handleRequest(...)
  • Mongod實例::ServiceEntryPointMongod::handleRequest(...)

      這兩個接口入參都是OperationContext和Message,分別對應操做上下文、請求原始數據內容。下文會分析Message解析實現、OperationContext服務上下文實現將在後續章節分析。

      Mongod和mongos實例服務入口類都繼承自網絡傳輸模塊中的ServiceEntryPointImpl類,以下圖所示:

      Tips: mongos和mongod服務入口類爲什麼要繼承網絡傳輸模塊服務入口類?

      緣由是一個請求對應一個連接session,該session對應的請求又和SSM狀態機惟一對應。全部客戶端請求對應的SSM狀態機信息所有保存再ServiceEntryPointImpl._sessions成員中,而command命令處理模塊爲SSM狀態機任務中的dealTask任務,經過該繼承關係,ServiceEntryPointMongod和ServiceEntryPointMongos子類也就能夠和狀態機及任務處理關聯起來,同時也能夠獲取當前請求對應的session連接信息。

3. Mongodb協議解析

在《transport_layer網絡傳輸層模塊源碼實現二》中的數據收發子模塊完成了一個完整mongodb報文的接收,一個mongodb報文由Header頭部+opCode包體組成,以下圖所示:

      上圖中各個字段說明以下表:

      opCode取值比較多,早期版本中OP_INSERT、OP_DELETE、OP_UPDATE、OP_QUERY分別針對增刪改查請求,Mongodb從3.6版本開始默認使用OP_MSG操做做爲默認opCode,是一種可擴展的消息格式,旨在包含其餘操做碼的功能,新版本讀寫請求協議都對應該操做碼。本文以OP_MSG操做碼對應協議爲例進行分析,其餘操做碼協議分析過程相似,OP_MSG請求協議格式以下:

1.OP_MSG {  
2.    //mongodb報文頭部  
3.    MsgHeader header;            
4.    //位圖,用於標識報文是否須要校驗 是否須要應答等  
5.    uint32 flagBits;           // message flags  
6.    //報文內容,例如find write等命令內容經過bson格式存在於該結構中  
7.    Sections[] sections;       // data sections  
8.    //報文CRC校驗  
9.    optional<uint32> checksum; // optional CRC-32C checksum  
} 

      OP_MSG各個字段說明以下表:

     一個完整OP_MSG請求格式以下:

      除了通用頭部header外,客戶端命令請求實際上都保存於sections字段中,該字段存放的是請求的原始bson格式數據。BSON是由10gen開發的一個數據格式,目前主要用於MongoDB中,是MongoDB的數據存儲格式。BSON基於JSON格式,選擇JSON進行改造的緣由主要是JSON的通用性及JSON的schemaless的特性。BSON相比JSON具備如下特性

  • Lightweight(更輕量級)
  • Traversable(易操做)
  • Efficient(高效性能)

      本文重點不是分析bson協議格式,bson協議實現細節將在後續章節分享。bson協議更多設計細節詳見:http://bsonspec.org/

      總結:一個完整mongodb報文由header+body組成,其中header長度固定爲16字節,body長度等於messageLength-16。Header部分協議解析由message.cpp和message.h兩源碼文件實現,body部分對應的OP_MSG類請求解析由op_msg.cpp和op_msg.h兩源碼文件實現。

3. mongodb報文通用頭部解析及封裝源碼實現

      Header頭部解析由src/mongo/util/net目錄下message.cpp和message.h兩文件完成,該類主要完成通用header頭部和body部分的解析、封裝。所以報文頭部核心代碼分爲如下兩類:

  • 報文頭部內容解析及封裝(MSGHEADER命名空間實現)
  • 頭部和body內容解析及封裝(MsgData命名空間實現)

3.1 mongodb報文頭部解析及封裝核心代碼實現

      mongodb報文頭部解析由namespace MSGHEADER {...}實現,該類主要成員及接口實現以下:

1.namespace MSGHEADER {  
2.//header頭部各個字段信息  
3.struct Layout {  
4.    //整個message長度,包括header長度和body長度  
5.    int32_t messageLength;     
6.    //requestID 該請求id信息  
7.    int32_t requestID;         
8.    //getResponseToMsgId解析  
9.    int32_t responseTo;        
10.    //操做類型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等  
11.    int32_t opCode;  
12.};  
13.  
14.//ConstView實現header頭部數據解析  
15.class ConstView {   
16.public:  
17.    ......  
18.    //初始化構造  
19.    ConstView(const char* data) : _data(data) {}  
20.    //獲取_data地址  
21.    const char* view2ptr() const {  
22.        return data().view();  
23.    }  
24.    //TransportLayerASIO::ASIOSourceTicket::_headerCallback調用  
25.    //解析header頭部的messageLength字段  
26.    int32_t getMessageLength() const {  
27.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength));  
28.    }  
29.    //解析header頭部的requestID字段  
30.    int32_t getRequestMsgId() const {  
31.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID));  
32.    }  
33.    //解析header頭部的getResponseToMsgId字段  
34.    int32_t getResponseToMsgId() const {  
35.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo));  
36.    }  
37.    //解析header頭部的opCode字段  
38.    int32_t getOpCode() const {  
39.        return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode));  
40.    }  
41.  
42.protected:  
43.    //mongodb報文數據起始地址  
44.    const view_type& data() const {  
45.        return _data;  
46.    }  
47.private:  
48.    //數據部分  
49.    view_type _data;  
50.};  
51.  
52.//View填充header頭部數據  
53.class View : public ConstView {  
54.public:  
55.    ......  
56.    //構造初始化  
57.    View(char* data) : ConstView(data) {}  
58.    //header起始地址  
59.    char* view2ptr() {  
60.        return data().view();  
61.    }  
62.    //如下四個接口進行header填充  
63.    //填充header頭部messageLength字段  
64.    void setMessageLength(int32_t value) {  
65.        data().write(tagLittleEndian(value), offsetof(Layout, messageLength));  
66.    }  
67.    //填充header頭部requestID字段  
68.    void setRequestMsgId(int32_t value) {  
69.        data().write(tagLittleEndian(value), offsetof(Layout, requestID));  
70.    }  
71.    //填充header頭部responseTo字段  
72.    void setResponseToMsgId(int32_t value) {  
73.        data().write(tagLittleEndian(value), offsetof(Layout, responseTo));  
74.    }  
75.    //填充header頭部opCode字段  
76.    void setOpCode(int32_t value) {  
77.        data().write(tagLittleEndian(value), offsetof(Layout, opCode));  
78.    }  
79.private:  
80.    //指向header起始地址  
81.    view_type data() const {  
82.        return const_cast<char*>(ConstView::view2ptr());  
83.    }  
84.};  
85.}

      從上面的header頭部解析、填充的實現類能夠看出,header頭部解析由MSGHEADER::ConstView實現;header頭部填充由MSGHEADER::View完成。實際上代碼實現上,經過offsetof來進行移位,從而快速定位到頭部對應字段。

3.2 mongodb報文頭部+body解析封裝核心代碼實現

      Namespace MSGHEADER{...}命名空間只負責header頭部的處理,namespace MsgData{...}命名空間相對MSGHEADER命名空間更加完善,除了處理頭部解析封裝外,還負責body數據起始地址維護、body數據封裝、數據長度檢查等。MsgData命名空間核心代碼實現以下:

1.namespace MsgData {  
2.struct Layout {  
3.    //數據填充組成:header部分  
4.    MSGHEADER::Layout header;  
5.    //數據填充組成: body部分,body先用data佔位置  
6.    char data[4];  
7.};  
8.  
9.//解析header字段信息及body其實地址信息  
10.class ConstView {  
11.public:  
12.    //初始化構造  
13.    ConstView(const char* storage) : _storage(storage) {}  
14.    //獲取數據起始地址  
15.    const char* view2ptr() const {  
16.        return storage().view();  
17.    }  
18.  
19.    //如下四個接口間接執行前面的MSGHEADER中的頭部字段解析  
20.    //填充header頭部messageLength字段  
21.    int32_t getLen() const {  
22.        return header().getMessageLength();  
23.    }  
24.    //填充header頭部requestID字段  
25.    int32_t getId() const {  
26.        return header().getRequestMsgId();  
27.    }  
28.    //填充header頭部responseTo字段  
29.    int32_t getResponseToMsgId() const {  
30.        return header().getResponseToMsgId();  
31.    }  
32.    //獲取網絡數據報文中的opCode字段  
33.    NetworkOp getNetworkOp() const {  
34.        return NetworkOp(header().getOpCode());  
35.    }  
36.    //指向body起始地址  
37.    const char* data() const {  
38.        return storage().view(offsetof(Layout, data));  
39.    }  
40.    //messageLength長度檢查,opcode檢查  
41.    bool valid() const {  
42.        if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize))  
43.            return false;  
44.        if (getNetworkOp() < 0 || getNetworkOp() > 30000)  
45.            return false;  
46.        return true;  
47.    }  
48.    ......  
49.protected:  
50.    //獲取_storage  
51.    const ConstDataView& storage() const {  
52.        return _storage;  
53.    }  
54.    //指向header起始地址  
55.    MSGHEADER::ConstView header() const {  
56.        return storage().view(offsetof(Layout, header));  
57.    }  
58.private:  
59.    //mongodb報文存儲在這裏  
60.    ConstDataView _storage;  
61.};  
62.  
63.//填充數據,包括Header和body  
64.class View : public ConstView {  
65.public:  
66.    //構造初始化  
67.    View(char* storage) : ConstView(storage) {}  
68.    ......  
69.    //獲取報文起始地址  
70.    char* view2ptr() {  
71.        return storage().view();  
72.    }  
73.  
74.    //如下四個接口間接執行前面的MSGHEADER中的頭部字段構造  
75.    //如下四個接口完成msg header賦值  
76.    //填充header頭部messageLength字段  
77.    void setLen(int value) {  
78.        return header().setMessageLength(value);  
79.    }  
80.    //填充header頭部messageLength字段  
81.    void setId(int32_t value) {  
82.        return header().setRequestMsgId(value);  
83.    }  
84.    //填充header頭部messageLength字段  
85.    void setResponseToMsgId(int32_t value) {  
86.        return header().setResponseToMsgId(value);  
87.    }  
88.    //填充header頭部messageLength字段  
89.    void setOperation(int value) {  
90.        return header().setOpCode(value);  
91.    }  
92.  
93.    using ConstView::data;  
94.    //指向data  
95.    char* data() {  
96.        return storage().view(offsetof(Layout, data));  
97.    }  
98.private:  
99.    //也就是報文起始地址  
100.    DataView storage() const {  
101.        return const_cast<char*>(ConstView::view2ptr());  
102.    }  
103.    //指向header頭部  
104.    MSGHEADER::View header() const {  
105.        return storage().view(offsetof(Layout, header));  
106.    }  
107.};  
108.  
109.......  
110.//Value爲前面的Layout,減4是由於有4字節填充data,因此這個就是header長度  
111.const int MsgDataHeaderSize = sizeof(Value) - 4;  
112.  
113.//除去頭部後的數據部分長度  
114.inline int ConstView::dataLen() const {   
115.    return getLen() - MsgDataHeaderSize;  
116.}  
117.}  // namespace MsgData  

       和MSGHEADER命名空間相比,MsgData這個namespace命名空間接口實現和前面的MSGHEADER命名空間實現大同小異。MsgData不只僅處理header頭部的解析組裝,還負責body部分數據頭部指針指向、頭部長度檢查、opCode檢查、數據填充等。其中,MsgData命名空間中header頭部的解析構造底層依賴MSGHEADER實現。

3.3 Message/DbMessage核心代碼實現

      在《transport_layer網絡傳輸層模塊源碼實現二》中,從底層ASIO庫接收到的mongodb報文是存放在Message結構中存儲,最終存放在ServiceStateMachine._inMessage成員中。

      在前面第2章咱們知道mongod和mongso實例的服務入口接口handleRequest(...)中都帶有Message入參,也就是接收到的Message數據經過該接口處理。Message類主要接口實現以下:

1.//DbMessage._msg成員爲該類型  
2.class Message {  
3.public:  
4.    //message初始化  
5.    explicit Message(SharedBuffer data) : _buf(std::move(data)) {}  
6.    //頭部header數據  
7.    MsgData::View header() const {  
8.        verify(!empty());  
9.        return _buf.get();  
10.    }  
11.    //獲取網絡數據報文中的op字段  
12.    NetworkOp operation() const {  
13.        return header().getNetworkOp();  
14.    }  
15.    //_buf釋放爲空  
16.    bool empty() const {  
17.        return !_buf;  
18.    }  
19.    //獲取報文總長度messageLength  
20.    int size() const {  
21.        if (_buf) {  
22.            return MsgData::ConstView(_buf.get()).getLen();  
23.        }  
24.        return 0;  
25.    }  
26.    //body長度  
27.    int dataSize() const {  
28.        return size() - sizeof(MSGHEADER::Value);  
29.    }  
30.    //buf重置  
31.    void reset() {  
32.        _buf = {};  
33.    }  
34.    // use to set first buffer if empty  
35.    //_buf直接使用buf空間  
36.    void setData(SharedBuffer buf) {  
37.        verify(empty());  
38.        _buf = std::move(buf);  
39.    }  
40.     //把msgtxt拷貝到_buf中  
41.    void setData(int operation, const char* msgtxt) {  
42.        setData(operation, msgtxt, strlen(msgtxt) + 1);  
43.    }  
44.    //根據operation和msgdata構造一個完整mongodb報文  
45.    void setData(int operation, const char* msgdata, size_t len) {  
46.        verify(empty());  
47.        size_t dataLen = len + sizeof(MsgData::Value) - 4;  
48.        _buf = SharedBuffer::allocate(dataLen);  
49.        MsgData::View d = _buf.get();  
50.        if (len)  
51.            memcpy(d.data(), msgdata, len);  
52.        d.setLen(dataLen);  
53.        d.setOperation(operation);  
54.    }  
55.    ......  
56.    //獲取_buf對應指針  
57.    const char* buf() const {  
58.        return _buf.get();  
59.    }  
60.  
61.private:  
62.    //存放接收數據的buf  
63.    SharedBuffer _buf;  
64.};  

       Message是操做mongodb收發報文最直接的實現類,該類主要完成一個完整mongodb報文封裝。有關mongodb報文頭後面的body更多的解析實如今DbMessage類中完成,DbMessage類包含Message類成員_msg。實際上,Message報文信息在handleRequest(...)實例服務入口中賦值給DbMessage._msg,報文後續的body處理繼續由DbMessage類相關接口完成處理。DbMessage和Message類關係以下:

1.class DbMessage {  
2.    ......  
3.    //包含Message成員變量  
4.    const Message& _msg;  
5.    //mongodb報文起始地址
6.    const char* _nsStart; 
7.    //報文結束地址
8.    const char* _theEnd; 
9.}  
10.  
11.DbMessage::DbMessage(const Message& msg) : _msg(msg),   
12.  _nsStart(NULL), _mark(NULL), _nsLen(0) {  
13.    //一個mongodb報文(header+body)數據的結束地址  
14.    _theEnd = _msg.singleData().data() + _msg.singleData().dataLen();  
15.    //報文起始地址 [_nextjsobj, _theEnd ]之間的數據就是一個完整mongodb報文  
16.    _nextjsobj = _msg.singleData().data();  
17.    ......  
18.} 

      DbMessage._msg成員爲DbMessage 類型,DbMessage_nsStart_theEnd成員分別記錄完整mongodb報文的起始地址和結束地址,經過這兩個指針就能夠獲取一個完整mongodb報文的所有內容,包括header和body。

      注意:DbMessage是早期mongodb版本(version<3.6)中用於報文body解析封裝的類,這些類針對opCode=[dbUpdate, dbDelete]這個區間的操做。在mongodb新版本(version>=3.6)中,body解析及封裝由op_msg.h和op_msg.cpp代碼文件中的clase OpMsgRequest{}完成處理。

3.4 OpMsg報文解析封裝核心代碼實現

        Mongodb從3.6版本開始默認使用OP_MSG操做做爲默認opCode,是一種可擴展的消息格式,旨在包含其餘操做碼的功能,新版本讀寫請求協議都對應該操做碼。OP_MSG對應mongodb報文body解析封裝處理由OpMsg類相關接口完成,OpMsg::parse(Message)從Message中解析出報文body內容,其核心代碼實現以下:

1.struct OpMsg {   
2.      ......  
3.    //msg解析賦值見OpMsg::parse     
4.    //各類命令(insert update find等)都存放在該body中  
5.    BSONObj body;    
6.    //sequences用法暫時沒看懂,感受沒什麼用?先跳過  
7.    std::vector<DocumentSequence> sequences; //賦值見OpMsg::parse  
8.}  

1.//從message中解析出OpMsg信息  
2.OpMsg OpMsg::parse(const Message& message) try {  
3.    //message不能爲空,而且opCode必須爲dbMsg  
4.    invariant(!message.empty());  
5.    invariant(message.operation() == dbMsg);  
6.    //獲取flagBits  
7.    const uint32_t flags = OpMsg::flags(message);  
8.    //flagBits有效性檢查,bit 0-15中只能對第0和第1位操做  
9.    uassert(ErrorCodes::IllegalOpMsgFlag,  
10.            str::stream() << "Message contains illegal flags value: Ob"  
11.                          << std::bitset<32>(flags).to_string(),  
12.            !containsUnknownRequiredFlags(flags));  
13.  
14.    //校驗碼默認4字節  
15.    constexpr int kCrc32Size = 4;  
16.    //判斷該mongo報文body內容是否啓用了校驗功能  
17.    const bool haveChecksum = flags & kChecksumPresent;  
18.    //若是有啓用校驗功能,則報文末尾4字節爲校驗碼  
19.    const int checksumSize = haveChecksum ? kCrc32Size : 0;  
20.    //sections字段內容  
21.    BufReader sectionsBuf(message.singleData().data() + sizeof(flags),  
22.                          message.dataSize() - sizeof(flags) - checksumSize);  
23.  
24.    //默認先設置位false  
25.    bool haveBody = false;  
26.    OpMsg msg;  
27.    //解析sections對應命令請求數據  
28.    while (!sectionsBuf.atEof()) {  
29.        //BufReader::read讀取kind內容,一個字節  
30.        const auto sectionKind = sectionsBuf.read<Section>();  
31.        //kind爲0對應命令請求body內容,內容經過bson報錯  
32.        switch (sectionKind) {  
33.            //sections第一個字節是0說明是body  
34.            case Section::kBody: {  
35.                //默認只能有一個body  
36.                uassert(40430, "Multiple body sections in message", !haveBody);  
37.                haveBody = true;  
38.                //命令請求的bson信息保存在這裏  
39.                msg.body = sectionsBuf.read<Validated<BSONObj>>();  
40.                break;  
41.            }  
42.  
43.            //DocSequence暫時沒看明白,用到的地方不多,跳過,後續等  
44.            //該系列文章主流功能分析完成後,從頭再回首分析  
45.            case Section::kDocSequence: {  
46.                  ......  
47.            }  
48.        }  
49.    }  
50.    //OP_MSG必須有body內容  
51.    uassert(40587, "OP_MSG messages must have a body", haveBody);  
52.    //body和sequence去重判斷  
53.    for (const auto& docSeq : msg.sequences) {  
54.        ......  
55.    }  
56.    return msg;  
57.}  

        OpMsg類被OpMsgRequest類繼承,OpMsgRequest類中核心接口就是解析出OpMsg.body中的庫信息和表信息,OpMsgRequest類代碼實現以下:

1.//協議解析得時候會用到,見runCommands  
2.struct OpMsgRequest : public OpMsg {  
3.    ......  
4.    //構造初始化  
5.    explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {}  
6.    //opMsgRequestFromAnyProtocol->OpMsgRequest::parse   
7.    //從message中解析出OpMsg所需成員信息  
8.    static OpMsgRequest parse(const Message& message) {  
9.        //OpMsg::parse  
10.        return OpMsgRequest(OpMsg::parse(message));  
11.    }  
12.    //根據db body extraFields填充OpMsgRequest  
13.    static OpMsgRequest fromDBAndBody(... {  
14.        OpMsgRequest request;  
15.        request.body = ([&] {  
16.            //填充request.body  
17.            ......  
18.        }());  
19.        return request;  
20.    }  
21.    //從body中獲取db name  
22.    StringData getDatabase() const {  
23.        if (auto elem = body["$db"])  
24.            return elem.checkAndGetStringData();  
25.        uasserted(40571, "OP_MSG requests require a $db argument");  
26.    }  
27.    //find  insert 等命令信息  body中的第一個elem就是command 名  
28.    StringData getCommandName() const {  
29.        return body.firstElementFieldName();  
30.    }  
31.};  

       OpMsgRequest經過OpMsg::parse(message)解析出OpMsg信息,從而獲取到body內容,GetCommandName()接口和getDatabase()則分別從body中獲取庫DB信息、命令名信息。經過該類相關接口,命令名(find、write、update等)和DB庫都獲取到了。

      OpMsg模塊除了OP_MSG相關報文解析外,還負責OP_MSG報文組裝填充,該模塊接口功能大全以下表:

4. Mongod實例服務入口核心代碼實現

      Mongod實例服務入口類ServiceEntryPointMongod繼承ServiceEntryPointImpl類,mongod實例的報文解析處理、命令解析、命令執行都由該類負責處理。ServiceEntryPointMongod核心接口能夠細分爲:opCode解析及回調處理、命令解析及查找、命令執行三個子模塊。

4.1 opCode解析及回調處理

      OpCode操做碼解析及其回調處理由ServiceEntryPointMongod::handleRequest(...)接口實現,核心代碼實現以下:

1.//mongod服務對於客戶端請求的處理    
2.//經過狀態機SSM模塊的以下接口調用:ServiceStateMachine::_processMessage  
3.DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) {  
4.    //獲取opCode,3.6版本對應客戶端默認使用OP_MSG  
5.    NetworkOp op = m.operation();   
6.    ......  
7.    //根據message構造DbMessage  
8.    DbMessage dbmsg(m);  
9.    //根據操做上下文獲取對應的client  
10.    Client& c = *opCtx->getClient();    
11.    ......  
12.    //獲取庫.表信息,注意只有dbUpdate<opCode<dbDelete的opCode請求才經過dbmsg直接獲取庫和表信息  
13.    const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;  
14.    const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();  
15.    ....  
16.    //CurOp::debug 初始化opDebug,慢日誌相關記錄  
17.    OpDebug& debug = currentOp.debug();  
18.    //慢日誌閥值  
19.    long long logThresholdMs = serverGlobalParams.slowMS;  
20.    //時mongodb將記錄此次慢操做,1爲只記錄慢操做,即操做時間大於了設置的配置,2表示記錄全部操做    
21.    bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1));  
22.    DbResponse dbresponse;  
23.    if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) {  
24.        //新版本op=dbMsg,所以走這裏  
25.        //從DB獲取數據,獲取到的數據經過dbresponse返回  
26.        dbresponse = runCommands(opCtx, m);     
27.    } else if (op == dbQuery) {  
28.        ......   
29.        //早期mongodb版本查詢走這裏  
30.        dbresponse = receivedQuery(opCtx, nsString, c, m);  
31.    } else if (op == dbGetMore) {    
32.        //早期mongodb版本查詢走這裏  
33.        dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug);  
34.    } else {  
35.        ......  
36.        //早期版本增 刪 改走這裏處理  
37.         if (op == dbInsert) {  
38.              receivedInsert(opCtx, nsString, m); //插入操做入口   新版本CmdInsert::runImpl  
39.         } else if (op == dbUpdate) {  
40.              receivedUpdate(opCtx, nsString, m); //更新操做入口    
41.         } else if (op == dbDelete) {  
42.              receivedDelete(opCtx, nsString, m); //刪除操做入口    
43.         }   
44.    }  
45.    //獲取runCommands執行時間,也就是內部處理時間  
46.    debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses());  
47.    ......  
48.    //慢日誌記錄  
49.    if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) {  
50.        Locker::LockerInfo lockerInfo;    
51.        //OperationContext::lockState  LockerImpl<>::getLockerInfo  
52.        opCtx->lockState()->getLockerInfo(&lockerInfo);   
53.  
54.    //OpDebug::report 記錄慢日誌到日誌文件  
55.        log() << debug.report(&c, currentOp, lockerInfo.stats);   
56.    }  
57.    //各類統計信息  
58.    recordCurOpMetrics(opCtx);  
59.}  

      Mongod的handleRequest()接口主要完成如下工做:

  • 從Message中獲取OpCode,早期版本每一個命令又對應取值,例如增刪改查早期版本分別對應:dbInsert、dbDelete、dbUpdate、dbQuery;Mongodb 3.6開始,默認請求對應OpCode都是OP_MSG,本文默認只分析OpCode=OP_MSG相關的處理。
  • 獲取本操做對應的Client客戶端信息。
  • 若是是早期版本,經過Message構造DbMessage,同時解析出庫.表信息。
  • 根據不一樣OpCode執行對應回調操做,OP_MSG對應操做爲runCommands(...),獲取的數據經過dbresponse返回。
  • 獲取到db層返回的數據後,進行慢日誌判斷,若是db層數據訪問超過閥值,記錄慢日誌。
  • 設置debug的各類統計信息。

4.2 命令解析及查找

      從上面的分析能夠看出,接口最後調用runCommands(...),該接口核心代碼實現以下所示:

1.//message解析出對應command執行  
2.DbResponse runCommands(OperationContext* opCtx, const Message& message) {  
3.    //獲取message對應的ReplyBuilder,3.6默認對應OpMsgReplyBuilder  
4.    //應答數據經過該類構造  
5.    auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));  
6.    [&] {  
7.        OpMsgRequest request;  
8.        try {  // Parse.  
9.            //協議解析 根據message獲取對應OpMsgRequest  
10.            request = rpc::opMsgRequestFromAnyProtocol(message);  
11.        }   
12.    }  
13.    try {  // Execute.  
14.        //opCtx初始化  
15.        curOpCommandSetup(opCtx, request);  
16.        //command初始化爲Null  
17.        Command* c = nullptr;  
18.        //OpMsgRequest::getCommandName查找  
19.        if (!(c = Command::findCommand(request.getCommandName()))) {   
20.             //沒有找到相應的command的後續異常處理  
21.             ......  
22.        }  
23.        //執行command命令,獲取到的數據經過replyBuilder.get()返回  
24.        execCommandDatabase(opCtx, c, request, replyBuilder.get());  
25.    }  
26.    //OpMsgReplyBuilder::done對數據進行序列化操做  
27.    auto response = replyBuilder->done();  
28.    //responseLength賦值  
29.    CurOp::get(opCtx)->debug().responseLength = response.header().dataLen();  
30.    // 返回  
31.    return DbResponse{std::move(response)};  
32.}  

      RunCommands(...)接口從message中解析出OpMsg信息,而後獲取該OpMsg對應的command命令信息,最後執行該命令對應的後續處理操做。主要功能說明以下:

  • 獲取該OpCode對應replyBuilder,OP_MSG操做對應builder爲OpMsgReplyBuilder。
  • 根據message解析出OpMsgRequest數據,OpMsgRequest來中包含了真正的命令請求bson信息。
  • opCtx初始化操做。
  • 經過request.getCommandName()返回命令信息(如「find」、「update」等字符串)。
  • 經過Command::findCommand(command name)從CommandMap這個map表中查找是否支持該command命令。若是沒找到說明不支持,若是找到說明支持。
  • 調用execCommandDatabase(...)執行該命令,並獲取命令的執行結果。
  • 根據command執行結果構造response並返回

4.3 命令執行

1.void execCommandDatabase(...) {  
2.    ......  
3.    //獲取dbname  
4.    const auto dbname = request.getDatabase().toString();  
5.    ......  
6.    //mab表存放從bson中解析出的elem信息  
7.    StringMap<int> topLevelFields;  
8.    //body elem解析  
9.    for (auto&& element : request.body) {  
10.        //獲取bson中的elem信息  
11.        StringData fieldName = element.fieldNameStringData();  
12.        //若是elem信息重複,則異常處理  
13.        ......  
14.    }  
15.    //若是是help命令,則給出help提示  
16.    if (Command::isHelpRequest(helpField)) {  
17.        //給出help提示  
18.        Command::generateHelpResponse(opCtx, replyBuilder, *command);  
19.        return;  
20.    }  
21.    //權限認證檢查,檢查該命令執行權限  
22.    uassertStatusOK(Command::checkAuthorization(command, opCtx, request));  
23.    ......  
24.  
25.    //該命令執行次數統計  db.serverStatus().metrics.commands能夠獲取統計信息  
26.    command->incrementCommandsExecuted();  
27.    //真正的命令執行在這裏面  
28.    retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime);  
29.    //該命令執行失敗次數統計  
30.    if (!retval) {  
31.        command->incrementCommandsFailed();  
32.     }  
33.     ......  
34.}  

      execCommandDatabase(...)最終調用RunCommandImpl(...)進行對應命令的真正處理,該接口核心代碼實現以下:

1.bool runCommandImpl(...) {  
2.    //獲取命令請求內容body  
3.    BSONObj cmd = request.body;  
4.    //獲取請求中的DB庫信息  
5.    const std::string db = request.getDatabase().toString();  
6.    //ReadConcern檢查  
7.    Status rcStatus = waitForReadConcern(  
8.        opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmd));  
9.    //ReadConcern檢查不經過,直接異常提示處理  
10.    if (!rcStatus.isOK()) {  
11.         //異常處理  
12.         return;  
13.    }  
14.    if (!command->supportsWriteConcern(cmd)) {  
15.        //命令不支持WriteConcern,可是對應的請求中卻帶有WriteConcern配置,直接報錯不支持  
16.        if (commandSpecifiesWriteConcern(cmd)) {  
17.            //異常處理"Command does not support writeConcern"  
18.            ......  
19.            return result;  
20.        }  
21.    //調用Command::publicRun執行不一樣命令操做  
22.        result = command->publicRun(opCtx, request, inPlaceReplyBob);  
23.    }  
24.    //提取WriteConcernOptions信息  
25.    auto wcResult = extractWriteConcern(opCtx, cmd, db);  
26.    //提取異常,直接異常處理  
27.    if (!wcResult.isOK()) {  
28.        //異常處理  
29.        ......  
30.        return result;  
31.    }  
32.    ......  
33.    //執行對應的命令Command::publicRun,執行不一樣命令操做  
34.    result = command->publicRun(opCtx, request, inPlaceReplyBob);  
35.    ......  
36.}

      RunCommandImpl(...)接口最終調用該接口入參的command,執行 command->publicRun(...)接口,也就是命令模塊的公共publicRun

4.4 總結

      Mongod服務入口首先從message中解析出opCode操做碼,3.6版本對應客戶端默認操做碼爲OP_MSQ,解析出該操做對應OpMsgRequest信息。而後從message原始數據中解析出command命令字符串後,繼續經過全局Map表種查找是否支持該命令操做,若是支持則執行該命令;若是不支持,直接異常打印,同時返回。

5. Mongos實例服務入口核心代碼實現

       mongos服務入口核心代碼實現過程和mongod服務入口代碼實現流程幾乎相同,mongos實例message解析、OP_MSG操做碼處理、command命令查找等流程和上一章節mongod實例處理過程相似,本章節不在詳細分析。Mongos實例服務入口處理調用流程以下:

      ServiceEntryPointMongos::handleRequest(...)->Strategy::clientCommand(...)-->runCommand(...)->execCommandClient(...)

      最後的接口核心代碼實現以下:

1.void runCommand(...) {  
2.    ......  
3.    //獲取請求命令name  
4.    auto const commandName = request.getCommandName();  
5.    //從全局map表中查找  
6.    auto const command = Command::findCommand(commandName);  
7.    //沒有對應的command存在,拋異常說明不支持該命令  
8.    if (!command) {   
9.        ......  
10.        return;  
11.    }   
12.    ......  
13.    //執行命令  
14.    execCommandClient(opCtx, command, request, builder);   
15.    ......  
16.}  
17.
18.void execCommandClient(...)  
19.{   
20.    ......  
21.    //認證檢查,是否有操做該command命令的權限,沒有則異常提示  
22.    Status status = Command::checkAuthorization(c, opCtx, request);    
23.    if (!status.isOK()) {  
24.        Command::appendCommandStatus(result, status);  
25.        return;  
26.    }  
27.    //該命令的執行次數自增,代理上面也是要計數的  
28.    c->incrementCommandsExecuted();   
29.    //若是須要command統計,則加1  
30.    if (c->shouldAffectCommandCounter()) {  
31.        globalOpCounters.gotCommand();  
32.    }  
33.    ......  
34.    //有部分命令不支持writeconcern配置,報錯  
35.    bool supportsWriteConcern = c->supportsWriteConcern(request.body);  
36.    //不支持writeconcern又帶有該參數的請求,直接異常處理"Command does not support writeConcern"  
37.    if (!supportsWriteConcern && !wcResult.getValue().usedDefault) {  
38.        ......  
39.        return;  
40.    }  
41.    //執行本命令對應的公共publicRun接口,Command::publicRun  
42.    ok = c->publicRun(opCtx, request, result);   
43.    ......  
44.}  
  1. Tips: mongos和mongod實例服務入口核心代碼實現的一點小區別
  • Mongod實例opCode操做碼解析、OpMsg解析、command查找及對應命令調用處理都由class ServiceEntryPointMongod{...}類一塊兒完成。
  • mongos實例則把opCode操做碼解析交由class ServiceEntryPointMongos{...}類實現,OpMsg解析、command查找及對應命令調用處理放到了clase Strategy{...}類來處理。

6. 總結

        Mongodb報文解析及組裝流程總結

  • 一個完整mongodb報文由通用報文header頭部+body部分組成。
  • Body部份內容,根據報文頭部的opCode來決定不一樣的body內容。
  • 3.6版本對應客戶端請求opCode默認爲OP_MSG,該操做碼對應body部分由flagBits + sections + checksum組成,其中sections 中存放的是真正的命令請求信息,已bson數據格式保存。
  • Header頭部和body報文體封裝及解析過程由class Message {...}類實現
  • Body中對應command命令名、庫名、表名的解析在mongodb(version<3.6)低版本協議中由class DbMessage {...}類實現
  • Body中對應command命令名、庫名、表名的解析在mongodb(version<3.6)低版本協議中由struct OpMsgRequest{...}結構和struct OpMsg {...}類實現

      Mongos和mongod實例的服務入口處理流程大同小異,總體處理流程以下:

  • 從message解析出opCode操做碼,根據不一樣操做碼執行對應操做碼回調。
  • 根據message解析出OpMsg request信息,mongodb報文的命令信息就存儲在該body中,該body已bson格式存儲。
  • 從body中解析出command命令字符串信息(如「insert」、「update」等)。
  • 從全局_commands map表中查找是否支持該命令,若是支持則執行該命令處理,若是不支持則直接報錯提示。
  • 最終找到對應command命令後,執行command的功能run接口。

      圖形化總結以下:

      說明:第3章的協議解析及封裝過程實際上應該算是網絡處理模塊範疇,本文爲了分析command命令處理模塊方便,把該部分實現概括到了命令處理模塊,這樣方便理解。

    Tips: 下期繼續分享不一樣command命令執行細節。

7. 遺留問題

    第1章節中的統計信息,將在command模塊核心代碼分析完畢後揭曉答案,《mongodb command命令處理模塊源碼實現二》中繼續分析,敬請關注。

相關文章
相關標籤/搜索