🐍 多线程 — 并发执行的入门
🕐 预计用时:2-3 小时 | 🎯 目标:掌握 threading 模块、Thread 类、Lock、GIL 概念
📖 今日目录
1. 什么是并发?为什么需要多线程?
并发(Concurrency)就是在同一段时间内处理多件事。Python 提供三种并发方式:多线程(threading)、多进程(multiprocessing)和协程(asyncio)。下面先对比单线程与多线程:
# 单线程:一个一个做
import time
def download(name, seconds):
    """Simulate downloading ``name`` by sleeping for ``seconds`` seconds.

    Prints a start message and a completion message.

    Args:
        name: label of the file, used in the messages.
        seconds: how long to sleep, in seconds.

    Returns:
        ``name`` unchanged.
    """
    print(f"⬇️ 开始下载 {name}...")
    time.sleep(seconds)  # stand-in for the time a real download would take
    print(f"✅ {name} 下载完成")
    return name
# Serial baseline: each call blocks until the previous download has returned.
start = time.time()
for file_label in ("文件A", "文件B", "文件C"):
    download(file_label, 2)
elapsed = time.time() - start
print(f"总耗时: {elapsed:.1f}秒")
# Expected: about 6.0 s in total (three 2-second downloads back to back)
# 多线程:同时做
import threading
import time
def download(name, seconds):
    """Simulate downloading ``name`` by sleeping for ``seconds`` seconds.

    Prints a start message and a completion message; returns nothing.

    Args:
        name: label of the file, used in the messages.
        seconds: how long to sleep, in seconds.
    """
    print(f"⬇️ 开始下载 {name}...")
    time.sleep(seconds)  # stand-in for the time a real download would take
    print(f"✅ {name} 下载完成")
start = time.time()
# One thread per file, each running download(<label>, 2)
t1, t2, t3 = (
    threading.Thread(target=download, args=(file_label, 2))
    for file_label in ("文件A", "文件B", "文件C")
)
# start() launches the thread and returns immediately
for thread in (t1, t2, t3):
    thread.start()
# join() blocks until that thread has finished
for thread in (t1, t2, t3):
    thread.join()
print(f"总耗时: {time.time() - start:.1f}秒")
# Expected: about 2.0 s, because the three sleeps overlap
2. threading 模块基础
import threading
# Name of the thread that is executing this line
print(threading.current_thread().name)  # MainThread
# Number of Thread objects currently alive
print(threading.active_count())  # 1 when only the main thread is running
# Walk every live thread and show its name and daemon flag
for t in threading.enumerate():
    print(f" {t.name} (daemon={t.daemon})")
两种创建线程的方式
import threading
# Option 1: pass the function to run as ``target``
def worker():
    """Print the name of the thread that is executing this function."""
    print(f"Worker 线程: {threading.current_thread().name}")


t = threading.Thread(target=worker)
t.start()
t.join()  # block until worker() has returned
# Option 2: subclass Thread and override run()
class MyThread(threading.Thread):
    """Thread that prints a start message, sleeps, then prints an end message.

    Args:
        name: thread name, forwarded to ``threading.Thread``.
        delay: seconds to sleep inside ``run``.
    """

    def __init__(self, name, delay):
        super().__init__(name=name)
        self.delay = delay

    def run(self):
        """Body executed in the new thread once start() is called."""
        import time  # local import keeps this snippet self-contained

        print(f"{self.name} 开始")
        time.sleep(self.delay)
        print(f"{self.name} 结束")
# Run the custom thread with a one-second delay and wait for it to finish
t = MyThread(name="自定义线程", delay=1)
t.start()
t.join()
3. Thread 类 — 创建线程
import threading
import time
class DownloadThread(threading.Thread):
    """Thread that simulates downloading one file and stores a result string.

    Args:
        url: source URL, used only in the start message.
        filename: name of the file, used in the messages and the result.

    After the thread has run, ``result`` holds ``"<filename> 已保存"``;
    before that it is ``None``.
    """

    def __init__(self, url, filename):
        super().__init__()
        self.url = url
        self.filename = filename
        self.result = None  # filled in by run()

    def run(self):
        """Simulate the download with a 2-second sleep and record the result."""
        print(f"⬇️ {self.filename} 开始下载 from {self.url}")
        time.sleep(2)  # stand-in for the real download
        self.result = f"{self.filename} 已保存"
        print(f"✅ {self.filename} 下载完成")
# Create and start one DownloadThread per (url, filename) pair
threads = []
urls = [
    ("https://example.com/a.jpg", "a.jpg"),
    ("https://example.com/b.jpg", "b.jpg"),
    ("https://example.com/c.jpg", "c.jpg"),
]
start = time.time()
for url, filename in urls:
    t = DownloadThread(url, filename)
    t.start()
    threads.append(t)
# Wait for every thread; result is read only after join() has returned
for t in threads:
    t.join()
    print(f" 结果: {t.result}")
print(f"总耗时: {time.time() - start:.1f}秒")
4. 线程参数与守护线程
线程参数
import threading
def worker(name, delay):
    """Print a start message, sleep ``delay`` seconds, then print an end message.

    Args:
        name: label used in both messages.
        delay: seconds to sleep between the two messages.
    """
    import time  # local import keeps this snippet self-contained

    print(f"{name} 开始")
    time.sleep(delay)
    print(f"{name} 结束")
# ``args``: positional arguments for the target, given as a tuple
t1 = threading.Thread(args=("线程A", 1), target=worker)
# ``kwargs``: keyword arguments for the target, given as a dict
t2 = threading.Thread(kwargs=dict(name="线程B", delay=2), target=worker)
# ``name``: the thread's own name, separate from worker's ``name`` argument
t3 = threading.Thread(name="MyThread", target=worker, args=("线程C", 1))
守护线程(Daemon Thread)
import threading
import time
def background_task():
    """Print a heartbeat message once per second, forever.

    Never returns on its own; it is meant to run in a daemon thread.
    """
    while True:
        print("💓 心跳检测...")
        time.sleep(1)
# Mark the thread as a daemon; the flag has to be set before start()
daemon = threading.Thread(target=background_task)
daemon.daemon = True
daemon.start()
# Let the main thread run for 3 seconds, then finish
time.sleep(3)
print("主线程结束")
# The daemon thread is terminated when the program exits
⚠️ 守护线程 vs 非守护线程:
• 非守护线程(默认):主线程会等它结束
• 守护线程(daemon=True):不会阻止程序退出;当所有非守护线程(包括主线程)都结束后,守护线程会被直接终止
• 适合做后台任务:心跳检测、日志刷新、定时清理
5. 线程同步 — Lock 互斥锁
多个线程同时修改共享数据会出问题——需要锁来保护。
import threading
# Without a lock: a race condition on the shared counter
counter = 0


def increment_no_lock(n):
    """Add 1 to the global ``counter`` ``n`` times with no synchronization.

    Deliberately unsafe: it exists to demonstrate lost updates when several
    threads run it at once.
    """
    global counter
    for _ in range(n):
        # Read-modify-write, not atomic: concurrent threads can lose updates
        counter += 1
# Ten threads, each incrementing the shared counter 100000 times
threads = []
for _ in range(10):
    t = threading.Thread(target=increment_no_lock, args=(100000,))
    t.start()
    threads.append(t)
for t in threads:
    t.join()
print(f"期望: 1000000, 实际: {counter}")
# Sample output: 期望: 1000000, 实际: 938274 (differs from run to run; updates are lost)
# NOTE(review): how often updates are lost depends on the interpreter version;
# a run that happens to print 1000000 does not make the code thread-safe.
import threading
# With a lock: only one thread at a time may update the counter
counter = 0
lock = threading.Lock()


def increment_with_lock(n):
    """Add 1 to the global ``counter`` ``n`` times, using explicit acquire/release."""
    global counter
    for _ in range(n):
        lock.acquire()
        try:
            counter += 1
        finally:
            # Always release, even if the protected code raises
            lock.release()


# Preferred spelling: the ``with`` statement
def increment_with_lock_v2(n):
    """Add 1 to the global ``counter`` ``n`` times, holding ``lock`` via ``with``."""
    global counter
    for _ in range(n):
        with lock:  # acquires on entry, releases on exit
            counter += 1
# Ten threads, each incrementing the shared counter 100000 times under the lock
threads = []
for _ in range(10):
    t = threading.Thread(target=increment_with_lock_v2, args=(100000,))
    t.start()
    threads.append(t)
for t in threads:
    t.join()
print(f"期望: 1000000, 实际: {counter}")
# Expected output: 期望: 1000000, 实际: 1000000
💡 Lock 使用模式:
with lock: 比 lock.acquire() / lock.release() 更安全——即使中间出异常也会释放锁。
6. RLock — 可重入锁
import threading
# A plain Lock deadlocks when the thread that holds it tries to take it again
lock = threading.Lock()


def outer():
    """Take ``lock`` and call inner() while still holding it (this deadlocks)."""
    with lock:
        print("outer 获取锁")
        inner()  # inner() tries to take the same lock again


def inner():
    """Take ``lock`` and print a message."""
    with lock:  # blocks forever if the caller already holds ``lock``
        print("inner 获取锁")


# outer()  # deadlock: the program would hang here
# An RLock (re-entrant lock) may be taken again by the thread that holds it
rlock = threading.RLock()


def outer():
    """Take ``rlock`` and call inner() while still holding it."""
    with rlock:
        print("outer 获取锁")
        inner()  # the same thread may acquire the RLock again


def inner():
    """Take ``rlock`` and print a message."""
    with rlock:
        print("inner 获取锁")


outer()  # runs to completion
⚠️ Lock vs RLock:
• Lock:不可重入,已持有锁的线程再次获取同一把锁会死锁
• RLock:同一线程可以多次获取(计数+1),需要释放同样次数
• 简单场景用 Lock,函数可能递归调用时用 RLock
7. Condition — 线程通信
import threading
import time
import random
# Producer-consumer model built on a shared list and one Condition
buffer = []
MAX_SIZE = 5
condition = threading.Condition()


def producer(name):
    """Append five items to ``buffer``, waiting whenever it is full.

    Args:
        name: label used in the messages and in the item names.
    """
    for i in range(5):
        with condition:
            # Re-check in a loop: the buffer may be full again after waking up
            while len(buffer) >= MAX_SIZE:
                print(f"📦 {name}: 缓冲区满,等待...")
                condition.wait()  # releases the lock until notified
            item = f"{name}-产品{i}"
            buffer.append(item)
            print(f"📦 {name}: 生产 {item} | 缓冲区: {len(buffer)}")
            condition.notify_all()  # wake any waiting consumers
        # Sleep outside the ``with`` so the lock is not held while idle
        time.sleep(random.uniform(0.1, 0.3))


def consumer(name):
    """Remove five items from the front of ``buffer``, waiting whenever it is empty.

    Args:
        name: label used in the messages.
    """
    for _ in range(5):
        with condition:
            # Re-check in a loop: another consumer may have emptied the buffer
            while len(buffer) == 0:
                print(f"🛒 {name}: 缓冲区空,等待...")
                condition.wait()  # releases the lock until notified
            item = buffer.pop(0)
            print(f"🛒 {name}: 消费 {item} | 缓冲区: {len(buffer)}")
            condition.notify_all()  # wake any waiting producers
        # Sleep outside the ``with`` so the lock is not held while idle
        time.sleep(random.uniform(0.1, 0.3))
# Two producers and one consumer share the buffer
t1 = threading.Thread(target=producer, args=("生产者A",))
t2 = threading.Thread(target=producer, args=("生产者B",))
t3 = threading.Thread(target=consumer, args=("消费者X",))
for thread in (t1, t2, t3):
    thread.start()
for thread in (t1, t2, t3):
    thread.join()
8. 线程池 ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def download(url):
    """Simulate downloading ``url`` with a 1-second sleep.

    Returns:
        The string ``"<url> 下载完成"``.
    """
    time.sleep(1)  # stand-in for the real download
    return f"{url} 下载完成"
urls = [f"https://example.com/file{i}.jpg" for i in range(10)]
# Option 1: submit() each task, then handle results in completion order
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to its URL
    futures = {executor.submit(download, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]  # which URL this future belongs to
        result = future.result()
        print(f"✅ {result}")
print(f"耗时: {time.time() - start:.1f}秒")
# 10 tasks on 4 threads take about 3 seconds rather than 10
# Option 2: map() yields results in the same order as the inputs
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(download, urls)
    for result in results:
        print(f" {result}")
# Option 3: attach a completion callback to each future
def on_complete(future):
    """Print the result of a finished future.

    Args:
        future: a completed ``concurrent.futures.Future``; ``result()`` re-raises
            if the task raised.
    """
    print(f"🔔 完成: {future.result()}")
with ThreadPoolExecutor(max_workers=4) as executor:
    for url in urls:
        future = executor.submit(download, url)
        # on_complete(future) is called once this task has finished
        future.add_done_callback(on_complete)
💡 ThreadPoolExecutor 优势:
1. 自动管理线程生命周期
2. 限制最大线程数(防止资源耗尽)
3. 支持回调、异常处理
4. 比手动创建 Thread 更推荐
9. GIL — 全局解释器锁
GIL(Global Interpreter Lock)是 CPython 的一个互斥锁,同一时刻只允许一个线程执行 Python 字节码。
# GIL 的影响:CPU 密集型任务多线程反而更慢!
import threading
import time
def cpu_work(n):
    """Return the sum of ``i * i`` for ``i`` in ``range(n)``.

    Written as an explicit loop on purpose: it serves as a CPU-bound workload
    for the timing comparison.
    """
    total = 0
    for i in range(n):
        total += i * i
    return total
# Single thread: run the workload twice, back to back
start = time.time()
for _ in range(2):
    cpu_work(10_000_000)
print(f"单线程: {time.time() - start:.2f}秒")
# Two threads: the same two workloads, one per thread
start = time.time()
t1, t2 = (
    threading.Thread(target=cpu_work, args=(10_000_000,)) for _ in range(2)
)
for thread in (t1, t2):
    thread.start()
for thread in (t1, t2):
    thread.join()
print(f"多线程: {time.time() - start:.2f}秒")
# Sample output:
#   单线程: 1.20秒
#   多线程: 1.35秒  (slower, not faster: thread switching adds overhead)
⚠️ GIL 的结论:
• I/O 密集型(网络、文件)→ 多线程有效 ✅
• CPU 密集型(计算)→ 用多进程(Day37)✅
• GIL 是解释器的实现细节:CPython 和 PyPy 都有 GIL,Jython、IronPython 没有;CPython 3.13 起还提供可选的无 GIL(free-threaded)构建
10. 实战:多线程下载器
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
import os
class MultiDownloader:
    """Download a batch of URLs concurrently on a thread pool (download is simulated).

    Attributes:
        max_workers: size of the thread pool used by batch_download().
        results: one dict per successful download, accumulated across calls.
        lock: guards appends to ``results`` made from worker threads.
    """

    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self.results = []
        self.lock = threading.Lock()

    def download(self, url, save_dir="/tmp"):
        """Simulate downloading one file and record it in ``results``.

        Args:
            url: source URL; the file name is the last path component,
                or ``"index.html"`` when the path has none.
            save_dir: directory a real download would save into.

        Returns:
            A summary string ``"✅ <filename> (<size>B)"``.
        """
        filename = os.path.basename(urlparse(url).path) or "index.html"
        # Target path for a real download; the simulation does not write to it
        save_path = os.path.join(save_dir, filename)
        # Simulated download (a real one would use requests.get)
        time.sleep(1)
        size = 1024 * 10  # pretend the file is 10 KB
        with self.lock:
            self.results.append({
                "url": url,
                "filename": filename,
                "size": size,
                "status": "success"
            })
        return f"✅ {filename} ({size}B)"

    def batch_download(self, urls):
        """Download every URL in ``urls`` on the pool and print a summary.

        Args:
            urls: sized collection of URL strings.

        Returns:
            ``self.results``, the cumulative list of successful downloads,
            including those from earlier calls.
        """
        print(f"🚀 开始下载 {len(urls)} 个文件({self.max_workers} 线程)")
        start = time.time()
        # Count successes of THIS call only; self.results also holds earlier
        # batches, so its length would overstate the figure on reuse.
        succeeded = 0
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.download, url): url for url in urls}
            for future in as_completed(futures):
                try:
                    result = future.result()
                except Exception as e:
                    # Report the failure and carry on with the remaining files
                    url = futures[future]
                    print(f" ❌ {url}: {e}")
                else:
                    succeeded += 1
                    print(f" {result}")
        elapsed = time.time() - start
        print(f"\n📊 完成: {succeeded}/{len(urls)} | 耗时: {elapsed:.1f}秒")
        return self.results
# Example run: ten simulated PDF downloads on four worker threads
urls = [f"https://example.com/files/document_{i}.pdf" for i in range(10)]
downloader = MultiDownloader(4)
downloader.batch_download(urls)
11. 今日小结
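| 概念 | 用法 |
|---|---|
| 创建线程 | Thread(target=func) |
| 启动线程 | t.start() |
| 等待线程结束 | t.join() |
| 互斥锁 Lock | with lock: |
| 可重入锁 RLock | with rlock: |
| 条件变量 Condition | wait() / notify_all() |
| 线程池 | executor.submit(func) |
| GIL | I/O 密集型用多线程,CPU 密集型用多进程 |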
🎯 练习建议:
1. 用线程池并发请求 10 个网页,统计响应时间
2. 实现一个线程安全的队列(生产者-消费者)
3. 对比单线程和多线程下载同一组文件的速度差异
📚 Day36 完成!明天学习多进程 — 真正的并行计算