背景
使用AkShare拉取A股行情数据时,底层是对东方财富、新浪、腾讯等站点的HTTP请求。高频采集极易触发目标站点的IP限速或封禁,表现为连接超时、返回空数据、或者直接断开连接。
本文记录如何将快代理(KuaiDaiLi)的短期代理IP接入数据采集服务,实现自动轮换、TTL管理与失败剔除。
整体架构
┌─────────────────┐ │ AkShare 调用 │ └────────┬────────┘ │ ┌────────▼────────┐ │ try_with_proxy │ ← 统一入口 └────────┬────────┘ │ ┌────────▼────────┐ │ KdlProxyPool │ │ ┌─────────────┐ │ │ │ IP₁ IP₂ IP₃ │ │ │ │ IP₄ IP₅ ... │ │ │ └─────────────┘ │ │ TTL=120s 自动刷新 │ │ Round-Robin 轮转 │ └─────────────────┘
一、快代理 API 集成
1.1 凭证与配置
快代理提供 REST API 获取短期代理 IP。所需凭证通过环境变量注入:
KDL_SECRET_ID=<your_secret_id> # API 认证 IDKDL_SECRET_KEY=<your_secret_key> # API 签名密钥KDL_ORDER_ID=<your_order_id> # 代理订单号KDL_USER=<your_username> # 代理用户名(用于 HTTP 认证)KDL_PASS=<your_password> # 代理密码
凭证加载逻辑(config.py):
@dataclassclassKdlConfig: secret_id: str secret_key: str order_id: str username: str password: strdefload_kdl_config() -> KdlConfig | None: vals = {"secret_id": os.environ.get("KDL_SECRET_ID", "").strip(),"secret_key": os.environ.get("KDL_SECRET_KEY", "").strip(),"order_id": os.environ.get("KDL_ORDER_ID", "").strip(),"username": os.environ.get("KDL_USER", "").strip(),"password": os.environ.get("KDL_PASS", "").strip(), }if all(vals.values()):return KdlConfig(**vals)return None# 凭证不完整则禁用代理池
1.2 代理 IP 获取
调用快代理的getdps接口获取代理IP列表:
def_fetch_proxies(self, num: int = 5) -> list[dict]: params = {"secret_id": self._secret_id,"signature": self._secret_key,"orderid": self._order_id,"num": num, # 一次获取的 IP 数量"format": "json", # 响应格式"sep": "1", # 分隔模式"pt": "1", # 协议类型 } resp = requests.get("https://dps.kdlapi.com/api/getdps/", params=params, timeout=10, proxies={"http": None, "https": None}, # 获取代理时不走代理 )
返回的原始IP格式为ip:port,拼接认证信息后变为:
http://<username>:<password>@<ip>:<port>
这个URL可直接作为requests的proxies参数使用。
二、代理池实现(KdlProxyPool)
2.1 核心数据结构
classKdlProxyPool:def__init__(self, ..., ttl: int = 120): self._lock = threading.Lock() # 线程安全锁 self._proxies: list[dict] = [] # 当前可用的代理列表 self._index: int = 0# Round-Robin 轮转索引 self._fetched_at: float = 0.0# 上次获取时间戳 self._ttl = ttl # 代理有效期(秒)
2.2 自动刷新机制
快代理的短期IP有TTL(默认120秒),过期后不可用。_ensure_proxies在每次取IP时检查是否需要刷新:
def_ensure_proxies(self, num: int = 5) -> None:"""Must hold self._lock."""# 仍有未过期的代理则跳过if self._proxies and time.time() - self._fetched_at < self._ttl - 10:return# TTL 即将到期,获取新的一批 fresh = self._fetch_proxies(num)if fresh: self._proxies = fresh self._index = 0 self._fetched_at = time.time()
提前10秒刷新(self._ttl - 10),避免在请求过程中代理刚好过期。
2.3 Round-Robin轮转
defget_proxy(self) -> str | None:with self._lock: self._ensure_proxies(num=1)ifnot self._proxies:returnNone proxy = self._proxies[self._index % len(self._proxies)] self._index += 1return proxy["url"]
每次调用get_proxy() 返回下一个代理IP,实现均匀轮转,避免单个IP被过度使用。
2.4 失败代理剔除
defmark_bad(self, proxy_url: str) -> None:with self._lock: self._proxies = [p for p in self._proxies if p["url"] != proxy_url]ifnot self._proxies: self._fetched_at = 0.0# 池为空,强制下次get_proxy时刷新
请求失败的代理被立即移除。当池被耗尽时,设置_fetched_at = 0.0触发强制刷新,从快代理API获取全新的一批IP。
三、统一请求入口(try_with_proxy)
这是所有AkShare调用的统一封装,每次请求都从代理池中取一个IP,失败则换下一个,直到成功或超出重试次数。
3.1 代理轮换逻辑
deftry_with_proxy(fetch_fn, allow_empty=True, max_kdl_retries=50): pool = get_proxy_pool()if pool is None:return Nonefor attempt in range(max_kdl_retries): proxy_url = pool.get_proxy()ifnot proxy_url: time.sleep(1)continuetry: set_thread_proxy(proxy_url) result = fetch_fn() set_thread_proxy(None)# allow_empty=False时,空DataFrame视为失败,换下一个代理ifnot allow_empty and (result is None or result.empty): pool.mark_bad(proxy_url)continuereturn resultexcept Exception: set_thread_proxy(None) pool.mark_bad(proxy_url)continuereturn None# 超出重试次数
关键设计点:
| |
|---|
allow_empty=False | 将返回空 DataFrame 也视为失败,触发换代理重试 |
| |
mark_bad | |
| get_proxy() |
3.2 调用示例
# 拉取板块列表 — 不允许空结果df = proxy.try_with_proxy(lambda: ak.stock_board_industry_name_em(), allow_empty=False,)# 拉取K线数据 — 不允许空结果df = proxy.try_with_proxy(lambda: ak.stock_zh_a_hist( symbol="000001", period="daily", start_date="20250101", end_date="20250501", adjust="qfq", ), allow_empty=False,)
四、代理注入:Monkey-Patch requests
快代理是HTTP代理,需要注入到requests.Session 中。但AkShare内部自行创建requests.Session,无法从外部直接传入proxies参数。
解决方案是monkey-patch requests.Session的两个关键方法,使set_thread_proxy设置的代理能自动注入到AkShare发出的每一个请求中,无需改动任何 AkShare 调用代码。
4.1 实现
_thread_local = threading.local()# 保存原始方法_original_merge = requests.Session.merge_environment_settings_original_send = requests.Session.senddef_patched_merge(self, url, proxies, stream, verify, cert): settings = _original_merge(self, url, proxies, stream, verify, cert) thread_proxy = _get_thread_proxy()if thread_proxy: settings["proxies"] = {"http": thread_proxy, "https": thread_proxy} settings["verify"] = False# HTTP 代理可能 MITM HTTPS 流量return settingsdef_patched_send(self, request, **kwargs): timeout = kwargs.get("timeout") or30if _get_thread_proxy(): timeout = max(timeout, 30) # 代理请求最低超时 30 秒 kwargs["timeout"] = timeout request.headers["Connection"] = "close"# 避免连接复用残留上一次代理状态return _original_send(self, request, **kwargs)# 全局替换requests.Session.merge_environment_settings = _patched_mergerequests.Session.send = _patched_send
4.2 工作原理
try_with_proxy │ ├── set_thread_proxy(proxy_url) # 绑定代理到当前线程 │ ├── fetch_fn() # AkShare 内部调用 │ └── requests.Session.send() │ └── _patched_merge() │ └── 读取 thread_local.proxy,注入到 settings["proxies"] │ └── set_thread_proxy(None) # 清除绑定
thread_local.proxy存储于线程本地,互不干扰。
五、请求全生命周期
以拉取 K 线数据为例,一次请求的完整链路:
try_with_proxy(ak.stock_zh_a_hist) │ ├── pool.get_proxy() 从代理池取下一个 IP(Round-Robin) ├── set_thread_proxy(url) 绑定到当前线程 ├── fetch_fn() AkShare发起请求 │ └── _patched_merge() 自动注入代理到 requests │ └── HTTP via 快代理 IP → 东方财富服务器 ├── set_thread_proxy(None) 清除绑定 │ ├── 成功 → return result └── 失败 → pool.mark_bad() 剔除坏代理,取下一个继续重试
六、实际效果与结论
以上方案在实验环境中使用了快代理的免费套餐进行测试。从实际运行日志来看,失败率相当高——连续多个代理IP均以ProxyError或IncompleteRead告终,单次任务需要经历较多次方能成功:
09:30:08 快代理 58.19.54.**:13964 失败 ProxyError09:30:35 快代理 58.52.171.**:22998 失败 ProxyError09:30:47 快代理 113.2.155.**:17387 失败 ProxyError09:30:54 快代理 58.19.55.**:16426 失败 ProxyError09:31:00 快代理 111.170.157.**:19112 失败 ProxyError09:31:12 快代理 106.119.250.**:17326 失败 ProxyError09:31:20 快代理 61.184.8.**:15503 失败 ProxyError09:31:27 快代理 218.95.37.**:16475 失败 IncompleteRead09:31:46 快代理 183.128.197.**:20547 失败 ProxyError
失败原因主要有两类:
| |
|---|
ProxyError: RemoteDisconnected | |
IncompleteRead | |
可能是因为免费套餐的IP质量与付费套餐存在较大差异:IP存活率低、延迟高、被目标站点封禁的概率也更大。不过即使付费套餐相对于好些,IP代理池的价格仍然有些高,因此最终放弃了这个方案。