配套专栏:Python 全栈修炼之路 第 07 篇《函数 —— 代码复用的基石》
难度分布:⭐ → ⭐⭐ → ⭐⭐ → ⭐⭐⭐ → ⭐⭐⭐ → ⭐⭐⭐⭐
核心覆盖:*args/**kwargs、闭包、装饰器、递归、lru_cache、高阶函数、LEGB 作用域、延迟绑定
题目一:通用函数调用器 ⭐
📌 题目描述
编写函数 call_function(func, args=None, kwargs=None),实现一个通用的函数调用器:
def add(a, b, c=0): return a + b + c# 位置参数调用call_function(add, args=[1, 2]) # 3call_function(add, args=[1, 2, 3]) # 6# 关键字参数调用call_function(add, kwargs={"a": 1, "b": 2}) # 3# 混合调用call_function(add, args=[1], kwargs={"b": 2, "c": 10}) # 13
💡 编程思路
这道题考察 *args 和 **kwargs 的解包与转发:
- 函数需要接收一个可调用对象
func,以及可选的位置参数列表 args 和关键字参数字典 kwargs。 - 使用
* 解包位置参数列表,** 解包关键字参数字典。 - 通过
func(*args, **kwargs) 实现动态调用。
关键点:None 的默认值处理 —— 当 args 或 kwargs 为 None 时,应替换为空列表/空字典。
🖥️ 参考代码
def call_function(func, args=None, kwargs=None): """ 通用函数调用器 参数: func: 可调用对象 args: 位置参数列表 kwargs: 关键字参数字典 """ args = args or [] kwargs = kwargs or {} return func(*args, **kwargs)def call_function_safe(func, args=None, kwargs=None): """安全版本:捕获异常并返回友好错误信息""" args = args or [] kwargs = kwargs or {} try: return func(*args, **kwargs) except TypeError as e: return f"调用失败: {e}"# 测试if __name__ == "__main__": def add(a, b, c=0): return a + b + c def greet(name, greeting="你好"): return f"{greeting}, {name}!" def var_args(*args, **kwargs): return f"args={args}, kwargs={kwargs}" # 位置参数 print(call_function(add, args=[1, 2])) # 3 print(call_function(add, args=[1, 2, 3])) # 6 # 关键字参数 print(call_function(add, kwargs={"a": 1, "b": 2})) # 3 print(call_function(greet, kwargs={"name": "张三"})) # 你好, 张三! # 混合调用 print(call_function(add, args=[1], kwargs={"b": 2, "c": 10})) # 13 # 可变参数函数 print(call_function(var_args, args=[1, 2, 3], kwargs={"x": 99})) # args=(1, 2, 3), kwargs={'x': 99} # 错误处理 print(call_function_safe(add, args=[1])) # 缺少参数 print(call_function_safe(add, args=[1, 2, 3, 4])) # 参数过多
🔗 关联知识点
| |
|---|
*args | |
**kwargs | |
or [] | None |
TypeError | |
题目二:闭包实现带历史记录的计算器 ⭐⭐
📌 题目描述
使用闭包实现一个带历史记录的计算器工厂 make_calculator(),每次调用返回独立的计算器实例:
calc1 = make_calculator()calc1.add(5)calc1.add(3)calc1.subtract(1)print(calc1.result()) # 7print(calc1.history()) # ['+5', '+3', '-1']calc2 = make_calculator()calc2.multiply(4)calc2.add(6)print(calc2.result()) # 4 (独立的实例)
💡 编程思路
这道题考察 闭包的状态保持能力:
make_calculator() 内部维护 total 和 history 两个自由变量。- 返回的内部函数通过
nonlocal 修改这些变量。 - 每次调用
make_calculator() 都会创建独立的闭包实例,互不干扰。
关键点:闭包通过 __closure__ 捕获外部变量,即使外部函数已返回,变量仍存活。
🖥️ 参考代码
def make_calculator(initial=0): """ 创建带历史记录的计算器闭包 返回: 包含 add/subtract/multiply/divide/result/history/clear 方法的对象 """ total = initial history = [] def _update(value, symbol): nonlocal total total = value history.append(f"{symbol}{value}") def add(n): _update(total + n, f"+{n}") return total def subtract(n): _update(total - n, f"-{n}") return total def multiply(n): _update(total * n, f"*{n}") return total def divide(n): if n == 0: raise ZeroDivisionError("除数不能为零") _update(total / n, f"/{n}") return total def result(): return total def get_history(): return history.copy() def clear(): nonlocal total total = initial history.clear() def undo(): """撤销上一步操作(重新计算历史)""" nonlocal total if not history: return history.pop() total = initial # 重新执行所有历史操作 for record in history: op = record[0] val = float(record[1:]) if op == '+': total += val elif op == '-': total -= val elif op == '*': total *= val elif op == '/': total /= val return type('Calculator', (), { 'add': add, 'subtract': subtract, 'multiply': multiply, 'divide': divide, 'result': result, 'history': get_history, 'clear': clear, 'undo': undo })()# 测试if __name__ == "__main__": calc1 = make_calculator(10) calc1.add(5) # 15 calc1.multiply(2) # 30 calc1.subtract(3) # 27 print(f"calc1 结果: {calc1.result()}") # 27 print(f"calc1 历史: {calc1.history()}") # ['+5', '*2', '-3'] # 撤销 calc1.undo() print(f"撤销后: {calc1.result()}") # 30 print(f"历史: {calc1.history()}") # ['+5', '*2'] # 独立实例 calc2 = make_calculator() calc2.add(100) print(f"calc2 结果: {calc2.result()}") # 100(独立) # 验证闭包独立性 calc3 = make_calculator(0) calc3.add(1) calc3.add(1) print(f"calc3: {calc3.result()}") # 2 print(f"calc1: {calc1.result()}") # 30(不受影响) # 查看闭包内部 print(f"\n闭包变量: {calc1.add.__closure__}")
🔗 关联知识点
题目三:实现 retry 重试装饰器 ⭐⭐
📌 题目描述
编写一个带参数的装饰器 retry(max_attempts, delay, exceptions),在函数调用失败时自动重试:
import random@retry(max_attempts=3, delay=1, exceptions=(ValueError,))def unstable_api(): """模拟不稳定的 API 调用""" if random.random() < 0.7: raise ValueError("服务暂时不可用") return "success"result = unstable_api()# 如果前两次失败、第三次成功,则返回 "success"# 如果三次都失败,则抛出最后一个异常
💡 编程思路
这道题考察 带参数装饰器的三层嵌套结构:
- 最外层
retry(max_attempts, delay, exceptions) 接收装饰器参数。 - 中间层
decorator(func) 接收被装饰的函数。 - 最内层
wrapper(*args, **kwargs) 实现重试逻辑。
关键点:
- 使用
functools.wraps(func) 保留原函数元数据。 - 使用
time.sleep(delay) 控制重试间隔。
🖥️ 参考代码
import functoolsimport timedef retry(max_attempts=3, delay=0, exceptions=(Exception,)): """ 重试装饰器 参数: max_attempts: 最大尝试次数(含首次调用) delay: 每次重试的间隔秒数 exceptions: 需要重试的异常类型元组 """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): last_exception = None for attempt in range(1, max_attempts + 1): try: return func(*args, **kwargs) except exceptions as e: last_exception = e if attempt < max_attempts: wait = delay * attempt # 递增等待 print(f" 第 {attempt} 次失败: {e},{wait}s 后重试...") time.sleep(wait) else: print(f" 第 {attempt} 次仍失败,放弃重试") raise last_exception return wrapper return decoratordef retry_with_backoff(max_attempts=3, base_delay=1, backoff_factor=2, exceptions=(Exception,), max_delay=60): """ 指数退避重试装饰器(进阶版) 每次等待时间: base_delay * backoff_factor^(attempt-1) """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): last_exception = None for attempt in range(1, max_attempts + 1): try: return func(*args, **kwargs) except exceptions as e: last_exception = e if attempt < max_attempts: wait = min(base_delay * (backoff_factor ** (attempt - 1)), max_delay) print(f" 第 {attempt} 次失败: {e},{wait:.1f}s 后重试...") time.sleep(wait) raise last_exception return wrapper return decorator# 测试if __name__ == "__main__": import random call_count = 0 @retry(max_attempts=3, delay=0.1, exceptions=(ValueError,)) def unstable_api(): """模拟不稳定的 API(第3次调用成功)""" global call_count call_count += 1 if call_count < 3: raise ValueError("服务暂时不可用") return "success" result = unstable_api() print(f"结果: {result}") # success print(f"总调用次数: {call_count}") # 3 # 测试全部失败 call_count = 0 @retry(max_attempts=2, delay=0.1, exceptions=(ValueError,)) def always_fail(): global call_count call_count += 1 raise ValueError("永远失败") try: always_fail() except ValueError as e: print(f"最终异常: {e}") print(f"总调用次数: {call_count}") # 2 # 测试异常类型过滤 @retry(max_attempts=3, delay=0.1, exceptions=(ValueError,)) def type_error_api(): raise TypeError("类型错误,不应被重试") try: type_error_api() except TypeError as e: print(f"非目标异常直接抛出: {e}")
🔗 关联知识点
题目四:手写 LRU 缓存装饰器 ⭐⭐⭐
📌 题目描述
不使用 functools.lru_cache,手写一个带大小限制的 LRU 缓存装饰器 lru_cache(maxsize=128):
@lru_cache(maxsize=3)def expensive_compute(n): print(f" 计算 {n}...") return n * nexpensive_compute(1) # 计算 1...expensive_compute(2) # 计算 2...expensive_compute(1) # 命中缓存,不打印expensive_compute(3) # 计算 3...expensive_compute(4) # 计算 4...(此时缓存已满,淘汰最久未使用的 2)expensive_compute(2) # 计算 2...(已被淘汰,需重新计算)
💡 编程思路
这道题直击第7篇的 lru_cache 底层原理:
- 使用
OrderedDict 存储缓存(键为参数,值为结果),利用其 O(1) 的 move_to_end() 和 popitem(last=False)。 - 缓存未命中时,计算结果并插入。如果超出
maxsize,弹出最久未使用的(头部)。 - 提供
cache_info() 和 cache_clear() 方法。
难点:装饰器需要给被装饰函数动态添加方法(cache_info、cache_clear),使用 functools.update_wrapper + 直接赋值属性。
🖥️ 参考代码
import functoolsfrom collections import OrderedDictdef lru_cache(maxsize=128): """ 手写 LRU 缓存装饰器 参数: maxsize: 最大缓存条目数,None 表示无限制 """ def decorator(func): cache = OrderedDict() hits = misses = 0 @functools.wraps(func) def wrapper(*args): nonlocal hits, misses # 缓存命中 if args in cache: hits += 1 cache.move_to_end(args) # 移到末尾(最近使用) return cache[args] # 缓存未命中 misses += 1 result = func(*args) cache[args] = result # 淘汰最久未使用的 if maxsize is not None and len(cache) > maxsize: cache.popitem(last=False) return result def cache_info(): """返回缓存统计信息""" return { "hits": hits, "misses": misses, "maxsize": maxsize, "currsize": len(cache) } def cache_clear(): """清空缓存""" nonlocal hits, misses cache.clear() hits = misses = 0 # 将方法绑定到 wrapper 上 wrapper.cache_info = cache_info wrapper.cache_clear = cache_clear return wrapper return decorator# 测试if __name__ == "__main__": compute_count = 0 @lru_cache(maxsize=3) def expensive_compute(n): global compute_count compute_count += 1 print(f" 计算 expensive_compute({n})...") return n * n print("=== 基础测试 ===") print(expensive_compute(1)) # 计算并缓存 print(expensive_compute(2)) # 计算并缓存 print(expensive_compute(3)) # 计算并缓存(缓存满) print(expensive_compute(1)) # 命中缓存(1 移到末尾) print(expensive_compute(4)) # 计算并缓存(淘汰 2,因为 2 最久未使用) print(expensive_compute(2)) # 未命中,重新计算 print(f"总计算次数: {compute_count}") # 5(1,2,3,4,2) print(f"缓存信息: {expensive_compute.cache_info()}") # 清空缓存 expensive_compute.cache_clear() print(f"清空后: {expensive_compute.cache_info()}") # 与 functools.lru_cache 对比 print("\n=== 与标准库对比 ===") from functools import lru_cache as std_lru_cache std_count = 0 @std_lru_cache(maxsize=3) def std_compute(n): global std_count std_count += 1 return n * n std_compute(1) std_compute(2) std_compute(3) std_compute(1) # 命中 std_compute(4) # 淘汰 2 std_compute(2) # 重新计算 print(f"标准库计算次数: {std_count}") # 5 print(f"标准库缓存信息: {std_compute.cache_info()}")
🔗 关联知识点
| |
|---|
OrderedDict | |
popitem(last=False) | |
nonlocal | |
| wrapper.cache_info = cache_info |
functools.wraps | |
题目五:函数管道(链式调用) ⭐⭐⭐
📌 题目描述
实现一个函数管道系统,支持将多个函数串联执行,前一个函数的输出作为后一个函数的输入:
# 方式一:使用 pipe 函数result = pipe( [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], filter(lambda x: x % 2 == 0), # [2, 4, 6, 8, 10] map(lambda x: x ** 2), # [4, 16, 36, 64, 100] lambda x: sorted(x, reverse=True), # [100, 64, 36, 16, 4] lambda x: sum(x) # 220)print(result) # 220# 方式二:使用 | 运算符(重载)result = ( Pipeline([1, 2, 3, 4, 5]) | filter(lambda x: x % 2 != 0) | map(lambda x: x ** 2) | list)print(result) # [1, 9, 25]
💡 编程思路
这道题考察 高阶函数与 Python 运算符重载:
- 方式一(
pipe 函数):接收初始值和一系列函数,依次将值传入每个函数。使用 reduce 或简单的 for 循环实现。 - 方式二(
Pipeline 类):通过重载 __or__ 运算符实现 | 语法。Pipeline 对象持有当前值,每次 | 返回新的 Pipeline 对象。
关键点:map 和 filter 返回的是迭代器,在管道中需要自动处理迭代器到可迭代对象的转换。
🖥️ 参考代码
from functools import reducedef pipe(data, *functions): """ 函数管道:将数据依次通过一系列函数处理 参数: data: 初始数据 *functions: 一系列处理函数 返回: 最终处理结果 """ return reduce(lambda val, func: func(val), functions, data)class Pipeline: """支持 | 运算符的函数管道""" def __init__(self, data): self.data = data def __or__(self, func): """重载 | 运算符""" result = func(self.data) return Pipeline(result) def __repr__(self): return f"Pipeline({self.data!r})" def collect(self): """获取最终结果""" return self.datadef compose(*functions): """ 函数组合:从右到左执行 (f ∘ g)(x) = f(g(x)) 与 pipe 的区别:compose 返回一个新函数,而不是立即执行 """ def composed(x): return reduce(lambda val, func: func(val), reversed(functions), x) return composed# 测试if __name__ == "__main__": # === pipe 函数测试 === print("=== pipe 函数 ===") result1 = pipe( [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], filter(lambda x: x % 2 == 0), map(lambda x: x ** 2), list, sorted, sum ) print(f"偶数平方和: {result1}") # 220 result2 = pipe( "Hello World Python Programming", str.split, map(len), list, max ) print(f"最长单词长度: {result2}") # 11 (Programming) result3 = pipe( [3, 1, 4, 1, 5, 9, 2, 6], sorted, lambda x: x[-3:], # 取最大的三个 sum ) print(f"最大的三个数之和: {result3}") # 17 # === Pipeline 类测试 === print("\n=== Pipeline 类 ===") result4 = ( Pipeline([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) | filter(lambda x: x % 3 == 0) | map(lambda x: x ** 2) | list ) print(f"3的倍数的平方: {result4.collect()}") # [9, 36, 81] # === compose 函数组合测试 === print("\n=== compose 函数组合 ===") # (f ∘ g ∘ h)(x) = f(g(h(x))) transform = compose( lambda x: x.upper(), # 最后执行 lambda x: x.replace(' ', '_'), # 中间执行 str.strip # 最先执行 ) print(transform(" hello world ")) # "HELLO_WORLD" # 数值处理管道 process = compose( lambda nums: sum(nums) / len(nums), # 求平均 lambda nums: nums[1:], # 去掉最小值 sorted ) print(f"去掉最小值后的平均: {process([1, 2, 3, 4, 5, 100])}") # (2+3+4+5+100)/5 = 22.8
🔗 关联知识点
| |
|---|
reduce | |
__or__ | |
| |
compose | f(g(x)) |
| map |
题目六:递归解决汉诺塔(含可视化) ⭐⭐⭐⭐
📌 题目描述
实现汉诺塔问题的递归解法,要求:
hanoi(n=3, source='A', target='C', auxiliary='B')输出:第 1 步: A → C (移动盘子 1)第 2 步: A → B (移动盘子 2)第 3 步: C → B (移动盘子 1)第 4 步: A → C (移动盘子 3)第 5 步: B → A (移动盘子 1)第 6 步: B → C (移动盘子 2)第 7 步: A → C (移动盘子 1)总步数: 7 (2^3 - 1)
💡 编程思路
汉诺塔是递归思维的经典训练题:
递归策略:
- 将上面 n-1 个盘子从 source 移到 auxiliary(借助 target)
- 将 n-1 个盘子从 auxiliary 移到 target(借助 source)
终止条件:n = 1 时,直接移动。
可视化:用三个列表表示三根柱子,每次移动后打印当前状态。
🖥️ 参考代码
def hanoi(n, source='A', target='C', auxiliary='B', verbose=True): """ 汉诺塔递归解法 参数: n: 盘子数量 source: 起始柱 target: 目标柱 auxiliary: 辅助柱 verbose: 是否打印步骤 """ steps = [] def _move(n, src, tgt, aux): if n == 1: steps.append((src, tgt)) if verbose: print(f" 移动盘子 1: {src} → {tgt}") else: _move(n - 1, src, aux, tgt) steps.append((src, tgt)) if verbose: print(f" 移动盘子 {n}: {src} → {tgt}") _move(n - 1, aux, tgt, src) if verbose: print(f"=== 汉诺塔 (n={n}) ===") _move(n, source, target, auxiliary) if verbose: print(f"总步数: {len(steps)} (2^{n} - 1 = {2**n - 1})") return stepsdef hanoi_visualize(n): """可视化汉诺塔的每一步""" # 初始化:所有盘子在 A 柱(大盘在下) towers = { 'A': list(range(n, 0, -1)), 'B': [], 'C': [] } def _print_towers(): max_height = n for i in range(max_height, 0, -1): row = "" for pole in ['A', 'B', 'C']: if i <= len(towers[pole]): disk = towers[pole][i - 1] row += f" {'█' * disk:^{2 * n + 1}}" else: row += f" {'|':^{2 * n + 1}}" print(row) print(" " + " ".join(f"{pole:^{2 * n + 1}}" for pole in ['A', 'B', 'C'])) print() steps = hanoi(n, verbose=False) print(f"=== 汉诺塔可视化 (n={n}) ===\n") _print_towers() for i, (src, tgt) in enumerate(steps, 1): disk = towers[src].pop() towers[tgt].append(disk) print(f"第 {i} 步: {src} → {tgt} (盘子 {disk})") _print_towers() print(f"完成!总步数: {len(steps)}")def hanoi_iterative(n): """ 汉诺塔的迭代解法(进阶) 规律: - 奇数步:移动最小的盘子(按 A→C→B→A 循环方向) - 偶数步:移动唯一合法的非最小盘子 """ if n == 0: return [] towers = { 'A': list(range(n, 0, -1)), 'B': [], 'C': [] } # 移动方向取决于 n 的奇偶性 poles = ['A', 'B', 'C'] if n % 2 == 0: poles = ['A', 'C', 'B'] # 偶数时方向反转 steps = [] total = 2 ** n - 1 for move_num in range(1, total + 1): if move_num % 2 == 1: # 奇数步:移动最小盘子 # 找到最小盘子所在柱子 src = None for p in poles: if towers[p] and towers[p][-1] == 1: src = p break # 按循环方向移动 src_idx = poles.index(src) tgt = poles[(src_idx + 1) % 3] else: # 偶数步:移动唯一合法的非最小盘子 src = tgt = None for p in poles: for q in poles: if p != q and towers[p] and towers[q]: if towers[p][-1] < towers[q][-1]: src, tgt = p, q break if src: break disk = towers[src].pop() towers[tgt].append(disk) steps.append((src, tgt)) return steps# 测试if __name__ == "__main__": # 基础递归 print("=== n = 3 ===") steps = hanoi(3) assert len(steps) == 7 assert steps == [('A', 'C'), ('A', 'B'), ('C', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'C'), ('A', 'C')] # 可视化 print("\n=== 可视化 n = 3 ===") hanoi_visualize(3) # 验证迭代版本与递归版本一致 for n in range(1, 6): recursive_steps = hanoi(n, verbose=False) iterative_steps = hanoi_iterative(n) assert len(recursive_steps) == len(iterative_steps) == 2**n - 1 print(f"n={n}: 递归 {len(recursive_steps)} 步, 迭代 {len(iterative_steps)} 步 ✓") # 性能测试 import timeit n = 20 t = timeit.timeit(lambda: hanoi(n, verbose=False), number=10) print(f"\nn={n} 递归耗时: {t:.4f}s (共 {2**n - 1} 步)")
🔗 关联知识点
总结:知识点覆盖矩阵
| | |
|---|
| | *args |
| | |
| | |
| | OrderedDict |
| | 高阶函数、reduce、运算符重载、compose |
| | |
建议:按顺序从第 1 题做到第 6 题。第 1-2 题巩固参数和闭包基础,第 3-4 题深入装饰器原理,第 5 题体会函数式编程思想,第 6 题是递归思维的终极训练。完成全部 6 题后,你对 Python 函数的理解将达到新的高度。
延伸阅读
- LeetCode 509. 斐波那契数(递归 + 记忆化)
- LeetCode 206. 反转链表(递归 vs 迭代)
- LeetCode 394. 字符串解码(递归嵌套)
本文是《Python 全栈修炼之路》第 07 篇配套练习,欢迎点赞收藏!