学习笔记 | 实验手册 | 考试咨询 | 就业辅导
场景还原:凌晨3点,运维群炸了——"192.168.10.x网段大面积丢包!"你打开Xshell,手动ping了半小时,发现是.10.87那台核心交换机挂了...如果有个工具能自动巡检254台设备,3秒出结果呢?
投稿人✍️:热心市民胡同学
编辑排版:铲屎官小五🐱
一、为什么会想设计这个工具?
实际应用场景举例
场景1:日常巡检
每天早上9点自动检查:├── 核心交换机 (192.168.1.1-10)├── 服务器区 (192.168.10.1-100)└── 办公区 (192.168.20.1-254)
场景2:故障排查
网络异常时快速定位:某网段突然异常 → 3秒扫描254个IP → 精准定位故障设备
场景3:变更验证
网络割接后验证:新增VLAN后,一键验证所有设备可达性
胡同学这次的方案
Jinja2 (规则描述) + Python (执行引擎) + Ping3 (探测能力) ↓ 一个文件搞定所有事!
三大核心能力:
- 动态生成IP - 用规则代替硬编码,3行顶254行
三、环境准备(2分钟搞定)
Step 1: 安装依赖
pip install ping3 jinja2
Step 2: 权限检查
重要提示:ping需要发送ICMP包,需要特殊权限
# Linux/Mac 用户sudo python3 your_script.py# 或者给Python解释器加权限(一次性)sudo setcap cap_net_raw+ep $(readlink -f $(which python3))# Windows 用户# 以管理员身份运行CMD/PowerShell即可
Step 3: 验证安装
from ping3 import pingprint(ping('127.0.0.1')) # 应该返回一个小数(延迟时间)
四、完整代码(可直接运行)
核心代码 - 一个文件搞定
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""网络设备批量Ping探测工具Author: 热心市民小胡同学Version: 2.0"""from ping3 import pingfrom datetime import datetimefrom jinja2 import Templatefrom pathlib import Pathimport sys# ===== 配置区 =====IP_JINJA_LIST = r"""{% for i in range(1, 10) %}192.168.10.{{ i }}{% endfor %}"""LOG_FILE = "ping_log.txt"TIMEOUT = 1# 超时时间(秒)# ===== 核心函数 =====defpull_ip_list(template_str: str) -> list:""" 从Jinja2模板生成IP列表 Args: template_str: Jinja2模板字符串 Returns: IP地址列表 ['192.168.10.1', '192.168.10.2', ...] """try: tpl = Template(template_str) rendered = tpl.render()# 过滤空行和注释 ip_list = [ line.split('#')[0].strip() for line in rendered.splitlines() if line.strip() andnot line.strip().startswith('#') ]return ip_listexcept Exception as e: print(f"❌ 模板解析失败: {e}") sys.exit(1)defdo_ping(ip: str, log_file: str = LOG_FILE, timeout: int = TIMEOUT) -> None:""" 执行ping探测并记录日志 Args: ip: 目标IP地址 log_file: 日志文件路径 timeout: 超时时间 """try: response = ping(ip, timeout=timeout) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")if response isNone:# 网络不可达 status = "❌ 不可达" print(f"{status:12s} | {ip:15s}") log_msg = f"{now} | {ip:15s} | 不可达\n"else:# 网络可达,显示RTT rtt_ms = response * 1000 status = f"✅ 可达" print(f"{status:12s} | {ip:15s} | RTT: {rtt_ms:6.2f}ms") log_msg = f"{now} | {ip:15s} | 可达 | RTT: {rtt_ms:.2f}ms\n"# 追加写入日志 Path(log_file).parent.mkdir(exist_ok=True)with open(log_file, 'a', encoding='utf-8') as f: f.write(log_msg)except PermissionError: print("❌ 权限不足!请使用 sudo 运行或以管理员身份运行") sys.exit(1)except Exception as e: print(f"❌ {ip:15s} | 探测异常: {e}")defprint_banner():"""打印工具启动横幅""" banner = """╔════════════════════════════════════════════╗║ 网络设备批量探测工具 版本 2.0 ║ ║ 作者:热心市民小胡同学 ║╚════════════════════════════════════════════╝ """ print(banner)# ===== 主程序 =====if __name__ == "__main__": print_banner()# 1. 生成IP列表 ip_list = pull_ip_list(IP_JINJA_LIST) print(f"📋 本次共需探测 {len(ip_list)} 个IP地址\n") print(f"{'状态':12s} | {'IP地址':15s} | 详情") print("-" * 50)# 2. 在日志文件中插入分隔符with open(LOG_FILE, 'a', encoding='utf-8') as f: f.write(f"\n{'='*60}\n") f.write(f"探测时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"{'='*60}\n")# 3. 批量pingfor ip in ip_list: do_ping(ip)# 4. 输出统计信息 print("-" * 50) print(f"✅ 探测完成!日志已保存至: {LOG_FILE}")
📚 五、代码详解(逐行分析)
模块1:IP规则定义层
IP_JINJA_LIST = r"""{% for i in range(1, 10) %}192.168.10.{{ i }}{% endfor %}"""
知识点解析:
| | |
|---|
r"""...""" | | r"\n" |
{% for %} | | |
{{ variable }} | | |
实际效果:
192.168.10.1192.168.10.2...192.168.10.9
模块2:IP列表生成器
defpull_ip_list(template_str: str) -> list: tpl = Template(template_str) # 创建模板对象 rendered = tpl.render() # 渲染模板 ip_list = [ # 解析为列表 line.split('#')[0].strip() # 支持行内注释for line in rendered.splitlines() if line.strip() andnot line.strip().startswith('#') ]return ip_list
处理流程:
Jinja2模板 → 渲染成文本 → 按行分割 → 过滤空行/注释 → 返回列表
支持的高级用法:
# ✅ 支持行内注释192.168.10.1# 核心交换机192.168.10.2# 备份交换机# ✅ 支持整行注释# 这是一行注释192.168.10.3
模块3:Ping探测引擎
defdo_ping(ip: str, log_file: str = LOG_FILE, timeout: int = TIMEOUT): response = ping(ip, timeout=timeout)if response isNone:# 返回None = 网络不可达 status = "❌ 不可达"else:# 返回float = RTT延迟(秒) rtt_ms = response * 1000# 转换为毫秒 status = "✅ 可达"
Ping3返回值说明:
🎬 六、运行效果展示
控制台输出
╔════════════════════════════════════════════╗║ 网络设备批量探测工具 版本 2.0 ║║ 作者:热心市民小胡同学 ║╚════════════════════════════════════════════╝📋 本次共需探测 9 个IP地址状态 | IP地址 | 详情--------------------------------------------------✅ 可达 | 192.168.10.1 | RTT: 2.34ms❌ 不可达 | 192.168.10.2 |✅ 可达 | 192.168.10.3 | RTT: 1.87ms✅ 可达 | 192.168.10.4 | RTT: 15.62ms❌ 不可达 | 192.168.10.5 |✅ 可达 | 192.168.10.6 | RTT: 3.21ms✅ 可达 | 192.168.10.7 | RTT: 2.98ms✅ 可达 | 192.168.10.8 | RTT: 4.12ms✅ 可达 | 192.168.10.9 | RTT: 2.76ms--------------------------------------------------✅ 探测完成!日志已保存至: ping_log.txt
日志文件示例
============================================================探测时间: 2026-01-10 15:30:25============================================================2026-01-10 15:30:25 | 192.168.10.1 | 可达 | RTT: 2.34ms2026-01-10 15:30:26 | 192.168.10.2 | 不可达2026-01-10 15:30:27 | 192.168.10.3 | 可达 | RTT: 1.87ms...
七、实战应用场景
场景1:多网段巡检
IP_JINJA_LIST = r"""{% for subnet in [10, 20, 30] %} {% for host in range(1, 255) %}192.168.{{ subnet }}.{{ host }} {% endfor %}{% endfor %}"""
效果:自动生成3个C段共762个IP
场景2:特定设备监控
IP_JINJA_LIST = r"""{% set devices = { 'core-sw-1': '192.168.1.1', 'core-sw-2': '192.168.1.2', 'firewall': '192.168.1.254', 'db-server': '192.168.10.100', 'web-server-1': '192.168.10.101', 'web-server-2': '192.168.10.102'} %}{% for name, ip in devices.items() %}{{ ip }} # {{ name }}{% endfor %}"""
效果:只监控关键设备,带设备名注释
场景3:跳跃式IP段
IP_JINJA_LIST = r"""{% for i in range(1, 21) %}192.168.10.{{ i * 10 }} {# 只ping .10, .20, .30...200 #}{% endfor %}"""
效果:192.168.10.10, 192.168.10.20, ... 192.168.10.200
场景4:排除特定IP
IP_JINJA_LIST = r"""{% for i in range(1, 255) %} {% if i not in [1, 254] %} {# 排除网关和广播地址 #}192.168.10.{{ i }} {% endif %}{% endfor %}"""
八、进阶功能扩展
扩展1:并发Ping(速度提升10倍)
from concurrent.futures import ThreadPoolExecutordefbatch_ping_concurrent(ip_list, max_workers=20):"""并发执行ping,大幅提升速度""" print(f"🚀 使用 {max_workers} 个线程并发探测...")with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map(do_ping, ip_list)
使用方式:
# 替换原来的for循环# for ip in ip_list:# do_ping(ip)# 改为并发执行batch_ping_concurrent(ip_list, max_workers=20)
性能对比:
扩展2:生成统计报告
defgenerate_report(log_file: str):"""分析日志并生成统计报告"""with open(log_file, 'r', encoding='utf-8') as f: lines = f.readlines() total = 0 reachable = 0 unreachable = 0for line in lines:if'可达'in line and'不可达'notin line: reachable += 1 total += 1elif'不可达'in line: unreachable += 1 total += 1if total > 0: rate = (reachable / total) * 100 print(f"\n📊 统计报告") print(f"{'='*40}") print(f"总探测数: {total}") print(f"✅ 可达: {reachable}") print(f"❌ 不可达: {unreachable}") print(f"📈 可达率: {rate:.2f}%")
🔐 安全建议
敏感信息加密存储
# 使用环境变量import osWEBHOOK_URL = os.getenv('WECHAT_WEBHOOK')
日志轮转
from logging.handlers import RotatingFileHandler# 单个日志文件最大10MB,保留5个备份
访问控制
chmod 700 monitor.pychown monitor:monitor monitor.py
胡同学也是跟着我们的网络自动化课程边学习边动手
有正反馈后也特别有动力,大家也可以跟着动手练习。
🤔 思考题
如何实现"ping不通自动重试3次"?
defdo_ping_with_retry(ip, max_retries=3):for i in range(max_retries): response = ping(ip)if response isnotNone:returnTruereturnFalse
核心思想:
用"规则"代替"硬编码"用"自动化"代替"手工操作"用"工程化思维"解决运维问题
再次感谢小胡同学的分享📒
这个工具虽小,但也是小胡同学自己通过思考和动手迈出的一小步
希望这个工具也能成为你自动化之路的第一步