Python消息隊列工具 Python-rq 中文教程

原創文章,做者:Damon付,如若轉載,請註明出處:《Python消息隊列工具 Python-rq 中文教程》http://www.tiangr.com/python-xiao-xi-dui-lie-python-rq-zhong-wen-jiao-cheng-2.htmlhtml

翻譯至python-rq官網 http://python-rq.org前端

十分鐘入門

安裝方法python

pip install rq

首先,須要運行一個Redis服務,你可使用一個已經存在的Redis,放置任務(jobs)至隊列(queues),你不須要作任何其餘特別的事情,僅僅須要定義你本身須要加入隊列中耗時的阻塞方法:web

import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

接着建立一個RQ隊列:redis

from redis import Redis
from rq import Queue

q = Queue(connection=Redis())

入列方法:數據庫

from my_module import count_words_at_url
result = q.enqueue(
             count_words_at_url, 'http://nvie.com')

Worker進程 在後臺開始執行隊列中的方法須要開啓一個worker進程。安全

$ rq worker
*** Listening for work on default
Got count_words_at_url('http://nvie.com') from default
Job result = 818
*** Listening for work on default

文檔

隊列

一個"任務"(job)是一個Python對象,表示在一個"工做"(worker)進程(後臺進程)中被異步調用的方法。任何Python方法均可以經過傳遞函數和其參數的引用值的方式被異步調用,這個過程稱做"入隊"(enqueueing)。異步

任務入隊(enqueueing jobs)

將任務放置入隊列,首先申明一個函數:socket

import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

注意到了嗎?此函數沒有任何特殊的地方,任何函數均可以將其放入RQ隊列。async

將 count_words_at_url 放入隊列,在後臺運行。

from rq import Queue
from redis import Redis
from somewhere import count_words_at_url

# Tell RQ what Redis connection to use
redis_conn = Redis()
q = Queue(connection=redis_conn)  # no args implies the default queue

# Delay execution of count_words_at_url('http://nvie.com')
job = q.enqueue(count_words_at_url, 'http://nvie.com')
print job.result   # => None

# Now, wait a while, until the worker is finished
time.sleep(2)
print job.result   # => 889

若是你想指定任務至特定的隊列,能夠如此定義隊列名稱:

q = Queue('low', connection=redis_conn)
q.enqueue(count_words_at_url, 'http://nvie.com')

注意示例中的Queue('low'),你可使用任意的名稱替代以獲取符合需求、具備擴展性的分佈式隊列任務模式。默認通用的隊列命名方式(e.g. high, medium, low)能夠區分隊列任務的優先級。

此外,你能夠添加一些額外的參數來控制隊列任務。默認狀況下,有以下鍵值對參數能夠傳遞給任務方法:

  • timeout , 指定任務的最大運行時間,超時將被丟棄。
  • result_ttl , 指定保存的任務結果過時時間。
  • ttl , 指定任務排隊的最大時間,超時將被取消。
  • depends_on , 指定任務對應所需執行的依賴任務(或者job id),必須完成依賴任務再執行指定任務。
  • job_id , 爲任務手動添加一個job_id標識。
  • at_front , 將任務放置在隊列的最前端而不是最後。
  • kwargs 和 args , 繞開這些自動彈出的參數 ie: 爲潛在的任務方法指定一個timeout參數。

最後,建議使用更明晰的方法.enqueue_call()來取代.enqueue()

q = Queue('low', connection=redis_conn)
q.enqueue_call(func=count_words_at_url,
               args=('http://nvie.com',),
               timeout=30)

有些狀況下,web進程沒法進入運行在工做(worker) 中的源碼(i.e. X中的代碼須要調用一個Y中延遲函數),你能夠經過字符串引用傳遞函數。

q = Queue('low', connection=redis_conn)
q.enqueue('my_package.my_module.my_func', 3, 4)

隊列的使用

除了任務入列,隊列(queues)還包含了一些其它有用的方法:

from rq import Queue
from redis import Redis

redis_conn = Redis()
q = Queue(connection=redis_conn) 

# Getting the number of jobs in the queue
print len(q)

# Retrieving jobs
queued_job_ids = q.job_ids # Gets a list of job IDs from the queue
queued_jobs = q.jobs # Gets a list of enqueued job instances
job = q.fetch_job('my_id') # Returns job having ID "my_id"

RQ設計原理

