导语:你的 Python 应用还在同步处理耗时任务吗?用户上传一个视频要等转码完成才能看到响应?本文教你用 Huey + Redis 搭建高并发异步任务队列,3 分钟上手,轻松应对百万级任务调度!
一、为什么选择 Huey?
Huey 优势:轻量级、API 简洁、支持定时任务、优先级调度,一个小文件就能跑起来!
二、项目架构一览
huey_demo/├── config.py # 配置中心├── tasks.py # 任务定义├── client.py # 任务调用└── run.sh # 一键启动
三、核心代码实战
1. 配置文件 config.py
# -*- coding: utf-8 -*-from huey import PriorityRedisHuey# 创建 Huey 实例(支持优先级)huey = PriorityRedisHuey( name='demo_app', host='localhost', port=6379, db=0, results=True, # 启用结果存储 utc=True,)TASK_CONFIG = {'max_retries': 3, # 最大重试次数'retry_delay': 60, # 重试间隔}
💡 小贴士:PriorityRedisHuey 支持任务优先级,高优先级任务会优先执行!
2. 任务定义 tasks.py
2.1 基础异步任务
from config import hueyimport time@huey.task()defadd(a, b):"""基础任务:两数相加"""print(f"[Task] 执行加法: {a} + {b}") time.sleep(1) # 模拟耗时return a + b
2.2 重试机制
@huey.task(retries=3, retry_delay=10)defdivide(a, b):"""带重试的除法任务"""print(f"[Task] 执行除法: {a} / {b}")if b == 0:raise ValueError("除数不能为0")return a / b
2.3 延迟任务(定时发送邮件)
from datetime import datetime, timedelta@huey.task()defsend_email(to_email, subject, content):"""发送邮件"""print(f"[Task] 发送邮件到: {to_email}") time.sleep(2)return {'status': 'sent','to': to_email,'sent_at': datetime.now().isoformat() }# 使用方式:# 延迟 10 秒执行send_email.schedule( args=('user@example.com', 'Hello', '内容'), delay=10)# 指定时间执行eta = datetime.now() + timedelta(minutes=5)send_email.schedule(args=..., eta=eta)
2.4 定时任务(类似 Cron)
from huey import crontab@huey.periodic_task(crontab(minute='*/5'))defhealth_check():"""每5分钟执行一次健康检查"""print(f"[Periodic] 健康检查 - {datetime.now()}")return {'status': 'healthy'}@huey.periodic_task(crontab(hour='2', minute='0'))defdaily_report():"""每天凌晨2点生成日报"""print(f"[Periodic] 生成日报")
2.5 优先级任务
@huey.task(priority=10) # 高优先级defhigh_priority_task(name):"""优先执行"""print(f"[High] 执行: {name}")returnf"Task '{name}' done"@huey.task(priority=1) # 低优先级deflow_priority_task(name):"""延后执行"""print(f"[Low] 执行: {name}")
2.6 链式任务(Pipeline)
@huey.task()defstep1_extract_data(source):"""步骤1:提取数据"""return {'source': source, 'data': [1, 2, 3]}@huey.task()defstep2_transform_data(data):"""步骤2:转换数据""" data['transformed'] = [x * 2for x in data['data']]return data@huey.task()defstep3_load_data(data):"""步骤3:加载数据"""return {'status': 'completed', 'result': data}# 创建执行管道pipeline = ( step1_extract_data.s('database') .then(step2_transform_data) .then(step3_load_data))result = huey.enqueue(pipeline)
3. 客户端调用 client.py
from tasks import add, send_email, dividefrom config import hueyimport time# 1. 基础调用result = add(10, 20)print(f"任务ID: {result.id}")value = result.get(blocking=True, timeout=10) # 阻塞等待结果print(f"结果: {value}") # 30# 2. 异步执行(不等待)result = send_email('user@test.com', '主题', '内容')print("邮件已入队,立即返回")# 3. 批量提交urls = ['url1', 'url2', 'url3']results = [fetch_url(url) for url in urls]# 等待所有完成for r in results:try: data = r.get(blocking=True, timeout=15)print(f"成功: {data}")except Exception as e:print(f"失败: {e}")# 4. 撤销任务delayed = send_email.schedule(args=..., delay=60)huey.revoke_by_id(delayed.id) # 撤销未执行的任务
四、启动运行
一键启动脚本 run.sh
#!/bin/bashcase"${1:-help}"in worker)# 启动 Worker(2个线程) huey_consumer.py tasks.huey -w 2 -k thread -v ;; client) python client.py ;; status)# 查看队列状态 redis-cli LLEN huey redis-cli ZCARD huey.schedule ;;esac
启动步骤
# 1. 启动 Redisdocker run -d -p 6379:6379 redis:7-alpine# 2. 启动 Worker./run.sh worker# 3. 运行客户端(另开终端)./run.sh client
五、运行效果展示
==================================================Huey 任务队列演示==================================================1. 基础任务 (add 10 + 20)... 任务ID: fa499959-9a21-40ba-a442-bb712a05de55 结果: 30 ✓2. 延迟任务 (3秒后发送邮件)... 任务ID: 06fb2750-4f15-4d39-9d25-e0c61570b2ea 3秒后准时执行 ✓3. 优先级任务队列... 高优先级任务先执行 ✓4. 错误重试机制... divide(10, 0) 触发重试,60秒后重试 ✓5. 定时任务... health_check 每5分钟自动执行 ✓
六、高级应用场景
6.1 图片处理流水线
@huey.task()defimage_pipeline(image_id): pipeline = ( download_image.s(image_id) .then(generate_thumbnail) .then(apply_watermark) .then(upload_to_cdn) )return huey.enqueue(pipeline)
6.2 Webhook 异步处理
@huey.task(retries=3, retry_delay=30)defprocess_webhook(payload, signature):"""异步处理支付回调"""ifnot verify_signature(payload, signature):raise ValueError("签名无效")if payload['event'] == 'order.paid': send_email.schedule(args=(payload['email'], ...), delay=5) update_inventory.delay(payload['items'])
6.3 数据同步
@huey.periodic_task(crontab(hour='2', minute='0'))defsync_to_warehouse():"""每天凌晨2点同步数据仓库""" tables = ['users', 'orders', 'products']for table in tables: sync_data_to_warehouse(table)
七、生产环境建议
1. Worker 类型选择
| | |
|---|
| | -k process -w 4 |
| | -k thread -w 8 |
| | -k greenlet -w 16 |
2. 监控命令
# 查看挂起任务redis-cli LRANGE huey 0 -1# 查看定时任务redis-cli ZRANGE huey.schedule 0 -1# 清空队列redis-cli FLUSHDB
3. Docker 部署
# docker-compose.ymlversion:'3.8'services:redis:image:redis:7-alpineports:-"6379:6379"worker:build:.command:huey_consumer.pytasks.huey-w4-kprocessdepends_on:-redis
八、总结
Huey 是一个轻量但功能强大的 Python 任务队列框架:
✅ 极简 API:几行代码就能实现异步任务 ✅ 功能全面:支持延迟任务、定时任务、优先级、重试 ✅ 易于监控:内置信号系统,可追踪任务生命周期 ✅ 生产就绪:支持多进程/多线程/协程多种 Worker 模式
适用场景:
九、获取完整代码
git clone https://gitee.com/michah/huey-demo.gitcd huey-demopip install -r requirements.txt./run.sh demo
项目地址:https://gitee.com/michah/huey-demo [1]
📢 觉得有用? 点击右上角「...」分享到朋友圈,让更多开发者看到!
💬 有问题? 欢迎在评论区留言交流!
🔥 关注我,获取更多 Python 高并发实战技巧!
引用链接
[1]: https://gitee.com/michah/huey-demo