Option Explicit' 定义常量Const DEFAULT_PATH As String = "C:\生产日报\"Const HEADER_ROWS As Long = 1Const MAX_COLUMNS As Long = 100' 自定义数据类型,用于存储文件信息Type FileInfo FullPath As String FileName As String FileSize As Long LastModified As Date Processed As Boolean ErrorMessage As StringEnd Type' 主合并函数Sub MergeWorkbooksAdvanced() Dim startTime As Double startTime = Timer Dim fileList() As FileInfo Dim fileCount As Long Dim destWs As Worksheet Dim result As Boolean ' 获取目标工作表 Set destWs = GetOrCreateWorksheet("汇总") ' 让用户选择文件夹 Dim sourcePath As String sourcePath = BrowseForFolder("请选择包含日报文件的文件夹") If sourcePath = "" Then MsgBox "未选择文件夹,操作已取消。", vbExclamation Exit Sub End If ' 获取文件列表 fileList = GetExcelFiles(sourcePath, fileCount) If fileCount = 0 Then MsgBox "在所选文件夹中未找到Excel文件。", vbExclamation Exit Sub End If ' 显示文件列表对话框 If Not ShowFileListDialog(fileList, fileCount) Then MsgBox "操作已取消。", vbInformation Exit Sub End If ' 清空目标表(保留表头) ClearDestinationSheet destWs ' 处理文件 Dim processedCount As Long, successCount As Long, errorCount As Long Dim totalRows As Long ProcessFiles fileList, fileCount, destWs, processedCount, successCount, errorCount, totalRows ' 添加汇总信息 AddSummaryInfo destWs, fileCount, successCount, errorCount, totalRows ' 格式化和美化 FormatDestinationSheet destWs ' 生成报告 Dim endTime As Double endTime = Timer ShowCompletionReport fileCount, successCount, errorCount, totalRows, endTime - startTime, fileList ' 保存结果 SaveResult destWsEnd Sub' 获取或创建工作表Function GetOrCreateWorksheet(sheetName As String) As Worksheet On Error Resume Next Set GetOrCreateWorksheet = ThisWorkbook.Worksheets(sheetName) If GetOrCreateWorksheet Is Nothing Then Set GetOrCreateWorksheet = ThisWorkbook.Worksheets.Add GetOrCreateWorksheet.Name = sheetName End IfEnd Function' 让用户选择文件夹Function BrowseForFolder(Optional title As String = "请选择文件夹") As String Dim folderDialog As FileDialog Set folderDialog = Application.FileDialog(msoFileDialogFolderPicker) With folderDialog .title = title .AllowMultiSelect = False If .Show = -1 Then BrowseForFolder = .SelectedItems(1) Else BrowseForFolder = "" End If End WithEnd Function' 获取Excel文件列表Function GetExcelFiles(folderPath As String, ByRef fileCount As Long) As FileInfo() Dim fileList() As FileInfo Dim fileName As String Dim i As Long ' 初始化数组 ReDim fileList(1 To 1000) ' 预设容量,最多1000个文件 fileCount = 0 ' 确保路径以反斜杠结尾 If Right(folderPath, 1) <> "\" Then folderPath = folderPath & "\" End If ' 获取所有Excel文件 fileName = Dir(folderPath & "*.xls*") Do While fileName <> "" fileCount = fileCount + 1 ' 如果数组不够大,则扩大数组 If fileCount > UBound(fileList) Then ReDim Preserve fileList(1 To UBound(fileList) + 100) End If ' 存储文件信息 With fileList(fileCount) .FullPath = folderPath & fileName .FileName = fileName .FileSize = FileLen(.FullPath) .LastModified = FileDateTime(.FullPath) .Processed = False .ErrorMessage = "" End With fileName = Dir() Loop ' 调整数组到实际大小 ReDim Preserve fileList(1 To fileCount) GetExcelFiles = fileListEnd Function' 显示文件列表对话框Function ShowFileListDialog(fileList() As FileInfo, fileCount As Long) As Boolean ' 这里简化实现,实际应用中可以使用UserForm Dim i As Long Dim fileNames As String For i = 1 To fileCount fileNames = fileNames & i & ". " & fileList(i).FileName & vbCrLf Next i Dim response As VbMsgBoxResult response = MsgBox("找到 " & fileCount & " 个文件:" & vbCrLf & vbCrLf & _ fileNames & vbCrLf & "是否继续处理?", _ vbQuestion + vbYesNo, "确认文件列表") ShowFileListDialog = (response = vbYes)End Function' 清空目标表Sub ClearDestinationSheet(ws As Worksheet) Application.DisplayAlerts = False ' 删除除当前工作表外的所有工作表 Dim sht As Worksheet For Each sht In ThisWorkbook.Worksheets If sht.Name <> ws.Name Then sht.Delete End If Next sht Application.DisplayAlerts = True ' 清空数据,但保留表头 If ws.UsedRange.Rows.Count > 1 Then ws.Range("A2:Z" & ws.UsedRange.Rows.Count).ClearContents End If ' 设置基本表头 ws.Cells(1, 1) = "文件来源" ws.Cells(1, 2) = "数据行号" ws.Cells(1, 3) = "日期" ws.Cells(1, 4) = "班组" ws.Cells(1, 5) = "产品型号" ws.Cells(1, 6) = "计划产量" ws.Cells(1, 7) = "实际产量" ws.Cells(1, 8) = "合格数" ws.Cells(1, 9) = "不合格数" ws.Cells(1, 10) = "不合格原因" ws.Cells(1, 11) = "工时" ws.Cells(1, 12) = "效率" ws.Cells(1, 13) = "备注"End Sub' 处理文件Sub ProcessFiles(fileList() As FileInfo, fileCount As Long, _ destWs As Worksheet, _ ByRef processedCount As Long, _ ByRef successCount As Long, _ ByRef errorCount As Long, _ ByRef totalRows As Long) Dim i As Long, destRow As Long destRow = 2 ' 从第2行开始 Application.ScreenUpdating = False Application.Calculation = xlCalculationManual Application.DisplayAlerts = False ' 创建进度条 CreateProgressBar "正在合并文件...", fileCount For i = 1 To fileCount UpdateProgressBar i, fileCount, "正在处理: " & fileList(i).FileName Dim result As Boolean Dim rowsAdded As Long Dim errorMsg As String ' 处理单个文件 result = ProcessSingleFile(fileList(i), destWs, destRow, rowsAdded, errorMsg) fileList(i).Processed = result fileList(i).ErrorMessage = errorMsg If result Then successCount = successCount + 1 totalRows = totalRows + rowsAdded destRow = destRow + rowsAdded Else errorCount = errorCount + 1 End If processedCount = processedCount + 1 Next i ' 关闭进度条 CloseProgressBar Application.ScreenUpdating = True Application.Calculation = xlCalculationAutomatic Application.DisplayAlerts = TrueEnd Sub' 处理单个文件Function ProcessSingleFile(fileInfo As FileInfo, _ destWs As Worksheet, _ destStartRow As Long, _ ByRef rowsAdded As Long, _ ByRef errorMsg As String) As Boolean On Error GoTo ErrorHandler Dim srcWb As Workbook Dim srcWs As Worksheet Dim srcLastRow As Long, srcLastCol As Long Dim destLastRow As Long Dim i As Long, j As Long ' 打开源工作簿 Set srcWb = Workbooks.Open(fileInfo.FullPath, ReadOnly:=True) ' 查找数据所在的工作表 Set srcWs = FindDataWorksheet(srcWb) If srcWs Is Nothing Then errorMsg = "未找到包含数据的工作表" srcWb.Close SaveChanges:=False ProcessSingleFile = False Exit Function End If ' 查找数据的最后一行和最后一列 srcLastRow = FindLastRow(srcWs) srcLastCol = FindLastColumn(srcWs) If srcLastRow <= HEADER_ROWS Then errorMsg = "工作表没有数据行" srcWb.Close SaveChanges:=False ProcessSingleFile = False Exit Function End If ' 确定要复制的列范围 Dim colMap As Collection Set colMap = CreateColumnMapping(srcWs, destWs) ' 复制数据 rowsAdded = 0 Dim srcRow As Long For srcRow = HEADER_ROWS + 1 To srcLastRow Dim isEmptyRow As Boolean isEmptyRow = True ' 检查是否为空行 For j = 1 To srcLastCol If Len(Trim(CStr(srcWs.Cells(srcRow, j).Value))) > 0 Then isEmptyRow = False Exit For End If Next j If Not isEmptyRow Then ' 复制文件来源信息 destWs.Cells(destStartRow + rowsAdded, 1) = fileInfo.FileName ' 复制数据 Dim destCol As Long For destCol = 2 To destWs.UsedRange.Columns.Count Dim srcCol As Variant srcCol = GetSourceColumn(colMap, destCol, srcWs) If Not IsEmpty(srcCol) Then Dim cellValue As Variant cellValue = srcWs.Cells(srcRow, srcCol).Value ' 数据清洗和转换 cellValue = CleanData(cellValue, destCol) destWs.Cells(destStartRow + rowsAdded, destCol) = cellValue End If Next destCol rowsAdded = rowsAdded + 1 End If Next srcRow ' 关闭源工作簿 srcWb.Close SaveChanges:=False ProcessSingleFile = True Exit FunctionErrorHandler: errorMsg = "错误 " & Err.Number & ": " & Err.Description ProcessSingleFile = FalseEnd Function' 查找数据工作表Function FindDataWorksheet(wb As Workbook) As Worksheet Dim ws As Worksheet Dim maxRowCount As Long Dim targetWs As Worksheet Set targetWs = Nothing maxRowCount = 0 ' 查找行数最多的工作表 For Each ws In wb.Worksheets Dim lastRow As Long lastRow = ws.Cells(ws.Rows.Count, 1).End(xlUp).Row If lastRow > maxRowCount And lastRow > 1 Then maxRowCount = lastRow Set targetWs = ws End If Next ws ' 如果没有找到,使用第一个工作表 If targetWs Is Nothing And wb.Worksheets.Count > 0 Then Set targetWs = wb.Worksheets(1) End If Set FindDataWorksheet = targetWsEnd Function' 查找最后一行Function FindLastRow(ws As Worksheet) As Long FindLastRow = ws.Cells(ws.Rows.Count, 1).End(xlUp).RowEnd Function' 查找最后一列Function FindLastColumn(ws As Worksheet) As Long FindLastColumn = ws.Cells(1, ws.Columns.Count).End(xlToLeft).ColumnEnd Function' 创建列映射Function CreateColumnMapping(srcWs As Worksheet, destWs As Worksheet) As Collection Dim colMap As Collection Set colMap = New Collection Dim destCol As Long For destCol = 2 To destWs.UsedRange.Columns.Count Dim destHeader As String destHeader = CStr(destWs.Cells(1, destCol).Value) If Len(Trim(destHeader)) > 0 Then Dim srcCol As Long srcCol = FindColumnByHeader(srcWs, destHeader) If srcCol > 0 Then colMap.Add srcCol, CStr(destCol) Else colMap.Add Empty, CStr(destCol) End If Else colMap.Add Empty, CStr(destCol) End If Next destCol Set CreateColumnMapping = colMapEnd Function' 通过表头查找列Function FindColumnByHeader(ws As Worksheet, headerText As String) As Long Dim lastCol As Long lastCol = ws.Cells(1, ws.Columns.Count).End(xlToLeft).Column Dim col As Long For col = 1 To lastCol If LCase(Trim(CStr(ws.Cells(1, col).Value))) = LCase(Trim(headerText)) Then FindColumnByHeader = col Exit Function End If Next col ' 尝试在更多行中查找表头 Dim row As Long For row = 1 To 5 For col = 1 To lastCol If LCase(Trim(CStr(ws.Cells(row, col).Value))) = LCase(Trim(headerText)) Then FindColumnByHeader = col Exit Function End If Next col Next row FindColumnByHeader = 0End Function' 获取源列Function GetSourceColumn(colMap As Collection, destCol As Long, srcWs As Worksheet) As Variant On Error Resume Next Dim srcCol As Variant srcCol = colMap(CStr(destCol)) If Err.Number <> 0 Then GetSourceColumn = Empty Else GetSourceColumn = srcCol End IfEnd Function' 数据清洗Function CleanData(value As Variant, destCol As Long) As Variant If IsEmpty(value) Or IsNull(value) Then CleanData = "" Exit Function End If Dim strValue As String strValue = CStr(value) ' 去除前后空格 strValue = Trim(strValue) ' 处理特殊字符 strValue = Replace(strValue, Chr(160), " ") ' 替换不间断空格 strValue = Replace(strValue, vbTab, " ") ' 替换制表符 strValue = Replace(strValue, vbCr, "") ' 替换回车符 strValue = Replace(strValue, vbLf, "") ' 替换换行符 ' 根据列类型进行特定清洗 Select Case destCol Case 3: ' 日期列 If IsDate(strValue) Then CleanData = CDate(strValue) Else CleanData = strValue End If Case 6, 7, 8, 9, 11: ' 数值列 If IsNumeric(strValue) Then CleanData = CDbl(strValue) Else ' 尝试提取数字 Dim numStr As String numStr = ExtractNumber(strValue) If IsNumeric(numStr) Then CleanData = CDbl(numStr) Else CleanData = 0 End If End If Case Else CleanData = strValue End SelectEnd Function' 提取数字Function ExtractNumber(text As String) As String Dim i As Long Dim result As String result = "" For i = 1 To Len(text) Dim ch As String ch = Mid(text, i, 1) If ch >= "0" And ch <= "9" Or ch = "." Or ch = "-" Then result = result & ch End If Next i ExtractNumber = resultEnd Function' 创建进度条Sub CreateProgressBar(title As String, maxValue As Long) ' 简化实现,实际应用中可以使用UserForm Application.StatusBar = title & ": 0/" & maxValueEnd Sub' 更新进度条Sub UpdateProgressBar(current As Long, maxValue As Long, message As String) Dim percent As Long percent = current / maxValue * 100 Application.StatusBar = message & " [" & String(percent \ 5, "█") & _ String(20 - (percent \ 5), "░") & "] " & _ percent & "% (" & current & "/" & maxValue & ")" DoEventsEnd Sub' 关闭进度条Sub CloseProgressBar() Application.StatusBar = FalseEnd Sub' 添加汇总信息Sub AddSummaryInfo(ws As Worksheet, fileCount As Long, _ successCount As Long, errorCount As Long, _ totalRows As Long) Dim lastRow As Long lastRow = ws.Cells(ws.Rows.Count, 1).End(xlUp).Row ' 在数据下方添加汇总行 Dim summaryRow As Long summaryRow = lastRow + 2 ws.Cells(summaryRow, 1) = "汇总信息" ws.Cells(summaryRow, 1).Font.Bold = True ws.Cells(summaryRow + 1, 1) = "总文件数" ws.Cells(summaryRow + 1, 2) = fileCount ws.Cells(summaryRow + 2, 1) = "成功合并" ws.Cells(summaryRow + 2, 2) = successCount ws.Cells(summaryRow + 3, 1) = "失败文件" ws.Cells(summaryRow + 3, 2) = errorCount ws.Cells(summaryRow + 4, 1) = "总数据行数" ws.Cells(summaryRow + 4, 2) = totalRows ' 添加时间戳 ws.Cells(summaryRow + 5, 1) = "合并时间" ws.Cells(summaryRow + 5, 2) = Now()End Sub' 格式化目标表Sub FormatDestinationSheet(ws As Worksheet) Dim lastRow As Long, lastCol As Long lastRow = ws.Cells(ws.Rows.Count, 1).End(xlUp).Row lastCol = ws.Cells(1, ws.Columns.Count).End(xlToLeft).Column ' 自动调整列宽 ws.Columns.AutoFit ' 设置表头样式 With ws.Range(ws.Cells(1, 1), ws.Cells(1, lastCol)) .Font.Bold = True .Interior.Color = RGB(91, 155, 213) ' 蓝色 .Font.Color = RGB(255, 255, 255) ' 白色 .HorizontalAlignment = xlCenter End With ' 设置交替行颜色 Dim i As Long For i = 2 To lastRow If i Mod 2 = 0 Then ws.Range(ws.Cells(i, 1), ws.Cells(i, lastCol)).Interior.Color = RGB(242, 242, 242) End If Next i ' 设置边框 With ws.Range(ws.Cells(1, 1), ws.Cells(lastRow, lastCol)).Borders .LineStyle = xlContinuous .Color = RGB(191, 191, 191) .Weight = xlThin End With ' 设置数字格式 For i = 1 To lastCol Dim header As String header = LCase(Trim(CStr(ws.Cells(1, i).Value))) If InStr(header, "日期") > 0 Then ws.Columns(i).NumberFormat = "yyyy-mm-dd" ElseIf InStr(header, "产量") > 0 Or _ InStr(header, "数") > 0 Or _ InStr(header, "工时") > 0 Then ws.Columns(i).NumberFormat = "#,##0" ElseIf InStr(header, "效率") > 0 Or _ InStr(header, "率") > 0 Then ws.Columns(i).NumberFormat = "0.00%" End If Next i ' 冻结首行 ws.Activate ws.Range("A2").Select ActiveWindow.FreezePanes = True ' 添加筛选 ws.Range(ws.Cells(1, 1), ws.Cells(1, lastCol)).AutoFilterEnd Sub' 显示完成报告Sub ShowCompletionReport(fileCount As Long, successCount As Long, _ errorCount As Long, totalRows As Long, _ elapsedTime As Double, fileList() As FileInfo) Dim report As String report = "合并完成!" & vbCrLf & vbCrLf report = report & "处理统计:" & vbCrLf report = report & "总文件数: " & fileCount & vbCrLf report = report & "成功合并: " & successCount & vbCrLf report = report & "失败文件: " & errorCount & vbCrLf report = report & "总数据行: " & totalRows & vbCrLf report = report & "处理时间: " & Format(elapsedTime, "0.00") & " 秒" & vbCrLf & vbCrLf ' 显示失败文件 If errorCount > 0 Then report = report & "失败文件列表:" & vbCrLf Dim i As Long For i = 1 To fileCount If Not fileList(i).Processed Then report = report & fileList(i).FileName & " - " & _ fileList(i).ErrorMessage & vbCrLf End If Next i End If MsgBox report, vbInformation, "合并完成"End Sub' 保存结果Sub SaveResult(ws As Worksheet) Dim savePath As String savePath = ThisWorkbook.Path & "\合并结果_" & _ Format(Now(), "yyyy-mm-dd HH-MM-SS") & ".xlsx" ' 创建新工作簿保存结果 Dim newWb As Workbook Set newWb = Workbooks.Add ws.Copy Before:=newWb.Worksheets(1) newWb.Worksheets(2).Delete ' 保存 Application.DisplayAlerts = False newWb.SaveAs savePath Application.DisplayAlerts = True newWb.Close SaveChanges:=False MsgBox "结果已保存到:" & vbCrLf & savePath, vbInformation, "保存成功"End Sub
import pandas as pdimport globimport osimport refrom datetime import datetimefrom typing import List, Dict, Tuple, Optional, Anyimport warningsfrom tqdm import tqdmimport numpy as npfrom openpyxl import load_workbookfrom openpyxl.styles import PatternFill, Font, Border, Side, Alignmentwarnings.filterwarnings('ignore')class AdvancedExcelMerger: """高级Excel合并器 - 处理各种复杂情况""" def __init__(self, config: Optional[Dict] = None): """ 初始化合并器 参数: config: 配置字典 """ self.config = config or {} # 默认配置 self.default_config = { 'folder_path': None, 'output_path': None, 'sheet_name': 0, # 工作表名或索引 'header_row': 0, # 表头行 'skip_rows': 0, # 跳过的行数 'required_columns': [], # 必需的列 'column_mapping': {}, # 列名映射 'data_types': {}, # 数据类型 'date_formats': ['%Y-%m-%d', '%Y/%m/%d', '%Y年%m月%d日'], 'encoding': 'utf-8', 'engine': 'openpyxl', # 或 'xlrd' 'max_files': 1000, 'chunk_size': 10000, # 分块处理大小 } # 更新配置 self.default_config.update(self.config) self.config = self.default_config # 统计信息 self.stats = { 'total_files': 0, 'success_files': 0, 'failed_files': [], 'total_rows': 0, 'start_time': None, 'end_time': None } # 列映射缓存 self.column_cache = {} def merge_excel_files(self, folder_path: Optional[str] = None, output_file: Optional[str] = None) -> pd.DataFrame: """ 合并Excel文件 - 主函数 参数: folder_path: 文件夹路径 output_file: 输出文件路径 返回: 合并后的DataFrame """ print("=" * 60) print("Excel文件合并工具") print("=" * 60) # 设置路径 if folder_path: self.config['folder_path'] = folder_path if output_file: self.config['output_path'] = output_file if not self.config['folder_path']: raise ValueError("请提供文件夹路径") # 开始计时 self.stats['start_time'] = datetime.now() # 1. 扫描文件夹 print("\n1. 📁 扫描文件夹...") excel_files = self._scan_folder(self.config['folder_path']) if not excel_files: print("❌ 未找到Excel文件") return pd.DataFrame() print(f" 找到 {len(excel_files)} 个文件") # 2. 分块读取和合并 print("\n2. 📥 读取文件...") all_chunks = [] # 使用进度条 for i in tqdm(range(0, len(excel_files), self.config['chunk_size']), desc="处理进度"): chunk_files = excel_files[i:i + self.config['chunk_size']] chunk_data = self._process_chunk(chunk_files) if chunk_data is not None and not chunk_data.empty: all_chunks.append(chunk_data) if not all_chunks: print("❌ 没有成功读取任何数据") return pd.DataFrame() # 3. 合并所有数据块 print("\n3. 🔗 合并数据...") merged_df = pd.concat(all_chunks, ignore_index=True, sort=False) # 4. 数据清洗和转换 print("\n4. 🔧 数据清洗...") merged_df = self._clean_data(merged_df) # 5. 数据验证 print("\n5. ✅ 数据验证...") validation_results = self._validate_data(merged_df) # 6. 保存结果 print("\n6. 💾 保存结果...") output_path = self._save_results(merged_df, validation_results) # 7. 生成报告 print("\n7. 📊 生成报告...") self._generate_report(merged_df, validation_results, output_path) # 结束计时 self.stats['end_time'] = datetime.now() processing_time = (self.stats['end_time'] - self.stats['start_time']).total_seconds() print("\n" + "=" * 60) print(f"✅ 合并完成!耗时: {processing_time:.1f}秒") print("=" * 60) return merged_df def _scan_folder(self, folder_path: str) -> List[str]: """扫描文件夹,获取所有Excel文件""" excel_files = [] # 支持的文件扩展名 extensions = ['*.xlsx', '*.xls', '*.xlsm', '*.xlsb'] for ext in extensions: pattern = os.path.join(folder_path, '**', ext) # 包括子文件夹 files = glob.glob(pattern, recursive=True) excel_files.extend(files) # 去重和排序 excel_files = list(set(excel_files)) excel_files.sort() # 限制最大文件数 if len(excel_files) > self.config['max_files']: print(f"⚠️ 文件数量超过限制,只处理前 {self.config['max_files']} 个文件") excel_files = excel_files[:self.config['max_files']] return excel_files def _process_chunk(self, file_list: List[str]) -> Optional[pd.DataFrame]: """处理一个文件块""" chunk_data = [] for file_path in file_list: try: df = self._read_single_file(file_path) if df is not None and not df.empty: chunk_data.append(df) except Exception as e: self.stats['failed_files'].append({ 'file': file_path, 'error': str(e), 'time': datetime.now() }) print(f" ❌ 读取失败: {os.path.basename(file_path)[:30]}... - {str(e)[:50]}") if not chunk_data: return None # 合并当前块的数据 try: chunk_df = pd.concat(chunk_data, ignore_index=True, sort=False) self.stats['success_files'] += len(chunk_data) self.stats['total_rows'] += len(chunk_df) return chunk_df except Exception as e: print(f"❌ 合并块数据失败: {str(e)}") return None def _read_single_file(self, file_path: str) -> Optional[pd.DataFrame]: """读取单个Excel文件""" file_name = os.path.basename(file_path) file_ext = os.path.splitext(file_path)[1].lower() try: # 根据文件扩展名选择读取方式 if file_ext in ['.xlsx', '.xlsm', '.xlsb']: # 使用openpyxl引擎 engine = 'openpyxl' elif file_ext == '.xls': # 使用xlrd引擎 engine = 'xlrd' else: raise ValueError(f"不支持的文件格式: {file_ext}") # 尝试读取文件 read_kwargs = { 'engine': engine, 'sheet_name': self.config['sheet_name'], 'header': self.config['header_row'], 'skiprows': self.config['skip_rows'], 'dtype': self.config['data_types'], 'encoding': self.config['encoding'] } # 移除None值参数 read_kwargs = {k: v for k, v in read_kwargs.items() if v is not None} df = pd.read_excel(file_path, **read_kwargs) if df.empty: print(f" ⚠️ 文件无数据: {file_name}") return None # 添加文件信息 df['_源文件'] = file_name df['_文件路径'] = file_path df['_文件大小'] = os.path.getsize(file_path) df['_最后修改时间'] = datetime.fromtimestamp(os.path.getmtime(file_path)) df['_读取时间'] = datetime.now() # 标准列名 df = self._standardize_columns(df) # 提取文件信息 df = self._extract_file_info(df, file_name) print(f" ✅ 已读取: {file_name[:30]:30} ({len(df):6} 行)") return df except Exception as e: # 尝试使用不同的参数读取 try: print(f" ⚠️ 第一次读取失败,尝试其他参数: {file_name}") # 尝试不同的读取方式 for sheet in [0, None]: # 尝试第一个工作表或所有工作表 for header in [0, None]: # 尝试第一行作为表头或无表头 try: df = pd.read_excel(file_path, sheet_name=sheet, header=header) if df is not None and not df.empty: # 添加文件信息 df['_源文件'] = file_name df['_文件路径'] = file_path df['_读取时间'] = datetime.now() print(f" ✅ 重新读取成功: {file_name}") return df except: continue raise e # 如果所有尝试都失败,抛出异常 except Exception as e2: raise Exception(f"读取失败: {str(e2)}") def _standardize_columns(self, df: pd.DataFrame) -> pd.DataFrame: """标准化列名""" # 创建副本 df_clean = df.copy() # 标准化列名 df_clean.columns = df_clean.columns.astype(str) # 去除空格和特殊字符 df_clean.columns = df_clean.columns.str.strip() df_clean.columns = df_clean.columns.str.replace(r'\s+', ' ', regex=True) # 多个空格替换为单个 df_clean.columns = df_clean.columns.str.replace(r'[^\w\s]', '', regex=True) # 删除标点符号 # 统一中文标点 column_mapping = { '日期': ['日期', '时间', 'date', 'Date', 'DATE'], '产品型号': ['产品型号', '型号', '产品', 'product', 'Product', 'MODEL'], '计划产量': ['计划产量', '计划数', '计划', 'plan', 'Plan', 'PLAN_QTY'], '实际产量': ['实际产量', '实际数', '实际', 'actual', 'Actual', 'ACTUAL_QTY'], '合格数': ['合格数', '合格', '合格品', 'pass', 'Pass', 'PASS_QTY'], '不合格数': ['不合格数', '不合格', '不良数', 'fail', 'Fail', 'FAIL_QTY'], '不合格原因': ['不合格原因', '不良原因', '原因', 'reason', 'Reason', 'FAIL_REASON'], '工时': ['工时', '时间', '时长', 'time', 'Time', 'HOURS'], '效率': ['效率', '生产率', 'efficiency', 'Efficiency', 'EFF'], '备注': ['备注', '说明', 'note', 'Note', 'REMARK'] } # 应用列名映射 for standard_name, possible_names in column_mapping.items(): for col in df_clean.columns: if any(name.lower() in col.lower() for name in possible_names): df_clean = df_clean.rename(columns={col: standard_name}) break return df_clean def _extract_file_info(self, df: pd.DataFrame, file_name: str) -> pd.DataFrame: """从文件名提取信息""" # 尝试从文件名提取信息 patterns = [ r'(\w+)[-_](\w+)[-_](\d{4})[-_]?(\d{2})[-_]?(\d{2})', # 姓名-班组-日期 r'(\w+)[-_](\d{4})[-_]?(\d{2})[-_]?(\d{2})[-_]?(\w+)', # 姓名-日期-班组 r'班组(\d+)[-_](\d{4})[-_]?(\d{2})[-_]?(\d{2})', # 班组X-日期 ] file_info = { '文件名': file_name, '班组': '未知', '日期': '未知', '操作员': '未知' } for pattern in patterns: match = re.search(pattern, file_name, re.IGNORECASE) if match: groups = match.groups() if len(groups) >= 3: # 根据匹配结果设置信息 if '组' in file_name or 'team' in file_name.lower(): file_info['班组'] = groups[0] if any(keyword in file_name for keyword in ['202', '2023', '2024']): # 尝试提取日期 date_parts = [] for part in groups: if part and part.isdigit() and len(part) in [4, 2]: date_parts.append(part) if len(date_parts) >= 3: file_info['日期'] = f"{date_parts[0]}-{date_parts[1]}-{date_parts[2]}" break # 添加文件信息到DataFrame for key, value in file_info.items(): if key not in df.columns: df[key] = value return df def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame: """数据清洗和转换""" if df.empty: return df df_clean = df.copy() # 1. 处理缺失值 print(" 处理缺失值...") df_clean = df_clean.replace(['', ' ', 'NA', 'N/A', 'null', 'NULL', 'None', 'NaN', 'nan'], pd.NA) # 2. 转换数据类型 print(" 转换数据类型...") # 定义数值列 numeric_columns = [] for col in df_clean.columns: col_lower = str(col).lower() if any(keyword in col_lower for keyword in ['产量', '数', '量', '工时', '时间', '金额', '成本', 'price', 'qty', 'amount']): numeric_columns.append(col) for col in numeric_columns: if col in df_clean.columns: # 尝试转换为数值型 df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce') # 3. 转换日期列 print(" 转换日期列...") date_columns = [] for col in df_clean.columns: col_lower = str(col).lower() if any(keyword in col_lower for keyword in ['日期', '时间', 'date', 'time']): date_columns.append(col) for col in date_columns: if col in df_clean.columns: # 尝试多种日期格式 for fmt in self.config['date_formats']: try: df_clean[col] = pd.to_datetime(df_clean[col], format=fmt, errors='coerce') # 如果有成功转换的值,跳出循环 if not df_clean[col].isna().all(): break except: continue # 4. 去除重复行 print(" 去除重复行...") before_dedup = len(df_clean) df_clean = df_clean.drop_duplicates() after_dedup = len(df_clean) duplicates_removed = before_dedup - after_dedup if duplicates_removed > 0: print(f" 移除了 {duplicates_removed} 个重复行") # 5. 处理异常值 print(" 处理异常值...") for col in numeric_columns: if col in df_clean.columns: # 识别和处理异常值(基于IQR方法) Q1 = df_clean[col].quantile(0.25) Q3 = df_clean[col].quantile(0.75) IQR = Q3 - Q1 lower_bound = Q1 - 1.5 * IQR upper_bound = Q3 + 1.5 * IQR # 将异常值替换为边界值 df_clean[col] = df_clean[col].clip(lower_bound, upper_bound) return df_clean def _validate_data(self, df: pd.DataFrame) -> Dict: """数据验证""" validation_results = { 'total_rows': len(df), 'missing_values': {}, 'data_types': {}, 'value_ranges': {}, 'business_rules': {} } if df.empty: return validation_results # 1. 检查缺失值 print(" 检查缺失值...") missing_counts = df.isnull().sum() missing_percent = (missing_counts / len(df) * 100).round(2) validation_results['missing_values'] = { 'counts': missing_counts[missing_counts > 0].to_dict(), 'percentages': missing_percent[missing_counts > 0].to_dict() } # 2. 检查数据类型 print(" 检查数据类型...") dtypes = df.dtypes.astype(str).to_dict() validation_results['data_types'] = dtypes # 3. 检查数值范围 print(" 检查数值范围...") numeric_cols = df.select_dtypes(include=[np.number]).columns for col in numeric_cols: if col in df.columns: validation_results['value_ranges'][col] = { 'min': float(df[col].min()), 'max': float(df[col].max()), 'mean': float(df[col].mean()), 'std': float(df[col].std()) } # 4. 业务规则验证 print(" 业务规则验证...") # 规则1: 实际产量不能为负数 if '实际产量' in df.columns: negative_qty = (df['实际产量'] < 0).sum() validation_results['business_rules']['negative_production'] = { 'rule': '实际产量不能为负数', 'violations': int(negative_qty) } # 规则2: 合格数 + 不合格数 = 实际产量 if all(col in df.columns for col in ['合格数', '不合格数', '实际产量']): mismatch = (df['合格数'] + df['不合格数'] != df['实际产量']).sum() validation_results['business_rules']['quantity_mismatch'] = { 'rule': '合格数 + 不合格数 = 实际产量', 'violations': int(mismatch) } # 规则3: 效率应在合理范围内 (0-200%) if '效率' in df.columns: invalid_efficiency = ((df['效率'] < 0) | (df['效率'] > 2)).sum() validation_results['business_rules']['invalid_efficiency'] = { 'rule': '效率应在0-200%之间', 'violations': int(invalid_efficiency) } return validation_results def _save_results(self, df: pd.DataFrame, validation_results: Dict) -> str: """保存结果""" if self.config['output_path']: output_path = self.config['output_path'] else: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_path = f"合并结果_{timestamp}.xlsx" print(f" 保存到: {output_path}") # 创建Excel写入器 with pd.ExcelWriter(output_path, engine='openpyxl') as writer: # 1. 保存合并后的数据 df.to_excel(writer, sheet_name='合并数据', index=False) # 2. 保存统计信息 self._save_statistics(writer, df, validation_results) # 3. 保存数据质量报告 self._save_quality_report(writer, validation_results) # 4. 保存文件清单 self._save_file_list(writer) # 应用格式 self._apply_excel_formatting(output_path, df) return output_path def _save_statistics(self, writer, df: pd.DataFrame, validation_results: Dict): """保存统计信息""" stats_data = [] # 基本统计 stats_data.append(['总文件数', self.stats['total_files']]) stats_data.append(['成功文件数', self.stats['success_files']]) stats_data.append(['失败文件数', len(self.stats['failed_files'])]) stats_data.append(['总行数', len(df)]) stats_data.append(['总列数', len(df.columns)]) # 数值列统计 numeric_cols = df.select_dtypes(include=[np.number]).columns for col in numeric_cols: stats_data.append([f'{col} - 平均值', df[col].mean()]) stats_data.append([f'{col} - 总和', df[col].sum()]) stats_data.append([f'{col} - 最小值', df[col].min()]) stats_data.append([f'{col} - 最大值', df[col].max()]) # 分组统计 if '班组' in df.columns: group_stats = df.groupby('班组').agg({ '实际产量': ['sum', 'mean', 'count'] }).round(2) # 重命名列 group_stats.columns = ['总产量', '平均产量', '记录数'] group_stats.reset_index(inplace=True) # 保存到Excel group_stats.to_excel(writer, sheet_name='班组统计', index=False) # 日期统计 date_cols = df.select_dtypes(include=['datetime64']).columns if len(date_cols) > 0: date_col = date_cols[0] df['月份'] = df[date_col].dt.to_period('M').astype(str) monthly_stats = df.groupby('月份').agg({ '实际产量': 'sum' }).round(2) monthly_stats.reset_index(inplace=True) monthly_stats.to_excel(writer, sheet_name='月度统计', index=False) # 保存基本统计 stats_df = pd.DataFrame(stats_data, columns=['指标', '值']) stats_df.to_excel(writer, sheet_name='统计信息', index=False) def _save_quality_report(self, writer, validation_results: Dict): """保存数据质量报告""" quality_data = [] # 缺失值报告 missing_counts = validation_results.get('missing_values', {}).get('counts', {}) missing_percent = validation_results.get('missing_values', {}).get('percentages', {}) for col, count in missing_counts.items(): percent = missing_percent.get(col, 0) quality_data.append([col, '缺失值', count, f"{percent}%"]) # 业务规则违反报告 business_rules = validation_results.get('business_rules', {}) for rule_name, rule_info in business_rules.items(): quality_data.append([ '业务规则', rule_info.get('rule', ''), rule_info.get('violations', 0), '' ]) if quality_data: quality_df = pd.DataFrame(quality_data, columns=['字段', '问题类型', '数量', '百分比']) quality_df.to_excel(writer, sheet_name='数据质量', index=False) def _save_file_list(self, writer): """保存文件清单""" file_data = [] # 成功文件 for i in range(min(1000, self.stats['success_files'])): # 限制数量 file_data.append(['成功', f'文件_{i+1}', '']) # 失败文件 for fail_info in self.stats['failed_files']: file_data.append([ '失败', os.path.basename(fail_info['file']), fail_info['error'][:100] # 限制错误信息长度 ]) if file_data: files_df = pd.DataFrame(file_data, columns=['状态', '文件名', '错误信息']) files_df.to_excel(writer, sheet_name='文件清单', index=False) def _apply_excel_formatting(self, file_path: str, df: pd.DataFrame): """应用Excel格式""" try: from openpyxl import load_workbook from openpyxl.styles import PatternFill, Font, Border, Side, Alignment, numbers wb = load_workbook(file_path) # 格式化每个工作表 for sheet_name in wb.sheetnames: ws = wb[sheet_name] # 设置列宽 for column in ws.columns: max_length = 0 column_letter = column[0].column_letter for cell in column: try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except: pass adjusted_width = min(max_length + 2, 50) ws.column_dimensions[column_letter].width = adjusted_width # 设置表头样式 header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid") header_font = Font(color="FFFFFF", bold=True) header_alignment = Alignment(horizontal="center", vertical="center") for cell in ws[1]: cell.fill = header_fill cell.font = header_font cell.alignment = header_alignment # 设置边框 thin_border = Border( left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin') ) for row in ws.iter_rows(min_row=1, max_row=ws.max_row, max_col=ws.max_column): for cell in row: cell.border = thin_border # 设置数字格式 if sheet_name == '合并数据': for row in ws.iter_rows(min_row=2, max_row=ws.max_row): for cell in row: if isinstance(cell.value, (int, float)): if cell.column_letter in ['F', 'G', 'H', 'I', 'K']: # 数值列 cell.number_format = '#,##0' elif cell.column_letter == 'L': # 效率列 cell.number_format = '0.00%' wb.save(file_path) print(f" 格式已应用") except Exception as e: print(f" 应用格式时出错: {str(e)}") def _generate_report(self, df: pd.DataFrame, validation_results: Dict, output_path: str): """生成报告""" print("\n" + "=" * 60) print("数据合并报告") print("=" * 60) print(f"\n📊 汇总统计:") print(f" 总文件数: {self.stats['total_files']}") print(f" 成功合并: {self.stats['success_files']}") print(f" 处理失败: {len(self.stats['failed_files'])}") print(f" 总数据行: {len(df):,}") print(f" 总数据列: {len(df.columns)}") processing_time = (self.stats['end_time'] - self.stats['start_time']).total_seconds() print(f" 处理时间: {processing_time:.1f}秒") print(f" 处理速度: {len(df)/processing_time:.0f} 行/秒") print(f"\n✅ 数据质量:") # 缺失值统计 missing_counts = validation_results.get('missing_values', {}).get('counts', {}) if missing_counts: print(f" 缺失值统计:") for col, count in list(missing_counts.items())[:5]: # 显示前5个 percent = validation_results['missing_values']['percentages'][col] print(f" {col}: {count} ({percent}%)") else: print(f" ✅ 无缺失值") # 业务规则违反 business_rules = validation_results.get('business_rules', {}) if business_rules: print(f"\n⚠️ 业务规则检查:") for rule_name, rule_info in business_rules.items(): print(f" {rule_info['rule']}: {rule_info['violations']} 处违规") else: print(f"\n✅ 所有业务规则检查通过") # 文件清单 if self.stats['failed_files']: print(f"\n❌ 失败文件列表 (前10个):") for i, fail_info in enumerate(self.stats['failed_files'][:10]): print(f" {i+1}. {os.path.basename(fail_info['file'])[:30]}...") print(f" 错误: {fail_info['error'][:50]}...") print(f"\n💾 输出文件: {output_path}") print(f" 包含以下工作表:") print(f" - 合并数据: 所有合并后的数据") print(f" - 统计信息: 基本统计指标") if '班组' in df.columns: print(f" - 班组统计: 按班组分组统计") if len(df.select_dtypes(include=['datetime64']).columns) > 0: print(f" - 月度统计: 按月统计趋势") if business_rules or missing_counts: print(f" - 数据质量: 数据质量检查结果") print(f" - 文件清单: 处理的文件列表") print(f"\n🎯 分析建议:") # 基于数据的建议 if '效率' in df.columns: avg_efficiency = df['效率'].mean() if avg_efficiency < 0.8: print(f" ⚠️ 平均效率偏低: {avg_efficiency:.1%},建议分析原因") else: print(f" ✅ 平均效率良好: {avg_efficiency:.1%}") if '不合格数' in df.columns and '实际产量' in df.columns: defect_rate = df['不合格数'].sum() / df['实际产量'].sum() if defect_rate > 0.05: print(f" ⚠️ 不合格率偏高: {defect_rate:.1%},建议质量检查") else: print(f" ✅ 不合格率正常: {defect_rate:.1%}") if '班组' in df.columns: group_counts = df['班组'].nunique() print(f" 📈 共 {group_counts} 个班组的数据") # 找出产量最高和最低的班组 if '实际产量' in df.columns: group_production = df.groupby('班组')['实际产量'].sum() max_group = group_production.idxmax() min_group = group_production.idxmin() print(f" 🥇 产量最高: {max_group} ({group_production[max_group]:,.0f})") print(f" 🥇 产量最低: {min_group} ({group_production[min_group]:,.0f})") print(f"\n⏰ 报告生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("=" * 60)