python 實現定時任務

APScheduler是基於Quartz的一個Python定時任務框架。提供了基於日期、固定時間間隔以及crontab類型的任務,而且能夠持久化任務。
數據庫


1、安裝APScheduler框架

pip install apscheduler

2、基本概念ide

APScheduler有四大組件:
一、觸發器 triggers :
觸發器包含調度邏輯。每一個做業都有本身的觸發器,用於肯定下一個任務什麼時候運行。除了初始配置以外,觸發器是徹底無狀態的。
有三種內建的trigger:
(1)date: 特定的時間點觸發
(2)interval: 固定時間間隔觸發
(3)cron: 在特定時間週期性地觸發
二、任務儲存器 job stores:用於存聽任務,把任務存放在內存(爲默認MemoryJobStore)或數據庫中。
三、執行器 executors: 執行器是將任務提交到線程池或進程池中運行,當任務完成時,執行器通知調度器觸發相應的事件。
四、調度器 schedulers: 把上方三個組件做爲參數,經過建立調度器實例來運行
根據開發需求選擇相應的組件,下面是不一樣的調度器組件:
BlockingScheduler 阻塞式調度器:適用於只跑調度器的程序。
BackgroundScheduler 後臺調度器:適用於非阻塞的狀況,調度器會在後臺獨立運行。
AsyncIOScheduler AsyncIO調度器,適用於應用使用AsnycIO的狀況。
GeventScheduler Gevent調度器,適用於應用經過Gevent的狀況。
TornadoScheduler Tornado調度器,適用於構建Tornado應用。
TwistedScheduler Twisted調度器,適用於構建Twisted應用。
QtScheduler Qt調度器,適用於構建Qt應用。線程

3、使用步驟
一、新建一個調度器schedulers
二、添加調度任務
三、運行調度任務orm

4、使用實例blog

一、觸發器date進程

特定的時間點觸發,只執行一次。參數以下:事件

參數 說明
run_date (datetime 或 str) 做業的運行日期或時間
timezone (datetime.tzinfo 或 str) 指定時區

使用例子:crontab

複製代碼

from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):    
    print(text)

scheduler = BlockingScheduler()
# 在 2019-8-30 運行一次 job 方法
scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1'])
# 在 2019-8-30 01:00:00 運行一次 job 方法
scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2'])
# 在 2019-8-30 01:00:01 運行一次 job 方法
scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3'])

scheduler.start()

複製代碼

二、觸發器intervalip

固定時間間隔觸發。參數以下:

參數 說明
weeks (int) 間隔幾周
days (int) 間隔幾天
hours (int) 間隔幾小時
minutes (int) 間隔幾分鐘
seconds (int) 間隔多少秒
start_date (datetime 或 str) 開始日期
end_date (datetime 或 str) 結束日期
timezone (datetime.tzinfo 或str)  

使用例子:

複製代碼

import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):    
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('{} --- {}'.format(text, t))

scheduler = BlockingScheduler()
# 每隔 1分鐘 運行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, args=['job1'])
# 在 2019-08-29 22:15:00至2019-08-29 22:17:00期間,每隔1分30秒 運行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, seconds = 30, start_date='2019-08-29 22:15:00', end_date='2019-08-29 22:17:00', args=['job2'])

scheduler.start()

'''
運行結果:
job2 --- 2019-08-29 22:15:00
job1 --- 2019-08-29 22:15:46
job2 --- 2019-08-29 22:16:30
job1 --- 2019-08-29 22:16:46
job1 --- 2019-08-29 22:17:46
...餘下省略...
'''

複製代碼

 三、觸發器cron

在特定時間週期性地觸發。參數以下:

參數 說明
year (int 或 str) 年,4位數字
month (int 或 str) 月 (範圍1-12)
day (int 或 str) 日 (範圍1-31)
week (int 或 str) 周 (範圍1-53)
day_of_week (int 或 str) 周內第幾天或者星期幾 (範圍0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str) 時 (範圍0-23)
minute (int 或 str) 分 (範圍0-59)
second (int 或 str) 秒 (範圍0-59)
start_date (datetime 或 str) 最先開始日期(包含)
end_date (datetime 或 str) 最晚結束時間(包含)
timezone (datetime.tzinfo 或str) 指定時區

這些參數支持算數表達式,取值格式有以下:

  使用例子:

複製代碼

import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):    
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('{} --- {}'.format(text, t))

scheduler = BlockingScheduler()
# 在天天22點,每隔 1分鐘 運行一次 job 方法
scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1'])
# 在天天22和23點的25分,運行一次 job 方法
scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])

scheduler.start()

'''
運行結果:
job1 --- 2019-08-29 22:25:00
job2 --- 2019-08-29 22:25:00
job1 --- 2019-08-29 22:26:00
job1 --- 2019-08-29 22:27:00
...餘下省略...
'''

複製代碼

 四、經過裝飾器scheduled_job()添加方法

 添加任務的方法有兩種:

(1)經過調用add_job()---見上面1至3代碼
(2)經過裝飾器scheduled_job():
第一種方法是最經常使用的方法。第二種方法主要是方便地聲明在應用程序運行時不會更改的任務。該 add_job()方法返回一個apscheduler.job.Job實例,可使用該實例稍後修改或刪除該任務。

複製代碼

import time
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

@scheduler.scheduled_job('interval', seconds=5)
def job1():    
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('job1 --- {}'.format(t))

@scheduler.scheduled_job('cron', second='*/7')
def job2():    
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('job2 --- {}'.format(t))

scheduler.start()

'''
運行結果:
job2 --- 2019-08-29 22:36:35
job1 --- 2019-08-29 22:36:37
job2 --- 2019-08-29 22:36:42
job1 --- 2019-08-29 22:36:42
job1 --- 2019-08-29 22:36:47
job2 --- 2019-08-29 22:36:49
...餘下省略...
'''

複製代碼

相關文章
相關標籤/搜索