從零開始學asyncio(上)

  這篇文章主要是介紹生成器和IO多路複用機制, 算是學習asyncio須要的預備知識. 這個系列還有另外兩篇文章:html

一. 簡單爬蟲實例

  首先建立一個crawler.py文件, 寫入如下代碼:python

import socket


req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8')
address = ('cn.bing.com', 80)
db = []


def simple_crawler():
    sock = socket.socket()
    sock.connect(address)
    sock.send(req)
    response = b''
    while 1:
        chunk = sock.recv(1024)
        if chunk == b'':
            sock.close()
            break
        else:
            response += chunk
    db.append(response)


if __name__ == '__main__':
    print('開始爬取...')
    simple_crawler()
    print('獲取到{}條數據'.format(len(db)))

運行crawler.py文件, 結果以下:服務器

其中, simple_crawler函數作了以下幾件事:網絡

  1. 建立一個socket對象
  2. 鏈接服務器
  3. 向服務器發送http請求
  4. 接收服務端的響應內容
  5. 處理和儲存響應內容

經過這五個步驟, 咱們實現了一個最基本的爬蟲實例.併發

這裏的請求之因此使用HTTP1.0協議, 是由於HTTP1.0默認不是長鏈接, 服務器在發送完數據後會本身斷開. 所以當socket接收到空字節的時候, 就說明服務器已經斷開了, 也就是說數據已經接收完了.
若是要使用HTTP1.1協議, 那麼在請求頭中加上Connection:close就行.
補充說明

二. IO操做

1.  爬蟲實例中的耗時操做

   首先測試一下simple_crawler獲取一次數據的用時:app

import time
print('開始爬取...')
start=time.time()
simple_crawler()
print('獲取到{}條數據'.format(len(db)))
print('本次用時:{:.2f}秒'.format(time.time()-start))

運行幾回crawler.py文件, 結果以下:socket

相比計算機的計算速度而言, 這段代碼的運行速度是至關慢的, 若是如今須要獲取100個數據, 那麼就須要大約三分半鐘的時間.async

  如今修改一下crawler.py的代碼, 看看各個步驟的執行時間:ide

import socket
import time


req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8')
address = ('cn.bing.com', 80)
db = []


def simple_crawler():
    print('開始運行',time.time())
    sock = socket.socket()
    print('已建立socket對象',time.time())
    sock.connect(address)
    print('已鏈接服務器',time.time())
    sock.send(req)
    print('已發送請求',time.time())
    response = b''
    while 1:
        chunk = sock.recv(1024)
        if chunk == b'':
            sock.close()
            break
        else:
            response += chunk
    print('已接收響應',time.time())
    db.append(response)
    print('已處理響應',time.time())


if __name__ == '__main__':
    simple_crawler()

代碼運行結果以下:函數

 能夠看到, 在這個程序中, 建立socket對象, 發送http請求, 處理響應結果, 基本都是不耗時的, 耗時操做在於鏈接服務器和接收響應. 

socket對象的send方法只是將數據寫入到內核態, 由系統將數據發送到服務器. 所以, 若是socket對應的內核位置的可寫緩衝區還沒裝滿, 而且還能裝下本次send的數據, 就不會阻塞, 不然, send操做也會是阻塞的.
補充說明

2. 阻塞IO

  如今運行下面一段代碼:

input('按回車退出>>>')
exit()

顯然, 若是不按回車或者ctrl+c, 程序就會一直卡在input這一行. 在這段時間, 程序沒有作任何事, 只是單純地等待用戶按回車而已, 就像下面這張圖:

IO的全稱是input/output, 即向/從計算機傳輸數據的操做, 在針對文件和網絡操做中比較常見. 其特色是須要花費必定的等待時間才能完成操做, 上一節的代碼中, sock.connect和sock.recv就是IO操做, 花費了大量的時間在等待服務器響應上, 所以用時較長.

  通常狀況下, 這些基本的IO操做是阻塞式的, 也就是程序會卡在等待的期間, 直到IO操做完成. 好比input語句, 在用戶按下回車以前, 程序處於'死機'狀態.

3. 非阻塞IO

  如今運行以下代碼:

import socket
import time


sock = socket.socket()
sock.setblocking(0)
print('開始鏈接服務器', time.time())
try:
    sock.connect(('cn.bing.com', 80))
except BlockingIOError:
    pass
print('完成鏈接服務器', time.time())

而後運行:

 能夠看到, 本來耗時的鏈接操做變得不耗時了.

  調用socket對象的setblocking方法, 傳入False, 就能夠將這個socket對象設置爲非阻塞式的, 這時再調用該對象涉及到IO操做的方法, 程序將不會阻塞, 但若是操做不能當即完成, 就會拋出異常.

