SQLAlchemy 2 实战:异步 SQLAlchemy 应用SQLAlchemy 2 In Practice - Chapter 7: Asynchronous SQLAlchemy
自 SQLAlchemy 1.4 起支持 asyncio 异步编程,本章节深入讲解如何在实践中使用异步 SQLAlchemy。内容包括异步会话管理、协程化查询执行以及与 FastAPI 等异步框架的集成方式。通过真实代码示例,帮助开发者掌握高性能数据库交互模式。
Miguel Grinberg
这是《SQLAlchemy 2 in Practice》的第七章。如果你想支持我的工作,欢迎直接在我的商店或亚马逊购买本书。谢谢!
自 1.4 版本起,SQLAlchemy 开始通过 asyncio 包为 Core 和 ORM 模块提供异步编程支持。这是一项令人兴奋的改进,它将 SQLAlchemy 的强大功能引入现代应用,例如使用 FastAPI 框架编写的应用。
为方便查阅,以下是本书目录摘要:
本章将通过将前几章的工作全部迁移到异步模型,来讲解 SQLAlchemy 中异步支持的实现方式。
异步有何不同?
异步编程范式在应用的执行模型上带来了一些差异。
使异步编程变得困难的一个方面是“函数着色”现象,这一概念生动地描述了 Python 及其他语言在同步与异步代码混合使用方面的局限性。
简而言之,函数着色意味着异步应用应避免长时间运行的同步函数,因为这些函数会阻塞并阻碍并发。由于这一限制,使用 SQLAlchemy 的 Web 应用必须在所有层级都采用异步代码,从上到下可能包括:
虽然显然异步 Web 应用需要同样是异步的服务器和框架,但要求还延伸至底层。这意味着任何使用 SQLAlchemy 进行异步操作的函数本身也必须是异步函数,同时会话和引擎对象必须替换为其异步等效形式。最后,数据库驱动也必须设计为支持异步操作。
另一个重要区别与隐式数据库活动有关。SQLAlchemy ORM 是一个高级数据库框架,有时会自行决定发出数据库查询。最典型的例子是配置了默认延迟加载的关系属性,它们在首次访问时隐式执行数据库查询以获取结果。
由于异步模型的函数着色限制,这些隐式行为无法存在于异步应用中。所有异步数据库操作必须发生在异步函数内部。
异步数据库驱动
从栈底开始,要在异步应用中使用 SQLAlchemy,必须使用兼容的数据库驱动。本书前面提到的常规数据库驱动均无法用于异步场景。
以下章节将讨论三种主流开源数据库的可选方案。如果你使用的不是这些数据库,可以在 SQLAlchemy 文档中找到对应数据库的方言,并查找其异步驱动选项。
SQLite
Python 解释器自带的 sqlite 模块不支持异步模型。要在异步代码中使用 SQLite,SQLAlchemy 支持第三方包 aiosqlite,需按如下方式安装到虚拟环境中:
(venv) $ pip install aiosqlite提供给 SQLAlchemy 的数据库连接 URL 需要修改以反映该驱动的使用。如前所述,数据库 URL 在 scheme 部分通过 + 号分隔指定方言和驱动。以下是使用 aiosqlite 驱动的示例 URL:
DATABASE_URL=sqlite+aiosqlite:///retrofun.sqliteMySQL
在使用 MySQL 或 MariaDB 时,SQLAlchemy 2.0 支持两个异步驱动:aiomysql 和 asyncmy。
必须将所选驱动包安装到虚拟环境中。例如,安装 aiomysql 的方法如下:
(venv) $ pip install aiomysql然后数据库连接 URL 的方言部分必须更改为反映当前使用的驱动。示例如下:
DATABASE_URL=mysql+aiomysql://retrofun:my-password@localhost:3306/retrofunPostgreSQL
对于 PostgreSQL,目前 asyncpg 是唯一的异步驱动选项。
与其他数据库类似,该驱动包也需要安装:
(venv) $ pip install asyncpg数据库连接 URL 的方言部分必须包含 asyncpg。例如:
DATABASE_URL=postgresql+asyncpg://retrofun:my-password@localhost:5432/retrofun引擎、元数据和会话
SQLAlchemy 自带一个异步扩展,提供替代的 engine 和 session 对象。这些对象与你在前面章节中使用的常规对象具有相同的接口,但其方法是可 await 的。
以下是一个适用于异步应用的 db.py 版本。如果你打算在计算机上运行此代码,可以为异步代码创建一个新的项目目录,以便保留之前章节中的代码,以防你需要参考。
db.py:异步引擎、元数据和会话
import os
from dotenv import load_dotenv
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
class Model(DeclarativeBase):
metadata = MetaData(naming_convention={
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
})
load_dotenv()
engine = create_async_engine(os.environ['DATABASE_URL'])
Session = async_sessionmaker(engine, expire_on_commit=False)如你所见,与同步版本相比,差异并不大。此版本使用 create_async_engine() 而不是 create_engine(),并使用 async_sessionmaker 代替 sessionmaker。
唯一的其他区别是为会话配置了 expire_on_commit=False 选项。这禁用了 SQLAlchemy 的默认行为——在会话提交后将模型标记为过期。被标记为过期的模型在再次访问其属性时会隐式地从数据库查询刷新。由于异步应用中无法进行隐式数据库操作,因此不应使用过期的对象。expire_on_commit=False 选项确保不会因提交而将任何模型标记为过期。
不过,不使用过期模型的缺点是:当使用多次提交的长时间会话时,系统会认为模型始终是最新的,永远不会从数据库刷新。如果数据库可能被不同进程修改,这会导致长时间会话中的模型变得陈旧。为避免此问题,应用可使用较短的会话,或手动将对象从会话中移除并重新加载以确保其新鲜度。session.expunge() 和 session.expunge_all() 方法可用于根据需要从会话中移除模型,session.refresh() 可用于显式从数据库更新对象。
异步支持的一个有趣之处在于,MetaData 实例没有对应的异步版本。这一点在使用 create_all() 和 drop_all() 函数时尤为重要,因为这些函数没有可 await 的版本。SQLAlchemy 提供了一个 run_sync() 方法,可用于运行此类同步数据库代码(如这些函数),并通过 await 等待其执行,如下所示:
async with engine.begin() as connection:
await connection.run_sync(Model.metadata.drop_all)
await connection.run_sync(Model.metadata.create_all)关系加载器
对于大多数模型定义而言,异步应用程序无需更改。但需要仔细检查的是关系加载器的配置。
你已经看到,模型类中的许多 relationship() 属性使用了延迟加载机制——即首次访问属性时才从数据库查询关系数据。你还了解到,lazy 参数、options() 查询子句以及 WriteOnlyMapped 类型提示均可用于改变此行为。默认的 lazy 行为(映射为 lazy='select' 或 options(lazyload(...)))与异步应用程序不兼容,因此必须更改为具有更可预测行为的加载器。
那么应该使用哪种替代方案?以下是按数据库访问时机分类的所有可用加载器的表格,供参考:
延迟加载器已被排除,因此你为每条关系对象所做的选择主要是:它应在父模型加载时 eager 加载,还是仅在必要时显式加载。一旦确定哪种方式最合理,就可以根据所选方法提供的不同选项进行配置。部分关系已从默认的 lazy 更改为 write_only,由于该加载器从不发起隐式数据库查询,因此无需再修改。
一个安全的选择是将所有延迟加载的关系更改为 lazy='raise',这样当 SQLAlchemy 尝试进行延迟加载时会抛出错误。通过这种方式,应用可以在需要加载关系时,在 options() 子句中显式指定一种 eager 加载器作为覆盖。
另一种选择是为所有关系选择合适的加载器,以避免任何可能的延迟加载。这正是 RetroFun 数据库中所有关系将被修改的方式,以消除隐式数据库查询:
以下代码块展示了需要对关系所做的全部更新。models.py 中只需进行这些更改即可。如果你打算尝试异步解决方案,请将前一章中的 models.py 复制到正在构建异步项目的目录中,并按如下方式编辑关系。
models.py:适配异步的关系
# ...
class Product(Model):
# ...
manufacturer: Mapped['Manufacturer'] = relationship(
lazy='joined', innerjoin=True, back_populates='products')
countries: Mapped[list['Country']] = relationship(
lazy='selectin', secondary=ProductCountry, back_populates='products')
order_items: WriteOnlyMapped['OrderItem'] = relationship(
back_populates='product')
product_reviews: WriteOnlyMapped['ProductReview'] = relationship(
back_populates='product')
blog_articles: WriteOnlyMapped['BlogArticle'] = relationship(
back_populates='product')
# ...
class Manufacturer(Model):
# ...
products: Mapped[list['Product']] = relationship(
lazy='selectin', cascade='all, delete-orphan',
back_populates='manufacturer')
# ...
class Country(Model):
# ...
products: Mapped[list['Product']] = relationship(
lazy='selectin', secondary=ProductCountry,
back_populates='countries')
# ...
class Order(Model):
# ...
customer: Mapped['Customer'] = relationship(
lazy='joined', innerjoin=True, back_populates='orders')
order_items: Mapped[list['OrderItem']] = relationship(
lazy='selectin', back_populates='order')
# ...
class Customer(Model):
# ...
orders: WriteOnlyMapped['Order'] = relationship(back_populates='customer')
product_reviews: WriteOnlyMapped['ProductReview'] = relationship(
back_populates='customer')
blog_users: WriteOnlyMapped['BlogUser'] = relationship(
back_populates='customer')
# ...
class OrderItem(Model):
# ...
product: Mapped['Product'] = relationship(
lazy='joined', innerjoin=True, back_populates='order_items')
order: Mapped['Order'] = relationship(
lazy='joined', innerjoin=True, back_populates='order_items')
# ...
class ProductReview(Model):
# ...
product: Mapped['Product'] = relationship(
lazy='joined', innerjoin=True, back_populates='product_reviews')
customer: Mapped['Customer'] = relationship(
lazy='joined', innerjoin=True, back_populates='product_reviews')
# ...
class BlogArticle(Model):
# ...
author: Mapped['BlogAuthor'] = relationship(
lazy='joined', innerjoin=True, back_populates='articles')
product: Mapped[Optional['Product']] = relationship(
lazy='joined', back_populates='blog_articles')
views: WriteOnlyMapped['BlogView'] = relationship(back_populates='article')
language: Mapped[Optional['Language']] = relationship(
lazy='joined', back_populates='blog_articles')
translation_of: Mapped[Optional['BlogArticle']] = relationship(
lazy='joined', remote_side=id, back_populates='translations')
translations: Mapped[list['BlogArticle']] = relationship(
lazy='selectin', back_populates='translation_of')
# ...
class BlogAuthor(Model):
# ...
articles: WriteOnlyMapped['BlogArticle'] = relationship(
back_populates='author')
# ...
class BlogUser(Model):
# ...
customer: Mapped[Optional['Customer']] = relationship(
lazy='joined', back_populates='blog_users')
sessions: WriteOnlyMapped['BlogSession'] = relationship(
back_populates='user')
# ...
class BlogSession(Model):
# ...
user: Mapped['BlogUser'] = relationship(
lazy='joined', innerjoin=True, back_populates='sessions')
views: WriteOnlyMapped['BlogView'] = relationship(back_populates='session')
# ...
class BlogView(Model):
# ...
article: Mapped['BlogArticle'] = relationship(
lazy='joined', innerjoin=True, back_populates='views')
session: Mapped['BlogSession'] = relationship(
lazy='joined', innerjoin=True, back_populates='views')
# ...
class Language(Model):
# ...
blog_articles: WriteOnlyMapped['BlogArticle'] = relationship(
back_populates='language')
# ...Alembic 配置
切换到异步编程模型时,数据库迁移是另一个需要做出最小改动的领域。Alembic 使用模板的概念来生成通过 init 命令创建的迁移仓库内容,特别是 env.py 和 alembic.ini 文件。你之前在 RetroFun 数据库中使用的默认 Alembic 模板假设你的数据库引擎和驱动是同步的。
Alembic 自带一个异步模板,可用于初始化迁移仓库。下面的命令会使用该模板创建仓库。如果你想尝试此命令,请确保 db.py 和 models.py 的异步版本位于一个尚未创建迁移仓库的独立目录中。
(venv) $ alembic init -t async migrations基于该异步模板生成的 migrations 子目录中的 env.py 文件将与默认模板生成的文件有若干细微差异。
与之前一样,你需要编辑此文件,以便 Alembic 了解项目的数据库。所做的改动与同步版本类似。首先,在文件顶部添加导入语句:
migrations/env.py: Alembic 导入
from db import Model, engine
import models然后找到初始化 target_metadata 变量的那一行,并将其替换为以下代码:
migrations/env.py: 将项目数据库配置到 Alembic 中
target_metadata = Model.metadata
config.set_main_option("sqlalchemy.url", engine.url.render_as_string(
hide_password=False))最后一个改动是启用批量迁移模式。如果你使用的是 SQLite,这一点尤为重要,因为该数据库本身迁移能力有限,但此选项对所有数据库都是安全的。请在 do_run_migrations() 函数中找到 context.configure() 调用,并确保其包含 render_as_batch=True 选项。
migrations/env.py: 配置批量模式
def do_run_migrations(connection: Connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata,
run_as_batch=True)
with context.begin_transaction():
context.run_migrations()现在 Alembic 已完全配置完成,你应该可以生成初始数据库迁移了。在执行以下命令前,请确保 .env 文件中的 DATABASE_URL 变量已配置为使用上述异步数据库驱动,并且你正在连接的是一个全新的数据库,而非之前用过的那个。
(venv) $ alembic revision --autogenerate -m "initial migration"该命令会扫描你的数据库模型,并将其与当前仍为空白的数据库进行比较,因此初始数据库迁移将包含映射到这些模型的所有表、索引和约束。
迁移脚本就绪后,即可使用它进行数据库迁移:
(venv) $ alembic upgrade head会话刷新后的隐式 I/O
经过对模型所做的上述更改,几乎所有隐式的数据库操作现在都已被禁用。但仍有一种隐式情况发生在包含新对象的会话被刷新时。
在会话上下文中,flush() 操作会将会话中累积的所有待处理变更写入底层数据库事务,使数据库知晓这些变更的存在。SQLAlchemy 通常会因会话默认启用了 autoflush 选项而自动执行 flush() 调用——在数据库查询前触发 flush(),以确保查询能包含尚未提交到数据库的会话数据。
大多数情况下,flush() 调用不会引发问题,但有一种特定情况除外。如果会话中新增了对象,而这些对象具有尚未初始化的列表式关系属性,那么在 flush 这些对象时,未初始化的关系会被标记为未加载状态,这意味着下次访问时会尝试进行懒加载操作。
这个问题比较隐蔽,因此可能难以理解它如何影响应用程序。如果你已经完成了异步兼容性的所有更新,可以在 Python 会话中轻松触发此错误以便更好地理解。使用以下命令启动一个支持异步的 Python 会话:
(venv) $ python -m asyncio与直接运行 python 的区别在于,使用该命令后可以直接在提示符中使用 await 关键字。而普通 Python 会话只能在用 async def 声明的函数内部使用 await。
下面是 flush 后出现错误的简单演示:
>>> from db import Session
>>> from models import Customer, Order
>>> session = Session()
>>> c = Customer(name='Susan') # order_items has not been initialized explicitly
>>> o = Order(customer=c)
>>> session.add(o)
>>> o.order_items # no error before flush
[]
>>> await session.flush() # flush marks the order_items relationship as unloaded
>>> o.order_items # error after flush!
Traceback ...有几种方法可以避免在会话 flush 后对关系进行懒加载:
python Session = async_sessionmaker(engine, expire_on_commit=False, autoflush=False) - 确保在会话 flush 前将所有列表式关系初始化为某个值。这样 SQLAlchemy 会在 flush 时一并处理这些关系,并保留其值。
这些解决方案各有优缺点,应根据应用需求评估哪种最合适。最后提出的方案限制最少,既能与 autoflush 启用共存,也允许使用列表语义的关系加载器。因此,该方案将被用于 RetroFun 数据库的异步版本实现。
在会话 flush 前初始化关系的最简单方法是显式进行。继续以上述示例为例,以下是创建 Order 模型实例并初始化其 Order.order_items 关系的方法:
>>> await session.rollback() # clear the errored session state from above
>>> o = Order(customer=c, order_items=[]) # order_items is given an initial value
>>> session.add(o)
>>> await session.flush()
>>> o.order_items # the initial value is preserved after the flush
[]为避免每次创建新对象时都要记得初始化关系,可以将 Model 类扩展为自动将所有基于列表的关系初始化为空列表。以下是一个实现此想法的示例。
db.py: 初始化所有列表关系
from sqlalchemy import event, inspect
# ...
@event.listens_for(Model, "init", propagate=True)
def init_relationships(tgt, arg, kw):
mapper = inspect(tgt.__class__)
for arg in mapper.relationships:
if arg.collection_class is None and arg.uselist:
continue # skip write-only and similar relationships
if arg.key not in kw:
kw.setdefault(
arg.key, None if not arg.uselist else arg.collection_class())需要在 db.py 文件的底部添加 init_relationships() 函数。该函数上添加的 @event.listens_for() 装饰器会将此函数注册为 SQLAlchemy 在 Model 初始化事件发生时调用的处理程序,这意味着每次创建新的 Model 实例时都会调用该函数。propagate=True 选项将该事件处理程序扩展到 Model 的所有子类,从而有效地将此行为包含在应用程序定义的所有模型中。
函数体使用 SQLAlchemy 的 inspect() 函数对模型类进行自省,并查找所有需要初始化的关系。
注意: 本书除了上述示例外,并未涵盖 SQLAlchemy 的事件和 inspect 功能。如果您有兴趣了解更多,可以在官方文档中找到相关内容:
Import Scripts
在通过运行查询体验异步数据库之前,需要先导入所有 CSV 数据文件,但为了做到这一点,导入脚本也必须调整为作为异步应用程序运行。
每个导入脚本的一般结构必须改为使用 asyncio。以下是脚本的结构方式:
import asyncio
async def main():
# ... import logic here
if __name__ == '__main__':
asyncio.run(main())数据库会话使用异步上下文管理器,因此 with 语句必须更改为 async with。例如:
async with Session() as session:
async with session.begin():
# ... do database work here最后,查询和提交现在以异步方式执行,因此需要等待。这意味着这些脚本中所有的 session.execute()、session.scalar() 和 session.commit() 调用都必须加上 await 前缀。
完成这些更改后,脚本将完全兼容 SQLAlchemy 中的 asyncio 支持。如果您不想从上一章复制脚本并自行调整,可以在本书的 GitHub 仓库中找到异步版本。
要导入所有数据,必须按以下顺序运行所有导入器脚本:
(venv) $ python import_products.py
(venv) $ python import_orders.py
(venv) $ python import_reviews.py
(venv) $ python import_articles.py
(venv) $ python import_views.py
(venv) $ python import_languages.pyQueries
到现在为止,您可能已经了解如何运行许多数据库查询,那么如何异步运行它们呢?好消息是查询本身构造方式与之前完全相同。由于查询 API 中没有长时间运行或阻塞的函数,因此不需要异步版本。
但是,session.execute()、session.scalars() 和 session.scalar() 函数必须等待,因为它们以异步方式运行。此外,异步会话提供了两个额外的执行方法,称为 session.stream() 和 session.stream_scalars(),如下所示。
首先运行一个全新的异步 Python shell:
(venv) $ python -m asyncio如上所述,这将使整个 shell 会话在 asyncio 循环内运行,使您能够在提示符下直接使用 await,而无需创建包装函数。
现在可以导入所有需要的符号并手动启动数据库会话:
>>> from sqlalchemy import select
>>> from db import Session
>>> from models import Product, Customer, Order
>>> session = Session()首先检索“Commodore 64”产品:
>>> c64 = await session.scalar(
select(Product)
.where(Product.name == 'Commodore 64'))
>>> c64
Product(41, "Commodore 64")Product 模型中的 manufacturer 和 countries 关系分别配置了 joined 和 selectin 急切加载器,因此在发出上述查询时它们会自动加载。这可以通过以下方式确认:
>>> c64.manufacturer
Manufacturer(14, "Commodore")
>>> c64.countries
[Country(3, "USA")]让我们尝试获取按字母顺序排列的最后一个客户:
>>> c = await session.scalar(
select(Customer)
.order_by(Customer.name.desc())
.limit(1))
>>> c
Customer(e084528681ab4cb7bf45413ad6c7ce45, "Zoe Bradley")Customer 模型中的所有关系都使用了 write_only 加载器。正如你在前面的章节中看到的,要获取关系中的项,关系属性返回的选择查询必须手动执行。下面的示例从该客户获取最后两个订单:
>>> r = await session.scalars(
c.orders.select()
.order_by(Order.timestamp.desc())
.limit(2))
>>> r.all()
[Order(eaf9c1386a514c9781bdd849f7e99787), Order(db2c90dcc4ae4072b12a58496f47f5cf)]这里,获取订单的查询由 Customer.orders 关系属性的 select() 方法返回。由于这是一个查询对象,可以在会话中执行之前使用额外的子句对其进行扩展,从而在访问关系时获得最大的灵活性,尤其是在这些关系可能包含大量元素的情况下。
流式结果
如你所见,execute() 和 scalars() 方法返回一个标准的、非异步的 Python 可迭代对象,在使用异步会话时也是如此。
使用标准 Python 时,结果对象非常高效,因为它一次只从数据库加载一项。然而,在使用异步会话时,SQLAlchemy 被迫在返回结果之前从数据库检索整个结果列表,因为在标准 Python 可迭代对象内部无法进行异步操作。因此,在使用异步代码时,这些结果效率不高,尤其是在处理大型查询时。
stream() 和 stream_scalars() 方法的引入是为了在异步会话中提供与原始方法相同的高效结果迭代方式。它们的功能与原始方法类似,唯一的区别是它们返回支持 Python 异步迭代协议的结果对象的异步版本。上面的最后一个查询可以更高效地以流的形式发出:
>>> r = await session.stream_scalars(
c.orders.select()
.order_by(Order.timestamp.desc())
.limit(2))
>>> [order async for order in r]
[Order(eaf9c1386a514c9781bdd849f7e99787), Order(db2c90dcc4ae4072b12a58496f47f5cf)]你可以看到,流式结果可以在 async for 循环或列表推导式中访问。对于不需要异步迭代的场景,all() 方法也可用。
一般来说,当你预期每行有多个值时,应使用 stream() 而不是 execute();对于每行只有一个值的查询,应使用 stream_scalars() 而不是 scalars()。在异步应用程序中,标准的 scalar()、scalar_one() 和 scalar_or_none() 方法可以安全使用。
感谢您访问我的博客!如果您喜欢这篇文章,请考虑通过 Buy me a coffee 支持我的工作,并通过小额一次性捐赠让我保持咖啡因在线。谢谢!
需要完整排版与评论请前往来源站点阅读。