深刻mongoDB(1)--mongod的線程模型與網絡框架

最近工做須要開始研究mongoDB,我準備從其源代碼角度,對於mongod和mongos服務的架構、sharding策略、 replicaset策略、數據同步容災、索引等機制作一個本質性的瞭解。其代碼約20萬行(我研究的是 2.0.6版本源碼),本篇先從mongod的 啓動流程提及,它本是一個多線程程序,因此本文在於說明mongod有多少個線程,每一個線程的意義所在。但願你們閱讀本文時關注在mongod的外圍框 架,暫不涉及數據文件的組織、索引B樹的組織等,僅focus in在網絡框架、線程模型上。web

 

弄清楚這點的好處很明顯:以後就能夠有的放矢的研究mongod某個模塊到底是如何實現的,能夠快速的跳到相應的類中閱讀源碼,解決咱們在產品中的實際問題。我認爲這是研究其龐大源碼一個好的開始。mongodb

 

在說明mongod前,須瞭解mongoDB大量代碼是基於boost庫構建的,所以這裏先行對boost庫創建線程作個簡單的瞭解。數據庫

 

一、boost庫如何創建線程數組

boost::thread是boost中跨平臺的多線程庫,mongoDB建立線程時大多數狀況下是使用thread庫的(少許狀況直接調用pthread_create方法),主要使用瞭如下兩種方式:網絡

(1)直接運行讓線程運行func多線程

例如durThread線程:架構

void durThread() {併發

while( !inShutdown() ) { ... }app

}框架

boost::thread t(durThread);

(2)在類中定義靜態的run方法,調用thread建立線程

    class FileAllocator : boost::noncopyable {
        static void run( FileAllocator * fa );

 

        void FileAllocator::start() {
             boost::thread t( boost::bind( &FileAllocator::run , this ) );
        }
    };

 

二、mongod的入口

mongod的入口main函數在src/mongo/db/db.cpp文件中,我畫了個簡單的活動圖簡要介紹其啓動流程:

如上圖所示,這裏出現了12個固定線程,尚未包括mongod運行之後處理請求時派生出來的線程,以下所示:

–      interruptThread

–      DataFileSync::run

–      FileAllocator::run

–      durThread

–      SnapshotThread::run

–      ClientCursorMonitor::run

–      PeriodicTask::Runner::run

–      TTLMonitor::run

–      replSlaveThread

–      replMasterThread

–      webServerThread

–      處理數據庫請求的主線程

若是不屬於任何replica set,那麼至少有10個固定線程(去除 replSlaveThread和 replMasterThread)。

下面咱們先討論這10個固定的線程,再討論性能很是弱的監聽web事件的線程是怎樣處理請求的,最後討論性能稍好一點的主服務線程是怎樣處理請求的。

 

三、5個基於BackgroundJob類實現的工做線程

這5個線程分別是DataFileSync,SnapshotThread, ClientCursorMonitor, TTLMonitor, PeriodicTask,類圖以下所示:

上面這5個類也是用boost::threadfunction方法建立線程運行的,它們繼承了BackgroundJob類,使用go方法啓動線程執行jobBody就是在啓動線程執行run方法,以下所示:

  1. BackgroundJob& BackgroundJob::go() {  
  2.     boost::thread t( boost::bind( &BackgroundJob::jobBody , this, _status ) );  
  3.     return *this;  
  4. }  
  5.   
  6. void BackgroundJob::jobBody( boost::shared_ptr<JobStatus> status ) {  
  7.     ...  
  8.     run();  
  9.     ...  
  10. }     


這些線程的意義以下:

DataFileSync 主要在調用MemoryMappedFile::flush方法將內存中的數據刷到磁盤上。 咱們知道,mongodb是調用mmap把磁盤中的數據映射到內存中的,因此必須有一個機制時刻的刷數據到硬盤才能保證可靠性,多久刷一次是與 syncdelay參數相關的。

SnapshotThread將生成快照文件幫助快速恢復。

ClientCursorMonitor將管理用戶的遊標,每4秒調用一次idleTimeReport()方法,每一分鐘調用sayMemoryStatus()方法。

TTLMonitor管理TTL,經過調用doTTLForDB()方法檢查全部db。

PeriodicTask將從動態數組std::vector<PeriodicTask* > _tasks中獲取週期性任務執行。

 

四、5個直接提供全局方法執行的線程

FileAllocator用於分配新文件,它決定分配文件的大小,例如用翻倍的方式。

interruptThread只處理信號量。

durThread作批量提交和回滾工做。

replSlaveThread是當前結點做爲secondary時的同步線程。

