一塊兒讀 Gevent 源碼

這一篇主要想跟你們分享一下 Gevent 實現的基礎邏輯,也是有同窗對這個很感興趣,因此貼出來跟你們一塊兒分享一下。python

Greenlet

咱們知道 Gevent 是基於 Greenlet 實現的,greenlet 有的時候也被叫作微線程或者協程。其實 Greenlet 自己很是簡單,其自身實現的功能也很是直接。區別於常規的編程思路——順序執行、調用進棧、返回出棧—— Greenlet 提供了一種在不一樣的調用棧之間自由跳躍的功能。從一個簡單的例子來看一下吧(摘自官方文檔):git

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34

def test2():
    print 56
    gr1.switch()
    print 78

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

這裏,每個 greenlet 就是一個調用棧——您能夠把他想象成一個線程,只不過真正的線程能夠並行執行,而同一時刻只能有一個 greenlet 在執行(同一線程裏)。正如例子中最後三句話,咱們建立了 gr1gr2 兩個不一樣的調用棧空間,入口函數分別是 test1test2;這最後一句 gr1.switch() 得多解釋一點。github

由於除了 gr1gr2,咱們還有一個棧空間,也就是全部 Python 程序都得有的默認的棧空間——咱們暫且稱之爲 main,而這一句 gr1.switch() 偏偏實現了從 maingr1 的跳躍,也就是從當前的棧跳到指定的棧。這時,就猶如常規調用 test1() 同樣,gr1.switch() 的調用暫時不會返回結果,程序會跳轉到 test1 繼續執行;只不過區別於普通函數調用時 test1() 會向當前棧壓棧,而 gr1.switch() 則會將當前棧存檔,替換成 gr1 的棧。如圖所示:算法

請輸入圖片描述

對於這種棧的切換,咱們有時也稱之爲執行權的轉移,或者說 main 交出了執行權,同時 gr1 得到了執行權。Greenlet 在底層是用匯編實現的這樣的切換:把當前的棧(main)相關的寄存器啊什麼的保存到內存裏,而後把本來保存在內存裏的 gr1 的相關信息恢復到寄存器裏。這種操做速度很是快,比操做系統對多進程調度的上下文切換還要快。代碼在這裏,有興趣的同窗能夠一塊兒研究一下(其中 switch_x32_unix.h 是我寫的哈哈)。數據庫

回到前面的例子,最後一句 gr1.switch() 調用將執行點跳到了 gr1 的第一句,因而輸出了 12。隨後順序執行到 gr2.switch(),繼而跳轉到 gr2 的第一句,因而輸出了 56。接着又是 gr1.switch(),跳回到 gr1,從以前跳出的地方繼續——對 gr1 而言就是 gr2.switch() 的調用返回告終果 None,而後輸出 34編程

這個時候 test1 執行到頭了,gr1 的棧裏面空了。Greenlet 設計了 parent greenlet 的概念,就是說,當一個 greenlet 的入口函數執行完以後,會自動切換回其 parent。默認狀況下,greenlet 的 parent 就是建立該 greenlet 時所在的那個棧,前面的例子中,gr1gr2 都是在 main 裏被建立的,因此他們倆的 parent 都是 main。因此當 gr1 結束的時候,會回到 main 的最後一句,接着 main 結束了,因此整個程序也就結束了——78 歷來沒有被執行到過。另外,greenlet 的 parent 也能夠手工設置。segmentfault

簡單來看,greenlet 只是爲 Python 語言增長了建立多條執行序列的功能,並且多條執行序列之間的切換還必須得手動顯式調用 switch() 才行;這些都跟異步 I/O 沒有必然關係。後端

gevent.sleep

接着來看 Gevent。最簡單的一個 Gevent 示例就是這樣的了:服務器

import gevent
gevent.sleep(1)

貌似很是簡單的一個 sleep,卻包含了 Gevent 的關鍵結構,讓咱們仔細看一下 sleep 的實現吧。代碼在 gevent/hub.py多線程

def sleep(seconds=0):
    hub = get_hub()
    loop = hub.loop
    hub.wait(loop.timer(seconds))

