pika主從複製原理之工做流程

上一篇pika主從複製原理之binlog中介紹了主從複製binlog的元信息、日誌的格式及對應的api,本篇介紹下主從複製有關的線程、全量複製過程、增量複製過程。redis

線程

PikaBinlogReceiverThread: 系統啓動時初始化,佔用端口port+1000,做爲Slave接收Master同步過來的Redis命令。

PikaHeartbeatThread: 系統啓動時初始化,佔用端口port+2000,做爲Master接收Slave發送的ping、spci指令,對全部Slave進行存活檢測

PikaTrysyncThread: 系統啓動時初始化,無角色,執行slaveof host port命令對應的後臺任務包括按期檢查是否須要跟某個Master創建鏈接、db替換、啓動或關閉rsync任務等,當啓動rsync服務時,佔用端口port+3000。

PikaSlavepingThread: 成爲某個Master的Slave後,做爲Slave的一方初始化,定時向Master發送ping、spci指令,若是超時超過30秒,生成之後關閉Master的後臺任務,由PikaBinlogReceiverThread來執行。

BinlogBGWorker: 系統啓動時初始化,每一個BinlogBGWorker包含一個binlogbg_thread,做爲Slave的後臺執行模塊,接收PikaBinlogReceiverThread的調度、執行具體的redis命令。

PikaBinlogSenderThread: 成爲某個Slave的Master後,做爲Master的一方初始化,根據slave傳過來的filenum、offset消費日誌,併發送到PikaBinlogReceiverThread所在的服務端口。
複製代碼

類圖

  • PikaBinlogReceiverThread

  • PikaBinlogSenderThread

主要API描述

  • Thread
//初始化

int Thread::InitHandle()

//執行線程內的定時任務

void Thread::CronHandle()

//建立線程,調用RunThread()

int Thread::StartThread()

//處理邏輯入口函數

void *Thread::RunThread(void *arg)
複製代碼
  • HolyThread
virtual int InitHandle()

1.初始化epoll監聽事件,並將套接字加入server_fds

virtual void *ThreadMain()

1.調用CronHandle,檢測是否存在須要執行的定時任務,若是有則執行。

2.若是server_fds上存在EPOLLIN事件,則接受鏈接請求,創建slave-master鏈接

3.若是是slave-master鏈接上存在EPOLLIN事件,則調用in_conn->GetRequest()進行具體redis命令處理。

4.若是是slave-master鏈接上存在EPOLLOUT事件,則調用in_conn->SendReply()發送應答。
複製代碼
  • PikaBinlogReceiverThread
//執行後臺任務,PikaSlavepingThread檢測到應該退出或者主掛掉時,添加Kill PikaBinlogSenderThread後臺任務。

void PikaBinlogReceiverThread::CronHandle()
複製代碼
  • PikaMasterConn
int PikaMasterConn::DealMessage()

1.若是存在monitor,則將當前命令發到monitor一份(即redis的monitor命令)。

2.生成全局日誌序號serial,保證按順序併發寫入slave binlog。

3.若是是隻讀,則由PikaBinlogReceiverThread完成binlog日誌的追加,不然由PikaBinlogReceiverThread分配的BinlogBGWorker完成。
複製代碼
  • BinlogBGWorker
void Schedule()

1.第一次調度時啓動後臺線程。

2.經過DoBinlogBG執行具體的後臺任務。

void BinlogBGWorker::DoBinlogBG(void* arg)

1.根據解析的參數獲取須要執行的命令。

2.若是slave非只讀,則獲取一個全局記錄鎖(應該是用不到的)。

3.記錄binlog。

4.執行命令。

5.記錄慢日誌。
複製代碼

Slaveof流程圖

  • slave

  • master

  • slaveof過程
1.若是是slaveof no one,中止rsync服務,刪除master,repl_state_ = PIKA_REPL_NO_CONNECT,role=master

2.若是slaveof ip port filenum offset,則將filenum以前的binlog刪除,同時若是filenum存在,則將offset以前的文件內容填充空格。

3.role更新爲slave,repl_state更新爲PIKA_REPL_CONNECT,應答成功。

//後臺任務部分

4.PikaTrysyncThread檢測到repl_state==PIKA_REPL_CONNECT,在端口port+3000啓動rsync服務,準備接受master的db內容。

5.slave創建與master的鏈接,若是有auth,則主動發送auth命令。

6.發送trysync命令,主動要求master進行數據同步。

7.若是master應答結果爲kInnerReplWait,則repl_state_ = PIKA_REPL_WAIT_DBSYNC。

8.slave會一直等待,直到db_sync_path目錄下存在info文件時,用新的db替換以前的db,根據info中的filenum、offset更新slave對應的filenum、offset,重置repl_state_ = PIKA_REPL_CONNECT

9.再次執行3-6步驟,master應答kInnerReplOk,更新repl_state_ = PIKA_REPL_CONNECTING,停掉rsync服務,建立PikaSlavepingThread,在PikaSlavepingThread ping master成功之後更新repl_state_ = PIKA_REPL_CONNECTED,主從同步正式創建。
複製代碼
  • trysync過程
1.根據slave的ip、port構造slave的惟一標識,stage=SLAVE_ITEM_STAGE_ONE。

2.若是master沒有找到slave傳過來的filenum,則執行bgsave,生成一份db鏡像和info信息,經過rsync傳給slave,並清理掉master的臨時文件

3.根據slave的ip、port,bgsave的filenum、offset構建PikaBinlogSenderThread

4.過濾binlog中可能已經損壞的內容(什麼場景可能會用到?可能主從數據不一致嗎?)

5.更新role_ = PIKA_ROLE_MASTER,PikaSlavepingThread第二次存活檢測時發送spci命令,主根據PikaBinlogSenderThread,更新stage=SLAVE_ITEM_STAGE_TWO,主從同步正式創建。
複製代碼

結語

我的認爲主從複製是pika裏面體量最大也是最複雜的一個模塊,經過採用了相似LevelDB的WAL日誌的方式,固然因爲redis命令的問題,這個日誌非冪等的,不是爲了在啓動時重放,而是解決了redis中增量複製buffer被打滿的問題。(基於2.x版本,目前pika已是3.x版本,主從複製有所變化,先遷移到此處,後續會更新)api

相關文章
相關標籤/搜索