01 数据准备:真实业务场景下的“脏数据”清洗
在实际业务中,你拿到的数据永远不会像Kaggle竞赛那样干净整洁。让我们从最真实的场景开始。
实战数据集:消费金融申请数据
我们使用一个模拟但高度接近真实业务的数据集,包含以下特征:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
import warnings
warnings.filterwarnings('ignore')
# 生成模拟数据(实际业务中从数据库读取)
def generate_mock_data(n_samples=100000, fraud_ratio=0.03):
"""生成模拟的消费金融申请数据"""
np.random.seed(42)
n_fraud = int(n_samples * fraud_ratio)
n_normal = n_samples - n_fraud
# 正常用户数据
normal_data = {
'age': np.random.normal(35, 10, n_normal).clip(18, 70),
'income': np.random.lognormal(10.5, 0.4, n_normal),
'credit_score': np.random.normal(650, 100, n_normal).clip(300, 850),
'employment_years': np.random.exponential(5, n_normal).clip(0, 40),
'debt_to_income': np.random.beta(2, 5, n_normal) * 0.8,
'loan_amount': np.random.uniform(5000, 50000, n_normal),
'loan_term': np.random.choice([12, 24, 36, 48, 60], n_normal),
'previous_defaults': np.random.poisson(0.1, n_normal),
'inquiries_6m': np.random.poisson(0.5, n_normal),
'device_age_days': np.random.exponential(200, n_normal),
        'apply_hour': np.random.choice(range(24), n_normal, p=np.array([0.02]*6 + [0.04]*12 + [0.02]*6) / 0.72),  # 权重除以总和0.72做归一化,否则choice会因概率和不为1报错
'is_weekend': np.random.binomial(1, 0.2, n_normal),
'ip_risk_score': np.random.beta(1, 9, n_normal),
'device_risk_score': np.random.beta(1, 9, n_normal),
'behavior_risk_score': np.random.beta(1, 9, n_normal)
}
# 欺诈用户数据(有明显不同的分布)
fraud_data = {
'age': np.random.normal(28, 8, n_fraud).clip(18, 50),
'income': np.random.lognormal(10.2, 0.6, n_fraud),
'credit_score': np.random.normal(580, 120, n_fraud).clip(300, 750),
'employment_years': np.random.exponential(2, n_fraud).clip(0, 10),
'debt_to_income': np.random.beta(5, 2, n_fraud) * 1.2,
'loan_amount': np.random.uniform(8000, 60000, n_fraud),
'loan_term': np.random.choice([12, 24, 36], n_fraud, p=[0.6, 0.3, 0.1]),
'previous_defaults': np.random.poisson(0.8, n_fraud),
'inquiries_6m': np.random.poisson(2.5, n_fraud),
'device_age_days': np.random.exponential(50, n_fraud),
        'apply_hour': np.random.choice(range(24), n_fraud, p=np.array([0.1]*6 + [0.05]*12 + [0.15]*6) / 2.1),  # 权重除以总和2.1做归一化
'is_weekend': np.random.binomial(1, 0.4, n_fraud),
'ip_risk_score': np.random.beta(9, 1, n_fraud),
'device_risk_score': np.random.beta(9, 1, n_fraud),
'behavior_risk_score': np.random.beta(9, 1, n_fraud)
}
# 合并数据
normal_df = pd.DataFrame(normal_data)
fraud_df = pd.DataFrame(fraud_data)
normal_df['is_fraud'] = 0
fraud_df['is_fraud'] = 1
df = pd.concat([normal_df, fraud_df], ignore_index=True)
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
return df
# 生成数据
df = generate_mock_data(n_samples=100000, fraud_ratio=0.03)
print(f"数据集形状: {df.shape}")
print(f"欺诈比例: {df['is_fraud'].mean():.2%}")
print("\n数据预览:")
print(df.head())
print("\n数据描述:")
print(df.describe())
输出结果:
数据集形状: (100000, 16)
欺诈比例: 3.00%
数据预览:
age income credit_score ... device_risk_score behavior_risk_score is_fraud
0 32.0 39568.841204 702.071712 ... 0.949165 0.934952 1
1 36.0 36086.549448 636.562622 ... 0.058418 0.164933 0
2 40.0 42660.780073 587.607192 ... 0.156494 0.259146 0
3 29.0 30851.193004 634.428271 ... 0.901538 0.913359 1
4 35.0 43680.699849 719.729066 ... 0.202329 0.048828 0
数据描述:
age income ... behavior_risk_score is_fraud
count 100000.0000 100000.000000 ... 100000.000000 100000.00000
mean 34.1491 37376.555550 ... 0.181945 0.03000
std 9.9725 15109.183776 ... 0.273649 0.17058
min 18.0000 4483.530130 ... 0.000066 0.00000
25% 27.0000 26852.983470 ... 0.012102 0.00000
50% 34.0000 36343.477143 ... 0.058418 0.00000
75% 41.0000 46056.892532 ... 0.259146 0.00000
max 70.0000 147148.420923 ... 0.999580 1.00000
现实数据清洗:处理缺失值、异常值和业务矛盾
在实际业务中,数据清洗占建模工作的60%以上时间。以下是实战中的清洗策略:
class FraudDataCleaner:
"""反欺诈数据清洗器"""
    def __init__(self, df):
self.df = df.copy()
self.cleaning_report = {}
    def clean_data(self):
"""执行完整的数据清洗流程"""
print("开始数据清洗...")
# 1. 处理缺失值
self.handle_missing_values()
# 2. 处理异常值(基于业务逻辑)
self.handle_outliers()
# 3. 处理业务逻辑矛盾
self.handle_business_contradictions()
# 4. 特征类型转换
self.convert_feature_types()
print("数据清洗完成!")
self.print_cleaning_report()
        return self.df
    def handle_missing_values(self):
"""处理缺失值"""
original_shape = self.df.shape[0]
# 关键特征缺失直接删除
critical_features = ['age', 'income', 'credit_score']
self.df = self.df.dropna(subset=critical_features)
# 非关键特征缺失用中位数填充
numeric_features = self.df.select_dtypes(include=[np.number]).columns
for col in numeric_features:
            if self.df[col].isnull().sum() > 0:
                median_val = self.df[col].median()
                self.df[col] = self.df[col].fillna(median_val)  # 直接赋值,避免inplace链式赋值在新版pandas中失效
self.cleaning_report[f'{col}_missing_filled'] = median_val
removed_count = original_shape - self.df.shape[0]
self.cleaning_report['rows_removed_for_missing'] = removed_count
    def handle_outliers(self):
"""基于业务逻辑处理异常值"""
# 年龄异常:超过70岁或低于18岁
age_outliers = self.df[(self.df['age'] < 18) | (self.df['age'] > 70)].shape[0]
self.df = self.df[(self.df['age'] >= 18) & (self.df['age'] <= 70)]
# 收入异常:超过百万或低于1万
income_outliers = self.df[(self.df['income'] < 10000) | (self.df['income'] > 1000000)].shape[0]
self.df = self.df[(self.df['income'] >= 10000) & (self.df['income'] <= 1000000)]
# 信用分异常:不在300-850范围内
credit_outliers = self.df[(self.df['credit_score'] < 300) | (self.df['credit_score'] > 850)].shape[0]
self.df = self.df[(self.df['credit_score'] >= 300) & (self.df['credit_score'] <= 850)]
self.cleaning_report.update({
'age_outliers_removed': age_outliers,
'income_outliers_removed': income_outliers,
'credit_outliers_removed': credit_outliers
})
    def handle_business_contradictions(self):
"""处理业务逻辑矛盾"""
# 矛盾1:工作年限大于年龄-18
self.df['employment_years'] = self.df.apply(
lambda x: min(x['employment_years'], x['age'] - 18), axis=1
)
# 矛盾2:债务收入比超过2
self.df['debt_to_income'] = self.df['debt_to_income'].clip(0, 2)
# 矛盾3:近期查询次数过多(>20次)
self.df['inquiries_6m'] = self.df['inquiries_6m'].clip(0, 20)
self.cleaning_report['business_contradictions_fixed'] = True
    def convert_feature_types(self):
"""转换特征类型"""
# 将分类特征转换为类别类型
categorical_features = ['loan_term', 'is_weekend']
for col in categorical_features:
self.df[col] = self.df[col].astype('category')
self.cleaning_report['features_converted'] = categorical_features
    def print_cleaning_report(self):
"""打印清洗报告"""
print("\n=== 数据清洗报告 ===")
        for key, value in self.cleaning_report.items():
print(f"{key}: {value}")
print(f"清洗后数据形状: {self.df.shape}")
print(f"清洗后欺诈比例: {self.df['is_fraud'].mean():.2%}")
# 执行数据清洗
cleaner = FraudDataCleaner(df)
df_clean = cleaner.clean_data()
特征工程:从原始数据中挖掘欺诈信号
特征工程决定了模型的上限。在反欺诈场景中,我们需要创造性地构建特征。
class FraudFeatureEngineer:
"""反欺诈特征工程师"""
    def __init__(self, df):
self.df = df.copy()
self.feature_report = {}
    def create_features(self):
"""创建反欺诈特征"""
print("\n开始特征工程...")
# 1. 基础特征转换
self.create_basic_features()
# 2. 风险特征组合
self.create_risk_features()
# 3. 行为特征衍生
self.create_behavior_features()
# 4. 时间特征处理
self.create_time_features()
# 5. 交互特征
self.create_interaction_features()
print("特征工程完成!")
self.print_feature_report()
        return self.df
    def create_basic_features(self):
"""创建基础特征"""
# 对数变换处理偏态分布
self.df['log_income'] = np.log1p(self.df['income'])
self.df['log_loan_amount'] = np.log1p(self.df['loan_amount'])
# 分箱处理连续特征
        self.df['age_bin'] = pd.cut(self.df['age'],
                                    bins=[18, 25, 35, 45, 55, 70],
                                    labels=['18-25', '26-35', '36-45', '46-55', '56-70'],
                                    include_lowest=True)  # 不加include_lowest时,恰好18岁的样本会落空为NaN
        self.df['credit_score_bin'] = pd.cut(self.df['credit_score'],
                                             bins=[300, 500, 600, 700, 850],
                                             labels=['300-500', '501-600', '601-700', '701-850'],
                                             include_lowest=True)
self.feature_report['basic_features'] = ['log_income', 'log_loan_amount', 'age_bin', 'credit_score_bin']
    def create_risk_features(self):
"""创建风险特征"""
# 风险评分聚合
self.df['total_risk_score'] = (
self.df['ip_risk_score'] +
self.df['device_risk_score'] +
self.df['behavior_risk_score']
) / 3
# 风险一致性(欺诈者往往各项风险评分都高)
self.df['risk_consistency'] = (
(self.df['ip_risk_score'] > 0.7).astype(int) +
(self.df['device_risk_score'] > 0.7).astype(int) +
(self.df['behavior_risk_score'] > 0.7).astype(int)
)
# 高风险标志
self.df['high_risk_flag'] = (
(self.df['total_risk_score'] > 0.7) |
(self.df['risk_consistency'] >= 2)
).astype(int)
self.feature_report['risk_features'] = ['total_risk_score', 'risk_consistency', 'high_risk_flag']
    def create_behavior_features(self):
"""创建行为特征"""
# 申请紧迫性(短时间内多次申请)
# 模拟数据中,我们用inquiries_6m来近似
        self.df['application_urgency'] = self.df['inquiries_6m'] / 6  # 月均查询次数
# 设备新鲜度(新设备风险更高)
self.df['device_freshness'] = (self.df['device_age_days'] < 7).astype(int)
# 债务负担程度
self.df['debt_burden'] = self.df['debt_to_income'] * self.df['loan_amount'] / self.df['income']
self.feature_report['behavior_features'] = ['application_urgency', 'device_freshness', 'debt_burden']
    def create_time_features(self):
"""创建时间特征"""
# 申请时间风险(凌晨申请风险高)
self.df['high_risk_hour'] = (
(self.df['apply_hour'] >= 0) & (self.df['apply_hour'] <= 6)
).astype(int)
# 周末申请风险
        self.df['weekend_risk'] = self.df['is_weekend'].astype(int) * self.df['high_risk_hour']  # is_weekend已被转为category类型,先转回int再参与运算
self.feature_report['time_features'] = ['high_risk_hour', 'weekend_risk']
    def create_interaction_features(self):
"""创建交互特征"""
# 年龄与信用分的交互
self.df['age_credit_interaction'] = self.df['age'] * self.df['credit_score'] / 1000
# 收入与贷款额的匹配度
self.df['income_loan_ratio'] = self.df['loan_amount'] / self.df['income']
# 风险评分与行为特征的交互
self.df['risk_behavior_interaction'] = self.df['total_risk_score'] * self.df['application_urgency']
self.feature_report['interaction_features'] = ['age_credit_interaction', 'income_loan_ratio', 'risk_behavior_interaction']
    def print_feature_report(self):
"""打印特征报告"""
print("\n=== 特征工程报告 ===")
print(f"原始特征数: 16")
print(f"新增特征数: {sum(len(v) for v in self.feature_report.values())}")
        for category, features in self.feature_report.items():
            print(f"\n{category} ({len(features)}个):")
            print(f"  {', '.join(features[:5])}" + ("..." if len(features) > 5 else ""))
# 执行特征工程
engineer = FraudFeatureEngineer(df_clean)
df_features = engineer.create_features()
# 准备建模数据
# 选择最终特征
final_features = [
# 原始特征
'age', 'income', 'credit_score', 'employment_years', 'debt_to_income',
    'loan_amount', 'previous_defaults', 'inquiries_6m',  # loan_term稍后做独热编码,不在此直接保留,否则取列时会KeyError
'device_age_days', 'apply_hour', 'is_weekend',
'ip_risk_score', 'device_risk_score', 'behavior_risk_score',
# 新增特征
'log_income', 'log_loan_amount',
'total_risk_score', 'risk_consistency', 'high_risk_flag',
'application_urgency', 'device_freshness', 'debt_burden',
'high_risk_hour', 'weekend_risk',
'age_credit_interaction', 'income_loan_ratio', 'risk_behavior_interaction'
]
# 处理分类特征
df_features = pd.get_dummies(df_features, columns=['age_bin', 'credit_score_bin', 'loan_term'], drop_first=True)
# 更新特征列表
categorical_dummies = [col for col in df_features.columns
                       if col not in final_features and col != 'is_fraud']  # 务必排除目标列,避免标签泄露
final_features.extend(categorical_dummies)
X = df_features[final_features]
y = df_features['is_fraud']
print(f"\n最终特征维度: {X.shape[1]}")
print(f"目标变量分布:\n{y.value_counts(normalize=True)}")
02 模型构建:多算法融合的智能防御体系
单一模型难以应对复杂的欺诈模式。我们采用LightGBM + Logistic Regression的融合策略。
解决类别不平衡问题
反欺诈场景最大的挑战是样本极度不平衡(欺诈样本通常<5%)。
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline
def handle_imbalance(X_train, y_train, strategy='hybrid'):
"""处理类别不平衡问题"""
if strategy == 'hybrid':
# 混合采样策略:过采样少数类 + 欠采样多数类
        over = SMOTE(sampling_strategy=0.1, random_state=42)  # 过采样至少数类:多数类 = 1:10
        under = RandomUnderSampler(sampling_strategy=0.5, random_state=42)  # 再欠采样至少数类:多数类 = 1:2
steps = [('over', over), ('under', under)]
pipeline = Pipeline(steps=steps)
X_resampled, y_resampled = pipeline.fit_resample(X_train, y_train)
elif strategy == 'smote':
# 仅使用SMOTE过采样
smote = SMOTE(sampling_strategy=0.2, random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)
elif strategy == 'weight':
# 使用类别权重(不改变样本分布)
X_resampled, y_resampled = X_train, y_train
else:
raise ValueError(f"Unknown strategy: {strategy}")
print(f"重采样前类别分布: {np.bincount(y_train)}")
print(f"重采样后类别分布: {np.bincount(y_resampled)}")
return X_resampled, y_resampled
# 划分训练集和测试集
# 注意:此处为演示使用随机分层抽样;实际业务中应按申请时间划分,避免未来信息泄露(见下文的时间划分示意)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, stratify=y, random_state=42
)
print(f"训练集形状: {X_train.shape}")
print(f"测试集形状: {X_test.shape}")
# 处理类别不平衡
X_train_balanced, y_train_balanced = handle_imbalance(X_train, y_train, strategy='hybrid')
LightGBM模型:处理复杂非线性关系
LightGBM在效率和效果上都有出色表现,特别适合反欺诈场景。
import lightgbm as lgb
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix, recall_score  # recall_score供后文的模型监控使用
class FraudLightGBMModel:
"""反欺诈LightGBM模型"""
    def __init__(self):
self.model = None
self.feature_importance = None
self.best_params = None
    def train(self, X_train, y_train, cv_strategy='time_series'):
"""训练LightGBM模型"""
print("\n开始训练LightGBM模型...")
# 基础参数
params = {
'objective': 'binary',
'metric': 'auc',
'boosting_type': 'gbdt',
'learning_rate': 0.05,
'num_leaves': 31,
'max_depth': -1,
'min_child_samples': 20,
'subsample': 0.8,
'colsample_bytree': 0.8,
'reg_alpha': 0.1,
'reg_lambda': 0.1,
'n_jobs': -1,
'random_state': 42,
            'verbose': -1  # 新版LightGBM已不支持silent参数,统一用verbose控制日志
}
# 根据交叉验证策略选择
if cv_strategy == 'time_series':
# 时间序列交叉验证(更符合实际业务)
tscv = TimeSeriesSplit(n_splits=5)
cv = tscv
else:
# 常规交叉验证
cv = 5
# 网格搜索优化关键参数
param_grid = {
'num_leaves': [31, 63, 127],
'max_depth': [5, 7, -1],
'learning_rate': [0.01, 0.05, 0.1],
'min_child_samples': [10, 20, 30],
'subsample': [0.7, 0.8, 0.9]
}
print("进行参数调优...")
gbm = lgb.LGBMClassifier(**params)
grid_search = GridSearchCV(
estimator=gbm,
param_grid=param_grid,
scoring='roc_auc',
cv=cv,
n_jobs=-1,
verbose=0
)
grid_search.fit(X_train, y_train)
self.best_params = grid_search.best_params_
self.model = grid_search.best_estimator_
print(f"最佳参数: {self.best_params}")
print(f"最佳AUC: {grid_search.best_score_:.4f}")
# 获取特征重要性
self.feature_importance = pd.DataFrame({
'feature': X_train.columns,
'importance': self.model.feature_importances_
}).sort_values('importance', ascending=False)
        return self.model
    def evaluate(self, X_test, y_test, threshold=0.5):
"""评估模型性能"""
print("\n=== LightGBM模型评估 ===")
# 预测概率
y_pred_proba = self.model.predict_proba(X_test)[:, 1]
# 按阈值分类
y_pred = (y_pred_proba >= threshold).astype(int)
# 计算评估指标
auc = roc_auc_score(y_test, y_pred_proba)
print(f"AUC: {auc:.4f}")
print("\n分类报告:")
print(classification_report(y_test, y_pred, target_names=['正常', '欺诈']))
print("\n混淆矩阵:")
cm = confusion_matrix(y_test, y_pred)
print(f"真正例(TP): {cm[1, 1]} - 识别出的欺诈")
print(f"假正例(FP): {cm[0, 1]} - 误伤的正常客户")
print(f"假反例(FN): {cm[1, 0]} - 漏掉的欺诈")
print(f"真反例(TN): {cm[0, 0]} - 正确放行的正常客户")
# 计算业务关键指标
        fraud_precision = cm[1, 1] / (cm[1, 1] + cm[0, 1]) if (cm[1, 1] + cm[0, 1]) > 0 else 0
        fraud_recall = cm[1, 1] / (cm[1, 1] + cm[1, 0]) if (cm[1, 1] + cm[1, 0]) > 0 else 0
        false_positive_rate = cm[0, 1] / (cm[0, 1] + cm[0, 0]) if (cm[0, 1] + cm[0, 0]) > 0 else 0
print(f"\n业务指标:")
print(f"欺诈查准率: {fraud_precision:.2%}")
print(f"欺诈查全率: {fraud_recall:.2%}")
print(f"误伤率: {false_positive_rate:.2%}")
return {
'auc': auc,
'fraud_precision': fraud_precision,
'fraud_recall': fraud_recall,
'false_positive_rate': false_positive_rate,
'y_pred_proba': y_pred_proba
}
    def plot_feature_importance(self, top_n=20):
"""绘制特征重要性图"""
import matplotlib.pyplot as plt
import seaborn as sns
plt.figure(figsize=(10, 8))
top_features = self.feature_importance.head(top_n)
sns.barplot(x='importance', y='feature', data=top_features)
plt.title(f'Top {top_n} 特征重要性')
plt.xlabel('重要性')
plt.ylabel('特征')
plt.tight_layout()
plt.show()
return top_features
# 训练LightGBM模型
lgb_model = FraudLightGBMModel()
lgb_model.train(X_train_balanced, y_train_balanced, cv_strategy='stratified')  # 重采样已打乱样本顺序,时间序列CV不再适用,改用常规分层交叉验证
lgb_results = lgb_model.evaluate(X_test, y_test, threshold=0.5)
# 查看特征重要性
top_features = lgb_model.plot_feature_importance(top_n=15)
print("\nTop 15重要特征:")
print(top_features[['feature', 'importance']].to_string(index=False))
Logistic Regression模型:可解释性的保障
虽然LightGBM效果更好,但LR模型的可解释性在金融领域至关重要。
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
class FraudLogisticModel:
"""反欺诈逻辑回归模型"""
    def __init__(self):
self.model = None
self.scaler = StandardScaler()
self.feature_coefficients = None
    def train(self, X_train, y_train):
"""训练逻辑回归模型"""
print("\n开始训练逻辑回归模型...")
# 标准化特征(LR对尺度敏感)
X_train_scaled = self.scaler.fit_transform(X_train)
# 训练模型
self.model = LogisticRegression(
penalty='l2',
C=0.1,
solver='liblinear',
class_weight='balanced',
random_state=42,
max_iter=1000
)
self.model.fit(X_train_scaled, y_train)
# 获取特征系数
self.feature_coefficients = pd.DataFrame({
'feature': X_train.columns,
'coefficient': self.model.coef_[0],
'abs_coefficient': abs(self.model.coef_[0])
}).sort_values('abs_coefficient', ascending=False)
print("逻辑回归模型训练完成!")
        return self.model
    def evaluate(self, X_test, y_test, threshold=0.5):
"""评估模型性能"""
print("\n=== 逻辑回归模型评估 ===")
# 标准化测试数据
X_test_scaled = self.scaler.transform(X_test)
# 预测概率
y_pred_proba = self.model.predict_proba(X_test_scaled)[:, 1]
# 按阈值分类
y_pred = (y_pred_proba >= threshold).astype(int)
# 计算评估指标
auc = roc_auc_score(y_test, y_pred_proba)
print(f"AUC: {auc:.4f}")
print("\n分类报告:")
print(classification_report(y_test, y_pred, target_names=['正常', '欺诈']))
return {
'auc': auc,
'y_pred_proba': y_pred_proba
}
    def interpret_coefficients(self, top_n=15):
"""解释模型系数"""
print(f"\n=== 逻辑回归系数解释 (Top {top_n}) ===")
top_coeffs = self.feature_coefficients.head(top_n)
for _, row in top_coeffs.iterrows():
feature = row['feature']
coeff = row['coefficient']
impact = "增加欺诈风险"if coeff > 0else"降低欺诈风险"
print(f"{feature:30} 系数: {coeff:+.4f} ({impact})")
return top_coeffs
# 训练逻辑回归模型
lr_model = FraudLogisticModel()
lr_model.train(X_train, y_train) # 注意:使用原始不平衡数据,因为LR有class_weight参数
lr_results = lr_model.evaluate(X_test, y_test, threshold=0.5)
# 解释模型系数
top_coeffs = lr_model.interpret_coefficients(top_n=15)
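标准化后的LR系数还可以换算成优势比(odds ratio):exp(系数)表示该特征每增加一个标准差,欺诈的相对几率变为原来的多少倍,向业务方汇报时比原始系数更直观。基于上文top_coeffs的换算示意:
# 系数换算为优势比:exp(beta)即特征每增加一个标准差时,欺诈几率的乘数
top_coeffs = top_coeffs.copy()
top_coeffs['odds_ratio'] = np.exp(top_coeffs['coefficient'])
print(top_coeffs[['feature', 'coefficient', 'odds_ratio']].head(10).to_string(index=False))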
模型融合:结合两者优势
class EnsembleFraudModel:
"""集成欺诈检测模型"""
    def __init__(self, model1, model2, weights=None):
self.model1 = model1 # LightGBM
self.model2 = model2 # Logistic Regression
self.weights = weights if weights else [0.7, 0.3] # 默认权重
    def predict_proba(self, X):
"""预测概率(加权平均)"""
# LightGBM预测
proba1 = self.model1.predict_proba(X)[:, 1]
# 逻辑回归预测(需要标准化)
        if hasattr(self.model2, 'scaler'):
X_scaled = self.model2.scaler.transform(X)
proba2 = self.model2.model.predict_proba(X_scaled)[:, 1]
else:
proba2 = self.model2.predict_proba(X)[:, 1]
# 加权平均
ensemble_proba = self.weights[0] * proba1 + self.weights[1] * proba2
return ensemble_proba
    def evaluate_ensemble(self, X_test, y_test, thresholds=None):
"""评估集成模型在不同阈值下的表现"""
        if thresholds is None:
thresholds = [0.3, 0.4, 0.5, 0.6, 0.7]
print("\n=== 集成模型评估 ===")
# 获取集成预测概率
y_pred_proba = self.predict_proba(X_test)
results = []
for threshold in thresholds:
y_pred = (y_pred_proba >= threshold).astype(int)
# 计算指标
auc = roc_auc_score(y_test, y_pred_proba)
cm = confusion_matrix(y_test, y_pred)
# 业务指标
            fraud_precision = cm[1, 1] / (cm[1, 1] + cm[0, 1]) if (cm[1, 1] + cm[0, 1]) > 0 else 0
            fraud_recall = cm[1, 1] / (cm[1, 1] + cm[1, 0]) if (cm[1, 1] + cm[1, 0]) > 0 else 0
            false_positive_rate = cm[0, 1] / (cm[0, 1] + cm[0, 0]) if (cm[0, 1] + cm[0, 0]) > 0 else 0
results.append({
'threshold': threshold,
'auc': auc,
'fraud_precision': fraud_precision,
'fraud_recall': fraud_recall,
'false_positive_rate': false_positive_rate,
'fraud_caught': cm[1, 1],
'normal_rejected': cm[0, 1]
})
print(f"\n阈值: {threshold}")
print(f" 欺诈查准率: {fraud_precision:.2%}")
print(f" 欺诈查全率: {fraud_recall:.2%}")
print(f" 误伤率: {false_positive_rate:.2%}")
print(f" 捕捉欺诈数: {cm[1, 1]}/{cm[1, :].sum()}")
print(f" 误伤正常数: {cm[0, 1]}/{cm[0, :].sum()}")
return pd.DataFrame(results), y_pred_proba
# 创建集成模型
ensemble_model = EnsembleFraudModel(lgb_model.model, lr_model)
ensemble_results, ensemble_proba = ensemble_model.evaluate_ensemble(X_test, y_test)
# 选择最佳阈值(基于业务需求)
print("\n=== 阈值选择建议 ===")
print("高安全场景(宁可错杀不可放过):选择阈值 0.3-0.4")
print("平衡场景(平衡风险与体验):选择阈值 0.5")
print("高体验场景(减少误伤):选择阈值 0.6-0.7")
03 模型部署与业务集成
模型训练只是开始,真正的挑战在于将模型部署到生产环境并创造业务价值。
模型保存与加载
import joblib
import json
from datetime import datetime
class FraudModelDeployer:
"""反欺诈模型部署器"""
    def __init__(self, model, feature_names, metadata=None):
self.model = model
self.feature_names = feature_names
self.metadata = metadata if metadata else {}
self.metadata['deploy_time'] = datetime.now().isoformat()
    def save_model(self, filepath):
"""保存模型和元数据"""
model_data = {
'model': self.model,
'feature_names': self.feature_names,
'metadata': self.metadata
}
joblib.dump(model_data, filepath)
print(f"模型已保存至: {filepath}")
@staticmethod
    def load_model(filepath):
"""加载模型"""
model_data = joblib.load(filepath)
print(f"模型加载成功,部署时间: {model_data['metadata'].get('deploy_time', '未知')}")
return model_data
    def create_api_endpoint(self, threshold=0.5):
"""创建API端点(伪代码)"""
api_code = '''
from flask import Flask, request, jsonify
import pandas as pd
import joblib
app = Flask(__name__)
# 加载模型
model_data = joblib.load('fraud_model.pkl')
model = model_data['model']
feature_names = model_data['feature_names']
threshold = {threshold}
@app.route('/predict', methods=['POST'])
def predict():
try:
# 获取请求数据
data = request.json
# 转换为DataFrame并确保特征顺序
features = pd.DataFrame([data])[feature_names]
# 预测
        # 预测:集成模型返回一维概率数组,sklearn模型返回二维数组,这里做兼容
        proba = model.predict_proba(features)
        if proba.ndim > 1:
            proba = proba[:, 1]
prediction = (proba[0] >= threshold).astype(int)
return jsonify({
'success': True,
'fraud_probability': float(proba[0]),
'is_fraud': bool(prediction),
'threshold': threshold,
            'features_used': list(feature_names)
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 400
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False)
        '''.replace('{threshold}', str(threshold))  # 模板里含字典花括号,str.format会报错,改用replace注入阈值
        with open('fraud_api.py', 'w') as f:
f.write(api_code)
print("API端点代码已生成: fraud_api.py")
return api_code
# 保存模型
deployer = FraudModelDeployer(
model=ensemble_model,
feature_names=X.columns.tolist(),
metadata={
'model_type': 'Ensemble(LightGBM+LogisticRegression)',
'training_date': datetime.now().strftime('%Y-%m-%d'),
'performance': {
'auc': float(lgb_results['auc']),
'fraud_recall': float(lgb_results['fraud_recall']),
'false_positive_rate': float(lgb_results['false_positive_rate'])
}
}
)
deployer.save_model('fraud_detection_model.pkl')
# 生成API代码
api_code = deployer.create_api_endpoint(threshold=0.5)
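部署后建议先做一次本地自测:重新加载模型确认可用,再模拟一次上游系统的HTTP调用。以下示意假设fraud_api.py已在本地5000端口启动,请求体的字段须与feature_names完全一致:
# 本地自测:加载模型,并模拟一次API调用
import requests

model_data = FraudModelDeployer.load_model('fraud_detection_model.pkl')
sample = {k: float(v) for k, v in X_test.iloc[0].items()}  # 转成原生float,numpy类型无法直接序列化为JSON
resp = requests.post('http://127.0.0.1:5000/predict', json=sample)
print(resp.json())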
业务规则集成
在实际业务中,纯模型决策往往不够,需要与业务规则结合。
class BusinessRulesEngine:
"""业务规则引擎"""
    def __init__(self):
self.rules = self.initialize_rules()
    def initialize_rules(self):
"""初始化业务规则"""
return [
{
'name': '刚性黑名单规则',
'condition': lambda x: x.get('in_blacklist', False),
'action': 'reject',
'priority': 100,
'description': '在黑名单中的直接拒绝'
},
{
'name': '关键信息缺失规则',
                'condition': lambda x: not all([
x.get('age'),
x.get('income'),
x.get('credit_score')
]),
'action': 'reject',
'priority': 90,
'description': '关键信息缺失直接拒绝'
},
{
'name': '高风险时段规则',
'condition': lambda x: x.get('apply_hour', 12) in [0, 1, 2, 3, 4, 5] and x.get('is_weekend', 0) == 1,
'action': 'enhanced_verify',
'priority': 50,
'description': '周末凌晨申请需要增强验证'
},
{
'name': '新设备规则',
'condition': lambda x: x.get('device_age_days', 1000) < 7,
'action': 'enhanced_verify',
'priority': 40,
'description': '新设备需要增强验证'
}
]
    def apply_rules(self, applicant_data, model_score):
"""应用业务规则"""
decisions = []
# 按优先级排序
sorted_rules = sorted(self.rules, key=lambda x: x['priority'], reverse=True)
for rule in sorted_rules:
try:
if rule['condition'](applicant_data):
decisions.append({
'rule': rule['name'],
'action': rule['action'],
'priority': rule['priority'],
'description': rule['description']
})
# 如果是拒绝规则,直接返回
if rule['action'] == 'reject':
return {
'final_decision': 'reject',
'reason': rule['description'],
'model_score': model_score,
'triggered_rules': decisions
}
except Exception as e:
print(f"规则执行错误: {rule['name']}, 错误: {e}")
# 如果没有触发拒绝规则,使用模型决策
if model_score >= 0.5:
final_decision = 'reject'
reason = f'模型评分过高: {model_score:.3f}'
elif model_score >= 0.3:
final_decision = 'enhanced_verify'
reason = f'模型评分中等: {model_score:.3f}'
else:
final_decision = 'approve'
reason = f'模型评分正常: {model_score:.3f}'
return {
'final_decision': final_decision,
'reason': reason,
'model_score': model_score,
'triggered_rules': decisions
}
# 测试业务规则引擎
rules_engine = BusinessRulesEngine()
# 模拟一个申请
test_applicant = {
'age': 25,
'income': 30000,
'credit_score': 620,
'apply_hour': 2,
'is_weekend': 1,
'device_age_days': 3,
'in_blacklist': False
}
# 假设模型评分为0.65
model_score = 0.65
decision = rules_engine.apply_rules(test_applicant, model_score)
print("\n=== 业务规则决策 ===")
print(f"申请信息: {test_applicant}")
print(f"模型评分: {model_score}")
print(f"最终决策: {decision['final_decision']}")
print(f"决策原因: {decision['reason']}")
print("\n触发的规则:")
for rule in decision['triggered_rules']:
print(f" - {rule['rule']}: {rule['action']} ({rule['description']})")
监控与迭代
模型上线后需要持续监控和迭代。
class ModelMonitor:
"""模型监控器"""
    def __init__(self, model, X_train, y_train):
self.model = model
self.X_train = X_train
self.y_train = y_train
self.performance_history = []
self.drift_detectors = {}
    def monitor_performance(self, X_new, y_new, batch_id):
"""监控模型性能"""
# 预测
        y_pred_proba = self.model.predict_proba(X_new)
        if np.ndim(y_pred_proba) > 1:  # 兼容sklearn的二维输出;本文的集成模型直接返回一维数组
            y_pred_proba = y_pred_proba[:, 1]
y_pred = (y_pred_proba >= 0.5).astype(int)
# 计算指标
auc = roc_auc_score(y_new, y_pred_proba)
fraud_recall = recall_score(y_new, y_pred, pos_label=1)
        fp = np.sum((y_pred == 1) & (y_new == 0))
        false_positive_rate = fp / max(np.sum(y_new == 0), 1)  # 误伤率 = 误拒的正常客户 / 正常客户总数
performance = {
'batch_id': batch_id,
'timestamp': datetime.now().isoformat(),
'auc': auc,
'fraud_recall': fraud_recall,
'false_positive_rate': false_positive_rate,
'samples': len(y_new),
'fraud_rate': y_new.mean()
}
self.performance_history.append(performance)
# 检查性能下降
        if len(self.performance_history) > 5:
            recent_auc = [p['auc'] for p in self.performance_history[-5:]]
if np.mean(recent_auc) < 0.85: # AUC低于0.85触发预警
print(f"⚠️ 预警: 模型性能下降,最近5批平均AUC: {np.mean(recent_auc):.3f}")
return performance
    def detect_drift(self, X_new):
"""检测数据漂移"""
from scipy import stats
drift_signals = []
        for i, col in enumerate(self.X_train.columns):
if i >= 10: # 只检查前10个重要特征
break
# KS检验检测分布变化
stat, p_value = stats.ks_2samp(self.X_train[col].values[:1000], X_new[col].values[:1000])
if p_value < 0.01: # 分布显著变化
drift_signals.append({
'feature': col,
'ks_statistic': stat,
'p_value': p_value,
'message': f'特征{col}分布发生显著变化'
})
return drift_signals
    def generate_report(self):
"""生成监控报告"""
        if not self.performance_history:
            return "暂无监控数据"
df_perf = pd.DataFrame(self.performance_history)
report = f"""
=== 模型监控报告 ===
监控时间范围: {df_perf['timestamp'].min()} 到 {df_perf['timestamp'].max()}
总批次数: {len(df_perf)}
总样本数: {df_perf['samples'].sum():,}
性能指标:
- 平均AUC: {df_perf['auc'].mean():.3f}
- 平均欺诈查全率: {df_perf['fraud_recall'].mean():.2%}
- 平均误伤率: {df_perf['false_positive_rate'].mean():.2%}
趋势分析:
- AUC趋势: {'稳定' if df_perf['auc'].std() < 0.05 else '波动'}
- 欺诈率变化: {df_perf['fraud_rate'].iloc[-1]:.2%} (最近批次)
建议:
"""
if df_perf['auc'].iloc[-1] < 0.8:
report += "- 模型性能下降,建议重新训练\n"
if df_perf['false_positive_rate'].iloc[-1] > 0.05:
report += "- 误伤率过高,建议调整阈值\n"
return report
# 模拟监控
monitor = ModelMonitor(ensemble_model, X_train, y_train)
# 模拟几批新数据
for i in range(10):
# 模拟新数据(在实际业务中从生产环境获取)
batch_size = 1000
X_batch = X_test.iloc[i*batch_size:(i+1)*batch_size]
y_batch = y_test.iloc[i*batch_size:(i+1)*batch_size]
perf = monitor.monitor_performance(X_batch, y_batch, batch_id=i)
if i % 3 == 0: # 每3批检查一次数据漂移
drift_signals = monitor.detect_drift(X_batch)
if drift_signals:
print(f"批次{i}: 检测到数据漂移")
for signal in drift_signals[:3]: # 只显示前3个
print(f" - {signal['message']} (p={signal['p_value']:.4f})")
# 生成最终报告
print(monitor.generate_report())
通过这个完整的Python实战案例,我们不仅构建了高性能的反欺诈模型,更重要的是建立了从数据处理到模型部署,再到持续监控的完整工作流。
但记住,在真实业务中:
1. 业务理解比算法更重要:知道为什么某些特征有效,比单纯追求AUC更有价值
2. 可解释性至关重要:当模型拒绝一个客户时,你必须能解释为什么
3. 误伤成本可能高于欺诈损失:每个误伤的真实客户,都可能永远离开你
4. 模型需要持续进化:欺诈手段在变化,模型也必须随之进化
最好的反欺诈模型,不是技术指标最漂亮的模型,而是业务团队愿意用、敢用、会用的模型。 这需要数据科学家不仅懂算法,更要懂业务、懂人性、懂风险管理的本质。
当你的模型既能精准识别欺诈,又能为真实客户提供顺畅体验时,你才真正掌握了机器学习反欺诈的艺术。这,才是数据科学在金融风控领域的真正价值。