SQLAlchemy 2 实战第六章:构建页面流量分析系统SQLAlchemy 2 In Practice - Chapter 6: A Page Analytics Solution
本章节演示如何使用 SQLAlchemy 2 构建网页流量分析系统,涵盖数据建模、查询优化和结果可视化。通过实际案例讲解连接池配置、批量插入和聚合查询技巧,帮助读者掌握高性能数据库操作实践。
Miguel Grinberg
这是《SQLAlchemy 2 in Practice》的第六章。如果你愿意支持我的工作,欢迎直接在我的商店或亚马逊购买这本书。谢谢!
本章的目标是利用你已掌握的概念构建一个网页流量分析解决方案。这既是对前几章所演示技术的巩固,也是更复杂、更贴近现实的数据库设计示例。
供参考,以下是本书内容摘要:
第一部分:博客文章与作者
像现实世界中的许多公司一样,RetroFun 拥有一个博客,作者在此发布文章以促销产品。本节将扩展数据库,用于记录公司博客上发布的文章,以便后续追踪这些文章的网页访问流量。
接下来的代码块添加了博客文章和作者的模型定义。
models.py:博客文章与作者
class Product(Model):
# ...
blog_articles: WriteOnlyMapped['BlogArticle'] = relationship(
back_populates='product')
# ...
class BlogArticle(Model):
__tablename__ = 'blog_articles'
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(128), index=True)
author_id: Mapped[int] = mapped_column(ForeignKey('blog_authors.id'),
index=True)
product_id: Mapped[Optional[int]] = mapped_column(
ForeignKey('products.id'), index=True)
timestamp: Mapped[datetime] = mapped_column(default=datetime.utcnow,
index=True)
author: Mapped['BlogAuthor'] = relationship(back_populates='articles')
product: Mapped[Optional['Product']] = relationship(
back_populates='blog_articles')
def __repr__(self):
return f'BlogArticle({self.id}, "{self.title}")'
class BlogAuthor(Model):
__tablename__ = 'blog_authors'
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(64), index=True)
articles: WriteOnlyMapped['BlogArticle'] = relationship(
back_populates='author')
def __repr__(self):
return f'BlogAuthor({self.id}, "{self.name}")'BlogArticle 模型包含标题、指向作者和产品的外键以及发布日期。BlogAuthor 模型仅存储作者姓名。这两个模型的标识列均为标准的自增整数主键。
此次改动引入了两个新的“一对多”关系。一个是作者与文章之间的关系,通过 BlogAuthor.articles 和 BlogArticle.author 属性管理;另一个是产品与文章之间的关系,通过 Product.articles 和 BlogArticle.product 管理。
BlogArticle.product_id 外键是本项目中第一个被定义为可选的字段,意味着一篇博客文章不必必须关联到某个产品。其理念是公司博客将涵盖多种类型文章——有些专门与特定产品相关(如评测),另一些则更为通用。同时,BlogArticle.product 关系对象也被标记为可选类型,因为当 product_id 未设置时,该值将为 None。
第二部分:博客会话与浏览记录
在本实现的第二个阶段中,引入博客用户与会话的概念。无论是已知客户还是匿名访客,每次访问博客的用户都将被视为“博客用户”。
假设 RetroFun 网站将通过 Cookie 或类似机制来跟踪访问者。每当用户进入博客时,系统会为该用户创建一个新的会话,并记录该次访问中所浏览的所有博客文章。下一段代码展示了 BlogUser 和 BlogSession 模型的定义。
models.py:博客用户与会话
class Customer(Model):
# ...
blog_users: WriteOnlyMapped['BlogUser'] = relationship(
back_populates='customer')
# ...
class BlogUser(Model):
__tablename__ = 'blog_users'
id: Mapped[UUID] = mapped_column(default=uuid4, primary_key=True)
customer_id: Mapped[Optional[UUID]] = mapped_column(
ForeignKey('customers.id'), index=True)
customer: Mapped[Optional['Customer']] = relationship(
back_populates='blog_users')
sessions: WriteOnlyMapped['BlogSession'] = relationship(
back_populates='user')
def __repr__(self):
return f'BlogUser({self.id.hex})'
class BlogSession(Model):
__tablename__ = 'blog_sessions'
id: Mapped[UUID] = mapped_column(default=uuid4, primary_key=True)
user_id: Mapped[UUID] = mapped_column(ForeignKey('blog_users.id'),
index=True)
user: Mapped['BlogUser'] = relationship(back_populates='sessions')
def __repr__(self):
return f'BlogSession({self.id.hex})'用户和会话均使用 UUID 作为主键,因为在 Web 应用中这些标识符可能存储在 Cookie 中而可能被访客看到。如前所述,在标识符可能被公开暴露且可能泄露底层数据表规模的情况下,不建议使用自增数字主键。
还引入了两个一对多的关系。一个是博客用户与博客会话之间的关系,另一个是客户与博客用户之间的关系。这两个关系的设定基于一个前提:RetroFun 网站能够“记住”不同访问中的用户。用户跟踪逻辑的一种可能实现方式如下:
支持客户与博客用户之间关联的 BlogUser.customer_id 外键被定义为可选字段,因为大多数博客用户并非客户。此外,客户也可能在未登录的情况下访问博客,此时将不会被识别。此方案的关键在于,RetroFun 网站应努力将博客用户与客户进行匹配,这将有助于生成更丰富的报告,稍后您将看到这一点。
可能有人还不清楚,为何要将客户与博客用户的关系设计为一对多,而非一对一。原因是,一个人在 RetroFun 数据库中未必只对应一个博客用户。例如,当匿名访客用手机打开网站,之后又在笔记本电脑上访问,系统会分别创建两个不同的博客用户。若该用户后续完成购买成为客户,则希望最终能将该客户与这两个博客用户都关联起来,以便分析其在手机和笔记本上的行为。
接下来需要建立的是博客文章与会话之间的关联,用于记录某个用户在特定会话中浏览了哪些文章——换句话说,就是记录页面浏览情况的表。每当博客用户访问一篇博客文章时,RetroFun 网站就会向该表中插入一条记录。考虑到这个新表的作用,它显然是文章与会话之间多对多关系的连接表,因为一篇文章可在多个会话中被查看,而一个会话中也可查看多篇文章。
models.py: Blog views
class BlogArticle(Model):
# ...
views: WriteOnlyMapped['BlogView'] = relationship(back_populates='article')
# ...
class BlogSession(Model):
# ...
views: WriteOnlyMapped['BlogView'] = relationship(back_populates='session')
# ...
class BlogView(Model):
__tablename__ = 'blog_views'
id: Mapped[int] = mapped_column(primary_key=True)
article_id: Mapped[int] = mapped_column(ForeignKey('blog_articles.id'))
session_id: Mapped[UUID] = mapped_column(ForeignKey('blog_sessions.id'))
timestamp: Mapped[datetime] = mapped_column(default=datetime.utcnow,
index=True)
article: Mapped['BlogArticle'] = relationship(back_populates='views')
session: Mapped['BlogSession'] = relationship(back_populates='views')对于这种关系,记录页面访问的时间戳非常有用,因此必须使用高级的关联对象(association object)方式来实现。
这种关系带来了一个在前面的多对多关系中未曾遇到的问题:用户可能在同一会话中多次查看同一篇文章,例如在浏览器中刷新页面。这意味着可能存在多个 BlogView 记录,它们具有相同的文章和会话外键。
如你所记,多对多关系通常将两个外键组合作为连接表的复合主键,以防止重复记录。防止重复在某些情况下很有用,但对于这种特定关系,应允许重复,以便准确统计所有页面浏览量。所做的调整是使用标准的自增数字 ID 作为主键,而不是由两个外键组成的复合主键。
为了帮助你跟踪当前的数据库结构,这里有一个图表展示了截至目前的所有表及其关系。
信不信由你,仅此而已就可以实现一个基本页面分析功能的存储方案。现在该迁移数据库,以记录并应用所有这些更改:
(venv) $ alembic revision --autogenerate -m "blog integration"
(venv) $ alembic upgrade head导入脚本
与之前所有表一样,添加一些数据有助于进行查询实验。以下是一个从 CSV 文件导入文章和作者的脚本:
import_articles.py: 文章导入脚本
import csv
from datetime import datetime
from sqlalchemy import select, delete
from db import Session
from models import BlogArticle, BlogAuthor, Product, BlogView, BlogSession, \
BlogUser
def main():
with Session() as session:
with session.begin():
session.execute(delete(BlogView))
session.execute(delete(BlogSession))
session.execute(delete(BlogUser))
session.execute(delete(BlogArticle))
session.execute(delete(BlogAuthor))
with Session() as session:
with session.begin():
all_authors = {}
all_products = {}
with open('articles.csv') as f:
reader = csv.DictReader(f)
for row in reader:
author = all_authors.get(row['author'])
if author is None:
author = BlogAuthor(name=row['author'])
all_authors[author.name] = author
product = None
if row['product']:
product = all_products.get(row['product'])
if product is None:
product = session.scalar(select(Product).where(
Product.name == row['product']))
all_products[product.name] = product
article = BlogArticle(
title=row['title'],
author=author,
product=product,
timestamp=datetime.strptime(
row['timestamp'], '%Y-%m-%d %H:%M:%S'
),
)
session.add(article)
if __name__ == '__main__':
main()该脚本使用了之前导入器中相同的技巧来在数据库中创建 BlogArticle 和 BlogAuthor 条目,因此应该是自解释的。
脚本中引用的 articles.csv 文件必须复制到项目目录。你可以从本书的 GitHub 仓库下载此文件。请注意,该数据文件中使用的文章标题和作者姓名是由假数据生成器创建的,因此并非真实内容。
运行以下命令执行脚本并将文章和作者导入数据库:
(venv) $ python import_articles.py下一个脚本导入页面浏览记录,以及博客用户和会话。以下是其代码:
import_views.py: 博客页面浏览记录导入脚本
import csv
from datetime import datetime
from uuid import UUID
from sqlalchemy import select, delete
from db import Session
from models import BlogArticle, BlogUser, BlogView, BlogSession, Customer
def main():
with Session() as session:
with session.begin():
session.execute(delete(BlogView))
session.execute(delete(BlogSession))
session.execute(delete(BlogUser))
with Session() as session:
all_articles = {}
all_customers = {}
all_blog_users = {}
all_blog_sessions = {}
with open('views.csv') as f:
reader = csv.DictReader(f)
i = 0
for row in reader:
user = all_blog_users.get(row['user'])
if user is None:
customer = None
if row['customer']:
customer = all_customers.get(row['customer'])
if customer is None:
customer = session.scalar(select(Customer).where(
Customer.name == row['customer']))
all_customers[customer.name] = customer
user_id = UUID(row['user'])
user = BlogUser(id=user_id, customer=customer)
session.add(user)
all_blog_users[row['user']] = user
blog_session = all_blog_sessions.get(row['session'])
if blog_session is None:
session_id = UUID(row['session'])
blog_session = BlogSession(id=session_id, user=user)
session.add(blog_session)
all_blog_sessions[row['session']] = blog_session
article = all_articles.get(row['title'])
if article is None:
article = session.scalar(select(BlogArticle).where(
BlogArticle.title == row['title']))
all_articles[article.title] = article
view = BlogView(
article=article,
session=blog_session,
timestamp=datetime.strptime(
row['timestamp'], '%Y-%m-%d %H:%M:%S'),
)
session.add(view)
i += 1
if i % 100 == 0:
print(i)
session.commit()
print(i)
session.commit()
if __name__ == '__main__':
main()该导入器使用数据库会话的方式与前几个不同。之所以改变,是因为页面浏览表可能包含的数据量远大于其他表。为反映这一现实,示例数据的 CSV 文件要大得多,如果将所有导入数据累积到一个会话并在最后提交,将非常不切实际。
因此,该导入器不依赖 session.begin() 上下文管理器在结束时提交,而是统计已导入的行数,并每 100 行显式提交一次。实现此逻辑使用了计数器。以下是与此功能相关的具体代码(其余部分省略):
i = 0
for row in reader:
# ... import the row
i += 1
if i % 100 == 0:
print(i)
session.commit()
print(i)
session.commit()print(i) 语句会在终端输出 100、200 等,以显示进度。底部的第二个打印和提交确保在循环退出前最后一批已导入的行也能被保存。
从本书的 GitHub 仓库下载 views.csv 文件。如上所述,这是一个相当大的数据文件(约 19MB),因此根据你的网络连接速度,下载可能需要一些时间。
按如下方式运行导入器:
(venv) $ python import_views.py一旦导入过程开始,你将看到终端中不断输出 100 的倍数。由于这是一个较大的数据文件,脚本需要几分钟才能处理完整个 CSV 文件,该文件约有 138,000 行。
页面分析查询
有了数据库中的所有这些新信息,可以进行许多非常有趣的查询。打开一个新的 Python 会话,导入所有必需的依赖项,并创建一个新数据库会话:
>>> from datetime import datetime
>>> from sqlalchemy import select, func
>>> from db import Session
>>> from models import BlogArticle, BlogView, Product
>>> session = Session()首先从最显而易见的问题说起,你可能想知道在特定时间段内总共浏览了多少页。接下来的查询计算的是2022年11月的总页面浏览量:
>>> q = (select(func.count(BlogView.id))
.where(BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1))))
>>> session.scalar(q)
4034随后的查询展示了博客文章从最多到最少浏览量的排名,同样针对2022年11月:
>>> page_views = func.count(BlogView.id).label(None)
>>> q = (select(BlogArticle.title, page_views)
.join(BlogArticle.views)
.where(BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1)))
.group_by(BlogArticle)
.order_by(page_views.desc(), BlogArticle.title))
>>> session.execute(q).all()
[('Boy itself fish traditional', 57), ..., , ('Still defense foreign social', 1)]如果你将这个查询报告的所有浏览量相加,总数将达到4034次,这完全合理,因为该查询与前一个查询获取的正是相同的页面浏览量,只是组织方式不同而已。
如你所记,BlogArticle 可以通过与产品的关系(BlogArticle.product)关联到某个特定产品。这使得可以跨关系导航,并为每个产品生成与其相关的页面浏览量报告:
>>> page_views = func.count(BlogView.id).label(None)
>>> q = (select(Product.name, page_views)
.join(Product.blog_articles)
.join(BlogArticle.views)
.where(BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1)))
.group_by(Product)
.order_by(page_views.desc()))
>>> session.execute(q).all()
[('ZX Spectrum', 1096), ('Commodore 64', 1056), ('Apple II', 349),
('TRS-80 Color Computer', 349), ('Amiga', 301), ('BBC Micro', 180), ('TI-99/4A', 133),
('Commodore 128', 77)]现在这是一个有趣的查询。如果将所有页面浏览量相加,总数是3541,而不是之前查询中的4034。你能猜出原因吗?
BlogArticle.product_id 外键被配置为可选列(或在数据库术语中称为“可为空”)。没有产品关联的文章的页面浏览量不会包含在此报告中,因为 join(Product.blog_articles) 子句将 Product 实例与 product 字段匹配的 BlogArticle 实例配对。product_id(以及相应的 product 关系)设置为 None 的 BlogArticle 实例在 Product 侧无法匹配任何内容,因此会被 join 操作省略。
仅包含两个表中匹配行的连接被称为内连接(inner join)。这是 SQLAlchemy 的 join() 方法默认使用的连接类型,也是本书迄今为止唯一使用的类型。但内连接并非唯一可用的连接类型。
另一种连接两个表的方式是使用外连接(outer join),它还会包含关系两侧不匹配的实体。外连接有三种类型:
这到底意味着什么?博客文章和产品之间的全外连接结果将合并三种不同类型的记录:
注意: 遗憾的是,外连接的支持在不同数据库之间并不统一。
SQLite 支持所有类型的外连接,但全外连接和右外连接是在2022年发布的3.39.0版本中新增的,目前并未广泛部署。如果使用旧版本尝试执行这些类型的连接,将会返回错误。
MySQL 支持左外连接和右外连接,但截至2023年4月仍不支持全外连接。
PostgreSQL 支持所有类型的外连接。
SQLAlchemy 仅实现了全外连接和左外连接。如果需要右外连接,必须交换表的顺序,以便使用左外连接来实现。
将上一个查询中的第一个连接改为全外连接,可确保检索到所有博客文章(在该连接中位于右侧),而不仅仅是那些能与产品匹配的文章。这样,后续对 BlogView 的连接就不会丢失任何页面浏览记录。只需将默认的内连接转换为全外连接,方法是在 join() 子句中添加 full=True 参数即可:
>>> q = (select(Product.name, page_views)
.join(Product.blog_articles, full=True)
.join(BlogArticle.views)
.where(BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1)))
.group_by(Product)
.order_by(page_views.desc()))
>>> session.execute(q).all()
[('ZX Spectrum', 1096), ('Commodore 64', 1056), (None, 493), ('Apple II', 349),
('TRS-80 Color Computer', 349), ('Amiga', 301), ('BBC Micro', 180), ('TI-99/4A', 133),
('Commodore 128', 77)]注意,这些新结果中包含一个产品字段为 None 的条目,其中包含了之前报告中遗漏的 493 次页面浏览。
该查询也可以使用右外连接实现,甚至可能更高效,但如前所述,SQLAlchemy 目前不支持这种连接类型。还需注意的是,并非所有数据库都实现了全外连接,因此根据所选数据库的不同,上述查询可能会因数据库错误而失败。特别是,该查询在 MySQL 或旧版本的 SQLite 中无法运行。
当全外连接不可用时该怎么办?幸运的是,此查询并不需要全外连接,只需要右外连接即可。诀窍是反转连接方向,然后使用左外连接,后者可通过在 join() 子句中添加 isouter=True 参数来生成。以下是具体做法:
>>> q = (select(Product.name, page_views)
.join(BlogArticle.product, isouter=True)
.join(BlogArticle.views)
.where(BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1)))
.group_by(Product)
.order_by(page_views.desc()))
>>> session.execute(q).all()
[('ZX Spectrum', 1096), ('Commodore 64', 1056), (None, 493), ('Apple II', 349),
('TRS-80 Color Computer', 349), ('Amiga', 301), ('BBC Micro', 180), ('TI-99/4A', 133),
('Commodore 128', 77)]在此版本查询中,第一个连接从 Product.blog_articles 关系切换到了 BlogArticle.product。这两个属性分别代表了产品和博客文章之间的双向关联,因此这一更改实际上反转了连接方向,将博客文章置于左侧、产品置于右侧,从而可以使用左外连接检索没有对应产品的博客文章。
有趣的是,上面使用全外连接和右外连接的查询返回的结果完全一致。按理说,使用全外连接时结果中应包含更多数据,对吧?结果显示了与产品关联的博客文章的浏览量,以及未关联产品的博客文章的浏览量。但在使用全外连接时,结果还应包含所有没有对应博客文章的产品,这些产品应显示为零浏览量。为什么这些数据缺失了?
要理解这一点,必须回顾该查询全外连接版本的其他部分。在全外连接执行后,还有一个与 BlogView 实体的连接,这是一个默认的内连接。这个第二次连接利用 BlogArticle.views 关系作为匹配列,将全外连接产生的(产品,文章)配对与 BlogView 记录进行匹配。由于这是内连接,第一次连接返回的所有(Product, None)配对都会被丢弃,因为配对中的文章部分为 None,永远无法与 BlogView 侧的任何内容匹配。
如果目标是保留那些没有博客浏览记录的产品,则第二次连接也必须升级为全外连接,这样才能确保结果中保留所有(Product, BlogArticle, BlogView)三元组——其中文章和浏览记录均为 None。
但如果将第二个连接改为全外连接,部分结果中 BlogView 实体对应的值将为 None。此查询中的 where() 子句使用了 BlogView.timestamp,因此必须更新以允许不仅包含关注时间段内的页面浏览量,还包含那些为 None 的页面浏览量。这可以通过 or_() 函数实现。以下是最终查询:
>>> from sqlalchemy import or_
>>> q = (select(Product.name, page_views)
.join(Product.blog_articles, full=True)
.join(BlogArticle.views, full=True)
.where(or_(
BlogView.timestamp == None,
BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1))))
.group_by(Product)
.order_by(page_views.desc(), Product.name))
>>> session.execute(q).all()
[('ZX Spectrum', 1096), ('Commodore 64', 1056), (None, 493), ..., ('ZX80', 0), ('ZX81', 0)]该查询返回了所有产品的报告,包括指定时间段内的页面浏览量,涵盖未与产品关联的通用文章以及没有相关文章或没有博客浏览量的产品。当然,这个完整版本需要使用全外连接,而某些数据库并不支持该操作。
仅使用左外连接生成最后一个查询较为困难,因为无论连接方式如何,总有一侧未匹配的实体无法返回结果。模拟全外连接的常见方法是运行两个查询而非一个。上述使用的左外连接查询可用于获取带有页面浏览量的产品列表,以及未分配产品的文章的页面浏览量。然后可以使用第二个查询获取没有任何博客浏览量的产品,原因可能是这些产品在博客中没有内容,或者其内容尚未被任何人浏览。以下是检索这些数据的查询:
>>> q2 = (select(Product.name, page_views)
.join(Product.blog_articles, isouter=True)
.join(BlogArticle.views, isouter=True)
.where(or_(
BlogView.timestamp == None,
BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1))))
.group_by(Product)
.having(page_views == 0)
.order_by(Product.name))
>>> session.execute(q2).all()
[('464 Plus', 0), ('6128 Plus', 0), ..., ('ZX80', 0), ('ZX81', 0)]对于此查询,产品与博客文章进行连接,生成的配对再与博客浏览记录连接。这两个连接均为左外连接,这意味着即使产品在关注时间段内没有匹配的博客文章或浏览记录,仍会保留在结果中。having() 子句会过滤掉页面浏览量非零的结果,因为这些数据已由第一个查询捕获。
现在可以在 Python 中合并这两个查询的结果;或者,如果更倾向于让数据库完成合并,也可以使用 SQLAlchemy 的 union() 函数。以下展示了如何编写两个查询 q1 和 q2,并通过 union 操作符合并为 q:
>>> from sqlalchemy import union
>>> q1 = (select(Product.name, page_views)
.join(BlogArticle.product, isouter=True)
.join(BlogArticle.views)
.where(BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1)))
.group_by(Product))
>>> q2 = (select(Product.name, page_views)
.join(Product.blog_articles, isouter=True)
.join(BlogArticle.views, isouter=True)
.where(or_(
BlogView.timestamp == None,
BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1))))
.group_by(Product)
.having(page_views == 0))
>>> q = union(q1, q2).order_by(page_views.desc(), Product.name)
>>> session.execute(q).all()
[('ZX Spectrum', 1096), ('Commodore 64', 1056), (None, 493), ..., ('ZX80', 0), ('ZX81', 0)]一旦开始使用外连接,你会发现许多原本使用内连接的查询可以通过升级为某种外连接类型而得到优化。例如,本章前面提到的按文章统计页面浏览量的查询并未包含任何页面浏览量为零的文章,因为博客文章与博客浏览记录之间的内连接已将这些文章排除在外。若将此查询改为使用左外连接,则可保留无浏览量的文章。以下是更新后的查询:
>>> page_views = func.count(BlogView.id).label(None)
>>> q = (select(BlogArticle.title, page_views)
.join(BlogArticle.views, isouter=True)
.where(or_(
BlogView.timestamp == None,
BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1))))
.group_by(BlogArticle)
.order_by(page_views.desc()))
>>> session.execute(q).all()
[..., ('Prepare culture part budget star organization there', 0)]]更新后的查询显示,2022年11月只有一篇文章未获得任何页面浏览量,在结果列表底部显示浏览量为零。
为使该查询正常工作,不仅需要将连接改为左外连接,还需扩展 where() 子句以接受 BlogView 为 None 的情况,如前一个查询所示。
第三部分:多语言博客文章
像许多公司一样,RetroFun 希望拓展国际市场。为此,他们除了英语外还会创作其他语言的原创博客内容,同时也有专门团队负责将成功的英文博客文章翻译成其他语言。
为了使网页分析项目更加实用,在本项目的第三也是最后阶段,你将学习如何扩展数据库,以跟踪每篇文章所使用的语言,以及哪些文章是翻译自其他文章而非原创内容。这将带来许多从数据中提取的有趣报告。
第一个改动是添加一个 Language 模型,以及在语言和博客文章之间建立一对多关系。
models.py:博客文章的语言
class BlogArticle(Model):
# ...
language_id: Mapped[Optional[int]] = mapped_column(
ForeignKey('languages.id'), index=True)
# ...
language: Mapped[Optional['Language']] = relationship(
back_populates='blog_articles')
# ...
class Language(Model):
__tablename__ = 'languages'
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(32), index=True, unique=True)
blog_articles: WriteOnlyMapped['BlogArticle'] = relationship(
back_populates='language')
def __repr__(self):
return f'Language({self.id}, "{self.name}")'这种新的一对多关系与之前的关系非常相似,因此不会带来任何挑战。
然而,追踪原始内容与译文之间的关系似乎有些棘手。哪种关系类型适用于此用例?一篇原始文章可以有多个译文,而每个译文只能有一个原始来源。这表明这应该是一对多的关系。
关系的两个实体是什么?很明显,关系中的“一”端将是原始文章,即 BlogArticle 模型的实例。那么“多”端呢?它们也是文章,对吧?这是本项目中第一次在关系的两边都使用同一实体——此处为 BlogArticle。这被称为自引用关系。
在使用 SQLAlchemy 时,自引用关系需要一些额外的配置。以下是向 BlogArticle 模型添加该关系的更改。
models.py:博客文章的译文
class BlogArticle(Model):
# ...
translation_of_id: Mapped[Optional[int]] = mapped_column(
ForeignKey('blog_articles.id'), index=True)
# ...
translation_of: Mapped[Optional['BlogArticle']] = relationship(
remote_side=id, back_populates='translations')
translations: Mapped[list['BlogArticle']] = relationship(
back_populates='translation_of')
# ...translation_of_id 外键的定义方式与其他外键相同,唯一的区别是它引用的是同一表中的主键。
现在需要将表示关系两端的两个 relationship 属性添加到同一个模型类中,这需要格外小心。SQLAlchemy 难以自动区分自引用关系中哪一个是“一”端,因此在引用“一”端的 relationship() 定义中添加 remote_side 参数以消除歧义。在此例中,translation_of 关系的 remote_side 参数被设置为 id 主键,这足以让 SQLAlchemy 理解该关系指向“一”端,因此另一个关系即为包含“多”端的列表。
下图展示了在向 blog_articles 表添加新的 translation_of_id 外键列后的数据库表结构图。这是 RetroFun 数据库的最终版本。
这些更改现在可以纳入数据库迁移:
(venv) $ alembic revision --autogenerate -m "multi-language support"
(venv) $ alembic upgrade head之前用于导入文章和作者的 articles.csv 数据文件已经包含了 language 和 translation_of 列,但 import_articles.py 脚本忽略了这些信息。借助新的多语言支持,可以对该文件进行第二次遍历,以导入这些额外信息。下面展示的 import_languages.py 脚本正是这样做的。
import_languages.py:导入语言及翻译关系
import csv
from sqlalchemy import select
from db import Session
from models import BlogArticle, Language
def main():
with Session() as session:
with session.begin():
all_articles = {}
all_languages = {}
with open('articles.csv') as f:
reader = csv.DictReader(f)
for row in reader:
article = all_articles.get(row['title'])
if article is None:
article = session.scalar(select(BlogArticle).where(
BlogArticle.title == row['title']))
all_articles[article.title] = article
language = all_languages.get(row['language'])
if language is None:
language = session.scalar(select(Language).where(
Language.name == row['language']))
if language is None:
language = Language(name=row['language'])
session.add(language)
all_languages[language.name] = language
article.language = language
if row['translation_of']:
translation_of = all_articles.get(
row['translation_of'])
if translation_of is None:
translation_of = session.scalar(select(
BlogArticle).where(BlogArticle.title ==
row['translation_of']))
all_articles[article.title] = article
article.translation_of = translation_of
if __name__ == '__main__':
main()该脚本读取 articles.csv 文件的行,仅关注文章的标题、language 和 translation_of 列。本脚本不插入任何文章,而是直接通过标题从数据库加载文章,因为假设 import_articles.py 脚本已经导入了所有文章。
对于语言支持,该脚本会将对应的 Language 实例分配给 BlogArticle.language 属性,并在首次出现某种语言时创建新的 Language 实例。与之前类似,all_languages 字典会缓存目前已添加的所有语言,方便使用。
CSV 文件中的 translation_of 列对原文为空,因此脚本首先检查该列是否有值。若存在值,则表示文章已被翻译,该字段的值即为原文的标题。随后,翻译文章会通过 translation_of 自引用关系关联到其原文。
运行该脚本来建立语言和翻译之间的关系:
(venv) $ python import_languages.py语言查询
语言支持为可生成的查询增加了新的维度。打开一个新的 Python 会话并导入实验所需的常用组件:
>>> from datetime import datetime
>>> from sqlalchemy import select, func
>>> from models import Language, BlogArticle, BlogView
>>> from db import Session
>>> session = Session()下面是一个简单的查询,返回每种语言的文章数量:
>>> q = (select(Language, func.count(BlogArticle.id))
.join(Language.blog_articles)
.group_by(Language)
.order_by(Language.name))
>>> session.execute(q).all()
[(Language(1, "English"), 108), (Language(3, "French"), 25), (Language(2, "German"), 21),
(Language(4, "Italian"), 13), (Language(6, "Portuguese"), 25), (Language(5, "Spanish"), 17)]这些统计包含所有文章,无论是否为原文或译文。接下来的两个查询分别统计原文和译文的数量:
>>> q = (select(Language, func.count(BlogArticle.id))
.join(Language.blog_articles)
.where(BlogArticle.translation_of == None)
.group_by(Language)
.order_by(Language.name))
>>> session.execute(q).all()
[(Language(1, "English"), 108), (Language(3, "French"), 11), (Language(2, "German"), 6),
(Language(4, "Italian"), 5), (Language(6, "Portuguese"), 7), (Language(5, "Spanish"), 6)]>>> q = (select(Language, func.count(BlogArticle.id))
.join(Language.blog_articles)
.where(BlogArticle.translation_of != None)
.group_by(Language)
.order_by(Language.name))
>>> session.execute(q).all()
[(Language(3, "French"), 14), (Language(2, "German"), 15), (Language(4, "Italian"), 8),
(Language(6, "Portuguese"), 18), (Language(5, "Spanish"), 11)]这些查询利用 BlogArticle.translation_of 关系来区分原文和译文,因为原文的该属性始终设为 None。
注意: 在编写 SQLAlchemy 表达式以与 None 进行比较时,必须使用 == 和 != 运算符。许多 Python 开发者更倾向于使用 is None 或 is not None,但 SQLAlchemy 无法将这些转换为 SQL 表达式。
下一个查询较为复杂。目标是生成一份报告,列出每篇原文及其可用翻译的数量。解决方案是查询所有原文与译文配对,然后按原文分组并对译文应用计数函数。
>>> from sqlalchemy.orm import aliased
>>> TranslatedBlogArticle = aliased(BlogArticle)
>>> article_count = func.count(TranslatedBlogArticle.id).label(None)
>>> q = (select(BlogArticle, article_count)
.join(TranslatedBlogArticle.translation_of)
.group_by(BlogArticle)
.order_by(article_count.desc(), BlogArticle.title))
>>> session.execute(q).all()
[(BlogArticle(63, "Business seven ability cup church similar itself"), 3), ...,
(BlogArticle(1, "Within across act song"), 1)]此查询使用了 aliased 函数,您此前尚未见过。为了将文章与其译文配对,需要在 self-referential translation_of 关系上执行连接操作。但由于关系两侧是同一张表,会产生复杂性——当两侧的表名相同时,无法独立引用左侧或右侧。SQL 通过别名解决这种歧义。为其中一侧赋予新名称后,就可以将同一张表的两个实例视为不同对象进行操作。上面创建的 TranslatedBlogArticle 别名代表从译文视角看 many-to-one 关系中的左侧,即译文指向其原文的那一侧。
现在有了 TranslatedBlogArticle 和 BlogArticle,并在它们之间建立了连接。join(TranslatedBlogArticle.translation_of) 表达式会在左侧使用别名化的表,右侧使用原始 BlogArticle。
为了统计译文数量,需对 TranslatedBlogArticle 实例使用 count() 函数进行聚合。该计数表达式被赋予标签,以便在 order_by() 子句中重复使用,如之前多次所做的那样。
这种数据库设计极其灵活,允许进行更复杂、更有趣的查询。假设公司希望生成一份按文章统计页面浏览量的报告(类似于之前生成的报告),但附加一个额外的复杂性:仅列出原创文章,并汇总包含其翻译版本的页面浏览量。为了与之前的查询保持一致,该报告还需涵盖2022年11月的页面浏览量。
如你所忆,返回每篇文章页面浏览量的查询通过连接 BlogArticle 和 BlogView 模型,然后按 BlogArticle 分组并使用 count() 聚合函数来返回每个组中有多少行。为了能够将翻译文章的页面浏览量与原始文章一起包含在内,需要在每一行的结果中有一个列引用原始文章,然后可以使用这个列对结果进行分组。
BlogArticle.translation_of 关系指向原始文章,但对于原始文章本身,此关系设置为 None。此查询需要的是一列作为条件:当文章是原创时,应引用同一篇文章;而当文章是翻译版本时,则应引用父文章。
SQL 语言提供了一种可用于解决此问题的条件语句——CASE 结构,在 SQLAlchemy 中可通过 case() 函数使用。以下是一个带标签的列定义示例,展示了该函数的使用方式:
>>> from sqlalchemy import case
>>> original_id = case(
(BlogArticle.translation_of == None, BlogArticle.id),
else_=BlogArticle.translation_of_id).label(None)case() 函数接受一个或多个元组作为参数。每个元组的第一个元素是条件,第二个元素是值。case 表达式的结果将取第一个条件为 True 的元组中的值。else_ 参数用于指定当所有元组的条件均为 False 时应使用的值。
在上述定义中,case() 只有一个条件,用于检查 translation_of 关系是否为 None,这表明该文章是原创的。此时,该列将被赋值为文章的 id。当条件为 False 时,else_ 参数提供了来自 translation_of_id 属性的替代值,其中包含父文章的 id。
注意: 尽可能优先使用 ORM 实体,但由于 case() 函数中使用的值无法设置为模型实体,因此此处使用的是主键标识符而非实体对象。
现在,original_id 列可以在 group_by() 子句中使用,而不再使用之前的 blog article。
>>> page_views = func.count(BlogView.id).label(None)
>>> q = (select(original_id, page_views)
.join(BlogArticle.views)
.where(BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1)))
.group_by(original_id)
.order_by(page_views.desc()))
>>> session.execute(q).all()
[(171, 136), (76, 107), (98, 99), ..., (112, 2), (31, 2), (201, 1)]遗憾的是,这并不是期望的结果,对吧?该查询按 original_id 值分组,这些是分配给 BlogArticle 实例的数字主键值。为了能够使用 case() 函数,必须使用基本类型值,但现在理想情况下应将这些数字转换回它们所代表的实体。
一个巧妙的小技巧可以解决这个问题:将 original_id 列与 BlogArticle 的一个别名实例进行连接,从而将每个数字与其对应的 BlogArticle 实体关联起来:
>>> OriginalBlogArticle = aliased(BlogArticle)
>>> q = (select(OriginalBlogArticle, page_views)
.join(BlogArticle.views)
.join(OriginalBlogArticle, original_id == OriginalBlogArticle.id)
.where(BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1)))
.group_by(OriginalBlogArticle)
.order_by(page_views.desc()))
>>> session.execute(q).all()
[(BlogArticle(171, "Our activity public responsibility represent"), 136), ...,
(BlogArticle(201, "Exist they particular important note kitchen current"), 1)]这里创建了一个 BlogArticle 模型的第二个实例作为别名,并将其命名为 OriginalBlogArticle。然后向查询添加一个额外的 join() 子句,以将 original_id 值与其进行连接。
join() 调用是第一个没有在其第一个参数中建立关系的操作。当提供了关系时,SQLAlchemy 可以从该关系推导出连接的所有参数。但在此情况下,original_id 值与别名为 OriginalBlogArticle 的模型之间未定义关系,因此 SQLAlchemy 需要更多关于如何执行此连接的具体信息。这种替代形式的 join() 子句将连接的右侧实体作为第一个参数,并将连接条件作为第二个参数。上述使用的连接条件确保每个 original_id 值都能匹配到具有相同标识符的博客文章。
这个查询可以进一步扩展,以返回每个结果所考虑的文章数量。对于没有翻译的文章,返回值为 1;而对于有若干翻译的文章,则能得知有多少篇文章被聚合到了页面浏览结果中。
为此,需要在查询中添加第三列来统计每组中的文章数量。已知 count() 函数用于统计行数,因此使用 count(BlogArticle.id) 会与 page_counts 标签返回相同的结果,因为两者都统计的是代表页面浏览而非博客文章的同一行中的不同列。在 count 函数中加入 distinct() 方法可消除重复项,从而返回正确的文章计数:
>>> q = (select(
OriginalBlogArticle,
page_views,
func.count(BlogArticle.id.distinct())
)
.join(BlogArticle.views)
.join(OriginalBlogArticle, original_id == OriginalBlogArticle.id)
.where(BlogView.timestamp.between(
datetime(2022, 11, 1), datetime(2022, 12, 1)))
.group_by(OriginalBlogArticle)
.order_by(page_views.desc()))
>>> session.execute(q).all()
[(BlogArticle(171, "Our activity public responsibility represent"), 136, 4), ...,
(BlogArticle(201, "Exist they particular important note kitchen current"), 1, 1)]练习题
是否想通过更多查询练习?编写以下查询:
感谢您访问我的博客!如果您喜欢这篇文章,请考虑支持我的工作,并通过 Buy me a coffee 进行小额一次性捐赠,让我保持精力充沛。谢谢!
需要完整排版与评论请前往来源站点阅读。