最近看Tornado源碼給了我很多啓發,心血來潮決定本身試着只用python標準庫來實現一個異步非阻塞web框架。花了點時間感受還能夠,一百多行的代碼已經能夠撐起一個極簡框架了。html
須要的相關知識點:python
掌握上面三個點的知識就徹底沒有問題,不是很清楚的同窗我也推薦幾篇參考文章git
HTTP協議詳細介紹(http://www.javashuo.com/article/p-ggogpsqq-ha.html)github
Python篇-IO多路複用詳解(https://www.jianshu.com/p/818f27379a5e)web
Python異步IO之協程(一):從yield from到async的使用(https://blog.csdn.net/SL_World/article/details/86597738)數據庫
實驗環境:瀏覽器
python 3.7.3
因爲在框架中會使用到async/await關鍵字,因此只要確保python版本在3.5以上便可。服務器
咱們的框架要實現最基本的幾個功能:併發
固然一個完善的web框架須要實現的遠遠不止這些,這裏咱們如今只須要它能跑起來就足夠了。app
HTTP是基於TCP/IP通訊協議來實現數據傳輸,與通常的C/S相比,它的特色在於當客戶端(瀏覽器)向服務端發起HTTP請求,服務端響應數據後雙方立馬斷開鏈接,服務端沒法主動向客戶端發送數據。HTTP協議數據傳輸內容分爲請求頭和請求體,請求頭和請求體之間使用"\r\n\r\n"進行分隔。在請求頭中,第一行包含了請求方式,請求路徑和HTTP協議,此後每一行以key: value的形式傳輸數據。
對於咱們的web服務端來講,須要的就是解析http請求和處理http響應。
咱們經過寫兩個類,HttpRequest和HttpResponse來實現。
HttpRequest設計目標是解析從socket接收request數據
1 class HttpRequest(object): 2 def __init__(self, content: bytes): 3 self.content = content.decode('utf-8') 4 self.headers = {} 5 self.GET = {} 6 self.url = '' 7 self.full_path = '' 8 self.body = '' 9 try: 10 header, self.body = self.content.split('\r\n\r\n') 11 temp = header.split('\r\n') 12 first_line = temp.pop(0) 13 self.method, self.url, self.protocol = first_line.split(' ') 14 self.full_path = self.url 15 for t in temp: 16 k, v = t.split(': ', 1) 17 self.headers[k] = v 18 except Exception as e: 19 print(e) 20 if len(self.url.split('?')) > 1: # 解析GET參數 21 self.url = self.full_path.split('?')[0] # 把url中攜帶的參數去掉 22 parms = self.full_path.split('?')[1].split('&') 23 for p in parms: # 將GET參數添加到self.GET字典 24 k, v = p.split('=') 25 self.GET[k] = v
在類中,咱們實現解析http請求的headers、method、url和GET參數,其實還有不少事情沒有作,好比使用POST傳輸數據時,數據是在請求體中,針對這部份內容我並無開始寫,緣由在於本文主要目的仍是異步非阻塞框架,目前的功能已經足以支持咱們進行下一步實驗了。
HTTP響應也能夠分爲響應頭和響應體,咱們能夠很簡單的實現一個response:
1 class HttpResponse(object): 2 def __init__(self, data: str): 3 self.status_code = 200 # 默認響應狀態 200 4 self.headers = 'HTTP/1.1 %s OK\r\n' 5 self.headers += 'Server:AsyncWeb' 6 self.headers += '\r\n\r\n' 7 self.data = data 8 9 @property 10 def content(self): 11 return bytes((self.headers + self.data) % self.status_code, encoding='utf8')
HttpResponse中並無作太多的事情,接受一個字符串,並使用content返回一個知足HTTP響應格式的bytes。
從用戶調用角度,可使用return HttpResponse("歡迎來到AsynicWeb")來返回數據。
咱們也能夠簡單的定義一個404頁面:
Http404 = HttpResponse('<html><h1>404</h1></html>') Http404.status_code = 404
路由映射簡單理解就是從一個URL地址找到對應的邏輯函數。舉個例子,咱們訪問http://127.0.0.1:8000這個頁面,在http請求中它的url是"/",在web服務器中有一個函數index,web服務器可以由url地址"/"找到函數index,這就是一個路由映射。
其實路由映射實現起來很是簡單。咱們只要定義一個映射列表,列表中的每一個元素包含url和邏輯處理(視圖函數)兩部分,當一個http請求到達的時候,遍歷映射列表,使用正則匹配每個url,若是請求的url和映射表中的相同,咱們就能夠取出對應的視圖函數。
路由映射表是徹底由用戶來定義映射關係的,它應該使用一個咱們定義的標準結構,好比:
routers = [ ('/$', IndexView), ('/home', asy) ]
視圖是指可以根據一個請求,執行某些邏輯運算,最終返回響應的模塊。說到這裏,一個web框架的運行流程就出來了:
http請求——路由映射表——視圖——執行視圖獲取返回值——http響應
在咱們的框架中,借鑑Django的設計,咱們讓它支持類視圖(CBV)和函數視圖(FBV)兩種模式。
對於函數視圖,徹底由用戶本身定義,只要至少可以接受一個request參數便可
對於類視圖,咱們須要作一些預處理,確保用戶按咱們的規則來實現類視圖。
定義一個View類:
1 class View(object): 2 # CBV應繼承View類 3 def dispatch(self, request): 4 method = request.method.lower() 5 if hasattr(self, method): 6 return getattr(self, method)(request) 7 else: 8 return Http404
在View類中,咱們只寫了一個dispatch方法,其實就作了一件事:反射。當咱們在路由映射表中找對應的視圖時,若是判斷視圖屬於類,咱們就調用dispatch方法。
從用戶角度來看,實現一個CBV只須要繼承View類,而後經過定義get、post、delete等方法來實現不一樣的處理。
上面幾個小節實現了web框架的大致執行路徑,從這節開始咱們實現web服務器的核心。
經過IO多路複用能夠達到單線程實現高併發的效果,一個標準的IO多路複用寫法:
1 server = socket(AF_INET, SOCK_STREAM) 2 server.bind(("127.0.0.1", 8000)) 3 server.setblocking(False) # 設置非阻塞 4 server.listen(128) 5 Future_Task_Wait = {} 6 rlist = [server, ] 7 while True: 8 r, w, x = select.select(rlist, [], [], 0.1) 9 for o in r: 10 if o == server: 11 '''判斷o是server仍是conn''' 12 conn, addr = o.accept() 13 conn.setblocking(False) # 設置非阻塞 14 rlist.append(conn) # 客戶鏈接 加入輪詢列表 15 else: 16 data = b"" 17 while True: # 接收客戶傳輸數據 18 try: 19 chunk = o.recv(1024) 20 data = data + chunk 21 except Exception as e: 22 chunk = None 23 if not chunk: 24 break 25 dosomething(o, data, routers) # 拿到數據乾點啥
經過這段代碼咱們能夠得到全部的請求了,下一步就是處理這些請求。
咱們就定義一個dosomething函數
1 import re 2 import time 3 from types import FunctionType 4 5 def dosomething(o, data, routers): 6 '''解析http請求,尋找映射函數並執行獲得結果
7 :param o: socket鏈接對象 8 :param data: socket接收數據 9 :return: 響應結果 10 ''' 11 request = HttpRequest(data) 12 print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0], 13 request.method, request.url) 14 flag = False 15 for router in routers: 16 if re.match(router[0], request.url): 17 target = router[1] 18 flag = True 19 break 20 if flag: 21 # 判斷targe是函數仍是類 22 if isinstance(target, FunctionType): 23 result = target(request) 24 elif issubclass(target, View): 25 result = target().dispatch(request) 26 else: 27 result = Http404 28 else: 29 result = Http404 30 return result
這段代碼作了這麼幾件事。1.實例化HttpRequest;2.使用正則遍歷路由映射表;3.將request傳入視圖函數或類視圖的dispatch方法;4.拿到result結果
咱們經過result = dosomething(o, data, routers)能夠拿到結果,接下來咱們只須要把結果發回給客戶端並斷開鏈接就能夠了
o.sendall(result.content) # 因爲result是一個HttpResponse對象 咱們使用content屬性
rlist.remove(o) # 從輪詢中刪除鏈接
o.close() # 關閉鏈接
至此,咱們的web框架已經搭建好了。
但它仍是一個同步的框架,在咱們的服務端中,其實一直經過while循環在監聽select是否變化,假如咱們在視圖函數中添加IO操做,其餘鏈接依然會阻塞等待,接下來讓咱們的框架實現對協程的支持。
在實現協程以前,咱們先聊聊Tornado的Future對象。能夠說Tornado異步非阻塞的實現核心就是Future。
Future對象內部維護了一個重要屬性_result,這是一個標記位,一個剛實例化的Future內部的_result=None,咱們能夠經過其餘操做來更改_result的狀態。另外一方面,咱們能夠一直監聽每一個Future對象的_result狀態,若是發生變化就執行某些特定的操做。
咱們在第六節定義的dosomething函數中拿到了一個result,它應當是一個HttpResponse對象,那麼能不能返回一個Future對象呢。
假如result是一個Future對象,咱們的服務端不立馬返回結果,而是把Future放進另外一個輪詢列表中,當Future內的_result改變時再返回結果,就達到了異步的效果。
咱們也能夠定義一個Future類,這個類維護只一個變量result:
1 class Future(object): 2 def __init__(self): 3 self.result = None
對於框架使用者來講,在視圖函數要麼返回一個HttpResponse對象表明當即返回,要麼返回一個Future對象說你先別管我,我把事情幹完了再通知你返回結果。
既然視圖函數返回的可能不僅是HttpResponse對象,那麼咱們就須要對第六步的代碼增長額外的處理:
Future_Task_Wait = {} # 定義一個異步Future字典 result = dosomething() # 拿到結果後執行下面判斷 if isinstance(result, Future): Future_Task_Wait[o] = result # Futre對象則加入字典 else: o.sendall(result.content) # 非Future對象直接返回結果並斷開鏈接 rlist.remove(o) o.close()
在while True輪詢內再增長一段代碼,遍歷Future_Task_Wait字典:
rm_conn = [] # 須要移除列表的conn for conn, future in Future_Task_Wait.items(): if future.result: try: conn.sendall(HttpResponse(data=future.result).content) # 返回result finally: rlist.remove(conn) conn.close() rm_conn.append(conn) for conn in rm_conn: # 在字典中刪除conn del Future_Task_Wait[conn]
這樣,咱們就能夠返回一個Future來告訴服務器這是未來才返回的對象。
那回歸正題,咱們到底該如何使用協程?這裏我用的方法是建立一個子線程來執行協程事件循環,主線程永遠在監聽socket。
from threading import Thread def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() coroutine_loop = asyncio.new_event_loop() # 建立協程事件循環 run_loop_thread = Thread(target=start_loop, args=(coroutine_loop,)) # 新起線程運行事件循環, 防止阻塞主線程 run_loop_thread.start() # 運行線程,即運行協程事件循環
當咱們要把asyncdo方法添加做爲協程任務時
asyncio.run_coroutine_threadsafe(asyncdo(), coroutine_loop)
好了,異步非阻塞的核心代碼分析的差很少了,將六七節的代碼整合寫成一個類
1 import re 2 import time 3 import select 4 import asyncio 5 from socket import * 6 from threading import Thread 7 from types import FunctionType 8 from http.response import Http404, HttpResponse 9 from http.request import HttpRequest 10 from views import View 11 from core.future import Future 12 13 class App(object): 14 # web應用程序 15 coroutine_loop = None 16 17 def __new__(cls, *args, **kwargs): 18 # 使用單例模式 19 if not hasattr(cls, '_instance'): 20 App._instance = super().__new__(cls) 21 return App._instance 22 23 def listen(self, host, port, routers): 24 # IO多路複用監聽鏈接 25 server = socket(AF_INET, SOCK_STREAM) 26 server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 27 server.bind((host, port)) 28 server.setblocking(False) 29 server.listen(128) 30 Future_Task_Wait = {} 31 rlist = [server, ] 32 while True: 33 r, w, x = select.select(rlist, [], [], 0.01) 34 for o in r: 35 if o == server: 36 '''判斷o是server仍是conn''' 37 conn, addr = o.accept() 38 conn.setblocking(False) 39 rlist.append(conn) 40 else: 41 data = b"" 42 while True: 43 try: 44 chunk = o.recv(1024) 45 data = data + chunk 46 except Exception as e: 47 chunk = None 48 if not chunk: 49 break 50 try: 51 request = HttpRequest(data, o) 52 print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0], 53 request.method, request.url) 54 flag = False 55 for router in routers: 56 if re.match(router[0], request.url): 57 target = router[1] 58 flag = True 59 break 60 if flag: 61 # 判斷targe是函數仍是類 62 if isinstance(target, FunctionType): 63 result = target(request) 64 elif issubclass(target, View): 65 result = target().dispatch(request) 66 else: 67 result = Http404 68 else: 69 result = Http404 70 # 判斷result是否是future 71 if isinstance(result, Future): 72 Future_Task_Wait[o] = result 73 else: 74 o.sendall(result.content) 75 rlist.remove(o) 76 o.close() 77 except Exception as e: 78 print(e) 79 rm_conn = [] 80 for conn, future in Future_Task_Wait.items(): 81 if future.result: 82 try: 83 conn.sendall(HttpResponse(data=future.result).content) 84 finally: 85 rlist.remove(conn) 86 conn.close() 87 rm_conn.append(conn) 88 for conn in rm_conn: 89 del Future_Task_Wait[conn] 90 91 def run(self, host='127.0.0.1', port=8000, routers=()): 92 # 主線程select多路複用,處理http請求和響應 93 # 給協程單首創建一個子線程,負責處理View函數提交的協程 94 def start_loop(loop): 95 asyncio.set_event_loop(loop) 96 loop.run_forever() 97 self.coroutine_loop = asyncio.new_event_loop() # 建立協程事件循環 98 run_loop_thread = Thread(target=start_loop, args=(self.coroutine_loop,)) # 新起線程運行事件循環, 防止阻塞主線程 99 run_loop_thread.start() # 運行線程,即運行協程事件循環 100 self.listen(host, port, routers)
如今,能夠測試咱們的web框架了。
1 import asyncio 2 from core.server import App 3 from views import View 4 from http.response import * 5 from core.future import Future 6 7 8 class IndexView(View): 9 def get(self, request): 10 return HttpResponse('歡迎來到首頁') 11 12 def post(self, request): 13 return HttpResponse('post') 14 15 def asy(request): 16 future = Future() 17 print('異步調用') 18 wait = request.url.split('/')[-1] 19 try: 20 wait = int(wait) 21 except: 22 wait = 5 23 asyncio.run_coroutine_threadsafe(dosomething(future, wait), app.coroutine_loop) 24 print('返回Future') 25 return future 26 27 async def dosomething(future, wait): 28 # 異步函數 29 await asyncio.sleep(wait)# 模擬異步操做 30 future.result = '等待了%s秒' % wait 31 32 routers = [ 33 ('/$', IndexView), 34 ('/home', asy) 35 ] 36 37 # 從用戶角度只需使用run() 38 app = App() 39 app.run('127.0.0.1', 8080, routers=routers)
瀏覽器訪問http://127.0.0.1:8080,返回沒有問題,若是有同窗使用Chrome可能會亂碼,那是由於咱們的HttpResponse沒有返回指定編碼,添加一個響應頭便可。
瀏覽器訪問http://127.0.0.1:8080/home,這時候會執行協程,默認等待5s後返回結果,你能夠在多個標籤頁訪問這個地址,經過等待時間來驗證咱們的異步框架是否正常工做。
至此,咱們要實現的異步非阻塞web框架已經完成了。固然這個框架說到底仍是太簡陋,後續徹底能夠優化HttpRequest和HttpResponse、增長對數據庫、模板語言等等組件的擴展。
完整源碼已經上傳至https://github.com/sswest/AsyncWeb