Day038--Python--Gevent , IO多路複用

1. 協程: html

  gevent  (遇到IO自動切換)python

import gevent
import time
from gevent import monkey; monkey.patch_all()  # ;至關於換行

def eat(name):
    print('%s eat 1' % name)
    # gevent.sleep(1)
    time.sleep(2)    # gevent 不能識別time.sleep, from gevent import monkey; monkey.patch_all()可解決這個問題, 以後就可使用time.sleep()了
    print('%s eat 2' % name)

def play(name):
    print('%s play 1' % name)
    # gevent.sleep(1)
    time.sleep(2)
    print('%s play 2' % name)

g1 = gevent.spawn(eat, 'alex')
g2 = gevent.spawn(play, name='sylar')
# g1.join()
# g2.join()

gevent.joinall([g1, g2])
print('')

 

  gevent 之同步與異步git

from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == '__main__':
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()
View Code

 

 

  gevent 應用列舉:github

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('協程時間>>> %s' %(stop_time-start_time))

# 協程應用:爬蟲
print('--------------------------------')
s = time.time()
requests.get('https://www.python.org/')
requests.get('https://www.yahoo.com/')
requests.get('https://github.com/')
t = time.time()
print('串行時間>>',t-s)
View Code 協程應用: 爬蟲

  

 

參考: https://www.cnblogs.com/clschao/articles/9712056.html#_label4windows

 

 

 

 

阻塞 IO服務器

 

 

 

import socket
import time

server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8083))
server.listen(5)
print('你看看卡在哪')
while 1:
    conn, addr = server.accept()    # 等待客戶端鏈接, 阻塞住
    print('來自%s的連接請求' % addr)
    time.sleep(0.1)
View Code 阻塞IO

 

非阻塞 IO模型 網絡

 

import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)
print('你看看卡在哪')
server.setblocking(False)   # 再也不阻塞等待
while 1:
    try:
        conn, addr = server.accept()
        print('來自%s的連接請求'%addr)
    except BlockingIOError:
        print('去買點藥')
    time.sleep(0.1)

 

 

 

import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)
print('你看看卡在哪')
server.setblocking(False)
rlist = []
rl = []
while 1:
    try:
        conn, addr = server.accept()
        print(addr)
        rlist.append(conn)
        print('來自%s:%s的連接請求'%(addr[0],addr[1]))
    except BlockingIOError:
        print('去買點藥')

    time.sleep(0.1)   # 防止死循環一直高度佔用CPU
    print('rlist',rlist,len(rlist))
    for con in rlist:
        try:
            from_client_msg = con.recv(1024)
        except BlockingIOError:
            continue
        except ConnectionResetError:
            con.close()
            rl.append(con)
    print('>>>>',rl)
    for remove_con in rl:
        rlist.remove(remove_con)
    rl.clear()
View Code 阻塞IO的socket服務端
import socket
import time

ip_port = ('127.0.0.1',8083)

client = socket.socket()

client.connect(ip_port)

while 1:

    client.send(b'dayangge henweisuo ')
    time.sleep(0.1)
View Code 阻塞IO的socket客戶端

 

 

# 服務端
import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)

server.setblocking(False) #設置不阻塞
r_list=[]  #用來存儲全部來請求server端的conn鏈接
w_list={}  #用來存儲全部已經有了請求數據的conn的請求數據

