接口明明不慢,单个请求 200ms,批量跑 5000 个却要十几分钟。这种代码我见过不少,打开一看,基本都是一个 for 循环从头怼到尾:
for user_id in user_ids:
profile = load_user_profile(user_id)
write_snapshot(profile)
这地方我第一眼不会怀疑 Python 慢,也不会先去优化函数内部。只要里面有网络请求、文件读写、数据库查询,八成是线程没用上。
Python 做多线程,最常见有两种写法:threading.Thread 和 ThreadPoolExecutor。
平时业务代码里,我更愿意用后者。线程池可控,不容易一激动创建几千个线程,把机器搞得风扇起飞。
比如一个批量拉用户状态的脚本,接口偶尔会抖,不能一个失败就把整批任务干废:
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
defquery_user_status(user_id):
begin = time.time()
# 这里模拟一次远程接口调用
time.sleep(random.uniform(0.05, 0.3))
if user_id % 17 == 0:
raise RuntimeError(f"remote api timeout, user_id={user_id}")
cost_ms = int((time.time() - begin) * 1000)
return {
"user_id": user_id,
"status": "ACTIVE",
"cost_ms": cost_ms
}
defbatch_query(user_ids):
ok_rows = []
bad_rows = []
with ThreadPoolExecutor(max_workers=12, thread_name_prefix="user-sync") as pool:
future_map = {
pool.submit(query_user_status, user_id): user_id
for user_id in user_ids
}
for future in as_completed(future_map):
user_id = future_map[future]
try:
row = future.result()
ok_rows.append(row)
except Exception as e:
bad_rows.append((user_id, str(e)))
return ok_rows, bad_rows
if __name__ == "__main__":
users = list(range(1, 101))
ok, bad = batch_query(users)
print("success:", len(ok))
print("failed:", bad[:5])
这段代码能解决大部分“批量处理慢”的问题。
注意,我这里没有把 max_workers 写成 100、200。线程不是越多越好。接口慢的时候,加线程确实能把等待时间叠起来,但线程多了以后,调度、连接数、下游限流都会跟着来。
线上我一般先从 8、12、16 这种数开始试,而不是拍脑袋写个 100。
如果任务之间要共享数据,就别随手改全局变量。这个坑很阴,开发环境跑 20 条数据没问题,上线跑 20 万条,数量偶尔少几条,日志还看不出来。
比如下面这种写法,看着没毛病,其实不稳:
total = 0
defadd_count():
global total
total += 1
total += 1 不是一个不可拆的动作,中间可能被别的线程插进来。
要么用锁:
import threading
from concurrent.futures import ThreadPoolExecutor
classCounter:
def__init__(self):
self.value = 0
self._lock = threading.Lock()
defincr(self, step=1):
with self._lock:
self.value += step
defhandle_one_line(line, counter):
if"ERROR"in line:
counter.incr()
if __name__ == "__main__":
lines = [
"INFO order created",
"ERROR payment timeout",
"WARN retry later",
"ERROR inventory locked",
] * 1000
counter = Counter()
with ThreadPoolExecutor(max_workers=6) as pool:
for line in lines:
pool.submit(handle_one_line, line, counter)
print(counter.value)
不过锁也别乱加。锁加大了,多线程又退化成单线程。能让每个线程自己算自己的结果,最后再汇总,就别共享一个变量。
另一种更常见的写法,是用 queue.Queue 做生产者消费者。
我处理日志、导文件、补数据的时候经常这么写。一个线程负责读,几个线程负责处理,最后再统一落盘或者写库。
import queue
import threading
import time
task_queue = queue.Queue(maxsize=1000)
stop_flag = object()
defread_log_file(file_path):
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
if"orderId="in line:
task_queue.put(line.strip())
for _ in range(4):
task_queue.put(stop_flag)
defparse_worker(worker_no):
whileTrue:
line = task_queue.get()
try:
if line is stop_flag:
return
# 模拟解析日志里的订单号
order_id = line.split("orderId=")[-1].split()[0]
time.sleep(0.02)
print(f"worker={worker_no}, order_id={order_id}")
finally:
task_queue.task_done()
if __name__ == "__main__":
reader = threading.Thread(
target=read_log_file,
args=("app.log",),
name="log-reader"
)
workers = [
threading.Thread(target=parse_worker, args=(i,), name=f"log-parser-{i}")
for i in range(4)
]
reader.start()
for t in workers:
t.start()
reader.join()
task_queue.join()
for t in workers:
t.join()
这段代码有两个细节。
一个是 Queue(maxsize=1000),我不喜欢无限队列。读文件太快,处理太慢,内存会被撑起来。加个大小限制,至少能让读取端等等后面的处理线程。
另一个是 stop_flag。线程不能靠猜结束,尤其是消费者线程。没有明确退出信号,就容易卡在 get() 那里,脚本看着没报错,就是不结束。
还有个绕不开的问题:Python 多线程到底能不能提升性能?
得看任务类型。
如果是请求接口、读写文件、查数据库、扫日志,多线程通常是有效的。因为线程大部分时间在等 IO,CPU 没怎么干活。
如果是压缩图片、跑复杂计算、解析大 JSON、大量加密解密,这种 CPU 密集任务,多线程效果就别抱太大希望。Python 里有 GIL,多个线程不等于多个 CPU 核同时猛跑。这个时候我一般换 multiprocessing,或者把重活丢给 C 扩展、NumPy、外部服务处理。
还有一点,线程里的异常不会像主线程那样直接把你喊醒。
用 ThreadPoolExecutor 的好处就在这里,future.result() 会把异常重新抛出来。你能知道哪条数据炸了,而不是线程悄悄死掉,主流程还打印一个“执行完成”。
多线程不是为了把代码写得高级。
它解决的是很具体的问题:大量任务都在等,主线程却傻站着一个个排队。只要确认瓶颈在 IO,线程池基本就是第一把刀。先把并发数控住,再把异常、超时、退出信号补齐,这代码才敢放到生产脚本里跑。