🕐 预计用时:3-4 小时 | 🎯 目标:用 requests + BeautifulSoup + CSV 完成完整爬虫项目
目标:爬取豆瓣电影 Top250 的所有电影信息,保存到 CSV 文件中。
# 豆瓣 Top250 URL 规律
# https://movie.douban.com/top250?start=0&filter= 第1页 (1-25)
# https://movie.douban.com/top250?start=25&filter= 第2页 (26-50)
# https://movie.douban.com/top250?start=50&filter= 第3页 (51-75)
# ...
# https://movie.douban.com/top250?start=225&filter= 第10页 (226-250)
# 用浏览器开发者工具(F12)分析页面结构
# 每部电影的 HTML 结构大致如下:
<div class="item">
<div class="pic">
<em>1</em> <!-- 排名 -->
<a href="https://movie.douban.com/...">
<img src="poster.jpg" alt="肖申克的救赎">
</a>
</div>
<div class="info">
<div class="hd">
<a href="...">
<span class="title">肖申克的救赎</span> <!-- 电影名 -->
<span class="title"> / The Shawshank Redemption</span>
</a>
</div>
<div class="bd">
<p>导演: 弗兰克·德拉邦特 主演: 蒂姆·罗宾斯...<br>
1994 / 美国 / 犯罪 剧情</p>
<div class="star">
<span class="rating5-t"></span> <!-- 评分星级 -->
<span class="rating_num">9.7</span> <!-- 评分 -->
<span>1234567人评价</span> <!-- 评价人数 -->
</div>
<p class="quote">
<span class="inq">希望让人自由</span> <!-- 一句话评价 -->
</p>
</div>
</div>
</div>
import requests
from bs4 import BeautifulSoup
def fetch_page(url):
    """Download a single page and return its HTML as text.

    The request carries a browser-like User-Agent and a 10-second timeout.
    ``raise_for_status()`` turns a 4xx/5xx response into an exception
    instead of returning the error page's body.
    """
    request_options = {
        "headers": {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        },
        "timeout": 10,
    }
    reply = requests.get(url, **request_options)
    reply.raise_for_status()
    return reply.text
def parse_page(html):
    """Parse one Top250 listing page into a list of movie dicts.

    Args:
        html: Markup of a single listing page.

    Returns:
        One dict per ``<div class="item">`` with the keys 排名, 电影名, 评分,
        评价人数, 一句话评价 and 原始信息. 原始信息 is the raw credits/year
        text with its two lines separated by ``"\n"``.

    Raises:
        AttributeError: If an item lacks one of the expected tags
            (``find()`` returns ``None`` for a missing tag).
    """
    soup = BeautifulSoup(html, "html.parser")
    movies = []
    for item in soup.find_all("div", class_="item"):
        # Rank
        rank = item.find("em").text
        # Title: the first "title" span only
        title = item.find("span", class_="title").text
        # Rating
        rating = item.find("span", class_="rating_num").text
        # Vote count: the last <span> inside the star block, minus its suffix
        star_div = item.find("div", class_="star")
        people_text = star_div.find_all("span")[-1].text
        people = people_text.replace("人评价", "").strip()
        # Credits and year/country/genre live in one <p>, split by a <br>.
        # The "\n" separator keeps that line break; with strip=True alone the
        # two text nodes are glued together and the second line can no longer
        # be told apart by a downstream line-based parser.
        info_p = item.find("div", class_="bd").find("p")
        info_text = info_p.get_text("\n", strip=True)
        # One-line quote (the span is optional)
        quote_span = item.find("span", class_="inq")
        quote = quote_span.text if quote_span else ""
        movies.append({
            "排名": rank,
            "电影名": title,
            "评分": rating,
            "评价人数": people,
            "一句话评价": quote,
            "原始信息": info_text
        })
    return movies
