這篇文檔描述了可用的配置選項。html
若是你使用默認的加載器,你必須建立 celeryconfig.py 模塊而且保證它在python路徑中。node
如下是配置示例,你能夠從這個開始。它包括運行一個基本Celery應用的全部基礎設置。python
## Broker settings. broker_url = 'amqp://guest:guest@localhost:5672//' # List of modules to import when the Celery worker starts. imports = ('myapp.tasks',) ## Using the database to store task state and results. result_backend = 'db+sqlite:///results.db' task_annotations = {'tasks.add': {'rate_limit': '10/s'}}
4.0 版本引入了新的小寫設置名稱和機構環境。mysql
與之前版本的不一樣,除了設置項名稱變爲小寫字母外,還有一個前綴的重命名,例如 celerybeat_ 變爲 beat_,celeryd_ 變爲 worker,以及不少頂級 celery_ 設置重命名成了 task_ 前綴。git
Celery 仍然能讀取老的配置文件,因此並不倉促遷移到新的設置格式。github
Setting name | Replace with |
---|---|
CELERY_ACCEPT_CONTENT | accept_content |
CELERY_ENABLE_UTC | enable_utc |
CELERY_IMPORTS | imports |
CELERY_INCLUDE | include |
CELERY_TIMEZONE | timezone |
CELERYBEAT_MAX_LOOP_INTERVAL | beat_max_loop_interval |
CELERYBEAT_SCHEDULE | beat_schedule |
CELERYBEAT_SCHEDULER | beat_scheduler |
CELERYBEAT_SCHEDULE_FILENAME | beat_schedule_filename |
CELERYBEAT_SYNC_EVERY | beat_sync_every |
BROKER_URL | broker_url |
BROKER_TRANSPORT | broker_transport |
BROKER_TRANSPORT_OPTIONS | broker_transport_options |
BROKER_CONNECTION_TIMEOUT | broker_connection_timeout |
BROKER_CONNECTION_RETRY | broker_connection_retry |
BROKER_CONNECTION_MAX_RETRIES | broker_connection_max_retries |
BROKER_FAILOVER_STRATEGY | broker_failover_strategy |
BROKER_HEARTBEAT | broker_heartbeat |
BROKER_LOGIN_METHOD | broker_login_method |
BROKER_POOL_LIMIT | broker_pool_limit |
BROKER_USE_SSL | broker_use_ssl |
CELERY_CACHE_BACKEND | cache_backend |
CELERY_CACHE_BACKEND_OPTIONS | cache_backend_options |
CASSANDRA_COLUMN_FAMILY | cassandra_table |
CASSANDRA_ENTRY_TTL | cassandra_entry_ttl |
CASSANDRA_KEYSPACE | cassandra_keyspace |
CASSANDRA_PORT | cassandra_port |
CASSANDRA_READ_CONSISTENCY | cassandra_read_consistency |
CASSANDRA_SERVERS | cassandra_servers |
CASSANDRA_WRITE_CONSISTENCY | cassandra_write_consistency |
CELERY_COUCHBASE_BACKEND_SETTINGS | couchbase_backend_settings |
CELERY_MONGODB_BACKEND_SETTINGS | mongodb_backend_settings |
CELERY_EVENT_QUEUE_EXPIRES | event_queue_expires |
CELERY_EVENT_QUEUE_TTL | event_queue_ttl |
CELERY_EVENT_QUEUE_PREFIX | event_queue_prefix |
CELERY_EVENT_SERIALIZER | event_serializer |
CELERY_REDIS_DB | redis_db |
CELERY_REDIS_HOST | redis_host |
CELERY_REDIS_MAX_CONNECTIONS | redis_max_connections |
CELERY_REDIS_PASSWORD | redis_password |
CELERY_REDIS_PORT | redis_port |
CELERY_RESULT_BACKEND | result_backend |
CELERY_MAX_CACHED_RESULTS | result_cache_max |
CELERY_MESSAGE_COMPRESSION | result_compression |
CELERY_RESULT_EXCHANGE | result_exchange |
CELERY_RESULT_EXCHANGE_TYPE | result_exchange_type |
CELERY_TASK_RESULT_EXPIRES | result_expires |
CELERY_RESULT_PERSISTENT | result_persistent |
CELERY_RESULT_SERIALIZER | result_serializer |
CELERY_RESULT_DBURI | Use result_backend instead. |
CELERY_RESULT_ENGINE_OPTIONS | database_engine_options |
[…]_DB_SHORT_LIVED_SESSIONS | database_short_lived_sessions |
CELERY_RESULT_DB_TABLE_NAMES | database_db_names |
CELERY_SECURITY_CERTIFICATE | security_certificate |
CELERY_SECURITY_CERT_STORE | security_cert_store |
CELERY_SECURITY_KEY | security_key |
CELERY_TASK_ACKS_LATE | task_acks_late |
CELERY_TASK_ALWAYS_EAGER | task_always_eager |
CELERY_TASK_ANNOTATIONS | task_annotations |
CELERY_TASK_COMPRESSION | task_compression |
CELERY_TASK_CREATE_MISSING_QUEUES | task_create_missing_queues |
CELERY_TASK_DEFAULT_DELIVERY_MODE | task_default_delivery_mode |
CELERY_TASK_DEFAULT_EXCHANGE | task_default_exchange |
CELERY_TASK_DEFAULT_EXCHANGE_TYPE | task_default_exchange_type |
CELERY_TASK_DEFAULT_QUEUE | task_default_queue |
CELERY_TASK_DEFAULT_RATE_LIMIT | task_default_rate_limit |
CELERY_TASK_DEFAULT_ROUTING_KEY | task_default_routing_key |
CELERY_TASK_EAGER_PROPAGATES | task_eager_propagates |
CELERY_TASK_IGNORE_RESULT | task_ignore_result |
CELERY_TASK_PUBLISH_RETRY | task_publish_retry |
CELERY_TASK_PUBLISH_RETRY_POLICY | task_publish_retry_policy |
CELERY_TASK_QUEUES | task_queues |
CELERY_TASK_ROUTES | task_routes |
CELERY_TASK_SEND_SENT_EVENT | task_send_sent_event |
CELERY_TASK_SERIALIZER | task_serializer |
CELERYD_TASK_SOFT_TIME_LIMIT | task_soft_time_limit |
CELERYD_TASK_TIME_LIMIT | task_time_limit |
CELERY_TRACK_STARTED | task_track_started |
CELERYD_AGENT | worker_agent |
CELERYD_AUTOSCALER | worker_autoscaler |
CELERYD_CONCURRENCY | worker_concurrency |
CELERYD_CONSUMER | worker_consumer |
CELERY_WORKER_DIRECT | worker_direct |
CELERY_DISABLE_RATE_LIMITS | worker_disable_rate_limits |
CELERY_ENABLE_REMOTE_CONTROL | worker_enable_remote_control |
CELERYD_HIJACK_ROOT_LOGGER | worker_hijack_root_logger |
CELERYD_LOG_COLOR | worker_log_color |
CELERYD_LOG_FORMAT | worker_log_format |
CELERYD_WORKER_LOST_WAIT | worker_lost_wait |
CELERYD_MAX_TASKS_PER_CHILD | worker_max_tasks_per_child |
CELERYD_POOL | worker_pool |
CELERYD_POOL_PUTLOCKS | worker_pool_putlocks |
CELERYD_POOL_RESTARTS | worker_pool_restarts |
CELERYD_PREFETCH_MULTIPLIER | worker_prefetch_multiplier |
CELERYD_REDIRECT_STDOUTS | worker_redirect_stdouts |
CELERYD_REDIRECT_STDOUTS_LEVEL | worker_redirect_stdouts_level |
CELERYD_SEND_EVENTS | worker_send_task_events |
CELERYD_STATE_DB | worker_state_db |
CELERYD_TASK_LOG_FORMAT | worker_task_log_format |
CELERYD_TIMER | worker_timer |
CELERYD_TIMER_PRECISION | worker_timer_precision |
若是接收到一個消息,其內容類型再也不上述列表中,它將會被丟棄並拋出一個錯誤。redis
默認狀況下,任意內容類型都是啓用的,包括pickle以及yaml,因此確保不受信任的第三方不能訪問你的消息中間件。查看安全這一節獲取更多信息。算法
示例:sql
# using serializer name accept_content = ['json'] # or the actual content-type (MIME) accept_content = ['application/json']
一旦啓用,消息中的日期和時間將會轉化成 UTC 時區。mongodb
注意2.5版本如下的工做單元將會認爲全部消息都使用的本地時區,因此只有在全部的工做單元都升級了的狀況下再啓用這個特性。
設置Celery使用一個自定義的時區。這個時區值能夠是pytz庫支持的任意時區。
若是沒有設置,UTC時區將被使用。爲了向後兼容,還提供了一個 enable_utc設置,若是他設置成假,將使用系統本地時區。
如下將更改 tasks.add 任務的 rate_limit 屬性:
task_annotations = {‘tasks.add’: {‘rate_limit’: ‘10/s’}}
或者對全部的任務更改:
task_annotations = {‘*’: {‘rate_limit’: ‘10/s’}}
你還能夠更改方法,例如 on_failure 處理函數:
def my_on_failure(self, exc, task_id, args, kwargs, einfo): print(‘Oh no! Task failed: {0!r}’.format(exc)) task_annotations = {‘*’: {‘on_failure’: my_on_failure}}
若是你須要更靈活的控制,那麼你可使用對象而不是字典來選擇任務來進行註解:
class MyAnnotate(object): def annotate(self, task): if task.name.startswith('tasks.'): return {'rate_limit': '10/s'} task_annotations = (MyAnnotate(), {other,})
默認發送未壓縮的消息。
協議 2 在 3.1.24 以及 4.x+ 被支持
另見:
Serializers
決定當鏈接丟失或者其餘鏈接錯誤時任務消息的發佈是否會重試,查看 task_publish_retry_policy。
定義當鏈接丟失或者其餘鏈接錯誤時任務消息的發佈重試策略。
也就是說,任務將會在本地執行而不是發送到隊列。
這與使用 apply() 帶 throw=True 參數有一樣的效果。
它須要 tblib 庫,能夠經過 pip 安裝:
$ pip install celery[tblib]
查看 Bundles 獲取關於組合多個擴展需求的信息。
task_ignore_result
默認值:禁用
是否存儲任務返回值(tombstones)。若是你只是想在發生錯誤的時候記錄返回值,能夠設置:task_store_errors_even_if_ignored
task_store_errors_even_if_ignored
默認值:禁用
若是設置了,即便 Task.ignore_result 啓用了,工做單元也會愛結果後端中存儲全部的任務錯誤。
task_track_started
默認值:禁用
若是設置成真,當任務被工做單元執行時,任務將報告它的狀態爲started。默認值是假,由於一般行爲是不作這種粒度級別的彙報。任務會處於 pending、finished 或者 waiting to be retried。當有長時間任務,而且須要知道當前正在運行什麼任務時,有一個 started狀態將會頗有用。
task_time_limit
默認值:沒有時間限制
任務的硬時間限制,以秒爲單位。若是這個時間限制被超過,處理任務的工做單元進程將會被殺死並使用一個新的替代。
task_soft_time_limit
默認值:沒有時間限制
任務的軟時間限制,以秒爲單位
當這個時間限制超事後,SoftTimeLimitExceeded異常將會被拋出。例如,任務能夠捕獲這個異常在硬時間限制到達以前對環境進行清理:
from celery.exceptions import SoftTimeLimitExceeded @app.task def mytask(): try: return do_work() except SoftTimeLimitExceeded: cleanup_in_a_hurry()
另見:
FAQ: Shoud I use retry or acks_late
將這個設置成真可讓消息從新入隊,因此任務將會被再執行,在同一個工做單元或者另一個工做單元。
告警:
啓用這個可能致使消息循環;確保你知道你在作什麼
當任務沒有一個自定義的速率限制時,這個值將被使用
另見:
worker_disable_rate_limits 設置能夠禁用全部的速率限制
result_backend
默認值:默認不啓用結果後端
用來存儲結果的後端。能夠是下列之一:
rpc
以 AMQP 消息形式發送結果。查看 RPC 後端設置
database
使用一個 SQLAlchemy 支持的結構化數據庫。查看數據庫後端設置
redis
使用 Redis 存儲結果。查看 Redis 後端設置
cache
使用 Memcached 存儲結果。查看 Cache 後端設置
cassandra
使用 Cassandra 存儲結果。查看 Cassandra 後端設置
elasticsearch
使用 Elasticsearch 存儲結果。查看 Elasticsearch 後端設置
ironcache
使用 IronCache 存儲結果。查看 IronCache 後端設置
couchbase
使用 Couchbase 存儲結果。查看 Couchbase 後端設置
couchdb
使用 CouchDB 存儲結果。查看 CouchDB 後端設置
filesystem
使用共享文件夾存儲結果。查看 File-system 後端設置
consul
使用 Consul K/V 存儲結果。查看 Consul K/V 後端設置
result_serializer
默認值:從4.0版本開始使用 json(更早:pickle)
查看 Serializers 獲取支持的序列化格式的信息。
result_compression
默認值:無壓縮
結果值得可選壓縮方法。支持 task_seralizer 設置相同的選項。
result_expires
默認值:1天后過時
存儲的結果被刪除的時間(秒數,或者一個 timedelta 對象)
(有一個內建的週期性任務將刪除過時的任務結果(celery.backend_cleanup),前提是 celery beat 已經被啓用。這個任務天天上午4點運行。
值 None 或者 0 意思是結果永不刪除(取決於後端聲明))
注意:
當前這個特性只支持 AMQP, database, cache, Redis 這些存儲後端。當使用 database 存儲後端,celery beat必須執行使得過時結果被刪除。
對於老的 amqp 後端,存儲結果一旦被消費它將再也不可用,此時這個特性將起到做用。
這是老的結果被刪除以前總的結果緩存的數量。值 0 或者 None 意味着沒有限制,而且值 -1 將禁用緩存。
Database URL 示例
使用一個數據庫存儲後端,你必須配置 result_backend 設置爲一個鏈接的URL,而且帶 db+ 前綴:
result_backend = 'db+scheme://user:password@host:port/dbname'
示例:
# sqlite (filename) result_backend = 'db+sqlite:///results.sqlite' # mysql result_backend = 'db+mysql://scott:tiger@localhost/foo' # postgresql result_backend = 'db+postgresql://scott:tiger@localhost/mydatabase' # oracle result_backend = 'db+oracle://scott:tiger@127.0.0.1:1521/sidname'
查看 Supported Databases 獲取支持的數據庫的一個表,查看 Connection String 獲取相關的鏈接字符串(這是 db+ 前綴後帶的URI的一部分)
# echo enables verbose logging from SQLAlchemy. app.conf.database_engine_options = {'echo': True}
database_short_lived_sessions
默認值:默認禁用
默認禁用短會話。若是啓用了,他們會急劇的下降性能,特別是對於處理不少任務的系統。當工做單元的流量很低,緩存的數據庫鏈接會因爲空閒而變爲無用,進而會致使工做單元出錯,這種狀況下這個選項是有用的。例如:間歇性的錯誤如(OperationalError)(2006, ‘MySQL server has gone away’)經過啓用短會話能解決。這個選項隻影響數據庫後端。
database_table_names
默認值:{} (空映射)
當 SQLAlchemy 設置成結果後端, Celery 自動建立兩個表來存儲任務的元數據。這個設置容許你自定義表名稱:
# use custom table names for the database result backend. database_table_names = { 'task': 'myapp_taskmeta', 'group': 'myapp_groupmeta', }
配置示例:
result_backend = 'rpc://' result_persistent = False
注意:
緩存後端支持 pylibmc 和 python-memcached 庫。後者只有在 pylibmc 沒有安裝時纔會被使用。
使用一個 Memcached 服務器:
result_backend = 'cache+memcached://127.0.0.1:11211/'
使用多個 Memcached 服務器:
result_backend = """ cache+memcached://172.19.26.240:11211;172.19.26.242:11211/ """.strip()
「memory」 後端只在內存中存儲緩存:
result_backend = 'cache' cache_backend = 'memory'
cache_backend_options = { 'binary': True, 'behaviors': {'tcp_nodelay': True}, }
注意:
Redis 後端須要 Redis 庫。
可使用 pip 安裝這個包:
$ pip install celery[redis]
查看 Bundles 獲取組合多個擴展需求的信息
後端須要 result_backend 設置成一個 Redis URL:
result_backend = 'redis://:password@host:port/db'
例如:
result_backend = 'redis://localhost/0'
等同於:
result_backend = 'redis://'
URL 的字段以下定義:
1. password
鏈接數據庫的密碼
2. host
Redis 服務器的主機名或者IP地址(例如:localhost)
3. port
Redis 服務器的端口。默認是 6379
4. db
使用的數據庫編號。默認是0。db 能夠包含一個可選的斜槓
redis_backend_us_ssl
默認值:禁用
Redis後端支持 SSL。這個選項的合法值與 broker_use_ssl 相同
redis_max_connections
默認值:無顯示
Redis 鏈接池的最大可用鏈接數,這些鏈接用來發送和接收結果
redis_socket_connect_timeout
5.0.1版本新特性
默認值:None
從存儲後端鏈接到Redis服務器的鏈接的Socket超時時間(以秒爲單位,int/float)
Cassandra 後端設置
注意:
Cassandra 後端驅動 cassandra-driver。
使用 pip 安裝:
$ pip install celery[cassandra]
查看 Bundles 獲取關於組合擴展需求的信息。
後端須要配置下列配置指令
cassandra_servers = ['localhost']
cassandra_port
默認值:9042.
鏈接到Cassandra服務器的端口
cassandra_keyspace
默認值: None.
存儲結果的 key-space。例如:
cassandra_keyspace = 'tasks_keyspace'
cassandra_table = 'tasks'
cassandra_read_consistency
默認值: None.
使用的讀一致性。值能夠是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE
cassandra_write_consistency
默認值: None.
使用的寫一致性。值能夠是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE
cassandra_entry_ttl
默認值: None.
狀態項的 Time-to-live。添加事後一段時間他們將會過時而且被刪除。值 None (默認) 意味着他們永不過時
cassandra_auth_provider
默認值: None.
使用的 cassandra.auth 模塊中的 AuthProvider。 值能夠是 PlainTextAuthProvider 或者 SaslAuthProvider
cassandra_auth_kwargs
默認值: {} (空映射)
傳遞給 authentication provider 的命名參數。例如:
cassandra_auth_kwargs = { username: 'cassandra', password: 'cassandra' }
配置示例:
cassandra_servers = ['localhost'] cassandra_keyspace = 'celery' cassandra_table = 'tasks' cassandra_read_consistency = 'ONE' cassandra_write_consistency = 'ONE' cassandra_entry_ttl = 86400
使用 Elasticsearch 做爲結果後端,你只須要將result_backend設置成正確的 URL。
配置示例:
result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'
elasticsearch_retry_on_timeout
默認值: False
超時後是否應該觸發在另外一個節點重試?
elasticsearch_max_retries
默認值: 3
異常被傳遞前的最大重試次數
elasticsearch_timeout
默認值: 10.0 秒
elasticsearch 使用的全局超時時間
注意:
Riak 後端須要 riak 庫
使用 pip 進行安裝:
$ pip install celery[riak]
查看 Bundles 獲取組合多擴展需求的信息。
後端須要result_backend設置成一個 Riak URL:
result_backend = 'riak://host:port/bucket'
例如:
result_backend = 'riak://localhost/celery
等同於:
result_backend = 'riak://'
URL 的字段定義以下:
1. host
Riak 服務器的主機名或者IP地址(例如 localhost)
2. port
使用 protobuf 協議的Riak 服務器端口,默認是 8087
3. bucket
使用的Bucket名稱。默認是 celery。bucket 名稱須要是一個只包含ASCII字符的字符串。
另外,這個後端可使用以下配置指令進行配置:
riak_backend_settings
默認值: {} (空映射)
這是一個支持以下鍵的映射:
host
Riak 服務器的主機名或者IP地址(例如 localhost)
port
Riak 服務器端口。默認是 8087
bucket
使用的Bucket名稱。默認是 celery。bucket 名稱須要是一個只包含ASCII字符的字符串。
protocol
鏈接到 Riak 服務器使用的協議。這不能夠經過 result_backend 配置
注意:
Dynamodb 後端須要 boto3 庫
使用 pip 進行安裝:
$ pip install celery[dynamodb]
查看 Bundles 獲取組合多擴展需求的信息。
後端須要 result_backend 設置成一個 DynamoDB URL:
result_backend = 'dynamodb://aws_access_key_id:aws_secret_access_key@region:port/table?read=n&write=m'
例如,聲明 AWS 區域以及表名稱:
result_backend = 'dynamodb://@us-east-1/celery_results
或者從環境中獲取 AWS 配置參數,使用默認表名稱(celery)以及聲明讀寫吞吐量:
result_backend = 'dynamodb://@/?read=5&write=5'
或者在本地使用 DynamoDB 的可下載版本:
result_backend = 'dynamodb://@localhost:8000
URL 中的字段以下定義:
aws_access_key_id & aws_secret_access_key
訪問 AWS API 資源的認證信息。這能夠經過 boto3 從不一樣的源獲取到
region
AWS 區域,例如: us-east-1 或者本地版本的 localhost。查看 boto3 庫文檔獲取更多的信息。
port
若是你使用的本地版本,這是本地DynamoDB示例監聽的端口。若是你沒有把區域設置成 localhost,這個設置選項將無效
table
使用的表名。默認是 celery。查看 DynamoDB 命名規則獲取容許的字符以及表名長度的信息。
read & write
所建立的 DynamoBD 表的讀寫能力單元。默認的讀寫值都是 1。更多的細節能夠從 Provisioned Throughput documentation 中獲取到。
注意:
IronCache 後端須要 iron_celery 庫:
使用 pip 進行安裝:
$ pip install iron_celery
IronCache 經過在 result_backend 中配置的 URL 進行聲明,例如:
result_backend = 'ironcache://project_id:token@'
或者更改緩存名稱:
ironcache:://project_id:token@/awesomecache
更多的信息,查看 https://github.com/iron-io/iron_celery
注意:
Couchbase 後端須要 couchbase 庫
使用 pip 進行安裝:
$ pip install celery[couchbase]
查看 Bundle 獲取組合多擴展需求的步驟。
後端能夠經過 result_backend 設置成一個 Couchbase URL:
result_backend = 'couchbase://username:password@host:port/bucket'
默認值:{} (空映射)
這是一個支持以下鍵的映射:
注意:
CouchDB 後端須要 pycouchdb 庫:
使用 pip 安裝這個包:
$ pip install celery[couchdb]
查看 Bundles 獲取更多關於組合多擴展需求的信息
後端能夠經過 result_backend 配置成一個 CouchDB URL:
result_backend = 'couchdb://username:password@host:port/container'
URL 由如下部分組成:
後端能夠經過一個文件 URL 配置,例如:
CELERY_RESULT_BACKEND = 'file:///var/celery/results'
配置的目錄須要被共享,而且全部使用該後端的服務器均可寫。
若是你在單獨的一個系統上使用 Celery,你不須要任何進一步的配置就能夠簡單的使用這個後端。對於大型的集羣,你可使用 NFS、GlusterFS、CIFS、HDFS(使用FUSE),或者其餘文件系統。
Consul 後端能夠經過 URL 配置:
CELERY_RESULT_BACKEND = ‘consul://localhost:8500/’
後端將在 Consul K/V 存儲中做爲單獨鍵存儲結果
後端使用Consul 中的 TTLs 支持結果的自動過時
若是你真的須要配置高級路由,這個設置應該是一個 kombu.Queue 對象的列表,工做單元能夠從中消費。
注意工做單元能夠經過 -Q 選項覆蓋這個設置,或者這個列表中的單獨隊列能夠經過 -X 選項進行排除。
查看 Basics 獲取更多的信息。
默認值是 celery 隊列的一個隊列/消息交換器/綁定的鍵,消息交互類型是direct。
查看 task_routes
一個路由器能夠經過以下方式聲明:
(name, args, kwargs, options, task=None, **kwargs)
celery.routes.MapRoute
實例(pattern, route)
元組的列表,它將會轉化成一個 celery.routes.MapRoute
實例 task_routes = { 'celery.ping': 'default', 'mytasks.add': 'cpu-bound', 'feed.tasks.*': 'feeds', # <-- glob pattern re.compile(r'(image|video)\.tasks\..*'): 'media', # <-- regex 'video.encode': { 'queue': 'video', 'exchange': 'media' 'routing_key': 'media.video.encode', }, } task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default}) 其中,myapp.tasks.route_task 能夠是: def route_task(self, name, args, kwargs, options, task=None, **kw): if task == 'celery.ping': return {'queue': 'default'}
route_task 能夠返回一個字符串或者一個字典。一個字符串表示 task_queues 中的一個隊列名,而字典表示一個自定義的路由。
當發送消息,路由被按順序詢問。第一個返回非 None 值得路由將被使用。消息選項此時將與找到的路由設置合併,其中路由器的設置要優先。
例如: apply_async() 有這些參數:
Task.apply_async(immediate=False, exchange='video', routing_key='video.compress')
而且有一個路由器返回:
{'immediate': True, 'exchange': 'urgent'}
那麼最終的消息選項將是:
immediate=True, exchange='urgent', routing_key='video.compress'
(以及Task類中定義的任意默認消息選項)
當進行合併時,task_routes 中定義的值會優先於 task_queues 中定義的值。
對於以下設置:
task_queues = { 'cpubound': { 'exchange': 'cpubound', 'routing_key': 'cpubound', }, } task_routes = { 'tasks.add': { 'queue': 'cpubound', 'routing_key': 'tasks.add', 'serializer': 'json', }, }
tasks.add 的最終路由選項將變爲:
{'exchange': 'cpubound', 'routing_key': 'tasks.add', 'serializer': 'json'}
查看路由器獲取更多的示例。
task_queue_ha_policy = 'all'
使用 all 將複製隊列到全部的當前節點,或者你指定一個節點的列表:
task_queue_ha_policy = ['rabbit@host1', 'rabbit@host2']
使用一個列表將隱示設置 x-ha-policy爲‘nodes,x-ha-policy-params` 爲給定的節點列表
查看 http://www.rabbitmq.com/ha.html 獲取更多的信息
task_queue_max_priority
消息中間件: RabbitMQ
默認值: None
查看 RabbitMQ Message Priorities
worker_direct
默認值: 禁用
這個選項使得每一個工做單元又一個專門的隊列,因此任務能夠路由到指定的工做單元。
每一個工做單元的隊列名稱是基於工做單元主機名和一個 .dq後綴自動產生的,使用 C.dq 消息交互器。
例如:節點名稱爲 w1@example.com 的工做單元的隊列名稱爲:
w1@example.com.dq
此時,你能夠經過指定主機名爲路由鍵而且使用 C.dq 消息交互器來將任務路由到指定的節點。
task_routes = { 'tasks.add': {'exchange': 'C.dq', 'routing_key': 'w1@example.com'} }
task_create_missing_queues
默認值:啓用
若是啓用(默認),任何聲明的未在 task_queues 中未定義的隊列都將自動被建立。查看 Automaci routing。
task_default_queue
默認值: celery
若是消息沒有聲明路由或者自定義的隊列,apply_async 默認使用的隊列名稱。
這個隊列必須在 task_queues 中。若是 task_queues 沒有聲明,那麼他將自動建立一個隊列項,而這個設置值就做爲隊列的名稱。
另見:
修改默認隊列的名稱
task_default_exchange
默認值:」celery」
當 task_queues 設置中指定鍵沒有聲明自定義的消息交互器,那麼這個默認的消息交互器將被使用。
task_default_exchange_type
默認值:」direct」
當 task_queues 設置中指定鍵沒有聲明自定義的消息交互器類型,那麼這個默認的消息交互器類型將被使用。
task_default_routing_key
默認值:」celery」
當 task_queues 設置中指定鍵沒有聲明自定義的路由鍵,那麼這個默認的路由鍵將被使用。
task_default_delivery_mode
默認值:」presistent」
能夠是瞬態的(消息不寫硬盤),或者持久的(寫硬盤)
transport://userid:password@hostname:port/virtual_host
其中只有模式部分是必須的,其他部分都是可選的,默認會設置爲對應傳輸中間件的默認值。
傳輸部分是使用的消息中間件的實現,默認是 amqp,(若是安裝了librabbitmq會使用這個庫,不然使用pyamqp)。還有其餘可用的選擇,包括 redis://、 sqs://、 qpid://。
模式部分能夠是你本身的傳輸中間件實現的全限定路徑:
broker_url = 'proj.transports.MyTransport://localhost'
能夠配置多個消息中間件,使用相同的傳輸協議也行。消息中間件能夠經過當個字符串聲明,不一樣的消息中間件URL之間用冒號分隔:
broker_url = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'
或者做爲一個列表:
broker_url = [ 'transport://userid:password@localhost:port//', 'transport://userid:password@hostname:port//' ]
這些消息中間件將被用於broker_failover_strategy
查看Kombu 文檔中的 URLs 章節獲取更多的信息。
示例:
broker_read_url = 'amqp://user:pass@broker.example.com:56721' broker_write_url = 'amqp://user:pass@broker.example.com:56722'
全部選項均可以聲明成一個列表,做爲故障恢復的可選值,查看 broker_url 獲取更多的信息
示例:
# Random failover strategy def random_failover_strategy(servers): it = list(servers) # don't modify callers list shuffle = random.shuffle for _ in repeat(None): shuffle(it) yield it[0] broker_failover_strategy = random_failover_strategy
注意:這個值只被工做單元使用,客戶端此時不使用心跳。
由於單純使用 TCP/IP 並不老是及時探測到鏈接丟失,因此 AMQP 定義了心跳,客戶端和消息中間件用來檢測鏈接是否關閉。
心跳會被監控,若是心跳值是 10 秒,那麼檢測心跳的時間間隔是 10 除以broker_heartbeat_checkrate (默認狀況下,這個值是心跳值的兩倍,因此對於10秒心跳,心跳每隔5秒檢測一次)
工做單元會間隔監控消息中間件沒有丟失過多的心跳。這個檢測的速率是用 broker_heartbeat 值除以這個設置值獲得的,因此若是心跳是 10.0 而且這個設置值是默認的2.0,那麼這個監控將每隔5秒鐘執行一次(心跳發送速率的兩倍)
在消息中間件鏈接上使用SSL
這個選項的合法值依據使用的傳輸協議的不一樣而不一樣
注意SSL套接字通常會在消息中間件的一個單獨的端口上服務。
如下示例提供了客戶端證書,而且使用一個自定義的認證受權來驗證服務器證書:
import ssl broker_use_ssl = { 'keyfile': '/var/ssl/private/worker-key.pem', 'certfile': '/var/ssl/amqp-server-cert.pem', 'ca_certs': '/var/ssl/myca.pem', 'cert_reqs': ssl.CERT_REQUIRED }
告警:
使用 broker_use_ssl=True 時請當心。可能你的默認配置根本不會驗證服務器證書。請閱讀python的 ssl module security considerations。
ssl_cert_reqs (required): one of the SSLContext.verify_mode values: ssl.CERT_NONE ssl.CERT_OPTIONAL ssl.CERT_REQUIRED ssl_ca_certs (optional): path to the CA certificate ssl_certfile (optional): path to the client certificate ssl_keyfile (optional): path to the client key
默認值:10
鏈接池中能夠打開最大鏈接數。
從2.5版本開始鏈接池被默認啓用,默認限制是10個鏈接。這個數值能夠依據使用一個鏈接的 threads/green-threads (eventlet/gevent) 數量進行更改。例如:運行 eventlet 啓動 1000 個 greenlets,他們使用一個鏈接到消息中間件,若是發生競態條件,那麼你應該開始增長這個限制。
若是設置成None或者0,鏈接池將會被禁用,而且每次使用鏈接都會從新創建鏈接並關閉。
broker_connection_timeout
默認值:4.0
放棄與AMQP服務器創建鏈接以前默認等待的超時時間。當使用 gevent 時該設置被禁用。
broker_connection_retry
默認值:啓用
若是與 AMQP 消息中間件的鏈接斷開,將自動從新創建鏈接
每次重試中間等待的時間會遞增,而且在 broker_connection_max_retries 未達到以前會一隻重試
若是設置成 0 或者 None,將一直重試
broker_login_method
默認值:AMQPLAIN
設置自定義的 amqp 登錄方法
broker_transport_options
2.2 版本新特性
默認值:{} (空映射)
傳遞給底層傳輸中間件的一個附加選項的字典
設置可見超時時間的示例以下(Redis 與 SQS 傳輸中間件支持):
broker_transport_options = {‘visibility_timeout’: 18000} # 5 hours
工做單元
這用來聲明要導入的模塊,可是它還可用來導入信號處理函數和附加的遠程控制命令,等等。
這些模塊將會以原來聲明的順序導入
這個設置中的模塊是在 imports 設置中的模塊導入以後才導入
若是你大部分操做是I/O操做,你能夠設置更多的進程(線程),可是大部分狀況下都是以CPU數做爲定界,嘗試讓這個值接近你機器的CPU核數。若是沒有設置,當前機器的 CPU核數將會被使用
禁用這個選項,只要將 worker_prefetch_multiplier 設置成 1。設置成 0 將容許工做單元持續消費它想要的儘量多的消息。
更詳細的信息,請閱讀 Prefetch Limits
注意:
帶 ETA/countdown 的任務不會受 prefetch 限制的影響
worker_lost_wait
默認值:10.0 秒
有些狀況下,工做單元可能在沒有適當清理的狀況下就被殺死,而且工做單元可能在終止前已經發布了一個結果。這個值聲明瞭在拋出 WorkerLostError 異常以前咱們會在丟失的結果值上等待多久
worker_max_tasks_per_child
一個工做單元進程在被一個新的進程替代以前能夠執行的最大任務數
worker_max_memory_per_child
默認值:沒有限制。類型:int(kilobytes)
一個工做單元進程在被一個新的進程替代以前能夠消耗的最大預留內存(單位KB)。若是單獨一個任務就致使工做單元超過這個限制,當前的任務會執行完成,而且以後這個進程將會被更新替代。
示例:
worker_max_memory_per_child = 12000 # 12MB
worker_disable_rate_limits
默認值:禁用(啓用速率限制)
即便任務顯示設置了速率,仍然禁用全部速率限制
worker_state_db
默認值:None
存儲工做單元狀態的文件名稱(如取消的任務)。能夠是相對或者絕對路徑,可是注意後綴.db 可能會被添加到文件名後(依賴於python 的版本)
也能夠經過celery worker –statedb 參數設置
設置成1意味着調度器精度將爲1秒。若是你須要毫秒精度,你能夠設置成 0.1
worker_send_task_events
默認值:默認禁用
發送任務相關的事件,使得任務可使用相似flower 的工做監控到。爲工做單元的 -E 參數設置默認值
task_send_sent_event
2.2 版本新特性
默認值:默認禁用
若是啓用,對於每一個任務都將有一個 task-sent 事件被髮送,所以任務在被消費前就能被追蹤。
例如:若是這個值設置爲10,被遞送到這個隊列的消息將會在10秒後被刪除
event_queue_expires
支持的傳輸中間件: amqp
默認值:60.0 秒
一個監控客戶端事件隊列被刪除前的過時時間(x-expires)。
event_queue_prefix
默認值: 「celeryev」.
事件接收隊列名稱的前綴
event_serializer
默認值: 「json」.
當發送事件消息時使用的消息序列化格式
control_queue_ttl
默認值: 300.0
control_queue_expires
默認值: 10.0
默認狀況下,任意前面配置的根日誌器的處理函數都將被移除。若是你想自定義日誌處理函數,那麼你能夠經過設置 worker_hijack_root_logger = False 來禁用這個行爲。
注意:
日誌能夠經過鏈接到 celery.signals.setup_logging 進行定製化
worker_log_color
默認值: 若是應用實例日誌輸出到一個終端,這個將啓用
啓用/禁用Celery 應用日誌輸出的顏色
worker_log_format
默認值:
[%(asctime)s: %(levelname)s/%(processName)s] %(message)s 日誌信息的格式
查看python 日誌模塊獲取更多關於日誌的信息
[%(asctime)s: %(levelname)s/%(processName)s] [%(task_name)s(%(task_id)s)] %(message)s
任務中記錄日誌使用的格式。查看python 日誌模塊獲取更多關於日誌的信息
工做單元和 beat 將使用到
包含私鑰的文件的相對或者絕對路徑,私鑰用來在使用消息簽名時對消息進行簽名。
包含X.509認證的文件的相對或者絕對路徑,認證用來在使用消息簽名時對消息進行簽名。
包含用來進行消息簽名的X.509認證的目錄。可使用文件名模式匹配(例如:/etc/certs/*.pem)
自定義組件類 (高級)
worker_pool
默認值:」prefork」 (celery.concurrency.prefork:TaskPool).
工做單元使用的池類的名稱
Eventlet/Gevent
永遠不要使用這個選項來選擇用eventlet 仍是 gevent。你必須對工做單元使用-P選項,確保應急補丁不會應用過遲,致使出現奇怪的現象。
worker_pool_restarts
默認值:默認禁用
若是啓用,工做單元池可使用 pool_restart 遠程控制命令進行重啓
使用的自動擴展類的名稱
worker_consumer
默認值:」celery.worker.consumer:Consumer」.
工做單元使用的消費類的名稱
worker_timer
默認值:」kombu.async.hub.timer:Timer」.
工做單元使用的 ETA 調度器類的名稱。默認值是被池具體實現設置。
beat_schedule
默認值: {} (空映射)
beat調度的週期性任務。查看Entries
beat_scheduler
默認值:」celery.beat:PersistentScheduler」.
默認的調度器類。若是同時使用django-celery-beat擴展,能夠設置成 「django_celery_beat.schedulers:DatabaseScheduler」
也能夠經過celery beat 的 -S 參數進行設置
也能夠經過 celery beat 的 –schedule 參數進行設置
beat_sync_every
默認值:0.
另外一個數據庫同步發起前能夠執行的週期性任務的數量。值0(默認)表示基於時間同步 - 默認是3分鐘,由scheduler.sync_every肯定。若是設置成1,beat將在每一個任務消息發送後發起同步。
beat_max_loop_interval
默認值: 0.
轉自:https://blog.csdn.net/libing_thinking/article/details/78812472