# -*- coding: utf-8 -*-
"""Collect health data from network devices and write a highlighted Excel report.

The script reads device definitions from ``devices.json``, connects to each
device through netmiko (``device_type="hp_comware"``), runs a fixed set of
``display`` commands, derives a per-device diagnosis and writes the result
to ``network_report.xlsx`` with abnormal cells filled in red.
"""
import json
import re

import numpy as np
import pandas as pd
from netmiko import ConnectHandler
from openpyxl import load_workbook
from openpyxl.styles import PatternFill

# ======================
# Load devices
# ======================
with open("devices.json", "r", encoding="utf-8") as f:
    devices = json.load(f)


# ======================
# Connect to a device
# ======================
def connect_device(device):
    """Open a netmiko session to *device*.

    Args:
        device: Mapping with the keys ``name``, ``host``, ``username`` and
            ``password``.

    Returns:
        The connection object, or ``None`` when the connection attempt
        raised any exception (the error is printed, not re-raised).
    """
    try:
        print(f"\n🔍 连接 {device['name']}...")
        conn = ConnectHandler(
            device_type="hp_comware",
            host=device['host'],
            username=device['username'],
            password=device['password'],
            timeout=10,
            global_delay_factor=2
        )
        print("✅ 连接成功")
        return conn
    except Exception as e:
        # Best effort: one unreachable device must not abort the whole run.
        print(f"❌ 连接失败: {e}")
        return None


# ======================
# Data collection
# ======================
def collect_data(conn):
    """Run the diagnostic ``display`` commands on *conn*.

    Returns:
        Dict with the raw command output under the keys ``intf``, ``cpu``,
        ``mem``, ``lag`` and ``log``.
    """
    return {
        "intf": conn.send_command("display interface", read_timeout=20),
        "cpu": conn.send_command("display cpu-usage"),
        "mem": conn.send_command("display memory summary"),
        "lag": conn.send_command("display link-aggregation verbose"),
        "log": conn.send_command("display logbuffer")
    }


# ======================
# Parsing helpers
# ======================
def parse_cpu(cpu):
    """Return the largest integer ``N%`` figure found in *cpu*, or ``None``."""
    values = [int(x) for x in re.findall(r"(\d+)%", cpu)]
    return max(values) if values else None


def parse_mem(mem):
    """Return memory usage in percent, or ``None`` if no figure is found.

    The last ``N.N%`` figure in *mem* is treated as the free ratio and the
    usage is computed as ``100 - free``.
    NOTE(review): assumes the last decimal percentage in the output is the
    free ratio - confirm against real device output.
    """
    match = re.findall(r"(\d+\.\d+)%", mem)
    if match:
        free_ratio = float(match[-1])
        return round(100 - free_ratio, 1)
    return None


def parse_flap(log):
    """Return every ``GigabitEthernetX/Y/Z`` name mentioned in *log*.

    NOTE(review): any mention of an interface counts, not only link
    up/down events - confirm this is the intended definition of "flap".
    """
    return re.findall(r"GigabitEthernet\d+/\d+/\d+", log)


def parse_lag(lag):
    """Return ``"Bridge-Aggregation<N>(UP|DOWN)"`` entries parsed from *lag*.

    The text is split on the literal ``Bridge-Aggregation``; each following
    chunk is reported as DOWN when it contains ``Current state: DOWN`` and
    as UP otherwise.
    """
    lags = []
    for block in lag.split("Bridge-Aggregation")[1:]:
        tokens = block.split()
        if not tokens:
            # Nothing follows the keyword (e.g. truncated output); the
            # original indexed [0] here and raised IndexError.
            continue
        name = "Bridge-Aggregation" + tokens[0]
        state = "DOWN" if "Current state: DOWN" in block else "UP"
        lags.append(f"{name}({state})")
    return lags


def parse_interface_traffic(intf):
    """Map interface name to a ``bytes/sec`` figure found in *intf*.

    A line starting with ``GigabitEthernetX/Y/Z`` selects the current
    interface; every later line containing ``<n> bytes/sec`` overwrites
    that interface's value, so the last such line per interface wins.
    NOTE(review): headers of other interface types do not start with this
    pattern and therefore do not reset the current interface - verify
    against real output.
    """
    result = {}
    current_intf = None
    for line in intf.splitlines():
        m = re.match(r"(GigabitEthernet\d+/\d+/\d+)", line)
        if m:
            current_intf = m.group(1)
        t = re.search(r"(\d+)\s+bytes/sec", line)
        if t and current_intf:
            result[current_intf] = int(t.group(1))
    return result


def format_traffic(val):
    """Format a bytes-per-second value as ``B/s``, ``KB/s`` or ``MB/s``."""
    if val >= 1_000_000:
        return f"{round(val/1_000_000,2)}MB/s"
    elif val >= 1_000:
        return f"{round(val/1_000,2)}KB/s"
    else:
        return f"{val}B/s"


def detect_traffic_abnormal(intf_traffic):
    """Return interfaces whose traffic is an outlier.

    An interface is reported when its value exceeds both
    ``mean + 2 * std`` of all values and the absolute floor of 100000.
    Each entry is formatted as ``"<interface>(<formatted rate>)"``.
    """
    abnormal = []
    if not intf_traffic:
        return abnormal
    values = list(intf_traffic.values())
    mean = np.mean(values)
    std = np.std(values)
    for intf, val in intf_traffic.items():
        if val > mean + 2 * std and val > 100000:
            abnormal.append(f"{intf}({format_traffic(val)})")
    return abnormal


def _percent_exceeds(text, limit):
    """Return True when *text* (e.g. ``"85%"``) is a number above *limit*.

    Non-string or non-numeric cell values such as ``"未获取"`` or
    ``"连接失败"`` yield False instead of raising.
    """
    if not isinstance(text, str):
        return False
    try:
        return float(text.replace("%", "")) > limit
    except ValueError:
        return False


# ======================
# Diagnosis
# ======================
def diagnose(data):
    """Build the report row for one device from raw command output.

    Args:
        data: Dict as returned by ``collect_data``.

    Returns:
        Dict with the report columns. The overall status ("总问题") is
        abnormal when there is any flap entry, any DOWN aggregation group
        or any traffic outlier; CPU and memory do not affect it.
    """
    cpu = parse_cpu(data['cpu'])
    mem = parse_mem(data['mem'])
    flap = parse_flap(data['log'])
    lag = parse_lag(data['lag'])
    intf_traffic = parse_interface_traffic(data['intf'])
    traffic_abnormal = detect_traffic_abnormal(intf_traffic)
    lag_problem = [x for x in lag if "DOWN" in x]
    return {
        # sorted() keeps the report stable between runs; set order is not.
        "接口flap": ",".join(sorted(set(flap))) if flap else "正常",
        # Compare with None explicitly: 0 is a valid reading, not "missing".
        "CPU": f"{cpu}%" if cpu is not None else "未获取",
        "内存": f"{mem}%" if mem is not None else "未获取",
        "聚合口": ",".join(lag) if lag else "未配置",
        "流量异常": ",".join(traffic_abnormal) if traffic_abnormal else "正常",
        "总问题": "异常" if flap or lag_problem or traffic_abnormal else "正常"
    }


# ======================
# Main program
# ======================
results = []
for dev in devices:
    conn = connect_device(dev)
    if conn:
        try:
            data = collect_data(conn)
        finally:
            # Always release the session, even if a command fails.
            conn.disconnect()
        res = diagnose(data)
    else:
        res = {k: "连接失败" for k in ["接口flap", "CPU", "内存", "聚合口", "流量异常", "总问题"]}
    res["设备"] = dev['name']
    results.append(res)

# ======================
# Write Excel
# ======================
df = pd.DataFrame(results)
file = "network_report.xlsx"
df.to_excel(file, index=False)

# ======================
# Targeted highlighting (only abnormal cells are marked)
# ======================
wb = load_workbook(file)
ws = wb.active
red = PatternFill("solid", fgColor="FF0000")
# Column numbers follow the key order of the dicts built above.
for r in range(2, ws.max_row + 1):
    # Interface flap
    if ws.cell(r, 1).value != "正常":
        ws.cell(r, 1).fill = red
    # CPU > 80%
    if _percent_exceeds(ws.cell(r, 2).value, 80):
        ws.cell(r, 2).fill = red
    # Memory > 80%
    if _percent_exceeds(ws.cell(r, 3).value, 80):
        ws.cell(r, 3).fill = red
    # Aggregation group DOWN
    lag = ws.cell(r, 4).value
    if lag and "DOWN" in lag:
        ws.cell(r, 4).fill = red
    # Traffic anomaly
    if ws.cell(r, 5).value != "正常":
        ws.cell(r, 5).fill = red
    # Overall status
    if ws.cell(r, 6).value != "正常":
        ws.cell(r, 6).fill = red
wb.save(file)
print("\n🎉 完成!生成 network_report.xlsx(已优化高亮)")