Python下定時任務框架APScheduler的使用

今天準備實現一個功能須要用到定時執行任務,因此就看到了Python的一個定時任務框架APScheduler,試了一下感受還不錯。html

1.APScheduler簡介: python

APScheduler是Python的一個定時任務框架,能夠很方便的知足用戶定時執行或者週期執行任務的需求,它提供了基於日期date、固定時間間隔interval 、以及相似於Linux上的定時任務crontab類型的定時任務。而且該框架不只能夠添加、刪除定時任務,還能夠將任務存儲到數據庫中,實現任務的持久化,因此使用起來很是方便。sql

2.APScheduler安裝:mongodb

APScheduler的安裝相對來講也很是簡單,能夠直接利用pip安裝,若是沒有pip能夠下載源碼,利用源碼安裝。數據庫

1).利用pip安裝:(推薦)bash

# pip install apscheduler

2).基於源碼安裝:https://pypi.python.org/pypi/APScheduler/ 框架

# python setup.py install

3.基本概念async

APScheduler有四種組件及相關說明:ide

 1) triggers(觸發器):觸發器包含調度邏輯,每個做業有它本身的觸發器,用於決定接下來哪個做業會運行,除了他們本身初始化配置外,觸發器徹底是無狀態的。ui

 2)job stores(做業存儲):用來存儲被調度的做業,默認的做業存儲器是簡單地把做業任務保存在內存中,其它做業存儲器能夠將任務做業保存到各類數據庫中,支持MongoDB、Redis、SQLAlchemy存儲方式。當對做業任務進行持久化存儲的時候,做業的數據將被序列化,從新讀取做業時在反序列化。

3) executors(執行器):執行器用來執行定時任務,只是將須要執行的任務放在新的線程或者線程池中運行。看成業任務完成時,執行器將會通知調度器。對於執行器,默認狀況下選擇ThreadPoolExecutor就能夠了,可是若是涉及到一下特殊任務如比較消耗CPU的任務則能夠選擇ProcessPoolExecutor,固然根據根據實際需求能夠同時使用兩種執行器。

4) schedulers(調度器):調度器是將其它部分聯繫在一塊兒,通常在應用程序中只有一個調度器,應用開發者不會直接操做觸發器、任務存儲以及執行器,相反調度器提供了處理的接口。經過調度器完成任務的存儲以及執行器的配置操做,如能夠添加。修改、移除任務做業。

 APScheduler提供了多種調度器,能夠根據具體需求來選擇合適的調度器,經常使用的調度器有:

     BlockingScheduler:適合於只在進程中運行單個任務的狀況,一般在調度器是你惟一要運行的東西時使用。

     BackgroundScheduler: 適合於要求任何在程序後臺運行的狀況,當但願調度器在應用後臺執行時使用。

     AsyncIOScheduler:適合於使用asyncio框架的狀況

     GeventScheduler: 適合於使用gevent框架的狀況

     TornadoScheduler: 適合於使用Tornado框架的應用

     TwistedScheduler: 適合使用Twisted框架的應用

     QtScheduler: 適合使用QT的狀況

4.配置調度器

APScheduler提供了許多不一樣的方式來配置調度器,你可使用一個配置字典或者做爲參數關鍵字的方式傳入。你也能夠先建立調度器,再配置和添加做業,這樣你能夠在不一樣的環境中獲得更大的靈活性。

1)下面一個簡單的示例:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
 
def test_job():
    print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
 
scheduler = BlockingScheduler()
'''
 #該示例代碼生成了一個BlockingScheduler調度器,使用了默認的默認的任務存儲MemoryJobStore,以及默認的執行器ThreadPoolExecutor,而且最大線程數爲10。
'''
scheduler.add_job(test_job, 'interval', seconds=5, id='test_job')
'''
 #該示例中的定時任務採用固定時間間隔(interval)的方式,每隔5秒鐘執行一次。
 #而且還爲該任務設置了一個任務id
'''
scheduler.start()

2)若是想執行一些複雜任務,如上邊所說的同時使用兩種執行器,或者使用多種任務存儲方式,而且須要根據具體狀況對任務的一些默認參數進行調整。能夠參考下面的方式。(http://apscheduler.readthedocs.io/en/latest/userguide.html)

 第一種方式:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

第二種方式:

from apscheduler.schedulers.background import BackgroundScheduler


# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

第三種方式:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

5.對任務做業的基本操做:

1).添加做業有兩種方式:第一種能夠直接調用add_job(),第二種使用scheduled_job()修飾器。

