from apscheduler.schedulers.blocking import BlockingSchedulerfrom apscheduler.triggers.cron import CronTriggerimport pywencaiimport requestsfrom datetime import datetimefrom chinese_calendar import is_workday# 飞书机器人配置# 获取方式:飞书群设置 -> 群机器人 -> 添加机器人 -> 自定义机器人FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxx" # 请替换为你的飞书Webhook地址def get_auction_data(): """获取竞价排名前10的股票数据""" try: query = "非ST,竞价涨停" # 改成自己的策略 # 注意:pywencai 返回的数据可能包含停牌或无数据项,需视情况处理 df = pywencai.get(query=query, sort_key='竞价涨停封单金额', sort_order='desc') return df[['股票代码', '股票简称']].head(10) except Exception as e: print(f"数据获取失败: {str(e)}") return Nonedef send_feishu_msg(content): """发送消息到飞书""" headers = {"Content-Type": "application/json"} # 飞书卡片消息格式 data = { "msg_type": "interactive", "card": { "config": { "wide_screen_mode": True }, "header": { "title": { "tag": "plain_text", "content": "🕘 股票监控" }, "template": "blue" }, "elements": [ { "tag": "markdown", "content": content } ] } } try: response = requests.post(FEISHU_WEBHOOK, json=data, headers=headers) print(f"消息发送状态: {response.status_code}, 返回信息: {response.text}") except Exception as e: print(f"发送消息异常: {str(e)}")def job(): """定时任务主逻辑""" # 检查是否为工作日(注释掉则每天都发) # if not is_workday(datetime.now()): # return data = get_auction_data() if data is not None: markdown_content = "" for _, row in data.iterrows(): # 格式:序号. **股票名称** (股票代码) markdown_content += f"**{row['股票简称']}** ({row['股票代码']})\n" # 打印日志调试 print("准备发送数据:\n", markdown_content) # 发送消息 send_feishu_msg(markdown_content)if __name__ == "__main__": scheduler = BlockingScheduler(timezone="Asia/Shanghai") scheduler.add_job( job, CronTrigger(day_of_week='mon-fri', hour=19, minute=19) ) print("定时任务启动,等待执行...") try: scheduler.start() except (KeyboardInterrupt, SystemExit): scheduler.shutdown()