上节讲了多进程的启动方式,以及如何创建一个多进程程序。本节来聊一聊进程间如何传递参数,也叫进程间通信。P 3、进程间通信
(1)Lock锁通信
Lock锁的通信和多线程类似,但它不能用于全局变量,因为进程间内存是独立的,会生成各自的命名空间。某种意义上讲,在多进程中它是无效的。为什么这么说,因为它可以利用multiprocessing.Value()实现变量资源共享。但它不是进程通信的主要方式,所以这里就不多做说明了。
(2)Queue队列通信
队列通信是比较常用的进程间通信方式,它利用multiprocessing.Queue()方法实现。
案例展示,如下:
defmyfunc(name,q): t0 = time.time() print(f' Process {name} : Start') time.sleep(2) q.put(f' Process {name} sleep 2 seconds.') # 任务函数将数据推送到队列中 print(f' Process {name} : End. Time={time.time()-t0}')if __name__=='__main__': print('Process test : Start') t0 = time.time() q = multiprocessing.Queue() # 创建队列 proc = [] for i inrange(3): # 创建进程 obj = multiprocessing.Process(target=myfunc,args=(i,q)) # 队列通过参数传递给任务函数 proc.append(obj) for i inrange(3): # 启动进程 proc[i].start() for i inrange(3): print(q.get()) # 主函数将子进程的数据读取出来。 for i inrange(3): # 等待进程结束,此时对进程的join()方法可以运行,但其余的代码不可执行。 proc[i].join() print(f'Process test : End.Time={time.time()-t0}')
运行效果,如下:
Process test : Start Process0 : Start Process1 : Start Process2 : Start Process0 : End. Time=2.0036540031433105 Process0 sleep 2 seconds. Process1 : End. Time=2.0014002323150635 Process1 sleep 2 seconds. Process2 : End. Time=2.0017902851104736 Process2 sleep 2 seconds.Process test : End.Time=2.3170416355133057
(3)Pipe管道通信
管道和队列的通信方式比较相似,但有区别。因为管道是创建两个端口,这样就可以一个进一个出双向通信。而队列是每个进程都可以向队列中发送消息,也都可以读取数据,是一种多入多出的通信方式。
案例展示,如下:
def myfunc(name,pipe): t0 = time.time() print(f' Process {name} : Start') time.sleep(2) pipe.send(f' Process {name} sleep 2 seconds.') print(f' Process {name} : End. Time={time.time()-t0}')if __name__=='__main__': print('Process test : Start') p0,p1 = multiprocessing.Pipe() t0 = time.time() proc = [] for i in range(3): # 创建进程 obj = multiprocessing.Process(target=myfunc,args=(i,p1)) proc.append(obj) for i in range(3): # 启动进程 proc[i].start() print(p0.recv()) for i in range(3): # 等待进程结束,此时对进程的join()方法可以运行,但其余的代码不可执行。 proc[i].join() print(f'Process test : End.Time={time.time()-t0}')
运行效果,如下:
Process test : Start Process0 : Start Process1 : Start Process2 : Start Process0 : End. Time=2.000847816467285 Process0 sleep 2 seconds. Process1 : End. Time=2.000988721847534 Process1 sleep 2 seconds. Process2 : End. Time=2.001150608062744 Process2 sleep 2 seconds.Process test : End.Time=2.241823673248291
上面是单向消息,实际上管道可以双向传递,如下:
defmyfunc(name,pipe): t0 = time.time() print(f' Process {name} : Start') time.sleep(2) pipe.send(f' Process {name} sleep 2 seconds.') # 发送消息 print(f' Process {name} : {pipe.recv()}') # 接收消息 print(f' Process {name} : End. Time={time.time()-t0}')if __name__=='__main__': print('Process test : Start') p0,p1 = multiprocessing.Pipe() t0 = time.time() proc = [] for i inrange(3): # 创建进程 obj = multiprocessing.Process(target=myfunc,args=(i,p1)) proc.append(obj) for i inrange(3): # 启动进程 proc[i].start() for i in ['Tom','Lex','Rose']: print(p0.recv()) # 接收消息 p0.send(i) # 发送消息 for i inrange(3): # 等待进程结束,此时对进程的join()方法可以运行,但其余的代码不可执行。 proc[i].join() print(f'Process test : End.Time={time.time()-t0}')
Process test : Start Process0 : Start Process1 : Start Process2 : Start Process0 sleep 2 seconds. Process0 : Tom Process0 : End. Time=2.0024027824401855 Process1 sleep 2 seconds. Process1 : Lex Process1 : End. Time=2.0031163692474365 Process2 sleep 2 seconds. Process2 : Rose Process2 : End. Time=2.001136541366577Process test : End.Time=2.2119128704071045
通过上述的Queue、Pipe两种方式都可以实现消息传递,但并没有做消息校验,这在实际编程中是要不得的,可能造成一定的混乱。所以实际编程时,一定要在使用消息前,做消息校验。
-------------------------它是数字世界里的一把杀猪刀
却总能巧夺天工
它的世界是纯粹0、1组合
却总能创造无尽幻想
......
本公众号关注数据价值分析、编程学习,将不定期更新社会热点数据分析结果、编程技巧,分享数据分析工具、方法、学习等内容,欢迎有兴趣的小伙伴加入。