mongodb內核源碼實現、性能調優、最佳運維實踐系列-mongodb網絡傳輸層模塊源碼實現二

關於做者

       前滴滴出行技術專家,現任OPPO文檔數據庫mongodb負責人,負責oppo千萬級峯值TPS/十萬億級數據量文檔數據庫mongodb內核研發及運維工做,一直專一於分佈式緩存、高性能服務端、數據庫、中間件等相關研發。後續持續分享《MongoDB內核源碼設計、性能優化、最佳運維實踐》,Github帳號地址:https://github.com/y123456yzgit

1. 說明

       在以前的<<Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計>>一文中分析瞭如何閱讀百萬級大工程源碼、Asio網絡庫實現、transport傳輸層網絡模塊中線程模型實現,可是因爲篇幅緣由,傳輸層網絡模塊中的如下模塊實現原理沒有分析,本文降將繼續分析遺留的如下子模塊:github

  1. transport_layer套接字處理及傳輸層管理子模塊
  2. session會話子模塊
  3. Ticket數據收發子模塊
  4. service_entry_point服務入口點子模塊
  5. service_state_machine狀態機子模塊(該《模塊在網絡傳輸層模塊源碼實現三》中分析)
  6. service_executor線程模型子模塊(該《模塊在網絡傳輸層模塊源碼實現四》中分析)

2. transport_layer套接字處理及傳輸層管理子模塊mongodb

        transport_layer套接字處理及傳輸層管理子模塊功能包括套接字相關初始化處理、結合asio庫實現異步accept處理、不一樣線程模型管理及初始化等,該模塊的源碼實現主要由如下幾個文件實現:數據庫

     上圖是套接字處理及傳輸層管理子模塊源碼實現的相關文件,其中mock和test文件主要用於模擬測試等,因此真正核心的代碼實現只有下表的幾個文件,對應源碼文件功能說明以下表所示:數組

文件名緩存

功能性能優化

transport_layer.h網絡

transport_layer.cppsession

該子模塊基類,經過該類把本模塊和Ticket數據分發模塊銜接起來,具體類實如今transport_layer_legacy.cpp和transport_layer_asio.cpp中運維

transport_layer_legacy.h

transport_layer_legacy.cpp

早期的傳輸模塊實現方式,如今已淘汰,不在分析

transport_layer_asio.h

transport_layer_asio.cpp

傳輸模塊Asio網絡IO處理實現,同時銜接ServiceEntryPoint服務入口模塊和Ticket數據分發模塊

2.1 核心代碼實現

       該子模塊核心代碼主要由TransportLayerManager類和TransportLayerASIO類相關接口實現。

2.1.1  TransportLayerManager類核心代碼實現

        TransportLayerManager類主要成員及接口以下:

1.//網絡會話連接,消息處理管理相關的類,在createWithConfig構造該類存入_tls  
2.class TransportLayerManager final : public TransportLayer {  
3.    //如下四個接口真正實如今TransportLayerASIO類中具體實現  
4.    Ticket sourceMessage(...) override;  
5.    Ticket sinkMessage(...) override;      
6.    Status wait(Ticket&& ticket) override;  
7.    void asyncWait(...) override;  
8.    //配置初始化實現  
9.    std::unique_ptr<TransportLayer> createWithConfig(...);  
10.  
11.    //createWithConfig中賦值,對應TransportLayerASIO,  
12.    //實際上容器中就一個成員,就是TransportLayerASIO  
13.    std::vector<std::unique_ptr<TransportLayer>> _tls;  
14.};  

       TransportLayerManager類包含一個_tls成員,該類最核心的createWithConfig接口代碼實現以下:

15.//根據配置構造相應類信息  _initAndListen中調用  
16.std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(...) {  
17.    std::unique_ptr<TransportLayer> transportLayer;  
18.    //服務類型,也就是本實例是mongos仍是mongod  
19.    //mongos對應ServiceEntryPointMongod,mongod對應ServiceEntryPointMongos  
20.    auto sep = ctx->getServiceEntryPoint();  
21.    //net.transportLayer配置模式,默認asio, legacy模式已淘汰  
22.    if (config->transportLayer == "asio") {  
23.         //同步方式仍是異步方式,默認synchronous  
24.        if (config->serviceExecutor == "adaptive") {  
25.            //動態線程池模型,也就是異步模式  
26.            opts.transportMode = transport::Mode::kAsynchronous;  
27.        } else if (config->serviceExecutor == "synchronous") {  
28.            //一個連接一個線程模型,也就是同步模式  
29.            opts.transportMode = transport::Mode::kSynchronous;  
30.        }   
31.        //若是配置是asio,構造TransportLayerASIO類  
32.        auto transportLayerASIO = stdx::make_unique<transport::TransportLayerASIO>(opts, sep);  
33.        if (config->serviceExecutor == "adaptive") { //異步方式  
34.             //構造動態線程模型對應的執行器ServiceExecutorAdaptive  
35.            ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorAdaptive>(  
36.                ctx, transportLayerASIO->getIOContext()));  
37.         } else if (config->serviceExecutor == "synchronous") { //同步方式  
38.            //構造一個連接一個線程模型對應的執行器ServiceExecutorSynchronous  
39.            ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorSynchronous>(ctx));  
40.         }  
41.         //transportLayerASIO轉換爲transportLayer類  
42.         transportLayer = std::move(transportLayerASIO);  
43.    }   
44.   //transportLayer轉存到對應retVector數組中並返回  
45.    std::vector<std::unique_ptr<TransportLayer>> retVector;  
46.    retVector.emplace_back(std::move(transportLayer));  
47.    return stdx::make_unique<TransportLayerManager>(std::move(retVector));  
48.}  

        createWithConfig函數根據配置文件來肯定對應的TransportLayer,若是net.transportLayer配置爲asio,則選用TransportLayerASIO類來進行底層的網絡IO處理,若是配置爲legacy,則選用TransportLayerLegacy。legacy模式當前已淘汰,本文只分析asio模式實現。

        「asio模式包含兩種線程模型:adaptive(動態線程模型)和synchronous(同步線程模型)。adaptive模式線程設計採用動態線程方式,線程數和mongodb壓力直接相關,若是mongodb壓力大,則線程數增長;若是mongodb壓力變小,則線程數自動減小。同步線程模式也就是一個連接一個線程模型,線程數的多少和連接數的多少成正比,連接數越多則線程數也越大。

        Mongodb內核實現中經過opts.transportMode來標記asio的線程模型,這兩種模型對應標記以下:

線程模型

內核transportMode標記

說明

對應線程模型由那個類實現

adaptive

KAsynchronous(異步)

adaptive模型也能夠稱爲異步模型

ServiceExecutorAdaptive

synchronous

KSynchronous(同步)

synchronous模型也能夠稱爲同步模型

ServiceExecutorSynchronous

說明:


       adaptive線程模型被標記爲KAsynchronous,synchronous被標記爲KSynchronous是有緣由的,adaptive動態線程模型網絡IO處理藉助epoll異步實現,而synchronous一個連接一個線程模型網絡IO處理是同步讀寫操做。Mongodb網絡線程模型具體實現及各類優缺點能夠參考:Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計

2.1.2 TransportLayerASIO類核心代碼實現

     TransportLayerASIO類核心成員及接口以下:

1.class TransportLayerASIO final : public TransportLayer {  
2.    //如下四個接口主要和套接字數據讀寫相關  
3.    Ticket sourceMessage(...);  
4.    Ticket sinkMessage(...);  
5.    Status wait(Ticket&& ticket);  
6.    void asyncWait(Ticket&& ticket, TicketCallback callback);  
7.    void end(const SessionHandle& session);  
8.    //新連接處理  
9.    void _acceptConnection(GenericAcceptor& acceptor);  
10.      
11.    //adaptive線程模型網絡IO上下文處理  
12.    std::shared_ptr<asio::io_context> _workerIOContext;   
13.    //accept接收客戶端連接對應的IO上下文  
14.    std::unique_ptr<asio::io_context> _acceptorIOContext;    
15.    //bindIp配置中的ip地址列表,用於bind監聽,accept客戶端請求  
16.    std::vector<std::pair<SockAddr, GenericAcceptor>> _acceptors;  
17.    //listener線程負責接收客戶端新連接  
18.    stdx::thread _listenerThread;  
19.    //服務類型,也就是本實例是mongos仍是mongod  
20.    //mongos對應ServiceEntryPointMongod,mongod對應ServiceEntryPointMongos  
21.    ServiceEntryPoint* const _sep = nullptr;  
22.    //當前運行狀態  
23.    AtomicWord<bool> _running{false};  
24.    //listener處理相關的配置信息  
25.    Options _listenerOptions;  
26.}  

       從上面的類結構能夠看出,該類主要經過listenerThread線程完成bind綁定及listen監聽操做,同時部分接口實現新鏈接上的數據讀寫。套接字初始化代碼實現以下:

1.Status TransportLayerASIO::setup() {  
2.    std::vector<std::string> listenAddrs;  
3.    //若是沒有配置bindIp,則默認監聽"127.0.0.1:27017"
4.    if (_listenerOptions.ipList.empty()) {  
5.        listenAddrs = {"127.0.0.1"};  
6.    } else {  
7.        //配置文件中的bindIp:1.1.1.1,2.2.2.2,以逗號分隔符獲取ip列表存入ipList  
8.        boost::split(listenAddrs, _listenerOptions.ipList, boost::is_any_of(","), boost::token_compress_on);  
9.    }  
10.    //遍歷ip地址列表  
11.    for (auto& ip : listenAddrs) {  
12.        //根據IP和端口構造對應SockAddr結構  
13.        const auto addrs = SockAddr::createAll(  
14.            ip, _listenerOptions.port, _listenerOptions.enableIPv6 ? AF_UNSPEC : AF_INET);  
15.        ......  
16.        //根據addr構造endpoint  
17.        asio::generic::stream_protocol::endpoint endpoint(addr.raw(), addr.addressSize);  
18.        //_acceptorIOContext和_acceptors關聯  
19.        GenericAcceptor acceptor(*_acceptorIOContext);  
20.        //epoll註冊,也就是fd和epoll關聯  
21.        //basic_socket_acceptor::open  
22.        acceptor.open(endpoint.protocol());   
23.         //SO_REUSEADDR配置 basic_socket_acceptor::set_option  
24.        acceptor.set_option(GenericAcceptor::reuse_address(true));  
25.        //非阻塞設置 basic_socket_acceptor::non_blocking  
26.        acceptor.non_blocking(true, ec);    
27.        //bind綁定    
28.        acceptor.bind(endpoint, ec);   
29.        if (ec) {  
30.            return errorCodeToStatus(ec);  
31.        }  
32.    }  
33.}  

         從上面的分析能夠看出,代碼實現首先解析出配置文件中bindIP中的ip:port列表,而後遍歷列表綁定全部服務端須要監聽的ip:port,每一個ip:port對應一個GenericAcceptor ,全部acceptor和全局accept IO上下文_acceptorIOContext關聯,同時bind()綁定全部ip:port。

        Bind()綁定全部配置文件中的Ip:port後,而後經過TransportLayerASIO::start()完成後續處理,該接口代碼實現以下:

1.//_initAndListen中調用執行   
2.Status TransportLayerASIO::start() { //listen線程處理  
3.    ......  
4.    //這裏專門起一個線程作listen相關的accept事件處理  
5.    _listenerThread = stdx::thread([this] {  
6.        //修改線程名  
7.        setThreadName("listener");   
8.        //該函數中循環處理accept事件  
9.        while (_running.load()) {  
10.            asio::io_context::work work(*_acceptorIOContext);   
11.            try {  
12.                //accept事件調度處理  
13.                 _acceptorIOContext->run();    
14.            } catch (...) { //異常處理  
15.                severe() << "Uncaught exception in the listener: " << exceptionToStatus();  
16.                fassertFailed(40491);  
17.            }  
18.        }  
19.    });   
20.   遍歷_acceptors,進行listen監聽處理  
21.   for (auto& acceptor : _acceptors) {   
22.        acceptor.second.listen(serverGlobalParams.listenBacklog);  
23.        //異步accept回調註冊在該函數中  
24.        _acceptConnection(acceptor.second);       
25.    }  
26.}

        從上面的TransportLayerASIO::start()接口能夠看出,mongodb特意建立了一個listener線程用於客戶端accept事件處理,而後藉助ASIO網絡庫的_acceptorIOContext->run()接口來調度,當有新連接到來的時候,就會執行相應的accept回調處理,accept回調註冊到io_context的流程由acceptConnection()完成,該接口核心源碼實現以下:

