Python select IO多路複用

1、select介紹

Python的select()函數是底層操做系統實現的直接接口。它監視套接字,打開文件和管道(任何帶有返回有效文件描述符fileno()方法),直到它們變得可讀或可寫,或者發生通訊錯誤。select()使得更容易同時監視多個鏈接,而且比使用套接字超時在Python中編寫輪詢循環更有效,由於監視發生在操做系統網絡層而不是解釋器。緩存

 

2、使用select編寫SocketServer

接下來經過socket server例子要以瞭解select 是如何經過單進程實現同時處理多個非阻塞的socket鏈接的服務器

首先咱們創建一個服務器端socket

import select
import socket
import sys
from queue import Queue
import queue


# 建立套接字
server = socket.socket()
server.setblocking(False)
server.bind(('localhost',8800))
# 監聽傳入的鏈接
server.listen(5)

select()的參數是三個包含要監視的通訊通道的列表。第一個是要檢查要讀取的傳入數據的對象列表,第二個包含在緩衝區中有空間時將接收傳出數據的對象,第三個包含可能有錯誤的對象(一般是錯誤的組合輸入和輸出通道對象)。服務器的下一步是設置包含要傳遞給select()輸入源和輸出目標的列表網絡

input = [server] 
output = [] 
select.select(input,output,input)

全部客戶端的進來的鏈接和數據將會被server的主循環程序放在上面的list中處理,咱們如今的server端須要等待鏈接可寫(writable)以後才能過來,而後接收數據並返回(所以不是在接收到數據以後就馬上返回),由於每一個鏈接要把輸入或輸出的數據先緩存到queue裏,而後再由select取出來再發出去。併發

# 對外發送數據的隊列,記錄到字典中
message_queues = {}

服務器程序的主要部分循環,調用select()來阻止並等待網絡活動。app

while 1:
    #等待至少一個套接字準備好處理
    print(sys.stderr, '\nwaiting for the next event')
    """
    readable中的socket鏈接表明有數據可接受,可讀取
    writable list中存放着你能夠對其進行發送(send)操做
    exceptional 當鏈接通訊出現error時會把error寫到exceptional列表中
    """
    readable, writable, exceptional = select.select(input, output, input)

readable列表中的socket可能會有3種可能狀態,
1. 若是這個socket是server(它負責監聽客戶端的鏈接),那就表示server端已經收到一個新的鏈接
2. 客戶端把數據發送過來了
3. 客戶端和服務器端斷開了鏈接,將客戶端對象從 列表和隊列中刪除
    for s in readable:
        if s is server: # 第1種狀況,表示有新的鏈接進來
            connection,add = s.accept() # 接受新的鏈接
            connection.setblocking(0)

            """
            這個時候,爲了避免阻塞整個程序的運行,咱們先將它放入input列表中。
            下一次loop時,就會被select去監聽,若是這個鏈接的客戶端發來了數據
            那麼這個鏈接的fd在server端就會變成就緒的,select就會把這個鏈接返回到readable列表中
            而後在 for s in readable中取出這個鏈接,開始接受數據
            """
            input.append(connection)
            message_queue[connection] = Queue()

        else: # 第2種狀況就是,客戶端把數據發送了過來
            data = s.recv(1024) # 經過recv去接受數據
            if data:
                print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()))
                message_queue[s].put(data) # 接受到的數據先放到隊列中
                if s not in output:
                    output.append(s) # 爲了避免影響處理與其餘客戶端的鏈接,這裏不馬上返回數據給客戶端
            else: # 第3種狀況 就是客戶端斷開了鏈接,這個時候recv()數據就是空,這個時候就能夠跟客戶端斷開鏈接
                if s in output:
                    """
                    既然斷開了鏈接,就沒有必要給客戶端發送數據了
                    若是客戶端鏈接對象還在output中,就把他刪除
                    """
                    output.remove(s)
                input.remove(s) # 在input列表中也刪除掉
                # 關閉鏈接,在隊列中也刪除
                s.close()
                del message_queue[s]

對於writable list中的socket,也有幾種狀態,若是這個客戶端鏈接在跟它對應的queue裏有數據,就把這個數據取出來再發回給這個客戶端,不然就把這個鏈接從output list中移除,這樣下一次循環select()調用時檢測到outputs list中沒有這個鏈接,那就會認爲這個鏈接還處於非活動狀態socket

    for s in writable:
        try:
            next_msg = message_queue[s].get_nowait()
        except queue.Empty as e:
            output.remove(s)
        else:
            s.send(next_msg)

