

你是否曾回头审视自己半年前写的Python项目,然后心想:这堆垃圾是谁写的?哦等等…好像是我自己。
别担心,每个真正的开发者都有这样的时刻。问题不在于你的编程技巧,而在于你的架构。而架构,正是90%技术债务的诞生地。
经过四年多构建从“可爱小脚本”到“为什么这东西要吃12GB内存?”的系统后,我学到了一个痛苦的教训:语法拯救不了项目,模式才能。
今天,云朵君要分享给你10个Python模式,在构建任何像样的“大项目”之前,你应该把它们刻在脑子里。
这些不是你在每个YouTube教程里看到的基础模式。这些是我多么希望有人在我写下4万行意大利面条式代码之前,就甩在我脸上的模式。
想象一下这个场景:你的项目从简单的脚本成长为有多个模块、需要团队协作的完整应用。突然之间:
这就是缺乏设计模式的代价。而下面的10个模式,正是解决这些问题的钥匙。
大多数开发者不用依赖注入,因为他们觉得“Python是动态语言嘛”。确实…直到你尝试测试一个自己实例化数据库连接的类。
为什么重要?
# 错误的做法:硬编码依赖classBadUserManager:def__init__(self): self.email_service = EmailService() # 硬编码!defregister(self, user):# 业务逻辑 self.email_service.send(user, "Welcome!")# 正确的做法:依赖注入classEmailService:defsend(self, to, message): print(f"发送邮件给 {to}: {message}")classUserManager:def__init__(self, email_service):# 依赖作为参数传入 self.email_service = email_servicedefregister(self, user):# 业务逻辑 self.email_service.send(user, "欢迎!")# 使用email_service = EmailService()manager = UserManager(email_service) # 注入依赖manager.register("test@example.com")# 测试时可以轻松模拟classMockEmailService:defsend(self, to, message): print(f"[测试] 模拟发送给 {to}")test_manager = UserManager(MockEmailService())test_manager.register("test@example.com")实战技巧:在FastAPI或Django中,依赖注入常用于数据库会话、配置等。试试看,你的测试代码会变得多么干净!
如果你的代码有一个巨大的决策树,恭喜你——你创造了一个怪物。策略模式用清晰、可互换的行为替换了这团乱麻。
# 常见反模式:if/else地狱defprocess_payment(amount, payment_method):if payment_method == "paypal":# 处理PayPal支付passelif payment_method == "stripe":# 处理Stripe支付passelif payment_method == "alipay":# 处理支付宝支付pass# ... 更多elif# 优雅的解决方案:策略模式from abc import ABC, abstractmethodclassPaymentStrategy(ABC):"""支付策略抽象基类""" @abstractmethoddefpay(self, amount):passclassPayPalPayment(PaymentStrategy):defpay(self, amount): print(f"通过PayPal支付 {amount} 元")# 实际的PayPal API调用returnTrueclassStripePayment(PaymentStrategy):defpay(self, amount): print(f"通过Stripe支付 {amount} 元")# 实际的Stripe API调用returnTrueclassAlipayPayment(PaymentStrategy):defpay(self, amount): print(f"通过支付宝支付 {amount} 元")# 实际的支付宝API调用returnTrueclassPaymentProcessor:def__init__(self): self.strategies = {"paypal": PayPalPayment(),"stripe": StripePayment(),"alipay": AlipayPayment() }defprocess(self, method, amount): strategy = self.strategies.get(method)ifnot strategy:raise ValueError(f"不支持的支付方式: {method}")return strategy.pay(amount)# 使用processor = PaymentProcessor()processor.process("paypal", 100)processor.process("alipay", 200)# 添加新支付方式非常容易classWeChatPayment(PaymentStrategy):defpay(self, amount): print(f"通过微信支付 {amount} 元")returnTrueprocessor.strategies["wechat"] = WeChatPayment()设计优势:新增支付方式只需添加新类,无需修改现有代码——这符合开闭原则。
曾经有一个类有九个可选参数吗?是的。那不是构造函数——那是人质劫持现场。
# 问题:太多参数的构造函数classUser:def__init__(self, name, email=None, phone=None, age=None, address=None, city=None, country=None, is_active=True, is_admin=False):# 初始化所有属性...pass# 使用起来很痛苦user = User( name="张三", email="zhangsan@example.com", phone="13800138000", age=25, address="某街道123号", city="北京", country="中国", is_active=True, is_admin=False)# 解决方案:建造者模式classQueryBuilder:def__init__(self, table="users"): self.table = table self._select_fields = ["*"] self._where_conditions = [] self._order_by_fields = [] self._limit_value = Nonedefselect(self, *fields): self._select_fields = fieldsreturn selfdefwhere(self, condition): self._where_conditions.append(condition)return selfdeforder_by(self, field, descending=False): direction = "DESC"if descending else"ASC" self._order_by_fields.append(f"{field}{direction}")return selfdeflimit(self, count): self._limit_value = countreturn selfdefbuild(self):# 构建SELECT部分 select_clause = f"SELECT {', '.join(self._select_fields)} FROM {self.table}"# 构建WHERE部分 where_clause = ""if self._where_conditions: where_clause = " WHERE " + " AND ".join(self._where_conditions)# 构建ORDER BY部分 order_clause = ""if self._order_by_fields: order_clause = " ORDER BY " + ", ".join(self._order_by_fields)# 构建LIMIT部分 limit_clause = ""if self._limit_value: limit_clause = f" LIMIT {self._limit_value}"return select_clause + where_clause + order_clause + limit_clause# 流畅的API调用query = ( QueryBuilder("users") .select("id", "name", "email") .where("age > 18") .where("is_active = TRUE") .order_by("created_at", descending=True) .limit(10) .build())print(query)# 输出: SELECT id, name, email FROM users WHERE age > 18 AND is_active = TRUE ORDER BY created_at DESC LIMIT 10现代应用:SQLAlchemy的查询接口、Django的Q对象、Pydantic的模型配置都使用了类似的建造者模式。
大型系统崩溃往往因为一切依赖于一切。解决方案是:事件,而不是链式调用。
classEventBus:"""事件总线 - 组件通信的中枢"""def__init__(self): self._subscribers = {}defsubscribe(self, event_type, callback):"""订阅事件"""if event_type notin self._subscribers: self._subscribers[event_type] = []if callback notin self._subscribers[event_type]: self._subscribers[event_type].append(callback)defunsubscribe(self, event_type, callback):"""取消订阅"""if event_type in self._subscribers: self._subscribers[event_type].remove(callback)defpublish(self, event_type, data=None):"""发布事件"""if event_type in self._subscribers:for callback in self._subscribers[event_type]:try: callback(data)except Exception as e: print(f"事件处理出错: {e}")defget_subscriber_count(self, event_type):"""获取订阅者数量"""return len(self._subscribers.get(event_type, []))# 定义事件类型USER_REGISTERED = "user_registered"ORDER_CREATED = "order_created"PAYMENT_SUCCESS = "payment_success"# 创建事件总线实例bus = EventBus()# 事件处理器defsend_welcome_email(user_data): print(f"[邮件服务] 发送欢迎邮件给 {user_data.get('email')}")definit_user_storage(user_data): print(f"[存储服务] 为用户 {user_data.get('username')} 初始化存储空间")deflog_user_registration(user_data): print(f"[日志服务] 用户注册: {user_data}")# 订阅事件bus.subscribe(USER_REGISTERED, send_welcome_email)bus.subscribe(USER_REGISTERED, init_user_storage)bus.subscribe(USER_REGISTERED, log_user_registration)# 发布事件new_user = {"id": 123,"username": "zhangsan","email": "zhangsan@example.com","registered_at": "2024-01-01 10:00:00"}print("用户注册事件触发...")bus.publish(USER_REGISTERED, new_user)print(f"事件处理完成,共有{bus.get_subscriber_count(USER_REGISTERED)}个订阅者")# 输出:# 用户注册事件触发...# [邮件服务] 发送欢迎邮件给 zhangsan@example.com# [存储服务] 为用户 zhangsan 初始化存储空间# [日志服务] 用户注册: {'id': 123, 'username': 'zhangsan', ...}# 事件处理完成,共有3个订阅者应用场景:微服务通信、插件系统、GUI应用的事件处理。这种模式比治疗更能解耦有毒的关系。
如果SQL查询生活在你的业务逻辑中,那么你正在邀请混乱。
import sqlite3from contextlib import contextmanagerfrom typing import List, Optional, Dict, Any@contextmanagerdefget_db_connection():"""数据库连接上下文管理器""" conn = sqlite3.connect(":memory:") # 使用内存数据库示例 conn.row_factory = sqlite3.Row # 返回字典-like的行try:yield connfinally: conn.close()classUserRepository:"""用户数据仓储"""def__init__(self, db_connection): self.db = db_connection self._create_table()def_create_table(self):"""创建用户表""" self.db.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, email TEXT NOT NULL UNIQUE, age INTEGER, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) self.db.commit()defget_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:"""根据ID获取用户""" cursor = self.db.execute("SELECT * FROM users WHERE id = ?", (user_id,) ) row = cursor.fetchone()return dict(row) if row elseNonedefget_by_email(self, email: str) -> Optional[Dict[str, Any]]:"""根据邮箱获取用户""" cursor = self.db.execute("SELECT * FROM users WHERE email = ?", (email,) ) row = cursor.fetchone()return dict(row) if row elseNonedefget_all(self, limit: int = 100) -> List[Dict[str, Any]]:"""获取所有用户""" cursor = self.db.execute("SELECT * FROM users LIMIT ?", (limit,) )return [dict(row) for row in cursor.fetchall()]defadd(self, username: str, email: str, age: int = None) -> int:"""添加用户""" cursor = self.db.execute("INSERT INTO users (username, email, age) VALUES (?, ?, ?)", (username, email, age) ) self.db.commit()return cursor.lastrowiddefupdate(self, user_id: int, **kwargs) -> bool:"""更新用户信息"""ifnot kwargs:returnFalse set_clause = ", ".join([f"{k} = ?"for k in kwargs.keys()]) values = list(kwargs.values()) values.append(user_id) self.db.execute(f"UPDATE users SET {set_clause} WHERE id = ?", values ) self.db.commit()returnTruedefdelete(self, user_id: int) -> bool:"""删除用户""" cursor = self.db.execute("DELETE FROM users WHERE id = ?", (user_id,) ) self.db.commit()return cursor.rowcount > 0# 使用示例with get_db_connection() as conn: repo = UserRepository(conn)# 添加用户 user_id = repo.add("zhangsan", "zhangsan@example.com", 25) print(f"添加用户成功,ID: {user_id}")# 查询用户 user = repo.get_by_id(user_id) print(f"查询用户: {user}")# 更新用户 repo.update(user_id, age=26, email="new_email@example.com")# 再次查询 updated_user = repo.get_by_id(user_id) print(f"更新后用户: {updated_user}")# 获取所有用户 all_users = repo.get_all() print(f"总用户数: {len(all_users)}")架构优势:更换数据库?只需更新一个文件。这就是架构的力量。
将数据结构映射到Python对象可以给你一致性,并消除"字典末日"问题。
from datetime import datetimefrom typing import OptionalclassUser:"""领域模型:用户实体"""def__init__(self, id: int, username: str, email: str, age: Optional[int] = None, created_at: Optional[datetime] = None): self.id = id self.username = username self.email = email self.age = age self.created_at = created_at or datetime.now()def__repr__(self):returnf"User(id={self.id}, username='{self.username}', email='{self.email}')"defis_adult(self) -> bool:return self.age isnotNoneand self.age >= 18defget_display_name(self) -> str:returnf"{self.username} ({self.email})"classUserMapper:"""用户映射器:在领域模型和数据模型之间转换""" @staticmethoddeffrom_dict(data: dict) -> User:"""从字典创建User对象"""return User( id=data.get("id"), username=data["username"], email=data["email"], age=data.get("age"), created_at=datetime.fromisoformat(data["created_at"]) if"created_at"in data elseNone ) @staticmethoddeffrom_database_row(row) -> User:"""从数据库行创建User对象"""return User( id=row["id"], username=row["username"], email=row["email"], age=row["age"], created_at=datetime.fromisoformat(row["created_at"]) if row["created_at"] elseNone ) @staticmethoddefto_dict(user: User) -> dict:"""将User对象转为字典"""return {"id": user.id,"username": user.username,"email": user.email,"age": user.age,"created_at": user.created_at.isoformat() if user.created_at elseNone,"is_adult": user.is_adult(),"display_name": user.get_display_name() } @staticmethoddefto_database_params(user: User) -> tuple:"""将User对象转为数据库参数"""return ( user.username, user.email, user.age, user.created_at.isoformat() if user.created_at elseNone )# 示例:从API JSON到领域对象api_response = {"id": 123,"username": "zhangsan","email": "zhangsan@example.com","age": 25,"created_at": "2024-01-01T10:00:00"}# 映射到领域对象user = UserMapper.from_dict(api_response)print(f"领域对象: {user}")print(f"是否成人: {user.is_adult()}")print(f"显示名称: {user.get_display_name()}")# 映射回字典user_dict = UserMapper.to_dict(user)print(f"回传字典: {user_dict}")# 示例:从数据库到领域对象classMockDBRow:"""模拟数据库行"""def__init__(self): self.data = {"id": 456,"username": "lisi","email": "lisi@example.com","age": 17,"created_at": "2024-01-02T09:00:00" }def__getitem__(self, key):return self.data[key]db_row = MockDBRow()user_from_db = UserMapper.from_database_row(db_row)print(f"\n从数据库创建的用户: {user_from_db}")print(f"是否成人: {user_from_db.is_adult()}")API给你JSON,你的代码值得拥有结构。
完美适用于机器学习预处理、ETL,或任何你的逻辑有多个阶段的情况。
from abc import ABC, abstractmethodfrom typing import Any, ListclassPipelineStep(ABC):"""管道步骤抽象基类""" @abstractmethoddefprocess(self, data: Any) -> Any:passdef__call__(self, data: Any) -> Any:return self.process(data)classTextCleanStep(PipelineStep):"""文本清洗步骤"""defprocess(self, text: str) -> str:# 移除多余空格 text = ' '.join(text.split())# 转换为小写 text = text.lower()# 移除特殊字符(简单示例) text = ''.join(c for c in text if c.isalnum() or c.isspace())return textclassTokenizeStep(PipelineStep):"""分词步骤"""def__init__(self, tokenizer=None): self.tokenizer = tokenizer or str.splitdefprocess(self, text: str) -> List[str]:return self.tokenizer(text)classStopwordRemovalStep(PipelineStep):"""停用词移除步骤"""def__init__(self, stopwords=None): self.stopwords = stopwords or {"a", "an", "the", "and", "or", "but", "in", "on", "at"}defprocess(self, tokens: List[str]) -> List[str]:return [token for token in tokens if token notin self.stopwords]classStemmingStep(PipelineStep):"""词干提取步骤(简化版)"""defprocess(self, tokens: List[str]) -> List[str]:# 简化版的词干提取 stemmed = []for token in tokens:if token.endswith("ing"): token = token[:-3]elif token.endswith("ed"): token = token[:-2]elif token.endswith("s"): token = token[:-1] stemmed.append(token)return stemmedclassPipeline:"""管道:连接多个处理步骤"""def__init__(self, steps: List[PipelineStep] = None): self.steps = steps or []defadd_step(self, step: PipelineStep):"""添加处理步骤""" self.steps.append(step)return self # 支持链式调用defprocess(self, data: Any) -> Any:"""处理数据""" result = datafor step in self.steps: print(f"执行步骤: {step.__class__.__name__}") result = step.process(result) print(f"当前结果: {result}")return resultdef__call__(self, data: Any) -> Any:return self.process(data)# 构建文本处理管道text_pipeline = Pipeline()text_pipeline.add_step(TextCleanStep())text_pipeline.add_step(TokenizeStep())text_pipeline.add_step(StopwordRemovalStep())text_pipeline.add_step(StemmingStep())# 处理文本sample_text = " I am RUNNING in the park and playing with dogs! "print("原始文本:", sample_text)print("\n开始管道处理...")final_result = text_pipeline.process(sample_text)print(f"\n最终结果: {final_result}")# 输出:# 原始文本: I am RUNNING in the park and playing with dogs! # 开始管道处理...# 执行步骤: TextCleanStep# 当前结果: i am running in the park and playing with dogs# 执行步骤: TokenizeStep# 当前结果: ['i', 'am', 'running', 'in', 'the', 'park', 'and', 'playing', 'with', 'dogs']# 执行步骤: StopwordRemovalStep# 当前结果: ['running', 'park', 'playing', 'dogs']# 执行步骤: StemmingStep# 当前结果: ['runn', 'park', 'play', 'dog']# 最终结果: ['runn', 'park', 'play', 'dog']# 扩展:可以轻松添加新步骤classSpellCheckStep(PipelineStep):"""拼写检查步骤(示例)"""defprocess(self, tokens: List[str]) -> List[str]:# 这里可以实现实际的拼写检查逻辑 corrected = []for token in tokens:# 简单示例:修正常见拼写错误if token == "runn": token = "run" corrected.append(token)return corrected# 添加新步骤到现有管道text_pipeline.add_step(SpellCheckStep())new_result = text_pipeline.process("I am runn in the parc")print(f"\n包含拼写检查的结果: {new_result}")管道保持你的大脑——和你的项目——整洁有序。
如果你的项目需要可逆操作,这是你的救星。
from abc import ABC, abstractmethodfrom typing import List, Dict, AnyclassCommand(ABC):"""命令抽象基类""" @abstractmethoddefexecute(self) -> Any:"""执行命令"""pass @abstractmethoddefundo(self) -> Any:"""撤销命令"""passdef__str__(self):return self.__class__.__name__classTextEditor:"""文本编辑器(接收者)"""def__init__(self): self.text = "" self.history: List[str] = []defadd_text(self, new_text: str):"""添加文本""" self.history.append(self.text) self.text += new_text print(f"添加文本: '{new_text}'") print(f"当前文本: '{self.text}'")defdelete_last_word(self):"""删除最后一个单词"""if self.text: self.history.append(self.text) words = self.text.split()if words: last_word = words[-1] self.text = ' '.join(words[:-1]) print(f"删除最后一个单词: '{last_word}'") print(f"当前文本: '{self.text}'")return last_wordreturn""defclear_text(self):"""清空文本""" self.history.append(self.text) self.text = "" print("清空文本")defrestore_from_history(self):"""从历史恢复"""if self.history: self.text = self.history.pop() print(f"恢复到: '{self.text}'")classAddTextCommand(Command):"""添加文本命令"""def__init__(self, editor: TextEditor, text: str): self.editor = editor self.text = text self.previous_text = ""defexecute(self) -> None: self.previous_text = self.editor.text self.editor.add_text(self.text)defundo(self) -> None: self.editor.text = self.previous_text print(f"撤销添加文本,恢复为: '{self.editor.text}'")classDeleteLastWordCommand(Command):"""删除最后一个单词命令"""def__init__(self, editor: TextEditor): self.editor = editor self.deleted_word = "" self.previous_text = ""defexecute(self) -> None: self.previous_text = self.editor.text self.deleted_word = self.editor.delete_last_word()defundo(self) -> None: self.editor.text = self.previous_text print(f"撤销删除,恢复为: '{self.editor.text}'")classClearTextCommand(Command):"""清空文本命令"""def__init__(self, editor: TextEditor): self.editor = editor self.previous_text = ""defexecute(self) -> None: self.previous_text = self.editor.text self.editor.clear_text()defundo(self) -> None: self.editor.text = self.previous_text print(f"撤销清空,恢复为: '{self.editor.text}'")classCommandHistory:"""命令历史管理器"""def__init__(self): self.history: List[Command] = [] self.undo_history: List[Command] = []defexecute_command(self, command: Command):"""执行命令并记录""" command.execute() self.history.append(command) self.undo_history.clear() # 新的执行清除重做历史 print(f"执行命令: {command}")defundo(self):"""撤销上一个命令"""if self.history: command = self.history.pop() command.undo() self.undo_history.append(command) print(f"撤销命令: {command}")else: print("没有可撤销的命令")defredo(self):"""重做上一个撤销的命令"""if self.undo_history: command = self.undo_history.pop() command.execute() self.history.append(command) print(f"重做命令: {command}")else: print("没有可重做的命令")defget_history(self):"""获取命令历史"""return [str(cmd) for cmd in self.history]# 使用示例editor = TextEditor()history = CommandHistory()print("=== 文本编辑器演示 ===\n")# 执行一系列命令add_hello = AddTextCommand(editor, "Hello ")history.execute_command(add_hello)add_world = AddTextCommand(editor, "World ")history.execute_command(add_world)add_python = AddTextCommand(editor, "Python ")history.execute_command(add_python)print(f"\n当前文本: '{editor.text}'")print(f"命令历史: {history.get_history()}")# 撤销操作print("\n--- 执行撤销 ---")history.undo()print(f"撤销后文本: '{editor.text}'")history.undo()print(f"再撤销后文本: '{editor.text}'")# 重做操作print("\n--- 执行重做 ---")history.redo()print(f"重做后文本: '{editor.text}'")# 执行删除命令print("\n--- 执行删除 ---")delete_cmd = DeleteLastWordCommand(editor)history.execute_command(delete_cmd)# 执行清空命令print("\n--- 执行清空 ---")clear_cmd = ClearTextCommand(editor)history.execute_command(clear_cmd)# 多次撤销print("\n--- 多次撤销 ---")for _ in range(3): history.undo()print(f"\n最终文本: '{editor.text}'")print(f"最终历史: {history.get_history()}")UI应用、CLI工具、编辑器——这种模式无处不在。
当过滤器变得复杂时,规范模式保持其优雅。
from abc import ABC, abstractmethodfrom typing import List, Dict, Anyfrom datetime import datetime, timedeltaclassSpecification(ABC):"""规范抽象基类""" @abstractmethoddefis_satisfied(self, item: Dict[str, Any]) -> bool:"""检查项是否满足规范"""passdef__and__(self, other: 'Specification') -> 'AndSpecification':"""与操作"""return AndSpecification(self, other)def__or__(self, other: 'Specification') -> 'OrSpecification':"""或操作"""return OrSpecification(self, other)def__invert__(self) -> 'NotSpecification':"""非操作"""return NotSpecification(self)def__call__(self, item: Dict[str, Any]) -> bool:"""使规范可调用"""return self.is_satisfied(item)classAgeAboveSpecification(Specification):"""年龄大于规范"""def__init__(self, min_age: int): self.min_age = min_agedefis_satisfied(self, user: Dict[str, Any]) -> bool: age = user.get('age')return age isnotNoneand age > self.min_agedef__str__(self):returnf"Age > {self.min_age}"classHasEmailSpecification(Specification):"""有邮箱规范"""defis_satisfied(self, user: Dict[str, Any]) -> bool: email = user.get('email')return bool(email and'@'in email)def__str__(self):return"Has Email"classIsActiveSpecification(Specification):"""活跃用户规范"""def__init__(self, days_threshold: int = 30): self.days_threshold = days_thresholddefis_satisfied(self, user: Dict[str, Any]) -> bool: last_login = user.get('last_login')ifnot last_login:returnFalseif isinstance(last_login, str): last_login = datetime.fromisoformat(last_login) days_since_login = (datetime.now() - last_login).daysreturn days_since_login <= self.days_thresholddef__str__(self):returnf"Active within {self.days_threshold} days"classBalanceAboveSpecification(Specification):"""余额大于规范"""def__init__(self, min_balance: float): self.min_balance = min_balancedefis_satisfied(self, user: Dict[str, Any]) -> bool: balance = user.get('balance', 0)return balance >= self.min_balancedef__str__(self):returnf"Balance >= {self.min_balance}"classAndSpecification(Specification):"""与规范"""def__init__(self, *specs: Specification): self.specs = specsdefis_satisfied(self, item: Dict[str, Any]) -> bool:return all(spec(item) for spec in self.specs)def__str__(self):return" AND ".join(str(spec) for spec in self.specs)classOrSpecification(Specification):"""或规范"""def__init__(self, *specs: Specification): self.specs = specsdefis_satisfied(self, item: Dict[str, Any]) -> bool:return any(spec(item) for spec in self.specs)def__str__(self):return" OR ".join(str(spec) for spec in self.specs)classNotSpecification(Specification):"""非规范"""def__init__(self, spec: Specification): self.spec = specdefis_satisfied(self, item: Dict[str, Any]) -> bool:returnnot self.spec(item)def__str__(self):returnf"NOT ({self.spec})"classUserFilter:"""用户过滤器""" @staticmethoddeffilter_users(users: List[Dict[str, Any]], spec: Specification) -> List[Dict[str, Any]]:"""根据规范过滤用户"""return [user for user in users if spec.is_satisfied(user)] @staticmethoddefcount_users(users: List[Dict[str, Any]], spec: Specification) -> int:"""统计满足规范的用户数"""return sum(1for user in users if spec.is_satisfied(user))# 示例数据users = [ {"id": 1, "name": "张三", "age": 25, "email": "zhangsan@example.com", "balance": 1000.0, "last_login": "2024-01-15T10:00:00", "is_active": True}, {"id": 2, "name": "李四", "age": 17, "email": "lisi@example.com", "balance": 500.0, "last_login": "2023-11-01T09:00:00", "is_active": True}, {"id": 3, "name": "王五", "age": 30, "email": "", "balance": 2000.0, "last_login": "2024-01-20T14:00:00", "is_active": False}, {"id": 4, "name": "赵六", "age": 22, "email": "zhaoliu@example.com", "balance": 300.0, "last_login": "2024-01-18T16:00:00", "is_active": True}, {"id": 5, "name": "孙七", "age": 35, "email": "sunqi@example.com", "balance": 1500.0, "last_login": "2023-12-01T08:00:00", "is_active": True},]print("=== 用户过滤系统 ===\n")# 基本规范adult_spec = AgeAboveSpecification(18)has_email_spec = HasEmailSpecification()active_spec = IsActiveSpecification(days_threshold=30)rich_spec = BalanceAboveSpecification(1000)print("1. 成年用户:")adults = UserFilter.filter_users(users, adult_spec)print(f" 规范: {adult_spec}")print(f" 数量: {len(adults)}")for user in adults: print(f" - {user['name']} ({user['age']}岁)")print("\n2. 有邮箱的活跃成年用户:")complex_spec = adult_spec & has_email_spec & active_specfiltered = UserFilter.filter_users(users, complex_spec)print(f" 规范: {complex_spec}")print(f" 数量: {len(filtered)}")for user in filtered: print(f" - {user['name']}")print("\n3. 有钱或活跃的用户:")rich_or_active = rich_spec | active_specfiltered = UserFilter.filter_users(users, rich_or_active)print(f" 规范: {rich_or_active}")print(f" 数量: {len(filtered)}")for user in filtered: print(f" - {user['name']} (余额: {user['balance']}, 最近登录: {user['last_login'][:10]})")print("\n4. 非活跃用户:")not_active = ~active_specfiltered = UserFilter.filter_users(users, not_active)print(f" 规范: {not_active}")print(f" 数量: {len(filtered)}")for user in filtered: print(f" - {user['name']} (最后登录: {user['last_login'][:10]})")# 动态构建复杂查询print("\n5. 动态构建的复杂查询:")# 查找:成年、有邮箱、余额>500,且(是活跃用户或余额>1500)dynamic_spec = ( adult_spec & has_email_spec & BalanceAboveSpecification(500) & (active_spec | BalanceAboveSpecification(1500)))print(f" 规范: {dynamic_spec}")count = UserFilter.count_users(users, dynamic_spec)print(f" 满足条件的用户数: {count}")filtered = UserFilter.filter_users(users, dynamic_spec)for user in filtered: print(f" - {user['name']}: 年龄{user['age']}, 余额{user['balance']}, 活跃{user['is_active']}")向迷宫般的过滤逻辑说再见。
如果你的"插件系统"使用if name == 'plugin_a':,那么你并没有插件系统。
from abc import ABC, abstractmethodfrom typing import Dict, Any, Callable, Optional, Typeimport importlibimport pkgutilclassPlugin(ABC):"""插件基类""" @abstractmethoddefexecute(self, *args, **kwargs) -> Any:"""执行插件"""passdefget_name(self) -> str:"""获取插件名称"""return self.__class__.__name__defget_version(self) -> str:"""获取插件版本"""return"1.0.0"classPluginRegistry:"""插件注册表""" _plugins: Dict[str, Type[Plugin]] = {} _instances: Dict[str, Plugin] = {} @classmethoddefregister(cls, name: Optional[str] = None):"""注册插件装饰器"""defdecorator(plugin_cls: Type[Plugin]) -> Type[Plugin]: plugin_name = name or plugin_cls.__name__.lower()if plugin_name in cls._plugins:raise ValueError(f"插件 '{plugin_name}' 已注册") cls._plugins[plugin_name] = plugin_cls print(f"注册插件: {plugin_name} -> {plugin_cls.__name__}")return plugin_clsreturn decorator @classmethoddefget_plugin(cls, name: str) -> Optional[Plugin]:"""获取插件实例"""if name notin cls._instances:if name in cls._plugins: cls._instances[name] = cls._plugins[name]()else:returnNonereturn cls._instances.get(name) @classmethoddefget_all_plugins(cls) -> Dict[str, Plugin]:"""获取所有插件实例"""# 确保所有插件都已实例化for name in cls._plugins:if name notin cls._instances: cls._instances[name] = cls._plugins[name]()return cls._instances.copy() @classmethoddeflist_plugins(cls) -> list:"""列出所有注册的插件"""return list(cls._plugins.keys()) @classmethoddefexecute_plugin(cls, name: str, *args, **kwargs) -> Any:"""执行指定插件""" plugin = cls.get_plugin(name)ifnot plugin:raise ValueError(f"未找到插件: {name}") print(f"执行插件: {name}")return plugin.execute(*args, **kwargs) @classmethoddefload_plugins_from_package(cls, package_name: str):"""从包中自动加载插件"""try: package = importlib.import_module(package_name)# 遍历包中的所有模块for _, module_name, is_pkg in pkgutil.iter_modules(package.__path__):ifnot is_pkg: full_module_name = f"{package_name}.{module_name}" importlib.import_module(full_module_name) print(f"从包 '{package_name}' 加载插件完成")except ImportError as e: print(f"加载包 '{package_name}' 失败: {e}")# 定义一些插件@PluginRegistry.register("greeter")classGreeterPlugin(Plugin):"""问候插件"""defexecute(self, name: str = "World") -> str: greeting = f"Hello, {name}!" print(greeting)return greetingdefget_version(self) -> str:return"2.0.0"@PluginRegistry.register("calculator")classCalculatorPlugin(Plugin):"""计算器插件"""defexecute(self, operation: str, a: float, b: float) -> float: operations = {"add": lambda x, y: x + y,"subtract": lambda x, y: x - y,"multiply": lambda x, y: x * y,"divide": lambda x, y: x / y if y != 0else float('inf') }if operation notin operations:raise ValueError(f"不支持的操作: {operation}") result = operations[operation](a, b) print(f"{a}{operation}{b} = {result}")return result@PluginRegistry.register("uppercase")classUppercasePlugin(Plugin):"""大写转换插件"""defexecute(self, text: str) -> str: result = text.upper() print(f"大写转换: '{text}' -> '{result}'")return result@PluginRegistry.register() # 使用默认名称classReversePlugin(Plugin):"""字符串反转插件"""defexecute(self, text: str) -> str: result = text[::-1] print(f"反转: '{text}' -> '{result}'")return result# 动态插件注册defregister_dynamic_plugin():"""动态注册插件(例如从配置文件加载)"""classDynamicPlugin(Plugin):def__init__(self, name, func): self._name = name self._func = funcdefexecute(self, *args, **kwargs): print(f"执行动态插件: {self._name}")return self._func(*args, **kwargs)defget_name(self):return self._name# 动态创建和注册插件defcustom_function(x):return x * 2# 动态注册 plugin_name = "dynamic_doubler" PluginRegistry._plugins[plugin_name] = type("DynamicDoubler", (Plugin,), {"execute": lambda self, x: custom_function(x),"get_name": lambda self: plugin_name } ) print(f"动态注册插件: {plugin_name}")# 使用示例print("=== 插件系统演示 ===\n")# 列出所有插件print("已注册的插件:")for plugin_name in PluginRegistry.list_plugins(): print(f" - {plugin_name}")print("\n1. 执行问候插件:")PluginRegistry.execute_plugin("greeter", "Python开发者")print("\n2. 执行计算器插件:")result = PluginRegistry.execute_plugin("calculator", "multiply", 6, 7)print(f" 结果: {result}")print("\n3. 执行大写插件:")PluginRegistry.execute_plugin("uppercase", "hello world")print("\n4. 执行反转插件:")PluginRegistry.execute_plugin("reverseplugin", "Python") # 使用默认名称print("\n5. 动态插件演示:")register_dynamic_plugin()# 执行动态插件if"dynamic_doubler"in PluginRegistry.list_plugins(): result = PluginRegistry.execute_plugin("dynamic_doubler", 21) print(f" 动态插件结果: {result}")print("\n6. 获取插件信息:")greeter = PluginRegistry.get_plugin("greeter")if greeter: print(f" 插件名称: {greeter.get_name()}") print(f" 插件版本: {greeter.get_version()}")print("\n7. 批量执行所有插件:")all_plugins = PluginRegistry.get_all_plugins()for name, plugin in all_plugins.items(): print(f" 执行 {name}: ", end="")if name == "greeter": plugin.execute("All Plugins")elif name == "calculator": plugin.execute("add", 10, 20)elif name == "uppercase": plugin.execute("test")elif name == "reverseplugin": plugin.execute("abcd")elif name == "dynamic_doubler": result = plugin.execute(15) print(f"结果: {result}", end="") print()# 模拟从包加载插件print("\n8. 模拟从包加载插件:")# 注意:这里需要实际创建包结构,此处仅演示概念print(" 调用 PluginRegistry.load_plugins_from_package('my_plugins')")print(" (假设my_plugins包中有更多插件)")这种模式为框架、CLI和可扩展系统提供动力。
大多数开发者认为架构是"以后学"的东西。这就是为什么你会在凌晨3点重写整个模块。
真相是什么?
模式是可扩展系统和慢动作灾难之间的区别。
当你开始有意识地使用它们时,你的代码会从"勉强能用"跃升到"这维护性也太好了吧"。
如果你已经用Python编码多年,并希望提升水平——这就是真正成长的开始。
你在项目中用过哪些设计模式?有没有遇到过特别适合或特别不适合使用设计模式的场景?欢迎在评论区分享你的经验和思考!
如果你觉得这篇文章有帮助,欢迎点赞、收藏、转发,让更多开发者受益。关注我,获取更多Python高级技巧和架构知识!

长按👇关注- 数据STUDIO -设为星标,干货速递