這裏我把一些當前用不着的代碼作了一些清理,只留下了三句關鍵的代碼,其中就有 Gevent 的兩個關鍵的部件——hublooploop 是 Gevent 的核心部件,也就是主循環核心,默認是用 Cython 寫的 libev 的包裝(因此性能槓槓滴),稍後會在詳細提到它。hub 則是一個 greenlet,裏面跑着 loop

hub 是一個單例,從 get_hub() 的源碼就能夠看出來:

import _thread
_threadlocal = _thread._local()

def get_hub(*args, **kwargs):
    global _threadlocal
    try:
        return _threadlocal.hub
    except AttributeError:
        hubtype = get_hub_class()
        hub = _threadlocal.hub = hubtype(*args, **kwargs)
        return hub

因此第一次執行 get_hub() 的時候,就會建立一個 hub 實例:

class Hub(greenlet):
    loop_class = config('gevent.core.loop', 'GEVENT_LOOP')

    def __init__(self):
        greenlet.__init__(self)
        loop_class = _import(self.loop_class)
        self.loop = loop_class()

一樣這是一段精簡了的代碼,反映了一個 hub 的關鍵屬性——looploop 實例隨着 hub 實例的建立而建立,默認的 loop 就是 gevent/core.ppyx 裏的 class loop,也能夠經過環境變量 GEVENT_LOOP 來自定義。

值得注意的是,截止到 hub = get_hub()loop = hub.loop,咱們都只是建立了 hubloop,並無真正開始跑咱們的主循環。稍安勿躁,第三句就要開始了。

loop 有一堆接口,對應着底層 libev 的各個功能,詳見此處。咱們這裏用到的是 timer(seconds),該函數返回的是一個 watcher 對象,對應着底層 libev 的 watcher 概念。咱們大概能猜到,這個 watcher 對象會在幾秒鐘以後作一些什麼事情,可是具體怎麼作,讓咱們一塊兒看看 hub.wait() 的實現吧。

def wait(self, watcher):
        waiter = Waiter()
        watcher.start(waiter.switch)
        waiter.get()

代碼也不長,不過能看到 watcher 的接口 watcher.start(method),也就是說,當給定的幾秒鐘過了以後,會調用這裏給的函數,也就是 waiter.switch。讓咱們再看一下這裏用到的 Waiter,都是在同一個文件 hub.py 裏面:

from greenlet import getcurrent

class Waiter(object):
    def __init__(self):
        self.hub = get_hub()
        self.greenlet = None

    def switch(self):
        assert getcurrent() is self.hub
        self.greenlet.switch()

    def get(self):
        assert self.greenlet is None
        self.greenlet = getcurrent()
        try:
            self.hub.switch()
        finally:
            self.greenlet = None

這裏一樣刪掉了大量干擾因素。根據前面 wait() 的定義,咱們會先建立一個 waiter,而後調用其 get(),隨後幾秒鐘以後 loop 會調用其 switch()。一個個看。

get() 一上來會保證本身不會被同時調用到(assert),接着就去獲取了當前的 greenlet,也就是調用 get() 時所處的棧,一直往前找,找到 sleep(1),因此 getcurrent() 的結果是 mainWaiter 隨後將 main 保存在了 self.greenlet 引用中。

下面的一句話是重中之重了,self.hub.switch()!由無論任何上下文中,直接往 hub 裏跳。因爲這是第一次跳進 hub 裏,因此此時 loop 就開始運轉了。

正巧,咱們以前已經經過 loop.timer(1)watcher.start(waiter.switch),在 loop 裏註冊了說,1 秒鐘以後去調用 waiter.switchloop 一旦跑起來就會嚴格執行以前註冊的命令。因此呢,一秒鐘以後,咱們在 hub 的棧中,調用到了 Waiter.switch()

switch() 裏,程序一上來就要驗證當前上下文必須得是 hub,翻閱一下前面的代碼,這個是必然的。最後,跳到 self.greenlet!還記得它被設置成什麼了嗎?——main。因而乎,咱們就回到了最初的代碼裏,gevent.sleep(1) 在通過了 1 秒鐘的等待以後終於返回了。

回頭看一下這個過程,其實也很簡單的:當咱們須要等待一個事件發生時——好比須要等待 1 秒鐘的計時器事件,咱們就把當前的執行棧跟這個事件作一個綁定(watcher.start(waiter.switch)),而後把執行權交給 hubhub 則會在事件發生後,根據註冊的記錄儘快回到原來的斷點繼續執行。

