一、为什么需要多进程?
Python 中由于 GIL(全局解释器锁) 的存在:
✅ 多线程:适合 I/O 密集型任务
❌ 多线程:无法真正并行执行 CPU 密集型任务
👉 多进程(multiprocessing) 可以:
二、multiprocessing 基础用法
1️⃣ 最简单的多进程示例
from multiprocessing import Process
import os
def task():
print(f"进程 ID: {os.getpid()}")
if __name__ == "__main__":
p = Process(target=task)
p.start()
p.join()
⚠️ Windows 必须加:
if __name__ == "__main__":
否则会导致无限递归创建进程。
三、创建多个进程
2️⃣ 启动多个进程
from multiprocessing import Process
def task(n):
print(f"任务 {n} 执行")
if __name__ == "__main__":
processes = []
for i in range(4):
p = Process(target=task, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
✅ 每个进程都有独立的 Python 解释器和内存空间。
四、进程池(Pool)
3️⃣ Pool 的基本使用(推荐)
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == "__main__":
with Pool(processes=4) as pool:
result = pool.map(square, [1, 2, 3, 4])
print(result)
📌 常用方法:
方法 | 说明 |
|---|
map
| 阻塞式并行 |
imap
| 惰性迭代 |
apply
| 单个任务 |
apply_async
| 异步执行 |
close()
| 关闭池 |
join()
| 等待所有进程结束 |
4️⃣ apply_async 示例
def task(n):
return n * n
if __name__ == "__main__":
with Pool(4) as pool:
results = []
for i in range(5):
res = pool.apply_async(task, (i,))
results.append(res)
print([r.get() for r in results])
五、进程间通信(IPC)
5️⃣ Queue(最常用)
from multiprocessing import Process, Queue
def producer(q):
q.put("Hello from producer")
def consumer(q):
print(q.get())
if __name__ == "__main__":
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
✅ 线程安全 & 进程安全
6️⃣ Pipe(双向通信)
from multiprocessing import Process, Pipe
def send_msg(conn):
conn.send("Hello")
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p = Process(target=send_msg, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
六、共享内存
7️⃣ Value 和 Array
from multiprocessing import Process, Value, Array
def modify(n, arr):
n.value += 1
arr[0] = 100
if __name__ == "__main__":
num = Value('i', 0)
arr = Array('i', [1, 2, 3])
p = Process(target=modify, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
📌 'i'表示 int,'d'表示 double
8️⃣ Manager(高级共享对象)
from multiprocessing import Manager, Process
def worker(d, lst):
d["count"] = 10
lst.append(1)
if __name__ == "__main__":
with Manager() as manager:
d = manager.dict()
lst = manager.list()
p = Process(target=worker, args=(d, lst))
p.start()
p.join()
print(d)
print(lst)
✅ 支持:
dict
list
Lock
Queue
Namespace
七、进程同步(锁)
9️⃣ Lock 防止竞争条件
from multiprocessing import Process, Lock
def task(lock, n):
with lock:
print(f"Process {n} working")
if __name__ == "__main__":
lock = Lock()
processes = [Process(target=task, args=(lock, i)) for i in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()
八、守护进程(Daemon)
p = Process(target=task, daemon=True)
九、Pool + 共享数据(注意事项)
❌ Pool 不能直接传 Value / Array
✅ 正确方式:
def init(shared_dict):
global d
d = shared_dict
with Pool(initializer=init, initargs=(manager.dict(),)) as pool:
...
十、multiprocessing 使用建议
场景 | 推荐方案 |
|---|
CPU 密集 | multiprocessing.Process
|
批量任务 | Pool.map
|
异步任务 | apply_async
|
数据共享 | Manager
|
高性能 | 减少 IPC |
十一、一句话总结
multiprocessing 让 Python 突破 GIL,真正实现多核并行计算。
但代价是: