前11篇,我们像搭积木一样学了 pandas 的各种工具——从读文件到合并、从处理缺失值到分组聚合。但真实工作中,你会拿到一张"完全不干净"的原始表。今天我们不学新工具,而是学怎么把工具串起来,做一条自动化清洗流水线。这也是一篇适合收藏的工作手册。
文中涉及到的数据文件可以联系我发给你,因为我还不知道怎么在公众号插入文件
pandas 大多数方法都返回 DataFrame,所以你可以链式调用——但自定义函数不行。
# 这样写会报错,因为drop_duplicates返回的DataFrame上找不到clean_text方法df.drop_duplicates().clean_text().pipe() 解决了这个问题:它把一个函数作为参数,把当前 DataFrame 传入函数,再返回处理后的 DataFrame。
def clean_text(df): """清理所有文本列的前后空格""" text_cols = df.select_dtypes(include='object').columns df = df.copy() for col in text_cols: df[col] = df[col].str.strip() return df# 用pipe把自定义函数接入链式调用df_clean = (df .drop_duplicates() .pipe(clean_text) # pipe在这里接上了自定义函数 .reset_index(drop=True))
.pipe()的本质就是把df传给函数,再拿回返回值。相当于clean_text(df),但它允许你写成优雅的链式调用风格。
把每个清洗步骤封装成一个独立函数,是建立流水线的前提。好的清洗函数遵循三个原则:
原则一:一个函数只做一件事
def clean_all(df): | def fix_dtypes(df): |
原则二:永远返回一个 DataFrame
无论内部做了什么,函数的 return 类型一定是 pd.DataFrame。这保证了链条不会断。
原则三:接受并转发 **kwargs
给灵活性留个口子——调用者可能需要传参数给内部方法。
def remove_duplicates(df, **kwargs): subset = kwargs.get('subset', None) keep = kwargs.get('keep', 'first') return df.drop_duplicates(subset=subset, keep=keep)# 可以指定去重列df_clean = remove_duplicates(df, subset=['emp_id'])把各个清洗函数用 .pipe() 串在一起,就是一条清洗流水线:
import pandas as pd# ===== 步骤1:去除重复行 =====def remove_duplicates(df): before = len(df) df = df.drop_duplicates() after = len(df) print(f"[去重] {before} → {after} 行 (移除 {before - after} 行)") return df# ===== 步骤2:修正数据类型 =====def fix_dtypes(df): df = df.copy() # 日期列转为datetime if 'hire_date' in df.columns: df['hire_date'] = pd.to_datetime(df['hire_date'], errors='coerce') # 薪资转为float if 'salary' in df.columns: df['salary'] = pd.to_numeric(df['salary'], errors='coerce') # 年龄转为float再转int(如果全部合法) if 'age' in df.columns: df['age'] = pd.to_numeric(df['age'], errors='coerce') print("[类型修正] 完成") print(df.dtypes.to_string()) return df# ===== 步骤3:处理缺失值 =====def handle_missing(df): df = df.copy() for col in df.columns: missing = df[col].isnull().sum() if missing == 0: continue pct = missing / len(df) * 100 if df[col].dtype in ['float64', 'int64']: # 数值列用中位数填充 df[col] = df[col].fillna(df[col].median()) print(f"[缺失处理] {col}: {missing}个缺失 ({pct:.1f}%) → 中位数填充") else: # 分类/文本列用众数填充 mode_val = df[col].mode() if len(mode_val) > 0: df[col] = df[col].fillna(mode_val[0]) print(f"[缺失处理] {col}: {missing}个缺失 ({pct:.1f}%) → 众数填充") return df# ===== 步骤4:标准化文本 =====def standardize_text(df): df = df.copy() text_cols = df.select_dtypes(include='object').columns for col in text_cols: df[col] = df[col].astype(str).str.strip() # 统一大小写(对部门等分类列特别有用) if col in ['department', 'gender', 'education']: df[col] = df[col].str.replace('nan', pd.NA) print("[文本标准化] 完成") return df# ===== 步骤5:数据验证 =====def validate(df): print("\n===== 数据验证 =====") checks = [] # 检查:emp_id是否有重复 if 'emp_id' in df.columns: dup_count = df['emp_id'].duplicated().sum() result = '通过' if dup_count == 0 else f'失败 (重复{dup_count}个)' checks.append(('员工ID唯一性', result)) print(f" 员工ID唯一性: {result}") # 检查:salary范围 if 'salary' in df.columns: valid_sal = (df['salary'] > 0).sum() result = '通过' if valid_sal == len(df) else f'失败 ({len(df) - valid_sal}条异常)' checks.append(('薪资>0', result)) print(f" 薪资>0: {result}") # 检查:年龄范围 if 'age' in df.columns: valid_age = df['age'].between(18, 80).sum() result = '通过' if valid_age == len(df) else f'失败 ({len(df) - valid_age}条异常)' checks.append(('年龄18-80', result)) print(f" 年龄18-80: {result}") print(f"\n总计: {sum(1 for _, v in checks if '通过' in v)}/{len(checks)} 项通过") return df# ===== 组装流水线 =====def clean_pipeline(df): return (df .pipe(remove_duplicates) .pipe(fix_dtypes) .pipe(handle_missing) .pipe(standardize_text) .pipe(validate) )流水线的好处:你可以像读菜单一样从上到下看懂整个清洗流程,改一个步骤不影响其他步骤,而且任何一个函数都可以单独拿出来测试。
下面我们用一个完整的、end-to-end 的案例把整个系列串起来。
import pandas as pdimport numpy as npdf = pd.read_csv('employee_data.csv')print("=== 数据结构 ===")print(df.info())print(f"\n行数: {len(df)}, 列数: {len(df.columns)}")print(f"列名: {df.columns.tolist()}")print("\n=== 缺失值统计 ===")missing = df.isnull().sum()missing_pct = (missing / len(df) * 100).round(1)missing_report = pd.DataFrame({ '缺失数': missing, '缺失率(%)': missing_pct})print(missing_report[missing_report['缺失数'] > 0])# 按员工ID检查dup_emp = df[df['emp_id'].duplicated(keep=False)]print(f"\n重复的员工ID: {len(dup_emp)}条 ({df['emp_id'].duplicated().sum()}个重复)")# 整体检查full_dup = df.duplicated().sum()print(f"完全重复的行: {full_dup}条")# 日期转换df['hire_date'] = pd.to_datetime(df['hire_date'], errors='coerce')# 数值转换df['salary'] = pd.to_numeric(df['salary'], errors='coerce')df['age'] = pd.to_numeric(df['age'], errors='coerce')print("\n修正后的数据类型:")print(df.dtypes)# 年龄异常检查age_outliers = df[(df['age'] < 18) | (df['age'] > 75)]print(f"年龄异常: {len(age_outliers)}条")if len(age_outliers) > 0: print(age_outliers[['emp_id', 'name', 'age']])# 工资异常检查(用IQR法)Q1 = df['salary'].quantile(0.25)Q3 = df['salary'].quantile(0.75)IQR = Q3 - Q1lower = Q1 - 1.5 * IQRupper = Q3 + 1.5 * IQRsal_outliers = df[(df['salary'] < lower) | (df['salary'] > upper)]print(f"\n工资异常(IQR法): {len(sal_outliers)}条")# 去除前后空格text_cols = df.select_dtypes(include='object').columnsfor col in text_cols: df[col] = df[col].str.strip()# 检查department的异常值print("\n部门分类:")print(df['department'].value_counts())# 如果有'nan'字符串,替换为真正的缺失值df['department'] = df['department'].replace('nan', np.nan)# 统一education的写法(把"本科"、"_本科"统一)print(f"\n学历分类: {df['education'].value_counts().to_dict()}")dept = pd.read_csv('department_data.csv')# 合并前检查print(f"部门表department唯一性: {dept['department'].is_unique}")df = pd.merge(df, dept, on='department', how='left', validate='m:1')# 简单看下合并结果print(f"\n合并后列数: {len(df.columns)}")print(f"新增列: manager, floor, headcount, dept_code")print("\n===== 最终数据概况 =====")print(f"行数: {len(df)}, 列数: {len(df.columns)}")print(f"缺失值总数: {df.isnull().sum().sum()}")print(f"数据类型: \n{df.dtypes.value_counts().to_string()}")# 保存清洗后的数据output_path = 'employee_data_clean.csv'df.to_csv(output_path, index=False, encoding='utf-8-sig')print(f"\n清洗完成,已保存至: {output_path}")拿来做一张速查表。每次拿到新数据,按顺序跑一遍:
| 了解结构 | .info().shape | ||
| 统计摘要 | .describe() | ||
| 缺失值 | .isnull().sum() | ||
| 重复行 | .duplicated() | ||
| 数据类型 | .dtypes | ||
| 异常值 | .quantile().std() | ||
| 分类变量 | .value_counts() | ||
| 文本规范 | .str.strip().str.lower() | ||
| 关联一致性 | merge(indicator=True) | ||
| 记录变更 |
建议把这个清单打印出来贴在工位上——前5次照着做,以后就变成肌肉记忆了。
这是我们"从零开始学Python数据清洗"系列的第12篇,也是最后一篇。来回顾一下这趟旅程:
print("Hello")read_csv、read_excel、to_csv,搞定文件I/O.info()、.describe()、.value_counts(),三招看清数据面貌.isnull() 排查,删除法 .dropna().fillna(),均值、中位数、众数、前后值、插值astype()、pd.to_numeric()、pd.to_datetime(),让每列各归其位.str 访问器的全套方法:拆分、替换、提取、正则匹配query()、loc/iloc、sort_values,精准定位数据concat、merge、join,打通多张表groupby、pivot_table、melt,从明细到汇总pipe() 串联,从单步操作到自动化工程pd.read_csv() | ||
.info().describe() | ||
.fillna().astype(), .str | ||
.merge().groupby(), .pipe() |
数据清洗是数据分析的前80%。掌握了这12篇的内容,你已经可以处理工作中绝大多数"脏数据"了。下一步的建议路线:
scikit-learn 做机器学习,或 openpyxl 做自动化报表学习数据分析最好的方式永远是做项目。找一个你真正感兴趣的问题,用真实数据去回答它。
12篇文章,从 import pandas as pd 到构建完整的数据清洗流水线。
如果你是从第一篇一路跟下来的——恭喜你,你已经比大多数只会在 Excel 里手动点来点去的人领先了一大步。
数据分析这条路,入门不难,但要"会用"和"用得好"之间有一条很长的路。这12篇是你的起跑线,不是终点线。
记住两句话:
感谢阅读。愿你的数据永远干净,bug永远好找。
这是"从零开始学Python数据清洗"系列的终章。如果你觉得这12篇对你有帮助,请点赞、在看、收藏,并转发给和你一样在学数据分析的朋友。有任何问题,留言区见。
#数据清洗 #pipe #自动化 #pipeline #Python数据分析 #工作手册
系列全部文章已整理为合辑,后台回复「Python清洗」获取完整目录。