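In quantitative investing and everyday stock analysis, fund flow is a highly informative indicator. A stock's large-order net inflow and the direction of institutional money often signal where market capital is positioning itself.
Clicking through pages on a finance website and copy-pasting data for several thousand stocks by hand is slow and error-prone. In this tutorial we write a fully automated fund-flow scraper in Python, step by step. It downloads the per-stock fund flow data from the Tonghuashun (同花顺) data center in one run and saves it as a standard CSV file.
Note: This tutorial and its code are for personal study, research, and data analysis only. Do not use them commercially; you bear the consequences of any misuse.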
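Our target is the data center of Tonghuashun Finance: https://data.10jqka.com.cn/funds/ggzjl/
This page shows real-time fund flows for every stock in the market, including the latest price, percentage change, inflow and outflow, net inflow, and traded value.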
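On closer inspection, the paginated pages follow this URL pattern: https://data.10jqka.com.cn/funds/ggzjl/field/zdf/order/desc/page/{page_number}/
The server renders the complete data directly into the HTML returned at this URL. Requesting it directly does not require the dynamically encrypted hexin-v parameter. We only need Python to mimic an ordinary browser visit, fetch the page, and parse the HTML table.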
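The two steps above (build the page URL, then read rows out of a server-rendered table) can be sketched with the standard library alone. This is an illustration under assumptions, not the tutorial's implementation: the names `build_page_url`, `TableRowParser`, and `parse_rows` are hypothetical, the sample HTML is made up, and the full source at the end uses requests and BeautifulSoup instead.

```python
from html.parser import HTMLParser

BASE_URL = "https://data.10jqka.com.cn/funds/ggzjl"


def build_page_url(page_num, sort_field="zdf", sort_order="desc"):
    """Fill the page number into the URL pattern described above."""
    return f"{BASE_URL}/field/{sort_field}/order/{sort_order}/page/{page_num}/"


class TableRowParser(HTMLParser):
    """Collect the text of every <td> cell, grouped by <tr> row."""

    def __init__(self):
        super().__init__()
        self.rows = []
        self._row = None       # cells of the row currently being read
        self._in_cell = False  # True while inside a <td>

    def handle_starttag(self, tag, attrs):
        if tag == "tr":
            self._row = []
        elif tag == "td" and self._row is not None:
            self._in_cell = True
            self._row.append("")

    def handle_endtag(self, tag):
        if tag == "td":
            self._in_cell = False
        elif tag == "tr" and self._row is not None:
            if self._row:  # header rows contain only <th>, so they stay empty
                self.rows.append(self._row)
            self._row = None

    def handle_data(self, data):
        if self._in_cell and self._row:
            self._row[-1] += data.strip()


def parse_rows(html_text):
    """Return a list of rows, each a list of cell strings."""
    parser = TableRowParser()
    parser.feed(html_text)
    return parser.rows


# Made-up sample markup, only to show the shape of the result.
SAMPLE_HTML = (
    '<table class="m-table"><thead><tr><th>No.</th><th>Code</th></tr></thead>'
    '<tbody><tr><td>1</td><td><a href="#">000001</a></td><td>Sample Co</td></tr>'
    "</tbody></table>"
)
```

Calling `parse_rows(SAMPLE_HTML)` yields one row of three cell strings; the header row is skipped because it has no `<td>` cells.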
The code needs the following common Python libraries:
pip install requests pandas beautifulsoup4 lxml tqdm
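The amounts shown on the page usually carry Chinese units, for example 1.08亿 (亿 = 100 million) or 9896.31万 (万 = 10 thousand). That is convenient for people to read, but Excel cannot sum, average, or sort such values. We need two conversion functions that turn them into plain floating-point numbers: clean_amount for amounts, shown here, and clean_percent for percentages, which appears in the full source at the end.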
import re


def clean_amount(val_str):
    """Convert an amount such as '1.08亿' or '9896.31万' to a float in yuan."""
    if not val_str:
        return 0.0
    val_str = val_str.strip().replace(",", "")
    # The site shows '-' or '--' for missing values.
    if val_str in ("-", "--"):
        return 0.0
    match = re.match(r"^([+-]?\d+(?:\.\d+)?)(亿|万)?$", val_str)
    if not match:
        try:
            return float(val_str)
        except ValueError:
            return 0.0
    num_part, unit = match.groups()
    num = float(num_part)
    if unit == "亿":
        return num * 100_000_000.0
    elif unit == "万":
        return num * 10_000.0
    return num
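The second conversion function handles the percentage columns (percentage change and turnover rate). Below is a minimal, self-contained sketch that follows the same logic as clean_percent in the full source; the sample inputs in the usage note are invented for illustration.

```python
def clean_percent(val_str):
    """Convert a percentage string such as '3.25%' to the float 3.25.

    Empty strings, '-' and '--' (missing values) and unparsable text
    all map to 0.0, matching the behavior of clean_amount.
    """
    if not val_str:
        return 0.0
    val_str = val_str.strip()
    if val_str in ("-", "--"):
        return 0.0
    # Drop a trailing percent sign before converting.
    if val_str.endswith("%"):
        val_str = val_str[:-1]
    try:
        return float(val_str)
    except ValueError:
        return 0.0
```

For example, `clean_percent("-1.5%")` gives `-1.5` and `clean_percent("--")` gives `0.0`. Note that mapping missing values to 0.0 makes them indistinguishable from a genuine zero; returning `None` instead would be an alternative design if that distinction matters for your analysis.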
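The number of listed stocks keeps changing, so the total page count changes too. We fetch page 1 first and use a regular expression to extract the last page number (for example, 104) from the pagination control at the bottom of the page, which reads 1/104. The loop over the remaining pages then adapts to that number.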
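The page-count extraction can be sketched as a small helper. The function name `extract_total_pages` and its `default` parameter are hypothetical, and the pattern here is slightly stricter than the one in the full source (it requires digits on both sides of the slash); treat it as one possible variant.

```python
import re


def extract_total_pages(page_info_text, default=1):
    """Return the total page count from text like '1/104'.

    Falls back to `default` when the text is empty or does not
    contain a 'current/total' pair.
    """
    match = re.search(r"(\d+)\s*/\s*(\d+)", page_info_text or "")
    if not match:
        return default
    return int(match.group(2))
```

With the pagination text from the example above, `extract_total_pages("1/104")` returns `104`. Passing an explicit `default` lets the caller decide what happens when the control is missing, for instance scraping only the first page.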
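To reduce the risk of a temporary IP block by Tonghuashun, we adopt the following measures:
- Pick a random browser User-Agent for the request headers;
- Sleep for a random 1.2 to 2.5 seconds between page requests to mimic a human user.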
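The two measures above can be sketched as follows. The helper names `random_headers` and `polite_sleep` are hypothetical, and the User-Agent list is shortened to two entries; the `sleep` parameter exists only so the pause can be replaced in tests.

```python
import random
import time

# Shortened sample list; the full source uses five entries.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
]


def random_headers():
    """Build request headers with a randomly chosen User-Agent."""
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Referer": "https://data.10jqka.com.cn/funds/ggzjl/",
    }


def polite_sleep(low=1.2, high=2.5, sleep=time.sleep):
    """Pause for a random duration in [low, high] seconds and return it."""
    delay = random.uniform(low, high)
    sleep(delay)
    return delay
```

In a scraping loop you would call `polite_sleep()` before each request and pass `random_headers()` as the request headers. Randomizing the delay avoids the perfectly regular request rhythm that a fixed sleep would produce.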
Appendix: source code
import datetime
import os
import random
import re
import sys
import time

import pandas as pd
import requests
import urllib3
from bs4 import BeautifulSoup
from tqdm import tqdm

# Requests below are sent with verify=False, so silence the resulting warning.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
]


def clean_amount(val_str):
    """Convert an amount such as '1.08亿' or '9896.31万' to a float in yuan."""
    if not val_str:
        return 0.0
    val_str = val_str.strip().replace(",", "")
    if val_str in ("-", "--"):
        return 0.0
    match = re.match(r"^([+-]?\d+(?:\.\d+)?)(亿|万)?$", val_str)
    if not match:
        try:
            return float(val_str)
        except ValueError:
            return 0.0
    num_part, unit = match.groups()
    num = float(num_part)
    if unit == "亿":
        return num * 100_000_000.0
    elif unit == "万":
        return num * 10_000.0
    return num


def clean_percent(val_str):
    """Convert a percentage such as '3.25%' to the float 3.25."""
    if not val_str:
        return 0.0
    val_str = val_str.strip()
    if val_str in ("-", "--"):
        return 0.0
    if val_str.endswith("%"):
        val_str = val_str[:-1]
    try:
        return float(val_str)
    except ValueError:
        return 0.0


def scrape_page(page_num, sort_field="zdf", sort_order="desc"):
    """Fetch one page and return its HTML, or None after three failed attempts."""
    url = f"https://data.10jqka.com.cn/funds/ggzjl/field/{sort_field}/order/{sort_order}/page/{page_num}/"
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Referer": "https://data.10jqka.com.cn/funds/ggzjl/",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    for attempt in range(3):
        try:
            # Random pause before every request to mimic a human user.
            time.sleep(random.uniform(1.2, 2.5))
            response = requests.get(url, headers=headers, timeout=15, verify=False)
            if response.status_code == 200:
                # These two markers identify the anti-scraping redirect page.
                if "chameleon" in response.text and "window.location.href" in response.text:
                    print(f"\n[WARN] Page {page_num} was blocked by anti-scraping, retry {attempt + 1}...")
                    time.sleep(random.uniform(3, 5))
                    continue
                response.encoding = "gbk"
                return response.text
            else:
                print(f"\n[ERROR] Request for page {page_num} failed, status code: {response.status_code}, retrying...")
                time.sleep(random.uniform(2, 4))
        except Exception as e:
            print(f"\n[EXCEPTION] Request for page {page_num} raised: {e}, retrying...")
            time.sleep(random.uniform(2, 4))
    return None


def parse_html(html_content):
    """Parse the fund flow table into a list of row dictionaries."""
    if not html_content:
        return []
    soup = BeautifulSoup(html_content, "lxml")
    table = soup.find("table", class_="m-table")
    if not table:
        return []
    tbody = table.find("tbody")
    if not tbody:
        return []

    data_list = []
    for row in tbody.find_all("tr"):
        tds = row.find_all("td")
        if len(tds) < 10:
            continue
        rank = tds[0].text.strip()
        code = tds[1].text.strip()
        name = tds[2].text.strip()
        price = tds[3].text.strip()
        change_pct_str = tds[4].text.strip()
        turnover_pct_str = tds[5].text.strip()
        flowin_str = tds[6].text.strip()
        flowout_str = tds[7].text.strip()
        net_str = tds[8].text.strip()
        amount_str = tds[9].text.strip()

        try:
            price_val = float(price) if price and price != "-" else 0.0
        except ValueError:
            price_val = 0.0

        # Keep the raw text next to each numeric value for cross-checking.
        data_list.append({
            "rank": int(rank) if rank.isdigit() else rank,
            "code": code,
            "name": name,
            "price": price_val,
            "change_pct_raw": change_pct_str,
            "change_pct": clean_percent(change_pct_str),
            "turnover_rate_raw": turnover_pct_str,
            "turnover_rate_pct": clean_percent(turnover_pct_str),
            "inflow_raw": flowin_str,
            "inflow_yuan": clean_amount(flowin_str),
            "outflow_raw": flowout_str,
            "outflow_yuan": clean_amount(flowout_str),
            "net_raw": net_str,
            "net_yuan": clean_amount(net_str),
            "amount_raw": amount_str,
            "amount_yuan": clean_amount(amount_str),
        })
    return data_list


def main():
    print("=" * 60)
    print(" Tonghuashun per-stock fund flow scraper")
    print("=" * 60)

    print("[1/3] Connecting and fetching the first page...")
    first_page_html = scrape_page(1)
    if not first_page_html:
        print("[ERROR] Could not reach the Tonghuashun data center. Check your network or try again later.")
        return

    # Read the total page count from the pagination control (text like "1/104").
    soup = BeautifulSoup(first_page_html, "lxml")
    page_info = soup.find("span", class_="page_info")
    total_pages = 104  # fallback if the pagination control is not found
    if page_info:
        match = re.search(r"/(\d+)", page_info.text)
        if match:
            total_pages = int(match.group(1))
    print(f"First page fetched. Total pages detected: {total_pages}")

    # Page count comes from the command line if valid, otherwise from a prompt.
    pages_to_scrape = None
    if len(sys.argv) > 1:
        try:
            pages_to_scrape = min(max(1, int(sys.argv[1])), total_pages)
            print(f"[INFO] Command-line argument detected; scraping the first {pages_to_scrape} pages")
        except ValueError:
            print(f"[INFO] Command-line argument '{sys.argv[1]}' is invalid; asking for the page count interactively")
    if pages_to_scrape is None:
        user_pages = input(
            f"Number of pages to scrape (1-{total_pages}, press Enter for all {total_pages}): "
        ).strip()
        if user_pages:
            try:
                pages_to_scrape = min(max(1, int(user_pages)), total_pages)
            except ValueError:
                print(f"[INFO] Invalid input; scraping all {total_pages} pages")
                pages_to_scrape = total_pages
        else:
            pages_to_scrape = total_pages

    print(f"\n[2/3] Scraping {pages_to_scrape} pages...")
    all_data = []
    first_page_data = parse_html(first_page_html)
    all_data.extend(first_page_data)
    print(f"Page 1/{pages_to_scrape} parsed, {len(first_page_data)} records")

    if pages_to_scrape > 1:
        pbar = tqdm(range(2, pages_to_scrape + 1), desc="Progress")
        for page_num in pbar:
            pbar.set_description(f"Scraping page {page_num}")
            html = scrape_page(page_num)
            if html:
                page_data = parse_html(html)
                all_data.extend(page_data)
                pbar.set_postfix({"rows on page": len(page_data), "total rows": len(all_data)})
            else:
                print(f"\n[WARN] Page {page_num} could not be fetched and was skipped")

    if not all_data:
        print("[ERROR] No data was scraped.")
        return

    print(f"\n[3/3] Scraping finished with {len(all_data)} records. Saving to CSV...")
    df = pd.DataFrame(all_data)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_filename = f"ths_stock_fund_flow_{timestamp}.csv"
    try:
        # utf-8-sig adds a BOM so that Excel detects the encoding correctly.
        df.to_csv(csv_filename, index=False, encoding="utf-8-sig")
        print(f"CSV saved. File name: {csv_filename}")
        print(f"Full path: {os.path.abspath(csv_filename)}")
    except Exception as e:
        print(f"[ERROR] Failed to save CSV: {e}")
    print("\nDone.")


if __name__ == "__main__":
    main()