

数据是个宝,研究少烦恼
挖的深,看的远,找规律
做笔记,练盘感,多总结

★★★★★博文原创不易,我的博文不需要打赏,也不需要知识付费,可以白嫖学习小技巧。喜欢的老铁可以点赞+收藏分享+置顶,小红牛在此表示感谢。★★★★★
量化教程: 教你快速上车,通达信量化平台(TdxQuant太强了,真香啊)
量化教程:通达信量化接口函数TdxQuant+Tkinter综合示例演示(带你速成速学)
量化教程:大盘指数月线回测系统1.0(TdxQuant+pandas+Tkinter)
大盘指数月线回测系统1.0是最开始的写法。目前将源码中get_datas 和 get_test_datas 定义为静态方法(使用 @staticmethod 装饰器),并移入 MonthlyBacktestApp 类内部。及get_test_datas(回测逻辑函数)的性能和代码简洁度,可优化之处如下。
方案一:使用 groupby().apply() + Series 返回(推荐):代码量减少约40%,逻辑集中,利用 groupby 内置优化,避免手动循环,易于扩展或修改统计指标。返回数据类型设置为字符串dtype='string'。
def get_test_datas(start, end, stock, stock_zdf, stock_zf):df = get_datas(start, end, stock)df['月份'] = df.index.month # 直接获取月份数字datas = df.dropna()def monthly_stats(sub_df):total = len(sub_df)zdf = sub_df['zdf']zf = sub_df['zf']cnt_up_th = (zdf >= stock_zdf).sum()cnt_down_th = (zdf <= -stock_zdf).sum()cnt_zf_up = (zf >= stock_zf).sum()cnt_zf_down = (zf <= -stock_zf).sum()cnt_up = (zdf > 0).sum()cnt_down = (zdf < 0).sum()return pd.Series({f'涨幅{stock_zdf}%次数': cnt_up_th,f'跌幅{-stock_zdf}%次数': cnt_down_th,'涨幅净次数': cnt_up_th - cnt_down_th,f'涨幅{stock_zdf}%胜率': round(100 * cnt_up_th / total, 2) if total else 0,f'振幅{stock_zf}%次数': cnt_zf_up,f'振幅{-stock_zf}%次数': cnt_zf_down,'振幅净次数': cnt_zf_up - cnt_zf_down,f'振幅{stock_zf}%胜率': round(100 * cnt_zf_up / total, 2) if total else 0,'上涨次数': cnt_up,'下跌次数': cnt_down,'胜率': round(100 * cnt_up / total, 2) if total else 0,'平均涨幅': round(zdf.mean(), 2),'最大跌幅': zdf.min(),'最大涨幅': zdf.max()}, dtype='string')result = datas.groupby('月份').apply(monthly_stats)result.index.name = '月份'return result
方案二:使用 agg + 自定义聚合函数(更函数式):优点,完全向量化,性能最好;缺点,列名和聚合函数映射写在一起略显臃肿。
def get_test_datas(start, end, stock, stock_zdf, stock_zf):df = get_datas(start, end, stock)df['月份'] = df.index.monthdatas = df.dropna()# 定义聚合函数字典agg_funcs = {f'涨幅{stock_zdf}%次数': ('zdf', lambda x: (x >= stock_zdf).sum()),f'跌幅{-stock_zdf}%次数': ('zdf', lambda x: (x <= -stock_zdf).sum()),'涨幅净次数': ('zdf', lambda x: (x >= stock_zdf).sum() - (x <= -stock_zdf).sum()),f'涨幅{stock_zdf}%胜率': ('zdf', lambda x: round(100 * (x >= stock_zdf).sum() / len(x), 2) if len(x) else 0),f'振幅{stock_zf}%次数': ('zf', lambda x: (x >= stock_zf).sum()),f'振幅{-stock_zf}%次数': ('zf', lambda x: (x <= -stock_zf).sum()),'振幅净次数': ('zf', lambda x: (x >= stock_zf).sum() - (x <= -stock_zf).sum()),f'振幅{stock_zf}%胜率': ('zf', lambda x: round(100 * (x >= stock_zf).sum() / len(x), 2) if len(x) else 0),'上涨次数': ('zdf', lambda x: (x > 0).sum()),'下跌次数': ('zdf', lambda x: (x < 0).sum()),'胜率': ('zdf', lambda x: round(100 * (x > 0).sum() / len(x), 2) if len(x) else 0),'平均涨幅': ('zdf', 'mean'),'最大跌幅': ('zdf', 'min'),'最大涨幅': ('zdf', 'max')}# 分组聚合result = datas.groupby('月份').agg(**{name: (col, func) for name, (col, func) in agg_funcs.items()})result = result.round(2).astype('string') # 统一保留两位小数result.index.name = '月份'return result
方案三:微调原代码(最小改动):用 list 收集字典再构造 DataFrame,避免预分配和多次 .loc,直接用 .sum() 代替 .shape[0],移除不必要的 columns 列表。
def get_test_datas(start, end, stock, stock_zdf, stock_zf):df = get_datas(start, end, stock)df['月份'] = df.index.month # 改进1:直接获取月份datas = df.dropna()# 预先获取所有月份,保证顺序months = sorted(datas['月份'].unique())records = []for month, group in datas.groupby('月份'):total = len(group)zdf = group['zdf']zf = group['zf']cnt_red = (zdf >= stock_zdf).sum()cnt_green = (zdf <= -stock_zdf).sum()cnt_zf_red = (zf >= stock_zf).sum()cnt_zf_green = (zf <= -stock_zf).sum()up = (zdf > 0).sum()down = (zdf < 0).sum()records.append({'月份': month,f'涨幅{stock_zdf}%次数': cnt_red,f'跌幅{-stock_zdf}%次数': cnt_green,'涨幅净次数': cnt_red - cnt_green,f'涨幅{stock_zdf}%胜率': round(100 * cnt_red / total, 2) if total else 0,f'振幅{stock_zf}%次数': cnt_zf_red,f'振幅{-stock_zf}%次数': cnt_zf_green,'振幅净次数': cnt_zf_red - cnt_zf_green,f'振幅{stock_zf}%胜率': round(100 * cnt_zf_red / total, 2) if total else 0,'上涨次数': up,'下跌次数': down,'胜率': round(100 * up / total, 2) if total else 0,'平均涨幅': round(zdf.mean(), 2),'最大跌幅': zdf.min(),'最大涨幅': zdf.max()})result = pd.DataFrame(records).set_index('月份').astype('string')# print(result)return result

