import threadingimport timeimport queue# 创建一个蒸笼(最多放5个包子)basket = queue.Queue(maxsize=5)# 生产者:做包子的师傅def producer(name): count = 1 while True: print(f"{name} 开始做第 {count} 个包子...") time.sleep(1) # 做包子需要时间 # 如果蒸笼满了,这里会等待 basket.put(f"包子{count}") print(f"{name} 做好了第 {count} 个包子,蒸笼里还有 {basket.qsize()} 个") count += 1# 消费者:吃包子的顾客def consumer(name): while True: time.sleep(1.5) # 顾客消化时间 # 如果蒸笼空了,这里会等待 baozi = basket.get() print(f"{name} 吃了 {baozi},蒸笼里还剩 {basket.qsize()} 个") # 告诉队列这个包子处理完了 basket.task_done()# 创建角色并开始if __name__ == "__main__": # 2个师傅做包子 threading.Thread(target=producer, args=("张师傅",), daemon=True).start() threading.Thread(target=producer, args=("李师傅",), daemon=True).start() # 3个顾客吃包子 threading.Thread(target=consumer, args=("小王",), daemon=True).start() threading.Thread(target=consumer, args=("小赵",), daemon=True).start() threading.Thread(target=consumer, args=("小孙",), daemon=True).start() # 让程序运行一会 time.sleep(10) print("\n包子铺打烊了!")
import threadingimport queueimport timeimport random# 消息队列task_queue = queue.Queue(maxsize=10)result_queue = queue.Queue()# 生产者:模拟下载图片def downloader(name): for i in range(1, 6): print(f"{name} 开始下载图片{i}...") time.sleep(random.uniform(0.5, 1.5)) # 模拟下载时间 # 下载完成,放入任务队列 task_queue.put(f"图片{i}_数据") print(f"{name} 下载完成图片{i},待处理队列长度:{task_queue.qsize()}")# 消费者:模拟处理图片def processor(name): while True: # 从队列获取任务 image_data = task_queue.get() print(f"{name} 开始处理 {image_data}...") time.sleep(random.uniform(1, 2)) # 模拟处理时间 # 处理完成 result = f"{image_data}_已处理" result_queue.put(result) print(f"{name} 处理完成,结果:{result}") # 标记任务完成 task_queue.task_done()# 主程序if __name__ == "__main__": print("=== 图片处理系统开始工作 ===") # 启动2个下载者(生产者) for i in range(2): t = threading.Thread(target=downloader, args=(f"下载器{i+1}",)) t.start() # 启动3个处理器(消费者) for i in range(3): t = threading.Thread(target=processor, args=(f"处理器{i+1}",), daemon=True) t.start() # 等待所有下载任务完成 time.sleep(8) # 等待所有任务被处理 task_queue.join() print("\n=== 所有任务处理完成 ===") print(f"处理结果数量:{result_queue.qsize()}")