Python任務調度模塊 – APScheduler

APScheduler是一個Python定時任務框架,使用起來十分方便。提供了基於日期、固定時間間隔以及crontab類型的任務,而且能夠持久化任務、並以daemon方式運行應用。目前最新版本爲3.0.x。
在APScheduler中有四個組件:
觸發器(trigger)包含調度邏輯,每個做業有它本身的觸發器,用於決定接下來哪個做業會運行。除了他們本身初始配置意外,觸發器徹底是無狀態的。
做業存儲(job store)存儲被調度的做業,默認的做業存儲是簡單地把做業保存在內存中,其餘的做業存儲是將做業保存在數據庫中。一個做業的數據講在保存在持久化做業存儲時被序列化,並在加載時被反序列化。調度器不能分享同一個做業存儲。
執行器(executor)處理做業的運行,他們一般經過在做業中提交制定的可調用對象到一個線程或者進城池來進行。看成業完成時,執行器將會通知調度器。
調度器(scheduler)是其餘的組成部分。你一般在應用只有一個調度器,應用的開發者一般不會直接處理做業存儲、調度器和觸發器,相反,調度器提供了處理這些的合適的接口。配置做業存儲和執行器能夠在調度器中完成,例如添加、修改和移除做業。
你須要選擇合適的調度器,這取決於你的應用環境和你使用APScheduler的目的。一般最經常使用的兩個:
– BlockingScheduler: 當調度器是你應用中惟一要運行的東西時使用。
– BackgroundScheduler: 當你不運行任何其餘框架時使用,並但願調度器在你應用的後臺執行。
安裝APScheduler很是簡單:
pip install apscheduler
選擇合適的做業存儲,你須要決定是否須要做業持久化。若是你老是在應用開始時重建job,你能夠直接使用默認的做業存儲(MemoryJobStore).可是若是你須要將你的做業持久化,以免應用崩潰和調度器重啓時,你能夠根據你的應用環境來選擇具體的做業存儲。例如:使用Mongo或者SQLAlchemyJobStore (用於支持大多數RDBMS)
然而,調度器的選擇一般是爲你若是你使用上面的框架之一。然而,默認的ThreadPoolExecutor 一般用於大多數用途。若是你的工做負載中有較大的CPU密集型操做,你能夠用ProcessPoolExecutor來使用更多的CPU核。你也能夠在同一時間使用二者,將進程池調度器做爲第二執行器。html

配置調度器

APScheduler提供了許多不一樣的方式來配置調度器,你可使用一個配置字典或者做爲參數關鍵字的方式傳入。你也能夠先建立調度器,再配置和添加做業,這樣你能夠在不一樣的環境中獲得更大的靈活性。
下面是一個簡單使用BlockingScheduler,並使用默認內存存儲和默認執行器。(默認選項分別是MemoryJobStore和ThreadPoolExecutor,其中線程池的最大線程數爲10)。配置完成後使用start()方法來啓動。git

from apscheduler.schedulers.blocking import BlockingSchedulermongodb

def my_job():數據庫

    print 'hello world'express

 

sched = BlockingScheduler()框架

sched.add_job(my_job, 'interval', seconds=5)ide

sched.start()函數

 

在運行程序5秒後,將會輸出第一個Hello world。
下面進行一個更復雜的配置,使用兩個做業存儲和兩個調度器。在這個配置中,做業將使用mongo做業存儲,信息寫入到MongoDB中。ui

 

from pymongo import MongoClientspa

from apscheduler.schedulers.blocking import BlockingScheduler

from apscheduler.jobstores.mongodb import MongoDBJobStore

from apscheduler.jobstores.memory import MemoryJobStore

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

 

 

def my_job():

    print 'hello world'

host = '127.0.0.1'

port = 27017

client = MongoClient(host, port)

 

jobstores = {

    'mongo': MongoDBJobStore(collection='job', database='test', client=client),

    'default': MemoryJobStore()

}

executors = {

    'default': ThreadPoolExecutor(10),

    'processpool': ProcessPoolExecutor(3)

}

job_defaults = {

    'coalesce': False,

    'max_instances': 3

}

scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)

scheduler.add_job(my_job, 'interval', seconds=5)

 

try:

    scheduler.start()

except SystemExit:

    client.close()

 

查詢MongoDB能夠看到做業的運行狀況以下:

 

