APScheduler 源码阅读笔记
概述
APScheduler 是由 python 实现的一个轻量级任务调度器,它可以按照一定间隔(IntervalTrigger)、指定时间(2.1中的SimpleTrigger/3.0中的DateTrigger)或者以类似 cron(CronTrigger) 的形式触发待执行任务(即调用函数或者调用 python 的 callable 对象)。现在 pypi 上的稳定版是 APScheduler 2.1.1,3.0 版本在 class Scheduler 中移除了针对不同 trigger 的 add_trigger_job() 接口,统一为 add_job(),但是底层实现变化不大。我主要看了 2.1.1 的代码。代码很简洁,加起来一共2049行。
模块组织
Scheduler 调度器的核心部分,负责对 job 的管理和调度,用户使用添加/移除任务,启动调度器都通过 Scheduler 提供的接口完成。
Job 封装了需要调度的任务,每一个 Job 实例是在 Scheduler 添加 job 时被初始化,具体的初始化参数决定了调度被触发的形式(3类不同的trigger)。
Trigger 包含 SimpleTrigger,IntervalTrigger和 CronTrigger 三个类。Trigger 的作用就是计算下一次触发任务的时间。
JobStore 抽象基类,针对任务存储的介质有多个实现,包括基于内存(RAMJobStore)、使用shelve的简单持久化存储(ShelveJobStore)、使用数据库存储(RedisJobStore,MongoDBJobStore)等。如果不指定参数默认使用 RAMJobStore,使用持久化的 JobStore 的目的是在 Scheduler 重启之后能够恢复原有的任务调度。
底层实现
从分析 Scheduler 类入手,首先看项目中自带的example:
1 |
|
上边代码的最核心的三行就是初始化Scheduler,添加以interval为触发的 job 和启动scheduler。这也是使用APScheduler 最基本也最主要的方式。
初始化 Scheduler 有很多参数可以选择(详细可以参考 scheduler-configuration-options),这里简单介绍 standalone 和 daemonic 两个参数。standalone 设置为 False,那么 scheduler 将会以 embedded 模式运行,该模式下调度器会在一个新的线程中运行调度循环(_main_loop);如果 standlone 设置为True,那么 scheduler 会阻塞当前线程,执行调度循环,直到不再有调度任务后返回,被阻塞的线程继续运行。daemonic 即是否以守护线程运行 scheduler,与python 守护线程的效果一致,如果 daemonic 设置为 False,显然该参数在 embedded 模式(standalone==False)下才有效果。Scheduler 默认的运行参数是 standalone == False, daemonic == True,即以 embedded 模式的守护线程中运行调度循环。
start 是启动 scheduler 的方法,如下所示。代码很简洁,启动前读取所有 job_store 中pending job(pending job 是 scheduler 未启动前添加的job),如果为 standalone 模式,会直接进入 _main_loop 调度循环,否则在新的线程中运行调度循环。
1 | def start(self): |
_main_loop 就是调度循环,主体就是一个 while 循环。
1 | while not self._stopped: |
进入循环后首先调用 _process_jobs
处理任务,以此处理不同 job_store 中的 每一个 job。在处理 job 过程中首先通过 get_run_times
获取 run_times(get_run_times
很有趣,它获取在 next_run_time 和 now 之间所有需要进行任务调度的时间点,之所以这样做的原因是 APScheduler 允许设定一个 misfire_grace_time 时间,也就是事件执行的延迟时间,因为有很多原因会导致计划调度不能准确在设定好的时间执行。)_process_jobs
处理很简单,将 job 的执行调度交给 scheduler 的线程池,针对每一个 job 的触发会开启一个新的线程(一个疑问:这个线程设置了 t.setDaemon(True)
,但是文档上却说”Jobs are always executed in non-daemonic threads.”)来执行,而实际的任务执行发生在 Scheduler 的 _run_job
方法中。
_process_jobs
会返回下次执行调度的时间,调度循环会根据返回值进行相应的处理,wait 指定时间、或一直 wait 等待事件通知唤醒、或退出循环。调度循环的阻塞和唤醒是由 python 原生 Event 的 wait 和 set 来实现的,阻塞结束的方式有两种:一是 wait(wait_seconds) 超时;另一种是在 scheduler 处于 running 状态添加新的任务,添加新任务过程中会自动调用 set()唤醒 event。
总结
总体而言 APScheduler 以 threading 模块为基础实现,主要用到了 threading.Event 和 threading.Thread,用到的 ThreadPool 是对 threading.Thread 的简单封装。真是因为此所以 APScheduler 有 “No (hard) external dependencies” 和 “Thread-safe API” 这两项优点。但同时存在一个问题,由于 GIL 的存在,任务的执行一定会阻塞主线程,所以如果任务执行时间较长、有更多异步调度的需求,那么可能就会用到另外一个更强大的框架:Celery。 Celery 毕竟是一个分布式的任务队列,相比而言 APScheduler 的特点是轻巧,一言以蔽之即: APScheduler is a light but powerful in-process task scheduler.