当前位置:首页>python>Python 自动化运维开发:20个脚本解决80%的重复工作

Python 自动化运维开发:20个脚本解决80%的重复工作

  • 2026-02-06 07:31:56
Python 自动化运维开发:20个脚本解决80%的重复工作

Python凭借其“优雅、明确、简单”的设计哲学,成为运维效率提升的绝佳武器。它不仅仅是连接不同系统的“胶水语言”,更是构建稳定、高效、可扩展的自动化运维体系的核心工具。本文精炼整理了20个Python脚本,覆盖从基础巡检到智能告警的完整自动化场景,解决80%的日常重复性运维工作,可以根据实际的场景适度调整落地使用。

往期阅读>>>

Python 为什么会成为AI时代的头部语言

Python 40个常用的列表推导式

Python 50个提高代码开发效率的方法

Python 自动检测服务HTTPS证书过期时间并发送预警

Python 自动化操作Redis的15个实用脚本

Python 自动化管理Jenkins的15个实用脚本,提升效率

Python copyparty搭建轻量的文件服务器的方法

Python 实现2FA认证的方法,提升安全性

Python 封装20个常用API接口,提升开发效率

App2Docker:如何无需编写Dockerfile也可以创建容器镜像

Python 集成 Nacos 配置中心的方法

Python 35个JSON数据处理方法

Python 字典与列表的20个核心技巧

Python 15个文本分析的库,提升效率

Python 15个Pandas技巧,提升数据分析效率

Python 运维中30个常用的库,提升效率

Python调用远程接口的方法

Python 提取HTML文本的方法,提升效率

Python 应用容器化方法:实现“一次部署,处处运行”

Python 自动化识别Nginx配置并导出为excel文件,提升Nginx管理效率

Python 5个常见的异步任务处理框架

Python数据科学常见的30个库

Python 50个实用代码片段,优雅高效

一、环境搭建与标准化

脚本1:环境初始化与依赖检查

应用场景:新服务器上线,或团队新人配置开发环境,需要快速构建统一、合规的Python运行环境。

功能说明:自动检查并安装指定的Python版本、必要的系统工具及所有项目依赖的第三方库。

#!/usr/bin/env python3# init_env.pyimport subprocessimport sysREQUIRED_PYTHON = (3, 8)REQUIRED_PACKAGES = ['paramiko', 'requests', 'psutil', 'schedule', 'pymysql', 'redis']def check_python():    version = sys.version_info[:2]    if version < REQUIRED_PYTHON:        print(f"错误: 需要Python {REQUIRED_PYTHON[0]}.{REQUIRED_PYTHON[1]}+,当前 {version[0]}.{version[1]}")        sys.exit(1)    print(f"✓ Python 版本: {sys.version}")def install_packages():    for pkg in REQUIRED_PACKAGES:        try:            subprocess.run([sys.executable, '-m', 'pip', 'install', '-q', pkg], check=True)            print(f"✓ 已安装/更新: {pkg}")        except subprocess.CalledProcessError:            print(f"✗ 安装失败: {pkg}")            sys.exit(1)if __name__ == '__main__':    check_python()    install_packages()    print("环境初始化完成!")

脚本2:标准化日志配置器

应用场景:所有自动化脚本都需要统一、规范的日志记录,以实现高效的故障调试、行为审计与运营分析。

功能说明:创建一个支持日志轮转、多级别输出、标准化格式的日志配置模块,作为所有脚本的日志基础。

# logging_config.pyimport loggingfrom logging.handlers import RotatingFileHandlerdef setup_logger(name='ops_auto', log_file='/var/log/ops/automation.log', level=logging.INFO):    logger = logging.getLogger(name)    logger.setLevel(level)    # 防止重复添加handler    if logger.hasHandlers():        return logger    # 控制台输出    console = logging.StreamHandler()    console.setLevel(logging.INFO)    # 文件输出(轮转)    file_handler = RotatingFileHandler(        log_file, maxBytes=10*1024*1024, backupCount=5    )    file_handler.setLevel(logging.DEBUG)    # 格式化    formatter = logging.Formatter('%(asctime)s | %(name)s | %(levelname)-8s | %(message)s')    console.setFormatter(formatter)    file_handler.setFormatter(formatter)    logger.addHandler(console)    logger.addHandler(file_handler)    return logger

脚本3:配置文件统一加载器

应用场景:多个自动化脚本需要读取共享的配置信息,如数据库连接串、服务器列表、API密钥等。

功能说明:支持YAML/JSON等多种格式的配置文件,提供配置热重载能力,并对敏感信息提供环境变量覆盖与加密选项,提升安全性。

# config_loader.pyimport yamlimport jsonimport osfrom pathlib import Pathclass Config:    _instance = None    def __new__(cls, config_path='config.yml'):        if cls._instance is None:            cls._instance = super().__new__(cls)            cls._instance.config_path = Path(config_path)            cls._instance._load()        return cls._instance    def _load(self):        ext = self.config_path.suffix.lower()        with open(self.config_path, 'r') as f:            if ext == '.yml' or ext == '.yaml':                self._config = yaml.safe_load(f)            elif ext == '.json':                self._config = json.load(f)            else:                raise ValueError(f"不支持的配置文件格式: {ext}")        # 示例:敏感信息可从环境变量覆盖        if 'mysql' in self._config:            self._config['mysql']['password'] = os.getenv('DB_PASS', self._config['mysql'].get('password', ''))    def get(self, key, default=None):        return self._config.get(key, default)# 使用示例:# cfg = Config().get('servers')

二、批量操作与系统管理

脚本4:并发SSH命令执行器

应用场景:在数十或上百台服务器组成的集群中,同时执行标准化命令,如系统更新(apt update)、服务状态检查、信息收集等。

功能说明:基于 paramiko库,支持SSH密钥认证、可调节的并发控制、执行超时设置,并能够将各节点的执行结果进行结构化汇总与格式化输出。

完整代码示例 (batch_ssh_executor.py):

#!/usr/bin/env python3"""并发SSH命令执行器支持:密钥认证、并发控制、超时设置、结果汇总与格式化输出"""import concurrent.futuresimport paramikofrom typing import List, Dict, Tupleimport loggingfrom logging_config import setup_loggerlogger = setup_logger('ssh_executor')class SSHExecutor:    def __init__(self, timeout=30, max_workers=10):        self.timeout = timeout        self.max_workers = max_workers    def _execute_single(self, host: str, username: str, key_path: str, command: str) -> Tuple[str, str, bool]:        """在单台主机上执行命令"""        client = paramiko.SSHClient()        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())        try:            client.connect(hostname=host, username=username, key_filename=key_path, timeout=self.timeout)            stdin, stdout, stderr = client.exec_command(command, timeout=self.timeout)            exit_code = stdout.channel.recv_exit_status()            output = stdout.read().decode('utf-8', errors='ignore').strip()            error = stderr.read().decode('utf-8', errors='ignore').strip()            success = (exit_code == 0)            if not success:                logger.warning(f"主机 {host} 执行失败 (code:{exit_code}): {error}")            return output, error, success        except Exception as e:            logger.error(f"连接/执行失败 {host}: {e}")            return "", str(e), False        finally:            client.close()    def run(self, hosts: List[str], username: str, key_path: str, command: str) -> Dict[str, Dict]:        """并发执行命令,返回汇总结果"""        results = {}        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:            future_to_host = {                executor.submit(self._execute_single, host, username, key_path, command): host                 for host in hosts            }            for future in concurrent.futures.as_completed(future_to_host):                host = future_to_host[future]                try:                    output, error, success = future.result()                    results[host] = {                        'success': success,                        'output': output,                        'error': error                    }                except Exception as e:                    results[host] = {                        'success': False,                        'output': '',                        'error': f'执行异常: {e}'                    }        return results# 使用示例if __name__ == '__main__':    executor = SSHExecutor()    hosts = ['192.168.1.10', '192.168.1.11', '192.168.1.12']    results = executor.run(        hosts=hosts,        username='ops',        key_path='/home/ops/.ssh/id_rsa',        command='uptime && df -h | grep -E "/($|/data)"'    )    # 输出格式化报告    for host, data in results.items():        status = "成功" if data['success'] else "失败"        print(f"{host}: {status}")        if data['output']:            print(f"   输出: {data['output'][:200]}...")  # 截断长输出

脚本5:智能文件分发与同步工具

应用场景:将统一的应用程序配置文件、脚本、证书或静态资源,安全、高效地同步到大批量服务器,尤其适用于跨地域集群的配置管理。

