深刻理解異步事件機制

前言

經過了解異步設計的由來,來深刻理解異步事件機制。html

代碼地址node

什麼是異步

爲了深刻理解異步的概念,就必須先了解異步設計的由來。算法

同步

顯然易見的是,同步的概念隨着咱們學習第一個輸出Hello World的程序,就已經深刻人心。編程

然而咱們也很容易忘記一個事實:一個現代編程語言(如Python)作了很是多的工做,來指導和約束你如何去構建你本身的一個程序。segmentfault

def f():
    print("in f()")
def g():
    print("in g()")
f()
g()

你知道in g()必定輸出在in f()以後,即函數f完成前函數g不會執行。這即爲同步。在現代編程語言的幫助下,這一切顯得很是的天然,從而也讓咱們能夠將咱們的程序分解成
鬆散耦合的函數:一個函數並不須要關心誰調用了它,它甚至能夠沒有返回值,只是完成一些操做。api

固然關於這些是怎麼具體實現的就不探究了,然而隨着一個程序的功能的增長,同步設計的開發理念並不足以實現一些複雜的功能。數組

併發

寫一個程序每隔3秒打印「Hello World」,同時等待用戶命令行的輸入。用戶每輸入一個天然數n,就計算並打印斐波那契函數的值F(n),以後繼續等待下一個輸入

因爲等待用戶輸入是一個阻塞的操做,若是按照同步的設計理念:若是用戶未輸入,則意味着接下來的函數並不會執行,天然沒有辦法作到一邊輸出「Hello World」,
一邊等待用戶輸入。爲了讓程序能解決這樣一個問題,就必須引入併發機制,即讓程序可以同時作不少事,線程是其中一種。

線程

具體代碼在example/hello_threads.py中。

from threading import Thread
from time import sleep
from time import time
from fib import timed_fib
def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        sleep(3)
def read_and_process_input():
    while True:
        n = int(input())
        print('fib({}) = {}'.format(n, timed_fib(n)))
def main():
    # Second thread will print the hello message. Starting as a daemon means
    # the thread will not prevent the process from exiting.
    t = Thread(target=print_hello)
    t.daemon = True
    t.start()
    # Main thread will read and process input
    read_and_process_input()
if __name__ == '__main__':
    main()

對於以前那樣的問題,引入線程機制就能夠解決這種簡單的併發問題。而對於線程咱們應該有一個簡單的認知:

  • 一個線程能夠理解爲指令的序列和CPU執行的上下文的集合。
  • 一個同步的程序即進程,有且只會在一個線程中運行,因此當線程被阻塞,也就意味着整個進程被阻塞
  • 一個進程能夠有多個線程,同一個進程中的線程共享了進程的一些資源,好比說內存,地址空間,文件描述符等。
  • 線程是由操做系統的調度器來調度的, 調度器統一負責管理調度進程中的線程。

    • 系統的調度器決定何時會把當前線程掛起,並把CPU的控制器交個另外一個線程。這個過程稱之爲稱上下文切換,包括對於當前線程上下文的保存、對目標線程上下文的加載。
    • 上下文切換會對性能產生影響,由於它自己也須要CPU的週期來執行

I/O多路複用

而隨着現實問題的複雜化,如10K問題。

在Nginx沒有流行起來的時候,常被提到一個詞 10K(併發1W)。在互聯網的早期,網速很慢、用戶羣很小需求也只是簡單的頁面瀏覽,
因此最初的服務器設計者們使用基於進程/線程模型,也就是一個TCP鏈接就是分配一個進程(線程)。誰都沒有想到如今Web 2.0時候用戶羣裏和複雜的頁面交互問題,
而如今即時通訊和實在實時互動已經很廣泛了。那麼你設想若是每個用戶都和服務器保持一個(甚至多個)TCP鏈接才能進行實時的數據交互,別說BAT這種量級的網站,
就是豆瓣這種比較小的網站,同時的併發鏈接也要過億了。進程是操做系統最昂貴的資源,一臺機器沒法建立不少進程。若是要建立10K個進程,那麼操做系統是沒法承受的。
就算咱們不討論隨着服務器規模大幅上升帶來複雜度幾何級數上升的問題,採用分佈式系統,只是維持1億用戶在線須要10萬臺服務器,成本巨大,也只有FLAG、BAT這樣公司纔有財力購買如此多的服務器。

而一樣存在一些緣由,讓咱們避免考慮多線程的方式:

  • 線程在計算和資源消耗的角度來講是比較昂貴的。
  • 線程併發所帶來的問題,好比由於共享的內存空間而帶來的死鎖和競態條件。這些又會致使更加複雜的代碼,在編寫代碼的時候須要時不時地注意一些線程安全的問題。

爲了解決這一問題,出現了「用同一進程/線程來同時處理若干鏈接」的思路,也就是I/O多路複用。

