
大家好,我是煜道。
今天我们一起来学习Python并发编程——多线程、GIL与asyncio。
引言
并发编程是现代软件开发中的重要话题,它允许程序在同一时间段内处理多个任务,从而提高资源利用率和用户体验。Python提供了多种并发编程方式,包括多线程(threading)、多进程(multiprocessing)和异步编程(asyncio)。每种方式都有其适用场景和优缺点,理解它们的工作原理对于编写高效的Python程序至关重要。
本文将深入探讨Python的并发编程模型,包括GIL(全局解释器锁)对多线程的影响、线程同步机制、进程通信以及asyncio异步编程。通过系统学习,我们将能够根据实际需求选择合适的并发方案。

01 全局解释器锁(GIL)
1.1 什么是GIL
GIL是CPython解释器中的一个互斥锁,确保同一时刻只有一个线程执行Python字节码。因此即使在多核CPU上,多个线程也无法并行执行Python代码。下面的示例对比了单线程与多线程的耗时:
import threading
import time


# Demonstrate the effect of the GIL.
def cpu_bound_task(n=10**7):
    """CPU-bound task: return the sum of the integers in ``range(n)``.

    The loop is pure Python bytecode, so the running thread holds the GIL the
    whole time; several threads running this cannot use several cores.
    """
    result = 0
    for i in range(n):
        result += i
    return result


def io_bound_task(delay=0.1):
    """IO-bound task: block for ``delay`` seconds, then return "IO done".

    ``time.sleep`` releases the GIL, so several of these overlap in threads.
    """
    time.sleep(delay)
    return "IO done"


def run_sequential(repeat=2):
    """Run ``repeat`` CPU tasks and ``repeat`` IO tasks one after another."""
    for _ in range(repeat):
        cpu_bound_task()
        io_bound_task()


def run_threads(repeat=2):
    """Run the same workload as ``run_sequential``, one thread per task."""
    threads = []
    for _ in range(repeat):
        threads.append(threading.Thread(target=cpu_bound_task))
        threads.append(threading.Thread(target=io_bound_task))
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == "__main__":
    # Both runs do the same amount of work, so the timings are comparable.
    # perf_counter is monotonic and high-resolution, unlike time.time().
    start = time.perf_counter()
    run_sequential()
    print(f"Single thread: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    run_threads()
    print(f"Multi thread: {time.perf_counter() - start:.2f}s")

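1.2 GIL的影响
- CPU密集型任务:同一时刻只有一个线程能执行Python字节码,多线程无法利用多核,甚至可能因线程切换开销而比单线程更慢。
- IO密集型任务:线程在阻塞IO(如time.sleep、网络或文件读写)时会释放GIL,其他线程得以运行,因此多线程仍能显著缩短总耗时。
- 绕过方式:CPU密集型任务可以改用多进程(multiprocessing / ProcessPoolExecutor),或交给会释放GIL的C扩展(如NumPy)完成。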
1.3 GIL为什么存在
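GIL简化了CPython的实现:
- 内存管理:CPython使用引用计数管理对象,GIL保证引用计数的增减无需为每个对象单独加锁。
- C扩展:大量C扩展假设自身在GIL保护下运行,编写起来更简单。
- 单线程性能:一把全局锁的开销远小于大量细粒度锁,单线程程序因此运行更快。
(CPython 3.13起提供了可移除GIL的自由线程(free-threaded)构建,但默认构建仍然带有GIL。)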

02 多线程编程
2.1 创建线程
import threading
import time


def task(name, delay):
    """Print start/finish messages around a ``delay``-second sleep."""
    print(f"Task {name} started")
    time.sleep(delay)
    print(f"Task {name} completed")


class CustomThread(threading.Thread):
    """Thread subclass whose ``run()`` calls ``task(name, delay)``."""

    def __init__(self, name, delay):
        # Hand the name to Thread itself rather than overwriting the
        # inherited ``name`` property after construction.
        super().__init__(name=name)
        self.delay = delay

    def run(self):
        task(self.name, self.delay)


if __name__ == "__main__":
    # Method 1: pass a function as the thread's target
    thread = threading.Thread(target=task, args=("A", 1))
    thread.start()
    thread.join()

    # Method 2: subclass Thread and override run()
    thread = CustomThread("B", 1)
    thread.start()
    thread.join()
2.2 线程同步
import threading
import time

# 1. Lock - mutual exclusion lock
lock = threading.Lock()


def safe_increment(counter, name):
    """Increment ``counter[0]`` while holding ``lock`` and print the new value."""
    with lock:
        counter[0] += 1
        print(f"{name}: {counter[0]}")


# 2. RLock - reentrant lock
rlock = threading.RLock()


def recursive_task(n):
    """Recurse ``n`` levels deep, re-acquiring ``rlock`` at every level.

    A plain Lock would deadlock on the second acquisition by the same
    thread; an RLock lets its owning thread acquire it repeatedly.
    """
    with rlock:
        if n > 0:
            recursive_task(n - 1)


# 3. Event
event = threading.Event()


def waiter():
    """Block until ``event`` is set."""
    print("Waiting for event...")
    event.wait()
    print("Event received!")


def setter(delay=1):
    """Sleep ``delay`` seconds, then set ``event`` to wake every waiter."""
    time.sleep(delay)
    event.set()


# 4. Condition - condition variable
condition = threading.Condition()
# Buffer guarded by ``condition``; an explicit shared container instead of
# stashing data as an attribute on the consumer function object.
shared_data = []


def consumer():
    """Wait until ``shared_data`` holds an item, then consume one item."""
    with condition:
        # Re-check in a loop: wait() may return without the predicate holding.
        while not shared_data:
            condition.wait()
        print(f"Consumed: {shared_data.pop(0)}")


def producer(delay=1):
    """After ``delay`` seconds, publish one item and notify one waiter."""
    time.sleep(delay)
    with condition:
        shared_data.append("produced")
        condition.notify()


if __name__ == "__main__":
    counter = [0]
    threads = [
        threading.Thread(target=safe_increment, args=(counter, f"T{i}"))
        for i in range(10)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Final: {counter[0]}")  # 10

    recursive_task(3)  # would deadlock with threading.Lock

    demo_threads = [
        threading.Thread(target=waiter),
        threading.Thread(target=setter),
        threading.Thread(target=consumer),
        threading.Thread(target=producer),
    ]
    for t in demo_threads:
        t.start()
    for t in demo_threads:
        t.join()

2.3 线程安全的数据结构
import queue
import threading

# Queue - thread-safe FIFO queue
q = queue.Queue()

# End-of-stream marker. A dedicated value (instead of a real data item such
# as 4) guarantees that every produced item is actually consumed.
SENTINEL = None


def producer(count=5):
    """Put the integers ``0..count-1`` on ``q``, then the end-of-stream sentinel."""
    for i in range(count):
        q.put(i)
        print(f"Produced: {i}")
    q.put(SENTINEL)


def consumer():
    """Consume items from ``q`` until the sentinel; return the consumed items.

    ``task_done()`` is called for every ``get()`` (sentinel included) so
    that ``q.join()`` can return.
    """
    consumed = []
    while True:
        item = q.get()
        if item is SENTINEL:
            q.task_done()
            break
        print(f"Consumed: {item}")
        consumed.append(item)
        q.task_done()
    return consumed


if __name__ == "__main__":
    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    q.join()  # returns once every queued item has been marked done
    for t in threads:
        t.join()
2.4 ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import time


def task(n, delay=0.5):
    """Simulate an IO-bound job: sleep ``delay`` seconds, then return ``n ** 2``."""
    time.sleep(delay)
    return n ** 2


if __name__ == "__main__":
    # Thread pool with submit(): one Future per job
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        # result() blocks until that future is done; iterating the list
        # yields results in submission order.
        for future in futures:
            print(f"Result: {future.result()}")

    # map(): results are returned in input order
    with ThreadPoolExecutor(max_workers=2) as executor:
        results = list(executor.map(task, range(5)))
        print(results)  # [0, 1, 4, 9, 16]

03 多进程编程
3.1 创建进程
import multiprocessing
import time


def process_task(name, delay):
    """Print start/finish messages around a ``delay``-second sleep."""
    print(f"Process {name} started")
    time.sleep(delay)
    print(f"Process {name} completed")


if __name__ == '__main__':
    # The guard matters: under the "spawn" start method the child re-imports
    # this module, and unguarded code would try to start processes again.
    process = multiprocessing.Process(target=process_task, args=("A", 1))
    process.start()
    process.join()
3.2 进程间通信
import multiprocessing


# 1. Queue - process-safe queue
def producer(q):
    """Put 0..4 on ``q`` followed by a ``None`` end marker."""
    for i in range(5):
        q.put(i)
    q.put(None)  # end marker


def consumer(q):
    """Read items from ``q`` until the ``None`` marker; return them as a list."""
    items = []
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Got: {item}")
        items.append(item)
    return items


# 2. Pipe - a connected pair of endpoints (duplex by default)
def sender(conn):
    """Send one greeting over ``conn`` and close this end."""
    conn.send("Hello from sender")
    conn.close()


def receiver(conn):
    """Receive one message from ``conn``, print it, close this end, return it."""
    msg = conn.recv()
    print(f"Received: {msg}")
    conn.close()  # release this end of the pipe once we're done with it
    return msg


if __name__ == '__main__':
    q = multiprocessing.Queue()
    prod = multiprocessing.Process(target=producer, args=(q,))
    cons = multiprocessing.Process(target=consumer, args=(q,))
    prod.start()
    cons.start()
    prod.join()
    cons.join()

    parent_conn, child_conn = multiprocessing.Pipe()
    send_proc = multiprocessing.Process(target=sender, args=(child_conn,))
    recv_proc = multiprocessing.Process(target=receiver, args=(parent_conn,))
    send_proc.start()
    recv_proc.start()
    send_proc.join()
    recv_proc.join()
3.3 共享内存
import multiprocessing


def increment(c):
    """Add 1 to the shared ``multiprocessing.Value`` ``c`` under its lock."""
    # ``c.value += 1`` is read-modify-write; without the lock, concurrent
    # processes could lose updates.
    with c.get_lock():
        c.value += 1


if __name__ == '__main__':
    # 1. Value - a shared C int ('i'). It is created only in the parent and
    # passed to children explicitly. At module level it would be re-created,
    # unused, in every child that re-imports this module (spawn start method).
    counter = multiprocessing.Value('i', 0)
    processes = [
        multiprocessing.Process(target=increment, args=(counter,))
        for _ in range(10)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Counter: {counter.value}")  # 10

    # 2. Array - shared array of C ints
    arr = multiprocessing.Array('i', [1, 2, 3, 4, 5])
    print(arr[:])  # [1, 2, 3, 4, 5]
3.4 ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import time


def cpu_bound_task(n):
    """Return the sum of squares of ``0..n-1`` (pure CPU work)."""
    return sum(i * i for i in range(n))


# Use a process pool: each worker is a separate interpreter with its own GIL,
# so the CPU-bound jobs really run in parallel.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_bound_task, [10**6] * 4))
        print(results)

04 asyncio异步编程
4.1 异步基础
import asyncio


async def async_task(name, delay):
    """Print start, await ``delay`` seconds without blocking, return "<name> done"."""
    print(f"Task {name} started")
    await asyncio.sleep(delay)  # yields to the event loop instead of blocking it
    print(f"Task {name} completed")
    return f"{name} done"


async def main():
    """Run two tasks concurrently and print their results."""
    # create_task schedules each coroutine on the running loop right away
    task1 = asyncio.create_task(async_task("A", 1))
    task2 = asyncio.create_task(async_task("B", 0.5))
    # gather returns results in argument order, not completion order
    results = await asyncio.gather(task1, task2)
    print(f"Results: {results}")


if __name__ == "__main__":
    asyncio.run(main())
4.2 async/await语法
import asyncio


async def fetch_data(url, delay=1):
    """Simulate a network request taking ``delay`` seconds; return a result dict."""
    print(f"Fetching {url}")
    await asyncio.sleep(delay)  # stand-in for real network latency
    return {"url": url, "data": "some data"}


async def main():
    """Fetch several URLs concurrently and print which ones completed."""
    urls = ["url1", "url2", "url3"]
    tasks = [fetch_data(url) for url in urls]
    # gather runs the coroutines concurrently; total time ~ one delay, not three
    results = await asyncio.gather(*tasks)
    for result in results:
        print(f"Got: {result['url']}")


if __name__ == "__main__":
    asyncio.run(main())

4.3 异步上下文管理器
import asyncio


class AsyncContextManager:
    """Minimal async context manager that logs entry and exit."""

    async def __aenter__(self):
        print("Entering context")
        return self  # bound to the ``as`` target

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Returns None (falsy), so an exception raised in the body propagates.
        print("Exiting context")


async def main():
    """Enter and exit the context once."""
    async with AsyncContextManager() as cm:  # cm is the object __aenter__ returned
        print("Inside context")


if __name__ == "__main__":
    asyncio.run(main())
4.4 异步迭代器
import asyncio


class AsyncIterator:
    """Async iterator over ``data`` that awaits ``delay`` seconds before each item."""

    def __init__(self, data, delay=0.1):
        self.data = data
        self.index = 0
        self.delay = delay

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.index >= len(self.data):
            raise StopAsyncIteration
        item = self.data[self.index]
        self.index += 1
        await asyncio.sleep(self.delay)  # simulate an async operation
        return item


async def main():
    """Print every item produced by the async iterator."""
    async for item in AsyncIterator([1, 2, 3, 4, 5]):
        print(item)


if __name__ == "__main__":
    asyncio.run(main())
4.5 asyncio同步原语
import asyncio


# Lock - asyncio lock
async def async_task(lock, name, delay=0.5):
    """Hold ``lock`` for ``delay`` seconds, logging acquire and release."""
    async with lock:
        print(f"{name} acquired lock")
        await asyncio.sleep(delay)
    # Logged after the ``async with`` exits, i.e. once the lock is really released.
    print(f"{name} released lock")


async def lock_demo():
    """Two tasks compete for one lock; they run strictly one after the other."""
    lock = asyncio.Lock()
    await asyncio.gather(
        async_task(lock, "A"),
        async_task(lock, "B"),
    )


# Semaphore - limit concurrency
async def limited_task(semaphore, n, delay=1):
    """Run a ``delay``-second job while holding one ``semaphore`` slot."""
    async with semaphore:
        print(f"Task {n} started")
        await asyncio.sleep(delay)
        print(f"Task {n} completed")


async def semaphore_demo():
    """Five tasks share a semaphore of 2, so at most 2 run at once."""
    sem = asyncio.Semaphore(2)  # at most 2 concurrent tasks
    await asyncio.gather(*[limited_task(sem, i) for i in range(5)])


async def main():
    """Run the lock demo, then the semaphore demo (one ``main``, not two)."""
    await lock_demo()
    await semaphore_demo()


if __name__ == "__main__":
    asyncio.run(main())
4.6 aiohttp异步HTTP客户端
import asyncio

import aiohttp


async def fetch_url(session, url):
    """GET ``url`` with the shared ``session`` and return "<url>: <status>"."""
    async with session.get(url) as response:
        return f"{url}: {response.status}"


async def main():
    """Fetch several URLs concurrently over one session and print the results."""
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/headers",
    ]
    # An explicit total timeout keeps one hung request from stalling gather()
    # for aiohttp's much longer default.
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = [fetch_url(session, url) for url in urls]
        # return_exceptions=True: a failed URL doesn't discard the other results
        results = await asyncio.gather(*tasks, return_exceptions=True)
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"{url}: failed ({result!r})")
        else:
            print(result)


if __name__ == "__main__":
    asyncio.run(main())

05 并发方案选择
5.1 场景对比
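| 方案 | 适用场景 | 说明 |
|---|---|---|
| threading | IO密集型、并发量不大 | 受GIL限制,CPU密集型任务无法加速 |
| ThreadPoolExecutor | 批量IO任务、调用阻塞式第三方库 | 自动复用线程,接口简洁 |
| multiprocessing | CPU密集型 | 真正的多核并行,但进程创建和通信开销大 |
| ProcessPoolExecutor | 批量CPU密集型计算 | 绕过GIL,参数和返回值必须可pickle |
| asyncio (FastAPI/aiohttp) | 高并发网络IO | 单线程协作式调度,需要使用异步库 |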

5.2 性能对比示例
import asyncio
import multiprocessing
import threading
import time


async def async_task():
    """Await 0.1 s without blocking the event loop."""
    await asyncio.sleep(0.1)
    return "async"


def thread_task():
    """Block the calling thread for 0.1 s."""
    time.sleep(0.1)
    return "thread"


def process_task():
    """Block the calling process for 0.1 s."""
    time.sleep(0.1)
    return "process"


def _start_and_join(workers):
    """Start every Thread/Process in ``workers``, then wait for all of them."""
    for w in workers:
        w.start()
    for w in workers:
        w.join()


async def benchmark_async(n):
    """Return the wall time (s) to run ``n`` async tasks concurrently."""
    start = time.perf_counter()  # monotonic, high-resolution clock
    await asyncio.gather(*[async_task() for _ in range(n)])
    return time.perf_counter() - start


def benchmark_thread(n):
    """Return the wall time (s) to run ``n`` sleeping threads."""
    start = time.perf_counter()
    _start_and_join([threading.Thread(target=thread_task) for _ in range(n)])
    return time.perf_counter() - start


def benchmark_process(n):
    """Return the wall time (s) to run ``n`` sleeping processes, startup included."""
    start = time.perf_counter()
    _start_and_join([multiprocessing.Process(target=process_task) for _ in range(n)])
    return time.perf_counter() - start


if __name__ == "__main__":
    n = 100
    print(f"Async: {asyncio.run(benchmark_async(n)):.2f}s")
    print(f"Thread: {benchmark_thread(n):.2f}s")
    print(f"Process: {benchmark_process(n):.2f}s")

06 小结
本文介绍了Python的并发编程模型:
- 多线程:threading模块、线程同步、ThreadPoolExecutor。
- 多进程:multiprocessing模块、进程通信、ProcessPoolExecutor。
- asyncio:async/await语法、异步任务、异步上下文管理器。
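选择合适的并发方案需要根据具体场景:
- CPU密集型任务:使用多进程(multiprocessing / ProcessPoolExecutor)绕过GIL。
- IO密集型任务:使用多线程(threading / ThreadPoolExecutor)或asyncio。
- 高并发网络服务:优先考虑asyncio(配合FastAPI、aiohttp等异步库)。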
