asyncpg 提供的默认 fetch 实现的一个缺点是,它会将任何查询执行的所有数据拉入内存。这意味着如果一个查询返回数百万行,我们将会尝试将整个结果集从数据库传输到请求机器。回到我们的产品数据库例子,想象我们更加成功,有数十亿件商品。很可能有些查询会返回非常大的结果集,从而影响性能。
当然,我们可以对查询应用 LIMIT 语句并分页,这对许多(如果不是大多数)应用来说是合理的。然而,这种方法也有开销,因为我们多次发送相同的查询,可能会给数据库带来额外压力。如果我们发现自己受制于这些问题,那么仅在需要时才流式传输特定查询的结果是有意义的。这将节省应用层的内存消耗,同时减轻数据库负载。不过,这会增加网络往返次数。
PostgreSQL 通过游标的概念支持流式查询结果。可以把游标看作指向我们当前在遍历结果集中位置的指针。当我们从流式查询中获取一个结果时,我们向前推进游标到下一个元素,依此类推,直到没有更多结果。
使用 asyncpg,我们可以直接从连接获取游标,然后用它执行流式查询。asyncpg 中的游标使用了我们尚未使用过的 asyncio 特性——异步生成器。异步生成器异步地逐个生成结果,类似于普通的 Python 生成器。它们还允许我们使用特殊的 async for 循环语法来遍历任何结果。为了完全理解其工作原理,我们首先介绍异步生成器以及 async for 语法来遍历这些生成器。
许多开发者对同步 Python 中的生成器并不陌生。生成器是著名的“四人帮”(Addison-Wesley Professional, 1994)在《设计模式:可复用面向对象软件的基础》一书中推广的迭代器设计模式的实现。该模式允许我们“懒惰”地定义数据序列,并逐个迭代。这对于潜在的大序列数据很有用,因为我们不需要一次性将所有数据存入内存。
一个简单的同步生成器是一个包含 yield 语句而不是 return 语句的普通 Python 函数。例如,我们来看看如何创建和使用一个生成器来返回从零开始到指定终点的正整数。
def positive_integers(until: int): for integer in range(until): yield integerpositive_iterator = positive_integers(2)print(next(positive_iterator))print(next(positive_iterator))
在上面的代码中,我们创建了一个函数,接受一个我们想要计数到的整数。然后我们开始一个循环,直到指定的终点整数。在循环的每次迭代中,我们 yield 序列中的下一个整数。当我们调用 positive_integers(2) 时,我们不会返回一个完整的列表,甚至不会运行方法内的循环。事实上,如果我们检查 positive_iterator 的类型,我们会得到 <class 'generator'>。
然后我们使用 next 工具函数来遍历我们的生成器。每次调用 next,都会触发 positive_integers 中 for 循环的一次迭代,给我们提供每次迭代 yield 语句的结果。因此,列表 5.13 的代码将向控制台打印 0 和 1。我们也可以使用 for 循环与我们的生成器来遍历生成器中的所有值。
这适用于同步方法,但如果我们想使用协程来异步生成一系列值呢?以我们的数据库为例,如果我们想生成一系列行,这些行是从数据库“懒惰”获取的呢?我们可以使用 Python 的异步生成器和特殊的 async for 语法来实现。为了演示一个简单的异步生成器,我们从正整数的例子开始,但引入一个需要几秒钟才能完成的协程调用。我们将使用第 2 章的 delay 函数来实现。
import asynciofrom util import delay, async_timedasync def positive_integers_async(until: int): for integer in range(1, until): await delay(integer) yield integer@async_timed()async def main(): async_generator = positive_integers_async(3) print(type(async_generator)) async for number in async_generator: print(f'得到数字 {number}')asyncio.run(main())
运行上面的代码,我们会看到类型不再是普通的生成器,而是 <class 'async_generator'>,即异步生成器。异步生成器与普通生成器的不同之处在于,它不是生成普通的 Python 对象,而是生成我们可以 await 直到获得结果的协程。由于这个原因,我们正常的 for 循环和 next 函数无法与这些类型的生成器一起工作。相反,我们有一个特殊的语法 async for 来处理这些类型的生成器。在这个例子中,我们使用这种语法来遍历 positive_integers_async。
该代码将打印数字 1 和 2,第一次返回前等待 1 秒,第二次返回前等待 2 秒。请注意,这并不是并发生成和等待协程;相反,它是按系列生成和等待它们。
异步生成器的概念与流式数据库游标的概念非常契合。使用这些生成器,我们可以用一个类似 for 循环的语法逐行获取结果。要使用 asyncpg 进行流式传输,我们首先需要启动一个事务,因为 PostgreSQL 要求使用游标时必须先启动事务。启动事务后,我们可以调用 Connection 类的 cursor 方法来获取游标。调用 cursor 方法时,我们传入希望流式的查询。该方法将返回一个异步生成器,我们可以用它逐个流式传输结果。
为了熟悉如何操作,让我们运行一个查询,获取数据库中的所有产品,使用游标。然后,我们使用 async for 语法逐个从结果集中获取元素。
import asyncpgimport asyncioasync def main(): connection = await asyncpg.connect(host='127.0.0.1', port=5432, user='postgres', database='products', password='password') query = 'SELECT product_id, product_name FROM product' async with connection.transaction(): async for product in connection.cursor(query): print(product) await connection.close()asyncio.run(main())
上面的代码将逐个打印出我们的所有产品。尽管我们在这个表中放了 1000 个产品,但我们只会将少量数据拉入内存。在写作时,为了减少网络流量,游标默认每次预取 50 条记录。我们可以通过设置 prefetch 参数来改变这种行为,指定我们想要预取的元素数量。
我们还可以使用这些游标在结果集中跳转,并一次获取任意数量的行。我们来看看如何从刚才使用的查询中间部分获取几条记录。
import asyncpgimport asyncioasync def main(): connection = await asyncpg.connect(host='127.0.0.1', port=5432, user='postgres', database='products', password='password') async with connection.transaction(): query = 'SELECT product_id, product_name from product' cursor = await connection.cursor(query) # ❶ await cursor.forward(500) # ❷ products = await cursor.fetch(100) # ❸ for product in products: print(product) await connection.close()asyncio.run(main())
❶ 为查询创建游标。❷ 将游标向前移动 500 条记录。❸ 获取接下来的 100 条记录。
上面代码中的 forward 方法会有效地跳过产品表中的前 500 条记录。移动游标后,我们获取接下来的 100 个产品并逐一打印到控制台。
这些类型的游标默认是非滚动的,意味着我们只能向前移动结果集。如果你想使用可以前后移动的滚动游标,需要用 SQL 手动执行,使用 DECLARE ... SCROLL CURSOR(你可以在 https://www.postgresql.org/docs/current/plpgsql-cursors.html(https://www.postgresql.org/docs/current/plpgsql-cursors.html) 查阅更多相关内容)。
这两种技术在我们有非常大的结果集且不希望所有数据都驻留在内存中时都非常有用。我们在列表 5.16 中看到的 async for 循环对于遍历整个集合很有用,而创建游标并使用 fetch 协程方法则适合获取一块记录或跳过一组记录。
然而,如果我们只想以预取方式获取固定数量的元素,并仍然使用 async for 循环怎么办?我们可以在 async for 循环中添加一个计数器,看到一定数量的元素后就退出,但这在代码中频繁使用时并不特别可复用。我们可以做的更简单的方法是构建自己的异步生成器。我们将这个生成器命名为 take。这个生成器将接收一个异步生成器和我们希望提取的元素数量。我们来研究如何创建它,并从结果集中获取前五个元素。
import asyncpgimport asyncioasync def take(generator, to_take: int): item_count = 0 async for item in generator: if item_count > to_take - 1: return item_count = item_count + 1 yield itemasync def main(): connection = await asyncpg.connect(host='127.0.0.1', port=5432, user='postgres', database='products', password='password') async with connection.transaction(): query = 'SELECT product_id, product_name from product' product_generator = connection.cursor(query) async for product in take(product_generator, 5): print(product) print('已获取前五个产品!') await connection.close()asyncio.run(main())
我们的 take 异步生成器用 item_count 跟踪我们已看到的项目数量。然后我们进入一个 async for 循环,yield 我们看到的每条记录。yield 之后,我们检查 item_count 以查看是否已 yield 了调用者请求的数量。如果是,我们 return,结束异步生成器。在我们的主协程中,我们可以在正常的 async for 循环中使用 take。在这个例子中,我们用它要求游标中的前五个元素,给出以下输出:
<Recordproduct_id=1product_name='among paper foot see shoe ride age'><Recordproduct_id=2product_name='major wait half speech lake won't'><Recordproduct_id=3product_name='war area speak listen horse past edge'><Recordproduct_id=4product_name='smell proper force road house planet'><Recordproduct_id=5product_name='ship many dog fine surface truck'>已获取前五个产品!
虽然我们自己定义了这个功能,但开源库 aiostream 已经提供了这个功能以及其他处理异步生成器的功能。你可以在 http://aiostream.readthedocs.io(http://aiostream.readthedocs.io) 查看该库的文档。
在本章中,我们学习了使用异步数据库连接在 PostgreSQL 中创建和选择记录的基础知识。你应该现在能够运用这些知识来创建并发数据库客户端。
- 我们学会了如何使用
asyncpg 连接到 PostgreSQL 数据库。 - 我们学会了如何使用各种
asyncpg 协程来创建表、插入记录和执行单个查询。 - 我们学会了如何使用
asyncpg 创建连接池。这使我们能够使用 asyncio 的 API 方法(如 gather)并发运行多个查询。通过这种方式,我们可以潜在地通过并行执行查询来加速应用。 - 我们学会了如何使用
asyncpg 管理事务。事务允许我们在因故障导致更改时回滚,即使发生意外情况,也能保持数据库的一致性。 - 我们学会了如何创建异步生成器以及如何使用它们进行流式数据库连接。我们可以将这两个概念结合起来,处理无法一次性装入内存的大型数据集。