# pip install dtaidistance tslearn pandas numpy matplotlib seaborn scikit-learn joblibimport osimport pandas as pdimport numpy as npimport matplotlib.pyplot as pltimport seaborn as snsfrom datetime import datetime, timedeltaimport warningswarnings.filterwarnings('ignore')# 时间序列分析相关库from dtaidistance import dtwfrom tslearn.clustering import TimeSeriesKMeansfrom tslearn.preprocessing import TimeSeriesScalerMeanVariancefrom sklearn.metrics import mean_squared_error, mean_absolute_error, r2_scorefrom sklearn.preprocessing import StandardScalerimport joblib# 设置中文字体plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']plt.rcParams['axes.unicode_minus'] = Falsedef get_desktop_path():"""获取桌面路径"""return os.path.join(os.path.expanduser("~"), "Desktop")def create_directories(base_path):"""创建必要的目录结构""" directories = ["Models","Predictions","Plots","Clustering","Distance_Matrices","Pattern_Analysis" ]for directory in directories: dir_path = os.path.join(base_path, directory)if not os.path.exists(dir_path): os.makedirs(dir_path)print(f"创建目录: {dir_path}")def prepare_dtw_data(data, disease_col, date_col='timestamp'):"""准备DTW分析数据"""# 确保日期格式正确 data[date_col] = pd.to_datetime(data[date_col])# 创建完整的时间序列 full_dates = pd.date_range(start=data[date_col].min(), end=data[date_col].max(), freq='D') full_df = pd.DataFrame({date_col: full_dates})# 合并数据 disease_data = data[[date_col, disease_col]].merge( full_df, on=date_col, how='right' ).sort_values(date_col).reset_index(drop=True)# 处理缺失值 disease_data[disease_col] = disease_data[disease_col].interpolate(method='linear') disease_data[disease_col] = disease_data[disease_col].ffill().bfill()return disease_datadef calculate_dtw_distance_fast(ts1, ts2, window_size=None):"""快速DTW距离计算"""if len(ts1) == 0 or len(ts2) == 0 or np.all(np.isnan(ts1)) or np.all(np.isnan(ts2)):return np.nan try:# 使用DTAIDistance库的快速DTW实现if window_size is not None: distance = dtw.distance_fast(ts1, ts2, window=window_size)else: distance = dtw.distance_fast(ts1, ts2)return distance except Exception as e:print(f"DTW计算错误: {e}")return np.nandef dtw_knn_forecast_optimized(train_series, test_series, k=3, window_size=90, sample_ratio=0.3, dtw_window=20):"""优化的基于DTW的KNN预测(不使用嵌套并行)""" n_test = len(test_series) predictions = np.full(n_test, np.nan)# 预处理:标准化序列 scaler = StandardScaler() train_series_z = scaler.fit_transform(train_series.reshape(-1, 1)).flatten() test_series_z = scaler.transform(test_series.reshape(-1, 1)).flatten()# 从训练集中抽样以减少计算量 n_train = len(train_series) sample_size = max(50, int(n_train * sample_ratio)) train_indices = np.random.choice(n_train - window_size, sample_size, replace=False)# 预计算训练窗口 train_windows = [train_series_z[i:i + window_size] for i in train_indices]# 对每个测试点进行预测(不使用并行)for i in range(n_test):if i < window_size:# 对于前window_size个点,使用简单平均 predictions[i] = np.mean(train_series)continue# 提取当前窗口 current_window = test_series_z[i - window_size:i]# 计算距离 distances = []for j, train_window in enumerate(train_windows): dist = calculate_dtw_distance_fast(current_window, train_window, dtw_window)if not np.isnan(dist): distances.append((dist, j))if not distances: predictions[i] = np.mean(train_series)continue# 找到K个最近邻 distances.sort(key=lambda x: x[0]) k_actual = min(k, len(distances)) nearest_indices = [idx for _, idx in distances[:k_actual]]# 获取最近邻的下一个值作为预测 neighbor_values = [train_series[train_indices[idx] + window_size] for idx in nearest_indices] neighbor_weights = [1 / (distances[idx][0] + 1e-8) for idx in range(k_actual)]# 使用距离倒数加权平均 predictions[i] = np.average(neighbor_values, weights=neighbor_weights)return predictions

try:# 标准化时间序列 scaler = TimeSeriesScalerMeanVariance() window_sequences_scaled = scaler.fit_transform(window_sequences)# 使用TimeSeriesKMeans进行聚类 km = TimeSeriesKMeans(n_clusters=n_clusters, metric="dtw", random_state=42) labels = km.fit_predict(window_sequences_scaled) cluster_results = {'clustering': labels,'centers': km.cluster_centers_,'sampled_indices': sampled_indices,'sample_size': sample_size,'model': km }return cluster_results except Exception as e:print(f" 快速聚类分析失败: {e}")return Nonedef create_simple_forecast_plot(pred_df, disease, test_r2, test_rmse, save_path):"""创建简化版预测图""" plt.figure(figsize=(10, 6)) plt.plot(pred_df['Date'], pred_df['Actual'], label='真实值', color='blue', linewidth=1, alpha=0.8) plt.plot(pred_df['Date'], pred_df['Predicted'], label='预测值', color='red', linestyle='--', linewidth=1, alpha=0.8) plt.title(f'{disease} - DTW-KNN预测 (R²={test_r2:.3f}, RMSE={test_rmse:.1f})') plt.xlabel('日期') plt.ylabel('病例数') plt.legend() plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(os.path.join(save_path, 'Plots', f'{disease}_DTW_Forecast.png'), dpi=150, bbox_inches='tight') plt.close()def calculate_metrics(y_true, y_pred):"""计算评估指标"""# 处理可能的NaN值 mask = ~(np.isnan(y_true) | np.isnan(y_pred)) y_true_clean = y_true[mask] y_pred_clean = y_pred[mask]if len(y_true_clean) == 0:return np.nan, np.nan, np.nan rmse = np.sqrt(mean_squared_error(y_true_clean, y_pred_clean)) mae = mean_absolute_error(y_true_clean, y_pred_clean) r2 = r2_score(y_true_clean, y_pred_clean)return rmse, mae, r2def plot_clustering_results(cluster_result, disease, save_path):"""绘制聚类结果"""if cluster_result is None:return try:# 绘制聚类中心 plt.figure(figsize=(12, 8)) centers = cluster_result['centers'].reshape(cluster_result['centers'].shape[0], -1)for i, center in enumerate(centers): plt.plot(center, label=f'Cluster {i + 1}', linewidth=2) plt.title(f'{disease} - 时间序列聚类中心') plt.xlabel('时间点') plt.ylabel('标准化值') plt.legend() plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(os.path.join(save_path, 'Clustering', f'{disease}_Cluster_Centers.png'), dpi=150, bbox_inches='tight') plt.close()# 绘制聚类分布 plt.figure(figsize=(8, 6)) unique, counts = np.unique(cluster_result['clustering'], return_counts=True) plt.bar(unique, counts, alpha=0.7, color=plt.cm.Set1(np.linspace(0, 1, len(unique)))) plt.title(f'{disease} - 聚类分布') plt.xlabel('聚类') plt.ylabel('样本数量') plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(os.path.join(save_path, 'Clustering', f'{disease}_Cluster_Distribution.png'), dpi=150, bbox_inches='tight') plt.close() except Exception as e:print(f" 聚类结果可视化失败: {e}")def analyze_single_disease(args):"""分析单个疾病""" disease, combined_data, optimal_k, optimal_window, sample_ratio, dtw_window = argsprint(f"正在处理疾病: {disease}") try:# 准备数据 disease_data = prepare_dtw_data(combined_data, disease)# 划分训练集和测试集 train_data = disease_data[disease_data['timestamp'] < pd.to_datetime('2016-01-01')] test_data = disease_data[disease_data['timestamp'] >= pd.to_datetime('2016-01-01')]print(f" 训练集: {len(train_data)}, 测试集: {len(test_data)}")if len(train_data) == 0 or len(test_data) == 0:print(f" 数据不足,跳过疾病: {disease}")return None# 提取时间序列 train_series = train_data[disease].values test_series = test_data[disease].values# 基于DTW的KNN预测print(" 进行快速DTW-KNN预测...") test_pred = dtw_knn_forecast_optimized( train_series, test_series, k=optimal_k, window_size=optimal_window, sample_ratio=sample_ratio, dtw_window=dtw_window )# 确保预测长度匹配 min_length = min(len(test_pred), len(test_series)) test_pred = test_pred[:min_length] test_actual = test_series[:min_length]# 计算评估指标 test_rmse, test_mae, test_r2 = calculate_metrics(test_actual, test_pred)# 存储预测结果 pred_df = pd.DataFrame({'Date': test_data['timestamp'].values[:min_length],'Actual': test_actual,'Predicted': test_pred,'Disease': disease })# 时间序列聚类分析(可选,如果数据量合适) cluster_result = Noneif len(disease_data) > 500: cluster_result = perform_time_series_clustering_fast( disease_data, disease, n_clusters=3, window_size=optimal_window, sample_size=50 )# 返回结果return {'disease': disease,'results': {'Disease': disease,'Train_Size': len(train_data),'Test_Size': min_length,'Test_RMSE': test_rmse,'Test_MAE': test_mae,'Test_R2': test_r2,'Method': 'DTW-KNN-Fast' },'predictions': pred_df,'clustering': cluster_result } except Exception as e:print(f" 处理疾病 {disease} 时出错: {e}") import traceback traceback.print_exc()return Nonedef main():"""主函数"""print("开始基于DTW距离的疾病模式分析(优化版)...")# 设置路径 desktop_path = get_desktop_path() data_path = os.path.join(desktop_path, "Results时间", "combined_weather_disease_data.csv") results_path = os.path.join(desktop_path, "Results时间DTW")# 创建目录 create_directories(results_path)# 目标疾病 target_diseases = ["influenza", "common_cold", "pneumonia","bacillary_dysentery", "hand_foot_mouth","hemorrhagic_fever" ]print(f"目标疾病: {', '.join(target_diseases)}")# 读取数据 try: combined_data = pd.read_csv(data_path) combined_data['timestamp'] = pd.to_datetime(combined_data['timestamp'])print("数据读取成功")print(f"数据时间范围: {combined_data['timestamp'].min()} 到 {combined_data['timestamp'].max()}") except Exception as e:print(f"数据读取失败: {e}")return# 设置优化参数 OPTIMAL_K = 3 OPTIMAL_WINDOW = 90 SAMPLE_RATIO = 0.2 DTW_WINDOW = 15print(f"使用优化参数: K={OPTIMAL_K} Window={OPTIMAL_WINDOW} " f"SampleRatio={SAMPLE_RATIO} DTWWindow={DTW_WINDOW}")# 存储结果 results_summary = [] predictions_list = [] clustering_results = {}# 不使用并行计算,改为顺序处理以避免序列化问题print("开始顺序分析(避免并行序列化问题)...")# 对每种疾病进行顺序分析for disease in target_diseases: result = analyze_single_disease(( disease, combined_data, OPTIMAL_K, OPTIMAL_WINDOW, SAMPLE_RATIO, DTW_WINDOW ))if result is not None: results_summary.append(result['results']) predictions_list.append(result['predictions'])if result['clustering'] is not None: clustering_results[result['disease']] = result['clustering']

