Litestar是一个功能强大、性能优异的现代Python ASGI框架,专注于构建API。它充分利用了Python的类型提示系统,提供自动数据验证、序列化/反序列化以及OpenAPI文档生成。下面通过大量代码来展示它的核心特性。
安装和Hello World
首先通过pip安装Litestar。如果加上standard选项,会同时安装uvicorn服务器和CLI工具:
pip install 'litestar[standard]'
创建一个最简单的应用,保存为app.py:
from litestar import Litestar, get@get("/")async def hello_world() -> dict[str, str]: """传统的Hello World示例""" return {"hello": "world"}app = Litestar(route_handlers=[hello_world])
然后在终端运行:
litestar run# 或带自动重载的开发模式litestar run --reload
访问 http://127.0.0.1:8000/ 就能看到返回的JSON数据。
基于类的控制器
虽然Litestar支持函数式路由,但它也推荐使用类来组织相关的路由,让代码结构更清晰:
from typing import List, Optionalfrom datetime import datetimefrom uuid import UUIDfrom litestar import Controller, get, post, put, patch, deletefrom pydantic import BaseModel# 定义数据模型class User(BaseModel): id: UUID name: str email: str created_at: datetimeclass UserController(Controller): path = "/users" # 所有路由都以 /users 开头 @post() async def create_user(self, data: User) -> User: """创建新用户""" # 在实际应用中,这里会保存到数据库 return data @get() async def list_users(self) -> List[User]: """获取所有用户列表""" return [] @get(path="/{user_id:uuid}") async def get_user(self, user_id: UUID) -> Optional[User]: """根据ID获取单个用户""" # user_id 会自动从URL路径中解析并验证 return None @get(path="/search/{name:str}") async def search_users_by_name(self, name: str) -> List[User]: """根据名字搜索用户""" return [] @put(path="/{user_id:uuid}") async def update_user(self, user_id: UUID, data: User) -> User: """完整更新用户信息""" return data @patch(path="/{user_id:uuid}") async def partial_update_user(self, user_id: UUID, data: dict) -> User: """部分更新用户信息""" return User(**data) @delete(path="/{user_id:uuid}") async def delete_user(self, user_id: UUID) -> None: """删除用户""" return None# 注册控制器app = Litestar(route_handlers=[UserController])
通过类型注解,Litestar能自动完成参数验证、序列化和OpenAPI文档生成。路径参数如{user_id:uuid}会自动进行类型转换和验证。
依赖注入系统
Litestar的依赖注入系统受pytest启发,非常灵活。你可以定义同步或异步的依赖,并在应用的不同层级复用:
from litestar import Litestar, getfrom litestar.di import Providefrom litestar.datastructures import Stateimport random# 定义依赖函数async def get_user_id() -> str: """模拟从请求头或token中获取用户ID""" return f"user_{random.randint(1000, 9999)}"async def get_database_connection(state: State) -> dict: """获取数据库连接(从应用状态中获取配置)""" # 模拟数据库连接 return {"connection": "active", "config": getattr(state, "db_config", "default")}async def get_current_user( user_id: str, # 这个参数会从上一个依赖注入 db: dict # 这个也是) -> dict: """获取当前用户信息""" return { "id": user_id, "name": f"User_{user_id}", "db_status": db.get("connection") }# 使用依赖的路由@get("/me")async def get_me( current_user: dict, # 这个参数会被注入 db: dict # 这个也会) -> dict: """获取当前用户信息""" return { "user": current_user, "database": db }@get("/health")async def health_check(db: dict) -> dict: """健康检查端点""" return {"status": "ok", "database": db["connection"]}# 注册依赖app = Litestar( route_handlers=[get_me, health_check], dependencies={ "user_id": Provide(get_user_id), "db": Provide(get_database_connection), "current_user": Provide(get_current_user) }, state={"db_config": "postgresql://localhost/mydb"} # 应用状态)
依赖可以相互注入,Litestar会自动解析依赖图。
路由守卫(授权机制)
Guards是Litestar的授权机制,可以在请求到达路由处理器之前进行验证:
from litestar import Litestar, getfrom litestar.connection import ASGIConnectionfrom litestar.handlers.base import BaseRouteHandlerfrom litestar.exceptions import NotAuthorizedExceptionfrom litestar.di import Provide# 定义守卫函数async def require_auth(connection: ASGIConnection, handler: BaseRouteHandler) -> None: """验证用户是否已认证""" auth_header = connection.headers.get("Authorization") if not auth_header or not auth_header.startswith("Bearer "): raise NotAuthorizedException("Missing or invalid authorization token") # 在实际应用中,这里会验证token并可能将用户信息存入connection token = auth_header.split(" ")[1] if token != "secret-token": raise NotAuthorizedException("Invalid token")async def require_admin(connection: ASGIConnection, handler: BaseRouteHandler) -> None: """验证用户是否有管理员权限""" # 假设从某处获取用户角色 user_role = connection.headers.get("X-User-Role") if user_role != "admin": raise NotAuthorizedException("Admin privileges required")# 应用守卫@get("/public")async def public_endpoint() -> dict: """公开端点,不需要认证""" return {"message": "This is public"}@get("/protected", guards=[require_auth])async def protected_endpoint() -> dict: """需要认证的端点""" return {"message": "You are authenticated"}@get("/admin", guards=[require_auth, require_admin])async def admin_endpoint() -> dict: """需要认证且是管理员""" return {"message": "Welcome admin"}app = Litestar(route_handlers=[public_endpoint, protected_endpoint, admin_endpoint])
守卫可以应用于单个路由、整个控制器或整个应用,并且支持多个守卫组合。
WebSocket和实时通信
Litestar对WebSocket有良好的支持,可以轻松实现实时通信功能。下面的例子展示了一个简单的聊天室:
from contextlib import asynccontextmanagerfrom typing import AsyncContextManagerfrom litestar import Litestar, WebSocketfrom litestar.channels import ChannelsPluginfrom litestar.channels.backends.memory import MemoryChannelsBackendfrom litestar.exceptions import WebSocketDisconnectfrom litestar.handlers import websocket_listener@asynccontextmanagerasync def chat_room_lifespan( socket: WebSocket, channels: ChannelsPlugin) -> AsyncContextManager[None]: """管理WebSocket连接的生命周期""" # 订阅聊天频道,历史消息保留10条 async with channels.start_subscription("chat", history=10) as subscriber: try: # 在后台运行消息发送任务 async with subscriber.run_in_background(socket.send_data): yield except WebSocketDisconnect: # 客户端断开连接时正常退出 return@websocket_listener("/ws/chat", connection_lifespan=chat_room_lifespan)async def chat_handler(data: str, channels: ChannelsPlugin) -> None: """处理收到的聊天消息并广播给所有订阅者""" # 将消息发布到chat频道 channels.publish(data, channels=["chat"])# 创建应用并注册channels插件app = Litestar( route_handlers=[chat_handler], plugins=[ ChannelsPlugin( channels=["chat"], backend=MemoryChannelsBackend(history=10) ) ])
配合前端HTML代码,只需要几十行代码就能实现一个完整的聊天室应用,支持消息广播和历史消息重放。
应用生命周期管理
Litestar提供了多种方式管理应用的启动和关闭,这对于处理数据库连接、缓存客户端等资源非常有用:
from contextlib import asynccontextmanagerfrom typing import AsyncGeneratorfrom litestar import Litestar, getfrom litestar.datastructures import Stateimport asyncpg# 方式1:使用 on_startup 和 on_shutdown 钩子async def setup_database(app: Litestar) -> None: """在应用启动时建立数据库连接""" # 创建数据库连接池 pool = await asyncpg.create_pool( "postgresql://user:pass@localhost/db", min_size=1, max_size=10 ) app.state.db_pool = pool print("Database connected")async def cleanup_database(app: Litestar) -> None: """在应用关闭时清理数据库连接""" if hasattr(app.state, "db_pool"): await app.state.db_pool.close() print("Database disconnected")# 方式2:使用 lifespan 上下文管理器(推荐)@asynccontextmanagerasync def redis_connection(app: Litestar) -> AsyncGenerator[None, None]: """使用上下文管理器管理Redis连接""" import redis.asyncio as redis # 建立连接 client = redis.from_url("redis://localhost", decode_responses=True) app.state.redis = client print("Redis connected") try: yield # 应用运行期间 finally: # 关闭连接 await client.close() print("Redis disconnected")@get("/users/count")async def get_users_count(state: State) -> dict: """使用数据库连接的示例端点""" if hasattr(state, "db_pool"): async with state.db_pool.acquire() as conn: count = await conn.fetchval("SELECT COUNT(*) FROM users") return {"count": count} return {"error": "Database not available"}@get("/cache/test")async def cache_test(state: State) -> dict: """使用Redis缓存的示例端点""" if hasattr(state, "redis"): await state.redis.set("test_key", "Hello Redis") value = await state.redis.get("test_key") return {"cached": value} return {"error": "Redis not available"}# 可以同时使用多种生命周期管理方式app = Litestar( route_handlers=[get_users_count, cache_test], on_startup=[setup_database], # 启动时执行 on_shutdown=[cleanup_database], # 关闭时执行 lifespan=[redis_connection] # 使用上下文管理器)
当同时使用多个lifespan上下文管理器时,它们会按照声明的顺序进入,逆序退出。
应用状态管理
Litestar的State对象提供了一种在应用各组件之间共享数据的机制:
from litestar import Litestar, getfrom litestar.datastructures import State, ImmutableStatefrom litestar.di import Provide# 使用自定义状态类class AppState: def __init__(self): self.startup_time = None self.request_count = 0 self.config = {} def increment_count(self): self.request_count += 1@get("/stats")async def get_stats(state: State) -> dict: """获取应用统计信息""" if hasattr(state, "app_state"): app_state = state.app_state return { "request_count": app_state.request_count, "startup_time": app_state.startup_time, "config": app_state.config } return {"error": "State not initialized"}@get("/increment")async def increment(state: State) -> dict: """增加请求计数""" if hasattr(state, "app_state"): state.app_state.increment_count() return {"count": state.app_state.request_count} return {"error": "State not initialized"}# 初始化应用状态initial_state = AppState()initial_state.startup_time = "2024-01-01T00:00:00"initial_state.config = {"debug": True, "version": "1.0.0"}app = Litestar( route_handlers=[get_stats, increment], state=State({"app_state": initial_state}) # 使用State包装自定义对象)# 如果想使用不可变状态,防止意外修改# from litestar.datastructures import ImmutableState# app = Litestar(route_handlers=[...], state=ImmutableState({"key": "value"}))
状态对象可以通过依赖注入或直接在路由处理器中访问。
中间件
Litestar内置了多种常用中间件,也可以自定义中间件:
from litestar import Litestar, getfrom litestar.middleware.cors import CORSMiddlewarefrom litestar.middleware.rate_limit import RateLimitMiddlewarefrom litestar.middleware.session import SessionMiddleware, ServerSideSessionConfigfrom litestar.middleware.base import AbstractMiddlewarefrom litestar.types import ASGIApp, Receive, Scope, Sendimport time# 自定义中间件:记录请求处理时间class TimingMiddleware(AbstractMiddleware): async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: if scope["type"] == "http": start_time = time.time() # 创建一个包装的send函数来捕获响应时间 async def timing_send(message): if message["type"] == "http.response.start": elapsed = time.time() - start_time # 可以在这里记录到日志 print(f"Request to {scope['path']} took {elapsed:.3f}s") await send(message) await self.app(scope, receive, timing_send) else: await self.app(scope, receive, send)@get("/")async def index() -> dict: return {"message": "Hello"}@get("/slow")async def slow() -> dict: """模拟慢请求""" import asyncio await asyncio.sleep(2) return {"message": "Done"}app = Litestar( route_handlers=[index, slow], middleware=[ TimingMiddleware, # 自定义中间件 CORSMiddleware, # 跨域资源共享 RateLimitMiddleware, # 速率限制 ], cors_config={ "allow_origins": ["http://localhost:3000"], "allow_methods": ["GET", "POST", "PUT", "DELETE"], "allow_headers": ["Content-Type", "Authorization"] }, rate_limit_config={ "rate_limit": ("minute", 100), # 每分钟最多100个请求 "exclude": ["/health"] # 排除健康检查端点 })
内置中间件包括CORS、CSRF、速率限制、GZip压缩、会话管理等。
小结
Litestar是一个功能完善的ASGI框架,核心优势包括:
强类型驱动:充分利用Python类型提示,提供自动验证和OpenAPI文档
高性能:基于msgspec进行超快序列化,性能优异
依赖注入:灵活强大的DI系统,便于测试和代码组织
完整的WebSocket支持:内置channels模块,支持消息广播和持久化
生产就绪:提供CORS、速率限制、会话管理等生产环境所需的功能
可扩展:插件系统支持SQLAlchemy等ORM的无缝集成
适合构建从简单API到复杂全栈应用的各类项目。建议从官方文档和示例项目(如litestar-fullstack)开始学习实践。