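Official Celery getting-started docs: http://docs.celeryproject.org/en/master/getting-started/index.html
Celery is a distributed task-scheduling module written in Python. It does not include a message service of its own, so it has to be connected to a third-party broker such as RabbitMQ or Redis; a few others are supported too, but they are not recommended for production.
The examples below use Redis as both the broker and the result backend.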
pip install celery
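When using Redis as the broker, you also need to install celery-with-redis (with newer Celery releases the equivalent is pip install "celery[redis]"):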
pip install celery-with-redis
Write tasks.py:
from celery import Celery

# redis URL: redis://:password@host:port/db_number
app = Celery('tasks',
             broker='redis://:cds-china@172.20.3.3:6379/0',
             backend='redis://:cds-china@172.20.3.3:6379/1')

@app.task
def add(x, y):
    return x + y
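The comment in tasks.py gives the Redis URL layout as redis://:password@host:port/db_number. The sketch below shows how such a URL breaks down, using the standard library's urllib.parse (Python 3). parse_redis_url is a hypothetical helper for illustration, not a Celery or kombu function, and the password is a placeholder.

```python
from urllib.parse import urlsplit


def parse_redis_url(url):
    """Split redis://:password@host:port/db_number into its parts.

    Hypothetical helper for illustration only; Celery/kombu parse URLs themselves.
    """
    parts = urlsplit(url)
    if parts.scheme != "redis":
        raise ValueError("expected a redis:// URL, got %r" % url)
    return {
        "password": parts.password,               # text after ':' and before '@'
        "host": parts.hostname,
        "port": parts.port or 6379,               # 6379 is the usual Redis port
        "db": int(parts.path.lstrip("/") or 0),   # "/1" -> 1; no path -> 0
    }


if __name__ == "__main__":
    # Placeholder password, same layout as the broker/backend URLs in tasks.py.
    print(parse_redis_url("redis://:secret@172.20.3.3:6379/0"))
    print(parse_redis_url("redis://:secret@172.20.3.3:6379/1")["db"])
```

In tasks.py the broker and the backend share one Redis server and differ only in the database number (0 vs 1). This keeps queued messages and stored results apart.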
cd into the directory containing tasks.py and start a worker to process tasks:
celery -A tasks worker --loglevel=info
If the worker fails to start with an error like the following, it is because starting it as root is discouraged:
root@ubuntu:~/zq# celery -A tasks worker --loglevel=info
Running a worker with superuser privileges when the worker accepts messages serialized with pickle is a very bad idea!

If you really want to continue then you have to set the C_FORCE_ROOT environment variable (but please think about this before you do).

User information: uid=0 euid=0 gid=0 egid=0
If you really must run it as root, there are two ways to allow it:
1. In code:
from celery import platforms
platforms.C_FORCE_ROOT = True

2. Via the C_FORCE_ROOT environment variable:
export C_FORCE_ROOT="true"
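The worker first refuses to start and later starts with only a warning. Both messages fit a simple rule, as far as we can tell from Celery 3.1's behavior: a root worker refuses to start only when pickle is among the accepted serializers and C_FORCE_ROOT is not set; otherwise it merely warns. The function below is a hypothetical, simplified model of that decision, not Celery's actual code. The real check also looks at group ids, and treating any non-empty C_FORCE_ROOT value as "set" is an assumption of this model.

```python
import os


def root_check(euid, accept_content, env):
    """Return 'refuse', 'warn' or 'ok' for a worker about to start.

    Hypothetical, simplified model of the superuser check; not Celery's code.
    """
    if euid != 0:
        return "ok"                        # not root: no complaint
    force_root = bool(env.get("C_FORCE_ROOT"))  # assumption: any non-empty value
    if "pickle" in accept_content and not force_root:
        return "refuse"                    # root + pickle + no override
    return "warn"                          # still root: starts, but warns


if __name__ == "__main__":
    # os.geteuid() only exists on POSIX; pretend "not root" elsewhere.
    euid = os.geteuid() if hasattr(os, "geteuid") else 1
    print(root_check(euid, ["json", "pickle"], os.environ))
```

In this model, removing pickle from CELERY_ACCEPT_CONTENT would also turn the refusal into a warning. That matches the deprecation message shown below, which recommends enabling only the serializers you use.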
Start the worker again. Output like the following means it started successfully, despite the warnings:
root@ubuntu:~/zq# celery -A tasks worker --loglevel=info
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:812: RuntimeWarning: You are running the worker with superuser privileges, which is absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
[2017-07-05 14:18:07,713: WARNING/MainProcess] /usr/local/lib/python2.7/dist-packages/celery/apps/worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers the ability to execute any command. It's important to secure your broker from unauthorized access when using pickle, so we think that enabling pickle should require a deliberate action and not be the default choice.

If you depend on pickle then you should set a setting to disable this warning and to be sure that everything will continue working when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@ubuntu v3.1.23 (Cipater)
---- **** -----
--- * *** * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f58957a9f50
- ** ---------- .> transport: redis://:**@172.20.3.3:6379/0
- ** ---------- .> results: redis://:**@172.20.3.3:6379/1
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2017-07-05 14:18:07,895: INFO/MainProcess] Connected to redis://:**@172.20.3.3:6379/0
[2017-07-05 14:18:07,910: INFO/MainProcess] mingle: searching for neighbors
[2017-07-05 14:18:08,917: INFO/MainProcess] mingle: all alone
[2017-07-05 14:18:08,928: WARNING/MainProcess] celery@ubuntu ready.
Open another terminal:
root@ubuntu:~/zq# python
Python 2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> r=add.delay(1,1)
>>> r.ready()
True
>>> r.get()
2
>>> r.id    # task id
'bf779b9e-c779-4004-9864-68763090d560'
>>> r=add.delay(2)
>>> r.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 175, in get
    raise meta['result']
TypeError: add() takes exactly 2 arguments (1 given)
>>> r.get(propagate=False)
TypeError('add() takes exactly 2 arguments (1 given)',)
>>> r.failed()
True
>>> r.successful()
False
>>> r.state
'FAILURE'

Tasks are called with the delay() method. It is a shortcut for apply_async(), which gives you finer control over how the task is executed.
Calling a task returns an AsyncResult instance. You can use it to check the task's state, wait for the task to finish, or fetch its return value (or, if the task failed, the exception and traceback). Storing results is not enabled by default: you have to configure a Celery result backend.
State transitions: PENDING -> STARTED -> SUCCESS (or FAILURE, as with add.delay(2) above). STARTED is only recorded when CELERY_TRACK_STARTED is enabled; the conf output later in this post shows it as false.
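delay() returns a handle immediately, and ready()/get()/failed()/state then report on work running elsewhere. As an in-process analogy only (no broker, no worker, no Celery), the standard library's concurrent.futures has the same shape: submit() returns a Future. The hypothetical celery_like_state() maps the Future's status onto Celery's state names.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def celery_like_state(future):
    """Map a Future's status onto Celery-style state names (analogy only)."""
    if not future.done():
        return "STARTED" if future.running() else "PENDING"
    # exception() does not block here because the future is already done.
    return "FAILURE" if future.exception() is not None else "SUCCESS"


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as pool:
        ok = pool.submit(add, 1, 1)   # like add.delay(1, 1)
        bad = pool.submit(add, 2)     # like add.delay(2): wrong number of arguments
        print(ok.result(), celery_like_state(ok))              # like r.get(), r.state
        print(repr(bad.exception()), celery_like_state(bad))   # like r.get(propagate=False)
```

In both cases, the caller keeps a handle and asks it for the outcome later. Celery adds the broker, the separate worker process and the result backend in between.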
The worker terminal then logs the following:
[2017-07-05 14:29:20,165: INFO/MainProcess] Received task: tasks.add[e5737959-410f-4506-86f3-0b03ed359f00]
[2017-07-05 14:29:22,170: INFO/MainProcess] Task tasks.add[e5737959-410f-4506-86f3-0b03ed359f00] succeeded in 2.00330784172s: 2
The first line shows that the message was received; the second shows that the task was executed and returned 2.
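Each "succeeded" log line carries the task name, the task id, the run time and the return value. If you need to extract those from a log file, a regular expression is enough. The pattern below is a hypothetical sketch fitted to the Celery 3.1 lines shown above with the default log format; it is not a stable Celery interface.

```python
import re

# Matches lines like:
# [...] Task tasks.add[<task id>] succeeded in 2.00330784172s: 2
SUCCEEDED = re.compile(
    r"Task (?P<name>[\w.]+)\[(?P<id>[0-9a-f-]+)\] "
    r"succeeded in (?P<secs>[0-9.]+)s: (?P<result>.*)$"
)


def parse_succeeded(line):
    """Return (task_name, task_id, seconds, result_repr), or None if no match."""
    m = SUCCEEDED.search(line)
    if m is None:
        return None
    return m.group("name"), m.group("id"), float(m.group("secs")), m.group("result")


if __name__ == "__main__":
    line = ("[2017-07-05 14:29:22,170: INFO/MainProcess] Task tasks.add"
            "[e5737959-410f-4506-86f3-0b03ed359f00] succeeded in 2.00330784172s: 2")
    print(parse_succeeded(line))
```

The result comes back as the text the worker logged (here "2"), not as a Python value.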
If no backend is configured, calling .ready() or .get() raises:
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
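Celery has many settings you can configure, although the defaults are good enough for most cases.
There are two ways to set them:
1. Directly in code
Set a single setting: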
app.conf.CELERY_TASK_SERIALIZER = 'json'
Set several settings at once:
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Europe/Oslo',
    CELERY_ENABLE_UTC=True,
)
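Switching CELERY_TASK_SERIALIZER and CELERY_ACCEPT_CONTENT to json means task arguments and results must survive a JSON round trip. A quick standard-library check shows the practical consequences: tuples come back as lists, and the plain json module rejects types such as datetime. Celery's json serializer comes from kombu, which may handle a few extra types depending on the version, so treat this as an approximation. json_round_trip is a hypothetical helper.

```python
import json
from datetime import datetime


def json_round_trip(value):
    """Serialize and deserialize value the way a plain JSON serializer would."""
    return json.loads(json.dumps(value))


if __name__ == "__main__":
    # Task-style arguments: the tuple comes back as a list.
    print(json_round_trip({"args": (1, 1), "kwargs": {}}))
    try:
        json_round_trip({"when": datetime(2017, 7, 5)})
    except TypeError as exc:
        print("not JSON-serializable:", exc)
```

With pickle, both values would round-trip unchanged. That convenience is exactly what the security warning earlier in this post is about.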
2. As a module, with the settings in a separate configuration file
app.config_from_object('celeryconfig')
The configuration module is usually called celeryconfig, but any module name works. config_from_object('celeryconfig') imports celeryconfig.py from the current directory or from the Python path.
celeryconfig.py
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Europe/Oslo'
CELERY_ENABLE_UTC = True
You can check the configuration file for syntax errors by running:
python -m celeryconfig
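config_from_object('celeryconfig') refers to the module by name, so the module must be importable from the current directory or sys.path. A syntax error in it surfaces at import time, which is what python -m celeryconfig catches. The sketch below reproduces that lookup with importlib and collects the upper-case names a settings module defines. load_settings is a hypothetical helper; Celery's own loader is more involved.

```python
import importlib
import os
import sys
import tempfile


def load_settings(module_name, search_dir):
    """Import module_name from search_dir and return its UPPER_CASE settings.

    Hypothetical helper. A SyntaxError in the file is raised here, at import.
    Note: a module already in sys.modules is returned from the import cache.
    """
    sys.path.insert(0, search_dir)      # like running from that directory
    importlib.invalidate_caches()       # make sure newly written files are seen
    try:
        module = importlib.import_module(module_name)
    finally:
        sys.path.remove(search_dir)
    return {k: getattr(module, k) for k in dir(module) if k.isupper()}


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "celeryconfig.py"), "w") as f:
            f.write("CELERY_TASK_SERIALIZER = 'json'\n"
                    "CELERY_ACCEPT_CONTENT = ['json']\n")
        print(load_settings("celeryconfig", tmp))
```

Lower-case names and module attributes such as __name__ are skipped, because only upper-case names are treated as settings here.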
The commands below use an app module named proj; with the tasks.py from earlier, use -A tasks instead.
# Start in the background (by default the pidfile and logfile go in the current directory)
$ celery multi start 1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/log/celery/%n%I.log
# Restart
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid
# Stop
$ celery multi stop 1 -A proj -l info
# Stop safely: wait for all running tasks to finish before stopping
$ celery multi stopwait 1 -A proj -l info

Start:
root@ubuntu:~/zq# celery multi start 1 -A proj -l info -c4 --pidfile=%n.pid
celery multi v3.1.23 (Cipater)
> Starting nodes...
    > celery1@ubuntu: OK

After starting, the processes show up:
root@ubuntu:~/zq# ps aux | grep cel
root 24103 5.1 0.0 80364 24992 ? S 15:48 0:00 /usr/bin/python -m celery worker -c4 -A proj -l info --logfile=celery1.log --pidfile=1.pid --hostname=celery1@ubuntu
root 24110 0.0 0.0 78904 20796 ? S 15:48 0:00 /usr/bin/python -m celery worker -c4 -A proj -l info --logfile=celery1.log --pidfile=1.pid --hostname=celery1@ubuntu
root 24111 0.0 0.0 78904 20732 ? S 15:48 0:00 /usr/bin/python -m celery worker -c4 -A proj -l info --logfile=celery1.log --pidfile=1.pid --hostname=celery1@ubuntu
root 24112 0.0 0.0 78904 20740 ? S 15:48 0:00 /usr/bin/python -m celery worker -c4 -A proj -l info --logfile=celery1.log --pidfile=1.pid --hostname=celery1@ubuntu
root 24113 0.0 0.0 78904 20816 ? S 15:48 0:00 /usr/bin/python -m celery worker -c4 -A proj -l info --logfile=celery1.log --pidfile=1.pid --hostname=celery1@ubuntu

Restart:
root@ubuntu:~/zq# celery multi restart 1 --pidfile=1.pid
celery multi v3.1.23 (Cipater)
> Stopping nodes...
    > celery1@ubuntu: TERM -> 24103
> Waiting for 1 node -> 24103.....
    > celery1@ubuntu: OK
> Restarting node celery1@ubuntu: OK

Stop:
root@ubuntu:~/zq# celery multi stop 1 -A proj -l info
celery multi v3.1.23 (Cipater)
    > celery1@ubuntu: DOWN
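As we understand it, celery multi finds the nodes it started through their pidfiles, so restart and stop must be able to locate the pidfile that start wrote. The sketch below illustrates that idea on POSIX only: read the PID from the file and probe it with signal 0, which checks that the process exists without affecting it. node_is_running is a hypothetical helper, not part of celery multi.

```python
import errno
import os


def node_is_running(pidfile):
    """Return True if pidfile names a live process (POSIX only, hypothetical)."""
    try:
        with open(pidfile) as f:
            pid = int(f.read().strip())
    except (OSError, ValueError):
        return False                  # no pidfile, or unreadable/garbled contents
    try:
        os.kill(pid, 0)               # signal 0: existence/permission check only
    except OSError as exc:
        # EPERM: the process exists but belongs to another user.
        return exc.errno == errno.EPERM
    return True


if __name__ == "__main__":
    print(node_is_running("1.pid"))   # the pidfile name used in the session above
```

A stale pidfile whose process has exited counts as "not running" here, because os.kill then fails with ESRCH.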
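Inspecting runtime information
All the inspect and control commands below accept the -d (--destination) option to target one or more specific workers; without it, the command is sent to all workers.
Workers:
For example, the earlier command celery -A tasks worker --loglevel=info starts one worker.
You can also start several workers on one machine, giving each one a name with --hostname (-n):
celery -A tasks worker --loglevel=info -n worker1@%h
celery -A tasks worker --loglevel=info -n worker2@%h
celery -A tasks worker --loglevel=info -n worker3@%h

The hostname argument can expand the following variables:
- %h: the hostname, including the domain name
- %n: the hostname only
- %d: the domain name only

For example:
Variable   Template      Result
%h         worker1@%h    worker1@george.example.com
%n         worker1@%n    worker1@george
%d         worker1@%d    worker1@example.com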
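The %h/%n/%d substitutions in the table take only a few lines to reproduce, which makes the three result columns easy to check. expand_hostname is a hypothetical helper; Celery performs this expansion itself when it parses -n. The function takes the fully qualified host name as an argument, so its result does not depend on the machine it runs on.

```python
import socket


def expand_hostname(template, fqdn=None):
    """Expand %h, %n and %d in a worker-name template (hypothetical helper).

    %h -> full host name, %n -> part before the first dot, %d -> the rest.
    """
    fqdn = fqdn or socket.getfqdn()
    name, _, domain = fqdn.partition(".")
    return (template.replace("%h", fqdn)
                    .replace("%n", name)
                    .replace("%d", domain))


if __name__ == "__main__":
    for template in ("worker1@%h", "worker1@%n", "worker1@%d"):
        print(template, "->", expand_hostname(template, "george.example.com"))
```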
To view the active tasks of one specific worker:
celery -A tasks inspect -d worker1@ubuntu active
Full examples follow; none of them target a specific worker:
root@ubuntu:~/zq# celery -A tasks inspect active
-> celery@ubuntu: OK
    - empty -
root@ubuntu:~/zq# celery -A tasks inspect scheduled
-> celery@ubuntu: OK
    - empty -
root@ubuntu:~/zq# celery -A tasks inspect reserved
-> celery@ubuntu: OK
    - empty -
root@ubuntu:~/zq# celery -A tasks inspect revoked
-> celery@ubuntu: OK
    - empty -
root@ubuntu:~/zq# celery -A tasks inspect registered    # only the add test task here
-> celery@ubuntu: OK
    * tasks.add
root@ubuntu:~/zq# celery -A tasks inspect stats    # current status: broker, pool, etc.
-> celery@ubuntu: OK
    {
        "broker": {
            "alternates": [],
            "connect_timeout": 4,
            "heartbeat": null,
            "hostname": "172.20.3.3",
            "insist": false,
            "login_method": null,
            "port": 6379,
            "ssl": false,
            "transport": "redis",
            "transport_options": {},
            "uri_prefix": null,
            "userid": null,
            "virtual_host": "0"
        },
        "clock": "2836",
        "pid": 25209,
        "pool": {
            "max-concurrency": 8,
            "max-tasks-per-child": "N/A",
            "processes": [25221, 25222, 25223, 25224, 25225, 25226, 25227, 25228],
            "put-guarded-by-semaphore": false,
            "timeouts": [0, 0],
            "writes": {
                "all": "33.33%, 33.33%, 33.33%",
                "avg": "33.33%",
                "inqueues": {
                    "active": 0,
                    "total": 8
                },
                "raw": "1, 1, 1",
                "total": 3
            }
        },
        "prefetch_count": 32,
        "rusage": {
            "idrss": 0,
            "inblock": 24,
            "isrss": 0,
            "ixrss": 0,
            "majflt": 0,
            "maxrss": 25016,
            "minflt": 24667,
            "msgrcv": 0,
            "msgsnd": 0,
            "nivcsw": 280,
            "nsignals": 0,
            "nswap": 0,
            "nvcsw": 6511,
            "oublock": 8,
            "stime": 0.483572,
            "utime": 3.573325
        },
        "total": {
            "tasks.add": 3
        }
    }
# For the meaning of each stats field, see http://docs.celeryproject.org/en/latest/userguide/workers.html#statistics
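The same data should also be reachable from Python through app.control.inspect().stats(), which returns the replies keyed by worker name. The sketch below works on a trimmed copy of the stats reply shown above, so it runs without a broker. summarize_stats is a hypothetical helper that picks out the pool size, the number of pool processes and the total number of tasks executed.

```python
import json

# Trimmed copy of the `celery -A tasks inspect stats` reply shown above,
# keyed by worker name as in the "-> celery@ubuntu: OK" line.
SAMPLE = json.loads("""
{"celery@ubuntu": {
    "pid": 25209,
    "pool": {"max-concurrency": 8,
             "processes": [25221, 25222, 25223, 25224,
                           25225, 25226, 25227, 25228]},
    "prefetch_count": 32,
    "total": {"tasks.add": 3}}}
""")


def summarize_stats(replies):
    """Return {worker: (max_concurrency, n_processes, total_tasks)}."""
    summary = {}
    for worker, stats in replies.items():
        pool = stats.get("pool", {})
        summary[worker] = (
            pool.get("max-concurrency"),
            len(pool.get("processes", [])),
            sum(stats.get("total", {}).values()),   # all task types combined
        )
    return summary


if __name__ == "__main__":
    print(summarize_stats(SAMPLE))
```

The prefetch_count of 32 in the reply equals the concurrency (8) times CELERYD_PREFETCH_MULTIPLIER (4, as listed in the conf output).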
inspect conf shows the worker's configuration, which contains a lot of information:
root@ubuntu:~/zq# celery -A tasks inspect conf
-> celery@ubuntu: OK
    {
        "ADMINS": [],
        "BROKER_CONNECTION_MAX_RETRIES": 100,
        "BROKER_CONNECTION_RETRY": true,
        "BROKER_CONNECTION_TIMEOUT": 4,
        "BROKER_FAILOVER_STRATEGY": null,
        "BROKER_HEARTBEAT": null,
        "BROKER_HEARTBEAT_CHECKRATE": 3.0,
        "BROKER_HOST": null,
        "BROKER_LOGIN_METHOD": null,
        "BROKER_PASSWORD": "********",
        "BROKER_POOL_LIMIT": 10,
        "BROKER_PORT": null,
        "BROKER_TRANSPORT": null,
        "BROKER_TRANSPORT_OPTIONS": {},
        "BROKER_URL": "redis://:********@172.20.3.3:6379/0",
        "BROKER_USER": null,
        "BROKER_USE_SSL": false,
        "BROKER_VHOST": null,
        "CASSANDRA_COLUMN_FAMILY": null,
        "CASSANDRA_DETAILED_MODE": false,
        "CASSANDRA_KEYSPACE": "********",
        "CASSANDRA_READ_CONSISTENCY": null,
        "CASSANDRA_SERVERS": null,
        "CASSANDRA_WRITE_CONSISTENCY": null,
        "CELERYBEAT_LOG_FILE": null,
        "CELERYBEAT_LOG_LEVEL": "INFO",
        "CELERYBEAT_MAX_LOOP_INTERVAL": 0,
        "CELERYBEAT_SCHEDULE": {},
        "CELERYBEAT_SCHEDULER": "celery.beat:PersistentScheduler",
        "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
        "CELERYBEAT_SYNC_EVERY": 0,
        "CELERYD_AGENT": null,
        "CELERYD_AUTORELOADER": "celery.worker.autoreload:Autoreloader",
        "CELERYD_AUTOSCALER": "celery.worker.autoscale:Autoscaler",
        "CELERYD_CONCURRENCY": 0,
        "CELERYD_CONSUMER": "celery.worker.consumer:Consumer",
        "CELERYD_FORCE_EXECV": false,
        "CELERYD_HIJACK_ROOT_LOGGER": true,
        "CELERYD_LOG_COLOR": null,
        "CELERYD_LOG_FILE": null,
        "CELERYD_LOG_FORMAT": "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s",
        "CELERYD_LOG_LEVEL": "WARN",
        "CELERYD_MAX_TASKS_PER_CHILD": null,
        "CELERYD_POOL": "prefork",
        "CELERYD_POOL_PUTLOCKS": true,
        "CELERYD_POOL_RESTARTS": false,
        "CELERYD_PREFETCH_MULTIPLIER": 4,
        "CELERYD_STATE_DB": null,
        "CELERYD_TASK_LOG_FORMAT": "[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s",
        "CELERYD_TASK_SOFT_TIME_LIMIT": null,
        "CELERYD_TASK_TIME_LIMIT": null,
        "CELERYD_TIMER": null,
        "CELERYD_TIMER_PRECISION": 1.0,
        "CELERYD_WORKER_LOST_WAIT": 10.0,
        "CELERYMON_LOG_FILE": null,
        "CELERYMON_LOG_FORMAT": "[%(asctime)s: %(levelname)s] %(message)s",
        "CELERYMON_LOG_LEVEL": "INFO",
        "CELERY_ACCEPT_CONTENT": ["json", "pickle", "msgpack", "yaml"],
        "CELERY_ACKS_LATE": false,
        "CELERY_ALWAYS_EAGER": false,
        "CELERY_ANNOTATIONS": null,
        "CELERY_BROADCAST_EXCHANGE": "celeryctl",
        "CELERY_BROADCAST_EXCHANGE_TYPE": "fanout",
        "CELERY_BROADCAST_QUEUE": "celeryctl",
        "CELERY_CACHE_BACKEND": null,
        "CELERY_CACHE_BACKEND_OPTIONS": {},
        "CELERY_CHORD_PROPAGATES": true,
        "CELERY_COUCHBASE_BACKEND_SETTINGS": null,
        "CELERY_CREATE_MISSING_QUEUES": true,
        "CELERY_DEFAULT_DELIVERY_MODE": 2,
        "CELERY_DEFAULT_EXCHANGE": "celery",
        "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
        "CELERY_DEFAULT_QUEUE": "celery",
        "CELERY_DEFAULT_RATE_LIMIT": null,
        "CELERY_DEFAULT_ROUTING_KEY": "********",
        "CELERY_DISABLE_RATE_LIMITS": false,
        "CELERY_EAGER_PROPAGATES_EXCEPTIONS": false,
        "CELERY_ENABLE_REMOTE_CONTROL": true,
        "CELERY_ENABLE_UTC": true,
        "CELERY_EVENT_QUEUE_EXPIRES": null,
        "CELERY_EVENT_QUEUE_TTL": null,
        "CELERY_EVENT_SERIALIZER": "json",
        "CELERY_IGNORE_RESULT": false,
        "CELERY_IMPORTS": [],
        "CELERY_INCLUDE": ["celery.app.builtins", "tasks"],
        "CELERY_MAX_CACHED_RESULTS": 100,
        "CELERY_MESSAGE_COMPRESSION": null,
        "CELERY_MONGODB_BACKEND_SETTINGS": null,
        "CELERY_QUEUES": null,
        "CELERY_QUEUE_HA_POLICY": null,
        "CELERY_REDIRECT_STDOUTS": true,
        "CELERY_REDIRECT_STDOUTS_LEVEL": "WARNING",
        "CELERY_REDIS_DB": null,
        "CELERY_REDIS_HOST": null,
        "CELERY_REDIS_MAX_CONNECTIONS": null,
        "CELERY_REDIS_PASSWORD": "********",
        "CELERY_REDIS_PORT": null,
        "CELERY_RESULT_BACKEND": "redis://:********@172.20.3.3:6379/1",
        "CELERY_RESULT_DBURI": null,
        "CELERY_RESULT_DB_SHORT_LIVED_SESSIONS": false,
        "CELERY_RESULT_DB_TABLENAMES": null,
        "CELERY_RESULT_ENGINE_OPTIONS": null,
        "CELERY_RESULT_EXCHANGE": "celeryresults",
        "CELERY_RESULT_EXCHANGE_TYPE": "direct",
        "CELERY_RESULT_PERSISTENT": null,
        "CELERY_RESULT_SERIALIZER": "pickle",
        "CELERY_ROUTES": null,
        "CELERY_SECURITY_CERTIFICATE": null,
        "CELERY_SECURITY_CERT_STORE": null,
        "CELERY_SECURITY_KEY": "********",
        "CELERY_SEND_EVENTS": false,
        "CELERY_SEND_TASK_ERROR_EMAILS": false,
        "CELERY_SEND_TASK_SENT_EVENT": false,
        "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": false,
        "CELERY_TASK_PUBLISH_RETRY": true,
        "CELERY_TASK_PUBLISH_RETRY_POLICY": {},
        "CELERY_TASK_RESULT_EXPIRES": "1 day, 0:00:00",
        "CELERY_TASK_SERIALIZER": "pickle",
        "CELERY_TIMEZONE": null,
        "CELERY_TRACK_STARTED": false,
        "CELERY_WORKER_DIRECT": false,
        "EMAIL_HOST": "localhost",
        "EMAIL_HOST_PASSWORD": "********",
        "EMAIL_HOST_USER": null,
        "EMAIL_PORT": 25,
        "EMAIL_TIMEOUT": 2,
        "EMAIL_USE_SSL": false,
        "EMAIL_USE_TLS": false,
        "SERVER_EMAIL": "celery@localhost"
    }
celery control can change some runtime behavior:
[Commands]
| add_consumer <queue> [exchange [type [routing_key]]]
      tell worker(s) to start consuming a queue
| autoscale [max] [min]
      change autoscale settings
| cancel_consumer <queue>
      tell worker(s) to stop consuming a queue
| disable_events
      tell worker(s) to disable events
| enable_events
      tell worker(s) to enable events
| pool_grow [N=1]
      start more pool processes
| pool_shrink [N=1]
      use less pool processes
| rate_limit <task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>
      tell worker(s) to modify the rate limit for a task type
| time_limit <task_name> <soft_secs> [hard_secs]
      tell worker(s) to modify the time limit for a task type.
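rate_limit takes a value such as 5/s, 5/m or 5/h, meaning a number of tasks per second, minute or hour. The same change can also be sent from Python with app.control.rate_limit('tasks.add', '10/m'). The hypothetical parse_rate below converts this notation into tasks per second. It treats a bare number as tasks per second, and it is an illustration rather than Celery's own parser.

```python
def parse_rate(rate):
    """Convert '5/s', '5/m' or '5/h' (or a bare number) to tasks per second."""
    seconds_per_unit = {"s": 1.0, "m": 60.0, "h": 3600.0}
    if isinstance(rate, (int, float)):
        return float(rate)
    count, sep, unit = rate.partition("/")
    if not sep:
        return float(count)                 # bare number: per second
    if unit not in seconds_per_unit:
        raise ValueError("unknown rate unit: %r" % unit)
    return float(count) / seconds_per_unit[unit]


if __name__ == "__main__":
    for r in ("5/s", "5/m", "5/h"):
        print(r, "->", parse_rate(r), "tasks per second")
```

For example, 5/m works out to one task every 12 seconds per worker.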
For example, you can force workers to enable event messages (used for monitoring tasks and workers):
celery -A proj control enable_events

root@ubuntu:~/zq# celery -A proj control enable_events
-> celery@ubuntu: OK
        task events enabled

Worker log:
[2017-07-05 17:00:32,450: INFO/MainProcess] Events of group {task} enabled by remote.
Open a monitoring interface in the terminal:
celery -A proj events
The display looks like this:
To stop monitoring, disable events again:
root@ubuntu:~/zq# celery -A proj control disable_events
-> celery@ubuntu: OK
        task events disabled

Worker log:
[2017-07-05 17:03:39,981: INFO/MainProcess] Events of group {task} disabled by remote.
For more depth, see http://docs.celeryproject.org/en/master/userguide/workers.html#guide-workers