Python3實現定時任務

經常使用推薦

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

# 定義一個job類,完成想要作的事
def worker():
    print("hello scheduler")

# 定時天天 00:00:00秒執行任務
scheduler.add_job(worker, 'cron', day_of_week='0-6', hour=00, minute=00, second=00)
scheduler.start()  # 開始任務
複製代碼
import time
from apscheduler.schedulers.blocking import BlockingScheduler

def my_job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

scheduler = BlockingScheduler()

# 表示每5秒執行該程序一次
scheduler.add_job(my_job, 'cron', second='*/5')

# 表示每5分鐘執行該程序一次
scheduler.add_job(my_job, 'cron', minute='*/5')

# 表示每5小時執行該程序一次
scheduler.add_job(my_job, 'cron', hour='*/5')

# 表示2017年3月22日17時19分07秒執行該程序
scheduler.add_job(my_job, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7)

# 表示任務在6,7,8,11,12月份的第三個星期五的00:00,01:00,02:00,03:00 執行該程序
scheduler.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

# 表示從星期一到星期五5:30(AM)直到2014-05-30 00:00:00
scheduler.add_job(my_job(), 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2020-05-30')

scheduler.start()
複製代碼

一、while循環中使用sleep

缺點:不容易控制,並且是個阻塞函數python

def timer(n):  
    ''''' 每n秒執行一次 '''  
    while True:    
        print(time.strftime('%Y-%m-%d %X',time.localtime()))    
        yourTask()  # 此處爲要執行的任務 
        time.sleep(n)
複製代碼

二、schedule模塊

優勢:能夠管理和調度多個任務,能夠進行控制 缺點:阻塞式函數linux

import schedule
import time
import datetime

def job1():
    print('Job1:每隔10秒執行一次的任務,每次執行2秒')
    print('Job1-startTime:%s' %(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(2)
    print('Job1-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')

def job2():
    print('Job2:每隔30秒執行一次,每次執行5秒')
    print('Job2-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(5)
    print('Job2-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


def job3():
    print('Job3:每隔1分鐘執行一次,每次執行10秒')
    print('Job3-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(10)
    print('Job3-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


def job4():
    print('Job4:天天下午17:49執行一次,每次執行20秒')
    print('Job4-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(20)
    print('Job4-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


def job5():
    print('Job5:每隔5秒到10秒執行一次,每次執行3秒')
    print('Job5-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    time.sleep(3)
    print('Job5-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


if __name__ == '__main__':
    schedule.every(10).seconds.do(job1)
    schedule.every(30).seconds.do(job2)
    schedule.every(1).minutes.do(job3)
    schedule.every().day.at('17:49').do(job4)
    schedule.every(5).to(10).seconds.do(job5)
    while True:
        schedule.run_pending()
複製代碼

三、Threading模塊中的Timer

優勢:非阻塞 缺點:不易管理多個任務git

from threading import Timer
import datetime
# 每隔兩秒執行一次任務
def printHello():
    print('TimeNow:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    t = Timer(2, printHello)
    t.start()

if __name__ == "__main__":
    printHello()
複製代碼

四、sched模塊

sched模塊實現了一個時間調度程序,該程序能夠經過單線程執行來處理按照時間尺度進行調度的時間。 經過調用scheduler.enter(delay,priority,func,args)函數,能夠將一個任務添加到任務隊列裏面,當指定的時間到了,就會執行任務(func函數)。redis

  • delay:任務的間隔時間。
  • priority:若是幾個任務被調度到相同的時間執行,將按照priority的增序執行這幾個任務。
  • func:要執行的任務函數
  • args:func的參數
import time, sched
import datetime

s = sched.scheduler(time.time, time.sleep)

def print_time(a='default'):
    print('Now Time:',datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),a)

def print_some_times():
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    s.enter(10,1,print_time)
    s.enter(5,2,print_time,argument=('positional',))
    s.run()
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
print_some_times()
複製代碼

執行結果爲:sql

2018-09-20 16:25:03
Now Time: 2018-09-20 16:25:08 positional
Now Time: 2018-09-20 16:25:13 default
2018-09-20 16:25:13

Process finished with exit code 0
複製代碼

按順序執行任務:mongodb

import time, sched
import datetime

s = sched.scheduler(time.time, time.sleep)


def event_fun1():
    print("func1 Time:", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


def perform1(inc):
    s.enter(inc, 0, perform1, (inc,))
    event_fun1()


def event_fun2():
    print("func2 Time:", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


def perform2(inc):
    s.enter(inc, 0, perform2, (inc,))
    event_fun2()


def mymain(func, inc=2):
    if func == "1":
        s.enter(0, 0, perform1, (10,))# 每隔10秒執行一次perform1
    if func == "2":
        s.enter(0, 0, perform2, (20,))# 每隔20秒執行一次perform2
if __name__ == '__main__':
    mymain('1')
    mymain('2')
    s.run()
複製代碼

執行結果爲:數據庫

E:\virtualenv\pachong\Scripts\python.exe F:/workspace/project_01/demo_09.py
func1 Time: 2018-09-20 16:30:28
func2 Time: 2018-09-20 16:30:28
func1 Time: 2018-09-20 16:30:38
func2 Time: 2018-09-20 16:30:48
func1 Time: 2018-09-20 16:30:48
func1 Time: 2018-09-20 16:30:58
func2 Time: 2018-09-20 16:31:08
func1 Time: 2018-09-20 16:31:08
func1 Time: 2018-09-20 16:31:18
func2 Time: 2018-09-20 16:31:28
func1 Time: 2018-09-20 16:31:28
func1 Time: 2018-09-20 16:31:38
複製代碼

s.run()會阻塞當前線程的執行 能夠用express

t=threading.Thread(target=s.run)
t.start()
複製代碼

也能夠用s.cancal(action)來取消sched中的某個actionbash

五、定時框架APScheduler

APSScheduler是python的一個定時任務框架,它提供了基於日期date、固定時間間隔interval、以及linux上的crontab類型的定時任務。該礦機不只能夠添加、刪除定時任務,還能夠將任務存儲到數據庫中、實現任務的持久化。框架

APScheduler有四種組件:

  • triggers(觸發器):觸發器包含調度邏輯,每個做業有它本身的觸發器,用於決定接下來哪個做業會運行,除了他們本身初始化配置外,觸發器徹底是無狀態的。
  • job stores(做業存儲):用來存儲被調度的做業,默認的做業存儲器是簡單地把做業任務保存在內存中,其它做業存儲器能夠將任務做業保存到各類數據庫中,支持MongoDB、Redis、SQLAlchemy存儲方式。當對做業任務進行持久化存儲的時候,做業的數據將被序列化,從新讀取做業時在反序列化。
  • executors(執行器):執行器用來執行定時任務,只是將須要執行的任務放在新的線程或者線程池中運行。看成業任務完成時,執行器將會通知調度器。對於執行器,默認狀況下選擇ThreadPoolExecutor就能夠了,可是若是涉及到一下特殊任務如比較消耗CPU的任務則能夠選擇ProcessPoolExecutor,固然根據根據實際需求能夠同時使用兩種執行器。
  • schedulers(調度器):調度器是將其它部分聯繫在一塊兒,通常在應用程序中只有一個調度器,應用開發者不會直接操做觸發器、任務存儲以及執行器,相反調度器提供了處理的接口。經過調度器完成任務的存儲以及執行器的配置操做,如能夠添加。修改、移除任務做業。

APScheduler提供了七種調度器:

  • BlockingScheduler:適合於只在進程中運行單個任務的狀況,一般在調度器是你惟一要運行的東西時使用。
  • BackgroundScheduler: 適合於要求任何在程序後臺運行的狀況,當但願調度器在應用後臺執行時使用。
  • AsyncIOScheduler:適合於使用asyncio異步框架的狀況
  • GeventScheduler: 適合於使用gevent框架的狀況
  • TornadoScheduler: 適合於使用Tornado框架的應用
  • TwistedScheduler: 適合使用Twisted框架的應用
  • QtScheduler: 適合使用QT的狀況

APScheduler提供了四種存儲方式:

  • MemoryJobStore
  • sqlalchemy
  • mongodb
  • redis

APScheduler提供了三種任務觸發器:

  • data:固定日期觸發器:任務只運行一次,運行完畢自動清除;若錯過指定運行時間,任務不會被建立
  • interval:時間間隔觸發器
  • cron:cron風格的任務觸發

示例一、

import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


if __name__ == '__main__':
    # 該示例代碼生成了一個BlockingScheduler調度器,使用了默認的任務存儲MemoryJobStore,以及默認的執行器ThreadPoolExecutor,而且最大線程數爲10。
    
    # BlockingScheduler:在進程中運行單個任務,調度器是惟一運行的東西
    scheduler = BlockingScheduler()
    # 採用阻塞的方式

    # 採用固定時間間隔(interval)的方式,每隔5秒鐘執行一次
    scheduler.add_job(job, 'interval', seconds=5)
    
    scheduler.start()
複製代碼

示例二、

import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    
if __name__ == '__main__':
    # BlockingScheduler:在進程中運行單個任務,調度器是惟一運行的東西
    scheduler = BlockingScheduler()
    # 採用阻塞的方式
    
    # 採用date的方式,在特定時間只執行一次
    scheduler.add_job(job, 'date', run_date='2018-09-21 15:30:00')

    scheduler.start() 
複製代碼

示例三、

import time
from apscheduler.schedulers.background import BackgroundScheduler

def job():
    print('job:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

if __name__ == '__main__':
    # BackgroundScheduler: 適合於要求任何在程序後臺運行的狀況,當但願調度器在應用後臺執行時使用。
    scheduler = BackgroundScheduler()
    # 採用非阻塞的方式

    # 採用固定時間間隔(interval)的方式,每隔3秒鐘執行一次
    scheduler.add_job(job, 'interval', seconds=3)
    # 這是一個獨立的線程
    scheduler.start()
    
    # 其餘任務是獨立的線程
    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
複製代碼

運行結果爲:

main-start: 2018-09-21 15:54:28
main-end: 2018-09-21 15:54:30
main-start: 2018-09-21 15:54:30
job: 2018-09-21 15:54:31
main-end: 2018-09-21 15:54:32
main-start: 2018-09-21 15:54:32
main-end: 2018-09-21 15:54:34
main-start: 2018-09-21 15:54:34
job: 2018-09-21 15:54:34
main-end: 2018-09-21 15:54:36
main-start: 2018-09-21 15:54:36
複製代碼

示例四、

import time
from apscheduler.schedulers.background import BackgroundScheduler

def job():
    print('job:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

if __name__ == '__main__':
    # BackgroundScheduler: 適合於要求任何在程序後臺運行的狀況,當但願調度器在應用後臺執行時使用。
    scheduler = BackgroundScheduler()
    # 採用非阻塞的方式

    # 採用date的方式,在特定時間裏執行一次
    scheduler.add_job(job, 'date', run_date='2018-09-21 15:53:00')
    # 這是一個獨立的線程
    scheduler.start()

    # 其餘任務是獨立的線程
    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
複製代碼

運行結果爲:

main-start: 2018-09-21 15:52:57
main-end: 2018-09-21 15:52:59
main-start: 2018-09-21 15:52:59
job: 2018-09-21 15:53:00
main-end: 2018-09-21 15:53:01
複製代碼

示例五、

import time
from apscheduler.schedulers.background import BackgroundScheduler


def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


if __name__ == '__main__':
    # BackgroundScheduler: 適合於要求任何在程序後臺運行的狀況,當但願調度器在應用後臺執行時使用
    scheduler = BackgroundScheduler()
    # 採用非阻塞的方式

    # 採用corn的方式
    scheduler.add_job(job, 'cron', day_of_week='fri', second='*/5')
    ''' year (int|str) – 4-digit year month (int|str) – month (1-12) day (int|str) – day of the (1-31) week (int|str) – ISO week (1-53) day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) hour (int|str) – hour (0-23) minute (int|str) – minute (0-59) econd (int|str) – second (0-59) start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) end_date (datetime|str) – latest possible date/time to trigger on (inclusive) timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) * any Fire on every value */a any Fire every a values, starting from the minimum a-b any Fire on any value within the a-b range (a must be smaller than b) a-b/c any Fire every c values within the a-b range xth y day Fire on the x -th occurrence of weekday y within the month last x day Fire on the last occurrence of weekday x within the month last day Fire on the last day within the month x,y,z any Fire on any matching expression; can combine any number of any of the above expressions '''
    scheduler.start()
    
    # 其餘任務是獨立的線程
    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
複製代碼

運行結果:

main-start: 2018-09-21 16:02:55
main-end: 2018-09-21 16:02:57
main-start: 2018-09-21 16:02:57
main-end: 2018-09-21 16:02:59
main-start: 2018-09-21 16:02:59
2018-09-21 16:03:00
main-end: 2018-09-21 16:03:01
main-start: 2018-09-21 16:03:01
main-end: 2018-09-21 16:03:03
main-start: 2018-09-21 16:03:03
2018-09-21 16:03:05
main-end: 2018-09-21 16:03:05
main-start: 2018-09-21 16:03:05
main-end: 2018-09-21 16:03:07
main-start: 2018-09-21 16:03:07
main-end: 2018-09-21 16:03:09
main-start: 2018-09-21 16:03:09
2018-09-21 16:03:10
複製代碼

示例六、

import time
from apscheduler.schedulers.background import BackgroundScheduler


def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


if __name__ == '__main__':
    # BackgroundScheduler: 適合於要求任何在程序後臺運行的狀況,當但願調度器在應用後臺執行時使用
    scheduler = BackgroundScheduler()
    # 採用阻塞的方式

    # 採用corn的方式
    scheduler.add_job(job, 'cron', day_of_week='fri', second='*/5')
    ''' year (int|str) – 4-digit year month (int|str) – month (1-12) day (int|str) – day of the (1-31) week (int|str) – ISO week (1-53) day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) hour (int|str) – hour (0-23) minute (int|str) – minute (0-59) econd (int|str) – second (0-59) start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) end_date (datetime|str) – latest possible date/time to trigger on (inclusive) timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) * any Fire on every value */a any Fire every a values, starting from the minimum a-b any Fire on any value within the a-b range (a must be smaller than b) a-b/c any Fire every c values within the a-b range xth y day Fire on the x -th occurrence of weekday y within the month last x day Fire on the last occurrence of weekday x within the month last day Fire on the last day within the month x,y,z any Fire on any matching expression; can combine any number of any of the above expressions '''

    scheduler.start()
複製代碼

示例七、

import time
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore

def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
if __name__ == '__main__':
    # mongodb存儲job
    scheduler = BlockingScheduler()
    client = MongoClient(host='127.0.0.1', port=27017)
    store = MongoDBJobStore(collection='job', database='test', client=client)
    scheduler.add_jobstore(store)
    scheduler.add_job(job, 'interval', second=5)
    scheduler.start()
複製代碼

原文連接https://www.jianshu.com/p/b77d934cc252

相關文章
相關標籤/搜索