1.//accept新鏈接到來的回調註冊 
2.void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {  
3.      //新連接到來時候的回調函數,服務端接收到新鏈接都會執行該回調
4.    //注意這裏面是遞歸執行,保證全部accept事件都會一次處理完畢
5.    auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable {  
6.        if (!_running.load())  
7.            return;  
8.  
9.        ......  
10.        //每一個新的連接都會new一個新的ASIOSession  
11.        std::shared_ptr<ASIOSession> session(new ASIOSession(this, std::move(peerSocket)));  
12.        //新的連接處理ServiceEntryPointImpl::startSession,  
13.        //和ServiceEntryPointImpl服務入口點模塊關聯起來  
14.        _sep->startSession(std::move(session));  
15.        //遞歸,直處處理完全部的網絡accept事件  
16.        _acceptConnection(acceptor);   
17.    };  
18.    //accept新鏈接到來後服務端的回調處理在這裏註冊  
19.    acceptor.async_accept(*_workerIOContext, std::move(acceptCb));  
20.}  

       TransportLayerASIO::_acceptConnection的新鏈接處理過程藉助ASIO庫實現,經過acceptor.async_accept實現全部監聽的acceptor回調異步註冊。

      當服務端接收到客戶端新鏈接事件通知後,會觸發執行acceptCb()回調,該回調中底層ASIO庫經過epoll_wait獲取到全部的accept事件,每獲取到一個accept事件就表明一個新的客戶端連接,而後調用ServiceEntryPointImpl::startSession()接口處理這個新的連接事件,整個過程遞歸執行,保證一次能夠處理全部的客戶端accept請求信息。

    每一個連接都會構造一個惟一的session信息,該session就表明一個惟一的新鏈接,連接和session一一對應。此外,最終會調用ServiceEntryPointImpl::startSession()進行真正的accept()處理,從而獲取到一個新的連接。

         注意TransportLayerASIO::_acceptConnection()中實現了TransportLayerASIO類和ServiceEntryPointImpl類的關聯,這兩個類在該接口實現了關聯。

       此外,從前面的TransportLayerASIO類結構中能夠看出,該類還包含以下四個接口:sourceMessage(...)、sinkMessage(...)、wait(Ticket&& ticket)、asyncWait(Ticket&& ticket, TicketCallback callback),這四個接口入參都和Ticket數據分發子模塊相關聯,具體核心代碼實現以下:

1.//根據asioSession, expiration, message三個信息構造數據接收類ASIOSourceTicket  
2.Ticket TransportLayerASIO::sourceMessage(...) {  
3.    ......  
4.    auto asioSession = checked_pointer_cast<ASIOSession>(session);  
5.    //根據asioSession, expiration, message三個信息構造ASIOSourceTicket  
6.    auto ticket = stdx::make_unique<ASIOSourceTicket>(asioSession, expiration, message);  
7.    return {this, std::move(ticket)};  
8.}  
9.  
10.//根據asioSession, expiration, message三個信息構造數據發送類ASIOSinkTicket  
11.Ticket TransportLayerASIO::sinkMessage(...) {  
12.    auto asioSession = checked_pointer_cast<ASIOSession>(session);  
13.    auto ticket = stdx::make_unique<ASIOSinkTicket>(asioSession, expiration, message);  
14.    return {this, std::move(ticket)};  
15.}  
16.  
17.//同步接收或者發送,最終調用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill  
18.Status TransportLayerASIO::wait(Ticket&& ticket) {  
19.    //獲取對應Ticket,接收對應ASIOSourceTicket,發送對應ASIOSinkTicket  
20.    auto ownedASIOTicket = getOwnedTicketImpl(std::move(ticket));  
21.    auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());  
22.    ......  
23.    //調用對應fill接口 同步接收ASIOSourceTicket::fill 或者 同步發送ASIOSinkTicket::fill  
24.    asioTicket->fill(true, [&waitStatus](Status result) { waitStatus = result; });  
25.    return waitStatus;  
26.}  
27.//異步接收或者發送,最終調用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill  
28.void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) {  
29.    //獲取對應數據收發的Ticket,接收對應ASIOSourceTicket,發送對應ASIOSinkTicket  
30.    auto ownedASIOTicket = std::shared_ptr<TicketImpl>(getOwnedTicketImpl(std::move(ticket)));  
31.    auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());  
32.  
33.   //調用對應ASIOTicket::fill  
34.    asioTicket->fill(  
35.        false,   [ callback = std::move(callback),  
36.        ownedASIOTicket = std::move(ownedASIOTicket) ](Status status) { callback(status); });  
}

上面四個接口中的前兩個接口主要經過Session, expiration, message這三個參數來獲取對應的Ticket 信息,實際上mongodb內核實現中把接收數據的Ticket和發送數據的Ticket分別用不一樣的繼承類ASIOSourceTicket和ASIOSinkTicket來區分,三個參數的做用以下表所示:

參數名

做用

Session

表明一個連接,一個session和一個連接意義對應

expiration

數據收發超時相關設置

message

數據內容

數據收發包括同步收發和異步收發,同步收發經過TransportLayerASIO::wait()實現,異步收發經過TransportLayerASIO::asyncWait()實現。

注意:以上四個接口把TransportLayerASIO類和Ticket 數據收發類的關聯。    

2.2 總結

        service_entry_point服務入口點子模塊主要負責以下功能:新鏈接處理、Session會話管理、接收到一個完整報文後的回調處理(含報文解析、認證、引擎層處理等)。

類命

接口名

功能說明

transport_layer_manager

TransportLayerManager::createWithConfig()

根據配置文件選擇不一樣的TransportLayer和serviceExecutor

TransportLayerManager::setup()

已廢棄

TransportLayerManager::start()

已廢棄

TransportLayerASIO

TransportLayerASIO::Options::Options()

Net相關配置

TransportLayerASIO::TransportLayerASIO()

初始化構造

TransportLayerASIO::sourceMessage()

獲取數據接收的Ticket

TransportLayerASIO::sinkMessage()

獲取數據發送的Ticket

TransportLayerASIO::wait()

同步發送接收或者發送數據

