redis學習筆記——事件處理

Redis服務器是一個事件驅動程序,服務器須要處理如下兩類事件:redis

  • 文件事件(file event):Redis服務器經過套接字與客戶端(或者其餘Redis服務器)進行鏈接,而文件事件就是服務器對套接字操做的抽象。服務器與客戶端(或者其餘服務器)的通訊會產生相應的文件事件,而服務器則經過監聽並處理這些事件來完成一系列網絡通訊操做;
  • 時間事件(time event):Redis服務器中的一些操做(好比serverCron函數)須要在給定的時間點執行,而時間事件就是服務器對這類定時操做的抽象。

文件事件

文件事件處理器

  • 文件事件處理器使用I/O多路複用(multiplexing)程序來同時監聽多個套接字,並根據套接字目前執行的任務來爲套接字關聯不一樣的事件處理器;
  • 當被監聽的套接字準備好執行鏈接應答(accept)、讀取(read)、寫入(write)、關閉(close)等操做時,與操做相對應的文件事件就會產生,這時文件事件處理器就會調用套接字以前關聯好的事件處理器來處理這些事件。

組成數據庫

文件事件處理器的四個組成部分,它們分別是套接字、I/O多路複用程序、文件事件分派器(dispatcher),以及事件處理器。服務器

文件事件處理器的四個組成部分網絡

文件事件是對套接字操做的抽象,每當一個套接字準備好執行鏈接應答(accept)、寫入、讀取、關閉等操做時,就會產生一個文件事件。由於一個服務器一般會鏈接多個套接字,因此多個文件事件有可能會併發地出現。併發

I/O多路複用程序負責監聽多個套接字,並向文件事件分派器傳送那些產生了事件的套接字。socket

儘管多個文件事件可能會併發地出現,但I/O多路複用程序老是會將全部產生事件的套接字都放到一個隊列裏面,而後經過這個隊列,以有序(sequentially)、同步(synchronously)、每次一個套接字的方式向文件事件分派器傳送套接字。函數

文件事件分派器接收I/O多路複用程序傳來的套接字,並根據套接字產生的事件的類型,調用相應的事件處理器;性能

事件處理器是一個個函數,它們定義了某個事件發生時,服務器應該執行的動做。測試

I/O多路複用程序的實現

Redis的I/O多路複用程序的全部功能都是經過包裝常見的select、epoll、evport和kqueue這些I/O多路複用函數庫來實現的,每一個I/O多路複用函數庫在Redis源碼中都對應一個單獨的文件,好比ae_select.c、ae_epoll.c、ae_kqueue.c,諸如此類。spa

由於Redis爲每一個I/O多路複用函數庫都實現了相同的API,因此I/O多路複用程序的底層實現是能夠互換,如圖:

Redis在I/O多路複用程序的實現源碼中用#include宏定義了相應的規則,程序會在編譯時自動選擇系統中性能最高的I/O多路複用函數庫來做爲Redis的I/O多路複用程序的底層實現:

# ifdef HAVE_EVPORT
# include "ae_evport.c"else
    # ifdef HAVE_EPOLL
    # include "ae_epoll.c"
    # else
        # ifdef HAVE_KQUEUE
        # include "ae_kqueue.c"
        # else
        # include "ae_select.c"
        # endif
    # endif
# endif

事件的類型

I/O多路複用程序能夠監聽多個套接字的ae.h/AE_READABLE事件和ae.h/AE_WRITABLE事件,這兩類事件和套接字操做之間的對應關係以下:

  • 當套接字變得可讀時(客戶端對套接字執行write操做,或者執行close操做),或者有新的可應答(acceptable)套接字出現時(客戶端對服務器的監聽套接字執行connect操做),套接字產生AE_READABLE事件;
  • 當套接字變得可寫時(客戶端對套接字執行read操做),套接字產生AE_WRITABLE事件。

I/O多路複用程序容許服務器同時監聽套接字的AE_READABLE事件和AE_WRITABLE事件,若是一個套接字同時產生了這兩種事件,那麼文件事件分派器會優先處理AE_READABLE事件,等到AE_READABLE事件處理完以後,才處理AE_WRITABLE事件。

