[Python] Containers | Iterables | Iterators | Generators | Generator Expressions | Coroutines | Futures | Tasks

Python's asyncio is similar to C++'s Boost.Asio.

"Asynchronous I/O" means: you initiate an I/O operation without waiting for it to finish; you can keep doing other work, and you are notified when it completes.

Asyncio is one form of concurrency. In Python, concurrency can also be achieved with threads (threading) and multiple processes (multiprocessing).

Asyncio does not provide true parallelism. Then again, because of the GIL (Global Interpreter Lock), Python's multithreading cannot provide true parallelism either.

A unit of work that can be handed to asyncio is called a coroutine. A coroutine can give up execution and yield its turn to other coroutines (via yield from or await).

Defining a coroutine

A coroutine is defined with the async def statement.

```python
async def do_some_work(x):
    pass
```

do_some_work is a coroutine. More precisely, do_some_work is a coroutine function, which can be verified with asyncio.iscoroutinefunction:

```python
print(asyncio.iscoroutinefunction(do_some_work))  # True
```

This coroutine does nothing; let's have it sleep for a few seconds to simulate an actual workload:

```python
async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)
```

Before explaining await, it is worth noting what a coroutine can do. A coroutine can:

  • wait for a future to finish
  • wait for another coroutine (which produces a result or raises an exception)
  • produce a result for the coroutine that is waiting for it
  • raise an exception into the coroutine that is waiting for it

asyncio.sleep is itself a coroutine, so await asyncio.sleep(x) is waiting for another coroutine. See the documentation for asyncio.sleep:

```
sleep(delay, result=None, *, loop=None)
    Coroutine that completes after a given time (in seconds).
```

Running a coroutine

Calling a coroutine function does not start the coroutine running; it merely returns a coroutine object, which can be verified with asyncio.iscoroutine:

```python
print(asyncio.iscoroutine(do_some_work(3)))  # True
```

This also triggers a warning:

```
async1.py:16: RuntimeWarning: coroutine 'do_some_work' was never awaited
  print(asyncio.iscoroutine(do_some_work(3)))
```

There are two ways to get this coroutine object running:

  • await it from another coroutine that is already running
  • schedule its execution with the ensure_future function

Simply put, a coroutine can only run once the loop is running. Below we first obtain the current thread's default loop, then hand the coroutine object to loop.run_until_complete; the coroutine object then gets run inside the loop.

```python
loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))
```

run_until_complete is a blocking call: it does not return until the coroutine has finished, as the name suggests. run_until_complete's parameter is a future, yet here we passed it a coroutine object. This works because it checks internally and wraps the coroutine object in a future via ensure_future. So we can write it more explicitly:

```python
loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))
```

The complete code:

```python
import asyncio

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))
```

Output:

```
Waiting 3
<program exits after three seconds>
```

Callbacks

Suppose the coroutine performs an I/O read; once the data has been read we want to be notified so we can process it next. This can be done by adding a callback to the future.

```python
def done_callback(futu):
    print('Done')

futu = asyncio.ensure_future(do_some_work(3))
futu.add_done_callback(done_callback)

loop.run_until_complete(futu)
```

Multiple coroutines

In real projects there are usually several coroutines running in the same loop at once. To hand several coroutines to the loop, use the asyncio.gather function.

```python
loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))
```

Or collect the coroutines in a list first:

```python
coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))
```

Output:

```
Waiting 3
Waiting 1
<three-second wait>
Done
```

The two coroutines run concurrently, so the total wait is not 1 + 3 = 4 seconds; it is determined by the longer-running coroutine.

See the documentation for gather:

```
gather(*coros_or_futures, loop=None, return_exceptions=False)
    Return a future aggregating results from the given coroutines or futures.
```

It turns out futures can be passed to it as well:

```python
futus = [asyncio.ensure_future(do_some_work(1)),
         asyncio.ensure_future(do_some_work(3))]

loop.run_until_complete(asyncio.gather(*futus))
```

gather aggregates: it wraps multiple futures into a single future, because loop.run_until_complete only accepts a single future.

run_until_complete and run_forever

So far we have always run the loop with run_until_complete; once the future completes, run_until_complete returns.

```python
async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)
loop.run_until_complete(coro)
```

Output:

```
Waiting 3
<three seconds later>
Done
<program exits>
```

Now switch to run_forever:

```python
async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)
asyncio.ensure_future(coro)

loop.run_forever()
```

Output:

```
Waiting 3
<three seconds later>
Done
<program does not exit>
```

After three seconds the future is done, but the program does not exit. run_forever keeps running until stop is called; however, you cannot call stop like this:

```python
loop.run_forever()
loop.stop()
```

run_forever never returns, so stop would never be called. stop can therefore only be called from inside a coroutine:

```python
async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')
    loop.stop()
```

This is not problem-free. Suppose several coroutines are running in the loop:

```python
asyncio.ensure_future(do_some_work(loop, 1))
asyncio.ensure_future(do_some_work(loop, 3))

loop.run_forever()
```

The loop stops before the second coroutine finishes, stopped by the coroutine that finished first. The fix is to merge the coroutines into a single future with gather, add a callback to that future, and stop the loop in the callback.

```python
import functools  # needed for functools.partial below

async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

def done_callback(loop, futu):
    loop.stop()

loop = asyncio.get_event_loop()

futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))
futus.add_done_callback(functools.partial(done_callback, loop))

loop.run_forever()
```

This is essentially how run_until_complete is implemented; internally, run_until_complete also calls run_forever.
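That equivalence can be sketched with a small, hypothetical helper (my_run_until_complete is an illustration of the idea, not CPython's actual implementation):

```python
import asyncio

def my_run_until_complete(loop, coro):
    # wrap the coroutine in a task (a future), stop the loop from a
    # done-callback once it completes, then hand back the result
    task = loop.create_task(coro)
    task.add_done_callback(lambda f: loop.stop())
    loop.run_forever()
    return task.result()

async def answer():
    await asyncio.sleep(0.01)
    return 42

loop = asyncio.new_event_loop()
print(my_run_until_complete(loop, answer()))  # 42
loop.close()
```

The real implementation adds error handling (e.g. removing the callback if the loop is stopped by someone else), but the stop-from-a-callback pattern is the same.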

Close the loop?

None of the examples above call loop.close, and nothing seemed wrong. So should loop.close be called at all? Simply put, as long as the loop is not closed, it can be run again:

```python
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()
```

But once it is closed, it cannot run any more:

```python
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3))  # raises an exception here
```

Calling loop.close is recommended: it cleans up the loop object completely and prevents accidental reuse.

gather vs. wait

asyncio.gather and asyncio.wait provide similar functionality.

```python
coros = [do_some_work(loop, 1), do_some_work(loop, 3)]
loop.run_until_complete(asyncio.wait(coros))
```

For the details of the differences, see the StackOverflow discussion: Asyncio.gather vs asyncio.wait.
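As a quick illustration of the difference, the sketch below (using the modern asyncio.run entry point, and tasks rather than bare coroutines, which newer Python versions require for wait) shows that gather resolves to an ordered list of results, while wait returns (done, pending) sets of tasks:

```python
import asyncio

async def work(x):
    await asyncio.sleep(0.01 * x)
    return x * 10

async def compare():
    # gather: one awaitable whose result is a list, in argument order
    results = await asyncio.gather(work(3), work(1), work(2))
    # wait: returns two sets of Task objects, (done, pending)
    tasks = [asyncio.ensure_future(work(x)) for x in (3, 1, 2)]
    done, pending = await asyncio.wait(tasks)
    return results, sorted(t.result() for t in done), pending

results, done_results, pending = asyncio.run(compare())
print(results)       # [30, 10, 20] -- input order, regardless of finish order
print(done_results)  # [10, 20, 30]
print(len(pending))  # 0
```

Note that wait gives back unordered sets, so the results have to be pulled out of the tasks yourself; gather preserves the order you passed the awaitables in.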

Timer

C++ Boost.Asio provides a timer I/O object. Python has no native timer support, but one can be simulated with asyncio.sleep.

```python
async def timer(x, cb):
    futu = asyncio.ensure_future(asyncio.sleep(x))
    futu.add_done_callback(cb)
    await futu

t = timer(3, lambda futu: print('Done'))
loop.run_until_complete(t)
```

End of part one.

I have long been interested in the asyncio library: after all, it is a module the official documentation strongly recommends for high concurrency, and Python introduced coroutines in Python 3.4. Putting these notes together also deepened my own understanding of how to use the module.

What is asyncio for?

異步網絡操做 併發 協程 python3.0時代,標準庫裏的異步網絡模塊:select(很是底層) python3.0時代,第三方異步網絡庫:Tornado python3.4時代,asyncio:支持TCP,子進程

Today many modules already build on asyncio: aiohttp, aiodns, aioredis, and so on. https://github.com/aio-libs lists what is supported, and the list keeps growing.

Of course, asyncio is not the only coroutine implementation so far; tornado and gevent implement similar functionality.

Some key asyncio terms:

event_loop, the event loop: the program starts an infinite loop, functions are registered onto the event loop, and when a registered event occurs, the corresponding coroutine function is called.

coroutine: a coroutine object, i.e. a function defined with the async keyword. Calling it does not execute the function immediately; it returns a coroutine object. The coroutine object must be registered with the event loop, which then calls it.

task: a coroutine object is a native function that can be suspended; a task wraps the coroutine further and tracks its various states.

future: represents the result of a task that will (or will not) execute in the future. There is no essential difference between it and a task.

async/await: the keywords introduced in Python 3.5 for defining coroutines; async defines a coroutine, and await suspends on a blocking asynchronous call.
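The terms above can be tied together in one minimal sketch (the name fetch is made up for illustration; asyncio.new_event_loop is used so the snippet runs standalone):

```python
import asyncio

async def fetch(x):            # async def: defines a coroutine function
    await asyncio.sleep(0.01)  # await: suspend on an asynchronous call
    return x + 1

loop = asyncio.new_event_loop()         # event_loop
coro = fetch(41)                        # coroutine object: nothing has run yet
task = loop.create_task(coro)           # task: wraps the coroutine, tracks state
result = loop.run_until_complete(task)  # the task (a future) resolves to a result
print(task.done(), result)  # True 42
loop.close()
```

Calling fetch(41) alone does nothing observable; only once the loop drives the task does the body execute.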

Looking at this list of terms you might want to walk away. I myself resisted studying the asyncio module at first, without really knowing why, and as a result barely followed or used it for a long time. But as I kept hitting performance problems with Python at work, I told myself to sit down and learn it properly.

Defining a coroutine

```python
import time
import asyncio

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)

start = now()

# This is a coroutine object; at this point do_some_work has not executed
coroutine = do_some_work(2)
print(coroutine)

# Create an event loop
loop = asyncio.get_event_loop()

# Add the coroutine to the event loop
loop.run_until_complete(coroutine)

print("Time:", now() - start)
```

In the code above we define a coroutine with the async keyword. Of course the coroutine cannot run by itself; it has to be added to the event loop.

asyncio.get_event_loop creates an event loop; run_until_complete then registers the coroutine with the event loop and starts the loop.

Creating a task

A coroutine object cannot run directly. When it is registered with the event loop, the run_until_complete method actually wraps it into a task object. A task object is a subclass of the Future class; it stores the coroutine's state after it runs and can be used to retrieve the coroutine's result later.

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print("Time:", now() - start)
```

The result:

```
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex2.py:13>>
waiting: 2
<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex2.py:13> result=None>
Time: 0.0003514289855957031
```

After the task is created but before it enters the event loop it is in the pending state; once complete, its state is finished.

A task created with loop.create_task(coroutine) as above can equally be created with asyncio.ensure_future(coroutine).
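A small sketch of that equivalence (using asyncio.new_event_loop so it runs standalone): both calls produce a Task, and ensure_future additionally passes a Future argument through unchanged:

```python
import asyncio

async def work():
    return 1

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)  # so ensure_future can find the loop

t1 = loop.create_task(work())
t2 = asyncio.ensure_future(work())  # also produces a Task
t3 = asyncio.ensure_future(t1)      # a Future/Task argument is returned as-is
print(isinstance(t1, asyncio.Task), isinstance(t2, asyncio.Task), t3 is t1)
# True True True
loop.run_until_complete(t1)
loop.run_until_complete(t2)
print(t1.result(), t2.result())  # 1 1
loop.close()
```

In practice the two are interchangeable here; ensure_future is the more general "give me a future for this awaitable" call, while create_task is explicit about which loop owns the task.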

The official explanations of these two calls: https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future

```
asyncio.ensure_future(coro_or_future, *, loop=None)
    Schedule the execution of a coroutine object: wrap it in a future.
    Return a Task object.

    If the argument is a Future, it is returned directly.
```

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task

```
AbstractEventLoop.create_task(coro)
    Schedule the execution of a coroutine object: wrap it in a future.
    Return a Task object.

    Third-party event loops can use their own subclass of Task for
    interoperability. In this case, the result type is a subclass of Task.

    This method was added in Python 3.4.2. Use the async() function to
    support also older Python versions.
```

Binding a callback

By binding a callback you can obtain the result when the task has finished executing. The callback's last argument is the future object, through which the coroutine's return value can be retrieved.

```python
import time
import asyncio

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    return "Done after {}s".format(x)

def callback(future):
    print("callback:", future.result())

start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)

print("Time:", now() - start)
```

The result:

```
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>
waiting: 2
callback: Done after 2s
Time: 0.00039196014404296875
```

The add_done_callback method attaches a callback to the task; when the task (that is, the coroutine) finishes, the callback is called, and its future parameter gives access to the coroutine's result. Note that the task we created and the future object inside the callback are actually the same object.
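When the callback needs extra arguments besides the future, functools.partial can bind them, as in this sketch (the identity check at the end confirms the created task and the callback's future are the same object):

```python
import asyncio
import functools

async def work():
    await asyncio.sleep(0.01)
    return 'payload'

received = []

def callback(tag, future):
    # arguments bound with functools.partial come first;
    # the future is always passed as the last positional argument
    received.append(future)
    print(tag, future.result())

loop = asyncio.new_event_loop()
task = loop.create_task(work())
task.add_done_callback(functools.partial(callback, 'finished:'))
loop.run_until_complete(task)  # prints: finished: payload
print(received[0] is task)     # True
loop.close()
```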

Blocking and await

With async we define a coroutine object; with await we suspend on time-consuming operations, just as a generator yields: the function gives up control. When a coroutine encounters await, the event loop suspends that coroutine and executes others, until those others also suspend or finish, and then the next coroutine is scheduled.

耗時的操做通常是一些IO操做,例如網絡請求,文件讀取等。咱們使用asyncio.sleep函數來模擬IO操做。協程的目的也是讓這些IO操做異步化。

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    # what follows await is the time-consuming call
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print("Task ret:", task.result())
print("Time:", now() - start)
```

At await asyncio.sleep(x), the sleep simulates a blocking or time-consuming operation, and control is yielded: when a blocking call is reached, await hands back the coroutine's control so the loop can run other coroutines.

Concurrency and parallelism

Concurrency means a system in which multiple activities are in progress at the same time.

Parallelism means using concurrency to make a system run faster; parallelism can be applied at several levels of abstraction in an operating system.

So concurrency usually means multiple tasks need to make progress over the same period, while parallelism means multiple tasks executing at the same instant.

A vivid analogy: concurrency is one teacher tutoring several students during the same time slot; parallelism is several teachers simultaneously tutoring several students. In short, it is the difference between one person eating three buns and three people each eating one bun, where eating one bun counts as one task.

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print("Task ret:", task.result())

print("Time:", now() - start)
```

Output:

```
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004154920578003
```

The total time is about 4s: the 4-second wait is long enough for the first two coroutines to finish. Run synchronously in sequence, the same tasks would need at least 7s. Here asyncio gives us concurrency. asyncio.wait(tasks) could also be written asyncio.gather(*tasks); the former takes a list of tasks, the latter takes the tasks as separate arguments.
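The timing claim is easy to verify directly; this sketch shortens the sleeps and uses the modern asyncio.run entry point:

```python
import asyncio
import time

async def work(x):
    await asyncio.sleep(x)
    return x

async def main():
    return await asyncio.gather(work(0.1), work(0.2), work(0.4))

start = time.time()
results = asyncio.run(main())
elapsed = time.time() - start
print(results)               # [0.1, 0.2, 0.4]
print(0.4 <= elapsed < 0.7)  # True: bounded by the longest task, not the 0.7s sum
```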

The official documentation on asyncio.gather and asyncio.wait:

https://docs.python.org/3/library/asyncio-task.html#asyncio.gather

```
Return a future aggregating results from the given coroutine objects or
futures.

All futures must share the same event loop. If all the tasks are done
successfully, the returned future's result is the list of results (in the
order of the original sequence, not necessarily the order of results
arrival). If return_exceptions is true, exceptions in the tasks are treated
the same as successful results, and gathered in the result list; otherwise,
the first raised exception will be immediately propagated to the returned
future.
```

https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

```
Wait for the Futures and coroutine objects given by the sequence futures to
complete. Coroutines will be wrapped in Tasks. Returns two sets of Future:
(done, pending).

The sequence futures must not be empty.

timeout can be used to control the maximum number of seconds to wait before
returning. timeout can be an int or float. If timeout is not specified or
None, there is no limit to the wait time.

return_when indicates when this function should return.
```

Nested coroutines

With async we define coroutines for time-consuming I/O, and we can also wrap a larger I/O workflow, which gives us nested coroutines: one coroutine awaits another, and so on down the chain.

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print("Task ret:", task.result())

    # results = await asyncio.gather(*tasks)
    # for result in results:
    #     print("Task ret:", result)

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now() - start)
```

If we take this part of the code:

```python
dones, pendings = await asyncio.wait(tasks)
for task in dones:
    print("Task ret:", task.result())
```

and replace it with:

```python
results = await asyncio.gather(*tasks)
for result in results:
    print("Task ret:", result)
```

then what we get is a list of results.

If we do not process the results inside the main coroutine but simply return what we awaited, then the outermost run_until_complete returns main's result. Change the code to:

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.gather(*tasks)

start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
    print("Task ret:", result)

print("Time:", now() - start)
```

Or suspend the coroutines with asyncio.wait instead.

Change the code to:

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.wait(tasks)

start = now()

loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(main())
for task in done:
    print("Task ret:", task.result())

print("Time:", now() - start)
```

You can also use asyncio's as_completed method:

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print("Task ret: {}".format(result))

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now() - start)
```

As the examples show, coroutine calls and composition are very flexible, mainly in how results are handled: how they are returned and how execution is suspended.

Stopping coroutines

A future object has several states: Pending, Running, Done, Cancelled. When the future is created, the task is pending; while the event loop is executing it, it is running; when it completes, it is done. To stop the event loop, the tasks must be cancelled first. asyncio.Task can be used to obtain the event loop's tasks.

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print("Time:", now() - start)
```

Right after starting the event loop, press Ctrl+C; this raises KeyboardInterrupt out of run_until_complete. We then iterate over asyncio.Task and cancel the futures. The output looks like:

```
Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>,
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for= cb=[_wait.._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>,
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for= cb=[_wait.._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>,
<Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for= >}
False
True
True
True
Time: 1.0707225799560547
```

True means the cancel succeeded. After loop.stop, the event loop must be run once more before it is finally closed, otherwise an exception is raised.

Cancelling the tasks one by one in a loop is one option. But, as above, when the task list is wrapped in a main function and the event loop is driven from outside, main is effectively the outermost task, so it is enough to handle the wrapping main function.
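A sketch of that idea: wrap the work in a single main task and cancel only it (note that the asyncio.Task.all_tasks call used in the listing above was removed in Python 3.9, so this sketch sticks to task.cancel, and uses call_later to simulate the external stop request instead of Ctrl+C):

```python
import asyncio

async def work(x):
    await asyncio.sleep(x)
    return x

async def main():
    try:
        return await asyncio.gather(work(10), work(20))
    except asyncio.CancelledError:
        print('main cancelled')  # cancellation propagates into main's await
        raise

loop = asyncio.new_event_loop()
task = loop.create_task(main())
loop.call_later(0.05, task.cancel)  # simulate an external stop request
try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    print('cancelled cleanly')
# give the cancelled child tasks one more cycle to finish, then close
loop.run_until_complete(asyncio.sleep(0))
loop.close()
print(task.cancelled())  # True
```

Cancelling the outer task is enough: gather forwards the cancellation to every child coroutine it is aggregating.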

Event loops in different threads

Often the event loop is used to register coroutines, yet some coroutines need to be added dynamically. A simple approach is multithreading: create an event loop in the current thread, then start a new thread and run the event loop there. The current thread is not blocked.

```python
import asyncio
import time
from threading import Thread

now = lambda: time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
```

After this starts, the current thread is not blocked; the new thread executes the more_work calls registered via call_soon_threadsafe in order. Because time.sleep is a synchronous, blocking operation, finishing both more_work calls takes roughly 6 + 3 seconds.

Coroutines in a new thread

```python
import asyncio
import time
from threading import Thread

now = lambda: time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
```

In this example, the main thread creates new_loop and starts an infinite event loop in a child thread, then registers coroutine objects with run_coroutine_threadsafe. The child thread thus runs the event loop's operations concurrently while the main thread is not blocked. Total execution time is about 6s.

# -*- coding: utf-8 -*-

```python
# 14.1 Sentence class, take #1: a sequence of words
# Example 14-1: split a sentence into a sequence of words
import re
import reprlib

RE_WORD = re.compile('\w+')

class Sentence:
    def __init__(self, text):
        self.text = text
        self.words = RE_WORD.findall(text)  # returns a list of strings

    def __getitem__(self, index):
        return self.words[index]

    def __len__(self):
        # implemented to complete the sequence protocol; not needed
        # to make the object iterable
        return len(self.words)

    def __repr__(self):
        # reprlib.repr produces abbreviated representations of large
        # data structures
        return 'Sentence(%s)' % reprlib.repr(self.text)

# Example 14-2: test whether Sentence is iterable
s = Sentence('"The tiem has come,", the walrus said,')
print(s)        # Sentence('"The tiem ha... walrus said,')
for word in s:
    print(word)  # The / tiem / has / come / the / walrus / said
print(list(s))  # ['The', 'tiem', 'has', 'come', 'the', 'walrus', 'said']

# [Analysis] why sequences are iterable:
# (1) Python checks whether the object implements __iter__; if so, it
#     is called to obtain an iterator.
# (2) If __iter__ is missing but __getitem__ is implemented, Python
#     creates an iterator that fetches items in order, from index 0.
# (3) If that fails too, Python raises TypeError, usually with the
#     message "C object is not iterable".
```

# 14.2 Iterables vs. iterators

Iterable and Iterator are abstract base classes. The former is the parent of the latter; Iterator adds a __next__ method on top of Iterable's __iter__.
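That relationship is easy to check with collections.abc (a quick sketch):

```python
from collections.abc import Iterable, Iterator

nums = [1, 2, 3]   # a list is iterable, but is not its own iterator
it = iter(nums)    # iter() produces the iterator

print(isinstance(nums, Iterable), isinstance(nums, Iterator))  # True False
print(isinstance(it, Iterable), isinstance(it, Iterator))      # True True
print(issubclass(Iterator, Iterable))                          # True
```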

```python
# Iterator defines this method (excerpted; cf. Lib/_collections_abc.py):
import abc

@classmethod
def __subclasshook__(cls, C):
    if cls is abc.Iterator:
        if (any("__next__" in B.__dict__ for B in C.__mro__) and
                any("__iter__" in B.__dict__ for B in C.__mro__)):
            return True
    return NotImplemented

# Given the advice in Lib/types.py and the logic in
# Lib/_collections_abc.py, the best way to check whether x is an
# iterator is isinstance(x, abc.Iterator). Thanks to
# Iterator.__subclasshook__, this works even if x's class is not a
# real or virtual subclass of Iterator.

# Using the class from Example 14-1: build an iterator with iter(),
# consume it with next()
s3 = Sentence('Pig and Pepper')
it = iter(s3)
print(it)        # <iterator object at 0x0000000002948A58>
print(next(it))  # Pig
print(next(it))  # and
print(next(it))  # Pepper
# print(next(it)) # StopIteration
print(list(it))  # [] -- once exhausted, the iterator is useless
```

```python
# The first version of Sentence is iterable because the built-in
# iter(...) function special-cases sequences. Next, implement the
# standard iterable protocol: Sentence with the classic Iterator pattern
import re
import reprlib

RE_WORD = re.compile('\w+')

class Sentence:
    def __init__(self, text):
        self.text = text
        self.words = RE_WORD.findall(text)

    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)

    __str__ = __repr__

    def __iter__(self):
        # Compared with the previous version, only __iter__ is added;
        # there is no __getitem__, to make it clear that this class is
        # iterable because it implements __iter__.
        # Per the iterable protocol, __iter__ instantiates and returns
        # an iterator.
        return SentenceIterator(self.words)

class SentenceIterator:
    def __init__(self, words):
        self.words = words  # the iterator holds a reference to the word list
        self.index = 0      # self.index determines the next word to fetch

    def __next__(self):
        try:
            word = self.words[self.index]
        except IndexError:
            raise StopIteration()
        self.index += 1
        return word

    def __iter__(self):
        return self

print(list(iter(s3)))  # ['Pig', 'and', 'Pepper'] -- to iterate again, build a new iterator
```

# [Note] For this example it is not strictly necessary to implement __iter__ in SentenceIterator, but doing so is correct: an iterator should implement both __next__ and __iter__, and doing so lets the iterator pass the issubclass(SentenceIterator, abc.Iterator) test. If SentenceIterator subclassed abc.Iterator, it would inherit the concrete abc.Iterator.__iter__ method.

# [Note] Making Sentence its own iterator: a bad idea. The Iterator pattern is meant to: access an aggregate object's contents without exposing its internal representation; support multiple traversals of aggregate objects; and provide a uniform interface for traversing different aggregate structures (i.e. polymorphic iteration). To "support multiple traversals" it must be possible to obtain several independent iterators from the same iterable instance, each maintaining its own internal state. So the correct implementation of the pattern is for every call to iter(my_iterable) to create a new, independent iterator. That is why this example needs the SentenceIterator class. An iterable must never act as its own iterator: it must implement __iter__, but not __next__.

```python
# Example 14-5: Sentence implemented with a generator function
import re
import reprlib

RE_WORD = re.compile('\w+')

class Sentence:
    def __init__(self, text):
        self.text = text
        self.words = RE_WORD.findall(self.text)

    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)

    def __iter__(self):
        # The iterator is actually a generator object, created anew on
        # each call, because this __iter__ is a generator function
        for word in self.words:
            yield word
        return

# How generator functions work: any function whose body contains the
# yield keyword is a generator function. Calling a generator function
# returns a generator object -- a generator function is a generator
# factory.
def gen_123():
    yield 1
    yield 2
    yield 3

print(gen_123)    # <function gen_123 at 0x0000000002093E18>
print(gen_123())  # <generator object gen_123 at 0x000000000367A990>
for i in gen_123():
    print(i)      # 1 / 2 / 3
g = gen_123()
print(next(g))  # 1
print(next(g))  # 2
print(next(g))  # 3
# print(next(g))  # StopIteration
```

```python
# Example 14-6: a generator function that prints messages as it runs
def gen_AB():
    print('start')
    yield 'A'
    print('continue')
    yield 'B'
    print('END')

for c in gen_AB():
    print('-->', c)

# Output:
# start
# --> A
# continue
# --> B
# END
```

# This version of Sentence is much shorter than the previous one, but it is not as lazy as it could be. Nowadays laziness is considered a good trait, at least in programming languages and APIs. A lazy implementation postpones producing values as long as possible; this saves memory and may avoid useless processing.
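The laziness difference is easy to observe: a list comprehension consumes its input eagerly, while a generator expression yields on demand (echoing the book's gen_AB examples):

```python
def gen_AB():
    print('start')
    yield 'A'
    print('continue')
    yield 'B'
    print('END')

# list comprehension: consumes gen_AB eagerly -- all prints happen now
res1 = [x * 3 for x in gen_AB()]
print(res1)  # ['AAA', 'BBB']

# generator expression: nothing is consumed until iteration begins
res2 = (x * 3 for x in gen_AB())
for x in res2:
    print('-->', x)
```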

```python
# Example 14-9: Sentence implemented with a generator expression
import re
import reprlib

RE_WORD = re.compile('\w+')

class Sentence:
    def __init__(self, text):
        self.text = text

    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)

    __str__ = __repr__

    def __iter__(self):
        return (match.group() for match in RE_WORD.finditer(self.text))
```

# [Note] If a generator expression would span multiple lines, I prefer to define a generator function instead, for readability. Besides, generator functions have names and can therefore be reused.

# Below we test the ArithmeticProgression class implemented shortly, as in Example 14-10. The constructor's signature is ArithmeticProgression(begin, step[, end]). The range() function is similar, but its signature is range(start, stop[, step]). I chose a different signature because when building an arithmetic progression the step is mandatory while the end is optional; I also renamed start/stop to begin/end to make it clear the signatures differ. In each test in Example 14-10, list() is called to inspect the generated values.

```python
# Example 14-10: the ArithmeticProgression class
class ArithmeticProgression:
    def __init__(self, begin, step, end=None):
        self.begin = begin
        self.step = step
        self.end = end  # None -> an unbounded progression

    def __iter__(self):
        # coerce begin to the type of the subsequent sums
        result = type(self.begin + self.step)(self.begin)
        forever = self.end is None
        index = 0
        while forever or result < self.end:
            yield result
            index += 1
            result = self.begin + self.step * index

# Tests
ap = ArithmeticProgression(0, 1, 3)
print(list(ap))  # [0, 1, 2]
ap = ArithmeticProgression(1, .5, 3)
print(list(ap))  # [1.0, 1.5, 2.0, 2.5]
ap = ArithmeticProgression(0, 1/3, 1)
print(list(ap))  # [0.0, 0.3333333333333333, 0.6666666666666666]
from fractions import Fraction
ap = ArithmeticProgression(0, Fraction(1, 3), 1)
print(list(ap))  # [Fraction(0, 1), Fraction(1, 3), Fraction(2, 3)]
from decimal import Decimal
ap = ArithmeticProgression(0, Decimal('.1'), .3)
print(list(ap))  # [Decimal('0'), Decimal('0.1'), Decimal('0.2')]
```

```python
# Example 14-12: the aritprog_gen generator function -- same behavior
# as ArithmeticProgression with less code; the tests in Example 14-10
# also pass if the class is replaced with this function
def aritprog_gen(begin, step, end=None):
    result = type(begin + step)(begin)
    forever = end is None
    index = 0
    while forever or result < end:
        yield result
        index += 1
        result = begin + step * index
```

```python
# The itertools module provides 19 generator functions that can be
# combined in many interesting ways.
# itertools.count returns a generator that produces numbers; with no
# arguments it yields integers starting from 0.
import itertools
gen = itertools.count(1, .5)
print(next(gen))  # 1
print(next(gen))  # 1.5
print(next(gen))  # 2.0

# itertools.takewhile is different: it produces a generator that
# consumes another generator and stops when a given predicate becomes
# False. So the two can be combined:
gen = itertools.takewhile(lambda n: n < 3, itertools.count(1, .5))
print(list(gen))  # [1, 1.5, 2.0, 2.5]
```

```python
# aritprog_gen can also be implemented this way.
# Example 14-13: using takewhile and count gives fluent, short code
import itertools

def aritprog_gen(begin, step, end=None):
    first = type(begin + step)(begin)
    ap_gen = itertools.count(first, step)
    if end is not None:
        ap_gen = itertools.takewhile(lambda n: n < end, ap_gen)
    return ap_gen
```

[Fluent Python] [Control flow] [Context managers] -- see the "context managers" notes.

[Fluent Python] [Control flow] [Coroutines]

16.2 Basic behavior of a generator used as a coroutine

```python
# Example 16-1: possibly the simplest demonstration of a coroutine
def simple_coroutine():
    print('-> coroutine started')
    x = yield
    print('-> coroutine received:', x)

my_coro = simple_coroutine()
print(my_coro)  # <generator object simple_coroutine at 0x10900f9e8>
# next(my_coro)     # -> coroutine started; yields None
# my_coro.send(42)  # -> coroutine received: 42, then StopIteration

# [Note] A coroutine can be in one of four states, determined with
# inspect.getgeneratorstate():
#   'GEN_CREATED'    waiting to start execution
#   'GEN_RUNNING'    currently being executed by the interpreter
#   'GEN_SUSPENDED'  suspended at a yield expression
#   'GEN_CLOSED'     execution has finished
# Because the argument to send becomes the value of the pending yield
# expression, send can only be called while the coroutine is
# suspended. If the coroutine has not been activated yet (state
# 'GEN_CREATED'), the situation differs: always call next() first to
# activate the coroutine -- my_coro.send(None) has the same effect.
# Sending a value other than None to a just-created coroutine raises:
my_coro = simple_coroutine()
my_coro.send(1729)  # TypeError: can't send non-None value to a just-started generator
```

```python
# Example 16-2: a coroutine that yields two values
def simple_coro2(a):
    print('-> Started: a =', a)
    b = yield a
    print('-> Received: b =', b)
    c = yield a + b
    print('-> Received: c =', c)

my_coro2 = simple_coro2(14)
from inspect import getgeneratorstate
print(getgeneratorstate(my_coro2))  # GEN_CREATED
print(next(my_coro2))
# -> Started: a = 14
# 14
print(getgeneratorstate(my_coro2))  # GEN_SUSPENDED
print(my_coro2.send(28))
# -> Received: b = 28
# 42
print(my_coro2.send(99))
# -> Received: c = 99
# Traceback (most recent call last):
#   File ".../kongzhiliucheng/xiecheng.py", line 47, in <module>
#     my_coro2.send(99)
# StopIteration
print(getgeneratorstate(my_coro2))  # GEN_CLOSED
```

```python
# Example 16-3: a coroutine that computes a running average
def averager():
    total = 0.0
    count = 0
    average = None
    # This infinite loop means the coroutine keeps accepting values and
    # producing results as long as the caller sends them. It terminates
    # only when the caller calls .close() on it, or when it is
    # garbage-collected because no references to it remain.
    while True:
        term = yield average
        total += term
        count += 1
        average = total / count

coro_avg = averager()
print(next(coro_avg))     # None
print(coro_avg.send(10))  # 10.0
print(coro_avg.send(30))  # 20.0
print(coro_avg.send(5))   # 15.0
```

```python
# 16.4 A decorator that primes coroutines
from functools import wraps

def coroutine(func):
    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # prime the coroutine
        return gen
    return primer

@coroutine
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total / count

coro_avg = averager()
from inspect import getgeneratorstate
print(getgeneratorstate(coro_avg))  # GEN_SUSPENDED
print(coro_avg.send(10))  # 10.0
print(coro_avg.send(30))  # 20.0
print(coro_avg.send(5))   # 15.0
```

```python
# 16.5 Coroutine termination and exception handling
# Example 16-7: an unhandled exception terminates a coroutine
from functools import wraps

def coroutine(func):
    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return primer

@coroutine
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total / count

coro_avg = averager()
print(coro_avg.send(40))  # 40.0
print(coro_avg.send(50))  # 45.0
print(coro_avg.send('spam'))
# TypeError: unsupported operand type(s) for +=: 'float' and 'str'
# The exception is not handled inside the coroutine, so the coroutine
# terminates; trying to reactivate it raises StopIteration
print(coro_avg.send(60))  # never handled
```

```python
# Example 16-8: handling exceptions inside a coroutine
class DemoException(Exception):
    """An exception type for this demonstration."""

def demo_exc_handling():
    print('-> coroutine started')
    while True:
        try:
            x = yield
        except DemoException:
            print('*** DemoException handled. Continuing...')
        else:
            # no exception: show the received value
            print('-> coroutine received: {!s}'.format(x))
    # this line never runs, because only an unhandled exception ends
    # the infinite loop
    raise RuntimeError('This line should never run.')
```

```python
# Activate and close demo_exc_handling; no exception
exc_coro = demo_exc_handling()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)  # -> coroutine received: 11
exc_coro.send(22)  # -> coroutine received: 22
exc_coro.close()
from inspect import getgeneratorstate
print(getgeneratorstate(exc_coro))  # GEN_CLOSED
```

```python
# Throwing DemoException into demo_exc_handling does not abort it
exc_coro = demo_exc_handling()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)  # -> coroutine received: 11
exc_coro.throw(DemoException)  # *** DemoException handled. Continuing...
print(getgeneratorstate(exc_coro))  # GEN_SUSPENDED
```

```python
# If the coroutine cannot handle the thrown exception, it terminates
exc_coro = demo_exc_handling()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)
print(exc_coro.throw(ZeroDivisionError))
# Traceback (most recent call last):
#   File "/Users/suren/PycharmProjects/fluentPython/kongzhiliucheng/xiecheng.py", line 172, in <module>
#     print(exc_coro.throw(ZeroDivisionError))
#   File "/Users/suren/PycharmProjects/fluentPython/kongzhiliucheng/xiecheng.py", line 145, in demo_exc_handling
#     x = yield
# ZeroDivisionError
print(getgeneratorstate(exc_coro))  # GEN_CLOSED
```

```python
# Example 16-12: using try/finally to run cleanup when a coroutine ends
class DemoException(Exception):
    """An exception type for this demonstration."""

def demo_finally():
    print('-> coroutine started')
    try:
        while True:
            try:
                x = yield
            except DemoException:
                print('*** DemoException handled. Continuing...')
            else:
                print('-> coroutine received: {!s}'.format(x))
    finally:
        print('-> coroutine ending')
```

```python
# Activate and close demo_finally; no exception
exc_coro = demo_finally()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)  # -> coroutine received: 11
exc_coro.send(22)  # -> coroutine received: 22
exc_coro.close()   # -> coroutine ending (printed by the finally clause)
from inspect import getgeneratorstate
print(getgeneratorstate(exc_coro))  # GEN_CLOSED
```

```python
# Throwing DemoException into demo_finally does not abort it
exc_coro = demo_finally()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)  # -> coroutine received: 11
exc_coro.throw(DemoException)  # *** DemoException handled. Continuing...
print(getgeneratorstate(exc_coro))  # GEN_SUSPENDED
```

```python
# If the coroutine cannot handle the thrown exception, it terminates,
# but the finally clause still runs
exc_coro = demo_finally()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)  # -> coroutine received: 11

exc_coro.throw(ZeroDivisionError)
# Traceback (most recent call last):
#   File "/Users/suren/PycharmProjects/fluentPython/kongzhiliucheng/xiecheng.py", line 220, in <module>
#     exc_coro.throw(ZeroDivisionError)
#   File "/Users/suren/PycharmProjects/fluentPython/kongzhiliucheng/xiecheng.py", line 192, in demo_finally
#     x = yield
# ZeroDivisionError
# -> coroutine ending   (printed by the finally clause)
from inspect import getgeneratorstate
print(getgeneratorstate(exc_coro))  # nothing printed: this line is never reached
```

```python
# Returning a value from a coroutine
# Example 16-14
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average)

coro_avg = averager()
next(coro_avg)      # no output
coro_avg.send(10)   # no output
coro_avg.send(30)   # no output
coro_avg.send(6.5)  # no output
# coro_avg.send(None)  # StopIteration: Result(count=3, average=15.5)
```

#Example 16-15
from collections import namedtuple
Result = namedtuple('Result', 'count average')

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)

coro_avg = averager()
next(coro_avg)      # nothing produced
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
try:
    coro_avg.send(None)
except StopIteration as exc:
    result = exc.value
print(result)  # Result(count=3, average=15.5)
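The try/except StopIteration dance above is exactly what yield from automates: the subgenerator's return value becomes the value of the yield from expression in the delegating generator. A small sketch of that behavior (the wrapper generator is a hypothetical name, not from the book):

```python
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
    total, count, average = 0.0, 0, None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average)

def wrapper(collected):
    # the subgenerator's return value becomes the value of the yield from expression
    collected.append((yield from averager()))

collected = []
w = wrapper(collected)
next(w)  # prime the delegating generator
for value in (10, 30, 6.5):
    w.send(value)
try:
    w.send(None)  # terminates averager; wrapper then also finishes
except StopIteration:
    pass
print(collected[0])  # Result(count=3, average=15.5)
```

No StopIteration handling is needed around the yield from itself; the exception is consumed by the yield from machinery.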

"""

#Comparing yield from with yield
def gen():
    yield from 'AB'
    yield from range(1, 3)
print(list(gen()))  # ['A', 'B', 1, 2]

def gen():
    yield 'AB'
    yield range(1, 3)
print(list(gen()))  # ['AB', range(1, 3)]

#16.7 yield from
def gen():
    yield from 'AB'
    yield from range(1, 3)
print(list(gen()))  # ['A', 'B', 1, 2]

#Example 16-16: chaining iterables with yield from
def chain(*iterables):
    for it in iterables:
        yield from it

s = 'ABC'
t = tuple(range(3))
print(list(chain(s, t)))  # ['A', 'B', 'C', 0, 1, 2]

#Example 16-17: illustrating the use of yield from
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():  # the subgenerator
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)

#the delegating generator
def grouper(results, key):
    while True:
        results[key] = yield from averager()

#the client code, i.e. the caller
def main(data):
    results = {}
    for key, values in data.items():
        group = grouper(results, key)
        next(group)
        for value in values:
            group.send(value)
        group.send(None)  # important!

    print(results)
    report(results)

#print the report
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
            result.count, group, result.average, unit))

data = {'girls;kg': [40.9, 38.5, 44.3],
        'girls;m': [1.6, 1.51, 1.4],
        'boys;kg': [50.6, 60, 70.33],
        'boys;m': [1.7, 1.89, 1.78],
        }

if __name__ == '__main__':
    main(data)

#[Analysis]
'''
Here is a brief explanation of how Example 16-17 operates, including what
would happen if the group.send(None) call in main (the line marked
"important!") were removed.

Each iteration of the outer for loop creates a new grouper instance and binds
it to the group variable; group is the delegating generator.

The call next(group) primes the delegating generator grouper, which enters
its while True loop, calls the subgenerator averager, and suspends at the
yield from expression.

The inner for loop calls group.send(value), feeding each value directly to
the subgenerator averager. Meanwhile, the current grouper instance (group)
stays suspended at the yield from expression.

When the inner loop is over, the group instance is still suspended at
yield from, so the assignment to results[key] in the body of grouper has not
happened yet.

Without the final group.send(None) at the end of the outer for loop, the
averager subgenerator would never terminate, the delegating generator group
would never be reactivated, and results[key] would never be assigned.

When the outer for loop iterates again, a new grouper instance is created and
bound to group. The previous grouper instance (along with the unfinished
averager subgenerator it created) is garbage collected.

(This document was compiled by Linux公社, www.linuxidc.com.)
'''
#Printed result (each line shows the count, the group, and the computed average):
'''
 3 boys  averaging 60.31kg
 3 boys  averaging 1.79m
 3 girls averaging 41.23kg
 3 girls averaging 1.50m
'''
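The role of the terminating send(None) can be checked with a stripped-down sketch (hypothetical subgen/delegator names, not from the book): the assignment after yield from only runs once the subgenerator returns.

```python
def subgen():
    while True:
        v = yield
        if v is None:
            break
    return 'done'

def delegator(results, key):
    while True:
        results[key] = yield from subgen()

results = {}
d = delegator(results, 'k')
next(d)         # prime: runs into subgen and suspends at its yield
d.send(1)
print(results)  # {} -- still suspended inside subgen; the assignment has not run
d.send(None)    # subgen returns; only now is results['k'] assigned
print(results)  # {'k': 'done'}
```

After the assignment, the delegating generator loops and suspends inside a fresh subgen, ready for the next group of values.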

#16.9 Use case: discrete event simulation with coroutines
from collections import namedtuple

Event = namedtuple('Event', 'time proc action')

#the activities of each taxi
def taxi_process(ident, trips, start_time):
    time = yield Event(start_time, ident, 'leave garage')
    for i in range(trips):
        time = yield Event(time, ident, 'pick up passenger')
        time = yield Event(time, ident, 'drop off passenger')
    yield Event(time, ident, 'going home')

'''
#driving the taxi_process coroutine by hand
taxi = taxi_process(ident=13, trips=2, start_time=0)
result1 = next(taxi)
print(result1)  # Event(time=0, proc=13, action='leave garage')
result2 = taxi.send(result1.time + 7)
print(result2)  # Event(time=7, proc=13, action='pick up passenger')
result3 = taxi.send(result2.time + 23)
print(result3)  # Event(time=30, proc=13, action='drop off passenger')
result4 = taxi.send(result3.time + 5)
print(result4)  # Event(time=35, proc=13, action='pick up passenger')
result5 = taxi.send(result4.time + 48)
print(result5)  # Event(time=83, proc=13, action='drop off passenger')
result6 = taxi.send(result5.time + 1)
print(result6)  # Event(time=84, proc=13, action='going home')
#result7 = taxi.send(result6.time + 10)
#print(result7)  # StopIteration
'''
#To instantiate the Simulator class, the main function of the taxi_sim.py
#script builds a taxis dict, as follows:
DEPARTURE_INTERVAL = 5
num_taxis = 3
taxis = {i: taxi_process(i, (i+1)*2, i*DEPARTURE_INTERVAL)
         for i in range(num_taxis)}
#sim = Simulator(taxis)
'''
DEPARTURE_INTERVAL is 5; if num_taxis is 3, as in the sample run, these three
lines do the same as:
taxis = {0: taxi_process(ident=0, trips=2, start_time=0),
         1: taxi_process(ident=1, trips=4, start_time=5),
         2: taxi_process(ident=2, trips=6, start_time=10)}
sim = Simulator(taxis)
'''

Simulator, a bare-bones discrete event simulation class; the point of interest is its run method

'''
So the values of the taxis dict are three generator objects with different
parameters. For example, taxi 1 starts at start_time=5 and looks for four
passengers. This dict is the only argument needed to build a Simulator
instance. The Simulator.__init__ method is shown in Example 16-22. The main
data structures of the Simulator class are:

self.events
    A PriorityQueue object holding Event instances. Items can be put in (with
    the put method) and are later retrieved ordered by item[0], i.e. the time
    attribute of each Event (with the get method).

self.procs
    A dict mapping each taxi id to the process active in the simulation (the
    generator object representing the taxi). It is bound to a copy of the
    taxis dict shown above.

Example 16-22. taxi_sim.py: initializer of the Simulator class

class Simulator:

    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()  # ➊
        self.procs = dict(procs_map)  # ➋

❶ The PriorityQueue holding the scheduled events, ordered by time.
❷ The procs_map argument is a dict (or any mapping), but a local copy is
built from it because, as the simulation runs, taxis that go home are removed
from self.procs, and we don't want to mutate the object passed in by the
caller.

Priority queues are a fundamental building block of discrete event
simulations: events are created in any order, put in the queue, and later
retrieved in order of their scheduled time. For example, the queue might hold
these two events:

Event(time=14, proc=0, action='pick up passenger')
Event(time=11, proc=1, action='pick up passenger')

They mean that taxi 0 picks up its first passenger after 14 minutes, while
taxi 1 (which departed at time=10) picks one up 1 minute later, at time=11.
With these two events in the queue, the first event the main loop gets from
the priority queue will be Event(time=11, proc=1, action='pick up passenger').

Now for the main algorithm of the simulation, the Simulator.run method. It is
invoked by the main function right after the Simulator is instantiated, like
this:

sim = Simulator(taxis)
sim.run(end_time)

The annotated listing of the Simulator class is in Example 16-23; here is an
overview of the algorithm implemented in Simulator.run:

(1) Iterate over the processes representing the taxis.
    a. Prime each coroutine by calling next() on it. This yields the first
       event for each taxi.
    b. Put each event in the self.events queue of the Simulator.
(2) Run the main loop of the simulation while sim_time < end_time.
    a. Check whether self.events is empty; if so, break out of the loop.
    b. Get the current_event from self.events, i.e. the Event object with the
       smallest time value in the PriorityQueue.
    c. Display the Event.
    d. Update the simulation time with the time attribute of current_event.
    e. Send the time to the coroutine identified by the proc attribute of
       current_event, which yields the next event (next_event).
    f. Schedule next_event by adding it to the self.events queue.

The complete Simulator class, including the run method, is in Example 16-23:

class Simulator:

    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()
        self.procs = dict(procs_map)

    def run(self, end_time):  # ➊
        """Schedule and display events until time is up"""
        # schedule the first event for each cab
        for _, proc in sorted(self.procs.items()):  # ➋
            first_event = next(proc)  # ➌
            self.events.put(first_event)  # ➍

        # main loop of the simulation
        sim_time = 0  # ➎
        while sim_time < end_time:  # ➏
            if self.events.empty():  # ➐
                print('*** end of events ***')
                break
            current_event = self.events.get()  # ➑
            sim_time, proc_id, previous_action = current_event  # ➒
            print('taxi:', proc_id, proc_id * '   ', current_event)  # ➓
            active_proc = self.procs[proc_id]  # ⓫
            next_time = sim_time + compute_duration(previous_action)  # ⓬
            try:
                next_event = active_proc.send(next_time)  # ⓭
            except StopIteration:
                del self.procs[proc_id]  # ⓮
            else:
                self.events.put(next_event)  # ⓯
        else:  # ⓰
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))

❶ The run method needs only the simulation end time (end_time).
❷ sorted retrieves the items of self.procs ordered by key; the key itself is
not used, so it is assigned to _.
❸ next(proc) primes each coroutine, advancing it to its first yield
expression so it is ready to receive data. An Event object is produced.
❹ Each event is added to the PriorityQueue bound to self.events. As seen in
the sample run of Example 16-20, the first event for each taxi is
'leave garage'.
❺ Zero sim_time, the simulation clock.
❻ Main loop of the simulation: it runs while sim_time is less than end_time.
❼ The main loop also exits if there are no pending events in the queue.
❽ Get the Event with the smallest time attribute in the priority queue; this
is the current_event.
❾ Unpack the Event data. This line updates the simulation clock sim_time to
the time when the event happened. This is typical of discrete event
simulations: the clock does not advance by a fixed amount on each loop, but
according to the duration of each event.
❿ Display the Event, identifying the taxi and indenting according to its id.
⓫ Retrieve the coroutine for the currently active taxi from self.procs.
⓬ Compute the next activation time by calling compute_duration(...) with the
previous action (e.g. 'pick up passenger', 'drop off passenger') and adding
the result to sim_time.
⓭ Send the computed time to the taxi coroutine, which yields the next event
(next_event), or raises StopIteration when it is finished.
⓮ If StopIteration was raised, delete that coroutine from self.procs.
⓯ Otherwise, put next_event in the queue.
⓰ If the loop exits because the simulation time was reached, display the
number of pending events (which may happen to be zero).

Note that the Simulator.run method in Example 16-23 uses else blocks in two
places that are not attached to if statements, a feature covered in
Chapter 15:

The main while loop has an else clause to report that the simulation ended
because end_time was reached, and not because there were no more events to
process.

The try statement near the bottom of the while loop sends next_time to the
current taxi process, trying to get the next event (next_event); on success,
the else block puts next_event into the self.events queue.

I believe the Simulator.run code would be a bit harder to read without those
two else blocks. The point of this example is to show how a main loop can
process events and drive coroutines by sending data to them. This is the
basic idea underlying asyncio, which we study in Chapter 18.
'''
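The whole algorithm can be condensed into a runnable sketch. Here compute_duration is a fixed-duration stub (the book's version draws durations from a randomized distribution), and two short-trip taxis stand in for the full example:

```python
import queue
from collections import namedtuple

Event = namedtuple('Event', 'time proc action')

def taxi_process(ident, trips, start_time):
    time = yield Event(start_time, ident, 'leave garage')
    for _ in range(trips):
        time = yield Event(time, ident, 'pick up passenger')
        time = yield Event(time, ident, 'drop off passenger')
    yield Event(time, ident, 'going home')

def compute_duration(previous_action):
    return 5  # stub: every activity takes 5 minutes

events = queue.PriorityQueue()
procs = {0: taxi_process(0, 1, 0), 1: taxi_process(1, 1, 2)}
for _, proc in sorted(procs.items()):
    events.put(next(proc))  # prime each coroutine and schedule its first event

log = []
while procs:
    event = events.get()            # smallest time first
    sim_time, proc_id, action = event
    log.append(event)
    try:
        next_time = sim_time + compute_duration(action)
        events.put(procs[proc_id].send(next_time))
    except StopIteration:
        del procs[proc_id]          # this taxi has gone home

print(log[0])    # Event(time=0, proc=0, action='leave garage')
print(len(log))  # 8 events: 4 per taxi
```

The loop interleaves the two taxis purely by event time, which is the behavior the full Simulator.run exhibits.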

【Fluent Python】【Control Flow】【Concurrency with Futures】

# -*- coding:utf-8 -*-

""" #熱身練習 ''' import requests,sys,os BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

cc = 'T_JINGSE2'
url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())

resp = requests.get(url)
print(resp.content)

path = os.path.join(sys.path[0], 'downloads', 't_jingse2.PNG')
path = path.replace('\\', '/')
with open(path, 'wb') as fp:
    fp.write(resp.content)

'''

#Example: the image URLs I keep in my cnblogs album sometimes expire; the example below is the most recent one that works, and the older ones follow

import os, time, sys, requests
from enum import Enum
import collections
from collections import namedtuple
import asyncio
from concurrent import futures
from tqdm import tqdm

Result = namedtuple("Result","status cc") HTTPStatus = Enum('Status','ok not_found error')

BASE_URL = 'https://images2018.cnblogs.com/blog/1239321/201808'

DEST_DIR = 'downloads'

MAX_WORKERS = 20

POP20_CC1 = ('aaaa1239321-20180802153722539-931669752 bbbbb1239321-20180802154629131-632652627 1239321-20180802154726137-51175796 '
             '1239321-20180802155107376-2107990591 1239321-20180802155118312-440576328 1239321-20180802155128983-1994377345 '
             '1239321-20180802155159195-1272984827 1239321-20180802155208608-1491921327 1239321-20180802155214231-346476157 '
             '1239321-20180802155220096-549202876 1239321-20180802155224859-827179894 1239321-20180802155231862-1759423827 '
             '1239321-20180802155622645-1918814285 1239321-20180802155627878-380113558 1239321-20180802155635750-1829416793 '
             '1239321-20180802155641947-721209039 1239321-20180802155712392-1928969512 1239321-20180802155717674-571154640 '
             '1239321-20180802155724893-1505320081 1239321-20180802155729597-1637208554 1239321-20180802155734238-2129250463 '
             '1239321-20180802155738782-1538050338 1239321-20180802155743498-1050604720 1239321-20180802155748466-57033362 '
             '1239321-20180802155754337-1037641153').split()
POP20_CC = ('aaaa1239321-20180802153722539-931669752 bbbbb1239321-20180802154629131-632652627 '
            '1239321-20180802153722539-931669752').split()

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.png'.format(BASE_URL, cc=cc)
    resp = requests.get(url)
    if 'PNG' in resp.text:
        return resp.content
    elif 'not found' in resp.text or 'not exist' in resp.text:
        resp.status_code = 404
        resp.raise_for_status()

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        save_flag(image, cc)
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose:
        print(cc, msg)
    return Result(status, cc)

def download_many(cc_list, verbose):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, verbose)
            to_do_map[future] = cc
        done_iter = futures.as_completed(to_do_map)
        if verbose:
            done_iter = tqdm(done_iter, total=len(cc_list))
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP error {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status
            if error_msg:
                status = HTTPStatus.error
            counter[status] += 1
            if not verbose and error_msg:
                print('*** Error for {}: {}'.format(to_do_map[future], error_msg))
    return counter

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main()

#Example 17-2: script that downloads sequentially
'''
Preparation: my cnblogs album holds 20 images
'''
import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = 'T_JINGSE2 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20'.split()

DEST_DIR = 'downloads'

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.PNG')
    return len(cc_list)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)  # 19 flags downloaded in 1.25s

#Example 17-3: multithreaded download script using the futures.ThreadPoolExecutor class

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = 'T_JINGSE2 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20'.split()

DEST_DIR = 'downloads'

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.PNG')
    return cc

from concurrent import futures

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        # The map method works like the built-in map function, except that
        # download_one is called concurrently from multiple threads; it
        # returns a generator, which can be iterated to retrieve the value
        # returned by each call.
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))
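One detail worth noting about executor.map, relevant to the example above: it yields results in the order of the calls, not in completion order. A minimal sketch (hypothetical work function, not from the book):

```python
from concurrent import futures
import time

def work(n):
    time.sleep(0.05 * (3 - n))  # higher n finishes sooner here
    return n * n

with futures.ThreadPoolExecutor(max_workers=3) as ex:
    results = list(ex.map(work, [1, 2, 3]))
print(results)  # [1, 4, 9] -- call order is preserved even though work(3) finished first
```

This also means that iterating over the map generator may block on a slow early call even while later calls are already done.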

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)  # 19 flags downloaded in 0.25s

Example 17-4: a closer look at what futures are

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = 'T_JINGSE2 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20'.split()

DEST_DIR = 'downloads'

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.PNG')
    return cc

from concurrent import futures

def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {}:{}'
            print(msg.format(cc, future))
        results = []
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = '{} result: {!s}'
            print(msg.format(future, res))
            results.append(res)
    return len(results)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)

'''
Scheduled for T_JINGSE2:<Future at 0x3670240 state=running>
Scheduled for T_JINGSE3:<Future at 0x36dfc88 state=running>
Scheduled for T_JINGSE4:<Future at 0x37022b0 state=running>
Scheduled for T_JINGSE5:<Future at 0x37028d0 state=pending>
Scheduled for t_jingse6:<Future at 0x3702978 state=pending>
T_JINGSE2 T_JINGSE4 T_JINGSE3 <Future at 0x3670240 state=finished returned str> result: T_JINGSE2
<Future at 0x37022b0 state=finished returned str> result: T_JINGSE4
<Future at 0x36dfc88 state=finished returned str> result: T_JINGSE3
T_JINGSE5 <Future at 0x37028d0 state=finished returned str> result: T_JINGSE5
t_jingse6 <Future at 0x3702978 state=finished returned str> result: t_jingse6

5 flags downloaded in 0.14s
'''
#[Analysis]
'''
Strictly speaking, none of the concurrent scripts we have tested so far can
download in parallel. The two examples using concurrent.futures are limited
by the GIL (Global Interpreter Lock), and flags_asyncio.py runs in a single
thread.

At this point you may have questions about the informal benchmarks above:

How can flags_threadpool.py download 5 times faster than flags.py, if Python
threads are limited by the GIL, which only lets one thread run at any time?

How can flags_asyncio.py be 5 times faster than flags.py, when both run in a
single thread?

The second question is answered in Section 18.3. The GIL is nearly harmless
for I/O-bound processing, as the next section explains.
'''
'''
17.2 Blocking I/O and the GIL

The CPython interpreter is not thread-safe internally, so it has a Global
Interpreter Lock (GIL), which allows only one thread at a time to execute
Python bytecode. That's why a single Python process usually cannot use
multiple CPU cores at the same time.

This is a limitation of the CPython interpreter, not of the Python language
itself. Jython and IronPython are not limited in this way; but PyPy,
currently the fastest Python interpreter, also has a GIL.

We have no control over the GIL when writing Python code, but a built-in
function or an extension written in C can release the GIL while running a
time-consuming task. In fact, a Python library coded in C can manage the GIL,
launch its own OS threads, and take advantage of all available CPU cores.
Doing so complicates the library code considerably, so most library authors
don't do it.

However, every function in the standard library that performs blocking I/O
releases the GIL while waiting for a result from the OS. This means that
Python programs that are I/O-bound can benefit from using threads at the
Python level: while one Python thread waits for a response from the network,
the blocked I/O function releases the GIL so another thread can run.

That's why David Beazley says: "Python threads are great at doing nothing."
(From "Generators: The Final Frontier",
http://www.dabeaz.com/finalgenerator/, slide 106.)

Every blocking I/O function in the Python standard library releases the GIL,
allowing other threads to run. The time.sleep() function also releases the
GIL. Therefore, despite the GIL, Python threads are still useful in I/O-bound
applications.

Next, a brief look at how the concurrent.futures module makes it easy to work
around the GIL for CPU-bound jobs.
'''
'''
#The change below roughly doubles the running time
Downloading flags, or doing any other I/O-bound job, gains nothing from
ProcessPoolExecutor. This is easy to verify: just change these lines of
Example 17-3:

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:

to:

def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:

In several test runs, I found that the time to download 20 flags increased to
1.8s with ProcessPoolExecutor, against 1.4s for the original
ThreadPoolExecutor version. The main reason is probably that my computer has
a four-core CPU, which limits it to 4 concurrent downloads, while the thread
pool version had 20 worker threads. The value of ProcessPoolExecutor is in
CPU-intensive jobs; I ran some performance tests with two CPU-intensive
scripts.

'''
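The claim that blocking calls release the GIL is easy to check: time.sleep releases it, so two sleeps running on a thread pool overlap instead of adding up. A minimal sketch (not from the book):

```python
import time
from concurrent import futures

def wait(n):
    time.sleep(n)  # blocking call: releases the GIL while sleeping
    return n

t0 = time.time()
with futures.ThreadPoolExecutor(max_workers=2) as ex:
    results = list(ex.map(wait, [0.3, 0.3]))
elapsed = time.time() - t0
print(results)
print(elapsed < 0.5)  # True: the two 0.3s sleeps overlapped instead of taking 0.6s total
```

If the GIL were held during sleep, the elapsed time would be close to the sum of the two sleeps.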

'''
If you use Python for CPU-intensive work, you should try PyPy
(http://pypy.org). Running the arcfour_futures.py script under PyPy was 3.8x
to 5.1x faster, depending on the number of workers. The PyPy I tested was
version 2.4.0, which is compatible with Python 3.2.5, so it has
concurrent.futures in the standard library.
'''

#17.4 Experimenting with Executor.map
#Example 17-6
from time import sleep, strftime

from concurrent import futures

def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)

def loiter(n):
    msg = '{}loiter({}):doing nothing for {}s'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}):done.'
    display(msg.format('\t'*n, n))
    return n * 10

def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results:', results)
    display('Waiting for individual results:')
    for i, result in enumerate(results):
        display('result {}:{}'.format(i, result))

main()

'''
[11:52:00] Script starting.
[11:52:00] loiter(0):doing nothing for 0s
[11:52:00] loiter(0):done.
[11:52:00]      loiter(1):doing nothing for 1s
[11:52:00]              loiter(2):doing nothing for 2s
[11:52:00] results: <generator object Executor.map.<locals>.result_iterator at 0x0000000002CC6888>
[11:52:00]                      loiter(3):doing nothing for 3s
[11:52:00] Waiting for individual results:
[11:52:00] result 0:0
[11:52:01]      loiter(1):done.
[11:52:01] result 1:10
[11:52:01]                              loiter(4):doing nothing for 4s
[11:52:02]              loiter(2):done.
[11:52:02] result 2:20
[11:52:03]                      loiter(3):done.
[11:52:03] result 3:30
[11:52:05]                              loiter(4):done.
[11:52:05] result 4:40

'''
#[Note]
'''
The combination of executor.submit and futures.as_completed is more flexible
than executor.map: submit can schedule different callables with different
arguments, while executor.map can only run the same callable with varying
arguments. In addition, the set of futures passed to futures.as_completed may
come from more than one Executor instance: some created by a
ThreadPoolExecutor, others by a ProcessPoolExecutor.
'''
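The flexibility described in the note can be shown in a few lines: submit schedules two unrelated callables, and as_completed yields their futures as they finish, whatever their origin. A minimal sketch (hypothetical square/greet functions, not from the book):

```python
from concurrent import futures

def square(x):
    return x * x

def greet(name):
    return 'hi ' + name

with futures.ThreadPoolExecutor(max_workers=2) as ex:
    # submit accepts different callables with different arguments...
    fs = [ex.submit(square, 7), ex.submit(greet, 'bob')]
    # ...and as_completed yields each future as it finishes
    results = {f.result() for f in futures.as_completed(fs)}
print(results)  # {49, 'hi bob'} (a set: completion order is not guaranteed)
```

With executor.map, both jobs would have to be calls to the same function.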

#17.5 Handling errors
#17.5.1 Sequential download
#Example 17-12: sequential download

import os, time, sys, requests
from enum import Enum
import collections
from collections import namedtuple
import asyncio
from concurrent import futures
from tqdm import tqdm

Result = namedtuple('Result', 'status cc')
HTTPStatus = Enum('Status', 'ok not_found error')

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'
POP20_CC = 'T_JINGSE200 T_JINGSE3'.split()
#POP20_CC = 'T_JINGSE992'.split()
DEST_DIR = 'downloads'
MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if 'PNG' in resp.text:
        return resp.content
    elif '404' in resp.text:
        resp.status_code = 404
        resp.raise_for_status()

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        save_flag(image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:
        print(cc, msg)
    return Result(status, cc)

def download_many(cc_list, verbose):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, verbose)
            to_do_map[future] = cc
        done_iter = futures.as_completed(to_do_map)
        if not verbose:
            done_iter = tqdm(done_iter, total=len(cc_list))
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP error {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status
            if error_msg:
                status = HTTPStatus.error

            counter[status] += 1
            if not verbose and error_msg:
                cc = to_do_map[future]
                print('*** Error for {} :{}'.format(cc, error_msg))
    return counter

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main()

'''
#Test case 1: first entry of POP20_CC replaced with a fake name + verbose=False
#+ 'if not verbose and error_msg:' (note 1). Output:
  0%|          | 0/19 [00:00<?, ?it/s]*** Error for T_JINGSE200: HTTP error 404 - NOT FOUND
100%|██████████| 19/19 [00:14<00:00, 1.29it/s]

Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 14.68s

'''
'''
#Test case 2: first entry of POP20_CC replaced with a fake name + verbose=True
#+ 'if verbose and error_msg:' (note 1). Output:
*** Error for T_JINGSE200: HTTP error 404 - NOT FOUND
T_JINGSE3 OK
T_JINGSE4 OK
T_JINGSE5 OK
T_jingse14 OK
T_jingse15 OK
T_jingse16 OK
T_jingse17 OK
T_jingse18 OK
T_jingse19 OK
T_jingse20 OK
t_jingse10 OK
t_jingse11 OK
t_jingse12 OK
t_jingse13 OK
t_jingse6 OK
t_jingse7 OK
t_jingse8 OK
t_jingse9 OK

Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 14.96s

'''

'''
#Test case 3: first entry of POP20_CC replaced with a fake name + verbose=False
#+ 'if verbose and error_msg:' (note 1). Output:
100%|██████████| 19/19 [00:15<00:00, 1.26it/s]

Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 15.14s '''

##17.5.2 Using the futures.as_completed function

from enum import Enum
HTTPStatus = Enum('Status', 'ok not_found error')
import collections
from collections import namedtuple
Result = namedtuple('Result', 'status cc')

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = 'T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20'.split()

DEST_DIR = 'downloads'

DEFAULT_CONCUR_REQ = 30  # default concurrency limit
MAX_CONCUR_REQ = 1000    # hard cap backing the limit above

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if 'PNG' in resp.text:
        return resp.content
    elif '404' in resp.text:
        resp.status_code = 404
        resp.raise_for_status()

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.status_code = 404
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        save_flag(image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # if the -v/--verbose option was given on the command line, display the country code and status message; this is the progress display you see in verbose mode
    print(cc,msg)
return Result(status,cc)

from concurrent import futures
from tqdm import tqdm

def download_many(cc_list, verbose, concur_req):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, verbose)
            to_do_map[future] = cc
        done_iter = futures.as_completed(to_do_map)
        if not verbose:
            done_iter = tqdm(done_iter, total=len(cc_list))
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP error {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status
            if error_msg:
                status = HTTPStatus.error

            counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future]
                print('*** Error for {}: {}'.format(cc, error_msg))

    return counter

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, concur_req=DEFAULT_CONCUR_REQ)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)

'''
Python threads are especially well suited to I/O-bound applications, and the
concurrent.futures module makes them much simpler to use in certain
scenarios. This concludes our introduction to the basic usage of
concurrent.futures. Next we discuss the alternatives for when
ThreadPoolExecutor or ProcessPoolExecutor are not suitable.
'''
"""
#17.5.3 Alternatives to threads and multiprocessing

【Fluent Python】【Control Flow】【asyncio】

# -*- coding:utf-8 -*-

""" #18.1 線程&協程 #栗子18-1 threading import sys import time import itertools import threading

class Signal:
    go = True

def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status))

def slow_function():
    time.sleep(1)
    return 42

def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_function()
    signal.go = False
    spinner.join()
    return result

def main():
    result = supervisor()
    print('Answer:', result)

if __name__ == '__main__':
    main()

'''
spinner object: <Thread(Thread-1, initial)>
| thinking! / thinking! - thinking! \ thinking! | thinking! / thinking! - thinking! \ thinking! | thinking! / thinking!
Answer: 42
'''

#Example 18-2: asyncio implementation

import asyncio
import sys
import itertools

@asyncio.coroutine
def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))  # the trick behind the text animation: backspace characters (\x08) move the cursor back
        try:
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    write(' ' * len(status) + '\x08' * len(status))  # clear the status line with spaces and move the cursor back to the start

@asyncio.coroutine
def slow_function():
    # pretend to wait for I/O for a while
    yield from asyncio.sleep(1)  # this expression hands control to the main loop; the coroutine resumes when the sleep is over
    return 42

@asyncio.coroutine
def supervisor():
    spinner = asyncio.async(spin('thinking!'))  # note: asyncio.async was later renamed ensure_future; this old spelling is a SyntaxError on Python 3.7+
    print('spinner object:', spinner)
    result = yield from slow_function()  # drive slow_function(); when it is done, get its return value. Meanwhile the event loop keeps running, because slow_function hands control back to it with yield from asyncio.sleep(1)
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()  # get a reference to the event loop
    result = loop.run_until_complete(supervisor())  # drive the supervisor coroutine to completion; the coroutine's return value is the return value of this call
    loop.close()
    print('Answer :', result)

if __name__ == '__main__':
    main()

'''
spinner object: <Task pending coro=<spin() running at C:/Users/wangxue1/PycharmProjects/fluentPython/kongzhiliucheng/asyncio/__init__.py:69>>
| thinking! / thinking! - thinking! \ thinking! | thinking! / thinking! - thinking! \ thinking! | thinking! / thinking!
Answer : 42
'''

#Example: async/await implementation

import asyncio
import sys
import itertools

async def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            await asyncio.sleep(.3)
        except asyncio.CancelledError:
            break
    write('\x08' * len(status))

async def slow_function():
    await asyncio.sleep(1)
    return 42

async def supervisor():
    spinner = asyncio.ensure_future(spin('thinking!'))
    print(type(spinner))
    print('spinner object:', spinner)
    result = await slow_function()
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer:', result)

main()

'''
#[Comparison]
The main differences between the two supervisor implementations:

An asyncio.Task is roughly the equivalent of a threading.Thread. Victor
Stinner (special technical reviewer for this chapter) points out that "a Task
is like a green thread in libraries that implement cooperative multitasking,
such as gevent."

A Task drives a coroutine; a Thread invokes a callable.

You don't instantiate Task objects yourself: you get them by passing a
coroutine to the asyncio.async(...) function or the loop.create_task(...)
method.

When you get a Task object, it is already scheduled to run (e.g. by the
asyncio.async function); a Thread instance must be explicitly told to run by
calling its start method.

In the threaded supervisor, slow_function is a plain function, invoked
directly by the thread. In the asyncio supervisor, slow_function is a
coroutine, driven by yield from.

There is no API to terminate a thread from the outside, because a thread
could be interrupted at any point, leaving the system in an invalid state. To
terminate a task there is the Task.cancel() instance method, which raises
CancelledError inside the coroutine. The coroutine can catch this exception
at the yield where it is suspended and handle the termination request.

The supervisor coroutine must be executed with loop.run_until_complete in the
main function.

This comparison should help you understand how concurrent jobs are
orchestrated in asyncio, in contrast with the more familiar threading model.

One final point about threads versus coroutines: if you have done any
nontrivial programming with threads, you know how hard it is to reason about
a program, because the scheduler can interrupt a thread at any time. You must
remember to hold locks to protect the critical sections of your program,
preventing multistep operations from being interrupted partway through, which
could leave data in an invalid state.

Coroutines are protected against interruption by default: we must explicitly
yield to let the rest of the program run. With coroutines there is no need to
hold locks to synchronize operations across threads; coroutines synchronize
themselves, because only one of them runs at any time. When you want to give
up control, you use yield or yield from to hand control back to the
scheduler. That's why it is possible to safely cancel a coroutine: by
definition, a coroutine can only be cancelled while it is suspended at a
yield, so you can handle CancelledError and perform cleanup.
'''
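The Task.cancel behavior described above can be demonstrated in a few lines. This is a sketch using the modern asyncio.run API (Python 3.7+), not the book's code; the worker coroutine name is hypothetical:

```python
import asyncio

log = []

async def worker():
    try:
        await asyncio.sleep(10)  # cancellation is delivered at this suspension point
    except asyncio.CancelledError:
        log.append('cleanup ran')  # a chance to clean up before exiting
        raise

async def main():
    task = asyncio.ensure_future(worker())
    await asyncio.sleep(0)  # let the worker start and reach its await
    task.cancel()           # schedules CancelledError inside the worker
    try:
        await task
    except asyncio.CancelledError:
        log.append('caller saw cancellation')

asyncio.run(main())
print(log)  # ['cleanup ran', 'caller saw cancellation']
```

The coroutine gets to run its cleanup precisely because the cancellation can only happen at an await, never mid-statement.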

#18.1.1 Deliberately nonblocking
'''
The asyncio.Future class has an interface nearly identical to
concurrent.futures.Future, but they are implemented differently and are not
interchangeable. "PEP 3156 — Asynchronous IO Support Rebooted: the 'asyncio'
Module" (https://www.python.org/dev/peps/pep-3156/) has this to say about
that unfortunate situation:

The asyncio.Future and concurrent.futures.Future implementations may be
unified in the future (e.g. by adding an __iter__ method to the latter that
is compatible with yield from).

In short, because the asyncio.Future class is designed to work with
yield from, the following methods are usually not needed.

You don't need my_future.add_done_callback(...), because you can simply put
whatever processing you would do when the future is done right after the
yield from my_future expression in your coroutine. That's the big advantage
of coroutines: they are functions that can be suspended and resumed.

You don't need my_future.result(), because the value produced by yield from
on a future is the result (e.g. result = yield from my_future).
'''
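The same point holds for the await syntax that superseded yield from: awaiting an asyncio.Future directly produces its result, with no callback and no .result() call. A sketch with the modern API (get_running_loop and asyncio.run require Python 3.7+):

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # arrange for the result to be set on a later loop iteration
    loop.call_later(0, fut.set_result, 42)
    value = await fut  # the await expression yields the result directly
    return value

result = asyncio.run(main())
print(result)  # 42
```

The coroutine simply suspends at the await until set_result is called, then resumes with the value.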

#18.2 Downloading with asyncio and aiohttp

import os, sys, time
import requests
import asyncio
import aiohttp

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'
POP20_CC = 'T_JINGSE2 T_JINGSE3'.split()
DEST_DIR = 'downloads'
MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

async def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = await aiohttp.request('GET', url)
    image = await resp.read()
    return image

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

async def download_one(cc):
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.PNG')
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop()
    to_do = [download_one(cc) for cc in sorted(cc_list)]
    # despite its name, wait is not a blocking function: it is a coroutine
    # that finishes when all the coroutines passed to it are done
    wait_coro = asyncio.wait(to_do)
    '''
    asyncio.wait(...) takes an iterable of futures or coroutines; wait wraps
    each coroutine in a Task. The end result is that every object handled by
    wait becomes, one way or another, an instance of Future. Because wait is
    a coroutine function, calling it returns a coroutine/generator object;
    that is what the wait_coro variable holds. To drive the coroutine, we
    pass it to loop.run_until_complete(...).
    '''
    # run the event loop until wait_coro is done; the script blocks here
    # while the loop runs. We ignore the second element returned by
    # run_until_complete.
    res, _ = loop.run_until_complete(wait_coro)
    '''
    loop.run_until_complete accepts a future or a coroutine. If it gets a
    coroutine, run_until_complete wraps it in a Task, just like wait does.
    Coroutines, futures, and tasks can all be driven by yield from, and this
    is what run_until_complete does with the wait_coro object returned by
    wait. When wait_coro runs to completion it returns a 2-tuple: the first
    element is the set of completed futures, the second the set of those not
    completed. In Example 18-5 the second set is always empty, so we assign
    it to _ and ignore it. But wait has two keyword-only arguments, timeout
    and return_when, which may cause it to return even if some futures are
    not complete.
    '''
    loop.close()
    return len(res)
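The timeout and return_when parameters mentioned above change that picture: with a timeout, wait returns a (done, pending) pair that may contain unfinished futures, and the pending ones are not cancelled automatically. A sketch with hypothetical quick/slow coroutines, using the modern asyncio.run API (Python 3.7+):

```python
import asyncio

async def quick():
    return 'fast'

async def slow():
    await asyncio.sleep(5)
    return 'slow'

async def main():
    tasks = [asyncio.ensure_future(c) for c in (quick(), slow())]
    # with a timeout, wait may return before everything finishes
    done, pending = await asyncio.wait(tasks, timeout=0.1)
    for t in pending:
        t.cancel()  # pending tasks are NOT cancelled automatically
    return len(done), len(pending)

counts = asyncio.run(main())
print(counts)  # (1, 1): quick finished within the timeout, slow was still pending
```

return_when accepts FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED (the default) for finer control over when wait returns.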

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)

#19 flags downloaded in 0.25s
'''
t_jingse7 t_jingse11 T_JINGSE4 t_jingse8 T_jingse17 t_jingse6 t_jingse10 T_JINGSE3 t_jingse9
T_JINGSE5 t_jingse13 T_jingse20 T_jingse16 T_JINGSE2 t_jingse12 T_jingse19 T_jingse15 T_jingse18 T_jingse14
(interleaved with the above, a warning like the following is printed once per request:)
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038C9470>
19 flags downloaded in 0.45s
'''
#[Summary]
'''
When we use the asyncio package, the asynchronous code we write consists of coroutines
(delegating generators) driven by asyncio itself, which ultimately delegate the work to
coroutines in the asyncio package or in third-party libraries such as aiohttp. This
arrangement sets up a pipeline in which the asyncio event loop drives, through our
coroutines, the library functions that perform the low-level asynchronous I/O.
'''

'''
18.3 Avoiding blocking calls
Ryan Dahl, the inventor of Node.js, introduced the philosophy of his project by saying:
"We're doing I/O completely wrong." He defines a blocking function as one that performs
disk or network I/O, and argues that we cannot treat blocking functions the way we treat
non-blocking ones. To explain why, he showed the first two columns of Table 18-1.
("Introduction to Node.js" video, https://www.youtube.com/watch?v=M-sc73Y-zQA, at 4:55.)

Table 18-1: Latency of reading data from different storage media on a modern computer;
the third column scales the numbers proportionally into "human time" to make them easier
to grasp.

Storage medium    CPU cycles       Scaled to "human" time
L1 cache          3                3 seconds
L2 cache          14               14 seconds
RAM               250              250 seconds
Disk              41,000,000       1.3 years
Network           240,000,000      7.6 years

To make sense of Table 18-1, remember that modern CPUs run at GHz clock rates, billions of
cycles per second. Supposing a CPU runs exactly one billion cycles per second, it could
read from the L1 cache 333,333,333 times in one second, but from the network only 4 times
(just 4!). The third column multiplies the second by a constant factor; so in an alternate
world where reading the L1 cache took 3 seconds, a network read would take 7.6 years!

There are two ways to prevent blocking calls from halting the progress of the entire
application:
* run each blocking operation in a separate thread;
* turn every blocking operation into a non-blocking asynchronous call.
'''
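A minimal sketch of the first option (the names blocking_io and main are mine, not from the text): hand the blocking call to the loop's default ThreadPoolExecutor with run_in_executor, so the event loop keeps running while the call blocks a worker thread.

```python
import asyncio
import time

def blocking_io(x):
    time.sleep(0.05)        # stands in for a blocking disk or network call
    return x * 2

async def main():
    loop = asyncio.get_event_loop()
    # None selects the loop's default ThreadPoolExecutor
    futs = [loop.run_in_executor(None, blocking_io, n) for n in (1, 2, 3)]
    return await asyncio.gather(*futs)

loop = asyncio.new_event_loop()
print(loop.run_until_complete(main()))  # [2, 4, 6]
```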

# Example 1 (my own)
import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

print('TIME: ',now() - start)

# Example 2 (my own)
import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)
'''
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting:  2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME:  0.0010001659393310547

'''

# Example 3 (my own)
'''
A coroutine object cannot run by itself. When it is registered with the event loop, the
run_until_complete method actually wraps the coroutine in a task object. A task is a
subclass of Future; it records the state of the running coroutine, so that the result can
be retrieved later.
'''

import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)
'''
Both asyncio.ensure_future(coroutine) and loop.create_task(coroutine) create a task.
run_until_complete takes a future object; given a coroutine, it wraps it in a task
internally. Task is a subclass of Future, so isinstance(task, asyncio.Future) prints True.
'''
print(isinstance(task, asyncio.Future))
'''
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting:  2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME:  0.0009999275207519531
True
'''

# Example 4 (my own): attaching a callback

import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(future):
    print('Result: ', future)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

print('TIME: ', now() - start)
'''
Waiting:  2
Result:  <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result='Done after 2s'>
TIME:  0.002000093460083008
'''

# Example 5 (my own): attaching a callback that needs extra arguments

import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(t, future):
    print('Result: ', t, future)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
import functools
task.add_done_callback(functools.partial(callback, 2))
loop.run_until_complete(task)

print('TIME: ',now() - start)

'''
Waiting:  2
Result:  2 <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result='Done after 2s'>
TIME:  0.002000093460083008
'''

# Example 6 (my own): future and result. Callbacks have always been the nightmare of asynchronous programming; programmers prefer to write asynchronous code in a synchronous style.

import asyncio
import time

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    return 'Done after {}s'.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print('Task result:{}'.format(task.result))  # bug: task.result without parentheses prints the bound method itself; use task.result() for the value
print('TIME: {}'.format(now() - start))

'''
Waiting 2
Task result:<built-in method result of _asyncio.Task object at 0x0000000002F73AE8>
TIME: 0.002000093460083008
'''

# Example 7 (my own): blocking and await

import asyncio
import time

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print('Task result:{}'.format(task.result))  # bug: task.result without parentheses prints the bound method itself; use task.result() for the value
print('TIME: {}'.format(now() - start))

'''
Waiting 2
Task result:<built-in method result of _asyncio.Task object at 0x0000000002F73A60>
TIME: 2.001114845275879
'''

# Example 8 (my own): concurrency & parallelism
# whenever there is a blocking task, use await

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task result: ', task.result())

print('Time: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
Time:  3.9912283420562744
'''

# Example

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting:', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

tasks = [
    asyncio.ensure_future(do_some_work(1)),
    asyncio.ensure_future(do_some_work(2)),
    asyncio.ensure_future(do_some_work(4))
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task result:', task.result())

'''
Waiting: 2
Waiting: 1
Waiting: 4
Traceback (most recent call last):
  File "/Users/suren/PycharmProjects/untitled1/asyn.py", line 38, in <module>
    print ('task result:', asyncio.ensure_future(coro).result())
asyncio.base_futures.InvalidStateError: Result is not ready.

'''
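The InvalidStateError above is easy to reproduce and to avoid: result() may only be called on a task that has finished. A small self-contained sketch (the names work/task are mine):

```python
import asyncio

async def work():
    await asyncio.sleep(0.01)
    return 'done'

loop = asyncio.new_event_loop()
task = loop.create_task(work())   # scheduled, but the loop has not run yet

try:
    task.result()                 # too early: the task has not finished
except asyncio.InvalidStateError as exc:
    print(type(exc).__name__)     # InvalidStateError

loop.run_until_complete(task)     # drive the task to completion first...
print(task.result())              # done
```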

# Example 9 (my own): nesting coroutines [1]  dones, pendings = await asyncio.wait(tasks)

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

    dones, pendings = await asyncio.wait(tasks)

for task in dones:
    print('Task result: ',task.result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 2s
Task result:  Done after 4s
Task result:  Done after 1s
TIME:  4.007229328155518
'''

# Example 10 (my own): nesting coroutines [2] -- if asyncio.gather is used to create the awaited object, the value of the await expression is the list of the coroutines' results

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

    results = await asyncio.gather(*tasks)

for result in results:
    print('Task result: ',result)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
TIME:  3.9892282485961914
'''

# Example 11 (my own): nesting coroutines [3] -- instead of handling results inside the main coroutine, return the awaited value directly; the outermost run_until_complete then returns main's result

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

    return await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())

for result in results:
    print('Task result: ', result)

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
TIME:  4.0052289962768555
'''

# Example 12 (my own): nesting coroutines [4] -- as above, return the awaited value directly so run_until_complete returns main's result, but suspend the coroutines with asyncio.wait instead

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

    return await asyncio.wait(tasks)

loop = asyncio.get_event_loop()
dones, pendings = loop.run_until_complete(main())

for task in dones:
    print('Task result: ', task.result())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 2s
Task result:  Done after 4s
Task result:  Done after 1s
TIME:  3.9912283420562744
'''

# Example 13 (my own): nesting coroutines [5] -- using asyncio.as_completed

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

for task in asyncio.as_completed(tasks):
    result = await task
    print('Task result: {}'.format(result))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
TIME:  3.9912281036376953
'''

# Example 14 (my own): stopping coroutines [1] -- the event loop is driven outside the main function, so main is in effect the outermost task; it is enough to handle that wrapped task.
'''
We have now seen the common usage patterns of coroutines, all revolving around the event
loop. A future object has several states:

    Pending, Running, Done, Cancelled.
When a future is created, the task is pending; while the event loop is executing it, it is
running; and once the call finishes, it is done. To stop the event loop, the tasks must be
cancelled first. asyncio.Task can be used to get the tasks of the event loop.

Right after starting the event loop, press Ctrl+C: run_until_complete raises
KeyboardInterrupt. We then loop over asyncio.Task and cancel the futures.

'''
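Before looking at the full Ctrl+C handler, the cancellation mechanics can be shown in isolation (a sketch with my own names): cancel() requests cancellation, and the coroutine sees it as a CancelledError raised at its current await point, where it can clean up.

```python
import asyncio

async def slow():
    try:
        await asyncio.sleep(10)              # the cancellation arrives here
    except asyncio.CancelledError:
        return 'cancelled'                   # clean up, then finish normally

async def main():
    task = asyncio.ensure_future(slow())
    await asyncio.sleep(0)                   # let the task start running
    print(task.cancel())                     # True: the request was accepted
    return await task                        # the coroutine handled the cancel itself

loop = asyncio.new_event_loop()
print(loop.run_until_complete(main()))       # cancelled
```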

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

    done, pending = await asyncio.wait(tasks)
    for task in done:
        print('Task result: ', task.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main())
try:
    loop.run_until_complete(task)
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    print('*******************')
    print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
    loop.stop()
    loop.run_forever()  # True means the cancel succeeded; after loop.stop() the loop must be run again before closing, otherwise an exception is raised
finally:
    loop.close()

print('TIME: ',now() - start)

'''
# Ctrl+C cannot be sent from inside PyCharm; run this in an interactive Python session
Waiting:  1
Waiting:  2
Waiting:  4
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>,
 (two more pending do_some_work tasks elided),
 <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>}


True
TIME:  2.0158370780944824
'''

# Example 15 (my own): stopping coroutines [2] -- the tasks live at the top level rather than inside a main function
import asyncio

import time

now = lambda: time.time()
start = now()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print('TIME: ', now() - start)

'''
Four Trues are printed, not three: besides the three do_some_work tasks,
run_until_complete wrapped the asyncio.wait(...) coroutine in a fourth task
(visible as the wait() entry in the set printed below).
Waiting: 1
Waiting: 2
Waiting: 4
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> ...>,
 (two more pending do_some_work tasks elided),
 <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> ...>}
True
True
True
True
TIME: 0.8858370780944824
'''

# Example 16 (my own): event loops in different threads
'''
Often our event loop is used to register coroutines, but some coroutines need to be added
to the loop dynamically. A simple way to achieve this is with multiple threads: create an
event loop in the current thread, then start a new thread and run the event loop there.
The current thread is not blocked.

After starting the code below, the current thread is not blocked; the new thread runs the
more_work calls registered with call_soon_threadsafe in order. Because time.sleep is a
synchronous, blocking operation, finishing both more_work calls takes roughly 6 + 4 seconds.
'''

from threading import Thread
import asyncio

import time

now = lambda: time.time()
start = now()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 4)

'''
More work 6
Finished more work 6
More work 4
Finished more work 4
'''

# Example 17 (my own): coroutines in a new thread
from threading import Thread
import asyncio

import time

now = lambda: time.time()
start = now()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

'''
Waiting 6
Waiting 4
Done after 4s
Done after 6s
'''
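One detail worth adding (my own sketch, with my own names): run_coroutine_threadsafe returns a concurrent.futures.Future, so the submitting thread can block on .result() to carry the coroutine's return value back across the thread boundary.

```python
import asyncio
from threading import Thread

async def work(x):
    await asyncio.sleep(0.01)
    return x * 10

new_loop = asyncio.new_event_loop()
t = Thread(target=new_loop.run_forever, daemon=True)
t.start()

# returns a concurrent.futures.Future, usable from this (non-loop) thread
future = asyncio.run_coroutine_threadsafe(work(4), new_loop)
print(future.result(timeout=5))   # 40

new_loop.call_soon_threadsafe(new_loop.stop)
```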

[aiohttp practice]

  1. Preface

This article is translated from the official aiohttp documentation; corrections are welcome.

aiohttp has a server side and a client side; this article covers only the client.

I have also written a wrapper around aiohttp and asyncio; see my GitHub project:

https://github.com/web-trump/ahttp

Because of the async context, request code must run inside an async function:

async def fn():
    pass

  1. Installing aiohttp

pip3 install aiohttp

1.1. Basic request usage

async with aiohttp.get('https://github.com') as r:
    await r.text()


Here r.text() can take a decoding/encoding argument in its parentheses, for example:

await resp.text(encoding='windows-1251')


Or you can skip decoding altogether, which suits content that cannot be decoded, such as images:

await resp.read()

2. Making a session request

First import the aiohttp module:

import aiohttp

Then let's fetch the source of a web page, using GitHub's public timeline page as the example:

async with aiohttp.ClientSession() as session:
    async with session.get('https://api.github.com/events') as resp:
        print(resp.status)
        print(await resp.text())

In the code above we create a ClientSession object named session, then use the session's get method to obtain a ClientResponse object named resp. The get method takes one required parameter, url, the HTTP URL whose source we want. With that, an asynchronous get request has been completed through coroutines.

Where there is get there is of course post, and post is also a coroutine:

session.post('http://httpbin.org/post', data=b'data')

Its usage is the same as get; the difference is that post takes an extra parameter, data, the payload to post.

Apart from get and post, the other HTTP methods work the same way:

session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

Note:

Do not create a new session for every request. Normally you create just one session and use it to perform all your requests.

Each session object internally holds a connection pool, and keep-alive plus connection reuse (enabled by default) speed up overall performance.

3. Passing parameters in the URL

We often need to pass parameters in a get URL; they are sent to the server as the part of the URL after the question mark. aiohttp requests let you express those parameters as a dict. For example, to pass key1=value1 key2=value2 to httpbin.org/get you can use the following code:

params = {'key1': 'value1', 'key2': 'value2'}
async with session.get('http://httpbin.org/get', params=params) as resp:
    assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'

As you can see, the code ran correctly, meaning the parameters were passed properly, and this works no matter how many parameters there are. Besides a dict, you can also pass a list of tuples, which handles some special cases, such as the one below where the two keys are identical:

params = [('key', 'value1'), ('key', 'value2')]
async with session.get('http://httpbin.org/get', params=params) as r:
    assert r.url == 'http://httpbin.org/get?key=value2&key=value1'

Besides these two forms, parameters can also be passed directly as a string, but note that special characters in a string are not encoded:

async with session.get('http://httpbin.org/get', params='key=value+1') as r:
    assert r.url == 'http://httpbin.org/get?key=value+1'

4. Response content

Using GitHub's public timeline page again, we can read the content of the response:

async with session.get('https://api.github.com/events') as resp:
    print(await resp.text())

After running, it prints something like:

'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

resp's text method automatically decodes the content returned by the server; we can also choose the encoding ourselves:

await resp.text(encoding='gb2312')

Besides text, which returns decoded content, we can also get the content as bytes:

print(await resp.read())

The result is:

b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

The gzip and deflate transfer encodings are decoded for you automatically.

Note:

The text() and read() methods load the whole response body into memory. If you are fetching a lot of data, consider using a streaming response instead.

5. Special response content: JSON

If the page's response content is JSON, aiohttp has a better built-in method for handling it:

async with session.get('https://api.github.com/events') as resp:
    print(await resp.json())

If resp.json() fails to parse the body for some reason, for example because it is not a JSON string, it raises an error. You can also specify an encoding for the json() method:

print(await resp.json(encoding='gb2312'))

Or pass a custom parser function in:

print(await resp.json(loads=lambda x: x.replace('a', 'b')))

6. Reading the response content as a byte stream

Although json(), text(), and read() conveniently load the response data into memory, we should still use them with care, because they read the entire response body. Even if you only want a few bytes of a file, these methods load all the data into memory. So we can instead limit how much of the response is read into memory by reading a fixed number of bytes:

async with session.get('https://api.github.com/events') as resp:
    await resp.content.read(10)  # read the first 10 bytes

In general, the following pattern should be used to save a streamed response to a file:

with open(filename, 'wb') as fd:
    while True:
        chunk = await resp.content.read(chunk_size)
        if not chunk:
            break
        fd.write(chunk)

7. Custom request headers

To add request headers, pass them as a dict, the same way parameters are passed to get or post:

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
headers = {'content-type': 'application/json'}

await session.post(url, data=json.dumps(payload), headers=headers)

8. Custom cookies

To send cookies to the server, pass a cookies parameter to ClientSession:

url = 'http://httpbin.org/cookies'
cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:
    async with session.get(url) as resp:
        assert await resp.json() == {
            "cookies": {"cookies_are": "working"}}

You can visit "httpbin.org/cookies" directly to see the current cookies; for accessing the cookies stored in a session, see section 10.

9. Several ways to post data

(1) Posting simulated form data

payload = {'key1': 'value1', 'key2': 'value2'}
async with session.post('http://httpbin.org/post', data=payload) as resp:
    print(await resp.text())

Note: data posted as data=dict is encoded, with the same effect as submitting an HTML form. If you don't want that encoding, pass a string directly as data=str and it will be sent unencoded.

(2) Posting JSON

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}

async with session.post(url, data=json.dumps(payload)) as resp: ...

In fact json.dumps(payload) just returns a string, only one that can be recognized as JSON format.
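The difference between the two bodies can be seen with the standard library alone (urlencode here mimics what aiohttp does to a dict passed as data; the variable names are mine):

```python
import json
from urllib.parse import urlencode

payload = {'some': 'data'}

form_body = urlencode(payload)   # what data=payload becomes: form-encoded
json_body = json.dumps(payload)  # what data=json.dumps(payload) sends: a JSON string

print(form_body)   # some=data
print(json_body)   # {"some": "data"}
```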

(3) Posting a small file

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}

await session.post(url, data=files)

You can also set the file name and content type explicitly:

url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file',
               open('report.xls', 'rb'),
               filename='report.xls',
               content_type='application/vnd.ms-excel')

await session.post(url, data=data)

If you pass a file object as the data parameter, aiohttp automatically streams it to the server as bytes.

(4) Posting a large file

aiohttp can upload many kinds of files as streams, so we can send a large file without first reading it into memory.

@aiohttp.streamer
def file_sender(writer, file_name=None):
    with open(file_name, 'rb') as f:
        chunk = f.read(2**16)
        while chunk:
            yield from writer.write(chunk)
            chunk = f.read(2**16)

Then you can use file_sender as a data provider:

async with session.post('http://httpbin.org/post',
                        data=file_sender(file_name='huge_file')) as resp:
    print(await resp.text())

We can also fetch a file from one url and post it straight to another url, computing a hash as we go:

async def feed_stream(resp, stream):
    h = hashlib.sha256()

while True:
    chunk = await resp.content.readany()
    if not chunk:
        break
    h.update(chunk)
    stream.feed_data(chunk)

return h.hexdigest()

resp = session.get('http://httpbin.org/post')
stream = StreamReader()
loop.create_task(session.post('http://httpbin.org/post', data=stream))

file_hash = await feed_stream(resp, stream)

Because the response content type is a StreamReader, a get and a post can be chained together, performing the post and the get at the same time:

r = await session.get('http://python.org')
await session.post('http://httpbin.org/post', data=r.content)

(5) Posting pre-compressed data

To send data that is already compressed before aiohttp sees it, set the content-encoding header to the name of the compression function used (usually deflate or zlib):

async def my_coroutine(session, headers, my_data):
    data = zlib.compress(my_data)
    headers = {'Content-Encoding': 'deflate'}
    async with session.post('http://httpbin.org/post',
                            data=data, headers=headers):
        pass

10. keep-alive, connection pooling, and shared cookies

ClientSession can be used to share cookies across multiple connections:

async with aiohttp.ClientSession() as session:
    await session.get(
        'http://httpbin.org/cookies/set?my_cookie=my_value')
    filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
    assert filtered['my_cookie'].value == 'my_value'
    async with session.get('http://httpbin.org/cookies') as r:
        json_body = await r.json()
        assert json_body['cookies']['my_cookie'] == 'my_value'

You can also set common request headers for all connections in a session:

async with aiohttp.ClientSession(
        headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session:
    async with session.get("http://httpbin.org/headers") as r:
        json_body = await r.json()
        assert json_body['headers']['Authorization'] == \
            'Basic bG9naW46cGFzcw=='

ClientSession also supports keep-alive connections and connection pooling.

11. Cookie safety

By default ClientSession uses a strict-mode aiohttp.CookieJar. RFC 2109 explicitly forbids accepting cookies from URLs that use an IP address instead of a DNS hostname. This can be relaxed by constructing aiohttp.CookieJar with unsafe=True:

jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)

12. Limiting the number of simultaneous connections (connection pool)

This can also be understood as the number of simultaneous requests. To limit the number of open connections, pass a limit to the connector:

conn = aiohttp.TCPConnector(limit=30)  # at most 30 simultaneous connections; the default is 100, and limit=0 means unlimited

To limit the number of simultaneously open connections to the same endpoint (an endpoint being a (host, port, is_ssl) triple), set the limit_per_host parameter:

conn = aiohttp.TCPConnector(limit_per_host=30)  # the default is 0 (no limit)

13. Custom DNS resolution

We can specify the nameserver IPs used to resolve the hostnames in our get or post URLs:

from aiohttp.resolver import AsyncResolver

resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
conn = aiohttp.TCPConnector(resolver=resolver)

14. Using a proxy

aiohttp supports accessing web pages through a proxy:

async with aiohttp.ClientSession() as session:
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com") as resp:
        print(resp.status)

Proxies that require authorization are of course supported too:

async with aiohttp.ClientSession() as session:
    proxy_auth = aiohttp.BasicAuth('user', 'pass')
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com",
                           proxy_auth=proxy_auth) as resp:
        print(resp.status)

Or supply the credentials in the proxy URL itself:

session.get("http://python.org", proxy="http://user:pass@some.proxy.com")

15. Response status code

resp.status can be used to check whether the status code is 200:

async with session.get('http://httpbin.org/get') as resp:
    assert resp.status == 200

16. Response headers

We can inspect resp.headers directly; the value behaves like a dict:

resp.headers
{'ACCESS-CONTROL-ALLOW-ORIGIN': '*', 'CONTENT-TYPE': 'application/json',
 'DATE': 'Tue, 15 Jul 2014 16:49:51 GMT', 'SERVER': 'gunicorn/18.0',
 'CONTENT-LENGTH': '331', 'CONNECTION': 'keep-alive'}

Or we can look at the raw response headers:

resp.raw_headers
((b'SERVER', b'nginx'),
 (b'DATE', b'Sat, 09 Jan 2016 20:28:40 GMT'),
 (b'CONTENT-TYPE', b'text/html; charset=utf-8'),
 (b'CONTENT-LENGTH', b'12150'),
 (b'CONNECTION', b'keep-alive'))

17. Inspecting cookies

url = 'http://example.com/some/cookie/setting/url'
async with session.get(url) as resp:
    print(resp.cookies)

18. Response headers after a redirect

If a request was redirected, we can still inspect the responses received before the redirect:

resp = await session.get('http://example.com/some/redirect/')
resp
<ClientResponse(http://example.com/some/other/url/) [200]>
resp.history
(<ClientResponse(http://example.com/some/redirect/) [301]>,)

19. Timeouts

By default, I/O operations have a five-minute timeout, which we can override with the timeout parameter:

async with session.get('https://github.com', timeout=60) as r:
    ...

If timeout=None or timeout=0, no timeout check is performed, that is, the request may take any amount of time.
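When a timeout fires, it surfaces as asyncio.TimeoutError. The behaviour can be sketched without the network, with asyncio.wait_for standing in for a slow request (the names slow_request/main are mine):

```python
import asyncio

async def slow_request():
    await asyncio.sleep(10)      # stands in for a request that never finishes
    return 'body'

async def main():
    try:
        # wait_for enforces a deadline the same way a request timeout does
        return await asyncio.wait_for(slow_request(), timeout=0.05)
    except asyncio.TimeoutError:
        return 'timed out'

loop = asyncio.new_event_loop()
print(loop.run_until_complete(main()))   # timed out
```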

#18.4 Improving the asyncio download script
#Example 18-7 flags2_asyncio.py: first half of the script; the rest is in Example 18-8

import os, time, sys
import aiohttp
from aiohttp import web
import asyncio
import async_timeout
import collections
from collections import namedtuple
from enum import Enum
from tqdm import tqdm

BASE_URL = 'https://images2018.cnblogs.com/blog/1239321/201808'
POP20_CC1 = ('1239321-20180808065117364-1539273796 1239321-20180808065129112-103367989 '
             '1239321-20180808065136786-868892759 '
             '1239321-20180808065146211-1880907820 1239321-20180808065155072-1392342345 1239321-20180808065222347-1439669487 '
             '1239321-20180808065232562-1454112423 1239321-20180808065246215-1857827340 1239321-20180808065301480-1707393818 '
             '1239321-20180808065312201-964077895 1239321-20180808065326211-1590046138 1239321-20180808065342568-448845 '
             '1239321-20180808065358869-366577464 1239321-20180808065410900-539910454 1239321-20180808065422695-222625730 '
             '1239321-20180808065430991-1182951067 1239321-20180808065437898-138307299 1239321-20180808065444387-1849567433 '
             '1239321-20180808065454537-30405473 1239321-20180808065506470-995044385').split()
POP20_CC = 'aaaa1239321-20180808065117364-1539273796 1239321-20180808065129112-103367989'.split()
DEST_DIR = 'downloads'
MAX_WORKERS = 20

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code

def save_flag(image, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(image)

async def get_flag(cc):
    url = '{}/{cc}.png'.format(BASE_URL, cc=cc)

    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(3000):
            async with session.get(url, verify_ssl=False) as resp:
                # without the verify_ssl parameter an SSL error is raised; at the root,
                # urllib/requests verify the certificate when opening https sites

                #print(await resp.text())
                if b'PNG' in (await resp.read()):
                    # We cannot use resp.status == 404 to decide whether the resource
                    # exists; not every site answers in the same format. Nor can we use
                    # '404' not in (await resp.text()): when the resource exists that call
                    # raises "UnicodeDecodeError: 'utf-8' codec can't decode byte 0x89 in
                    # position 0: invalid start byte".
                    # So this check, found by trial and error, is used instead (when the
                    # resource is missing, the body length is 191). It is not very robust;
                    # a cleaner check is left for later. Another option:
                    # if resp.status == 200 and ((await resp.read())[2] == 78):
                    image = await resp.read()
                    return image
                elif 'not found' in (await resp.text()) or 'not exist' in (await resp.text()):
                    # FetchError catches this later and prints: *** Error for T_JINGSE200: Not Found
                    raise web.HTTPNotFound()
                else:
                    raise aiohttp.HttpProcessingError(
                        code=resp.status, message=resp.reason,
                        headers=resp.headers)  # FetchError catches this later and prints: *** Error for T_JINGSE200: module 'aiohttp' has no attribute ...

async def download_one(cc, semaphore, verbose):
    try:
        with (await semaphore):
            # using the semaphore as a context manager in the await expression keeps the
            # whole system from blocking: only this coroutine blocks when the semaphore
            # counter is already at the maximum allowed value
            image = await get_flag(cc)
    except Exception as exc:
        raise FetchError(cc) from exc  # the raise X from Y syntax chains the original exception
    else:
        save_flag(image, cc + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose and msg:
        # if the -v/--verbose option was given on the command line, show the country code
        # and a status message; this is the progress display seen in verbose mode
        print(cc, msg)
    return Result(status, cc)

async def download_coro(cc_list, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, semaphore, verbose) for cc in cc_list]
    to_do_iter = asyncio.as_completed(to_do)  # get an iterator that yields futures as they finish
    if not verbose:
        to_do_iter = tqdm(to_do_iter, total=len(cc_list))  # wrap the iterator in tqdm to display progress
    for future in to_do_iter:  # iterate over the finished futures
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
                # sometimes args looks like ("module 'aiohttp' has no attribute 'HttpProcessingError'",),
                # so we take the only element; at other times it looks like
                # (1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:749)')
                print(exc.__cause__.args)
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
            if not verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
        counter[status] += 1
    return counter

def download_many(cc_list, verbose, concur_req):
    # download_many only instantiates the download_coro coroutine and hands it
    # to the event loop via run_until_complete.
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, concur_req=2)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

main()

''' Flags that come back normally are saved to the target path; the rest raise the corresponding error. '''

18.4.2 Using an Executor object to avoid blocking the event loop

''' In Example 18-7, the blocking function is save_flag. In the threaded version of the
script (Example 17-14), save_flag blocks the thread running download_one, but that is
only one of many worker threads. Behind the scenes, a blocking I/O call releases the
GIL, so another thread can proceed. But in flags2_asyncio.py, save_flag blocks the
single thread that our code shares with the asyncio event loop, so the whole
application freezes while a file is being saved. The solution is the event loop's
run_in_executor method: the asyncio event loop keeps a ThreadPoolExecutor behind the
scenes, and we can call run_in_executor to send callables to it for execution. To use
this feature here, only a few lines of the download_one coroutine need to change. '''
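The change described above can be sketched in isolation (a minimal version under stated assumptions: `blocking_save` stands in for `save_flag`, and `asyncio.run` requires Python 3.7+):

```python
import asyncio

def blocking_save(data, sink):
    # Stands in for save_flag: a blocking call (e.g. disk I/O) that would
    # otherwise freeze the single thread shared with the event loop.
    sink.append(len(data))

async def download_one(data, sink):
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor.
    await loop.run_in_executor(None, blocking_save, data, sink)
    return 'OK'

sink = []
status = asyncio.run(download_one(b'PNG bytes', sink))
```

The blocking call runs in a worker thread while the event loop stays free to drive other coroutines.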

Example: asynchronous download with an Executor object; compared with the previous example, no significant performance gain was observed

from enum import Enum
import os, sys, time
import collections
from collections import namedtuple
import asyncio
import async_timeout
import aiohttp
from aiohttp import web
from tqdm import tqdm

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'
POP20_CC = 'T_JINGSE200 T_JINGSE3'.split()
DEST_DIR = 'download'
MAX_WORKERS = 20

class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

async def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(10000):
            async with session.get(url) as resp:
                if b'PNG' in (await resp.read()):
                    image = await resp.read()
                    return image
                elif '404' in (await resp.text()):
                    raise web.HTTPNotFound()
                else:
                    raise aiohttp.http_exceptions.HttpProcessingError(
                        code=resp.status, message=resp.reason, headers=resp.headers)

async def download_one(cc, semaphore, verbose):
    try:
        with (await semaphore):
            image = await get_flag(cc)
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        # Hand the blocking save_flag call to the loop's default executor.
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag, image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return Result(status, cc)

async def download_coro(cc_list, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, semaphore, verbose) for cc in sorted(cc_list)]
    to_do_iter = asyncio.as_completed(to_do)
    if not verbose:
        to_do_iter = tqdm(to_do_iter, total=len(cc_list))
    for future in to_do_iter:
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
                print(exc.__cause__.args)
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
            if not verbose and error_msg:
                msg = '*** Error for {} : {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
        counter[status] += 1
    return counter

def download_many(cc_list, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=True, concur_req=2)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

main()

# Example: sequential download, writing 10x the bytes per file, without an Executor object

from enum import Enum
import collections
from collections import namedtuple

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = 'T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20'.split()

DEST_DIR = 'downloads'

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img * 10)  # write 10x the bytes to make the blocking I/O noticeable

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if '404' in resp.text:
        resp.status_code = 404
        resp.raise_for_status()
    return resp.content

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.status_code = 404
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        save_flag(image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # with -v/--verbose, show the country code and status message (verbose-mode progress)
        print(cc, msg)
    return Result(status, cc)

from concurrent import futures
from tqdm import tqdm

def download_many(cc_list, verbose, max_req):
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:  # note 1
            print('*** Error for {} : {}'.format(cc, error_msg))
    return counter

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, max_req=10)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    # Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 33.58s

if __name__ == '__main__':
    main(download_many)

Example: sequential download, writing 10x the bytes per file, with an Executor object

from enum import Enum
import collections
from collections import namedtuple

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = 'T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20'.split()

DEST_DIR = 'downloads'

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img * 10)  # write 10x the bytes to make the blocking I/O noticeable

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if '404' in resp.text:
        resp.status_code = 404
        resp.raise_for_status()
    return resp.content

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.status_code = 404
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        # save_flag(image, cc.lower() + '.PNG')
        import asyncio
        loop = asyncio.get_event_loop()
        # hand the blocking save to a worker thread via the default executor
        loop.run_in_executor(None, save_flag, image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose:  # with -v/--verbose, show the country code and status message (verbose-mode progress)
        print(cc, msg)
    return Result(status, cc)

from concurrent import futures
from tqdm import tqdm

def download_many(cc_list, verbose, max_req):
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:  # note 1
            print('*** Error for {} : {}'.format(cc, error_msg))
    return counter

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, max_req=10)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    # With each file written at 10x its size (fp.write(img) changed to fp.write(img*10)),
    # the effect becomes visible:
    # Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 20.68s

if __name__ == '__main__':
    main(download_many)

"""

【Fluent Python】【Control Flow】【Part 5】【asyncio】

# -*- coding:utf-8 -*-

"""

# 18.1 Threads & coroutines
# Example 18-1: threading
import sys
import time
import itertools
import threading

class Signal:
    go = True

def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))  # backspaces move the cursor back for the next frame
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status))

def slow_function():
    time.sleep(1)
    return 42

def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_function()
    signal.go = False
    spinner.join()
    return result

def main():
    result = supervisor()
    print('Answer:', result)

if __name__ == '__main__':
    main()

'''
spinner object: <Thread(Thread-1, initial)>
| thinking! / thinking! - thinking! \ thinking! | thinking! / thinking!
- thinking! \ thinking! | thinking! / thinking!
Answer: 42
'''

# Example 18-2: asyncio implementation

import asyncio
import sys
import itertools

@asyncio.coroutine
def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))  # the trick behind the text animation: backspace (\x08) moves the cursor back
        try:
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    write(' ' * len(status) + '\x08' * len(status))  # clear the status line with spaces, move the cursor back

@asyncio.coroutine
def slow_function():
    # pretend to wait for I/O; asyncio.sleep hands control back to the main
    # loop and the coroutine resumes when the nap is over
    yield from asyncio.sleep(1)
    return 42

@asyncio.coroutine
def supervisor():
    spinner = asyncio.async(spin('thinking!'))
    print('spinner object:', spinner)
    # Drive slow_function(); when it finishes, get its return value. Meanwhile the
    # event loop keeps running, because slow_function hands control back to it
    # with yield from asyncio.sleep(1).
    result = yield from slow_function()
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()  # get a reference to the event loop
    result = loop.run_until_complete(supervisor())  # drive supervisor to completion; its return value is this call's return value
    loop.close()
    print('Answer :', result)

if __name__ == '__main__':
    main()

'''
spinner object: <Task pending coro=<spin() running at C:/Users/wangxue1/PycharmProjects/fluentPython/kongzhiliucheng/asyncio/__init__.py:69>>
| thinking! / thinking! - thinking! \ thinking! | thinking! / thinking!
- thinking! \ thinking! | thinking! / thinking!
Answer : 42
'''

# Example: async/await implementation

import asyncio
import sys
import itertools

async def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            await asyncio.sleep(.3)
        except asyncio.CancelledError:
            break
    write('\x08' * len(status))

async def slow_function():
    await asyncio.sleep(1)
    return 42

async def supervisor():
    spinner = asyncio.ensure_future(spin('thinking!'))
    print(type(spinner))
    print('spinner object:', spinner)
    result = await slow_function()
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer:', result)

main()

'''
# [Comparison] The main differences between the two supervisor implementations:

An asyncio.Task is roughly equivalent to a threading.Thread. Victor Stinner (special
technical reviewer of this chapter) notes that "a Task is like a green thread in
libraries that implement cooperative multitasking, such as gevent."

A Task drives a coroutine; a Thread invokes a callable.

You don't instantiate Task objects yourself: you get them by passing a coroutine to
asyncio.async(...) or loop.create_task(...).

A Task you obtain this way is already scheduled to run (e.g. by asyncio.async); a
Thread instance must be explicitly told to run by calling its start method.

In the threaded supervisor, slow_function is a plain function invoked directly by a
thread. In the asyncio supervisor, slow_function is a coroutine driven by yield from.

There is no API to terminate a thread from the outside, because a thread could be
interrupted at any point, leaving the system in an invalid state. To terminate a
task, use Task.cancel(), which raises CancelledError inside the coroutine; the
coroutine can catch it at the yield where it is suspended and handle the request.

The supervisor coroutine must be executed in main with loop.run_until_complete.

This comparison should help you see how asyncio orchestrates concurrent jobs
compared with the more familiar threading model.

One final point about threads versus coroutines: if you have done any non-trivial
programming with threads, you know how hard it is to reason about a program, because
the scheduler can interrupt a thread at any time. You must hold locks to protect the
critical sections of your program, to avoid being interrupted mid-way through a
multi-step operation and leaving data in an invalid state.

Coroutines are protected against interruption by default: you must explicitly yield
to let the rest of the program run. Instead of holding locks to synchronize the
operations of multiple threads, coroutines are synchronized by definition: only one
of them runs at any time. To give up control, use yield or yield from to hand it
back to the scheduler. That is why coroutines can be safely cancelled: by definition
a coroutine can only be cancelled at a suspended yield, so you can catch
CancelledError and perform cleanup.
'''

# 18.1.1 asyncio.Future: nonblocking by design
'''
The asyncio.Future class has essentially the same interface as
concurrent.futures.Future, but the implementations differ and the two are not
interchangeable. "PEP 3156 -- Asynchronous IO Support Rebooted: the 'asyncio'
Module" (https://www.python.org/dev/peps/pep-3156/) says of this unfortunate
situation: the asyncio.Future and concurrent.futures.Future implementations may be
unified in the future (e.g. by adding an __iter__ method to the latter that is
compatible with yield from).

In short, because asyncio.Future is designed to work with yield from, you usually do
not need these methods:

- No need for my_future.add_done_callback(...): simply put whatever should run after
  the future completes right after the `yield from my_future` expression in the
  coroutine. That is a big advantage of coroutines: they are functions that can be
  suspended and resumed.

- No need for my_future.result(): the value produced by `yield from my_future` is
  the result (e.g. result = yield from my_future).
'''
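The point above, that awaiting a future replaces both `add_done_callback` and an explicit `result()` call, can be shown with a plain `asyncio.Future` (a minimal sketch, not from the book; uses the modern `await` spelling and Python 3.7+'s `asyncio.run`):

```python
import asyncio

async def waiter(fut):
    # Suspends here until the future is resolved; no add_done_callback,
    # no explicit .result() call.
    value = await fut
    return value + 1

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_soon(fut.set_result, 41)  # resolve the future on the next loop pass
    return await waiter(fut)

result = asyncio.run(main())
```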

# 18.2 Downloading with asyncio and aiohttp

import os, sys, time
import requests
import asyncio
import aiohttp

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'
POP20_CC = 'T_JINGSE2 T_JINGSE3'.split()
DEST_DIR = 'downloads'
MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

async def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = await aiohttp.request('GET', url)
    image = await resp.read()
    return image

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

async def download_one(cc):
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.PNG')
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop()
    to_do = [download_one(cc) for cc in sorted(cc_list)]
    # Despite its name, wait is not a blocking function: it is a coroutine that
    # finishes after all the coroutines passed to it are done.
    wait_coro = asyncio.wait(to_do)
    '''
    asyncio.wait(...) takes an iterable of futures or coroutines; wait wraps each
    coroutine in a Task. The net effect is that everything wait handles becomes, one
    way or another, an instance of Future. wait is a coroutine function, so calling
    it returns a coroutine/generator object; that is what the wait_coro variable
    holds. To drive it, we pass it to loop.run_until_complete(...).
    '''
    # Run the event loop until wait_coro is done; the script blocks here while the
    # loop runs. We ignore the second element run_until_complete returns.
    res, _ = loop.run_until_complete(wait_coro)
    '''
    loop.run_until_complete accepts a future or a coroutine. Given a coroutine, it
    wraps it in a Task, just like wait does. Coroutines, futures and tasks can all
    be driven by yield from, and that is what run_until_complete does with the
    wait_coro object returned by wait. When wait_coro runs to completion, it returns
    a 2-tuple: the first element is the set of completed futures, the second the set
    of those not completed. Here the second set is always empty, so we assign it to
    _ and ignore it. But wait has two keyword-only arguments, timeout and
    return_when, that may cause it to return even if some futures are not complete.
    '''
    loop.close()
    return len(res)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)

# 19 flags downloaded in 0.25s
'''
Sample output -- note the warnings, one per request, because aiohttp.request
creates a ClientSession that is never closed:

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038C9470>
t_jingse7 t_jingse11
... (the same "Unclosed client session" warning repeats for each of the 19
requests, interleaved with the country codes as they finish) ...
T_jingse18 T_jingse14
19 flags downloaded in 0.45s
'''

# [Summary]
'''
With asyncio, the asynchronous code we write contains coroutines (delegating
generators) driven by asyncio itself, while the generators ultimately delegate to
coroutines in the asyncio package or in third-party libraries such as aiohttp. This
arrangement builds pipelines in which the asyncio event loop drives -- through our
coroutines -- the library functions that perform the low-level asynchronous I/O.
'''
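The `timeout` and `return_when` keyword arguments of `wait` mentioned above are easy to demonstrate: with a timeout, the second element of the returned tuple is no longer empty (a minimal sketch, assuming Python 3.7+ for `asyncio.run`; the delays are arbitrary):

```python
import asyncio

async def flag(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    # Wrap the coroutines in tasks explicitly (required on Python 3.10+).
    tasks = [asyncio.ensure_future(flag(d)) for d in (0.01, 30)]
    # With timeout=..., wait returns early; the pending set is not empty.
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    for task in pending:
        task.cancel()  # leftover futures should be cancelled explicitly
    return len(done), len(pending)

done_count, pending_count = asyncio.run(main())
```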

'''
18.3 Avoiding blocking calls

Ryan Dahl (inventor of Node.js) introduced the philosophy behind his project by
saying: "We're doing I/O completely wrong." He defines functions that do disk or
network I/O as blocking functions, and argues they cannot be treated like
non-blocking ones. To explain why, he shows the first two columns of Table 18-1.
("Introduction to Node.js" video, at 4:55: https://www.youtube.com/watch?v=M-sc73Y-zQA)

Table 18-1: latency of reading data from different storage media on a modern
computer; the third column scales the numbers to times humans can grasp.

    Storage medium | CPU cycles  | Scaled to "human time"
    L1 cache       | 3           | 3 seconds
    L2 cache       | 14          | 14 seconds
    RAM            | 250         | 250 seconds
    Disk           | 41,000,000  | 1.3 years
    Network        | 240,000,000 | 7.6 years

To make sense of the table, remember that modern CPUs run at GHz clock rates,
billions of cycles per second. Assuming a CPU runs exactly one billion cycles per
second, it can do 333,333,333 L1 cache reads in one second, but only 4 (four!)
network reads. The third column multiplies the second by a constant factor. So in
that other world, if an L1 cache read took 3 seconds, a network read would take
7.6 years!

There are two ways to prevent blocking calls from halting the progress of the
entire application:
- run each blocking operation in a separate thread;
- turn every blocking operation into a non-blocking asynchronous call.
'''
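The two approaches above can be compared side by side: the same 0.2 s delay is paid once, not twice, because the thread-based call and the asynchronous call overlap (a minimal sketch, not from the book; timings are approximate and `asyncio.run` requires Python 3.7+):

```python
import asyncio
import time

async def via_thread():
    # Option 1: run the blocking call in a separate thread.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 0.2)

async def non_blocking():
    # Option 2: replace the blocking call with an asynchronous equivalent.
    await asyncio.sleep(0.2)

async def main():
    t0 = time.perf_counter()
    await asyncio.gather(via_thread(), non_blocking())
    return time.perf_counter() - t0

elapsed = asyncio.run(main())
# elapsed stays near 0.2 s rather than 0.4 s, because the two waits overlap
```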

# My example 1
import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

print('TIME: ',now() - start)

# My example 2
import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)
'''
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting:  2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME:  0.0010001659393310547

'''

# My example 3
'''
A coroutine object cannot run by itself. When it is registered with the event loop,
run_until_complete actually wraps it in a task. A task is a subclass of Future: it
records the state of the running coroutine so its result can be retrieved later.
'''

import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)
'''
Both asyncio.ensure_future(coroutine) and loop.create_task(coroutine) create a task,
and run_until_complete takes a future object. Given a coroutine, it wraps it in a
task internally; Task is a subclass of Future, so isinstance(task, asyncio.Future)
prints True.
'''
print(isinstance(task, asyncio.Future))
'''
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting:  2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME:  0.0009999275207519531
True
'''

# My example 4: attaching a callback

import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(future):
    print('Result: ', future)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

print('TIME: ', now() - start)
'''
Waiting:  2
Result:  <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result='Done after 2s'>
TIME:  0.002000093460083008
'''

# My example 5: attaching a callback that needs extra arguments

import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(t, future):
    print('Result: ', t, future)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
import functools
task.add_done_callback(functools.partial(callback, 2))
loop.run_until_complete(task)

print('TIME: ',now() - start)

'''
Waiting:  2
Result:  2 <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result='Done after 2s'>
TIME:  0.002000093460083008
'''

# My example 6: future and result. Endless callbacks are the nightmare of much
# asynchronous programming; programmers prefer to write async code in a synchronous style.

import asyncio
import time

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    return 'Done after {}s'.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print('Task result: {}'.format(task.result()))  # note: task.result, without the call, prints the bound method
print('TIME: {}'.format(now() - start))

'''
Waiting 2
Task result: Done after 2s
TIME: 0.002000093460083008
'''

# My example 7: blocking and await

import asyncio
import time

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print('Task result: {}'.format(task.result()))
print('TIME: {}'.format(now() - start))

'''
Waiting 2
Task result: Done after 2s
TIME: 2.001114845275879
'''

# My example 8: concurrency & parallelism
# Use await whenever there is a blocking task

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task result: ', task.result())

print('Time: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
Time:  3.9912283420562744
'''

# Example

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting:', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

tasks = [
    asyncio.ensure_future(do_some_work(1)),
    asyncio.ensure_future(do_some_work(2)),
    asyncio.ensure_future(do_some_work(4))
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task result:', task.result())

'''
Waiting: 2
Waiting: 1
Waiting: 4
Traceback (most recent call last):
  File "/Users/suren/PycharmProjects/untitled1/asyn.py", line 38, in <module>
    print ('task result:', asyncio.ensure_future(coro).result())
asyncio.base_futures.InvalidStateError: Result is not ready.

'''

# My example 9: nested coroutines [1] -- dones, pendings = await asyncio.wait(tasks)

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
        print('Task result: ', task.result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 2s
Task result:  Done after 4s
Task result:  Done after 1s
TIME:  4.007229328155518
'''

# My example 10: nested coroutines [2] -- with asyncio.gather, the value of the
# await expression is the list of coroutine results

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    results = await asyncio.gather(*tasks)

    for result in results:
        print('Task result: ', result)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
TIME:  3.9892282485961914
'''

# My example 11: nested coroutines [3] -- instead of handling results inside main,
# return the awaited value; the outermost run_until_complete then returns main's result

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())

for result in results:
    print('Task result: ', result)

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
TIME:  4.0052289962768555
'''
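One more property of `gather` worth knowing: by default the first exception aborts the whole batch, but with `return_exceptions=True` failures are recorded in the result list and the other coroutines still complete (a minimal sketch, not from the notes above; `asyncio.run` requires Python 3.7+):

```python
import asyncio

async def work(x):
    if x < 0:
        raise ValueError(x)
    await asyncio.sleep(0)
    return x * 2

async def main():
    # return_exceptions=True puts exceptions into the result list
    # instead of cancelling the remaining coroutines.
    return await asyncio.gather(work(1), work(-1), work(3),
                                return_exceptions=True)

results = asyncio.run(main())
```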

# My example 12: nested coroutines [4] -- return the awaited value from main,
# suspending with asyncio.wait; the outermost run_until_complete returns main's result

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.wait(tasks)

loop = asyncio.get_event_loop()
dones, pendings = loop.run_until_complete(main())

for task in dones:
    print('Task result: ', task.result())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 2s
Task result:  Done after 4s
Task result:  Done after 1s
TIME:  3.9912283420562744
'''

# My example 13: nested coroutines [5] -- using asyncio.as_completed

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task result: {}'.format(result))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
TIME:  3.9912281036376953
'''

# My example 14: stopping coroutines [1] -- the event loop is driven outside main,
# so main is effectively the outermost task; cancelling means handling that wrapper task.
'''
The examples above all revolve around the event loop. A future has several states:

    Pending | Running | Done | Cancelled

When a future is created, the task is pending; while the event loop is executing
it, it is running; when the call completes, it is done. To stop the event loop you
must first cancel the tasks; asyncio.Task gives access to the loop's tasks.

Right after starting the event loop, press Ctrl+C: run_until_complete raises
KeyboardInterrupt, and we then cancel the futures by looping over asyncio.Task.
'''

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    done, pending = await asyncio.wait(tasks)
    for task in done:
        print('Task result: ', task.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main())
try:
    loop.run_until_complete(task)
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    print('*******************')
    print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
    loop.stop()
    # True means the cancel succeeded; after loop.stop() the event loop must be
    # started again and then closed, otherwise an exception is raised
    loop.run_forever()
finally:
    loop.close()

print('TIME: ',now() - start)

'''
# Ctrl+C does not work from PyCharm; run this in an interactive Python session.
Waiting:  1
Waiting:  2
Waiting:  4
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>,
 <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending ...> cb=[_run_until_complete_cb() at .../asyncio/base_events.py:176]>,
 ... two more pending do_some_work() tasks ...}
*******************
True
TIME:  2.0158370780944824
'''

# My example 15: stopping coroutines [2] -- the tasks live at module level,
# not inside main
import asyncio

import time

now = lambda: time.time()
start = now()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print('TIME: ', now() - start)

'''
Prints four True values rather than three: besides the three do_some_work() tasks,
the set returned by asyncio.Task.all_tasks() also contains the task wrapping the
wait() coroutine itself (visible in the set printed below).
Waiting:  1
Waiting:  2
Waiting:  4
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> ...>,
 <Task pending coro=<wait() running at .../asyncio/tasks.py:307> ...>,
 ... two more pending do_some_work() tasks ...}
True
True
True
True
TIME:  0.8858370780944824
'''
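Cancelling a single task is simpler than tearing down the whole loop, and it shows the mechanism described earlier: `CancelledError` is raised inside the coroutine at its suspended `await`, where it can be caught for cleanup (a minimal sketch, not from the notes above; `asyncio.run` requires Python 3.7+):

```python
import asyncio

async def long_job():
    try:
        await asyncio.sleep(30)
    except asyncio.CancelledError:
        # Cancellation arrives at the suspended await; clean up and return.
        return 'cancelled cleanly'
    return 'finished'

async def main():
    task = asyncio.ensure_future(long_job())
    await asyncio.sleep(0)  # give the task a chance to start and reach its await
    task.cancel()           # schedules CancelledError inside the task
    return await task

outcome = asyncio.run(main())
```

Because `long_job` suppresses the `CancelledError` and returns a value, the task completes normally and awaiting it yields that value.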

# Example 16: event loops in different threads
'''
Often coroutines are registered on the event loop up front, but sometimes work
needs to be added to the loop dynamically. A simple approach is to use a second
thread: create an event loop in the current thread, then start it inside a new
thread, so the current thread is never blocked.
After the code below starts, the current thread is not blocked; the new thread
runs the more_work callbacks registered with call_soon_threadsafe in order.
Because more_work's time.sleep is a synchronous, blocking call, finishing both
calls takes roughly 6 + 4 seconds.
'''

from threading import Thread
import asyncio

import time

now = lambda: time.time()
start = now()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 4)

'''
More work 6
Finished more work 6
More work 4
Finished more work 4
'''

# Example 17: running coroutines on an event loop in a new thread
from threading import Thread
import asyncio

import time

now = lambda: time.time()
start = now()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

'''
Waiting 6
Waiting 4
Done after 4s
Done after 6s
'''
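run_coroutine_threadsafe also returns a concurrent.futures.Future, so the submitting thread can block on the coroutine's result and shut the loop down cleanly afterwards. A runnable sketch (the 0.1s delay is arbitrary):

```python
import asyncio
from threading import Thread

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

# run_coroutine_threadsafe hands back a concurrent.futures.Future
fut = asyncio.run_coroutine_threadsafe(do_some_work(0.1), new_loop)
print(fut.result())  # blocks this thread until the coroutine finishes

new_loop.call_soon_threadsafe(new_loop.stop)  # stop the loop from outside
t.join()
new_loop.close()
```

call_soon_threadsafe(new_loop.stop) is the thread-safe way to make run_forever return, letting the worker thread exit and join.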

【aiohttp exercises】

  1. Preface

This section is translated from the official aiohttp documentation; corrections to any slips are welcome.

aiohttp has a server side and a client side; only the client is covered here.

I have also wrapped aiohttp and asyncio myself; see my GitHub repo:

https://github.com/web-trump/ahttp

Because of the async context, request code must live inside an async function:

async def fn():
    pass

  1. Installing aiohttp

pip3 install aiohttp

1.1. Basic requests

async with aiohttp.get('https://github.com') as r:
    await r.text()

In r.text() you can specify the decoding in parentheses, for example:

await resp.text(encoding='windows-1251')

Or skip decoding altogether, which suits content such as images that cannot be decoded:

await resp.read()

2. Making a request with a session

First import the aiohttp module:

import aiohttp

Then try fetching a page's source, using GitHub's public time-line page as the example:

async with aiohttp.ClientSession() as session:
    async with session.get('https://api.github.com/events') as resp:
        print(resp.status)
        print(await resp.text())

In the code above we create a ClientSession object named session, then obtain a ClientResponse object named resp from session's get method. get takes one required parameter, url: the HTTP URL whose source we want. That completes an asynchronous GET request via coroutines.

Where there is get there is also post, and post is a coroutine too:

session.post('http://httpbin.org/post', data=b'data')

Usage matches get; the difference is the extra data parameter, the data to post.

The other HTTP methods work the same way:

session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

Notes:

Do not create a session per request. Normally you create a single session and run all of your requests through it.

Each session object holds a connection pool and keeps connections alive for reuse (on by default), which improves overall performance.

3. Passing parameters in the URL

We often need a GET request to carry parameters in the URL, sent to the server as the part after the question mark. aiohttp lets you express those parameters as a dict. For example, to send key1=value1 key2=value2 to httpbin.org/get:

params = {'key1': 'value1', 'key2': 'value2'}
async with session.get('http://httpbin.org/get', params=params) as resp:
    assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'

As the assert shows, the parameters were passed correctly, and the same works for any number of parameters. Alternatively, pass a list of pairs, which also handles special cases such as repeated keys:

params = [('key', 'value1'), ('key', 'value2')]
async with session.get('http://httpbin.org/get', params=params) as r:
    assert r.url == 'http://httpbin.org/get?key=value2&key=value1'

Beyond those two, a plain string also works as params, but note that special characters in a string are not encoded:

async with session.get('http://httpbin.org/get', params='key=value+1') as r:
    assert r.url == 'http://httpbin.org/get?key=value+1'

4. Response content

Again using GitHub's public time-line page, we can read the response body:

async with session.get('https://api.github.com/events') as resp:
    print(await resp.text())

Running that prints something like:

'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

resp's text method automatically decodes the body the server returned; we can also pick the encoding ourselves:

await resp.text(encoding='gb2312')

Besides the decoded text, we can get the body as bytes:

print(await resp.read())

which prints:

b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

gzip and deflate transfer encodings are decoded for you automatically.

Notes:

text() and read() load the entire response body into memory. If you are fetching large amounts of data, consider a streaming response instead.

5. Special response content: JSON

If the response body is JSON, aiohttp has a nicer built-in way to handle it:

async with session.get('https://api.github.com/events') as resp:
    print(await resp.json())

If parsing fails for some reason, e.g. the body is not a JSON string, resp.json() raises an error. You can also give json() an encoding:

print(await resp.json(encoding='gb2312'))

or pass your own parsing function in via the loads parameter:

print(await resp.json(loads=lambda s: s.replace('a', 'b')))

6. Reading the response as a byte stream

json(), text() and read() are convenient ways to get the response into memory, but use them with care: they read the whole body. Even if you only want a few bytes of a file, these methods load all of it. Instead we can bound how much of the body is read into memory:

async with session.get('https://api.github.com/events') as resp:
    await resp.content.read(10)  # read the first 10 bytes

In general, use the following pattern to save a streamed body to a file:

with open(filename, 'wb') as fd:
    while True:
        chunk = await resp.content.read(chunk_size)
        if not chunk:
            break
        fd.write(chunk)

7. Custom request headers

To add request headers, pass them as a dict, just like get parameters:

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
headers = {'content-type': 'application/json'}

await session.post(url, data=json.dumps(payload), headers=headers)

8. Custom cookies

To send cookies to the server, pass a cookies parameter to ClientSession:

url = 'http://httpbin.org/cookies'
cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:
    async with session.get(url) as resp:
        assert await resp.json() == {
            "cookies": {"cookies_are": "working"}}

You can visit httpbin.org/cookies directly to see the current cookies; for reading a session's cookies, see section 10.

9. Ways to post data

(1) Simulating a form post

payload = {'key1': 'value1', 'key2': 'value2'}
async with session.post('http://httpbin.org/post', data=payload) as resp:
    print(await resp.text())

Note: data posted as a dict is form-encoded, exactly as a form submission would be. If you do not want that encoding, pass a string (data=str) and it is sent unencoded.
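The form encoding applied to data=dict is ordinary URL encoding, essentially the same transformation the standard library's urllib.parse.urlencode performs, while a string passed as data goes over the wire as-is. A quick stdlib illustration of the difference:

```python
from urllib.parse import urlencode

payload = {'key1': 'value 1', 'key2': 'value&2'}
# data=dict is sent form-encoded: spaces and special characters are escaped
encoded = urlencode(payload)
print(encoded)  # key1=value+1&key2=value%262
# data=str would be sent exactly as written, with no escaping:
print('key1=value 1&key2=value&2')
```

Note the '&' inside a value is escaped to %26 by form encoding; left raw, it would be read by the server as a field separator.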

(2) Posting JSON

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}

async with session.post(url, data=json.dumps(payload)) as resp:
    ...

json.dumps(payload) in fact just returns a string, one that happens to be parseable as JSON.
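That json.dumps returns a plain str, which round-trips back through json.loads, is easy to verify:

```python
import json

payload = {'some': 'data'}
body = json.dumps(payload)
print(type(body).__name__)  # str
print(body)                 # {"some": "data"}
assert json.loads(body) == payload  # parses right back to the dict
```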

(3) Posting small files

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}

await session.post(url, data=files)

You can also set the filename and content-type explicitly:

url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file',
               open('report.xls', 'rb'),
               filename='report.xls',
               content_type='application/vnd.ms-excel')

await session.post(url, data=data)

If a file object is passed as the data parameter, aiohttp automatically streams it to the server as bytes.

(4) Posting large files

aiohttp can upload many kinds of files as streams, so we can send large files without ever reading them fully into memory:

@aiohttp.streamer
def file_sender(writer, file_name=None):
    with open(file_name, 'rb') as f:
        chunk = f.read(2 ** 16)
        while chunk:
            yield from writer.write(chunk)
            chunk = f.read(2 ** 16)

Then you can use file_sender as a data provider:

async with session.post('http://httpbin.org/post',
                        data=file_sender(file_name='huge_file')) as resp:
    print(await resp.text())

We can also fetch a file from one URL and post it straight on to another, computing a hash as it passes through:

async def feed_stream(resp, stream):
    h = hashlib.sha256()

    while True:
        chunk = await resp.content.readany()
        if not chunk:
            break
        h.update(chunk)
        stream.feed_data(chunk)

    return h.hexdigest()

resp = session.get('http://httpbin.org/post')
stream = StreamReader()
loop.create_task(session.post('http://httpbin.org/post', data=stream))

file_hash = await feed_stream(resp, stream)

Because the response body is a StreamReader, get and post can be chained like this, running the post and get at the same time:

r = await session.get('http://python.org')
await session.post('http://httpbin.org/post', data=r.content)

(5) Posting pre-compressed data

To send data you compressed yourself before handing it to aiohttp, set the Content-Encoding header to the name of the compression used (usually deflate or zlib):

async def my_coroutine(session, headers, my_data):
    data = zlib.compress(my_data)
    headers = {'Content-Encoding': 'deflate'}
    async with session.post('http://httpbin.org/post',
                            data=data, headers=headers):
        pass

10. keep-alive, connection pooling, shared cookies

ClientSession shares cookies across its requests:

async with aiohttp.ClientSession() as session:
    await session.get(
        'http://httpbin.org/cookies/set?my_cookie=my_value')
    filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
    assert filtered['my_cookie'].value == 'my_value'
    async with session.get('http://httpbin.org/cookies') as r:
        json_body = await r.json()
        assert json_body['cookies']['my_cookie'] == 'my_value'

You can also set headers shared by all of a session's requests:

async with aiohttp.ClientSession(
        headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session:
    async with session.get("http://httpbin.org/headers") as r:
        json_body = await r.json()
        assert json_body['headers']['Authorization'] == 'Basic bG9naW46cGFzcw=='

ClientSession also supports keep-alive connections and connection pooling.

11. Cookie safety

By default ClientSession uses a strict aiohttp.CookieJar: following RFC 2109 it explicitly rejects cookies set from URLs with bare IP addresses, accepting only cookies for DNS-resolvable hostnames. This can be relaxed by constructing the jar with unsafe=True:

jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)

12. Limiting the number of simultaneous connections (the connection pool)

In other words, the number of simultaneous requests. To cap the number of open connections, pass a limit to the connector:

conn = aiohttp.TCPConnector(limit=30)  # at most 30 concurrent connections; the default is 100, and limit=0 means unlimited

To cap simultaneous connections to a single endpoint (a (host, port, is_ssl) triple), set limit_per_host:

conn = aiohttp.TCPConnector(limit_per_host=30)  # the default is 0 (no per-host limit)

13. Custom DNS resolution

We can choose which DNS servers resolve the hostnames in our get/post URLs:

from aiohttp.resolver import AsyncResolver

resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
conn = aiohttp.TCPConnector(resolver=resolver)

14. Using a proxy

aiohttp supports fetching pages through a proxy:

async with aiohttp.ClientSession() as session:
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com") as resp:
        print(resp.status)

Proxies that require authorization work too:

async with aiohttp.ClientSession() as session:
    proxy_auth = aiohttp.BasicAuth('user', 'pass')
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com",
                           proxy_auth=proxy_auth) as resp:
        print(resp.status)

Or pass the credentials in the proxy URL itself:

session.get("http://python.org", proxy="http://user:pass@some.proxy.com")

15. Response status code

Use resp.status to check for a 200:

async with session.get('http://httpbin.org/get') as resp:
    assert resp.status == 200

16. Response headers

resp.headers gives the response headers as a dict-like object:

resp.headers
{'ACCESS-CONTROL-ALLOW-ORIGIN': '*',
 'CONTENT-TYPE': 'application/json',
 'DATE': 'Tue, 15 Jul 2014 16:49:51 GMT',
 'SERVER': 'gunicorn/18.0',
 'CONTENT-LENGTH': '331',
 'CONNECTION': 'keep-alive'}

Or look at the raw headers:

resp.raw_headers
((b'SERVER', b'nginx'),
 (b'DATE', b'Sat, 09 Jan 2016 20:28:40 GMT'),
 (b'CONTENT-TYPE', b'text/html; charset=utf-8'),
 (b'CONTENT-LENGTH', b'12150'),
 (b'CONNECTION', b'keep-alive'))

17. Inspecting cookies

url = 'http://example.com/some/cookie/setting/url'
async with session.get(url) as resp:
    print(resp.cookies)

18. Redirect history

If a request was redirected, we can still inspect the responses received before the redirect:

resp = await session.get('http://example.com/some/redirect/')
resp
<ClientResponse(http://example.com/some/other/url/) [200]>
resp.history
(<ClientResponse(http://example.com/some/redirect/) [301]>,)

19. Timeouts

By default I/O operations time out after five minutes. We can override that with timeout:

async with session.get('https://github.com', timeout=60) as r:
    ...

With timeout=None or timeout=0 no timeout check is made, i.e. there is no limit.
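aiohttp's timeout handling rests on the same idea as asyncio's generic timeout tools: a coroutine that runs too long is cancelled and a timeout error is raised. A stdlib-only sketch of the pattern with asyncio.wait_for (the delays are arbitrary stand-ins for a slow request):

```python
import asyncio

async def slow_io():
    await asyncio.sleep(10)  # stands in for a request that takes too long
    return 'response'

async def main():
    try:
        # cancel slow_io and raise TimeoutError after 0.05 seconds
        return await asyncio.wait_for(slow_io(), timeout=0.05)
    except asyncio.TimeoutError:
        return 'timed out'

result = asyncio.run(main())
print(result)  # timed out
```

As with aiohttp, timeout=None disables the check entirely: wait_for then simply awaits the coroutine to completion.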

# 18.4 Improving the asyncio download script
# Example 18-7 flags2_asyncio.py: the first half of the script; the rest is in Example 18-8

import os, time, sys
import aiohttp
from aiohttp import web
import asyncio
import async_timeout
import collections
from collections import namedtuple
from enum import Enum
from tqdm import tqdm

BASE_URL = 'https://images2018.cnblogs.com/blog/1239321/201808'
POP20_CC1 = ('1239321-20180808065117364-1539273796 1239321-20180808065129112-103367989 '
             '1239321-20180808065136786-868892759 '
             '1239321-20180808065146211-1880907820 1239321-20180808065155072-1392342345 1239321-20180808065222347-1439669487 '
             '1239321-20180808065232562-1454112423 1239321-20180808065246215-1857827340 1239321-20180808065301480-1707393818 '
             '1239321-20180808065312201-964077895 1239321-20180808065326211-1590046138 1239321-20180808065342568-448845 '
             '1239321-20180808065358869-366577464 1239321-20180808065410900-539910454 1239321-20180808065422695-222625730 '
             '1239321-20180808065430991-1182951067 1239321-20180808065437898-138307299 1239321-20180808065444387-1849567433 '
             '1239321-20180808065454537-30405473 1239321-20180808065506470-995044385 ').split()
POP20_CC = 'aaaa1239321-20180808065117364-1539273796 1239321-20180808065129112-103367989'.split()
DEST_DIR = 'downloads'
MAX_WORKERS = 20

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code

def save_flag(image, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(image)

async def get_flag(cc):
    url = '{}/{cc}.png'.format(BASE_URL, cc=cc)

    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(3000):
            async with session.get(url, verify_ssl=False) as resp:
                # Without verify_ssl an SSL error is raised: urllib/requests
                # verify the certificate when opening an https site.

                # print(await resp.text())
                if b'PNG' in (await resp.read()):
                    # resp.status == 404 cannot be used to decide whether the
                    # resource exists; not every site returns results in the same
                    # format. Nor can "'404' not in (await resp.text())": when the
                    # resource does exist, that raises "UnicodeDecodeError: 'utf-8'
                    # codec can't decode byte 0x89 in position 0: invalid start byte".
                    # So this check was found by trial and error: when the resource
                    # is missing, the body length is 191. It is not very robust; a
                    # cleaner check is left for later.
                    # Another option: if resp.status == 200 and ((await resp.read())[2] == 78):
                    image = await resp.read()
                    return image
                elif 'not found' in (await resp.text()) or 'not exist' in (await resp.text()):
                    # FetchError catches this later and prints: *** Error for T_JINGSE200: Not Found
                    raise web.HTTPNotFound()
                else:
                    raise aiohttp.HttpProcessingError(
                        code=resp.status, message=resp.reason, headers=resp.headers)
                    # FetchError catches this later and prints:
                    # *** Error for T_JINGSE200: module 'aiohttp' has no attribute ...

async def download_one(cc, semaphore, verbose):
    try:
        with (await semaphore):
            # Using the semaphore as a context manager in the await expression
            # keeps the whole system from blocking: only this coroutine blocks
            # while the semaphore counter is at its maximum.
            image = await get_flag(cc)
    except Exception as exc:
        raise FetchError(cc) from exc  # raise X from Y chains the original exception
    else:
        save_flag(image, cc + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose and msg:
        # With -v/--verbose on the command line, show the country code and status
        # message; this is the progress output seen in verbose mode.
        print(cc, msg)
    return Result(status, cc)

async def download_coro(cc_list, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, semaphore, verbose) for cc in cc_list]
    to_do_iter = asyncio.as_completed(to_do)  # an iterator that yields futures as they finish
    if not verbose:
        to_do_iter = tqdm(to_do_iter, total=len(cc_list))  # wrap with tqdm to show progress
    for future in to_do_iter:  # iterate over the finished futures
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
                # Sometimes args looks like ("module 'aiohttp' has no attribute 'HttpProcessingError'",),
                # so take the only element; sometimes it is like
                # (1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:749)')
                print(exc.__cause__.args)
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
            if not verbose and error_msg:
                msg = '****Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
        counter[status] += 1
    return counter

def download_many(cc_list, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, verbose, concur_req)
    # download_many just instantiates the download_coro coroutine and hands it
    # to the event loop via run_until_complete
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, concur_req=2)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

main()

'''
Flags that come back normally are saved to the target path;
abnormal ones report the corresponding error.
'''

18.4.2 Using an Executor object to avoid blocking the event loop

'''
In Example 18-7 the blocking function is save_flag. In the threaded version of
the script (Example 17-14), save_flag blocks the thread running download_one,
but that is only one of many worker threads, and blocking I/O calls release the
GIL behind the scenes, so another thread can carry on. In flags2_asyncio.py,
however, save_flag blocks the single thread that the client code shares with
the asyncio event loop, so the whole application freezes while a file is being
saved. The fix is the event loop's run_in_executor method: behind the scenes
the asyncio event loop maintains a ThreadPoolExecutor, and run_in_executor
submits callables to it. To use it in this example, only a few lines of the
download_one coroutine need to change.
'''

Example: asynchronous download using an Executor object; compared with the previous example I did not notice much of a performance gain.

from enum import Enum
import os, sys, time
import collections
from collections import namedtuple
import asyncio
import async_timeout
import aiohttp
from aiohttp import web
from tqdm import tqdm

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'
POP20_CC = 'T_JINGSE200 T_JINGSE3'.split()
DEST_DIR = 'download'
MAX_WORKERS = 20

class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

async def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(10000):
            async with session.get(url) as resp:
                if b'PNG' in (await resp.read()):
                    image = await resp.read()
                    return image
                elif '404' in (await resp.text()):
                    raise web.HTTPNotFound()
                else:
                    raise aiohttp.HttpProcessingError(
                        code=resp.status, message=resp.reason, headers=resp.headers)

async def download_one(cc, semaphore, verbose):
    try:
        with (await semaphore):
            image = await get_flag(cc)
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag, image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return Result(status, cc)

async def download_coro(cc_list, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, semaphore, verbose) for cc in sorted(cc_list)]
    to_do_iter = asyncio.as_completed(to_do)
    if not verbose:
        to_do_iter = tqdm(to_do_iter, total=len(cc_list))
    for future in to_do_iter:
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
                print(exc.__cause__.args)
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
            if not verbose and error_msg:
                msg = '*** Error for {} : {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
        counter[status] += 1
    return counter

def download_many(cc_list, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=True, concur_req=2)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

main()

# Example: sequential download, writing each file at 10x its original byte count, without an Executor object

from enum import Enum
HTTPStatus = Enum('Status', 'ok not_found error')
import collections
from collections import namedtuple
Result = namedtuple('Result', 'status cc')

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = 'T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20'.split()

DEST_DIR = 'downloads'

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img * 10)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if '404' in resp.text:
        resp.status_code = 404
    resp.raise_for_status()
    return resp.content

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.status_code = 404
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        save_flag(image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # with -v/--verbose, show the country code and status message
        print(cc, msg)
    return Result(status, cc)

from concurrent import futures
from tqdm import tqdm

def download_many(cc_list, verbose, max_req):
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error - {res.status_code}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:  # note 1
            print('*** Error for {}: {}'.format(cc, error_msg))
    return counter

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, max_req=10)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    # Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 33.58s

if __name__ == '__main__':
    main(download_many)

Example: sequential download, writing each file at 10x its original byte count, using an Executor object

from enum import Enum
HTTPStatus = Enum('Status', 'ok not_found error')
import collections
from collections import namedtuple
Result = namedtuple('Result', 'status cc')

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = 'T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20'.split()

DEST_DIR = 'downloads'

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img * 10)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if '404' in resp.text:
        resp.status_code = 404
    resp.raise_for_status()
    return resp.content

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.status_code = 404
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        # save_flag(image, cc.lower() + '.PNG')
        import asyncio
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag, image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # with -v/--verbose, show the country code and status message
        print(cc, msg)
    return Result(status, cc)

from concurrent import futures
from tqdm import tqdm

def download_many(cc_list, verbose, max_req):
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error - {res.status_code}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:  # note 1
            print('*** Error for {}: {}'.format(cc, error_msg))
    return counter

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, max_req=10)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    # To write each file at 10x its size, just change fp.write(img) to
    # fp.write(img * 10); then the difference shows:
    # Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 20.68s

if __name__ == '__main__':
    main(download_many)

"""

【Generators & generator expressions & iterators & iterables & containers】【Concepts】

# [[Slicing]]
L = ['Michael', 'Sarah', 'Tracy', 'Bob', 'Jack']
print(L[0:3])    # ['Michael', 'Sarah', 'Tracy']
print(L[:3])     # ['Michael', 'Sarah', 'Tracy']
print(L[1:3])    # ['Sarah', 'Tracy']
print(L[-2:])    # ['Bob', 'Jack']
print(L[-2:-1])  # ['Bob']

# Build the sequence 0..99
L = list(range(100))
print(L)
# The first 10 numbers
print(L[:10])    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# The last 10 numbers
print(L[-10:])   # [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
# Numbers 11-20
print(L[10:20])  # [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
# Every 2nd of the first 10
print(L[:10:2])  # [0, 2, 4, 6, 8]
# Every 5th of all numbers
print(L[::5])    # [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95]
# With nothing specified, a verbatim copy
print(L[:])

# A tuple is also a kind of list; the only difference is that a tuple is immutable.
tuple = (0, 1, 2, 3, 4, 5)
print(tuple[:3])  # (0, 1, 2)

# Strings can be sliced too
# Exercise: implement trim using slicing
def trim(s):
    if len(s) == 0:
        return ''
    elif s[:1] == ' ':
        return trim(s[1:])
    elif s[-1:] == ' ':
        return trim(s[:-1])
    return s

# Tests
if trim('hello ') != 'hello':
    print('test failed!')
elif trim(' hello') != 'hello':
    print('test failed!')
elif trim(' hello ') != 'hello':
    print('test failed!')
elif trim(' hello world ') != 'hello world':
    print('test failed!')
elif trim('') != '':
    print('test failed!')
elif trim(' ') != '':
    print('test failed!')
else:
    print('test passed!')

# [[Iteration]]
# Check whether an object is iterable
from collections import Iterable
print(isinstance('abc', Iterable))      # True
print(isinstance([1, 2, 3], Iterable))  # True
print(isinstance(123, Iterable))        # False

# What about a Java-style index loop over a list?
for i, value in enumerate(['A', 'B', 'C']):
    print(i, value)
'''
0 A
1 B
2 C
'''

# Binding two variables at once inside a for loop
for x, y in [(1, 1), (2, 4), (3, 9)]:
    print(x, y)

'''
1 1
2 4
3 9
'''

# Exercise: iterate over a list to find its min and max, returned as a tuple
# Method 1
def findMinAndMax1(L):
    if L == None or len(L) == 0:
        return (None, None)
    my_min = L[0]
    my_max = L[0]
    for val in L:
        my_min = min(my_min, val)
        my_max = max(my_max, val)
    return (my_min, my_max)

# Method 2
def findMinAndMax2(L):
    if L == None or len(L) == 0:
        return (None, None)
    return (min(L), max(L))

print(findMinAndMax1(list(range(10))))  # (0, 9)
print(findMinAndMax2(list(range(10))))  # (0, 9)

# [[List comprehensions]]
# List all files and directories in the current directory
import os
print([d for d in os.listdir('.')])  # ['.idea', 'def.py', 'dict&set.py', 'list&tuple.py', 'senior_pro.py']

# Using two variables
dict = {'x': 'A', 'y': 'B', 'z': 'C'}
print([k + '=' + v for k, v in dict.items()])  # ['x=A', 'y=B', 'z=C']

# Lowercase every string in a list
L = ['Hello', 'World', 'IBM', 'Apple']
print([x.lower() for x in L])  # ['hello', 'world', 'ibm', 'apple']

# [[Generators]] generator

Method 1: change a list comprehension's [] into ()

L = [x * x for x in range(3)]
print(L)  # [0, 1, 4]
g = (x * x for x in range(3))
print(g)  # <generator object at 0x102a2e360>

Printing the elements:

print(next(g))  # 0
print(next(g))  # 1
print(next(g))  # 4
# print(next(g))  # StopIteration

# [Note] Calling next() by hand is impractical; the usual way is a for loop,
# which does not care about StopIteration
g = (x * x for x in range(3))
for n in g:
    print(n)
'''
0
1
4
'''

Generators are especially powerful: when comprehension-style for-loop logic cannot express what you need, use a function.

Fibonacci:

def fib(max):
    n, a, b = 0, 0, 1
    while n < max:
        print(b)
        a, b = b, a + b
        n = n + 1
    return 'done'

fib(5)
'''
1
1
2
3
5
'''
# To turn this function into a generator, just change print to yield
def fib(max):
    n, a, b = 0, 0, 1
    while n < max:
        yield b
        a, b = b, a + b
        n = n + 1
    return 'done'

f = fib(5)
print(f)  # <generator object fib at 0x101a2e360>

# [Key point] A generator's control flow differs from an ordinary function's. A function runs sequentially and returns at a return statement or its last line.

A function turned generator runs each time next() is called, returns at each yield statement, and on the next call resumes just after the yield it last returned from.

Example: define a generator that returns 1, 3, 5 in turn

def odd():
    print('step 1:')
    yield 1
    print('step 2:')
    yield 3
    print('step 3:')
    yield 5

o = odd()
print(next(o))
print(next(o))
print(next(o))
'''
step 1:
1
step 2:
3
step 3:
5
'''
# print(next(o))  # StopIteration
# [Analysis] As you can see, odd is not an ordinary function but a generator:
# execution pauses at each yield and resumes from there on the next call.

After three yields there is no yield left to run, so the fourth next(o) raises StopIteration.

# Back to fib: yield keeps interrupting the loop as it runs. Be sure to give the
# loop an exit condition, or you get an infinite sequence.
for n in fib(5):
    print(n)
'''
1
1
2
3
5
'''

# [Note] A for loop normally cannot see the return value. To get it, catch
# StopIteration: the return value sits in the exception's value attribute.
g = fib(5)
while True:
    try:
        n = next(g)
        print('next g:', n)
    except StopIteration as e:
        print('Generator return value:', e.value)
        break
'''
next g: 1
next g: 1
next g: 2
next g: 3
next g: 5
Generator return value: done
'''

Example: Yang Hui's (Pascal's) triangle

def trangles():
    N = [1]
    while True:
        yield N
        N.append(0)
        N = [N[i - 1] + N[i] for i in range(len(N))]

n = 0
for t in trangles():
    print(t)
    n = n + 1
    if n == 10:
        break
'''
[1]
[1, 1]
[1, 2, 1]
[1, 3, 3, 1]
[1, 4, 6, 4, 1]
[1, 5, 10, 10, 5, 1]
[1, 6, 15, 20, 15, 6, 1]
[1, 7, 21, 35, 35, 21, 7, 1]
[1, 8, 28, 56, 70, 56, 28, 8, 1]
[1, 9, 36, 84, 126, 126, 84, 36, 9, 1]
'''

# [Summary]
'''
Generators are an extremely powerful tool. In Python you can turn a list comprehension into a generator simply by changing the brackets, or implement more complex generator logic with a function.

A generator works by computing the next element on demand during a for loop and ending the loop at the appropriate moment. For a function-based generator, reaching a return statement or the last line of the body is the signal that the generator is done, and the for loop ends with it.

Distinguish ordinary functions from generator functions: calling an ordinary function returns a result directly, while calling a generator function actually returns a generator object.
'''
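The distinction is easy to demonstrate: calling a generator function runs none of its body; execution only begins on the first next():

```python
def gen():
    log.append('body started')  # side effect marking when the body runs
    yield 1

log = []
g = gen()          # returns a generator object; nothing has run yet
print(log)         # []
first = next(g)    # now the body runs up to the first yield
print(log, first)  # ['body started'] 1
```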

# [[Iterables, containers, iterators, generators]]

Iterable, container, iterator and generator are all concepts, not data structures.

A container is a collection of elements; if an object can answer whether a given element is contained in it, the object can be considered a container.

A container is usually also an iterable. Two conditions apply: 1⃣️ you can test whether an element is contained in it; 2⃣️ iterability is what gives a container the ability to hand out each of its values.

Although most containers provide some way to obtain each of their elements, that is not an ability of the container itself; it is iterability that grants it.

Of course, not every container is iterable. A Bloom filter, for example, can test whether an element is in the container, but cannot give back each of its values: it never stores the elements at all, only values produced by hash functions and saved in an array.
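A tiny sketch of such a membership-only container: implementing just __contains__ supports the in operator, but with no __iter__ or __getitem__ the object cannot be looped over (the class name and hash-set storage here are illustrative, not a real Bloom filter):

```python
class MembershipOnly:
    """Answers 'is x in here?' but cannot hand back its elements."""
    def __init__(self, *items):
        self._hashes = {hash(i) for i in items}  # stores hashes, not the items

    def __contains__(self, item):
        return hash(item) in self._hashes

m = MembershipOnly('a', 'b')
print('a' in m)  # True
print('z' in m)  # False
try:
    iter(m)      # no __iter__ and no __getitem__, so this fails
except TypeError:
    print('not iterable')
```

A real Bloom filter goes further: it keeps only bits set by several hash functions, so membership answers can be false positives, and reconstructing the stored values is impossible by design.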

可迭代對象:可直接做用與for循環的對象統稱爲可迭代對象-> Iterable.

可迭代對象實現了__iter__方法,該方法返回一個迭代器對象。

一類是絕大部分(特殊的,)容器,如str list set tuple dict file sockets 等等。

一類是generator

判斷方法:isinstance()

from collections import Iterable print(isinstance([],Iterable)) #True print(isinstance((),Iterable)) #True print(isinstance(,Iterable)) #True print(isinstance('abc',Iterable)) #True print(isinstance((x for x in range(10)),Iterable)) #True print(isinstance(100,Iterable)) #False #迭代器 from collections import Iterator print(isinstance((x for x in range(10)),Iterator)) #True print(isinstance([],Iterator)) #False print(isinstance(,Iterator)) #False print(isinstance('abc',Iterator)) #False

An iterator holds an internal state field recording the value to return on the next iteration, and implements the __next__ and __iter__ methods.
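A minimal hand-written iterator showing both methods and the internal state field (the Countdown name is just for this sketch):

```python
class Countdown:
    """Iterator that yields n, n-1, ..., 1."""
    def __init__(self, n):
        self.current = n          # internal state: the next value to return

    def __iter__(self):
        return self               # an iterator returns itself

    def __next__(self):
        if self.current <= 0:
            raise StopIteration   # signals the end of iteration
        value = self.current
        self.current -= 1
        return value

result = list(Countdown(3))
print(result)  # [3, 2, 1]
```

Because __iter__ returns self, the object works anywhere an iterable is expected, e.g. in a for loop or list(); but like all iterators it is exhausted after one pass.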

# Anything usable with next() is an Iterator, representing a lazily computed sequence.
# An iterator does not load all elements into memory at once; it produces each
# result only when asked.
# Collection types such as list, str and dict are Iterable but not Iterator;
# use iter() to obtain an Iterator from them.
# A generator is a special kind of iterator whose values come from yield rather than return.
# [Extra] Python's for loop is implemented by calling next() repeatedly
for x in [1, 2, 3]:
    pass
# is equivalent to:
# first obtain an Iterator
it = iter([1, 2, 3])
# then loop
while True:
    try:
        # get the next value
        x = next(it)
    except StopIteration:
        # exit the loop on StopIteration
        break

【Examples of each】

1⃣️【Containers】

print(1 in [1, 2, 3])      # True
print(4 not in [1, 2, 3])  # True
print(1 in {1, 2, 3})      # True
print(4 not in {1, 2, 3})  # True
print(1 in (1, 2, 3))      # True
print(4 not in (1, 2, 3))  # True

d = {1: 'foo', 2: 'bar', 3: 'qux'}
print(1 in d)          # True
print('foo' not in d)  # True

s = 'foobar'
print('b' in s)      # True
print('x' not in s)  # True
print('foo' in s)    # True

# 2⃣️【Iterables】
x = [1, 2, 3]
y = iter(x)
z = iter(x)
print(next(y))  # 1
print(next(y))  # 2
print(next(z))  # 1
print(type(x))  # <class 'list'>
print(type(y))  # <class 'list_iterator'>

# 3⃣️【Iterators】 the itertools functions all return iterator objects

Generating an infinite sequence:

from itertools import count
counter = count(start=13)
print(next(counter))  # 13
print(next(counter))  # 14

# An infinite sequence from a finite one
from itertools import cycle
colors = cycle(['red', 'white', 'blue'])
print(next(colors))  # red
print(next(colors))  # white
print(next(colors))  # blue
print(next(colors))  # red

# A finite sequence from an infinite one
from itertools import cycle
from itertools import islice
colors = cycle(['red', 'white', 'blue'])
myslice = slice(4)
# limits = colors[myslice]  # TypeError: 'itertools.cycle' object is not subscriptable
# [Analysis] i.e. its elements cannot be accessed with square-bracket indexing

limits1 = islice(colors, 0, 6)
list1 = []
print('****************')
for x in limits1:
    print(x)
    list1.append(x)
myslice = slice(4)
print(list1[myslice])
'''
red
white
blue
red
white
blue
['red', 'white', 'blue', 'red']
'''

# To make this more intuitive

【Looping techniques】

# [1] Generators
# [Note] The three methods below can all run in one script, each printing
# non-empty output, because each call creates a fresh generator object
def gen_123():
    yield 1
    yield 2
    yield 3

print(gen_123)    # <function gen_123 at 0x00000000037A3840>
print(gen_123())  # <generator object gen_123 at 0x000000000379E780>

# Looping method 1
print(list(gen_123()))  # [1, 2, 3]

# Looping method 2
for i in gen_123():
    print(i)
'''
1
2
3
'''

# Looping method 3
a = gen_123()
print(next(a))
print(next(a))
print(next(a))
'''
1
2
3
'''

# Looping method 4: unpacking
def t():
    yield 1
    yield 2
    yield 3

x, y, z = t()
print(x, y, z)

# [2] Generator expressions
a = (format(a, '.3e') for a in (3.4444444, 78.4534534))
# Looping method 1 [note: next(a) would also work without assigning a to g]
g = a
print(next(g))  # 3.444e+00
print(next(g))  # 7.845e+01
print(a)  # <generator object at 0x000000000227F620>
# Looping method 2 [note: methods 1 and 2 cannot both run on the same generator]
for i in a:
    print(i)
'''
3.444e+00
7.845e+01
'''
# Looping method 3
# print(list(a))  # ['3.444e+00', '7.845e+01']

# Looping method 4: unpacking
a = (format(a, '.3e') for a in (3.444444, 78.54899, 987.345555))
a, b, c = a
print(a, b, c)

# [3] Iterators [note: the methods below cannot all run on the same iterator; pick one]

An iterator is a stateful object that returns the container's next value each time you call next(). Any object implementing the __iter__ and __next__() methods (next() in Python 2) is an iterator: __iter__ returns the iterator itself, __next__ returns the container's next value and raises StopIteration once no elements remain.

# So an iterator is an object implementing the factory pattern: it hands you the
# next value each time you ask. There are many examples of iterators; the
# itertools functions, for instance, all return iterator objects.
import itertools
print('*************')
b = itertools.chain([1, 2], (3, 4))
print(b)  # <itertools.chain object at 0x0000000002924C18>
# Looping method 1
print(next(b))
print(next(b))
print(next(b))
print(next(b))
'''
1
2
3
4
'''
# Looping method 2
for i in b:
    print(i)
# Looping method 3
print(list(b))  # [1, 2, 3, 4]

# Looping method 4: unpacking
import itertools
a = itertools.chain([1, 2], (3,))
a, b, c = a
print(a, b, c)

# [4] Iterables
# Looping method 1: a for loop

# Looping method 2: unpacking
a, b, c = [1, 2, 3]
print(a, b, c)

【The join function】

【1】Iterators

b = itertools.chain([1, 2], (3,))
print(','.join(str(a) for a in b))  # 1,2,3

【2】Iterables
print('*'.join(['5', '6', '7']))  # 5*6*7

【3】Generators

def gen_123():
    yield 1
    yield 2
    yield 3
print(','.join(str(a) for a in gen_123()))  # 1,2,3

【4】Generator expressions
b = (format(a, '.3e') for a in (3.4444444, 78.4534534))
print(','.join(str(a) for a in b))  # 3.444e+00,7.845e+01
