还在Excel里手动找空单元格?Python三行代码帮你清洗上万行数据!
一、为什么要数据清洗?
先看一个真实场景:
import pandas as pd# 模拟一个有问题的学生成绩表原始数据 = {'学号': ['2023001', '2023002', '2023002', '2023004', None, '2023006', '2023007', '2023007'],'姓名': ['张三', '李四', '李四', '王五', '赵六', '钱七', '孙八', '孙八'],'性别': ['男', '女', '女', '男', None, '女', '男', '男'],'语文': [85, 92, 92, 78, 90, 88, 150, 95], # 150是异常值'数学': [92, None, 88, 85, 95, 90, 88, 88], # 有缺失值'英语': [88, 85, 85, 80, 92, 87, -10, 92], # -10是异常值'班级': ['三(2)班', '三(1)班', '三(1)班', '三(2)班', '三(1)班', '三(2)班', '三(3)班', '三(3)班']}df = pd.DataFrame(原始数据)print("📊 原始数据(包含各种问题):")print(df)print("\n❌ 问题清单:")print("1. 第5行学号为None")print("2. 第2行数学成绩为None")print("3. 第2、3行完全重复")print("4. 第7、8行部分重复(学号姓名相同)")print("5. 第7行语文150分(异常)")print("6. 第7行英语-10分(异常)")
运行结果:
📊 原始数据(包含各种问题): 学号 姓名 性别 语文 数学 英语 班级0 2023001 张三 男 85 92.0 88 三(2)班1 2023002 李四 女 92 NaN 85 三(1)班2 2023002 李四 女 92 88.0 85 三(1)班3 2023004 王五 男 78 85.0 80 三(2)班4 None 赵六 None 90 95.0 92 三(1)班5 2023006 钱七 女 88 90.0 87 三(2)班6 2023007 孙八 男 150 88.0 -10 三(3)班7 2023007 孙八 男 95 88.0 92 三(3)班❌ 问题清单:1. 第5行学号为None2. 第2行数学成绩为None3. 第2、3行完全重复4. 第7、8行部分重复(学号姓名相同)5. 第7行语文150分(异常)6. 第7行英语-10分(异常)
二、数据清洗三板斧
2.1 第一板斧:处理缺失值(NaN/None)
print("🧹 第一步:处理缺失值")print("="*50)# 1. 查看缺失值情况print("🔍 缺失值统计:")print(df.isnull().sum())print(f"\n📊 总缺失值:{df.isnull().sum().sum()}个")# 2. 查看缺失值的具体位置print("\n📍 缺失值位置:")缺失行 = df[df.isnull().any(axis=1)]print(缺失行)# 3. 处理方法1:删除缺失值print("\n🗑️ 方法1:删除所有含缺失值的行")df_删除缺失 = df.dropna()print(f"删除后行数:{len(df_删除缺失)} (原{len(df)}行)")# 4. 处理方法2:填充缺失值print("\n🔄 方法2:智能填充缺失值")df_填充 = df.copy()# 不同列用不同方式填充df_填充['学号'] = df_填充['学号'].fillna('未知学号') # 文本用特定值填充df_填充['性别'] = df_填充['性别'].fillna('未知') # 文本用特定值填充df_填充['数学'] = df_填充['数学'].fillna(df_填充['数学'].mean()) # 数值用平均值填充print("填充后数据:")print(df_填充)print(f"\n✅ 填充后缺失值统计:")print(df_填充.isnull().sum())# 5. 处理方法3:仅删除特定列缺失的行print("\n🎯 方法3:只删除关键列缺失的行")df_关键列清理 = df.dropna(subset=['学号', '姓名']) # 只删除学号或姓名为空的行print(f"删除后行数:{len(df_关键列清理)}行")
2.2 第二板斧:处理重复值
print("\n🧹 第二步:处理重复值")print("="*50)# 1. 查找重复行print("🔍 查找完全重复的行:")重复行 = df[df.duplicated(keep=False)] # keep=False标记所有重复行print(重复行)print(f"\n📊 重复行数量:{len(重复行)}行")# 2. 删除完全重复的行print("\n🗑️ 删除完全重复的行(保留第一行):")df_去重 = df.drop_duplicates()print(f"去重后行数:{len(df_去重)} (原{len(df)}行)")print(df_去重)# 3. 按特定列去重print("\n🎯 按学号去重(保留第一次出现的行):")df_学号去重 = df.drop_duplicates(subset=['学号'], keep='first')print(f"按学号去重后:{len(df_学号去重)}行")print(df_学号去重)# 4. 查看部分重复(学号相同但其他信息不同)print("\n⚠️ 检查学号重复但其他信息可能不同的行:")学号重复 = df[df.duplicated(subset=['学号'], keep=False)]print(学号重复)# 5. 智能去重:对重复行取平均值print("\n🧠 智能处理:对学号重复的行取平均值")def 智能去重(df):# 数值列取平均值,非数值列取第一个值 数值列 = ['语文', '数学', '英语'] 非数值列 = [col for col in df.columns if col not in 数值列] 去重数据 = [] 已处理学号 = set()for _, row in df.iterrows(): 学号 = row['学号']if 学号 in 已处理学号 or pd.isna(学号):continue# 找到所有相同学号的行 相同学号行 = df[df['学号'] == 学号]if len(相同学号行) > 1:# 计算平均值 平均值行 = 相同学号行[数值列].mean()# 取非数值列的第一行值 非数值值 = 相同学号行.iloc[0][非数值列]# 合并 新行 = pd.concat([非数值值, 平均值行]) 去重数据.append(新行)else: 去重数据.append(row) 已处理学号.add(学号)return pd.DataFrame(去重数据)df_智能去重 = 智能去重(df)print("智能去重后:")print(df_智能去重)
2.3 第三板斧:处理异常值
print("\n🧹 第三步:处理异常值")print("="*50)# 1. 识别异常值(简单的统计方法)print("📊 数值列统计信息:")数值列 = ['语文', '数学', '英语']print(df[数值列].describe())# 2. 方法1:3σ原则(适用于正态分布)print("\n🔬 方法1:3σ原则检测异常值")def 检测3σ异常(df, 列名): mean = df[列名].mean() std = df[列名].std() 下限 = mean - 3 * std 上限 = mean + 3 * std 异常值 = df[(df[列名] < 下限) | (df[列名] > 上限)]return 异常值print("语文异常值:")print(检测3σ异常(df, '语文'))# 3. 方法2:IQR方法(更稳健)print("\n📏 方法2:IQR(四分位距)方法检测异常值")def 检测IQR异常(df, 列名): Q1 = df[列名].quantile(0.25) Q3 = df[列名].quantile(0.75) IQR = Q3 - Q1 下限 = Q1 - 1.5 * IQR 上限 = Q3 + 1.5 * IQR 异常值 = df[(df[列名] < 下限) | (df[列名] > 上限)]return 异常值for 列 in 数值列: 异常 = 检测IQR异常(df, 列)if len(异常) > 0:print(f"{列}异常值:")print(异常)# 4. 方法3:业务规则检测print("\n📋 方法3:业务规则检测(0-100分范围)")def 业务规则异常(df): 异常行 = []for index, row in df.iterrows(): 问题 = []# 分数应该在0-100之间if row['语文'] < 0 or row['语文'] > 100: 问题.append(f"语文{row['语文']}分")if row['数学'] < 0 or row['数学'] > 100: 问题.append(f"数学{row['数学']}分")if row['英语'] < 0 or row['英语'] > 100: 问题.append(f"英语{row['英语']}分")if 问题: 异常行.append({'行索引': index,'学号': row['学号'],'姓名': row['姓名'],'问题': '、'.join(问题) })return pd.DataFrame(异常行)异常表 = 业务规则异常(df)print("业务规则检测到的异常:")print(异常表)# 5. 处理异常值print("\n🔄 处理异常值")df_处理异常 = df.copy()# 方法1:删除异常行df_删除异常 = df_处理异常.drop(异常表['行索引'])print(f"删除异常行后:{len(df_删除异常)}行")# 方法2:修正异常值(用中位数替换)print("\n✏️ 修正异常值(用中位数替换):")for index in 异常表['行索引']:for 列 in 数值列:if df_处理异常.at[index, 列] < 0 or df_处理异常.at[index, 列] > 100:# 用中位数替换异常值 中位数 = df_处理异常[列].median() df_处理异常.at[index, 列] = 中位数print(f"修正:行{index} {列}={中位数}")print("\n修正后数据:")print(df_处理异常)
三、实战:完整的数据清洗流程
class 数据清洗工具箱:"""完整的数据清洗工具""" def __init__(self, 数据): self.原始数据 = 数据.copy() self.清洗后数据 = 数据.copy() self.清洗报告 = [] def 生成报告(self):"""生成清洗报告"""print("📋 数据清洗报告")print("="*60)print(f"原始数据:{len(self.原始数据)}行 × {len(self.原始_data.columns)}列")print(f"清洗后数据:{len(self.清洗后数据)}行 × {len(self.清洗后数据.columns)}列")print(f"清洗步骤:{len(self.清洗报告)}步")for i, 步骤 in enumerate(self.清洗报告, 1):print(f"\n{i}. {步骤['操作']}")print(f" 结果:{步骤['结果']}")if'详情'in 步骤:print(f" 详情:{步骤['详情']}") def 处理缺失值(self, 策略='智能填充'):"""处理缺失值"""print("\n🧹 处理缺失值...")# 统计缺失值 缺失统计 = self.清洗后数据.isnull().sum() 总缺失 = 缺失统计.sum()if 总缺失 == 0: self.清洗报告.append({'操作': '处理缺失值','结果': '无缺失值' })returnif 策略 == '删除':# 删除所有含缺失值的行 原始行数 = len(self.清洗后数据) self.清洗后数据 = self.清洗后数据.dropna() 删除行数 = 原始行数 - len(self.清洗后数据) self.清洗报告.append({'操作': '删除缺失值','结果': f'删除{删除行数}行','详情': f'剩余{len(self.清洗后数据)}行' })elif 策略 == '智能填充':# 智能填充缺失值for 列 in self.清洗后_data.columns:if self.清洗后数据[列].dtype in ['int64', 'float64']:# 数值列用中位数填充 填充值 = self.清洗后_data[列].median() self.清洗后_data[列] = self.清洗后数据[列].fillna(填充值)else:# 文本列用众数填充if not self.清洗后_data[列].dropna().empty: 填充值 = self.清洗后_data[列].mode()[0] self.清洗后_data[列] = self.清洗后数据[列].fillna(填充值) self.清洗报告.append({'操作': '智能填充缺失值','结果': f'填充{总缺失}个缺失值' }) def 处理重复值(self, 依据列=None, 保留='first'):"""处理重复值"""print("\n🔍 处理重复值...")if 依据列 is None:# 完全重复的行 重复行数 = len(self.清洗后数据) - len(self.清洗后_data.drop_duplicates()) self.清洗后数据 = self.清洗后_data.drop_duplicates()else:# 按指定列去重 重复行数 = len(self.清洗后_data) - len(self.清洗后_data.drop_duplicates(subset=依据列, keep=保留)) self.清洗后数据 = self.清洗后_data.drop_duplicates(subset=依据列, keep=保留) self.清洗报告.append({'操作': f'处理重复值(依据:{依据列 if 依据列 else "所有列"})','结果': f'删除{重复行数}行重复数据' }) def 处理异常值(self, 列列表=None, 方法='业务规则'):"""处理异常值"""print("\n⚠️ 处理异常值...")if 列列表 is None: 列列表 = self.清洗后_data.select_dtypes(include=['int64', 'float64']).columns 异常行索引 = set()for 列 in 列列表:if 方法 == 'IQR':# IQR方法 Q1 = self.清洗后_data[列].quantile(0.25) Q3 = self.清洗后_data[列].quantile(0.75) IQR = Q3 - Q1 下限 = Q1 - 1.5 * IQR 上限 = Q3 + 1.5 * IQR 异常索引 = self.清洗后_data[(self.清洗后_data[列] < 下限) | (self.清洗后_data[列] > 上限)].index 异常行索引.update(异常索引)elif 方法 == '业务规则':# 业务规则:分数在0-100之间if 列 in ['语文', '数学', '英语']: 异常索引 = self.清洗后_data[(self.清洗后_data[列] < 0) | (self.清洗后_data[列] > 100)].index 异常行索引.update(异常索引)if 异常行索引:# 修正异常值为中位数for 列 in 列列表:for 索引 in 异常行索引:if 索引 in self.清洗后_data.index: 中位数 = self.清洗后_data[列].median() self.清洗后_data.at[索引, 列] = 中位数 self.清洗报告.append({'操作': f'处理异常值({方法}方法)','结果': f'修正{len(异常行索引)}行中的异常值','详情': f'涉及列:{", ".join(列列表)}' })else: self.清洗报告.append({'操作': '处理异常值','结果': '未发现异常值' }) def 标准化文本(self):"""标准化文本数据"""print("\n🔤 标准化文本数据...") 文本列 = self.清洗后_data.select_dtypes(include=['object']).columns 修改数量 = 0for 列 in 文本列:# 去除首尾空格 原始值 = self.清洗后_data[列].astype(str) 标准化值 = 原始值.str.strip()# 统计修改 修改数量 += (原始值 != 标准化值).sum()# 更新数据 self.清洗后_data[列] = 标准化值 self.清洗报告.append({'操作': '标准化文本数据','结果': f'标准化{修改数量}处文本' }) def 一键清洗(self, 策略='智能'):"""一键完成所有清洗"""print("🚀 开始一键数据清洗...")print("="*50)# 记录原始状态 原始形状 = self.原始数据.shape# 执行清洗步骤 self.处理缺失值(策略='智能填充') self.处理重复值(依据列=['学号'], 保留='first') self.处理异常值(方法='业务规则') self.标准化文本()# 生成最终报告print("\n" + "="*50)print("✅ 数据清洗完成!")print("="*50) self.生成报告()# 对比清洗效果print(f"\n📈 清洗效果:")print(f" 行数变化:{原始形状[0]} → {self.清洗后数据.shape[0]}")print(f" 列数变化:{原始形状[1]} → {self.清洗后数据.shape[1]}")print(f" 数据完整性:{self.清洗后_data.isnull().sum().sum()}/{self.清洗后_data.size}个空值")return self.清洗后数据# 使用示例if __name__ == "__main__":# 创建测试数据 测试数据 = pd.DataFrame({'学号': ['2023001', '2023002', '2023002', '2023004', None, '2023006'],'姓名': ['张三', ' 李四 ', '李四', '王五', '赵六', '钱七'],'语文': [85, 92, 92, 78, 90, 88],'数学': [92, None, 88, 85, 95, 90],'英语': [88, 85, 85, 80, 92, 87],'班级': ['三(2)班', '三(1)班', '三(1)班', ' 三(2)班', '三(1)班', '三(2)班 '] })print("📊 原始数据:")print(测试数据)print("\n" + "="*50)# 使用清洗工具箱 工具箱 = 数据清洗工具箱(测试数据) 清洗后数据 = 工具箱.一键清洗()print("\n📋 清洗后数据:")print(清洗后数据)
四、实战:批量清洗多个文件
import osimport pandas as pdfrom datetime import datetimeclass 批量数据清洗器:"""批量清洗多个文件""" def __init__(self, 输入文件夹, 输出文件夹): self.输入文件夹 = 输入文件夹 self.输出文件夹 = 输出文件夹 self.清洗日志 = []# 创建输出文件夹if not os.path.exists(输出文件夹): os.makedirs(输出文件夹) def 清洗单个文件(self, 文件路径):"""清洗单个文件""" 文件名 = os.path.basename(文件路径)print(f"\n📄 清洗文件:{文件名}") try:# 根据文件类型读取if 文件路径.endswith('.xlsx') or 文件路径.endswith('.xls'): df = pd.read_excel(文件路径)elif 文件_path.endswith('.csv'):# 尝试不同编码 try: df = pd.read_csv(文件路径, encoding='utf-8-sig') except: df = pd.read_csv(文件路径, encoding='gbk')else:print(f"❌ 不支持的文件格式:{文件名}")return None# 记录原始状态 原始行数 = len(df) 原始空值 = df.isnull().sum().sum()# 创建清洗工具箱 工具箱 = 数据清洗工具箱(df)# 执行清洗 df_清洗后 = 工具箱.一键清洗()# 保存清洗后的文件 输出文件名 = f"清洗后_{文件名}" 输出路径 = os.path.join(self.输出文件夹, 输出文件名)if 文件路径.endswith('.xlsx'): df_清洗后.to_excel(输出路径, index=False)else: df_清洗后.to_csv(输出路径, index=False, encoding='utf-8-sig')# 记录日志 self.清洗日志.append({'文件名': 文件名,'原始行数': 原始行数,'清洗后行数': len(df_清洗后),'原始空值': 原始空值,'清洗后空值': df_清洗后.isnull().sum().sum(),'输出文件': 输出文件名,'状态': '成功' })print(f"✅ 清洗完成,保存到:{输出文件名}")return df_清洗后 except Exception as e:print(f"❌ 清洗失败:{e}") self.清洗日志.append({'文件名': 文件名,'状态': f'失败:{str(e)}' })return None def 批量清洗(self):"""批量清洗文件夹中所有文件"""print(f"🚀 开始批量清洗:{self.输入文件夹}")print("="*60)# 获取所有支持的文件 支持格式 = ['.xlsx', '.xls', '.csv'] 文件列表 = []for 文件 in os.listdir(self.输入文件夹): 文件路径 = os.path.join(self.输入文件夹, 文件)if os.path.isfile(文件路径):if any(文件.lower().endswith(格式) for 格式 in 支持格式): 文件列表.append(文件路径)print(f"找到{len(文件列表)}个文件:")for 文件 in 文件列表:print(f" • {os.path.basename(文件)}")# 批量清洗for 文件路径 in 文件列表: self.清洗单个文件(文件路径)# 生成清洗报告 self.生成清洗报告() def 生成清洗报告(self):"""生成批量清洗报告""" 时间戳 = datetime.now().strftime("%Y%m%d_%H%M%S") 报告文件 = os.path.join(self.输出文件夹, f"批量清洗报告_{时间戳}.xlsx")if not self.清洗日志:print("❌ 没有清洗日志")return# 转换为DataFrame df_报告 = pd.DataFrame(self.清洗日志)# 保存报告 with pd.ExcelWriter(报告文件, engine='openpyxl') as writer: df_报告.to_excel(writer, sheet_name='清洗日志', index=False)# 添加统计摘要 统计摘要 = pd.DataFrame({'统计项': ['总文件数', '成功数', '失败数', '平均行数减少', '平均空值减少'],'数值': [ len(self.清洗日志), len([logforlogin self.清洗日志 iflog['状态'] == '成功']), len([logforlogin self.清洗日志 iflog['状态'] != '成功']), df_报告['原始行数'].mean() - df_报告['清洗后行数'].mean(), df_报告['原始空值'].mean() - df_报告['清洗后空值'].mean() ] }) 统计摘要.to_excel(writer, sheet_name='统计摘要', index=False)print(f"\n📋 批量清洗报告已保存:{报告文件}")print("\n📊 清洗统计:")print(统计摘要.to_string(index=False))# 使用示例if __name__ == "__main__":# 创建测试文件if not os.path.exists('测试输入'): os.makedirs('测试输入')# 创建几个有问题的测试文件for i in range(1, 4): 问题数据 = pd.DataFrame({'学号': [f'202300{j}'for j in range(1, 6)],'姓名': ['张三', '李四', '李四', '王五', '赵六'],'语文': [85, 92, 92, 78, 90],'数学': [92, None, 88, 85, 95],'英语': [88, 85, 85, 80, 92] }) 问题数据.iloc[0, 2] = 150 # 添加异常值 问题数据.to_excel(f'测试输入/班级{i}成绩.xlsx', index=False)# 也保存一个CSV版本 问题数据.to_csv(f'测试输入/班级{i}成绩.csv', index=False, encoding='utf-8-sig')print("✅ 已创建测试文件")# 批量清洗 清洗器 = 批量数据清洗器('测试输入', '清洗输出') 清洗器.批量清洗()
五、数据清洗常见问题解决方案
# 📋 数据清洗速查表# 1. 检测问题# df.isnull().sum() # 统计缺失值# df.duplicated().sum() # 统计重复行# df.describe() # 查看异常值# 2. 处理缺失值# df.dropna() # 删除所有含缺失值的行# df.dropna(subset=['列1', '列2']) # 删除指定列缺失的行# df.fillna(值) # 用固定值填充# df.fillna(df.mean()) # 用平均值填充# df.fillna(df.median()) # 用中位数填充# df.fillna(method='ffill') # 用前一个值填充# df.fillna(method='bfill') # 用后一个值填充# 3. 处理重复值# df.drop_duplicates() # 删除完全重复的行# df.drop_duplicates(subset=['列1']) # 按指定列去重# df.drop_duplicates(keep='first') # 保留第一个# df.drop_duplicates(keep='last') # 保留最后一个# df.drop_duplicates(keep=False) # 删除所有重复行# 4. 处理异常值# 基于IQR:# Q1 = df['列'].quantile(0.25)# Q3 = df['列'].quantile(0.75)# IQR = Q3 - Q1# 下限 = Q1 - 1.5 * IQR# 上限 = Q3 + 1.5 * IQR# 异常值 = df[(df['列'] < 下限) | (df['列'] > 上限)]# 5. 标准化文本# df['列'] = df['列'].str.strip() # 去除首尾空格# df['列'] = df['列'].str.lower() # 转为小写# df['列'] = df['列'].str.upper() # 转为大写# df['列'] = df['列'].str.replace(' ', '') # 去除空格# 6. 数据类型转换# df['列'] = df['列'].astype(int) # 转为整数# df['列'] = df['列'].astype(float) # 转为浮点数# df['列'] = pd.to_datetime(df['列']) # 转为日期时间
六、明日预告
明天我们将学习 「数据"选妃记":如何精准筛选出你想要的那一行?」你将学到:
公众号:数字编程
回复"Py-Day9"获取完整代码和清洗工具
下期:明天下午6点,数据筛选与查询实战!