🐍 多进程 — 真正的并行计算
🕐 预计用时:2-3 小时 | 🎯 目标:掌握 multiprocessing、Pool、进程间通信
📖 今日目录
1. 多线程 vs 多进程
2. Process 类 — 创建进程
import multiprocessing
import time
import os
def worker(name, seconds):
    """Print a start line with PID info, sleep, then print a finish line.

    Args:
        name: Label shown in both log lines.
        seconds: How long to sleep between the two log lines.
    """
    # Look up both IDs first so the start line is emitted in one print call.
    ppid = os.getppid()
    pid = os.getpid()
    print(f"👷 {name} 开始 | PID={pid} | 父PID={ppid}")

    time.sleep(seconds)

    print(f"✅ {name} 结束")
if __name__ == "__main__":
    print(f"🔧 主进程 PID={os.getpid()}")

    # One child process per (name, seconds) job.
    jobs = [("进程A", 2), ("进程B", 2)]
    children = [
        multiprocessing.Process(target=worker, args=job)
        for job in jobs
    ]

    # Start every child before joining any, so they run at the same time.
    for child in children:
        child.start()
    for child in children:
        child.join()

    print("所有进程完成")
⚠️ 必须用 if __name__ == "__main__":!
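在使用 spawn 或 forkserver 启动方式的平台上(Windows、macOS,以及 Python 3.14 起的 Linux),子进程会重新导入主模块;没有这个保护,创建进程的代码会在子进程里再次执行(旧版本会递归地无限创建子进程,现代 Python 会直接抛出 RuntimeError)。使用 fork 的 Linux 上也建议加上,以保证代码可移植。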
继承 Process 类
import multiprocessing
import time
class DownloadProcess(multiprocessing.Process):
    """Process subclass that simulates downloading one URL.

    The URL is stored on the instance; ``run()`` is the code executed in
    the child process once ``start()`` is called.
    """

    def __init__(self, url):
        """Initialise the underlying Process and remember ``url``."""
        super().__init__()
        self.url = url

    def run(self):
        """Print a start line, wait two seconds, then print a completion line."""
        print(f"⬇️ 下载 {self.url}")
        time.sleep(2)  # stand-in for the actual download
        print(f"✅ {self.url} 完成")
if __name__ == "__main__":
    urls = (
        "https://example.com/a.jpg",
        "https://example.com/b.jpg",
        "https://example.com/c.jpg",
    )
    processes = [DownloadProcess(url) for url in urls]

    # Start all downloads first, then wait for each of them to finish.
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
3. 进程池 Pool
进程池自动管理进程数量——比手动创建 Process 更高效。
from multiprocessing import Pool
import time
import os
def cpu_task(n):
    """Sum the squares of 0..n-1 and report the result with the current PID.

    Args:
        n: Upper bound (exclusive) of the integers whose squares are summed.

    Returns:
        A status string containing this process's PID and the computed total.
    """
    total = 0
    for i in range(n):
        total += i * i
    pid = os.getpid()
    return f"PID={pid}: 计算完成, 结果={total}"
if __name__ == "__main__":
    tasks = [10_000_000 for _ in range(8)]  # eight identical jobs

    # Option 1: map() blocks until every result is ready and keeps input order.
    start = time.time()
    with Pool(processes=4) as pool:
        results = pool.map(cpu_task, tasks)
    print(f"Pool map 耗时: {time.time() - start:.2f}秒")
    for r in results:
        print(f" {r}")

    # Option 2: apply_async() returns immediately with a result handle.
    start = time.time()
    with Pool(processes=4) as pool:
        pending = []
        for t in tasks:
            pending.append(pool.apply_async(cpu_task, (t,)))
        # Collect inside the with-block: leaving it terminates the pool.
        results = [handle.get() for handle in pending]
    print(f"Pool async 耗时: {time.time() - start:.2f}秒")
# imap:惰性版本,按顺序返回结果
from multiprocessing import Pool
def process_item(item):
    """Return ``item * 2`` after a half-second pause that simulates work."""
    time.sleep(0.5)
    doubled = item * 2
    return doubled
if __name__ == "__main__":
    with Pool(4) as pool:
        # imap: lazy iterator that yields results in input order.
        in_order = pool.imap(process_item, range(10))
        for result in in_order:
            print(f" 结果: {result}")

        # imap_unordered: lazy iterator that yields results as they complete.
        as_completed = pool.imap_unordered(process_item, range(10))
        for result in as_completed:
            print(f" 结果: {result}")
4. 进程间通信 — Queue
from multiprocessing import Process, Queue
import time
def producer(queue, name, count=5, delay=0.3):
    """Put ``count`` labelled items on ``queue``, pausing after each one.

    Args:
        queue: Object with a ``put(item)`` method, e.g. ``multiprocessing.Queue``.
        name: Producer label; used as the item prefix and in the log lines.
        count: Number of items to produce. Defaults to 5.
        delay: Seconds to sleep after each item. Defaults to 0.3.

    This function does not put any termination marker on the queue;
    signalling the end of the stream is left to the caller.
    """
    for i in range(count):
        item = f"{name}-产品{i}"
        queue.put(item)
        print(f"📦 {name}: 生产 {item}")
        time.sleep(delay)
def consumer(queue, name):
    """Take items from ``queue`` and print them until ``None`` is received.

    Args:
        queue: Object with a blocking ``get()`` method.
        name: Consumer label used in the log lines.
    """
    item = queue.get()
    while item is not None:  # None is the stop marker
        print(f"🛒 {name}: 消费 {item}")
        time.sleep(0.5)
        item = queue.get()
if __name__ == "__main__":
    q = Queue()
    maker = Process(target=producer, args=(q, "工厂A"))
    taker = Process(target=consumer, args=(q, "消费者X"))

    for proc in (maker, taker):
        proc.start()

    maker.join()
    q.put(None)  # the producer has finished: tell the consumer to stop
    taker.join()

    print("通信完成")
5. 进程间通信 — Pipe
from multiprocessing import Process, Pipe
import time
def sender(conn, name, count=5, delay=0.3):
    """Send ``count`` messages over ``conn``, then an end marker, then close it.

    Args:
        conn: Connection object with ``send()`` and ``close()``.
        name: Sender label used as the message prefix.
        count: Number of messages to send. Defaults to 5.
        delay: Seconds to sleep after each message. Defaults to 0.3.
    """
    for i in range(count):
        msg = f"{name}: 消息{i}"
        conn.send(msg)
        print(f"📤 {msg}")
        time.sleep(delay)
    # Explicit end-of-stream marker. Closing alone is not enough: recv() only
    # raises EOFError once *every* handle to this end is closed, and other
    # processes may still hold one.
    conn.send(None)
    conn.close()


def receiver(conn):
    """Print messages from ``conn`` until the ``None`` marker or end of file.

    Args:
        conn: Connection object with a blocking ``recv()``.
    """
    while True:
        try:
            msg = conn.recv()
        except EOFError:
            # The other end was closed without sending the marker.
            break
        if msg is None:
            break
        print(f"📥 收到: {msg}")


if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(parent_conn, "进程A"))
    p2 = Process(target=receiver, args=(child_conn,))
    p1.start()
    p2.start()
    # The children have their own handles now; release this process's copies
    # so it does not keep the pipe open.
    parent_conn.close()
    child_conn.close()
    p1.join()
    p2.join()
💡 Queue vs Pipe:
• Queue:多生产者多消费者,线程和进程安全
• Pipe:一对一通信,更快速
• 需要多个进程通信 → Queue;两个进程点对点 → Pipe
6. 共享内存 — Value / Array
from multiprocessing import Process, Value, Array
import time
def increment(shared_counter, shared_array, n):
    """Add ``n`` to the shared counter and 1 to every element of the shared array.

    Args:
        shared_counter: Synchronized ``multiprocessing.Value`` holding a number.
        shared_array: Synchronized ``multiprocessing.Array`` of numbers.
        n: How many times to increment the counter by one.

    Both objects must have been created with a lock (the default), because
    ``get_lock()`` is used to make each update atomic.
    """
    for _ in range(n):
        # `+=` is a separate read and write; without holding the lock across
        # both, concurrent processes overwrite each other's increments.
        with shared_counter.get_lock():
            shared_counter.value += 1

    # Same reasoning for the element-wise update of the array.
    with shared_array.get_lock():
        for i in range(len(shared_array)):
            shared_array[i] += 1
if __name__ == "__main__":
    # Shared state visible to every child process.
    counter = Value('i', 0)       # 'i' = integer type code, initial value 0
    arr = Array('i', [1, 2, 3])   # shared integer array

    processes = []
    for _ in range(4):
        processes.append(
            Process(target=increment, args=(counter, arr, 100000))
        )

    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()

    print(f"计数器: {counter.value}")  # 400000 if no increment is lost
    print(f"数组: {list(arr)}")  # [5, 6, 7] if no update is lost
7. 共享对象 — Manager
from multiprocessing import Process, Manager
import time
def update_dict(shared_dict, shared_list, key, value):
    """Record ``value`` in both shared containers, then pause briefly.

    Args:
        shared_dict: Mapping that receives ``value`` under ``key``.
        shared_list: List-like object that ``value`` is appended to.
        key: Key to store the value under.
        value: Value to store and append.
    """
    shared_dict[key] = value
    shared_list.append(value)

    time.sleep(0.1)
if __name__ == "__main__":
    with Manager() as manager:
        shared_dict = manager.dict()  # dict proxy shared between processes
        shared_list = manager.list()  # list proxy shared between processes

        processes = []
        for i in range(5):
            call_args = (shared_dict, shared_list, f"key{i}", i)
            processes.append(Process(target=update_dict, args=call_args))

        for proc in processes:
            proc.start()
        for proc in processes:
            proc.join()

        # Read the proxies while the manager is still running.
        print(f"字典: {dict(shared_dict)}")
        print(f"列表: {list(shared_list)}")
8. CPU 密集型任务对比
import time
from multiprocessing import Pool
from threading import Thread
def cpu_work(n):
    """Return the sum of the squares of the integers 0 through n-1."""
    total = 0
    for i in range(n):
        total += i * i
    return total
if __name__ == "__main__":
    N = 5_000_000
    tasks = [N] * 4

    # Baseline: run the four jobs one after another in this thread.
    start = time.time()
    for size in tasks:
        cpu_work(size)
    print(f"串行: {time.time() - start:.2f}秒")

    # Same four jobs, one thread each.
    start = time.time()
    threads = []
    for size in tasks:
        threads.append(Thread(target=cpu_work, args=(size,)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"多线程: {time.time() - start:.2f}秒")

    # Same four jobs, spread over a pool of four processes.
    start = time.time()
    with Pool(4) as pool:
        pool.map(cpu_work, tasks)
    print(f"多进程: {time.time() - start:.2f}秒")
# 结果(4核 CPU):
# 串行: 3.20秒
# 多线程: 3.35秒(GIL 限制,没有加速)
# 多进程: 0.95秒(真正并行,接近 4 倍加速!)
9. 实战:并行图片处理器
import os
from multiprocessing import Pool, cpu_count
from pathlib import Path
import time
def process_image(image_path):
    """Simulate processing one image and return a one-line status string.

    Args:
        image_path: Path of an existing file.

    Returns:
        A string with the file's base name and its size in bytes.
    """
    # A real implementation would do the work with Pillow here, e.g.
    # Image.open(image_path), img.thumbnail((800, 600)), img.save(output_path).
    time.sleep(0.5)  # stand-in for the processing time

    size = os.stat(image_path).st_size
    return f"✅ {Path(image_path).name} ({size}B)"
def batch_process(image_dir, output_dir=None, workers=None):
    """Run ``process_image`` over every image in a directory using a pool.

    Args:
        image_dir: Directory searched (non-recursively) for *.jpg, *.png,
            *.jpeg and *.gif files.
        output_dir: Accepted but not used by this function.
        workers: Number of pool processes; defaults to ``cpu_count()``.

    Returns:
        None. Progress and results are printed.
    """
    if workers is None:
        workers = cpu_count()

    # Gather matching files, one glob pattern at a time.
    image_dir = Path(image_dir)
    images = [
        path
        for ext in ["*.jpg", "*.png", "*.jpeg", "*.gif"]
        for path in image_dir.glob(ext)
    ]
    if not images:
        print("⚠️ 没有找到图片文件")
        return

    print(f"📂 找到 {len(images)} 张图片")
    print(f"🔧 使用 {workers} 个进程")

    start = time.time()
    # Paths are passed to the workers as plain strings.
    image_paths = list(map(str, images))
    with Pool(workers) as pool:
        results = pool.map(process_image, image_paths)
    for r in results:
        print(f" {r}")

    elapsed = time.time() - start
    print(f"\n📊 完成: {len(results)} 张 | 耗时: {elapsed:.1f}秒")
# 使用
# batch_process("/path/to/images", workers=4)
10. 今日小结
| 功能 | 示例 |
|---|---|
| 创建进程 | Process(target=func) |
| 进程池 | Pool(4).map(func, tasks) |
| 队列通信 | queue.put() |
| 管道通信 | conn.send() |
| 共享内存 | Value('i', 0) |
| 共享对象 | Manager().dict() |
核心要点
- ✅ 多进程不受 GIL 限制,CPU 密集型任务用多进程
- ✅ 必须用 if __name__ == "__main__": 保护
- ✅ 进程间通信用 Queue(多对多)或 Pipe(一对一)
- ✅ 共享内存用 Value/Array,复杂对象用 Manager
🎯 练习建议:
1. 用多进程计算 1-10000000 的素数个数,对比单进程速度
2. 实现一个多进程日志分析器:每个进程分析一个日志文件,最后汇总
3. 用 Pool.imap_unordered 实现一个并行网页状态检测器
📚 Day37 完成!明天学习异步编程 — asyncio 的协程魔法