## 引言
Scapy是一个强大的Python网络数据包操纵库。它可以构造、发送、捕获和分析网络数据包,是网络安全工程师和网络管理员的必备工具。
本文将教你如何使用Scapy进行网络流量分析和数据包处理。
>**法律声明**:捕获和分析网络流量仅应用于你自己的网络或获得书面授权的网络。未经授权的监听可能违反《网络安全法》。
## 第一部分:Scapy基础
```python
# 安装: pip install scapy
from scapy.all import *
import time
import struct
classPacketAnalyzer:
"""
网络数据包分析器
Scapy支持解析数百种协议,这里演示最基础的用法。
"""
def__init__(self, interface: str = None):
"""
初始化分析器
Args:
interface: 网络接口名称(如 'eth0', 'Wi-Fi')
"""
self.interface = interface or conf.iface
self.packet_count = 0
self.protocol_stats = {}
defhello_scapy(self):
"""Scapy的Hello World —— 发送一个ping包"""
print("[*] 发送ICMP Ping包...")
answer, unanswered = sr(IP(dst="127.0.0.1") / ICMP(),
timeout=2, verbose=False)
for sent, recv in answer:
print(f" 响应: {recv[IP].src} 延迟: "
f"{recv.time - sent.time:.4f}s")
defcapture_packets(self, count: int = 10,
filter_exp: str = None) -> list:
"""
抓包
Args:
count: 捕获的包数量
filter_exp: pcap过滤器表达式(如 "tcp port 80")
Returns:
捕获的Packet列表
"""
print(f"[*] 正在捕获 {count} 个数据包...")
print(f"[*] 网络接口: {self.interface}")
if filter_exp:
print(f"[*] 过滤器: {filter_exp}")
packets = sniff(count=count,
iface=self.interface,
prn=self._display_packet,
filter=filter_exp)
print(f"\n[*] 共捕获 {len(packets)} 个数据包")
returnlist(packets)
def_display_packet(self, packet):
"""
显示单个包的简要信息
这是sniff回调函数的典型用法——
每收到一个包就调用此函数。
"""
self.packet_count += 1
self._count_protocol(packet)
# 提取基本信息
ifIPin packet:
src = packet[IP].src
dst = packet[IP].dst
proto = packet[IP].proto
ttl = packet[IP].ttl
proto_names = {
1: "ICMP", 6: "TCP", 17: "UDP",
50: "ESP", 51: "AH", 89: "OSPF"
}
proto_name = proto_names.get(proto, f"未知({proto})")
# 获取层信息
layer_info = ""
ifTCPin packet:
layer_info = (f"TCP {src}:{packet[TCP].sport} "
f"-> {dst}:{packet[TCP].dport}")
flags = ""
if packet[TCP].flags.S: flags += "S"
if packet[TCP].flags.A: flags += "A"
if packet[TCP].flags.P: flags += "P"
if packet[TCP].flags.F: flags += "F"
if flags:
layer_info += f" [{flags}]"
elifUDPin packet:
layer_info = (f"UDP {src}:{packet[UDP].sport} "
f"-> {dst}:{packet[UDP].dport}")
elifICMPin packet:
layer_info = (f"ICMP {src} -> {dst} "
f"type={packet[ICMP].type} "
f"code={packet[ICMP].code}")
print(f" #{self.packet_count}: "
f"{proto_name:<6} "
f"{layer_info}")
def_count_protocol(self, packet):
"""统计各协议出现的次数"""
ifIPin packet:
proto = packet[IP].proto
proto_names = {
1: "ICMP", 6: "TCP", 17: "UDP",
50: "ESP", 51: "AH", 89: "OSPF"
}
name = proto_names.get(proto, f"Proto-{proto}")
self.protocol_stats[name] = self.protocol_stats.get(name, 0) + 1
defshow_protocol_stats(self):
"""显示协议统计结果"""
print("\n" + "=" * 60)
print("协议统计")
print("=" * 60)
for proto, count insorted(self.protocol_stats.items(),
key=lambdax: x[1], reverse=True):
print(f" {proto:<12}: {count:>5} 包")
defanalyze_tcp_handshake(self, packets: list):
"""
分析TCP三次握手
TCP三次握手是网络连接的基础:
1. Client -> Server: SYN (synchronize)
2. Server -> Client: SYN-ACK (synchronize-acknowledge)
3. Client -> Server: ACK (acknowledge)
"""
print("\n" + "=" * 60)
print("TCP握手分析")
print("=" * 60)
handshake_sequences = {}
for packet in packets:
ifTCPin packet andIPin packet:
src = packet[IP].src
dst = packet[IP].dst
sport = packet[TCP].sport
dport = packet[TCP].dport
flags = str(packet[TCP].flags)
# 识别SYN包
if'S'in flags and'A'notin flags:
seq_key = f"{src}:{sport} -> {dst}:{dport}"
handshake_sequences[seq_key] = {
"syn": packet.time,
"syn_ack": None,
"ack": None
}
# 识别SYN-ACK包
if'SA'in flags:
# 查找对应的SYN方向
for seq_key, sequence in handshake_sequences.items():
if seq_key.endswith(f"->{src}:{dport}"):
reverse = f"{dst}:{dport} -> {src}:{sport}"
if reverse in handshake_sequences:
handshake_sequences[reverse]["syn_ack"] = packet.time
break
# 识别ACK包(仅第三次)
if'A'in flags and'S'notin flags:
for seq_key, sequence in handshake_sequences.items():
if seq_key.endswith(f"->{dst}:{dport}"):
handshake_sequences[seq_key]["ack"] = packet.time
break
for seq_key, sequence in handshake_sequences.items():
if sequence["syn"]:
syn_time = sequence["syn"]
syn_ack_time = sequence.get("syn_ack") or syn_time
ack_time = sequence.get("ack") or syn_time
latency1 = syn_ack_time - syn_time
latency2 = ack_time - syn_ack_time
total = latency2 + latency1
print(f"\n 连接: {seq_key}")
print(f" SYN -> {latency1*1000:.1f}ms")
print(f" SYN-ACK -> {latency2*1000:.1f}ms")
print(f" 总握手延迟: {total*1000:.1f}ms")
defanalyze_http_traffic(self, packets: list):
"""
分析HTTP流量
识别HTTP请求和响应,提取关键信息
"""
print("\n" + "=" * 60)
print("HTTP流量分析")
print("=" * 60)
http_requests = {}
http_responses = {}
for packet in packets:
if Raw in packet andIPin packet:
payload = packet[Raw].load.decode("utf-8", errors="ignore")
src = packet[IP].src
dst = packet[IP].dst
if payload.startswith(("GET ", "POST ", "PUT ", "DELETE ",
"HEAD ", "OPTIONS ", "PATCH ")):
parts = payload.split("\r\n")
first_line = parts[0]
method, path, protocol = first_line.split(" ", 2)
key = f"{src}:{packet[TCP].sport} -> {dst}:{packet[TCP].dport}"
http_requests[key] = {
"method": method,
"path": path,
"protocol": protocol,
"raw": first_line,
"headers": {}
}
# 提取头部
for line in parts[1:]:
if": "in line:
h_name, h_value = line.split(": ", 1)
http_requests[key]["headers"][h_name] = h_value
elif payload.startswith(("HTTP/1.", "HTTP/2.")):
parts = payload.split("\r\n")
first_line = parts[0]
protocol, status_code, status_msg = first_line.split(" ", 2)
key = f"{src}:{packet[TCP].sport} -> {dst}:{packet[TCP].dport}"
http_responses[key] = {
"protocol": protocol,
"status": status_code,
"message": status_msg,
}
print("\n【HTTP请求】")
for key, req in http_requests.items():
print(f" {req['method']:7s}{req['path'][:50]:50s} "
f"-> {req['headers'].get('Host', '?')[:30]}")
print("\n【HTTP响应】")
for key, resp in http_responses.items():
print(f" {resp['protocol']}{resp['status']}{resp['message']}")
# ===== 数据包构造示例 =====
classPacketBuilder:
"""
自定义数据包构造器
展示了如何使用Scapy构建各种网络数据包
"""
@staticmethod
defconstruct_ping():
"""构造一个简单的ICMP Ping包"""
print("[*] 构造ICMP Ping包")
ping = IP(dst="127.0.0.1") / ICMP()
print(f" 数据包: {ping.summary()}")
# print(f" 完整内容:\n{ping.show()}")
@staticmethod
defconstruct_tcp_syn(ip: str, port: int):
"""构造TCP SYN包(半开放扫描的基础)"""
print(f"[*] 构造TCP SYN包 -> {ip}:{port}")
syn = IP(src="192.168.1.100", dst=ip) / \
TCP(sport=RandShort(), dport=port, flags="S")
print(f" 数据包: {syn.summary()}")
print(f" 源IP: {syn[IP].src}")
print(f" 目标IP: {syn[IP].dst}")
print(f" 源端口: {syn[TCP].sport}")
print(f" 目标端口: {syn[TCP].dport}")
print(f" 标志: {syn[TCP].flags}")
@staticmethod
defconstruct_dns_query(domain: str = "example.com"):
"""构造DNS查询包"""
print(f"[*] 构造DNS查询: {domain}")
dns_pkt = IP(dst="8.8.8.8") / UDP() / \
DNS(rd=1, qd=DNSQR(qname=domain, qtype="A"))
print(f" 数据包: {dns_pkt.summary()}")
# 发送并等待响应
ans, unans = sr(dns_pkt, timeout=2, verbose=False)
for sent, recv in ans:
ifDNSin recv and pkt[RR]:
answers = recv[DNS].an
if answers:
print(f" 响应: {answers.rdata}")
@staticmethod
defconstruct_arp_request(network: str = "192.168.1.0/24"):
"""构造ARP请求包"""
print(f"[*] 构造ARP请求: 查询 {network}")
arp = ARP(pdst=network)
ether = Ether(bcast="ff:ff:ff:ff:ff:ff")
arp_pkt = ether / arp
print(f" 数据包: {arp_pkt.summary()}")
# ===== 主程序演示 =====
if__name__ == "__main__":
# 1. Hello Scapy
analyzer = PacketAnalyzer()
analyzer.hello_scapy()
# 2. 构造数据包
builder = PacketBuilder()
builder.construct_ping()
builder.construct_tcp_syn("192.168.1.1", 80)
builder.construct_dns_query("example.com")
# 3. 抓包分析
print("\n" + "=" * 60)
print("抓包分析")
print("=" * 60)
packets = analyzer.capture_packets(count=5,
filter_exp="tcp or udp")
analyzer.show_protocol_stats()
```
## 第二部分:进阶用法
### 网络扫描器(基于Scapy)
```python
classScapyPortScanner:
"""
使用Scapy实现的轻量级端口扫描器
比Socket方式更灵活,可以自定义标志位
"""
def__init__(self, target: str, interface: str = None):
self.target = target
self.interface = interface or conf.iface
defscan_port(self, port: int, timeout: float = 1.0) -> str:
"""扫描单个端口"""
pkt = IP(dst=self.target) / TCP(dport=port, flags="S")
ans = sr1(pkt, timeout=timeout, verbose=False)
if ans isNone:
return"FILTERED"
elif ans.haslayer(TCP):
if ans[TCP].flags == 0x12: # SYN-ACK
# 发送RST重置连接
rsto = IP(dst=self.target) / \
TCP(dport=port, flags="RA")
send(rsto, verbose=False)
return"OPEN"
elif ans[TCP].flags == 0x14: # RST-ACK
return"CLOSED"
return"UNKNOWN"
defscan_ports(self, ports: list) -> dict:
"""批量扫描端口"""
results = {}
for port in ports:
status = self.scan_port(port)
results[port] = status
if status == "OPEN":
print(f" [+] OPEN: {port}")
elif status == "CLOSED":
print(f" [-] CLOSED: {port}")
else:
print(f" [?] {status}: {port}")
return results
```
## 实战练习
### 练习1:ARP欺骗检测器
编写一个程序检测网络中的ARP欺骗:
- 监控ARP响应包
- 检测同一IP是否有多个MAC地址
- 记录可疑活动的详细信息
### 练习2:自定义协议解析器
编写一个工具解析特定协议(如DNS、HTTP、SSH握手):
- 从捕获的包中提取协议特征
- 展示每个字段的含义
- 标注异常情况
### 练习3:流量可视化仪表板
使用Scapy捕获数据,结合Matplotlib绘制流量图表:
- 每小时/每天的流量趋势
- Top 10通信对(谁在和谁聊天)
- 协议分布饼图
## 总结
本篇教程涵盖了:
- ✅ Scapy基础用法
- ✅ 数据包捕获和分析
- ✅ TCP握手分析
- ✅ HTTP流量提取
- ✅ 自定义数据包构造
- ✅ Scapy端口扫描
掌握了数据包分析后,下一节我们将学习:**DNS信息收集工具**——利用DNS协议进行域名信息收集和技术栈识别。
---
> ⚖️ **版权声明**:本教程仅供学习和研究用途。