關於 schedule 使用的一個 bug

問題描述

最近在同步的時候老是遇到重複插入報錯:python

發現是我同一個導入過程執行了兩遍,我是使用 python 的任務調度模塊 schedule 去執行的,模擬出任務的使用場景以下:數據庫

# 測試調度模塊是否會重複執行任務
import logging
import threading
import time

import schedule

logger = logging.getLogger()
NUM = list()


def main_run():
    part_run()


def run_threaded(job_func):
    # 對於每個任務 開啓一個進程去執行
    job_thread = threading.Thread(target=job_func)
    job_thread.start()


def part_run():
    # 模擬每 30 s執行一次的定時任務
    schedule.every(10).seconds.do(run_threaded, new_finance_update)

    # 在一個有阻塞的主程序中去運行
    while True:
        # logger.info(schedule.jobs)
        print(schedule.jobs)
        schedule.run_pending()
        time.sleep(10)


def new_finance_update():
    # 真正執行的任務

    # 表示第幾回執行到這裏
    global NUM
    print("=="*10, NUM)
    for i in range(100):
        print(i, end=" ")
        NUM.append(i)
        time.sleep(30)
    for j in list("/Users/furuiyang/PyEnv/demo/bin/python3.7 /Users/furuiyang/code2/jztask/tasks/re_task.py"):
        print(j, end=" ")
        NUM.append(j)
        time.sleep(30)


if __name__ == "__main__":
    try:
        main_run()
    except KeyboardInterrupt:
        print(NUM)
複製代碼

執行結果:多線程

問題分析

開啓了多線程向數據庫進行插入,沒有在線程之間創建消息聯繫,形成一個線層已經插入的數據被另外的一個線程重複插入。app

解決方案01

去掉多線程,代碼以下:測試

# 測試調度模塊是否會重複執行任務
import logging
import threading
import time

import schedule

logger = logging.getLogger()
NUM = list()


def main_run():
    part_run()


# def run_threaded(job_func):
#     # 對於每個任務 開啓一個進程去執行
#     job_thread = threading.Thread(target=job_func)
#     job_thread.start()


def part_run():
    # 模擬每 30 s執行一次的定時任務
    schedule.every(10).seconds.do(new_finance_update)

    # 在一個有阻塞的主程序中去運行
    while True:
        # logger.info(schedule.jobs)
        print(schedule.jobs)
        schedule.run_pending()
        time.sleep(10)


def new_finance_update():
    # 真正執行的任務

    # 表示第幾回執行到這裏
    global NUM
    print("=="*10, NUM)
    for i in range(100):
        print(i, end=" ")
        NUM.append(i)
        time.sleep(30)
    for j in list("/Users/furuiyang/PyEnv/demo/bin/python3.7 /Users/furuiyang/code2/jztask/tasks/re_task.py"):
        print(j, end=" ")
        NUM.append(j)
        time.sleep(30)


if __name__ == "__main__":
    try:
        main_run()
    except KeyboardInterrupt:
        print(NUM)
複製代碼

中斷時的運行結果:fetch

解決方案02

第二種是將具體要作的任務進行劃分,而後放在一個隊列中,每一個任務的完成之間互相不受到影響:ui

具體代碼以下:spa

import threading
import time
from queue import Queue

import schedule

jobqueue = Queue()


def fetch_job():
    # 從隊列中去獲取任務
    job_func = jobqueue.get()
    return job_func


def cal_num(num):
    time.sleep(1)
    print(f"{num} 開始被計算")
    print()


def generate_job():
    # 根據具體的狀況去生成任務
    for j in range(10):
        yield (cal_num, j)


def put_job():
    # 遍歷生成的任務將其放入隊列
    for job in generate_job():
        jobqueue.put(job)


def run_threaded():
    while not jobqueue.empty():
        # 取出而且執行
        job_func, j = fetch_job()
        # 對於每個任務 開啓一個進程去執行 j 是任務參數
        job_thread = threading.Thread(target=job_func, args=(j,))
        job_thread.start()

    time.sleep(10)
    print("任務執行完畢")


def part_run():
    # 生成任務而且將其放入隊列中
    put_job()

    # 每 10 s執行一次的定時任務 開啓一個新的線程去執行
    schedule.every(10).seconds.do(run_threaded)

    # 在一個有阻塞的主程序中去運行
    while True:
        # logger.info(schedule.jobs)
        print(schedule.jobs)
        schedule.run_pending()
        time.sleep(10)


if __name__ == "__main__":
    part_run()
複製代碼
相關文章
相關標籤/搜索