異步編程 101:asyncio 進階上篇

寫在前面:編程

asyncio 初學者可能較難理解,能夠結合我前面的幾篇文章一塊兒食用:微信

0x01 本文簡介

原視頻爲 PyCon2019上的一場技術分享,做者是 Spotify的工程師, 經過一個案例, 介紹了 asyncio 的一些 best practice。併發

0x02 初始化 setup

concurrently publish messages

併發地 publish message:注意圖中高亮的那一段,這裏用的不是await queue.put(msg),這是由於await會阻塞while循環(參考前面的一篇文章:異步編程 101:asyncio中的 for 循環),也就是說,要等queue.put(msg)完成了,下一趟纔會開始。而asyncio.create_task()會立刻 "fire" 而且當即返回,你能夠理解爲fire and forget machinism異步

若是你沒時間看異步編程 101:asyncio中的 for 循環,我簡要回顧一下:await作的事情是把當前的協程掛起,把控制權交給事件循環,以便於事件循環有其餘協程能夠調度時,接着運行其餘協程。可是對於執行await的這個協程而言,它是被阻塞的。這個例子中,publish()中的while循環是一個總體。async

concurrently consume messages

這裏使用msg = await queue.get()是make sense 的,由於你得先獲得 message 而後才能接着作其餘事情。然後面的restart_host則用create_task,由於咱們不想對他await(等待它完成)而阻塞了整個 while Ture循環。ide

concurrent work

收到 message 以後,除了restart_host()以外,咱們可能還須要作一些其餘的任務,好比持久化保存message異步編程

這隻須要在consume方法裏面再添加一個create_task()函數

block when needed

然而有時候咱們是但願異步任務可以serial執行的。若是要把restart_host()的邏輯改一下:先獲取上次重啓時間,而後判斷上次重啓時間是否是大於7天,若是是,再 restart_host()。這裏的last_restart_date()restart_host()是有明確前後順序的。post

可是咱們又不想這裏的線性執行影響後面的 message 獲取,很簡單,只要把這個邏輯封裝成一個協程,而後create_task()就行。性能

0x03 cleanup

須要對message ack,這樣producer纔不會從新發送。因此如今處理消息的邏輯以下:

須要保證:saverestart_host所有完成以前,才能cleanup

使用await是可以 work 的,可是性能確定不夠。

因此asyncio.gather()就派上用場了:這裏把save()restart_host()兩個協程交給gather,並傳給它一個callback,等兩個任務所有完成以後調用callback函數,也就是cleanup()

若是不想用 callback,也能夠直接await gather,這樣的 code 更加 clean:

最後把程序跑一下,圖中不一樣顏色表示的是同一個 message:

  • 獲取 message 是沒有阻塞的
  • saverestart_host所有結束以後,才ack message。

0x04 graceful shutdowns

publishconsume組合起來,獲得最後的main()

0x05 總結一下

  1. Asynchronous != concurrent

不是說你在原來代碼的基礎上加上asyncawait就能得到併發性能的,極可能你的異步代碼仍是 sequantial 的。

  1. Serial != blocking

一些有前後順序的操做,不意味着就必定是要 block 的。好比先作A 在作 B,在等待 A 完成的過程當中,我能夠抽出時間作 C。放到本文的例子來講,我須要先saverestart_host才能cleanup,但我能夠在等待saverestart_host的時候繼續作其餘事情:把控制權交給主事件循環,接着運行其餘協程。

  1. 回顧一下知識點
  • await的做用是什麼?
  • asyncio.create_task()
  • asyncio.gather()

若是你像我同樣真正熱愛計算機科學,喜歡研究底層邏輯,歡迎關注個人微信公衆號:

相關文章
相關標籤/搜索