# Smoke-test a single page: fetch the first listing page, parse it,
# and print a one-line summary of the first three movies.
url = "https://movie.douban.com/top250?start=0&filter="
html = fetch_page(url)
movies = parse_page(html)
for m in movies[:3]:
    print(f"#{m['排名']} {m['电影名']} | ⭐{m['评分']} | {m['评价人数']}人 | {m['一句话评价']}")
# #1 肖申克的救赎 | ⭐9.7 | 1234567人 | 希望让人自由
# #2 霸王别姬 | ⭐9.6 | 876543人 | 风华绝代
# #3 阿甘正传 | ⭐9.5 | 765432人 | 一部美国近现代史
import re
def parse_movie_info(info_text):
    """Extract director, cast, year, country and genre from the raw info text.

    Args:
        info_text: Up to two non-empty lines. The first looks like
            ``"导演: ... 主演: ..."``; the second looks like
            ``"year / country / genre"``.

    Returns:
        A dict with the keys 导演, 主演, 年份, 国家, 类型. A field that cannot
        be found is left as an empty string.
    """
    result = {
        "导演": "",
        "主演": "",
        "年份": "",
        "国家": "",
        "类型": ""
    }
    # Split into non-empty, stripped lines
    lines = [line.strip() for line in info_text.split("\n") if line.strip()]
    if lines:
        # Line 1: "导演: xxx 主演: xxx"
        line1 = lines[0]
        # The director ends at a truncation ellipsis, a run of two whitespace
        # characters, the "主演" label, or the end of the line. The ellipsis
        # terminator mirrors the cast pattern so a truncated director list
        # does not keep its trailing "...".
        director_match = re.search(r"导演:\s*(.+?)(?:\.\.\.|\s{2}|主演|$)", line1)
        if director_match:
            result["导演"] = director_match.group(1).strip()
        actor_match = re.search(r"主演:\s*(.+?)(?:\.\.\.|\s{2}|$)", line1)
        if actor_match:
            result["主演"] = actor_match.group(1).strip()
    if len(lines) >= 2:
        # Line 2: "year / country / genre"
        parts = [p.strip() for p in lines[1].split("/")]
        year_match = re.search(r"(\d{4})", parts[0])
        if year_match:
            result["年份"] = year_match.group(1)
        if len(parts) >= 3:
            # Read country and genre from the END of the line: when the line
            # carries extra leading fields (for example several release
            # years), positions 1 and 2 would be year fragments.
            result["国家"] = parts[-2]
            result["类型"] = parts[-1]
        elif len(parts) == 2:
            result["国家"] = parts[1]
    return result
# Quick check: feed a two-line sample (credits line, then
# "year / country / genre") through the parser and print the result.
info = "导演: 弗兰克·德拉邦特 主演: 蒂姆·罗宾斯 / 摩根·弗里曼...\n1994 / 美国 / 犯罪 剧情"
result = parse_movie_info(info)
print(result)
# {'导演': '弗兰克·德拉邦特', '主演': '蒂姆·罗宾斯 / 摩根·弗里曼', '年份': '1994', '国家': '美国', '类型': '犯罪 剧情'}
import time
import random
def crawl_top250():
    """Crawl all ten Top250 listing pages and return the merged movie list.

    Each page is fetched and parsed, then every movie's 原始信息 entry is
    replaced by the structured fields from ``parse_movie_info``. A page that
    fails is reported and skipped, so the result may hold fewer than 250
    movies.

    Returns:
        A list of movie dicts in page order.
    """
    base_url = "https://movie.douban.com/top250"
    page_count = 10
    page_size = 25
    all_movies = []
    for page in range(page_count):
        start = page * page_size
        url = f"{base_url}?start={start}&filter="
        print(f"📄 正在爬取第 {page + 1}/{page_count} 页: {url}")
        try:
            html = fetch_page(url)
            movies = parse_page(html)
            # Expand the raw info text into structured fields
            for movie in movies:
                info = parse_movie_info(movie.pop("原始信息"))
                movie.update(info)
            all_movies.extend(movies)
            print(f" ✅ 获取 {len(movies)} 部电影(累计 {len(all_movies)})")
        except Exception as e:
            # Deliberately broad: one bad page (network or markup) must not
            # abort the remaining pages.
            print(f" ❌ 爬取失败: {e}")
        # Polite delay between requests. Skipped after the final page, where
        # there is no next request to space out.
        if page < page_count - 1:
            delay = random.uniform(1, 3)
            print(f" ⏳ 等待 {delay:.1f}秒...")
            time.sleep(delay)
    return all_movies
