關於做者
前滴滴出行技術專家,現任OPPO文檔數據庫mongodb負責人,負責oppo千萬級峯值TPS/十萬億級數據量文檔數據庫mongodb內核研發及運維工做,一直專一於分佈式緩存、高性能服務端、數據庫、中間件等相關研發。後續持續分享《MongoDB內核源碼設計、性能優化、最佳運維實踐》,Github帳號地址:https://github.com/y123456yzgit
1. 說明
在以前的<<Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計>>一文中分析瞭如何閱讀百萬級大工程源碼、Asio網絡庫實現、transport傳輸層網絡模塊中線程模型實現,可是因爲篇幅緣由,傳輸層網絡模塊中的如下模塊實現原理沒有分析,本文降將繼續分析遺留的如下子模塊:github
- transport_layer套接字處理及傳輸層管理子模塊
- session會話子模塊
- Ticket數據收發子模塊
- service_entry_point服務入口點子模塊
- service_state_machine狀態機子模塊(該《模塊在網絡傳輸層模塊源碼實現三》中分析)
- service_executor線程模型子模塊(該《模塊在網絡傳輸層模塊源碼實現四》中分析)
2. transport_layer套接字處理及傳輸層管理子模塊mongodb
transport_layer套接字處理及傳輸層管理子模塊功能包括套接字相關初始化處理、結合asio庫實現異步accept處理、不一樣線程模型管理及初始化等,該模塊的源碼實現主要由如下幾個文件實現:數據庫
上圖是套接字處理及傳輸層管理子模塊源碼實現的相關文件,其中mock和test文件主要用於模擬測試等,因此真正核心的代碼實現只有下表的幾個文件,對應源碼文件功能說明以下表所示:數組
文件名緩存 |
功能性能優化 |
transport_layer.h網絡 transport_layer.cppsession |
該子模塊基類,經過該類把本模塊和Ticket數據分發模塊銜接起來,具體類實如今transport_layer_legacy.cpp和transport_layer_asio.cpp中運維 |
transport_layer_legacy.h transport_layer_legacy.cpp |
早期的傳輸模塊實現方式,如今已淘汰,不在分析 |
transport_layer_asio.h transport_layer_asio.cpp |
傳輸模塊Asio網絡IO處理實現,同時銜接ServiceEntryPoint服務入口模塊和Ticket數據分發模塊 |
2.1 核心代碼實現
該子模塊核心代碼主要由TransportLayerManager類和TransportLayerASIO類相關接口實現。
2.1.1 TransportLayerManager類核心代碼實現
TransportLayerManager類主要成員及接口以下:
1.//網絡會話連接,消息處理管理相關的類,在createWithConfig構造該類存入_tls 2.class TransportLayerManager final : public TransportLayer { 3. //如下四個接口真正實如今TransportLayerASIO類中具體實現 4. Ticket sourceMessage(...) override; 5. Ticket sinkMessage(...) override; 6. Status wait(Ticket&& ticket) override; 7. void asyncWait(...) override; 8. //配置初始化實現 9. std::unique_ptr<TransportLayer> createWithConfig(...); 10. 11. //createWithConfig中賦值,對應TransportLayerASIO, 12. //實際上容器中就一個成員,就是TransportLayerASIO 13. std::vector<std::unique_ptr<TransportLayer>> _tls; 14.};
TransportLayerManager類包含一個_tls成員,該類最核心的createWithConfig接口代碼實現以下:
15.//根據配置構造相應類信息 _initAndListen中調用 16.std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(...) { 17. std::unique_ptr<TransportLayer> transportLayer; 18. //服務類型,也就是本實例是mongos仍是mongod 19. //mongos對應ServiceEntryPointMongod,mongod對應ServiceEntryPointMongos 20. auto sep = ctx->getServiceEntryPoint(); 21. //net.transportLayer配置模式,默認asio, legacy模式已淘汰 22. if (config->transportLayer == "asio") { 23. //同步方式仍是異步方式,默認synchronous 24. if (config->serviceExecutor == "adaptive") { 25. //動態線程池模型,也就是異步模式 26. opts.transportMode = transport::Mode::kAsynchronous; 27. } else if (config->serviceExecutor == "synchronous") { 28. //一個連接一個線程模型,也就是同步模式 29. opts.transportMode = transport::Mode::kSynchronous; 30. } 31. //若是配置是asio,構造TransportLayerASIO類 32. auto transportLayerASIO = stdx::make_unique<transport::TransportLayerASIO>(opts, sep); 33. if (config->serviceExecutor == "adaptive") { //異步方式 34. //構造動態線程模型對應的執行器ServiceExecutorAdaptive 35. ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorAdaptive>( 36. ctx, transportLayerASIO->getIOContext())); 37. } else if (config->serviceExecutor == "synchronous") { //同步方式 38. //構造一個連接一個線程模型對應的執行器ServiceExecutorSynchronous 39. ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorSynchronous>(ctx)); 40. } 41. //transportLayerASIO轉換爲transportLayer類 42. transportLayer = std::move(transportLayerASIO); 43. } 44. //transportLayer轉存到對應retVector數組中並返回 45. std::vector<std::unique_ptr<TransportLayer>> retVector; 46. retVector.emplace_back(std::move(transportLayer)); 47. return stdx::make_unique<TransportLayerManager>(std::move(retVector)); 48.}
createWithConfig函數根據配置文件來肯定對應的TransportLayer,若是net.transportLayer配置爲」asio」,則選用TransportLayerASIO類來進行底層的網絡IO處理,若是配置爲」legacy」,則選用TransportLayerLegacy。」legacy」模式當前已淘汰,本文只分析」asio」模式實現。
「asio」模式包含兩種線程模型:adaptive(動態線程模型)和synchronous(同步線程模型)。adaptive模式線程設計採用動態線程方式,線程數和mongodb壓力直接相關,若是mongodb壓力大,則線程數增長;若是mongodb壓力變小,則線程數自動減小。同步線程模式也就是一個連接一個線程模型,線程數的多少和連接數的多少成正比,連接數越多則線程數也越大。
Mongodb內核實現中經過opts.transportMode來標記asio的線程模型,這兩種模型對應標記以下:
線程模型 |
內核transportMode標記 |
說明 |
對應線程模型由那個類實現 |
adaptive |
KAsynchronous(異步) |
adaptive模型也能夠稱爲異步模型 |
ServiceExecutorAdaptive |
synchronous |
KSynchronous(同步) |
synchronous模型也能夠稱爲同步模型 |
ServiceExecutorSynchronous |
說明:
adaptive線程模型被標記爲KAsynchronous,synchronous被標記爲KSynchronous是有緣由的,adaptive動態線程模型網絡IO處理藉助epoll異步實現,而synchronous一個連接一個線程模型網絡IO處理是同步讀寫操做。Mongodb網絡線程模型具體實現及各類優缺點能夠參考:Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計
2.1.2 TransportLayerASIO類核心代碼實現
TransportLayerASIO類核心成員及接口以下:
1.class TransportLayerASIO final : public TransportLayer { 2. //如下四個接口主要和套接字數據讀寫相關 3. Ticket sourceMessage(...); 4. Ticket sinkMessage(...); 5. Status wait(Ticket&& ticket); 6. void asyncWait(Ticket&& ticket, TicketCallback callback); 7. void end(const SessionHandle& session); 8. //新連接處理 9. void _acceptConnection(GenericAcceptor& acceptor); 10. 11. //adaptive線程模型網絡IO上下文處理 12. std::shared_ptr<asio::io_context> _workerIOContext; 13. //accept接收客戶端連接對應的IO上下文 14. std::unique_ptr<asio::io_context> _acceptorIOContext; 15. //bindIp配置中的ip地址列表,用於bind監聽,accept客戶端請求 16. std::vector<std::pair<SockAddr, GenericAcceptor>> _acceptors; 17. //listener線程負責接收客戶端新連接 18. stdx::thread _listenerThread; 19. //服務類型,也就是本實例是mongos仍是mongod 20. //mongos對應ServiceEntryPointMongod,mongod對應ServiceEntryPointMongos 21. ServiceEntryPoint* const _sep = nullptr; 22. //當前運行狀態 23. AtomicWord<bool> _running{false}; 24. //listener處理相關的配置信息 25. Options _listenerOptions; 26.}
從上面的類結構能夠看出,該類主要經過listenerThread線程完成bind綁定及listen監聽操做,同時部分接口實現新鏈接上的數據讀寫。套接字初始化代碼實現以下:
1.Status TransportLayerASIO::setup() { 2. std::vector<std::string> listenAddrs; 3. //若是沒有配置bindIp,則默認監聽"127.0.0.1:27017" 4. if (_listenerOptions.ipList.empty()) { 5. listenAddrs = {"127.0.0.1"}; 6. } else { 7. //配置文件中的bindIp:1.1.1.1,2.2.2.2,以逗號分隔符獲取ip列表存入ipList 8. boost::split(listenAddrs, _listenerOptions.ipList, boost::is_any_of(","), boost::token_compress_on); 9. } 10. //遍歷ip地址列表 11. for (auto& ip : listenAddrs) { 12. //根據IP和端口構造對應SockAddr結構 13. const auto addrs = SockAddr::createAll( 14. ip, _listenerOptions.port, _listenerOptions.enableIPv6 ? AF_UNSPEC : AF_INET); 15. ...... 16. //根據addr構造endpoint 17. asio::generic::stream_protocol::endpoint endpoint(addr.raw(), addr.addressSize); 18. //_acceptorIOContext和_acceptors關聯 19. GenericAcceptor acceptor(*_acceptorIOContext); 20. //epoll註冊,也就是fd和epoll關聯 21. //basic_socket_acceptor::open 22. acceptor.open(endpoint.protocol()); 23. //SO_REUSEADDR配置 basic_socket_acceptor::set_option 24. acceptor.set_option(GenericAcceptor::reuse_address(true)); 25. //非阻塞設置 basic_socket_acceptor::non_blocking 26. acceptor.non_blocking(true, ec); 27. //bind綁定 28. acceptor.bind(endpoint, ec); 29. if (ec) { 30. return errorCodeToStatus(ec); 31. } 32. } 33.}
從上面的分析能夠看出,代碼實現首先解析出配置文件中bindIP中的ip:port列表,而後遍歷列表綁定全部服務端須要監聽的ip:port,每一個ip:port對應一個GenericAcceptor ,全部acceptor和全局accept IO上下文_acceptorIOContext關聯,同時bind()綁定全部ip:port。
Bind()綁定全部配置文件中的Ip:port後,而後經過TransportLayerASIO::start()完成後續處理,該接口代碼實現以下:
1.//_initAndListen中調用執行 2.Status TransportLayerASIO::start() { //listen線程處理 3. ...... 4. //這裏專門起一個線程作listen相關的accept事件處理 5. _listenerThread = stdx::thread([this] { 6. //修改線程名 7. setThreadName("listener"); 8. //該函數中循環處理accept事件 9. while (_running.load()) { 10. asio::io_context::work work(*_acceptorIOContext); 11. try { 12. //accept事件調度處理 13. _acceptorIOContext->run(); 14. } catch (...) { //異常處理 15. severe() << "Uncaught exception in the listener: " << exceptionToStatus(); 16. fassertFailed(40491); 17. } 18. } 19. }); 20. 遍歷_acceptors,進行listen監聽處理 21. for (auto& acceptor : _acceptors) { 22. acceptor.second.listen(serverGlobalParams.listenBacklog); 23. //異步accept回調註冊在該函數中 24. _acceptConnection(acceptor.second); 25. } 26.}
從上面的TransportLayerASIO::start()接口能夠看出,mongodb特意建立了一個listener線程用於客戶端accept事件處理,而後藉助ASIO網絡庫的_acceptorIOContext->run()接口來調度,當有新連接到來的時候,就會執行相應的accept回調處理,accept回調註冊到io_context的流程由acceptConnection()完成,該接口核心源碼實現以下:
1.//accept新鏈接到來的回調註冊 2.void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { 3. //新連接到來時候的回調函數,服務端接收到新鏈接都會執行該回調 4. //注意這裏面是遞歸執行,保證全部accept事件都會一次處理完畢 5. auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable { 6. if (!_running.load()) 7. return; 8. 9. ...... 10. //每一個新的連接都會new一個新的ASIOSession 11. std::shared_ptr<ASIOSession> session(new ASIOSession(this, std::move(peerSocket))); 12. //新的連接處理ServiceEntryPointImpl::startSession, 13. //和ServiceEntryPointImpl服務入口點模塊關聯起來 14. _sep->startSession(std::move(session)); 15. //遞歸,直處處理完全部的網絡accept事件 16. _acceptConnection(acceptor); 17. }; 18. //accept新鏈接到來後服務端的回調處理在這裏註冊 19. acceptor.async_accept(*_workerIOContext, std::move(acceptCb)); 20.}
TransportLayerASIO::_acceptConnection的新鏈接處理過程藉助ASIO庫實現,經過acceptor.async_accept實現全部監聽的acceptor回調異步註冊。
當服務端接收到客戶端新鏈接事件通知後,會觸發執行acceptCb()回調,該回調中底層ASIO庫經過epoll_wait獲取到全部的accept事件,每獲取到一個accept事件就表明一個新的客戶端連接,而後調用ServiceEntryPointImpl::startSession()接口處理這個新的連接事件,整個過程遞歸執行,保證一次能夠處理全部的客戶端accept請求信息。
每一個連接都會構造一個惟一的session信息,該session就表明一個惟一的新鏈接,連接和session一一對應。此外,最終會調用ServiceEntryPointImpl::startSession()進行真正的accept()處理,從而獲取到一個新的連接。
注意:TransportLayerASIO::_acceptConnection()中實現了TransportLayerASIO類和ServiceEntryPointImpl類的關聯,這兩個類在該接口實現了關聯。
此外,從前面的TransportLayerASIO類結構中能夠看出,該類還包含以下四個接口:sourceMessage(...)、sinkMessage(...)、wait(Ticket&& ticket)、asyncWait(Ticket&& ticket, TicketCallback callback),這四個接口入參都和Ticket數據分發子模塊相關聯,具體核心代碼實現以下:
1.//根據asioSession, expiration, message三個信息構造數據接收類ASIOSourceTicket 2.Ticket TransportLayerASIO::sourceMessage(...) { 3. ...... 4. auto asioSession = checked_pointer_cast<ASIOSession>(session); 5. //根據asioSession, expiration, message三個信息構造ASIOSourceTicket 6. auto ticket = stdx::make_unique<ASIOSourceTicket>(asioSession, expiration, message); 7. return {this, std::move(ticket)}; 8.} 9. 10.//根據asioSession, expiration, message三個信息構造數據發送類ASIOSinkTicket 11.Ticket TransportLayerASIO::sinkMessage(...) { 12. auto asioSession = checked_pointer_cast<ASIOSession>(session); 13. auto ticket = stdx::make_unique<ASIOSinkTicket>(asioSession, expiration, message); 14. return {this, std::move(ticket)}; 15.} 16. 17.//同步接收或者發送,最終調用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill 18.Status TransportLayerASIO::wait(Ticket&& ticket) { 19. //獲取對應Ticket,接收對應ASIOSourceTicket,發送對應ASIOSinkTicket 20. auto ownedASIOTicket = getOwnedTicketImpl(std::move(ticket)); 21. auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get()); 22. ...... 23. //調用對應fill接口 同步接收ASIOSourceTicket::fill 或者 同步發送ASIOSinkTicket::fill 24. asioTicket->fill(true, [&waitStatus](Status result) { waitStatus = result; }); 25. return waitStatus; 26.} 27.//異步接收或者發送,最終調用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill 28.void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) { 29. //獲取對應數據收發的Ticket,接收對應ASIOSourceTicket,發送對應ASIOSinkTicket 30. auto ownedASIOTicket = std::shared_ptr<TicketImpl>(getOwnedTicketImpl(std::move(ticket))); 31. auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get()); 32. 33. //調用對應ASIOTicket::fill 34. asioTicket->fill( 35. false, [ callback = std::move(callback), 36. ownedASIOTicket = std::move(ownedASIOTicket) ](Status status) { callback(status); }); }
上面四個接口中的前兩個接口主要經過Session, expiration, message這三個參數來獲取對應的Ticket 信息,實際上mongodb內核實現中把接收數據的Ticket和發送數據的Ticket分別用不一樣的繼承類ASIOSourceTicket和ASIOSinkTicket來區分,三個參數的做用以下表所示:
參數名 |
做用 |
Session |
表明一個連接,一個session和一個連接意義對應 |
expiration |
數據收發超時相關設置 |
message |
數據內容 |
數據收發包括同步收發和異步收發,同步收發經過TransportLayerASIO::wait()實現,異步收發經過TransportLayerASIO::asyncWait()實現。
注意:以上四個接口把TransportLayerASIO類和Ticket 數據收發類的關聯。
2.2 總結
service_entry_point服務入口點子模塊主要負責以下功能:新鏈接處理、Session會話管理、接收到一個完整報文後的回調處理(含報文解析、認證、引擎層處理等)。
類命 |
接口名 |
功能說明 |
transport_layer_manager |
TransportLayerManager::createWithConfig() |
根據配置文件選擇不一樣的TransportLayer和serviceExecutor |
TransportLayerManager::setup() |
已廢棄 |
|
TransportLayerManager::start() |
已廢棄 |
|
TransportLayerASIO |
TransportLayerASIO::Options::Options() |
Net相關配置 |
TransportLayerASIO::TransportLayerASIO() |
初始化構造 |
|
TransportLayerASIO::sourceMessage() |
獲取數據接收的Ticket |
|
TransportLayerASIO::sinkMessage() |
獲取數據發送的Ticket |
|
TransportLayerASIO::wait() |
同步發送接收或者發送數據 |
|
TransportLayerASIO::asyncWait() |
異步方式發送接收或者發送數據 |
|
TransportLayerASIO::end() |
關閉連接 |
|
TransportLayerASIO::setup() |
建立套接字並bind綁定 |
|
TransportLayerASIO::start() |
建立listener線程作監聽操做,同時註冊accept回調 |
|
TransportLayerASIO::shutdown() |
shutdown處理及資源回收 |
|
TransportLayerASIO::_acceptConnection() |
Accept回調註冊 |
Transport_layer_manager中初始化TransportLayer和serviceExecutor,net.TransportLayer配置能夠爲legacy和asio,其中legacy已經淘汰,當前內核只支持asio模式。asio配置對應的TransportLayer由TransportLayerASIO實現,對應的serviceExecutor線程模型能夠是adaptive動態線程模型,也能夠是synchronous同步線程模型。
套接字建立、bind()綁定、listen()監聽、accept事件註冊等都由本類實現,同時數據分發Ticket模塊也與本模塊關聯,一塊兒配合完成整個後續Ticket模塊模塊的同步及異步數據讀寫流程。此外,本模塊還經過ServiceEntryPoint服務入口子模塊聯動,保證了套接字初始化、accept事件註冊完成後,服務入口子模塊能有序的進行新鏈接接收處理。
接下來繼續分析本模塊相關聯的ServiceEntryPoint服務入口子模塊和Ticket數據分發子模塊實現。
3. service_entry_point服務入口點子模塊
service_entry_point服務入口點子模塊主要負責以下功能:新鏈接處理、Session會話管理、接收到一個完整報文後的回調處理(含報文解析、認證、引擎層處理等)。
該模塊的源碼實現主要包含如下幾個文件:
service_entry_point開頭的代碼文件都和本模塊相關,其中service_entry_point_utils*負責工做線程建立,service_entry_point_impl*完成新連接回調處理及sesseion會話管理。
3.1 核心源碼實現
服務入口子模塊相關代碼實現比較簡潔,主要由ServiceEntryPointImpl類和service_entry_point_utils中的線程建立函數組成。
3.1.1 ServiceEntryPointImpl類核心代碼實現
ServiceEntryPointImpl類主要成員和接口以下:
1.class ServiceEntryPointImpl : public ServiceEntryPoint { 2. MONGO_DISALLOW_COPYING(ServiceEntryPointImpl); 3.public: 4. //構造函數 5. explicit ServiceEntryPointImpl(ServiceContext* svcCtx); 6. //如下三個接口進行session會話處理控制 7. void startSession(transport::SessionHandle session) final; 8. void endAllSessions(transport::Session::TagMask tags) final; 9. bool shutdown(Milliseconds timeout) final; 10. //session會話統計 11. Stats sessionStats() const final; 12. ...... 13.private: 14. //該list結構管理全部的ServiceStateMachine信息 15. using SSMList = stdx::list<std::shared_ptr<ServiceStateMachine>>; 16. //SSMList對應的迭代器 17. using SSMListIterator = SSMList::iterator; 18. //賦值ServiceEntryPointImpl::ServiceEntryPointImpl 19. //對應ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)類 20. ServiceContext* const _svcCtx; 21. //該成員變量在代碼中沒有使用 22. AtomicWord<std::size_t> _nWorkers; 23. //鎖 24. mutable stdx::mutex _sessionsMutex; 25. //一個新連接對應一個ssm保存到ServiceEntryPointImpl._sessions中 26. SSMList _sessions; 27. //最大連接數控制 28. size_t _maxNumConnections{DEFAULT_MAX_CONN}; 29. //當前的總連接數,不包括關閉的連接 30. AtomicWord<size_t> _currentConnections{0}; 31. //全部的連接,包括已經關閉的連接 32. AtomicWord<size_t> _createdConnections{0}; 33.};
該類的幾個接口主要是session相關控制處理,該類中的變量成員說明以下:
成員名 |
功能說明 |
_svcCtx |
服務上下文,mongod實例對應ServiceContextMongoD類,mongos代理實例對應ServiceContextNoop類 |
_sessionsMutex |
_sessions鎖保護 |
_sessions |
一個新連接對應一個ssm保存到ServiceEntryPointImpl._sessions中 |
_maxNumConnections |
最大連接數,默認1000000,能夠經過maxConns配置 |
_currentConnections |
當前的在線連接數,不包括之前關閉的連接 |
_createdConnections |
全部的連接,包括已經關閉的連接 |
ServiceEntryPointImpl類最核心的startSession()接口負責每一個新鏈接到來後的內部回調處理,具體實現以下:
1.//新連接到來後的回調處理 2.void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { 3. //獲取該新鏈接對應的服務端和客戶端地址信息 4. const auto& remoteAddr = session->remote().sockAddr(); 5. const auto& localAddr = session->local().sockAddr(); 6. //服務端和客戶端地址記錄到session中 7. auto restrictionEnvironment = stdx::make_unique<RestrictionEnvironment>(*remoteAddr, *localAddr); 8. RestrictionEnvironment::set(session, std::move(restrictionEnvironment)); 9. ...... 10. 11. //獲取transportMode,kAsynchronous或者kSynchronous 12. auto transportMode = _svcCtx->getServiceExecutor()->transportMode(); 13. //構造ssm 14. auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode); 15. {//該{}體內實現連接計數,同時把ssm統一添加到_sessions列表管理 16. stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); 17. connectionCount = _sessions.size() + 1; //鏈接數自增 18. if (connectionCount <= _maxNumConnections) { 19. //新來的連接對應的session保存到_sessions鏈表 20. //一個新連接對應一個ssm保存到ServiceEntryPointImpl._sessions中 21. ssmIt = _sessions.emplace(_sessions.begin(), ssm); 22. _currentConnections.store(connectionCount); 23. _createdConnections.addAndFetch(1); 24. } 25. } 26. //連接超限,直接退出 27. if (connectionCount > _maxNumConnections) { 28. ...... 29. return; 30. } 31. //連接關閉的回收處理 32. ssm->setCleanupHook([ this, ssmIt, session = std::move(session) ] { 33. ...... 34. }); 35. //獲取transport模式爲同步模式仍是異步模式,也就是adaptive線程模式仍是synchronous線程模式 36. auto ownership = ServiceStateMachine::Ownership::kOwned; 37. if (transportMode == transport::Mode::kSynchronous) { 38. ownership = ServiceStateMachine::Ownership::kStatic; 39. } 40. //ServiceStateMachine::start,這裏和服務狀態機模塊銜接起來 41. ssm->start(ownership); 42.}
該接口拿到該連接對應的服務端和客戶端地址後,記錄到該連接對應session中,而後根據該session、transportMode、_svcCtx構建一個服務狀態機ssm(ServiceStateMachine)。一個新連接對應一個惟一session,一個session對應一個惟一的服務狀態機ssm,這三者保持惟一的一對一關係。
最終,startSession()讓服務入口子模塊、session會話子模塊、ssm狀態機子模塊關聯起來。
3.1.2 service_entry_point_utils核心代碼實現
service_entry_point_utils源碼文件只有launchServiceWorkerThread一個接口,該接口主要負責工做線程建立,並設置每一個工做線程的線程棧大小,若是系統默認棧大於1M,則每一個工做線程的線程棧大小設置爲1M,若是系統棧大小小於1M,則以系統堆棧大小爲準,同時warning打印提示。該函數實現以下:
1.Status launchServiceWorkerThread(stdx::function<void()> task) { 2. static const size_t kStackSize = 1024 * 1024; //1M 3. struct rlimit limits; 4. //或者系統堆棧大小 5. invariant(getrlimit(RLIMIT_STACK, &limits) == 0); 6. //若是系統堆棧大小大於1M,則默認設置線程棧大小爲1M 7. if (limits.rlim_cur > kStackSize) { 8. size_t stackSizeToSet = kStackSize; 9. int failed = pthread_attr_setstacksize(&attrs, stackSizeToSet); 10. if (failed) { 11. const auto ewd = errnoWithDescription(failed); 12. warning() << "pthread_attr_setstacksize failed: " << ewd; 13. } 14. } else if (limits.rlim_cur < 1024 * 1024) { 15. //若是系統棧大小小於1M,則已係統堆棧爲準,同時給出告警 16. warning() << "Stack size set to " << (limits.rlim_cur / 1024) << "KB. We suggest 1MB"; 17. }} 18. ...... 19. //task參數傳遞給新建線程 20. auto ctx = stdx::make_unique<stdx::function<void()>>(std::move(task)); 21. int failed = pthread_create(&thread, &attrs, runFunc, ctx.get()); 22. ...... 23.}
3.2 總結
service_entry_point服務入口點子模塊主要負責新鏈接後的回調處理及工做線程建立,該模塊和後續的session會話模塊、SSM服務狀態機模塊銜接配合,完成數據收發的正常邏輯轉換處理。上面的分析只列出了服務入口點子模塊的核心接口實現,下表總結該模塊全部的接口功能:
類 |
接口 |
功能說明 |
ServiceEntryPointImpl |
ServiceEntryPointImpl::ServiceEntryPointImpl() |
構造初始化,最大連接數限制 |
ServiceEntryPointImpl |
ServiceEntryPointImpl::startSession() |
新連接的回調處理 |
ServiceEntryPointImpl |
ServiceEntryPointImpl::endAllSessions() |
Ssm服務狀態機及session會話回收處理 |
ServiceEntryPointImpl |
ServiceEntryPointImpl::shutdown() |
實例下線處理 |
ServiceEntryPointImpl |
ServiceEntryPointImpl::sessionStats() |
獲取連接統計信息 |
service_entry_point_utils |
launchServiceWorkerThread |
建立工做線程,同時限制每一個線程對應線程棧大小 |
3. Ticket數據收發子模塊
Ticket數據收發子模塊主要功能以下:調用session子模塊進行底層asio庫處理、拆分數據接收和數據發送到兩個類、完整mongodb報文讀取 、接收或者發送mongodb報文後的回調處理。
3.1 ASIOTicket類核心代碼實現
Ticket數據收發模塊相關實現主要由ASIOTicket類完成,該類結構以下:
1.//下面的ASIOSinkTicket和ASIOSourceTicket繼承該類,用於控制數據的發送和接收 2.class TransportLayerASIO::ASIOTicket : public TicketImpl { 3.public: 4. //初始化構造 5. explicit ASIOTicket(const ASIOSessionHandle& session, Date_t expiration); 6. //獲取sessionId 7. SessionId sessionId() const final { 8. return _sessionId; 9. } 10. //asio模式沒用,針對legacy模型 11. Date_t expiration() const final { 12. return _expiration; 13. } 14. 15. //如下四個接口用於數據收發相關處理 16. void fill(bool sync, TicketCallback&& cb); 17.protected: 18. void finishFill(Status status); 19. bool isSync() const; 20. virtual void fillImpl() = 0; 21.private: 22. //會話信息,一個連接一個session 23. std::weak_ptr<ASIOSession> _session; 24. //每一個session有一個惟一id 25. const SessionId _sessionId; 26. //asio模型沒用,針對legacy生效 27. const Date_t _expiration; 28. //數據發送或者接收成功後的回調處理 29. TicketCallback _fillCallback; 30. //同步方式仍是異步方式進行數據處理,默認異步 31. bool _fillSync; 32.};
該類保護多個成員變量,這些成員變量功能說明以下:
成員名 |
做用 |
_session |
Session會話信息,一個連接對應一個session |
_sessionId |
每一個session都有一個對應的惟一ID |
_expiration |
Legacy模式使用,當前都是用asio,該成員已淘汰 |
_fillCallback |
發送或者接收一個完整mongodb報文後的回調處理 |
_fillSync |
同步仍是異步方式收發數據。adaptive線程模型爲異步,synchronous線程模型爲同步讀寫方式 |
mongodb在具體實現上,數據接收和數據發送分開實現,分別是數據接收類ASIOSourceTicket和數據發送類ASIOSinkTicket,這兩個類都繼承自ASIOTicket類,這兩個類的主要結構以下:
1.//數據接收的ticket 2.class TransportLayerASIO::ASIOSourceTicket : public TransportLayerASIO::ASIOTicket { 3.public: 4. //初始化構造 5. ASIOSourceTicket(const ASIOSessionHandle& session, Date_t expiration, Message* msg); 6.protected: 7. //數據接收Impl 8. void fillImpl() final; 9.private: 10. //接收到mongodb頭部數據後的回調處理 11. void _headerCallback(const std::error_code& ec, size_t size); 12. //接收到mongodb包體數據後的回調處理 13. void _bodyCallback(const std::error_code& ec, size_t size); 14. 15. //存儲數據的buffer,網絡IO讀取到的原始數據內容 16. SharedBuffer _buffer; 17. //數據Message管理,數據來源爲_buffer 18. Message* _target; 19.}; 1. 2. 20.//數據發送的ticket 21.class TransportLayerASIO::ASIOSinkTicket : public TransportLayerASIO::ASIOTicket { 22. public: 23. //初始化構造 24. ASIOSinkTicket(const ASIOSessionHandle& session, Date_t expiration, const Message& msg); 25.protected: 26. //數據發送Impl 27. void fillImpl() final; 28.private: 29. //發送數據完成的回調處理 30. void _sinkCallback(const std::error_code& ec, size_t size); 31. //須要發送的數據message信息 32. Message _msgToSend; 33.};
從上面的代碼實現能夠看出,ASIOSinkTicket 和ASIOSourceTicket 類接口及成員實現幾乎意義,只是具體的實現方法不一樣,下面對ASIOSourceTicket和ASIOSinkTicket 相關核心代碼實現進行分析。
3.1.2 ASIOSourceTicket 數據接收核心代碼實現
數據接收過程核心代碼以下:
1.//數據接收的fillImpl接口實現 2.void TransportLayerASIO::ASIOSourceTicket::fillImpl() { 3. //獲取對應session信息 4. auto session = getSession(); 5. if (!session) 6. return; 7. //收到讀取mongodb頭部數據,頭部數據長度是固定的kHeaderSize字節 8. const auto initBufSize = kHeaderSize; 9. _buffer = SharedBuffer::allocate(initBufSize); 10. 11. //調用TransportLayerASIO::ASIOSession::read讀取底層數據存入_buffer 12. //讀完頭部數據後執行對應的_headerCallback回調函數 13. session->read(isSync(), 14. asio::buffer(_buffer.get(), initBufSize), //先讀取頭部字段出來 15. [this](const std::error_code& ec, size_t size) { _headerCallback(ec, size); }); 16.} 17. 18.//讀取到mongodb header頭部信息後的回調處理 19.void TransportLayerASIO::ASIOSourceTicket::_headerCallback(const std::error_code& ec, size_t size) { 20. ...... 21. //獲取session信息 22. auto session = getSession(); 23. if (!session) 24. return; 25. //從_buffer中獲取頭部信息 26. MSGHEADER::View headerView(_buffer.get()); 27. //獲取message長度 28. auto msgLen = static_cast<size_t>(headerView.getMessageLength()); 29. //長度過小或者太大,直接報錯 30. if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) { 31. ....... 32. return; 33. } 34. .... 35. //內容還不夠一個mongo協議報文,繼續讀取body長度字節的數據,讀取完畢後開始body處理 36. //注意這裏是realloc,保證頭部和body在同一個buffer中 37. _buffer.realloc(msgLen); 38. MsgData::View msgView(_buffer.get()); 39. 40. //調用底層TransportLayerASIO::ASIOSession::read讀取數據body 41. session->read(isSync(), 42. //數據讀取到該buffer 43. asio::buffer(msgView.data(), msgView.dataLen()), 44. //讀取成功後的回調處理 45. [this](const std::error_code& ec, size_t size) { _bodyCallback(ec, size); }); 46.} 47. 48.//_headerCallback對header讀取後解析header頭部獲取到對應的msg長度,而後開始body部分處理 49.void TransportLayerASIO::ASIOSourceTicket::_bodyCallback(const std::error_code& ec, size_t size) { 50. ...... 51. //buffer轉存到_target中 52. _target->setData(std::move(_buffer)); 53. //流量統計 54. networkCounter.hitPhysicalIn(_target->size()); 55. //TransportLayerASIO::ASIOTicket::finishFill 56. finishFill(Status::OK()); //包體內容讀完後,開始下一階段的處理 57. //報文讀取完後的下一階段就是報文內容處理,開始走ServiceStateMachine::_processMessage 58.}
Mongodb協議由msg header + msg body組成,一個完整的mongodb報文內容格式以下:
上圖所示各個字段及body內容部分功能說明以下表:
Header or body |
字段名 |
功能說明 |
msg header |
messageLength |
整個message長度,包括header長度和body長度 |
msg header |
requestID |
該請求id信息 |
msg header |
responseTo |
應答id |
msg header |
opCode |
操做類型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE等 |
msg body |
Body |
不一樣opCode對應的包體內容 |
ASIOSourceTicket類的幾個核心接口都是圍繞這一原則展開,整個mongodb數據接收流程以下:
- 讀取mongodb頭部header數據,解析出header中的messageLength字段。
- 檢查messageLength字段是否在指定的合理範圍,該字段不能小於Header整個頭部大小,也不能超過MaxMessageSizeBytes最大長度。
- Header len檢查經過,說明讀取header數據完成,因而執行_headerCallback回調。
- realloc更多的空間來存儲body內容。
- 繼續讀取body len長度數據,讀取body完成後,執行_bodyCallback回調處理。
3.1.3 ASIOSinkTicket數據發送類核心代碼實現
ASIOSinkTicket發送類相比接收類,沒有數據解析相關的流程,所以實現過程會更加簡單,具體源碼實現以下:
1.//發送數據成功後的回調處理 2.void TransportLayerASIO::ASIOSinkTicket::_sinkCallback(const std::error_code& ec, size_t size) { 3. //發送的網絡字節數統計 4. networkCounter.hitPhysicalOut(_msgToSend.size()); 5. //執行SSM中對應的狀態流程 6. finishFill(ec ? errorCodeToStatus(ec) : Status::OK()); 7.} 8. 9.//發送數據的fillImpl 10.void TransportLayerASIO::ASIOSinkTicket::fillImpl() { 11. //獲取對應session 12. auto session = getSession(); 13. if (!session) 14. return; 15. 16. //調用底層TransportLayerASIO::ASIOSession::write發送數據,發送成功後執行_sinkCallback回調 17. session->write(isSync(), 18. asio::buffer(_msgToSend.buf(), _msgToSend.size()), 19. //發送數據成功後的callback回調 20. [this](const std::error_code& ec, size_t size) { _sinkCallback(ec, size); }); 21.}
3.2 總結
從上面的分析能夠看出,Ticket數據收發模塊主要調用session會話模塊來進行底層數據的讀寫、解析,當讀取或者發送一個完整的mongodb報文後最終交由SSM服務狀態機模塊調度處理。
ticket模塊主要接口功能總結以下表所示:
類命 |
接口名 |
功能說明 |
ASIOTicket |
ASIOTicket::getSession() |
獲取session信息 |
ASIOTicket |
ASIOTicket::isSync() |
判斷是同步收發仍是異步收發 |
ASIOTicket |
ASIOTicket::finishFill() |
執行_fillCallback回調 |
ASIOTicket |
ASIOTicket::fill() |
給_fillCallback賦值 |
ASIOTicket |
ASIOTicket::ASIOTicket() |
ASIOTicket構造初始化 |
ASIOSourceTicket |
ASIOSourceTicket::ASIOSourceTicket() |
ASIOSourceTicket初始化 |
ASIOSourceTicket |
ASIOSourceTicket::_bodyCallback() |
接收到mesg header+body後的回調處理 |
ASIOSourceTicket |
ASIOSourceTicket::_headerCallback |
接收到msg header後的回調處理 |
ASIOSinkTicket |
ASIOSinkTicket::ASIOSinkTicket() |
ASIOSinkTicket初始化構造 |
ASIOSinkTicket |
ASIOSourceTicket::fillImpl() |
發送指定msg數據,發送完成後執行回調 |
ASIOSinkTicket |
ASIOSinkTicket::_sinkCallback |
發送msg成功後的回調處理 |
前面的分析也能夠看出,Ticket數據收發模塊會調用session處理模塊來進行真正的數據讀寫,同時接收或者發送完一個完整mongodb報文後的後續回調處理講交由SSM服務狀態機模塊處理。
4. Session會話子模塊
Session會話模塊功能主要以下:負責記錄HostAndPort、和底層asio庫直接互動,實現數據的同步或者異步收發。一個新鏈接fd對應一個惟一的session,對fd的操做直接映射爲session操做。Session會話子模塊主要代碼實現相關文件以下:
4.1 session會話子模塊核心代碼實現
1.class TransportLayerASIO::ASIOSession : public Session { 2. //初始化構造 3. ASIOSession(TransportLayerASIO* tl, GenericSocket socket); 4. //獲取本session使用的tl 5. TransportLayer* getTransportLayer(); 6. //如下四個接口套接字相關,本端/對端地址獲取,獲取fd,關閉fd等 7. const HostAndPort& remote(); 8. const HostAndPort& local(); 9. GenericSocket& getSocket(); 10. void shutdown(); 11. 12. //如下四個接口調用asio網絡庫實現數據的同步收發和異步收發 13. void read(...) 14. void write(...) 15. void opportunisticRead(...) 16. void opportunisticWrite(...) 17. 18. //遠端地址信息 19. HostAndPort _remote; 20. //本段地址信息 21. HostAndPort _local; 22. //賦值見TransportLayerASIO::_acceptConnection 23. //也就是fd,一個新鏈接對應一個_socket 24. GenericSocket _socket; 25. //SSL相關不作分析, 26.#ifdef MONGO_CONFIG_SSL 27. boost::optional<asio::ssl::stream<decltype(_socket)>> _sslSocket; 28. bool _ranHandshake = false; 29.#endif 30. 31. //本套接字對應的tl,賦值建TransportLayerASIO::_acceptConnection(...) 32. TransportLayerASIO* const _tl; 33.}
該類最核心的三個接口ASIOSession(...)、opportunisticRead(...)、opportunisticWrite(..)分別完成套接字處理、調用asio庫接口實現底層數據讀和底層數據寫。這三個核心接口源碼實現以下:
1.//初始化構造 TransportLayerASIO::_acceptConnection調用 2.ASIOSession(TransportLayerASIO* tl, GenericSocket socket) 3. //fd描述符及TL初始化賦值 4. : _socket(std::move(socket)), _tl(tl) { 5. std::error_code ec; 6. 7. //異步方式設置爲非阻塞讀 8. _socket.non_blocking(_tl->_listenerOptions.transportMode == Mode::kAsynchronous, ec); 9. fassert(40490, ec.value() == 0); 10. 11. //獲取套接字的family 12. auto family = endpointToSockAddr(_socket.local_endpoint()).getType(); 13. //知足AF_INET 14. if (family == AF_INET || family == AF_INET6) { 15. //no_delay keep_alive套接字系統參數設置 16. _socket.set_option(asio::ip::tcp::no_delay(true)); 17. _socket.set_option(asio::socket_base::keep_alive(true)); 18. //KeepAlive系統參數設置 19. setSocketKeepAliveParams(_socket.native_handle()); 20. } 21. 22. //獲取本端和對端地址 23. _local = endpointToHostAndPort(_socket.local_endpoint()); 24. _remote = endpointToHostAndPort(_socket.remote_endpoint(ec)); 25. if (ec) { 26. LOG(3) << "Unable to get remote endpoint address: " << ec.message(); 27. } 28.}
該類初始化的時候完成新鏈接_socket相關的初始化設置,包括阻塞讀寫仍是非阻塞讀寫。若是是同步線程模型(一個連接一個線程),則讀寫方式位阻塞讀寫;若是是異步線程模型(adaptive動態線程模型),則調用asio網絡庫接口實現異步讀寫。
此外,該連接_socket對應的客戶端ip:port和服務端ip:port也在該初始化類中獲取,最終保存到本session的_remote和_local成員中。
數據讀取核心代碼實現以下:
1.//讀取指定長度數據,而後執行handler回調 2.void opportunisticRead(...) { 3. std::error_code ec; 4. //若是是異步線程模型,在ASIOSession構造初始化的時候會設置non_blocking非阻塞模式 5. //異步線程模型這裏其實是非阻塞讀取,若是是同步線程模型,則沒有non_blocking設置,也就是阻塞讀取 6. auto size = asio::read(stream, buffers, ec); 7. 8. //若是是異步讀,而且read返回would_block或者try_again說明指定長度的數據尚未讀取完畢 9. if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) { 10. //buffers有大小size,實際讀最多讀size字節 11. MutableBufferSequence asyncBuffers(buffers); 12. if (size > 0) { 13. asyncBuffers += size; //buffer offset向後移動 14. } 15. 16. //繼續異步方式讀取數據,讀取到指定長度數據後執行handler回調處理 17. asio::async_read(stream, asyncBuffers, std::forward<CompleteHandler>(handler)); 18. } else { 19. //阻塞方式讀取read返回後能夠保證讀取到了size字節長度的數據 20. //直接read獲取到size字節數據,則直接執行handler 21. handler(ec, size); 22. } 23.}
opportunisticRead首先調用asio::read(stream, buffers, ec)讀取buffers對應size長度的數據,buffers空間大小就是須要讀取的數據size大小。若是是同步線程模型,則這裏爲阻塞式讀取,直到讀到size字節纔會返回;若是是異步線程模型,這這裏是非阻塞讀取,非阻塞讀取當內核網絡協議棧數據讀取完畢後,若是尚未讀到size字節,則繼續進行async_read異步讀取。
當buffers指定的size字節所有讀取完整後,不論是同步模式仍是異步模式,最終都會執行handler回調,開始後續的數據解析及處理流程。
發送數據核心代碼實現以下:
1.//發送數據 2.void opportunisticWrite(...) { 3. std::error_code ec; 4. //若是是同步模式,則阻塞寫,直到所有寫成功。異步模式則非阻塞寫 5. auto size = asio::write(stream, buffers, ec); 6. 7. //異步寫若是返回try_again說明數據尚未發送完,則繼續異步寫發送 8. if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) { 9. ConstBufferSequence asyncBuffers(buffers); 10. if (size > 0) { //buffer中數據指針偏移計數 11. asyncBuffers += size; 12. } 13. //異步寫發送完成,執行handler回調 14. asio::async_write(stream, asyncBuffers, std::forward<CompleteHandler>(handler)); 15. } else { 16. //同步寫成功,則直接執行handler回調處理 17. handler(ec, size); 18. } 19.}
數據發送流程和數據接收流程相似,也分位同步模式發送和異步模式發送,同步模式發送爲阻塞式寫,只有當全部數據經過asio::write()發送成功後才返回;異步模式發送爲非阻塞寫,asio::write()不必定所有發送出去,所以須要再次調用asio庫的asio::async_write()進行異步發送。
不論是同步模式仍是異步模式發送數據,最終數據發送成功後,都會調用handler()回調來執行後續的流程。
4.2 總結
從上面的代碼分析能夠看出,session會話模塊最終直接和asio網絡庫交互實現數據的讀寫操做。該模塊核心接口功能總結以下表:
類名 |
接口名 |
功能說明 |
ASIOSession |
ASIOSession(...) |
新鏈接fd相關處理,如是否非阻塞、delay設置、keep_alive設置、兩端地址獲取等 |
ASIOSession |
getTransportLayer() |
獲取對應TL |
ASIOSession |
remote() |
獲取該連接對應的對端Ip:port信息 |
ASIOSession |
local() |
獲取該連接對應的對端Ip:port信息 |
ASIOSession |
getSocket() |
獲取該連接的fd |
ASIOSession |
shutdown() |
fd回收處理 |
ASIOSession |
opportunisticRead() |
同步或者異步數據讀操做 |
ASIOSession |
opportunisticWrite() |
同步或者異步數據寫操做 |
5. 總結
《Mongodb網絡傳輸處理源碼實現及性能調優-體驗內核性能極致設計》一文對mongodb網絡傳輸模塊中的ASIO網絡庫實現、service_executor服務運行子模塊(即線程模型子模塊)進行了詳細的分析,加上本文的transport_layer套接字處理及傳輸層管理子模塊、session會話子模塊、Ticket數據收發子模塊、service_entry_point服務入口點子模塊。
transport_layer套接字處理及傳輸層管理子模塊主要由transport_layer_manager和transport_layer_asio兩個核心類組成。分別完成net相關的配置文件初始化操做,套接字初始化及處理,最終transport_layer_asio的相應接口實現了和ticket數據分發子模塊、服務入口點子模塊的關聯。
服務入口子模塊主要由ServiceEntryPointImpl類和service_entry_point_utils中的線程建立函數組成,實現新鏈接accept處理及控制。該模塊經過startSession()讓服務入口子模塊、session會話子模塊、ssm狀態機子模塊關聯起來。
ticket數據收發子模塊主要功能以下:調用session子模塊進行底層asio庫處理、拆分數據接收和數據發送到兩個類、完整mongodb報文讀取 、接收或者發送mongodb報文後的回調處理, 回調處理由SSM服務狀態機模塊管理,當讀取或者發送一個完整的mongodb報文後最終交由SSM服務狀態機模塊調度處理。。
Session會話模塊功能主要以下:負責記錄HostAndPort、和底層asio庫直接互動,實現數據的同步或者異步收發。一個新鏈接fd對應一個惟一的session,對fd的操做直接映射爲session操做。
到這裏,整個mongodb網絡傳輸層模塊分析只差service_state_machine狀態機調度子模塊,狀態機調度子模塊相比本文分析的幾個子模塊更加複雜,所以將在下期《mongodb網絡傳輸層模塊源碼分析三》中單獨分析。
本文全部源碼註釋分析詳見以下連接:mongodb網絡傳輸模塊詳細源碼分析