
With its design philosophy of "elegant, explicit, simple", Python is an excellent weapon for improving ops efficiency. It is more than a "glue language" that connects different systems; it is a core tool for building a stable, efficient, and scalable automated operations system. This article distills 20 Python scripts that cover the full range of automation scenarios, from basic inspections to intelligent alerting, and that address about 80% of repetitive day-to-day ops work. They can be adapted to your actual environment before being put to use.
Previous posts >>>
15 practical Python scripts for automating Jenkins management and boosting efficiency
App2Docker: how to build container images without writing a Dockerfile
Automatically parse Nginx configuration with Python and export it to an Excel file for easier Nginx management
Scenario: a new server comes online, or a new team member sets up a development environment, and a unified, compliant Python runtime must be built quickly.
What it does: automatically checks and installs the required Python version, the necessary system tools, and all third-party libraries the project depends on.
#!/usr/bin/env python3
# init_env.py
import subprocess
import sys

REQUIRED_PYTHON = (3, 8)
REQUIRED_PACKAGES = ['paramiko', 'requests', 'psutil', 'schedule', 'pymysql', 'redis']

def check_python():
    version = sys.version_info[:2]
    if version < REQUIRED_PYTHON:
        print(f"Error: Python {REQUIRED_PYTHON[0]}.{REQUIRED_PYTHON[1]}+ is required, found {version[0]}.{version[1]}")
        sys.exit(1)
    print(f"✓ Python version: {sys.version}")

def install_packages():
    for pkg in REQUIRED_PACKAGES:
        try:
            subprocess.run([sys.executable, '-m', 'pip', 'install', '-q', pkg], check=True)
            print(f"✓ Installed/updated: {pkg}")
        except subprocess.CalledProcessError:
            print(f"✗ Failed to install: {pkg}")
            sys.exit(1)

if __name__ == '__main__':
    check_python()
    install_packages()
    print("Environment initialization complete!")
Scenario: every automation script needs unified, standardized logging to support efficient troubleshooting, auditing, and operational analysis.
What it does: creates a logging configuration module with log rotation, multi-level output, and a standardized format, serving as the logging foundation for all the other scripts.
# logging_config.py
import logging
import os
from logging.handlers import RotatingFileHandler

def setup_logger(name='ops_auto', log_file='/var/log/ops/automation.log', level=logging.INFO):
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Avoid adding duplicate handlers
    if logger.hasHandlers():
        return logger

    # Console output
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)

    # File output (with rotation); make sure the log directory exists first
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    file_handler = RotatingFileHandler(
        log_file, maxBytes=10*1024*1024, backupCount=5
    )
    file_handler.setLevel(logging.DEBUG)

    # Formatting
    formatter = logging.Formatter('%(asctime)s | %(name)s | %(levelname)-8s | %(message)s')
    console.setFormatter(formatter)
    file_handler.setFormatter(formatter)

    logger.addHandler(console)
    logger.addHandler(file_handler)
    return logger
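A minimal usage sketch of this module from any other script in the collection (the logger name and file path here are arbitrary examples):

# usage sketch, assuming logging_config.py is on the import path
from logging_config import setup_logger

logger = setup_logger('disk_check', log_file='/var/log/ops/disk_check.log')
logger.info("Inspection started")
logger.warning("Disk usage above 85% on /data")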
Scenario: multiple automation scripts need to read shared configuration such as database connection strings, server lists, and API keys.
What it does: supports configuration files in YAML, JSON, and similar formats, offers hot-reload capability, and lets sensitive values be overridden from environment variables or stored encrypted, improving security.
# config_loader.py
import yaml
import json
import os
from pathlib import Path

class Config:
    _instance = None

    def __new__(cls, config_path='config.yml'):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.config_path = Path(config_path)
            cls._instance._load()
        return cls._instance

    def _load(self):
        ext = self.config_path.suffix.lower()
        with open(self.config_path, 'r') as f:
            if ext in ('.yml', '.yaml'):
                self._config = yaml.safe_load(f)
            elif ext == '.json':
                self._config = json.load(f)
            else:
                raise ValueError(f"Unsupported config file format: {ext}")
        # Example: sensitive values can be overridden from environment variables
        if 'mysql' in self._config:
            self._config['mysql']['password'] = os.getenv('DB_PASS', self._config['mysql'].get('password', ''))

    def get(self, key, default=None):
        return self._config.get(key, default)

# Usage example:
# cfg = Config().get('servers')
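The description mentions hot reload, which the singleton above does not actually implement. A minimal sketch of one way to add it, assuming a reload is acceptable whenever the file's modification time changes (the ReloadableConfig class is illustrative, not part of the original module):

# config_reload.py -- illustrative sketch of hot reload based on file mtime
from pathlib import Path
import yaml

class ReloadableConfig:
    def __init__(self, config_path='config.yml'):
        self.config_path = Path(config_path)
        self._mtime = 0.0
        self._config = {}
        self.reload_if_changed()

    def reload_if_changed(self):
        """Re-read the YAML file only when its modification time changes."""
        mtime = self.config_path.stat().st_mtime
        if mtime != self._mtime:
            with open(self.config_path, 'r') as f:
                self._config = yaml.safe_load(f) or {}
            self._mtime = mtime

    def get(self, key, default=None):
        self.reload_if_changed()  # cheap stat() call on every read
        return self._config.get(key, default)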
Scenario: run standardized commands simultaneously across a cluster of dozens or hundreds of servers, for example system updates (apt update), service status checks, or information gathering.
What it does: built on paramiko, it supports SSH key authentication, adjustable concurrency, execution timeouts, and aggregates each node's results into a structured, formatted summary.
Full code example (batch_ssh_executor.py):
#!/usr/bin/env python3
"""Concurrent SSH command executor.
Features: key-based authentication, concurrency control, timeouts, result aggregation and formatted output.
"""
import concurrent.futures
import paramiko
from typing import List, Dict, Tuple
import logging
from logging_config import setup_logger

logger = setup_logger('ssh_executor')

class SSHExecutor:
    def __init__(self, timeout=30, max_workers=10):
        self.timeout = timeout
        self.max_workers = max_workers

    def _execute_single(self, host: str, username: str, key_path: str, command: str) -> Tuple[str, str, bool]:
        """Execute a command on a single host."""
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(hostname=host, username=username, key_filename=key_path, timeout=self.timeout)
            stdin, stdout, stderr = client.exec_command(command, timeout=self.timeout)
            exit_code = stdout.channel.recv_exit_status()
            output = stdout.read().decode('utf-8', errors='ignore').strip()
            error = stderr.read().decode('utf-8', errors='ignore').strip()
            success = (exit_code == 0)
            if not success:
                logger.warning(f"Host {host} command failed (code:{exit_code}): {error}")
            return output, error, success
        except Exception as e:
            logger.error(f"Connection/execution failed on {host}: {e}")
            return "", str(e), False
        finally:
            client.close()

    def run(self, hosts: List[str], username: str, key_path: str, command: str) -> Dict[str, Dict]:
        """Run a command concurrently and return aggregated results."""
        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_host = {
                executor.submit(self._execute_single, host, username, key_path, command): host
                for host in hosts
            }
            for future in concurrent.futures.as_completed(future_to_host):
                host = future_to_host[future]
                try:
                    output, error, success = future.result()
                    results[host] = {
                        'success': success,
                        'output': output,
                        'error': error
                    }
                except Exception as e:
                    results[host] = {
                        'success': False,
                        'output': '',
                        'error': f'Execution exception: {e}'
                    }
        return results

# Usage example
if __name__ == '__main__':
    executor = SSHExecutor()
    hosts = ['192.168.1.10', '192.168.1.11', '192.168.1.12']
    results = executor.run(
        hosts=hosts,
        username='ops',
        key_path='/home/ops/.ssh/id_rsa',
        command='uptime && df -h | grep -E "/($|/data)"'
    )
    # Print a formatted report
    for host, data in results.items():
        status = "OK" if data['success'] else "FAILED"
        print(f"{host}: {status}")
        if data['output']:
            print(f"  output: {data['output'][:200]}...")  # truncate long output
Scenario: synchronize shared application configuration files, scripts, certificates, or static assets to a large fleet of servers safely and efficiently, especially for configuration management across geo-distributed clusters.
What it does: performs incremental synchronization, verifies file consistency with MD5 checks, supports resumable transfers, and accepts configurable exclusion rules (e.g. temporary files, version-control directories).
Core code snippet (file_sync.py):
import hashlib
import os
import paramiko
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

class IntelligentFileSync:
    def __init__(self, local_base, remote_base, exclude_patterns=None):
        self.local_base = Path(local_base)
        self.remote_base = remote_base
        self.exclude_patterns = exclude_patterns or ['.tmp', '.git', '__pycache__']

    def _calculate_md5(self, filepath: Path) -> str:
        """Compute a file's MD5, used for consistency checks."""
        hash_md5 = hashlib.md5()
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _should_exclude(self, filepath: Path) -> bool:
        """Return True if the file matches an exclusion pattern."""
        for pattern in self.exclude_patterns:
            if pattern in str(filepath):
                return True
        return False

    def _sync_to_host(self, host: str, username: str, key_path: str):
        """Synchronize files to a single host (core logic)."""
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(hostname=host, username=username, key_filename=key_path)
            sftp = ssh.open_sftp()
            # Walk the local directory tree
            for local_file in self.local_base.rglob('*'):
                if local_file.is_file() and not self._should_exclude(local_file):
                    relative_path = local_file.relative_to(self.local_base)
                    remote_file = f"{self.remote_base}/{relative_path}"
                    try:
                        # Check whether the remote file exists and whether it differs
                        remote_attr = sftp.stat(str(remote_file))
                        local_md5 = self._calculate_md5(local_file)
                        # An MD5 comparison could be added here (it needs the remote MD5,
                        # e.g. via a remote md5sum command); fall back to a size check for now.
                        if remote_attr.st_size != local_file.stat().st_size:
                            self._upload_file(sftp, local_file, remote_file)
                    except FileNotFoundError:
                        # Remote file does not exist: upload it directly
                        self._upload_file(sftp, local_file, remote_file)
            sftp.close()
            print(f"[{host}] sync finished")
        except Exception as e:
            print(f"[{host}] sync failed: {e}")
        finally:
            ssh.close()

    def _upload_file(self, sftp, local_path, remote_path):
        """Upload a single file; can be extended to support resumable transfers."""
        # Make sure the remote directory exists
        remote_dir = os.path.dirname(remote_path)
        try:
            sftp.mkdir(remote_dir)
        except IOError:
            pass  # directory probably already exists
        sftp.put(str(local_path), remote_path)
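The snippet above notes where the MD5 comparison would go but only compares sizes. A minimal sketch of the remote half of that check, assuming the target hosts have the standard md5sum command available (the helper name is an assumption for illustration):

# remote_md5 helper -- illustrative sketch, assumes `md5sum` exists on the target hosts
def remote_md5(ssh, remote_path: str) -> str:
    """Compute the MD5 of a remote file over an existing paramiko.SSHClient connection."""
    stdin, stdout, stderr = ssh.exec_command(f"md5sum '{remote_path}' 2>/dev/null")
    line = stdout.read().decode('utf-8', errors='ignore').strip()
    return line.split()[0] if line else ""

# Inside _sync_to_host, the size comparison could then become:
#     if remote_md5(ssh, remote_file) != self._calculate_md5(local_file):
#         self._upload_file(sftp, local_file, remote_file)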
Scenario: after a new server comes online, deploy and configure the base software stack in a standardized, automated way, for example installing Nginx, configuring firewall rules, and deploying monitoring agents.
#!/usr/bin/env python3
# deploy_stack.py
from batch_ssh_executor import SSHExecutor

def deploy_nginx(host):
    commands = [
        'apt update && apt install -y nginx',
        'systemctl enable nginx',
        'cp /tmp/nginx-site.conf /etc/nginx/sites-available/myapp',
        'ln -sf /etc/nginx/sites-available/myapp /etc/nginx/sites-enabled/',
        'nginx -t && systemctl reload nginx'
    ]
    # Run the command list via the SSHExecutor from script 4
    # (username/key_path below are placeholders; adjust to your environment)
    executor = SSHExecutor()
    results = executor.run(hosts=[host], username='ops',
                           key_path='/home/ops/.ssh/id_rsa',
                           command=' && '.join(commands))
    return results[host]['success']
Scenario: automatically generate a daily or weekly health report for the server cluster covering CPU, memory, disk, key processes, and other core metrics, enabling trend analysis and early warning.
# system_inspector.py
import psutil

def generate_inspection_report():
    report_lines = []
    # Check disks
    for part in psutil.disk_partitions():
        usage = psutil.disk_usage(part.mountpoint)
        if usage.percent > 85:
            report_lines.append(f"WARNING: disk {part.mountpoint} usage {usage.percent}%")
    # Check memory
    mem = psutil.virtual_memory()
    if mem.percent > 90:
        report_lines.append(f"URGENT: memory usage {mem.percent}%")
    # Check key processes
    if 'nginx' not in (p.name() for p in psutil.process_iter(['name'])):
        report_lines.append("CRITICAL: nginx process not running")
    return report_lines
Scenario: in microservice or multi-server environments, manage the crontab of every server centrally, with a unified view of jobs, batch updates, compliance checks, and backup/restore, instead of the slow and error-prone host-by-host manual edits.
What it does: reads, parses, and updates crontab remotely over SSH, supporting add, delete, modify, list, and backup operations.
Usage snippet:
# cron_manager.py
# When using script 8 in practice, it can be integrated into an ops workflow like this:
from cron_manager import CronManager

def deploy_new_cron_job(servers, new_job):
    """Deploy a new cron job on all servers (e.g. clean temp files at 03:00 every day)."""
    cm = CronManager()
    for server in servers:
        # Back up the current crontab first
        cm.backup_crontab(server, backup_dir='/backup/cron/')
        # Add the new job
        cm.update_cron_remotely(server, new_job, action='add')
        # Verify that it was added
        current = cm.update_cron_remotely(server, None, action='list')
        if new_job not in current:
            raise Exception(f"Failed to add cron job on {server}")

# Example job: delete files older than 7 days under /tmp at 03:00 every day
new_job = '0 3 * * * find /tmp -type f -mtime +7 -delete >> /var/log/tmp_cleanup.log 2>&1'
servers = ['web01', 'web02', 'api01', 'db01']
deploy_new_cron_job(servers, new_job)
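The CronManager class itself is not shown in the article. A minimal sketch of how it might be built on top of the SSHExecutor above (the method names backup_crontab and update_cron_remotely follow the usage snippet; everything else, including credentials, is an assumption):

# cron_manager.py -- illustrative sketch, not the article's original implementation
from batch_ssh_executor import SSHExecutor

class CronManager:
    def __init__(self, username='ops', key_path='/home/ops/.ssh/id_rsa'):
        self.executor = SSHExecutor()
        self.username = username
        self.key_path = key_path

    def _run(self, server, command):
        result = self.executor.run([server], self.username, self.key_path, command)[server]
        return result['output']

    def backup_crontab(self, server, backup_dir='/backup/cron/'):
        # Dump the current crontab into a timestamped file on the remote host
        cmd = (f"mkdir -p {backup_dir} && "
               f"crontab -l > {backup_dir}/crontab_$(date +%Y%m%d_%H%M%S).bak 2>/dev/null || true")
        self._run(server, cmd)

    def update_cron_remotely(self, server, job, action='list'):
        if action == 'list':
            return self._run(server, 'crontab -l 2>/dev/null')
        if action == 'add':
            # Append the job line and reload the crontab in one shot
            return self._run(server, f"(crontab -l 2>/dev/null; echo '{job}') | crontab -")
        if action == 'remove':
            return self._run(server, f"crontab -l 2>/dev/null | grep -vF '{job}' | crontab -")
        raise ValueError(f"Unsupported action: {action}")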
Scenario: create system accounts in bulk for a new project or team, with unified SSH keys, home directories, and specific sudo privileges.
# user_manager.py
from batch_ssh_executor import SSHExecutor

def bulk_create_users(hosts, username, ssh_key):
    commands = [
        f'useradd -m -s /bin/bash {username}',
        f'mkdir -p /home/{username}/.ssh',
        f'echo "{ssh_key}" >> /home/{username}/.ssh/authorized_keys',
        f'echo "{username} ALL=(ALL) NOPASSWD: /usr/bin/systemctl restart nginx" >> /etc/sudoers.d/{username}'
    ]
    # Run the command list via the concurrent SSH executor
    # (admin account and key path below are placeholders)
    executor = SSHExecutor()
    return executor.run(hosts, 'root', '/root/.ssh/id_rsa', ' && '.join(commands))
Scenario: collect server CPU, memory, disk I/O, network, and other performance metrics at a configurable interval, and push them to monitoring systems such as Prometheus or InfluxDB, or to a central database, for visualization and long-term trend analysis.
What it does: multi-dimensional metric collection, configurable push targets, anomaly detection, and threshold-based alerting.
#!/usr/bin/env python3# system_monitor.pyimport timeimport jsonimport psutilimport socketfrom datetime import datetimefrom threading import Threadfrom queue import Queueimport loggingfrom logging_config import setup_loggerlogger = setup_logger('system_monitor')class SystemMonitor: def __init__(self, interval=60, push_targets=None): """初始化系统监控器 Args: interval: 监控采样间隔(秒) push_targets: 数据推送目标列表,如 ['prometheus', 'influxdb'] """ self.interval = interval self.push_targets = push_targets or [] self.running = False self.metrics_queue = Queue() self.hostname = socket.gethostname() def collect_system_metrics(self): """收集全面的系统指标""" timestamp = datetime.now().isoformat() # CPU指标 cpu_times = psutil.cpu_times_percent(interval=0.5) cpu_percent = psutil.cpu_percent(interval=0.5, percpu=True) # 内存指标 memory = psutil.virtual_memory() swap = psutil.swap_memory() # 磁盘指标 disk_usage = {} disk_io = {} for part in psutil.disk_partitions(): if part.fstype: try: usage = psutil.disk_usage(part.mountpoint) disk_usage[part.mountpoint] = { 'total': usage.total, 'used': usage.used, 'free': usage.free, 'percent': usage.percent } except Exception as e: logger.warning(f"无法获取磁盘使用信息 {part.mountpoint}: {e}") # 网络指标 net_io = psutil.net_io_counters() # 进程和负载 load_avg = psutil.getloadavg() metrics = { 'timestamp': timestamp, 'hostname': self.hostname, 'cpu': { 'percent_total': psutil.cpu_percent(interval=0), 'percent_per_core': cpu_percent, 'user': cpu_times.user, 'system': cpu_times.system, 'idle': cpu_times.idle, 'iowait': cpu_times.iowait if hasattr(cpu_times, 'iowait') else 0 }, 'memory': { 'total': memory.total, 'available': memory.available, 'used': memory.used, 'free': memory.free, 'percent': memory.percent, 'swap_total': swap.total, 'swap_used': swap.used, 'swap_free': swap.free, 'swap_percent': swap.percent }, 'disk': { 'usage': disk_usage, 'read_bytes_per_sec': disk_io.get('read_bytes', 0), 'write_bytes_per_sec': disk_io.get('write_bytes', 0) }, 'network': { 'bytes_sent': net_io.bytes_sent, 'bytes_recv': net_io.bytes_recv, 'packets_sent': net_io.packets_sent, 'packets_recv': net_io.packets_recv, 'errin': net_io.errin, 'errout': net_io.errout }, 'system': { 'load_avg_1min': load_avg[0], 'load_avg_5min': load_avg[1], 'load_avg_15min': load_avg[2], 'uptime': time.time() - psutil.boot_time(), 'process_count': len(psutil.pids()) } } return metrics def check_thresholds(self, metrics): """检查指标是否超过阈值""" alerts = [] # CPU阈值检查 if metrics['cpu']['percent_total'] > 90: alerts.append({ 'level': 'CRITICAL', 'metric': 'cpu_percent', 'value': metrics['cpu']['percent_total'], 'threshold': 90, 'message': f"CPU使用率过高: {metrics['cpu']['percent_total']:.1f}%" }) elif metrics['cpu']['percent_total'] > 80: alerts.append({ 'level': 'WARNING', 'metric': 'cpu_percent', 'value': metrics['cpu']['percent_total'], 'threshold': 80, 'message': f"CPU使用率较高: {metrics['cpu']['percent_total']:.1f}%" }) # 内存阈值检查 if metrics['memory']['percent'] > 95: alerts.append({ 'level': 'CRITICAL', 'metric': 'memory_percent', 'value': metrics['memory']['percent'], 'threshold': 95, 'message': f"内存使用率过高: {metrics['memory']['percent']:.1f}%" }) elif metrics['memory']['percent'] > 85: alerts.append({ 'level': 'WARNING', 'metric': 'memory_percent', 'value': metrics['memory']['percent'], 'threshold': 85, 'message': f"内存使用率较高: {metrics['memory']['percent']:.1f}%" }) # 磁盘阈值检查 for mountpoint, usage in metrics['disk']['usage'].items(): if usage['percent'] > 95: alerts.append({ 'level': 'CRITICAL', 'metric': f'disk_usage_{mountpoint.replace("/", "_")}', 'value': usage['percent'], 
'threshold': 95, 'message': f"磁盘 {mountpoint} 使用率过高: {usage['percent']:.1f}%" }) elif usage['percent'] > 85: alerts.append({ 'level': 'WARNING', 'metric': f'disk_usage_{mountpoint.replace("/", "_")}', 'value': usage['percent'], 'threshold': 85, 'message': f"磁盘 {mountpoint} 使用率较高: {usage['percent']:.1f}%" }) # 系统负载检查 cpu_count = len(metrics['cpu']['percent_per_core']) if metrics['system']['load_avg_1min'] > cpu_count * 4: alerts.append({ 'level': 'CRITICAL', 'metric': 'load_average', 'value': metrics['system']['load_avg_1min'], 'threshold': cpu_count * 4, 'message': f"系统负载过高: {metrics['system']['load_avg_1min']:.2f} (CPU数: {cpu_count})" }) return alerts def push_to_prometheus(self, metrics): """将指标推送到Prometheus Pushgateway""" try: import requests prometheus_data = [] # 转换指标为Prometheus格式 # CPU使用率 prometheus_data.append(f'cpu_percent_total{{host="{self.hostname}"}} {metrics["cpu"]["percent_total"]}') prometheus_data.append(f'cpu_user{{host="{self.hostname}"}} {metrics["cpu"]["user"]}') prometheus_data.append(f'cpu_system{{host="{self.hostname}"}} {metrics["cpu"]["system"]}') # 内存使用率 prometheus_data.append(f'memory_percent{{host="{self.hostname}"}} {metrics["memory"]["percent"]}') prometheus_data.append(f'memory_available_bytes{{host="{self.hostname}"}} {metrics["memory"]["available"]}') # 磁盘使用率(每个挂载点) for mountpoint, usage in metrics['disk']['usage'].items(): safe_mountpoint = mountpoint.replace('/', '_').replace('.', '_') prometheus_data.append(f'disk_usage_percent{{host="{self.hostname}",mountpoint="{mountpoint}"}} {usage["percent"]}') # 发送到Pushgateway pushgateway_url = " http://localhost:9091/metrics/job/system_monitor " response = requests.post( pushgateway_url, data='\n'.join(prometheus_data), timeout=5 ) if response.status_code == 200: logger.debug("指标已推送至Prometheus") else: logger.warning(f"Prometheus推送失败: {response.status_code}") except ImportError: logger.warning("requests库未安装,无法推送到Prometheus") except Exception as e: logger.error(f"Prometheus推送出错: {e}") def push_to_influxdb(self, metrics): """将指标推送到InfluxDB""" try: from influxdb import InfluxDBClient influx_points = [] # CPU指标 cpu_point = { "measurement": "cpu", "tags": { "host": self.hostname }, "time": metrics['timestamp'], "fields": { "usage": metrics['cpu']['percent_total'], "user": metrics['cpu']['user'], "system": metrics['cpu']['system'], "idle": metrics['cpu']['idle'] } } influx_points.append(cpu_point) # 内存指标 memory_point = { "measurement": "memory", "tags": { "host": self.hostname }, "time": metrics['timestamp'], "fields": { "used_percent": metrics['memory']['percent'], "available_bytes": metrics['memory']['available'], "total_bytes": metrics['memory']['total'] } } influx_points.append(memory_point) # 磁盘指标(每个挂载点) for mountpoint, usage in metrics['disk']['usage'].items(): disk_point = { "measurement": "disk", "tags": { "host": self.hostname, "mountpoint": mountpoint }, "time": metrics['timestamp'], "fields": { "used_percent": usage['percent'], "used_bytes": usage['used'], "total_bytes": usage['total'] } } influx_points.append(disk_point) # 连接到InfluxDB并写入数据 client = InfluxDBClient(host='localhost', port=8086, database='system_metrics') client.write_points(influx_points) logger.debug("指标已推送至InfluxDB") except ImportError: logger.warning("influxdb库未安装,无法推送到InfluxDB") except Exception as e: logger.error(f"InfluxDB推送出错: {e}") def save_to_local_file(self, metrics, alerts=None): """将指标保存到本地JSON文件(用于调试或备份)""" try: data = { 'metrics': metrics, 'alerts': alerts or [] } # 每天一个文件 date_str = datetime.now().strftime('%Y%m%d') filename = 
f"/var/log/system_monitor/{date_str}.json" import os os.makedirs(os.path.dirname(filename), exist_ok=True) # 读取现有数据(如果文件存在) existing_data = [] if os.path.exists(filename): with open(filename, 'r') as f: try: existing_data = json.load(f) except json.JSONDecodeError: existing_data = [] # 添加新数据 if not isinstance(existing_data, list): existing_data = [] existing_data.append(data) # 写入文件 with open(filename, 'w') as f: json.dump(existing_data, f, indent=2) except Exception as e: logger.error(f"保存到本地文件出错: {e}") def monitoring_loop(self): """监控主循环""" logger.info(f"系统监控器启动 (间隔: {self.interval}秒)") while self.running: try: # 收集指标 metrics = self.collect_system_metrics() # 检查阈值 alerts = self.check_thresholds(metrics) # 触发告警(如果有的话) for alert in alerts: if alert['level'] == 'CRITICAL': logger.critical(alert['message']) else: logger.warning(alert['message']) # 推送到配置的目标 for target in self.push_targets: if target == 'prometheus': self.push_to_prometheus(metrics) elif target == 'influxdb': self.push_to_influxdb(metrics) elif target == 'local_file': self.save_to_local_file(metrics, alerts) # 将数据放入队列(供其他线程消费) self.metrics_queue.put({ 'metrics': metrics, 'alerts': alerts }) # 等待下一个采样周期 time.sleep(self.interval) except Exception as e: logger.error(f"监控循环出错: {e}") time.sleep(10) # 出错后等待10秒再重试 def start(self): """启动监控器""" if not self.running: self.running = True self.monitor_thread = Thread(target=self.monitoring_loop, daemon=True) self.monitor_thread.start() logger.info("系统监控器已启动") def stop(self): """停止监控器""" self.running = False if hasattr(self, 'monitor_thread'): self.monitor_thread.join(timeout=10) logger.info("系统监控器已停止") def get_latest_metrics(self): """获取最新的监控指标""" # 清空队列中的旧数据,只返回最新的 latest = None while not self.metrics_queue.empty(): latest = self.metrics_queue.get() return latest def generate_summary_report(self, metrics): """生成监控摘要报告""" if not metrics: return "暂无监控数据" report_lines = [ f"=== 系统监控摘要 ===", f"主机: {metrics['hostname']}", f"时间: {metrics['timestamp']}", "", "CPU使用率:", f" 总体: {metrics['cpu']['percent_total']:.1f}%", f" 用户态: {metrics['cpu']['user']:.1f}%", f" 内核态: {metrics['cpu']['system']:.1f}%", f" 空闲: {metrics['cpu']['idle']:.1f}%", "", "内存状态:", f" 使用率: {metrics['memory']['percent']:.1f}%", f" 可用内存: {metrics['memory']['available']/ (1024**3):.1f} GB", "", "磁盘使用:" ] for mountpoint, usage in metrics['disk']['usage'].items(): report_lines.append(f" {mountpoint}: {usage['percent']:.1f}% ({usage['used']/(1024**3):.1f}/{usage['total']/(1024**3):.1f} GB)") report_lines.extend([ "", "网络状态:", f" 发送: {metrics['network']['bytes_sent']/(1024**2):.1f} MB", f" 接收: {metrics['network']['bytes_recv']/(1024**2):.1f} MB", "", "系统负载:", f" 1分钟: {metrics['system']['load_avg_1min']:.2f}", f" 5分钟: {metrics['system']['load_avg_5min']:.2f}", f" 15分钟: {metrics['system']['load_avg_15min']:.2f}", f" 运行时间: {metrics['system']['uptime']/3600:.1f} 小时", f" 进程数: {metrics['system']['process_count']}" ]) return '\n'.join(report_lines)# 使用示例if __name__ == '__main__': # 创建监控实例,配置推送目标 monitor = SystemMonitor( interval=30, # 每30秒采样一次 push_targets=['prometheus', 'local_file'] # 推送到Prometheus并保存本地文件 ) try: # 启动监控 monitor.start() print("系统监控正在运行...") print("按 Ctrl+C 停止监控") # 主循环中展示摘要信息 while True: time.sleep(60) latest = monitor.get_latest_metrics() if latest and 'metrics' in latest: report = monitor.generate_summary_report(latest['metrics']) print(report) # 如果有告警,特别显示 if latest.get('alerts'): print("当前告警:") for alert in latest['alerts']: print(f" {alert['level']}: {alert['message']}") print("-" * 50) except KeyboardInterrupt: 
print("\n正在停止监控...") monitor.stop() print("监控已停止")
Scenario: follow Nginx access logs, application error logs, and similar sources in real time; automatically detect error-rate spikes, abnormal access patterns (e.g. crawlers, attacks), and security threats (e.g. SQL injection signatures); and raise alerts immediately.
What it does: tails log files in real time, matches key patterns with regular expressions, keeps sliding-window threshold statistics, and can send alerts via email, instant-messaging tools, and other channels.
Core code skeleton (log_analyzer.py):
#!/usr/bin/env python3
"""Real-time log analysis and intelligent alerting.
Features: tail log files in real time, regex-match key errors, sliding-window statistics, threshold alerts.
"""
import re
import time
from collections import deque, defaultdict
import subprocess
from datetime import datetime
# Reuses the logging_config module defined above plus the standard email libraries
from logging_config import setup_logger
import smtplib
from email.mime.text import MIMEText

logger = setup_logger('log_analyzer')

class LogAnalyzer:
    def __init__(self, log_path, alert_rules):
        self.log_path = log_path
        self.alert_rules = alert_rules  # e.g. [{'pattern': 'ERROR', 'threshold': 10, 'window': 60}]
        self.counters = defaultdict(lambda: deque(maxlen=300))  # sliding-window counters

    def tail_log(self):
        """Emulate `tail -f`, continuously yielding new log lines."""
        process = subprocess.Popen(['tail', '-F', '-n', '0', self.log_path],
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        while True:
            line = process.stdout.readline()
            if line:
                yield line.decode('utf-8', errors='ignore').strip()
            else:
                time.sleep(0.1)

    def analyze_line(self, line: str):
        """Analyze a single log line, update counters and check alert rules."""
        timestamp = datetime.now()
        for rule in self.alert_rules:
            pattern = re.compile(rule['pattern'])
            if pattern.search(line):
                key = rule['pattern']
                self.counters[key].append(timestamp)
                # Check whether the count within the time window exceeds the threshold
                window_sec = rule['window']
                threshold = rule['threshold']
                recent_count = sum(1 for ts in self.counters[key]
                                   if (timestamp - ts).total_seconds() <= window_sec)
                if recent_count >= threshold:
                    self.trigger_alert(rule, line, recent_count)
                    # Reset this counter to avoid repeated alerts
                    self.counters[key].clear()

    def trigger_alert(self, rule, sample_line, count):
        """Fire an alert; can send email, SMS, or call a webhook."""
        subject = f"[Log alert] pattern '{rule['pattern']}' hit {count} times within {rule['window']}s"
        body = f"""Alert rule: {rule}
Sample line: {sample_line}
Triggered at: {datetime.now()}
Count: {count}
"""
        logger.critical(subject)
        self.send_email(subject, body)
        # Possible extensions: Slack webhook, WeCom bot, etc.

    def send_email(self, subject, body):
        """Send an alert email (example)."""
        # In production, configure the SMTP server, recipient list, etc.
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = 'log_alert@yourcompany.com'
        msg['To'] = 'ops-team@yourcompany.com'
        try:
            with smtplib.SMTP('localhost') as server:
                server.send_message(msg)
        except Exception as e:
            logger.error(f"Failed to send email: {e}")

    def run(self):
        """Main loop."""
        for line in self.tail_log():
            self.analyze_line(line)

if __name__ == '__main__':
    # Example rules: alert on 10 "ERROR" lines or 20 5xx status codes within 5 minutes
    rules = [
        {'pattern': r'\bERROR\b', 'threshold': 10, 'window': 300},
        {'pattern': r'\s5\d\d\s', 'threshold': 20, 'window': 300},
        {'pattern': r'(SQL injection|XSS|csrf)', 'threshold': 1, 'window': 60},  # security rules
    ]
    analyzer = LogAnalyzer('/var/log/nginx/access.log', rules)
    analyzer.run()
Scenario: periodically check connectivity to key internal services (databases, caches, message queues) or externally exposed ports (80, 443, 22) to quickly detect network isolation or service outages.
# port_checker.py
import socket

def check_port(host, port, timeout=3):
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Can be combined with the schedule library to scan a list of targets periodically
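The closing comment suggests pairing this with the schedule library. A minimal sketch of such a periodic scan (the target list and the five-minute interval are arbitrary choices for illustration):

# periodic_port_scan.py -- illustrative sketch built on check_port above
import time
import schedule
from port_checker import check_port

TARGETS = [('db01', 3306), ('cache01', 6379), ('mq01', 5672), ('web01', 443)]

def scan_targets():
    for host, port in TARGETS:
        status = "open" if check_port(host, port) else "UNREACHABLE"
        print(f"{host}:{port} -> {status}")

schedule.every(5).minutes.do(scan_targets)

if __name__ == '__main__':
    scan_targets()              # run once at startup
    while True:
        schedule.run_pending()  # then every 5 minutes
        time.sleep(1)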
Scenario: automatically check the expiry of certificates for all public-facing domains and internal self-signed certificates, and raise a renewal alert a configurable number of days in advance (e.g. 30) to prevent outages caused by expired certificates.
What it does: batch-checks domains, IPs, and non-standard ports, reports detailed certificate information (issuer, validity period, days remaining), and produces a clear, readable report.
Full code example (ssl_cert_checker.py):
#!/usr/bin/env python3
"""SSL/TLS certificate expiry monitoring.
Features: batch checks of domains/IPs, alerts N days before expiry, multiple notification channels.
"""
import ssl
import socket
from datetime import datetime, timedelta
import concurrent.futures
from typing import List, Dict
import logging
from logging_config import setup_logger

logger = setup_logger('ssl_checker')

class SSLCertChecker:
    def __init__(self, alert_days_before=30):
        self.alert_days_before = alert_days_before

    def get_cert_info(self, hostname: str, port: int = 443) -> Dict:
        """Fetch the SSL certificate information of the given host."""
        context = ssl.create_default_context()
        with socket.create_connection((hostname, port), timeout=10) as sock:
            with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                cert = ssock.getpeercert()
        # Parse the validity period
        not_after = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
        not_before = datetime.strptime(cert['notBefore'], '%b %d %H:%M:%S %Y %Z')
        issuer = dict(x[0] for x in cert['issuer'])
        subject = dict(x[0] for x in cert['subject'])
        return {
            'hostname': hostname,
            'valid_from': not_before,
            'valid_to': not_after,
            'issuer': issuer.get('organizationName', 'Unknown'),
            'common_name': subject.get('commonName', 'Unknown'),
            'days_left': (not_after - datetime.now()).days
        }

    def check_domains(self, domains: List[str]) -> List[Dict]:
        """Check domains in batch and return the results."""
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            future_to_domain = {}
            for domain in domains:
                # Support "host:port" entries for non-standard ports
                host, _, port = domain.partition(':')
                future = executor.submit(self.get_cert_info, host, int(port) if port else 443)
                future_to_domain[future] = domain
            for future in concurrent.futures.as_completed(future_to_domain):
                domain = future_to_domain[future]
                try:
                    results.append(future.result(timeout=15))
                except (ssl.SSLError, socket.timeout, ConnectionRefusedError) as e:
                    results.append({'hostname': domain, 'error': str(e), 'days_left': None})
                except Exception as e:
                    logger.error(f"Unexpected error while checking {domain}: {e}")
                    results.append({'hostname': domain, 'error': f'Unknown error: {e}', 'days_left': None})
        return results

    def generate_report(self, results: List[Dict]) -> str:
        """Generate a readable report."""
        report_lines = ["SSL/TLS certificate check report", "=" * 40, f"Checked at: {datetime.now()}\n"]
        for r in results:
            if 'error' in r:
                report_lines.append(f"{r['hostname']}: check failed - {r['error']}")
            else:
                status = "OK"
                if r['days_left'] < 0:
                    status = "EXPIRED"
                elif r['days_left'] < self.alert_days_before:
                    status = "EXPIRING SOON"
                report_lines.append(
                    f"{status} {r['hostname']} (CN: {r['common_name']}): "
                    f"{r['days_left']} days left, issuer: {r['issuer']}, "
                    f"valid: {r['valid_from'].date()} to {r['valid_to'].date()}"
                )
        return "\n".join(report_lines)

    def run_and_alert(self, domains: List[str]):
        """Run the check and raise alerts if needed."""
        results = self.check_domains(domains)
        report = self.generate_report(results)
        logger.info(report)
        # Look for certificates that are about to expire
        urgent = [r for r in results
                  if r.get('days_left') is not None and 0 <= r['days_left'] < self.alert_days_before]
        if urgent:
            alert_msg = f"{len(urgent)} certificate(s) will expire within {self.alert_days_before} days:\n"
            for u in urgent:
                alert_msg += f"- {u['hostname']}: {u['days_left']} days left\n"
            logger.warning(alert_msg)
            # Hook up email or DingTalk/WeCom bots here
            # self.send_alert(alert_msg)

if __name__ == '__main__':
    # Example domain list; could also come from a config file or database
    domains_to_check = [
        'www.example.com',
        'api.example.com',
        'blog.example.com',
        'internal-tool.yourcompany.com:8443',  # non-443 port supported
    ]
    checker = SSLCertChecker(alert_days_before=30)
    checker.run_and_alert(domains_to_check)
Scenario: keep critical business processes such as Java applications and message-queue consumers alive, and watch them for resource problems such as memory leaks or abnormal CPU spikes.
# process_watcher.py
import psutil

def monitor_process(process_name, max_memory_mb=1024):
    for proc in psutil.process_iter(['pid', 'name', 'memory_info']):
        if proc.info['name'] and process_name in proc.info['name']:
            mem_mb = proc.info['memory_info'].rss / 1024 / 1024
            if mem_mb > max_memory_mb:
                # Send an alert, or attempt a restart
                proc.terminate()
            return True
    # Process not found: raise an alert
    return False
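The snippet only detects the problem. A minimal sketch of the "keep it alive" side, assuming the process runs as a systemd unit (the unit name below is purely illustrative):

# process_keeper.py -- illustrative sketch, assumes the service is managed by systemd
import subprocess
from process_watcher import monitor_process

def ensure_running(process_name, systemd_unit):
    """Restart the systemd unit when the watched process is missing."""
    if not monitor_process(process_name):
        print(f"{process_name} not running, restarting {systemd_unit} ...")
        subprocess.run(['systemctl', 'restart', systemd_unit], check=False)

# Example: keep a hypothetical order-consumer service alive
# ensure_running('order-consumer', 'order-consumer.service')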
Scenario: aggregate and analyze the monitoring output of scripts 10-14, automatically produce a well-formatted, comprehensive daily or weekly health report in HTML or Markdown, and deliver it to the relevant teams by email or instant-messaging tools.
# daily_reporter.py
from datetime import datetime

def generate_html_report(metrics, logs, alerts):
    html = "<h1>Daily system report</h1>"
    html += f"<p>Generated at: {datetime.now()}</p>"
    html += f"<h2>Resource overview</h2><p>Average CPU load: {metrics['cpu_avg']}...</p>"
    # ... merge in the remaining data (logs, alerts, trends)
    return html
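The scenario also calls for sending the report automatically. A minimal email-delivery sketch using the standard library (the SMTP host and addresses are placeholders):

# report_sender.py -- illustrative sketch; SMTP host and addresses are placeholders
import smtplib
from email.mime.text import MIMEText

def send_html_report(html, subject="Daily ops report"):
    msg = MIMEText(html, 'html', 'utf-8')  # mark the body as HTML
    msg['Subject'] = subject
    msg['From'] = 'ops-report@yourcompany.com'
    msg['To'] = 'ops-team@yourcompany.com'
    with smtplib.SMTP('smtp.yourcompany.com', 25) as server:
        server.send_message(msg)

# Usage:
# send_html_report(generate_html_report(metrics, logs, alerts))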
Scenario: run scheduled full backups (e.g. nightly) plus more frequent incremental backups, automatically prune expired backup files, and periodically restore-test the backups to confirm they are actually usable.
#!/usr/bin/env python3
"""Basic MySQL backup - simplified version.
Performs a full backup and compresses it to disk.
"""
import subprocess
import os
from datetime import datetime, timedelta

def mysql_backup(host='localhost', user='root', password='', backup_dir='/var/backup/mysql'):
    """Run a full MySQL backup."""
    # Create the backup directory
    os.makedirs(backup_dir, exist_ok=True)
    # Build the backup file name
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_file = f"{backup_dir}/backup_{timestamp}.sql.gz"
    # Build the mysqldump command
    cmd = f"mysqldump --host={host} --user={user}"
    if password:
        cmd += f" --password={password}"
    cmd += " --all-databases --single-transaction"
    cmd += f" | gzip > {backup_file}"
    # Run the backup
    print(f"Starting backup: {backup_file}")
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    if result.returncode == 0:
        size = os.path.getsize(backup_file) / 1024 / 1024
        print(f"Backup succeeded! Size: {size:.2f} MB")
        return backup_file
    else:
        print(f"Backup failed: {result.stderr}")
        return None

def cleanup_old_backups(backup_dir, keep_days=7):
    """Remove old backups (keep the most recent 7 days)."""
    cutoff_date = datetime.now() - timedelta(days=keep_days)
    deleted = 0
    for filename in os.listdir(backup_dir):
        if filename.startswith('backup_') and filename.endswith('.sql.gz'):
            filepath = os.path.join(backup_dir, filename)
            # Extract the date from the file name (e.g. 20250113)
            try:
                date_str = filename[7:15]
                file_date = datetime.strptime(date_str, '%Y%m%d')
                if file_date < cutoff_date:
                    os.remove(filepath)
                    deleted += 1
            except ValueError:
                continue
    if deleted:
        print(f"Removed {deleted} expired backup(s)")

if __name__ == '__main__':
    # Run the backup
    backup = mysql_backup(
        host='localhost',
        user='root',
        password='your_password',  # prefer environment variables for secrets
        backup_dir='/var/backup/mysql'
    )
    # Clean up expired backups
    if backup:
        cleanup_old_backups('/var/backup/mysql', keep_days=7)
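The scenario also mentions restore testing, which the simplified script skips. A minimal sketch of such a check, assuming the full dump is replayed against a dedicated throwaway MySQL test instance (the host name and credentials below are placeholders):

# restore_test.py -- illustrative sketch; the test instance and credentials are placeholders
import subprocess

def test_restore(backup_file, test_host='mysql-test', user='root', password='your_password'):
    """Replay the compressed dump against a test instance and report success/failure."""
    cmd = (f"gunzip -c {backup_file} | "
           f"mysql --host={test_host} --user={user} --password={password}")
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    if result.returncode == 0:
        print(f"Restore test passed for {backup_file}")
        return True
    print(f"Restore test FAILED for {backup_file}: {result.stderr[:200]}")
    return False

# Usage (e.g. weekly via cron):
# test_restore('/var/backup/mysql/backup_20250113_030000.sql.gz')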
Scenario: periodically (e.g. weekly) analyze the MySQL slow query log, automatically pick out the top N most time-consuming SQL statements, examine their execution plans, and suggest possible index optimizations to assist performance tuning.
#!/usr/bin/env python3
"""MySQL slow query analysis - simplified version.
Finds the slowest SQL statements and offers simple optimization hints.
"""
import pymysql

def analyze_slow_queries(host='localhost', user='root', password='', top_n=10):
    """Analyze MySQL slow queries."""
    # Connect to the database
    connection = pymysql.connect(host=host, user=user, password=password, database='mysql')
    try:
        with connection.cursor() as cursor:
            # 1. Overall slow query counter
            cursor.execute("SHOW GLOBAL STATUS LIKE 'Slow_queries'")
            slow_count = cursor.fetchone()
            print(f"Total slow queries: {slow_count[1]}")

            # 2. Read the slow query log (if enabled and logged to the mysql.slow_log table)
            cursor.execute("SHOW VARIABLES LIKE 'slow_query_log'")
            slow_log_enabled = cursor.fetchone()
            if slow_log_enabled and slow_log_enabled[1] == 'ON':
                # Slowest queries from the slow_log table (requires log_output=TABLE, MySQL 5.1+)
                cursor.execute("""
                    SELECT start_time, query_time, lock_time, rows_sent, rows_examined,
                           SUBSTRING(sql_text, 1, 200) AS sql_snippet
                    FROM mysql.slow_log
                    ORDER BY query_time DESC
                    LIMIT %s
                """, (top_n,))
                results = cursor.fetchall()
                print(f"TOP {top_n} slowest queries:")
                for i, row in enumerate(results, 1):
                    start_time, query_time, lock_time, rows_sent, rows_examined, sql = row
                    # TIME columns usually come back as timedelta; convert to seconds for printing
                    qt = query_time.total_seconds() if hasattr(query_time, 'total_seconds') else float(query_time)
                    lt = lock_time.total_seconds() if hasattr(lock_time, 'total_seconds') else float(lock_time)
                    print(f"\n{i}. query time: {qt:.3f}s")
                    print(f"   rows examined: {rows_examined:,}, rows sent: {rows_sent:,}")
                    if rows_examined > 0:
                        print(f"   efficiency: {rows_sent / rows_examined * 100:.1f}%")
                    else:
                        print("   efficiency: N/A")
                    print(f"   lock time: {lt:.3f}s")
                    print(f"   SQL snippet: {sql}...")
                    # Simple optimization hints
                    if rows_examined > 10000 and rows_examined > rows_sent * 10:
                        print("   Hint: consider adding an index or tightening the WHERE clause")
                    if lt > 0.1:
                        print("   Hint: long table lock, consider optimizing the transaction")
                    if qt > 5:
                        print("   Hint: very slow query, consider sharding or redesigning the schema")
            else:
                print("Slow query log is disabled; enable it first:")
                print("  SET GLOBAL slow_query_log = 'ON';")
                print("  SET GLOBAL long_query_time = 2;")
                print("  SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';")

            # 3. Index usage statistics
            print("Index usage statistics:")
            cursor.execute("SHOW GLOBAL STATUS LIKE 'Handler_read%'")
            for name, value in cursor.fetchall():
                print(f"  {name}: {value}")
    finally:
        connection.close()

def basic_optimization_suggestions():
    """Print basic optimization suggestions."""
    suggestions = [
        "1. Add indexes on columns used in frequent WHERE clauses",
        "2. Avoid SELECT *; select only the columns you need",
        "3. Consider partitioning large tables (by time or range)",
        "4. Run ANALYZE TABLE periodically to refresh statistics",
        "5. Review JOIN queries and make sure they are indexed"
    ]
    print("Basic optimization suggestions:")
    for suggestion in suggestions:
        print(f"  {suggestion}")

if __name__ == '__main__':
    # Analyze slow queries
    analyze_slow_queries(
        host='localhost',
        user='root',
        password='your_password',  # prefer environment variables for secrets
        top_n=5
    )
    # Print optimization suggestions
    basic_optimization_suggestions()
Scenario: back up Redis RDB or AOF files to a safe location on a schedule, and analyze the instance for big keys (which can block the server), hot keys, and key-expiry patterns, to support capacity planning and performance tuning.
# redis_manager.py
import redis

def backup_and_analyze(conn_params):
    r = redis.Redis(**conn_params)
    # Trigger a background RDB save
    r.bgsave()
    # Analyze big keys (sampled scan)
    for key in r.scan_iter(count=100):
        size = r.memory_usage(key)
        if size and size > 1024 * 1024:  # larger than 1 MB
            print(f"Big key: {key}, size: {size / 1024 / 1024:.2f} MB")
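BGSAVE only rewrites the RDB file in place; the "copy it somewhere safe" step is not shown. A minimal sketch that waits for the background save to finish and copies the dump, assuming the RDB sits at the default /var/lib/redis/dump.rdb path:

# redis_rdb_backup.py -- illustrative sketch; the RDB path is the Redis default and may differ
import os
import shutil
import time
from datetime import datetime
import redis

def backup_rdb(conn_params, rdb_path='/var/lib/redis/dump.rdb', backup_dir='/var/backup/redis'):
    r = redis.Redis(**conn_params)
    last_save = r.lastsave()          # timestamp of the previous completed save
    r.bgsave()
    while r.lastsave() == last_save:  # wait until the background save finishes
        time.sleep(1)
    os.makedirs(backup_dir, exist_ok=True)
    target = f"{backup_dir}/dump_{datetime.now():%Y%m%d_%H%M%S}.rdb"
    shutil.copy2(rdb_path, target)
    print(f"RDB copied to {target}")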
Scenario: in CI/CD or development/test environments, automatically clean up stopped containers, dangling images not referenced by any container, and unused volumes, to reclaim disk space.
#!/usr/bin/env python3"""Docker容器与镜像垃圾清理功能:自动清理停止的容器、无用镜像、废弃Volume,释放磁盘空间"""import subprocessimport jsonimport osfrom datetime import datetime, timedeltaimport refrom typing import List, Dict, Tupleimport loggingfrom logging_config import setup_loggerimport shutillogger = setup_logger('docker_cleanup')class DockerCleanupManager: def __init__(self, dry_run=False, min_age_days=7, keep_last_images=5): """初始化Docker清理管理器 Args: dry_run: 试运行模式,只显示将要清理的内容,不实际执行 min_age_days: 最小保留天数,早于此时间的镜像/容器才考虑清理 keep_last_images: 对于每个镜像标签,保留最近几个版本 """ self.dry_run = dry_run self.min_age_days = min_age_days self.keep_last_images = keep_last_images self.cutoff_date = datetime.now() - timedelta(days=min_age_days) # 验证docker是否可用 if not self._check_docker_available(): raise RuntimeError("Docker不可用,请确保docker服务正在运行") def _check_docker_available(self): """检查Docker是否可用""" try: result = subprocess.run(['docker', '--version'], capture_output=True, text=True) return result.returncode == 0 except FileNotFoundError: return False def _run_docker_command(self, command: List[str], timeout=30) -> Dict: """执行Docker命令并返回JSON格式结果""" try: cmd = ['docker'] + command + ['--format', '{{json .}}'] result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) if result.returncode != 0: logger.error(f"Docker命令失败: {' '.join(cmd)}") logger.error(f"错误输出: {result.stderr}") return None # 解析JSON输出(每行一个JSON对象) items = [] for line in result.stdout.strip().split('\n'): if line: try: items.append(json.loads(line)) except json.JSONDecodeError: logger.warning(f"无法解析JSON: {line}") return items except subprocess.TimeoutExpired: logger.error(f"Docker命令超时: {' '.join(command)}") return None except Exception as e: logger.error(f"执行Docker命令异常: {e}") return None def get_stopped_containers(self): """获取所有已停止的容器""" containers = self._run_docker_command(['ps', '-a', '-f', 'status=exited']) stopped_containers = [] if containers: for container in containers: # 排除某些不应自动清理的容器(如数据备份容器) # 可以根据容器名称或标签进行过滤 name = container.get('Names', '') labels = container.get('Labels', '') # 排除保护标签的容器 if 'protect=true' in labels: continue # 排除特定名称模式的容器 if any(pattern in name for pattern in ['backup', 'restore', 'critical']): continue stopped_containers.append(container) return stopped_containers def get_dangling_images(self): """获取悬空镜像(未被任何容器引用的中间层镜像)""" images = self._run_docker_command(['images', '-f', 'dangling=true']) return images if images else [] def get_unused_images(self): """获取未使用的镜像(长时间未被使用的镜像)""" all_images = self._run_docker_command(['images', '--no-trunc']) if not all_images: return [] # 获取所有正在运行的容器使用的镜像 running_containers = self._run_docker_command(['ps', '-q']) used_image_ids = set() if running_containers: for container_id in [c.get('ID', '') for c in running_containers]: # 获取容器使用的镜像ID inspect_cmd = ['docker', 'inspect', '--format', '{{.Image}}', container_id] result = subprocess.run(inspect_cmd, capture_output=True, text=True) if result.returncode == 0: used_image_ids.add(result.stdout.strip()) # 找出未被使用的镜像 unused_images = [] for image in all_images: image_id = image.get('ID', '') # 排除基础镜像(如alpine、ubuntu等) repository = image.get('Repository', '') if repository in ['<none>', '']: continue # 排除正在使用的镜像 if image_id in used_image_ids: continue # 分析镜像标签,确定是否应该保留 tag = image.get('Tag', '') created_str = image.get('CreatedAt', '') # 尝试解析创建时间 created_date = self._parse_docker_date(created_str) if created_date and created_date < self.cutoff_date: unused_images.append(image) return unused_images def get_dangling_volumes(self): """获取悬空Volume(未被任何容器引用的数据卷)""" 
volumes = self._run_docker_command(['volume', 'ls', '-f', 'dangling=true']) return volumes if volumes else [] def get_unused_volumes(self): """获取未使用的Volume(长时间未挂载的数据卷)""" all_volumes = self._run_docker_command(['volume', 'ls']) if not all_volumes: return [] # 获取所有正在使用的Volume running_containers = self._run_docker_command(['ps', '-q']) used_volumes = set() if running_containers: for container_id in [c.get('ID', '') for c in running_containers]: # 获取容器挂载的Volume inspect_cmd = ['docker', 'inspect', '--format', '{{range .Mounts}}{{.Name}} {{end}}', container_id] result = subprocess.run(inspect_cmd, capture_output=True, text=True) if result.returncode == 0: volumes = result.stdout.strip().split() used_volumes.update(volumes) # 找出未被使用的Volume unused_volumes = [] for volume in all_volumes: volume_name = volume.get('Name', '') # 排除系统创建的Volume if volume_name.startswith('docker_') or volume_name.startswith('com.'): # 可以进一步检查是否正在使用 pass # 排除正在使用的Volume if volume_name in used_volumes: continue # 检查Volume的最后使用时间(需要复杂的inspect) unused_volumes.append(volume) return unused_volumes def _parse_docker_date(self, date_str): """解析Docker日期字符串""" if not date_str: return None try: # Docker日期格式示例: "2024-01-15 10:30:45 +0800 CST" # 简化处理,只提取日期部分 date_part = date_str.split(' ') return datetime.strptime(date_part, '%Y-%m-%d') except: return None def cleanup_stopped_containers(self, containers=None): """清理已停止的容器""" if containers is None: containers = self.get_stopped_containers() if not containers: logger.info("没有找到可清理的已停止容器") return 0 cleaned_count = 0 for container in containers: container_id = container.get('ID', '') container_name = container.get('Names', '') if self.dry_run: logger.info(f"[试运行] 将清理已停止容器: {container_name} ({container_id[:12]})") cleaned_count += 1 else: try: result = subprocess.run(['docker', 'rm', container_id], capture_output=True, text=True, timeout=30) if result.returncode == 0: logger.info(f"已清理容器: {container_name} ({container_id[:12]})") cleaned_count += 1 else: logger.warning(f"清理容器失败 {container_id}: {result.stderr}") except Exception as e: logger.error(f"清理容器异常 {container_id}: {e}") return cleaned_count def cleanup_dangling_images(self, images=None): """清理悬空镜像""" if images is None: images = self.get_dangling_images() if not images: logger.info("没有找到可清理的悬空镜像") return 0 cleaned_count = 0 for image in images: image_id = image.get('ID', '') if self.dry_run: logger.info(f"[试运行] 将清理悬空镜像: {image_id}") cleaned_count += 1 else: try: result = subprocess.run(['docker', 'rmi', image_id], capture_output=True, text=True, timeout=30) if result.returncode == 0: logger.info(f"已清理悬空镜像: {image_id[:12]}") cleaned_count += 1 else: logger.warning(f"清理悬空镜像失败 {image_id}: {result.stderr}") except Exception as e: logger.error(f"清理悬空镜像异常 {image_id}: {e}") return cleaned_count def cleanup_unused_images(self, images=None): """清理未使用的镜像""" if images is None: images = self.get_unused_images() if not images: logger.info("没有找到可清理的未使用镜像") return 0 # 按仓库分组,保留最近几个版本 images_by_repo = {} for image in images: repo = image.get('Repository', '') tag = image.get('Tag', '') created_str = image.get('CreatedAt', '') if repo not in images_by_repo: images_by_repo[repo] = [] created_date = self._parse_docker_date(created_str) images_by_repo[repo].append({ 'image': image, 'created': created_date, 'tag': tag }) # 确定要删除的镜像 images_to_remove = [] for repo, image_list in images_by_repo.items(): # 按创建时间排序(最近的在前) sorted_images = sorted(image_list, key=lambda x: x['created'] if x['created'] else datetime.min, reverse=True) # 保留最近N个版本 to_keep = 
sorted_images[:self.keep_last_images] to_remove = sorted_images[self.keep_last_images:] images_to_remove.extend([item['image'] for item in to_remove]) cleaned_count = 0 for image in images_to_remove: image_id = image.get('ID', '') repo_tag = f"{image.get('Repository', '')}:{image.get('Tag', '')}" if self.dry_run: logger.info(f"[试运行] 将清理未使用镜像: {repo_tag} ({image_id[:12]})") cleaned_count += 1 else: try: result = subprocess.run(['docker', 'rmi', image_id], capture_output=True, text=True, timeout=30) if result.returncode == 0: logger.info(f"已清理未使用镜像: {repo_tag} ({image_id[:12]})") cleaned_count += 1 else: logger.warning(f"清理未使用镜像失败 {repo_tag}: {result.stderr}") except Exception as e: logger.error(f"清理未使用镜像异常 {repo_tag}: {e}") return cleaned_count def cleanup_dangling_volumes(self, volumes=None): """清理悬空Volume""" if volumes is None: volumes = self.get_dangling_volumes() if not volumes: logger.info("没有找到可清理的悬空Volume") return 0 cleaned_count = 0 for volume in volumes: volume_name = volume.get('Name', '') if self.dry_run: logger.info(f"[试运行] 将清理悬空Volume: {volume_name}") cleaned_count += 1 else: try: result = subprocess.run(['docker', 'volume', 'rm', volume_name], capture_output=True, text=True, timeout=30) if result.returncode == 0: logger.info(f"已清理悬空Volume: {volume_name}") cleaned_count += 1 else: logger.warning(f"清理悬空Volume失败 {volume_name}: {result.stderr}") except Exception as e: logger.error(f"清理悬空Volume异常 {volume_name}: {e}") return cleaned_count def cleanup_unused_volumes(self, volumes=None): """清理未使用的Volume""" if volumes is None: volumes = self.get_unused_volumes() if not volumes: logger.info("没有找到可清理的未使用Volume") return 0 cleaned_count = 0 for volume in volumes: volume_name = volume.get('Name', '') # 检查Volume是否包含重要数据 # 这里可以添加更复杂的检查逻辑 if self.dry_run: logger.info(f"[试运行] 将清理未使用Volume: {volume_name}") cleaned_count += 1 else: try: result = subprocess.run(['docker', 'volume', 'rm', volume_name], capture_output=True, text=True, timeout=30) if result.returncode == 0: logger.info(f"已清理未使用Volume: {volume_name}") cleaned_count += 1 else: logger.warning(f"清理未使用Volume失败 {volume_name}: {result.stderr}") except Exception as e: logger.error(f"清理未使用Volume异常 {volume_name}: {e}") return cleaned_count def cleanup_build_cache(self): """清理Docker构建缓存""" if self.dry_run: logger.info("[试运行] 将执行 docker builder prune") return 1 try: result = subprocess.run(['docker', 'builder', 'prune', '-f'], capture_output=True, text=True, timeout=300) if result.returncode == 0: # 解析清理结果 output = result.stdout if 'Total reclaimed space:' in output: space_line = [l for l in output.split('\n') if 'Total reclaimed space:' in l] logger.info(f"构建缓存清理完成: {space_line}") else: logger.info("构建缓存清理完成") return 1 else: logger.warning(f"清理构建缓存失败: {result.stderr}") return 0 except Exception as e: logger.error(f"清理构建缓存异常: {e}") return 0 def cleanup_system(self): """执行完整的系统清理(谨慎使用)""" if self.dry_run: logger.info("[试运行] 将执行 docker system prune -a") return 1 try: result = subprocess.run(['docker', 'system', 'prune', '-a', '-f'], capture_output=True, text=True, timeout=600) if result.returncode == 0: # 解析清理结果 output = result.stdout if 'Total reclaimed space:' in output: space_line = [l for l in output.split('\n') if 'Total reclaimed space:' in l] logger.info(f"系统清理完成: {space_line}") else: logger.info("系统清理完成") return 1 else: logger.warning(f"系统清理失败: {result.stderr}") return 0 except Exception as e: logger.error(f"系统清理异常: {e}") return 0 def get_disk_usage(self): """获取Docker磁盘使用情况""" try: # 使用docker system df命令 result = subprocess.run(['docker', 
'system', 'df', '-v'], capture_output=True, text=True, timeout=30) if result.returncode == 0: return result.stdout else: logger.warning(f"获取磁盘使用情况失败: {result.stderr}") return None except Exception as e: logger.error(f"获取磁盘使用情况异常: {e}") return None def generate_cleanup_report(self, cleanup_stats): """生成清理报告""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') report = f"""=== Docker清理报告 ===生成时间: {timestamp}运行模式: {'试运行' if self.dry_run else '实际执行'}最小保留天数: {self.min_age_days}保留镜像版本数: {self.keep_last_images}清理统计: 已停止容器: {cleanup_stats.get('stopped_containers', 0)} 个 悬空镜像: {cleanup_stats.get('dangling_images', 0)} 个 未使用镜像: {cleanup_stats.get('unused_images', 0)} 个 悬空Volume: {cleanup_stats.get('dangling_volumes', 0)} 个 未使用Volume: {cleanup_stats.get('unused_volumes', 0)} 个 构建缓存: {'已清理' if cleanup_stats.get('build_cache', 0) > 0 else '未清理'} 系统清理: {'已执行' if cleanup_stats.get('system_cleanup', 0) > 0 else '未执行'}磁盘使用情况:""" disk_usage = self.get_disk_usage() if disk_usage: report += disk_usage else: report += " 无法获取磁盘使用情况" return report def run_cleanup_pipeline(self, aggressive=False): """运行清理流水线 Args: aggressive: 是否进行激进清理(包括系统清理) """ logger.info("开始Docker清理流水线") cleanup_stats = {} # 1. 清理已停止的容器 logger.info("步骤1: 清理已停止的容器") stopped_containers = self.get_stopped_containers() cleanup_stats['stopped_containers'] = self.cleanup_stopped_containers(stopped_containers) # 2. 清理悬空镜像 logger.info("步骤2: 清理悬空镜像") dangling_images = self.get_dangling_images() cleanup_stats['dangling_images'] = self.cleanup_dangling_images(dangling_images) # 3. 清理未使用的镜像 logger.info("步骤3: 清理未使用的镜像") unused_images = self.get_unused_images() cleanup_stats['unused_images'] = self.cleanup_unused_images(unused_images) # 4. 清理悬空Volume logger.info("步骤4: 清理悬空Volume") dangling_volumes = self.get_dangling_volumes() cleanup_stats['dangling_volumes'] = self.cleanup_dangling_volumes(dangling_volumes) # 5. 清理未使用的Volume logger.info("步骤5: 清理未使用的Volume") unused_volumes = self.get_unused_volumes() cleanup_stats['unused_volumes'] = self.cleanup_unused_volumes(unused_volumes) # 6. 清理构建缓存 logger.info("步骤6: 清理构建缓存") cleanup_stats['build_cache'] = self.cleanup_build_cache() # 7. 系统清理(可选,激进模式) if aggressive: logger.info("步骤7: 执行系统清理(激进模式)") cleanup_stats['system_cleanup'] = self.cleanup_system() # 生成报告 report = self.generate_cleanup_report(cleanup_stats) # 保存报告到文件 if not self.dry_run: report_dir = '/var/log/docker_cleanup' os.makedirs(report_dir, exist_ok=True) report_file = os.path.join(report_dir, f"cleanup_report_{datetime.now().strftime('%Y%m%d')}.log") with open(report_file, 'a') as f: f.write(report) f.write("\n" + "="*50 + "\n") logger.info("清理流水线完成") print(report) return cleanup_stats# 使用示例if __name__ == '__main__': # 创建清理管理器(试运行模式) cleaner = DockerCleanupManager( dry_run=True, # 试运行,只显示不执行 min_age_days=7, # 保留最近7天的镜像 keep_last_images=3 # 每个镜像保留最近3个版本 ) # 运行清理流水线 print("=== Docker垃圾清理试运行 ===") print("此操作为试运行,不会实际删除任何内容") print("=" * 50) stats = cleaner.run_cleanup_pipeline(aggressive=False) # 实际执行(取消注释以下代码) # print("\n" + "="*50) # print("确定要实际执行清理吗?") # response = input("输入 'yes' 确认执行: ") # if response.lower() == 'yes': # cleaner.dry_run = False # print("开始实际清理...") # stats = cleaner.run_cleanup_pipeline(aggressive=False) # print("清理完成!") # else: # print("取消清理操作")
Scenario: in multi-cloud or hybrid environments (AWS, Alibaba Cloud, Tencent Cloud, etc.), automatically identify and tag idle resources such as long-stopped cloud hosts, unattached cloud disks, and idle load balancers, then archive or delete them to cut costs.
# cloud_cleaner.py
# Pseudocode-style example using boto3 (the AWS SDK); aliyun-python-sdk would be analogous
import boto3
from datetime import datetime, timezone, timedelta

def is_older_than(launch_time, days):
    """Return True if the (timezone-aware) launch time is more than `days` days ago."""
    return datetime.now(timezone.utc) - launch_time > timedelta(days=days)

def cleanup_aws_instances(days_old=30):
    ec2 = boto3.resource('ec2')
    for instance in ec2.instances.all():
        if instance.state['Name'] == 'stopped':
            launch_time = instance.launch_time
            # Check whether it exceeds the configured number of days
            if is_older_than(launch_time, days_old):
                print(f"Terminating idle instance: {instance.id}")
                instance.terminate()

Want to learn Python efficiently? These three hand-picked books cover different needs!
Fluent Python, 2nd Edition: essential reading for intermediate developers, with in-depth coverage of advanced features and best practices.
Python from Novice to Master: a first choice for beginners, building full-stack skills systematically.
Python Data Analysis: From Beginner to Real-World Cases: a data-science companion that walks you through processing data with Python, with practical cases you can apply immediately.
All three books support pay-after-trial, shipping insurance, and 7-day no-questions-asked returns. Click "Buy" to start your Python learning journey!
https://ima.qq.com/wiki/?shareId=f2628818f0874da17b71ffa0e5e8408114e7dbad46f1745bbd1cc1365277631c
