REDIS目前給出了一個異步的主從複製版本系統。在redis裏 提供了幾種方式來完成這個工做。 主從複製主要對應在redis/replication.c這個文件裏。源碼框架裏 分爲3部分: Master部分/SLAVE部分/複製核心部分redis
其實主從複製我我的以爲比較難的點就是在於每次重啓以後 master/slave傳遞數據的模式方式數據庫
首先對於slave來說 是主動鏈接他的master服務器
int connectWithMaster(void) { int fd; fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport); if (fd == -1) { redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s", strerror(errno)); return REDIS_ERR; } if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) { close(fd); redisLog(REDIS_WARNING,"Can't create readable event for SYNC"); return REDIS_ERR; } server.repl_transfer_lastio = server.unixtime; server.repl_transfer_s = fd; server.repl_state = REDIS_REPL_CONNECTING; return REDIS_OK; }
aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) 就是註冊一個可讀和可寫的事件 注意處理事件函數是syncwithMaster.框架
rep_state狀態就變成了REDIS_PREPL_CONNECTING.相關的server的replication作出相應的調整。 異步
如今咱們就進入syncwithMaster進去看看:socket
if (server.repl_state == REDIS_REPL_CONNECTING) { redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains * registered and we can wait for the PONG reply. */ aeDeleteFileEvent(server.el,fd,AE_WRITABLE); server.repl_state = REDIS_REPL_RECEIVE_PONG; /* Send the PING, don't check for errors at all, we have the timeout * that will take care about this. */ syncWrite(fd,"PING\r\n",6,100); return; }
首先客戶端slave 發送一個PING給server master 這個是帶有超時的一個迴應, 狀態就改爲了REDIS_REPL_RECEIVE_PONG 按理來講Master收到了會作出相應的動做。對於slave端而言 下一步就是REDIS_REPL_RECEIVE_PONG這個狀態了。其實就是準備接受某個值了函數
buf[0] = '\0'; if (syncReadLine(fd,buf,sizeof(buf), server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING, "I/O error reading PING reply from master: %s", strerror(errno)); goto error; }
這是PONG狀態下的作的核心事情:讀出來 而後判斷是不是有相應的內容。oop
if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s", strerror(errno)); goto error; }
……if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR)
給MASETER發送一個SYNC 而後就進入可讀狀態 註冊了一個readSyncBulkPayload。等下來看看這個事件函數 後面要作的事情就是設置相應的位了:性能
server.repl_state = REDIS_REPL_TRANSFER; server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_fd = dfd; server.repl_transfer_lastio = server.unixtime; server.repl_transfer_tmpfile = zstrdup(tmpfile);
repl_transfer_size設置-1 表示從master收到的文件大小爲-1 。狀態變成了REPL_TRANSFER。 如今進入readSyncBulkPayload看看這個函數是怎麼接受的:this
server.repl_transfer_size = strtol(buf+1,NULL,10);
首先肯定了對方要發送多大的文件 而後讀到buf 在寫到rdb相應的文件裏面。
left = server.repl_transfer_size - server.repl_transfer_read; readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); nread = read(fd,buf,readlen); ............................................................... ............................................................... ............................................................... write(server.repl_transfer_fd,buf,nread) != nread) server.repl_transfer_read += nread; /* Check if the transfer is now complete */ if (server.repl_transfer_read == server.repl_transfer_size) { if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); replicationAbortSyncTransfer(); return; } redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); signalFlushedDb(-1); emptyDb(); /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to * time for non blocking loading. */ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); if (rdbLoad(server.rdb_filename) != REDIS_OK) { redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); replicationAbortSyncTransfer(); return; }
若是2者是相等的 read和transfer_size相等 首先替換名字 替換成rdb_filename名字 而後清空db emptyDb()銷燬可讀事件 最後調用rdbLoad在本地從新構建一個key_value數據庫副本。【master的】 全部的動做的操做都已經完成了 。這裏的發送大小雙方應該會有一個限定。咱們能夠從master部分來找到相應的事件出來:
對於主服務器來說 除了客戶端發送了一個PING以後指望獲得主機的一個回覆以外 真正對這個主從複製有用的應該是從服務器這個操做:
write(fd,「SYNC\r\n」,buf).這個動做一發出: master就會調用SYNCcommand()來完成相應的拷貝動做: 首先進入SYNCcommand()函數進去看看是一個什麼狀況:
要完成複製 首先要在一個合適的時機:master進入了一個bgsave操做。要保證rdb文件是一個最新的文件。 對於master而言 先看看rdb_pid!=-1若是條件知足 代表正在作這個操做 master只須要等完成了才作相應的動做 而若是不是sync就會觸發一個bgsave操做。而後對於主進程而言: 都會設置狀態爲:WAIT_BGSAVE_END.這個時候syncCommand就完成了 而複製操做尚未開始 進行往下面看
而作bgsaveCommand操做時 都會調用一個function:updateSlavesWaitingBgsave 這樣就不會出現同步等待現象了。
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
打開相應的repldbfd,準備複製文件了:
aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR)
註冊了一個sendBulkToSlave.發給salve 這個部分要注意好:
須要注意的點: 1) 發送緩衝區大小怎麼設置 和slave是否是設置同樣 2)發送是一個同步的過程仍是異步的過程
REDIS_IOBUF_LEN: 1024*16 這個變量就是一次讀的rdb量 仍是進入sendBulkToSlave()看看:
{ redisClient *slave = privdata; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); char buf[REDIS_IOBUF_LEN]; ssize_t nwritten, buflen; if (slave->repldboff == 0) { /* Write the bulk write count before to transfer the DB. In theory here * we don't know how much room there is in the output buffer of the * socket, but in practice SO_SNDLOWAT (the minimum count for output * operations) will never be smaller than the few bytes we need. */ sds bulkcount; bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) slave->repldbsize); if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount)) { sdsfree(bulkcount); freeClient(slave); return; } sdsfree(bulkcount); } lseek(slave->repldbfd,slave->repldboff,SEEK_SET); buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); if (buflen <= 0) { redisLog(REDIS_WARNING,"Read error sending DB to slave: %s", (buflen == 0) ? "premature EOF" : strerror(errno)); freeClient(slave); return; } if ((nwritten = write(fd,buf,buflen)) == -1) { redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s", strerror(errno)); freeClient(slave); return; } slave->repldboff += nwritten; if (slave->repldboff == slave->repldbsize) { close(slave->repldbfd); slave->repldbfd = -1; aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); slave->replstate = REDIS_REPL_ONLINE; if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendReplyToClient, slave) == AE_ERR) { freeClient(slave); return; } redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); } }
若是repldoff==0 代表是第一次初始化 也就是會發送一個應該發送的長度數據給對方slave.這是第一次發送。注意這裏調用write若是成功以後會繼續。lseek(slave->repldbfd,slave->repldboff,SEEK_SET); 每次會定位到相應的位置,這個很是惱火 調用磁盤的一個隨機操做,比較耗時 若是文件很大 對性能影響比較大。 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); 而後讀到內存中 而後在作write操做。由於沒有關閉事件模型,因此EPOLL輪詢時都會認爲這個事件還須要執行:仍是準備好的,因此繼續調用這個函數,從本質上來說 能夠算是一個異步的操做。因此不會出現服務的一箇中斷現象,可是lseek是比較耗時的,在複製完成了 關閉fd的可讀狀態 而且把replstate狀態標記成REPL_ONLINE,這個狀態就是命令傳播狀態。註冊了一個一個新的函數sendReplyToClient,固然把以前的函數事件del掉。因此每次Server端給的buf是比slave端小不少.主從複製核心就是這裏了。