导语:面试被问"多线程和多进程的区别"答不上来?写爬虫用了多线程却感觉没快多少?异步协程听着高大上却不知道怎么用?别慌!本文用最通俗的语言+最实用的代码,带你彻底搞懂Python并发编程的三大武器。读完这篇,你不仅能回答面试官的刁钻问题,还能在实际项目中选对方案,让程序飞起来!
假设你要下载100张图片,每张需要2秒。串行执行需要200秒,但如果同时下载10张,可能只需要20秒。这就是并发的威力。
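但Python的并发方式不止一种,选错了反而更慢。先看一张对比表:
| 方案 | 适用场景 | 能否利用多核 | 典型工具 |
| 多线程 | I/O密集型 | 否(受GIL限制) | threading、ThreadPoolExecutor |
| 多进程 | CPU密集型 | 能(每个进程有独立的解释器和GIL) | multiprocessing |
| 异步协程 | 高并发I/O | 否(单线程内切换任务) | asyncio、aiohttp |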
关键结论:I/O等待多 → 用线程或协程;计算量大 → 用多进程。
Python(CPython)有个"全局解释器锁"(GIL),简单说就是同一时刻只能有一个线程执行Python字节码。所以多线程不能利用多核CPU进行并行计算。
但!是!当线程在等待I/O(网络、文件、数据库)时,GIL会释放,其他线程就能干活了。这就是为什么多线程适合I/O密集型任务。
import threading
import time
import requests
# URLs to download: the same 1-second-delay endpoint, listed five times.
urls = ["https://httpbin.org/delay/1"] * 5
def download(url, index):
    """Fetch a single page and print progress messages.

    Args:
        url: Address passed to ``requests.get``.
        index: Label shown in the progress messages so that output from
            different calls can be told apart.

    The request is made with ``timeout=10``; exceptions raised by
    ``requests`` are not caught here.
    """
    print(f"🧵 线程{index}开始下载: {url}")
    resp = requests.get(url, timeout=10)
    print(f"✅ 线程{index}下载完成,状态码: {resp.status_code}")
# ========== Serial run ==========
# Each download starts only after the previous one has returned.
start = time.time()
for i, url in enumerate(urls):
    download(url, i)
print(f"\n⏱ 串行耗时: {time.time() - start:.2f}秒")

# ========== Multi-threaded run ==========
# One thread per URL; every thread is started before any is joined so the
# requests overlap.
start = time.time()
threads = []
for i, url in enumerate(urls):
    t = threading.Thread(target=download, args=(url, i))
    threads.append(t)
    t.start()  # start the thread right away
for t in threads:
    t.join()  # wait until this thread has finished
print(f"⏱ 多线程耗时: {time.time() - start:.2f}秒")
输出对比:
⏱ 串行耗时: 5.23秒
⏱ 多线程耗时: 1.15秒 ← 快了4.5倍!
手动管理线程太麻烦?用concurrent.futures的线程池:
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def download(url):
    """Request *url* and return a ``"<url> -> <status code>"`` summary string.

    The request is made with ``timeout=10``; exceptions raised by
    ``requests`` are not caught here.
    """
    resp = requests.get(url, timeout=10)
    return f"{url} -> {resp.status_code}"
urls = ["https://httpbin.org/delay/1"] * 10

# Thread pool with at most 5 worker threads.
with ThreadPoolExecutor(max_workers=5) as pool:
    # Submit every task up front; each submit() call returns a Future.
    futures = [pool.submit(download, url) for url in urls]
    # as_completed() yields futures in the order they finish.
    for future in as_completed(futures):
        result = future.result()
        print(f"✅ {result}")
print("全部下载完成!")
💡 小贴士:
as_completed按任务完成顺序返回结果,不是提交顺序。如果需要保持顺序,用pool.map()代替。
当你的任务是纯计算(比如图像处理、数据加密、大规模数学运算),多线程因为GIL的存在几乎没用。这时候必须上多进程——每个进程有独立的Python解释器和GIL,真正实现并行。
from multiprocessing import Pool
import time
def heavy_computation(n):
    """Simulate a CPU-bound task by summing the integers below ``n * 1000000``.

    Args:
        n: Workload size, in millions of loop iterations.

    Returns:
        The sum of ``range(n * 1000000)``.
    """
    total = 0
    # Deliberately an explicit Python-level loop: the point of the demo is
    # to keep the interpreter busy, not to compute the sum quickly.
    for i in range(n * 1000000):
        total += i
    return total
numbers = [100, 200, 300, 400]

# Creating a Pool must happen under the __main__ guard: with the "spawn"
# start method every worker re-imports this module, and unguarded
# pool-creation code would run again inside each worker.
if __name__ == "__main__":
    # ========== Serial run ==========
    start = time.time()
    results = [heavy_computation(n) for n in numbers]
    print(f"⏱ 串行耗时: {time.time() - start:.2f}秒")

    # ========== Multi-process run ==========
    start = time.time()
    with Pool(processes=4) as pool:
        results = pool.map(heavy_computation, numbers)
    print(f"⏱ 多进程耗时: {time.time() - start:.2f}秒")
在4核CPU上,多进程版本会明显更快。注意本例的四个任务大小不均(最大的任务占总计算量的40%),所以加速上限约为2.5倍;任务大小均匀时才可能接近4倍。
进程之间内存是隔离的,不能像线程那样共享变量。用Queue传递数据:
from multiprocessing import Process, Queue
def producer(queue):
    """Put five data packets on *queue*, followed by ``None`` as an end marker.

    Args:
        queue: Any object with a ``put(item)`` method.
    """
    for i in range(5):
        queue.put(f"数据包-{i}")
        print(f"📦 生产: 数据包-{i}")
    queue.put(None)  # end-of-stream signal
def consumer(queue):
    """Take items from *queue* and print them until a ``None`` item arrives.

    Args:
        queue: Any object with a ``get()`` method that returns the next item.
    """
    while True:
        item = queue.get()
        if item is None:
            # None is the end marker: stop without printing it.
            break
        print(f"📬 消费: {item}")
if __name__ == "__main__":
    # One queue shared by both processes carries the data between them.
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    # Wait for both child processes to finish.
    p1.join()
    p2.join()
想象你在餐厅点餐:同步方式是点完餐后站在柜台前干等,餐好了才能去做下一件事;异步方式是拿到取餐号后先去忙别的,餐好了再回来取。
协程的核心是在I/O等待时切换到其他任务,用一个线程就能处理成千上万的并发连接。
import asyncio
import aiohttp
import time
async def download(session, url, index):
    """Fetch one page through *session* and return its body text.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``text()`` method (the
            surrounding snippet passes an ``aiohttp.ClientSession``).
        url: Address to request.
        index: Label shown in the progress messages.

    Returns:
        The value produced by ``await resp.text()``.
    """
    print(f"⚡ 任务{index}开始: {url}")
    async with session.get(url) as resp:
        data = await resp.text()
        print(f"✅ 任务{index}完成,长度: {len(data)}")
        return data
async def main():
    """Download the same URL ten times with at most five requests in flight."""
    urls = ["https://httpbin.org/delay/1"] * 10
    # Cap concurrency at 5 so the requests are not all fired at once.
    semaphore = asyncio.Semaphore(5)

    async def limited_download(session, url, index):
        # A download may only start while holding one of the five slots.
        async with semaphore:
            return await download(session, url, index)

    async with aiohttp.ClientSession() as session:
        tasks = [
            limited_download(session, url, i)
            for i, url in enumerate(urls)
        ]
        results = await asyncio.gather(*tasks)
    print(f"\n🎉 全部完成!共获取 {len(results)} 个结果")


# Run the async entry point and report the elapsed wall-clock time.
start = time.time()
asyncio.run(main())
print(f"⏱ 异步耗时: {time.time() - start:.2f}秒")
10个请求,限制5并发,总耗时约2秒——如果是串行需要10秒!
import asyncio
async def make_coffee():
    """Pretend to brew coffee: wait two seconds, then return the drink's name."""
    print("☕ 开始磨咖啡豆...")
    await asyncio.sleep(2)  # stands in for an I/O wait
    print("☕ 咖啡好了!")
    return "美式咖啡"
async def make_toast():
    """Pretend to toast bread: wait one second, then return the toast's name."""
    print("🍞 开始烤面包...")
    await asyncio.sleep(1)  # stands in for an I/O wait
    print("🍞 面包好了!")
    return "全麦吐司"
async def breakfast():
    """Make coffee and toast at the same time, then announce breakfast."""
    # gather() runs both coroutines concurrently and returns their results
    # in argument order.
    coffee, toast = await asyncio.gather(
        make_coffee(),
        make_toast(),
    )
    print(f"🎉 早餐就绪:{coffee} + {toast}")


asyncio.run(breakfast())
# Total time is about 2 seconds (the longer of the two waits), not 3.
# 推荐:异步协程(aiohttp)
import asyncio
import aiohttp
async def fetch_all(urls):
    """Fetch every URL concurrently and return the results in input order.

    Args:
        urls: Iterable of URLs to request.

    Returns:
        A list with one entry per URL: the response body text on success,
        or ``str(exc)`` for a request that raised.
    """
    urls = list(urls)
    if not urls:
        return []  # nothing to fetch, so no session is opened

    async def fetch_one(session, url):
        # Request and body read live in one coroutine, so a failure while
        # reading is captured by gather() as well, and ``async with``
        # releases the response on every path.
        async with session.get(url) as resp:
            return await resp.text()

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(fetch_one(session, url) for url in urls),
            return_exceptions=True,
        )
    return [str(r) if isinstance(r, BaseException) else r for r in results]
# 推荐:多线程(文件I/O是阻塞的)
from concurrent.futures import ThreadPoolExecutor
def process_file(filepath):
    """Read a text file and return a ``"已处理: <filepath>"`` status string.

    Args:
        filepath: Path of the file to read.

    Returns:
        The status string for *filepath*.

    Raises:
        OSError: If the file cannot be opened (e.g. it does not exist).
    """
    # Explicit encoding so the result does not depend on the platform default.
    with open(filepath, "r", encoding="utf-8") as f:
        data = f.read()
    # Placeholder: real processing of ``data`` would go here.
    return f"已处理: {filepath}"
files = ["file1.txt", "file2.txt", "file3.txt"]
# pool.map() returns results in the same order as ``files``; list() drains
# the iterator so every result is collected before the pool shuts down.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(process_file, files))
# 推荐:多进程(CPU密集型)
from multiprocessing import Pool
from PIL import Image
def resize_image(path):
    """Shrink an image via ``thumbnail((800, 800))`` and save a prefixed copy.

    The copy is written next to the original with ``resized_`` prepended to
    the file name, so ``photos/a.jpg`` becomes ``photos/resized_a.jpg``.
    For a bare file name the output path is the same as before.

    Args:
        path: Path of the source image.

    Returns:
        *path*, unchanged.
    """
    from pathlib import Path  # local import keeps the snippet's import lines untouched

    src = Path(path)
    # Prefix only the file name; prefixing the whole string would corrupt
    # any path that contains a directory component.
    dst = src.with_name(f"resized_{src.name}")
    with Image.open(src) as img:  # close the file handle when done
        img.thumbnail((800, 800))
        img.save(dst)
    return path
images = ["img1.jpg", "img2.jpg", "img3.jpg"]

# Creating a Pool must happen under the __main__ guard: with the "spawn"
# start method every worker re-imports this module.
if __name__ == "__main__":
    with Pool() as pool:
        pool.map(resize_image, images)
# ❌ Counter-example: several threads modifying a shared variable.
counter = 0


def increment():
    """Add 1 to the global ``counter`` 100000 times WITHOUT any locking.

    Kept unsafe on purpose: it demonstrates the lost-update problem when
    run from several threads at once.
    """
    global counter
    for _ in range(100000):
        counter += 1  # not atomic: the result may be wrong under threads
# ✅ Correct approach: guard the update with a lock.
import threading

lock = threading.Lock()


def safe_increment():
    """Add 1 to the global ``counter`` 100000 times, holding ``lock`` each time."""
    global counter
    for _ in range(100000):
        # Only one thread at a time may run the read-modify-write below.
        with lock:
            counter += 1
# ❌ Wrong: time.sleep inside a coroutine blocks the whole event loop.
async def bad_example():
    """Counter-example: block the event loop for five seconds."""
    time.sleep(5)  # blocking call: every other coroutine is stalled
# ✅ Right: use asyncio.sleep.
async def good_example():
    """Wait five seconds without blocking the event loop."""
    await asyncio.sleep(5)  # non-blocking: other coroutines keep running
多进程的入口代码必须放在 if __name__ == "__main__": 保护之下:
# Windows上必须加这个(macOS 自 Python 3.8 起默认也用 spawn 方式启动子进程,同样需要),否则子进程重新导入主模块时会再次执行创建进程的代码!
if __name__ == "__main__":
    # ``worker`` and ``data`` are placeholders for the reader's own task
    # function and inputs; they are not defined in this snippet.
    with Pool(4) as p:
        p.map(worker, data)
你的任务是什么?
│
├── 需要等待I/O(网络、文件、数据库)
│ │
│ ├── 并发量 < 100 → 多线程(简单好用)
│ └── 并发量 > 100 → 异步协程(性能更强)
│
└── 需要大量计算(图片处理、数据分析、加密)
│
└── 多进程(充分利用多核CPU)
记住这个口诀:I/O密集用线程或协程,CPU密集用多进程。
你在实际项目中用过哪种并发方案?遇到过什么坑?欢迎在评论区分享你的经验!
如果觉得这篇文章有帮助,点赞+在看+转发三连走起,让更多Python开发者看到!👇
#Python并发编程 #多线程 #多进程 #异步协程 #asyncio #Python性能优化 #GIL #Python实战