提到并发,我最早的印象还停留在操作系统课程中:通过交替执行多个进程或线程,提高 CPU 和系统资源的利用率。但在实际开发中,这个概念长期停留在“知道有这么回事”的阶段,日常更多接触的仍然是一些顺序执行的 CRUD 逻辑。直到最近的一项工作任务中,需要对相对大量的数据进行处理,单线程模式下效率明显不足,才真正开始在项目中引入线程,并思考如何让它可控、可维护、可中断地运行。因此,本文将以这次任务为例,抽象出一个 Python 多线程处理框架,尝试回答一个更实际的问题:线程在工程中,究竟应该如何被组织和使用。
整个线程处理流程可以抽象为如下结构(任务被拆分为五个阶段): 输入文件 ↓ 任务队列 (task_queue) ↓ 多个工作线程 (worker threads) ↓ 结果队列 (result_queue) ↓ 单一保存线程 (save thread) ↓分类文件 + 统计信息
之所以这样拆分,是因为真正耗时的阶段集中在任务处理本身,因此只对这一部分引入多线程,而其余阶段尽量保持简单和确定性。数据加载
这一阶段不涉及任何并发逻辑,目标就是从文件中读取数据,并一次性送入任务队列,由后续线程统一调度。这种设计的好处是,后续线程不需要关心数据来源,只需从队列中取任务即可。当然,这里也存在进一步优化空间(如分批加载),但在当前场景下并非瓶颈。def load_datas(): """从文件读取数据到队列""" if not os.path.exists(INPUT_FILE): print(f"错误: 找不到输入文件 {INPUT_FILE}") return count = 0 with open(INPUT_FILE, "r", encoding="utf-8") as f: for line in f: data = line.strip() if data: task_queue.put(data) count += 1 return count
工作线程
这一阶段是整个程序的核心,也是耗时最多的部分。在设计时,我刻意让工作线程只聚焦在核心业务:取任务 → 执行业务逻辑 → 投递结果。不参与结果的最终写入,以及统计之类的收尾内容。这样不仅方便后续的调试,并且在一定程度上能够减少线程间的资源竞争。不过值得注意的是,并非所有业务逻辑都能够充分发挥多线程的并发效果,其场景更多是聚焦在 IO 密集型任务(比如网络请求、数据操作等)上,使程序减少不必要的等待,然后转而执行其他任务,从而提高整体吞吐。若业务逻辑主要是 CPU 计算,盲目增加线程,不仅没有提速的效果,还可能因为频繁切换上下文带来额外开销,导致速度更慢。def task_thread(): """任务线程""" while not task_queue.empty() and not stop_event.is_set(): try: data = task_queue.get(timeout=3) except Exception:break try: # 模拟分类任务 ...... # 将结果放入结果队列,由专门的保存线程处理 result_queue.put((data, status)) except Exception as e: result_queue.put((data, f"error: {str(e)}")) finally: task_queue.task_done()
保存线程与数据统计
该线程与数据加载阶段相对应,主要的目标是从结果队列中取数据,分类写入到结果文件,同时进行数据统计。这里采用单一保存线程,因此统计变量的更新不需要加锁;如果未来演进为多线程保存,则锁机制将是必不可少的。def save_results(total_count): """将处理后的数据保存到文件""" global total_processed, catagory1_count, catagory2_count, catagory3_count f_cat1 = open(CAT1_FILE, "a", encoding="utf-8") f_cat2 = open(CAT2_FILE, "a", encoding="utf-8") f_cat3 = open(CAT3_FILE, "a", encoding="utf-8") try: while True: item = result_queue.get() # 如果收到 None,说明没有数据再来了 if item is None: result_queue.task_done() break data, status = item # 更新统计数据 total_processed += 1 if status == "分类1": f_cat1.write(f"{data} | {status}\n") catagory1_count += 1 elif status == "分类2": f_cat2.write(f"{data} | {status}\n") catagory2_count += 1 else: f_cat3.write(f"{data} | {status}\n") catagory3_count += 1 # 定量刷新一次控制台 if total_processed % 200 == 0 or total_processed == total_count: percent = (total_processed / total_count) * 100 sys.stdout.write( f"\r进度: {total_processed}/{total_count} [{percent:.2f}%] | " f"分类1: {catagory1_count} | 分类2: {catagory2_count} | 分类3: {catagory3_count} " ) sys.stdout.flush() # 定量刷新文件到磁盘 if total_processed % 200 == 0 or total_processed == total_count: f_cat1.flush() f_cat2.flush() f_cat3.flush() result_queue.task_done() except Exception as e: print(f"保存线程异常: {e}") finally: f_cat1.close() f_cat2.close() f_cat3.close()
中断处理
与顺序执行程序不同,多线程程序在中途退出时,如果处理不当,很容易留下未写完的文件或不一致的统计结果。因此首先需要捕获 KeyboardInterrupt,然后设置 stop_event,并且清空任务队列,释放阻塞线程,最后等待队列与保存线程安全结束,这样才能确保所有线程都会有序退出。 ...... try: # 持续检查子线程状态,直到任务队列完成 while any(t.is_alive() for t in threads): for t in threads: t.join(timeout=0.5) # 轮询检查 except KeyboardInterrupt: print("\n检测到 Ctrl+C,正在紧急保存并退出...") stop_event.set() # 通知所有工作线程停止拿新任务 # 清空队列以释放被阻塞的线程 while not task_queue.empty(): try: task_queue.get_nowait() task_queue.task_done() except: break finally: # 等待所有请求任务放进 result_queue task_queue.join() # 给保存线程发一个结束信号 result_queue.put(None) # 等待保存线程把剩下的活干完 result_queue.join() ......
通过这次实践可以看出,线程并不是提升性能的“万能解法”。只有在合适的场景下(如 IO 密集型任务),并发才能真正发挥作用;若线程数量失控,反而可能因上下文切换带来额外开销。在实际设计中,合理约束线程的职责范围、控制任务粒度,往往比单纯增加并发度更重要。
在多线程实践的基础上,我也逐步对线程池、异步编程与进程池等并发模型建立了初步认识:前两者更适用于 IO 密集型场景,而进程池通常用于缓解 CPU 密集型任务的性能瓶颈。这种区分也为后续进一步学习并发模型及其工程化应用,提供了一个相对清晰的方向。