🐍 CSV 处理 — 表格数据的读写利器
🕐 预计用时:2-3 小时 | 🎯 目标:掌握 csv 模块读写、DictReader/Writer、数据清洗与实战
📖 今日目录
1. CSV 是什么?为什么还在用?
CSV(Comma-Separated Values)是最简单的表格数据格式——用逗号分隔每列,每行一条记录。Excel 能打开,Python 能读写,任何语言都支持。
# 一个典型的 CSV 文件(students.csv)姓名,年龄,城市,成绩张三,20,北京,95李四,22,上海,88王五,21,广州,76赵六,23,深圳,92
💡 CSV 的优势:1. 人类可读——用记事本就能打开2. 体积小——没有 XML/JSON 的冗余标签3. 兼容性强——Excel、Google Sheets、数据库都能导入4. 适合大数据量——比 JSON 轻量得多
CSV 的"潜规则"
2. csv.reader() — 读取 CSV
csv.reader() 把每一行解析成一个列表,是最基本的读取方式。
import csv# 先创建一个测试 CSV 文件with open("students.csv", "w", encoding="utf-8", newline="") as f: f.write("姓名,年龄,城市,成绩\n") f.write("张三,20,北京,95\n") f.write("李四,22,上海,88\n") f.write("王五,21,广州,76\n") f.write("赵六,23,深圳,92\n")# 读取 CSVwith open("students.csv", "r", encoding="utf-8") as f: reader = csv.reader(f) for row in reader: print(row)# 输出:# ['姓名', '年龄', '城市', '成绩'] ← 第一行是表头# ['张三', '20', '北京', '95']# ['李四', '22', '上海', '88']# ['王五', '21', '广州', '76']# ['赵六', '23', '深圳', '92']
⚠️ 注意:所有值都是字符串!年龄 '20' 是 str 不是 int,需要手动转换。
跳过表头 + 转换类型
import csvwith open("students.csv", "r", encoding="utf-8") as f: reader = csv.reader(f) header = next(reader) # 跳过表头,返回第一行 print(f"列名: {header}") # 列名: ['姓名', '年龄', '城市', '成绩'] for row in reader: name = row[0] age = int(row[1]) # 字符串 → 整数 city = row[2] score = float(row[3]) # 字符串 → 浮点数 print(f"{name} ({city}): {age}岁, {score}分")# 输出:# 张三 (北京): 20岁, 95.0分# 李四 (上海): 22岁, 88.0分# 王五 (广州): 21岁, 76.0分# 赵六 (深圳): 23岁, 92.0分
一次性读成列表
import csvwith open("students.csv", "r", encoding="utf-8") as f: reader = csv.reader(f) all_rows = list(reader) # 一次性读取所有行print(f"共 {len(all_rows)} 行(含表头)")print(f"第一行: {all_rows[0]}")print(f"最后一行: {all_rows[-1]}")
3. csv.writer() — 写入 CSV
csv.writer() 把数据写入 CSV 文件,自动处理逗号、换行等细节。
import csv# 写入 CSVwith open("output.csv", "w", encoding="utf-8", newline="") as f: writer = csv.writer(f) writer.writerow(["姓名", "年龄", "城市"]) # 写一行 writer.writerow(["张三", 20, "北京"]) writer.writerow(["李四", 22, "上海"]) writer.writerows([ # 一次写多行 ["王五", 21, "广州"], ["赵六", 23, "深圳"] ])print("写入完成!")# output.csv 内容:# 姓名,年龄,城市# 张三,20,北京# 李四,22,上海# 王五,21,广州# 赵六,23,深圳
⚠️ 必加 newline=""!在 Windows 上,不加这个参数会导致每行之间多一个空行。这是 Python csv 模块的官方建议。
自定义分隔符
import csv# 用制表符分隔(TSV 格式)with open("output.tsv", "w", encoding="utf-8", newline="") as f: writer = csv.writer(f, delimiter="\t") writer.writerow(["姓名", "年龄", "城市"]) writer.writerow(["张三", 20, "北京"])# 用分号分隔with open("output_semi.csv", "w", encoding="utf-8", newline="") as f: writer = csv.writer(f, delimiter=";") writer.writerow(["姓名", "年龄", "城市"]) writer.writerow(["张三", 20, "北京"])
4. csv.DictReader() — 字典式读取
DictReader 把每行变成字典,用列名访问——比用索引号更直观。
import csvwith open("students.csv", "r", encoding="utf-8") as f: reader = csv.DictReader(f) for row in reader: print(row)# 输出(每行是 OrderedDict):# {'姓名': '张三', '年龄': '20', '城市': '北京', '成绩': '95'}# {'姓名': '李四', '年龄': '22', '城市': '上海', '成绩': '88'}# {'姓名': '王五', '年龄': '21', '城市': '广州', '成绩': '76'}# {'姓名': '赵六', '年龄': '23', '城市': '深圳', '成绩': '92'}
import csv# 用列名访问,不用记索引号with open("students.csv", "r", encoding="utf-8") as f: reader = csv.DictReader(f) for row in reader: name = row["姓名"] score = float(row["成绩"]) if score >= 90: grade = "A" elif score >= 80: grade = "B" else: grade = "C" print(f"{name}: {score}分 → {grade}")# 张三: 95.0分 → A# 李四: 88.0分 → B# 王五: 76.0分 → C# 赵六: 92.0分 → A
💡 DictReader 的好处:1. 不用记"第几列是什么"——用列名访问2. 调换列顺序不影响代码3. row["姓名"] 比 row[0] 更可读
5. csv.DictWriter() — 字典式写入
DictWriter 接收字典写入,需要先指定列名(fieldnames)。
import csv# 定义列名fieldnames = ["姓名", "年龄", "城市", "成绩"]with open("students_dict.csv", "w", encoding="utf-8", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() # 写入表头 writer.writerow({"姓名": "张三", "年龄": 20, "城市": "北京", "成绩": 95}) writer.writerow({"姓名": "李四", "年龄": 22, "城市": "上海", "成绩": 88}) # 一次写多行 writer.writerows([ {"姓名": "王五", "年龄": 21, "城市": "广州", "成绩": 76}, {"姓名": "赵六", "年龄": 23, "城市": "深圳", "成绩": 92} ])print("写入完成!")
# 配合列表推导式,批量处理并写入import csv# 原始数据students = [ {"姓名": "张三", "年龄": 20, "城市": "北京", "成绩": 95}, {"姓名": "李四", "年龄": 22, "城市": "上海", "成绩": 88}, {"姓名": "王五", "年龄": 21, "城市": "广州", "成绩": 76}, {"姓名": "赵六", "年龄": 23, "城市": "深圳", "成绩": 92},]# 添加"等级"列for s in students: score = s["成绩"] if score >= 90: s["等级"] = "A" elif score >= 80: s["等级"] = "B" else: s["等级"] = "C"# 写入(多了"等级"列)fieldnames = ["姓名", "年龄", "城市", "成绩", "等级"]with open("graded.csv", "w", encoding="utf-8", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(students)print("带等级的 CSV 已生成!")
6. 处理编码问题
CSV 的最大坑就是编码——尤其是中文 Windows 导出的 CSV 经常是 GBK 编码。
import csv# ❌ 读取 GBK 编码的文件会报错# with open("windows_export.csv", "r", encoding="utf-8") as f:# reader = csv.reader(f) # UnicodeDecodeError!# ✅ 正确方式:指定编码with open("windows_export.csv", "r", encoding="gbk") as f: reader = csv.reader(f) for row in reader: print(row)
自动检测编码
import csvdef detect_and_read(filepath): """尝试常见编码读取 CSV""" encodings = ["utf-8", "gbk", "gb2312", "utf-8-sig", "latin-1"] for enc in encodings: try: with open(filepath, "r", encoding=enc) as f: content = f.read() print(f"✅ 检测到编码: {enc}") with open(filepath, "r", encoding=enc) as f: return list(csv.reader(f)) except (UnicodeDecodeError, UnicodeError): continue raise ValueError(f"无法检测文件编码: {filepath}")# 使用# rows = detect_and_read("mystery.csv")
UTF-8 BOM 问题
import csv# Excel 导出的 CSV 可能带 BOM(字节顺序标记)# 读取时第一列名会变成 '\ufeff姓名'# ❌ 用 utf-8 读with open("excel_export.csv", "r", encoding="utf-8") as f: reader = csv.DictReader(f) row = next(reader) print(list(row.keys())) # ['\ufeff姓名', '年龄'] ← 有 BOM!# ✅ 用 utf-8-sig 读(自动去除 BOM)with open("excel_export.csv", "r", encoding="utf-8-sig") as f: reader = csv.DictReader(f) row = next(reader) print(list(row.keys())) # ['姓名', '年龄'] ← 正常了!
⚠️ 编码速查:• 普通 UTF-8 → encoding="utf-8"• Excel 导出 → encoding="utf-8-sig"• Windows 中文 → encoding="gbk"• 不确定 → 用上面的 detect_and_read()
7. 数据清洗实战
真实数据往往是"脏"的——缺失值、格式不一、多余空格……CSV 读取后第一步就是清洗。
import csv# 创建一个"脏"数据文件with open("dirty_data.csv", "w", encoding="utf-8", newline="") as f: f.write("姓名,年龄,城市,成绩\n") f.write(" 张三 ,20,北京,95\n") # 多余空格 f.write("李四,,上海,88\n") # 缺失年龄 f.write("王五,21,广州,abc\n") # 成绩不是数字 f.write("赵六,23,深圳,92\n") f.write("张三,20,北京,95\n") # 重复行 f.write(",25,武汉,70\n") # 缺失姓名# 读取并清洗def clean_csv(filepath): """清洗 CSV 数据""" cleaned = [] with open(filepath, "r", encoding="utf-8") as f: reader = csv.DictReader(f) for i, row in enumerate(reader, start=2): # 从第2行开始计数 # 1. 去除多余空格 row = {k: v.strip() for k, v in row.items()} # 2. 检查必填字段 if not row["姓名"]: print(f"⚠️ 第{i}行: 姓名为空,跳过") continue # 3. 转换数值类型 try: row["年龄"] = int(row["年龄"]) if row["年龄"] else None except ValueError: print(f"⚠️ 第{i}行: 年龄 '{row['年龄']}' 不是数字,设为 None") row["年龄"] = None try: row["成绩"] = float(row["成绩"]) except ValueError: print(f"⚠️ 第{i}行: 成绩 '{row['成绩']}' 不是数字,跳过") continue # 4. 去重 if row not in cleaned: cleaned.append(row) else: print(f"⚠️ 第{i}行: 重复数据,跳过") return cleaned# 执行清洗result = clean_csv("dirty_data.csv")print(f"\n✅ 清洗完成: {len(result)} 条有效记录")for r in result: print(f" {r}")# 输出:# ⚠️ 第3行: 年龄 '' 不是数字,设为 None# ⚠️ 第5行: 成绩 'abc' 不是数字,跳过# ⚠️ 第6行: 重复数据,跳过# ⚠️ 第7行: 姓名为空,跳过## ✅ 清洗完成: 3 条有效记录# {'姓名': '张三', '年龄': 20, '城市': '北京', '成绩': 95.0}# {'姓名': '李四', '年龄': None, '城市': '上海', '成绩': 88.0}# {'姓名': '王五', '年龄': 21, '城市': '广州', '成绩': 76.0}
8. 实战:学生成绩分析器
把前面学的知识串起来——读取 CSV → 分析数据 → 写出报告。
import csvfrom collections import defaultdict# 准备数据with open("scores.csv", "w", encoding="utf-8", newline="") as f: writer = csv.writer(f) writer.writerow(["姓名", "班级", "语文", "数学", "英语"]) writer.writerows([ ["张三", "一班", 90, 95, 88], ["李四", "一班", 85, 80, 92], ["王五", "二班", 78, 88, 76], ["赵六", "二班", 92, 90, 85], ["钱七", "一班", 88, 75, 90], ["孙八", "二班", 95, 92, 88], ])# ---- 分析 ----# 读取数据with open("scores.csv", "r", encoding="utf-8") as f: reader = csv.DictReader(f) students = [] for row in reader: students.append({ "姓名": row["姓名"], "班级": row["班级"], "语文": int(row["语文"]), "数学": int(row["数学"]), "英语": int(row["英语"]), })# 1. 计算每人总分和平均分for s in students: s["总分"] = s["语文"] + s["数学"] + s["英语"] s["平均分"] = round(s["总分"] / 3, 1)# 按总分排名students.sort(key=lambda x: x["总分"], reverse=True)print("📊 成绩排名")print("-" * 50)for i, s in enumerate(students, 1): print(f" {i}. {s['姓名']} ({s['班级']}): 总分 {s['总分']}, 平均 {s['平均分']}")# 2. 按班级统计class_stats = defaultdict(lambda: {"count": 0, "total": 0, "scores": []})for s in students: cls = s["班级"] class_stats[cls]["count"] += 1 class_stats[cls]["total"] += s["总分"] class_stats[cls]["scores"].append(s["总分"])print("\n🏫 班级统计")print("-" * 50)for cls, stats in class_stats.items(): avg = round(stats["total"] / stats["count"], 1) highest = max(stats["scores"]) lowest = min(stats["scores"]) print(f" {cls}: 人数 {stats['count']}, 平均 {avg}, 最高 {highest}, 最低 {lowest}")# 3. 各科最高分subjects = ["语文", "数学", "英语"]print("\n🏆 各科最高分")print("-" * 50)for subj in subjects: best = max(students, key=lambda x: x[subj]) print(f" {subj}: {best['姓名']} ({best[subj]}分)")# 4. 写出分析报告 CSVwith open("report.csv", "w", encoding="utf-8", newline="") as f: fieldnames = ["排名", "姓名", "班级", "语文", "数学", "英语", "总分", "平均分"] writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for i, s in enumerate(students, 1): writer.writerow({ "排名": i, "姓名": s["姓名"], "班级": s["班级"], "语文": s["语文"], "数学": s["数学"], "英语": s["英语"], "总分": s["总分"], "平均分": s["平均分"] })print("\n📄 报告已保存到 report.csv")
# 运行输出:# 📊 成绩排名# --------------------------------------------------# 1. 孙八 (二班): 总分 275, 平均 91.7# 2. 张三 (一班): 总分 273, 平均 91.0# 3. 赵六 (二班): 总分 267, 平均 89.0# 4. 李四 (一班): 总分 257, 平均 85.7# 5. 钱七 (一班): 总分 253, 平均 84.3# 6. 王五 (二班): 总分 242, 平均 80.7## 🏫 班级统计# --------------------------------------------------# 一班: 人数 3, 平均 87.7, 最高 273, 最低 253# 二班: 人数 3, 平均 86.0, 最高 275, 最低 242## 🏆 各科最高分# --------------------------------------------------# 语文: 孙八 (95分)# 数学: 张三 (95分)# 英语: 李四 (92分)## 📄 报告已保存到 report.csv
9. CSV vs JSON 选型指南
💡 选型口诀:• 扁平表格 → CSV(成绩表、通讯录、日志记录)• 嵌套/树形 → JSON(API 数据、配置文件、用户资料)• 两种都行 → 看下游:Excel 用户多用 CSV,程序间传数据用 JSON
10. 今日小结
| | |
|---|
csv.reader(f) | | |
csv.writer(f) | | |
csv.DictReader(f) | | |
csv.DictWriter(f) | | |
核心要点
- ✅ 写入时必加
newline=""(防 Windows 多空行) - ✅
reader/writer 用索引访问,DictReader/Writer 用列名访问 - ✅ 所有值都是字符串,数值要手动
int()/float() - ✅ Excel 导出用
utf-8-sig 编码读取
🎯 练习建议:1. 创建一个通讯录 CSV(姓名/电话/邮箱),实现增删改查2. 读取一个 CSV 文件,统计每列的缺失值数量3. 写一个函数,实现 CSV 和 JSON 格式互相转换
📚 Day29 完成!明天学习 datetime 模块 —— 时间处理不再头疼