返回

SQLAlchemy连接数过多?sessionmaker与连接池问题详解

python

SQLAlchemy 连接过多问题探究:sessionmaker 和连接池的那些事儿

在使用 SQLAlchemy 连接数据库时,经常遇到连接数过多的问题,程序跑着跑着数据库就拒绝服务了。本文将针对一个实际的连接数问题,带你一步步分析原因,并提供可行的解决方法。

碰到的问题

我写了两个函数,用来创建引擎(engine),sessionmaker 和 session:

import functools
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy import create_engine
import pandas as pd

# 假设 get_engine() 返回一个已配置好的 engine
def get_engine():
  # 这里换成你实际的数据库连接字符串
  return create_engine("postgresql://user:password@host:port/database")

@functools.lru_cache(maxsize=None)
def _get_sessionmaker() -> sessionmaker:
    engine = get_engine()
    sessionfactory = sessionmaker(bind=engine)
    return sessionfactory

@functools.lru_cache(maxsize=None)
def get_session() -> Session:
    return _get_sessionmaker()()

然后在 myfun() 函数里用上下文管理器调用 get_session(),理论上来说,session 应该能正确关闭:

def myfun():
    with get_session() as session, session.begin():
        # 执行一些查询操作...
        pass #此处用pass占位,实际生产环境需填入数据库查询代码.

但是,实际情况是,session/连接 并没有被正确关闭/归还。(反复调用后, 服务端那边的连接数太多,出问题了。) 按道理说,不该有这种问题啊!

我不太明白为啥会这样。如果直接在 myfun_withoutwrapper 使用原始的方式, 反而没问题:

def myfun_withoutwrapper():
    engine = get_engine()
    Session = sessionmaker(bind=engine)
    with Session() as session, session.begin():
        # 执行一些查询操作
        pass #此处用pass占位,实际生产环境需填入数据库查询代码.

测试连接数的代码:

# 测试执行 myfun() 前后的连接数
engine = get_engine()

count_sql = """SELECT COUNT(*) FROM pg_stat_activity;"""

with engine.connect() as conn:
    pre_count_df = pd.read_sql(count_sql, conn)

# 运行 myfun()
_ = myfun()

with engine.connect() as conn:
    post_count_df = pd.read_sql(count_sql, conn)

print(f"Before: {pre_count_df.iloc[0,0]}, After: {post_count_df.iloc[0,0]}")

测试结果显示,执行 myfun() 后,连接数增加了 1。 多次执行该测试代码,连接数每次都+1. 这显然不对。(为了证明是 myfun() 的问题,我还专门做了个对照实验,不调用 myfun(),连接数就没变化。所以问题 100% 出在 myfun() 身上。)

环境:Python 3.12.8, SQLAlchemy 2.0.38, PostgreSQL 16。

问题原因分析

问题的根源在于 @functools.lru_cache 和 SQLAlchemy 连接池之间的相互作用。

  1. @functools.lru_cache 的作用: 这个装饰器会缓存函数的结果。_get_sessionmakerget_session 都被缓存了。这意味着每次调用 get_session(),实际上返回的是 同一个 sessionmaker 实例,以及由这个 sessionmaker 产生的 同一个 session 实例。

  2. SQLAlchemy 连接池: get_engine() 创建的引擎默认会启用连接池。连接池会维护一组数据库连接,以便重复使用,减少频繁创建和关闭连接的开销。

  3. session.close() 的行为: session.close() 实际上是将连接 归还 到连接池,而不是真正关闭连接。

  4. 综合起来: 由于 get_session() 返回的是同一个 session 实例,多次调用 myfun(),实际上是在重复使用同一个连接。每次 with 语句块结束时,session.close() 将连接归还到连接池。但是,由于lru_cache的存在,get_session()得到的永远是同一个对象. 其持有的connection并不会释放到engine的连接池中. 当连接池大小用光后,就会有文章开头的异常.

解决方案

方案一:去掉 @lru_cache (不推荐)

