事務處理是DBMS中最關鍵的技術,對SQLite也同樣,它涉及到併發控制,以及故障恢復等等。在數據庫中使用事務能夠保證數據的統一和完整性,同時也能夠提升效率。假設須要在一張表內一次插入20我的的名字纔算是操做成功,那麼在不使用事務的狀況下,若是插入過程當中出現異常或者在插入過程當中出現一些其餘數據庫操做的話,就頗有可能影響了操做的完整性。因此事務能夠很好地解決這樣的狀況,首先事務是能夠把啓動事務過程當中的全部操做視爲事務的過程。等到全部過程執行完畢後,咱們能夠根據操做是否成功來決定事務是否進行提交或者回滾。提交事務後會一次性把全部數據提交到數據庫,若是回滾了事務就會放棄此次的操做,而對原來表的數據不進行更改。sql
SQLite中分別以BEGIN、COMMIT和ROLLBACK啓動、提交和回滾事務。見以下示例:數據庫
@try{ char *errorMsg; if (sqlite3_exec(_database, "BEGIN", NULL, NULL, &errorMsg)==SQLITE_OK) { NSLog(@」啓動事務成功」); sqlite3_free(errorMsg); sqlite3_stmt *statement; if (sqlite3_prepare_v2(_database, [@"insert into persons(name) values(?);" UTF8String], -1, &statement, NULL)==SQLITE_OK) { //綁定參數 const char *text=[@」張三」 cStringUsingEncoding:NSUTF8StringEncoding]; sqlite3_bind_text(statement, index, text, strlen(text), SQLITE_STATIC); if (sqlite3_step(statement)!=SQLITE_DONE) { sqlite3_finalize(statement); } } if (sqlite3_exec(_database, "COMMIT", NULL, NULL, &errorMsg)==SQLITE_OK) { NSLog(@」提交事務成功」); } sqlite3_free(errorMsg); }
else{ sqlite3_free(errorMsg); } } @catch(NSException *e){ char *errorMsg; if (sqlite3_exec(_database, "ROLLBACK", NULL, NULL, &errorMsg)==SQLITE_OK) { NSLog(@」回滾事務成功」); } sqlite3_free(errorMsg); } @finally{ }
在SQLite中,若是沒有爲當前的SQL命令(SELECT除外)顯示的指定事務,那麼SQLite會自動爲該操做添加一個隱式的事務,以保證該操做的原子性和一致性。固然,SQLite也支持顯示的事務,其語法與大多數關係型數據庫相比基本相同。見以下示例:windows
sqlite> BEGIN TRANSACTION; sqlite> INSERT INTO testtable VALUES(1); sqlite> INSERT INTO testtable VALUES(2); sqlite> COMMIT TRANSACTION; --顯示事務被提交,數據表中的數據也發生了變化。 sqlite> SELECT COUNT(*) FROM testtable; COUNT(*) ---------- 2 sqlite> BEGIN TRANSACTION; sqlite> INSERT INTO testtable VALUES(1); sqlite> ROLLBACK TRANSACTION; --顯示事務被回滾,數據表中的數據沒有發生變化。 sqlite> SELECT COUNT(*) FROM testtable; COUNT(*) ---------- 2
下面經過具體示例來分析SQLite原子提交的實現(基於Version 3.3.6的代碼):緩存
CREATE TABLE episodes( id integer primary key,name text, cid int); insert into episodes(name,cid) values("cat",1); --插入一條記錄
它通過編譯器處理後生成的虛擬機代碼以下:數據結構
sqlite> explain insert into episodes(name,cid) values("cat",1); 0|Trace|0|0|0|explain insert into episodes(name,cid) values("cat",1);|00| 1|Goto|0|12|0||00| 2|SetNumColumns|0|3|0||00| 3|OpenWrite|0|2|0||00| 4|NewRowid|0|2|0||00| 5|Null|0|3|0||00| 6|String8|0|4|0|cat|00| 7|Integer|1|5|0||00| 8|MakeRecord|3|3|6|dad|00| 9|Insert|0|6|2|episodes|0b| 10|Close|0|0|0||00| 11|Halt|0|0|0||00| 12|Transaction|0|1|0||00| 13|VerifyCookie|0|1|0||00| 14|Transaction|1|1|0||00| 15|VerifyCookie|1|0|0||00| 16|TableLock|0|2|1|episodes|00| 17|Goto|0|2|0||00|
一、初始狀態(Initial State)
當一個數據庫鏈接第一次打開時,狀態如圖所示。圖中最右邊(「Disk」標註)表示保存在存儲設備中的內容。每一個方框表明一個扇區。藍色的塊表示這個扇區保存了原始數據。圖中中間區域是操做系統的磁盤緩衝區。開始的時候,這些緩存是尚未被使用,所以這些方框是空白的。圖中左邊區域顯示SQLite用戶進程的內存。由於這個數據庫鏈接剛剛打開,因此尚未任何數據記錄被讀入,因此這些內存也是空的。併發
2、獲取讀鎖(Acquiring A Read Lock)
在SQLite寫數據庫以前,它必須先從數據庫中讀取相關信息。好比,在插入新的數據時,SQLite會先從sqlite_master表中讀取數據庫模式(至關於數據字典),以便編譯器對INSERT語句進行分析,肯定數據插入的位置。
在進行讀操做以前,必須先獲取數據庫的共享鎖(shared lock),共享鎖容許兩個或更多的鏈接在同一時刻讀取數據庫。可是共享鎖不容許其它鏈接對數據庫進行寫操做。
shared lock存在於操做系統磁盤緩存,而不是磁盤自己。文件鎖的本質只是操做系統的內核數據結構,當操做系統崩潰或掉電時,這些內核數據也會隨之消失。app
三、讀取數據
一旦獲得shared lock,就能夠進行讀操做。如圖所示,數據先由OS從磁盤讀取到OS緩存,而後再由OS移到用戶進程空間。通常來講,數據庫文件分爲不少頁,而一次讀操做只讀取一小部分頁面。如圖,從8個頁面讀取3個頁面。dom
四、獲取Reserved Lock
在對數據進行修改操做以前,先要獲取數據庫文件的Reserved Lock,Reserved Lock和shared lock的類似之處在於,它們都容許其它進程對數據庫文件進行讀操做。Reserved Lock和Shared Lock能夠共存,可是隻能是一個Reserved Lock和多個Shared Lock——多個Reserved Lock不能共存。因此,在同一時刻,只能進行一個寫操做。
Reserved Lock意味着當前進程(鏈接)想修改數據庫文件,可是還沒開始修改操做,因此其它的進程能夠讀數據庫,但不能寫數據庫。ide
五、建立恢復日誌(Creating A Rollback Journal File)
在對數據庫進行寫操做以前,SQLite先要建立一個單獨的日誌文件,而後把要修改的頁面的原始數據寫入日誌。回滾日誌包含一個日誌頭(圖中的綠色)——記錄數據庫文件的原始大小。因此即便數據庫文件大小改變了,咱們仍知道數據庫的原始大小。
從OS的角度來看,當一個文件建立時,大多數OS(Windows、Linux、Mac OS X)不會向磁盤寫入數據,新建立的文件此時位於磁盤緩存中,以後纔會真正寫入磁盤。如圖,日誌文件位於OS磁盤緩存中,而不是位於磁盤。
函數
以上5步的實現代碼:
//事務指令的實現 //p1爲數據庫文件的索引號--0爲main database;1爲temporary tables使用的文件 //p2不爲0,一個寫事務開始 case OP_Transaction: { //數據庫的索引號 int i = pOp->p1; //指向數據庫對應的btree Btree *pBt; assert( i>=0 && i<db->nDb ); assert( (p->btreeMask & (1<<i))!=0 ); //設置btree指針 pBt = db->aDb[i].pBt; if( pBt ){ //從這裏btree開始事務,主要給文件加鎖,並設置btree事務狀態 rc = sqlite3BtreeBeginTrans(pBt, pOp->p2); if( rc==SQLITE_BUSY ){ p->pc = pc; p->rc = rc = SQLITE_BUSY; goto vdbe_return; } if( rc!=SQLITE_OK && rc!=SQLITE_READONLY /* && rc!=SQLITE_BUSY */ ){ goto abort_due_to_error; } } break; } //開始一個事務,若是第二個參數不爲0,則一個寫事務開始,不然是一個讀事務 //若是wrflag>=2,一個exclusive事務開始,此時別的鏈接不能訪問數據庫 int sqlite3BtreeBeginTrans(Btree *p, int wrflag){ BtShared *pBt = p->pBt; int rc = SQLITE_OK; btreeIntegrity(p); /* If the btree is already in a write-transaction, or it ** is already in a read-transaction and a read-transaction ** is requested, this is a no-op. */ //若是b-tree處於一個寫事務;或者處於一個讀事務,一個讀事務又請求,則返回SQLITE_OK if( p->inTrans==TRANS_WRITE || (p->inTrans==TRANS_READ && !wrflag) ){ return SQLITE_OK; } /* Write transactions are not possible on a read-only database */ //寫事務不能訪問只讀數據庫 if( pBt->readOnly && wrflag ){ return SQLITE_READONLY; } /* If another database handle has already opened a write transaction ** on this shared-btree structure and a second write transaction is ** requested, return SQLITE_BUSY. */ //若是數據庫已存在一個寫事務,則該寫事務請求時返回SQLITE_BUSY if( pBt->inTransaction==TRANS_WRITE && wrflag ){ return SQLITE_BUSY; } do { //若是數據庫對應btree的第一個頁面還沒讀進內存 //則把該頁面讀進內存,數據庫也相應的加read lock if( pBt->pPage1==0 ){ //加read lock,並讀頁面到內存 rc = lockBtree(pBt); } if( rc==SQLITE_OK && wrflag ){ //對數據庫文件加RESERVED_LOCK鎖 rc = sqlite3pager_begin(pBt->pPage1->aData, wrflag>1); if( rc==SQLITE_OK ){ rc = newDatabase(pBt); } } if( rc==SQLITE_OK ){ if( wrflag ) pBt->inStmt = 0; }else{ unlockBtreeIfUnused(pBt); } }while( rc==SQLITE_BUSY && pBt->inTransaction==TRANS_NONE && sqlite3InvokeBusyHandler(pBt->pBusyHandler) ); if( rc==SQLITE_OK ){ if( p->inTrans==TRANS_NONE ){ //btree的事務數加1 pBt->nTransaction++; } //設置btree事務狀態 p->inTrans = (wrflag?TRANS_WRITE:TRANS_READ); if( p->inTrans>pBt->inTransaction ){ pBt->inTransaction = p->inTrans; } } btreeIntegrity(p); return rc; }
/* **獲取數據庫的寫鎖,發生如下狀況時去除寫鎖: ** * sqlite3pager_commit() is called. ** * sqlite3pager_rollback() is called. ** * sqlite3pager_close() is called. ** * sqlite3pager_unref() is called to on every outstanding page. **pData指向數據庫的打開的頁面,此時並不修改,僅僅只是獲取 **相應的pager,檢查它是否處於read-lock狀態 **若是打開的不是臨時文件,則打開日誌文件. **若是數據庫已經處於寫狀態,則do nothing */ int sqlite3pager_begin(void *pData, int exFlag){ PgHdr *pPg = DATA_TO_PGHDR(pData); Pager *pPager = pPg->pPager; int rc = SQLITE_OK; assert( pPg->nRef>0 ); assert( pPager->state!=PAGER_UNLOCK ); //pager已經處於share狀態 if( pPager->state==PAGER_SHARED ){ assert( pPager->aInJournal==0 ); if( MEMDB ){ pPager->state = PAGER_EXCLUSIVE; pPager->origDbSize = pPager->dbSize; }else{ //對文件加 RESERVED_LOCK rc = sqlite3OsLock(pPager->fd, RESERVED_LOCK); if( rc==SQLITE_OK ){ //設置pager的狀態 pPager->state = PAGER_RESERVED; if( exFlag ){ rc = pager_wait_on_lock(pPager, EXCLUSIVE_LOCK); } } if( rc!=SQLITE_OK ){ return rc; } pPager->dirtyCache = 0; TRACE2("TRANSACTION %d\n", PAGERID(pPager)); //使用日誌,不是臨時文件,則打開日誌文件 if( pPager->useJournal && !pPager->tempFile ){ //爲pager打開日誌文件,pager應該處於RESERVED或EXCLUSIVE狀態 //會向日志文件寫入header rc = pager_open_journal(pPager); } } } return rc; }
//建立日誌文件,pager應該處於RESERVED或EXCLUSIVE狀態 static int pager_open_journal(Pager *pPager){ int rc; assert( !MEMDB ); assert( pPager->state>=PAGER_RESERVED ); assert( pPager->journalOpen==0 ); assert( pPager->useJournal ); assert( pPager->aInJournal==0 ); sqlite3pager_pagecount(pPager); //日誌文件頁面位圖 pPager->aInJournal = sqliteMalloc( pPager->dbSize/8 + 1 ); if( pPager->aInJournal==0 ){ rc = SQLITE_NOMEM; goto failed_to_open_journal; } //打開日誌文件 rc = sqlite3OsOpenExclusive(pPager->zJournal, &pPager->jfd, pPager->tempFile); //日誌文件的位置指針 pPager->journalOff = 0; pPager->setMaster = 0; pPager->journalHdr = 0; if( rc!=SQLITE_OK ){ goto failed_to_open_journal; } /*通常來講,OS此時建立的文件位於磁盤緩存,並無實際 **存在於磁盤,下面三個操做就是爲了把結果寫入磁盤,而對於 **windows系統來講,並無提供相應API,因此實際上沒有意義. */ //fullSync操做對windows沒有意義 sqlite3OsSetFullSync(pPager->jfd, pPager->full_fsync); sqlite3OsSetFullSync(pPager->fd, pPager->full_fsync); /* Attempt to open a file descriptor for the directory that contains a file. **This file descriptor can be used to fsync() the directory **in order to make sure the creation of a new file is actually written to disk. */ sqlite3OsOpenDirectory(pPager->jfd, pPager->zDirectory); pPager->journalOpen = 1; pPager->journalStarted = 0; pPager->needSync = 0; pPager->alwaysRollback = 0; pPager->nRec = 0; if( pPager->errCode ){ rc = pPager->errCode; goto failed_to_open_journal; } pPager->origDbSize = pPager->dbSize; //寫入日誌文件的header--24個字節 rc = writeJournalHdr(pPager); if( pPager->stmtAutoopen && rc==SQLITE_OK ){ rc = sqlite3pager_stmt_begin(pPager); } if( rc!=SQLITE_OK && rc!=SQLITE_NOMEM ){ rc = pager_unwritelock(pPager); if( rc==SQLITE_OK ){ rc = SQLITE_FULL; } } return rc; failed_to_open_journal: sqliteFree(pPager->aInJournal); pPager->aInJournal = 0; if( rc==SQLITE_NOMEM ){ /* If this was a malloc() failure, then we will not be closing the pager ** file. So delete any journal file we may have just created. Otherwise, ** the system will get confused, we have a read-lock on the file and a ** mysterious journal has appeared in the filesystem. */ sqlite3OsDelete(pPager->zJournal); }else{ sqlite3OsUnlock(pPager->fd, NO_LOCK); pPager->state = PAGER_UNLOCK; } return rc; } /*寫入日誌文件頭 **journal header的格式以下: ** - 8 bytes: 標誌日誌文件的魔數 ** - 4 bytes: 日誌文件中記錄數 ** - 4 bytes: Random number used for page hash. ** - 4 bytes: 原來數據庫的大小(kb) ** - 4 bytes: 扇區大小512byte */ static int writeJournalHdr(Pager *pPager){ //日誌文件頭 char zHeader[sizeof(aJournalMagic)+16]; int rc = seekJournalHdr(pPager); if( rc ) return rc; pPager->journalHdr = pPager->journalOff; if( pPager->stmtHdrOff==0 ){ pPager->stmtHdrOff = pPager->journalHdr; } //設置文件指針指向header以後 pPager->journalOff += JOURNAL_HDR_SZ(pPager); /* FIX ME: ** ** Possibly for a pager not in no-sync mode, the journal magic should not ** be written until nRec is filled in as part of next syncJournal(). ** ** Actually maybe the whole journal header should be delayed until that ** point. Think about this. */ memcpy(zHeader, aJournalMagic, sizeof(aJournalMagic)); /* The nRec Field. 0xFFFFFFFF for no-sync journals. */ put32bits(&zHeader[sizeof(aJournalMagic)], pPager->noSync ? 0xffffffff : 0); /* The random check-hash initialiser */ sqlite3Randomness(sizeof(pPager->cksumInit), &pPager->cksumInit); put32bits(&zHeader[sizeof(aJournalMagic)+4], pPager->cksumInit); /* The initial database size */ put32bits(&zHeader[sizeof(aJournalMagic)+8], pPager->dbSize); /* The assumed sector size for this process */ put32bits(&zHeader[sizeof(aJournalMagic)+12], pPager->sectorSize); //寫入文件頭 rc = sqlite3OsWrite(pPager->jfd, zHeader, sizeof(zHeader)); /* The journal header has been written successfully. Seek the journal ** file descriptor to the end of the journal header sector. */ if( rc==SQLITE_OK ){ rc = sqlite3OsSeek(pPager->jfd, pPager->journalOff-1); if( rc==SQLITE_OK ){ rc = sqlite3OsWrite(pPager->jfd, "\000", 1); } } return rc; }
其實現過程以下圖所示:
六、修改位於用戶進程空間的頁面(Changing Database Pages In User Space)
頁面的原始數據寫入日誌以後,就能夠修改頁面了——位於用戶進程空間。每一個數據庫鏈接都有本身私有的空間,因此頁面的變化只對該鏈接可見,而對其它鏈接的數據仍然是磁盤緩存中的數據。從這裏能夠明白一件事:一個進程在修改頁面數據的同時,其它進程能夠繼續進行讀操做。圖中的紅色表示修改的頁面。
七、日誌文件刷入磁盤(Flushing The Rollback Journal File To Mass Storage)
接下來把日誌文件的內容刷入磁盤,這對於數據庫從意外中恢復來講是相當重要的一步。並且這一般也是一個耗時的操做,由於磁盤I/O速度很慢。
這個步驟不僅把日誌文件刷入磁盤那麼簡單,它的實現實際上分紅兩步:首先把日誌文件的內容刷入磁盤(即頁面數據);而後把日誌文件中頁面的數目寫入日誌文件頭,再把header刷入磁盤(這一過程在代碼中清晰可見)。
代碼以下:
/* **Sync日誌文件,保證全部的髒頁面寫入磁盤日誌文件 */ static int syncJournal(Pager *pPager){ PgHdr *pPg; int rc = SQLITE_OK; /* Sync the journal before modifying the main database ** (assuming there is a journal and it needs to be synced.) */ if( pPager->needSync ){ if( !pPager->tempFile ){ assert( pPager->journalOpen ); /* assert( !pPager->noSync ); // noSync might be set if synchronous ** was turned off after the transaction was started. Ticket #615 */ #ifndef NDEBUG { /* Make sure the pPager->nRec counter we are keeping agrees ** with the nRec computed from the size of the journal file. */ i64 jSz; rc = sqlite3OsFileSize(pPager->jfd, &jSz); if( rc!=0 ) return rc; assert( pPager->journalOff==jSz ); } #endif { /* Write the nRec value into the journal file header. If in ** full-synchronous mode, sync the journal first. This ensures that ** all data has really hit the disk before nRec is updated to mark ** it as a candidate for rollback. */ if( pPager->fullSync ){ TRACE2("SYNC journal of %d\n", PAGERID(pPager)); //首先保證髒頁面中全部的數據都已經寫入日誌文件 rc = sqlite3OsSync(pPager->jfd, 0); if( rc!=0 ) return rc; } rc = sqlite3OsSeek(pPager->jfd, pPager->journalHdr + sizeof(aJournalMagic)); if( rc ) return rc; //頁面的數目寫入日誌文件 rc = write32bits(pPager->jfd, pPager->nRec); if( rc ) return rc; rc = sqlite3OsSeek(pPager->jfd, pPager->journalOff); if( rc ) return rc; } TRACE2("SYNC journal of %d\n", PAGERID(pPager)); rc = sqlite3OsSync(pPager->jfd, pPager->full_fsync); if( rc!=0 ) return rc; pPager->journalStarted = 1; } pPager->needSync = 0; /* Erase the needSync flag from every page. */ //清除needSync標誌位 for(pPg=pPager->pAll; pPg; pPg=pPg->pNextAll){ pPg->needSync = 0; } pPager->pFirstSynced = pPager->pFirst; } #ifndef NDEBUG /* If the Pager.needSync flag is clear then the PgHdr.needSync ** flag must also be clear for all pages. Verify that this ** invariant is true. */ else{ for(pPg=pPager->pAll; pPg; pPg=pPg->pNextAll){ assert( pPg->needSync==0 ); } assert( pPager->pFirstSynced==pPager->pFirst ); } #endif return rc; }
八、獲取排斥鎖(Obtaining An Exclusive Lock)
在對數據庫文件進行修改以前(注:這裏不是內存中的頁面),咱們必須獲得數據庫文件的排斥鎖(Exclusive Lock)。獲得排斥鎖的過程可分爲兩步:首先獲得Pending lock;而後Pending lock升級到exclusive lock。
Pending lock容許其它已經存在的Shared lock繼續讀數據庫文件,可是不容許產生新的shared lock,這樣作目的是爲了防止寫操做發生餓死狀況。一旦全部的shared lock完成操做,則pending lock升級到exclusive lock。
九、修改的頁面寫入文件(Writing Changes To The Database File)
一旦獲得exclusive lock,其它的進程就不能進行讀操做,此時就能夠把修改的頁面寫回數據庫文件,可是一般OS都把結果暫時保存到磁盤緩存中,直到某個時刻纔會真正把結果寫入磁盤。
以上2步的實現代碼:
//把全部的髒頁面寫入數據庫 //到這裏開始獲取EXCLUSIVEQ鎖,並將頁面寫回操做系統文件 static int pager_write_pagelist(PgHdr *pList){ Pager *pPager; int rc; if( pList==0 ) return SQLITE_OK; pPager = pList->pPager; /* At this point there may be either a RESERVED or EXCLUSIVE lock on the ** database file. If there is already an EXCLUSIVE lock, the following ** calls to sqlite3OsLock() are no-ops. ** ** Moving the lock from RESERVED to EXCLUSIVE actually involves going ** through an intermediate state PENDING. A PENDING lock prevents new ** readers from attaching to the database but is unsufficient for us to ** write. The idea of a PENDING lock is to prevent new readers from ** coming in while we wait for existing readers to clear. ** ** While the pager is in the RESERVED state, the original database file ** is unchanged and we can rollback without having to playback the ** journal into the original database file. Once we transition to ** EXCLUSIVE, it means the database file has been changed and any rollback ** will require a journal playback. */ //加EXCLUSIVE_LOCK鎖 rc = pager_wait_on_lock(pPager, EXCLUSIVE_LOCK); if( rc!=SQLITE_OK ){ return rc; } while( pList ){ assert( pList->dirty ); rc = sqlite3OsSeek(pPager->fd, (pList->pgno-1)*(i64)pPager->pageSize); if( rc ) return rc; /* If there are dirty pages in the page cache with page numbers greater ** than Pager.dbSize, this means sqlite3pager_truncate() was called to ** make the file smaller (presumably by auto-vacuum code). Do not write ** any such pages to the file. */ if( pList->pgno<=pPager->dbSize ){ char *pData = CODEC2(pPager, PGHDR_TO_DATA(pList), pList->pgno, 6); TRACE3("STORE %d page %d\n", PAGERID(pPager), pList->pgno); //寫入文件 rc = sqlite3OsWrite(pPager->fd, pData, pPager->pageSize); TEST_INCR(pPager->nWrite); } #ifndef NDEBUG else{ TRACE3("NOSTORE %d page %d\n", PAGERID(pPager), pList->pgno); } #endif if( rc ) return rc; //設置dirty pList->dirty = 0; #ifdef SQLITE_CHECK_PAGES pList->pageHash = pager_pagehash(pList); #endif //指向下一個髒頁面 pList = pList->pDirty; } return SQLITE_OK; }
十、修改結果刷入存儲設備(Flushing Changes To Mass Storage)
爲了保證修改結果然正寫入磁盤,這一步必不可少。對於數據庫存的完整性,這一步也是關鍵的一步。因爲要進行實際的I/O操做,因此和第7步同樣,將花費較多的時間。
以上幾步實現代碼以下(以上幾步是在函數sqlite3BtreeSync()--btree.c中調用的):
//同步btree對應的數據庫文件 //該函數返回以後,只須要提交寫事務,刪除日誌文件 int sqlite3BtreeSync(Btree *p, const char *zMaster){ int rc = SQLITE_OK; if( p->inTrans==TRANS_WRITE ){ BtShared *pBt = p->pBt; Pgno nTrunc = 0; #ifndef SQLITE_OMIT_AUTOVACUUM if( pBt->autoVacuum ){ rc = autoVacuumCommit(pBt, &nTrunc); if( rc!=SQLITE_OK ){ return rc; } } #endif //調用pager進行sync rc = sqlite3pager_sync(pBt->pPager, zMaster, nTrunc); } return rc; } //把pager全部髒頁面寫回文件 int sqlite3pager_sync(Pager *pPager, const char *zMaster, Pgno nTrunc){ int rc = SQLITE_OK; TRACE4("DATABASE SYNC: File=%s zMaster=%s nTrunc=%d\n", pPager->zFilename, zMaster, nTrunc); /* If this is an in-memory db, or no pages have been written to, or this ** function has already been called, it is a no-op. */ //pager不處於PAGER_SYNCED狀態,dirtyCache爲1, //則進行sync操做 if( pPager->state!=PAGER_SYNCED && !MEMDB && pPager->dirtyCache ){ PgHdr *pPg; assert( pPager->journalOpen ); /* If a master journal file name has already been written to the ** journal file, then no sync is required. This happens when it is ** written, then the process fails to upgrade from a RESERVED to an ** EXCLUSIVE lock. The next time the process tries to commit the ** transaction the m-j name will have already been written. */ if( !pPager->setMaster ){ //pager修改計數 rc = pager_incr_changecounter(pPager); if( rc!=SQLITE_OK ) goto sync_exit; #ifndef SQLITE_OMIT_AUTOVACUUM if( nTrunc!=0 ){ /* If this transaction has made the database smaller, then all pages ** being discarded by the truncation must be written to the journal ** file. */ Pgno i; void *pPage; int iSkip = PAGER_MJ_PGNO(pPager); for( i=nTrunc+1; i<=pPager->origDbSize; i++ ){ if( !(pPager->aInJournal[i/8] & (1<<(i&7))) && i!=iSkip ){ rc = sqlite3pager_get(pPager, i, &pPage); if( rc!=SQLITE_OK ) goto sync_exit; rc = sqlite3pager_write(pPage); sqlite3pager_unref(pPage); if( rc!=SQLITE_OK ) goto sync_exit; } } } #endif rc = writeMasterJournal(pPager, zMaster); if( rc!=SQLITE_OK ) goto sync_exit; //sync日誌文件 rc = syncJournal(pPager); if( rc!=SQLITE_OK ) goto sync_exit; } #ifndef SQLITE_OMIT_AUTOVACUUM if( nTrunc!=0 ){ rc = sqlite3pager_truncate(pPager, nTrunc); if( rc!=SQLITE_OK ) goto sync_exit; } #endif /* Write all dirty pages to the database file */ pPg = pager_get_all_dirty_pages(pPager); //把全部髒頁面寫回操做系統文件 rc = pager_write_pagelist(pPg); if( rc!=SQLITE_OK ) goto sync_exit; /* Sync the database file. */ //sync數據庫文件 if( !pPager->noSync ){ rc = sqlite3OsSync(pPager->fd, 0); } pPager->state = PAGER_SYNCED; }else if( MEMDB && nTrunc!=0 ){ rc = sqlite3pager_truncate(pPager, nTrunc); } sync_exit: return rc; }
接下來的過程以下圖所示:
十一、刪除日誌文件(Deleting The Rollback Journal)
一旦更改寫入設備,日誌文件將會被刪除,這是事務真正提交的時刻。若是在這以前系統發生崩潰,就會進行恢復處理,使得數據庫和沒發生改變同樣;若是在這以後系統發生崩潰,代表全部的更改都已經寫入磁盤。SQLite就是根據日誌存在狀況決定是否對數據庫進行恢復處理。刪除文件本質上不是一個原子操做,可是從用戶進程的角度來看是一個原子操做,因此一個事務看起來是一個原子操做。
在許多系統中,刪除文件也是一個高代價的操做。做爲優化,SQLite能夠配置成把日誌文件的長度截爲0或者把日誌文件頭清零。
十二、釋放鎖(Releasing The Lock)
做爲原子提交的最後一步,釋放排斥鎖使得其它進程能夠開始訪問數據庫。
下圖中,咱們指明瞭當鎖被釋放的時候用戶空間所擁有的信息已經被清空了。對於老版本的SQLite能夠這麼認爲,但最新的SQLite會保存些用戶空間的緩存不會被清空,可能下一個事務開始的時候,這些數據恰好能夠用上。從新利用這些內存要比再次從操做系統磁盤緩存或者硬盤中讀取輕鬆和快捷得多。在再次使用這些數據以前,咱們必須先取得一個共享鎖,同時咱們還不得不去檢查一下,保證尚未其餘進程在咱們擁有共享鎖以前對數據庫文件進行了修改。數據庫文件的第一頁中有一個計數器,數據庫文件每作一次修改,這個計數器就會增加一下。咱們能夠經過檢查這個計數器就可得知是否有其餘進程修改過數據庫文件。若是數據庫文件已經被修改過了,那麼用戶內存空間的緩存就不得不清空,並從新讀入。大多數狀況下,這種狀況不大會發生,所以用戶空間的內存緩存將是有效的,這對於性能提升來講做用是顯著的。
以上2步(以上2步是在sqlite3BtreeCommit()--btree.c函數中實現的)代碼以下:
//提交事務,至此一個事務完成.主要作兩件事: //刪除日誌文件,釋放數據庫文件的寫鎖 int sqlite3BtreeCommit(Btree *p){ BtShared *pBt = p->pBt; btreeIntegrity(p); /* If the handle has a write-transaction open, commit the shared-btrees ** transaction and set the shared state to TRANS_READ. */ if( p->inTrans==TRANS_WRITE ){ int rc; assert( pBt->inTransaction==TRANS_WRITE ); assert( pBt->nTransaction>0 ); //調用pager,提交事務 rc = sqlite3pager_commit(pBt->pPager); if( rc!=SQLITE_OK ){ return rc; } pBt->inTransaction = TRANS_READ; pBt->inStmt = 0; } unlockAllTables(p); /* If the handle has any kind of transaction open, decrement the transaction ** count of the shared btree. If the transaction count reaches 0, set ** the shared state to TRANS_NONE. The unlockBtreeIfUnused() call below ** will unlock the pager. */ if( p->inTrans!=TRANS_NONE ){ pBt->nTransaction--; if( 0==pBt->nTransaction ){ pBt->inTransaction = TRANS_NONE; } } } //提交事務,主要調用pager_unwritelock()函數 int sqlite3pager_commit(Pager *pPager){ int rc; PgHdr *pPg; if( pPager->errCode ){ return pPager->errCode; } if( pPager->state<PAGER_RESERVED ){ return SQLITE_ERROR; } TRACE2("COMMIT %d\n", PAGERID(pPager)); if( MEMDB ){ pPg = pager_get_all_dirty_pages(pPager); while( pPg ){ clearHistory(PGHDR_TO_HIST(pPg, pPager)); pPg->dirty = 0; pPg->inJournal = 0; pPg->inStmt = 0; pPg->needSync = 0; pPg->pPrevStmt = pPg->pNextStmt = 0; pPg = pPg->pDirty; } pPager->pDirty = 0; #ifndef NDEBUG for(pPg=pPager->pAll; pPg; pPg=pPg->pNextAll){ PgHistory *pHist = PGHDR_TO_HIST(pPg, pPager); assert( !pPg->alwaysRollback ); assert( !pHist->pOrig ); assert( !pHist->pStmt ); } #endif pPager->pStmt = 0; pPager->state = PAGER_SHARED; return SQLITE_OK; } if( pPager->dirtyCache==0 ){ /* Exit early (without doing the time-consuming sqlite3OsSync() calls) ** if there have been no changes to the database file. */ assert( pPager->needSync==0 ); rc = pager_unwritelock(pPager); pPager->dbSize = -1; return rc; } assert( pPager->journalOpen ); rc = sqlite3pager_sync(pPager, 0, 0); //刪除文件,釋放寫鎖 if( rc==SQLITE_OK ){ rc = pager_unwritelock(pPager); pPager->dbSize = -1; } return rc; } //對數據庫加read lock,刪除日誌文件 static int pager_unwritelock(Pager *pPager){ PgHdr *pPg; int rc; assert( !MEMDB ); if( pPager->state<PAGER_RESERVED ){ return SQLITE_OK; } sqlite3pager_stmt_commit(pPager); if( pPager->stmtOpen ){ sqlite3OsClose(&pPager->stfd); pPager->stmtOpen = 0; } if( pPager->journalOpen ){ //關閉日誌文件 sqlite3OsClose(&pPager->jfd); pPager->journalOpen = 0; //刪除日誌文件 sqlite3OsDelete(pPager->zJournal); sqliteFree( pPager->aInJournal ); pPager->aInJournal = 0; for(pPg=pPager->pAll; pPg; pPg=pPg->pNextAll){ pPg->inJournal = 0; pPg->dirty = 0; pPg->needSync = 0; #ifdef SQLITE_CHECK_PAGES pPg->pageHash = pager_pagehash(pPg); #endif } pPager->pDirty = 0; pPager->dirtyCache = 0; pPager->nRec = 0; }else{ assert( pPager->aInJournal==0 ); assert( pPager->dirtyCache==0 || pPager->useJournal==0 ); } //釋放寫鎖,加讀鎖 rc = sqlite3OsUnlock(pPager->fd, SHARED_LOCK); pPager->state = PAGER_SHARED; pPager->origDbSize = 0; pPager->setMaster = 0; pPager->needSync = 0; pPager->pFirstSynced = pPager->pFirst; return rc; }
下圖可進一步描述該過程:
其中sqlite3BtreeSync()和sqlite3BtreeCommit()是如何被調用的?
通常來講,事務提交方式爲自動提交的話,在虛擬機中的OP_Halt指令實現提交事務,相關代碼以下:
//虛擬機停機指令 case OP_Halt: { /* no-push */ p->pTos = pTos; p->rc = pOp->p1; p->pc = pc; p->errorAction = pOp->p2; if( pOp->p3 ){ sqlite3SetString(&p->zErrMsg, pOp->p3, (char*)0); } //設置虛擬機狀態SQLITE_MAGIC_RUN 爲 SQLITE_MAGIC_HALT, //並提交事務 rc = sqlite3VdbeHalt(p); assert( rc==SQLITE_BUSY || rc==SQLITE_OK ); if( rc==SQLITE_BUSY ){ p->rc = SQLITE_BUSY; return SQLITE_BUSY; } return p->rc ? SQLITE_ERROR : SQLITE_DONE; } //當虛擬機要停機時,調用該函數,若是VDBE改變了數據庫且爲自動 //提交模式,則提交這些改變 int sqlite3VdbeHalt(Vdbe *p){ sqlite3 *db = p->db; int i; int (*xFunc)(Btree *pBt) = 0; /* Function to call on each btree backend */ int isSpecialError; /* Set to true if SQLITE_NOMEM or IOERR */ /* This function contains the logic that determines if a statement or ** transaction will be committed or rolled back as a result of the ** execution of this virtual machine. ** ** Special errors: ** ** If an SQLITE_NOMEM error has occured in a statement that writes to ** the database, then either a statement or transaction must be rolled ** back to ensure the tree-structures are in a consistent state. A ** statement transaction is rolled back if one is open, otherwise the ** entire transaction must be rolled back. ** ** If an SQLITE_IOERR error has occured in a statement that writes to ** the database, then the entire transaction must be rolled back. The ** I/O error may have caused garbage to be written to the journal ** file. Were the transaction to continue and eventually be rolled ** back that garbage might end up in the database file. ** ** In both of the above cases, the Vdbe.errorAction variable is ** ignored. If the sqlite3.autoCommit flag is false and a transaction ** is rolled back, it will be set to true. ** ** Other errors: ** ** No error: ** */ if( sqlite3MallocFailed() ){ p->rc = SQLITE_NOMEM; } if( p->magic!=VDBE_MAGIC_RUN ){ /* Already halted. Nothing to do. */ assert( p->magic==VDBE_MAGIC_HALT ); return SQLITE_OK; } //釋放虛擬機中全部的遊標 closeAllCursors(p); checkActiveVdbeCnt(db); /* No commit or rollback needed if the program never started */ if( p->pc>=0 ){ /* Check for one of the special errors - SQLITE_NOMEM or SQLITE_IOERR */ isSpecialError = ((p->rc==SQLITE_NOMEM || p->rc==SQLITE_IOERR)?1:0); if( isSpecialError ){ /* This loop does static analysis of the query to see which of the ** following three categories it falls into: ** ** Read-only ** Query with statement journal ** Query without statement journal ** ** We could do something more elegant than this static analysis (i.e. ** store the type of query as part of the compliation phase), but ** handling malloc() or IO failure is a fairly obscure edge case so ** this is probably easier. Todo: Might be an opportunity to reduce ** code size a very small amount though */ int isReadOnly = 1; int isStatement = 0; assert(p->aOp || p->nOp==0); for(i=0; i<p->nOp; i++){ switch( p->aOp[i].opcode ){ case OP_Transaction: isReadOnly = 0; break; case OP_Statement: isStatement = 1; break; } } /* If the query was read-only, we need do no rollback at all. Otherwise, ** proceed with the special handling. */ if( !isReadOnly ){ if( p->rc==SQLITE_NOMEM && isStatement ){ xFunc = sqlite3BtreeRollbackStmt; }else{ /* We are forced to roll back the active transaction. Before doing ** so, abort any other statements this handle currently has active. */ sqlite3AbortOtherActiveVdbes(db, p); sqlite3RollbackAll(db); db->autoCommit = 1; } } } /* If the auto-commit flag is set and this is the only active vdbe, then ** we do either a commit or rollback of the current transaction. ** ** Note: This block also runs if one of the special errors handled ** above has occured. */ //若是自動提交事務,則提交事務 if( db->autoCommit && db->activeVdbeCnt==1 ){ if( p->rc==SQLITE_OK || (p->errorAction==OE_Fail && !isSpecialError) ){ /* The auto-commit flag is true, and the vdbe program was ** successful or hit an 'OR FAIL' constraint. This means a commit ** is required. */ //提交事務 int rc = vdbeCommit(db); if( rc==SQLITE_BUSY ){ return SQLITE_BUSY; }else if( rc!=SQLITE_OK ){ p->rc = rc; sqlite3RollbackAll(db); }else{ sqlite3CommitInternalChanges(db); } }else{ sqlite3RollbackAll(db); } }else if( !xFunc ){ if( p->rc==SQLITE_OK || p->errorAction==OE_Fail ){ xFunc = sqlite3BtreeCommitStmt; }else if( p->errorAction==OE_Abort ){ xFunc = sqlite3BtreeRollbackStmt; }else{ sqlite3AbortOtherActiveVdbes(db, p); sqlite3RollbackAll(db); db->autoCommit = 1; } } /* If xFunc is not NULL, then it is one of sqlite3BtreeRollbackStmt or ** sqlite3BtreeCommitStmt. Call it once on each backend. If an error occurs ** and the return code is still SQLITE_OK, set the return code to the new ** error value. */ assert(!xFunc || xFunc==sqlite3BtreeCommitStmt || xFunc==sqlite3BtreeRollbackStmt ); for(i=0; xFunc && i<db->nDb; i++){ int rc; Btree *pBt = db->aDb[i].pBt; if( pBt ){ rc = xFunc(pBt); if( rc && (p->rc==SQLITE_OK || p->rc==SQLITE_CONSTRAINT) ){ p->rc = rc; sqlite3SetString(&p->zErrMsg, 0); } } } /* If this was an INSERT, UPDATE or DELETE and the statement was committed, ** set the change counter. */ if( p->changeCntOn && p->pc>=0 ){ if( !xFunc || xFunc==sqlite3BtreeCommitStmt ){ sqlite3VdbeSetChanges(db, p->nChange); }else{ sqlite3VdbeSetChanges(db, 0); } p->nChange = 0; } /* Rollback or commit any schema changes that occurred. */ if( p->rc!=SQLITE_OK && db->flags&SQLITE_InternChanges ){ sqlite3ResetInternalSchema(db, 0); db->flags = (db->flags | SQLITE_InternChanges); } } /* We have successfully halted and closed the VM. Record this fact. */ if( p->pc>=0 ){ db->activeVdbeCnt--; } p->magic = VDBE_MAGIC_HALT; checkActiveVdbeCnt(db); return SQLITE_OK; } //提交事務,主要調用: //sqlite3BtreeSync()--同步btree, sqlite3BtreeCommit()---提交事務 static int vdbeCommit(sqlite3 *db){ int i; int nTrans = 0; /* Number of databases with an active write-transaction */ int rc = SQLITE_OK; int needXcommit = 0; for(i=0; i<db->nDb; i++){ Btree *pBt = db->aDb[i].pBt; if( pBt && sqlite3BtreeIsInTrans(pBt) ){ needXcommit = 1; if( i!=1 ) nTrans++; } } /* If there are any write-transactions at all, invoke the commit hook */ if( needXcommit && db->xCommitCallback ){ sqlite3SafetyOff(db); rc = db->xCommitCallback(db->pCommitArg); sqlite3SafetyOn(db); if( rc ){ return SQLITE_CONSTRAINT; } } /* The simple case - no more than one database file (not counting the ** TEMP database) has a transaction active. There is no need for the ** master-journal. ** ** If the return value of sqlite3BtreeGetFilename() is a zero length ** string, it means the main database is :memory:. In that case we do ** not support atomic multi-file commits, so use the simple case then ** too. */ //簡單的狀況,只有一個數據庫文件,不須要master-journal if( 0==strlen(sqlite3BtreeGetFilename(db->aDb[0].pBt)) || nTrans<=1 ){ for(i=0; rc==SQLITE_OK && i<db->nDb; i++){ Btree *pBt = db->aDb[i].pBt; if( pBt ){ //同步btree rc = sqlite3BtreeSync(pBt, 0); } } /* Do the commit only if all databases successfully synced */ //commite事務 if( rc==SQLITE_OK ){ for(i=0; i<db->nDb; i++){ Btree *pBt = db->aDb[i].pBt; if( pBt ){ sqlite3BtreeCommit(pBt); } } } } /* The complex case - There is a multi-file write-transaction active. ** This requires a master journal file to ensure the transaction is ** committed atomicly. */ #ifndef SQLITE_OMIT_DISKIO else{ int needSync = 0; char *zMaster = 0; /* File-name for the master journal */ char const *zMainFile = sqlite3BtreeGetFilename(db->aDb[0].pBt); OsFile *master = 0; /* Select a master journal file name */ do { u32 random; sqliteFree(zMaster); sqlite3Randomness(sizeof(random), &random); zMaster = sqlite3MPrintf("%s-mj%08X", zMainFile, random&0x7fffffff); if( !zMaster ){ return SQLITE_NOMEM; } }while( sqlite3OsFileExists(zMaster) ); /* Open the master journal. */ rc = sqlite3OsOpenExclusive(zMaster, &master, 0); if( rc!=SQLITE_OK ){ sqliteFree(zMaster); return rc; } /* Write the name of each database file in the transaction into the new ** master journal file. If an error occurs at this point close ** and delete the master journal file. All the individual journal files ** still have 'null' as the master journal pointer, so they will roll ** back independently if a failure occurs. */ for(i=0; i<db->nDb; i++){ Btree *pBt = db->aDb[i].pBt; if( i==1 ) continue; /* Ignore the TEMP database */ if( pBt && sqlite3BtreeIsInTrans(pBt) ){ char const *zFile = sqlite3BtreeGetJournalname(pBt); if( zFile[0]==0 ) continue; /* Ignore :memory: databases */ if( !needSync && !sqlite3BtreeSyncDisabled(pBt) ){ needSync = 1; } rc = sqlite3OsWrite(master, zFile, strlen(zFile)+1); if( rc!=SQLITE_OK ){ sqlite3OsClose(&master); sqlite3OsDelete(zMaster); sqliteFree(zMaster); return rc; } } } /* Sync the master journal file. Before doing this, open the directory ** the master journal file is store in so that it gets synced too. */ zMainFile = sqlite3BtreeGetDirname(db->aDb[0].pBt); rc = sqlite3OsOpenDirectory(master, zMainFile); if( rc!=SQLITE_OK || (needSync && (rc=sqlite3OsSync(master,0))!=SQLITE_OK) ){ sqlite3OsClose(&master); sqlite3OsDelete(zMaster); sqliteFree(zMaster); return rc; } /* Sync all the db files involved in the transaction. The same call ** sets the master journal pointer in each individual journal. If ** an error occurs here, do not delete the master journal file. ** ** If the error occurs during the first call to sqlite3BtreeSync(), ** then there is a chance that the master journal file will be ** orphaned. But we cannot delete it, in case the master journal ** file name was written into the journal file before the failure ** occured. */ for(i=0; i<db->nDb; i++){ Btree *pBt = db->aDb[i].pBt; if( pBt && sqlite3BtreeIsInTrans(pBt) ){ rc = sqlite3BtreeSync(pBt, zMaster); if( rc!=SQLITE_OK ){ sqlite3OsClose(&master); sqliteFree(zMaster); return rc; } } } sqlite3OsClose(&master); /* Delete the master journal file. This commits the transaction. After ** doing this the directory is synced again before any individual ** transaction files are deleted. */ rc = sqlite3OsDelete(zMaster); assert( rc==SQLITE_OK ); sqliteFree(zMaster); zMaster = 0; rc = sqlite3OsSyncDirectory(zMainFile); if( rc!=SQLITE_OK ){ /* This is not good. The master journal file has been deleted, but ** the directory sync failed. There is no completely safe course of ** action from here. The individual journals contain the name of the ** master journal file, but there is no way of knowing if that ** master journal exists now or if it will exist after the operating ** system crash that may follow the fsync() failure. */ return rc; } /* All files and directories have already been synced, so the following ** calls to sqlite3BtreeCommit() are only closing files and deleting ** journals. If something goes wrong while this is happening we don't ** really care. The integrity of the transaction is already guaranteed, ** but some stray 'cold' journals may be lying around. Returning an ** error code won't help matters. */ for(i=0; i<db->nDb; i++){ Btree *pBt = db->aDb[i].pBt; if( pBt ){ sqlite3BtreeCommit(pBt); } } } #endif return rc; }
pager層是SQLite實現最爲核心的模塊,它具備四大功能:I/O、頁面緩存、併發控制和日誌恢復。而這些功能不只是上層Btree的基礎,並且對系統的性能和健壯性有相當重要的影響。其中併發控制和日誌恢復是事務處理實現的基礎。SQLite併發控制的機制很是簡單——即封鎖機制;另外,它的查詢優化機制也很是簡單——基於索引。這一切使得整個SQLite的實現變得簡單,同時變得很小,保證其運行速度很是快,因此特別適合嵌入式設備。SQLite是基於鎖來實現併發控制的,其鎖機制實現得很是簡單而巧妙。
SQLite的併發控制機制是採用加鎖的方式,實現簡單,也很是巧妙,以下圖所示:
一、RESERVED LOCK
RESERVED鎖意味着進程將要對數據庫進行寫操做。某一時刻只能有一個RESERVED Lock,可是RESERVED鎖和SHARED鎖能夠共存,並且能夠對數據庫加新的SHARED鎖。
爲何要用RESERVED鎖?
主要是出於併發性的考慮。因爲SQLite只有庫級排斥鎖(EXCLUSIVE LOCK),若是寫事務一開始就上EXCLUSIVE鎖,而後再進行實際的數據更新,寫磁盤操做,這會使得併發性大大下降。而SQLite一旦獲得數據庫的RESERVED鎖,就能夠對緩存中的數據進行修改,而與此同時,其它進程能夠繼續進行讀操做。直到真正須要寫磁盤時纔對數據庫加EXCLUSIVE鎖。
二、PENDING LOCK
PENDING LOCK意味着進程已經完成緩存中的數據修改,並想當即將更新寫入磁盤。它將等待此時已經存在的讀鎖事務完成,可是不容許對數據庫加新的SHARED LOCK(這與RESERVED LOCK相區別)。
爲何要有PENDING LOCK?
主要是爲了防止出現寫餓死的狀況。因爲寫事務先要獲取RESERVED LOCK,因此可能一直產生新的SHARED LOCK,使得寫事務發生餓死的狀況。
三、加鎖機制的具體實現
SQLite在pager層獲取鎖的函數以下:
//獲取一個文件的鎖,若是忙則重複該操做, //直到busy回調函數返回flase,或者成功得到鎖 static int pager_wait_on_lock(Pager *pPager, int locktype){ int rc; assert( PAGER_SHARED==SHARED_LOCK ); assert( PAGER_RESERVED==RESERVED_LOCK ); assert( PAGER_EXCLUSIVE==EXCLUSIVE_LOCK ); if( pPager->state>=locktype ){ rc = SQLITE_OK; }else{ //重複直到得到鎖 do { rc = sqlite3OsLock(pPager->fd, locktype); }while( rc==SQLITE_BUSY && sqlite3InvokeBusyHandler(pPager->pBusyHandler) ); if( rc==SQLITE_OK ){ //設置pager的狀態 pPager->state = locktype; } } return rc; }
Windows下具體的實現以下:
static int winLock(OsFile *id, int locktype){ int rc = SQLITE_OK; /* Return code from subroutines */ int res = 1; /* Result of a windows lock call */ int newLocktype; /* Set id->locktype to this value before exiting */ int gotPendingLock = 0;/* True if we acquired a PENDING lock this time */ winFile *pFile = (winFile*)id; assert( pFile!=0 ); TRACE5("LOCK %d %d was %d(%d)\n", pFile->h, locktype, pFile->locktype, pFile->sharedLockByte); /* If there is already a lock of this type or more restrictive on the ** OsFile, do nothing. Don't use the end_lock: exit path, as ** sqlite3OsEnterMutex() hasn't been called yet. */ //當前的鎖>=locktype,則返回 if( pFile->locktype>=locktype ){ return SQLITE_OK; } /* Make sure the locking sequence is correct */ assert( pFile->locktype!=NO_LOCK || locktype==SHARED_LOCK ); assert( locktype!=PENDING_LOCK ); assert( locktype!=RESERVED_LOCK || pFile->locktype==SHARED_LOCK ); /* Lock the PENDING_LOCK byte if we need to acquire a PENDING lock or ** a SHARED lock. If we are acquiring a SHARED lock, the acquisition of ** the PENDING_LOCK byte is temporary. */ newLocktype = pFile->locktype; /*兩種狀況: (1)若是當前文件處於無鎖狀態(獲取讀鎖--讀事務 **和寫事務在最初階段都要經歷的階段), **(2)處於RESERVED_LOCK,且請求的鎖爲EXCLUSIVE_LOCK(寫事務) **則對執行加PENDING_LOCK */ /////////////////////(1)/////////////////// if( pFile->locktype==NO_LOCK || (locktype==EXCLUSIVE_LOCK && pFile->locktype==RESERVED_LOCK) ){ int cnt = 3; //加pending鎖 while( cnt-->0 && (res = LockFile(pFile->h, PENDING_BYTE, 0, 1, 0))==0 ){ /* Try 3 times to get the pending lock. The pending lock might be ** held by another reader process who will release it momentarily. */ TRACE2("could not get a PENDING lock. cnt=%d\n", cnt); Sleep(1); } //設置爲gotPendingLock爲1,使和在後面要釋放PENDING鎖 gotPendingLock = res; } /* Acquire a shared lock */ /*獲取shared lock **此時,事務應該持有PENDING鎖,而PENDING鎖做爲事務從UNLOCKED到 **SHARED_LOCKED的一個過渡,因此事務由PENDING->SHARED **此時,實際上鎖處於兩個狀態:PENDING和SHARED, **直到後面釋放PENDING鎖後,才真正處於SHARED狀態 */ ////////////////(2)///////////////////////////////////// if( locktype==SHARED_LOCK && res ){ assert( pFile->locktype==NO_LOCK ); res = getReadLock(pFile); if( res ){ newLocktype = SHARED_LOCK; } } /* Acquire a RESERVED lock */ /*獲取RESERVED **此時事務持有SHARED_LOCK,變化過程爲SHARED->RESERVED。 **RESERVED鎖的做用就是爲了提升系統的併發性能 */ ////////////////////////(3)///////////////////////////////// if( locktype==RESERVED_LOCK && res ){ assert( pFile->locktype==SHARED_LOCK ); //加RESERVED鎖 res = LockFile(pFile->h, RESERVED_BYTE, 0, 1, 0); if( res ){ newLocktype = RESERVED_LOCK; } } /* Acquire a PENDING lock */ /*獲取PENDING鎖 **此時事務持有RESERVED_LOCK,且事務申請EXCLUSIVE_LOCK **變化過程爲:RESERVED->PENDING。 **PENDING狀態只是惟一的做用就是防止寫餓死. **讀事務不會執行該代碼,可是寫事務會執行該代碼, **執行該代碼後gotPendingLock設爲0,後面就不會釋放PENDING鎖。 */ //////////////////////////////(4)//////////////////////////////// if( locktype==EXCLUSIVE_LOCK && res ){ //這裏沒有實際的加鎖操做,只是把鎖的狀態改成PENDING狀態 newLocktype = PENDING_LOCK; //設置了gotPendingLock,後面就不會釋放PENDING鎖了, //至關於加了PENDING鎖,其實是在開始處加的PENDING鎖 gotPendingLock = 0; } /* Acquire an EXCLUSIVE lock */ /*獲取EXCLUSIVE鎖 **當一個事務執行該代碼時,它應該知足如下條件: **(1)鎖的狀態爲:PENDING (2)是一個寫事務 **變化過程:PENDING->EXCLUSIVE */ /////////////////////////(5)/////////////////////////////////////////// if( locktype==EXCLUSIVE_LOCK && res ){ assert( pFile->locktype>=SHARED_LOCK ); res = unlockReadLock(pFile); TRACE2("unreadlock = %d\n", res); res = LockFile(pFile->h, SHARED_FIRST, 0, SHARED_SIZE, 0); if( res ){ newLocktype = EXCLUSIVE_LOCK; }else{ TRACE2("error-code = %d\n", GetLastError()); } } /* If we are holding a PENDING lock that ought to be released, then ** release it now. */ /*此時事務在第2步中得到PENDING鎖,它將申請SHARED_LOCK(第3步,和圖形相對照), **而在以前它已經獲取了PENDING鎖, **因此在這裏它須要釋放PENDING鎖,此時鎖的變化爲:PENDING->SHARED */ //////////////////////////(6)///////////////////////////////////// if( gotPendingLock && locktype==SHARED_LOCK ){ UnlockFile(pFile->h, PENDING_BYTE, 0, 1, 0); } /* Update the state of the lock has held in the file descriptor then ** return the appropriate result code. */ if( res ){ rc = SQLITE_OK; }else{ TRACE4("LOCK FAILED %d trying for %d but got %d\n", pFile->h, locktype, newLocktype); rc = SQLITE_BUSY; } //在這裏設置文件鎖的狀態 pFile->locktype = newLocktype; return rc; }
在幾個關鍵的部位標記數字。
(I)對於一個讀事務會的完整通過:
語句序列:(1)——>(2)——>(6)
相應的狀態真正的變化過程爲:UNLOCKED→PENDING(1)→PENDING、SHARED(2)→SHARED(6)→UNLOCKED
(II)對於一個寫事務完整通過:
第一階段:
語句序列:(1)——>(2)——>(6)
狀態變化:UNLOCKED→PENDING(1)→PENDING、SHARED(2)→SHARED(6)。此時事務得到SHARED LOCK。
第二個階段:
語句序列:(3)
此時事務得到RESERVED LOCK。
第三個階段:
事務執行修改操做。
第四個階段:
語句序列:(1)——>(4)——>(5)
狀態變化爲:
RESERVED→ RESERVED 、PENDING(1)→PENDING(4)→EXCLUSIVE(5)。此時事務得到排斥鎖,就能夠進行寫磁盤操做了。
注:在上面的過程當中,因爲(1)的執行,使得某些時刻SQLite處於兩種狀態,但它持續的時間很短,從某種程度上來講能夠忽略,可是爲了把問題說清楚,在這裏描述了這一微妙而巧妙的過程。
四、SQLite的死鎖問題
SQLite的加鎖機制會不會出現死鎖?
這是一個頗有意思的問題,對於任何採起加鎖做爲併發控制機制的DBMS都得考慮這個問題。有兩種方式處理死鎖問題:(1)死鎖預防(deadlock prevention)(2)死鎖檢測(deadlock detection)與死鎖恢復(deadlock recovery)。SQLite採起了第一種方式,若是一個事務不能獲取鎖,它會重試有限次(這個重試次數能夠由應用程序運行預先設置,默認爲1次)——這其實是基本鎖超時的機制。若是仍是不能獲取鎖,SQLite返回SQLITE_BUSY錯誤給應用程序,應用程序此時應該中斷,以後再重試;或者停止當前事務。雖然基於鎖超時的機制簡單,容易實現,可是它的缺點也是明顯的——資源浪費。
五、事務類型(Transaction Types)
既然SQLite採起了這種機制,因此應用程序得處理SQLITE_BUSY錯誤,先來看一個會產生SQLITE_BUSY錯誤的例子:
因此應用程序應該儘可能避免產生死鎖,那麼應用程序如何作能夠避免死鎖的產生呢?
答案就是爲你的程序選擇正確合適的事務類型。
SQLite有三種不一樣的事務類型,這不一樣於鎖的狀態。事務能夠從DEFERRED、IMMEDIATE或者EXCLUSIVE,一個事務的類型在BEGIN命令中指定:
BEGIN [ DEFERRED | IMMEDIATE | EXCLUSIVE ] TRANSACTION;
一個deferred事務不獲取任何鎖,直到它須要鎖的時候,並且BEGIN語句自己也不會作什麼事情——它開始於UNLOCK狀態;默認狀況下是這樣的。若是僅僅用BEGIN開始一個事務,那麼事務就是DEFERRED的,同時它不會獲取任何鎖,當對數據庫進行第一次讀操做時,它會獲取SHARED LOCK;一樣,當進行第一次寫操做時,它會獲取RESERVED LOCK。 由BEGIN開始的Immediate事務會試着獲取RESERVED LOCK。若是成功,BEGIN IMMEDIATE保證沒有別的鏈接能夠寫數據庫。可是,別的鏈接能夠對數據庫進行讀操做,可是RESERVED LOCK會阻止其它的鏈接BEGIN IMMEDIATE或者BEGIN EXCLUSIVE命令,SQLite會返回SQLITE_BUSY錯誤。這時你就能夠對數據庫進行修改操做,可是你不能提交,當你COMMIT時,會返回SQLITE_BUSY錯誤,這意味着還有其它的讀事務沒有完成,得等它們執行完後才能提交事務。 Exclusive事務會試着獲取對數據庫的EXCLUSIVE鎖。這與IMMEDIATE相似,可是一旦成功,EXCLUSIVE事務保證沒有其它的鏈接,因此就可對數據庫進行讀寫操做了。 上面那個例子的問題在於兩個鏈接最終都想寫數據庫,可是他們都沒有放棄各自原來的鎖,最終,shared鎖致使了問題的出現。若是兩個鏈接都以BEGIN IMMEDIATE開始事務,那麼死鎖就不會發生。在這種狀況下,在同一時刻只能有一個鏈接進入BEGIN IMMEDIATE,其它的鏈接就得等待。BEGIN IMMEDIATE和BEGIN EXCLUSIVE一般被寫事務使用。就像同步機制同樣,它防止了死鎖的產生。 基本的準則是:若是你在使用的數據庫沒有其它的鏈接,用BEGIN就足夠了。可是,若是你使用的數據庫在其它的鏈接也要對數據庫進行寫操做,就得使用BEGIN IMMEDIATE或BEGIN EXCLUSIVE開始你的事務。