Redis 源碼學習之 Redis 事務

 

做者:天涼好個秋redis

 

 

Redis做爲一個內存型數據庫,一樣支持傳統數據庫的事務特性。這篇文章會從源代碼角度來分析Redis中事務的實現原理。算法

 

What數據庫

 

Redis事務提供了一種將多個命令請求打包,而後一次性、按照順序地執行多個命令的機制,而且在事務執行的期間,服務器不會中斷事務而去執行其餘不在事務中的命令請求,它會把事務中全部的命令都執行完畢纔會去執行其餘的命令。編程

 

How數組

 

Redis中提供了multi、discard、exec、watch、unwatch這幾個命令來實現事務的功能。服務器

 

Redis的事務始於multi命令,以後跟着要在事務中執行的命令,終於exec命令或者discard命令。加入事務中的全部命令會原子的執行,中間不會穿插執行其餘沒有加入事務的命令。async

 

multi、exec和discardide

 

multi命令告訴Redis客戶端要開始一個事物,而後Redis會返回一個OK,接下來全部的命令Redis都不會當即執行,只會返回QUEUED結果,直到遇到了exec命令纔會去執行以前的全部的命令,或者遇到了discard命令,會拋棄執行以前加入事務的命令。函數

 

127.0.0.1:6379> get namethis

(nil)

127.0.0.1:6379> get gender

(nil)

127.0.0.1:6379> multi

OK

127.0.0.1:6379> set name Slogen

QUEUED

127.0.0.1:6379> set gender male

QUEUED

127.0.0.1:6379> exec

1) OK

2) OK

127.0.0.1:6379> mget name gender

1) "Slogen"

2) "male"

 

watch

 

watch命令是Redis提供的一個樂觀鎖,能夠在exec執行以前,監視任意數量的數據庫key,並在exec命令執行的時候,檢測被監視的key是否至少有一個已經被修改,若是是的話,服務器將拒絕執行事務,並向客戶端返回表明事務執行失敗的空回覆。

 

首先在client1執行下列命令:

 

127.0.0.1:6379> get name

(nil)

127.0.0.1:6379> watch name

OK

127.0.0.1:6379> multi

OK

127.0.0.1:6379> set name slogen

QUEUED

127.0.0.1:6379> set gender male

QUEUED

127.0.0.1:6379> get name

QUEUED

 

這個時候client尚未執行exec命令,接下來在client2下執行下面命令修改name:

 

127.0.0.1:6379> set name rio

OK

127.0.0.1:6379> get name

"rio"

 

接下來在client1下執行exec命令:

 

127.0.0.1:6379> exec

(nil)

127.0.0.1:6379> get name

"rio"

 

從執行結果能夠看到,在client1中執行exec命令的時候,Redis會檢測到name字段已經被其餘客戶端修改了,因此拒絕執行事務中全部的命令,直接返回nil表示執行失敗。這個時候獲取到的name的值仍是在client2中設置的rio。

 

Why

 

multi

 

Redis的事務始於multi命令,那麼就從multi命令的源代碼開始分析。

 

當Redis接收到客戶端發送過來的命令以後會執行multiCommand()這個方法,這個方法在multi.c文件中。

 

void multiCommand(client *c) {

    // 1. 若是檢測到flags裏面已經包含了CLIENT_MULTI

    // 表示對應client已經處於事務的上下文中,返回錯誤

    if (c->flags & CLIENT_MULTI) {

        addReplyError(c,"MULTI calls can not be nested");

        return;

    }

    // 2. 開啓flags的CLIENT_MULTI標識

    c->flags |= CLIENT_MULTI;

    // 3. 返回ok,告訴客戶端已經成功開啓事務

    addReply(c,shared.ok);

}

 

從源代碼中能夠看到,multiCommand()主要完成下面三件事:

 

  1. 檢測發送multi命令的client是否已經處於事務中,若是是則直接返回錯誤。從這裏能夠看到,Redis不支持事務嵌套執行。

     

  2. 給對應client的flags標誌位中增長MULTI_CLIENT標誌,表示已經進入事務中。

     

  3. 返回OK告訴客戶端已經成功開啓事務。

 

從前面的文章中能夠知道,Redis接收到全部的Client發送過來的命令後都會執行到processCommand()這個方法中,在processCommand()中有下面這部分代碼:

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

