引言:装饰器的魔法符号@
在Python中,@符号最常见的用途就是装饰器(Decorator)。装饰器是Python最强大、最优雅的特性之一,它允许你修改或增强函数和类的行为,而无需修改它们的源代码。
简单来说,装饰器就是一个包装器,它接收一个函数或类,然后返回一个新的、增强版的函数或类。
第一章:装饰器基础
1.1 最简单的装饰器
# 基础装饰器示例
defmy_decorator(func):
"""一个简单的装饰器"""
defwrapper():
print("函数执行前...")
func() # 调用原函数
print("函数执行后...")
return wrapper
# 使用@应用装饰器
@my_decorator
defsay_hello():
print("Hello, World!")
# 调用
say_hello()
"""
输出:
函数执行前...
Hello, World!
函数执行后...
"""
# 理解等价形式
defsay_hello():
print("Hello, World!")
# 下面这行等价于 @my_decorator
say_hello = my_decorator(say_hello)
say_hello()
1.2 装饰带参数的函数
deflog_decorator(func):
"""记录函数调用的装饰器"""
defwrapper(*args, **kwargs):# 接收任意参数
print(f"调用函数: {func.__name__}")
print(f"参数: args={args}, kwargs={kwargs}")
result = func(*args, **kwargs) # 传递参数给原函数
print(f"返回结果: {result}")
return result
return wrapper
@log_decorator
defadd(a, b):
return a + b
@log_decorator
defgreet(name, greeting="Hello"):
returnf"{greeting}, {name}!"
# 测试
print("add(3, 5):")
add(3, 5)
print("\ngreet('Alice'):")
greet('Alice')
print("\ngreet('Bob', greeting='Hi'):")
greet('Bob', greeting='Hi')
"""
输出:
add(3, 5):
调用函数: add
参数: args=(3, 5), kwargs={}
返回结果: 8
greet('Alice'):
调用函数: greet
参数: args=('Alice',), kwargs={}
返回结果: Hello, Alice!
greet('Bob', greeting='Hi'):
调用函数: greet
参数: args=('Bob',), kwargs={'greeting': 'Hi'}
返回结果: Hi, Bob!
"""
1.3 装饰器带返回值
deftiming_decorator(func):
"""计时装饰器"""
import time
defwrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs) # 保存原函数的返回值
end_time = time.perf_counter()
print(f"{func.__name__} 执行时间: {end_time - start_time:.6f}秒")
return result # 返回原函数的返回值
return wrapper
@timing_decorator
defslow_function():
import time
time.sleep(0.5)
return"完成"
@timing_decorator
defcalculate_sum(n):
"""计算1到n的和"""
return sum(range(1, n+1))
# 测试
result1 = slow_function()
print(f"slow_function 返回值: {result1}")
result2 = calculate_sum(1000000)
print(f"calculate_sum 返回值: {result2}")
第二章:使用functools.wraps保持原函数信息
2.1 为什么需要functools.wraps?
defbad_decorator(func):
defwrapper(*args, **kwargs):
"""包装函数"""
return func(*args, **kwargs)
return wrapper
defgood_decorator(func):
from functools import wraps
@wraps(func) # 关键:使用wraps装饰内部函数
defwrapper(*args, **kwargs):
"""包装函数"""
return func(*args, **kwargs)
return wrapper
@bad_decorator
defexample1():
"""示例函数1"""
pass
@good_decorator
defexample2():
"""示例函数2"""
pass
# 比较
print("不使用wraps的问题:")
print(f"example1.__name__: {example1.__name__}") # 输出: wrapper(错误!)
print(f"example1.__doc__: {example1.__doc__}") # 输出: 包装函数(错误!)
print("\n使用wraps的好处:")
print(f"example2.__name__: {example2.__name__}") # 输出: example2(正确!)
print(f"example2.__doc__: {example2.__doc__}") # 输出: 示例函数2(正确!)
# 查看函数签名
import inspect
print(f"\nexample1签名: {inspect.signature(example1)}")
print(f"example2签名: {inspect.signature(example2)}")
2.2 完整示例
from functools import wraps
import time
defdebug_decorator(func):
"""调试装饰器,保持原函数信息"""
@wraps(func)
defwrapper(*args, **kwargs):
print(f"[DEBUG] 调用 {func.__name__}()")
print(f" 位置参数: {args}")
print(f" 关键字参数: {kwargs}")
start_time = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start_time
print(f"[DEBUG] {func.__name__}() 返回: {result}")
print(f"[DEBUG] 执行时间: {elapsed:.6f}秒")
return result
return wrapper
@debug_decorator
defcalculate_bmi(weight, height, unit='metric'):
"""
计算BMI指数
Args:
weight: 体重
height: 身高
unit: 单位制 ('metric' 或 'imperial')
Returns:
BMI值
"""
if unit == 'metric':
# 公制:体重(kg) / 身高(m)^2
bmi = weight / (height ** 2)
else:
# 英制:(体重(磅) * 703) / 身高(英寸)^2
bmi = (weight * 703) / (height ** 2)
return round(bmi, 2)
# 测试
print("函数信息保持:")
print(f"函数名: {calculate_bmi.__name__}")
print(f"文档: {calculate_bmi.__doc__}")
print("\n调用函数:")
bmi = calculate_bmi(70, 1.75, unit='metric')
print(f"BMI结果: {bmi}")
第三章:带参数的装饰器
3.1 创建带参数的装饰器
defrepeat(times):
"""
重复执行函数的装饰器
Args:
times: 重复次数
"""
defdecorator(func):
@wraps(func)
defwrapper(*args, **kwargs):
results = []
for i in range(times):
print(f"第 {i+1} 次执行:")
result = func(*args, **kwargs)
results.append(result)
return results
return wrapper
return decorator
# 使用
@repeat(times=3)
defgreet(name):
returnf"Hello, {name}!"
print("带参数的装饰器:")
results = greet("Alice")
print(f"所有结果: {results}")
3.2 更复杂的带参数装饰器
defretry(max_attempts=3, delay=1, exceptions=(Exception,)):
"""
重试装饰器
Args:
max_attempts: 最大尝试次数
delay: 重试延迟(秒)
exceptions: 触发重试的异常类型
"""
defdecorator(func):
@wraps(func)
defwrapper(*args, **kwargs):
last_exception = None
for attempt in range(1, max_attempts + 1):
try:
print(f"尝试 {attempt}/{max_attempts}...")
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
print(f" 失败: {e}")
if attempt < max_attempts:
print(f" 等待 {delay} 秒后重试...")
time.sleep(delay)
# 所有尝试都失败
raise Exception(f"函数 {func.__name__} 在 {max_attempts} 次尝试后仍失败") from last_exception
return wrapper
return decorator
# 模拟不稳定的API
import random
@retry(max_attempts=5, delay=0.5, exceptions=(ConnectionError,))
defunstable_api_call():
"""模拟不稳定的API调用"""
if random.random() < 0.7: # 70%失败率
raise ConnectionError("API调用失败")
return"API调用成功"
# 测试
try:
result = unstable_api_call()
print(f"最终结果: {result}")
except Exception as e:
print(f"最终失败: {e}")
第四章:类装饰器
4.1 类作为装饰器
classTimer:
"""计时装饰器(类版本)"""
def__init__(self, func):
self.func = func
wraps(func)(self) # 保持原函数信息
self.times = []
def__call__(self, *args, **kwargs):
"""使实例可调用"""
start_time = time.perf_counter()
result = self.func(*args, **kwargs)
elapsed = time.perf_counter() - start_time
self.times.append(elapsed)
print(f"{self.func.__name__} 执行时间: {elapsed:.6f}秒")
return result
defaverage_time(self):
"""计算平均执行时间"""
ifnot self.times:
return0
return sum(self.times) / len(self.times)
@Timer
defslow_operation():
"""模拟耗时操作"""
time.sleep(0.1)
return"操作完成"
# 测试
print("类装饰器示例:")
for i in range(3):
result = slow_operation()
print(f"\n函数名: {slow_operation.__name__}")
print(f"文档: {slow_operation.__doc__}")
print(f"平均执行时间: {slow_operation.average_time():.6f}秒")
4.2 带参数的类装饰器
classRetryClass:
"""重试装饰器(类版本,带参数)"""
def__init__(self, max_attempts=3, delay=1):
self.max_attempts = max_attempts
self.delay = delay
def__call__(self, func):
@wraps(func)
defwrapper(*args, **kwargs):
last_exception = None
for attempt in range(1, self.max_attempts + 1):
try:
print(f"尝试 {attempt}/{self.max_attempts}...")
return func(*args, **kwargs)
except Exception as e:
last_exception = e
print(f" 失败: {e}")
if attempt < self.max_attempts:
print(f" 等待 {self.delay} 秒后重试...")
time.sleep(self.delay)
raise Exception(f"所有尝试失败") from last_exception
return wrapper
@RetryClass(max_attempts=4, delay=0.5)
defrisky_operation():
"""有风险的操作"""
if random.random() < 0.8:
raise ValueError("操作失败")
return"操作成功"
# 测试
try:
result = risky_operation()
print(f"结果: {result}")
except Exception as e:
print(f"最终错误: {e}")
第五章:装饰器链(多个装饰器)
5.1 多个装饰器的执行顺序
defdecorator1(func):
@wraps(func)
defwrapper(*args, **kwargs):
print("装饰器1 - 前")
result = func(*args, **kwargs)
print("装饰器1 - 后")
return result
return wrapper
defdecorator2(func):
@wraps(func)
defwrapper(*args, **kwargs):
print("装饰器2 - 前")
result = func(*args, **kwargs)
print("装饰器2 - 后")
return result
return wrapper
defdecorator3(func):
@wraps(func)
defwrapper(*args, **kwargs):
print("装饰器3 - 前")
result = func(*args, **kwargs)
print("装饰器3 - 后")
return result
return wrapper
@decorator1
@decorator2
@decorator3
defmy_function():
print("原始函数执行")
return"完成"
print("多个装饰器执行顺序:")
result = my_function()
print(f"最终结果: {result}")
"""
输出:
装饰器1 - 前
装饰器2 - 前
装饰器3 - 前
原始函数执行
装饰器3 - 后
装饰器2 - 后
装饰器1 - 后
最终结果: 完成
执行顺序:从上到下装饰,从下到上执行
等价于:my_function = decorator1(decorator2(decorator3(my_function)))
"""
5.2 实际应用:构建完整的API处理链
from functools import wraps
import json
defvalidate_json(func):
"""验证输入是否为JSON"""
@wraps(func)
defwrapper(data, *args, **kwargs):
if isinstance(data, str):
try:
data = json.loads(data)
except json.JSONDecodeError:
raise ValueError("输入不是有效的JSON")
return func(data, *args, **kwargs)
return wrapper
deflog_request(func):
"""记录请求日志"""
@wraps(func)
defwrapper(*args, **kwargs):
print(f"[LOG] 开始处理请求: {func.__name__}")
result = func(*args, **kwargs)
print(f"[LOG] 请求处理完成: {func.__name__}")
return result
return wrapper
defhandle_exceptions(func):
"""异常处理"""
@wraps(func)
defwrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except ValueError as e:
return {"error": f"验证错误: {str(e)}", "status": 400}
except Exception as e:
return {"error": f"服务器错误: {str(e)}", "status": 500}
return wrapper
defrequire_fields(*required_fields):
"""验证必需字段"""
defdecorator(func):
@wraps(func)
defwrapper(data, *args, **kwargs):
if isinstance(data, dict):
missing = [field for field in required_fields if field notin data]
if missing:
raise ValueError(f"缺少必需字段: {', '.join(missing)}")
return func(data, *args, **kwargs)
return wrapper
return decorator
# 应用多个装饰器
@log_request
@handle_exceptions
@validate_json
@require_fields('name', 'email', 'age')
defprocess_user_data(data):
"""处理用户数据"""
print(f"处理用户数据: {data}")
# 模拟处理逻辑
data['processed'] = True
data['user_id'] = hash(data['email']) % 10000
return {"status": "success", "data": data}
# 测试
print("完整的API处理链示例:")
# 测试用例1:有效数据
valid_json = '{"name": "张三", "email": "zhangsan@example.com", "age": 25}'
result1 = process_user_data(valid_json)
print(f"结果1: {json.dumps(result1, ensure_ascii=False, indent=2)}")
print("\n" + "="*50 + "\n")
# 测试用例2:缺少字段
invalid_json = '{"name": "李四", "email": "lisi@example.com"}'
result2 = process_user_data(invalid_json)
print(f"结果2: {json.dumps(result2, ensure_ascii=False, indent=2)}")
第六章:装饰类的装饰器
6.1 装饰类的方法
defadd_class_method(method_name):
"""向类添加方法的装饰器"""
defdecorator(cls):
defnew_method(self):
returnf"这是动态添加的方法: {method_name}"
setattr(cls, method_name, new_method)
return cls
return decorator
defadd_class_attribute(attr_name, attr_value):
"""向类添加属性的装饰器"""
defdecorator(cls):
setattr(cls, attr_name, attr_value)
return cls
return decorator
@add_class_method("extra_method")
@add_class_attribute("VERSION", "1.0.0")
classMyClass:
def__init__(self, name):
self.name = name
deforiginal_method(self):
returnf"原始方法: {self.name}"
# 测试
obj = MyClass("测试对象")
print("类装饰器示例:")
print(f"原始方法: {obj.original_method()}")
print(f"动态添加的方法: {obj.extra_method()}")
print(f"类属性: {MyClass.VERSION}")
6.2 单例模式装饰器
defsingleton(cls):
"""单例模式装饰器"""
instances = {}
@wraps(cls)
defget_instance(*args, **kwargs):
if cls notin instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return get_instance
@singleton
classDatabaseConnection:
def__init__(self):
print("创建数据库连接...")
self.connection_id = id(self)
defquery(self, sql):
returnf"执行查询: {sql}"
@singleton
classConfigManager:
def__init__(self):
print("初始化配置管理器...")
self.config = {"debug": True, "timeout": 30}
# 测试单例
print("单例模式装饰器:")
db1 = DatabaseConnection()
db2 = DatabaseConnection()
print(f"db1 id: {db1.connection_id}")
print(f"db2 id: {db2.connection_id}")
print(f"是同一个实例吗: {db1 is db2}")
config1 = ConfigManager()
config2 = ConfigManager()
print(f"config1 is config2: {config1 is config2}")
6.3 自动注册装饰器
classPluginRegistry:
"""插件注册器"""
_plugins = {}
@classmethod
defregister(cls, plugin_type):
"""注册插件的装饰器"""
defdecorator(plugin_class):
if plugin_type notin cls._plugins:
cls._plugins[plugin_type] = []
cls._plugins[plugin_type].append(plugin_class)
return plugin_class
return decorator
@classmethod
defget_plugins(cls, plugin_type):
"""获取指定类型的所有插件"""
return cls._plugins.get(plugin_type, [])
@classmethod
deflist_all_plugins(cls):
"""列出所有插件"""
return cls._plugins
# 注册插件
@PluginRegistry.register('reader')
classCSVReader:
defread(self, filepath):
returnf"读取CSV文件: {filepath}"
@PluginRegistry.register('reader')
classJSONReader:
defread(self, filepath):
returnf"读取JSON文件: {filepath}"
@PluginRegistry.register('writer')
classCSVWriter:
defwrite(self, data, filepath):
returnf"写入CSV文件: {filepath}"
@PluginRegistry.register('processor')
classDataCleaner:
defprocess(self, data):
returnf"清洗数据: {len(data)}条记录"
# 使用注册的插件
print("插件注册装饰器示例:")
print("\n所有插件类型:")
for plugin_type, plugins in PluginRegistry.list_all_plugins().items():
print(f" {plugin_type}: {[p.__name__ for p in plugins]}")
print("\n所有reader插件:")
for plugin_class in PluginRegistry.get_plugins('reader'):
plugin = plugin_class()
print(f" {plugin.read('data.csv')}")
第七章:内置装饰器
7.1 @property - 属性装饰器
classCircle:
def__init__(self, radius):
self._radius = radius # 私有属性
@property
defradius(self):
"""获取半径"""
print("获取半径值")
return self._radius
@radius.setter
defradius(self, value):
"""设置半径(带验证)"""
if value <= 0:
raise ValueError("半径必须大于0")
print(f"设置半径为: {value}")
self._radius = value
@radius.deleter
defradius(self):
"""删除半径"""
print("删除半径")
del self._radius
@property
defdiameter(self):
"""直径(只读属性)"""
return self._radius * 2
@property
defarea(self):
"""面积(只读属性)"""
import math
return math.pi * self._radius ** 2
# 使用
print("@property装饰器示例:")
circle = Circle(5)
print(f"半径: {circle.radius}") # 调用getter
print(f"直径: {circle.diameter}") # 调用只读属性
print(f"面积: {circle.area:.2f}") # 调用只读属性
circle.radius = 10# 调用setter
print(f"新面积: {circle.area:.2f}")
try:
circle.radius = -5# 会抛出异常
except ValueError as e:
print(f"错误: {e}")
# del circle.radius # 调用deleter
7.2 @classmethod - 类方法装饰器
classDate:
def__init__(self, year, month, day):
self.year = year
self.month = month
self.day = day
def__str__(self):
returnf"{self.year}-{self.month:02d}-{self.day:02d}"
@classmethod
deffrom_string(cls, date_string):
"""从字符串创建Date实例(工厂方法)"""
try:
year, month, day = map(int, date_string.split('-'))
return cls(year, month, day)
except (ValueError, AttributeError):
raise ValueError(f"无效的日期字符串: {date_string}")
@classmethod
deftoday(cls):
"""创建今天的Date实例"""
import datetime
today = datetime.date.today()
return cls(today.year, today.month, today.day)
@classmethod
defis_valid_date(cls, year, month, day):
"""验证日期是否有效"""
try:
datetime.date(year, month, day)
returnTrue
except ValueError:
returnFalse
# 使用
print("\n@classmethod装饰器示例:")
# 使用构造函数
date1 = Date(2024, 1, 15)
print(f"直接构造: {date1}")
# 使用类方法(工厂方法)
date2 = Date.from_string("2024-12-31")
print(f"从字符串创建: {date2}")
# 使用类方法(替代构造函数)
date3 = Date.today()
print(f"今天的日期: {date3}")
# 使用类方法(工具方法)
print(f"2024-02-30是否有效: {Date.is_valid_date(2024, 2, 30)}")
print(f"2024-03-15是否有效: {Date.is_valid_date(2024, 3, 15)}")
7.3 @staticmethod - 静态方法装饰器
classMathUtils:
"""数学工具类"""
@staticmethod
defadd(a, b):
"""加法"""
return a + b
@staticmethod
defmultiply(a, b):
"""乘法"""
return a * b
@staticmethod
defis_prime(n):
"""判断是否为质数"""
if n <= 1:
returnFalse
if n <= 3:
returnTrue
if n % 2 == 0or n % 3 == 0:
returnFalse
i = 5
while i * i <= n:
if n % i == 0or n % (i + 2) == 0:
returnFalse
i += 6
returnTrue
@staticmethod
deffibonacci(n):
"""计算斐波那契数列的第n项"""
if n <= 0:
return0
elif n == 1:
return1
a, b = 0, 1
for _ in range(2, n + 1):
a, b = b, a + b
return b
classStringUtils:
"""字符串工具类"""
@staticmethod
defreverse(s):
"""反转字符串"""
return s[::-1]
@staticmethod
defis_palindrome(s):
"""判断是否为回文"""
s = ''.join(c.lower() for c in s if c.isalnum())
return s == s[::-1]
@staticmethod
defcount_words(text):
"""统计单词数"""
return len(text.split())
# 使用
print("\n@staticmethod装饰器示例:")
print("数学运算:")
print(f"5 + 3 = {MathUtils.add(5, 3)}")
print(f"5 × 3 = {MathUtils.multiply(5, 3)}")
print(f"17是质数吗: {MathUtils.is_prime(17)}")
print(f"斐波那契第10项: {MathUtils.fibonacci(10)}")
print("\n字符串操作:")
print(f"'hello'反转: {StringUtils.reverse('hello')}")
print(f"'A man a plan a canal Panama'是回文吗: {StringUtils.is_palindrome('A man a plan a canal Panama')}")
print(f"单词数统计: {StringUtils.count_words('Python is awesome!')}")
7.4 @functools模块中的装饰器
import functools
# 1. @functools.lru_cache - 缓存装饰器
print("1. @functools.lru_cache - 缓存装饰器")
@functools.lru_cache(maxsize=128)
deffibonacci(n):
"""计算斐波那契数列(带缓存)"""
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
# 测试缓存效果
print(f"计算 fibonacci(30)...")
result1 = fibonacci(30)
print(f"fibonacci(30) = {result1}")
# 再次计算相同值会直接从缓存读取
print(f"再次计算 fibonacci(30)...")
result2 = fibonacci(30)
print(f"fibonacci(30) = {result2}")
print(f"缓存信息: {fibonacci.cache_info()}")
# 2. @functools.total_ordering - 自动生成比较方法
print("\n2. @functools.total_ordering - 自动生成比较方法")
@functools.total_ordering
classStudent:
def__init__(self, name, score):
self.name = name
self.score = score
def__eq__(self, other):
ifnot isinstance(other, Student):
returnNotImplemented
return self.score == other.score
def__lt__(self, other):
ifnot isinstance(other, Student):
returnNotImplemented
return self.score < other.score
def__repr__(self):
returnf"Student(name='{self.name}', score={self.score})"
# 自动生成 >, <=, >= 等方法
alice = Student("Alice", 90)
bob = Student("Bob", 85)
charlie = Student("Charlie", 90)
print(f"alice > bob: {alice > bob}")
print(f"alice <= bob: {alice <= bob}")
print(f"alice == charlie: {alice == charlie}")
print(f"alice >= charlie: {alice >= charlie}")
# 3. @functools.singledispatch - 函数重载
print("\n3. @functools.singledispatch - 函数重载")
@functools.singledispatch
defprocess(data):
"""处理数据的通用函数"""
raise NotImplementedError(f"不支持的数据类型: {type(data)}")
@process.register(str)
def_(data):
"""处理字符串"""
returnf"处理字符串: {data.upper()}"
@process.register(int)
def_(data):
"""处理整数"""
returnf"处理整数: {data * 2}"
@process.register(list)
def_(data):
"""处理列表"""
returnf"处理列表: {[x**2for x in data]}"
@process.register(dict)
def_(data):
"""处理字典"""
returnf"处理字典: { {k: str(v).upper() for k, v in data.items()} }"
# 测试
print(process("hello"))
print(process(42))
print(process([1, 2, 3, 4, 5]))
print(process({"name": "alice", "age": 25}))
try:
print(process(3.14)) # 浮点数未注册
except NotImplementedError as e:
print(f"错误: {e}")
第八章:装饰器的实际应用场景
8.1 Web框架中的装饰器
# Flask-like路由装饰器示例
classRouter:
def__init__(self):
self.routes = {}
defroute(self, path, methods=['GET']):
"""路由装饰器"""
defdecorator(func):
self.routes[path] = {
'function': func,
'methods': methods
}
return func
return decorator
defrun(self, path, method='GET', **kwargs):
"""执行路由"""
if path in self.routes:
route = self.routes[path]
if method in route['methods']:
return route['function'](**kwargs)
else:
returnf"错误: 不支持的方法 {method}"
else:
returnf"错误: 路径 {path} 不存在"
# 创建路由器实例
router = Router()
# 使用装饰器注册路由
@router.route('/')
defhome():
return"欢迎来到首页"
@router.route('/user/<username>', methods=['GET', 'POST'])
defuser_profile(username):
returnf"用户: {username} 的个人资料"
@router.route('/api/data', methods=['GET'])
defapi_data():
return {"status": "success", "data": [1, 2, 3, 4, 5]}
# 测试路由
print("Web路由装饰器示例:")
print(f"访问 '/': {router.run('/')}")
print(f"访问 '/user/Alice': {router.run('/user/Alice')}")
print(f"访问 '/api/data': {router.run('/api/data')}")
print(f"访问不存在的路径: {router.run('/unknown')}")
8.2 权限控制装饰器
# 用户角色权限系统
classUser:
def__init__(self, name, role):
self.name = name
self.role = role
# 角色定义
ROLES = {
'guest': 0,
'user': 1,
'admin': 2,
'superadmin': 3
}
defrequire_role(min_role):
"""要求最小角色的装饰器"""
defdecorator(func):
@functools.wraps(func)
defwrapper(user, *args, **kwargs):
if user.role notin ROLES:
raise PermissionError(f"未知角色: {user.role}")
if ROLES[user.role] < ROLES[min_role]:
raise PermissionError(
f"需要至少 {min_role} 角色,但用户是 {user.role}"
)
return func(user, *args, **kwargs)
return wrapper
return decorator
# 创建不同角色的用户
guest = User("访客", "guest")
user = User("普通用户", "user")
admin = User("管理员", "admin")
superadmin = User("超级管理员", "superadmin")
# 定义需要不同权限的函数
@require_role("guest")
defview_public_content(user):
returnf"{user.name} 可以查看公开内容"
@require_role("user")
defpost_comment(user):
returnf"{user.name} 可以发表评论"
@require_role("admin")
defdelete_content(user):
returnf"{user.name} 可以删除内容"
@require_role("superadmin")
defmanage_users(user):
returnf"{user.name} 可以管理用户"
# 测试权限控制
print("权限控制装饰器示例:")
# 测试不同用户的权限
test_cases = [
(guest, view_public_content, "guest查看公开内容"),
(guest, post_comment, "guest发表评论"),
(user, post_comment, "user发表评论"),
(admin, delete_content, "admin删除内容"),
(superadmin, manage_users, "superadmin管理用户"),
]
for user_obj, func, description in test_cases:
print(f"\n{description}:")
try:
result = func(user_obj)
print(f" 成功: {result}")
except PermissionError as e:
print(f" 失败: {e}")
8.3 缓存装饰器
import time
from functools import lru_cache
defcache_with_timeout(timeout_seconds):
"""带超时的缓存装饰器"""
defdecorator(func):
cache = {}
@functools.wraps(func)
defwrapper(*args, **kwargs):
# 创建缓存键
key = (args, frozenset(kwargs.items()))
# 检查缓存
if key in cache:
result, timestamp = cache[key]
if time.time() - timestamp < timeout_seconds:
print(f"从缓存获取 {func.__name__}{args}")
return result
# 计算新结果
print(f"计算 {func.__name__}{args}")
result = func(*args, **kwargs)
# 更新缓存
cache[key] = (result, time.time())
# 清理过期缓存
current_time = time.time()
expired_keys = [
k for k, (_, t) in cache.items()
if current_time - t >= timeout_seconds
]
for k in expired_keys:
del cache[k]
return result
# 添加缓存清理方法
defclear_cache():
cache.clear()
print(f"清空 {func.__name__} 的缓存")
wrapper.clear_cache = clear_cache
wrapper.cache_info = lambda: f"缓存条目数: {len(cache)}"
return wrapper
return decorator
# 使用自定义缓存装饰器
@cache_with_timeout(timeout_seconds=2)
defexpensive_operation(x, y):
"""模拟耗时操作"""
time.sleep(0.5) # 模拟计算时间
return x * y + x + y
# 使用内置lru_cache
@lru_cache(maxsize=4)
deffactorial(n):
"""计算阶乘"""
if n <= 1:
return1
return n * factorial(n - 1)
print("缓存装饰器示例:")
print("1. 带超时的缓存:")
for i in range(5):
print(f" 调用 {i+1}: expensive_operation(3, 4) = {expensive_operation(3, 4)}")
if i == 2:
print(" 等待3秒让缓存过期...")
time.sleep(3)
print(f"\n 缓存信息: {expensive_operation.cache_info()}")
print("\n2. LRU缓存 (最近最少使用):")
for n in [5, 3, 5, 7, 3, 5, 10, 3]:
print(f" factorial({n}) = {factorial(n)}")
print(f" 缓存信息: {factorial.cache_info()}")
factorial.cache_clear()
print(f" 清空后缓存信息: {factorial.cache_info()}")
第九章:装饰器的调试和测试
9.1 调试装饰器
defdebug_decorator(func):
"""调试装饰器,打印详细信息"""
@functools.wraps(func)
defwrapper(*args, **kwargs):
print("\n" + "="*50)
print(f"[DEBUG] 调用函数: {func.__name__}")
print(f"[DEBUG] 定义位置: {func.__module__}.{func.__qualname__}")
print(f"[DEBUG] 文档: {func.__doc__}")
print(f"[DEBUG] 参数: args={args}, kwargs={kwargs}")
import traceback
print("[DEBUG] 调用栈:")
for line in traceback.format_stack()[:-1]:
print(f" {line.strip()}")
try:
result = func(*args, **kwargs)
print(f"[DEBUG] 返回值: {result}")
print("="*50)
return result
except Exception as e:
print(f"[DEBUG] 异常: {type(e).__name__}: {e}")
print("="*50)
raise
return wrapper
# 测试装饰器
@debug_decorator
defcalculate(a, b, operation='add'):
"""
执行数学运算
Args:
a: 第一个数
b: 第二个数
operation: 运算类型 ('add', 'subtract', 'multiply', 'divide')
Returns:
运算结果
"""
operations = {
'add': lambda x, y: x + y,
'subtract': lambda x, y: x - y,
'multiply': lambda x, y: x * y,
'divide': lambda x, y: x / y if y != 0else float('inf')
}
if operation notin operations:
raise ValueError(f"未知的运算: {operation}")
return operations[operation](a, b)
# 运行测试
print("装饰器调试示例:")
result = calculate(10, 5, operation='multiply')
print(f"结果: {result}")
9.2 测试装饰器
import unittest
# 要测试的装饰器
defadd_logging(func):
"""添加日志的装饰器"""
@functools.wraps(func)
defwrapper(*args, **kwargs):
print(f"LOG: 调用 {func.__name__} 参数: {args}, {kwargs}")
return func(*args, **kwargs)
return wrapper
# 使用装饰器的函数
@add_logging
defadd_numbers(a, b):
"""两个数相加"""
return a + b
@add_logging
defgreet(name, greeting="Hello"):
"""打招呼"""
returnf"{greeting}, {name}!"
# 测试装饰器的单元测试
classTestDecorators(unittest.TestCase):
deftest_decorator_preserves_function_info(self):
"""测试装饰器是否保持原函数信息"""
self.assertEqual(add_numbers.__name__, "add_numbers")
self.assertEqual(add_numbers.__doc__, "两个数相加")
deftest_decorated_function_works(self):
"""测试装饰后的函数是否正常工作"""
self.assertEqual(add_numbers(2, 3), 5)
self.assertEqual(add_numbers(-1, 1), 0)
deftest_decorator_with_kwargs(self):
"""测试装饰器处理关键字参数"""
self.assertEqual(greet("Alice"), "Hello, Alice!")
self.assertEqual(greet("Bob", greeting="Hi"), "Hi, Bob!")
deftest_decorator_does_not_modify_behavior(self):
"""测试装饰器不改变函数的核心行为"""
# 创建一个简单的函数
deforiginal(x):
return x * 2
# 应用装饰器
decorated = add_logging(original)
# 测试行为是否一致
for i in range(10):
self.assertEqual(original(i), decorated(i))
# 运行测试
print("装饰器测试示例:")
suite = unittest.TestLoader().loadTestsFromTestCase(TestDecorators)
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)
第十章:装饰器最佳实践
10.1 装饰器设计原则
"""
装饰器设计最佳实践:
1. 使用 functools.wraps 保持原函数信息
2. 处理所有参数 (*args, **kwargs)
3. 考虑装饰器的可组合性
4. 避免在装饰器中修改全局状态
5. 提供清晰的文档和类型提示
6. 考虑性能影响
7. 正确处理异常
"""
from functools import wraps
from typing import Callable, Any, TypeVar
import time
T = TypeVar('T')
# 良好的装饰器设计示例
defbenchmark(func: Callable[..., T]) -> Callable[..., T]:
"""
基准测试装饰器
测量函数执行时间并打印结果。
保持原函数的所有信息。
Args:
func: 要测试的函数
Returns:
装饰后的函数
"""
@wraps(func)
defwrapper(*args: Any, **kwargs: Any) -> T:
start_time = time.perf_counter()
try:
result = func(*args, **kwargs)
return result
finally:
end_time = time.perf_counter()
elapsed = end_time - start_time
print(f"{func.__name__} 执行时间: {elapsed:.6f}秒")
return wrapper
defvalidate_arguments(*validators: Callable) -> Callable:
"""
参数验证装饰器工厂
可以接受多个验证函数,按顺序执行验证。
Args:
*validators: 验证函数列表
Returns:
装饰器函数
"""
defdecorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
defwrapper(*args: Any, **kwargs: Any) -> T:
# 执行所有验证器
for i, validator in enumerate(validators):
try:
validator(*args, **kwargs)
except Exception as e:
raise ValueError(
f"参数验证失败(验证器 {i+1}): {e}"
) from e
# 所有验证通过,执行原函数
return func(*args, **kwargs)
return wrapper
return decorator
# 定义一些验证函数
defvalidate_positive_numbers(*args, **kwargs):
"""验证所有数值参数为正数"""
for arg in args:
if isinstance(arg, (int, float)) and arg <= 0:
raise ValueError(f"参数必须为正数,得到: {arg}")
defvalidate_string_not_empty(*args, **kwargs):
"""验证所有字符串参数非空"""
for arg in args:
if isinstance(arg, str) andnot arg.strip():
raise ValueError("字符串参数不能为空")
# 使用良好的装饰器
@benchmark
@validate_arguments(validate_positive_numbers, validate_string_not_empty)
defprocess_data(value: float, name: str) -> str:
"""处理数据"""
time.sleep(0.1) # 模拟处理时间
returnf"处理 {name}: {value * 2}"
# 测试
print("装饰器最佳实践示例:")
try:
# 有效调用
result = process_data(10.5, "测试数据")
print(f"成功: {result}")
# 无效调用 - 负数
result = process_data(-5, "测试")
except ValueError as e:
print(f"验证失败: {e}")
try:
# 无效调用 - 空字符串
result = process_data(5, "")
except ValueError as e:
print(f"验证失败: {e}")
总结:装饰器的力量
装饰器是Python中极其强大的工具,它们让你能够:
记住:
@decorator 只是语法糖:func = decorator(func)- 使用
@functools.wraps 保持函数信息
关注公众号,后台回复“资料领取”或点击“资料领取”菜单即可免费获取“软件测试”、“Python开发”相关资料~