最後,exceptional 有異常產生。若是在跟某個socket鏈接通訊過程當中出了錯誤,就把這個鏈接對象在inputs\outputs\message_queue中都刪除,再把鏈接關閉掉函數

3、完整代碼

server端:高併發

# 經過非阻塞io實現http請求
# select + 回調 + 事件循環

# 使用單線程完成高併發

import select
import socket
import sys
from queue import Queue
import queue


# 建立套接字
server = socket.socket()
server.setblocking(False)
server.bind(('localhost',8800))
# 監聽傳入的鏈接
server.listen(5)

"""
第一個是要檢查要讀取的傳入數據的對象列表,
第二個包含在緩衝區中有空間時將接收傳出數據的對象,
第三個包含可能有錯誤的對象(一般是錯誤的組合輸入和輸出通道對象)。
"""
input = [server] # 從中讀取數據
output = [] # 將數據發送出去
select.select(input,output,input)

message_queue = {} # 消息隊列

while input:
    #等待至少一個套接字準備好處理
    print(sys.stderr, '\nwaiting for the next event')
    """
    readable中的socket鏈接表明有數據可接受,可讀取
    writable list中存放着你能夠對其進行發送(send)操做
    exceptional 當鏈接通訊出現error時會把error寫到exceptional列表中
    """
    readable, writable, exceptional = select.select(input, output, input)


    """
    readable列表中的socket可能會有3種可能狀態,
        1. 若是這個socket是server(它負責監聽客戶端的鏈接),那就表示server端已經收到一個新的鏈接
        2. 客戶端把數據發送過來了
        3. 客戶端和服務器端斷開了鏈接,將客戶端對象從 列表和隊列中刪除
    """
    for s in readable:
        if s is server: # 第一種狀況,表示有新的鏈接進來
            connection,add = s.accept() # 接受新的鏈接
            connection.setblocking(0)

            """
            這個時候,爲了避免阻塞整個程序的運行,咱們先將它放入input列表中。
            下一次loop時,就會被select去監聽,若是這個鏈接的客戶端發來了數據
            那麼這個鏈接的fd在server端就會變成就緒的,select就會把這個鏈接返回到readable列表中
            而後在 for s in readable中取出這個鏈接,開始接受數據
            """
            input.append(connection)
            message_queue[connection] = Queue()

        else: # 第2種狀況就是,客戶端把數據發送了過來
            data = s.recv(1024) # 經過recv去接受數據
            if data:
                print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()))
                message_queue[s].put(data) # 接受到的數據先放到隊列中
                if s not in output:
                    output.append(s) # 爲了避免影響處理與其餘客戶端的鏈接,這裏不馬上返回數據給客戶端
            else: # 第3種狀況 就是客戶端斷開了鏈接,這個時候recv()數據就是空,這個時候就能夠跟客戶端斷開鏈接
                if s in output:
                    """
                    既然斷開了鏈接,就沒有必要給客戶端發送數據了
                    若是客戶端鏈接對象還在output中,就把他刪除
                    """
                    output.remove(s)
                input.remove(s) # 在input列表中也刪除掉
                # 關閉鏈接,在隊列中也刪除
                s.close()
                del message_queue[s]


    """
    writable list也有幾種狀態,若是客戶端鏈接在跟它對應的queue裏有數據時,就把這個數據取出來再發給用戶
    不然就把這個鏈接從output中移除,這樣下一次,select調用時檢測到output列表中沒有這個鏈接,就會認爲這個鏈接處於非活動狀態
    """
    for s in writable:
        try:
            next_msg = message_queue[s].get_nowait()
        except queue.Empty as e:
            output.remove(s)
        else:
            s.send(next_msg)


    """
    若是跟某個socket鏈接通訊失敗出現錯誤,就把這個鏈接對象從 各個列表中刪除,再關閉鏈接
    """
    for s in exceptional:
        input.remove(s)
        for s in output:
            output.remove(s)
        s.close()
        del message_queue[s]

client端:oop

import socket


client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost',8800))

while True:
    msg = bytes(input("<<<"),encoding='utf-8')
    client.sendall(msg)

    data = client.recv(1024)

    print("{}".format(data))
相關文章
相關標籤/搜索