Python源碼學習Schedule

關於我
編程界的一名小小程序猿,目前在一個創業團隊任team lead,技術棧涉及Android、Python、Java和Go,這個也是咱們團隊的主要技術棧。 聯繫:hylinux1024@gmail.compython

上一篇介紹了一個簡單的Python調度器的使用,後來我翻閱了一下它的源碼,驚奇的發現核心庫才一個文件,代碼量短短700行不到。這是絕佳的學習材料。
讓我喜出望外的是這個庫的做者居然就是我最近閱讀的一本書《Python Tricks》的做者!如今就讓咱們看看大神的實現思路。linux

0x00 準備

項目地址git

github.com/dbader/sche…github

將代碼checkout到本地編程

環境json

PyCharm+venv+Python3小程序

0x01 用法

這個在上一篇也介紹過了,很是簡單bash

import schedule

# 定義須要執行的方法
def job():
    print("a simple scheduler in python.")

# 設置調度的參數,這裏是每2秒執行一次
schedule.every(2).seconds.do(job)

if __name__ == '__main__':
    while True:
        schedule.run_pending()

# 執行結果
a simple scheduler in python.
a simple scheduler in python.
a simple scheduler in python.
...
複製代碼

這個庫的文檔也很詳細,能夠瀏覽 schedule.readthedocs.io/ 瞭解庫的大概用法app

0x02 項目結構

(venv) ➜  schedule git:(master) tree -L 2
.
...
├── requirements-dev.txt
├── schedule
│   └── __init__.py
├── setup.py
├── test_schedule.py
├── tox.ini
└── venv
    ├── bin
    ├── include
    ├── lib
    ├── pip-selfcheck.json
    └── pyvenv.cfg

8 directories, 18 files

複製代碼
  • schedule目錄下就一個__init__.py文件,這是咱們須要重點學習的地方。
  • setup.py文件是發佈項目的配置文件
  • test_schedule.py是單元測試文件,一開始除了看文檔外,也能夠從單元測試中入手,瞭解這個庫的使用
  • requirements-dev.txt 開發環境的依賴庫文件,若是核心的庫是不須要第三方的依賴的,可是單元測試須要
  • venv是我checkout後建立的,本來的項目是沒有的

0x03 schedule

咱們知道__init__.py是定義Python包必需的文件。在這個文件中定義方法、類均可以在使用import命令時導入到工程項目中,而後使用。dom

schedule 源碼

如下是schedule會用到的模塊,都是Python內部的模塊。

import collections
import datetime
import functools
import logging
import random
import re
import time

logger = logging.getLogger('schedule')
複製代碼

而後定義了一個日誌打印工具實例

接着是定義了該模塊的3個異常類的結構體系,是由Exception派生出來的,分別是ScheduleErrorScheduleValueErrorIntervalError

class ScheduleError(Exception):
    """Base schedule exception"""
    pass

class ScheduleValueError(ScheduleError):
    """Base schedule value error"""
    pass

class IntervalError(ScheduleValueError):
    """An improper interval was used"""
    pass

複製代碼

還定義了一個CancelJob的類,用於取消調度器的繼續執行

class CancelJob(object):
    """ Can be returned from a job to unschedule itself. """
    pass

複製代碼

例如在自定義的須要被調度方法中返回這個CancelJob類就能夠實現一次性的任務

# 定義須要執行的方法
def job():
    print("a simple scheduler in python.")
    # 返回CancelJob能夠中止調度器的後續執行
    return schedule.CancelJob
複製代碼

接着就是這個庫的兩個核心類SchedulerJob

class Scheduler(object):
    """ Objects instantiated by the :class:`Scheduler <Scheduler>` are factories to create jobs, keep record of scheduled jobs and handle their execution. """
    
class Job(object):
    """ A periodic job as used by :class:`Scheduler`. :param interval: A quantity of a certain time unit :param scheduler: The :class:`Scheduler <Scheduler>` instance that this job will register itself with once it has been fully configured in :meth:`Job.do()`. Every job runs at a given fixed time interval that is defined by: * a :meth:`time unit <Job.second>` * a quantity of `time units` defined by `interval` A job is usually created and returned by :meth:`Scheduler.every` method, which also defines its `interval`. """
複製代碼

Scheduler是調度器的實現類,它負責調度任務(job)的建立和執行。

Job則是對須要執行任務的抽象。

這兩個類是這個庫的核心,後面咱們還會看到詳細的分析。
接下來就是默認調度器default_scheduler和任務列表jobs的建立。

# The following methods are shortcuts for not having to
# create a Scheduler instance:

#: Default :class:`Scheduler <Scheduler>` object
default_scheduler = Scheduler()

#: Default :class:`Jobs <Job>` list
jobs = default_scheduler.jobs  # todo: should this be a copy, e.g. jobs()?
複製代碼

在執行import schedule後,就默認建立了default_scheduler。而Scheduler的構造方法爲

def __init__(self):
    self.jobs = []
複製代碼

在執行初始化時,調度器就建立了一個空的任務列表。

在文件的最後定義了一些鏈式調用的方法,使用起來也是很是人性化的,值得學習。 這裏的方法都定義在模塊下,並且都是封裝了default_scheduler實例的調用。

def every(interval=1):
    """Calls :meth:`every <Scheduler.every>` on the :data:`default scheduler instance <default_scheduler>`. """
    return default_scheduler.every(interval)


def run_pending():
    """Calls :meth:`run_pending <Scheduler.run_pending>` on the :data:`default scheduler instance <default_scheduler>`. """
    default_scheduler.run_pending()


