Celery-4.1 用戶指南: Configuration and defaults (配置和默認值)

這篇文檔描述了可用的配置選項。html

若是你使用默認的加載器,你必須建立 celeryconfig.py 模塊而且保證它在python路徑中。node

配置文件示例

如下是配置示例,你能夠從這個開始。它包括運行一個基本Celery應用的全部基礎設置。python

## Broker settings.
broker_url = 'amqp://guest:guest@localhost:5672//'

# List of modules to import when the Celery worker starts.
imports = ('myapp.tasks',)

## Using the database to store task state and results.
result_backend = 'db+sqlite:///results.db'

task_annotations = {'tasks.add': {'rate_limit': '10/s'}}

新的小寫設置

4.0 版本引入了新的小寫設置名稱和機構環境。mysql

與之前版本的不一樣,除了設置項名稱變爲小寫字母外,還有一個前綴的重命名,例如 celerybeat_ 變爲 beat_,celeryd_ 變爲 worker,以及不少頂級 celery_ 設置重命名成了 task_ 前綴。git

Celery 仍然能讀取老的配置文件,因此並不倉促遷移到新的設置格式。github

Setting name Replace with
CELERY_ACCEPT_CONTENT accept_content
CELERY_ENABLE_UTC enable_utc
CELERY_IMPORTS imports
CELERY_INCLUDE include
CELERY_TIMEZONE timezone
CELERYBEAT_MAX_LOOP_INTERVAL beat_max_loop_interval
CELERYBEAT_SCHEDULE beat_schedule
CELERYBEAT_SCHEDULER beat_scheduler
CELERYBEAT_SCHEDULE_FILENAME beat_schedule_filename
CELERYBEAT_SYNC_EVERY beat_sync_every
BROKER_URL broker_url
BROKER_TRANSPORT broker_transport
BROKER_TRANSPORT_OPTIONS broker_transport_options
BROKER_CONNECTION_TIMEOUT broker_connection_timeout
BROKER_CONNECTION_RETRY broker_connection_retry
BROKER_CONNECTION_MAX_RETRIES broker_connection_max_retries
BROKER_FAILOVER_STRATEGY broker_failover_strategy
BROKER_HEARTBEAT broker_heartbeat
BROKER_LOGIN_METHOD broker_login_method
BROKER_POOL_LIMIT broker_pool_limit
BROKER_USE_SSL broker_use_ssl
CELERY_CACHE_BACKEND cache_backend
CELERY_CACHE_BACKEND_OPTIONS cache_backend_options
CASSANDRA_COLUMN_FAMILY cassandra_table
CASSANDRA_ENTRY_TTL cassandra_entry_ttl
CASSANDRA_KEYSPACE cassandra_keyspace
CASSANDRA_PORT cassandra_port
CASSANDRA_READ_CONSISTENCY cassandra_read_consistency
CASSANDRA_SERVERS cassandra_servers
CASSANDRA_WRITE_CONSISTENCY cassandra_write_consistency
CELERY_COUCHBASE_BACKEND_SETTINGS couchbase_backend_settings
CELERY_MONGODB_BACKEND_SETTINGS mongodb_backend_settings
CELERY_EVENT_QUEUE_EXPIRES event_queue_expires
CELERY_EVENT_QUEUE_TTL event_queue_ttl
CELERY_EVENT_QUEUE_PREFIX event_queue_prefix
CELERY_EVENT_SERIALIZER event_serializer
CELERY_REDIS_DB redis_db
CELERY_REDIS_HOST redis_host
CELERY_REDIS_MAX_CONNECTIONS redis_max_connections
CELERY_REDIS_PASSWORD redis_password
CELERY_REDIS_PORT redis_port
CELERY_RESULT_BACKEND result_backend
CELERY_MAX_CACHED_RESULTS result_cache_max
CELERY_MESSAGE_COMPRESSION result_compression
CELERY_RESULT_EXCHANGE result_exchange
CELERY_RESULT_EXCHANGE_TYPE result_exchange_type
CELERY_TASK_RESULT_EXPIRES result_expires
CELERY_RESULT_PERSISTENT result_persistent
CELERY_RESULT_SERIALIZER result_serializer
CELERY_RESULT_DBURI Use result_backend instead.
CELERY_RESULT_ENGINE_OPTIONS database_engine_options
[…]_DB_SHORT_LIVED_SESSIONS database_short_lived_sessions
CELERY_RESULT_DB_TABLE_NAMES database_db_names
CELERY_SECURITY_CERTIFICATE security_certificate
CELERY_SECURITY_CERT_STORE security_cert_store
CELERY_SECURITY_KEY security_key
CELERY_TASK_ACKS_LATE task_acks_late
CELERY_TASK_ALWAYS_EAGER task_always_eager
CELERY_TASK_ANNOTATIONS task_annotations
CELERY_TASK_COMPRESSION task_compression
CELERY_TASK_CREATE_MISSING_QUEUES task_create_missing_queues
CELERY_TASK_DEFAULT_DELIVERY_MODE task_default_delivery_mode
CELERY_TASK_DEFAULT_EXCHANGE task_default_exchange
CELERY_TASK_DEFAULT_EXCHANGE_TYPE task_default_exchange_type
CELERY_TASK_DEFAULT_QUEUE task_default_queue
CELERY_TASK_DEFAULT_RATE_LIMIT task_default_rate_limit
CELERY_TASK_DEFAULT_ROUTING_KEY task_default_routing_key
CELERY_TASK_EAGER_PROPAGATES task_eager_propagates
CELERY_TASK_IGNORE_RESULT task_ignore_result
CELERY_TASK_PUBLISH_RETRY task_publish_retry
CELERY_TASK_PUBLISH_RETRY_POLICY task_publish_retry_policy
CELERY_TASK_QUEUES task_queues
CELERY_TASK_ROUTES task_routes
CELERY_TASK_SEND_SENT_EVENT task_send_sent_event
CELERY_TASK_SERIALIZER task_serializer
CELERYD_TASK_SOFT_TIME_LIMIT task_soft_time_limit
CELERYD_TASK_TIME_LIMIT task_time_limit
CELERY_TRACK_STARTED task_track_started
CELERYD_AGENT worker_agent
CELERYD_AUTOSCALER worker_autoscaler
CELERYD_CONCURRENCY worker_concurrency
CELERYD_CONSUMER worker_consumer
CELERY_WORKER_DIRECT worker_direct
CELERY_DISABLE_RATE_LIMITS worker_disable_rate_limits
CELERY_ENABLE_REMOTE_CONTROL worker_enable_remote_control
CELERYD_HIJACK_ROOT_LOGGER worker_hijack_root_logger
CELERYD_LOG_COLOR worker_log_color
CELERYD_LOG_FORMAT worker_log_format
CELERYD_WORKER_LOST_WAIT worker_lost_wait
CELERYD_MAX_TASKS_PER_CHILD worker_max_tasks_per_child
CELERYD_POOL worker_pool
CELERYD_POOL_PUTLOCKS worker_pool_putlocks
CELERYD_POOL_RESTARTS worker_pool_restarts
CELERYD_PREFETCH_MULTIPLIER worker_prefetch_multiplier
CELERYD_REDIRECT_STDOUTS worker_redirect_stdouts
CELERYD_REDIRECT_STDOUTS_LEVEL worker_redirect_stdouts_level
CELERYD_SEND_EVENTS worker_send_task_events
CELERYD_STATE_DB worker_state_db
CELERYD_TASK_LOG_FORMAT worker_task_log_format
CELERYD_TIMER worker_timer
CELERYD_TIMER_PRECISION worker_timer_precision

配置指示

通用設置

  • accept_content
    默認值: {‘json’} (set, list, or tuple).
    容許的內容類型/序列化器的白名單

若是接收到一個消息,其內容類型再也不上述列表中,它將會被丟棄並拋出一個錯誤。redis

默認狀況下,任意內容類型都是啓用的,包括pickle以及yaml,因此確保不受信任的第三方不能訪問你的消息中間件。查看安全這一節獲取更多信息。算法

示例:sql

# using serializer name
accept_content = ['json']

# or the actual content-type (MIME)
accept_content = ['application/json']

時間與日期設置

  • enable_utc
    2.5 版本新特性。
    默認值:從 3.0 版本開始默認啓用

  一旦啓用,消息中的日期和時間將會轉化成 UTC 時區。mongodb

  注意2.5版本如下的工做單元將會認爲全部消息都使用的本地時區,因此只有在全部的工做單元都升級了的狀況下再啓用這個特性。

  • timezone
    2.5版本新特性
    默認值: 「UTC」

  設置Celery使用一個自定義的時區。這個時區值能夠是pytz庫支持的任意時區。

  若是沒有設置,UTC時區將被使用。爲了向後兼容,還提供了一個 enable_utc設置,若是他設置成假,將使用系統本地時區。

任務設置

  • task_annotations
    這個設置能夠用來在配置文件中重寫任意任務屬性。這個設置能夠是一個字典,獲取一個annotation對象的列表,這個列表對任務進行過濾,對匹配的任務名稱起做用,並返回待更改屬性的一個映射。

如下將更改 tasks.add 任務的 rate_limit 屬性:

task_annotations = {‘tasks.add’: {‘rate_limit’: ‘10/s’}} 

或者對全部的任務更改:

task_annotations = {‘*’: {‘rate_limit’: ‘10/s’}} 


你還能夠更改方法,例如 on_failure 處理函數:

def my_on_failure(self, exc, task_id, args, kwargs, einfo):
  print(‘Oh no! Task failed: {0!r}’.format(exc))

task_annotations = {‘*’: {‘on_failure’: my_on_failure}} 

若是你須要更靈活的控制,那麼你可使用對象而不是字典來選擇任務來進行註解:

class MyAnnotate(object):

    def annotate(self, task):
        if task.name.startswith('tasks.'):
            return {'rate_limit': '10/s'}

task_annotations = (MyAnnotate(), {other,})
  • task_compression
    默認值: None
    任務消息的默認壓縮算法。能夠是 gzip、bzip2(若是可用),或者任意在 Kombu 壓縮模式註冊表中註冊的自定義壓縮算法。

   默認發送未壓縮的消息。

  • task_protocol
    默認值:2(從4.0版本開始)
    設置默認的任務消息協議版本。支持的協議:1 和 2

   協議 2 在 3.1.24 以及 4.x+ 被支持

  • task_serializer
    默認值:「json」(從4.0版本開始,更早:pickle)
    一個表示使用的默認序列化方法的字符串。能夠是 json(默認)、pickle、 yaml、msgpack,或者任意在 kombu.serialization.registry 中註冊過的自定義序列化方法。

  另見:
    Serializers

  • task_publish_retry
    2.2版本新特性
    默認值:啓用

   決定當鏈接丟失或者其餘鏈接錯誤時任務消息的發佈是否會重試,查看 task_publish_retry_policy。

  • task_publish_retry_policy
    2.2版本新特性
    默認值:查看 Message Sending Retry。

   定義當鏈接丟失或者其餘鏈接錯誤時任務消息的發佈重試策略。

任務執行設置

  • task_always_eager
    默認值:禁用
    若是設置成 True,全部的任務都將在本地執行知道任務返回。apply_async() 以及Task.delay()將返回一個 EagerResult 實例,模擬AsyncResult實例的API和行爲,除了這個結果是已經計算過的以外。

   也就是說,任務將會在本地執行而不是發送到隊列。

  • task_eager_propagates
    默認值:禁用
    若是設置成 True,本地執行的任務(使用 task.apply(),或者 task_always_eager 被啓用)將傳遞異常。

   這與使用 apply()throw=True 參數有一樣的效果。

  • task_remote_tracebacks
    默認值:禁用
    若是啓用了,當從新拋出任務錯誤時,任務結果將會包括工做單元的堆棧信息。

   它須要 tblib 庫,能夠經過 pip 安裝:

    $ pip install celery[tblib] 


    查看 Bundles 獲取關於組合多個擴展需求的信息。

  • task_ignore_result
    默認值:禁用
    是否存儲任務返回值(tombstones)。若是你只是想在發生錯誤的時候記錄返回值,能夠設置:task_store_errors_even_if_ignored

  • task_store_errors_even_if_ignored
    默認值:禁用
    若是設置了,即便 Task.ignore_result 啓用了,工做單元也會愛結果後端中存儲全部的任務錯誤。

  • task_track_started
    默認值:禁用
    若是設置成真,當任務被工做單元執行時,任務將報告它的狀態爲started。默認值是假,由於一般行爲是不作這種粒度級別的彙報。任務會處於 pending、finished 或者 waiting to be retried。當有長時間任務,而且須要知道當前正在運行什麼任務時,有一個 started狀態將會頗有用。

  • task_time_limit
    默認值:沒有時間限制
    任務的硬時間限制,以秒爲單位。若是這個時間限制被超過,處理任務的工做單元進程將會被殺死並使用一個新的替代。

  • task_soft_time_limit
    默認值:沒有時間限制
    任務的軟時間限制,以秒爲單位

  當這個時間限制超事後,SoftTimeLimitExceeded異常將會被拋出。例如,任務能夠捕獲這個異常在硬時間限制到達以前對環境進行清理:

from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        return do_work()
    except SoftTimeLimitExceeded:
        cleanup_in_a_hurry()
  • task_acks_late
    默認值:禁用
    延遲確認意味着任務消息將在任務執行完成以後再進行確認,而不是剛開始時(默認行爲)。

  另見:
    FAQ: Shoud I use retry or acks_late

  • task_reject_on_worker_lost
    默認值:禁用
    即便 task_acks_late 被啓用,當處理任務的工做單元異常退出或者收到信號而退出時工做單元將會確認任務消息。

   將這個設置成真可讓消息從新入隊,因此任務將會被再執行,在同一個工做單元或者另一個工做單元。

   告警:
    啓用這個可能致使消息循環;確保你知道你在作什麼

  • task_default_rate_limit
    默認值:沒有速率限制
    任務的全局默認速率限制

   當任務沒有一個自定義的速率限制時,這個值將被使用

   另見:
     worker_disable_rate_limits 設置能夠禁用全部的速率限制

任務結果後端設置

  • result_backend
    默認值:默認不啓用結果後端
    用來存儲結果的後端。能夠是下列之一:

    1. rpc
      以 AMQP 消息形式發送結果。查看 RPC 後端設置

    2. database
      使用一個 SQLAlchemy 支持的結構化數據庫。查看數據庫後端設置

    3. redis
      使用 Redis 存儲結果。查看 Redis 後端設置

    4. cache
      使用 Memcached 存儲結果。查看 Cache 後端設置

    5. cassandra
      使用 Cassandra 存儲結果。查看 Cassandra 後端設置

    6. elasticsearch
      使用 Elasticsearch 存儲結果。查看 Elasticsearch 後端設置

    7. ironcache
      使用 IronCache 存儲結果。查看 IronCache 後端設置

    8. couchbase
      使用 Couchbase 存儲結果。查看 Couchbase 後端設置

    9. couchdb
      使用 CouchDB 存儲結果。查看 CouchDB 後端設置

    10. filesystem
      使用共享文件夾存儲結果。查看 File-system 後端設置

    11. consul
      使用 Consul K/V 存儲結果。查看 Consul K/V 後端設置

  • result_serializer
    默認值:從4.0版本開始使用 json(更早:pickle)
    查看 Serializers 獲取支持的序列化格式的信息。

  • result_compression
    默認值:無壓縮
    結果值得可選壓縮方法。支持 task_seralizer 設置相同的選項。

  • result_expires
    默認值:1天后過時
    存儲的結果被刪除的時間(秒數,或者一個 timedelta 對象)

   (有一個內建的週期性任務將刪除過時的任務結果(celery.backend_cleanup),前提是 celery beat 已經被啓用。這個任務天天上午4點運行。

   值 None 或者 0 意思是結果永不刪除(取決於後端聲明))

   注意:
    當前這個特性只支持 AMQP, database, cache, Redis 這些存儲後端。當使用 database 存儲後端,celery beat必須執行使得過時結果被刪除。

  • result_cache_max
    默認值:默認禁用
    啓用結果的客戶端緩存。

   對於老的 amqp 後端,存儲結果一旦被消費它將再也不可用,此時這個特性將起到做用。

   這是老的結果被刪除以前總的結果緩存的數量。值 0 或者 None 意味着沒有限制,而且值 -1 將禁用緩存。

Database 後端設置

Database URL 示例

使用一個數據庫存儲後端,你必須配置 result_backend 設置爲一個鏈接的URL,而且帶 db+ 前綴:

result_backend = 'db+scheme://user:password@host:port/dbname'

示例:

# sqlite (filename)
result_backend = 'db+sqlite:///results.sqlite'

# mysql
result_backend = 'db+mysql://scott:tiger@localhost/foo'

# postgresql
result_backend = 'db+postgresql://scott:tiger@localhost/mydatabase'

# oracle
result_backend = 'db+oracle://scott:tiger@127.0.0.1:1521/sidname'

查看 Supported Databases 獲取支持的數據庫的一個表,查看 Connection String 獲取相關的鏈接字符串(這是 db+ 前綴後帶的URI的一部分)

  • database_engine_options
    默認值:{} (空映射)
    你可使用 sqlalchmey_engine_options 設置聲明額外的 SQLAchemy 數據庫引擎選項:
# echo enables verbose logging from SQLAlchemy.
app.conf.database_engine_options = {'echo': True}
  • database_short_lived_sessions
    默認值:默認禁用
    默認禁用短會話。若是啓用了,他們會急劇的下降性能,特別是對於處理不少任務的系統。當工做單元的流量很低,緩存的數據庫鏈接會因爲空閒而變爲無用,進而會致使工做單元出錯,這種狀況下這個選項是有用的。例如:間歇性的錯誤如(OperationalError)(2006, ‘MySQL server has gone away’)經過啓用短會話能解決。這個選項隻影響數據庫後端。

  • database_table_names
    默認值:{} (空映射)
    當 SQLAlchemy 設置成結果後端, Celery 自動建立兩個表來存儲任務的元數據。這個設置容許你自定義表名稱:

# use custom table names for the database result backend.
database_table_names = {
    'task': 'myapp_taskmeta',
    'group': 'myapp_groupmeta',
}

RPC 後端設置

  • result_persistent
    默認值:默認被禁用(瞬態消息)
    若是設置成 True,結果消息將被持久化。這意味着消息中間件重啓後消息不會丟失。

  配置示例:

result_backend = 'rpc://'
result_persistent = False

Cache 後端設置

注意:
  緩存後端支持 pylibmc 和 python-memcached 庫。後者只有在 pylibmc 沒有安裝時纔會被使用。

使用一個 Memcached 服務器:

result_backend = 'cache+memcached://127.0.0.1:11211/'

使用多個 Memcached 服務器:

result_backend = """
    cache+memcached://172.19.26.240:11211;172.19.26.242:11211/
""".strip()

「memory」 後端只在內存中存儲緩存:

result_backend = 'cache'
cache_backend = 'memory'
  • cache_backend_options
    默認值:{} (空映射)
    你可使用 cache_backend_options 設置 pylibmc 選項:
cache_backend_options = {
    'binary': True,
    'behaviors': {'tcp_nodelay': True},
}
  • cache_backend
    這個設置再也不使用了,由於如今能夠直接在 result_backend 中設置後端存儲。

Redis 後端設置

配置後端 URL

注意:
  Redis 後端須要 Redis 庫。

可使用 pip 安裝這個包:

$ pip install celery[redis]

查看 Bundles 獲取組合多個擴展需求的信息

後端須要 result_backend 設置成一個 Redis URL:

result_backend = 'redis://:password@host:port/db'

例如:

result_backend = 'redis://localhost/0'

等同於:

result_backend = 'redis://'

URL 的字段以下定義:
1. password
  鏈接數據庫的密碼
2. host
  Redis 服務器的主機名或者IP地址(例如:localhost)
3. port
  Redis 服務器的端口。默認是 6379
4. db
  使用的數據庫編號。默認是0。db 能夠包含一個可選的斜槓

  • redis_backend_us_ssl
    默認值:禁用
    Redis後端支持 SSL。這個選項的合法值與 broker_use_ssl 相同

  • redis_max_connections
    默認值:無顯示
    Redis 鏈接池的最大可用鏈接數,這些鏈接用來發送和接收結果

  • redis_socket_connect_timeout
    5.0.1版本新特性
    默認值:None

從存儲後端鏈接到Redis服務器的鏈接的Socket超時時間(以秒爲單位,int/float)

  • redis_socket_timeout
    默認值:120秒
    對 Redis 服務器的讀寫操做的 Socket 超時時間(以秒爲單位,int/float),由存儲後端使用

Cassandra 後端設置

注意:
  Cassandra 後端驅動 cassandra-driver。

使用 pip 安裝:

$ pip install celery[cassandra]

查看 Bundles 獲取關於組合擴展需求的信息。

後端須要配置下列配置指令

  • cassandra_servers
    默認值: [] (空列表)
    Cassandra 服務器列表。例如:
cassandra_servers = ['localhost']
  • cassandra_port
    默認值:9042.
    鏈接到Cassandra服務器的端口

  • cassandra_keyspace
    默認值: None.
    存儲結果的 key-space。例如:

cassandra_keyspace = 'tasks_keyspace'
  • cassandra_table
    默認值: None.
    存儲結果的表(列族)。例如:
cassandra_table = 'tasks'
  • cassandra_read_consistency
    默認值: None.
    使用的讀一致性。值能夠是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE

  • cassandra_write_consistency
    默認值: None.
    使用的寫一致性。值能夠是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE

  • cassandra_entry_ttl
    默認值: None.
    狀態項的 Time-to-live。添加事後一段時間他們將會過時而且被刪除。值 None (默認) 意味着他們永不過時

  • cassandra_auth_provider
    默認值: None.
    使用的 cassandra.auth 模塊中的 AuthProvider。 值能夠是 PlainTextAuthProvider 或者 SaslAuthProvider

  • cassandra_auth_kwargs
    默認值: {} (空映射)
    傳遞給 authentication provider 的命名參數。例如:

cassandra_auth_kwargs = {
    username: 'cassandra',
    password: 'cassandra'
}

配置示例:

cassandra_servers = ['localhost']
cassandra_keyspace = 'celery'
cassandra_table = 'tasks'
cassandra_read_consistency = 'ONE'
cassandra_write_consistency = 'ONE'
cassandra_entry_ttl = 86400

Elasticsearch 後端設置

使用 Elasticsearch 做爲結果後端,你只須要將result_backend設置成正確的 URL。

配置示例:

result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'
  • elasticsearch_retry_on_timeout
    默認值: False
    超時後是否應該觸發在另外一個節點重試?

  • elasticsearch_max_retries
    默認值: 3
    異常被傳遞前的最大重試次數

  • elasticsearch_timeout
    默認值: 10.0 秒
    elasticsearch 使用的全局超時時間

Riak 後端設置

注意:
Riak 後端須要 riak 庫

使用 pip 進行安裝:

$ pip install celery[riak]

查看 Bundles 獲取組合多擴展需求的信息。

後端須要result_backend設置成一個 Riak URL:

result_backend = 'riak://host:port/bucket'

例如:

result_backend = 'riak://localhost/celery

等同於:

result_backend = 'riak://'

URL 的字段定義以下:
1. host
  Riak 服務器的主機名或者IP地址(例如 localhost)
2. port
  使用 protobuf 協議的Riak 服務器端口,默認是 8087
3. bucket
  使用的Bucket名稱。默認是 celery。bucket 名稱須要是一個只包含ASCII字符的字符串。

另外,這個後端可使用以下配置指令進行配置:

  • riak_backend_settings
    默認值: {} (空映射)
    這是一個支持以下鍵的映射:

    1. host
      Riak 服務器的主機名或者IP地址(例如 localhost)

    2. port
      Riak 服務器端口。默認是 8087

    3. bucket
      使用的Bucket名稱。默認是 celery。bucket 名稱須要是一個只包含ASCII字符的字符串。

    4. protocol
      鏈接到 Riak 服務器使用的協議。這不能夠經過 result_backend 配置

AWS DynamoDB 後端設置

注意:
  Dynamodb 後端須要 boto3 庫

