Python 系统教程
Python 设计模式实战
Python 设计模式实战
所属阶段:第八阶段 · 实战项目
建议学习时间:4小时
重要程度:⭐⭐⭐⭐⭐
一、为什么学设计模式
1.1 设计模式的价值
设计模式(Design Pattern)是解决软件设计中反复出现问题的通用解决方案。学习设计模式的核心收益:
• 提升代码可复用性:模式本身就是经过验证的复用方案
• 改善代码可维护性:遵循模式的代码更易于理解和修改
• 提高沟通效率:团队成员用同一语言沟通("这里用工厂方法")
1.2 GoF 23 种模式分类
| 类型 | 模式 | 关注点 |
|------|------|--------|
| 创建型 | 单例、工厂方法、抽象工厂、建造者、原型 | 对象的创建 |
| 结构型 | 装饰器、适配器、代理、外观、组合、桥接、享元 | 对象的组合 |
| 行为型 | 观察者、策略、命令、责任链、模板方法、迭代器等 | 对象的交互 |
Python 特点:部分模式在 Python 中有更简洁的实现(函数即对象,鸭子类型),本文重点讲最实用的 10 个。
二、创建型模式
2.1 单例模式(Singleton)
意图:确保一个类只有一个实例,并提供全局访问点。
适用场景:数据库连接池、配置管理、日志系统。
python
# singleton.py
"""单例模式的四种 Python 实现"""
import threading
# ========== 方式1:模块级单例(最 Pythonic,推荐)==========
# Python 模块本身就是单例的,导入时只执行一次
# config.py 文件内容:
# settings = {"debug": False, "version": "1.0"}
# 任何地方 from config import settings 得到同一个对象
# ========== 方式2:使用 __new__(经典实现)==========
class SingletonV1:
"""__new__ 方式实现单例"""
_instance = None
_lock = threading.Lock() # 线程锁,防止多线程竞态
def __new__(cls, *args, **kwargs):
if cls._instance is None:
with cls._lock:
# 双重检查:进入锁后再次检查,避免重复创建
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, name: str = "default"):
# __init__ 可能被多次调用,需要防护
if not hasattr(self, "_initialized"):
self.name = name
self._initialized = True
# 验证单例
s1 = SingletonV1("第一次")
s2 = SingletonV1("第二次")
print(s1 is s2) # True
print(s1.name) # "第一次"(第二次创建被忽略)
# ========== 方式3:元类(metaclass)实现 ==========
class SingletonMeta(type):
"""单例元类:可让任意类变为单例"""
_instances = {}
_lock = threading.Lock()
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
with cls._lock:
if cls not in cls._instances:
instance = super().__call__(*args, **kwargs)
cls._instances[cls] = instance
return cls._instances[cls]
class DatabaseConnection(metaclass=SingletonMeta):
"""数据库连接(单例)"""
def __init__(self, host: str = "localhost", port: int = 5432):
self.host = host
self.port = port
self.connected = False
print(f"创建数据库连接:{host}:{port}")
def connect(self):
self.connected = True
print(f"已连接到 {self.host}:{self.port}")
def query(self, sql: str) -> list:
print(f"执行查询:{sql}")
return []
# 使用示例
db1 = DatabaseConnection("db.example.com", 5432)
db2 = DatabaseConnection("another.host", 3306) # 不会创建新连接
print(db1 is db2) # True
print(db2.host) # "db.example.com"(第一次配置生效)
# ========== 方式4:装饰器实现 ==========
def singleton(cls):
"""单例装饰器"""
instances = {}
lock = threading.Lock()
def get_instance(*args, **kwargs):
if cls not in instances:
with lock:
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return get_instance
@singleton
class AppConfig:
"""应用配置(单例)"""
def __init__(self):
self.debug = False
self.version = "1.0.0"
self.database_url = "sqlite:///./app.db"
config1 = AppConfig()
config2 = AppConfig()
print(config1 is config2) # True
2.2 工厂方法模式(Factory Method)
意图:定义创建对象的接口,由子类决定实例化哪个类。
python
# factory_method.py
"""工厂方法模式:根据参数动态创建不同类型的对象"""
from abc import ABC, abstractmethod
from typing import Dict, Type
# ===== 产品接口 =====
class Notification(ABC):
"""通知抽象基类"""
@abstractmethod
def send(self, recipient: str, message: str) -> bool:
"""发送通知"""
...
@abstractmethod
def get_channel(self) -> str:
"""获取通道名称"""
...
# ===== 具体产品 =====
class EmailNotification(Notification):
"""邮件通知"""
def __init__(self, smtp_host: str = "smtp.gmail.com"):
self.smtp_host = smtp_host
def send(self, recipient: str, message: str) -> bool:
print(f"[邮件] 发送到 {recipient}:{message}")
# 实际会调用 smtplib 发送
return True
def get_channel(self) -> str:
return "email"
class SMSNotification(Notification):
"""短信通知"""
def __init__(self, api_key: str = ""):
self.api_key = api_key
def send(self, recipient: str, message: str) -> bool:
print(f"[短信] 发送到 {recipient}:{message[:100]}")
return True
def get_channel(self) -> str:
return "sms"
class PushNotification(Notification):
"""推送通知"""
def send(self, recipient: str, message: str) -> bool:
print(f"[推送] 发送到设备 {recipient}:{message}")
return True
def get_channel(self) -> str:
return "push"
class WeChatNotification(Notification):
"""微信通知"""
def send(self, recipient: str, message: str) -> bool:
print(f"[微信] 发送到 {recipient}:{message}")
return True
def get_channel(self) -> str:
return "wechat"
# ===== 工厂(注册表模式,扩展性更强)=====
class NotificationFactory:
"""通知工厂:通过注册表管理所有通知类型"""
# ���册表:类型名 → 类
_registry: Dict[str, Type[Notification]] = {
"email": EmailNotification,
"sms": SMSNotification,
"push": PushNotification,
"wechat": WeChatNotification,
}
@classmethod
def register(cls, channel: str, notification_class: Type[Notification]):
"""注册新的通知类型(开放-封闭原则:扩展不需修改工厂)"""
cls._registry[channel] = notification_class
@classmethod
def create(cls, channel: str, **kwargs) -> Notification:
"""根据通道名称创建通知对象"""
if channel not in cls._registry:
raise ValueError(f"未知的通知通道:{channel},可用通道:{list(cls._registry.keys())}")
return cls._registry[channel](**kwargs)
@classmethod
def available_channels(cls) -> list:
return list(cls._registry.keys())
# 使用示例
notifier = NotificationFactory.create("email", smtp_host="smtp.example.com")
notifier.send("user@example.com", "您的订单已发货!")
# 批量发送多渠道通知
def send_multi_channel(recipient: str, message: str, channels: list):
"""多渠道通知发送"""
results = {}
for channel in channels:
notifier = NotificationFactory.create(channel)
results[channel] = notifier.send(recipient, message)
return results
results = send_multi_channel("user001", "操作成功", ["email", "sms"])
print(results)
# 动态扩展:无需修改工厂代码
class DingTalkNotification(Notification):
"""钉钉通知"""
def send(self, recipient: str, message: str) -> bool:
print(f"[钉钉] @{recipient}:{message}")
return True
def get_channel(self) -> str:
return "dingtalk"
NotificationFactory.register("dingtalk", DingTalkNotification)
dt = NotificationFactory.create("dingtalk")
dt.send("张三", "有新的审批任务")
2.3 建造者模式(Builder)
意图:分步骤构建复杂对象,使同样的构建过程可以创建不同的产品。
python
# builder.py
"""建造者模式:链式调用构建复杂对象"""
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class QueryConfig:
"""SQL 查询配置(产品)"""
table: str = ""
columns: List[str] = field(default_factory=list)
conditions: List[str] = field(default_factory=list)
order_by: Optional[str] = None
limit: Optional[int] = None
offset: int = 0
joins: List[str] = field(default_factory=list)
class QueryBuilder:
"""SQL 查询建造者(支持链式调用)"""
def __init__(self):
self._config = QueryConfig()
def from_table(self, table: str) -> "QueryBuilder":
self._config.table = table
return self # 返�� self 支持链式调用
def select(self, *columns: str) -> "QueryBuilder":
self._config.columns.extend(columns)
return self
def where(self, condition: str) -> "QueryBuilder":
self._config.conditions.append(condition)
return self
def order_by(self, column: str, direction: str = "ASC") -> "QueryBuilder":
self._config.order_by = f"{column} {direction}"
return self
def limit(self, n: int) -> "QueryBuilder":
self._config.limit = n
return self
def offset(self, n: int) -> "QueryBuilder":
self._config.offset = n
return self
def join(self, table: str, on: str, join_type: str = "INNER") -> "QueryBuilder":
self._config.joins.append(f"{join_type} JOIN {table} ON {on}")
return self
def build(self) -> str:
"""生成最终的 SQL 字符串"""
if not self._config.table:
raise ValueError("必须指定查询表名(from_table)")
columns = ", ".join(self._config.columns) if self._config.columns else "*"
sql = f"SELECT {columns} FROM {self._config.table}"
for join in self._config.joins:
sql += f"\n {join}"
if self._config.conditions:
sql += "\nWHERE " + " AND ".join(self._config.conditions)
if self._config.order_by:
sql += f"\nORDER BY {self._config.order_by}"
if self._config.limit:
sql += f"\nLIMIT {self._config.limit}"
if self._config.offset:
sql += f"\nOFFSET {self._config.offset}"
return sql
# 链式调用构建复杂查询
sql = (
QueryBuilder()
.from_table("users")
.select("users.id", "users.name", "orders.total")
.join("orders", on="users.id = orders.user_id")
.where("users.is_active = true")
.where("orders.total > 1000")
.order_by("orders.total", "DESC")
.limit(20)
.offset(40)
.build()
)
print(sql)
三、结构型模式
3.1 类装饰器模式(Class Decorator)
前面学过函数装饰器,这里讲类装饰器:用类实现装饰器,可以保存更多状态。
python
# class_decorator.py
"""类装饰器:携带状态的装饰器"""
import time
import functools
from typing import Callable
class Retry:
"""
重试装饰器(类实现)
支持配置最大重试次数和等待时间
"""
def __init__(self, max_retries: int = 3, delay: float = 1.0, exceptions=(Exception,)):
"""
:param max_retries: 最大重试次数
:param delay: 重试间隔(秒)
:param exceptions: 需要重试的异常类型
"""
self.max_retries = max_retries
self.delay = delay
self.exceptions = exceptions
def __call__(self, func: Callable) -> Callable:
"""当装饰器被调用时执行"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return func(*args, **kwargs)
except self.exceptions as e:
last_exception = e
if attempt < self.max_retries:
print(f"第 {attempt + 1} 次尝试失败:{e},{self.delay}s 后重试...")
time.sleep(self.delay)
else:
print(f"已达到最大重试次数 {self.max_retries}")
raise last_exception
return wrapper
class Cache:
"""缓存装饰器(类实现,支持 TTL 过期)"""
def __init__(self, ttl: int = 60):
"""
:param ttl: 缓存存活时间(秒)
"""
self.ttl = ttl
self._cache = {} # {key: (value, expire_time)}
def __call__(self, func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
cache_key = (args, tuple(sorted(kwargs.items())))
# 检查缓存是否有效
if cache_key in self._cache:
value, expire_time = self._cache[cache_key]
if time.time() < expire_time:
print(f"[缓存命中] {func.__name__}")
return value
# 缓存未命中,执行函数
result = func(*args, **kwargs)
self._cache[cache_key] = (result, time.time() + self.ttl)
print(f"[缓存写入] {func.__name__}")
return result
def clear():
"""清空所有缓存"""
self._cache.clear()
wrapper.clear = clear
return wrapper
# 使用类装饰器
@Retry(max_retries=3, delay=0.5, exceptions=(ConnectionError, TimeoutError))
def fetch_data(url: str) -> dict:
"""模拟可能失败的网络请求"""
import random
if random.random() < 0.7:
raise ConnectionError("网络连接失败")
return {"data": "success", "url": url}
cache_instance = Cache(ttl=30)
@cache_instance
def expensive_calculation(n: int) -> int:
"""模拟耗时计算"""
time.sleep(0.1)
return n * n
# 测试缓存
print(expensive_calculation(5)) # 计算,写入缓存
print(expensive_calculation(5)) # 直接从缓存返回
expensive_calculation.clear() # 清空缓存
3.2 适配器模式(Adapter)
意图:将不兼容的接口转换为客户端期望的接口。
python
# adapter.py
"""适配器模式:让不兼容的接口协同工作"""
from abc import ABC, abstractmethod
# ===== 目标接口(客户端期望的接口)=====
class DataExporter(ABC):
"""数据导出器接口"""
@abstractmethod
def export(self, data: list, filename: str) -> str:
"""导出数据,返回文件路径"""
...
# ===== 被适配的第三方库(接口不兼容)=====
class ThirdPartyCSVWriter:
"""第三方 CSV 库(接口与我们的不兼容)"""
def write_to_file(self, rows: list, path: str, delimiter=",") -> bool:
"""写入 CSV,接口与 DataExporter 不同"""
print(f"[第三方CSV] 写入 {len(rows)} 行到 {path}")
return True
class ThirdPartyExcelWriter:
"""第三方 Excel 库"""
def create_workbook(self, sheet_data: dict, output_path: str):
"""创建 Excel 工作簿,接口完全不同"""
print(f"[第三方Excel] 创建工作簿 {output_path},{len(sheet_data)} 个工作表")
# ===== 适配器 =====
class CSVAdapter(DataExporter):
"""将第三方 CSV 库适配为 DataExporter 接口"""
def __init__(self):
self._writer = ThirdPartyCSVWriter()
def export(self, data: list, filename: str) -> str:
"""将 DataExporter 接口转换为第三方库接口"""
path = f"{filename}.csv"
success = self._writer.write_to_file(data, path, delimiter=",")
return path if success else ""
class ExcelAdapter(DataExporter):
"""将第三方 Excel 库适配为 DataExporter 接口"""
def __init__(self):
self._writer = ThirdPartyExcelWriter()
def export(self, data: list, filename: str) -> str:
"""转换数据格式并调用第三方库"""
path = f"{filename}.xlsx"
# 数据格式转换:list → dict(第三方库要求的格式)
sheet_data = {"Sheet1": data}
self._writer.create_workbook(sheet_data, path)
return path
# ===== 客户端代码(只依赖 DataExporter 接口)=====
def generate_report(exporter: DataExporter, data: list, report_name: str):
"""生成报��(不关心具体导出格式)"""
path = exporter.export(data, report_name)
print(f"报告已生成:{path}")
sample_data = [{"name": "张三", "score": 95}, {"name": "李四", "score": 88}]
# 灵活切换格式,不修改客户端代码
generate_report(CSVAdapter(), sample_data, "月度报告")
generate_report(ExcelAdapter(), sample_data, "月度报告")
3.3 代理模式(Proxy)
意图:为对象提供代理,控制对原对象的访问。
python
# proxy.py
"""代理模式:访问控制、懒加载、日志记录"""
from abc import ABC, abstractmethod
import time
class DatabaseService(ABC):
"""数据库服务接口"""
@abstractmethod
def query(self, sql: str) -> list:
...
@abstractmethod
def execute(self, sql: str) -> int:
...
class RealDatabaseService(DatabaseService):
"""真实的数据库服务(高开销)"""
def __init__(self, connection_string: str):
print(f"建立数据库连接:{connection_string}(耗时2秒...)")
time.sleep(0.1) # 模拟连接耗时
def query(self, sql: str) -> list:
print(f"执行查询:{sql}")
return [{"id": 1, "name": "测试"}]
def execute(self, sql: str) -> int:
print(f"执行写操作:{sql}")
return 1
class DatabaseProxy(DatabaseService):
"""
数据库代理:
1. 懒加载(延迟初始化真实服务)
2. 访问控制(只读用户不能执行写操作)
3. 查询日志
4. 简单缓存
"""
def __init__(self, connection_string: str, read_only: bool = False):
self._connection_string = connection_string
self._read_only = read_only
self._real_service = None # 延迟初始化
self._cache = {} # 简单查询缓存
self._query_count = 0
def _get_service(self) -> RealDatabaseService:
"""懒加载:第一次访问时才创建真实连接"""
if self._real_service is None:
self._real_service = RealDatabaseService(self._connection_string)
return self._real_service
def query(self, sql: str) -> list:
# 缓存检查
if sql in self._cache:
print(f"[代理缓存命中] {sql}")
return self._cache[sql]
self._query_count += 1
start = time.perf_counter()
result = self._get_service().query(sql)
elapsed = (time.perf_counter() - start) * 1000
print(f"[代理日志] 查询耗时 {elapsed:.2f}ms,累计查询 {self._query_count} 次")
self._cache[sql] = result
return result
def execute(self, sql: str) -> int:
# 访问控制:只读用户拒绝写操作
if self._read_only:
raise PermissionError("只读用户不允许执行写操作")
return self._get_service().execute(sql)
@property
def stats(self) -> dict:
return {
"query_count": self._query_count,
"cache_size": len(self._cache),
"connected": self._real_service is not None,
}
# 使用代理
proxy = DatabaseProxy("postgresql://localhost/mydb", read_only=False)
data = proxy.query("SELECT * FROM users")
data_again = proxy.query("SELECT * FROM users") # 从缓存返回
print(proxy.stats)
四、行为型模式
4.1 观察者模式(Observer)/ 事件系统
意图:定义对象间的一对多依赖,当对象状态变化时,通知所有依赖者。
python
# observer.py
"""观察者模式:事件驱动系统"""
from typing import Callable, Dict, List, Any
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Event:
"""事件对象"""
name: str # 事件名称
data: Any # 事件数据
timestamp: datetime = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
class EventBus:
"""
事件总线(发布-订阅模式)
适用于:模块间解耦通信、GUI 事件处理、消息通知系统
"""
def __init__(self):
# 事件名 → 订阅者列表
self._subscribers: Dict[str, List[Callable]] = {}
def subscribe(self, event_name: str, handler: Callable):
"""
订阅事件
:param event_name: 事件名称(支持通配符 "*" 订阅所有事件)
:param handler: 事件处理函数 fn(event: Event)
"""
if event_name not in self._subscribers:
self._subscribers[event_name] = []
self._subscribers[event_name].append(handler)
print(f"[EventBus] {handler.__name__} 订阅了事件:{event_name}")
def unsubscribe(self, event_name: str, handler: Callable):
"""取消订阅"""
if event_name in self._subscribers:
self._subscribers[event_name] = [
h for h in self._subscribers[event_name] if h != handler
]
def publish(self, event: Event):
"""发布事件,通知所有订阅者"""
handlers = self._subscribers.get(event.name, [])
wildcard_handlers = self._subscribers.get("*", [])
all_handlers = handlers + wildcard_handlers
print(f"[EventBus] 发布事件:{event.name},通知 {len(all_handlers)} 个订阅者")
for handler in all_handlers:
try:
handler(event)
except Exception as e:
print(f"[EventBus] 处理器 {handler.__name__} 出错:{e}")
def on(self, event_name: str):
"""装饰器形式的订阅"""
def decorator(func: Callable) -> Callable:
self.subscribe(event_name, func)
return func
return decorator
# 全局事件总线
bus = EventBus()
# ===== 各模块作为订阅者 =====
@bus.on("user.registered")
def send_welcome_email(event: Event):
"""新用户注册后发送欢迎邮件"""
user = event.data
print(f"[邮件服务] 发送欢迎邮件给 {user['email']}")
@bus.on("user.registered")
def create_user_profile(event: Event):
"""新用户注册后创建默认配置"""
user = event.data
print(f"[配置服务] 为用户 {user['username']} 创建默认配置")
@bus.on("user.registered")
def log_registration(event: Event):
"""记录注册日志"""
user = event.data
print(f"[审计日志] 新用户注册:{user['username']} @ {event.timestamp}")
@bus.on("*")
def global_monitor(event: Event):
"""全局事件监控(订阅���有事件)"""
print(f"[监控] 事件:{event.name},时间:{event.timestamp.strftime('%H:%M:%S')}")
# ===== 发布者 =====
class UserService:
def register(self, username: str, email: str) -> dict:
"""用户注册(只关注核心逻辑,不关心后续操作)"""
user = {"id": 1, "username": username, "email": email}
print(f"\n[UserService] 用户 {username} 注册成功")
# 发布事件,触发所有订阅者(解耦!)
bus.publish(Event("user.registered", data=user))
return user
# 测试
service = UserService()
service.register("张三", "zhang@example.com")
4.2 策略模式(Strategy)
意图:定义一系列算法,将每个算法封装起来,使它们可以相互替换。
python
# strategy.py
"""策略模式:函数作为策略(Python 最简洁的实现)"""
from typing import Callable, List
from dataclasses import dataclass
@dataclass
class Product:
name: str
price: float
category: str
rating: float
# Python 中函数是一等公民,直接用函数作为策略(无需抽象基类)
def sort_by_price_asc(products: List[Product]) -> List[Product]:
return sorted(products, key=lambda p: p.price)
def sort_by_price_desc(products: List[Product]) -> List[Product]:
return sorted(products, key=lambda p: p.price, reverse=True)
def sort_by_rating(products: List[Product]) -> List[Product]:
return sorted(products, key=lambda p: p.rating, reverse=True)
def sort_by_name(products: List[Product]) -> List[Product]:
return sorted(products, key=lambda p: p.name)
# 折扣策略
def no_discount(price: float) -> float:
return price
def percentage_discount(percent: float) -> Callable:
"""返回一个折扣策略函数(闭包)"""
def strategy(price: float) -> float:
return price * (1 - percent / 100)
strategy.__name__ = f"折扣{percent}%"
return strategy
def member_discount(price: float) -> float:
"""会员折扣:100以上打9折"""
return price * 0.9 if price >= 100 else price
# 使用策略
class ProductCatalog:
"""商品目录(使用策略模式排序和定价)"""
def __init__(self, products: List[Product]):
self.products = products
def get_sorted(self, strategy: Callable[[List[Product]], List[Product]]) -> List[Product]:
"""使用指定策略排序"""
return strategy(self.products)
def get_price(self, product: Product, discount_strategy: Callable[[float], float]) -> float:
"""使用指定折扣策略计算价格"""
return round(discount_strategy(product.price), 2)
products = [
Product("手机", 3999, "电子", 4.5),
Product("耳机", 599, "电子", 4.8),
Product("书包", 299, "配件", 4.2),
Product("键盘", 899, "电子", 4.9),
]
catalog = ProductCatalog(products)
# 动态切换排序策略
print("按价格升序:", [p.name for p in catalog.get_sorted(sort_by_price_asc)])
print("按评分降序:", [p.name for p in catalog.get_sorted(sort_by_rating)])
# 动态切换折扣策略
phone = products[0]
print(f"原价:{phone.price}")
print(f"会员价:{catalog.get_price(phone, member_discount)}")
print(f"7折价:{catalog.get_price(phone, percentage_discount(30))}")
# 策略注册表(让策略可配置)
sort_strategies = {
"price_asc": sort_by_price_asc,
"price_desc": sort_by_price_desc,
"rating": sort_by_rating,
"name": sort_by_name,
}
user_choice = "rating"
sorted_products = catalog.get_sorted(sort_strategies[user_choice])
4.3 命令模式(Command)
意图:将请求封装为对象,支持撤销、队列、日志。
python
# command.py
"""命令模式:可撤销的操作"""
from abc import ABC, abstractmethod
from typing import List
class Command(ABC):
"""命令接口"""
@abstractmethod
def execute(self): ...
@abstractmethod
def undo(self): ...
class TextEditor:
"""文本编辑器(接收者)"""
def __init__(self):
self.text = ""
def insert(self, pos: int, content: str):
self.text = self.text[:pos] + content + self.text[pos:]
def delete(self, pos: int, length: int):
self.text = self.text[:pos] + self.text[pos + length:]
class InsertCommand(Command):
"""插入文本命令"""
def __init__(self, editor: TextEditor, pos: int, content: str):
self.editor = editor
self.pos = pos
self.content = content
def execute(self):
self.editor.insert(self.pos, self.content)
def undo(self):
self.editor.delete(self.pos, len(self.content))
class DeleteCommand(Command):
"""删除文本命令"""
def __init__(self, editor: TextEditor, pos: int, length: int):
self.editor = editor
self.pos = pos
self.length = length
self.deleted_text = "" # 保存删除的内容(用于撤销)
def execute(self):
self.deleted_text = self.editor.text[self.pos:self.pos + self.length]
self.editor.delete(self.pos, self.length)
def undo(self):
self.editor.insert(self.pos, self.deleted_text)
class CommandHistory:
"""命令历史(支持撤销/重做)"""
def __init__(self):
self._history: List[Command] = []
self._redo_stack: List[Command] = []
def execute(self, command: Command):
command.execute()
self._history.append(command)
self._redo_stack.clear() # 执行新命令清空重做栈
def undo(self):
if not self._history:
print("没有可撤销的操作")
return
command = self._history.pop()
command.undo()
self._redo_stack.append(command)
def redo(self):
if not self._redo_stack:
print("没有可重做的操作")
return
command = self._redo_stack.pop()
command.execute()
self._history.append(command)
# 使用示例
editor = TextEditor()
history = CommandHistory()
history.execute(InsertCommand(editor, 0, "Hello"))
print(editor.text) # Hello
history.execute(InsertCommand(editor, 5, ", World"))
print(editor.text) # Hello, World
history.undo()
print(editor.text) # Hello
history.redo()
print(editor.text) # Hello, World
4.4 责任链模式(Chain of Responsibility)
意图:使多个对象都有机会处理请求,将这些对象连成一条链,依次传递请求。
python
# chain_of_responsibility.py
"""责任链:HTTP 请求中间件、审批流程"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
@dataclass
class Request:
"""HTTP 请求(模拟)"""
path: str
method: str
headers: dict
user_id: Optional[int] = None
body: dict = None
class Middleware(ABC):
"""中间件基类(责任链节点)"""
def __init__(self):
self._next: Optional["Middleware"] = None
def set_next(self, middleware: "Middleware") -> "Middleware":
"""设置下一个中间件,支持链式调用"""
self._next = middleware
return middleware
def handle(self, request: Request) -> Optional[str]:
"""处理请求,不处理则传递给下一个"""
if self._next:
return self._next.handle(request)
return None
class AuthMiddleware(Middleware):
"""认证中间件:验证 Token"""
def handle(self, request: Request) -> Optional[str]:
print("[Auth] 验证 Token...")
if "Authorization" not in request.headers:
return "401 Unauthorized: 缺少 Token"
# 模拟 Token 验证
request.user_id = 1
print("[Auth] Token 验证通过")
return super().handle(request)
class RateLimitMiddleware(Middleware):
"""限流中间件:防止请求过于频繁"""
def __init__(self, max_requests: int = 100):
super().__init__()
self.max_requests = max_requests
self.request_counts = {}
def handle(self, request: Request) -> Optional[str]:
print("[RateLimit] 检查请求频率...")
user_id = request.user_id or "anonymous"
count = self.request_counts.get(user_id, 0)
if count >= self.max_requests:
return "429 Too Many Requests: 请求过于频繁"
self.request_counts[user_id] = count + 1
return super().handle(request)
class LoggingMiddleware(Middleware):
"""日志中间件:记录所有请求"""
def handle(self, request: Request) -> Optional[str]:
print(f"[Log] {request.method} {request.path}")
result = super().handle(request)
print(f"[Log] 响应:{result}")
return result
class RouteHandler(Middleware):
"""路由处理(最终处理者)"""
def handle(self, request: Request) -> Optional[str]:
return f"200 OK: 处理了 {request.method} {request.path}"
# 构建责任链
auth = AuthMiddleware()
rate_limit = RateLimitMiddleware(max_requests=10)
logging = LoggingMiddleware()
handler = RouteHandler()
# 链式连接:logging → auth → rate_limit → handler
logging.set_next(auth).set_next(rate_limit).set_next(handler)
# 测试
print("=== 正常请求 ===")
req1 = Request("/api/users", "GET", {"Authorization": "Bearer token123"})
result = logging.handle(req1)
print("\n=== 未认证请求 ===")
req2 = Request("/api/users", "GET", {})
result = logging.handle(req2)
print(f"结果:{result}")
总结
| 模式 | 类型 | 核心意图 | Python 最佳实践 |
|------|------|---------|----------------|
| 单例 | 创建型 | 全局唯一实例 | 模块级变量 / 元类 |
| 工厂方法 | 创建型 | 解耦对象创建 | 注册表 + 字典 |
| 建造者 | 创建型 | 分步构建复杂对象 | 链式调用(返回 self) |
| 装饰器 | 结构型 | 动态添加功能 | 类装饰器(保存状态) |
| 适配器 | 结构型 | 接口转换 | 组合优于继承 |
| 代理 | 结构型 | 控制访问 | 懒加载 + 缓存 + 权限 |
| 观察者 | 行为型 | 事件驱动解耦 | 事件总线 / 回调列表 |
| 策略 | 行为型 | 算法可替换 | 函数作为策略(一等公民)|
| 命令 | 行为型 | 操作对象化 | 支持撤销/重做 |
| 责任链 | 行为型 | 请求依次处理 | 中间件链 |
课后练习
1. 基础练习:使用工厂模式实现一个日志格式化器工厂,支持 JSON、文本、CSV 三种格式。
2. 进阶练习:使用观察者模式实现一个简单的股价监控系统,当股价超过阈值时触发短信和邮件通知。
3. 挑战练习:综合运用策略模式+命令模式+建造者模式,实现一个数据处理管道:支持配置多种处理步骤(清洗、转换、验证)的顺序,并支持撤销最后一步处理。
回到目录:README