API

  • ae.c/aeCreateFileEvent函數接受一個套接字描述符、一個事件類型,以及一個事件處理器做爲參數,將給定套接字的給定事件加入到I/O多路複用程序的監聽範圍以內,並對事件和事件處理器進行關聯;
  • ae.c/aeDeleteFileEvent函數接受一個套接字描述符和一個監聽事件類型做爲參數,讓I/O多路複用程序取消對給定套接字的給定事件的監聽,並取消事件和事件處理器之間的關聯;
  • ae.c/aeGetFileEvents函數接受一個套接字描述符,返回該套接字正在被監聽的事件類型:
    • 若是套接字沒有任何事件被監聽,那麼函數返回AE_NONE;
    • 若是套接字的讀事件正在被監聽,那麼函數返回AE_READABLE;
    • 若是套接字的寫事件正在被監聽,那麼函數返回AE_WRITABLE;
    • 若是套接字的讀事件和寫事件正在被監聽,那麼函數返回AE_READABLE|AE_WRITABLE。
  • ae.c/aeWait函數接受一個套接字描述符、一個事件類型和一個毫秒數爲參數,在給定的時間內阻塞並等待套接字的給定類型事件產生,當事件成功產生,或者等待超時以後,函數返回
  • ae.c/aeApiPoll函數接受一個sys/time.h/struct timeval結構爲參數,並在指定的時間內,阻塞並等待全部被aeCreateFileEvent函數設置爲監聽狀態的套接字產生文件事件,當有至少一個事件產生,或者等待超時後,函數返回;
  • ae.c/aeProcessEvents函數是文件事件分派器,它先調用aeApiPoll函數來等待事件產生,而後遍歷全部已產生的事件,並調用相應的事件處理器來處理這些事件;
  • ae.c/aeGetApiName函數返回I/O多路複用程序底層所使用的I/O多路複用函數庫的名稱:返回"select"表示底層爲select函數庫,諸如此類。

文件事件的處理器

鏈接應答處理器

networking.c/acceptTcpHandler函數是Redis的鏈接應答處理器,這個處理器用於對鏈接服務器監聽套接字的客戶端進行應答,其主要調用anet.c中anetTcpAccept函數實現,具體實現爲sys/socket.h/accept函數的包裝。

當Redis服務器進行初始化的時候,程序會將這個鏈接應答處理器和服務器監聽套接字的AE_READABLE事件關聯起來,當有客戶端用sys/socket.h/connect函數鏈接服務器監聽套接字的時候,套接字就會產生AE_READABLE事件,引起鏈接應答處理器執行,並執行相應的套接字應答操做。

命令請求處理器

networking.c/readQueryFromClient函數是Redis的命令請求處理器,這個處理器負責從套接字中讀入客戶端發送的命令請求內容,具體實現爲unistd.h/read函數的包裝。

當一個客戶端經過鏈接應答處理器成功鏈接到服務器以後,服務器會將客戶端套接字的AE_READABLE事件和命令請求處理器關聯起來,當客戶端向服務器發送命令請求的時候,套接字就會產生AE_READABLE事件,引起命令請求處理器執行,並執行相應的套接字讀入操做;

在客戶端鏈接服務器的整個過程當中,服務器都會一直爲客戶端套接字的AE_READABLE事件關聯命令請求處理器。

命令回覆處理器

networking.c/sendReplyToClient函數是Redis的命令回覆處理器,這個處理器負責將服務器執行命令後獲得的命令回覆經過套接字返回給客戶端,具體實現爲unistd.h/write函數的包裝。

服務器有命令回覆須要傳送給客戶端的時候服務器會將客戶端套接字的AE_WRITABLE事件和命令回覆處理器關聯起來,當客戶端準備好接收服務器傳回的命令回覆時,就會產生AE_WRITABLE事件,引起命令回覆處理器執行,並執行相應的套接字寫入操做

時間事件

時間事件分爲如下兩類:

  • 定時事件:讓一段程序在指定的時間以後執行一次。好比說,讓程序X在當前時間的30毫秒以後執行一次;
  • 週期性事件:讓一段程序每隔指定時間就執行一次。好比說,讓程序Y每隔30毫秒就執行一次。

時間事件主要由如下三個屬性組成:

  • id:服務器爲時間事件建立的全局惟一ID(標識號)。ID號按從小到大的順序遞增,新事件的ID號比舊事件的ID號要大;
  • when:毫秒精度的UNIX時間戳,記錄了時間事件的到達(arrive)時間;
  • timeProc:時間事件處理器,一個函數。當時間事件到達時,服務器就會調用相應的處理器來處理事件。

一個時間事件是定時事件仍是週期性事件取決於時間事件處理器的返回值:

  • 若是事件處理器返回ae.h/AE_NOMORE,那麼這個事件爲定時事件:該事件在達到一次以後就會被刪除,以後再也不到達;
  • 若是事件處理器返回一個非AE_NOMORE的整數值,那麼這個事件爲週期性時間:當一個時間事件到達以後,服務器會根據事件處理器返回的值,對時間事件的when屬性進行更新,讓這個事件在一段時間以後再次到達,並以這種方式一直更新並運行下去。好比說,若是一個時間事件的處理器返回整數值30,那麼服務器應該對這個時間事件進行更新,讓這個事件在30毫秒以後再次到達。(如今的Redis主要使用這個)

API

ae.c/aeCreateTimeEvent函數接受一個毫秒數milliseconds和一個時間事件處理器proc做爲參數,將一個新的時間事件添加到服務器,這個新的時間事件將在當前時間的milliseconds毫秒以後到達,而事件的處理器爲proc。

ae.c/aeDeleteFileEvent函數接受一個時間事件ID做爲參數,而後從服務器中刪除該ID所對應的時間事件;

ae.c/aeSearchNearestTimer函數返回到達時間距離當前時間最接近的那個時間事件;

ae.c/processTimeEvents函數是時間事件的執行器,這個函數會遍歷全部已到達的時間事件,並調用這些事件的處理器。已到達指的是,時間事件的when屬性記錄的UNIX時間戳等於或小於當前時間的UNIX時間戳。

processTimeEvents函數的定義能夠用如下僞代碼來描述:

def processTimeEvents():
    # 
遍歷服務器中的全部時間事件
    for time_event in all_time_event():
        # 
檢查事件是否已經到達
        if time_event.when <= unix_ts_now():
            # 
事件已到達
            # 
執行事件處理器,並獲取返回值
            retval = time_event.timeProc()
            # 
若是這是一個定時事件
            if retval == AE_NOMORE:
                # 
那麼將該事件從服務器中刪除
                delete_time_event_from_server(time_event)
        # 
若是這是一個週期性事件
        else:
            # 
那麼按照事件處理器的返回值更新時間事件的 when 
屬性
            # 
讓這個事件在指定的時間以後再次到達
            update_when(time_event, retval)

 

時間事件應用實例:serverCron函數

持續運行的Redis服務器須要按期對自身的資源和狀態進行檢查和調整,從而確保服務器能夠長期、穩定地運行,這些按期操做由redis.c/serverCron函數負責執行,它的主要工做包括:

  • 更新服務器的各種統計信息,好比時間、內存佔用、數據庫佔用狀況等;
  • 清理數據庫中的過時鍵值對;
  • 關閉和清理鏈接失效的客戶端;
  • 嘗試進行AOF或RDB持久化操做;
  • 若是服務器是主服務器,那麼對從服務器進行按期同步;
  • 若是處於集羣模式,對集羣進行按期同步和鏈接測試;

事件的調度與執行

事件的調度和執行由ae.c/aeProcessEvents函數負責。能夠用一下源代碼完成:

def aeProcessEvents():
    # 
獲取到達時間離當前時間最接近的時間事件
    time_event = aeSearchNearestTimer()
    # 
計算最接近的時間事件距離到達還有多少毫秒
    remaind_ms = time_event.when - unix_ts_now()
    # 
若是事件已到達,那麼remaind_ms
的值可能爲負數,將它設定爲0
    if remaind_ms < 0:
        remaind_ms 0
    # 
根據remaind_ms
的值,建立timeval
結構
    timeval = create_timeval_with_ms(remaind_ms)
    # 
阻塞並等待文件事件產生,最大阻塞時間由傳入的timeval
結構決定
    # 
若是remaind_ms
的值爲0
,那麼aeApiPoll
調用以後立刻返回,不阻塞
    aeApiPoll(timeval)
    # 
處理全部已產生的文件事件(其實並無這個函數)
    processFileEvents()
    # 
處理全部已到達的時間事件
    processTimeEvents()
相關文章
相關標籤/搜索