異步

hub 一旦拿到執行權,就能夠作不少事情了,好比切換到別的 greenlet 去執行一些其餘的任務,直到這些 greenlet 又主動把執行權交回給 hub。宏觀的來看,就是這樣的:一個 hub,好多個其餘的任務 greenlet(其中沒準就包括 main),hub 負責總調度,去依次調用各個任務 greenlet;任務 greenlet 則在執行至下一次斷點時,主動切換回 hub。這樣一來,許多個任務 greenlet 就能夠看似並行地同步運行了,這種任務調度方式叫作協做式的任務調度(cooperative scheduling)。

舉個例子:

import gevent

def beep(interval):
    while True:
        print("Beep %s" % interval)
        gevent.sleep(interval)

for i in range(10):
    gevent.spawn(beep, i)

beep(20)

例子裏咱們總共建立了 10greenlet,每個都會按照不一樣頻率輸出「蜂鳴」;最後一句的 beep(20) 又讓 main greenlet 也不斷地蜂鳴。算上 hub,這個例子一共會有 12 個不一樣的 greenlet 在協做式地運行。

I/O

Gevent 最主要的功能固然是異步 I/O 了。其實,I/O 跟前面 sleep 的例子沒什麼本質的區別,只不過 sleep 用的 watchertimer,而 I/O 用到的 watcherio。好比說 wait_read(fileno) 是這樣的:

def wait_read(fileno):
    hub = get_hub()
    io = hub.loop.io(fileno, 1)
    return hub.wait(io)

沒什麼太大區別吧,原理其實都是同樣的。基於這個,咱們就能夠搞異步 socket 了。socket 的接口較爲複雜,這裏提取一些標誌性的代碼一塊兒讀一下吧:

class socket(object):
    def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0):
        self._sock = _realsocket(family, type, proto)  # 建立底層的 socket
        self._sock.setblocking(0)  # 將其設置爲非阻塞的
        fileno = self._sock.fileno()  # 得到其文件描述符
        self.hub = get_hub()  # 本身留一份 hub 的引用,省的每次再現取
        io = self.hub.loop.io  # 快捷方式
        self._read_event = io(fileno, 1)  # socket 的讀取事件
        self._write_event = io(fileno, 2)  # socket 的寫入事件

    def _wait(self, watcher):
        assert watcher.callback is None  # 一個 socket 只能被一個 greenlet 用
        self.hub.wait(watcher)  # 見以前的例子,等待一個事件發生

    def recv(self, *args):
        sock = self._sock
        while True:
            try:
                return sock.recv(*args)  # 異步接收,要麼當即成功,要麼當即失敗
            except error as ex:
                if ex.args[0] != EWOULDBLOCK:  # 若是失敗的話,除了是異步等待的狀況,
                    raise  # 其餘狀況都報錯
            self._wait(self._read_event)  # 等待 socket 有數據可讀

libev

最後提一點關於 libev 的東西,由於有同窗也問到 Gevent 底層的調度方式。簡單來講,libev 是依賴操做系統底層的異步 I/O 接口實現的,Linux 用的是 epoll,FreeBSD 則是 kqueue。Python 代碼裏,socket 會建立一堆 io watcher,對應底層則是將一堆文件描述符添加到一個——好比—— epoll 的句柄裏。當切換到 hub 以後,libev 會調用底層的 epoll_wait 來等待這些 socket 中可能出現的事件。一旦有事件產生(多是一次出現好多個事件),libev 就會按照優先級依次調用每一個事件的回調函數。注意,epoll_wait 是有超時的,因此一些沒法以文件描述符的形式存在的事件也能夠有機會被觸發。關於 libev 網上還有不少資料,有興趣你們能夠自行查閱。

Gevent 的性能調優

Gevent 不是銀彈,不能無限制地建立 greenlet。正如多線程編程同樣,用 gevent 寫服務器也應該建立一個「微線程池」,超過池子大小的 spawn 應該被阻塞而且開始排隊。只有這樣,才能保證同時運行的 greenlet 數量不至於多到顯著增長異步等待的恢復時間,從而保證每一個任務的響應速度。其實,當池子的大小增長到必定程度以後,CPU 使用量的增速會放緩甚至變爲 0,這時繼續增長池子大小隻能致使回調函數開始排隊,不能真正增長吞吐量。正確的作法是增長硬件或者優化代碼(提升算法效率、減小無謂調用等)。

