作为一个经常关注原油价格的人,每天都要多次打开 https://oilprice.com 查看布伦特、WTI 和天然气的价格。次数多了就觉得麻烦——能不能写个脚本,自动在交易时间段内每隔5分钟抓取一次价格,然后推送到我的企业微信群里?
说干就干。经过一番折腾,我终于实现了这个需求。今天就把完整的过程分享出来,希望对你有帮助。
一、最终效果
先看看成品的样子。每天早上8点到11点半、下午2点到6点,脚本会自动运行,每隔5分钟获取一次最新油价,然后通过企业微信机器人发送到群里:
消息内容清晰,包含原油品种、当前价格、数据更新时间,完全满足日常监控需求。二、准备工作
1. 注册 OilPriceAPI 并获取密钥
我们要用的是 oilpriceapi.com 的官方 API,相比网页抓取,这种方式稳定、合规、简单。
2. 创建企业微信群机器人
以手机企业微信为例,打开目标群聊
点击右上角 ··· → 消息推送
⚠️ 安全提醒:Webhook 地址相当于机器人的“钥匙”,切勿上传到公开代码仓库!
3. 安装 Python 依赖
pip install oilpriceapi python-dotenv requests
三、代码实现
步骤1:配置 .env 文件
在项目根目录下创建 .env 文件(注意文件名以点开头),内容如下:
OILPRICE_API_KEY=你的API密钥WECHAT_WEBHOOK_URL=https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的机器人密钥
不要把这个文件提交到 Git,记得在 .gitignore 中加入 .env。
步骤2:完整 Python 脚本
将下面的代码保存为 oilprice_monitor.py:
import osimport timeimport requestsfrom datetime import datetime, time as dt_timefrom dotenv import load_dotenvfrom oilpriceapi import OilPriceAPI# 加载 .env 中的配置load_dotenv()API_KEY = os.getenv("OILPRICE_API_KEY")WECHAT_WEBHOOK_URL = os.getenv("WECHAT_WEBHOOK_URL")if not API_KEY or not WECHAT_WEBHOOK_URL: raise RuntimeError("请在 .env 文件中配置 OILPRICE_API_KEY 和 WECHAT_WEBHOOK_URL")# 初始化油价 API 客户端client = OilPriceAPI(api_key=API_KEY)# 要监控的原油品种commodities = [ ("BRENT_CRUDE_USD", "布伦特原油"), ("WTI_USD", "WTI原油"), ("NATURAL_GAS_USD", "天然气"),]def send_to_wechat(message): """发送消息到企业微信群""" data = {"msgtype": "text", "text": {"content": message}} try: resp = requests.post(WECHAT_WEBHOOK_URL, json=data, timeout=10) result = resp.json() if result.get('errcode') == 0: print(f"[{datetime.now()}] 消息发送成功") else: print(f"发送失败: {result.get('errmsg')}") except Exception as e: print(f"发送异常: {e}")def fetch_and_send(): """获取油价并发送到企业微信""" lines = [] lines.append(f"原油价格行情 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") lines.append("=" * 30) for code, name in commodities: try: price_data = client.prices.get(code) price = price_data.value # API 返回的时间是 UTC,这里直接格式化 time_str = price_data.timestamp.strftime("%Y-%m-%d %H:%M:%S") line = f"{name}: ${price:.2f} / 桶 (更新时间: {time_str})" print(line) lines.append(line) except Exception as e: error_line = f"{name}: 获取失败 - {e}" print(error_line) lines.append(error_line) send_to_wechat("\n".join(lines))def is_trading_time(now: datetime) -> bool: """判断当前时间是否在交易时间段内""" current = now.time() morning = (dt_time(8, 0) <= current <= dt_time(11, 30)) afternoon = (dt_time(14, 0) <= current <= dt_time(18, 0)) return morning or afternoondef main_loop(): print("油价监控已启动") print("时间段: 8:00~11:30, 14:00~18:00 | 间隔: 5分钟") print("按 Ctrl+C 停止\n") while True: now = datetime.now() if is_trading_time(now): fetch_and_send() time.sleep(300) # 5分钟 else: time.sleep(60) # 非交易时段,每分钟检查一次if __name__ == "__main__": try: main_loop() except KeyboardInterrupt: print("\n程序已停止")
四、代码解析(挑重点讲)
1. 时间段控制
is_trading_time() 函数判断当前时间是否在 8:00-11:30 或 14:00-18:00 内。之所以这样设置,是因为原油主要交易时段覆盖了这两个区间(亚洲和欧美重叠时段)。你也可以根据自己的需要修改。
2. 定时循环
主循环 while True 中:
3. 消息格式
企业微信机器人支持纯文本、Markdown、图文等。这里使用了最简单的文本消息,内容清晰即可。
4. 时区处理
OilPriceAPI 返回的时间是 UTC 时间(末尾带 +00:00)。代码中直接格式化为本地字符串显示,如果你需要显示北京时间,可以在 strftime 前手动加上 8 小时,不过对于监控来说,UTC 时间也足够用了。
五、运行与测试
本地运行
在终端执行:
python oilprice_monitor.py
如果当前时间在交易时段内,会立刻看到打印信息,并且企业微信群会收到消息。后台运行(Linux / macOS)
nohup python oilprice_monitor.py > oil.log 2>&1 &
Windows 下作为计划任务
可以使用 pythonw.exe 静默运行,或者用 schedule 库替代 while True 循环,但目前的 sleep 方式最简单。
测试小技巧
如果你想立即测试,可以把 is_trading_time 中的时间范围临时改成包含当前时间的小区间,例如:
morning = (dt_time(0, 0) <= current <= dt_time(23, 59)) # 全天运行
六、常见问题与解决方案
Q1:API 请求失败或超时?
Q2:企业微信收不到消息?
Q3:时间不对?
OilPriceAPI 返回的时间是 UTC,如果你需要北京时间,可以这样转换:
from datetime import timezonebeijing_time = price_data.timestamp.astimezone(timezone(timedelta(hours=8)))
Q4:如何同时监控股票?
本项目聚焦原油,但思路完全通用。你可以用 yfinance、AData、Tushare 等库获取股票数据,用同样的 send_to_wechat 函数推送。我之前的探索中,也写过股票监控的版本,需要的话可以在此基础上扩展。
七、扩展想法
既然已经有了这个“定时获取 + 推送到企业微信”的框架,你还可以用它来监控:
加密货币价格(如 Binance API)
天气预报(如和风天气 API)
汇率(如 ExchangeRate-API)
服务器状态(ping、磁盘使用率等)
企业微信机器人是一个非常实用的通知渠道,配合 Python 的定时任务,可以打造属于自己的信息监控系统。
八、完整项目文件结构
oilprice_monitor/├── .env # 存放 API Key 和 Webhook(不提交 Git)├── oilprice_monitor.py # 主脚本├── .gitignore # 忽略 .env 和 __pycache__└── README.md # 项目说明
九、最后
整个脚本不到 100 行,却解决了一个实际的重复劳动问题。现在每天我打开企业微信,就能看到最新的油价,再也不用频繁刷新网页了。
如果你也想尝试,就按上面的步骤操作吧。遇到问题欢迎留言交流。