最近在同步的時候老是遇到重複插入報錯:python
發現是我同一個導入過程執行了兩遍,我是使用 python 的任務調度模塊 schedule 去執行的,模擬出任務的使用場景以下:數據庫
# 測試調度模塊是否會重複執行任務
import logging
import threading
import time
import schedule
logger = logging.getLogger()
NUM = list()
def main_run():
part_run()
def run_threaded(job_func):
# 對於每個任務 開啓一個進程去執行
job_thread = threading.Thread(target=job_func)
job_thread.start()
def part_run():
# 模擬每 30 s執行一次的定時任務
schedule.every(10).seconds.do(run_threaded, new_finance_update)
# 在一個有阻塞的主程序中去運行
while True:
# logger.info(schedule.jobs)
print(schedule.jobs)
schedule.run_pending()
time.sleep(10)
def new_finance_update():
# 真正執行的任務
# 表示第幾回執行到這裏
global NUM
print("=="*10, NUM)
for i in range(100):
print(i, end=" ")
NUM.append(i)
time.sleep(30)
for j in list("/Users/furuiyang/PyEnv/demo/bin/python3.7 /Users/furuiyang/code2/jztask/tasks/re_task.py"):
print(j, end=" ")
NUM.append(j)
time.sleep(30)
if __name__ == "__main__":
try:
main_run()
except KeyboardInterrupt:
print(NUM)
複製代碼
執行結果:多線程
開啓了多線程向數據庫進行插入,沒有在線程之間創建消息聯繫,形成一個線層已經插入的數據被另外的一個線程重複插入。app
去掉多線程,代碼以下:測試
# 測試調度模塊是否會重複執行任務
import logging
import threading
import time
import schedule
logger = logging.getLogger()
NUM = list()
def main_run():
part_run()
# def run_threaded(job_func):
# # 對於每個任務 開啓一個進程去執行
# job_thread = threading.Thread(target=job_func)
# job_thread.start()
def part_run():
# 模擬每 30 s執行一次的定時任務
schedule.every(10).seconds.do(new_finance_update)
# 在一個有阻塞的主程序中去運行
while True:
# logger.info(schedule.jobs)
print(schedule.jobs)
schedule.run_pending()
time.sleep(10)
def new_finance_update():
# 真正執行的任務
# 表示第幾回執行到這裏
global NUM
print("=="*10, NUM)
for i in range(100):
print(i, end=" ")
NUM.append(i)
time.sleep(30)
for j in list("/Users/furuiyang/PyEnv/demo/bin/python3.7 /Users/furuiyang/code2/jztask/tasks/re_task.py"):
print(j, end=" ")
NUM.append(j)
time.sleep(30)
if __name__ == "__main__":
try:
main_run()
except KeyboardInterrupt:
print(NUM)
複製代碼
中斷時的運行結果:fetch
第二種是將具體要作的任務進行劃分,而後放在一個隊列中,每一個任務的完成之間互相不受到影響:ui
具體代碼以下:spa
import threading
import time
from queue import Queue
import schedule
jobqueue = Queue()
def fetch_job():
# 從隊列中去獲取任務
job_func = jobqueue.get()
return job_func
def cal_num(num):
time.sleep(1)
print(f"{num} 開始被計算")
print()
def generate_job():
# 根據具體的狀況去生成任務
for j in range(10):
yield (cal_num, j)
def put_job():
# 遍歷生成的任務將其放入隊列
for job in generate_job():
jobqueue.put(job)
def run_threaded():
while not jobqueue.empty():
# 取出而且執行
job_func, j = fetch_job()
# 對於每個任務 開啓一個進程去執行 j 是任務參數
job_thread = threading.Thread(target=job_func, args=(j,))
job_thread.start()
time.sleep(10)
print("任務執行完畢")
def part_run():
# 生成任務而且將其放入隊列中
put_job()
# 每 10 s執行一次的定時任務 開啓一個新的線程去執行
schedule.every(10).seconds.do(run_threaded)
# 在一個有阻塞的主程序中去運行
while True:
# logger.info(schedule.jobs)
print(schedule.jobs)
schedule.run_pending()
time.sleep(10)
if __name__ == "__main__":
part_run()
複製代碼