Celery實現分佈式定時任務並開啓監控(Celery-Beat、Celery-Once、flower)

Celery實現分佈式定時任務並開啓監控(Celery-Beat、Celery-Once、flower)

原理:celery-beat做爲任務調度,當達到定時時間時,beat將任務id裝載進rabbitmq隊列中,worker在隊列的另外一端取出任務id,並匹配當前註冊的任務。若是沒有註冊,那麼會報錯。除此以外,worker還會經過celery-once來嘗試從redis中獲取分佈式鎖,只有獲取到鎖的worker纔會執行這個任務。worker執行成功或者失敗經過flower監控

yGw7PU.png

 

1.環境準備

pip install celery
pip install flower
pip install celery_once

 

2.代碼結構

代碼結構以下:html

yGG6de.png

任務函數sendDingTest.py(釘釘機器人):python

from dingtalkchatbot.chatbot import DingtalkChatbot
from start import celery_app
from datetime import datetime
import socket
from celery_once import QueueOnce

WEB_HOOK_SPIDER = '在釘釘機器人頁面獲取web_hook'


# 基於任務名及傳遞的參數值來確認是不是同一個任務
@celery_app.task(base=QueueOnce, once={'graceful': True})
def send_ding_test(arg1, arg2):
    dingding = DingtalkChatbot(WEB_HOOK_SPIDER)
    arg3 = arg1 + arg2
    dingding.send_text(msg="城市數據定時任務測試-{},{} 執行時刻:{}".format(arg3, socket.gethostname(), datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

配置文件config.py:git

from celery.schedules import crontab


class celeryConfig(object):
    # accept_content = ['json']  # 能夠是set,list,tuple,pickle,yaml
    # result_accept_content = ['json']
    timezone = 'Asia/Shanghai'  # 中國只有兩個時區,一個上海,一個烏魯木齊
    broker_url = "amqp://user:password@ip:port/vhost"
    backend = ""
    include = ['jobs.sendDingTest']  # worker啓動時要導入的任務模塊,須要在這裏添加,以便worker可以找到咱們的任務
    beat_schedule = {
        'add-every-monday-morning':
            {
                'task': 'jobs.sendDingTest.send_ding_test', # 這裏要寫全路徑,不然worker找不到
                'schedule': crontab(minute="*/2"),
            'args': (16, 16),
            },
    }
    # celery-once配置
    ONCE = {
        'backend': 'celery_once.backends.Redis',
        'settings': {
            'url': 'redis://ip:port/database',
            'default_timeout': 60 * 60 # 分佈式鎖的默認超時時間
        }
    }

啓動函數start.py:github

from celery import Celery
import os

# windows平臺須要設置,
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')


celery_app = Celery()
celery_app.config_from_object('config.celeryConfig')

 

3.啓動定時任務與監控

# 注意:beat,worker,flower均可以不在同一臺服務器上,分佈開的話,須要在其餘服務器上copy一份代碼
# 開啓beat(start是文件名)
celery -A start.celery_app beat
# 開啓worker
celery -A start.celery_app worker -c 1 -l info
# 開啓flower(默認地址是localhost:5555)
celery -A start flower

如下是flower頁面,能夠查看worker數量、消息隊列(須要rabbitmq開啓rabbitmq_management)和任務執行結果等。web

yGw3E6.png

4.文檔參考

celery-once文檔redis

flower文檔shell

celery官方文檔json

相關文章
相關標籤/搜索