1. 创始时间与作者
2. 官方资源
3. 核心功能
4. 应用场景
1. 生产者-消费者模式
import queueimport threadingimport time# 创建队列q = queue.Queue(maxsize=10)def producer():"""生产者:生成数据放入队列"""for i in range(10):item = f"产品-{i}"q.put(item)print(f"生产: {item}")time.sleep(0.1)def consumer():"""消费者:从队列取出数据处理"""while True:item = q.get()if item is None: # 终止信号breakprint(f"消费: {item}")q.task_done()# 创建并启动线程producer_thread = threading.Thread(target=producer)consumer_thread = threading.Thread(target=consumer)producer_thread.start()consumer_thread.start()# 等待生产者完成producer_thread.join()# 发送终止信号q.put(None)consumer_thread.join()2. 线程池任务分发
import queueimport threadingimport concurrent.futuresclass ThreadPool:def __init__(self, num_workers):self.task_queue = queue.Queue()self.workers = []self._create_workers(num_workers)def _create_workers(self, num_workers):for _ in range(num_workers):worker = threading.Thread(target=self._worker)worker.daemon = Trueworker.start()self.workers.append(worker)def _worker(self):while True:task, args, kwargs = self.task_queue.get()try:task(*args, **kwargs)except Exception as e:print(f"任务执行失败: {e}")finally:self.task_queue.task_done()def submit(self, task, *args, **kwargs):self.task_queue.put((task, args, kwargs))def wait_completion(self):self.task_queue.join()# 使用示例pool = ThreadPool(4)def process_data(data):print(f"处理数据: {data}")for i in range(20):pool.submit(process_data, i)pool.wait_completion()3. 优先级任务调度
import queueimport threadingimport timeclass Task:def __init__(self, priority, name):self.priority = priorityself.name = namedef __lt__(self, other):return self.priority<other.prioritydef __repr__(self):return f"Task({self.priority}, {self.name})"# 创建优先级队列pq = queue.PriorityQueue()# 添加任务(优先级数字越小优先级越高)pq.put(Task(3, "低优先级任务"))pq.put(Task(1, "高优先级任务"))pq.put(Task(2, "中优先级任务"))# 处理任务while not pq.empty():task = pq.get()print(f"处理: {task}")pq.task_done()4. Web请求队列
import queue
import threading
import requests


class RequestQueue:
    """Concurrent HTTP fetcher with a cap on in-flight requests."""

    def __init__(self, max_concurrent=5):
        self.queue = queue.Queue()
        # Caps the number of simultaneous HTTP requests across workers.
        self.semaphore = threading.Semaphore(max_concurrent)
        self.results = []
        self.lock = threading.Lock()

    def add_request(self, url, params=None):
        """Queue a URL (plus optional query params) for fetching."""
        self.queue.put((url, params))

    def worker(self):
        while True:
            try:
                url, params = self.queue.get(timeout=1)
            except queue.Empty:
                break
            if url is None:
                # Shutdown sentinel from process(); never pass it to requests.
                self.queue.task_done()
                break
            with self.semaphore:
                try:
                    response = requests.get(url, params=params, timeout=5)
                    with self.lock:
                        self.results.append({
                            'url': url,
                            'status': response.status_code,
                            'data': response.json() if response.status_code == 200 else None,
                        })
                except Exception as e:
                    print(f"请求失败 {url}: {e}")
            self.queue.task_done()

    def process(self, num_workers=10):
        """Run workers until the queue drains; return collected results."""
        workers = []
        for _ in range(num_workers):
            worker = threading.Thread(target=self.worker)
            worker.start()
            workers.append(worker)
        # Wait until every queued request has been handled.
        self.queue.join()
        # Unblock any workers still waiting on get().
        for _ in range(num_workers):
            self.queue.put((None, None))
        for worker in workers:
            worker.join()
        return self.results


# Usage example
rq = RequestQueue(max_concurrent=3)
urls = [
    "https://api.github.com/users/octocat",
    "https://api.github.com/users/torvalds",
    "https://api.github.com/users/guido",
]
for url in urls:
    rq.add_request(url)
results = rq.process()
print(f"完成 {len(results)} 个请求")
5. 底层逻辑与技术原理
核心架构
关键技术
线程同步机制:
# Core synchronization internals of queue.Queue (simplified CPython source)
class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)  # subclass hook that creates the storage
        # Mutex protecting every queue operation.
        self.mutex = threading.Lock()
        # Condition: signalled when the queue becomes non-empty.
        self.not_empty = threading.Condition(self.mutex)
        # Condition: signalled when the queue becomes non-full.
        self.not_full = threading.Condition(self.mutex)
        # Condition: signalled when all outstanding tasks are done.
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0
阻塞与超时机制:
def put(self, item, block=True, timeout=None):with self.not_full:if self.maxsize>0:if not block:if self._qsize() >= self.maxsize:raise Fullelif timeout is None:while self._qsize() >= self.maxsize:self.not_full.wait()elif timeout<0:raise ValueError("'timeout' must be a non-negative number")else:endtime = time() +timeoutwhile self._qsize() >= self.maxsize:remaining = endtime-time()if remaining<= 0.0:raise Fullself.not_full.wait(remaining)self._put(item)self.unfinished_tasks += 1self.not_empty.notify()任务完成跟踪:
def task_done(self):with self.all_tasks_done:unfinished = self.unfinished_tasks-1if unfinished<= 0:if unfinished<0:raise ValueError('task_done() called too many times')self.all_tasks_done.notify_all()self.unfinished_tasks = unfinisheddef join(self):with self.all_tasks_done:while self.unfinished_tasks:self.all_tasks_done.wait()不同队列类型的实现:
# FIFO queue — backed by a deque
class Queue:
    def _init(self, maxsize):
        self.queue = deque()


# LIFO queue — backed by a list
class LifoQueue(Queue):
    def _init(self, maxsize):
        self.queue = list()


# Priority queue — backed by a heapq heap
class PriorityQueue(Queue):
    def _init(self, maxsize):
        self.queue = []

    def _put(self, item):
        heapq.heappush(self.queue, item)

    def _get(self):
        return heapq.heappop(self.queue)
6. 安装与配置
基础安装
# queue is part of the Python standard library — nothing to install.
# In Python 3.x you simply import it.

# Check the Python version:
python --version

# Python 3.6+ ships the complete queue module.
导入方式
# Python 3.x import
import queue

# Python 2.x fallback (compatibility shim)
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2
模块结构
# Main classes and exceptions exported by the queue module
from queue import (
    Queue,          # FIFO queue (first in, first out)
    LifoQueue,      # LIFO queue (last in, first out — a stack)
    PriorityQueue,  # priority queue
    SimpleQueue,    # simple queue (Python 3.7+)
    Empty,          # raised by get() on an empty queue
    Full,           # raised by put() on a full queue
)
环境要求
| 组件 | 最低要求 | 推荐配置 |
|---|---|---|
| Python 版本 | 3.0+ | 3.7+ |
| 操作系统 | 任意支持 Python 的系统 | Linux/macOS/Windows |
| 内存 | 无特殊要求 | 根据队列大小调整 |
| 线程支持 | 需要线程支持 | 标准 threading 模块 |
7. 性能指标
队列操作时间复杂度
| 操作 | Queue | LifoQueue | PriorityQueue | SimpleQueue |
|---|---|---|---|---|
| put() | O(1) | O(1) | O(log n) | O(1) |
| get() | O(1) | O(1) | O(log n) | O(1) |
| qsize() | O(1) | O(1) | O(1) | O(1) |
| empty() | O(1) | O(1) | O(1) | O(1) |
并发性能测试
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def benchmark_queue(queue_class, num_producers=3, num_consumers=3, num_items=10000):
    """Benchmark: compare throughput of the different queue classes.

    Returns a dict with the class name, elapsed wall-clock time and
    operations per second. num_items * num_producers should divide
    evenly by num_consumers so every item gets consumed.
    """
    q = queue_class()

    def producer(pid):
        for i in range(num_items):
            q.put((pid, i))

    def consumer(cid):
        # Each consumer takes its fair share of the produced items.
        count = 0
        while count < num_items * num_producers // num_consumers:
            q.get()
            q.task_done()
            count += 1

    start = time.time()

    # Start producer threads.
    producers = []
    for i in range(num_producers):
        t = threading.Thread(target=producer, args=(i,))
        t.start()
        producers.append(t)

    # Start consumer threads.
    consumers = []
    for i in range(num_consumers):
        t = threading.Thread(target=consumer, args=(i,))
        t.start()
        consumers.append(t)

    # Wait for producers, then for every item to be marked done.
    for t in producers:
        t.join()
    q.join()

    end = time.time()
    return {
        'queue_type': queue_class.__name__,
        'time': end - start,
        'throughput': (num_items * num_producers) / (end - start),
    }