# Run the full crawl and report how many movies were collected.
movies = crawl_top250()
print(f"\n🎉 爬取完成: 共 {len(movies)} 部电影")
import csv
def save_to_csv(movies, filename="douban_top250.csv"):
    """Write the movie dicts to *filename* as CSV, header row first.

    The file is encoded as utf-8-sig, which prepends a byte-order mark.
    A summary line with the record count is printed after the file is closed.
    """
    columns = [
        "排名", "电影名", "评分", "评价人数", "导演",
        "主演", "年份", "国家", "类型", "一句话评价",
    ]
    with open(filename, "w", encoding="utf-8-sig", newline="") as out_file:
        table = csv.DictWriter(out_file, fieldnames=columns)
        table.writeheader()
        for record in movies:
            table.writerow(record)
    print(f"✅ 已保存到 {filename} ({len(movies)} 条记录)")
# Persist the crawl result.
save_to_csv(movies)

# Read the file back and print the first five rows as a sanity check.
import csv

# newline="" leaves line-ending handling to the csv module.
with open("douban_top250.csv", "r", encoding="utf-8-sig", newline="") as f:
    reader = csv.DictReader(f)
    for row in list(reader)[:5]:
        print(f"#{row['排名']} {row['电影名']} ⭐{row['评分']} | {row['导演']} | {row['年份']}")
import csv
from collections import Counter
def _count_space_separated(rows, column):
    """Count every space-separated, non-empty token of *column* across *rows*."""
    counter = Counter()
    for row in rows:
        # `or ""` covers both a missing column and a None cell.
        for token in (row.get(column) or "").split(" "):
            token = token.strip()
            if token:
                counter[token] += 1
    return counter


