你是否遇到过这样的问题?
今天分享一个绝妙的解决方案:一个同时支持同步、异步的函数调用限制装饰器!
核心特性
- • 严格单次调用限制 - 函数执行完成前只能被调用一次
- • 线程安全 - 使用 threading.Lock() 保证多线程环境安全
应用场景
函数级
import asyncioimport inspectimport threadingimport timefrom functools import wrapsfrom typing importCallable, Optional, Anydefsingle_call( timeout: float = 0, on_success: Optional[Callable[..., Any]] = None, on_rejected: Optional[Callable[..., Any]] = None, reject_result: Any = None,):""" 函数调用限制装饰器 参数: timeout: 两次允许执行的最小时间间隔(秒), 0表示只限制执行期间。 timeout = 0: 仅限制函数不能同时执行(前一次执行完成后才能开始下一次) timeout > 0: 即使前一次执行已完成,仍需等待设定的时间间隔后才允许再次执行 on_success: 成功执行后的回调函数 on_rejected: 被拒绝执行时的回调函数 reject_result: 被拒绝时返回的默认值 """defdecorator(func): is_running = False last_run_time = 0.0 lock = threading.Lock()if inspect.iscoroutinefunction(func): @wraps(func)asyncdefasync_wrapped(*args, **kwargs):nonlocal is_running, last_run_time current_time = time.time() should_reject = Falsewith lock:if is_running or (current_time - last_run_time < timeout): should_reject = Trueelse: is_running = True# 如果被拒绝,执行回调if should_reject:if on_rejected:try:if inspect.iscoroutinefunction(on_rejected):await on_rejected(*args, **kwargs)else: on_rejected(*args, **kwargs)except Exception as e:print(f"拒绝回调错误: {e}")return reject_result# 执行原函数和成功回调try: result = await func(*args, **kwargs)if on_success:try:if inspect.iscoroutinefunction(on_success):await on_success(*args, **kwargs)else: on_success(*args, **kwargs)except Exception as e:print(f"成功回调错误: {e}")return resultfinally:# 函数执行完毕后,恢复状态并记录完成时间(用于timeout冷却)with lock: is_running = False last_run_time = time.time()return async_wrappedelse: @wraps(func)defsync_wrapped(*args, **kwargs):nonlocal is_running, last_run_time current_time = time.time() should_reject = Falsewith lock:if is_running or (current_time - last_run_time < timeout): should_reject = Trueelse: is_running = Trueif should_reject:if on_rejected:try:if inspect.iscoroutinefunction(on_rejected):raise ValueError("同步函数不允许使用异步的 on_rejected 回调") on_rejected(*args, **kwargs)except Exception as e:print(f"拒绝回调错误: {e}")return reject_resulttry: result = func(*args, **kwargs)if on_success:try:if inspect.iscoroutinefunction(on_success):raise ValueError("同步函数不允许使用异步的 on_success 回调") on_success(*args, **kwargs)except Exception as e:print(f"成功回调错误: {e}")return resultfinally:with lock: is_running = False last_run_time = time.time()return sync_wrappedreturn decorator
多线程环境下的同步函数调用示例
import threadingimport timedefsuccess_handler(num):print(f"线程 {num} 执行完毕")defrejected_callback(num):print(f'线程 {num} 被拒绝了')@single_call(on_rejected=rejected_callback, on_success=success_handler, timeout=1)defworker(num):print(f'线程 {num} 开始') time.sleep(2)print(f'线程 {num} 结束')threads = []for i inrange(4): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start()for t in threads: t.join()threading.Thread(target=worker, args=(4,)).start()"""线程 0 开始线程 1 被拒绝了线程 2 被拒绝了线程 3 被拒绝了线程 0 结束线程 4 开始线程 4 结束"""
异步函数在多线程环境中的调用示例
import asyncioimport threadingasyncdefasync_success_callback(num):print(f"线程 {num} 操作成功")asyncdefasync_reject_callback(num):print(f"线程 {num} 被拒绝")@single_call( on_success=async_success_callback, on_rejected=async_reject_callback)asyncdefasync_operation(task_id):print(f"开始异步 {task_id}")await asyncio.sleep(1)print(f"结束异步 {task_id}")defthread_worker(task_id):print(f"线程 {task_id} 启动") loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) result = loop.run_until_complete(async_operation(task_id)) loop.close()threads = []for i inrange(3): t = threading.Thread(target=thread_worker, args=(i,)) threads.append(t) t.start()for t in threads: t.join()threading.Thread(target=thread_worker, args=(100,)).start()"""线程 0 启动线程 1 启动线程 2 启动开始异步 1线程 0 被拒绝线程 2 被拒绝结束异步 1线程 1 操作成功线程 100 启动开始异步 100结束异步 100线程 100 操作成功"""
业务陷阱(必读!)
常见错误用法
@single_call(timeout=60, reject_result="发送太频繁")defsend_sms(phone_number):print(f"正在给 {phone_number} 发短信")
问题:这是"函数级"锁,所有用户共享!
- • 张三给 13800000001 发短信 → 锁住了整个 send_sms 函数
- • 李四给 13900000002 发短信 → 直接被拒绝!
正确做法:参数级锁
使用 single_call 装饰器,基于手机号/用户ID等参数创建独立的锁:
import asyncioimport inspectimport threadingimport timefrom functools import wrapsfrom typing importCallable, Optional, Anyimport logginglogging.basicConfig( level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')defsingle_call( key_func: Callable[..., str], # 用于从参数中提取唯一 Key 的函数 timeout: float = 0.0, on_success: Optional[Callable[..., Any]] = None, on_rejected: Optional[Callable[..., Any]] = None, reject_result: Any = None, cleanup_threshold: int = 1000, # 触发内存清理的字典长度阈值):""" 基于参数 Key 的函数调用限制装饰器 参数: key_func: 一个函数,接收与目标函数相同的参数,返回一个唯一的字符串(如手机号、用户ID) timeout: 同一个 Key 两次允许执行的最小时间间隔(秒) timeout = 0: 仅限制函数不能同时执行(前一次执行完成后才能开始下一次) timeout > 0: 即使前一次执行已完成,仍需等待设定的时间间隔后才允许再次执行 on_success: 成功执行后的回调函数 on_rejected: 被拒绝执行时的回调函数 reject_result: 被拒绝时返回的默认值 cleanup_threshold: 状态字典的最大缓存数,超过该值将触发垃圾清理,防止内存泄漏 """defdecorator(func):# 状态字典,格式: { "1380000": {"is_running": False, "last_run_time": 1690000000.0} } states = {} lock = threading.Lock()defclean_expired_keys(current_time: float):""" 清理已过期且未运行的 Key,防止内存泄漏""" keys_to_delete = []for k, state in states.items():ifnot state["is_running"] and (current_time - state["last_run_time"] >= timeout): keys_to_delete.append(k)for k in keys_to_delete:del states[k]if inspect.iscoroutinefunction(func): @wraps(func)asyncdefasync_wrapped(*args, **kwargs):# 动态生成本次请求的专属 Keytry: call_key = str(key_func(*args, **kwargs))except Exception as e: logging.warning(f"Key提取错误,使用默认Key: {e}") call_key = "default_key" current_time = time.time() should_reject = False# 状态检查与更新with lock:# 定期清理垃圾内存 (字典超过 cleanup_threshold 个元素时触发清理)iflen(states) > cleanup_threshold: clean_expired_keys(current_time) state = states.get(call_key)if state isNone: state = {"is_running": False, "last_run_time": 0.0} states[call_key] = stateif state["is_running"] or (current_time - state["last_run_time"] < timeout): should_reject = Trueelse: state["is_running"] = True# 拒绝逻辑及回调if should_reject:if on_rejected:try:if inspect.iscoroutinefunction(on_rejected):await on_rejected(*args, **kwargs)else: on_rejected(*args, **kwargs)except Exception as e: logging.error(f"拒绝回调错误: {e}")return reject_result# 核心逻辑及成功回调try: result = await func(*args, **kwargs)if on_success:try:if inspect.iscoroutinefunction(on_success):await on_success(*args, **kwargs)else: on_success(*args, **kwargs)except Exception as e: logging.error(f"成功回调错误: {e}")return resultfinally:# 执行完毕,复位当前 Key 的状态with lock: state = states.get(call_key)if state: state["is_running"] = False state["last_run_time"] = time.time()# 如果 timeout 为 0,说明不需要冷却,用完立刻删除以节省内存if timeout <= 0:del states[call_key]return async_wrappedelse: @wraps(func)defsync_wrapped(*args, **kwargs):try: call_key = str(key_func(*args, **kwargs))except Exception as e: logging.warning(f"Key提取错误,使用默认Key: {e}") call_key = "default_key" current_time = time.time() should_reject = Falsewith lock:iflen(states) > cleanup_threshold: clean_expired_keys(current_time) state = states.get(call_key)if state isNone: state = {"is_running": False, "last_run_time": 0.0} states[call_key] = stateif state["is_running"] or (current_time - state["last_run_time"] < timeout): should_reject = Trueelse: state["is_running"] = Trueif should_reject:if on_rejected:try:if inspect.iscoroutinefunction(on_rejected):raise TypeError("同步函数不允许使用异步的 on_rejected 回调") on_rejected(*args, **kwargs)except Exception as e: logging.error(f"拒绝回调错误: {e}")return reject_resulttry: result = func(*args, **kwargs)if on_success:try:if inspect.iscoroutinefunction(on_success):raise TypeError("同步函数不允许使用异步的 on_success 回调") on_success(*args, **kwargs)except Exception as e: logging.error(f"成功回调错误: {e}")return resultfinally:with lock: state = states.get(call_key)if state: state["is_running"] = False state["last_run_time"] = time.time()if timeout <= 0:del states[call_key]return sync_wrappedreturn decorator
使用示例
asyncdefsms_success(phone, user_id):print(f"[成功回调] 用户 {user_id}-{phone} 任务成功回调")asyncdefsms_rejected(phone, user_id):print(f"[失败回调] {user_id} 重复提交已拦截。")# key_func 用于提取请求的唯一标识。# 这里的 lambda phone, user_id: phone 的意思是:取第一个参数 phone 作为锁的 Key@single_call( key_func=lambda q, user_id: q, timeout=5.0, # 对同一个手机号,冷却 5 秒 on_success=sms_success, on_rejected=sms_rejected, reject_result={"code": 429, "msg": "发送太频繁,请稍后再试"})asyncdefsend_sms(phone, user_id):print(f"[{user_id}] 任务开始...")await asyncio.sleep(1)return {"code": 200, "msg": f"{user_id} 任务成功"}asyncdeftest(): res1 = asyncio.create_task(send_sms("13800000000", user_id="U_001")) res2 = asyncio.create_task(send_sms("13800000000", user_id="U_001")) res3 = asyncio.create_task(send_sms("13800000000", user_id="U_001")) res4 = asyncio.create_task(send_sms("13999999999", user_id="U_002")) results = await asyncio.gather(res1, res2, res3, res4)for i, r inenumerate(results):print(f"返回值 {i + 1}: {r}")if __name__ == "__main__": asyncio.run(test())
在真实的 Web 场景中,提交订单、发送短信防并发,通常会放弃 Python 内置锁,改为使用 Redis 分布式锁。
你的项目中有这样的需求吗?
欢迎在评论区分享你的使用场景!
觉得有帮助?别忘了点赞 + 转发 + 关注! 🙏