Python3-定時任務四種實現方式

老貓最近作一個小程序開發任務,主要負責後臺部分開發;根據項目需求老貓須要實現三個定時任務:數據庫

1>定時更新微信token,須要2小時更新一次;
2>商品定時上線;
3>定時檢測後臺服務是否存活;小程序

老貓使用Python去實現這三個任務,這裏須要使用定時相關知識點;
Python實現定點與定時任務方式比較多,老貓找到下面四中實現方式,每一個方式都有本身應用場景;下面老貓來快速介紹Python中經常使用的定時任務實現方式:微信

1>循環+sleep;
2>線程模塊中Timer類;
3>schedule模塊;
4>定時框架:APScheduler網絡

在開始以前先設定一個任務(這樣不用依賴外部環境):
1:定時或者定點監測CPU與內存使用率;
2:將時間,CPU,內存使用狀況保存到日誌文件;併發

先來實現系統監測功能:
準備工做:安裝psutil:pip install psutil
功能實現框架

#psutil:獲取系統信息模塊,能夠獲取CPU,內存,磁盤等的使用狀況
import psutil
import time
import datetime
#logfile:監測信息寫入文件
def MonitorSystem(logfile = None):
    #獲取cpu使用狀況
    cpuper = psutil.cpu_percent()
    #獲取內存使用狀況:系統內存大小,使用內存,有效內存,內存使用率
    mem = psutil.virtual_memory()
    #內存使用率
    memper = mem.percent
    #獲取當前時間
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    line = f'{ts} cpu:{cpuper}%, mem:{memper}%'
    print(line)
    if logfile:
        logfile.write(line)

代碼運行結果:異步

2019-03-21 14:23:41 cpu:0.6%, mem:77.2%

接下來咱們要實現定時監測,好比3s監測一下系統資源使用狀況。ide

最簡單使用方式:sleep

這種方式最簡單,直接使用while+sleep就能夠實現:函數

def loopMonitor():
    while True:
        MonitorSystem()
        #2s檢查一次
        time.sleep(3)
loopMonitor()

輸出結果:oop

2019-03-21 14:28:42 cpu:1.5%, mem:77.6%
2019-03-21 14:28:45 cpu:1.6%, mem:77.6%
2019-03-21 14:28:48 cpu:1.4%, mem:77.6%
2019-03-21 14:28:51 cpu:1.4%, mem:77.6%
2019-03-21 14:28:54 cpu:1.3%, mem:77.6%

這種方式存在問題:只能處理單個定時任務。
又來了新任務:須要每秒監測網絡收發字節,代碼實現以下:

def MonitorNetWork(logfile = None):
    #獲取網絡收信息
    netinfo = psutil.net_io_counters()
    #獲取當前時間
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'
    print(line)
    if logfile:
        logfile.write(line)
MonitorNetWork()

代碼執行結果:

2019-03-21 14:47:21 bytessent=169752183, bytesrecv=1107900973

若是咱們同時在while循環中監測兩個任務會有等待問題,不能每秒監測網絡狀況。

Timer實現方式

timer最基本理解就是定時器,咱們能夠啓動多個定時任務,這些定時器任務是異步執行,因此不存在等待順序執行問題。
先來看Timer的基本使用:
導入:from threading import Timer
主要方法:

Timer方法 說明
Timer(interval, function, args=None, kwargs=None) 建立定時器
cancel() 取消定時器
start() 使用線程方式執行
join(self, timeout=None) 等待線程執行結束

定時器只能執行一次,若是須要重複執行,須要從新添加任務;
咱們先來看基本使用:

from threading import Timer
#記錄當前時間
print(datetime.datetime.now())
#3S執行一次
sTimer = Timer(3, MonitorSystem)
#1S執行一次
nTimer = Timer(1, MonitorNetWork)
#使用線程方式執行
sTimer.start()
nTimer.start()
#等待結束
sTimer.join()
nTimer.join()
#記錄結束時間
print(datetime.datetime.now())

輸出結果:

2019-03-21 15:13:36.739798
2019-03-21 15:13:37 bytessent=171337324, bytesrecv=1109002349
2019-03-21 15:13:39 cpu:1.4%, mem:93.2%
2019-03-21 15:13:39.745187

能夠看到,花費時間爲3S,可是咱們想要作的是每秒監控網絡狀態;如何處理。
Timer只能執行一次,因此執行完成以後須要再次添加任務,咱們對代碼進行修改:

from threading import Timer
import psutil
import time
import datetime
def MonitorSystem(logfile = None):
    cpuper = psutil.cpu_percent()
    mem = psutil.virtual_memory()
    memper = mem.percent
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    line = f'{ts} cpu:{cpuper}%, mem:{memper}%'
    print(line)
    if logfile:
        logfile.write(line)
    #啓動定時器任務,每三秒執行一次
    Timer(3, MonitorSystem).start()

def MonitorNetWork(logfile = None):
    netinfo = psutil.net_io_counters()
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'
    print(line)
    if logfile:
        logfile.write(line)
    #啓動定時器任務,每秒執行一次
    Timer(1, MonitorNetWork).start()
MonitorSystem()
MonitorNetWork()

執行結果:

2019-03-21 15:18:21 cpu:1.5%, mem:93.2%
2019-03-21 15:18:21 bytessent=171376522, bytesrecv=1109124678
2019-03-21 15:18:22 bytessent=171382215, bytesrecv=1109128294
2019-03-21 15:18:23 bytessent=171384278, bytesrecv=1109129702
2019-03-21 15:18:24 cpu:1.9%, mem:93.2%
2019-03-21 15:18:24 bytessent=171386341, bytesrecv=1109131110
2019-03-21 15:18:25 bytessent=171388527, bytesrecv=1109132600
2019-03-21 15:18:26 bytessent=171390590, bytesrecv=1109134008