TransportLayerASIO::asyncWait()

異步方式發送接收或者發送數據

TransportLayerASIO::end()

關閉連接

TransportLayerASIO::setup()

建立套接字並bind綁定

TransportLayerASIO::start()

建立listener線程作監聽操做,同時註冊accept回調

TransportLayerASIO::shutdown()

shutdown處理及資源回收

TransportLayerASIO::_acceptConnection()

Accept回調註冊

       Transport_layer_manager中初始化TransportLayer和serviceExecutor,net.TransportLayer配置能夠爲legacy和asio,其中legacy已經淘汰,當前內核只支持asio模式。asio配置對應的TransportLayer由TransportLayerASIO實現,對應的serviceExecutor線程模型能夠是adaptive動態線程模型,也能夠是synchronous同步線程模型。

       套接字建立、bind()綁定、listen()監聽、accept事件註冊等都由本類實現,同時數據分發Ticket模塊也與本模塊關聯,一塊兒配合完成整個後續Ticket模塊模塊的同步及異步數據讀寫流程。此外,本模塊還經過ServiceEntryPoint服務入口子模塊聯動,保證了套接字初始化、accept事件註冊完成後,服務入口子模塊能有序的進行新鏈接接收處理。

        接下來繼續分析本模塊相關聯的ServiceEntryPoint服務入口子模塊和Ticket數據分發子模塊實現。

3. service_entry_point服務入口點子模塊

        service_entry_point服務入口點子模塊主要負責以下功能:新鏈接處理、Session會話管理、接收到一個完整報文後的回調處理(含報文解析、認證、引擎層處理等)。

       該模塊的源碼實現主要包含如下幾個文件:

     service_entry_point開頭的代碼文件都和本模塊相關,其中service_entry_point_utils*負責工做線程建立,service_entry_point_impl*完成新連接回調處理及sesseion會話管理。

3.1 核心源碼實現

      服務入口子模塊相關代碼實現比較簡潔,主要由ServiceEntryPointImpl類和service_entry_point_utils中的線程建立函數組成。

3.1.1 ServiceEntryPointImpl類核心代碼實現

     ServiceEntryPointImpl類主要成員和接口以下:

1.class ServiceEntryPointImpl : public ServiceEntryPoint {  
2.    MONGO_DISALLOW_COPYING(ServiceEntryPointImpl);  
3.public:  
4.    //構造函數  
5.    explicit ServiceEntryPointImpl(ServiceContext* svcCtx);     
6.    //如下三個接口進行session會話處理控制  
7.    void startSession(transport::SessionHandle session) final;  
8.    void endAllSessions(transport::Session::TagMask tags) final;  
9.    bool shutdown(Milliseconds timeout) final;  
10.    //session會話統計  
11.    Stats sessionStats() const final;  
12.    ......  
13.private:  
14.    //該list結構管理全部的ServiceStateMachine信息  
15.    using SSMList = stdx::list<std::shared_ptr<ServiceStateMachine>>;  
16.    //SSMList對應的迭代器  
17.    using SSMListIterator = SSMList::iterator;  
18.    //賦值ServiceEntryPointImpl::ServiceEntryPointImpl  
19.    //對應ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)類  
20.    ServiceContext* const _svcCtx;   
21.    //該成員變量在代碼中沒有使用  
22.    AtomicWord<std::size_t> _nWorkers;  
23.    //鎖  
24.    mutable stdx::mutex _sessionsMutex;  
25.    //一個新連接對應一個ssm保存到ServiceEntryPointImpl._sessions中  
26.    SSMList _sessions;  
27.    //最大連接數控制  
28.    size_t _maxNumConnections{DEFAULT_MAX_CONN};  
29.    //當前的總連接數,不包括關閉的連接  
30.    AtomicWord<size_t> _currentConnections{0};  
31.    //全部的連接,包括已經關閉的連接  
32.    AtomicWord<size_t> _createdConnections{0};  
33.};  

       該類的幾個接口主要是session相關控制處理,該類中的變量成員說明以下:

成員名

功能說明

_svcCtx

服務上下文,mongod實例對應ServiceContextMongoD類,mongos代理實例對應ServiceContextNoop類

_sessionsMutex

_sessions鎖保護

_sessions

一個新連接對應一個ssm保存到ServiceEntryPointImpl._sessions中

_maxNumConnections

最大連接數,默認1000000,能夠經過maxConns配置

_currentConnections

當前的在線連接數,不包括之前關閉的連接

_createdConnections

全部的連接,包括已經關閉的連接

       ServiceEntryPointImpl類最核心的startSession()接口負責每一個新鏈接到來後的內部回調處理,具體實現以下:

1.//新連接到來後的回調處理  
2.void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {   
3.    //獲取該新鏈接對應的服務端和客戶端地址信息  
4.    const auto& remoteAddr = session->remote().sockAddr();  
5.    const auto& localAddr = session->local().sockAddr();  
6.    //服務端和客戶端地址記錄到session中  
7.    auto restrictionEnvironment =  stdx::make_unique<RestrictionEnvironment>(*remoteAddr, *localAddr);  
8.    RestrictionEnvironment::set(session, std::move(restrictionEnvironment));  
9.    ......  
10.  
11.    //獲取transportMode,kAsynchronous或者kSynchronous  
12.    auto transportMode = _svcCtx->getServiceExecutor()->transportMode();  
13.    //構造ssm  
14.    auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);  
15.    {//該{}體內實現連接計數,同時把ssm統一添加到_sessions列表管理  
16.        stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);  
17.        connectionCount = _sessions.size() + 1; //鏈接數自增  
18.        if (connectionCount <= _maxNumConnections) {  
19.            //新來的連接對應的session保存到_sessions鏈表    
20.            //一個新連接對應一個ssm保存到ServiceEntryPointImpl._sessions中  
21.            ssmIt = _sessions.emplace(_sessions.begin(), ssm);  
22.            _currentConnections.store(connectionCount);  
23.            _createdConnections.addAndFetch(1);  
24.        }  
25.    }  
26.    //連接超限,直接退出  
27.    if (connectionCount > _maxNumConnections) {   
28.        ......  
29.        return;  
30.    }  
31.    //連接關閉的回收處理  
32.    ssm->setCleanupHook([ this, ssmIt, session = std::move(session) ] {  
33.         ......  
34.    });  
35.    //獲取transport模式爲同步模式仍是異步模式,也就是adaptive線程模式仍是synchronous線程模式  
36.    auto ownership = ServiceStateMachine::Ownership::kOwned;  
37.    if (transportMode == transport::Mode::kSynchronous) {  
38.        ownership = ServiceStateMachine::Ownership::kStatic;  
39.    }  
40.    //ServiceStateMachine::start,這裏和服務狀態機模塊銜接起來  
41.    ssm->start(ownership);  
42.}  

     該接口拿到該連接對應的服務端和客戶端地址後,記錄到該連接對應session中,而後根據該session、transportMode、_svcCtx構建一個服務狀態機ssm(ServiceStateMachine)。一個新連接對應一個惟一session,一個session對應一個惟一的服務狀態機ssm,這三者保持惟一的一對一關係。

     最終,startSession()讓服務入口子模塊、session會話子模塊、ssm狀態機子模塊關聯起來。   

