❝Python入门第三十六课,主要是学习了线程池,与进程池机制一样,线程池管理一组可复用的线程,减少创建销毁开销,高效处理并发任务,提升资源利用率。
一、创建线程池
创建『线程池执行器 ProcessPoolExecutor』,然后使用 submit 方法提交任务,使用 shutdown 方法等待任务完成。
注意:
shutdown(wait=True)方法的作用是不再接收新的任务,参数wait=True的作用是阻塞,等待线程池中所有任务执行完毕。
import time, osfrom concurrent.futures import ThreadPoolExecutorfrom threading import get_native_id, RLockdefwork(n, lock):with lock: print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}') time.sleep(1)if __name__ == '__main__': print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')# 创建一个线程池执行器,默认启动3个线程 executor = ThreadPoolExecutor(3)# 创建线程锁 lock = RLock()# 使用 submit 方法提交任务 executor.submit(work, 1, lock) executor.submit(work, 2, lock) executor.submit(work, 3, lock) executor.submit(work, 4, lock) executor.submit(work, 5, lock) executor.submit(work, 6, lock) executor.submit(work, 7, lock)# 阻塞等待线程池中所有任务执行完毕。 executor.shutdown(wait=True) print('--------- Main Process End -------------')
二、获取线程执行结果
获取线程执行后的返回结果(Future 类的实例对象 + result 方法)。
import time, osfrom concurrent.futures import ThreadPoolExecutorfrom threading import get_native_id, RLockdefwork(n, lock):with lock: print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}') time.sleep(1)returnf'任务{n}的结果'if __name__ == '__main__': print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')# 创建一个线程池执行器,默认启动3个线程 executor = ThreadPoolExecutor(3)# 创建线程锁 lock = RLock()# 使用 submit 方法提交任务 futures = [executor.submit(work, index, lock) for index in range(1, 8)]# 阻塞等待线程池中所有任务执行完毕。 executor.shutdown(wait=True)# 打印结果for future in futures: print(future.result()) print('--------- Main Process End -------------')
三、按“完成顺序”获取结果
使用 as_completed 按“完成顺序”获取结果。
import time, osfrom concurrent.futures import ThreadPoolExecutor, as_completedfrom threading import get_native_id, RLockdefwork(n, lock):with lock: print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')if n == 1: time.sleep(15)elif n == 2: time.sleep(10)else: time.sleep(1)returnf'任务{n}的结果'if __name__ == '__main__': print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')# 创建一个线程池执行器,默认启动3个线程 executor = ThreadPoolExecutor(3)# 创建线程锁 lock = RLock()# 使用 submit 方法提交任务 futures = [executor.submit(work, index, lock) for index in range(1, 8)]# 收集每个线程返回的结果 result_list = []for future in as_completed(futures): result_list.append(future.result())# 阻塞等待线程池中所有任务执行完毕。 executor.shutdown(wait=True)# 打印结果 print(result_list) print('--------- Main Process End -------------')
四、完成回调函数
使用 add_done_callback 方法,为任务添加完成时的回调函数。
import time, osfrom concurrent.futures import ThreadPoolExecutor, as_completedfrom threading import get_native_id, RLockdefwork(n, lock):with lock: print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')if n == 1: time.sleep(15)elif n == 2: time.sleep(10)else: time.sleep(1)returnf'任务{n}的结果'if __name__ == '__main__': print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')# 创建一个线程池执行器,默认启动3个线程 executor = ThreadPoolExecutor(3)# 创建线程锁 lock = RLock()# 收集每个线程返回的结果 result_list = []# 定义一个线程执行成功后的回调函数defdone_func(f): result_list.append(f.result())# 使用 submit 方法提交任务,并指定回调函数for index in range(1, 8): f = executor.submit(work, index, lock) f.add_done_callback(done_func)# 阻塞等待线程池中所有任务执行完毕。 executor.shutdown(wait=True)# 打印结果 print(result_list) print('--------- Main Process End -------------')
五、map 批量提交任务
使用 map 方法批量提交任务。
注意:map方法本身不阻塞,但读取其返回的生成器对象是阻塞的,并且得到结果的顺序,与任务分配的顺序是一致的。
map方法会把这一批任务提交到线程池里执行,它会立刻返回一个生成器,真正的阻塞发生在:生成器取结果时,如list(result)。
import time, osfrom concurrent.futures import ThreadPoolExecutor, as_completedfrom threading import get_native_id, RLockdefwork(n, lock):with lock: print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')if n == 1: time.sleep(15)elif n == 2: time.sleep(10)else: time.sleep(1)returnf'任务{n}的结果'if __name__ == '__main__': print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')# 创建一个线程池执行器,默认启动3个线程 executor = ThreadPoolExecutor(3)# 创建线程锁 lock = RLock()# 使用 map 方法批量提交任务 result = executor.map(work, range(1, 8), [lock] * 7)# 打印结果 print(list(result))# 阻塞等待线程池中所有任务执行完毕。 executor.shutdown(wait=True) print('--------- Main Process End -------------')
六、自动回收的写法
使用 with:线程池的“自动回收”写法,离开 with 代码块时自动执行 shudown(wait=True)
import time, osfrom concurrent.futures import ThreadPoolExecutor, as_completedfrom threading import get_native_id, RLockdefwork(n, lock):with lock: print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')if n == 1: time.sleep(15)elif n == 2: time.sleep(10)else: time.sleep(1)returnf'任务{n}的结果'if __name__ == '__main__': print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')with ThreadPoolExecutor(3) as executor:# 创建线程锁 lock = RLock()# 使用 map 方法批量提交任务 result = executor.map(work, range(1, 8), [lock] * 7)# 打印结果 print(list(result)) print('--------- Main Process End -------------')