1. Python内置装饰器概述
Python标准库提供了多个实用的内置装饰器,它们位于标准库的不同模块中。这些装饰器大大简化了常见编程任务。
# 主要内置装饰器概览
import functools
import time
# 1. 类相关的装饰器
# @staticmethod, @classmethod, @property, @<property>.setter, @<property>.deleter
# 2. 函数工具装饰器
# @functools.lru_cache, @functools.total_ordering, @functools.singledispatch
# 3. 上下文管理装饰器
# @contextlib.contextmanager
# 4. 异步相关装饰器
# @asyncio.coroutine (Python 3.7之前)
# 5. 其他
# @typing.final, @typing.overload
2. 类相关装饰器
2.1 @staticmethod - 静态方法
静态方法是不访问实例属性,也不访问类属性的普通函数,只是逻辑上属于这个类。它既没有 self 参数,也没有 cls 参数。
基本语法
classMathOperations:
# 静态方法不需要访问实例或类
@staticmethod
defadd(a, b):
"""静态方法:与类相关但不依赖实例或类状态"""
return a + b
@staticmethod
deffactorial(n):
"""计算阶乘"""
if n <= 1:
return1
return n * MathOperations.factorial(n - 1)
# 使用静态方法
print(MathOperations.add(5, 3)) # 8,通过类调用
print(MathOperations.factorial(5)) # 120
# 也可以通过实例调用,但不推荐
obj = MathOperations()
print(obj.add(2, 3)) # 5
# 对比实例方法
classMathOperationsWithInstance:
def__init__(self, multiplier=1):
self.multiplier = multiplier
# 实例方法
defmultiply(self, x):
"""需要访问实例属性"""
return x * self.multiplier
# 静态方法
@staticmethod
defstatic_multiply(a, b):
"""不需要访问实例或类属性"""
return a * b
obj = MathOperationsWithInstance(2)
print(obj.multiply(5)) # 10,需要实例
print(obj.static_multiply(3, 4)) # 12,不需要实例
print(MathOperationsWithInstance.static_multiply(3, 4)) # 12,直接通过类调用
与实例方法和类方法的区别
classCalculator:
definstance_method(self):
"""实例方法:第一个参数是 self"""
returnf"实例方法,访问实例属性:{self.value}"
@classmethod
defclass_method(cls):
"""类方法:第一个参数是 cls"""
returnf"类方法,可以访问类属性:{cls.PI}"
@staticmethod
defstatic_method(x, y):
"""静态方法:没有 self 或 cls 参数"""
return x + y
使用场景
1. 工具函数
classMathUtils:
@staticmethod
defadd(x, y):
return x + y
@staticmethod
defmultiply(x, y):
return x * y
# 调用方式
print(MathUtils.add(5, 3)) # 8
print(MathUtils.multiply(2, 4)) # 8
# 也可以实例化后调用
calc = MathUtils()
print(calc.add(2, 3)) # 5
2. 工厂方法
classUser:
def__init__(self, username, role):
self.username = username
self.role = role
@staticmethod
defcreate_admin(username):
return User(username, "admin")
@staticmethod
defcreate_guest(username):
return User(username, "guest")
# 使用静态方法创建对象
admin = User.create_admin("alice")
guest = User.create_guest("bob")
3. 验证函数
classValidator:
@staticmethod
defis_valid_email(email):
import re
pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
return bool(re.match(pattern, email))
@staticmethod
defis_valid_phone(phone):
return phone.isdigit() and len(phone) == 11
print(Validator.is_valid_email("test@example.com")) # True
print(Validator.is_valid_phone("13800138000")) # True
注意事项
1. 访问限制
classMyClass:
class_attr = "类属性"
def__init__(self):
self.instance_attr = "实例属性"
@staticmethod
defstatic_method():
# 不能直接访问实例属性或类属性
# print(self.instance_attr) # 错误!
# print(cls.class_attr) # 错误!
print("这是一个静态方法")
2. 继承行为
classParent:
@staticmethod
defstatic_method():
return"父类的静态方法"
classChild(Parent):
pass
print(Parent.static_method()) # "父类的静态方法"
print(Child.static_method()) # "父类的静态方法" - 可以继承
# 子类可以覆盖
classChild2(Parent):
@staticmethod
defstatic_method():
return"子类覆盖的静态方法"
print(Child2.static_method()) # "子类覆盖的静态方法"
何时使用静态方法?
与类方法的比较
classTemperatureConverter:
BASE_UNIT = "Celsius"
@staticmethod
defcelsius_to_fahrenheit(c):
"""静态方法:不访问类属性"""
return (c * 9/5) + 32
@classmethod
defget_base_unit(cls):
"""类方法:可以访问类属性"""
return cls.BASE_UNIT
# 使用
print(TemperatureConverter.celsius_to_fahrenheit(100)) # 212.0
print(TemperatureConverter.get_base_unit()) # Celsius
@staticmethod 用于定义那些逻辑上属于类,但不需要访问类或实例状态的函数。它让代码组织更加清晰,提高了代码的可读性和可维护性。
2.2 @classmethod - 类方法
@classmethod 用于定义类方法,类方法的第一个参数是 cls(表示类本身),可以访问和修改类属性。
基本语法
classPerson:
population = 0# 类属性
def__init__(self, name, age):
self.name = name
self.age = age
Person.population += 1
@classmethod
defget_population(cls):
"""类方法:操作类属性,第一个参数是类本身"""
returnf"当前人口:{cls.population}"
@classmethod
defcreate_from_birth_year(cls, name, birth_year):
"""工厂方法:使用类方法作为替代构造函数"""
from datetime import datetime
current_year = datetime.now().year
age = current_year - birth_year
return cls(name, age) # 使用cls创建新实例
@classmethod
defcreate_adult(cls, name):
"""创建特定类型的实例"""
return cls(name, 18)
# 使用类方法
p1 = Person("Alice", 25)
p2 = Person("Bob", 30)
print(Person.get_population()) # 当前人口:2
# 使用类方法作为工厂
p3 = Person.create_from_birth_year("Charlie", 1995)
print(f"{p3.name} 年龄:{p3.age}") # Charlie 年龄:(根据当前年份计算)
p4 = Person.create_adult("David")
print(f"{p4.name} 年龄:{p4.age}") # David 年龄:18
# 继承中的类方法
classStudent(Person):
student_count = 0
def__init__(self, name, age, student_id):
super().__init__(name, age)
self.student_id = student_id
Student.student_count += 1
@classmethod
defget_student_count(cls):
returnf"学生人数:{cls.student_count}"
s1 = Student("Eve", 20, "S001")
print(Student.get_population()) # 当前人口:3,继承的类方法
print(Student.get_student_count()) # 学生人数:1,子类自己的类方法
主要特点
主要应用场景
1. 替代构造函数(创建对象的不同方式)
classDate:
def__init__(self, year, month, day):
self.year = year
self.month = month
self.day = day
@classmethod
deffrom_string(cls, date_string):
"""从字符串创建 Date 对象"""
year, month, day = map(int, date_string.split('-'))
return cls(year, month, day) # 创建新实例
@classmethod
deffrom_timestamp(cls, timestamp):
"""从时间戳创建 Date 对象"""
import datetime
dt = datetime.datetime.fromtimestamp(timestamp)
return cls(dt.year, dt.month, dt.day)
@classmethod
deftoday(cls):
"""创建今天的 Date 对象"""
import datetime
today = datetime.date.today()
return cls(today.year, today.month, today.day)
# 使用不同的类方法创建对象
d1 = Date(2024, 12, 25) # 常规方式
d2 = Date.from_string("2024-12-25") # 从字符串
d3 = Date.from_timestamp(1703452800) # 从时间戳
d4 = Date.today() # 今天日期
print(f"d2: {d2.year}-{d2.month}-{d2.day}") # d2: 2024-12-25
2. 工厂模式
classShape:
def__init__(self, name):
self.name = name
@classmethod
defcreate_circle(cls, radius):
"""创建圆形"""
circle = cls(f"Circle-r{radius}")
circle.radius = radius
circle.area = 3.14 * radius ** 2
return circle
@classmethod
defcreate_rectangle(cls, width, height):
"""创建矩形"""
rect = cls(f"Rectangle-{width}x{height}")
rect.width = width
rect.height = height
rect.area = width * height
return rect
@classmethod
defcreate_square(cls, side):
"""创建正方形"""
return cls.create_rectangle(side, side)
# 使用工厂方法
circle = Shape.create_circle(5)
rect = Shape.create_rectangle(4, 6)
square = Shape.create_square(3)
print(f"{circle.name}, 面积: {circle.area}") # Circle-r5, 面积: 78.5
print(f"{rect.name}, 面积: {rect.area}") # Rectangle-4x6, 面积: 24
3. 单例模式
classDatabaseConnection:
_instance = None# 类属性存储唯一实例
def__init__(self, connection_string):
self.connection_string = connection_string
self.connected = False
defconnect(self):
self.connected = True
returnf"连接到: {self.connection_string}"
@classmethod
defget_instance(cls, connection_string=None):
"""获取数据库连接的单例"""
if cls._instance isNone:
if connection_string isNone:
raise ValueError("首次调用需要提供连接字符串")
cls._instance = cls(connection_string)
return cls._instance
# 使用示例
# 第一次创建需要参数
db1 = DatabaseConnection.get_instance("mysql://localhost:3306")
print(db1.connect()) # 连接到: mysql://localhost:3306
# 后续调用不需要参数,返回同一个实例
db2 = DatabaseConnection.get_instance()
print(db1 is db2) # True,是同一个对象
4. 类方法链式调用
classCalculator:
def__init__(self, value=0):
self.value = value
@classmethod
defstart(cls, initial_value=0):
"""创建计算器并设置初始值"""
return cls(initial_value)
defadd(self, x):
self.value += x
return self # 返回 self 支持链式调用
defmultiply(self, x):
self.value *= x
return self
defget_value(self):
return self.value
# 链式调用
result = Calculator.start(10).add(5).multiply(2).add(3).get_value()
print(result) # (10 + 5) * 2 + 3 = 33
继承中的类方法
classAnimal:
category = "生物"
@classmethod
defget_category(cls):
return cls.category
@classmethod
defcreate(cls, name):
"""工厂方法,被子类继承"""
print(f"在 {cls.__name__} 类中创建")
return cls(name)
classDog(Animal):
category = "犬科"
def__init__(self, name):
self.name = name
classCat(Animal):
category = "猫科"
def__init__(self, name):
self.name = name
# 使用示例
animal = Animal.create("Generic")
print(animal.get_category()) # 生物
dog = Dog.create("Buddy") # 在 Dog 类中创建
print(dog.get_category()) # 犬科
print(type(dog)) # <class '__main__.Dog'>
cat = Cat.create("Mimi") # 在 Cat 类中创建
print(cat.get_category()) # 猫科
应用示例:数据验证与处理
classProduct:
products = [] # 类属性存储所有产品
def__init__(self, name, price):
self.name = name
self.price = price
Product.products.append(self)
@classmethod
defget_total_value(cls):
"""计算所有产品的总价值"""
return sum(p.price for p in cls.products)
@classmethod
defget_average_price(cls):
"""计算平均价格"""
ifnot cls.products:
return0
return cls.get_total_value() / len(cls.products)
@classmethod
deffind_by_price_range(cls, min_price, max_price):
"""根据价格范围查找产品"""
return [p for p in cls.products if min_price <= p.price <= max_price]
@classmethod
defload_from_csv(cls, filepath):
"""从CSV文件加载产品"""
import csv
products = []
with open(filepath, 'r') as file:
reader = csv.DictReader(file)
for row in reader:
product = cls(
name=row['name'],
price=float(row['price'])
)
products.append(product)
return products
# 使用示例
p1 = Product("Laptop", 1200)
p2 = Product("Mouse", 25)
p3 = Product("Keyboard", 80)
print(f"总价值: ${Product.get_total_value()}") # 总价值: $1305
print(f"平均价格: ${Product.get_average_price():.2f}") # 平均价格: $435.00
expensive_products = Product.find_by_price_range(100, 1500)
for p in expensive_products:
print(f"{p.name}: ${p.price}") # Laptop: $1200
何时使用 @classmethod
总结
2.3 @property 和相关装饰器
@property 装饰器用于将类的方法转换为属性,使得可以像访问属性一样调用方法,同时提供更好的数据封装和验证机制。
完整的属性通常包含三个方法:
@属性名.setter - setter(设置属性)@属性名.deleter - deleter(删除属性)
基础用法
classTemperature:
def__init__(self, celsius=0):
self._celsius = celsius # 私有属性
@property
defcelsius(self):
"""getter方法:将方法作为属性访问"""
print("获取摄氏度值")
return self._celsius
@celsius.setter
defcelsius(self, value):
"""setter方法:验证并设置属性值"""
print(f"设置摄氏度值为: {value}")
if value < -273.15:
raise ValueError("温度不能低于绝对零度(-273.15°C)")
self._celsius = value
@celsius.deleter
defcelsius(self):
"""deleter方法:删除属性时的逻辑"""
print("删除温度值,重置为0")
self._celsius = 0
@property
deffahrenheit(self):
"""只读属性:计算华氏温度"""
return self._celsius * 9/5 + 32
@property
defkelvin(self):
"""只读属性:计算开尔文温度"""
return self._celsius + 273.15
@property
defdescription(self):
"""根据温度返回描述"""
if self._celsius < 0:
return"寒冷"
elif self._celsius < 20:
return"凉爽"
elif self._celsius < 30:
return"温暖"
else:
return"炎热"
# 使用属性装饰器
temp = Temperature(25)
# 访问属性(调用getter)
print(f"摄氏度: {temp.celsius}") # 摄氏度: 25
print(f"华氏度: {temp.fahrenheit:.2f}") # 华氏度: 77.00
print(f"开尔文: {temp.kelvin:.2f}") # 开尔文: 298.15
print(f"描述: {temp.description}") # 描述: 温暖
# 设置属性(调用setter)
temp.celsius = 30
print(f"新的摄氏度: {temp.celsius}") # 新的摄氏度: 30
# 尝试设置无效值
try:
temp.celsius = -300
except ValueError as e:
print(f"错误: {e}") # 错误: 温度不能低于绝对零度(-273.15°C)
# 删除属性(调用deleter)
del temp.celsius
print(f"删除后的摄氏度: {temp.celsius}") # 删除后的摄氏度: 0
# 只读属性测试
try:
temp.fahrenheit = 100# 会失败,因为没有setter
except AttributeError as e:
print(f"错误: {e}") # 错误: property 'fahrenheit' of 'Temperature' object has no setter
应用场景
1. 数据验证和转换
classTemperature:
def__init__(self, celsius):
self._celsius = celsius
@property
defcelsius(self):
return self._celsius
@celsius.setter
defcelsius(self, value):
if value < -273.15:
raise ValueError("温度不能低于绝对零度")
self._celsius = value
@property
deffahrenheit(self):
"""华氏温度(只读)"""
return self._celsius * 9/5 + 32
@fahrenheit.setter
deffahrenheit(self, value):
"""通过华氏温度设置摄氏温度"""
self._celsius = (value - 32) * 5/9
@property
defkelvin(self):
"""开尔文温度(只读)"""
return self._celsius + 273.15
@kelvin.setter
defkelvin(self, value):
"""通过开尔文温度设置摄氏温度"""
if value < 0:
raise ValueError("开尔文温度不能为负数")
self._celsius = value - 273.15
# 使用
temp = Temperature(25)
print(f"摄氏: {temp.celsius}°C") # 摄氏: 25°C
print(f"华氏: {temp.fahrenheit}°F") # 华氏: 77.0°F
print(f"开尔文: {temp.kelvin}K") # 开尔文: 298.15K
temp.fahrenheit = 100# 设置华氏温度
print(f"摄氏: {temp.celsius:.2f}°C") # 摄氏: 37.78°C
2. 属性别名和兼容性
classRectangle:
def__init__(self, width, height):
self.width = width
self.height = height
# 为旧代码提供兼容性
@property
defsize(self):
"""旧属性名(已废弃)"""
import warnings
warnings.warn("size属性已废弃,请使用width和height", DeprecationWarning)
return (self.width, self.height)
@size.setter
defsize(self, value):
"""支持旧代码设置size"""
import warnings
warnings.warn("size属性已废弃,请使用width和height", DeprecationWarning)
self.width, self.height = value
@property
defarea(self):
"""面积(只读)"""
return self.width * self.height
@property
defperimeter(self):
"""周长(只读)"""
return2 * (self.width + self.height)
# 使用
rect = Rectangle(3, 4)
print(f"面积: {rect.area}") # 面积: 12
print(f"周长: {rect.perimeter}") # 周长: 14
# rect.size # 会显示废弃警告
3. 延迟计算和缓存
import time
classExpensiveComputation:
def__init__(self, data):
self._data = data
self._result_cache = None
self._computation_time = None
@property
defresult(self):
"""延迟计算并缓存结果"""
if self._result_cache isNone:
print("执行昂贵计算...")
start_time = time.time()
# 模拟耗时计算
time.sleep(1)
self._result_cache = sum(self._data) ** 2
self._computation_time = time.time() - start_time
print(f"计算完成,耗时: {self._computation_time:.2f}秒")
return self._result_cache
@property
defcomputation_time(self):
"""获取计算时间"""
return self._computation_time
# 使用
data = list(range(1000000))
calc = ExpensiveComputation(data)
# 第一次访问会进行计算
print(f"结果: {calc.result}")
# 输出: 执行昂贵计算... 计算完成,耗时: 1.00秒 结果: 249999500000250000000000
# 第二次访问直接使用缓存
print(f"结果: {calc.result}") # 立即返回,不再计算
print(f"计算耗时: {calc.computation_time:.2f}秒")
4. 属性依赖和联动
classBankAccount:
def__init__(self, balance=0, currency="CNY"):
self._balance = balance
self._currency = currency
self._conversion_rates = {
"CNY": 1.0,
"USD": 0.14,
"EUR": 0.13,
"JPY": 20.0
}
@property
defbalance(self):
return self._balance
@property
defcurrency(self):
return self._currency
@currency.setter
defcurrency(self, new_currency):
"""更换货币时自动转换余额"""
if new_currency notin self._conversion_rates:
raise ValueError(f"不支持货币: {new_currency}")
# 转换余额
old_rate = self._conversion_rates[self._currency]
new_rate = self._conversion_rates[new_currency]
self._balance = self._balance * new_rate / old_rate
self._currency = new_currency
@property
defbalance_in_cny(self):
"""余额换算为人民币"""
rate = self._conversion_rates[self._currency]
return self._balance / rate
defdeposit(self, amount):
if amount <= 0:
raise ValueError("存款金额必须为正数")
self._balance += amount
defwithdraw(self, amount):
if amount > self._balance:
raise ValueError("余额不足")
self._balance -= amount
# 使用
account = BankAccount(10000, "CNY")
print(f"余额: {account.balance}{account.currency}") # 余额: 10000 CNY
account.currency = "USD"
print(f"余额: {account.balance:.2f}{account.currency}") # 余额: 1400.00 USD
print(f"人民币价值: {account.balance_in_cny:.2f} CNY") # 人民币价值: 10000.00 CNY
相关的内置装饰器
@classmethod + @property
classConfiguration:
_instance = None
def__init__(self):
self.settings = {}
@classmethod
@property
definstance(cls):
"""类属性的getter"""
if cls._instance isNone:
cls._instance = cls()
return cls._instance
@classmethod
@property
defversion(cls):
"""类属性(只读)"""
return"1.0.0"
# 使用
config = Configuration.instance
config.settings["debug"] = True
print(Configuration.version) # 1.0.0
@property 与继承
classAnimal:
def__init__(self, name):
self._name = name
@property
defname(self):
return self._name
@name.setter
defname(self, value):
ifnot value:
raise ValueError("名字不能为空")
self._name = value
@property
defsound(self):
raise NotImplementedError("子类必须实现sound属性")
classDog(Animal):
@property
defsound(self):
return"汪汪"
@property
defspecies(self):
return"犬科"
classCat(Animal):
@property
defsound(self):
return"喵喵"
@property
defspecies(self):
return"猫科"
# 使用
dog = Dog("旺财")
print(f"{dog.name}: {dog.sound}") # 旺财: 汪汪
print(f"种类: {dog.species}") # 种类: 犬科
cat = Cat("咪咪")
cat.name = "小白"
print(f"{cat.name}: {cat.sound}") # 小白: 喵喵
属性描述符
classValidatedAttribute:
"""自定义属性描述符"""
def__init__(self, min_value=None, max_value=None):
self.min_value = min_value
self.max_value = max_value
self.storage_name = None
def__set_name__(self, owner, name):
self.storage_name = f"_{name}"
def__get__(self, instance, owner):
if instance isNone:
return self
return getattr(instance, self.storage_name, None)
def__set__(self, instance, value):
if self.min_value isnotNoneand value < self.min_value:
raise ValueError(f"值不能小于 {self.min_value}")
if self.max_value isnotNoneand value > self.max_value:
raise ValueError(f"值不能大于 {self.max_value}")
setattr(instance, self.storage_name, value)
classProduct:
# 使用自定义描述符
price = ValidatedAttribute(min_value=0, max_value=10000)
quantity = ValidatedAttribute(min_value=0, max_value=1000)
def__init__(self, name, price, quantity):
self.name = name
self.price = price # 使用描述符验证
self.quantity = quantity # 使用描述符验证
@property
deftotal_value(self):
return self.price * self.quantity
# 使用
try:
p = Product("手机", 5000, 100)
print(f"总价值: {p.total_value}") # 总价值: 500000
p.price = 6000# 正常
# p.price = -100 # ValueError: 值不能小于 0
# p.price = 20000 # ValueError: 值不能大于 10000
except ValueError as e:
print(f"错误: {e}")
@property 装饰器的优势:
2.4 组合应用示例:数据验证
classBankAccount:
def__init__(self, account_holder, initial_balance=0):
self._account_holder = account_holder
self._balance = initial_balance
self._transaction_history = []
@property
defaccount_holder(self):
return self._account_holder
@account_holder.setter
defaccount_holder(self, value):
ifnot value ornot isinstance(value, str):
raise ValueError("账户持有人必须是非空字符串")
self._account_holder = value
@property
defbalance(self):
return self._balance
@property
deftransaction_history(self):
"""只读属性:返回交易历史的副本"""
return self._transaction_history.copy()
defdeposit(self, amount):
if amount <= 0:
raise ValueError("存款金额必须为正数")
self._balance += amount
self._transaction_history.append(f"存款: +${amount:.2f}")
return self._balance
defwithdraw(self, amount):
if amount <= 0:
raise ValueError("取款金额必须为正数")
if amount > self._balance:
raise ValueError("余额不足")
self._balance -= amount
self._transaction_history.append(f"取款: -${amount:.2f}")
return self._balance
# 使用示例
account = BankAccount("张三", 1000)
print(f"账户: {account.account_holder}, 余额: ${account.balance}")
account.deposit(500)
account.withdraw(200)
print(f"余额: ${account.balance}")
print("交易记录:", account.transaction_history)
# 修改账户持有人
account.account_holder = "李四"
print(f"新账户持有人: {account.account_holder}")
# 尝试无效操作
try:
account.account_holder = ""# 空字符串
except ValueError as e:
print(f"错误: {e}")
try:
account.balance = 1000# 不能直接设置余额
except AttributeError as e:
print(f"错误: {e}")
3. functools模块装饰器
3.1 @functools.lru_cache - 缓存装饰器
@functools.lru_cache 是 Python 标准库中的装饰器,用于实现 最近最少使用(Least Recently Used)缓存。它可以自动缓存函数的结果,当相同的参数再次调用时直接返回缓存结果,避免重复计算。
基本用法
import functools
import time
# 基础用法
@functools.lru_cache(maxsize=128)
deffibonacci(n):
"""计算斐波那契数列,使用缓存优化"""
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
# 测试性能
start = time.time()
result = fibonacci(35)
end = time.time()
print(f"fibonacci(35) = {result}, 时间: {end - start:.4f}秒")
# 对比无缓存的版本
deffibonacci_no_cache(n):
if n < 2:
return n
return fibonacci_no_cache(n - 1) + fibonacci_no_cache(n - 2)
start = time.time()
result = fibonacci_no_cache(35)
end = time.time()
print(f"无缓存 fibonacci(35) = {result}, 时间: {end - start:.4f}秒")
# 带参数的缓存
@functools.lru_cache(maxsize=32)
defexpensive_computation(x, y, *, verbose=False):
"""模拟耗时计算"""
time.sleep(0.5)
result = x * y
if verbose:
print(f"计算 {x} * {y} = {result}")
return result
# 第一次计算会耗时
print("第一次计算:")
result1 = expensive_computation(5, 3, verbose=True)
# 第二次相同参数的计算会使用缓存
print("\n第二次计算(相同参数):")
result2 = expensive_computation(5, 3, verbose=True)
# 不同参数的重新计算
print("\n第三次计算(不同参数):")
result3 = expensive_computation(6, 4, verbose=True)
# 查看缓存统计
print(f"\n缓存统计: {expensive_computation.cache_info()}")
# 输出: CacheInfo(hits=1, misses=2, maxsize=32, currsize=2)
# 清空缓存
expensive_computation.cache_clear()
print(f"清空后缓存统计: {expensive_computation.cache_info()}")
参数详解
maxsize 参数
import functools
# 无限制缓存
@functools.lru_cache(maxsize=None)
defuncached_function(x):
print(f"计算: {x}")
return x * x
# 固定大小缓存(LRU策略)
@functools.lru_cache(maxsize=3)
defcached_function(x):
print(f"计算: {x}")
return x * x
print("=== 无限制缓存 ===")
print(uncached_function(1)) # 计算: 1, 返回 1
print(uncached_function(1)) # 直接返回 1
print(uncached_function(2)) # 计算: 2, 返回 4
print(uncached_function(2)) # 直接返回 4
print("\n=== 固定大小缓存 (maxsize=3) ===")
for i in [1, 2, 3, 4, 1, 2, 3]:
print(f"cached_function({i}) = {cached_function(i)}")
# 输出:
# 计算: 1
# cached_function(1) = 1
# 计算: 2
# cached_function(2) = 4
# 计算: 3
# cached_function(3) = 9
# 计算: 4 (缓存已满,移除最旧的1)
# cached_function(4) = 16
# 计算: 1 (1已被移除,重新计算)
# cached_function(1) = 1
# cached_function(2) = 4 (2还在缓存中)
# cached_function(3) = 9 (3还在缓存中)
typed 参数
import functools
# typed=False(默认):不同类型但值相等的参数被视为相同
@functools.lru_cache(maxsize=2, typed=False)
defuntyped_cache(x):
print(f"计算: {x} (类型: {type(x).__name__})")
return x
# typed=True:不同类型视为不同参数
@functools.lru_cache(maxsize=2, typed=True)
deftyped_cache(x):
print(f"计算: {x} (类型: {type(x).__name__})")
return x
print("=== typed=False (默认) ===")
print(untyped_cache(1)) # 计算: 1 (类型: int)
print(untyped_cache(1.0)) # 直接返回 1 (1 == 1.0)
print("\n=== typed=True ===")
print(typed_cache(1)) # 计算: 1 (类型: int)
print(typed_cache(1.0)) # 计算: 1.0 (类型: float) 视为不同参数
应用场景
1. 昂贵的计算函数
import functools
import time
import math
@functools.lru_cache(maxsize=128)
defexpensive_calculation(x, y):
"""模拟耗时计算"""
print(f"计算 {x} 和 {y} 的复杂运算...")
time.sleep(1) # 模拟耗时操作
result = math.sqrt(x) * math.exp(y) + math.sin(x * y)
return result
# 使用缓存
start = time.time()
print(f"结果1: {expensive_calculation(2, 3):.4f}") # 第一次计算,耗时1秒
print(f"结果2: {expensive_calculation(4, 5):.4f}") # 第一次计算,耗时1秒
print(f"结果3: {expensive_calculation(2, 3):.4f}") # 从缓存获取,立即返回
print(f"总耗时: {time.time() - start:.2f}秒") # 约2秒,不是3秒
2. 数据获取和API调用
import functools
import requests
import time
classWeatherAPI:
def__init__(self):
self.call_count = 0
@functools.lru_cache(maxsize=32, typed=True)
defget_weather(self, city, country="CN", units="metric"):
"""获取天气信息,带缓存"""
self.call_count += 1
print(f"API调用 #{self.call_count}: {city}, {country}")
# 模拟API调用(实际中替换为真实API)
time.sleep(0.5)
# 模拟返回数据
weather_data = {
"city": city,
"country": country,
"temperature": 25.5,
"humidity": 65,
"timestamp": time.time()
}
return weather_data
defclear_cache(self):
"""清除缓存"""
self.get_weather.cache_clear()
# 使用
api = WeatherAPI()
print("第一次请求:")
weather1 = api.get_weather("Beijing")
print(f"温度: {weather1['temperature']}°C")
print("\n第二次相同请求(缓存):")
weather2 = api.get_weather("Beijing") # 从缓存获取
print(f"温度: {weather2['temperature']}°C")
print(f"是同一个对象吗?{weather1 is weather2}") # True
print("\n不同参数请求:")
api.get_weather("Shanghai")
api.get_weather("Beijing", "US") # 不同国家代码
print(f"\n总API调用次数: {api.call_count}") # 3次,不是4次
3. 动态规划优化
import functools
from typing import List
classKnapsackSolver:
"""0-1背包问题求解器"""
@staticmethod
@functools.lru_cache(maxsize=None)
defknap_sack(capacity: int, n: int, weights: tuple, values: tuple) -> int:
"""带缓存的递归解法"""
if n == 0or capacity == 0:
return0
# 如果当前物品重量大于剩余容量,跳过
if weights[n-1] > capacity:
return KnapsackSolver.knap_sack(capacity, n-1, weights, values)
# 否则,选择放入或不放入的最大值
include = values[n-1] + KnapsackSolver.knap_sack(
capacity - weights[n-1], n-1, weights, values
)
exclude = KnapsackSolver.knap_sack(capacity, n-1, weights, values)
return max(include, exclude)
# 使用
weights = (10, 20, 30)
values = (60, 100, 120)
capacity = 50
n = len(weights)
# 转换为元组以便缓存(列表不可哈希)
weights_tuple = tuple(weights)
values_tuple = tuple(values)
result = KnapsackSolver.knap_sack(capacity, n, weights_tuple, values_tuple)
print(f"最大价值: {result}") # 220
# 缓存信息
cache_info = KnapsackSolver.knap_sack.cache_info()
print(f"\n缓存信息:")
print(f"命中次数: {cache_info.hits}")
print(f"未命中次数: {cache_info.misses}")
print(f"最大缓存大小: {cache_info.maxsize}")
print(f"当前缓存大小: {cache_info.currsize}")
缓存管理方法
1. 查看缓存信息
import functools
@functools.lru_cache(maxsize=3)
defcached_func(x):
return x ** 2
# 调用函数
for i in [1, 2, 3, 1, 4, 2]:
cached_func(i)
# 获取缓存统计信息
cache_info = cached_func.cache_info()
print(f"缓存统计:")
print(f" 命中次数 (hits): {cache_info.hits}")
print(f" 未命中次数 (misses): {cache_info.misses}")
print(f" 最大缓存大小: {cache_info.maxsize}")
print(f" 当前缓存大小: {cache_info.currsize}")
# 输出:
# 缓存统计:
# 命中次数 (hits): 2 (第二次调用1和2时命中)
# 未命中次数 (misses): 4 (第一次调用1,2,3,4)
# 最大缓存大小: 3
# 当前缓存大小: 3
2. 清除缓存
import functools
@functools.lru_cache(maxsize=128)
defget_data(key):
print(f"获取数据: {key}")
returnf"data_for_{key}"
# 填充缓存
print(get_data("A")) # 获取数据: A
print(get_data("A")) # 直接返回(命中缓存)
# 清除缓存
get_data.cache_clear()
print("缓存已清除")
print(get_data("A")) # 获取数据: A(重新计算)
3. 查看缓存内容
import functools
@functools.lru_cache(maxsize=3)
deffunc(x):
return x * 10
# 填充缓存
func(1)
func(2)
func(3)
# 查看缓存键值对(Python 3.9+)
if hasattr(func, '__wrapped__'):
print("缓存装饰的函数:", func.__wrapped__.__name__)
# 通过缓存属性访问(内部实现,谨慎使用)
print("\n缓存内容:")
for key, value in func.cache.items():
print(f" 参数: {key} -> 结果: {value}")
注意事项
1. 不可哈希参数的问题
import functools
from typing import List, Dict
# 错误示例:列表不可哈希
# @functools.lru_cache
# def process_list(items: List[int]): # 会报错
# return sum(items)
# 解决方案1:转换为元组
@functools.lru_cache
defprocess_tuple(items_tuple: tuple):
return sum(items_tuple)
# 使用
items = [1, 2, 3, 4, 5]
result = process_tuple(tuple(items))
print(f"结果: {result}")
# 解决方案2:使用自定义键生成函数
deflist_to_key(items: List):
"""将列表转换为可哈希的键"""
return tuple(items)
@functools.lru_cache
defprocess_with_key(key):
items = list(key) # 从元组转换回列表
return sum(items)
# 使用
result = process_with_key(list_to_key([1, 2, 3]))
print(f"结果: {result}")
2. 类方法缓存
import functools
classDataProcessor:
def__init__(self, base_value):
self.base_value = base_value
@functools.lru_cache(maxsize=128)
defprocess(self, x, y):
"""实例方法缓存(每个实例有自己的缓存)"""
print(f"处理 {x}, {y}")
return self.base_value + x * y
@classmethod
@functools.lru_cache(maxsize=128)
defclass_process(cls, x, y):
"""类方法缓存(所有实例共享)"""
print(f"类处理 {x}, {y}")
return x * y * 100
# 使用
processor1 = DataProcessor(10)
processor2 = DataProcessor(20)
print("实例方法缓存:")
print(processor1.process(2, 3)) # 处理 2, 3
print(processor1.process(2, 3)) # 命中缓存
print(processor2.process(2, 3)) # 处理 2, 3(不同实例)
print("\n类方法缓存:")
print(processor1.class_process(2, 3)) # 类处理 2, 3
print(processor2.class_process(2, 3)) # 命中缓存(共享)
3. 带时间限制的缓存
import functools
import time
deftime_limited_cache(maxsize=128, ttl=60):
"""带TTL(生存时间)的缓存装饰器"""
defdecorator(func):
cached_func = functools.lru_cache(maxsize=maxsize)(func)
cached_func.expiry_times = {}
@functools.wraps(func)
defwrapper(*args, **kwargs):
key = (args, tuple(kwargs.items()))
# 检查缓存是否过期
current_time = time.time()
if key in cached_func.expiry_times:
if current_time > cached_func.expiry_times[key]:
# 缓存过期,清除
cached_func.cache_clear()
del cached_func.expiry_times[key]
# 设置过期时间
cached_func.expiry_times[key] = current_time + ttl
return cached_func(*args, **kwargs)
return wrapper
return decorator
# 使用
@time_limited_cache(maxsize=32, ttl=5) # 5秒过期
defget_current_data(x):
print(f"获取数据: {x}")
returnf"data_{x}_{time.time()}"
print(get_current_data(1)) # 获取数据: 1
print(get_current_data(1)) # 缓存命中
time.sleep(6) # 等待缓存过期
print(get_current_data(1)) # 获取数据: 1(重新获取)
@functools.lru_cache 适合做什么?
@functools.lru_cache 是一个强大的性能优化工具,特别适用于:
3.2 @functools.total_ordering - 简化比较操作
@functools.total_ordering 是一个类装饰器,只需要定义 eq() 和 lt()、__le__()、__gt__() 或 ge() 中的任意一个,它就能自动为你生成所有其他比较方法。
为什么需要它?
Python 中要为一个类定义完整的比较操作,通常需要实现 6 个方法:
@total_ordering 让你只需定义 2 个方法就能获得全部 6 个!
基本语法
import functools
from math import sqrt
@functools.total_ordering
classVector2D:
"""2D向量类,使用total_ordering自动生成比较方法"""
def__init__(self, x, y):
self.x = x
self.y = y
def__repr__(self):
returnf"Vector2D({self.x}, {self.y})"
@property
defmagnitude(self):
"""向量长度"""
return sqrt(self.x**2 + self.y**2)
# 只需要定义__eq__和__lt__,其他比较方法会自动生成
def__eq__(self, other):
ifnot isinstance(other, Vector2D):
returnNotImplemented
return self.x == other.x and self.y == other.y
def__lt__(self, other):
"""按长度比较"""
ifnot isinstance(other, Vector2D):
returnNotImplemented
return self.magnitude < other.magnitude
# 注意:不需要定义__gt__, __le__, __ge__,它们会自动生成
# 测试比较操作
v1 = Vector2D(1, 2)
v2 = Vector2D(3, 4)
v3 = Vector2D(1, 2)
print(f"v1 = {v1}, 长度 = {v1.magnitude:.2f}")
print(f"v2 = {v2}, 长度 = {v2.magnitude:.2f}")
print(f"v3 = {v3}, 长度 = {v3.magnitude:.2f}")
# 自动生成的比较方法
print(f"v1 == v3: {v1 == v3}") # True
print(f"v1 != v2: {v1 != v2}") # True
print(f"v1 < v2: {v1 < v2}") # True
print(f"v1 <= v2: {v1 <= v2}") # True
print(f"v2 > v1: {v2 > v1}") # True
print(f"v2 >= v1: {v2 >= v1}") # True
# 排序
vectors = [Vector2D(3, 4), Vector2D(1, 1), Vector2D(2, 2)]
sorted_vectors = sorted(vectors)
print(f"排序前: {vectors}")
print(f"排序后: {sorted_vectors}")
3.3 @functools.singledispatch - 单分派泛函数
@functools.singledispatch 是一个装饰器,用于创建单分派泛函数(single-dispatch generic functions),允许函数根据第一个参数的类型来调用不同的实现(类似于其他语言中的函数重载)。
单分派泛函数允许你为不同类型的第一个参数提供不同的实现,Python 在运行时根据实际参数类型自动选择正确的实现。
基本语法
import functools
from numbers import Integral, Real
from decimal import Decimal
@functools.singledispatch
defformat_data(data):
"""默认格式化函数"""
return str(data)
@format_data.register(str)
def_(text):
"""处理字符串"""
returnf'字符串: "{text}"'
@format_data.register(Integral) # int的父类
def_(number):
"""处理整数"""
returnf'整数: {number} (0x{number:x})'
@format_data.register(Real) # float的父类
def_(number):
"""处理浮点数"""
returnf'浮点数: {number:.4f}'
@format_data.register(list)
def_(items):
"""处理列表"""
formatted_items = [format_data(item) for item in items]
returnf'列表: [{", ".join(formatted_items)}]'
@format_data.register(dict)
def_(mapping):
"""处理字典"""
formatted_items = [f'{k}: {format_data(v)}'for k, v in mapping.items()]
returnf'字典: {{{", ".join(formatted_items)}}}'
# 还可以注册自定义类型
classPoint:
def__init__(self, x, y):
self.x = x
self.y = y
def__repr__(self):
returnf"Point({self.x}, {self.y})"
@format_data.register(Point)
def_(point):
returnf"点({point.x}, {point.y})"
# 测试单分派
test_cases = [
42, # 整数
3.14159, # 浮点数
"Hello, World!", # 字符串
[1, 2, "three", 4.0], # 列表
{"name": "Alice", "age": 30}, # 字典
Point(3, 4), # 自定义类型
Decimal("10.5"), # 未注册的类型,使用默认处理
True, # bool是int的子类,使用整数处理
]
for data in test_cases:
print(f"{repr(data):30} -> {format_data(data)}")
3.4 @functools.wraps - 保留元数据
import functools
import time
# 不使用wraps的问题
defbad_decorator(func):
defwrapper(*args, **kwargs):
"""包装函数"""
return func(*args, **kwargs)
return wrapper
@bad_decorator
defexample_function():
"""示例函数文档"""
return"Hello"
print(f"不使用wraps的函数名: {example_function.__name__}") # wrapper
print(f"不使用wraps的文档: {example_function.__doc__}") # 包装函数
# 使用wraps的正确方式
defgood_decorator(func):
@functools.wraps(func)
defwrapper(*args, **kwargs):
"""包装函数文档"""
return func(*args, **kwargs)
return wrapper
@good_decorator
defanother_example():
"""另一个示例函数文档"""
return"World"
print(f"\n使用wraps的函数名: {another_example.__name__}") # another_example
print(f"使用wraps的文档: {another_example.__doc__}") # 另一个示例函数文档
# 实用的装饰器示例
deftime_it(func):
"""计时装饰器"""
@functools.wraps(func)
defwrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
elapsed = end_time - start_time
print(f"{func.__name__} 执行时间: {elapsed:.6f}秒")
return result
return wrapper
deflog_call(func):
"""日志装饰器"""
@functools.wraps(func)
defwrapper(*args, **kwargs):
print(f"调用: {func.__name__}(args={args}, kwargs={kwargs})")
return func(*args, **kwargs)
return wrapper
# 组合多个装饰器
@time_it
@log_call
defcalculate_sum(n):
"""计算1到n的和"""
return sum(range(1, n + 1))
result = calculate_sum(1000)
print(f"结果: {result}")
4. contextlib模块装饰器
4.1 @contextlib.contextmanager - 上下文管理器
import contextlib
import time
from pathlib import Path
# 创建简单的上下文管理器
@contextlib.contextmanager
deftimer(name="操作"):
"""计时上下文管理器"""
start_time = time.perf_counter()
try:
print(f"{name}开始...")
yield# 这里是被管理的代码块
finally:
end_time = time.perf_counter()
elapsed = end_time - start_time
print(f"{name}结束,耗时: {elapsed:.4f}秒")
# 使用计时器
with timer("文件处理"):
time.sleep(0.5)
print("正在处理文件...")
# 更复杂的例子:临时目录
import tempfile
import os
@contextlib.contextmanager
deftemporary_directory():
"""创建临时目录并在退出时清理"""
temp_dir = tempfile.mkdtemp()
try:
print(f"创建临时目录: {temp_dir}")
yield Path(temp_dir) # 将临时目录路径传递给用户代码
finally:
import shutil
print(f"清理临时目录: {temp_dir}")
shutil.rmtree(temp_dir, ignore_errors=True)
# 使用临时目录
with temporary_directory() as temp_dir:
# 在临时目录中创建文件
temp_file = temp_dir / "test.txt"
temp_file.write_text("临时文件内容")
print(f"在 {temp_dir} 中创建了文件")
# 退出with块时自动清理
# 处理异常的上下文管理器
@contextlib.contextmanager
defhandle_exceptions(*exception_types):
"""处理特定类型异常的上下文管理器"""
try:
yield
except exception_types as e:
print(f"捕获到异常: {type(e).__name__}: {e}")
except Exception as e:
print(f"捕获到未处理的异常: {type(e).__name__}: {e}")
raise
# 使用异常处理
with handle_exceptions(ValueError, ZeroDivisionError):
result = 10 / 2
print(f"结果: {result}")
with handle_exceptions(ValueError, ZeroDivisionError):
result = 10 / 0# 会被捕获和处理
with handle_exceptions(ValueError):
raise TypeError("类型错误") # 不会被处理,会重新抛出
5. 异步相关装饰器
5.1 asyncio相关装饰器(Python 3.7之前)
import asyncio
# Python 3.7之前使用@asyncio.coroutine
# Python 3.7+推荐使用async/await语法
# 传统方式(Python 3.7之前)
@asyncio.coroutine
defold_coroutine():
"""传统的协程装饰器(已过时)"""
yieldfrom asyncio.sleep(1)
return"完成"
# 现代方式(Python 3.7+)
asyncdefmodern_coroutine():
"""使用async/await语法"""
await asyncio.sleep(1)
return"完成"
asyncdefmain():
# 运行传统协程
if asyncio.iscoroutinefunction(old_coroutine):
result = await old_coroutine()
print(f"传统协程结果: {result}")
# 运行现代协程
result = await modern_coroutine()
print(f"现代协程结果: {result}")
# 运行异步程序
asyncio.run(main())
6. typing模块装饰器
6.1 @typing.overload - 函数重载
from typing import overload, Union, List, Optional
# 使用@overload提供类型提示
classCalculator:
@overload
defadd(self, x: int, y: int) -> int: ...
@overload
defadd(self, x: float, y: float) -> float: ...
@overload
defadd(self, x: str, y: str) -> str: ...
# 实际实现
defadd(self, x: Union[int, float, str], y: Union[int, float, str]) -> Union[int, float, str]:
"""重载add方法,支持多种类型"""
if isinstance(x, str) or isinstance(y, str):
return str(x) + str(y)
return x + y
calc = Calculator()
print(calc.add(1, 2)) # 3 (int)
print(calc.add(1.5, 2.5)) # 4.0 (float)
print(calc.add("Hello, ", "World!")) # "Hello, World!" (str)
# 更复杂的重载示例
@overload
defprocess_data(data: int) -> List[int]: ...
@overload
defprocess_data(data: str) -> str: ...
@overload
defprocess_data(data: List[int]) -> int: ...
defprocess_data(data: Union[int, str, List[int]]) -> Union[List[int], str, int]:
"""根据输入类型处理数据"""
if isinstance(data, int):
return [i for i in range(data)]
elif isinstance(data, str):
return data.upper()
elif isinstance(data, list):
return sum(data)
else:
raise TypeError("不支持的参数类型")
6.2 @typing.final - 禁止继承/重写
from typing import final
# 标记类为final,禁止继承
@final
classBaseClass:
"""这个类不能被继承"""
defmethod(self):
return"Base method"
# 尝试继承会引发类型检查错误(运行时不会报错)
# class DerivedClass(BaseClass): # 类型检查器会报错
# pass
# 标记方法为final,禁止重写
classParent:
@final
defcannot_override(self):
"""这个方法不能被子类重写"""
return"Parent implementation"
defcan_override(self):
"""这个方法可以被子类重写"""
return"Parent implementation"
classChild(Parent):
# 尝试重写final方法会导致类型检查错误
# def cannot_override(self): # 类型检查器会报错
# return "Child implementation"
defcan_override(self):
"""正确:重写非final方法"""
return"Child implementation"
# 使用
parent = Parent()
child = Child()
print(parent.cannot_override()) # Parent implementation
print(child.cannot_override()) # Parent implementation
print(child.can_override()) # Child implementation
7. 综合应用举例
7.1 实现一个数据验证框架
import functools
from typing import Any, Callable, Type, Union, get_type_hints
from numbers import Number
classValidationError(Exception):
"""验证错误异常"""
pass
defvalidate_types(func: Callable) -> Callable:
"""类型验证装饰器"""
@functools.wraps(func)
defwrapper(*args, **kwargs) -> Any:
# 获取类型提示
type_hints = get_type_hints(func)
# 验证位置参数
for i, (arg, param_name) in enumerate(zip(args, type_hints.keys())):
if param_name in type_hints:
expected_type = type_hints[param_name]
ifnot isinstance(arg, expected_type):
raise ValidationError(
f"参数 '{param_name}' 应为 {expected_type.__name__} 类型, "
f"但实际为 {type(arg).__name__}"
)
# 验证关键字参数
for param_name, arg in kwargs.items():
if param_name in type_hints:
expected_type = type_hints[param_name]
ifnot isinstance(arg, expected_type):
raise ValidationError(
f"参数 '{param_name}' 应为 {expected_type.__name__} 类型, "
f"但实际为 {type(arg).__name__}"
)
return func(*args, **kwargs)
return wrapper
defvalidate_range(min_value: Number = None, max_value: Number = None):
"""数值范围验证装饰器工厂"""
defdecorator(func: Callable) -> Callable:
@functools.wraps(func)
defwrapper(*args, **kwargs) -> Any:
result = func(*args, **kwargs)
if isinstance(result, Number):
if min_value isnotNoneand result < min_value:
raise ValidationError(
f"返回值 {result} 小于最小值 {min_value}"
)
if max_value isnotNoneand result > max_value:
raise ValidationError(
f"返回值 {result} 大于最大值 {max_value}"
)
return result
return wrapper
return decorator
# 使用装饰器组合
classDataProcessor:
@validate_types
@validate_range(min_value=0, max_value=100)
defprocess_score(self, score: int) -> float:
"""处理分数,返回百分比"""
return score / 100.0
@validate_types
defcreate_person(self, name: str, age: int) -> dict:
"""创建人员信息"""
if age < 0:
raise ValidationError("年龄不能为负数")
return {"name": name, "age": age}
# 测试
processor = DataProcessor()
# 正常情况
try:
result = processor.process_score(85)
print(f"处理结果: {result:.2%}")
person = processor.create_person("Alice", 25)
print(f"人员信息: {person}")
except ValidationError as e:
print(f"验证错误: {e}")
# 错误情况
try:
result = processor.process_score("85") # 类型错误
except ValidationError as e:
print(f"验证错误: {e}")
try:
result = processor.process_score(150) # 范围错误
except ValidationError as e:
print(f"验证错误: {e}")
try:
person = processor.create_person("Bob", -5) # 逻辑错误
except ValidationError as e:
print(f"验证错误: {e}")
7.2 实现缓存API响应
import functools
import json
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
classCacheEntry:
"""缓存条目"""
data: Any
timestamp: datetime
expires_in: Optional[timedelta] = None
@property
defis_expired(self) -> bool:
"""检查缓存是否过期"""
if self.expires_in isNone:
returnFalse
return datetime.now() > self.timestamp + self.expires_in
defcache_response(ttl_seconds: Optional[int] = None):
"""缓存API响应装饰器"""
defdecorator(func):
cache: Dict[str, CacheEntry] = {}
@functools.wraps(func)
defwrapper(*args, **kwargs):
# 生成缓存键
cache_key = json.dumps({
'func': func.__name__,
'args': args,
'kwargs': kwargs
}, sort_keys=True)
# 检查缓存
if cache_key in cache:
entry = cache[cache_key]
ifnot entry.is_expired:
print(f"使用缓存: {func.__name__}")
return entry.data
# 调用函数获取新数据
print(f"调用API: {func.__name__}")
result = func(*args, **kwargs)
# 保存到缓存
expires_in = timedelta(seconds=ttl_seconds) if ttl_seconds elseNone
cache[cache_key] = CacheEntry(
data=result,
timestamp=datetime.now(),
expires_in=expires_in
)
return result
# 添加缓存管理方法
wrapper.clear_cache = lambda: cache.clear()
wrapper.cache_info = lambda: {
'size': len(cache),
'entries': list(cache.keys())
}
return wrapper
return decorator
# 模拟API客户端
classAPIClient:
def__init__(self):
self.call_count = 0
@cache_response(ttl_seconds=5) # 缓存5秒
defget_user_data(self, user_id: int) -> Dict[str, Any]:
"""模拟获取用户数据(耗时操作)"""
self.call_count += 1
time.sleep(0.5) # 模拟网络延迟
return {
"id": user_id,
"name": f"User{user_id}",
"timestamp": datetime.now().isoformat()
}
@cache_response() # 永久缓存
defget_static_data(self) -> Dict[str, Any]:
"""获取静态数据"""
self.call_count += 1
return {
"version": "1.0.0",
"supported_languages": ["en", "zh", "es"]
}
# 测试缓存
client = APIClient()
print("第一次调用(会调用API):")
result1 = client.get_user_data(1)
print(f"结果: {result1}")
print(f"API调用次数: {client.call_count}")
print("\n立即再次调用(使用缓存):")
result2 = client.get_user_data(1)
print(f"结果: {result2}")
print(f"API调用次数: {client.call_count}")
print("\n5秒后调用(缓存过期,重新调用API):")
time.sleep(6)
result3 = client.get_user_data(1)
print(f"结果: {result3}")
print(f"API调用次数: {client.call_count}")
print("\n调用不同参数(新缓存):")
result4 = client.get_user_data(2)
print(f"结果: {result4}")
print(f"API调用次数: {client.call_count}")
print("\n缓存信息:")
print(f"缓存大小: {client.get_user_data.cache_info()['size']}")
print("\n清空缓存:")
client.get_user_data.clear_cache()
print(f"清空后缓存大小: {client.get_user_data.cache_info()['size']}")
8. 内置装饰器总结
| | | |
|---|
@staticmethod | | | |
@classmethod | | | |
@property | | | |
@<property>.setter | | | |
@<property>.deleter | | | |
@functools.lru_cache | | | |
@functools.wraps | | | |
@functools.total_ordering | | | |
@functools.singledispatch | | | |
@contextlib.contextmanager | | | |
@typing.overload | | | |
@typing.final | | | |
- 自定义装饰器总是使用
@functools.wraps
- 使用
@typing.overload提高代码可读性
- 使用
@contextlib.contextmanager管理资源
常见陷阱:
# 陷阱1:在类中使用装饰器时忘记self参数
classExample:
@staticmethod
defstatic_method():
return"静态方法"
# 错误:实例方法忘记self参数
# @timer_decorator # 如果装饰器期望函数有参数
# def instance_method(): # 缺少self参数
# return "实例方法"
# 正确
definstance_method(self):
return"实例方法"
# 陷阱2:缓存可变对象
@functools.lru_cache
defprocess_list(items):
"""危险:缓存列表,列表是可变的!"""
return items.copy() # 返回副本
# 陷阱3:过度使用装饰器
# 避免装饰器嵌套过深,影响可读性
# @decorator1
# @decorator2
# @decorator3
# @decorator4 # 过多装饰器难以调试
# def function():
# pass
内置装饰器是Python的强大特性,合理使用可以: