Python 进阶的核心,是跳出 “能用就行” 的基础编程思维,掌握更高效的语法、更合理的数据结构、更优的算法思路、更工程化的编程范式(函数式 / 面向对象 / 并发),最终写出性能更高、可读性更好、可维护性更强的代码。
本文将从高效语法、内置工具模块、算法基础、函数进阶、面向对象进阶、迭代器 / 生成器、并发编程七大维度,系统梳理 Python 进阶的核心知识点,结合实战案例拆解难点,帮你从 “会写代码” 升级为 “会写好代码”。
prices = {'AAPL': 191.88,'GOOG': 1186.96,'IBM': 149.24,'ORCL': 48.44,'ACN': 166.89,'FB': 208.09,'SYMC': 21.29}# 用股票价格大于100元的股票构造一个新的字典prices2 = {key: value for key, value in prices.items() if value > 100}print(prices2)说明:生成式是 Python 特有的高效语法,可一键生成列表、集合、字典,替代繁琐的
for循环 +append,代码更简洁、执行效率更高。
[list] * n 会让所有元素指向同一个列表对象。names = ['关羽', '张飞', '赵云', '马超', '黄忠']courses = ['语文', '数学', '英语']# 录入五个学生三门课程的成绩# 错误 - 参考http://pythontutor.com/visualize.html#mode=edit# scores = [[None] * len(courses)] * len(names)scores = [[None] * len(courses) for _ inrange(len(names))]for row, name inenumerate(names):for col, course inenumerate(courses): scores[row][col] = float(input(f'请输入{name}的{course}成绩: '))print(scores)Python Tutor - VISUALIZE CODE AND GET LIVE HELPheapq:堆排序找极值heapq模块基于堆结构实现,可高效找出列表中最大 / 最小的 N 个元素(时间复杂度 O (n log k),优于排序后切片的 O (n log n))。"""从列表中找出最大的或最小的N个元素堆结构(大根堆/小根堆)"""import heapqlist1 = [34, 25, 12, 99, 87, 63, 58, 78, 88, 92]list2 = [ {'name': 'IBM', 'shares': 100, 'price': 91.1}, {'name': 'AAPL', 'shares': 50, 'price': 543.22}, {'name': 'FB', 'shares': 200, 'price': 21.09}, {'name': 'HPQ', 'shares': 35, 'price': 31.75}, {'name': 'YHOO', 'shares': 45, 'price': 16.35}, {'name': 'ACME', 'shares': 75, 'price': 115.65}]print(heapq.nlargest(3, list1))print(heapq.nsmallest(3, list1))print(heapq.nlargest(2, list2, key=lambda x: x['price']))print(heapq.nlargest(2, list2, key=lambda x: x['shares']))itertools:迭代器工具集itertools提供了高效的迭代器生成函数,支持排列、组合、笛卡尔积等场景,节省内存(迭代器按需生成元素,不一次性加载)。"""迭代工具模块"""import itertools# 产生ABCD的全排列itertools.permutations('ABCD')# 产生ABCDE的五选三组合itertools.combinations('ABCDE', 3)# 产生ABCD和123的笛卡尔积itertools.product('ABCD', '123')# 产生ABC的无限循环序列itertools.cycle(('A', 'B', 'C'))collections:增强版集合类型collections是 Python 内置的 “数据结构增强包”,解决原生列表 / 字典的不足,常用工具类如下:Counter(words).most_common(3) | ||
Point = namedtuple('Point', ['x','y']) | ||
dq.appendleft(1)dq.popright() | ||
dd = defaultdict(list) | ||
"""找出序列中出现次数最多的元素"""from collections import Counterwords = ['look', 'into', 'my', 'eyes', 'look', 'into', 'my', 'eyes','the', 'eyes', 'the', 'eyes', 'the', 'eyes', 'not', 'around','the', 'eyes', "don't", 'look', 'around', 'the', 'eyes','look', 'into', 'my', 'eyes', "you're", 'under']counter = Counter(words)print(counter.most_common(3))算法是解决问题的核心,Python 进阶需掌握算法复杂度分析和经典算法思想。
算法的好坏用时间复杂度(执行步骤)和空间复杂度(占用内存)衡量,常用大 O 标记表示 “渐近复杂度”:
2. 经典排序 / 查找算法
(1)排序算法
defmerge_sort(items):"""归并排序(分治思想)"""iflen(items) < 2:return items mid = len(items) // 2 left = merge_sort(items[:mid]) right = merge_sort(items[mid:])# 合并两个有序子列表 result = [] i = j = 0while i < len(left) and j < len(right):if left[i] < right[j]: result.append(left[i]) i += 1else: result.append(right[j]) j += 1 result += left[i:] + right[j:]return result# 测试print(merge_sort([34, 25, 12, 99, 87])) # [12, 25, 34, 87, 99](2)查找算法3. 经典算法思想说明:子列表指的是列表中索引(下标)连续的元素构成的列表;列表中的元素是int类型,可能包含正整数、0、负整数;程序输入列表中的元素,输出子列表元素求和的最大值,例如:
输入:1 -2 3 5 -3 2
输出:8
输入:0 -2 3 5 -1 2
输出:9
输入:-9 -2 -3 -5 -3
输出:-2
defmain(): items = list(map(int, input().split())) overall = partial = items[0]for i inrange(1, len(items)): partial = max(items[i], partial + items[i]) overall = max(partial, overall)print(overall)if __name__ == '__main__': main()说明:这个题目最容易想到的解法是使用二重循环,但是代码的时间性能将会变得非常的糟糕。使用动态规划的思想,仅仅是多用了两个变量,就将原来复杂度的问题变成了。
items1 = list(map(lambda x: x ** 2, filter(lambda x: x % 2, range(1, 10))))items2 = [x ** 2for x inrange(1, 10) if x % 2]lambda函数)defrecord_time(func):"""自定义装饰函数的装饰器""" @wraps(func)defwrapper(*args, **kwargs): start = time() result = func(*args, **kwargs)print(f'{func.__name__}: {time() - start}秒')return resultreturn wrapperfrom functools import wrapsfrom time import timedefrecord(output):"""可以参数化的装饰器"""defdecorate(func): @wraps(func)defwrapper(*args, **kwargs): start = time() result = func(*args, **kwargs) output(func.__name__, time() - start)return resultreturn wrapperreturn decoratefrom functools import wrapsfrom time import timeclassRecord():"""通过定义类的方式定义装饰器"""def__init__(self, output):self.output = outputdef__call__(self, func): @wraps(func)defwrapper(*args, **kwargs): start = time() result = func(*args, **kwargs)self.output(func.__name__, time() - start)return resultreturn wrapper例子:用装饰器来实现单例模式。说明:由于对带装饰功能的函数添加了@wraps装饰器,可以通过
func.__wrapped__方式获得被装饰之前的函数或类来取消装饰器的作用。
from functools import wrapsdefsingleton(cls):"""装饰类的装饰器""" instances = {} @wraps(cls)defwrapper(*args, **kwargs):if cls notin instances: instances[cls] = cls(*args, **kwargs)return instances[cls]return wrapper@singletonclassPresident:"""总统(单例类)"""pass提示:上面的代码中用到了闭包(closure),不知道你是否已经意识到了。还没有一个小问题就是,上面的代码并没有实现线程安全的单例,如果要实现线程安全的单例应该怎么做呢?
from functools import wrapsfrom threading import RLockdefsingleton(cls):"""线程安全的单例装饰器""" instances = {} locker = RLock() @wraps(cls)defwrapper(*args, **kwargs):if cls notin instances:with locker:if cls notin instances: instances[cls] = cls(*args, **kwargs)return instances[cls]return wrapper提示:上面的代码用到了
with上下文语法来进行锁操作,因为锁对象本身就是上下文管理器对象(支持__enter__和__exit__魔术方法)。在wrapper函数中,我们先做了一次不带锁的检查,然后再做带锁的检查,这样做比直接加锁检查性能要更好,如果对象已经创建就没有必须再去加锁而是直接返回该对象就可以了。
"""月薪结算系统 - 部门经理每月15000 程序员每小时200 销售员1800底薪加销售额5%提成"""from abc import ABCMeta, abstractmethodclassEmployee(metaclass=ABCMeta):"""员工(抽象类)"""def__init__(self, name):self.name = name @abstractmethoddefget_salary(self):"""结算月薪(抽象方法)"""passclassManager(Employee):"""部门经理"""defget_salary(self):return15000.0classProgrammer(Employee):"""程序员"""def__init__(self, name, working_hour=0):self.working_hour = working_hoursuper().__init__(name)defget_salary(self):return200.0 * self.working_hourclassSalesman(Employee):"""销售员"""def__init__(self, name, sales=0.0):self.sales = salessuper().__init__(name)defget_salary(self):return1800.0 + self.sales * 0.05classEmployeeFactory:"""创建员工的工厂(工厂模式 - 通过工厂实现对象使用者和对象之间的解耦合)""" @staticmethoddefcreate(emp_type, *args, **kwargs):"""创建员工""" all_emp_types = {'M': Manager, 'P': Programmer, 'S': Salesman} cls = all_emp_types[emp_type.upper()]return cls(*args, **kwargs) if cls elseNonedefmain():"""主函数""" emps = [ EmployeeFactory.create('M', '曹操'), EmployeeFactory.create('P', '荀彧', 120), EmployeeFactory.create('P', '郭嘉', 85), EmployeeFactory.create('S', '典韦', 123000), ]for emp in emps:print(f'{emp.name}: {emp.get_salary():.2f}元')if __name__ == '__main__': main()例子:扑克游戏。
"""经验:符号常量总是优于字面常量,枚举类型是定义符号常量的最佳选择"""from enum import Enum, uniqueimport random@uniqueclassSuite(Enum):"""花色""" SPADE, HEART, CLUB, DIAMOND = range(4)def__lt__(self, other):returnself.value < other.valueclassCard:"""牌"""def__init__(self, suite, face):"""初始化方法"""self.suite = suiteself.face = facedefshow(self):"""显示牌面""" suites = ['♠︎', '♥︎', '♣︎', '♦︎'] faces = ['', 'A', '2', '3', '4', '5', '6', '7', '8', '9', '10', 'J', 'Q', 'K']returnf'{suites[self.suite.value]}{faces[self.face]}'def__repr__(self):returnself.show()classPoker:"""扑克"""def__init__(self):self.index = 0self.cards = [Card(suite, face)for suite in Suitefor face inrange(1, 14)]defshuffle(self):"""洗牌(随机乱序)""" random.shuffle(self.cards)self.index = 0defdeal(self):"""发牌""" card = self.cards[self.index]self.index += 1return card @propertydefhas_more(self):returnself.index < len(self.cards)classPlayer:"""玩家"""def__init__(self, name):self.name = nameself.cards = []defget_one(self, card):"""摸一张牌"""self.cards.append(card)defsort(self, comp=lambda card: (card.suite, card.face)):"""整理手上的牌"""self.cards.sort(key=comp)defmain():"""主函数""" poker = Poker() poker.shuffle() players = [Player('东邪'), Player('西毒'), Player('南帝'), Player('北丐')]while poker.has_more:for player in players: player.get_one(poker.deal())for player in players: player.sort()print(player.name, end=': ')print(player.cards)if __name__ == '__main__': main()说明:上面的代码中使用了Emoji字符来表示扑克牌的四种花色,在某些不支持Emoji字符的系统上可能无法显示。
typedefstruct _object {/* 引用计数 */int ob_refcnt;/* 对象指针 */struct _typeobject *ob_type;} PyObject;/* 增加引用计数的宏定义 */#define Py_INCREF(op) ((op)->ob_refcnt++)/* 减少引用计数的宏定义 */#define Py_DECREF(op) \ //减少计数if (--(op)->ob_refcnt != 0) \ ; \else \ __Py_Dealloc((PyObject *)(op))导致引用计数+1的情况:导致引用计数-1的情况:
引用计数可能会导致循环引用问题,而循环引用会导致内存泄露,如下面的代码所示。为了解决这个问题,Python中引入了“标记-清除”和“分代收集”。在创建一个对象的时候,对象被放在第一代中,如果在第一代的垃圾检查中对象存活了下来,该对象就会被放到第二代中,同理在第二代的垃圾检查中对象存活下来,该对象就会被放到第三代中。
# 循环引用会导致内存泄露 - Python除了引用技术还引入了标记清理和分代回收# 在Python 3.6以前如果重写__del__魔术方法会导致循环引用处理失效# 如果不想造成循环引用可以使用弱引用list1 = []list2 = [] list1.append(list2)list2.append(list1)以下情况会导致垃圾回收:
如果循环引用中两个对象都定义了__del__方法,gc模块不会销毁这些不可达对象,因为gc模块不知道应该先调用哪个对象的__del__方法,这个问题在Python 3.6中得到了解决。
也可以通过weakref模块构造弱引用的方式来解决循环引用的问题。
gc.collect()gc模块的计数器达到阀值del aa = 24a = 23b = af(a)list1 = [a, a]set中?能去重吗?dict的键?classSetOnceMappingMixin:"""自定义混入类""" __slots__ = ()def__setitem__(self, key, value):if key inself:raise KeyError(str(key) + ' already set')returnsuper().__setitem__(key, value)classSetOnceDict(SetOnceMappingMixin, dict):"""自定义字典"""passmy_dict= SetOnceDict()try: my_dict['username'] = 'jackfrued' my_dict['username'] = 'hellokitty'except KeyError:passprint(my_dict)object,所有的元类都直接或间接的继承自type。例子:用元类实现单例模式。import threadingclassSingletonMeta(type):"""自定义元类"""def__init__(cls, *args, **kwargs): cls.__instance = None cls.__lock = threading.RLock()super().__init__(*args, **kwargs)def__call__(cls, *args, **kwargs):if cls.__instance isNone:with cls.__lock:if cls.__instance isNone: cls.__instance = super().__call__(*args, **kwargs)return cls.__instanceclassPresident(metaclass=SingletonMeta):"""总统(单例类)"""pass__iter__和__next__方法:classFib(object):"""迭代器"""def__init__(self, num):self.num = numself.a, self.b = 0, 1self.idx = 0def__iter__(self):returnselfdef__next__(self):ifself.idx < self.num:self.a, self.b = self.b, self.a + self.bself.idx += 1returnself.araise StopIteration()protocol或interface这样的定义协议的关键字。__iter__和__next__魔术方法就是迭代器协议。deffib(num):"""生成器""" a, b = 0, 1for _ inrange(num): a, b = b, a + byield asend()方法发送数据,发送的数据会成为生成器函数中通过yield表达式获得的值。这样,生成器就可以作为协程使用,协程简单的说就是可以相互协作的子程序。defcalc_avg():"""流式计算平均值""" total, counter = 0, 0 avg_value = NonewhileTrue: value = yield avg_value total, counter = total + value, counter + 1 avg_value = total / countergen = calc_avg()next(gen)print(gen.send(10))print(gen.send(20))print(gen.send(30))Python 实现并发有三种方案,需根据场景选择,核心是解决 GIL(全局解释器锁)问题。
Thread类并辅以Lock、Condition、Event、Semaphore和Barrier。Python中有GIL来防止多个线程同时执行本地字节码,这个锁对于CPython是必须的,因为CPython的内存管理并不是线程安全的,因为GIL的存在多线程并不能发挥CPU的多核特性。"""面试题:进程和线程的区别和联系?进程 - 操作系统分配内存的基本单位 - 一个进程可以包含一个或多个线程线程 - 操作系统分配CPU的基本单位并发编程(concurrent programming)1. 提升执行性能 - 让程序中没有因果关系的部分可以并发的执行2. 改善用户体验 - 让耗时间的操作不会造成程序的假死"""import globimport osimport threadingfrom PIL import ImagePREFIX = 'thumbnails'defgenerate_thumbnail(infile, size, format='PNG'):"""生成指定图片文件的缩略图""" file, ext = os.path.splitext(infile) file = file[file.rfind('/') + 1:] outfile = f'{PREFIX}/{file}_{size[0]}_{size[1]}.{ext}' img = Image.open(infile) img.thumbnail(size, Image.ANTIALIAS) img.save(outfile, format)defmain():"""主函数"""ifnot os.path.exists(PREFIX): os.mkdir(PREFIX)for infile in glob.glob('images/*.png'):for size in (32, 64, 128):# 创建并启动线程 threading.Thread( target=generate_thumbnail, args=(infile, (size, size)) ).start()if __name__ == '__main__': main()多个线程竞争资源的情况。"""多线程程序如果没有竞争资源处理起来通常也比较简单当多个线程竞争临界资源的时候如果缺乏必要的保护措施就会导致数据错乱说明:临界资源就是被多个线程竞争的资源"""import timeimport threadingfrom concurrent.futures import ThreadPoolExecutorclassAccount(object):"""银行账户"""def__init__(self):self.balance = 0.0self.lock = threading.Lock()defdeposit(self, money):# 通过锁保护临界资源withself.lock: new_balance = self.balance + money time.sleep(0.001)self.balance = new_balancedefmain():"""主函数""" account = Account()# 创建线程池 pool = ThreadPoolExecutor(max_workers=10) futures = []for _ inrange(100): future = pool.submit(account.deposit, 1) futures.append(future)# 关闭线程池 pool.shutdown()for future in futures: future.result()print(account.balance)if __name__ == '__main__': main()修改上面的程序,启动5个线程向账户中存钱,5个线程从账户中取钱,取钱时如果余额不足就暂停线程进行等待。为了达到上述目标,需要对存钱和取钱的线程进行调度,在余额不足时取钱的线程暂停并释放锁,而存钱的线程将钱存入后要通知取钱的线程,使其从暂停状态被唤醒。可以使用threading模块的Condition来实现线程调度,该对象也是基于锁来创建的,代码如下所示:"""多个线程竞争一个资源 - 保护临界资源 - 锁(Lock/RLock)多个线程竞争多个资源(线程数>资源数) - 信号量(Semaphore)多个线程的调度 - 暂停线程执行/唤醒等待中的线程 - Condition"""from concurrent.futures import ThreadPoolExecutorfrom random import randintfrom time import sleepimport threadingclassAccount:"""银行账户"""def__init__(self, balance=0):self.balance = balance lock = threading.RLock()self.condition = threading.Condition(lock)defwithdraw(self, money):"""取钱"""withself.condition:while money > self.balance:self.condition.wait() new_balance = self.balance - money sleep(0.001)self.balance = new_balancedefdeposit(self, money):"""存钱"""withself.condition: new_balance = self.balance + money sleep(0.001)self.balance = new_balanceself.condition.notify_all()defadd_money(account):whileTrue: money = randint(5, 10) account.deposit(money)print(threading.current_thread().name, ':', money, '====>', account.balance) sleep(0.5)defsub_money(account):whileTrue: money = randint(10, 30) account.withdraw(money)print(threading.current_thread().name, ':', money, '<====', account.balance) sleep(1)defmain(): account = Account()with ThreadPoolExecutor(max_workers=15) as pool:for _ inrange(5): pool.submit(add_money, account)for _ inrange(10): pool.submit(sub_money, account)if __name__ == '__main__': main()Process,其他辅助的类跟threading模块中的类似,进程间共享数据可以使用管道、套接字等,在multiprocessing模块中有一个Queue类,它基于管道和锁机制提供了多个进程共享的队列。下面是官方文档上关于多进程和进程池的一个示例。"""多进程和进程池的使用多线程因为GIL的存在不能够发挥CPU的多核特性对于计算密集型任务应该考虑使用多进程time python3 example22.pyreal 0m11.512suser 0m39.319ssys 0m0.169s使用多进程后实际执行时间为11.512秒,而用户时间39.319秒约为实际执行时间的4倍这就证明我们的程序通过多进程使用了CPU的多核特性,而且这台计算机配置了4核的CPU"""import concurrent.futuresimport mathPRIMES = [1116281,1297337,104395303,472882027,533000389,817504243,982451653,112272535095293,112582705942171,112272535095293,115280095190773,115797848077099,1099726899285419] * 5defis_prime(n):"""判断素数"""if n % 2 == 0:returnFalse sqrt_n = int(math.floor(math.sqrt(n)))for i inrange(3, sqrt_n + 1, 2):if n % i == 0:returnFalsereturnTruedefmain():"""主函数"""with concurrent.futures.ProcessPoolExecutor() as executor:for number, prime inzip(PRIMES, executor.map(is_prime, PRIMES)):print('%d is prime: %s' % (number, prime))if __name__ == '__main__': main()重点:多线程和多进程的比较。
以下情况需要使用多线程:
以下情况需要使用多进程:
future对象来获取任务执行的结果。Python 3 通过asyncio模块和await和async关键字(在 Python 3.7 中正式被列为关键字)来支持异步处理。"""异步I/O - async / await"""import asynciodefnum_generator(m, n):"""指定范围的数字生成器"""yieldfromrange(m, n + 1)asyncdefprime_filter(m, n):"""素数过滤器""" primes = []for i in num_generator(m, n): flag = Truefor j inrange(2, int(i ** 0.5 + 1)):if i % j == 0: flag = Falsebreakif flag:print('Prime =>', i) primes.append(i)await asyncio.sleep(0.001)returntuple(primes)asyncdefsquare_mapper(m, n):"""平方映射器""" squares = []for i in num_generator(m, n):print('Square =>', i * i) squares.append(i * i)await asyncio.sleep(0.001)return squaresdefmain():"""主函数""" loop = asyncio.get_event_loop() future = asyncio.gather(prime_filter(2, 100), square_mapper(1, 100)) future.add_done_callback(lambda x: print(x.result())) loop.run_until_complete(future) loop.close()if __name__ == '__main__': main()说明:上面的代码使用
get_event_loop函数获得系统默认的事件循环,通过gather函数可以获得一个future对象,future对象的add_done_callback可以添加执行完成时的回调函数,loop对象的run_until_complete方法可以等待通过future对象获得协程执行结果。
import asyncioimport reimport aiohttpPATTERN = re.compile(r'\<title\>(?P<title>.*)\<\/title\>')asyncdeffetch_page(session, url):asyncwith session.get(url, ssl=False) as resp:returnawait resp.text()asyncdefshow_title(url):asyncwith aiohttp.ClientSession() as session: html = await fetch_page(session, url)print(PATTERN.search(html).group('title'))defmain(): urls = ('https://www.python.org/','https://git-scm.com/','https://www.jd.com/','https://www.taobao.com/','https://www.douban.com/') loop = asyncio.get_event_loop() cos = [show_title(url) for url in urls] loop.run_until_complete(asyncio.wait(cos)) loop.close()if __name__ == '__main__': main()重点:异步I/O与多进程的比较。
当程序不需要真正的并发性或并行性,而是更多的依赖于异步处理和回调时,
asyncio就是一种很好的选择。如果程序中有大量的等待与休眠时,也应该考虑asyncio,它很适合编写没有实时数据处理需求的 Web 应用服务器。
Python 进阶的核心是从 “实现功能” 到 “优化实现”,关键知识点可总结为:
国内直接使用顶级AI工具
谷歌浏览器访问:https://www.nezhasoft.cloud/r/vMPJZr
