事务是许多数据库的核心概念,满足 ACID(原子性、一致性、隔离性、持久性)属性。一个事务由一个或多个 SQL 语句组成,这些语句作为一个原子单元执行。如果在事务中执行语句时没有错误,我们就提交这些语句,使更改永久化。如果出现任何错误,我们就会回滚这些语句,就好像它们从未发生过一样。在我们的产品数据库中,如果我们尝试插入重复的品牌,或者违反了设定的数据库约束,可能需要回滚一组更新。
在 asyncpg 中,处理事务最简单的方法是使用 connection.transaction 异步上下文管理器来启动它。如果 async with 块中发生异常,事务将自动回滚。如果一切顺利执行,则自动提交。我们来看看如何创建一个事务并执行两条简单的 insert 语句来添加几个品牌。
import asyncioimport asyncpgasync def main(): connection = await asyncpg.connect(host='127.0.0.1', port=5432, user='postgres', database='products', password='password') async with connection.transaction(): # ❶ await connection.execute("INSERT INTO brand " "VALUES(DEFAULT, 'brand_1')") await connection.execute("INSERT INTO brand " "VALUES(DEFAULT, 'brand_2')") query = """SELECT brand_name FROM brand WHERE brand_name LIKE 'brand%'""" brands = await connection.fetch(query) # ❷ print(brands) await connection.close()asyncio.run(main())
❶ 启动数据库事务。❷ 查询品牌以确保事务已提交。
假设事务成功提交,我们应该在控制台看到 [<Record brand_name='brand_1'>, <Record brand_name='brand_2'>]。这个例子假设两条 insert 语句零错误运行,且全部成功提交。为了演示回滚的情况,我们强制引发一个 SQL 错误。为了测试,我们尝试插入两个具有相同主键 id 的品牌。第一次插入会成功,但第二次插入会引发重复键错误。
import asyncioimport loggingimport asyncpgasync def main(): connection = await asyncpg.connect(host='127.0.0.1', port=5432, user='postgres', database='products', password='password') try: async with connection.transaction(): insert_brand = "INSERT INTO brand VALUES(9999, 'big_brand')" await connection.execute(insert_brand) await connection.execute(insert_brand) # ❶ except Exception: logging.exception('运行事务时出错') # ❷ finally: query = """SELECT brand_name FROM brand WHERE brand_name LIKE 'big_%'""" brands = await connection.fetch(query) # ❸ print(f'查询结果为: {brands}') await connection.close()asyncio.run(main())
❶ 这个插入语句会因重复主键而报错。❷ 如果出现异常,记录错误。❸ 查询品牌以确保未插入任何数据。
在以下代码中,我们的第二个插入语句抛出了错误。这会导致以下输出:
ERROR:root:运行事务时出错Traceback (most recent call last): File "listing_5_10.py", line 16, in main await connection.execute("INSERT INTO brand " File "asyncpg/connection.py", line 272, in execute return await self._protocol.query(query, timeout) File "asyncpg/protocol/protocol.pyx", line 316, in queryasyncpg.exceptions.UniqueViolationError: duplicate key value violates unique constraint "brand_pkey"DETAIL: Key (brand_id)=(9999) already exists.查询结果为: []
我们首先因尝试插入重复键而收到异常,然后发现 select 语句的结果为空,表明事务已成功回滚。
asyncpg 还通过 PostgreSQL 的一个特性——保存点(savepoints),支持嵌套事务的概念。在 PostgreSQL 中,使用 SAVEPOINT 命令定义保存点。当我们定义一个保存点后,可以回滚到该保存点,之后执行的查询将被回滚,但在此保存点之前成功执行的查询则不会被回滚。
在 asyncpg 中,我们可以通过在现有事务内调用 connection.transaction 上下文管理器来创建一个保存点。然后,如果内部事务中发生任何错误,它将被回滚,但外部事务不受影响。我们来试试看:在一个事务中插入一个品牌,然后在嵌套事务中尝试插入一个已在数据库中存在的颜色。
import asyncioimport asyncpgimport loggingasync def main(): connection = await asyncpg.connect(host='127.0.0.1', port=5432, user='postgres', database='products', password='password') async with connection.transaction(): await connection.execute("INSERT INTO brand VALUES(DEFAULT, 'my_new_brand')") try: async with connection.transaction(): await connection.execute("INSERT INTO product_color VALUES(1, 'black')") except Exception as ex: logging.warning('忽略插入产品颜色时的错误', exc_info=ex) await connection.close()asyncio.run(main())
运行此代码时,我们的第一个 INSERT 语句成功执行,因为我们数据库中还没有这个品牌。我们的第二个 INSERT 语句因重复键错误而失败。由于这个插入语句在事务内,我们捕获并记录了异常,尽管出现了错误,但外部事务并未回滚,品牌也正确插入。如果我们没有使用嵌套事务,第二个插入语句也会回滚我们的品牌插入。
到目前为止,我们一直使用异步上下文管理器来处理提交和回滚事务。由于这种方式比手动管理更简洁,通常是首选方案。不过,我们有时可能需要手动管理事务。例如,我们可能希望在回滚时执行自定义代码,或者在非异常条件下回滚。
要手动管理事务,可以使用 connection.transaction 返回的事务管理器,而不使用上下文管理器。此时,我们必须手动调用其 start 方法来启动事务,然后在成功时调用 commit,在失败时调用 rollback。我们来看看如何通过重写第一个例子来实现。
import asyncioimport asyncpgfrom asyncpg.transaction import Transactionasync def main(): connection = await asyncpg.connect(host='127.0.0.1', port=5432, user='postgres', database='products', password='password') transaction: Transaction = connection.transaction() # ❶ await transaction.start() # ❷ try: await connection.execute("INSERT INTO brand " "VALUES(DEFAULT, 'brand_1')") await connection.execute("INSERT INTO brand " "VALUES(DEFAULT, 'brand_2')") except asyncpg.PostgresError: print('出现错误,回滚事务!') await transaction.rollback() # ❸ else: print('无错误,提交事务!') await transaction.commit() # ❹ query = """SELECT brand_name FROM brand WHERE brand_name LIKE 'brand%'""" brands = await connection.fetch(query) print(brands) await connection.close()asyncio.run(main())
❶ 创建事务实例。❷ 启动事务。❸ 如果有异常,回滚。❹ 如果无异常,提交。
我们首先使用与异步上下文管理器语法相同的调用方法创建事务,但这次我们存储了该调用返回的 Transaction 实例。可以把这个类看作事务的管理者,因为我们可以用它执行任何需要的提交和回滚。获得事务实例后,我们可以调用 start 协程。这将执行一个查询来在 PostgreSQL 中启动事务。然后,在 try 块中,我们可以执行任何查询。在这个例子中,我们插入了两个品牌。如果其中任何一个 INSERT 语句出错,我们将进入 except 块并调用 rollback 协程回滚事务。如果没有错误,我们调用 commit 协程,结束事务并将事务中的任何更改永久化到数据库中。
到目前为止,我们运行查询的方式是将所有查询结果一次性拉入内存。这对许多应用来说是合理的,因为很多查询返回的结果集较小。然而,我们可能遇到需要处理大型结果集的情况,这些结果集可能无法一次性放入内存。在这种情况下,我们可能希望流式传输结果,以避免过度消耗系统的随机存取内存(RAM)。接下来,我们将探讨如何使用 asyncpg 实现这一点,同时引入异步生成器。