帶你簡單瞭解python協程和異步

帶你簡單瞭解python的協程和異步

前言

對於學習異步的出發點,是寫爬蟲。從簡單爬蟲到學會了使用多線程爬蟲以後,在翻看別人的博客文章時偶爾會看到異步這一說法。而對於異步的瞭解實在困擾了我很久很久,看了N遍廖雪峯python3協程和異步的文章,一直都是隻知其一;不知其二,也學不會怎麼使用異步來寫爬蟲。因而翻看了其餘關於異步的文章,才慢慢了解python的異步機制並學會使用,可是沒看到有特別全面的文章,因此在參考別人的文章基礎上,加上了本身的理解,寫了出來,也算是本身的一個小總結。html

一.認識生成器

生成器的產生其實比較容易理解,例如當咱們要建立了0到1000000這樣一個很大的列表但同時咱們只須要取出部分數據,這樣的須要並很多見,而顯然這種作法浪費了大量的內存空間。而生成器的做用就是爲了解決上述的問題,利用生成器咱們只須要可以保持一個整數的內存便可遍歷數組列表。生成器的使用是經過yield實現,看下面代碼樣例。python

def l_range(num):
    index = 0
    while index < num:
        yield index    # (1)
        index += 1

l = l_range(5)
print(next(l))    #0
print(next(l))    #1
print(next(l))    #2

不少人會混淆yield和send(後面會提到)的使用,上面的代碼中 yield index,配合next(l)的使用。簡單能夠這樣理解,函數l_range的while循環中,每次程序運行到(1)處都"暫停"了,向調用函數處返回index參數,注意此時並無執行(1)這條語句!!!而每調用一次next(l)循環就會執行一次,而當index>num的時候,倘若再調用next(l),由於此時已經跳出了while循環,yield不會再執行,因此會拋出異常。
除了使用next()調用生成器,可是實際上還能夠用for循環遍歷,可知生成器也是可迭代對象。編程

for i in l_range(5):
    print(i)

明白了「暫停」的概念,生成器就變得很是好理解了!數組

二.認識協程

從上面的demo中,咱們能夠得知生成器的引入使得函數的調用可以「暫停」而且向外傳遞數據,既然能夠向外傳遞數據,那麼是否可以向函數裏傳遞數據呢?生成器send的引入就是爲了實現這個需求!send可以從生成器(函數)調用處傳遞數據到yield處。
來看下面這個demo。網絡

def jumping_range(up_to):
    index = 0
    while index < up_to:
        jump = yield index    # (1)
        # print('index = %s, jump = %s' % (index, jump))
        if jump is None:
            jump = 1
        index += jump

iterator = jumping_range(5)
print(next(iterator))         #0
print(iterator.send(2))        #2
print(next(iterator))         #3
print(iterator.send(-1))      #2
print(next(iterator))         #3
print(next(iterator))         #4

下面解釋下每個輸出,當第一次next(iterator),程序執行到(1)處,可是未執行,只是把index傳遞出去,因此此時輸出的是0(index=0)。接着執行iterator.send(2),這裏把2從調用處傳遞給了生成器裏並賦值給jump,注意yield index是傳遞index參數出去,而jump=yield是把參數傳遞進去給jump!!!而後執行完while的第一次循環回到(1),此時index 執行了一次 index+=jump,而且jump=2。因此iterator.send(2)的輸出是2!然後面的輸出請各位獨自推算一下,若實在想不通能夠嘗試在生成器中print一下各參數出來,方便理解。
要搞明白協程,對於這句代碼的理解尤其重要。session

jump = yield index

其實意思上能夠理解爲多線程

jump = yield
yield index

即 jump接受從外面傳遞進來的參數,而index則是要傳遞出去的參數。可是固然,這只是我爲了方便理解拆分出來的代碼,實際上這樣拆分會致使不一樣的結果。架構

來看看拆分出來的代碼併發

def jumping_range(up_to):
    index = 0
    while index < up_to:
        jump = yield    #(a)
        yield index    #(b)
        # print('index = %s, jump = %s' % (index, jump))
        if jump is None:
            jump = 1
        # print('jump = %d' % jump)
        index += jump


iterator = jumping_range(5)
print(next(iterator))         #None
print(iterator.send(2))        #0
print(next(iterator))         #None
print(iterator.send(-1))      #2
print(next(iterator))         #None
print(next(iterator))         #1

