from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, status
from fastapi.responses import FileResponse
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from pathlib import Path
import uuid
import os
app = FastAPI()
配置
UPLOAD_DIR = Path("uploads")
MAX_FILE_SIZE = 50 1024 1024 # 50MB
ALLOWED_TYPES = ["image/jpeg", "image/png", "image/gif", "application/pdf"]
数据库模型(简化)
class FileRecord(BaseModel):
id: str
original_name: str
saved_name: str
size: int
content_type: str
owner_id: int
created_at: datetime
is_public: bool = False
内存存储(生产环境用数据库)
files_db: dict = {}
class FileValidator:
"""文件验证器"""
def validate(self, file: UploadFile) -> tuple[bool, str, str]:
# 检查扩展名
allowed_extensions = [".jpg", ".jpeg", ".png", ".gif", ".pdf"]
extension = os.path.splitext(file.filename)[1].lower()
if extension not in allowed_extensions:
return False, f"不支持的文件类型: {extension}", ""
# 检查MIME
if file.content_type not in ALLOWED_TYPES:
return False, f"不支持的文件类型: {file.content_type}", ""
# 检查大小
content = file.file.read()
file.file.seek(0)
if len(content) > MAX_FILE_SIZE:
return False, f"文件过大,最大允许 {MAX_FILE_SIZE // 1024 // 1024}MB", ""
if len(content) == 0:
return False, "文件为空", ""
# 生成安全文件名
safe_name = f"{uuid.uuid4().hex}{extension}"
return True, "验证通过", safe_name
validator = FileValidator()
class FileUploadResponse(BaseModel):
id: str
original_name: str
saved_name: str
size: int
content_type: str
url: str
@app.post("/files/upload/", response_model=FileUploadResponse)
async def upload_file(
file: UploadFile = File(...),
owner_id: int = Depends(lambda: 1) # 从认证获取
):
"""上传文件"""
# 验证
is_valid, message, safe_name = validator.validate(file)
if not is_valid:
raise HTTPException(status_code=400, detail=message)
# 保存
file_path = UPLOAD_DIR / safe_name
file_path.parent.mkdir(exist_ok=True)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# 创建记录
file_id = str(uuid.uuid4())
record = FileRecord(
id=file_id,
original_name=file.filename,
saved_name=safe_name,
size=file_path.stat().st_size,
content_type=file.content_type,
owner_id=owner_id,
created_at=datetime.utcnow()
)
files_db[file_id] = record
return FileUploadResponse(
id=file_id,
original_name=file.filename,
saved_name=safe_name,
size=record.size,
content_type=record.content_type,
url=f"/files/{file_id}/download"
)
@app.get("/files/{file_id}/download")
async def download_file(file_id: str):
"""下载文件"""
record = files_db.get(file_id)
if not record:
raise HTTPException(status_code=404, detail="文件不存在")
# 检查权限
# if record.owner_id != current_user_id and not record.is_public:
# raise HTTPException(status_code=403, detail="无权访问")
file_path = UPLOAD_DIR / record.saved_name
if not file_path.exists():
raise HTTPException(status_code=404, detail="文件不存在")
return FileResponse(
path=file_path,
filename=record.original_name,
media_type=record.content_type
)
@app.get("/files/{file_id}")
async def get_file_info(file_id: str):
"""获取文件信息"""
record = files_db.get(file_id)
if not record:
raise HTTPException(status_code=404, detail="文件不存在")
return record
@app.delete("/files/{file_id}")
async def delete_file(file_id: str, owner_id: int = Depends(lambda: 1)):
"""删除文件"""
record = files_db.get(file_id)
if not record:
raise HTTPException(status_code=404, detail="文件不存在")
# 检查权限
if record.owner_id != owner_id:
raise HTTPException(status_code=403, detail="无权删除")
# 删除文件
file_path = UPLOAD_DIR / record.saved_name
if file_path.exists():
file_path.unlink()
# 删除记录
del files_db[file_id]
return {"message": "文件已删除"}
@app.get("/files/")
async def list_files(
owner_id: int = Depends(lambda: 1),
limit: int = 20,
skip: int = 0
):
"""列出文件"""
user_files = [
f for f in files_db.values()
if f.owner_id == owner_id
][skip:skip + limit]
return {
"files": user_files,
"total": len([f for f in files_db.values() if f.owner_id == owner_id])
}