import paramikoimport timeimport datetimeimport socketfrom openpyxl import load_workbookimport loggingimport pandas as pdfrom openpyxl.utils import get_column_letterimport osimport getpassimport tkinter as tkfrom tkinter import messageboximport threading
重点:paramiko 是 SSH 连接的核心,pandas/openpyxl 用于 Excel 读写,threading 实现多线程批量处理。作用:生成如 20260202_10时30分00秒 的时间字符串,用于日志 / 文件命名。作用:确保 result 文件夹存在,避免保存文件时因目录不存在报错。def get_current_time(): """ 获取当前时间并格式化为字符串 :return: 格式化后的当前时间字符串 """ now = datetime.datetime.now() return now.strftime('%Y%m%d_%H时%M分%S秒')def ensure_result_folder(result_folder="result"): """ 确保结果文件夹存在,如果不存在则创建它 :param result_folder: 结果文件夹的名称,默认为 "result" """ if not os.path.exists(result_folder): os.makedirs(result_folder)
作用:创建独立的日志器(成功 / 错误),日志文件保存到 result 目录,格式包含时间、日志级别、内容。def configure_logger(logger_name, log_file, log_level=logging.INFO, formatter=logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')): """ 配置日志记录器 :param logger_name: 日志记录器名称 :param log_file: 日志文件路径 :param log_level: 日志级别,默认为 logging.INFO :param formatter: 日志格式,默认为 '%(asctime)s - %(levelname)s - %(message)s' :return: 配置好的日志记录器 """ logger = logging.getLogger(logger_name) logger.setLevel(log_level) # 确保日志文件所在目录存在 log_dir = os.path.dirname(log_file) if log_dir and not os.path.exists(log_dir): os.makedirs(log_dir) file_handler = logging.FileHandler(log_file) file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger
作用:获取运行脚本的本机 IP,用于日志记录(无实际业务作用)。def get_local_ip(): """ 获取本地 IP 地址 :return: 本地 IP 地址,如果获取失败则返回 None """ try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(("8.8.8.8", 80)) return s.getsockname()[0] except (socket.error, Exception) as e: logging.getLogger('error').error(f"获取本地 IP 地址出错: {str(e)}") return None
def read_info(excel_file="info.xlsx"): """ 从 Excel 文件中读取设备信息和命令列表 :param excel_file: Excel 文件路径,默认为 "info.xlsx" :return: 设备信息字典和命令列表 """ try: workbook = load_workbook(excel_file) device_sheet = workbook['信息'] command_sheet = workbook['华为'] device_info = {} command_list = [] # 读取设备信息 for row in device_sheet.iter_rows(min_row=2, values_only=True): try: ip = row[2] username = row[5] password = str(row[6]) device_info[ip] = [username, password] except IndexError: logging.getLogger('error').error(f"在读取设备信息时,行数据格式不正确: {row}") continue # 读取命令列表 for row in command_sheet.iter_rows(min_row=2, values_only=True): if row and len(row) > 0 and row[0] is not None: command_txt = row[0].strip() if command_txt: command_list.append(command_txt) return device_info, command_list except FileNotFoundError: logging.getLogger('error').error(f"Excel 文件 '{excel_file}' 不存在,请检查文件路径是否正确!") raise except KeyError as e: logging.getLogger('error').error(f"工作表名称设置不正确: {str(e)},请检查 Excel 文件内容结构!") raise
关键:- 读取文件名为info的信息表中sheet信息,3、6、7列的信息生成字典,info的名称可以自己修改,代码中需同步修改,,IP 对应第 3 列(row [2])、用户名第 6 列(row [5])、密码第 7 列(row [6]);
- 读取sheet华为里面的第一列的命令生成字典,为输入设备的command,

def execute_commands_for_device(ip, username, password, command_list, success_logger, error_logger, success_count, failure_count, timeout=5, recv_wait_time=(0.5, 1), result_queue=None): """ 为单个设备执行命令并将结果保存到 txt 文件 :param ip: 设备的 IP 地址 :param username: 用户名 :param password: 密码 :param command_list: 命令列表 :param success_logger: 成功日志记录器 :param error_logger: 错误日志记录器 :param timeout: SSH 连接超时时间,默认为 5 秒 :param recv_wait_time: 接收数据的等待时间元组,(发送命令后的等待时间,接收数据时的等待时间),默认为 (0.5, 3) 秒 """ local_ip = get_local_ip() try: with paramiko.SSHClient() as ssh_client: ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh_client.connect(hostname=ip, username=username, password=password, look_for_keys=False, timeout=timeout) success_logger.info(f"成功连接到 {ip},用户名: {username},源地址: {local_ip}") command = ssh_client.invoke_shell() all_outputs = {} for cmd in command_list: success_logger.info(f"发送命令: {cmd}") command.send(cmd + '\n') time.sleep(recv_wait_time[0]) all_output = "" while True: if command.recv_ready(): output = command.recv(65535).decode('utf-8') all_output += output time.sleep(recv_wait_time[1]) else: break all_outputs[cmd] = all_output date = datetime.datetime.now().strftime('%Y-%m-%d_%H时%M分%S秒') file_path = os.path.join("result", f"{ip}_{date}.xlsx") with pd.ExcelWriter(file_path, engine='openpyxl') as writer: for cmd, output in all_outputs.items(): data = output.splitlines() df = pd.DataFrame(data) sheet_name = cmd[:31] df.to_excel(writer, index=False, sheet_name=sheet_name, header=False) worksheet = writer.sheets[sheet_name] for i in range(1, len(df.columns) + 1): column_letter = get_column_letter(i) worksheet.column_dimensions[column_letter].width = 100 success_logger.info('----------创建 Excel 文件成功----------') with threading.Lock(): success_count[0] += 1 except paramiko.AuthenticationException: error_logger.error(f"连接 {ip} 时认证失败") print(f"连接 {ip} 时认证失败") with threading.Lock(): failure_count[0] += 1 except paramiko.SSHException as e: error_logger.error(f"连接 {ip} 出现SSH相关错误: {str(e)}") print(f"连接 {ip} 出现SSH相关错误: {str(e)}") with threading.Lock(): failure_count[0] += 1 except socket.timeout: error_logger.error(f"连接 {ip} 超时") print(f"连接 {ip} 超时") with threading.Lock(): failure_count[0] += 1 except Exception as e: error_logger.error(f"连接 {ip} 时出现未知错误: {str(e)}") print(f"连接 {ip} 时出现未知错误: {str(e)}") with threading.Lock(): failure_count[0] += 1
核心逻辑:- 调用
invoke_shell() 开启交互式 shell(适合网络设备的命令行交互); - 逐条发送命令,循环接收执行结果(避免数据接收不完整);
- 将每个命令的结果写入 Excel 的不同工作表,调整列宽;
- 捕获认证失败、SSH 错误、超时等异常,线程安全更新成功 / 失败计数(用
threading.Lock() 避免竞态)。
def execute_commands(device_info, command_list, success_logger, error_logger, timeout=5, recv_wait_time=(0.5, 1)): """ 执行命令并将结果保存到 txt 文件 :param device_info: 设备信息字典 :param command_list: 命令列表 :param success_logger: 成功日志记录器 :param error_logger: 错误日志记录器 :param timeout: SSH 连接超时时间,默认为 5 秒 :param recv_wait_time: 接收数据的等待时间元组,(发送命令后的等待时间,接收数据时的接收等待时间),默认为 (0.5, 1) 秒 :return: 总的成功计数和总的失败计数 """ threads = [] success_count = [0] failure_count = [0] for ip, (username, password) in device_info.items(): # 为每个设备创建一个线程 thread = threading.Thread(target=execute_commands_for_device, args=(ip, username, password, command_list, success_logger, error_logger, success_count, failure_count, timeout, recv_wait_time)) threads.append(thread) thread.start() # 等待所有线程完成 for thread in threads: thread.join() return success_count[0], failure_count[0]
关键:- 用列表
success_count = [0] 存储计数(因为列表是可变对象,线程内修改会同步到主线程); thread.join()
def main(): """ 主函数,调用其他函数完成程序流程 """ start_time = datetime.datetime.now() print(f"程序开始运行: {start_time}\n =======================================================") ensure_result_folder() current_time = get_current_time() success_logger = configure_logger('success', f'result/success_ssh_operations_{current_time}.log', logging.INFO) error_logger = configure_logger('error', f'result/error_ssh_operations_{current_time}.log', logging.ERROR) try: device_info, command_list = read_info() success_count, failure_count = execute_commands(device_info, command_list, success_logger, error_logger) print(f"成功执行的设备数量: {success_count}") print(f"执行失败的设备数量: {failure_count}") success_logger.info(f"成功执行的设备数量: {success_count}") error_logger.error(f"执行失败的设备数量: {failure_count}") except Exception as e: error_logger.error(f"程序运行出现严重错误: {str(e)}") print(f"=======================================================") end_time = datetime.datetime.now() print(f"程序运行结束: {end_time}\n =======================================================") run_time = end_time - start_time total_seconds = run_time.total_seconds() hours = int(total_seconds // 3600) minutes = int((total_seconds % 3600) // 60) seconds = int(total_seconds % 60) print(f"程序运行时间: {hours} 小时 {minutes} 分钟 {seconds} 秒") print(f'此次采集顺利完成') # 使用 tkinter 显示消息 root = tk.Tk() root.withdraw() messagebox.showinfo("程序完成", f"程序运行时间: {hours} 小时 {minutes} 分钟 {seconds} 秒\n成功执行的设备数量: {success_count}\n执行失败的设备数量: {failure_count}\n此次采集顺利完成!") root.mainloop()if __name__ == "__main__" and datetime.datetime.now() > datetime.datetime(2025, 2, 20) and datetime.datetime.now() < datetime.datetime(2026, 2, 20): main()else: print('没有授权,请联系管理员!')
核心流程:- 入口处增加授权时间限制(仅 2025.2.20 - 2026.2.20 可运行)
pyinstaller -F xxxx.py(要打包的py文件)-----单个文件大
pyinstaller -D xxxx.py(要打包的py文件) ----->优选