Python运维工程师必会的指令
掌握这些Python运维指令,让你的运维工作自动化、高效化
欢迎大家关注此公众号,后台点击按钮【免费资料】可免费获取【Python入门30节课】电子书
此外,小庄推荐一本适合新手/小白入门的Python基础书籍,欢迎大家订阅,也感谢大家的支持,这是我持续更新的动力。
前言
Python运维工程师需要掌握一系列用于系统管理、自动化部署、监控告警的指令和工具。本文将系统性地介绍运维工程师必须掌握的Python指令,帮助你构建强大的运维自动化体系。
一、系统信息获取
1.1 获取系统信息
import platform
# Gather the basic host facts first, then print them one per line in a
# fixed order.
host_facts = (
    platform.system(),     # OS family: Windows / Linux / Darwin
    platform.release(),    # OS release
    platform.version(),    # detailed OS version string
    platform.machine(),    # hardware architecture: x86_64 / AMD64
    platform.processor(),  # processor description
    platform.node(),       # host name
)
for fact in host_facts:
    print(fact)
1.2 获取CPU信息
import psutil
# Overall CPU utilisation, sampled over a one-second window.
cpu_percent = psutil.cpu_percent(interval=1)
print(f'CPU使用率: {cpu_percent}%')

# Per-core utilisation, sampled over a second one-second window.
cpu_per_core = psutil.cpu_percent(interval=1, percpu=True)
for i, percent in enumerate(cpu_per_core):
    print(f'CPU核心{i}: {percent}%')

# Logical vs. physical CPU counts.
print(f'逻辑CPU: {psutil.cpu_count()}')
print(f'物理CPU: {psutil.cpu_count(logical=False)}')

# CPU frequency. psutil.cpu_freq() can return None when the platform exposes
# no frequency data (seen on some VMs/containers), so guard before reading
# the .current field instead of crashing with AttributeError.
cpu_freq = psutil.cpu_freq()
if cpu_freq is not None:
    print(f'当前频率: {cpu_freq.current}MHz')
else:
    print('当前频率: 未知')
1.3 获取内存信息
import psutil
# Number of bytes in one GiB; psutil reports sizes in bytes.
GIB = 1024 ** 3

# Physical memory: total, used, available and the utilisation percentage.
memory = psutil.virtual_memory()
print(f'总内存: {memory.total / GIB:.2f}GB')
print(f'已使用: {memory.used / GIB:.2f}GB')
print(f'可用内存: {memory.available / GIB:.2f}GB')
print(f'使用率: {memory.percent}%')

# Swap utilisation.
swap = psutil.swap_memory()
print(f'交换内存使用率: {swap.percent}%')
1.4 获取磁盘信息
import psutil
# Walk every mounted partition and report its capacity and utilisation.
partitions = psutil.disk_partitions()
for partition in partitions:
    print(f'设备: {partition.device}, 挂载点: {partition.mountpoint}')
    try:
        usage = psutil.disk_usage(partition.mountpoint)
    except OSError:
        # Some mount points cannot be queried (e.g. an empty removable drive
        # or a path this user may not read); skip them rather than abort
        # the whole report.
        continue
    print(f' 总空间: {usage.total / 1024 / 1024 / 1024:.2f}GB')
    print(f' 已使用: {usage.used / 1024 / 1024 / 1024:.2f}GB')
    print(f' 使用率: {usage.percent}%')

# System-wide disk I/O counters. psutil returns None when there is no disk
# to report on, so only print when counters are available.
disk_io = psutil.disk_io_counters()
if disk_io is not None:
    print(f'读取字节数: {disk_io.read_bytes}')
    print(f'写入字节数: {disk_io.write_bytes}')
1.5 获取网络信息
import psutil
# List every network interface together with its IPv4 addresses.
net_interfaces = psutil.net_if_addrs()
for name, addrs in net_interfaces.items():
    print(f'接口: {name}')
    for addr in addrs:
        if addr.family.name == 'AF_INET':
            print(f' IPv4: {addr.address}')

# System-wide network I/O counters.
net_io = psutil.net_io_counters()
print(f'发送字节数: {net_io.bytes_sent}')
print(f'接收字节数: {net_io.bytes_recv}')

# First five socket connections. On some platforms (notably macOS) listing
# system-wide connections needs elevated privileges and psutil raises
# AccessDenied; report that instead of crashing.
try:
    connections = psutil.net_connections()
except psutil.AccessDenied:
    connections = []
    print('网络连接: 权限不足,无法获取')
for conn in connections[:5]:
    print(f'本地地址: {conn.laddr}, 远程地址: {conn.raddr}, 状态: {conn.status}')
二、进程管理
2.1 查看所有进程
import psutil
# Print one line per running process. process_iter() pre-fetches the listed
# attributes into proc.info; an attribute that could not be read (access
# denied, zombie process) is stored as None, which the ":.2f" format cannot
# handle, so render such values as "N/A".
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
    info = proc.info
    cpu = info['cpu_percent']
    mem = info['memory_percent']
    cpu_text = 'N/A' if cpu is None else f'{cpu}%'
    mem_text = 'N/A' if mem is None else f'{mem:.2f}%'
    print(f"PID: {info['pid']}, 名称: {info['name']}, "
          f"CPU: {cpu_text}, 内存: {mem_text}")
2.2 查找特定进程
import psutil
def find_process_by_name(name):
    """Return the processes whose name contains *name*, ignoring case.

    Args:
        name: Substring to look for in each process name.

    Returns:
        A list of ``psutil.Process`` objects (with ``info`` pre-populated
        for ``pid``, ``name`` and ``cmdline``), in iteration order.
    """
    needle = name.lower()
    processes = []
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        proc_name = proc.info['name']
        # The name is None when psutil could not read it (access denied or
        # zombie process); such processes can never match.
        if proc_name and needle in proc_name.lower():
            processes.append(proc)
    return processes
# Example: show the PID and command line of every process whose name
# contains "python".
python_procs = find_process_by_name('python')
for proc in python_procs:
    pid, command = proc.pid, proc.cmdline()
    print(f"PID: {pid}, 命令: {command}")
2.3 进程操作
import psutil
import os
# Inspect the process running this script.
current_proc = psutil.Process(os.getpid())
print(f'当前进程PID: {current_proc.pid}')
print(f'进程名称: {current_proc.name()}')
print(f'进程状态: {current_proc.status()}')

# Stop another process (1234 is a placeholder PID): ask politely first, then
# force-kill if it has not exited within the grace period.
try:
    proc = psutil.Process(1234)
    proc.terminate()  # sends SIGTERM
    try:
        proc.wait(timeout=10)
    except psutil.TimeoutExpired:
        # The process ignored SIGTERM; escalate to SIGKILL and wait again.
        proc.kill()
        proc.wait(timeout=10)
except psutil.NoSuchProcess:
    # The PID does not exist (or the process exited in the meantime).
    print('进程 1234 不存在')
2.4 获取进程详细信息
import psutil
# Inspect a single process by PID (1234 is a placeholder).
proc = psutil.Process(1234)

# Identity and lifecycle.
pid = proc.pid
print(f'PID: {pid}')
proc_name = proc.name()
print(f'名称: {proc_name}')
proc_status = proc.status()
print(f'状态: {proc_status}')
created_at = proc.create_time()
print(f'创建时间: {created_at}')

# Resource consumption; the CPU figure is sampled over one second.
cpu_usage = proc.cpu_percent(interval=1)
print(f'CPU使用率: {cpu_usage}%')
rss_mb = proc.memory_info().rss / 1024 / 1024
print(f'内存使用: {rss_mb:.2f}MB')
mem_share = proc.memory_percent()
print(f'内存使用率: {mem_share:.2f}%')

# Command line the process was started with.
argv = proc.cmdline()
print(f'命令行: {argv}')

# Current working directory.
work_dir = proc.cwd()
print(f'工作目录: {work_dir}')

# Files the process currently holds open.
for opened in proc.open_files():
    print(f'打开文件: {opened.path}')
三、文件系统操作
3.1 文件和目录操作
import os
import shutil
import glob
# Current working directory.
current_dir = os.getcwd()
print(f'当前目录: {current_dir}')

# Entries of the current directory, one per line.
files = os.listdir('.')
for entry in files:
    print(entry)

# Path predicates: existence, regular file, directory.
path_checks = (
    (os.path.exists, '/path/to/file'),
    (os.path.isfile, '/path/to/file'),
    (os.path.isdir, '/path/to/dir'),
)
for check, target in path_checks:
    print(check(target))

# File metadata: size in bytes and last-modification timestamp.
file_path = 'test.txt'
size_bytes = os.path.getsize(file_path)
print(f'文件大小: {size_bytes}字节')
modified_at = os.path.getmtime(file_path)
print(f'修改时间: {modified_at}')

# Create a directory tree (no error if it already exists).
os.makedirs('/path/to/dir', exist_ok=True)

# Remove a directory tree recursively.
shutil.rmtree('/path/to/dir')

# Copy a single file, then a whole directory tree.
shutil.copy('source.txt', 'destination.txt')
shutil.copytree('source_dir', 'destination_dir')

# Move / rename.
shutil.move('old_name.txt', 'new_name.txt')

# Delete a single file.
os.remove('file.txt')

# Find files by pattern: first in the current directory only, then
# recursively through all sub-directories.
files = glob.glob('*.py')
files = glob.glob('**/*.py', recursive=True)
3.2 文件内容操作
# Read the whole file into memory at once.
with open('file.txt', encoding='utf-8') as f:
    content = f.read()

# Stream the file line by line, printing each line with surrounding
# whitespace removed.
with open('file.txt', encoding='utf-8') as f:
    for line in f:
        stripped = line.strip()
        print(stripped)

# Create or overwrite the output file.
with open('output.txt', mode='w', encoding='utf-8') as f:
    f.write('Hello World\n')

# Append to the end of the log file.
with open('log.txt', mode='a', encoding='utf-8') as f:
    f.write('New log entry\n')
3.3 文件监控
import os
import time
def watch_file(file_path, interval=1):
    """Poll *file_path* forever and report each modification-time change.

    The file's mtime is read once up front and then every *interval*
    seconds; a notice is printed whenever the value differs from the one
    seen on the previous poll. The function never returns.
    """
    seen = os.path.getmtime(file_path)
    while True:
        stamp = os.path.getmtime(file_path)
        changed = stamp != seen
        if changed:
            print(f'文件 {file_path} 已更新')
        # Remember the newest timestamp for the next comparison.
        seen = stamp
        time.sleep(interval)
# 使用watchdog库进行更强大的文件监控
# pip install watchdog
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MyHandler(FileSystemEventHandler):
    """Event handler that prints a one-line notice for each file-system event."""

    def on_modified(self, event):
        """Report a modification event."""
        path = event.src_path
        print(f'文件已修改: {path}')

    def on_created(self, event):
        """Report a creation event."""
        path = event.src_path
        print(f'文件已创建: {path}')

    def on_deleted(self, event):
        """Report a deletion event."""
        path = event.src_path
        print(f'文件已删除: {path}')
import time

# Watch the current directory tree and dispatch events to MyHandler.
observer = Observer()
observer.schedule(MyHandler(), path='.', recursive=True)
observer.start()

# The observer does its work on a background thread, so the main thread has
# to stay alive for events to be delivered. Idle until Ctrl+C, then stop the
# observer and wait for its thread to finish.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    observer.stop()
    observer.join()
四、Shell命令执行
4.1 使用subprocess执行命令
import subprocess
# Run a command without a shell and capture its output as text.
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
print(result.returncode)

# Run a shell pipeline. grep -F matches ".py" literally; a bare `grep .py`
# treats "." as a regex wildcard and would also match names such as "happy".
result = subprocess.run('ls -la | grep -F .py', shell=True, capture_output=True, text=True)

# Bound the run time: TimeoutExpired is raised if the command has not
# finished within 10 seconds.
try:
    result = subprocess.run(['ping', '-c', '4', 'google.com'],
                            capture_output=True, text=True, timeout=10)
except subprocess.TimeoutExpired:
    print('命令执行超时')
4.2 使用subprocess.Popen
import subprocess
# Launch ping as a child process with both output streams captured as text.
ping_args = ['ping', '-c', '4', 'google.com']
process = subprocess.Popen(
    ping_args,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)

# Block until the child exits, collecting everything it wrote.
captured = process.communicate()
stdout, stderr = captured
print(f'输出: {stdout}')
print(f'错误: {stderr}')
print(f'返回码: {process.returncode}')
4.3 使用os.system
import os
# Fire-and-forget: the command's output goes straight to the terminal.
os.system('ls -la')

# Capture the command's output via os.popen. The context manager closes the
# pipe once the output has been read instead of leaving it open until
# garbage collection.
with os.popen('ls -la') as pipe:
    output = pipe.read()
print(output)
4.4 使用paramiko进行SSH
import paramiko
# Create the SSH client.
ssh = paramiko.SSHClient()
# NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
# That is convenient for a demo, but consider loading known host keys and
# using a stricter policy in production.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Option 1: password authentication. The session is closed even when
# connecting or running the command fails.
try:
    ssh.connect('hostname', username='user', password='password')
    stdin, stdout, stderr = ssh.exec_command('ls -la')
    print(stdout.read().decode())
finally:
    ssh.close()

# Option 2: private-key authentication. This runs after the password session
# was closed, so the client never holds two connections at once.
private_key = paramiko.RSAKey.from_private_key_file('/path/to/key')
try:
    ssh.connect('hostname', username='user', pkey=private_key)
finally:
    ssh.close()
五、日志管理
5.1 使用logging模块
import logging
# Configure the root logger: INFO and above go to both app.log and the
# console. The file handler is opened as UTF-8 explicitly because the log
# messages contain non-ASCII text and the platform default encoding varies.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Emit one record per level. With the level set to INFO above, the DEBUG
# record is filtered out and does not appear in the output.
logger.info('这是一条信息日志')
logger.warning('这是一条警告日志')
logger.error('这是一条错误日志')
logger.debug('这是一条调试日志')
5.2 日志轮转
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
# Size-based rotation: start a new file once app.log reaches 10 MB and keep
# five old files.
MAX_LOG_BYTES = 10 * 1024 * 1024
handler = RotatingFileHandler('app.log', maxBytes=MAX_LOG_BYTES, backupCount=5)

# Time-based rotation: start a new file at midnight every day and keep
# thirty old files. This rebinds `handler`, replacing the one above.
handler = TimedRotatingFileHandler('app.log', when='midnight', interval=1, backupCount=30)
5.3 解析日志文件
import re
from collections import Counter
def parse_nginx_log(log_file):
    """Return the ten most frequent IPv4-looking addresses in a log file.

    For every line, the first substring shaped like a dotted quad is counted.

    Args:
        log_file: Path of the log file to scan.

    Returns:
        A list of ``(address, count)`` tuples, most frequent first, with at
        most ten entries.
    """
    ip_pattern = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
    ip_counter = Counter()
    # Access logs can contain arbitrary request bytes. Decode as UTF-8 and
    # replace undecodable bytes so a single bad line cannot abort the scan
    # (and so the result does not depend on the platform default encoding).
    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            match = ip_pattern.search(line)
            if match:
                # Count while streaming instead of building a list first.
                ip_counter[match.group()] += 1
    return ip_counter.most_common(10)
六、定时任务
6.1 使用schedule库
import schedule
import time
def job():
    """Task body executed by every schedule registered below."""
    print("执行定时任务...")


# Register the same job under four schedules, in this order:
# every 10 minutes, every hour, every day at 10:30, and every Monday.
plans = (
    schedule.every(10).minutes,
    schedule.every().hour,
    schedule.every().day.at("10:30"),
    schedule.every().monday,
)
for plan in plans:
    plan.do(job)

# Dispatch loop: once a second, run whatever has become due.
while True:
    schedule.run_pending()
    time.sleep(1)
6.2 使用APScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
scheduler = BlockingScheduler()

# Trigger objects are built up front and handed to the decorators below.
every_30_seconds = IntervalTrigger(seconds=30)
daily_at_0830 = CronTrigger(hour=8, minute=30)


@scheduler.scheduled_job(every_30_seconds)
def interval_job():
    """Job fired by the 30-second interval trigger."""
    print("每30秒执行一次")


@scheduler.scheduled_job(daily_at_0830)
def cron_job():
    """Job fired by the cron trigger at 08:30."""
    print("每天8:30执行")


# Hand control to the scheduler.
scheduler.start()
6.3 使用crontab
# 生成crontab配置
def generate_crontab(command, schedule):
    """Build one crontab line: the schedule, a single space, then the command."""
    return "{} {}".format(schedule, command)
# Example: a crontab entry that runs the backup script at 02:00 every day.
backup_schedule = "0 2 * * *"
backup_command = "/usr/bin/python3 /path/to/backup.py"
crontab = generate_crontab(command=backup_command, schedule=backup_schedule)
print(crontab)
七、网络监控
7.1 Ping检测
import subprocess
import platform
def ping(host, count=4, timeout=10):
    """Ping *host* and report whether the ping command succeeded.

    Args:
        host: Host name or address to ping.
        count: Number of echo requests to send.
        timeout: Seconds to wait for the ping command before giving up.
            Raise this when using a large *count*.

    Returns:
        True if the ping command exited with status 0; False if it exited
        with any other status or did not finish within *timeout*.
    """
    # The count flag is spelled -n on Windows and -c elsewhere.
    param = '-n' if platform.system().lower() == 'windows' else '-c'
    command = ['ping', param, str(count), host]
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return False
    return result.returncode == 0
# Example: check several hosts in turn and print whether each is reachable.
hosts = ['google.com', 'github.com', 'example.com']
for host in hosts:
    if ping(host):
        status = '在线'
    else:
        status = '离线'
    print(f'{host}: {status}')
7.2 端口检测
import socket
def check_port(host, port, timeout=2):
    """Report whether a TCP connection to ``host:port`` can be established.

    Args:
        host: Host name or IPv4 address to connect to.
        port: TCP port number.
        timeout: Seconds to wait for the connection attempt.

    Returns:
        True if the connection succeeded, False if it was refused, timed
        out, or any other socket error occurred.
    """
    try:
        # The context manager closes the socket on every path, including
        # when settimeout()/connect_ex() raise (e.g. name resolution failure).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex returns 0 on success and an errno value otherwise.
            return sock.connect_ex((host, port)) == 0
    except OSError:
        return False
# Example: probe a handful of well-known ports on the local machine.
ports = [22, 80, 443, 3306, 6379]
for port in ports:
    if check_port('localhost', port):
        status = '开放'
    else:
        status = '关闭'
    print(f'端口 {port}: {status}')
7.3 HTTP健康检查
import requests
from datetime import datetime
def health_check(url, timeout=5):
    """Issue a GET request to *url* and summarise the outcome.

    Args:
        url: Address to probe.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        A dict with keys ``url``, ``status`` (HTTP status code or None),
        ``response_time`` (seconds, or None on failure) and ``healthy``
        (True only for status 200). On a request failure the dict also
        carries an ``error`` key with the exception text.
    """
    import time  # local import: this snippet's import block does not include time

    try:
        # perf_counter is monotonic, so the measured latency is not skewed
        # by wall-clock adjustments during the request.
        started = time.perf_counter()
        response = requests.get(url, timeout=timeout)
        response_time = time.perf_counter() - started
    except requests.RequestException as e:
        return {
            'url': url,
            'status': None,
            'response_time': None,
            'healthy': False,
            'error': str(e)
        }
    return {
        'url': url,
        'status': response.status_code,
        'response_time': response_time,
        'healthy': response.status_code == 200
    }
# Example: probe several endpoints and print a healthy/unhealthy verdict
# for each.
urls = [
    'https://www.google.com',
    'https://www.github.com',
    'https://api.example.com'
]
for url in urls:
    result = health_check(url)
    if result['healthy']:
        status = '健康'
    else:
        status = '异常'
    print(f"{url}: {status}")
八、配置管理
8.1 使用YAML配置
import yaml
# Load the YAML configuration and print the database endpoint.
with open('config.yaml', encoding='utf-8') as f:
    config = yaml.safe_load(f)
database_settings = config['database']
print(database_settings['host'])
print(database_settings['port'])

# Build a fresh configuration and write it back in block style.
config = dict(
    database=dict(host='localhost', port=3306, name='mydb'),
    server=dict(host='0.0.0.0', port=8080),
)
with open('config.yaml', mode='w', encoding='utf-8') as f:
    yaml.dump(config, f, default_flow_style=False)
8.2 使用JSON配置
import json
# Load the JSON configuration.
with open('config.json', encoding='utf-8') as f:
    config = json.loads(f.read())

# Write it back, indented by two spaces and with non-ASCII characters kept
# as-is rather than escaped.
serialized = json.dumps(config, indent=2, ensure_ascii=False)
with open('config.json', mode='w', encoding='utf-8') as f:
    f.write(serialized)
8.3 使用环境变量
import os
# Read settings from the environment, falling back to defaults when unset.
db_host = os.getenv('DB_HOST', 'localhost')
raw_port = os.getenv('DB_PORT', 3306)
db_port = int(raw_port)

# Set a variable for this process (and any child processes it starts).
os.environ['MY_VAR'] = 'my_value'
8.4 使用python-dotenv
from dotenv import load_dotenv
import os
# Load the .env file.
load_dotenv()

# Look the settings up in the process environment; each is None when the
# variable is not set.
secret_key = os.environ.get('SECRET_KEY')
database_url = os.environ.get('DATABASE_URL')
九、备份与恢复
9.1 文件备份
import shutil
import os
from datetime import datetime
def backup_file(source, backup_dir):
    """Copy *source* into *backup_dir* under a timestamped ``.bak`` name.

    Args:
        source: Path of the file to back up.
        backup_dir: Directory receiving the copy; created if missing.

    Returns:
        Path of the created backup, named
        ``<basename>.<YYYYmmdd_HHMMSS>.bak``.
    """
    # exist_ok=True avoids the check-then-create race of testing for the
    # directory first.
    os.makedirs(backup_dir, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = os.path.basename(source)
    # The timestamp has one-second resolution, so two backups of the same
    # file within one second share a name and the later one wins.
    backup_path = os.path.join(backup_dir, f'{filename}.{timestamp}.bak')
    # copy2 also copies file metadata such as the modification time.
    shutil.copy2(source, backup_path)
    return backup_path
# Usage example: back up important.db into ./backups and print the path of
# the copy that was created.
backup_path = backup_file('important.db', './backups')
print(f'备份已创建: {backup_path}')
9.2 数据库备份
import subprocess
from datetime import datetime
def backup_mysql(host, user, password, database, backup_dir):
    """Dump a MySQL database to a timestamped ``.sql`` file with mysqldump.

    Args:
        host: Database server host.
        user: Database user name.
        password: Password for *user*.
        database: Name of the database to dump.
        backup_dir: Directory receiving the dump; created if missing.

    Returns:
        Path of the dump file, ``<backup_dir>/<database>_<timestamp>.sql``.

    Raises:
        subprocess.CalledProcessError: mysqldump exited with a non-zero status.
        OSError: mysqldump could not be started or the file could not be
            written.
    """
    import os  # local import: this snippet's import block does not include os

    os.makedirs(backup_dir, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    dump_path = f'{backup_dir}/{database}_{timestamp}.sql'
    # NOTE(review): passing --password on the command line exposes it to
    # other local users via the process list. Consider an option file
    # (--defaults-extra-file) for production use.
    command = [
        'mysqldump',
        f'--host={host}',
        f'--user={user}',
        f'--password={password}',
        database
    ]
    try:
        # Binary mode: mysqldump writes straight to the file descriptor.
        with open(dump_path, 'wb') as f:
            subprocess.run(command, stdout=f, check=True)
    except (OSError, subprocess.SubprocessError):
        # Do not leave an empty or truncated dump behind that could be
        # mistaken for a valid backup.
        try:
            os.remove(dump_path)
        except OSError:
            pass
        raise
    return dump_path
# Usage example: dump the "mydb" database into ./backups and print the path
# returned by backup_mysql.
backup_file = backup_mysql('localhost', 'root', 'password', 'mydb', './backups')
print(f'数据库备份完成: {backup_file}')
9.3 自动清理旧备份
import os
import glob
from datetime import datetime, timedelta
def cleanup_old_backups(backup_dir, days=7, pattern='*.bak'):
    """Delete backup files in *backup_dir* older than *days* days.

    Args:
        backup_dir: Directory to scan (not recursive).
        days: Files last modified more than this many days ago are removed.
        pattern: Glob pattern selecting the backup files.
    """
    cutoff_date = datetime.now() - timedelta(days=days)
    for file_path in glob.glob(os.path.join(backup_dir, pattern)):
        # Only regular files are backups; a directory that happens to match
        # the pattern would make os.remove() raise.
        if not os.path.isfile(file_path):
            continue
        file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
        if file_mtime < cutoff_date:
            os.remove(file_path)
            print(f'已删除旧备份: {file_path}')
# Usage example: remove backups in ./backups that are more than 7 days old.
cleanup_old_backups('./backups', days=7)
十、监控告警
10.1 系统监控脚本
import psutil
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
def system_monitor(cpu_threshold=90, memory_threshold=90, disk_threshold=90, disk_path='/'):
    """Check CPU, memory and disk utilisation against alert thresholds.

    Args:
        cpu_threshold: CPU percentage above which an alert is raised.
        memory_threshold: Memory percentage above which an alert is raised.
        disk_threshold: Disk percentage above which an alert is raised.
        disk_path: Path whose file system is checked for disk usage.

    Returns:
        A list of alert messages; empty when every metric is at or below
        its threshold.
    """
    alerts = []

    # CPU, sampled over one second.
    cpu_percent = psutil.cpu_percent(interval=1)
    if cpu_percent > cpu_threshold:
        alerts.append(f'CPU使用率过高: {cpu_percent}%')

    # Memory.
    memory = psutil.virtual_memory()
    if memory.percent > memory_threshold:
        alerts.append(f'内存使用率过高: {memory.percent}%')

    # Disk.
    disk = psutil.disk_usage(disk_path)
    if disk.percent > disk_threshold:
        alerts.append(f'磁盘使用率过高: {disk.percent}%')

    return alerts
def send_alert_email(alerts, to_email, smtp_host='smtp.example.com', smtp_port=587,
                     username='user', password='password',
                     from_email='monitor@example.com', timeout=10):
    """Send the alert messages as a single plain-text e-mail.

    Does nothing when *alerts* is empty.

    Args:
        alerts: Alert messages; joined with newlines to form the body.
        to_email: Recipient address.
        smtp_host: SMTP server host name.
        smtp_port: SMTP server port.
        username: SMTP login name.
        password: SMTP login password.
        from_email: Sender address placed in the From header.
        timeout: Seconds to wait on SMTP network operations, so an
            unreachable server cannot hang the monitor indefinitely.
    """
    if not alerts:
        return

    subject = f'系统告警 - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
    body = '\n'.join(alerts)

    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = from_email
    msg['To'] = to_email

    with smtplib.SMTP(smtp_host, smtp_port, timeout=timeout) as server:
        # Upgrade the connection to TLS before sending credentials.
        server.starttls()
        server.login(username, password)
        server.send_message(msg)
# Run one monitoring pass and e-mail the administrator if anything fired.
alerts = system_monitor()
if alerts:
    send_alert_email(alerts, to_email='admin@example.com')
10.2 钉钉/企业微信告警
import requests
import json
def send_dingtalk_alert(webhook, message, timeout=5):
    """Post a text alert to a DingTalk robot webhook.

    Args:
        webhook: Full webhook URL.
        message: Text content of the alert.
        timeout: Seconds to wait for the HTTP request; without a timeout an
            unresponsive endpoint would block the caller indefinitely.

    Returns:
        The webhook's response body decoded from JSON.
    """
    headers = {'Content-Type': 'application/json'}
    data = {
        'msgtype': 'text',
        'text': {
            'content': message
        }
    }
    response = requests.post(webhook, headers=headers, data=json.dumps(data), timeout=timeout)
    return response.json()
def send_wechat_alert(webhook, message, timeout=5):
    """Post a text alert to a WeCom (Enterprise WeChat) webhook.

    Args:
        webhook: Full webhook URL.
        message: Text content of the alert.
        timeout: Seconds to wait for the HTTP request; without a timeout an
            unresponsive endpoint would block the caller indefinitely.

    Returns:
        The webhook's response body decoded from JSON.
    """
    headers = {'Content-Type': 'application/json'}
    data = {
        'msgtype': 'text',
        'text': {
            'content': message
        }
    }
    response = requests.post(webhook, headers=headers, data=json.dumps(data), timeout=timeout)
    return response.json()
# Usage example: send a CPU alert through a DingTalk webhook
# ("xxx" is a placeholder access token).
dingtalk_webhook = 'https://oapi.dingtalk.com/robot/send?access_token=xxx'
send_dingtalk_alert(dingtalk_webhook, '服务器CPU使用率超过90%!')
总结
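作为Python运维工程师,掌握以下指令是必备技能:
- 系统信息获取 - platform和psutil
- 进程管理 - psutil
- 文件系统操作 - os、shutil、glob和watchdog
- Shell命令执行 - subprocess和paramiko
- 日志管理 - logging
- 定时任务 - schedule和APScheduler
- 网络监控 - subprocess、socket和requests
- 配置管理 - yaml、json和python-dotenv
- 备份与恢复 - shutil和mysqldump
- 监控告警 - psutil、smtplib和钉钉/企业微信Webhook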
掌握这些工具,你就能构建强大的运维自动化体系。
关注我,获取更多Python技术干货!