150行代碼搭建異步非阻塞Web框架

最近看Tornado源碼給了我很多啓發,心血來潮決定本身試着只用python標準庫來實現一個異步非阻塞web框架。花了點時間感受還能夠,一百多行的代碼已經能夠撐起一個極簡框架了。html

1、準備工做

須要的相關知識點:python

  • HTTP協議的請求和響應
  • IO多路複用
  • asyncio

掌握上面三個點的知識就徹底沒有問題,不是很清楚的同窗我也推薦幾篇參考文章git

  HTTP協議詳細介紹(http://www.javashuo.com/article/p-ggogpsqq-ha.htmlgithub

  Python篇-IO多路複用詳解(https://www.jianshu.com/p/818f27379a5eweb

  Python異步IO之協程(一):從yield from到async的使用(https://blog.csdn.net/SL_World/article/details/86597738數據庫

實驗環境:瀏覽器

python 3.7.3

 因爲在框架中會使用到async/await關鍵字,因此只要確保python版本在3.5以上便可。服務器

 2、框架功能目標

咱們的框架要實現最基本的幾個功能:併發

  • 封裝HTTP請求響應
  • 路由映射
  • 類視圖和函數視圖
  • 協程支持

 固然一個完善的web框架須要實現的遠遠不止這些,這裏咱們如今只須要它能跑起來就足夠了。app

3、封裝HTTP協議

HTTP是基於TCP/IP通訊協議來實現數據傳輸,與通常的C/S相比,它的特色在於當客戶端(瀏覽器)向服務端發起HTTP請求,服務端響應數據後雙方立馬斷開鏈接,服務端沒法主動向客戶端發送數據。HTTP協議數據傳輸內容分爲請求頭和請求體,請求頭和請求體之間使用"\r\n\r\n"進行分隔。在請求頭中,第一行包含了請求方式,請求路徑和HTTP協議,此後每一行以key: value的形式傳輸數據。

對於咱們的web服務端來講,須要的就是解析http請求和處理http響應。

咱們經過寫兩個類,HttpRequest和HttpResponse來實現。

3.1 HttpRequest

HttpRequest設計目標是解析從socket接收request數據

 1 class HttpRequest(object):
 2     def __init__(self, content: bytes):
 3         self.content = content.decode('utf-8')
 4         self.headers = {}
 5         self.GET = {}
 6         self.url = ''
 7         self.full_path = ''
 8         self.body = ''
 9         try:
10             header, self.body = self.content.split('\r\n\r\n')
11             temp = header.split('\r\n')
12             first_line = temp.pop(0)
13             self.method, self.url, self.protocol = first_line.split(' ')
14             self.full_path = self.url
15             for t in temp:
16                 k, v = t.split(': ', 1)
17                 self.headers[k] = v
18         except Exception as e:
19             print(e)
20         if len(self.url.split('?')) > 1: # 解析GET參數
21             self.url = self.full_path.split('?')[0] # 把url中攜帶的參數去掉
22             parms = self.full_path.split('?')[1].split('&')
23             for p in parms: # 將GET參數添加到self.GET字典
24                 k, v = p.split('=')
25                 self.GET[k] = v

在類中,咱們實現解析http請求的headers、method、url和GET參數,其實還有不少事情沒有作,好比使用POST傳輸數據時,數據是在請求體中,針對這部份內容我並無開始寫,緣由在於本文主要目的仍是異步非阻塞框架,目前的功能已經足以支持咱們進行下一步實驗了。

3.2 HttpResponse

HTTP響應也能夠分爲響應頭和響應體,咱們能夠很簡單的實現一個response:

 1 class HttpResponse(object):
 2     def __init__(self, data: str):
 3         self.status_code = 200 # 默認響應狀態 200
 4         self.headers = 'HTTP/1.1 %s OK\r\n'
 5         self.headers += 'Server:AsyncWeb'
 6         self.headers += '\r\n\r\n'
 7         self.data = data
 8 
 9     @property
10     def content(self):
11         return bytes((self.headers + self.data) % self.status_code, encoding='utf8')

HttpResponse中並無作太多的事情,接受一個字符串,並使用content返回一個知足HTTP響應格式的bytes。

從用戶調用角度,可使用return HttpResponse("歡迎來到AsynicWeb")來返回數據。

咱們也能夠簡單的定義一個404頁面:

Http404 = HttpResponse('<html><h1>404</h1></html>')
Http404.status_code = 404

4、路由映射

路由映射簡單理解就是從一個URL地址找到對應的邏輯函數。舉個例子,咱們訪問http://127.0.0.1:8000這個頁面,在http請求中它的url是"/",在web服務器中有一個函數index,web服務器可以由url地址"/"找到函數index,這就是一個路由映射。

其實路由映射實現起來很是簡單。咱們只要定義一個映射列表,列表中的每一個元素包含url和邏輯處理(視圖函數)兩部分,當一個http請求到達的時候,遍歷映射列表,使用正則匹配每個url,若是請求的url和映射表中的相同,咱們就能夠取出對應的視圖函數。

路由映射表是徹底由用戶來定義映射關係的,它應該使用一個咱們定義的標準結構,好比:

routers = [
    ('/$', IndexView),
    ('/home', asy)
]

 

5、類視圖和函數視圖

視圖是指可以根據一個請求,執行某些邏輯運算,最終返回響應的模塊。說到這裏,一個web框架的運行流程就出來了:

    http請求——路由映射表——視圖——執行視圖獲取返回值——http響應

在咱們的框架中,借鑑Django的設計,咱們讓它支持類視圖(CBV)和函數視圖(FBV)兩種模式。

對於函數視圖,徹底由用戶本身定義,只要至少可以接受一個request參數便可

對於類視圖,咱們須要作一些預處理,確保用戶按咱們的規則來實現類視圖。

定義一個View類:

1 class View(object):
2     # CBV應繼承View類
3     def dispatch(self, request):
4         method = request.method.lower()
5         if hasattr(self, method):
6             return getattr(self, method)(request)
7         else:
8             return Http404

 在View類中,咱們只寫了一個dispatch方法,其實就作了一件事:反射。當咱們在路由映射表中找對應的視圖時,若是判斷視圖屬於類,咱們就調用dispatch方法。

從用戶角度來看,實現一個CBV只須要繼承View類,而後經過定義get、post、delete等方法來實現不一樣的處理。

6、socket和多路複用

上面幾個小節實現了web框架的大致執行路徑,從這節開始咱們實現web服務器的核心。

經過IO多路複用能夠達到單線程實現高併發的效果,一個標準的IO多路複用寫法:

 1 server = socket(AF_INET, SOCK_STREAM)
 2 server.bind(("127.0.0.1", 8000))
 3 server.setblocking(False) # 設置非阻塞
 4 server.listen(128)
 5 Future_Task_Wait = {}
 6 rlist = [server, ]
 7 while True:
 8     r, w, x = select.select(rlist, [], [], 0.1)
 9     for o in r:
10         if o == server:
11             '''判斷o是server仍是conn'''
12             conn, addr = o.accept()
13             conn.setblocking(False) # 設置非阻塞
14             rlist.append(conn) # 客戶鏈接 加入輪詢列表
15         else:
16             data = b""
17             while True: # 接收客戶傳輸數據
18                 try:
19                     chunk = o.recv(1024)
20                     data = data + chunk
21                 except Exception as e:
22                     chunk = None
23                 if not chunk:
24                     break
25             dosomething(o, data, routers) # 拿到數據乾點啥

 經過這段代碼咱們能夠得到全部的請求了,下一步就是處理這些請求。

咱們就定義一個dosomething函數

 1 import re
 2 import time
 3 from types import FunctionType
 4 
 5 def dosomething(o, data, routers):
 6     '''解析http請求,尋找映射函數並執行獲得結果
7 :param o: socket鏈接對象 8 :param data: socket接收數據 9 :return: 響應結果 10 ''' 11 request = HttpRequest(data) 12 print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0], 13 request.method, request.url) 14 flag = False 15 for router in routers: 16 if re.match(router[0], request.url): 17 target = router[1] 18 flag = True 19 break 20 if flag: 21 # 判斷targe是函數仍是類 22 if isinstance(target, FunctionType): 23 result = target(request) 24 elif issubclass(target, View): 25 result = target().dispatch(request) 26 else: 27 result = Http404 28 else: 29 result = Http404 30 return result

這段代碼作了這麼幾件事。1.實例化HttpRequest;2.使用正則遍歷路由映射表;3.將request傳入視圖函數或類視圖的dispatch方法;4.拿到result結果

咱們經過result = dosomething(o, data, routers)能夠拿到結果,接下來咱們只須要把結果發回給客戶端並斷開鏈接就能夠了

o.sendall(result.content)  # 因爲result是一個HttpResponse對象 咱們使用content屬性
rlist.remove(o) # 從輪詢中刪除鏈接
o.close() # 關閉鏈接

至此,咱們的web框架已經搭建好了。

但它仍是一個同步的框架,在咱們的服務端中,其實一直經過while循環在監聽select是否變化,假如咱們在視圖函數中添加IO操做,其餘鏈接依然會阻塞等待,接下來讓咱們的框架實現對協程的支持。

7、協程支持

在實現協程以前,咱們先聊聊Tornado的Future對象。能夠說Tornado異步非阻塞的實現核心就是Future。

Future對象內部維護了一個重要屬性_result,這是一個標記位,一個剛實例化的Future內部的_result=None,咱們能夠經過其餘操做來更改_result的狀態。另外一方面,咱們能夠一直監聽每一個Future對象的_result狀態,若是發生變化就執行某些特定的操做。

咱們在第六節定義的dosomething函數中拿到了一個result,它應當是一個HttpResponse對象,那麼能不能返回一個Future對象呢。

假如result是一個Future對象,咱們的服務端不立馬返回結果,而是把Future放進另外一個輪詢列表中,當Future內的_result改變時再返回結果,就達到了異步的效果。

咱們也能夠定義一個Future類,這個類維護只一個變量result:

1 class Future(object):
2     def __init__(self):
3         self.result = None

 對於框架使用者來講,在視圖函數要麼返回一個HttpResponse對象表明當即返回,要麼返回一個Future對象說你先別管我,我把事情幹完了再通知你返回結果。

既然視圖函數返回的可能不僅是HttpResponse對象,那麼咱們就須要對第六步的代碼增長額外的處理:

Future_Task_Wait = {} # 定義一個異步Future字典
result = dosomething() # 拿到結果後執行下面判斷
if isinstance(result, Future):
    Future_Task_Wait[o] = result # Futre對象則加入字典
else:
    o.sendall(result.content) # 非Future對象直接返回結果並斷開鏈接
    rlist.remove(o)
    o.close()

在while True輪詢內再增長一段代碼,遍歷Future_Task_Wait字典:

rm_conn = [] # 須要移除列表的conn
for conn, future in Future_Task_Wait.items():
    if future.result:
        try:
            conn.sendall(HttpResponse(data=future.result).content) # 返回result
        finally:
            rlist.remove(conn) 
            conn.close()
            rm_conn.append(conn)
for conn in rm_conn: # 在字典中刪除conn
    del Future_Task_Wait[conn]

 這樣,咱們就能夠返回一個Future來告訴服務器這是未來才返回的對象。

那回歸正題,咱們到底該如何使用協程?這裏我用的方法是建立一個子線程來執行協程事件循環,主線程永遠在監聽socket。

from threading import Thread
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
coroutine_loop = asyncio.new_event_loop()  # 建立協程事件循環
run_loop_thread = Thread(target=start_loop, args=(coroutine_loop,))  # 新起線程運行事件循環, 防止阻塞主線程
run_loop_thread.start()  # 運行線程,即運行協程事件循環

當咱們要把asyncdo方法添加做爲協程任務時

asyncio.run_coroutine_threadsafe(asyncdo(), coroutine_loop)

好了,異步非阻塞的核心代碼分析的差很少了,將六七節的代碼整合寫成一個類

  1 import re
  2 import time
  3 import select
  4 import asyncio
  5 from socket import *
  6 from threading import Thread
  7 from types import FunctionType
  8 from http.response import Http404, HttpResponse
  9 from http.request import HttpRequest
 10 from views import View
 11 from core.future import Future
 12 
 13 class App(object):
 14     # web應用程序
 15     coroutine_loop = None
 16 
 17     def __new__(cls, *args, **kwargs):
 18         # 使用單例模式
 19         if not hasattr(cls, '_instance'):
 20             App._instance = super().__new__(cls)
 21         return App._instance
 22 
 23     def listen(self, host, port, routers):
 24         # IO多路複用監聽鏈接
 25         server = socket(AF_INET, SOCK_STREAM)
 26         server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
 27         server.bind((host, port))
 28         server.setblocking(False)
 29         server.listen(128)
 30         Future_Task_Wait = {}
 31         rlist = [server, ]
 32         while True:
 33             r, w, x = select.select(rlist, [], [], 0.01)
 34             for o in r:
 35                 if o == server:
 36                     '''判斷o是server仍是conn'''
 37                     conn, addr = o.accept()
 38                     conn.setblocking(False)
 39                     rlist.append(conn)
 40                 else:
 41                     data = b""
 42                     while True:
 43                         try:
 44                             chunk = o.recv(1024)
 45                             data = data + chunk
 46                         except Exception as e:
 47                             chunk = None
 48                         if not chunk:
 49                             break
 50                     try:
 51                         request = HttpRequest(data, o)
 52                         print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0],
 53                               request.method, request.url)
 54                         flag = False
 55                         for router in routers:
 56                             if re.match(router[0], request.url):
 57                                 target = router[1]
 58                                 flag = True
 59                                 break
 60                         if flag:
 61                             # 判斷targe是函數仍是類
 62                             if isinstance(target, FunctionType):
 63                                 result = target(request)
 64                             elif issubclass(target, View):
 65                                 result = target().dispatch(request)
 66                             else:
 67                                 result = Http404
 68                         else:
 69                             result = Http404
 70                         # 判斷result是否是future
 71                         if isinstance(result, Future):
 72                             Future_Task_Wait[o] = result
 73                         else:
 74                             o.sendall(result.content)
 75                             rlist.remove(o)
 76                             o.close()
 77                     except Exception as e:
 78                         print(e)
 79             rm_conn = []
 80             for conn, future in Future_Task_Wait.items():
 81                 if future.result:
 82                     try:
 83                         conn.sendall(HttpResponse(data=future.result).content)
 84                     finally:
 85                         rlist.remove(conn)
 86                         conn.close()
 87                         rm_conn.append(conn)
 88             for conn in rm_conn:
 89                 del Future_Task_Wait[conn]
 90 
 91     def run(self, host='127.0.0.1', port=8000, routers=()):
 92         # 主線程select多路複用,處理http請求和響應
 93         # 給協程單首創建一個子線程,負責處理View函數提交的協程
 94         def start_loop(loop):
 95             asyncio.set_event_loop(loop)
 96             loop.run_forever()
 97         self.coroutine_loop = asyncio.new_event_loop()  # 建立協程事件循環
 98         run_loop_thread = Thread(target=start_loop, args=(self.coroutine_loop,))  # 新起線程運行事件循環, 防止阻塞主線程
 99         run_loop_thread.start()  # 運行線程,即運行協程事件循環
