并发编程的意义
并发编程能够充分利用 CPU 资源,提升程序的执行效率。在多核时代,掌握并发编程十分重要。下面先看一个串行执行的例子,作为后续对比的基准。
# 串行执行
import time
def task(name, duration=1):
    """Simulate a blocking unit of work and log its start and end.

    Args:
        name: Label included in the printed messages.
        duration: Seconds to sleep between the two messages. Defaults to 1,
            the value that was previously hard-coded.
    """
    print(f"任务 {name} 开始")
    # The sleep stands in for real work so elapsed time can be compared.
    time.sleep(duration)
    print(f"任务 {name} 结束")
# Run the tasks one after another and report the total wall-clock time.
start = time.time()
for label in ("A", "B"):
    task(label)
print(f"串行耗时: {time.time() - start:.2f}秒")
多线程编程
使用threading模块实现多线程。
import threading
def task(name, duration=1):
    """Simulate a blocking unit of work and log its start and end.

    Args:
        name: Label included in the printed messages.
        duration: Seconds to sleep between the two messages. Defaults to 1,
            the value that was previously hard-coded.
    """
    print(f"任务 {name} 开始")
    # The sleep stands in for real work so elapsed time can be compared.
    time.sleep(duration)
    print(f"任务 {name} 结束")
# Run both tasks on separate threads and time the whole batch.
start = time.time()
t1, t2 = (
    threading.Thread(target=task, args=(label,))
    for label in ("A", "B")
)
# Start every thread before joining any, so the tasks overlap.
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()
print(f"多线程耗时: {time.time() - start:.2f}秒")
多进程编程
使用multiprocessing模块实现多进程。
import multiprocessing
def task(name, duration=1):
    """Simulate a blocking unit of work and log its start and end.

    Args:
        name: Label included in the printed messages.
        duration: Seconds to sleep between the two messages. Defaults to 1,
            the value that was previously hard-coded.
    """
    print(f"任务 {name} 开始")
    # The sleep stands in for real work so elapsed time can be compared.
    time.sleep(duration)
    print(f"任务 {name} 结束")
# The guard keeps child processes from re-running this section when the
# start method re-imports the main module (e.g. "spawn").
if __name__ == "__main__":
    start = time.time()
    p1 = multiprocessing.Process(target=task, args=("A",))
    p2 = multiprocessing.Process(target=task, args=("B",))
    # Start both processes before joining either, so they run in parallel.
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"多进程耗时: {time.time() - start:.2f}秒")
线程池与进程池
使用 concurrent.futures 提供的线程池和进程池来复用工作线程/进程,避免反复创建和销毁带来的开销。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def compute(n):
    """Return the sum of squares of the integers in ``range(n)``.

    Args:
        n: Exclusive upper bound; values <= 0 yield 0.

    Returns:
        ``0**2 + 1**2 + ... + (n - 1)**2`` as an int.
    """
    return sum(i * i for i in range(n))
# 线程池
# Submit four identical jobs to a pool of four worker threads, then collect
# the results in submission order.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(compute, 1000000) for _ in range(4)]
    # result() blocks until the corresponding job has finished.
    results = [f.result() for f in futures]
# 进程池
# Worker processes may re-import the main module (e.g. with the "spawn" start
# method), so the pool must only be created under the __main__ guard.
if __name__ == "__main__":
    # Submit four identical jobs to a pool of four worker processes, then
    # collect the results in submission order.
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(compute, 1000000) for _ in range(4)]
        # result() blocks until the corresponding job has finished.
        results = [f.result() for f in futures]
GIL锁问题解析
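理解 Python 的全局解释器锁(GIL):在 CPython 中,同一时刻只有一个线程能执行 Python 字节码,因此多线程无法加速 CPU 密集型任务;但线程在等待 IO 时会释放 GIL,所以多线程对 IO 密集型任务仍然有效。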
# CPU密集型任务 - 多线程效率不高
def cpu_bound(n):
    """Count to ``n`` in a pure-Python loop as a stand-in for CPU-bound work.

    Args:
        n: Number of loop iterations; values <= 0 perform no iterations.

    Returns:
        The number of iterations performed.
    """
    count = 0
    # Kept as an explicit loop on purpose: the point is to burn CPU time
    # executing Python bytecode.
    for _ in range(n):
        count += 1
    return count
# IO密集型任务 - 多线程效果好
def io_bound(url, timeout=10):
    """Fetch ``url`` with an HTTP GET and return the response status code.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests.get`` can block indefinitely.

    Returns:
        The integer HTTP status code of the response.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
    """
    import requests

    response = requests.get(url, timeout=timeout)
    return response.status_code
线程间通信
使用Queue实现线程间数据传递。
from queue import Queue
import threading
def producer(q, delay=0.5):
    """Put the integers 0 through 4 on ``q``, logging each one.

    Args:
        q: Queue-like object exposing ``put``.
        delay: Seconds to pause after each item. Defaults to 0.5, the value
            that was previously hard-coded.
    """
    for i in range(5):
        q.put(i)
        print(f"生产: {i}")
        time.sleep(delay)
def consumer(q):
    """Take items from ``q`` and log them until the item 4 is received.

    Args:
        q: Queue-like object exposing a blocking ``get``.
    """
    while True:
        # Blocks until an item is available.
        item = q.get()
        print(f"消费: {item}")
        # 4 is treated as the final item; stop once it has been handled.
        if item == 4:
            break
# Share one queue between a producer thread and a consumer thread.
q = Queue()
t1, t2 = (
    threading.Thread(target=role, args=(q,))
    for role in (producer, consumer)
)
# Producer thread first, then consumer thread.
for thread in (t1, t2):
    thread.start()
进程间通信
使用Pipe或Queue实现进程间通信。
from multiprocessing import Process, Pipe
def sender(conn):
    """Send a fixed greeting over ``conn`` and then close it.

    Args:
        conn: Connection object exposing ``send`` and ``close``.
    """
    conn.send("Hello from sender")
    conn.close()
def receiver(conn):
    """Receive one message from ``conn``, print it, and close the connection.

    Args:
        conn: Connection object exposing ``recv`` and ``close``.
    """
    # Blocks until a message arrives.
    msg = conn.recv()
    print(f"收到消息: {msg}")
    conn.close()
# Child processes may re-import the main module (e.g. with the "spawn" start
# method), so processes must only be created under the __main__ guard.
if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    # One end of the pipe goes to each process.
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    # Wait for both children so the message is printed before moving on.
    p1.join()
    p2.join()
实战案例:并行数据处理
用多进程处理CPU密集型任务。
import numpy as np
def process_chunk(chunk):
    """Return the element-wise square of the square root of ``chunk``.

    Args:
        chunk: Array-like of numbers accepted by ``np.sqrt``.

    Returns:
        ``np.sqrt(chunk) ** 2``, computed element-wise.
    """
    return np.sqrt(chunk) ** 2
def parallel_process(data, num_processes=4):
    """Apply ``process_chunk`` to ``data`` in parallel across processes.

    The input is split into ``num_processes`` chunks with ``np.array_split``,
    each chunk is handled by a worker of a ``ProcessPoolExecutor``, and the
    per-chunk results are concatenated in their original order.

    Args:
        data: Array-like input to split and process.
        num_processes: Number of chunks and of worker processes.

    Returns:
        The concatenated array of processed chunks.
    """
    chunks = np.array_split(data, num_processes)
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        # executor.map yields results in input order, so concatenating them
        # restores the original element order.
        results = list(executor.map(process_chunk, chunks))
    return np.concatenate(results)
# parallel_process starts worker processes, which may re-import the main
# module (e.g. with the "spawn" start method); run it only under the guard.
if __name__ == "__main__":
    data = np.random.rand(1000000)
    result = parallel_process(data)
对比总结