經過了解異步設計的由來,來深刻理解異步事件機制。html
代碼地址node
爲了深刻理解異步的概念,就必須先了解異步設計的由來。算法
顯然易見的是,同步的概念隨着咱們學習第一個輸出Hello World的程序,就已經深刻人心。編程
然而咱們也很容易忘記一個事實:一個現代編程語言(如Python)作了很是多的工做,來指導和約束你如何去構建你本身的一個程序。segmentfault
def f(): print("in f()") def g(): print("in g()") f() g()
你知道in g()
必定輸出在in f()
以後,即函數f完成前函數g不會執行。這即爲同步。在現代編程語言的幫助下,這一切顯得很是的天然,從而也讓咱們能夠將咱們的程序分解成
鬆散耦合的函數:一個函數並不須要關心誰調用了它,它甚至能夠沒有返回值,只是完成一些操做。api
固然關於這些是怎麼具體實現的就不探究了,然而隨着一個程序的功能的增長,同步設計的開發理念並不足以實現一些複雜的功能。數組
寫一個程序每隔3秒打印「Hello World」,同時等待用戶命令行的輸入。用戶每輸入一個天然數n,就計算並打印斐波那契函數的值F(n),以後繼續等待下一個輸入
因爲等待用戶輸入是一個阻塞的操做,若是按照同步的設計理念:若是用戶未輸入,則意味着接下來的函數並不會執行,天然沒有辦法作到一邊輸出「Hello World」,
一邊等待用戶輸入。爲了讓程序能解決這樣一個問題,就必須引入併發機制,即讓程序可以同時作不少事,線程是其中一種。
具體代碼在example/hello_threads.py
中。
from threading import Thread from time import sleep from time import time from fib import timed_fib def print_hello(): while True: print("{} - Hello world!".format(int(time()))) sleep(3) def read_and_process_input(): while True: n = int(input()) print('fib({}) = {}'.format(n, timed_fib(n))) def main(): # Second thread will print the hello message. Starting as a daemon means # the thread will not prevent the process from exiting. t = Thread(target=print_hello) t.daemon = True t.start() # Main thread will read and process input read_and_process_input() if __name__ == '__main__': main()
對於以前那樣的問題,引入線程機制就能夠解決這種簡單的併發問題。而對於線程咱們應該有一個簡單的認知:
線程是由操做系統的調度器來調度的, 調度器統一負責管理調度進程中的線程。
而隨着現實問題的複雜化,如10K問題。
在Nginx沒有流行起來的時候,常被提到一個詞 10K(併發1W)。在互聯網的早期,網速很慢、用戶羣很小需求也只是簡單的頁面瀏覽,
因此最初的服務器設計者們使用基於進程/線程模型,也就是一個TCP鏈接就是分配一個進程(線程)。誰都沒有想到如今Web 2.0時候用戶羣裏和複雜的頁面交互問題,
而如今即時通訊和實在實時互動已經很廣泛了。那麼你設想若是每個用戶都和服務器保持一個(甚至多個)TCP鏈接才能進行實時的數據交互,別說BAT這種量級的網站,
就是豆瓣這種比較小的網站,同時的併發鏈接也要過億了。進程是操做系統最昂貴的資源,一臺機器沒法建立不少進程。若是要建立10K個進程,那麼操做系統是沒法承受的。
就算咱們不討論隨着服務器規模大幅上升帶來複雜度幾何級數上升的問題,採用分佈式系統,只是維持1億用戶在線須要10萬臺服務器,成本巨大,也只有FLAG、BAT這樣公司纔有財力購買如此多的服務器。
而一樣存在一些緣由,讓咱們避免考慮多線程的方式:
爲了解決這一問題,出現了「用同一進程/線程來同時處理若干鏈接」的思路,也就是I/O多路複用。
以Linux操做系統爲例,Linux操做系統給出了三種監聽文件描述符的機制,具體實現可參考:
select: 每一個鏈接對應一個描述符(socket),循環處理各個鏈接,先查下它的狀態,ready了就進行處理,不ready就不進行處理。可是缺點不少:
poll: 本質上和select沒有區別,可是因爲它是基於鏈表來存儲的,沒有最大鏈接數的限制。缺點是:
epoll: 它使用一個文件描述符管理多個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。epoll支持水平觸發和邊緣觸發,最大的特色在於「邊緣觸發」,它只告訴進程哪些剛剛變爲就緒態,而且只會通知一次。使用epoll的優勢不少:
綜上所述,經過epoll的機制,給現代高級語言提供了高併發、高性能解決方案的基礎。而一樣FreeBSD推出了kqueue,Windows推出了IOCP,Solaris推出了/dev/poll。
而在Python3.4中新增了selectors模塊,用於封裝各個操做系統所提供的I/O多路複用的接口。
那麼以前一樣的問題,咱們能夠經過I/O多路複用的機制實現併發。
寫一個程序每隔3秒打印「Hello World」,同時等待用戶命令行的輸入。用戶每輸入一個天然數n,就計算並打印斐波那契函數的值F(n),以後繼續等待下一個輸入
經過最基礎的輪詢機制(poll),輪詢標準輸入(stdin)是否變爲可讀的狀態,從而當標準輸入能被讀取時,去執行計算Fibonacci數列。而後判斷時間是否過去三秒鐘,從而是否輸出"Hello World!".
具體代碼在example/hello_selectors_poll.py
中。
注意:在Windows中並不是一切都是文件,因此該實例代碼沒法在Windows平臺下運行。
import selectors import sys from time import time from fib import timed_fib def process_input(stream): text = stream.readline() n = int(text.strip()) print('fib({}) = {}'.format(n, timed_fib(n))) def print_hello(): print("{} - Hello world!".format(int(time()))) def main(): selector = selectors.DefaultSelector() # Register the selector to poll for "read" readiness on stdin selector.register(sys.stdin, selectors.EVENT_READ) last_hello = 0 # Setting to 0 means the timer will start right away while True: # Wait at most 100 milliseconds for input to be available for event, mask in selector.select(0.1): process_input(event.fileobj) if time() - last_hello > 3: last_hello = time() print_hello() if __name__ == '__main__': main()
從上面解決問題的設計方案演化過程,從同步到併發,從線程到I/O多路複用。能夠看出根本思路去須要程序自己高效去阻塞,
讓CPU可以執行核心任務。意味着將數據包處理,內存管理,處理器調度等任務從內核態切換到應用態,操做系統只處理控制層,
數據層徹底交給應用程序在應用態中處理。極大程度的減小了程序在應用態和內核態之間切換的開銷,讓高性能、高併發成爲了可能。
經過以前的探究,不難發現一個同步的程序也能經過操做系統的接口實現「併發」,而這種「併發」的行爲便可稱之爲異步。
以前經過I/O複用的所提供的解決方案,進一步抽象,便可抽象出最基本的框架事件循環(Event Loop),而其中最容易理解的實現,
則是回調(Callback).
經過對事件自己的抽象,以及其對應的處理函數(handler),能夠實現以下算法:
維護一個按時間排序的事件列表,最近須要運行的定時器在最前面。這樣的話每次只須要從頭檢查是否有超時的事件並執行它們。
bisect.insort使得維護這個列表更加容易,它會幫你在合適的位置插入新的定時器事件組。
具體代碼在example/hello_event_loop_callback.py
中。
注意:在Windows中並不是一切都是文件,因此該實例代碼沒法在Windows平臺下運行。
from bisect import insort from fib import timed_fib from time import time import selectors import sys class EventLoop(object): """ Implements a callback based single-threaded event loop as a simple demonstration. """ def __init__(self, *tasks): self._running = False self._stdin_handlers = [] self._timers = [] self._selector = selectors.DefaultSelector() self._selector.register(sys.stdin, selectors.EVENT_READ) def run_forever(self): self._running = True while self._running: # First check for available IO input for key, mask in self._selector.select(0): line = key.fileobj.readline().strip() for callback in self._stdin_handlers: callback(line) # Handle timer events while self._timers and self._timers[0][0] < time(): handler = self._timers[0][1] del self._timers[0] handler() def add_stdin_handler(self, callback): self._stdin_handlers.append(callback) def add_timer(self, wait_time, callback): insort(self._timers, (time() + wait_time, callback)) def stop(self): self._running = False def main(): loop = EventLoop() def on_stdin_input(line): if line == 'exit': loop.stop() return n = int(line) print("fib({}) = {}".format(n, timed_fib(n))) def print_hello(): print("{} - Hello world!".format(int(time()))) loop.add_timer(3, print_hello) def f(x): def g(): print(x) return g loop.add_stdin_handler(on_stdin_input) loop.add_timer(0, print_hello) loop.run_forever() if __name__ == '__main__': main()