最终推荐方案一:它在简洁性、可读性和性能之间取得了最佳平衡,且便于后续增加新的统计指标(只需在 Series 字典中添加键值对)。如果需要更高的执行效率(数据量极大时),可选择方案二;如果希望保持原有结构但小幅优化,可选择方案三。
方案1完整源码如下
# -*- coding: utf-8 -*-# @Author : 小红牛# 微信公众号:gxzfp88import tkinter as tkfrom tkinter import ttk, messageboximport pandas as pdimport numpy as npimport threadingfrom datetime import datetimefrom tqcenter import tqclass MonthlyBacktestApp:def __init__(self, root):self.root = rootself.root.title("大盘指数月线回测系统 优化版——小红牛微信公众号:gxzfp88")self.root.state('zoomed')self.root.resizable(True, True)self.status_var = tk.StringVar()self.status_var.set("正在初始化...")# 排序状态self.sort_column_name = Noneself.sort_reverse = Falseself.display_df = None # 用于排序的 DataFrameself.create_widgets()self.set_default_params()# 尝试连接通达信客户端try:tq.initialize(__file__)self.status_var.set("通达信连接成功")self.load_stock_list()except Exception as e:messagebox.showerror("初始化错误", f"通达信初始化失败:{str(e)}\n请确保通达信客户端已运行。")self.status_var.set("初始化失败")self.run_btn.config(state=tk.DISABLED)self.view_btn.config(state=tk.DISABLED) # 新增:禁用查看按钮def create_widgets(self):"""创建所有界面组件"""# 顶部参数设置区域param_frame = ttk.LabelFrame(self.root, text="设置回测参数(默认为前复权数据)", padding=10)param_frame.pack(fill=tk.X, padx=10, pady=5)# 第一行:股票代码,起始日期,结束日期row1 = ttk.Frame(param_frame)row1.pack(fill=tk.X, pady=2)ttk.Label(row1, text="指数代码:").pack(side=tk.LEFT, padx=5)self.stock_combo = ttk.Combobox(row1, width=25, state='readonly')self.stock_combo.pack(side=tk.LEFT, padx=5)ttk.Label(row1, text="起始日期 (YYYYMMDD):").pack(side=tk.LEFT, padx=5)self.start_entry = ttk.Entry(row1, width=12)self.start_entry.pack(side=tk.LEFT, padx=5)ttk.Label(row1, text="结束日期 (YYYYMMDD):").pack(side=tk.LEFT, padx=5)self.end_entry = ttk.Entry(row1, width=12)self.end_entry.pack(side=tk.LEFT, padx=5)# 第二行:涨幅阈值,振幅阈值,功能按钮row2 = ttk.Frame(param_frame)row2.pack(fill=tk.X, pady=2)ttk.Label(row2, text="涨幅阈值 (%):").pack(side=tk.LEFT, padx=5)self.zdf_combo = ttk.Combobox(row2, width=6, state='readonly',values=['2', '3', '5', '8', '10', '13'])self.zdf_combo.pack(side=tk.LEFT, padx=5)self.zdf_combo.set('5')ttk.Label(row2, text="振幅阈值 (%):").pack(side=tk.LEFT, padx=5)self.zf_combo = ttk.Combobox(row2, width=6, state='readonly',values=['5', '8', '10', '13', '15', '18', '20'])self.zf_combo.pack(side=tk.LEFT, padx=15)self.zf_combo.set('10')# 新增:查看数据按钮(位于回测按钮左侧)self.view_btn = ttk.Button(row2, text="查看数据", command=self.start_view_data)self.view_btn.pack(side=tk.LEFT, padx=15)self.run_btn = ttk.Button(row2, text="开始回测指数数据", command=self.start_backtest)self.run_btn.pack(side=tk.LEFT, padx=5)# 状态标签status_label = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)status_label.pack(fill=tk.X, padx=10, pady=2)# 结果显示区域result_frame = ttk.LabelFrame(self.root, text="显示月线数据回测结果——点击表格标题,可以进行排序", padding=5)result_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)tree_frame = ttk.Frame(result_frame)tree_frame.pack(fill=tk.BOTH, expand=True)v_scroll = ttk.Scrollbar(tree_frame, orient=tk.VERTICAL)h_scroll = ttk.Scrollbar(tree_frame, orient=tk.HORIZONTAL)self.tree = ttk.Treeview(tree_frame, yscrollcommand=v_scroll.set, xscrollcommand=h_scroll.set)v_scroll.config(command=self.tree.yview)h_scroll.config(command=self.tree.xview)self.tree.grid(row=0, column=0, sticky="nsew")v_scroll.grid(row=0, column=1, sticky="ns")h_scroll.grid(row=1, column=0, sticky="ew")tree_frame.grid_rowconfigure(0, weight=1)tree_frame.grid_columnconfigure(0, weight=1)self.current_df = Nonedef load_stock_list(self):"""从通达信获取股票列表并填充组合框"""try:stocks = tq.get_stock_list('9', list_type=1)self.stock_dict = {}display_list = []for item in stocks:code = item['Code']name = item['Name']display = f"{name} ({code})"self.stock_dict[display] = codedisplay_list.append(display)self.stock_combo['values'] = display_listif display_list:for disp in display_list:if '999999' in self.stock_dict[disp]:self.stock_combo.set(disp)breakelse:self.stock_combo.current(0)except Exception as e:self.status_var.set(f"获取股票列表失败: {e}")self.stock_combo['values'] = ['获取失败']def set_default_params(self):self.start_entry.delete(0, tk.END)self.start_entry.insert(0, "19901231")today = datetime.now().strftime('%Y%m%d')self.end_entry.delete(0, tk.END)self.end_entry.insert(0, today)self.zdf_combo.set('5')self.zf_combo.set('10')def get_params(self):stock_display = self.stock_combo.get()if not stock_display:messagebox.showerror("参数错误", "请选择股票")return Nonestock = self.stock_dict.get(stock_display, '')if not stock:messagebox.showerror("参数错误", "无效的股票选择")return Nonestart = self.start_entry.get().strip()end = self.end_entry.get().strip()try:zdf = float(self.zdf_combo.get().strip())zf = float(self.zf_combo.get().strip())except ValueError:messagebox.showerror("参数错误", "涨幅阈值和振幅阈值必须是数字")return Noneif not (start.isdigit() and len(start) == 8):messagebox.showerror("参数错误", "起始日期格式必须为YYYYMMDD,如19901231")return Noneif not (end.isdigit() and len(end) == 8):messagebox.showerror("参数错误", "结束日期格式必须为YYYYMMDD,如20260408")return Nonereturn stock, start, end, zdf, zf# 新增:启动查看数据线程def start_view_data(self):params = self.get_params()if params is None:returnstock, start, end, _, _ = params # 查看数据不需要阈值参数self.view_btn.config(state=tk.DISABLED)self.run_btn.config(state=tk.DISABLED)self.status_var.set("正在获取原始月线数据,请稍候...")thread = threading.Thread(target=self.run_view_data_thread, args=(stock, start, end), daemon=True)thread.start()# 新增:查看数据后台线程def run_view_data_thread(self, stock, start, end):try:raw_df = MonthlyBacktestApp.get_datas(start, end, stock)self.root.after(0, self.update_raw_results, raw_df)except Exception as e:self.root.after(0, self.show_error, f"获取原始数据出错:{str(e)}")# 新增:更新表格显示原始数据def update_raw_results(self, df):"""将原始月线数据显示在表格中"""if df is None or df.empty:self.status_var.set("无有效数据,请检查股票代码或时间范围")self.view_btn.config(state=tk.NORMAL)self.run_btn.config(state=tk.NORMAL)return# 清空原有数据for row in self.tree.get_children():self.tree.delete(row)# 准备显示数据:索引为日期,添加为第一列display_df = df.copy()display_df.insert(0, '日期', display_df.index.strftime('%Y-%m-%d'))# 保留常见字段,可按需调整columns_to_show = ['日期', 'Open', 'High', 'Low', 'Close', 'Volume', 'Amount', 'zdf', 'zf']display_df = display_df[columns_to_show]self.display_df = display_df# 设置 Treeview 列all_columns = list(display_df.columns)self.tree['columns'] = all_columnsself.tree['show'] = 'headings'# 配置列标题、宽度,并绑定排序事件for col in all_columns:width = 90 if col == '日期' else 80self.tree.heading(col, text=col, command=lambda c=col: self.sort_column(c))self.tree.column(col, width=width, anchor='center')# 重置排序状态self.sort_column_name = Noneself.sort_reverse = False# 填充数据self._populate_tree()self.status_var.set(f"{self.stock_combo.get()}数据加载完成,共 {len(display_df)} 条记录")self.view_btn.config(state=tk.NORMAL)self.run_btn.config(state=tk.NORMAL)def start_backtest(self):params = self.get_params()if params is None:returnself.view_btn.config(state=tk.DISABLED) # 新增:禁用查看按钮self.run_btn.config(state=tk.DISABLED)self.status_var.set("正在获取数据并计算回测,请稍候...")thread = threading.Thread(target=self.run_backtest_thread, args=params, daemon=True)thread.start()def run_backtest_thread(self, stock, start, end, zdf, zf):try:result_df = MonthlyBacktestApp.get_test_datas(start, end, stock, zdf, zf)self.root.after(0, self.update_results, result_df)except Exception as e:self.root.after(0, self.show_error, f"回测出错:{str(e)}")def update_results(self, df):"""更新结果显示"""if df is None or df.empty:self.status_var.set("无有效数据,请检查股票代码或时间范围")self.view_btn.config(state=tk.NORMAL) # 新增:恢复查看按钮self.run_btn.config(state=tk.NORMAL)returnself.current_df = df.copy()# 清空原有数据for row in self.tree.get_children():self.tree.delete(row)# 重置索引,添加月份列columns = list(df.columns)self.display_df = df.reset_index()self.display_df.columns = ['月份'] + columns# 设置 Treeview 列all_columns = list(self.display_df.columns)self.tree['columns'] = all_columnsself.tree['show'] = 'headings'# 配置列标题、宽度,并绑定排序事件for col in all_columns:width = 60 if col == '月份' else 100self.tree.heading(col, text=col, command=lambda c=col: self.sort_column(c))self.tree.column(col, width=width, anchor='center')# 重置排序状态self.sort_column_name = Noneself.sort_reverse = False# 填充数据self._populate_tree()self.status_var.set(f"{self.stock_combo.get()}回测完成,数据如下显示")self.view_btn.config(state=tk.NORMAL) # 新增:恢复查看按钮self.run_btn.config(state=tk.NORMAL)def _populate_tree(self):"""清空 tree 并根据 self.display_df 重新填充数据"""for row in self.tree.get_children():self.tree.delete(row)for _, row in self.display_df.iterrows():formatted_row = []for item in row:if isinstance(item, float):formatted_row.append(f"{item:.2f}")else:formatted_row.append(str(item))self.tree.insert('', tk.END, values=formatted_row)def sort_column(self, col):"""点击列标题进行排序,切换升/降序并更新箭头"""if self.display_df is None:return# 判断排序方向if self.sort_column_name == col:self.sort_reverse = not self.sort_reverseelse:self.sort_column_name = colself.sort_reverse = False# 对 display_df 排序# 尝试按数值排序,失败则按字符串排序try:self.display_df = self.display_df.sort_values(by=col, ascending=not self.sort_reverse, na_position='last')except TypeError:# 如果混合类型导致错误,转为字符串排序self.display_df = self.display_df.sort_values(by=col, ascending=not self.sort_reverse, na_position='last', key=lambda x: x.astype(str))# 更新列标题箭头符号all_columns = list(self.display_df.columns)for c in all_columns:text = cif c == self.sort_column_name:arrow = ' ▼' if self.sort_reverse else ' ▲'text += arrowself.tree.heading(c, text=text)# 重新填充数据self._populate_tree()def show_error(self, err_msg):messagebox.showerror("错误", err_msg)self.status_var.set("操作失败")self.view_btn.config(state=tk.NORMAL) # 新增:恢复按钮self.run_btn.config(state=tk.NORMAL)# ================= 以下是加载数据与回测逻辑(已移入类中) =================@staticmethoddef get_datas(start, end, stock):"""获取月线数据并计算涨跌幅和振幅"""df = tq.get_market_data(field_list=[],start_time=start,end_time=end,stock_list=[stock],count=-1,dividend_type='front',period='1mon',fill_data=True)# 转换数据结构df = pd.concat({k: v.squeeze() for k, v in df.items()}, axis=1)# 计算涨跌幅 (zdf)df['zdf'] = ((df['Close'] - df['Close'].shift(1)) * 100 / df['Close'].shift(1)).round(2)# 计算振幅 (zf),阳线振幅为正,阴线振幅为负numerator = np.where(df['Close'] > df['Open'], df['High'] - df['Low'], df['Low'] - df['High'])df['zf'] = (numerator * 100 / df['Close'].shift(1)).round(2)return df@staticmethoddef get_test_datas(start, end, stock, stock_zdf, stock_zf):"""生成月线回测数据(按月份聚合,忽略年份)—— 优化版:使用 groupby + apply"""df = MonthlyBacktestApp.get_datas(start, end, stock)df['月份'] = df.index.month # 直接获取月份数字(1-12)datas = df.dropna()def monthly_stats(sub_df):total = len(sub_df)zdf_vals = sub_df['zdf']zf_vals = sub_df['zf']# 涨幅阈值统计cnt_up_th = (zdf_vals >= stock_zdf).sum()cnt_down_th = (zdf_vals <= -stock_zdf).sum()# 振幅阈值统计cnt_zf_up = (zf_vals >= stock_zf).sum()cnt_zf_down = (zf_vals <= -stock_zf).sum()# 涨跌次数cnt_up = (zdf_vals > 0).sum()cnt_down = (zdf_vals < 0).sum()return pd.Series({f'涨幅{stock_zdf}%次数': cnt_up_th,f'跌幅{-stock_zdf}%次数': cnt_down_th,'涨幅净次数': cnt_up_th - cnt_down_th,f'涨幅{stock_zdf}%胜率': round(100 * cnt_up_th / total, 2) if total else 0,f'振幅{stock_zf}%次数': cnt_zf_up,f'振幅{-stock_zf}%次数': cnt_zf_down,'振幅净次数': cnt_zf_up - cnt_zf_down,f'振幅{stock_zf}%胜率': round(100 * cnt_zf_up / total, 2) if total else 0,'上涨次数': cnt_up,'下跌次数': cnt_down,'胜率': round(100 * cnt_up / total, 2) if total else 0,'平均涨幅': round(zdf_vals.mean(), 2),'最大跌幅': zdf_vals.min(),'最大涨幅': zdf_vals.max()}, dtype='string')result = datas.groupby('月份').apply(monthly_stats)result.index.name = '月份'return resultif __name__ == '__main__':root = tk.Tk()app = MonthlyBacktestApp(root)root.mainloop()
温馨提示:股市有风险,投资需谨慎。本文所写内容仅供粉丝们参考使用,仅为个人研究观点表述,股友们须自己思考与分析股市。
-!! 完毕 ,感谢您的收看!!-
-------★★历史博文集合★★------
