python版 定時任務機制

定時任務的原理
服務器執行一個python腳本
這個腳本,循環執行配置的定時任務地址python

Python請求地址, 該地址應該返回, 下次再來執行的秒數. 也就是任務的頻率
好比任務但願每3秒執行一次, 那麼任務結束後,應該返回一個3的數字git

python腳本拿到任務返回的數字, 算出下次執行任務的時間. 當時間條件知足是, python腳本會繼續訪問該任務vim

不一樣的任務, 直接修改 init裏面的配置就能夠了
python腳本以下:服務器

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
import urllib.request
import os,stat

class MyThread(object):
    '''
    多線程
    '''
    def __init__(self, func_list=None,timeout=5):
        self.threads = []
        self.rs = {}
        self.timeout = timeout
        self.func_list = func_list
        self.start()

    #封裝的線程函數
    def trace_func(self, func, name , *args, **kwargs):
        ret = func(*args, **kwargs)
        self.rs[name] = ret

    #執行多線程
    def start(self):
        for v in self.func_list:
            if v["args"]:
                lists = []
                lists.append(v["func"])
                lists.append(v["name"])
                for arg in v["args"]:
                    lists.append(arg)
                    tuples = tuple(lists)
                    t = threading.Thread(target=self.trace_func, args=tuples)
            else:
                t = threading.Thread(target=self.trace_func, args=(v["func"],v["name"],))
            self.threads.append(t)
        for t in self.threads:
            t.start()
        for t in self.threads:
            t.join()
            #t.join(self.timeout)

'''
執行任務
'''
def task(url, last_time):
    #當前時間, 最後執行時間
    cur_time  = int(time.time())
    last_time = int(last_time)
    if last_time==0:
        last_time = cur_time

    #知足條件就執行任務
    if cur_time>=last_time:
        print("請求:%s"%(url))
        try:
            req = urllib.request.urlopen(url)
            content = req.read()
            content = str(content, 'utf-8')
            content = content.strip()
            if content.isdigit():
                last_time += int(content)
            else:
                last_time += 1000
        except:
            print('發生了異常, 重置文件')
            init()
    return last_time

'''
多線程調用定時任務
'''
def main():
    #讀取文件
    with open("config.txt",'r') as f:
        lines = f.readlines()
        f.close()

    #多線程執行
    func_list = []
    for v in lines:
        v = v.split('|')
        v[1] = int(v[1])
        func_list.append({"func":task,"args":(v[0], v[1]), "name":v[0]})
    mt = MyThread(func_list)
    d = mt.rs
    #從新寫入文件
    content = ''
    for k in d:
        content += "%s|%s\n" % (k,str(d[k]))

    with open("config.txt",'w') as f:
        f.write(content)
        f.close()

'''
初始化要執行的定時任務
'''
def init():
    urls = [
        'http://admin.yqxv1.local/task/withdraw',
        'http://baidu.com'
    ]
    content = ''
    for v in urls:
        content += "%s|%s\n" % (v,0)
    with open("config.txt",'w+') as f:
        f.write(content)
        f.close()
    os.chmod("config.txt",stat.S_IRWXU)

init()
while True:
    main()

把上面代碼保存爲doit.py
如何把腳本跑起來, 命令流程
安裝python3
ps -ef | grep python3
查看是否有進程, 若是有該進程, 能夠先把進程殺了
kill -9 進程號
移除 config.txt
rm -rf config.txt
編輯你的doit.py的連接配置
sudo vim doit.py
再將定時腳本跑起來
nohup python3 doit.py >/dev/null 2>&1 &多線程

ps -ef | grep python3 查看你的任務跑起來沒有
測試反饋
在你的執行的腳本, 打印日誌log_test()app

相關文章
相關標籤/搜索