每天早上要手动去服务器跑一堆脚本,或者批量压缩一堆文件、定期备份数据库——每次都要开终端、敲命令、等结果、再敲下一条……
累不累?
今天来聊聊 Python 的 subprocess 模块。这个模块很多人知道但不太会用,或者只会最简单的 os.system()——但那东西基本算上古遗物了,能用但控制力极差。
subprocess 才是正经的姿势。
一、先说 os.system() 有啥问题
很多入门教程还在教这个:
import os
os.system("ls -la")
用是能用,但有几个致命缺陷:
1. 拿不到输出——返回值只有退出码(0成功,非0失败),命令打印的内容你看不到
2. 没法传参——参数拼接容易出问题,一不小心就 shell 注入了
3. 错误处理烂——失败了你也不知道具体原因
所以,subprocess 是真正的替代方案。
二、subprocess 基础用法
2.1 最简单的调用
import subprocess
result = subprocess.run(["ls", "-la"], capture_output=True, text=True)
print(result.stdout) # 命令的标准输出
print(result.stderr) # 错误信息
print(result.returncode) # 退出码,0 = 成功
注意几个参数:
capture_output=True:捕获 stdout 和 stderr-
text=True:以字符串形式返回,不用手动 decode-
check=True:如果命令失败,自动抛出异常(推荐加上)-
2.2 加上错误处理
import subprocess
def run_cmd(cmd: list[str]) -> str:
"""执行命令,失败时抛出异常并打印错误"""
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True # 非0退出码自动报错
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
print(f"命令执行失败: {e.cmd}")
print(f"错误信息: {e.stderr}")
raise
# 示例:查看磁盘使用情况
output = run_cmd(["df", "-h"])
print(output)
三、实际场景:批量执行任务
3.1 批量压缩图片
假设你有一个目录里全是 PNG,需要批量转成 JPG(用 ImageMagick 的 convert 命令):
import subprocess
from pathlib import Path
def batch_convert_images(src_dir: str, dst_dir: str):
src = Path(src_dir)
dst = Path(dst_dir)
dst.mkdir(parents=True, exist_ok=True)
png_files = list(src.glob("*.png"))
print(f"找到 {len(png_files)} 个 PNG 文件,开始转换...")
success, failed = 0, 0
for png in png_files:
output_path = dst / png.with_suffix(".jpg").name
try:
subprocess.run(
["convert", str(png), str(output_path)],
check=True,
capture_output=True,
text=True
)
success += 1
print(f"✓ {png.name}")
except subprocess.CalledProcessError as e:
failed += 1
print(f"✗ {png.name}: {e.stderr.strip()}")
print(f"\n完成!成功 {success} 个,失败 {failed} 个")
batch_convert_images("/tmp/images", "/tmp/images_jpg")
3.2 定时备份 MySQL 数据库
这个场景超级实用——每天自动备份、按日期命名、删除7天前的旧文件:
import subprocess
from pathlib import Path
from datetime import datetime, timedelta
DB_HOST = "localhost"
DB_USER = "root"
DB_PASS = "your_password"
DB_NAME = "mydb"
BACKUP_DIR = Path("/data/mysql_backup")
def backup_mysql():
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
today = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = BACKUP_DIR / f"{DB_NAME}_{today}.sql.gz"
# mysqldump 输出通过管道 | gzip 压缩
dump_cmd = [
"mysqldump",
f"--host={DB_HOST}",
f"--user={DB_USER}",
f"--password={DB_PASS}",
"--single-transaction", # 不锁表
DB_NAME
]
gzip_cmd = ["gzip", "-c"]
with open(backup_file, "wb") as f:
dump_proc = subprocess.Popen(dump_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
gzip_proc = subprocess.Popen(gzip_cmd, stdin=dump_proc.stdout, stdout=f)
dump_proc.stdout.close()
gzip_proc.communicate()
dump_proc.wait()
if dump_proc.returncode != 0:
backup_file.unlink(missing_ok=True)
raise RuntimeError(f"mysqldump 失败,退出码: {dump_proc.returncode}")
print(f"备份成功: {backup_file}")
# 清理7天前的旧备份
cutoff = datetime.now() - timedelta(days=7)
for old_file in BACKUP_DIR.glob("*.sql.gz"):
if datetime.fromtimestamp(old_file.stat().st_mtime) < cutoff:
old_file.unlink()
print(f"删除旧备份: {old_file.name}")
backup_mysql()
这里用了 subprocess.Popen 来实现命令管道,比直接用 shell=True 的字符串拼接安全得多。
四、进阶技巧
4.1 超时控制
命令挂死是常见问题,加个 timeout 参数就行:
try:
result = subprocess.run(
["ping", "-c", "4", "8.8.8.8"],
capture_output=True,
text=True,
timeout=10 # 10秒超时
)
except subprocess.TimeoutExpired:
print("命令超时了!")
4.2 环境变量注入
有时候子进程需要特定的环境变量,可以这样传:
import os
custom_env = os.environ.copy()
custom_env["MY_ENV_VAR"] = "hello_world"
custom_env["PATH"] = "/usr/local/bin:" + custom_env["PATH"]
result = subprocess.run(
["my_script.sh"],
capture_output=True,
text=True,
env=custom_env
)
4.3 实时输出流式读取
有些命令执行时间很长,你希望实时看到输出,而不是等命令结束才一次性打印:
import subprocess
import sys
def run_with_realtime_output(cmd: list[str]):
"""实时打印命令输出"""
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # 把 stderr 合并到 stdout
text=True,
bufsize=1 # 行缓冲
)
for line in iter(process.stdout.readline, ""):
sys.stdout.write(line)
sys.stdout.flush()
process.wait()
return process.returncode
# 比如跑一个耗时的 pip install
run_with_realtime_output(["pip", "install", "-r", "requirements.txt"])
五、完整示例:服务器巡检脚本
最后来个综合的——每天自动巡检服务器状态,生成报告:
import subprocess
from datetime import datetime
from pathlib import Path
def check_disk():
result = subprocess.run(["df", "-h", "--output=target,pcent"],
capture_output=True, text=True, check=True)
lines = result.stdout.strip().split("\n")[1:] # 跳过表头
warnings = []
for line in lines:
parts = line.split()
if len(parts) == 2:
mount, usage = parts
pct = int(usage.rstrip("%"))
if pct >= 80:
warnings.append(f" ⚠ {mount}: 使用率 {usage}")
return warnings
def check_memory():
result = subprocess.run(["free", "-h"], capture_output=True, text=True, check=True)
return result.stdout.strip()
def check_top_processes():
result = subprocess.run(
["ps", "aux", "--sort=-%cpu"],
capture_output=True, text=True, check=True
)
lines = result.stdout.strip().split("\n")[:6] # 取前5个进程
return "\n".join(lines)
def generate_report():
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
report_path = Path(f"/tmp/server_report_{datetime.now().strftime('%Y%m%d')}.txt")
disk_warnings = check_disk()
memory_info = check_memory()
top_procs = check_top_processes()
with open(report_path, "w") as f:
f.write(f"服务器巡检报告 - {now}\n")
f.write("=" * 50 + "\n\n")
f.write("【磁盘使用情况】\n")
if disk_warnings:
f.write("发现以下磁盘使用率过高:\n")
f.write("\n".join(disk_warnings) + "\n")
else:
f.write("磁盘使用正常\n")
f.write("\n【内存使用情况】\n")
f.write(memory_info + "\n")
f.write("\n【CPU占用Top5进程】\n")
f.write(top_procs + "\n")
print(f"报告已生成: {report_path}")
return str(report_path)
generate_report()
效果对比
| 方式 | 获取输出 | 错误处理 | 管道支持 | 安全性 |
|---|
os.system() | ❌ | 仅退出码 | ❌ | 低 |
subprocess.run() | ✅ | 异常 | ❌ | 高 |
subprocess.Popen() | ✅ | 灵活 | ✅ | 高 |
小结
subprocess 的核心就三件事:
1. 简单命令用 subprocess.run(),加上 capture_output=True, text=True, check=True 三件套
2. 需要管道或实时输出时,用 subprocess.Popen()
3. 记得加 timeout,别让程序挂死在那
工作中能自动化的都自动化,省下来的时间才是真正属于你的。
如果这篇文章对你有帮助,点赞+在看 支持一下,转发给身边还在手动跑命令的朋友,让他早点解放双手 🙌
我是几行代码,关注我,下期见。