"""Batch invoice OCR: read invoices from a desktop folder and summarise them in Excel.

Each PDF page or image found in ``INVOICE_FOLDER`` is converted to a
base64-encoded JPEG and sent to the Ollama ``/api/generate`` endpoint with
the model named by ``MODEL_NAME``. The returned text is parsed with regular
expressions, fields that could not be parsed are requested again one by one,
and the cleaned records are appended to ``OUTPUT_EXCEL``.
"""
import base64
import io
import json
import os
import re
import time
from datetime import datetime
from pathlib import Path

import fitz
import pandas as pd
import requests
from openpyxl import load_workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from PIL import Image

# ==================== Configuration ====================
OLLAMA_API_URL = "http://localhost:11434/api/generate"
MODEL_NAME = "glm-ocr"
DESKTOP_PATH = Path.home() / "Desktop"
OUTPUT_EXCEL = DESKTOP_PATH / "发票汇总表.xlsx"
INVOICE_FOLDER = DESKTOP_PATH / "发票"
PDF_DPI = 200
MAX_IMAGE_SIZE = 1600
REQUEST_TIMEOUT = 180

# File suffixes (compared lower-cased) that are treated as invoices.
SUPPORTED_SUFFIXES = frozenset({'.pdf', '.jpg', '.jpeg', '.png', '.bmp'})

# ==================== Excel header ====================
# Internal field name -> column title shown in the workbook.
COLUMN_NAMES_MAP = {
    'invoice_number': '发票号码',
    'is_duplicate': '是否重复',
    'invoice_date': '开票日期',
    'invoice_type': '发票类型',
    'total_amount': '价税合计(元)',
    'tax_amount': '税额(元)',
    'amount_without_tax': '不含税金额(元)',
    'seller_name': '销售方名称',
    'seller_tax_id': '销售方税号',
    'buyer_name': '购买方名称',
    'buyer_tax_id': '购买方税号',
    'recognition_time': '识别时间',
    'source_file': '源文件',
    'page_number': '页码',
}

# Left-to-right order of the columns in the workbook.
EXCEL_COLUMNS_ORDER = [
    'is_duplicate',
    'invoice_number',
    'invoice_date',
    'invoice_type',
    'total_amount',
    'tax_amount',
    'amount_without_tax',
    'seller_name',
    'seller_tax_id',
    'buyer_name',
    'buyer_tax_id',
    'recognition_time',
    'source_file',
    'page_number',
]


# ==================== Image handling ====================
def _encode_jpeg(img):
    """Downscale *img* to at most MAX_IMAGE_SIZE on its long side and return base64 JPEG text."""
    longest = max(img.size)
    if longest > MAX_IMAGE_SIZE:
        ratio = MAX_IMAGE_SIZE / longest
        new_size = (max(1, int(img.width * ratio)), max(1, int(img.height * ratio)))
        img = img.resize(new_size, Image.Resampling.LANCZOS)
    buf = io.BytesIO()
    img.save(buf, format='JPEG', quality=90, optimize=True)
    return base64.b64encode(buf.getvalue()).decode('utf-8')


def pdf_to_base64(pdf_path):
    """Render every page of a PDF to a base64-encoded JPEG.

    Args:
        pdf_path: Path of the PDF file.

    Returns:
        A list with one base64 string per page, or ``[]`` if anything fails
        (the error is printed).
    """
    images = []
    pdf_doc = None
    try:
        pdf_doc = fitz.open(pdf_path)
        # PDF user space is 72 units per inch, so this zoom renders at PDF_DPI.
        zoom = PDF_DPI / 72
        mat = fitz.Matrix(zoom, zoom)
        for page_num in range(len(pdf_doc)):
            pix = pdf_doc[page_num].get_pixmap(matrix=mat, alpha=False)
            img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
            images.append(_encode_jpeg(img))
        return images
    except Exception as e:
        print(f" ❌ PDF处理失败: {e}")
        return []
    finally:
        # Compare against None: a document with zero pages is falsy but
        # still has to be closed.
        if pdf_doc is not None:
            pdf_doc.close()


def image_to_base64(image_path):
    """Load an image file and return it as a base64-encoded JPEG.

    Images with transparency are flattened onto a white background; any
    other non-RGB mode is converted to RGB so that JPEG encoding cannot fail
    on modes the encoder does not support.

    Args:
        image_path: Path of the image file.

    Returns:
        The base64 string, or ``None`` if the image could not be processed
        (the error is printed).
    """
    try:
        with Image.open(image_path) as src:
            img = src.convert('RGBA') if src.mode == 'P' else src
            if img.mode in ('RGBA', 'LA'):
                canvas = Image.new('RGB', img.size, (255, 255, 255))
                # The last band is the alpha channel; use it as the paste mask.
                canvas.paste(img, mask=img.split()[-1])
                img = canvas
            elif img.mode != 'RGB':
                img = img.convert('RGB')
            # Encode while the file is still open (pixel data is loaded lazily).
            return _encode_jpeg(img)
    except Exception as e:
        print(f" ❌ 图片处理失败: {e}")
        return None


# ==================== OCR call ====================
def call_ocr(image_base64, prompt):
    """Send one image and a prompt to the Ollama endpoint.

    Args:
        image_base64: Base64-encoded image.
        prompt: Instruction text for the model.

    Returns:
        The stripped ``response`` field of the JSON reply, or ``""`` when the
        request fails or the reply cannot be decoded (the error is printed).
    """
    payload = {
        "model": MODEL_NAME,
        "prompt": prompt,
        "images": [image_base64],
        "stream": False,
        "options": {"temperature": 0, "top_p": 0.9, "num_predict": 2048},
    }
    try:
        resp = requests.post(OLLAMA_API_URL, json=payload, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        return str(resp.json().get('response', '') or '').strip()
    except (requests.RequestException, ValueError, AttributeError) as e:
        # Deliberately not a bare ``except``: Ctrl+C must still interrupt a
        # request that may block for up to REQUEST_TIMEOUT seconds.
        print(f" ⚠️ OCR请求失败: {e}")
        return ""


# ==================== Recognition logic ====================
def extract_all_text(image_base64):
    """Ask the model for the full text of the invoice image.

    A leading label such as "识别结果:" and Markdown code fences are removed
    from the reply.

    Returns:
        The cleaned text, or ``""`` if the model returned nothing.
    """
    prompt = """请识别这张发票中的所有文字,按从上到下、从左到右顺序逐行输出。只输出原文,不要添加任何解释或JSON格式。"""
    text = call_ocr(image_base64, prompt)
    if not text:
        return ""
    # \uff1a is the full-width colon.
    text = re.sub(r'^(以下是|识别结果|文字内容|OCR结果)[:\uff1a]\s*', '', text, flags=re.IGNORECASE)
    text = re.sub(r'^```\w*\s*|```$', '', text)
    return text.strip()


def _split_total(total, candidates):
    """Find (amount_without_tax, tax) among *candidates* that add up to *total*.

    Amounts are compared in integer cents to avoid float rounding problems.
    Returns ``None`` when no such pair exists.
    """
    cents = {round(value * 100) for value in candidates}
    total_cents = round(total * 100)
    for tax in sorted(candidates):
        tax_cents = round(tax * 100)
        rest = total_cents - tax_cents
        if rest < tax_cents:
            # The tax is taken to be the smaller part; larger values cannot match.
            break
        if rest in cents:
            return rest / 100, tax
    return None


def parse_fields(text):
    """Extract invoice fields from the OCR text with regular expressions.

    Character classes accept both the ASCII and the full-width form of
    punctuation (colon, parentheses, yen sign, slash, comma).

    Args:
        text: Full OCR text of one invoice.

    Returns:
        A dict containing whichever of ``invoice_number``, ``invoice_date``,
        ``total_amount``, ``tax_amount``, ``amount_without_tax``,
        ``buyer_name``, ``buyer_tax_id``, ``seller_name`` and
        ``seller_tax_id`` could be found; ``{}`` for empty text.
    """
    if not text:
        return {}
    result = {}

    # Invoice number (20 digits): labelled first, then any stand-alone run.
    m = (re.search(r'发票号码[:\uff1a\s]*(\d{20})', text)
         or re.search(r'(?<!\d)(\d{20})(?!\d)', text))
    if m:
        result['invoice_number'] = m.group(1)

    # Issue date, normalised to dash-separated form.
    m = re.search(r'开票日期[:\uff1a\s]*(\d{4}[年/-]\d{1,2}[月/-]\d{1,2}日?)', text)
    if not m:
        m = re.search(r'(\d{4}年\d{1,2}月\d{1,2}日|\d{4}-\d{1,2}-\d{1,2})', text)
    if m:
        # "/" is accepted by the labelled pattern above, so normalise it too.
        result['invoice_date'] = re.sub(r'[年月/]', '-', m.group(1)).replace('日', '')

    # Amounts: prefer the explicit "价税合计(小写)" figure for the total.
    m = re.search(r'价税合计[(\uff08]小写[)\uff09]\s*[¥\uffe5]\s*(\d+\.?\d{0,2})', text)
    if m:
        result['total_amount'] = f"{float(m.group(1)):.2f}"

    # Collect every distinct positive two-decimal number, ascending.
    all_money = sorted({
        float(x)
        for x in re.findall(r'(?<!\d)(\d+\.\d{2})(?!\d)', text)
        if float(x) > 0
    })
    if len(all_money) >= 3:
        if 'total_amount' not in result:
            result['total_amount'] = f"{all_money[-1]:.2f}"
        # Prefer a pair that really adds up to the total; the plain
        # "second largest / smallest" guess picks up quantities or unit
        # prices such as 1.00 as the tax.
        pair = _split_total(float(result['total_amount']), all_money)
        without, tax = pair if pair else (all_money[-2], all_money[0])
        result['amount_without_tax'] = f"{without:.2f}"
        result['tax_amount'] = f"{tax:.2f}"
    elif len(all_money) == 2:
        if 'total_amount' not in result:
            result['total_amount'] = f"{all_money[-1]:.2f}"
        result['tax_amount'] = f"{all_money[0]:.2f}"
        result['amount_without_tax'] = f"{all_money[-1] - all_money[0]:.2f}"

    # Split the text into buyer and seller sections.
    if '销售方' in text:
        buyer_section, seller_section = text.split('销售方', 1)
    else:
        lines = text.split('\n')
        mid = len(lines) // 2
        buyer_section = '\n'.join(lines[:mid])
        seller_section = '\n'.join(lines[mid:])

    # Names made only of digits, whitespace and punctuation are noise.
    only_punct = r'^[\d\s.,\uff0c。:\uff1a]+$'
    tax_id_pattern = (r'(?:纳税人识别号|统一社会信用代码)[/\uff0f]?\s*[:\uff1a]\s*'
                      r'([0-9A-Za-z]{18})')

    # Buyer name: after "名称:" and before a tax-ID keyword.
    m = re.search(
        r'名称[:\uff1a]\s*([^\n]{1,40}?)\s*(?:统一社会信用代码|纳税人识别号|$)',
        buyer_section,
    )
    if m:
        name = m.group(1).strip(':\uff1a ')
        # Invoice-title phrases that are sometimes captured instead of a name.
        noise = ['电子发票', '普通发票', '增值税', '发票号码', '开票日期', '旅客运输服务', '货物或应税劳务']
        if name and name not in noise and not re.match(only_punct, name):
            result['buyer_name'] = name

    # Buyer tax ID.
    m = re.search(tax_id_pattern, buyer_section)
    if m:
        result['buyer_tax_id'] = m.group(1).upper()

    # Seller name: "名称:" inside the seller section.
    m = re.search(
        r'名称[:\uff1a]\s*([^\n]{1,40}?)\s*(?:统一社会信用代码|纳税人识别号|地址|电话|开户行|$)',
        seller_section,
    )
    if m:
        name = m.group(1).strip(':\uff1a ')
        noise = ['电子发票', '普通发票', '备注', '收款人', '复核人', '开票人']
        if (name and name not in noise and not re.match(only_punct, name)
                and len(name) >= 2):
            result['seller_name'] = name

    # Seller tax ID.
    m = re.search(tax_id_pattern, seller_section)
    if m:
        result['seller_tax_id'] = m.group(1).upper()
    else:
        # Fall back to any 18-character code in the whole text, skipping the
        # buyer's ID and codes that follow a bank-account keyword.
        buyer_tid = result.get('buyer_tax_id', '')
        upper_text = text.upper()
        for hit in re.finditer(r'(?<!\d)([0-9A-Za-z]{18})(?!\d)', upper_text):
            tid = hit.group(1)
            if tid == buyer_tid:
                continue
            # Use the match position in the upper-cased text; searching the
            # original text for the upper-cased ID can miss and return -1.
            context = upper_text[max(0, hit.start() - 30):hit.start()]
            if not any(kw in context for kw in ['银行', '账号', '开户']):
                result['seller_tax_id'] = tid
                break

    return result


def targeted_extract(image_base64, field_name):
    """Ask the model for one specific field that parsing did not find.

    Args:
        image_base64: Base64-encoded invoice image.
        field_name: One of the keys of the prompt table below.

    Returns:
        The value as a stripped string, or ``""`` if the field is unknown,
        the model returned nothing, or no JSON object could be decoded.
    """
    prompts = {
        'total_amount': '请找出"价税合计(小写)"金额。只返回JSON:{"total_amount":"金额"}',
        'tax_amount': '请找出"合计"行"税额"列,通常最小。只返回JSON:{"tax_amount":"税额"}',
        'amount_without_tax': '请找出"合计"行"金额"列(不含税)。只返回JSON:{"amount_without_tax":"金额"}',
        'buyer_name': '请找购买方"名称",个人填"个人"。只返回JSON:{"buyer_name":"名称"}',
        'buyer_tax_id': '请找购买方税号(18位),个人无则""。只返回JSON:{"buyer_tax_id":"税号"}',
        'seller_name': '请找销售方公司全名。只返回JSON:{"seller_name":"名称"}',
        'seller_tax_id': '请找销售方税号(18位)。只返回JSON:{"seller_tax_id":"税号"}',
    }
    if field_name not in prompts:
        return ""
    text = call_ocr(image_base64, prompts[field_name])
    if not text:
        return ""
    m = re.search(r'\{[^{}]*\}', text)
    if not m:
        return ""
    try:
        value = json.loads(m.group()).get(field_name, '')
    except json.JSONDecodeError:
        return ""
    # The model may answer with a JSON number or null instead of a string.
    return "" if value is None else str(value).strip()


def clean_and_validate(data):
    """Normalise the raw field values and repair inconsistent amounts.

    Amounts are reduced to digits and one decimal point and formatted with
    two decimals; tax IDs to the first 18 upper-case alphanumerics; the
    invoice number to the first 20 digits. Values that do not fit become
    ``''``. If tax exceeds the net amount the two are swapped, and if
    total != net + tax (tolerance 0.05) the net amount is recomputed as
    total - tax.

    Args:
        data: Dict of raw field values.

    Returns:
        A new dict with the nine cleaned fields plus ``invoice_type``.
    """
    fields = ['invoice_number', 'invoice_date', 'total_amount', 'tax_amount',
              'amount_without_tax', 'seller_name', 'seller_tax_id',
              'buyer_name', 'buyer_tax_id']
    cleaned = {}
    for f in fields:
        raw = data.get(f, '')
        v = '' if raw is None else str(raw).strip()
        if not v:
            cleaned[f] = ''
            continue
        if 'amount' in f:
            v = re.sub(r'[^\d.]', '', v)
            if v.count('.') > 1:
                # Keep only the first decimal point.
                head, *tail = v.split('.')
                v = head + '.' + ''.join(tail)
            try:
                cleaned[f] = f"{float(v):.2f}" if v else ''
            except ValueError:
                cleaned[f] = ''
        elif 'tax_id' in f:
            m = re.search(r'([0-9A-Z]{18})', re.sub(r'[^0-9A-Za-z]', '', v).upper())
            cleaned[f] = m.group(1) if m else ''
        elif f == 'invoice_number':
            m = re.search(r'(\d{20})', re.sub(r'[^\d]', '', v))
            cleaned[f] = m.group(1) if m else ''
        else:
            cleaned[f] = v

    # Amount repair. The values are '' or a formatted float at this point.
    try:
        total = float(cleaned.get('total_amount', 0) or 0)
        without = float(cleaned.get('amount_without_tax', 0) or 0)
        tax = float(cleaned.get('tax_amount', 0) or 0)
        if tax > without > 0:
            cleaned['tax_amount'], cleaned['amount_without_tax'] = (
                cleaned['amount_without_tax'], cleaned['tax_amount'])
            tax, without = without, tax
        if total > 0 and without > 0 and tax > 0:
            # Only recompute when the result stays positive.
            if abs(total - without - tax) > 0.05 and total > tax:
                cleaned['amount_without_tax'] = f"{round(total - tax, 2):.2f}"
    except ValueError:
        pass

    code = cleaned.get('invoice_number', '')
    if code:
        # NOTE(review): the type is derived from the first two digits of the
        # number being '13' - confirm this rule against the numbering scheme.
        if code[:2] == '13':
            cleaned['invoice_type'] = '电子发票(专用发票)'
        else:
            cleaned['invoice_type'] = '电子发票(普通发票)'
    else:
        cleaned['invoice_type'] = ''
    return cleaned


def recognize(image_base64):
    """Run the full recognition pipeline on one image.

    Extracts the full text, parses it, asks the model again for each
    required field that is still missing, and cleans the result.

    Returns:
        The cleaned record dict, or ``None`` if no text was recognised.
    """
    text = extract_all_text(image_base64)
    if not text:
        return None
    data = parse_fields(text)
    required = ['total_amount', 'amount_without_tax', 'tax_amount',
                'buyer_name', 'seller_name', 'seller_tax_id']
    missing = [f for f in required if not data.get(f)]
    if missing:
        print(f" 补充识别: {'/'.join(missing)}")
        for f in missing:
            val = targeted_extract(image_base64, f)
            if val:
                data[f] = val
            time.sleep(0.3)
    return clean_and_validate(data)


# ==================== Duplicate check and saving ====================
def check_duplicate(df, new_records, new_record):
    """Tell whether the invoice number of *new_record* was seen before.

    Args:
        df: DataFrame of previously saved records.
        new_records: Records collected so far in this run.
        new_record: The record to check.

    Returns:
        ``True`` if the invoice number is non-empty and occurs in *df* or in
        *new_records*, otherwise ``False``.
    """
    num = new_record.get('invoice_number', '')
    if not num:
        return False
    if 'invoice_number' in df.columns:
        if (df['invoice_number'].astype(str) == num).any():
            return True
    return any(rec.get('invoice_number', '') == num for rec in new_records)


def save_excel(df):
    """Write *df* to OUTPUT_EXCEL and apply header, border and colour styling.

    Rows whose duplicate flag is '是' are shaded, amount columns are
    right-aligned, column widths follow the content (CJK characters count
    double) and the header row is frozen.

    Returns:
        ``True`` on success, ``False`` if saving failed (the error is printed).
    """
    try:
        # reindex tolerates records that lack some columns.
        df_display = df.reindex(columns=EXCEL_COLUMNS_ORDER)
        df_display.columns = [COLUMN_NAMES_MAP[k] for k in EXCEL_COLUMNS_ORDER]
        df_display.to_excel(OUTPUT_EXCEL, index=False)

        wb = load_workbook(OUTPUT_EXCEL)
        ws = wb.active
        header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
        header_font = Font(color="FFFFFF", bold=True, size=11)
        thin = Side(style='thin')
        border = Border(left=thin, right=thin, top=thin, bottom=thin)
        dup_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")

        # 1-based sheet columns derived from the configured column order.
        dup_col = EXCEL_COLUMNS_ORDER.index('is_duplicate') + 1
        amount_cols = {i for i, key in enumerate(EXCEL_COLUMNS_ORDER, 1) if 'amount' in key}

        for cell in ws[1]:
            cell.fill = header_fill
            cell.font = header_font
            cell.alignment = Alignment(horizontal="center", vertical="center")
            cell.border = border

        for row in range(2, ws.max_row + 1):
            is_dup = ws.cell(row=row, column=dup_col).value
            for col in range(1, ws.max_column + 1):
                cell = ws.cell(row=row, column=col)
                cell.border = border
                if is_dup == '是':
                    cell.fill = dup_fill
                cell.alignment = Alignment(
                    horizontal="right" if col in amount_cols else "left",
                    vertical="center",
                )

        for col_cells in ws.columns:
            max_len = 0
            for cell in col_cells:
                if cell.value:
                    # CJK ideographs take roughly two character widths.
                    width = sum(2 if '\u4e00' <= c <= '\u9fff' else 1 for c in str(cell.value))
                    max_len = max(max_len, width)
            ws.column_dimensions[col_cells[0].column_letter].width = min(max_len + 3, 50)

        ws.freeze_panes = 'A2'
        wb.save(OUTPUT_EXCEL)
        return True
    except Exception as e:
        print(f"❌ 保存失败: {e}")
        return False


# ==================== Main program ====================
def main():
    """Recognise every new invoice file in INVOICE_FOLDER and update the workbook.

    Files whose name already appears in the workbook's source-file column
    are skipped. The folder is created (and the run ends) if it does not
    exist yet.
    """
    print("\n" + "=" * 50)
    print(" 发票OCR识别系统 v7.5")
    print("=" * 50)

    if not INVOICE_FOLDER.exists():
        INVOICE_FOLDER.mkdir(parents=True)
        print(f"\n📁 请将发票文件放入: {INVOICE_FOLDER}")
        return

    # Compare suffixes lower-cased so .JPG / .PNG are found on
    # case-sensitive file systems as well.
    all_files = sorted(
        p for p in INVOICE_FOLDER.iterdir()
        if p.is_file() and p.suffix.lower() in SUPPORTED_SUFFIXES
    )
    if not all_files:
        print("\n❌ 文件夹内没有发票文件")
        return
    print(f"\n📊 找到 {len(all_files)} 个文件")

    if OUTPUT_EXCEL.exists():
        df_display = pd.read_excel(OUTPUT_EXCEL)
        reverse_map = {v: k for k, v in COLUMN_NAMES_MAP.items()}
        known = [col for col in df_display.columns if col in reverse_map]
        df = df_display[known].rename(columns=reverse_map)
        print(f"📂 已有 {len(df)} 条记录")
    else:
        df = pd.DataFrame(columns=list(COLUMN_NAMES_MAP.keys()))

    if not df.empty and 'source_file' in df.columns:
        processed = set(df['source_file'].dropna().astype(str))
    else:
        processed = set()

    new_records = []
    print("\n🔄 开始识别...\n")
    for idx, file_path in enumerate(all_files, 1):
        print(f"[{idx}/{len(all_files)}] {file_path.name}")
        if file_path.name in processed:
            print(" ⏭️ 已处理过\n")
            continue

        is_pdf = file_path.suffix.lower() == '.pdf'
        if is_pdf:
            images = pdf_to_base64(file_path)
        else:
            img = image_to_base64(file_path)
            images = [img] if img else []
        if not images:
            print()
            continue

        for page_num, img_b64 in enumerate(images, 1):
            data = recognize(img_b64)
            if not data:
                print(" ❌ 识别失败\n")
                continue
            data['source_file'] = file_path.name
            data['page_number'] = page_num if is_pdf else ''
            data['recognition_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            is_dup = check_duplicate(df, new_records, data)
            data['is_duplicate'] = '是' if is_dup else '否'

            print(f" 价税合计: ¥{data.get('total_amount', '0')}")
            print(f" 不含税金额: ¥{data.get('amount_without_tax', '0')}")
            print(f" 税额: ¥{data.get('tax_amount', '0')}")
            try:
                t = float(data.get('total_amount', 0) or 0)
                w = float(data.get('amount_without_tax', 0) or 0)
                x = float(data.get('tax_amount', 0) or 0)
                if t > 0 and w > 0 and x > 0:
                    print(f" {'✅' if abs(t - w - x) <= 0.05 else '⚠️'} 金额验证: ¥{t} = ¥{w} + ¥{x}")
            except (TypeError, ValueError):
                pass
            print(f" {'⚠️ 重复发票' if is_dup else '✅ 正常'}\n")

            new_records.append(data)
            processed.add(file_path.name)

    if new_records:
        new_df = pd.DataFrame(new_records)
        # Skip the concat when there is nothing to append to.
        df = new_df if df.empty else pd.concat([df, new_df], ignore_index=True)
        if save_excel(df):
            print("=" * 50)
            print(f"✅ 新增 {len(new_records)} 张,总计 {len(df)} 张")
            print("💾 已保存到桌面: 发票汇总表.xlsx")
    else:
        print("ℹ️ 没有新增发票")


if __name__ == "__main__":
    main()