100         self.listen(host, port, routers)

8、框架測試

如今,能夠測試咱們的web框架了。

 1 import asyncio
 2 from core.server import App
 3 from views import View
 4 from http.response import *
 5 from core.future import Future
 6 
 7 
 8 class IndexView(View):
 9     def get(self, request):
10         return HttpResponse('歡迎來到首頁')
11 
12     def post(self, request):
13         return HttpResponse('post')
14 
15 def asy(request):
16     future = Future()
17     print('異步調用')
18     wait = request.url.split('/')[-1]
19     try:
20         wait = int(wait)
21     except:
22         wait = 5
23     asyncio.run_coroutine_threadsafe(dosomething(future, wait), app.coroutine_loop)
24     print('返回Future')
25     return future
26 
27 async def dosomething(future, wait):
28     # 異步函數
29     await asyncio.sleep(wait)# 模擬異步操做
30     future.result = '等待了%s秒' % wait
31 
32 routers = [
33     ('/$', IndexView),
34     ('/home', asy)
35 ]
36 
37 # 從用戶角度只需使用run()
38 app = App()
39 app.run('127.0.0.1', 8080, routers=routers)

瀏覽器訪問http://127.0.0.1:8080,返回沒有問題,若是有同窗使用Chrome可能會亂碼,那是由於咱們的HttpResponse沒有返回指定編碼,添加一個響應頭便可。

瀏覽器訪問http://127.0.0.1:8080/home,這時候會執行協程,默認等待5s後返回結果,你能夠在多個標籤頁訪問這個地址,經過等待時間來驗證咱們的異步框架是否正常工做。

9、其餘

至此,咱們要實現的異步非阻塞web框架已經完成了。固然這個框架說到底仍是太簡陋,後續徹底能夠優化HttpRequest和HttpResponse、增長對數據庫、模板語言等等組件的擴展。

完整源碼已經上傳至https://github.com/sswest/AsyncWeb

相關文章
相關標籤/搜索