#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Linux 系统一键巡检脚本
支持所有主流Linux发行版
"""
import os
import sys
import platform
import socket
import datetime
import subprocess
import json
import re
from typing import Dict, List, Any, Optional
defcheck_os() -> bool:
"""检查是否为Linux系统"""
return os.name == 'posix'and platform.system() == 'Linux'
defrun_command(cmd: str, capture_output: bool = True, shell: bool = True) -> Dict[str, Any]:
"""执行系统命令"""
try:
result = subprocess.run(
cmd,
capture_output=capture_output,
text=True,
shell=shell,
timeout=30
)
return {
'success': result.returncode == 0,
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
defget_uptime() -> Dict[str, Any]:
"""获取系统运行时间"""
info = {}
try:
result = run_command('uptime')
if result['success']:
info['uptime_output'] = result['stdout'].strip()
result = run_command('cat /proc/uptime')
if result['success']:
uptime_seconds = float(result['stdout'].split()[0])
info['uptime_seconds'] = uptime_seconds
info['uptime_days'] = int(uptime_seconds // (24 * 3600))
info['uptime_hours'] = int((uptime_seconds % (24 * 3600)) // 3600)
info['uptime_minutes'] = int((uptime_seconds % 3600) // 60)
except Exception as e:
info['error'] = str(e)
return info
defget_system_info() -> Dict[str, Any]:
"""获取系统基本信息"""
info = {}
info['os_name'] = platform.system()
info['os_version'] = platform.version()
info['os_release'] = platform.release()
info['hostname'] = socket.gethostname()
try:
# 获取IP地址
result = run_command('ip -o -4 addr show | awk "{print $2, $4}"')
if result['success']:
info['ip_addresses'] = result['stdout'].strip().split('\n')
# 获取操作系统详细信息
if os.path.exists('/etc/os-release'):
with open('/etc/os-release', 'r') as f:
for line in f:
if'='in line:
key, value = line.strip().split('=', 1)
info[key.lower()] = value.strip('"')
# 获取内核版本
result = run_command('uname -r')
if result['success']:
info['kernel_version'] = result['stdout'].strip()
# 获取当前登录用户
result = run_command('who')
if result['success']:
info['current_users'] = result['stdout'].strip()
# 获取硬件信息
hardware = {}
# CPU信息
result = run_command('lscpu')
if result['success']:
hardware['cpu_info'] = result['stdout'].strip()
# 内存信息
result = run_command('free -h')
if result['success']:
hardware['memory_info'] = result['stdout'].strip()
# 磁盘信息
result = run_command('lsblk -f')
if result['success']:
hardware['disk_info'] = result['stdout'].strip()
# 网卡信息
result = run_command('lspci | grep -i network')
if result['success']:
hardware['network_cards'] = result['stdout'].strip()
info['hardware'] = hardware
except Exception as e:
info['error'] = str(e)
return info
defget_system_performance() -> Dict[str, Any]:
"""获取系统负载与性能"""
info = {}
try:
# CPU使用率
result = run_command('top -bn1 | grep "%Cpu(s)"')
if result['success']:
info['cpu_usage'] = result['stdout'].strip()
# 负载平均值
result = run_command('uptime')
if result['success']:
load_match = re.search(r'load average:\s*(\d+\.\d+),\s*(\d+\.\d+),\s*(\d+\.\d+)', result['stdout'])
if load_match:
info['load_average'] = {
'1min': load_match.group(1),
'5min': load_match.group(2),
'15min': load_match.group(3)
}
# 内存使用情况
result = run_command('free -h')
if result['success']:
info['memory_usage'] = result['stdout'].strip()
# 磁盘I/O
result = run_command('iostat -x')
if result['success']:
info['disk_io'] = result['stdout'].strip()
# 网络流量
result = run_command('ifstat 1 1')
if result['success']:
info['network_traffic'] = result['stdout'].strip()
# 进程数量
result = run_command('ps aux | wc -l')
if result['success']:
info['process_count'] = int(result['stdout'].strip())
# 僵尸进程
result = run_command('ps aux | grep Z | grep -v grep | wc -l')
if result['success']:
info['zombie_processes'] = int(result['stdout'].strip())
# 系统中断和上下文切换
result = run_command('vmstat 1 1')
if result['success']:
info['system_stats'] = result['stdout'].strip()
except Exception as e:
info['error'] = str(e)
return info
defget_disk_filesystem() -> Dict[str, Any]:
"""获取磁盘与文件系统信息"""
info = {}
try:
# 分区使用率
result = run_command('df -h')
if result['success']:
info['disk_usage'] = result['stdout'].strip()
# inode使用率
result = run_command('df -i')
if result['success']:
info['inode_usage'] = result['stdout'].strip()
# 挂载点信息
result = run_command('mount')
if result['success']:
info['mount_points'] = result['stdout'].strip()
# 磁盘健康状态(需要smartctl)
result = run_command('which smartctl')
if result['success']:
result = run_command('smartctl --scan')
if result['success']:
disks = result['stdout'].strip().split('\n')
smart_info = []
for disk in disks:
if disk:
device = disk.split()[0]
smart_result = run_command(f'smartctl -H {device} 2>/dev/null')
if smart_result['success']:
smart_info.append(f"{device}: {smart_result['stdout'].strip()}")
info['smart_status'] = smart_info
except Exception as e:
info['error'] = str(e)
return info
defget_network_info() -> Dict[str, Any]:
"""获取网络信息"""
info = {}
try:
# 网卡状态
result = run_command('ip link show')
if result['success']:
info['network_interfaces'] = result['stdout'].strip()
# 网络配置
result = run_command('ip addr show')
if result['success']:
info['network_config'] = result['stdout'].strip()
# 网络连通性
connectivity = {}
# Ping网关
result = run_command('ip route | grep default | awk "{print $3}"')
if result['success'] and result['stdout'].strip():
gateway = result['stdout'].strip()
ping_result = run_command(f'ping -c 2 {gateway}')
connectivity['gateway'] = f"{gateway}: {'UP'if ping_result['success'] else'DOWN'}"
# Ping外网
ping_result = run_command('ping -c 2 8.8.8.8')
connectivity['internet'] = f"8.8.8.8: {'UP'if ping_result['success'] else'DOWN'}"
info['connectivity'] = connectivity
# 端口监听
result = run_command('ss -tulnp')
if result['success']:
info['listening_ports'] = result['stdout'].strip()
# 防火墙规则
firewall_info = {}
if run_command('which firewalld').get('success'):
result = run_command('firewall-cmd --list-all')
firewall_info['firewalld'] = result['stdout'].strip()
elif run_command('which iptables').get('success'):
result = run_command('iptables -L')
firewall_info['iptables'] = result['stdout'].strip()
info['firewall'] = firewall_info
# 连接数
result = run_command('ss -s')
if result['success']:
info['connection_stats'] = result['stdout'].strip()
# 主机名解析
result = run_command('nslookup localhost')
if result['success']:
info['dns_resolution'] = result['stdout'].strip()
except Exception as e:
info['error'] = str(e)
return info
defget_services_processes() -> Dict[str, Any]:
"""获取服务与进程信息"""
info = {}
try:
# 关键服务状态
services = ['sshd', 'crond', 'rsyslog', 'ntp', 'nginx', 'mysql', 'httpd', 'tomcat']
service_status = {}
for service in services:
if run_command(f'which {service}').get('success') or run_command(f'systemctl status {service} 2>/dev/null').get('success'):
result = run_command(f'systemctl status {service} 2>/dev/null')
if result['success']:
status = 'running'if'active (running)'in result['stdout'] else'stopped'
service_status[service] = status
info['service_status'] = service_status
# 服务开机自启
result = run_command('systemctl list-unit-files --type=service | grep enabled')
if result['success']:
info['enabled_services'] = result['stdout'].strip()
# 进程CPU/内存占用TOP10
result = run_command('ps aux --sort=-%cpu | head -11')
if result['success']:
info['top_cpu_processes'] = result['stdout'].strip()
result = run_command('ps aux --sort=-%mem | head -11')
if result['success']:
info['top_memory_processes'] = result['stdout'].strip()
# 定时任务
result = run_command('crontab -l 2>/dev/null || echo "No crontab for current user"')
if result['success']:
info['crontab'] = result['stdout'].strip()
# 系统定时任务
if os.path.exists('/etc/crontab'):
with open('/etc/crontab', 'r') as f:
info['system_crontab'] = f.read().strip()
except Exception as e:
info['error'] = str(e)
return info
defget_logs() -> Dict[str, Any]:
"""获取日志信息"""
info = {}
try:
# 系统日志
log_files = [
'/var/log/messages',
'/var/log/syslog',
'/var/log/secure',
'/var/log/auth.log'
]
for log_file in log_files:
if os.path.exists(log_file):
# 检查关键字
result = run_command(f'grep -i "error\|fail\|warning\|panic\|oops" {log_file} | tail -20')
if result['success']:
info[os.path.basename(log_file)] = result['stdout'].strip()
# dmesg
result = run_command('dmesg | tail -30')
if result['success']:
info['dmesg'] = result['stdout'].strip()
except Exception as e:
info['error'] = str(e)
return info
defget_security() -> Dict[str, Any]:
"""获取安全与账号信息"""
info = {}
try:
# 检查root登录限制
result = run_command('grep "PermitRootLogin" /etc/ssh/sshd_config 2>/dev/null')
if result['success']:
info['root_login_restriction'] = result['stdout'].strip()
# 检查空口令
result = run_command('awk -F: ($2 == "") {print $1} /etc/shadow 2>/dev/null')
if result['success']:
info['empty_passwords'] = result['stdout'].strip()
# 检查sudo权限
result = run_command('grep -v "^#" /etc/sudoers 2>/dev/null | grep -v "^$"')
if result['success']:
info['sudo_permissions'] = result['stdout'].strip()
# 历史命令(最近50条)
result = run_command('history | tail -50')
if result['success']:
info['recent_commands'] = result['stdout'].strip()
# SELinux状态
result = run_command('sestatus 2>/dev/null || echo "SELinux not installed"')
if result['success']:
info['selinux_status'] = result['stdout'].strip()
except Exception as e:
info['error'] = str(e)
return info
defget_backup() -> Dict[str, Any]:
"""获取备份与一致性信息"""
info = {}
try:
# 检查备份文件
backup_dirs = ['/backup', '/var/backup', '/home/backup']
backup_files = []
for backup_dir in backup_dirs:
if os.path.exists(backup_dir):
result = run_command(f'ls -la {backup_dir}')
if result['success']:
backup_files.append(f"{backup_dir}:\n{result['stdout'].strip()}")
info['backup_files'] = backup_files
# 检查关键配置文件
config_files = [
'/etc/ssh/sshd_config',
'/etc/sysctl.conf',
'/etc/fstab'
]
config_checks = {}
for config_file in config_files:
if os.path.exists(config_file):
result = run_command(f'md5sum {config_file}')
if result['success']:
config_checks[config_file] = result['stdout'].strip()
info['config_files'] = config_checks
except Exception as e:
info['error'] = str(e)
return info
defget_time_sync() -> Dict[str, Any]:
"""获取时间同步信息"""
info = {}
try:
# 系统时间
result = run_command('date')
if result['success']:
info['system_time'] = result['stdout'].strip()
# NTP服务状态
if run_command('which ntpd').get('success'):
result = run_command('systemctl status ntpd 2>/dev/null || service ntp status')
info['ntp_status'] = result['stdout'].strip()
elif run_command('which chronyd').get('success'):
result = run_command('systemctl status chronyd 2>/dev/null')
info['ntp_status'] = result['stdout'].strip()
# 时间偏差
if run_command('which ntpdate').get('success'):
result = run_command('ntpdate -q pool.ntp.org 2>/dev/null')
info['time_offset'] = result['stdout'].strip()
except Exception as e:
info['error'] = str(e)
return info
defprint_section(title: str, data: Dict[str, Any]):
"""打印章节内容"""
print(f"\n{'=' * 70}")
print(f" {title}")
print('=' * 70)
defprint_dict(d: Dict, indent: int = 0):
for key, value in d.items():
prefix = ' ' * indent
if isinstance(value, dict):
print(f"{prefix}{key}:")
print_dict(value, indent + 2)
elif isinstance(value, list):
print(f"{prefix}{key}: {len(value)} 项")
for i, item in enumerate(value[:5]):
if isinstance(item, dict):
print(f"{prefix} [{i+1}]")
print_dict(item, indent + 4)
else:
print(f"{prefix} [{i+1}] {item}")
if len(value) > 5:
print(f"{prefix} ... 等 {len(value) - 5} 项")
elif isinstance(value, str) and len(value) > 500:
print(f"{prefix}{key}:")
lines = value.split('\n')
for line in lines[:10]:
print(f"{prefix}{line}")
if len(lines) > 10:
print(f"{prefix} ... 等 {len(lines) - 10} 行")
else:
if isinstance(value, str) and'\n'in value:
print(f"{prefix}{key}:")
for line in value.split('\n')[:5]:
print(f"{prefix}{line}")
if value.count('\n') > 4:
print(f"{prefix} ...")
else:
print(f"{prefix}{key}: {value}")
print_dict(data)
defwrite_dict_to_file(f, d: Dict, indent: int = 0):
"""将字典写入文件"""
for key, value in d.items():
prefix = ' ' * indent
if isinstance(value, dict):
f.write(f"{prefix}{key}:\n")
write_dict_to_file(f, value, indent + 2)
elif isinstance(value, list):
f.write(f"{prefix}{key}: {len(value)} 项\n")
for i, item in enumerate(value):
if isinstance(item, dict):
f.write(f"{prefix} [{i+1}]\n")
write_dict_to_file(f, item, indent + 4)
else:
f.write(f"{prefix} [{i+1}] {item}\n")
elif isinstance(value, str):
if len(value) > 1000:
f.write(f"{prefix}{key}:\n")
lines = value.split('\n')
for line in lines[:20]:
f.write(f"{prefix}{line}\n")
if len(lines) > 20:
f.write(f"{prefix} ... 等 {len(lines) - 20} 行\n")
else:
lines = value.split('\n')
if len(lines) > 1:
f.write(f"{prefix}{key}:\n")
for line in lines:
f.write(f"{prefix}{line}\n")
else:
f.write(f"{prefix}{key}: {value}\n")
else:
f.write(f"{prefix}{key}: {value}\n")
defgenerate_report(data: Dict[str, Any], section_titles: Dict[str, str]):
"""生成TXT格式报告文件"""
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
report_file = f"Linux_Inspection_Report_{timestamp}.txt"
with open(report_file, 'w', encoding='utf-8') as f:
# 写入报告标题
f.write("=" * 70 + "\n")
f.write(" Linux 系统一键巡检报告\n")
f.write("=" * 70 + "\n")
f.write(f"报告时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"Python版本: {platform.python_version()}\n")
f.write("=" * 70 + "\n\n")
# 写入各章节内容
for key, title in section_titles.items():
f.write("\n" + "=" * 70 + "\n")
f.write(f" {title}\n")
f.write("=" * 70 + "\n\n")
write_dict_to_file(f, data[key])
f.write("\n" + "=" * 70 + "\n")
f.write(" 巡检完成\n")
f.write("=" * 70 + "\n")
print(f"\n{'=' * 70}")
print(f"巡检报告已保存到: {report_file}")
return report_file
defmain():
"""主函数"""
print("=" * 70)
print(" Linux 系统一键巡检脚本")
print("=" * 70)
print(f"开始巡检: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print()
ifnot check_os():
print("错误: 此脚本仅适用于Linux操作系统")
input("\n按回车键退出...")
return
# 收集巡检数据
inspection_data = {
'system_info': get_system_info(),
'uptime': get_uptime(),
'performance': get_system_performance(),
'disk_filesystem': get_disk_filesystem(),
'network': get_network_info(),
'services_processes': get_services_processes(),
'logs': get_logs(),
'security': get_security(),
'backup': get_backup(),
'time_sync': get_time_sync()
}
section_titles = {
'system_info': '一、基础信息巡检',
'uptime': '系统运行时间',
'performance': '二、系统负载与性能',
'disk_filesystem': '三、磁盘与文件系统',
'network': '四、网络巡检',
'services_processes': '五、服务与进程',
'logs': '六、日志巡检',
'security': '七、安全与账号',
'backup': '八、备份与一致性',
'time_sync': '九、时间同步'
}
# 打印巡检结果
for key, title in section_titles.items():
print_section(title, inspection_data[key])
# 生成报告
generate_report(inspection_data, section_titles)
print("\n" + "=" * 70)
print(f"巡检完成: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 70)
input("\n按回车键退出...")
if __name__ == '__main__':
main()