而add_job()是使用最多的,它能夠返回一個apscheduler.job.Job實例,於是能夠對它進行修改或者刪除,而使用修飾器添加的任務添加以後就不能進行修改。

例如使用add_job()添加做業:

#!/usr/bin/env python
#-*- coding:UTF-8
import time
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job1(f):
    print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), f

def job2(arg1, args2, f):
    print f, args1, args2

def job3(**args):
    print args

'''
APScheduler支持如下三種定時任務:
cron: crontab類型任務
interval: 固定時間間隔任務
date: 基於日期時間的一次性任務
'''
scheduler = BlockingScheduler()
#循環任務示例
scheduler.add_job(job1, 'interval', seconds=5, args=('循環',), id='test_job1')
#定時任務示例
scheduler.add_job(job1, 'cron', second='*/5', args=('定時',), id='test_job2')
#一次性任務示例
scheduler.add_job(job1, next_run_time=(datetime.datetime.now() + datetime.timedelta(seconds=10)), args=('一次',), id='test_job3')
'''
傳遞參數的方式有元組(tuple)、列表(list)、字典(dict)
注意:不過須要注意採用元組傳遞參數時後邊須要多加一個逗號
'''
#基於list
scheduler.add_job(job2, 'interval', seconds=5, args=['a','b','list'], id='test_job4')
#基於tuple
scheduler.add_job(job2, 'interval', seconds=5, args=('a','b','tuple',), id='test_job5')
#基於dict
scheduler.add_job(job3, 'interval', seconds=5, kwargs={'f':'dict', 'a':1,'b':2}, id='test_job7')
print scheduler.get_jobs()
scheduler.start()

#帶有參數的示例
scheduler.add_job(job2, 'interval', seconds=5, args=['a','b'], id='test_job4')
scheduler.add_job(job2, 'interval', seconds=5, args=('a','b',), id='test_job5')
scheduler.add_job(job3, 'interval', seconds=5, kwargs={'a':1,'b':2}, id='test_job6')
print scheduler.get_jobs()
scheduler.start()

或者使用scheduled_job()修飾器來添加做業:

@sched.scheduled_job('cron', second='*/5' ,id='my_job_id',)
def test_task():
    print("Hello world!")

2).得到任務列表:

能夠經過get_jobs方法來獲取當前的任務列表,也能夠經過get_job()來根據job_id來得到某個任務的信息。而且apscheduler還提供了一個print_jobs()方法來打印格式化的任務列表。

例如:

scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')
print scheduler.get_job('my_job_id')
print scheduler.get_jobs()

3).修改任務:

修改任務任務的屬性可使用apscheduler.job.Job.modify()或者modify_job()方法,能夠修改除了id的其它任何屬性。

例如:

job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job' name='test_job')
job.modify(max_instances=5, name='my_job')

4).刪除任務:

刪除調度器中的任務有能夠用remove_job()根據job ID來刪除指定任務或者使用remove(),若是使用remove()須要事先保存在添加任務時返回的實例對象,任務刪除後就不會在執行。

注意:經過scheduled_job()添加的任務只能使用remove_job()進行刪除。

例如:

job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')
job.remove()

或者

scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')
scheduler.remove_job('my_job')

5).暫停與恢復任務:

暫停與恢復任務能夠直接操做任務實例或者調度器來實現。當任務暫停時,它的運行時間會被重置,暫停期間不會計算時間。

暫停任務:

apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

恢復任務

apscheduler.job.Job.resume()
apscheduler.schedulers.BaseScheduler.resume_job()

6).啓動調度器

可使用start()方法啓動調度器,BlockingScheduler須要在初始化以後才能執行start(),對於其餘的Scheduler,調用start()方法都會直接返回,而後能夠繼續執行後面的初始化操做。

例如:

from apscheduler.schedulers.blocking import BlockingScheduler
 
def my_job():
    print "Hello world!"
 
scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=5)
scheduler.start()

7).關閉調度器:

使用下邊方法關閉調度器:

scheduler.shutdown()

默認狀況下調度器會關閉它的任務存儲和執行器,並等待全部正在執行的任務完成,若是不想等待,能夠進行以下操做:

scheduler.shutdown(wait=False)

注意:

當出現No handlers could be found for logger 「apscheduler.scheduler」次錯誤信息時,說明沒有

 logging模塊的logger存在,因此須要添加上,對應新增內容以下所示(僅供參考):

import logging
 logging.basicConfig(level=logging.DEBUG,
            format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
           datafmt='%a, %d %b %Y %H:%M:%S',
            filename='/var/log/aaa.txt',
            filemode='a')
相關文章
相關標籤/搜索