深刻理解 tornado 之 底層 ioloop 實現(二)

ioloop 分析

首先要看的是關於 epoll 操做的方法,還記得前文說過的 epoll 只須要四個 api 就能徹底操做嘛? 咱們來看 PollIOLoop 的實現:linux

epoll 操做

 1     def add_handler(self, fd, handler, events):
 2         fd, obj = self.split_fd(fd)
 3         self._handlers[fd] = (obj, stack_context.wrap(handler))
 4         self._impl.register(fd, events | self.ERROR)
 5     def update_handler(self, fd, events):
 6         fd, obj = self.split_fd(fd)
 7         self._impl.modify(fd, events | self.ERROR)
 8     def remove_handler(self, fd):
 9         fd, obj = self.split_fd(fd)
10         self._handlers.pop(fd, None)
11         self._events.pop(fd, None)
12         try:
13             self._impl.unregister(fd)
14             except Exception:
15                 gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

epoll_ctl:這個三個方法分別對應 epoll_ctl 中的 add 、 modify 、 del 參數。 因此這三個方法實現了 epoll 的 epoll_ctl 。web

epoll_create:而後 epoll 的生成在前文 EPollIOLoop 的初始化中就已經完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)。 這個至關於 epoll_create 。api

epoll_wait:epoll_wait 操做則在 start() 中:event_pairs = self._impl.poll(poll_timeout)服務器

epoll_close:而 epoll 的 close 則在 PollIOLoop 中的 close 方法內調用: self._impl.close() 完成。app

initialize

接下來看 PollIOLoop 的初始化方法中做了什麼:ide

 1     def initialize(self, impl, time_func=None, **kwargs):
 2             super(PollIOLoop, self).initialize(**kwargs)
 3             self._impl = impl                         # 指定 epoll
 4             if hasattr(self._impl, 'fileno'):
 5                 set_close_exec(self._impl.fileno())   # fork 後關閉無用文件描述符
 6             self.time_func = time_func or time.time   # 指定獲取當前時間的函數
 7             self._handlers = {}                       # handler 的字典,儲存被 epoll 監聽的 handler,與打開它的文件描述符 ( file descriptor 簡稱 fd ) 一一對應
 8             self._events = {}                         # event 的字典,儲存 epoll 返回的活躍的 fd event pairs
 9             self._callbacks = []                      # 儲存各個 fd 回調函數的列表
10             self._callback_lock = threading.Lock()    # 指定進程鎖
11             self._timeouts = []                       # 將是一個最小堆結構,按照超時時間從小到大排列的 fd 的任務堆( 一般這個任務都會包含一個 callback )
12             self._cancellations = 0                   # 關於 timeout 的計數器
13             self._running = False                     # ioloop 是否在運行
14             self._stopped = False                     # ioloop 是否中止
15             self._closing = False                     # ioloop 是否關閉
16             self._thread_ident = None                 #  當前線程堆標識符 ( thread identify )
17             self._blocking_signal_threshold = None    # 系統信號, 主要用來在 epoll_wait 時判斷是否會有 signal alarm 打斷 epoll
18             self._timeout_counter = itertools.count() # 超時計數器 ( 暫時不是很明白具體做用,好像和前面的 _cancellations 有關係? 請大神講講)
19             self._waker = Waker()                     # 一個 waker 類,主要是對於管道 pipe 的操做,由於 ioloop 屬於底層的數據操做,這裏 epoll 監聽的是 pipe
20             self.add_handler(self._waker.fileno(),
21                              lambda fd, events: self._waker.consume(),
22                              self.READ)               # 將管道加入 epoll 監聽,對於 web server 初始化時只須要關心 READ 事件

