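★ Request headers, proxy pools, and Playwright anti-detection, taken apart layer by layer
Background: why your scraper keeps getting blocked
Many scrapers fail not because the code is wrong but because they never get past the anti-bot layer. Everything runs fine locally, then on a server you hit 403s, 429s, CAPTCHAs, redirects, and blank pages. The cause is usually not a single check but a whole set of risk-control mechanisms working together.
What makes it worse is that anti-bot approaches differ widely between sites. Some only look at the UA, some add JS fingerprinting, and some examine the behavioral trail of your whole session. If you don't know which layer is stopping you, every change is a guess.
I've collected the pitfalls I ran into in this handbook, ordered from cheapest to most expensive: first the basic request-level problems, then rate and IP, and finally JS fingerprinting and behavioral detection. Treat it as a troubleshooting checklist: wherever you're blocked, scroll to the matching section.
First, get the layers straight
Layer 1: request headers
Start by checking what your own request headers look like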
```python
import requests

# Send a request with no custom headers and inspect what actually went out.
r = requests.get('https://example.com/product/123')
print(dict(r.request.headers))
```
If you see output like this:
```
{
  "User-Agent": "python-requests/2.28.1",
  "Accept-Encoding": "gzip, deflate",
  "Accept": "*/*",
  "Connection": "keep-alive"
}
```
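then the request will most likely be blocked outright. Strings such as python-requests, curl, Go-http-client, and Scrapy sit on common User-Agent blocklists, so a match returns 403 immediately, with no rate analysis needed.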
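To make the check above repeatable, here is a minimal sketch of a local self-check that flags headers resembling an HTTP library's defaults. The helper name `header_red_flags` and the marker list are assumptions for illustration; real sites use their own, usually broader, rules.

```python
# Hypothetical helper: markers taken from the tool names mentioned above.
BOT_UA_MARKERS = ("python-requests", "curl", "go-http-client", "scrapy")


def header_red_flags(headers):
    """Return a list of reasons these headers might look automated."""
    lowered = {k.lower(): v for k, v in headers.items()}
    flags = []
    ua = lowered.get("user-agent", "").lower()
    if not ua:
        flags.append("missing User-Agent")
    for marker in BOT_UA_MARKERS:
        if marker in ua:
            flags.append(f"User-Agent contains '{marker}'")
    # Browsers send a detailed Accept value on navigation, not a bare wildcard.
    if lowered.get("accept", "*/*") == "*/*":
        flags.append("Accept is missing or the bare wildcard */*")
    if "accept-language" not in lowered:
        flags.append("missing Accept-Language")
    return flags


default_headers = {
    "User-Agent": "python-requests/2.28.1",
    "Accept-Encoding": "gzip, deflate",
    "Accept": "*/*",
    "Connection": "keep-alive",
}
for reason in header_red_flags(default_headers):
    print(reason)
```

Running it against `dict(r.request.headers)` before a crawl gives a quick sanity check; an empty list only means these particular red flags are absent, not that the request will pass.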
Field-by-field reference table
| Field | Browser-like value | `requests` default |
|---|---|---|
| User-Agent | | python-requests/x.x.x |
| Accept | | */* |
| Accept-Language | zh-CN,zh;q=0.9,en;q=0.8 | |
| Referer | | |
| Sec-Fetch-Dest | document | |
| Sec-Fetch-Mode | navigate | |
| Sec-Fetch-Site | same-origin | |
Complete header-building function
```python
import random

UA_LIST = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.6045.160 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_6_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
]


def build_headers(referer=None, is_ajax=False):
    """Return browser-like headers for a page navigation or an AJAX call."""
    # Note: the Accept value for navigations below is Chrome-style, while the
    # Firefox and Safari UAs in UA_LIST send a slightly different one.
    ua = random.choice(UA_LIST)
    if is_ajax:
        return {
            "User-Agent": ua,
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            # "br" requires the brotli package so requests/urllib3 can decode it;
            # drop it if that package is not installed.
            "Accept-Encoding": "gzip, deflate, br",
            "Content-Type": "application/json;charset=UTF-8",
            "Connection": "keep-alive",
            "Referer": referer or "https://example.com/",
            "X-Requested-With": "XMLHttpRequest",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
        }
    return {
        "User-Agent": ua,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Referer": referer or "https://example.com/",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "same-origin",
        "Sec-Fetch-User": "?1",
        "Cache-Control": "max-age=0",
    }
```
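★ The value of Sec-Fetch-Site must stay consistent with Referer. Use same-origin for navigation within the site, cross-site when arriving from a search engine, and none for direct visits. A mismatched value is itself a giveaway.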
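The rule above can be sketched as a small function that derives Sec-Fetch-Site from the target URL and the Referer. The name `sec_fetch_site` is hypothetical, and the same-site check is a deliberate simplification: it compares the last two host labels, whereas browsers use the Public Suffix List, so it will misjudge domains such as `example.co.uk`.

```python
from urllib.parse import urlsplit

DEFAULT_PORTS = {"http": 80, "https": 443}


def _origin(url):
    """(scheme, host, port) triple that defines an origin."""
    parts = urlsplit(url)
    return (parts.scheme, parts.hostname, parts.port or DEFAULT_PORTS.get(parts.scheme))


def _naive_site(url):
    """ASSUMPTION: treat the last two host labels as the registrable domain."""
    parts = urlsplit(url)
    host = parts.hostname or ""
    return (parts.scheme, ".".join(host.split(".")[-2:]))


def sec_fetch_site(target_url, referer=None):
    """Pick a Sec-Fetch-Site value consistent with the Referer being sent."""
    if not referer:
        return "none"          # direct visit: no referring page
    if _origin(referer) == _origin(target_url):
        return "same-origin"   # in-site navigation
    if _naive_site(referer) == _naive_site(target_url):
        return "same-site"     # e.g. a subdomain of the same site
    return "cross-site"        # e.g. arriving from a search engine


print(sec_fetch_site("https://example.com/product/123", "https://example.com/"))
print(sec_fetch_site("https://example.com/product/123", "https://www.google.com/search?q=x"))
print(sec_fetch_site("https://example.com/product/123"))
```

One way to use it is to overwrite the `"Sec-Fetch-Site"` entry of the dict returned by `build_headers` whenever the Referer is not an in-site page, and to omit the Referer header entirely when the value is `none`.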
Layer 2: rate control
Random delays
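```python
import time
import random


def human_delay(min_s=1.5, max_s=4.0):
    """Sleep for a random interval; about 5% of calls add a long pause."""
    delay = random.uniform(min_s, max_s)
    if random.random() < 0.05:
        delay += random.uniform(15, 45)
    time.sleep(delay)


def batch_delay():
    """Take a longer rest between batches."""
    time.sleep(random.uniform(30, 90))
```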
Usage:
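```python
# Assumes `targets` (a list of IDs) and `parse_and_save` are defined elsewhere.
session = requests.Session()

for i, target_id in enumerate(targets):
    r = session.get(
        f"https://example.com/product/{target_id}",
        headers=build_headers(),
    )
    parse_and_save(r.text)
    human_delay()
    # Rest for longer after every 50 requests.
    if (i + 1) % 50 == 0:
        print(f"[batch pause] processed {i+1} items")
        batch_delay()
```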
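Before starting a large job it helps to know what these delay settings cost in throughput. The sketch below computes the expected wait per request from the means of the uniform distributions used in `human_delay` and `batch_delay`; the function name is hypothetical, and network time is not included.

```python
def expected_seconds_per_request(min_s=1.5, max_s=4.0, long_pause_p=0.05,
                                 long_pause=(15, 45), batch_pause=(30, 90),
                                 batch_size=50):
    """Average sleep per request implied by the delay parameters."""
    base = (min_s + max_s) / 2                 # mean of uniform(1.5, 4.0) = 2.75
    extra = long_pause_p * sum(long_pause) / 2  # 5% chance of a ~30 s pause = 1.5
    batch = sum(batch_pause) / 2 / batch_size   # ~60 s spread over 50 requests = 1.2
    return base + extra + batch


per_request = expected_seconds_per_request()
print(f"{per_request:.2f} s per request, about {3600 / per_request:.0f} requests per hour")
```

With the defaults this comes to roughly 5.45 seconds of sleeping per request, or on the order of 660 requests per hour per worker, which is a useful number to compare against the size of the target list.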
Shuffle the request order
```python
targets_shuffled = targets.copy()
random.shuffle(targets_shuffled)
```
Layer 3: IP strategy
Choosing a proxy type
Proxy management class
```python
import random
from dataclasses import dataclass

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


@dataclass
class ProxyStats:
    url: str
    success: int = 0
    fail: int = 0
    banned: bool = False

    @property
    def fail_rate(self):
        total = self.success + self.fail
        return self.fail / total if total > 0 else 0.0


class ProxyPool:
    def __init__(self, proxy_urls: list, rotate_every: tuple = (15, 25)):
        self.proxies = [ProxyStats(url=url) for url in proxy_urls]
        self.rotate_every = rotate_every
        self._index = 0
        self._request_count = 0
        self._next_rotate = random.randint(*rotate_every)

    def _get_available(self):
        available = [p for p in self.proxies if not p.banned and p.fail_rate < 0.5]
        if not available:
            # Every proxy is banned or failing: lift the bans and reuse the full list.
            for p in self.proxies:
                p.banned = False
            available = self.proxies
        return available

    def current(self):
        available = self._get_available()
        if self._index >= len(available):
            self._index = 0
        return available[self._index]

    def rotate(self):
        available = self._get_available()
        self._index = (self._index + 1) % max(len(available), 1)
        # Schedule the next rotation a random number of requests from now.
        self._next_rotate = self._request_count + random.randint(*self.rotate_every)
        return self.current()

    def mark_success(self):
        self.current().success += 1
        self._request_count += 1
        if self._request_count >= self._next_rotate:
            self.rotate()

    def mark_fail(self, ban=False):
        proxy = self.current()
        proxy.fail += 1
        if ban:
            proxy.banned = True
        self.rotate()

    def make_session(self):
        proxy = self.current()
        s = requests.Session()
        retry = Retry(
            total=3,
            backoff_factor=2,
            # ASSUMPTION: the original status list was lost in extraction;
            # common transient server errors are used here as a placeholder.
            status_forcelist=[500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry)
        s.mount("http://", adapter)
        s.mount("https://", adapter)
        s.proxies = {"http": proxy.url, "https": proxy.url}
        return s
```
Complete request loop
```python
# is_captcha_page() is defined in Layer 4 below.
pool = ProxyPool([
    "http://user:pass@residential1.example.com:8080",
    "http://user:pass@residential2.example.com:8080",
])
failed_ids = []
success_count = 0

for target_id in targets_shuffled:
    session = pool.make_session()
    try:
        r = session.get(
            f"https://example.com/product/{target_id}",
            headers=build_headers(),
            timeout=15,
        )
        if r.status_code == 200:
            if is_captcha_page(r.text, r.url):
                # A 200 that is really a CAPTCHA: treat the proxy as burned.
                pool.mark_fail(ban=True)
                failed_ids.append(target_id)
                time.sleep(random.uniform(15, 30))
                continue
            parse_and_save(r.text)
            pool.mark_success()
            success_count += 1
        elif r.status_code == 403:
            pool.mark_fail(ban=True)
            failed_ids.append(target_id)
            time.sleep(random.uniform(8, 20))
            continue
        elif r.status_code == 429:
            # Rate limited: record the ID so it can be retried later, then back off.
            # (The original skipped the ID here without recording it.)
            failed_ids.append(target_id)
            time.sleep(random.uniform(45, 90))
            continue
        else:
            failed_ids.append(target_id)
    except requests.exceptions.ProxyError:
        pool.mark_fail(ban=True)
        failed_ids.append(target_id)
    except requests.exceptions.Timeout:
        failed_ids.append(target_id)
    except Exception:
        failed_ids.append(target_id)
    human_delay()
```
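The loop above sleeps a random 45 to 90 seconds on a 429. Some servers also send a `Retry-After` header with the 429, given either as a number of seconds or as an HTTP date, and honoring it is one possible refinement. The helper below is a sketch; `retry_after_seconds` and its `default` fallback are assumptions, not part of the original loop.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_after_seconds(header_value, default=60.0, now=None):
    """Convert a Retry-After header value into a number of seconds to wait."""
    if not header_value:
        return default
    value = header_value.strip()
    if value.isdigit():
        # Form 1: delay in seconds, e.g. "120"
        return float(value)
    try:
        # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max((when - now).total_seconds(), 0.0)


print(retry_after_seconds("120"))
print(retry_after_seconds(None))
```

In the 429 branch this could replace the fixed range with something like `time.sleep(retry_after_seconds(r.headers.get("Retry-After")) + random.uniform(1, 5))`, keeping a little jitter on top of the server's hint.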
Layer 4: CAPTCHA detection
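```python
CAPTCHA_KEYWORDS = [
    "captcha", "验证码", "滑块", "slider", "human verification",
    "security check", "bot detection", "turnstile", "recaptcha", "geetest",
]


def is_captcha_page(html: str, url: str = "") -> bool:
    """Heuristically decide whether a response is a CAPTCHA or challenge page."""
    html_lower = html.lower()
    # 1. The final URL itself points at a challenge endpoint.
    if any(k in url.lower() for k in ["captcha", "verify", "challenge"]):
        return True
    # 2. The body mentions a known CAPTCHA keyword or vendor.
    if any(k in html_lower for k in CAPTCHA_KEYWORDS):
        return True
    # 3. A near-empty HTML shell usually means a JS challenge page.
    if len(html.strip()) < 200 and "<html" in html_lower:
        return True
    return False
```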
Layer 5: JS fingerprinting (Playwright)
Canvas fingerprint
```python
CANVAS_PATCH = r"""
(() => {
  const origToDataURL = HTMLCanvasElement.prototype.toDataURL;
  const origToBlob = HTMLCanvasElement.prototype.toBlob;

  // Flip the lowest bit of the red channel in the top-left 16x16 region.
  // Note: this modifies the canvas itself on every export call.
  function perturb(canvas) {
    const ctx = canvas.getContext('2d');
    if (!ctx) return;  // non-2D canvases (e.g. WebGL) are left untouched
    try {
      const w = Math.min(canvas.width, 16);
      const h = Math.min(canvas.height, 16);
      if (w === 0 || h === 0) return;
      const img = ctx.getImageData(0, 0, w, h);
      for (let i = 0; i < img.data.length; i += 4) {
        img.data[i] ^= 1;
      }
      ctx.putImageData(img, 0, 0);
    } catch (e) {}
  }

  HTMLCanvasElement.prototype.toDataURL = function(...args) {
    perturb(this);
    return origToDataURL.apply(this, args);
  };
  HTMLCanvasElement.prototype.toBlob = function(callback, ...args) {
    perturb(this);
    return origToBlob.call(this, callback, ...args);
  };
})();
"""
```
WebGL fingerprint
```python
WEBGL_PATCH = r"""
(() => {
  // Only WebGLRenderingContext is patched here; WebGL2 contexts are not covered.
  const getParameter = WebGLRenderingContext.prototype.getParameter;
  WebGLRenderingContext.prototype.getParameter = function(parameter) {
    const ext = this.getExtension('WEBGL_debug_renderer_info');
    if (ext) {
      // Report a common consumer GPU instead of the real vendor/renderer.
      if (parameter === ext.UNMASKED_VENDOR_WEBGL) return 'Intel Inc.';
      if (parameter === ext.UNMASKED_RENDERER_WEBGL) return 'Intel Iris OpenGL Engine';
    }
    return getParameter.call(this, parameter);
  };
})();
"""
```
Navigator spoofing
```python
NAVIGATOR_PATCH = r"""
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh', 'en-US'] });
Object.defineProperty(navigator, 'platform', { get: () => 'Win32' });
Object.defineProperty(navigator, 'hardwareConcurrency', { get: () => 8 });
Object.defineProperty(navigator, 'deviceMemory', { get: () => 8 });
"""
```
Assembling the full Playwright setup
```python
import asyncio
import random

from playwright.async_api import async_playwright


async def create_stealth_context(p):
    browser = await p.chromium.launch(
        headless=True,
        args=["--disable-blink-features=AutomationControlled", "--lang=zh-CN"],
    )
    context = await browser.new_context(
        viewport={"width": 1366, "height": 768},
        locale="zh-CN",
        timezone_id="Asia/Shanghai",
    )
    # Init scripts run before any page script in every new page of this context.
    await context.add_init_script(CANVAS_PATCH)
    await context.add_init_script(WEBGL_PATCH)
    await context.add_init_script(NAVIGATOR_PATCH)
    return browser, context


async def main():
    async with async_playwright() as p:
        browser, context = await create_stealth_context(p)
        page = await context.new_page()
        # Land on the home page first, as a real visitor would.
        await page.goto("https://example.com/", wait_until="domcontentloaded")
        await page.wait_for_timeout(random.randint(2000, 4000))
        for target_id in targets_shuffled:
            await page.goto(
                f"https://example.com/product/{target_id}",
                wait_until="domcontentloaded",
            )
            await page.wait_for_timeout(random.randint(1500, 3500))
            html = await page.content()
            if not is_captcha_page(html, page.url):
                parse_and_save(html)
            else:
                failed_ids.append(target_id)
            # human_delay() calls time.sleep, which would block the event loop;
            # run it in a worker thread instead (asyncio.to_thread needs Python 3.9+).
            await asyncio.to_thread(human_delay)
        await browser.close()


asyncio.run(main())
```
Layer 6: behavioral traces
Bézier-curve mouse movement
```python
import asyncio
import math
import random


async def human_move(page, start: tuple, end: tuple):
    """Move the mouse from start to end along a jittered quadratic Bézier curve."""
    x1, y1 = start
    x2, y2 = end
    dist = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
    steps = max(int(dist / 8), 10)
    # Random control point near the midpoint bends the path.
    cx = (x1 + x2) / 2 + random.randint(-100, 100)
    cy = (y1 + y2) / 2 + random.randint(-100, 100)
    for i in range(steps + 1):
        t = i / steps
        x = (1 - t) ** 2 * x1 + 2 * (1 - t) * t * cx + t ** 2 * x2
        y = (1 - t) ** 2 * y1 + 2 * (1 - t) * t * cy + t ** 2 * y2
        # Small per-step jitter, like an unsteady hand.
        x += random.uniform(-1.5, 1.5)
        y += random.uniform(-1.5, 1.5)
        await page.mouse.move(x, y)
        # Slower at the start and end of the movement, faster in the middle.
        if t < 0.2 or t > 0.8:
            await asyncio.sleep(random.uniform(0.012, 0.025))
        else:
            await asyncio.sleep(random.uniform(0.005, 0.015))
    await asyncio.sleep(random.uniform(0.1, 0.5))
```
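The path in `human_move` follows the quadratic Bézier formula B(t) = (1 - t)² P0 + 2(1 - t)t P1 + t² P2, where P0 is the start, P2 the end, and P1 the control point. The standalone sketch below computes the same curve without jitter or a browser, so the shape can be inspected or plotted offline; `quadratic_bezier` is a hypothetical helper name.

```python
def quadratic_bezier(p0, p1, p2, steps):
    """Return steps + 1 points along the quadratic Bézier curve p0 -> p2."""
    points = []
    for i in range(steps + 1):
        t = i / steps
        x = (1 - t) ** 2 * p0[0] + 2 * (1 - t) * t * p1[0] + t ** 2 * p2[0]
        y = (1 - t) ** 2 * p0[1] + 2 * (1 - t) * t * p1[1] + t ** 2 * p2[1]
        points.append((x, y))
    return points


# Start at (0, 0), end at (100, 0), control point pulled up to (50, 100).
path = quadratic_bezier((0, 0), (50, 100), (100, 0), steps=4)
for point in path:
    print(point)
```

The curve always starts and ends exactly at P0 and P2 and only bends toward the control point without reaching it: here the midpoint is (50, 50), half of the control point's height. That is why a random offset of up to 100 pixels on the control point produces a gentle arc rather than a wild detour.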
Simulated typing
```python
async def human_type(page, selector: str, text: str):
    """Click a field, then type one character at a time with random gaps."""
    await page.click(selector)
    await asyncio.sleep(random.uniform(0.3, 0.8))
    for ch in text:
        await page.keyboard.type(ch)
        await asyncio.sleep(random.uniform(0.05, 0.2))
    await asyncio.sleep(random.uniform(0.3, 0.8))
```
Natural scrolling
```python
async def human_scroll(page):
    """Scroll down in a few uneven steps, occasionally pausing as if reading."""
    for _ in range(random.randint(2, 5)):
        await page.mouse.wheel(0, random.randint(200, 900))
        await asyncio.sleep(random.uniform(0.5, 2.0))
        if random.random() < 0.15:
            await asyncio.sleep(random.uniform(1.5, 4.0))
```
Layer 7: Session and cookies
```python
import pickle
import random
import time
from pathlib import Path

import requests

COOKIE_FILE = Path("cookies.pkl")


def save_cookies(session):
    with open(COOKIE_FILE, "wb") as f:
        pickle.dump(session.cookies, f)


def load_cookies(session):
    # Only unpickle files you wrote yourself: pickle can run arbitrary code on load.
    if COOKIE_FILE.exists():
        with open(COOKIE_FILE, "rb") as f:
            session.cookies.update(pickle.load(f))
        return True
    return False


def build_session(base_url: str):
    session = requests.Session()
    if not load_cookies(session):
        # No saved cookies: visit the home page once to obtain fresh ones.
        session.get(base_url, headers=build_headers())
        time.sleep(random.uniform(2, 4))
        save_cookies(session)
    return session
```
Layer 8: retries and backoff
```python
import functools
import random
import time


def retry_with_backoff(max_retries=3, base_delay=2.0):
    """Retry the wrapped function with exponential backoff plus jitter."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise  # out of retries: surface the last error
                    sleep_time = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"[retry {attempt + 1}/{max_retries}] waiting {sleep_time:.1f}s")
                    time.sleep(sleep_time)
        return wrapper
    return decorator


@retry_with_backoff(max_retries=3, base_delay=2)
def fetch_product(session, product_id):
    r = session.get(
        f"https://example.com/product/{product_id}",
        headers=build_headers(),
        timeout=15,
    )
    r.raise_for_status()  # turn 4xx/5xx responses into exceptions so they are retried
    return r
```
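To see what the decorator's settings mean in wall-clock terms, the sketch below lists the base delays it would sleep between attempts, using the same `base_delay * 2 ** attempt` formula and leaving out the random 0 to 1 second jitter. `backoff_delays` is a hypothetical helper for illustration.

```python
def backoff_delays(max_retries=3, base_delay=2.0):
    """Base sleep before each retry, excluding the 0-1 s random jitter."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]


delays = backoff_delays(max_retries=3, base_delay=2.0)
print(delays)       # [2.0, 4.0, 8.0]
print(sum(delays))  # 14.0
```

With the defaults, a request that keeps failing is attempted four times and spends between 14 and 17 seconds sleeping before the final exception is raised. Raising `max_retries` by one doubles the last wait, so the total grows quickly.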
Layer 9: resumable crawling
```python
import json
import time
from pathlib import Path


class CrawlState:
    """Track finished and failed item IDs on disk so a crawl can be resumed."""

    def __init__(self, task_name: str):
        self.state_dir = Path(f".crawl_state/{task_name}")
        self.state_dir.mkdir(parents=True, exist_ok=True)
        self.success_file = self.state_dir / "success.json"
        self.failed_file = self.state_dir / "failed.json"
        self._success = set(self._load(self.success_file, []))
        self._failed = set(self._load(self.failed_file, []))
        self._start_time = time.time()

    def _load(self, path, default):
        return json.loads(path.read_text()) if path.exists() else default

    def is_done(self, item_id) -> bool:
        return str(item_id) in self._success

    def mark_success(self, item_id):
        self._success.add(str(item_id))
        self._failed.discard(str(item_id))
        # Write to disk every 100 successes to limit lost work on a crash.
        if len(self._success) % 100 == 0:
            self._flush()

    def mark_failed(self, item_id):
        if str(item_id) not in self._success:
            self._failed.add(str(item_id))

    def _flush(self):
        self.success_file.write_text(
            json.dumps(list(self._success), ensure_ascii=False)
        )
        self.failed_file.write_text(
            json.dumps(list(self._failed), ensure_ascii=False)
        )

    def save(self):
        self._flush()

    def summary(self):
        total = len(self._success) + len(self._failed)
        rate = len(self._success) / total * 100 if total > 0 else 0
        elapsed = time.time() - self._start_time
        print(
            f"success: {len(self._success)} | failed: {len(self._failed)} | "
            f"success rate: {rate:.1f}% | elapsed: {elapsed / 60:.1f} min"
        )
        if self._failed:
            print(f"⚠️ {len(self._failed)} failed; run the retry script to reprocess them")
```
Usage:
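```python
state = CrawlState("product_task_20260115")
session = build_session("https://example.com/")  # from Layer 7

try:
    for target_id in targets_shuffled:
        if state.is_done(target_id):
            continue  # already fetched in a previous run
        try:
            r = fetch_product(session, target_id)
            parse_and_save(r.text)
            state.mark_success(target_id)
        except Exception:
            state.mark_failed(target_id)
        human_delay()
finally:
    # Persist progress even if the run is interrupted (e.g. Ctrl-C).
    state.save()
state.summary()
```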
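`CrawlState._flush` writes the state files with `write_text`, which truncates the file before writing. If the process is killed mid-write, the file can be left empty or half-written and the next run fails to load it. One possible hardening, sketched below under the assumption that the state directory is on a local filesystem, is to write to a temporary file and swap it in with `os.replace`. The helper name `atomic_write_json` is hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path, data):
    """Write JSON to a temp file in the same directory, then rename it into place."""
    path = Path(path)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes reach the disk first
        os.replace(tmp_name, path)  # atomic swap: readers see old or new, never partial
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "success.json"
    atomic_write_json(target, ["101", "102"])
    print(json.loads(target.read_text(encoding="utf-8")))
```

Inside `_flush`, each `write_text(json.dumps(...))` call could then become `atomic_write_json(self.success_file, list(self._success))`, and likewise for the failed file.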
Quick troubleshooting checklist
- Do your request headers still expose `python-requests`?
- Does your Playwright setup handle Canvas/WebGL?
- Is `navigator.webdriver` still exposed?
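Fixes by symptom
Closing thoughts
Getting past anti-bot defenses is not a matter of one trick; it is a layering problem. First work out which layer is stopping you, then fix that layer.
There is only one core principle: make the scraper's behavioral profile as close to a real user's as you can.
- For ordinary scraping, start with `requests`; don't reach for a browser right away.
- Consider Playwright only when you hit dynamic pages, heavy JS, or strong verification.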