1. Overview of Data Cleaning
What is data cleaning?
Data cleaning is a key step in data preprocessing: the process of identifying and correcting (or removing) inaccurate, incomplete, implausible, or duplicate parts of a dataset in order to improve data quality.
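As a concrete, deliberately tiny illustration of that definition, the sketch below runs one pass of typical cleaning steps with pandas: normalizing an inconsistent text field, coercing a numeric column, and dropping exact duplicates. The file name, column names, and values are made up for this example and are not taken from the datasets used later in this lesson.

# minimal_cleaning_sketch.py (hypothetical example)
import pandas as pd

raw = pd.DataFrame({
    'name': ['Zhang San', 'zhang san ', 'Li Si', 'Li Si'],
    'age': ['25', '25', 'thirty', '30'],
})

cleaned = (
    raw
    .assign(name=lambda d: d['name'].str.strip().str.title())        # normalize whitespace and case
    .assign(age=lambda d: pd.to_numeric(d['age'], errors='coerce'))  # invalid ages become NaN
    .drop_duplicates()                                               # remove exact duplicates
    .reset_index(drop=True)
)
print(cleaned)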
Why data cleaning matters
# importance_of_data_cleaning.py
import pandas as pd
import numpy as np

print("=== 数据清洗的重要性 ===\n")

data_quality_issues = {
    "缺失值": "可能导致分析偏差,统计结果不准确",
    "重复值": "使分析结果失真,资源浪费",
    "异常值": "影响统计模型的准确性和稳定性",
    "格式不一致": "难以进行统一处理和分析",
    "数据错误": "导致错误的业务决策",
    "单位不一致": "无法进行正确的数值比较"
}

print("常见数据质量问题及其影响:")
for issue, impact in data_quality_issues.items():
    print(f"  • {issue}: {impact}")

print("\n数据清洗流程:")
steps = [
    "1. 数据评估和理解",
    "2. 缺失值处理",
    "3. 异常值检测和处理",
    "4. 重复值处理",
    "5. 格式标准化",
    "6. 数据验证",
    "7. 质量检查"
]
for step in steps:
    print(f"  {step}")

# 脏数据示例
dirty_data = pd.DataFrame({
    '姓名': ['张三', '李四', '王五', '赵六', np.nan, '张三'],
    '年龄': [25, 30, 150, 28, 35, 25],  # 150岁是异常值
    '工资': ['5000', '6000', '5,500', 'NaN', '7000', '5000'],  # 格式不一致
    '邮箱': ['zhangsan@email.com', 'lisi@gmail', 'wangwu@email.com',
           'zhaoliu@email.com', 'error', 'zhangsan@email.com']
})
print(f"\n脏数据示例:\n{dirty_data}")
Principles of data cleaning
# data_cleaning_principles.py
import pandas as pd

print("=== 数据清洗原则 ===\n")

principles = {
    "完整性": "确保数据记录完整,避免大量缺失",
    "一致性": "相同含义的数据保持一致的格式和单位",
    "准确性": "数据准确反映真实世界的情况",
    "时效性": "数据应在有效时间范围内",
    "唯一性": "避免重复记录",
    "有效性": "数据应符合预定义的业务规则"
}

print("数据质量维度:")
for principle, description in principles.items():
    print(f"  • {principle}: {description}")

print("\n数据清洗最佳实践:")
best_practices = [
    "保留原始数据副本",
    "记录所有清洗步骤和决策",
    "使用版本控制",
    "自动化重复清洗任务",
    "验证清洗结果",
    "与业务专家合作理解数据"
]
for i, practice in enumerate(best_practices, 1):
    print(f"  {i}. {practice}")

print("\n数据清洗工具:")
tools = {
    "Pandas": "Python数据清洗核心库",
    "OpenRefine": "开源数据清洗工具",
    "Trifacta": "企业级数据清洗平台",
    "dbt": "数据构建工具,包含清洗功能",
    "Great Expectations": "数据测试和验证库"
}
for tool, description in tools.items():
    print(f"  • {tool}: {description}")
2. Data Quality Assessment
A data quality assessment framework
# data_quality_assessment.py
import pandas as pd
import numpy as np
from datetime import datetime
import re

print("=== 数据质量评估 ===\n")

# 创建包含多种质量问题的示例数据
def create_sample_data():
    """创建包含各种质量问题的示例数据"""
    data = {
        'ID': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        '姓名': ['张三', '李四', '王五', '赵六', '钱七', '孙八', '周九', '吴十', '郑十一', '王十二'],
        '年龄': [25, 30, 150, 28, 35, 40, 29, np.nan, 32, 0],  # 异常值:150, 缺失值:np.nan, 不合理值:0
        '性别': ['男', '女', 'male', 'female', '男', '女', '男', '男', '未知', '男'],  # 不一致
        '邮箱': [
            'zhangsan@email.com', 'lisi@gmail.com', 'wangwu@hotmail',
            'zhaoliu@email.com', 'qianqi@email.com', 'sunba@email.com',
            'zhoujiu@email.com', 'wushi@email.com', 'zhengsy@email.com', 'wangse@email.com'
        ],
        '电话号码': [
            '13800138000', '13912345678', '123456',  # 过短
            '18888888888', '17777777777', '16666666666',
            '15555555555', '14444444444', '13333333333', '12222222222'
        ],
        '入职日期': [
            '2020-01-15', '2019-03-20', '2021-05-10', '2018-08-01', '2022-02-28',
            '2023-01-01',  # 未来日期
            '2017-12-25', '2016-11-11', '2015-10-10', '2014-09-09'
        ],
        '工资': [5000, 6000, 5500, 7000, 8000, 9000, 10000, 11000, 12000, 13000],
        '部门': ['技术部', '销售部', '技术部', '市场部', '人事部', '财务部', '技术部', '销售部', '市场部', '技术部'],
        '绩效评分': ['A', 'B', 'C', 'A', 'B', 'A', 'A', 'B', 'C', 'D']
    }
    return pd.DataFrame(data)

df = create_sample_data()
print("示例数据:")
print(df)
print(f"\n数据形状:{df.shape}")

print("\n1. 基本信息统计:")
print(f"  数据类型:\n{df.dtypes}")
print(f"\n  内存使用:{df.memory_usage(deep=True).sum() / 1024:.2f} KB")

print("\n2. 缺失值分析:")
missing_stats = df.isnull().sum()
missing_percent = (missing_stats / len(df)) * 100
missing_df = pd.DataFrame({
    '缺失数量': missing_stats,
    '缺失比例(%)': missing_percent.round(2)
})
print(f"  缺失值统计:\n{missing_df[missing_df['缺失数量'] > 0]}")

print("\n3. 唯一值分析:")
unique_stats = {}
for column in df.columns:
    unique_count = df[column].nunique()
    unique_stats[column] = {
        '唯一值数量': unique_count,
        '唯一值比例(%)': (unique_count / len(df)) * 100
    }
unique_df = pd.DataFrame(unique_stats).T
print(f"  唯一值统计:\n{unique_df}")

print("\n4. 数据分布分析:")
print("  数值列统计描述:")
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
    print(df[numeric_cols].describe())

print("\n5. 数据质量指标计算:")
def calculate_data_quality(df):
    """计算数据质量指标"""
    quality_metrics = {}
    for column in df.columns:
        total = len(df)
        # 计算完整性
        non_missing = df[column].count()
        completeness = (non_missing / total) * 100
        # 计算唯一性
        unique_count = df[column].nunique()
        uniqueness = (unique_count / non_missing) * 100 if non_missing > 0 else 0
        # 计算有效性(简单示例)
        if column == '邮箱':
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            valid_emails = df[column].apply(lambda x: bool(re.match(email_pattern, str(x))) if pd.notnull(x) else False)
            validity = (valid_emails.sum() / non_missing) * 100 if non_missing > 0 else 0
        elif column == '电话号码':
            phone_pattern = r'^1[3-9]\d{9}$'
            valid_phones = df[column].apply(lambda x: bool(re.match(phone_pattern, str(x))) if pd.notnull(x) else False)
            validity = (valid_phones.sum() / non_missing) * 100 if non_missing > 0 else 0
        elif column == '年龄':
            valid_ages = df[column].apply(lambda x: 18 <= x <= 65 if pd.notnull(x) else False)
            validity = (valid_ages.sum() / non_missing) * 100 if non_missing > 0 else 0
        else:
            validity = 100  # 假设其他列都有效
        quality_metrics[column] = {
            '完整性(%)': completeness,
            '唯一性(%)': uniqueness,
            '有效性(%)': validity,
            '质量得分(%)': (completeness + uniqueness + validity) / 3
        }
    return pd.DataFrame(quality_metrics).T

quality_df = calculate_data_quality(df)
print(f"  数据质量指标:\n{quality_df}")

print("\n6. 异常值检测:")
def detect_outliers_iqr(data, column):
    """使用IQR方法检测异常值"""
    Q1 = data[column].quantile(0.25)
    Q3 = data[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    outliers = data[(data[column] < lower_bound) | (data[column] > upper_bound)]
    return outliers, lower_bound, upper_bound

numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
    outliers, lower, upper = detect_outliers_iqr(df, col)
    if not outliers.empty:
        print(f"  {col}异常值检测:")
        print(f"    边界:[{lower:.2f}, {upper:.2f}]")
        print(f"    异常值数量:{len(outliers)}")
        print(f"    异常值:{outliers[col].tolist()}")

print("\n7. 数据一致性检查:")
print("  性别列唯一值:", df['性别'].unique())
print("  性别列标准化问题:包含'男', '女', 'male', 'female', '未知'")

print("\n8. 业务规则验证:")
# 检查年龄合理性
invalid_age = df[(df['年龄'] < 18) | (df['年龄'] > 65)]
print(f"  不合理年龄(<18或>65):{len(invalid_age)}个")
# 检查未来入职日期
df['入职日期_dt'] = pd.to_datetime(df['入职日期'])
future_hires = df[df['入职日期_dt'] > datetime.now()]
print(f"  未来入职日期:{len(future_hires)}个")

print("\n9. 数据质量报告:")
def generate_quality_report(df):
    """生成数据质量报告"""
    report = {
        '记录总数': len(df),
        '列数': len(df.columns),
        '总缺失值': df.isnull().sum().sum(),
        '完整记录数': df.dropna().shape[0],
        '重复记录数': df.duplicated().sum(),
        '数据质量得分': quality_df['质量得分(%)'].mean()
    }
    return pd.Series(report)

quality_report = generate_quality_report(df)
print(f"  数据质量报告:\n{quality_report}")
Data quality assessment tools
# data_quality_tools.pyimport pandas as pdimport numpy as npfrom scipy import statsimport matplotlib.pyplot as pltimport seaborn as snsprint("=== 数据质量评估工具 ===\n")# 创建自定义数据质量评估类class DataQualityAssessor: """数据质量评估器""" def __init__(self, df): self.df = df.copy() self.quality_issues = {} def assess_completeness(self): """评估完整性""" missing_stats = self.df.isnull().sum() total = len(self.df) completeness = { column: { '缺失数量': missing_stats[column], '缺失比例(%)': (missing_stats[column] / total) * 100, '完整比例(%)': 100 - (missing_stats[column] / total) * 100 } for column in self.df.columns } return pd.DataFrame(completeness).T def assess_uniqueness(self): """评估唯一性""" uniqueness = {} for column in self.df.columns: non_missing = self.df[column].dropna() unique_count = non_missing.nunique() uniqueness[column] = { '唯一值数量': unique_count, '唯一值比例(%)': (unique_count / len(non_missing)) * 100 if len(non_missing) > 0 else 0, '重复值数量': len(non_missing) - unique_count if len(non_missing) > unique_count else 0 } return pd.DataFrame(uniqueness).T def assess_consistency(self): """评估一致性""" consistency = {} for column in self.df.columns: if self.df[column].dtype == 'object': # 对于文本列,检查格式一致性 samples = self.df[column].dropna().unique()[:5] # 取前5个样本 consistency[column] = { '样本值': list(samples), '格式多样性': len(samples), '建议标准化': len(samples) > 3 # 如果格式过多,建议标准化 } else: consistency[column] = { '样本值': 'N/A', '格式多样性': 'N/A', '建议标准化': False } return pd.DataFrame(consistency).T def detect_outliers(self, method='iqr'): """检测异常值""" outliers_report = {} numeric_cols = self.df.select_dtypes(include=[np.number]).columns for column in numeric_cols: data = self.df[column].dropna() if method == 'iqr': # IQR方法 Q1 = data.quantile(0.25) Q3 = data.quantile(0.75) IQR = Q3 - Q1 lower = Q1 - 1.5 * IQR upper = Q3 + 1.5 * IQR outliers = data[(data < lower) | (data > upper)] elif method == 'zscore': # Z-score方法 z_scores = np.abs(stats.zscore(data)) outliers = data[z_scores > 3] outliers_report[column] = { '异常值数量': len(outliers), '异常值比例(%)': (len(outliers) / len(data)) * 100, '异常值范围': f"[{outliers.min():.2f}, {outliers.max():.2f}]" if len(outliers) > 0 else "无", '异常值示例': outliers.tolist()[:3] if len(outliers) > 0 else [] } return pd.DataFrame(outliers_report).T def validate_business_rules(self, rules=None): """验证业务规则""" if rules is None: rules = self._default_rules() validation_results = {} for column, rule_func in rules.items(): if column in self.df.columns: violations = rule_func(self.df[column]) validation_results[column] = { '违规数量': len(violations), '违规比例(%)': (len(violations) / len(self.df)) * 100, '违规示例': violations.tolist()[:3] if len(violations) > 0 else [] } return pd.DataFrame(validation_results).T def _default_rules(self): """默认业务规则""" rules = { '年龄': lambda x: x[(x < 0) | (x > 150)], '邮箱': lambda x: x[~x.str.contains(r'@', na=False)], '电话号码': lambda x: x[~x.str.match(r'^1[3-9]\d{9}$', na=False)] } return rules def generate_report(self): """生成综合报告""" report = {} # 收集各个评估结果 report['完整性'] = self.assess_completeness() report['唯一性'] = self.assess_uniqueness() report['一致性'] = self.assess_consistency() report['异常值'] = self.detect_outliers() report['业务规则'] = self.validate_business_rules() # 计算总体质量得分 completeness_score = report['完整性']['完整比例(%)'].mean() uniqueness_score = report['唯一性']['唯一值比例(%)'].mean() validity_score = 100 - report['业务规则']['违规比例(%)'].mean() if not report['业务规则'].empty else 100 overall_score = (completeness_score + uniqueness_score + validity_score) / 3 report['总体质量'] = { '完整性得分': completeness_score, '唯一性得分': uniqueness_score, 
'有效性得分': validity_score, '总体质量得分': overall_score, '质量等级': self._get_quality_level(overall_score) } return report def _get_quality_level(self, score): """获取质量等级""" if score >= 90: return '优秀' elif score >= 80: return '良好' elif score >= 70: return '一般' elif score >= 60: return '及格' else: return '差' def visualize_quality(self): """可视化数据质量""" fig, axes = plt.subplots(2, 2, figsize=(15, 10)) # 1. 缺失值热图 sns.heatmap(self.df.isnull(), cbar=False, cmap='viridis', ax=axes[0, 0]) axes[0, 0].set_title('缺失值分布热图') # 2. 唯一值条形图 uniqueness = self.assess_uniqueness() axes[0, 1].bar(uniqueness.index, uniqueness['唯一值比例(%)']) axes[0, 1].set_title('各列唯一值比例') axes[0, 1].set_ylabel('唯一值比例(%)') axes[0, 1].tick_params(axis='x', rotation=45) # 3. 异常值箱线图 numeric_cols = self.df.select_dtypes(include=[np.number]).columns if len(numeric_cols) > 0: self.df[numeric_cols].boxplot(ax=axes[1, 0]) axes[1, 0].set_title('数值列异常值检测') axes[1, 0].tick_params(axis='x', rotation=45) # 4. 质量得分雷达图 report = self.generate_report() if '总体质量' in report: scores = [ report['总体质量']['完整性得分'], report['总体质量']['唯一性得分'], report['总体质量']['有效性得分'] ] categories = ['完整性', '唯一性', '有效性'] # 闭合雷达图 scores += scores[:1] categories += categories[:1] angles = np.linspace(0, 2 * np.pi, len(categories), endpoint=True) axes[1, 1] = plt.subplot(2, 2, 4, polar=True) axes[1, 1].plot(angles, scores, 'o-', linewidth=2) axes[1, 1].fill(angles, scores, alpha=0.25) axes[1, 1].set_thetagrids(angles[:-1] * 180/np.pi, categories[:-1]) axes[1, 1].set_title('数据质量雷达图') axes[1, 1].grid(True) plt.tight_layout() plt.show()# 使用示例print("使用DataQualityAssessor评估数据质量:")# 创建示例数据np.random.seed(42)data = { '客户ID': range(1, 101), '姓名': [f'客户{i}' for i in range(1, 101)], '年龄': np.random.randint(18, 70, 100), '年龄': np.where(np.random.rand(100) < 0.05, np.random.randint(0, 10, 100), np.random.randint(18, 70, 100)), # 添加一些异常值 '邮箱': [f'customer{i}@example.com' for i in range(1, 101)], '电话号码': ['1' + ''.join(str(np.random.randint(0, 10)) for _ in range(10)) for _ in range(100)], '消费金额': np.random.exponential(100, 100), '注册日期': pd.date_range('2020-01-01', periods=100, freq='D')}df = pd.DataFrame(data)# 添加一些质量问题df.loc[10:15, '年龄'] = np.nan # 添加缺失值df.loc[20:25, '邮箱'] = 'invalid' # 添加无效邮箱df.loc[95, '消费金额'] = 10000 # 添加异常值df.loc[50, '年龄'] = 200 # 添加不合理年龄# 创建评估器assessor = DataQualityAssessor(df)# 生成报告report = assessor.generate_report()print("\n数据质量报告:")for section, content in report.items(): if isinstance(content, dict): print(f"\n{section}:") for key, value in content.items(): print(f" {key}: {value}") else: print(f"\n{section}:") print(content)print("\n可视化数据质量:")assessor.visualize_quality()
3. Advanced Missing-Value Handling
Analyzing missing-value patterns
# missing_value_advanced.pyimport pandas as pdimport numpy as npimport matplotlib.pyplot as pltimport seaborn as snsfrom sklearn.impute import KNNImputer, SimpleImputerfrom sklearn.experimental import enable_iterative_imputerfrom sklearn.impute import IterativeImputerprint("=== 高级缺失值处理技术 ===\n")# 创建包含复杂缺失模式的数据def create_complex_missing_data(): """创建包含复杂缺失模式的数据""" np.random.seed(42) # 创建相关变量 n_samples = 200 X1 = np.random.normal(50, 10, n_samples) X2 = 0.7 * X1 + np.random.normal(0, 5, n_samples) # X2与X1相关 X3 = 0.5 * X1 + 0.5 * X2 + np.random.normal(0, 3, n_samples) # X3与X1、X2相关 X4 = np.random.normal(30, 5, n_samples) # 独立变量 # 创建DataFrame df = pd.DataFrame({ '年龄': X1, '收入': X2, '消费': X3, '资产': X4, '教育程度': np.random.choice(['高中', '本科', '硕士', '博士'], n_samples), '城市': np.random.choice(['北京', '上海', '广州', '深圳'], n_samples) }) # 添加复杂的缺失模式 # 1. 完全随机缺失 (MCAR) mcar_mask = np.random.rand(n_samples) < 0.1 df.loc[mcar_mask, '资产'] = np.nan # 2. 随机缺失 (MAR) - 缺失与观察到的数据相关 # 年龄小于40的人更可能缺失收入数据 mar_mask = (df['年龄'] < 40) & (np.random.rand(n_samples) < 0.3) df.loc[mar_mask, '收入'] = np.nan # 3. 非随机缺失 (MNAR) - 缺失与缺失值本身相关 # 高消费的人可能不愿意报告消费数据 mnar_mask = (df['消费'] > 55) & (np.random.rand(n_samples) < 0.4) df.loc[mnar_mask, '消费'] = np.nan return dfdf = create_complex_missing_data()print("原始数据(前10行):")print(df.head(10))print(f"\n数据形状:{df.shape}")print(f"缺失值统计:")print(df.isnull().sum())print("\n1. 缺失值模式分析:")# 缺失值模式可视化plt.figure(figsize=(12, 6))plt.subplot(1, 2, 1)sns.heatmap(df.isnull(), cbar=False, cmap='binary')plt.title('缺失值模式热图')plt.subplot(1, 2, 2)missing_pattern = df.isnull().astype(int).T.corr()sns.heatmap(missing_pattern, cmap='coolwarm', center=0)plt.title('缺失值相关性热图')plt.tight_layout()plt.show()print("\n2. 缺失值机制分析:")def analyze_missing_mechanisms(df): """分析缺失值机制""" results = {} for column in df.columns: if df[column].isnull().sum() > 0: # MCAR测试:缺失是否与任何变量相关 is_mcar = True p_values = [] for other_col in df.columns: if other_col != column and df[other_col].dtype in [np.float64, np.int64]: # 使用t检验比较缺失组和非缺失组 missing_group = df.loc[df[column].isnull(), other_col] non_missing_group = df.loc[df[column].notnull(), other_col] from scipy.stats import ttest_ind if len(missing_group) > 1 and len(non_missing_group) > 1: t_stat, p_value = ttest_ind(missing_group, non_missing_group, nan_policy='omit') p_values.append(p_value) if p_value < 0.05: is_mcar = False # 判断缺失机制 if is_mcar and len(p_values) > 0: mechanism = 'MCAR (完全随机缺失)' else: # 检查是否为MAR # 简单检查:缺失是否与其他观察变量相关 mechanism = 'MAR (随机缺失)' if not is_mcar else '无法确定' results[column] = { '缺失数量': df[column].isnull().sum(), '缺失比例(%)': (df[column].isnull().sum() / len(df)) * 100, '可能机制': mechanism } return pd.DataFrame(results).Tmechanism_df = analyze_missing_mechanisms(df)print(f"缺失值机制分析:\n{mechanism_df}")print("\n3. 
高级缺失值填充方法:")# 选择数值列进行填充numeric_cols = df.select_dtypes(include=[np.number]).columnsdf_numeric = df[numeric_cols].copy()print(f"数值列:{list(numeric_cols)}")# 方法1:均值/中位数填充(基线方法)print("\n方法1:均值/中位数填充")df_mean = df_numeric.copy()for col in numeric_cols: if df_mean[col].isnull().sum() > 0: df_mean[col].fillna(df_mean[col].mean(), inplace=True)print(f"均值填充后的缺失值数量:{df_mean.isnull().sum().sum()}")# 方法2:KNN填充print("\n方法2:KNN填充")df_knn = df_numeric.copy()knn_imputer = KNNImputer(n_neighbors=5)df_knn_imputed = pd.DataFrame( knn_imputer.fit_transform(df_knn), columns=df_knn.columns)print(f"KNN填充后的缺失值数量:{df_knn_imputed.isnull().sum().sum()}")# 方法3:迭代填充(MICE)print("\n方法3:迭代填充(MICE)")df_mice = df_numeric.copy()mice_imputer = IterativeImputer(max_iter=10, random_state=42)df_mice_imputed = pd.DataFrame( mice_imputer.fit_transform(df_mice), columns=df_mice.columns)print(f"迭代填充后的缺失值数量:{df_mice_imputed.isnull().sum().sum()}")# 方法4:随机森林填充print("\n方法4:随机森林填充")from sklearn.ensemble import RandomForestRegressordf_rf = df_numeric.copy()# 对每个有缺失值的列,用随机森林预测for col in numeric_cols: if df_rf[col].isnull().sum() > 0: # 分割数据 train_data = df_rf[df_rf[col].notnull()] test_data = df_rf[df_rf[col].isnull()] # 准备特征和目标变量 X_train = train_data.drop(columns=[col]) y_train = train_data[col] X_test = test_data.drop(columns=[col]) # 训练随机森林 rf = RandomForestRegressor(n_estimators=100, random_state=42) rf.fit(X_train, y_train) # 预测缺失值 predictions = rf.predict(X_test) # 填充缺失值 df_rf.loc[df_rf[col].isnull(), col] = predictionsprint(f"随机森林填充后的缺失值数量:{df_rf.isnull().sum().sum()}")print("\n4. 填充方法比较:")# 计算原始数据的统计量作为基准original_stats = df_numeric.describe().loc[['mean', 'std']]# 计算各种填充方法后的统计量methods = { '均值填充': df_mean, 'KNN填充': df_knn_imputed, '迭代填充': df_mice_imputed, '随机森林填充': df_rf}comparison_results = {}for method_name, method_df in methods.items(): method_stats = method_df.describe().loc[['mean', 'std']] # 计算与原始数据的差异 mean_diff = ((method_stats.loc['mean'] - original_stats.loc['mean']).abs() / original_stats.loc['mean'].abs()).mean() * 100 std_diff = ((method_stats.loc['std'] - original_stats.loc['std']).abs() / original_stats.loc['std'].abs()).mean() * 100 comparison_results[method_name] = { '均值差异(%)': mean_diff, '标准差差异(%)': std_diff }comparison_df = pd.DataFrame(comparison_results).Tprint(f"填充方法比较(与原数据统计量差异):\n{comparison_df}")print("\n5. 
分类变量缺失值处理:")# 处理分类变量的缺失值categorical_cols = df.select_dtypes(include=['object']).columnsprint(f"分类变量:{list(categorical_cols)}")print(f"分类变量缺失值统计:")print(df[categorical_cols].isnull().sum())# 方法1:众数填充df_cat_mode = df.copy()for col in categorical_cols: if df_cat_mode[col].isnull().sum() > 0: mode_value = df_cat_mode[col].mode()[0] if not df_cat_mode[col].mode().empty else 'Unknown' df_cat_mode[col].fillna(mode_value, inplace=True)print(f"\n众数填充后的缺失值数量:{df_cat_mode[categorical_cols].isnull().sum().sum()}")# 方法2:创建新类别df_cat_newcat = df.copy()for col in categorical_cols: if df_cat_newcat[col].isnull().sum() > 0: df_cat_newcat[col].fillna('缺失', inplace=True)print(f"创建新类别后的缺失值数量:{df_cat_newcat[categorical_cols].isnull().sum().sum()}")# 方法3:基于模型的填充(使用KNN)from sklearn.preprocessing import LabelEncoderdf_cat_knn = df.copy()for col in categorical_cols: if df_cat_knn[col].isnull().sum() > 0: # 编码分类变量 le = LabelEncoder() encoded_col = le.fit_transform(df_cat_knn[col].dropna()) # 找到缺失值的索引 missing_idx = df_cat_knn[col].isnull() # 使用KNN填充 knn_imputer = KNNImputer(n_neighbors=5) # 需要将分类变量与其他数值变量一起考虑 temp_df = df_cat_knn.copy() temp_df[col] = pd.Series(encoded_col, index=df_cat_knn[col].dropna().index) # 填充 imputed = knn_imputer.fit_transform(temp_df.select_dtypes(include=[np.number])) # 解码 df_cat_knn.loc[missing_idx, col] = le.inverse_transform( imputed[missing_idx, list(temp_df.columns).index(col)].astype(int) )print(f"KNN填充分类变量后的缺失值数量:{df_cat_knn[categorical_cols].isnull().sum().sum()}")print("\n6. 时间序列缺失值处理:")# 创建时间序列数据dates = pd.date_range('2023-01-01', periods=100, freq='D')ts_data = pd.DataFrame({ '日期': dates, '销售额': np.random.randn(100).cumsum() + 100, '温度': np.random.normal(25, 5, 100), '是否促销': np.random.choice([0, 1], 100, p=[0.7, 0.3])})# 添加缺失值ts_data.loc[10:20, '销售额'] = np.nants_data.loc[30:35, '温度'] = np.nants_data.loc[50:55, '是否促销'] = np.nanprint("时间序列数据(有缺失):")print(ts_data.isnull().sum())# 时间序列特定的填充方法ts_filled = ts_data.copy()# 前向填充(适合时间序列)ts_filled['销售额'] = ts_filled['销售额'].fillna(method='ffill')# 线性插值(适合连续变化的时间序列)ts_filled['温度'] = ts_filled['温度'].interpolate(method='linear')# 对于分类变量,使用众数填充ts_filled['是否促销'] = ts_filled['是否促销'].fillna(ts_filled['是否促销'].mode()[0])print(f"\n时间序列填充后的缺失值数量:{ts_filled.isnull().sum().sum()}")print("\n7. 多重填补(Multiple Imputation):")# 使用fancyimpute进行多重填补(需要安装:pip install fancyimpute)try: from fancyimpute import IterativeImputer as FancyIterativeImputer # 创建多重填补器 mi_imputer = FancyIterativeImputer(n_imputations=5, sample_posterior=True, random_state=42) # 对数值数据进行多重填补 df_mi = df_numeric.copy() df_mi_imputed = mi_imputer.fit_transform(df_mi.values) # 计算多重填补的不确定性 # 这里我们简单展示:多重填补会生成多个填补数据集 print("多重填补完成") print(f"填补后的数据形状:{df_mi_imputed.shape}")except ImportError: print("需要安装fancyimpute库:pip install fancyimpute")
Choosing a missing-value handling strategy
# missing_value_strategy.pyimport pandas as pdimport numpy as npfrom sklearn.model_selection import train_test_splitfrom sklearn.ensemble import RandomForestClassifierfrom sklearn.metrics import accuracy_scoreimport matplotlib.pyplot as pltprint("=== 缺失值处理策略选择 ===\n")# 创建包含缺失值的分类数据集def create_classification_data_with_missing(): """创建包含缺失值的分类数据集""" np.random.seed(42) n_samples = 1000 # 生成特征 X1 = np.random.normal(0, 1, n_samples) X2 = np.random.normal(0, 1, n_samples) X3 = 0.5 * X1 + 0.5 * X2 + np.random.normal(0, 0.5, n_samples) X4 = np.random.normal(0, 1, n_samples) # 生成目标变量 y = (X1 + X2 + X3 + X4 > 0).astype(int) # 创建DataFrame df = pd.DataFrame({ 'X1': X1, 'X2': X2, 'X3': X3, 'X4': X4, 'target': y }) # 添加不同模式的缺失值 # MCAR mcar_mask = np.random.rand(n_samples) < 0.1 df.loc[mcar_mask, 'X1'] = np.nan # MAR - X2缺失与X1相关 mar_mask = (df['X1'] > 0.5) & (np.random.rand(n_samples) < 0.3) df.loc[mar_mask, 'X2'] = np.nan # MNAR - X3缺失与自身值相关 mnar_mask = (df['X3'] > 0.5) & (np.random.rand(n_samples) < 0.4) df.loc[mnar_mask, 'X3'] = np.nan return dfdf = create_classification_data_with_missing()print(f"数据集形状:{df.shape}")print(f"缺失值统计:")print(df.isnull().sum())print("\n1. 不同缺失值处理策略:")strategies = { '删除法': { '描述': '删除包含缺失值的行', '优点': '简单,不会引入偏差', '缺点': '可能丢失大量数据', '适用场景': '缺失值很少,数据量大' }, '均值/中位数/众数填充': { '描述': '用统计量填充缺失值', '优点': '简单快速', '缺点': '可能扭曲数据分布', '适用场景': '数据缺失率低,且为完全随机缺失' }, '插值法': { '描述': '使用前后值进行插值', '优点': '保持数据趋势', '缺点': '不适合随机缺失', '适用场景': '时间序列数据' }, 'KNN填充': { '描述': '使用K近邻算法填充', '优点': '考虑数据之间的相似性', '缺点': '计算量大,需要选择K值', '适用场景': '数据有相关性,缺失率中等' }, '模型填充': { '描述': '使用机器学习模型预测缺失值', '优点': '准确性高', '缺点': '计算复杂,可能过拟合', '适用场景': '数据量大,特征间有强相关性' }, '多重填补': { '描述': '生成多个填补数据集', '优点': '能评估填补的不确定性', '缺点': '计算复杂', '适用场景': '需要评估填补不确定性的研究' }}print("缺失值处理策略比较:")for strategy, info in strategies.items(): print(f"\n{strategy}:") for key, value in info.items(): print(f" {key}: {value}")print("\n2. 策略选择指南:")def recommend_strategy(missing_rate, data_type, missing_mechanism, data_size): """推荐缺失值处理策略""" recommendations = [] if missing_rate < 0.05: recommendations.append("删除法:缺失率很低,删除影响小") if missing_mechanism == 'MCAR' and missing_rate < 0.3: recommendations.append("均值/众数填充:完全随机缺失,适合统计量填充") if data_type == 'time_series': recommendations.append("插值法:时间序列数据适合插值") if missing_rate > 0.1 and missing_rate < 0.4: recommendations.append("KNN填充:中等缺失率,KNN效果较好") if data_size > 1000 and missing_rate > 0.2: recommendations.append("模型填充:数据量大,模型填充更准确") if missing_rate > 0.3: recommendations.append("考虑是否收集更多数据:缺失率过高") return recommendations# 示例推荐print("示例场景推荐:")scenarios = [ {'missing_rate': 0.02, 'data_type': 'tabular', 'missing_mechanism': 'MCAR', 'data_size': 10000}, {'missing_rate': 0.15, 'data_type': 'time_series', 'missing_mechanism': 'MAR', 'data_size': 500}, {'missing_rate': 0.25, 'data_type': 'tabular', 'missing_mechanism': 'MNAR', 'data_size': 2000}, {'missing_rate': 0.40, 'data_type': 'tabular', 'missing_mechanism': 'MCAR', 'data_size': 5000}]for i, scenario in enumerate(scenarios, 1): print(f"\n场景{i}:") print(f" 缺失率: {scenario['missing_rate']*100}%") print(f" 数据类型: {scenario['data_type']}") print(f" 缺失机制: {scenario['missing_mechanism']}") print(f" 数据量: {scenario['data_size']}") recs = recommend_strategy(**scenario) for rec in recs: print(f" 推荐: {rec}")print("\n3. 实践建议:")practical_tips = [ "始终从简单方法开始(如删除或均值填充)", "比较不同方法对分析结果的影响", "对于重要分析,尝试多种方法并比较结果", "记录使用的填补方法和参数", "考虑业务背景和领域知识", "验证填补结果的合理性"]print("实践建议:")for tip in practical_tips: print(f" • {tip}")
4. Outlier Detection and Handling
Outlier detection methods
# outlier_detection_methods.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler

print("=== 异常值检测方法 ===\n")

# 创建包含不同类型异常值的数据
def create_outlier_data():
    """创建包含异常值的数据"""
    np.random.seed(42)
    # 正常数据
    n_normal = 950
    normal_data = np.random.normal(50, 10, n_normal)
    # 异常值
    n_outliers = 50
    outliers = np.concatenate([
        np.random.uniform(0, 20, n_outliers // 2),   # 低异常值
        np.random.uniform(80, 120, n_outliers // 2)  # 高异常值
    ])
    # 合并数据
    all_data = np.concatenate([normal_data, outliers])
    np.random.shuffle(all_data)
    return pd.DataFrame({'value': all_data})

df = create_outlier_data()
print(f"数据形状:{df.shape}")
print(f"数据描述:\n{df['value'].describe()}")

print("\n1. 可视化异常值检测:")
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

# 直方图
axes[0, 0].hist(df['value'], bins=50, edgecolor='black')
axes[0, 0].set_title('直方图')
axes[0, 0].set_xlabel('值')
axes[0, 0].set_ylabel('频数')

# 箱线图
axes[0, 1].boxplot(df['value'])
axes[0, 1].set_title('箱线图')
axes[0, 1].set_ylabel('值')

# Q-Q图
stats.probplot(df['value'], dist="norm", plot=axes[0, 2])
axes[0, 2].set_title('Q-Q图')

print("\n2. 统计方法检测异常值:")

# 方法1:Z-score方法
def detect_outliers_zscore(data, threshold=3):
    """使用Z-score检测异常值"""
    z_scores = np.abs(stats.zscore(data))
    return z_scores > threshold

zscore_outliers = detect_outliers_zscore(df['value'])
print(f"Z-score方法检测到异常值:{zscore_outliers.sum()}个")

# 方法2:修改的Z-score方法(对异常值更鲁棒)
def detect_outliers_modified_zscore(data, threshold=3.5):
    """使用修改的Z-score检测异常值"""
    median = np.median(data)
    mad = np.median(np.abs(data - median))
    modified_z_scores = 0.6745 * (data - median) / mad if mad != 0 else 0
    return np.abs(modified_z_scores) > threshold

modified_z_outliers = detect_outliers_modified_zscore(df['value'])
print(f"修改的Z-score方法检测到异常值:{modified_z_outliers.sum()}个")

# 方法3:IQR方法
def detect_outliers_iqr(data):
    """使用IQR方法检测异常值"""
    Q1 = np.percentile(data, 25)
    Q3 = np.percentile(data, 75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return (data < lower_bound) | (data > upper_bound)

iqr_outliers = detect_outliers_iqr(df['value'])
print(f"IQR方法检测到异常值:{iqr_outliers.sum()}个")

# 在图上标注异常值
axes[1, 0].hist(df['value'], bins=50, edgecolor='black')
axes[1, 0].scatter(df['value'][zscore_outliers], np.zeros(zscore_outliers.sum()),
                   color='red', label='Z-score异常值')
axes[1, 0].set_title('Z-score异常值检测')
axes[1, 0].set_xlabel('值')
axes[1, 0].set_ylabel('频数')
axes[1, 0].legend()

axes[1, 1].hist(df['value'], bins=50, edgecolor='black')
axes[1, 1].scatter(df['value'][modified_z_outliers], np.zeros(modified_z_outliers.sum()),
                   color='green', label='修改Z-score异常值')
axes[1, 1].set_title('修改Z-score异常值检测')
axes[1, 1].set_xlabel('值')
axes[1, 1].set_ylabel('频数')
axes[1, 1].legend()

axes[1, 2].hist(df['value'], bins=50, edgecolor='black')
axes[1, 2].scatter(df['value'][iqr_outliers], np.zeros(iqr_outliers.sum()),
                   color='blue', label='IQR异常值')
axes[1, 2].set_title('IQR异常值检测')
axes[1, 2].set_xlabel('值')
axes[1, 2].set_ylabel('频数')
axes[1, 2].legend()

plt.tight_layout()
plt.show()

print("\n3. 机器学习方法检测异常值:")
# 准备数据
X = df['value'].values.reshape(-1, 1)
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# 方法1:孤立森林
print("方法1:孤立森林")
iso_forest = IsolationForest(contamination=0.05, random_state=42)
iso_labels = iso_forest.fit_predict(X_scaled)
iso_outliers = iso_labels == -1
print(f"孤立森林检测到异常值:{iso_outliers.sum()}个")

# 方法2:局部异常因子
print("\n方法2:局部异常因子")
lof = LocalOutlierFactor(contamination=0.05)
lof_labels = lof.fit_predict(X_scaled)
lof_outliers = lof_labels == -1
print(f"局部异常因子检测到异常值:{lof_outliers.sum()}个")

# 方法3:One-Class SVM
print("\n方法3:One-Class SVM")
try:
    oc_svm = OneClassSVM(nu=0.05, kernel="rbf", gamma=0.1)
    svm_labels = oc_svm.fit_predict(X_scaled)
    svm_outliers = svm_labels == -1
    print(f"One-Class SVM检测到异常值:{svm_outliers.sum()}个")
except Exception as e:
    print(f"One-Class SVM错误:{e}")

print("\n4. 多维异常值检测:")
# 创建多维数据
def create_multidimensional_data():
    """创建多维数据"""
    np.random.seed(42)
    # 正常数据
    n_normal = 900
    normal_data = np.random.multivariate_normal(
        mean=[0, 0],
        cov=[[1, 0.5], [0.5, 1]],
        size=n_normal
    )
    # 异常值
    n_outliers = 100
    outliers = np.random.uniform(-5, 5, (n_outliers, 2))
    # 合并数据
    all_data = np.vstack([normal_data, outliers])
    return pd.DataFrame(all_data, columns=['X1', 'X2'])

df_multi = create_multidimensional_data()
print(f"多维数据形状:{df_multi.shape}")

# 使用马氏距离检测多维异常值
def detect_outliers_mahalanobis(data, threshold=3):
    """使用马氏距离检测异常值"""
    from scipy.spatial.distance import mahalanobis
    # 计算均值和协方差矩阵
    mean = np.mean(data, axis=0)
    cov = np.cov(data.T)
    # 计算协方差矩阵的逆
    try:
        inv_cov = np.linalg.inv(cov)
    except np.linalg.LinAlgError:
        # 如果矩阵不可逆,使用伪逆
        inv_cov = np.linalg.pinv(cov)
    # 计算马氏距离
    mahalanobis_dist = []
    for i in range(len(data)):
        dist = mahalanobis(data[i], mean, inv_cov)
        mahalanobis_dist.append(dist)
    mahalanobis_dist = np.array(mahalanobis_dist)
    # 检测异常值
    outliers = mahalanobis_dist > threshold
    return outliers, mahalanobis_dist

mahalanobis_outliers, distances = detect_outliers_mahalanobis(df_multi.values)
print(f"马氏距离检测到异常值:{mahalanobis_outliers.sum()}个")

# 可视化多维异常值
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 正常数据点
axes[0].scatter(df_multi['X1'][~mahalanobis_outliers], df_multi['X2'][~mahalanobis_outliers],
                alpha=0.5, label='正常点')
# 异常值
axes[0].scatter(df_multi['X1'][mahalanobis_outliers], df_multi['X2'][mahalanobis_outliers],
                color='red', alpha=0.7, label='异常值')
axes[0].set_xlabel('X1')
axes[0].set_ylabel('X2')
axes[0].set_title('多维异常值检测(马氏距离)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 马氏距离分布
axes[1].hist(distances, bins=30, edgecolor='black')
axes[1].axvline(x=3, color='red', linestyle='--', label='阈值=3')
axes[1].set_xlabel('马氏距离')
axes[1].set_ylabel('频数')
axes[1].set_title('马氏距离分布')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print("\n5. 异常值检测方法比较:")
def compare_outlier_methods(data):
    """比较不同异常值检测方法"""
    results = {}
    # 单维数据
    if data.ndim == 1 or data.shape[1] == 1:
        data_1d = data.flatten() if data.ndim > 1 else data
        # Z-score
        zscore_outliers = detect_outliers_zscore(data_1d)
        results['Z-score'] = zscore_outliers.sum()
        # 修改Z-score
        modified_z_outliers = detect_outliers_modified_zscore(data_1d)
        results['修改Z-score'] = modified_z_outliers.sum()
        # IQR
        iqr_outliers = detect_outliers_iqr(data_1d)
        results['IQR'] = iqr_outliers.sum()
        # 孤立森林
        iso_forest = IsolationForest(contamination=0.05, random_state=42)
        iso_labels = iso_forest.fit_predict(data_1d.reshape(-1, 1))
        results['孤立森林'] = (iso_labels == -1).sum()
        # 局部异常因子
        lof = LocalOutlierFactor(contamination=0.05)
        lof_labels = lof.fit_predict(data_1d.reshape(-1, 1))
        results['局部异常因子'] = (lof_labels == -1).sum()
    # 多维数据
    else:
        # 马氏距离
        mahalanobis_outliers, _ = detect_outliers_mahalanobis(data)
        results['马氏距离'] = mahalanobis_outliers.sum()
        # 孤立森林
        iso_forest = IsolationForest(contamination=0.1, random_state=42)
        iso_labels = iso_forest.fit_predict(data)
        results['孤立森林'] = (iso_labels == -1).sum()
        # 局部异常因子
        lof = LocalOutlierFactor(contamination=0.1, novelty=False)
        lof_labels = lof.fit_predict(data)
        results['局部异常因子'] = (lof_labels == -1).sum()
    return results

print("单维数据异常值检测方法比较:")
single_dim_results = compare_outlier_methods(df['value'].values.reshape(-1, 1))
for method, count in single_dim_results.items():
    print(f"  {method}: {count}个异常值")

print("\n多维数据异常值检测方法比较:")
multi_dim_results = compare_outlier_methods(df_multi.values)
for method, count in multi_dim_results.items():
    print(f"  {method}: {count}个异常值")

print("\n6. 异常值处理策略:")
outlier_strategies = {
    '删除': {
        '描述': '直接删除异常值',
        '适用场景': '异常值数量少,对分析影响大',
        '注意事项': '可能丢失重要信息'
    },
    '替换': {
        '描述': '用统计量(均值、中位数等)替换异常值',
        '适用场景': '异常值数量适中,需要保留数据点',
        '注意事项': '可能扭曲数据分布'
    },
    '转换': {
        '描述': '对数据进行转换(如对数转换)',
        '适用场景': '数据偏斜严重',
        '注意事项': '可能改变数据解释'
    },
    '缩尾': {
        '描述': '将异常值限制在指定分位数',
        '适用场景': '需要保留极端值但限制其影响',
        '注意事项': '仍会改变数据分布'
    },
    '分箱': {
        '描述': '将数据分箱,异常值归入极端箱',
        '适用场景': '需要将连续变量转换为分类变量',
        '注意事项': '可能丢失信息'
    },
    '保留': {
        '描述': '保留异常值,但在分析中特别处理',
        '适用场景': '异常值可能包含重要信息',
        '注意事项': '需要特别的分析方法'
    }
}
print("异常值处理策略:")
for strategy, info in outlier_strategies.items():
    print(f"\n{strategy}:")
    for key, value in info.items():
        print(f"  {key}: {value}")
5. Data Format Standardization
Cleaning text data
# text_data_cleaning.pyimport pandas as pdimport numpy as npimport reimport unicodedatafrom typing import List, Dict, Anyimport stringprint("=== 文本数据清洗 ===\n")# 创建包含各种文本问题的数据def create_dirty_text_data(): """创建包含各种文本问题的数据""" data = { '姓名': [ '张三', '李四', '王五', '赵六', '钱七', 'zhang san', 'LI SI', 'Wang Wu', '赵 六', '钱 七' ], '地址': [ '北京市朝阳区建国路100号', '上海市浦东新区陆家嘴环路500号', '广州市天河区天河路200号', '深圳市南山区科技园南路300号', '成都市武侯区人民南路400号', '北京 朝阳 建国路 100号', '上海,浦东,陆家嘴环路,500号', '广州-天河-天河路-200号', '深圳 南山区 科技园南路 300号', '成都 武侯区 人民南路 400号' ], '邮箱': [ 'zhangsan@example.com', 'lisi@gmail.com', 'wangwu@hotmail.com', 'zhaoliu@example.com', 'qianqi@example.com', 'ZHANGSAN@EXAMPLE.COM', 'LISI@gmail.COM', 'WANGWU@hotmail.com', 'ZHAOLIU@example.com', 'QIANQI@example.com' ], '电话号码': [ '13800138000', '13912345678', '18888888888', '17777777777', '16666666666', '+8613800138000', '008613912345678', '188-8888-8888', '177 7777 7777', '166-6666-6666' ], '日期': [ '2023-01-15', '2023/02/20', '2023.03.25', '2023年04月30日', '2023-5-15', '23-01-15', '2023/2/20', '2023.3.25', '2023年4月30日', '15/01/2023' ], '产品描述': [ '苹果手机iPhone 14 Pro Max 256GB 深空灰色', '华为Mate 50 Pro 512GB 曜金黑', '小米13 Ultra 1TB 陶瓷黑', '三星Galaxy S23 Ultra 256GB 悠雾紫', 'OPPO Find X6 Pro 512GB 云墨黑', 'apple iphone 14 pro max 256gb 深空灰色', 'HUAWEI MATE 50 PRO 512GB 曜金黑', 'XIAOMI 13 ULTRA 1TB 陶瓷黑', 'SAMSUNG GALAXY S23 ULTRA 256GB 悠雾紫', 'oppo find x6 pro 512gb 云墨黑' ], '价格': [ '¥8,999.00', '¥7,999.00', '¥6,999.00', '¥9,999.00', '¥5,999.00', '8999元', '7999元', '6999元', '9999元', '5999元' ] } return pd.DataFrame(data)df = create_dirty_text_data()print("原始文本数据:")print(df)print(f"\n数据形状:{df.shape}")print("\n1. 文本清洗工具类:")class TextCleaner: """文本清洗工具类""" @staticmethod def remove_whitespace(text: str) -> str: """移除多余空白字符""" if not isinstance(text, str): return text # 替换多个空白字符为单个空格 text = re.sub(r'\s+', ' ', text) return text.strip() @staticmethod def normalize_case(text: str, case: str = 'title') -> str: """标准化大小写""" if not isinstance(text, str): return text if case == 'lower': return text.lower() elif case == 'upper': return text.upper() elif case == 'title': return text.title() elif case == 'proper': # 首字母大写,其余小写 return text.capitalize() else: return text @staticmethod def remove_special_chars(text: str, keep_chars: str = '') -> str: """移除特殊字符""" if not isinstance(text, str): return text # 保留字母、数字、中文、空格和指定的字符 pattern = f'[^a-zA-Z0-9\u4e00-\u9fa5\s{re.escape(keep_chars)}]' return re.sub(pattern, '', text) @staticmethod def normalize_unicode(text: str) -> str: """标准化Unicode字符""" if not isinstance(text, str): return text # 标准化Unicode字符 text = unicodedata.normalize('NFKC', text) return text @staticmethod def remove_accents(text: str) -> str: """移除重音符号""" if not isinstance(text, str): return text # 将带重音的字符转换为普通字符 text = unicodedata.normalize('NFD', text) text = ''.join(c for c in text if unicodedata.category(c) != 'Mn') return text @staticmethod def extract_numbers(text: str) -> str: """提取数字""" if not isinstance(text, str): return text numbers = re.findall(r'\d+', text) return ''.join(numbers) if numbers else '' @staticmethod def extract_letters(text: str) -> str: """提取字母""" if not isinstance(text, str): return text letters = re.findall(r'[a-zA-Z]+', text) return ''.join(letters) if letters else '' @staticmethod def extract_chinese(text: str) -> str: """提取中文字符""" if not isinstance(text, str): return text chinese_chars = re.findall(r'[\u4e00-\u9fa5]+', text) return ''.join(chinese_chars) if chinese_chars else '' @staticmethod def standardize_phone(phone: str) -> str: """标准化电话号码""" if not 
isinstance(phone, str): return phone # 移除所有非数字字符 digits = re.sub(r'\D', '', phone) # 处理中国手机号 if digits.startswith('86'): digits = digits[2:] elif digits.startswith('0086'): digits = digits[4:] # 确保是11位手机号 if len(digits) == 11 and digits.startswith('1'): return digits else: return phone # 返回原始值或处理错误 @staticmethod def standardize_email(email: str) -> str: """标准化邮箱""" if not isinstance(email, str): return email # 转换为小写 email = email.lower() # 验证邮箱格式 pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if re.match(pattern, email): return email else: return '' # 返回空字符串表示无效邮箱 @staticmethod def parse_date(date_str: str) -> pd.Timestamp: """解析日期字符串""" if not isinstance(date_str, str): return date_str # 常见日期格式 patterns = [ r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})日?', r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})' ] for pattern in patterns: match = re.match(pattern, date_str) if match: groups = match.groups() if len(groups) == 3: # 尝试解析日期 try: if len(groups[0]) == 4: # YYYY-MM-DD格式 year, month, day = groups else: # DD-MM-YYYY格式 day, month, year = groups return pd.Timestamp(f'{year}-{month.zfill(2)}-{day.zfill(2)}') except: pass # 如果无法解析,返回原始值 return date_str @staticmethod def extract_price(price_str: str) -> float: """提取价格数值""" if not isinstance(price_str, str): return price_str # 提取数字和小数点 matches = re.findall(r'[\d,.]+', price_str) if matches: # 移除千位分隔符 number_str = matches[0].replace(',', '') try: return float(number_str) except: pass return 0.0# 使用TextCleaner清洗数据print("\n2. 应用文本清洗:")# 复制原始数据df_clean = df.copy()# 清洗姓名列df_clean['姓名_清洗'] = df_clean['姓名'].apply( lambda x: TextCleaner.normalize_case( TextCleaner.remove_whitespace(str(x)), 'title' ))# 清洗地址列df_clean['地址_清洗'] = df_clean['地址'].apply( lambda x: TextCleaner.remove_special_chars( TextCleaner.remove_whitespace(str(x)), ',-号' ))# 清洗邮箱列df_clean['邮箱_清洗'] = df_clean['邮箱'].apply(TextCleaner.standardize_email)# 清洗电话号码列df_clean['电话号码_清洗'] = df_clean['电话号码'].apply(TextCleaner.standardize_phone)# 清洗日期列df_clean['日期_清洗'] = df_clean['日期'].apply(TextCleaner.parse_date)# 清洗产品描述列df_clean['产品描述_清洗'] = df_clean['产品描述'].apply( lambda x: TextCleaner.normalize_case(str(x), 'title'))# 清洗价格列df_clean['价格_清洗'] = df_clean['价格'].apply(TextCleaner.extract_price)print("清洗后的数据(前5行):")print(df_clean.head())print("\n3. 文本相似度检测:")from difflib import SequenceMatcherdef calculate_similarity(str1: str, str2: str) -> float: """计算两个字符串的相似度""" return SequenceMatcher(None, str1, str2).ratio()# 检测相似的姓名print("检测相似的姓名:")names = df_clean['姓名_清洗'].tolist()similar_pairs = []for i in range(len(names)): for j in range(i + 1, len(names)): similarity = calculate_similarity(names[i], names[j]) if similarity > 0.7 and similarity < 1.0: similar_pairs.append((names[i], names[j], similarity))for name1, name2, similarity in similar_pairs[:5]: # 显示前5对 print(f" '{name1}' 和 '{name2}' 相似度: {similarity:.2f}")print("\n4. 
文本数据验证:")def validate_text_data(df, column_rules): """验证文本数据""" validation_results = {} for column, rules in column_rules.items(): if column in df.columns: violations = {} for rule_name, rule_func in rules.items(): # 应用验证规则 mask = df[column].apply(rule_func) violations[rule_name] = mask.sum() validation_results[column] = violations return validation_results# 定义验证规则column_rules = { '姓名_清洗': { '是否为空': lambda x: pd.isna(x) or str(x).strip() == '', '是否包含数字': lambda x: bool(re.search(r'\d', str(x))) if pd.notna(x) else False, '长度是否合理': lambda x: len(str(x)) < 2 or len(str(x)) > 20 if pd.notna(x) else False }, '邮箱_清洗': { '是否为空': lambda x: pd.isna(x) or str(x).strip() == '', '格式是否有效': lambda x: bool(re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', str(x))) if pd.notna(x) else False }, '电话号码_清洗': { '是否为空': lambda x: pd.isna(x) or str(x).strip() == '', '长度是否正确': lambda x: len(str(x)) != 11 if pd.notna(x) and str(x).strip() != '' else False, '是否以1开头': lambda x: not str(x).startswith('1') if pd.notna(x) and str(x).strip() != '' else False }}validation_results = validate_text_data(df_clean, column_rules)print("文本数据验证结果:")for column, results in validation_results.items(): print(f"\n{column}:") for rule, count in results.items(): print(f" {rule}: {count}个违规")print("\n5. 文本数据标准化流水线:")from sklearn.pipeline import Pipelinefrom sklearn.base import BaseEstimator, TransformerMixinclass TextStandardizer(BaseEstimator, TransformerMixin): """文本标准化转换器""" def __init__(self, columns=None, operations=None): self.columns = columns self.operations = operations or [ ('remove_whitespace', TextCleaner.remove_whitespace), ('normalize_case', lambda x: TextCleaner.normalize_case(x, 'title')), ('remove_special_chars', lambda x: TextCleaner.remove_special_chars(x, ',-号')) ] def fit(self, X, y=None): return self def transform(self, X): X_transformed = X.copy() if self.columns is None: self.columns = X.select_dtypes(include=['object']).columns for column in self.columns: if column in X_transformed.columns: for op_name, op_func in self.operations: X_transformed[column] = X_transformed[column].apply(op_func) return X_transformed# 创建和运行文本标准化流水线text_columns = ['姓名', '地址', '产品描述']pipeline = Pipeline([ ('text_standardizer', TextStandardizer(columns=text_columns))])df_pipelined = pipeline.fit_transform(df)print("流水线处理后的数据(前5行):")print(df_pipelined[text_columns].head())print("\n6. 高级文本清洗技术:")# 创建包含更复杂文本问题的数据complex_texts = [ "Hello!!! How are you??? I'm fine, thanks.", "Multiple spaces between words", "UPPERCASE and lowercase MIXED", "Special characters: @#$%^&*()", "Unicode characters: café résumé naïve", "HTML tags: <p>Hello</p> <br> World", "URLs: https://example.com and http://test.org", "Email addresses: user@example.com, test@test.org", "Phone numbers: (123) 456-7890, +1-800-555-1234", "Dates: 2023/12/31, 31-12-2023, Dec 31, 2023"]print("原始文本示例:")for text in complex_texts[:3]: print(f" {text}")print("\n清洗后的文本示例:")# 高级清洗函数def advanced_text_clean(text): """高级文本清洗""" if not isinstance(text, str): return text # 1. 移除HTML标签 text = re.sub(r'<[^>]+>', '', text) # 2. 移除URL text = re.sub(r'https?://\S+|www\.\S+', '', text) # 3. 移除邮箱 text = re.sub(r'\S+@\S+', '', text) # 4. 移除电话 text = re.sub(r'\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}', '', text) # 5. 移除特殊字符(保留字母、数字、空格和基本标点) text = re.sub(r'[^\w\s.,!?-]', '', text) # 6. 标准化空白字符 text = re.sub(r'\s+', ' ', text) # 7. 移除多余标点 text = re.sub(r'([.,!?-])\1+', r'\1', text) # 8. 
转换为小写 text = text.lower() return text.strip()for text in complex_texts[:3]: cleaned = advanced_text_clean(text) print(f" {cleaned}")print("\n7. 文本数据质量指标:")def calculate_text_quality_metrics(texts): """计算文本数据质量指标""" metrics = { '平均长度': 0, '空值比例': 0, '特殊字符比例': 0, '大写比例': 0, '数字比例': 0, '质量得分': 0 } if not texts: return metrics total_length = 0 empty_count = 0 special_char_count = 0 upper_case_count = 0 digit_count = 0 total_chars = 0 for text in texts: if not isinstance(text, str): empty_count += 1 continue if text.strip() == '': empty_count += 1 continue total_length += len(text) total_chars += len(text) # 统计特殊字符 special_char_count += len(re.findall(r'[^\w\s]', text)) # 统计大写字母 upper_case_count += len(re.findall(r'[A-Z]', text)) # 统计数字 digit_count += len(re.findall(r'\d', text)) n_texts = len(texts) if n_texts > 0: metrics['平均长度'] = total_length / (n_texts - empty_count) if (n_texts - empty_count) > 0 else 0 metrics['空值比例'] = (empty_count / n_texts) * 100 if total_chars > 0: metrics['特殊字符比例'] = (special_char_count / total_chars) * 100 metrics['大写比例'] = (upper_case_count / total_chars) * 100 metrics['数字比例'] = (digit_count / total_chars) * 100 # 计算质量得分(简单示例) metrics['质量得分'] = 100 - metrics['空值比例'] - metrics['特殊字符比例'] * 0.5 return metrics# 计算示例文本的质量指标sample_texts = df['产品描述'].tolist() + df['地址'].tolist()quality_metrics = calculate_text_quality_metrics(sample_texts)print("文本数据质量指标:")for metric, value in quality_metrics.items(): print(f" {metric}: {value:.2f}")
Cleaning date and time data
# datetime_cleaning.pyimport pandas as pdimport numpy as npfrom datetime import datetime, timedeltaimport dateutil.parserfrom dateutil.relativedelta import relativedeltaimport pytzimport reprint("=== 日期和时间数据清洗 ===\n")# 创建包含各种日期格式问题的数据def create_dirty_date_data(): """创建包含各种日期格式问题的数据""" data = { '订单ID': range(1, 21), '日期字符串': [ '2023-01-15', '2023/02/20', '2023.03.25', '2023年04月30日', '2023-5-15', '23-01-15', '2023/2/20', '2023.3.25', '2023年4月30日', '15/01/2023', 'Jan 15, 2023', 'February 20, 2023', 'March 25th, 2023', 'Apr 30, 2023', 'May 15, 2023', '20230115', '20230220', '20230325', '20230430', '20230515' ], '时间字符串': [ '14:30:00', '14:30', '2:30 PM', '02:30:00 PM', '14.30.00', '14-30-00', '143000', '14时30分00秒', '下午2点30分', '14:30:00.123', '14:30:00.123456', '14:30:00Z', '14:30:00+08:00', '14:30:00-05:00', 'T14:30:00', '14:30:00.000Z', '14:30:00+0000', '14:30:00 GMT', '14:30:00 UTC', '14:30:00 EST' ], '日期时间字符串': [ '2023-01-15 14:30:00', '2023/02/20 14:30', '2023.03.25 2:30 PM', '2023年04月30日 14时30分', '2023-5-15 14:30:00.123', '23-01-15 14:30', '2023/2/20 02:30:00 PM', '2023.3.25 14:30:00Z', '2023年4月30日 下午2点30分', '15/01/2023 14:30:00', 'Jan 15, 2023 14:30:00', 'February 20, 2023 2:30 PM', 'March 25th, 2023 14:30:00.123', 'Apr 30, 2023 14:30:00+08:00', 'May 15, 2023 14:30:00-05:00', '20230115143000', '202302201430', '20230325143000.123', '20230430143000Z', '20230515143000+0800' ] } return pd.DataFrame(data)df = create_dirty_date_data()print("原始日期时间数据:")print(df.head(10))print(f"\n数据形状:{df.shape}")print("\n1. 日期时间解析工具类:")class DateTimeCleaner: """日期时间清洗工具类""" @staticmethod def parse_date(date_str, default=None): """解析日期字符串""" if pd.isna(date_str) or not isinstance(date_str, str): return default try: # 尝试使用dateutil解析(最灵活) return dateutil.parser.parse(date_str) except: pass try: # 尝试常见格式 formats = [ '%Y-%m-%d', '%Y/%m/%d', '%Y.%m.%d', '%d/%m/%Y', '%d-%m-%Y', '%d.%m.%Y', '%Y%m%d', '%Y年%m月%d日', '%b %d, %Y', '%B %d, %Y', '%d %b %Y', '%d %B %Y' ] for fmt in formats: try: return datetime.strptime(date_str, fmt) except: continue except: pass return default @staticmethod def parse_time(time_str, default=None): """解析时间字符串""" if pd.isna(time_str) or not isinstance(time_str, str): return default try: # 尝试使用dateutil解析 return dateutil.parser.parse(time_str).time() except: pass try: # 尝试常见格式 formats = [ '%H:%M:%S', '%H:%M:%S.%f', '%H:%M', '%H.%M.%S', '%H-%M-%S', '%I:%M:%S %p', '%I:%M %p', '%H时%M分%S秒' ] for fmt in formats: try: return datetime.strptime(time_str, fmt).time() except: continue except: pass # 处理纯数字格式(如143000) if re.match(r'^\d{6}$', time_str): try: return datetime.strptime(time_str, '%H%M%S').time() except: pass return default @staticmethod def parse_datetime(dt_str, default=None): """解析日期时间字符串""" if pd.isna(dt_str) or not isinstance(dt_str, str): return default try: # 尝试使用dateutil解析(处理时区) return dateutil.parser.parse(dt_str) except: pass try: # 尝试常见格式 formats = [ '%Y-%m-%d %H:%M:%S', '%Y/%m/%d %H:%M:%S', '%Y.%m.%d %H:%M:%S', '%Y-%m-%d %H:%M', '%Y/%m/%d %H:%M', '%Y.%m.%d %H:%M', '%d/%m/%Y %H:%M:%S', '%d-%m-%Y %H:%M:%S', '%Y%m%d%H%M%S', '%Y%m%d%H%M', '%Y年%m月%d日 %H时%M分%S秒', '%b %d, %Y %H:%M:%S', '%B %d, %Y %H:%M:%S', '%b %d, %Y %I:%M %p', '%B %d, %Y %I:%M %p' ] for fmt in formats: try: return datetime.strptime(dt_str, fmt) except: continue except: pass return default @staticmethod def extract_date_components(dt): """提取日期组件""" if pd.isna(dt): return {} if isinstance(dt, (datetime, pd.Timestamp)): return { 'year': dt.year, 'month': dt.month, 'day': dt.day, 'quarter': (dt.month - 1) // 3 + 1, 
'dayofweek': dt.weekday(), # 0=Monday, 6=Sunday 'dayofyear': dt.timetuple().tm_yday, 'weekofyear': dt.isocalendar()[1], 'is_leap_year': (dt.year % 4 == 0 and dt.year % 100 != 0) or (dt.year % 400 == 0) } elif isinstance(dt, pd.Timestamp): return { 'year': dt.year, 'month': dt.month, 'day': dt.day, 'quarter': dt.quarter, 'dayofweek': dt.dayofweek, 'dayofyear': dt.dayofyear, 'weekofyear': dt.week, 'is_leap_year': dt.is_leap_year } return {} @staticmethod def extract_time_components(t): """提取时间组件""" if pd.isna(t): return {} if isinstance(t, (datetime, pd.Timestamp)): return { 'hour': t.hour, 'minute': t.minute, 'second': t.second, 'microsecond': t.microsecond, 'is_morning': t.hour < 12, 'is_afternoon': 12 <= t.hour < 18, 'is_evening': t.hour >= 18, 'time_of_day': 'morning' if t.hour < 12 else 'afternoon' if t.hour < 18 else 'evening' } elif hasattr(t, 'hour'): return { 'hour': t.hour, 'minute': t.minute, 'second': t.second, 'microsecond': t.microsecond, 'is_morning': t.hour < 12, 'is_afternoon': 12 <= t.hour < 18, 'is_evening': t.hour >= 18, 'time_of_day': 'morning' if t.hour < 12 else 'afternoon' if t.hour < 18 else 'evening' } return {} @staticmethod def handle_timezone(dt, target_tz='UTC'): """处理时区""" if pd.isna(dt): return dt if not isinstance(dt, (datetime, pd.Timestamp)): return dt # 如果datetime是naive(无时区),假设是本地时间 if dt.tzinfo is None: # 假设是本地时间,转换为UTC local_tz = pytz.timezone('Asia/Shanghai') # 示例:上海时区 dt = local_tz.localize(dt) # 转换到目标时区 target_timezone = pytz.timezone(target_tz) return dt.astimezone(target_timezone) @staticmethod def validate_date_range(dt, start_date=None, end_date=None): """验证日期范围""" if pd.isna(dt): return False if start_date and dt < start_date: return False if end_date and dt > end_date: return False return True @staticmethod def fix_invalid_dates(date_str, strategy='closest_valid'): """修复无效日期(如2月30日)""" if pd.isna(date_str) or not isinstance(date_str, str): return date_str try: # 先尝试正常解析 dt = DateTimeCleaner.parse_date(date_str) if dt: return dt except: pass # 提取年月日 patterns = [ r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})', r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})' ] for pattern in patterns: match = re.search(pattern, date_str) if match: groups = match.groups() if len(groups[0]) == 4: # YYYY-MM-DD格式 year, month, day = groups else: # DD-MM-YYYY格式 day, month, year = groups year = int(year) month = int(month) day = int(day) # 修复无效日期 if strategy == 'closest_valid': # 找到最接近的有效日期 try: return datetime(year, month, 1) + relativedelta(day=day) except ValueError as e: # 如果day无效,使用该月的最后一天 last_day = (datetime(year, month, 1) + relativedelta(months=1, days=-1)).day return datetime(year, month, min(day, last_day)) elif strategy == 'last_day': # 使用该月的最后一天 last_day = (datetime(year, month, 1) + relativedelta(months=1, days=-1)).day return datetime(year, month, min(day, last_day)) return date_str # 无法修复,返回原始值# 应用日期时间清洗print("\n2. 
清洗日期时间数据:")df_clean = df.copy()# 解析日期print("解析日期字符串...")df_clean['日期_解析'] = df_clean['日期字符串'].apply(DateTimeCleaner.parse_date)# 解析时间print("解析时间字符串...")df_clean['时间_解析'] = df_clean['时间字符串'].apply(DateTimeCleaner.parse_time)# 解析日期时间print("解析日期时间字符串...")df_clean['日期时间_解析'] = df_clean['日期时间字符串'].apply(DateTimeCleaner.parse_datetime)# 提取日期组件print("提取日期组件...")date_components = df_clean['日期_解析'].apply(DateTimeCleaner.extract_date_components)for component in ['year', 'month', 'day', 'dayofweek']: df_clean[f'日期_{component}'] = date_components.apply(lambda x: x.get(component))# 提取时间组件print("提取时间组件...")time_components = df_clean['时间_解析'].apply(DateTimeCleaner.extract_time_components)for component in ['hour', 'minute', 'second', 'time_of_day']: df_clean[f'时间_{component}'] = time_components.apply(lambda x: x.get(component))print("清洗后的数据(前5行):")print(df_clean[['订单ID', '日期_解析', '时间_解析', '日期时间_解析']].head())print("\n3. 日期时间验证:")def validate_datetime_data(df): """验证日期时间数据""" validation_results = {} # 检查日期范围 start_date = datetime(2023, 1, 1) end_date = datetime(2023, 12, 31) if '日期_解析' in df.columns: date_validation = df['日期_解析'].apply( lambda x: DateTimeCleaner.validate_date_range(x, start_date, end_date) ) validation_results['日期范围有效'] = date_validation.sum() validation_results['日期范围无效'] = (~date_validation).sum() # 检查时间有效性 if '时间_解析' in df.columns: time_not_null = df['时间_解析'].notnull().sum() validation_results['时间有效'] = time_not_null validation_results['时间无效'] = len(df) - time_not_null # 检查未来日期 if '日期_解析' in df.columns: future_dates = df['日期_解析'].apply( lambda x: x > datetime.now() if pd.notnull(x) else False ) validation_results['未来日期'] = future_dates.sum() return validation_resultsvalidation_results = validate_datetime_data(df_clean)print("日期时间验证结果:")for check, count in validation_results.items(): print(f" {check}: {count}个")print("\n4. 处理时区:")# 创建带时区的数据tz_data = pd.DataFrame({ '时间戳': [ '2023-01-15 14:30:00', '2023-01-15 14:30:00+08:00', '2023-01-15 14:30:00-05:00', '2023-01-15 14:30:00Z', '2023-01-15T14:30:00+00:00' ], '时区': [ '无时区', '东八区', '西五区', 'UTC', 'ISO格式' ]})print("时区处理示例:")tz_data['解析'] = tz_data['时间戳'].apply(DateTimeCleaner.parse_datetime)tz_data['UTC时间'] = tz_data['解析'].apply( lambda x: DateTimeCleaner.handle_timezone(x, 'UTC') if pd.notnull(x) else x)tz_data['本地时间'] = tz_data['解析'].apply( lambda x: DateTimeCleaner.handle_timezone(x, 'Asia/Shanghai') if pd.notnull(x) else x)print(tz_data[['时间戳', '时区', '解析', 'UTC时间', '本地时间']])print("\n5. 修复无效日期:")# 创建包含无效日期的数据invalid_dates = pd.DataFrame({ '无效日期': [ '2023-02-30', # 2月没有30日 '2023-04-31', # 4月只有30天 '2023-13-01', # 无效月份 '2023-00-15', # 无效月份 '2023-05-32', # 无效日期 '2023-06-31', # 6月只有30天 '2023-09-31', # 9月只有30天 '2023-11-31', # 11月只有30天 '2020-02-30', # 闰年2月也没有30日 '2023-12-32' # 12月只有31天 ]})print("无效日期示例:")print(invalid_dates)print("\n修复无效日期(使用最接近的有效日期):")invalid_dates['修复后_最近'] = invalid_dates['无效日期'].apply( lambda x: DateTimeCleaner.fix_invalid_dates(x, 'closest_valid'))print("\n修复无效日期(使用当月最后一天):")invalid_dates['修复后_最后一天'] = invalid_dates['无效日期'].apply( lambda x: DateTimeCleaner.fix_invalid_dates(x, 'last_day'))print(invalid_dates)print("\n6. 
日期时间计算:")# 创建包含日期计算的数据calc_data = pd.DataFrame({ '开始日期': pd.date_range('2023-01-01', periods=10, freq='D'), '结束日期': pd.date_range('2023-01-05', periods=10, freq='D')})print("日期计算示例:")calc_data['天数差'] = (calc_data['结束日期'] - calc_data['开始日期']).dt.dayscalc_data['工作日差'] = calc_data.apply( lambda row: np.busday_count(row['开始日期'].date(), row['结束日期'].date()), axis=1)calc_data['月数差'] = calc_data.apply( lambda row: (row['结束日期'].year - row['开始日期'].year) * 12 + (row['结束日期'].month - row['开始日期'].month), axis=1)calc_data['添加天数'] = calc_data['开始日期'] + pd.Timedelta(days=7)calc_data['添加月数'] = calc_data['开始日期'] + pd.DateOffset(months=1)print(calc_data)print("\n7. 处理时间序列缺失值:")# 创建时间序列数据dates = pd.date_range('2023-01-01', periods=20, freq='D')ts_data = pd.DataFrame({ '日期': dates, '值': np.random.randn(20).cumsum() + 100})# 随机删除一些数据点drop_indices = np.random.choice(range(20), size=5, replace=False)ts_data.loc[drop_indices, '值'] = np.nanprint("时间序列数据(有缺失):")print(ts_data)print("\n时间序列填充方法:")# 方法1:前向填充ts_filled_ffill = ts_data.copy()ts_filled_ffill['值_前向填充'] = ts_filled_ffill['值'].fillna(method='ffill')# 方法2:后向填充ts_filled_bfill = ts_data.copy()ts_filled_bfill['值_后向填充'] = ts_filled_bfill['值'].fillna(method='bfill')# 方法3:线性插值ts_filled_interp = ts_data.copy()ts_filled_interp['值_线性插值'] = ts_filled_interp['值'].interpolate(method='linear')# 方法4:时间插值ts_filled_time = ts_data.copy()ts_filled_time['值_时间插值'] = ts_filled_time['值'].interpolate(method='time')print("不同填充方法结果:")results = pd.DataFrame({ '原始值': ts_data['值'], '前向填充': ts_filled_ffill['值_前向填充'], '后向填充': ts_filled_bfill['值_后向填充'], '线性插值': ts_filled_interp['值_线性插值'], '时间插值': ts_filled_time['值_时间插值']})print(results)
Summary
Today's deep dive into data cleaning covered the following core topics:
Key takeaways
Data cleaning overview: why cleaning matters, its guiding principles, and the overall workflow
Data quality assessment: methods and tools for evaluating data quality systematically
Advanced missing-value handling: analyzing complex missingness patterns and comparing multiple imputation methods
Outlier detection and handling: a range of detection methods and treatment strategies
Data format standardization: cleaning techniques for text and date/time data
Core techniques
Data quality assessment framework: completeness, consistency, accuracy, timeliness, uniqueness, validity
Missing values: MCAR/MAR/MNAR analysis and comparison of several imputation methods
Outlier detection: statistical methods (Z-score, IQR) and machine-learning methods (Isolation Forest, LOF)
Text cleaning: regular expressions, Unicode normalization, similarity detection
Date/time cleaning: multi-format parsing, time-zone handling, repairing invalid dates (a consolidated sketch combining these five areas follows this list)
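To tie the five technique groups above together, here is a compact, non-authoritative sketch of a single cleaning pass over a small made-up table. It only uses pandas/NumPy facilities already shown in this lesson; the column names and values are illustrative, and the format='mixed' option assumes pandas 2.0 or newer.

# mini_cleaning_pipeline.py (illustrative sketch)
import pandas as pd
import numpy as np

df = pd.DataFrame({
    'amount': [100.0, 102.5, np.nan, 98.0, 5000.0],                   # one missing value, one extreme value
    'email': ['A@X.com', 'b@y.com ', 'bad-email', 'c@z.com', None],
    'signup': ['2023-01-15', '2023/02/20', '2023.03.25', 'not a date', '2023-05-01'],
})

# 1. Missing values: median imputation for the numeric column
df['amount'] = df['amount'].fillna(df['amount'].median())

# 2. Outliers: flag values outside the 1.5*IQR fences instead of deleting them
q1, q3 = df['amount'].quantile([0.25, 0.75])
iqr = q3 - q1
df['amount_outlier'] = (df['amount'] < q1 - 1.5 * iqr) | (df['amount'] > q3 + 1.5 * iqr)

# 3. Text standardization plus a simple validity flag
df['email'] = df['email'].str.strip().str.lower()
df['email_valid'] = df['email'].str.match(r'^[\w.+-]+@[\w-]+\.[\w.]+$', na=False)

# 4. Dates: coerce mixed formats; unparseable strings become NaT
df['signup'] = pd.to_datetime(df['signup'], errors='coerce', format='mixed')

print(df)

Flagging outliers and invalid emails in extra columns, rather than overwriting or dropping rows, keeps the raw values available for later review, which matches the best practice of preserving the original data.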
Best practices
Practical tips
Further reading
Common data cleaning challenges and solutions
# data_cleaning_challenges.py
import pandas as pd
import numpy as np

print("=== 数据清洗常见挑战和解决方案 ===\n")

challenges_solutions = {
    "大量缺失值": {
        "挑战": "缺失值超过30%,删除会丢失大量数据",
        "解决方案": [
            "使用多重填补方法",
            "考虑是否收集更多数据",
            "使用领域知识进行填补",
            "将缺失作为一个特征"
        ]
    },
    "复杂异常值": {
        "挑战": "异常值模式复杂,难以检测",
        "解决方案": [
            "结合多种检测方法",
            "使用领域知识定义异常",
            "考虑异常值是否包含重要信息",
            "使用鲁棒统计方法"
        ]
    },
    "不一致的格式": {
        "挑战": "同一数据有多种格式",
        "解决方案": [
            "创建标准化规则",
            "使用正则表达式提取",
            "建立数据字典",
            "使用模糊匹配"
        ]
    },
    "大数据清洗": {
        "挑战": "数据量太大,无法在内存中处理",
        "解决方案": [
            "使用分块处理",
            "使用Dask或Spark",
            "使用数据库",
            "抽样清洗"
        ]
    },
    "实时数据清洗": {
        "挑战": "需要实时清洗流数据",
        "解决方案": [
            "使用流处理框架",
            "定义实时清洗规则",
            "使用滑动窗口",
            "定期更新模型"
        ]
    }
}

print("常见挑战和解决方案:")
for challenge, info in challenges_solutions.items():
    print(f"\n{challenge}:")
    print(f"  挑战: {info['挑战']}")
    print("  解决方案:")
    for solution in info['解决方案']:
        print(f"    • {solution}")

print("\n数据清洗检查清单:")
checklist = [
    "□ 了解数据来源和采集过程",
    "□ 评估数据质量状况",
    "□ 处理缺失值",
    "□ 检测和处理异常值",
    "□ 标准化数据格式",
    "□ 处理重复值",
    "□ 验证数据一致性",
    "□ 检查数据完整性",
    "□ 验证清洗结果",
    "□ 记录清洗过程"
]
for item in checklist:
    print(f"  {item}")
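One of the solutions listed above for large datasets is chunked processing. Below is a brief, hedged sketch of what that can look like with pandas' chunksize reader; the file name, the 'amount' column, and the cleaning rules are placeholders for illustration only, not part of this lesson's datasets.

# chunked_cleaning_sketch.py (illustrative; 'big_input.csv' and its columns are hypothetical)
import pandas as pd

cleaned_chunks = []
for chunk in pd.read_csv('big_input.csv', chunksize=100_000):
    chunk = chunk.drop_duplicates()                                     # de-duplicate within the chunk
    chunk['amount'] = pd.to_numeric(chunk['amount'], errors='coerce')   # coerce bad numerics to NaN
    chunk = chunk.dropna(subset=['amount'])                             # drop rows that cannot be repaired
    cleaned_chunks.append(chunk)

cleaned = pd.concat(cleaned_chunks, ignore_index=True)
cleaned.to_csv('big_input_cleaned.csv', index=False)
# Note: duplicates that span chunk boundaries would still need a second pass
# (or a sorted/keyed approach, or a tool such as Dask) to be removed.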
Tomorrow we will turn to data visualization, an essential tool for analysis and communication that helps us uncover patterns, trends, and insights in data.