python--(協程 和 I/O多路複用)

python--(協程 和 I/O多路複用)python

一.協程linux

1.  >>>單線程下實現併發, 最大化線程的效率, 檢測 IO 並自動切換,程序級別的任務切換, 以前多進程多線程都是系統級別的切換, 程序級別的切換比系統要快的多.編程

 

#協程:單線程下實現併發
#併發:僞並行,遇到IO就切換,單核下多個任務之間切換執行,給你的效果就是貌似你的幾個程序在同時執行.提升效率  #多線程多進程下的任務切換+保存狀態是操做系統
#任務切換 + 保存狀態
#並行:多核cpu,真正的同時執行
#串行:一個任務執行完在執行另一個任務
windows

 

# 串行
# import time
#
# def func1():
#     time.sleep(1)
#     print('func1')
#
# def func2():
#     time.sleep(2)
#     print('func2')
#
# if __name__ == '__main__':
#     func1()
#     func2()
串行

 

#基於yield併發執行,多任務之間來回切換,這就是個簡單的協程的體現,可是他不能節省I/O時間.服務器

import time
def consumer():
    '''任務1:接收數據,處理數據'''
    while True:
        x=yield
        # time.sleep(1) #發現什麼?只是進行了切換,可是並無節省I/O時間
        print('處理了數據:',x)
def producer():
    '''任務2:生產數據'''
    g=consumer()
    # print('asdfasfasdf')
    next(g)  #找到了consumer函數的yield位置
    for i in range(3):
    # for i in range(10000000):
        g.send(i)  #給yield傳值,而後再循環給下一個yield傳值,而且多了切換的程序,比直接串行執行還多了一些步驟,致使執行效率反而更低了。
        print('發送了數據:',i)
start=time.time()
#基於yield保存狀態,實現兩個任務直接來回切換,即併發的效果
#PS:若是每一個任務中都加上打印,那麼明顯地看到兩個任務的打印是你一次我一次,即併發執行的.
producer() #我在當前線程中只執行了這個函數,可是經過這個函數裏面的send切換了另一個任務
stop=time.time()

# 串行執行的方式
res=producer()
consumer(res)
stop=time.time()

print(stop-start)
View Code
import time
# def consumer():
#     for i in range(10):
#         # x=yield
#         time.sleep(1)
#         print('處理了數據:',i)
# def producer():
#     g=consumer()
#     next(g)
#     for i in range(3):
#         g.send(i)
#         print('發送了數據:',i)
# #
# start=time.time()
# producer()
# stop=time.time()
# print(stop-start)


# import time
# def consumer():
#     for i in range(4):
#         time.sleep(1)
#
#         print('處理了數據:',i)
# def producer():
#     for i in range(3):
#         print('發送了數據:',i)
#
# start=time.time()
# consumer() #3.00097393989563
# producer()
# stop=time.time()
# print('>>>>>',stop-start)

import time
def consumer():
    for i in range(4):
        x = yield
        time.sleep(1)
        print('處理了數據:',i)
def producer():
    g = consumer()
    next(g)
    for i in range(3):
        g.send(i)
        print('發送了數據:',i)

# greenlet


# start=time.time()
# producer()
# stop=time.time()
# print(stop-start)
經過生成器實現單線程下的併發

 


2.  Greenlet:  多線程

#安裝:  pip3 install greenlet  併發

>>>任務切換 + 保存狀態,沒有實現IO自動切換,app

>>>greenlet只是提供了一種比 generator 更加便捷的切換方式, 當切到一個任務時若是遇到io, 那就原地阻塞, 仍然是沒有解決遇到IO自動切換提高效率的問題.異步

import gevent
from gevent import monkey
monkey.patch_all()
import time

def eat(name):
    print('%s eat 1' %name)
    # gevent.sleep(2)
    time.sleep(2)

    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    # gevent.sleep(2)
    time.sleep(2)
    print('%s play 2' %name)

g1=gevent.spawn(eat,'egon') #異步執行這個eat任務,後面egon就是給他傳的參數
g2=gevent.spawn(play,name='egon')
# g1.join()
# g2.join()
gevent.joinall([g1,g2])

print('')
gevent的使用

 

3.Gevent  socket

#安裝:  pip3 install gevent

>>>任務切換 + 保存狀態,實現了IO自動切換,而且經過monkey 可以識別到基本上全部的IO操做.

>>>Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

1 = gevent.spawn(func, 1, 2, 3, x=4, y=5)
#建立一個協程對象g1, spawn口號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置
實參或關鍵字實參,都是傳給函數eat的
g2 = gevent.spawn(func2)

