from celery import task
from celery.five import monotonic
from celery.utils.log import get_task_logger
from contextlib import contextmanager
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    """Try to take a cache-based lock and release it when the block exits.

    Args:
        lock_id: Cache key used as the lock.
        oid: Value stored under the key (the caller passes the app's oid).

    Yields:
        The result of ``cache.add``: truthy when this caller created the
        key (lock acquired), falsy when the key already existed.
    """
    # Stop considering the lock "ours" 3 seconds before the cache entry
    # itself expires.
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)


@task(bind=True)
def import_feed(self, feed_url):
    """Import a feed unless another worker is already importing it.

    Args:
        feed_url: URL of the feed to import (text or bytes).

    Returns:
        The ``url`` of the imported feed when the lock was acquired,
        otherwise ``None``.
    """
    # hashlib only accepts bytes; on Python 3 a text URL must be encoded
    # first or md5() raises TypeError.
    url_bytes = feed_url
    if not isinstance(url_bytes, bytes):
        url_bytes = url_bytes.encode('utf-8')

    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.
    feed_url_hexdigest = md5(url_bytes).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
    logger.debug('Importing feed: %s', feed_url)
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return Feed.objects.import_feed(feed_url).url
    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)
Celery 是一個簡單、靈活且可靠的分佈式系統,用於處理大量消息,同時提供維護這樣一個系統所必需的工具。因為工作的平臺中用到了 Celery 系統(用於發送郵件、發送短信、發佈上線等任務),在此記錄一下學到的知識。
python
使用 RabbitMQ 做 Celery 的 broker 和使用 Redis 做 Celery 的 broker 各自的特性
使用RabbitMQ做爲Celery Broker的優勢:
Highly customizable routing(高度定製路由)
Persistent queues(持久化隊列)
使用 Redis 作為 Celery broker 的優勢:
high speed due to in memory datastore(速度極快的內存數據庫)
can double up as both key-value datastore and job queue(可以同時用作 key-value 數據存儲和任務隊列)
pip3 install celery(4.0 版本的 celery beat 不支持熱加載)
pip3 install flower
pip3 install django-celery
當某個方法被請求交給 celery 執行時,celery 首先根據綁定的路由規則把任務消息放到指定的隊列中,再由該隊列對應的 worker 節點取出並執行。
說明:
為什麼要定義多個 worker?每個 worker 都會新建一個進程,可以充分利用服務器資源,提高執行效率。
同一臺服務器可以啓動多個 worker 節點嗎?可以,啓動參數裏寫上不同的 --hostname 即可。
celery 默認會創建一個名為 celery 的任務隊列,沒有任何綁定的任務都會發送到這個隊列中。
celery 加 redis 的多節點配置實例,由於資源限制,只找了兩臺機器做測試
10.10.42.33 10.10.190.234
咱們把redis服務放在10.10.190.234那臺服務器上
咱們把flower服務也啓動在10.10.42.33那臺服務器上
代碼中定義的隊列有queue_add、queue_sum (還有個默認隊列celery)
33、234 兩臺服務器用於啓動 worker 節點
33服務器上啓動處理celery和queue_add隊列的worker節點
234 服務器上啓動處理 celery 和 queue_sum 隊列的 worker 節點
# celeryconfig.py (shown in the article via `cat celeryconfig.py`)
#!/usr/bin/python
# coding:utf-8
"""Celery settings for the two-node Redis example.

Tasks from ``task.py`` are routed so that ``task.add`` goes to the
``queue_add`` queue and ``task.reduce`` goes to the ``queue_sum`` queue;
everything else falls through to the default ``celery`` queue.
"""
from kombu import Queue

####################################
#         General settings         #
####################################
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# List of modules to import when celery starts.
# The task module in this example is task.py, so the entry must be 'task'
# (it was 'tasks', which does not match the module or the route names).
CELERY_IMPORTS = ('task', )

# Number of tasks a worker executes before it is killed.
CELERYD_MAX_TASKS_PER_CHILD = 40
# Size of the connection pool between celery and the broker.
BROKER_POOL_LIMIT = 10

# Both the result backend and the broker live on the same Redis server.
# The broker host previously read 10.19.190.234, a typo for 10.10.190.234.
CELERY_RESULT_BACKEND = 'redis://:fafafa@10.10.190.234:6379/0'
BROKER_URL = 'redis://:fafafa@10.10.190.234:6379/0'

# Default queue and routing key for tasks without an explicit route.
# (Earlier, immediately-overwritten assignments of these two names to
# 'default' / 'task.default' were dead code and have been dropped.)
CELERY_DEFAULT_QUEUE = 'celery'
CELERY_DEFAULT_ROUTING_KEY = 'celery'

CELERYD_LOG_FILE = "./logs/celery.log"

# The second queue is named queue_sum so that it matches its routing key
# and the queue name the worker on host 234 is documented to consume.
CELERY_QUEUES = (
    Queue('queue_add', routing_key='queue_add'),
    Queue('queue_sum', routing_key='queue_sum'),
    Queue('celery', routing_key='celery'),
)

CELERY_ROUTES = {
    'task.add': {'queue': 'queue_add', 'routing_key': 'queue_add'},
    'task.reduce': {'queue': 'queue_sum', 'routing_key': 'queue_sum'},
}
cat task.py
import builtins
import os
import sys
import datetime

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

from celery import Celery
from celery import chain, group, chord, Task
from celeryservice import celeryconfig

app = Celery()
app.config_from_object(celeryconfig)

__all__ = ['add', 'reduce', 'sum_all', 'other']


####################################
#         Task definitions         #
####################################
@app.task
def add(x, y):
    """Return ``x + y``."""
    return x + y


@app.task
def reduce(x, y):
    """Return ``x - y``."""
    return x - y


