引言:如何选择SQLite操作方式?
在Python中操作SQLite数据库,主要有两种方式:
- SQLite3:Python标准库,直接SQL操作
- Peewee:轻量级ORM,用Python对象操作数据库
今天我们将从多个维度深度对比这两种方式,帮你做出最佳选择!
第一章:快速入门对比
1.1 SQLite3:原生SQL方式
import sqlite3
from datetime import datetime
# Quick-start script: create the ``users`` table and insert a few rows with
# the standard-library sqlite3 module.

# 1. Open the database.
#    NOTE(review): check_same_thread=False disables sqlite3's thread-ownership
#    check; this script is single-threaded, so confirm it is really wanted.
conn = sqlite3.connect('mydatabase.db', check_same_thread=False)
# sqlite3.Row rows can be read by index or by column name; they are not dicts,
# call dict(row) when a real dict is needed.
conn.row_factory = sqlite3.Row
try:
    # 2. Create a cursor
    cursor = conn.cursor()

    # 3. Create the table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL UNIQUE,
            email TEXT NOT NULL UNIQUE,
            age INTEGER,
            is_active BOOLEAN DEFAULT 1,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')

    # 4. Create an index (the UNIQUE constraint on username already gives
    #    SQLite an implicit index; this statement is kept as a syntax example)
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_username ON users(username)')

    # 5. Insert one row using bound parameters
    cursor.execute('''
        INSERT INTO users (username, email, age)
        VALUES (?, ?, ?)
    ''', ('alice', 'alice@example.com', 25))
    # Read the generated primary key right after the single-row INSERT
    user_id = cursor.lastrowid

    # 6. Bulk insert
    users_data = [
        ('bob', 'bob@example.com', 30),
        ('charlie', 'charlie@example.com', 35),
        ('david', 'david@example.com', 28),
    ]
    cursor.executemany('''
        INSERT INTO users (username, email, age)
        VALUES (?, ?, ?)
    ''', users_data)

    # 7. Commit the transaction
    conn.commit()
    print(f"SQLite3: 插入完成,最后ID: {user_id}")
finally:
    # Release the file handle even when a statement above fails; uncommitted
    # changes are discarded on close.
    conn.close()
1.2 Peewee:ORM方式
from peewee import *
from datetime import datetime
# Quick-start script: the same ``users`` table handled through Peewee.

# 1. Define the database
db = SqliteDatabase('mydatabase.db')


# 2. Define the model
class User(Model):
    """ORM mapping for the ``users`` table."""

    # unique=True already creates a unique index on the column, so no extra
    # Meta.indexes entry is declared for username.
    username = CharField(unique=True)
    email = CharField(unique=True)
    age = IntegerField(null=True)
    is_active = BooleanField(default=True)
    created_at = DateTimeField(default=datetime.now)

    class Meta:
        database = db
        table_name = 'users'


# 3. Connect and create the table
db.connect()
try:
    db.create_tables([User])

    # 4. Insert a single row
    user = User.create(username='alice', email='alice@example.com', age=25)
    user_id = user.id

    # 5. Bulk insert (option 1): one multi-row INSERT statement
    User.insert_many([
        {'username': 'bob', 'email': 'bob@example.com', 'age': 30},
        {'username': 'charlie', 'email': 'charlie@example.com', 'age': 35},
        {'username': 'david', 'email': 'david@example.com', 'age': 28},
    ]).execute()

    # Bulk insert (option 2): row-by-row create() inside a single transaction.
    # This still issues one INSERT per row, so it is slower than insert_many();
    # its benefit is avoiding a commit per row.
    with db.atomic():
        for i in range(100):
            User.create(username=f'user_{i}', email=f'user_{i}@example.com')

    print(f"Peewee: 插入完成,最后ID: {user_id}")
finally:
    # Leave the shared database object closed so later code can connect again.
    db.close()
第二章:查询操作对比
2.1 SQLite3查询操作
import sqlite3
from contextlib import closing
def sqlite3_query_examples(db_path='mydatabase.db'):
    """Run a tour of common read queries with the sqlite3 module.

    Expects a ``users`` table with ``id``, ``age`` and ``is_active`` columns
    to exist already. Creates an empty ``posts`` table when it is missing so
    the JOIN example can run.

    Args:
        db_path: Path of the SQLite database file. Defaults to the file used
            by the quick-start example.
    """
    # closing() guarantees the connection is released even if a query raises.
    with closing(sqlite3.connect(db_path)) as conn:
        # Rows come back as sqlite3.Row: readable by index or column name and
        # convertible with dict(row).
        conn.row_factory = sqlite3.Row
        with closing(conn.cursor()) as cursor:
            # 1. Fetch every row
            cursor.execute('SELECT * FROM users')
            all_users = cursor.fetchall()
            # Convert to a list of plain dicts
            all_users_dict = [dict(row) for row in all_users]
            print(f"所有用户: {len(all_users_dict)} 条")

            # 2. Fetch a single row
            cursor.execute('SELECT * FROM users WHERE id = ?', (1,))
            user = cursor.fetchone()
            if user:
                print(f"用户1: {dict(user)}")

            # 3. Filtered query
            cursor.execute('''
                SELECT * FROM users
                WHERE age > ? AND is_active = ?
                ORDER BY age DESC
            ''', (25, 1))
            active_users = cursor.fetchall()

            # 4. Pagination
            page = 1
            page_size = 10
            cursor.execute('''
                SELECT * FROM users
                LIMIT ? OFFSET ?
            ''', (page_size, (page - 1) * page_size))
            page_users = cursor.fetchall()

            # 5. Aggregates
            cursor.execute('''
                SELECT
                    COUNT(*) as total,
                    AVG(age) as avg_age,
                    MIN(age) as min_age,
                    MAX(age) as max_age
                FROM users
            ''')
            stats = cursor.fetchone()
            print(f"统计: {dict(stats)}")

            # 6. Join across tables (creates the second table if needed)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS posts (
                    id INTEGER PRIMARY KEY,
                    user_id INTEGER,
                    title TEXT,
                    content TEXT,
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )
            ''')
            conn.commit()
            cursor.execute('''
                SELECT u.username, p.title, p.content
                FROM users u
                LEFT JOIN posts p ON u.id = p.user_id
            ''')
    # 7. Both context managers above have closed the cursor and connection.
sqlite3_query_examples()
2.2 Peewee查询操作
from peewee import *
import datetime
def peewee_query_examples():
    """Run a tour of common Peewee queries against the ``users`` table.

    Uses the module-level ``db`` database object and closes it when done.
    """

    class User(Model):
        username = CharField(unique=True)
        email = CharField(unique=True)
        age = IntegerField(null=True)
        is_active = BooleanField(default=True)
        created_at = DateTimeField(default=datetime.datetime.now)

        class Meta:
            database = db
            table_name = 'users'

    class Post(Model):
        user = ForeignKeyField(User, backref='posts')
        title = CharField()
        content = TextField()
        created_at = DateTimeField(default=datetime.datetime.now)

        class Meta:
            database = db

    # reuse_if_open avoids an error when the shared db is already connected.
    db.connect(reuse_if_open=True)
    try:
        # The raw SQL in step 8 reads the post table, so make sure both exist.
        db.create_tables([User, Post])

        # 1. Fetch every row
        all_users = list(User.select())
        print(f"所有用户: {len(all_users)} 条")

        # 2. Fetch a single row
        user = User.get_or_none(User.id == 1)
        if user:
            print(f"用户1: {user.username}")

        # 3. Filtered queries (several styles)
        # Style 1: where() with expressions
        active_users = User.select().where(
            (User.age > 25) & (User.is_active == True)  # noqa: E712 - peewee needs ==
        ).order_by(User.age.desc())
        # Style 2: keyword filters passed straight to get_or_none()
        alice = User.get_or_none(username='alice')
        # Style 3: compound conditions
        users = User.select().where(
            (User.age.between(20, 30)) |
            (User.username.contains('alice'))
        )

        # 4. Pagination
        page = 1
        page_size = 10
        paginated_users = User.select().paginate(page, page_size)

        # 5. Aggregates
        from peewee import fn
        query = User.select(
            fn.COUNT(User.id).alias('total'),
            fn.AVG(User.age).alias('avg_age'),
            fn.MIN(User.age).alias('min_age'),
            fn.MAX(User.age).alias('max_age')
        )
        stats = query.tuples().first()
        # AVG() is NULL on an empty table; formatting None with :.1f would raise.
        avg_age = stats[1] if stats[1] is not None else 0.0
        print(f"统计: 总数={stats[0]}, 平均年龄={avg_age:.1f}")

        # 6. Joins
        # One-to-many
        user_with_posts = (User
                           .select(User, Post)
                           .join(Post, JOIN.LEFT_OUTER)
                           .where(User.id == 1))
        # Select related rows in the same query (avoids N+1 lookups)
        users_with_posts = (User
                            .select(User, Post)
                            .join(Post, JOIN.LEFT_OUTER)
                            .order_by(User.username, Post.created_at.desc()))

        # 7. Advanced: subqueries and grouping
        subquery = User.select(fn.MAX(User.age))
        oldest_users = User.select().where(User.age == subquery)
        age_stats = (User
                     .select(User.is_active, fn.COUNT(User.id).alias('count'))
                     .group_by(User.is_active))
        for stat in age_stats:
            print(f"活跃状态 {stat.is_active}: {stat.count} 人")

        # 8. Raw SQL for cases the query builder does not cover.
        #    SQLite placeholders are "?", the model's table is "users", and
        #    raw() takes parameters as separate positional arguments.
        query = User.raw('''
            SELECT users.*,
                   (SELECT COUNT(*) FROM post WHERE post.user_id = users.id) AS post_count
            FROM users
            WHERE age > ?
        ''', 25)
        for user in query:
            print(f"{user.username} 有 {user.post_count} 篇文章")
    finally:
        db.close()
peewee_query_examples()
第三章:性能对比测试
3.1 批量插入性能测试
import sqlite3
import time
from peewee import *
import random
def performance_test():
    """Compare insert throughput of sqlite3 and Peewee on in-memory databases.

    Prints elapsed time and rows per second for each strategy; returns None.
    """
    from peewee import chunked  # helper for batching insert_many()

    num_records = 10000

    def report(label, count, elapsed):
        # Guard against a zero reading so the rate never divides by zero.
        rate = count / elapsed if elapsed > 0 else float('inf')
        print(f"{label}: {elapsed:.3f}秒 ({rate:.1f}条/秒)")

    # ---- sqlite3 ----
    print("=== SQLite3批量插入测试 ===")
    conn = sqlite3.connect(':memory:')
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE test_sqlite3 (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                value INTEGER,
                data TEXT
            )
        ''')

        # Strategy 1: one INSERT and one commit per row (slowest).
        # NOTE(review): the flattened source does not show whether the commit
        # sat inside the loop; per-row commit matches the "slowest" label and
        # Peewee's autocommitting create() below.
        start = time.perf_counter()
        for i in range(1000):
            cursor.execute(
                'INSERT INTO test_sqlite3 (name, value, data) VALUES (?, ?, ?)',
                (f'name_{i}', i, 'x' * 100)
            )
            conn.commit()
        report("逐条插入1000条", 1000, time.perf_counter() - start)

        # Strategy 2: executemany in a single transaction (recommended)
        data = [(f'batch_{i}', i, 'x' * 100) for i in range(num_records)]
        start = time.perf_counter()
        cursor.executemany(
            'INSERT INTO test_sqlite3 (name, value, data) VALUES (?, ?, ?)',
            data
        )
        conn.commit()
        report(f"批量插入{num_records}条", num_records, time.perf_counter() - start)
    finally:
        conn.close()

    # ---- Peewee ----
    print("\n=== Peewee批量插入测试 ===")
    db = SqliteDatabase(':memory:')

    class TestPeewee(Model):
        name = CharField()
        value = IntegerField()
        data = TextField()

        class Meta:
            database = db

    db.connect()
    try:
        db.create_tables([TestPeewee])

        # Strategy 1: create() per row, each in its own implicit transaction
        start = time.perf_counter()
        for i in range(1000):
            TestPeewee.create(name=f'name_{i}', value=i, data='x' * 100)
        report("逐条插入1000条", 1000, time.perf_counter() - start)

        # Strategy 2: insert_many in batches. A single statement for all rows
        # would bind 3 * num_records parameters, which can exceed SQLite's
        # bound-variable limit, so rows are sent 100 at a time.
        data = [
            {'name': f'batch_{i}', 'value': i, 'data': 'x' * 100}
            for i in range(num_records)
        ]
        start = time.perf_counter()
        with db.atomic():
            for batch in chunked(data, 100):
                TestPeewee.insert_many(batch).execute()
        report(f"insert_many插入{num_records}条", num_records, time.perf_counter() - start)

        # Strategy 3: create() per row inside one atomic() transaction
        start = time.perf_counter()
        with db.atomic():
            for i in range(num_records):
                TestPeewee.create(name=f'trans_{i}', value=i, data='x' * 100)
        report(f"事务内逐条插入{num_records}条", num_records, time.perf_counter() - start)
    finally:
        db.close()
performance_test()
3.2 查询性能测试
import sqlite3
import time
from peewee import *
import numpy as np
def query_performance_test():
    """Compare query latency of sqlite3 and Peewee on identical-shape data.

    Builds two in-memory databases with ``num_records`` random rows each,
    then times a simple filter, a grouped aggregate and a paginated read.
    Prints the timings; returns None.
    """
    import random  # local import: this snippet's import block does not provide it

    num_records = 50000
    categories = ['A', 'B', 'C', 'D']
    print("准备测试数据...")

    conn = sqlite3.connect(':memory:')
    conn.row_factory = sqlite3.Row
    db = SqliteDatabase(':memory:')

    class PerfTestPeewee(Model):
        category = CharField(index=True)
        value = IntegerField(index=True)
        score = FloatField()

        class Meta:
            database = db

    try:
        # ---- sqlite3 fixture ----
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE perf_test_sqlite3 (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                category TEXT,
                value INTEGER,
                score FLOAT
            )
        ''')
        cursor.execute('CREATE INDEX idx_category ON perf_test_sqlite3(category)')
        cursor.execute('CREATE INDEX idx_value ON perf_test_sqlite3(value)')
        data = [
            (random.choice(categories), random.randint(1, 100), random.random() * 100)
            for _ in range(num_records)
        ]
        cursor.executemany(
            'INSERT INTO perf_test_sqlite3 (category, value, score) VALUES (?, ?, ?)',
            data
        )
        conn.commit()

        # ---- Peewee fixture ----
        db.connect()
        db.create_tables([PerfTestPeewee])
        with db.atomic():
            for _ in range(num_records):
                PerfTestPeewee.create(
                    category=random.choice(categories),
                    value=random.randint(1, 100),
                    score=random.random() * 100
                )

        print("\n=== 查询性能测试 ===")

        # Test 1: simple equality filter
        print("\n1. 简单查询(category = 'A'):")
        start = time.perf_counter()
        cursor.execute('SELECT * FROM perf_test_sqlite3 WHERE category = ?', ('A',))
        results = cursor.fetchall()
        sqlite3_time = time.perf_counter() - start

        start = time.perf_counter()
        peewee_results = list(PerfTestPeewee.select().where(PerfTestPeewee.category == 'A'))
        peewee_time = time.perf_counter() - start
        print(f"SQLite3: {sqlite3_time:.4f}秒,返回 {len(results)} 条")
        print(f"Peewee: {peewee_time:.4f}秒,返回 {len(peewee_results)} 条")

        # Test 2: filter + group + having + order
        print("\n2. 复杂条件查询:")
        start = time.perf_counter()
        cursor.execute('''
            SELECT category, AVG(score) as avg_score, COUNT(*) as count
            FROM perf_test_sqlite3
            WHERE value > 50
            GROUP BY category
            HAVING COUNT(*) > 1000
            ORDER BY avg_score DESC
        ''')
        results = cursor.fetchall()
        sqlite3_time = time.perf_counter() - start

        from peewee import fn
        start = time.perf_counter()
        query = (PerfTestPeewee
                 .select(
                     PerfTestPeewee.category,
                     fn.AVG(PerfTestPeewee.score).alias('avg_score'),
                     fn.COUNT(PerfTestPeewee.id).alias('count')
                 )
                 .where(PerfTestPeewee.value > 50)
                 .group_by(PerfTestPeewee.category)
                 .having(fn.COUNT(PerfTestPeewee.id) > 1000)
                 .order_by(fn.AVG(PerfTestPeewee.score).desc()))
        peewee_results = list(query)
        peewee_time = time.perf_counter() - start
        print(f"SQLite3: {sqlite3_time:.4f}秒,返回 {len(results)} 组")
        print(f"Peewee: {peewee_time:.4f}秒,返回 {len(peewee_results)} 组")

        # Test 3: page 10 at 100 rows per page -> OFFSET 900
        print("\n3. 分页查询(第10页,每页100条):")
        start = time.perf_counter()
        cursor.execute('''
            SELECT * FROM perf_test_sqlite3
            ORDER BY id
            LIMIT 100 OFFSET 900
        ''')
        results = cursor.fetchall()
        sqlite3_time = time.perf_counter() - start

        start = time.perf_counter()
        peewee_results = list(
            PerfTestPeewee.select().order_by(PerfTestPeewee.id).paginate(10, 100)
        )
        peewee_time = time.perf_counter() - start
        print(f"SQLite3: {sqlite3_time:.4f}秒,返回 {len(results)} 条")
        print(f"Peewee: {peewee_time:.4f}秒,返回 {len(peewee_results)} 条")
    finally:
        # Clean up both databases even if a step above fails.
        conn.close()
        db.close()
query_performance_test()
第四章:高级功能对比
4.1 事务处理对比
import sqlite3
from peewee import *
from contextlib import contextmanager
def transaction_comparison():
    """Contrast transaction handling in sqlite3 and Peewee.

    Runs a money-transfer scenario on in-memory databases with manual
    BEGIN/COMMIT/ROLLBACK, a hand-written context manager, and Peewee's
    ``atomic()`` as decorator, context manager and nested savepoint.
    Prints the outcome of each step; returns None.
    """
    print("=== 事务处理对比 ===\n")

    # ---- sqlite3 ----
    print("1. SQLite3事务处理:")

    @contextmanager
    def sqlite3_transaction(connection):
        """Commit on success, roll back and re-raise on any exception."""
        try:
            yield
            connection.commit()
        except Exception:
            connection.rollback()
            raise

    conn = sqlite3.connect(':memory:')
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE accounts (
                id INTEGER PRIMARY KEY,
                name TEXT,
                balance REAL
            )
        ''')
        cursor.execute("INSERT INTO accounts (name, balance) VALUES ('Alice', 1000)")
        cursor.execute("INSERT INTO accounts (name, balance) VALUES ('Bob', 500)")
        conn.commit()

        # Manual transaction control
        try:
            conn.execute('BEGIN TRANSACTION')
            # Transfer
            cursor.execute("UPDATE accounts SET balance = balance - 200 WHERE name = 'Alice'")
            cursor.execute("UPDATE accounts SET balance = balance + 200 WHERE name = 'Bob'")
            # Validate the payer's balance before deciding to commit
            cursor.execute("SELECT balance FROM accounts WHERE name = 'Alice'")
            alice_balance = cursor.fetchone()[0]
            if alice_balance < 0:
                conn.rollback()
                print(" 转账失败:余额不足")
            else:
                conn.commit()
                print(" 转账成功")
        except Exception as e:
            conn.rollback()
            print(f" 事务失败: {e}")

        # Same idea wrapped in a context manager
        with sqlite3_transaction(conn):
            cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE name = 'Alice'")
    finally:
        conn.close()

    # ---- Peewee ----
    print("\n2. Peewee事务处理:")
    db = SqliteDatabase(':memory:')

    class Account(Model):
        name = CharField()
        balance = FloatField()

        class Meta:
            database = db

    db.connect()
    try:
        db.create_tables([Account])
        Account.create(name='Alice', balance=1000)
        Account.create(name='Bob', balance=500)

        # Style 1: atomic() as a decorator; an exception rolls everything back
        @db.atomic()
        def transfer_amount(from_name, to_name, amount):
            """Move ``amount`` between two accounts or raise ValueError."""
            # Debit
            query = Account.update(balance=Account.balance - amount).where(
                Account.name == from_name)
            query.execute()
            # Validate the payer's balance after the debit
            payer = Account.get(Account.name == from_name)
            if payer.balance < 0:
                raise ValueError("余额不足")
            # Credit
            Account.update(balance=Account.balance + amount).where(
                Account.name == to_name).execute()
            print(f" 转账 {amount} 从 {from_name} 到 {to_name}")

        try:
            transfer_amount('Alice', 'Bob', 200)
            print(" 转账成功")
        except ValueError as e:
            print(f" 转账失败: {e}")

        # Style 2: atomic() as a context manager
        try:
            with db.atomic():
                Account.update(balance=Account.balance + 100).where(
                    Account.name == 'Alice').execute()
                Account.update(balance=Account.balance - 50).where(
                    Account.name == 'Bob').execute()
            print(" 批量更新成功")
        except Exception as e:
            print(f" 批量更新失败: {e}")

        # Style 3: nested atomic(); the inner block rolls back on its own
        with db.atomic() as txn1:
            Account.update(balance=500).where(Account.name == 'Alice').execute()
            try:
                with db.atomic() as txn2:
                    Account.update(balance=Account.balance + 1000).where(
                        Account.name == 'Alice').execute()
                    raise ValueError("模拟错误")
            except ValueError:
                print(" 内层事务回滚,外层事务继续")
        print(f" Alice余额: {Account.get(Account.name == 'Alice').balance}")
    finally:
        db.close()
transaction_comparison()
4.2 模型关系对比
from peewee import *
import datetime
def relationship_comparison():
    """Contrast relationship handling in Peewee and plain sqlite3.

    Builds a small blog schema (users, posts, comments, tags) in memory with
    Peewee, walks one-to-many and many-to-many relations, then repeats a
    simple JOIN by hand with sqlite3. Prints results; returns None.
    """
    import sqlite3  # local import: this snippet's import block does not provide it

    print("=== 关系处理对比 ===\n")

    # SQLite only honours ON DELETE CASCADE when foreign keys are enabled.
    db = SqliteDatabase(':memory:', pragmas={'foreign_keys': 1})

    class BaseModel(Model):
        class Meta:
            database = db

    class User(BaseModel):
        username = CharField(unique=True)
        email = CharField(unique=True)
        created_at = DateTimeField(default=datetime.datetime.now)

    class Post(BaseModel):
        user = ForeignKeyField(User, backref='posts', on_delete='CASCADE')
        title = CharField()
        content = TextField()
        created_at = DateTimeField(default=datetime.datetime.now)
        is_published = BooleanField(default=True)

    class Comment(BaseModel):
        post = ForeignKeyField(Post, backref='comments', on_delete='CASCADE')
        user = ForeignKeyField(User, backref='comments')
        content = TextField()
        created_at = DateTimeField(default=datetime.datetime.now)

    class Tag(BaseModel):
        name = CharField(unique=True)

    class PostTag(BaseModel):
        # Through table for the Post <-> Tag many-to-many relation
        post = ForeignKeyField(Post, backref='tags')
        tag = ForeignKeyField(Tag, backref='posts')

        class Meta:
            primary_key = CompositeKey('post', 'tag')

    try:
        db.create_tables([User, Post, Comment, Tag, PostTag])

        # Seed data in one transaction
        with db.atomic():
            alice = User.create(username='alice', email='alice@example.com')
            bob = User.create(username='bob', email='bob@example.com')
            post1 = Post.create(
                user=alice,
                title='第一篇博客',
                content='这是我的第一篇博客内容...'
            )
            post2 = Post.create(
                user=bob,
                title='Python教程',
                content='Python是一种很好的语言...'
            )
            Comment.create(post=post1, user=bob, content='写得很棒!')
            Comment.create(post=post2, user=alice, content='学到了很多')
            python_tag = Tag.create(name='Python')
            tutorial_tag = Tag.create(name='教程')
            blog_tag = Tag.create(name='博客')
            PostTag.create(post=post1, tag=blog_tag)
            PostTag.create(post=post2, tag=python_tag)
            PostTag.create(post=post2, tag=tutorial_tag)

        print("1. 一对多关系:")
        alice_posts = alice.posts
        print(f" Alice的文章数: {alice_posts.count()}")
        post_author = post1.user
        print(f" 文章作者: {post_author.username}")

        print("\n2. 多对多关系:")
        # post2.tags yields PostTag link rows; the Tag itself is link.tag.
        post2_tags = post2.tags
        print(f" 文章标签: {[link.tag.name for link in post2_tags]}")
        python_posts = python_tag.posts
        print(f" Python标签的文章数: {python_posts.count()}")

        print("\n3. 复杂查询:")
        # Users who have at least one published post
        active_users = (User
                        .select()
                        .join(Post)
                        .where(Post.is_published == True)  # noqa: E712 - peewee needs ==
                        .group_by(User.id)
                        .having(fn.COUNT(Post.id) > 0))
        print(f" 活跃用户数: {active_users.count()}")
        # Comment count per post
        posts_with_comment_count = (Post
                                    .select(
                                        Post,
                                        fn.COUNT(Comment.id).alias('comment_count')
                                    )
                                    .join(Comment, JOIN.LEFT_OUTER)
                                    .group_by(Post.id)
                                    .order_by(fn.COUNT(Comment.id).desc()))
        for post in posts_with_comment_count:
            print(f" 文章: {post.title}, 评论数: {post.comment_count}")

        print("\n4. 预加载关联数据(避免N+1查询问题):")
        print(" 普通查询方式(N+1问题)...")
        print(" 高效查询方式(使用prefetch)...")
        users_with_posts = prefetch(
            User.select(),
            Post.select(),
            Comment.select()
        )
        # Many-to-many: prefetch through the link table
        posts_with_tags = prefetch(
            Post.select(),
            PostTag.select(),
            Tag.select()
        )
    finally:
        db.close()

    print("\n5. SQLite3手动处理关系:")
    # With plain sqlite3 the relationship has to be expressed as a JOIN by hand.
    conn = sqlite3.connect(':memory:')
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                username TEXT UNIQUE,
                email TEXT UNIQUE
            )
        ''')
        cursor.execute('''
            CREATE TABLE posts (
                id INTEGER PRIMARY KEY,
                user_id INTEGER,
                title TEXT,
                content TEXT,
                FOREIGN KEY (user_id) REFERENCES users(id)
            )
        ''')
        cursor.execute("INSERT INTO users (username, email) VALUES ('alice', 'alice@test.com')")
        cursor.execute("INSERT INTO posts (user_id, title, content) VALUES (1, 'Test', 'Content')")
        cursor.execute('''
            SELECT u.*, p.title, p.content
            FROM users u
            LEFT JOIN posts p ON u.id = p.user_id
            WHERE u.username = 'alice'
        ''')
        results = cursor.fetchall()
        print(f" SQLite3手动JOIN结果: {len(results)} 条记录")
    finally:
        conn.close()
relationship_comparison()
第五章:实际项目应用对比
5.1 完整API项目:SQLite3实现
import sqlite3
from flask import Flask, request, jsonify, g
import json
from datetime import datetime
from functools import wraps
import hashlib
import jwt
import os
# Flask + SQLite3 API实现
app = Flask(__name__)
# Never fall back to a hard-coded signing key: anyone who reads the source
# could forge tokens. Without SECRET_KEY in the environment a random key is
# generated per process, so tokens stop validating after a restart; set
# SECRET_KEY explicitly for any real or multi-process deployment.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY') or os.urandom(32).hex()
app.config['DATABASE'] = 'app.db'
def get_db():
    """Return the SQLite connection for the current application context.

    The connection is created on first use and cached on ``flask.g`` so that
    repeated calls within one context share it.
    """
    if 'db' not in g:
        g.db = sqlite3.connect(app.config['DATABASE'])
        g.db.row_factory = sqlite3.Row
        # SQLite ignores FOREIGN KEY clauses unless enabled on each connection.
        g.db.execute('PRAGMA foreign_keys = ON')
    return g.db
@app.teardown_appcontext
def close_db(e=None):
    """Close the cached connection when the application context ends.

    Registered as a teardown handler so every request releases its
    connection; ``e`` is the exception Flask passes in, if any (unused).
    """
    db = g.pop('db', None)
    if db is not None:
        db.close()
def init_db():
    """Create the application's tables and indexes if they do not exist.

    Must be called inside an application context because it uses get_db().
    """
    db = get_db()
    cursor = db.cursor()

    # Users
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            email TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')

    # Articles
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            title TEXT NOT NULL,
            content TEXT NOT NULL,
            slug TEXT UNIQUE NOT NULL,
            published BOOLEAN DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users(id)
        )
    ''')

    # Tags
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS tags (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL
        )
    ''')

    # Article <-> tag link table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS article_tags (
            article_id INTEGER NOT NULL,
            tag_id INTEGER NOT NULL,
            PRIMARY KEY (article_id, tag_id),
            FOREIGN KEY (article_id) REFERENCES articles(id),
            FOREIGN KEY (tag_id) REFERENCES tags(id)
        )
    ''')

    # Indexes for the columns used in filters and joins
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_articles_user_id ON articles(user_id)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_articles_slug ON articles(slug)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_articles_published ON articles(published)')
    db.commit()
# 工具函数
def hash_password(password, *, iterations=600_000):
    """Return a salted PBKDF2-HMAC-SHA256 hash of ``password``.

    The result is self-describing:
    ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``.

    Args:
        password: Plain-text password.
        iterations: PBKDF2 work factor stored inside the hash string.
    """
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, iterations)
    return f'pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}'


def verify_password(password, password_hash):
    """Return True when ``password`` matches ``password_hash``.

    Accepts hashes produced by hash_password() and, for rows stored before
    salting was introduced, bare SHA-256 hex digests.
    """
    import hmac  # local import: not part of the file's import block

    if password_hash.startswith('pbkdf2_sha256$'):
        try:
            _, iterations, salt_hex, digest_hex = password_hash.split('$')
            expected = bytes.fromhex(digest_hex)
            candidate = hashlib.pbkdf2_hmac(
                'sha256', password.encode(), bytes.fromhex(salt_hex), int(iterations))
        except (ValueError, OverflowError):
            # Malformed stored hash: treat as a failed match, not a crash.
            return False
        return hmac.compare_digest(candidate, expected)

    # Legacy format: unsalted SHA-256 hex digest. Compared as bytes because
    # compare_digest rejects non-ASCII str arguments.
    legacy = hashlib.sha256(password.encode()).hexdigest()
    return hmac.compare_digest(legacy.encode(), password_hash.encode())
def generate_token(user_id):
    """Return an HS256-signed JWT for ``user_id`` that expires in one hour."""
    import time  # local import: not part of this snippet's import block

    payload = {
        'user_id': user_id,
        # time.time() is already epoch seconds in UTC. Calling .timestamp()
        # on a naive utcnow() value would be read as local time and skew the
        # expiry by the UTC offset.
        'exp': int(time.time()) + 3600,
    }
    return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
def verify_token(token):
    """Return the ``user_id`` claim of a valid JWT, or None.

    None is returned when the token is malformed, expired, fails signature
    verification, or lacks the ``user_id`` claim.
    """
    try:
        payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
        return payload['user_id']
    except (jwt.InvalidTokenError, KeyError):
        # Only token problems are swallowed; unrelated errors still surface.
        return None
# 装饰器
def login_required(f):
    """Decorator that rejects requests without a valid bearer token.

    On success the authenticated user's id is stored on ``g.user_id`` before
    the wrapped view runs; otherwise a 401 JSON response is returned.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        header = request.headers.get('Authorization')
        if not header:
            return jsonify({'error': 'Token required'}), 401
        # Strip the scheme only when it is a prefix; str.replace would also
        # delete "Bearer " occurring in the middle of the value.
        token = header[len('Bearer '):] if header.startswith('Bearer ') else header
        user_id = verify_token(token)
        if not user_id:
            return jsonify({'error': 'Invalid token'}), 401
        g.user_id = user_id
        return f(*args, **kwargs)
    return decorated_function
# API路由
@app.route('/api/register', methods=['POST'])
def register():
    """Register a user from a JSON body with username, email and password.

    Returns 201 with the new user id, 400 when a field is missing or not a
    non-empty string, and 409 when the username or email is already taken.
    """
    # silent=True yields None for a missing or malformed JSON body, so the
    # check below can answer with the API's own 400 payload.
    data = request.get_json(silent=True)
    required = ('username', 'email', 'password')
    if not isinstance(data, dict) or not all(
            isinstance(data.get(k), str) and data[k] for k in required):
        return jsonify({'error': 'Missing required fields'}), 400

    db = get_db()
    cursor = db.cursor()
    try:
        cursor.execute('''
            INSERT INTO users (username, email, password_hash)
            VALUES (?, ?, ?)
        ''', (data['username'], data['email'], hash_password(data['password'])))
        db.commit()
    except sqlite3.IntegrityError:
        # Discard the failed implicit transaction before answering.
        db.rollback()
        return jsonify({'error': 'Username or email already exists'}), 409

    return jsonify({
        'success': True,
        'message': 'User registered successfully',
        'user_id': cursor.lastrowid
    }), 201
@app.route('/api/login', methods=['POST'])
def login():
    """Authenticate by username or email and return a JWT.

    Returns 200 with a token, 400 when a field is missing or not a string,
    and 401 when the credentials do not match.
    """
    # silent=True yields None for a missing or malformed JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not all(
            isinstance(data.get(k), str) for k in ('username', 'password')):
        return jsonify({'error': 'Missing required fields'}), 400

    cursor = get_db().cursor()
    # The "username" field may carry either the username or the email.
    cursor.execute('''
        SELECT id, password_hash FROM users
        WHERE username = ? OR email = ?
    ''', (data['username'], data['username']))
    user = cursor.fetchone()

    if user and verify_password(data['password'], user['password_hash']):
        return jsonify({
            'success': True,
            'token': generate_token(user['id']),
            'user_id': user['id']
        })
    return jsonify({'error': 'Invalid credentials'}), 401
@app.route('/api/articles', methods=['GET'])
def get_articles():
    """List articles with author name and tags, filtered and paginated.

    Query parameters: ``page`` (default 1), ``per_page`` (default 10,
    clamped to 1..100), ``tag`` (optional tag name), ``published``
    (default ``true``).
    """
    db = get_db()
    cursor = db.cursor()

    # type=int makes Flask fall back to the default on non-numeric input;
    # clamping avoids negative offsets and a zero divisor further down.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = min(max(request.args.get('per_page', 10, type=int), 1), 100)
    tag = request.args.get('tag')
    # NOTE(review): any caller can pass published=false and see drafts.
    published_only = request.args.get('published', 'true').lower() == 'true'

    # Filters refer only to alias "a" so the same WHERE clause works for both
    # the listing and the count query.
    conditions = []
    filter_params = []
    if published_only:
        conditions.append('a.published = 1')
    if tag:
        # EXISTS keeps the article's full tag list in GROUP_CONCAT; filtering
        # on the joined tag row would leave only the matching tag.
        conditions.append('''EXISTS (
            SELECT 1 FROM article_tags ft
            JOIN tags fx ON ft.tag_id = fx.id
            WHERE ft.article_id = a.id AND fx.name = ?
        )''')
        filter_params.append(tag)
    where_clause = (' WHERE ' + ' AND '.join(conditions)) if conditions else ''

    query = '''
        SELECT a.*, u.username,
               GROUP_CONCAT(t.name) as tags
        FROM articles a
        JOIN users u ON a.user_id = u.id
        LEFT JOIN article_tags lnk ON a.id = lnk.article_id
        LEFT JOIN tags t ON lnk.tag_id = t.id
    ''' + where_clause + ' GROUP BY a.id ORDER BY a.created_at DESC LIMIT ? OFFSET ?'
    cursor.execute(query, filter_params + [per_page, (page - 1) * per_page])
    articles = cursor.fetchall()

    # Total number of matching articles (no paging parameters here)
    cursor.execute('SELECT COUNT(*) as total FROM articles a' + where_clause, filter_params)
    total = cursor.fetchone()['total']

    # Turn the comma-joined tag string into a list
    result = []
    for article in articles:
        article_dict = dict(article)
        article_dict['tags'] = article_dict['tags'].split(',') if article_dict['tags'] else []
        result.append(article_dict)

    return jsonify({
        'success': True,
        'page': page,
        'per_page': per_page,
        'total': total,
        'total_pages': (total + per_page - 1) // per_page,
        'data': result
    })
# 更多API路由...
if __name__ == '__main__':
    # Create the schema inside an application context, then start the
    # development server. debug=True is for local use only.
    with app.app_context():
        init_db()
    app.run(debug=True)
5.2 完整API项目:Peewee实现
from flask import Flask, request, jsonify, g
from peewee import *
import datetime
import hashlib
import jwt
import os
from functools import wraps
# Flask + Peewee API实现
app = Flask(__name__)
# Never fall back to a hard-coded signing key: anyone who reads the source
# could forge tokens. Without SECRET_KEY in the environment a random key is
# generated per process, so tokens stop validating after a restart; set
# SECRET_KEY explicitly for any real or multi-process deployment.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY') or os.urandom(32).hex()
app.config['DATABASE'] = 'app.db'
# Peewee database handle. SQLite only enforces foreign keys when the pragma
# is enabled on each connection.
db = SqliteDatabase(app.config['DATABASE'], pragmas={'foreign_keys': 1})
class BaseModel(Model):
    """Common base that binds every model to the module-level ``db``."""

    class Meta:
        database = db
class User(BaseModel):
    """Account record; the password is write-only and stored as a hash."""

    username = CharField(unique=True)
    email = CharField(unique=True)
    password_hash = CharField()
    created_at = DateTimeField(default=datetime.datetime.now)
    updated_at = DateTimeField(default=datetime.datetime.now)

    # PBKDF2 work factor written into every new hash string.
    _PBKDF2_ITERATIONS = 600_000

    @property
    def password(self):
        """Plain-text passwords are never readable."""
        raise AttributeError('password is not a readable attribute')

    @password.setter
    def password(self, password):
        # Store a salted PBKDF2-HMAC-SHA256 hash in the self-describing form
        # pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>.
        salt = os.urandom(16)
        digest = hashlib.pbkdf2_hmac(
            'sha256', password.encode(), salt, self._PBKDF2_ITERATIONS)
        self.password_hash = (
            f'pbkdf2_sha256${self._PBKDF2_ITERATIONS}${salt.hex()}${digest.hex()}')

    def verify_password(self, password):
        """Return True when ``password`` matches the stored hash.

        Also accepts bare SHA-256 hex digests stored before salting was
        introduced.
        """
        import hmac  # local import: not part of the file's import block

        stored = self.password_hash
        if stored.startswith('pbkdf2_sha256$'):
            try:
                _, iterations, salt_hex, digest_hex = stored.split('$')
                expected = bytes.fromhex(digest_hex)
                candidate = hashlib.pbkdf2_hmac(
                    'sha256', password.encode(), bytes.fromhex(salt_hex), int(iterations))
            except (ValueError, OverflowError):
                # Malformed stored hash: treat as a failed match.
                return False
            return hmac.compare_digest(candidate, expected)
        legacy = hashlib.sha256(password.encode()).hexdigest()
        return hmac.compare_digest(legacy.encode(), stored.encode())
class Article(BaseModel):
    """Article written by a user; ``slug`` is unique across all articles."""

    user = ForeignKeyField(User, backref='articles')
    title = CharField()
    content = TextField()
    slug = CharField(unique=True)
    published = BooleanField(default=False)
    created_at = DateTimeField(default=datetime.datetime.now)
    updated_at = DateTimeField(default=datetime.datetime.now)
class Tag(BaseModel):
    """Tag identified by a unique name."""

    name = CharField(unique=True)
class ArticleTag(BaseModel):
    """Link table for the Article <-> Tag many-to-many relation."""

    article = ForeignKeyField(Article, backref='tags')
    tag = ForeignKeyField(Tag, backref='articles')

    class Meta:
        # One row per (article, tag) pair
        primary_key = CompositeKey('article', 'tag')
# 创建表
def create_tables():
    """Create all model tables, using ``db`` as a context manager."""
    with db:
        db.create_tables([User, Article, Tag, ArticleTag])
# 工具函数
def generate_token(user_id):
    """Return an HS256-signed JWT for ``user_id`` that expires in one hour."""
    # Timezone-aware "now" replaces the deprecated naive utcnow().
    expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
    payload = {
        'user_id': user_id,
        'exp': expires_at,
    }
    return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
def verify_token(token):
    """Return the ``user_id`` claim of a valid JWT, or None.

    None is returned when the token is malformed, expired, fails signature
    verification, or lacks the ``user_id`` claim.
    """
    try:
        payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
        return payload['user_id']
    except (jwt.InvalidTokenError, KeyError):
        # Only token problems are swallowed; unrelated errors still surface.
        return None
# 装饰器
def login_required(f):
    """Decorator that rejects requests without a valid bearer token.

    On success the authenticated user's id is stored on ``g.user_id`` before
    the wrapped view runs; otherwise a 401 JSON response is returned.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        header = request.headers.get('Authorization')
        if not header:
            return jsonify({'error': 'Token required'}), 401
        # Strip the scheme only when it is a prefix; str.replace would also
        # delete "Bearer " occurring in the middle of the value.
        token = header[len('Bearer '):] if header.startswith('Bearer ') else header
        user_id = verify_token(token)
        if not user_id:
            return jsonify({'error': 'Invalid token'}), 401
        g.user_id = user_id
        return f(*args, **kwargs)
    return decorated_function
# API路由
@app.route('/api/register', methods=['POST'])
def register():
    """Register a user from a JSON body with username, email and password.

    Returns 201 with the new user id, 400 when a field is missing or not a
    non-empty string, and 409 when the username or email is already taken.
    """
    # silent=True yields None for a missing or malformed JSON body.
    data = request.get_json(silent=True)
    required = ('username', 'email', 'password')
    if not isinstance(data, dict) or not all(
            isinstance(data.get(k), str) and data[k] for k in required):
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        with db.atomic():
            # Build the instance and set the password before the first save.
            # Calling create() first would INSERT without password_hash, fail
            # the NOT NULL constraint and be misreported as a duplicate user.
            user = User(username=data['username'], email=data['email'])
            user.password = data['password']
            user.save()
    except IntegrityError:
        return jsonify({'error': 'Username or email already exists'}), 409

    return jsonify({
        'success': True,
        'message': 'User registered successfully',
        'user_id': user.id
    }), 201
@app.route('/api/login', methods=['POST'])
def login():
    """Authenticate by username or email and return a JWT.

    Returns 200 with a token, 400 when a field is missing or not a string,
    and 401 when the credentials do not match.
    """
    # silent=True yields None for a missing or malformed JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not all(
            isinstance(data.get(k), str) for k in ('username', 'password')):
        return jsonify({'error': 'Missing required fields'}), 400

    # The "username" field may carry either the username or the email.
    user = User.get_or_none(
        (User.username == data['username']) |
        (User.email == data['username'])
    )
    if user is not None and user.verify_password(data['password']):
        return jsonify({
            'success': True,
            'token': generate_token(user.id),
            'user_id': user.id
        })
    return jsonify({'error': 'Invalid credentials'}), 401
@app.route('/api/articles', methods=['GET'])
def get_articles():
    """List articles with author name and tags, filtered and paginated.

    Query parameters: ``page`` (default 1), ``per_page`` (default 10,
    clamped to 1..100), ``tag`` (optional tag name), ``published``
    (default ``true``).
    """
    from peewee import fn

    # type=int makes Flask fall back to the default on non-numeric input;
    # clamping avoids negative offsets and a zero divisor further down.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = min(max(request.args.get('per_page', 10, type=int), 1), 100)
    tag_name = request.args.get('tag')
    # NOTE(review): any caller can pass published=false and see drafts.
    published_only = request.args.get('published', 'true').lower() == 'true'

    # The alias is "tag_names" rather than "tags" so it cannot collide with
    # the ArticleTag backref of that name on Article.
    query = (Article
             .select(
                 Article,
                 User.username,
                 fn.GROUP_CONCAT(Tag.name).alias('tag_names')
             )
             .join(User)
             # join() moves the join context to User; switch back so the
             # next join starts from Article, which ArticleTag references.
             .switch(Article)
             .join(ArticleTag, JOIN.LEFT_OUTER)
             .join(Tag, JOIN.LEFT_OUTER)
             .group_by(Article.id))
    if published_only:
        query = query.where(Article.published == True)  # noqa: E712 - peewee needs ==
    if tag_name:
        # A subquery keeps each article's full tag list; filtering the joined
        # Tag row would leave only the matching tag in GROUP_CONCAT.
        tagged = (ArticleTag
                  .select(ArticleTag.article)
                  .join(Tag)
                  .where(Tag.name == tag_name))
        query = query.where(Article.id.in_(tagged))

    # Total before pagination
    total = query.count()

    query = query.order_by(Article.created_at.desc()).paginate(page, per_page)

    articles = []
    for article in query:
        articles.append({
            'id': article.id,
            'title': article.title,
            'slug': article.slug,
            'content': article.content,
            'published': article.published,
            'created_at': article.created_at.isoformat(),
            'updated_at': article.updated_at.isoformat(),
            # Columns selected from a joined model live on the related instance.
            'username': article.user.username,
            'tags': article.tag_names.split(',') if article.tag_names else []
        })

    return jsonify({
        'success': True,
        'page': page,
        'per_page': per_page,
        'total': total,
        'total_pages': (total + per_page - 1) // per_page,
        'data': articles
    })
@app.route('/api/articles', methods=['POST'])
@login_required
def create_article():
    """Create an article for the authenticated user, with optional tags.

    Returns 201 with the new article, 400 on missing or invalid fields,
    401 when the token's user no longer exists, and 409 when the slug is
    already taken.
    """
    # silent=True yields None for a missing or malformed JSON body.
    data = request.get_json(silent=True)
    required_fields = ['title', 'content', 'slug']
    if not isinstance(data, dict) or not all(k in data for k in required_fields):
        return jsonify({'error': 'Missing required fields'}), 400

    tag_names = data.get('tags') or []
    if not isinstance(tag_names, list) or not all(isinstance(t, str) for t in tag_names):
        return jsonify({'error': 'Missing required fields'}), 400
    # Drop duplicates (keeping order): a repeated tag would violate the
    # ArticleTag composite key and be misreported as a slug conflict.
    tag_names = list(dict.fromkeys(tag_names))

    author = User.get_or_none(User.id == g.user_id)
    if author is None:
        return jsonify({'error': 'Invalid token'}), 401

    try:
        with db.atomic():
            article = Article.create(
                user=author,
                title=data['title'],
                content=data['content'],
                slug=data['slug'],
                published=data.get('published', False)
            )
            # Create missing tags and link each one to the article
            for tag_name in tag_names:
                tag, _ = Tag.get_or_create(name=tag_name)
                ArticleTag.create(article=article, tag=tag)
    except IntegrityError:
        return jsonify({'error': 'Slug already exists'}), 409

    return jsonify({
        'success': True,
        'message': 'Article created successfully',
        'article': {
            'id': article.id,
            'title': article.title,
            'slug': article.slug,
            'username': author.username
        }
    }), 201
if __name__ == '__main__':
    # Create the schema, then start the development server.
    # debug=True is for local use only.
    create_tables()
    app.run(debug=True)
第六章:综合对比总结
6.1 对比表格
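| 对比维度 | SQLite3 | Peewee | 更适合 |
|---|---|---|---|
| 学习曲线 | 需要掌握SQL | 需要学习ORM API | 视团队技能而定 |
| 开发速度 | 较慢,需手写SQL | 快,模型即表结构 | Peewee |
| 代码可读性 | SQL字符串与Python混写 | 面向对象,表达清晰 | Peewee |
| 灵活性 | 高,可执行任意SQL | 较高,可用raw()执行原生SQL | SQLite3 |
| 性能 | 高,无ORM开销 | 略低,有对象映射开销 | SQLite3 |
| 安全性 | 需手动使用参数化查询 | 查询构造器自动参数化 | Peewee |
| 维护性 | 表结构变化需逐处修改SQL | 模型集中定义,易于维护 | Peewee |
| 事务支持 | 手动commit/rollback | atomic()上下文管理器,支持嵌套 | Peewee |
| 关系处理 | 手写JOIN | 外键、backref、prefetch | Peewee |
| 迁移支持 | 无,需手写ALTER语句 | 可借助playhouse.migrate | Peewee |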
6.2 选择建议
"""
根据项目需求选择:
1. 选择 SQLite3 当:
- 小型工具/脚本
- 性能是关键需求
- 需要执行复杂SQL
- 学习SQL基础
- 单文件数据库应用
2. 选择 Peewee 当:
- Web应用/API开发
- 团队协作项目
- 需要快速原型开发
- 数据库结构复杂
- 需要ORM的高级特性
- 需要代码可维护性
3. 混合使用(最佳实践):
- 使用Peewee处理常规CRUD
- 使用SQLite3处理复杂查询
- 在Peewee中执行原生SQL
"""
# 混合使用示例
from peewee import *
class MixedUsage:
    """Demonstrate mixing Peewee models with raw SQL and plain sqlite3."""

    def __init__(self):
        self.db = SqliteDatabase('mixed.db')

    def setup(self):
        """Define the model with Peewee, create its table and seed one row."""
        class User(Model):
            username = CharField()
            email = CharField()

            class Meta:
                database = self.db

        self.User = User
        # reuse_if_open lets setup() be called again on a connected instance.
        self.db.connect(reuse_if_open=True)
        self.db.create_tables([User])
        # Insert through the ORM
        User.create(username='alice', email='alice@example.com')

    def complex_query(self):
        """Run the same kind of query through Peewee raw SQL and sqlite3.

        Requires setup() to have been called first.
        """
        # Raw SQL through Peewee. raw() takes parameters as separate
        # positional arguments, not as a tuple. The correlated subquery
        # counts rows sharing the user's email so the example only needs the
        # table that setup() creates.
        query = self.User.raw('''
            SELECT
                u.*,
                (SELECT COUNT(*) FROM user AS other WHERE other.email = u.email) as count
            FROM user u
            WHERE u.username LIKE ?
            ORDER BY count DESC
        ''', '%a%')
        for user in query:
            print(f"{user.username}: {user.count}")

        # Or talk to the same file directly with sqlite3
        import sqlite3
        conn = sqlite3.connect('mixed.db')
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT
                    username,
                    (SELECT COUNT(*) FROM user) as total
                FROM user
            ''')
            print(cursor.fetchall())
        finally:
            conn.close()
# 实际选择建议
def choose_approach(project_type):
    """Return the recommended data-access approach for ``project_type``.

    Args:
        project_type: One of the keys below, e.g. ``'script'`` or
            ``'web_api'``.

    Returns:
        The recommendation string; ``'Peewee'`` for unknown project types.
    """
    recommendations = {
        'script': 'SQLite3',
        'cli_tool': 'SQLite3',
        'web_api': 'Peewee',
        'mobile_backend': 'Peewee',
        'data_analysis': 'SQLite3 + pandas',
        'prototype': 'Peewee',
        'enterprise': 'Peewee + 数据库连接池',
        'iot': 'SQLite3(轻量级)',
        'desktop_app': 'SQLite3(单文件)',
        'microservice': 'Peewee(结构化)'
    }
    return recommendations.get(project_type, 'Peewee')


print("项目类型建议:")
for project_type in ['script', 'web_api', 'data_analysis', 'enterprise']:
    print(f" {project_type}: {choose_approach(project_type)}")
6.3 最佳实践
"""
使用SQLite3的最佳实践:
1. 使用参数化查询防止SQL注入
2. 使用上下文管理器确保连接关闭
3. 创建适当的索引
4. 使用事务处理批量操作
5. 定期备份数据库
使用Peewee的最佳实践:
1. 使用模型定义所有字段
2. 合理使用索引和外键
3. 使用atomic()处理事务
4. 使用prefetch避免N+1查询
5. 定期进行数据库迁移
通用最佳实践:
1. 数据库操作放在数据访问层
2. 编写单元测试
3. 记录数据库操作日志
4. 监控数据库性能
5. 定期优化数据库
"""
# 性能优化示例
def optimization_tips():
    """Print optimization tips grouped by category; returns None."""
    tips = {
        'SQLite3优化': [
            '使用PRAGMA优化设置',
            '创建复合索引',
            '使用EXPLAIN分析查询',
            '批量操作使用事务',
            '避免在循环中查询'
        ],
        'Peewee优化': [
            # Peewee has no select_related (that is Django's API); related
            # rows are loaded with join() or prefetch().
            '使用join()/prefetch()预加载关联数据',
            '只查询需要的字段',
            '使用iterator()处理大量数据',
            '合理使用缓存',
            '使用数据库连接池'
        ],
        '通用优化': [
            '定期VACUUM数据库',
            '使用合适的数据类型',
            '避免SELECT *',
            '合理分页',
            '监控慢查询'
        ]
    }
    for category, items in tips.items():
        print(f"\n{category}:")
        for item in items:
            print(f" • {item}")
# 实际配置示例
def optimal_configuration():
    """Show recommended connection settings for sqlite3 and Peewee.

    The two nested factories are reference examples; this function does not
    call them, so no database is opened. It prints a confirmation line and
    returns None.
    """

    def configure_sqlite3():
        """Open ``app.db`` with performance-oriented PRAGMAs applied."""
        import sqlite3  # local import: this snippet's import block does not provide it

        conn = sqlite3.connect('app.db', check_same_thread=False)
        pragmas = [
            ('journal_mode', 'WAL'),     # write-ahead logging
            ('cache_size', -2000),       # negative value = size in KiB
            ('synchronous', 'NORMAL'),   # balance speed and durability
            ('foreign_keys', 'ON'),      # enforce foreign keys
            ('temp_store', 'MEMORY')     # keep temp tables in memory
        ]
        cursor = conn.cursor()
        for pragma, value in pragmas:
            # Values come from the literal list above, never from user input.
            cursor.execute(f'PRAGMA {pragma} = {value}')
        conn.commit()
        return conn

    def configure_peewee():
        """Build a pooled Peewee database with the same key PRAGMAs."""
        from playhouse.pool import PooledSqliteDatabase

        # PRAGMAs belong in the ``pragmas`` argument so Peewee applies them to
        # every new connection. The db.func() decorator registers a SQL
        # user-defined function and would never run them.
        db = PooledSqliteDatabase(
            'app.db',
            max_connections=20,
            stale_timeout=300,
            check_same_thread=False,
            pragmas={
                'journal_mode': 'wal',
                'cache_size': -2000,
            },
        )
        return db

    print("优化配置完成")
optimization_tips()
optimal_configuration()
总结:没有绝对的最好,只有最适合
SQLite3 和 Peewee 各有优劣,选择哪个取决于:
- 项目规模:小项目用SQLite3,大项目用Peewee
- 团队技能:熟悉SQL用SQLite3,熟悉ORM用Peewee
- 开发周期:快速开发用Peewee,性能优先用SQLite3
- 维护需求:长期维护用Peewee,短期工具用SQLite3
个人建议:从SQLite3开始学习数据库基础,然后转向Peewee提高开发效率。在实际项目中,可以混合使用,取长补短!
关注公众号,后台回复“资料领取”或点击“资料领取”菜单即可免费获取“软件测试”、“Python开发”相关资料~