使用 Tornado 进行异步编程

翻译自:Asynchronous programming with Tornado

对于初学者来说异步编程很令人迷惑,因此我觉得有必要介绍一些有用的基本概念来帮助初学者避免一些常见的陷阱。如果希望理解通用的异步编程模型,可以查看以下这些网络资源,Introduction to Asynchronous ProgrammingTwisted Introduction。在这篇文章中我将会着眼于如何使用 Tornado 进行异步编程。

来自Tornado主页的一段话:

FriendFeed’s web server is a relatively simple, non-blocking web server written in Python. The FriendFeed application is written using a web framework that looks a bit like web.py or Google’s webapp, but with additional tools and optimizations to take advantage of the non-blocking web server and tools. Tornado is an open source version of this web server and some of the tools we use most often at FriendFeed. The framework is distinct from most mainstream web server frameworks (and certainly most Python frameworks) because it is non-blocking and reasonably fast. Because it is non-blocking and uses epoll or kqueue, it can handle thousands of simultaneous standing connections, which means the framework is ideal for real-time web services. We built the web server specifically to handle FriendFeed’s real-time features every active user of FriendFeed maintains an open connection to the FriendFeed servers. (For more information on scaling servers to support thousands of clients, see The C10K problem.)

对于初学者首先需要认清的是自己是否真的需要异步操作。异步编程比同步编程复杂得多,因此有人说:异步编程是不适合人类大脑的。

如果你的应用需要监控一些资源并且当这些资源的状态发生变化时需要采取一定的操作,那么你需要使用异步编程。比如对于一个 web 服务器,如果没有请求到达,那么它处于空闲状态;当有请求通过 socket 到达 web 服务器它就需要对这条请求进行一定的处理。另外一种需要异步编程的情况比如一个应用需要定期的执行一些任务或者延迟一段时间再执行代码。可以使用多线程/进程来控制多个任务的并发执行,那样编程模型也会迅速变得复杂起来。

第二步是需要确认你想要的操作是否能够进行异步操作。不幸的是在 Tornado 中,并非所有的功能都可以异步执行。

Tornado是单线程运行的(尽管在实际应用中,它支持多线程模式),因此阻塞操作会阻塞整个服务器。这意味着一个阻塞操作将会阻止系统执行下一个等待执行的任务。任务的调度通过 IOLoop 完成,IOLoop运行在唯一的可用的线程中。

下边是一个错误使用 IOLoop 的例子(译者注:这段代码与原文不一样,是按照原文的描述修改的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time
from tornado.ioloop import IOLoop
from tornado import gen


@gen.engine
def f():
print 'sleeping'
time.sleep(1)
print 'awake!'


if __name__ == "__main__":
# Note that now code is executed "concurrently"
IOLoop.instance().add_callback(f)
IOLoop.instance().add_callback(f)
IOLoop.instance().start()

注意到 blocking_call(译者注:函数f,不知道为什么原文作者说这是blocking_call) 被正确地调用,但是由于它被 time.sleep 阻塞,会阻止接下来任务(第二次调用该函数)的执行。只有当第一次调用结束后,这个函数才会被IOLoop 调度第二次调用。因此输出是这样的一个序列(“sleeping”, “awake!”, “sleeping”, “awake!”)。

对比同样的代码,但是使用 time.sleep 的异步版本,例如 add_timeout:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Example of non-blocking sleep.
import time
from tornado.ioloop import IOLoop
from tornado import gen


@gen.engine
def f():
print 'sleeping'
yield gen.Task(IOLoop.instance().add_timeout, time.time() + 1)
print 'awake!'


if __name__ == "__main__":
# Note that now code is executed "concurrently"
IOLoop.instance().add_callback(f)
IOLoop.instance().add_callback(f)
IOLoop.instance().start()

在这种情况下,函数 f 第一次被调用,会打印“sleeping”,然后它会在1秒之后向 IOLoop 请求继续执行。IOLoop 重获控制权,它会调度函数 f 的第二次调用,第二次调用首先打印“sleeping”,之后将控制权还给 IOLoop。1秒钟后 IOLoop 会在第一个函数挂起的位置继续执行并且打印“awake”。最后,第二次“awake”也会被打印。所以全部的打印序列为“sleeping”, “sleeping”, “awake!”, “awake!”。这两次函数调用是并发执行的(但不是并行!)

现在我会听到你提问:“我如何创建一个函数并且异步地执行它?”在 Tornado 中,每一个有“callback”参数的函数都可以使用 “gen.engine.Task(译者注:应该是gen.Task)”进行异步操作。但是要注意:使用 Task 并不意味着就一定是异步执行!一个事实是函数会被调度获得控制权并执行,执行后任何传递给 callback 的值都会在 Task 中返回。看下边的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time
from tornado.ioloop import IOLoop
from tornado import gen


def my_function(callback):
print 'do some work'
# Note: this line will block!
time.sleep(1)
callback(123)


@gen.engine
def f():
print 'start'
# Call my_function and return here as soon as "callback" is called.
# "result" is whatever argument was passed to "callback" in "my_function".
result = yield gen.Task(my_function)
print 'result is', result
IOLoop.instance().stop()


if __name__ == "__main__":
f()
IOLoop.instance().start()

绝大多数初学者可能会这样写:Task(my_func),然后认为 my_func 会自动被异步执行。事实上这并不是 Tornado 工作的原理,这是 Go 如何工作的!下边是我最后的建议(译者注:我觉得这是这篇文章最重要的建议):

** In a function that is going to be used “asynchronously”, only asynchronous libraries should be used. **

就是说如果希望异步编程,那么一些阻塞的调用比如 time.sleep 或者 urllib2.urlopen 或者 db.query,它们需要替换成相应的异步版本。比如,IOLoop.add_timeout 是 time.sleep 的替换,AsyncHTTPClient.fetch 是 urllib2.urlopen 的替换等等。对于数据库查询,情况比较复杂,需要一些特定的异步查询驱动,比如对于 MongoDB 的 Motor