本文将带你系统梳理Python多进程编程的核心知识点,包含大量实战代码,建议收藏!
进程与线程
在深入编码前,先厘清两个基础概念:
- • 进程:一个正在运行的程序,操作系统分配资源的基本单位。每个进程拥有独立的内存空间。
- • 线程:进程内部的执行单元,操作系统CPU调度的基本单位。同一进程的线程共享进程资源。
简单理解:进程是“容器”,线程是“容器里的工人”。Python的多线程受GIL(全局解释器锁)限制,但多进程可以绕过GIL,实现真正的并行。
Process类详解
from multiprocessing import Process, current_process
import time
def task(name, age):
for i in range(3):
print(f'进程{current_process().name}: {name}, 年龄{age}, 第{i}次')
time.sleep(1)
if __name__ == '__main__':
p = Process(target=task, args=('悟空', 500), name='my_process')
p.start()
p.join()
| |
|---|
group | |
target | |
name | |
args | |
kwargs | |
daemon | |
进程同步锁(Lock / RLock)
多个进程操作共享资源(如文件、终端)时,需要加锁防止冲突。
普通锁 Lock
from multiprocessing import Process, Lock
def print_with_lock(lock, msg):
lock.acquire()
try:
print(msg) # 临界区
finally:
lock.release()
if __name__ == '__main__':
lock = Lock()
p1 = Process(target=print_with_lock, args=(lock, 'Hello'))
p2 = Process(target=print_with_lock, args=(lock, 'World'))
p1.start(); p2.start()
可重入锁(RLock)
RLock允许同一进程多次获取锁(递归锁),而Lock不支持。
from multiprocessing import RLock
rlock = RLock()
rlock.acquire()
rlock.acquire() # 可以再次获取,不会死锁
rlock.release()
rlock.release()
with自动管理锁
更推荐使用with语句,自动执行acquire和release,即使发生异常也能释放锁:
with lock:
# 临界区代码
print('安全打印')
等待进程结束: join
p.join()的作用是阻塞当前进程,等待进程p执行完毕再继续。
'''
join(timeout)可指定等待超时(秒)。
必须在start()之后调用。
注意:join阻塞的是调用它的进程,而不是被调用的进程。
'''
p1.start()
p2.start()
p1.join() # 主进程等 p1
p2.join() # 主进程等 p2
print('所有子进程已结束')
终止进程:terminate() 和 is_alive()
如果想强制结束一个子进程,使用terminate()
p1.terminate() # 异步终止,操作系统回收资源
p1.join() # 等待终止完成
print(p1.is_alive()) # False
注意:terminate不会执行finally块,进程资源可能无法正确清理,慎用!
守护进程
守护进程是一个依附于主进程的子进程,一旦主进程结束,它会被自动终止。
应用场景
from multiprocessing import Process
import time
'''
守护进程必须在start()之前设置daemon=True。
主进程结束时,守护进程立即被终止。
守护进程中不允许再创建子进程。
'''
def monitor():
while True:
print('守护进程监控中...')
time.sleep(1)
if __name__ == '__main__':
p = Process(target=monitor, daemon=True) # 必须在start前设置
p.start()
time.sleep(3)
print('主进程结束,守护进程自动终止')
进程不共线内存,那如何通信呢?
进程拥有独立内存空间,全局变量在子进程中只是副本,修改互不影响:
from multiprocessing import Process
num = 100
def change():
global num
num += 10
print(f'子进程: {num}')
if __name__ == '__main__':
p = Process(target=change)
p.start()
p.join()
print(f'主进程: {num}') # 仍然是100
因此,进程间通信需要借助专用机制:Queue 和 Pipe
进程通信(IPC)
Queue
Queue(队列)--先进先出。multiprocessing.Queue支持多进程安全地放入和取出数据。
from multiprocessing import Queue, Process
def producer(q):
q.put('hello')
def consumer(q):
data = q.get()
print(f'收到: {data}')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join(); p2.join()
特点:自动加锁,适合多生产者/消费者模式。
Pipe
Pipe(管道)--双向或单向通信;Pipe返回两个连接对象, 默认全双工(双向通信)。
from multiprocessing import Process, Pipe
def sender(conn):
conn.send('你好')
conn.close()
def receiver(conn):
msg = conn.recv()
print(f'收到: {msg}')
if __name__ == '__main__':
parent_conn, child_conn = Pipe() # 全双工
p1 = Process(target=sender, args=(parent_conn,))
p2 = Process(target=receiver, args=(child_conn,))
p1.start(); p2.start()
单向模式:Pipe(duplex=False),第一个只能接收,第二个只能发送。
性能对比:Pipe比Queue更快(无锁开销),适合两个进程间的大数据传输。
自定义进程类
通过继承Process并重写run方法,可以构建更清晰的进程类:
from multiprocessing import Process, current_process
import time
class MyProcess(Process):
def __init__(self, msg, *args, **kwargs):
super().__init__(*args, **kwargs)
self.msg = msg
def run(self):
for i in range(3):
print(f'{current_process().name}: {self.msg}, 第{i}次')
time.sleep(1)
if __name__ == '__main__':
p = MyProcess('hello', name='custom')
p.start()
p.join()
进阶技巧与注意事项
- • 避免使用
os.fork():直接使用multiprocessing更安全。 - •
if name == 'main': 在Windows上必须使用,否则会无限递归创建进程。 - • 进程池
(Pool):如果大量创建进程,推荐使用Pool(本文未覆盖,下篇专门讲解)。 - • 共享内存
(Value/Array):除Queue和Pipe外,还可通过共享内存实现数据共享,但需要注意同步。
如果觉得有用,请点赞、在看、转发支持~