一、文章目录导航
本文为 SRE 运维工程师必备 Python 网络协议与自动化实战 预习手册
所有知识点均配 可直接运行代码 + 逐行解释 + 生产实践总结,适合预习
目录
1.基础工具模块
1.1 ipaddress IP 处理
1.2 winreg 注册表读取
1.3 netifaces 网卡信息
1.4 IP 列表排序
1.5Kamene(Scapy) 网络发包工具
2.ARP 协议实战
2.1 ARP 请求构造
2.2 ARP 局域网扫描
2.3 ARP 毒化(测试用)
2.4 耗时统计装饰器
3.ICMP 协议实战
3.1 ICMP 请求构造
3.2 ICMP 网段扫描
3.3 IPv6 ND 包构造
4.UDP 协议实战
4.1 UDP 服务端
4.2 UDP 客户端
4.3 struct 二进制打包
4.4 手动构造 UDP 包
5.SNMP 运维监控核心
5.1 SNMP 基础概念
5.2 Cisco 设备配置
5.3 SNMP 7 种操作
5.4 SNMPv2 采集
5.5 SNMPv3 采集
6.SNMP 数据可视化
6.1 matplotlib 中文不乱码
6.2 实战:SNMP → SQLite → 绘图
6.3 实战:SNMP → InfluxDB → Grafana
1SRE 视角总结与生产最佳实践
二、纯净 Markdown 完整版
Markdown # Python 网络协议与运维自动化实战手册 > SRE 工程师必备|逐行可运行|生产可用|公众号专属版 --- # 0 基础工具模块 ## 0.1 ipaddress —— IP 地址标准化处理 ```python import ipaddress # 解析单个 IPv4 地址 ip = ipaddress.IPv4Address("192.168.10.20") print("IP:", ip) print("是否私网:", ip.is_private) print("是否回环:", ip.is_loopback) # 解析网段并遍历可用主机 net = ipaddress.IPv4Network("192.168.10.0/24", strict=False) print("\n网段:", net.network_address) print("广播地址:", net.broadcast_address) # 打印前 5 个可用 IP for idx, host in enumerate(net.hosts()): print(host) if idx == 4: break
|
SRE 要点
统一 IP 校验逻辑,避免脚本因非法 IP 崩溃
批量扫描、资产梳理必用工具
0.2 winreg —— Windows 注册表读取
Python import winreg key_path = r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters\Interfaces" try: with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, key_path) as key: i = 0 while True: try: guid = winreg.EnumKey(key, i) sub_path = f"{key_path}\\{guid}" with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, sub_path) as sub_key: ip, _ = winreg.QueryValueEx(sub_key, "IPAddress") if ip: print("网卡IP:", ip) i += 1 except OSError: break except Exception as e: print("无权限或非Windows环境")
|
SRE 要点
适合 Windows 服务器自动化巡检
必须管理员权限运行,否则读取失败
0.3 netifaces —— 跨平台获取网卡信息
Bash pip install netifaces
|
Python import netifaces # 列出所有网卡 print("网卡列表:", netifaces.interfaces()) # 获取 eth0 信息 if "eth0" in netifaces.interfaces(): addrs = netifaces.ifaddresses("eth0") if netifaces.AF_INET in addrs: print("IPv4:", addrs[netifaces.AF_INET][0]["addr"]) if netifaces.AF_LINK in addrs: print("MAC:", addrs[netifaces.AF_LINK][0]["addr"]) # 获取默认网关 print("\n网关:", netifaces.gateways().get("default", {}))
SRE 要点
Linux / Windows / macOS 通用
比手动解析 /proc 或注册表更稳定
0.4 sort_ip —— IP 列表自然排序
Python def sort_ip(ips): return sorted(ips, key=lambda x: tuple(map(int, x.split(".")))) ip_list = ["192.168.2.10", "10.0.0.1", "192.168.1.100", "192.168.1.1"] print(sort_ip(ip_list))
SRE 要点
直接 sorted() 会按字符串乱序
必须转成数字元组才能实现网段排序
1 Kamene(Scapy) 网络发包工具
Python from kamene.all import * # 简单 ping def ping(ip): p = IP(dst=ip)/ICMP() resp = sr1(p, timeout=2, verbose=0) return resp is not None print("192.168.10.1 是否在线:", ping("192.168.10.1"))
|
SRE 要点
需要 root / 管理员权限
可构造任意协议包:ARP / TCP / UDP / ICMP / VLAN 等
2 ARP 协议实战
2.1 ARP 请求构造
Python from kamene.all import * p = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst="192.168.10.1") p.show()
|
2.2 ARP 扫描(局域网资产发现)
Python from kamene.all import * def arp_scan(net): p = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=net) ans, _ = srp(p, timeout=2, verbose=0) return [{"ip": r.psrc, "mac": r.hwsrc} for s, r in ans] hosts = arp_scan("192.168.10.0/24") for h in hosts: print(h["ip"], h["mac"])
|
SRE 要点
比 ping 扫描更可靠
可发现禁 ping 但存在 ARP 响应的设备
2.3 ARP 毒化(仅测试环境)
Python from kamene.all import * import time def arp_spoof(target, gw, iface="eth0"): while True: send(ARP(op=2, pdst=target, psrc=gw), verbose=0) time.sleep(1) # arp_spoof("192.168.10.20", "192.168.10.1")
|
SRE 要点
仅授权环境使用
可用于流量引流、安全测试、故障模拟
2.4 装饰器:统计函数耗时
Python import time from functools import wraps def timer(func): @wraps(func) def wrapper(*args, **kwargs): t = time.time() res = func(*args, **kwargs) print(f"{func.__name__} 耗时 {time.time()-t:.2f}s") return res return wrapper @timer def test(): time.sleep(1) test()
|
SRE 要点
批量扫描、巡检脚本必加
快速定位慢任务、超时瓶颈
3 ICMP 协议实战
3.1 ICMP 请求构造
Python from kamene.all import * p = IP(dst="192.168.10.1") / ICMP() / b"test" p.show()
|
3.2 ICMP 网段扫描
Python from kamene.all import * import ipaddress def icmp_scan(net): up = [] for ip in ipaddress.IPv4Network(net, strict=False).hosts(): ip_str = str(ip) r = sr1(IP(dst=ip_str)/ICMP(), timeout=0.5, verbose=0) if r: up.append(ip_str) return up print(icmp_scan("192.168.10.0/24"))
3.3 IPv6 ND 包构造
Python from kamene.all import * p = IPv6(dst="2001:db8::1") / ICMPv6ND_NS(tgt="2001:db8::1") p.show()
|
4 UDP 协议实战
4.1 UDP 服务端
Python import socket def udp_server(host="0.0.0.0", port=9999): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.bind((host, port)) while True: data, addr = s.recvfrom(1024) print(addr, data.decode()) s.sendto(b"ok", addr) udp_server()
|
4.2 UDP 客户端
Python import socket def udp_client(msg="hello"): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.sendto(msg.encode(), ("127.0.0.1", 9999)) print(s.recvfrom(1024)) udp_client()
4.3 struct 二进制打包
Python import struct # 网络字节序打包 buf = struct.pack("!HHH", 1234, 80, 16) print(buf) # 解包 a,b,c = struct.unpack("!HHH", buf) print(a,b,c)
SRE 要点
用于解析私有协议、抓包分析、流量回放
网络协议一律使用 ! 网络字节序
4.4 手动构造 UDP 包
Python import socket import struct udp_hdr = struct.pack("!HHHH", 12345, 9999, 8+4, 0) data = b"test" pkt = udp_hdr + data s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP) s.sendto(pkt, ("127.0.0.1", 9999))
|
5 SNMP 实战(运维监控核心)
5.1 SNMP 核心概念
OID:设备指标唯一标识
v2c:团体字明文,简单
v3:加密认证,生产必用
161/udp:采集;162/udp:trap 告警
5.2 Cisco 常用 SNMP 配置
Bash # v2c snmp-server community public RO snmp-server host 192.168.10.100 version 2c public # v3 snmp-server group v3grp v3 priv snmp-server user snmpuser v3grp v3 auth sha Auth@123 priv aes128 Priv@123
|
5.3 SNMP 7 种操作
get-request
get-next
get-bulk
set
response
trap
inform
5.4 SNMPv2 采集
Python from pysnmp.hlapi import * def snmp_get(ip, community, oid): errInd, errStat, errIdx, varBinds = next( getCmd(SnmpEngine(), CommunityData(community), UdpTransportTarget((ip, 161)), ContextData(), ObjectType(ObjectIdentity(oid))) ) if not errInd and not errStat: return varBinds[0][1] # 系统描述 print(snmp_get("192.168.10.2", "public", "1.3.6.1.2.1.1.1.0"))
|
5.5 SNMPv3 采集
Python from pysnmp.hlapi import * def snmp_v3_get(ip, user, auth, priv, oid): errInd, errStat, errIdx, varBinds = next( getCmd(SnmpEngine(), UsmUserData(user, authKey=auth, privKey=priv, authProtocol=usmHMACSHAAuthProtocol, privProtocol=usmAesCfb128Protocol), UdpTransportTarget((ip, 161)), ContextData(), ObjectType(ObjectIdentity(oid))) ) return varBinds[0][1] if not errInd else None
6 SNMP 数据可视化
6.1 matplotlib 中文不乱码设置
Python import matplotlib.pyplot as plt plt.rcParams["font.sans-serif"] = ["WenQuanYi Zen Hei", "SimHei"] plt.rcParams["axes.unicode_minus"] = False
|
6.2 实战:SNMP → SQLite3 → 绘图
Python import sqlite3 import time from pysnmp.hlapi import * import matplotlib.pyplot as plt # 建库 conn = sqlite3.connect("snmp.db") c = conn.cursor() c.execute("CREATE TABLE IF NOT EXISTS cpu (ts TEXT, val REAL)") conn.commit() conn.close() # 采集 def get_cpu(ip, com): return snmp_get(ip, com, "1.3.6.1.4.1.9.2.1.57.0") # 入库 for _ in range(5): val = get_cpu("192.168.10.2", "public") ts = time.strftime("%H:%M:%S") conn = sqlite3.connect("snmp.db") c = conn.cursor() c.execute("INSERT INTO cpu VALUES (?, ?)", (ts, float(val))) conn.commit() conn.close() time.sleep(10) # 绘图 conn = sqlite3.connect("snmp.db") c = conn.cursor() rows = c.execute("SELECT * FROM cpu").fetchall() conn.close() ts = [r[0] for r in rows] val = [r[1] for r in rows] plt.plot(ts, val) plt.title("CPU 使用率") plt.xticks(rotation=45) plt.tight_layout() plt.savefig("cpu.png")
|
6.3 实战:SNMP → InfluxDB → Grafana
Python from influxdb import InfluxDBClient client = InfluxDBClient("localhost", 8086, "admin", "admin", "snmp") client.create_database("snmp") def write_influx(val): body = [{"measurement": "cpu", "fields": {"value": val}}] client.write_points(body) # 配合 Grafana 做大盘 while True: val = get_cpu("192.168.10.2", "public") write_influx(float(val)) time.sleep(10)
SRE 视角整体总结
一、这套技术栈在生产环境解决什么问题
资产发现:ARP / ICMP 批量扫描,自动梳理在线设备
网络排障:Scapy 抓包、构造报文,定位二层/三层问题
设备监控:SNMP 采集 CPU、内存、端口流量、温度
数据可视化:从脚本采集 → 入库 → 大盘展示,形成完整监控链路
自动化巡检:定时任务 + 多线程 + 异常捕获,7×24 小时巡检
二、生产环境最佳实践
权限最小化
Scapy/raw socket 仅堡垒机/专用采集机使用
SNMP 生产用 v3,禁止 v2c 公网暴露
稳定性保障
所有网络操作加超时
批量任务用线程池/协程,避免阻塞
异常捕获全覆盖,防止脚本崩溃
数据治理
监控数据走时序库(InfluxDB/Prometheus)
历史数据保留策略、聚合策略
安全合规
ARP/ND 毒化仅用于授权测试
禁止在生产环境随意发包、扫描
三、进阶路线
大规模采集:asyncio + aioSNMP
自动化平台:FastAPI + Ansible + 网络设备 API
智能监控:异常检测、阈值告警、自动自愈
流量分析:eBPF + Scapy + 可视化平台