关注+星标,每天学习Python新技能
来源:网络
simplepyq介绍
simplepyq 是一个轻量级任务队列库,专为需要后台任务的小型 Python 项目而设计执行,而无需使用 Celery、Airflow 或 Redis 等繁重的工具。它使用 SQLite 完成任务 持久性,确保任务在应用程序重启后继续存在,并提供任务通道、自动 重试和 Dynamic Task Deferral 的 Rets作。凭借最小的依赖项,simplepyq 易于设置,非常适合 需要简单、可靠的任务队列的应用程序。
特性
- • 通道:通过将任务与特定功能相关联来组织任务,从而实现分组任务处理。
- • 持久性:将任务存储在 SQLite 数据库中,以确保它们在应用程序重新启动或崩溃期间不会丢失。
- • 重试:自动重试失败的任务指定次数,从而提高瞬态错误的弹性。
- • DelayException:将任务动态延迟指定的持续时间,从而允许根据运行时条件进行灵活调度。
- • 简单的设置:最少的依赖项和简单的 API,只需要 Python 和 msgpack。
- • 任务管理:用于清除失败任务、重新排队或删除整个通道的工具,从而提供对任务生命周期的控制。
概念
Channelssimplepyq 中的通道允许按任务的用途或关联功能对任务进行分组。每个频道都已链接 添加到处理任务的特定 Python 函数中,可以为每个 渠道。这对于分隔不同类型的任务非常有用,例如用于发送电子邮件的 “email” 和 “image_processing” 用于处理图像上传,确保有序和并行的任务执行。
持久化任务存储在 SQLite 数据库中,该数据库提供轻量级持久性,而无需外部系统。 每个任务都与其通道、参数、状态(待处理、正在运行、延迟、完成或失败)、重试和 可选的延迟时间戳。这可确保在应用程序重新启动时任务不会丢失,从而使 simplepyq 可靠 用于长时间运行的作。
重试当任务引发异常时,simplepyq 可以根据指定的重试计数自动重试它。这是 特别适用于处理暂时性故障。如果重试次数已用尽,则任务将被标记为 “失败” 稍后检查或重新排队。
DelayException 异常DelayException 允许通过引发具有指定延迟(以秒为单位)的异常来动态延迟任务。 这对于速率受限的 API 等场景非常有用,其中任务需要在重试之前等待,或者用于计划任务 以稍后运行。任务被标记为 “delayed” ,并在延迟期到期时自动重新排队。
任务管理SimplePyQ 提供了有效管理任务的方法: - clear_failed:从数据库中删除失败的任务。 - requeue_failed:使用原始或新的重试计数对失败的任务重新排队。 - remove_channel:删除频道及其所有任务。 - 停止和run_until_complete:控制调度程序的执行,在后台运行任务或直到所有任务完成。
安装
通过 pip 安装 simplepyq:
pip install simplepyq
使用示例
以下是演示 simplepyq 每个功能的示例,旨在突出其在实际场景中的功能。
1. 使用通道进行基本任务队列
将任务组织到一个频道中进行 Web 抓取,在后台处理 URL。
from simplepyq import SimplePyQdef scrape_url(args): url = args["url"] print(f"Scraping {url}")scheduler = SimplePyQ("tasks.db")scheduler.add_channel("scrape", scrape_url, max_workers=2) # Two workers for parallel scrapingscheduler.enqueue("scrape", {"url": "https://example.com"})scheduler.enqueue("scrape", {"url": "https://example.org"})scheduler.start() # Runs in the background# Tasks are processed concurrently by two worker threads
说明:使用 Scrape 通道创建时具有处理 URL 的函数,并且两个工作程序允许并行执行。任务与参数一起排队并异步处理。
2. 任务重试以实现弹性
使用自动重试处理暂时性故障,例如网络问题。
from simplepyq import SimplePyQimport requestsdef fetch_data(args): url = args["url"] response = requests.get(url) if response.status_code != 200: raise Exception("Failed to fetch data") print(f"Fetched data from {url}")scheduler = SimplePyQ("tasks.db")scheduler.add_channel("fetch", fetch_data)scheduler.enqueue("fetch", {"url": "https://api.example.com/data"}, retries=3) # Retry up to 3 timesscheduler.run_until_complete() # Runs until all tasks are complete
说明:如果 API 调用失败,则任务在标记为失败之前最多重试 3 次,以确保对临时问题的弹性。
3.带有 DelayException 的动态任务延迟
动态延迟任务,对于速率受限的 API 非常有用。
from simplepyq import SimplePyQ, DelayExceptiondef call_api(args): url = args["url"] response = requests.get(url) if response.status_code == 403: raise DelayException(60) # Wait 60 seconds before retrying print(f"Calling {url}")scheduler = SimplePyQ("tasks.db")scheduler.add_channel("api", call_api)scheduler.enqueue("api", {"url": "https://api.example.com/rate_limit"})scheduler.start() # Task will be deferred for 60 seconds if rate-limited
说明:DelayException 将任务延迟 60 秒,以允许遵守速率限制或安排稍后重试。
4. 清除失败的任务
删除失败的任务以清理数据库。
from simplepyq import SimplePyQdef risky_task(args): raise Exception("Task failed intentionally")scheduler = SimplePyQ("tasks.db")scheduler.add_channel("risky", risky_task)scheduler.enqueue("risky", {"data": "test"}, retries=1)scheduler.run_until_complete() # Task fails after one retryscheduler.clear_failed("risky") # Remove failed tasks for the 'risky' channel
解释:任务失败且重试次数用尽后,clear_failed会将其从数据库中删除,以保持其干净。
5. 对失败的任务进行重新排队
将失败的任务重新排队以进行另一次尝试。
from simplepyq import SimplePyQattempts = 0def flaky_task(args): global attempts if attempts < 2: # Fail on first attempt attempts += 1 raise Exception("Temporary failure") print("Task succeeded")scheduler = SimplePyQ("tasks.db")scheduler.add_channel("flaky", flaky_task)scheduler.enqueue("flaky", {}, retries=0)scheduler.run_until_complete() # Task failsscheduler.requeue_failed("flaky", retries=1) # Requeue with one retryscheduler.run_until_complete() # Task succeeds on second attempt
6. 删除通道
不再需要时删除频道及其任务。
from simplepyq import SimplePyQdef temp_task(args): print(f"Processing {args['data']}")scheduler = SimplePyQ("tasks.db")scheduler.add_channel("temp", temp_task)scheduler.enqueue("temp", {"data": "test"})scheduler.run_until_complete()scheduler.remove_channel("temp") # Removes channel and all its tasks
说明:临时通道及其任务已删除,可用于在不再需要任务类型时进行清理。
7. 运行直到完成
from simplepyq import SimplePyQdef process_data(args): print(f"Processing {args['data']}")scheduler = SimplePyQ("tasks.db")scheduler.add_channel("data", process_data)scheduler.enqueue("data", {"data": "item1"})scheduler.enqueue("data", {"data": "item2"})scheduler.run_until_complete() # Blocks until all tasks are done
说明:run_until_complete 处理所有任务并停止调度程序,非常适合脚本或批处理。
以上就是simplepyq的全部介绍了,简单容易上手,可以用在不需要太复杂的项目里。