APScheduler的使用

定時框架APScheduler

APSScheduler是python的一個定時任務框架,它提供了基於日期date、固定時間間隔interval、以及linux上的crontab類型的定時任務。該礦機不只能夠添加、刪除定時任務,還能夠將任務存儲到數據庫中、實現任務的持久化。linux

APScheduler有四種組件

  • triggers(觸發器):觸發器包含調度邏輯,每個做業有它本身的觸發器,用於決定接下來哪個做業會運行,除了他們本身初始化配置外,觸發器徹底是無狀態的。redis

  • job stores(做業存儲):用來存儲被調度的做業,默認的做業存儲器是簡單地把做業任務保存在內存中,其它做業存儲器能夠將任務做業保存到各類數據庫中,支持MongoDB、Redis、SQLAlchemy存儲方式。當對做業任務進行持久化存儲的時候,做業的數據將被序列化,從新讀取做業時在反序列化。sql

  • executors(執行器):執行器用來執行定時任務,只是將須要執行的任務放在新的線程或者線程池中運行。看成業任務完成時,執行器將會通知調度器。對於執行器,默認狀況下選擇ThreadPoolExecutor就能夠了,可是若是涉及到一下特殊任務如比較消耗CPU的任務則能夠選擇ProcessPoolExecutor,固然根據根據實際需求能夠同時使用兩種執行器。mongodb

  • schedulers(調度器):調度器是將其它部分聯繫在一塊兒,通常在應用程序中只有一個調度器,應用開發者不會直接操做觸發器、任務存儲以及執行器,相反調度器提供了處理的接口。經過調度器完成任務的存儲以及執行器的配置操做,如能夠添加。修改、移除任務做業數據庫

APScheduler提供了七種調度器

  • BlockingScheduler:適合於只在進程中運行單個任務的狀況,一般在調度器是你惟一要運行的東西時使用。
  • BackgroundScheduler: 適合於要求任何在程序後臺運行的狀況,當但願調度器在應用後臺執行時使用。
  • AsyncIOScheduler:適合於使用asyncio異步框架的狀況
  • GeventScheduler: 適合於使用gevent框架的狀況
  • TornadoScheduler: 適合於使用Tornado框架的應用
  • TwistedScheduler: 適合使用Twisted框架的應用
  • QtScheduler: 適合使用QT的狀況

APScheduler提供了四種存儲方式

  • MemoryJobStore
  • sqlalchemy
  • mongodb
  • redis

APScheduler提供了三種任務觸發器

  • data:固定日期觸發器:任務只運行一次,運行完畢自動清除;若錯過指定運行時間,任務不會被建立
  • interval:時間間隔觸發器
  • cron:cron風格的任務觸發

示例

示例1 BlockingScheduler

  • BlockingScheduler:在進程中運行單個任務,調度器是惟一運行的東西
  • 該示例代碼生成了一個BlockingScheduler調度器,使用了默認的任務存儲MemoryJobStore,以及默認的執行器ThreadPoolExecutor,而且最大線程數爲10。

定義jobbash

def jod1():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))

定義run方法框架

def run1():
    scheduler = BlockingScheduler()
    scheduler.add_job(jod1, 'interval', seconds=5)
    scheduler.start()

完整代碼dom

import time
import random
from apscheduler.schedulers.blocking import BlockingScheduler


def jod1():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))


def run1():
    scheduler = BlockingScheduler()
    scheduler.add_job(jod1, 'interval', seconds=5)
    scheduler.start()


if __name__ == '__main__':
    run1()

運行結果異步

2019-05-16 11:09:05 #####job1 2
2019-05-16 11:09:10 #####job1 5
2019-05-16 11:09:15 #####job1 1
2019-05-16 11:09:20 #####job1 0
2019-05-16 11:09:25 #####job1 8
2019-05-16 11:09:30 #####job1 0
2019-05-16 11:09:35 #####job1 9
2019-05-16 11:09:40 #####job1 1
2019-05-16 11:09:45 #####job1 4
......

測試1

咱們在示例1的基礎上再加一個job2,看看什麼狀況

job2示例代碼

def jod2():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job2 ' + str(random.randint(0, 10)))

