Python 之協程

 

多線程併發、包括線程池,是操做系統控制的併發。若是是單線程,能夠經過協程實現單線程下的併發。python

協程 又稱微線程,是一種用戶態的輕量級線程,由用戶程序本身控制調度。多線程

python的線程屬於內核級別的,由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行)併發

而單線程裏開啓協程,一旦遇到io,由用戶本身控制調度。app

特色:異步

一、單線程裏併發socket

二、修改共享數據不需枷鎖函數

三、用戶程序裏保存多個控制流的上下文棧spa

四、附加:一個協程遇到IO操做自動切換到其餘協程(yield,greenlet都沒法實現檢測IO,用gevent模塊的select機制能夠)操作系統

併發:線程

# 注意到consumer函數是一個generator(生成器):
# 任何包含yield關鍵字的函數都會自動成爲生成器(generator)對象
def consumer():
    r = ''
    while True:
        # 三、consumer經過yield拿到消息,處理,又經過yield把結果傳回;
        #    yield指令具備return關鍵字的做用。而後函數的堆棧會自動凍結(freeze)在這一行。
        #    當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時,
        #    就會從yield代碼的下一行開始,繼續執行,再返回下一次迭代結果。經過這種方式,迭代器能夠實現無限序列和惰性求值。
        n = yield r
        if not n:
            return
        print('[CONSUMER] ←← Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'


def produce():
    # 一、首先調用c.next()啓動生成器
    c = consumer()
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] →→ Producing %s...' % n)
        # 二、而後,一旦生產了東西,經過c.send(n)切換到consumer執行;
        cr = c.send(n)
        # 四、produce拿到consumer處理的結果,繼續生產下一條消息;
        print('[PRODUCER] Consumer return: %s' % cr)
    # 五、produce決定不生產了,經過c.close()關閉consumer,整個過程結束。
    c.close()

if __name__ == '__main__':
    start_time = time.time()
    # 六、整個流程無鎖,由一個線程執行,produce和consumer協做完成任務,因此稱爲「協程」,在一個線程裏協做完成。
    res = produce()
    # consumer(res)
    end_time = time.time()
    print(end_time-start_time)

 

串行:

def produce():
    n = 0
    res = []
    while n < 5:
        n = n + 1
        print('[PRODUCER] →→ Producing %s...' % n)
        res.append(n)
    return res

def consumer(res):
    pass

if __name__ == '__main__':
    start_time = time.time()
    # 六、整個流程無鎖,由一個線程執行,produce和consumer協做完成任務,因此稱爲「協程」,在一個線程裏協做完成。
    res = produce()
    consumer(res)
    end_time = time.time()
    print(end_time-start_time)

 

greenlet模塊,方便切換,但遇到IO時,不會自動切換到其餘任務

from greenlet import greenlet

def eat(name):
    print('%s is eating 1'%name)
    gr2.switch('zhanwu')
    print('%s is eating 2' % name)
    gr2.switch()

def play(name):
    print('%s is playing 1' % name)
    gr1.switch()
    print('%s is playing 2' % name)

gr1 = greenlet(eat)
gr2 = greenlet(play)
gr1.switch('egon')  #切換的時候傳參

 

gevent模塊,異步提交任務,遇到IO時可實現自動切換。

異步提交任務後,主線程結束的話,會致使子線程任務完不成,經過sleep或join實現主線程不死直到子線程運行結束。

from gevent import monkey
monkey.patch_all()   #將下面的阻塞操做變爲非阻塞操做,這是用gevent必須的
def eat(name):
    print('%s is eating 1'%name)
    gevent.sleep(3)
    print('%s is eating 2' % name)

def play(name):
    print('%s is playing 1' % name)
    gevent.sleep(4)
    print('%s is playing 2' % name)


gevent.joinall(
    [gevent.spawn(eat,'egon'),
    gevent.spawn(play,'egon')]
)

 

基於gevent實現單線程下的併發。

若是開多個進程,每一個進程裏開多個線程,每一個線程裏再開協程,會大大提高效率。

server端
from gevent import monkey,spawn;monkey.patch_all()
from socket import *


def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()


def server(ip,port): # 來一個客戶端,起一個conn
    server = socket(AF_INET, SOCK_STREAM)
    server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    server.bind((ip, port))
    server.listen(5)

    while True:
        conn,addr = server.accept()
        spawn(talk,conn)
    server.close()

if __name__ == '__main__':
    g = spawn(server,'127.0.0.1',8087)
    g.join()

client端:
from socket import *
from threading import Thread,currentThread
def client():
    client = socket(AF_INET, SOCK_STREAM)
    client.connect(('127.0.0.1',8087))

    while True:
        client.send(('%s say hello'%currentThread().getName()).encode('utf8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))

if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=client)
        t.start()
相關文章
相關標籤/搜索