返回

SQLAlchemy子查询Left Join: 挑战与解决方案

mysql

SQLAlchemy 中使用子查询实现 Left Join 的挑战与解决

在使用 SQLAlchemy 进行数据库操作时,经常会遇到需要使用子查询来构建 LEFT JOIN 语句的情况,这在处理具有复杂业务逻辑的数据关系时很常见。本文讨论在 ON 子句中使用子查询进行 LEFT JOIN 时可能遇到的问题,并提供多种解决方案。

问题:ON 子句中的子查询

在关系型数据库中,LEFT JOIN 语句可以根据指定的条件连接两个表,返回左表中的所有行以及右表中匹配的行。当需要在 ON 子句中使用子查询时,问题可能随之而来。一个常见的场景是需要从右表中选择满足某些条件的特定行,通常这个条件会涉及到与左表关联的字段。

典型场景:

比如,我们有两个表 usersemployee_pay_ratesemployee_pay_rates 表中存储了员工的历史工资信息,并且可能有多个历史记录,我们需要选取 start_date 小于指定日期的最新一条记录。 我们需要把 users 表 和筛选后的 employee_pay_rates 连接,展示每个用户的最新工资信息(如果存在的话)。使用 LEFT JOIN 可以保证所有用户都展示出来,即便他们没有工资记录。

SQL 的方式会这样写:

SELECT u.user_id, u.full_name, epr.start_date
FROM users as u
LEFT JOIN employee_pay_rates as epr on epr.pay_rate_id =  (select epr1.pay_rate_id
            from employee_pay_rates as epr1
            WHERE epr1.start_date <= '2024-08-24'
            AND epr1.company_id = u.company_id AND epr1.user_id = u.user_id
            ORDER BY epr1.start_date DESC LIMIT 1)

 where u.company_id = 1

我们想要将以上的 sql 用 SQLAlchemy 实现, 初步实现时可能遇到InvalidRequestError 报错。 这个错误发生的原因是 SQLAlchemy 尝试推断表之间的关联,但子查询不直接关联,并且SQLAlchemy 会检测出 auto-correlation问题,导致不知道应该怎么正确绑定关系。 也就是说,SQLAlchemy 不能够自动解析子查询与外部查询之间的关联。

解决方案:显式关联子查询

要解决这个问题,需要使用 SQLAlchemy 提供的 correlate 方法来显式声明子查询和主查询之间的关系, 或者将子查询作为别名来使用。以下分别说明这两种解决思路:

方案一:使用correlate进行关联

correlate方法明确指定了哪些表应该被视为子查询的外部查询的范围。这可以解决 auto-correlation 错误。

步骤:

  1. 创建一个选择 EmployeePayRate 表的子查询,条件包括用户ID,公司ID 和日期。 并在子查询内部用 order_by 进行降序排列, limit(1) 取最新的一个。
  2. 使用correlate 将外层查询的 User 表 与子查询关联。
  3. LEFT JOINON 条件里,指定子查询的 pay_rate_idEmployeePayRate.pay_rate_id 对应。

代码示例:

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float, Boolean, ForeignKey, text, and_, select, Date, cast
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, mapped_column, Mapped
from datetime import datetime

engine = create_engine("sqlite:///:memory:")  # 用sqlite作为测试环境
Base = declarative_base()

class Company(Base):
    __tablename__ = 'companies'
    company_id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(255))

class User(Base):
    __tablename__ = "users"

    user_id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    company_id: Mapped[int] = mapped_column(ForeignKey(Company.company_id))
    full_name: Mapped[str] = mapped_column(String(75), default ='')
    email: Mapped[str] = mapped_column(String(255), default ='')
    phone: Mapped[str] = mapped_column(String(25), default ='')
    lang: Mapped[str] = mapped_column(String(25), default ='')
    time_zone: Mapped[str] = mapped_column(String(50), default ='')