簡單講解上述的輸出,首先當程序執行到a(注意a處的代碼未執行),此時yield 右邊並無參數,因此第一個print返回的是None。而當執行iterator.send(2),程序在a處把2傳遞給參數jump,而後往下執行,當遇到第二個yield,程序又「暫停」了,即一個while循環裏暫停2次!而執行到b處(b處的代碼未執行)把index傳遞到出去,因此此時print返回的是0(index=0)。接着來的能夠如此類推了!app

只要明白了上述2個demo,相信對於協程已經有必定的理解了。最後再提一下yield from的使用。yield from的使用相似函數調用,做用是讓重構變得簡單,也讓你可以將生成器串聯起來,使返回值能夠在調用棧中上下浮動,不需對編碼進行過多改動。

def bottom():
    return (yield 42)


def middle():
    return (yield from bottom())


def top():
    return (yield from middle())



gen = top()
value = next(gen)
print(value)
try:
    value = gen.send(value * 2)
except StopIteration as exc:
    print(exc)
    value = exc.value
print(value)

三.認識異步

對於異步IO,就是你發起一個IO操做,卻不用等它結束,你能夠繼續作其餘事情,當它結束時,你會獲得通知。而要理解異步async/await,首先要理解什麼是事件循環。
事件循環,在維基百科的解釋是「一種等待程序分配事件或消息的編程架構」。簡單的說事件循環就是「當A發生時,執行B」。對python來講,用來提供事件循環的asyncio被加入標準庫,asyncio 重點解決網絡服務中的問題,事件循環在這裏未來自套接字(socket)的 I/O 已經準備好讀和/或寫做爲「當A發生時」(經過selectors模塊)。和多線程和多進程同樣,Asyncio是併發的一種方式。但因爲GIL(全局解釋器鎖)的存在,python的多線程以及Asyncio不能帶來真正的並行。而可交給asyncio執行的任務,就是上述的協程!一個協程能夠放棄執行,把機會給其餘協程(即yield from 或await)。

1.定義協程

定義協程有2種經常使用的方式,

  • 在定義函數的時候加上async做爲前綴
  • 使用python裝飾器。

前者是python3.5的新方式,然後者是3.4的方式(3.5也可用)。

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)
@asyncio.coroutine
def do_some_work2(x):
    print("Waiting " + str(x))
    yield from asyncio.sleep(x)

這樣一來do_some_work即是一個協程,準確來講是一個協程函數,而且能夠用asyncio.iscoroutinefunction來驗證

print(asyncio.iscoroutinefunction(do_some_work)) # True

在解釋await以前,咱們先來講明一下協程能夠作什麼事

  • 等待另外一個協程
  • 產生一個結果給正在等它的協程
  • 引起一個異常給正在等它的協程

demo中asyncio.sleep()也是一個協程,await asyncio.sleep(x),顧名思義就是等待,等待asyncio.sleep(x)執行完後返回do_some_work這個協程。

2.運行協程

協程函數的調用與普通函數不一樣,要讓協程對象運行的話,經常使用的方式有2中

  • 在另外一個已經運行的協程用‘await’等待它(或者yield from)
  • 經過 ‘ensure_future’ 函數計劃它的執行

簡單來講,只有loop運行了,協程纔可能運行。因此在運行協程以前,必須先拿到當前線程缺省的loop,而後把協程對象交給loop.run_until_complete,協程對象隨後會在loop裏獲得運行。

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))

run_until_complete 是一個阻塞(blocking)調用,知道調用運行結束,才返回。而它的參數是一個future,可是咱們上面傳進去的確實協程對象,之因此能夠這樣,是由於它內部作了檢查,對於協程會經過ensure_future函數把協程對象包裝(wrap)成了future。
因此咱們能夠改成:

loop.run_until_complete(asyncio.ensure_future(do_some_work(3))

上面的demo這都是用ensure_future函數計劃它的執行, 來看看使用第一種方法

tasks = [
  asyncio.ensure_future(do_some_work(1)),
  asyncio.ensure_future(do_some_work(3))
]
loop.run_until_complete(asyncio.wait(tasks))

注意: asyncio.wait自己是一個協程

3.回調

有時候當協程運行結束的時候,咱們但願獲得通知,以便判斷程序執行的狀況以及下一步數據的處理。這一需求能夠經過往future添加回調來實現。

def done_callback(cor):
    """
    協程的回調函數
    :param cor:
    :return:
    """
    print('Done')

cor = asyncio.ensure_future(do_some_work(3))
cor.add_done_callback(done_callback)
loop = asyncio.get_event_loop()
loop.run_until_complete(cor)

4.多個協程

在實際運行異步中,每每是有多個協程,同時在一個loop裏運行。因而須要使用asyncio.gather函數把多個協程交給loop。

loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

固然協程一多起來,一條語句寫起來就不方便了,能夠先把協程存在列表裏。

coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))

因爲這兩個協程是併發運行的,因此等待時間並非1+3=4,而是以耗時比較長的那個。
上面也提到run_until_complete的參數是future,而gather起聚合的做用,把多個futures包裝成一個future,由於loop.run_until_complete只接受單個future。上述代碼也能夠改成:

coros = [asyncio.ensure_future(do_some_work(1)),
             asyncio.ensure_future(do_some_work(3))]
loop.run_until_complete(asyncio.gather(*coros))

5.結束協程

經常使用的結束協程的方法有2種:

  • run_until_complete
  • run_forever

run_until_complete看函數名就大概明白,便是直到全部協程工做(future)結束才返回

async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')


loop = asyncio.get_event_loop()

coro = do_some_work(3)
loop.run_until_complete(coro)

輸出:
程序等待3秒鐘後輸出'Done'返回

試試改成run_forever:

async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')


loop = asyncio.get_event_loop()

coro = do_some_work(3)
asyncio.ensure_future(coro)

loop.run_forever()

輸出:
程序等待3秒鐘後輸出'Done'但並無返回。
run_forever會一直運行,直到loop.stop()被調用,可是不能在run_forever後調用stop,由於run_forever永遠都不會返回,因此stop永遠都不能被調用。

loop.run_forever()
loop.stop()

正確的使用方法應該是在協程中調用stop,因此須要在協程參數中傳入loop:

async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')
    loop.stop()

這樣看來彷佛沒有什麼問題,可是當有多個協程在loop裏運行呢?

asyncio.ensure_future(do_some_work(loop, 1))
asyncio.ensure_future(do_some_work(loop, 3))

loop.run_forever

運行程序時會發現,只輸出了一個‘Done’程序就返回了。這說明了第二個協程尚未結束,loop就中止了,被先結束的那個協程給停掉了。要解決這個問題,能夠用gather把多個協程合併在一塊兒,經過回調的方式調用loop.stop。

async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

def done_callback(loop, futu):
    loop.stop()

loop = asyncio.get_event_loop()

futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))
futus.add_done_callback(functools.partial(done_callback, loop))

loop.run_forever()

6. Close loop

對於同一個loop,只要沒有close,那麼loop還能夠繼續添加協程而且再運行。

loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))

可是關閉了就不能再運行了。

loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3))    # 拋出異常

最後提一下yield from 和 await雖然內部機制有所不一樣,可是從做用來看基本上是同樣的,這裏就不探討具體的區別了。
另外關於asyncio.gather和asyncio.wait的區別請看StackOverflow的討論Asyncio.gather vs asyncio.wait

7.爬蟲小demo

使用asyncio異步抓取豆瓣電影top250

# -*- coding: utf-8 -*-
from lxml import etree
from time import time
import asyncio
import aiohttp

__author__ = 'lateink'

url = 'https://movie.douban.com/top250'


async def fetch_content(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()


async def parse(url):
    page = await fetch_content(url)
    html = etree.HTML(page)

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
    xpath_title = './/span[@class="title"]'
    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

    pages = html.xpath(xpath_pages)
    fetch_list = []
    result = []

    for element_movie in html.xpath(xpath_movie):
        result.append(element_movie)

    for p in pages:
        fetch_list.append(url + p.get('href'))

    tasks = [fetch_content(url) for url in fetch_list]
    pages = await asyncio.gather(*tasks)

    for page in pages:
        html = etree.HTML(page)
        for element_movie in html.xpath(xpath_movie):
            result.append(element_movie)

    for i, movie in enumerate(result, 1):
        title = movie.find(xpath_title).text
        print(i, title)


def main():
    loop = asyncio.get_event_loop()
    start = time()
    for i in range(5):
        loop.run_until_complete(parse(url))
    end = time()
    print('Cost {} seconds'.format((end - start)/5))
    loop.close()


if __name__ == '__main__':
    main()
相關文章
相關標籤/搜索