如今將剛纔寫的爬蟲改成非阻塞的形式:

import socket


req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8')
address = ('cn.bing.com', 80)
db = []


def noblocking_crawler():
    sock = socket.socket()
    sock.setblocking(0)
    # connect_ex與connect相似,但在這種狀況下不會拋出異常,而是返回錯誤碼
    # 所以,這裏使用connect_ex來省略一個try語句
    sock.connect_ex(address)
    while 1:
        try:
            sock.send(req)
            break
        except OSError:
            pass
    response = b''
    while 1:
        try:
            chunk = sock.recv(1024)
            if chunk == b'':
                sock.close()
                break
            else:
                response += chunk
        except BlockingIOError:
            pass

    db.append(response)


if __name__ == '__main__':
    print('開始爬取...')
    noblocking_crawler()
    print('獲取到{}條數據'.format(len(db)))

非阻塞式IO並不是意味着不須要等待時間, 而是說程序不會卡在這裏, 但這並不表明IO操做的等待時間會消失. 所以, 在使用connect方法以後, 須要在while循環中一直重複send, 若是捕獲到OSError異常, 就說明尚未鏈接成功, 也就是IO操做還未結束, 因而繼續循環, 直到IO結束爲止. 這一部分的流程以下:

 recv方法同理.

對函數的運行時間進行測試, 會發現耗時並無減小, 這是由於IO操做中的等待時間並不會消失. 所以, 單純將程序設置爲非阻塞並不能提升效率, 只有利用等待時間執行其它任務, 程序的總體效率纔會提升.

三. 生成器

  在上一節中, 非阻塞IO之因此沒有體現出優點, 是由於沒有利用好IO操做的等待時間去執行其餘程序. 假如如今有ABC三個任務, 而有一種機制, 能讓任務A遇到IO操做時, 切換到任務B, 任務B遇到IO操做時, 再切換到任務C, 最後就能夠充分利用IO操做的等待時間, 從而提高程序的總體運行效率.

  定義一個以下函數:

def gen():
    print('這裏是gen函數內部, 如今執行step1')
    yield
    print('這裏是gen函數內部, 如今執行step2')
    yield
    print('這裏是gen函數內部, 如今執行step3')
    return

 如今查看這個函數的返回值:

g = gen()
print(type(g))

結果以下:

  在函數中加入yield語句後, 調用這個函數, 函數內的語句就不會執行, 而是返回一個generator對象, 即生成器.

  若是想執行這個函數內部的語句, 能夠調用python內置的next函數對生成器進行驅動:

g = gen()
for i in range(1, 4):
    print('這裏是gen函數外部,如今是第%s次驅動生成器' % i)
    next(g)

結果以下:

 

 對於生成器, 在外部調用next對其驅動, 就能執行其內部的代碼, 若是執行到yield語句, 就會切換回外部, 下次再驅動, 會從上次結束的地方繼續. 程序的執行流程以下:

只要調用next函數驅動生成器, 程序就會切換到生成器的內部, 從上次停下來的位置開始繼續運行, 運行過程當中若是遇到yield語句, 再切換回調用next函數的位置. 所以, 使用next和yield, 就能夠方便地在不一樣程序中來回切換. 須要注意的是, 若是生成器內部的程序執行結束, 會拋出StopIteration異常.

  這樣看來, 生成器就知足了咱們的需求: 即在不一樣的程序之間切換, 對於一個任務, 在IO操做的時候使用yield語句切換到其它任務, 而後在特定時間再用next函數切換回來, 這樣就能利用IO操做的等待時間.

yield語句除了能暫停程序的執行外, 它仍是個生成器內部與外部的雙向通道.
須要向外部傳值時, yield的用法等於return;
若是要向生成器內部傳值, 那麼就在生成器內部寫成a=yield的形式, 而後在外部調用生成器的send方法將值傳給a(此方法同時會驅動生成器)
舉個例子:
def gen():
    first_sentence = '天王蓋地虎'
    second_sentence = yield first_sentence
    print('生成器從外部獲取的值:', second_sentence)
    yield


g = gen()
first_sentence = next(g)
print('外部從生成器獲取的值:', first_sentence)
g.send('小雞燉蘑菇')
有關python生成器的更多內容, 能夠參考https://www.python.org/dev/peps/pep-0342/
補充說明

四. IO多路複用

  程序之間切換的問題解決了, 如今的問題是, IO操做的等待時間是不肯定的, 若是在操做還未結束的時候, 就調用next對生成器進行驅動, 好比還沒鏈接成功時就調用send語句, 顯然得不到想要的結果. 所以, 須要一種機制, 可以在IO操做完成的時候進行通知, 這時候再驅動生成器進行後續的操做.

  使用python自帶的select模塊能夠對多個socket對象進行監聽, 當觸發到可讀, 可寫或者錯誤事件時, 返回觸發事件的socket對象列表.

