Last year I had a task: collect system information (CPU, memory, disk, network configuration) from 200 servers and consolidate it into an inspection report. I dashed off a Shell script that used a for loop to SSH into each machine and run commands. It took more than three hours to finish.
A colleague couldn't stand watching it crawl, spent half a day rewriting it in Python with concurrency, and the same job finished in 8 minutes.
That got me thinking seriously: in which scenarios should you use Shell, and in which should you use Python?
The short answer first: Shell is best for simple command orchestration; Python is best for complex data processing and control logic.
Shell's strengths: it is already on every Linux host, needs no extra dependencies, and gluing existing commands together with pipes (find, grep, awk, sort) takes very little code.
Shell's weaknesses: complex data structures, error handling, and concurrency are all awkward, and scripts become hard to read and maintain as the logic grows.
Python's strengths: a rich standard library and third-party ecosystem, real data structures, proper exception handling, and straightforward concurrency (threads, asyncio).
Python's weaknesses: it depends on an interpreter and often on extra packages being installed on the target machines, and simple command chaining takes noticeably more code than a one-line pipe.
Test environment:
Task: scan a log directory, find log files from the past 7 days that are larger than 100 MB, and print them sorted by size.
Shell version:
```bash
#!/bin/bash
# find_large_logs.sh
LOG_DIR="/var/log"
DAYS=7
SIZE_MB=100

find "$LOG_DIR" -type f -name "*.log" -mtime -"$DAYS" -size +"${SIZE_MB}M" \
    -exec ls -lh {} \; 2>/dev/null | \
    awk '{print $5, $9}' | \
    sort -rh
```

Python version:
```python
#!/usr/bin/env python3
# find_large_logs.py
from pathlib import Path
from datetime import datetime, timedelta

LOG_DIR = "/var/log"
DAYS = 7
SIZE_MB = 100

def find_large_logs():
    cutoff = datetime.now() - timedelta(days=DAYS)
    size_bytes = SIZE_MB * 1024 * 1024
    results = []

    for log_file in Path(LOG_DIR).rglob("*.log"):
        try:
            stat = log_file.stat()
            mtime = datetime.fromtimestamp(stat.st_mtime)
            if mtime > cutoff and stat.st_size > size_bytes:
                results.append((stat.st_size, str(log_file)))
        except (PermissionError, FileNotFoundError):
            continue

    results.sort(reverse=True)
    for size, path in results:
        print(f"{size / 1024 / 1024:.1f}MB\t{path}")

if __name__ == "__main__":
    find_large_logs()
```

Test results (scanning 100,000 files):
Analysis: in this scenario the gap between Shell and Python is small. The Shell version benefits from find's own optimized traversal; the Python version has to walk the filesystem itself, but the code is easier to read.
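If the Python version ever needed to get closer to find, one option worth trying (a sketch of my own, not something benchmarked in the original comparison) is a recursive os.scandir walk: it skips the pathlib overhead and uses the cached directory-entry type instead of an extra stat call just to tell files from directories.

```python
# Sketch only: a scandir-based walk with the same filters as find_large_logs.py.
import os
import time

def scan(path, cutoff_ts, size_bytes, results):
    try:
        with os.scandir(path) as entries:
            for entry in entries:
                if entry.is_dir(follow_symlinks=False):   # type comes from the dir entry
                    scan(entry.path, cutoff_ts, size_bytes, results)
                elif entry.name.endswith(".log"):
                    st = entry.stat()
                    if st.st_mtime > cutoff_ts and st.st_size > size_bytes:
                        results.append((st.st_size, entry.path))
    except (PermissionError, FileNotFoundError):
        pass

results = []
scan("/var/log", time.time() - 7 * 86400, 100 * 1024 * 1024, results)
for size, path in sorted(results, reverse=True):
    print(f"{size / 1024 / 1024:.1f}MB\t{path}")
```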
Task: analyze an Nginx access log and report the request count per IP, the status-code distribution, and the P99 response time.
Shell version:
```bash
#!/bin/bash
# analyze_nginx_log.sh
LOG_FILE="$1"

echo "=== Top 10 IPs ==="
awk '{print $1}' "$LOG_FILE" | sort | uniq -c | sort -rn | head -10

echo ""
echo "=== Status Code Distribution ==="
awk '{print $9}' "$LOG_FILE" | sort | uniq -c | sort -rn

echo ""
echo "=== Response Time P99 ==="
# Assumes the response time is in the last column
awk '{print $NF}' "$LOG_FILE" | sort -n | awk '
    {a[NR]=$1}
    END {
        p99_idx = int(NR * 0.99)
        print "P99: " a[p99_idx] "ms"
    }'
```

Python version:
```python
#!/usr/bin/env python3
# analyze_nginx_log.py
import re
import sys
from collections import Counter
from statistics import quantiles

LOG_PATTERN = re.compile(
    r'(?P<ip>\d+\.\d+\.\d+\.\d+)'            # IP
    r'.*?"(?P<method>\w+) (?P<path>[^ ]+)'   # Method and path
    r'.*?" (?P<status>\d+)'                  # Status
    r'.*?(?P<time>\d+\.?\d*)$'               # Response time
)

def analyze_log(filename):
    ip_counter = Counter()
    status_counter = Counter()
    response_times = []

    with open(filename, 'r') as f:
        for line in f:
            match = LOG_PATTERN.search(line)
            if match:
                ip_counter[match.group('ip')] += 1
                status_counter[match.group('status')] += 1
                try:
                    response_times.append(float(match.group('time')))
                except ValueError:
                    pass

    print("=== Top 10 IPs ===")
    for ip, count in ip_counter.most_common(10):
        print(f"{count:>8} {ip}")

    print("\n=== Status Code Distribution ===")
    for status, count in status_counter.most_common():
        print(f"{count:>8} {status}")

    print("\n=== Response Time Percentiles ===")
    if response_times:
        q = quantiles(response_times, n=100)
        print(f"P50: {q[49]:.2f}ms")
        print(f"P90: {q[89]:.2f}ms")
        print(f"P99: {q[98]:.2f}ms")

if __name__ == "__main__":
    analyze_log(sys.argv[1])
```

Test results (1 GB log file, about 5 million lines):
Analysis: the Python version is almost 4x faster. Each sort/uniq pipeline in the Shell version re-reads and re-sorts the data, while the Python version computes all of the statistics in a single pass.
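If a single pass on one core still isn't fast enough, the parsing itself can be spread across processes. Below is a rough sketch (my own addition, not benchmarked in the original article) that splits the file into chunks of lines and counts requests per IP in parallel, merging the Counters at the end. The simplified IP_PATTERN regex is an assumption, not the full LOG_PATTERN used above.

```python
# Sketch only: parallel per-IP counting with multiprocessing.
import re
import sys
from collections import Counter
from itertools import islice
from multiprocessing import Pool

IP_PATTERN = re.compile(r'^(\d+\.\d+\.\d+\.\d+)')  # assumes the client IP starts the line

def count_ips(lines):
    counter = Counter()
    for line in lines:
        m = IP_PATTERN.match(line)
        if m:
            counter[m.group(1)] += 1
    return counter

def parallel_ip_count(filename, chunk_lines=200_000, workers=4):
    total = Counter()
    with open(filename, errors='replace') as f, Pool(workers) as pool:
        # hand the pool successive chunks of lines, then merge the partial counts
        chunks = iter(lambda: list(islice(f, chunk_lines)), [])
        for partial in pool.imap_unordered(count_ips, chunks):
            total.update(partial)
    return total

if __name__ == "__main__":
    for ip, count in parallel_ip_count(sys.argv[1]).most_common(10):
        print(f"{count:>8} {ip}")
```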
Task: connect to 200 servers and collect system information (hostname, uptime, memory usage, disk usage).
Shell version (serial):
```bash
#!/bin/bash
# collect_info.sh
SERVERS_FILE="servers.txt"
OUTPUT_FILE="report.csv"

echo "hostname,uptime,mem_used_pct,disk_used_pct" > "$OUTPUT_FILE"

while read -r server; do
    echo "Collecting from $server..."
    info=$(ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no "$server" '
        hostname=$(hostname)
        uptime=$(uptime -p)
        mem_used=$(free | awk "/Mem:/ {printf \"%.1f\", \$3/\$2*100}")
        disk_used=$(df -h / | awk "NR==2 {print \$5}" | tr -d "%")
        echo "$hostname,$uptime,$mem_used,$disk_used"
    ' 2>/dev/null)
    if [ -n "$info" ]; then
        echo "$info" >> "$OUTPUT_FILE"
    else
        echo "$server,FAILED,N/A,N/A" >> "$OUTPUT_FILE"
    fi
done < "$SERVERS_FILE"
```

Shell version (parallel, using GNU Parallel):
```bash
#!/bin/bash
# collect_info_parallel.sh
SERVERS_FILE="servers.txt"
OUTPUT_FILE="report.csv"

echo "hostname,uptime,mem_used_pct,disk_used_pct" > "$OUTPUT_FILE"

collect_server_info() {
    server=$1
    info=$(ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no "$server" '
        hostname=$(hostname)
        uptime=$(uptime -p)
        mem_used=$(free | awk "/Mem:/ {printf \"%.1f\", \$3/\$2*100}")
        disk_used=$(df -h / | awk "NR==2 {print \$5}" | tr -d "%")
        echo "$hostname,$uptime,$mem_used,$disk_used"
    ' 2>/dev/null)
    if [ -n "$info" ]; then
        echo "$info"
    else
        echo "$server,FAILED,N/A,N/A"
    fi
}
export -f collect_server_info

cat "$SERVERS_FILE" | parallel -j 50 collect_server_info >> "$OUTPUT_FILE"
```

Python version (concurrent):
```python
#!/usr/bin/env python3
# collect_info.py
import asyncio
import asyncssh
import csv
from dataclasses import dataclass
from typing import Optional

@dataclass
class ServerInfo:
    hostname: str
    uptime: str
    mem_used_pct: float
    disk_used_pct: float

async def collect_from_server(host: str, timeout: int = 10) -> Optional[ServerInfo]:
    try:
        async with asyncssh.connect(
            host, username='root', known_hosts=None, connect_timeout=timeout
        ) as conn:
            result = await conn.run('''
                hostname
                uptime -p
                free | awk '/Mem:/ {printf "%.1f\\n", $3/$2*100}'
                df -h / | awk 'NR==2 {print $5}' | tr -d '%'
            ''', check=True)
            lines = result.stdout.strip().split('\n')
            return ServerInfo(
                hostname=lines[0],
                uptime=lines[1],
                mem_used_pct=float(lines[2]),
                disk_used_pct=float(lines[3])
            )
    except Exception as e:
        print(f"Failed to connect {host}: {e}")
        return None

async def collect_all(servers: list[str], concurrency: int = 50):
    semaphore = asyncio.Semaphore(concurrency)

    async def limited_collect(host):
        async with semaphore:
            return host, await collect_from_server(host)

    tasks = [limited_collect(host) for host in servers]
    results = await asyncio.gather(*tasks)
    return results

def main():
    with open('servers.txt') as f:
        servers = [line.strip() for line in f if line.strip()]

    results = asyncio.run(collect_all(servers))

    with open('report.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['hostname', 'uptime', 'mem_used_pct', 'disk_used_pct'])
        for host, info in results:
            if info:
                writer.writerow([
                    info.hostname, info.uptime,
                    info.mem_used_pct, info.disk_used_pct
                ])
            else:
                writer.writerow([host, 'FAILED', 'N/A', 'N/A'])

if __name__ == "__main__":
    main()
```

Test results (200 servers):
Task: change sshd_config on 200 servers to disable PasswordAuthentication.
Shell version:
```bash
#!/bin/bash
# update_sshd.sh
SERVERS_FILE="servers.txt"
BACKUP_DIR="/tmp/sshd_backup"
mkdir -p "$BACKUP_DIR"

while read -r server; do
    echo "Updating $server..."
    ssh "$server" '
        # Back up the current config
        cp /etc/ssh/sshd_config /etc/ssh/sshd_config.bak

        # Update the setting
        if grep -q "^PasswordAuthentication" /etc/ssh/sshd_config; then
            sed -i "s/^PasswordAuthentication.*/PasswordAuthentication no/" /etc/ssh/sshd_config
        else
            echo "PasswordAuthentication no" >> /etc/ssh/sshd_config
        fi

        # Validate and reload
        sshd -t && systemctl reload sshd
    '
    if [ $? -eq 0 ]; then
        echo "$server: SUCCESS"
    else
        echo "$server: FAILED"
    fi
done < "$SERVERS_FILE"
```

Python version (using Fabric):
```python
#!/usr/bin/env python3
# update_sshd.py
from fabric import Connection
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

SSHD_CONFIG = "/etc/ssh/sshd_config"

def update_server(host: str) -> tuple[str, bool, str]:
    """Update the sshd config on a single server."""
    try:
        conn = Connection(
            host, user='root', connect_timeout=10,
            connect_kwargs={"banner_timeout": 30}
        )

        # Back up the current config
        conn.run(f"cp {SSHD_CONFIG} {SSHD_CONFIG}.bak")

        # Read the current config
        result = conn.run(f"cat {SSHD_CONFIG}", hide=True)
        config = result.stdout

        # Update the setting
        if "PasswordAuthentication" in config:
            conn.run(
                f"sed -i 's/^#*PasswordAuthentication.*/PasswordAuthentication no/' {SSHD_CONFIG}"
            )
        else:
            conn.run(f"echo 'PasswordAuthentication no' >> {SSHD_CONFIG}")

        # Validate the config syntax
        result = conn.run("sshd -t", warn=True)
        if result.failed:
            # Roll back
            conn.run(f"cp {SSHD_CONFIG}.bak {SSHD_CONFIG}")
            return host, False, "Config validation failed"

        # Reload the service
        conn.run("systemctl reload sshd")

        # Verify the change took effect
        result = conn.run(f"grep '^PasswordAuthentication' {SSHD_CONFIG}", hide=True)
        if "no" not in result.stdout.lower():
            return host, False, "Change not applied"

        return host, True, "Success"
    except Exception as e:
        return host, False, str(e)

def main():
    with open('servers.txt') as f:
        servers = [line.strip() for line in f if line.strip()]

    results = {"success": [], "failed": []}

    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = {executor.submit(update_server, host): host for host in servers}
        for future in as_completed(futures):
            host, success, message = future.result()
            if success:
                results["success"].append(host)
                logger.info(f"{host}: {message}")
            else:
                results["failed"].append((host, message))
                logger.error(f"{host}: {message}")

    # Print a summary
    print("\n=== Summary ===")
    print(f"Success: {len(results['success'])}")
    print(f"Failed: {len(results['failed'])}")
    if results["failed"]:
        print("\nFailed servers:")
        for host, reason in results["failed"]:
            print(f"  {host}: {reason}")

if __name__ == "__main__":
    main()
```

When Shell is the right choice:

1. Simple command composition
```bash
# Directories using the most disk space
du -sh /* 2>/dev/null | sort -rh | head -10

# Batch-rename files
for f in *.txt; do mv "$f" "${f%.txt}.md"; done

# Watch a log in real time
tail -f /var/log/app.log | grep --line-buffered "ERROR"
```

2. Quick system administration tasks
```bash
# Kill processes in bulk
pgrep -f "python.*worker" | xargs kill

# Clean up temporary files
find /tmp -type f -mtime +7 -delete

# Check service status
for svc in nginx mysql redis; do
    systemctl is-active "$svc" || echo "$svc is down!"
done
```

3. Processing streaming data through pipes
```bash
# Rough real-time Nginx QPS
tail -f /var/log/nginx/access.log | \
    awk '{print strftime("%H:%M:%S")}' | \
    uniq -c

# Pull out critical errors and send an alert
tail -F /var/log/app.log | \
    grep --line-buffered "CRITICAL" | \
    while read line; do
        curl -X POST -d "text=$line" "$SLACK_WEBHOOK"
    done
```

When Python is the right choice:

1. Complex data processing
```python
# Parse JSON logs and aggregate statistics per endpoint
import json
from collections import defaultdict

stats = defaultdict(lambda: {"count": 0, "errors": 0, "total_time": 0})

with open("app.log") as f:
    for line in f:
        try:
            entry = json.loads(line)
            endpoint = entry["endpoint"]
            stats[endpoint]["count"] += 1
            stats[endpoint]["total_time"] += entry["response_time"]
            if entry["status"] >= 400:
                stats[endpoint]["errors"] += 1
        except (json.JSONDecodeError, KeyError):
            continue

for endpoint, data in sorted(stats.items(), key=lambda x: -x[1]["count"]):
    avg_time = data["total_time"] / data["count"]
    error_rate = data["errors"] / data["count"] * 100
    print(f"{endpoint}: {data['count']} requests, {avg_time:.2f}ms avg, {error_rate:.1f}% errors")
```

2. Batch operations that need concurrency
```python
# Check URL availability concurrently
import asyncio
import aiohttp
from typing import NamedTuple

class CheckResult(NamedTuple):
    url: str
    status: int
    latency: float
    error: str = ""

async def check_url(session: aiohttp.ClientSession, url: str) -> CheckResult:
    try:
        start = asyncio.get_event_loop().time()
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            latency = asyncio.get_event_loop().time() - start
            return CheckResult(url, resp.status, latency * 1000)
    except Exception as e:
        return CheckResult(url, 0, 0, str(e))

async def check_all(urls: list[str]) -> list[CheckResult]:
    async with aiohttp.ClientSession() as session:
        tasks = [check_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# Usage
urls = ["https://example.com", "https://google.com", ...]
results = asyncio.run(check_all(urls))
```

3. Interacting with APIs
```python
# Call a cloud API to create resources in bulk
import boto3
from concurrent.futures import ThreadPoolExecutor

ec2 = boto3.client('ec2')

def create_instance(config):
    response = ec2.run_instances(
        ImageId=config['ami'],
        InstanceType=config['type'],
        MinCount=1,
        MaxCount=1,
        TagSpecifications=[{
            'ResourceType': 'instance',
            'Tags': [{'Key': 'Name', 'Value': config['name']}]
        }]
    )
    return response['Instances'][0]['InstanceId']

configs = [
    {'name': 'web-1', 'ami': 'ami-xxx', 'type': 't3.medium'},
    {'name': 'web-2', 'ami': 'ami-xxx', 'type': 't3.medium'},
    # ...
]

with ThreadPoolExecutor(max_workers=10) as executor:
    instance_ids = list(executor.map(create_instance, configs))
```

4. Tasks that need solid error handling
```python
# File download with retries, a timeout, and checksum verification
import requests
from tenacity import retry, stop_after_attempt, wait_exponential
import hashlib

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def download_file(url: str, dest: str, expected_md5: str = None) -> bool:
    """Download a file with retries and an MD5 check."""
    response = requests.get(url, stream=True, timeout=30)
    response.raise_for_status()

    md5 = hashlib.md5()
    with open(dest, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
            md5.update(chunk)

    if expected_md5 and md5.hexdigest() != expected_md5:
        raise ValueError(f"MD5 mismatch: expected {expected_md5}, got {md5.hexdigest()}")

    return True
```

Don't rewrite every Shell script in Python in one go. My recommended approach is to let the two call each other and migrate gradually:
Calling Shell commands from Python:
```python
import subprocess

def run_cmd(cmd: str, timeout: int = 60) -> tuple[int, str, str]:
    """Run a shell command and return (return code, stdout, stderr)."""
    result = subprocess.run(
        cmd, shell=True, capture_output=True, text=True, timeout=timeout
    )
    return result.returncode, result.stdout, result.stderr

# Usage
code, out, err = run_cmd("df -h")
if code == 0:
    print(out)
else:
    print(f"Error: {err}")
```
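One caveat about the helper above: shell=True hands the whole string to /bin/sh, which is what you want for pipes and redirection but risky if the command ever includes untrusted input. A small variant (my own sketch, not from the original article) splits the command with shlex and skips the shell entirely:

```python
import shlex
import subprocess

def run_cmd_noshell(cmd: str, timeout: int = 60) -> tuple[int, str, str]:
    """Run a command without a shell; prefer this when cmd may contain untrusted input."""
    result = subprocess.run(
        shlex.split(cmd),  # "df -h /var" -> ["df", "-h", "/var"]
        capture_output=True, text=True, timeout=timeout
    )
    return result.returncode, result.stdout, result.stderr

# Usage: no pipes or globbing involved, so no shell is needed
code, out, err = run_cmd_noshell("df -h /var")
```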
Calling a Python script from Shell:

```bash
#!/bin/bash
# Pre-processing in Shell
find /var/log -name "*.log" -mtime -1 > /tmp/logs.txt

# Heavy lifting in Python
python3 analyze_logs.py /tmp/logs.txt > report.json

# Post-processing in Shell
cat report.json | jq -r '.summary' | mail -s "Daily Report" admin@example.com
```

I keep a template for ops scripts; new scripts start from it and only the main logic changes:
Python ops script template:
#!/usr/bin/env python3"""脚本说明:xxx作者:xxx日期:xxx"""import argparseimport loggingimport sysfrom pathlib import Path# 配置日志logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')logger = logging.getLogger(__name__)defparse_args(): parser = argparse.ArgumentParser(description='脚本说明') parser.add_argument('-c', '--config', type=Path, help='配置文件路径') parser.add_argument('-v', '--verbose', action='store_true', help='详细输出') parser.add_argument('--dry-run', action='store_true', help='仅模拟执行')return parser.parse_args()defmain(): args = parse_args()if args.verbose: logging.getLogger().setLevel(logging.DEBUG) logger.info("开始执行...")try:# 主逻辑passexcept KeyboardInterrupt: logger.warning("用户中断") sys.exit(130)except Exception as e: logger.error(f"执行失败: {e}") sys.exit(1) logger.info("执行完成")if __name__ == "__main__": main()# 错误:直接读取可能有编码错误的文件with open('log.txt') as f: content = f.read() # 可能抛出UnicodeDecodeError# 正确:指定编码并处理错误with open('log.txt', encoding='utf-8', errors='replace') as f: content = f.read()# 或者使用chardet自动检测编码import chardetwith open('log.txt', 'rb') as f: raw = f.read() encoding = chardet.detect(raw)['encoding']content = raw.decode(encoding or'utf-8', errors='replace')# 错误:连接没有正确关闭import paramikodefget_hostname(host): client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(host) stdin, stdout, stderr = client.exec_command('hostname')return stdout.read().decode().strip()# client没有关闭!# 正确:使用context managerdefget_hostname(host):with paramiko.SSHClient() as client: client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(host) stdin, stdout, stderr = client.exec_command('hostname')return stdout.read().decode().strip()# 错误:无限制并发asyncdefcheck_all(hosts): tasks = [check_host(h) for h in hosts] # 1000个并发连接returnawait asyncio.gather(*tasks)# 正确:使用信号量限制并发asyncdefcheck_all(hosts, max_concurrent=50): semaphore = asyncio.Semaphore(max_concurrent)asyncdeflimited_check(host):asyncwith semaphore:returnawait check_host(host) tasks = [limited_check(h) for h in hosts]returnawait asyncio.gather(*tasks)# 错误:不等待子进程结束import subprocessdefrun_background(cmd): subprocess.Popen(cmd, shell=True)# 父进程结束后,子进程变成僵尸# 正确:正确处理后台进程import subprocessimport atexitbackground_processes = []defrun_background(cmd): proc = subprocess.Popen(cmd, shell=True) background_processes.append(proc)return proc@atexit.registerdefcleanup():for proc in background_processes: proc.terminate() proc.wait()# 创建运维脚本项目mkdir ops-scripts && cd ops-scriptspython3 -m venv venvsource venv/bin/activate# 安装常用依赖pip install paramiko fabric psutil click rich pyyaml requests tenacity# 固化依赖pip freeze > requirements.txtShell不是不好用,Python也不是万能的。关键是选择合适的工具解决问题。
My rule of thumb: if the job is a handful of commands glued together with pipes, use Shell; as soon as it needs real data structures, concurrency, API calls, or careful error handling, switch to Python.
Finally, whatever language you pick, good comments, solid exception handling, and proper logging are the real fundamentals of an ops script.
