Celery: Comprehensive Study Notes

Introduction

Instructor's blog: https://www.cnblogs.com/alex3714/articles/6351797.html
Documentation (the getting-started part is in Chinese): http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#first-steps
More material online: http://www.javashuo.com/article/p-fqrpploi-de.html

Celery is a Distributed Task Queue. "Distributed" means there can be multiple workers; "queue" means the operations are asynchronous.

Celery Core Modules

Celery has the following 5 core roles:
Task
A task, either an asynchronous task or a periodic (scheduled) task.
Broker
The middleman: it receives messages (i.e. Tasks) from producers and puts them into a queue. The consumers of the tasks are the Workers. Celery itself does not provide a queue service; Redis or RabbitMQ is recommended for that.
Worker
The unit that executes tasks. It monitors the message queue in real time and, whenever there is a task, fetches and executes it.
Beat
The periodic task scheduler: according to the configuration, it sends tasks to the Broker on schedule.
Backend
Stores the results of task execution.
The relationships between the roles can be seen in the figure below:
[figure: relationship between Task, Broker, Worker, Beat, and Backend]
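
Before diving into the setup, here is a minimal sketch of how the five roles map onto code and commands (illustrative only; the broker/backend addresses and the module name "demo" are placeholders):

from celery import Celery

app = Celery('demo',
             broker='redis://127.0.0.1:6379/0',    # Broker: where tasks are queued
             backend='redis://127.0.0.1:6379/0')   # Backend: where results are stored

@app.task              # Task: a plain function registered with the app
def add(x, y):
    return x + y

# Worker: a separate process that consumes tasks from the Broker
#     celery -A demo worker -l info
# Beat: a separate process that sends scheduled tasks to the Broker
#     celery -A demo beat -l info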

Installation

Starting with Celery 4.x, the Windows platform is no longer supported. 3.1.26 is currently the last and latest 3.x release; below, 3.1.25 is installed.

pip install celery
pip install celery==3.1.25

The only recommended brokers are RabbitMQ and Redis. With RabbitMQ you just need the service itself ready; no extra Python module is required.
If you want to use Redis, you also need a Redis service, plus the redis module:

pip install redis

Alternatively, the following command installs Celery together with the redis dependency in one go:

pip install -U 'celery[redis]'

Verification

Use the command celery --version to check the version and, incidentally, verify the installation:

>celery --version
'celery' is not recognized as an internal or external command,
operable program or batch file.

The error here is because celery was not added to the PATH, so the program cannot be found. I don't want to add it, so typing the full path works just as well:

>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery --version
3.1.25 (Cipater)

基本操做

Here we run a simple task and, at the end, fetch its execution result.

Creating a task

First write a piece of code like the following:

# task1.py

from celery import Celery

# Create the Celery instance
app = Celery('tasks',
             broker='redis://192.168.246.11:6379/0',
             )

# Create a task
@app.task
def add(x, y):
    print("計算2個值的和: %s %s" % (x, y))
    return x+y

If you use RabbitMQ, change the broker to broker='amqp://192.168.3.108'.

Starting the Worker

Start a Celery Worker to begin listening for and executing tasks:

$ celery -A task1 worker --loglevel=info
$ celery -A task1 worker -l debug  # or start it this way

The -A argument is followed by the Celery instance; the instance name can be omitted, the full form being task1.app. You need to cd into the directory containing task1 before running the command, or find a way to add that directory to Python's path, because whatever follows -A is imported as a Python module. So, as shown below, I also got the Worker running this way:

G:\>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery -A Steed.Documents.PycharmProjects.Celery.task1 worker --loglevel=info
[2018-09-28 17:55:10,715: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@IDX-xujf v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1fb5056fda0
- ** ---------- .> transport:   redis://192.168.246.11:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . Steed.Documents.PycharmProjects.Celery.task1.add

[2018-09-28 17:55:10,864: INFO/MainProcess] Connected to redis://192.168.246.11:6379/0
[2018-09-28 17:55:10,922: INFO/MainProcess] mingle: searching for neighbors
[2018-09-28 17:55:11,961: INFO/MainProcess] mingle: all alone
[2018-09-28 17:55:11,980: WARNING/MainProcess] celery@IDX-xujf ready.

Calling tasks

To send a task to the Worker, call the delay() method. The following was done in IDLE:

>>> import sys
>>> dir = r"G:\Steed\Documents\PycharmProjects\Celery"
>>> sys.path.append(dir)  # my task file is not on the path, IDLE can't find it
>>> from task1 import add
>>> add.delay(1, 2)
<AsyncResult: 4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a>
>>>

The Worker printed the following:

[2018-09-29 11:10:33,103: INFO/MainProcess] Received task: task1.add[4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a]
[2018-09-29 11:10:33,107: WARNING/Worker-1] 計算2個值的和: 1 2
[2018-09-29 11:10:33,109: INFO/MainProcess] Task task1.add[4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a] succeeded in 0s: 3

The call above only sends the task; the result cannot be retrieved. We also did not keep the return value; this time, save it:

>>> t = add.delay(3, 4)
>>> type(t)  # check the type of the return value
<class 'celery.result.AsyncResult'>
>>> t.get()  # this line will raise an error
Traceback (most recent call last):
  File "<pyshell#16>", line 1, in <module>
    t.get()
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\Lib\site-packages\celery\result.py", line 169, in get
    no_ack=no_ack,
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\Lib\site-packages\celery\backends\base.py", line 616, in _is_disabled
    'No result backend configured.  '
NotImplementedError: No result backend configured.  Please see the documentation for more information.

This is because when the app was instantiated, no backend was defined, i.e. the place where task results are stored.

Getting the return result

Modify the original task code and add the backend argument when instantiating, to specify where task results are stored. Here the results go into the same Redis:

from celery import Celery

app = Celery('tasks',
             broker='redis://192.168.246.11',
             backend='redis://192.168.246.11',  # this time the port number etc. are omitted
             )

@app.task
def add(x, y):
    print("計算2個值的和: %s %s" % (x, y))
    return x+y

Then restart the Worker, and restart IDLE too. Now the task's return value can be retrieved:

>>> t = add.delay(1, 1)
>>> t.get()
2
>>>

If you use RabbitMQ, the app initialization is written like this:

app = Celery('tasks',
             broker='amqp://192.168.3.108',
             backend='rpc://192.168.3.108',  # in newer versions rpc gradually replaces amqp as the result backend; it still uses RabbitMQ
             # backend='amqp://192.168.3.108',  # on older versions without rpc, amqp is the only option
             )

其餘操做

get blocks
The tasks above finish too quickly; prepare a task that takes a while to run:

import time

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

Calling get blocks until the task returns its result, which defeats the purpose of being asynchronous:

>>> t = upper.delay("abc")
>>> t.get()
'ABC'

ready: check whether the task is done, without blocking
The ready() method returns whether the task has finished. Wait until it returns True and then call get; the result comes back immediately:

>>> t = upper.delay("abcd")
>>> t.ready()
False
>>> t.ready()
False
>>> t.ready()
False
>>> t.ready()
True
>>> t.get()
'ABCD'
>>>

Setting a timeout on get
You can also give get a timeout; if it times out, an exception is raised:

>>> t = upper.delay("abcde")
>>> t.get(timeout=11)
'ABCDE'
>>> t = upper.delay("abcde")
>>> t.get(timeout=1)
Traceback (most recent call last):
  File "<pyshell#17>", line 1, in <module>
    t.get(timeout=1)
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\result.py", line 169, in get
    no_ack=no_ack,
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\backends\base.py", line 238, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
>>>

Task errors
If a task raises an error during execution, for example running this task:

>>> t = upper.delay(123)
>>>

then the Worker side shows the error:

[2018-09-29 12:57:07,077: ERROR/MainProcess] Task task1.upper[11820ee6-6936-4680-93c2-462487ec927e] raised unexpected: AttributeError("'int' object has no attribute 'upper'",)
Traceback (most recent call last):
  File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\app\trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\app\trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "G:\Steed\Documents\PycharmProjects\Celery\task1.py", line 25, in upper
    return v.upper()
AttributeError: 'int' object has no attribute 'upper'

Then, when you get the result, the error is re-raised as an exception, which is not very friendly:

>>> t = upper.delay(123)
>>> t.get()
Traceback (most recent call last):
  File "<pyshell#27>", line 1, in <module>
    t.get()
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\result.py", line 175, in get
    raise meta['result']
AttributeError: 'int' object has no attribute 'upper'
>>>

Making get return only the error result, without raising an exception

>>> t.get(propagate=False)
AttributeError("'int' object has no attribute 'upper'",)
>>>

The traceback attribute holds the error details

>>> t.traceback
'Traceback (most recent call last):\n  File "g:\\steed\\documents\\pycharmprojects\\venv\\celery\\lib\\site-packages\\celery\\app\\trace.py", line 240, in trace_task\n    R = retval = fun(*args, **kwargs)\n  File "g:\\steed\\documents\\pycharmprojects\\venv\\celery\\lib\\site-packages\\celery\\app\\trace.py", line 438, in __protected_call__\n    return self.run(*args, **kwargs)\n  File "G:\\Steed\\Documents\\PycharmProjects\\Celery\\task1.py", line 25, in upper\n    return v.upper()\nAttributeError: \'int\' object has no attribute \'upper\'\n'
>>>

Summary

Start a Celery Worker to listen for and execute tasks

$ celery -A tasks worker --loglevel=info

Call a task

>>> from tasks import add
>>> t = add.delay(4, 4)

Get the result synchronously

>>> t.get()
>>> t.get(timeout=1)

Check whether the task is done

>>> t.ready()

If it failed, get the error result without raising an exception

>>> t.get(propagate=False)
>>> t.traceback  # the detailed traceback of the exception

Using Celery in a Project

Celery can be set up as an application. Suppose the application is called CeleryPro; the directory layout is as follows:

CeleryPro
├─__init__.py
├─celery.py
├─tasks.py

The connection file here must be named celery.py; the other names are up to you.

The celery file

This file must be named celery.py:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('CeleryPro',
             broker='redis://192.168.246.11',
             backend='redis://192.168.246.11',
             include=['CeleryPro.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

The first line is from __future__ import absolute_import, unicode_literals. unicode_literals makes string literals unicode by default (a Python 2 concern); absolute_import turns on absolute imports. Because this file itself is named celery, a plain from celery would by default import this very file, but what we actually need is the celery package, so absolute imports are used. To import this file instead, write from .celery with a leading dot; tasks.py below uses that.

The tasks file

The first two lines of this file just gain an extra dot, importing the celery.py file above. After that, simply write the various tasks with the decorator:

from __future__ import absolute_import, unicode_literals
from .celery import app
import time

@app.task
def add(x, y):
    print("計算2個值的和: %s %s" % (x, y))
    return x+y

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

Starting the worker

When starting, the -A argument is followed by the application name, CeleryPro. You also need to cd to the parent directory of CeleryPro before starting, otherwise it cannot be found:

>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery -A CeleryPro worker -l info
[2018-09-29 15:06:20,818: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@IDX-xujf v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         CeleryPro:0x21deadaf470
- ** ---------- .> transport:   redis://192.168.246.11:6379//
- ** ---------- .> results:     redis://192.168.246.11/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.tasks.add
  . CeleryPro.tasks.upper

[2018-09-29 15:06:20,953: INFO/MainProcess] Connected to redis://192.168.246.11:6379//
[2018-09-29 15:06:20,983: INFO/MainProcess] mingle: searching for neighbors
[2018-09-29 15:06:21,994: INFO/MainProcess] mingle: all alone
[2018-09-29 15:06:22,055: WARNING/MainProcess] celery@IDX-xujf ready.

各類啓動的姿式
這裏注意用的都是CeleryPro:

celery -A CeleryPro worker --loglevel=info  # start in the foreground, not recommended
celery -A CeleryPro worker -l info  # foreground, short form
celery multi start w1 -A CeleryPro -l info  # background start, recommended

Calling tasks

Calling tasks is likewise done from the parent directory of CeleryPro, and everything else works the same as before.
All operations must be performed from the parent directory of CeleryPro; in other words, just make sure that directory is on the Python path, or add it with sys.path.append().
Think of this as packaging Celery as an application inside your project, with everything kept under the CeleryPro folder. CeleryPro then acts as a module of your project, and since the project's root directory is added to the path when the project starts, using it inside the project afterwards should be very convenient. A short sketch of calling a task this way follows below.
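
A minimal sketch of calling the packaged tasks from an arbitrary script, assuming the same CeleryPro package as above, a running Worker, and that the parent directory is not already on the path:

import sys
sys.path.append(r"G:\Steed\Documents\PycharmProjects\Celery")  # parent directory of CeleryPro

from CeleryPro.tasks import add

t = add.delay(2, 3)
print(t.get(timeout=10))  # 5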

Starting multiple Workers in the background

Start commands:

  • celery -A <project name> worker --loglevel=info : start in the foreground
  • celery multi start w1 -A <project name> -l info : start in the background
  • celery multi restart w1 -A <project name> -l info : restart the background worker
  • celery multi stop w1 -A <project name> -l info : stop the background worker

The difference between foreground and background: background workers are started via multi.
w1 is the worker's name; you can start multiple workers in the background, each with its own name.
Even if all the workers are down, tasks submitted by users are all kept until a worker comes along to execute them and return results.
If a worker started in the foreground disconnects, its tasks disappear; if a worker started in the background disconnects, the background tasks remain. I didn't quite understand what that sentence means. The sketch below illustrates how queued tasks wait for a worker.
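
A small sketch of that queueing behaviour, assuming the CeleryPro app from above and no Worker running at first:

from CeleryPro.tasks import add

t = add.delay(5, 6)        # with no Worker running, the task just waits in the broker queue
print(t.ready())           # False: nothing has consumed it yet

# start a Worker in another terminal:  celery -A CeleryPro worker -l info
# the queued task is then picked up and executed, and the result becomes available
print(t.get(timeout=30))   # 11
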
Checking how many Celery workers are currently running
It seems the only way is ps. Below, 3 background Workers are started and checked with ps; then one Worker is stopped and ps is run again:

[root@Python3 ~]# celery multi start w1 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
    > w1@Python3: OK
[root@Python3 ~]# celery multi start w2 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
    > w2@Python3: OK
[root@Python3 ~]# celery multi start w3 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
    > w3@Python3: OK
[root@Python3 ~]# ps -ef | grep celery
root       1346      1  0 20:49 ?        00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@Python3
root       1350   1346  0 20:49 ?        00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@Python3
root       1360      1  0 20:49 ?        00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root       1364   1360  0 20:49 ?        00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root       1374      1  0 20:49 ?        00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root       1378   1374  0 20:49 ?        00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root       1391   1251  0 20:55 pts/0    00:00:00 grep --color=auto celery
[root@Python3 ~]# celery multi stop w1
celery multi v4.2.1 (windowlicker)
> Stopping nodes...
    > w1@Python3: TERM -> 1346
[root@Python3 ~]# ps -ef | grep celery
root       1360      1  0 20:49 ?        00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root       1364   1360  0 20:49 ?        00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root       1374      1  0 20:49 ?        00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root       1378   1374  0 20:49 ?        00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root       1398   1251  0 20:57 pts/0    00:00:00 grep --color=auto celery
[root@Python3 ~]#

Not supported on Windows

The error message is as follows:

File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\platforms.py", line 429, in detached
    raise RuntimeError('This platform does not support detach.')
RuntimeError: This platform does not support detach.
        > w1@IDX-xujf: * Child terminated with errorcode 1
FAILED

Following the error message, look at the code at line 429:

if not resource:
    raise RuntimeError('This platform does not support detach.')

It checks resource and then raises the exception directly. To find out what resource actually is, search for the variable name (resource) in that file:

# at the top of the file, the value of resource is obtained like this
resource = try_import('resource')

# the try_import function used above lives in another file
def try_import(module, default=None):
    """Try to import and return module, or return
    None if the module does not exist."""
    try:
        return importlib.import_module(module)
    except ImportError:
        return default

# a function below notes in its comments that resource being None means Windows
def get_fdmax(default=None):
    """Return the maximum number of open file descriptors
    on this system.

    :keyword default: Value returned if there's no file
                      descriptor limit.

    """
    try:
        return os.sysconf('SC_OPEN_MAX')
    except:
        pass
    if resource is None:  # Windows
        return default
    fdmax = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if fdmax == resource.RLIM_INFINITY:
        return default
    return fdmax

So what this does is try to import a module called "resource", which is only available on Unix.

Periodic Tasks

Periodic tasks in version 3 and version 4 are quite different. Version 4 also offers more scheduling options.

Celery3

Keep using the 2 tasks from before; we only need to add some configuration (conf) to celery to set schedules for the tasks.
The parameter names in app.conf are all uppercase; they are case-sensitive here, so lowercase does not work:

# CeleryPro/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
import time

@app.task
def add(x, y):
    print("計算2個值的和: %s %s" % (x, y))
    return x+y

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

app = Celery('CeleryPro',
             broker='redis://192.168.246.11',
             backend='redis://192.168.246.11',
             include=['CeleryPro.tasks'])

app.conf.CELERYBEAT_SCHEDULE = {
    'add every 10 seconds': {
        'task': 'CeleryPro.tasks.add',
        'schedule': timedelta(seconds=10),  # a timedelta object works
        # 'schedule': 10,  # a plain number of seconds is also supported
        'args': (1, 2)
    },
    'upper every 2 minutes': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': crontab(minute='*/2'),
        'args': ('abc', ),
    },
}

# app.conf.CELERY_TIMEZONE = 'UTC'
app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()

Task result expiry is set with CELERY_TASK_RESULT_EXPIRES=3600. The default is 1 day; the official site explains that a built-in periodic task removes task results older than this limit.

A built-in periodic task will delete the results after this time (celery.task.backend_cleanup).

Once the settings are done, start the Worker and start Beat and you're all set:

G:\Steed\Documents\PycharmProjects\Celery>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery.exe -A CeleryPro worker -l info
G:\Steed\Documents\PycharmProjects\Celery>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery.exe -A CeleryPro beat -l info

In version 3 the setting names are all uppercase; starting with version 4 they switched to lowercase, and many names changed as well. The mapping between old and new names is here:
http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration

Celery4

The advantage of the new version is that periodic tasks can be defined on their own, just like ordinary tasks. There is a new decorator, @app.on_after_configure.connect, which does not exist in version 3.
Writing the code
Create a separate .py file to hold the periodic tasks:

# CeleryPro/periodic4.py
from __future__ import absolute_import, unicode_literals
from .celery import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # run every 10 seconds
    sender.add_periodic_task(10.0, hello.s(), name='hello every 10')  # give the task a name

    # run every 30 seconds
    sender.add_periodic_task(30, upper.s('abcdefg'), expires=10)  # the task expires after 10 seconds

    # the schedule is set just like a Linux crontab entry
    sender.add_periodic_task(
        crontab(hour='*', minute='*/2', day_of_week='*'),
        add.s(11, 22),
    )

@app.task
def hello():
    print('Hello World')

@app.task
def upper(arg):
    return arg.upper()

@app.task
def add(x, y):
    print("計算2個值的和: %s %s" % (x, y))
    return x+y

Three schedules are defined above.
The name parameter names the schedule, so task reports use that value, e.g. hello every 10; otherwise the default display is the invocation itself, e.g. CeleryPro.periodic4.upper('abcdefg').
The expires parameter sets an expiry time for the task; if it has not finished by then it is probably dropped (not tested).
Modify the earlier celery.py file to add the newly written task file to the include list. While at it, I switched to RabbitMQ for fun:

# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('CeleryPro',
             broker='amqp://192.168.3.108',
             backend='rpc',
             include=['CeleryPro.tasks', 'CeleryPro.periodic4'])

app.conf.timezone = 'UTC'  # periodic tasks use UTC time by default
# app.conf.timezone = 'Asia/Shanghai'  # can also be changed to Beijing time

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

Starting the worker
Start it the same way as before:

[root@Python3 ~]# celery -A CeleryPro worker -l info
/usr/local/lib/python3.6/site-packages/celery/platforms.py:796: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@Python3 v4.2.1 (windowlicker)
---- **** ----- 
--- * ***  * -- Linux-3.10.0-862.el7.x86_64-x86_64-with-centos-7.5.1804-Core 2018-10-01 12:46:35
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         CeleryPro:0x7ffb0c8b2908
- ** ---------- .> transport:   amqp://guest:**@192.168.3.108:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.periodic4.add
  . CeleryPro.periodic4.hello
  . CeleryPro.periodic4.upper
  . CeleryPro.tasks.add
  . CeleryPro.tasks.upper

[2018-10-01 12:46:35,187: INFO/MainProcess] Connected to amqp://guest:**@192.168.3.108:5672//
[2018-10-01 12:46:35,216: INFO/MainProcess] mingle: searching for neighbors
[2018-10-01 12:46:36,266: INFO/MainProcess] mingle: all alone
[2018-10-01 12:46:36,307: INFO/MainProcess] celery@Python3 ready.

After starting, check [tasks]: the newly added periodic tasks are listed, and the previous tasks are all still there.
Starting Beat
Here the -A argument must be written in full as CeleryPro.periodic4, slightly different from the Worker's argument:

[root@Python3 ~]# celery -A CeleryPro.periodic4 beat -l info
celery beat v4.2.1 (windowlicker) is starting.
__    -    ... __   -        _
LocalTime -> 2018-10-01 12:45:04
Configuration ->
    . broker -> amqp://guest:**@192.168.3.108:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2018-10-01 12:45:04,934: INFO/MainProcess] beat: Starting...
[2018-10-01 12:45:05,006: INFO/MainProcess] Scheduler: Sending due task hello every 10 (CeleryPro.periodic4.hello)
[2018-10-01 12:45:05,356: INFO/MainProcess] Scheduler: Sending due task CeleryPro.periodic4.upper('abcdefg') (CeleryPro.periodic4.upper)

Right after starting, the 2 interval-based tasks are sent to the Worker for execution, and they keep being sent at the defined intervals.
The other task, configured with crontab, is only sent once the time matches. It was 45 minutes past at the time; it ran at 46 minutes past.

The old approach still works
As mentioned above, the new version mainly adds a decorator. Without the new decorator, you can still define periodic tasks in the configuration:

# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('CeleryPro',
             broker='amqp://192.168.3.108',
             backend='rpc',
             include=['CeleryPro.tasks'])

app.conf.beat_schedule = {
    'every 5 seconds': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': 5,
        'args': ('xyz',)
    }
}

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

Here an ordinary task is scheduled via the configuration. CeleryPro.periodic4 was removed from include; leaving it in would do no harm either.
The task file tasks.py is the same one as before, shown below:

# CeleryPro/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
import time

@app.task
def add(x, y):
    print("計算2個值的和: %s %s" % (x, y))
    return x+y

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

Finally, start the Worker and start Beat to try it out:

[root@Python3 ~]# celery -A CeleryPro beat -l info

Here, using CeleryPro as Beat's -A argument also starts these periodic tasks; CeleryPro.tasks works the same. Also, if periodic4.py is added back to the include list and Beat is started with CeleryPro.periodic4, these periodic tasks will start as well.
crontab is supported here too, used exactly as before: just replace the value of the schedule parameter with a crontab(...) call, for example the sketch below.
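
A sketch of that kind of entry with the Celery 4 lowercase setting name, assuming it sits in CeleryPro/celery.py after the app is created:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'upper every 2 minutes': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': crontab(minute='*/2'),  # same crontab() helper as in the 3.x example
        'args': ('xyz',),
    },
}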

Summary

The two approaches to periodic tasks above each have their use cases.
If you need to change the function a task runs, the only way is to change the code and restart the Worker.
What matters here is changing the schedule (adding, removing, or changing the period) while the task functions stay the same. With the @app.on_after_configure.connect decorator, the schedule is hard-coded inside a function; it seems impossible to add new entries dynamically, but the structure is clearer.
With the latter approach, you only need to update the app.conf.beat_schedule dict and restart Beat for the changes to take effect, as in the sketch below.
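
A sketch of that kind of update, assuming it runs in CeleryPro/celery.py after the base beat_schedule dict is defined and before Beat is (re)started:

new_entries = {
    'add every 30 seconds': {
        'task': 'CeleryPro.tasks.add',
        'schedule': 30,
        'args': (7, 8),
    },
}
app.conf.beat_schedule.update(new_entries)  # merge into the existing schedule
# restart Beat afterwards so the new entries take effect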

crontab examples

Here are some crontab examples:

Example : Meaning
crontab() : Execute every minute.
crontab(minute=0, hour=0) : Execute daily at midnight.
crontab(minute=0, hour='*/3') : Execute every three hours: 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0, hour='0,3,6,9,12,15,18,21') : Same as previous.
crontab(minute='*/15') : Execute every 15 minutes.
crontab(day_of_week='sunday') : Execute every minute (!) on Sundays.
crontab(minute='*', hour='*', day_of_week='sun') : Same as previous.
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') : Execute every ten minutes, but only between 3-4 am, 5-6 pm and 10-11 pm on Thursdays or Fridays.
crontab(minute=0, hour='*/2,*/3') : Execute every even hour, and every hour divisible by three. This means: at every hour except 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm.
crontab(minute=0, hour='*/5') : Execute every hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of "15", which is divisible by 5).
crontab(minute=0, hour='*/3,8-17') : Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(day_of_month='2') : Execute on the second day of every month.
crontab(day_of_month='2-30/3') : Execute on every even numbered day.
crontab(day_of_month='1-7,15-21') : Execute on the first and third weeks of the month.
crontab(day_of_month='11', month_of_year='5') : Execute on the 11th of May every year.
crontab(month_of_year='*/3') : Execute on the first month of every quarter.

Solar schedules

Version 4 also provides this way to specify a schedule:

If you have a task that should be executed according to sunrise, sunset, dawn or dusk, you can use the solar schedule type:

All events are calculated using UTC time, so they are not affected by the timezone setting. The example from the official site:

from celery.schedules import solar

app.conf.beat_schedule = {
    # Executes at sunset in Melbourne
    'add-at-melbourne-sunset': {
        'task': 'tasks.add',
        'schedule': solar('sunset', -37.81753, 144.96715),
        'args': (16, 16),
    },
}

The solar function takes 3 arguments: the event, the latitude, and the longitude. The sign conventions for latitude and longitude are in the table below:

Sign : Argument : Meaning
+ : latitude : North
- : latitude : South
+ : longitude : East
- : longitude : West

The supported event types are as follows:

Event : Meaning
dawn_astronomical : Execute at the moment after which the sky is no longer completely dark. This is when the sun is 18 degrees below the horizon.
dawn_nautical : Execute when there's enough sunlight for the horizon and some objects to be distinguishable; formally, when the sun is 12 degrees below the horizon.
dawn_civil : Execute when there's enough light for objects to be distinguishable so that outdoor activities can commence; formally, when the sun is 6 degrees below the horizon.
sunrise : Execute when the upper edge of the sun appears over the eastern horizon in the morning.
solar_noon : Execute when the sun is highest above the horizon on that day.
sunset : Execute when the trailing edge of the sun disappears over the western horizon in the evening.
dusk_civil : Execute at the end of civil twilight, when objects are still distinguishable and some stars and planets are visible. Formally, when the sun is 6 degrees below the horizon.
dusk_nautical : Execute when the sun is 12 degrees below the horizon. Objects are no longer distinguishable, and the horizon is no longer visible to the naked eye.
dusk_astronomical : Execute at the moment after which the sky becomes completely dark; formally, when the sun is 18 degrees below the horizon.

Best Practices for Use with Django

When using it with django, the Celery configuration can be written directly in django's settings.py. Task functions go into a tasks.py file inside each app's directory. Every app can have its own tasks.py, and all of the tasks are shared.

Creating the directory structure

Create a django project named UsingCeleryWithDjango, with an app simply named app01. After creating the project, create a CeleryPro directory under the project directory, and a celery.py file inside it. The directory structure is as follows:

UsingCeleryWithDjango
│
├─manage.py
│
├─app01
│  │  admin.py
│  │  apps.py
│  │  models.py
│  │  tests.py
│  │  views.py
│  └  __init__.py
│
├─CeleryPro
│  │  celery.py
│  └  __init__.py
│
├─templates
│
└─UsingCeleryWithDjango
    │  settings.py
    │  urls.py
    │  wsgi.py
    └  __init__.py

Above, just pay attention to the structure and location of CeleryPro; everything else is the default content of a newly created django project.
CeleryPro/celery.py is used to create the celery instance.
CeleryPro/__init__.py must make sure celery is loaded when Django starts; later, the apps will use the @shared_task decorator from the celery module.

CeleryPro example code

The actual example files are in the official GitHub repository; there are some differences between version 3 and version 4.
Latest version: https://github.com/celery/celery/tree/master/examples/django
Version 3.1: https://github.com/celery/celery/tree/3.1/examples/django
You can also switch between the version branches on GitHub yourself.
Below I go through it on Windows with version 3.1.25.

# UsingCeleryWithDjango/CeleryPro/__init__.py
from __future__ import absolute_import, unicode_literals
__author__ = '749B'

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

# UsingCeleryWithDjango/CeleryPro/celery.py
from __future__ import absolute_import
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'UsingCeleryWithDjango.settings')

from django.conf import settings  # noqa

app = Celery('CeleryPro')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
# auto-discover the tasks under every app
# however, the new-style INSTALLED_APPS entries in recent django versions cannot be discovered
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # this is how the official example writes it
'''
# this is the INSTALLED_APPS section of settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',  # with this style, autodiscovery cannot find the tasks
    # 'app01',  # with this style, autodiscovery works
]
'''
# or, if you don't want to change settings.INSTALLED_APPS, pass the list of apps yourself
app.autodiscover_tasks(['app01'])  # which is what I do here

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

There is a pitfall here; I wrote a block of comments that should make it fairly clear.

The tasks file

Create a tasks.py file under the app (at the same level as models.py) and create the tasks in it.

- app01/
    - app01/tasks.py
    - app01/models.py

The functions created in tasks.py use the @shared_task decorator. These tasks are shared by all apps.

# UsingCeleryWithDjango/app01/tasks.py
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

Configuring settings.py

This is django's configuration file, but now the Celery settings can all be written here too:

# UsingCeleryWithDjango/UsingCeleryWithDjango/settings.py

# the rest is django's own configuration, omitted here
# Celery settings

BROKER_URL = 'redis://192.168.246.11/0'
CELERY_RESULT_BACKEND = 'redis://192.168.246.11/0'

Only the most basic settings are done here: Redis is used to receive tasks and store results, and everything else is left at its defaults. A few other commonly used settings are sketched below.
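
If needed, a few more settings can go in the same place; a sketch using the old-style (3.x) uppercase names, with assumed example values:

# UsingCeleryWithDjango/UsingCeleryWithDjango/settings.py (optional extras)
CELERY_ACCEPT_CONTENT = ['json']     # which serializers the worker accepts
CELERY_TASK_SERIALIZER = 'json'      # serializer for task messages
CELERY_RESULT_SERIALIZER = 'json'    # serializer for results
CELERY_TIMEZONE = 'Asia/Shanghai'    # timezone used by Celery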

Starting the Worker

The start command is the same; the key is the argument after -A:

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro worker -l info
[2018-10-02 20:55:56,411: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@IDX-xujf v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         CeleryPro:0x27f5e4dbe80
- ** ---------- .> transport:   redis://192.168.246.11:6379/0
- ** ---------- .> results:     redis://192.168.246.11/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.celery.debug_task
  . app01.tasks.add
  . app01.tasks.mul
  . app01.tasks.xsum

[2018-10-02 20:55:56,548: INFO/MainProcess] Connected to redis://192.168.246.11:6379/0
[2018-10-02 20:55:56,576: INFO/MainProcess] mingle: searching for neighbors
[2018-10-02 20:55:57,596: INFO/MainProcess] mingle: all alone
[2018-10-02 20:55:57,647: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\fixups\django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-10-02 20:55:57,653: WARNING/MainProcess] celery@IDX-xujf ready.

The above means it started successfully; just confirm that all the tasks are listed under [tasks] and you're fine.
The content under [tasks] is the names of all our custom tasks. Below I look into how to get these task names myself.

Getting all the tasks

All the tasks can be obtained from app.tasks. This app is the instance created by app = Celery('CeleryPro') in CeleryPro/celery.py, and CeleryPro/__init__.py renames it with from .celery import app as celery_app, so in this project it should be celery_app.tasks.
Printing celery_app.tasks gives the following:

{'celery.chord_unlock': <@task: celery.chord_unlock of CeleryPro:0x2360b07fc88>, 'celery.group': <@task: celery.group of CeleryPro:0x2360b07fc88>, 'app01.tasks.xsum': <@task: app01.tasks.xsum of CeleryPro:0x2360b07fc88>, 'celery.backend_cleanup': <@task: celery.backend_cleanup of CeleryPro:0x2360b07fc88>, 'app01.tasks.add': <@task: app01.tasks.add of CeleryPro:0x2360b07fc88>, 'celery.map': <@task: celery.map of CeleryPro:0x2360b07fc88>, 'app01.tasks.mul': <@task: app01.tasks.mul of CeleryPro:0x2360b07fc88>, 'celery.chain': <@task: celery.chain of CeleryPro:0x2360b07fc88>, 'CeleryPro.celery.debug_task': <@task: CeleryPro.celery.debug_task of CeleryPro:0x2360b07fc88>, 'celery.starmap': <@task: celery.starmap of CeleryPro:0x2360b07fc88>, 'celery.chord': <@task: celery.chord of CeleryPro:0x2360b07fc88>, 'celery.chunks': <@task: celery.chunks of CeleryPro:0x2360b07fc88>}

All our tasks are in there, but there are many extra tasks as well (all starting with celery). When starting the Worker earlier we always used -l info; with -l debug those tasks are shown too. In other words, Celery filters the list when the Worker starts: debug mode prints everything, info mode prints only user-defined tasks. The next step is to look in the source code to see how the filtering is done.
I extracted the following from the source to analyze:

# celery/apps/worker.py

# first, some strings that are printed to the console on startup
# this is the logo, not the important part
ARTLINES = [
    ' --------------',
    '---- **** -----',
    '--- * ***  * --',
    '-- * - **** ---',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- *** --- * ---',
    '-- ******* ----',
    '--- ***** -----',
    ' --------------',
]

# this string is the one used to print the task list
# before writing to the console it is formatted with format, so the task list is rendered dynamically
EXTRA_INFO_FMT = """
[tasks]
{tasks}
"""

# this class has many methods; here we only look at the part that builds the task list dynamically
class Worker(WorkController):

    # this is the method that generates the task list
    # the logic is simply checking whether the name starts with 'celery'
    # include_builtins True outputs all tasks; False filters out names starting with 'celery'
    # see the extra_info method below for the actual value of include_builtins
    def tasklist(self, include_builtins=True, sep='\n', int_='celery.'):
        return sep.join(
            '  . {0}'.format(task) for task in sorted(self.app.tasks)
            if (not task.startswith(int_) if not include_builtins else task)
        )

    # this method calls the tasklist method above
    # it checks the log level and sets include_builtins based on whether the level is <= debug
    # finally it formats EXTRA_INFO_FMT with the result returned by tasklist
    def extra_info(self):
        if self.loglevel <= logging.INFO:
            include_builtins = self.loglevel <= logging.DEBUG
            tasklist = self.tasklist(include_builtins=include_builtins)
            return EXTRA_INFO_FMT.format(tasks=tasklist)

The filtering is simple: use startswith to drop the keys that begin with celery. Also, before filtering, sorted is applied, which incidentally turns the dict into a list of its keys.
So the task list can be obtained with the following:

from CeleryPro import celery_app

def celery_list(request):
    task_list = []
    for task in sorted(celery_app.tasks):
        if not task.startswith('celery.'):
            task_list.append(task)
    print(task_list)
    return HttpResponse('OK')

The code above ends up with a list, which can be produced directly with a list comprehension:

task_list = [task for task in sorted(celery_app.tasks) if not task.startswith('celery.')]

What we have here are only the task keys. To call a task, use the key to get the corresponding value from the celery_app.tasks dict and call that value's methods:

task_name = task_list[1]
t = celery_app.tasks[task_name].delay(1, 2)

Calling tasks from views

How to call a task was already shown at the end of the previous section, but getting the task's result still has some problems.
Before, the return value was captured when delay was called; that is the task object, and with it you can check whether the task has finished and fetch its result.
But now, after submitting a task inside a view function, the function returns and the task object is gone, and the object cannot be returned to the browser directly either. What needs to be returned is the task's id (the uuid generated for each task). On later requests, that uuid is used to get back the task object.

# to get the object back from a uuid, use the following
from celery.result import AsyncResult

task_obj = AsyncResult(uuid)  # get the task object from the uuid
# once you have the object, everything else works as before
task_obj.ready()  # check whether the task has finished
task_obj.get()  # get the result, blocking
task_obj.result  # once the task has finished the result is stored here; no need to call get again

Below is the example code I wrote for testing.
The front-end page
On this page you can pick a task, fill in the arguments, and submit it for background execution. After submitting, it redirects to the task result page:

# UsingCeleryWithDjango/templates/celery_list.html

<body>
<form method="post">
    {% csrf_token %}
    <select name="task_name">
        {% for task in task_list %}
            <option value="{{ task }}">{{ task }}</option>
        {% endfor %}
    </select>
    <input name="args" type="text" placeholder="arguments" />
    <input type="submit" value="Submit task" />
</form>
<h3>Tips: the backend deserializes the submitted argument string with json.loads, then passes it as *args</h3>
<p>debug_task: leave the arguments empty</p>
<p>add and mul: pass a 2-element list, e.g. [1, 2]</p>
<p>xsum: takes a single list argument, so wrap it in another pair of [], e.g. [[1, 2, 3, 4, 5]]</p>
</body>

The URL routes
There are two urls: one for the task submission page, and one that fetches the task result by uuid. That view has no html template and just returns an HttpResponse:

# UsingCeleryWithDjango/UsingCeleryWithDjango/urls.py

from django.contrib import admin
from django.urls import path
from app01 import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('celery_list/', views.celery_list),
    path('celery_result/<uuid:uuid>/', views.celery_result),
]

The view functions

# UsingCeleryWithDjango/app01/views.py

from django.shortcuts import render, redirect, HttpResponse
# Create your views here.
from CeleryPro import celery_app
from celery.result import AsyncResult
import json

def celery_list(request):

    if request.method == 'POST':
        task_name = request.POST.get('task_name')
        args = request.POST.get('args')
        if args:
            t = celery_app.tasks[task_name].delay(*json.loads(args))
            return redirect('/celery_result/%s/' % t.id)
        else:
            celery_app.tasks[task_name]()

    # following the approach from the source code, get the list of all task names
    task_list = [task for task in sorted(celery_app.tasks) if not task.startswith('celery.')]
    return render(request, 'celery_list.html', {'task_list': task_list})

def celery_result(request, uuid):
    uuid = str(uuid)
    task_obj = AsyncResult(uuid)
    if task_obj.ready():
        return HttpResponse(task_obj.result)
    else:
        ele = "<input type='button' value='Fetch the result again' onclick='location.reload();'>"
        return HttpResponse('Not Ready %s' % ele)

Testing went well, but all the tasks return results immediately. So modify the tasks in tasks.py: pick a task and add some delay with time.sleep(), as in the sketch below. If the task has not finished, the request does not hang; it first returns a page that can be refreshed, and once the task is done it returns the task's result.
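
A sketch of such a slowed-down task, assuming it replaces one of the functions in app01/tasks.py:

# UsingCeleryWithDjango/app01/tasks.py (modified for testing)
from __future__ import absolute_import, unicode_literals
import time
from celery import shared_task

@shared_task
def add(x, y):
    time.sleep(10)   # pretend the work takes a while
    return x + y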

Using Periodic Tasks in Django

To use periodic tasks in django, one more module needs to be installed at this point:

pip install django_celery_beat

The module is django_celery_beat; note the underscores in the name, although the command also accepts hyphens (presumably an alias). This module does more than just periodic tasks: it works by storing the tasks in django's database, so they can also be conveniently set up and managed through the django admin.
Note: installing this module automatically pulls in some other dependencies, and the catch is that it upgrades the existing celery to the latest version, i.e. version 4, which supposedly does not support Windows.

Since it was upgraded anyway, let's first try running in the current environment. I then hit 2 pitfalls.
I am on Windows 10; some of the problems are caused by the Windows operating system, so I'm not sure how general they are.

App tasks are not auto-discovered

The worker starts fine and the page opens, but the tasks defined in the app can no longer be found.
Auto-discovery of every app's tasks is configured in "UsingCeleryWithDjango/CeleryPro/celery.py", specifically by calling this method:

# from django.conf import settings  # noqa
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # get the app paths from django's settings
app.autodiscover_tasks(['app01'],)  # specify them yourself

Looking at the source of this method: a big block of docstring, but the content is simple:

def autodiscover_tasks(self, packages=None,
                           related_name='tasks', force=False):
        """Auto-discover task modules.

        Searches a list of packages for a "tasks.py" module (or use
        related_name argument).

        If the name is empty, this will be delegated to fix-ups (e.g., Django).

        For example if you have a directory layout like this:

        .. code-block:: text

            foo/__init__.py
               tasks.py
               models.py

            bar/__init__.py
                tasks.py
                models.py

            baz/__init__.py
                models.py

        Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will
        result in the modules ``foo.tasks`` and ``bar.tasks`` being imported.

        Arguments:
            packages (List[str]): List of packages to search.
                This argument may also be a callable, in which case the
                value returned is used (for lazy evaluation).
            related_name (str): The name of the module to find.  Defaults
                to "tasks": meaning "look for 'module.tasks' for every
                module in ``packages``."
            force (bool): By default this call is lazy so that the actual
                auto-discovery won't happen until an application imports
                the default modules.  Forcing will cause the auto-discovery
                to happen immediately.
        """
        if force:
            return self._autodiscover_tasks(packages, related_name)
        signals.import_modules.connect(starpromise(
            self._autodiscover_tasks, packages, related_name,
        ), weak=False, sender=self)

The body is just an if, and then it returns something. The key is the if condition: a parameter that defaults to False, so when called with the defaults it does not perform the discovery right away. The fix is simple: pass the force parameter when calling it:

app.autodiscover_tasks(['app01'], force=True)  # version 4 has a force parameter; it defaults to False and needs to be set to True

Errors when executing tasks

Start the worker (-l info), open the page, submit a task. Then it errors out. The error on the worker side is:

[2018-10-08 13:23:28,062: INFO/MainProcess] Received task: app01.tasks.add[ff0f5e76-6474-4f74-a93c-7b2486abe07e]
[2018-10-08 13:23:28,078: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):
  File "g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\billiard\pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)

As far as I can tell, this problem is caused by version 4 not supporting Windows.
Fixing it by downgrading celery
Just skim this part, because a way to avoid downgrading follows.
At this point I could not understand the error message and find the real problem, so the only option left was to downgrade celery and see:

pip uninstall celery
pip install celery==3.1.25

Downgrading just means uninstalling first and then installing an older version. I did not go further down this path.

Celery 4 can still be used
I found another way to solve this problem; one more module needs to be installed:
pip install eventlet

After installing it, start the worker with one extra parameter, "-P eventlet":

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro worker -l info -P eventlet

 -------------- celery@IDX-xujf v4.2.1 (windowlicker)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0 2018-10-08 13:33:21
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         CeleryPro:0x16ad81d16a0
- ** ---------- .> transport:   redis://192.168.246.11:6379/0
- ** ---------- .> results:     redis://192.168.246.11/0
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.celery.debug_task
  . app01.tasks.add
  . app01.tasks.mul
  . app01.tasks.xsum

[2018-10-08 13:33:21,430: INFO/MainProcess] Connected to redis://192.168.246.11:6379/0
[2018-10-08 13:33:21,457: INFO/MainProcess] mingle: searching for neighbors
[2018-10-08 13:33:22,488: INFO/MainProcess] mingle: all alone
[2018-10-08 13:33:22,502: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\fixups\django.py:200: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-10-08 13:33:22,504: INFO/MainProcess] celery@IDX-xujf ready.
[2018-10-08 13:33:22,519: INFO/MainProcess] pidbox: Connected to redis://192.168.246.11:6379/0.
[2018-10-08 13:34:13,596: INFO/MainProcess] Received task: app01.tasks.add[2b56d6b7-012f-44db-bf4b-2d85d22dcd8d]
[2018-10-08 13:34:13,611: INFO/MainProcess] Task app01.tasks.add[2b56d6b7-012f-44db-bf4b-2d85d22dcd8d] succeeded in 0.0s: 7

Above is the worker's log; after it started, a task was also submitted, and this time it was processed normally.

Using Django_Celery_Beat

First register it in INSTALLED_APPS in settings:

INSTALLED_APPS = [
    ......
    'django_celery_beat',
]

Apply django_celery_beat's database migrations; a few tables are created automatically. Just run migrate:

>python manage.py migrate
Operations to perform:
  Apply all migrations: admin, auth, contenttypes, django_celery_beat, sessions
Running migrations:
  Applying django_celery_beat.0001_initial... OK
  Applying django_celery_beat.0002_auto_20161118_0346... OK
  Applying django_celery_beat.0003_auto_20161209_0049... OK
  Applying django_celery_beat.0004_auto_20170221_0000... OK
  Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
  Applying django_celery_beat.0006_auto_20180210_1226... OK

After logging into the django admin, you can see the following tables:
[figure: admin tables created by django_celery_beat]

Tasks are all configured in the Periodic tasks table. The other tables hold the various execution schedules. The same configuration can also be created from code, as in the sketch below.
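
A sketch of creating the same schedule programmatically, based on django_celery_beat's documented model names and run inside a Django shell (the task name 'add34' matches the one used in the Beat log further down):

import json
from django_celery_beat.models import IntervalSchedule, PeriodicTask

schedule, _ = IntervalSchedule.objects.get_or_create(
    every=5,
    period=IntervalSchedule.SECONDS,   # the "every 5 seconds" interval
)
PeriodicTask.objects.get_or_create(
    name='add34',                      # the name that later shows up in the Beat log
    task='app01.tasks.add',            # one of the auto-discovered tasks
    interval=schedule,
    args=json.dumps([3, 4]),           # positional arguments, stored as JSON
)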

Configuring a task

First go into the Intervals table and create a new schedule; here a 5-second interval is created.
[figure: creating a 5-second interval in the admin]

Then go into the Periodic tasks table, choose the task to run, and link it to a schedule.
The tasks visible here are the ones registered through auto-discovery:
[figure: selecting a registered task in the Periodic tasks form]

Below that is the section for filling in the task arguments; there are two boxes that take JSON. Positional arguments go in the upper box, keyword arguments in the lower one:
[figure: the positional and keyword argument JSON boxes]

The JSON here is deserialized and then passed to the task function as "*args, **kwargs", as in the small sketch below.
That's the task configured; the other schedule types work the same way, so I won't try them.
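
A tiny sketch of how the contents of the two boxes end up being used (assumed example values):

import json

args = json.loads('[3, 4]')    # the positional-arguments box
kwargs = json.loads('{}')      # the keyword-arguments box
print(args, kwargs)            # [3, 4] {}  -> the task is invoked roughly as add(*args, **kwargs)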

Starting Beat

A Beat still needs to be started here to send the tasks on schedule. Start the Worker first; then starting Beat requires one extra parameter, "-S django":

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro beat -l info -S django
celery beat v4.2.1 (windowlicker) is starting.
__    -    ... __   -        _
LocalTime -> 2018-10-08 14:43:43
Configuration ->
    . broker -> redis://192.168.246.11:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> django_celery_beat.schedulers.DatabaseScheduler

    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 seconds (5s)
[2018-10-08 14:43:43,907: INFO/MainProcess] beat: Starting...
[2018-10-08 14:43:43,908: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:48,911: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:48,939: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[2018-10-08 14:43:53,922: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[2018-10-08 14:43:58,922: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[2018-10-08 14:43:59,534: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:59,717: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:59,727: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:59,729: INFO/MainProcess] Writing entries...

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>

Note: every time you modify a task, Beat must be restarted for the latest configuration to take effect. This matters a lot for Intervals tasks (the ones that run every so often); for Crontab tasks it doesn't seem to be much of a problem.
