from fastapi import FastAPI, Depends, HTTPException
from fastapi.responses import Response
from pydantic import BaseModel
from sqlalchemy.orm import Session
from datetime import datetime
import hashlib
import json
app = FastAPI()
class User(BaseModel):
id: int
name: str
email: str
updated_at: datetime
class CacheManager:
"""缓存管理器"""
def __init__(self):
self.l1_cache = LRUCache(max_size=500)
self.l2_client = redis.Redis(host="localhost", decode_responses=True)
def _get_l2(self, key: str) -> Optional[dict]:
value = self.l2_client.get(key)
if value:
return json.loads(value)
return None
def _set_l2(self, key: str, value: dict, expire: int = 300):
self.l2_client.setex(key, expire, json.dumps(value))
def get(self, key: str) -> Optional[dict]:
# L1
value = self.l1_cache.get(key)
if value is not None:
return value
# L2
value = self._get_l2(key)
if value is not None:
self.l1_cache.set(key, value)
return value
return None
def set(self, key: str, value: dict, expire: int = 300):
self.l1_cache.set(key, value)
self._set_l2(key, value, expire)
def delete(self, key: str):
self.l1_cache.delete(key)
self.l2_client.delete(key)
def delete_pattern(self, pattern: str):
keys = self.l2_client.keys(pattern)
if keys:
self.l2_client.delete(*keys)
cache_manager = CacheManager()
@app.get("/api/users/{user_id}", response_model=User)
async def get_user(
user_id: int,
db: Session = Depends(get_db)
):
"""获取用户(多级缓存)"""
cache_key = f"user:{user_id}"
# 尝试从缓存获取
cached = cache_manager.get(cache_key)
if cached:
return User(**cached)
# 从数据库获取
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
# 写入缓存
cache_manager.set(cache_key, user.model_dump(), expire=300)
return user
@app.get("/api/users", response_model=list[User])
async def get_users(
skip: int = 0,
limit: int = 20,
db: Session = Depends(get_db)
):
"""获取用户列表(带缓存)"""
cache_key = f"users:list:{skip}:{limit}"
cached = cache_manager.get(cache_key)
if cached:
return [User(**u) for u in cached]
users = db.query(UserModel).offset(skip).limit(limit).all()
if not users:
return []
cache_manager.set(cache_key, [u.model_dump() for u in users], expire=300)
return [User.model_validate(u) for u in users]
@app.post("/api/users")
async def create_user(user_in: UserCreate, db: Session = Depends(get_db)):
"""创建用户(失效缓存)"""
user = UserCreate(**user_in.model_dump())
db.add(user)
db.commit()
db.refresh(user)
# 失效缓存
cache_manager.delete_pattern("users:*")
return user
@app.put("/api/users/{user_id}")
async def update_user(
user_id: int,
user_in: UserUpdate,
db: Session = Depends(get_db)
):
"""更新用户(失效缓存)"""
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
for field, value in user_in.model_dump(exclude_unset=True).items():
setattr(user, field, value)
db.commit()
# 失效缓存
cache_manager.delete(f"user:{user_id}")
cache_manager.delete_pattern("users:list:*")
return user
@app.delete("/api/users/{user_id}")
async def delete_user(user_id: int, db: Session = Depends(get_db)):
"""删除用户(失效缓存)"""
user = db.query(UserModel).filter(UserModel.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
db.delete(user)
db.commit()
# 失效缓存
cache_manager.delete(f"user:{user_id}")
cache_manager.delete_pattern("users:list:*")
return {"message": "删除成功"}