celery (1)

An asynchronous, distributed task queue.

Language: Python

Documentation: http://docs.celeryproject.org/en/master/getting-started/introduction.html

 

Installation: pip install celery, using Redis as the broker (RabbitMQ also works).
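To pull in the Redis client library at the same time, Celery's redis bundle can be used; a minimal sketch (assuming a Celery 4.x install):

pip install "celery[redis]"

Running the bare celery command afterwards prints the command overview shown below: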

(venv) l@l:~/pycharm/py3$ celery
usage: celery <command> [options] 

Show help screen and exit.

positional arguments:
  args

optional arguments:
  -h, --help            show this help message and exit
  --version             show program's version number and exit

Global Options:
  -A APP, --app APP
  -b BROKER, --broker BROKER
  --loader LOADER
  --config CONFIG
  --workdir WORKDIR
  --no-color, -C
  --quiet, -q

---- -- - - ---- Commands- -------------- --- ------------

+ Main: 
|    celery worker
|    celery events
|    celery beat
|    celery shell
|    celery multi
|    celery amqp

+ Remote Control: 
|    celery status
 
|    celery inspect --help
|    celery inspect active 
|    celery inspect active_queues 
|    celery inspect clock 
|    celery inspect conf [include_defaults=False]
|    celery inspect memdump [n_samples=10]
|    celery inspect memsample 
|    celery inspect objgraph [object_type=Request] [num=200 [max_depth=10]]
|    celery inspect ping 
|    celery inspect query_task [id1 [id2 [... [idN]]]]
|    celery inspect registered [attr1 [attr2 [... [attrN]]]]
|    celery inspect report 
|    celery inspect reserved 
|    celery inspect revoked 
|    celery inspect scheduled 
|    celery inspect stats 
 
|    celery control --help
|    celery control add_consumer <queue> [exchange [type [routing_key]]]
|    celery control autoscale [max [min]]
|    celery control cancel_consumer <queue>
|    celery control disable_events 
|    celery control election 
|    celery control enable_events 
|    celery control heartbeat 
|    celery control pool_grow [N=1]
|    celery control pool_restart 
|    celery control pool_shrink [N=1]
|    celery control rate_limit <task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>
|    celery control revoke [id1 [id2 [... [idN]]]]
|    celery control shutdown 
|    celery control terminate <signal> [id1 [id2 [... [idN]]]]
|    celery control time_limit <task_name> <soft_secs> [hard_secs]

+ Utils: 
|    celery purge
|    celery list
|    celery call
|    celery result
|    celery migrate
|    celery graph
|    celery upgrade

+ Debugging: 
|    celery report
|    celery logtool
---- -- - - --------- -- - -------------- --- ------------

Type 'celery <command> --help' for help using a specific command.
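Many of the inspect/control commands above are also reachable from Python; a minimal sketch, assuming the app object from the lx.py module shown later and at least one running worker:

from lx import app

# roughly equivalent to `celery -A lx inspect ping` / `celery -A lx inspect registered`
print(app.control.ping(timeout=1.0))
print(app.control.inspect().registered())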

First, set a password for Redis (in the session below, requirepass was already configured, so unauthenticated commands fail until AUTH is issued):

127.0.0.1:6379> CONFIG SET requirepass lyb
(error) NOAUTH Authentication required.
127.0.0.1:6379> exit
l@l:~/one$ redis-cli
127.0.0.1:6379> keys *
(error) NOAUTH Authentication required.
127.0.0.1:6379> ping
(error) NOAUTH Authentication required.
127.0.0.1:6379> auth 'lyb'
OK
127.0.0.1:6379> ping
PONG

Usage:

# lx.py  — the worker module; it executes tasks, which must be defined in advance, e.g. add_2

from celery import Celery

app = Celery('tasks',                                   # app name
             broker='redis://:lyb@localhost:6379/0',    # redis://:password@hostname:port/db_number
             backend='redis://:lyb@localhost:6379/0')   # where task results are stored

@app.task
def add_2(x, y):     # the task the worker executes
    print('x + y :')
    return x + y
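For context, a quick sketch of how such a task is invoked (standard Celery task API; the countdown value is just an illustrative assumption):

add_2(4, 7)                                    # plain call: runs locally, no broker involved
add_2.delay(4, 7)                              # sends the task to the broker, returns an AsyncResult
add_2.apply_async(args=(4, 7), countdown=10)   # like delay(), with extra options (run ~10s later)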

Start the worker. Multiple workers can be started and they compete for tasks from the same queue (see the sketch after the command below).

(venv) l@l:~/celerylx$ celery -A lx worker -l debug
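To run several workers against the same queue, each one just needs a distinct node name; a sketch (the -n and --concurrency flags are standard Celery worker options, the names w1/w2 are arbitrary):

celery -A lx worker -l info -n w1@%h --concurrency=4
celery -A lx worker -l info -n w2@%h --concurrency=4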

app.__dict__:

{
'steps': defaultdict(<class 'set'>, {'consumer': set(), 'worker': set()}), 
'_using_v1_reduce': None, 
'_config_source': None, 
'on_after_fork': <Signal: app.on_after_fork providing_args=set()>, 
'set_as_current': True, 
'fixups': {'celery.fixups.django:fixup'}, 
'_preconf_set_by_auto': {'result_backend', 'broker_url'}, 
'log': <celery.app.log.Logging object at 0x7f28a180f048>, 
'on_after_configure': <Signal: app.on_after_configure providing_args={'source'}>, 
'annotations': (), 
'control_cls': 'celery.app.control:Control', 
'namespace': None, 
'user_options': defaultdict(<class 'set'>, {'preload': set(), 'worker': set()}), 
'autofinalize': True, 
'log_cls': 'celery.app.log:Logging', 
'_pending_defaults': deque([]), 
'_fixups': [None], 
'backend': <celery.backends.redis.RedisBackend object at 0x7f28a14466a0>, 
'Worker': <class 'celery.apps.worker.Worker'>, 
'strict_typing': True, 
'_tasks': {'celery.accumulate': <@task: celery.accumulate of tasks at 0x7f28a18d55c0>, 'celery.chord': <@task: celery.chord of tasks at 0x7f28a18d55c0>, 'celery.map': <@task: celery.map of tasks at 0x7f28a18d55c0>, 'celery.starmap': <@task: celery.starmap of tasks at 0x7f28a18d55c0>, 'celery.group': <@task: celery.group of tasks at 0x7f28a18d55c0>, 'celery.chord_unlock': <@task: celery.chord_unlock of tasks at 0x7f28a18d55c0>, 'celery.chunks': <@task: celery.chunks of tasks at 0x7f28a18d55c0>, 'celery.backend_cleanup': <@task: celery.backend_cleanup of tasks at 0x7f28a18d55c0>, 'lx.add_2': <@task: lx.add_2 of tasks at 0x7f28a18d55c0>, 'celery.chain': <@task: celery.chain of tasks at 0x7f28a18d55c0>}, 
'clock': <LamportClock: 0>, 
'task_cls': 'celery.app.task:Task', 
'loader_cls': 'celery.loaders.app:AppLoader', 
'_pending_periodic_tasks': deque([]), 
'on_configure': <Signal: app.on_configure providing_args=set()>, 
'finalized': True, 
'GroupResult': <class 'celery.result.GroupResult'>, 
'_preconf': {'result_backend': 'redis://:lyb@localhost:6379/0', 'broker_url': 'redis://:lyb@localhost:6379/0'}, 
'registry_cls': <class 'celery.app.registry.TaskRegistry'>, 
'amqp': <celery.app.amqp.AMQP object at 0x7f28a2772208>, 
'on_after_finalize': <Signal: app.on_after_finalize providing_args=set()>, 
'_finalize_mutex': <unlocked _thread.lock object at 0x7f28a1927fa8>, 
'_pending': deque([]), 
'main': 'tasks', 
'loader': <celery.loaders.app.AppLoader object at 0x7f28a18e6278>, 
'events_cls': 'celery.app.events:Events', 
'events': <celery.app.events.Events object at 0x7f28a140f2e8>, 
'configured': True, 
'_conf': Settings({'include': ('lx', 'celery.app.builtins'), 'result_backend': 'redis://:lyb@localhost:6379/0', 'broker_url': 'redis://:lyb@localhost:6379/0'}, {}, {'broker_transport_options': {}, 'result_exchange': 'celeryresults', 'database_engine_options': None, 'couchbase_backend_settings': None, 'broker_password': None, 'worker_enable_remote_control': True, 'cassandra_keyspace': None, 'broker_write_url': None, 'broker_vhost': None, 'cassandra_write_consistency': None, 'broker_port': None, 'security_cert_store': None, 'broker_read_url': None, 'cache_backend': None, 'worker_hijack_root_logger': True, 'task_store_errors_even_if_ignored': False, 'worker_task_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s', 'cassandra_servers': None, 'beat_sync_every': 0, 'worker_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s', 'imports': (), 'task_queue_max_priority': None, 'elasticsearch_timeout': None, 'worker_pool': 'prefork', 'broker_connection_retry': True, 'worker_max_memory_per_child': None, 'task_queue_ha_policy': None, 'worker_direct': False, 'task_default_routing_key': None, 'beat_schedule_filename': 'celerybeat-schedule', 'task_acks_late': False, 'task_track_started': False, 'redis_max_connections': None, 'result_backend': 'redis://:lyb@localhost:6379/0', 'task_default_queue': 'celery', 'redis_backend_use_ssl': None, 'cassandra_auth_kwargs': None, 'cassandra_auth_provider': None, 'task_eager_propagates': False, 'task_compression': None, 'broker_use_ssl': False, 'result_persistent': None, 'worker_redirect_stdouts_level': 'WARNING', 'task_soft_time_limit': None, 'timezone': None, 'elasticsearch_retry_on_timeout': None, 'result_cache_max': -1, 'elasticsearch_max_retries': None, 'cassandra_entry_ttl': None, 'cassandra_table': None, 'database_short_lived_sessions': False, 'task_queues': None, 'task_default_rate_limit': None, 'task_publish_retry_policy': {'max_retries': 3, 'interval_max': 1, 'interval_start': 0, 'interval_step': 0.2}, 'task_create_missing_queues': True, 'worker_timer_precision': 1.0, 'broker_connection_timeout': 4, 'include': (), 'task_routes': None, 'task_always_eager': False, 'cassandra_read_consistency': None, 'control_queue_expires': 10.0, 'result_compression': None, 'beat_max_loop_interval': 0, 'worker_max_tasks_per_child': None, 'task_default_delivery_mode': 2, 'cassandra_port': None, 'task_remote_tracebacks': False, 'result_serializer': 'json', 'task_default_exchange_type': 'direct', 'result_exchange_type': 'direct', 'redis_socket_connect_timeout': None, 'control_queue_ttl': 300.0, 'database_table_names': None, 'worker_prefetch_multiplier': 4, 'broker_heartbeat': 120, 'accept_content': ['json'], 'broker_failover_strategy': None, 'event_queue_expires': 60.0, 'worker_pool_restarts': False, 'event_queue_ttl': 5.0, 'worker_autoscaler': 'celery.worker.autoscale:Autoscaler', 'task_serializer': 'json', 'worker_concurrency': 0, 'worker_redirect_stdouts': True, 'redis_port': None, 'worker_log_color': None, 'worker_disable_rate_limits': False, 'task_annotations': None, 'event_queue_prefix': 'celeryev', 'redis_socket_timeout': 120.0, 'worker_consumer': 'celery.worker.consumer:Consumer', 'cache_backend_options': {}, 'broker_user': None, 'worker_timer': None, 'redis_password': None, 'result_expires': datetime.timedelta(1), 'task_reject_on_worker_lost': None, 'enable_utc': True, 'task_ignore_result': False, 'broker_host': None, 'worker_send_task_events': False, 'broker_connection_max_retries': 100, 
'riak_backend_settings': None, 'task_default_exchange': None, 'broker_pool_limit': 10, 'database_url': None, 'worker_agent': None, 'task_publish_retry': True, 'event_serializer': 'json', 'beat_scheduler': 'celery.beat:PersistentScheduler', 'broker_url': 'redis://:lyb@localhost:6379/0', 'redis_db': None, 'security_certificate': None, 'worker_lost_wait': 10.0, 'mongodb_backend_settings': None, 'beat_schedule': {}, 'broker_transport': None, 'task_time_limit': None, 'worker_pool_putlocks': True, 'task_send_sent_event': False, 'worker_state_db': None, 'security_key': None, 'task_protocol': 2, 'broker_heartbeat_checkrate': 3.0, 'redis_host': None, 'broker_login_method': None}), 
'Task': <class 'celery.app.task.Task'>, 
'AsyncResult': <class 'celery.result.AsyncResult'>, 
'control': <celery.app.control.Control object at 0x7f28a1435780>, 
'amqp_cls': 'celery.app.amqp:AMQP', 
'tasks': {'celery.accumulate': <@task: celery.accumulate of tasks at 0x7f28a18d55c0>, 
'celery.chord': <@task: celery.chord of tasks at 0x7f28a18d55c0>, 
'celery.map': <@task: celery.map of tasks at 0x7f28a18d55c0>, 
'celery.starmap': <@task: celery.starmap of tasks at 0x7f28a18d55c0>, 
'celery.group': <@task: celery.group of tasks at 0x7f28a18d55c0>, 
'celery.chord_unlock': <@task: celery.chord_unlock of tasks at 0x7f28a18d55c0>, 
'celery.chunks': <@task: celery.chunks of tasks at 0x7f28a18d55c0>, 
'celery.backend_cleanup': <@task: celery.backend_cleanup of tasks at 0x7f28a18d55c0>, 
'lx.add_2': <@task: lx.add_2 of tasks at 0x7f28a18d55c0>, 
'celery.chain': <@task: celery.chain of tasks at 0x7f28a18d55c0>}}
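Rather than poking at app.__dict__, the supported way to read this configuration is app.conf; a minimal sketch:

print(app.conf.broker_url)        # 'redis://:lyb@localhost:6379/0'
print(app.conf.result_backend)    # 'redis://:lyb@localhost:6379/0'
print(sorted(app.tasks))          # registered task names, including 'lx.add_2'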

 

Client-side calls:

>>> import lx
>>> a = lx.add_2.delay(4,7)  # returns an AsyncResult holding the task id
>>> b = lx.mul.delay(6,6)    # mul is another task defined the same way in lx.py
>>> a
<AsyncResult: 0c2b4d9e-c99d-4456-8569-2c993317fe21>
>>> b
<AsyncResult: 085c826e-3b26-4d3e-8f95-afd071057797>

>>> a.get()  # fetch the result
11
>>> b.get()
36
>>> a.ready()  # check whether the task has finished
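AsyncResult offers a few more methods worth knowing; a small sketch (standard result API, the timeout value is an arbitrary choice):

a.ready()                          # True once the task has finished
a.successful()                     # True if it finished without raising
a.status                           # e.g. 'PENDING', 'SUCCESS', 'FAILURE'
a.get(timeout=5, propagate=False)  # wait at most 5s; don't re-raise task exceptions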

 

Using Celery with Django: http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#django-first-steps

1. In the project's main package (alongside settings.py), create celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

app = Celery('mysite_celery')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()    # automatically discover tasks.py in every installed Django app


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

2. In the same package's __init__.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

3. In the app app01, create tasks.py and define the tasks:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task  # shared_task keeps the task reusable beyond this project, not bound to one app instance


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)
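For reference, the resulting layout (assuming the project is called mysite and the app app01, as in the snippets above):

mysite/
├── manage.py
├── mysite/
│   ├── __init__.py      # imports celery_app (step 2)
│   ├── settings.py
│   ├── celery.py        # the Celery app (step 1)
│   └── urls.py
└── app01/
    ├── tasks.py         # shared tasks (step 3)
    └── views.py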

4. settings.py:

# for Celery
CELERY_BROKER_URL = 'redis://:lyb@localhost:6379/0'      # a trailing comma here would turn this into a tuple
CELERY_RESULT_BACKEND = 'redis://:lyb@localhost:6379/0'
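Because of namespace='CELERY' in celery.py, any lowercase Celery setting can be set here with a CELERY_ prefix; a hedged sketch of a few common ones (the values are illustrative assumptions):

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_TRACK_STARTED = True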

5. In views.py:

from django.shortcuts import render,HttpResponse,redirect
from . import tasks

def index(request):
    t1 = tasks.add.delay(1,2)
    res = t1.get()
    return HttpResponse(res)

6. Start the worker:

(venv) l@l:~/$ celery -A mysite worker -l info

7. Start the Django project and open the view in a browser; the result appears in the response.

 

Improvement

Grab the task_id in the view and fetch the value by id later (a fuller two-view sketch follows the code):

from django.shortcuts import render, HttpResponse, redirect
from . import tasks
from celery.result import AsyncResult

def index(request):
    t1 = tasks.add.delay(1, 2)
    task_id = t1.task_id

    print(AsyncResult(id=task_id).get())
    return HttpResponse(t1)
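Since calling .get() inside the same request still blocks, a more typical pattern is to return the task id immediately and expose a second view that checks the result later; a rough sketch, assuming URLs are routed to these views (the view names here are hypothetical):

from django.http import JsonResponse
from celery.result import AsyncResult
from . import tasks

def submit_view(request):
    t1 = tasks.add.delay(1, 2)
    return JsonResponse({'task_id': t1.task_id})    # respond right away, no blocking

def result_view(request, task_id):
    res = AsyncResult(id=task_id)
    if res.ready():
        return JsonResponse({'status': res.status, 'result': res.get()})
    return JsonResponse({'status': res.status})     # still pending or running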