Our project uses apscheduler as its core scheduled-task module, so I did some investigation and source-level analysis of apscheduler.
一、爲何選擇apschedule?java
I took on faith the saying that apscheduler is to Python what Quartz is to Java. In practice it has indeed worked well.
2. Installation
# Install via pip
$ pip install apscheduler

# Install from source
$ wget https://pypi.python.org/pypi/APScheduler/#downloads
$ python setup.py install
3. apscheduler has four main components
1) trigger - triggers
2) job stores - job storage (in-memory or persistent)
3) executor - executors (implemented as thread pools or process pools on top of concurrent.futures)
4) schedulers - schedulers (the scheduler controls the other components; the background and blocking variants are the most commonly used)
Let's start with an example:
# -*- coding:utf-8 -*-
import redis
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED


class ScheduleFactory(object):
    def __init__(self):
        # Cache one scheduler instance on the class so every factory shares it
        if not hasattr(ScheduleFactory, '_scheduler'):
            ScheduleFactory._scheduler = ScheduleFactory.get_instance()
        self.scheduler = ScheduleFactory._scheduler

    @staticmethod
    def get_instance():
        pool = redis.ConnectionPool(
            host='10.94.99.56',
            port=6379,
        )
        jobstores = {
            # RedisJobStore builds its own Redis client; extra keyword
            # arguments (such as connection_pool) are passed to redis.Redis()
            'redis': RedisJobStore(db=2, connection_pool=pool),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        # 'daemon' controls whether the scheduler thread is a daemon thread
        scheduler = BackgroundScheduler(jobstores=jobstores,
                                        executors=executors,
                                        job_defaults=job_defaults,
                                        daemon=False)
        return scheduler
Note: in the example above, ScheduleFactory is implemented as a singleton, so the scheduler object it hands out is globally unique.
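A quick check of the singleton behaviour (a minimal sketch):

f1 = ScheduleFactory()
f2 = ScheduleFactory()
assert f1.scheduler is f2.scheduler  # one shared BackgroundScheduler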
4. Choosing a scheduler
Only two scenarios are covered here:
1) BackgroundScheduler: scheduling runs in a background thread, so once the parent process that created the scheduler exits, scheduling stops with it. Suitable for embedding inside a service, e.g. django.
2) BlockingScheduler: blocks the process that created the scheduler. Suitable when scheduling is the only thing the program does; see the sketch below.
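As a minimal sketch of the standalone case (heartbeat and the 10-second interval are made-up placeholders):

from apscheduler.schedulers.blocking import BlockingScheduler

def heartbeat():
    print('still alive')

scheduler = BlockingScheduler()
scheduler.add_job(heartbeat, trigger='interval', seconds=10)
scheduler.start()  # blocks this process until the scheduler is shut down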
After choosing a scheduler:
1) scheduler.start() starts the scheduler.
2) scheduler.shutdown() stops the scheduler; by default it waits for all currently running jobs to finish before returning. Pass wait=False to skip the wait, as in the sketch below.
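For example (a minimal sketch, reusing the factory built above):

factory = ScheduleFactory()
factory.scheduler.start()
# ... jobs run in the background ...
factory.scheduler.shutdown()              # waits for running jobs to finish
# factory.scheduler.shutdown(wait=False)  # returns immediately instead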
With that, the program looks like this:
class ScheduleFactory(object):
    def __init__(self):
        # Cache one scheduler instance on the class so every factory shares it
        if not hasattr(ScheduleFactory, '_scheduler'):
            ScheduleFactory._scheduler = ScheduleFactory.get_instance()
        self.scheduler = ScheduleFactory._scheduler

    @staticmethod
    def get_instance():
        pool = redis.ConnectionPool(
            host='10.94.99.56',
            port=6379,
        )
        jobstores = {
            'redis': RedisJobStore(db=2, connection_pool=pool),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        scheduler = BackgroundScheduler(jobstores=jobstores,
                                        executors=executors,
                                        job_defaults=job_defaults,
                                        daemon=False)
        # scheduler = BlockingScheduler(jobstores=jobstores,
        #                               executors=executors,
        #                               job_defaults=job_defaults)
        return scheduler

    def start(self):
        self.scheduler.start()

    def shutdown(self):
        self.scheduler.shutdown()
5. Choosing job stores
There are two broad directions:
1) Non-persistent
Available store: MemoryJobStore
Suitable when you do not start and stop the scheduler frequently and missing a batch of scheduled runs does not matter.
2) Persistent
Available stores: SQLAlchemyJobStore, RedisJobStore, MongoDBJobStore, ZooKeeperJobStore
Suitable when missing a batch of scheduled runs is not acceptable.
Job stores are configured with a dictionary, for example:
jobstores = {
    'redis': RedisJobStore(db=2, connection_pool=pool),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
Each key is the name you give that store; later, when adding a job, you can specify which store the job should go to. The examples here all use the store with key 'default'.
def add_job(self, job_func, interval, id, job_func_params=None):
    self.scheduler.add_job(job_func,
                           jobstore='default',
                           trigger='interval',
                           seconds=interval,
                           id=id,
                           kwargs=job_func_params,
                           executor='default',
                           next_run_time=next_run_time,  # computed as in section 9 below
                           misfire_grace_time=30)
6. Choosing an executor
Only two are covered here: the thread pool and the process pool; 'default' is a thread pool. max_workers is the actual concurrency available for running jobs; if you set it too low while adding many jobs, scheduled runs can be missed.
Also, because of Python's GIL, CPU-bound jobs in a thread pool never reach the configured concurrency, so pick the number according to your actual workload (see the process-pool sketch after the add_job example below).
We generally set the concurrency to 30, which is fine for typical scenarios.
executors = {
    'default': ThreadPoolExecutor(max_workers=30),
    'processpool': ProcessPoolExecutor(max_workers=30)
}
Likewise, when calling add_job we can pick the executor for that job:
def add_job(self, job_func, interval, id, job_func_params=None):
    self.scheduler.add_job(job_func,
                           jobstore='default',
                           trigger='interval',
                           seconds=interval,
                           id=id,
                           kwargs=job_func_params,
                           executor='default',
                           next_run_time=next_run_time,  # computed as in section 9 below
                           misfire_grace_time=30)
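For a CPU-bound job, routing it to the process pool configured above sidesteps the GIL. A sketch (cpu_heavy_func and its schedule are made up; note that jobs sent to a process pool must be importable module-level functions so they can be pickled):

def cpu_heavy_func():
    # placeholder for a CPU-bound computation
    return sum(i * i for i in range(10 ** 6))

# assuming `scheduler` is the BackgroundScheduler configured above
scheduler.add_job(cpu_heavy_func, trigger='interval', seconds=300,
                  id='cpu_heavy', executor='processpool')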
7. Choosing a trigger
This is the simplest one. There are three kinds, and none needs extra setup (a sketch follows this list):
1) date - run once at a fixed point in time
2) interval - run repeatedly at a fixed interval
3) cron - crontab-style expressions
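A minimal sketch of all three (tick and the times are made-up placeholders):

from apscheduler.schedulers.background import BackgroundScheduler

def tick():
    print('tick')

scheduler = BackgroundScheduler()
# date: run once at a fixed point in time
scheduler.add_job(tick, trigger='date', run_date='2017-08-01 00:30:00')
# interval: run every 60 seconds
scheduler.add_job(tick, trigger='interval', seconds=60)
# cron: crontab-style, e.g. every day at 00:30
scheduler.add_job(tick, trigger='cron', hour=0, minute=30)
scheduler.start()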
8. For the add/remove/modify/query job APIs, see the user guide (a quick sketch follows the link):
http://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s
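For quick reference, the usual calls look like this (a sketch assuming the scheduler and tick function from the trigger example; 'my_job' is a made-up id):

job = scheduler.add_job(tick, trigger='interval', seconds=60, id='my_job')  # create
scheduler.pause_job('my_job')    # pause
scheduler.resume_job('my_job')   # resume
scheduler.modify_job('my_job', name='renamed')                      # change job attributes
scheduler.reschedule_job('my_job', trigger='interval', seconds=30)  # swap in a new trigger
scheduler.remove_job('my_job')   # delete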
9. Problem fixes
1)2017-07-24 14:06:28,480 [apscheduler.executors.default:120] [WARNING]- Run time of job "etl_func (trigger: interval[0:01:00], next run at: 2017-07-24 14:07:27 CST)" was missed by 0:00:01.245424
The source fragment behind this warning is:
def run_job(job, jobstore_alias, run_times, logger_name):
    """
    Called by executors to run the job. Returns a list of scheduler events to
    be dispatched by the scheduler.
    """
    events = []
    logger = logging.getLogger(logger_name)
    for run_time in run_times:
        # See if the job missed its run time window, and handle
        # possible misfires accordingly
        if job.misfire_grace_time is not None:
            difference = datetime.now(utc) - run_time
            grace_time = timedelta(seconds=job.misfire_grace_time)
            if difference > grace_time:
                events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id,
                                                jobstore_alias, run_time))
                logger.warning('Run time of job "%s" was missed by %s', job, difference)
                continue

        logger.info('Running job "%s" (scheduled at %s)', job, run_time)
        try:
            retval = job.func(*job.args, **job.kwargs)
        except:
            exc, tb = sys.exc_info()[1:]
            formatted_tb = ''.join(format_tb(tb))
            events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias,
                                            run_time, exception=exc,
                                            traceback=formatted_tb))
            logger.exception('Job "%s" raised an exception', job)
        else:
            events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias,
                                            run_time, retval=retval))
            logger.info('Job "%s" executed successfully', job)

    return events
The key parameter here is misfire_grace_time, which defaults to 1 second. If the gap between a job's actual start and its scheduled run time exceeds misfire_grace_time, a warning is logged and that scheduled run is skipped!
Why does this happen?
1) The executor's concurrency is too low for the number of jobs you have added.
2) misfire_grace_time is simply still too small.
Both causes can be mitigated when the job is added; see the sketch below.
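A sketch of raising the grace window per job (etl_func is a placeholder named after the job in the log above; `scheduler` is the one configured earlier):

# Allow a run to start up to 30s late before it counts as missed
scheduler.add_job(etl_func, trigger='interval', seconds=60,
                  id='etl', misfire_grace_time=30)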
2) If you use trigger='interval' together with something like misfire_grace_time=30, and the scheduler is first started at, say, 10:50, then the scheduled times and the actual executions can end up offset by up to a minute.
How do you fix this? You can set the time of the first run via next_run_time and round it to a whole minute. For example:
def add_job(self, job_func, interval, id, job_func_params=None):
    # Round the first run up to the next whole minute
    next_minute = (datetime.now() + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")
    next_run_time = datetime.strptime(next_minute, "%Y-%m-%d %H:%M")
    self.scheduler.add_job(job_func,
                           jobstore='default',
                           trigger='interval',
                           seconds=interval,
                           id=id,
                           kwargs=job_func_params,
                           executor='default',
                           next_run_time=next_run_time,
                           misfire_grace_time=30)
3)2017-07-25 11:02:00,003 [apscheduler.scheduler:962] [WARNING]- Execution of job "rule_func (trigger: interval[0:01:00], next run at: 2017-07-25 11:02:00 CST)" skipped: maximum number of running instances reached (1)
The corresponding source is:
for job in due_jobs:
    # Look up the job's executor
    try:
        executor = self._lookup_executor(job.executor)
    except:
        self._logger.error(
            'Executor lookup ("%s") failed for job "%s" -- removing it from the '
            'job store', job.executor, job)
        self.remove_job(job.id, jobstore_alias)
        continue

    run_times = job._get_run_times(now)
    run_times = run_times[-1:] if run_times and job.coalesce else run_times
    if run_times:
        try:
            executor.submit_job(job, run_times)
        except MaxInstancesReachedError:
            self._logger.warning(
                'Execution of job "%s" skipped: maximum number of running '
                'instances reached (%d)', job, job.max_instances)
            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                       jobstore_alias, run_times)
            events.append(event)
And the source of submit_job:
with self._lock:
    if self._instances[job.id] >= job.max_instances:
        raise MaxInstancesReachedError(job)
    self._do_submit_job(job, run_times)
    self._instances[job.id] += 1
What does this mean? When submitting a run would push a job past max_instances concurrently running instances, this exception is raised and that run is skipped. For example, job1's 10:00 run may find two pending run times when it executes. How can that happen? It can: the 09:59 run may have failed to execute but was persisted in the job store, so at 10:00 the scheduler tries it again alongside the 10:00 run.
max_instances defaults to 1. If you want to let such overlaps through, set max_instances higher, e.g. max_instances=3; see the sketch below.
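It can be set per job or scheduler-wide (a sketch; rule_func is a placeholder named after the job in the log above):

# Per job:
scheduler.add_job(rule_func, trigger='interval', seconds=60,
                  id='rule', max_instances=3)

# Or as a scheduler-wide default:
job_defaults = {'coalesce': False, 'max_instances': 3}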
10. If you want to monitor your scheduling, apscheduler provides a listener mechanism that can watch for these exceptional events. You only need to register a listener:
def add_err_listener(self):
    self.scheduler.add_listener(err_listener,
                                EVENT_JOB_MAX_INSTANCES | EVENT_JOB_MISSED | EVENT_JOB_ERROR)


def err_listener(ev):
    msg = ''
    if ev.code == EVENT_JOB_ERROR:
        msg = ev.traceback
    elif ev.code == EVENT_JOB_MISSED:
        msg = 'missed job, job_id:%s, schedule_run_time:%s' % (ev.job_id, ev.scheduled_run_time)
    elif ev.code == EVENT_JOB_MAX_INSTANCES:
        msg = 'reached maximum of running instances, job_id:%s' % (ev.job_id)
    rs = RobotSender()
    rs.send(
        "https://oapi.dingtalk.com/robot/send?access_token=499ca69a2b45402c00503acea611a6ae6a2f1bacb0ca4d33365595d768bb2a58",
        u"[apscheduler alert] exception info: %s" % (msg),
        '15210885002',
        False
    )
The final code:
# -*- coding:utf-8 -*-
import redis
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED
from alarmkits.send_robot import RobotSender


class ScheduleFactory(object):
    def __init__(self):
        # Cache one scheduler instance on the class so every factory shares it
        if not hasattr(ScheduleFactory, '_scheduler'):
            ScheduleFactory._scheduler = ScheduleFactory.get_instance()
        self.scheduler = ScheduleFactory._scheduler

    @staticmethod
    def get_instance():
        pool = redis.ConnectionPool(
            host='10.94.99.56',
            port=6379,
        )
        jobstores = {
            'redis': RedisJobStore(db=2, connection_pool=pool),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=30),
            'processpool': ProcessPoolExecutor(max_workers=30)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        scheduler = BackgroundScheduler(jobstores=jobstores,
                                        executors=executors,
                                        job_defaults=job_defaults,
                                        daemon=False)
        # scheduler = BlockingScheduler(jobstores=jobstores,
        #                               executors=executors,
        #                               job_defaults=job_defaults)
        return scheduler

    def start(self):
        self.scheduler.start()

    def shutdown(self):
        self.scheduler.shutdown()

    def add_job(self, job_func, interval, id, job_func_params=None):
        # Round the first run up to the next whole minute (see section 9)
        next_minute = (datetime.now() + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")
        next_run_time = datetime.strptime(next_minute, "%Y-%m-%d %H:%M")
        self.scheduler.add_job(
            job_func,
            jobstore='default',
            trigger='interval',
            seconds=interval,
            id=id,
            kwargs=job_func_params,
            executor='default',
            next_run_time=next_run_time,
            misfire_grace_time=30,
            max_instances=3
        )

    def remove_job(self, id):
        self.scheduler.remove_job(id)

    def modify_job(self, id, interval):
        # Changing the interval means replacing the trigger, which is what
        # reschedule_job does; modify_job only changes plain job attributes
        self.scheduler.reschedule_job(job_id=id, trigger='interval', seconds=interval)

    def add_err_listener(self):
        self.scheduler.add_listener(err_listener,
                                    EVENT_JOB_MAX_INSTANCES | EVENT_JOB_MISSED | EVENT_JOB_ERROR)


def err_listener(ev):
    msg = ''
    if ev.code == EVENT_JOB_ERROR:
        msg = ev.traceback
    elif ev.code == EVENT_JOB_MISSED:
        msg = 'missed job, job_id:%s, schedule_run_time:%s' % (ev.job_id, ev.scheduled_run_time)
    elif ev.code == EVENT_JOB_MAX_INSTANCES:
        msg = 'reached maximum of running instances, job_id:%s' % (ev.job_id)
    rs = RobotSender()
    rs.send(
        "https://oapi.dingtalk.com/robot/send?access_token=499ca69a2b45402c00503acea611a6ae6a2f1bacb0ca4d33365595d768bb2a58",
        u"[apscheduler alert] exception info: %s" % (msg),
        '15210885002',
        False
    )