IOStream對tornado的高效起了很大的做用,他封裝了socket的非阻塞IO的讀寫操做。大致上能夠這麼說,當鏈接創建後,服務端與客戶端的請求響應都是基於IOStream的,也就是說:IOStream是用來處理鏈接的。ios
接下來講一下有關接收請求的大致流程:緩存
當鏈接創建,服務器端會產生一個對應該鏈接的socket,同時將該socket封裝至IOStream實例中(這表明着IOStream的初始化)。服務器
咱們知道tornado是基於IO多路複用的(就拿epoll來講),此時將socket進行register,事件爲READABLE,這一步與IOStream沒有多大關係。 異步
當該socket事件發生時,也就是意味着有數據從鏈接發送到了系統緩衝區中,這時就須要將chunk讀入到咱們在內存中爲其開闢的_read_buffer中,在IOStream中使用deque做爲buffer。_read_buffer表示讀緩衝,固然也有_write_buffer,但無論是讀緩衝仍是寫緩衝本質上就是tornado進程開闢的一段用來存儲數據的內存。socket
而這些chunk通常都是客戶端發送的請求了,可是咱們還須要對這些chunk做進一步操做,好比這個chunk中可能包含了多個請求,如何把請求分離?(每一個請求首部的結束符是b'\r\n\r\n'),這裏就用到read_until來分離請求並設置callback了。同時會將被分離的請求數據從_read_buffer中移除。ide
而後就是將callback以及他的參數(被分離的請求數據)添加至IOLoop._callbacks中,等待下一次IOLoop的執行,屆時會迭代_callbacks並執行回調函數。函數
補充: tornado是水平觸發,因此假如讀完一次chunk後系統緩存區中依然還有數據,那麼下一次的epoll.poll()依然會返回該socket。tornado
在iostream中有一個類叫作:IOStream oop
有幾個較爲重要的屬性:3d
def __init__(): self.socket = socket # 封裝socket self.socket.setblocking(False) # 設置socket爲非阻塞 self.io_loop = io_loop or ioloop.IOLoop.current() self._read_buffer = deque() # 讀緩衝 self._write_buffer = deque() # 寫緩衝 self._read_callback = None # 讀到指定字節數據時,或是指定標誌字符串時,須要執行的回調函數 self._write_callback = None # 發送完_write_buffer的數據時,須要執行的回調函數
有幾個較爲重要的方法
class IOStream(object): def read_until(self, delimiter, callback): def read_bytes(self, num_bytes, callback, streaming_callback=None): def read_until_regex(self, regex, callback): def read_until_close(self, callback, streaming_callback=None): def write(self, data, callback=None):
以上全部的方法都須要一個可選的callback參數,若是該參數爲None則該方法會返回一個Future對象。
以上全部的讀方法本質上都是讀取該socket所發送來的數據,而後當讀到指定分隔符或者標記的時候,中止讀,而後將該分隔符以及其前面的數據做爲callback(若是沒有callback,則將數據設置爲Future對象的result)的參數,而後將callback添加至IOLoop._callbacks中。固然其中全部的"讀"操做是非阻塞的!
就拿最爲常見的read_until方法來講,下面是代碼簡化版:
def read_until(self, delimiter, callback=None, max_bytes=None): future = self._set_read_callback(callback) # 多是Future對象,也多是None self._read_delimiter = delimiter # 設置分隔符 self._read_max_bytes = max_bytes # 設置最大讀字節數 self._try_inline_read() return future
其中_set_read_callback會根據callback是否存在返回None或者Future對象(存在返回None,不然返回一個Future實例對象)
若是咱們
再來看_try_inline_read方法的簡化版:
def _try_inline_read(self): """ 嘗試從_read_buffer中讀取所需數據 """ # 查看是否咱們已經在以前的讀操做中獲得了數據 self._run_streaming_callback() # 字符流回調,通常是讀操做沒有完全讀夠而處於streaming狀態,通常默認是None,若是調用read_bytes和read_until_close並指定了streaming_callback參數就會形成這個回調 pos = self._find_read_pos() # 嘗試在_read_buffer中找到分隔符的位置。找到則返回分隔符末尾所處的位置,若是不能,則返回None。 if pos is not None: self._read_from_buffer(pos) return self._check_closed() # 檢查當前IOStream是否關閉 pos = self._read_to_buffer_loop() # 從系統緩衝中讀取一個chunk,檢查是否含有分隔符,沒有則繼續讀取一個chunk,合併兩個chunk,再次檢查是否函數分隔符…… 若是找到了分隔符,會返回分隔符末尾在_read_buffer中所處的位置 if pos is not None: # 若是找到了分隔符, self._read_from_buffer(pos) # 將所需的數據從_read_buffer中移除,並將其做爲callback的參數,而後將callback封裝後添加至IOLoop._callbacks中 return # 沒找到分隔符,要麼關閉IOStream,要麼爲該socket在IOLoop中註冊事件 if self.closed(): self._maybe_run_close_callback() else: self._add_io_state(ioloop.IOLoop.READ)
上面的代碼被我用空行分爲了三部分,每一部分順序的對應下面每一句話
分析該方法:
1 首先在_read_buffer第一項中找分隔符,找到了就將分隔符以及其前的數據從_read_buffer中移除並將其做爲參數傳入回調函數,沒找到就將第二項與第一項合併而後繼續找……;
2 若是在_read_buffer全部項中都沒找到的話就把系統緩存中的數據讀取至_read_buffer,而後合併再次查找,
3 若是把系統緩存中的數據都取完了都還沒找到,那麼就等待下一次該socket發生READ事件後再找,這時的找則就是:將系統緩存中的數據讀取到_read_buffer中而後找,也就是執行第2步。
來看一看這三部分分別調用了什麼方法:
第一部分中的_find_read_pos以及_read_from_buffer
前者主要是在_read_buffer中查找分隔符,並返回分隔符的位置,後者則是將分隔符以及分隔符前面的全部數據從_read_buffer中取出並將其做爲callback的參數,而後將callback封裝後添加至IOLoop._callbacks中
來看_find_read_pos方法的簡化版:
_find_read_pos
_read_from_buffer
_run_read_callback
這裏面還用到一個頗有意思的函數:_merge_prefix ,這個函數的做用就是將deque的首項調整爲指定大小
_merge_prefix
第二部分的_read_to_buffer_loop
_read_to_buffer_loop
第三部分_add_io_state,該函數和ioloop異步相關
_add_io_state