Python 初学者也能看懂的 APScheduler 入门指南
从零理解定时任务:会写函数,就能学会 APScheduler技术博客/ Python 入门 / 阅读 12 min当你写 Python 程序时,很快会遇到这样的问题:
这些需求都属于“定时任务”。如果你只是简单地 while True 加 time.sleep(),刚开始能跑,但代码很快会变得不好维护。比如:怎么按每天 9 点执行?怎么暂停任务?怎么处理任务异常?程序重启后任务还在不在?
| APScheduler 的全称是 Advanced Python Scheduler,它是 Python 里非常常用的定时任务框架。它可以帮你用更清晰、更专业的方式管理定时任务。 |
| 本文面向 Python 初学者,不要求你熟悉调度系统,只需要会写基本的 Python 函数。 |
APScheduler 主要负责一件事:在指定的时间规则下,自动执行你写好的 Python 函数。
01def send_report():
02 print("发送日报")
APScheduler 会在后台帮你盯着时间,到点就调用这个函数。
01pip install APScheduler
01from apscheduler.schedulers.blocking import BlockingScheduler
01from datetime import datetime
02from apscheduler.schedulers.blocking import BlockingScheduler
03
04
05def print_time():
06 print("当前时间:", datetime.now())
07
08
09scheduler = BlockingScheduler()
10
11scheduler.add_job(print_time, "interval", seconds=3)
12
13print("定时任务已启动")
14scheduler.start()
01定时任务已启动
02当前时间: 2026-06-23 10:00:03
03当前时间: 2026-06-23 10:00:06
04当前时间: 2026-06-23 10:00:09
| ✔ | ✔ |
| ✔ | ✔ |
| ✔ | ✔scheduler.start() 表示启动调度器。 |
刚开始学 APScheduler,最容易被各种名词绕晕。其实先记住下面几个就够了。
初学阶段最常接触的是前三个:Scheduler、Job、Trigger。
dateintervalcron:按类似 Linux crontab 的规则执行。
01from datetime import datetime, timedelta
02from apscheduler.schedulers.blocking import BlockingScheduler
03
04
05def once_job():
06 print("这个任务只执行一次")
07
08
09scheduler = BlockingScheduler()
10
11run_time = datetime.now() + timedelta(seconds=10)
12scheduler.add_job(once_job, "date", run_date=run_time)
13
14print("任务将在 10 秒后执行")
15scheduler.start()
这个任务会在程序启动 10 秒后执行一次,执行完就不会再重复执行。
interval 适合“每隔多久执行一次”的场景。
01from apscheduler.schedulers.blocking import BlockingScheduler
02
03
04def sync_data():
05 print("同步数据中...")
06
07
08scheduler = BlockingScheduler()
09
10scheduler.add_job(sync_data, "interval", minutes=5)
11
12scheduler.start()
| |
weeks | |
days | |
hours | |
minutes | |
seconds | |
cron 适合“每天几点”“每周几”“每月几号”这种更像日历的规则。
01from apscheduler.schedulers.blocking import BlockingScheduler
02
03
04def morning_job():
05 print("早上 9 点任务执行")
06
07
08scheduler = BlockingScheduler()
09
10scheduler.add_job(morning_job, "cron", hour=9, minute=0)
11
12scheduler.start()
01scheduler.add_job(
02 morning_job,
03 "cron",
04 day_of_week="mon-fri",
05 hour=18,
06 minute=30,
07)
| | |
year | | 2026 |
month | | 1-12 |
day | | 1-31 |
day_of_week | | mon-fri |
hour | | 0-23 |
minute | | 0-59 |
second | | 0-59 |
| / BlockingScheduler 和 BackgroundScheduler 的区别 |
01from apscheduler.schedulers.blocking import BlockingScheduler
02from apscheduler.schedulers.background import BackgroundScheduler
BlockingScheduler 会阻塞当前程序。
后面的代码就不会继续往下走了,程序会一直停在那里运行定时任务。
01from apscheduler.schedulers.blocking import BlockingScheduler
02
03
04def job():
05 print("任务执行")
06
07
08scheduler = BlockingScheduler()
09scheduler.add_job(job, "interval", seconds=5)
10scheduler.start()
BackgroundScheduler 会在后台线程中运行,不会阻塞主程序。
它适合和 Web 服务、桌面程序或其他长期运行的程序一起使用。
01import time
02from apscheduler.schedulers.background import BackgroundScheduler
03
04
05def job():
06 print("后台任务执行")
07
08
09scheduler = BackgroundScheduler()
10scheduler.add_job(job, "interval", seconds=5)
11scheduler.start()
12
13print("主程序继续运行")
14
15while True:
16 time.sleep(1)
如果你写的是一个单独的定时任务脚本,优先使用 BlockingScheduler。
如果你要把定时任务放进已有程序里,通常使用 BackgroundScheduler。
如果你的函数需要参数,可以通过 args 或 kwargs 传入。
01from apscheduler.schedulers.blocking import BlockingScheduler
02
03
04def send_message(username, content):
05 print(f"给 {username} 发送消息:{content}")
06
07
08scheduler = BlockingScheduler()
09
10scheduler.add_job(
11 send_message,
12 "interval",
13 seconds=10,
14 args=["小明", "记得提交日报"],
15)
16
17scheduler.start()
01scheduler.add_job(
02 send_message,
03 "interval",
04 seconds=10,
05 kwargs={
06 "username": "小明",
07 "content": "记得提交日报",
08 },
09)
建议给重要任务设置 id,这样以后可以方便地查询、删除、暂停或恢复任务。
01scheduler.add_job(
02 send_message,
03 "interval",
04 seconds=10,
05 id="send_message_job",
06 args=["小明", "记得提交日报"],
07)
如果你重复添加同一个 id 的任务,APScheduler 默认会报错。开发时常用 replace_existing=True,表示如果任务已存在,就替换它:
01scheduler.add_job(
02 send_message,
03 "interval",
04 seconds=10,
05 id="send_message_job",
06 args=["小明", "记得提交日报"],
07 replace_existing=True,
08)
01scheduler.pause_job("send_message_job")
01scheduler.resume_job("send_message_job")
01scheduler.remove_job("send_message_job")
01jobs = scheduler.get_jobs()
02
03for job in jobs:
04 print(job.id, job.next_run_time)
如果任务函数内部报错,APScheduler 不会直接让整个调度器崩溃,但你应该自己处理异常,方便排查问题。
01def sync_data():
02 try:
03 print("开始同步数据")
04 # 这里写真正的同步逻辑
05 result = 1 / 0
06 except Exception as e:
07 print("同步数据失败:", e)
在真实项目中,更推荐使用 logging 记录日志:
01import logging
02
03logging.basicConfig(level=logging.INFO)
04logger = logging.getLogger(__name__)
05
06
07def sync_data():
08 try:
09 logger.info("开始同步数据")
10 # 这里写真正的同步逻辑
11 except Exception:
12 logger.exception("同步数据失败")
logger.exception() 会自动记录异常堆栈,比单纯 print() 更适合排查问题。
默认情况下,APScheduler 把任务保存在内存里。
如果你希望任务重启后仍然存在,就需要使用 JobStore,比如把任务保存到数据库。
01from apscheduler.schedulers.blocking import BlockingScheduler
02from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
03
04
05def job():
06 print("持久化任务执行")
07
08
09jobstores = {
10 "default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
11}
12
13scheduler = BlockingScheduler(jobstores=jobstores)
14
15scheduler.add_job(
16 job,
17 "interval",
18 seconds=10,
19 id="persistent_job",
20 replace_existing=True,
21)
22
23scheduler.start()
这样任务信息会保存到 jobs.sqlite 文件中。
需要注意:使用持久化任务时,任务函数必须是可以被 Python 正常导入的函数。不要使用匿名函数 lambda,也不要使用临时定义的内部函数。
假设一个任务每 10 秒执行一次,但某次执行花了 30 秒,会发生什么?
如果不加限制,可能出现多个同类任务同时执行,甚至越积越多。APScheduler 提供了几个参数帮助你控制这种情况。
01scheduler.add_job(
02 sync_data,
03 "interval",
04 seconds=10,
05 id="sync_data_job",
06 max_instances=1,
07 coalesce=True,
08 misfire_grace_time=30,
09)
| |
max_instances=1 | |
coalesce=True | |
misfire_grace_time=30 | 如果任务错过执行时间不超过 30 秒,仍然允许执行 |
初学时可以先记住这套组合:如果任务不应该并发执行,就设置 max_instances=1。
如果你的任务涉及“每天几点执行”,一定要注意时区。
01from apscheduler.schedulers.blocking import BlockingScheduler
02
03
04scheduler = BlockingScheduler(timezone="Asia/Shanghai")
01scheduler.add_job(
02 job,
03 "cron",
04 hour=9,
05 minute=0,
06)
如果你在 Flask 项目里想启动一个后台任务,可以使用 BackgroundScheduler。
01from flask import Flask
02from apscheduler.schedulers.background import BackgroundScheduler
03
04
05app = Flask(__name__)
06
07
08def check_status():
09 print("检查系统状态")
10
11
12scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
13scheduler.add_job(
14 check_status,
15 "interval",
16 seconds=30,
17 id="check_status_job",
18 replace_existing=True,
19)
20scheduler.start()
21
22
23@app.route("/")
24def index():
25 return "Hello Flask"
26
27
28if __name__ == "__main__":
29 app.run(debug=False)
这里要注意:开发环境如果开启 Flask 的 debug=True,自动重载可能导致任务被启动两次。初学阶段可以先设置 debug=False,等理解清楚后再处理重载问题。
如果你使用的是 BackgroundScheduler,它不会阻塞主线程。
01from apscheduler.schedulers.background import BackgroundScheduler
02
03
04def job():
05 print("任务执行")
06
07
08scheduler = BackgroundScheduler()
09scheduler.add_job(job, "interval", seconds=5)
10scheduler.start()
01import time
02
03while True:
04 time.sleep(1)
如果你只是写独立脚本,更简单的方式是使用 BlockingScheduler。
常见原因是程序启动了两份,或者 Web 框架的开发模式开启了自动重载。
例如 Flask 的 debug=True,可能会导致代码被加载两次。解决时可以先关闭 debug,或者确保调度器只在主进程中启动。
01def my_job():
02 print("任务执行")
01scheduler.add_job(lambda: print("任务执行"), "interval", seconds=5)
APScheduler 适合单机程序里的定时任务。如果你需要分布式任务队列、任务重试、任务结果追踪、大量异步任务处理,通常要考虑 Celery、RQ、Arq 等任务队列工具。
如果你只是想在 Python 程序里每隔一段时间执行某个函数,APScheduler 是很合适的选择。
01import logging
02from datetime import datetime
03from apscheduler.schedulers.blocking import BlockingScheduler
04
05
06logging.basicConfig(
07 level=logging.INFO,
08 format="%(asctime)s [%(levelname)s] %(message)s",
09)
10logger = logging.getLogger(__name__)
11
12
13def sync_data():
14 try:
15 logger.info("开始同步数据")
16 logger.info("当前时间:%s", datetime.now())
17 # 在这里编写真实的数据同步逻辑
18 logger.info("数据同步完成")
19 except Exception:
20 logger.exception("数据同步失败")
21
22
23def clean_logs():
24 logger.info("开始清理日志")
25 # 在这里编写真实的日志清理逻辑
26 logger.info("日志清理完成")
27
28
29def main():
30 scheduler = BlockingScheduler(timezone="Asia/Shanghai")
31
32 scheduler.add_job(
33 sync_data,
34 "interval",
35 minutes=10,
36 id="sync_data_job",
37 max_instances=1,
38 coalesce=True,
39 replace_existing=True,
40 )
41
42 scheduler.add_job(
43 clean_logs,
44 "cron",
45 hour=2,
46 minute=0,
47 id="clean_logs_job",
48 max_instances=1,
49 coalesce=True,
50 replace_existing=True,
51 )
52
53 logger.info("调度器启动")
54 scheduler.start()
55
56
57if __name__ == "__main__":
58 main()
✔timezone="Asia/Shanghai":使用中国时区。 | ✔ |
✔max_instances=1:避免同一个任务并发执行。 | ✔coalesce=True:避免错过任务后大量补跑。 |
✔replace_existing=True:避免重复添加同名任务时报错。 | |
| APScheduler 的核心并不复杂:写好函数,设定时间规则,让调度器负责按时调用。 |
APScheduler 是 Python 中非常实用的定时任务框架。对于初学者来说,可以先掌握下面这些内容:
✔ 用 pip install APScheduler 安装。 | ✔ 用 BlockingScheduler 写独立定时任务脚本。 |
✔ 用 BackgroundScheduler 把任务放到已有程序后台运行。 | ✔ |
| ✔ | ✔ |
| ✔ | ✔ 使用 max_instances、coalesce 避免任务堆积。 |
| ✔ | |
如果你刚开始学习 APScheduler,不需要一次性理解所有高级概念。最好的学习方式是先写一个每隔几秒打印一次内容的小脚本,然后慢慢加入 cron、任务 id、日志、异常处理和持久化。
定时任务看起来简单,但在真实项目里非常常见。掌握 APScheduler 后,你就可以更优雅地处理数据同步、日志清理、状态检查、定时通知等后台工作。