若是有誤,請大神指出啊!python
--git
以前寫過一篇 kqueue 實現文件操做監控,講了 Kqueue 在文件監控的應用,文章給出的例子只對於一個 test 文件進行監控。github
Kqueue 或者 Epoll 更多的是被使用在 Socket 通訊的場景中,因而我又寫了一個帶有 Socket 的版本。代碼以下:服務器
# coding=utf-8 import select from socket import socket from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from threading import Thread fd = open('test') s = socket(AF_INET, SOCK_STREAM) s.bind(("127.0.0.1", 3000)) s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) s.listen(1) kq = select.kqueue() flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE | select.KQ_EV_CLEAR fflags = select.KQ_NOTE_DELETE | select.KQ_NOTE_WRITE | select.KQ_NOTE_EXTEND \ | select.KQ_NOTE_RENAME # 監測文件事件,若是有新事件在這個 fd 上發生,則返回,監測事件類型由 fflags 規定 file_ev = select.kevent(fd.fileno(), filter=select.KQ_FILTER_VNODE, flags=flags, fflags=fflags) # 監測 Socket 事件,若是有新數據可讀則返回 socket_ev = select.kevent(s.fileno(), filter=select.KQ_FILTER_READ, flags=flags) # 監測多個對象就只需把不少 kevent 對象塞進 events 列表中,而後傳遞給 control 函數 events = [] events.append(file_ev) events.append(socket_ev) # 處理這個 socket 請求 def socket_handler(cl): while True: data = cl.recv(100) print data if not data: cl.close() print 'socket closed' break while True: revents = kq.control(events, 1, None) for e in revents: # 若是是 socket 觸發的事件 if e.ident == s.fileno(): print 'Event from socket' if e.filter & select.KQ_FILTER_READ: cl, _ = s.accept() # 若是直接調用 socket_handler 函數,那麼這個 eventloop 會被阻塞,因此此處使用線程 Thread(None, socket_handler, args=(cl,)).start() else: print e # 若是是文件觸發的事件 if e.ident == fd.fileno(): print 'Event from file' if e.fflags & select.KQ_NOTE_EXTEND: print 'extend' elif e.fflags & select.KQ_NOTE_WRITE: print 'write' elif e.fflags & select.KQ_NOTE_RENAME: print 'rename' elif e.fflags & select.KQ_NOTE_DELETE: print 'delete' else: print e
然而隨時時間的推移,我發現其實這是有大大的問題的,我自做聰明地爲每一個新連接產生一個線程應付 Client,然而若是當 Client 源源不斷地涌入時,線程數會超標,致使程序發送錯誤。就算咱們維護一個線程的列表,使列表的長度不大於某個標準,每次建立線程的損耗也是存在的。併發
首先咱們創建了一個 KqueueEventLoop 的循環類,代碼以下:app
# 部分代碼參考了 SS 源碼中的實現 class KqueueEventLoop(object): KQ_FILTER_READ = select.KQ_FILTER_READ def __init__(self): self._fd_map = {} self._handler_map = {} self._event_map = {} self.kq = select.kqueue() self.klist = [] self._stop = False # 啓動這個事件循環 def run(self): while not self._stop: events = self.poll() for e in events: self._fd_map[e.ident](self._handler_map[e.ident]) # 從事件池裏面取事件出來 def poll(self): events = self.kq.control(self.klist, 1, None) return events # 註冊事件 def add(self, f, mode, handler): fd = f.fileno() event = select.kevent(fd, filter=mode, flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE | select.KQ_EV_CLEAR) self._handler_map[fd] = f self._fd_map[fd] = handler self._event_map[fd] = event self.klist.append(event) # 刪除事件 def remove(self, f): fd = f.fileno() del self._handler_map[fd] del self._fd_map[fd] self.klist.remove(self._event_map[fd]) # 暫停事件循環 def stop(self): self._stop = True
若是是用一開始的代碼的想法,爲每一個 Client 生成一個線程,調用方法以下:socket
def test(): loop = KqueueEventLoop() s = socket(AF_INET, SOCK_STREAM) s.bind(("127.0.0.1", 3000)) s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) s.listen(5) def callback(f): Thread(None, handler, args=(f,)).start() def handler(f): print 'INFO: New connection established.' cl, _ = f.accept() while True: data = cl.recv(1024) if not data: print 'INFO: Connection dropped.' loop.remove(cl) cl.close() return print 'DATA: %s' % repr(data) loop.add(s, KqueueEventLoop.KQ_FILTER_READ, callback) loop.run() if __name__ == '__main__': test()
咱們模擬了一個 100 個客戶端,每隔 0.1 秒向服務器發 1,獲取到該服務端內存使用量爲 14MB。 以前提到這邊線程的產生會形成兩個嚴重的問題,因此咱們就不生成線程。那麼怎麼作到原來的 while 循環來接受這麼多客戶端發來的數據呢?答案是,不用 while 循環。ide
當事件池返回一個新鏈接創建請求時,接受並創建這個鏈接,再把這個鏈接扔回事件池中,代碼以下:函數
def test(): loop = KqueueEventLoop() s = socket(AF_INET, SOCK_STREAM) s.bind(("127.0.0.1", 3000)) s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) s.listen(5) def handler(f): print 'INFO: New connection established.' cl, _ = f.accept() cl.setblocking(False) loop.add(cl, KqueueEventLoop.KQ_FILTER_READ, read_data) def read_data(cl): try: data = cl.recv(1024) if not data: print 'INFO: Connection dropped.' loop.remove(cl) cl.close() return print 'DATA: %s' % repr(data) except Exception, e: print 'ERROR: %s' % repr(e) loop.remove(cl) cl.close() loop.add(s, KqueueEventLoop.KQ_FILTER_READ, handler) loop.run() if __name__ == '__main__': test()
咱們經過 loop.add(cl, KqueueEventLoop.KQ_FILTER_READ, read_data)
將每一個新鏈接 cl 加入到事件循環 loop 中,並讓事件循環類記錄回調方法 read_data。此時的情況就是,整個程序只有一個事件循環,每來一個新鏈接,經過 handler 回調創建該鏈接。再若是網卡接受到新數據,KqueueEventLoop 找到新數據對應的鏈接,把數據讀入內存並打印出來。oop
以上實現沒有了新建線程帶來的損耗,整個程序只保留了一個 while 循環,使得相同狀況下內存使用量僅爲 5MB。而且併發鏈接數量有很大提高,再也不受限於線程數限制。
在 IO 密集型程序中,其實線程的使用是能夠改善程序運行速度的。但線程仍是要用在可控的地方,好比用線程去跑事件循環。