利用epoll實現異步IO

  以前異步IO一直沒搞明白,大體的理解就是在一個大的循環中,有兩部分:第一部分是監聽事件;第二部分是處理事件(經過添加回調函數的方式)。就拿網絡通訊來講,能夠先經過調用 select 模塊中的 select 監聽各個 socket 。當 socket 有事件到來時,針對相應的事件作出處理,就這麼一直循環下去。因此異步IO也被稱爲事件驅動IO。原理其實我說得太簡單了,因此我會以一個例子來講明一切。不過在這以前我仍是要說一下 select 和 epoll 的區別。服務器

1、IO多路服用的select

  IO多路複用相對於阻塞式和非阻塞式的好處就是它能夠監聽多個 socket ,而且不會消耗過多資源。當用戶進程調用 select 時,它會監聽其中全部 socket 直到有一個或多個 socket 數據已經準備好,不然就一直處於阻塞狀態。select的缺點在於單個進程可以監視的文件描述符的數量存在最大限制,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量的tcp連接處於很是活躍狀態,但調用select()會對全部的socket進行一次線性掃描,因此這也浪費了必定的開銷。不過它的好處還有就是它的跨平臺特性。網絡

2、 異步IO的epoll數據結構

  epoll的優勢就是徹底的異步,你只須要對其中 poll 函數註冊相應的 socket 和事件,就能夠徹底無論。當有時間發生時,數據已經從內核態拷貝到用戶態,也就是徹底沒有阻塞。app

 

3、基於epoll的聊天室程序異步

  說了這麼多,我決定仍是用epoll寫一個多人聊天程序。epoll能夠支持大量鏈接,select卻有限制,因此這就是我決定用epoll的緣由。首先看服務器程序:socket

 1 import socket, select
 2 # 服務端
 3 
 4 serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 5 serverSock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
 6 serverSock.bind(('127.0.0.1', 8001))
 7 serverSock.listen(5)
 8 serverSock.setblocking(False)
 9 
10 EOL = bytes('\n\n', 'utf-8')
11 QUIT = bytes('\r\n', 'utf-8')
12 epoll = select.epoll()
13 epoll.register(serverSock.fileno(), select.EPOLLIN)
14 print('註冊事件:%s'%serverSock.fileno())
15 
16 try:
17     connections = {}; requests = {}; responses = {}
18     while True:
19         events = epoll.poll(1)
20         for fileno, event in events:
21 #             print('event:%s fileno:%s'%(event, fileno))
22             if fileno == serverSock.fileno():
23                 clientSock, address = serverSock.accept()
24                 print('鏈接到客戶端: %s:%s'%(address[0], address[1]))
25                 clientSock.setblocking(False)
26                 connections[clientSock.fileno()] = (clientSock, address)
27                 epoll.register(clientSock.fileno(), select.EPOLLOUT) # socket只能註冊輸入或輸出一個,不能同時註冊
28                 requests[clientSock.fileno()] = bytes('', 'utf-8')
29                 responses[clientSock.fileno()] = bytes('你已鏈接到服務器,IP爲{}:{}\n\n'.format(*serverSock.getsockname()),
30                                          'utf-8')
31             elif event & select.EPOLLIN:
32                 requests[fileno] += connections[fileno][0].recv(1024)
33                 if requests[fileno].endswith(EOL):
34                     msg = str(requests[fileno], 'utf-8')
35                     msg = '來自{}的消息:{}'.format(connections[fileno][1], msg[:-2])
36                     requests[fileno] = b''
37                     #print(msg)
38                     for i in responses:
39                         if i == fileno:
40                             continue
41                         responses[i] += bytes(msg, 'utf-8')
42                         epoll.modify(i, select.EPOLLOUT)
43                 if QUIT in requests[fileno]:
44                     epoll.modify(fileno, select.EPOLLOUT) 
45                     
46             elif event & select.EPOLLOUT:
47                 #print('開始發送消息:%s'%str(responses[fileno], 'utf-8'))
48                 bytesSend = connections[fileno][0].send(responses[fileno])
49                 responses[fileno] = responses[fileno][bytesSend:]
50                 #print('發送完成')
51                 if responses[fileno] == b'':
52                     epoll.modify(fileno, select.EPOLLIN)
53                 if QUIT in requests[fileno]:
54                     epoll.modify(fileno, 0)
55                     connections[fileno][0].shutdown(socket.SHUT_RDWR)
56                     
57             elif event & select.EPOLLHUP:
58                 epoll.unregister(fileno)
59                 connections[fileno][0].close()
60                 del connections[fileno]
61 finally:
62     epoll.unregister(serverSock.fileno())
63     epoll.close()
64     serverSock.close()
65     print('已退出服務端程序')

 

 

注意,我首先定義了兩個終止符:EOL表示這段話已經發完了;QUIT表示客戶端想要退出。客戶端的程序有點讓我爲難,既要在命令行輸入又要同時保證能輸出別人發過來的消息,全部我只好用了prompt_toolkit再加上一個線程。以下:tcp

 1 import socket, prompt_toolkit, select
 2 import threading, queue
 3 
 4 
 5 class Client:
 6     def __init__(self, sock):
 7         self.sock = sock
 8         self.want_to_send = False
 9         self.want_to_recv = True
10         self._msg = queue.Queue()
11         
12     def fileno(self):
13         return self.sock.fileno()
14     
15     def handle_recv(self):
16         print('接受消息..')
17         msg = self.sock.recv(1024)
18         print(str(msg, 'utf-8'))
19         
20     def handle_send(self):
21         msg = self._msg.get()
22         if msg == '\r\n':
23             self.want_to_send = False
24             self.want_to_recv = False
25         self.sock.sendall(bytes(msg, 'utf-8'))
26         self.want_to_send = False
27             
28 def handle_sock(want_to_send, want_to_recv, sock):
29     print('開始處理消息...')
30     want_to_recv.append(sock.fileno())
31     while True:
32         if sock.want_to_send:
33             if not want_to_send:
34                 want_to_send.append(myclient.fileno())
35         else:
36             want_to_send.clear()
37         can_recv, can_send, _ = select.select(want_to_recv, want_to_send, [], 1)
38         if can_recv:
39             sock.handle_recv()
40         if can_send:
41             sock.handle_send()
42         if not (sock.want_to_send or sock.want_to_recv):
43             print('正中止客戶端鏈接...')
44             break
45         if sock._msg.qsize():
46             sock.want_to_send = True
47 
48             
49 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
50 s.connect(('127.0.0.1',8001))
51 
52 myclient = Client(s)
53 want_to_send = []
54 want_to_recv = []
55     
56 
57             
58 t = threading.Thread(target=handle_sock,
59                      args=(want_to_send, want_to_recv, myclient),
60                      daemon=True)
61 t.start()
62 
63 try:
64     while True:
65         messages = prompt_toolkit.shortcuts.prompt('\n\n>>> ',patch_stdout=True)
66         myclient._msg.put(messages+'\n\n')
67 except KeyboardInterrupt:
68     myclient._msg.put('\r\n')
69 finally:
70     t.join()
71     myclient.sock.close()
72     print('網絡已斷開')

 

個人服務器跑在 jupyter 上,客戶端跑在命令行上,效果以下:函數

 

  客戶端接受和發送消息都是互不影響的,這樣就實現了一個多人聊天的功能。並且服務器使用的是epoll,因此哪怕是成千上萬的人同時在線也沒有任何壓力。至於怎麼測試暫時還沒想到辦法。測試

相關文章
相關標籤/搜索