使用 pip 進行安裝:

$ pip install celery[dynamodb]

查看 Bundles 獲取組合多擴展需求的信息。

後端須要 result_backend 設置成一個 DynamoDB URL:

result_backend = 'dynamodb://aws_access_key_id:aws_secret_access_key@region:port/table?read=n&write=m'

例如,聲明 AWS 區域以及表名稱:

result_backend = 'dynamodb://@us-east-1/celery_results

或者從環境中獲取 AWS 配置參數,使用默認表名稱(celery)以及聲明讀寫吞吐量:

result_backend = 'dynamodb://@/?read=5&write=5'

或者在本地使用 DynamoDB 的可下載版本:

result_backend = 'dynamodb://@localhost:8000

URL 中的字段以下定義:

  1. aws_access_key_id & aws_secret_access_key
    訪問 AWS API 資源的認證信息。這能夠經過 boto3 從不一樣的源獲取到

  2. region
    AWS 區域,例如: us-east-1 或者本地版本的 localhost。查看 boto3 庫文檔獲取更多的信息。

  3. port
    若是你使用的本地版本,這是本地DynamoDB示例監聽的端口。若是你沒有把區域設置成 localhost,這個設置選項將無效

  4. table
    使用的表名。默認是 celery。查看 DynamoDB 命名規則獲取容許的字符以及表名長度的信息。

  5. read & write
    所建立的 DynamoBD 表的讀寫能力單元。默認的讀寫值都是 1。更多的細節能夠從 Provisioned Throughput documentation 中獲取到。

IronCache 後端設置

注意:
IronCache 後端須要 iron_celery 庫:

使用 pip 進行安裝:

$ pip install iron_celery

IronCache 經過在 result_backend 中配置的 URL 進行聲明,例如:

result_backend = 'ironcache://project_id:token@'

或者更改緩存名稱:

ironcache:://project_id:token@/awesomecache

更多的信息,查看 https://github.com/iron-io/iron_celery

Couchbase 後端設置

注意:
Couchbase 後端須要 couchbase 庫

使用 pip 進行安裝:

$ pip install celery[couchbase]

查看 Bundle 獲取組合多擴展需求的步驟。

後端能夠經過 result_backend 設置成一個 Couchbase URL:

result_backend = 'couchbase://username:password@host:port/bucket'
  • couchbase_backend_settings

   默認值:{} (空映射)

  這是一個支持以下鍵的映射:

  1. host
    Couchbase 服務器的主機名。默認是 localhost
  2. port
    Couchbase 服務器監聽的端口。默認是 8091
  3. bucket
    Couchbase 服務器默認寫入的桶。默認是default
  4. username
    Couchbase 服務器認證的用戶名(可選)
  5. password
    Couchbase 服務器認證的密碼(可選)

CouchDB 後端設置

注意:
CouchDB 後端須要 pycouchdb 庫:
使用 pip 安裝這個包:

$ pip install celery[couchdb]

查看 Bundles 獲取更多關於組合多擴展需求的信息

後端能夠經過 result_backend 配置成一個 CouchDB URL:

result_backend = 'couchdb://username:password@host:port/container'

URL 由如下部分組成:

  1. username
    Couchbase 服務器認證的用戶名(可選)
  2. password
    Couchbase 服務器認證的密碼(可選)
  3. host
    Couchbase 服務器的主機名。默認是 localhost
  4. port
    Couchbase 服務器監聽的端口。默認是 8091
  5. container
    CouchDB 服務器寫入的默認容器。默認是 default

File-system 後端設置

後端能夠經過一個文件 URL 配置,例如:

CELERY_RESULT_BACKEND = 'file:///var/celery/results'

配置的目錄須要被共享,而且全部使用該後端的服務器均可寫。

若是你在單獨的一個系統上使用 Celery,你不須要任何進一步的配置就能夠簡單的使用這個後端。對於大型的集羣,你可使用 NFS、GlusterFS、CIFS、HDFS(使用FUSE),或者其餘文件系統。

Consul K/V 存儲後端設置

Consul 後端能夠經過 URL 配置:

CELERY_RESULT_BACKEND = ‘consul://localhost:8500/’

後端將在 Consul K/V 存儲中做爲單獨鍵存儲結果

後端使用Consul 中的 TTLs 支持結果的自動過時

消息路由

  • task_queues
    默認值: None (默認隊列的配置)
    多數用戶不肯聲明這個配置,而是使用 automatic routing facilites。

若是你真的須要配置高級路由,這個設置應該是一個 kombu.Queue 對象的列表,工做單元能夠從中消費。

注意工做單元能夠經過 -Q 選項覆蓋這個設置,或者這個列表中的單獨隊列能夠經過 -X 選項進行排除。

查看 Basics 獲取更多的信息。

默認值是 celery 隊列的一個隊列/消息交換器/綁定的鍵,消息交互類型是direct。

查看 task_routes

  • task_routes
    默認值: None
    一個路由器的列表,或者單個路路由,用來路由任務到相應的隊列。當決定一個任務的最終目的,路由器將按聲明順序進行輪詢。

一個路由器能夠經過以下方式聲明:

  1. 函數,簽名格式爲 (name, args, kwargs, options, task=None, **kwargs)
  2. 字符串,提供到路由函數的路徑
  3. 字典,包含路由聲明,它將會轉化成一個 celery.routes.MapRoute 實例
  4. 一個 (pattern, route) 元組的列表,它將會轉化成一個 celery.routes.MapRoute 實例
    示例:
task_routes = {
    'celery.ping': 'default',
    'mytasks.add': 'cpu-bound',
    'feed.tasks.*': 'feeds',                           # <-- glob pattern
    re.compile(r'(image|video)\.tasks\..*'): 'media',  # <-- regex
    'video.encode': {
        'queue': 'video',
        'exchange': 'media'
        'routing_key': 'media.video.encode',
    },
}

task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default})
其中,myapp.tasks.route_task 能夠是:

def route_task(self, name, args, kwargs, options, task=None, **kw):
        if task == 'celery.ping':
            return {'queue': 'default'}

route_task 能夠返回一個字符串或者一個字典。一個字符串表示 task_queues 中的一個隊列名,而字典表示一個自定義的路由。

