poll本質上和select沒有區別,只是沒有了最大鏈接數(linux上默認1024個)的限制,緣由是它基於鏈表存儲的。html
本人的另外一篇博客講了 python select : https://www.cnblogs.com/weihengblog/p/9830253.htmlpython
首先咱們創建一個服務器端的socketlinux
import select import socket import sys import queue from queue import Queue # 建立一個socket鏈接 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) # 綁定IP地址和端口號 server_address = ('localhost', 8800) server.bind(server_address) print("服務器已啓動http://localhost:8800/") # 監聽鏈接數 server.listen(5) # 消息隊列 用於記錄客戶端發來的消息 message_queues = {}
設置輪詢的超時時間服務器
若是不設置timeout,方法將會阻塞直到對應的poll對象有一個事件發生。socket
TIMEOUT = 1000 # 設置爲1秒
poll的事件類型spa
POLLIN Input ready 有數據讀取 POLLPRI Priority input ready 有緊急數據讀取 POLLOUT Able to receive output 準備輸出 POLLERR Error 某些錯誤 POLLHUP Channel closed 掛起 POLLNVAL Channel not open 無效請求,描述符沒法打開
READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR READ_WRITE = READ_ONLY | select.POLLOUT
註冊要監聽文件描述符code
首先須要實例化一個poll對象,對要監聽的句柄進行註冊orm
poller = select.poll() # 註冊server端socket 要監聽的事件類型爲 讀 poller.register(server, READ_ONLY)
文件描述符映射具體的套接字對象server
""" 因爲poll()返回包含套接字文件描述符和事件標誌的元組列表,所以須要從文件描述符號到對象的映射才能從套接字中讀取或寫入該套接字。 """ fd_to_socket = { server.fileno(): server,}
事件輪詢htm
while True: """ 去檢測已經註冊的文件描述符,會返回一個(fd,event)元祖列表 fd:文件描述符 event:描述符可能會發生的事件 若是返回爲空的列表,則說明超時且沒有文件描述符有事件發生 """ events = poller.poll(TIMEOUT) # 若是timeout爲None,將會阻塞,知道有事件發生 for fd, flag in events: # 從文件描述符中檢索實際的套接字 s = fd_to_socket[fd]
事件類型判斷
if flag & (select.POLLIN | select.POLLPRI): # 有數據能夠讀取 if s is server: # 表示有新的鏈接 # 能夠讀取數據 connection, client_address = s.accept() print(sys.stderr, '新的鏈接來自:', client_address) connection.setblocking(0) fd_to_socket[connection.fileno()] = connection # 往fd字典中添加一個新的 文件描述符 poller.register(connection, READ_ONLY) message_queues[connection] = Queue() # 爲了防止等待客戶端發來數據期間發生阻塞,分配一個隊列用於保存數據 else: # 表示客戶端傳來了消息 data = s.recv(1024) if data: # 代表數據接受成功 print(sys.stderr, '接受數據 "%s" 來自 %s' % (data, s.getpeername())) message_queues[s].put(data) # 修改一個已經存在的fd,修改事件爲寫。這裏表示服務器向客戶端要發送數據 poller.modify(s, READ_WRITE) else: # 若是沒有接受到數據,表示要斷開鏈接 print(sys.stderr, '關閉', client_address, '並未讀取到數據') # 中止監聽鏈接上的輸入 poller.unregister(s) s.close() # 將此連接從隊列中刪除 del message_queues[s] elif flag & select.POLLHUP: print(sys.stderr, '關閉', client_address, '收到HUP後') poller.unregister(s) s.close() elif flag & select.POLLOUT: try: next_msg = message_queues[s].get_nowait() except queue.Empty: print(sys.stderr, '隊列', s.getpeername(), '爲空') poller.modify(s, READ_ONLY) else: print(sys.stderr, '發送 "%s" 到 %s' % (next_msg, s.getpeername())) s.send(next_msg) elif flag & select.POLLERR: print(sys.stderr, '異常信息:', s.getpeername()) poller.unregister(s) s.close() del message_queues[s]
server端:
import select import socket import sys import queue from queue import Queue # 建立一個socket鏈接 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) # 綁定IP地址和端口號 server_address = ('localhost', 8800) server.bind(server_address) print("服務器已啓動http://localhost:8800/") # 監聽鏈接數 server.listen(5) # 消息隊列 message_queues = {} """ POLLIN Input ready 有數據讀取 POLLPRI Priority input ready 有緊急數據讀取 POLLOUT Able to receive output 準備輸出 POLLERR Error 某些錯誤 POLLHUP Channel closed 掛起 POLLNVAL Channel not open 無效請求,描述符沒法打開 """ # 經常使用的標識 表明你想檢查的事件類型 READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR READ_WRITE = READ_ONLY | select.POLLOUT TIMEOUT = 1000 poller = select.poll() # 建立一個poll對象,該對象能夠註冊或註銷文件描述符 # 註冊一個文件描述符,能夠經過poll()方法來檢查是否有對應的IO事件發生 # 接受兩個參數, fd 和 eventmask poller.register(server,READ_ONLY) fd_to_socket = { server.fileno(): server,} # 服務器的循環調用poll(),而後經過查找套接字並根據事件中的標誌採起行動來處理返回的「事件」。 while True: """ 去檢測已經註冊的文件描述符,會返回一個(fd,event)元祖列表 fd:文件描述符 event:描述符可能會發生的事件 若是返回爲空的列表,則說明超時且沒有文件描述符有事件發生 """ events = poller.poll(TIMEOUT) # 若是timeout爲None,將會阻塞,知道有事件發生 for fd, flag in events: # 從文件描述符中檢索實際的套接字 s = fd_to_socket[fd] if flag & (select.POLLIN | select.POLLPRI): # 有數據能夠讀取 if s is server: # 表示有新的鏈接 # 能夠讀取數據 connection, client_address = s.accept() print(sys.stderr, '新的鏈接來自:', client_address) connection.setblocking(0) fd_to_socket[connection.fileno()] = connection # 往fd字典中添加一個新的 文件描述符 poller.register(connection, READ_ONLY) message_queues[connection] = Queue() # 爲了防止等待客戶端發來數據期間發生阻塞,分配一個隊列用於保存數據 else: # 表示客戶端傳來了消息 data = s.recv(1024) if data: # 代表數據接受成功 print(sys.stderr, '接受數據 "%s" 來自 %s' % (data, s.getpeername())) message_queues[s].put(data) # 修改一個已經存在的fd,修改事件爲寫。這裏表示服務器向客戶端要發送數據 poller.modify(s, READ_WRITE) else: # 若是沒有接受到數據,表示要斷開鏈接 print(sys.stderr, '關閉', client_address, '並未讀取到數據') # 中止監聽鏈接上的輸入 poller.unregister(s) s.close() # 將此連接從隊列中刪除 del message_queues[s] elif flag & select.POLLHUP: print(sys.stderr, '關閉', client_address, '收到HUP後') poller.unregister(s) s.close() elif flag & select.POLLOUT: try: next_msg = message_queues[s].get_nowait() except queue.Empty: print(sys.stderr, '隊列', s.getpeername(), '爲空') poller.modify(s, READ_ONLY) else: print(sys.stderr, '發送 "%s" 到 %s' % (next_msg, s.getpeername())) s.send(next_msg) elif flag & select.POLLERR: print(sys.stderr, '異常信息:', s.getpeername()) poller.unregister(s) s.close() del message_queues[s]
client端:
import socket client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('localhost',8800)) while True: msg = bytes(input("<<<"),encoding='utf-8') client.sendall(msg) data = client.recv(1024) print("{}".format(data))