python socket應用進階配合select模塊IO多路複用

Python中有一個select模塊,其中提供了:select、poll、epoll三個方法,分別調用系統的 select,poll,epoll 從而實現IO多路複用。python

Windows Python:編程

    提供: select數組

Mac Python:緩存

    提供: select安全

Linux Python:服務器

    提供: select、poll、epoll網絡

 

select數據結構

select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。app

select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。異步

select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。

另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。

poll

poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。

poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。

另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。

epoll

直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。

epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。

epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。

另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

關於select模塊的select方法

返回值1, 返回值2, 返回值3 = select.select(參數1, 參數2, 參數3, 超時時間)

參數: 可接受四個參數(前三個必須)

返回值:三個列表

select方法用來監視文件句柄,若是句柄發生變化,則獲取該句柄。

一、當 參數1 序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值1 序列中

二、當 參數2 序列中含有句柄時,則將該序列中全部的句柄添加到 返回值2 序列中

三、當 參數3 序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值3 序列中

四、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化

當 超時時間 = 1時,那麼若是監聽的句柄均無任何變化,則select會阻塞 1 秒,以後返回三個空列表,若是監聽的句柄有變化,則直接執行。

socket server 端簡單實例

import socket
import select

sk1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk1.bind(('127.0.0.1',8002))
sk1.listen(5)
sk1.setblocking(0)

inputs = [sk1,]

while True:
    readable_list, writeable_list, error_list = select.select(inputs, [], inputs, 1)
    for r in readable_list:
        # 當客戶端第鏈接服務端時爲可讀socket對象
        if sk1 == r:
            # 當可讀取對象爲socket對象的時候,將其轉變爲可接受對象再放入到inputs中
            print('accept...')
            request, address = r.accept()
            應爲accept的時候會阻塞,因此setblocking(0)可讓設置其爲非阻塞狀態
            request.setblocking(0)
            inputs.append(request)
        # 當客戶端鏈接上服務端以後,再次發送數據時
        else:
            # 當對象爲可接受對象的時候接收數據,
            received = r.recv(1024)
            # 當正常接收客戶端發送的數據時
            if received:
                print('received data:', received.decode("utf-8"))
            # 當客戶端關閉程序時移除對象
            else:
                inputs.remove(r)

sk1.close()

socket client 端簡單實例

import socket

ip_port = ('127.0.0.1',8002)
sk = socket.socket()
sk.connect(ip_port)

while True:
    input_str = input('wait input:')
    sk.sendall(input_str.encode("utf-8"))
sk.close()

 

socket補充

一般狀況下聲明瞭socket以後能夠對 socket進行配置選項

關於設置socket選項setsocketopt

setsockopt(level,optname,value)

level

定義了哪一個選項將被使用。一般狀況下是SOL_SOCKET,意思是正在使用的socket選項。

它還能夠經過設置一個特殊協議號碼來設置協議選項,然而對於一個給定的操做系統,

大多數協議選項都是明確的,因此爲了簡便,它們不多用於爲移動設備設計的應用程序。

 

optname:

參數提供使用的特殊選項。關於可用選項的設置,會由於操做系統的不一樣而有少量不一樣。

若是level選定了SOL_SOCKET,那麼一些經常使用的選項

參數以下:

選項

意義

指望值

SO_BINDTODEVICE

可使socket只在某個特殊的網絡接口(網卡)有效。也許不能是移動便攜設備

一個字符串給出設備的名稱或者一個空字符串返回默認值

SO_BROADCAST

容許廣播地址發送和接收信息包。只對UDP有效。如何發送和接收廣播信息包

布爾型整數

SO_DONTROUTE

禁止經過路由器和網關往外發送信息包。這主要是爲了安全而用在以太網上UDP通訊的一種方法。無論目的地址使用什麼IP地址,均可以防止數據離開本地網絡

布爾型整數

SO_KEEPALIVE

可使TCP通訊的信息包保持連續性。這些信息包能夠在沒有信息傳輸的時候,使通訊的雙方肯定鏈接是保持的

布爾型整數

SO_OOBINLINE

能夠把收到的不正常數據當作是正常的數據,也就是說會經過一個標準的對recv()的調用來接收這些數據

布爾型整數

SO_REUSEADDR

當socket關閉後,本地端用於該socket的端口號馬上就能夠被重用。一般來講,只有通過系統定義一段時間後,才能被重用。

布爾型整數

下面用到了SO_REUSEADDR選項,具體寫法是:

S.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) 這裏value設置爲1,表示將SO_REUSEADDR標記爲TRUE,操做系統會在服務器socket被關閉或服務器進程終止後立刻釋放該服務器的端口,不然操做系統會保留幾分鐘該端口。

socket客戶端實例

'''
 服務器的實現 採用select的方式
'''
import select
import socket
import sys
import queue

#建立套接字並設置該套接字爲非阻塞模式

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.setblocking(0)

#綁定套接字
server_address = ('localhost',10000)
print('starting up on %s port %s'% server_address)
server.bind(server_address)

#將該socket變成服務模式
#backlog等於5,表示內核已經接到了鏈接請求,但服務器尚未調用accept進行處理的鏈接個數最大爲5
#這個值不能無限大,由於要在內核中維護鏈接隊列

server.listen(5)

#初始化讀取數據的監聽列表,最開始時但願從server這個套接字上讀取數據
inputs = [server]

#初始化寫入數據的監聽列表,最開始並無客戶端鏈接進來,因此列表爲空

outputs = []

