一、模型训练的核心概念
1. 什么是模型训练?
模型训练是通过算法从数据中学习模式,构建一个可以对新数据进行预测或决策的函数的过程。这就像是教计算机识别模式的艺术。
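为了让"训练"这一概念更直观,下面给出一个最小示例:先载入数据,再由算法从数据中拟合出可用于预测的函数。这里选用 scikit-learn 自带的乳腺癌数据集,仅作演示假设;为方便起见,本文后续示例代码中的 X、y 均默认沿用此处的定义。

# 最小"训练"示例:从数据中学得一个可预测新样本的函数
# 注:数据集选择仅为演示假设;后文示例中的 X(DataFrame)、y(Series)沿用此处定义
from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import LogisticRegression

X, y = load_breast_cancer(return_X_y=True, as_frame=True)

model = LogisticRegression(max_iter=1000)
model.fit(X, y)                    # "学习":从数据中估计模型参数
print(model.predict(X.iloc[:5]))   # "预测":对(新)样本给出输出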
2. 训练的关键组成部分
# 模型训练的三个核心要素
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# 1. 数据准备
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# 2. 数据预处理(标准化)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)  # 注意:使用训练集的参数

# 3. 训练集、验证集、测试集的角色
print(f"训练集: {X_train.shape} - 用于训练模型参数")
print(f"验证集: 用于调整超参数和防止过拟合")
print(f"测试集: {X_test.shape} - 用于最终评估模型性能")
二、常用机器学习算法
1. 监督学习算法
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.ensemble import (
    RandomForestClassifier, RandomForestRegressor,  # 补充导入 RandomForestRegressor,下文会用到
    GradientBoostingRegressor
)
from sklearn.svm import SVC, SVR
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier

# 分类模型示例
models_classification = {
    "逻辑回归": LogisticRegression(max_iter=1000, random_state=42),
    "决策树": DecisionTreeClassifier(max_depth=5, random_state=42),
    "随机森林": RandomForestClassifier(n_estimators=100, random_state=42),
    "SVM": SVC(kernel='rbf', probability=True, random_state=42),
    "K近邻": KNeighborsClassifier(n_neighbors=5),
    "朴素贝叶斯": GaussianNB(),
    "神经网络": MLPClassifier(hidden_layer_sizes=(100,), max_iter=1000, random_state=42)
}

# 回归模型示例
models_regression = {
    "线性回归": LinearRegression(),
    "决策树回归": DecisionTreeRegressor(max_depth=5, random_state=42),
    "随机森林回归": RandomForestRegressor(n_estimators=100, random_state=42),
    "梯度提升回归": GradientBoostingRegressor(n_estimators=100, random_state=42),
    "SVR": SVR(kernel='rbf')
}
2. 无监督学习算法
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.mixture import GaussianMixture

# 聚类算法
clustering_models = {
    "K均值": KMeans(n_clusters=3, random_state=42),
    "DBSCAN": DBSCAN(eps=0.5, min_samples=5),
    "层次聚类": AgglomerativeClustering(n_clusters=3),
    "高斯混合模型": GaussianMixture(n_components=3, random_state=42)
}

# 降维算法
dim_reduction_models = {
    "PCA": PCA(n_components=2, random_state=42),
    "t-SNE": TSNE(n_components=2, random_state=42)
}
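以下是一段简短的用法示意(沿用前文标准化后的 X_train_scaled,属于示例假设):对每个聚类模型调用 fit_predict 获取簇标签,并统计各簇的样本数。

# 用法示意:对标准化后的训练特征做聚类并查看簇分布
# (X_train_scaled 沿用前文定义,属于示例假设)
import numpy as np

for name, clusterer in clustering_models.items():
    labels = clusterer.fit_predict(X_train_scaled)  # 拟合并返回每个样本的簇标签
    unique, counts = np.unique(labels, return_counts=True)
    print(f"{name}: 簇分布 {dict(zip(unique.tolist(), counts.tolist()))}")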
三、模型训练流程
1. 基础训练流程
import time  # 补充导入:原文使用了 time.time() 但缺少该导入
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

def train_and_evaluate_model(model, X_train, y_train, X_test, y_test, model_name=""):
    """完整的模型训练和评估流程"""
    # 1. 训练模型
    print(f"\n{'='*50}")
    print(f"训练模型: {model_name}")
    print(f"{'='*50}")

    start_time = time.time()
    model.fit(X_train, y_train)
    training_time = time.time() - start_time

    # 2. 预测
    y_pred = model.predict(X_test)
    y_pred_proba = None
    if hasattr(model, "predict_proba"):
        y_pred_proba = model.predict_proba(X_test)

    # 3. 评估
    accuracy = accuracy_score(y_test, y_pred)
    print(f"训练时间: {training_time:.3f}秒")
    print(f"测试集准确率: {accuracy:.3f}")

    # 4. 详细评估报告
    print("\n分类报告:")
    print(classification_report(y_test, y_pred))

    # 5. 混淆矩阵
    cm = confusion_matrix(y_test, y_pred)
    print("混淆矩阵:")
    print(cm)

    return model, y_pred, y_pred_proba

# 批量训练多个模型
results = {}
for name, model in models_classification.items():
    trained_model, preds, probas = train_and_evaluate_model(
        model, X_train_scaled, y_train, X_test_scaled, y_test, name
    )
    results[name] = {
        'model': trained_model,
        'predictions': preds,
        'probabilities': probas
    }
2. 交叉验证训练
from sklearn.model_selection import cross_val_score, StratifiedKFold, KFold

# 分类问题的分层交叉验证
def cross_validate_model(model, X, y, cv_strategy=5, scoring='accuracy'):
    """执行交叉验证"""
    # 分层K折交叉验证(保持类别比例)
    if scoring in ['accuracy', 'f1', 'roc_auc']:
        cv = StratifiedKFold(n_splits=cv_strategy, shuffle=True, random_state=42)
    else:
        cv = KFold(n_splits=cv_strategy, shuffle=True, random_state=42)

    # 交叉验证得分
    scores = cross_val_score(model, X, y, cv=cv, scoring=scoring, n_jobs=-1)

    print(f"交叉验证{scoring}得分:")
    print(f"  各折得分: {scores}")
    print(f"  平均得分: {scores.mean():.3f} (+/- {scores.std() * 2:.3f})")

    return scores

# 对多个模型进行交叉验证
for name, model in models_classification.items():
    print(f"\n{'='*50}")
    print(f"模型: {name}")
    print(f"{'='*50}")
    scores = cross_validate_model(model, X, y, cv_strategy=5, scoring='accuracy')
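如果希望一次交叉验证同时评估多个指标,可以考虑 sklearn 的 cross_validate(下面是一个简短示意,指标组合仅为示例假设):

# 一次交叉验证评估多个指标的简短示意(指标组合为示例假设)
from sklearn.model_selection import cross_validate, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_results = cross_validate(
    RandomForestClassifier(n_estimators=100, random_state=42),
    X, y, cv=cv, scoring=['accuracy', 'f1_macro'], n_jobs=-1
)
print(f"accuracy: {cv_results['test_accuracy'].mean():.3f}")
print(f"f1_macro: {cv_results['test_f1_macro'].mean():.3f}")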
3. 超参数调优
import numpy as np   # 补充导入:下文参数分布用到 np.arange
import pandas as pd  # 补充导入:下文结果表用到 pd.DataFrame
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from scipy.stats import randint, uniform

# 1. 网格搜索(适用于小规模参数空间)
def grid_search_tuning(model, param_grid, X_train, y_train):
    """网格搜索超参数调优"""
    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        cv=5,
        scoring='accuracy',
        n_jobs=-1,
        verbose=1,
        return_train_score=True
    )

    grid_search.fit(X_train, y_train)

    print(f"最佳参数: {grid_search.best_params_}")
    print(f"最佳交叉验证得分: {grid_search.best_score_:.3f}")
    print(f"最佳模型: {grid_search.best_estimator_}")

    # 查看所有参数组合的结果
    results_df = pd.DataFrame(grid_search.cv_results_)
    print(f"\n排名前5的参数组合:")
    print(results_df[['params', 'mean_test_score', 'std_test_score']]
          .sort_values('mean_test_score', ascending=False).head())

    return grid_search

# 随机森林的网格搜索
rf_param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4],
    'max_features': ['sqrt', 'log2']
}

rf_grid_search = grid_search_tuning(
    RandomForestClassifier(random_state=42),
    rf_param_grid,
    X_train_scaled,
    y_train
)

# 2. 随机搜索(适用于大规模参数空间)
def random_search_tuning(model, param_dist, X_train, y_train, n_iter=50):
    """随机搜索超参数调优"""
    random_search = RandomizedSearchCV(
        estimator=model,
        param_distributions=param_dist,
        n_iter=n_iter,
        cv=5,
        scoring='accuracy',
        n_jobs=-1,
        random_state=42,
        verbose=1
    )

    random_search.fit(X_train, y_train)

    print(f"随机搜索最佳参数: {random_search.best_params_}")
    print(f"随机搜索最佳得分: {random_search.best_score_:.3f}")

    return random_search

# 随机森林的随机搜索参数分布
rf_param_dist = {
    'n_estimators': randint(100, 500),
    'max_depth': [None] + list(np.arange(5, 30, 5)),
    'min_samples_split': randint(2, 20),
    'min_samples_leaf': randint(1, 10),
    'max_features': ['sqrt', 'log2', None],
    'bootstrap': [True, False]
}

rf_random_search = random_search_tuning(
    RandomForestClassifier(random_state=42),
    rf_param_dist,
    X_train_scaled,
    y_train,
    n_iter=30
)
四、模型评估指标详解
1. 分类问题评估指标
import numpy as np               # 补充导入:函数体内用到 np.unique
import matplotlib.pyplot as plt  # 补充导入:plot_roc_curve 等函数用到 plt
import seaborn as sns
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report,
    precision_recall_curve, roc_curve, log_loss
)

def evaluate_classification_model(y_true, y_pred, y_pred_proba=None):
    """全面评估分类模型"""
    metrics = {}

    # 基础指标
    metrics['accuracy'] = accuracy_score(y_true, y_pred)
    metrics['precision_macro'] = precision_score(y_true, y_pred, average='macro')
    metrics['recall_macro'] = recall_score(y_true, y_pred, average='macro')
    metrics['f1_macro'] = f1_score(y_true, y_pred, average='macro')

    # 二分类特有指标
    if len(np.unique(y_true)) == 2 and y_pred_proba is not None:
        metrics['roc_auc'] = roc_auc_score(y_true, y_pred_proba[:, 1])
        metrics['log_loss'] = log_loss(y_true, y_pred_proba)

    # 生成详细报告
    print(f"准确率: {metrics['accuracy']:.3f}")
    print(f"精确率(宏平均): {metrics['precision_macro']:.3f}")
    print(f"召回率(宏平均): {metrics['recall_macro']:.3f}")
    print(f"F1分数(宏平均): {metrics['f1_macro']:.3f}")

    if 'roc_auc' in metrics:
        print(f"ROC AUC: {metrics['roc_auc']:.3f}")
        print(f"对数损失: {metrics['log_loss']:.3f}")

    # 混淆矩阵可视化
    plot_confusion_matrix(y_true, y_pred)

    # 分类报告
    print("\n详细分类报告:")
    print(classification_report(y_true, y_pred))

    return metrics

def plot_confusion_matrix(y_true, y_pred, labels=None):
    """绘制混淆矩阵"""
    cm = confusion_matrix(y_true, y_pred)

    plt.figure(figsize=(8, 6))
    # labels 为 None 时退回 seaborn 默认刻度(直接传 None 会报错)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=labels if labels is not None else 'auto',
                yticklabels=labels if labels is not None else 'auto')
    plt.xlabel('预测标签')
    plt.ylabel('真实标签')
    plt.title('混淆矩阵')
    plt.show()

def plot_roc_curve(y_true, y_pred_proba):
    """绘制ROC曲线"""
    from sklearn.metrics import auc

    fpr, tpr, thresholds = roc_curve(y_true, y_pred_proba[:, 1])
    roc_auc = auc(fpr, tpr)

    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='darkorange', lw=2,
             label=f'ROC曲线 (AUC = {roc_auc:.3f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('假正率')
    plt.ylabel('真正率')
    plt.title('接收者操作特征(ROC)曲线')
    plt.legend(loc="lower right")
    plt.show()

    return roc_auc
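一个简短的用法示意(沿用前文批量训练得到的 results,字典键名"随机森林"为示例假设):

# 用法示意:评估前文训练好的随机森林(键名沿用前文,属于示例假设)
rf_result = results['随机森林']
evaluate_classification_model(y_test, rf_result['predictions'], rf_result['probabilities'])
plot_roc_curve(y_test, rf_result['probabilities'])  # 仅适用于二分类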
2. 回归问题评估指标
import numpy as np               # 补充导入:RMSE 计算用到 np.sqrt
import matplotlib.pyplot as plt  # 补充导入:可视化函数用到 plt
from sklearn.metrics import (
    mean_absolute_error, mean_squared_error, mean_squared_log_error,
    r2_score, explained_variance_score, median_absolute_error
)

def evaluate_regression_model(y_true, y_pred):
    """全面评估回归模型"""
    metrics = {}

    # 计算各种回归指标
    metrics['MAE'] = mean_absolute_error(y_true, y_pred)
    metrics['MSE'] = mean_squared_error(y_true, y_pred)
    metrics['RMSE'] = np.sqrt(metrics['MSE'])
    metrics['R2'] = r2_score(y_true, y_pred)
    metrics['Explained_Variance'] = explained_variance_score(y_true, y_pred)
    metrics['MedAE'] = median_absolute_error(y_true, y_pred)

    # 对于非负目标变量
    if (y_true >= 0).all() and (y_pred >= 0).all():
        metrics['RMSLE'] = np.sqrt(mean_squared_log_error(y_true, y_pred))

    # 输出结果
    print(f"平均绝对误差(MAE): {metrics['MAE']:.3f}")
    print(f"均方误差(MSE): {metrics['MSE']:.3f}")
    print(f"均方根误差(RMSE): {metrics['RMSE']:.3f}")
    print(f"R²分数: {metrics['R2']:.3f}")
    print(f"可解释方差: {metrics['Explained_Variance']:.3f}")
    if 'RMSLE' in metrics:
        print(f"均方根对数误差(RMSLE): {metrics['RMSLE']:.3f}")

    # 可视化预测结果
    plot_regression_results(y_true, y_pred)

    return metrics

def plot_regression_results(y_true, y_pred):
    """绘制回归结果可视化"""
    plt.figure(figsize=(12, 4))

    # 1. 预测vs实际散点图
    plt.subplot(1, 3, 1)
    plt.scatter(y_true, y_pred, alpha=0.5)
    plt.plot([y_true.min(), y_true.max()],
             [y_true.min(), y_true.max()], 'r--', lw=2)
    plt.xlabel('实际值')
    plt.ylabel('预测值')
    plt.title('预测值 vs 实际值')

    # 2. 残差图
    plt.subplot(1, 3, 2)
    residuals = y_true - y_pred
    plt.scatter(y_pred, residuals, alpha=0.5)
    plt.axhline(y=0, color='r', linestyle='--')
    plt.xlabel('预测值')
    plt.ylabel('残差')
    plt.title('残差图')

    # 3. 残差分布
    plt.subplot(1, 3, 3)
    plt.hist(residuals, bins=30, edgecolor='black')
    plt.xlabel('残差')
    plt.ylabel('频率')
    plt.title('残差分布')

    plt.tight_layout()
    plt.show()
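下面给出一个简短的用法示意:用 make_regression 生成的玩具数据训练线性回归,再调用上述评估函数(数据来源与参数均为示例假设):

# 用法示意:在玩具回归数据上评估线性回归(数据与参数为示例假设)
from sklearn.datasets import make_regression
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

X_reg, y_reg = make_regression(n_samples=500, n_features=10, noise=10.0, random_state=42)
Xr_train, Xr_test, yr_train, yr_test = train_test_split(X_reg, y_reg, random_state=42)

reg = LinearRegression().fit(Xr_train, yr_train)
evaluate_regression_model(yr_test, reg.predict(Xr_test))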
3. 聚类问题评估指标
import numpy as np               # 补充导入:轮廓分析用到 np.unique 等
import matplotlib.pyplot as plt  # 补充导入:可视化函数用到 plt
from sklearn.metrics import (
    silhouette_score, calinski_harabasz_score, davies_bouldin_score,
    adjusted_rand_score, normalized_mutual_info_score, homogeneity_score
)

def evaluate_clustering_model(X, labels, true_labels=None):
    """评估聚类模型"""
    metrics = {}

    # 内部指标(不需要真实标签)
    metrics['silhouette'] = silhouette_score(X, labels)
    metrics['calinski_harabasz'] = calinski_harabasz_score(X, labels)
    metrics['davies_bouldin'] = davies_bouldin_score(X, labels)

    print(f"轮廓系数: {metrics['silhouette']:.3f}")
    print(f"Calinski-Harabasz指数: {metrics['calinski_harabasz']:.3f}")
    print(f"Davies-Bouldin指数: {metrics['davies_bouldin']:.3f}")

    # 外部指标(需要真实标签)
    if true_labels is not None:
        metrics['adjusted_rand'] = adjusted_rand_score(true_labels, labels)
        metrics['nmi'] = normalized_mutual_info_score(true_labels, labels)
        metrics['homogeneity'] = homogeneity_score(true_labels, labels)

        print(f"调整兰德指数: {metrics['adjusted_rand']:.3f}")
        print(f"标准化互信息: {metrics['nmi']:.3f}")
        print(f"同质性分数: {metrics['homogeneity']:.3f}")

    # 可视化聚类结果
    plot_clustering_results(X, labels, true_labels)

    return metrics

def plot_clustering_results(X, labels, true_labels=None):
    """可视化聚类结果"""
    from sklearn.decomposition import PCA
    from sklearn.metrics import silhouette_samples

    # 使用PCA降维到2维进行可视化
    pca = PCA(n_components=2, random_state=42)
    X_2d = pca.fit_transform(X)

    plt.figure(figsize=(12, 4))

    # 1. 聚类结果
    plt.subplot(1, 3, 1)
    scatter = plt.scatter(X_2d[:, 0], X_2d[:, 1], c=labels, cmap='tab20c')
    plt.colorbar(scatter)
    plt.xlabel('PC1')
    plt.ylabel('PC2')
    plt.title('聚类结果')

    # 2. 真实标签(如果有)
    if true_labels is not None:
        plt.subplot(1, 3, 2)
        scatter = plt.scatter(X_2d[:, 0], X_2d[:, 1], c=true_labels, cmap='tab20c')
        plt.colorbar(scatter)
        plt.xlabel('PC1')
        plt.ylabel('PC2')
        plt.title('真实标签')

    # 3. 轮廓分析
    plt.subplot(1, 3, 3)
    silhouette_vals = silhouette_samples(X, labels)

    y_lower = 10
    for i in np.unique(labels):
        cluster_silhouette_vals = silhouette_vals[labels == i]
        cluster_silhouette_vals.sort()
        size_cluster_i = cluster_silhouette_vals.shape[0]
        y_upper = y_lower + size_cluster_i
        plt.fill_betweenx(np.arange(y_lower, y_upper), 0,
                          cluster_silhouette_vals, alpha=0.7)
        plt.text(-0.05, y_lower + 0.5 * size_cluster_i, str(i))
        y_lower = y_upper + 10

    plt.axvline(x=np.mean(silhouette_vals), color="red", linestyle="--")
    plt.xlabel('轮廓系数')
    plt.ylabel('聚类标签')
    plt.title('轮廓分析图')

    plt.tight_layout()
    plt.show()
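一个简短的用法示意(沿用前文的 X_train_scaled,n_clusters=3 为示例假设):

# 用法示意:对标准化后的训练特征跑 KMeans 并评估(n_clusters=3 为示例假设)
from sklearn.cluster import KMeans

kmeans = KMeans(n_clusters=3, random_state=42)
cluster_labels = kmeans.fit_predict(X_train_scaled)
evaluate_clustering_model(X_train_scaled, cluster_labels)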
五、模型诊断与优化
1. 学习曲线分析
import numpy as np               # 补充导入:默认参数用到 np.linspace
import matplotlib.pyplot as plt  # 补充导入:绘图用到 plt
from sklearn.model_selection import learning_curve
from sklearn.ensemble import RandomForestClassifier

def plot_learning_curve(model, X, y, cv=5, train_sizes=np.linspace(0.1, 1.0, 10)):
    """绘制学习曲线"""
    train_sizes, train_scores, val_scores = learning_curve(
        model, X, y, cv=cv, n_jobs=-1,
        train_sizes=train_sizes, scoring='accuracy'
    )

    train_scores_mean = np.mean(train_scores, axis=1)
    train_scores_std = np.std(train_scores, axis=1)
    val_scores_mean = np.mean(val_scores, axis=1)
    val_scores_std = np.std(val_scores, axis=1)

    plt.figure(figsize=(10, 6))
    plt.plot(train_sizes, train_scores_mean, 'o-', color='r', label='训练得分')
    plt.fill_between(train_sizes, train_scores_mean - train_scores_std,
                     train_scores_mean + train_scores_std, alpha=0.1, color='r')
    plt.plot(train_sizes, val_scores_mean, 'o-', color='g', label='验证得分')
    plt.fill_between(train_sizes, val_scores_mean - val_scores_std,
                     val_scores_mean + val_scores_std, alpha=0.1, color='g')
    plt.xlabel('训练样本数')
    plt.ylabel('得分')
    plt.legend(loc='best')
    plt.title('学习曲线')
    plt.grid(True)
    plt.show()

    # 诊断模型问题
    final_train_score = train_scores_mean[-1]
    final_val_score = val_scores_mean[-1]
    gap = final_train_score - final_val_score

    if gap > 0.1 and final_train_score > 0.9:
        print("警告:模型可能过拟合!")
        print(f"训练集-验证集差距: {gap:.3f}")
    elif final_val_score < 0.7:
        print("警告:模型可能欠拟合!")
        print(f"验证集得分偏低: {final_val_score:.3f}")
    else:
        print("模型表现良好")
        print(f"训练集得分: {final_train_score:.3f}")
        print(f"验证集得分: {final_val_score:.3f}")

    return train_scores, val_scores

# 使用学习曲线诊断模型
plot_learning_curve(
    RandomForestClassifier(n_estimators=100, random_state=42),
    X_train_scaled, y_train, cv=5
)
2. 验证曲线(超参数影响)
import numpy as np               # 补充导入:统计与寻优用到 np.mean/np.argmax
import matplotlib.pyplot as plt  # 补充导入:绘图用到 plt
from sklearn.model_selection import validation_curve
from sklearn.tree import DecisionTreeClassifier

def plot_validation_curve(model, X, y, param_name, param_range):
    """绘制验证曲线"""
    train_scores, val_scores = validation_curve(
        model, X, y, param_name=param_name, param_range=param_range,
        cv=5, scoring='accuracy', n_jobs=-1
    )

    train_scores_mean = np.mean(train_scores, axis=1)
    train_scores_std = np.std(train_scores, axis=1)
    val_scores_mean = np.mean(val_scores, axis=1)
    val_scores_std = np.std(val_scores, axis=1)

    plt.figure(figsize=(10, 6))
    plt.plot(param_range, train_scores_mean, 'o-', color='r', label='训练得分')
    plt.fill_between(param_range, train_scores_mean - train_scores_std,
                     train_scores_mean + train_scores_std, alpha=0.1, color='r')
    plt.plot(param_range, val_scores_mean, 'o-', color='g', label='验证得分')
    plt.fill_between(param_range, val_scores_mean - val_scores_std,
                     val_scores_mean + val_scores_std, alpha=0.1, color='g')
    plt.xlabel(param_name)
    plt.ylabel('得分')
    plt.legend(loc='best')
    plt.title(f'验证曲线: {param_name}')
    plt.grid(True)
    plt.show()

    # 找到最佳参数值
    best_idx = np.argmax(val_scores_mean)
    best_param = param_range[best_idx]
    best_score = val_scores_mean[best_idx]

    print(f"最佳{param_name}: {best_param}")
    print(f"最佳验证得分: {best_score:.3f}")

    return best_param, best_score

# 分析决策树深度的影响
plot_validation_curve(
    DecisionTreeClassifier(random_state=42),
    X_train_scaled, y_train,
    'max_depth', [1, 3, 5, 7, 9, 11, 13, 15]
)
3. 特征重要性分析
import numpy as np               # 补充导入:系数取绝对值用到 np.abs
import pandas as pd              # 补充导入:重要性表用到 pd.DataFrame
import matplotlib.pyplot as plt  # 补充导入:绘图用到 plt
from sklearn.ensemble import RandomForestClassifier

def analyze_feature_importance(model, feature_names, X_train, y_train):
    """分析特征重要性"""
    # 训练模型(若传入的是类而非实例,先实例化)
    if isinstance(model, type):
        model = model()
    model.fit(X_train, y_train)

    # 获取特征重要性
    if hasattr(model, 'feature_importances_'):
        importances = model.feature_importances_
    elif hasattr(model, 'coef_'):
        importances = np.abs(model.coef_[0])
    else:
        print("模型不支持特征重要性分析")
        return

    # 创建特征重要性DataFrame
    feature_importance_df = pd.DataFrame({
        'feature': feature_names,
        'importance': importances
    }).sort_values('importance', ascending=False)

    # 绘制特征重要性
    plt.figure(figsize=(10, 6))
    plt.barh(range(len(feature_importance_df)), feature_importance_df['importance'])
    plt.yticks(range(len(feature_importance_df)), feature_importance_df['feature'])
    plt.xlabel('特征重要性')
    plt.title('特征重要性排序')
    plt.tight_layout()
    plt.show()

    # 累积重要性
    feature_importance_df['cumulative_importance'] = \
        feature_importance_df['importance'].cumsum() / \
        feature_importance_df['importance'].sum()

    # 找到最重要的特征(累积重要性达到95%)
    important_features = feature_importance_df[
        feature_importance_df['cumulative_importance'] <= 0.95
    ]

    print(f"重要特征数量(累积95%重要性): {len(important_features)}/{len(feature_names)}")
    print(f"最重要的5个特征:")
    for i, row in feature_importance_df.head().iterrows():
        print(f"  {row['feature']}: {row['importance']:.4f}")

    return feature_importance_df

# 分析随机森林的特征重要性
feature_importance_df = analyze_feature_importance(
    RandomForestClassifier(n_estimators=100, random_state=42),
    feature_names=X.columns,
    X_train=X_train_scaled,
    y_train=y_train
)
六、模型集成方法
1. 基础集成方法
from sklearn.ensemble import (
    VotingClassifier, VotingRegressor,
    StackingClassifier, StackingRegressor,
    BaggingClassifier, BaggingRegressor
)
from sklearn.metrics import accuracy_score  # 补充导入:下文评估用到

def create_ensemble_models(X_train, y_train, X_test, y_test):
    """创建和比较集成模型"""
    # 基础模型
    from sklearn.linear_model import LogisticRegression
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.svm import SVC

    estimators = [
        ('lr', LogisticRegression(max_iter=1000, random_state=42)),
        ('dt', DecisionTreeClassifier(max_depth=5, random_state=42)),
        ('svc', SVC(kernel='rbf', probability=True, random_state=42))
    ]

    # 1. 投票集成
    voting_hard = VotingClassifier(estimators=estimators, voting='hard')
    voting_soft = VotingClassifier(estimators=estimators, voting='soft')

    # 2. 堆叠集成
    stack_clf = StackingClassifier(
        estimators=estimators,
        final_estimator=LogisticRegression(),
        cv=5
    )

    # 3. Bagging集成
    bagging_clf = BaggingClassifier(
        estimator=DecisionTreeClassifier(),  # sklearn 1.2+ 的参数名,旧版本中为 base_estimator
        n_estimators=100,
        max_samples=0.8,
        max_features=0.8,
        random_state=42
    )

    # 训练和评估所有集成模型
    ensemble_models = {
        '硬投票': voting_hard,
        '软投票': voting_soft,
        '堆叠': stack_clf,
        'Bagging': bagging_clf
    }

    results = {}
    for name, model in ensemble_models.items():
        print(f"\n{'='*50}")
        print(f"训练集成模型: {name}")

        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)

        print(f"测试集准确率: {accuracy:.3f}")

        results[name] = {
            'model': model,
            'accuracy': accuracy,
            'predictions': y_pred
        }

    return results

# 创建和比较集成模型
ensemble_results = create_ensemble_models(
    X_train_scaled, y_train, X_test_scaled, y_test
)
七、模型部署准备
1. 创建模型管道
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer                       # 补充导入:原文缺少
from sklearn.preprocessing import StandardScaler, OneHotEncoder  # 补充导入:原文缺少

def create_model_pipeline(model, numeric_features, categorical_features):
    """创建包含预处理和模型的完整管道"""
    # 数值型特征处理
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    # 类别型特征处理
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # 组合预处理步骤
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ])

    # 创建完整管道
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])

    return pipeline

# 使用管道
numeric_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
categorical_features = X.select_dtypes(include=['object']).columns.tolist()

model_pipeline = create_model_pipeline(
    RandomForestClassifier(n_estimators=100, random_state=42),
    numeric_features,
    categorical_features
)

# 训练管道
model_pipeline.fit(X_train, y_train)

# 使用管道进行预测
y_pred = model_pipeline.predict(X_test)
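把预处理和模型放进同一个管道的一个重要好处是:交叉验证时,预处理只会在每一折的训练部分上拟合,从而避免数据泄漏。下面是一个简短示意(沿用上文的 model_pipeline):

# 对整个管道做交叉验证:每折内 preprocessor 只在该折训练数据上拟合,避免泄漏
from sklearn.model_selection import cross_val_score

scores = cross_val_score(model_pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"管道交叉验证准确率: {scores.mean():.3f} (+/- {scores.std() * 2:.3f})")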
2. 模型保存与加载
import joblib
import json
from datetime import datetime
from sklearn.metrics import accuracy_score  # 补充导入:末尾示例用到

def save_model(model, model_name, feature_names, metrics, save_dir='models'):
    """保存模型及相关信息"""
    import os
    os.makedirs(save_dir, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    model_filename = f"{save_dir}/{model_name}_{timestamp}.joblib"
    metadata_filename = f"{save_dir}/{model_name}_{timestamp}_metadata.json"

    # 保存模型
    joblib.dump(model, model_filename)
    print(f"模型已保存到: {model_filename}")

    # 保存元数据
    metadata = {
        'model_name': model_name,
        'timestamp': timestamp,
        'feature_names': feature_names.tolist() if hasattr(feature_names, 'tolist')
                         else list(feature_names),
        # 转为内置 float,确保 numpy 标量也能被 JSON 序列化
        'metrics': {k: float(v) for k, v in metrics.items()},
        'model_type': type(model).__name__,
        'training_date': datetime.now().isoformat()
    }

    with open(metadata_filename, 'w') as f:
        json.dump(metadata, f, indent=2)
    print(f"元数据已保存到: {metadata_filename}")

    return model_filename, metadata_filename

def load_model(model_filename, metadata_filename=None):
    """加载模型及相关信息"""
    # 加载模型
    model = joblib.load(model_filename)
    print(f"模型已从 {model_filename} 加载")

    # 加载元数据(如果提供)
    if metadata_filename:
        with open(metadata_filename, 'r') as f:
            metadata = json.load(f)
        print(f"元数据已从 {metadata_filename} 加载")
        print(f"模型名称: {metadata['model_name']}")
        print(f"训练日期: {metadata['training_date']}")
        print(f"特征数量: {len(metadata['feature_names'])}")
        return model, metadata
    else:
        return model

# 保存模型
best_model = rf_grid_search.best_estimator_

model_file, meta_file = save_model(
    best_model,
    'random_forest_classifier',
    X.columns,
    {'accuracy': accuracy_score(y_test, best_model.predict(X_test_scaled))}
)

# 加载模型
loaded_model, loaded_metadata = load_model(model_file, meta_file)
八、实战项目:完整机器学习流程
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier  # 补充导入 GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, f1_score

def complete_machine_learning_pipeline(data, target_column, test_size=0.2):
    """完整的机器学习管道"""
    print("=" * 60)
    print("开始完整机器学习流程")
    print("=" * 60)

    # 1. 准备数据
    X = data.drop(columns=[target_column])
    y = data[target_column]

    # 2. 划分数据集
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42, stratify=y
    )

    print(f"训练集大小: {X_train.shape}")
    print(f"测试集大小: {X_test.shape}")
    print(f"类别分布: {np.bincount(y_train)}")

    # 3. 特征工程
    print("\n步骤1: 特征工程")

    # 分离数值型和类别型特征
    numeric_features = X.select_dtypes(include=['int64', 'float64']).columns
    categorical_features = X.select_dtypes(include=['object']).columns

    # 创建预处理管道
    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))  # sklearn 1.2+ 参数名,旧版本为 sparse=False
    ])

    preprocessor = ColumnTransformer([
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

    # 应用预处理
    X_train_processed = preprocessor.fit_transform(X_train)
    X_test_processed = preprocessor.transform(X_test)

    # 4. 模型训练与选择
    print("\n步骤2: 模型训练与选择")

    models = {
        'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
        'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
        'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
        'SVM': SVC(kernel='rbf', probability=True, random_state=42)
    }

    results = {}
    for name, model in models.items():
        print(f"\n训练 {name}...")

        # 交叉验证
        cv_scores = cross_val_score(model, X_train_processed, y_train,
                                    cv=5, scoring='accuracy')

        # 完整训练
        model.fit(X_train_processed, y_train)
        y_pred = model.predict(X_test_processed)

        # 评估
        accuracy = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred, average='macro')

        results[name] = {
            'model': model,
            'cv_mean': cv_scores.mean(),
            'cv_std': cv_scores.std(),
            'test_accuracy': accuracy,
            'test_f1': f1
        }

        print(f"  交叉验证准确率: {cv_scores.mean():.3f} (+/- {cv_scores.std()*2:.3f})")
        print(f"  测试集准确率: {accuracy:.3f}")
        print(f"  测试集F1分数: {f1:.3f}")

    # 5. 选择最佳模型
    print("\n步骤3: 选择最佳模型")
    best_model_name = max(results.keys(), key=lambda x: results[x]['test_accuracy'])
    best_model = results[best_model_name]['model']

    print(f"最佳模型: {best_model_name}")
    print(f"测试集准确率: {results[best_model_name]['test_accuracy']:.3f}")

    # 6. 超参数调优
    print("\n步骤4: 超参数调优")

    if best_model_name == 'Random Forest':
        param_grid = {
            'n_estimators': [100, 200, 300],
            'max_depth': [None, 10, 20],
            'min_samples_split': [2, 5, 10]
        }
    elif best_model_name == 'Logistic Regression':
        param_grid = {
            'C': [0.1, 1, 10],
            'penalty': ['l1', 'l2'],
            'solver': ['liblinear']  # liblinear 同时支持 l1/l2;默认的 lbfgs 不支持 l1,会导致网格搜索报错
        }
    else:
        param_grid = {}

    if param_grid:
        grid_search = GridSearchCV(
            best_model, param_grid, cv=5,
            scoring='accuracy', n_jobs=-1
        )
        grid_search.fit(X_train_processed, y_train)

        print(f"最佳参数: {grid_search.best_params_}")
        print(f"最佳交叉验证得分: {grid_search.best_score_:.3f}")

        best_model = grid_search.best_estimator_

    # 7. 最终评估
    print("\n步骤5: 最终评估")
    y_pred_final = best_model.predict(X_test_processed)
    y_pred_proba = best_model.predict_proba(X_test_processed)

    final_metrics = evaluate_classification_model(y_test, y_pred_final, y_pred_proba)

    # 8. 特征重要性(如果可用)
    if hasattr(best_model, 'feature_importances_'):
        print("\n步骤6: 特征重要性分析")

        # 获取特征名称(包括OneHot编码后的)
        if hasattr(preprocessor, 'get_feature_names_out'):
            feature_names = preprocessor.get_feature_names_out()
        else:
            feature_names = [f'feature_{i}' for i in range(X_train_processed.shape[1])]

        analyze_feature_importance(best_model, feature_names,
                                   X_train_processed, y_train)

    # 9. 创建最终管道
    print("\n步骤7: 创建最终管道")
    final_pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', best_model)
    ])

    # 训练完整管道
    final_pipeline.fit(X_train, y_train)

    # 保存模型
    print("\n步骤8: 保存模型")
    model_file, meta_file = save_model(
        final_pipeline,
        f'final_{best_model_name.replace(" ", "_").lower()}',
        X.columns,
        final_metrics
    )

    print("\n" + "=" * 60)
    print("机器学习流程完成!")
    print("=" * 60)

    return final_pipeline, results, final_metrics

# 使用示例
# final_pipeline, results, metrics = complete_machine_learning_pipeline(
#     data, 'target_column'
# )
模型训练与评估是机器学习项目的核心环节。通过系统的训练流程、科学的评估方法和持续的调优迭代,我们可以构建出强大而可靠的机器学习模型。记住:没有最好的模型,只有最适合当前数据和问题的模型。