功能说明:实现增量同步,通过MD5校验保障文件一致性,支持断点续传,并可灵活配置排除规则(如临时文件、版本控制目录)。

核心代码片段 (file_sync.py):

import hashlibimport osimport paramikofrom concurrent.futures import ThreadPoolExecutorfrom pathlib import Pathclass IntelligentFileSync:    def __init__(self, local_base, remote_base, exclude_patterns=None):        self.local_base = Path(local_base)        self.remote_base = remote_base        self.exclude_patterns = exclude_patterns or ['.tmp', '.git', '__pycache__']    def _calculate_md5(self, filepath: Path) -> str:        """计算文件的MD5值,用于校验"""        hash_md5 = hashlib.md5()        with open(filepath, 'rb') as f:            for chunk in iter(lambda: f.read(4096), b''):                hash_md5.update(chunk)        return hash_md5.hexdigest()    def _should_exclude(self, filepath: Path) -> bool:        """判断文件是否在排除名单内"""        for pattern in self.exclude_patterns:            if pattern in str(filepath):                return True        return False    def _sync_to_host(self, host: str, username: str, key_path: str):        """同步文件到单台主机(核心逻辑)"""        ssh = paramiko.SSHClient()        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())        try:            ssh.connect(hostname=host, username=username, key_filename=key_path)            sftp = ssh.open_sftp()            # 遍历本地目录            for local_file in self.local_base.rglob('*'):                if local_file.is_file() and not self._should_exclude(local_file):                    relative_path = local_file.relative_to(self.local_base)                    remote_file = f"{self.remote_base}/{relative_path}"                    # 检查远程文件是否存在及MD5是否一致                    try:                        remote_attr = sftp.stat(str(remote_file))                        # 可在此处添加MD5比较逻辑,决定是否覆盖                        local_md5 = self._calculate_md5(local_file)                        # 假设通过一个远程命令获取MD5(需远程支持)                        # 如果MD5不同或文件大小不同,则覆盖                        if remote_attr.st_size != local_file.stat().st_size:                            self._upload_file(sftp, local_file, remote_file)                    except FileNotFoundError:                        # 远程文件不存在,直接上传                        self._upload_file(sftp, local_file, remote_file)            sftp.close()            print(f"[{host}] 同步完成")        except Exception as e:            print(f"[{host}] 同步失败: {e}")        finally:            ssh.close()    def _upload_file(self, sftp, local_path, remote_path):        """实现文件上传,可扩展为断点续传"""        # 确保远程目录存在        remote_dir = os.path.dirname(remote_path)        try:            sftp.mkdir(remote_dir)        except:            pass        sftp.put(str(local_path), remote_path)

脚本6:系统及应用一键安装配置

应用场景:新服务器上线后,进行标准化、自动化的基础软件栈部署与配置,例如安装Nginx、配置防火墙规则、部署监控代理等。

#!/usr/bin/env python3# deploy_stack.pydef deploy_nginx(host):    commands = [        'apt update && apt install -y nginx',        'systemctl enable nginx',        'cp /tmp/nginx-site.conf /etc/nginx/sites-available/myapp',        'ln -sf /etc/nginx/sites-available/myapp /etc/nginx/sites-enabled/',        'nginx -t && systemctl reload nginx'    ]    # 调用脚本4的SSHExecutor并发执行命令列表    # ... 执行逻辑

脚本7:基于特定条件的系统巡检报告生成器

应用场景:每日或每周自动生成服务器集群的健康状态报告,涵盖CPU、内存、磁盘、关键进程等核心指标,实现趋势分析与预警。

# system_inspector.pyimport psutildef generate_inspection_report():    report_lines = []    # 检查磁盘    for part in psutil.disk_partitions():        usage = psutil.disk_usage(part.mountpoint)        if usage.percent > 85:            report_lines.append(f"警告: 磁盘 {part.mountpoint} 使用率 {usage.percent}%")    # 检查内存    mem = psutil.virtual_memory()    if mem.percent > 90:        report_lines.append(f"紧急: 内存使用率 {mem.percent}%")    # 检查关键进程    if 'nginx' not in (p.name() for p in psutil.process_iter(['name'])):        report_lines.append("关键: Nginx进程不存在")    return report_lines

脚本8:定时任务(Cron)集中管理器

应用场景:在微服务或多服务器环境中,批量管理所有服务器的Crontab配置,实现任务的统一视图、批量更新、合规检查与备份恢复,避免逐台手动修改的低效和错误。

功能说明:通过SSH远程读取、解析、更新Crontab,支持任务的增、删、改、查及备份。

应用代码片段:

# cron_manager.py# 在实际使用脚本8时,可以这样集成到运维流程中:from cron_manager import CronManagerdef deploy_new_cron_job(servers, new_job):    """在所有服务器上部署新的定时任务(例如,每天凌晨3点清理临时文件)"""    cm = CronManager()    for server in servers:        # 先备份当前crontab        cm.backup_crontab(server, backup_dir='/backup/cron/')        # 添加新任务        cm.update_cron_remotely(server, new_job, action='add')        # 验证添加成功        current = cm.update_cron_remotely(server, None, action='list')        if new_job not in current:            raise Exception(f"在 {server} 上添加Cron任务失败")# 新任务示例:每天凌晨3点清理 /tmp 下超过7天的文件new_job = '0 3 * * * find /tmp -type f -mtime +7 -delete >> /var/log/tmp_cleanup.log 2>&1'servers = ['web01', 'web02', 'api01', 'db01']deploy_new_cron_job(servers, new_job)

脚本9:用户与权限批量管理

应用场景:为新项目或新团队批量创建系统账户,并统一配置SSH密钥、Home目录及特定的Sudo权限。

# user_manager.pydef bulk_create_users(hosts, username, ssh_key):    commands = [        f'useradd -m -s /bin/bash {username}',        f'mkdir -p /home/{username}/.ssh',        f'echo "{ssh_key}" >> /home/{username}/.ssh/authorized_keys',        f'echo "{username} ALL=(ALL) NOPASSWD: /usr/bin/systemctl restart nginx" >> /etc/sudoers.d/{username}'    ]    # 调用并发SSH执行器执行命令列表    # ... 执行逻辑

三、监控、分析与告警

脚本10:一体化系统资源监控与推送

应用场景:以可配置的时间间隔,实时采集服务器的CPU、内存、磁盘IO、网络等性能指标,并推送至Prometheus、InfluxDB等监控系统或中央数据库,用于可视化与长期趋势分析。

功能说明:支持多维度指标采集、可配置的数据推送目标、异常检测与阈值告警。

#!/usr/bin/env python3# system_monitor.pyimport timeimport jsonimport psutilimport socketfrom datetime import datetimefrom threading import Threadfrom queue import Queueimport loggingfrom logging_config import setup_loggerlogger = setup_logger('system_monitor')class SystemMonitor:    def __init__(self, interval=60, push_targets=None):        """初始化系统监控器        Args:            interval: 监控采样间隔(秒)            push_targets: 数据推送目标列表,如 ['prometheus', 'influxdb']        """        self.interval = interval        self.push_targets = push_targets or []        self.running = False        self.metrics_queue = Queue()        self.hostname = socket.gethostname()    def collect_system_metrics(self):        """收集全面的系统指标"""        timestamp = datetime.now().isoformat()        # CPU指标        cpu_times = psutil.cpu_times_percent(interval=0.5)        cpu_percent = psutil.cpu_percent(interval=0.5, percpu=True)        # 内存指标        memory = psutil.virtual_memory()        swap = psutil.swap_memory()        # 磁盘指标        disk_usage = {}        disk_io = {}        for part in psutil.disk_partitions():            if part.fstype:                try:                    usage = psutil.disk_usage(part.mountpoint)                    disk_usage[part.mountpoint] = {                        'total': usage.total,                        'used': usage.used,                        'free': usage.free,                        'percent': usage.percent                    }                except Exception as e:                    logger.warning(f"无法获取磁盘使用信息 {part.mountpoint}: {e}")        # 网络指标        net_io = psutil.net_io_counters()        # 进程和负载        load_avg = psutil.getloadavg()        metrics = {            'timestamp': timestamp,            'hostname': self.hostname,            'cpu': {                'percent_total': psutil.cpu_percent(interval=0),                'percent_per_core': cpu_percent,                'user': cpu_times.user,                'system': cpu_times.system,                'idle': cpu_times.idle,                'iowait': cpu_times.iowait if hasattr(cpu_times, 'iowait') else 0            },            'memory': {                'total': memory.total,                'available': memory.available,                'used': memory.used,                'free': memory.free,                'percent': memory.percent,                'swap_total': swap.total,                'swap_used': swap.used,                'swap_free': swap.free,                'swap_percent': swap.percent            },            'disk': {                'usage': disk_usage,                'read_bytes_per_sec': disk_io.get('read_bytes', 0),                'write_bytes_per_sec': disk_io.get('write_bytes', 0)            },            'network': {                'bytes_sent': net_io.bytes_sent,                'bytes_recv': net_io.bytes_recv,                'packets_sent': net_io.packets_sent,                'packets_recv': net_io.packets_recv,                'errin': net_io.errin,                'errout': net_io.errout            },            'system': {                'load_avg_1min': load_avg[0],                'load_avg_5min': load_avg[1],                'load_avg_15min': load_avg[2],                'uptime': time.time() - psutil.boot_time(),                'process_count': len(psutil.pids())            }        }        return metrics    def check_thresholds(self, metrics):        """检查指标是否超过阈值"""        alerts = []        # CPU阈值检查        if metrics['cpu']['percent_total'] > 90:            alerts.append({                'level': 'CRITICAL',                'metric': 'cpu_percent',                'value': metrics['cpu']['percent_total'],                'threshold': 90,                'message': f"CPU使用率过高: {metrics['cpu']['percent_total']:.1f}%"            })        elif metrics['cpu']['percent_total'] > 80:            alerts.append({                'level': 'WARNING',                'metric': 'cpu_percent',                'value': metrics['cpu']['percent_total'],                'threshold': 80,                'message': f"CPU使用率较高: {metrics['cpu']['percent_total']:.1f}%"            })        # 内存阈值检查        if metrics['memory']['percent'] > 95:            alerts.append({                'level': 'CRITICAL',                'metric': 'memory_percent',                'value': metrics['memory']['percent'],                'threshold': 95,                'message': f"内存使用率过高: {metrics['memory']['percent']:.1f}%"            })        elif metrics['memory']['percent'] > 85:            alerts.append({                'level': 'WARNING',                'metric': 'memory_percent',                'value': metrics['memory']['percent'],                'threshold': 85,                'message': f"内存使用率较高: {metrics['memory']['percent']:.1f}%"            })        # 磁盘阈值检查        for mountpoint, usage in metrics['disk']['usage'].items():            if usage['percent'] > 95:                alerts.append({                    'level': 'CRITICAL',                    'metric': f'disk_usage_{mountpoint.replace("/", "_")}',                    'value': usage['percent'],                    'threshold': 95,                    'message': f"磁盘 {mountpoint} 使用率过高: {usage['percent']:.1f}%"                })            elif usage['percent'] > 85:                alerts.append({                    'level': 'WARNING',                    'metric': f'disk_usage_{mountpoint.replace("/", "_")}',                    'value': usage['percent'],                    'threshold': 85,                    'message': f"磁盘 {mountpoint} 使用率较高: {usage['percent']:.1f}%"                })        # 系统负载检查        cpu_count = len(metrics['cpu']['percent_per_core'])        if metrics['system']['load_avg_1min'] > cpu_count * 4:            alerts.append({                'level': 'CRITICAL',                'metric': 'load_average',                'value': metrics['system']['load_avg_1min'],                'threshold': cpu_count * 4,                'message': f"系统负载过高: {metrics['system']['load_avg_1min']:.2f} (CPU数: {cpu_count})"            })        return alerts    def push_to_prometheus(self, metrics):        """将指标推送到Prometheus Pushgateway"""        try:            import requests            prometheus_data = []            # 转换指标为Prometheus格式            # CPU使用率            prometheus_data.append(f'cpu_percent_total{{host="{self.hostname}"}} {metrics["cpu"]["percent_total"]}')            prometheus_data.append(f'cpu_user{{host="{self.hostname}"}} {metrics["cpu"]["user"]}')            prometheus_data.append(f'cpu_system{{host="{self.hostname}"}} {metrics["cpu"]["system"]}')            # 内存使用率            prometheus_data.append(f'memory_percent{{host="{self.hostname}"}} {metrics["memory"]["percent"]}')            prometheus_data.append(f'memory_available_bytes{{host="{self.hostname}"}} {metrics["memory"]["available"]}')            # 磁盘使用率(每个挂载点)            for mountpoint, usage in metrics['disk']['usage'].items():                safe_mountpoint = mountpoint.replace('/', '_').replace('.', '_')                prometheus_data.append(f'disk_usage_percent{{host="{self.hostname}",mountpoint="{mountpoint}"}} {usage["percent"]}')            # 发送到Pushgateway            pushgateway_url = " http://localhost:9091/metrics/job/system_monitor "            response = requests.post(                pushgateway_url,                data='\n'.join(prometheus_data),                timeout=5            )            if response.status_code == 200:                logger.debug("指标已推送至Prometheus")            else:                logger.warning(f"Prometheus推送失败: {response.status_code}")        except ImportError:            logger.warning("requests库未安装,无法推送到Prometheus")        except Exception as e:            logger.error(f"Prometheus推送出错: {e}")    def push_to_influxdb(self, metrics):        """将指标推送到InfluxDB"""        try:            from influxdb import InfluxDBClient            influx_points = []            # CPU指标            cpu_point = {                "measurement": "cpu",                "tags": {                    "host": self.hostname                },                "time": metrics['timestamp'],                "fields": {                    "usage": metrics['cpu']['percent_total'],                    "user": metrics['cpu']['user'],                    "system": metrics['cpu']['system'],                    "idle": metrics['cpu']['idle']                }            }            influx_points.append(cpu_point)            # 内存指标            memory_point = {                "measurement": "memory",                "tags": {                    "host": self.hostname                },                "time": metrics['timestamp'],                "fields": {                    "used_percent": metrics['memory']['percent'],                    "available_bytes": metrics['memory']['available'],                    "total_bytes": metrics['memory']['total']                }            }            influx_points.append(memory_point)            # 磁盘指标(每个挂载点)            for mountpoint, usage in metrics['disk']['usage'].items():                disk_point = {                    "measurement": "disk",                    "tags": {                        "host": self.hostname,                        "mountpoint": mountpoint                    },                    "time": metrics['timestamp'],                    "fields": {                        "used_percent": usage['percent'],                        "used_bytes": usage['used'],                        "total_bytes": usage['total']                    }                }                influx_points.append(disk_point)            # 连接到InfluxDB并写入数据            client = InfluxDBClient(host='localhost', port=8086, database='system_metrics')            client.write_points(influx_points)            logger.debug("指标已推送至InfluxDB")        except ImportError:            logger.warning("influxdb库未安装,无法推送到InfluxDB")        except Exception as e:            logger.error(f"InfluxDB推送出错: {e}")    def save_to_local_file(self, metrics, alerts=None):        """将指标保存到本地JSON文件(用于调试或备份)"""        try:            data = {                'metrics': metrics,                'alerts': alerts or []            }            # 每天一个文件            date_str = datetime.now().strftime('%Y%m%d')            filename = f"/var/log/system_monitor/{date_str}.json"            import os            os.makedirs(os.path.dirname(filename), exist_ok=True)            # 读取现有数据(如果文件存在)            existing_data = []            if os.path.exists(filename):                with open(filename, 'r') as f:                    try:                        existing_data = json.load(f)                    except json.JSONDecodeError:                        existing_data = []            # 添加新数据            if not isinstance(existing_data, list):                existing_data = []            existing_data.append(data)            # 写入文件            with open(filename, 'w') as f:                json.dump(existing_data, f, indent=2)        except Exception as e:            logger.error(f"保存到本地文件出错: {e}")    def monitoring_loop(self):        """监控主循环"""        logger.info(f"系统监控器启动 (间隔: {self.interval}秒)")        while self.running:            try:                # 收集指标                metrics = self.collect_system_metrics()                # 检查阈值                alerts = self.check_thresholds(metrics)                # 触发告警(如果有的话)                for alert in alerts:                    if alert['level'] == 'CRITICAL':                        logger.critical(alert['message'])                    else:                        logger.warning(alert['message'])                # 推送到配置的目标                for target in self.push_targets:                    if target == 'prometheus':                        self.push_to_prometheus(metrics)                    elif target == 'influxdb':                        self.push_to_influxdb(metrics)                    elif target == 'local_file':                        self.save_to_local_file(metrics, alerts)                # 将数据放入队列(供其他线程消费)                self.metrics_queue.put({                    'metrics': metrics,                    'alerts': alerts                })                # 等待下一个采样周期                time.sleep(self.interval)            except Exception as e:                logger.error(f"监控循环出错: {e}")                time.sleep(10)  # 出错后等待10秒再重试    def start(self):        """启动监控器"""        if not self.running:            self.running = True            self.monitor_thread = Thread(target=self.monitoring_loop, daemon=True)            self.monitor_thread.start()            logger.info("系统监控器已启动")    def stop(self):        """停止监控器"""        self.running = False        if hasattr(self, 'monitor_thread'):            self.monitor_thread.join(timeout=10)        logger.info("系统监控器已停止")    def get_latest_metrics(self):        """获取最新的监控指标"""        # 清空队列中的旧数据,只返回最新的        latest = None        while not self.metrics_queue.empty():            latest = self.metrics_queue.get()        return latest    def generate_summary_report(self, metrics):        """生成监控摘要报告"""        if not metrics:            return "暂无监控数据"        report_lines = [            f"=== 系统监控摘要 ===",            f"主机: {metrics['hostname']}",            f"时间: {metrics['timestamp']}",            "",            "CPU使用率:",            f"  总体: {metrics['cpu']['percent_total']:.1f}%",            f"  用户态: {metrics['cpu']['user']:.1f}%",            f"  内核态: {metrics['cpu']['system']:.1f}%",            f"  空闲: {metrics['cpu']['idle']:.1f}%",            "",            "内存状态:",            f"  使用率: {metrics['memory']['percent']:.1f}%",            f"  可用内存: {metrics['memory']['available']/ (1024**3):.1f} GB",            "",            "磁盘使用:"        ]        for mountpoint, usage in metrics['disk']['usage'].items():            report_lines.append(f"  {mountpoint}: {usage['percent']:.1f}% ({usage['used']/(1024**3):.1f}/{usage['total']/(1024**3):.1f} GB)")        report_lines.extend([            "",            "网络状态:",            f"  发送: {metrics['network']['bytes_sent']/(1024**2):.1f} MB",            f"  接收: {metrics['network']['bytes_recv']/(1024**2):.1f} MB",            "",            "系统负载:",            f"  1分钟: {metrics['system']['load_avg_1min']:.2f}",            f"  5分钟: {metrics['system']['load_avg_5min']:.2f}",            f"  15分钟: {metrics['system']['load_avg_15min']:.2f}",            f"  运行时间: {metrics['system']['uptime']/3600:.1f} 小时",            f"  进程数: {metrics['system']['process_count']}"        ])        return '\n'.join(report_lines)# 使用示例if __name__ == '__main__':    # 创建监控实例,配置推送目标    monitor = SystemMonitor(        interval=30,  # 每30秒采样一次        push_targets=['prometheus', 'local_file']  # 推送到Prometheus并保存本地文件    )    try:        # 启动监控        monitor.start()        print("系统监控正在运行...")        print("按 Ctrl+C 停止监控")        # 主循环中展示摘要信息        while True:            time.sleep(60)            latest = monitor.get_latest_metrics()            if latest and 'metrics' in latest:                report = monitor.generate_summary_report(latest['metrics'])                print(report)                # 如果有告警,特别显示                if latest.get('alerts'):                    print("当前告警:")                    for alert in latest['alerts']:                        print(f"  {alert['level']}: {alert['message']}")                print("-" * 50)    except KeyboardInterrupt:        print("\n正在停止监控...")        monitor.stop()        print("监控已停止")

脚本11:日志实时分析与智能告警

应用场景:实时跟踪Nginx访问日志、应用错误日志等,自动识别错误率激增、异常访问模式(如爬虫、攻击)、安全威胁(如SQL注入特征)等,触发实时告警。

功能说明:支持实时尾随日志文件、正则匹配关键模式、基于滑动窗口的阈值统计,并可集成邮件、即时通讯工具等进行告警通知。

核心代码框架 (log_analyzer.py):

#!/usr/bin/env python3"""日志实时分析与智能告警支持:实时尾随日志文件、正则匹配关键错误、滑动窗口统计、阈值告警"""import reimport timefrom collections import deque, defaultdictimport subprocessfrom datetime import datetime# 假设使用上面定义的 logging_config 和邮件发送库from logging_config import setup_loggerimport smtplibfrom email.mime.text import MIMETextlogger = setup_logger('log_analyzer')class LogAnalyzer:    def __init__(self, log_path, alert_rules):        self.log_path = log_path        self.alert_rules = alert_rules  # 示例: [{'pattern': 'ERROR', 'threshold': 10, 'window': 60}]        self.counters = defaultdict(lambda: deque(maxlen=300))  # 滑动窗口计数器    def tail_log(self):        """模拟 tail -f 行为,持续读取日志新增行"""        process = subprocess.Popen(['tail', '-F', '-n', '0', self.log_path],                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)        while True:            line = process.stdout.readline()            if line:                yield line.decode('utf-8', errors='ignore').strip()            else:                time.sleep(0.1)    def analyze_line(self, line: str):        """分析单行日志,更新计数器并检查告警"""        timestamp = datetime.now()        for rule in self.alert_rules:            pattern = re.compile(rule['pattern'])            if pattern.search(line):                key = rule['pattern']                self.counters[key].append(timestamp)                # 检查时间窗口内的数量是否超过阈值                window_sec = rule['window']                threshold = rule['threshold']                recent_count = sum(1 for ts in self.counters[key]                                    if (timestamp - ts).total_seconds() <= window_sec)                if recent_count >= threshold:                    self.trigger_alert(rule, line, recent_count)                    # 清空该计数器,避免重复告警                    self.counters[key].clear()    def trigger_alert(self, rule, sample_line, count):        """触发告警,可发送邮件、短信或调用Webhook"""        subject = f"[日志告警] 模式 '{rule['pattern']}' 在{rule['window']}秒内出现{count}次"        body = f"""告警规则: {rule}示例日志: {sample_line}触发时间: {datetime.now()}计数: {count}        """        logger.critical(subject)        self.send_email(subject, body)        # 可扩展:调用 Slack Webhook、企业微信机器人等    def send_email(self, subject, body):        """发送告警邮件(示例)"""        # 实际使用时应配置SMTP服务器、收件人列表等        msg = MIMEText(body)        msg['Subject'] = subject        msg['From'] = 'log_alert@yourcompany.com'        msg['To'] = 'ops-team@yourcompany.com'        try:            with smtplib.SMTP('localhost') as server:                server.send_message(msg)        except Exception as e:            logger.error(f"发送邮件失败: {e}")    def run(self):        """主循环"""        for line in self.tail_log():            self.analyze_line(line)if __name__ == '__main__':    # 示例规则:5分钟内出现10次“ERROR”或“5xx”状态码则告警    rules = [        {'pattern': r'\bERROR\b', 'threshold': 10, 'window': 300},        {'pattern': r'\s5\d\d\s', 'threshold': 20, 'window': 300},        {'pattern': r'(SQL注入|XSS|csrf)', 'threshold': 1, 'window': 60},  # 安全规则    ]    analyzer = LogAnalyzer('/var/log/nginx/access.log', rules)    analyzer.run()

脚本12:网络连通性与端口扫描监控

应用场景:定时对内网关键服务(如数据库、缓存、消息队列)或对外服务的端口(如80, 443, 22)进行连通性检查,快速发现网络隔离或服务宕机。

应用场景:定时对内网关键服务(如数据库、缓存、消息队列)或对外服务的端口(如80, 443, 22)进行连通性检查,快速发现网络隔离或服务宕机。

# port_checker.pyimport socketdef check_port(host, port, timeout=3):    try:        with socket.create_connection((host, port), timeout=timeout):            return True    except:        return False# 可搭配 schedule 库实现定时扫描列表

脚本13:SSL/TLS证书过期监控

应用场景:自动检查所有对外服务域名及内部自签证书的有效期,提前设定天数(如30天)发出更新告警,避免因证书过期导致的服务不可用。

功能说明:支持批量检查域名、IP及非标准端口,提供详细的证书信息(签发者、有效期、剩余天数),并生成清晰的可读报告。

完整代码示例 (ssl_cert_checker.py):

#!/usr/bin/env python3"""SSL/TLS证书过期监控支持:批量检查域名/IP、提前N天告警、多种通知方式"""import sslimport socketfrom datetime import datetime, timedeltaimport concurrent.futuresfrom typing import List, Dictimport loggingfrom logging_config import setup_loggerlogger = setup_logger('ssl_checker')class SSLCertChecker:    def __init__(self, alert_days_before=30):        self.alert_days_before = alert_days_before    def get_cert_info(self, hostname: str, port: int = 443) -> Dict:        """获取指定主机的SSL证书信息"""        context = ssl.create_default_context()        with socket.create_connection((hostname, port), timeout=10) as sock:            with context.wrap_socket(sock, server_hostname=hostname) as ssock:                cert = ssock.getpeercert()                # 解析证书有效期                not_after_str = cert['notAfter']                not_after = datetime.strptime(not_after_str, '%b %d %H:%M:%S %Y %Z')                not_before_str = cert['notBefore']                not_before = datetime.strptime(not_before_str, '%b %d %H:%M:%S %Y %Z')                issuer = dict(x[0] for x in cert['issuer'])                subject = dict(x[0] for x in cert['subject'])                return {                    'hostname': hostname,                    'valid_from': not_before,                    'valid_to': not_after,                    'issuer': issuer.get('organizationName', 'Unknown'),                    'common_name': subject.get('commonName', 'Unknown'),                    'days_left': (not_after - datetime.now()).days                }    def check_domains(self, domains: List[str]) -> List[Dict]:        """批量检查域名,返回检查结果"""        results = []        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:            future_to_domain = {executor.submit(self.get_cert_info, domain): domain for domain in domains}            for future in concurrent.futures.as_completed(future_to_domain):                domain = future_to_domain[future]                try:                    cert_info = future.result(timeout=15)                    results.append(cert_info)                except (ssl.SSLError, socket.timeout, ConnectionRefusedError) as e:                    results.append({                        'hostname': domain,                        'error': str(e),                        'days_left': None                    })                except Exception as e:                    logger.error(f"检查 {domain} 时发生未知错误: {e}")                    results.append({                        'hostname': domain,                        'error': f'Unknown error: {e}',                        'days_left': None                    })        return results    def generate_report(self, results: List[Dict]) -> str:        """生成可读的报告"""        report_lines = ["SSL/TLS证书检查报告", "="*40, f"检查时间: {datetime.now()}\n"]        for r in results:            if 'error' in r:                report_lines.append(f"{r['hostname']}: 检查失败 - {r['error']}")            else:                status = "true"                if r['days_left'] < 0:                    status = "已过期"                elif r['days_left'] < self.alert_days_before:                    status = "即将过期"                report_lines.append(                    f"{status} {r['hostname']} (CN: {r['common_name']}): "                    f"剩余 {r['days_left']} 天, 签发者: {r['issuer']}, "                    f"有效期: {r['valid_from'].date()} 至 {r['valid_to'].date()}"                )        return "\n".join(report_lines)    def run_and_alert(self, domains: List[str]):        """执行检查并触发告警"""        results = self.check_domains(domains)        report = self.generate_report(results)        logger.info(report)        # 检查是否有即将过期的证书        urgent = [r for r in results if 'days_left' in r and 0 <= r['days_left'] < self.alert_days_before]        if urgent:            alert_msg = f"发现 {len(urgent)} 个证书将在 {self.alert_days_before} 天内过期:\n"            for u in urgent:                alert_msg += f"- {u['hostname']} 剩余 {u['days_left']} 天\n"            logger.warning(alert_msg)            # 这里可以调用发送邮件或钉钉/企微机器人            # self.send_alert(alert_msg)if __name__ == '__main__':    # 示例域名列表,可从配置文件或数据库读取    domains_to_check = [        ' www.example.com',        'api.example.com',        'blog.example.com',        'internal-tool.yourcompany.com:8443',  # 支持非443端口    ]    checker = SSLCertChecker(alert_days_before=30)    checker.run_and_alert(domains_to_check)

脚本14:进程存活与资源占用监控

应用场景:确保如Java应用、消息队列消费者等关键业务进程持续存活,并监控其是否存在内存泄漏、CPU异常飙高等资源问题。

# process_watcher.pyimport psutildef monitor_process(process_name, max_memory_mb=1024):    for proc in psutil.process_iter(['pid', 'name', 'memory_info']):        if proc.info['name'] and process_name in proc.info['name']:            mem_mb = proc.info['memory_info'].rss / 1024 / 1024            if mem_mb > max_memory_mb:                # 发送告警,或尝试重启                proc.terminate()            return True    # 进程不存在,触发告警    return False

脚本15:自动化汇总报告生成器(日报/周报)

应用场景:将脚本10-14的监控结果进行聚合、分析,自动生成格式美观、内容全面的HTML或Markdown格式的每日/每周健康报告,并通过邮件或即时通讯工具自动发送给相关团队。

# daily_reporter.pydef generate_html_report(metrics, logs, alerts):    html = "<h1>系统日报</h1>"    html += f"<p>检查时间: {datetime.now()}</p>"    html += f"<h2>资源概览</h2><p>CPU平均负载: {metrics['cpu_avg']}...</p>"    # ... 整合各项数据    return html

四、数据与备份管理

脚本16:MySQL数据库智能备份与恢复验证

应用场景:制定定时(如每日凌晨)的全量备份与更频繁的增量备份策略,自动清理过期备份文件,并定期对备份文件进行恢复测试,确保备份的有效性。

#!/usr/bin/env python3"""MySQL基础备份 - 简化版功能:执行全量备份,压缩存盘"""import subprocessimport osfrom datetime import datetimedef mysql_backup(host='localhost', user='root', password='',                  backup_dir='/var/backup/mysql'):    """执行MySQL全量备份"""    # 创建备份目录    os.makedirs(backup_dir, exist_ok=True)    # 生成备份文件名    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')    backup_file = f"{backup_dir}/backup_{timestamp}.sql.gz"    # 构建mysqldump命令    cmd = f"mysqldump --host={host} --user={user}"    if password:        cmd += f" --password={password}"    cmd += f" --all-databases --single-transaction"    cmd += f" | gzip > {backup_file}"    # 执行备份    print(f"开始备份: {backup_file}")    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)    if result.returncode == 0:        size = os.path.getsize(backup_file) / 1024 / 1024        print(f"备份成功!大小: {size:.2f} MB")        return backup_file    else:        print(f"备份失败: {result.stderr}")        return Nonedef cleanup_old_backups(backup_dir, keep_days=7):    """清理旧备份(保留最近7天)"""    from datetime import datetime, timedelta    cutoff_date = datetime.now() - timedelta(days=keep_days)    deleted = 0    for filename in os.listdir(backup_dir):        if filename.startswith('backup_') and filename.endswith('.sql.gz'):            filepath = os.path.join(backup_dir, filename)            # 从文件名提取日期            try:                date_str = filename[7:15]  # 提取20250113                file_date = datetime.strptime(date_str, '%Y%m%d')                if file_date < cutoff_date:                    os.remove(filepath)                    deleted += 1            except:                continue    if deleted:        print(f"清理了 {deleted} 个过期备份")if __name__ == '__main__':    # 执行备份    backup = mysql_backup(        host='localhost',        user='root',        password='your_password',  # 建议使用环境变量        backup_dir='/var/backup/mysql'    )    # 清理过期备份    if backup:        cleanup_old_backups('/var/backup/mysql', keep_days=7)

脚本17:MySQL慢查询日志分析与优化建议

应用场景:定期(如每周)分析MySQL慢查询日志,自动找出最耗时的TOP N条SQL语句,分析其执行计划,并给出可能的索引优化建议,辅助性能调优。

#!/usr/bin/env python3"""MySQL慢查询分析 - 简化版功能:找出最慢的SQL,提供简单优化建议"""import pymysqldef analyze_slow_queries(host='localhost', user='root', password='',                          top_n=10):    """分析MySQL慢查询"""    # 连接数据库    connection = pymysql.connect(        host=host,        user=user,        password=password,        database='mysql'    )    try:        with connection.cursor() as cursor:            # 1. 查看慢查询统计            cursor.execute("SHOW GLOBAL STATUS LIKE 'Slow_queries'")            slow_count = cursor.fetchone()            print(f"总慢查询数: {slow_count}")            # 2. 获取慢查询日志(如果启用)            cursor.execute("SHOW VARIABLES LIKE 'slow_query_log'")            slow_log_enabled = cursor.fetchone()            if slow_log_enabled == 'ON':                # 查看最慢的查询(从slow_log表,MySQL 5.1+)                cursor.execute("""                    SELECT                         start_time,                        query_time,                        lock_time,                        rows_sent,                        rows_examined,                        SUBSTRING(sql_text, 1, 200) as sql_snippet                    FROM mysql.slow_log                    ORDER BY query_time DESC                    LIMIT %s                """, (top_n,))                results = cursor.fetchall()                print(f"TOP {top_n} 最慢查询:")                for i, row in enumerate(results, 1):                    start_time, query_time, lock_time, rows_sent, rows_examined, sql = row                    print(f"\n{i}. 查询时间: {query_time:.3f}秒")                    print(f"   扫描行数: {rows_examined:,},返回行数: {rows_sent:,}")                    print(f"   效率比: {rows_sent/rows_examined*100:.1f}%" if rows_examined > 0 else "   效率比: N/A")                    print(f"   锁时间: {lock_time:.3f}秒")                    print(f"   SQL片段: {sql}...")                    # 简单优化建议                    if rows_examined > 10000 and rows_examined > rows_sent * 10:                        print("建议: 检查是否需要索引,或优化WHERE条件")                    if lock_time > 0.1:                        print("建议: 长时间锁表,考虑事务优化")                    if query_time > 5:                        print("建议: 查询超时,可能需要分表或优化架构")            else:                print("慢查询日志未开启,请先配置:")                print("  SET GLOBAL slow_query_log = 'ON';")                print("  SET GLOBAL long_query_time = 2;")                print("  SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';")            # 3. 检查未使用索引的查询            print(f"索引使用统计:")            cursor.execute("SHOW GLOBAL STATUS LIKE 'Handler_read%'")            for name, value in cursor.fetchall():                print(f"  {name}: {value}")    finally:        connection.close()def basic_optimization_suggestions():    """提供基础优化建议"""    suggestions = [        "1. 为频繁查询的WHERE条件列添加索引",        "2. 避免 SELECT *,只选择需要的列",        "3. 大表考虑分区(按时间或范围)",        "4. 定期执行 ANALYZE TABLE 更新统计信息",        "5. 检查并优化JOIN查询,确保有索引"    ]    print("基础优化建议:")    for suggestion in suggestions:        print(f"  {suggestion}")if __name__ == '__main__':    # 分析慢查询    analyze_slow_queries(        host='localhost',        user='root',        password='your_password',  # 建议使用环境变量        top_n=5    )    # 提供优化建议    basic_optimization_suggestions()

脚本18:Redis数据备份与内存分析

应用场景:定时备份Redis的RDB或AOF文件到安全位置,同时分析实例中的大Key(可能引起阻塞)、热Key及过期键模式,为容量规划与性能优化提供依据。

# redis_manager.pyimport redisdef backup_and_analyze(conn_params):    r = redis.Redis(**conn_params)    # 执行 BGSAVE    r.bgsave()    # 分析大Key(采样)    for key in r.scan_iter(count=100):        size = r.memory_usage(key)        if size > 1024 * 1024: # 大于1MB            print(f"大Key: {key}, 大小: {size/1024/1024:.2f} MB")

脚本19:Docker容器与镜像垃圾清理

应用场景:在CI/CD或开发测试环境中,自动清理已停止的容器、未被任何容器引用的悬空镜像(dangling images)、以及未被使用的数据卷(volume),有效释放磁盘空间。

#!/usr/bin/env python3"""Docker容器与镜像垃圾清理功能:自动清理停止的容器、无用镜像、废弃Volume,释放磁盘空间"""import subprocessimport jsonimport osfrom datetime import datetime, timedeltaimport refrom typing import List, Dict, Tupleimport loggingfrom logging_config import setup_loggerimport shutillogger = setup_logger('docker_cleanup')class DockerCleanupManager:    def __init__(self, dry_run=False, min_age_days=7, keep_last_images=5):        """初始化Docker清理管理器        Args:            dry_run: 试运行模式,只显示将要清理的内容,不实际执行            min_age_days: 最小保留天数,早于此时间的镜像/容器才考虑清理            keep_last_images: 对于每个镜像标签,保留最近几个版本        """        self.dry_run = dry_run        self.min_age_days = min_age_days        self.keep_last_images = keep_last_images        self.cutoff_date = datetime.now() - timedelta(days=min_age_days)        # 验证docker是否可用        if not self._check_docker_available():            raise RuntimeError("Docker不可用,请确保docker服务正在运行")    def _check_docker_available(self):        """检查Docker是否可用"""        try:            result = subprocess.run(['docker', '--version'],                                   capture_output=True, text=True)            return result.returncode == 0        except FileNotFoundError:            return False    def _run_docker_command(self, command: List[str], timeout=30) -> Dict:        """执行Docker命令并返回JSON格式结果"""        try:            cmd = ['docker'] + command + ['--format', '{{json .}}']            result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)            if result.returncode != 0:                logger.error(f"Docker命令失败: {' '.join(cmd)}")                logger.error(f"错误输出: {result.stderr}")                return None            # 解析JSON输出(每行一个JSON对象)            items = []            for line in result.stdout.strip().split('\n'):                if line:                    try:                        items.append(json.loads(line))                    except json.JSONDecodeError:                        logger.warning(f"无法解析JSON: {line}")            return items        except subprocess.TimeoutExpired:            logger.error(f"Docker命令超时: {' '.join(command)}")            return None        except Exception as e:            logger.error(f"执行Docker命令异常: {e}")            return None    def get_stopped_containers(self):        """获取所有已停止的容器"""        containers = self._run_docker_command(['ps', '-a', '-f', 'status=exited'])        stopped_containers = []        if containers:            for container in containers:                # 排除某些不应自动清理的容器(如数据备份容器)                # 可以根据容器名称或标签进行过滤                name = container.get('Names', '')                labels = container.get('Labels', '')                # 排除保护标签的容器                if 'protect=true' in labels:                    continue                # 排除特定名称模式的容器                if any(pattern in name for pattern in ['backup', 'restore', 'critical']):                    continue                stopped_containers.append(container)        return stopped_containers    def get_dangling_images(self):        """获取悬空镜像(未被任何容器引用的中间层镜像)"""        images = self._run_docker_command(['images', '-f', 'dangling=true'])        return images if images else []    def get_unused_images(self):        """获取未使用的镜像(长时间未被使用的镜像)"""        all_images = self._run_docker_command(['images', '--no-trunc'])        if not all_images:            return []        # 获取所有正在运行的容器使用的镜像        running_containers = self._run_docker_command(['ps', '-q'])        used_image_ids = set()        if running_containers:            for container_id in [c.get('ID', '') for c in running_containers]:                # 获取容器使用的镜像ID                inspect_cmd = ['docker', 'inspect', '--format', '{{.Image}}', container_id]                result = subprocess.run(inspect_cmd, capture_output=True, text=True)                if result.returncode == 0:                    used_image_ids.add(result.stdout.strip())        # 找出未被使用的镜像        unused_images = []        for image in all_images:            image_id = image.get('ID', '')            # 排除基础镜像(如alpine、ubuntu等)            repository = image.get('Repository', '')            if repository in ['<none>', '']:                continue            # 排除正在使用的镜像            if image_id in used_image_ids:                continue            # 分析镜像标签,确定是否应该保留            tag = image.get('Tag', '')            created_str = image.get('CreatedAt', '')            # 尝试解析创建时间            created_date = self._parse_docker_date(created_str)            if created_date and created_date < self.cutoff_date:                unused_images.append(image)        return unused_images    def get_dangling_volumes(self):        """获取悬空Volume(未被任何容器引用的数据卷)"""        volumes = self._run_docker_command(['volume', 'ls', '-f', 'dangling=true'])        return volumes if volumes else []    def get_unused_volumes(self):        """获取未使用的Volume(长时间未挂载的数据卷)"""        all_volumes = self._run_docker_command(['volume', 'ls'])        if not all_volumes:            return []        # 获取所有正在使用的Volume        running_containers = self._run_docker_command(['ps', '-q'])        used_volumes = set()        if running_containers:            for container_id in [c.get('ID', '') for c in running_containers]:                # 获取容器挂载的Volume                inspect_cmd = ['docker', 'inspect', '--format', '{{range .Mounts}}{{.Name}} {{end}}', container_id]                result = subprocess.run(inspect_cmd, capture_output=True, text=True)                if result.returncode == 0:                    volumes = result.stdout.strip().split()                    used_volumes.update(volumes)        # 找出未被使用的Volume        unused_volumes = []        for volume in all_volumes:            volume_name = volume.get('Name', '')            # 排除系统创建的Volume            if volume_name.startswith('docker_') or volume_name.startswith('com.'):                # 可以进一步检查是否正在使用                pass            # 排除正在使用的Volume            if volume_name in used_volumes:                continue            # 检查Volume的最后使用时间(需要复杂的inspect)            unused_volumes.append(volume)        return unused_volumes    def _parse_docker_date(self, date_str):        """解析Docker日期字符串"""        if not date_str:            return None        try:            # Docker日期格式示例: "2024-01-15 10:30:45 +0800 CST"            # 简化处理,只提取日期部分            date_part = date_str.split(' ')            return datetime.strptime(date_part, '%Y-%m-%d')        except:            return None    def cleanup_stopped_containers(self, containers=None):        """清理已停止的容器"""        if containers is None:            containers = self.get_stopped_containers()        if not containers:            logger.info("没有找到可清理的已停止容器")            return 0        cleaned_count = 0        for container in containers:            container_id = container.get('ID', '')            container_name = container.get('Names', '')            if self.dry_run:                logger.info(f"[试运行] 将清理已停止容器: {container_name} ({container_id[:12]})")                cleaned_count += 1            else:                try:                    result = subprocess.run(['docker', 'rm', container_id],                                           capture_output=True, text=True, timeout=30)                    if result.returncode == 0:                        logger.info(f"已清理容器: {container_name} ({container_id[:12]})")                        cleaned_count += 1                    else:                        logger.warning(f"清理容器失败 {container_id}: {result.stderr}")                except Exception as e:                    logger.error(f"清理容器异常 {container_id}: {e}")        return cleaned_count    def cleanup_dangling_images(self, images=None):        """清理悬空镜像"""        if images is None:            images = self.get_dangling_images()        if not images:            logger.info("没有找到可清理的悬空镜像")            return 0        cleaned_count = 0        for image in images:            image_id = image.get('ID', '')            if self.dry_run:                logger.info(f"[试运行] 将清理悬空镜像: {image_id}")                cleaned_count += 1            else:                try:                    result = subprocess.run(['docker', 'rmi', image_id],                                           capture_output=True, text=True, timeout=30)                    if result.returncode == 0:                        logger.info(f"已清理悬空镜像: {image_id[:12]}")                        cleaned_count += 1                    else:                        logger.warning(f"清理悬空镜像失败 {image_id}: {result.stderr}")                except Exception as e:                    logger.error(f"清理悬空镜像异常 {image_id}: {e}")        return cleaned_count    def cleanup_unused_images(self, images=None):        """清理未使用的镜像"""        if images is None:            images = self.get_unused_images()        if not images:            logger.info("没有找到可清理的未使用镜像")            return 0        # 按仓库分组,保留最近几个版本        images_by_repo = {}        for image in images:            repo = image.get('Repository', '')            tag = image.get('Tag', '')            created_str = image.get('CreatedAt', '')            if repo not in images_by_repo:                images_by_repo[repo] = []            created_date = self._parse_docker_date(created_str)            images_by_repo[repo].append({                'image': image,                'created': created_date,                'tag': tag            })        # 确定要删除的镜像        images_to_remove = []        for repo, image_list in images_by_repo.items():            # 按创建时间排序(最近的在前)            sorted_images = sorted(image_list,                                  key=lambda x: x['created'] if x['created'] else datetime.min,                                  reverse=True)            # 保留最近N个版本            to_keep = sorted_images[:self.keep_last_images]            to_remove = sorted_images[self.keep_last_images:]            images_to_remove.extend([item['image'] for item in to_remove])        cleaned_count = 0        for image in images_to_remove:            image_id = image.get('ID', '')            repo_tag = f"{image.get('Repository', '')}:{image.get('Tag', '')}"            if self.dry_run:                logger.info(f"[试运行] 将清理未使用镜像: {repo_tag} ({image_id[:12]})")                cleaned_count += 1            else:                try:                    result = subprocess.run(['docker', 'rmi', image_id],                                           capture_output=True, text=True, timeout=30)                    if result.returncode == 0:                        logger.info(f"已清理未使用镜像: {repo_tag} ({image_id[:12]})")                        cleaned_count += 1                    else:                        logger.warning(f"清理未使用镜像失败 {repo_tag}: {result.stderr}")                except Exception as e:                    logger.error(f"清理未使用镜像异常 {repo_tag}: {e}")        return cleaned_count    def cleanup_dangling_volumes(self, volumes=None):        """清理悬空Volume"""        if volumes is None:            volumes = self.get_dangling_volumes()        if not volumes:            logger.info("没有找到可清理的悬空Volume")            return 0        cleaned_count = 0        for volume in volumes:            volume_name = volume.get('Name', '')            if self.dry_run:                logger.info(f"[试运行] 将清理悬空Volume: {volume_name}")                cleaned_count += 1            else:                try:                    result = subprocess.run(['docker', 'volume', 'rm', volume_name],                                           capture_output=True, text=True, timeout=30)                    if result.returncode == 0:                        logger.info(f"已清理悬空Volume: {volume_name}")                        cleaned_count += 1                    else:                        logger.warning(f"清理悬空Volume失败 {volume_name}: {result.stderr}")                except Exception as e:                    logger.error(f"清理悬空Volume异常 {volume_name}: {e}")        return cleaned_count    def cleanup_unused_volumes(self, volumes=None):        """清理未使用的Volume"""        if volumes is None:            volumes = self.get_unused_volumes()        if not volumes:            logger.info("没有找到可清理的未使用Volume")            return 0        cleaned_count = 0        for volume in volumes:            volume_name = volume.get('Name', '')            # 检查Volume是否包含重要数据            # 这里可以添加更复杂的检查逻辑            if self.dry_run:                logger.info(f"[试运行] 将清理未使用Volume: {volume_name}")                cleaned_count += 1            else:                try:                    result = subprocess.run(['docker', 'volume', 'rm', volume_name],                                           capture_output=True, text=True, timeout=30)                    if result.returncode == 0:                        logger.info(f"已清理未使用Volume: {volume_name}")                        cleaned_count += 1                    else:                        logger.warning(f"清理未使用Volume失败 {volume_name}: {result.stderr}")                except Exception as e:                    logger.error(f"清理未使用Volume异常 {volume_name}: {e}")        return cleaned_count    def cleanup_build_cache(self):        """清理Docker构建缓存"""        if self.dry_run:            logger.info("[试运行] 将执行 docker builder prune")            return 1        try:            result = subprocess.run(['docker', 'builder', 'prune', '-f'],                                   capture_output=True, text=True, timeout=300)            if result.returncode == 0:                # 解析清理结果                output = result.stdout                if 'Total reclaimed space:' in output:                    space_line = [l for l in output.split('\n') if 'Total reclaimed space:' in l]                    logger.info(f"构建缓存清理完成: {space_line}")                else:                    logger.info("构建缓存清理完成")                return 1            else:                logger.warning(f"清理构建缓存失败: {result.stderr}")                return 0        except Exception as e:            logger.error(f"清理构建缓存异常: {e}")            return 0    def cleanup_system(self):        """执行完整的系统清理(谨慎使用)"""        if self.dry_run:            logger.info("[试运行] 将执行 docker system prune -a")            return 1        try:            result = subprocess.run(['docker', 'system', 'prune', '-a', '-f'],                                   capture_output=True, text=True, timeout=600)            if result.returncode == 0:                # 解析清理结果                output = result.stdout                if 'Total reclaimed space:' in output:                    space_line = [l for l in output.split('\n') if 'Total reclaimed space:' in l]                    logger.info(f"系统清理完成: {space_line}")                else:                    logger.info("系统清理完成")                return 1            else:                logger.warning(f"系统清理失败: {result.stderr}")                return 0        except Exception as e:            logger.error(f"系统清理异常: {e}")            return 0    def get_disk_usage(self):        """获取Docker磁盘使用情况"""        try:            # 使用docker system df命令            result = subprocess.run(['docker', 'system', 'df', '-v'],                                   capture_output=True, text=True, timeout=30)            if result.returncode == 0:                return result.stdout            else:                logger.warning(f"获取磁盘使用情况失败: {result.stderr}")                return None        except Exception as e:            logger.error(f"获取磁盘使用情况异常: {e}")            return None    def generate_cleanup_report(self, cleanup_stats):        """生成清理报告"""        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')        report = f"""=== Docker清理报告 ===生成时间: {timestamp}运行模式: {'试运行' if self.dry_run else '实际执行'}最小保留天数: {self.min_age_days}保留镜像版本数: {self.keep_last_images}清理统计:  已停止容器: {cleanup_stats.get('stopped_containers', 0)} 个  悬空镜像: {cleanup_stats.get('dangling_images', 0)} 个  未使用镜像: {cleanup_stats.get('unused_images', 0)} 个  悬空Volume: {cleanup_stats.get('dangling_volumes', 0)} 个  未使用Volume: {cleanup_stats.get('unused_volumes', 0)} 个  构建缓存: {'已清理' if cleanup_stats.get('build_cache', 0) > 0 else '未清理'}  系统清理: {'已执行' if cleanup_stats.get('system_cleanup', 0) > 0 else '未执行'}磁盘使用情况:"""        disk_usage = self.get_disk_usage()        if disk_usage:            report += disk_usage        else:            report += "  无法获取磁盘使用情况"        return report    def run_cleanup_pipeline(self, aggressive=False):        """运行清理流水线        Args:            aggressive: 是否进行激进清理(包括系统清理)        """        logger.info("开始Docker清理流水线")        cleanup_stats = {}        # 1. 清理已停止的容器        logger.info("步骤1: 清理已停止的容器")        stopped_containers = self.get_stopped_containers()        cleanup_stats['stopped_containers'] = self.cleanup_stopped_containers(stopped_containers)        # 2. 清理悬空镜像        logger.info("步骤2: 清理悬空镜像")        dangling_images = self.get_dangling_images()        cleanup_stats['dangling_images'] = self.cleanup_dangling_images(dangling_images)        # 3. 清理未使用的镜像        logger.info("步骤3: 清理未使用的镜像")        unused_images = self.get_unused_images()        cleanup_stats['unused_images'] = self.cleanup_unused_images(unused_images)        # 4. 清理悬空Volume        logger.info("步骤4: 清理悬空Volume")        dangling_volumes = self.get_dangling_volumes()        cleanup_stats['dangling_volumes'] = self.cleanup_dangling_volumes(dangling_volumes)        # 5. 清理未使用的Volume        logger.info("步骤5: 清理未使用的Volume")        unused_volumes = self.get_unused_volumes()        cleanup_stats['unused_volumes'] = self.cleanup_unused_volumes(unused_volumes)        # 6. 清理构建缓存        logger.info("步骤6: 清理构建缓存")        cleanup_stats['build_cache'] = self.cleanup_build_cache()        # 7. 系统清理(可选,激进模式)        if aggressive:            logger.info("步骤7: 执行系统清理(激进模式)")            cleanup_stats['system_cleanup'] = self.cleanup_system()        # 生成报告        report = self.generate_cleanup_report(cleanup_stats)        # 保存报告到文件        if not self.dry_run:            report_dir = '/var/log/docker_cleanup'            os.makedirs(report_dir, exist_ok=True)            report_file = os.path.join(report_dir, f"cleanup_report_{datetime.now().strftime('%Y%m%d')}.log")            with open(report_file, 'a') as f:                f.write(report)                f.write("\n" + "="*50 + "\n")        logger.info("清理流水线完成")        print(report)        return cleanup_stats# 使用示例if __name__ == '__main__':    # 创建清理管理器(试运行模式)    cleaner = DockerCleanupManager(        dry_run=True,        # 试运行,只显示不执行        min_age_days=7,      # 保留最近7天的镜像        keep_last_images=3   # 每个镜像保留最近3个版本    )    # 运行清理流水线    print("=== Docker垃圾清理试运行 ===")    print("此操作为试运行,不会实际删除任何内容")    print("=" * 50)    stats = cleaner.run_cleanup_pipeline(aggressive=False)    # 实际执行(取消注释以下代码)    # print("\n" + "="*50)    # print("确定要实际执行清理吗?")    # response = input("输入 'yes' 确认执行: ")    # if response.lower() == 'yes':    #     cleaner.dry_run = False    #     print("开始实际清理...")    #     stats = cleaner.run_cleanup_pipeline(aggressive=False)    #     print("清理完成!")    # else:    #     print("取消清理操作")

脚本20:多云/混合云成本优化与资源清理

应用场景:在AWS、阿里云、腾讯云等多云或混合云环境中,自动识别并标记长期处于“已停止”状态的云主机、未被挂载的云磁盘、空闲的负载均衡器等闲置资源,执行归档或删除操作以优化成本。

# cloud_cleaner.py# 伪代码,使用 boto3 (AWS SDK) 或 aliyun-python-sdkimport boto3def cleanup_aws_instances(days_old=30):    ec2 = boto3.resource('ec2')    for instance in ec2.instances.all():        if instance.state['Name'] == 'stopped':            launch_time = instance.launch_time            # 判断是否超过设定天数            if is_older_than(launch_time, days_old):                print(f"删除闲置实例: {instance.id}")                instance.terminate()
“无他,惟手熟尔”!有需要的用起来!
如果你觉得这篇文章有用,欢迎点赞、转发、收藏、留言、推荐

想高效学习Python?下面三本精选好书满足你的不同需求!

《流畅的Python(第2版)》——Python进阶必读!深入讲解高级特性与最佳实践,适合想精进的开发者。

《Python从新手到高手》:初学者首选,系统学习全栈技能

《Python数据分析:从零基础入门到案例实战》——数据科学利器!手把手教你用Python处理数据,实战案例学完就能用。

三本书均支持先用后付、运费险和7天无理由退货,放心购买!点击“购买”按钮,立即开启你的Python学习之旅吧!

点击下方,即可购书
------加入知识库与更多人一起学习------

https://ima.qq.com/wiki/?shareId=f2628818f0874da17b71ffa0e5e8408114e7dbad46f1745bbd1cc1365277631c

最新文章

随机文章

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-02-08 07:39:27 HTTP/2.0 GET : https://f.mffb.com.cn/a/465692.html
  2. 运行时间 : 0.194559s [ 吞吐率:5.14req/s ] 内存消耗:4,966.67kb 文件加载:140
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=ebde398583b9c349c8aa39182ba949d9
  1. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_static.php ( 4.90 KB )
  7. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  10. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  11. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  12. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  13. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  14. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  15. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  16. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  17. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  18. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  19. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  21. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  22. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/provider.php ( 0.19 KB )
  23. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  24. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  25. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  26. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/common.php ( 0.03 KB )
  27. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  28. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  29. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/app.php ( 0.95 KB )
  30. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cache.php ( 0.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/console.php ( 0.23 KB )
  32. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cookie.php ( 0.56 KB )
  33. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/database.php ( 2.48 KB )
  34. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  35. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/filesystem.php ( 0.61 KB )
  36. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/lang.php ( 0.91 KB )
  37. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/log.php ( 1.35 KB )
  38. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/middleware.php ( 0.19 KB )
  39. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/route.php ( 1.89 KB )
  40. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/session.php ( 0.57 KB )
  41. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/trace.php ( 0.34 KB )
  42. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/view.php ( 0.82 KB )
  43. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/event.php ( 0.25 KB )
  44. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  45. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/service.php ( 0.13 KB )
  46. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/AppService.php ( 0.26 KB )
  47. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  48. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  49. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  50. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  51. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  52. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/services.php ( 0.14 KB )
  53. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  54. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  55. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  56. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  57. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  58. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  59. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  60. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  61. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  62. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  63. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  64. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  65. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  66. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  67. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  68. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  69. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  70. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  71. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  72. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  73. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  74. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  75. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  76. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  77. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  78. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  79. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  80. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  81. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  82. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  83. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/Request.php ( 0.09 KB )
  84. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  85. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/middleware.php ( 0.25 KB )
  86. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  87. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  88. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  89. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  90. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  91. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  92. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  93. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  94. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  95. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  96. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  97. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  98. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  99. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/route/app.php ( 1.72 KB )
  100. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  101. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  102. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  103. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/controller/Index.php ( 4.81 KB )
  104. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/BaseController.php ( 2.05 KB )
  105. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  106. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  108. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  109. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  110. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  111. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  112. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  113. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  114. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  115. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  116. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  117. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  118. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  119. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  120. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  121. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  122. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  123. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  124. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  125. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  126. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  127. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  128. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  129. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  130. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  131. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  132. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  133. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  134. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  135. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  136. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  137. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  138. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  139. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/runtime/temp/067d451b9a0c665040f3f1bdd3293d68.php ( 11.98 KB )
  140. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000563s ] mysql:host=127.0.0.1;port=3306;dbname=f_mffb;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.000920s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000422s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.000427s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.000861s ]
  6. SELECT * FROM `set` [ RunTime:0.005226s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.000965s ]
  8. SELECT * FROM `article` WHERE `id` = 465692 LIMIT 1 [ RunTime:0.001515s ]
  9. UPDATE `article` SET `lasttime` = 1770507568 WHERE `id` = 465692 [ RunTime:0.001396s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 66 LIMIT 1 [ RunTime:0.000970s ]
  11. SELECT * FROM `article` WHERE `id` < 465692 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.029861s ]
  12. SELECT * FROM `article` WHERE `id` > 465692 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.005936s ]
  13. SELECT * FROM `article` WHERE `id` < 465692 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.005807s ]
  14. SELECT * FROM `article` WHERE `id` < 465692 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.012874s ]
  15. SELECT * FROM `article` WHERE `id` < 465692 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.034357s ]
0.196097s