关注+星标,每天学习Python新技能
在运维工作中,大量重复性任务占据了工程师60%-80%的工作时间:日志分析、批量操作、监控告警、资源清理等。这些任务虽然简单,但手工执行效率低且易出错。Python凭借其简洁的语法、丰富的标准库和第三方模块,成为运维自动化的首选语言。本文将分享10个经过生产环境验证的Python脚本,帮助运维工程师从重复劳动中解放出来。
# 检查Python版本python3 --version# 检查pip版本pip3 --version# 检查系统资源free -hdf -h# 升级pippip3 install --upgrade pip# 安装常用运维库pip3 install paramiko requests psutil schedule pymysql redis elasticsearch prometheus-client# 验证安装pip3 list | grep -E "paramiko|requests|psutil"# 生成SSH密钥对ssh-keygen -t rsa -b 4096 -f ~/.ssh/ops_rsa -N ""# 分发公钥到目标服务器(示例)ssh-copy-id -i ~/.ssh/ops_rsa.pub root@192.168.1.100说明:使用密钥认证替代密码登录,提高安全性并支持批量操作。建议为运维脚本单独创建密钥对,便于权限管理和审计。
# 配置文件:config.yml
servers:
  - host: 192.168.1.100
    port: 22
    user: root
    key_file: ~/.ssh/ops_rsa
  - host: 192.168.1.101
    port: 22
    user: root
    key_file: ~/.ssh/ops_rsa
mysql:
  host: 192.168.1.200
  port: 3306
  user: monitor
  password: your_password
  database: ops
redis:
  host: 192.168.1.201
  port: 6379
  password: your_redis_password
  db: 0
log:
  level: INFO
  file: /var/log/ops/automation.log
  max_size: 100  # MB
  backup_count: 10
参数说明:
# Parameter notes for config.yml:
#   servers     - list of target servers; enables batch operations
#   mysql/redis - database connection settings, used to store execution results and state
#   log         - logging configuration; rotation is recommended so the disk does not fill up

# logging_config.py
import logging
from logging.handlers import RotatingFileHandler


def setup_logger(log_file='/var/log/ops/automation.log', level=logging.INFO):
    """Return the shared 'ops_automation' logger writing to a rotating file.

    Args:
        log_file: Path of the log file.
        level: Logging level applied to the logger.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger('ops_automation')
    logger.setLevel(level)

    # getLogger() always returns the same object, so attach the handler only
    # once; otherwise every extra setup_logger() call duplicates each record.
    if not logger.handlers:
        # Rotating file handler
        handler = RotatingFileHandler(
            log_file,
            maxBytes=100 * 1024 * 1024,  # 100MB
            backupCount=10,
            encoding='utf-8',  # log messages contain non-ASCII text
        )
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


# Verification commands (run in a shell):
#   # test that paramiko is importable
#   python3 -c "import paramiko; print('paramiko OK')"
#   # test reading the config file
#   python3 -c "import yaml; print(yaml.safe_load(open('config.yml')))"
#   # verify batch SSH execution (example script 1)
#   python3 batch_ssh_executor.py "uptime"
#   # expected output
#   # [192.168.1.100] SUCCESS: 10:30:23 up 45 days, 2:15, 1 user, load average: 0.15, 0.10, 0.08
#   # [192.168.1.101] SUCCESS: 10:30:24 up 30 days, 5:20, 1 user, load average: 0.25, 0.20, 0.18


#!/usr/bin/env python3
# File: batch_ssh_executor.py
"""Batch SSH command executor.

Supports concurrent execution, result collection and error handling.
"""
import os
import paramiko
import yaml
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging_config import setup_logger

logger = setup_logger()


class SSHExecutor:
    """Run one shell command on every server listed in a YAML config file."""

    def __init__(self, config_file='config.yml'):
        with open(config_file) as f:
            self.config = yaml.safe_load(f)
        self.servers = self.config['servers']

    def execute_on_host(self, server, command, timeout=30):
        """Execute ``command`` on a single host.

        Args:
            server: Mapping with 'host', 'port', 'user' and 'key_file'.
            command: Shell command to run remotely.
            timeout: Channel timeout in seconds for the command.

        Returns:
            Dict with 'host', 'success', 'stdout', 'stderr' and 'exit_code'.
            Any failure is reported as success=False with exit_code -1.
        """
        host = server['host']
        client = paramiko.SSHClient()
        try:
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            # Key-based authentication; the config uses "~/..." paths, which
            # paramiko does not expand on its own.
            key = paramiko.RSAKey.from_private_key_file(
                os.path.expanduser(server['key_file'])
            )
            client.connect(
                hostname=host,
                port=server['port'],
                username=server['user'],
                pkey=key,
                timeout=10
            )

            stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
            # Drain the output before asking for the exit status: waiting for
            # the status first can hang when the command prints a lot.
            out_text = stdout.read().decode('utf-8', errors='ignore').strip()
            err_text = stderr.read().decode('utf-8', errors='ignore').strip()
            exit_code = stdout.channel.recv_exit_status()

            result = {
                'host': host,
                'success': exit_code == 0,
                'stdout': out_text,
                'stderr': err_text,
                'exit_code': exit_code
            }
            logger.info(f"[{host}] Command executed, exit_code={exit_code}")
            return result
        except Exception as e:
            logger.error(f"[{host}] Error: {str(e)}")
            return {
                'host': host,
                'success': False,
                'stdout': '',
                'stderr': str(e),
                'exit_code': -1
            }
        finally:
            # Close on every path so failed hosts do not leak connections.
            client.close()

    def execute_parallel(self, command, max_workers=10):
        """Run ``command`` on all servers concurrently and return the result dicts."""
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.execute_on_host, server, command): server
                for server in self.servers
            }
            for future in as_completed(futures):
                results.append(future.result())
        return results

    def print_results(self, results):
        """Print a per-host summary, sorted by host."""
        success_count = sum(1 for r in results if r['success'])
        print(f"\n执行完成: 成功 {success_count}/{len(results)}\n")
        for result in sorted(results, key=lambda x: x['host']):
            status = "SUCCESS" if result['success'] else "FAILED"
            print(f"[{result['host']}] {status}")
            if result['stdout']:
                print(f" 输出: {result['stdout']}")
            if result['stderr']:
                print(f" 错误: {result['stderr']}")
            print()


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("用法: python3 batch_ssh_executor.py '<command>'")
        sys.exit(1)
    command = sys.argv[1]
    executor = SSHExecutor()
    results = executor.execute_parallel(command)
    executor.print_results(results)


#!/usr/bin/env python3
# File: log_analyzer.py
"""Log analysis tool.

Features: error statistics, anomaly detection, automatic alerting.
"""
import re
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
import requests
from logging_config import setup_logger

logger = setup_logger()


class LogAnalyzer:
    """Collect request and error statistics from an Nginx access log."""

    def __init__(self, log_file):
        self.log_file = log_file
        self.error_patterns = {
            'http_5xx': r'HTTP/\d\.\d"\s5\d{2}',
            'exception': r'(Exception|Error|Fatal)',
            'timeout': r'(timeout|timed out)',
            'connection_refused': r'Connection refused',
            'out_of_memory': r'(OutOfMemory|OOM|Cannot allocate memory)'
        }

    def parse_nginx_log(self, line):
        """Parse one access-log line; return a dict of fields or None if it does not match."""
        pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\d{3}) (\d+) "(.*?)" "(.*?)"'
        match = re.match(pattern, line)
        if match:
            return {
                'ip': match.group(1),
                'time': match.group(2),
                'request': match.group(3),
                'status': int(match.group(4)),
                'size': int(match.group(5)),
                'referer': match.group(6),
                'user_agent': match.group(7)
            }
        return None

    def analyze(self, time_window=60):
        """Aggregate statistics over the last ``time_window`` minutes of the log.

        Returns:
            Dict with 'total_requests', 'error_count', 'status_codes',
            'top_ips' and 'slow_requests' (which holds the 5xx requests).
        """
        # Compare timezone-aware values: the log timestamp carries its own
        # UTC offset, which need not match this machine's local time.
        cutoff_time = datetime.now(timezone.utc) - timedelta(minutes=time_window)
        stats = {
            'total_requests': 0,
            'error_count': defaultdict(int),
            'status_codes': Counter(),
            'top_ips': Counter(),
            'slow_requests': []
        }
        # errors='replace': access logs can contain arbitrary request bytes.
        with open(self.log_file, 'r', errors='replace') as f:
            for line in f:
                entry = self.parse_nginx_log(line)
                if not entry:
                    continue

                # Time filter
                log_time = datetime.strptime(entry['time'], '%d/%b/%Y:%H:%M:%S %z')
                if log_time < cutoff_time:
                    continue

                stats['total_requests'] += 1
                stats['status_codes'][entry['status']] += 1
                stats['top_ips'][entry['ip']] += 1

                # Error detection
                for error_type, pattern in self.error_patterns.items():
                    if re.search(pattern, line):
                        stats['error_count'][error_type] += 1

                # Record 5xx errors
                if 500 <= entry['status'] < 600:
                    stats['slow_requests'].append({
                        'time': entry['time'],
                        'request': entry['request'],
                        'status': entry['status']
                    })
        return stats

    def check_alert_conditions(self, stats):
        """Return a list of {'level', 'message'} alerts derived from ``stats``."""
        alerts = []

        # 5xx error rate above 5%
        if stats['total_requests'] > 0:
            error_5xx = sum(
                count for code, count in stats['status_codes'].items()
                if 500 <= code < 600
            )
            error_rate = error_5xx / stats['total_requests']
            if error_rate > 0.05:
                alerts.append({
                    'level': 'critical',
                    'message': f'5xx错误率: {error_rate*100:.2f}% ({error_5xx}/{stats["total_requests"]})'
                })

        # OOM errors
        if stats['error_count']['out_of_memory'] > 0:
            alerts.append({
                'level': 'critical',
                'message': f'检测到OOM错误: {stats["error_count"]["out_of_memory"]}次'
            })

        # Connection timeouts
        if stats['error_count']['timeout'] > 100:
            alerts.append({
                'level': 'warning',
                'message': f'超时错误异常: {stats["error_count"]["timeout"]}次'
            })
        return alerts

    def send_alert(self, alerts, webhook_url):
        """POST the alerts as one text message to a chat webhook; no-op when empty."""
        if not alerts:
            return
        message = "【日志告警】\n" + "\n".join(
            f"[{a['level'].upper()}] {a['message']}" for a in alerts
        )
        payload = {
            "msgtype": "text",
            "text": {"content": message}
        }
        try:
            response = requests.post(webhook_url, json=payload, timeout=5)
            if response.status_code == 200:
                logger.info("告警发送成功")
            else:
                logger.error(f"告警发送失败: {response.status_code}")
        except Exception as e:
            logger.error(f"告警发送异常: {str(e)}")


if __name__ == '__main__':
    analyzer = LogAnalyzer('/var/log/nginx/access.log')
    stats = analyzer.analyze(time_window=5)
    print(f"总请求数: {stats['total_requests']}")
    print(f"状态码分布: {dict(stats['status_codes'])}")
    print(f"Top 10 IP: {stats['top_ips'].most_common(10)}")
    print(f"错误统计: {dict(stats['error_count'])}")
    alerts = analyzer.check_alert_conditions(stats)
    if alerts:
        print("\n触发告警:")
        for alert in alerts:
            print(f" [{alert['level']}] {alert['message']}")
        # Send the alerts (replace with the real webhook address)
        # analyzer.send_alert(alerts, 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx')


#!/usr/bin/env python3
# File: system_monitor.py
"""System resource monitor.

Monitors CPU, memory, disk and network; supports Prometheus integration.
"""
import psutil
import time
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from logging_config import setup_logger

logger = setup_logger()


class SystemMonitor:
    """Collect host metrics with psutil and push them to a Prometheus Pushgateway."""

    def __init__(self, pushgateway_url='localhost:9091', job_name='system_monitor'):
        self.pushgateway_url = pushgateway_url
        self.job_name = job_name
        self.registry = CollectorRegistry()

        # Metric definitions
        self.cpu_gauge = Gauge('system_cpu_percent', 'CPU使用率', registry=self.registry)
        self.memory_gauge = Gauge('system_memory_percent', '内存使用率', registry=self.registry)
        self.disk_gauge = Gauge('system_disk_percent', '磁盘使用率', ['mountpoint'], registry=self.registry)
        self.network_gauge = Gauge('system_network_bytes', '网络流量', ['interface', 'direction'], registry=self.registry)

    def collect_metrics(self):
        """Sample CPU, memory, disk and network; update the gauges and return a dict."""
        metrics = {}

        # CPU (blocks for the 1-second sampling interval)
        cpu_percent = psutil.cpu_percent(interval=1)
        metrics['cpu'] = cpu_percent
        self.cpu_gauge.set(cpu_percent)

        # Memory
        memory = psutil.virtual_memory()
        metrics['memory'] = {
            'percent': memory.percent,
            'total': memory.total,
            'available': memory.available,
            'used': memory.used
        }
        self.memory_gauge.set(memory.percent)

        # Disk
        metrics['disk'] = {}
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                metrics['disk'][partition.mountpoint] = {
                    'percent': usage.percent,
                    'total': usage.total,
                    'used': usage.used,
                    'free': usage.free
                }
                self.disk_gauge.labels(mountpoint=partition.mountpoint).set(usage.percent)
            except PermissionError:
                # Skip mount points this process may not inspect.
                continue

        # Network
        net_io = psutil.net_io_counters(pernic=True)
        metrics['network'] = {}
        for interface, stats in net_io.items():
            metrics['network'][interface] = {
                'bytes_sent': stats.bytes_sent,
                'bytes_recv': stats.bytes_recv
            }
            self.network_gauge.labels(interface=interface, direction='sent').set(stats.bytes_sent)
            self.network_gauge.labels(interface=interface, direction='recv').set(stats.bytes_recv)
        return metrics

    def check_thresholds(self, metrics):
        """Return alert strings for CPU > 80%, memory > 85% and disk > 90%."""
        alerts = []
        if metrics['cpu'] > 80:
            alerts.append(f"CPU使用率过高: {metrics['cpu']:.1f}%")
        if metrics['memory']['percent'] > 85:
            alerts.append(f"内存使用率过高: {metrics['memory']['percent']:.1f}%")
        for mount, stats in metrics['disk'].items():
            if stats['percent'] > 90:
                alerts.append(f"磁盘空间不足: {mount} ({stats['percent']:.1f}%)")
        return alerts

    def push_metrics(self):
        """Push the registry to the Pushgateway; failures are logged, not raised."""
        try:
            push_to_gateway(self.pushgateway_url, job=self.job_name, registry=self.registry)
            logger.info("指标推送成功")
        except Exception as e:
            logger.error(f"指标推送失败: {str(e)}")

    def run(self, interval=60):
        """Collect, check and push metrics every ``interval`` seconds until interrupted."""
        logger.info(f"开始监控,采集间隔: {interval}秒")
        while True:
            try:
                metrics = self.collect_metrics()
                alerts = self.check_thresholds(metrics)
                if alerts:
                    logger.warning("触发告警: " + "; ".join(alerts))
                self.push_metrics()
                time.sleep(interval)
            except KeyboardInterrupt:
                logger.info("监控停止")
                break
            except Exception as e:
                logger.error(f"监控异常: {str(e)}")
                time.sleep(interval)


if __name__ == '__main__':
    monitor = SystemMonitor()

    # Single collection
    metrics = monitor.collect_metrics()
    print(f"CPU: {metrics['cpu']:.1f}%")
    print(f"内存: {metrics['memory']['percent']:.1f}%")
    print("磁盘:")
    for mount, stats in metrics['disk'].items():
        print(f" {mount}: {stats['percent']:.1f}%")

    # Continuous monitoring (uncomment to enable)
    # monitor.run(interval=60)


#!/usr/bin/env python3
# File: mysql_slow_query_analyzer.py
"""MySQL slow query analysis.

Parses the slow query log and produces a report.
"""
import re
import pymysql
from collections import defaultdict
from logging_config import setup_logger

logger = setup_logger()


class SlowQueryAnalyzer:
    """Parse a MySQL slow query log and summarise the slowest statements."""

    def __init__(self, slow_log_file, db_config):
        self.slow_log_file = slow_log_file
        self.db_config = db_config
        self.queries = []

    def _store(self, entry):
        """Keep ``entry`` only if it is a real query record (has 'query_time')."""
        # Banner/header lines before the first "# Time:" line would otherwise
        # produce an entry without 'query_time' and break analyze().
        if 'query_time' in entry:
            self.queries.append(entry)

    def parse_slow_log(self):
        """Read the slow log and append one dict per query to ``self.queries``."""
        current_query = {}
        with open(self.slow_log_file, 'r', errors='replace') as f:
            for line in f:
                # "# Time:" line starts a new record
                if line.startswith('# Time:'):
                    self._store(current_query)
                    current_query = {'time': line.split(':', 1)[1].strip()}
                # "# User@Host:" line
                elif line.startswith('# User@Host:'):
                    match = re.search(r'(\w+)\[(\w+)\] @ (\S+)', line)
                    if match:
                        current_query['user'] = match.group(1)
                        current_query['host'] = match.group(3)
                # "# Query_time:" line
                elif line.startswith('# Query_time:'):
                    match = re.search(
                        r'Query_time: ([\d.]+)\s+Lock_time: ([\d.]+)\s+Rows_sent: (\d+)\s+Rows_examined: (\d+)',
                        line
                    )
                    if match:
                        current_query['query_time'] = float(match.group(1))
                        current_query['lock_time'] = float(match.group(2))
                        current_query['rows_sent'] = int(match.group(3))
                        current_query['rows_examined'] = int(match.group(4))
                # SQL statement text (may span several lines)
                elif not line.startswith('#') and line.strip():
                    current_query['sql'] = current_query.get('sql', '') + line.strip() + ' '
        self._store(current_query)
        logger.info(f"解析完成,共 {len(self.queries)} 条慢查询")

    def analyze(self):
        """Return totals, average/max query time, the top 10 queries and large scans."""
        stats = {
            'total': len(self.queries),
            'avg_query_time': 0,
            'max_query_time': 0,
            'top_queries': [],
            'table_scan': []
        }
        if not self.queries:
            return stats

        # Basic statistics
        total_time = sum(q['query_time'] for q in self.queries)
        stats['avg_query_time'] = total_time / len(self.queries)
        stats['max_query_time'] = max(q['query_time'] for q in self.queries)

        # Top 10 slowest queries
        sorted_queries = sorted(self.queries, key=lambda x: x['query_time'], reverse=True)
        stats['top_queries'] = sorted_queries[:10]

        # Full-table-scan heuristic (rows_examined > 10000)
        stats['table_scan'] = [
            q for q in self.queries
            if q.get('rows_examined', 0) > 10000
        ]
        return stats

    def get_explain_plan(self, sql):
        """Run EXPLAIN on ``sql`` and return the rows, or None on failure."""
        # NOTE(review): ``sql`` is interpolated verbatim; only pass statements
        # taken from a trusted slow log, never user-supplied text.
        conn = None
        try:
            conn = pymysql.connect(**self.db_config)
            cursor = conn.cursor()
            cursor.execute(f"EXPLAIN {sql}")
            result = cursor.fetchall()
            cursor.close()
            return result
        except Exception as e:
            logger.error(f"EXPLAIN失败: {str(e)}")
            return None
        finally:
            # Release the connection even when EXPLAIN fails.
            if conn is not None:
                conn.close()

    def generate_report(self, stats):
        """Format ``stats`` (from analyze()) as a plain-text report."""
        report = []
        report.append("=" * 80)
        report.append("MySQL慢查询分析报告")
        report.append("=" * 80)
        report.append(f"总慢查询数: {stats['total']}")
        report.append(f"平均查询时间: {stats['avg_query_time']:.2f}秒")
        report.append(f"最大查询时间: {stats['max_query_time']:.2f}秒")
        report.append("")
        report.append("Top 10耗时查询:")
        for i, query in enumerate(stats['top_queries'], 1):
            report.append(f"\n{i}. 查询时间: {query['query_time']:.2f}秒")
            report.append(f" 扫描行数: {query.get('rows_examined', 0)}")
            report.append(f" SQL: {query.get('sql', '')[:200]}")
        if stats['table_scan']:
            report.append(f"\n发现 {len(stats['table_scan'])} 个全表扫描查询")
            for query in stats['table_scan'][:5]:
                report.append(f" - {query.get('sql', '')[:100]}")
        return "\n".join(report)


if __name__ == '__main__':
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'your_password',
        'database': 'test'
    }
    analyzer = SlowQueryAnalyzer('/var/lib/mysql/slow.log', db_config)
    analyzer.parse_slow_log()
    stats = analyzer.analyze()
    print(analyzer.generate_report(stats))


#!/usr/bin/env python3
# File: file_sync.py
"""File synchronisation tool.

Uploads files over SFTP and skips files whose MD5 already matches.
"""
import os
import posixpath
import shlex
import hashlib
import paramiko
from pathlib import Path
from logging_config import setup_logger

logger = setup_logger()


class FileSync:
    """Upload local files or directory trees to one remote host over SFTP."""

    def __init__(self, ssh_config):
        self.ssh_config = ssh_config
        self.client = None
        self.sftp = None

    def connect(self):
        """Open the SSH and SFTP sessions; re-raises any connection error."""
        try:
            self.client = paramiko.SSHClient()
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # "~" in the configured key path is not expanded by paramiko.
            key = paramiko.RSAKey.from_private_key_file(
                os.path.expanduser(self.ssh_config['key_file'])
            )
            self.client.connect(
                hostname=self.ssh_config['host'],
                port=self.ssh_config['port'],
                username=self.ssh_config['user'],
                pkey=key
            )
            self.sftp = self.client.open_sftp()
            logger.info(f"连接成功: {self.ssh_config['host']}")
        except Exception as e:
            logger.error(f"连接失败: {str(e)}")
            raise

    def disconnect(self):
        """Close the SFTP and SSH sessions if they were opened."""
        if self.sftp:
            self.sftp.close()
        if self.client:
            self.client.close()

    def calculate_md5(self, file_path):
        """Return the hex MD5 digest of a local file, read in 4 KiB chunks."""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def remote_file_exists(self, remote_path):
        """Return True if ``remote_path`` can be stat'ed on the remote host."""
        try:
            self.sftp.stat(remote_path)
            return True
        except FileNotFoundError:
            return False

    def sync_file(self, local_path, remote_path, check_md5=True):
        """Upload one file unless the remote copy already has the same MD5.

        Returns:
            True if the file was uploaded; False if it was skipped as
            unchanged or if the sync failed (failures are logged).
        """
        try:
            # Make sure the remote directory exists
            remote_dir = os.path.dirname(remote_path)
            try:
                self.sftp.stat(remote_dir)
            except FileNotFoundError:
                self._create_remote_dir(remote_dir)

            # MD5 comparison
            need_upload = True
            if check_md5 and self.remote_file_exists(remote_path):
                local_md5 = self.calculate_md5(local_path)
                # The remote digest needs a remote command; quote the path so
                # spaces or shell metacharacters in file names are harmless.
                stdin, stdout, stderr = self.client.exec_command(
                    f"md5sum {shlex.quote(remote_path)}"
                )
                fields = stdout.read().decode().split()
                # No output (e.g. md5sum unavailable) means "unknown": upload.
                remote_md5 = fields[0] if fields else None
                if local_md5 == remote_md5:
                    logger.info(f"文件未变化,跳过: {local_path}")
                    need_upload = False

            if need_upload:
                self.sftp.put(local_path, remote_path)
                logger.info(f"上传成功: {local_path} -> {remote_path}")
                return True
            return False
        except Exception as e:
            logger.error(f"同步失败 {local_path}: {str(e)}")
            return False

    def _create_remote_dir(self, remote_dir):
        """Create ``remote_dir`` and any missing parents on the remote host."""
        dirs = []
        # Stop at '' as well as '/': dirname() of a relative path ends at ''
        # and would otherwise loop forever.
        while remote_dir not in ('/', ''):
            dirs.append(remote_dir)
            remote_dir = os.path.dirname(remote_dir)
        for dir_path in reversed(dirs):
            try:
                self.sftp.stat(dir_path)
            except FileNotFoundError:
                self.sftp.mkdir(dir_path)
                logger.info(f"创建目录: {dir_path}")

    def sync_directory(self, local_dir, remote_dir, exclude_patterns=None):
        """Upload every file under ``local_dir`` to ``remote_dir``.

        Args:
            local_dir: Local root directory.
            remote_dir: Remote root directory.
            exclude_patterns: Substrings; any file or directory whose name
                contains one of them is skipped.
        """
        exclude_patterns = exclude_patterns or []
        synced_count = 0
        skipped_count = 0

        def is_excluded(name):
            return any(pattern in name for pattern in exclude_patterns)

        for root, dirs, files in os.walk(local_dir):
            # Prune excluded directories (e.g. '.git', '__pycache__') in place
            # so os.walk does not descend into them.
            dirs[:] = [d for d in dirs if not is_excluded(d)]

            # Relative path; normpath drops the './' of the top-level directory
            rel_path = os.path.relpath(root, local_dir).replace('\\', '/')
            remote_root = posixpath.normpath(posixpath.join(remote_dir, rel_path))

            for file in files:
                # Exclusion rules
                if is_excluded(file):
                    continue
                local_file = os.path.join(root, file)
                remote_file = posixpath.join(remote_root, file)
                if self.sync_file(local_file, remote_file):
                    synced_count += 1
                else:
                    skipped_count += 1
        logger.info(f"同步完成: 上传{synced_count}个文件,跳过{skipped_count}个")


if __name__ == '__main__':
    ssh_config = {
        'host': '192.168.1.100',
        'port': 22,
        'user': 'root',
        'key_file': '~/.ssh/ops_rsa'
    }
    sync = FileSync(ssh_config)
    sync.connect()

    # Sync a single file
    # sync.sync_file('/local/config.yml', '/remote/config.yml')

    # Sync a directory
    sync.sync_directory(
        '/local/app',
        '/remote/app',
        exclude_patterns=['.git', '.pyc', '__pycache__']
    )
    sync.disconnect()

# Scenario: SSL certificates for 100+ domains are managed; certificates that
# are about to expire must be detected and alerted on 30 days in advance.
实现代码:
#!/usr/bin/env python3
# File: ssl_cert_checker.py
import ssl
import socket
from datetime import datetime, timedelta, timezone

try:
    import requests  # not used by the checker itself; kept for alert delivery
except ImportError:  # the certificate check works without it
    requests = None


class SSLCertChecker:
    """Check how many days remain before each domain's TLS certificate expires."""

    def __init__(self, domains, alert_days=30):
        """
        Args:
            domains: Iterable of host names to check.
            alert_days: Alert when fewer than this many days remain.
        """
        self.domains = domains
        self.alert_days = alert_days

    @staticmethod
    def _parse_not_after(not_after):
        """Convert a certificate 'notAfter' string to an aware UTC datetime."""
        # cert_time_to_seconds() interprets the value as GMT, which is what
        # certificates carry, independent of the local timezone.
        return datetime.fromtimestamp(ssl.cert_time_to_seconds(not_after), tz=timezone.utc)

    def check_cert_expiry(self, domain, port=443):
        """Fetch the certificate of ``domain`` and compute its remaining lifetime.

        Returns:
            On success a dict with 'domain', 'expire_date' (naive datetime in
            UTC), 'days_left' and 'issuer'; on any failure a dict with
            'domain' and 'error'.
        """
        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()

            # Compare UTC with UTC: the expiry time is GMT, so measuring it
            # against local wall-clock time would be off by the UTC offset.
            expires_utc = self._parse_not_after(cert['notAfter'])
            days_left = (expires_utc - datetime.now(timezone.utc)).days
            return {
                'domain': domain,
                'expire_date': expires_utc.replace(tzinfo=None),
                'days_left': days_left,
                'issuer': dict(x[0] for x in cert['issuer'])
            }
        except Exception as e:
            return {
                'domain': domain,
                'error': str(e)
            }

    def check_all(self):
        """Check every domain.

        Returns:
            Tuple ``(results, alerts)``: one result dict per domain, and an
            alert string for each certificate with fewer than ``alert_days``
            days left. Failed checks appear in ``results`` only.
        """
        results = []
        alerts = []
        for domain in self.domains:
            result = self.check_cert_expiry(domain)
            results.append(result)
            if 'days_left' in result and result['days_left'] < self.alert_days:
                alerts.append(f"{domain} 证书将在 {result['days_left']} 天后过期")
        return results, alerts


# Usage example (guarded so importing the module performs no network I/O)
if __name__ == '__main__':
    domains = ['example.com', 'api.example.com', 'www.example.com']
    checker = SSLCertChecker(domains)
    results, alerts = checker.check_all()
    for result in results:
        if 'days_left' in result:
            print(f"{result['domain']}: 剩余 {result['days_left']} 天")
        else:
            print(f"{result['domain']}: 检查失败 - {result['error']}")
    if alerts:
        print("\n告警:")
        for alert in alerts:
            print(f" - {alert}")

# Sample output:
example.com: 剩余 85 天api.example.com: 剩余 12 天www.example.com: 剩余 45 天告警: - api.example.com 证书将在 12 天后过期场景描述:定期清理停止超过7天的容器、未使用的镜像和volume,释放磁盘空间。
实现代码:
#!/usr/bin/env python3
# File: docker_cleanup.py
import re
import subprocess
import json
from datetime import datetime, timedelta, timezone


class DockerCleaner:
    """Remove long-stopped containers and prune unused images and volumes."""

    def __init__(self, dry_run=True):
        """
        Args:
            dry_run: When True, only print the commands that would be run.
        """
        self.dry_run = dry_run

    @staticmethod
    def _parse_docker_time(value):
        """Parse a Docker RFC 3339 timestamp into an aware datetime.

        Fractional seconds (Docker emits nanoseconds) are dropped; a trailing
        'Z' or a missing offset is treated as UTC.

        Raises:
            ValueError: If ``value`` is not in the expected format.
        """
        match = re.match(
            r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.\d+)?(Z|[+-]\d{2}:\d{2})?$',
            value
        )
        if not match:
            raise ValueError(f"unrecognised Docker timestamp: {value!r}")
        base, offset = match.groups()
        if offset in (None, 'Z'):
            offset = '+00:00'
        return datetime.fromisoformat(base + offset)

    def get_stopped_containers(self, days=7):
        """Return containers that exited more than ``days`` days ago.

        Returns:
            List of dicts with 'id', 'name' and 'finished_at' (aware datetime).
        """
        # Docker reports FinishedAt in UTC, so the cutoff must be UTC too.
        cutoff_time = datetime.now(timezone.utc) - timedelta(days=days)
        # Argument lists instead of shell strings: no quoting pitfalls.
        cmd = ['docker', 'ps', '-a', '--format', '{{json .}}']
        result = subprocess.run(cmd, capture_output=True, text=True)
        stopped_containers = []
        for line in result.stdout.strip().split('\n'):
            if not line:
                continue
            container = json.loads(line)
            if container['State'] != 'exited':
                continue

            # Fetch the container details
            inspect_cmd = ['docker', 'inspect', container['ID']]
            inspect_result = subprocess.run(inspect_cmd, capture_output=True, text=True)
            if inspect_result.returncode != 0:
                # The container disappeared between "ps" and "inspect".
                continue
            detail = json.loads(inspect_result.stdout)[0]
            finished_at = self._parse_docker_time(detail['State']['FinishedAt'])
            if finished_at < cutoff_time:
                stopped_containers.append({
                    'id': container['ID'],
                    'name': container['Names'],
                    'finished_at': finished_at
                })
        return stopped_containers

    def remove_containers(self, containers):
        """Run ``docker rm`` for each container dict (or print it in dry-run mode)."""
        for container in containers:
            cmd = ['docker', 'rm', container['id']]
            if self.dry_run:
                print(f"[DRY RUN] {' '.join(cmd)}")
            else:
                result = subprocess.run(cmd)
                # Report success only when docker actually removed it.
                if result.returncode == 0:
                    print(f"已删除容器: {container['name']}")
                else:
                    print(f"删除容器失败: {container['name']}")

    def prune_images(self):
        """Remove all unused images (``docker image prune -a -f``)."""
        cmd = ['docker', 'image', 'prune', '-a', '-f']
        if self.dry_run:
            print(f"[DRY RUN] {' '.join(cmd)}")
        else:
            result = subprocess.run(cmd, capture_output=True, text=True)
            print(result.stdout)

    def prune_volumes(self):
        """Remove all unused volumes (``docker volume prune -f``)."""
        cmd = ['docker', 'volume', 'prune', '-f']
        if self.dry_run:
            print(f"[DRY RUN] {' '.join(cmd)}")
        else:
            result = subprocess.run(cmd, capture_output=True, text=True)
            print(result.stdout)

    def cleanup(self, container_days=7):
        """Run the full cleanup: old containers, then images, then volumes."""
        print(f"开始清理(DRY RUN: {self.dry_run})")

        # Clean up containers
        containers = self.get_stopped_containers(container_days)
        print(f"\n发现 {len(containers)} 个停止超过{container_days}天的容器")
        self.remove_containers(containers)

        # Clean up images
        print("\n清理未使用的镜像...")
        self.prune_images()

        # Clean up volumes
        print("\n清理未使用的volume...")
        self.prune_volumes()
#
使用示例cleaner = DockerCleaner(dry_run=False)cleaner.cleanup(container_days=7)场景描述:每天凌晨2点自动备份MySQL数据库,保留最近30天的备份,自动清理过期文件。
实现步骤:
#!/usr/bin/env python3
# File: mysql_backup.py
import os
import subprocess
from datetime import datetime, timedelta
import gzip
import shutil


class MySQLBackup:
    """Dump MySQL databases to gzip files and delete backups past retention."""

    def __init__(self, config):
        """
        Args:
            config: Dict with 'host', 'user', 'password', 'databases' (list),
                'backup_dir' and optional 'retention_days' (default 30).
        """
        self.host = config['host']
        self.user = config['user']
        self.password = config['password']
        self.databases = config['databases']
        self.backup_dir = config['backup_dir']
        self.retention_days = config.get('retention_days', 30)

    def backup_database(self, database):
        """Dump and gzip a single database.

        Returns:
            Path of the ``.sql.gz`` file, or None if the backup failed. No
            partial files are left behind on failure.
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        os.makedirs(self.backup_dir, exist_ok=True)
        backup_file = f"{self.backup_dir}/{database}_{timestamp}.sql"
        compressed_file = f"{backup_file}.gz"

        # mysqldump command
        # NOTE(review): --password on the command line is visible in the
        # process list; consider a client option file instead.
        cmd = [
            'mysqldump',
            f'--host={self.host}',
            f'--user={self.user}',
            f'--password={self.password}',
            '--single-transaction',
            '--routines',
            '--triggers',
            '--events',
            database
        ]
        try:
            with open(backup_file, 'w') as f:
                subprocess.run(cmd, stdout=f, check=True)

            # Compress
            with open(backup_file, 'rb') as f_in:
                with gzip.open(compressed_file, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            print(f"备份成功: {database} -> {backup_file}.gz")
            return compressed_file
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError also covers a missing mysqldump binary or a full disk.
            print(f"备份失败: {database} - {str(e)}")
            if os.path.exists(compressed_file):
                os.remove(compressed_file)
            return None
        finally:
            # The uncompressed dump is never kept: on success it has been
            # gzipped, on failure it is incomplete.
            if os.path.exists(backup_file):
                os.remove(backup_file)

    def cleanup_old_backups(self):
        """Delete ``*.sql.gz`` files in the backup dir older than the retention period."""
        cutoff_time = datetime.now() - timedelta(days=self.retention_days)
        for filename in os.listdir(self.backup_dir):
            if not filename.endswith('.sql.gz'):
                continue
            file_path = os.path.join(self.backup_dir, filename)
            file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
            if file_time < cutoff_time:
                os.remove(file_path)
                print(f"删除过期备份: {filename}")

    def run(self):
        """Back up every configured database, then prune expired backups."""
        print(f"开始备份,时间: {datetime.now()}")
        outcomes = [self.backup_database(database) for database in self.databases]
        if all(outcomes):
            self.cleanup_old_backups()
        else:
            # Never prune when a new backup is missing: the old files may be
            # the only usable copies left.
            print("存在失败的备份,跳过过期备份清理")
        print("备份完成")


if __name__ == '__main__':
    # Configuration
    config = {
        'host': 'localhost',
        'user': 'backup',
        'password': 'your_password',
        'databases': ['app_db', 'user_db'],
        'backup_dir': '/data/mysql_backups',
        'retention_days': 30
    }
    backup = MySQLBackup(config)
    backup.run()

# Edit the crontab:
#   crontab -e
# Add the scheduled job (runs every day at 02:00):
#   0 2 * * * /usr/bin/python3 /opt/scripts/mysql_backup.py >> /var/log/mysql_backup.log 2>&1
#
查看备份文件ls -lh /data/mysql_backups/# 测试恢复gunzip < app_db_20250115_020001.sql.gz | mysql -u root -p app_db_test# 根据CPU核心数动态调整import osmax_workers = min(32, (os.cpu_count() or1) * 4)from dbutils.pooled_db import PooledDBimport pymysqlpool = PooledDB( creator=pymysql, maxconnections=10, host='localhost', user='root', password='password')# 批量插入cursor.executemany("INSERT INTO logs (message, level) VALUES (%s, %s)", [(msg, level) for msg, level in log_entries])import osfrom dotenv import load_dotenvload_dotenv()DB_PASSWORD = os.getenv('DB_PASSWORD')chmod 600 ~/.ssh/ops_rsaimport shlex# 安全的命令参数处理safe_command = shlex.quote(user_input)from tenacity import retry, stop_after_attempt, wait_exponential@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))defapi_call(): response = requests.get('https://api.example.com') response.raise_for_status()return response.json()import shutilshutil.copy2('/etc/nginx/nginx.conf', '/etc/nginx/nginx.conf.backup')# 查看脚本执行日志tail -f /var/log/ops/automation.log# 查看crontab执行记录grep CRON /var/log/syslog | tail -20# 查看Python异常堆栈grep -A 20 "Traceback" /var/log/ops/automation.log问题一:SSH连接超时
# 测试SSH连接ssh -vvv -i ~/.ssh/ops_rsa root@192.168.1.100# 检查防火墙sudo iptables -L -n | grep 22解决方案:
systemctl status sshdfirewall-cmd --list-allping 192.168.1.100问题二:内存占用持续增长
# 监控Python进程内存ps aux | grep python | sort -k4 -nr | head -5# 使用memory_profiler分析pip3 install memory-profilerpython3 -m memory_profiler script.py解决方案:
问题三:脚本执行缓慢
import cProfilecProfile.run('main()', sort='cumulative')# 开启详细日志import logginglogging.basicConfig(level=logging.DEBUG)# paramiko开启调试paramiko.util.log_to_file('/tmp/paramiko.log', level=logging.DEBUG)# requests显示HTTP请求详情import http.clienthttp.client.HTTPConnection.debuglevel = 1# 脚本执行时间监控import timefrom functools import wrapsdeftiming_decorator(func): @wraps(func)defwrapper(*args, **kwargs): start = time.time() result = func(*args, **kwargs) duration = time.time() - start logger.info(f"{func.__name__} 执行耗时: {duration:.2f}秒")return resultreturn wrapper@timing_decoratordefbatch_operation():# 批量操作逻辑pass# 监控脚本资源占用top -p $(pgrep -f "python.*automation")# 监控网络流量iftop -i eth0# 监控磁盘IOiostat -x 1# Prometheus告警规则示例groups:-name:python_automation_alertsinterval:30srules:-alert:ScriptExecutionTimeoutexpr:script_duration_seconds>600for:5mlabels:severity:warningannotations:summary:"脚本执行超时: {{ $labels.script_name }}"description:"执行时间 {{ $value }}秒"-alert:HighErrorRateexpr:rate(script_errors_total[5m])>0.05for:5mlabels:severity:criticalannotations:summary:"脚本错误率过高"description:"错误率 {{ $value | humanizePercentage }}"#!/bin/bash# 脚本文件备份脚本BACKUP_DIR="/data/backups/scripts"SOURCE_DIR="/opt/ops_scripts"DATE=$(date +%Y%m%d)# 创建备份目录mkdir -p $BACKUP_DIR# 打包备份tar -czf $BACKUP_DIR/scripts_$DATE.tar.gz \ -C $SOURCE_DIR \ --exclude='*.pyc' \ --exclude='__pycache__' \ --exclude='.git' \ .# 保留最近30天备份find $BACKUP_DIR -name "scripts_*.tar.gz" -mtime +30 -deleteecho"备份完成: $BACKUP_DIR/scripts_$DATE.tar.gz"# 停止crontab任务crontab -r# 解压备份文件tar -xzf /data/backups/scripts/scripts_20250115.tar.gz -C /opt/ops_scripts_restore# 检查文件数量diff -r /opt/ops_scripts /opt/ops_scripts_restore# 测试脚本语法python3 -m py_compile /opt/ops_scripts_restore/*.py# 恢复crontabcrontab /data/backups/crontab_backup.txt# Python环境管理python3 -m venv /opt/venv # 创建虚拟环境source /opt/venv/bin/activate # 激活虚拟环境pip3 freeze > requirements.txt # 导出依赖pip3 install -r requirements.txt # 安装依赖# 常用运维命令python3 batch_ssh_executor.py "df -h"# 批量检查磁盘python3 
log_analyzer.py # 分析日志python3 system_monitor.py # 系统监控python3 mysql_backup.py # 数据库备份# 调试和性能分析python3 -m pdb script.py # 调试脚本python3 -m cProfile -o profile.stats script.py # 性能分析python3 -m trace --count script.py # 代码覆盖率# 代码质量检查pylint script.py # 代码规范检查black script.py # 代码格式化mypy script.py # 类型检查paramiko.SSHClient参数:
timeout:连接超时时间(秒),默认无超时,建议设置10-30秒banner_timeout:SSH banner读取超时,默认15秒auth_timeout:认证超时,默认15秒allow_agent:是否使用SSH agent,默认Truelook_for_keys:是否搜索~/.ssh/目录下的密钥,默认Truelogging.RotatingFileHandler参数:
maxBytes:单个日志文件最大字节数,建议100MBbackupCount:保留的日志文件数量,建议10-30个encoding:日志文件编码,建议utf-8delay:延迟创建文件直到第一次写入,默认FalseThreadPoolExecutor参数:
max_workers:最大线程数,建议为CPU核心数的2-4倍thread_name_prefix:线程名称前缀,便于调试initializer:线程启动时执行的初始化函数
▲点击关注-免费领取
推荐阅读
Python:AI大模型时代的"瑞士军刀"——从开发到部署的全栈利器
点击 阅读原文