1、簡介
Celery是由Python開發、簡單、靈活、可靠的分佈式任務隊列,其本質是生產者消費者模型,生產者發送任務到消息隊列,消費者負責處理任務。Celery側重於實時操做,但對調度支持也很好,其天天能夠處理數以百萬計的任務。特色:html
- 簡單:熟悉celery的工做流程後,配置使用簡單
- 高可用:當任務執行失敗或執行過程當中發生鏈接中斷,celery會自動嘗試從新執行任務
- 快速:一個單進程的celery每分鐘可處理上百萬個任務
- 靈活:幾乎celery的各個組件均可以被擴展及自定製
應用場景舉例:python
1.web應用:當用戶在網站進行某個操做須要很長時間完成時,咱們能夠將這種操做交給Celery執行,直接返回給用戶,等到Celery執行完成之後通知用戶,大大提好網站的併發以及用戶的體驗感。web
2.任務場景:好比在運維場景下須要批量在幾百臺機器執行某些命令或者任務,此時Celery能夠輕鬆搞定。redis
3.定時任務:向定時導數據報表、定時發送通知相似場景,雖然Linux的計劃任務能夠幫我實現,可是很是不利於管理,而Celery能夠提供管理接口和豐富的API。json
2、架構&工做原理
Celery由如下三部分構成:消息中間件(Broker)、任務執行單元Worker、結果存儲(Backend),以下圖:api
工做原理:promise
- 任務模塊Task包含異步任務和定時任務。其中,異步任務一般在業務邏輯中被觸發併發往消息隊列,而定時任務由Celery Beat進程週期性地將任務發往消息隊列;
- 任務執行單元Worker實時監視消息隊列獲取隊列中的任務執行;
- Woker執行完任務後將結果保存在Backend中;
消息中間件Broker
消息中間件Broker官方提供了不少備選方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推薦RabbitMQ。架構
任務執行單元Worker
Worker是任務執行單元,負責從消息隊列中取出任務執行,它能夠啓動一個或者多個,也能夠啓動在不一樣的機器節點,這就是其實現分佈式的核心。併發
結果存儲Backend
Backend結果存儲官方也提供了諸多的存儲方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch。app
3、安裝使用
這裏我使用的redis做爲消息中間件,redis安裝能夠參考https://www.cnblogs.com/wdliu/p/9360286.html。
Celery安裝:
pip3 install celery
簡單使用
目錄結構:
project/ ├── __init__.py ├── config.py └── tasks.py
各目錄文件說明:
__init__.py:初始化Celery以及加載配置文件
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from celery import Celery app = Celery('project') # 建立 Celery 實例 app.config_from_object('project.config') # 加載配置模塊
config.py: Celery相關配置文件,更多配置參考:http://docs.celeryproject.org/en/latest/userguide/configuration.html
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis做爲消息中間件 CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這裏使用redis CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過時時間 CELERY_TIMEZONE='Asia/Shanghai' # 時區配置 CELERY_IMPORTS = ( # 指定導入的任務模塊,能夠指定多個 'project.tasks', )
tasks.py :任務定義文件
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app @app.task def show_name(name): return name
啓動Worker:
celery worker -A project -l debug
各個參數含義:
worker: 表明第啓動的角色是work固然還有beat等其餘角色;
-A :項目路徑,這裏個人目錄是project
-l:啓動的日誌級別,更多參數使用celery --help查看
查看日誌輸出,會發現咱們定義的任務,以及相關配置:
雖然啓動了worker,可是咱們還須要經過delay或apply_async來將任務添加到worker中,這裏咱們經過交互式方法添加任務,並返回AsyncResult對象,經過AsyncResult對象獲取結果:
AsyncResult除了get方法用於經常使用獲取結果方法外還提如下經常使用方法或屬性:
- state: 返回任務狀態;
- task_id: 返回任務id;
- result: 返回任務結果,同get()方法;
- ready(): 判斷任務是否以及有結果,有結果爲True,不然False;
- info(): 獲取任務信息,默認爲結果;
- wait(t): 等待t秒後獲取結果,若任務執行完畢,則不等待直接獲取結果,若任務在執行中,則wait期間一直阻塞,直到超時報錯;
- successfu(): 判斷任務是否成功,成功爲True,不然爲False;
4、進階使用
對於普通的任務來講可能知足不了咱們的任務需求,因此還須要瞭解一些進階用法,Celery提供了諸多調度方式,例如任務編排、根據任務狀態執行不一樣的操做、重試機制等,如下會對經常使用高階用法進行講述。
定時任務&計劃任務
Celery的提供的定時任務主要靠schedules來完成,經過beat組件週期性將任務發送給woker執行。在示例中,新建文件period_task.py,並添加任務到配置文件中:
period_task.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.schedules import crontab @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): sender.add_periodic_task(10.0, add.s(1,3), name='1+3=') # 每10秒執行add sender.add_periodic_task( crontab(hour=16, minute=56, day_of_week=1), #每週一下午四點五十六執行sayhai sayhi.s('wd'),name='say_hi' ) @app.task def add(x,y): print(x+y) return x+y @app.task def sayhi(name): return 'hello %s' % name
config.py
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis做爲消息中間件 CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這裏使用redis CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過時時間 CELERY_TIMEZONE='Asia/Shanghai' # 時區配置 CELERY_IMPORTS = ( # 指定導入的任務模塊,能夠指定多個 'project.tasks', 'project.period_task', #定時任務 )
啓動worker和beat:
celery worker -A project -l debug #啓動work celery beat -A project.period_task -l debug #啓動beat,注意此時對應的文件路徑
咱們能夠觀察worker日誌:
還能夠經過配置文件方式指定定時和計劃任務,此時的配置文件以下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.schedules import crontab BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis做爲消息中間件 CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這裏使用redis CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過時時間 CELERY_TIMEZONE='Asia/Shanghai' # 時區配置 CELERY_IMPORTS = ( # 指定導入的任務模塊,能夠指定多個 'project.tasks', 'project.period_task', ) app.conf.beat_schedule = { 'period_add_task': { # 計劃任務 'task': 'project.period_task.add', #任務路徑 'schedule': crontab(hour=18, minute=16, day_of_week=1), 'args': (3, 4), }, 'add-every-30-seconds': { # 每10秒執行 'task': 'project.period_task.sayhi', #任務路徑 'schedule': 10.0, 'args': ('wd',) }, }
此時的period_task.py只須要註冊到woker中就好了,以下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app @app.task def add(x,y): print(x+y) return x+y @app.task def sayhi(name): return 'hello %s' % name
一樣啓動worker和beat結果和第一種方式同樣。更多詳細的內容請參考:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules
任務綁定
Celery可經過任務綁定到實例獲取到任務的上下文,這樣咱們能夠在任務運行時候獲取到任務的狀態,記錄相關日誌等。
修改任務中的period_task.py,以下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.utils.log import get_task_logger
logger = get_task_logger(__name__) @app.task(bind=True) # 綁定任務 def add(self,x,y): logger.info(self.request.__dict__) #打印日誌 try: a=[] a[10]==1 except Exception as e: raise self.retry(exc=e, countdown=5, max_retries=3) # 出錯每5秒嘗試一次,總共嘗試3次 return x+y
在以上代碼中,經過bind參數將任務綁定,self指任務的上下文,經過self獲取任務狀態,同時在任務出錯時進行任務重試,咱們觀察日誌:
內置鉤子函數
Celery在執行任務時候,提供了鉤子方法用於在任務執行完成時候進行對應的操做,在Task源碼中提供了不少狀態鉤子函數如:on_success(成功後執行)、on_failure(失敗時候執行)、on_retry(任務重試時候執行)、after_return(任務返回時候執行),在進行使用是咱們只須要重寫這些方法,完成相應的操做便可。
在如下示例中,咱們繼續修改period_task.py,分別定義三個任務來演示任務失敗、重試、任務成功後執行的操做:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.utils.log import get_task_logger from celery import Task logger = get_task_logger(__name__) class demotask(Task): def on_success(self, retval, task_id, args, kwargs): # 任務成功執行 logger.info('task id:{} , arg:{} , successful !'.format(task_id,args)) def on_failure(self, exc, task_id, args, kwargs, einfo): #任務失敗執行 logger.info('task id:{} , arg:{} , failed ! erros : {}' .format(task_id,args,exc)) def on_retry(self, exc, task_id, args, kwargs, einfo): #任務重試執行 logger.info('task id:{} , arg:{} , retry ! einfo: {}'.format(task_id, args, exc)) @app.task(base=demotask,bind=True) def add(self,x,y): try: a=[] a[10]==1 except Exception as e: raise self.retry(exc=e, countdown=5, max_retries=1) # 出錯每5秒嘗試一次,總共嘗試1次 return x+y @app.task(base=demotask) def sayhi(name): a=[] a[10]==1 return 'hi {}'.format(name) @app.task(base=demotask) def sum(a,b): return 'a+b={} '.format(a+b)
此時的配置文件config.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.schedules import crontab BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis做爲消息中間件 CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這裏使用redis CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過時時間 CELERY_TIMEZONE='Asia/Shanghai' # 時區配置 CELERY_IMPORTS = ( # 指定導入的任務模塊,能夠指定多個 'project.tasks', 'project.period_task', ) app.conf.beat_schedule = { 'add': { # 每10秒執行 'task': 'project.period_task.add', #任務路徑 'schedule': 10.0, 'args': (10,12), }, 'sayhi': { # 每10秒執行 'task': 'project.period_task.sayhi', #任務路徑 'schedule': 10.0, 'args': ('wd',), }, 'sum': { # 每10秒執行 'task': 'project.period_task.sum', #任務路徑 'schedule': 10.0, 'args': (1,3), }, }
而後重啓worker和beat,查看日誌:
任務編排
在不少狀況下,一個任務須要由多個子任務或者一個任務須要不少步驟才能完成,Celery一樣也能實現這樣的任務,完成這類型的任務經過如下模塊完成:
-
group: 並行調度任務
-
chain: 鏈式任務調度
-
chord: 相似group,但分header和body2個部分,header能夠是一個group任務,執行完成後調用body的任務
-
map: 映射調度,經過輸入多個入參來屢次調度同一個任務
-
starmap: 相似map,入參相似*args
-
chunks: 將任務按照必定數量進行分組
修改tasks.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app @app.task def add(x,y): return x+y @app.task def mul(x,y): return x*y @app.task def sum(data_list): res=0 for i in data_list: res+=i return res
group: 組任務,組內每一個任務並行執行
和project同級目錄新建consumer.py以下:
from celery import group from project.tasks import add,mul,sum res = group(add.s(1,2),add.s(1,2))() # 任務 [1+2,1+2] while True: if res.ready(): print('res:{}'.format(res.get())) break
結果:
chain:鏈式任務
鏈式任務中,默認上一個任務的返回結果做爲參數傳遞給子任務
from celery import chain from project.tasks import add,mul,sum res = chain(add.s(1,2),add.s(3),mul.s(3))() # 任務((1+2)+3)*3 while True: if res.ready(): print('res:{}'.format(res.get())) break #結果 #res:18
還可使用|表示鏈式任務,上面任務也能夠表示爲:
res = (add.s(1,2) | add.s(3) | (mul.s(3)))() res.get()
chord:任務分割,分爲header和body兩部分,hearder任務執行完在執行body,其中hearder返回結果做爲參數傳遞給body
from celery import chord from project.tasks import add,mul,sum res = chord(header=[add.s(1,2),mul.s(3,4)],body=sum.s())() # 任務(1+2)+(3*4) while True: if res.ready(): print('res:{}'.format(res.get())) break #結果: #res:15
chunks:任務分組,按照任務的個數分組
from project.tasks import add,mul,sum res = add.chunks(zip(range(5),range(5)),4)() # 4 表明每組的任務的個數 while True: if res.ready(): print('res:{}'.format(res.get())) break
結果:
delay &apply_async
對於delay和apply_async均可以用來進行任務的調度,本質上是delay對apply_async進行了再一次封裝(或者能夠說是快捷方式),二者都返回AsyncResult對象,如下是兩個方法源碼。
def delay(self, *args, **kwargs): """Star argument version of :meth:`apply_async`. Does not support the extra options enabled by :meth:`apply_async`. Arguments: *args (Any): Positional arguments passed on to the task. **kwargs (Any): Keyword arguments passed on to the task. Returns: celery.result.AsyncResult: Future promise. """ return self.apply_async(args, kwargs)
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options): """Apply tasks asynchronously by sending a message. Arguments: args (Tuple): The positional arguments to pass on to the task. kwargs (Dict): The keyword arguments to pass on to the task. countdown (float): Number of seconds into the future that the task should execute. Defaults to immediate execution. eta (~datetime.datetime): Absolute time and date of when the task should be executed. May not be specified if `countdown` is also supplied. expires (float, ~datetime.datetime): Datetime or seconds in the future for the task should expire. The task won't be executed after the expiration time. shadow (str): Override task name used in logs/monitoring. Default is retrieved from :meth:`shadow_name`. connection (kombu.Connection): Re-use existing broker connection instead of acquiring one from the connection pool. retry (bool): If enabled sending of the task message will be retried in the event of connection loss or failure. Default is taken from the :setting:`task_publish_retry` setting. Note that you need to handle the producer/connection manually for this to work. retry_policy (Mapping): Override the retry policy used. See the :setting:`task_publish_retry_policy` setting. queue (str, kombu.Queue): The queue to route the task to. This must be a key present in :setting:`task_queues`, or :setting:`task_create_missing_queues` must be enabled. See :ref:`guide-routing` for more information. exchange (str, kombu.Exchange): Named custom exchange to send the task to. Usually not used in combination with the ``queue`` argument. routing_key (str): Custom routing key used to route the task to a worker server. If in combination with a ``queue`` argument only used to specify custom routing keys to topic exchanges. priority (int): The task priority, a number between 0 and 9. Defaults to the :attr:`priority` attribute. serializer (str): Serialization method to use. Can be `pickle`, `json`, `yaml`, `msgpack` or any custom serialization method that's been registered with :mod:`kombu.serialization.registry`. Defaults to the :attr:`serializer` attribute. compression (str): Optional compression method to use. Can be one of ``zlib``, ``bzip2``, or any custom compression methods registered with :func:`kombu.compression.register`. Defaults to the :setting:`task_compression` setting. link (Signature): A single, or a list of tasks signatures to apply if the task returns successfully. link_error (Signature): A single, or a list of task signatures to apply if an error occurs while executing the task. producer (kombu.Producer): custom producer to use when publishing the task. add_to_parent (bool): If set to True (default) and the task is applied while executing another task, then the result will be appended to the parent tasks ``request.children`` attribute. Trailing can also be disabled by default using the :attr:`trail` attribute publisher (kombu.Producer): Deprecated alias to ``producer``. headers (Dict): Message headers to be included in the message. Returns: celery.result.AsyncResult: Promise of future evaluation. Raises: TypeError: If not enough arguments are passed, or too many arguments are passed. Note that signature checks may be disabled by specifying ``@task(typing=False)``. kombu.exceptions.OperationalError: If a connection to the transport cannot be made, or if the connection is lost. Note: Also supports all keyword arguments supported by :meth:`kombu.Producer.publish`. """ if self.typing: try: check_arguments = self.__header__ except AttributeError: # pragma: no cover pass else: check_arguments(*(args or ()), **(kwargs or {})) app = self._get_app() if app.conf.task_always_eager: with denied_join_result(): return self.apply(args, kwargs, task_id=task_id or uuid(), link=link, link_error=link_error, **options) if self.__v2_compat__: shadow = shadow or self.shadow_name(self(), args, kwargs, options) else: shadow = shadow or self.shadow_name(args, kwargs, options) preopts = self._get_exec_options() options = dict(preopts, **options) if options else preopts options.setdefault('ignore_result', self.ignore_result) return app.send_task( self.name, args, kwargs, task_id=task_id, producer=producer, link=link, link_error=link_error, result_cls=self.AsyncResult, shadow=shadow, task_type=self, **options )
對於其使用,apply_async支持經常使用參數:
- eta:指定任務執行時間,類型爲datetime時間類型;
- countdown:倒計時,單位秒,浮點類型;
- expires:任務過時時間,若是任務在超過過時時間還未執行則回收任務,浮點類型獲取datetime類型;
- retry:任務執行失敗時候是否嘗試,布爾類型。;
- serializer:序列化方案,支持pickle、json、yaml、msgpack;
- priority:任務優先級,有0~9優先級可設置,int類型;
- retry_policy:任務重試機制,其中包含幾個重試參數,類型是dict以下:
max_retries:最大重試次數 interval_start:重試等待時間 interval_step:每次重試疊加時長,假設第一重試等待1s,第二次等待1+n秒 interval_max:最大等待時間 ####示例 add.apply_async((1, 3), retry=True, retry_policy={ 'max_retries': 1, 'interval_start': 0, 'interval_step': 0.8, 'interval_max': 5, })
更多參數參考:http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async
5、管理與監控
Celery管理和監控功能是經過flower組件實現的,flower組件不只僅提供監控功能,還提供HTTP API可實現對woker和task的管理。
安裝使用
pip3 install flower
啓動
flower -A project --port=5555 # -A :項目目錄 #--port 指定端口
訪問http:ip:5555
api使用,例如獲取woker信息:
curl http://127.0.0.1:5555/api/workers
結果:
更多api參考:https://flower.readthedocs.io/en/latest/api.html