
做量化分析或者盯盘复盘的小伙伴们,一定都遇到过这个痛点:如何高效、稳定、低成本地获取全市场个股的日线数据?
用免费API?容易被限流、封IP,甚至时不时遇到数据缺失。 买收费接口?对于个人开发者或初创团队来说,长期下来也是一笔不小的开销。
其实,只要你电脑上安装了通达信客户端,最全面、最稳定的历史行情数据就已经悄悄躺在你的硬盘里了!今天,给大家分享一个极其硬核的 Python 自动化方案——通达信本地数据极速同步工具。
为了解决量化回测中至关重要的“价格复权”问题,我们将采用“两步走”的终极架构:
第一步:借助通达信客户端的“数据导出”功能,把含复权处理的全部历史数据一次性灌入 MySQL,作为基准数据;第二步:直读本地 .day 二进制文件并结合 mootdx 接口,实现每日收盘后的全自动增量更新(含北交所及 ETF 数据)。
之所以要先走第一步,是因为通达信本地的二进制 .day 文件存储的是除权前(未复权)的原始价格,而对于量化分析来说,我们需要的是前复权或后复权数据。
怎么做?
建议第一次运行数据库同步时,先在通达信客户端中盘后下载好完整数据,然后通过“数据导出”功能,将全市场股票(含复权处理)导出为 .xls 格式到本地文件夹(例如 D:\离线数据)。


需要注意的是,通达信导出的 .xls 实际上是带有特殊表头和表尾的制表符(\t)分隔文本文件,并不是真正的 Excel 文件。我们编写了以下脚本,它会自动清洗冗余前缀(如 SZ#、SH#、BJ#),自动跳过无效行,并在 MySQL 中动态创建“宽表”(每张表存 140 只股票,每只股票 6 列,以避开 MySQL 单表列数上限),一次性将所有历史复权数据灌入数据库!
import os
import glob
import re
import pandas as pd
from datetime import datetime
import warnings

# NOTE: pymysql and tqdm are imported inside the functions that use them
# (connect_to_database / main) so that the pure parsing helpers below remain
# importable on a machine without a MySQL driver installed.

# Silence pandas warnings (e.g. date-format inference) during bulk parsing.
warnings.filterwarnings('ignore')

# ==================== 1. Configuration ====================
XLS_DIR = r'D:\离线数据'  # folder holding the files exported from TDX
STOCKS_PER_TABLE = 140  # number of stocks stored in each wide table

# Exported column header -> suffix of the database column ("<code>_<suffix>").
_PRICE_COLUMNS = (
    ('开盘', 'open'),
    ('最高', 'high'),
    ('最低', 'low'),
    ('收盘', 'close'),
    ('成交量', 'volume'),
    ('成交额', 'amount'),
)

# Codes are interpolated into SQL identifiers, so only plain word characters
# are accepted.
_SAFE_CODE_RE = re.compile(r'[0-9A-Za-z_]+')


# ==================== 2. Database connection ====================
def connect_to_database():
    """Open and return a pymysql connection to the target database.

    The original listing called this function without defining it; it mirrors
    the definition used by the daily-sync script of the same article.

    Raises:
        Exception: whatever pymysql raises when the connection fails (the
            error is printed first, then re-raised).
    """
    import pymysql

    try:
        return pymysql.connect(
            host='localhost',
            port=3306,
            user='local',       # replace with your database account
            password='123456',  # replace with your database password
            db='ggsj',
            charset='utf8mb4',
        )
    except Exception as e:
        print(f"数据库连接失败: {str(e)}")
        raise


# ==================== 3. File-name parsing (SZ#, SH#, BJ#) ====================
def parse_filename_to_code(filename):
    """Return the bare security code encoded in an exported file name.

    The market prefix before ``#`` is dropped, e.g. ``SZ#000001.xls`` ->
    ``000001``.  A name without ``#`` is returned unchanged (minus extension).
    """
    stem = os.path.splitext(os.path.basename(filename))[0]
    if '#' in stem:
        return stem.split('#')[1]
    return stem


# ==================== 4. Reading the TDX export format ====================
def _to_float(value):
    """Convert one exported cell to float; blank/NaN/unparsable cells -> None."""
    if value is None or pd.isna(value):
        return None
    text = str(value).strip().replace(',', '')  # drop thousands separators
    if not text:
        return None
    try:
        return float(text)
    except ValueError:
        return None


def _load_tdx_frame(filepath):
    """Read the tab-separated export and return a frame that has a '日期' column.

    The file normally starts with a title line followed by the header line, so
    the first attempt skips one row; some exports have no title line, so a
    second attempt reads from the very first row.  Returns None when neither
    layout yields a '日期' column.
    """
    for rows_to_skip in (1, 0):
        frame = pd.read_csv(filepath, sep='\t', encoding='gbk',
                            skiprows=rows_to_skip, on_bad_lines='skip')
        frame.columns = frame.columns.str.strip()
        if '日期' in frame.columns:
            return frame
    return None


def read_tdx_file(filepath, stock_code):
    """Parse one exported file into ``{date: {column: value}}``.

    Args:
        filepath: path of the exported (GBK, tab-separated) file.
        stock_code: code used as the prefix of the generated column names.

    Returns:
        A dict keyed by ``YYYY-MM-DD`` strings.  Each value is a dict holding
        ``'date'`` plus ``<code>_open/high/low/close/volume/amount`` (floats or
        None).  An empty dict is returned when the file cannot be read or has
        no '日期' column.
    """
    try:
        frame = _load_tdx_frame(filepath)
    except (OSError, ValueError) as e:  # pandas parser/decoding errors are ValueErrors
        print(f"读取错误 {filepath}: {e}")
        return {}
    if frame is None:
        return {}

    data_dict = {}
    for record in frame.to_dict('records'):
        raw_date = record.get('日期')
        if raw_date is None or pd.isna(raw_date):
            continue
        date_str = str(raw_date).strip()
        # The export ends with a "数据来源:通达信" footer line.
        if not date_str or '数据来源' in date_str:
            continue
        try:
            parsed = pd.to_datetime(date_str)  # e.g. 1991/4/3 -> 1991-04-03
        except (ValueError, TypeError, OverflowError):
            continue
        if pd.isna(parsed):
            continue

        date_key = parsed.strftime('%Y-%m-%d')
        entry = data_dict.setdefault(date_key, {'date': date_key})
        for header, suffix in _PRICE_COLUMNS:
            entry[f'{stock_code}_{suffix}'] = _to_float(record.get(header))
    return data_dict


# ==================== 5. Database operations ====================
def _stock_column_defs(code):
    """Return the six column definitions that store one stock's daily bar."""
    return [
        f"`{code}_open` DECIMAL(10,2)",
        f"`{code}_high` DECIMAL(10,2)",
        f"`{code}_low` DECIMAL(10,2)",
        f"`{code}_close` DECIMAL(10,2)",
        f"`{code}_volume` BIGINT",
        f"`{code}_amount` DECIMAL(20,2)",
    ]


def create_table_if_not_exists(conn, table_name, stock_codes):
    """Create the wide table if needed and add columns for any missing stocks.

    Args:
        conn: open DB-API connection.
        table_name: name of the wide table.
        stock_codes: codes whose six columns must exist in the table.
    """
    cursor = conn.cursor()
    try:
        cols = ["`id` INT AUTO_INCREMENT PRIMARY KEY", "`date` DATE NOT NULL"]
        for code in stock_codes:
            cols.extend(_stock_column_defs(code))

        sql = f"""
            CREATE TABLE IF NOT EXISTS `{table_name}` (
                {', '.join(cols)},
                UNIQUE KEY `uk_date` (`date`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='个股 XLS 日线数据';
        """
        cursor.execute(sql)

        # The table may pre-date some stocks: add whatever columns are missing.
        cursor.execute(f"SHOW COLUMNS FROM `{table_name}`")
        existing_cols = {row[0] for row in cursor.fetchall()}
        missing_codes = [c for c in stock_codes if f"{c}_open" not in existing_cols]
        if missing_codes:
            print(f" 正在为表 {table_name} 添加 {len(missing_codes)} 只新股票的列...")
            additions = ', '.join(
                f"ADD COLUMN {definition}"
                for code in missing_codes
                for definition in _stock_column_defs(code)
            )
            cursor.execute(f"ALTER TABLE `{table_name}` {additions}")
        conn.commit()
    finally:
        cursor.close()


def insert_data_batch(conn, table_name, stock_codes, data_dict):
    """Upsert every date of ``data_dict`` into ``table_name``.

    Args:
        conn: open DB-API connection.
        table_name: target wide table.
        stock_codes: codes (in column order) stored in that table.
        data_dict: ``{date: {column: value}}`` as built from read_tdx_file.

    Returns:
        The cursor's ``rowcount`` after the bulk statement, or 0 when there is
        nothing to write or the write failed (the failure is printed and the
        transaction rolled back).
    """
    if not data_dict:
        return 0

    value_cols = [
        f'{code}_{suffix}'
        for code in stock_codes
        for _, suffix in _PRICE_COLUMNS
    ]
    db_cols = ['date'] + value_cols

    # One parameter row per date, oldest first; absent values become NULL.
    values_list = []
    for day in sorted(data_dict):
        row = data_dict[day]
        values_list.append([day] + [row.get(col) for col in value_cols])

    cols_sql = ', '.join(f'`{c}`' for c in db_cols)
    placeholders = ', '.join(['%s'] * len(db_cols))
    update_parts = [f"`{c}`=VALUES(`{c}`)" for c in value_cols]
    sql = f"""
        INSERT INTO `{table_name}` ({cols_sql})
        VALUES ({placeholders})
        ON DUPLICATE KEY UPDATE {', '.join(update_parts)}
    """

    cursor = conn.cursor()
    try:
        cursor.executemany(sql, values_list)
        conn.commit()
        return cursor.rowcount
    except Exception as e:
        print(f"写入失败:{e}")
        conn.rollback()
        return 0
    finally:
        cursor.close()


# ==================== 6. Entry point ====================
def main():
    """Import every exported file under XLS_DIR into batched wide tables."""
    from tqdm import tqdm

    print(">>> 开始处理通达信数据...")

    # 1. Scan the export folder and map code -> file path.
    files_map = {}
    all_files = (glob.glob(os.path.join(XLS_DIR, "*.xls"))
                 + glob.glob(os.path.join(XLS_DIR, "*.xlsx")))
    print(f"正在扫描 {XLS_DIR} ...")
    for path in all_files:
        code = parse_filename_to_code(path)
        if not code:
            continue
        if not _SAFE_CODE_RE.fullmatch(code):
            # The code becomes part of a column name; refuse anything unusual.
            print(f"跳过无法识别的文件名: {path}")
            continue
        if code in files_map:
            # e.g. SH#000001 and SZ#000001 share a code once the prefix is
            # dropped; as before, the file seen last wins.
            print(f"警告: 代码 {code} 对应多个文件,将使用 {path}")
        files_map[code] = path

    sorted_codes = sorted(files_map)
    total_files = len(sorted_codes)
    print(f"找到 {total_files} 个有效股票文件 (去除前缀后)")
    if total_files == 0:
        return

    # 2. Connect to the database.
    conn = connect_to_database()
    try:
        # 3. Process the codes in batches of STOCKS_PER_TABLE, one table each.
        num_batches = (total_files + STOCKS_PER_TABLE - 1) // STOCKS_PER_TABLE
        for i in range(num_batches):
            batch_codes = sorted_codes[i * STOCKS_PER_TABLE:(i + 1) * STOCKS_PER_TABLE]
            table_name = f"stock_batch_{i+1}"
            print(f"\nProcessing Batch {i+1}/{num_batches}: {table_name} ({len(batch_codes)} 只股票)")

            create_table_if_not_exists(conn, table_name, batch_codes)

            # Merge every stock of the batch into one {date: row} mapping.
            batch_data = {}
            for code in tqdm(batch_codes, desc="读取文件", leave=False):
                stock_data = read_tdx_file(files_map[code], code)
                for date_key, vals in stock_data.items():
                    batch_data.setdefault(date_key, {'date': date_key}).update(vals)

            if batch_data:
                print(f" 正在写入 {len(batch_data)} 天的数据...")
                cnt = insert_data_batch(conn, table_name, batch_codes, batch_data)
                print(f" 写入完成,影响行数:{cnt}")
            else:
                print(" 本批次无有效数据 (可能是文件格式不对或为空)")
    finally:
        # Close the connection even when a batch fails.
        conn.close()
    print("\n所有任务完成!")


if __name__ == "__main__":
    main()

# --- Article text that followed the listing on the same source line ---
# 运行完这段代码后,你的 MySQL 数据库中就拥有了从上市到今天完美的、经过复权的底层数据资产!
基准数据有了,每天收盘后总不能再手动导一次吧? 这时候就要用到我们的第二套方案:直读底层二进制文件 + mootdx 接口补充。
这套代码采用了 Python 的 struct 模块,通过解包通达信每天自动更新的二进制 .day 文件,实现降维打击般的秒级读取速度。对于极个别缺失的文件,通过 mootdx 进行在线补充。
由于 mootdx 库原作者已经很久没有更新,其内置的服务器节点大多失效,且未原生支持“北交所(北京证券交易所)”。 在运行每日增量同步代码前,我们需要对 mootdx 进行一次硬核魔改:
找到你 Python 环境中的 site-packages/mootdx/consts.py 文件,将里面的内容全部替换为以下最新配置(主要是补充可用主站 IP,并在顶部定义 MARKET_BJ = 2):
# Market identifiers (Beijing Stock Exchange support added)
MARKET_SZ = 0  # Shenzhen
MARKET_SH = 1  # Shanghai
MARKET_BJ = 2  # Beijing

# K-line (bar) categories; each value is also the index of the matching
# entry in FREQUENCY below.
KLINE_5MIN = 0
KLINE_15MIN = 1
KLINE_30MIN = 2
KLINE_1HOUR = 3
KLINE_DAILY = 4
KLINE_WEEKLY = 5
KLINE_MONTHLY = 6
KLINE_EX_1MIN = 7
KLINE_1MIN = 8
KLINE_RI_K = 9
KLINE_3MONTH = 10
KLINE_YEARLY = 11

MAX_TRANSACTION_COUNT = 2000
MAX_KLINE_COUNT = 800

FREQUENCY = ['5m', '15m', '30m', '1h', 'day', 'week', 'mon', 'ex_1m', '1m', 'dk', '3mon', 'year']

BLOCK_SZ = 'block_zs.dat'
BLOCK_FG = 'block_fg.dat'
BLOCK_GN = 'block_gn.dat'
BLOCK_DEFAULT = 'block.dat'

TYPE_FLATS = 0
TYPE_GROUP = 1

# Quote servers listed by the article as currently reachable: (label, ip, port).
HQ_HOSTS = [
    ('深圳双线主站1', '110.41.147.114', 7709),
    ('深圳双线主站2', '8.129.13.54', 7709),
    ('深圳双线主站3', '120.24.149.49', 7709),
    ('深圳双线主站4', '47.113.94.204', 7709),
    ('上海双线主站1', '124.70.176.52', 7709),
    ('上海双线主站2', '47.100.236.28', 7709),
    ('北京双线主站1', '121.36.54.217', 7709),
    ('广州双线主站1', '124.71.85.110', 7709),
    ('上海双线主站7', '106.14.201.131', 7709),
    ('上海双线主站8', '106.14.190.242', 7709),
    # NOTE(review): port 7719 differs from every other HQ host (7709) -
    # kept as published; confirm it is not a typo.
    ('深圳双线主站8', '47.107.228.47', 7719),
    ('广州双线主站5', '116.205.163.254', 7709),
]

# Extended-market quote servers.
EX_HOSTS = [
    ('银河阿里云扩展行情', '47.112.95.207', 7720),
    ('银河杭州电信扩展行情', '218.75.75.18', 7720),
]

# Financial-data servers.
GP_HOSTS = [('默认财务数据线路', '120.76.152.87', 7709)]

CONFIG = {
    'SERVER': {'HQ': HQ_HOSTS, 'EX': EX_HOSTS, 'GP': GP_HOSTS},
    'BESTIP': {'HQ': '', 'EX': '', 'GP': ''},
    'TDXDIR': 'C:/new_tdx',
}


def return_last_value(retry_state):
    """Return the result stored in ``retry_state.outcome``."""
    return retry_state.outcome.result()

# --- Article note that followed the listing on the same source line ---
# (注:如果第三方库底层连接代码里对 Market 进行了 if market in[0, 1] 限制,记得将 2 也放行即可无缝获取北交所)
魔改完成后,你只需要把下面的脚本部署在电脑上,每天收盘后自动运行即可。代码会智能判断数据库中最后的更新日期:老股票只取增量,新上市股票则自动在数据库中 ALTER 增加列并全量回补历史数据,连通达信自选板块(.blk)和 ETF 行情都能一并搞定!
import os
import re
import struct
import pandas as pd
from datetime import datetime, timedelta

# NOTE: pymysql, tqdm and mootdx are imported inside the functions that use
# them (connect_to_database / collect_data / main) so that the file parsers
# below remain importable without those packages installed.

# ==================== Configuration ====================
TDX_DIR = 'D:/交易/通达信'
BLOCK_FILE = r'D:\交易\通达信\T0002\blocknew\QABJ.blk'
ETF_BLOCK_FILE = r'D:\交易\通达信\T0002\blocknew\ETFSJ.blk'
STOCKS_PER_TABLE = 140

# One .day record is 32 bytes: date, open, high, low, close (uint32), amount
# (float32), volume (uint32), reserved (uint32).  "<" forces little-endian
# with standard sizes; the native "l" code is 8 bytes on 64-bit Linux/macOS
# and cannot unpack a 4-byte field there.
_DAY_RECORD = struct.Struct('<IIIIIfII')

# Per-stock column suffixes, in table column order.
_STOCK_FIELDS = ('open', 'high', 'low', 'close', 'volume', 'amount')

# Market folders to probe, chosen by the first digit of the code.
_MARKET_ORDER_BY_PREFIX = (
    (('6',), ('sh', 'sz', 'bj')),      # Shanghai first
    (('0', '3'), ('sz', 'sh', 'bj')),  # Shenzhen first
    (('4', '8'), ('bj', 'sz', 'sh')),  # Beijing first
)
_DEFAULT_MARKET_ORDER = ('bj', 'sh', 'sz')


# ==================== Database connection ====================
def connect_to_database():
    """Open and return a pymysql connection; print and re-raise on failure."""
    import pymysql

    try:
        return pymysql.connect(
            host='localhost',
            port=3306,
            user='local',       # replace with your database account
            password='123456',  # replace with your database password
            db='ggsj',
            charset='utf8mb4',
        )
    except Exception as e:
        print(f"数据库连接失败: {str(e)}")
        raise


# ==================== Parsing TDX .day files ====================
def parse_tdx_day_file(filepath, price_scale=100.0):
    """Parse a TDX ``.day`` file into a DataFrame indexed by date.

    Args:
        filepath: path of the binary ``.day`` file.
        price_scale: divisor turning the stored integer prices into prices.
            Defaults to 100, the value the original code hard-coded.
            NOTE(review): funds/ETFs are reported to be stored in thousandths
            (divisor 1000) - confirm before relying on ETF prices.

    Returns:
        A DataFrame with columns open, high, low, close, amount, volume and a
        ``date`` DatetimeIndex, or None when the file cannot be read or holds
        no valid record.  A trailing partial record and records whose date is
        not a valid YYYYMMDD value are skipped instead of discarding the file.
    """
    try:
        with open(filepath, 'rb') as f:
            raw = f.read()
    except (OSError, TypeError):
        return None

    usable = len(raw) - len(raw) % _DAY_RECORD.size
    rows = []
    for (date_int, open_raw, high_raw, low_raw, close_raw,
         amount, volume_raw, _reserved) in _DAY_RECORD.iter_unpack(raw[:usable]):
        try:
            day = datetime.strptime(str(date_int), '%Y%m%d')  # e.g. 20091229
        except ValueError:
            continue
        rows.append({
            'date': day,
            'open': open_raw / price_scale,
            'high': high_raw / price_scale,
            'low': low_raw / price_scale,
            'close': close_raw / price_scale,
            'amount': amount,
            'volume': volume_raw / 100,  # stored count / 100 (lots)
        })

    if not rows:
        return None
    df = pd.DataFrame(rows)
    df.set_index('date', inplace=True)
    return df


def resolve_day_file_path(code):
    """Return the first existing ``.day`` path for ``code`` under TDX_DIR.

    The market folders (sh/sz/bj) are probed in an order chosen from the first
    digit of the zero-padded code; None is returned when no file exists.
    """
    code = str(code).zfill(6)
    markets = _DEFAULT_MARKET_ORDER
    for prefixes, order in _MARKET_ORDER_BY_PREFIX:
        if code.startswith(prefixes):
            markets = order
            break
    for market in markets:
        candidate = os.path.join(TDX_DIR, 'vipdoc', market, 'lday', f'{market}{code}.day')
        if os.path.exists(candidate):
            return candidate
    return None


# ==================== Parsing block (.blk) files ====================
def parse_block_file(filepath):
    """Return the codes listed in a TDX custom block file.

    Each useful line holds one leading character followed by a six-digit
    code; other lines are ignored.  Codes are returned in file order with
    duplicates removed, because a repeated code would later produce a
    duplicate ``ADD COLUMN`` clause.
    """
    stocks = []
    if not os.path.exists(filepath):
        print(f"文件不存在: {filepath}")
        return stocks
    try:
        with open(filepath, 'r', encoding='gbk', errors='ignore') as f:
            lines = f.read().splitlines()
    except OSError as e:
        print(f"解析板块文件出错: {e}")
        return stocks

    seen = set()
    for line in lines:
        line = line.strip()
        if len(line) < 7:
            continue
        symbol = line[1:7]
        if symbol.isdigit() and symbol not in seen:
            seen.add(symbol)
            stocks.append(symbol)
    return stocks


# ==================== Database structure ====================
def _stock_column_defs(code):
    """Return the six column definitions that store one stock's daily bar."""
    return [
        f"`{code}_open` DECIMAL(10,2)",
        f"`{code}_high` DECIMAL(10,2)",
        f"`{code}_low` DECIMAL(10,2)",
        f"`{code}_close` DECIMAL(10,2)",
        f"`{code}_volume` BIGINT",
        f"`{code}_amount` DECIMAL(20,2)",
    ]


def _batch_tables(cursor, table_prefix):
    """Return the ``<prefix><number>`` tables, sorted by their number.

    ``_`` is a wildcard in LIKE and other tables may share the prefix, so the
    names returned by the server are filtered again with a strict pattern.
    """
    cursor.execute(f"SHOW TABLES LIKE '{table_prefix}%'")
    pattern = re.compile(re.escape(table_prefix) + r'(\d+)')
    numbered = []
    for row in cursor.fetchall():
        match = pattern.fullmatch(row[0])
        if match:
            numbered.append((int(match.group(1)), row[0]))
    numbered.sort()
    return [name for _, name in numbered]


def get_db_structure(conn, table_prefix):
    """Describe the existing batch tables.

    Returns:
        ``(table_mapping, all_existing_stocks, tables)`` where table_mapping
        maps table name -> list of codes (taken from its ``*_open`` columns),
        all_existing_stocks is the set of all those codes and tables is the
        list of table names ordered by batch number.
    """
    cursor = None
    table_mapping = {}
    all_existing_stocks = set()
    try:
        cursor = conn.cursor()
        tables = _batch_tables(cursor, table_prefix)
        suffix = '_open'
        for table_name in tables:
            cursor.execute(f"SHOW COLUMNS FROM `{table_name}`")
            stocks = [
                col[0][:-len(suffix)]
                for col in cursor.fetchall()
                if col[0].endswith(suffix)
            ]
            all_existing_stocks.update(stocks)
            table_mapping[table_name] = stocks
        return table_mapping, all_existing_stocks, tables
    finally:
        if cursor:
            cursor.close()


def update_schema(conn, all_stocks_from_file, table_prefix):
    """Add columns/tables for codes that are not in the database yet.

    New codes first fill the free slots of the last table (up to
    STOCKS_PER_TABLE per table); the remainder goes into newly created tables.

    Returns:
        ``(table_mapping, new_stocks)``: the updated table -> codes mapping
        and the list of codes that were added (empty when nothing is new).

    Raises:
        Exception: any database error, after printing it and rolling back.
    """
    cursor = None
    try:
        cursor = conn.cursor()
        table_mapping, existing_stocks, tables = get_db_structure(conn, table_prefix)

        # Codes missing from the database, in file order, without repeats.
        known = set(existing_stocks)
        new_stocks = []
        for code in all_stocks_from_file:
            if code not in known:
                known.add(code)
                new_stocks.append(code)

        if not new_stocks:
            print("没有发现新上市的代码")
            return table_mapping, []
        print(f"发现 {len(new_stocks)} 只新代码,正在更新数据库结构...")

        position = 0

        # 1. Use the free slots of the last table, if any.
        if tables:
            last_table = tables[-1]
            free_slots = STOCKS_PER_TABLE - len(table_mapping[last_table])
            stocks_to_add = new_stocks[:free_slots] if free_slots > 0 else []
            if stocks_to_add:
                print(f" 向 {last_table} 添加 {len(stocks_to_add)} 只新代码...")
                alter_parts = [
                    f"ADD COLUMN {definition}"
                    for code in stocks_to_add
                    for definition in _stock_column_defs(code)
                ]
                cursor.execute(f"ALTER TABLE `{last_table}` {', '.join(alter_parts)}")
                conn.commit()
                table_mapping[last_table].extend(stocks_to_add)
                position = len(stocks_to_add)

        # 2. Create new tables for whatever is left.  Number them after the
        #    highest existing batch so gaps in the numbering cannot collide.
        next_batch_idx = int(tables[-1][len(table_prefix):]) + 1 if tables else 1
        while position < len(new_stocks):
            new_table_name = f"{table_prefix}{next_batch_idx}"
            stocks_for_new_table = new_stocks[position:position + STOCKS_PER_TABLE]
            print(f" 创建新表 {new_table_name},包含 {len(stocks_for_new_table)} 只代码...")

            columns_def = ["id INT AUTO_INCREMENT PRIMARY KEY", "date DATE NOT NULL"]
            for code in stocks_for_new_table:
                columns_def.extend(_stock_column_defs(code))
            create_sql = f"""
                CREATE TABLE `{new_table_name}` (
                    {', '.join(columns_def)},
                    UNIQUE KEY uk_date (date)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='板块日线数据'
            """
            cursor.execute(create_sql)
            conn.commit()

            tables.append(new_table_name)
            table_mapping[new_table_name] = stocks_for_new_table
            position += len(stocks_for_new_table)
            next_batch_idx += 1

        return table_mapping, new_stocks
    except Exception as e:
        print(f"更新数据库结构失败: {e}")
        conn.rollback()
        raise
    finally:
        if cursor:
            cursor.close()


# ==================== Collecting data ====================
def _load_daily_frame(reader, code):
    """Return a frame with a ``date`` column for ``code``, or None.

    The local .day file is parsed directly; when that yields nothing,
    ``reader.daily(symbol=code)`` is used as a fallback.
    """
    day_file = resolve_day_file_path(code)
    df = parse_tdx_day_file(day_file) if day_file else None
    if df is None or df.empty:
        df = reader.daily(symbol=code)
        if df is not None and not df.empty:
            df = df.reset_index()
    if df is None or df.empty:
        return None
    if 'date' not in df.columns:
        if not isinstance(df.index, pd.DatetimeIndex):
            return None
        df = df.reset_index()
    return df


def _clean(value, cast):
    """Cast a cell with ``cast`` unless it is NaN/None, which becomes None."""
    return cast(value) if pd.notna(value) else None


def collect_data(reader, stock_config):
    """Collect daily bars for the configured codes.

    Args:
        reader: object exposing ``daily(symbol=...)``, used as a fallback.
        stock_config: ``{code: start_date}``; a falsy start date means the
            full history, otherwise only rows on/after that date are kept.

    Returns:
        ``(all_data, failed_stocks)``: ``{date: {column: value}}`` keyed by
        ``YYYY-MM-DD`` and the list of codes that produced no usable data.
    """
    from tqdm import tqdm

    all_data = {}
    failed_stocks = []
    for code in tqdm(stock_config.keys(), desc=" 读取本地行情", leave=False):
        try:
            df = _load_daily_frame(reader, code)
            if df is None:
                failed_stocks.append(code)
                continue

            start_date = stock_config[code]
            if start_date:
                df = df[df['date'] >= pd.Timestamp(start_date)]
            if df.empty:
                continue

            for rec in df.to_dict('records'):
                date = rec['date'].strftime('%Y-%m-%d')
                entry = all_data.setdefault(date, {'date': date})
                entry[f'{code}_open'] = _clean(rec['open'], float)
                entry[f'{code}_high'] = _clean(rec['high'], float)
                entry[f'{code}_low'] = _clean(rec['low'], float)
                entry[f'{code}_close'] = _clean(rec['close'], float)
                entry[f'{code}_volume'] = _clean(rec['volume'], int)
                entry[f'{code}_amount'] = _clean(rec['amount'], float)
        except Exception:
            # Best effort per code: one unreadable stock must not abort the batch.
            failed_stocks.append(code)
            continue
    return all_data, failed_stocks


# ==================== Insert / update ====================
def upsert_data(conn, table_name, stock_codes, data_dict):
    """Insert new dates and update existing ones in ``table_name``.

    Only the columns of stocks that actually have data for a date are
    written, so an update never blanks out another stock's values.

    Returns:
        ``(inserted, updated)`` row counts.

    Raises:
        Exception: any database error, after printing it and rolling back.
    """
    cursor = None
    try:
        cursor = conn.cursor()
        inserted = 0
        updated = 0
        dates = sorted(data_dict)

        # Fetch the existing dates once instead of one SELECT per date.
        existing_dates = set()
        if dates:
            cursor.execute(
                f"SELECT date FROM `{table_name}` WHERE date BETWEEN %s AND %s",
                (dates[0], dates[-1]),
            )
            existing_dates = {str(row[0]) for row in cursor.fetchall()}

        for date in dates:
            row_data = data_dict[date]
            columns = [
                f'{code}_{field}'
                for code in stock_codes
                if f'{code}_open' in row_data
                for field in _STOCK_FIELDS
            ]
            values = [row_data.get(col) for col in columns]

            if date in existing_dates:
                if not columns:
                    continue
                set_sql = ', '.join(f"`{col}` = %s" for col in columns)
                update_sql = f"UPDATE `{table_name}` SET {set_sql} WHERE date = %s"
                cursor.execute(update_sql, values + [date])
                updated += 1
            else:
                column_names = ', '.join(f'`{col}`' for col in ['date'] + columns)
                placeholders = ', '.join(['%s'] * (len(columns) + 1))
                insert_sql = f"INSERT INTO `{table_name}` ({column_names}) VALUES ({placeholders})"
                cursor.execute(insert_sql, [date] + values)
                inserted += 1

        conn.commit()
        return inserted, updated
    except Exception as e:
        print(f"插入/更新数据失败: {e}")
        conn.rollback()
        raise
    finally:
        if cursor:
            cursor.close()


# ==================== Latest date lookups ====================
def _latest_date_in_table(conn, table_name):
    """Return ``MAX(date)`` of one table, or None when it has no rows."""
    cursor = conn.cursor()
    try:
        cursor.execute(f"SELECT MAX(date) FROM `{table_name}`")
        result = cursor.fetchone()
        return result[0] if result and result[0] else None
    finally:
        cursor.close()


def get_latest_date_in_db(conn, table_prefix):
    """Return the latest date stored in the first batch table, or None."""
    cursor = None
    try:
        cursor = conn.cursor()
        tables = _batch_tables(cursor, table_prefix)
    finally:
        if cursor:
            cursor.close()
    if not tables:
        return None
    return _latest_date_in_table(conn, tables[0])


# ==================== Sync task ====================
def process_sync_task(conn, reader, block_file, table_prefix, task_name):
    """Sync every code of one block file into the ``table_prefix`` tables.

    New codes are loaded in full; codes already in the database are loaded
    from the latest date stored in *their own* table, so a table that missed
    an earlier run is still caught up.
    """
    print("\n" + "=" * 60)
    print(f"开始任务: {task_name}")
    print(f"板块文件: {block_file}")
    print(f"数据表前缀: {table_prefix}")
    print("=" * 60)

    print(f"\n正在读取板块文件: {block_file}")
    if not os.path.exists(block_file):
        print(f"文件不存在: {block_file},跳过该任务")
        return
    all_stocks_from_file = parse_block_file(block_file)
    print(f"板块文件中共有 {len(all_stocks_from_file)} 只代码")
    if not all_stocks_from_file:
        print("未找到代码,跳过")
        return

    table_mapping, new_stocks = update_schema(conn, all_stocks_from_file, table_prefix)
    new_stock_set = set(new_stocks)

    latest_db_date = get_latest_date_in_db(conn, table_prefix)
    print(f"\n数据库现有最新日期: {latest_db_date}")

    total_inserted = 0
    total_updated = 0
    print("\n开始数据更新...")
    for table_name, stock_codes in table_mapping.items():
        print(f"\n处理表: {table_name} ({len(stock_codes)} 只代码)")
        table_latest = _latest_date_in_table(conn, table_name)

        # New codes: full history (None); existing codes: incremental.
        stock_config = {
            code: (None if code in new_stock_set else table_latest)
            for code in stock_codes
        }
        new_cnt = sum(1 for code in stock_codes if code in new_stock_set)
        old_cnt = len(stock_codes) - new_cnt
        print(f" 包含: {new_cnt} 只新代码 (全量), {old_cnt} 只旧代码 (增量)")

        print(f" 正在收集数据...")
        batch_data, failed = collect_data(reader, stock_config)
        if not batch_data:
            print(f" 无数据需要更新")
            continue
        dates = sorted(batch_data.keys())
        print(f" 收集到 {len(dates)} 个交易日数据: {dates[0]} ~ {dates[-1]}")

        print(f" 正在写入数据库...")
        inserted, updated = upsert_data(conn, table_name, stock_codes, batch_data)
        total_inserted += inserted
        total_updated += updated
        print(f" ✓ 插入 {inserted} 行,更新 {updated} 行")
        if failed:
            print(f" ! 失败代码: {len(failed)} 只")

    print(f"\n{task_name} 任务完成!总插入: {total_inserted} 行, 总更新: {total_updated} 行")


# ==================== Entry point ====================
def main():
    """Run the stock and ETF sync tasks against one database connection."""
    from mootdx.reader import Reader

    print("=" * 60)
    print("通达信数据智能同步工具 (支持多板块)")
    print("=" * 60)

    # 1. Connect to the database.
    conn = connect_to_database()
    print("数据库连接成功")
    try:
        # 2. Initialise the mootdx reader.
        print("\n正在初始化 Reader...")
        reader = Reader.factory(market='std', tdxdir=TDX_DIR)

        # 3. Task 1: stocks (QABJ) -> stock_batch_*
        process_sync_task(conn, reader, BLOCK_FILE, 'stock_batch_', '股票行情(QABJ)')
        #    Task 2: ETFs (ETFSJ) -> etf_batch_*
        process_sync_task(conn, reader, ETF_BLOCK_FILE, 'etf_batch_', 'ETF行情(ETFSJ)')
    except Exception as e:
        print(f"\n程序运行出错: {e}")
        import traceback
        traceback.print_exc()
    finally:
        conn.close()
        print("\n数据库连接已关闭")


if __name__ == "__main__":
    main()
总结:免费、极速、精准
这套“双剑合璧”的架构,不仅是一次 Python 极客的硬核实践,更是为散户和量化玩家量身定制的终极武器。它完美诠释了三个词:免费、极速、精准!
其中的“极速”,来自直接对本地 .day 二进制文件进行 C 语言级别的结构体解包:全市场 5000+ 股票与 ETF 的每日增量同步只需短短几秒,真正做到“断网也能跑,连网秒更新”!拥有了这套系统,你不仅省下了每年动辄成千上万的商业数据接口费用,更为未来的 AI 模型训练、量化选股和策略回测打下了最坚实的数据底座。彻底告别“看天吃饭”的外部接口,让量化回归纯粹!
如果这篇硬核干货对你搭建个人交易体系有启发,别忘了顺手点个【赞】和【在看】哦!也欢迎转发给身边做量化、写代码或者天天复盘的战友们,我们下期再见!🚀