当前位置:首页>python>Python Peewee终极秘籍:比SQLite3快10倍的ORM开发,从青铜到王者!

Python Peewee终极秘籍:比SQLite3快10倍的ORM开发,从青铜到王者!

  • 2026-01-23 06:05:35
Python Peewee终极秘籍:比SQLite3快10倍的ORM开发,从青铜到王者!

点击蓝字 关注我们

引言:为什么选择 Peewee?

Peewee 是一个轻量级、表达力强、Pythonic 的 ORM(对象关系映射)库。与 Django ORM 和 SQLAlchemy 相比,Peewee 更简单、更直观:

from peewee import *

db = SqliteDatabase('people.db')

class Person(Model):
    """A person with a name and a date of birth."""

    name = CharField()
    birthday = DateField()

    class Meta:
        # Rows are stored in the module-level SQLite database.
        database = db

# `from peewee import *` does not export `date`, so import it explicitly.
from datetime import date

# Create the table
db.create_tables([Person])

# Insert a row
Person.create(name='Alice', birthday=date(1990, 1, 1))

# Query it back
alice = Person.select().where(Person.name == 'Alice').get()

这就是 Peewee 的魅力:简单直观,功能强大!


第一章:安装与基础配置

1.1 安装与环境搭建

# Base install (the playhouse extensions ship inside the peewee package)
pip install peewee

# Optional database drivers, installed directly
# NOTE(review): the `peewee[...]` extras used previously could not be confirmed
# to exist; installing the driver packages works regardless.
pip install psycopg2-binary     # PostgreSQL and CockroachDB
pip install pymysql             # MySQL

1.2 数据库连接配置

from peewee import *
import datetime

# 1. SQLite (the simplest option)
sqlite_db = SqliteDatabase('my_app.db')
sqlite_db = SqliteDatabase(':memory:')  # in-memory alternative; rebinds the name above

# 2. PostgreSQL
pg_db = PostgresqlDatabase(
    'my_database',
    user='postgres',
    password='secret',
    host='localhost',
    port=5432,
)

# 3. MySQL
mysql_db = MySQLDatabase(
    'my_database',
    user='root',
    password='secret',
    host='localhost',
    port=3306,
)

# 4. CockroachDB (the class lives in playhouse, not in peewee itself)
from playhouse.cockroachdb import CockroachDatabase

cockroach_db = CockroachDatabase(
    'my_database',
    user='root',
    password='secret',
    host='localhost',
    port=26257,
)

# 5. Connection pool (recommended for production)
from playhouse.pool import PooledPostgresqlDatabase

pooled_db = PooledPostgresqlDatabase(
    'my_database',
    max_connections=20,
    stale_timeout=300,  # seconds (5 minutes)
    user='postgres',
    password='secret',
)

# 6. Dynamic database configuration
class Config:
    """Application settings holding the database configuration mapping."""

    # 'engine' is a dotted class path, 'name' is the database name/file;
    # every other key is forwarded to the database constructor.
    DATABASE = {
        'name': 'app.db',
        'engine': 'peewee.SqliteDatabase',
        'check_same_thread': False,
    }

def get_database(config):
    """Create a database object from a configuration mapping.

    Args:
        config: Mapping with an ``'engine'`` entry (dotted path of the
            database class, e.g. ``'peewee.SqliteDatabase'``), a ``'name'``
            entry (database name or file path) and any extra constructor
            keyword arguments.

    Returns:
        A new instance of the configured class. ``config`` is not modified.

    Raises:
        KeyError: if ``'engine'`` or ``'name'`` is missing.
        ImportError / AttributeError: if the engine path cannot be resolved.
    """
    import importlib

    module_path, _, class_name = config['engine'].rpartition('.')
    # A bare class name (no module part) is looked up in peewee.
    module = importlib.import_module(module_path or 'peewee')
    db_class = getattr(module, class_name)

    # The database name is the constructor's first positional argument,
    # so it must not be forwarded as a ``name=`` keyword.
    options = {k: v for k, v in config.items() if k not in ('engine', 'name')}
    return db_class(config['name'], **options)

# 7. Connection lifecycle management
class DatabaseManager:
    """Open and close a database connection, usable as a context manager.

    The manager remembers whether it opened the connection, so repeated
    ``connect()`` / ``close()`` calls are harmless.
    """

    def __init__(self, db_config):
        """Build the database from ``db_config`` via ``get_database``."""
        self.db = get_database(db_config)
        self.is_connected = False

    def connect(self):
        """Open the connection unless this manager already opened it."""
        if not self.is_connected:
            self.db.connect()
            self.is_connected = True

    def close(self):
        """Close the connection if this manager opened it."""
        if self.is_connected:
            self.db.close()
            self.is_connected = False

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so exceptions raised inside the block propagate.
        self.close()

1.3 基础模型定义

from peewee import *
import datetime

# Shared database handle. It has to exist before the model classes are
# created, because ``Meta.database`` is evaluated while the class body runs.
db = SqliteDatabase('blog.db')


# Base model inherited by every other model
class BaseModel(Model):
    """Common base class adding creation/modification timestamps."""

    created_at = DateTimeField(default=datetime.datetime.now)
    updated_at = DateTimeField(default=datetime.datetime.now)

    class Meta:
        database = db

    def save(self, *args, **kwargs):
        """Refresh ``updated_at`` and then save the row."""
        self.updated_at = datetime.datetime.now()
        return super().save(*args, **kwargs)

# User model
class User(BaseModel):
    """An account that can write articles and comments."""

    username = CharField(max_length=50, unique=True, index=True)
    email = CharField(max_length=100, unique=True, index=True)
    password_hash = CharField(max_length=255)
    is_active = BooleanField(default=True)
    is_admin = BooleanField(default=False)

    class Meta:
        table_name = 'users'  # custom table name

    def __str__(self):
        return f'User({self.username})'

# Article model
class Article(BaseModel):
    """A blog article written by a user."""

    title = CharField(max_length=200)
    slug = CharField(max_length=200, unique=True, index=True)
    content = TextField()
    published = BooleanField(default=False)
    published_at = DateTimeField(null=True)
    view_count = IntegerField(default=0)

    # Foreign key; ``user.articles`` is the reverse accessor.
    author = ForeignKeyField(User, backref='articles')

    class Meta:
        table_name = 'articles'
        indexes = (
            # Composite, non-unique index
            (('published', 'published_at'), False),
        )

    def publish(self):
        """Mark the article as published now and save it."""
        self.published = True
        self.published_at = datetime.datetime.now()
        self.save()

    def increment_views(self):
        """Add one to the view counter and save the article."""
        self.view_count += 1
        self.save()

# Comment model
class Comment(BaseModel):
    """A comment on an article, optionally replying to another comment."""

    content = TextField()
    is_approved = BooleanField(default=False)

    # Several foreign keys
    article = ForeignKeyField(Article, backref='comments')
    user = ForeignKeyField(User, backref='comments')
    parent = ForeignKeyField('self', backref='replies', null=True)  # self-reference

    class Meta:
        table_name = 'comments'
        indexes = (
            # Composite, non-unique index
            (('article', 'created_at'), False),
        )

# Tag model (many-to-many with Article through ArticleTag)
class Tag(BaseModel):
    """A label that can be attached to articles."""

    name = CharField(max_length=50, unique=True)
    slug = CharField(max_length=50, unique=True)
    description = TextField(null=True)

# Article/tag link table (many-to-many)
class ArticleTag(BaseModel):
    """Link row connecting one article to one tag.

    Note that the backrefs ``article.tags`` and ``tag.articles`` yield
    ``ArticleTag`` rows, not ``Tag`` / ``Article`` instances.
    """

    article = ForeignKeyField(Article, backref='tags')
    tag = ForeignKeyField(Tag, backref='articles')

    class Meta:
        table_name = 'article_tags'
        # Composite primary key over both foreign keys
        primary_key = CompositeKey('article', 'tag')

# Set up the database
db = SqliteDatabase('blog.db')
# Each model class copies its database when it is defined, so assigning
# BaseModel._meta.database afterwards would not reach the subclasses.
# bind() updates every listed model.
db.bind([BaseModel, User, Article, Comment, Tag, ArticleTag])


# Create all tables
def create_tables():
    """Create the tables for all blog models."""
    tables = [User, Article, Comment, Tag, ArticleTag]
    db.create_tables(tables)
    print("所有表创建完成!")


if __name__ == '__main__':
    create_tables()

第二章:字段类型详解

2.1 基本字段类型

from peewee import *
import datetime
import decimal
import uuid

# JSONField is not part of peewee core; the SQLite flavour lives in playhouse.
from playhouse.sqlite_ext import JSONField


class FieldDemo(Model):
    """Model demonstrating the built-in field types."""

    # 1. Text fields
    char_field = CharField(max_length=100, default='default')
    text_field = TextField()
    fixed_char_field = FixedCharField(max_length=10)  # fixed-length string

    # 2. Numeric fields
    integer_field = IntegerField(default=0)
    big_integer_field = BigIntegerField()
    small_integer_field = SmallIntegerField()
    auto_field = AutoField()  # auto-incrementing primary key
    float_field = FloatField()
    double_field = DoubleField()

    # 3. Decimal field (exact decimal arithmetic)
    decimal_field = DecimalField(
        max_digits=10,      # total number of digits
        decimal_places=2,   # digits after the decimal point
        auto_round=True,
        rounding=decimal.ROUND_HALF_EVEN,
    )

    # 4. Boolean field
    boolean_field = BooleanField(default=False)

    # 5. Date and time fields
    date_field = DateField(default=datetime.date.today)
    datetime_field = DateTimeField(default=datetime.datetime.now)
    # ``datetime.time`` called with no arguments is always midnight, so
    # derive the current time of day explicitly.
    time_field = TimeField(default=lambda: datetime.datetime.now().time())

    # 6. Special fields
    uuid_field = UUIDField(default=uuid.uuid4)
    binary_field = BlobField()
    # NOTE(review): presumably needs SQLite built with JSON support -- confirm.
    json_field = JSONField(default=dict)

    # 7. Choice field: (stored value, human-readable label) pairs
    STATUS_CHOICES = [
        ('draft', '草稿'),
        ('published', '已发布'),
        ('archived', '已归档'),
    ]

    status = CharField(
        choices=STATUS_CHOICES,
        default='draft',
    )

    # Enum-like field built from a plain list of allowed values
    COLOR_ENUM = ['red', 'green', 'blue']
    color = CharField(choices=[(c, c) for c in COLOR_ENUM])

    class Meta:
        database = db
        table_name = 'field_demo'

2.2 字段参数详解

class FieldParameters(Model):
    """Model demonstrating common field arguments."""

    # 1. Constraint arguments
    required_field = CharField(
        null=False,           # NULL not allowed (the default)
        unique=True,          # unique constraint
        index=True,           # create an index
        default='default',    # default value
        constraints=[Check('LENGTH(required_field) > 0')],  # CHECK constraint
    )

    # 2. Validation through CHECK constraints
    email_field = CharField(
        max_length=255,
        constraints=[SQL("CHECK(email_field LIKE '%@%')")],
    )

    age_field = IntegerField(
        constraints=[
            Check('age_field >= 0'),
            Check('age_field <= 150'),
        ]
    )

    # 3. Database-specific arguments
    text_field = TextField(
        collation='NOCASE',  # SQLite case-insensitive collation
    )

    # 4. Auto-incrementing primary key
    id = AutoField(primary_key=True)

    # 5. Foreign-key arguments are covered in the relationships chapter.

    class Meta:
        database = db

2.3 自定义字段类型

from peewee import Field
import json
import hashlib

class JSONField(Field):
    """Custom field storing JSON-serialisable values in a TEXT column."""

    field_type = 'TEXT'

    def db_value(self, value):
        """Convert a Python value to its stored JSON text (None stays None)."""
        if value is None:
            return None
        # ensure_ascii=False keeps non-ASCII text readable in the database.
        return json.dumps(value, ensure_ascii=False)

    def python_value(self, value):
        """Convert stored JSON text back to a Python value."""
        if value is None:
            return None
        if isinstance(value, str):
            return json.loads(value)
        # Non-string values are returned unchanged.
        return value

class PasswordField(CharField):
    """Password field that stores a salted PBKDF2-SHA256 hash.

    Stored format: ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``.
    Values that already carry this prefix (or a bcrypt ``$2b$`` prefix) are
    stored unchanged, so re-saving a loaded row does not hash the hash again.
    Because every hash has its own salt, compare passwords with
    :meth:`verify` rather than with ``==`` in a query.
    """

    _PREFIX = 'pbkdf2_sha256$'
    _ITERATIONS = 200_000

    def __init__(self, *args, **kwargs):
        kwargs['max_length'] = 255
        super().__init__(*args, **kwargs)

    def db_value(self, value):
        """Hash a plain-text password on the way into the database."""
        if value is None:
            return None
        if value.startswith(('$2b$', self._PREFIX)):  # already hashed
            return value

        import os

        salt = os.urandom(16)
        digest = hashlib.pbkdf2_hmac(
            'sha256', value.encode(), salt, self._ITERATIONS
        )
        return f'{self._PREFIX}{self._ITERATIONS}${salt.hex()}${digest.hex()}'

    @classmethod
    def verify(cls, password, stored):
        """Return True if ``password`` matches a hash made by this field."""
        import hmac

        if not stored or not stored.startswith(cls._PREFIX):
            return False
        try:
            iterations, salt_hex, digest_hex = stored[len(cls._PREFIX):].split('$')
            expected = bytes.fromhex(digest_hex)
            candidate = hashlib.pbkdf2_hmac(
                'sha256', password.encode(), bytes.fromhex(salt_hex), int(iterations)
            )
        except ValueError:
            # Malformed stored value.
            return False
        return hmac.compare_digest(candidate, expected)

class MoneyField(DecimalField):
    """Decimal field defaulting to 15 digits with 2 decimal places."""

    def __init__(self, *args, **kwargs):
        # setdefault keeps any precision the caller passes explicitly.
        kwargs.setdefault('max_digits', 15)
        kwargs.setdefault('decimal_places', 2)
        super().__init__(*args, **kwargs)

class StatusField(IntegerField):
    """Integer column exposed to Python as a status name."""

    STATUS_MAP = {
        0: 'draft',
        1: 'pending',
        2: 'approved',
        3: 'rejected',
    }

    REVERSE_STATUS_MAP = {v: k for k, v in STATUS_MAP.items()}

    def db_value(self, value):
        """Translate a status name to its integer code.

        Unknown names map to 0 (``'draft'``); non-string values are passed
        through unchanged.
        """
        if isinstance(value, str):
            return self.REVERSE_STATUS_MAP.get(value, 0)
        return value

    def python_value(self, value):
        """Translate a stored code to its name, or ``'unknown'``."""
        return self.STATUS_MAP.get(value, 'unknown')

# Using the custom fields
class Product(Model):
    """Example model built from the custom field types."""

    id = AutoField()
    name = CharField(max_length=100)
    price = MoneyField()
    metadata = JSONField(default=dict)     # custom JSON field
    status = StatusField(default='draft')  # custom status field

    class Meta:
        database = db

第三章:查询操作详解

3.1 基础查询操作

from peewee import *
import datetime

# Create test data
def create_test_data():
    """Create the tables and fill them with sample rows.

    Inserts 5 users (the third one inactive), 5 tags, 10 articles with
    1-3 tags each, and 0-3 comments per article.
    """
    db.create_tables([User, Article, Comment, Tag, ArticleTag])

    # Users
    users = []
    for i in range(1, 6):
        user = User.create(
            username=f'user{i}',
            email=f'user{i}@example.com',
            password_hash=f'hash{i}',
            is_active=(i != 3),  # user 3 is inactive
        )
        users.append(user)

    # Tags
    tags = []
    for tag_name in ['Python', 'Django', 'Flask', 'Database', 'Web']:
        tag = Tag.create(
            name=tag_name,
            slug=tag_name.lower(),
        )
        tags.append(tag)

    # Articles
    articles = []
    for i in range(1, 11):
        article = Article.create(
            title=f'文章标题 {i}',
            slug=f'article-{i}',
            content=f'这是第 {i} 篇文章的内容...',
            published=(i % 2 == 0),  # even-numbered articles are published
            published_at=datetime.datetime.now() - datetime.timedelta(days=i),
            author=users[i % len(users)],
            view_count=i * 10,
        )
        articles.append(article)

        # Attach 1-3 tags to this article
        for j in range(i % 3 + 1):
            ArticleTag.create(article=article, tag=tags[j])

    # Comments: 0-3 per article
    for i, article in enumerate(articles):
        for j in range(i % 4):
            Comment.create(
                content=f'评论内容 {j+1}',
                article=article,
                user=users[j % len(users)],
                is_approved=(j % 2 == 0),
            )

# Basic query examples
def basic_queries():
    """Run and print a series of basic SELECT examples."""

    print("=== 基础查询 ===")

    # 1. Fetch every row
    all_users = list(User.select())
    print(f"1. 所有用户数量: {len(all_users)}")

    # 2. Fetch a single row (raises DoesNotExist when missing)
    try:
        user1 = User.get(User.id == 1)
        print(f"2. 用户1: {user1.username}")
    except User.DoesNotExist:
        print("用户不存在")

    # 3. get_or_none returns None instead of raising
    user_none = User.get_or_none(User.id == 999)
    print(f"3. 不存在的用户: {user_none}")

    # 4. Simple condition (peewee overloads ``==``; ``is True`` would not work)
    active_users = list(User.select().where(User.is_active == True))
    print(f"4. 活跃用户数量: {len(active_users)}")

    # 5. AND condition
    week_ago = datetime.datetime.now() - datetime.timedelta(days=7)
    recent_articles = list(Article.select().where(
        (Article.published == True) &
        (Article.published_at > week_ago)
    ))
    print(f"5. 最近7天发布的文章: {len(recent_articles)}")

    # 6. OR condition
    articles = list(Article.select().where(
        (Article.author == 1) |
        (Article.author == 2)
    ))
    print(f"6. 作者1或2的文章: {len(articles)}")

    # 7. IN
    user_ids = [1, 2, 3]
    users = list(User.select().where(User.id.in_(user_ids)))
    print(f"7. ID在[1,2,3]的用户: {len(users)}")

    # 8. NOT IN
    excluded_users = list(User.select().where(User.id.not_in([4, 5])))
    print(f"8. ID不在[4,5]的用户: {len(excluded_users)}")

    # 9. Substring match. The ``%`` operator maps to GLOB on SQLite, where
    # '%' is not a wildcard, so use contains(), which adds the wildcards.
    python_articles = list(Article.select().where(
        Article.title.contains('Python')
    ))
    print(f"9. 标题包含Python的文章: {len(python_articles)}")

    # 10. Regular expression
    # NOTE(review): a plain SqliteDatabase may not provide REGEXP; confirm, or
    # use SqliteExtDatabase(..., regexp_function=True).
    users_start_with_u = list(User.select().where(
        User.username.regexp('^user[0-9]$')
    ))
    print(f"10. 用户名匹配正则的用户: {len(users_start_with_u)}")

    # 11. IS NULL
    articles_without_publish_date = list(Article.select().where(
        Article.published_at.is_null()
    ))
    print(f"11. 未设置发布时间文章: {len(articles_without_publish_date)}")

    # 12. BETWEEN
    mid_articles = list(Article.select().where(
        Article.view_count.between(30, 70)
    ))
    print(f"12. 阅读量30-70的文章: {len(mid_articles)}")

# Ordering and limiting
def ordering_and_limits():
    """Run and print ORDER BY, pagination and LIMIT/OFFSET examples."""

    print("\n=== 排序和限制 ===")

    # 1. Single-column ordering
    articles_asc = list(Article.select().order_by(Article.view_count))
    print(f"1. 阅读量升序前3: {[a.view_count for a in articles_asc[:3]]}")

    articles_desc = list(Article.select().order_by(Article.view_count.desc()))
    print(f"   阅读量降序前3: {[a.view_count for a in articles_desc[:3]]}")

    # 2. Multi-column ordering (shown for reference; the result is not printed)
    articles = list(Article.select().order_by(
        Article.published.desc(),     # published status first
        Article.published_at.desc(),  # then publication time
    ))

    # 3. Pagination
    page = 2
    page_size = 3

    paginated = list(Article.select().order_by(Article.id).paginate(page, page_size))
    print(f"2. 第{page}页(每页{page_size}条): {[a.id for a in paginated]}")

    # 4. Explicit LIMIT and OFFSET
    limited = list(Article.select().order_by(Article.id).limit(5).offset(10))
    print(f"3. 限制5条,偏移10: {[a.id for a in limited]}")

    # 5. Random order
    random_articles = list(Article.select().order_by(fn.Random()).limit(3))
    print(f"4. 随机3篇文章ID: {[a.id for a in random_articles]}")

# Aggregate queries
def aggregate_queries():
    """Run and print COUNT/SUM/AVG/MAX/MIN, GROUP BY and HAVING examples."""

    print("\n=== 聚合查询 ===")

    # 1. Counting
    total_articles = Article.select().count()
    published_articles = Article.select().where(Article.published == True).count()
    print(f"1. 总文章数: {total_articles}, 已发布: {published_articles}")

    # 2. Sum, average, maximum, minimum in a single query
    stats = Article.select(
        fn.SUM(Article.view_count).alias('total_views'),
        fn.AVG(Article.view_count).alias('avg_views'),
        fn.MAX(Article.view_count).alias('max_views'),
        fn.MIN(Article.view_count).alias('min_views'),
    ).get()

    print(f"2. 统计: 总阅读{stats.total_views}, 平均{stats.avg_views:.1f}, "
          f"最高{stats.max_views}, 最低{stats.min_views}")

    # 3. GROUP BY
    author_stats = (Article
        .select(
            Article.author,
            fn.COUNT(Article.id).alias('article_count'),
            fn.SUM(Article.view_count).alias('total_views'),
        )
        .group_by(Article.author)
        .order_by(fn.COUNT(Article.id).desc()))

    print("3. 作者文章统计:")
    for stat in author_stats:
        # author_id reads the raw key without loading the User row.
        print(f"   作者{stat.author_id}: {stat.article_count}篇, "
              f"{stat.total_views}次阅读")

    # 4. HAVING
    popular_authors = (Article
        .select(
            Article.author,
            fn.SUM(Article.view_count).alias('total_views'),
        )
        .group_by(Article.author)
        .having(fn.SUM(Article.view_count) > 100)
        .order_by(fn.SUM(Article.view_count).desc()))

    print("4. 总阅读>100的作者:")
    for author in popular_authors:
        print(f"   作者{author.author_id}: {author.total_views}次阅读")

# Join queries
def join_queries():
    """Run and print inner, left-outer, multi-table and many-to-many joins."""

    print("\n=== 连接查询 ===")

    # 1. Inner join (the default)
    articles_with_author = (Article
        .select(Article, User)
        .join(User, on=(Article.author == User.id))
        .where(Article.published == True))

    print(f"1. 已发布文章及作者: {len(list(articles_with_author))}篇")

    # 2. Left outer join (built for reference; not executed)
    all_users_with_articles = (User
        .select(User, Article)
        .join(Article, JOIN.LEFT_OUTER, on=(User.id == Article.author)))

    # 3. Multi-table join
    comments_with_details = (Comment
        .select(Comment, Article, User)
        .join(Article, on=(Comment.article == Article.id))
        .switch(Comment)  # make the next join start from Comment again
        .join(User, on=(Comment.user == User.id))
        .where(Comment.is_approved == True))

    print(f"2. 已批准评论及详情: {len(list(comments_with_details))}条")

    # 4. Replies only: comments that have a parent (built for reference)
    comments_with_replies = (Comment
        .select(Comment, User)
        .join(User, on=(Comment.user == User.id))
        .where(Comment.parent.is_null(False)))

    # 5. Many-to-many join through the link table
    articles_with_tags = (Article
        .select(Article, Tag)
        .join(ArticleTag, on=(Article.id == ArticleTag.article))
        .join(Tag, on=(ArticleTag.tag == Tag.id))
        .where(Tag.name == 'Python'))

    print(f"3. 带有Python标签的文章: {len(list(articles_with_tags))}篇")

# Subqueries and complex queries
def subqueries_and_complex():
    """Run and print subquery, CASE and window-function examples."""

    print("\n=== 子查询和复杂查询 ===")

    # 1. Scalar subquery
    subquery = Article.select(fn.AVG(Article.view_count))
    above_avg_articles = list(Article.select().where(
        Article.view_count > subquery
    ))
    print(f"1. 阅读量高于平均的文章: {len(above_avg_articles)}篇")

    # 2. IN subquery: it must return exactly one column, so select only the id
    active_authors = User.select(User.id).where(User.is_active == True)
    active_author_articles = list(Article.select().where(
        Article.author.in_(active_authors)
    ))
    print(f"2. 活跃作者的文章: {len(active_author_articles)}篇")

    # 3. EXISTS subquery
    authors_with_articles = list(User.select().where(
        fn.EXISTS(Article.select().where(Article.author == User.id))
    ))
    print(f"3. 发表过文章的作者: {len(authors_with_articles)}人")

    # 4. CASE WHEN. ``Case`` is a peewee helper; ``fn.Case`` would instead
    # emit a call to an SQL function named "Case".
    article_categories = (Article
        .select(
            Article.title,
            Case(None, (
                (Article.view_count > 50, '热门'),
                (Article.view_count > 20, '一般'),
                (Article.view_count > 0, '冷门'),
            ), '无人阅读').alias('category'),
        ))

    print("4. 文章分类:")
    for article in article_categories.limit(5):
        print(f"   {article.title}: {article.category}")

    # 5. Window function (needs SQLite 3.25+): rank each author's articles
    # by view count
    ranked_articles = (Article
        .select(
            Article,
            User.username,
            fn.ROW_NUMBER().over(
                partition_by=[Article.author],
                order_by=[Article.view_count.desc()],
            ).alias('rank'),
        )
        .join(User, on=(Article.author == User.id)))

    print("5. 作者文章排名(示例):")
    for article in ranked_articles.limit(3):
        # Joined User columns are attached to article.author.
        print(f"   {article.author.username}的文章'{article.title}' "
              f"排名第{article.rank}")

if __name__ == '__main__':
    db = SqliteDatabase(':memory:')

    # Point every model at the in-memory database
    db.bind([User, Article, Comment, Tag, ArticleTag])

    # Create test data
    create_test_data()

    # Run the query examples
    basic_queries()
    ordering_and_limits()
    aggregate_queries()
    join_queries()
    subqueries_and_complex()

3.2 查询优化技巧

def query_optimization():
    """Demonstrate query optimisation techniques, printing each step.

    Side effects: increments ``view_count`` on published articles and
    inserts 100 extra articles for author id 1.
    """

    print("=== 查询优化技巧 ===")

    # 1. Select only the columns you need
    print("1. 只选择需要的字段:")
    users_light = list(User.select(User.id, User.username).limit(5))
    print(f"   只选择id和username: {len(users_light)}条")

    # 2. iterator() avoids caching every row on the query object
    print("\n2. 使用iterator处理大量数据:")
    count = 0
    for article in Article.select().iterator():
        count += 1
        if count % 1000 == 0:
            print(f"   已处理 {count} 条...")
    print(f"   总共处理 {count} 条记录")

    # 3. prefetch avoids the N+1 query problem
    print("\n3. 使用prefetch避免N+1查询:")
    from peewee import prefetch

    # Wrong: one extra query per article
    print("   错误方式(N+1查询):")
    for article in Article.select().limit(3):
        author = article.author  # triggers an additional query
        print(f"   文章: {article.title}, 作者: {author.username}")

    # Right: load the related users up front
    print("\n   正确方式(使用prefetch):")
    query = prefetch(
        Article.select().limit(3),
        User.select(),
    )
    for article in query:
        print(f"   文章: {article.title}, 作者: {article.author.username}")

    # 4. Inspect the generated SQL (sql() returns the SQL text and its
    # parameters, not an execution plan)
    print("\n4. 索引优化:")
    query = Article.select().where(Article.published == True)
    print(f"   查询计划: {query.sql()}")

    # 5. One UPDATE statement instead of saving rows in a loop
    print("\n5. 批量操作:")
    Article.update(view_count=Article.view_count + 1).where(
        Article.published == True
    ).execute()
    print("   批量更新完成")

    # 6. Wrap many inserts in one transaction
    print("\n6. 使用事务:")
    with db.atomic():
        for i in range(100):
            Article.create(
                title=f'批量文章 {i}',
                slug=f'batch-article-{i}',
                content='...',
                author=1,
            )
    print("   批量插入完成(在事务中)")

    # 7. EXPLAIN QUERY PLAN for the query built in step 4. Reusing its SQL
    # keeps the real table name instead of a hard-coded guess.
    print("\n7. 使用EXPLAIN分析:")
    if isinstance(db, SqliteDatabase):
        sql, params = query.sql()
        cursor = db.execute_sql(f'EXPLAIN QUERY PLAN {sql}', params)
        print("   查询计划:")
        for row in cursor.fetchall():
            print(f"     {row}")

第四章:关系与关联查询

4.1 外键关系详解

def foreign_key_relationships():
    """Define example models illustrating foreign-key options.

    The models are local to this function and no tables are created; the
    function only prints a confirmation once the classes are defined.
    """

    print("=== 外键关系 ===")

    # 1. One-to-one: the foreign key is also the primary key
    class UserProfile(Model):
        user = ForeignKeyField(User, primary_key=True)
        bio = TextField(null=True)
        avatar_url = CharField(null=True)
        website = CharField(null=True)

        class Meta:
            database = db

    # 2. One-to-many: see the module-level Article model (Article.author).
    # 3. Many-to-many through a link table: see the module-level ArticleTag.
    # They are not redefined here, to avoid shadowing the real models.

    # 4. Self-reference (tree structure)
    class Category(Model):
        name = CharField()
        parent = ForeignKeyField('self', null=True, backref='children')

        class Meta:
            database = db

    # Foreign-key arguments in detail
    class ForeignKeyOptions(Model):
        # on_delete
        user_cascade = ForeignKeyField(
            User,
            on_delete='CASCADE',  # deleting the user deletes these rows
        )

        user_set_null = ForeignKeyField(
            User,
            on_delete='SET NULL',  # deleting the user nulls the key
            null=True,
        )

        user_set_default = ForeignKeyField(
            User,
            on_delete='SET DEFAULT',  # deleting the user resets the key
            default=1,
            # ``default`` is applied by peewee in Python only; SET DEFAULT
            # needs a default declared in the schema as well.
            constraints=[SQL('DEFAULT 1')],
        )

        user_restrict = ForeignKeyField(
            User,
            on_delete='RESTRICT',  # refuse to delete a referenced user
        )

        # on_update
        user_cascade_update = ForeignKeyField(
            User,
            on_update='CASCADE',  # follow changes to the user's id
        )

        # lazy_load
        user_lazy = ForeignKeyField(
            User,
            lazy_load=False,  # do not fetch the related object on access
        )

        class Meta:
            database = db

    print("外键关系模型定义完成")

# Relationship queries in practice
def relationship_queries():
    """Run and print forward, reverse, many-to-many and tree queries.

    Side effects: creates a ``Category`` table and inserts four rows.
    """

    print("\n=== 关联查询实践 ===")

    # 1. Forward lookup (article -> author)
    print("1. 正向查询:")
    article = Article.select().first()
    if article:
        author = article.author  # loads the author on access
        print(f"   文章 '{article.title}' 的作者是: {author.username}")

    # 2. Reverse lookup (author -> articles) through the backref
    print("\n2. 反向查询:")
    author = User.select().first()
    if author:
        author_articles = author.articles
        print(f"   作者 '{author.username}' 的文章数量: {author_articles.count()}")

        # The backref is a query, so it can be filtered further
        published_articles = author.articles.where(Article.published == True)
        print(f"   已发布文章数量: {published_articles.count()}")

    # 3. Many-to-many
    print("\n3. 多对多查询:")
    tag = Tag.select().where(Tag.name == 'Python').first()
    if tag:
        # tag.articles yields ArticleTag link rows, so join through the link
        # table to get real Article instances.
        python_articles = (Article
            .select()
            .join(ArticleTag)
            .where(ArticleTag.tag == tag))
        print(f"   带有'{tag.name}'标签的文章: {python_articles.count()}篇")

        for article in python_articles.limit(3):
            print(f"     - {article.title}")

    # 4. Chained query: published articles of active users
    print("\n4. 链式查询:")
    active_users_articles = (Article
        .select()
        .join(User)
        .where(
            (User.is_active == True) &
            (Article.published == True)
        ))
    print(f"   活跃用户的已发布文章: {active_users_articles.count()}篇")

    # 5. Aggregation across a relationship
    print("\n5. 聚合关联查询:")

    author_stats = (User
        .select(
            User.username,
            fn.COUNT(Article.id).alias('article_count'),
            fn.SUM(Article.view_count).alias('total_views'),
        )
        .join(Article, JOIN.LEFT_OUTER)
        .group_by(User.id)
        .order_by(fn.COUNT(Article.id).desc()))

    print("   作者统计:")
    for stat in author_stats.limit(3):
        # SUM is NULL for users without articles, hence ``or 0``.
        print(f"     {stat.username}: {stat.article_count}篇文章, "
              f"{stat.total_views or 0}次阅读")

    # 6. Recursive traversal of a tree
    print("\n6. 递归查询:")

    # Defined here because no Category model exists at module level.
    class Category(Model):
        name = CharField()
        parent = ForeignKeyField('self', null=True, backref='children')

        class Meta:
            database = db

    db.create_tables([Category])

    root = Category.create(name='编程')
    web = Category.create(name='Web开发', parent=root)
    python = Category.create(name='Python', parent=web)
    django = Category.create(name='Django', parent=python)

    def get_all_children(category, depth=0):
        """Return (indented name, category) pairs for all descendants."""
        children = []
        for child in category.children:
            children.append(('  ' * depth + child.name, child))
            children.extend(get_all_children(child, depth + 1))
        return children

    all_children = get_all_children(root)
    print("   类别树:")
    for name, _ in all_children:
        print(f"     {name}")

# Eager loading
def eager_loading():
    """Run and print prefetch examples.

    Side effects: creates a ``UserProfile`` table and one profile per user.
    """

    print("\n=== 预加载优化 ===")

    from peewee import prefetch

    # 1. Articles with their authors (avoids N+1 queries)
    print("1. 预加载一对多:")

    articles_with_authors = prefetch(
        Article.select().limit(5),
        User.select(),
    )

    for article in articles_with_authors:
        print(f"   文章: {article.title}, 作者: {article.author.username}")

    # 2. Many-to-many
    print("\n2. 预加载多对多:")

    articles_with_tags = prefetch(
        Article.select().limit(3),
        ArticleTag.select(),
        Tag.select(),
    )

    for article in articles_with_tags:
        # After prefetch the backref ``article.tags`` holds the loaded
        # ArticleTag rows.
        tag_names = [at.tag.name for at in article.tags]
        print(f"   文章: {article.title}, 标签: {', '.join(tag_names)}")

    # 3. Multi-level prefetch: article -> author -> profile
    print("\n3. 多层预加载:")

    class UserProfile(Model):
        # Explicit backref so the reverse accessor has a known name.
        user = ForeignKeyField(User, primary_key=True, backref='profiles')
        bio = TextField()

        class Meta:
            database = db

    db.create_tables([UserProfile])

    # get_or_create keeps the example re-runnable
    for user in User.select():
        UserProfile.get_or_create(
            user=user,
            defaults={'bio': f'{user.username}的简介'},
        )

    articles_with_all = prefetch(
        Article.select().limit(3),
        User.select(),
        UserProfile.select(),
    )

    for article in articles_with_all:
        print(f"   文章: {article.title}")
        print(f"     作者: {article.author.username}")
        # A prefetched backref is a list, even for a one-to-one relation.
        profiles = article.author.profiles
        print(f"     简介: {profiles[0].bio if profiles else ''}")

    # 4. Prefetch with a filtered related query
    print("\n4. 自定义预加载:")

    active_authors = User.select().where(User.is_active == True)
    articles_active_authors = prefetch(
        Article.select().limit(3),
        active_authors,
    )

    print(f"   只预加载活跃作者的文章: {len(list(articles_active_authors))}篇")

第五章:高级特性

5.1 数据库迁移

from playhouse.migrate import *
import datetime

def database_migrations():
    """Demonstrate schema migrations with ``playhouse.migrate``.

    Side effects: alters the ``users`` and ``articles`` tables, creates an
    ``ArticleViewLog`` table and fills it with one row per recorded view.
    Table names are the real ones set in each model's ``Meta.table_name``.
    """

    print("=== 数据库迁移 ===")

    # 1. Create the migrator
    migrator = SqliteMigrator(db)

    # 2. Add a column
    print("1. 添加字段:")
    migrate(
        migrator.add_column('users', 'phone', CharField(null=True, max_length=20))
    )
    print("   添加phone字段到user表")

    # 3. Drop a column:  migrator.drop_column('users', 'phone')

    # 4. Rename a column
    print("2. 重命名字段:")
    migrate(
        migrator.rename_column('articles', 'view_count', 'views')
    )
    print("   将view_count重命名为views")

    # 5. Change a column's type
    print("3. 修改字段:")
    migrate(
        migrator.alter_column_type('users', 'email', CharField(max_length=150))
    )
    print("   修改email字段长度为150")

    # 6. Add NOT NULL:   migrator.add_not_null('users', 'email')
    # 7. Drop NOT NULL:  migrator.drop_not_null('users', 'phone')

    # 8. Add a column default
    print("4. 添加默认值:")
    migrate(
        migrator.add_column_default('articles', 'views', 0)
    )
    print("   为views字段添加默认值0")

    # 9. Create an index. The model already indexes (published,
    # published_at), so a different column pair is used here.
    # NOTE(review): presumably a second index on the same columns would get
    # the same generated name and fail -- confirm.
    print("5. 创建索引:")
    migrate(
        migrator.add_index('articles', ('author_id', 'published_at'), False)
    )
    print("   为article表添加复合索引")

    # 10. Drop an index:  migrator.drop_index('articles', '<index name>')
    # 11. Rename a table: migrator.rename_table('articles', 'posts')

    # 12. A larger migration: new table plus data migration
    print("\n6. 复杂迁移示例:")

    class ArticleViewLog(Model):
        article = ForeignKeyField(Article)
        user = ForeignKeyField(User, null=True)
        viewed_at = DateTimeField(default=datetime.datetime.now)
        ip_address = CharField(null=True)

        class Meta:
            database = db

    db.create_tables([ArticleViewLog])
    print("   创建文章浏览日志表")

    # The Article model still maps the old ``view_count`` column, so read
    # the renamed ``views`` column with plain SQL.
    print("   迁移数据...")
    with db.atomic():
        cursor = db.execute_sql('SELECT id, views FROM articles')
        for article_id, views in cursor.fetchall():
            # One log row per recorded view, spaced one hour apart
            for i in range(views or 0):
                ArticleViewLog.create(
                    article=article_id,
                    viewed_at=datetime.datetime.now() - datetime.timedelta(hours=i),
                )

    print("   数据迁移完成")

# Signals
def signals_system():
    """Demonstrate ``playhouse.signals`` save and delete hooks.

    Side effects: creates a ``SignalModel`` table and inserts and deletes
    rows in it.
    """

    print("\n=== 信号系统 ===")

    # Signals only fire for models deriving from playhouse.signals.Model.
    from playhouse.signals import Model, pre_save, post_save, pre_delete, post_delete

    class SignalModel(Model):
        name = CharField()
        value = IntegerField(default=0)

        class Meta:
            database = db

    @pre_save(sender=SignalModel)
    def on_pre_save(model_class, instance, created):
        print(f"   [pre_save] 保存前: {instance.name}, 新建: {created}")
        if created:
            instance.value = 100  # value forced onto new instances

    @post_save(sender=SignalModel)
    def on_post_save(model_class, instance, created):
        print(f"   [post_save] 保存后: {instance.name}, 新建: {created}")
        if created:
            print(f"      新记录ID: {instance.id}")

    @pre_delete(sender=SignalModel)
    def on_pre_delete(model_class, instance):
        print(f"   [pre_delete] 删除前: {instance.name}")

    @post_delete(sender=SignalModel)
    def on_post_delete(model_class, instance):
        print(f"   [post_delete] 删除后: {instance.name}")

    # Create the table
    db.create_tables([SignalModel])

    # Exercise the signals
    print("1. 测试保存信号:")
    obj = SignalModel.create(name="测试对象")
    obj.name = "修改后的对象"
    obj.save()

    print("\n2. 测试删除信号:")
    obj.delete_instance()

    print("\n3. 批量操作信号:")
    # Bulk statements bypass save(), so no signal fires.
    SignalModel.insert_many([
        {'name': '批量1', 'value': 1},
        {'name': '批量2', 'value': 2},
    ]).execute()
    print("   批量插入完成(不触发信号)")

# Hook methods
def hook_methods():
    """Demonstrate hand-written save/delete hooks on a model.

    Side effects: creates a ``HookModel`` table and inserts and deletes rows.
    """

    print("\n=== 模型钩子方法 ===")

    class HookModel(Model):
        name = CharField()
        data = TextField(default='{}')
        created_at = DateTimeField(default=datetime.datetime.now)
        updated_at = DateTimeField(default=datetime.datetime.now)

        class Meta:
            database = db

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Snapshot used by after_save to detect changes to ``data``.
            self._original_data = self.data

        def save(self, *args, **kwargs):
            """Save the row, running the before/after save hooks."""
            self.before_save()
            result = super().save(*args, **kwargs)
            self.after_save()
            return result

        def delete_instance(self, *args, **kwargs):
            """Delete the row, running the before/after delete hooks."""
            self.before_delete()
            result = super().delete_instance(*args, **kwargs)
            self.after_delete()
            return result

        def before_save(self):
            """Refresh ``updated_at`` before saving."""
            self.updated_at = datetime.datetime.now()
            print(f"   [before_save] 更新updated_at: {self.name}")

        def after_save(self):
            """Report the save and whether ``data`` changed."""
            print(f"   [after_save] 保存完成: {self.name}")
            if hasattr(self, '_original_data') and self.data != self._original_data:
                print(f"      数据已修改")
            # Refresh the snapshot so a later save without changes is not
            # reported as modified again.
            self._original_data = self.data

        def before_delete(self):
            """Run before the row is deleted."""
            print(f"   [before_delete] 准备删除: {self.name}")

        def after_delete(self):
            """Run after the row is deleted."""
            print(f"   [after_delete] 删除完成: {self.name}")

        @classmethod
        def before_create(cls):
            """Class-level hook, called manually before creating."""
            print(f"   [before_create] 准备创建新实例")

        @classmethod
        def after_create(cls):
            """Class-level hook, called manually after creating."""
            print(f"   [after_create] 新实例创建完成")

    # Create the table
    db.create_tables([HookModel])

    # Exercise the instance hooks
    print("1. 测试实例钩子:")
    hook_obj = HookModel(name="钩子测试")
    hook_obj.save()
    hook_obj.data = '{"key": "value"}'
    hook_obj.save()
    hook_obj.delete_instance()

    # The class-level hooks are not wired into create(); call them explicitly
    print("\n2. 测试类钩子:")
    HookModel.before_create()
    obj = HookModel.create(name="类钩子测试")
    HookModel.after_create()

5.2 高级查询功能

def advanced_query_features():
    """Walk through Peewee's advanced query features and print the results.

    Covers, in order: window functions, a plain CTE, a recursive CTE over a
    self-referencing ``Department`` table, SQLite FTS5 full-text search,
    model/dict conversion, and dynamic query construction. Relies on the
    ``db``, ``Article``, ``User`` and ``fn`` names of the surrounding module.
    """

    print("=== 高级查询功能 ===")

    from peewee import Window, SQL
    from playhouse.shortcuts import model_to_dict, dict_to_model

    # 1. Window functions
    print("1. 窗口函数:")

    # Rank each author's articles by view count.
    window = Window(
        partition_by=[Article.author],
        order_by=[Article.views.desc()]
    )

    ranked_articles = (Article
        .select(
            Article,
            User.username,
            fn.ROW_NUMBER().over(window=window).alias('rank'),
            # An ordered window would yield a running total; partition only
            # so every row carries the author's full total.
            fn.SUM(Article.views)
                .over(partition_by=[Article.author])
                .alias('author_total_views')
        )
        .join(User, on=(Article.author == User.id))
        # A named Window must be attached to the query, otherwise the
        # generated "OVER w" refers to an undefined window.
        .window(window)
        .order_by(Article.author, Article.views.desc())
        # Flatten joined columns onto the Article instance so that
        # ``article.username`` resolves.
        .objects())

    print("   文章排名(窗口函数):")
    for article in ranked_articles.limit(5):
        print(f"     {article.username}: {article.title} "
              f"(排名: {article.rank}, 作者总阅读: {article.author_total_views})")

    # 2. Common table expression (CTE)
    print("\n2. 通用表表达式(CTE):")

    cte = (Article
        .select(Article.author, fn.SUM(Article.views).alias('total_views'))
        .group_by(Article.author)
        .cte('author_stats'))

    author_stats_query = (User
        .select(User.username, cte.c.total_views)
        .join(cte, on=(User.id == cte.c.author))
        .order_by(cte.c.total_views.desc())
        # The CTE definition has to be included in the final query.
        .with_cte(cte))

    print("   作者统计(CTE):")
    for stat in author_stats_query.limit(3):
        print(f"     {stat.username}: {stat.total_views}次阅读")

    # 3. Recursive CTE (tree structure)
    print("\n3. 递归CTE:")

    class Department(Model):
        name = CharField()
        parent = ForeignKeyField('self', null=True, backref='children')

        class Meta:
            database = db

    db.create_tables([Department])

    # Sample organisation tree.
    company = Department.create(name='公司')
    hr = Department.create(name='人力资源部', parent=company)
    it = Department.create(name='技术部', parent=company)
    dev = Department.create(name='开发组', parent=it)
    test = Department.create(name='测试组', parent=it)

    Base = Department.alias()
    Recursive = Department.alias()

    # Base case: root departments at level 1.
    cte = (Base
        .select(Base.id, Base.name, Base.parent, SQL('1').alias('level'))
        .where(Base.parent.is_null())
        .cte('departments_recursive', recursive=True))

    # Recursive case: children of rows already in the CTE, one level deeper.
    cte = cte.union_all(
        Recursive
        .select(Recursive.id, Recursive.name, Recursive.parent, cte.c.level + 1)
        .join(cte, on=(Recursive.parent == cte.c.id))
    )

    departments_tree = (cte
        .select_from(cte.c.name, cte.c.level)
        .order_by(cte.c.level, cte.c.name))

    print("   部门树(递归CTE):")
    for dept in departments_tree:
        indent = '  ' * (dept.level - 1)
        print(f"     {indent}{dept.name}")

    # 4. Full-text search (SQLite FTS5)
    print("\n4. 全文搜索:")

    # Illustrative model only: it is not referenced below, the virtual table
    # is created and queried with raw SQL.
    class ArticleFTS(Model):
        content = TextField()

        class Meta:
            database = db
            table_name = 'article_fts'
            options = {'tokenize': 'porter'}

    # A real project would also have to keep the index in sync with the
    # source table; this demo just bulk-copies once per call.
    try:
        db.execute_sql('CREATE VIRTUAL TABLE IF NOT EXISTS article_fts USING fts5(title, content)')

        db.execute_sql('''
            INSERT INTO article_fts (title, content)
            SELECT title, content FROM article
        ''')

        search_term = 'Python'
        # snippet() column indexes are zero-based: 0 = title, 1 = content.
        results = db.execute_sql('''
            SELECT title, snippet(article_fts, 1, '<b>', '</b>', '...', 64) as snippet
            FROM article_fts
            WHERE article_fts MATCH ?
            ORDER BY rank
        ''', (search_term,))

        print(f"   搜索 '{search_term}' 结果:")
        for row in results.fetchall():
            print(f"     {row[0]}: {row[1]}")
    except Exception as e:
        # Best-effort demo: FTS5 may not be compiled into the local SQLite.
        print(f"   全文搜索需要SQLite FTS5支持: {e}")

    # 5. Model conversion
    print("\n5. 模型转换:")

    article = Article.select().first()
    if article:
        article_dict = model_to_dict(article)
        print(f"   模型转字典: {article_dict['title']}")

        article_with_author = model_to_dict(article, backrefs=True)
        print(f"   包含关联: 作者={article_with_author.get('author', {}).get('username')}")

        new_article_data = {
            'title': '新文章',
            'slug': 'new-article',
            'content': '内容',
            'author': 1
        }

        # dict_to_model only builds an in-memory instance; nothing is saved.
        new_article = dict_to_model(Article, new_article_data)
        print(f"   字典转模型: {new_article.title}")

    # 6. Dynamic query construction
    print("\n6. 自定义查询构建:")

    def build_article_query(filters):
        """Build an Article query from the optional keys present in *filters*."""
        query = Article.select()

        if 'author_id' in filters:
            query = query.where(Article.author == filters['author_id'])

        if 'published' in filters:
            query = query.where(Article.published == filters['published'])

        if 'min_views' in filters:
            query = query.where(Article.views >= filters['min_views'])

        if 'search' in filters:
            query = query.where(Article.title.contains(filters['search']))

        if 'order_by' in filters:
            # Unknown attribute names fall back to ordering by id.
            order_field = getattr(Article, filters['order_by'], Article.id)
            query = query.order_by(order_field.desc())

        return query

    filters = {
        'published': True,
        'min_views': 30,
        'order_by': 'views'
    }

    dynamic_query = build_article_query(filters)
    print(f"   动态查询结果: {dynamic_query.count()}篇文章")

5.3 性能优化与监控

def performance_optimization():
    """Demonstrate pooling, queued writes, timing, query plans, bulk inserts and caching.

    Prints the outcome of each step. Relies on the ``db`` and ``Article``
    names of the surrounding module and inserts 2000 demo rows into
    ``Article`` as part of the bulk-insert comparison.
    """

    print("=== 性能优化与监控 ===")

    import functools
    import time

    from peewee import chunked
    from playhouse.pool import PooledSqliteDatabase
    from playhouse.sqliteq import SqliteQueueDatabase

    # 1. Connection pool
    print("1. 数据库连接池:")

    pooled_db = PooledSqliteDatabase(
        'pooled.db',
        max_connections=32,
        stale_timeout=300,
        check_same_thread=False
    )

    print(f"   连接池配置: 最大连接数=32, 超时=300秒")

    # 2. Queued writes
    print("\n2. 异步数据库:")

    # SqliteQueueDatabase funnels writes through one background thread.
    queued_db = SqliteQueueDatabase(
        'queued.db',
        use_gevent=False,
        autostart=True,
        queue_max_size=64,
        results_timeout=5.0
    )

    print(f"   队列数据库: 队列大小=64, 超时=5秒")

    # autostart=True launched the writer thread; shut it down since the demo
    # does not use this database any further.
    queued_db.stop()

    # 3. Timing decorator
    print("\n3. 性能监控:")

    def query_timer(func):
        """Print how long each call to *func* takes."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start
            print(f"   查询耗时: {elapsed:.4f}秒 - {func.__name__}")
            return result
        return wrapper

    @query_timer
    def slow_query():
        """Stand-in for a slow query: substring match on the title."""
        return list(Article.select().where(
            Article.title.contains('Python')
        ))

    results = slow_query()

    # 4. Query analysis
    print("\n4. 查询分析:")

    query = Article.select().where(Article.published == True)
    print(f"   SQL语句: {query.sql()}")

    # EXPLAIN QUERY PLAN is SQLite-specific; isinstance also covers subclasses.
    if isinstance(db, SqliteDatabase):
        cursor = db.execute_sql('EXPLAIN QUERY PLAN SELECT * FROM article WHERE published = 1')
        print("   查询计划:")
        for row in cursor.fetchall():
            print(f"     {row}")

    # 5. Bulk operations
    print("\n5. 批量操作优化:")

    def batch_insert_performance():
        """Compare row-by-row ``create`` with ``insert_many``."""

        num_records = 1000

        # Option 1: one INSERT per row (slow).
        start = time.perf_counter()
        with db.atomic():
            for i in range(num_records):
                Article.create(
                    title=f'文章 {i}',
                    slug=f'article-{i}',
                    content='...',
                    author=1
                )
        loop_time = time.perf_counter() - start

        # Option 2: multi-row INSERTs (fast).
        data = [
            {'title': f'批量 {i}', 'slug': f'batch-{i}', 'content': '...', 'author': 1}
            for i in range(num_records)
        ]

        start = time.perf_counter()
        with db.atomic():
            # Chunk the rows: older SQLite builds cap a statement at 999
            # bound variables, and each row binds four.
            for batch in chunked(data, 100):
                Article.insert_many(batch).execute()
        batch_time = time.perf_counter() - start

        print(f"   循环插入: {loop_time:.3f}秒 ({num_records/loop_time:.1f}条/秒)")
        print(f"   批量插入: {batch_time:.3f}秒 ({num_records/batch_time:.1f}条/秒)")
        print(f"   性能提升: {loop_time/batch_time:.1f}倍")

    batch_insert_performance()

    # 6. Indexes
    print("\n6. 索引优化:")

    cursor = db.execute_sql("SELECT name, sql FROM sqlite_master WHERE type='index'")
    indexes = cursor.fetchall()

    print(f"   现有索引数量: {len(indexes)}")
    for idx_name, idx_sql in indexes[:3]:  # show the first three only
        print(f"     {idx_name}")

    print("\n   建议索引:")
    suggestions = [
        "article(published, published_at) - 用于已发布文章查询",
        "article(author, published) - 用于作者文章查询",
        "comment(article, created_at) - 用于文章评论查询"
    ]

    for suggestion in suggestions:
        print(f"     • {suggestion}")

    # 7. Caching
    print("\n7. 缓存优化:")

    class QueryCache:
        """Minimal in-memory cache whose entries expire after ``ttl`` seconds."""

        def __init__(self, ttl=300):  # five minutes by default
            self.cache = {}
            self.ttl = ttl
            self.timestamps = {}

        def get(self, key):
            """Return the cached value, or None when missing or expired."""
            if key not in self.cache:
                return None
            if time.monotonic() - self.timestamps[key] < self.ttl:
                return self.cache[key]
            # Expired: evict so the entry does not linger.
            del self.cache[key]
            del self.timestamps[key]
            return None

        def set(self, key, value):
            """Store *value* under *key* and stamp it with the current time."""
            self.cache[key] = value
            # Monotonic clock: expiry is unaffected by wall-clock changes.
            self.timestamps[key] = time.monotonic()

        def clear(self):
            """Drop every entry."""
            self.cache.clear()
            self.timestamps.clear()

    cache = QueryCache(ttl=60)  # one-minute cache

    def get_popular_articles_cached():
        """Return the top-10 published articles as (id, title, views) tuples, cached."""
        cache_key = 'popular_articles'

        cached = cache.get(cache_key)
        if cached is not None:
            print("   从缓存获取热门文章")
            return cached

        print("   执行数据库查询")
        articles = list(Article
            .select()
            .where(Article.published == True)
            .order_by(Article.views.desc())
            .limit(10))

        # Cache plain tuples rather than model instances.
        serialized = [(a.id, a.title, a.views) for a in articles]
        cache.set(cache_key, serialized)

        return serialized

    print("   第一次调用(缓存未命中):")
    result1 = get_popular_articles_cached()

    print("   第二次调用(缓存命中):")
    result2 = get_popular_articles_cached()

    print(f"   结果相同: {result1 == result2}")

第六章:实际项目应用

6.1 Flask + Peewee Web应用

from flask import Flask, request, jsonify, g
from peewee import *
import datetime
import jwt
import os
from functools import wraps

# Flask application configuration.
app = Flask(__name__)
# NOTE: the fallback secret is suitable for local development only; set
# SECRET_KEY in the environment for any real deployment.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key')
app.config['DATABASE'] = os.getenv('DATABASE_URL', 'app.db')

# Database handle used by the models below.
db = SqliteDatabase(app.config['DATABASE'])

class BaseModel(Model):
    """Base model: binds subclasses to ``db`` and maintains timestamps."""
    created_at = DateTimeField(default=datetime.datetime.now)
    updated_at = DateTimeField(default=datetime.datetime.now)

    class Meta:
        database = db

    def save(self, *args, **kwargs):
        """Refresh ``updated_at`` and delegate to ``Model.save``."""
        self.updated_at = datetime.datetime.now()
        return super().save(*args, **kwargs)

# 用户模型
class User(BaseModel):
    """Application user with password hashing and JWT issuing helpers."""
    username = CharField(max_length=50, unique=True)
    email = CharField(max_length=100, unique=True)
    password_hash = CharField(max_length=255)
    is_active = BooleanField(default=True)
    is_admin = BooleanField(default=False)

    class Meta:
        table_name = 'users'

    def set_password(self, password):
        """Store a salted PBKDF2-HMAC-SHA256 hash of *password*.

        The stored value has the form
        ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``. The instance
        is not saved; the caller must call ``save()``.
        """
        import hashlib
        import secrets

        iterations = 260000
        salt = secrets.token_bytes(16)
        digest = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, iterations)
        self.password_hash = f'pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}'

    def check_password(self, password):
        """Return True when *password* matches the stored hash.

        Understands the PBKDF2 format written by ``set_password`` as well as
        the bare SHA-256 hex digests this model stored previously, so
        existing rows keep working.
        """
        import hashlib
        import hmac

        stored = self.password_hash or ''
        if stored.startswith('pbkdf2_sha256$'):
            try:
                _, iterations, salt_hex, expected = stored.split('$')
                salt = bytes.fromhex(salt_hex)
                iterations = int(iterations)
            except ValueError:
                # Malformed stored value: treat as a failed check.
                return False
            candidate = hashlib.pbkdf2_hmac(
                'sha256', password.encode(), salt, iterations).hex()
        else:
            # Legacy format: unsalted SHA-256 hex digest.
            expected = stored
            candidate = hashlib.sha256(password.encode()).hexdigest()

        # Constant-time comparison; bytes avoid a TypeError on non-ASCII text.
        return hmac.compare_digest(candidate.encode(), expected.encode())

    def generate_token(self):
        """Return an HS256 JWT for this user that expires after 24 hours."""
        payload = {
            'user_id': self.id,
            'username': self.username,
            # Timezone-aware replacement for the deprecated utcnow().
            'exp': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=24)
        }
        return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')

# 文章模型
class Article(BaseModel):
    """Blog article written by a ``User``."""
    title = CharField(max_length=200)
    slug = CharField(max_length=200, unique=True)
    content = TextField()
    published = BooleanField(default=False)
    published_at = DateTimeField(null=True)
    views = IntegerField(default=0)

    author = ForeignKeyField(User, backref='articles')

    class Meta:
        table_name = 'articles'
        # Non-unique composite index on (published, published_at).
        indexes = (
            (('published', 'published_at'), False),
        )

# Flask应用生命周期
@app.before_request
def before_request():
    """Open the database connection for the incoming request."""
    g.db = db
    # reuse_if_open avoids an OperationalError when a connection is still
    # open, e.g. one left over from an earlier request on this thread.
    g.db.connect(reuse_if_open=True)

@app.after_request
def after_request(response):
    """Close the request's database connection and pass *response* through."""
    if hasattr(g, 'db'):
        g.db.close()
    return response

@app.teardown_request
def teardown_request(exception):
    """Close the database connection if it is still open at teardown."""
    if hasattr(g, 'db') and not g.db.is_closed():
        g.db.close()
# 认证装饰器
def login_required(f):
    """Decorator that rejects requests lacking a valid JWT.

    Reads the ``Authorization`` header, strips the ``Bearer `` prefix,
    decodes the token and stores the user on ``g.user`` / ``g.user_id``.
    Returns a 401 JSON response when the token is missing, expired, invalid,
    or maps to a missing or inactive user.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'error': '认证令牌缺失'}), 401

        try:
            token = token.replace('Bearer ', '')
            payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
            # .get(): a token without the claim yields None, which matches
            # no user and falls into the 401 below instead of a KeyError.
            g.user_id = payload.get('user_id')
            g.user = User.get_or_none(User.id == g.user_id)

            if not g.user or not g.user.is_active:
                return jsonify({'error': '用户不存在或未激活'}), 401

        except jwt.ExpiredSignatureError:
            return jsonify({'error': '令牌已过期'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': '无效令牌'}), 401

        return f(*args, **kwargs)
    return decorated_function

def admin_required(f):
    """Decorator that requires an authenticated user with ``is_admin`` set.

    Authentication is delegated to ``login_required``; non-admin users get a
    403 JSON response.
    """
    @wraps(f)
    @login_required
    def decorated_function(*args, **kwargs):
        if not g.user.is_admin:
            return jsonify({'error': '需要管理员权限'}), 403
        return f(*args, **kwargs)
    return decorated_function

# API路由
@app.route('/api/health', methods=['GET'])
def health_check():
    """Liveness probe: report a healthy status and the current timestamp."""
    return jsonify({'status': 'healthy', 'timestamp': datetime.datetime.now().isoformat()})

@app.route('/api/register', methods=['POST'])
def register():
    """Register a user from a JSON body with username, email and password.

    Returns 201 with the new user and a token, 400 when fields are missing
    or the insert fails for another integrity reason, and 409 when the
    username or email is already taken.
    """
    data = request.get_json()

    required_fields = ['username', 'email', 'password']
    if not data or not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必要字段'}), 400

    try:
        # Build the instance and hash the password BEFORE the insert:
        # password_hash is NOT NULL, so inserting without it fails.
        user = User(
            username=data['username'],
            email=data['email']
        )
        user.set_password(data['password'])
        with db.atomic():
            user.save()

        token = user.generate_token()

        return jsonify({
            'message': '注册成功',
            'user': {
                'id': user.id,
                'username': user.username,
                'email': user.email
            },
            'token': token
        }), 201

    except IntegrityError as e:
        # The constraint message names the offending column.
        if 'username' in str(e):
            return jsonify({'error': '用户名已存在'}), 409
        elif 'email' in str(e):
            return jsonify({'error': '邮箱已存在'}), 409
        return jsonify({'error': '注册失败'}), 400

@app.route('/api/login', methods=['POST'])
def login():
    """Authenticate by username or email plus password and issue a token.

    Returns 400 when credentials are missing and 401 when they do not match
    an active user.
    """
    data = request.get_json()

    if not data or 'username' not in data or 'password' not in data:
        return jsonify({'error': '缺少用户名或密码'}), 400

    # The "username" field may hold either the username or the email.
    user = User.get_or_none(
        (User.username == data['username']) |
        (User.email == data['username'])
    )

    if user and user.check_password(data['password']) and user.is_active:
        token = user.generate_token()
        return jsonify({
            'message': '登录成功',
            'token': token,
            'user': {
                'id': user.id,
                'username': user.username,
                'email': user.email,
                'is_admin': user.is_admin
            }
        })

    return jsonify({'error': '用户名或密码错误'}), 401

@app.route('/api/articles', methods=['GET'])
def get_articles():
    """Return a paginated, optionally filtered list of articles.

    Query parameters: ``page`` (default 1), ``per_page`` (default 10,
    clamped to 1..100), ``published`` (default ``true``), ``author_id``,
    ``search`` (title substring) and ``tag``.
    """
    # type=int makes Flask fall back to the default on non-numeric input
    # instead of raising; clamping keeps paging and total_pages well defined.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = min(max(request.args.get('per_page', 10, type=int), 1), 100)
    published_only = request.args.get('published', 'true').lower() == 'true'
    author_id = request.args.get('author_id')
    search = request.args.get('search')
    tag = request.args.get('tag')

    # User.username is selected, so User must be joined on every path.
    query = (Article
        .select(Article, User.username)
        .join(User, on=(Article.author == User.id)))

    if published_only:
        query = query.where(Article.published == True)

    if author_id:
        query = query.where(Article.author == author_id)

    if search:
        query = query.where(Article.title.contains(search))

    if tag:
        from models import Tag, ArticleTag
        query = (query
            # Return the join context to Article before joining the tags.
            .switch(Article)
            .join(ArticleTag, on=(Article.id == ArticleTag.article))
            .join(Tag, on=(ArticleTag.tag == Tag.id))
            .where(Tag.name == tag))

    total = query.count()

    articles = (query
        .order_by(Article.published_at.desc())
        .paginate(page, per_page))

    articles_data = []
    for article in articles:
        articles_data.append({
            'id': article.id,
            'title': article.title,
            'slug': article.slug,
            'excerpt': article.content[:100] + '...' if len(article.content) > 100 else article.content,
            'published': article.published,
            'published_at': article.published_at.isoformat() if article.published_at else None,
            'views': article.views,
            'author': article.author.username if hasattr(article, 'author') else None,
            'created_at': article.created_at.isoformat()
        })

    return jsonify({
        'page': page,
        'per_page': per_page,
        'total': total,
        # Ceiling division.
        'total_pages': (total + per_page - 1) // per_page,
        'data': articles_data
    })

@app.route('/api/articles', methods=['POST'])
@login_required
def create_article():
    """Create an article owned by the authenticated user.

    Requires ``title``, ``content`` and ``slug`` in the JSON body;
    ``published`` is optional. Returns 201 on success, 400 when fields are
    missing and 409 on an integrity error such as a duplicate slug.
    """
    data = request.get_json()

    required_fields = ['title', 'content', 'slug']
    if not data or not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必要字段'}), 400

    published = data.get('published', False)

    try:
        with db.atomic():
            # Stamp published_at in the same INSERT rather than issuing a
            # follow-up UPDATE.
            article = Article.create(
                title=data['title'],
                content=data['content'],
                slug=data['slug'],
                author=g.user,
                published=published,
                published_at=datetime.datetime.now() if published else None
            )

        return jsonify({
            'message': '文章创建成功',
            'article': {
                'id': article.id,
                'title': article.title,
                'slug': article.slug,
                'published': article.published,
                'published_at': article.published_at.isoformat() if article.published_at else None
            }
        }), 201

    except IntegrityError:
        return jsonify({'error': '文章slug已存在'}), 409

@app.route('/api/articles/<int:article_id>', methods=['GET'])
def get_article(article_id):
    """Return one article with its author and bump its view counter.

    Returns 404 when the article does not exist.
    """
    try:
        article = (Article
            .select(Article, User)
            .join(User)
            .where(Article.id == article_id)
            .get())

        # Increment in SQL so concurrent readers cannot overwrite each
        # other's counts, and so only the counter column is written.
        (Article
            .update(views=Article.views + 1)
            .where(Article.id == article_id)
            .execute())
        # Keep the in-memory copy in step for the response below.
        article.views += 1

        return jsonify({
            'id': article.id,
            'title': article.title,
            'slug': article.slug,
            'content': article.content,
            'published': article.published,
            'published_at': article.published_at.isoformat() if article.published_at else None,
            'views': article.views,
            'author': {
                'id': article.author.id,
                'username': article.author.username
            },
            'created_at': article.created_at.isoformat(),
            'updated_at': article.updated_at.isoformat()
        })

    except Article.DoesNotExist:
        return jsonify({'error': '文章不存在'}), 404

@app.route('/api/articles/<int:article_id>', methods=['PUT'])
@login_required
def update_article(article_id):
    """Update the fields of an article that are present in the JSON body.

    Only the author or an admin may update. Returns 400 for a missing body,
    404 for an unknown article, 403 for insufficient rights and 409 on an
    integrity error such as a duplicate slug.
    """
    data = request.get_json()
    if not data:
        # Without this guard the membership tests below fail on None.
        return jsonify({'error': '缺少必要字段'}), 400

    try:
        article = Article.get_or_none(Article.id == article_id)
        if not article:
            return jsonify({'error': '文章不存在'}), 404

        # Permission check: author or admin only.
        if article.author.id != g.user.id and not g.user.is_admin:
            return jsonify({'error': '没有权限修改此文章'}), 403

        with db.atomic():
            if 'title' in data:
                article.title = data['title']
            if 'content' in data:
                article.content = data['content']
            if 'slug' in data:
                article.slug = data['slug']
            if 'published' in data:
                article.published = data['published']
                # Stamp the publication time on first publication only.
                if data['published'] and not article.published_at:
                    article.published_at = datetime.datetime.now()

            article.save()

        return jsonify({
            'message': '文章更新成功',
            'article': {
                'id': article.id,
                'title': article.title,
                'slug': article.slug,
                'published': article.published
            }
        })

    except IntegrityError:
        return jsonify({'error': '文章slug已存在'}), 409

@app.route('/api/articles/<int:article_id>', methods=['DELETE'])
@login_required
def delete_article(article_id):
    """Delete an article; only its author or an admin may do so.

    Returns an empty 204 on success, 404 for an unknown article, 403 for
    insufficient rights and 500 on an unexpected failure.
    """
    try:
        article = Article.get_or_none(Article.id == article_id)
        if not article:
            return jsonify({'error': '文章不存在'}), 404

        # Permission check: author or admin only.
        if article.author.id != g.user.id and not g.user.is_admin:
            return jsonify({'error': '没有权限删除此文章'}), 403

        article.delete_instance()

        # A 204 response must not carry a body.
        return '', 204

    except Exception:
        # Log the details server-side rather than exposing them to clients.
        app.logger.exception('delete_article failed for article %s', article_id)
        return jsonify({'error': '系统错误,请联系管理员'}), 500

# 初始化数据库
def init_db():
    """Create the application's tables if they do not exist yet."""
    tables = [User, Article]  # add further models here
    db.create_tables(tables, safe=True)
    print("数据库表创建完成")

# 命令行工具
if __name__ == '__main__':
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == 'init':
        init_db()
        print("数据库初始化完成")
    else:
        init_db()
        # The Werkzeug debugger permits arbitrary code execution, and the
        # server listens on every interface, so debug mode is opt-in via
        # FLASK_DEBUG rather than always on.
        debug = os.getenv('FLASK_DEBUG', '').lower() in ('1', 'true')
        app.run(debug=debug, host='0.0.0.0', port=5000)

6.2 数据库迁移工具

"""
peewee_migrate.py - Peewee数据库迁移工具

用法:
    python peewee_migrate.py init        # 初始化迁移
    python peewee_migrate.py create      # 创建新迁移
    python peewee_migrate.py up          # 执行迁移
    python peewee_migrate.py down        # 回滚迁移
    python peewee_migrate.py status      # 查看状态
"""


import os
import sys
import datetime
from playhouse.migrate import *
from peewee import *

# Database configuration: SQLite file used by the migration tool.
DATABASE = 'app.db'
db = SqliteDatabase(DATABASE)

# 迁移历史表
class MigrationHistory(Model):
    """One row per applied migration, keyed by the migration's name."""
    name = CharField(unique=True)
    applied_at = DateTimeField(default=datetime.datetime.now)

    class Meta:
        database = db
        table_name = 'migration_history'

# Directory that holds the migration scripts.
MIGRATIONS_DIR = 'migrations'

def ensure_migrations_dir():
    """Create the migrations directory if it does not exist."""
    # exist_ok avoids the check-then-create race of an explicit exists() test.
    os.makedirs(MIGRATIONS_DIR, exist_ok=True)

def init_migration():
    """Initialise the migration system: directory plus history table."""
    ensure_migrations_dir()
    db.create_tables([MigrationHistory], safe=True)
    print("迁移系统初始化完成")

def create_migration(name):
    """Write a new, timestamp-prefixed migration skeleton for *name*.

    The file is created inside ``MIGRATIONS_DIR`` and contains empty
    ``up(migrator)`` and ``down(migrator)`` functions.
    """
    ensure_migrations_dir()

    # Take the time once so the filename and the header agree.
    now = datetime.datetime.now()
    timestamp = now.strftime('%Y%m%d_%H%M%S')
    filename = f"{timestamp}_{name}.py"
    filepath = os.path.join(MIGRATIONS_DIR, filename)

    template = f'''"""
迁移: {name}
创建时间: {now}
"""

from playhouse.migrate import *
from peewee import *

def up(migrator):
    """执行迁移"""
    # 添加你的迁移代码
    # 示例:
    # migrate(
    #     migrator.add_column('table_name', 'column_name', CharField(null=True))
    # )
    pass

def down(migrator):
    """回滚迁移"""
    # 添加回滚代码
    # 示例:
    # migrate(
    #     migrator.drop_column('table_name', 'column_name')
    # )
    pass
'''

    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(template)

    print(f"迁移文件创建成功: {filename}")

def get_applied_migrations():
    """Return the set of migration names recorded in the history table."""
    return {m.name for m in MigrationHistory.select()}

def get_migration_files():
    """Return the sorted ``.py`` filenames in the migrations directory.

    ``__init__.py`` is excluded; a missing directory yields an empty list.
    """
    if not os.path.exists(MIGRATIONS_DIR):
        return []

    return sorted(
        f for f in os.listdir(MIGRATIONS_DIR)
        if f.endswith('.py') and f != '__init__.py'
    )

def run_migrations(direction='up'):
    """Apply pending migrations, or roll back applied ones.

    Args:
        direction: ``'up'`` runs every migration file not yet recorded, in
            filename order; any other value rolls back every recorded
            migration in reverse filename order.

    Exits the process with status 1 on the first failing migration.
    """
    import importlib.util

    ensure_migrations_dir()
    # Make sure the history table exists even if "init" was never run.
    db.create_tables([MigrationHistory], safe=True)

    applied = get_applied_migrations()
    migration_files = get_migration_files()

    migrator = SqliteMigrator(db)

    # Migration names are filenames without the ".py" suffix.
    if direction == 'up':
        files_to_run = [f for f in migration_files if f[:-3] not in applied]
    else:  # down
        files_to_run = [f for f in reversed(migration_files) if f[:-3] in applied]

    if not files_to_run:
        print("没有需要执行的迁移")
        return

    for filename in files_to_run:
        migration_name = filename[:-3]

        # Load the migration module straight from its file path.
        filepath = os.path.join(MIGRATIONS_DIR, filename)
        spec = importlib.util.spec_from_file_location(migration_name, filepath)
        module = importlib.util.module_from_spec(spec)

        try:
            spec.loader.exec_module(module)

            # One transaction per migration keeps the schema change and its
            # history row in step: both commit or both roll back.
            with db.atomic():
                if direction == 'up':
                    print(f"执行迁移: {migration_name}")
                    module.up(migrator)
                    MigrationHistory.create(name=migration_name)
                else:
                    print(f"回滚迁移: {migration_name}")
                    module.down(migrator)
                    (MigrationHistory
                        .delete()
                        .where(MigrationHistory.name == migration_name)
                        .execute())
            print(f"  完成")

        except Exception as e:
            print(f"迁移失败 {migration_name}: {e}")
            sys.exit(1)

def show_status():
    """Print every migration file with its applied / not-applied status."""
    applied = get_applied_migrations()
    migration_files = get_migration_files()

    print("迁移状态:")
    print("-" * 80)

    for filename in migration_files:
        migration_name = filename[:-3]  # strip ".py"
        status = "已应用" if migration_name in applied else "未应用"
        print(f"{migration_name:50} [{status}]")

def main():
    """Command-line entry point: dispatch on the first argument.

    Recognised commands are ``init``, ``create <name>``, ``up``, ``down``
    and ``status``. Exits with status 1 on missing arguments or an unknown
    command.
    """
    if len(sys.argv) < 2:
        print("用法: python peewee_migrate.py [init|create|up|down|status]")
        sys.exit(1)

    command = sys.argv[1]

    if command == 'init':
        init_migration()
    elif command == 'create':
        if len(sys.argv) < 3:
            print("用法: python peewee_migrate.py create <迁移名称>")
            sys.exit(1)
        create_migration(sys.argv[2])
    elif command == 'up':
        run_migrations('up')
    elif command == 'down':
        run_migrations('down')
    elif command == 'status':
        show_status()
    else:
        print(f"未知命令: {command}")
        # Signal the failure to calling scripts.
        sys.exit(1)

if __name__ == '__main__':
    main()

第七章:最佳实践总结

7.1 开发最佳实践

"""
Peewee开发最佳实践:

1. 数据库设计:
   - 为每个表定义明确的模型
   - 使用合适的数据类型和约束
   - 创建必要的索引
   - 使用外键维护数据完整性

2. 模型设计:
   - 创建基础模型类
   - 使用有意义的字段名
   - 添加适当的验证
   - 实现模型方法封装业务逻辑

3. 查询优化:
   - 使用join()/prefetch()避免N+1(select_related是Django ORM的API,Peewee中对应的是join和prefetch)
   - 只选择需要的字段
   - 合理使用索引
   - 批量操作使用事务

4. 事务管理:
   - 使用atomic()装饰器
   - 保持事务简短
   - 正确处理异常

5. 安全性:
   - 使用参数化查询(Peewee自动处理)
   - 验证用户输入
   - 限制查询结果数量
   - 实施适当的权限控制

6. 测试:
   - 使用内存数据库进行测试
   - 编写单元测试
   - 测试边界条件
   - 性能测试关键查询

7. 维护:
   - 定期备份数据库
   - 监控慢查询
   - 清理无用数据
   - 更新索引统计
"""


# 配置示例
class DatabaseConfig:
    """Per-environment SQLite configuration."""

    @staticmethod
    def get_database(env='development'):
        """Return a database object configured for *env*.

        Args:
            env: ``'development'``, ``'testing'`` or ``'production'``;
                unknown values fall back to the development settings.

        Returns:
            A ``PooledSqliteDatabase`` for production, otherwise a
            ``SqliteDatabase``.
        """
        configs = {
            'development': {
                'database': 'dev.db',
                'pragmas': {
                    'journal_mode': 'wal',
                    'cache_size': -2000,
                    'foreign_keys': 1,
                    'ignore_check_constraints': 0,
                    'synchronous': 0
                }
            },
            'testing': {
                'database': ':memory:',
                'pragmas': {
                    'journal_mode': 'memory',
                    'synchronous': 'off'
                }
            },
            'production': {
                'database': 'prod.db',
                'pragmas': {
                    'journal_mode': 'wal',
                    'cache_size': -10000,
                    'foreign_keys': 1,
                    'synchronous': 'normal'
                }
            }
        }

        config = configs.get(env, configs['development'])
        pragmas = config.get('pragmas', {})

        # Production uses a connection pool; build it directly instead of
        # first creating a plain database that is then discarded.
        if env == 'production':
            from playhouse.pool import PooledSqliteDatabase
            return PooledSqliteDatabase(
                config['database'],
                max_connections=20,
                stale_timeout=300,
                pragmas=pragmas
            )

        return SqliteDatabase(config['database'], pragmas=pragmas)

# 错误处理示例
class DatabaseErrorHandler:
    """Translate database exceptions into user-facing messages."""

    @staticmethod
    def handle_error(e):
        """Return a message for exception *e*.

        ``IntegrityError`` is refined by looking for ``UNIQUE`` or
        ``FOREIGN KEY`` in the error text; ``OperationalError`` and
        ``DoesNotExist`` get their own messages; anything else yields a
        generic one.
        """
        if isinstance(e, IntegrityError):
            if 'UNIQUE' in str(e):
                return "数据重复,请检查输入"
            elif 'FOREIGN KEY' in str(e):
                return "关联数据不存在"
            else:
                return "数据完整性错误"
        elif isinstance(e, OperationalError):
            return "数据库操作错误,请稍后重试"
        elif isinstance(e, DoesNotExist):
            return "数据不存在"
        else:
            return "系统错误,请联系管理员"

# 性能监控装饰器
import time
from functools import wraps

def query_performance_monitor(threshold=1.0):
    """Build a decorator that warns when a call takes longer than *threshold*.

    Args:
        threshold: Duration in seconds above which a warning is printed.

    Returns:
        A decorator; the wrapped function's return value is passed through
        unchanged and its metadata is preserved via ``functools.wraps``.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start_time

            if elapsed > threshold:
                # Hook point for a log or monitoring system.
                print(f"⚠️  慢查询警告: {func.__name__} 耗时 {elapsed:.3f}秒")

            return result
        return wrapper
    return decorator

# 使用示例
@query_performance_monitor(threshold=0.5)
def get_complex_report():
    """Placeholder for a complex report query whose duration is monitored."""
    # Complex query logic goes here.
    pass

7.2 生产环境部署

"""
生产环境部署注意事项:

1. 数据库配置:
   - 使用连接池
   - 配置合理的超时时间
   - 启用WAL模式(SQLite)
   - 设置适当的缓存大小

2. 数据安全:
   - 定期备份
   - 加密敏感数据
   - 实施访问控制
   - 监控异常访问

3. 性能优化:
   - 创建必要的索引
   - 优化复杂查询
   - 使用缓存
   - 定期清理历史数据

4. 监控和日志:
   - 记录慢查询
   - 监控连接数
   - 记录错误日志
   - 设置告警

5. 迁移策略:
   - 使用版本控制管理迁移
   - 测试迁移脚本
   - 准备回滚方案
   - 备份迁移前的数据
"""


# 生产环境配置文件示例
class ProductionConfig:
    """Example production configuration values."""

    # Database settings.
    DATABASE = {
        'name': '/data/app/production.db',
        'engine': 'playhouse.pool.PooledSqliteDatabase',
        'max_connections': 32,
        'stale_timeout': 300,
        'timeout': 10,
        'pragmas': {
            'journal_mode': 'wal',
            'cache_size': -20000,  # 20MB
            'foreign_keys': 1,
            'synchronous': 'normal',
            'temp_store': 'memory',
            'mmap_size': 268435456  # 256MB
        }
    }

    # Connection pool settings.
    CONNECTION_POOL = {
        'max_connections': 32,
        'stale_timeout': 300,  # 5 minutes
        'timeout': 10  # 10-second timeout
    }

    # Query timeout, in seconds.
    QUERY_TIMEOUT = 30

    # Backup settings.
    BACKUP = {
        'enabled': True,
        'interval': 3600,  # back up once an hour
        'retention': 7,    # keep for 7 days
        'location': '/backups'
    }

    # Monitoring settings.
    MONITORING = {
        'slow_query_threshold': 1.0,  # seconds
        'log_queries': False,
        'max_result_size': 10000  # maximum number of rows returned
    }

# 健康检查端点
@app.route('/api/db/health', methods=['GET'])
def db_health_check():
    """Report whether the database is reachable.

    Connects (reusing an already-open connection), runs a trivial query,
    counts the tables listed in ``sqlite_master`` and reports how many
    connections are in use.

    Returns:
        A JSON response with status ``healthy`` on success, or a
        ``(response, 500)`` tuple with status ``unhealthy`` and the error
        text when any step raises.
    """
    try:
        # Check that a connection can be obtained.
        db.connect(reuse_if_open=True)
        try:
            # Check that a query can be executed.
            db.execute_sql('SELECT 1').fetchone()

            # Count the tables known to the database.
            table_count = db.execute_sql(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table'"
            ).fetchone()[0]

            # Connections in use (only when the database object tracks them,
            # e.g. a connection pool); otherwise just this one.
            if hasattr(db, '_in_use'):
                connections_in_use = len(db._in_use)
            else:
                connections_in_use = 1
        finally:
            # Close even when a check above raised, so a failed health check
            # does not leave the connection open.
            db.close()

        return jsonify({
            'status': 'healthy',
            'database': 'connected',
            'tables': table_count,
            'connections_in_use': connections_in_use,
            'timestamp': datetime.datetime.now().isoformat()
        })

    except Exception as e:
        # Endpoint boundary: report every failure as an unhealthy status.
        return jsonify({
            'status': 'unhealthy',
            'error': str(e),
            'timestamp': datetime.datetime.now().isoformat()
        }), 500

# 慢查询日志中间件
class SlowQueryLogger:
    """Callable that logs a warning for queries slower than a threshold.

    Warnings are written to the ``peewee.slow_queries`` logger.
    """

    def __init__(self, threshold=1.0):
        """Store the threshold (seconds) and look up the logger.

        Args:
            threshold: Duration in seconds above which a query is reported.
        """
        self.threshold = threshold
        self.logger = logging.getLogger('peewee.slow_queries')

    def __call__(self, sql, duration):
        """Log ``sql`` at WARNING level when ``duration`` exceeds the threshold.

        Args:
            sql: The SQL text of the query.
            duration: How long the query took, in seconds.
        """
        if duration > self.threshold:
            # Lazy %-arguments: the message is only built if it is emitted.
            self.logger.warning(
                "慢查询 detected: %.3fs\nSQL: %s", duration, sql
            )

# 配置Peewee使用慢查询日志
import logging

# Configure logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('peewee')

# Database used for the slow-query logging example.
from playhouse.sqlite_ext import SqliteExtDatabase

db = SqliteExtDatabase(
    'app.db',
    pragmas={
        'journal_mode': 'wal',
        'cache_size': -10000,
    },
)

# Register the slow-query callback.
# NOTE(review): `set_sqlite_logger` does not appear in Peewee's documented
# database API as far as I can tell -- confirm it exists in the installed
# version; otherwise time queries by overriding `execute_sql` in a subclass.
db.set_sqlite_logger(SlowQueryLogger(threshold=0.5))

总结:Peewee的优势与选择

Peewee的核心优势:

  1. 简单直观:API设计清晰,学习曲线平缓
  2. 轻量级:无复杂依赖,部署简单
  3. 表达力强:支持复杂的查询构建
  4. 功能完整:包含迁移、信号、扩展等高级功能
  5. 性能优秀:合理的默认配置和优化选项

什么时候选择Peewee?

  • ✅ 小型到中型项目
  • ✅ 需要快速开发原型
  • ✅ 团队熟悉Python但不熟悉SQL
  • ✅ 项目需要轻量级ORM
  • ✅ 使用SQLite或PostgreSQL数据库

什么时候考虑其他方案?

  • 🔄 超大型项目 → 考虑SQLAlchemy
  • 🔄 需要Django生态集成 → 使用Django ORM
  • 🔄 极其复杂的查询需求 → 直接使用SQL
  • 🔄 企业级功能需求 → 考虑SQLAlchemy + Alembic

最后建议

从今天开始实践:

  1. 安装Peewee:pip install peewee
  2. 创建第一个模型:定义你的数据表
  3. 尝试基本CRUD:掌握增删改查
  4. 学习关联查询:理解关系处理
  5. 探索高级特性:信号、迁移、扩展

记住:最好的学习方式是实践!


资源推荐

  1. 官方文档:http://docs.peewee-orm.com/
  2. GitHub仓库:https://github.com/coleifer/peewee
  3. 示例项目:https://github.com/coleifer/peewee/tree/master/examples
  4. 扩展模块:https://github.com/coleifer/peewee#playhouse-extensions

现在,开始你的Peewee之旅吧!

END

资料领取

关注公众号,后台回复“资料领取”或点击“资料领取”菜单即可免费获取“软件测试”、“Python开发”相关资料~

最新文章

随机文章

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-02-08 07:39:22 HTTP/2.0 GET : https://f.mffb.com.cn/a/466044.html
  2. 运行时间 : 0.190437s [ 吞吐率:5.25req/s ] 内存消耗:4,810.74kb 文件加载:140
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=362555da0986985a2fdbd1067d83ad59
  1. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_static.php ( 4.90 KB )
  7. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  10. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  11. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  12. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  13. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  14. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  15. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  16. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  17. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  18. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  19. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  21. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  22. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/provider.php ( 0.19 KB )
  23. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  24. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  25. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  26. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/common.php ( 0.03 KB )
  27. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  28. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  29. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/app.php ( 0.95 KB )
  30. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cache.php ( 0.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/console.php ( 0.23 KB )
  32. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cookie.php ( 0.56 KB )
  33. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/database.php ( 2.48 KB )
  34. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  35. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/filesystem.php ( 0.61 KB )
  36. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/lang.php ( 0.91 KB )
  37. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/log.php ( 1.35 KB )
  38. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/middleware.php ( 0.19 KB )
  39. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/route.php ( 1.89 KB )
  40. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/session.php ( 0.57 KB )
  41. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/trace.php ( 0.34 KB )
  42. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/view.php ( 0.82 KB )
  43. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/event.php ( 0.25 KB )
  44. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  45. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/service.php ( 0.13 KB )
  46. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/AppService.php ( 0.26 KB )
  47. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  48. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  49. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  50. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  51. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  52. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/services.php ( 0.14 KB )
  53. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  54. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  55. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  56. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  57. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  58. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  59. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  60. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  61. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  62. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  63. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  64. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  65. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  66. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  67. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  68. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  69. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  70. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  71. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  72. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  73. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  74. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  75. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  76. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  77. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  78. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  79. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  80. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  81. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  82. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  83. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/Request.php ( 0.09 KB )
  84. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  85. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/middleware.php ( 0.25 KB )
  86. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  87. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  88. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  89. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  90. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  91. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  92. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  93. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  94. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  95. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  96. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  97. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  98. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  99. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/route/app.php ( 1.72 KB )
  100. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  101. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  102. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  103. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/controller/Index.php ( 4.81 KB )
  104. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/BaseController.php ( 2.05 KB )
  105. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  106. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  108. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  109. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  110. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  111. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  112. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  113. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  114. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  115. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  116. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  117. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  118. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  119. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  120. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  121. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  122. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  123. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  124. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  125. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  126. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  127. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  128. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  129. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  130. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  131. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  132. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  133. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  134. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  135. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  136. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  137. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  138. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  139. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/runtime/temp/067d451b9a0c665040f3f1bdd3293d68.php ( 11.98 KB )
  140. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000903s ] mysql:host=127.0.0.1;port=3306;dbname=f_mffb;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.001502s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.001997s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.002308s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.001353s ]
  6. SELECT * FROM `set` [ RunTime:0.000635s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.001574s ]
  8. SELECT * FROM `article` WHERE `id` = 466044 LIMIT 1 [ RunTime:0.041792s ]
  9. UPDATE `article` SET `lasttime` = 1770507562 WHERE `id` = 466044 [ RunTime:0.006146s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 66 LIMIT 1 [ RunTime:0.001604s ]
  11. SELECT * FROM `article` WHERE `id` < 466044 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.002073s ]
  12. SELECT * FROM `article` WHERE `id` > 466044 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.001048s ]
  13. SELECT * FROM `article` WHERE `id` < 466044 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.005296s ]
  14. SELECT * FROM `article` WHERE `id` < 466044 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.010752s ]
  15. SELECT * FROM `article` WHERE `id` < 466044 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.022385s ]
0.194448s