最简单粗暴的方法是去掉 get_session() 上的 @lru_cache 装饰器。这样每次调用 get_session() 都会创建一个新的 session 实例,with 语句块结束时,连接就能正确归还给连接池了.

# 去掉 @functools.lru_cache
def get_session() -> Session:
    return _get_sessionmaker()()

缺点: 如果 get_session 调用非常频繁,去掉缓存可能会影响性能. 不建议.

方案二: 使用scoped_session (推荐)

更优雅的解决方案是使用 scoped_sessionscoped_session 可以提供一个线程安全的 session 管理机制,确保每个线程都有自己独立的 session.

  1. 修改 _get_sessionmaker()
from sqlalchemy.orm import scoped_session

@functools.lru_cache(maxsize=None)
def _get_sessionmaker() -> scoped_session:
    engine = get_engine()
    session_factory = sessionmaker(bind=engine)
    return scoped_session(session_factory)
  1. 修改 get_session()
@functools.lru_cache(maxsize=None)
# 无需再加 @functools.lru_cache,scoped_session 已经自带了类似的功能
def get_session() -> Session:
   return _get_sessionmaker()
  1. 修改 myfun() (重要!): 使用完scoped_session返回的 session后, 务必调用remove方法.
def myfun():
    session = get_session() # 不再需要 with 语句
    try:
      with session.begin():
        # 执行查询...
        pass #此处用pass占位,实际生产环境需填入数据库查询代码.
    finally:
        session.remove()  # 非常重要!清理 session

原理: scoped_session 使用线程本地存储 (thread-local storage) 来保存 session。每次调用 _get_sessionmaker() 返回的是同一个 scoped_session 对象, 但通过该对象获得的session(_get_sessionmaker()())对于不同的线程来说是不同的 session 实例. 调用 remove() 方法会清除当前线程的 session,并将其持有的连接释放回连接池。

进阶使用技巧:

  • 你可以在 scoped_session 的构造函数中传入自定义的 scopefunc 参数, 用于标识不同的 "作用域"。默认的 scopefunc 是线程 ID,你也可以根据需要,用其他信息, 比如请求 ID,来作为 scope。
  • 务必在每个作用域结束时调用remove.

方案三: 调整 myfun (更灵活)

更灵活的做法是直接修改 myfun,去掉 with session.begin(), 手动管理事务。

def myfun():
    session = get_session()  # 不再需要 with 语句
    try:
        session.begin() # 手动开启事务
        # do some query...
        session.commit() # 手动提交
    except Exception:
        session.rollback() # 发生异常,手动回滚
        raise
    finally:
        session.close()

这个方案的好处是不依赖 with session.begin(), 能够处理更加精细的业务需求.

方案四: 连接池参数调优(锦上添花)

连接池的行为也可以通过 create_engine 的参数来控制。

engine = create_engine(
    "postgresql://user:password@host:port/database",
    pool_size=10,       # 连接池大小,默认是 5
    max_overflow=5,     # 超过 pool_size 后,最多允许再创建的连接数, -1表示无限制.
    pool_recycle=3600,  # 连接回收时间,单位:秒, -1表示不回收.
    pool_pre_ping=True  # 在每次从连接池获取连接之前,都进行一次 ping 操作,检查连接是否有效.
)

解释:

  • pool_size 连接池保持的连接数量。如果你的应用并发量不大, 可以适当减小这个值。
  • max_overflow 当连接池中的连接都被占用时,允许临时创建的额外连接数。
  • pool_recycle 设置连接的生存时间,防止连接长时间不用而失效.
  • pool_pre_ping : 保证每次从连接池拿出来的都是可用连接. 虽然有一定性能开销,但是强烈建议加上。

安全建议:

  • 密码不要硬编码! 使用环境变量或专门的配置管理工具来存储数据库密码。
  • 限制数据库用户的权限! 给 SQLAlchemy 使用的数据库用户分配合理的权限,不要给它超级管理员权限。

选择合适的解决方案,取决于你的具体场景。我个人比较推荐使用 scoped_session。通过合理配置,能够大大减少“连接过多”这类糟心事发生的概率。