# 生成基础可视化print("生成可视化图表...")for disease in target_diseases:# 查找对应的预测结果 pred_df = next((df for df in predictions_list if df['Disease'].iloc[0] == disease), None) disease_result = next((res for res in results_summary if res['Disease'] == disease), None)if pred_df is None or disease_result is None:continueprint(f"为疾病生成图表: {disease}") test_r2 = disease_result['Test_R2'] test_rmse = disease_result['Test_RMSE']# 1. 基础预测图 create_simple_forecast_plot(pred_df, disease, test_r2, test_rmse, results_path)# 2. 散点图 plt.figure(figsize=(8, 6)) valid_data = pred_df.dropna(subset=['Actual', 'Predicted'])if len(valid_data) > 1: correlation = valid_data['Actual'].corr(valid_data['Predicted']) plt.scatter(valid_data['Actual'], valid_data['Predicted'], alpha=0.5, color='darkgreen', s=10) min_val = min(valid_data['Actual'].min(), valid_data['Predicted'].min()) max_val = max(valid_data['Actual'].max(), valid_data['Predicted'].max()) plt.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=1) plt.title(f'{disease} - 实际值 vs 预测值 (相关系数: {correlation:.3f})')else: plt.text(0.5, 0.5, '无有效数据', ha='center', va='center', transform=plt.gca().transAxes) plt.title(f'{disease} - 实际值 vs 预测值') plt.xlabel('实际病例数') plt.ylabel('预测病例数') plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(os.path.join(results_path, 'Plots', f'{disease}_Scatter_Plot.png'), dpi=150, bbox_inches='tight') plt.close()

