協程與IO多路複用

 

                                IO多路複用                          

   I/O多路複用 : 經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做.python

  Pythongit

  Python中有一個select模塊,其中提供了 : select , poll , epoll ,三個方法,分別調用系統的select , poll , epoll ,從而實現IO多路複用.程序員

 

       注意:網絡操做、文件操做、終端操做等均屬於IO操做,對於windows只支持Socket操做,其餘系統支持其餘IO操做,可是沒法檢測 普通文件操做 自動上次讀取是否已經變化。github

         單線程實現併發                                     web

    對於select方法:編程

句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超時時間)
 
參數: 可接受四個參數(前三個必須)
返回值:三個列表
 
select方法用來監視文件句柄,若是句柄發生變化,則獲取該句柄。
1、當 參數1 序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值1 序列中
2、當 參數2 序列中含有句柄時,則將該序列中全部的句柄添加到 返回值2 序列中
3、當 參數3 序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值3 序列中
4、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化
   當 超時時間 = 1時,那麼若是監聽的句柄均無任何變化,則select會阻塞 1 秒,以後返回三個空列表,若是監聽的句柄有變化,則直接執行。

 

import socket
import select

client1 = socket.socket()
client1.setblocking(False) # 百度建立鏈接: 非阻塞(本來要等待回覆或者鏈接,如今是發送以後再阻塞的時間就能夠去執行別的任務

try:
    client1.connect(('www.baidu.com',80))  #執行了,可是報錯了
except BlockingIOError as e:
    pass

client2 = socket.socket()
client2.setblocking(False) # 搜狗建立鏈接: 非阻塞
try:
    client2.connect(('www.sogou.com',80))
except BlockingIOError as e:
    pass


client3 = socket.socket()
client3.setblocking(False) # 搜狐建立鏈接: 非阻塞
try:
    client3.connect(('www.souhu.com',80))
except BlockingIOError as e:
    pass

socket_list = [client1,client2,client3]
conn_list = [client1,client2,client3]

while True:
    """
    socket_list:檢測是否服務端給我返回數據,可讀
    conn_list:檢測其中的全部socket是否已經和服務端鏈接成功,可寫
    rlist :就是有 [client2 , client3 不必定有1,2,3哪個]
    wlist:[client1,client2]
    [] : 是鏈接不成功的放這裏
    0.005 : 最多0.005秒檢測一次,檢測是否鏈接成功 / 返回數據

    """
    rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)
    # wlist中表示已經鏈接成功的socket對象
    for sk in wlist:
        if sk == client1:
            sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')
        elif sk==client2:
            sk.sendall(b'GET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n')
        else:
            sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.souhu.com\r\n\r\n')
        conn_list.remove(sk)
        """
        鏈接成功以後就不用繼續監聽是否鏈接成功了,全部剔除
        """
    for sk in rlist:
        chunk_list = []
        while True:
            """
            不阻塞,可是若是沒有了會報錯,都接受以後
            """
            try:
                chunk = sk.recv(8096)
                if not chunk:
                    break
                chunk_list.append(chunk)
            except BlockingIOError as e:
                break
        body = b''.join(chunk_list)
        print('------------>',body)
        sk.close()
        socket_list.remove(sk)
    if not socket_list:
        break
import socket
import select

class Req(object):
    def __init__(self,sk,func):
        self.sock = sk
        self.func = func

    def fileno(self):
        return self.sock.fileno()


class Nb(object):

    def __init__(self):
        self.conn_list = []
        self.socket_list = []

    def add(self,url,func):
        client = socket.socket()
        client.setblocking(False)  # 非阻塞
        try:
            client.connect((url, 80))
        except BlockingIOError as e:
            pass
        obj = Req(client,func)
        self.conn_list.append(obj)
        self.socket_list.append(obj)

    def run(self):

        while True:
            rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005)
            # wlist中表示已經鏈接成功的req對象
            for sk in wlist:
                # 發生變換的req對象
                sk.sock.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')
                self.conn_list.remove(sk)
            for sk in rlist:
                chunk_list = []
                while True:
                    try:
                        chunk = sk.sock.recv(8096)
                        if not chunk:
                            break
                        chunk_list.append(chunk)
                    except BlockingIOError as e:
                        break
                body = b''.join(chunk_list)
                # print(body.decode('utf-8'))
                sk.func(body)
                sk.sock.close()
                self.socket_list.remove(sk)
            if not self.socket_list:
                break


def baidu_repsonse(body):
    print('百度下載結果:',body)

def sogou_repsonse(body):
    print('搜狗下載結果:', body)

def souhu_repsonse(body):
    print('老男孩下載結果:', body)


t1 = Nb()
t1.add('www.baidu.com',baidu_repsonse)
t1.add('www.sogou.com',sogou_repsonse)
t1.add('www.souhu.com',oldboyedu_repsonse)
t1.run()
高級版

 

  基於事件循環實現的異步非阻塞框架   Twisted           windows

                  異步 : 執行完某個任務後自動調用分配的函數網絡

                  非阻塞 : 不等待 (給個任務以後不用等待回覆)併發