使用RQ的過程當中,你你不須要提早建立任何的隊列,你也不須要指定任何使用渠道,數據交換規則,路由規則等。你只須要將你的任務放入到任何你想要放入的隊列中,一旦你將一個任務入列至一個還未存在的隊列中,它將迅速被建立。

RQ沒有使用任何中間人來指定消息的位置,你可能認爲這是優點也可能任務這是不合理的,這取決與你想要解決的具體問題。

最後,RQ沒有使用簡明的協議,由於它依據的是Python自帶的pickle模塊來序列化工做任務。

延遲結果

當任務入列,queue.enqueue() 方法將會返回一個Job實例,這僅僅是一個用於檢測實際任務運行結果的代理對象。

爲此,它擁有一個方便的結果訪問屬性,在任務尚未完成時將返回None,或者當任務完成時返回一個非空值(假設此任務第一時間返回了值)。

@job 裝飾器

若是你想使用相似Celery的代碼風格,你可能須要使用@task裝飾器。RQ>=3.0版本將擁有相似的裝飾器:

from rq.decorators import job

@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
    return x + y

job = add.delay(3, 4)
time.sleep(1)
print job.result
繞過workers

爲了方便測試,你能夠入列一個任務而不須要綁定一個實際運行的工做(worker)進程(RQ >= 0.3.1 可用)。爲了實現此功能,你能夠在隊列構造器中傳遞參數async=False.

>>> q = Queue('low', async=False)
>>> job = q.enqueue(fib, 8)
>>> job.result
21

以上代碼將在同一進程中同步執行函數fib(8)而不須要任何一個激活的工做(worker)進程。相似於Celery中的ALWAYS_EAGER。

依賴任務

RQ 0.4.0版本的新特性,能夠用來管理多任務直接的依賴關係。使用depends_on參數實現執行一個依賴於另外一個任務的Job。

q = Queue('low', async=False)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)
The ability to handle job dependencies allows you to split a big job into several smaller ones. A job that is dependent on another is enqueued only when its dependency finishes successfully.
工做進程

(略)

任務(jobs)的注意事項

技術上來講,你能夠放置任何Python方法到隊列,但這不意味着你這樣作是明智的,有些因素你在入列任務前必須考慮:

  • 首先確保worker進程能夠引入任務函數的__module__。這意味着你不能入列申明在__main__模塊中的任務方法。
  • 確保工做進程和進程的生成器能夠共享源碼。
  • 確保任務方法在上下文中沒有任何依賴,尤爲是全局變量。另外,任務方法依賴的狀態(好比,"當前用戶",或者"當前web訪問請求")在工做(worker)進程中將不復存在。若是你想要爲"當前"的用戶執行相關任務,你應該爲此用戶建立一個實例對象,並將對應的對象引用當作參數傳遞至任務方法。
限制條件

RQ 的工做進程依賴系統的fork()方法,這意味着Windows下沒法運行。

工做(worker)

Worker是一個運行在後臺的Python進程,用來執行一些你不想要在web進程中執行的冗長或者阻塞任務。

開啓workers進程

從項目的root目錄下開啓一個worker進程:

$ rq worker high normal low
*** Listening for work on high, normal, low
Got send_newsletter('me@nvie.com') from default
Job ended normally without result
*** Listening for work on high, normal, low
...

工做進程將無限循環讀取給定隊列(順序很是重要)中的任務,並在全部任務方法執行完畢後繼續等待新的任務。

每個工做進程一次將只執行一個任務,在一個worker進程中,不會並行處理任務,若是你想要並行執行任務,你只須要開啓更多的worker進程。

突發模式

默認狀況下,工做進程將當即運行而且在運行完全部任務後一直阻塞直至新的任務產生。Worker進程一樣可使用突發模式運行,此模式將完成全部隊列中的任務,而且在全部給定的隊列執行完畢後退出。

$ rq worker --burst high normal low
*** Listening for work on high, normal, low
Got send_newsletter('me@nvie.com') from default
Job ended normally without result
No more work, burst finished.
Registering death.

這對批量執行須要週期性執行的任務或者大幅升高暫時性worker進程的峯值來講十分有用。

Worker進程內部原理