# Compare the queue flavours.
queues_to_test = [queue.Queue, queue.LifoQueue, queue.PriorityQueue]
for q_class in queues_to_test:
    result = benchmark_queue(q_class, num_items=5000)
    print(f"{result['queue_type']}: {result['time']:.2f}s, "
          f"吞吐量: {result['throughput']:.0f} 操作/秒")
8. 高级功能使用
1. 自定义队列
import queue
import heapq
from datetime import datetime


class TimedPriorityQueue(queue.PriorityQueue):
    """Priority queue whose entries carry a timestamp.

    Items are (priority, timestamp, data) tuples. A monotonically
    increasing counter breaks ties so equal-priority items stay FIFO
    and the heap never has to compare the data payloads.
    """

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        # BUG FIX: the tie-breaking counter was used but never initialised.
        self._counter = 0

    def _put(self, item):
        # item: (priority, timestamp, data)
        entry = [item[0], item[1], item[2], self._counter]
        heapq.heappush(self.queue, entry)
        self._counter += 1

    def get_oldest(self, max_age_seconds=3600):
        """Remove and return all items older than max_age_seconds."""
        now = datetime.now().timestamp()
        old_items = []
        with self.mutex:
            # Snapshot so we can mutate self.queue while scanning.
            temp_queue = list(self.queue)
            for entry in temp_queue:
                priority, timestamp, data, count = entry
                if now - timestamp > max_age_seconds:
                    old_items.append((priority, timestamp, data))
                    # Remove the stale entry from the heap...
                    self.queue.remove(entry)
            # ...and restore the heap invariant once at the end.
            heapq.heapify(self.queue)
        return old_items
2. 批量操作队列
class BatchQueue(queue.Queue):
    """Queue supporting atomic batch put/get.

    Relies on queue.Queue internals: not_full / not_empty conditions,
    _put / _get / _qsize, and the unfinished_tasks counter.
    """

    def put_batch(self, items, block=True, timeout=None):
        """Enqueue all of `items` as one atomic operation."""
        with self.not_full:
            if self.maxsize > 0:
                space_needed = len(items)
                if self._qsize() + space_needed > self.maxsize:
                    if not block:
                        raise queue.Full
                    # Wait until the whole batch fits.
                    while self._qsize() + space_needed > self.maxsize:
                        self.not_full.wait()
            for item in items:
                self._put(item)
                self.unfinished_tasks += 1
            self.not_empty.notify_all()

    def get_batch(self, batch_size=10, block=True, timeout=None):
        """Dequeue up to batch_size items, honouring block/timeout.

        Returns the items collected so far when the timeout expires or,
        for block=False, when the queue runs dry.
        """
        items = []
        with self.not_empty:
            if not block and self._qsize() == 0:
                raise queue.Empty
            endtime = None
            if timeout is not None:
                endtime = time.time() + timeout
            # Collect items until batch_size is reached or we time out.
            while len(items) < batch_size:
                if self._qsize() == 0:
                    # BUG FIX: block=False used to fall into an
                    # indefinite wait once the queue drained mid-batch.
                    if not block:
                        break
                    if timeout is None:
                        while self._qsize() == 0:
                            self.not_empty.wait()
                    elif endtime is not None:
                        remaining = endtime - time.time()
                        if remaining <= 0.0:
                            break
                        self.not_empty.wait(remaining)
                    else:
                        break
                if self._qsize() > 0:
                    items.append(self._get())
                    self.not_full.notify()
        # BUG FIX: the original incremented unfinished_tasks here as well,
        # double-counting against put_batch() and making join() hang;
        # only put operations may grow the counter.
        return items
