还在用PyMongo写冗长的字典操作?是时候升级你的MongoDB开发体验了!
开篇:为什么需要MongoDB ORM?
先看一段典型的“PyMongo原始人”代码:
# 传统PyMongo写法 - 字典地狱
defcreate_user(data):
result = db.users.insert_one({
'name': data['name'],
'email': data['email'],
'age': data.get('age', 0),
'created_at': datetime.now()
})
return str(result.inserted_id)
deffind_users_by_age(min_age):
users = db.users.find({'age': {'$gte': min_age}})
return [{
'id': str(user['_id']),
'name': user['name'],
**user
} for user in users]
发现问题了吗?
是时候请出我们的MongoDB ORM天团了!
一、MongoEngine:企业级首选,功能最全
特点:大而全的瑞士军刀
# 安装
pip install mongoengine
# 定义数据模型
from mongoengine import Document, StringField, IntField, DateTimeField, ListField
classUser(Document):
name = StringField(required=True, max_length=50)
email = StringField(required=True, unique=True)
age = IntField(min_value=0, max_value=150, default=0)
tags = ListField(StringField())
created_at = DateTimeField(default=datetime.now)
meta = {
'collection': 'users',
'indexes': [
'email',
{'fields': ['created_at'], 'expireAfterSeconds': 3600} # TTL索引
],
'ordering': ['-created_at']
}
# 自定义方法
defis_adult(self):
return self.age >= 18
defadd_tag(self, tag):
if tag notin self.tags:
self.tags.append(tag)
self.save()
# 使用体验
user = User(name='张三', email='zhangsan@example.com', age=25)
user.save() # 自动验证数据
# 强大的查询API
vip_users = User.objects(tags='vip', age__gte=18)
recent_users = User.objects(created_at__gte=timezone.now() - timedelta(days=7))
# 聚合查询
from mongoengine.queryset.visitor import Q
admins = User.objects(Q(tags='admin') | Q(email__endswith='@company.com'))
适用场景:
二、Motor + PyMongo组合:异步高性能王者
特点:速度与激情的完美结合
# 安装
pip install motor pydantic # 配合Pydantic做数据验证
# 异步数据模型
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from datetime import datetime
classUserCreate(BaseModel):
name: str = Field(..., min_length=2, max_length=50)
email: str = Field(..., regex=r'^[\w\.-]+@[\w\.-]+\.\w+$')
age: Optional[int] = Field(None, ge=0, le=150)
tags: List[str] = []
@validator('name')
defname_must_contain_space(cls, v):
if' 'notin v:
raise ValueError('姓名必须包含空格')
return v
classUserInDB(UserCreate):
id: str = Field(alias='_id')
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
classUserRepository:
def__init__(self, db):
self.collection = db.users
asyncdefcreate_user(self, user_data: UserCreate) -> UserInDB:
"""创建用户 - 异步高性能版本"""
user_dict = user_data.dict()
user_dict['created_at'] = user_dict['updated_at'] = datetime.now()
result = await self.collection.insert_one(user_dict)
# 返回完整用户数据
user_dict['_id'] = str(result.inserted_id)
return UserInDB(**user_dict)
asyncdeffind_users_with_aggregation(self):
"""复杂聚合查询 - 利用MongoDB原生能力"""
pipeline = [
{
'$match': {'age': {'$gte': 18}}
},
{
'$lookup': {
'from': 'orders',
'localField': '_id',
'foreignField': 'user_id',
'as': 'orders'
}
},
{
'$addFields': {
'order_count': {'$size': '$orders'},
'total_spent': {'$sum': '$orders.amount'}
}
},
{
'$sort': {'total_spent': -1}
},
{
'$limit': 10
}
]
asyncfor user in self.collection.aggregate(pipeline):
yield user
asyncdefbulk_update(self, filter_query, update_data):
"""批量更新 - 极致性能"""
result = await self.collection.update_many(
filter_query,
{'$set': update_data}
)
return result.modified_count
# 使用示例
asyncdefmain():
client = AsyncIOMotorClient('mongodb://localhost:27017')
db = client.myapp
repo = UserRepository(db)
# 创建用户
user = await repo.create_user(UserCreate(
name='李 四',
email='lisi@example.com',
age=30
))
# 批量处理
asyncfor top_user in repo.find_users_with_aggregation():
print(f"VIP用户: {top_user['name']}, 消费: {top_user['total_spent']}")
适用场景:
三、Beanie:Pythonic新选择,基于Pydantic
特点:现代、类型安全、开发体验好
# 安装
pip install beanie
from beanie import Document, Indexed, Link
from pydantic import Field
from typing import Optional, List
from datetime import datetime
classUser(Document):
name: str = Field(..., min_length=2)
email: str = Indexed(str, unique=True) # 自动创建索引
age: Optional[int] = Field(None, ge=0, le=150)
tags: List[str] = []
created_at: datetime = Field(default_factory=datetime.now)
# 关联查询(类似关系型数据库)
department: Link["Department"] # 类型提示
classSettings:
name = "users"# 集合名
use_state_management = True# 状态管理
asyncdefbefore_save(self):
"""保存前的钩子"""
self.updated_at = datetime.now()
@property
defdisplay_name(self):
returnf"{self.name} ({self.email})"
classDepartment(Document):
name: str
code: Indexed(str, unique=True)
classSettings:
name = "departments"
# 初始化
await User.init_db()
# 查询语法非常Pythonic
users = await User.find(
User.age >= 18,
User.tags.contains("vip")
).sort(-User.created_at).limit(10).to_list()
# 原生聚合支持
pipeline = [
{"$match": {"age": {"$gte": 25}}},
{"$group": {"_id": "$department", "count": {"$sum": 1}}}
]
result = await User.aggregate(pipeline).to_list()
适用场景:
四、Umongo:轻量灵活,基于Marshmallow
特点:轻量级但功能不简单
# 安装
pip install umongo[pymongo] # 或 umongo[motor]
from umongo import Instance, Document, fields
from pymongo import MongoClient
from datetime import datetime
# 创建实例
client = MongoClient()
db = client.myapp
instance = Instance(db)
@instance.register
classUser(Document):
name = fields.StrField(required=True, validate=lambda v: len(v) >= 2)
email = fields.EmailField(unique=True)
age = fields.IntField(min_value=0, max_value=150)
# 复杂字段类型
preferences = fields.DictField(default={
'theme': 'light',
'notifications': True
})
created_at = fields.DateTimeField(default=datetime.now)
classMeta:
collection_name = "users"
@property
defis_verified(self):
return'verified'in self.tags if hasattr(self, 'tags') elseFalse
@staticmethod
asyncdeffind_by_domain(domain):
"""自定义查询方法"""
return User.find({'email': {'$regex': f'@{domain}$'}})
# 数据序列化/反序列化
user_data = {
'name': '王五',
'email': 'wangwu@example.com',
'age': 28
}
user = User(**user_data) # 自动验证
user.dump() # 序列化为字典
# 数据库操作
await user.commit() # 保存
await user.delete() # 删除
# 批量操作
users = await User.find({'age': {'$gte': 25}}).to_list()
五、ODMantic:异步优先,类型安全
特点:为async/await而生
# 安装
pip install odmantic
from odmantic import Model, Field, ObjectId
from typing import Optional
import asyncio
classUser(Model):
name: str = Field(min_length=2)
email: str = Field(unique=True)
age: Optional[int] = Field(ge=0, le=150, default=None)
classConfig:
collection = "users"
@property
defage_group(self):
if self.age < 18:
return"未成年"
elif self.age < 60:
return"成年"
else:
return"老年"
# 异步引擎
from odmantic import AIOEngine
from motor.motor_asyncio import AsyncIOMotorClient
asyncdefmain():
client = AsyncIOMotorClient("mongodb://localhost:27017")
engine = AIOEngine(client=client, database="myapp")
# CRUD操作
user = User(name="赵六", email="zhaoliu@example.com", age=35)
await engine.save(user)
# 查询
users = await engine.find(User, User.age >= 30)
# 复杂查询
users = await engine.find(
User,
User.age > 25,
sort_by=User.age,
limit=10
)
# 原生聚合
pipeline = [
{"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
]
result = await engine.aggregate(User, pipeline)
📊 横向对比:哪个适合你?
| | | | | |
|---|
| 学习曲线 | | | | | |
| 性能 | | | | | |
| 异步支持 | | | | | |
| 类型提示 | | | | | |
| 验证能力 | | | | | |
| 社区生态 | | | | | |
| 适用规模 | | | | | |
🚀 实战:电商用户系统完整示例
# 综合方案:Beanie + 缓存 + 事件驱动
from beanie import Document, before_event, Replace, Insert
from pydantic import EmailStr, Field
from datetime import datetime
import redis.asyncio as redis
from enum import Enum
classUserRole(str, Enum):
CUSTOMER = "customer"
ADMIN = "admin"
VIP = "vip"
classUser(Document):
username: str = Field(..., min_length=3)
email: EmailStr = Field(unique=True)
password_hash: str
role: UserRole = UserRole.CUSTOMER
points: int = Field(default=0, ge=0)
last_login: datetime = None
metadata: dict = Field(default_factory=dict)
classSettings:
name = "users"
use_cache = True
cache_expiration_time = 300# 5分钟缓存
@before_event(Insert, Replace)
defupdate_timestamps(self):
self.updated_at = datetime.now()
asyncdefadd_points(self, points: int):
"""原子操作:增加积分"""
await self.set({User.points: self.points + points})
@classmethod
asyncdeffind_top_spenders(cls, limit: int = 10):
"""查找消费最高的用户"""
pipeline = [
{"$lookup": {
"from": "orders",
"localField": "_id",
"foreignField": "user_id",
"as": "orders"
}},
{"$unwind": "$orders"},
{"$group": {
"_id": "$_id",
"total_spent": {"$sum": "$orders.amount"},
"name": {"$first": "$username"},
"email": {"$first": "$email"}
}},
{"$sort": {"total_spent": -1}},
{"$limit": limit}
]
returnawait cls.aggregate(pipeline).to_list()
# 带缓存的Repository模式
classCachedUserRepository:
def__init__(self, redis_client):
self.redis = redis_client
self.cache_prefix = "user:"
asyncdefget_user_with_cache(self, user_id: str) -> Optional[User]:
# 先查缓存
cache_key = f"{self.cache_prefix}{user_id}"
cached = await self.redis.get(cache_key)
if cached:
return User.parse_raw(cached)
# 缓存未命中,查数据库
user = await User.get(user_id)
if user:
# 写入缓存
await self.redis.setex(
cache_key,
300, # 5分钟过期
user.json()
)
return user
💡 迁移指南:从PyMongo到ORM
# 你的旧代码
defold_way():
user = db.users.find_one({"email": "test@example.com"})
if user:
user["last_login"] = datetime.now()
db.users.update_one(
{"_id": user["_id"]},
{"$set": {"last_login": user["last_login"]}}
)
# 新方式:选择你喜欢的ORM
# 方案1:MongoEngine
user = User.objects(email="test@example.com").first()
if user:
user.update(set__last_login=datetime.now())
# 方案2:Beanie
user = await User.find_one(User.email == "test@example.com")
if user:
user.last_login = datetime.now()
await user.save()
# 方案3:继续用PyMongo,但更优雅
classUserService:
def__init__(self, db):
self.collection = db.users
asyncdefupdate_last_login(self, email):
await self.collection.update_one(
{"email": email},
{"$set": {"last_login": datetime.now()}},
upsert=False
)
🎯 选择建议
根据团队情况选择:
- 新手团队/快速原型 → MongoEngine(文档最全)
- 性能优先/高并发 → Motor + PyMongo(官方驱动)
- 类型安全/现代语法 → Beanie或ODMantic
- 已有PyMongo代码迁移 → 逐步引入Umongo
根据项目规模选择:
- 小型项目/工具脚本 → PyMongo裸奔也不是不行
- 中型SaaS应用 → Beanie或MongoEngine
- 大型微服务架构 → Motor + 精心设计的Repository模式
结语
没有最好的ORM,只有最适合的ORM。建议:
最后提醒:无论选择哪个ORM,都要理解底层的MongoDB原理。ORM是工具,不是魔法,好的数据库设计和索引策略才是性能的关键!
关注公众号,后台回复“资料领取”或点击“资料领取”菜单即可免费获取“软件测试”、“Python开发”相关资料~