Worker進程生命週期

  1. 啓動。加載Python環境。
  2. 註冊。worker進程將本身註冊進系統中。
  3. 監聽。一個Job任務從任意指定的Redis隊列中彈出。若是全部的隊列都是空的而且worker進程運行在突發模式下,將當即退出。不然將持續等待新任務。
  4. 任務執行準備工做。Worker進程將設置狀態爲 busy 以告知系統:Job任務即將開始工做,而且將Job註冊至StartedJobRegistry
  5. Fork子進程。一個子進程("work horse")在有故障的狀況下將關閉任務。
  6. 執行任務。處理在主進程中實際執行的任務。
  7. 清除執行任務。worker進程將設置它的狀態至"閒置" idle,而且依據result_ttl設置任務和任務執行結果至過時。Job任務將從 StartedJobRegistry 裏移除,並在成功執行以後新增至 FinishedJobRegistry,失敗後新增至 FailedQueue 。
  8. 循環。重複步驟3.

性能

基本上,rq worker進程腳本是一個簡單 獲取-Fork-執行的循環。當大量任務作了冗長的啓動設置,或者他們全都依賴於一樣的模塊,你在執行每一個任務時都將花費大量額外的時間(由於你在fork進程以後才進行import操做)。這種方式很簡潔,並且RQ也所以不會出現內存泄漏的狀況,但與此同時帶來的負面影響是執行效率下降了。

對此,你可以採用在fork進程以前先import引入必要模塊的方式來提升生產力。RQ目前沒有提供能夠採起這種設置的參數,可是你能夠在你的worker進程進行循環以前先作import導入。

你能夠自定義本身的worker腳本(替代掉原來使用的rq進程).一個簡單的實例:

#!/usr/bin/env python
import sys
from rq import Connection, Worker

# Preload libraries
import library_that_you_want_preloaded

# Provide queue names to listen to as arguments to this script,
# similar to rq worker
with Connection():
    qs = sys.argv[1:] or ['default']

    w = Worker(qs)
    w.work()
進程命名

Workers 進程命名方式默認等於當前的hostname與當前PID鏈接。覆蓋默認設置,能夠在開始worker進程時指定一個 --name 選項

worker進程關閉

任什麼時候候,worker進程收到SIGINT (via Ctrl+C) 或者 SIGTERM (via kill)信號,worker進程將會等待當前正在執行任務完成工做後,再關閉循環,將其註冊入死亡進程。

若是是在關閉進程階段,再次收到SIGINT 或者 SIGTERMworker進程將強制性使子進程中斷(發送SIGKILL),但依然會嘗試將其註冊入死亡進程。

使用配置文件

0.3.2 版本中的新特性

若是你想要經過配置文件而不是命令行參數來配置rq進程,你能夠建立一個名爲settings.py的Python文件:

REDIS_URL = 'redis://localhost:6379/1'

# You can also specify the Redis DB to use
# REDIS_HOST = 'redis.example.com'
# REDIS_PORT = 6380
# REDIS_DB = 3
# REDIS_PASSWORD = 'very secret'

# Queues to listen on
QUEUES = ['high', 'normal', 'low']

# If you're using Sentry to collect your runtime exceptions, you can use this
# to configure RQ for it in a single step
SENTRY_DSN = 'http://public:secret@example.com/1'

圖上示例展現了全部可用的配置選項。 注意:QUEUESREDIS_PASSWORD設置在0.3.3之後的版本才存在。 指定worker進程讀取配置文件的路徑使用 -c 參數:

$ rq worker -c settings

自定義worker類

版本0.4.0 的新特性

There are times when you want to customize the worker's behavior. Some of the more common requests so far are:

  • 在執行任務前優先管理數據庫鏈接.
  • 執行任務使用不包含os.fork的模型.
  • 使用多進程模型multiprocessing 或者 gevent. 使用 -w 參數指定 worker類路徑:
$ rq worker -w 'path.to.GeventWorker'

自定義任務和隊列類

將來開放

You can tell the worker to use a custom class for jobs and queues using --job-class and/or --queue-class.

$ rq worker --job-class 'custom.JobClass' --queue-class 'custom.QueueClass'

Don't forget to use those same classes when enqueueing the jobs.

For example:

from rq import Queue
from rq.job import Job

class CustomJob(Job):
    pass

class CustomQueue(Queue):
    job_class = CustomJob

queue = CustomQueue('default', connection=redis_conn)
queue.enqueue(some_func)

自定義異常捕獲操做

版本 0.5.5 的新特性

若是你想根據不一樣類型的任務來決定對應的異常操做,或者僅僅想重寫異常處理,能夠經過--exception-handler選項:

