python網絡-多任務實現之協程(27)

1、協程

協程,又稱微線程,纖程。英文名Coroutine。python

協程不是進程,也不是線程,它就是一個函數,一個特殊的函數——能夠在某個地方掛起,而且能夠從新在掛起處繼續運行。因此說,協程與進程、線程相比,不是一個維度的概念。程序員

一個進程能夠包含多個線程,一個線程也能夠包含多個協程,也就是說,一個線程內能夠有多個那樣的特殊函數在運行。可是有一點,必須明確,一個線程內的多個協程的運行是串行的。若是有多核CPU的話,多個進程或一個進程內的多個線程是能夠並行運行的,可是一個線程內的多個協程卻絕對串行的,不管有多少個CPU(核)。這個比較好理解,畢竟協程雖然是一個特殊的函數,但仍然是一個函數。一個線程內能夠運行多個函數,可是這些函數都是串行運行的。當一個協程運行時,其餘協程必須掛起。服務器

通俗的理解:在一個線程中的某個函數,能夠在任何地方保存當前函數的一些臨時變量等信息,而後切換到另一個函數中執行,注意不是經過調用函數的方式作到的,而且切換的次數以及何時再切換到原來的函數都由開發者本身肯定網絡

2、yield實現協程

 1 import time
 2 
 3 def A():
 4     while True:
 5         print("----A---")
 6         yield
 7         time.sleep(0.3)
 8 
 9 def B(c):
10     while True:
11         print("----B---")
12         next(c)
13         time.sleep(0.3)
14 
15 if __name__=='__main__':
16     a = A()
17     B(a)

執行結果併發

----B---
----A---
----B---
----A---
----B---
----A---
----B---
----A---
----B---
----A---
省略。。。

代碼說明:app

第17行:調用函數B,並把a傳遞進去。執行打印B的代碼,代碼執行到next(c)時,會調用函數A,執行打印A的代碼,當代碼實行帶第6行遇到yield的實行,該協程進入等待狀態,回到原來next(c)處繼續執行,從而實現多協程的切換,經過yield關鍵字。ssh

 

3、greenlet

一、greenlet實現多任務協程異步

爲了更好使用協程來完成多任務,python中的greenlet模塊對其封裝,從而使得切換任務變的更加簡單,在使用前先要確保greenlet模塊安裝async

使用以下命令安裝greenlet模塊:函數

sudo pip install greenlet
#coding = utf-8
from greenlet import greenlet
def test1():
    print("1")
    gr2.switch()
    print("2")

def test2():
    print("3")
    gr1.switch()
    print("4")

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

運行結果:

1
3
2

當建立一個greenlet時,首先初始化一個空的棧, switch到這個棧的時候,會運行在greenlet構造時傳入的函數(首先在test1中打印 1), 若是在這個函數(test1)中switch到其餘協程(到了test2 打印3),那麼該協程會被掛起,等到切換回來(在test1切換回來 打印2)。當這個協程對應函數執行完畢,那麼這個協程就變成dead狀態。
  

注意 上面沒有打印test2的最後一行輸出 4,由於在test2中切換到gr1以後掛起,可是沒有地方再切換回來。

二、greenlet的模塊與類

咱們首先看一下greenlet這個module裏面的屬性

>>> import greenlet
>>> dir(greenlet)
['GREENLET_USE_GC', 'GREENLET_USE_TRACING', 'GreenletExit', '_C_API', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', '__version__', 'error', 'getcurrent', 'gettrace', 'greenlet', 'settrace']

其中,比較重要的是getcurrent(), 類greenlet、異常類GreenletExit。

getcurrent()返回當前的greenlet實例;

GreenletExit:是一個特殊的異常,當觸發了這個異常的時候,即便不處理,也不會拋到其parent(後面會提到協程中對返回值或者異常的處理)

而後咱們再來看看greenlet.greenlet這個類:

>>>dir(greenlet.greenlet)
['GreenletExit', '__bool__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '_stack_saved', 'dead', 'error', 'getcurrent', 'gettrace', 'gr_frame', 'parent', 'run', 'settrace', 'switch', 'throw']

比較重要的幾個屬性:

  run:當greenlet啓動的時候會調用到這個callable,若是咱們須要繼承greenlet.greenlet時,須要重寫該方法

  switch:前面已經介紹過了,在greenlet之間切換

  parent:可讀寫屬性,後面介紹

  dead:若是greenlet執行結束,那麼該屬性爲true

  throw:切換到指定greenlet後當即跑出異常

文章後面提到的greenlet大多都是指greenlet.greenlet這個class,請注意區別 

對於greenlet,最經常使用的寫法是 x = gr.switch(y)。 這句話的意思是切換到gr,傳入參數y。當從其餘協程(不必定是這個gr)切換回來的時候,將值付給x。

import greenlet


def test1(x, y):
    z = gr2.switch(x + y)
    print("test1:%s" % z)


def test2(a):
    print('test2:%s' % a)
    gr1.switch(10)


gr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)
print(gr1.switch("Hello", "World"))

運行結果爲:

test2:HelloWorld
test1:10
None

上面的例子,第10行從main greenlet切換到了gr1,test1第3行切換到了gs2,而後gr1掛起,第7行從gr2切回gr1時,將值(10)返回值給了 z。 

三、greenlet生命週期

 文章開始的地方提到第一個例子中的gr2其實並無正常結束,咱們能夠借用greenlet.dead這個屬性來查看

運行結果爲:

 1 import greenlet
 2 
 3 
 4 def test1():
 5     gr2.switch(1)
 6     print("test1: finished")
 7 
 8 
 9 def test2(x):
10     print("test2:first %s" % x)
11     gr1.switch()
12     print("test2:back")
13 
14 gr1 = greenlet.greenlet(test1)
15 gr2 = greenlet.greenlet(test2)
16 gr1.switch()
17 print("gr1 is dead? : %s, gr2 is dead? :%s" % (gr1.dead, gr2.dead))
18 gr2.switch()
19 print("gr1 is dead? : %s, gr2 is dead? :%s" % (gr1.dead, gr2.dead))

運行結果爲:

test2:first 1
test1: finished
gr1 is dead? : True, gr2 is dead? :False
test2:back
gr1 is dead? : True, gr2 is dead? :True

只有當協程對應的函數執行完畢,協程纔會die,因此第一次Check的時候gr2並無die,由於第12行切換出去了就沒切回來。在main中再switch到gr2的時候, 執行後面的邏輯,gr2 die

四、greenlet注意事項

使用greenlet須要注意一下三點:

  第一:greenlet創生以後,必定要結束,不能switch出去就不回來了,不然容易形成內存泄露

  第二:python中每一個線程都有本身的main greenlet及其對應的sub-greenlet ,不能線程之間的greenlet是不能相互切換的

  第三:不能存在循環引用,這個是官方文檔明確說明

 1 from greenlet import greenlet, GreenletExit
 2 huge = []
 3 def show_leak():
 4     def test1():
 5         gr2.switch()
 6 
 7     def test2():
 8         huge.extend([x* x for x in range(100)])
 9         gr1.switch()
10         print 'finish switch del huge'
11         del huge[:]
12     
13     gr1 = greenlet(test1)
14     gr2 = greenlet(test2)
15     gr1.switch()
16     gr1 = gr2 = None
17     print 'length of huge is zero ? %s' % len(huge)
18 
19 if __name__ == '__main__':
20     show_leak() 

在test2函數中 第11行,咱們將huge清空,而後再第16行將gr一、gr2的引用計數降到了0。但運行結果告訴咱們,第11行並無執行,因此若是一個協程沒有正常結束是很危險的,每每不符合程序員的預期。greenlet提供瞭解決這個問題的辦法,官網文檔提到:若是一個greenlet實例的引用計數變成0,那麼會在上次掛起的地方拋出GreenletExit異常,這就使得咱們能夠經過try ... finally 處理資源泄露的狀況。以下面的代碼: 

1 from greenlet import greenlet, GreenletExit
 2 huge = []
 3 def show_leak():
 4     def test1():
 5         gr2.switch()
 6 
 7     def test2():
 8         huge.extend([x* x for x in range(100)])
 9         try:
10             gr1.switch()
11         finally:
12             print 'finish switch del huge'
13             del huge[:]
14     
15     gr1 = greenlet(test1)
16     gr2 = greenlet(test2)
17     gr1.switch()
18     gr1 = gr2 = None
19     print 'length of huge is zero ? %s' % len(huge)
20 
21 if __name__ == '__main__':
22     show_leak()

上述代碼的switch流程:main greenlet --> gr1 --> gr2 --> gr1 --> main greenlet, 很明顯gr2沒有正常結束(在第10行颳起了)。第18行以後gr1,gr2的引用計數都變成0,那麼會在第10行拋出GreenletExit異常,所以finally語句有機會執行。同時,在文章開始介紹Greenlet module的時候也提到了,GreenletExit這個異常並不會拋出到parent,因此main greenlet也不會出異常。

4、gevent

greenlet已經實現了協程,可是這個還的人工切換,是否是以爲太麻煩了,不要捉急,python還有一個比greenlet更強大的而且可以自動切換任務的模塊gevent

其原理是當一個greenlet遇到IO(指的是input output 輸入輸出,好比網絡、文件操做等)操做時,好比訪問網絡,就自動切換到其餘的greenlet,等到IO操做完成,再在適當的時候切換回來繼續執行。

因爲IO操做很是耗時,常常使程序處於等待狀態,有了gevent爲咱們自動切換協程,就保證總有greenlet在運行,而不是等待IO

import gevent


def f():
    for i in range(5):
        print("%s:%d"%(gevent.getcurrent(),i))


g1 = gevent.spawn(f)
g2 = gevent.spawn(f)
g3 = gevent.spawn(f)
g1.join()
g2.join()
g3.join()

運行結果爲:

<Greenlet at 0x1ba533f9598: f(5)>:0
<Greenlet at 0x1ba533f9598: f(5)>:1
<Greenlet at 0x1ba533f9598: f(5)>:2
<Greenlet at 0x1ba533f9598: f(5)>:3
<Greenlet at 0x1ba533f9598: f(5)>:4
<Greenlet at 0x1ba533f97b8: f(5)>:0
<Greenlet at 0x1ba533f97b8: f(5)>:1
<Greenlet at 0x1ba533f97b8: f(5)>:2
<Greenlet at 0x1ba533f97b8: f(5)>:3
<Greenlet at 0x1ba533f97b8: f(5)>:4
<Greenlet at 0x1ba533f99d8: f(5)>:0
<Greenlet at 0x1ba533f99d8: f(5)>:1
<Greenlet at 0x1ba533f99d8: f(5)>:2
<Greenlet at 0x1ba533f99d8: f(5)>:3
<Greenlet at 0x1ba533f99d8: f(5)>:4

能夠看到,3個greenlet是依次運行而不是交替運行

gevent的切換執行

import gevent


def f():
    for i in range(5):
        print("%s:%d"%(gevent.getcurrent(),i))
        gevent.sleep(0)


g1=gevent.spawn(f)
g2=gevent.spawn(f)
g3=gevent.spawn(f)
g1.join()
g2.join()
g3.join()

執行結果爲:

<Greenlet at 0x20a5e719598: f>:0
<Greenlet at 0x20a5e7197b8: f>:0
<Greenlet at 0x20a5e7199d8: f>:0
<Greenlet at 0x20a5e719598: f>:1
<Greenlet at 0x20a5e7197b8: f>:1
<Greenlet at 0x20a5e7199d8: f>:1
<Greenlet at 0x20a5e719598: f>:2
<Greenlet at 0x20a5e7197b8: f>:2
<Greenlet at 0x20a5e7199d8: f>:2
<Greenlet at 0x20a5e719598: f>:3
<Greenlet at 0x20a5e7197b8: f>:3
<Greenlet at 0x20a5e7199d8: f>:3
<Greenlet at 0x20a5e719598: f>:4
<Greenlet at 0x20a5e7197b8: f>:4
<Greenlet at 0x20a5e7199d8: f>:4

3個greenlet交替運行

gevent.spawn 啓動協程,參數爲函數名稱,參數名稱

三、gevent併發下載器

monkey可使一些阻塞的模塊變得不阻塞,機制:遇到IO操做則自動切換,手動切換能夠用gevent.sleep(0)

from gevent import monkey
import gevent
import urllib.request


#有I/O時須要這一句,若是沒有這句話就會有阻塞狀態,加上就沒有阻塞
monkey.patch_all()


def myDownLoad(url):
    print("GET:%s"%url)
    resp = urllib.request.urlopen(url)
    data = resp.read()
    print("%d bytes received from %s"%(len(data),url))


gevent.joinall((
    gevent.spawn(myDownLoad,"http://www.baidu.com/"),
    gevent.spawn(myDownLoad,"https://apple.com"),
    gevent.spawn(myDownLoad,"https://www.cnblogs.com/Se7eN-HOU/")
))

運行結果爲:

GET:http://www.baidu.com/
GET:https://apple.com
GET:https://www.cnblogs.com/Se7eN-HOU/
153390 bytes received from http://www.baidu.com/
18880 bytes received from https://www.cnblogs.com/Se7eN-HOU/
58865 bytes received from https://apple.com

從上可以看到是先發送的獲取baidu的相關信息,而後依次是apple,cnblogs可是收到數據的前後順序不必定與發送順序相同,這也就體現出了異步,即不肯定何時會收到數據,順序不必定.

上面若是沒有下面這句代碼,

#有I/O時須要這一句,若是沒有這句話就會有阻塞狀態,加上就沒有阻塞
monkey.patch_all()

執行結果以下

GET:http://www.baidu.com/
153378 bytes received from http://www.baidu.com/
GET:https://apple.com
58865 bytes received from https://apple.com
GET:https://www.cnblogs.com/Se7eN-HOU/
18880 bytes received from https://www.cnblogs.com/Se7eN-HOU/

每請求一個網站就會等着請求完畢了在執行第二個,在請求的過程當中,網速慢等待的狀態就是在阻塞。

5、asyncio

咱們都知道,如今的服務器開發對於IO調度的優先級控制權已經再也不依靠系統,都但願採用協程的方式實現高效的併發任務,如js、lua等在異步協程方面都作的很強大。

Python在3.4版本也加入了協程的概念,並在3.5肯定了基本完善的語法和實現方式。同時3.6也對其進行了如解除了await和yield在同一個函數體限制等相關的優化。

event_loop 事件循環:程序開啓一個無限的循環,程序員會把一些函數註冊到事件循環上。當知足事件發生的時候,調用相應的協程函數。
coroutine 協程:協程對象,指一個使用async關鍵字定義的函數,它的調用不會當即執行函數,而是會返回一個協程對象。協程對象須要註冊到事件循環,由事件循環調用。
task 任務:一個協程對象就是一個原生能夠掛起的函數,任務則是對協程進一步封裝,其中包含任務的各類狀態。
future: 表明未來執行或沒有執行的任務的結果。它和task上沒有本質的區別
async/await 關鍵字:python3.5 用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口。

一、建立協程

首先定義一個協程,在def前加入async聲明,就能夠定義一個協程函數。

一個協程函數不能直接調用運行,只能把協程加入到事件循環loop中。asyncio.get_event_loop方法能夠建立一個事件循環,而後使用run_until_complete將協程註冊到事件循環,並啓動事件循環。

例如:

import asyncio


async def fun():
    print("---協程中---")

def main():
    print("---主線程中---")

    loop = asyncio.get_event_loop()
    loop.run_until_complete(fun())

if __name__ == "__main__":
    main()

運行結果:

---主線程中---
---協程中---

2、任務對象task

協程對象不能直接運行,在註冊事件循環的時候,實際上是run_until_complete方法將協程包裝成爲了一個任務(task)對象。所謂task對象是Future類的子類。保存了協程運行後的狀態,用於將來獲取協程的結果。

例如:

import asyncio


async def fun():
    print("---協程中---")
    return "Se7eN_HOU"

def main():
    print("---主線程中---")

    loop = asyncio.get_event_loop()
    #建立task
    task = loop.create_task(fun())
    print(task)
    loop.run_until_complete(task)
    print(task)

if __name__ == "__main__":
    main()

運行結果爲:

---主線程中---
<Task pending coro=<fun() running at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4>>
---協程中---
<Task finished coro=<fun() done, defined at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4> result='Se7eN_HOU'>

建立task後,task在加入事件循環以前是pending狀態,由於fun()中沒有耗時的阻塞操做,task很快就執行完畢了。後面打印的finished狀態。
asyncio.ensure_future 和 loop.create_task均可以建立一個task,run_until_complete的參數是一個futrue對象。

 3、綁定回調

import asyncio

#協程
async def fun():
    print("---協程中---")
    return "Se7eN_HOU"

#協程的回調函數
def callback(future):
    #future.result是協程的返回值
    print("callBack:%s"%future.result())


def main():
    print("---主線程中---")
    #建立loop迴路
    loop = asyncio.get_event_loop()
    #建立task
    task = loop.create_task(fun())
    #調用回調函數
    task.add_done_callback(callback)
    print(task)
    loop.run_until_complete(task)
    print(task)

if __name__ == "__main__":
    main()

運行結果爲:

---主線程中---
<Task pending coro=<fun() running at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4> cb=[callback() at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:9]>
---協程中---
callBack:Se7eN_HOU
<Task finished coro=<fun() done, defined at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4> result='Se7eN_HOU'>

也可使用ensure_future獲取返回值

例如:

import asyncio

#協程
async def fun():
    print("---協程中---")
    return "Se7eN_HOU"

#協程的回調函數
#def callback(future):
    #future.result是協程的返回值
    #print("callBack:%s"%future.result())


def main():
    #建立loop迴路
    loop = asyncio.get_event_loop()
    #建立task
    #task = loop.create_task(fun())
    #調用回調函數
    #task.add_done_callback(callback)
    task = asyncio.ensure_future(fun())
    loop.run_until_complete(task)
    print("fun函數的返回值是:%s"%format(task.result()))

if __name__ == "__main__":
    main()

運行結果爲:

---協程中---
fun函數的返回值是:Se7eN_HOU

4、await阻塞,執行併發

使用async能夠定義協程對象,使用await能夠針對耗時的操做進行掛起,就像生成器裏的yield同樣,函數讓出控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其餘的協程也掛起或者執行完畢,再進行下一個協程的執行。
耗時的操做通常是一些IO操做,例如網絡請求,文件讀取等。咱們使用asyncio.sleep函數來模擬IO操做。協程的目的也是讓這些IO操做異步化。

例如:

import asyncio


async def test1():
    print("---1---")
    await asyncio.sleep(5)    
    print("---2---")


async def test2():
    print("---3---")
    await asyncio.sleep(1)
    print("---4---")


async def test3():
    print("---5---")
    await asyncio.sleep(3)
    print("---6---")

def main():
    loop = asyncio.get_event_loop()
    print("begin")

    t1 = test1()
    t2 = test2()
    t3 = test3()
    tasks1 = [t1,t2,t3]


    loop.run_until_complete(asyncio.wait(tasks1))
    print("end")
    loop.close()

if __name__=="__main__":
    main()

運行結果爲:

begin
---3---
---1---
---5---
---4---
---6---
---2---
end
相關文章
相關標籤/搜索