RQ (Redis Queue) 使用的一些思考

最近使用了 rq 这个简单的队列处理库,其中有一些任务需要使用MySQL的连接或者redis的连接,对此有一些思考。

MySQL/redis的连接复用

rq 提供了两种 worker 模型:基于 fork 的 worker 模型和直接在主线程执行任务的 worker 模型。基于 fork 的 worker 在执行任务之前先 fork 一个子进程,在子进程中执行具体的任务,父进程等待子进程执行返回。在基于 fork 的 worker 模型下,如果在父进程有一个 MySQL/redis 连接,由于子进程会继承父进程的地址空间,具有相同的打开文件、socket、管道等,所以子进程中也有同样的 MySQL/redis 连接,那么在这种情况下这个连接可以直接使用么?通过以下代码简单测试一下,连接 MySQL 使用 torndb ,连接 redis 使用 redis-py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/usr/bin/env python
# encoding: utf-8

import redis
import torndb

def init_redis_conn():
return redis.StrictRedis(port=6380)

def init_mysql_conn():
return torndb.Connection(
host='127.0.0.1', user='root', password='password', database='test')

_redis_conn = init_redis_conn()
_mysql_conn = init_mysql_conn()

def get_redis_conn():
global _redis_conn
if _redis_conn is None:
_redis_conn = init_redis_conn()
return _redis_conn

def get_mysql_conn():
global _mysql_conn
if _mysql_conn is None:
_mysql_conn = init_mysql_conn()
return _mysql_conn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/env python
# encoding: utf-8

import os
import time
import errno

import mconn

def redis_op():
conn = mconn.get_redis_conn()
k = int(time.time())
conn.set(k, k)

def mysql_op():
conn = mconn.get_mysql_conn()
sql = 'INSERT INTO users (name) values (%s)'
conn.insert(sql, str(int(time.time())))

def fork_test(func):
child_pid = os.fork()
if child_pid == 0:
func()
os._exit(0)
else:
try:
os.waitpid(child_pid, 0)
except OSError as e:
if e.errno != errno.EINTR:
raise

if __name__ == '__main__':
while True:
time.sleep(0.5)
# fork_test(mysql_op)
fork_test(redis_op)

执行 main.py,观察3306端口和6380端口的连接数,发现 MySQL 的连接会被复用,但是在 redis 的连接并没有复用,产生了大量到 redis 的 TCP 连接。查看一下 redis-py 的代码,很容易发现为什么每个工作子进程都新建了一个到 redis 的连接。redis-py 通过 StrictRedis 对象向 redis 发起命令时,首先调用 ConnectionPool 对象的 get_connection 方法获取一个可用的连接。在 get_connection 方法中,会首先调用 _checkpid 函数。_checkpid 检查 connnection_pool 的 pid与当前进程的pid是否一致,如果不相同,会关闭 connection_pool 中的所有连接,然后重新建立到 redis 的 TCP 连接。相关代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class StrictRedis(object):
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
pool = self.connection_pool
connection = pool.get_connection(command_name, **options)
# connection send_command ...

class ConnectionPool(object):
def get_connection(self, command_name, *keys, **options):
"Get a connection from the pool"
self._checkpid()
# get connection ...
return connection

def _checkpid(self):
if self.pid != os.getpid():
with self._check_lock:
if self.pid == os.getpid():
# another thread already did the work while we waited
# on the lock.
return
self.disconnect()
self.reset()

通过上述代码就很清楚为什么在 fork 工作模式下 redis 连接没有复用了,ConnectPool 对象的 pid 是父进程的 pid,在子进程中与子进程的 pid不同,于是到 redis 的连接被重置。

fork 模型下共享 MySQL 连接合理么

不合理,会有很多问题。比如在子进程中到 MySQL 的 TCP 连接因为异常关闭(或者主动调用 db.close()),由于 copy-on-write 的特性,子进程中修改 *db = None (*db 是 torndb 中保存到 MySQL连接的对象)并不会影响父进程的值,父进程再次 fork 出的子进程使用该 torndb 连接对象时就会出现 OperationalError: (OperationalError) (2006, 'MySQL server has gone away') 的错误。使用 uWSGI 就会有这样的使用场景,于是 uWSGI 有了 lazy-apps 的选项。

为什么 rq 提供了基于 fork 的 worker 模型

回到最初的问题,现在需要在 rq 的任务中使用 MySQL 或者 redis 连接,如果使用 fork 模型的 worker,就需要每次重建一个 TCP 连接,这会带来很大的性能开销,通常是不可接受的。直接使用另外一种在主线程执行任务的 worker 似乎是更好的方案。那么为什么 rq 提供了基于 fork 的 worker 模型?rq 的作者给出了这样的理由:

This mainly has to do with stability. When you spawn a child process (with fork(), or multiprocessing, or whatever) you get an isolated execution context, which has a few nice benefits. Some of which are: 1. If a process crashes (by a segfault in a C module for example), only the child crashes; 2. Additionally, the worker will always be responsive and can easily kill the child after a time out; 3. Also, memory leaks caused in the child can never affect the main worker. The child is killed after every job, so memory should never grow, even when running rqworker for long periods of time.

rq 所看到的任务是一个可加载的 python 函数对象,执行任务时加载该对象并传入参数执行,对于可能出现的任务执行崩溃或内存泄漏等情况 rq 本身并不能处理(比如提到的 C 模块段错误,python 的 try-except 是无法捕捉的)。作为一个执行任务的通用库,fork-based worker 采用了一种保守的手段,通过进程级别的隔离保证了主进程的稳定运行。

关于 rq 的使用场景

最后,谈一谈 rq 在不同场景下的使用。

当有很多小任务、每个任务可能需要等待IO,这种情况下使用非阻塞模型最适合了,比如 gevent。那么 rq 是否支持 gevent?目前是没有官方支持的,当然有一些第三方的扩展,需要注意的是使用 gevent 时最好是重写worker执行的入口,即 rqworker,因为如果只是在 -w 对应的 worker 类中使用 gevent,在 monkey patch 之前已经引用了一些模块,可能会有未知的问题。一些实现参考:rq-gevent-worker, gevent_rqworker.py

另外一种场景,执行的任务是 CPU 密集型的,通常使用多进程比较合适。rq 对于支持使用多进程并发执行任务的 worker 也没有官方支持,一种解决方案是启动多个 rqorker 进程来从同一个任务队列消费任务;当然,也可以自己去扩展 rqworker。

总体而言,rq 代码本身实现得比较简洁,只支持 redis 作为队列存储任务,比较适合一些轻量级的异步任务处理。另一方面由于是一个通用库,一些具体场景下的需求就需要使用者自己来定制。