3.1.2  service_entry_point_utils核心代碼實現

        service_entry_point_utils源碼文件只有launchServiceWorkerThread一個接口,該接口主要負責工做線程建立,並設置每一個工做線程的線程棧大小,若是系統默認棧大於1M,則每一個工做線程的線程棧大小設置爲1M,若是系統棧大小小於1M,則以系統堆棧大小爲準,同時warning打印提示。該函數實現以下:

1.Status launchServiceWorkerThread(stdx::function<void()> task) {  
2.        static const size_t kStackSize = 1024 * 1024;  //1M  
3.        struct rlimit limits;  
4.        //或者系統堆棧大小  
5.        invariant(getrlimit(RLIMIT_STACK, &limits) == 0);  
6.        //若是系統堆棧大小大於1M,則默認設置線程棧大小爲1M  
7.        if (limits.rlim_cur > kStackSize) {  
8.            size_t stackSizeToSet = kStackSize;  
9.            int failed = pthread_attr_setstacksize(&attrs, stackSizeToSet);  
10.            if (failed) {  
11.                const auto ewd = errnoWithDescription(failed);  
12.                warning() << "pthread_attr_setstacksize failed: " << ewd;  
13.            }  
14.        } else if (limits.rlim_cur < 1024 * 1024) {  
15.            //若是系統棧大小小於1M,則已係統堆棧爲準,同時給出告警  
16.            warning() << "Stack size set to " << (limits.rlim_cur / 1024) << "KB. We suggest 1MB";  
17.        }}  
18.        ......  
19.        //task參數傳遞給新建線程  
20.        auto ctx = stdx::make_unique<stdx::function<void()>>(std::move(task));  
21.        int failed = pthread_create(&thread, &attrs, runFunc, ctx.get());   
22.        ......  
23.} 

3.2 總結

        service_entry_point服務入口點子模塊主要負責新鏈接後的回調處理及工做線程建立,該模塊和後續的session會話模塊、SSM服務狀態機模塊銜接配合,完成數據收發的正常邏輯轉換處理。上面的分析只列出了服務入口點子模塊的核心接口實現,下表總結該模塊全部的接口功能:

接口

功能說明

ServiceEntryPointImpl

ServiceEntryPointImpl::ServiceEntryPointImpl()

構造初始化,最大連接數限制

ServiceEntryPointImpl

ServiceEntryPointImpl::startSession()

新連接的回調處理

ServiceEntryPointImpl

ServiceEntryPointImpl::endAllSessions()

Ssm服務狀態機及session會話回收處理

ServiceEntryPointImpl

ServiceEntryPointImpl::shutdown()

實例下線處理

ServiceEntryPointImpl

ServiceEntryPointImpl::sessionStats()

獲取連接統計信息

service_entry_point_utils

launchServiceWorkerThread

建立工做線程,同時限制每一個線程對應線程棧大小

3. Ticket數據收發子模塊

         Ticket數據收發子模塊主要功能以下:調用session子模塊進行底層asio庫處理、拆分數據接收和數據發送到兩個類、完整mongodb報文讀取 、接收或者發送mongodb報文後的回調處理。

3.1 ASIOTicket類核心代碼實現

        Ticket數據收發模塊相關實現主要由ASIOTicket類完成,該類結構以下:

 

1.//下面的ASIOSinkTicket和ASIOSourceTicket繼承該類,用於控制數據的發送和接收  
2.class TransportLayerASIO::ASIOTicket : public TicketImpl {  
3.public:  
4.    //初始化構造  
5.    explicit ASIOTicket(const ASIOSessionHandle& session, Date_t expiration);  
6.    //獲取sessionId  
7.    SessionId sessionId() const final {  
8.        return _sessionId;  
9.    }  
10.    //asio模式沒用,針對legacy模型  
11.    Date_t expiration() const final {  
12.        return _expiration;  
13.    }  
14.
15.    //如下四個接口用於數據收發相關處理  
16.    void fill(bool sync, TicketCallback&& cb);  
17.protected:  
18.    void finishFill(Status status);  
19.    bool isSync() const;  
20.    virtual void fillImpl() = 0;  
21.private:  
22.    //會話信息,一個連接一個session  
23.    std::weak_ptr<ASIOSession> _session;  
24.    //每一個session有一個惟一id  
25.    const SessionId _sessionId;  
26.    //asio模型沒用,針對legacy生效  
27.    const Date_t _expiration;  
28.    //數據發送或者接收成功後的回調處理  
29.    TicketCallback _fillCallback;  
30.    //同步方式仍是異步方式進行數據處理,默認異步  
31.    bool _fillSync;  
32.};  

           該類保護多個成員變量,這些成員變量功能說明以下:

成員名

做用

_session

Session會話信息,一個連接對應一個session

_sessionId

每一個session都有一個對應的惟一ID

_expiration

Legacy模式使用,當前都是用asio,該成員已淘汰

_fillCallback

發送或者接收一個完整mongodb報文後的回調處理

_fillSync

同步仍是異步方式收發數據。adaptive線程模型爲異步,synchronous線程模型爲同步讀寫方式

        mongodb在具體實現上,數據接收和數據發送分開實現,分別是數據接收類ASIOSourceTicket和數據發送類ASIOSinkTicket,這兩個類都繼承自ASIOTicket類,這兩個類的主要結構以下:

