APScheduler 源码阅读笔记

概述

APScheduler 是由 python 实现的一个轻量级任务调度器,它可以按照一定间隔(IntervalTrigger)、指定时间(2.1中的SimpleTrigger/3.0中的DateTrigger)或者以类似 cron(CronTrigger) 的形式触发待执行任务(即调用函数或者调用 python 的 callable 对象)。现在 pypi 上的稳定版是 APScheduler 2.1.1,3.0 版本在 class Scheduler 中移除了针对不同 trigger 的 add_trigger_job() 接口,统一为 add_job(),但是底层实现变化不大。我主要看了 2.1.1 的代码。代码很简洁,加起来一共2049行。

模块组织

  • Scheduler 调度器的核心部分,负责对 job 的管理和调度,用户使用添加/移除任务,启动调度器都通过 Scheduler 提供的接口完成。

  • Job 封装了需要调度的任务,每一个 Job 实例是在 Scheduler 添加 job 时被初始化,具体的初始化参数决定了调度被触发的形式(3类不同的trigger)。

  • Trigger 包含 SimpleTrigger,IntervalTrigger和 CronTrigger 三个类。Trigger 的作用就是计算下一次触发任务的时间。

  • JobStore 抽象基类,针对任务存储的介质有多个实现,包括基于内存(RAMJobStore)、使用shelve的简单持久化存储(ShelveJobStore)、使用数据库存储(RedisJobStore,MongoDBJobStore)等。如果不指定参数默认使用 RAMJobStore,使用持久化的 JobStore 的目的是在 Scheduler 重启之后能够恢复原有的任务调度。

底层实现

从分析 Scheduler 类入手,首先看项目中自带的example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from datetime import datetime
from apscheduler.scheduler import Scheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = Scheduler(standalone=True)
    scheduler.add_interval_job(tick, seconds=3)
    print('Press Ctrl+C to exit')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

上边代码的最核心的三行就是初始化Scheduler,添加以interval为触发的 job 和启动scheduler。这也是使用APScheduler 最基本也最主要的方式。

初始化 Scheduler 有很多参数可以选择(详细可以参考 scheduler-configuration-options),这里简单介绍 standalone 和 daemonic 两个参数。standalone 设置为 False,那么 scheduler 将会以 embedded 模式运行,该模式下调度器会在一个新的线程中运行调度循环(_main_loop);如果 standlone 设置为True,那么 scheduler 会阻塞当前线程,执行调度循环,直到不再有调度任务后返回,被阻塞的线程继续运行。daemonic 即是否以守护线程运行 scheduler,与python 守护线程的效果一致,如果 daemonic 设置为 False,显然该参数在 embedded 模式(standalone==False)下才有效果。Scheduler 默认的运行参数是 standalone == False, daemonic == True,即以 embedded 模式的守护线程中运行调度循环。

start 是启动 scheduler 的方法,如下所示。代码很简洁,启动前读取所有 job_store 中pending job(pending job 是 scheduler 未启动前添加的job),如果为 standalone 模式,会直接进入 _main_loop 调度循环,否则在新的线程中运行调度循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def start(self):
    if self.running:
        raise SchedulerAlreadyRunningError

    # Create a RAMJobStore as the default if there is no default job store
    if not 'default' in self._jobstores:
        self.add_jobstore(RAMJobStore(), 'default', True)

    # Schedule all pending jobs
    for job, jobstore in self._pending_jobs:
        self._real_add_job(job, jobstore, False)
    del self._pending_jobs[:]

    self._stopped = False
    if self.standalone:
        self._main_loop()
    else:
        self._thread = Thread(target=self._main_loop, name='APScheduler')
        self._thread.setDaemon(self.daemonic)
        self._thread.start()

_main_loop 就是调度循环,主体就是一个 while 循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
while not self._stopped:
    logger.debug('Looking for jobs to run')
    now = datetime.now()
    next_wakeup_time = self._process_jobs(now)

    # Sleep until the next job is scheduled to be run, a new job is added or the scheduler is stopped
    if next_wakeup_time is not None:
        wait_seconds = time_difference(next_wakeup_time, now)
        self._wakeup.wait(wait_seconds)
        self._wakeup.clear()
    elif self.standalone:
        self.shutdown()
        break
    else:
        self._wakeup.wait()
        self._wakeup.clear()

进入循环后首先调用 _process_jobs 处理任务,以此处理不同 job_store 中的 每一个 job。在处理 job 过程中首先通过 get_run_times 获取 run_times(get_run_times 很有趣,它获取在 next_run_time 和 now 之间所有需要进行任务调度的时间点,之所以这样做的原因是 APScheduler 允许设定一个 misfire_grace_time 时间,也就是事件执行的延迟时间,因为有很多原因会导致计划调度不能准确在设定好的时间执行。)_process_jobs 处理很简单,将 job 的执行调度交给 scheduler 的线程池,针对每一个 job 的触发会开启一个新的线程(一个疑问:这个线程设置了t.setDaemon(True),但是文档上却说”Jobs are always executed in non-daemonic threads.“)来执行,而实际的任务执行发生在 Scheduler 的 _run_job 方法中。

_process_jobs 会返回下次执行调度的时间,调度循环会根据返回值进行相应的处理,wait 指定时间、或一直 wait 等待事件通知唤醒、或退出循环。调度循环的阻塞和唤醒是由 python 原生 Event 的 wait 和 set 来实现的,阻塞结束的方式有两种:一是 wait(wait_seconds) 超时;另一种是在 scheduler 处于 running 状态添加新的任务,添加新任务过程中会自动调用 set()唤醒 event。

总结

总体而言 APScheduler 以 threading 模块为基础实现,主要用到了 threading.Event 和 threading.Thread,用到的 ThreadPool 是对 threading.Thread 的简单封装。真是因为此所以 APScheduler 有 “No (hard) external dependencies” 和 “Thread-safe API” 这两项优点。但同时存在一个问题,由于 GIL 的存在,任务的执行一定会阻塞主线程,所以如果任务执行时间较长、有更多异步调度的需求,那么可能就会用到另外一个更强大的框架:Celery。 Celery 毕竟是一个分布式的任务队列,相比而言 APScheduler 的特点是轻巧,一言以蔽之即: APScheduler is a light but powerful in-process task scheduler.

Comments