replMasterThread是當前結點做爲master時的同步線程。

 

五、web監聽線程

mongod是如何處理web請求的呢?它是經過網絡框架中的核心類Listerner實現的,類圖以下所示:

怎麼理解這幅類圖呢?

首先看 Listener類,它負責監聽、建立新鏈接,其工做步驟以下:

a、建立socket句柄,綁定端口,監聽

b、調用select檢測新鏈接事件

c、對檢測到的事件調用accept創建新鏈接

d、調用void Listener::acceptedMP(MessagingPort*mp)方法處理新鏈接,誰從新實現acceptedMP方法誰決定處理方式

 

這個Listener類既用於處理web請求,也用於處理普通的數據庫請求。

OK, 如今咱們看web請求是如何處理的。MiniWebServer類繼承了Listener類,它從新實現了acceptedMP方法,開始接收TCP流, 解析HTTP協議,同時還會負責組裝HTTP響應包併發送TCP流到客戶端。那麼實際完成http請求的類是誰呢?它是繼承了MiniWebServer 類的DbWebServer類。這個類從新實現了doRequest方法,它會在完整接收到HTTP請求後被調用,HTTP請求的處理過程不在本篇的討論 範圍內,這裏略過。但咱們清楚了,這個線程採用同步的阻塞的方式處理請求,它意味着它同一時刻只能處理一個web請求,併發能力超級弱,還好web請求只 是mongod的副業,僅用於查詢狀態。

 

六、主監聽線程和數據請求的處理線程

處理數據庫請求的是上圖中的PortMessageServer 類,它運行在主線程中。

咱們先看看PortMessageServer 類是如何實現acceptedMP方法的:

  1. virtual voidacceptedMP(MessagingPort * p) {  
  2.     if ( !connTicketHolder.tryAcquire() ) {  
  3.         sleepmillis(2); // otherwisewe'll hard loop  
  4.         return;  
  5.     }  
  6.      …  
  7.         int failed =pthread_create(&thread, &attrs, (void*(*)(void*)) &pms::threadRun,p);  
  8.         …  
  9. }  


很清晰,它開啓了一個線程獨立的執行這個請求。雖然這種方式依然性能極差:大量的進程間上下文切換在等着咱們,但總比web請求處理要好多了,並且mongod的併發能力原本就不是它的長項。

對於每一個新鏈接,都會有類封裝成對象,以下:

接下來pms::threadRun方法是在處理MessagingPort對象。

下面看看pms::threadRun方法中作了些什麼:

  1. void threadRun( MessagingPort *inPort) {  
  2.     TicketHolderReleaserconnTicketReleaser( &connTicketHolder );  
  3.     Message m;  
  4.     try {  
  5.         LastError * le = newLastError();  
  6.         lastError.reset( le ); //lastError now has ownership  
  7.         handler->connected( p.get());  
  8.         while ( ! inShutdown() ) {  
  9.             if ( ! p->recv(m) ) {  
  10.                 p->shutdown();  
  11.                 break;  
  12.             }  
  13.             handler->process( m ,p.get() , le );  
  14.         }  
  15.     }  
  16.     handler->disconnected( p.get());  
  17. }  


能夠看到,它會在這個鏈接上接收完整的請求,以後會調用handler的process方法。這個handler又是什麼呢?以下圖所示:

因此,普通的數據庫請求是由MyMessageHandler的process方法處理的。這個方法裏也只是個封裝,真正處理業務的是全局方法assembleResponse。

assembleResponse方法中會按照8種操做方式分別的調用DataFileMgr中的方法處理實際文件,例如:

  1. enum Operations {  
  2.     opReply = 1,     /* reply. responseTo is set. */  
  3.     dbMsg = 1000,    /* generic msg command followed by a string */  
  4.     dbUpdate = 2001, /* update object */  
  5.     dbInsert = 2002,  
  6.     //dbGetByOID = 2003,  
  7.     dbQuery = 2004,  
  8.     dbGetMore = 2005,  
  9.     dbDelete = 2006,  
  10.     dbKillCursors = 2007  
  11. };  


在方法中有相似這樣的代碼在調用實際的業務類處理操做:

  1. else if ( op == dbInsert ) {  
  2.     receivedInsert(m, currentOp);  
  3. }  
  4. else if ( op == dbUpdate ) {  
  5.     receivedUpdate(m, currentOp);  
  6. }  
  7. else if ( op == dbDelete ) {  
  8.     receivedDelete(m, currentOp);  
  9. }  

固然本篇志不在此,下篇咱們再討論索引和數據文件的操做。

相關文章
相關標籤/搜索