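In the previous installment we got familiar with the use cases and the tests. In this installment we implement the `receive` function.

First, let's revisit the API: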
```python
class HTTP2Protocol:

    def receive(self, data: bytes):
        pass

    def send(self, stream: Stream):
        pass
```
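Our overall design follows two ideas: Event Driven + Mutable State.

Event Driven: we define a set of custom events internally, and the public API of `HTTP2Protocol` only ever returns these events.

Mutable State: `HTTP2Protocol` internally manages two buffers: an `inbound_buffer` that stores received data, and an `outbound_buffer` that stores data waiting to be sent (in the code below they are named `request_buffer` and `response_buffer`). Both buffers are private, and users should not touch them. Depending on the event, `HTTP2Protocol` appends data to a buffer or clears data from it.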
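To make the two ideas concrete, here is a minimal, self-contained sketch of the same pattern applied to a toy line-based protocol instead of HTTP/2. `LineProtocol` and `LineReceived` are hypothetical names invented for illustration: the class keeps a private inbound buffer (the mutable state), `receive` never touches a socket, and it only ever returns event objects.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class LineReceived:
    """Hypothetical event: one complete line has arrived."""
    line: bytes


class LineProtocol:
    """Toy sans-IO protocol: bytes in, events out, no socket access."""

    def __init__(self):
        self._inbound_buffer = b""  # private mutable state

    def receive(self, data: bytes) -> List[LineReceived]:
        self._inbound_buffer += data
        events = []
        # Emit one event per complete line; a partial tail stays buffered.
        while b"\n" in self._inbound_buffer:
            line, self._inbound_buffer = self._inbound_buffer.split(b"\n", 1)
            events.append(LineReceived(line))
        return events


proto = LineProtocol()
print(proto.receive(b"GET /a\nGET"))  # [LineReceived(line=b'GET /a')]
print(proto.receive(b" /b\n"))        # [LineReceived(line=b'GET /b')]
```

Because the bytes can arrive split at arbitrary points, the buffer is what lets the second call complete a line that the first call only saw half of.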
Now let's look at a more concrete signature:
```python
# http2protocol.py
import logging
from typing import List

import h2.config
import h2.connection
import h2.events
import h2.exceptions
from h2.events import (
    RequestReceived,
    DataReceived,
    WindowUpdated,
    StreamEnded
)

from gethy.event import H2Event, RequestEvent


class HTTP2Protocol:

    def __init__(self):
        self.current_events = []
        self.request_buffer = {}  # input buffer: {stream_id: Stream}
        self.response_buffer = {}  # output buffer, not used in this tutorial
        # server side; header names and values are decoded to str as UTF-8
        config = h2.config.H2Configuration(client_side=False, header_encoding='utf-8')
        self.http2_connection = h2.connection.H2Connection(config=config)

    def receive(self, data: bytes) -> List[H2Event]:
        pass
```
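- `current_events`: as the name suggests, holds the events known so far.
- `request_buffer`: holds Request streams that have not been fully received yet.
- `response_buffer`: holds Response streams that have not been fully sent yet.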
Of course, we also need a `Stream` class to represent a data stream.
```python
class Stream:

    def __init__(self, stream_id: int, headers):
        self.stream_id = stream_id
        self.headers = headers  # list of (name, value) tuples, as delivered by h2
        # Invariant:
        # while stream_ended is False, buffered_data is a list of received
        # chunks and data is None;
        # once stream_ended is True, buffered_data is None and data is bytes
        # (b'' if the request carried no body).
        #
        # should write a value enforcement contract decorator for it
        self.stream_ended = False
        self.buffered_data = []
        self.data = None
```
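The comment above mentions a contract check that was never written. As a rough sketch of what such a check could look like, `check_stream_invariant` below is a hypothetical helper (not part of gethy) that raises `ValueError` when a `Stream` breaks the invariant; a decorator could simply call it after each state change. The `Stream` class is repeated so the snippet runs on its own.

```python
class Stream:
    def __init__(self, stream_id: int, headers):
        self.stream_id = stream_id
        self.headers = headers
        self.stream_ended = False
        self.buffered_data = []
        self.data = None


def check_stream_invariant(stream: Stream) -> None:
    """Hypothetical contract check for the Stream state rules."""
    if stream.stream_ended:
        if stream.buffered_data is not None:
            raise ValueError("ended stream must not keep buffered_data")
        if not isinstance(stream.data, bytes):
            raise ValueError("ended stream must expose data as bytes")
    else:
        if not isinstance(stream.buffered_data, list):
            raise ValueError("open stream must buffer chunks in a list")
        if stream.data is not None:
            raise ValueError("open stream must not expose data yet")


s = Stream(1, [(":method", "GET")])
check_stream_invariant(s)  # open stream: passes
s.stream_ended = True      # marked ended, but buffers not finalized yet
try:
    check_stream_invariant(s)
except ValueError as exc:
    print(exc)  # ended stream must not keep buffered_data
```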
Before implementing the details, let's look at the overall flow:
```python
def receive(self, data: bytes) -> List[H2Event]:
    """
    Receive bytes and return the events that became ready.
    :param data: bytes, received from a socket
    :return: list of H2Event (e.g. RequestEvent); empty if no stream is complete yet
    """
    # First, process the incoming data:
    # handle every event emitted by h2
    events = self.http2_connection.receive_data(data)
    for event in events:
        self._handle_event(event)

    self._parse_request_buffer()

    events = self.current_events  # hand all current events to the caller
    self.current_events = []  # start over with a newly allocated list
    return events
```
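The last two lines of `receive` hand the accumulated list to the caller and give the protocol a fresh, empty list. A tiny sketch (the `EventSink` class is hypothetical) shows why rebinding the attribute differs from calling `.clear()`: with rebinding, the caller keeps a list that later calls cannot modify.

```python
class EventSink:
    """Hypothetical stand-in for the current_events handling in receive()."""

    def __init__(self):
        self.current_events = []

    def drain(self):
        events = self.current_events
        self.current_events = []  # rebind: the returned list now belongs to the caller
        return events

    def drain_with_clear(self):
        events = self.current_events
        self.current_events.clear()  # empties the very list being returned
        return events


sink = EventSink()
sink.current_events.append("request 1")
kept = sink.drain()
sink.current_events.append("request 2")
print(kept)  # ['request 1']

sink2 = EventSink()
sink2.current_events.append("request 1")
lost = sink2.drain_with_clear()
print(lost)  # []
```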
That completes `receive`. Next we implement `_handle_event` and `_parse_request_buffer`.
The event-handling part is made up of a few important functions.
```python
def _handle_event(self, event: h2.events.Event):
    # The name RequestReceived can be misleading.
    # It does not mean that a complete request has arrived;
    # it means that the request headers have arrived.
    if isinstance(event, h2.events.RequestReceived):
        self._request_headers_received(event)
    elif isinstance(event, h2.events.DataReceived):
        self._data_received(event)
    else:
        logging.info("No handler implemented for %s", type(event))
```
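First, `_handle_event` has to determine which kind of h2 event it received. We use `if`/`elif`/`else` to route each event to the corresponding function. This installment only cares about requests (headers and data); all other events are simply logged.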
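An `if`/`elif` chain is perfectly fine for two event types. As the number of handled events grows, a lookup table is a common alternative. The sketch below uses hypothetical stand-in event classes instead of importing h2, so it runs on its own; note that a dict keyed on `type(event)` does not match subclasses the way `isinstance` does.

```python
import logging
from dataclasses import dataclass


# Hypothetical stand-ins for h2.events.RequestReceived / DataReceived.
@dataclass
class RequestReceived:
    stream_id: int


@dataclass
class DataReceived:
    stream_id: int
    data: bytes


class Dispatcher:
    def __init__(self):
        self.handled = []  # records which handler ran, for demonstration
        self._handlers = {
            RequestReceived: self._request_headers_received,
            DataReceived: self._data_received,
        }

    def _handle_event(self, event):
        handler = self._handlers.get(type(event))  # exact type match only
        if handler is None:
            logging.info("No handler implemented for %s", type(event).__name__)
            return
        handler(event)

    def _request_headers_received(self, event):
        self.handled.append(("headers", event.stream_id))

    def _data_received(self, event):
        self.handled.append(("data", event.stream_id))


d = Dispatcher()
for ev in [RequestReceived(1), DataReceived(1, b"hi"), "unknown event"]:
    d._handle_event(ev)
print(d.handled)  # [('headers', 1), ('data', 1)]
```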
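Note: h2 events map closely onto HTTP/2 frames. A `RequestReceived` event essentially corresponds to the HEADERS frame (plus any CONTINUATION frames) that opens a new stream, and a `DataReceived` event corresponds to a DATA frame.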
References:

- Hyper-h2 API
- http2 Framing Layer
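To see the frame side of that correspondence, the sketch below decodes the fixed 9-byte header that precedes every HTTP/2 frame (RFC 7540, section 4.1): a 24-bit payload length, an 8-bit type, 8 bits of flags, and a reserved bit followed by a 31-bit stream identifier. h2 does this parsing for us; `parse_frame_header` and `FrameHeader` are hypothetical names used for illustration only.

```python
import struct
from typing import NamedTuple

# Frame type codes from RFC 7540, section 6 (only the two discussed here).
FRAME_TYPES = {0x0: "DATA", 0x1: "HEADERS"}
END_STREAM = 0x1  # flag defined for both DATA and HEADERS frames


class FrameHeader(NamedTuple):
    length: int
    frame_type: str
    flags: int
    stream_id: int


def parse_frame_header(raw: bytes) -> FrameHeader:
    if len(raw) < 9:
        raise ValueError("an HTTP/2 frame header is 9 bytes long")
    # 3-byte big-endian length: pad to 4 bytes and unpack as an unsigned int.
    (length,) = struct.unpack(">I", b"\x00" + raw[:3])
    type_code, flags, stream_word = struct.unpack(">BBI", raw[3:9])
    stream_id = stream_word & 0x7FFFFFFF  # drop the reserved high bit
    name = FRAME_TYPES.get(type_code, hex(type_code))
    return FrameHeader(length, name, flags, stream_id)


# A HEADERS frame on stream 1, 13-byte payload, flags END_HEADERS (0x4) | END_STREAM (0x1).
header = parse_frame_header(bytes([0, 0, 13, 0x1, 0x5, 0, 0, 0, 1]))
print(header)  # FrameHeader(length=13, frame_type='HEADERS', flags=5, stream_id=1)
print(bool(header.flags & END_STREAM))  # True: a headers-only request
```

The END_STREAM flag on a HEADERS frame is exactly the "request has headers only" case handled next.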
```python
def _request_headers_received(self, event: RequestReceived):
    self.request_buffer[event.stream_id] = Stream(event.stream_id, event.headers)
    if event.priority_updated:
        logging.warning("RequestReceived.priority_updated is not handled")
    if event.stream_ended:
        # event.stream_ended holds the associated StreamEnded event, or None
        self._stream_ended(event.stream_ended)
```
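This event carries `stream_id` and `headers`; we take them and construct a `Stream` instance. If the stream has ended, we call `_stream_ended`. Here a truthy `event.stream_ended` (h2 stores the associated `StreamEnded` event there, otherwise `None`) means the request consists of headers only. A typical `GET`, or a `POST` whose parameters are URL-encoded, falls into this category. Many frameworks do not even allow a `GET` to carry a request body.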
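In HTTP/2 the request line is carried as pseudo-headers such as `:method` and `:path`, so for a headers-only request the URL-encoded parameters live inside `:path`. A minimal sketch, assuming the headers arrive as a list of `(name, value)` string tuples (which is what h2 produces when `header_encoding='utf-8'` is set); `query_params` is a hypothetical helper.

```python
from urllib.parse import parse_qs, urlsplit


def query_params(headers):
    """Hypothetical helper: extract URL-encoded parameters from the :path pseudo-header."""
    path = dict(headers).get(":path", "")
    return parse_qs(urlsplit(path).query)


headers = [
    (":method", "GET"),
    (":scheme", "https"),
    (":authority", "example.com"),
    (":path", "/search?q=http2&page=2"),
]
print(query_params(headers))  # {'q': ['http2'], 'page': ['2']}
```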
```python
def _data_received(self, event: DataReceived):
    self.request_buffer[event.stream_id].buffered_data.append(event.data)
    if event.stream_ended:
        self._stream_ended(event.stream_ended)
```
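A request can also carry data, which triggers this event. Here `request_buffer[event.stream_id]` must never raise `KeyError`, because the headers are always received before the data. If a `KeyError` does occur, a bug is definitely lurking somewhere. A truthy `event.stream_ended` here means the request has been received completely.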
```python
def _stream_ended(self, event: StreamEnded):
    stream = self.request_buffer[event.stream_id]
    stream.stream_ended = True
    stream.data = b''.join(stream.buffered_data)
    stream.buffered_data = None
```
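Once a request stream has been received completely, we adjust the state of the `Stream` instance: mark it as ended, join the buffered chunks into `data`, and set `buffered_data` to `None`.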
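To trace these state changes end to end, the sketch below replays one POST-style request whose body is split across two DATA frames. The event classes are hypothetical stand-ins that mimic only the h2 attributes used above (`stream_id`, `headers`, `data`, `stream_ended`), so the snippet runs without h2 installed; as in h2, `stream_ended` holds a `StreamEnded` object or `None`.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class StreamEnded:  # stand-in for h2.events.StreamEnded
    stream_id: int


@dataclass
class RequestReceived:  # stand-in for h2.events.RequestReceived
    stream_id: int
    headers: List[Tuple[str, str]]
    stream_ended: Optional[StreamEnded] = None


@dataclass
class DataReceived:  # stand-in for h2.events.DataReceived
    stream_id: int
    data: bytes
    stream_ended: Optional[StreamEnded] = None


class Stream:
    def __init__(self, stream_id, headers):
        self.stream_id = stream_id
        self.headers = headers
        self.stream_ended = False
        self.buffered_data = []
        self.data = None


request_buffer = {}


def request_headers_received(event):
    request_buffer[event.stream_id] = Stream(event.stream_id, event.headers)
    if event.stream_ended:
        finish_stream(event.stream_ended)


def data_received(event):
    request_buffer[event.stream_id].buffered_data.append(event.data)
    if event.stream_ended:
        finish_stream(event.stream_ended)


def finish_stream(event):
    stream = request_buffer[event.stream_id]
    stream.stream_ended = True
    stream.data = b"".join(stream.buffered_data)  # chunks joined in arrival order
    stream.buffered_data = None


request_headers_received(RequestReceived(1, [(":method", "POST")]))
data_received(DataReceived(1, b"name="))
data_received(DataReceived(1, b"gethy", stream_ended=StreamEnded(1)))
s = request_buffer[1]
print(s.stream_ended, s.data, s.buffered_data)  # True b'name=gethy' None
```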
With that, all incoming data has been handled. The remaining task is to scan the buffer once and see whether anything is worth returning.
```python
def _parse_request_buffer(self):
    """ exercise all inbound streams """
    # This is a list of stream ids
    streams_to_delete_from_request_buffer = []
    # request_buffer is a dictionary with schema {stream_id: stream_obj}
    # therefore use .values()
    for stream in self.request_buffer.values():
        if stream.stream_ended:
            # create an HTTP Request event, add it to the current event list
            event = RequestEvent(stream)
            self.current_events.append(event)
            # Emitting an event means clearing the cached inbound data.
            # The caller has to handle all returned events; otherwise the request is lost.
            streams_to_delete_from_request_buffer.append(stream.stream_id)
    # clear the inbound cache (after the loop: a dict must not shrink while being iterated)
    for stream_id in streams_to_delete_from_request_buffer:
        del self.request_buffer[stream_id]
```
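The logic here is simple and clear: check whether there are complete requests; for each one, construct a `RequestEvent` and put it into `self.current_events`. Finally, delete the corresponding `Stream` from the buffer.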
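The two-pass shape of `_parse_request_buffer` (collect ids first, delete afterwards) is not just style: deleting keys from a dict while iterating over it raises `RuntimeError` in Python. The sketch below uses a plain `{stream_id: ended}` dict instead of `Stream` objects; `delete_while_iterating` and `partition` are hypothetical helpers showing the failure and a single-loop alternative that builds the remaining buffer instead of deleting from it.

```python
def delete_while_iterating(buffer):
    for stream_id, ended in buffer.items():
        if ended:
            del buffer[stream_id]  # mutates the dict during iteration


def partition(buffer):
    """Split {stream_id: ended} into (finished ids, remaining buffer) in one loop."""
    finished, remaining = [], {}
    for stream_id, ended in buffer.items():
        if ended:
            finished.append(stream_id)
        else:
            remaining[stream_id] = ended
    return finished, remaining


buffer = {1: True, 3: False, 5: True}
try:
    delete_while_iterating(dict(buffer))
except RuntimeError as exc:
    print("RuntimeError:", exc)  # RuntimeError: dictionary changed size during iteration

finished, remaining = partition(buffer)
print(finished, remaining)  # [1, 5] {3: False}
```

With `partition`, the protocol would replace its buffer wholesale (`self.request_buffer = remaining`) rather than deleting keys one by one.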
`RequestEvent` is defined as follows:
```python
# events.py
class H2Event:
    pass


class RequestEvent(H2Event):
    def __init__(self, stream):
        self.stream = stream
```
These classes exist purely for code readability.
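Careful readers may notice two things:

1. Everything this function does could already be done inside `_stream_ended`; there is no need to loop over the buffer a second time and waste time.
2. … `current_events`, rather than changing the object's value.

Both points are entirely correct. To keep things simple and easy to follow, I chose the more concise but slightly slower implementation here.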
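For readers who prefer the faster variant from the first point, here is a hedged sketch in which `_stream_ended` appends the `RequestEvent` itself and removes the stream from the buffer at once, so no second scan is needed. `StreamEnded`, `Stream`, and the trimmed-down `Protocol` class are stand-ins for illustration, not the gethy code.

```python
from dataclasses import dataclass


@dataclass
class StreamEnded:  # stand-in for h2.events.StreamEnded
    stream_id: int


class Stream:
    def __init__(self, stream_id, headers):
        self.stream_id = stream_id
        self.headers = headers
        self.stream_ended = False
        self.buffered_data = []
        self.data = None


class RequestEvent:
    def __init__(self, stream):
        self.stream = stream


class Protocol:
    """Trimmed-down stand-in for HTTP2Protocol, showing only the changed method."""

    def __init__(self):
        self.current_events = []
        self.request_buffer = {}

    def _stream_ended(self, event):
        # pop() removes the finished stream from the buffer and returns it in one step
        stream = self.request_buffer.pop(event.stream_id)
        stream.stream_ended = True
        stream.data = b"".join(stream.buffered_data)
        stream.buffered_data = None
        self.current_events.append(RequestEvent(stream))


p = Protocol()
p.request_buffer[1] = Stream(1, [(":method", "GET")])
p._stream_ended(StreamEnded(1))
print(len(p.current_events), p.request_buffer)  # 1 {}
```

With this variant `_parse_request_buffer` becomes unnecessary; the trade-off is that `_stream_ended` now mixes state bookkeeping with event emission.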
At this point you have implemented a fully working receive path for an HTTP/2 server. The next installment implements sending.
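The video complements this article; take a look if you're interested! The code is on GitHub; if you like it, give it a 🌟!

Bilibili
YouTube (I'll upload it there if you leave a comment)
Previous installment · Next installment (not written yet)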