run2示例代碼

def run2():
    scheduler = BlockingScheduler()
    scheduler.add_job(jod1, 'interval', seconds=5)
    scheduler.add_job(jod2, 'interval', seconds=5)
    scheduler.start()

實行結果:

2019-05-16 11:10:46 #####job2 9
2019-05-16 11:10:46 #####job1 2
2019-05-16 11:10:51 #####job2 3
2019-05-16 11:10:51 #####job1 2
2019-05-16 11:10:56 #####job2 9
2019-05-16 11:10:56 #####job1 1
2019-05-16 11:11:01 #####job2 9
2019-05-16 11:11:01 #####job1 1
2019-05-16 11:11:06 #####job2 2
2019-05-16 11:11:06 #####job1 5
2019-05-16 11:11:11 #####job2 6
2019-05-16 11:11:11 #####job1 7
2019-05-16 11:11:16 #####job2 1
2019-05-16 11:11:16 #####job1 6
......

從執行結果來看,在同一個調度器中添加2個job,這兩個job會在同一時刻同時執行

測試2

咱們將job的運行時間強制改成6秒,看看調度器會怎麼處理

修改後的job1

def jod1():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))
    time.sleep(6)

run1 不變,運行結果

2019-05-16 11:18:11 #####job1 1
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:16 CST)" skipped: maximum number of running instances reached (1)
2019-05-16 11:18:21 #####job1 9
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:26 CST)" skipped: maximum number of running instances reached (1)
2019-05-16 11:18:31 #####job1 5
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:36 CST)" skipped: maximum number of running instances reached (1)
2019-05-16 11:18:41 #####job1 0
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:46 CST)" skipped: maximum number of running instances reached (1)

從執行結果來看,因爲job的運行時間超過了執行器須要執行時的時間,因此本次任務跳過,就變成了10秒執行一次的任務了。

示例2 BackgroundScheduler

完整示例:

import time
import random
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler


def jod1():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))
    time.sleep(6)
    
def run3():
    scheduler = BackgroundScheduler()
    scheduler.add_job(jod1, 'interval', seconds=5)
    scheduler.start()

    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        

if __name__ == '__main__':
    # run1()
    # run2()
    run3()

測試1

開啓一個任務,而後另外開啓一個線程

def run3():
    scheduler = BackgroundScheduler()
    scheduler.add_job(jod1, 'interval', seconds=5)
    scheduler.start()

    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

運行結果

main-start: 2019-05-16 18:42:13
main-end: 2019-05-16 18:42:15
main-start: 2019-05-16 18:42:15
main-end: 2019-05-16 18:42:17
main-start: 2019-05-16 18:42:17
2019-05-16 18:42:18 #####job1 6
main-end: 2019-05-16 18:42:19
main-start: 2019-05-16 18:42:19
main-end: 2019-05-16 18:42:21
main-start: 2019-05-16 18:42:21
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 18:42:23 CST)" skipped: maximum number of running instances reached (1)
main-end: 2019-05-16 18:42:23
main-start: 2019-05-16 18:42:23
main-end: 2019-05-16 18:42:25
main-start: 2019-05-16 18:42:25
main-end: 2019-05-16 18:42:27
main-start: 2019-05-16 18:42:27
2019-05-16 18:42:28 #####job1 5
main-end: 2019-05-16 18:42:29
main-start: 2019-05-16 18:42:29
main-end: 2019-05-16 18:42:31
main-start: 2019-05-16 18:42:31
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 18:42:33 CST)" skipped: maximum number of running instances reached (1)
main-end: 2019-05-16 18:42:33
main-start: 2019-05-16 18:42:33
main-end: 2019-05-16 18:42:35
main-start: 2019-05-16 18:42:35
main-end: 2019-05-16 18:42:37
main-start: 2019-05-16 18:42:37
2019-05-16 18:42:38 #####job1 1
......

示例3 採用cron的方式

採用cron的方式來調度任務

def run4():
    scheduler = BackgroundScheduler()
    scheduler.add_job(jod1, 'cron', day_of_week='fri', second='*/5')
    scheduler.start()

    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
相關文章
相關標籤/搜索