以Linux操做系統爲例,Linux操做系統給出了三種監聽文件描述符的機制,具體實現可參考

  • select: 每一個鏈接對應一個描述符(socket),循環處理各個鏈接,先查下它的狀態,ready了就進行處理,不ready就不進行處理。可是缺點不少:

    • 每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大
    • 同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也很大
    • select支持的文件描述符數量過小了,默認是1024
  • poll: 本質上和select沒有區別,可是因爲它是基於鏈表來存儲的,沒有最大鏈接數的限制。缺點是:

    • 大量的的數組被總體複製於用戶態和內核地址空間之間,而無論這樣的複製是否是有意義。
    • poll的特色是「水平觸發(只要有數據能夠讀,無論怎樣都會通知)」,若是報告後沒有被處理,那麼下次poll時會再次報告它。
  • epoll: 它使用一個文件描述符管理多個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。epoll支持水平觸發和邊緣觸發,最大的特色在於「邊緣觸發」,它只告訴進程哪些剛剛變爲就緒態,而且只會通知一次。使用epoll的優勢不少:

    • 沒有最大併發鏈接的限制,能打開的fd的上限遠大於1024(1G的內存上能監聽約10萬個端口)
    • 效率提高,不是輪詢的方式,不會隨着fd數目的增長效率降低
    • 內存拷貝,利用mmap()文件映射內存加速與內核空間的消息傳遞;即epoll使用mmap減小複製開銷

綜上所述,經過epoll的機制,給現代高級語言提供了高併發、高性能解決方案的基礎。而一樣FreeBSD推出了kqueue,Windows推出了IOCP,Solaris推出了/dev/poll。

而在Python3.4中新增了selectors模塊,用於封裝各個操做系統所提供的I/O多路複用的接口。
那麼以前一樣的問題,咱們能夠經過I/O多路複用的機制實現併發。

寫一個程序每隔3秒打印「Hello World」,同時等待用戶命令行的輸入。用戶每輸入一個天然數n,就計算並打印斐波那契函數的值F(n),以後繼續等待下一個輸入

經過最基礎的輪詢機制(poll),輪詢標準輸入(stdin)是否變爲可讀的狀態,從而當標準輸入能被讀取時,去執行計算Fibonacci數列。而後判斷時間是否過去三秒鐘,從而是否輸出"Hello World!".
具體代碼在example/hello_selectors_poll.py中。

注意:在Windows中並不是一切都是文件,因此該實例代碼沒法在Windows平臺下運行。

import selectors
import sys
from time import time
from fib import timed_fib
def process_input(stream):
    text = stream.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))
def print_hello():
    print("{} - Hello world!".format(int(time())))
def main():
    selector = selectors.DefaultSelector()
    # Register the selector to poll for "read" readiness on stdin
    selector.register(sys.stdin, selectors.EVENT_READ)
    last_hello = 0  # Setting to 0 means the timer will start right away
    while True:
        # Wait at most 100 milliseconds for input to be available
        for event, mask in selector.select(0.1):
            process_input(event.fileobj)
        if time() - last_hello > 3:
            last_hello = time()
            print_hello()
if __name__ == '__main__':
    main()

從上面解決問題的設計方案演化過程,從同步到併發,從線程到I/O多路複用。能夠看出根本思路去須要程序自己高效去阻塞,
讓CPU可以執行核心任務。意味着將數據包處理,內存管理,處理器調度等任務從內核態切換到應用態,操做系統只處理控制層,
數據層徹底交給應用程序在應用態中處理。極大程度的減小了程序在應用態和內核態之間切換的開銷,讓高性能、高併發成爲了可能。

異步

經過以前的探究,不難發現一個同步的程序也能經過操做系統的接口實現「併發」,而這種「併發」的行爲便可稱之爲異步

以前經過I/O複用的所提供的解決方案,進一步抽象,便可抽象出最基本的框架事件循環(Event Loop),而其中最容易理解的實現,
則是回調(Callback).

回調

經過對事件自己的抽象,以及其對應的處理函數(handler),能夠實現以下算法:

維護一個按時間排序的事件列表,最近須要運行的定時器在最前面。這樣的話每次只須要從頭檢查是否有超時的事件並執行它們。

bisect.insort使得維護這個列表更加容易,它會幫你在合適的位置插入新的定時器事件組。
具體代碼在example/hello_event_loop_callback.py中。

注意:在Windows中並不是一切都是文件,因此該實例代碼沒法在Windows平臺下運行。

from bisect import insort
from fib import timed_fib
from time import time
import selectors
import sys
class EventLoop(object):
    """
    Implements a callback based single-threaded event loop as a simple
    demonstration.
    """
    def __init__(self, *tasks):
        self._running = False
        self._stdin_handlers = []
        self._timers = []
        self._selector = selectors.DefaultSelector()
        self._selector.register(sys.stdin, selectors.EVENT_READ)
    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for callback in self._stdin_handlers:
                    callback(line)
            # Handle timer events
            while self._timers and self._timers[0][0] < time():
                handler = self._timers[0][1]
                del self._timers[0]
                handler()
    def add_stdin_handler(self, callback):
        self._stdin_handlers.append(callback)
    def add_timer(self, wait_time, callback):
        insort(self._timers, (time() + wait_time, callback))
    def stop(self):
        self._running = False
def main():
    loop = EventLoop()
    def on_stdin_input(line):
        if line == 'exit':
            loop.stop()
            return
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))
    def print_hello():
        print("{} - Hello world!".format(int(time())))
        loop.add_timer(3, print_hello)
    def f(x):
        def g():
            print(x)
        return g
    loop.add_stdin_handler(on_stdin_input)
    loop.add_timer(0, print_hello)
    loop.run_forever()
if __name__ == '__main__':
    main()

參考文獻

相關文章
相關標籤/搜索