在开发过程中,请求第三方接口时可能会遇到 500、502、503 等服务端异常状态码,导致我们的业务处理陷入异常,严重时甚至会引发程序崩溃!今天这篇文章将详细介绍如何用 Python 装饰器实现一个生产级的自动重试+熔断机制,让你的接口测试稳如泰山。
一、熔断器
。
1.1 什么是熔断机制?
想象一下家里电路中的保险丝:当电流过大时,保险丝会自动熔断,防止电器损坏和火灾风险。熔断机制在软件设计中也是同样的道理。
在分布式系统中,当某个服务连续失败达到一定阈值时,系统会自动"跳闸",暂时停止对该服务的调用,避免资源耗尽和级联失败,这就是熔断器模式。
1.2 熔断器的三种状态
熔断器不是简单的"开"或"关",而是一个智能的状态机:
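- 1. 关闭状态(CLOSED) - 正常通行模式:请求正常放行,连续失败次数达到阈值后切换为打开状态
- 2. 打开状态(OPEN) - 快速失败模式:直接拒绝请求,等待重置超时结束后进入半开状态
- 3. 半开状态(HALF-OPEN) - 恢复试探模式:放行少量试探请求,全部成功则恢复为关闭状态,任一失败则重新打开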
1.3 为什么需要"重试+熔断"组合?
二、手把手实现智能重试熔断装饰器
2.1 核心设计思路
我们要实现一个装饰器,它能够:
- 为每个被装饰的函数维护独立的熔断器
- 按指数退避+随机抖动的策略自动重试
- 熔断期间快速失败,避免继续请求已故障的服务
- 收集详细的执行统计,便于监控和排查
2.2 熔断器状态管理
先实现熔断器的状态管理逻辑:
from datetime import datetime, timedelta
from typing import Optional
from enum import Enum


class CircuitState(Enum):
    """States of the circuit-breaker state machine."""

    CLOSED = "CLOSED"        # closed: requests pass through normally
    OPEN = "OPEN"            # open: requests are rejected immediately (fail fast)
    HALF_OPEN = "HALF_OPEN"  # half-open: a few trial requests probe for recovery


class ServiceCircuitBreaker:
    """
    Consecutive-failure circuit breaker for a single service.

    Behaviour:
    1. Callers report outcomes via ``call_successful`` / ``call_failed``.
    2. While CLOSED, ``failure_threshold`` consecutive failures open the
       circuit; any success resets the failure count.
    3. After ``reset_timeout`` seconds in OPEN, ``allow_request`` moves the
       breaker to HALF_OPEN. There, ``half_open_max_calls`` successes close it
       again, while any single failure re-opens it.
    4. ``_reset_to_closed`` forces the breaker back to CLOSED (manual reset).
    """

    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        reset_timeout: int = 60,
        half_open_max_calls: int = 3,
    ):
        """
        Initialise the breaker in the CLOSED state.

        Args:
            name: Identifier for this breaker, used in log output.
            failure_threshold: Consecutive failures (while CLOSED) that open
                the circuit.
            reset_timeout: Seconds to stay OPEN before probing in HALF_OPEN.
            half_open_max_calls: Successful probes required in HALF_OPEN to
                close the circuit; also the cap checked by ``allow_request``.
        """
        self.name = name
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.half_open_max_calls = half_open_max_calls

        # State tracking
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.half_open_calls = 0
        self.last_failure_time: Optional[datetime] = None
        self.last_state_change = datetime.now()

    def __str__(self) -> str:
        """Return a one-line summary of the breaker's state and counters."""
        return (f"ServiceCircuitBreaker(name={self.name}, "
                f"state={self.state.value}, "
                f"failures={self.failure_count}, "
                f"successes={self.success_count})")

    def _change_state(self, new_state: CircuitState) -> None:
        """Switch to ``new_state`` (if different), timestamp it and log it."""
        if self.state != new_state:
            self.state = new_state
            # The OPEN -> HALF_OPEN timeout is measured from this timestamp.
            self.last_state_change = datetime.now()
            print(f"🔄 熔断器[{self.name}]状态变化: {self.state.value}")

    def call_successful(self) -> None:
        """Record a successful call."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            self.half_open_calls += 1
            # Enough successful probes in half-open: the service has recovered.
            if self.success_count >= self.half_open_max_calls:
                self._reset_to_closed()
        else:
            # Outside half-open a success breaks the run of consecutive failures.
            self.failure_count = 0
            self.success_count = 0

    def call_failed(self) -> None:
        """Record a failed call, opening the circuit when appropriate."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if (self.state == CircuitState.CLOSED
                and self.failure_count >= self.failure_threshold):
            self._change_state(CircuitState.OPEN)
        elif self.state == CircuitState.HALF_OPEN:
            # A single failed probe means the service is still unhealthy.
            self._change_state(CircuitState.OPEN)

    def _reset_to_closed(self) -> None:
        """Force the breaker back to CLOSED and clear all counters."""
        self._change_state(CircuitState.CLOSED)
        self.failure_count = 0
        self.success_count = 0
        self.half_open_calls = 0

    def allow_request(self) -> bool:
        """
        Decide whether a request may proceed.

        May transition OPEN -> HALF_OPEN once ``reset_timeout`` has elapsed.

        Returns:
            True if the request is allowed, False if it is rejected
            (circuit open or half-open probe budget used up).
        """
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            elapsed = (datetime.now() - self.last_state_change).total_seconds()
            if elapsed < self.reset_timeout:
                return False
            self._change_state(CircuitState.HALF_OPEN)
            # Start each probe window from zero. Without this, counters left
            # over from an earlier failed probe window would let the circuit
            # close after fewer than ``half_open_max_calls`` successes.
            self.success_count = 0
            self.half_open_calls = 0
            return True

        if self.state == CircuitState.HALF_OPEN:
            # Only a limited number of trial requests are allowed through.
            return self.half_open_calls < self.half_open_max_calls

        return False
2.3 智能重试装饰器实现
现在实现核心的装饰器,它集成了重试和熔断逻辑:
import time
import functools
import random
from typing import Callable, Any, Type, Tuple, Union, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed


@dataclass
class RetryConfig:
    """Retry policy settings."""

    max_attempts: int = 3        # total attempts, including the first call
    initial_delay: float = 1.0   # delay before the first retry (seconds)
    max_delay: float = 30.0      # cap on the backoff delay before jitter (seconds)
    backoff_factor: float = 2.0  # delay multiplier applied per attempt
    jitter: float = 0.1          # +/- fraction of the delay added as random jitter
    retry_exceptions: Tuple[Type[Exception], ...] = (Exception,)  # exception types that trigger a retry


class CircuitOpenError(Exception):
    """Raised when a call is rejected because its circuit breaker is open.

    Subclasses ``Exception`` so existing ``except Exception`` handlers still
    catch it.
    """


class SmartRetryCircuitBreaker:
    """
    Manager for retry + circuit-breaker decorators.

    Features:
    1. One independent ``ServiceCircuitBreaker`` per decorated function.
    2. Exponential backoff between retries.
    3. Random jitter so many clients do not retry in lock-step.
    4. Per-function execution statistics.
    """

    def __init__(self, failure_threshold: int = 3, reset_timeout: int = 30):
        """
        Args:
            failure_threshold: Consecutive failures that open a function's
                breaker (passed to each ``ServiceCircuitBreaker``).
            reset_timeout: Seconds a breaker stays open before probing.
        """
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        # Breakers, keyed by "<module>.<function name>"
        self.circuit_breakers: dict[str, ServiceCircuitBreaker] = {}
        # Execution statistics, same keys as circuit_breakers
        self.execution_stats: dict[str, dict] = {}

    def _get_circuit_breaker(self, func_name: str) -> "ServiceCircuitBreaker":
        """Return the breaker for ``func_name``, creating it and its stats on first use."""
        breaker = self.circuit_breakers.get(func_name)
        if breaker is None:
            breaker = ServiceCircuitBreaker(
                name=f"cb_{func_name}",
                failure_threshold=self.failure_threshold,
                reset_timeout=self.reset_timeout,
            )
            self.circuit_breakers[func_name] = breaker
            self.execution_stats[func_name] = {
                "total_calls": 0,
                "successful_calls": 0,
                "failed_calls": 0,
                # counts calls rejected up-front because the breaker was open
                "circuit_opened": 0,
            }
        return breaker

    def _calculate_delay(self, attempt: int, config: RetryConfig) -> float:
        """
        Return the delay (seconds) to wait after failed attempt number ``attempt``.

        Uses exponential backoff capped at ``config.max_delay``, then adds
        uniform random jitter of +/- ``config.jitter`` * delay. The result is
        never negative.
        """
        delay = min(
            config.initial_delay * (config.backoff_factor ** (attempt - 1)),
            config.max_delay,
        )
        jitter_amount = delay * config.jitter
        # Real randomness: a deterministic offset would make every client
        # retry at the same moment, which is what jitter is meant to avoid.
        delay += random.uniform(-jitter_amount, jitter_amount)
        return max(0.0, delay)

    def retry_with_circuit_breaker(self, config: Optional[RetryConfig] = None):
        """
        Build a decorator that adds retries and a circuit breaker to a function.

        Flow per call:
        1. Reject immediately with ``CircuitOpenError`` if the breaker is open.
        2. Call the function; on success, record it and return the result.
        3. On a retryable exception, record the failure. If attempts remain and
           the breaker still allows requests, wait (backoff + jitter) and retry.
        4. Non-retryable exceptions are recorded and re-raised at once.
        5. When retries are exhausted or the breaker opens, re-raise the last
           exception.

        Args:
            config: Retry policy; ``RetryConfig()`` defaults when omitted.
        """
        if config is None:
            config = RetryConfig()
        # isinstance() needs a tuple; tolerate callers that passed a list.
        retryable = tuple(config.retry_exceptions)

        def decorator(func: Callable) -> Callable:
            func_name = f"{func.__module__}.{func.__name__}"

            @functools.wraps(func)
            def wrapper(*args, **kwargs) -> Any:
                circuit_breaker = self._get_circuit_breaker(func_name)
                stats = self.execution_stats[func_name]
                stats["total_calls"] += 1

                if not circuit_breaker.allow_request():
                    stats["circuit_opened"] += 1
                    raise CircuitOpenError(
                        f"服务熔断中: {func_name} "
                        f"(状态: {circuit_breaker.state.value}, "
                        f"已熔断{stats['circuit_opened']}次)"
                    )

                last_exception: Optional[Exception] = None
                for attempt in range(1, config.max_attempts + 1):
                    try:
                        result = func(*args, **kwargs)
                    except Exception as e:
                        circuit_breaker.call_failed()
                        if not isinstance(e, retryable):
                            # Not retryable: propagate immediately.
                            stats["failed_calls"] += 1
                            raise
                        last_exception = e
                        if attempt == config.max_attempts:
                            break
                        print(f"⚠️ {func_name} 第{attempt}次失败: {e}")
                        # Our own failures may just have opened the breaker;
                        # keep hammering the service would defeat fail-fast.
                        if not circuit_breaker.allow_request():
                            print(f"⛔ {func_name} 熔断器已打开,停止重试")
                            break
                        delay = self._calculate_delay(attempt, config)
                        print(f" 等待{delay:.1f}秒后重试...")
                        time.sleep(delay)
                    else:
                        circuit_breaker.call_successful()
                        stats["successful_calls"] += 1
                        if attempt > 1:
                            print(f"✅ {func_name} 在第{attempt}次重试后成功")
                        return result

                # Retries exhausted or stopped by the breaker.
                stats["failed_calls"] += 1
                raise last_exception or Exception(f"{func_name} 执行失败")

            return wrapper

        return decorator

    def get_stats(self, func_name: Optional[str] = None) -> dict:
        """Return stats for ``func_name`` ({} if unknown), or all stats when omitted."""
        if func_name:
            return self.execution_stats.get(func_name, {})
        return self.execution_stats

    def reset_circuit(self, func_name: str) -> None:
        """Manually force the breaker for ``func_name`` back to CLOSED, if it exists."""
        if func_name in self.circuit_breakers:
            self.circuit_breakers[func_name]._reset_to_closed()
            print(f"🔄 已手动重置熔断器: {func_name}")


# Global manager shared by all functions decorated via resilient_api
retry_manager = SmartRetryCircuitBreaker()


def resilient_api(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    retry_exceptions: Tuple[Type[Exception], ...] = (Exception,),
    *,
    max_delay: float = 30.0,
    backoff_factor: float = 2.0,
    jitter: float = 0.1,
):
    """
    Convenience decorator for API calls, backed by the global ``retry_manager``.

    Args:
        max_attempts: Total attempts, including the first call.
        initial_delay: Delay before the first retry (seconds).
        retry_exceptions: Exception types that trigger a retry.
        max_delay: Cap on the backoff delay before jitter (seconds).
        backoff_factor: Delay multiplier applied per attempt.
        jitter: +/- fraction of the delay added as random jitter.
    """
    config = RetryConfig(
        max_attempts=max_attempts,
        initial_delay=initial_delay,
        max_delay=max_delay,
        backoff_factor=backoff_factor,
        jitter=jitter,
        retry_exceptions=retry_exceptions,
    )
    return retry_manager.retry_with_circuit_breaker(config)