要对数据库运行查询,我们首先需要连接到 PostgreSQL 实例并直接在外部创建数据库。可以通过以下命令连接到默认的 PostgreSQL 用户后执行一次:
CREATE DATABASE products;
你可以在终端中运行 sudo -u postgres psql -c "CREATE DATABASE products;" 来执行。在接下来的例子中,我们假设你已执行此命令,因为我们将直接连接到 products 数据库。
现在数据库已创建,我们连接到它并执行之前的 create 语句。连接类有一个名为 execute 的协程,我们可以用它逐个运行创建语句。这个协程返回一个字符串,表示 PostgreSQL 返回的查询状态。我们来运行上一节创建的语句。
列表 5.3 使用 execute 协程运行 create 语句
import asyncpgimport asyncioasync def main(): connection = await asyncpg.connect(host='127.0.0.1', port=5432, user='postgres', database='products', password='password') statements = [CREATE_BRAND_TABLE, CREATE_PRODUCT_TABLE, CREATE_PRODUCT_COLOR_TABLE, CREATE_PRODUCT_SIZE_TABLE, CREATE_SKU_TABLE, SIZE_INSERT, COLOR_INSERT] print('正在创建产品数据库...') for statement in statements: status = await connection.execute(statement) print(status) print('产品数据库创建完成!') await connection.close()asyncio.run(main())
我们首先以与第一个示例类似的方式连接到 products 数据库,区别在于这次连接的是 products 数据库。获得连接后,我们开始逐个使用 connection.execute() 运行 CREATE TABLE 语句。注意 execute() 是一个协程,因此运行 SQL 时必须 await 调用。假设一切顺利,每个 execute 语句的状态应为 CREATE TABLE,每个 insert 语句的状态应为 INSERT 0 1。最后关闭与产品数据库的连接。注意在这个例子中,我们在 for 循环中 await 每个 SQL 语句,确保我们按顺序运行 INSERT 语句。由于某些表依赖于其他表,我们不能并发运行它们。
这些语句不返回任何结果,所以我们来插入一些数据并运行简单的 select 查询。我们先插入几个品牌,然后查询以确保插入正确。我们可以用 execute 协程插入数据,用 fetch 协程运行查询。
import asyncpgimport asynciofrom asyncpg import Recordfrom typing import Listasync def main(): connection = await asyncpg.connect(host='127.0.0.1', port=5432, user='postgres', database='products', password='password') await connection.execute("INSERT INTO brand VALUES(DEFAULT, 'Levis')") await connection.execute("INSERT INTO brand VALUES(DEFAULT, 'Seven')") brand_query = 'SELECT brand_id, brand_name FROM brand' results: List[Record] = await connection.fetch(brand_query) for brand in results: print(f'id: {brand["brand_id"]}, name: {brand["brand_name"]}') await connection.close()asyncio.run(main())
我们首先向 brand 表中插入两个品牌。插入后,使用 connection.fetch 获取品牌表中的所有品牌。查询完成后,所有结果都会在 results 变量中加载进内存。每个结果都是一个 asyncpg 的 Record 对象,其行为类似于字典,可通过下标语法访问列数据。执行后,输出如下:
id: 1, name: Levisid: 2, name: Seven
本例中,我们将查询的所有数据都加载到了列表中。如果我们想获取单个结果,可以调用 connection.fetchrow(),它会返回查询的一个记录。默认情况下,asyncpg 连接会将查询的所有结果拉取到内存中,因此目前 fetchrow 和 fetch 在性能上无差别。稍后我们会看到如何使用游标流式读取结果集,这样每次只加载少量结果,非常适合处理大数据量查询。
这些例子是串行运行查询的,用非 asyncio 驱动也能达到类似性能。但既然我们现在返回的是协程,就可以利用第 4 章学到的 asyncio API 方法,让查询并发执行。