深刻tornado中的IOStream

IOStream對tornado的高效起了很大的做用,他封裝了socket的非阻塞IO的讀寫操做。大致上能夠這麼說,當鏈接創建後,服務端與客戶端的請求響應都是基於IOStream的,也就是說:IOStream是用來處理鏈接的。ios

接下來講一下有關接收請求的大致流程:緩存

  當鏈接創建,服務器端會產生一個對應該鏈接的socket,同時將該socket封裝至IOStream實例中(這表明着IOStream的初始化)。服務器

  咱們知道tornado是基於IO多路複用的(就拿epoll來講),此時將socket進行register,事件爲READABLE,這一步與IOStream沒有多大關係。 異步

  當該socket事件發生時,也就是意味着有數據從鏈接發送到了系統緩衝區中,這時就須要將chunk讀入到咱們在內存中爲其開闢的_read_buffer中,在IOStream中使用deque做爲buffer。_read_buffer表示讀緩衝,固然也有_write_buffer,但無論是讀緩衝仍是寫緩衝本質上就是tornado進程開闢的一段用來存儲數據的內存。socket

  而這些chunk通常都是客戶端發送的請求了,可是咱們還須要對這些chunk做進一步操做,好比這個chunk中可能包含了多個請求,如何把請求分離?(每一個請求首部的結束符是b'\r\n\r\n'),這裏就用到read_until來分離請求並設置callback了。同時會將被分離的請求數據從_read_buffer中移除。ide

  而後就是將callback以及他的參數(被分離的請求數據)添加至IOLoop._callbacks中,等待下一次IOLoop的執行,屆時會迭代_callbacks並執行回調函數。函數

  

  補充: tornado是水平觸發,因此假如讀完一次chunk後系統緩存區中依然還有數據,那麼下一次的epoll.poll()依然會返回該socket。tornado

 

在iostream中有一個類叫作:IOStream  oop

有幾個較爲重要的屬性:3d

複製代碼

def __init__():
    self.socket = socket           # 封裝socket 
    self.socket.setblocking(False) # 設置socket爲非阻塞
    self.io_loop = io_loop or ioloop.IOLoop.current()    
    self._read_buffer = deque()    # 讀緩衝
    self._write_buffer = deque()   # 寫緩衝 
    self._read_callback = None     # 讀到指定字節數據時,或是指定標誌字符串時,須要執行的回調函數
    self._write_callback = None    # 發送完_write_buffer的數據時,須要執行的回調函數

複製代碼

有幾個較爲重要的方法

class IOStream(object):    def read_until(self, delimiter, callback): 
    def read_bytes(self, num_bytes, callback, streaming_callback=None): 
    def read_until_regex(self, regex, callback): 
    def read_until_close(self, callback, streaming_callback=None): 
    def write(self, data, callback=None):

以上全部的方法都須要一個可選的callback參數,若是該參數爲None則該方法會返回一個Future對象。

以上全部的讀方法本質上都是讀取該socket所發送來的數據,而後當讀到指定分隔符或者標記的時候,中止讀,而後將該分隔符以及其前面的數據做爲callback(若是沒有callback,則將數據設置爲Future對象的result)的參數,而後將callback添加至IOLoop._callbacks中。固然其中全部的"讀"操做是非阻塞的!
就拿最爲常見的read_until方法來講,下面是代碼簡化版:

    def read_until(self, delimiter, callback=None, max_bytes=None):
        future = self._set_read_callback(callback)     # 多是Future對象,也多是None
        self._read_delimiter = delimiter          # 設置分隔符
        self._read_max_bytes = max_bytes          # 設置最大讀字節數
        self._try_inline_read()        return future

其中_set_read_callback會根據callback是否存在返回None或者Future對象(存在返回None,不然返回一個Future實例對象)

若是咱們
再來看_try_inline_read方法的簡化版:

複製代碼

def _try_inline_read(self):        """
            嘗試從_read_buffer中讀取所需數據        """
        # 查看是否咱們已經在以前的讀操做中獲得了數據
        self._run_streaming_callback() # 字符流回調,通常是讀操做沒有完全讀夠而處於streaming狀態,通常默認是None,若是調用read_bytes和read_until_close並指定了streaming_callback參數就會形成這個回調
        pos = self._find_read_pos()       # 嘗試在_read_buffer中找到分隔符的位置。找到則返回分隔符末尾所處的位置,若是不能,則返回None。
        if pos is not None:
            self._read_from_buffer(pos)            return

        self._check_closed()           # 檢查當前IOStream是否關閉
        pos = self._read_to_buffer_loop()  # 從系統緩衝中讀取一個chunk,檢查是否含有分隔符,沒有則繼續讀取一個chunk,合併兩個chunk,再次檢查是否函數分隔符…… 若是找到了分隔符,會返回分隔符末尾在_read_buffer中所處的位置
        if pos is not None:                # 若是找到了分隔符,
            self._read_from_buffer(pos)    # 將所需的數據從_read_buffer中移除,並將其做爲callback的參數,而後將callback封裝後添加至IOLoop._callbacks中     
            return

        # 沒找到分隔符,要麼關閉IOStream,要麼爲該socket在IOLoop中註冊事件
        if self.closed():     
            self._maybe_run_close_callback()        else:
            self._add_io_state(ioloop.IOLoop.READ)

複製代碼

上面的代碼被我用空行分爲了三部分,每一部分順序的對應下面每一句話

分析該方法:

  1 首先在_read_buffer第一項中找分隔符,找到了就將分隔符以及其前的數據從_read_buffer中移除並將其做爲參數傳入回調函數,沒找到就將第二項與第一項合併而後繼續找……;

  2 若是在_read_buffer全部項中都沒找到的話就把系統緩存中的數據讀取至_read_buffer,而後合併再次查找,

  3 若是把系統緩存中的數據都取完了都還沒找到,那麼就等待下一次該socket發生READ事件後再找,這時的找則就是:將系統緩存中的數據讀取到_read_buffer中而後找,也就是執行第2步。

 來看一看這三部分分別調用了什麼方法:

第一部分中的_find_read_pos以及_read_from_buffer

前者主要是在_read_buffer中查找分隔符,並返回分隔符的位置,後者則是將分隔符以及分隔符前面的全部數據從_read_buffer中取出並將其做爲callback的參數,而後將callback封裝後添加至IOLoop._callbacks中

來看_find_read_pos方法的簡化版:

 _find_read_pos

 _read_from_buffer

 _run_read_callback

這裏面還用到一個頗有意思的函數:_merge_prefix ,這個函數的做用就是將deque的首項調整爲指定大小

 _merge_prefix

 

第二部分的_read_to_buffer_loop

 _read_to_buffer_loop

 

第三部分_add_io_state,該函數和ioloop異步相關

 _add_io_state

相關文章
相關標籤/搜索