返回 2026-04-16
⚙️ 工程

SQLAlchemy 2 实战第五章:高级多对多关系设计技巧SQLAlchemy 2 In Practice - Chapter 5 - Advanced Many-To-Many Relationships

miguelgrinberg.com·2026-04-16

Miguel Grinberg 在其《SQLAlchemy 2 in Practice》课程第五讲中深入讲解复杂多对多关系的实现策略。他介绍了如何通过中间表、关联对象和自定义查询逻辑来处理非对称、条件化或多态关联场景。书中提供具体代码示例,展示如何优化性能、维护数据完整性,并避免常见的 ORM 陷阱。

Miguel Grinberg

这是《SQLAlchemy 2 in Practice》的第五章。如果你希望支持我的工作,我建议你直接在我的商店或亚马逊上购买这本书。谢谢!

你已经学习了关系型数据库中使用的设计模块。然而,有时为了实现某个特定目标,这些基础组件需要进行一些“微调”。本章将深入探讨多对多关系的一种非常实用的变体。

供你参考,以下是本书内容的摘要:

  • 前言
  • 第一章:数据库设置
  • 第二章:数据表
  • 第三章:一对多关系
  • 第四章:多对多关系
  • 第五章:高级多对多关系(本文)
  • 尚未发布: 第六章:页面分析解决方案 第七章:异步 SQLAlchemy 第八章:SQLAlchemy 与 Web 练习题解答
  • 带有附加数据的多对多关系

    在本节中,你将为数据库添加 RetroFun 订单子系统,其中包括客户和订单。下一张图展示了新表和它们之间的关系如何融入现有数据库结构。

    新的客户表和订单表之间是一对多的关系,客户处于“一”的一方;而产品和订单之间是多对多关系,通过 orders_items 表作为连接表。但在此关系中,该连接表除了两个外键之外还包含额外的列。

    这个连接表实际上代表了订单中的每一行明细项,它引用了订单和产品。但这还不够,该关系还需要定义销售时的数量和单价。

    在连接表中加入额外数据会使情况复杂化。之前产品与国家之间的多对多关系没有额外列,因此 SQLAlchemy 可以完全控制根据需要向此表插入或删除条目。但当存在额外列时,SQLAlchemy 在需要插入新记录时又该如何填写这些额外字段呢?

    由于应用必须在两个实体建立关联时为关系中的额外字段提供值,因此包含额外列的多对多关系需要使用比自动方式更手动的工作流程来处理 SQLAlchemy。

    为实现 RetroFun 的订单功能,第一步是在应用中扩展 Customer 和 Order 模型,并建立它们之间的一对多关系。这些内容被添加到 models.py 文件底部。

    models.py:订单与客户

    from datetime import datetime
    from uuid import UUID, uuid4
    from sqlalchemy.orm WriteOnlyMapped
    
    # ...
    
    class Order(Model):
        __tablename__ = 'orders'
    
        id: Mapped[UUID] = mapped_column(default=uuid4, primary_key=True)
        timestamp: Mapped[datetime] = mapped_column(default=datetime.utcnow,
                                                    index=True)
        customer_id: Mapped[UUID] = mapped_column(ForeignKey('customers.id'),
                                                  index=True)
    
        customer: Mapped['Customer'] = relationship(back_populates='orders')
    
        def __repr__(self):
            return f'Order({self.id.hex})'
    
    
    class Customer(Model):
        __tablename__ = 'customers'
    
        id: Mapped[UUID] = mapped_column(default=uuid4, primary_key=True)
        name: Mapped[str] = mapped_column(String(64), index=True, unique=True)
        address: Mapped[Optional[str]] = mapped_column(String(128))
        phone: Mapped[Optional[str]] = mapped_column(String(32))
    
        orders: WriteOnlyMapped['Order'] = relationship(back_populates='customer')
    
        def __repr__(self):
            return f'Customer({self.id.hex}, "{self.name}")'

    这些模型中有几个新特性,接下来的部分将逐一介绍。

    UUID 主键

    这两个新模型最重要的区别在于它们的 id 列使用了 UUID 类型而非此前所有模型所用的 int 类型主键。

    备注: UUID 类型的支持是在 SQLAlchemy 2.0 中引入的。官方文档中包含一个可用于旧版本的自定义 UUID 类型实现方案。

    先前使用的自增整数主键的问题在于,当它们出现在 URL 或邮件中时会间接暴露所引用数据库表的大小信息。大多数企业可能更倾向于保护其客户或订单数量等隐私信息,因此为这类表使用整型主键并不是一个好主意。

    避免泄露此类信息的一种方法是不再使用数值型主键。新模型将 id 定义为 UUID,即 16 字节的二进制序列。UUID 有多种类型,其中一种称为 UUID4,非常适合作为主键。如果您不熟悉 Python 中的 UUID4 支持,以下 Python 会话展示了如何生成并打印它们:

    >>> from uuid import uuid4
    >>> id = uuid4()
    >>> id.bytes
    b'kF\xf8\xe9o\x94MM\xad\xfe\x15\xe1\xeb\xb1\xd0\xac'
    >>> id.hex
    '6b46f8e96f944d4dadfe15e1ebb1d0ac'

    遗憾的是,SQLAlchemy 只能自动管理整型主键。为了避免每次添加新的 Order 或 Customer 实例时都要记住创建并分配一个 UUID4,这两个模型中的 id 列都设置了默认参数,这样当应用程序未指定值时,该参数会自动为其赋值。

    默认参数可以设置为常量值,也可以设置为一个可调用的对象,例如函数。如果设置为可调用对象,SQLAlchemy 每次需要为某列提供默认值时都会调用它。上述 id 列将 uuid4 函数作为默认值传入,因此每个新项目都会获得一个新生成的 UUID4。在将函数作为列的默认值传递时,请务必注意不要在函数名后加括号 ()。SQLAlchemy 需要的是函数本身的引用,以便在需要生成值时调用它。

    两个类中的 __repr__() 方法都使用了 UUID 对象的 hex 属性,将二进制 UUID4 解码为可读的十六进制字符串,便于调试,如上方 Python 示例所示。

    customers 和 orders 之间的一对多关系通过在“多”这一侧(本例中为 Order 模型)添加外键来建立。这就是 customer_id 列,其类型也设为 UUID,以匹配 Customer.id 的类型。

    日期和时间列

    Order 模型包含一个 timestamp 列,声明为 datetime 类型。与 UUID 类型类似,SQLAlchemy 会自动将 Python 的 datetime 对象映射到数据库中相应的类型。

    timestamp 列同样具有默认参数。将 datetime.utcnow 作为默认值传入后,SQLAlchemy 会在每次向数据库添加新订单时调用 datetime.utcnow()。这意味着在添加订单时可以省略 timestamp 列的赋值,提交操作时会自动设置当前日期和时间。再次强调,默认参数中不包含 (),以便 SQLAlchemy 接收到函数引用而非立即调用它。

    在处理数据库中的日期和时间值时,保持时区一致性非常重要。最合理的做法是将所有时间戳存储在 UTC 时区,因此默认值设置为 utcnow 而非 now。应用程序可以在从数据库检索到 UTC 时间戳后,将其转换为用户所在时区的时间再呈现给用户。对于 Web 应用而言,这通常是在浏览器端完成的,因为浏览器能获取用户的时区信息。

    只写关系

    这些模型上的关系属性配置方式与 manufacturers 和 products 之间的关系类似。Order.customer 关系使用默认的惰性求值方式,但 Customer.orders 关系采用了新的定义:

        orders: WriteOnlyMapped['Order'] = relationship(back_populates='customer')

    这里使用了 WriteOnlyMapped 类型提示来定义一个配置了 lazy='write_only' 选项的关系。如果不使用或不需要类型提示,也可以在 relationship() 调用中直接指定 lazy 参数。

    为什么这种关系有所不同?一般来说,只有在特定情况下,关系的惰性求值才有意义。像 Order.customer 这样可能只是偶尔使用的“一对多”关系,以及像前一章中 Product.countries 这样预期元素数量较少的“一对多”关系,都适合惰性求值。

    Customer.orders 被定义为 write_only 关系的原因在于,惰性求值对它来说作用有限。对于重复客户而言,这可能是一个包含大量订单的关系,而一次性获取客户所有订单的列表通常用处不大。更实用的做法是提供一种方式,仅请求客户的某部分订单,例如最近的一笔或过去一个月内的订单。但惰性加载的关系无法定义任何过滤器来配置需要检索的元素范围。

    write_only 加载器之所以方便,是因为它并不尝试实际加载关系,而是生成一个查询对象供你自行执行——你可以在之后根据需要添加过滤器、排序或其他选项。该关系对象还提供了 add()、add_all() 和 remove() 方法,可用于向关系中添加或移除元素。

    write_only 关系在实践中如何运作?本章稍后你将看到它的实际应用。

    关联对象模式(Association Object Pattern)

    下一步是在 Order 和 Product 模型之间建立多对多关系,以定义每个订单的内容。对于带有额外列的简单多对多关系,可以将连接表创建为 Table 实例并交由 SQLAlchemy 管理。但由于此关系需要额外数据,因此将连接表创建为 Model 的子类,以便应用程序能够管理这些附加列。SQLAlchemy 将这种定义多对多关系的替代方法称为“关联对象模式”。

    以下是你可以在 models.py 底部看到的新连接表。

    models.py:OrderItem 模型

    class OrderItem(Model):
        __tablename__ = 'orders_items'
    
        product_id: Mapped[int] = mapped_column(ForeignKey('products.id'),
                                                primary_key=True)
        order_id: Mapped[UUID] = mapped_column(ForeignKey('orders.id'),
                                               primary_key=True)
        unit_price: Mapped[float]
        quantity: Mapped[int]

    OrderItem 模型的主键由两个外键列组成,这与前一章中更简单的多对多关系处理方式类似。由于未使用 Optional 类型提示,这两个键都是必需的(在数据库术语中为非空),这意味着如果删除某个产品或订单,则必须清除此表中引用该项目的所有条目。

    该模型类还有两列用于存储所订购产品的单价和数量,这些信息对于完整记录购买详情是必要的。

    接下来是有趣的部分。产品和国家之间较简单的多对多关系使用了 relationship 的 secondary 参数,并在两个模型中都进行了设置,这足以让 SQLAlchemy 知道如何处理。遗憾的是,在此场景中无法使用 secondary,因为应用由于存在额外列而无法放弃对连接表的控制。

    实现这一效果的方法源于这样一个事实:可以将多对多关系理解为两个一对多的关系,或者更准确地说,理解为第一个模型与连接表之间的一对多关系,以及连接表与第二个模型之间的多对一关系。如果将这种关系拆分为两部分来考虑,则无需依赖次要参数。

    下面展示了如何使用一对多关系的对象来实现多对多关系的两端。

    models.py:分解后的多对多关系

    class Product(Model):
        # ...
        order_items: WriteOnlyMapped['OrderItem'] = relationship(
            back_populates='product')
        # ...
    
    class Order(Model):
        # ...
        order_items: Mapped[list['OrderItem']] = relationship(
            back_populates='order')
        # ...
    
    class OrderItem(Model):
        # ...
        product: Mapped['Product'] = relationship(back_populates='order_items')
        order: Mapped['Order'] = relationship(back_populates='order_items')
        # ...

    结果是在模型中添加了四个关系属性,每个多对多关系中的一对多部分各对应两个。Order 实例可以通过其 order_items 关系属性获取订单中包含的明细项列表。该关系返回的每一项都是一个 OrderItem 连接表模型的实例,从中可以访问 product(产品)、unit_price(单价)和 quantity(数量)。

    从产品一侧来看,product 的 order_items 关系属性表示所有购买记录的列表,这些记录同样是 OrderItem 的实例,每条记录都指向对应的订单(进而关联到客户)、单价和数量。由于一个产品可能被多次售出,因此这部分关系使用了 write_only loader 定义,允许应用程序对该列表进行筛选、排序和分页查询。

    新的数据库迁移

    现在 models.py 中对 customers 和 orders 的实现已经完成,是时候对数据库进行迁移以应用这些变更。使用以下命令创建第二个数据库迁移文件:

    (venv) $ alembic revision --autogenerate -m "customers and orders"

    确保生成的迁移脚本包含三个新表后,将其应用到数据库中:

    (venv) $ alembic upgrade head

    此时数据库已准备好接收 customers 和 orders 的数据结构。

    如何创建订单

    带有额外列的多对多关系非常强大,但相比简单的多对多关系,它需要更多的工作量,因为连接表必须由应用程序进行管理。

    使用此方案如何创建订单?步骤如下:

  • 首先创建一个 Customer 实例,如果是回头客,则加载现有客户。
  • 接着创建一个 Order 实例,并将其与客户关联——可以通过在构造函数中传入 customer 参数,或调用 Customer.orders 关系上的 add() 方法实现。
  • 对于订单中的每一项,创建一个包含 product、unit_price 和 quantity 的 OrderItem 实例,并将其添加到订单的 order_items 关系中。
  • 如果你更倾向于通过实际代码来理解,以下会话演示了创建一个包含两项的订单:

    >>> # import all the necessary things and create a session
    >>> from models import Product, Customer, Order, OrderItem
    >>> from db import Session
    >>> session = Session()
    
    >>> # create a new customer
    >>> c = Customer(name='Jane Smith')
    
    >>> # create a new order, add it to the customer and to the database session
    >>> o = Order()
    >>> c.orders.add(o)
    >>> session.add(o)
    
    >>> # add the first line item in the order: product #45 for $45.50
    >>> p1 = session.get(Product, 45)
    >>> o.order_items.append(OrderItem(product=p1, unit_price=45.5, quantity=1))
    
    >>> # add the second line item: 2 of product #82 for $37 each
    >>> p2 = session.get(Product, 82)
    >>> o.order_items.append(OrderItem(product=p2, unit_price=37, quantity=2))
    
    >>> # write the order (along with the customer and order items) to the database
    >>> session.commit()
    
    >>> # check the UUID and the timestamp defaults assigned to the new order
    >>> o.id
    UUID('a73c6aad-8ba9-4550-ac2f-1fcc9285cddc')
    >>> o.timestamp
    datetime.datetime(2023, 2, 24, 19, 52, 47, 293727)

    值得注意的是,用于向关系添加和移除元素的方法并不总是相同的。对于表现为标准 Python 列表的关系,使用的是 append() 和 remove() 方法。如上例所示,Order.order_items 关系就使用了 append()。

    使用 write_only loader 的关系不遵循列表语义,因为相关项目不会被直接加载。这类关系属于 WriteOnlyCollection 类型,提供的是 add() 和 delete() 方法。上述示例在 Customer.orders 关系上使用了 add() 方法。

    删除操作

    与之前的关系类似,理解实体被删除时会发生什么是很重要的。如果至少有一个 OrderItem 条目引用的 Product 或 Order 实体被删除,SQLAlchemy 会尝试将 NULL 写入失效的 OrderItem 实例的外键中。此模型中的两个外键均有意定义为必填(不可为空),因此该操作会失败。在 products 和 countries 关系中不存在此问题,因为关系的次要选项确保了当一个关联实体被删除时,相关元素会从连接表中移除。

    对于未使用次要选项的多对多关系,有两种处理此问题的方法。一种可能的解决方案是实现“删除”级联,类似于 Manufacturer.products 关系中配置的级联,从而自动删除孤立的 OrderItem 条目,类似于 SQLAlchemy 在次要类型关系中所做的操作。另一种选择是假设当存在引用它们的 OrderItem 实例时,产品或订单不能被删除。

    第二种选项本质上是一种“不执行任何操作”的选项,因为 SQLAlchemy 在删除产品或订单时会返回错误。将连接表中的外键设为必填是一种方便的方法,可确保只有在没有指向产品或订单的 OrderItem 实例时才能删除它们。要删除一个订单,应用程序首先需要删除与其关联的所有 OrderItem 实体,然后才能移除 Order 实体。对于此关系,将采用这种方案。

    OrderItem 实例可以被删除吗?由于数据库中没有其他实体有指向它们的外键,因此可以安全地删除这些实体而不会导致数据库完整性错误。可以从关系的两侧删除 OrderItem 实体,方法是从 Order 或 Product 模型的 order_items 关系中移除相应的实例。以下是一个示例:

    >>> # this assumes "oi" has the OrderItem instance to delete,
    
    >>> # delete the OrderItem from an Order instance "o"
    >>> o.order_items.remove(oi)
    >>> session.commit()
    
    >>> # delete the OrderItem from a Product instance "p"
    >>> p.order_items.delete(oi)
    >>> session.commit()

    订单导入脚本

    为了能够查询并实验一个包含大量订单的数据库,以下脚本从 CSV 文件中导入大量随机生成的订单。将以下代码复制到项目目录中名为 import_orders.py 的文件中。

    import_orders.py:从 CSV 文件导入客户和订单

    import csv
    from datetime import datetime
    from sqlalchemy import select, delete
    from db import Session
    from models import Product, Customer, Order, OrderItem
    
    
    def main():
        with Session() as session:
            with session.begin():
                session.execute(delete(OrderItem))
                session.execute(delete(Order))
                session.execute(delete(Customer))
    
        with Session() as session:
            with session.begin():
                with open('orders.csv') as f:
                    reader = csv.DictReader(f)
                    all_customers = {}
                    all_products = {}
    
                    for row in reader:
                        if row['name'] not in all_customers:
                            c = Customer(name=row['name'], address=row['address'],
                                         phone=row['phone'])
                            all_customers[row['name']] = c
                        o = Order(
                            timestamp=datetime.strptime(row['timestamp'],
                                                        '%Y-%m-%d %H:%M:%S'))
                        all_customers[row['name']].orders.add(o)
                        session.add(o)
    
                        product = all_products.get(row['product1'])
                        if product is None:
                            product = session.scalar(select(Product).where(
                                Product.name == row['product1']))
                            all_products[row['product1']] = product
                        o.order_items.append(OrderItem(
                            product=product,
                            unit_price=float(row['unit_price1']),
                            quantity=int(row['quantity1'])))
    
                        if row['product2']:
                            product = all_products.get(row['product2'])
                            if product is None:
                                product = session.scalar(select(Product).where(
                                    Product.name == row['product2']))
                                all_products[row['product2']] = product
                            o.order_items.append(OrderItem(
                                product=product,
                                unit_price=float(row['unit_price2']),
                                quantity=int(row['quantity2'])))
    
                        if row['product3']:
                            product = all_products.get(row['product3'])
                            if product is None:
                                product = session.scalar(select(Product).where(
                                    Product.name == row['product3']))
                                all_products[row['product3']] = product
                            o.order_items.append(OrderItem(
                                product=product,
                                unit_price=float(row['unit_price3']),
                                quantity=int(row['quantity3'])))
    
    
    if __name__ == '__main__':
        main()

    此脚本中使用的大量技术与产品导入脚本中使用的技术相似。第一个会话块删除了所有订单和客户,以便从干净的表格开始。然后在第二个会话块中逐行读取和处理 CSV 文件。

    数据文件的每一行包含一个订单的信息,包括以下字段:

  • name:客户的姓名。
  • address:客户的地址。
  • phone:客户的电话号码。
  • timestamp:订单的日期和时间。
  • product1、unit_price1 和 quantity1:此订单的第一行项目。
  • product2、unit_price2 和 quantity2:此订单的第二行项目;若 quantity2 为 "0",则表示该订单没有第二行项目。
  • product3、unit_price3 和 quantity3:此订单的第三行项目;若 quantity3 为 "0",则表示该订单没有第三行项目。
  • all_customers 字典用于跟踪从文件中读取订单时创建的 Customer 实例,而 all_products 字典则维护从数据库加载的产品缓存,以减少查询次数。

    创建一个 Order 实例并将其关联到客户。timestamp 属性被显式设置为从 CSV 文件导入的日期和时间,以防止该模型中的默认设置将所有订单的时间戳设为脚本运行时的当前时间。

    为了完成订单,会创建一到三个 OrderItem 实例并添加到其中。对于每个订单项,如果该产品之前未见过,则通过查询从数据库中加载;否则从 all_products 缓存中获取。

    在运行此脚本之前,必须将 orders.csv 文件的副本放置到项目目录中。您可以从本书的 GitHub 仓库下载此文件。

    执行以下命令来运行脚本并导入所有订单:

    (venv) $ python import_orders.py

    该脚本应花费几秒钟时间导入订单。

    查询

    现在数据库中已包含客户和订单,可以执行各种有趣的查询。首先,在一个全新的 Python shell 中导入所有需要的类和函数,并创建一个数据库会话:

    >>> from sqlalchemy import select, func
    >>> from db import Session
    >>> from models import Product, Customer, Order, OrderItem
    >>> session = Session()

    让我们尝试新的 write_only 关系。下一个查询通过姓名获取客户,然后访问 orders 关系属性:

    >>> c = session.scalar(
            select(Customer)
                .where(Customer.name == 'John Butler'))
    >>> c
    Customer(14a4d407bca54a69a0118715b664032a, "John Butler")
    >>> c.orders
    <sqlalchemy.orm.writeonly.WriteOnlyCollection object at 0x10ae1be20>

    如您所见,此类关系不会以列表形式呈现。但这个 WriteOnlyCollection 对象有一个 select() 方法,当执行时会返回一个检索相关对象的查询:

    >>> session.scalars(c.orders.select()).all()
    [Order(2971e3d105aa4394822c227da3f4a743), ..., Order(6e5214f6af744d02bea74a6228dec725)]

    您也可以手动编写相同的查询,但借助 write_only 关系,SQLAlchemy 会自动生成该查询。而且由于这是一个查询对象,它可以像其他查询一样添加额外的子句,这与之前看到的列表型关系不同。以下是几个示例:

    >>> # sort the orders from newer to older
    >>> session.scalars(
            c.orders.select()
                .order_by(Order.timestamp.desc())
        ).all()
    [Order(4cbd2174ee6a4f52bc89a65ff74942d2), ..., Order(6e5214f6af744d02bea74a6228dec725)]
    
    >>> # get one order at most
    >>> session.scalar(
            c.orders.select()
                .limit(1))
    Order(2971e3d105aa4394822c227da3f4a743)

    请注意,所有这些打印订单的查询在您自己的系统上会显示不同的主键值,因为 UUID 是在导入订单时随机生成的。

    系统中总共有多少客户和订单?以下查询可获取这些数量:

    >>> session.scalar(select(func.count(Customer.id)))
    2754
    >>> session.scalar(select(func.count(Order.id)))
    4728

    每个 OrderItem 实例包含产品的单价和订单行的数量,但不包括该项目的总价,需要单独计算。下一个查询列出金额最高的三条订单项及其对应的产品。

    >>> item_total = (OrderItem.unit_price * OrderItem.quantity).label(None)
    >>> q = (select(item_total, Product)
                .join(Product.order_items)
                .order_by(item_total.desc())
                .limit(3))
    >>> session.execute(q).all()
    [(385.95000000000005, Product(127, "ZX Spectrum")), (283.16, Product(127, "ZX Spectrum")),
    (259.98, Product(127, "ZX Spectrum"))]

    要解决这个查询问题,需要将产品单价乘以订购数量来计算价格。这由 item_total 变量表示,它存储了该计算的带标签版本。

    请注意,根据所使用的数据库及版本,上述查询返回的金额可能会有微小差异。这是由于浮点数运算的不精确性导致的。

    该查询的要求是列出订单项的总价和产品,因此 select() 接收这两个参数。由于这两个参数来自数据库中不同的表,所以需要进行连接操作。join() 的 Product.order_items 参数告诉 SQLAlchemy,这个查询将把左侧实体(Product)与右侧实体(OrderItem)进行连接。使用 Order.order_items(反向关系)也是等效的,只是连接时表的顺序会调换,但结果相同。

    这个查询最有趣的地方在于,SQLAlchemy 能够理解 item_total 中存储的两个列的乘法运算应在查询中执行,而不是在 Python 进程中处理。这可能会引起一些困惑,因为这是 SQLAlchemy 一种“神奇”的行为。列属性具有自定义的数学运算符实现,这些运算符实际上并不执行计算,而是将操作转移到 SQL 查询中,由数据库来完成。

    如果你想知道这条查询生成的 SQL 是什么,可以打印查询对象:

    >>> print(q)
    SELECT orders_items.unit_price * orders_items.quantity AS anon_1,
      products.id, products.name, products.manufacturer_id, products.year, products.cpu
    FROM products JOIN orders_items ON products.id = orders_items.product_id
      ORDER BY anon_1 DESC LIMIT :param_1

    这是一个有趣的查询,包含了一些新挑战,但实际上查看单个订单项意义不大,因为它们属于某个订单,而一个订单可能包含多个一起购买的订单项。更有用的查询应考虑整个订单的总销售额,即将所有订单项的价格相加。下一条查询找出三个总价最高的订单,综合所有商品计算。这看起来更难实现,但查询语句却惊人地相似:

    >>> order_total = func.sum(OrderItem.unit_price * OrderItem.quantity).label(None)
    >>> q = (select(Order, order_total)
                .join(Order.order_items)
                .group_by(Order)
                .order_by(order_total.desc())
                .limit(3))
    >>> session.execute(q).all()
    [(Order(a3e5d5187a7d420a8086dec947721a1c), 463.99),
    (Order(4b659023464b43688f4eb49cc19cc787), 461.51),
    (Order(8731df42c5fb45e7a90232d67dab3f9a), 443.3)]

    让这个查询生效的关键是使用分组(grouping)和 sum() 聚合函数。查询获取已连接的订单及其订单项,并按订单进行分组,这样属于同一订单的所有订单项会被合并为一个结果,从而可以进行聚合计算。

    与前一条查询中的 item_total 计算不同,这里使用的是 order_total,它对每个订单项应用相同的乘法运算,但由于当前查询进行了分组,可以使用 sum() 函数将所有订单项加总,得出订单的最终总额。

    需要注意的是,根据所使用的数据库以及数据库驱动程序的差异,func.sum() 聚合函数的结果可能以 decimal 对象的形式返回,这种类型比标准的浮点数算术更精确地表示数值。

    下一条查询找出销量最高的前五个产品:

    >>> units = func.sum(OrderItem.quantity).label(None)
    >>> q = (select(Product, units)
                .join(Product.order_items)
                .group_by(Product)
                .order_by(units.desc())
                .limit(5))
    >>> session.execute(q).all()
    [(Product(41, "Commodore 64"), 2023), (Product(48, "Amiga"), 1578),
    (Product(127, "ZX Spectrum"), 1004), (Product(16, "Apple II"), 600),
    (Product(2, "BBC Micro"), 209)]

    对于这个查询,创建了一个标签表达式,用于汇总按产品分组的订单项数量。查询获取已连接的产品和订单项,并按产品对订单项进行分组。

    这里你可能会再次看到 sum 的结果以 Python 的 Decimal 类实例形式报告。

    上述查询中缺少的一个要素是日期范围。通常企业希望在特定时间段(如一个月或一个季度)内计算销售统计。上面返回价格最高订单的查询可以通过在 where() 子句中添加 between() 条件来限制其在指定日期范围内运行。以下是计算 2022 年 11 月价格前三高的订单的方法:

    >>> from datetime import datetime
    >>> order_total = func.sum(OrderItem.unit_price * OrderItem.quantity).label(None)
    >>> q = (select(Order, order_total)
                .join(Order.order_items)
                .where(Order.timestamp.between(
                    datetime(2022, 11, 1), datetime(2022, 12, 1)))
                .group_by(Order)
                .order_by(order_total.desc())
                .limit(3))
    >>> session.execute(q).all()
    [(Order(2cfeb68e0bed4fe7b0f3bbe707f194ee), 335.09000000000003),
    (Order(5b02a2f26aa8499a96da008d3cff99f0), 318.48),
    (Order(d53530d88c9f4f45b960bdddf1b89a40), 305.57)]

    在计算销量前五产品的查询中,添加日期范围时会出现一个小问题,因为该查询并未使用包含订单时间戳的 Order 模型。为了按日期筛选,此查询需要在 OrderItem 和 Order 之间进行额外连接,这实际上意味着将使用产品和订单之间的完整多对多关系。

    >>> units = func.sum(OrderItem.quantity).label(None)
    >>> q = (select(Product, units)
                .join(Product.order_items)
                .join(OrderItem.order)
                .where(Order.timestamp.between(
                    datetime(2022, 11, 1), datetime(2022, 12, 1)))
                .group_by(Product)
                .order_by(units.desc())
                .limit(5))
    >>> session.execute(q).all()
    [(Product(41, "Commodore 64"), 157), (Product(48, "Amiga"), 139),
    (Product(127, "ZX Spectrum"), 65), (Product(16, "Apple II"), 46),
    (Product(2, "BBC Micro"), 23)]

    注意,这个通过关联对象模式构建的多对多关系需要显式地为它的两个分支进行连接,而产品与国家之间更简单的关系则基于 SQLAlchemy 关系的 secondary 参数自动从单个 join() 子句向数据库发出两次连接。这是手动管理多对多关系时丢失的另一个小便利。

    之前的许多查询都在订单时间戳上使用 between() 过滤器来将结果限制在特定时间段内。在处理时间戳时,另一种常见的查询模式是按某个时间单位(如天、月、季度或年)对结果进行分组。这更具挑战性,因为时间戳需要被转换为一个可用于 group_by() 子句的值,以便每个区间内的所有结果都能被聚合。

    以下查询使用 extract() 函数从订单时间戳中提取年份和月份,然后按它们进行分组,以计算 2022 年每月销售的总数量。

    >>> month = func.extract('month', Order.timestamp).label(None)
    >>> year = func.extract('year', Order.timestamp).label(None)
    >>> units = func.sum(OrderItem.quantity).label(None)
    >>> q = (select(year, month, units)
                .join(OrderItem)
                .where(Order.timestamp.between(
                    datetime(2022, 1, 1), datetime(2023, 1, 1)))
                .group_by(year, month)
                .order_by(year, month))
    >>> session.execute(q).all()
    [(2022, 1, 505), (2022, 2, 426), (2022, 3, 525), ..., (2022, 12, 564)]

    extract() 函数接受一个时间单位(如 day、week、month、quarter 或 year)作为第一个参数,后跟一个 datetime 列,并返回请求的日期或时间部分。上面的示例提取了年份和月份作为单独的结果值,然后使用一个复合 group_by() 子句,同时按这两个值进行分组。结果随后按这两个相同的值升序排序。

    如前所述,某些数据库通过计算或函数获得的结果会以 Decimal 对象的形式返回。上面示例中的结果来自 SQLite,它使用标准的 int 和 float 数字。在使用 MySQL 时,你将获得相同的结果,但总和是 decimal 对象:

    [(2022, 1, Decimal('505')), (2022, 2, Decimal('426')), ..., (2022, 12, Decimal('564'))]

    当使用 PostgreSQL 运行相同的查询时,结果会以前所未有的格式返回:

    [(Decimal('2022'), Decimal('1'), 505), (Decimal('2022'), Decimal('2'), 426), ...,
    Decimal('12'), 564)]

    这里,extract() 函数调用的结果是 decimal 对象,而总和则以整数形式返回。这些是数据库引擎之间的微小实现差异。当你收到一个 Decimal 对象时,可以使用 int() 函数将其转换为原始整数类型:

    >>> from decimal import Decimal
    >>> int(Decimal('2022'))
    2022

    你可以使用 float() 转换函数将 decimal 对象转换为浮点数,但请注意,转换过程中可能会丢失一些精度。

    更多聚合技术

    接下来你将为 RetroFun 数据库添加的一项功能是支持客户撰写的产品评论。客户评论包括 1 到 5 星的评分,以及可选的文字评论。

    思考实现方式,客户评论将客户与产品关联起来,因此这两个模型之间将建立新的关系。要决定哪种关系类型是合适的,你必须考虑关系两端的基数。鉴于预计客户可以评价多个产品,且产品也可以被多个客户评价,因此适合此问题的关系类型仍然是多对多。

    下图展示了一个新的 product_reviews 连接表,用于关联 products 和 customers。

    这个关系应该使用前一章介绍的简单多对多结构,还是上面订单例子中更高级的方式?由于该关系需要额外存储星级评分和评论文本,因此基于关联对象模式的高级方案是正确选择。

    以下是表示该关系连接表的模型,以及在 Product 和 Customer 模型中添加的新关系属性:

    models.py: 分解的多对多关系

    from sqlalchemy import Text
    
    
    class Product(Model):
        # ...
        reviews: WriteOnlyMapped['ProductReview'] = relationship(
            back_populates='product')
        # ...
    
    
    class Customer(Model):
        # ...
        product_reviews: WriteOnlyMapped['ProductReview'] = relationship(
            back_populates='customer')
        # ...
    
    
    class ProductReview(Model):
        __tablename__ = 'products_reviews'
    
        product_id: Mapped[int] = mapped_column(ForeignKey('products.id'),
                                                primary_key=True)
        customer_id: Mapped[UUID] = mapped_column(ForeignKey('customers.id'),
                                                  primary_key=True)
        timestamp: Mapped[datetime] = mapped_column(default=datetime.utcnow,
                                                    index=True)
        rating: Mapped[int]
        comment: Mapped[Optional[str]] = mapped_column(Text)
    
        product: Mapped['Product'] = relationship(back_populates='reviews')
        customer: Mapped['Customer'] = relationship(
            back_populates='product_reviews')

    这种关系的构建方式与 products 和 orders 之间的关系非常相似。连接表被写成一个 Model 子类,包含三个额外字段:一个用于存储评论的时间戳(带默认值自动设为当前时间)、数值型评分和评论内容。时间戳和星级评分是必填且不可为空的。评论文本是可选的,使用 Text 类型而非 String,因为 SQLAlchemy 用它来处理长度不定的长文本块。

    鉴于产品评论列表可能很大,Product 和 Customer 模型中的关系属性都配置了 write_only 加载器。而连接表中的关系映射到单个实体,因此采用默认的惰性加载机制。

    要生成包含这些变更的新数据库迁移并应用到数据库,请运行以下命令:

    (venv) $ alembic revision --autogenerate -m "product reviews"
    (venv) $ alembic upgrade head

    与 products 和 orders 类似,在数据库中准备一些数据有助于测试查询。为此,以下脚本从 CSV 文件批量导入产品评论。

    import_reviews.py: 从 CSV 文件导入评论

    import csv
    from datetime import datetime
    from sqlalchemy import select, delete
    from db import Session
    from models import Product, Customer, ProductReview
    
    
    def main():
        with Session() as session:
            with session.begin():
                session.execute(delete(ProductReview))
    
        with Session() as session:
            with session.begin():
                with open('reviews.csv') as f:
                    reader = csv.DictReader(f)
    
                    for row in reader:
                        c = session.scalar(select(Customer).where(
                            Customer.name == row['customer']))
                        p = session.scalar(select(Product).where(
                            Product.name == row['product']))
                        r = ProductReview(
                            customer=c,
                            product=p,
                            timestamp=datetime.strptime(row['timestamp'],
                                                        '%Y-%m-%d %H:%M:%S'),
                            rating=int(row['rating']),
                            comment=row['comment'] or None)
                        session.add(r)
    
    
    if __name__ == '__main__':
        main()

    这个导入器比之前的更简单,因为它只向单个表——即由 ProductReview 模型表示的连接表——插入数据。

    对于此脚本,没有使用 write_only 关系的 add() 方法,而是直接将 customer 和 product 实例赋值给每个新的 ProductReview 对象,效果相同。

    reviews.csv 文件必须位于项目目录下,上述脚本才能正常运行。你可以从本书的 GitHub 仓库下载此文件。

    运行下一条命令以导入所有评论:

    (venv) $ python import_orders.py

    查询

    现在可以启动 Python shell 并开始执行一些查询。首先导入所有必要的函数和类,并创建数据库会话:

    >>> from sqlalchemy import select, func
    >>> from db import Session
    >>> from models import Product, Customer, ProductReview
    >>> session = Session()

    第一个查询计算所有客户星级评分的平均值:

    >>> q = select(func.avg(ProductReview.rating))
    >>> session.scalar(q)
    3.7731384829505914

    该查询使用了 avg() 聚合函数,用于计算所有输入结果的平均值。请注意,根据所使用的数据库,结果可能以 Decimal 对象形式返回,而不是普通数字。

    更有用的是计算某个产品的平均评分。下一个查询获取 ZX Spectrum 家用电脑的评论评分:

    >>> p = session.scalar(
            select(Product)
                .where(Product.name == 'ZX Spectrum'))
    >>> q = (select(func.avg(ProductReview.rating))
                .where(ProductReview.product == p))
    >>> session.scalar(q)
    4.0

    注意在此查询中,where() 子句使用了 ProductReview.product 关系来构造条件。SQLAlchemy 的 ORM 模块会将这个高层级条件转换为更基础但等价的条件,利用 ProductReview.product_id 外键包含在 SQL 查询中。

    另一个有趣的选项是生成所有产品的平均评分报告。下一个查询实现了这一点:

    >>> product_rating = func.avg(ProductReview.rating).label(None)
    >>> q = (select(Product, product_rating)
                .join(Product.reviews)
                .group_by(Product)
                .order_by(product_rating.desc(), Product.name))
    >>> session.execute(q).all()
    [(Product(19, "Apple IIc Plus"), 5.0), ..., (Product(138, "Timex Sinclair 1000"), 1.0)]

    该查询使用了 avg() 函数并结合分组,因此现在平均值是针对各个分组而非整个结果集计算的。Product 模型与 ProductReview 关联,因此可以在查询的 select() 部分以及 order_by() 子句中同时请求这两个模型的结果,并按评分从高到低排序,再按产品名称字母顺序排列。

    如你所记,ProductReview 模型中的 comment 列是可选的。接下来的脚本会生成一个产品列表,显示每款产品评论中没有书面评论的比例。

    >>> no_comment_percent = (
            100 - 100 * func.count(ProductReview.comment) / func.count(ProductReview.rating)
        ).label(None)
    >>> q = (select(Product.name, no_comment_percent)
                .join(ProductReview.product)
                .group_by(Product)
                .order_by(no_comment_percent.desc(), Product.name))
    >>> session.execute(q).all()
    [('464 Plus', 100.0), ('Acorn Atom', 100.0), ..., ('ZX81', 0.0)]

    no_comment_percent 标记表达式通过巧妙的方法计算空白评论的百分比。对 ProductReview.comment 列应用 count() 函数将返回每组中非 NULL 的列数,而对 ProductReview.rating 列应用相同函数则返回每个产品组中的总评论数,因为评分是必填项,每条记录都会有值。要计算空白评论的百分比,先计算有评论的评论占比为 100.0 * comments / total,然后从 100 中减去这个数值即可得到空白评论的百分比。SQLAlchemy 会将这些列上的乘法、除法和减法操作转换为 SQL 查询,由数据库执行实际计算。

    该查询选择产品名称和计算出的百分比。它按产品分组行,以便聚合表达式分别为每个产品单独计算百分比。最终结果首先按百分比降序排列,然后按产品名称排序。

    练习题

    是时候自己练习了。编写以下查询:

  • 销售额超过 $300 的订单,按销售金额从高到低降序排列。
  • 包含一台或多台 ZX81 计算机的订单。
  • 包含 Amstrad 制造产品的订单。
  • 在 2022 年 12 月 25 日下单且包含两个或更多条目的订单。
  • 客户及其首次和最后一次下单的日期和时间。提示:min() 和 max() 函数可以帮助完成此查询。
  • 销售额最高的 5 家制造商,按销售额降序排列。
  • 产品及其平均星级评分和评论数量,按评论数量降序排列。
  • 产品及平均星级评分,但仅统计包含书面评论的评论。
  • Commodore 64 电脑在 2022 年各个月份的平均星级评分。
  • 客户所给产品的最低和最高星级评分,按客户姓名字母顺序排列。
  • 制造商及其平均星级评分,按评分从高到低降序排列。
  • 产品国家及其平均星级评分,按评分从高到低降序排列。
  • 感谢您访问我的博客!如果您喜欢这篇文章,请考虑支持我的工作,并通过 Buy me a coffee 捐赠一杯咖啡来保持我精力充沛。谢谢!

    需要完整排版与评论请前往来源站点阅读。