g1.join()   #等待g1結束,上面只是建立協程對象,這個join纔是去執行
g2.join() #等待g2結束  有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,可是你會發現,若是g2裏面的任務執行的時間長,可是不寫join的話,就不會執行完等到g2剩下的任務了
#或者上述兩步合做一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

 

import gevent
from gevent import monkey
monkey.patch_all()
import time

def eat(name):
    print('%s eat 1' %name)
    # gevent.sleep(2)
    time.sleep(2)

    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    # gevent.sleep(2)
    time.sleep(2)
    print('%s play 2' %name)

g1=gevent.spawn(eat,'egon') #異步執行這個eat任務,後面egon就是給他傳的參數
g2=gevent.spawn(play,name='egon')

# g1.join()
# g2.join()
gevent.joinall([g1,g2])

print('')
gevent用法

 

 

二.I/O模型介紹

對於network IO 他會涉及兩個系統對象: 
# 1.調用IO的Process or thread
# 2.系統內核(kernel)
當一個read/recv讀數據的操做發生時,該操做會經歷兩個階段:
# 1)等待數據準備 (Waiting for the data to be ready)
# 2)將數據從內核拷貝到進程中(Copying the data from the kernel to the process)
補充:
# 1.輸入操做:read、readv、recv、recvfrom、recvmsg共5個函數,若是會阻塞狀態,則會經歷 # wait data和copy data兩個階段,若是設置爲非阻塞則在wait 不到data時拋出異常
#二、輸出操做:write、writev、send、sendto、sendmsg共5個函數,在發送緩衝區滿了會阻塞在原地,若是設置爲非阻塞,則會拋出異常
#三、接收外來連接:accept,與輸入操做相似
#四、發起外出連接:connect,與輸出操做相似

 

1.阻塞IO模型  blocking

 

IO 

 

 

2.非阻塞IO模型

>>>徹底沒有阻塞,不推薦使用

import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) print('你看看卡在哪') server.setblocking(False) rlist = [] rl = [] while 1: try: conn, addr = server.accept() print(addr) rlist.append(conn) print('來自%s:%s的連接請求'%(addr[0],addr[1])) except BlockingIOError: print('去買點藥') # time.sleep(0.1) print('rlist',rlist,len(rlist)) for con in rlist: try: from_client_msg = con.recv(1024) except BlockingIOError: continue except ConnectionResetError: con.close() rl.append(con) print('>>>>',rl) for remove_con in rl: rlist.remove(remove_con) rl.clear()
import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) print('你看看卡在哪') server.setblocking(False) while 1: try: conn, addr = server.accept() print('來自%s的連接請求'%addr) except BlockingIOError: print('去買點藥') time.sleep(0.1)
非阻塞IO
import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) print('你看看卡在哪') server.setblocking(False) rlist = [] rl = [] while 1: try: conn, addr = server.accept() print(addr) rlist.append(conn) print('來自%s:%s的連接請求'%(addr[0],addr[1])) except BlockingIOError: print('去買點藥') # time.sleep(0.1) print('rlist',rlist,len(rlist)) for con in rlist: try: from_client_msg = con.recv(1024) except BlockingIOError: continue except ConnectionResetError: con.close() rl.append(con) print('>>>>',rl) for remove_con in rl: rlist.remove(remove_con) rl.clear()
非阻塞IO的socket服務端

 

import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) print('你看看卡在哪') server.setblocking(False) rlist = [] rl = [] while 1: try: conn, addr = server.accept() print(addr) rlist.append(conn) print('來自%s:%s的連接請求'%(addr[0],addr[1])) except BlockingIOError: print('去買點藥') # time.sleep(0.1) print('rlist',rlist,len(rlist)) for con in rlist: try: from_client_msg = con.recv(1024) except BlockingIOError: continue except ConnectionResetError: con.close() rl.append(con) print('>>>>',rl) for remove_con in rl: rlist.remove(remove_con) rl.clear()
非阻塞塞IO的socket服務端
# 服務端
import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)

server.setblocking(False) #設置不阻塞
r_list=[]  #用來存儲全部來請求server端的conn鏈接
w_list={}  #用來存儲全部已經有了請求數據的conn的請求數據

while 1:
    try:
        conn,addr=server.accept() #不阻塞,會報錯
        r_list.append(conn)  #爲了將鏈接保存起來,否則下次循環的時候,上一次的鏈接就沒有了
    except BlockingIOError:
        # 強調強調強調:!!!非阻塞IO的精髓在於徹底沒有阻塞!!!
        # time.sleep(0.5) # 打開該行註釋純屬爲了方便查看效果
        print('在作其餘的事情')
        print('rlist: ',len(r_list))
        print('wlist: ',len(w_list))


        # 遍歷讀列表,依次取出套接字讀取內容
        del_rlist=[] #用來存儲刪除的conn鏈接
        for conn in r_list:
            try:
                data=conn.recv(1024) #不阻塞,會報錯
                if not data: #當一個客戶端暴力關閉的時候,會一直接收b'',別忘了判斷一下數據
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list[conn]=data.upper()
            except BlockingIOError: # 沒有收成功,則繼續檢索下一個套接字的接收
                continue
            except ConnectionResetError: # 當前套接字出異常,則關閉,而後加入刪除列表,等待被清除
                conn.close()
                del_rlist.append(conn)


        # 遍歷寫列表,依次取出套接字發送內容
        del_wlist=[]
        for conn,data in w_list.items():
            try:
                conn.send(data)
                del_wlist.append(conn)
            except BlockingIOError:
                continue


        # 清理無用的套接字,無需再監聽它們的IO操做
        for conn in del_rlist:
            r_list.remove(conn)
        #del_rlist.clear() #清空列表中保存的已經刪除的內容
        for conn in del_wlist:
            w_list.pop(conn)
        #del_wlist.clear()

#客戶端
import socket
import os
import time
import threading
client=socket.socket()
client.connect(('127.0.0.1',8083))

while 1:
    res=('%s hello' %os.getpid()).encode('utf-8')
    client.send(res)
    data=client.recv(1024)

    print(data.decode('utf-8'))


##多線程的客戶端請求版本
# def func():
#     sk = socket.socket()
#     sk.connect(('127.0.0.1',9000))
#     sk.send(b'hello')
#     time.sleep(1)
#     print(sk.recv(1024))
#     sk.close()
#
# for i in range(20):
#     threading.Thread(target=func).start()
非阻塞IO模型
import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) server.setblocking(False) #設置不阻塞 r_list=[] #用來存儲全部來請求server端的conn鏈接 w_list={} #用來存儲全部已經有了請求數據的conn的請求數據 while 1: try: conn,addr=server.accept() #不阻塞,會報錯 r_list.append(conn) #爲了將鏈接保存起來,否則下次循環的時候,上一次的鏈接就沒有了 except BlockingIOError: # 強調強調強調:!!!非阻塞IO的精髓在於徹底沒有阻塞!!! # time.sleep(0.5) # 打開該行註釋純屬爲了方便查看效果 print('在作其餘的事情') # print('rlist: ',len(r_list)) # print('wlist: ',len(w_list)) # 遍歷讀列表,依次取出套接字讀取內容 del_rlist=[] #用來存儲刪除的conn鏈接 for conn in r_list: try: data=conn.recv(1024) #不阻塞,會報錯 if not data: #當一個客戶端暴力關閉的時候,會一直接收b'',別忘了判斷一下數據  conn.close() del_rlist.append(conn) continue w_list[conn]=data.upper() except BlockingIOError: # 沒有收成功,則繼續檢索下一個套接字的接收 continue except ConnectionResetError: # 當前套接字出異常,則關閉,而後加入刪除列表,等待被清除  conn.close() del_rlist.append(conn) # 遍歷寫列表,依次取出套接字發送內容 del_wlist=[] for conn,data in w_list.items(): try: conn.send(data) del_wlist.append(conn) except BlockingIOError: continue # 清理無用的套接字,無需再監聽它們的IO操做 for conn in del_rlist: r_list.remove(conn) #del_rlist.clear() #清空列表中保存的已經刪除的內容 for conn in del_wlist: w_list.pop(conn) #del_wlist.clear()
完整的IO非阻塞模板

雖然咱們上面的代碼經過設置非阻塞,規避了IO操做,可是非阻塞IO模型毫不被推薦。

  咱們不可否則其優勢:可以在等待任務完成的時間裏幹其餘活了(包括提交其餘任務,也就是 「後臺」 能夠有多個任務在「」同時「」執行)。

  可是也難掩其缺點:

#1. 循環調用recv()將大幅度推高CPU佔用率;這也是咱們在代碼中留一句time.sleep(2)的緣由,不然在低配主機下極容易出現卡機狀況 #2. 任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。

 

3.IO多路複用:三種機制

  Select: 代理監聽全部的須要使用的對象,輪訓本身監聽的那個列表.windows  linux

  Poll:  沒有監聽數量的限制  linux

  Epoll:  回調機制  Linux

  Seletor:  根據系統自動選擇一個最優的機制

 

python中的select模塊:

複製代碼
import select fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout]) 參數: 可接受四個參數(前三個必須) rlist: wait until ready for reading #等待讀的對象,你須要監聽的須要獲取數據的對象列表 wlist: wait until ready for writing #等待寫的對象,你須要寫一些內容的時候,input等等,也就是說我會循環他看看是否有須要發送的消息,若是有我取出這個對象的消息併發送出去,通常用不到,這裏咱們也給一個[]。 xlist: wait for an 「exceptional condition」 #等待異常的對象,一些額外的狀況,通常用不到,可是必須傳,那麼咱們就給他一個[]。  timeout: 超時時間 當超時時間 = n(正整數)時,那麼若是監聽的句柄均無任何變化,則select會阻塞n秒,以後返回三個空列表,若是監聽的句柄有變化,則直接執行。 返回值:三個列表與上面的三個參數列表是對應的   select方法用來監視文件描述符(當文件描述符條件不知足時,select會阻塞),當某個文件描述符狀態改變後,會返回三個列表 1、當參數1 序列中的fd知足「可讀」條件時,則獲取發生變化的fd並添加到fd_r_list中 2、當參數2 序列中含有fd時,則將該序列中全部的fd添加到 fd_w_list中 3、當參數3 序列中的fd發生錯誤時,則將該發生錯誤的fd添加到 fd_e_list中 四、當超時時間爲空,則select會一直阻塞,直到監聽的句柄發生變化

 

 

  結論: select的優點在於能夠處理多個鏈接,不適用於單個鏈接  

#服務端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
# 設置爲非阻塞
server.setblocking(False)

# 初始化將服務端socket對象加入監聽列表,後面還要動態添加一些conn鏈接對象,當accept的時候sk就有感應,當recv的時候conn就有動靜
rlist=[server,]
rdata = {}  #存放客戶端發送過來的消息

wlist=[]  #等待寫對象
wdata={}  #存放要返回給客戶端的消息

print('預備!監聽!!!')
count = 0 #寫着計數用的,爲了看實驗效果用的,沒用
while True:
    # 開始 select 監聽,對rlist中的服務端server進行監聽,select函數阻塞進程,直到rlist中的套接字被觸發(在此例中,套接字接收到客戶端發來的握手信號,從而變得可讀,知足select函數的「可讀」條件),被觸發的(有動靜的)套接字(服務器套接字)返回給了rl這個返回值裏面;
    rl,wl,xl=select.select(rlist,wlist,[],0.5)
    print('%s 次數>>'%(count),wl)
    count = count + 1
    # 對rl進行循環判斷是否有客戶端鏈接進來,當有客戶端鏈接進來時select將觸發
    for sock in rl:
        # 判斷當前觸發的是否是socket對象, 當觸發的對象是socket對象時,說明有新客戶端accept鏈接進來了
        if sock == server:
            # 接收客戶端的鏈接, 獲取客戶端對象和客戶端地址信息
            conn,addr=sock.accept()
            #把新的客戶端鏈接加入到監聽列表中,當客戶端的鏈接有接收消息的時候,select將被觸發,會知道這個鏈接有動靜,有消息,那麼返回給rl這個返回值列表裏面。
            rlist.append(conn)
        else:
            # 因爲客戶端鏈接進來時socket接收客戶端鏈接請求,將客戶端鏈接加入到了監聽列表中(rlist),客戶端發送消息的時候這個鏈接將觸發
            # 因此判斷是不是客戶端鏈接對象觸發
            try:
                data=sock.recv(1024)
                #沒有數據的時候,咱們將這個鏈接關閉掉,並從監聽列表中移除
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                print("received {0} from client {1}".format(data.decode(), sock))
                #將接受到的客戶端的消息保存下來
                rdata[sock] = data.decode()

                #將客戶端鏈接對象和這個對象接收到的消息加工成返回消息,並添加到wdata這個字典裏面
                wdata[sock]=data.upper()
                #須要給這個客戶端回覆消息的時候,咱們將這個鏈接添加到wlist寫監聽列表中
                wlist.append(sock)
            #若是這個鏈接出錯了,客戶端暴力斷開了(注意,我尚未接收他的消息,或者接收他的消息的過程當中出錯了)
            except Exception:
                #關閉這個鏈接
                sock.close()
                #在監聽列表中將他移除,由於無論什麼緣由,它畢竟是斷開了,不必再監聽它了
                rlist.remove(sock)
    # 若是如今沒有客戶端請求鏈接,也沒有客戶端發送消息時,開始對發送消息列表進行處理,是否須要發送消息
    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)

    # #將一次select監聽列表中有接收數據的conn對象所接收到的消息打印一下
    # for k,v in rdata.items():
    #     print(k,'發來的消息是:',v)
    # #清空接收到的消息
    # rdata.clear()

---------------------------------------
#客戶端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8093))


while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()
select運用IO多路複用

 

 


4.異步IO操做

用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel操做系統會等待數據(阻塞)準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。

相關文章
相關標籤/搜索