@app.task
def sum(values):
    """Return the sum of ``values`` after converting each item to ``int``.

    The task keeps its original name ``sum``, which rebinds the module-level
    name to the task object. The builtin is therefore reached explicitly via
    ``builtins.sum``; calling plain ``sum`` here would invoke this task
    again and recurse without end.
    """
    return builtins.sum(int(value) for value in values)


# ``__all__`` exports ``sum_all``; provide it as an alias of the ``sum`` task
# so ``from task import *`` works without clobbering the importer's builtin.
sum_all = sum


@app.task
def other(x, y):
    """Return ``x * y``."""
    return x * y
cat flower.py
#!/usr/bin/env python
# coding:utf-8
"""Flower settings used by the monitoring node in this example."""

# --- where Flower listens -------------------------------------------------
address = "0.0.0.0"
port = 5555

# Credentials required for external access, as "user:password" entries.
basic_auth = ["zero:zero"]

# --- broker ---------------------------------------------------------------
broker_api = "redis://:afafafafa@10.10.190.234:6379/0"

# --- logging and state ----------------------------------------------------
logging = "DEBUG"

# Persist celery task records; when this is false, the monitored tasks
# disappear after Flower is restarted.
persistent = True
db = "./flower/flower_db"
啓動服務:
在33上啓動服務
celery worker -A task --loglevel=info --queues=celery,queue_add --hostname=celery_worker33 >/dev/null 2>&1 &
在234上啓動服務
celery worker -A task --loglevel=info --queues=celery,queue_sum --hostname=celery_worker234 >/dev/null 2>&1 &
服務驗證:
在任一臺有celeryservice項目代碼的服務器上,運行add、reduce、sum、other任務(測試可簡單使用add.delay(1,2)等)
add只會在33上運行,
sum任務,可能會在33或234服務器的worker節點運行
reduce任務,只會在234上運行。
other任務可能會在33或者234上運行。
以下內容來自於網站,還沒實踐,存檔用。
大多數任務並沒有使用錯誤處理,如果任務失敗,那就失敗了。在一些情況下這沒什麼問題,但是作者見到的多數失敗任務都是去調用第三方 API 然後出現了網絡錯誤,
或者資源不可用之類的錯誤。對於這些錯誤,最簡單的處理方式就是重試一下:也許只是第三方 API 的服務或者網絡臨時出現問題,沒準馬上就好了,那麼為什麼不試着加個重試呢?
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A(self):
    """Example task that retries itself when a network error occurs.

    The task is bound (``bind=True``), so it receives the task instance as
    ``self``; without that parameter the call fails and ``self.retry`` is
    unavailable.

    Raises:
        The exception produced by ``self.retry`` when a
        ``SomeNetworkException`` is caught, which asks the worker to
        re-run the task later.
    """
    try:
        print("doing stuff here...")
    except SomeNetworkException as e:
        print("maybe do some clenup here....")
        # Pass the error via ``exc=`` (the first positional parameter of
        # retry() is the task's args, not the exception), and re-raise so
        # nothing after this point runs.
        raise self.retry(exc=e)
通過 flower 查看,跑多線程時會報錯,需要減少線程數。
# -*- coding:utf-8 -*-
"""Celery settings: Redis broker, worker tuning and the beat schedule."""
from datetime import timedelta
from settings import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DB_NUM

# If a queue used by the program does not exist on the broker,
# create it immediately.
CELERY_CREATE_MISSING_QUEUES = True

CELERY_IMPORTS = ("async_task.tasks", "async_task.notify")

# Use redis as the task queue.
BROKER_URL = ''.join([
    'redis://:', REDIS_PASSWORD,
    '@', REDIS_HOST,
    ':', str(REDIS_PORT),
    '/', str(REDIS_DB_NUM),
])
# Result backend left disabled in the original:
# CELERY_RESULT_BACKEND = 'redis://:' + REDIS_PASSWORD + '@' + REDIS_HOST + ':' + str(REDIS_PORT) + '/10'

CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_DISABLE_RATE_LIMITS = True

# --- worker tuning ----------------------------------------------------------
# Number of concurrent workers.
CELERYD_CONCURRENCY = 20
# Very important: can prevent deadlocks in some situations.
CELERYD_FORCE_EXECV = True
CELERYD_PREFETCH_MULTIPLIER = 1
# Each worker is destroyed after executing at most 100 tasks, which guards
# against memory leaks.
CELERYD_MAX_TASKS_PER_CHILD = 100

# Optional limits, disabled in the original:
# A single task may not run longer than this value, otherwise it is killed
# with SIGKILL.
# CELERYD_TASK_TIME_LIMIT = 60
# If no acknowledge arrives within this time after a task is sent, the task
# is handed to another worker.
# BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 90}

# --- periodic tasks -----------------------------------------------------------
# Every entry runs every 10 seconds on the 'my_period_task' queue.
# A further entry, 'report_retry' -> 'async_task.tasks.report_retry' with
# timedelta(seconds=60), is disabled in the original, as is passing
# 'args': (redis_db) to the entries.
CELERYBEAT_SCHEDULE = {
    entry_name: {
        'task': task_path,
        'schedule': timedelta(seconds=10),
        'options': {'queue': 'my_period_task'},
    }
    for entry_name, task_path in (
        ('msg_notify', 'async_task.notify.msg_notify'),
        ('report_result', 'async_task.tasks.report_result'),
    )
}

################################################
# Commands for starting the services
# *** scheduler (beat) ***
# nohup celery beat -s /var/log/boas/celerybeat-schedule --logfile=/var/log/boas/celerybeat.log -l info &
# *** worker ***
# nohup celery worker -f /var/log/boas/boas_celery.log -l INFO &