Python Apscheduler源代碼解析(一) 任務調度流程

最近公司有項目須要使用到定時任務,其定時邏輯相似於linux的Cron,就使用了Apscheduler這個類庫。基於公司的業務,須要修改Apshceduler,故而研究了一下Apscheduler的代碼。python

Apscheduler的調度邏輯很是簡單,越簡單的東西每每也越有效。linux

調度器會開闢一個線程,這個線程會循環的從job_store中找到任務,計算任務的執行時間,並與當前時間作比較。若是任務的執行事件<=當前時間,就將任務的firetime放到一個列表中(runtimes)app

def _get_run_times(self, now):
        run_times = []
        next_run_time = self.next_run_time
        while next_run_time and next_run_time <= now:
            run_times.append(next_run_time)
            next_run_time = self.trigger.get_next_fire_time(next_run_time, now)

        return run_times

若是runtimes不爲空,就將其放入Executor中,下面代碼中的executor不是Python的線程池類,是Apscheduler的一個類,固然了,最終的結果是將任務放到線程池當中oop

if run_times:
                    try:
                        executor.submit_job(job, run_times)

在BaseExecutor類中,有一個abstract method,負責將任務放到線程池當中,在其子類BasePoolExecutor中,繼承了這個方法線程

def _do_submit_job(self, job, run_times):
        def callback(f):
            exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else
                       (f.exception(), getattr(f.exception(), '__traceback__', None)))
            if exc:
                self._run_job_error(job.id, exc, tb)
            else:
                self._run_job_success(job.id, f.result())

        f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
        f.add_done_callback(callback)

代碼中的self._pool能夠是線程池,也能夠是進程池,在concurrent.futures包中,已是python3的標準類庫了。code

關於調度器的事件循環,若是讓他一直循環不斷的從job_store中取任務,而後判斷,這樣會十分浪費資源。Apscheduler在一次循環結束以前會計算任務下次執行事件與當前時間之差,而後讓調度線程掛起直到那個時間到來。繼承

def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            self._event.wait(wait_seconds)
            self._event.clear()
            wait_seconds = self._process_jobs()

self._process_jobs()的返回值就是上面說的那個時間,self._event.wait(wait_seconds)就是讓當前線程等待這麼長的一段時間進程

相關文章
相關標籤/搜索