def analyze_data(filename="douban_top250.csv"):
    """Print a text report summarising the crawled CSV.

    The report has five sections: rating distribution, top 10 countries,
    movies per decade, top 10 directors, and top 10 genres.

    Args:
        filename: Path of a utf-8-sig CSV with (at least) the columns
            评分, 国家, 年份, 导演 and 类型. Rows whose cell for a section is
            missing or unparsable are skipped for that section.
    """
    with open(filename, "r", encoding="utf-8-sig", newline="") as f:
        movies = list(csv.DictReader(f))
    print("📊 豆瓣 Top250 数据分析")
    print("=" * 50)

    # 1. Rating distribution, bucketed to the nearest whole score
    print("\n⭐ 评分分布:")
    rating_counter = Counter()
    for m in movies:
        try:
            # Round half up. round() rounds halves to the nearest even
            # number, which would send 8.5 down to 8 but 9.5 up to 10.
            bucket = int(float(m.get("评分") or "") + 0.5)
        except (ValueError, OverflowError):
            continue  # blank or non-numeric rating
        rating_counter[bucket] += 1
    for bucket in sorted(rating_counter):
        count = rating_counter[bucket]
        bar = "█" * count
        print(f" {bucket}分: {count:3d} {bar}")

    # 2. Top 10 countries (a movie may list several, space-separated)
    print("\n🌍 国家分布 TOP 10:")
    country_counter = _count_space_separated(movies, "国家")
    for country, count in country_counter.most_common(10):
        print(f" {country:10s} {count:3d} 部")

    # 3. Movies per decade
    print("\n📅 年代分布:")
    decade_counter = Counter()
    for m in movies:
        year = m.get("年份") or ""
        if year.isdigit():
            decade = int(year) // 10 * 10
            decade_counter[decade] += 1
    for decade in sorted(decade_counter):
        count = decade_counter[decade]
        bar = "█" * (count // 2)  # half-scale bar
        print(f" {decade}s: {count:3d} {bar}")

    # 4. Most frequent directors
    print("\n🎬 出现最多的导演 TOP 10:")
    director_counter = Counter()
    for m in movies:
        # Only the first space-separated token of the director cell is counted
        director = (m.get("导演") or "").split(" ")[0]
        if director:
            director_counter[director] += 1
    for director, count in director_counter.most_common(10):
        print(f" {director:10s} {count} 部")

    # 5. Top 10 genres (space-separated per movie)
    print("\n🎭 类型分布 TOP 10:")
    genre_counter = _count_space_separated(movies, "类型")
    for genre, count in genre_counter.most_common(10):
        print(f" {genre:10s} {count} 次")
analyze_data()
"""
豆瓣电影 Top250 爬虫
用 requests + BeautifulSoup + CSV 完成
"""
import requests
from bs4 import BeautifulSoup
import csv
import re
import time
import random
# Request headers shared by every page fetch (passed to requests.get in
# fetch_page): a browser-like User-Agent string.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
def fetch_page(url):
    """Return the HTML text of *url*.

    Uses the module-level HEADERS and a 10-second timeout.
    ``raise_for_status()`` raises for a 4xx/5xx response.
    """
    response = requests.get(url, timeout=10, headers=HEADERS)
    response.raise_for_status()
    page_html = response.text
    return page_html
def parse_info(text):
    """Extract director, cast, year, country and genre from the info text.

    Args:
        text: Up to two non-empty lines. The first looks like
            ``"导演: ... 主演: ..."``; the second looks like
            ``"year / country / genre"``.

    Returns:
        A dict with the keys 导演, 主演, 年份, 国家, 类型. A field that cannot
        be found is left as an empty string.
    """
    result = {"导演": "", "主演": "", "年份": "", "国家": "", "类型": ""}
    lines = [line.strip() for line in text.split("\n") if line.strip()]
    if lines:
        # The director ends at a truncation ellipsis, two whitespace
        # characters, the "主演" label, or end of line. The ellipsis
        # terminator mirrors the cast pattern below so a truncated director
        # list does not keep its "...".
        dm = re.search(r"导演:\s*(.+?)(?:\.\.\.|\s{2}|主演|$)", lines[0])
        if dm:
            result["导演"] = dm.group(1).strip()
        am = re.search(r"主演:\s*(.+?)(?:\.\.\.|$)", lines[0])
        if am:
            result["主演"] = am.group(1).strip()
    if len(lines) >= 2:
        parts = [p.strip() for p in lines[1].split("/")]
        ym = re.search(r"(\d{4})", parts[0])
        if ym:
            result["年份"] = ym.group(1)
        if len(parts) >= 3:
            # Read country and genre from the END of the line: when the line
            # carries extra leading fields (for example several release
            # years), positions 1 and 2 would be year fragments.
            result["国家"] = parts[-2]
            result["类型"] = parts[-1]
        elif len(parts) == 2:
            result["国家"] = parts[1]
    return result
def parse_page(html):
    """Turn one listing page into a list of movie dicts.

    Every ``<div class="item">`` becomes a dict holding rank, title, rating,
    vote count, the five fields produced by ``parse_info``, and the optional
    one-line quote (empty string when the quote span is absent).
    """
    document = BeautifulSoup(html, "html.parser")
    records = []
    for entry in document.find_all("div", class_="item"):
        body = entry.find("div", class_="bd")
        details = parse_info(body.find("p").get_text())
        tagline = entry.find("span", class_="inq")

        rank = entry.find("em").text
        title = entry.find("span", class_="title").text
        score = entry.find("span", class_="rating_num").text
        # The vote count is the last <span> of the star block
        vote_label = entry.find("div", class_="star").find_all("span")[-1].text

        record = {
            "排名": rank,
            "电影名": title,
            "评分": score,
            "评价人数": vote_label.replace("人评价", ""),
        }
        for key in ("导演", "主演", "年份", "国家", "类型"):
            record[key] = details[key]
        record["一句话评价"] = tagline.text if tagline else ""
        records.append(record)
    return records
def crawl():
    """Fetch and parse all ten listing pages; return the combined movie list.

    A page that fails is reported and skipped, so the result may hold fewer
    than 250 movies.
    """
    page_count = 10
    all_movies = []
    for page in range(page_count):
        url = f"https://movie.douban.com/top250?start={page*25}&filter="
        print(f"📄 第 {page+1}/{page_count} 页...")
        try:
            movies = parse_page(fetch_page(url))
            all_movies.extend(movies)
            print(f" ✅ {len(movies)} 部")
        except Exception as e:
            # Deliberately broad: one bad page must not abort the rest.
            print(f" ❌ {e}")
        # Random pause between requests. Skipped after the final page, where
        # there is no next request to space out.
        if page < page_count - 1:
            time.sleep(random.uniform(1, 3))
    return all_movies
def save(movies, filename="douban_top250.csv"):
    """Dump *movies* to *filename* as a utf-8-sig CSV with a header row.

    Prints a confirmation line once the file has been written.
    """
    column_names = "排名 电影名 评分 评价人数 导演 主演 年份 国家 类型 一句话评价".split(" ")
    with open(filename, "w", encoding="utf-8-sig", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=column_names)
        writer.writeheader()
        for movie in movies:
            writer.writerow(movie)
    print(f"✅ 保存到 {filename}")
# Script entry point: crawl, save to CSV, then report the total.
if __name__ == "__main__":
    movies = crawl()
    save(movies)
    print(f"🎉 完成: {len(movies)} 部电影")

# ⚠️ 爬虫法律与道德:
(https://movie.douban.com/robots.txt)
# 爬虫最佳实践
import time
import random
# 1. Add a delay between requests
time.sleep(random.uniform(1, 3))  # random pause of 1-3 seconds
# 2. Set a User-Agent
headers = {"User-Agent": "MyBot/1.0 (学习用途)"}
# 3. Set a timeout
# NOTE(review): `url` is not defined in this snippet and `headers` above is
# not passed to these calls; presumably illustrative only, confirm.
requests.get(url, timeout=10)
# 4. Handle request errors
try:
    response = requests.get(url, timeout=10)
    response.raise_for_status()
except requests.exceptions.RequestException as e:
    print(f"请求失败: {e}")
# 5. 限速
from functools import wraps
def rate_limit(func):
    """Decorator: keep successive calls to *func* at least one second apart.

    The first call runs immediately. A later call made less than one second
    after the previous one sleeps for the remainder first. Arguments and the
    return value pass through unchanged, and ``functools.wraps`` preserves
    the wrapped function's metadata.
    """
    min_interval = 1  # seconds
    # Monotonic timestamp of the previous call; None until the first call.
    last_call = None

    @wraps(func)
    def wrapper(*args, **kwargs):
        nonlocal last_call
        if last_call is not None:
            # time.monotonic() cannot jump backwards when the system clock is
            # adjusted, unlike time.time(), so the interval is always sound.
            remaining = min_interval - (time.monotonic() - last_call)
            if remaining > 0:
                time.sleep(remaining)
        last_call = time.monotonic()
        return func(*args, **kwargs)

    return wrapper

# 🎯 扩展练习:
1. 添加"海报下载"功能——爬取每部电影的海报图片
2. 用 lxml + XPath 重写解析部分,对比代码量
3. 爬取 Top250 的详细页面(导演、演员、剧情简介)
4. 把分析结果用 HTML 格式输出(参考 Day32 的报告生成)
📚 Day44 完成!明天进入数据库基础 — SQL 入门
请在微信客户端打开