Worker Pool(工作池)是控制并发的经典模式:
固定数量的「工人」(线程 / 进程),不随任务数量无限创建
工人从「任务队列」中领取任务执行
核心价值:限制并发数、优化资源、稳定可控、优雅关闭# encoding: utf-8# 版权所有 2026 ©涂聚文有限公司™ ®# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎# 描述:Worker Pool Pattern 工作池模式# Author : geovindu,Geovin Du 涂聚文.# IDE : PyCharm 2024.3.6 python 3.11# os : windows 10# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j# Datetime : 2026/6/21 8:20# User : geovindu# Product : PyCharm# Project : pydesginpattern# File : settings.py"""全局配置文件:所有可变参数统一管理符合企业级:配置与代码分离"""import os# 工作池配置WORKER_COUNT = 3 # 固定生产线数量(可动态修改)TASK_QUEUE_MAXSIZE = 100 # 任务队列最大长度# 业务模拟配置SIMULATE_TASK_DELAY = True # 是否模拟任务耗时RANDOM_DELAY_RANGE = (0.5, 2.5) # 模拟IO延迟# 日志配置LOG_LEVEL = "INFO"LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"# encoding: utf-8# 版权所有 2026 ©涂聚文有限公司™ ®# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎# 描述:Worker Pool Pattern 工作池模式# Author : geovindu,Geovin Du 涂聚文.# IDE : PyCharm 2024.3.6 python 3.11# os : windows 10# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j# Datetime : 2026/6/21 8:20# User : geovindu# Product : PyCharm# Project : pydesginpattern# File : exceptions.py"""统一异常体系:职责单一专门处理工作池 + 业务异常"""class WorkerPoolException(Exception): """ 工作池基础异常 """ passclass TaskExecutionException(WorkerPoolException): """ 任务执行失败异常 """ passclass TaskNotFoundException(WorkerPoolException): """ 任务不存在异常 """ pass# encoding: utf-8# 版权所有 2026 ©涂聚文有限公司™ ®# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎# 描述:Worker Pool Pattern 工作池模式# Author : geovindu,Geovin Du 涂聚文.# IDE : PyCharm 2024.3.6 python 3.11# os : windows 10# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j# Datetime : 2026/6/21 8:21# User : geovindu# Product : PyCharm# Project : pydesginpattern# File : logger.py"""日志工具:全局唯一日志实例职责单一:只做日志输出"""import loggingfrom WorkerPoolPattern.config.settings import LOG_LEVEL, LOG_FORMATdef get_logger(name: str = "jewelry_production") -> logging.Logger: logger = logging.getLogger(name) logger.setLevel(LOG_LEVEL) if not logger.handlers: handler = logging.StreamHandler() # Windows 必加:解决中文/编码报错 handler.stream.reconfigure(encoding='utf-8') formatter = logging.Formatter(LOG_FORMAT) handler.setFormatter(formatter) logger.addHandler(handler) return logger# 全局可用的日志对象logger = get_logger()# encoding: utf-8# 版权所有 2026 ©涂聚文有限公司™ ®# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎# 描述:Worker Pool Pattern 工作池模式# Author : geovindu,Geovin Du 涂聚文.# IDE : PyCharm 2024.3.6 python 3.11# os : windows 10# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j# Datetime : 2026/6/21 8:23# User : geovindu# Product : PyCharm# Project : pydesginpattern# File : worker.py"""核心工作池:通用、可扩展、线程安全职责单一:只管理工人 + 任务队列不耦合任何珠宝业务,可直接复用于其他项目"""import queueimport threadingfrom typing import Callable, Anyfrom WorkerPoolPattern.utils.logger import loggerfrom WorkerPoolPattern.core.exceptions import TaskExecutionExceptionclass Worker(threading.Thread): """ 单个工作者:无限从队列取任务执行 """ def __init__(self, task_queue: queue.Queue, worker_id: int): super().__init__(daemon=True) self.queue = task_queue self.worker_id = worker_id self.name = f"Worker-{worker_id}" def run(self): """ :return: """ logger.info(f"工人【{self.worker_id}】已启动,等待任务...") while True: try: task_func, task_id, args, kwargs = self.queue.get() logger.info(f"工人【{self.worker_id}】领取任务【{task_id}】") try: task_func(*args, **kwargs) logger.info(f"工人【{self.worker_id}】完成任务【{task_id}】") except Exception as e: logger.error(f"任务【{task_id}】执行失败:{str(e)}") raise TaskExecutionException(f"任务执行异常:{e}") from e finally: self.queue.task_done() except Exception as e: logger.error(f"工人【{self.worker_id}】异常:{str(e)}")class WorkerPool: """ 工作池管理器:职责单一,只负责启动/管理/等待工人 """ def __init__(self, worker_count: int, queue_maxsize: int = 0): self.worker_count = worker_count self.task_queue = queue.Queue(maxsize=queue_maxsize) self.workers: list[Worker] = [] def start(self): """ 启动所有工作者 :return: """ logger.info(f"启动工作池,共 {self.worker_count} 个工作者") for i in range(1, self.worker_count + 1): worker = Worker(self.task_queue, i) worker.start() self.workers.append(worker) def submit(self, task_id: str, task_func: Callable, *args, **kwargs) -> None: """ 提交任务到队列(通用接口) :param task_id: :param task_func: :param args: :param kwargs: :return: """ self.task_queue.put((task_func, task_id, args, kwargs)) def wait_completion(self): """ 等待所有任务执行完成(优雅关闭) :return: """ self.task_queue.join() logger.info("所有任务已完成,工作池优雅关闭")
# encoding: utf-8# 版权所有 2026 ©涂聚文有限公司™ ®# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎# 描述:Worker Pool Pattern 工作池模式# Author : geovindu,Geovin Du 涂聚文.# IDE : PyCharm 2024.3.6 python 3.11# os : windows 10# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j# Datetime : 2026/6/21 8:25# User : geovindu# Product : PyCharm# Project : pydesginpattern# File : check_tasks.py"""原料质检 / 成品质检"""import timeimport randomfrom WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGEfrom WorkerPoolPattern.utils.logger import loggerdef raw_material_check(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 原料质检:钻石4C、真伪、纯度检测") time.sleep(random.uniform(*RANDOM_DELAY_RANGE))def finished_goods_check(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 成品质检:工艺、成色、尺寸、镶口精度全检") time.sleep(random.uniform(*RANDOM_DELAY_RANGE))# encoding: utf-8# 版权所有 2026 ©涂聚文有限公司™ ®# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎# 描述:Worker Pool Pattern 工作池模式# Author : geovindu,Geovin Du 涂聚文.# IDE : PyCharm 2024.3.6 python 3.11# os : windows 10# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j# Datetime : 2026/6/21 8:27# User : geovindu# Product : PyCharm# Project : pydesginpattern# File : process_tasks.py"""首饰加工、设计、镶嵌"""import timeimport randomfrom WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGEfrom WorkerPoolPattern.utils.logger import loggerdef jewelry_process(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 首饰加工:设计 → 执模 → 镶嵌 → 抛光 → 电金") time.sleep(random.uniform(*RANDOM_DELAY_RANGE))# encoding: utf-8# 版权所有 2026 ©涂聚文有限公司™ ®# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎# 描述:Worker Pool Pattern 工作池模式# Author : geovindu,Geovin Du 涂聚文.# IDE : PyCharm 2024.3.6 python 3.11# os : windows 10# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j# Datetime : 2026/6/21 8:28# User : geovindu# Product : PyCharm# Project : pydesginpattern# File : logistics_tasks.py"""库存录入、物流发货"""import timeimport randomfrom WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGEfrom WorkerPoolPattern.utils.logger import loggerdef inventory_record(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 库存录入:生成唯一防伪码、ERP入库、标签打印") time.sleep(random.uniform(*RANDOM_DELAY_RANGE))def order_delivery(order_id: str): """ :param order_id: :return: """ logger.info(f"[{order_id}] 订单发货:物流下单、保价、短信通知") time.sleep(random.uniform(*RANDOM_DELAY_RANGE))# encoding: utf-8# 版权所有 2026 ©涂聚文有限公司™ ®# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎# 描述:Worker Pool Pattern 工作池模式# Author : geovindu,Geovin Du 涂聚文.# IDE : PyCharm 2024.3.6 python 3.11# os : windows 10# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j# Datetime : 2026/6/21 8:29# User : geovindu# Product : PyCharm# Project : pydesginpattern# File : production_service.py"""珠宝生产业务服务只负责:接收订单 → 编排任务 → 提交到工作池"""from WorkerPoolPattern.tasks import JEWELRY_FULL_PROCESSfrom WorkerPoolPattern.core.worker import WorkerPoolfrom WorkerPoolPattern.utils.logger import loggerclass JewelryProductionService(object): """ 业务编排层(职责单一) """ def __init__(self, pool: WorkerPool): self.pool = pool # 依赖注入:解耦 def create_order_task(self, order_id: str): """ 为单个订单创建全流程任务链 :param order_id: :return: """ logger.info(f"📦 开始创建订单【{order_id}】的全流程任务") for task_name, task_func in JEWELRY_FULL_PROCESS: task_id = f"{order_id}-{task_name}" self.pool.submit(task_id, task_func, order_id) def batch_create_orders(self, order_count: int): """ 批量创建订单任务 :param order_count: :return: """ logger.info(f"📋 开始批量创建 {order_count} 个珠宝生产订单") for i in range(1, order_count + 1): order_id = f"珠宝订单-{i:03d}" self.create_order_task(order_id)
# encoding: utf-8# 版权所有 2026 ©涂聚文有限公司™ ®# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎# 描述:Worker Pool Pattern 工作池模式# Author : geovindu,Geovin Du 涂聚文.# IDE : PyCharm 2024.3.6 python 3.11# os : windows 10# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j# Datetime : 2026/6/21 8:31# User : geovindu# Product : PyCharm# Project : pydesginpattern# File : WorkerPoolBll.py"""项目主入口职责单一:只负责启动 → 执行业务 → 等待结束"""from WorkerPoolPattern.config.settings import WORKER_COUNT, TASK_QUEUE_MAXSIZEfrom WorkerPoolPattern.core.worker import WorkerPoolfrom WorkerPoolPattern.service.production_service import JewelryProductionServicefrom WorkerPoolPattern.utils.logger import loggerclass WorkerPoolBll(object): """ """ def demo(self): """ :return: """ logger.info("=" * 60) logger.info("珠宝企业级生产系统启动(Worker Pool 模式)") logger.info("=" * 60) pool = WorkerPool( worker_count=WORKER_COUNT, queue_maxsize=TASK_QUEUE_MAXSIZE ) pool.start() production_service = JewelryProductionService(pool) production_service.batch_create_orders(order_count=10) pool.wait_completion() logger.info("珠宝生产系统全部执行完成")