基於IO多路複用和生成器等功能寫的爬蟲代碼以下:

import select
import socket
import time


req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8')
address = ('cn.bing.com', 80)
db = []


class GenCrawler:

    '''
    這裏使用一個類將生成器封裝起來,若是要驅動生成器,就調用next_step方法
    另外,這個類還能夠獲取到使用的socket對象
    '''

    def __init__(self):
        self.sock = socket.socket()
        self.sock.setblocking(0)
        self._gen = self._crawler()

    def next_step(self):
        next(self._gen)

    def _crawler(self):
        self.sock.connect_ex(address)
        yield
        self.sock.send(req)
        response = b''
        while 1:
            yield
            chunk = self.sock.recv(1024)
            if chunk == b'':
                self.sock.close()
                break
            else:
                response += chunk
        db.append(response)


def event_loop(crawlers):
    # 首先,創建sock與crawler對象的映射關係,便於由socket對象找到對應的crawler對象
    # 創建映射的同時順便調用crawler的next_step方法,讓內部的生成器運行起來
    sock_to_crawler = {}
    for crawler in crawlers:
        sock_to_crawler[crawler.sock] = crawler
        crawler.next_step()

    # select.select須要傳入三個列表,分別對應要監聽的可讀,可寫和錯誤事件的socket對象集合
    readable = []
    writeable = [crawler.sock for crawler in crawlers]
    errors = []
    while 1:
        rs, ws, es = select.select(readable, writeable, errors)
        for sock in ws:
            # 當socket對象鏈接到服務器時,會建立可讀緩衝區和可寫緩衝區
            # 因爲可寫緩衝區建立時爲空,所以鏈接成功時,就觸發可寫事件
            # 這時再轉爲監聽可讀事件,接收到數據時,就能夠觸發可讀事件了
            writeable.remove(sock)
            readable.append(sock)
            sock_to_crawler[sock].next_step()
        for sock in rs:
            try:
                sock_to_crawler[sock].next_step()
            except StopIteration:
                # 若是生成器結束了,就說明對應的爬蟲任務已經結束,不須要監聽事件了
                readable.remove(sock)
        # 全部的事件都結束後,就退出循環
        if not readable and not writeable:
            break


if __name__ == '__main__':
    start = time.time()
    n = 10
    print('開始爬取...')
    event_loop([GenCrawler() for _ in range(n)])
    print('獲取到{}條數據,用時{:.2f}秒'.format(len(db), time.time()-start))

  首先看看Crawler._crawler部分的代碼, 在調用connect_ex方法以後, 程序並不能肯定何時能鏈接到服務器, 在調用recv方法以前, 程序也不能肯定何時能收到服務器的數據, 所以, 在這兩個位置插入yield語句, 來使程序掛起. 這樣, 一個基於生成器的爬蟲程序就作好了.

  而後是event_loop部分, 首先, 因爲select監聽到事件後, 返回的是socket對象, 所以先創建一個socket對象映射crawler對象的字典, 這樣當監聽到事件時, 就能夠立刻找到對應的crawler並對其驅動. 映射創建後, 就能夠在while循環中持續監聽socket對象, 監聽到結果時, 就驅動對應的crawler, 直到全部的爬蟲任務都結束爲止.

  在程序末尾分別設置n=1以及 n=10, 運行程序, 結果以下 :

 n=1

 n=10

 程序的執行流程以下:

 

  event_loop負責對多個爬蟲任務進行調度, 在這個流程圖中,  首先監聽到某個事件, 因而驅動對應的crawler2, 而crawler2遇到IO操做後, 就使用yield掛起本身, 在crawlerr2的IO操做結束以前, event_loop又能夠去驅動crawler1, 不一樣的crawler任務和event_loop穿插運行, 減小了IO操做中的時間浪費.

五. 總結

  • IO在對文件和網絡的操做中較常見. 特色是須要花費必定的等待時間才能完成操做;
  • 在函數中加入yield關鍵字, 這個函數就可以返回一個生成器. 生成器的特色是運行到yield時會暫停, 而調用next函數由能夠將其繼續驅動;
  • IO多路複用機制能夠同時監聽多個socket對象.  在本文最後的實例中, 使用IO多路複用機制監聽socket對象, 觸發到事件時, 驅動對應的生成器運行, 當生成器運行到IO操做時, 再使用yield語句切換回事件監聽, 這樣一方面利用了IO操做中的等待時間, 提升的運行效率, 一方面實現了多個任務併發的效果.
相關文章
相關標籤/搜索