while 1:
    try:
        conn,addr=server.accept() #不阻塞,會報錯
        r_list.append(conn)  #爲了將鏈接保存起來,否則下次循環的時候,上一次的鏈接就沒有了
    except BlockingIOError:
        # 強調強調強調:!!!非阻塞IO的精髓在於徹底沒有阻塞!!!
        # time.sleep(0.5) # 打開該行註釋純屬爲了方便查看效果
        print('在作其餘的事情')
        print('rlist: ',len(r_list))
        print('wlist: ',len(w_list))


        # 遍歷讀列表,依次取出套接字讀取內容
        del_rlist=[] #用來存儲刪除的conn鏈接
        for conn in r_list:
            try:
                data=conn.recv(1024) #不阻塞,會報錯
                if not data: #當一個客戶端暴力關閉的時候,會一直接收b'',別忘了判斷一下數據
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list[conn]=data.upper()
            except BlockingIOError: # 沒有收成功,則繼續檢索下一個套接字的接收
                continue
            except ConnectionResetError: # 當前套接字出異常,則關閉,而後加入刪除列表,等待被清除
                conn.close()
                del_rlist.append(conn)


        # 遍歷寫列表,依次取出套接字發送內容
        print('wlist: ', len(w_list))
        del_wlist=[]
        for conn,data in w_list.items():
            try:
                conn.send(data)
                del_wlist.append(conn)
            except BlockingIOError:
                continue


        # 清理無用的套接字,無需再監聽它們的IO操做
        for conn in del_rlist:
            r_list.remove(conn)
        #del_rlist.clear() #清空列表中保存的已經刪除的內容
        for conn in del_wlist:
            w_list.pop(conn)
        #del_wlist.clear()
View Code 完整版的非阻塞IO服務端
import socket
import os
import time
import threading
client=socket.socket()
client.connect(('127.0.0.1',8083))

while 1:
    res=('%s hello' %os.getpid()).encode('utf-8')
    client.send(res)
    data=client.recv(1024)

    print(data.decode('utf-8'))


##多線程的客戶端請求版本
# def func():
#     sk = socket.socket()
#     sk.connect(('127.0.0.1',9000))
#     sk.send(b'hello')
#     time.sleep(1)
#     print(sk.recv(1024))
#     sk.close()
#
# for i in range(20):
#     threading.Thread(target=func).start()
View Code 完整版的非阻塞IO客戶端

 

 

 

select IO多路複用 (重點)多線程

當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。  這個圖和blockingIO的圖其實並無太大的不一樣,事實上還更差一些。由於它不只阻塞了還多須要使用兩個系統調用(select和recvfrom),而blockingIO只調用了一個系統調用(recvfrom),當只有一個鏈接請求的時候,這個模型還不如阻塞IO效率高。可是,用select的優點在於它能夠同時處理多個connection,而阻塞IO那裏不能,我無論阻塞不阻塞,你全部的鏈接包括recv等操做,我都幫你監聽着(以什麼形式監聽的呢?先不要考慮,下面會講的~~),其中任何一個有變更(有連接,有數據),我就告訴你用戶,那麼你就能夠去調用這個數據了,這就是他的NB之處。這個IO多路複用模型機制是操做系統幫咱們提供的,在windows上有這麼個機制叫作select,那麼若是咱們想經過本身寫代碼來控制這個機制或者本身寫這麼個機制,咱們可使用python中的select模塊來完成上面這一系列代理的行爲。在一切皆文件的unix下,這些能夠接收數據的對象或者鏈接,都叫作文件描述符fd
IO多路複用
import select

fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])

參數: 可接受四個參數(前三個必須)
    rlist: wait until ready for reading  #等待讀的對象,你須要監聽的須要獲取數據的對象列表
    wlist: wait until ready for writing  #等待寫的對象,你須要寫一些內容的時候,input等等,也就是說我會循環他看看是否有須要發送的消息,若是有我取出這個對象的消息併發送出去,通常用不到,這裏咱們也給一個[]。
    xlist: wait for an 「exceptional condition」  #等待異常的對象,一些額外的狀況,通常用不到,可是必須傳,那麼咱們就給他一個[]。
    timeout: 超時時間
    當超時時間 = n(正整數)時,那麼若是監聽的句柄均無任何變化,則select會阻塞n秒,以後返回三個空列表,若是監聽的句柄有變化,則直接執行。
返回值:三個列表與上面的三個參數列表是對應的
  select方法用來監視文件描述符(當文件描述符條件不知足時,select會阻塞),當某個文件描述符狀態改變後,會返回三個列表
    1、當參數1 序列中的fd知足「可讀」條件時,則獲取發生變化的fd並添加到fd_r_list中
    2、當參數2 序列中含有fd時,則將該序列中全部的fd添加到 fd_w_list中
    3、當參數3 序列中的fd發生錯誤時,則將該發生錯誤的fd添加到 fd_e_list中
    四、當超時時間爲空,則select會一直阻塞,直到監聽的句柄發生變化
View Code介紹
#服務端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
# 設置爲非阻塞
server.setblocking(False)

# 初始化將服務端socket對象加入監聽列表,後面還要動態添加一些conn鏈接對象,當accept的時候sk就有感應,當recv的時候conn就有動靜
rlist=[server,]
rdata = {}  #存放客戶端發送過來的消息

wlist=[]  #等待寫對象
wdata={}  #存放要返回給客戶端的消息

print('預備!監聽!!!')
count = 0 #寫着計數用的,爲了看實驗效果用的,沒用
while True:
    # 開始 select 監聽,對rlist中的服務端server進行監聽,select函數阻塞進程,直到rlist中的套接字被觸發(在此例中,套接字接收到客戶端發來的握手信號,從而變得可讀,知足select函數的「可讀」條件),被觸發的(有動靜的)套接字(服務器套接字)返回給了rl這個返回值裏面;
    rl,wl,xl=select.select(rlist,wlist,[],0.5)
    print('%s 次數>>'%(count),wl)
    count = count + 1
    # 對rl進行循環判斷是否有客戶端鏈接進來,當有客戶端鏈接進來時select將觸發
    for sock in rl:
        # 判斷當前觸發的是否是socket對象, 當觸發的對象是socket對象時,說明有新客戶端accept鏈接進來了
        if sock == server:
            # 接收客戶端的鏈接, 獲取客戶端對象和客戶端地址信息
            conn,addr=sock.accept()
            #把新的客戶端鏈接加入到監聽列表中,當客戶端的鏈接有接收消息的時候,select將被觸發,會知道這個鏈接有動靜,有消息,那麼返回給rl這個返回值列表裏面。
            rlist.append(conn)
        else:
            # 因爲客戶端鏈接進來時socket接收客戶端鏈接請求,將客戶端鏈接加入到了監聽列表中(rlist),客戶端發送消息的時候這個鏈接將觸發
            # 因此判斷是不是客戶端鏈接對象觸發
            try:
                data=sock.recv(1024)
                #沒有數據的時候,咱們將這個鏈接關閉掉,並從監聽列表中移除
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                print("received {0} from client {1}".format(data.decode(), sock))
                #將接受到的客戶端的消息保存下來
                rdata[sock] = data.decode()

                #將客戶端鏈接對象和這個對象接收到的消息加工成返回消息,並添加到wdata這個字典裏面
                wdata[sock]=data.upper()
                #須要給這個客戶端回覆消息的時候,咱們將這個鏈接添加到wlist寫監聽列表中
                wlist.append(sock)
            #若是這個鏈接出錯了,客戶端暴力斷開了(注意,我尚未接收他的消息,或者接收他的消息的過程當中出錯了)
            except Exception:
                #關閉這個鏈接
                sock.close()
                #在監聽列表中將他移除,由於無論什麼緣由,它畢竟是斷開了,不必再監聽它了
                rlist.remove(sock)
    # 若是如今沒有客戶端請求鏈接,也沒有客戶端發送消息時,開始對發送消息列表進行處理,是否須要發送消息
    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)

    # #將一次select監聽列表中有接收數據的conn對象所接收到的消息打印一下
    # for k,v in rdata.items():
    #     print(k,'發來的消息是:',v)
    # #清空接收到的消息
    # rdata.clear()

---------------------------------------
#客戶端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8093))


while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()

select網絡IO模型的示例代碼
select網絡IO模型示例代碼

 

 

 select IO多路複用 異步併發

異步IO  asyncio (很厲害, 高併發)app

相關文章
相關標籤/搜索