def run_all(delay_seconds=0):
    """Calls :meth:`run_all <Scheduler.run_all>` on the :data:`default scheduler instance <default_scheduler>`. """
    default_scheduler.run_all(delay_seconds=delay_seconds)


def clear(tag=None):
    """Calls :meth:`clear <Scheduler.clear>` on the :data:`default scheduler instance <default_scheduler>`. """
    default_scheduler.clear(tag)


def cancel_job(job):
    """Calls :meth:`cancel_job <Scheduler.cancel_job>` on the :data:`default scheduler instance <default_scheduler>`. """
    default_scheduler.cancel_job(job)


def next_run():
    """Calls :meth:`next_run <Scheduler.next_run>` on the :data:`default scheduler instance <default_scheduler>`. """
    return default_scheduler.next_run


def idle_seconds():
    """Calls :meth:`idle_seconds <Scheduler.idle_seconds>` on the :data:`default scheduler instance <default_scheduler>`. """
    return default_scheduler.idle_seconds
複製代碼

咱們看下入口方法run_pending(),從本文一開頭的Demo能夠知道這個是啓動調度器的方法。這裏它執行了default_scheduler中的方法。

default_scheduler.run_pending()
複製代碼

因此咱們就把目光定位到Scheduler類的相應方法

def run_pending(self):
    """ Run all jobs that are scheduled to run. Please note that it is *intended behavior that run_pending() does not run missed jobs*. For example, if you've registered a job that should run every minute and you only call run_pending() in one hour increments then your job won't be run 60 times in between but only once. """
    runnable_jobs = (job for job in self.jobs if job.should_run)
    for job in sorted(runnable_jobs):
        self._run_job(job)
複製代碼

這個方法中首先從jobs列表將須要執行的任務過濾後放在runnable_jobs列表,而後將其排序後順序執行內部的_run_job(job)方法

def _run_job(self, job):
    ret = job.run()
    if isinstance(ret, CancelJob) or ret is CancelJob:
        self.cancel_job(job)
複製代碼

_run_job方法中就調用了job類中的run方法,並根據返回值判斷是否須要取消任務。

這時候咱們要看下Job類的實現邏輯。

首先咱們要看下Job是何時建立的。仍是從Demo中的代碼入手

schedule.every(2).seconds.do(job)
複製代碼

這裏先執行了schedule.every()方法

def every(interval=1):
    """Calls :meth:`every <Scheduler.every>` on the :data:`default scheduler instance <default_scheduler>`. """
    return default_scheduler.every(interval)
複製代碼

這個方法就是scheduler類中的every方法

def every(self, interval=1):
    """ Schedule a new periodic job. :param interval: A quantity of a certain time unit :return: An unconfigured :class:`Job <Job>` """
    job = Job(interval, self)
    return job
複製代碼

在這裏建立了一個任務job,並將參數intervalscheduler實例傳入到構造方法中,最後返回job實例用於實現鏈式調用。

跳轉到Job的構造方法

def __init__(self, interval, scheduler=None):
    self.interval = interval  # pause interval * unit between runs
    self.latest = None  # upper limit to the interval
    self.job_func = None  # the job job_func to run
    self.unit = None  # time units, e.g. 'minutes', 'hours', ...
    self.at_time = None  # optional time at which this job runs
    self.last_run = None  # datetime of the last run
    self.next_run = None  # datetime of the next run
    self.period = None  # timedelta between runs, only valid for
    self.start_day = None  # Specific day of the week to start on
    self.tags = set()  # unique set of tags for the job
    self.scheduler = scheduler  # scheduler to register with
複製代碼

主要初始化了間隔時間配置、須要執行的方法、調度器各類時間單位等。

執行every方法以後又調用了seconds這個屬性方法

@property
def seconds(self):
    self.unit = 'seconds'
    return self
複製代碼

設置了時間單位,這個設置秒,固然還有其它相似的屬性方法minuteshoursdays等等。

最後就是執行了do方法

def do(self, job_func, *args, **kwargs):
    """ Specifies the job_func that should be called every time the job runs. Any additional arguments are passed on to job_func when the job runs. :param job_func: The function to be scheduled :return: The invoked job instance """
    self.job_func = functools.partial(job_func, *args, **kwargs)
    try:
        functools.update_wrapper(self.job_func, job_func)
    except AttributeError:
        # job_funcs already wrapped by functools.partial won't have
        # __name__, __module__ or __doc__ and the update_wrapper()
        # call will fail.
        pass
    self._schedule_next_run()
    self.scheduler.jobs.append(self)
    return self
複製代碼

在這裏使用functools工具的中的偏函數partial將咱們自定義的方法封裝成可調用的對象

而後就調用_schedule_next_run方法,它主要是對時間的解析,按照時間對job排序,我以爲這個方法是本項目中的技術點,邏輯也是稍微複雜一丟丟,仔細閱讀就能夠看懂,主要是對時間datetime的使用。因爲篇幅,這裏就再也不貼出代碼。

這裏就完成了任務job的添加。而後在調用run_pending方法中就可讓任務執行。

0x04 總結一下

schedule庫定義兩個核心類SchedulerJob。在導入包時就默認建立一個Scheduler對象,並初始化任務列表。
schedule模塊提供了鏈式調用的接口,在配置schedule參數時,就會建立任務對象job,並會將job添加到任務列表中,最後在執行run_pending方法時,就會調用咱們自定義的方法。 這個庫的核心思想是使用面向對象方法,對事物可以準確地抽象,它整體的邏輯並不複雜,是學習源碼很不錯的範例。

0x05 學習資料

相關文章
相關標籤/搜索