{

  "_id" : "55ca54ee4bb744f8a5ab08cc4319bc24",

  "next_run_time" : 1434017278.797,

  "job_state" : new BinData(0,"gAJ9cQEoVQRhcmdzcQIpVQhleGVjdXRvcnEDVQdkZWZhdWx0cQRVDW1heF9pbnN0YW5jZXNxBUsDVQRmdW5jcQZVD19fbWFpbl9fOm15X2pvYnEHVQJpZHEIVSA1NWNhNTRlZTRiYjc0NGY4YTVhYjA4Y2M0MzE5YmMyNHEJVQ1uZXh0X3J1bl90aW1lcQpjZGF0ZXRpbWUKZGF0ZXRpbWUKcQtVCgffBgsSBzoMKUhjcHl0egpfcApxDChVDUFzaWEvU2hhbmdoYWlxDU2AcEsAVQNDU1RxDnRScQ+GUnEQVQRuYW1lcRFVBm15X2pvYnESVRJtaXNmaXJlX2dyYWNlX3RpbWVxE0sBVQd0cmlnZ2VycRRjYXBzY2hlZHVsZXIudHJpZ2dlcnMuaW50ZXJ2YWwKSW50ZXJ2YWxUcmlnZ2VyCnEVKYFxFn1xF1UPaW50ZXJ2YWxfbGVuZ3RocRhHQBQAAAAAAABzfXEZKFUIdGltZXpvbmVxGmgMKGgNTehxSwBVA0xNVHEbdFJxHFUIaW50ZXJ2YWxxHWNkYXRldGltZQp0aW1lZGVsdGEKcR5LAEsFSwCHUnEfVQpzdGFydF9kYXRlcSBoC1UKB98GCxIHIQwpSGgPhlJxIVUIZW5kX2RhdGVxIk51hmJVCGNvYWxlc2NlcSOJVQd2ZXJzaW9ucSRLAVUGa3dhcmdzcSV9cSZ1Lg==")

}

操做做業

1. 添加做業

上面是經過add_job()來添加做業,另外還有一種方式是經過scheduled_job()修飾器來修飾函數。

@sched.scheduled_job('cron', id='my_job_id', day='last sun')

 

def some_decorated_task():

    print("I am printed at 00:00:00 on the last Sunday of every month!")

2. 移除做業

job = scheduler.add_job(myfunc, 'interval', minutes=2)

job.remove()

Same, using an explicit job ID:

 

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')

scheduler.remove_job('my_job_id')

3. 暫停和恢復做業

暫停做業:
– apscheduler.job.Job.pause()
– apscheduler.schedulers.base.BaseScheduler.pause_job()
恢復做業:
– apscheduler.job.Job.resume()
– apscheduler.schedulers.base.BaseScheduler.resume_job()

4. 得到job列表

得到調度做業的列表,可使用get_jobs()來完成,它會返回全部的job實例。或者使用print_jobs()來輸出全部格式化的做業列表。

5. 修改做業

def some_decorated_task():

    print("I am printed at 00:00:00 on the last Sunday of every month!")</pre>

6. 關閉調度器

默認狀況下調度器會等待全部正在運行的做業完成後,關閉全部的調度器和做業存儲。若是你不想等待,能夠將wait選項設置爲False。

scheduler.shutdown()

scheduler.shutdown(wait=False)

做業運行的控制

add_job的第二個參數是trigger,它管理着做業的調度方式。它能夠爲date, interval或者cron。對於不一樣的trigger,對應的參數也相同。

(1). cron定時調度

year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of the (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
和Linux的Crontab同樣,它的值格式爲:

Expression Field Description

* any Fire on every value

*/a any Fire every a values, starting from the minimum

a-b any Fire on any value within the a-b range (a must be smaller than b)

a-b/c any Fire every c values within the a-b range

xth y day Fire on the x -th occurrence of weekday y within the month

last x day Fire on the last occurrence of weekday x within the month

last day Fire on the last day within the month

x,y,z any Fire on any matching expression; can combine any number of any of the above expressions   

例子:

# Schedules job_function to be run on the third Friday

# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00

sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

# Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00

sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

 

(2). interval 間隔調度

它的參數以下:
weeks (int) – number of weeks to wait
days (int) – number of days to wait
hours (int) – number of hours to wait
minutes (int) – number of minutes to wait
seconds (int) – number of seconds to wait
start_date (datetime|str) – starting point for the interval calculation
end_date (datetime|str) – latest possible date/time to trigger on
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
例子:

 

# Schedule job_function to be called every two hours

sched.add_job(job_function, 'interval', hours=2)

(3). date 定時調度

最基本的一種調度,做業只會執行一次。它的參數以下:
run_date (datetime|str) – the date/time to run the job at
timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
例子:

# The job will be executed on November 6th, 2009

sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])

# The job will be executed on November 6th, 2009 at 16:30:05

sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

Ref:
http://apscheduler.readthedocs.org/en/latest/modules/triggers
http://apscheduler.readthedocs.org/en/3.0/userguide.html

相關文章
相關標籤/搜索