> 字数6900字,大约阅读时间为10min
信息收集是技术岗的高频基础工作——运维需要收集服务器配置信息,安全工程师需要收集目标资产信息,开发者需要收集接口/数据信息。手动收集不仅耗时耗力,还容易出错,而用 Python 实现自动化信息收集,能将原本几小时的工作压缩到几分钟,效率直接提升 10 倍。
本文将深入浅出分享高频实战案例,覆盖「服务器信息收集、域名资产探测、日志关键信息提取、批量接口数据爬取」,所有案例均提供完整可运行代码,零基础也能直接复用,让你快速掌握 Python 自动化信息收集的核心逻辑。
一、核心逻辑:Python 自动化信息收集为什么高效?
Python 能成为自动化信息收集的首选语言,核心在于三点:
- 丰富的原生库+第三方库无需重复造轮子,
os/socket 等原生库处理系统/网络信息,requests/BeautifulSoup 等第三方库处理网络请求/数据解析; - 语法简洁
- 跨平台+可复用一套代码能在 Windows/Linux/macOS 运行,编写一次可重复使用,彻底摆脱重复劳动。
自动化的核心思路很简单:把手动操作的步骤(打开文件→复制内容→整理格式→输出结果),用 Python 代码替代,让计算机自动完成循环、筛选、汇总等机械工作。
二、实战案例 1:服务器基础信息自动收集(运维必备)
应用场景
作为运维工程师,需要定期收集服务器的核心信息(CPU 核心数、内存使用率、磁盘空间、开放端口),用于监控服务器状态或排查故障。手动执行 top/free/df 等命令,再复制结果整理成表格,耗时且易出错。
自动化思路
用 Python 调用系统命令(或通过 psutil 库),自动获取服务器硬件+网络信息,整理成结构化数据(字典/表格),直接输出或保存为文件。
完整代码(Linux 系统通用)
import psutil
import socket
import platform
from datetime import datetime
defcollect_server_info() -> dict:
"""收集服务器核心信息,返回结构化字典"""
server_info = {}
# 1. 基本信息(主机名、系统、当前时间)
server_info["主机名"] = socket.gethostname()
server_info["IP地址"] = socket.gethostbyname(socket.gethostname())
server_info["系统版本"] = platform.platform()
server_info["收集时间"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 2. CPU 信息
server_info["CPU核心数"] = psutil.cpu_count(logical=True) # 逻辑核心数
server_info["CPU使用率"] = f"{psutil.cpu_percent(interval=1)}%"# 1秒内平均使用率
# 3. 内存信息(单位:GB)
mem = psutil.virtual_memory()
server_info["总内存"] = f"{mem.total / 1024 / 1024 / 1024:.2f}GB"
server_info["已用内存"] = f"{mem.used / 1024 / 1024 / 1024:.2f}GB"
server_info["内存使用率"] = f"{mem.percent}%"
# 4. 磁盘信息(只统计根目录 /)
disk = psutil.disk_usage("/")
server_info["磁盘总空间"] = f"{disk.total / 1024 / 1024 / 1024:.2f}GB"
server_info["磁盘已用空间"] = f"{disk.used / 1024 / 1024 / 1024:.2f}GB"
server_info["磁盘使用率"] = f"{disk.percent}%"
# 5. 常用开放端口(扫描 22/80/443/3306/6379 等高频端口)
common_ports = [22, 80, 443, 3306, 6379, 8080, 9090]
open_ports = []
for port in common_ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.5)
try:
sock.connect(("127.0.0.1", port))
open_ports.append(str(port))
except:
pass
finally:
sock.close()
server_info["开放常用端口"] = ",".join(open_ports) if open_ports else"无"
return server_info
defsave_info_to_file(info: dict, filename: str = "server_info.txt"):
"""将服务器信息保存到文件"""
withopen(filename, "w", encoding="utf-8") as f:
for key, value in info.items():
f.write(f"{key}: {value}\n")
print(f"✅服务器信息已保存到{filename}")
if __name__ == "__main__":
print("===== 开始收集服务器信息 =====")
info = collect_server_info()
# 打印结果
for key, value in info.items():
print(f"{key:12s}:{value}")
# 保存到文件
save_info_to_file(info)
核心依赖与使用说明
- 安装依赖:
pip install psutil(跨平台系统监控库,比直接调用系统命令更稳定); - 运行方式:Linux 服务器上直接执行
python3 server_collect.py,无需修改代码; - 输出效果:控制台打印结构化信息,同时生成
server_info.txt 文件,包含所有核心数据,可直接用于汇报或监控。
效率对比
- 手动操作:执行 5+ 命令,复制结果,整理格式,耗时 10~15 分钟;
- 自动化操作:运行代码,等待 2 秒,直接获取结构化结果,耗时 < 1 分钟。
三、实战案例 2:域名资产自动探测(安全/运维必备)
应用场景
网络安全工程师或运维需要探测某个域名的相关资产(子域名、IP 地址、开放端口、网站标题),比如探测 example.com 的子域名 www.example.com/api.example.com 等,手动探测需要逐个测试,效率极低。
自动化思路
- 准备常见子域名字典(如
www/api/admin/test);
完整代码
import socket
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
# 常见子域名字典(可根据需求扩展)
SUB_DOMAINS = [
"www", "api", "admin", "test", "dev", "blog", "ftp", "mail",
"app", "web", "portal", "static", "file", "ops", "db"
]
defget_website_title(url: str) -> str:
"""获取网站标题,处理超时和异常"""
try:
response = requests.get(url, timeout=3, verify=False)
response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.text, "html.parser")
title = soup.title.string.strip() if soup.title else"无标题"
return title
except:
return"无法访问"
defdetect_subdomain(domain: str, sub: str) -> dict:
"""探测单个子域名的信息"""
sub_domain = f"{sub}.{domain}"
result = {"子域名": sub_domain, "IP地址": "解析失败", "80端口": "关闭", "443端口": "关闭", "网站标题": "无"}
# 1. 解析IP地址
try:
ip = socket.gethostbyname(sub_domain)
result["IP地址"] = ip
except:
return result
# 2. 测试80端口(HTTP)
sock80 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock80.settimeout(1)
try:
sock80.connect((ip, 80))
result["80端口"] = "开放"
result["网站标题"] = get_website_title(f"http://{sub_domain}")
except:
pass
finally:
sock80.close()
# 3. 测试443端口(HTTPS)
if result["80端口"] == "关闭":
sock443 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock443.settimeout(1)
try:
sock443.connect((ip, 443))
result["443端口"] = "开放"
result["网站标题"] = get_website_title(f"https://{sub_domain}")
except:
pass
finally:
sock443.close()
return result
defbatch_detect_domain(domain: str, thread_num: int = 10):
"""批量探测子域名资产,多线程提升效率"""
print(f"===== 开始探测域名资产:{domain} =====")
print(f"子域名字典数量:{len(SUB_DOMAINS)},线程数:{thread_num}\n")
results = []
# 多线程批量探测
with ThreadPoolExecutor(max_workers=thread_num) as executor:
futures = [executor.submit(detect_subdomain, domain, sub) for sub in SUB_DOMAINS]
for future in futures:
res = future.result()
if res["IP地址"] != "解析失败": # 只保留能解析的子域名
results.append(res)
# 打印结果(格式化输出)
print(f"{'子域名':20s}{'IP地址':15s}{'80端口':6s}{'443端口':6s}{'网站标题':20s}")
print("-" * 70)
for res in results:
print(f"{res['子域名']:20s}{res['IP地址']:15s}{res['80端口']:6s}{res['443端口']:6s}{res['网站标题']:20s}")
if __name__ == "__main__":
# 替换为要探测的域名(如 "example.com")
target_domain = "example.com"
batch_detect_domain(target_domain)
核心说明
- 依赖安装:
pip install requests beautifulsoup4; - 核心优化:使用
ThreadPoolExecutor 多线程探测,10 个线程同时运行,探测 20 个子域名仅需 3~5 秒; - 扩展建议:可扩展子域名字典(添加更多常见子域名),或增加端口扫描范围,提升探测全面性。
实际价值
快速摸清某个域名的资产分布,比如发现隐藏的 admin.example.com 管理后台、api.example.com 接口服务,为安全测试或运维管理提供基础数据。
四、实战案例 3:日志文件关键信息自动提取(开发/运维必备)
应用场景
开发或运维需要从海量日志文件中提取关键信息,比如从 Nginx 访问日志中提取 404/500 错误请求、高频访问 IP;从应用日志中提取异常堆栈信息。手动查找不仅耗时,还容易遗漏。
自动化思路
- 用正则表达式匹配关键信息(错误码、IP、异常关键词);
- 统计分析结果(如 IP 访问次数排序、错误码分布);
完整代码(Nginx 日志错误提取示例)
import re
from collections import Counter
defextract_nginx_errors(log_file: str, output_file: str = "nginx_errors.txt"):
"""从Nginx访问日志中提取404/500错误请求和高频IP"""
# Nginx日志格式(需根据实际日志格式调整正则)
# 示例日志格式:192.168.1.1 - - [20/May/2024:10:00:00 +0800] "GET /index.html HTTP/1.1" 404 150
log_pattern = r'(\d+\.\d+\.\d+\.\d+) - - \[(.*?)\] "(.*?)" (\d{3}) (\d+)'
# 存储结果
error_404 = [] # 404错误请求
error_500 = [] # 500错误请求
all_ips = [] # 所有访问IP
print(f"===== 开始提取Nginx日志错误信息:{log_file} =====")
# 分块读取日志文件(支持超大文件)
withopen(log_file, "r", encoding="utf-8") as f:
for line_num, line inenumerate(f, 1):
match = re.search(log_pattern, line)
ifnotmatch:
continue
ip = match.group(1)
time = match.group(2)
request = match.group(3)
status = match.group(4)
size = match.group(5)
all_ips.append(ip)
# 筛选404和500错误
if status == "404":
error_404.append({
"行号": line_num,
"IP": ip,
"时间": time,
"请求": request,
"状态码": status
})
elif status == "500":
error_500.append({
"行号": line_num,
"IP": ip,
"时间": time,
"请求": request,
"状态码": status
})
# 统计高频IP(前10)
top_ips = Counter(all_ips).most_common(10)
# 保存结果到文件
withopen(output_file, "w", encoding="utf-8") as f:
f.write("=" * 50 + "\n")
f.write("Nginx日志关键信息提取结果\n")
f.write("=" * 50 + "\n\n")
f.write(f"1. 404错误请求共{len(error_404)}条:\n")
for err in error_404:
f.write(f" 行号{err['行号']} | IP:{err['IP']} | 时间:{err['时间']} | 请求:{err['请求']}\n")
f.write(f"\n2. 500错误请求共{len(error_500)}条:\n")
for err in error_500:
f.write(f" 行号{err['行号']} | IP:{err['IP']} | 时间:{err['时间']} | 请求:{err['请求']}\n")
f.write(f"\n3. 高频访问IP(前10):\n")
for ip, count in top_ips:
f.write(f" {ip}: {count}次\n")
print(f"✅提取完成!共发现{len(error_404)}条404错误,{len(error_500)}条500错误")
print(f"结果已保存到{output_file}")
if __name__ == "__main__":
# 替换为你的Nginx日志文件路径
nginx_log_path = "/var/log/nginx/access.log"
extract_nginx_errors(nginx_log_path)
核心说明
- 正则适配:需根据实际 Nginx 日志格式调整
log_pattern 正则表达式,否则无法匹配; - 超大文件支持:采用逐行读取方式,不会加载整个文件到内存,支持几十 MB 甚至 GB 级日志;
- 扩展场景:可修改正则和关键词,用于提取应用日志中的
Error/Exception 异常信息,或数据库日志中的慢查询。
效率对比
- 手动操作:打开超大日志文件,搜索关键词,统计结果,耗时 30+ 分钟;
- 自动化操作:运行代码,等待 1~2 分钟,直接获取结构化错误信息和统计结果。
五、实战案例 4:批量接口数据自动爬取(开发者必备)
应用场景
开发者需要获取第三方平台的批量接口数据(如天气、股票、新闻),或公司内部系统的多页数据(如分页查询用户列表、订单记录)。手动通过 Postman 或浏览器逐个调用接口、复制 JSON 结果、整理成表格,不仅耗时(几百页数据需几小时),还容易因重复操作出错,数据汇总也麻烦。
自动化思路
- 分析接口规律:找到分页接口的参数变化逻辑(如
page=1/page=2/page=3,或 offset=0/offset=20 分页偏移); - 批量请求接口:用
requests 库循环发送 HTTP 请求,自动替换分页参数; - 解析结构化数据:接口返回 JSON 格式数据,用 Python 内置的
json 库解析,提取核心字段; - 数据持久化:将所有页面的结果汇总,保存为 CSV/Excel 文件,便于后续分析或导入数据库。
完整代码(以「批量爬取股票列表数据」为例)
以下以某公开股票接口为例(接口为公开测试接口,无访问限制),实现多页股票数据的自动爬取、解析和保存,代码可直接运行。
import requests
import json
import csv
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
# 公开股票列表接口(分页参数:page,每页20条数据)
STOCK_API = "https://api.example.com/stock/list"# 替换为真实公开接口(示例接口仅作演示)
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json"
}
deffetch_stock_page(page: int) -> list:
"""爬取单页股票数据"""
params = {
"page": page,
"size": 20, # 每页20条数据
"sort": "market_cap", # 按市值排序
"order": "desc"
}
try:
print(f"正在爬取第{page}页股票数据...")
response = requests.get(
url=STOCK_API,
params=params,
headers=HEADERS,
timeout=10,
verify=False
)
response.raise_for_status() # 抛出HTTP错误(如404/500)
# 解析JSON数据
data = response.json()
if data.get("code") != 200:
print(f"第{page}页爬取失败:{data.get('msg', '未知错误')}")
return []
# 提取核心字段(根据接口返回格式调整)
stock_list = data.get("data", {}).get("list", [])
result = []
for stock in stock_list:
stock_info = {
"股票代码": stock.get("code", ""),
"股票名称": stock.get("name", ""),
"最新价格": stock.get("price", 0),
"涨跌幅": stock.get("change_rate", 0),
"市值(亿)": stock.get("market_cap", 0),
"行业": stock.get("industry", ""),
"爬取时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
result.append(stock_info)
print(f"第{page}页爬取成功,获取{len(result)}条股票数据")
return result
except requests.exceptions.HTTPError as e:
print(f"第{page}页HTTP错误:{e.response.status_code}")
return []
except requests.exceptions.Timeout:
print(f"第{page}页请求超时")
return []
except Exception as e:
print(f"第{page}页爬取异常:{str(e)}")
return []
defbatch_fetch_stocks(total_pages: int, thread_num: int = 5) -> list:
"""多线程批量爬取多页股票数据"""
print(f"===== 开始批量爬取股票数据 =====")
print(f"总页数:{total_pages},线程数:{thread_num}\n")
all_stocks = []
# 多线程爬取(线程数不宜过多,避免触发接口限流)
with ThreadPoolExecutor(max_workers=thread_num) as executor:
# 提交所有页面的爬取任务
futures = [executor.submit(fetch_stock_page, page) for page inrange(1, total_pages + 1)]
# 汇总所有结果
for future in futures:
page_data = future.result()
if page_data:
all_stocks.extend(page_data)
print(f"\n===== 爬取完成 =====")
print(f"共爬取{len(all_stocks)}条股票数据")
return all_stocks
defsave_to_csv(data: list, filename: str = "stock_data.csv"):
"""将爬取的数据保存为CSV文件(Excel可直接打开)"""
ifnot data:
print("无数据可保存")
return
# 获取CSV表头(字典的键)
headers = data[0].keys()
withopen(filename, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=headers)
writer.writeheader() # 写入表头
writer.writerows(data) # 写入数据
print(f"✅数据已保存到{filename},可用Excel直接打开")
if __name__ == "__main__":
# 配置爬取参数(爬取前10页数据)
TOTAL_PAGES = 10
# 批量爬取
stock_data = batch_fetch_stocks(TOTAL_PAGES)
# 保存为CSV文件
if stock_data:
save_to_csv(stock_data)
核心说明
1. 接口适配技巧
- 若实际接口需要
POST 请求(如携带 JSON 体),只需修改 requests.post() 并添加 json=payload 参数; - 若接口有鉴权(如
token),在 HEADERS 中添加 Authorization: Bearer your_token 即可; - 核心是先通过 Postman 测试接口,明确参数格式、返回字段后,再在代码中适配。
2. 多线程优化逻辑
- 接口爬取的核心耗时是「网络等待」,多线程能同时发起多个页面的请求,效率提升 3~5 倍;
- 线程数建议设为 5~10(根据接口限流规则调整),过多线程可能触发接口的反爬机制(IP 被临时封禁)。
3. 数据保存优势
- 保存为 CSV 文件兼容性最强,Excel、WPS、Python 的
pandas 库都能直接读取; - 若需要保存为 Excel,可安装
openpyxl 库,使用 pandas.DataFrame(data).to_excel("stock_data.xlsx", index=False) 实现。
扩展场景(直接复用代码)
只需修改 fetch_stock_page 中的「接口地址、参数、字段提取逻辑」,即可适配其他场景:
- 爬取公司内部接口:如分页查询订单数据(
page=1,order_status=1); - 爬取第三方公开数据:如天气接口(多城市批量查询)、新闻接口(多分类爬取);
- 爬取分页网页数据:若接口返回 HTML,可结合
BeautifulSoup 解析页面元素(如表格数据)。
效率对比
- 手动操作:10 页数据,每页 20 条,需手动调用 10 次接口、复制 200 条数据、整理成表格,耗时 30~60 分钟;
- 自动化操作:运行代码,等待 1~2 分钟,直接获取 CSV 格式的结构化数据,无需人工干预。
六、自动化信息收集的核心技巧与避坑指南
1. 通用技巧(提升效率+稳定性)
- 多线程/异步优先:网络请求、文件读取等 IO 密集型任务,优先用多线程(
ThreadPoolExecutor)或异步(aiohttp),效率提升数倍; - 异常捕获要全面:网络波动、接口超时、数据格式异常是常态,需捕获
HTTPError、Timeout、JSONDecodeError 等常见异常,避免程序崩溃; - 添加请求头/延时:模拟浏览器请求(添加
User-Agent),避免被接口识别为爬虫;高频请求时添加 time.sleep(0.5) 延时,防止触发限流; - 结构化数据输出:结果优先保存为 CSV/JSON/Excel,便于后续分析、导入数据库,比纯文本更实用。
2. 常见坑与解决方案
- 坑1:接口返回格式不一致
解决方案:提取字段时用 get() 方法(如 stock.get("name", "")),设置默认值,避免 KeyError; - 坑2:IP 被接口封禁
解决方案:控制线程数、添加请求延时、使用代理 IP 池(复杂场景); - 坑3:超大文件读取内存溢出
解决方案:逐行读取(日志提取案例)或分块读取,不加载整个文件到内存; - 坑4:中文乱码
解决方案:文件读写时指定 encoding="utf-8" 或 utf-8-sig,接口请求时用 response.encoding = response.apparent_encoding 自动识别编码。
3. 合规性提醒
- 自动化爬取/收集数据时,需遵守「robots 协议」和网站的用户协议,仅爬取公开数据或授权数据;
- 禁止对未授权的系统、接口进行扫描或爬取,避免触犯法律或企业规定。
七、总结:自动化让技术人摆脱重复劳动
Python 自动化信息收集的核心价值,不在于代码有多复杂,而在于「用机器替代机械劳动」——把技术人从重复的复制粘贴、命令执行、数据整理中解放出来,将时间投入到更有价值的分析、优化、创新工作中。
本文的实战案例,覆盖了运维、安全、开发的高频场景,代码可直接复用,零基础也能快速上手。其实自动化的门槛并不高,关键是「找到重复劳动的场景,用代码实现步骤替代」:
- 如果你需要重复执行命令 → 用 Python 调用系统命令;
- 如果你需要重复处理数据 → 用正则、JSON 解析批量处理;
从今天开始,试着用 Python 自动化解决一个你工作中的重复场景,你会发现:原来几小时的工作,真的能压缩到几分钟,效率提升 10 倍不止!