關於 pool 的大小,我以爲是能夠算出來的:

一、在壓力較小、pool 資源充足的狀況下,測得單個請求平均處理總時間,記做 Ta
二、根據系統需求,估計一下能接受的最慢的請求處理時間,記做 Tm
三、設 Ta 中有 Ts 的時間,執行權是不屬於當前處理中的 greenlet 的,好比正在進行異步的數據庫訪問或是調用遠端 API 等後端訪問
四、在常規壓力下,經過測量後端訪問請求處理的平均時間,根據代碼實際調用狀況測算出 Ts
五、pool 的大小 = (Tm / (Ta - Ts)) * 150%,這裏的 150% 是個 buffer 值,拍腦門拍出來的

好比理想狀況下平均每一個請求處理須要 20ms,其中平均有 15ms 是花在數據庫訪問上(假設數據庫性能較爲穩定,可以線性 scale)。若是最大能容忍的請求處理時間是 500ms 的話,那池子大小應該設置成 (500 / (20 - 15)) * 150% = 150,也就意味着單進程最大併發量是 150

從這個算法也能夠看出,花在 Python 端的 CPU 時間越少,系統併發量就越高,而花在後端訪問上的時間長短對併發影響不是很大——固然了,依然得假設數據庫等後端能夠線性 scale。

下面是我以前在 Amazon EC2 m1.small 機器上的部分測試結果,對比了同步多進程和 Gevent 在處理包含異步 PostgreSQL 和 Redis 訪問的請求時的性能:

Log Format (per actor)
handling time for 500 requests / Time receiving 500 responses - time per handling / time per request - raw handling rate / request per second

8 actors, 128 testers: 798 rps on client

1230.08 ms / 5649.88 ms - 2.46 ms / 11.30 ms - 406.48 rps / 88.50 rps
1707.71 ms / 5938.53 ms - 3.42 ms / 11.88 ms - 292.79 rps / 84.20 rps
2219.12 ms / 6324.48 ms - 4.44 ms / 12.65 ms - 225.31 rps / 79.06 rps
1446.94 ms / 5491.89 ms - 2.89 ms / 10.98 ms - 345.56 rps / 91.04 rps
1064.61 ms / 5189.07 ms - 2.13 ms / 10.38 ms - 469.66 rps / 96.36 rps
2099.23 ms / 5844.37 ms - 4.20 ms / 11.69 ms - 238.18 rps / 85.55 rps

1 async actor with 8 concurrency limit, 128 testers: 1031 rps on client

3995.44 ms / 560.62 ms - 7.99 ms / 1.12 ms - 125.14 rps / 891.87 rps
4369.57 ms / 575.34 ms - 8.74 ms / 1.15 ms - 114.43 rps / 869.06 rps
4388.47 ms / 590.63 ms - 8.78 ms / 1.18 ms - 113.93 rps / 846.55 rps
4439.61 ms / 579.39 ms - 8.88 ms / 1.16 ms - 112.62 rps / 862.97 rps
3866.82 ms / 574.92 ms - 7.73 ms / 1.15 ms - 129.31 rps / 869.69 rps

1 async actor with no concurrency limit, 128 testers: 987 rps on client

38191.16 ms / 551.76 ms - 76.38 ms / 1.10 ms - 13.09 rps / 906.20 rps
34354.80 ms / 564.43 ms - 68.71 ms / 1.13 ms - 14.55 rps / 885.84 rps
40397.18 ms / 543.23 ms - 80.79 ms / 1.09 ms - 12.38 rps / 920.42 rps
45406.02 ms / 490.45 ms - 90.81 ms / 0.98 ms - 11.01 rps / 1019.48 rps
37106.92 ms / 581.95 ms - 74.21 ms / 1.16 ms - 13.47 rps / 859.18 rps

能看出來,一樣是 8 的併發限制,同步比異步處理快兩三倍(可是 load balance 拉低了同步的優點),吞吐量上雖比不上異步,但也不差。在去掉併發限制以後,吞吐量變化不大,但處理時間翻了 10 倍(由於大量 callback 開始排隊,沒法及時被調用到),且不穩定。

相關文章
相關標籤/搜索