Linux 服务器巡检不再难,这个 Python 脚本全搞定
在企业级生产环境中,服务器的稳定性和可靠性至关重要。定期对服务器进行巡检,及时发现并解决潜在问题,是保障业务正常运行的关键。本文将介绍如何使用 Python3 编写一个功能完善的 Linux 服务器自动巡检脚本,帮助运维工程师实现自动化监控。
一、脚本功能设计
我们的巡检脚本将包含以下核心功能:
1. 检查项目
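- CPU 状态:检查 CPU 使用率
- 内存状态:检查内存使用率、总内存和可用内存
- 磁盘状态:检查各挂载点的使用率和可用空间
- 进程状态:检查指定进程是否在运行
- 端口状态:检查指定端口是否开放
- 系统运行时间:检查系统启动时间和已运行时长
- 僵尸进程:统计僵尸进程数量
- 网络状态:检查网络接口流量、连接数和网络延迟
- 安全状态:检查登录失败次数、最近登录记录和关键文件权限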
二、脚本实现
1. 环境准备
确保服务器已安装 Python 3.6+ 及以下依赖:
安装依赖包
对于 CentOS/RHEL/Rocky 系统:
## yum 或者 dnf 安装
yum install -y python3-pip
pip3 install psutil requests -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
对于 Ubuntu/Debian 系统:
apt-get install -y python3-pip
pip3 install psutil requests -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
使用清华镜像源可以提高安装速度,避免网络问题导致的安装失败。
三、脚本使用指南
1. 配置说明
- 进程检查:在脚本开头的 CHECK_PROCESSES 列表中添加需要监控的进程名称
- 端口检查:在脚本开头的 CHECK_PORTS 列表中添加需要监控的端口号
- 告警配置:在脚本开头的 ALERT_CONFIG 字典中配置 type(选择告警类型,支持 dingtalk(钉钉)或 wecom(企业微信))和 webhook(对应机器人的 webhook 地址)
2. 运行方式
# 直接运行(脚本使用了 f-string,必须用 Python 3 解释器)
python3 server_monitor.py
# 添加到定时任务(每小时执行一次)
crontab -e
0 * * * * python3 /path/to/server_monitor.py
3. 告警设置
钉钉告警设置
- 打开钉钉群,点击「群设置」→「智能群助手」→「添加机器人」
- 选择「自定义」机器人,填写名称并获取 webhook 地址
- 将 webhook 地址填入脚本开头 ALERT_CONFIG 字典的 webhook 字段中
企业微信告警设置
- 登录企业微信后台,进入「应用管理」→「自建」→「创建应用」
- 获取应用的 AgentId、CorpId 和 Secret
- 使用企业微信机器人 webhook 或 API 发送消息(本文脚本只使用 webhook 方式:将 ALERT_CONFIG 的 type 设为 wecom,并在 webhook 字段填入机器人的 webhook 地址)
四、生产环境优化建议
- 权限管理:脚本应使用非 root 用户运行,避免权限过高带来的安全风险
- 错误处理:增强脚本的错误处理能力,确保脚本稳定运行
- 监控频率:根据服务器重要性调整巡检频率,核心服务器可设置更频繁的检查
- 阈值调整:根据服务器配置和业务需求,调整各项指标的告警阈值
- 报告归档:定期归档巡检报告,便于历史数据查询和分析
完整脚本代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Linux 服务器自动巡检脚本
功能:检查 CPU、内存、磁盘、进程、网络端口状态,并生成报告和发送告警
"""
import json
import os
import socket
import subprocess
import time
from datetime import datetime

import psutil
import requests
# Configuration

# Alert channel: 'type' selects the sender ('dingtalk' or 'wecom') and
# 'webhook' is the robot URL the alert text is posted to.
ALERT_CONFIG = dict(
    type='dingtalk',
    webhook='https://oapi.dingtalk.com/robot/send?access_token=your_token',
)

# Process names and TCP ports that are expected to be up on this host.
CHECK_PROCESSES = [
    'sshd',
    'nginx',
    'mysql',
    'redis',
]
CHECK_PORTS = [
    22,
    80,
    443,
    3306,
    6379,
]
class ServerMonitor:
    """Run health checks on the local Linux host and report the findings.

    Every ``check_*`` method stores its result in ``self.results`` under a
    fixed key; ``generate_report`` and ``run`` read those entries back, so
    the checks have to run before a report is generated.
    """

    # Log files scanned for failed password attempts; files that are missing
    # or unreadable are skipped.
    AUTH_LOG_FILES = ('/var/log/auth.log', '/var/log/secure')
    # Seconds to wait for a webhook endpoint before giving up.
    WEBHOOK_TIMEOUT = 10
    # Seconds an external command (ping, last) may run before it is abandoned.
    COMMAND_TIMEOUT = 5
    # Latency reported when ping fails or its output cannot be parsed.
    PING_FAILED = 9999

    def __init__(self):
        """Record the host name and primary IP and start with empty results."""
        self.hostname = socket.gethostname()
        self.ip = self.get_local_ip()
        self.results = {}

    @staticmethod
    def _level(is_ok):
        """Map a boolean health flag to the status label used in results."""
        return "正常" if is_ok else "警告"

    @staticmethod
    def _format_gb(num_bytes):
        """Format a byte count as gigabytes with two decimals."""
        return f"{num_bytes / 1024 ** 3:.2f}GB"

    def get_local_ip(self):
        """Return the local address used to reach 8.8.8.8.

        Returns:
            The source IPv4 address of a UDP socket connected to 8.8.8.8:80,
            or ``'127.0.0.1'`` when the socket cannot be created or connected.
        """
        try:
            # The context manager closes the socket even if connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                sock.connect(('8.8.8.8', 80))
                return sock.getsockname()[0]
        except OSError:
            return '127.0.0.1'

    def check_cpu(self):
        """Sample CPU usage over one second and store it under ``'cpu'``.

        Returns:
            The usage percentage; 80 or more is flagged as a warning.
        """
        cpu_percent = psutil.cpu_percent(interval=1)
        self.results['cpu'] = {
            '使用率': f"{cpu_percent}%",
            '状态': self._level(cpu_percent < 80),
        }
        return cpu_percent

    def check_memory(self):
        """Store virtual-memory usage under ``'内存'``.

        Returns:
            The usage percentage; 80 or more is flagged as a warning.
        """
        mem = psutil.virtual_memory()
        mem_percent = mem.percent
        self.results['内存'] = {
            '使用率': f"{mem_percent}%",
            '总内存': self._format_gb(mem.total),
            '可用内存': self._format_gb(mem.available),
            '状态': self._level(mem_percent < 80),
        }
        return mem_percent

    def check_disk(self):
        """Store usage of every partition that reports a filesystem type.

        Returns:
            A list with one dict per partition (also stored under ``'磁盘'``);
            usage of 85 percent or more is flagged as a warning.
        """
        disk_info = []
        for partition in psutil.disk_partitions():
            if not partition.fstype:
                continue
            try:
                usage = psutil.disk_usage(partition.mountpoint)
            except OSError:
                # Best effort: a mount point that cannot be queried is left
                # out of the report instead of aborting the whole inspection.
                continue
            disk_info.append({
                '挂载点': partition.mountpoint,
                '文件系统': partition.fstype,
                '使用率': f"{usage.percent}%",
                '总空间': self._format_gb(usage.total),
                '可用空间': self._format_gb(usage.free),
                '状态': self._level(usage.percent < 85),
            })
        self.results['磁盘'] = disk_info
        return disk_info

    def check_processes(self, process_names):
        """Check whether each wanted name occurs in a running process name.

        Matching is a case-insensitive substring test, so ``'mysql'`` also
        matches a process called ``mysqld``.

        Args:
            process_names: Iterable of process names to look for.

        Returns:
            A list of ``{'进程名', '状态'}`` dicts (also stored under ``'进程'``).
        """
        # Snapshot the process table once instead of once per wanted name.
        # A process whose name is unavailable is treated as an empty name.
        running = [
            (proc.info.get('name') or '').lower()
            for proc in psutil.process_iter(['name'])
        ]
        process_status = []
        for name in process_names:
            wanted = name.lower()
            found = any(wanted in candidate for candidate in running)
            process_status.append({
                '进程名': name,
                '状态': "运行中" if found else "未运行",
            })
        self.results['进程'] = process_status
        return process_status

    @staticmethod
    def _can_connect(host, port, timeout=2):
        """Return True when a TCP connection to ``host:port`` succeeds."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                return sock.connect_ex((host, port)) == 0
        except (OSError, OverflowError):
            # OverflowError: port number outside the valid range.
            return False

    def check_ports(self, ports):
        """Check which TCP ports accept connections on this host.

        A port counts as open when it is reachable on ``self.ip`` or on the
        loopback address, so services that listen only on 127.0.0.1 are not
        reported as closed.

        Args:
            ports: Iterable of TCP port numbers.

        Returns:
            A list of ``{'端口', '状态'}`` dicts (also stored under ``'端口'``).
        """
        hosts = [self.ip]
        if '127.0.0.1' not in hosts:
            hosts.append('127.0.0.1')
        port_status = []
        for port in ports:
            is_open = any(self._can_connect(host, port) for host in hosts)
            port_status.append({
                '端口': port,
                '状态': "开放" if is_open else "关闭",
            })
        self.results['端口'] = port_status
        return port_status

    def check_uptime(self):
        """Store boot time and elapsed uptime under ``'系统启动时间'``.

        Returns:
            The uptime formatted as days, hours and minutes.
        """
        boot_timestamp = psutil.boot_time()
        uptime_seconds = time.time() - boot_timestamp
        days = int(uptime_seconds // 86400)
        hours = int((uptime_seconds % 86400) // 3600)
        minutes = int((uptime_seconds % 3600) // 60)
        uptime_str = f"{days}天{hours}小时{minutes}分钟"
        boot_time = datetime.fromtimestamp(boot_timestamp).strftime("%Y-%m-%d %H:%M:%S")
        self.results['系统启动时间'] = {
            '运行时间': uptime_str,
            '启动时间': boot_time,
        }
        return uptime_str

    def check_zombie_processes(self):
        """Count zombie processes and store the result under ``'僵尸进程'``.

        Returns:
            The number of zombies; any non-zero count is flagged as a warning.
        """
        zombie_count = sum(
            1
            for proc in psutil.process_iter(['status'])
            if proc.info.get('status') == psutil.STATUS_ZOMBIE
        )
        self.results['僵尸进程'] = {
            '数量': zombie_count,
            '状态': self._level(zombie_count == 0),
        }
        return zombie_count

    def check_network_status(self):
        """Store interface traffic, connection count and ping latency.

        Traffic is read per interface; an interface without counters is
        reported with zero traffic. The ``lo`` interface is skipped. Latency
        of 100 ms or more (including a failed ping) is flagged as a warning.

        Returns:
            The list of per-interface dicts, also stored inside ``'网络状态'``.
        """
        # pernic=True yields one counter set per interface; the host-wide
        # total would make every interface show identical numbers.
        per_nic = psutil.net_io_counters(pernic=True)
        network_info = []
        for interface in psutil.net_if_addrs():
            if interface == 'lo':
                continue
            counters = per_nic.get(interface)
            sent_mb = counters.bytes_sent / 1024 / 1024 if counters else 0.0
            recv_mb = counters.bytes_recv / 1024 / 1024 if counters else 0.0
            network_info.append({
                '接口': interface,
                '发送流量': f"{sent_mb:.2f} MB",
                '接收流量': f"{recv_mb:.2f} MB",
            })
        connection_count = len(psutil.net_connections())
        ping_result = self.ping_test('8.8.8.8')
        self.results['网络状态'] = {
            '接口信息': network_info,
            '连接数': connection_count,
            '网络延迟': f"{ping_result} ms",
            '状态': self._level(ping_result < 100),
        }
        return network_info

    def ping_test(self, host):
        """Send a single ping and return the round-trip time in milliseconds.

        Args:
            host: Host name or address passed to the ``ping`` command.

        Returns:
            The value after ``time=`` in the ping output as a float, or 9999
            when the command fails, times out, or prints no parsable latency.
        """
        try:
            output = subprocess.check_output(
                ['ping', '-c', '1', host],
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                # Bound the wait so an unreachable host cannot stall the run.
                timeout=self.COMMAND_TIMEOUT,
            )
        except (subprocess.SubprocessError, OSError):
            return self.PING_FAILED
        for line in output.splitlines():
            if 'time=' in line:
                try:
                    return float(line.split('time=')[1].split(' ')[0])
                except ValueError:
                    return self.PING_FAILED
        return self.PING_FAILED

    def check_security(self):
        """Store failed logins, recent logins and file-permission findings.

        ``/etc/shadow`` and ``/etc/sudoers`` are flagged when any permission
        bit is set for "other" users. The status is a warning when there are
        five or more failed logins or at least one permission finding.

        Returns:
            The status label, also stored inside ``'安全状态'``.
        """
        failed_login_count, failed_users = self.get_failed_login_count()
        recent_logins = self.get_recent_logins()
        permission_issues = []
        for file_path in ('/etc/shadow', '/etc/sudoers'):
            try:
                mode = os.stat(file_path).st_mode
            except OSError:
                # A missing or inaccessible file is not checked.
                continue
            # 0o007 masks the read/write/execute bits of "other".
            if mode & 0o007:
                permission_issues.append(f"{file_path} 权限过于宽松")
        status = self._level(failed_login_count < 5 and not permission_issues)
        self.results['安全状态'] = {
            '登录失败次数': failed_login_count,
            '登录失败的用户': failed_users,
            '最近登录': recent_logins,
            '权限问题': permission_issues,
            '状态': status,
        }
        return status

    @staticmethod
    def _extract_failed_user(line):
        """Return the user name from a ``Failed password for ...`` log line.

        Handles both ``for <user> from`` and ``for invalid user <user> from``;
        returns None when the line does not have that shape.
        """
        _, marker, tail = line.partition('Failed password for ')
        if not marker:
            return None
        prefix = 'invalid user '
        if tail.startswith(prefix):
            tail = tail[len(prefix):]
        user, separator, _ = tail.partition(' from ')
        user = user.strip()
        return user if separator and user else None

    def get_failed_login_count(self, log_files=None):
        """Count ``Failed password`` log lines and collect the affected users.

        Args:
            log_files: Paths to scan; defaults to ``AUTH_LOG_FILES``.

        Returns:
            ``(count, users)`` where ``users`` lists each user name once, in
            order of first appearance. Unreadable files contribute nothing.
        """
        if log_files is None:
            log_files = self.AUTH_LOG_FILES
        failed_count = 0
        failed_users = []
        seen = set()
        for log_file in log_files:
            try:
                # Read the file directly rather than spawning grep;
                # errors='replace' keeps undecodable bytes from aborting.
                with open(log_file, encoding='utf-8', errors='replace') as handle:
                    for line in handle:
                        if 'Failed password' not in line:
                            continue
                        failed_count += 1
                        user = self._extract_failed_user(line)
                        if user and user not in seen:
                            seen.add(user)
                            failed_users.append(user)
            except OSError:
                # Missing log, or no permission to read it: skip this file.
                continue
        return failed_count, failed_users

    def get_recent_logins(self):
        """Return the stripped output of ``last -n 5``.

        Returns:
            The command output, or a fixed fallback message when the command
            is unavailable, fails, or times out.
        """
        try:
            output = subprocess.check_output(
                ['last', '-n', '5'],
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                timeout=self.COMMAND_TIMEOUT,
            )
        except (subprocess.SubprocessError, OSError):
            return "无法获取登录记录"
        return output.strip()

    def generate_report(self, report_dir=None):
        """Render the collected results as text and save them to a file.

        Args:
            report_dir: Directory for the report file; defaults to the
                current working directory.

        Returns:
            ``(report, report_file)``: the report text and the path written.

        Raises:
            KeyError: If a check has not stored its entry in ``self.results``.
            OSError: If the report file cannot be written.
        """
        results = self.results
        now = datetime.now()
        separator = "==========================================="
        lines = [
            separator,
            "服务器巡检报告",
            separator,
            f"主机名: {self.hostname}",
            f"IP地址: {self.ip}",
            f"巡检时间: {now.strftime('%Y-%m-%d %H:%M:%S')}",
            "【CPU 状态】",
            f"使用率: {results['cpu']['使用率']}",
            f"状态: {results['cpu']['状态']}",
            "【内存状态】",
            f"使用率: {results['内存']['使用率']}",
            f"总内存: {results['内存']['总内存']}",
            f"可用内存: {results['内存']['可用内存']}",
            f"状态: {results['内存']['状态']}",
            "【磁盘状态】",
        ]
        for disk in results['磁盘']:
            lines += [
                f"挂载点: {disk['挂载点']}",
                f"文件系统: {disk['文件系统']}",
                f"使用率: {disk['使用率']}",
                f"总空间: {disk['总空间']}",
                f"可用空间: {disk['可用空间']}",
                f"状态: {disk['状态']}",
                "",
            ]
        lines.append("【进程状态】")
        for process in results['进程']:
            lines += [f"进程名: {process['进程名']}", f"状态: {process['状态']}"]
        lines += ["", "【端口状态】"]
        for port in results['端口']:
            lines += [f"端口: {port['端口']}", f"状态: {port['状态']}"]
        uptime = results['系统启动时间']
        zombies = results['僵尸进程']
        network = results['网络状态']
        security = results['安全状态']
        lines += [
            "",
            "【系统启动时间】",
            f"运行时间: {uptime['运行时间']}",
            f"启动时间: {uptime['启动时间']}",
            "",
            "【僵尸进程】",
            f"数量: {zombies['数量']}",
            f"状态: {zombies['状态']}",
            "",
            "【网络状态】",
            f"连接数: {network['连接数']}",
            f"网络延迟: {network['网络延迟']}",
            f"状态: {network['状态']}",
            "接口信息:",
        ]
        for interface in network['接口信息']:
            lines.append(
                f" - {interface['接口']}: 发送 {interface['发送流量']}, 接收 {interface['接收流量']}"
            )
        lines += ["", "【安全状态】", f"登录失败次数: {security['登录失败次数']}"]
        if security['登录失败的用户']:
            lines.append(f"登录失败的用户: {', '.join(security['登录失败的用户'])}")
        lines.append(f"状态: {security['状态']}")
        if security['权限问题']:
            lines.append("权限问题:")
            lines += [f" - {issue}" for issue in security['权限问题']]
        lines += ["最近登录:", f"{security['最近登录']}", separator]
        report = "\n".join(lines)

        file_name = f"server_monitor_{now.strftime('%Y%m%d_%H%M%S')}.txt"
        report_file = os.path.join(report_dir, file_name) if report_dir else file_name
        # Explicit UTF-8: the report contains Chinese text, and the locale
        # default encoding (for example under cron) may be unable to encode it.
        with open(report_file, 'w', encoding='utf-8') as handle:
            handle.write(report)
        return report, report_file

    def _post_text_message(self, webhook, report):
        """POST a text alert to a robot webhook and report whether it worked.

        Returns False on a transport error, a non-200 status, or a JSON body
        whose ``errcode`` is present and non-zero. A 200 response without a
        JSON object body is treated as success.
        """
        message = {
            "msgtype": "text",
            "text": {"content": f"服务器巡检异常\n{report}"},
        }
        try:
            # Without a timeout an unresponsive endpoint would block forever.
            response = requests.post(webhook, json=message, timeout=self.WEBHOOK_TIMEOUT)
        except requests.RequestException:
            return False
        if response.status_code != 200:
            return False
        try:
            payload = response.json()
        except ValueError:
            return True
        if isinstance(payload, dict):
            return payload.get('errcode', 0) == 0
        return True

    def send_dingtalk_alert(self, webhook, report):
        """Send ``report`` to a DingTalk robot webhook; return True on success."""
        return self._post_text_message(webhook, report)

    def send_wecom_alert(self, webhook, report):
        """Send ``report`` to a WeCom robot webhook; return True on success."""
        return self._post_text_message(webhook, report)

    def _collect_alerts(self):
        """Return one alert line per abnormal entry found in ``self.results``."""
        results = self.results
        alerts = []
        if results['cpu']['状态'] == "警告":
            alerts.append(f"CPU 使用率异常: {results['cpu']['使用率']}")
        if results['内存']['状态'] == "警告":
            alerts.append(f"内存使用率异常: {results['内存']['使用率']}")
        for disk in results['磁盘']:
            if disk['状态'] == "警告":
                alerts.append(f"磁盘 {disk['挂载点']} 使用率异常: {disk['使用率']}")
        for process in results['进程']:
            if process['状态'] == "未运行":
                alerts.append(f"进程 {process['进程名']} 未运行")
        for port in results['端口']:
            if port['状态'] == "关闭":
                alerts.append(f"端口 {port['端口']} 未开放")
        if results['僵尸进程']['状态'] == "警告":
            alerts.append(f"僵尸进程数量异常: {results['僵尸进程']['数量']}")
        if results['网络状态']['状态'] == "警告":
            alerts.append(f"网络状态异常: 延迟 {results['网络状态']['网络延迟']}")
        security = results['安全状态']
        if security['状态'] == "警告":
            alerts.append(f"安全状态异常: 登录失败 {security['登录失败次数']} 次")
            if security['登录失败的用户']:
                alerts.append(f" - 登录失败的用户: {', '.join(security['登录失败的用户'])}")
            alerts.extend(f" - {issue}" for issue in security['权限问题'])
        return alerts

    def run(self, process_names=None, ports=None, alert_config=None):
        """Run every check, print and save the report, and send an alert.

        Args:
            process_names: Process names to look for; defaults to
                ``['sshd', 'nginx', 'mysql']``.
            ports: TCP ports to probe; defaults to ``[22, 80, 443]``.
            alert_config: Optional dict with ``'type'`` (``'dingtalk'`` or
                ``'wecom'``) and ``'webhook'``. No alert is sent when it is
                missing, has no webhook, or names an unknown type.

        Returns:
            True when at least one check reported an abnormal state.
        """
        if process_names is None:
            process_names = ['sshd', 'nginx', 'mysql']
        if ports is None:
            ports = [22, 80, 443]

        self.check_cpu()
        self.check_memory()
        self.check_disk()
        self.check_processes(process_names)
        self.check_ports(ports)
        self.check_uptime()
        self.check_zombie_processes()
        self.check_network_status()
        self.check_security()

        report, report_file = self.generate_report()
        print(report)
        print(f"报告已保存至: {report_file}")

        alerts = self._collect_alerts()
        has_alert = bool(alerts)
        alert_message = "".join(f"{line}\n" for line in alerts)

        if has_alert and alert_config and alert_config.get('webhook'):
            senders = {
                'dingtalk': (self.send_dingtalk_alert, '钉钉'),
                'wecom': (self.send_wecom_alert, '企业微信'),
            }
            sender = senders.get(alert_config.get('type'))
            if sender is not None:
                send, label = sender
                success = send(alert_config['webhook'], alert_message)
                print(f"{label}告警发送{'成功' if success else '失败'}")
        return has_alert
if __name__ == '__main__':
    # Run one inspection pass using the module-level configuration.
    monitor = ServerMonitor()
    monitor.run(
        process_names=CHECK_PROCESSES,
        ports=CHECK_PORTS,
        alert_config=ALERT_CONFIG,
    )
五、最后
通过本文介绍的 Python 脚本,基本可以实现 Linux 服务器的自动化巡检,及时发现并解决潜在问题。脚本不仅包含了基础的系统指标检查,还提供了自动生成报告和发送告警的功能,大大提高了运维效率。