1from fastapi import FastAPI, Depends, HTTPException, status2from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm3from pydantic import BaseModel, EmailStr4from datetime import datetime, timedelta5from typing import Optional6from passlib.context import CryptContext7import jwt89app = FastAPI()1011# 配置12SECRET_KEY = "your-secret-key"13ALGORITHM = "HS256"14ACCESS_TOKEN_EXPIRE_MINUTES = 301516# 密码加密17pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")18oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")1920class UserCreate(BaseModel):21 username: str22 email: EmailStr23 password: str2425class Token(BaseModel):26 access_token: str27 token_type: str2829class User(BaseModel):30 username: str31 email: EmailStr32 disabled: Optional[bool] = None3334# 模拟数据库35users_db = {}3637def verify_password(plain_password, hashed_password):38 return pwd_context.verify(plain_password, hashed_password)3940def get_password_hash(password):41 return pwd_context.hash(password)4243def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):44 to_encode = data.copy()45 expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))46 to_encode.update({"exp": expire})47 return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)4849async def get_current_user(token: str = Depends(oauth2_scheme)):50 credentials_exception = HTTPException(51 status_code=status.HTTP_401_UNAUTHORIZED,52 detail="Could not validate credentials",53 headers={"WWW-Authenticate": "Bearer"},54 )55 try:56 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])57 username: str = payload.get("sub")58 if username is None:59 raise credentials_exception60 except:61 raise credentials_exception62 user = users_db.get(username)63 if user is None:64 raise credentials_exception65 return user6667@app.post("/register", response_model=User)68async def register(user: UserCreate):69 if user.username in users_db:70 raise HTTPException(status_code=400, detail="Username already registered")7172 hashed_password = get_password_hash(user.password)73 user_data = {74 "username": user.username,75 "email": user.email,76 "hashed_password": hashed_password,77 "disabled": False78 }79 users_db[user.username] = user_data80 return {"username": user.username, "email": user.email}8182@app.post("/token", response_model=Token)83async def login(form_data: OAuth2PasswordRequestForm = Depends()):84 user = users_db.get(form_data.username)85 if not user or not verify_password(form_data.password, user["hashed_password"]):86 raise HTTPException(87 status_code=status.HTTP_401_UNAUTHORIZED,88 detail="Incorrect username or password",89 headers={"WWW-Authenticate": "Bearer"},90 )9192 access_token = create_access_token(93 data={"sub": user["username"]},94 expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)95 )96 return {"access_token": access_token, "token_type": "bearer"}9798@app.get("/users/me", response_model=User)99async def read_users_me(current_user: User = Depends(get_current_user)):100 return current_user |