Python poll IO多路複用

1、poll介紹

poll本質上和select沒有區別,只是沒有了最大鏈接數(linux上默認1024個)的限制,緣由是它基於鏈表存儲的。html

本人的另外一篇博客講了 python  select : https://www.cnblogs.com/weihengblog/p/9830253.htmlpython

2、使用poll編寫SocketServer(本博客代碼須要在linux下運行)

首先咱們創建一個服務器端的socketlinux

import select
import socket
import sys
import queue
from queue import Queue


# 建立一個socket鏈接
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)

# 綁定IP地址和端口號
server_address = ('localhost', 8800)
server.bind(server_address)
print("服務器已啓動http://localhost:8800/")

# 監聽鏈接數
server.listen(5)

# 消息隊列 用於記錄客戶端發來的消息
message_queues = {}

設置輪詢的超時時間服務器

若是不設置timeout,方法將會阻塞直到對應的poll對象有一個事件發生。socket

TIMEOUT = 1000 # 設置爲1秒

poll的事件類型spa

POLLIN    Input ready                  有數據讀取
POLLPRI    Priority input ready        有緊急數據讀取
POLLOUT    Able to receive output      準備輸出
POLLERR    Error                       某些錯誤
POLLHUP    Channel closed              掛起
POLLNVAL    Channel not open           無效請求,描述符沒法打開

READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR READ_WRITE = READ_ONLY | select.POLLOUT

註冊要監聽文件描述符code

首先須要實例化一個poll對象,對要監聽的句柄進行註冊orm

poller = select.poll()
# 註冊server端socket  要監聽的事件類型爲 讀
poller.register(server, READ_ONLY)

文件描述符映射具體的套接字對象server

"""
因爲poll()返回包含套接字文件描述符和事件標誌的元組列表,所以須要從文件描述符號到對象的映射才能從套接字中讀取或寫入該套接字。
"""
fd_to_socket = { server.fileno(): server,}

 

事件輪詢htm

while True:

    """
    去檢測已經註冊的文件描述符,會返回一個(fd,event)元祖列表
    fd:文件描述符
    event:描述符可能會發生的事件
    若是返回爲空的列表,則說明超時且沒有文件描述符有事件發生
    """
    events = poller.poll(TIMEOUT) # 若是timeout爲None,將會阻塞,知道有事件發生
    for fd, flag in events:
        # 從文件描述符中檢索實際的套接字
        s = fd_to_socket[fd]

事件類型判斷

        if flag & (select.POLLIN | select.POLLPRI): # 有數據能夠讀取

            if s is server: # 表示有新的鏈接
                # 能夠讀取數據
                connection, client_address = s.accept()
                print(sys.stderr, '新的鏈接來自:', client_address)
                connection.setblocking(0)
                fd_to_socket[connection.fileno()] = connection # 往fd字典中添加一個新的 文件描述符
                poller.register(connection, READ_ONLY)

                message_queues[connection] = Queue() # 爲了防止等待客戶端發來數據期間發生阻塞,分配一個隊列用於保存數據
            else: # 表示客戶端傳來了消息

                data = s.recv(1024)
                if data: # 代表數據接受成功

                    print(sys.stderr, '接受數據 "%s" 來自 %s' % (data, s.getpeername()))
                    message_queues[s].put(data)
                    # 修改一個已經存在的fd,修改事件爲寫。這裏表示服務器向客戶端要發送數據
                    poller.modify(s, READ_WRITE)
                else:
                    # 若是沒有接受到數據,表示要斷開鏈接
                    print(sys.stderr, '關閉', client_address, '並未讀取到數據')
                    # 中止監聽鏈接上的輸入
                    poller.unregister(s)
                    s.close()

                    # 將此連接從隊列中刪除
                    del message_queues[s]

        elif flag & select.POLLHUP:
            print(sys.stderr, '關閉', client_address, '收到HUP後')
            poller.unregister(s)
            s.close()

        elif flag & select.POLLOUT:
            try:
                next_msg = message_queues[s].get_nowait()
            except queue.Empty:
                print(sys.stderr, '隊列', s.getpeername(), '爲空')
                poller.modify(s, READ_ONLY)
            else:
                print(sys.stderr, '發送 "%s" 到 %s' % (next_msg, s.getpeername()))
                s.send(next_msg)

        elif flag & select.POLLERR:
            print(sys.stderr, '異常信息:', s.getpeername())
            poller.unregister(s)
            s.close()
            del message_queues[s]