from lzl import Nb

def baidu_repsonse(body):
    print('百度下載結果:',body)

def sogou_repsonse(body):
    print('搜狗下載結果:', body)

def souhu_repsonse(body):
    print('搜狐下載結果:', body)


t1 = Nb()
t1.add('www.baidu.com',baidu_repsonse)
t1.add('www.sogou.com',sogou_repsonse)
t1.add('www.souhu.com',oldboyedu_repsonse)
t1.run()
#鏈接網絡請求以後,不用去等着回覆直接去執行後面相對應的函數

            總結 : app

    1. IO多路複用的做用 : 檢測多個socket是否發送變化 (三種模式 select , poll, epoll .Windows系統只支持 select).

    2.異步非阻塞:

      異步:通知,執行以後自動執行回調函數或自動執行某些操做(通知).

      非阻塞 : 不等待 :

        好比:建立socket對某個地址進行 connect,獲取接收數據recv時默認都會等待(鏈接成功或接收數據),才執行後續操做.-----若是設置了 setblocking(False),以上兩個過程就不在等待,可是會報錯-BiockingIOError的錯誤,只要捕獲便可.

    3.同步阻塞 :

      阻塞: 等待

      同步 : 按照順序逐步執行.

key_list = ['ab','db','sb']
for item in key_list:
    ret = requests.get('https://www.baidu.com/s?wd=%s' %item)
    print(ret.text)
        
同步阻塞

 

 

                                  協程                          

       概念      

     協程 : 是單線程下的併發,又稱"微線程",是由程序員創造出來的一個不是真實存在的東西.

    做用是 : 對一個線程進行切片,使得線程在代碼塊之間進行來回切換執行任務,而不是原來逐行執行.

  注意 : 

#1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行)
#2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關)

  對比操做系統的線程的切換,用戶在單線程內控制協程的切換 :

#優勢:
#1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
#2. 單線程內就能夠實現併發的效果,最大限度地利用cpu


#缺點:
#1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程
#2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程

       協程的特色 :  

  1. 必須在只有一個單線程裏實現併發
  2. 修改共享數據不需加鎖
  3. 用戶程序裏本身保存多個控制流的上下文棧
  4. 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))

        greenlet   模塊          

   先安裝greenlet模塊 ; pip3 install greenlet

import greenlet


def f1():
    print(11)
    gr2.switch()
    print(22)
    gr2.switch()


def f2():
    print(33)
    gr1.switch()
    print(44)


# 協程 gr1
gr1 = greenlet.greenlet(f1)
# 協程 gr2
gr2 = greenlet.greenlet(f2)

gr1.switch()

結果:
11
33
22
44

       注意 :單純的切換有時候還會下降程序的執行速度,greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題。

單線程裏的這20個任務的代碼一般會既有計算操做又有阻塞操做,咱們徹底能夠在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提升效率,這就用到了Gevent模塊。

因此說 當--- 協程 + 遇到IO就切換 ===厲害了!!!

 

        gevent 模塊           

    安裝 : pip3 install gevent 

from gevent import monkey
monkey.patch_all() # 之後代碼中遇到IO都會自動執行greenlet的switch進行切換
import requests
import gevent


def get_page1(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page2(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page3(url):
    ret = requests.get(url)
    print(url,ret.content)

gevent.joinall([
    gevent.spawn(get_page1, 'https://www.python.org/'), # 協程1
    gevent.spawn(get_page2, 'https://www.yahoo.com/'),  # 協程2
    gevent.spawn(get_page3, 'https://github.com/'),     # 協程3
])

#結果的順序是不肯定的,A,B,C三個請求,當執行A的時候遇到阻塞,那麼久在阻塞的這個時間段去執行不阻塞的B或者C,當執行B的時候遇到阻塞就去執行A/C,在網絡編程中會有請求時候的阻塞和接收時候的阻塞.

              總結 : 

      1.協程能提升併發嗎 ?

       答:協程自己是沒法提升併發的,可是協程+IO切換能夠.

      2.單線程提升併發的方法:

         --- 協程+IO切換  gevent

         --- 基於事件循環的異步非阻塞框架   Twisted

      3.進程,線程,協程的區別 : ★★★★★★★★★★★★

        --- 進程是資源分配的最小單位,線程是CPU調度的最小單位.

             --- 在一個程序中能夠有多個進行,一個進程最少有一個線程.

        --- 和其餘語言相比較,其餘語言幾乎不用進程的,可是在Python中,它的進程和線程是有差別的,Python有個GIL鎖,GIL鎖保證一個進程在同一時刻只有一個線程被CPU調到.

        --- 對於協程來講,它是有程序員創造出來的不是一個真實存在的東西,它自己是沒有意義的,可是當 協程+IO切換放到一塊兒的時候就能夠提升單線程併發的性能.

相關文章
相關標籤/搜索