Python运维工程师必会的指令
在现代运维工作中,Python已经成为了不可或缺的工具。无论是服务器管理、日志分析、自动化部署还是监控告警,Python都能以简洁高效的代码完成复杂的运维任务。本文整理了Python运维工程师日常工作中最常用的核心指令和脚本,帮助你快速提升运维效率。
欢迎大家关注此公众号,后台点击按钮【免费资料】可免费获取【Python入门30节课】电子书
此外,小庄推荐一本适合新手/小白入门的Python基础书籍,欢迎大家订阅。感谢大家的支持,这是我持续更新的动力。
一、系统资源管理指令
1. 查看CPU使用率
import psutil
# Overall CPU utilisation, sampled over a one-second window.
cpu_percent = psutil.cpu_percent(interval=1)
print(f"CPU使用率: {cpu_percent}%")

# Utilisation of each logical CPU (a second one-second sample).
cpu_percpu = psutil.cpu_percent(interval=1, percpu=True)
for i, percent in enumerate(cpu_percpu):
    print(f"CPU{i}: {percent}%")

# CPU frequency; the result is only printed when cpu_freq() returns a value.
cpu_freq = psutil.cpu_freq()
if cpu_freq:
    print(f"CPU频率: 当前={cpu_freq.current:.2f}MHz, 最大={cpu_freq.max:.2f}MHz, 最小={cpu_freq.min:.2f}MHz")

# CPU statistics: context switches and interrupts.
cpu_stats = psutil.cpu_stats()
print(f"CPU上下文切换次数: {cpu_stats.ctx_switches}")
print(f"CPU中断次数: {cpu_stats.interrupts}")
2. 查看内存使用情况
import psutil
# Snapshot of physical memory; byte counts are shown in GiB (2**30 bytes).
memory = psutil.virtual_memory()
print(f"总内存: {memory.total / 2**30:.2f} GB")
print(f"已用内存: {memory.used / 2**30:.2f} GB")
print(f"可用内存: {memory.available / 2**30:.2f} GB")
print(f"内存使用率: {memory.percent}%")

# Swap space totals, reported on a single line.
swap = psutil.swap_memory()
print(f"交换内存: 总共={swap.total / 2**30:.2f}GB, 已用={swap.used / 2**30:.2f}GB")
3. 查看磁盘使用情况
import psutil
# Report capacity and usage for every disk partition.
partitions = psutil.disk_partitions()
for partition in partitions:
    try:
        usage = psutil.disk_usage(partition.mountpoint)
    except PermissionError:
        # Mount points we are not allowed to inspect are skipped on purpose.
        continue
    print(f"设备: {partition.device}")
    print(f"挂载点: {partition.mountpoint}")
    print(f"文件系统类型: {partition.fstype}")
    print(f"总大小: {usage.total / (1024**3):.2f} GB")
    print(f"已用: {usage.used / (1024**3):.2f} GB")
    print(f"可用: {usage.free / (1024**3):.2f} GB")
    print(f"使用率: {usage.percent}%")
    print("-" * 40)

# Disk I/O statistics.
# NOTE(review): psutil documents that disk_io_counters() can return None when
# no disks are visible, hence the guard before dereferencing it.
disk_io = psutil.disk_io_counters()
if disk_io is not None:
    print(f"磁盘读取次数: {disk_io.read_count}")
    print(f"磁盘写入次数: {disk_io.write_count}")
    print(f"磁盘读取字节数: {disk_io.read_bytes / (1024**3):.2f} GB")
    print(f"磁盘写入字节数: {disk_io.write_bytes / (1024**3):.2f} GB")
4. 查看网络信息
import psutil
# List the IPv4 address and netmask of every network interface.
net_if_addrs = psutil.net_if_addrs()
for interface, addrs in net_if_addrs.items():
    print(f"网卡: {interface}")
    for addr in addrs:
        # Only IPv4 entries are shown; other address families are ignored.
        if addr.family.name == 'AF_INET':
            print(f" IP地址: {addr.address}")
            print(f" 子网掩码: {addr.netmask}")

# System-wide network I/O counters.
net_io = psutil.net_io_counters()
print(f"发送字节数: {net_io.bytes_sent / (1024**2):.2f} MB")
print(f"接收字节数: {net_io.bytes_recv / (1024**2):.2f} MB")
print(f"发送包数: {net_io.packets_sent}")
print(f"接收包数: {net_io.packets_recv}")
二、进程管理指令
1. 列出所有进程
import psutil
# Print one line per running process.
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
    try:
        info = proc.info
        # NOTE(review): psutil fills attributes it could not read with None;
        # substitute 0 so the numeric format spec below cannot raise TypeError.
        cpu = info['cpu_percent'] or 0.0
        mem = info['memory_percent'] or 0.0
        print(f"PID: {info['pid']:>6} | CPU: {cpu:>5.1f}% | "
              f"内存: {mem:>5.1f}% | 进程: {info['name']}")
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        pass

# Top 10 processes ordered by CPU usage.
print("\n=== CPU占用TOP 10进程 ===")
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
    try:
        processes.append(proc.info)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        pass
top_cpu = sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:10]
for p in top_cpu:
    # Same None guard as above: the sort key tolerated None, the format did not.
    print(f"PID: {p['pid']:>6} | CPU: {(p['cpu_percent'] or 0.0):>5.1f}% | {p['name']}")
2. 终止指定进程
import psutil
def kill_process_by_name(process_name):
    """Terminate every process whose name contains ``process_name``.

    Matching is a case-insensitive substring test. Each match is first asked
    to terminate; if it is still alive after 3 seconds it is killed.

    Args:
        process_name: Substring to look for. An empty value matches nothing,
            because an empty substring would otherwise match every process.

    Returns:
        A list of the ``{'pid', 'name'}`` info dicts of the processes that
        were terminated or force-killed.
    """
    killed = []
    if not process_name:
        return killed
    needle = process_name.lower()
    for proc in psutil.process_iter(['pid', 'name']):
        name = proc.info['name']
        if not name or needle not in name.lower():
            continue
        try:
            proc.terminate()
            proc.wait(timeout=3)
            print(f"已终止进程: PID={proc.info['pid']}, 名称={proc.info['name']}")
        except psutil.NoSuchProcess:
            # The process exited on its own before we could stop it.
            continue
        except psutil.AccessDenied:
            print(f"无权限终止进程: PID={proc.info['pid']}")
            continue
        except psutil.TimeoutExpired:
            # Graceful termination timed out: escalate to a hard kill.
            try:
                proc.kill()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
            print(f"强制终止进程: PID={proc.info['pid']}")
        killed.append(proc.info)
    return killed
