Flask-APScheduler使用教程

項目中須要用到定時器和循環執行。去網上搜了一下,比較常見的有一下集中。運用Python線程執行輪詢操做,也有運用Linux系統的Cron,Celery的文章最多,可是太麻煩。看看就知道,Celery 須要一個發送和接受消息的傳輸者。RabbitMQ 和 Redis 中間人的消息傳輸支持全部特性,但也提供大量其餘實驗性方案的支持,包括用 SQLite 進行本地開發。須要用到隊列,對於這點需求簡直就是大材小用。最後找到了比較合適的Flask-APScheduler。python

介紹

看看 github的flask-apscheduler介紹。git

  • Loads scheduler configuration from Flask configuration.(支持從Flask中加載調度)
  • Loads job definitions from Flask configuration.(支持從Flask中加載任務配置)
  • Allows to specify the hostname which the scheduler will run on.(容許指定服務器運行任務)
  • Provides a REST API to manage the scheduled jobs.(提供Rest接口管理任務)
  • Provides authentication for the REST API.(提供Rest接口認證)

安裝及配置

pip install Flask-APScheduler

在Flask配置文件中添加github

SCHEDULER_API_ENABLED = True
JOBS = [
        {
            'id': 'job_1h_data',
            'func': job_1h_data,
            'args': '',
            'trigger': {
                'type': 'cron',
                'day_of_week': "0-6",
                'hour': '*',
                'minute': '1',
                'second': '0'
            }


        },

        {
            'id': 'job_announce',
            'func': exchange_an,
            'args': '',
            'trigger': 'interval',
            'seconds': 300
        }
]

上面指定了每一小時獲取全部貨幣24h最高位以及交易所公告。shell

獲取公告

def exchange_an():
    """

    :param start_date: 開始時間 YYYY-MM-DD HH:MM:SS
    :param end_date: 結束時間 YYYY-MM-DD HH:MM:SS
    :return: 推送消息,保持數據庫
    """
    current_local = time.time()
    start_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_local - 300))
    end_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_local))
    announce = pro.query('exchange_ann', start_date=start_date, end_date=end_date)
    print('請求交易所公告...')
    for x in announce.values:
        s = {
            'title': x[0],
            'content': x[1],
            'type': x[2],
            'url': x[3],
            'datetime': x[4]
        }
        value = json.dumps(s)
        print(value)
        mqttClient.publish('system/ex_announce', value)

動態添加任務

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime


def aps_test(x):
    print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
scheduler.add_job(func=aps_test, args=('一次性任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task')

scheduler.start()
"""
暫停任務
"""
scheduler.pause_job('interval_task')
"""
恢復任務
"""
scheduler.resume_job('interval_task')
"""
刪除任務
"""
scheduler.remove_job('interval_task')

apscheduler支持添加三種方式的任務,分別是定時任務,一次性任務及循環任務。同時也包含了對任務的控制。數據庫

總結

由於是單機版本,因此指定服務器運行任務,Rest接口管理任務,Rest接口認證就不寫了。後續有需求在繼續。json


歡迎 長按下圖 -> 識別圖中二維碼或者微信 掃一掃關注個人公衆號
相關文章
相關標籤/搜索