在processCommand()執行實際的命令以前會先判斷對應的client是否已經處於事務的上下文中,若是是的話,且須要執行的命令不是exec、discard、multi和watch這四個命令中的任何一個,則調用queueMultiCommand()方法把須要執行的命令加入隊列中,不然的話調用call()直接執行命令。

 

queueMultiCommand()

 

Redis調用queueMultiCommand()方法把加入事務的命令加入Redis隊列中,實現以下:

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

queueMultiCommand()方法主要是把要加入事務的命令封裝在multiCmd結構的變量,而後放置到client->mstate.commands數組中去,multiCmd的定義以下:

 

typedef struct multiCmd {

    robj **argv; // 命令的參數數組

    int argc; // 命令的參數個數

    struct redisCommand *cmd; // 要執行的命令

} multiCmd;

 

而mstate字段定義爲:

 

typedef struct client {

    // 其餘省略代碼

    multiState mstate;      /* MULTI/EXEC state */

} client;

 

multiState的結構爲:

 

typedef struct multiState {

    multiCmd *commands;     /* Array of MULTI commands */

    int count;              /* Total number of MULTI commands */

    int minreplicas;        /* MINREPLICAS for synchronous replication */

    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */

} multiState;

 

  • commands:multiCmd類型的數組,存放着事務中全部的要執行的命令

     

  • count:當前事務中全部已經存放的命令的個數

 

另外兩個字段當前版本中(3.2.28)沒用上。

 

假設當前事務隊列中已經存在set name slogen和lpush num 20這兩個命令的時候,client中的mstate的數據以下:


watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

這個時候再往事務中添加get name這個命令的時候結構圖以下:

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

錯誤命令:CLIENT_DIRTY_EXEC

 

那麼有個問題,好比我往事務中添加的命令是個不存在的命令,或者命令使用方式,好比命令參數不對,這個時候這個命令會被加入事務嗎?


前面說了,Redis接收到的全部的命令都是執行到processCommand()這個方法,在實際執行對應的命令前,processCommand()方法都會對將要執行的命令進行一系列的檢查,代碼以下:

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

從上面代碼能夠看到,processCommand()在對要執行的命令進行的一系列檢查的時候若是有任何一項檢測失敗都會調用flagTransaction()函數而後返回對應的信息給客戶端,flagTransaction()實現以下:

 

void flagTransaction(client *c) {

    if (c->flags & CLIENT_MULTI)

        // 若是flags包含CLIENT_MULTI標誌位,表示已經處於事務上下文中

        // 則給對應的client的flags開啓CLIENT_DIRTY_EXEC標誌位

        c->flags |= CLIENT_DIRTY_EXEC;

}

 

flagTransaction()方法會檢測對應的client是否處於事務的上下文中,若是是的話就給對應的client的flags字段開啓CLIENT_DIRTY_EXEC標誌位。

也就是說,若是命令在加入事務的時候因爲各類緣由,好比命令不存在,或者對應的命令參數不正確,則對應的命令不會被添加到mstate.commands數組中,且同時給對應的client的flags字段開啓CLIENT_DIRTY_EXEC標誌位。

 

watch命令

 

當client處於事務的上下文中時,watch命令屬於能夠被當即執行的幾個命令之一,watch命令對應的代碼爲watchCommand()函數,實現以下:

 

void watchCommand(client *c) {

    int j;

 

    if (c->flags & CLIENT_MULTI) {

        // 若是執行watch命令的client處於事務的上下文中則直接返回

        addReplyError(c,"WATCH inside MULTI is not allowed");

        return;

    }

    for (j = 1; j < c->argc; j++)

        // 對傳入的每一個要watch的能夠調用watchForKey()

        watchForKey(c,c->argv[j]);

    addReply(c,shared.ok);

}

 

watchCommand()方法會首先判斷執行watch的命令是否已經處於事務的上下文中,若是是的話則直接報錯返回,說明在Redis事務中不能調用watch命令。

 

接下來對於watch命令傳入的全部的key,依次調用watchForKey()方法,定義以下:

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