# 使用示例
# kill_process_by_name("chrome")
3. 监控特定进程资源占用
import psutil
import time
def monitor_process(pid, interval=1, duration=10):
    """Print CPU, memory and thread usage of one process at fixed intervals.

    Args:
        pid: Process ID to watch.
        interval: Seconds per sample; must be positive. May be fractional.
        duration: Total seconds to monitor; ``duration // interval`` samples
            are taken.

    Raises:
        ValueError: If ``interval`` is not positive.
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    try:
        process = psutil.Process(pid)
        print(f"开始监控进程: PID={pid}, 名称={process.name()}")
        # int() keeps range() working when interval or duration is a float.
        for _ in range(int(duration // interval)):
            try:
                cpu = process.cpu_percent(interval=interval)
                mem = process.memory_info()
                print(f"CPU: {cpu}% | 内存: {mem.rss / (1024**2):.2f} MB | 线程数: {process.num_threads()}")
            except psutil.NoSuchProcess:
                print("进程已退出")
                break
    except psutil.NoSuchProcess:
        print(f"找不到进程 PID={pid}")
    except psutil.AccessDenied:
        print(f"无权限访问进程 PID={pid}")
# monitor_process(1234)
三、文件与目录管理指令
1. 批量重命名文件
import os
from pathlib import Path
def batch_rename(directory, old_pattern, new_pattern):
    """Rename the files directly inside ``directory`` with a regex substitution.

    Args:
        directory: Directory whose regular files are considered (not recursive).
        old_pattern: Regular expression searched for in each file name.
        new_pattern: Replacement text (``re.sub`` syntax).

    Returns:
        The number of files that were renamed. Files whose new name is empty
        or already exists are skipped instead of overwriting the target.
    """
    import re
    pattern = re.compile(old_pattern)
    count = 0
    # Snapshot the listing first so renamed files are never visited twice.
    for file_path in sorted(Path(directory).iterdir()):
        if not file_path.is_file():
            continue
        new_name = pattern.sub(new_pattern, file_path.name)
        if not new_name or new_name == file_path.name:
            continue
        new_path = file_path.with_name(new_name)
        if new_path.exists():
            print(f"跳过: {file_path.name} -> {new_name} (目标已存在)")
            continue
        file_path.rename(new_path)
        print(f"重命名: {file_path.name} -> {new_name}")
        count += 1
    print(f"\n共重命名 {count} 个文件")
    return count
# 使用示例:将所有 .jpg 文件改为 .png
# batch_rename("/path/to/dir", r"\.jpg$", ".png")
2. 查找大文件
import os
from pathlib import Path
def find_large_files(directory, size_mb=100):
    """Find files under ``directory`` that are larger than ``size_mb`` MiB.

    Args:
        directory: Root of the tree to walk.
        size_mb: Size threshold in MiB; only strictly larger files are listed.

    Returns:
        A list of ``(Path, size_in_bytes)`` tuples, largest first. Files that
        cannot be stat'ed are skipped.
    """
    threshold = size_mb * 1024 * 1024
    large_files = []
    for root, _dirs, files in os.walk(Path(directory)):
        for file in files:
            file_path = Path(root) / file
            try:
                size = file_path.stat().st_size
            except OSError:  # includes PermissionError and broken symlinks
                continue
            if size > threshold:
                large_files.append((file_path, size))
    # Largest files first.
    large_files.sort(key=lambda entry: entry[1], reverse=True)
    print(f"找到 {len(large_files)} 个大于 {size_mb}MB 的文件:")
    for path, size in large_files:
        print(f" {size / (1024**3):.2f} GB - {path}")
    return large_files
# find_large_files("/var/log", 500)
3. 清理临时文件
import os
import shutil
from pathlib import Path
import time
def clean_temp_files(directory, max_age_days=7):
    """Delete files under ``directory`` not modified for ``max_age_days`` days.

    Args:
        directory: Root of the tree to clean (walked recursively).
        max_age_days: Files whose modification time is older than this many
            days are removed.

    Returns:
        ``(cleaned, freed_space)``: number of deleted files and their total
        size in bytes. Files that cannot be inspected or removed are skipped.
    """
    now = time.time()
    max_age_seconds = max_age_days * 24 * 60 * 60
    cleaned = 0
    freed_space = 0
    for root, _dirs, files in os.walk(Path(directory)):
        for file in files:
            file_path = Path(root) / file
            try:
                # lstat describes the entry that unlink() removes, so a symlink
                # is judged by its own age and size rather than its target's.
                info = file_path.lstat()
                if now - info.st_mtime <= max_age_seconds:
                    continue
                file_path.unlink()
            except OSError:  # includes PermissionError
                continue
            cleaned += 1
            freed_space += info.st_size
            print(f"已删除: {file_path}")
    print(f"\n清理完成: 删除 {cleaned} 个文件, 释放 {freed_space / (1024**2):.2f} MB 空间")
    return cleaned, freed_space
4. 目录大小统计
import os
from pathlib import Path
def get_directory_size(directory):
    """Return the total size in bytes of all regular files under ``directory``.

    The tree is walked recursively. Entries that cannot be inspected are
    skipped rather than aborting the whole calculation.
    """
    total_size = 0
    for item in Path(directory).rglob('*'):
        try:
            # is_file() can itself raise PermissionError, so it is guarded too.
            if item.is_file():
                total_size += item.stat().st_size
        except OSError:
            continue
    return total_size
def format_size(size_bytes):
    """Format a byte count as a human-readable string with 1024-based units.

    Args:
        size_bytes: Size in bytes (int or float; negative values keep their sign).

    Returns:
        A string such as ``"1.50 KB"``; values of 1024 TB and above use PB.
    """
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        # abs() lets negative sizes (e.g. deltas) scale the same way.
        if abs(size_bytes) < 1024.0:
            # A space separates number and unit, matching the PB branch below.
            return f"{size_bytes:.2f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.2f} PB"
# 使用示例
# size = get_directory_size("/var/log")
# print(f"目录总大小: {format_size(size)}")
四、日志分析指令
1. 实时日志监控
import time
from pathlib import Path
def tail_log(log_file, lines=10, follow=True):
    """Print the last ``lines`` lines of a log file and optionally follow it.

    Works like ``tail -n`` / ``tail -f``. The file is read in binary mode,
    because seeking to arbitrary offsets is only well defined for bytes, and
    decoded as UTF-8 with undecodable bytes replaced.

    Args:
        log_file: Path of the log file.
        lines: Number of trailing lines to print first (blank lines are
            skipped when printing).
        follow: When true, keep polling every 0.5 s for appended content; the
            call then never returns.
    """
    path = Path(log_file)
    block_size = 4096
    with open(path, 'rb') as f:
        # Walk backwards from the end, prepending blocks so that the buffer
        # stays in file order, until it holds more newlines than needed.
        f.seek(0, 2)
        position = f.tell()
        buffer = b''
        while position > 0 and buffer.count(b'\n') <= lines:
            read_size = min(block_size, position)
            position -= read_size
            f.seek(position)
            buffer = f.read(read_size) + buffer

        # Print the last N lines. The possibly partial first line of the
        # buffer is never part of that slice.
        if lines > 0:
            for line in buffer.decode('utf-8', errors='replace').splitlines()[-lines:]:
                if line.strip():
                    print(line)

        # Continue from the real end of file so old content is not repeated.
        f.seek(0, 2)
        while follow:
            line = f.readline()
            if not line:
                time.sleep(0.5)
            else:
                print(line.decode('utf-8', errors='replace').rstrip())
# tail_log("/var/log/syslog", lines=20)
2. 日志关键词过滤
import re
from collections import Counter
from datetime import datetime
def analyze_log(log_file, patterns=None):
    """Count keyword matches in a log file and show the latest matching lines.

    Args:
        log_file: Path of the log file (read as UTF-8, undecodable bytes ignored).
        patterns: Optional mapping of category name to compiled regex. When
            omitted, case-insensitive patterns for errors, warnings, timeouts
            and connection messages are used.

    Returns:
        A dict mapping each category name to its number of matching lines.
    """
    from collections import deque

    if patterns is None:
        # IGNORECASE makes the per-case alternatives redundant.
        patterns = {
            'error': re.compile(r'error|fatal|critical', re.IGNORECASE),
            'warning': re.compile(r'warn', re.IGNORECASE),
            'timeout': re.compile(r'timeout', re.IGNORECASE),
            'connection': re.compile(r'connection', re.IGNORECASE),
        }
    stats = {key: 0 for key in patterns}
    # Only the last 10 hits are ever shown, so keep memory bounded.
    timeline = deque(maxlen=10)
    with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
        for line_num, line in enumerate(f, 1):
            for name, pattern in patterns.items():
                if pattern.search(line):
                    stats[name] += 1
                    timeline.append((line_num, name, line.strip()[:100]))
    print("=== 日志分析结果 ===")
    for name, count in stats.items():
        print(f" {name}: {count} 次")
    print(f"\n=== 最近10条异常 ===")
    for line_num, name, content in timeline:
        print(f" 行 {line_num} [{name.upper()}]: {content}")
    return stats
# analyze_log("/var/log/nginx/error.log")
3. 日志轮转管理
import os
import gzip
import shutil
from pathlib import Path
from datetime import datetime
def rotate_log(log_file, max_size_mb=100, keep_count=5):
    """Rotate a log file once it reaches ``max_size_mb`` MiB.

    Existing backups ``<log>.N.gz`` are shifted to ``N+1`` (the one numbered
    ``keep_count`` is deleted), the current log is gzip-compressed into
    ``<log>.1.gz`` and the original file is then truncated.

    NOTE: lines written by another process between the copy and the
    truncation are lost.

    Args:
        log_file: Path of the live log file.
        max_size_mb: Rotation threshold in MiB.
        keep_count: Highest backup number to keep.

    Returns:
        The ``Path`` of the new ``.1.gz`` backup, or ``None`` when the file is
        missing or still below the threshold.
    """
    path = Path(log_file)
    if not path.exists():
        return None
    if path.stat().st_size / (1024 * 1024) < max_size_mb:
        return None

    # Drop the oldest backup.
    oldest = Path(f"{log_file}.{keep_count}.gz")
    if oldest.exists():
        oldest.unlink()

    # Shift the remaining backups up by one, highest number first.
    for i in range(keep_count - 1, 0, -1):
        old_backup = Path(f"{log_file}.{i}.gz")
        if old_backup.exists():
            # replace() also succeeds on platforms where rename() refuses to
            # overwrite an existing target.
            old_backup.replace(Path(f"{log_file}.{i + 1}.gz"))

    # Compress the current log.
    backup_path = Path(f"{log_file}.1.gz")
    with open(path, 'rb') as f_in, gzip.open(backup_path, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

    # Truncate the live file.
    path.write_text('')
    print(f"日志轮转完成: {log_file} -> {backup_path}")
    return backup_path
# rotate_log("/var/log/app.log", max_size_mb=50)
五、网络诊断指令
1. Ping监控
import platform
import subprocess
def ping_host(host, count=4, timeout=30):
    """Ping a host with the system ``ping`` command.

    Args:
        host: Host name or IP address.
        count: Number of echo requests to send.
        timeout: Seconds to wait for the ``ping`` process before giving up.

    Returns:
        True when ``ping`` exits with status 0, otherwise False (unreachable,
        timed out, or the command could not be run).
    """
    # Windows spells the count option -n, everything else uses -c.
    param = '-n' if platform.system().lower() == 'windows' else '-c'
    command = ['ping', param, str(count), host]
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        print(f"⏰ {host} 响应超时")
        return False
    except OSError as e:
        # For example, the ping executable is not installed.
        print(f"⚠️ Ping异常: {e}")
        return False

    if result.returncode != 0:
        print(f"❌ {host} 不可达")
        return False

    print(f"✅ {host} 可达")
    # Show the latency lines; compare case-insensitively because the
    # capitalisation of "ttl"/"time" differs between ping implementations.
    for line in result.stdout.split('\n'):
        lowered = line.lower()
        if 'time=' in lowered or 'time<' in lowered or 'ttl=' in lowered:
            print(f" {line.strip()}")
    return True
# Ping several hosts in turn.
hosts = ['8.8.8.8', '114.114.114.114', 'baidu.com']
for host in hosts:
    ping_host(host)
2. 端口扫描
import socket
from concurrent.futures import ThreadPoolExecutor
def scan_port(host, ports=None):
    """Check which TCP ports of ``host`` accept connections.

    Args:
        host: Host name or IPv4 address to probe.
        ports: Iterable of port numbers; defaults to a list of common service
            ports.

    Returns:
        A list of ``(port, service_name)`` tuples for the open ports, sorted
        by port. ``service_name`` is ``'unknown'`` when the system has no
        name for the port.
    """
    if ports is None:
        ports = [21, 22, 23, 25, 53, 80, 110, 135, 139, 443, 445, 993, 995,
                 3306, 3389, 5432, 6379, 8080, 8443, 9200]

    def check_port(port):
        """Return ``(port, service)`` when the port is open, else ``None``."""
        try:
            # The context manager closes the socket even when connecting fails.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                if sock.connect_ex((host, port)) != 0:
                    return None
        except (OSError, OverflowError):
            return None
        # Try to resolve the conventional service name.
        try:
            service = socket.getservbyport(port)
        except (OSError, OverflowError):
            service = 'unknown'
        return port, service

    # Results come back through map() instead of a list shared across threads.
    with ThreadPoolExecutor(max_workers=50) as executor:
        open_ports = sorted(hit for hit in executor.map(check_port, ports) if hit is not None)

    print(f"主机 {host} 开放端口:")
    for port, service in open_ports:
        print(f" 端口 {port:>5}/tcp - {service}")
    return open_ports
# scan_port('192.168.1.1')
3. DNS查询
import socket
def dns_query(domain):
    """Resolve ``domain`` and return its distinct IP addresses.

    Returns:
        The addresses in the order the resolver returned them, without
        duplicates; an empty list when resolution fails.
    """
    try:
        results = socket.getaddrinfo(domain, None)
    except (socket.gaierror, UnicodeError) as e:
        # UnicodeError is raised for names that cannot be IDNA-encoded,
        # e.g. labels longer than 63 characters.
        print(f"DNS解析失败: {e}")
        return []

    print(f"域名 {domain} 解析结果:")
    ips = []
    for result in results:
        ip = result[4][0]
        if ip not in ips:
            ips.append(ip)
            print(f" {result[0].name}: {ip}")
    return ips
def reverse_dns(ip):
    """Look up the host name for ``ip``.

    Returns:
        The primary host name, or ``None`` when the lookup fails or ``ip`` is
        not a usable address.
    """
    try:
        hostname = socket.gethostbyaddr(ip)
    except (socket.herror, socket.gaierror, UnicodeError):
        # herror: no reverse record; gaierror/UnicodeError: invalid input.
        print(f"反向DNS查询失败: {ip}")
        return None
    print(f"IP {ip} -> 主机名: {hostname[0]}")
    return hostname[0]
# dns_query('www.baidu.com')
# reverse_dns('8.8.8.8')
六、定时任务与调度指令
1. 定时执行任务
import schedule
import time
from datetime import datetime
def job():
    """Task executed by the scheduler; put the actual ops logic here."""
    print(f"[{datetime.now()}] 执行定时任务...")


# Register the schedules.
schedule.every(10).minutes.do(job)  # every 10 minutes
schedule.every().hour.do(job)  # every hour
schedule.every().day.at("02:00").do(job)  # every day at 02:00
schedule.every().monday.at("09:00").do(job)  # every Monday at 09:00
schedule.every().wednesday.at("13:15").do(job)  # every Wednesday at 13:15

# Run the scheduler loop: check once per second for jobs that are due.
while True:
    schedule.run_pending()
    time.sleep(1)
2. Cron风格调度(APScheduler)
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
def cleanup_job():
    """Example cleanup task."""
    print(f"[{datetime.now()}] 执行清理任务")


def backup_job():
    """Example backup task."""
    print(f"[{datetime.now()}] 执行备份任务")


scheduler = BackgroundScheduler()
# Cron-style schedules.
scheduler.add_job(cleanup_job, 'cron', hour=2, minute=0)  # every day at 02:00
scheduler.add_job(backup_job, 'cron', day_of_week='sun', hour=3)  # Sundays at 03:00
scheduler.add_job(cleanup_job, 'interval', hours=4)  # every 4 hours
scheduler.start()

# List the scheduled jobs.
for job in scheduler.get_jobs():
    print(f"任务: {job.name}, 下次执行: {job.next_run_time}")

# A BackgroundScheduler runs in a background thread, so the script has to stay
# alive for any job to fire; stop it cleanly on Ctrl+C.
import time

try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()
七、系统服务管理指令
1. 服务状态检查
import subprocess
import platform
def check_service(service_name):
    """Report whether a system service is running.

    Uses ``systemctl status`` on Linux and ``sc query`` on Windows and prints
    the first 500 characters of the command output.

    Returns:
        True when the service is reported as running, False otherwise,
        including on unsupported platforms or when the tool is missing.
    """
    system = platform.system()
    if system == 'Linux':
        command = ['systemctl', 'status', service_name]
        missing_message = "systemctl 不可用"
    elif system == 'Windows':
        command = ['sc', 'query', service_name]
        missing_message = "sc 命令不可用"
    else:
        # Previously this fell through and returned None.
        print(f"不支持的操作系统: {system}")
        return False

    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except FileNotFoundError:
        print(missing_message)
        return False

    print(f"服务 {service_name}:")
    print(result.stdout[:500])
    if system == 'Linux':
        return result.returncode == 0
    return 'RUNNING' in result.stdout
# check_service('nginx')
# check_service('sshd')
2. 服务重启
import subprocess
def restart_service(service_name):
    """Stop and start a systemd service, then verify that it is active.

    Returns:
        True when ``systemctl is-active`` reports ``active`` afterwards,
        False when any step fails or ``systemctl`` is not available.
    """
    try:
        # Stop first.
        subprocess.run(['systemctl', 'stop', service_name], check=True)
        print(f"服务 {service_name} 已停止")
        # Then start again.
        subprocess.run(['systemctl', 'start', service_name], check=True)
        print(f"服务 {service_name} 已启动")
        # Verify the resulting state.
        result = subprocess.run(['systemctl', 'is-active', service_name],
                                capture_output=True, text=True)
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        # FileNotFoundError: systemctl is not installed on this machine.
        print(f"❌ 服务操作失败: {e}")
        return False

    if result.stdout.strip() == 'active':
        print(f"✅ 服务 {service_name} 运行正常")
        return True
    print(f"❌ 服务 {service_name} 启动失败")
    return False
八、数据备份指令
1. 数据库备份(MySQL)
import subprocess
from datetime import datetime
def backup_mysql(host, user, password, databases, backup_dir):
    """Dump MySQL databases with ``mysqldump`` and gzip each dump.

    The password is handed to ``mysqldump`` through a private temporary
    option file instead of the command line, where other local users could
    read it from the process list.

    Args:
        host: MySQL server host.
        user: MySQL user name.
        password: MySQL password.
        databases: Iterable of database names to dump.
        backup_dir: Directory for the ``<db>_<timestamp>.sql.gz`` files; it is
            created when missing.

    Returns:
        A list with the paths of the backups that were created successfully.
    """
    import gzip
    import os
    import shutil
    import tempfile

    os.makedirs(backup_dir, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    created = []

    # mkstemp creates the file readable and writable by the owner only.
    escaped = str(password).replace('\\', '\\\\').replace('"', '\\"')
    fd, option_file = tempfile.mkstemp(suffix='.cnf')
    try:
        with os.fdopen(fd, 'w') as cnf:
            cnf.write(f'[client]\npassword="{escaped}"\n')

        for db in databases:
            backup_file = f"{backup_dir}/{db}_{timestamp}.sql"
            command = [
                'mysqldump',
                # Must be the first option on the command line.
                f'--defaults-extra-file={option_file}',
                f'--host={host}',
                f'--user={user}',
                '--single-transaction',
                '--routines',
                '--triggers',
                db,
            ]
            try:
                with open(backup_file, 'w') as f:
                    result = subprocess.run(command, stdout=f, stderr=subprocess.PIPE, text=True)
                if result.returncode == 0:
                    # Compress the finished dump.
                    with open(backup_file, 'rb') as f_in, \
                            gzip.open(f"{backup_file}.gz", 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                    created.append(f"{backup_file}.gz")
                    print(f"✅ 数据库 {db} 备份完成: {backup_file}.gz")
                else:
                    print(f"❌ 数据库 {db} 备份失败: {result.stderr}")
            except (OSError, subprocess.SubprocessError) as e:
                print(f"❌ 备份异常: {e}")
            finally:
                # Never leave the uncompressed or partial dump behind.
                if os.path.exists(backup_file):
                    os.remove(backup_file)
    finally:
        os.remove(option_file)
    return created
# backup_mysql('localhost', 'root', 'password', ['db1', 'db2'], '/backup/mysql')
2. 文件备份与同步
import shutil
import os
from pathlib import Path
import hashlib
def backup_files(source_dir, backup_dir):
    """Incrementally copy the files under ``source_dir`` into ``backup_dir``.

    A file is copied when it is missing from the backup or when its
    modification time is newer than the backup copy's. The directory layout
    is preserved. If the backup directory lies inside the source tree, its
    own contents are excluded so the backup never copies itself.

    Returns:
        ``(backed_up, skipped)``: files copied and files left untouched.
    """
    source = Path(source_dir)
    backup = Path(backup_dir)
    backup.mkdir(parents=True, exist_ok=True)
    backup_root = backup.resolve()
    backed_up = 0
    skipped = 0
    for item in source.rglob('*'):
        if not item.is_file():
            continue
        if backup_root in item.resolve().parents:
            continue
        relative_path = item.relative_to(source)
        dest_path = backup / relative_path
        # Skip files whose backup copy is at least as recent.
        if dest_path.exists() and item.stat().st_mtime <= dest_path.stat().st_mtime:
            skipped += 1
            continue
        # Recreate the directory structure, then copy with metadata.
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(item, dest_path)
        backed_up += 1
        print(f"已备份: {relative_path}")
    print(f"\n备份完成: 新增/更新 {backed_up} 个文件, 跳过 {skipped} 个未变更文件")
    return backed_up, skipped
# backup_files('/data/app', '/backup/app')
九、SSH远程执行指令
1. 使用Paramiko执行远程命令
import paramiko
def ssh_exec(hostname, port, username, password, command):
    """Run a command on a remote host over SSH with password authentication.

    NOTE: AutoAddPolicy accepts unknown host keys without verification.

    Returns:
        ``(success, output, error)`` where ``success`` is True when the
        command exited with status 0. On connection problems the result is
        ``(False, '', <error text>)``.
    """
    client = paramiko.SSHClient()
    try:
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname, port, username, password, timeout=10)
        stdin, stdout, stderr = client.exec_command(command)
        output = stdout.read().decode('utf-8')
        error = stderr.read().decode('utf-8')
        exit_code = stdout.channel.recv_exit_status()
        print(f"命令: {command}")
        print(f"退出码: {exit_code}")
        if output:
            print(f"输出:\n{output}")
        if error:
            print(f"错误:\n{error}")
        return exit_code == 0, output, error
    except Exception as e:
        print(f"SSH连接失败: {e}")
        return False, '', str(e)
    finally:
        # Close the connection on every path, including failures.
        client.close()
# Run the same command on several servers.
servers = [
    ('192.168.1.10', 22, 'admin', 'password'),
    ('192.168.1.11', 22, 'admin', 'password'),
]
for host, port, user, pwd in servers:
    print(f"\n=== 连接 {host} ===")
    ssh_exec(host, port, user, pwd, 'df -h && free -m')
2. SSH密钥认证
import paramiko
def ssh_exec_key(hostname, port, username, key_path, command):
    """Run a command on a remote host over SSH using a private key file.

    Args:
        hostname: Remote host.
        port: SSH port.
        username: Login user.
        key_path: Path of the private key; a leading ``~`` is expanded.
        command: Command line to execute.

    Returns:
        The command's standard output, or ``''`` when anything fails.
    """
    import os

    client = paramiko.SSHClient()
    try:
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # key_filename lets paramiko detect the key type instead of assuming
        # RSA; the path is expanded because '~' is not resolved automatically.
        client.connect(hostname, port, username,
                       key_filename=os.path.expanduser(key_path), timeout=10)
        stdin, stdout, stderr = client.exec_command(command)
        return stdout.read().decode('utf-8')
    except Exception as e:
        print(f"SSH密钥认证失败: {e}")
        return ''
    finally:
        client.close()
# ssh_exec_key('192.168.1.10', 22, 'admin', '~/.ssh/id_rsa', 'uptime')
十、监控告警指令
1. 系统资源告警
import psutil
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def check_and_alert(threshold=90):
    """Check CPU, memory and disk usage and send an alert e-mail if needed.

    Args:
        threshold: Usage percentage above which a resource is reported.

    Returns:
        The list of alert messages (empty when everything is within limits).
    """
    alerts = []

    # CPU
    cpu = psutil.cpu_percent(interval=1)
    if cpu > threshold:
        alerts.append(f"CPU使用率过高: {cpu}%")

    # Memory
    memory = psutil.virtual_memory()
    if memory.percent > threshold:
        alerts.append(f"内存使用率过高: {memory.percent}%")

    # Disks
    for partition in psutil.disk_partitions():
        try:
            usage = psutil.disk_usage(partition.mountpoint)
        except PermissionError:
            continue
        if usage.percent > threshold:
            alerts.append(f"磁盘 {partition.mountpoint} 使用率过高: {usage.percent}%")

    # Send the alert
    if alerts:
        alert_message = "\n".join(alerts)
        send_alert_email("系统资源告警", alert_message)
        print(f"⚠️ 告警:\n{alert_message}")
    else:
        print("✅ 系统资源正常")
    return alerts
def send_alert_email(subject, body):
    """Send a plain-text alert e-mail through the company SMTP relay.

    Failures are printed instead of raised so monitoring keeps running.

    Returns:
        True when the message was handed to the server, otherwise False.
    """
    msg = MIMEMultipart()
    msg['From'] = 'monitor@company.com'
    msg['To'] = 'admin@company.com'
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))
    try:
        # The context manager closes the connection even if sending fails; the
        # timeout keeps an unreachable relay from blocking indefinitely.
        with smtplib.SMTP('smtp.company.com', 25, timeout=10) as server:
            server.send_message(msg)
    except Exception as e:
        print(f"邮件发送失败: {e}")
        return False
    print("告警邮件已发送")
    return True
# check_and_alert()
2. Webhook告警(钉钉/企业微信)
import requests
import json
def send_dingtalk_alert(webhook_url, content):
    """Post a text alert to a DingTalk robot webhook, mentioning everyone.

    Returns:
        True when the API answers with ``errcode`` 0, otherwise False.
    """
    headers = {'Content-Type': 'application/json'}
    data = {
        "msgtype": "text",
        "text": {
            "content": f"【系统告警】\n{content}"
        },
        "at": {
            "isAtAll": True
        }
    }
    try:
        # A timeout keeps an unreachable webhook from hanging the caller.
        response = requests.post(webhook_url, headers=headers,
                                 data=json.dumps(data), timeout=10)
        result = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        print(f"钉钉告警发送失败: {e}")
        return False
    if result.get('errcode') == 0:
        print("钉钉告警发送成功")
        return True
    print(f"钉钉告警发送失败: {result}")
    return False
def send_wechat_alert(webhook_url, content):
    """Post a text alert to a WeCom (Enterprise WeChat) robot webhook.

    Returns:
        True when the API answers with ``errcode`` 0, otherwise False.
    """
    headers = {'Content-Type': 'application/json'}
    data = {
        "msgtype": "text",
        "text": {
            "content": f"【系统告警】\n{content}"
        }
    }
    try:
        # A timeout keeps an unreachable webhook from hanging the caller.
        response = requests.post(webhook_url, headers=headers,
                                 data=json.dumps(data), timeout=10)
        result = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        print(f"企业微信告警发送失败: {e}")
        return False
    if result.get('errcode') == 0:
        print("企业微信告警发送成功")
        return True
    print(f"企业微信告警发送失败: {result}")
    return False
# 使用示例
# send_dingtalk_alert('https://oapi.dingtalk.com/robot/send?access_token=xxx', 'CPU使用率超过90%')
十一、常用系统信息收集指令
1. 一键收集系统信息
import psutil
import platform
import socket
from datetime import datetime
def collect_system_info():
    """Collect a summary of host, CPU, memory, disk and network information.

    The summary is printed as a small report and also returned.

    Returns:
        A dict mapping the (Chinese) label of each item to its formatted value.
    """
    info = {}

    # Basics
    info['主机名'] = socket.gethostname()
    # A space separates system name and release (e.g. "Linux 5.15").
    info['操作系统'] = f"{platform.system()} {platform.release()}"
    info['架构'] = platform.machine()
    info['Python版本'] = platform.python_version()
    info['当前时间'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # CPU
    info['CPU核心数'] = psutil.cpu_count(logical=True)
    info['CPU使用率'] = f"{psutil.cpu_percent(interval=1)}%"

    # Memory
    memory = psutil.virtual_memory()
    info['总内存'] = f"{memory.total / (1024**3):.2f} GB"
    info['内存使用率'] = f"{memory.percent}%"

    # Disk (root file system only)
    disk = psutil.disk_usage('/')
    info['磁盘总大小'] = f"{disk.total / (1024**3):.2f} GB"
    info['磁盘使用率'] = f"{disk.percent}%"

    # Network
    net_io = psutil.net_io_counters()
    info['网络发送'] = f"{net_io.bytes_sent / (1024**2):.2f} MB"
    info['网络接收'] = f"{net_io.bytes_recv / (1024**2):.2f} MB"

    # Uptime since boot
    boot_time = datetime.fromtimestamp(psutil.boot_time())
    uptime = datetime.now() - boot_time
    info['运行时间'] = f"{uptime.days}天{uptime.seconds // 3600}小时"

    # Process count
    info['运行进程数'] = len(psutil.pids())

    print("=" * 50)
    print(" 系统信息报告")
    print("=" * 50)
    for key, value in info.items():
        print(f" {key}: {value}")
    print("=" * 50)
    return info
# collect_system_info()
2. 生成系统报告
import psutil
import json
from pathlib import Path
from datetime import datetime
def generate_system_report(output_dir='.'):
    """Write a JSON snapshot of the system state into ``output_dir``.

    Args:
        output_dir: Directory for the report; it is created when missing.

    Returns:
        The ``Path`` of the written ``system_report_<timestamp>.json`` file.
    """
    import socket

    now = datetime.now()
    # Take each snapshot once so related numbers come from the same instant.
    memory = psutil.virtual_memory()
    net_io = psutil.net_io_counters()
    cpu_freq = psutil.cpu_freq()

    report = {
        'timestamp': now.isoformat(),
        # The machine's host name; the previous value was the name of the
        # first logged-in user, which is not a host name.
        'hostname': socket.gethostname() or 'unknown',
        'cpu': {
            'percent': psutil.cpu_percent(interval=1),
            'percpu': psutil.cpu_percent(interval=1, percpu=True),
            'count': psutil.cpu_count(),
            'freq': cpu_freq._asdict() if cpu_freq else None,
        },
        'memory': {
            'total': memory.total,
            'available': memory.available,
            'percent': memory.percent,
        },
        'disk': {},
        'network': {
            'bytes_sent': net_io.bytes_sent,
            'bytes_recv': net_io.bytes_recv,
        },
        'processes': len(psutil.pids()),
    }
    for partition in psutil.disk_partitions():
        try:
            usage = psutil.disk_usage(partition.mountpoint)
        except PermissionError:
            continue
        report['disk'][partition.mountpoint] = {
            'total': usage.total,
            'used': usage.used,
            'free': usage.free,
            'percent': usage.percent,
        }

    # Save the report.
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    output_path = out_dir / f"system_report_{now.strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_path, 'w') as f:
        json.dump(report, f, indent=2)
    print(f"系统报告已生成: {output_path}")
    return output_path
# generate_system_report('/tmp/reports')
十二、实用运维工具脚本
1. 服务器健康检查脚本
import psutil
import socket
import subprocess
from datetime import datetime
class HealthChecker:
    """Server health check tool.

    Every ``check_*`` method appends ``(name, status, detail)`` tuples to
    ``self.results``; status is ``"OK"``, ``"WARNING"`` or ``"ERROR"``.
    """

    def __init__(self):
        self.results = []

    def check_cpu(self, threshold=90):
        """Record CPU usage; return True when it is below ``threshold`` percent."""
        cpu = psutil.cpu_percent(interval=1)
        status = "OK" if cpu < threshold else "WARNING"
        self.results.append(('CPU', status, f"使用率: {cpu}%"))
        return status == "OK"

    def check_memory(self, threshold=90):
        """Record memory usage; return True when it is below ``threshold`` percent."""
        mem = psutil.virtual_memory()
        status = "OK" if mem.percent < threshold else "WARNING"
        self.results.append(('内存', status, f"使用率: {mem.percent}%"))
        return status == "OK"

    def check_disk(self, threshold=90):
        """Record usage of every partition; return True when none exceeds ``threshold``."""
        all_ok = True
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
            except PermissionError:
                continue
            status = "WARNING" if usage.percent > threshold else "OK"
            if status == "WARNING":
                all_ok = False
            self.results.append((
                f"磁盘({partition.mountpoint})",
                status,
                f"使用率: {usage.percent}%",
            ))
        return all_ok

    def check_network(self, host='8.8.8.8'):
        """Record whether a TCP connection to ``host`` port 53 can be opened."""
        try:
            # The context manager closes the probe connection again.
            with socket.create_connection((host, 53), timeout=3):
                pass
        except OSError:
            self.results.append(('网络', 'ERROR', f"无法访问 {host}"))
            return False
        self.results.append(('网络', 'OK', f"可访问 {host}"))
        return True

    def run_all_checks(self):
        """Run every check, print a report and return True when all are OK."""
        # Start from a clean slate so repeated runs do not report stale entries.
        self.results = []
        print(f"\n{'='*50}")
        print(f" 服务器健康检查 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"{'='*50}")
        self.check_cpu()
        self.check_memory()
        self.check_disk()
        self.check_network()
        icons = {"OK": "✅", "WARNING": "⚠️"}
        for name, status, detail in self.results:
            icon = icons.get(status, "❌")
            print(f" {icon} {name}: {detail}")
        print(f"{'='*50}")
        return all(r[1] == "OK" for r in self.results)
# checker = HealthChecker()
# is_healthy = checker.run_all_checks()
2. 批量服务器管理
from concurrent.futures import ThreadPoolExecutor, as_completed
import paramiko
class ServerManager:
    """Run commands on a group of servers over SSH, in parallel."""

    def __init__(self):
        self.servers = []

    def add_server(self, hostname, port, username, password=None, key_path=None):
        """Register a server; supply either ``password`` or ``key_path``."""
        self.servers.append({
            'hostname': hostname,
            'port': port,
            'username': username,
            'password': password,
            'key_path': key_path,
        })

    def exec_on_server(self, server, command):
        """Run ``command`` on one server.

        Returns:
            ``(hostname, success, output)``. ``success`` is True only when the
            connection worked and the command exited with status 0; on a
            non-zero exit the error output is appended to ``output``, and on a
            connection failure ``output`` is the error text.
        """
        import os

        client = None
        try:
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            connect_kwargs = {
                'hostname': server['hostname'],
                'port': server['port'],
                'username': server['username'],
                'timeout': 10,
            }
            if server.get('key_path'):
                # key_filename lets paramiko detect the key type; '~' has to
                # be expanded explicitly.
                connect_kwargs['key_filename'] = os.path.expanduser(server['key_path'])
            else:
                connect_kwargs['password'] = server['password']
            client.connect(**connect_kwargs)
            stdin, stdout, stderr = client.exec_command(command)
            output = stdout.read().decode('utf-8')
            error = stderr.read().decode('utf-8')
            exit_code = stdout.channel.recv_exit_status()
            if exit_code != 0:
                # A failed remote command must not be reported as a success.
                return server['hostname'], False, output + error
            return server['hostname'], True, output
        except Exception as e:
            return server['hostname'], False, str(e)
        finally:
            if client is not None:
                client.close()

    def exec_on_all(self, command, max_workers=10):
        """Run ``command`` on every registered server.

        Returns:
            A dict mapping hostname to ``(success, output)``.
        """
        results = {}
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.exec_on_server, server, command): server
                for server in self.servers
            }
            for future in as_completed(futures):
                hostname, success, output = future.result()
                results[hostname] = (success, output)
                status = "✅" if success else "❌"
                print(f"{status} {hostname}: {output[:100]}...")
        return results
# 使用示例
# manager = ServerManager()
# manager.add_server('192.168.1.10', 22, 'admin', password='xxx')
# manager.add_server('192.168.1.11', 22, 'admin', key_path='~/.ssh/id_rsa')
# results = manager.exec_on_all('uptime && free -m')
3. Docker容器管理
import docker
import json
class DockerManager:
    """Docker container management helper built on the Docker SDK."""

    def __init__(self):
        self.client = docker.from_env()

    def list_containers(self, all=True):
        """Print a table of containers (including stopped ones when ``all``)."""
        containers = self.client.containers.list(all=all)
        print(f"{'容器ID':>12} | {'名称':<25} | {'状态':<10} | {'镜像'}")
        print("-" * 80)
        for container in containers:
            tags = container.image.tags
            print(f"{container.short_id:>12} | {container.name:<25} | "
                  f"{container.status:<10} | {tags[0] if tags else 'N/A'}")

    def get_container_stats(self, container_name):
        """Print CPU and memory usage of one container.

        Missing statistics (for example for a stopped container) are treated
        as zero instead of raising ``KeyError``.
        """
        try:
            container = self.client.containers.get(container_name)
        except docker.errors.NotFound:
            print(f"容器 {container_name} 不存在")
            return
        stats = container.stats(stream=False)

        # CPU usage: share of system CPU time used since the previous sample,
        # scaled by the number of CPUs (the formula given in the Docker
        # Engine API documentation).
        cpu_stats = stats.get('cpu_stats') or {}
        precpu_stats = stats.get('precpu_stats') or {}
        cpu_usage = cpu_stats.get('cpu_usage') or {}
        precpu_usage = precpu_stats.get('cpu_usage') or {}
        cpu_delta = cpu_usage.get('total_usage', 0) - precpu_usage.get('total_usage', 0)
        system_delta = cpu_stats.get('system_cpu_usage', 0) - precpu_stats.get('system_cpu_usage', 0)
        online_cpus = cpu_stats.get('online_cpus') or len(cpu_usage.get('percpu_usage') or []) or 1
        if system_delta > 0 and cpu_delta > 0:
            cpu_percent = (cpu_delta / system_delta) * online_cpus * 100
        else:
            cpu_percent = 0.0

        # Memory usage
        memory_stats = stats.get('memory_stats') or {}
        memory_usage = memory_stats.get('usage', 0)
        memory_limit = memory_stats.get('limit', 0)
        memory_percent = (memory_usage / memory_limit) * 100 if memory_limit else 0.0

        print(f"容器 {container_name}:")
        print(f" CPU: {cpu_percent:.2f}%")
        print(f" 内存: {memory_usage / (1024*1024):.2f} MB ({memory_percent:.2f}%)")

    def restart_container(self, container_name):
        """Restart a container by name or ID."""
        try:
            container = self.client.containers.get(container_name)
            container.restart()
            print(f"容器 {container_name} 已重启")
        except docker.errors.NotFound:
            print(f"容器 {container_name} 不存在")

    def prune_unused(self):
        """Remove unused containers, images and volumes and print the counts."""
        # `or []` guards against an explicit null in the response, which
        # would otherwise make len() fail.
        result = self.client.containers.prune()
        print(f"清理容器: {len(result.get('ContainersDeleted') or [])} 个")
        result = self.client.images.prune()
        print(f"清理镜像: {len(result.get('ImagesDeleted') or [])} 个")
        result = self.client.volumes.prune()
        print(f"清理卷: {len(result.get('VolumesDeleted') or [])} 个")
十三、Python运维最佳实践
1. 日志记录
import logging
def setup_logger(name, log_file, level=logging.INFO):
    """Return a logger that writes formatted records to ``log_file``.

    Calling this again with the same name and file reuses the existing
    handler instead of adding a duplicate (which would write every record
    twice). The log directory is created when missing.

    Args:
        name: Logger name.
        log_file: Path of the log file.
        level: Minimum level handled by the logger.
    """
    import os

    logger = logging.getLogger(name)
    logger.setLevel(level)

    target = os.path.abspath(log_file)
    for existing in logger.handlers:
        if isinstance(existing, logging.FileHandler) and existing.baseFilename == target:
            return logger

    os.makedirs(os.path.dirname(target), exist_ok=True)
    handler = logging.FileHandler(target, encoding='utf-8')
    handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    ))
    logger.addHandler(handler)
    return logger
# logger = setup_logger('ops', '/var/log/ops/operations.log')
# logger.info('执行了数据库备份')
# logger.error('服务器连接失败')
2. 异常处理与重试
import time
from functools import wraps
def retry(max_retries=3, delay=1, backoff=2, exceptions=(Exception,)):
    """Decorator that retries a function with exponential backoff.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated as 1
            so the wrapped function is always called at least once.
        delay: Seconds to wait before the second attempt.
        backoff: Factor applied to the delay after every failed attempt.
        exceptions: Exception types that trigger a retry; others propagate
            immediately.

    The exception of the final attempt is re-raised unchanged.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = max(1, max_retries)
            current_delay = delay
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == attempts:
                        raise
                    print(f"重试 {attempt}/{max_retries},等待 {current_delay}秒... 错误: {e}")
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
# @retry(max_retries=3, delay=2)
# def ssh_connect(host):
# # SSH连接代码
# pass
3. 配置管理
import json
from pathlib import Path
class Config:
    """JSON-backed configuration store.

    The file is loaded on construction (a missing file yields an empty
    config) and rewritten on every ``set``.
    """

    def __init__(self, config_path):
        self.config_path = Path(config_path)
        self.data = {}
        self.load()

    def load(self):
        """Load the configuration from disk if the file exists."""
        if self.config_path.exists():
            with open(self.config_path, encoding='utf-8') as f:
                self.data = json.load(f)

    def save(self):
        """Write the configuration to disk as indented UTF-8 JSON."""
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(self.data, f, indent=2, ensure_ascii=False)

    def get(self, key, default=None):
        """Return the value for ``key``, or ``default`` when it is absent.

        An exact top-level key wins. Otherwise a dotted key such as
        ``'database.host'`` is resolved through nested dicts.
        """
        if key in self.data:
            return self.data[key]
        node = self.data
        for part in str(key).split('.'):
            if not isinstance(node, dict) or part not in node:
                return default
            node = node[part]
        return node

    def set(self, key, value):
        """Store ``value`` under the top-level ``key`` and save immediately."""
        self.data[key] = value
        self.save()
# config = Config('ops_config.json')
# db_host = config.get('database.host', 'localhost')
总结
本文介绍了Python运维工程师日常工作中最常用的核心指令和脚本,涵盖了:系统资源管理、进程管理、文件与目录管理、日志分析、网络诊断、定时任务与调度、系统服务管理、数据备份、SSH远程执行、监控告警、系统信息收集、实用运维工具脚本以及运维最佳实践。
掌握这些指令,能够大幅提升日常运维工作的自动化程度和效率。建议将这些脚本整理为自己的运维工具箱,根据实际情况持续优化和扩展。
提示:本文中的代码示例可直接复制使用,建议配合 pip install psutil paramiko schedule apscheduler docker requests 安装依赖后运行。