#要發往客戶端的數據
message_queues = {}
while inputs:
    print('waiting for the next event')
    #調用select監聽全部監聽列表中的套接字,並將準備好的套接字加入到對應的列表中
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    #監控文件句柄有某一處發生了變化 可寫 可讀  異常屬於Linux中的網絡編程 
    #屬於同步I/O操做,屬於I/O複用模型的一種
    #rlist--等待到準備好讀
    #wlist--等待到準備好寫
    #xlist--等待到一種異常
    #處理可讀取的套接字

    '''
        若是server這個套接字可讀,則說明有新連接到來
        此時在server套接字上調用accept,生成一個與客戶端通信的套接字
        並將與客戶端通信的套接字加入inputs列表,下一次能夠經過select檢查鏈接是否可讀
        而後在發往客戶端的緩衝中加入一項,鍵名爲:與客戶端通信的套接字,鍵值爲空隊列
        select系統調用是用來讓咱們的程序監視多個文件句柄(file descrīptor)的狀態變化的。程序會停在select這裏等待,
        直到被監視的文件句柄有某一個或多個發生了狀態改變
        '''

    '''
        若可讀的套接字不是server套接字,有兩種狀況:一種是有數據到來,另外一種是連接斷開
        若是有數據到來,先接收數據,而後將收到的數據填入往客戶端的緩存區中的對應位置,最後
        將於客戶端通信的套接字加入到寫數據的監聽列表:
        若是套接字可讀.但沒有接收到數據,則說明客戶端已經斷開。這時須要關閉與客戶端鏈接的套接字
        進行資源清理
        '''
        
    for s in readable: 
        if s is server:
            connection,client_address = s.accept()
            print('connection from',client_address)
            connection.setblocking(0)#設置非阻塞
            inputs.append(connection)
            message_queues[connection] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                print('received "%s" from %s'% (data,s.getpeername()))
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                print('closing',client_address)
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
                    
    #處理可寫的套接字
    '''
        在發送緩衝區中取出響應的數據,發往客戶端。
        若是沒有數據須要寫,則將套接字從發送隊列中移除,select中再也不監視
        '''

    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print('  ',s.getpeername(),'queue empty')
            outputs.remove(s)
        else:
            print('sending "%s" to %s'%(next_msg,s.getpeername()))
            s.send(next_msg)



    #處理異常狀況

    for s in exceptional:
        for s in exceptional:
            print('exception condition on',s.getpeername())
            inputs.remove(s)
            if s in outputs:
                outputs.remove(s)
            s.close()
            del message_queues[s]

socket client 端實例:

import socket
import sys
import select

ip_port = ('127.0.0.1',10000)
sk = socket.socket()
sk.connect(ip_port)

rlist = [sys.stdin, sk]

while True:
    read_list, write_list, error_list = select.select(rlist , [], [])
    for sock in read_list:
        #incoming message from remote server
        if sock == sk:
            data = sock.recv(4096).decode("utf-8")
            if not data:
                print('\nDisconnected from chat server')
                sys.exit()
            else :
                #print data
                print("server_message:",data)
            
        #user entered a message
        elif sys.stdin in read_list :
            msg = sys.stdin.readline()
            sk.send(msg.encode("utf-8"))
sk.close()

 

實現socket異步聊天客戶端和服務端

server:

# Tcp Chat server
 
import socket, select
 
def broadcast_data (sock, message):
    # 用於發送消息的函數
    message = message.encode("utf-8")
    for socket in CONNECTION_LIST:
        if socket != server_socket and socket != sock :
            try :
                socket.send(message)
            except :
                # 若是出現錯誤則斷開鏈接並從鏈接池中刪除
                socket.close()
                CONNECTION_LIST.remove(socket)
 
if __name__ == "__main__":
    CONNECTION_LIST = []
    RECV_BUFFER = 4096
    PORT = 8002
    HOST = '127.0.0.1'
     
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((HOST, PORT))
    server_socket.listen(10)
    CONNECTION_LIST.append(server_socket)
 
    print("Chat server started on port " + str(PORT))
 
    while True:
        read_sockets, write_sockets, error_sockets = select.select(CONNECTION_LIST, [], [])
        for sock in read_sockets:
            #New connection
            if sock == server_socket:
                sockfd, addr = server_socket.accept()
                CONNECTION_LIST.append(sockfd)
                print("Client (%s, %s) connected" % addr)
                broadcast_data(sockfd, "[%s:%s] entered room\n" % addr)
             
            else:
                try:
                    data = sock.recv(RECV_BUFFER)
                    data = data.decode("utf-8")
                    if data:
                        broadcast_data(sock, "\r" + '<' + str(sock.getpeername()) + '> ' + data)                
                 
                except:
                    broadcast_data(sock, "Client (%s, %s) is offline" % addr)
                    print("Client (%s, %s) is offline" % addr)
                    sock.close()
                    CONNECTION_LIST.remove(sock)
                    continue
     
    server_socket.close()

 

client:

# Tcp Chat Client

import socket, select, string, sys
 
def prompt() :
    print('<You>', end="")
    sys.stdout.flush()
 
if __name__ == "__main__":
     
    host = '127.0.0.1'
    port = 8002
     
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(2)
     
    try :
        s.connect((host, port))
    except Exception as e:
        print('Unable to connect:', e)
        sys.exit()
     
    print('Connected to remote host. Start sending messages')
    prompt()
    rlist = [sys.stdin, s]
    while True:
        read_list, write_list, error_list = select.select(rlist , [], [])
         
        for sock in read_list:
            if sock == s:
                data = sock.recv(4096)
                if not data :
                    print('\nDisconnected from chat server')
                    sys.exit()
                else :
                    print(data.decode("utf-8"), end="")
                    prompt()
             
            else :
                msg = sys.stdin.readline()
                msg = msg.encode("utf-8")
                s.send(msg)
                prompt()
相關文章
相關標籤/搜索