这是一个基于 Scapy 的多线程网络嗅探程序,专门用于捕获 DNS 查询请求并检测特定域名。本文将逐行拆解代码,让你理解多线程、队列和回调函数是如何协作的。
一、这个程序能做什么?
简单来说,这个程序会监听网络中的 DNS 查询请求,当发现有设备查询特定域名(比如 captive.apple.com、www.apple.com 等苹果相关域名)时,会在控制台打印出:
2025-05-25 16.30.45 192.168.1.100 -> 8.8.8.8 TTL:64 captive.apple.com
包含:时间、源 IP、目标 IP、TTL 值、查询的域名。
适用场景: - 检测局域网内是否有苹果设备 - 监控特定域名的 DNS 查询 - 学习 Scapy 抓包和多线程编程
二、整体架构:生产者 - 消费者模型
程序的核心是一个经典的生产者 - 消费者模型:
┌─────────────────┐ ┌──────────────┐ ┌──────────────────┐│ sniff 抓包线程 │ ───→ │ 队列 Queue │ ───→ │ 业务逻辑处理线程 ││ (生产者) │ │ (线程安全) │ │ (消费者) │└─────────────────┘ └──────────────┘ └──────────────────┘
•sniff 抓包线程:负责捕获网络包,通过回调函数把包放进队列•队列:线程安全的缓冲区,避免两个线程直接操作共享数据•业务逻辑线程:从队列取包,解析 DNS 信息,匹配关键字,打印结果为什么这样设计?
如果只用单线程,抓包和处理包会串行执行,可能丢包。用多线程 + 队列,抓包和处理可以并行,效率更高。
三、逐段拆解代码
1. 导入模块和全局变量
from scapy.layers.inet import IP, Etherfrom scapy.all import DNS, DNSQRfrom scapy.sendrecv import sniffimport time, queue, threading数据包 = queue.SimpleQueue() # 先进先出队列关键字 = ["captive.apple.com", "www.apple.com", "www.appleiphonecell.com", "www.itools.info", "www.ibook.info", "www.airport.us", "www.thinkdifferent.us"]
解释: - IP, Ether:用于解析 IP 层和以太网层 - DNS, DNSQR:用于解析 DNS 层和 DNS 查询记录 - sniff:抓包函数 - queue.SimpleQueue():创建一个线程安全的 FIFO 队列 - 关键字:要监控的域名列表
2. 回调函数:sniff_callback(生产者)
def sniff_callback(packet:Ether):global 数据包if packet.haslayer(IP) and packet.haslayer(DNS): 数据包.put(packet)
解释: - 这个函数是 sniff 的回调函数,每抓到一个包就会自动调用 - packet.haslayer(IP) and packet.haslayer(DNS):只处理同时包含 IP 层和 DNS 层的包 - 数据包.put(packet):把包放进队列,供消费者线程处理
注意:回调函数只做最简单的事(放队列),不做复杂解析,避免阻塞抓包。
3. 业务逻辑线程(消费者)
这是整个程序最复杂的部分,我们逐行拆解:
def 业务逻辑线程():global 数据包whileTrue:try: data = 数据包.get()
•数据包.get():从队列取出一个包(如果队列为空会阻塞等待)# 1. 确认 DNS 层是否存在ifnot (data.haslayer(DNS)):continue dns_layer = data[DNS]
# 2. 只处理查询请求 (qr=0),忽略响应包if dns_layer.qr != 0:continue
•qr=0 表示 DNS 查询请求,qr=1 表示 DNS 响应# 3. 检查 DNSQR 层是否存在ifnot dns_layer.haslayer(DNSQR):continue
•DNSQR 是 DNS Question Record,包含查询的域名•某些畸形包可能有 DNS 头但没有 Question 部分,需要跳过# 4. 安全获取 qnametry: qname_raw = dns_layer[DNSQR].qnameexcept Exception:continue
•qname 就是查询的域名,如 b'www.apple.com.'•用 try-except 防止访问出错导致线程崩溃# qname 通常是字节串且以点结尾,如 b'www.example.com.'if isinstance(qname_raw, bytes): qname = qname_raw.decode('utf-8', errors='ignore')else: qname = str(qname_raw)# 去除末尾的点if qname.endswith('.'): qname = qname[:-1]
•DNS 协议中域名以点结尾(如 www.apple.com.),需要去掉末尾的点# 5. 匹配关键字if qname in 关键字: t = time.strftime("%Y-%m-%d %H.%M.%S", time.localtime())# 安全获取 IP 信息if data.haslayer(IP): sip = data[IP].src dip = data[IP].dst ttl = data[IP].ttlprint(f"{t} {sip} -> {dip} TTL:{ttl} {qname}")
•src:源 IP,dst:目标 IP,ttl:生存时间except Exception as e:# 捕获其他潜在异常,防止线程崩溃 t = time.strftime("%Y-%m-%d %H.%M.%S", time.localtime())print(t ,e, "\n")
•最外层 try-except:防止任何未预料的异常导致线程退出
4. main 函数:启动流程
def main(): t1 = threading.Thread(target=业务逻辑线程) t1.start()# filter 过滤规则,prn 回调函数,stop_filter 回调函数返回 True 停止抓包# store=0 不缓存到内存,count 嗅探数据包个数,等于 0 为不限制个数 sniff(filter="udp port 53", prn=sniff_callback, store=0, count=0)
解释: - threading.Thread(target=业务逻辑线程):创建业务逻辑线程 - t1.start():启动线程 - sniff(...):开始抓包 - filter="udp port 53":只抓 UDP 53 端口(DNS)的包,减少无关流量 - prn=sniff_callback:设置回调函数 - store=0:不缓存到内存,节省资源 - count=0:不限制抓包数量,一直抓
四、关键知识点总结
1. 为什么用队列?
队列是线程安全的。如果两个线程同时操作同一个列表,可能导致数据错乱。队列内部有锁机制,保证 put 和 get 不会冲突。
2. 为什么用多线程?
•sniff 是阻塞的:抓包时会一直等待新包,如果单线程处理,抓包和处理会串行•多线程并行:抓包线程只管抓,处理线程只管解析,互不干扰3. filter 规则解读
这是 BPF 过滤语法,意思是: - udp:只抓 UDP 协议 - port 53:只抓 53 端口(DNS 默认端口)
其他常用过滤规则: - "tcp port 80":HTTP 流量 - "host 192.168.1.1":指定 IP - "not arp":排除 ARP 包
五、运行效果
启动程序后,当局域网内有设备查询苹果相关域名时,控制台会输出:
2025-05-25 16.30.45 192.168.1.100 -> 8.8.8.8 TTL:64 captive.apple.com2025-05-25 16.31.02 192.168.1.105 -> 114.114.114.114 TTL:128 www.apple.com
注意事项: - 需要管理员/root 权限运行(抓包需要) - 确保已安装 Scapy:pip install scapy - Python 3.12 可直接运行
六、完整代码
from scapy.layers.inet import IP, Etherfrom scapy.all import DNS, DNSQRfrom scapy.sendrecv import sniffimport time, queue, threading数据包 = queue.SimpleQueue() # 先进先出队列关键字 = ["captive.apple.com", "www.apple.com", "www.appleiphonecell.com", "www.itools.info", "www.ibook.info", "www.airport.us", "www.thinkdifferent.us"]# sniff 回调函数,只负责把数据包加入队列def sniff_callback(packet:Ether):global 数据包if packet.haslayer(IP) and packet.haslayer(DNS): 数据包.put(packet)def 业务逻辑线程():global 数据包whileTrue:try: data = 数据包.get()# 1. 确认 DNS 层是否存在ifnot (data.haslayer(DNS)):continue dns_layer = data[DNS]# 2. 只处理查询请求 (qr=0),忽略响应包if dns_layer.qr != 0:continue# 3. 检查 DNSQR 层是否存在ifnot dns_layer.haslayer(DNSQR):continue# 4. 安全获取 qnametry: qname_raw = dns_layer[DNSQR].qnameexcept Exception:continueif isinstance(qname_raw, bytes): qname = qname_raw.decode('utf-8', errors='ignore')else: qname = str(qname_raw)if qname.endswith('.'): qname = qname[:-1]# 5. 匹配关键字if qname in 关键字: t = time.strftime("%Y-%m-%d %H.%M.%S", time.localtime())if data.haslayer(IP): sip = data[IP].src dip = data[IP].dst ttl = data[IP].ttlprint(f"{t} {sip} -> {dip} TTL:{ttl} {qname}")except Exception as e: t = time.strftime("%Y-%m-%d %H.%M.%S", time.localtime())print(t ,e, "\n")def main(): t1 = threading.Thread(target=业务逻辑线程) t1.start() sniff(filter="udp port 53", prn=sniff_callback, store=0, count=0)if __name__ == '__main__': main()
总结:这个程序是一个典型的多线程网络嗅探示例,通过队列解耦抓包和处理逻辑,代码结构清晰,适合 Python 新手学习 Scapy 和多线程编程。