$ rq worker --exception-handler 'path.to.my.ErrorHandler'

# Multiple exception handlers is also supported
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'

結果

入列任務是延遲函數的調用,也就是說咱們正在解決一個問題,可是須要等待一會纔回來執行。

處理結果

Python 方法若是有返回值,若是任務返回一個非空的值,worker進程將返回值寫入Redis所存的任務對應Hash下,TTL默認的過時時間爲任務結束後的500s。

The party that enqueued the job gets back a Job instance as a result of the enqueueing itself. Such a Job object is a proxy object that is tied to the job's ID, to be able to poll for results.

返回結果 TTL

返回結果寫入Redis時伴有一個有限的生存時間,能夠避免Redis數據庫數據無限增加。

RQ >= 0.3.1時,TTL的值能夠在調用 enqueue_call()時經過使用 result_ttl關鍵詞參數指定。它一樣能夠禁用過時,這時你須要手動清理數據。

q.enqueue_call(func=foo)  # result expires after 500 secs (the default)
q.enqueue_call(func=foo, result_ttl=86400)  # result expires after 1 day
q.enqueue_call(func=foo, result_ttl=0)  # result gets deleted immediately
q.enqueue_call(func=foo, result_ttl=-1)  # result never expires--you should delete jobs manually

此外,你可使用來繼續執行一些沒有返回值,默認會當即刪除的已完成任務,

q.enqueue_call(func=func_without_rv, result_ttl=500)  # job kept explicitly

異常處理

一般,任務能夠在失敗後拋出異常,RQ將用如下方式處理:

失敗的任務須要關注而且失敗任務的返回值不該該設置過時時間。更進一步,失敗的任務須要再次運行測試。通常這些事情須要手動操做,由於RQ自己是沒法字典或者可靠地自動判斷任務從新執行是否安全。

當一個異常在任務中拋出,worker進程能夠獲取獲得,並將其序列化,以鍵爲exc_info的hash存儲在任務對應的Redis下。任務的引用隨即被置於失敗隊列中。

任務自己擁有一些十分有用的屬性幫助檢測:

  • 任務最初的建立時間
  • 最近一個任務入列的時間
  • 起始隊列
  • 所需方法的文本描述
  • 異常信息

這些能夠幫助你檢測和手動追查問題,而且再次提交任務。

處理任務中斷

worker進程被 Ctrl+C or kill 中斷,RQ 幾乎不會丟失任務。當前任務完成後,worker進程將其它未執行的任務終止。

然而,worker進程可使用kill -9強制終止,不過,這種方式worker進程沒法優雅地結束任務或者將任務放入失敗隊列中。所以,強制關閉一個worker進程可能引發潛在的問題。

處理任務超時

默認任務執行時間爲180s,若是逾期未執行完畢,worker進程將終止主進程而且將任務放入失敗隊列,並標識其超時。

若是一個任務須要更多(或更少)時間來完成,默認的超時時間也將改變,你能夠在調用Queue.enqueue()時經過參數指定:

q = Queue()
q.enqueue(func=mytask, args=(foo,), kwargs={'bar': qux}, timeout=600)  # 10 mins
You can also change the default timeout for jobs that are enqueued via specific queue instances at once, which can be useful for patterns like this:

# High prio jobs should end in 8 secs, while low prio
# work may take up to 10 mins
high = Queue('high', default_timeout=8)  # 8 secs
low = Queue('low', default_timeout=600)  # 10 mins

# Individual jobs can still override these defaults
low.enqueue_call(really_really_slow, timeout=3600)  # 1 hr
Individual jobs can still specify an alternative timeout, as workers will respect these.

任務(job)

在一些狀況下,你可能須要進入當前的任務ID或者任務方法的實例,或者爲任務存於任意數據。

進入 "當前" 任務

版本 0.3.3 的新特性

由於任務方法和Python方法本質是同樣的,你能夠向RQ 獲取當前任務ID:

from rq import get_current_job

def add(x, y):
    job = get_current_job()
    print 'Current job: %s' % (job.id,)
    return x + y

保存數據

版本 0.3.3 的新特性

爲了給任務增長/更新自定義的狀態信息,須要操做任務的meta屬性,

import socket

def add(x, y):
    job = get_current_job()
    job.meta['handled_by'] = socket.gethostname()
    job.save()
    return x + y

隊列任務的TTL

版本 0.4.7 的新特性

一個任務擁

相關文章
相關標籤/搜索