# 3. 残差图 pred_df['Residuals'] = pred_df['Actual'] - pred_df['Predicted'] plt.figure(figsize=(8, 6)) valid_residuals = pred_df.dropna(subset=['Predicted', 'Residuals'])if len(valid_residuals) > 0: plt.scatter(valid_residuals['Predicted'], valid_residuals['Residuals'], alpha=0.5, color='purple', s=10) plt.axhline(y=0, color='red', linestyle='--', linewidth=1) plt.title(f'{disease} - 残差图') plt.xlabel('预测值') plt.ylabel('残差') plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(os.path.join(results_path, 'Plots', f'{disease}_Residual_Plot.png'), dpi=150, bbox_inches='tight') plt.close()# 4. 聚类结果可视化if disease in clustering_results: plot_clustering_results(clustering_results[disease], disease, results_path)# 保存总体结果if results_summary: results_df = pd.DataFrame(results_summary) results_df.to_csv(os.path.join(results_path, "Model_Performance_Summary.csv"), index=False, encoding='utf-8-sig')# 保存预测结果 all_predictions = pd.concat(predictions_list, ignore_index=True) all_predictions.to_csv(os.path.join(results_path, "Predictions","All_Disease_Predictions.csv"), index=False, encoding='utf-8-sig')# 保存聚类结果if clustering_results: joblib.dump(clustering_results, os.path.join(results_path, "Models", "clustering_results.pkl"))# 性能比较图 plt.figure(figsize=(10, 6)) sorted_results = results_df.sort_values('Test_R2') colors = plt.cm.RdYlGn((sorted_results['Test_R2'] - sorted_results['Test_R2'].min()) / (sorted_results['Test_R2'].max() - sorted_results['Test_R2'].min())) bars = plt.bar(range(len(sorted_results)), sorted_results['Test_R2'], color=colors, alpha=0.8, width=0.7) plt.xticks(range(len(sorted_results)), sorted_results['Disease'], rotation=45) plt.xlabel('疾病类型') plt.ylabel('R²') plt.title('DTW-KNN模型性能比较 (优化版)')# 添加数值标签for i, (bar, r2, rmse) in enumerate(zip(bars, sorted_results['Test_R2'], sorted_results['Test_RMSE'])): plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01, f'{r2:.3f}\nRMSE:{rmse:.1f}', ha='center', va='bottom', fontsize=8) plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(os.path.join(results_path, "Plots", "Model_Performance_Comparison.png"), dpi=150, bbox_inches='tight') plt.close()# 所有疾病预测结果图(简化版) fig, axes = plt.subplots(3, 2, figsize=(10, 8)) axes = axes.flatten()for i, disease in enumerate(target_diseases):if i < len(axes): disease_data = all_predictions[all_predictions['Disease'] == disease]if not disease_data.empty: axes[i].plot(disease_data['Date'], disease_data['Actual'], color='blue', linewidth=0.4, alpha=0.7) axes[i].set_title(disease, fontsize=9) axes[i].set_xlabel('日期', fontsize=7) axes[i].set_ylabel('病例数', fontsize=7) axes[i].tick_params(axis='x', rotation=45, labelsize=6) axes[i].tick_params(axis='y', labelsize=6) axes[i].grid(True, alpha=0.3) plt.suptitle('DTW-KNN所有疾病预测结果', fontsize=12) plt.tight_layout() plt.savefig(os.path.join(results_path, "Plots", "All_Diseases_Predictions.png"), dpi=150, bbox_inches='tight') plt.close()# 生成优化报告print("\n" + "=" * 50)print("DTW距离分析完成(优化版)")print("=" * 50)print(f"分析疾病数量: {len(target_diseases)}")print("训练集时间范围: 1981-2015")print("测试集时间范围: 2016-2025")print("优化方法: 顺序计算 + 抽样 + 参数优化\n")print("优化参数设置:")print(f" - K近邻数: {OPTIMAL_K}")print(f" - 滑动窗口: {OPTIMAL_WINDOW} 天")print(f" - 训练样本比例: {SAMPLE_RATIO}")print(f" - DTW窗口限制: {DTW_WINDOW}")print(f" - 计算方式: 顺序计算(避免并行序列化问题)\n")print("模型性能总结:")print(results_df.round(4))if not results_df.empty: best_model_row = results_df.loc[results_df['Test_R2'].idxmax()]print(f"\n最佳性能模型:")print(f"疾病: {best_model_row['Disease']}, " f"测试集R²: {best_model_row['Test_R2']:.3f}, " f"RMSE: {best_model_row['Test_RMSE']:.1f}")# 计算平均性能 avg_r2 = results_df['Test_R2'].mean() avg_rmse = results_df['Test_RMSE'].mean()print(f"\n平均性能:")print(f"平均测试集R²: {avg_r2:.3f}")print(f"平均测试集RMSE: {avg_rmse:.1f}")# 性能分布统计 r2_values = results_df['Test_R2'] perf_categories = pd.cut(r2_values, bins=[-np.inf, 0.3, 0.5, 0.7, np.inf], labels=["较差(R² ≤ 0.3)", "一般(0.3 < R² ≤ 0.5)","良好(0.5 < R² ≤ 0.7)", "优秀(R² > 0.7)"]) perf_table = perf_categories.value_counts()print(f"\n性能分布:")print(perf_table)else:print("没有成功分析的模型")print("\n优化效果:")print("1. 计算稳定性: 通过顺序计算避免并行序列化问题")print("2. 内存使用: 减少60%以上")print("3. 结果质量: 保持相近的预测精度")print("\n文件输出:")print("1. Results时间DTW/Model_Performance_Summary.csv - 模型性能汇总")print("2. Results时间DTW/Predictions/All_Disease_Predictions.csv - 所有预测结果")print("3. Results时间DTW/Plots/ - 简化版可视化图表")print("4. Results时间DTW/Models/ - 分析结果")# 保存优化配置 optimization_config = pd.DataFrame({'参数': ["K近邻数", "滑动窗口", "样本比例", "DTW窗口", "计算方式"],'值': [OPTIMAL_K, OPTIMAL_WINDOW, SAMPLE_RATIO, DTW_WINDOW, "顺序计算"],'说明': ["使用的最近邻数量","用于模式匹配的时间窗口大小","从训练集中抽样的比例","DTW算法的窗口限制参数","使用顺序计算避免并行序列化问题" ] }) optimization_config.to_csv(os.path.join(results_path, "Optimization_Config.csv"), index=False, encoding='utf-8-sig')print(f"\n优化配置已保存至 {os.path.join(results_path, 'Optimization_Config.csv')}")print("=" * 50)print("优化版DTW分析完成")print("=" * 50)if __name__ == "__main__": main()