watchForKey()方法會作下面幾件事:

 

  1. 判斷對應的key是否已經存在於client->watched_keys列表中,若是已經存在則直接返回。client->watched_keys保存着對應的client對象全部的要監視的key。

     

  2. 若是不存在,則去client->db->watched_keys中查找全部的已經監視了這個key的client對象。client->db->watched_keys以dict的結構保存了全部的監視這個key的client列表。

     

  3. 若是第二步中的列表存在,則把執行watch命令的client添加到這個列表的尾部,若是不存在,表示尚未任何一個client監視這個key,則新建一個列表,添加到client->db->watched_keys中,而後把執行watch命令的client添加到新生成的列表的尾部。

     

  4. 把傳入的key封裝成一個watchedKey結構的變量,添加到client->watched_key列表的最後面。

 

假設當前client->db->watched_keys的監測狀況以下圖所示:


watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

而client->watched_keys的監測狀況以下:


watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

這個時候client_A執行watch key1 key2 key3這個命令,執行完命令以後client->db->watched_keys結果爲


watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

而client->watched_keys結果爲


watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

對於key1,目前尚未client對key1進行監視,因此這個時候client_A會新建一個列表,把本身添加到這個列表中而後把映射關係添加到client->db->watched_keys中去,以後會把key1添加到client->watched_keys列表的最後。


對於key2,因爲已經存在於watched_keys列表中,因此會直接返回不作任何處理。


對於key3,因爲client->db->watched_keys中已經有client_B和client_C在監視它,因此會直接把client_A添加到監視列表的末尾以後再把key3添加到client_A的監視列表中去。

 

修改數據:CLIENT_DIRTY_CAS

 

watch命令的做用就是用在事務中檢測被監視的key是否被其餘的client修改了,若是已經被修改,則阻止事務的執行,那麼這個功能是怎麼實現的呢?


這裏以set命令爲例進行分析。


假設client_A執行了watch name這個命令而後執行multi命令開啓了事務可是尚未執行exec命令,這個時候client_B執行了set name slogen這個命令,整個過程以下:

 

時間 client_A client_B
T1 watch name  
T2 multi  
T3 get name  
T4   set name slogen
T5 exec  

 

在T4的時候client_B執行了set命令修改了name,Redis收到set命令以後會執行setCommand方法,實現以下:

 

void setCommand(client *c) {

    // 其餘省略代碼

    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);

}

 

在setCommand()最後會調用setGenericCommand()方法,改方法實現以下:

 

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {

    // 其餘省略代碼

    setKey(c->db,key,val);

    // 其餘省略代碼

}

 

在setGenericCommand()方法中會調用setKey()這個方法,接着看下setKey()這個方法:

 

void setKey(redisDb *db, robj *key, robj *val) {

    if (lookupKeyWrite(db,key) == NULL) {

        dbAdd(db,key,val);

    } else {

        dbOverwrite(db,key,val);

    }

    incrRefCount(val);

    removeExpire(db,key);

    // 通知修改了key

    signalModifiedKey(db,key);

}

 

在setKey()方法最後會調用signaleModifiedKey()通知redis數據庫中有數據被修改,signaleModifiedKey()方法實現以下:

 

void signalModifiedKey(redisDb *db, robj *key) {

    touchWatchedKey(db,key);

}

 

能夠看到signalModifiedKey()也僅僅是調用touchWatchedKey()方法,代碼以下:

 

void touchWatchedKey(redisDb *db, robj *key) {

    list *clients;

    listIter li;

    listNode *ln;

 

    if (dictSize(db->watched_keys) == 0) return;

    // 1. 從redisDb->watched_keys中找到對應的client列表

    clients = dictFetchValue(db->watched_keys, key);

    if (!clients) return;

 

    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */

    /* Check if we are already watching for this key */

    listRewind(clients,&li);

    while((ln = listNext(&li))) {

        // 2.依次遍歷client列表,給每一個client的flags字段

        // 開啓CLIENT_DIRTY_CAS標識位

        client *c = listNodeValue(ln);

        c->flags |= CLIENT_DIRTY_CAS;

    }

}

 

touchWatchedKey()方法會作下面兩件事:

 

  1. 從redisDb->watched_keys中找到監視這個key的client列表。前面在分析watch命令的時候說過,若是有client執行了watch keys命令,那麼redis會以鍵值對的形式把(key,client)的對應關係保存在redisDb->watched_key這個字段裏面。

     

  2. 對於第一步中找到的每一個client對象,都會給這個client的flags 字段開啓CLIENT_DIRTY_CAS標誌位。

 