除了註釋中的解釋,還有幾點補充:函數

  1. close_exec 的做用: 子進程在fork出來的時候,使用了寫時複製(COW,Copy-On-Write)方式得到父進程的數據空間、 堆和棧副本,這其中也包括文件描述符。剛剛fork成功時,父子進程中相同的文件描述符指向系統文件表中的同一項,接着,通常咱們會調用exec執行另外一個程序,此時會用全新的程序替換子進程的正文,數據,堆和棧等。此時保存文件描述符的變量固然也不存在了,咱們就沒法關閉無用的文件描述符了。因此一般咱們會fork子進程後在子進程中直接執行close關掉無用的文件描述符,而後再執行exec。 因此 close_exec 執行的其實就是 關閉 + 執行的做用。 詳情能夠查看: 關於linux進程間的close-on-exec機制

  2.Waker(): Waker 封裝了對於管道 pipe 的操做:oop

 1      def set_close_exec(fd):
 2          flags = fcntl.fcntl(fd, fcntl.F_GETFD)
 3          fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
 4      def _set_nonblocking(fd):
 5          flags = fcntl.fcntl(fd, fcntl.F_GETFL)
 6          fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
 7      class Waker(interface.Waker):
 8          def __init__(self):
 9              r, w = os.pipe()
10              _set_nonblocking(r)
11              _set_nonblocking(w)
12              set_close_exec(r)
13              set_close_exec(w)
14              self.reader = os.fdopen(r, "rb", 0)
15              self.writer = os.fdopen(w, "wb", 0)
16          def fileno(self):
17              return self.reader.fileno()
18          def write_fileno(self):
19              return self.writer.fileno()
20          def wake(self):
21              try:
22                  self.writer.write(b"x")
23              except IOError:
24                  pass
25          def consume(self):
26              try:
27                  while True:
28                      result = self.reader.read()
29                      if not result:
30                          break
31              except IOError:
32                  pass
33          def close(self):
34              self.reader.close()
35              self.writer.close()
36      ```

能夠看到 waker 把 pipe 分爲讀、 寫兩個管道並都設置了非阻塞和 close_exec。 注意wake(self)方法中:self.writer.write(b"x") 直接向管道中寫入隨意字符從而釋放管道。spa

start

ioloop 最核心的部分:.net

 1     def start(self):
 2             if self._running:       # 判斷是否已經運行
 3                 raise RuntimeError("IOLoop is already running")
 4             self._setup_logging()
 5             if self._stopped:
 6                 self._stopped = False  # 設置中止爲假
 7                 return
 8             old_current = getattr(IOLoop._current, "instance", None)
 9             IOLoop._current.instance = self
10             self._thread_ident = thread.get_ident()  # 得到當前線程標識符
11             self._running = True # 設置運行
12             old_wakeup_fd = None
13             if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
14                 try:
15                     old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
16                     if old_wakeup_fd != -1:
17                         signal.set_wakeup_fd(old_wakeup_fd)
18                         old_wakeup_fd = None
19                 except ValueError:
20                     old_wakeup_fd = None
21             try:
22                 while True:  # 服務器進程正式開始,相似於其餘服務器的 serve_forever
23                     with self._callback_lock: # 加鎖,_callbacks 作爲臨界區不加鎖進行讀寫會產生髒數據
24                         callbacks = self._callbacks # 讀取 _callbacks
25                         self._callbacks = []. # 清空 _callbacks
26                     due_timeouts = [] # 用於存放這個週期內已過時( 已超時 )的任務
27                     if self._timeouts: # 判斷 _timeouts 裏是否有數據
28                         now = self.time() # 獲取當前時間,用來判斷 _timeouts 裏的任務有沒有超時
29                         while self._timeouts: # _timeouts 有數據時一直循環, _timeouts 是個最小堆,第一個數據永遠是最小的, 這裏第一個數據永遠是最接近超時或已超時的
30                             if self._timeouts[0].callback is None: # 超時任務無回調
31                                 heapq.heappop(self._timeouts) # 直接彈出
32                                 self._cancellations -= 1 # 超時計數器 -1
33                             elif self._timeouts[0].deadline <= now: # 判斷最小的數據是否超時
34                                 due_timeouts.append(heapq.heappop(self._timeouts)) # 超時就加到已超時列表裏。
35                             else:
36                                 break # 由於最小堆,若是沒超時就直接退出循環( 後面的數據一定未超時 )
37                         if (self._cancellations > 512
38                                 and self._cancellations > (len(self._timeouts) >> 1)):  # 當超時計數器大於 512 而且 大於 _timeouts 長度一半( >> 爲右移運算, 至關於十進制數據被除 2 )時,清零計數器,並剔除 _timeouts 中無 callbacks 的任務
39                             self._cancellations = 0
40                             self._timeouts = [x for x in self._timeouts
41                                               if x.callback is not None]
42                             heapq.heapify(self._timeouts) # 進行 _timeouts 最小堆化
43                     for callback in callbacks:
44                         self._run_callback(callback) # 運行 callbacks 裏全部的 calllback
45                     for timeout in due_timeouts:
46                         if timeout.callback is not None:
47                             self._run_callback(timeout.callback) # 運行全部已過時任務的 callback
48                     callbacks = callback = due_timeouts = timeout = None # 釋放內存
49                     if self._callbacks: # _callbacks 裏有數據時
50                         poll_timeout = 0.0 # 設置 epoll_wait 時間爲0( 當即返回 )
51                     elif self._timeouts: # _timeouts 裏有數據時
52                         poll_timeout = self._timeouts[0].deadline - self.time() 
53                         # 取最小過時時間當 epoll_wait 等待時間,這樣當第一個任務過時時當即返回
54                         poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
55                         # 若是最小過時時間大於默認等待時間 _POLL_TIMEOUT = 3600,則用 3600,若是最小過時時間小於0 就設置爲0 當即返回。
56                     else:
57                         poll_timeout = _POLL_TIMEOUT # 默認 3600 s 等待時間
58                     if not self._running: # 檢查是否有系統信號中斷運行,有則中斷,無則繼續
59                         break
60                     if self._blocking_signal_threshold is not None:
61                         signal.setitimer(signal.ITIMER_REAL, 0, 0) # 開始 epoll_wait 以前確保 signal alarm 都被清空( 這樣在 epoll_wait 過程當中不會被 signal alarm 打斷 )
62                     try:
63                         event_pairs = self._impl.poll(poll_timeout) # 獲取返回的活躍事件隊
64                     except Exception as e:
65                         if errno_from_exception(e) == errno.EINTR:
66                             continue
67                         else:
68                             raise
69                     if self._blocking_signal_threshold is not None:
70                         signal.setitimer(signal.ITIMER_REAL,
71                                          self._blocking_signal_threshold, 0) #  epoll_wait 結束, 再設置 signal alarm
72                     self._events.update(event_pairs) # 將活躍事件加入 _events
73                     while self._events:
74                         fd, events = self._events.popitem() # 循環彈出事件
75                         try:
76                             fd_obj, handler_func = self._handlers[fd] # 處理事件
77                             handler_func(fd_obj, events)
78                         except (OSError, IOError) as e:
79                             if errno_from_exception(e) == errno.EPIPE:
80                                 pass
81                             else:
82                                 self.handle_callback_exception(self._handlers.get(fd))
83                         except Exception:
84                             self.handle_callback_exception(self._handlers.get(fd))
85                     fd_obj = handler_func = None
86             finally:
87                 self._stopped = False # 確保發生異常也繼續運行
88                 if self._blocking_signal_threshold is not None:
89                     signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm
90                 IOLoop._current.instance = old_current 
91                 if old_wakeup_fd is not None:
92                     signal.set_wakeup_fd(old_wakeup_fd)   # 和 start 開頭部分對應,可是不是很清楚做用,求老司機帶帶路

最後來看 stop:

stop

1     def stop(self):
2         self._running = False
3         self._stopped = True
4         self._waker.wake()

這個很簡單,設置判斷條件,而後調用 self._waker.wake() 向 pipe 寫入隨意字符喚醒 ioloop 事件循環(感謝 mlcyng 指正這裏的錯誤)。 over!

總結

噗,寫了這麼長,終於寫完了。 通過分析,咱們能夠看到, ioloop 其實是對 epoll 的封裝,並加入了一些對上層事件的處理和 server 相關的底層處理。

最後,感謝你們任勞任怨看到這,文中理解有誤的地方還請多多指教!

相關文章
相關標籤/搜索