Tornado源码分析3

注:在分割线之前是基于 Tornado2.4 的分析。在Tornado3.0+以后IOLoop发生了一些改动,分割线之后有相应的介绍。

IOLoop是基于epoll实现的底层网络I/O的核心调度模块,用于处理socket相关的连接、响应、异步读写等网络事件。每个Tornado进程都会初始化一个全局唯一的IOLoop实例,在IOLoop中通过静态方法instance()进行封装,获取IOLoop实例直接调用此方法即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
@staticmethod
def instance():
    """
    class MyClass(object):
        def __init__(self, io_loop=None):
            self.io_loop = io_loop or IOLoop.instance()
    """
    if not hasattr(IOLoop, "_instance"):
        with IOLoop._instance_lock:
            if not hasattr(IOLoop, "_instance"):
                # New instance after double check
                IOLoop._instance = IOLoop()
    return IOLoop._instance

在上一篇文章中已经分析Tornado服务器启动时会创建监听socket,并将socket的file descriptor注册到IOLoop实例中,IOLoop添加对socket的IOLoop.READ事件监听并传入回调处理函数。当某个socket通过accept接受连接请求后调用注册的回调函数进行读写。接下来主要分析IOLoop对epoll的封装和I/O调度具体实现。

epoll是Linux内核中实现的一种可扩展的I/O事件通知机制,是对POISX系统中 select(2) poll(2) 的替代,具有更高的性能和扩展性,FreeBSD中类似的实现是kqueue。Tornado中基于Python C扩展实现的的epoll模块(或kqueue)对epoll(kqueue)的使用进行了封装,使得IOLoop对象可以通过相应的事件处理机制对I/O进行调度。

IOLoop模块对网络事件类型的封装与epoll一致,分为READ,WRITE, ERROR三类,具体如下所示。

1
2
3
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP

IOLoop的初始化

初始化过程中选择epoll的实现方式,Linux平台为epoll,BSD平台为kqueue,其他平台如果安装有C模块扩展的epoll则使用tornado对epoll的封装,否则退化为select。

1
2
3
4
5
6
7
8
9
10
11
12
def __init__(self, impl=None):
    self._impl = impl or _poll()
    #省略部分代码
    self._waker = Waker()
    self.add_handler(self._waker.fileno(),
                     lambda fd, events: self._waker.consume(),
                     self.READ)

def add_handler(self, fd, handler, events):
    """Registers the given handler to receive the given events for fd."""
    self._handlers[fd] = stack_context.wrap(handler)
    self._impl.register(fd, events | self.ERROR)

在IOLoop初始化的过程中创建了一个Waker对象,将Waker对象fd的读端注册到事件循环中并设定相应的回调函数(这样做的好处是当事件循环阻塞而没有响应描述符出现,需要在最大timeout时间之前返回,就可以向这个管道发送一个字符)。Waker的使用:一种是在其他线程向IOLoop添加callback时使用,唤醒IOLoop同时会将控制权转移给IOLoop线程并完成特定请求。唤醒的方法向管道中写入一个字符’x’。另外,在IOLoop的stop函数中会调用self._waker.wake(),通过向管道写入’x’停止事件循环。

add_handler函数使用了stack_context提供的wrap方法。wrap返回了一个可以直接调用的对象并且保存了传入之前的堆栈信息,在执行时可以恢复,这样就保证了函数的异步调用时具有正确的运行环境。

IOLoop的start方法

IOLoop的核心调度集中在start方法中,IOLoop实例对象调用start后开始epoll事件循环机制,该方法会一直运行直到IOLoop对象调用stop函数、当前所有事件循环完成。start方法中主要分三个部分:一个部分是对超时的相关处理;一部分是epoll事件通知阻塞、接收;一部分是对epoll返回I/O事件的处理。

  • 为防止IO event starvation,将回调函数延迟到下一轮事件循环中执行。

  • 超时的处理 heapq维护一个最小堆,记录每个回调函数的超时时间(deadline)。每次取出deadline最早的回调函数,如果callback标志位为True并且已经超时,通过_run_callback调用函数;如果没有超时需要重新设定poll_timeout的值。

  • 通过self._impl.poll(poll_timeout)进行事件阻塞,当有事件通知或超时时poll返回特定的event_pairs。

  • epoll返回通知事件后将新事件加入待处理队列,将就绪事件逐个弹出,通过stack_context.wrap(handler)保存的可执行对象调用事件处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
while True:
    poll_timeout = 3600.0

    with self._callback_lock:
        callbacks = self._callbacks
        self._callbacks = []
    for callback in callbacks:
        self._run_callback(callback)

    # 超时处理
    if self._timeouts:
        now = time.time()
        while self._timeouts:
            if self._timeouts[0].callback is None:
                # the timeout was cancelled
                heapq.heappop(self._timeouts)
            elif self._timeouts[0].deadline <= now:
                timeout = heapq.heappop(self._timeouts)
                self._run_callback(timeout.callback)
            else:
                seconds = self._timeouts[0].deadline - now
                poll_timeout = min(seconds, poll_timeout)
                break

    if self._callbacks:
        # If any callbacks or timeouts called add_callback,
        # we don't want to wait in poll() before we run them.
        poll_timeout = 0.0

    if not self._running:
        break

    if self._blocking_signal_threshold is not None:
        # clear alarm so it doesn't fire while poll is waiting for events.
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    # epoll阻塞,当有事件通知或超时返回event_pairs
    try:
        event_pairs = self._impl.poll(poll_timeout)
    except Exception, e:
        # 异常处理,省略

    # 对epoll返回event_pairs事件的处理
    self._events.update(event_pairs)
    while self._events:
        fd, events = self._events.popitem()
        try:
            self._handlers[fd](fd, events)
        except Exception e:
            # 异常处理,省略

至此IOLoop模块的分析基本完成。下一篇文章将会继续分析IOStream模块。

————————————————————————我是分割线—————————————————————————–

补充于2013年4月30日,介绍Tornado3.0以后IOLoop模块的一些改动。

1. IOLoop成为util.Configurable的子类,IOLoop 中绝大多数成员方法都作为抽象接口,具体实现由派生类 PollIOLoop 完成。IOLoop实现了 Configurable 中的 configurable_base 和 configurable_default 这两个抽象接口,用于初始化过程中获取类类型和类的实现方法(即 IOLoop 中 poller 的实现方式)。在Tornado3.0+ 中针对不同平台,单独出 poller 相应的实现,EPollIOLoop、KQueueIOLoop、SelectIOLoop 均继承于 PollIOLoop。下边的代码是 configurable_default 方法根据平台选择相应的 epoll 实现。初始化 IOLoop 的过程中会自动根据平台选择合适的 poller 的实现方法。

1
2
3
4
5
6
7
8
9
10
11
@classmethod
def configurable_default(cls):
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
        return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
    return SelectIOLoop

2. 其他有很多细节上的改动,详细可参见官方文档What’s new in Tornado 3.0

参考

Tornado源码分析之http服务器篇tornado源码分析系列

Comments