🐍 Python Day54:Flask 与数据库 — ORM 的力量
🕐 预计用时:2-3 小时 | 🎯 目标:掌握 Flask-SQLAlchemy 和数据库迁移
📖 今日目录
1. 为什么需要 ORM?
😩 直接写 SQL 的痛点
# 用原生 SQL 操作数据库——噩梦般的体验
import sqlite3
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
# 创建表——手写 SQL,容易出错
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL,
age INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 插入数据——字符串拼接,SQL 注入风险
cursor.execute(
f"INSERT INTO users (username, email, age) VALUES ('{name}', '{email}', {age})"
)
# 查询——手动解析结果为对象
cursor.execute("SELECT * FROM users WHERE age > 18")
rows = cursor.fetchall()
for row in rows:
user = {'id': row[0], 'username': row[1], 'email': row[2]} # 手动映射!
问题清单:
- ❌ SQL 语句写在 Python 代码里,改表结构要改所有相关 SQL
- ❌ 查询结果是元组,不是对象——没有
user.username,只有 row[1] - ❌ 不同数据库(SQLite/MySQL/PostgreSQL)SQL 语法有差异
✨ ORM 的魔法
# 用 ORM 操作数据库——优雅!
# 定义模型(一张表 = 一个类)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), nullable=False)
# 创建表——一行代码
db.create_all()
# 插入数据——像操作 Python 对象
user = User(username='alice', email='alice@example.com')
db.session.add(user)
db.session.commit()
# 查询——返回对象,直接用属性访问
users = User.query.filter(User.age > 18).all()
for user in users:
print(user.username, user.email) # 优雅!
💡 ORM = Object-Relational Mapping(对象-关系映射)
把数据库的"表"映射为 Python 的"类",把"行"映射为"对象",把"列"映射为"属性"。
类比:ORM 就像翻译官——你说 Python 语,它帮你翻译成 SQL 语,数据库也能听懂。
2. Flask-SQLAlchemy 安装与配置
📥 安装
pip install flask-sqlalchemy flask-migrate
⚙️ 配置
# app.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
app = Flask(__name__)
# 数据库配置
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db' # SQLite
# app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://user:pass@localhost/mydb' # MySQL
# app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost/mydb' # PostgreSQL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 关闭修改追踪(节省内存)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
🔗 常用数据库 URI 格式
| | |
|---|
| sqlite:///app.db | |
| sqlite:////absolute/path/app.db | |
| mysql+pymysql://user:pass@host/db | |
| postgresql://user:pass@host/db | |
3. 定义模型
📋 模型 = 数据库表的 Python 表示
from datetime import datetime
class User(db.Model):
# 表名(默认是类名小写:user)
__tablename__ = 'users'
# 字段定义
id = db.Column(db.Integer, primary_key=True) # 主键,自增
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
age = db.Column(db.Integer, default=0)
bio = db.Column(db.Text, nullable=True) # 可选字段
is_active = db.Column(db.Boolean, default=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def __repr__(self):
return f'<User {self.username}>'
📊 常用字段类型
| | | |
|---|
| db.Integer | | |
| db.BigInteger | | |
| db.Float | | |
| db.String(n) | | |
| db.Text | | |
| db.Boolean | | |
| db.DateTime | | |
| db.Date | | |
| db.Numeric(p,s) | | |
| db.LargeBinary | | |
🔧 字段参数
| | |
|---|
primary_key | | id = db.Column(db.Integer, primary_key=True) |
unique | | username = db.Column(db.String(80), unique=True) |
nullable | | email = db.Column(db.String(120), nullable=False) |
default | | age = db.Column(db.Integer, default=0) |
index | | email = db.Column(db.String, index=True) |
💡 db.String vs db.Text 的区别:
• db.String(n):有长度限制,适合用户名、邮箱、标题等短文本
• db.Text:无长度限制,适合文章内容、评论等长文本
类比:String 是便签纸(有边界),Text 是笔记本(想写多少写多少)。
4. 模型关系
🔗 一对多关系
最经典的关系:一个用户有多篇文章。
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
# relationship:反向引用,user.posts 获取该用户的所有文章
posts = db.relationship('Post', backref='author', lazy=True)
class Post(db.Model):
__tablename__ = 'posts'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
content = db.Column(db.Text, nullable=False)
# ForeignKey:外键,关联到 users 表的 id 列
author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
# 使用关系
user = User(username='alice')
post1 = Post(title='Flask 入门', content='...', author=user)
post2 = Post(title='ORM 教程', content='...', author=user)
db.session.add_all([user, post1, post2])
db.session.commit()
# 正向查询:获取用户的所有文章
print(user.posts) # [<Post Flask 入门>, <Post ORM 教程>]
# 反向查询:获取文章的作者
print(post1.author) # <User alice>
🔗 多对多关系
学生和课程:一个学生选多门课,一门课有多个学生。
# 中间表(自动创建)
enrollments = db.Table('enrollments',
db.Column('student_id', db.Integer, db.ForeignKey('students.id')),
db.Column('course_id', db.Integer, db.ForeignKey('courses.id'))
)
class Student(db.Model):
__tablename__ = 'students'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80), nullable=False)
courses = db.relationship('Course', secondary=enrollments,
backref=db.backref('students', lazy='dynamic'))
class Course(db.Model):
__tablename__ = 'courses'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
# 使用
student = Student(name='小明')
course1 = Course(name='Python 基础')
course2 = Course(name='Flask Web')
student.courses.append(course1)
student.courses.append(course2)
db.session.commit()
# 查询
print(student.courses) # [<Course Python 基础>, <Course Flask Web>]
print(course1.students) # [<Student 小明>]
🔄 lazy 参数
| | |
|---|
True / 'select' | | |
'joined' | | |
'subquery' | | |
'dynamic' | | |
5. 创建数据库
# 方式1:在应用上下文中创建(开发时用)
with app.app_context():
db.create_all() # 根据所有模型创建表
# 方式2:在 Flask shell 中创建
# 终端执行:
# flask shell
# >>> from app import db
# >>> db.create_all()
# 方式3:用 Flask-Migrate(推荐,Day54 下半部分会讲)
💡 db.create_all() 的注意事项:
• 已存在的表不会被修改(不加新列、不改类型)
• 要修改表结构,需要先 db.drop_all() 再 db.create_all()(会丢数据!)
• 正确做法:用数据库迁移(Flask-Migrate)来安全地修改表结构
6. CRUD 操作
➕ Create(创建)
# 创建单个对象
user = User(username='alice', email='alice@example.com', age=25)
db.session.add(user)
db.session.commit() # 必须 commit 才会真正写入数据库
# 创建多个对象
users = [
User(username='bob', email='bob@example.com'),
User(username='charlie', email='charlie@example.com'),
]
db.session.add_all(users)
db.session.commit()
📖 Read(查询)
# 查询所有用户
all_users = User.query.all()
# 查询第一个
first_user = User.query.first()
# 根据主键查询
user = User.query.get(42) # 获取 id=42 的用户
# 条件查询
active_users = User.query.filter_by(is_active=True).all()
admin = User.query.filter_by(username='admin').first()
✏️ Update(更新)
# 方式1:直接修改属性
user = User.query.get(1)
user.age = 26
user.email = 'newemail@example.com'
db.session.commit() # commit 后才生效
# 方式2:批量更新
User.query.filter_by(is_active=False).update({'is_active': True})
db.session.commit()
❌ Delete(删除)
# 删除单个对象
user = User.query.get(1)
db.session.delete(user)
db.session.commit()
# 批量删除
User.query.filter_by(is_active=False).delete()
db.session.commit()
⚠️ 别忘了 db.session.commit()!
所有对数据库的修改(add/update/delete)都是在会话(session)中暂存的,只有调用 commit() 才会真正写入数据库。
如果出错了,可以用 db.session.rollback() 回滚。
7. 查询进阶
🔍 filter vs filter_by
# filter_by:简单的等值查询(关键字参数)
User.query.filter_by(username='alice').all()
# SELECT * FROM users WHERE username = 'alice'
# filter:更灵活的条件(SQL 表达式)
User.query.filter(User.age > 18).all()
# SELECT * FROM users WHERE age > 18
User.query.filter(User.username.like('%ali%')).all()
# SELECT * FROM users WHERE username LIKE '%ali%'
User.query.filter(User.age >= 18, User.is_active == True).all()
# SELECT * FROM users WHERE age >= 18 AND is_active = 1
📊 常用查询方法
| | |
|---|
.all() | | User.query.all() |
.first() | | User.query.first() |
.get(id) | | User.query.get(42) |
.count() | | User.query.count() |
.limit(n) | | User.query.limit(10).all() |
.offset(n) | | User.query.offset(10).all() |
.order_by() | | User.query.order_by(User.age.desc()).all() |
.first_or_404() | | User.query.get_or_404(42) |
.paginate() | | User.query.paginate(page=1, per_page=20) |
🔗 链式查询
# 复杂查询可以链式调用
results = User.query \
.filter(User.age >= 18) \
.filter(User.is_active == True) \
.order_by(User.created_at.desc()) \
.limit(10) \
.all()
# 分页查询
page = User.query.paginate(page=2, per_page=10)
print(page.items) # 当前页的数据
print(page.total) # 总记录数
print(page.pages) # 总页数
print(page.has_prev) # 是否有上一页
print(page.has_next) # 是否有下一页
🔎 高级过滤
from sqlalchemy import or_, and_, not_
# OR 条件
User.query.filter(or_(User.username == 'alice', User.username == 'bob')).all()
# AND 条件(默认就是 AND)
User.query.filter(and_(User.age > 18, User.is_active == True)).all()
# NOT 条件
User.query.filter(not_(User.is_active == False)).all()
# IN 条件
User.query.filter(User.username.in_(['alice', 'bob', 'charlie'])).all()
# BETWEEN
User.query.filter(User.age.between(18, 30)).all()
# LIKE
User.query.filter(User.email.like('%@gmail.com')).all()
# IS NULL
User.query.filter(User.bio.is_(None)).all()
8. 聚合查询
📊 统计函数
from sqlalchemy import func
# COUNT
total = db.session.query(func.count(User.id)).scalar()
# SELECT COUNT(id) FROM users
# SUM
total_age = db.session.query(func.sum(User.age)).scalar()
# SELECT SUM(age) FROM users
# AVG
avg_age = db.session.query(func.avg(User.age)).scalar()
# SELECT AVG(age) FROM users
# MAX / MIN
max_age = db.session.query(func.max(User.age)).scalar()
min_age = db.session.query(func.min(User.age)).scalar()
📋 GROUP BY 分组
# 按年龄段统计人数
results = db.session.query(
User.age,
func.count(User.id).label('count')
).group_by(User.age).all()
# 结果:[(25, 3), (30, 5), (18, 2), ...]
# 表示:25岁有3人,30岁有5人,18岁有2人
# 带 HAVING 的分组(过滤分组结果)
results = db.session.query(
User.age,
func.count(User.id).label('count')
).group_by(User.age).having(func.count(User.id) > 2).all()
# 只返回人数超过 2 的年龄段
💡 两种查询风格:
• User.query.filter(...) — Flask-SQLAlchemy 的简化语法(推荐日常使用)
• db.session.query(...) — SQLAlchemy 的完整语法(聚合/子查询等复杂场景)
9. 数据库迁移(Flask-Migrate)
🔄 什么是数据库迁移?
开发过程中,你的数据模型会不断变化——加字段、改类型、删列。如果每次都 drop_all() + create_all(),数据就全没了。
数据库迁移让你安全地修改表结构,同时保留数据。
# 迁移的工作流程:
# 1. 你修改了 Python 模型(加了一个字段)
# 2. flask-migrate 自动对比模型和数据库的差异
# 3. 生成迁移脚本(ALTER TABLE ADD COLUMN ...)
# 4. 执行迁移脚本,数据库表结构更新,数据保留
🛠️ 初始化迁移
# 终端命令
# 1. 初始化迁移仓库(只需执行一次)
flask db init
# 创建 migrations/ 目录
# 2. 生成迁移脚本(每次模型变更后执行)
flask db migrate -m "initial migration"
# 检测模型变化,生成迁移脚本
# 3. 执行迁移(应用变更到数据库)
flask db upgrade
# 执行 SQL,更新表结构
# 4. 回滚迁移(如果出错)
flask db downgrade
# 撤销上一次迁移
💡 迁移四步法:
1. flask db init — 初始化(只做一次)
2. 修改模型代码
3. flask db migrate -m "说明" — 生成迁移脚本
4. flask db upgrade — 应用到数据库
10. 迁移实战
📝 场景1:添加新字段
# 原始模型
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), nullable=False)
email = db.Column(db.String(120), nullable=False)
# 修改后:添加 phone 字段
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), nullable=False)
email = db.Column(db.String(120), nullable=False)
phone = db.Column(db.String(20)) # ← 新增
# 终端执行:
# flask db migrate -m "add phone to user"
# flask db upgrade
📝 场景2:修改字段
# 修改 email 长度从 120 到 200
# 原始:email = db.Column(db.String(120))
# 修改后:email = db.Column(db.String(200))
# flask db migrate -m "increase email length"
# flask db upgrade
📝 场景3:添加关系
# 添加一对多关系:User → Post
class User(db.Model):
# ... 原有字段 ...
posts = db.relationship('Post', backref='author', lazy=True)
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
content = db.Column(db.Text, nullable=False)
author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
created_at = db.Column(db.DateTime, default=datetime.utcnow)
# flask db migrate -m "add posts table"
# flask db upgrade
📝 场景4:数据迁移
# 如果需要在迁移中同时迁移数据,手动编辑迁移脚本
# migrations/versions/xxxx_add_phone.py
def upgrade():
# 先加列
op.add_column('users', sa.Column('phone', sa.String(20)))
# 再迁移数据(可选)
# 给现有用户设置默认手机号
op.execute("UPDATE users SET phone = '未设置' WHERE phone IS NULL")
def downgrade():
op.drop_column('users', 'phone')
⚠️ 迁移注意事项:
• 每次 migrate 后检查生成的迁移脚本,确认它做了正确的事
• 不是所有变更都能自动检测——比如改列名可能被识别为"删旧列 + 加新列"
• 重要数据迁移前先备份数据库
• 迁移脚本应该提交到 Git,方便团队协作
11. 今日练习
🏋️ 练习 1:博客数据模型
# 设计一个博客系统的数据模型:
# - User:用户(username, email, password, created_at)
# - Post:文章(title, content, created_at, author_id)
# - Comment:评论(content, created_at, author_id, post_id)
# 要求:定义好所有关系,能通过 user.posts、post.comments 访问
🏋️ 练习 2:CRUD 操作
# 基于上面的博客模型,完成以下操作:
# 1. 创建一个用户,发表 3 篇文章
# 2. 给每篇文章添加 2 条评论
# 3. 查询某用户的所有文章(按时间倒序)
# 4. 查询某文章的所有评论
# 5. 统计每个用户的文章数量
🏋️ 练习 3:数据库迁移
# 1. 创建初始模型并执行迁移
# 2. 给 User 添加 avatar 字段(String(200))
# 3. 给 Post 添加 tags 字段(String(500))
# 4. 执行迁移,验证数据未丢失
# 5. 回滚迁移,验证表结构恢复
12. 今日小结
| |
|---|
| 表=类,行=对象,列=属性;用 Python 代替 SQL |
| db.Model 定义模型,db.Column 定义字段 |
| Integer/String/Text/Boolean/DateTime/ForeignKey |
| 一对多 relationship + backref,多对多 secondary |
| add/commit/delete + query.filter/all/first/get |
| func.count/sum/avg + group_by + having |
| init → migrate → upgrade,安全修改表结构 |
🚀 明日预告:Day 55 — 用户认证
数据库就位了,接下来要让用户能注册、登录、登出。明天学 Flask-Login + 密码哈希,让你的网站有完整的用户系统——从密码加密存储到会话管理,一网打尽!