1.//數據接收的ticket  
2.class TransportLayerASIO::ASIOSourceTicket : public TransportLayerASIO::ASIOTicket {  
3.public:  
4.    //初始化構造  
5.    ASIOSourceTicket(const ASIOSessionHandle& session, Date_t expiration, Message* msg);  
6.protected:  
7.    //數據接收Impl  
8.    void fillImpl() final;  
9.private:  
10.    //接收到mongodb頭部數據後的回調處理  
11.    void _headerCallback(const std::error_code& ec, size_t size);  
12.    //接收到mongodb包體數據後的回調處理    
13.    void _bodyCallback(const std::error_code& ec, size_t size);  
14.  
15.    //存儲數據的buffer,網絡IO讀取到的原始數據內容  
16.    SharedBuffer _buffer;  
17.    //數據Message管理,數據來源爲_buffer  
18.    Message* _target;  
19.};  
1.
2.  
20.//數據發送的ticket  
21.class TransportLayerASIO::ASIOSinkTicket : public TransportLayerASIO::ASIOTicket {  
22. public:  
23.    //初始化構造  
24.    ASIOSinkTicket(const ASIOSessionHandle& session, Date_t expiration, const Message& msg);  
25.protected:  
26.    //數據發送Impl  
27.    void fillImpl() final;  
28.private:  
29.    //發送數據完成的回調處理  
30.    void _sinkCallback(const std::error_code& ec, size_t size);  
31.    //須要發送的數據message信息  
32.    Message _msgToSend;  
33.}; 

       從上面的代碼實現能夠看出,ASIOSinkTicket ASIOSourceTicket 類接口及成員實現幾乎意義,只是具體的實現方法不一樣,下面對ASIOSourceTicket和ASIOSinkTicket 相關核心代碼實現進行分析。

3.1.2 ASIOSourceTicket 數據接收核心代碼實現

      數據接收過程核心代碼以下:

1.//數據接收的fillImpl接口實現  
2.void TransportLayerASIO::ASIOSourceTicket::fillImpl() {    
3.    //獲取對應session信息  
4.    auto session = getSession();  
5.    if (!session)  
6.        return;  
7.    //收到讀取mongodb頭部數據,頭部數據長度是固定的kHeaderSize字節  
8.    const auto initBufSize = kHeaderSize;  
9.    _buffer = SharedBuffer::allocate(initBufSize);  
10.  
11.    //調用TransportLayerASIO::ASIOSession::read讀取底層數據存入_buffer  
12.    //讀完頭部數據後執行對應的_headerCallback回調函數  
13.    session->read(isSync(),  
14.                  asio::buffer(_buffer.get(), initBufSize), //先讀取頭部字段出來  
15.                  [this](const std::error_code& ec, size_t size) { _headerCallback(ec, size); });  
16.}  
17.  
18.//讀取到mongodb header頭部信息後的回調處理  
19.void TransportLayerASIO::ASIOSourceTicket::_headerCallback(const std::error_code& ec, size_t size) {  
20.    ......  
21.    //獲取session信息  
22.    auto session = getSession();  
23.    if (!session)  
24.        return;  
25.    //從_buffer中獲取頭部信息  
26.    MSGHEADER::View headerView(_buffer.get());  
27.    //獲取message長度  
28.    auto msgLen = static_cast<size_t>(headerView.getMessageLength());  
29.    //長度過小或者太大,直接報錯  
30.    if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) {  
31.        .......  
32.        return;  
33.    }  
34.    ....  
35.   //內容還不夠一個mongo協議報文,繼續讀取body長度字節的數據,讀取完畢後開始body處理  
36.   //注意這裏是realloc,保證頭部和body在同一個buffer中  
37.    _buffer.realloc(msgLen);   
38.    MsgData::View msgView(_buffer.get());  
39.  
40.    //調用底層TransportLayerASIO::ASIOSession::read讀取數據body   
41.    session->read(isSync(),  
42.      //數據讀取到該buffer                  
43.      asio::buffer(msgView.data(), msgView.dataLen()),  
44.      //讀取成功後的回調處理  
45.      [this](const std::error_code& ec, size_t size) { _bodyCallback(ec, size); });  
46.}  
47.  
48.//_headerCallback對header讀取後解析header頭部獲取到對應的msg長度,而後開始body部分處理  
49.void TransportLayerASIO::ASIOSourceTicket::_bodyCallback(const std::error_code& ec, size_t size) {  
50.    ......  
51.    //buffer轉存到_target中  
52.    _target->setData(std::move(_buffer));  
53.    //流量統計  
54.    networkCounter.hitPhysicalIn(_target->size());  
55.    //TransportLayerASIO::ASIOTicket::finishFill    
56.    finishFill(Status::OK()); //包體內容讀完後,開始下一階段的處理    
57.    //報文讀取完後的下一階段就是報文內容處理,開始走ServiceStateMachine::_processMessage  
58.}  

        Mongodb協議由msg header + msg body組成,一個完整的mongodb報文內容格式以下:

        上圖所示各個字段及body內容部分功能說明以下表:

Header or body

字段名

功能說明

msg header

messageLength

整個message長度,包括header長度和body長度

msg header

requestID

該請求id信息

msg header

responseTo

應答id

msg header

opCode

操做類型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE等

msg body

Body

不一樣opCode對應的包體內容

         ASIOSourceTicket類的幾個核心接口都是圍繞這一原則展開,整個mongodb數據接收流程以下:

  1. 讀取mongodb頭部header數據,解析出header中的messageLength字段。
  2. 檢查messageLength字段是否在指定的合理範圍,該字段不能小於Header整個頭部大小,也不能超過MaxMessageSizeBytes最大長度。
  3. Header len檢查經過,說明讀取header數據完成,因而執行_headerCallback回調。
  4. realloc更多的空間來存儲body內容。
  5. 繼續讀取body len長度數據,讀取body完成後,執行_bodyCallback回調處理。

3.1.3 ASIOSinkTicket數據發送類核心代碼實現

     ASIOSinkTicket發送類相比接收類,沒有數據解析相關的流程,所以實現過程會更加簡單,具體源碼實現以下:

