python3.x Day6 IO多路複用

IO多路複用
import asyncio 這個是異步IO模塊 這個還不知道怎麼用

select poll epoll 都是IO多路複用

windows 僅支持select

linux2.6之後 支持epoll epoll是至關厲害的

詳細的描述參考:http://www.cnblogs.com/alex3714/articles/5876749.html

select、poll、epoll區別:http://www.cnblogs.com/alex3714/p/4372426.html

先來寫一個用select方式的socket模型
import select #引入select模塊,這是利用操做系統的select處理方式,windows、linux、unix都支持
import socket
import sys
import queue


server = socket.socket()
server.setblocking(0)

server_addr = ('0.0.0.0',9999)

print('starting up on %s port %s' % server_addr)
server.bind(server_addr)

server.listen(5)


inputs = [server, ] #本身也要監測呀,由於server自己也是個fd
outputs = []

message_queues = {}

while True:
    print("waiting for next event...")

    readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏

    for s in readable: #每一個s就是一個socket

        if s is server: #別忘記,上面咱們server本身也當作一個fd放在了inputs列表裏,傳給了select,若是這個s是server,表明server這個fd就緒了,
            #就是有活動了, 什麼狀況下它纔有活動? 固然 是有新鏈接進來的時候 呀
            #新鏈接進來了,接受這個鏈接
            conn, client_addr = s.accept()
            print("new connection from",client_addr)
            conn.setblocking(0)
            inputs.append(conn) #爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接
            #就會被交給select去監聽,若是這個鏈接的客戶端發來了數據 ,那這個鏈接的fd在server端就會變成就續的,select就會把這個鏈接返回,返回到
            #readable 列表裏,而後你就能夠loop readable列表,取出這個鏈接,開始接收數據了, 下面就是這麼幹 的

            message_queues[conn] = queue.Queue() #接收到客戶端的數據後,不馬上返回 ,暫存在隊列裏,之後發送

        else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了
            #客戶端的數據過來了,在這接收
            try:
                data = s.recv(1024)
            except ConnectionResetError as e:
                data=None
                print(e)
                print("客戶端斷開了", s)
                if s in outputs:
                    outputs.remove(s)  # 清理已斷開的鏈接
                inputs.remove(s)  # 清理已斷開的鏈接
                del message_queues[s]  ##清理已斷開的鏈接
            if data:
                print("收到來自[%s]的數據:" % s.getpeername()[0], data)
                message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端
                if s not  in outputs:
                    outputs.append(s) #爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端


            else:#若是收不到data表明什麼呢? 表明客戶端斷開了呀
                print("客戶端斷開了",s)

                if s in outputs:
                    outputs.remove(s) #清理已斷開的鏈接
                if s in inputs:
                    inputs.remove(s) #清理已斷開的鏈接
                if s in message_queues.keys():
                    del message_queues[s] ##清理已斷開的鏈接


    for s in writeable:
        try :
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print("client [%s]" %s.getpeername()[0], "queue is empty..")
            outputs.remove(s)

        else:
            print("sending msg to [%s]"%s.getpeername()[0], next_msg)
            s.send(next_msg.upper())


    for s in exeptional:
        print("handling exception for ",s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        del message_queues[s]

再來一個升級版本的,selectors模塊,它底層是看操做系統的,默認是epoll,可是若是不支持,好比windows,linux kernel < 2.6,就用select模式html

import selectors,socket #selectors 超牛的IO多路複用的模塊,支持select poll epoll ,優先epoll

sel=selectors.DefaultSelector()

def accpect(sock,mask):  #建立業務鏈接的方法
    conn,addr=sock.accept()
    print("accept:",conn,"from:",addr)
    conn.setblocking(False)
    sel.register(fileobj=conn,events=selectors.EVENT_READ,data=read)

def read(conn,mask):    #鏈接之後 執行業務的方法
    data=conn.recv(1024)
    if data:
        print("recv data:",data,"conn:",conn)
        # print(conn.getsockname())
        conn.send(data)
    else:
        print("close:",conn)
        sel.unregister(fileobj=conn)
        conn.close()

server=socket.socket()
server.bind(("0.0.0.0",9999))
server.listen(10000)
server.setblocking(False)


sel.register(fileobj=server,events=selectors.EVENT_READ,data=accpect) #只須要註冊須要併發使用IO的應用,明確對應的回調data內容就好了


while True:
    events=sel.select()
    print("已經註冊sel數量:")
    for key,mask in events:
        callback=key.data #獲取註冊時的sel.register()方法參數中的data對應的內容,以前放進去的有accpect和read函數
        callback(key.fileobj,mask)

這裏注意,有個很是很是尷尬的問題情景:linux

需求:windows

一、基於socket多線程

二、要多進程處理業務(多線程因爲GIL鎖,多核也沒法真正同時刻處理,因此用多進程)併發

三、進程內必須是selectors處理鏈接app

此時,就很是尷尬了,緣由是多進程multiprocessing模塊,IO多路複用selectors模塊,他倆有矛盾:異步

矛盾就是:socket

multiprocessing就是認爲socket是阻塞的,async

selectors要求socket必須是非阻塞的,否則沒辦法監聽活動。函數

那麼socket怎麼辦呢,這個問題,在windows還好,由於select方式仍是慢,顯不出來,linux epoll超快的,直接就嚴重影響業務了

我是這麼解決這個矛盾的

須要屢次接收或發送 或者 循環發送循環接收的業務情景,selectors監聽到後,步驟:

一、從selectors實例中註銷掉這個業務鏈接,

二、設定這個業務鏈接爲阻塞,

三、開始進行上述情景業務,

四、再設定爲非阻塞,

五、從新註冊回selectors

相關文章
相關標籤/搜索