3. 监控队列
import queueimport threadingimport timefrom collections import defaultdictclass MonitoredQueue(queue.Queue):"""可监控的队列,统计使用情况"""def __init__(self, maxsize=0):super().__init__(maxsize)self.stats = defaultdict(int)self.stats_lock = threading.Lock()self.history = []def _update_stats(self, operation):with self.stats_lock:self.stats[operation] += 1self.history.append({'time': time.time(),'operation': operation,'size': self.qsize(),'unfinished': self.unfinished_tasks })# 保持最近1000条记录if len(self.history) >1000:self.history.pop(0)def put(self, item, block=True, timeout=None):super().put(item, block, timeout)self._update_stats('put')def get(self, block=True, timeout=None):item = super().get(block, timeout)self._update_stats('get')return itemdef get_stats(self):with self.stats_lock:return dict(self.stats)def get_queue_health(self):"""获取队列健康状态"""stats = self.get_stats()total_ops = sum(stats.values())if total_ops == 0:return "空闲"put_ratio = stats.get('put', 0) /total_opsget_ratio = stats.get('get', 0) /total_opscurrent_size = self.qsize()max_size = self.maxsize if self.maxsize>0 else float('inf')utilization = current_size/max_sizeifmax_size!= float('inf') else 0return {'total_operations': total_ops,'put_ratio': put_ratio,'get_ratio': get_ratio,'current_size': current_size,'utilization': utilization,'health_status': '正常' if utilization<0.8 else '警告' if utilization<0.95 else '危险' }4. 队列与协程集成
import asyncio
import queue
import threading


class AsyncQueueAdapter:
    """Adapt a synchronous queue.Queue for use from asyncio code.

    Blocking queue calls are dispatched to the default thread-pool
    executor so they never block the event loop. The loop is looked
    up at call time via get_running_loop() — calling the deprecated
    get_event_loop() from a synchronous __init__ is unreliable on
    modern Python.
    """

    def __init__(self, maxsize=0):
        self.sync_queue = queue.Queue(maxsize)

    async def put(self, item):
        """Put an item without blocking the event loop."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.sync_queue.put, item)

    async def get(self):
        """Get an item without blocking the event loop."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.sync_queue.get)

    async def join(self):
        """Poll until every item has been marked done."""
        while self.sync_queue.unfinished_tasks > 0:
            await asyncio.sleep(0.1)

    def task_done(self):
        self.sync_queue.task_done()


# Usage example
async def async_producer(aq, count):
    for i in range(count):
        await aq.put(f"item-{i}")
        await asyncio.sleep(0.01)


async def async_consumer(aq, name):
    while True:
        try:
            item = await asyncio.wait_for(aq.get(), timeout=1.0)
            print(f"{name} 获取: {item}")
            aq.task_done()
        except asyncio.TimeoutError:
            break


async def main():
    aq = AsyncQueueAdapter(maxsize=10)
    # Spawn the producer and consumer tasks.
    producer_task = asyncio.create_task(async_producer(aq, 50))
    consumer_tasks = [
        asyncio.create_task(async_consumer(aq, f"consumer-{i}"))
        for i in range(3)
    ]
    # Wait for the producer, then for every item to be consumed.
    await producer_task
    await aq.join()
    # Stop the consumers and wait for them to exit.
    for task in consumer_tasks:
        task.cancel()
    await asyncio.gather(*consumer_tasks, return_exceptions=True)


# Run the demo.
asyncio.run(main())
9. 与同类工具对比
| 特性 | queue.Queue | multiprocessing.Queue | asyncio.Queue | Redis Queue |
|---|---|---|---|---|
| 进程间通信 | ❌(仅线程) | ✅ | ❌(仅协程) | ✅ |
| 线程安全 | ✅ | ✅ | ✅(在单线程内) | ✅ |
| 网络支持 | ❌ | ✅(通过管道) | ❌ | ✅ |
| 持久化 | ❌ | ❌ | ❌ | ✅ |
| 分布式 | ❌ | ❌ | ❌ | ✅ |
| 最大大小 | 可配置 | 可配置 | 可配置 | 内存/磁盘限制 |
| 性能 | 高 | 中等 | 非常高 | 中等(网络开销) |
| 复杂度 | 低 | 中等 | 低 | 高 |
| 适用场景 | 单进程多线程 | 多进程 | 异步编程 | 分布式系统 |
10. 最佳实践与常见问题
1. 死锁避免
# 错误示例:可能导致死锁q1 = queue.Queue()q2 = queue.Queue()def worker1():item = q1.get() # 等待q1# 处理item...q2.put(result) # 可能阻塞如果q2满q1.task_done()def worker2():item = q2.get() # 等待q2# 处理item...q1.put(result) # 可能阻塞如果q1满q2.task_done()# 正确做法:使用超时和错误处理def safe_worker():try:item = q1.get(timeout=5.0) # 设置超时# 处理item...try:q2.put(result, timeout=2.0)except queue.Full:print("队列已满,处理重试逻辑")# 重试或记录日志q1.task_done()except queue.Empty:print("获取项目超时")2. 队列大小管理
class AdaptiveQueue(queue.Queue):"""自适应队列,根据负载动态调整"""def __init__(self, initial_maxsize=10):super().__init__(maxsize=initial_maxsize)self.adjustment_lock = threading.Lock()self.last_adjustment = time.time()self.load_history = []def monitor_and_adjust(self):"""监控队列负载并调整大小"""with self.adjustment_lock:now = time.time()# 每分钟调整一次if now - self.last_adjustment<60:returncurrent_size = self.qsize()max_size = self.maxsize# 计算负载因子load_factor = current_size/max_size if max_size>0 else 0# 记录负载历史self.load_history.append((now, load_factor))# 保持最近100个记录if len(self.load_history) >100:self.load_history.pop(0)# 动态调整队列大小if load_factor>0.8: # 负载过高new_size = min(max_size*2, 1000) # 最大1000if new_size>max_size:self.maxsize = new_sizeprint(f"队列负载高({load_factor:.2f}),增加大小到{new_size}")elif load_factor<0.2 and max_size>10: # 负载过低new_size = max(max_size//2, 10) # 最小10if new_size<max_size:self.maxsize = new_sizeprint(f"队列负载低({load_factor:.2f}),减少大小到{new_size}")self.last_adjustment = now3. 优雅关闭
import signalclass GracefulQueueManager:"""优雅关闭的队列管理器"""def __init__(self):self.task_queue = queue.Queue()self.workers = []self.shutdown_requested = Falseself.shutdown_lock = threading.Lock()# 注册信号处理器signal.signal(signal.SIGINT, self.signal_handler)signal.signal(signal.SIGTERM, self.signal_handler)def signal_handler(self, signum, frame):"""处理关闭信号"""print(f"收到信号 {signum},正在优雅关闭...")with self.shutdown_lock:self.shutdown_requested = True# 发送终止信号给所有工作线程for _ in self.workers:self.task_queue.put(None)def worker(self, worker_id):"""工作线程"""print(f"工作线程 {worker_id} 启动")while True:# 检查关闭请求with self.shutdown_lock:if self.shutdown_requested:breaktry:task = self.task_queue.get(timeout=1.0)# None 是终止信号if task is None:self.task_queue.task_done()break# 执行任务task()self.task_queue.task_done()except queue.Empty:continueexcept Exception as e:print(f"工作线程 {worker_id} 异常: {e}")print(f"工作线程 {worker_id} 关闭")def start(self, num_workers=4):"""启动工作线程"""for i in range(num_workers):worker_thread = threading.Thread(target=self.worker,args=(i,),daemon=True )worker_thread.start()self.workers.append(worker_thread)def wait_for_completion(self):"""等待所有工作完成"""self.task_queue.join()# 等待所有工作线程结束for worker in self.workers:worker.join(timeout=5.0)4. 性能优化技巧
# 1. Batch processing reduces lock contention.
class BatchProcessor:
    """Collects queued items and processes them in batches."""

    def __init__(self, process_batch_size=100):
        self.queue = queue.Queue()
        self.batch_size = process_batch_size
        self.current_batch = []

    def add_item(self, item):
        self.queue.put(item)

    def process_batch(self):
        """Drain up to batch_size items and process them together."""
        batch = []
        # Grab a batch in one burst to reduce lock round-trips.
        while len(batch) < self.batch_size:
            try:
                item = self.queue.get_nowait()
                batch.append(item)
            except queue.Empty:
                break
        if batch:
            self._process_items_batch(batch)
            # Mark every drained item as done.
            for _ in batch:
                self.queue.task_done()

    def _process_items_batch(self, batch):
        # Batch-processing logic goes here.
        pass