在Redis裏面全部會修改數據庫內容的命令最後都會調用signalModifiedKey()這個方法,而在signalModifiedKey()會給全部的監視這個key的client增長CLIENT_DIRTY_CAS標誌位。

 

exec命令

 

exec命令用來執行事務,對應的代碼爲execCommand()這個方法,實現以下:

 

void execCommand(client *c) {

    int j;

    robj **orig_argv;

    int orig_argc;

    struct redisCommand *orig_cmd;

    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */

 

    // 1. 判斷對應的client是否屬於事務中

    if (!(c->flags & CLIENT_MULTI)) {

        addReplyError(c,"EXEC without MULTI");

        return;

    }

    /**

     * 2. 檢查是否須要執行事務,在下面兩種狀況下不會執行事務

     * 1) 有被watch的key被其餘的客戶端修改了,對應於CLIENT_DIRTY_CAS標誌位被開啓

     * ,這個時候會返回一個nil,表示沒有執行事務

     * 2) 有命令在加入事務隊列的時候發生錯誤,對應於CLIENT_DIRTY_EXEC標誌位被開啓

     * ,這個時候會返回一個execaborterr錯誤

     */

    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {

        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :

                                                  shared.nullmultibulk);

        // 取消全部的事務

        discardTransaction(c);

        goto handle_monitor;

    }

 

    /* Exec all the queued commands */

    // 3. unwatch全部被這個client watch的key

    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */

    orig_argv = c->argv;

    orig_argc = c->argc;

    orig_cmd = c->cmd;

    addReplyMultiBulkLen(c,c->mstate.count);

    // 4. 依次執行事務隊列中全部的命令

    for (j = 0; j < c->mstate.count; j++) {

        c->argc = c->mstate.commands[j].argc;

        c->argv = c->mstate.commands[j].argv;

        c->cmd = c->mstate.commands[j].cmd;

 

        /* Propagate a MULTI request once we encounter the first write op.

         * This way we'll deliver the MULTI/..../EXEC block as a whole and

         * both the AOF and the replication link will have the same consistency

         * and atomicity guarantees. */

        if (!must_propagate && !(c->cmd->flags & CMD_READONLY)) {

            execCommandPropagateMulti(c);

            must_propagate = 1;

        }

 

        call(c,CMD_CALL_FULL);

 

        /* Commands may alter argc/argv, restore mstate. */

        c->mstate.commands[j].argc = c->argc;

        c->mstate.commands[j].argv = c->argv;

        c->mstate.commands[j].cmd = c->cmd;

    }

    c->argv = orig_argv;

    c->argc = orig_argc;

    c->cmd = orig_cmd;

    // 5. 重置這個client對應的事務相關的全部的數據

    discardTransaction(c);

    /* Make sure the EXEC command will be propagated as well if MULTI

        * was already propagated. */

    if (must_propagate) server.dirty++;

 

handle_monitor:

    if (listLength(server.monitors) && !server.loading)

        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);

}

 

execCommand()方法會作下面幾件事:

 

  1. 判斷對應的client是否已經處於事務中,若是不是,則直接返回錯誤。

     

  2. 判斷時候須要執行事務中的命令。在下面兩種狀況下不會執行事務而是返回錯誤。

     

    1. 有被監視的key被其餘的客戶端修改了,對應於CLIENT_DIRTY_CAS標誌位被開啓,這個時候會返回一個nil,表示沒有執行事務。

       

    2. 有命令在加入事務隊列的時候發生錯誤,對應於CLIENT_DIRTY_EXEC標誌位被開啓,這個時候會返回一個execaborterr錯誤。

       

  3. unwatch全部被這個client監視的key。

     

  4. 依次執行事務隊列中全部的命令。

     

  5. 重置這個client對應的事務相關的全部的數據。

 

discard

 

使用discard命令能夠取消一個事務,對應的方法爲discardCommand(),實現以下:

 

void discardCommand(client *c) {

    // 1. 檢查對應的client是否處於事務中

    if (!(c->flags & CLIENT_MULTI)) {

        addReplyError(c,"DISCARD without MULTI");

        return;

    }

    // 2. 取消事務

    discardTransaction(c);

    addReply(c,shared.ok);

}

 

discardCommand()方法首先判斷對應的client是否處於事務中,若是不是則直接返回錯誤,不然的話會調用discardTransaction()方法取消事務,該方法實現以下:

 

void discardTransaction(client *c) {

    // 1. 釋放全部跟MULTI/EXEC狀態相關的資源    

    freeClientMultiState(c);

    // 2. 初始化相應的狀態

    initClientMultiState(c);

    // 3. 取消對應client的3個標誌位

    c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);

    // 4.unwatch全部已經被watch的key

    unwatchAllKeys(c);

}

 

Other

 

Atomic:原子性

 

原子性是指一個事務(transaction)中的全部操做,要麼所有完成,要麼所有不完成,不會結束在中間某個環節。


對於Redis的事務來講,事務隊列中的命令要麼所有執行完成,要麼一個都不執行,所以Redis的事務是具備原子性的。


注意Redis不提供事務回滾機制。

 

Consistency:一致性

 

事務的一致性是指事務的執行結果必須是使事務從一個一致性狀態變到另外一個一致性狀態,不管事務是否執行成功。

 

  1. 命令加入事務隊列失敗(參數個數不對?命令不存在?),整個事務不會執行。因此事務的一致性不會被影響。

     

  2. 使用了watch命令監視的key只事務期間被其餘客戶端修改,整個事務不會執行。也不會影響事務的一致性。

     

  3. 命令執行錯誤。若是事務執行過程當中有一個活多個命令錯誤執行失敗,服務器也不會中斷事務的執行,會繼續執行事務中剩下的命令,而且已經執行的命令不會受任何影響。出錯的命令將不會執行,也就不會對數據庫作出修改,所以這種狀況下事物的一致性也不會受到影響。

     

  4. 服務器宕機。服務器宕機的狀況下的一致性能夠根據服務器使用的持久化方式來分析。

     

    1. 無持久化模式下,事務是一致的。這種狀況下重啓以後的數據庫沒有任何數據,所以老是一致的。

       

    2. RDB模式下,事務也是一致的。服務器宕機重啓以後能夠根據RDB文件來恢復數據,從而將數據庫還原到一個一致的狀態。若是找不到可使用的RDB文件,那麼重啓以後數據庫是空白的,那也是一致的。

       

    3. AOF模式下,事務也是一致的。服務器宕機重啓以後能夠根據AOF文件來恢復數據,從而將數據庫還原到一個一直的狀態。若是找不到可使用的AOF文件,那麼重啓以後數據庫是空白的,那麼也是一致的。

 

Isolation:隔離性

 

Redis 是單進程程序,而且它保證在執行事務時,不會對事務進行中斷,事務能夠運行直到執行完全部事務隊列中的命令爲止。所以,Redis 的事務是老是帶有隔離性的。

 

Durability:持久性

 

Redis事務並無提供任何的持久性功能,因此事務的持久性是由Redis自己所使用的持久化方式來決定的。

 

  • 在單純的內存模式下,事務確定是不持久的。

     

  • 在RDB模式下,服務器可能在事務執行以後RDB文件更新以前的這段時間失敗,因此RDB模式下的Redis事務也是不持久的。

     

  • 在AOF的always模式下,事務的每條命令在執行成功以後,都會當即調用fsync或fdatasync將事務數據寫入到AOF文件。可是,這種保存是由後臺線程進行的,主線程不會阻塞直到保存成功,因此從命令執行成功到數據保存到硬盤之間,仍是有一段很是小的間隔,因此這種模式下的事務也是不持久的。

     

  • 其餘AOF模式也和always模式相似,因此它們都是不持久的。

 

結論:Redis的事務知足原子性、一致性和隔離性,可是不知足持久性。

 

Reference

 

  1. Redis源碼(3.2.28)

  2. 《Redis設計與實現》

 

 

 

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

 

另外,新關注咱們公衆號的朋友,能夠在公衆號後臺聊天框回覆【1024】,能夠免費獲取2T的編程視頻,算法、人工智能、Python、Java、Linux、Go、C語言、軟考、英語等等資源 。

 

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

相關文章
相關標籤/搜索