import os
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from pydantic import BaseModel, EmailStr, Field
from sqlalchemy.orm import Session

from app import models
from app.database import get_db
# ASGI application object; the route decorators in this module register
# their handlers on it.
app = FastAPI()
# Configuration
# Key used to sign and verify every JWT handled by this module. It is read
# from the SECRET_KEY environment variable so that deployments do not rely on
# a key committed to source control. The literal fallback preserves the
# previous behaviour for local development only; an unset or empty variable
# falls back to it, so production MUST set SECRET_KEY explicitly.
SECRET_KEY = os.environ.get("SECRET_KEY") or "your-secret-key"
# JWT signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Default lifetime of an access token, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Password hashing context: bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# FastAPI dependency that yields the bearer token of the current request;
# tokenUrl names the login route ("/token") for the generated API docs.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Models
# Request body accepted by the /register route.
class UserCreate(BaseModel):
    # E-mail address; validated by pydantic's EmailStr type.
    email: EmailStr
    # Required; between 3 and 50 characters.
    username: str = Field(..., min_length=3, max_length=50)
    # Required plain-text password; at least 8 characters. It is hashed
    # before being stored (see the register route).
    password: str = Field(..., min_length=8)
# Username/password credential pair.
# NOTE(review): not referenced by any route visible in this file (the /token
# route reads an OAuth2 form instead) -- confirm whether it is used elsewhere.
class UserLogin(BaseModel):
    username: str
    password: str
# Response body returned by the token-issuing routes.
class Token(BaseModel):
    # Signed JWT used to authenticate API requests.
    access_token: str
    # Token scheme; the routes in this file always set it to "bearer".
    token_type: str
    # Signed JWT that can be exchanged for a new token pair; optional and
    # defaults to None.
    refresh_token: Optional[str] = None
# Utility functions
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a candidate password against a stored hash.

    Args:
        plain_password: Password as typed by the user.
        hashed_password: Hash previously produced by ``get_password_hash``.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash a plain-text password with the module's password context.

    Args:
        password: Plain-text password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    digest = pwd_context.hash(password)
    return digest
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Build a signed access token carrying ``data`` plus an expiry claim.

    Args:
        data: Claims to embed in the token (the routes pass ``{"sub": username}``).
            The mapping is copied, never mutated.
        expires_delta: Token lifetime. Defaults to
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes when omitted.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # The original referenced a misspelled constant here
    # (ACCESS_TOKEN_EXPIRE_EXPIRE_MINUTES), which raised NameError on every call.
    lifetime = (
        expires_delta
        if expires_delta is not None
        else timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expires_at = datetime.now(timezone.utc) + lifetime
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Build a signed refresh token carrying ``data``.

    In addition to the caller's claims the token gets an ``exp`` claim and a
    ``type`` claim set to ``"refresh"``, which is what the refresh route
    checks before issuing a new token pair.

    Args:
        data: Claims to embed in the token (the routes pass ``{"sub": username}``).
            The mapping is copied, never mutated.
        expires_delta: Token lifetime. Defaults to 7 days when omitted.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    lifetime = expires_delta if expires_delta is not None else timedelta(days=7)
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expires_at = datetime.now(timezone.utc) + lifetime
    claims = {**data, "exp": expires_at, "type": "refresh"}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
):
    """Resolve the bearer token of the current request to a user record.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``models.User`` row whose username equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded (bad signature,
            malformed, expired), is a refresh token, has no ``sub`` claim, or
            names a user that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.InvalidTokenError:
        # ExpiredSignatureError derives from InvalidTokenError, so one clause
        # covers both cases the original handled separately.
        raise credentials_exception

    # Refresh tokens are signed with the same key but live much longer than
    # access tokens; accepting them here would let a refresh token be used
    # directly as an API credential.
    if payload.get("type") == "refresh":
        raise credentials_exception

    username = payload.get("sub")
    if username is None:
        raise credentials_exception

    user = db.query(models.User).filter(models.User.username == username).first()
    if user is None:
        raise credentials_exception
    return user
# Routes
@app.post("/register", response_model=Token)
async def register(user_in: UserCreate, db: Session = Depends(get_db)):
    """Create a new user account and return a fresh token pair for it.

    Responds with HTTP 400 when the username or the e-mail address is already
    registered.
    """
    # Look for any existing account that shares the username or the e-mail.
    already_taken = (models.User.username == user_in.username) | (
        models.User.email == user_in.email
    )
    if db.query(models.User).filter(already_taken).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="用户名或邮箱已存在",
        )

    # Persist the account; only the password hash is stored.
    new_user = models.User(
        username=user_in.username,
        email=user_in.email,
        hashed_password=get_password_hash(user_in.password),
    )
    db.add(new_user)
    db.commit()
    db.refresh(new_user)

    # Issue both tokens for the freshly created account.
    subject = {"sub": new_user.username}
    return Token(
        access_token=create_access_token(subject),
        token_type="bearer",
        refresh_token=create_refresh_token(subject),
    )
@app.post("/token", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Exchange a username and password for an access/refresh token pair.

    Responds with HTTP 401 when the username is unknown or the password does
    not match the stored hash.
    """
    account = (
        db.query(models.User)
        .filter(models.User.username == form_data.username)
        .first()
    )
    # The password is only checked when an account was found.
    if not (account and verify_password(form_data.password, account.hashed_password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )

    subject = {"sub": account.username}
    return Token(
        access_token=create_access_token(subject),
        token_type="bearer",
        refresh_token=create_refresh_token(subject),
    )
# Public view of a user returned by /users/me. It deliberately lists only
# non-sensitive fields: the original route used the ORM class models.User as
# its response model, which is either not a valid response schema or one
# that would serialise hashed_password to the client.
class UserPublic(BaseModel):
    username: str
    email: EmailStr


@app.get("/users/me", response_model=UserPublic)
async def read_users_me(
    current_user: models.User = Depends(get_current_user),
):
    """Return the public profile of the authenticated user.

    Authentication is handled by the ``get_current_user`` dependency, which
    rejects the request with HTTP 401 before this handler runs.
    """
    # Build the response explicitly so no other attribute of the database
    # row (in particular the password hash) can reach the client.
    return UserPublic(username=current_user.username, email=current_user.email)
@app.post("/token/refresh", response_model=Token)
async def refresh_token(refresh_token: str):
    """Exchange a valid refresh token for a new access/refresh token pair.

    Responds with HTTP 401 when the token is expired, cannot be decoded, is
    not a refresh token, or carries no ``sub`` claim.
    """
    # NOTE(review): a bare ``str`` parameter is read by FastAPI from the query
    # string, so the token travels in the URL and may end up in access logs.
    # Moving it to the request body would break existing clients, so the
    # interface is left as is -- confirm whether that is acceptable.
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError:
        # Must precede InvalidTokenError, its base class.
        raise HTTPException(status_code=401, detail="Refresh token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    # Only tokens minted by create_refresh_token carry type == "refresh".
    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="Invalid token type")

    username = payload.get("sub")
    if not username:
        raise HTTPException(status_code=401, detail="Invalid token")

    subject = {"sub": username}
    return Token(
        access_token=create_access_token(subject),
        token_type="bearer",
        refresh_token=create_refresh_token(subject),
    )