Python併發編程之實戰異步IO框架:asyncio 下篇(十一)

image.png

你們好,併發編程 進入第十一章。git

前面兩節,咱們講了協程中的單任務多任務github

這節咱們將經過一個小實戰,來對這些內容進行鞏固。redis


在實戰中,將會用到如下知識點:編程

  • 多線程的基本使用多線程

  • Queue消息隊列的使用併發

  • Redis的基本使用異步

  • asyncio的使用async

動態添加協程

在實戰以前,咱們要先了解下在asyncio中如何將協程態添加到事件循環中的。這是前提。ide

如何實現呢,有兩種方法:oop

  • 主線程是同步


import time
import asyncio
from queue import Queue
from threading import Thread

def start_loop(loop):
   # 一個在後臺永遠運行的事件循環
   asyncio.set_event_loop(loop)
   loop.run_forever()

def do_sleep(x, queue, msg=""):
   time.sleep(x)
   queue.put(msg)

queue = Queue()

new_loop = asyncio.new_event_loop()

# 定義一個線程,並傳入一個事件循環對象
t = Thread(target=start_loop, args=(new_loop,))
t.start()

print(time.ctime())

# 動態添加兩個協程
# 這種方法,在主線程是同步的
new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一個")
new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二個")

while True:
   msg = queue.get()
   print("{} 協程運行完..".format(msg))
   print(time.ctime())

因爲是同步的,因此總共耗時6+3=9秒.

輸出結果

Thu May 31 22:11:16 2018
第一個 協程運行完..
Thu May 31 22:11:22 2018
第二個 協程運行完..
Thu May 31 22:11:25 2018


  • 主線程是異步的,這是重點,必定要掌握。。

import time
import asyncio
from queue import Queue
from threading import Thread

def start_loop(loop):
   # 一個在後臺永遠運行的事件循環
   asyncio.set_event_loop(loop)
   loop.run_forever()

async def do_sleep(x, queue, msg=""):
   await asyncio.sleep(x)
   queue.put(msg)

queue = Queue()

new_loop = asyncio.new_event_loop()

# 定義一個線程,並傳入一個事件循環對象
t = Thread(target=start_loop, args=(new_loop,))
t.start()

print(time.ctime())

# 動態添加兩個協程
# 這種方法,在主線程是異步的
asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一個"), new_loop)
asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二個"), new_loop)

while True:
   msg = queue.get()
   print("{} 協程運行完..".format(msg))
   print(time.ctime())

輸出結果

因爲是同步的,因此總共耗時max(6, 3)=6

Thu May 31 22:23:35 2018
第二個 協程運行完..
Thu May 31 22:23:38 2018
第一個 協程運行完..
Thu May 31 22:23:41 2018

實戰:利用redis實現動態添加任務

對於併發任務,一般是用生成消費模型,對隊列的處理可使用相似master-worker的方式,master主要用戶獲取隊列的msg,worker用戶處理消息。

爲了簡單起見,而且協程更適合單線程的方式,咱們的主線程用來監聽隊列,子線程用於處理隊列。這裏使用redis的隊列。主線程中有一個是無限循環,用戶消費隊列。

先安裝Redis
到 https://github.com/MicrosoftArchive/redis/releases下載

image.png

解壓到你的路徑。

image.png

而後,在當前路徑運行cmd,運行redis的服務端。

image.png

服務開啓後,咱們就能夠運行咱們的客戶端了。
並依次輸入key=queue,value=5,3,1的消息。

image.png

一切準備就緒以後,咱們就能夠運行咱們的代碼了。

import time
import redis
import asyncio
from queue import Queue
from threading import Thread

def start_loop(loop):
   # 一個在後臺永遠運行的事件循環
   asyncio.set_event_loop(loop)
   loop.run_forever()

async def do_sleep(x, queue):
   await asyncio.sleep(x)
   queue.put("ok")

def get_redis():
   connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0)
   return redis.Redis(connection_pool=connection_pool)

def consumer():
   while True:
       task = rcon.rpop("queue")
       if not task:
           time.sleep(1)
           continue
       asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)


if __name__ == '__main__':
   print(time.ctime())
   new_loop = asyncio.new_event_loop()

   # 定義一個線程,運行一個事件循環對象,用於實時接收新任務
   loop_thread = Thread(target=start_loop, args=(new_loop,))
   loop_thread.setDaemon(True)
   loop_thread.start()
   # 建立redis鏈接
   rcon = get_redis()

   queue = Queue()

   # 子線程:用於消費隊列消息,並實時往事件對象容器中添加新任務
   consumer_thread = Thread(target=consumer)
   consumer_thread.setDaemon(True)
   consumer_thread.start()

   while True:
       msg = queue.get()
       print("協程運行完..")
       print("當前時間:", time.ctime())

稍微講下代碼

loop_thread:單獨的線程,運行着一個事件對象容器,用於實時接收新任務。
consumer_thread:單獨的線程,實時接收來自Redis的消息隊列,並實時往事件對象容器中添加新任務。

輸出結果

Thu May 31 23:42:48 2018
協程運行完..
當前時間: Thu May 31 23:42:49 2018

協程運行完..
當前時間: Thu May 31 23:42:51 2018

協程運行完..
當前時間: Thu May 31 23:42:53 2018

咱們在Redis,分別發起了5s3s1s的任務。
從結果來看,這三個任務,確實是併發執行的,1s的任務最早結束,三個任務完成總耗時5s

運行後,程序是一直運行在後臺的,咱們每一次在Redis中輸入新值,都會觸發新任務的執行。。

相關文章
相關標籤/搜索