点击蓝字 关注我们


Peewee 是一个轻量级、表达力强、Pythonic 的 ORM(对象关系映射)库。与 Django ORM 和 SQLAlchemy 相比,Peewee 更简单、更直观:
from peewee import *
db = SqliteDatabase('people.db')
classPerson(Model):
name = CharField()
birthday = DateField()
classMeta:
database = db
# 创建表
db.create_tables([Person])
# 插入数据
Person.create(name='Alice', birthday=date(1990, 1, 1))
# 查询数据
alice = Person.select().where(Person.name == 'Alice').get()
这就是 Peewee 的魅力:简单直观,功能强大!
# 基础安装
pip install peewee
# 安装可选扩展
pip install peewee[postgresql] # PostgreSQL支持
pip install peewee[mysql] # MySQL支持
pip install peewee[playhouse] # 扩展功能
pip install peewee[cockroach] # CockroachDB支持
from peewee import *
import datetime
# 1. SQLite 连接(最简单)
sqlite_db = SqliteDatabase('my_app.db')
sqlite_db = SqliteDatabase(':memory:') # 内存数据库
# 2. PostgreSQL 连接
pg_db = PostgresqlDatabase(
'my_database',
user='postgres',
password='secret',
host='localhost',
port=5432
)
# 3. MySQL 连接
mysql_db = MySQLDatabase(
'my_database',
user='root',
password='secret',
host='localhost',
port=3306
)
# 4. CockroachDB 连接
cockroach_db = CockroachDatabase(
'my_database',
user='root',
password='secret',
host='localhost',
port=26257
)
# 5. 使用连接池(生产环境推荐)
from playhouse.pool import PooledPostgresqlDatabase
pooled_db = PooledPostgresqlDatabase(
'my_database',
max_connections=20,
stale_timeout=300, # 5分钟
user='postgres',
password='secret'
)
# 6. 动态数据库配置
classConfig:
DATABASE = {
'name': 'app.db',
'engine': 'peewee.SqliteDatabase',
'check_same_thread': False
}
defget_database(config):
"""根据配置创建数据库连接"""
db_class = getattr(peewee, config['engine'].split('.')[-1])
return db_class(**{k: v for k, v in config.items() if k != 'engine'})
# 7. 连接生命周期管理
classDatabaseManager:
def__init__(self, db_config):
self.db = get_database(db_config)
self.is_connected = False
defconnect(self):
ifnot self.is_connected:
self.db.connect()
self.is_connected = True
defclose(self):
if self.is_connected:
self.db.close()
self.is_connected = False
def__enter__(self):
self.connect()
return self
def__exit__(self, exc_type, exc_val, exc_tb):
self.close()
from peewee import *
import datetime
# 定义基础模型(所有模型继承)
classBaseModel(Model):
"""所有模型的基类"""
created_at = DateTimeField(default=datetime.datetime.now)
updated_at = DateTimeField(default=datetime.datetime.now)
classMeta:
database = db # 将在后面设置
defsave(self, *args, **kwargs):
"""保存时自动更新updated_at"""
self.updated_at = datetime.datetime.now()
return super().save(*args, **kwargs)
# 用户模型
classUser(BaseModel):
username = CharField(max_length=50, unique=True, index=True)
email = CharField(max_length=100, unique=True, index=True)
password_hash = CharField(max_length=255)
is_active = BooleanField(default=True)
is_admin = BooleanField(default=False)
classMeta:
table_name = 'users'# 自定义表名
def__str__(self):
returnf'User({self.username})'
# 文章模型
classArticle(BaseModel):
title = CharField(max_length=200)
slug = CharField(max_length=200, unique=True, index=True)
content = TextField()
published = BooleanField(default=False)
published_at = DateTimeField(null=True)
view_count = IntegerField(default=0)
# 外键关系
author = ForeignKeyField(User, backref='articles')
classMeta:
table_name = 'articles'
indexes = (
# 复合索引
(('published', 'published_at'), False),
)
defpublish(self):
"""发布文章"""
self.published = True
self.published_at = datetime.datetime.now()
self.save()
defincrement_views(self):
"""增加阅读计数"""
self.view_count += 1
self.save()
# 评论模型
classComment(BaseModel):
content = TextField()
is_approved = BooleanField(default=False)
# 多外键关系
article = ForeignKeyField(Article, backref='comments')
user = ForeignKeyField(User, backref='comments')
parent = ForeignKeyField('self', backref='replies', null=True) # 自引用
classMeta:
table_name = 'comments'
indexes = (
(('article', 'created_at'), False),
)
# 标签模型(多对多关系)
classTag(BaseModel):
name = CharField(max_length=50, unique=True)
slug = CharField(max_length=50, unique=True)
description = TextField(null=True)
# 文章标签关联表(多对多)
classArticleTag(BaseModel):
article = ForeignKeyField(Article, backref='tags')
tag = ForeignKeyField(Tag, backref='articles')
classMeta:
table_name = 'article_tags'
# 复合主键
primary_key = CompositeKey('article', 'tag')
# 设置数据库
db = SqliteDatabase('blog.db')
BaseModel._meta.database = db
# 创建所有表
defcreate_tables():
tables = [User, Article, Comment, Tag, ArticleTag]
db.create_tables(tables)
print("所有表创建完成!")
if __name__ == '__main__':
create_tables()
from peewee import *
import datetime
import decimal
import uuid
classFieldDemo(Model):
"""字段类型演示模型"""
# 1. 文本字段
char_field = CharField(max_length=100, default='default')
text_field = TextField()
fixed_char_field = FixedCharField(max_length=10) # 定长字符串
# 2. 数值字段
integer_field = IntegerField(default=0)
big_integer_field = BigIntegerField()
small_integer_field = SmallIntegerField()
auto_field = AutoField() # 自增主键
float_field = FloatField()
double_field = DoubleField()
# 3. Decimal字段(精确小数)
decimal_field = DecimalField(
max_digits=10, # 最大位数
decimal_places=2, # 小数位数
auto_round=True,
rounding=decimal.ROUND_HALF_EVEN
)
# 4. 布尔字段
boolean_field = BooleanField(default=False)
# 5. 日期时间字段
date_field = DateField(default=datetime.date.today)
datetime_field = DateTimeField(default=datetime.datetime.now)
time_field = TimeField(default=datetime.time)
# 6. 特殊字段
uuid_field = UUIDField(default=uuid.uuid4) # UUID字段
binary_field = BlobField() # 二进制字段
json_field = JSONField(default=dict) # JSON字段(需要SQLite 3.9+)
# 7. 选择字段
STATUS_CHOICES = [
('draft', '草稿'),
('published', '已发布'),
('archived', '已归档')
]
status = CharField(
choices=STATUS_CHOICES,
default='draft'
)
# 枚举字段(需要SQLite 3.9+)
COLOR_ENUM = ['red', 'green', 'blue']
color = CharField(choices=[(c, c) for c in COLOR_ENUM])
classMeta:
database = db
table_name = 'field_demo'
classFieldParameters(Model):
"""字段参数演示"""
# 1. 约束参数
required_field = CharField(
null=False, # 不允许NULL(默认)
unique=True, # 唯一约束
index=True, # 创建索引
default='default', # 默认值
constraints=[Check('LENGTH(required_field) > 0')] # 检查约束
)
# 2. 验证参数
email_field = CharField(
max_length=255,
constraints=[SQL("CHECK(email_field LIKE '%@%')")]
)
age_field = IntegerField(
constraints=[
Check('age_field >= 0'),
Check('age_field <= 150')
]
)
# 3. 数据库特定参数
text_field = TextField(
collation='NOCASE'# SQLite不区分大小写的排序规则
)
# 4. 自增字段
id = AutoField(primary_key=True) # 主键自增
# 5. 外键参数
# 注意:外键字段在关系部分详细说明
classMeta:
database = db
from peewee import Field
import json
import hashlib
classJSONField(Field):
"""自定义JSON字段"""
field_type = 'TEXT'
defdb_value(self, value):
"""Python值 -> 数据库值"""
if value isNone:
returnNone
return json.dumps(value, ensure_ascii=False)
defpython_value(self, value):
"""数据库值 -> Python值"""
if value isNone:
returnNone
if isinstance(value, str):
return json.loads(value)
return value
classPasswordField(CharField):
"""密码字段(自动哈希)"""
def__init__(self, *args, **kwargs):
kwargs['max_length'] = 255
super().__init__(*args, **kwargs)
defdb_value(self, value):
"""存储时自动哈希"""
if value isNone:
returnNone
if value.startswith('$2b$'): # 已经是bcrypt哈希
return value
return hashlib.sha256(value.encode()).hexdigest()
classMoneyField(DecimalField):
"""货币字段"""
def__init__(self, *args, **kwargs):
kwargs.setdefault('max_digits', 15)
kwargs.setdefault('decimal_places', 2)
super().__init__(*args, **kwargs)
classStatusField(IntegerField):
"""状态字段(带描述)"""
STATUS_MAP = {
0: 'draft',
1: 'pending',
2: 'approved',
3: 'rejected'
}
REVERSE_STATUS_MAP = {v: k for k, v in STATUS_MAP.items()}
defdb_value(self, value):
"""支持字符串状态"""
if isinstance(value, str):
return self.REVERSE_STATUS_MAP.get(value, 0)
return value
defpython_value(self, value):
"""返回字符串状态"""
return self.STATUS_MAP.get(value, 'unknown')
# 使用自定义字段
classProduct(Model):
id = AutoField()
name = CharField(max_length=100)
price = MoneyField()
metadata = JSONField(default=dict) # 使用自定义JSON字段
status = StatusField(default='draft') # 使用自定义状态字段
classMeta:
database = db
from peewee import *
import datetime
# 创建测试数据
defcreate_test_data():
"""创建测试数据"""
db.create_tables([User, Article, Comment, Tag, ArticleTag])
# 创建用户
users = []
for i in range(1, 6):
user = User.create(
username=f'user{i}',
email=f'user{i}@example.com',
password_hash=f'hash{i}',
is_active=(i != 3) # 第3个用户不活跃
)
users.append(user)
# 创建标签
tags = []
for tag_name in ['Python', 'Django', 'Flask', 'Database', 'Web']:
tag = Tag.create(
name=tag_name,
slug=tag_name.lower()
)
tags.append(tag)
# 创建文章
articles = []
for i in range(1, 11):
article = Article.create(
title=f'文章标题 {i}',
slug=f'article-{i}',
content=f'这是第 {i} 篇文章的内容...',
published=(i % 2 == 0), # 偶数文章发布
published_at=datetime.datetime.now() - datetime.timedelta(days=i),
author=users[i % len(users)],
view_count=i * 10
)
articles.append(article)
# 关联标签
for j in range(i % 3 + 1): # 每篇文章1-3个标签
ArticleTag.create(article=article, tag=tags[j])
# 创建评论
for i, article in enumerate(articles):
for j in range(i % 4): # 每篇文章0-3条评论
Comment.create(
content=f'评论内容 {j+1}',
article=article,
user=users[j % len(users)],
is_approved=(j % 2 == 0)
)
# 基础查询示例
defbasic_queries():
"""基础查询操作"""
print("=== 基础查询 ===")
# 1. 获取所有记录
all_users = list(User.select())
print(f"1. 所有用户数量: {len(all_users)}")
# 2. 获取单条记录
try:
user1 = User.get(User.id == 1)
print(f"2. 用户1: {user1.username}")
except User.DoesNotExist:
print("用户不存在")
# 3. 使用 get_or_none(推荐)
user_none = User.get_or_none(User.id == 999)
print(f"3. 不存在的用户: {user_none}")
# 4. 条件查询
active_users = list(User.select().where(User.is_active == True))
print(f"4. 活跃用户数量: {len(active_users)}")
# 5. 多条件查询
from peewee import fn
recent_articles = list(Article.select().where(
(Article.published == True) &
(Article.published_at > datetime.datetime.now() - datetime.timedelta(days=7))
))
print(f"5. 最近7天发布的文章: {len(recent_articles)}")
# 6. OR 条件
articles = list(Article.select().where(
(Article.author == 1) |
(Article.author == 2)
))
print(f"6. 作者1或2的文章: {len(articles)}")
# 7. IN 查询
user_ids = [1, 2, 3]
users = list(User.select().where(User.id << user_ids))
print(f"7. ID在[1,2,3]的用户: {len(users)}")
# 8. NOT IN 查询
excluded_users = list(User.select().where(~(User.id << [4, 5])))
print(f"8. ID不在[4,5]的用户: {len(excluded_users)}")
# 9. LIKE 查询
python_articles = list(Article.select().where(
Article.title % '%Python%'
))
print(f"9. 标题包含Python的文章: {len(python_articles)}")
# 10. 正则表达式查询(SQLite支持)
users_start_with_u = list(User.select().where(
User.username.regexp('^user[0-9]$')
))
print(f"10. 用户名匹配正则的用户: {len(users_start_with_u)}")
# 11. IS NULL 查询
articles_without_publish_date = list(Article.select().where(
Article.published_at.is_null()
))
print(f"11. 未设置发布时间文章: {len(articles_without_publish_date)}")
# 12. BETWEEN 查询
mid_articles = list(Article.select().where(
Article.view_count.between(30, 70)
))
print(f"12. 阅读量30-70的文章: {len(mid_articles)}")
# 排序和限制
defordering_and_limits():
"""排序和限制"""
print("\n=== 排序和限制 ===")
# 1. 简单排序
articles_asc = list(Article.select().order_by(Article.view_count))
print(f"1. 阅读量升序前3: {[a.view_count for a in articles_asc[:3]]}")
articles_desc = list(Article.select().order_by(Article.view_count.desc()))
print(f" 阅读量降序前3: {[a.view_count for a in articles_desc[:3]]}")
# 2. 多列排序
articles = list(Article.select().order_by(
Article.published.desc(), # 先按发布状态
Article.published_at.desc() # 再按发布时间
))
# 3. 限制和偏移(分页)
page = 2
page_size = 3
paginated = list(Article.select().order_by(Article.id).paginate(page, page_size))
print(f"2. 第{page}页(每页{page_size}条): {[a.id for a in paginated]}")
# 4. 使用limit和offset
limited = list(Article.select().order_by(Article.id).limit(5).offset(10))
print(f"3. 限制5条,偏移10: {[a.id for a in limited]}")
# 5. 随机排序
random_articles = list(Article.select().order_by(fn.Random()).limit(3))
print(f"4. 随机3篇文章ID: {[a.id for a in random_articles]}")
# 聚合查询
defaggregate_queries():
"""聚合查询"""
print("\n=== 聚合查询 ===")
# 1. 计数
total_articles = Article.select().count()
published_articles = Article.select().where(Article.published == True).count()
print(f"1. 总文章数: {total_articles}, 已发布: {published_articles}")
# 2. 求和、平均、最大、最小
stats = Article.select(
fn.SUM(Article.view_count).alias('total_views'),
fn.AVG(Article.view_count).alias('avg_views'),
fn.MAX(Article.view_count).alias('max_views'),
fn.MIN(Article.view_count).alias('min_views')
).get()
print(f"2. 统计: 总阅读{stats.total_views}, 平均{stats.avg_views:.1f}, "
f"最高{stats.max_views}, 最低{stats.min_views}")
# 3. 分组查询
from peewee import fn
author_stats = (Article
.select(
Article.author,
fn.COUNT(Article.id).alias('article_count'),
fn.SUM(Article.view_count).alias('total_views')
)
.group_by(Article.author)
.order_by(fn.COUNT(Article.id).desc()))
print("3. 作者文章统计:")
for stat in author_stats:
print(f" 作者{stat.author_id}: {stat.article_count}篇, "
f"{stat.total_views}次阅读")
# 4. HAVING 子句
popular_authors = (Article
.select(
Article.author,
fn.SUM(Article.view_count).alias('total_views')
)
.group_by(Article.author)
.having(fn.SUM(Article.view_count) > 100)
.order_by(fn.SUM(Article.view_count).desc()))
print("4. 总阅读>100的作者:")
for author in popular_authors:
print(f" 作者{author.author_id}: {author.total_views}次阅读")
# 连接查询
defjoin_queries():
"""连接查询"""
print("\n=== 连接查询 ===")
# 1. 内连接(默认)
articles_with_author = (Article
.select(Article, User)
.join(User, on=(Article.author == User.id))
.where(Article.published == True))
print(f"1. 已发布文章及作者: {len(list(articles_with_author))}篇")
# 2. 左外连接
all_users_with_articles = (User
.select(User, Article)
.join(Article, JOIN.LEFT_OUTER, on=(User.id == Article.author)))
# 3. 多表连接
comments_with_details = (Comment
.select(Comment, Article, User)
.join(Article, on=(Comment.article == Article.id))
.join(User, on=(Comment.user == User.id))
.where(Comment.is_approved == True))
print(f"2. 已批准评论及详情: {len(list(comments_with_details))}条")
# 4. 自连接(评论回复)
comments_with_replies = (Comment
.select(Comment, User)
.join(User, on=(Comment.user == User.id))
.where(Comment.parent.is_null(False))) # 有父评论
# 5. 多对多连接
articles_with_tags = (Article
.select(Article, Tag)
.join(ArticleTag, on=(Article.id == ArticleTag.article))
.join(Tag, on=(ArticleTag.tag == Tag.id))
.where(Tag.name == 'Python'))
print(f"3. 带有Python标签的文章: {len(list(articles_with_tags))}篇")
# 子查询和复杂查询
defsubqueries_and_complex():
"""子查询和复杂查询"""
print("\n=== 子查询和复杂查询 ===")
# 1. 标量子查询
subquery = Article.select(fn.AVG(Article.view_count))
above_avg_articles = list(Article.select().where(
Article.view_count > subquery
))
print(f"1. 阅读量高于平均的文章: {len(above_avg_articles)}篇")
# 2. IN 子查询
active_authors = User.select().where(User.is_active == True)
active_author_articles = list(Article.select().where(
Article.author << active_authors
))
print(f"2. 活跃作者的文章: {len(active_author_articles)}篇")
# 3. EXISTS 子查询
authors_with_articles = list(User.select().where(
fn.EXISTS(Article.select().where(Article.author == User.id))
))
print(f"3. 发表过文章的作者: {len(authors_with_articles)}人")
# 4. 条件表达式(CASE WHEN)
article_categories = (Article
.select(
Article.title,
fn.Case(None, [
(Article.view_count > 50, '热门'),
(Article.view_count > 20, '一般'),
(Article.view_count > 0, '冷门')
], '无人阅读').alias('category')
))
print("4. 文章分类:")
for article in article_categories.limit(5):
print(f" {article.title}: {article.category}")
# 5. 窗口函数(需要SQLite 3.25+)
# 为每个作者的文章按阅读量排名
ranked_articles = (Article
.select(
Article,
User.username,
fn.ROW_NUMBER().over(
partition_by=Article.author,
order_by=[Article.view_count.desc()]
).alias('rank')
)
.join(User, on=(Article.author == User.id)))
print("5. 作者文章排名(示例):")
for article in ranked_articles.limit(3):
print(f" {article.username}的文章'{article.title}' "
f"排名第{article.rank}")
if __name__ == '__main__':
db = SqliteDatabase(':memory:')
# 设置数据库
for model in [User, Article, Comment, Tag, ArticleTag]:
model._meta.database = db
# 创建测试数据
create_test_data()
# 运行查询示例
basic_queries()
ordering_and_limits()
aggregate_queries()
join_queries()
subqueries_and_complex()
defquery_optimization():
"""查询优化技巧"""
print("=== 查询优化技巧 ===")
# 1. 只选择需要的字段(减少数据传输)
print("1. 只选择需要的字段:")
users_light = list(User.select(User.id, User.username).limit(5))
print(f" 只选择id和username: {len(users_light)}条")
# 2. 使用iterator处理大量数据(节省内存)
print("\n2. 使用iterator处理大量数据:")
count = 0
for article in Article.select().iterator():
count += 1
if count % 1000 == 0:
print(f" 已处理 {count} 条...")
print(f" 总共处理 {count} 条记录")
# 3. 使用prefetch避免N+1查询问题
print("\n3. 使用prefetch避免N+1查询:")
from peewee import prefetch
# 错误方式:N+1查询
print(" 错误方式(N+1查询):")
for article in Article.select().limit(3):
author = article.author # 这里会执行额外查询!
print(f" 文章: {article.title}, 作者: {author.username}")
# 正确方式:使用prefetch
print("\n 正确方式(使用prefetch):")
query = prefetch(
Article.select().limit(3),
User.select()
)
for article in query:
print(f" 文章: {article.title}, 作者: {article.author.username}")
# 4. 使用索引优化查询
print("\n4. 索引优化:")
# 查看查询计划
query = Article.select().where(Article.published == True)
print(f" 查询计划: {query.sql()}")
# 5. 批量操作代替循环
print("\n5. 批量操作:")
# 错误方式:循环中逐个保存
# for article in articles:
# article.view_count += 1
# article.save()
# 正确方式:批量更新
Article.update(view_count=Article.view_count + 1).where(
Article.published == True
).execute()
print(" 批量更新完成")
# 6. 使用事务处理批量操作
print("\n6. 使用事务:")
with db.atomic():
for i in range(100):
Article.create(
title=f'批量文章 {i}',
slug=f'batch-article-{i}',
content='...',
author=1
)
print(" 批量插入完成(在事务中)")
# 7. 使用explain分析查询性能
print("\n7. 使用EXPLAIN分析:")
if db.__class__.__name__ == 'SqliteDatabase':
cursor = db.execute_sql('EXPLAIN QUERY PLAN SELECT * FROM article WHERE published = 1')
print(" 查询计划:")
for row in cursor.fetchall():
print(f" {row}")
defforeign_key_relationships():
"""外键关系详解"""
print("=== 外键关系 ===")
# 1. 一对一关系
classUserProfile(Model):
user = ForeignKeyField(User, primary_key=True) # 一对一
bio = TextField(null=True)
avatar_url = CharField(null=True)
website = CharField(null=True)
classMeta:
database = db
# 2. 一对多关系
classArticle(Model):
# 已经在前面定义
pass
# 3. 多对多关系(通过中间表)
classArticleTag(Model):
# 已经在前面定义
pass
# 4. 自引用关系(树形结构)
classCategory(Model):
name = CharField()
parent = ForeignKeyField('self', null=True, backref='children')
classMeta:
database = db
# 外键参数详解
classForeignKeyOptions(Model):
# on_delete 参数
user_cascade = ForeignKeyField(
User,
on_delete='CASCADE'# 用户删除时,相关记录也删除
)
user_set_null = ForeignKeyField(
User,
on_delete='SET NULL', # 用户删除时,外键设为NULL
null=True
)
user_set_default = ForeignKeyField(
User,
on_delete='SET DEFAULT', # 用户删除时,外键设为默认值
default=1
)
user_restrict = ForeignKeyField(
User,
on_delete='RESTRICT'# 如果有相关记录,禁止删除用户
)
# on_update 参数
user_cascade_update = ForeignKeyField(
User,
on_update='CASCADE'# 用户ID更新时,外键同步更新
)
# lazy_load 参数
user_lazy = ForeignKeyField(
User,
lazy_load=False# 不自动加载关联对象
)
classMeta:
database = db
print("外键关系模型定义完成")
# 关联查询实践
defrelationship_queries():
"""关联查询实践"""
print("\n=== 关联查询实践 ===")
# 1. 正向查询(从文章查作者)
print("1. 正向查询:")
article = Article.select().first()
if article:
author = article.author # 自动加载作者
print(f" 文章 '{article.title}' 的作者是: {author.username}")
# 2. 反向查询(从作者查文章)
print("\n2. 反向查询:")
author = User.select().first()
if author:
# 使用 backref
author_articles = author.articles
print(f" 作者 '{author.username}' 的文章数量: {author_articles.count()}")
# 条件过滤
published_articles = author.articles.where(Article.published == True)
print(f" 已发布文章数量: {published_articles.count()}")
# 3. 多对多查询
print("\n3. 多对多查询:")
tag = Tag.select().where(Tag.name == 'Python').first()
if tag:
# 获取所有带有Python标签的文章
python_articles = tag.articles
print(f" 带有'{tag.name}'标签的文章: {python_articles.count()}篇")
for article in python_articles.limit(3):
print(f" - {article.title}")
# 4. 链式查询
print("\n4. 链式查询:")
# 获取活跃用户的所有已发布文章
active_users_articles = (Article
.select()
.join(User)
.where(
(User.is_active == True) &
(Article.published == True)
))
print(f" 活跃用户的已发布文章: {active_users_articles.count()}篇")
# 5. 聚合关联查询
print("\n5. 聚合关联查询:")
from peewee import fn
author_stats = (User
.select(
User.username,
fn.COUNT(Article.id).alias('article_count'),
fn.SUM(Article.view_count).alias('total_views')
)
.join(Article, JOIN.LEFT_OUTER)
.group_by(User.id)
.order_by(fn.COUNT(Article.id).desc()))
print(" 作者统计:")
for stat in author_stats.limit(3):
print(f" {stat.username}: {stat.article_count}篇文章, "
f"{stat.total_views or0}次阅读")
# 6. 递归查询(树形结构)
print("\n6. 递归查询:")
# 创建类别数据
db.create_tables([Category])
root = Category.create(name='编程')
web = Category.create(name='Web开发', parent=root)
python = Category.create(name='Python', parent=web)
django = Category.create(name='Django', parent=python)
# 获取所有子类别(递归)
defget_all_children(category, depth=0):
children = []
for child in category.children:
children.append((' ' * depth + child.name, child))
children.extend(get_all_children(child, depth + 1))
return children
all_children = get_all_children(root)
print(" 类别树:")
for name, _ in all_children:
print(f" {name}")
# 预加载优化
defeager_loading():
"""预加载优化"""
print("\n=== 预加载优化 ===")
from peewee import prefetch
# 1. 预加载一对多关系
print("1. 预加载一对多:")
# 查询所有文章及其作者(避免N+1)
articles_with_authors = prefetch(
Article.select().limit(5),
User.select()
)
for article in articles_with_authors:
print(f" 文章: {article.title}, 作者: {article.author.username}")
# 2. 预加载多对多关系
print("\n2. 预加载多对多:")
articles_with_tags = prefetch(
Article.select().limit(3),
ArticleTag.select(),
Tag.select()
)
for article in articles_with_tags:
tag_names = [at.tag.name for at in article.tags_prefetch]
print(f" 文章: {article.title}, 标签: {', '.join(tag_names)}")
# 3. 多层预加载
print("\n3. 多层预加载:")
# 文章 -> 作者 -> 用户资料
classUserProfile(Model):
user = ForeignKeyField(User, primary_key=True)
bio = TextField()
classMeta:
database = db
db.create_tables([UserProfile])
# 创建测试数据
for user in User.select():
UserProfile.create(user=user, bio=f'{user.username}的简介')
# 多层预加载查询
articles_with_all = prefetch(
Article.select().limit(3),
User.select(),
UserProfile.select()
)
for article in articles_with_all:
print(f" 文章: {article.title}")
print(f" 作者: {article.author.username}")
print(f" 简介: {article.author.userprofile.bio}")
# 4. 自定义预加载查询
print("\n4. 自定义预加载:")
# 只预加载活跃作者的文章
active_authors = User.select().where(User.is_active == True)
articles_active_authors = prefetch(
Article.select().limit(3),
active_authors
)
print(f" 只预加载活跃作者的文章: {len(list(articles_active_authors))}篇")
from playhouse.migrate import *
import datetime
defdatabase_migrations():
"""数据库迁移"""
print("=== 数据库迁移 ===")
# 1. 初始化迁移器
migrator = SqliteMigrator(db)
# 2. 添加字段
print("1. 添加字段:")
# 添加手机号字段
migrate(
migrator.add_column('user', 'phone', CharField(null=True, max_length=20))
)
print(" 添加phone字段到user表")
# 3. 删除字段
# migrate(
# migrator.drop_column('user', 'phone')
# )
# 4. 重命名字段
print("2. 重命名字段:")
migrate(
migrator.rename_column('article', 'view_count', 'views')
)
print(" 将view_count重命名为views")
# 5. 修改字段
print("3. 修改字段:")
migrate(
migrator.alter_column_type('user', 'email', CharField(max_length=150))
)
print(" 修改email字段长度为150")
# 6. 添加NOT NULL约束
# migrate(
# migrator.add_not_null('user', 'email')
# )
# 7. 删除NOT NULL约束
# migrate(
# migrator.drop_not_null('user', 'phone')
# )
# 8. 添加默认值
print("4. 添加默认值:")
migrate(
migrator.add_default('article', 'views', 0)
)
print(" 为views字段添加默认值0")
# 9. 创建索引
print("5. 创建索引:")
migrate(
migrator.add_index('article', ('published', 'published_at'), False)
)
print(" 为article表添加复合索引")
# 10. 删除索引
# migrate(
# migrator.drop_index('article', 'article_published_published_at')
# )
# 11. 重命名表
# migrate(
# migrator.rename_table('article', 'posts')
# )
# 12. 复杂迁移示例
print("\n6. 复杂迁移示例:")
# 添加新表
classArticleViewLog(Model):
article = ForeignKeyField(Article)
user = ForeignKeyField(User, null=True)
viewed_at = DateTimeField(default=datetime.datetime.now)
ip_address = CharField(null=True)
classMeta:
database = db
db.create_tables([ArticleViewLog])
print(" 创建文章浏览日志表")
# 迁移数据(将views字段迁移到新表)
print(" 迁移数据...")
with db.atomic():
for article in Article.select():
# 模拟创建浏览记录
for i in range(article.views or0):
ArticleViewLog.create(
article=article,
viewed_at=datetime.datetime.now() - datetime.timedelta(hours=i)
)
print(" 数据迁移完成")
# 信号系统
defsignals_system():
"""信号系统"""
print("\n=== 信号系统 ===")
from playhouse.signals import Model, pre_save, post_save, pre_delete, post_delete
classSignalModel(Model):
name = CharField()
value = IntegerField(default=0)
classMeta:
database = db
@pre_save(sender=SignalModel)
defon_pre_save(model_class, instance, created):
print(f" [pre_save] 保存前: {instance.name}, 新建: {created}")
if created:
instance.value = 100# 设置新实例的默认值
@post_save(sender=SignalModel)
defon_post_save(model_class, instance, created):
print(f" [post_save] 保存后: {instance.name}, 新建: {created}")
if created:
print(f" 新记录ID: {instance.id}")
@pre_delete(sender=SignalModel)
defon_pre_delete(model_class, instance):
print(f" [pre_delete] 删除前: {instance.name}")
@post_delete(sender=SignalModel)
defon_post_delete(model_class, instance):
print(f" [post_delete] 删除后: {instance.name}")
# 创建表
db.create_tables([SignalModel])
# 测试信号
print("1. 测试保存信号:")
obj = SignalModel.create(name="测试对象")
obj.name = "修改后的对象"
obj.save()
print("\n2. 测试删除信号:")
obj.delete_instance()
print("\n3. 批量操作信号:")
# 注意:批量操作不会触发信号
SignalModel.insert_many([
{'name': '批量1', 'value': 1},
{'name': '批量2', 'value': 2}
]).execute()
print(" 批量插入完成(不触发信号)")
# 钩子方法
defhook_methods():
"""模型钩子方法"""
print("\n=== 模型钩子方法 ===")
classHookModel(Model):
name = CharField()
data = TextField(default='{}')
created_at = DateTimeField(default=datetime.datetime.now)
updated_at = DateTimeField(default=datetime.datetime.now)
classMeta:
database = db
def__init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._original_data = self.data
defsave(self, *args, **kwargs):
# 保存前钩子
self.before_save()
result = super().save(*args, **kwargs)
# 保存后钩子
self.after_save()
return result
defdelete_instance(self, *args, **kwargs):
# 删除前钩子
self.before_delete()
result = super().delete_instance(*args, **kwargs)
# 删除后钩子
self.after_delete()
return result
defbefore_save(self):
"""保存前的逻辑"""
self.updated_at = datetime.datetime.now()
print(f" [before_save] 更新updated_at: {self.name}")
defafter_save(self):
"""保存后的逻辑"""
print(f" [after_save] 保存完成: {self.name}")
if hasattr(self, '_original_data') and self.data != self._original_data:
print(f" 数据已修改")
defbefore_delete(self):
"""删除前的逻辑"""
print(f" [before_delete] 准备删除: {self.name}")
defafter_delete(self):
"""删除后的逻辑"""
print(f" [after_delete] 删除完成: {self.name}")
@classmethod
defbefore_create(cls):
"""类级别创建前钩子"""
print(f" [before_create] 准备创建新实例")
@classmethod
defafter_create(cls):
"""类级别创建后钩子"""
print(f" [after_create] 新实例创建完成")
# 创建表
db.create_tables([HookModel])
# 测试钩子
print("1. 测试实例钩子:")
hook_obj = HookModel(name="钩子测试")
hook_obj.save()
hook_obj.data = '{"key": "value"}'
hook_obj.save()
hook_obj.delete_instance()
print("\n2. 测试类钩子:")
HookModel.before_create()
obj = HookModel.create(name="类钩子测试")
HookModel.after_create()
defadvanced_query_features():
"""高级查询功能"""
print("=== 高级查询功能 ===")
from peewee import Window, SQL
from playhouse.shortcuts import model_to_dict, dict_to_model
# 1. 窗口函数
print("1. 窗口函数:")
# 为每个作者的文章按阅读量排名
window = Window(
partition_by=[Article.author],
order_by=[Article.views.desc()]
)
ranked_articles = (Article
.select(
Article,
User.username,
fn.ROW_NUMBER().over(window).alias('rank'),
fn.SUM(Article.views).over(window).alias('author_total_views')
)
.join(User, on=(Article.author == User.id))
.order_by(Article.author, Article.views.desc()))
print(" 文章排名(窗口函数):")
for article in ranked_articles.limit(5):
print(f" {article.username}: {article.title} "
f"(排名: {article.rank}, 作者总阅读: {article.author_total_views})")
# 2. 通用表表达式(CTE)
print("\n2. 通用表表达式(CTE):")
# 定义CTE
cte = (Article
.select(Article.author, fn.SUM(Article.views).alias('total_views'))
.group_by(Article.author)
.cte('author_stats'))
# 使用CTE查询
author_stats_query = (User
.select(User.username, cte.c.total_views)
.join(cte, on=(User.id == cte.c.author))
.order_by(cte.c.total_views.desc()))
print(" 作者统计(CTE):")
for stat in author_stats_query.limit(3):
print(f" {stat.username}: {stat.total_views}次阅读")
# 3. 递归CTE(树形结构)
print("\n3. 递归CTE:")
# 创建组织架构表
classDepartment(Model):
name = CharField()
parent = ForeignKeyField('self', null=True, backref='children')
classMeta:
database = db
db.create_tables([Department])
# 创建测试数据
company = Department.create(name='公司')
hr = Department.create(name='人力资源部', parent=company)
it = Department.create(name='技术部', parent=company)
dev = Department.create(name='开发组', parent=it)
test = Department.create(name='测试组', parent=it)
# 递归CTE查询所有子部门
Base = Department.alias()
Recursive = Department.alias()
cte = (Base
.select(Base.id, Base.name, Base.parent, SQL('1').alias('level'))
.where(Base.parent.is_null())
.cte('departments_recursive', recursive=True))
cte = cte.union_all(
Recursive
.select(Recursive.id, Recursive.name, Recursive.parent, cte.c.level + 1)
.join(cte, on=(Recursive.parent == cte.c.id))
)
departments_tree = (cte
.select_from(cte.c.name, cte.c.level)
.order_by(cte.c.level, cte.c.name))
print(" 部门树(递归CTE):")
for dept in departments_tree:
indent = ' ' * (dept.level - 1)
print(f" {indent}{dept.name}")
# 4. 全文搜索(SQLite FTS5)
print("\n4. 全文搜索:")
# 创建虚拟表
classArticleFTS(Model):
content = TextField()
classMeta:
database = db
table_name = 'article_fts'
# 使用FTS5扩展
options = {'tokenize': 'porter'} # 使用Porter词干分析器
# 创建虚拟表(实际项目需要处理同步)
try:
db.execute_sql('CREATE VIRTUAL TABLE IF NOT EXISTS article_fts USING fts5(title, content)')
# 插入测试数据
db.execute_sql('''
INSERT INTO article_fts (title, content)
SELECT title, content FROM article
''')
# 全文搜索查询
search_term = 'Python'
results = db.execute_sql('''
SELECT title, snippet(article_fts, 2, '<b>', '</b>', '...', 64) as snippet
FROM article_fts
WHERE article_fts MATCH ?
ORDER BY rank
''', (search_term,))
print(f" 搜索 '{search_term}' 结果:")
for row in results.fetchall():
print(f" {row[0]}: {row[1]}")
except Exception as e:
print(f" 全文搜索需要SQLite FTS5支持: {e}")
# 5. 模型转换
print("\n5. 模型转换:")
article = Article.select().first()
if article:
# 模型转字典
article_dict = model_to_dict(article)
print(f" 模型转字典: {article_dict['title']}")
# 包含关联对象
article_with_author = model_to_dict(article, backrefs=True)
print(f" 包含关联: 作者={article_with_author.get('author', {}).get('username')}")
# 字典转模型
new_article_data = {
'title': '新文章',
'slug': 'new-article',
'content': '内容',
'author': 1
}
# 注意:这不会保存到数据库
new_article = dict_to_model(Article, new_article_data)
print(f" 字典转模型: {new_article.title}")
# 6. 自定义查询构建
print("\n6. 自定义查询构建:")
defbuild_article_query(filters):
"""动态构建查询"""
query = Article.select()
if'author_id'in filters:
query = query.where(Article.author == filters['author_id'])
if'published'in filters:
query = query.where(Article.published == filters['published'])
if'min_views'in filters:
query = query.where(Article.views >= filters['min_views'])
if'search'in filters:
query = query.where(Article.title.contains(filters['search']))
if'order_by'in filters:
order_field = getattr(Article, filters['order_by'], Article.id)
query = query.order_by(order_field.desc())
return query
# 测试动态查询
filters = {
'published': True,
'min_views': 30,
'order_by': 'views'
}
dynamic_query = build_article_query(filters)
print(f" 动态查询结果: {dynamic_query.count()}篇文章")
defperformance_optimization():
"""性能优化与监控"""
print("=== 性能优化与监控 ===")
from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase
import time
# 1. 数据库连接池
print("1. 数据库连接池:")
from playhouse.pool import PooledSqliteDatabase
pooled_db = PooledSqliteDatabase(
'pooled.db',
max_connections=32,
stale_timeout=300,
check_same_thread=False
)
print(f" 连接池配置: 最大连接数=32, 超时=300秒")
# 2. 异步数据库(非阻塞)
print("\n2. 异步数据库:")
# 使用SqliteQueueDatabase避免写入冲突
queued_db = SqliteQueueDatabase(
'queued.db',
use_gevent=False, # 不使用gevent
autostart=True,
queue_max_size=64,
results_timeout=5.0
)
print(f" 队列数据库: 队列大小=64, 超时=5秒")
# 3. 性能监控装饰器
print("\n3. 性能监控:")
defquery_timer(func):
"""查询计时装饰器"""
defwrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f" 查询耗时: {elapsed:.4f}秒 - {func.__name__}")
return result
return wrapper
@query_timer
defslow_query():
"""模拟慢查询"""
return list(Article.select().where(
Article.title.contains('Python')
))
# 执行慢查询
results = slow_query()
# 4. 查询分析
print("\n4. 查询分析:")
# 查看生成的SQL
query = Article.select().where(Article.published == True)
print(f" SQL语句: {query.sql()}")
# 查看查询计划(SQLite)
if db.__class__.__name__ == 'SqliteDatabase':
cursor = db.execute_sql('EXPLAIN QUERY PLAN SELECT * FROM article WHERE published = 1')
print(" 查询计划:")
for row in cursor.fetchall():
print(f" {row}")
# 5. 批量操作优化
print("\n5. 批量操作优化:")
# 批量插入优化
defbatch_insert_performance():
"""批量插入性能测试"""
num_records = 1000
# 方式1:循环插入(慢)
start = time.perf_counter()
with db.atomic():
for i in range(num_records):
Article.create(
title=f'文章 {i}',
slug=f'article-{i}',
content='...',
author=1
)
loop_time = time.perf_counter() - start
# 方式2:批量插入(快)
data = [
{'title': f'批量 {i}', 'slug': f'batch-{i}', 'content': '...', 'author': 1}
for i in range(num_records)
]
start = time.perf_counter()
with db.atomic():
Article.insert_many(data).execute()
batch_time = time.perf_counter() - start
print(f" 循环插入: {loop_time:.3f}秒 ({num_records/loop_time:.1f}条/秒)")
print(f" 批量插入: {batch_time:.3f}秒 ({num_records/batch_time:.1f}条/秒)")
print(f" 性能提升: {loop_time/batch_time:.1f}倍")
batch_insert_performance()
# 6. 索引优化
print("\n6. 索引优化:")
# 检查现有索引
cursor = db.execute_sql("SELECT name, sql FROM sqlite_master WHERE type='index'")
indexes = cursor.fetchall()
print(f" 现有索引数量: {len(indexes)}")
for idx_name, idx_sql in indexes[:3]: # 显示前3个
print(f" {idx_name}")
# 建议索引
print("\n 建议索引:")
suggestions = [
"article(published, published_at) - 用于已发布文章查询",
"article(author, published) - 用于作者文章查询",
"comment(article, created_at) - 用于文章评论查询"
]
for suggestion in suggestions:
print(f" • {suggestion}")
# 7. 缓存优化
print("\n7. 缓存优化:")
from playhouse.shortcuts import model_to_dict
import pickle
classQueryCache:
"""简单查询缓存"""
def__init__(self, ttl=300):# 默认5分钟
self.cache = {}
self.ttl = ttl
self.timestamps = {}
defget(self, key):
"""获取缓存"""
if key in self.cache:
if time.time() - self.timestamps[key] < self.ttl:
return self.cache[key]
else:
del self.cache[key]
del self.timestamps[key]
returnNone
defset(self, key, value):
"""设置缓存"""
self.cache[key] = value
self.timestamps[key] = time.time()
defclear(self):
"""清空缓存"""
self.cache.clear()
self.timestamps.clear()
# 使用缓存
cache = QueryCache(ttl=60) # 1分钟缓存
defget_popular_articles_cached():
"""获取热门文章(带缓存)"""
cache_key = 'popular_articles'
# 尝试从缓存获取
cached = cache.get(cache_key)
if cached isnotNone:
print(" 从缓存获取热门文章")
return cached
# 缓存未命中,执行查询
print(" 执行数据库查询")
articles = list(Article
.select()
.where(Article.published == True)
.order_by(Article.views.desc())
.limit(10))
# 序列化并缓存
serialized = [(a.id, a.title, a.views) for a in articles]
cache.set(cache_key, serialized)
return serialized
# 测试缓存
print(" 第一次调用(缓存未命中):")
result1 = get_popular_articles_cached()
print(" 第二次调用(缓存命中):")
result2 = get_popular_articles_cached()
print(f" 结果相同: {result1 == result2}")
from flask import Flask, request, jsonify, g
from peewee import *
import datetime
import jwt
import os
from functools import wraps
# Flask应用配置
app = Flask(__name__)
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key')
app.config['DATABASE'] = os.getenv('DATABASE_URL', 'app.db')
# 数据库配置
db = SqliteDatabase(app.config['DATABASE'])
classBaseModel(Model):
"""基础模型"""
created_at = DateTimeField(default=datetime.datetime.now)
updated_at = DateTimeField(default=datetime.datetime.now)
classMeta:
database = db
defsave(self, *args, **kwargs):
self.updated_at = datetime.datetime.now()
return super().save(*args, **kwargs)
# 用户模型
classUser(BaseModel):
username = CharField(max_length=50, unique=True)
email = CharField(max_length=100, unique=True)
password_hash = CharField(max_length=255)
is_active = BooleanField(default=True)
is_admin = BooleanField(default=False)
classMeta:
table_name = 'users'
defset_password(self, password):
"""设置密码"""
import hashlib
self.password_hash = hashlib.sha256(password.encode()).hexdigest()
defcheck_password(self, password):
"""验证密码"""
import hashlib
return self.password_hash == hashlib.sha256(password.encode()).hexdigest()
defgenerate_token(self):
"""生成JWT token"""
payload = {
'user_id': self.id,
'username': self.username,
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)
}
return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
# 文章模型
classArticle(BaseModel):
title = CharField(max_length=200)
slug = CharField(max_length=200, unique=True)
content = TextField()
published = BooleanField(default=False)
published_at = DateTimeField(null=True)
views = IntegerField(default=0)
author = ForeignKeyField(User, backref='articles')
classMeta:
table_name = 'articles'
indexes = (
(('published', 'published_at'), False),
)
# Flask应用生命周期
@app.before_request
defbefore_request():
"""请求前连接数据库"""
g.db = db
g.db.connect()
@app.after_request
defafter_request(response):
"""请求后关闭数据库连接"""
if hasattr(g, 'db'):
g.db.close()
return response
@app.teardown_request
defteardown_request(exception):
"""请求结束时清理"""
if hasattr(g, 'db') andnot g.db.is_closed():
g.db.close()
# 认证装饰器
deflogin_required(f):
@wraps(f)
defdecorated_function(*args, **kwargs):
token = request.headers.get('Authorization')
ifnot token:
return jsonify({'error': '认证令牌缺失'}), 401
try:
token = token.replace('Bearer ', '')
payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
g.user_id = payload['user_id']
g.user = User.get_or_none(User.id == g.user_id)
ifnot g.user ornot g.user.is_active:
return jsonify({'error': '用户不存在或未激活'}), 401
except jwt.ExpiredSignatureError:
return jsonify({'error': '令牌已过期'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': '无效令牌'}), 401
return f(*args, **kwargs)
return decorated_function
defadmin_required(f):
@wraps(f)
@login_required
defdecorated_function(*args, **kwargs):
ifnot g.user.is_admin:
return jsonify({'error': '需要管理员权限'}), 403
return f(*args, **kwargs)
return decorated_function
# API路由
@app.route('/api/health', methods=['GET'])
defhealth_check():
"""健康检查"""
return jsonify({'status': 'healthy', 'timestamp': datetime.datetime.now().isoformat()})
@app.route('/api/register', methods=['POST'])
defregister():
"""用户注册"""
data = request.get_json()
required_fields = ['username', 'email', 'password']
ifnot data ornot all(field in data for field in required_fields):
return jsonify({'error': '缺少必要字段'}), 400
try:
with db.atomic():
user = User.create(
username=data['username'],
email=data['email']
)
user.set_password(data['password'])
user.save()
token = user.generate_token()
return jsonify({
'message': '注册成功',
'user': {
'id': user.id,
'username': user.username,
'email': user.email
},
'token': token
}), 201
except IntegrityError as e:
if'username'in str(e):
return jsonify({'error': '用户名已存在'}), 409
elif'email'in str(e):
return jsonify({'error': '邮箱已存在'}), 409
return jsonify({'error': '注册失败'}), 400
@app.route('/api/login', methods=['POST'])
deflogin():
"""用户登录"""
data = request.get_json()
ifnot data or'username'notin data or'password'notin data:
return jsonify({'error': '缺少用户名或密码'}), 400
user = User.get_or_none(
(User.username == data['username']) |
(User.email == data['username'])
)
if user and user.check_password(data['password']) and user.is_active:
token = user.generate_token()
return jsonify({
'message': '登录成功',
'token': token,
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'is_admin': user.is_admin
}
})
return jsonify({'error': '用户名或密码错误'}), 401
@app.route('/api/articles', methods=['GET'])
defget_articles():
"""获取文章列表"""
from peewee import fn
# 查询参数
page = int(request.args.get('page', 1))
per_page = min(int(request.args.get('per_page', 10)), 100)
published_only = request.args.get('published', 'true').lower() == 'true'
author_id = request.args.get('author_id')
search = request.args.get('search')
tag = request.args.get('tag')
# 构建查询
query = Article.select(Article, User.username)
if published_only:
query = query.where(Article.published == True)
if author_id:
query = query.where(Article.author == author_id)
if search:
query = query.where(Article.title.contains(search))
# 关联标签查询
if tag:
from models import Tag, ArticleTag
query = (query
.join(ArticleTag, on=(Article.id == ArticleTag.article))
.join(Tag, on=(ArticleTag.tag == Tag.id))
.where(Tag.name == tag))
else:
query = query.join(User, on=(Article.author == User.id))
# 计算总数
total = query.count()
# 分页
articles = (query
.order_by(Article.published_at.desc())
.paginate(page, per_page))
# 构建响应
articles_data = []
for article in articles:
articles_data.append({
'id': article.id,
'title': article.title,
'slug': article.slug,
'excerpt': article.content[:100] + '...'if len(article.content) > 100else article.content,
'published': article.published,
'published_at': article.published_at.isoformat() if article.published_at elseNone,
'views': article.views,
'author': article.author.username if hasattr(article, 'author') elseNone,
'created_at': article.created_at.isoformat()
})
return jsonify({
'page': page,
'per_page': per_page,
'total': total,
'total_pages': (total + per_page - 1) // per_page,
'data': articles_data
})
@app.route('/api/articles', methods=['POST'])
@login_required
defcreate_article():
"""创建文章"""
data = request.get_json()
required_fields = ['title', 'content', 'slug']
ifnot data ornot all(field in data for field in required_fields):
return jsonify({'error': '缺少必要字段'}), 400
try:
with db.atomic():
article = Article.create(
title=data['title'],
content=data['content'],
slug=data['slug'],
author=g.user,
published=data.get('published', False)
)
if article.published:
article.published_at = datetime.datetime.now()
article.save()
return jsonify({
'message': '文章创建成功',
'article': {
'id': article.id,
'title': article.title,
'slug': article.slug,
'published': article.published,
'published_at': article.published_at.isoformat() if article.published_at elseNone
}
}), 201
except IntegrityError:
return jsonify({'error': '文章slug已存在'}), 409
@app.route('/api/articles/<int:article_id>', methods=['GET'])
defget_article(article_id):
"""获取单个文章"""
try:
article = (Article
.select(Article, User)
.join(User)
.where(Article.id == article_id)
.get())
# 增加阅读计数
article.views += 1
article.save()
return jsonify({
'id': article.id,
'title': article.title,
'slug': article.slug,
'content': article.content,
'published': article.published,
'published_at': article.published_at.isoformat() if article.published_at elseNone,
'views': article.views,
'author': {
'id': article.author.id,
'username': article.author.username
},
'created_at': article.created_at.isoformat(),
'updated_at': article.updated_at.isoformat()
})
except Article.DoesNotExist:
return jsonify({'error': '文章不存在'}), 404
@app.route('/api/articles/<int:article_id>', methods=['PUT'])
@login_required
defupdate_article(article_id):
"""更新文章"""
data = request.get_json()
try:
article = Article.get_or_none(Article.id == article_id)
ifnot article:
return jsonify({'error': '文章不存在'}), 404
# 权限检查
if article.author.id != g.user.id andnot g.user.is_admin:
return jsonify({'error': '没有权限修改此文章'}), 403
with db.atomic():
# 更新字段
if'title'in data:
article.title = data['title']
if'content'in data:
article.content = data['content']
if'slug'in data:
article.slug = data['slug']
if'published'in data:
article.published = data['published']
if data['published'] andnot article.published_at:
article.published_at = datetime.datetime.now()
article.save()
return jsonify({
'message': '文章更新成功',
'article': {
'id': article.id,
'title': article.title,
'slug': article.slug,
'published': article.published
}
})
except IntegrityError:
return jsonify({'error': '文章slug已存在'}), 409
@app.route('/api/articles/<int:article_id>', methods=['DELETE'])
@login_required
defdelete_article(article_id):
"""删除文章"""
try:
article = Article.get_or_none(Article.id == article_id)
ifnot article:
return jsonify({'error': '文章不存在'}), 404
# 权限检查
if article.author.id != g.user.id andnot g.user.is_admin:
return jsonify({'error': '没有权限删除此文章'}), 403
article.delete_instance()
return jsonify({'message': '文章删除成功'}), 204
except Exception as e:
return jsonify({'error': str(e)}), 500
# 初始化数据库
definit_db():
"""初始化数据库"""
tables = [User, Article] # 添加其他模型
db.create_tables(tables, safe=True)
print("数据库表创建完成")
# 命令行工具
if __name__ == '__main__':
import sys
if len(sys.argv) > 1and sys.argv[1] == 'init':
init_db()
print("数据库初始化完成")
else:
init_db()
app.run(debug=True, host='0.0.0.0', port=5000)
"""
peewee_migrate.py - Peewee数据库迁移工具
用法:
python peewee_migrate.py init # 初始化迁移
python peewee_migrate.py create # 创建新迁移
python peewee_migrate.py up # 执行迁移
python peewee_migrate.py down # 回滚迁移
python peewee_migrate.py status # 查看状态
"""
import os
import sys
import datetime
from playhouse.migrate import *
from peewee import *
# 数据库配置
DATABASE = 'app.db'
db = SqliteDatabase(DATABASE)
# 迁移历史表
classMigrationHistory(Model):
name = CharField(unique=True)
applied_at = DateTimeField(default=datetime.datetime.now)
classMeta:
database = db
table_name = 'migration_history'
# 迁移目录
MIGRATIONS_DIR = 'migrations'
defensure_migrations_dir():
"""确保迁移目录存在"""
ifnot os.path.exists(MIGRATIONS_DIR):
os.makedirs(MIGRATIONS_DIR)
definit_migration():
"""初始化迁移系统"""
ensure_migrations_dir()
db.create_tables([MigrationHistory], safe=True)
print("迁移系统初始化完成")
defcreate_migration(name):
"""创建新迁移文件"""
ensure_migrations_dir()
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"{timestamp}_{name}.py"
filepath = os.path.join(MIGRATIONS_DIR, filename)
template = f'''"""
迁移: {name}
创建时间: {datetime.datetime.now()}
"""
from playhouse.migrate import *
from peewee import *
def up(migrator):
"""执行迁移"""
# 添加你的迁移代码
# 示例:
# migrate(
# migrator.add_column('table_name', 'column_name', CharField(null=True))
# )
pass
def down(migrator):
"""回滚迁移"""
# 添加回滚代码
# 示例:
# migrate(
# migrator.drop_column('table_name', 'column_name')
# )
pass
'''
with open(filepath, 'w', encoding='utf-8') as f:
f.write(template)
print(f"迁移文件创建成功: {filename}")
defget_applied_migrations():
"""获取已应用的迁移"""
return {m.name for m in MigrationHistory.select()}
defget_migration_files():
"""获取所有迁移文件"""
ifnot os.path.exists(MIGRATIONS_DIR):
return []
files = []
for f in os.listdir(MIGRATIONS_DIR):
if f.endswith('.py') and f != '__init__.py':
files.append(f)
return sorted(files)
defrun_migrations(direction='up'):
"""执行迁移"""
ensure_migrations_dir()
applied = get_applied_migrations()
migration_files = get_migration_files()
migrator = SqliteMigrator(db)
if direction == 'up':
files_to_run = [f for f in migration_files if f[:-3] notin applied]
else: # down
files_to_run = [f for f in reversed(migration_files) if f[:-3] in applied]
ifnot files_to_run:
print("没有需要执行的迁移")
return
for filename in files_to_run:
migration_name = filename[:-3]
# 动态导入迁移模块
import importlib.util
filepath = os.path.join(MIGRATIONS_DIR, filename)
spec = importlib.util.spec_from_file_location(migration_name, filepath)
module = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(module)
if direction == 'up':
print(f"执行迁移: {migration_name}")
module.up(migrator)
MigrationHistory.create(name=migration_name)
print(f" 完成")
else:
print(f"回滚迁移: {migration_name}")
module.down(migrator)
MigrationHistory.delete().where(MigrationHistory.name == migration_name).execute()
print(f" 完成")
except Exception as e:
print(f"迁移失败 {migration_name}: {e}")
sys.exit(1)
defshow_status():
"""显示迁移状态"""
applied = get_applied_migrations()
migration_files = get_migration_files()
print("迁移状态:")
print("-" * 80)
for filename in migration_files:
migration_name = filename[:-3]
status = "已应用"if migration_name in applied else"未应用"
print(f"{migration_name:50} [{status}]")
defmain():
"""主函数"""
if len(sys.argv) < 2:
print("用法: python peewee_migrate.py [init|create|up|down|status]")
sys.exit(1)
command = sys.argv[1]
if command == 'init':
init_migration()
elif command == 'create':
if len(sys.argv) < 3:
print("用法: python peewee_migrate.py create <迁移名称>")
sys.exit(1)
create_migration(sys.argv[2])
elif command == 'up':
run_migrations('up')
elif command == 'down':
run_migrations('down')
elif command == 'status':
show_status()
else:
print(f"未知命令: {command}")
if __name__ == '__main__':
main()
"""
Peewee开发最佳实践:
1. 数据库设计:
- 为每个表定义明确的模型
- 使用合适的数据类型和约束
- 创建必要的索引
- 使用外键维护数据完整性
2. 模型设计:
- 创建基础模型类
- 使用有意义的字段名
- 添加适当的验证
- 实现模型方法封装业务逻辑
3. 查询优化:
- 使用select_related/prefetch避免N+1
- 只选择需要的字段
- 合理使用索引
- 批量操作使用事务
4. 事务管理:
- 使用atomic()装饰器
- 保持事务简短
- 正确处理异常
5. 安全性:
- 使用参数化查询(Peewee自动处理)
- 验证用户输入
- 限制查询结果数量
- 实施适当的权限控制
6. 测试:
- 使用内存数据库进行测试
- 编写单元测试
- 测试边界条件
- 性能测试关键查询
7. 维护:
- 定期备份数据库
- 监控慢查询
- 清理无用数据
- 更新索引统计
"""
# 配置示例
classDatabaseConfig:
"""数据库配置类"""
@staticmethod
defget_database(env='development'):
"""根据环境获取数据库配置"""
configs = {
'development': {
'database': 'dev.db',
'pragmas': {
'journal_mode': 'wal',
'cache_size': -2000,
'foreign_keys': 1,
'ignore_check_constraints': 0,
'synchronous': 0
}
},
'testing': {
'database': ':memory:',
'pragmas': {
'journal_mode': 'memory',
'synchronous': 'off'
}
},
'production': {
'database': 'prod.db',
'pragmas': {
'journal_mode': 'wal',
'cache_size': -10000,
'foreign_keys': 1,
'synchronous': 'normal'
}
}
}
config = configs.get(env, configs['development'])
# 创建数据库
db = SqliteDatabase(
config['database'],
pragmas=config.get('pragmas', {})
)
# 生产环境使用连接池
if env == 'production':
from playhouse.pool import PooledSqliteDatabase
db = PooledSqliteDatabase(
config['database'],
max_connections=20,
stale_timeout=300,
pragmas=config.get('pragmas', {})
)
return db
# 错误处理示例
classDatabaseErrorHandler:
"""数据库错误处理"""
@staticmethod
defhandle_error(e):
"""处理数据库错误"""
if isinstance(e, IntegrityError):
if'UNIQUE'in str(e):
return"数据重复,请检查输入"
elif'FOREIGN KEY'in str(e):
return"关联数据不存在"
else:
return"数据完整性错误"
elif isinstance(e, OperationalError):
return"数据库操作错误,请稍后重试"
elif isinstance(e, DoesNotExist):
return"数据不存在"
else:
return"系统错误,请联系管理员"
# 性能监控装饰器
import time
from functools import wraps
defquery_performance_monitor(threshold=1.0):
"""查询性能监控装饰器"""
defdecorator(func):
@wraps(func)
defwrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start_time
if elapsed > threshold:
print(f"⚠️ 慢查询警告: {func.__name__} 耗时 {elapsed:.3f}秒")
# 可以记录到日志或监控系统
return result
return wrapper
return decorator
# 使用示例
@query_performance_monitor(threshold=0.5)
defget_complex_report():
"""生成复杂报表(需要监控性能)"""
# 复杂查询逻辑
pass
"""
生产环境部署注意事项:
1. 数据库配置:
- 使用连接池
- 配置合理的超时时间
- 启用WAL模式(SQLite)
- 设置适当的缓存大小
2. 数据安全:
- 定期备份
- 加密敏感数据
- 实施访问控制
- 监控异常访问
3. 性能优化:
- 创建必要的索引
- 优化复杂查询
- 使用缓存
- 定期清理历史数据
4. 监控和日志:
- 记录慢查询
- 监控连接数
- 记录错误日志
- 设置告警
5. 迁移策略:
- 使用版本控制管理迁移
- 测试迁移脚本
- 准备回滚方案
- 备份迁移前的数据
"""
# 生产环境配置文件示例
classProductionConfig:
"""生产环境配置"""
# 数据库配置
DATABASE = {
'name': '/data/app/production.db',
'engine': 'playhouse.pool.PooledSqliteDatabase',
'max_connections': 32,
'stale_timeout': 300,
'timeout': 10,
'pragmas': {
'journal_mode': 'wal',
'cache_size': -20000, # 20MB
'foreign_keys': 1,
'synchronous': 'normal',
'temp_store': 'memory',
'mmap_size': 268435456# 256MB
}
}
# 连接池配置
CONNECTION_POOL = {
'max_connections': 32,
'stale_timeout': 300, # 5分钟
'timeout': 10# 10秒超时
}
# 查询超时配置
QUERY_TIMEOUT = 30# 秒
# 备份配置
BACKUP = {
'enabled': True,
'interval': 3600, # 每小时备份一次
'retention': 7, # 保留7天
'location': '/backups'
}
# 监控配置
MONITORING = {
'slow_query_threshold': 1.0, # 秒
'log_queries': False,
'max_result_size': 10000# 最大返回结果数
}
# 健康检查端点
@app.route('/api/db/health', methods=['GET'])
defdb_health_check():
"""数据库健康检查"""
from peewee import fn
try:
# 测试连接
db.connect(reuse_if_open=True)
# 测试查询
result = db.execute_sql('SELECT 1').fetchone()
# 检查表状态
table_count = db.execute_sql(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table'"
).fetchone()[0]
# 检查连接数(如果是连接池)
if hasattr(db, '_in_use'):
connections_in_use = len(db._in_use)
else:
connections_in_use = 1
db.close()
return jsonify({
'status': 'healthy',
'database': 'connected',
'tables': table_count,
'connections_in_use': connections_in_use,
'timestamp': datetime.datetime.now().isoformat()
})
except Exception as e:
return jsonify({
'status': 'unhealthy',
'error': str(e),
'timestamp': datetime.datetime.now().isoformat()
}), 500
# 慢查询日志中间件
classSlowQueryLogger:
"""慢查询日志记录器"""
def__init__(self, threshold=1.0):
self.threshold = threshold
self.logger = logging.getLogger('peewee.slow_queries')
def__call__(self, sql, duration):
if duration > self.threshold:
self.logger.warning(
f"慢查询 detected: {duration:.3f}s\n"
f"SQL: {sql}"
)
# 配置Peewee使用慢查询日志
import logging
# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('peewee')
# 设置慢查询日志
from playhouse.sqlite_ext import SqliteExtDatabase
db = SqliteExtDatabase(
'app.db',
pragmas={
'journal_mode': 'wal',
'cache_size': -10000
}
)
# 注册查询日志回调
db.set_sqlite_logger(SlowQueryLogger(threshold=0.5))
从今天开始实践:
pip install peewee记住:最好的学习方式是实践!
现在,开始你的Peewee之旅吧!
END
资料领取

关注公众号,后台回复“资料领取”或点击“资料领取”菜单即可免费获取“软件测试”、“Python开发”相关资料~