首先要看的是關於 epoll 操做的方法,還記得前文說過的 epoll 只須要四個 api 就能徹底操做嘛? 咱們來看 PollIOLoop 的實現:linux
1 def add_handler(self, fd, handler, events): 2 fd, obj = self.split_fd(fd) 3 self._handlers[fd] = (obj, stack_context.wrap(handler)) 4 self._impl.register(fd, events | self.ERROR) 5 def update_handler(self, fd, events): 6 fd, obj = self.split_fd(fd) 7 self._impl.modify(fd, events | self.ERROR) 8 def remove_handler(self, fd): 9 fd, obj = self.split_fd(fd) 10 self._handlers.pop(fd, None) 11 self._events.pop(fd, None) 12 try: 13 self._impl.unregister(fd) 14 except Exception: 15 gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
epoll_ctl:這個三個方法分別對應 epoll_ctl 中的 add 、 modify 、 del 參數。 因此這三個方法實現了 epoll 的 epoll_ctl 。web
epoll_create:而後 epoll 的生成在前文 EPollIOLoop 的初始化中就已經完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
。 這個至關於 epoll_create 。api
epoll_wait:epoll_wait 操做則在 start()
中:event_pairs = self._impl.poll(poll_timeout)
服務器
epoll_close:而 epoll 的 close 則在 PollIOLoop 中的 close
方法內調用: self._impl.close()
完成。app
接下來看 PollIOLoop 的初始化方法中做了什麼:ide
1 def initialize(self, impl, time_func=None, **kwargs): 2 super(PollIOLoop, self).initialize(**kwargs) 3 self._impl = impl # 指定 epoll 4 if hasattr(self._impl, 'fileno'): 5 set_close_exec(self._impl.fileno()) # fork 後關閉無用文件描述符 6 self.time_func = time_func or time.time # 指定獲取當前時間的函數 7 self._handlers = {} # handler 的字典,儲存被 epoll 監聽的 handler,與打開它的文件描述符 ( file descriptor 簡稱 fd ) 一一對應 8 self._events = {} # event 的字典,儲存 epoll 返回的活躍的 fd event pairs 9 self._callbacks = [] # 儲存各個 fd 回調函數的列表 10 self._callback_lock = threading.Lock() # 指定進程鎖 11 self._timeouts = [] # 將是一個最小堆結構,按照超時時間從小到大排列的 fd 的任務堆( 一般這個任務都會包含一個 callback ) 12 self._cancellations = 0 # 關於 timeout 的計數器 13 self._running = False # ioloop 是否在運行 14 self._stopped = False # ioloop 是否中止 15 self._closing = False # ioloop 是否關閉 16 self._thread_ident = None # 當前線程堆標識符 ( thread identify ) 17 self._blocking_signal_threshold = None # 系統信號, 主要用來在 epoll_wait 時判斷是否會有 signal alarm 打斷 epoll 18 self._timeout_counter = itertools.count() # 超時計數器 ( 暫時不是很明白具體做用,好像和前面的 _cancellations 有關係? 請大神講講) 19 self._waker = Waker() # 一個 waker 類,主要是對於管道 pipe 的操做,由於 ioloop 屬於底層的數據操做,這裏 epoll 監聽的是 pipe 20 self.add_handler(self._waker.fileno(), 21 lambda fd, events: self._waker.consume(), 22 self.READ) # 將管道加入 epoll 監聽,對於 web server 初始化時只須要關心 READ 事件
除了註釋中的解釋,還有幾點補充:函數
2.Waker(): Waker 封裝了對於管道 pipe 的操做:oop
1 def set_close_exec(fd): 2 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 3 fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) 4 def _set_nonblocking(fd): 5 flags = fcntl.fcntl(fd, fcntl.F_GETFL) 6 fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) 7 class Waker(interface.Waker): 8 def __init__(self): 9 r, w = os.pipe() 10 _set_nonblocking(r) 11 _set_nonblocking(w) 12 set_close_exec(r) 13 set_close_exec(w) 14 self.reader = os.fdopen(r, "rb", 0) 15 self.writer = os.fdopen(w, "wb", 0) 16 def fileno(self): 17 return self.reader.fileno() 18 def write_fileno(self): 19 return self.writer.fileno() 20 def wake(self): 21 try: 22 self.writer.write(b"x") 23 except IOError: 24 pass 25 def consume(self): 26 try: 27 while True: 28 result = self.reader.read() 29 if not result: 30 break 31 except IOError: 32 pass 33 def close(self): 34 self.reader.close() 35 self.writer.close() 36 ```
能夠看到 waker 把 pipe 分爲讀、 寫兩個管道並都設置了非阻塞和 close_exec
。 注意wake(self)
方法中:self.writer.write(b"x")
直接向管道中寫入隨意字符從而釋放管道。spa
ioloop 最核心的部分:.net
1 def start(self): 2 if self._running: # 判斷是否已經運行 3 raise RuntimeError("IOLoop is already running") 4 self._setup_logging() 5 if self._stopped: 6 self._stopped = False # 設置中止爲假 7 return 8 old_current = getattr(IOLoop._current, "instance", None) 9 IOLoop._current.instance = self 10 self._thread_ident = thread.get_ident() # 得到當前線程標識符 11 self._running = True # 設置運行 12 old_wakeup_fd = None 13 if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': 14 try: 15 old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) 16 if old_wakeup_fd != -1: 17 signal.set_wakeup_fd(old_wakeup_fd) 18 old_wakeup_fd = None 19 except ValueError: 20 old_wakeup_fd = None 21 try: 22 while True: # 服務器進程正式開始,相似於其餘服務器的 serve_forever 23 with self._callback_lock: # 加鎖,_callbacks 作爲臨界區不加鎖進行讀寫會產生髒數據 24 callbacks = self._callbacks # 讀取 _callbacks 25 self._callbacks = []. # 清空 _callbacks 26 due_timeouts = [] # 用於存放這個週期內已過時( 已超時 )的任務 27 if self._timeouts: # 判斷 _timeouts 裏是否有數據 28 now = self.time() # 獲取當前時間,用來判斷 _timeouts 裏的任務有沒有超時 29 while self._timeouts: # _timeouts 有數據時一直循環, _timeouts 是個最小堆,第一個數據永遠是最小的, 這裏第一個數據永遠是最接近超時或已超時的 30 if self._timeouts[0].callback is None: # 超時任務無回調 31 heapq.heappop(self._timeouts) # 直接彈出 32 self._cancellations -= 1 # 超時計數器 -1 33 elif self._timeouts[0].deadline <= now: # 判斷最小的數據是否超時 34 due_timeouts.append(heapq.heappop(self._timeouts)) # 超時就加到已超時列表裏。 35 else: 36 break # 由於最小堆,若是沒超時就直接退出循環( 後面的數據一定未超時 ) 37 if (self._cancellations > 512 38 and self._cancellations > (len(self._timeouts) >> 1)): # 當超時計數器大於 512 而且 大於 _timeouts 長度一半( >> 爲右移運算, 至關於十進制數據被除 2 )時,清零計數器,並剔除 _timeouts 中無 callbacks 的任務 39 self._cancellations = 0 40 self._timeouts = [x for x in self._timeouts 41 if x.callback is not None] 42 heapq.heapify(self._timeouts) # 進行 _timeouts 最小堆化 43 for callback in callbacks: 44 self._run_callback(callback) # 運行 callbacks 裏全部的 calllback 45 for timeout in due_timeouts: 46 if timeout.callback is not None: 47 self._run_callback(timeout.callback) # 運行全部已過時任務的 callback 48 callbacks = callback = due_timeouts = timeout = None # 釋放內存 49 if self._callbacks: # _callbacks 裏有數據時 50 poll_timeout = 0.0 # 設置 epoll_wait 時間爲0( 當即返回 ) 51 elif self._timeouts: # _timeouts 裏有數據時 52 poll_timeout = self._timeouts[0].deadline - self.time() 53 # 取最小過時時間當 epoll_wait 等待時間,這樣當第一個任務過時時當即返回 54 poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) 55 # 若是最小過時時間大於默認等待時間 _POLL_TIMEOUT = 3600,則用 3600,若是最小過時時間小於0 就設置爲0 當即返回。 56 else: 57 poll_timeout = _POLL_TIMEOUT # 默認 3600 s 等待時間 58 if not self._running: # 檢查是否有系統信號中斷運行,有則中斷,無則繼續 59 break 60 if self._blocking_signal_threshold is not None: 61 signal.setitimer(signal.ITIMER_REAL, 0, 0) # 開始 epoll_wait 以前確保 signal alarm 都被清空( 這樣在 epoll_wait 過程當中不會被 signal alarm 打斷 ) 62 try: 63 event_pairs = self._impl.poll(poll_timeout) # 獲取返回的活躍事件隊 64 except Exception as e: 65 if errno_from_exception(e) == errno.EINTR: 66 continue 67 else: 68 raise 69 if self._blocking_signal_threshold is not None: 70 signal.setitimer(signal.ITIMER_REAL, 71 self._blocking_signal_threshold, 0) # epoll_wait 結束, 再設置 signal alarm 72 self._events.update(event_pairs) # 將活躍事件加入 _events 73 while self._events: 74 fd, events = self._events.popitem() # 循環彈出事件 75 try: 76 fd_obj, handler_func = self._handlers[fd] # 處理事件 77 handler_func(fd_obj, events) 78 except (OSError, IOError) as e: 79 if errno_from_exception(e) == errno.EPIPE: 80 pass 81 else: 82 self.handle_callback_exception(self._handlers.get(fd)) 83 except Exception: 84 self.handle_callback_exception(self._handlers.get(fd)) 85 fd_obj = handler_func = None 86 finally: 87 self._stopped = False # 確保發生異常也繼續運行 88 if self._blocking_signal_threshold is not None: 89 signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm 90 IOLoop._current.instance = old_current 91 if old_wakeup_fd is not None: 92 signal.set_wakeup_fd(old_wakeup_fd) # 和 start 開頭部分對應,可是不是很清楚做用,求老司機帶帶路
最後來看 stop
:
1 def stop(self): 2 self._running = False 3 self._stopped = True 4 self._waker.wake()
這個很簡單,設置判斷條件,而後調用 self._waker.wake()
向 pipe 寫入隨意字符喚醒 ioloop 事件循環(感謝 mlcyng 指正這裏的錯誤)。 over!
噗,寫了這麼長,終於寫完了。 通過分析,咱們能夠看到, ioloop 其實是對 epoll 的封裝,並加入了一些對上層事件的處理和 server 相關的底層處理。
最後,感謝你們任勞任怨看到這,文中理解有誤的地方還請多多指教!