Kqueue 實現非阻塞 Socket 通訊

若是有誤,請大神指出啊!python

--git

以前留下的坑

以前寫過一篇 kqueue 實現文件操做監控,講了 Kqueue 在文件監控的應用,文章給出的例子只對於一個 test 文件進行監控。github

Kqueue 或者 Epoll 更多的是被使用在 Socket 通訊的場景中,因而我又寫了一個帶有 Socket 的版本。代碼以下:服務器

# coding=utf-8
import select
from socket import socket
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from threading import Thread

fd = open('test')
s = socket(AF_INET, SOCK_STREAM)
s.bind(("127.0.0.1", 3000))
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.listen(1)
kq = select.kqueue()
flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE | select.KQ_EV_CLEAR
fflags = select.KQ_NOTE_DELETE | select.KQ_NOTE_WRITE | select.KQ_NOTE_EXTEND \
         | select.KQ_NOTE_RENAME

# 監測文件事件,若是有新事件在這個 fd 上發生,則返回,監測事件類型由 fflags 規定
file_ev = select.kevent(fd.fileno(), filter=select.KQ_FILTER_VNODE, flags=flags, fflags=fflags)

# 監測 Socket 事件,若是有新數據可讀則返回
socket_ev = select.kevent(s.fileno(), filter=select.KQ_FILTER_READ, flags=flags)

# 監測多個對象就只需把不少 kevent 對象塞進 events 列表中,而後傳遞給 control 函數
events = []
events.append(file_ev)
events.append(socket_ev)


# 處理這個 socket 請求
def socket_handler(cl):
    while True:
        data = cl.recv(100)
        print data
        if not data:
            cl.close()
            print 'socket closed'
            break

while True:
    revents = kq.control(events, 1, None)
    for e in revents:
        # 若是是 socket 觸發的事件
        if e.ident == s.fileno():
            print 'Event from socket'
            if e.filter & select.KQ_FILTER_READ:
                cl, _ = s.accept()
                # 若是直接調用 socket_handler 函數,那麼這個 eventloop 會被阻塞,因此此處使用線程
                Thread(None, socket_handler, args=(cl,)).start()
            else:
                print e
        # 若是是文件觸發的事件
        if e.ident == fd.fileno():
            print 'Event from file'
            if e.fflags & select.KQ_NOTE_EXTEND:
                print 'extend'
            elif e.fflags & select.KQ_NOTE_WRITE:
                print 'write'
            elif e.fflags & select.KQ_NOTE_RENAME:
                print 'rename'
            elif e.fflags & select.KQ_NOTE_DELETE:
                print 'delete'
            else:
                print e

然而隨時時間的推移,我發現其實這是有大大的問題的,我自做聰明地爲每一個新連接產生一個線程應付 Client,然而若是當 Client 源源不斷地涌入時,線程數會超標,致使程序發送錯誤。就算咱們維護一個線程的列表,使列表的長度不大於某個標準,每次建立線程的損耗也是存在的。併發

性能對比

首先咱們創建了一個 KqueueEventLoop 的循環類,代碼以下:app

# 部分代碼參考了 SS 源碼中的實現
class KqueueEventLoop(object):

    KQ_FILTER_READ = select.KQ_FILTER_READ

    def __init__(self):
        self._fd_map = {}
        self._handler_map = {}
        self._event_map = {}
        self.kq = select.kqueue()
        self.klist = []
        self._stop = False

    # 啓動這個事件循環
    def run(self):
        while not self._stop:
            events = self.poll()
            for e in events:
                self._fd_map[e.ident](self._handler_map[e.ident])

    # 從事件池裏面取事件出來
    def poll(self):
        events = self.kq.control(self.klist, 1, None)
        return events

    # 註冊事件
    def add(self, f, mode, handler):
        fd = f.fileno()
        event = select.kevent(fd, filter=mode, flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE | select.KQ_EV_CLEAR)
        self._handler_map[fd] = f
        self._fd_map[fd] = handler
        self._event_map[fd] = event
        self.klist.append(event)

    # 刪除事件
    def remove(self, f):
        fd = f.fileno()
        del self._handler_map[fd]
        del self._fd_map[fd]
        self.klist.remove(self._event_map[fd])

    # 暫停事件循環
    def stop(self):
        self._stop = True

若是是用一開始的代碼的想法,爲每一個 Client 生成一個線程,調用方法以下:socket

def test():
    loop = KqueueEventLoop()
    s = socket(AF_INET, SOCK_STREAM)
    s.bind(("127.0.0.1", 3000))
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.listen(5)

    def callback(f):
        Thread(None, handler, args=(f,)).start()

    def handler(f):
        print 'INFO: New connection established.'
        cl, _ = f.accept()
        while True:
            data = cl.recv(1024)
            if not data:
                print 'INFO: Connection dropped.'
                loop.remove(cl)
                cl.close()
                return
            print 'DATA: %s' % repr(data)

    loop.add(s, KqueueEventLoop.KQ_FILTER_READ, callback)
    loop.run()

if __name__ == '__main__':
    test()

咱們模擬了一個 100 個客戶端,每隔 0.1 秒向服務器發 1,獲取到該服務端內存使用量爲 14MB。 以前提到這邊線程的產生會形成兩個嚴重的問題,因此咱們就不生成線程。那麼怎麼作到原來的 while 循環來接受這麼多客戶端發來的數據呢?答案是,不用 while 循環。ide

繼續註冊事件

當事件池返回一個新鏈接創建請求時,接受並創建這個鏈接,再把這個鏈接扔回事件池中,代碼以下:函數

def test():
    loop = KqueueEventLoop()
    s = socket(AF_INET, SOCK_STREAM)
    s.bind(("127.0.0.1", 3000))
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.listen(5)

    def handler(f):
        print 'INFO: New connection established.'
        cl, _ = f.accept()
        cl.setblocking(False)
        loop.add(cl, KqueueEventLoop.KQ_FILTER_READ, read_data)

    def read_data(cl):
        try:
            data = cl.recv(1024)
            if not data:
                print 'INFO: Connection dropped.'
                loop.remove(cl)
                cl.close()
                return
            print 'DATA: %s' % repr(data)
        except Exception, e:
            print 'ERROR: %s' % repr(e)
            loop.remove(cl)
            cl.close()

    loop.add(s, KqueueEventLoop.KQ_FILTER_READ, handler)
    loop.run()

if __name__ == '__main__':
    test()

咱們經過 loop.add(cl, KqueueEventLoop.KQ_FILTER_READ, read_data) 將每一個新鏈接 cl 加入到事件循環 loop 中,並讓事件循環類記錄回調方法 read_data。此時的情況就是,整個程序只有一個事件循環,每來一個新鏈接,經過 handler 回調創建該鏈接。再若是網卡接受到新數據,KqueueEventLoop 找到新數據對應的鏈接,把數據讀入內存並打印出來。oop

以上實現沒有了新建線程帶來的損耗,整個程序只保留了一個 while 循環,使得相同狀況下內存使用量僅爲 5MB。而且併發鏈接數量有很大提高,再也不受限於線程數限制。

其餘

在 IO 密集型程序中,其實線程的使用是能夠改善程序運行速度的。但線程仍是要用在可控的地方,好比用線程去跑事件循環。

相關文章
相關標籤/搜索