從時間中能夠看到,這兩個任務能夠同時進行不存在等待問題。
Timer的實質是使用線程方式去執行任務,每次執行完後會銷燬,因此沒必要擔憂資源問題。

調度模塊:schedule

schedule是一個第三方輕量級的任務調度模塊,能夠按照秒,分,小時,日期或者自定義事件執行時間;
安裝方式:

pip install schedule

咱們來看一個例子:

import datetime
import schedule
import time
def func():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func  time :',ts)
def func2():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func2 time:',ts)
def tasklist():
    #清空任務
    schedule.clear()
    #建立一個按秒間隔執行任務
    schedule.every(1).seconds.do(func)
    #建立一個按2秒間隔執行任務
    schedule.every(2).seconds.do(func2)
    #執行10S
    for i in range(10):
        schedule.run_pending()
        time.sleep(1)
tasklist()

執行結果:

do func  time : 2019-03-22 08:51:38
do func2 time: 2019-03-22 08:51:39
do func  time : 2019-03-22 08:51:39
do func  time : 2019-03-22 08:51:40
do func2 time: 2019-03-22 08:51:41
do func  time : 2019-03-22 08:51:41
do func  time : 2019-03-22 08:51:42
do func2 time: 2019-03-22 08:51:43
do func  time : 2019-03-22 08:51:43
do func  time : 2019-03-22 08:51:44
do func2 time: 2019-03-22 08:51:45
do func  time : 2019-03-22 08:51:45
do func  time : 2019-03-22 08:51:46

執行過程分析:

>1>由於老貓在jupyter下執行,因此先將schedule任務清空;
>2>按時間間在schedule中隔添加任務;
>3>老貓這裏按照秒間隔添加func,按照兩秒間隔添加func2;
>4>schedule添加任務後,須要查詢任務並執行任務;
>5>爲了防止佔用資源,每秒查詢到點任務,而後順序執行;

第5個順序執行怎麼理解,咱們修改func函數,裏面添加time.sleep(2)
而後只執行func工做,輸出結果:

do func  time : 2019-03-22 09:00:59
do func  time : 2019-03-22 09:01:02
do func  time : 2019-03-22 09:01:05

能夠看到時間間隔爲3S,爲何不是1S?
由於這個按照順序執行,func休眠2S,循環任務查詢休眠1S,因此會存在這個問題。
在咱們使用這種方式執行任務須要注意這種阻塞現象。
咱們看下schedule模塊經常使用使用方法:

#schedule.every(1)建立Job, seconds.do(func)按秒間隔查詢並執行
schedule.every(1).seconds.do(func)
#添加任務按分執行
schedule.every(1).minutes.do(func)
#添加任務按天執行
schedule.every(1).days.do(func)
#添加任務按周執行
schedule.every().weeks.do(func)
#添加任務每週1執行,執行時間爲下週一這一時刻時間
schedule.every().monday.do(func)
#每週1,1點15開始執行
schedule.every().monday.at("12:00").do(job)

這種方式侷限性:若是工做任務回很是耗時就會影響其餘任務執行。咱們能夠考慮使用併發機制配置這個模塊使用。

任務框架APScheduler

APScheduler是Python的一個定時任務框架,用於執行週期或者定時任務,
能夠基於日期、時間間隔,及相似於Linux上的定時任務crontab類型的定時任務;
該該框架不只能夠添加、刪除定時任務,還能夠將任務存儲到數據庫中,實現任務的持久化,使用起來很是方便。
安裝方式:pip install apscheduler
apscheduler組件及簡單說明:

1>triggers(觸發器):觸發器包含調度邏輯,每個做業有它本身的觸發器
2>job stores(做業存儲):用來存儲被調度的做業,默認的做業存儲器是簡單地把做業任務保存在內存中,支持存儲到MongoDB,Redis數據庫中
3> executors(執行器):執行器用來執行定時任務,只是將須要執行的任務放在新的線程或者線程池中運行
4>schedulers(調度器):調度器是將其它部分聯繫在一塊兒,對使用者提供接口,進行任務添加,設置,刪除。

來看一個簡單例子:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
def func():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func  time :',ts)

def func2():
    #耗時2S
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func2 time:',ts)
    time.sleep(2)

def dojob():
    #建立調度器:BlockingScheduler
    scheduler = BlockingScheduler()
    #添加任務,時間間隔2S
    scheduler.add_job(func, 'interval', seconds=2, id='test_job1')
    #添加任務,時間間隔5S
    scheduler.add_job(func2, 'interval', seconds=3, id='test_job2')
    scheduler.start()
dojob()

輸出結果:

do func  time : 2019-03-22 10:32:20
do func2 time: 2019-03-22 10:32:21
do func  time : 2019-03-22 10:32:22
do func  time : 2019-03-22 10:32:24
do func2 time: 2019-03-22 10:32:24
do func  time : 2019-03-22 10:32:26

輸出結果中能夠看到:任務就算是有延時,也不會影響其餘任務執行。
APScheduler框架提供豐富接口去實現定時任務,能夠去參考官方文檔去查看使用方式。

最後選擇:

老貓簡單總結上面四種定時定點任務實現:1:循環+sleep方式適合簡答測試,2:timer能夠實現定時任務,可是對定點任務來講,須要檢查當前時間點;3:schedule能夠定點定時執行,可是須要在循環中檢測任務,並且存在阻塞;4:APScheduler框架更增強大,能夠直接在裏面添加定點與定時任務;綜合考慮,老貓決定使用APScheduler框架,實現簡單,只須要直接建立任務,並將添加到調度器中便可。

相關文章
相關標籤/搜索