一、适配器模式 (Adapter)
适配器模式用于解决接口不兼容的问题,让原本无法一起工作的类能够协同工作。
1.1 经典实现
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
# 目标接口
classDataAnalyzer(ABC):
@abstractmethod
defanalyze(self) -> Dict[str, Any]:
pass
# 现有系统A(不兼容的接口)
classCSVDataProvider:
def__init__(self, filename: str):
self.filename = filename
defread_csv(self) -> List[List[str]]:
# 模拟读取CSV
return [
['name', 'age', 'score'],
['Alice', '25', '95'],
['Bob', '30', '87']
]
defparse_headers(self, data: List[List[str]]) -> List[str]:
return data[0]
# 现有系统B(不兼容的接口)
classJSONDataProvider:
def__init__(self, json_str: str):
self.json_str = json_str
defload_json(self) -> Dict[str, Any]:
# 模拟JSON数据
return {
'users': [
{'name': 'Alice', 'age': 25, 'score': 95},
{'name': 'Bob', 'age': 30, 'score': 87}
]
}
# 适配器1:适配CSV数据源
classCSVAdapter(DataAnalyzer):
def__init__(self, csv_provider: CSVDataProvider):
self._provider = csv_provider
defanalyze(self) -> Dict[str, Any]:
data = self._provider.read_csv()
headers = self._provider.parse_headers(data)
rows = data[1:]
# 转换为统一格式
records = []
for row in rows:
record = dict(zip(headers, row))
record['age'] = int(record['age'])
record['score'] = int(record['score'])
records.append(record)
return self._perform_analysis(records)
def_perform_analysis(self, records: List[Dict]) -> Dict[str, Any]:
scores = [r['score'] for r in records]
ages = [r['age'] for r in records]
return {
'avg_score': sum(scores) / len(scores),
'avg_age': sum(ages) / len(ages),
'total_records': len(records),
'records': records
}
# 适配器2:适配JSON数据源
classJSONAdapter(DataAnalyzer):
def__init__(self, json_provider: JSONDataProvider):
self._provider = json_provider
defanalyze(self) -> Dict[str, Any]:
data = self._provider.load_json()
records = data['users']
scores = [r['score'] for r in records]
ages = [r['age'] for r in records]
return {
'avg_score': sum(scores) / len(scores),
'avg_age': sum(ages) / len(ages),
'total_records': len(records),
'records': records
}
# 使用示例
defmain_adapter():
csv_provider = CSVDataProvider('data.csv')
json_provider = JSONDataProvider('{"users": [...]}')
analyzers = [
CSVAdapter(csv_provider),
JSONAdapter(json_provider)
]
for analyzer in analyzers:
result = analyzer.analyze()
print(f"分析结果: 平均分={result['avg_score']}, 平均年龄={result['avg_age']}")
1.2 使用Python特性的高级适配器
from functools import wraps
import inspect
classMethodAdapter:
"""使用描述符协议实现的方法级适配器"""
def__init__(self, adaptee_method, converter):
self.adaptee_method = adaptee_method
self.converter = converter
def__get__(self, instance, owner):
if instance isNone:
return self
@wraps(self.adaptee_method)
defwrapper(*args, **kwargs):
# 转换参数
converted_args = [self.converter(arg) for arg in args]
# 调用原始方法
result = self.adaptee_method(instance, *converted_args, **kwargs)
return result
return wrapper
# 使用类装饰器实现的适配器工厂
defadapt_to(target_interface):
"""类装饰器:自动生成适配器类"""
defdecorator(source_class):
target_methods = [m for m in dir(target_interface)
ifnot m.startswith('_') and
callable(getattr(target_interface, m))]
classAdapterClass:
def__init__(self, source_obj):
self._source = source_obj
def__getattr__(self, name):
if name in target_methods:
# 动态适配方法
source_method = getattr(self._source, f'_{name}', None)
if source_method:
return source_method
raise AttributeError(f"No adaptation for {name}")
AdapterClass.__name__ = f"{source_class.__name__}Adapter"
return AdapterClass
return decorator
# 使用示例
classLegacyPaymentSystem:
defprocess_payment_legacy(self, amount_cents: int, currency: str):
print(f"处理付款: {amount_cents/100}{currency}")
classNewPaymentInterface:
defprocess_payment(self, amount: float, currency: str):
pass
@adapt_to(NewPaymentInterface)
classLegacyPaymentSystem:
pass
# 或者使用更灵活的方式
classFlexibleAdapter:
"""支持多种适配策略的适配器"""
def__init__(self, adaptee, adapters_config: Dict[str, callable]):
self._adaptee = adaptee
self._adapters = adapters_config
def__getattr__(self, name):
if name in self._adapters:
adapter_func = self._adapters[name]
returnlambda *args, **kwargs: adapter_func(self._adaptee, *args, **kwargs)
# 直接转发未适配的方法
return getattr(self._adaptee, name)
二、代理模式 (Proxy)
代理模式控制对对象的访问,可以在访问前后添加额外逻辑。
2.1 虚拟代理与延迟加载
import time
from typing import Optional, Any
from functools import lru_cache
classHeavyDatabase:
"""模拟重量级数据库连接"""
def__init__(self, connection_string: str):
print(f"建立到 {connection_string} 的重量级连接...")
time.sleep(2) # 模拟慢速连接
self._connection_string = connection_string
self._cache = {}
defquery(self, sql: str) -> List[Dict]:
print(f"执行查询: {sql[:50]}...")
time.sleep(0.5) # 模拟查询耗时
# 模拟返回数据
return [{"id": i, "data": f"result_{i}"} for i in range(10)]
defclose(self):
print("关闭数据库连接")
classDatabaseProxy:
"""代理模式:延迟初始化和连接池管理"""
_instance = None
_connection_count = 0
_max_connections = 5
def__init__(self, connection_string: str):
self._connection_string = connection_string
self._real_db: Optional[HeavyDatabase] = None
self._query_cache = {}
def_get_db(self) -> HeavyDatabase:
"""懒加载真实对象"""
if self._real_db isNone:
# 检查连接池限制
if DatabaseProxy._connection_count >= DatabaseProxy._max_connections:
raise ConnectionError("连接池已满")
self._real_db = HeavyDatabase(self._connection_string)
DatabaseProxy._connection_count += 1
return self._real_db
@lru_cache(maxsize=128)
defquery(self, sql: str) -> List[Dict]:
"""带缓存的查询"""
# 访问控制检查
if"DROP"in sql.upper() or"DELETE"in sql.upper():
raise PermissionError("不允许执行危险操作")
# 记录查询日志
print(f"[LOG] 查询: {sql[:50]}...")
# 执行实际查询
result = self._get_db().query(sql)
return result
def__enter__(self):
return self
def__exit__(self, exc_type, exc_val, exc_tb):
self.close()
defclose(self):
if self._real_db:
self._real_db.close()
DatabaseProxy._connection_count -= 1
# 使用示例
defmain_proxy():
# 注意:此时并未真正连接数据库
db_proxy = DatabaseProxy("postgresql://localhost/mydb")
# 第一次查询时才会建立连接
result1 = db_proxy.query("SELECT * FROM users WHERE age > 18")
# 第二次查询使用缓存
result2 = db_proxy.query("SELECT * FROM users WHERE age > 18")
assert result1 == result2 # 缓存命中
# 危险操作被阻止
try:
db_proxy.query("DROP TABLE users")
except PermissionError as e:
print(f"安全拦截: {e}")
2.2 动态代理与AOP
from typing import Any, Callable, Dict
import logging
from datetime import datetime
classDynamicProxy:
"""使用__getattr__实现动态代理"""
def__init__(self, target: Any, interceptors: Dict[str, Callable] = None):
self._target = target
self._interceptors = interceptors or {}
def__getattr__(self, name: str) -> Any:
attr = getattr(self._target, name)
ifnot callable(attr):
return attr
defintercepted_method(*args, **kwargs):
# 前置拦截
if'before'in self._interceptors:
self._interceptors['before'](self._target, name, args, kwargs)
# 执行原始方法
try:
result = attr(*args, **kwargs)
# 后置拦截
if'after'in self._interceptors:
result = self._interceptors['after'](self._target, name, result)
return result
except Exception as e:
# 异常拦截
if'error'in self._interceptors:
self._interceptors['error'](self._target, name, e)
raise
return intercepted_method
# 具体应用:带日志和性能监控的服务代理
classUserService:
defget_user(self, user_id: int) -> Dict:
# 模拟数据库查询
return {"id": user_id, "name": f"User_{user_id}", "email": f"user{user_id}@example.com"}
defupdate_user(self, user_id: int, data: Dict) -> bool:
print(f"更新用户 {user_id}: {data}")
returnTrue
defdelete_user(self, user_id: int) -> bool:
print(f"删除用户 {user_id}")
returnTrue
defcreate_proxied_service():
"""工厂函数:创建带完整拦截器的代理服务"""
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
deflog_before(target, method_name, args, kwargs):
logger.info(f"[{datetime.now()}] 调用 {method_name},参数: args={args}, kwargs={kwargs}")
deflog_after(target, method_name, result):
logger.info(f"[{datetime.now()}] {method_name} 返回: {result}")
return result
defhandle_error(target, method_name, error):
logger.error(f"[{datetime.now()}] {method_name} 失败: {error}")
interceptors = {
'before': log_before,
'after': log_after,
'error': handle_error
}
return DynamicProxy(UserService(), interceptors)
# 更高级:基于装饰器的代理工厂
defproxy(class_or_instance, **interceptors):
"""装饰器风格的代理工厂"""
defcreate_proxy(obj):
return DynamicProxy(obj, interceptors)
if isinstance(class_or_instance, type):
# 代理整个类
classProxiedClass:
def__init__(self, *args, **kwargs):
self._instance = class_or_instance(*args, **kwargs)
self._proxy = DynamicProxy(self._instance, interceptors)
def__getattr__(self, name):
return getattr(self._proxy, name)
return ProxiedClass
else:
# 代理实例
return DynamicProxy(class_or_instance, interceptors)
三、外观模式 (Facade)
外观模式为复杂子系统提供简化的统一接口。
3.1 复杂系统的简化接口
# 复杂的子系统
classVideoDecoder:
defdecode(self, video_data: bytes) -> bytes:
print("解码视频流...")
return video_data # 简化处理
classAudioDecoder:
defdecode(self, audio_data: bytes) -> bytes:
print("解码音频流...")
return audio_data
classSubtitleParser:
defparse(self, subtitle_data: str) -> list:
print("解析字幕...")
return [{"start": 0, "end": 10, "text": "字幕内容"}]
classVideoRenderer:
defrender(self, decoded_video: bytes) -> None:
print("渲染视频帧...")
classAudioRenderer:
defrender(self, decoded_audio: bytes) -> None:
print("播放音频...")
classSubtitleRenderer:
defrender(self, subtitles: list, timestamp: float) -> None:
print(f"在 {timestamp}s 显示字幕...")
classNetworkLoader:
defload(self, url: str) -> Dict[str, bytes]:
print(f"从 {url} 下载媒体文件...")
return {
'video': b'video_data',
'audio': b'audio_data',
'subtitle': b'subtitle_data'
}
# 外观模式:媒体播放器统一接口
classMediaPlayerFacade:
"""为复杂的媒体播放子系统提供简单接口"""
def__init__(self):
self._network = NetworkLoader()
self._video_decoder = VideoDecoder()
self._audio_decoder = AudioDecoder()
self._subtitle_parser = SubtitleParser()
self._video_renderer = VideoRenderer()
self._audio_renderer = AudioRenderer()
self._subtitle_renderer = SubtitleRenderer()
self._current_timestamp = 0.0
defplay(self, url: str) -> None:
"""播放媒体文件的统一接口"""
print(f"\n=== 开始播放: {url} ===")
# 1. 加载媒体文件
media_data = self._network.load(url)
# 2. 解码
decoded_video = self._video_decoder.decode(media_data['video'])
decoded_audio = self._audio_decoder.decode(media_data['audio'])
# 3. 解析字幕(可选)
subtitles = []
if'subtitle'in media_data:
subtitle_text = media_data['subtitle'].decode('utf-8')
subtitles = self._subtitle_parser.parse(subtitle_text)
# 4. 同步渲染
self._render_sync(decoded_video, decoded_audio, subtitles)
print(f"=== 播放完成 ===\n")
def_render_sync(self, video: bytes, audio: bytes, subtitles: list):
"""同步渲染音视频和字幕"""
# 模拟播放时间轴
for timestamp in range(0, 30, 5):
self._current_timestamp = timestamp
self._video_renderer.render(video)
self._audio_renderer.render(audio)
# 查找当前时间的字幕
current_sub = next(
(s for s in subtitles if s['start'] <= timestamp <= s['end']),
None
)
if current_sub:
self._subtitle_renderer.render([current_sub], timestamp)
defpause(self) -> None:
print(f"暂停播放 (时间: {self._current_timestamp}s)")
defresume(self) -> None:
print(f"恢复播放 (时间: {self._current_timestamp}s)")
defstop(self) -> None:
print("停止播放")
self._current_timestamp = 0
defseek(self, timestamp: float) -> None:
print(f"跳转到 {timestamp}s")
self._current_timestamp = timestamp
### 3.2 智能外观:支持多种视频源
classVideoSource:
"""视频源接口"""
defget_stream(self) -> bytes:
pass
classLocalFileSource(VideoSource):
def__init__(self, filepath: str):
self.filepath = filepath
defget_stream(self) -> bytes:
print(f"从本地文件读取: {self.filepath}")
returnb'local_video_data'
classStreamingSource(VideoSource):
def__init__(self, url: str):
self.url = url
defget_stream(self) -> bytes:
print(f"从流媒体服务器读取: {self.url}")
returnb'streaming_data'
# 增强版外观:支持多种输入源和输出格式
classEnhancedMediaFacade:
"""支持多种输入源和输出格式的媒体播放器"""
def__init__(self):
self._codecs = {
'h264': VideoDecoder(),
'aac': AudioDecoder(),
}
self._current_source = None
defset_source(self, source: VideoSource) -> None:
self._current_source = source
defplay_with_quality(self, quality: str = 'auto') -> Dict[str, Any]:
"""带质量选择的播放接口"""
if self._current_source isNone:
raise ValueError("未设置视频源")
print(f"以 {quality} 质量播放")
video_data = self._current_source.get_stream()
# 根据质量选择不同的编解码器
codec = self._codecs.get('h264')
decoded = codec.decode(video_data)
return {
'status': 'playing',
'quality': quality,
'duration': 120# 秒
}
defexport_to_file(self, output_format: str, filename: str) -> None:
"""导出功能:隐藏复杂的格式转换逻辑"""
format_converters = {
'mp4': lambda x: x,
'avi': lambda x: x + b'_avi',
'mkv': lambda x: x + b'_mkv'
}
converter = format_converters.get(output_format)
ifnot converter:
raise ValueError(f"不支持的格式: {output_format}")
print(f"导出为 {output_format} 格式到 {filename}")
# 实际转换和写入逻辑...
四、结合使用:完整的应用程序架构
"""完整的应用程序架构示例:结合适配器、代理和外观模式"""
from typing import Protocol, runtime_checkable
from dataclasses import dataclass
from enum import Enum
import json
import csv
import io
# ========== 领域模型 ==========
@dataclass
classOrder:
order_id: str
customer_name: str
amount: float
status: str
# ========== 目标接口(协议) ==========
@runtime_checkable
classOrderRepository(Protocol):
defsave(self, order: Order) -> None: ...
deffind_by_id(self, order_id: str) -> Order: ...
deffind_all(self) -> list[Order]: ...
@runtime_checkable
classPaymentProcessor(Protocol):
defprocess_payment(self, order: Order) -> bool: ...
defrefund(self, order: Order) -> bool: ...
# ========== 适配器:适配不同数据源 ==========
classCSVOrderAdapter:
"""适配CSV文件到OrderRepository接口"""
def__init__(self, filepath: str):
self.filepath = filepath
defsave(self, order: Order) -> None:
with open(self.filepath, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow([order.order_id, order.customer_name, order.amount, order.status])
deffind_all(self) -> list[Order]:
orders = []
with open(self.filepath, 'r') as f:
reader = csv.DictReader(f)
for row in reader:
orders.append(Order(
order_id=row['order_id'],
customer_name=row['customer_name'],
amount=float(row['amount']),
status=row['status']
))
return orders
deffind_by_id(self, order_id: str) -> Order:
for order in self.find_all():
if order.order_id == order_id:
return order
raise ValueError(f"Order {order_id} not found")
classJSONOrderAdapter:
"""适配JSON API到OrderRepository接口"""
def__init__(self, api_endpoint: str):
self.endpoint = api_endpoint
defsave(self, order: Order) -> None:
# 模拟API调用
print(f"POST {self.endpoint}/orders")
print(json.dumps(order.__dict__))
deffind_all(self) -> list[Order]:
# 模拟API返回
mock_data = [
{"order_id": "001", "customer_name": "Alice", "amount": 100.0, "status": "pending"},
{"order_id": "002", "customer_name": "Bob", "amount": 250.0, "status": "completed"}
]
return [Order(**data) for data in mock_data]
deffind_by_id(self, order_id: str) -> Order:
# 简化实现
return self.find_all()[0]
# ========== 代理:添加缓存和访问控制 ==========
classCachedOrderRepositoryProxy:
"""为订单仓库添加缓存功能"""
def__init__(self, real_repository: OrderRepository):
self._real = real_repository
self._cache = {}
defsave(self, order: Order) -> None:
self._real.save(order)
# 使缓存失效
self._cache.pop(order.order_id, None)
deffind_by_id(self, order_id: str) -> Order:
if order_id notin self._cache:
print(f"缓存未命中,从真实仓库获取订单 {order_id}")
self._cache[order_id] = self._real.find_by_id(order_id)
else:
print(f"缓存命中,返回订单 {order_id}")
return self._cache[order_id]
deffind_all(self) -> list[Order]:
# 不缓存全部数据
return self._real.find_all()
classSecureOrderRepositoryProxy:
"""为订单仓库添加权限控制"""
def__init__(self, real_repository: OrderRepository, user_role: str):
self._real = real_repository
self._user_role = user_role
defsave(self, order: Order) -> None:
if self._user_role notin ['admin', 'manager']:
raise PermissionError("无权限创建订单")
self._real.save(order)
deffind_all(self) -> list[Order]:
if self._user_role notin ['admin', 'manager', 'viewer']:
raise PermissionError("无权限查看订单")
return self._real.find_all()
deffind_by_id(self, order_id: str) -> Order:
if self._user_role notin ['admin', 'manager', 'viewer']:
raise PermissionError("无权限查看订单")
return self._real.find_by_id(order_id)
# ========== 外观:统一入口 ==========
classOrderServiceFacade:
"""为订单处理系统提供简化的统一接口"""
def__init__(self, repository: OrderRepository, payment_processor: PaymentProcessor):
self._repository = repository
self._payment = payment_processor
defcreate_order(self, customer_name: str, amount: float) -> Order:
"""创建订单的简化接口"""
import uuid
order = Order(
order_id=str(uuid.uuid4())[:8],
customer_name=customer_name,
amount=amount,
status='created'
)
self._repository.save(order)
print(f"订单创建成功: {order.order_id}")
return order
defcheckout(self, order_id: str) -> bool:
"""结账流程:集成了支付、状态更新等复杂逻辑"""
order = self._repository.find_by_id(order_id)
if order.status != 'created':
raise ValueError(f"订单状态不正确: {order.status}")
# 处理支付
if self._payment.process_payment(order):
order.status = 'paid'
self._repository.save(order)
print(f"订单 {order_id} 支付成功")
returnTrue
else:
order.status = 'payment_failed'
self._repository.save(order)
print(f"订单 {order_id} 支付失败")
returnFalse
defget_order_status(self, order_id: str) -> str:
"""查询订单状态"""
order = self._repository.find_by_id(order_id)
return order.status
defget_all_orders(self) -> list[Order]:
"""获取所有订单"""
return self._repository.find_all()
# ========== 实际支付处理器实现 ==========
classStripePaymentProcessor(PaymentProcessor):
defprocess_payment(self, order: Order) -> bool:
print(f"通过Stripe处理支付: ${order.amount}")
returnTrue
defrefund(self, order: Order) -> bool:
print(f"通过Stripe退款: ${order.amount}")
returnTrue
# ========== 使用示例 ==========
defmain():
# 1. 创建适配器:适配不同的数据源
csv_repo = CSVOrderAdapter('orders.csv')
json_repo = JSONOrderAdapter('https://api.example.com')
# 2. 使用代理增强功能
cached_repo = CachedOrderRepositoryProxy(csv_repo)
secure_repo = SecureOrderRepositoryProxy(cached_repo, user_role='admin')
# 3. 创建外观
payment_processor = StripePaymentProcessor()
order_service = OrderServiceFacade(secure_repo, payment_processor)
# 4. 使用统一的Facade接口
order = order_service.create_order('Alice', 99.99)
print(f"订单状态: {order_service.get_order_status(order.order_id)}")
# 支付
order_service.checkout(order.order_id)
# 查看所有订单
all_orders = order_service.get_all_orders()
print(f"总订单数: {len(all_orders)}")
# 第二次查询会从缓存获取
same_order = order_service.get_order_status(order.order_id)
print(f"最终状态: {same_order}")
if __name__ == "__main__":
main()
总结
结构型模式在Python中的应用要点:
- 适配器模式:使用
__getattr__、描述符协议或类装饰器实现灵活的适配 - 代理模式:利用属性访问控制、懒加载、AOP切面编程
- 外观模式:使用
__init__组装复杂子系统,提供简洁的公开API
这些模式的组合使用可以构建出:
在实际项目中,应该根据具体需求选择合适的模式,避免过度设计。Python的动态特性让这些模式的实现更加简洁和优雅。