class EmployeePayRate(Base):

    __tablename__ = "employee_pay_rates"

    pay_rate_id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    user_id: Mapped[int] = mapped_column(ForeignKey(User.user_id))
    company_id: Mapped[int] = mapped_column(ForeignKey(Company.company_id))
    pay_rate: Mapped[float]
    charge_rate: Mapped[float]
    active: Mapped[bool] = mapped_column(default = True)
    deleted: Mapped[bool] = mapped_column(default = False)
    created_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default = text('CURRENT_TIMESTAMP'))
    start_date: Mapped[datetime] = mapped_column(DateTime(timezone=True))


Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()
# Add dummy data
company1 = Company(company_id = 1, name="Test Company")

user1 = User(user_id = 37, company_id=1, full_name = "[email protected]", email= "[email protected]",phone= '(898) 404-2342',lang="ENG",time_zone ="EST")
session.add(company1)
session.add(user1)
session.commit()

payrate1 = EmployeePayRate(user_id = 37, company_id = 1, pay_rate = 75.04,charge_rate = 250.00,start_date =datetime(2024,7,9,11,7,9))
payrate2 = EmployeePayRate(user_id = 37, company_id = 1, pay_rate = 100.04,charge_rate = 250.00,start_date =datetime(2024,8,20,20,59,17))
payrate3 = EmployeePayRate(user_id = 37, company_id = 1, pay_rate = 100.04,charge_rate = 350.00,start_date = datetime(2024,10,8,13,23,33))

session.add_all([payrate1,payrate2,payrate3])
session.commit()


# 传入 datetime 对象 作为 日期参数, 灵活适应不同日期条件。
date_parameter = datetime(2024, 8, 24).date()

payrate_sel_stmt = select(EmployeePayRate).where(
                and_(
                    EmployeePayRate.company_id == User.company_id,
                    EmployeePayRate.user_id == User.user_id,
                    cast(EmployeePayRate.start_date, Date) <= date_parameter
                )
            ).order_by(EmployeePayRate.start_date.desc()).limit(1).correlate(User)

test_user_sel_stmt = select(User, EmployeePayRate.start_date).outerjoin(EmployeePayRate, EmployeePayRate.pay_rate_id == payrate_sel_stmt.c.pay_rate_id).where(
    User.company_id == 1
)

users = session.execute(test_user_sel_stmt)

for user, start_date in users:
   print(user.full_name, start_date)

结果:

[email protected] 2024-08-20 20:59:17

方案二: 使用alias进行关联

使用alias给子查询创建别名,使之成为可以引用的对象。

步骤:

  1. 创建子查询,并且调用 subquery() 方法给它设置一个别名,例如 payrate_alias
  2. 在主查询中使用 alias 创建的对象与User 表关联。
  3. 使用新别名下的 column 来定义LEFT JOIN的连接条件,保证了连接的唯一性。

代码示例:

payrate_sel_stmt = select(EmployeePayRate).where(
                and_(
                    EmployeePayRate.company_id == User.company_id,
                    EmployeePayRate.user_id == User.user_id,
                    cast(EmployeePayRate.start_date, Date) <= date_parameter
                )
            ).order_by(EmployeePayRate.start_date.desc()).limit(1).subquery()

payrate_alias = payrate_sel_stmt.alias("latest_payrate")


test_user_sel_stmt = select(User, payrate_alias.c.start_date).outerjoin(
    payrate_alias,
     payrate_alias.c.pay_rate_id == EmployeePayRate.pay_rate_id
).where(User.company_id == 1)

users = session.execute(test_user_sel_stmt)
for user, start_date in users:
    print(user.full_name, start_date)

结果:

[email protected] 2024-08-20 20:59:17

总结

处理在 ON 子句中使用子查询的情况可能会带来挑战。为了解决这个常见的 auto-correlation 问题,我们可以选择使用 correlate() 或者 alias() 来显式地定义子查询与主查询之间的关系。两种方式各有优点,选择合适的方案需要基于实际情况和个人喜好。 理解这两种技术可以提升数据处理的能力。 使用上述的 datetime 对象参数, 也增强了 SQLAlchemy 使用时的灵活度。