🧵
📖 引言:你的代码可能正在“裸奔”
多线程,听起来高大上,用起来真香!
几行代码就能让程序“分身有术”,性能起飞。
但等等,你的线程安全吗?
我见过太多新手,甚至一些老鸟,写出的多线程代码就像在悬崖边跳舞。
平时跑得挺欢,一上线就各种诡异bug:数据莫名丢失、计算结果随机出错、程序偶尔卡死...
线程安全问题,就是多线程世界的“隐形杀手”。
它不一定会每次都发作,但一旦发作,调试起来能让你怀疑人生。
今天,小甲鱼就带你扒一扒Python多线程里最容易被忽视的5个线程安全大坑。
看完这篇,保证你写多线程代码时,后背发凉,然后... 写出更安全的代码!
🔍 核心原理:先搞懂什么是“线程不安全”
想象一下这个场景:
你和你女朋友同时去ATM机查余额(假设能同时操作)。
但最终余额是多少?
可能是500元,也可能是400元,甚至可能是-100元!
这就是典型的竞态条件(Race Condition)。
🎯 线程安全的本质
线程安全就是保证多个线程同时访问共享资源时,程序的行为是可预测且正确的。
关键就两点:
- 1. 共享资源:多个线程都能访问的数据(全局变量、共享对象等)
- 2. 非原子操作:看起来是一步,实际上需要多个CPU指令完成的操作
比如 count += 1 这个操作,在Python里至少需要三步:
线程A刚做完第一步,线程B可能就插进来了! 这就是问题的根源。
🔒 锁:多线程世界的“交通信号灯”
Python的threading.Lock就是为了解决这个问题。
它就像一个单人卫生间的门锁:
- • 线程B想进去?等着! 直到线程A出来(
release())
lock = threading.Lock()
defsafe_function():
lock.acquire() # 🔐 锁上门
try:
# 操作共享资源的代码
pass
finally:
lock.release() # 🔓 打开门
但锁不是万能的,用不好反而会制造更多问题...
💡 实战案例:三个让你惊掉下巴的线程安全问题
案例一:全局变量的“量子态”
你以为的代码:
import threading
counter = 0
results = []
defincrement():
global counter
for _ inrange(100000):
counter += 1
results.append(counter)
# 创建10个线程
threads = []
for i inrange(10):
t = threading.Thread(target=increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终计数: {counter}")
print(f"预期结果: 1000000")
print(f"实际结果列表: {results}")
你看到的结果:
最终计数: 643289 # 每次运行都不一样!
预期结果: 1000000
实际结果列表: [100000, 200000, 300000, 387654, 487654, ...] # 乱序且有缺失
为什么?
counter += 1 不是原子操作!
多个线程同时读取、修改、写回,导致大量操作被覆盖。
修复方案:加锁!
import threading
counter = 0
lock = threading.Lock() # 🔐 创建一把锁
results = []
defsafe_increment():
global counter
for _ inrange(100000):
lock.acquire() # 上锁
try:
counter += 1
finally:
lock.release() # 必须释放锁!
lock.acquire()
results.append(counter)
lock.release()
# 测试代码同上...
优化:使用上下文管理器
defbetter_increment():
global counter
for _ inrange(100000):
with lock: # 自动管理锁的获取和释放
counter += 1
with lock:
results.append(counter)
案例二:列表操作的“魔法消失术”
场景: 多线程向同一个列表添加元素
import threading
shared_list = []
defadd_items(thread_id):
for i inrange(1000):
# 模拟一些处理时间
temp = f"Thread-{thread_id}-Item-{i}"
shared_list.append(temp)
# 启动5个线程
threads = []
for i inrange(5):
t = threading.Thread(target=add_items, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"预期元素数量: 5000")
print(f"实际元素数量: {len(shared_list)}")
print(f"列表前10个元素: {shared_list[:10]}")
可能的结果:
- • 可能出现
IndexError(列表内部结构损坏)
为什么列表不安全?
列表的append()操作虽然看起来是一个方法调用,但在CPython内部,它可能触发列表的重新分配和复制。
多个线程同时触发这个过程,就会导致内部状态混乱。
正确做法:
import threading
shared_list = []
list_lock = threading.Lock()
defsafe_add_items(thread_id):
for i inrange(1000):
temp = f"Thread-{thread_id}-Item-{i}"
with list_lock:
shared_list.append(temp)
# 或者使用线程安全的队列
from queue import Queue
safe_queue = Queue()
defproducer(thread_id):
for i inrange(1000):
safe_queue.put(f"Thread-{thread_id}-Item-{i}")
defconsumer():
whileTrue:
item = safe_queue.get()
if item isNone:
break
# 处理item...
safe_queue.task_done()
案例三:文件操作的“内容混搭”
场景: 多个线程同时写入同一个文件
import threading
import time
defwrite_to_file(thread_id):
withopen('shared_log.txt', 'a') as f:
for i inrange(100):
f.write(f"Thread {thread_id}: Line {i}\n")
time.sleep(0.001) # 模拟耗时操作
# 启动3个线程同时写入
threads = []
for i inrange(3):
t = threading.Thread(target=write_to_file, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 检查文件内容
withopen('shared_log.txt', 'r') as f:
lines = f.readlines()
print(f"总行数: {len(lines)}")
print("最后10行:")
for line in lines[-10:]:
print(line, end='')
文件内容可能变成这样:
Thread 0: Line 99
Thread 1: Line 9Thread 2: Line 88
Thread 0: Line 98
Thread 1: Line 9Thread 2: Line 87
...
为什么?
文件的write()操作不是原子的,多个线程的写入会相互干扰,导致内容错乱。
解决方案:
import threading
file_lock = threading.Lock()
defsafe_write_to_file(thread_id):
for i inrange(100):
with file_lock:
withopen('shared_log.txt', 'a') as f:
f.write(f"Thread {thread_id}: Line {i}\n")
time.sleep(0.001)
# 或者使用专门的日志模块,它内部已经处理了线程安全
import logging
logging.basicConfig(
filename='thread_safe.log',
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(message)s'
)
deflog_with_logging(thread_id):
for i inrange(100):
logging.info(f"Thread {thread_id}: Line {i}")
🚀 高级技巧:不只是Lock那么简单
技巧一:RLock(可重入锁)
问题场景: 函数递归调用时,普通锁会死锁
import threading
lock = threading.Lock()
defrecursive_function(n):
lock.acquire()
try:
if n > 0:
print(f"进入第{n}层")
recursive_function(n-1) # 💀 这里会再次尝试获取锁,导致死锁!
print(f"离开第{n}层")
finally:
lock.release()
# 这会导致死锁!
# recursive_function(3)
解决方案:使用RLock
import threading
rlock = threading.RLock() # 可重入锁
defsafe_recursive_function(n):
with rlock: # 同一线程可以多次获取同一个RLock
if n > 0:
print(f"进入第{n}层")
safe_recursive_function(n-1) # ✅ 可以再次获取
print(f"离开第{n}层")
safe_recursive_function(3)
RLock原理:
技巧二:Semaphore(信号量)
场景: 需要控制同时访问资源的线程数量(比如数据库连接池)
import threading
import time
from random import random
# 最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
defaccess_database(thread_id):
print(f"线程 {thread_id} 等待数据库连接...")
with semaphore: # 获取信号量
print(f"线程 {thread_id} 获得连接,正在查询...")
time.sleep(random() * 2) # 模拟查询时间
print(f"线程 {thread_id} 查询完成,释放连接")
# 启动10个线程,但同时只有3个能访问数据库
threads = []
for i inrange(10):
t = threading.Thread(target=access_database, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
输出示例:
线程 0 等待数据库连接...
线程 0 获得连接,正在查询...
线程 1 等待数据库连接...
线程 1 获得连接,正在查询...
线程 2 等待数据库连接...
线程 2 获得连接,正在查询...
线程 3 等待数据库连接... # 必须等待,直到有信号量释放
...
技巧三:Condition(条件变量)
场景: 生产者-消费者模型
import threading
import time
from collections import deque
classProducerConsumer:
def__init__(self, capacity=10):
self.buffer = deque()
self.capacity = capacity
self.condition = threading.Condition()
defproduce(self, item):
withself.condition:
# 缓冲区满时等待
whilelen(self.buffer) >= self.capacity:
print("缓冲区已满,生产者等待...")
self.condition.wait()
self.buffer.append(item)
print(f"生产: {item}, 缓冲区大小: {len(self.buffer)}")
# 通知消费者
self.condition.notify_all()
defconsume(self):
withself.condition:
# 缓冲区空时等待
whilelen(self.buffer) == 0:
print("缓冲区为空,消费者等待...")
self.condition.wait()
item = self.buffer.popleft()
print(f"消费: {item}, 缓冲区大小: {len(self.buffer)}")
# 通知生产者
self.condition.notify_all()
return item
# 测试
pc = ProducerConsumer(capacity=5)
defproducer():
for i inrange(20):
pc.produce(f"产品-{i}")
time.sleep(0.1)
defconsumer():
for _ inrange(20):
item = pc.consume()
time.sleep(0.15)
# 启动生产者和消费者
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
技巧四:Event(事件)
场景: 线程间简单的信号通知
import threading
import time
# 创建事件对象
data_ready = threading.Event()
data = None
defdata_producer():
global data
time.sleep(2) # 模拟数据准备
data = {"status": "success", "value": 42}
# 设置事件,通知消费者数据已准备好
data_ready.set()
print("生产者: 数据已准备好")
defdata_consumer():
print("消费者: 等待数据...")
# 等待事件被设置
data_ready.wait()
print(f"消费者: 收到数据 {data}")
# 启动线程
t1 = threading.Thread(target=data_consumer)
t2 = threading.Thread(target=data_producer)
t1.start()
t2.start()
t1.join()
t2.join()
⚠️ 常见误区:这些错误你可能正在犯
误区一:忘记释放锁
错误代码:
lock = threading.Lock()
defdangerous_function():
lock.acquire()
# 做一些事情
if some_condition:
return# 💀 直接返回,锁永远不会释放!
# 更多代码...
lock.release()
正确做法:
defsafe_function():
lock.acquire()
try:
# 做一些事情
if some_condition:
return# ✅ 即使返回,finally也会执行
# 更多代码...
finally:
lock.release() # 确保锁被释放
# 或者使用上下文管理器(推荐)
defeven_better_function():
with lock:
# 做一些事情
if some_condition:
return# ✅ 自动释放锁
# 更多代码...
误区二:锁的粒度太大
错误代码: 锁住整个函数,性能极差
lock = threading.Lock()
defslow_function():
with lock: # 💀 整个函数都被锁住
# 一些不需要锁的操作
time.sleep(1) # 耗时操作
# 只有这一小部分需要锁
update_shared_data()
# 更多不需要锁的操作
process_data()
正确做法: 只锁住必要的部分
deffast_function():
# 不需要锁的操作
time.sleep(1)
prepare_data()
# 只锁住真正需要同步的部分
with lock:
update_shared_data()
# 不需要锁的操作
process_data()
误区三:在锁内调用外部函数
危险代码:
lock = threading.Lock()
deffunction_a():
with lock:
# 调用外部函数,不知道它内部是否也会获取锁
external_function() # 💀 可能导致死锁!
defexternal_function():
# 如果这个函数内部也尝试获取同一个锁...
with lock: # 💀 死锁!
pass
原则:
误区四:以为某些操作是线程安全的
常见误解:
# 很多人以为这些是线程安全的,其实不是!
# 1. 列表操作
shared_list = []
shared_list.append(item) # ❌ 不安全
shared_list.pop() # ❌ 不安全
# 2. 字典操作
shared_dict = {}
shared_dict[key] = value # ❌ 不安全(在resize时可能出问题)
value = shared_dict[key] # ❌ 不安全
# 3. 文件操作
withopen('file.txt', 'a') as f:
f.write('data') # ❌ 不安全
# 4. 甚至print()也不是完全安全的!
print("Hello") # ❌ 可能与其他线程的输出混在一起
正确做法: 对所有共享资源的访问都要加锁!
误区五:过度同步导致性能问题
错误模式:
# 每个微小的操作都加锁
lock = threading.Lock()
defover_synced():
for i inrange(1000000):
with lock:
x = i * 2# 💀 这根本不需要锁!
with lock:
y = x + 1# 💀 这也不需要!
with lock:
results.append(y) # 只有这里需要锁
优化策略:
- 3. 考虑无锁数据结构:如
queue.Queue、collections.deque(在特定操作下)
📌 总结:线程安全的五个黄金法则
- 1. 识别共享资源:所有能被多个线程访问的数据都是潜在风险点
- 2. 保护所有访问:对共享资源的每一次读写操作都要加锁
- 3. 锁的粒度要适中:太大会影响性能,太小容易遗漏
- 4. 使用高级同步工具:根据场景选择RLock、Semaphore、Condition等
- 5. 测试!测试!测试!:多线程bug难以复现,要充分测试并发场景
记住: 在多线程世界里,没有所谓的"大部分时候正确"。
要么完全正确,要么就是定时炸弹。
👋 行动引导:别光看,动手练!
觉得有收获?点赞👍 让更多人看到!
想以后复习?收藏⭐ 这篇文章!
想持续学习?关注🔔 Python小甲鱼!
评论区等你:
下期预告: 《异步编程:asyncio的十个高级技巧》
保证让你的异步代码飞起来!
版权声明:
本文为Python小甲鱼公众号原创文章,转载请注明出处。
技术无罪,但代码有对错。让我们一起写出更健壮的Python代码! 🐍