❝Python入门第三十四课,主要是学习了如何使用进程池统一管理多个子进程,降低频繁创建/销毁进程带来的开销,避免浪费系统资源。
概述
截至目前,我们已经学会了创建多个进程一起工作,但是我们仍然面临一个问题:如果每来一个任务就创建一个进程,会很浪费系统资源,因为进程创建/销毁成本很高,当有大量任务时,系统资源浪费严重。
那么进程池就可以解决这个问题。使用进程池的优势:如何使用进程池统一管理多个子进程,避免频繁创建/销毁进程带来的开销,因为进程池会提前创建固定数量的进程,反复使用它们来执行任务。
一、创建进程池
准确讲是创建『进程池执行器 ProcessPoolExecutor』,然后使用 submit 方法提交任务,使用 shutdown 方法等待任务完成。
注意:
submit方法只负责“提交任务”,不会阻塞主进程。shutdown(wait=True)方法的作用是不再接收新的任务,参数wait=True的作用是阻塞主进程,等待进程池中所有任务执行完毕。
import os, timefrom concurrent.futures import ProcessPoolExecutordefwork(n): print(f'work 正在执行任务 {n} ......... {os.getpid()}') time.sleep(1)if __name__ == '__main__': print('------------ Main Process Start ------------')# 创建一个进程池执行器 executor = ProcessPoolExecutor(3) # 3 表示进程池内启动3个进程# 使用 submit 方法提交任务 executor.submit(work, 1) executor.submit(work, 2) executor.submit(work, 3) executor.submit(work, 4) executor.submit(work, 5) executor.submit(work, 6) executor.submit(work, 7)# 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕 executor.shutdown(wait=True) print('------------ Main Process End ------------')
二、获取进程执行结果
获取子进程执行后的返回结果(Future 类的实例对象 + result 方法)。
import os, timefrom concurrent.futures import ProcessPoolExecutordefwork(n): print(f'work 正在执行任务 {n} ......... {os.getpid()}') time.sleep(1)returnf'我是任务{n}的结果'if __name__ == '__main__': print('------------ Main Process Start ------------')# 创建一个进程池执行器 executor = ProcessPoolExecutor(3) # 3 表示进程池内启动3个进程# 使用 submit 方法提交任务# f1 = executor.submit(work, 1)# f2 = executor.submit(work, 2)# f3 = executor.submit(work, 3)# f4 = executor.submit(work, 4)# f5 = executor.submit(work, 5)# f6 = executor.submit(work, 6)# f7 = executor.submit(work, 7) futures = [executor.submit(work, index) for index in range(1, 8)]# 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕 executor.shutdown(wait=True)# print(f1.result())# print(f2.result())# print(f3.result())# print(f4.result())# print(f5.result())# print(f6.result())# print(f7.result())for future in futures: print(future.result()) print('------------ Main Process End ------------')
三、按“完成顺序”获取结果
使用 as_completed 按“完成顺序”获取结果。
import os, timefrom concurrent.futures import ProcessPoolExecutor, as_completeddefwork(n): print(f'work 正在执行任务 {n} ......... {os.getpid()}')if n == 1: time.sleep(15)elif n == 2: time.sleep(10)else: time.sleep(1)returnf'我是任务{n}的结果'if __name__ == '__main__': print('------------ Main Process Start ------------')# 创建一个进程池执行器 executor = ProcessPoolExecutor(3) # 3 表示进程池内启动3个进程# 使用 submit 方法提交任务 futures = [executor.submit(work, index) for index in range(1, 8)]# 收集每个任务的结果 result_list = []for future in as_completed(futures): result_list.append(future.result())# 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕 executor.shutdown(wait=True)# 打印最终结果 print(result_list) print('------------ Main Process End ------------')
四、完成回调函数
使用 add_done_callback 方法,为任务添加完成时的回调函数。
import os, timefrom concurrent.futures import ProcessPoolExecutordefwork(n): print(f'work 正在执行任务 {n} ......... {os.getpid()}')if n == 1: time.sleep(15)elif n == 2: time.sleep(10)else: time.sleep(1)returnf'我是任务{n}的结果'if __name__ == '__main__': print('------------ Main Process Start ------------')# 创建一个进程池执行器 executor = ProcessPoolExecutor(3) # 3 表示进程池内启动3个进程# 收集每个任务的结果 result_list = []# 定义任务完成后的回调函数defdone_func(future): result_list.append(future.result())# 开启7个任务,并指定回调函数for index in range(1, 8): f = executor.submit(work, index) f.add_done_callback(done_func)# 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕 executor.shutdown(wait=True)# 打印最终结果 print(result_list) print('------------ Main Process End ------------')
五、map 批量提交任务
使用 map 方法批量提交任务。
注意:map方法本身不阻塞,但读取其返回的生成器对象是阻塞的,并且得到结果的顺序,与任务分配的顺序是一致的。
map方法会把这一批任务提交到进程池里执行,它会立刻返回一个生成器,真正的阻塞发生在:生成器取结果时,如list(result)。
import os, timefrom concurrent.futures import ProcessPoolExecutordefwork(n): print(f'work 正在执行任务 {n} ......... {os.getpid()}')if n == 1: time.sleep(15)elif n == 2: time.sleep(10)else: time.sleep(1)returnf'我是任务{n}的结果'if __name__ == '__main__': print('------------ Main Process Start ------------')# 创建一个进程池执行器 executor = ProcessPoolExecutor(3) # 3 表示进程池内启动3个进程# 通过 map 方法批量提交任务(结果按照提交的顺序来) results = executor.map(work, range(1, 8))# 获取 results 生成器中的内容 print(list(results))# 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕 executor.shutdown(wait=True) print('------------ Main Process End ------------')
六、自动回收的写法
使用 with:进程池的“自动回收”写法,离开 with 代码块时自动执行 shudown(wait=True)
import os, timefrom concurrent.futures import ProcessPoolExecutordefwork(n): print(f'work 正在执行任务 {n} ......... {os.getpid()}')if n == 1: time.sleep(15)elif n == 2: time.sleep(10)else: time.sleep(1)returnf'我是任务{n}的结果'if __name__ == '__main__': print('------------ Main Process Start ------------')# 创建一个进程池执行器,进程池内启动3个进程with ProcessPoolExecutor(3) as executor:# 通过 map 方法批量提交任务(结果按照提交的顺序来) results = executor.map(work, range(1, 8))# 获取 results 生成器中的内容 print(list(results)) print('------------ Main Process End ------------')