GIL(Global Interpreter Lock)全局解释器锁,它本质是一个互斥锁,用户保护Python对象,防止多个线程同时执行Python字节码.目的是为了保证内存安全(防止多线程同时修改引用计数导致的内存泄漏或错误)每个python线程在执行前必须先获取GIL,执行完一定数量的字节码释放GIL,其他线程获取GIL.如下图CPython中有一个机制叫check_interval.CPython解释器会去轮询检查线程GIL的锁住情况.每个一段时间,Python解释器就会强制当前线程去释放GIL,这样别的线程才能有机会执行.不同版本的Python中check_interval实现方式不一样,早起的Python是100个ticks,而Python3以后,interval是15毫秒CPython是Python的默认实现,它的核心是C语言编写的,所以叫C +Python=CPython.它的主要作用是把Python代码转换成计算机能执行的指令,工作流程如下解析Python代码-->变成抽象语法树(AST)Python的线程安全主要依赖全局解释器(GIL)和同步原语(锁,信号量等)操作类型 | 是否原子操作(GIL 保护) | 是否需要额外同步 | 示例 |
简单变量赋值 | ✅是 | ❌不需要 | x = 10 |
list.append() | ✅是 | ❌不需要 | lst.append(1) |
dict[key] = value | ✅是 | ❌不需要 | d["k"] = "v" |
+= / -= | ❌否 | ✅需要锁 | counter += 1 |
复合操作(先读后写) | ❌否 | ✅需要锁 | if x in lst: lst.remove(x) |
文件读写 | ❌否 | ✅需要锁 | f.write("data") |
defaultdict 操作 | ❌否 | ✅需要锁 | dd[key].append(x) |
x = 10 # 原子操作(线程安全)y = x # 原子操作(线程安全)
2).列表(list)的append()和extendlst = []def safe_append(item): lst.append(item) # ✅ 原子操作(GIL 保护)t1 = threading.Thread(target=safe_append, args=(1,))t2 = threading.Thread(target=safe_append, args=(2,))t1.start(); t2.start()t1.join(); t2.join()print(lst) # 输出可能是 [1, 2] 或 [2, 1],但不会损坏
d = {}def safe_dict_write(key, value): d[key] = value # ✅ 原子操作(GIL 保护)t1 = threading.Thread(target=safe_dict_write, args=("a", 1))t2 = threading.Thread(target=safe_dict_write, args=("b", 2))t1.start(); t2.start()t1.join(); t2.join()print(d) # 输出 {'a': 1, 'b': 2}(顺序可能不同,但数据完整)
s = set()def safe_set_add(item): s.add(item) # ✅ 原子操作(GIL 保护)t1 = threading.Thread(target=safe_set_add, args=(1,))t2 = threading.Thread(target=safe_set_add, args=(2,))t1.start(); t2.start()t1.join(); t2.join()print(s) # 输出 {1, 2} 或 {2, 1}(不会丢失数据)
from queue import Queueq = Queue()def producer(): for i in range(10): q.put(i) # 线程安全def consumer(): while True: item = q.get() # 线程安全 print(f"Got {item}") q.task_done()threading.Thread(target=producer).start()threading.Thread(target=consumer, daemon=True).start()q.join() # 等待所有任务完成
threading.lock互斥锁,确保同一时间只有一个线程能访问共享资源lock = threading.Lock()counter = 0def safe_increment(): global counter for _ in range(100_000): with lock: # ✅ 加锁保证原子性 counter += 1t1 = threading.Thread(target=safe_increment)t2 = threading.Thread(target=safe_increment)t1.start(); t2.start()t1.join(); t2.join()print(counter) # 正确输出 200_000
lock.acquire()try: # 代码块finally: lock.release() # 确保锁一定会释放
lock = threading.Lock()lst = [1, 2, 3]def safe_remove(): lock.acquire() # ✅ 加锁保护 try: if lst: lst.pop() finally: lock.release() t1 = threading.Thread(target=safe_remove)t2 = threading.Thread(target=safe_remove)t1.start(); t2.start()t1.join(); t2.join()print(lst) # 正确输出 [1] 或 [1, 2](取决于执行顺序)
sem = threading.Semaphore(3) # 最多允许 3 个线程同时运行def worker(id): with sem: print(f"Thread {id} working...") time.sleep(1)for i in range(10): threading.Thread(target=worker, args=(i,)).start()
from threading import Barrierbarrier = Barrier(3) # 需要 3 个线程到达屏障点def task(): print("Phase 1") barrier.wait() # 等待所有线程完成 Phase 1 print("Phase 2")for i in range(3): threading.Thread(target=task).start()
from threading import Eventevent = Event()def waiter(): print("Waiting for event...") event.wait() # 阻塞,直到 event.set() 被调用 print("Event triggered!")def setter(): time.sleep(2) event.set() # 唤醒所有等待的线程threading.Thread(target=waiter).start()threading.Thread(target=setter).start()