前言
写 Python 的人几乎都要和数据库打交道。最原始的方式就是拼 SQL 字符串——cursor.execute("SELECT * FROM users WHERE id = " + user_id)——这种方式不仅容易出 SQL 注入漏洞,而且代码一多就乱成一团。如果你正在寻找一种更优雅、更安全的数据库操作方式,SQLAlchemy 就是 Python 生态里的标准答案。
这篇文章从零开始,带你一步步掌握 SQLAlchemy 2.0 的核心用法。不绕弯子,直接上手。
一、SQLAlchemy 是什么?为什么选它?
SQLAlchemy 是 Python 中最流行的 ORM(对象关系映射)框架。简单来说,它让你用 Python 对象来操作数据库,而不是写 SQL 语句。
ORM 做了什么?
你把数据库表想象成 Python 类,把表里的每一行想象成类的实例。插入数据就是创建一个对象,查询数据就是调用类的方法,删除数据就是删除对象——SQLAlchemy 在背后帮你把这些操作翻译成 SQL。
和同类工具比,SQLAlchemy 强在哪?
| 对比维度 | SQLAlchemy | Django ORM | Peewee | Tortoise ORM |
|---|
| 学习曲线 | 中等 | 低 | 低 | 中等 |
| 功能完整度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ |
| 异步支持 | ✅ 2.0+ | ❌ | ❌ | ✅ |
| 类型提示 | ✅ 2.0+ | 部分 | ❌ | ✅ |
| 复杂查询 | 极强 | 一般 | 一般 | 一般 |
| 社区规模 | 最大 | 大 | 小 | 较小 |
| 适用场景 | 全场景 | Django 项目 | 小型项目 | 异步项目 |
实际选型建议:
- • 如果你用 Django 框架,Django ORM 够用,不用额外引入
- • 如果你的项目需要复杂查询、多数据库、或者未来可能扩展,SQLAlchemy 是首选
- • 如果你只是写个小脚本,Peewee 更轻量
- • 如果你需要异步数据库操作,SQLAlchemy 2.0 和 Tortoise ORM 都可以
二、安装与环境搭建
# 安装 SQLAlchemy 2.0
pip install sqlalchemy
# 如果使用 MySQL,还需要安装驱动
pip install pymysql
# 或者
pip install mysqlclient
# 如果使用 PostgreSQL
pip install psycopg2-binary
# 如果使用异步
pip install sqlalchemy[asyncio]
pip install asyncpg # PostgreSQL 异步驱动
pip install aiosqlite # SQLite 异步驱动
验证安装:
import sqlalchemy
print(sqlalchemy.__version__) # 应该输出 2.0.x
三、第一个 SQLAlchemy 程序:5 分钟跑通
我们用一个实际场景来演示——一个用户管理系统,包含用户表和地址表。
3.1 定义数据模型
from typing import List, Optional
from sqlalchemy import create_engine, String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
# 1. 定义基类
class Base(DeclarativeBase):
pass
# 2. 定义用户表
class User(Base):
__tablename__ = "user_account"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
fullname: Mapped[Optional[str]] = mapped_column(String(100))
age: Mapped[Optional[int]]
# 一对多关系:一个用户可以有多个地址
addresses: Mapped[List["Address"]] = relationship(
back_populates="user", cascade="all, delete-orphan"
)
def __repr__(self):
return f"User(id={self.id}, name='{self.name}', fullname='{self.fullname}')"
# 3. 定义地址表
class Address(Base):
__tablename__ = "address"
id: Mapped[int] = mapped_column(primary_key=True)
email_address: Mapped[str] = mapped_column(String(100))
user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
# 多对一关系:多个地址属于一个用户
user: Mapped["User"] = relationship(back_populates="addresses")
def __repr__(self):
return f"Address(id={self.id}, email='{self.email_address}')"
关键点解读:
- •
DeclarativeBase 是所有模型的基类,类似于 Django 的 models.Model - •
Mapped[int] 是类型注解,SQLAlchemy 2.0 的新特性,让 IDE 能正确提示类型 - •
mapped_column() 定义数据库列的各种属性 - •
relationship() 建立表之间的关联,back_populates 实现双向引用 - •
cascade="all, delete-orphan" 表示删除用户时自动删除其所有地址
3.2 创建数据库连接
# SQLite(适合开发和测试)
engine = create_engine("sqlite:///myapp.db", echo=True)
# MySQL(生产环境常用)
# engine = create_engine(
# "mysql+pymysql://username:password@localhost:3306/mydb",
# echo=True
# )
# PostgreSQL
# engine = create_engine(
# "postgresql+psycopg2://username:password@localhost:5432/mydb",
# echo=True
# )
# 创建所有表
Base.metadata.create_all(engine)
echo=True 会打印所有生成的 SQL 语句,调试时非常有用,生产环境记得关掉。
3.3 插入数据
from sqlalchemy.orm import Session
with Session(engine) as session:
# 创建用户和地址
user1 = User(
name="zhangsan",
fullname="张三",
age=28,
addresses=[
Address(email_address="zhangsan@example.com"),
Address(email_address="zhangsan@work.com"),
]
)
user2 = User(
name="lisi",
fullname="李四",
age=32,
addresses=[
Address(email_address="lisi@example.com"),
]
)
user3 = User(name="wangwu", fullname="王五", age=25) # 没有地址的用户
# 批量插入
session.add_all([user1, user2, user3])
session.commit()
print(f"插入成功!用户ID:{user1.id}, {user2.id}, {user3.id}")
3.4 查询数据
from sqlalchemy import select
with Session(engine) as session:
# 查询所有用户
stmt = select(User)
users = session.scalars(stmt).all()
print(f"所有用户:{users}")
# 条件查询
stmt = select(User).where(User.name == "zhangsan")
user = session.scalar(stmt)
print(f"查询结果:{user}")
# 查询用户的地址
print(f"张三的地址:{user.addresses}")
# IN 查询
stmt = select(User).where(User.age.in_([28, 32]))
users = session.scalars(stmt).all()
print(f"年龄28或32的用户:{users}")
# 模糊查询
stmt = select(User).where(User.fullname.like("张%"))
users = session.scalars(stmt).all()
print(f"姓张的用户:{users}")
3.5 更新数据
with Session(engine) as session:
# 查询要更新的用户
stmt = select(User).where(User.name == "zhangsan")
user = session.scalar(stmt)
# 直接修改属性
user.age = 29
user.fullname = "张三丰"
# 提交更新
session.commit()
print(f"更新后:{user}")
3.6 删除数据
with Session(engine) as session:
# 方式1:按主键查询后删除
user = session.get(User, 1) # 获取 id=1 的用户
if user:
session.delete(user)
session.commit()
print("用户已删除")
# 方式2:批量删除(SQLAlchemy 2.0 新语法)
from sqlalchemy import delete
stmt = delete(Address).where(Address.email_address.like("%test%"))
session.execute(stmt)
session.commit()
四、进阶用法:JOIN 查询与关联操作
4.1 基本 JOIN 查询
# 查询拥有某个邮箱的用户
stmt = (
select(User)
.join(Address)
.where(Address.email_address == "zhangsan@example.com")
)
user = session.scalar(stmt)
print(f"查询结果:{user}")
# 反向查询:通过用户查地址
stmt = (
select(Address)
.join(User)
.where(User.name == "zhangsan")
)
addresses = session.scalars(stmt).all()
print(f"张三的地址:{addresses}")
4.2 延迟加载 vs 立即加载
这是 SQLAlchemy 最容易踩坑的地方。默认情况下,user.addresses 是延迟加载的——只有你访问它的时候才会发 SQL 查询。如果循环中访问大量用户的地址,会产生 N+1 查询问题。
# ❌ 问题代码:N+1 查询
users = session.scalars(select(User)).all()
for user in users:
print(user.addresses) # 每个用户都会发一次查询!
# ✅ 解决方案:使用 joinedload 立即加载
from sqlalchemy.orm import joinedload
stmt = select(User).options(joinedload(User.addresses))
users = session.scalars(stmt).all()
for user in users:
print(user.addresses) # 不会再发额外查询
加载策略对比:
| 策略 | 适用场景 | SQL 数量 | 性能 |
|---|
| 延迟加载(默认) | 单个对象、偶尔访问关联 | N+1 | 差(循环场景) |
| joinedload | 需要关联数据、一对多 | 1(JOIN) | 好 |
| subqueryload | 一对多、数据量大 | 2 | 好 |
| selectinload | 多对多、数据量大 | 2 | 好 |
4.3 聚合查询
from sqlalchemy import func
# 统计用户数量
count = session.scalar(select(func.count(User.id)))
print(f"用户总数:{count}")
# 按年龄分组统计
stmt = select(User.age, func.count(User.id)).group_by(User.age)
results = session.execute(stmt).all()
print(f"各年龄人数:{results}")
# 求平均年龄
avg_age = session.scalar(select(func.avg(User.age)))
print(f"平均年龄:{avg_age}")
五、SQLAlchemy 2.0 的重大变化
如果你之前用过 1.x 版本,以下变化必须注意:
5.1 查询语法变化
# ❌ 1.x 旧语法(2.0 中已废弃)
session.query(User).filter(User.name == "zhangsan").all()
# ✅ 2.0 新语法(统一使用 select)
stmt = select(User).where(User.name == "zhangsan")
session.scalars(stmt).all()
5.2 删除操作变化
# ❌ 1.x 旧语法
session.query(Address).filter(Address.email.like("%test%")).delete()
# ✅ 2.0 新语法
from sqlalchemy import delete
stmt = delete(Address).where(Address.email.like("%test%"))
session.execute(stmt)
5.3 自动提交已移除
2.0 不再自动提交事务,必须显式调用 session.commit()。这是好事——避免了隐式提交导致的难以追踪的 bug。
5.4 类型注解支持
2.0 全面支持 PEP 484 类型注解,IDE 提示更准确:
class User(Base):
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
# IDE 现在能正确推断 user.name 是 str 类型
六、实战:FastAPI + SQLAlchemy 完整项目
下面是一个完整的 FastAPI + SQLAlchemy 项目结构,适合实际开发参考。
6.1 项目结构
myapp/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 入口
│ ├── database.py # 数据库配置
│ ├── models.py # 数据模型
│ ├── schemas.py # Pydantic 模型
│ └── routers/
│ └── users.py # 用户路由
└── requirements.txt
6.2 数据库配置(database.py)
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
# 数据库 URL
SQLALCHEMY_DATABASE_URL = "sqlite:///./myapp.db"
# SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:password@localhost/mydb"
# 创建引擎
engine = create_engine(SQLALCHEMY_DATABASE_URL, echo=False)
# 创建 Session 工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 基类
class Base(DeclarativeBase):
pass
# 依赖注入:获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
6.3 数据模型(models.py)
from sqlalchemy import String, Integer, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True, index=True)
name: Mapped[str] = mapped_column(String(50))
email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
age: Mapped[int | None]
created_at: Mapped[str] = mapped_column(
default=func.now()
)
6.4 Pydantic 模型(schemas.py)
from pydantic import BaseModel, EmailStr
from typing import Optional
class UserCreate(BaseModel):
name: str
email: EmailStr
age: Optional[int] = None
class UserResponse(BaseModel):
id: int
name: str
email: str
age: Optional[int]
class Config:
from_attributes = True # 支持从 ORM 对象转换
6.5 路由(routers/users.py)
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy import select
from app.database import get_db
from app import models, schemas
router = APIRouter(prefix="/users", tags=["users"])
# 获取所有用户
@router.get("/", response_model=list[schemas.UserResponse])
def get_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
stmt = select(models.User).offset(skip).limit(limit)
return db.scalars(stmt).all()
# 创建用户
@router.post("/", response_model=schemas.UserResponse)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
# 检查邮箱是否已存在
stmt = select(models.User).where(models.User.email == user.email)
existing = db.scalar(stmt)
if existing:
raise HTTPException(status_code=400, detail="邮箱已存在")
# 创建新用户
db_user = models.User(**user.model_dump())
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
# 获取单个用户
@router.get("/{user_id}", response_model=schemas.UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
user = db.get(models.User, user_id)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
return user
# 更新用户
@router.put("/{user_id}", response_model=schemas.UserResponse)
def update_user(user_id: int, user_update: schemas.UserCreate, db: Session = Depends(get_db)):
user = db.get(models.User, user_id)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
for key, value in user_update.model_dump().items():
setattr(user, key, value)
db.commit()
db.refresh(user)
return user
# 删除用户
@router.delete("/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
user = db.get(models.User, user_id)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
db.delete(user)
db.commit()
return {"message": "用户已删除"}
七、避坑指南:新手最常犯的 5 个错误
7.1 忘记 commit
# ❌ 错误:数据不会保存
session.add(user)
# 没有 commit
# ✅ 正确
session.add(user)
session.commit()
7.2 在循环中触发 N+1 查询
# ❌ 错误:每个用户都会触发一次查询
users = session.scalars(select(User)).all()
for user in users:
print(len(user.addresses)) # N 次额外查询
# ✅ 正确:使用 joinedload
from sqlalchemy.orm import joinedload
stmt = select(User).options(joinedload(User.addresses))
users = session.scalars(stmt).all()
7.3 Session 不关闭导致连接泄漏
# ❌ 错误
session = Session(engine)
# ... 使用 session
# 忘记关闭
# ✅ 正确:使用 with 语句
with Session(engine) as session:
# ... 使用 session
pass # 自动关闭
7.4 混淆 scalar 和 scalars
# scalar:返回单个对象
user = session.scalar(select(User).where(User.id == 1))
# scalars:返回迭代器(多个对象)
users = session.scalars(select(User)).all()
7.5 主键冲突
# ❌ 错误:手动指定已存在的主键
user = User(id=1, name="test") # id=1 已存在
session.add(user)
session.commit() # 报错!
# ✅ 正确:让 SQLAlchemy 自动生成主键
user = User(name="test")
session.add(user)
session.commit()
print(user.id) # 自动生成
八、性能优化建议
8.1 连接池配置
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://user:pass@localhost/db",
pool_size=10, # 连接池大小
max_overflow=20, # 最大溢出连接数
pool_recycle=3600, # 连接回收时间(秒)
pool_pre_ping=True, # 连接前检查是否有效
)
8.2 使用批量操作
# ❌ 逐条插入(慢)
for data in large_dataset:
user = User(**data)
session.add(user)
session.commit()
# ✅ 批量插入(快)
session.bulk_save_objects([User(**data) for data in large_dataset])
session.commit()
# ✅ 或者使用 insert 语句(更快)
from sqlalchemy import insert
stmt = insert(User).values(large_dataset)
session.execute(stmt)
session.commit()
8.3 合理使用索引
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), index=True) # 单列索引
email: Mapped[str] = mapped_column(String(100), unique=True) # 唯一索引
总结
SQLAlchemy 是 Python 数据库操作的瑞士军刀——从简单的 CRUD 到复杂的 JOIN、聚合、事务管理,它都能胜任。2.0 版本在类型注解、异步支持和 API 统一上做了大量改进,新手入门也更加友好。
核心要点回顾:
- 1. 用
select() 替代 query(),这是 2.0 的统一查询方式 - 2. 用
with Session() 管理会话,避免连接泄漏 - 3. 注意 N+1 查询问题,合理使用
joinedload - 4. 生产环境用连接池,配置
pool_pre_ping=True - 5. 批量操作用
bulk_save_objects 或 insert,性能提升明显
下一篇预告: 我们将深入讲解 SQLAlchemy 的异步用法和 Alembic 数据库迁移工具,敬请期待。
互动时间
你在用 SQLAlchemy 的过程中遇到过哪些坑?或者有什么独门技巧?欢迎在评论区分享交流!如果觉得这篇文章对你有帮助,点个「在看」让更多人看到吧 🐾
本文基于 SQLAlchemy 2.0.39 编写,代码示例均可直接运行。如有疑问欢迎留言。