SQLAlchemy子查询Left Join: 挑战与解决方案
2025-01-28 09:57:19
SQLAlchemy 中使用子查询实现 Left Join 的挑战与解决
在使用 SQLAlchemy 进行数据库操作时,经常会遇到需要使用子查询来构建 LEFT JOIN
语句的情况,这在处理具有复杂业务逻辑的数据关系时很常见。本文讨论在 ON
子句中使用子查询进行 LEFT JOIN
时可能遇到的问题,并提供多种解决方案。
问题:ON
子句中的子查询
在关系型数据库中,LEFT JOIN
语句可以根据指定的条件连接两个表,返回左表中的所有行以及右表中匹配的行。当需要在 ON
子句中使用子查询时,问题可能随之而来。一个常见的场景是需要从右表中选择满足某些条件的特定行,通常这个条件会涉及到与左表关联的字段。
典型场景:
比如,我们有两个表 users
和 employee_pay_rates
。 employee_pay_rates
表中存储了员工的历史工资信息,并且可能有多个历史记录,我们需要选取 start_date
小于指定日期的最新一条记录。 我们需要把 users
表 和筛选后的 employee_pay_rates
连接,展示每个用户的最新工资信息(如果存在的话)。使用 LEFT JOIN
可以保证所有用户都展示出来,即便他们没有工资记录。
SQL 的方式会这样写:
SELECT u.user_id, u.full_name, epr.start_date
FROM users as u
LEFT JOIN employee_pay_rates as epr on epr.pay_rate_id = (select epr1.pay_rate_id
from employee_pay_rates as epr1
WHERE epr1.start_date <= '2024-08-24'
AND epr1.company_id = u.company_id AND epr1.user_id = u.user_id
ORDER BY epr1.start_date DESC LIMIT 1)
where u.company_id = 1
我们想要将以上的 sql 用 SQLAlchemy 实现, 初步实现时可能遇到InvalidRequestError
报错。 这个错误发生的原因是 SQLAlchemy 尝试推断表之间的关联,但子查询不直接关联,并且SQLAlchemy 会检测出 auto-correlation
问题,导致不知道应该怎么正确绑定关系。 也就是说,SQLAlchemy 不能够自动解析子查询与外部查询之间的关联。
解决方案:显式关联子查询
要解决这个问题,需要使用 SQLAlchemy 提供的 correlate
方法来显式声明子查询和主查询之间的关系, 或者将子查询作为别名来使用。以下分别说明这两种解决思路:
方案一:使用correlate
进行关联
correlate
方法明确指定了哪些表应该被视为子查询的外部查询的范围。这可以解决 auto-correlation
错误。
步骤:
- 创建一个选择 EmployeePayRate 表的子查询,条件包括用户ID,公司ID 和日期。 并在子查询内部用 order_by 进行降序排列,
limit(1)
取最新的一个。 - 使用
correlate
将外层查询的User
表 与子查询关联。 - 在
LEFT JOIN
的ON
条件里,指定子查询的pay_rate_id
与EmployeePayRate.pay_rate_id
对应。
代码示例:
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float, Boolean, ForeignKey, text, and_, select, Date, cast
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, mapped_column, Mapped
from datetime import datetime
engine = create_engine("sqlite:///:memory:") # 用sqlite作为测试环境
Base = declarative_base()
class Company(Base):
__tablename__ = 'companies'
company_id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(255))
class User(Base):
__tablename__ = "users"
user_id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
company_id: Mapped[int] = mapped_column(ForeignKey(Company.company_id))
full_name: Mapped[str] = mapped_column(String(75), default ='')
email: Mapped[str] = mapped_column(String(255), default ='')
phone: Mapped[str] = mapped_column(String(25), default ='')
lang: Mapped[str] = mapped_column(String(25), default ='')
time_zone: Mapped[str] = mapped_column(String(50), default ='')
class EmployeePayRate(Base):
__tablename__ = "employee_pay_rates"
pay_rate_id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
user_id: Mapped[int] = mapped_column(ForeignKey(User.user_id))
company_id: Mapped[int] = mapped_column(ForeignKey(Company.company_id))
pay_rate: Mapped[float]
charge_rate: Mapped[float]
active: Mapped[bool] = mapped_column(default = True)
deleted: Mapped[bool] = mapped_column(default = False)
created_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default = text('CURRENT_TIMESTAMP'))
start_date: Mapped[datetime] = mapped_column(DateTime(timezone=True))
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# Add dummy data
company1 = Company(company_id = 1, name="Test Company")
user1 = User(user_id = 37, company_id=1, full_name = "[email protected]", email= "[email protected]",phone= '(898) 404-2342',lang="ENG",time_zone ="EST")
session.add(company1)
session.add(user1)
session.commit()
payrate1 = EmployeePayRate(user_id = 37, company_id = 1, pay_rate = 75.04,charge_rate = 250.00,start_date =datetime(2024,7,9,11,7,9))
payrate2 = EmployeePayRate(user_id = 37, company_id = 1, pay_rate = 100.04,charge_rate = 250.00,start_date =datetime(2024,8,20,20,59,17))
payrate3 = EmployeePayRate(user_id = 37, company_id = 1, pay_rate = 100.04,charge_rate = 350.00,start_date = datetime(2024,10,8,13,23,33))
session.add_all([payrate1,payrate2,payrate3])
session.commit()
# 传入 datetime 对象 作为 日期参数, 灵活适应不同日期条件。
date_parameter = datetime(2024, 8, 24).date()
payrate_sel_stmt = select(EmployeePayRate).where(
and_(
EmployeePayRate.company_id == User.company_id,
EmployeePayRate.user_id == User.user_id,
cast(EmployeePayRate.start_date, Date) <= date_parameter
)
).order_by(EmployeePayRate.start_date.desc()).limit(1).correlate(User)
test_user_sel_stmt = select(User, EmployeePayRate.start_date).outerjoin(EmployeePayRate, EmployeePayRate.pay_rate_id == payrate_sel_stmt.c.pay_rate_id).where(
User.company_id == 1
)
users = session.execute(test_user_sel_stmt)
for user, start_date in users:
print(user.full_name, start_date)
结果:
[email protected] 2024-08-20 20:59:17
方案二: 使用alias
进行关联
使用alias
给子查询创建别名,使之成为可以引用的对象。
步骤:
- 创建子查询,并且调用
subquery()
方法给它设置一个别名,例如payrate_alias
。 - 在主查询中使用
alias
创建的对象与User
表关联。 - 使用新别名下的 column 来定义
LEFT JOIN
的连接条件,保证了连接的唯一性。
代码示例:
payrate_sel_stmt = select(EmployeePayRate).where(
and_(
EmployeePayRate.company_id == User.company_id,
EmployeePayRate.user_id == User.user_id,
cast(EmployeePayRate.start_date, Date) <= date_parameter
)
).order_by(EmployeePayRate.start_date.desc()).limit(1).subquery()
payrate_alias = payrate_sel_stmt.alias("latest_payrate")
test_user_sel_stmt = select(User, payrate_alias.c.start_date).outerjoin(
payrate_alias,
payrate_alias.c.pay_rate_id == EmployeePayRate.pay_rate_id
).where(User.company_id == 1)
users = session.execute(test_user_sel_stmt)
for user, start_date in users:
print(user.full_name, start_date)
结果:
[email protected] 2024-08-20 20:59:17
总结
处理在 ON
子句中使用子查询的情况可能会带来挑战。为了解决这个常见的 auto-correlation
问题,我们可以选择使用 correlate()
或者 alias()
来显式地定义子查询与主查询之间的关系。两种方式各有优点,选择合适的方案需要基于实际情况和个人喜好。 理解这两种技术可以提升数据处理的能力。 使用上述的 datetime
对象参数, 也增强了 SQLAlchemy 使用时的灵活度。