1.//發送數據成功後的回調處理  
2.void TransportLayerASIO::ASIOSinkTicket::_sinkCallback(const std::error_code& ec, size_t size) {  
3.    //發送的網絡字節數統計  
4.    networkCounter.hitPhysicalOut(_msgToSend.size());   
5.    //執行SSM中對應的狀態流程  
6.    finishFill(ec ? errorCodeToStatus(ec) : Status::OK());  
7.}  
8.  
9.//發送數據的fillImpl  
10.void TransportLayerASIO::ASIOSinkTicket::fillImpl() {  
11.    //獲取對應session  
12.    auto session = getSession();  
13.    if (!session)  
14.        return;  
15.  
16.    //調用底層TransportLayerASIO::ASIOSession::write發送數據,發送成功後執行_sinkCallback回調   
17.    session->write(isSync(),  
18.       asio::buffer(_msgToSend.buf(), _msgToSend.size()),  
19.       //發送數據成功後的callback回調  
20.       [this](const std::error_code& ec, size_t size) { _sinkCallback(ec, size); });  
21.}  

3.2 總結

        從上面的分析能夠看出,Ticket數據收發模塊主要調用session會話模塊來進行底層數據的讀寫、解析,當讀取或者發送一個完整的mongodb報文後最終交由SSM服務狀態機模塊調度處理。

        ticket模塊主要接口功能總結以下表所示:

類命

接口名

功能說明

ASIOTicket

ASIOTicket::getSession()

獲取session信息

ASIOTicket

ASIOTicket::isSync()

判斷是同步收發仍是異步收發

ASIOTicket

ASIOTicket::finishFill()

執行_fillCallback回調

ASIOTicket

ASIOTicket::fill()

給_fillCallback賦值

ASIOTicket

ASIOTicket::ASIOTicket()

ASIOTicket構造初始化

ASIOSourceTicket

ASIOSourceTicket::ASIOSourceTicket()

ASIOSourceTicket初始化

ASIOSourceTicket

ASIOSourceTicket::_bodyCallback()

接收到mesg header+body後的回調處理

ASIOSourceTicket

ASIOSourceTicket::_headerCallback

接收到msg header後的回調處理

ASIOSinkTicket

ASIOSinkTicket::ASIOSinkTicket()

ASIOSinkTicket初始化構造

ASIOSinkTicket

ASIOSourceTicket::fillImpl()

發送指定msg數據,發送完成後執行回調

ASIOSinkTicket

ASIOSinkTicket::_sinkCallback

發送msg成功後的回調處理

       前面的分析也能夠看出,Ticket數據收發模塊會調用session處理模塊來進行真正的數據讀寫,同時接收或者發送完一個完整mongodb報文後的後續回調處理講交由SSM服務狀態機模塊處理。

4. Session會話子模塊

        Session會話模塊功能主要以下:負責記錄HostAndPort、和底層asio庫直接互動,實現數據的同步或者異步收發。一個新鏈接fd對應一個惟一的session,對fd的操做直接映射爲session操做。Session會話子模塊主要代碼實現相關文件以下:

4.1 session會話子模塊核心代碼實現

1.class TransportLayerASIO::ASIOSession : public Session {  
2.    //初始化構造  
3.    ASIOSession(TransportLayerASIO* tl, GenericSocket socket);  
4.    //獲取本session使用的tl  
5.    TransportLayer* getTransportLayer();  
6.    //如下四個接口套接字相關,本端/對端地址獲取,獲取fd,關閉fd等  
7.    const HostAndPort& remote();  
8.    const HostAndPort& local();  
9.    GenericSocket& getSocket();  
10.    void shutdown();  
11.  
12.    //如下四個接口調用asio網絡庫實現數據的同步收發和異步收發  
13.    void read(...)  
14.    void write(...)  
15.    void opportunisticRead(...)  
16.    void opportunisticWrite(...)  
17.  
18.    //遠端地址信息  
19.    HostAndPort _remote;  
20.    //本段地址信息  
21.    HostAndPort _local;  
22.    //賦值見TransportLayerASIO::_acceptConnection  
23.    //也就是fd,一個新鏈接對應一個_socket  
24.    GenericSocket _socket;  
25.    //SSL相關不作分析,  
26.#ifdef MONGO_CONFIG_SSL  
27.    boost::optional<asio::ssl::stream<decltype(_socket)>> _sslSocket;  
28.    bool _ranHandshake = false;  
29.#endif  
30.  
31.    //本套接字對應的tl,賦值建TransportLayerASIO::_acceptConnection(...)  
32.    TransportLayerASIO* const _tl;  
33.} 

       該類最核心的三個接口ASIOSession(...)、opportunisticRead(...)、opportunisticWrite(..)分別完成套接字處理、調用asio庫接口實現底層數據讀和底層數據寫。這三個核心接口源碼實現以下:

1.//初始化構造 TransportLayerASIO::_acceptConnection調用  
2.ASIOSession(TransportLayerASIO* tl, GenericSocket socket)  
3.    //fd描述符及TL初始化賦值  
4.    : _socket(std::move(socket)), _tl(tl) {  
5.    std::error_code ec;  
6.  
7.    //異步方式設置爲非阻塞讀  
8.    _socket.non_blocking(_tl->_listenerOptions.transportMode == Mode::kAsynchronous, ec);  
9.    fassert(40490, ec.value() == 0);  
10.  
11.    //獲取套接字的family  
12.    auto family = endpointToSockAddr(_socket.local_endpoint()).getType();  
13.    //知足AF_INET
14.    if (family == AF_INET || family == AF_INET6) {  
15.        //no_delay keep_alive套接字系統參數設置  
16.        _socket.set_option(asio::ip::tcp::no_delay(true));  
17.        _socket.set_option(asio::socket_base::keep_alive(true));  
18.        //KeepAlive系統參數設置  
19.        setSocketKeepAliveParams(_socket.native_handle());  
20.    }  
21.  
22.    //獲取本端和對端地址  
23.    _local = endpointToHostAndPort(_socket.local_endpoint());  
24.    _remote = endpointToHostAndPort(_socket.remote_endpoint(ec));  
25.    if (ec) {  
26.        LOG(3) << "Unable to get remote endpoint address: " << ec.message();  
27.    }  
28.}  

        該類初始化的時候完成新鏈接_socket相關的初始化設置,包括阻塞讀寫仍是非阻塞讀寫。若是是同步線程模型(一個連接一個線程),則讀寫方式位阻塞讀寫;若是是異步線程模型(adaptive動態線程模型),則調用asio網絡庫接口實現異步讀寫。

       此外,該連接_socket對應的客戶端ip:port和服務端ip:port也在該初始化類中獲取,最終保存到本session的_remote和_local成員中。

       數據讀取核心代碼實現以下:

1.//讀取指定長度數據,而後執行handler回調  
2.void opportunisticRead(...) {  
3.    std::error_code ec;  
4.    //若是是異步線程模型,在ASIOSession構造初始化的時候會設置non_blocking非阻塞模式  
5.    //異步線程模型這裏其實是非阻塞讀取,若是是同步線程模型,則沒有non_blocking設置,也就是阻塞讀取  
6.    auto size = asio::read(stream, buffers, ec);    
7.  
8.    //若是是異步讀,而且read返回would_block或者try_again說明指定長度的數據尚未讀取完畢  
9.    if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) {  
10.        //buffers有大小size,實際讀最多讀size字節  
11.        MutableBufferSequence asyncBuffers(buffers);  
12.        if (size > 0) {  
13.            asyncBuffers += size; //buffer offset向後移動  
14.        }  
15.  
16.        //繼續異步方式讀取數據,讀取到指定長度數據後執行handler回調處理  
17.        asio::async_read(stream, asyncBuffers, std::forward<CompleteHandler>(handler));  
18.    } else {   
19.        //阻塞方式讀取read返回後能夠保證讀取到了size字節長度的數據  
20.        //直接read獲取到size字節數據,則直接執行handler   
21.        handler(ec, size);  
22.    }  
23.}  

        opportunisticRead首先調用asio::read(stream, buffers, ec)讀取buffers對應size長度的數據,buffers空間大小就是須要讀取的數據size大小。若是是同步線程模型,則這裏爲阻塞式讀取,直到讀到size字節纔會返回;若是是異步線程模型,這這裏是非阻塞讀取,非阻塞讀取當內核網絡協議棧數據讀取完畢後,若是尚未讀到size字節,則繼續進行async_read異步讀取。

        當buffers指定的size字節所有讀取完整後,不論是同步模式仍是異步模式,最終都會執行handler回調,開始後續的數據解析及處理流程。

       發送數據核心代碼實現以下:

1.//發送數據  
2.void opportunisticWrite(...) {  
3.    std::error_code ec;  
4.    //若是是同步模式,則阻塞寫,直到所有寫成功。異步模式則非阻塞寫  
5.    auto size = asio::write(stream, buffers, ec);   
6.  
7.    //異步寫若是返回try_again說明數據尚未發送完,則繼續異步寫發送  
8.    if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) {  
9.        ConstBufferSequence asyncBuffers(buffers);  
10.        if (size > 0) {  //buffer中數據指針偏移計數
11.            asyncBuffers += size;  
12.        }  
13.        //異步寫發送完成,執行handler回調  
14.        asio::async_write(stream, asyncBuffers, std::forward<CompleteHandler>(handler));  
15.    } else {  
16.        //同步寫成功,則直接執行handler回調處理  
17.        handler(ec, size);  
18.    }  
19.}  

       數據發送流程和數據接收流程相似,也分位同步模式發送和異步模式發送,同步模式發送爲阻塞式寫,只有當全部數據經過asio::write()發送成功後才返回;異步模式發送爲非阻塞寫,asio::write()不必定所有發送出去,所以須要再次調用asio庫的asio::async_write()進行異步發送。

       不論是同步模式仍是異步模式發送數據,最終數據發送成功後,都會調用handler()回調來執行後續的流程。

4.2 總結

       從上面的代碼分析能夠看出,session會話模塊最終直接和asio網絡庫交互實現數據的讀寫操做。該模塊核心接口功能總結以下表:

類名

接口名

功能說明

ASIOSession

ASIOSession(...)

新鏈接fd相關處理,如是否非阻塞、delay設置、keep_alive設置、兩端地址獲取等

ASIOSession

getTransportLayer()

獲取對應TL

ASIOSession

remote()

獲取該連接對應的對端Ip:port信息

ASIOSession

local()

獲取該連接對應的對端Ip:port信息

ASIOSession

getSocket()

獲取該連接的fd

ASIOSession

shutdown()

fd回收處理

ASIOSession

opportunisticRead()

同步或者異步數據讀操做

ASIOSession

opportunisticWrite()

同步或者異步數據寫操做

5. 總結

       《Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計》一文對mongodb網絡傳輸模塊中的ASIO網絡庫實現、service_executor服務運行子模塊(即線程模型子模塊)進行了詳細的分析,加上本文的transport_layer套接字處理及傳輸層管理子模塊、session會話子模塊、Ticket數據收發子模塊、service_entry_point服務入口點子模塊。

        transport_layer套接字處理及傳輸層管理子模塊主要由transport_layer_manager和transport_layer_asio兩個核心類組成。分別完成net相關的配置文件初始化操做,套接字初始化及處理,最終transport_layer_asio的相應接口實現了和ticket數據分發子模塊、服務入口點子模塊的關聯。

      服務入口子模塊主要由ServiceEntryPointImpl類和service_entry_point_utils中的線程建立函數組成,實現新鏈接accept處理及控制。該模塊經過startSession()讓服務入口子模塊、session會話子模塊、ssm狀態機子模塊關聯起來。

       ticket數據收發子模塊主要功能以下:調用session子模塊進行底層asio庫處理、拆分數據接收和數據發送到兩個類、完整mongodb報文讀取 、接收或者發送mongodb報文後的回調處理, 回調處理由SSM服務狀態機模塊管理,當讀取或者發送一個完整的mongodb報文後最終交由SSM服務狀態機模塊調度處理。。

       Session會話模塊功能主要以下:負責記錄HostAndPort、和底層asio庫直接互動,實現數據的同步或者異步收發。一個新鏈接fd對應一個惟一的session,對fd的操做直接映射爲session操做。

      到這裏,整個mongodb網絡傳輸層模塊分析只差service_state_machine狀態機調度子模塊,狀態機調度子模塊相比本文分析的幾個子模塊更加複雜,所以將在下期《mongodb網絡傳輸層模塊源碼分析三》中單獨分析。

本文全部源碼註釋分析詳見以下連接:mongodb網絡傳輸模塊詳細源碼分析

相關文章
相關標籤/搜索