原創文章,做者:Damon付,如若轉載,請註明出處:《Python消息隊列工具 Python-rq 中文教程》http://www.tiangr.com/python-xiao-xi-dui-lie-python-rq-zhong-wen-jiao-cheng-2.htmlhtml
翻譯至python-rq官網 http://python-rq.org前端
安裝方法python
pip install rq
首先,須要運行一個Redis服務,你可使用一個已經存在的Redis,放置任務(jobs)至隊列(queues),你不須要作任何其餘特別的事情,僅僅須要定義你本身須要加入隊列中耗時的阻塞方法:web
import requests def count_words_at_url(url): resp = requests.get(url) return len(resp.text.split())
接着建立一個RQ隊列:redis
from redis import Redis from rq import Queue q = Queue(connection=Redis())
入列方法:數據庫
from my_module import count_words_at_url result = q.enqueue( count_words_at_url, 'http://nvie.com')
Worker進程 在後臺開始執行隊列中的方法須要開啓一個worker進程。安全
$ rq worker *** Listening for work on default Got count_words_at_url('http://nvie.com') from default Job result = 818 *** Listening for work on default
一個"任務"(job)是一個Python對象,表示在一個"工做"(worker)進程(後臺進程)中被異步調用的方法。任何Python方法均可以經過傳遞函數和其參數的引用值的方式被異步調用,這個過程稱做"入隊"(enqueueing)。異步
將任務放置入隊列,首先申明一個函數:socket
import requests def count_words_at_url(url): resp = requests.get(url) return len(resp.text.split())
注意到了嗎?此函數沒有任何特殊的地方,任何函數均可以將其放入RQ隊列。async
將 count_words_at_url
放入隊列,在後臺運行。
from rq import Queue from redis import Redis from somewhere import count_words_at_url # Tell RQ what Redis connection to use redis_conn = Redis() q = Queue(connection=redis_conn) # no args implies the default queue # Delay execution of count_words_at_url('http://nvie.com') job = q.enqueue(count_words_at_url, 'http://nvie.com') print job.result # => None # Now, wait a while, until the worker is finished time.sleep(2) print job.result # => 889
若是你想指定任務至特定的隊列,能夠如此定義隊列名稱:
q = Queue('low', connection=redis_conn) q.enqueue(count_words_at_url, 'http://nvie.com')
注意示例中的Queue('low')
,你可使用任意的名稱替代以獲取符合需求、具備擴展性的分佈式隊列任務模式。默認通用的隊列命名方式(e.g. high, medium, low)能夠區分隊列任務的優先級。
此外,你能夠添加一些額外的參數來控制隊列任務。默認狀況下,有以下鍵值對參數能夠傳遞給任務方法:
timeout
, 指定任務的最大運行時間,超時將被丟棄。result_ttl
, 指定保存的任務結果過時時間。ttl
, 指定任務排隊的最大時間,超時將被取消。depends_on
, 指定任務對應所需執行的依賴任務(或者job id),必須完成依賴任務再執行指定任務。job_id
, 爲任務手動添加一個job_id標識。at_front
, 將任務放置在隊列的最前端而不是最後。kwargs
和 args
, 繞開這些自動彈出的參數 ie: 爲潛在的任務方法指定一個timeout參數。最後,建議使用更明晰的方法.enqueue_call()
來取代.enqueue()
q = Queue('low', connection=redis_conn) q.enqueue_call(func=count_words_at_url, args=('http://nvie.com',), timeout=30)
有些狀況下,web進程沒法進入運行在工做(worker) 中的源碼(i.e. X中的代碼須要調用一個Y中延遲函數),你能夠經過字符串引用傳遞函數。
q = Queue('low', connection=redis_conn) q.enqueue('my_package.my_module.my_func', 3, 4)
除了任務入列,隊列(queues)還包含了一些其它有用的方法:
from rq import Queue from redis import Redis redis_conn = Redis() q = Queue(connection=redis_conn) # Getting the number of jobs in the queue print len(q) # Retrieving jobs queued_job_ids = q.job_ids # Gets a list of job IDs from the queue queued_jobs = q.jobs # Gets a list of enqueued job instances job = q.fetch_job('my_id') # Returns job having ID "my_id"
RQ設計原理
使用RQ的過程當中,你你不須要提早建立任何的隊列,你也不須要指定任何使用渠道,數據交換規則,路由規則等。你只須要將你的任務放入到任何你想要放入的隊列中,一旦你將一個任務入列至一個還未存在的隊列中,它將迅速被建立。
RQ沒有使用任何中間人來指定消息的位置,你可能認爲這是優點也可能任務這是不合理的,這取決與你想要解決的具體問題。
最後,RQ沒有使用簡明的協議,由於它依據的是Python自帶的pickle模塊來序列化工做任務。
當任務入列,queue.enqueue()
方法將會返回一個Job
實例,這僅僅是一個用於檢測實際任務運行結果的代理對象。
爲此,它擁有一個方便的結果訪問屬性,在任務尚未完成時將返回None,或者當任務完成時返回一個非空值(假設此任務第一時間返回了值)。
若是你想使用相似Celery的代碼風格,你可能須要使用@task裝飾器。RQ>=3.0版本將擁有相似的裝飾器:
from rq.decorators import job @job('low', connection=my_redis_conn, timeout=5) def add(x, y): return x + y job = add.delay(3, 4) time.sleep(1) print job.result
爲了方便測試,你能夠入列一個任務而不須要綁定一個實際運行的工做(worker)進程(RQ >= 0.3.1 可用)。爲了實現此功能,你能夠在隊列構造器中傳遞參數async=False
.
>>> q = Queue('low', async=False) >>> job = q.enqueue(fib, 8) >>> job.result 21
以上代碼將在同一進程中同步執行函數fib(8)
而不須要任何一個激活的工做(worker)進程。相似於Celery中的ALWAYS_EAGER。
RQ 0.4.0版本的新特性,能夠用來管理多任務直接的依賴關係。使用depends_on
參數實現執行一個依賴於另外一個任務的Job。
q = Queue('low', async=False) report_job = q.enqueue(generate_report) q.enqueue(send_report, depends_on=report_job) The ability to handle job dependencies allows you to split a big job into several smaller ones. A job that is dependent on another is enqueued only when its dependency finishes successfully.
(略)
技術上來講,你能夠放置任何Python方法到隊列,但這不意味着你這樣作是明智的,有些因素你在入列任務前必須考慮:
__module__
。這意味着你不能入列申明在__main__
模塊中的任務方法。RQ 的工做進程依賴系統的fork()
方法,這意味着Windows下沒法運行。
Worker是一個運行在後臺的Python進程,用來執行一些你不想要在web進程中執行的冗長或者阻塞任務。
從項目的root目錄下開啓一個worker進程:
$ rq worker high normal low *** Listening for work on high, normal, low Got send_newsletter('me@nvie.com') from default Job ended normally without result *** Listening for work on high, normal, low ...
工做進程將無限循環讀取給定隊列(順序很是重要)中的任務,並在全部任務方法執行完畢後繼續等待新的任務。
每個工做進程一次將只執行一個任務,在一個worker進程中,不會並行處理任務,若是你想要並行執行任務,你只須要開啓更多的worker進程。
默認狀況下,工做進程將當即運行而且在運行完全部任務後一直阻塞直至新的任務產生。Worker進程一樣可使用突發模式運行,此模式將完成全部隊列中的任務,而且在全部給定的隊列執行完畢後退出。
$ rq worker --burst high normal low *** Listening for work on high, normal, low Got send_newsletter('me@nvie.com') from default Job ended normally without result No more work, burst finished. Registering death.
這對批量執行須要週期性執行的任務或者大幅升高暫時性worker進程的峯值來講十分有用。
Worker進程生命週期
StartedJobRegistry
。idle
,而且依據result_ttl
設置任務和任務執行結果至過時。Job任務將從 StartedJobRegistry
裏移除,並在成功執行以後新增至 FinishedJobRegistry
,失敗後新增至 FailedQueue
。基本上,rq worker
進程腳本是一個簡單 獲取-Fork-執行的循環
。當大量任務作了冗長的啓動設置,或者他們全都依賴於一樣的模塊,你在執行每一個任務時都將花費大量額外的時間(由於你在fork
進程以後才進行import
操做)。這種方式很簡潔,並且RQ也所以不會出現內存泄漏的狀況,但與此同時帶來的負面影響是執行效率下降了。
對此,你可以採用在fork
進程以前先import
引入必要模塊的方式來提升生產力。RQ目前沒有提供能夠採起這種設置的參數,可是你能夠在你的worker
進程進行循環以前先作import
導入。
你能夠自定義本身的worker
腳本(替代掉原來使用的rq進程).一個簡單的實例:
#!/usr/bin/env python import sys from rq import Connection, Worker # Preload libraries import library_that_you_want_preloaded # Provide queue names to listen to as arguments to this script, # similar to rq worker with Connection(): qs = sys.argv[1:] or ['default'] w = Worker(qs) w.work()
Workers
進程命名方式默認等於當前的hostname
與當前PID
鏈接。覆蓋默認設置,能夠在開始worker
進程時指定一個 --name
選項
任什麼時候候,worker
進程收到SIGINT
(via Ctrl+C
) 或者 SIGTERM
(via kill
)信號,worker
進程將會等待當前正在執行任務完成工做後,再關閉循環,將其註冊入死亡進程。
若是是在關閉進程階段,再次收到SIGINT
或者 SIGTERM
,worker
進程將強制性使子進程中斷(發送SIGKILL
),但依然會嘗試將其註冊入死亡進程。
0.3.2
版本中的新特性
若是你想要經過配置文件而不是命令行參數來配置rq進程,你能夠建立一個名爲settings.py
的Python文件:
REDIS_URL = 'redis://localhost:6379/1' # You can also specify the Redis DB to use # REDIS_HOST = 'redis.example.com' # REDIS_PORT = 6380 # REDIS_DB = 3 # REDIS_PASSWORD = 'very secret' # Queues to listen on QUEUES = ['high', 'normal', 'low'] # If you're using Sentry to collect your runtime exceptions, you can use this # to configure RQ for it in a single step SENTRY_DSN = 'http://public:secret@example.com/1'
圖上示例展現了全部可用的配置選項。 注意:QUEUES
和REDIS_PASSWORD
設置在0.3.3
之後的版本才存在。 指定worker
進程讀取配置文件的路徑使用 -c
參數:
$ rq worker -c settings
版本0.4.0 的新特性
There are times when you want to customize the worker's behavior. Some of the more common requests so far are:
os.fork
的模型.multiprocessing
或者 gevent
. 使用 -w 參數指定 worker類路徑:$ rq worker -w 'path.to.GeventWorker'
將來開放
You can tell the worker to use a custom class for jobs and queues using --job-class and/or --queue-class.
$ rq worker --job-class 'custom.JobClass' --queue-class 'custom.QueueClass'
Don't forget to use those same classes when enqueueing the jobs.
For example:
from rq import Queue from rq.job import Job class CustomJob(Job): pass class CustomQueue(Queue): job_class = CustomJob queue = CustomQueue('default', connection=redis_conn) queue.enqueue(some_func)
版本 0.5.5 的新特性
若是你想根據不一樣類型的任務來決定對應的異常操做,或者僅僅想重寫異常處理,能夠經過--exception-handler
選項:
$ rq worker --exception-handler 'path.to.my.ErrorHandler' # Multiple exception handlers is also supported $ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'
入列任務是延遲函數的調用,也就是說咱們正在解決一個問題,可是須要等待一會纔回來執行。
Python 方法若是有返回值,若是任務返回一個非空的值,worker進程將返回值寫入Redis所存的任務對應Hash下,TTL默認的過時時間爲任務結束後的500s。
The party that enqueued the job gets back a Job instance as a result of the enqueueing itself. Such a Job object is a proxy object that is tied to the job's ID, to be able to poll for results.
返回結果寫入Redis時伴有一個有限的生存時間,能夠避免Redis數據庫數據無限增加。
RQ >= 0.3.1時,TTL的值能夠在調用 enqueue_call()
時經過使用 result_ttl
關鍵詞參數指定。它一樣能夠禁用過時,這時你須要手動清理數據。
q.enqueue_call(func=foo) # result expires after 500 secs (the default) q.enqueue_call(func=foo, result_ttl=86400) # result expires after 1 day q.enqueue_call(func=foo, result_ttl=0) # result gets deleted immediately q.enqueue_call(func=foo, result_ttl=-1) # result never expires--you should delete jobs manually
此外,你可使用來繼續執行一些沒有返回值,默認會當即刪除的已完成任務,
q.enqueue_call(func=func_without_rv, result_ttl=500) # job kept explicitly
一般,任務能夠在失敗後拋出異常,RQ將用如下方式處理:
失敗的任務須要關注而且失敗任務的返回值不該該設置過時時間。更進一步,失敗的任務須要再次運行測試。通常這些事情須要手動操做,由於RQ自己是沒法字典或者可靠地自動判斷任務從新執行是否安全。
當一個異常在任務中拋出,worker進程能夠獲取獲得,並將其序列化,以鍵爲exc_info
的hash存儲在任務對應的Redis下。任務的引用隨即被置於失敗隊列中。
任務自己擁有一些十分有用的屬性幫助檢測:
這些能夠幫助你檢測和手動追查問題,而且再次提交任務。
當worker
進程被 Ctrl+C
or kill
中斷,RQ
幾乎不會丟失任務。當前任務完成後,worker進程將其它未執行的任務終止。
然而,worker
進程可使用kill -9
強制終止,不過,這種方式worker
進程沒法優雅地結束任務或者將任務放入失敗隊列中。所以,強制關閉一個worker
進程可能引發潛在的問題。
默認任務執行時間爲180s,若是逾期未執行完畢,worker
進程將終止主進程而且將任務放入失敗隊列,並標識其超時。
若是一個任務須要更多(或更少)時間來完成,默認的超時時間也將改變,你能夠在調用Queue.enqueue()
時經過參數指定:
q = Queue() q.enqueue(func=mytask, args=(foo,), kwargs={'bar': qux}, timeout=600) # 10 mins You can also change the default timeout for jobs that are enqueued via specific queue instances at once, which can be useful for patterns like this: # High prio jobs should end in 8 secs, while low prio # work may take up to 10 mins high = Queue('high', default_timeout=8) # 8 secs low = Queue('low', default_timeout=600) # 10 mins # Individual jobs can still override these defaults low.enqueue_call(really_really_slow, timeout=3600) # 1 hr Individual jobs can still specify an alternative timeout, as workers will respect these.
在一些狀況下,你可能須要進入當前的任務ID或者任務方法的實例,或者爲任務存於任意數據。
版本 0.3.3 的新特性
由於任務方法和Python方法本質是同樣的,你能夠向RQ 獲取當前任務ID:
from rq import get_current_job def add(x, y): job = get_current_job() print 'Current job: %s' % (job.id,) return x + y
版本 0.3.3 的新特性
爲了給任務增長/更新自定義的狀態信息,須要操做任務的meta屬性,
import socket def add(x, y): job = get_current_job() job.meta['handled_by'] = socket.gethostname() job.save() return x + y
版本 0.4.7 的新特性
一個任務擁