SQLAlchemy连接数过多?sessionmaker与连接池问题详解
2025-03-23 03:42:21
SQLAlchemy 连接过多问题探究:sessionmaker 和连接池的那些事儿
在使用 SQLAlchemy 连接数据库时,经常遇到连接数过多的问题,程序跑着跑着数据库就拒绝服务了。本文将针对一个实际的连接数问题,带你一步步分析原因,并提供可行的解决方法。
碰到的问题
我写了两个函数,用来创建引擎(engine),sessionmaker 和 session:
import functools
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy import create_engine
import pandas as pd
# 假设 get_engine() 返回一个已配置好的 engine
def get_engine():
# 这里换成你实际的数据库连接字符串
return create_engine("postgresql://user:password@host:port/database")
@functools.lru_cache(maxsize=None)
def _get_sessionmaker() -> sessionmaker:
engine = get_engine()
sessionfactory = sessionmaker(bind=engine)
return sessionfactory
@functools.lru_cache(maxsize=None)
def get_session() -> Session:
return _get_sessionmaker()()
然后在 myfun()
函数里用上下文管理器调用 get_session()
,理论上来说,session 应该能正确关闭:
def myfun():
with get_session() as session, session.begin():
# 执行一些查询操作...
pass #此处用pass占位,实际生产环境需填入数据库查询代码.
但是,实际情况是,session/连接 并没有被正确关闭/归还。(反复调用后, 服务端那边的连接数太多,出问题了。) 按道理说,不该有这种问题啊!
我不太明白为啥会这样。如果直接在 myfun_withoutwrapper
使用原始的方式, 反而没问题:
def myfun_withoutwrapper():
engine = get_engine()
Session = sessionmaker(bind=engine)
with Session() as session, session.begin():
# 执行一些查询操作
pass #此处用pass占位,实际生产环境需填入数据库查询代码.
测试连接数的代码:
# 测试执行 myfun() 前后的连接数
engine = get_engine()
count_sql = """SELECT COUNT(*) FROM pg_stat_activity;"""
with engine.connect() as conn:
pre_count_df = pd.read_sql(count_sql, conn)
# 运行 myfun()
_ = myfun()
with engine.connect() as conn:
post_count_df = pd.read_sql(count_sql, conn)
print(f"Before: {pre_count_df.iloc[0,0]}, After: {post_count_df.iloc[0,0]}")
测试结果显示,执行 myfun()
后,连接数增加了 1。 多次执行该测试代码,连接数每次都+1. 这显然不对。(为了证明是 myfun()
的问题,我还专门做了个对照实验,不调用 myfun()
,连接数就没变化。所以问题 100% 出在 myfun()
身上。)
环境:Python 3.12.8, SQLAlchemy 2.0.38, PostgreSQL 16。
问题原因分析
问题的根源在于 @functools.lru_cache
和 SQLAlchemy 连接池之间的相互作用。
-
@functools.lru_cache
的作用: 这个装饰器会缓存函数的结果。_get_sessionmaker
和get_session
都被缓存了。这意味着每次调用get_session()
,实际上返回的是 同一个 sessionmaker 实例,以及由这个 sessionmaker 产生的 同一个 session 实例。 -
SQLAlchemy 连接池:
get_engine()
创建的引擎默认会启用连接池。连接池会维护一组数据库连接,以便重复使用,减少频繁创建和关闭连接的开销。 -
session.close()
的行为:session.close()
实际上是将连接 归还 到连接池,而不是真正关闭连接。 -
综合起来: 由于
get_session()
返回的是同一个 session 实例,多次调用myfun()
,实际上是在重复使用同一个连接。每次with
语句块结束时,session.close()
将连接归还到连接池。但是,由于lru_cache
的存在,get_session()
得到的永远是同一个对象. 其持有的connection并不会释放到engine的连接池中. 当连接池大小用光后,就会有文章开头的异常.
解决方案
方案一:去掉 @lru_cache
(不推荐)
最简单粗暴的方法是去掉 get_session()
上的 @lru_cache
装饰器。这样每次调用 get_session()
都会创建一个新的 session 实例,with
语句块结束时,连接就能正确归还给连接池了.
# 去掉 @functools.lru_cache
def get_session() -> Session:
return _get_sessionmaker()()
缺点: 如果 get_session
调用非常频繁,去掉缓存可能会影响性能. 不建议.
方案二: 使用scoped_session
(推荐)
更优雅的解决方案是使用 scoped_session
。scoped_session
可以提供一个线程安全的 session 管理机制,确保每个线程都有自己独立的 session.
- 修改
_get_sessionmaker()
:
from sqlalchemy.orm import scoped_session
@functools.lru_cache(maxsize=None)
def _get_sessionmaker() -> scoped_session:
engine = get_engine()
session_factory = sessionmaker(bind=engine)
return scoped_session(session_factory)
- 修改
get_session()
:
@functools.lru_cache(maxsize=None)
# 无需再加 @functools.lru_cache,scoped_session 已经自带了类似的功能
def get_session() -> Session:
return _get_sessionmaker()
- 修改
myfun()
(重要!): 使用完scoped_session
返回的 session后, 务必调用remove
方法.
def myfun():
session = get_session() # 不再需要 with 语句
try:
with session.begin():
# 执行查询...
pass #此处用pass占位,实际生产环境需填入数据库查询代码.
finally:
session.remove() # 非常重要!清理 session
原理: scoped_session
使用线程本地存储 (thread-local storage) 来保存 session。每次调用 _get_sessionmaker()
返回的是同一个 scoped_session
对象, 但通过该对象获得的session(_get_sessionmaker()()
)对于不同的线程来说是不同的 session 实例. 调用 remove()
方法会清除当前线程的 session,并将其持有的连接释放回连接池。
进阶使用技巧:
- 你可以在
scoped_session
的构造函数中传入自定义的scopefunc
参数, 用于标识不同的 "作用域"。默认的 scopefunc 是线程 ID,你也可以根据需要,用其他信息, 比如请求 ID,来作为 scope。 - 务必在每个作用域结束时调用
remove
.
方案三: 调整 myfun
(更灵活)
更灵活的做法是直接修改 myfun
,去掉 with session.begin()
, 手动管理事务。
def myfun():
session = get_session() # 不再需要 with 语句
try:
session.begin() # 手动开启事务
# do some query...
session.commit() # 手动提交
except Exception:
session.rollback() # 发生异常,手动回滚
raise
finally:
session.close()
这个方案的好处是不依赖 with session.begin()
, 能够处理更加精细的业务需求.
方案四: 连接池参数调优(锦上添花)
连接池的行为也可以通过 create_engine
的参数来控制。
engine = create_engine(
"postgresql://user:password@host:port/database",
pool_size=10, # 连接池大小,默认是 5
max_overflow=5, # 超过 pool_size 后,最多允许再创建的连接数, -1表示无限制.
pool_recycle=3600, # 连接回收时间,单位:秒, -1表示不回收.
pool_pre_ping=True # 在每次从连接池获取连接之前,都进行一次 ping 操作,检查连接是否有效.
)
解释:
pool_size
: 连接池保持的连接数量。如果你的应用并发量不大, 可以适当减小这个值。max_overflow
: 当连接池中的连接都被占用时,允许临时创建的额外连接数。pool_recycle
: 设置连接的生存时间,防止连接长时间不用而失效.pool_pre_ping
: 保证每次从连接池拿出来的都是可用连接. 虽然有一定性能开销,但是强烈建议加上。
安全建议:
- 密码不要硬编码! 使用环境变量或专门的配置管理工具来存储数据库密码。
- 限制数据库用户的权限! 给 SQLAlchemy 使用的数据库用户分配合理的权限,不要给它超级管理员权限。
选择合适的解决方案,取决于你的具体场景。我个人比较推荐使用 scoped_session
。通过合理配置,能够大大减少“连接过多”这类糟心事发生的概率。