# 2. SimpleQueue is faster (Python 3.7+) but lacks task_done()/join().
from queue import SimpleQueue

sq = SimpleQueue()  # leaner, faster queue implementation


# 3. Avoid unnecessary queue-state checks.
def drain_polling(q, process):
    # Wrong: empty() is only a snapshot — another consumer may take the
    # item between the check and the get() (race condition).
    while not q.empty():
        item = q.get()
        process(item)


def drain_blocking(q, process):
    # Right: rely on a blocking get() with a timeout.
    while True:
        try:
            item = q.get(timeout=1.0)
            process(item)
            q.task_done()
        except queue.Empty:
            break
11. 企业级应用案例
Web服务器请求处理
大数据处理管道
实时交易系统
云计算任务调度
总结
Python queue 模块是多线程编程的核心基础设施,核心价值在于:
线程安全:内置锁机制确保多线程环境下的数据安全
生产者-消费者模式:简化并发编程模型
阻塞与非阻塞操作:灵活的任务调度控制
多种队列类型:满足不同场景需求(FIFO、LIFO、优先级)
技术亮点:
基于条件变量(Condition)的高效同步机制
支持超时和阻塞控制
任务完成跟踪(task_done/join)
简单而强大的API设计
适用场景:
多线程任务调度和分发
生产者-消费者模式实现
线程池和工作队列管理
异步处理缓冲区
任务优先级调度
安装使用:
# Part of the standard library — nothing to install.
import queue

# Basic usage
q = queue.Queue()
q.put("item")        # enqueue
item = q.get()       # dequeue
q.task_done()        # mark the item as processed
q.join()             # wait until all items are processed
学习资源:
官方文档:https://docs.python.org/3/library/queue.html
Python并发编程指南:https://docs.python.org/3/library/concurrency.html
《Python Cookbook》第12章:并发编程
实战教程:Real Python多线程教程
作为Python标准库的一部分,queue模块被广泛应用于各种规模的Python项目中。它遵循 Python软件基金会许可证(PSF License),是所有Python发行版的组成部分。无论是小型脚本还是大型分布式系统,queue模块都提供了可靠、高效的线程安全队列实现。