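Celery is a Distributed Task Queue. "Distributed" means there can be multiple workers; "queue" means the work is done asynchronously.
Starting with Celery 4.x, Windows is no longer supported. 3.1.26 is currently the last 3.x release; below I install 3.1.25.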
pip install celery            # latest version
pip install celery==3.1.25    # a specific version (used here)
pip install redis
pip install -U 'celery[redis]'
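Check the installed version with celery --version. If the venv's Scripts directory is not on PATH, the bare command fails, and you have to call it by its full path:
>celery --version
'celery' is not recognized as an internal or external command,
operable program or batch file.

>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery --version
3.1.25 (Cipater)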
# task1.py
from celery import Celery

# Create the Celery instance
app = Celery('tasks',
             broker='redis://',
             )


# Define a task
@app.task
def add(x, y):
    print("Adding two values: %s %s" % (x, y))
    return x+y
If you use RabbitMQ, change the broker to broker='amqp://'.
Start a Celery worker to listen for and execute tasks:
$ celery -A task1 worker --loglevel=info
$ celery -A task1 worker -l debug   # or start it like this
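The -A option takes the Celery app. The instance name can be omitted; written out in full it is task1.app. The value after -A is imported as a Python module. You must therefore either cd into the directory that contains task1.py before running the command, or get that directory onto Python's module search path some other way. For example, running from G:\ with the full dotted module path also started the worker: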
G:\>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery -A Steed.Documents.PycharmProjects.Celery.task1 worker --loglevel=info
[2018-09-28 17:55:10,715: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers the ability to execute any command. It's important to secure your broker from unauthorized access when using pickle, so we think that enabling pickle should require a deliberate action and not be the default choice.

If you depend on pickle then you should set a setting to disable this warning and to be sure that everything will continue working when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@IDX-xujf v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x1fb5056fda0
- ** ---------- .> transport: redis://
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery exchange=celery(direct) key=celery

[tasks]
  . Steed.Documents.PycharmProjects.Celery.task1.add

[2018-09-28 17:55:10,864: INFO/MainProcess] Connected to redis://
[2018-09-28 17:55:10,922: INFO/MainProcess] mingle: searching for neighbors
[2018-09-28 17:55:11,961: INFO/MainProcess] mingle: all alone
[2018-09-28 17:55:11,980: WARNING/MainProcess] celery@IDX-xujf ready.
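The value after -A is imported like any other Python module, so it only resolves if that module can be imported from the current sys.path. The function below is a hypothetical, simplified resolver written with the standard library to illustrate this. It is not Celery's lookup code. The fallback to an attribute named app or celery is an assumption about the general idea, not a statement of Celery's exact rules.

```python
import importlib
import sys


def resolve_app(spec, search_dir=None):
    """Hypothetical, simplified resolver for an -A style value.

    'task1'     -> import module task1, then return its 'app' (or 'celery') attribute
    'task1.app' -> 'task1.app' is not a module, so import 'task1' and return its 'app'
    """
    if search_dir is not None and search_dir not in sys.path:
        # this is what cd-ing into the file's directory achieves
        sys.path.insert(0, search_dir)
    try:
        module = importlib.import_module(spec)
    except ImportError:
        module_name, _, attr = spec.rpartition('.')
        if not module_name:
            raise  # nothing left to split off: the module really is missing
        return getattr(importlib.import_module(module_name), attr)
    for attr in ('app', 'celery'):
        if hasattr(module, attr):
            return getattr(module, attr)
    raise AttributeError('no app instance found in module %r' % spec)
```

If the directory holding task1.py is neither the working directory nor on sys.path, the first import_module call fails. That matches the "cd into the directory first" advice above.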
To send a task to the worker, call its delay() method. The following was done in IDLE:
>>> import sys
>>> dir = r"G:\Steed\Documents\PycharmProjects\Celery"
>>> sys.path.append(dir)  # my task file isn't on the module path, so IDLE can't find it otherwise
>>> from task1 import add
>>> add.delay(1, 2)
<AsyncResult: 4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a>
>>>
The worker logs:
[2018-09-29 11:10:33,103: INFO/MainProcess] Received task: task1.add[4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a]
[2018-09-29 11:10:33,107: WARNING/Worker-1] Adding two values: 1 2
[2018-09-29 11:10:33,109: INFO/MainProcess] Task task1.add[4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a] succeeded in 0s: 3
delay() returns an AsyncResult. Fetching the result with get() fails, though, because no result backend is configured:
>>> t = add.delay(3, 4)
>>> type(t)  # check the type of the return value
<class 'celery.result.AsyncResult'>
>>> t.get()  # this raises an error
Traceback (most recent call last):
  File "<pyshell#16>", line 1, in <module>
    t.get()
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\Lib\site-packages\celery\result.py", line 169, in get
    no_ack=no_ack,
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\Lib\site-packages\celery\backends\base.py", line 616, in _is_disabled
    'No result backend configured. '
NotImplementedError: No result backend configured. Please see the documentation for more information.
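To store results, configure a backend as well (Redis here):
from celery import Celery

app = Celery('tasks',
             broker='redis://',
             backend='redis://',  # port and the rest omitted this time as well
             )


@app.task
def add(x, y):
    print("Adding two values: %s %s" % (x, y))
    return x+y

Now get() returns the result:
>>> t = add.delay(1, 1)
>>> t.get()
2
>>>
With RabbitMQ, configure it like this instead:
app = Celery('tasks',
             broker='amqp://',
             backend='rpc://',  # in newer versions rpc is gradually replacing amqp; it still uses RabbitMQ
             # backend='amqp://',  # on older versions without rpc, amqp is the only option
             )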
Next, add a slow task that takes about 10 seconds:
import time


@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

get() blocks until the result is ready:
>>> t = upper.delay("abc")
>>> t.get()
'ABC'
ready() checks, without blocking, whether the task has finished:
>>> t = upper.delay("abcd")
>>> t.ready()
False
>>> t.ready()
False
>>> t.ready()
False
>>> t.ready()
True
>>> t.get()
'ABCD'
>>>
get() also accepts a timeout in seconds. If the result isn't ready in time, it raises TimeoutError:
>>> t = upper.delay("abcde")
>>> t.get(timeout=11)
'ABCDE'
>>> t = upper.delay("abcde")
>>> t.get(timeout=1)
Traceback (most recent call last):
  File "<pyshell#17>", line 1, in <module>
    t.get(timeout=1)
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\result.py", line 169, in get
    no_ack=no_ack,
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\backends\base.py", line 238, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
>>>
Now pass an argument that makes the task fail:
>>> t = upper.delay(123)
>>>
The worker logs the exception:
[2018-09-29 12:57:07,077: ERROR/MainProcess] Task task1.upper[11820ee6-6936-4680-93c2-462487ec927e] raised unexpected: AttributeError("'int' object has no attribute 'upper'",)
Traceback (most recent call last):
  File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\app\trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\app\trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "G:\Steed\Documents\PycharmProjects\Celery\task1.py", line 25, in upper
    return v.upper()
AttributeError: 'int' object has no attribute 'upper'
get() re-raises the task's exception in the caller:
>>> t = upper.delay(123)
>>> t.get()
Traceback (most recent call last):
  File "<pyshell#27>", line 1, in <module>
    t.get()
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\result.py", line 175, in get
    raise meta['result']
AttributeError: 'int' object has no attribute 'upper'
>>>
With propagate=False, get() returns the exception instead of raising it:
>>> t.get(propagate=False)
AttributeError("'int' object has no attribute 'upper'",)
>>>
The traceback attribute holds the full error details:
>>> t.traceback
'Traceback (most recent call last):\n  File "g:\\steed\\documents\\pycharmprojects\\venv\\celery\\lib\\site-packages\\celery\\app\\trace.py", line 240, in trace_task\n    R = retval = fun(*args, **kwargs)\n  File "g:\\steed\\documents\\pycharmprojects\\venv\\celery\\lib\\site-packages\\celery\\app\\trace.py", line 438, in __protected_call__\n    return self.run(*args, **kwargs)\n  File "G:\\Steed\\Documents\\PycharmProjects\\Celery\\task1.py", line 25, in upper\n    return v.upper()\nAttributeError: \'int\' object has no attribute \'upper\'\n'
>>>
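The ready() / get(timeout) / get(propagate=False) / traceback behaviour shown above can be mimicked with the standard library's concurrent.futures. This helps when reasoning about it without a broker. ToyResult and slow_upper are hypothetical names, and this is a toy stand-in, not how Celery's AsyncResult is implemented.

```python
import concurrent.futures
import time
import traceback as tb_module


class ToyResult:
    """Toy stand-in for an AsyncResult, built on concurrent.futures.Future."""

    def __init__(self, future):
        self._future = future

    def ready(self):
        # True once the task has finished, whether it succeeded or failed
        return self._future.done()

    def get(self, timeout=None, propagate=True):
        try:
            return self._future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            raise  # result not ready within `timeout` seconds
        except Exception as exc:
            if propagate:
                raise  # re-raise the task's own exception in the caller
            return exc  # propagate=False: return the exception object instead

    @property
    def traceback(self):
        # formatted traceback of a failed task, or None
        if not self._future.done() or self._future.exception() is None:
            return None
        exc = self._future.exception()
        return ''.join(tb_module.format_exception(type(exc), exc, exc.__traceback__))


def slow_upper(v, delay=0.3):
    # stand-in for the upper task: sleep, then call v.upper()
    time.sleep(delay)
    return v.upper()
```

Submit slow_upper to a ThreadPoolExecutor and wrap the returned future in ToyResult to replay the session above: "abcd" times out with a tiny timeout and then returns 'ABCD', while 123 fails with AttributeError.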
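To recap, start a Celery worker to listen for and execute tasks:
$ celery -A tasks worker --loglevel=info
Call a task:
>>> from tasks import add
>>> t = add.delay(4, 4)
Get the result (blocking), optionally with a timeout:
>>> t.get()
>>> t.get(timeout=1)
Check whether the task has finished:
>>> t.ready()
Get a task's exception without raising it, and print its detailed traceback:
>>> t.get(propagate=False)
>>> t.traceback  # print the detailed exception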
Using Celery in a project. Directory layout:
CeleryPro
├─__init__.py
├─celery.py
└─tasks.py

CeleryPro/celery.py:
# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('CeleryPro',
             broker='redis://',
             backend='redis://',
             include=['CeleryPro.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,  # 4.x setting name; on 3.1 the equivalent is CELERY_TASK_RESULT_EXPIRES
)

if __name__ == '__main__':
    app.start()
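The first line is from __future__ import absolute_import, unicode_literals. unicode_literals makes plain string literals unicode on Python 2; it has no effect on Python 3. absolute_import turns on absolute imports. This file is itself named celery.py, so on Python 2 a plain from celery import ... would import this very file. We actually want the celery package, hence absolute imports. To import this file instead, write from .celery.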
CeleryPro/tasks.py:
# CeleryPro/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
import time


@app.task
def add(x, y):
    print("Adding two values: %s %s" % (x, y))
    return x+y


@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

When starting the worker, -A takes the app name CeleryPro. You also need to cd into the parent directory of CeleryPro first, otherwise it won't be found:
>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery -A CeleryPro worker -l info
[2018-09-29 15:06:20,818: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers the ability to execute any command. It's important to secure your broker from unauthorized access when using pickle, so we think that enabling pickle should require a deliberate action and not be the default choice.

If you depend on pickle then you should set a setting to disable this warning and to be sure that everything will continue working when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@IDX-xujf v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: CeleryPro:0x21deadaf470
- ** ---------- .> transport: redis://
- ** ---------- .> results: redis://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.tasks.add
  . CeleryPro.tasks.upper

[2018-09-29 15:06:20,953: INFO/MainProcess] Connected to redis://
[2018-09-29 15:06:20,983: INFO/MainProcess] mingle: searching for neighbors
[2018-09-29 15:06:21,994: INFO/MainProcess] mingle: all alone
[2018-09-29 15:06:22,055: WARNING/MainProcess] celery@IDX-xujf ready.
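celery -A CeleryPro worker --loglevel=info   # start in the foreground (not recommended)
celery -A CeleryPro worker -l info           # same, short form
celery multi start w1 -A CeleryPro -l info   # recommended: start in the background
Run all of these from the parent directory of CeleryPro. In other words, that parent directory just has to be on the module search path; alternatively, add it with sys.path.append().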
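celery -A <project> worker --loglevel=info : start in the foreground
celery multi start w1 -A <project> -l info : start in the background
celery multi restart w1 -A <project> -l info : restart in the background
celery multi stop w1 -A <project> -l info : stop the background worker
The difference between foreground and background: background workers are started via multi.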
On Linux (Celery 4.2.1), three workers are started in the background and then w1 is stopped:
[root@Python3 ~]# celery multi start w1 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
	> w1@Python3: OK
[root@Python3 ~]# celery multi start w2 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
	> w2@Python3: OK
[root@Python3 ~]# celery multi start w3 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
	> w3@Python3: OK
[root@Python3 ~]# ps -ef | grep celery
root 1346 1 0 20:49 ? 00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@Python3
root 1350 1346 0 20:49 ? 00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@Python3
root 1360 1 0 20:49 ? 00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root 1364 1360 0 20:49 ? 00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root 1374 1 0 20:49 ? 00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root 1378 1374 0 20:49 ? 00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root 1391 1251 0 20:55 pts/0 00:00:00 grep --color=auto celery
[root@Python3 ~]# celery multi stop w1
celery multi v4.2.1 (windowlicker)
> Stopping nodes...
	> w1@Python3: TERM -> 1346
[root@Python3 ~]# ps -ef | grep celery
root 1360 1 0 20:49 ? 00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root 1364 1360 0 20:49 ? 00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root 1374 1 0 20:49 ? 00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root 1378 1374 0 20:49 ? 00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root 1398 1251 0 20:57 pts/0 00:00:00 grep --color=auto celery
[root@Python3 ~]#
On Windows, however, celery multi fails:
  File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\platforms.py", line 429, in detached
    raise RuntimeError('This platform does not support detach.')
RuntimeError: This platform does not support detach.
> w1@IDX-xujf: * Child terminated with errorcode 1
FAILED
In the source, the error is raised when resource is falsy:
if not resource:
    raise RuntimeError('This platform does not support detach.')
resource is set at the top of the module:
# resource is obtained at the top of the module
resource = try_import('resource')


# try_import is defined in another file
def try_import(module, default=None):
    """Try to import and return module, or return
    None if the module does not exist."""
    try:
        return importlib.import_module(module)
    except ImportError:
        return default


# Further down, a comment in this function shows that resource being None means Windows
def get_fdmax(default=None):
    """Return the maximum number of open file descriptors
    on this system.

    :keyword default: Value returned if there's no file
                      descriptor limit.

    """
    try:
        return os.sysconf('SC_OPEN_MAX')
    except:
        pass
    if resource is None:  # Windows
        return default
    fdmax = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if fdmax == resource.RLIM_INFINITY:
        return default
    return fdmax
In short, the code tries to import a module named "resource", which exists only on Unix.
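The same probe can be reproduced outside Celery with a few lines of standard-library Python. can_detach is a hypothetical name. This is a sketch of the import-probing pattern, not Celery's code.

```python
import importlib


def try_import(module, default=None):
    """Import and return `module`, or return `default` if it can't be imported.
    Same idea as the helper quoted above."""
    try:
        return importlib.import_module(module)
    except ImportError:
        return default


# 'resource' is a Unix-only standard-library module, so this is None on Windows
resource = try_import('resource')


def can_detach():
    # hypothetical helper mirroring the `if not resource: raise RuntimeError(...)` check
    return resource is not None
```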
Periodic tasks with Celery 3.1: define the schedule in CELERYBEAT_SCHEDULE.
# CeleryPro/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
import time


@app.task
def add(x, y):
    print("Adding two values: %s %s" % (x, y))
    return x+y


@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()


# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

app = Celery('CeleryPro',
             broker='redis://',
             backend='redis://',
             include=['CeleryPro.tasks'])

app.conf.CELERYBEAT_SCHEDULE = {
    'add every 10 seconds': {
        'task': 'CeleryPro.tasks.add',
        'schedule': timedelta(seconds=10),  # a timedelta object works
        # 'schedule': 10,  # so does a plain number of seconds
        'args': (1, 2)
    },
    'upper every 2 minutes': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': crontab(minute='*/2'),
        'args': ('abc', ),
    },
}

# app.conf.CELERY_TIMEZONE = 'UTC'
app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()
Result expiry is set with `CELERY_TASK_RESULT_EXPIRES=3600` (in seconds). The default is 1 day. According to the official docs, a built-in periodic task deletes results once they pass that age:
A built-in periodic task will delete the results after this time (celery.task.backend_cleanup).
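A toy model of "results older than N seconds get deleted by a periodic cleanup pass" is sketched below. ExpiringResults is a hypothetical class written for illustration, not Celery's backend code. Backends differ in how they really expire results; some, Redis for example, can simply set an expiry on each stored key instead.

```python
import time


class ExpiringResults:
    """Toy result store with a cleanup pass, illustrating result expiry."""

    def __init__(self, expires=3600, clock=time.time):
        self.expires = expires
        self.clock = clock      # injectable clock so the behaviour can be tested
        self._data = {}         # task_id -> (stored_at, value)

    def store(self, task_id, value):
        self._data[task_id] = (self.clock(), value)

    def get(self, task_id):
        entry = self._data.get(task_id)
        return None if entry is None else entry[1]

    def cleanup(self):
        """Delete results older than `expires` seconds; return how many were removed."""
        cutoff = self.clock() - self.expires
        stale = [task_id for task_id, (stored_at, _) in self._data.items()
                 if stored_at < cutoff]
        for task_id in stale:
            del self._data[task_id]
        return len(stale)
```

With expires=3600, a result stored at t=0 is removed by a cleanup run at t=3650, while one stored at t=100 survives.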
Start the worker and beat in two separate terminals:
G:\Steed\Documents\PycharmProjects\Celery>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery.exe -A CeleryPro worker -l info
G:\Steed\Documents\PycharmProjects\Celery>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery.exe -A CeleryPro beat -l info
The advantage of the newer version (4.x) is that periodic tasks can be defined on their own, just like ordinary tasks, using the new @app.on_after_configure.connect:
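beat is the process that keeps checking every schedule entry and sends the task to the broker when it is due; the worker then executes it as usual. For a fixed-interval entry, the check boils down to something like the sketch below. is_due here is a hypothetical, simplified function. Celery's own schedule classes also handle time zones, crontab entries and more.

```python
from datetime import datetime, timedelta


def is_due(last_run, now, every):
    """Decide whether a fixed-interval schedule entry should fire.

    `every` may be a timedelta or a number of seconds (both forms appear in
    the CELERYBEAT_SCHEDULE above). Returns (due, seconds_until_next_check).
    """
    if not isinstance(every, timedelta):
        every = timedelta(seconds=every)
    elapsed = now - last_run
    if elapsed >= every:
        return True, every.total_seconds()   # send now, check again after a full interval
    return False, (every - elapsed).total_seconds()  # wait for the remaining time
```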
# CeleryPro/periodic4.py
from __future__ import absolute_import, unicode_literals
from .celery import app
from celery.schedules import crontab


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # run every 10 seconds
    sender.add_periodic_task(10.0, hello.s(), name='hello every 10')  # give the entry a name
    # run every 30 seconds
    sender.add_periodic_task(30, upper.s('abcdefg'), expires=10)  # the task expires after 10 seconds
    # the schedule syntax is the same as a Linux crontab entry
    sender.add_periodic_task(
        crontab(hour='*', minute='*/2', day_of_week='*'),
        add.s(11, 22),
    )


@app.task
def hello():
    print('Hello World')


@app.task
def upper(arg):
    return arg.upper()


@app.task
def add(x, y):
    print("Adding two values: %s %s" % (x, y))
    return x+y

The name argument names the schedule entry. beat then reports the entry under that name, e.g. "hello every 10".
# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('CeleryPro',
             broker='amqp://',
             backend='rpc',
             include=['CeleryPro.tasks', 'CeleryPro.periodic4'])

app.conf.timezone = 'UTC'  # schedules use UTC by default
# app.conf.timezone = 'Asia/Shanghai'  # or switch to Beijing time

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

Start the worker:
[root@Python3 ~]# celery -A CeleryPro worker -l info
/usr/local/lib/python3.6/site-packages/celery/platforms.py:796: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@Python3 v4.2.1 (windowlicker)
---- **** -----
--- * *** * -- Linux-3.10.0-862.el7.x86_64-x86_64-with-centos-7.5.1804-Core 2018-10-01 12:46:35
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: CeleryPro:0x7ffb0c8b2908
- ** ---------- .> transport: amqp://guest:**@
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.periodic4.add
  . CeleryPro.periodic4.hello
  . CeleryPro.periodic4.upper
  . CeleryPro.tasks.add
  . CeleryPro.tasks.upper

[2018-10-01 12:46:35,187: INFO/MainProcess] Connected to amqp://guest:**@
[2018-10-01 12:46:35,216: INFO/MainProcess] mingle: searching for neighbors
[2018-10-01 12:46:36,266: INFO/MainProcess] mingle: all alone
[2018-10-01 12:46:36,307: INFO/MainProcess] celery@Python3 ready.
Then start beat. Here the value after -A must be written out in full as CeleryPro.periodic4:
[root@Python3 ~]# celery -A CeleryPro.periodic4 beat -l info
celery beat v4.2.1 (windowlicker) is starting.
__ - ... __ - _
LocalTime -> 2018-10-01 12:45:04
Configuration ->
    . broker -> amqp://guest:**@
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2018-10-01 12:45:04,934: INFO/MainProcess] beat: Starting...
[2018-10-01 12:45:05,006: INFO/MainProcess] Scheduler: Sending due task hello every 10 (CeleryPro.periodic4.hello)
[2018-10-01 12:45:05,356: INFO/MainProcess] Scheduler: Sending due task CeleryPro.periodic4.upper('abcdefg') (CeleryPro.periodic4.upper)
Another option in 4.x is to declare the schedule in app.conf.beat_schedule:
# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('CeleryPro',
             broker='amqp://',
             backend='rpc',
             include=['CeleryPro.tasks'])

app.conf.beat_schedule = {
    'every 5 seconds': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': 5,
        'args': ('xyz',)
    }
}

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

# CeleryPro/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
import time


@app.task
def add(x, y):
    print("Adding two values: %s %s" % (x, y))
    return x+y


@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()
[root@Python3 ~]# celery -A CeleryPro beat -l info
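Here beat's -A argument is simply CeleryPro, and the result is the same. Also, if periodic4.py is added to the include list, CeleryPro.periodic4 can be used as well. With this latter approach, you only need to update app.conf.beat_schedule.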
crontab schedule examples (from the Celery docs):

| Example | Meaning |
|---|---|
| `crontab()` | Execute every minute. |
| `crontab(minute=0, hour=0)` | Execute daily at midnight. |
| `crontab(minute=0, hour='*/3')` | Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm. |
| `crontab(minute=0, hour='0,3,6,9,12,15,18,21')` | Same as previous. |
| `crontab(minute='*/15')` | Execute every 15 minutes. |
| `crontab(day_of_week='sunday')` | Execute every minute (!) on Sundays. |
| `crontab(minute='*', hour='*', day_of_week='sun')` | Same as previous. |
| `crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri')` | Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays. |
| `crontab(minute=0, hour='*/2,*/3')` | Execute every even hour, and every hour divisible by three. This means: at every hour except 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm. |
| `crontab(minute=0, hour='*/5')` | Execute every hour divisible by 5. This means it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of "15", which is divisible by 5). |
| `crontab(minute=0, hour='*/3,8-17')` | Execute every hour divisible by 3, and every hour during office hours (8am-5pm). |
| `crontab(day_of_month='2')` | Execute on the second day of every month. |
| `crontab(day_of_month='2-30/2')` | Execute on every even-numbered day. |
| `crontab(day_of_month='1-7,15-21')` | Execute on the first and third weeks of the month. |
| `crontab(day_of_month='11', month_of_year='5')` | Execute on the 11th of May every year. |
| `crontab(month_of_year='*/3')` | Execute on the first month of every quarter. |
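The trickier rows (step values and combined lists) are easy to check by expanding a field into the set of values it matches. Below is a simplified, hypothetical expander for numeric fields only. It is not Celery's parser, which also accepts names such as 'sun'. It does reproduce the table, e.g. hour='*/2,*/3' skips exactly 1, 5, 7, 11, 13, 17, 19 and 23, and month_of_year='*/3' means months 1, 4, 7 and 10.

```python
def expand_field(spec, low, high):
    """Expand a numeric cron-style field into the set of matching values in [low, high].

    Handles '*', '*/3', '8-17', '2-30/2', '5' and comma-separated combinations.
    """
    values = set()
    for part in str(spec).split(','):
        step = 1
        if '/' in part:
            part, step_text = part.split('/', 1)
            step = int(step_text)
        if part == '*':
            start, stop = low, high          # whole range
        elif '-' in part:
            first, last = part.split('-', 1)
            start, stop = int(first), int(last)  # explicit range, inclusive
        else:
            start = stop = int(part)         # single value
        values.update(v for v in range(start, stop + 1, step) if low <= v <= high)
    return values
```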
If you have a task that should be executed according to sunrise, sunset, dawn or dusk, you can use the solar schedule type:
from celery.schedules import solar

app.conf.beat_schedule = {
    # Executes at sunset in Melbourne
    'add-at-melbourne-sunset': {
        'task': 'tasks.add',
        'schedule': solar('sunset', -37.81753, 144.96715),
        'args': (16, 16),
    },
}

The latitude and longitude arguments are signed:

| Sign | Argument | Meaning |
|---|---|---|
| + | latitude | North |
| - | latitude | South |
| + | longitude | East |
| - | longitude | West |
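Converting hemisphere-labelled coordinates into the signed form follows directly from that sign table. signed_coords is a hypothetical helper. It is shown here with the Melbourne coordinates from the example (37.81753° S, 144.96715° E).

```python
def signed_coords(lat, lat_hemisphere, lon, lon_hemisphere):
    """Return (latitude, longitude) signed as solar() expects:
    north and east are positive, south and west are negative."""
    lat_sign = {'N': 1, 'S': -1}[lat_hemisphere.upper()]
    lon_sign = {'E': 1, 'W': -1}[lon_hemisphere.upper()]
    return lat_sign * abs(lat), lon_sign * abs(lon)
```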
Possible event types:

| Event | Meaning |
|---|---|
| dawn_astronomical | Execute at the moment after which the sky is no longer completely dark. This is when the sun is 18 degrees below the horizon. |
| dawn_nautical | Execute when there's enough sunlight for the horizon and some objects to be distinguishable; formally, when the sun is 12 degrees below the horizon. |
| dawn_civil | Execute when there's enough light for objects to be distinguishable so that outdoor activities can commence; formally, when the sun is 6 degrees below the horizon. |
| sunrise | Execute when the upper edge of the sun appears over the eastern horizon in the morning. |
| solar_noon | Execute when the sun is highest above the horizon on that day. |
| sunset | Execute when the trailing edge of the sun disappears over the western horizon in the evening. |
| dusk_civil | Execute at the end of civil twilight, when objects are still distinguishable and some stars and planets are visible. Formally, when the sun is 6 degrees below the horizon. |
| dusk_nautical | Execute when the sun is 12 degrees below the horizon. Objects are no longer distinguishable, and the horizon is no longer visible to the naked eye. |
| dusk_astronomical | Execute at the moment after which the sky becomes completely dark; formally, when the sun is 18 degrees below the horizon. |
Using Celery with Django. Project layout:
UsingCeleryWithDjango
│
├─manage.py
│
├─app01
│  │ admin.py
│  │ apps.py
│  │ models.py
│  │ tests.py
│  │ views.py
│  └ __init__.py
│
├─CeleryPro
│  │ celery.py
│  └ __init__.py
│
├─templates
│
└─UsingCeleryWithDjango
     settings.py
     urls.py
     wsgi.py
     __init__.py
CeleryPro/celery.py creates the Celery instance.
CeleryPro/__init__.py makes sure Celery is loaded when Django starts, so that @shared_task from the celery module, which the apps use later, binds to this app.
# UsingCeleryWithDjango/CeleryPro/__init__.py
from __future__ import absolute_import, unicode_literals
__author__ = '749B'

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)


# UsingCeleryWithDjango/CeleryPro/celery.py
from __future__ import absolute_import
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'UsingCeleryWithDjango.settings')

from django.conf import settings  # noqa

app = Celery('CeleryPro')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')

# Auto-discover tasks in all apps.
# However, with the newer style of INSTALLED_APPS entries in Django, discovery fails:
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # this is how the official example does it
'''
# the INSTALLED_APPS section of settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',  # with this form, auto-discovery can't find the tasks
    # 'app01',  # with this form, auto-discovery works
]
'''
# Or, if you'd rather not change settings.INSTALLED_APPS, pass your own list of apps
app.autodiscover_tasks(['app01'])  # which is what I do here


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
Each app then gets its own tasks.py:
- app01/
    - app01/tasks.py
    - app01/models.py
The functions in tasks.py are decorated with @shared_task:
# UsingCeleryWithDjango/app01/tasks.py
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

# UsingCeleryWithDjango/UsingCeleryWithDjango/settings.py
# The rest is ordinary Django configuration and is omitted here.

# Celery settings
BROKER_URL = 'redis://'
CELERY_RESULT_BACKEND = 'redis://'
G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro worker -l info
[2018-10-02 20:55:56,411: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers the ability to execute any command. It's important to secure your broker from unauthorized access when using pickle, so we think that enabling pickle should require a deliberate action and not be the default choice.

If you depend on pickle then you should set a setting to disable this warning and to be sure that everything will continue working when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@IDX-xujf v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: CeleryPro:0x27f5e4dbe80
- ** ---------- .> transport: redis://
- ** ---------- .> results: redis://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.celery.debug_task
  . app01.tasks.add
  . app01.tasks.mul
  . app01.tasks.xsum

[2018-10-02 20:55:56,548: INFO/MainProcess] Connected to redis://
[2018-10-02 20:55:56,576: INFO/MainProcess] mingle: searching for neighbors
[2018-10-02 20:55:57,596: INFO/MainProcess] mingle: all alone
[2018-10-02 20:55:57,647: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\fixups\django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-10-02 20:55:57,653: WARNING/MainProcess] celery@IDX-xujf ready.
All tasks can be reached through app.tasks. Here, app is the instance created by app = Celery('CeleryPro') in CeleryPro/celery.py. CeleryPro/__init__.py re-exports it under an alias with from .celery import app as celery_app, so in this project the registry is celery_app.tasks. It contains:
{'celery.chord_unlock': <@task: celery.chord_unlock of CeleryPro:0x2360b07fc88>,
 'celery.group': <@task: celery.group of CeleryPro:0x2360b07fc88>,
 'app01.tasks.xsum': <@task: app01.tasks.xsum of CeleryPro:0x2360b07fc88>,
 'celery.backend_cleanup': <@task: celery.backend_cleanup of CeleryPro:0x2360b07fc88>,
 'app01.tasks.add': <@task: app01.tasks.add of CeleryPro:0x2360b07fc88>,
 'celery.map': <@task: celery.map of CeleryPro:0x2360b07fc88>,
 'app01.tasks.mul': <@task: app01.tasks.mul of CeleryPro:0x2360b07fc88>,
 'celery.chain': <@task: celery.chain of CeleryPro:0x2360b07fc88>,
 'CeleryPro.celery.debug_task': <@task: CeleryPro.celery.debug_task of CeleryPro:0x2360b07fc88>,
 'celery.starmap': <@task: celery.starmap of CeleryPro:0x2360b07fc88>,
 'celery.chord': <@task: celery.chord of CeleryPro:0x2360b07fc88>,
 'celery.chunks': <@task: celery.chunks of CeleryPro:0x2360b07fc88>}
Our tasks are all there, but so are many others whose names start with "celery.". So far I have always started the worker with -l info; with -l debug these built-in tasks are listed as well. In other words, Celery filters the list when the worker starts: debug mode prints every task, info mode only the user-defined ones. Next, let's look in the source to see how that filtering is done.
# celery/apps/worker.py (excerpt)
# First, some strings that are printed to the console at startup

# This is the logo; not the point here
ARTLINES = [
    ' --------------',
    '---- **** -----',
    '--- * *** * --',
    '-- * - **** ---',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- *** --- * ---',
    '-- ******* ----',
    '--- ***** -----',
    ' --------------',
]

# This string prints the task list.
# Before it goes to the console it is run through format(), so the task list is filled in dynamically.
EXTRA_INFO_FMT = """
[tasks]
{tasks}
"""


# This class has many methods; here we only look at the one that builds the task list
class Worker(WorkController):

    # This method builds the task list.
    # The logic is simple: check whether the name starts with 'celery.'.
    # include_builtins=True outputs every task; False filters out names starting with 'celery.'.
    # The actual value of include_builtins comes from extra_info below.
    def tasklist(self, include_builtins=True, sep='\n', int_='celery.'):
        return sep.join(
            '  . {0}'.format(task) for task in sorted(self.app.tasks)
            if (not task.startswith(int_) if not include_builtins else task)
        )

    # This method calls tasklist above.
    # It checks the log level: whether it is <= DEBUG decides include_builtins.
    # Finally, the result of tasklist is formatted into EXTRA_INFO_FMT.
    def extra_info(self):
        if self.loglevel <= logging.INFO:
            include_builtins = self.loglevel <= logging.DEBUG
            tasklist = self.tasklist(include_builtins=include_builtins)
            return EXTRA_INFO_FMT.format(tasks=tasklist)
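The same filtering can be restated as standalone functions that take a plain list of task names, so it runs without a worker. This is a sketch mirroring the excerpt above, not a drop-in replacement for Celery's method.

```python
import logging

EXTRA_INFO_FMT = """
[tasks]
{tasks}
"""


def tasklist(task_names, include_builtins=True, sep='\n', int_='celery.'):
    # keep every name, or only those that don't start with 'celery.'
    return sep.join(
        '  . {0}'.format(task) for task in sorted(task_names)
        if include_builtins or not task.startswith(int_)
    )


def extra_info(task_names, loglevel):
    # the list is printed only at INFO or more verbose;
    # built-in celery.* tasks are included only at DEBUG or more verbose
    if loglevel <= logging.INFO:
        include_builtins = loglevel <= logging.DEBUG
        return EXTRA_INFO_FMT.format(tasks=tasklist(task_names, include_builtins))
    return None
```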
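Following the same logic, a Django view can list our own tasks:
from django.http import HttpResponse
from CeleryPro import celery_app


def celery_list(request):
    task_list = []
    for task in sorted(celery_app.tasks):
        if not task.startswith('celery.'):
            task_list.append(task)
    print(task_list)
    return HttpResponse('OK')

The same thing as a list comprehension:
task_list = [task for task in sorted(celery_app.tasks) if not task.startswith('celery.')]
A task can then be submitted by name:
task_name = task_list[1]
t = celery_app.tasks[task_name].delay(1, 2)
Later, the task's id (uuid) is enough to look the result up again:
# To get a result object from a task id (uuid), use AsyncResult
from celery.result import AsyncResult

task_obj = AsyncResult(uuid)  # uuid: the task id string, e.g. t.id
# Once you have the object, everything works as before
task_obj.ready()   # check whether the task has finished
task_obj.get()     # block until the result is available
task_obj.result    # once the task has finished, the result is stored here, so there's no need to call get()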
Next, a small page for submitting tasks. The template:
# UsingCeleryWithDjango/templates/celery_list.html
<body>
<form method="post">
    {% csrf_token %}
    <select name="task_name">
        {% for task in task_list %}
            <option value="{{ task }}">{{ task }}</option>
        {% endfor %}
    </select>
    <input name="args" type="text" placeholder="arguments" />
    <input type="submit" value="Submit task" />
</form>
<h3>Tips: the server deserializes the submitted arguments once with json.loads, then passes them as *args</h3>
<p>debug_task: leave the arguments empty</p>
<p>add and mul: enter a 2-element list, e.g. [1, 2]</p>
<p>xsum takes a single list argument, so wrap it in another []: e.g. [[1, 2, 3, 4, 5]]</p>
</body>
# UsingCeleryWithDjango/UsingCeleryWithDjango/urls.py
from django.contrib import admin
from django.urls import path
from app01 import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('celery_list/', views.celery_list),
    path('celery_result/<uuid:uuid>/', views.celery_result),
]
# UsingCeleryWithDjango/app01/views.py
from django.shortcuts import render, redirect, HttpResponse

# Create your views here.
from CeleryPro import celery_app
from celery.result import AsyncResult
import json


def celery_list(request):
    if request.method == 'POST':
        task_name = request.POST.get('task_name')
        args = request.POST.get('args')
        # empty input means no arguments; otherwise the JSON list becomes *args
        args = json.loads(args) if args else []
        # always dispatch through the worker; calling the task directly would run it inside Django
        t = celery_app.tasks[task_name].delay(*args)
        return redirect('/celery_result/%s/' % t.id)
    # same approach as the Celery source: list every task name except the built-in celery.* ones
    task_list = [task for task in sorted(celery_app.tasks) if not task.startswith('celery.')]
    return render(request, 'celery_list.html', {'task_list': task_list})


def celery_result(request, uuid):
    uuid = str(uuid)
    task_obj = AsyncResult(uuid)
    if task_obj.ready():
        return HttpResponse(task_obj.result)
    else:
        ele = "<input type='button' value='Fetch result again' onclick='location.reload();'>"
        return HttpResponse('Not Ready %s' % ele)
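The argument handling in celery_list can be pulled out and tested on its own. parse_task_args is a hypothetical helper. Unlike the view, it also rejects JSON that isn't a list, since *args needs a sequence of positional arguments; that extra check is an added assumption.

```python
import json


def parse_task_args(raw):
    """Turn the form's 'args' text into a list of positional arguments.

    Empty input -> no arguments; a JSON list -> its elements, used as *args.
    """
    if not raw or not raw.strip():
        return []
    value = json.loads(raw)
    if not isinstance(value, list):
        raise ValueError('args must be a JSON list, e.g. [1, 2]')
    return value
```

Following the template tips, '[1, 2]' becomes add(1, 2), and '[[1, 2, 3, 4, 5]]' becomes xsum([1, 2, 3, 4, 5]).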
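Everything works in testing, but all the tasks return their results immediately. To see the "Not Ready" page, modify one of the tasks in tasks.py and add a delay with time.sleep().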
pip install django_celery_beat
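The module is django_celery_beat; note the underscores in the name, although pip also accepts it with hyphens, because pip treats hyphens and underscores in project names as equivalent. It does more than periodic tasks: it stores the schedule in Django's database, so the schedule can also be set up and managed conveniently through the Django admin. It is built for Celery 4.x, so from here on Celery 4.2.1 is used.
Auto-discovery of the tasks in all apps is configured in UsingCeleryWithDjango/CeleryPro/celery.py, specifically by calling this method:
# from django.conf import settings  # noqa
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # read the app paths from Django's settings
app.autodiscover_tasks(['app01'],)  # or specify them yourself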
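pip compares project names after normalizing them as described in PEP 503, rather than through a separate alias. The rule fits in one line; this is an illustration, not pip's own code. The import name in Python code is still django_celery_beat, since hyphens aren't valid in identifiers.

```python
import re


def normalize_project_name(name):
    # PEP 503: runs of '-', '_' and '.' are equivalent, and names are case-insensitive
    return re.sub(r'[-_.]+', '-', name).lower()
```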
def autodiscover_tasks(self, packages=None, related_name='tasks', force=False): """Auto-discover task modules. Searches a list of packages for a "tasks.py" module (or use related_name argument). If the name is empty, this will be delegated to fix-ups (e.g., Django). For example if you have a directory layout like this: .. code-block:: text foo/__init__.py tasks.py models.py bar/__init__.py tasks.py models.py baz/__init__.py models.py Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will result in the modules ``foo.tasks`` and ``bar.tasks`` being imported. Arguments: packages (List[str]): List of packages to search. This argument may also be a callable, in which case the value returned is used (for lazy evaluation). related_name (str): The name of the module to find. Defaults to "tasks": meaning "look for 'module.tasks' for every module in ``packages``." force (bool): By default this call is lazy so that the actual auto-discovery won't happen until an application imports the default modules. Forcing will cause the auto-discovery to happen immediately. """ if force: return self._autodiscover_tasks(packages, related_name) signals.import_modules.connect(starpromise( self._autodiscover_tasks, packages, related_name, ), weak=False, sender=self)
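app.autodiscover_tasks(['app01'], force=True)  # Celery 4 has a force parameter; it defaults to False (lazy discovery) and has to be set to True here so discovery happens immediately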
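To make the docstring concrete, here is a simplified model of autodiscovery using only the standard library: for each package it looks for a `<package>.<related_name>` module and imports it, and with force=False it merely records the request until the default modules are imported (in Celery that moment is tied to the import_modules signal). find_related_modules and LazyDiscovery are hypothetical names for illustration; Celery's real _autodiscover_tasks is more involved (for example, the Django fix-up).

```python
import importlib
import importlib.util


def find_related_modules(packages, related_name="tasks"):
    """Import '<package>.<related_name>' for every package where it exists."""
    if callable(packages):
        # Like Celery, accept a callable (e.g. lambda: settings.INSTALLED_APPS)
        # so the package list is only evaluated when discovery actually runs
        packages = packages()
    found = []
    for pkg in packages:
        name = f"{pkg}.{related_name}"
        try:
            spec = importlib.util.find_spec(name)
        except ModuleNotFoundError:
            spec = None  # the package itself cannot be imported
        if spec is not None:
            importlib.import_module(name)
            found.append(name)
    return found


class LazyDiscovery:
    """Hypothetical model of the force flag: without force, discovery is
    deferred until import_default_modules() is called."""

    def __init__(self):
        self._pending = []
        self.discovered = []

    def autodiscover_tasks(self, packages, related_name="tasks", force=False):
        if force:
            self.discovered += find_related_modules(packages, related_name)
        else:
            self._pending.append((packages, related_name))

    def import_default_modules(self):
        for packages, related_name in self._pending:
            self.discovered += find_related_modules(packages, related_name)
        self._pending.clear()


if __name__ == "__main__":
    # Standard-library packages stand in for Django apps: json has a
    # 'decoder' submodule, xml does not
    print(find_related_modules(["json", "xml"], related_name="decoder"))
```

In this model, a process that never calls import_default_modules() (which is roughly the situation of the Django web process listing celery_app.tasks) only sees the tasks when force=True.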
Start the worker (-l info), open the page and submit a task. It fails, and the worker logs the following error:
[2018-10-08 13:23:28,062: INFO/MainProcess] Received task: app01.tasks.add[ff0f5e76-6474-4f74-a93c-7b2486abe07e]
[2018-10-08 13:23:28,078: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):
  File "g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\billiard\pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
pip uninstall celery
pip install celery==3.1.25
pip install eventlet
After installing it, start the worker with one extra parameter, "-P eventlet":
G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro worker -l info -P eventlet

 -------------- celery@IDX-xujf v4.2.1 (windowlicker)
---- **** -----
--- * *** * -- Windows-10-10.0.17134-SP0 2018-10-08 13:33:21
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: CeleryPro:0x16ad81d16a0
- ** ---------- .> transport: redis://
- ** ---------- .> results: redis://
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.celery.debug_task
  . app01.tasks.add
  . app01.tasks.mul
  . app01.tasks.xsum

[2018-10-08 13:33:21,430: INFO/MainProcess] Connected to redis://
[2018-10-08 13:33:21,457: INFO/MainProcess] mingle: searching for neighbors
[2018-10-08 13:33:22,488: INFO/MainProcess] mingle: all alone
[2018-10-08 13:33:22,502: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\fixups\django.py:200: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-10-08 13:33:22,504: INFO/MainProcess] celery@IDX-xujf ready.
[2018-10-08 13:33:22,519: INFO/MainProcess] pidbox: Connected to redis://
[2018-10-08 13:34:13,596: INFO/MainProcess] Received task: app01.tasks.add[2b56d6b7-012f-44db-bf4b-2d85d22dcd8d]
[2018-10-08 13:34:13,611: INFO/MainProcess] Task app01.tasks.add[2b56d6b7-012f-44db-bf4b-2d85d22dcd8d] succeeded in 0.0s: 7
INSTALLED_APPS = [
    ......
    'django_celery_beat',
]
>python manage.py migrate
Operations to perform:
  Apply all migrations: admin, auth, contenttypes, django_celery_beat, sessions
Running migrations:
  Applying django_celery_beat.0001_initial... OK
  Applying django_celery_beat.0002_auto_20161118_0346... OK
  Applying django_celery_beat.0003_auto_20161209_0049... OK
  Applying django_celery_beat.0004_auto_20170221_0000... OK
  Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
  Applying django_celery_beat.0006_auto_20180210_1226... OK
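After logging in to the Django admin, you will see the following tables:
The tasks themselves are configured in the Periodic tasks table; each of the other tables defines a kind of schedule that a task can run on.
First open the Intervals table and create a schedule. Here we create one that fires every 5 seconds.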
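Conceptually, an interval of "every 5 seconds" means that on each check beat compares the time since the last run with the period. The sketch below is a simplified model loosely following the idea behind celery.schedules.schedule.is_due; is_due and simulate are hypothetical helpers driven by a fake clock, and real beat additionally caps how long it sleeps between checks (the maxinterval shown in its startup banner).

```python
from datetime import datetime, timedelta


def is_due(last_run_at, now, run_every):
    """Return (due, seconds_until_next_check) for a fixed-interval schedule."""
    remaining = (last_run_at + run_every - now).total_seconds()
    if remaining <= 0:
        # Due now; the next check is one full interval away
        return True, run_every.total_seconds()
    return False, remaining


def simulate(start, run_every, ticks, step=timedelta(seconds=1)):
    """Advance a fake clock `ticks` times by `step` and record send times."""
    sent = []
    last_run = now = start
    for _ in range(ticks):
        now += step
        due, _ = is_due(last_run, now, run_every)
        if due:
            sent.append(now)
            last_run = now
    return sent


if __name__ == "__main__":
    start = datetime(2018, 10, 8, 14, 43, 43)
    for t in simulate(start, timedelta(seconds=5), ticks=16):
        print(t.time())  # 14:43:48, 14:43:53, 14:43:58
```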
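Then open the Periodic tasks table, choose the task to run and associate it with one of the schedules.
The JSON entered here is deserialized and passed to the task function as "*args, **kwargs".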
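For example, entering [3, 4] as the positional arguments of app01.tasks.add amounts to calling add(3, 4). The sketch below models only that decoding step with the standard json module; call_with_json is a hypothetical helper, not part of django_celery_beat, and add is a local stand-in for the real task.

```python
import json


def add(x, y):
    # Local stand-in for app01.tasks.add
    return x + y


def call_with_json(func, args_json="[]", kwargs_json="{}"):
    """Decode the JSON args/kwargs strings and pass them on as *args/**kwargs."""
    args = json.loads(args_json or "[]")      # empty field -> no positional args
    kwargs = json.loads(kwargs_json or "{}")  # empty field -> no keyword args
    if not isinstance(args, list) or not isinstance(kwargs, dict):
        raise ValueError("args must be a JSON list and kwargs a JSON object")
    return func(*args, **kwargs)


print(call_with_json(add, "[3, 4]"))           # 7
print(call_with_json(add, "[3]", '{"y": 4}'))  # 7
```

Malformed JSON raises json.JSONDecodeError, which is a subclass of ValueError, so both kinds of input error surface the same way here.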
A Beat process is still needed to send the tasks on schedule. Start the Worker first, then start Beat with one extra parameter, "-S django":
G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro beat -l info -S django
celery beat v4.2.1 (windowlicker) is starting.
__ - ... __ - _
LocalTime -> 2018-10-08 14:43:43
Configuration ->
    . broker -> redis://
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> django_celery_beat.schedulers.DatabaseScheduler
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 seconds (5s)
[2018-10-08 14:43:43,907: INFO/MainProcess] beat: Starting...
[2018-10-08 14:43:43,908: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:48,911: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:48,939: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[2018-10-08 14:43:53,922: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[2018-10-08 14:43:58,922: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[2018-10-08 14:43:59,534: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:59,717: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:59,727: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:59,729: INFO/MainProcess] Writing entries...

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>
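As the banner shows, "-S django" selects django_celery_beat.schedulers.DatabaseScheduler; the long form on the command line is --scheduler django_celery_beat.schedulers:DatabaseScheduler. The scheduler can also be chosen in the Django settings instead. A minimal sketch, assuming CeleryPro/celery.py loads its configuration with app.config_from_object('django.conf:settings', namespace='CELERY') as in the Celery Django example (that line is not shown in this post):

```python
# settings.py
# Assumes Celery reads the Django settings with namespace='CELERY', so the
# beat_scheduler option is spelled CELERY_BEAT_SCHEDULER here.
# Same effect as starting beat with -S django: the schedule is read from
# the database tables managed in the Django admin.
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
```

With this setting in place, beat can be started with just "celery -A CeleryPro beat -l info".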
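Note: every time a task is modified, Beat has to be restarted before the new configuration takes effect. This matters most for Intervals tasks (those run every fixed period); Crontab tasks seem to be much less affected.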