Recently I have been building things with sanic, so every piece of IO-blocking code has to go through an aio-style module. Fortunately, the asyncio ecosystem has matured nicely over the last few years, and everything you need is there ~
Lately our login/registration flow has become quite complex (invitations are involved), with steps like unlocking accounts and sending SMS. The module has grown messy, and adding something like a slide captcha later would only make it worse.
So I wanted something like celery to decouple these tasks. The problem is that celery does not support asyncio yet (official support is planned for celery 5).
After some digging I found arq, a job queue implemented in Python. I have already run it in production, and it works quite well ~
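Before looking at arq itself, the decoupling idea behind any task queue can be sketched with nothing but the standard library: the request handler only enqueues work and returns immediately, while a background consumer performs the slow IO. Everything below (`send_sms`, `handle_register`, the phone number) is illustrative, not arq's API:

```python
import asyncio


async def send_sms(phone):
    # Stand-in for a slow IO call (e.g. an SMS provider's API)
    await asyncio.sleep(0.1)
    return f"sms sent to {phone}"


async def worker(queue, results):
    # Drains tasks in the background, so the handler never waits on slow IO
    while True:
        phone = await queue.get()
        results.append(await send_sms(phone))
        queue.task_done()


async def handle_register(queue, phone):
    # The request handler only enqueues; it returns right away
    await queue.put(phone)
    return "registered"


async def main():
    queue, results = asyncio.Queue(), []
    task = asyncio.create_task(worker(queue, results))
    print(await handle_register(queue, "13800000000"))
    await queue.join()          # wait for the background work to finish
    task.cancel()
    return results


print(asyncio.run(main()))
```

arq plays the consumer's role in this picture, with redis as the queue, so enqueued work survives process restarts and can be spread across several worker processes.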
The official docs describe it as a job-queue library built on asyncio and redis.
First, install it:

```shell
$ pip install arq
```
Now, let's take a quick tour of how to use it ~
Start with the following code:
```python
# filename: tasks.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Date: 2019/5/23
import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def say_hello(ctx, name) -> None:
    """Task function.

    Parameters
    ----------
    ctx: dict
        worker context
    name: string
    """
    print(ctx)
    print(f"Hello {name}")


async def startup(ctx):
    print("starting...")


async def shutdown(ctx):
    print("ending...")


async def main():
    # Create the redis pool
    redis = await create_pool(RedisSettings(password="root123456"))
    # Enqueue the task
    await redis.enqueue_job('say_hello', name="liuzhichao")


# WorkerSettings defines the settings used when creating a worker;
# it is what the arq CLI reads.
class WorkerSettings:
    # The queue uses this `redis` configuration; related parameters
    # can be set here, e.g. my password is `root123456`
    redis_settings = RedisSettings(password="root123456")
    # Functions the worker is allowed to run
    functions = [say_hello]
    # Runs when the `worker` starts
    on_startup = startup
    # Runs when the `worker` shuts down
    on_shutdown = shutdown


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
```
1. Here is how to run the worker:

```shell
$ arq tasks.WorkerSettings
```

You should see something like:

```
10:56:25: Starting worker for 1 functions: say_hello
10:56:25: redis_version=4.0.1 mem_usage=32.00M clients_connected=6 db_keys=19189
starting...
```
2. Run the tasks.py file to enqueue a job:

```shell
$ python3 tasks.py
```

The worker's log should then show something like:

```
11:01:04:   0.29s → 5a5ac0edd5ad4b318b9848637b1ae800:say_hello(name='liuzhichao')
{'redis': <ArqRedis <ConnectionsPool [db:0, size:[1:10], free:1]>>, 'job_id': '5a5ac0edd5ad4b318b9848637b1ae800', 'job_try': 1, 'enqueue_time': datetime.datetime(2019, 5, 23, 3, 1, 4, 570000), 'score': 1558580464570}
Hello liuzhichao
11:01:04:   0.00s ← 5a5ac0edd5ad4b318b9848637b1ae800:say_hello ●
```
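The `score` in the context dict printed above appears to be just the enqueue time as a Unix timestamp in milliseconds, which is what the queue sorts jobs by. A quick check against the values from the log (assuming the logged `enqueue_time` is UTC):

```python
from datetime import datetime, timezone

# enqueue_time and score copied from the worker log above
enqueue_time = datetime(2019, 5, 23, 3, 1, 4, 570000, tzinfo=timezone.utc)
score = int(enqueue_time.timestamp() * 1000)
print(score)  # → 1558580464570, matching the logged score
```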
3. And with that, the simple task has run end to end. Pretty easy, right? ~
arq also supports scheduled (cron-style) jobs:

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Date: 2019/5/23
from arq import cron
from arq.connections import RedisSettings


async def run_regularly(ctx):
    # Fires at second 50 of minutes 10, 11 and 12 of every hour
    print('run job at 26:05, 27:05 and 28:05')


class WorkerSettings:
    redis_settings = RedisSettings(password="root123456")
    cron_jobs = [
        cron(run_regularly, minute={10, 11, 12}, second=50)
    ]
```
1. Run it:

```shell
$ arq tasks.WorkerSettings
```

If you leave it running long enough, you should see:

```
11:10:25: Starting worker for 1 functions: cron:run_regularly
11:10:25: redis_version=4.0.1 mem_usage=32.00M clients_connected=6 db_keys=19190
11:10:51:   0.51s → cron:run_regularly()
run foo job at 26:05, 27:05 and 28:05
11:10:51:   0.00s ← cron:run_regularly ●
11:11:51:   0.51s → cron:run_regularly()
run foo job at 26:05, 27:05 and 28:05
11:11:51:   0.00s ← cron:run_regularly ●
11:12:50:   0.50s → cron:run_regularly()
run foo job at 26:05, 27:05 and 28:05
11:12:50:   0.00s ← cron:run_regularly ●
```

It keeps firing on this schedule indefinitely.
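To make that timeline concrete, the matching rule from the example (`minute={10, 11, 12}, second=50`) can be sketched in plain Python; `matches` here is a hypothetical helper for illustration, not part of arq:

```python
from datetime import datetime


def matches(dt, minute={10, 11, 12}, second=50):
    # True when dt falls on one of the configured minutes, at the given second
    return dt.minute in minute and dt.second == second


# Fires at hh:10:50, hh:11:50 and hh:12:50 of every hour
assert matches(datetime(2019, 5, 23, 11, 10, 50))
assert not matches(datetime(2019, 5, 23, 11, 10, 51))  # wrong second
assert not matches(datetime(2019, 5, 23, 11, 13, 50))  # wrong minute
```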
For more of the API, see the official documentation --> https://arq-docs.helpmanual.io