當發送消息,路由被按順序詢問。第一個返回非 None 值得路由將被使用。消息選項此時將與找到的路由設置合併,其中路由器的設置要優先。

例如: apply_async() 有這些參數:

Task.apply_async(immediate=False, exchange='video',
                 routing_key='video.compress')

而且有一個路由器返回:

{'immediate': True, 'exchange': 'urgent'}

那麼最終的消息選項將是:

immediate=True, exchange='urgent', routing_key='video.compress'

(以及Task類中定義的任意默認消息選項)

當進行合併時,task_routes 中定義的值會優先於 task_queues 中定義的值。

對於以下設置:

task_queues = {
    'cpubound': {
        'exchange': 'cpubound',
        'routing_key': 'cpubound',
    },
}

task_routes = {
    'tasks.add': {
        'queue': 'cpubound',
        'routing_key': 'tasks.add',
        'serializer': 'json',
    },
}

tasks.add 的最終路由選項將變爲:

{'exchange': 'cpubound',
 'routing_key': 'tasks.add',
 'serializer': 'json'}

查看路由器獲取更多的示例。

  • task_queue_ha_policy
    消息中間件: RabbitMQ
    默認值:None
    這將設置一個隊列的HA策略,而且值能夠是一個字符串(一般是 all)
task_queue_ha_policy = 'all'

使用 all 將複製隊列到全部的當前節點,或者你指定一個節點的列表:

task_queue_ha_policy = ['rabbit@host1', 'rabbit@host2']

使用一個列表將隱示設置 x-ha-policy爲‘nodes,x-ha-policy-params` 爲給定的節點列表

查看 http://www.rabbitmq.com/ha.html 獲取更多的信息

  • task_queue_max_priority
    消息中間件: RabbitMQ
    默認值: None
    查看 RabbitMQ Message Priorities

  • worker_direct
    默認值: 禁用

這個選項使得每一個工做單元又一個專門的隊列,因此任務能夠路由到指定的工做單元。

每一個工做單元的隊列名稱是基於工做單元主機名和一個 .dq後綴自動產生的,使用 C.dq 消息交互器。

例如:節點名稱爲 w1@example.com 的工做單元的隊列名稱爲:

w1@example.com.dq

此時,你能夠經過指定主機名爲路由鍵而且使用 C.dq 消息交互器來將任務路由到指定的節點。

task_routes = {
    'tasks.add': {'exchange': 'C.dq', 'routing_key': 'w1@example.com'}
}
  • task_create_missing_queues
    默認值:啓用
    若是啓用(默認),任何聲明的未在 task_queues 中未定義的隊列都將自動被建立。查看 Automaci routing。

  • task_default_queue
    默認值: celery
    若是消息沒有聲明路由或者自定義的隊列,apply_async 默認使用的隊列名稱。

這個隊列必須在 task_queues 中。若是 task_queues 沒有聲明,那麼他將自動建立一個隊列項,而這個設置值就做爲隊列的名稱。

另見:
修改默認隊列的名稱

  • task_default_exchange
    默認值:」celery」
    當 task_queues 設置中指定鍵沒有聲明自定義的消息交互器,那麼這個默認的消息交互器將被使用。

  • task_default_exchange_type
    默認值:」direct」
    當 task_queues 設置中指定鍵沒有聲明自定義的消息交互器類型,那麼這個默認的消息交互器類型將被使用。

  • task_default_routing_key
    默認值:」celery」
    當 task_queues 設置中指定鍵沒有聲明自定義的路由鍵,那麼這個默認的路由鍵將被使用。

  • task_default_delivery_mode
    默認值:」presistent」

  能夠是瞬態的(消息不寫硬盤),或者持久的(寫硬盤)

消息中間件設置

  • broker_url
    默認值:」amqp://」
    默認的消息中間件URL。這必須是一個以下形式的URL:
transport://userid:password@hostname:port/virtual_host

其中只有模式部分是必須的,其他部分都是可選的,默認會設置爲對應傳輸中間件的默認值。

傳輸部分是使用的消息中間件的實現,默認是 amqp,(若是安裝了librabbitmq會使用這個庫,不然使用pyamqp)。還有其餘可用的選擇,包括 redis://、 sqs://、 qpid://。

模式部分能夠是你本身的傳輸中間件實現的全限定路徑:

broker_url = 'proj.transports.MyTransport://localhost'

能夠配置多個消息中間件,使用相同的傳輸協議也行。消息中間件能夠經過當個字符串聲明,不一樣的消息中間件URL之間用冒號分隔:

broker_url = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'

或者做爲一個列表:

broker_url = [
    'transport://userid:password@localhost:port//',
    'transport://userid:password@hostname:port//'
]

這些消息中間件將被用於broker_failover_strategy

查看Kombu 文檔中的 URLs 章節獲取更多的信息。

  • broker_read_url / broker_write_url
    默認值:broker_url的設置值
    這些設置能夠配置而不用 broker_url 的設置,能夠爲消息中間件聲明不一樣的鏈接參數,用來消費和生成消息。

示例:

broker_read_url = 'amqp://user:pass@broker.example.com:56721'
broker_write_url = 'amqp://user:pass@broker.example.com:56722'

全部選項均可以聲明成一個列表,做爲故障恢復的可選值,查看 broker_url 獲取更多的信息

  • broker_failover_strategy
    默認值:「round-robin」
    消息中間件鏈接對象的默認故障恢復策略。若是提供了,將映射到 kombu.connection.failover_strategies 中的一個鍵,或者引用任何方法,從給定的列表中產生一個項。

示例:

# Random failover strategy
def random_failover_strategy(servers):
    it = list(servers)  # don't modify callers list
    shuffle = random.shuffle
    for _ in repeat(None):
        shuffle(it)
        yield it[0]

broker_failover_strategy = random_failover_strategy
  • broker_heartbeat
    支持的傳輸層協議:pyamqp
    默認值:120.0(與服務器協商)

  注意:這個值只被工做單元使用,客戶端此時不使用心跳。

  由於單純使用 TCP/IP 並不老是及時探測到鏈接丟失,因此 AMQP 定義了心跳,客戶端和消息中間件用來檢測鏈接是否關閉。

  心跳會被監控,若是心跳值是 10 秒,那麼檢測心跳的時間間隔是 10 除以broker_heartbeat_checkrate (默認狀況下,這個值是心跳值的兩倍,因此對於10秒心跳,心跳每隔5秒檢測一次)

  • broker_heartbeat_checkrate
    支持的傳輸層協議:pyamqp
    默認值:2.0

工做單元會間隔監控消息中間件沒有丟失過多的心跳。這個檢測的速率是用 broker_heartbeat 值除以這個設置值獲得的,因此若是心跳是 10.0 而且這個設置值是默認的2.0,那麼這個監控將每隔5秒鐘執行一次(心跳發送速率的兩倍)

  • broker_use_ssl
    支持的傳輸層協議: pyamqp, redis
    默認值: 禁用

在消息中間件鏈接上使用SSL

這個選項的合法值依據使用的傳輸協議的不一樣而不一樣

  • pyamqp
    若是設置成True,鏈接將依據默認的SSL設置啓用SSL。若是設置成一個字典,將依據給定的策略配置SSL鏈接。使用的格式是 python 的 ssl.wrap_socket() 選項。

注意SSL套接字通常會在消息中間件的一個單獨的端口上服務。

如下示例提供了客戶端證書,而且使用一個自定義的認證受權來驗證服務器證書:

import ssl

broker_use_ssl = {
  'keyfile': '/var/ssl/private/worker-key.pem',
  'certfile': '/var/ssl/amqp-server-cert.pem',
  'ca_certs': '/var/ssl/myca.pem',
  'cert_reqs': ssl.CERT_REQUIRED
}

告警:
  使用 broker_use_ssl=True 時請當心。可能你的默認配置根本不會驗證服務器證書。請閱讀python的 ssl module security considerations。

  • redis
    設置必須是一個字典,包括以下鍵:
ssl_cert_reqs (required): one of the SSLContext.verify_mode values: 
ssl.CERT_NONE
ssl.CERT_OPTIONAL
ssl.CERT_REQUIRED
ssl_ca_certs (optional): path to the CA certificate
ssl_certfile (optional): path to the client certificate
ssl_keyfile (optional): path to the client key
  • broker_pool_limit
    2.3版本新特性

    默認值:10

    鏈接池中能夠打開最大鏈接數。

    從2.5版本開始鏈接池被默認啓用,默認限制是10個鏈接。這個數值能夠依據使用一個鏈接的 threads/green-threads (eventlet/gevent) 數量進行更改。例如:運行 eventlet 啓動 1000 個 greenlets,他們使用一個鏈接到消息中間件,若是發生競態條件,那麼你應該開始增長這個限制。

    若是設置成None或者0,鏈接池將會被禁用,而且每次使用鏈接都會從新創建鏈接並關閉。

  • broker_connection_timeout
    默認值:4.0
    放棄與AMQP服務器創建鏈接以前默認等待的超時時間。當使用 gevent 時該設置被禁用。

  • broker_connection_retry
    默認值:啓用
    若是與 AMQP 消息中間件的鏈接斷開,將自動從新創建鏈接

   每次重試中間等待的時間會遞增,而且在 broker_connection_max_retries 未達到以前會一隻重試

  • broker_connection_max_retries
    默認值:100
    放棄與 AMQP 服務器從新創建鏈接以前的最大重試次數

   若是設置成 0 或者 None,將一直重試

  • broker_login_method
    默認值:AMQPLAIN
    設置自定義的 amqp 登錄方法

  • broker_transport_options
    2.2 版本新特性
    默認值:{} (空映射)

   傳遞給底層傳輸中間件的一個附加選項的字典

   設置可見超時時間的示例以下(Redis 與 SQS 傳輸中間件支持):

  broker_transport_options = {‘visibility_timeout’: 18000} # 5 hours 


工做單元

  • imports
    默認值:[] (空列表)
    當工做單元啓動時導入的一系列模塊

   這用來聲明要導入的模塊,可是它還可用來導入信號處理函數和附加的遠程控制命令,等等。

   這些模塊將會以原來聲明的順序導入

  • include
    默認值:[] (空列表)
    語義上與 imports 相同,可是能夠做爲將不一樣導入分類的一種手段

這個設置中的模塊是在 imports 設置中的模塊導入以後才導入

  • worker_concurrency
    默認值:CPU核數
    執行任務的併發工做單元 process/threads/green 數量

   若是你大部分操做是I/O操做,你能夠設置更多的進程(線程),可是大部分狀況下都是以CPU數做爲定界,嘗試讓這個值接近你機器的CPU核數。若是沒有設置,當前機器的 CPU核數將會被使用

  • worker_prefetch_multiplier
    默認值:4
    工做單元一次預獲取多少個消息是這個設置值乘以併發進程的數量。默認值是 4(每一個進程4條消息)。可是,默認設置一般是好的選擇 - 若是你有長時間任務等待在隊列中,而且你必須啓動工做單元,注意第一個工做單元初始時將收到4倍的消息量。所以任務可能在工做單元間不會平均分佈

   禁用這個選項,只要將 worker_prefetch_multiplier 設置成 1設置成 0 將容許工做單元持續消費它想要的儘量多的消息

   更詳細的信息,請閱讀 Prefetch Limits

   注意:
    帶 ETA/countdown 的任務不會受 prefetch 限制的影響

  • worker_lost_wait
    默認值:10.0 秒
    有些狀況下,工做單元可能在沒有適當清理的狀況下就被殺死,而且工做單元可能在終止前已經發布了一個結果。這個值聲明瞭在拋出 WorkerLostError 異常以前咱們會在丟失的結果值上等待多久

  • worker_max_tasks_per_child
    一個工做單元進程在被一個新的進程替代以前能夠執行的最大任務數

  • worker_max_memory_per_child

    默認值:沒有限制。類型:int(kilobytes)

    一個工做單元進程在被一個新的進程替代以前能夠消耗的最大預留內存(單位KB)。若是單獨一個任務就致使工做單元超過這個限制,當前的任務會執行完成,而且以後這個進程將會被更新替代。

    示例:

  worker_max_memory_per_child = 12000  # 12MB
  • worker_disable_rate_limits
    默認值:禁用(啓用速率限制)
    即便任務顯示設置了速率,仍然禁用全部速率限制

  • worker_state_db
    默認值:None
    存儲工做單元狀態的文件名稱(如取消的任務)。能夠是相對或者絕對路徑,可是注意後綴.db 可能會被添加到文件名後(依賴於python 的版本)

   也能夠經過celery worker –statedb 參數設置

  • worker_timer_precision
    默認值:1.0秒
    設置從新檢測調度器以前ETA調度器能夠休息的最大秒數

   設置成1意味着調度器精度將爲1秒。若是你須要毫秒精度,你能夠設置成 0.1

  • worker_enable_remote_control
    默認值:默認啓用
    聲明工做單元的遠程控制是否啓用

事件

  • worker_send_task_events
    默認值:默認禁用
    發送任務相關的事件,使得任務可使用相似flower 的工做監控到。爲工做單元的 -E 參數設置默認值

  • task_send_sent_event
    2.2 版本新特性
    默認值:默認禁用

   若是啓用,對於每一個任務都將有一個 task-sent 事件被髮送,所以任務在被消費前就能被追蹤。

  • event_queue_ttl
    支持的傳輸中間件: amqp
    默認值:5.0 秒
    發送到一個監控客戶端事件隊列的消息的過時時間(x-message-ttl),以秒爲單位(int/float)。

   例如:若是這個值設置爲10,被遞送到這個隊列的消息將會在10秒後被刪除

  • event_queue_expires
    支持的傳輸中間件: amqp
    默認值:60.0 秒
    一個監控客戶端事件隊列被刪除前的過時時間(x-expires)。

  • event_queue_prefix
    默認值: 「celeryev」.
    事件接收隊列名稱的前綴

  • event_serializer
    默認值: 「json」.
    當發送事件消息時使用的消息序列化格式

遠程控制命令

  • control_queue_ttl
    默認值: 300.0

  • control_queue_expires
    默認值: 10.0

日誌

  • worker_hijack_root_logger
    2.2 版本新特性
    默認值: 默認啓用 (hijack root logger).

   默認狀況下,任意前面配置的根日誌器的處理函數都將被移除。若是你想自定義日誌處理函數,那麼你能夠經過設置 worker_hijack_root_logger = False 來禁用這個行爲。

   注意:
     日誌能夠經過鏈接到 celery.signals.setup_logging 進行定製化

  • worker_log_color
    默認值: 若是應用實例日誌輸出到一個終端,這個將啓用
    啓用/禁用Celery 應用日誌輸出的顏色

  • worker_log_format
    默認值:

    [%(asctime)s: %(levelname)s/%(processName)s] %(message)s  日誌信息的格式
    

  查看python 日誌模塊獲取更多關於日誌的信息

  • worker_task_log_format
    默認值:
[%(asctime)s: %(levelname)s/%(processName)s]
    [%(task_name)s(%(task_id)s)] %(message)s

任務中記錄日誌使用的格式。查看python 日誌模塊獲取更多關於日誌的信息

  • worker_redirect_stdouts
    默認值: 默認啓用
    若是啓用來,標準輸出和標準錯誤輸出將重定向到當前日誌器

工做單元和 beat 將使用到

  • worker_redirect_stdouts_level
    默認值:WARNING
    標準輸出和標準錯誤輸出的日誌級別。能夠是DEBUG, INFO, WARNING, ERROR, or CRITICAL

安全

  • security_key
    默認值: None.
    2.5 版本新特性

包含私鑰的文件的相對或者絕對路徑,私鑰用來在使用消息簽名時對消息進行簽名。

  • security_certificate
    默認值:None.
    2.5 版本新特性

  包含X.509認證的文件的相對或者絕對路徑,認證用來在使用消息簽名時對消息進行簽名。

  • security_cert_store
    默認值:None.
    2.5 版本新特性

  包含用來進行消息簽名的X.509認證的目錄。可使用文件名模式匹配(例如:/etc/certs/*.pem)

自定義組件類 (高級)

  • worker_pool
    默認值:」prefork」 (celery.concurrency.prefork:TaskPool).
    工做單元使用的池類的名稱

  • Eventlet/Gevent
    永遠不要使用這個選項來選擇用eventlet 仍是 gevent。你必須對工做單元使用-P選項,確保應急補丁不會應用過遲,致使出現奇怪的現象。

  • worker_pool_restarts
    默認值:默認禁用

   若是啓用,工做單元池可使用 pool_restart 遠程控制命令進行重啓

  • worker_autoscaler
    2.2 版本新特性
    默認值: 「celery.worker.autoscale:Autoscaler」.

使用的自動擴展類的名稱

  • worker_consumer
    默認值:」celery.worker.consumer:Consumer」.
    工做單元使用的消費類的名稱

  • worker_timer
    默認值:」kombu.async.hub.timer:Timer」.
    工做單元使用的 ETA 調度器類的名稱。默認值是被池具體實現設置。

Beat 設置 (celery beat)

  • beat_schedule
    默認值: {} (空映射)
    beat調度的週期性任務。查看Entries

  • beat_scheduler
    默認值:」celery.beat:PersistentScheduler」.
    默認的調度器類。若是同時使用django-celery-beat擴展,能夠設置成 「django_celery_beat.schedulers:DatabaseScheduler」

也能夠經過celery beat 的 -S 參數進行設置

  • beat_schedule_filename
    默認值: 「celerybeat-schedule」.
    存儲週期性任務最後運行時間的文件的名稱,這個文件被PersistentScheduler使用。能夠是相對或者絕對路徑,可是注意後綴.db可能添加到文件名後(依賴於python版本)

   也能夠經過 celery beat 的 –schedule 參數進行設置

  beat_sync_every
  默認值:0.
  另外一個數據庫同步發起前能夠執行的週期性任務的數量。值0(默認)表示基於時間同步 - 默認是3分鐘,由scheduler.sync_every肯定。若是設置成1,beat將在每一個任務消息發送後發起同步。

  beat_max_loop_interval
   默認值: 0.

 

轉自:https://blog.csdn.net/libing_thinking/article/details/78812472

相關文章
相關標籤/搜索