3、完整代碼示例

server端:

import select
import socket
import sys
import queue
from queue import Queue


# 建立一個socket鏈接
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)

# 綁定IP地址和端口號
server_address = ('localhost', 8800)
server.bind(server_address)
print("服務器已啓動http://localhost:8800/")

# 監聽鏈接數
server.listen(5)

# 消息隊列
message_queues = {}

"""
POLLIN    Input ready      有數據讀取
POLLPRI    Priority input ready   有緊急數據讀取
POLLOUT    Able to receive output  準備輸出
POLLERR    Error   某些錯誤
POLLHUP    Channel closed   掛起
POLLNVAL    Channel not open  無效請求,描述符沒法打開
"""
# 經常使用的標識  表明你想檢查的事件類型
READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
READ_WRITE = READ_ONLY | select.POLLOUT

TIMEOUT = 1000
poller = select.poll() # 建立一個poll對象,該對象能夠註冊或註銷文件描述符

# 註冊一個文件描述符,能夠經過poll()方法來檢查是否有對應的IO事件發生
# 接受兩個參數, fd  和  eventmask
poller.register(server,READ_ONLY)
fd_to_socket = { server.fileno(): server,}
# 服務器的循環調用poll(),而後經過查找套接字並根據事件中的標誌採起行動來處理返回的「事件」。
while True:

    """
    去檢測已經註冊的文件描述符,會返回一個(fd,event)元祖列表
    fd:文件描述符
    event:描述符可能會發生的事件
    若是返回爲空的列表,則說明超時且沒有文件描述符有事件發生
    """
    events = poller.poll(TIMEOUT) # 若是timeout爲None,將會阻塞,知道有事件發生
    for fd, flag in events:
        # 從文件描述符中檢索實際的套接字
        s = fd_to_socket[fd]

        if flag & (select.POLLIN | select.POLLPRI): # 有數據能夠讀取

            if s is server: # 表示有新的鏈接
                # 能夠讀取數據
                connection, client_address = s.accept()
                print(sys.stderr, '新的鏈接來自:', client_address)
                connection.setblocking(0)
                fd_to_socket[connection.fileno()] = connection # 往fd字典中添加一個新的 文件描述符
                poller.register(connection, READ_ONLY)

                message_queues[connection] = Queue() # 爲了防止等待客戶端發來數據期間發生阻塞,分配一個隊列用於保存數據
            else: # 表示客戶端傳來了消息

                data = s.recv(1024)
                if data: # 代表數據接受成功

                    print(sys.stderr, '接受數據 "%s" 來自 %s' % (data, s.getpeername()))
                    message_queues[s].put(data)
                    # 修改一個已經存在的fd,修改事件爲寫。這裏表示服務器向客戶端要發送數據
                    poller.modify(s, READ_WRITE)
                else:
                    # 若是沒有接受到數據,表示要斷開鏈接
                    print(sys.stderr, '關閉', client_address, '並未讀取到數據')
                    # 中止監聽鏈接上的輸入
                    poller.unregister(s)
                    s.close()

                    # 將此連接從隊列中刪除
                    del message_queues[s]

        elif flag & select.POLLHUP:
            print(sys.stderr, '關閉', client_address, '收到HUP後')
            poller.unregister(s)
            s.close()

        elif flag & select.POLLOUT:
            try:
                next_msg = message_queues[s].get_nowait()
            except queue.Empty:
                print(sys.stderr, '隊列', s.getpeername(), '爲空')
                poller.modify(s, READ_ONLY)
            else:
                print(sys.stderr, '發送 "%s" 到 %s' % (next_msg, s.getpeername()))
                s.send(next_msg)

        elif flag & select.POLLERR:
            print(sys.stderr, '異常信息:', s.getpeername())
            poller.unregister(s)
            s.close()
            del message_queues[s]

client端:

import socket


client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost',8800))

while True:
    msg = bytes(input("<<<"),encoding='utf-8')
    client.sendall(msg)

    data = client.recv(1024)

    print("{}".format(data))
相關文章
相關標籤/搜索