寫在前面:編程
asyncio 初學者可能較難理解,能夠結合我前面的幾篇文章一塊兒食用:微信
原視頻爲 PyCon2019上的一場技術分享,做者是 Spotify的工程師, 經過一個案例, 介紹了 asyncio 的一些 best practice。併發
併發地 publish message:注意圖中高亮的那一段,這裏用的不是await queue.put(msg)
,這是由於await
會阻塞while
循環(參考前面的一篇文章:異步編程 101:asyncio中的 for 循環),也就是說,要等queue.put(msg)
完成了,下一趟纔會開始。而asyncio.create_task()
會立刻 "fire" 而且當即返回,你能夠理解爲fire and forget machinism。異步
若是你沒時間看異步編程 101:asyncio中的 for 循環,我簡要回顧一下:
await
作的事情是把當前的協程掛起,把控制權交給事件循環,以便於事件循環有其餘協程能夠調度時,接着運行其餘協程。可是對於執行await
的這個協程而言,它是被阻塞的。這個例子中,publish()
中的while
循環是一個總體。async
這裏使用msg = await queue.get()
是make sense 的,由於你得先獲得 message 而後才能接着作其餘事情。然後面的restart_host
則用create_task
,由於咱們不想對他await
(等待它完成)而阻塞了整個 while Ture
循環。ide
收到 message 以後,除了restart_host()
以外,咱們可能還須要作一些其餘的任務,好比持久化保存message
。異步編程
這隻須要在consume
方法裏面再添加一個create_task()
:函數
然而有時候咱們是但願異步任務可以serial
執行的。若是要把restart_host()
的邏輯改一下:先獲取上次重啓時間,而後判斷上次重啓時間是否是大於7天,若是是,再 restart_host()
。這裏的last_restart_date()
和restart_host()
是有明確前後順序的。post
可是咱們又不想這裏的線性執行影響後面的 message 獲取,很簡單,只要把這個邏輯封裝成一個協程,而後create_task()
就行。性能
須要對message
ack
,這樣producer
纔不會從新發送。因此如今處理消息的邏輯以下:
須要保證:save
和restart_host
所有完成以前,才能cleanup
。
使用await
是可以 work 的,可是性能確定不夠。
因此asyncio.gather()
就派上用場了:這裏把save()
和restart_host()
兩個協程交給gather
,並傳給它一個callback
,等兩個任務所有完成以後調用callback
函數,也就是cleanup()
。
若是不想用 callback
,也能夠直接await
gather
,這樣的 code 更加 clean:
最後把程序跑一下,圖中不一樣顏色表示的是同一個 message:
save
和restart_host
所有結束以後,才ack
message。把publish
和consume
組合起來,獲得最後的main()
。
不是說你在原來代碼的基礎上加上async
、await
就能得到併發性能的,極可能你的異步代碼仍是 sequantial 的。
一些有前後順序的操做,不意味着就必定是要 block 的。好比先作A 在作 B,在等待 A 完成的過程當中,我能夠抽出時間作 C。放到本文的例子來講,我須要先save
和restart_host
才能cleanup
,但我能夠在等待save
和restart_host
的時候繼續作其餘事情:把控制權交給主事件循環,接着運行其餘協程。
await
的做用是什麼?asyncio.create_task()
asyncio.gather()
若是你像我同樣真正熱愛計算機科學,喜歡研究底層邏輯,歡迎關注個人微信公衆號: