常用的Python运维自动化脚本
1.自动备份脚本
说明:将指定的源文件夹打包压缩成一个带有时间戳的 ZIP 备份文件,并保存到目标目录中。
"""Back up a directory into a timestamped ZIP archive."""
import datetime
import os
import shutil

source = '/data/k8s/'  # directory to back up
backup = '/data/k8s-backup/'  # directory that receives the archives


def make_backup(source_dir, backup_dir, now=None):
    """Pack *source_dir* into ``backup_<timestamp>.zip`` inside *backup_dir*.

    Args:
        source_dir: Directory whose content is archived.
        backup_dir: Directory that receives the archive; created if missing.
        now: Optional ``datetime`` used for the timestamp (defaults to the
            current local time).

    Returns:
        The path of the archive that was written.

    Raises:
        FileNotFoundError: If *source_dir* is not an existing directory.
        ValueError: If *backup_dir* is *source_dir* or lies inside it.
    """
    if not os.path.isdir(source_dir):
        raise FileNotFoundError(f"source directory not found: {source_dir}")

    # Writing the archive inside the tree being archived would make the
    # archive include (a partial copy of) itself.
    src_real = os.path.realpath(source_dir)
    dst_real = os.path.realpath(backup_dir)
    if os.path.join(dst_real, "").startswith(os.path.join(src_real, "")):
        raise ValueError("backup directory must not be inside the source directory")

    os.makedirs(backup_dir, exist_ok=True)
    stamp = (now or datetime.datetime.now()).strftime('%Y-%m-%d_%H-%M-%S')
    name = 'backup_' + stamp
    return shutil.make_archive(os.path.join(backup_dir, name), 'zip', source_dir)


if __name__ == "__main__":
    make_backup(source, backup)
    print("备份成功完成!")
2.监控系统性能脚本
说明:监控 CPU、内存、磁盘使用率等系统性能指标,任一指标超过阈值时发送邮件告警
"""Monitor CPU, memory and root-disk usage and e-mail an alert on high load."""
import smtplib
import time
from email.mime.text import MIMEText

import psutil

# NOTE: credentials are hard-coded here; prefer loading them from the
# environment or a secret store instead of committing them with the script.
MAIL_USER = "systemalert@163.com"  # sender address (163 mailbox)
MAIL_PASS = "W8nE4gA2uM7pX1kV"  # SMTP authorization code of the sender mailbox
MAIL_TO = "111111111@qq.com"  # alert recipient
ALERT_INTERVAL = 300  # debounce: at most one alert every 5 minutes
ALERT_THRESHOLD = 80  # usage percentage above which an alert is raised
CHECK_INTERVAL = 60  # seconds between two measurements
SMTP_TIMEOUT = 10  # seconds; keeps a stuck SMTP server from blocking the loop


def send_mail(body):
    """Send *body* as a plain-text alert mail.

    Returns:
        True when the mail was handed to the SMTP server, False otherwise.
        Failures are printed, never raised, so the monitor keeps running.
    """
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = "【服务器告警】CPU/内存/磁盘过高"
    msg["From"] = MAIL_USER
    msg["To"] = MAIL_TO
    try:
        # The context manager closes the connection even if login/send fails.
        with smtplib.SMTP_SSL("smtp.163.com", 465, timeout=SMTP_TIMEOUT) as server:
            server.login(MAIL_USER, MAIL_PASS)
            server.sendmail(MAIL_USER, [MAIL_TO], msg.as_string())
    except (smtplib.SMTPException, OSError) as e:
        print("邮件发送失败:", e)
        return False
    print("告警邮件发送成功!")
    return True


def main():
    """Sample system usage forever and alert when a threshold is exceeded."""
    last_alert_time = 0
    while True:
        cpu = psutil.cpu_percent(interval=1)
        mem = psutil.virtual_memory().percent
        disk = psutil.disk_usage("/").percent

        print(f"监控中 → CPU: {cpu}% | 内存: {mem}% | 磁盘: {disk}%")

        if max(cpu, mem, disk) > ALERT_THRESHOLD:
            now = time.time()
            if now - last_alert_time > ALERT_INTERVAL:
                content = "\n".join([
                    "服务器性能告警!",
                    f"CPU:{cpu}%",
                    f"内存:{mem}%",
                    f"磁盘:{disk}%",
                ])
                # Start the cool-down only after a successful send so that a
                # failed alert is retried on the next cycle.
                if send_mail(content):
                    last_alert_time = now

        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()
3.日志关键词告警脚本
说明:持续监控服务日志,当日志中出现异常关键字时,自动发送邮件告警,并支持告警防抖,避免邮件轰炸。
"""Follow a log file and e-mail an alert when a line contains a keyword."""
import smtplib
import time
from email.mime.text import MIMEText

import tailer

# NOTE: credentials are hard-coded here; prefer loading them from the
# environment or a secret store instead of committing them with the script.
MAIL_USER = "systemalert@163.com"  # sender address (163 mailbox)
MAIL_PASS = "W8nE4gA2uM7pX1kV"  # SMTP authorization code of the sender mailbox
MAIL_TO = "111111111@qq.com"  # alert recipient
LOG_FILE = "/usr/local/nginx/logs/error.log"  # log file to follow (nginx as an example)
KEYWORDS = ["error", "emerg", "failed", "invalid"]  # lower-case alert keywords
ALERT_INTERVAL = 300  # debounce: at most one alert every 5 minutes
SMTP_TIMEOUT = 10  # seconds; keeps a stuck SMTP server from blocking the loop


def send_mail(body):
    """Send *body* as a plain-text alert mail.

    Returns:
        True when the mail was handed to the SMTP server, False otherwise.
        Failures are printed, never raised, so the monitor keeps running.
    """
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = "【Nginx 错误日志告警】"
    msg["From"] = MAIL_USER
    msg["To"] = MAIL_TO
    try:
        # The context manager closes the connection even if login/send fails.
        with smtplib.SMTP_SSL("smtp.163.com", 465, timeout=SMTP_TIMEOUT) as server:
            server.login(MAIL_USER, MAIL_PASS)
            server.sendmail(MAIL_USER, [MAIL_TO], msg.as_string())
    except (smtplib.SMTPException, OSError) as e:
        print("邮件发送失败:", e)
        return False
    print("告警邮件发送成功")
    return True


def contains_keyword(line):
    """Return True if *line* contains any of KEYWORDS, ignoring case."""
    lowered = line.lower()
    return any(kw in lowered for kw in KEYWORDS)


def main():
    """Follow LOG_FILE forever and alert on matching lines."""
    print("Nginx 日志监控已启动")
    print(f"监控文件: {LOG_FILE}")
    print(f"告警关键词: {KEYWORDS}\n")

    last_alert_time = 0
    # errors="replace" keeps a stray undecodable byte in the log from
    # terminating the monitor; `with` guarantees the handle is closed.
    with open(LOG_FILE, "r", errors="replace") as log_file:
        for line in tailer.follow(log_file):
            if not contains_keyword(line):
                continue

            now = time.time()
            if now - last_alert_time > ALERT_INTERVAL:
                print("⚠️ 触发告警:", line.strip())
                send_mail(f"【Nginx 日志告警】\n\n日志内容:\n{line}")
                # The cool-down starts even if sending failed, so a noisy log
                # cannot trigger one SMTP attempt per line.
                last_alert_time = now
            else:
                print("⏳ 告警冷却中(5分钟内仅发一次):", line.strip())


if __name__ == "__main__":
    main()
4.端口存活监控脚本
说明:监控 80/443/22/3306/6379 等端口是否能连通
"""Periodically check that local TCP ports accept connections; alert by mail."""
import smtplib
import socket
import time
from email.mime.text import MIMEText

# NOTE: credentials are hard-coded here; prefer loading them from the
# environment or a secret store instead of committing them with the script.
MAIL_USER = "systemalert@163.com"
MAIL_PASS = "W8nE4gA2uM7pX1kV"
MAIL_TO = "111111111@qq.com"
CHECK_PORT = [22, 80, 443, 8090, 8080, 3306, 6379]  # ports to probe; adjust as needed
INTERVAL = 30  # seconds between two probe rounds
SMTP_TIMEOUT = 10  # seconds; keeps a stuck SMTP server from blocking the loop


def send_alert(msg):
    """Send *msg* as a plain-text alert mail.

    Returns:
        True when the mail was handed to the SMTP server, False otherwise.
        Failures are printed, never raised, so the monitor keeps running.
    """
    m = MIMEText(msg, "plain", "utf-8")
    m["Subject"] = "【端口监控告警】"
    m["From"] = MAIL_USER
    m["To"] = MAIL_TO
    try:
        # The context manager closes the connection even if login/send fails.
        with smtplib.SMTP_SSL("smtp.163.com", 465, timeout=SMTP_TIMEOUT) as s:
            s.login(MAIL_USER, MAIL_PASS)
            s.sendmail(MAIL_USER, [MAIL_TO], m.as_string())
    except (smtplib.SMTPException, OSError) as e:
        print("邮件发送失败:", e)
        return False
    return True


def is_port_open(host, port, timeout=2):
    """Return True if a TCP connection to (*host*, *port*) succeeds.

    The probe socket is closed immediately so that repeated checks do not
    leak file descriptors.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, timed out, ...
        return False


def find_down_ports(ports, host="127.0.0.1"):
    """Return the subset of *ports* that do not accept connections on *host*."""
    return [p for p in ports if not is_port_open(host, p)]


def should_alert(down, last, now_ts, cooldown=60):
    """Decide whether the current set of down ports warrants a new alert.

    Args:
        down: List of ports currently unreachable.
        last: Mapping ``str(down) -> timestamp`` of the last alert per set.
        now_ts: Current time in seconds since the epoch.
        cooldown: Minimum seconds between two alerts for the same set.
    """
    if not down:
        return False
    key = str(down)
    return key not in last or now_ts - last[key] > cooldown


def main():
    """Probe CHECK_PORT forever and mail an alert when ports are down."""
    print("端口监控已启动")
    last = {}
    while True:
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        down = find_down_ports(CHECK_PORT)

        if should_alert(down, last, time.time()):
            # Record the alert only when it was actually sent, so a failed
            # send is retried on the next round.
            if send_alert(f"⚠️ 端口监听异常\n时间:{now}\n失联端口:{down}"):
                last[str(down)] = time.time()

        time.sleep(INTERVAL)


if __name__ == "__main__":
    main()
5.进程存活监控脚本
说明:监控 nginx/mysql/redis 等进程是否存活
"""Periodically check that expected processes are running; alert by mail."""
import smtplib
import time
from email.mime.text import MIMEText

import psutil

# NOTE: credentials are hard-coded here; prefer loading them from the
# environment or a secret store instead of committing them with the script.
MAIL_USER = "systemalert@163.com"  # sender address (163 mailbox)
MAIL_PASS = "W8nE4gA2uM7pX1kV"  # SMTP authorization code of the sender mailbox
MAIL_TO = "111111111@qq.com"  # alert recipient
# Keywords matched as substrings of process names; customise as needed.
PROCESSES = ["nginx", "mysql", "redis", "docker", "etcd", "kubelet", "kube-proxy",
             "kube-apiserver", "kube-controller-manager", "kube-scheduler"]
INTERVAL = 30  # seconds between two checks
SMTP_TIMEOUT = 10  # seconds; keeps a stuck SMTP server from blocking the loop


def send_alert(msg):
    """Send *msg* as a plain-text alert mail.

    Returns:
        True when the mail was handed to the SMTP server, False otherwise.
        Failures are printed, never raised, so the monitor keeps running.
    """
    m = MIMEText(msg, "plain", "utf-8")
    m["Subject"] = "【进程监控告警】"
    m["From"] = MAIL_USER
    m["To"] = MAIL_TO
    try:
        # The context manager also closes the session, which the manual
        # version never did (it had no quit()).
        with smtplib.SMTP_SSL("smtp.163.com", 465, timeout=SMTP_TIMEOUT) as s:
            s.login(MAIL_USER, MAIL_PASS)
            s.sendmail(MAIL_USER, [MAIL_TO], m.as_string())
    except (smtplib.SMTPException, OSError) as e:
        print("邮件发送失败:", e)
        return False
    return True


def running_process_names():
    """Return the names of all running processes (one snapshot).

    psutil reports ``None`` for a name it cannot read; those are mapped to
    the empty string so substring tests never hit ``None``.
    """
    return [proc.info["name"] or "" for proc in psutil.process_iter(["name"])]


def find_missing(keywords, names):
    """Return the keywords that are not a substring of any entry in *names*."""
    return [kw for kw in keywords if not any(kw in name for name in names)]


def main():
    """Check PROCESSES forever and mail an alert when some are missing."""
    print("进程监控已启动")
    last = {}
    while True:
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        # Take a single process snapshot per round instead of one per keyword.
        missing = find_missing(PROCESSES, running_process_names())

        if missing:
            key = str(missing)
            if key not in last or time.time() - last[key] > 60:
                # Record the alert only when it was actually sent, so a
                # failed send is retried on the next round.
                if send_alert(f"⚠️ 进程丢失\n时间:{now}\n异常:{missing}"):
                    last[key] = time.time()

        time.sleep(INTERVAL)


if __name__ == "__main__":
    main()
6.Docker 容器监控脚本
说明:持续监控服务器上所有 Docker 容器,只要发现容器异常,自动发送邮件告警
"""Watch all Docker containers and e-mail an alert when any is not running."""
import smtplib
import time
from email.mime.text import MIMEText

import docker

# NOTE: credentials are hard-coded here; prefer loading them from the
# environment or a secret store instead of committing them with the script.
MAIL_USER = "systemalert@163.com"  # sender address (163 mailbox)
MAIL_PASS = "W8nE4gA2uM7pX1kV"  # SMTP authorization code of the sender mailbox
MAIL_TO = "111111111@qq.com"  # alert recipient
ALERT_INTERVAL = 120  # minimum seconds between two alert mails
CHECK_INTERVAL = 30  # seconds between two container scans
SMTP_TIMEOUT = 10  # seconds; keeps a stuck SMTP server from blocking the loop


def send_mail(body):
    """Send *body* as a plain-text alert mail.

    Returns:
        True when the mail was handed to the SMTP server, False otherwise.
        Failures are printed, never raised, so the monitor keeps running.
    """
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = "【Docker告警】容器异常"
    msg["From"] = MAIL_USER
    msg["To"] = MAIL_TO
    try:
        # The context manager closes the connection even if login/send fails.
        with smtplib.SMTP_SSL("smtp.163.com", 465, timeout=SMTP_TIMEOUT) as s:
            s.login(MAIL_USER, MAIL_PASS)
            s.sendmail(MAIL_USER, [MAIL_TO], msg.as_string())
    except (smtplib.SMTPException, OSError) as e:
        print("❌ 邮件发送失败:", e)
        return False
    print("✅ 告警邮件发送成功")
    return True


def find_abnormal_containers(client):
    """Return ``"name (status)"`` for every container whose status is not running."""
    return [
        f"{c.name} ({c.status})"
        for c in client.containers.list(all=True)
        if c.status != "running"
    ]


def build_report(alerts, now_str):
    """Format the alert mail body for the given abnormal containers."""
    separator = "=" * 40
    lines = ["⚠️ Docker 容器异常告警 ⚠️", separator]
    lines.extend(f"{i}. {info}" for i, info in enumerate(alerts, 1))
    lines.extend([separator, f"告警时间:{now_str}"])
    return "\n".join(lines)


def main():
    """Scan containers forever and mail an alert when some are not running."""
    client = docker.from_env()
    last_alert = 0
    print("Docker 容器监控已启动")
    while True:
        try:
            alerts = find_abnormal_containers(client)
        except Exception as e:  # loop boundary: a failed API call must not stop monitoring
            print("❌ 获取容器列表失败:", e)
            alerts = []

        if alerts and time.time() - last_alert > ALERT_INTERVAL:
            now_str = time.strftime("%Y-%m-%d %H:%M:%S")
            print(f"⚠️ {now_str} 异常容器:", alerts)
            # Start the cool-down only after a successful send so that a
            # failed alert is retried on the next scan.
            if send_mail(build_report(alerts, now_str)):
                last_alert = time.time()

        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()
7. Inode 使用率监控脚本
说明:定时检查服务器所有分区的 Inode 使用率,一旦超过阈值,就自动发邮件告警,防止服务器因为 Inode 满了无法创建文件。
"""Watch inode usage of all mounted filesystems and alert by mail when high."""
import os
import smtplib
import subprocess
import time
from email.mime.text import MIMEText

# NOTE: credentials are hard-coded here; prefer loading them from the
# environment or a secret store instead of committing them with the script.
MAIL_USER = "systemalert@163.com"  # sender address (163 mailbox)
MAIL_PASS = "W8nE4gA2uM7pX1kV"  # SMTP authorization code of the sender mailbox
MAIL_TO = "111111111@qq.com"  # alert recipient
WARNING = 85  # inode usage percentage at which an alert is raised
INTERVAL = 60  # seconds between two checks
ALERT_COOLDOWN = 300  # minimum seconds between two alerts for the same mount
SMTP_TIMEOUT = 10  # seconds; keeps a stuck SMTP server from blocking the loop


def send_alert(body):
    """Send *body* as a plain-text alert mail.

    Returns:
        True when the mail was handed to the SMTP server, False otherwise.
        Failures are printed, never raised, so the monitor keeps running.
    """
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = "【inode 使用率过高】服务器紧急告警"
    msg["From"] = MAIL_USER
    msg["To"] = MAIL_TO
    try:
        # The context manager closes the connection even if login/send fails.
        with smtplib.SMTP_SSL("smtp.163.com", 465, timeout=SMTP_TIMEOUT) as s:
            s.login(MAIL_USER, MAIL_PASS)
            s.sendmail(MAIL_USER, [MAIL_TO], msg.as_string())
    except (smtplib.SMTPException, OSError) as e:
        print("❌ 邮件发送失败:", e)
        return False
    print("✅ 告警邮件发送成功")
    return True


def parse_inode_usage(df_output):
    """Parse ``df -i`` output into ``(filesystem, mount, used_percent)`` tuples.

    The header row is skipped, as are rows with fewer than six columns and
    rows whose usage column is not a number (df prints ``-`` for filesystems
    without inode accounting). Everything after the fifth column is treated
    as the mount point, so mount paths containing spaces stay intact.
    """
    rows = []
    for line in df_output.splitlines()[1:]:
        parts = line.split()
        if len(parts) < 6:
            continue
        try:
            used = int(parts[4].rstrip("%"))
        except ValueError:
            continue
        rows.append((parts[0], " ".join(parts[5:]), used))
    return rows


def read_inode_usage():
    """Run ``df -iP`` and return its parsed rows.

    ``-P`` selects the POSIX output format, which keeps every filesystem on
    one line even when the device name is long.
    """
    result = subprocess.run(
        ["df", "-iP"], stdout=subprocess.PIPE, universal_newlines=True
    )
    return parse_inode_usage(result.stdout)


def build_report(now, fs, mount, inode_used):
    """Format the alert mail body for one filesystem."""
    separator = "============================================"
    return "\n".join([
        "📛 服务器 Inode 使用率过高告警 📛",
        separator,
        f"告警时间:{now}",
        f"文件系统:{fs}",
        f"挂载路径:{mount}",
        f"Inode 使用率:{inode_used}%",
        f"告警阈值:{WARNING}%",
        separator,
        "⚠️ 说明:Inode 耗尽会导致无法创建新文件!请及时清理小文件、日志或碎片文件。",
    ])


def main():
    """Check inode usage forever and mail an alert per over-threshold mount."""
    last_alert = {}
    print("✅ inode 监控已启动")
    while True:
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        for fs, mount, inode_used in read_inode_usage():
            if inode_used < WARNING:
                continue
            if mount in last_alert and time.time() - last_alert[mount] < ALERT_COOLDOWN:
                continue
            # Start the cool-down only after a successful send so that a
            # failed alert is retried on the next check.
            if send_alert(build_report(now, fs, mount, inode_used)):
                last_alert[mount] = time.time()
        time.sleep(INTERVAL)


if __name__ == "__main__":
    main()
8.SSL 证书过期监控脚本
说明:自动批量监控多个域名的 SSL 证书有效期,提前发现即将过期的证书并汇总邮件告警,防止因证书过期导致网站 HTTPS 服务中断。
"""Check TLS certificate expiry for a list of domains and mail a summary."""
import smtplib
import socket
import ssl
import time
from datetime import datetime, timezone
from email.mime.text import MIMEText

DOMAINS = ["www.test1.com", "www.test2.com", "www.test3.com"]  # domains to check
WARNING_DAYS = 30  # alert when a certificate expires within this many days
INTERVAL = 86400  # run the check once a day
ALERT_COOLDOWN = 300  # minimum seconds between two summary mails
SMTP_TIMEOUT = 10  # seconds; keeps a stuck SMTP server from blocking the loop

# NOTE: credentials are hard-coded here; prefer loading them from the
# environment or a secret store instead of committing them with the script.
MAIL_USER = "systemalert@163.com"  # sender address (163 mailbox)
MAIL_PASS = "W8nE4gA2uM7pX1kV"  # SMTP authorization code of the sender mailbox
MAIL_TO = "111111111@qq.com"  # alert recipient


def send_alert(subject, body):
    """Send a plain-text mail with the given *subject* and *body*.

    Returns:
        True when the mail was handed to the SMTP server, False otherwise.
        Failures are printed, never raised, so the monitor keeps running.
    """
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = subject
    msg["From"] = MAIL_USER
    msg["To"] = MAIL_TO
    try:
        # The context manager closes the connection even if login/send fails.
        with smtplib.SMTP_SSL("smtp.163.com", 465, timeout=SMTP_TIMEOUT) as s:
            s.login(MAIL_USER, MAIL_PASS)
            s.sendmail(MAIL_USER, [MAIL_TO], msg.as_string())
    except (smtplib.SMTPException, OSError) as e:
        print("❌ 邮件发送失败:", e)
        return False
    print("✅ 汇总告警邮件已发送")
    return True


def days_until_expiry(not_after, now_ts):
    """Return the whole days between *now_ts* and a certificate's notAfter.

    Args:
        not_after: The certificate's ``notAfter`` string, e.g.
            ``"Jan  5 09:34:43 2018 GMT"``.
        now_ts: Current time as seconds since the epoch.

    Both values are compared as epoch seconds, so the result does not depend
    on the local timezone. Already expired certificates give a negative value.
    """
    return int((ssl.cert_time_to_seconds(not_after) - now_ts) // 86400)


def fetch_not_after(domain, context, timeout=10):
    """Connect to *domain*:443 and return the peer certificate's notAfter string."""
    with socket.create_connection((domain, 443), timeout=timeout) as sock:
        with context.wrap_socket(sock, server_hostname=domain) as ssock:
            return ssock.getpeercert()["notAfter"]


def check_domains(domains, now_ts):
    """Check every domain and return ``(expiring, errors)`` lists of dicts."""
    context = ssl.create_default_context()
    expiring = []
    errors = []
    for domain in domains:
        try:
            not_after = fetch_not_after(domain, context)
            days_left = days_until_expiry(not_after, now_ts)
        except (OSError, ValueError, KeyError) as e:
            errors.append({"domain": domain, "error": str(e)})
            continue

        # Keep only certificates that are about to expire.
        if days_left <= WARNING_DAYS:
            expire = datetime.fromtimestamp(
                ssl.cert_time_to_seconds(not_after), tz=timezone.utc
            )
            expiring.append({
                "domain": domain,
                "days": days_left,
                "expire": expire.strftime("%Y-%m-%d %H:%M:%S"),
            })
    return expiring, errors


def build_report(expiring, errors, checked_at):
    """Format the summary mail body; *checked_at* is the displayed check time."""
    double_rule = "=" * 50
    single_rule = "-" * 50
    lines = [
        "📛 服务器SSL证书过期告警 📛",
        double_rule,
        f"检测时间:{checked_at}",
        double_rule,
    ]
    if expiring:
        lines.append("【即将过期的域名】")
        for item in expiring:
            lines.extend([
                f" 域名:{item['domain']:<30}",
                f" 剩余天数:{item['days']} 天",
                f" 过期时间:{item['expire']}",
                single_rule,
            ])
    if errors:
        lines.append("【检查失败的域名】")
        for item in errors:
            lines.extend([
                f" 域名:{item['domain']:<30}",
                f" 错误信息:{item['error']}",
                single_rule,
            ])
    return "\n".join(lines) + "\n"


def main():
    """Check DOMAINS once per INTERVAL and mail a summary of problems."""
    last_alert = 0
    print("✅ SSL证书监控已启动")
    while True:
        checked_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        expiring, errors = check_domains(DOMAINS, time.time())

        if (expiring or errors) and time.time() - last_alert > ALERT_COOLDOWN:
            body = build_report(expiring, errors, checked_at)
            send_alert("【服务器SSL证书过期告警】", body)
            last_alert = time.time()

        time.sleep(INTERVAL)


if __name__ == "__main__":
    main()
9.服务器批量巡检脚本
说明:批量检查所有机器的运行状态,省去手动一台台登录,生成一份完整巡检报告并自动发送邮件到你的邮箱
"""Run a health-check command on several servers over SSH and mail the report."""
import smtplib
import time
from email.mime.text import MIMEText

import paramiko

# Servers to inspect.
# NOTE: passwords are hard-coded here; prefer SSH keys or a secret store.
SERVERS = [
    {"host": "172.16.0.2", "user": "test", "pass": "test123"},
    {"host": "172.16.0.3", "user": "test", "pass": "test123"},
    {"host": "172.16.0.4", "user": "test", "pass": "test123"},
    {"host": "172.16.0.5", "user": "test", "pass": "test123"},
]

# Shell commands executed on every server.
# NOTE(review): the command text previously stored here was garbled (the
# `$1`/`$(...)` parts were lost and the quotes no longer balanced). It has
# been reconstructed from the surviving fragments; the labels of the time,
# disk, inode and uptime lines are a best guess — confirm against the
# intended report layout.
CHECK_CMD = r'''
echo "🖥️ 主机名: $(hostname)"
echo "🌐 IP地址: $(hostname -I | awk '{print $1}')"
echo "🕒 系统时间: $(date '+%Y-%m-%d %H:%M:%S')"
echo "⚙️ CPU负载: $(uptime | awk -F'load average:' '{print $2}')"
echo "🧠 内存使用率: $(free -m | awk '/Mem/ {printf "%s/%sMB (%.1f%%)", $3, $2, $3/$2*100}')"
echo "💾 磁盘使用: $(df -h / | awk '/\// {print $3 "/" $2 " (" $5 ")"}')"
echo "📂 Inode使用率: $(df -i / | awk '/\// {print $5}')"
echo "⏱️ 运行时间: $(uptime -p)"
'''

# Mail settings.
# NOTE: credentials are hard-coded here; prefer loading them from the
# environment or a secret store instead of committing them with the script.
MAIL_USER = "systemalert@163.com"  # sender address (163 mailbox)
MAIL_PASS = "W8nE4gA2uM7pX1kV"  # SMTP authorization code of the sender mailbox
MAIL_TO = "111111111@qq.com"  # report recipient
SMTP_TIMEOUT = 10  # seconds; keeps a stuck SMTP server from blocking the script


def send_alert(body):
    """Send *body* as the plain-text inspection report.

    Returns:
        True when the mail was handed to the SMTP server, False otherwise.
        Failures are printed, never raised.
    """
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = "【服务器批量巡检报告】"
    msg["From"] = MAIL_USER
    msg["To"] = MAIL_TO
    try:
        # The context manager closes the connection even if login/send fails.
        with smtplib.SMTP_SSL("smtp.163.com", 465, timeout=SMTP_TIMEOUT) as s:
            s.login(MAIL_USER, MAIL_PASS)
            s.sendmail(MAIL_USER, [MAIL_TO], msg.as_string())
    except (smtplib.SMTPException, OSError) as e:
        print("❌ 邮件发送失败:", e)
        return False
    return True


def inspect_server(srv):
    """Run CHECK_CMD on one server and return its report block.

    Args:
        srv: Dict with the keys ``host``, ``user`` and ``pass``.

    Returns:
        A text block with the command output, or with the failure reason if
        the connection or the command failed.
    """
    ssh = paramiko.SSHClient()
    # NOTE: AutoAddPolicy accepts unknown host keys without verification,
    # which allows man-in-the-middle attacks; load known hosts in production.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(
            hostname=srv["host"],
            username=srv["user"],
            password=srv["pass"],
            timeout=10,
        )
        _stdin, stdout, _stderr = ssh.exec_command(CHECK_CMD)
        output = stdout.read().decode().strip()
    except Exception as e:  # report any failure for this host and move on
        return f"\n【{srv['host']}】\n❌ 连接失败: {str(e)}\n"
    finally:
        # Close the client on every path so failed hosts do not leak
        # transports or sockets.
        ssh.close()
    return f"\n【{srv['host']}】\n{output}\n"


def main():
    """Inspect every server in SERVERS and mail the combined report."""
    print("✅ 开始批量巡检...")
    report = [inspect_server(srv) for srv in SERVERS]

    final_body = "\n".join([
        "📛 服务器批量巡检报告 📛",
        f"🕒 巡检时间: {time.strftime('%Y-%m-%d %H:%M:%S')}",
        "-" * 40,
        "".join(report),
    ])

    # Announce the report as sent only when the mail really went out.
    if send_alert(final_body):
        print("✅ 巡检完成,报告已发送")


if __name__ == "__main__":
    main()
10.自动清理备份文件脚本
说明:自动清理指定目录下 N 天前的日志、备份文件,防止磁盘被占满,例如 GitLab 数据备份目录
"""Delete GitLab backup archives that are older than a retention period."""
import os
import time

BAK_DIR = "/data/gitlab-bak"  # directory holding the backup archives
KEEP_DAYS = 7  # number of days of backups to keep


def clean_old_backups(bak_dir, keep_days, suffix="gitlab_backup.tar", now=None):
    """Remove old backup files below *bak_dir* and return the removed paths.

    A file is removed when it is a regular file (symlinks are skipped), its
    name ends with *suffix*, and its age in whole 24-hour periods is greater
    than *keep_days* — the same rule as ``find -mtime +N``. Sub-directories
    are searched as well.

    Args:
        bak_dir: Directory to clean; a missing directory is a no-op.
        keep_days: Retention period in days.
        suffix: File-name ending that identifies a backup archive.
        now: Reference time in seconds since the epoch (defaults to the
            current time).

    Returns:
        List of paths that were deleted. Files that cannot be removed are
        reported on stdout and skipped.
    """
    if not os.path.isdir(bak_dir):
        return []
    if now is None:
        now = time.time()

    removed = []
    for root, _dirs, files in os.walk(bak_dir):
        for fname in files:
            if not fname.endswith(suffix):
                continue
            path = os.path.join(root, fname)
            if os.path.islink(path) or not os.path.isfile(path):
                continue
            try:
                age_days = int((now - os.path.getmtime(path)) // 86400)
                if age_days > keep_days:
                    os.remove(path)
                    removed.append(path)
            except OSError as e:
                print(f"cannot remove {path}: {e}")
    return removed


if __name__ == "__main__":
    # Remove GitLab backups older than the retention period.
    clean_old_backups(BAK_DIR, KEEP_DAYS)
    print(f"✅ 清理完成:保留最近 {KEEP_DAYS} 天的 GitLab 备份")