在了解了安全编程基础后,今天我们将深入探讨Python开发中的常见漏洞及其防范措施,这是构建安全应用的实战关键。
1. Python漏洞防范全景图
2. 深度漏洞解析与防范
2.1 模板注入漏洞
危险示例:
from flask import Flask, request, render_template_string
import jinja2

app = Flask(__name__)


# ❌ DANGEROUS (intentionally vulnerable demo -- never deploy this):
# the template *source* itself is taken from the request.
@app.route('/unsafe_template')
def unsafe_template():
    """Render a template whose source comes from the query string.

    Demonstrates server-side template injection: both the ``template`` and
    ``name`` query parameters are attacker-controlled, and ``template`` is
    compiled and rendered as-is.
    """
    template = request.args.get('template', 'Hello {{ name }}')
    name = request.args.get('name', 'Guest')
    # An attacker can submit, for example:
    #   {{ config.__class__.__init__.__globals__['os'].system('rm -rf /') }}
    return render_template_string(template, name=name)


# ❌ DANGEROUS: Jinja2 environment with auto-escaping disabled
unsafe_env = jinja2.Environment(autoescape=False)
# Safe counterparts to the vulnerable template example.

# ✅ Safe: render inside a sandboxed environment
import string

from jinja2.sandbox import SandboxedEnvironment


def safe_template_render(user_input='Guest'):
    """Render a fixed greeting template with ``user_input`` as data.

    The template source is a constant; only the *value* substituted into it
    is untrusted, and it is HTML-escaped.

    Args:
        user_input: Untrusted text to greet. Defaults to ``'Guest'``.

    Returns:
        The rendered string.
    """
    env = SandboxedEnvironment(
        autoescape=True,  # escape substituted values by default
        trim_blocks=True,
        lstrip_blocks=True,
    )

    # Expose only a small allow-list of helpers to templates.
    env.globals['safe_functions'] = {
        'len': len,
        'str': str,
        'int': int,
        'list': list,
        'dict': dict,
    }

    # Defensive only: presumably a no-op on a stock environment, whose
    # globals are not expected to contain '__builtins__' -- confirm against
    # the Jinja2 version in use.
    env.globals.pop('__builtins__', None)

    template = env.from_string("Hello {{ name|e }}")  # explicit escape filter
    return template.render(name=user_input)


# ✅ Safe: send a Content-Security-Policy and related hardening headers.
# NOTE: relies on the Flask ``app`` object created earlier in the file.
@app.after_request
def add_security_headers(response):
    """Attach security headers to every response and return it."""
    # 'unsafe-inline' is deliberately NOT allowed for scripts: permitting
    # inline scripts would let injected <script> blocks run and defeat the
    # purpose of the policy.
    response.headers['Content-Security-Policy'] = \
        "default-src 'self'; script-src 'self';"
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    return response


# ✅ Use a logic-free template library
class SafeTemplate(string.Template):
    """``string.Template`` variant that only accepts primitive values."""

    delimiter = '@'  # uncommon delimiter to avoid clashing with content

    _ALLOWED_TYPES = (str, int, float, bool, type(None))

    def safe_substitute(self, mapping=None, /, **kwargs):
        """Substitute placeholders after validating every supplied value.

        Accepts the same arguments as ``string.Template.safe_substitute``
        (an optional positional mapping plus keyword arguments).

        Raises:
            ValueError: If any value is not a str, int, float, bool or None.
        """
        supplied = dict(mapping) if mapping is not None else {}
        supplied.update(kwargs)
        for key, value in supplied.items():
            if not isinstance(value, self._ALLOWED_TYPES):
                raise ValueError(f"Unsafe value type for {key}")
        if mapping is None:
            return super().safe_substitute(**kwargs)
        return super().safe_substitute(mapping, **kwargs)


# Usage example
user_input = 'Guest'  # placeholder standing in for untrusted request data
template = SafeTemplate('Hello @name')
# NOTE: string.Template performs NO escaping; it is safe only in the sense
# that the template language cannot execute code. Escape the result for the
# output context (e.g. html.escape) before embedding it in a page.
safe_output = template.safe_substitute(name=user_input)
2.2 不安全的反序列化
危险示例:
import pickle
import yaml
import json


# ❌ DANGEROUS (intentionally vulnerable demo): unpickling untrusted bytes
def load_data_unsafe(serialized_data: bytes):
    """Unpickle ``serialized_data`` with no checks at all."""
    # pickle can execute arbitrary code while loading -- severe risk!
    return pickle.loads(serialized_data)


# ❌ DANGEROUS: the full YAML loader can construct arbitrary Python objects
def load_yaml_unsafe(yaml_str: str):
    """Parse YAML with the unrestricted ``yaml.Loader``."""
    return yaml.load(yaml_str, Loader=yaml.Loader)  # may execute code!


# ❌ JSON parsing is safe by itself, but not what is done with the result
def json_decode_unsafe(json_str: str):
    """Parse JSON and, if present, ``eval`` the ``__eval__`` entry.

    Returns the evaluated value when the key is present, otherwise the
    parsed data.
    """
    data = json.loads(json_str)
    # Feeding strings taken from the document to eval() is the vulnerability.
    if '__eval__' in data:
        return eval(data['__eval__'])  # dangerous!
    return data
import io
import pickle
import yaml
import json
import hmac
import hashlib
from typing import Any, Dict


class _RestrictedUnpickler(pickle.Unpickler):
    """Unpickler that only resolves an explicit allow-list of globals."""

    SAFE_GLOBALS = {
        'builtins': ['str', 'int', 'float', 'list', 'dict', 'tuple'],
        'datetime': ['datetime', 'date', 'time'],
        'decimal': ['Decimal'],
    }

    def find_class(self, module, name):
        """Return the allow-listed class or raise ``UnpicklingError``."""
        if name in self.SAFE_GLOBALS.get(module, ()):
            return super().find_class(module, name)
        raise pickle.UnpicklingError(f"Forbidden class: {module}.{name}")


# ✅ Safe: HMAC-signed pickle data
class SecurePickle:
    """Pickle wrapper that signs payloads and restricts what can be loaded."""

    _SIGNATURE_SIZE = hashlib.sha256().digest_size  # 32 bytes

    def __init__(self, secret_key: str):
        self.secret_key = secret_key.encode()

    def dumps(self, obj: Any) -> bytes:
        """Serialize ``obj`` and prepend an HMAC-SHA256 signature."""
        data = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        signature = hmac.new(self.secret_key, data, hashlib.sha256).digest()
        return signature + data

    def loads(self, data: bytes) -> Any:
        """Verify the signature, then unpickle with a restricted unpickler.

        Raises:
            ValueError: If ``data`` is too short or the signature is wrong.
            pickle.UnpicklingError: If the payload references a class that
                is not on the allow-list.
        """
        if len(data) < self._SIGNATURE_SIZE:
            raise ValueError("Invalid data length")

        signature = data[:self._SIGNATURE_SIZE]
        payload = data[self._SIGNATURE_SIZE:]
        expected = hmac.new(self.secret_key, payload, hashlib.sha256).digest()

        # Constant-time comparison avoids leaking the signature via timing.
        if not hmac.compare_digest(signature, expected):
            raise ValueError("Invalid signature")

        return _RestrictedUnpickler(io.BytesIO(payload)).load()


# ✅ Safe: use the safe YAML loader
def load_yaml_safe(yaml_str: str) -> Dict:
    """Parse YAML with ``SafeLoader`` (plain data types only)."""
    return yaml.safe_load(yaml_str)


# ✅ Safe: guarded JSON decoding
def json_decode_safe(json_str: str, max_size: int = 1024*1024) -> Dict:
    """Parse JSON with size and nesting-depth limits.

    Args:
        json_str: The JSON document (``str``; ``bytes`` also accepted).
        max_size: Maximum accepted document size in bytes.

    Returns:
        The decoded value.

    Raises:
        ValueError: If the document is too large, malformed, or nested too
            deeply.
    """
    # The character count is a lower bound on the byte size, so check it
    # first to avoid encoding a document that is already too large.
    too_large = len(json_str) > max_size or (
        isinstance(json_str, str)
        and len(json_str.encode('utf-8')) > max_size
    )
    if too_large:
        raise ValueError(f"JSON too large (>{max_size} bytes)")

    try:
        data = json.loads(json_str)
    except RecursionError:
        # Extremely deep nesting can exhaust the parser's recursion limit
        # before the explicit depth check below gets a chance to run.
        raise ValueError("Recursion depth exceeded") from None

    def validate_obj(obj, depth=0):
        if depth > 50:  # guard against excessive nesting
            raise ValueError("Recursion depth exceeded")

        if isinstance(obj, dict):
            for key, value in obj.items():
                if not isinstance(key, str):
                    raise ValueError(f"Invalid key type: {type(key)}")
                validate_obj(value, depth + 1)
        elif isinstance(obj, list):
            for item in obj:
                validate_obj(item, depth + 1)
        elif not isinstance(obj, (str, int, float, bool, type(None))):
            raise ValueError(f"Unsupported type: {type(obj)}")

    validate_obj(data)
    return data


# ✅ Alternative formats such as MessagePack
import msgpack
import cbor2


def secure_serialization(obj: Any) -> bytes:
    """Serialize ``obj`` with MessagePack, a data-only format."""
    return msgpack.packb(obj, use_bin_type=True)


def secure_deserialization(data: bytes) -> Any:
    """Deserialize MessagePack bytes produced by ``secure_serialization``."""
    return msgpack.unpackb(data, raw=False)
2.3 路径遍历与文件包含
危险示例:
import os


# ❌ DANGEROUS (intentionally vulnerable demo): user-supplied path used as-is
def read_file_unsafe(filename: str):
    """Read ``uploads/<filename>`` without validating ``filename``.

    ``os.path.join`` neither removes ``..`` components nor rejects absolute
    paths, so an attacker can pass e.g. ``../../../etc/passwd``.
    """
    filepath = os.path.join('uploads', filename)
    with open(filepath, 'r') as f:
        return f.read()


# ❌ DANGEROUS: unchecked rename
def rename_file_unsafe(old_name: str, new_name: str):
    """Rename a file with no check that either path stays in one directory."""
    os.rename(old_name, new_name)
import os
import tempfile
from pathlib import Path
import posixpath
import shutil


class SecureFileHandler:
    """Helpers for file access confined to a base directory."""

    # Any of these characters in a mode string means the file may be written.
    _WRITE_MODE_FLAGS = frozenset('wax+')
    _WRITABLE_EXTENSIONS = frozenset({'.txt', '.pdf', '.png', '.jpg'})

    @staticmethod
    def safe_path_join(base_dir: str, user_path: str) -> Path:
        """Join ``user_path`` onto ``base_dir`` and reject directory escapes.

        Returns:
            The resolved absolute path, guaranteed to lie inside ``base_dir``.

        Raises:
            ValueError: If the path cannot be resolved or lies outside
                ``base_dir``.
        """
        base_path = Path(base_dir).resolve()
        try:
            full_path = (base_path / Path(user_path)).resolve()
        except (RuntimeError, OSError) as exc:  # e.g. symlink loop
            raise ValueError("Invalid path") from exc

        # Compare path components, not string prefixes: a prefix test would
        # accept a sibling such as "<base>_evil/file".
        try:
            full_path.relative_to(base_path)
        except ValueError:
            raise ValueError(f"Path traversal attempt: {user_path}") from None

        return full_path

    @staticmethod
    def secure_open(filename: str, mode: str = 'r', base_dir: str = 'uploads'):
        """Open ``filename`` inside ``base_dir``.

        For any mode that can write (``w``, ``a``, ``x`` or ``+``) the file
        extension must be on the allow-list.

        Raises:
            ValueError: On path traversal or a disallowed extension.
        """
        safe_path = SecureFileHandler.safe_path_join(base_dir, filename)

        if SecureFileHandler._WRITE_MODE_FLAGS & set(mode):
            if safe_path.suffix.lower() not in SecureFileHandler._WRITABLE_EXTENSIONS:
                raise ValueError(f"File type not allowed: {safe_path.suffix}")

        return open(safe_path, mode)

    @staticmethod
    def create_secure_temp_file(suffix: str = None) -> str:
        """Create an empty temporary file and return its path.

        The file is not deleted automatically; the caller owns it.
        """
        temp_file = tempfile.NamedTemporaryFile(
            mode='w+b',
            delete=False,
            suffix=suffix,
            prefix='tmp_'
        )
        temp_file.close()

        os.chmod(temp_file.name, 0o600)  # owner read/write only
        return temp_file.name


# ✅ Using the pathlib API
def safe_file_operations(base_dir: str = '/var/www/uploads',
                         user_input: str = 'user_file.txt'):
    """Read a text file below ``base_dir`` after validating ``user_input``.

    Args:
        base_dir: Directory the file must live in.
        user_input: Untrusted relative path.

    Returns:
        The file's text, or ``None`` if it does not exist or is not a
        regular file.

    Raises:
        ValueError: If ``user_input`` is absolute, contains ``..`` after
            normalization, or resolves outside ``base_dir``.
    """
    base = Path(base_dir).resolve()

    # Check 1 (lexical): reject absolute paths and leftover '..' components.
    normalized = Path(posixpath.normpath(user_input))
    if posixpath.isabs(user_input) or '..' in normalized.parts:
        raise ValueError("Invalid path")

    # Check 2 (filesystem): resolve symlinks and confirm containment.
    full_path = (base / user_input).resolve()
    try:
        full_path.relative_to(base)
    except ValueError:
        raise ValueError("Path traversal detected") from None

    if full_path.exists() and full_path.is_file():
        return full_path.read_text(encoding='utf-8')

    return None


# ✅ Safe upload handling
def handle_file_upload(uploaded_file, allowed_extensions=None):
    """Validate and store an uploaded file under a random name.

    Args:
        uploaded_file: Object exposing ``filename``, ``read``, ``seek`` and
            ``save`` -- presumably a Werkzeug ``FileStorage``; confirm
            against the caller.
        allowed_extensions: Allowed lowercase extensions including the dot.

    Returns:
        The generated file name (not the full path).

    Raises:
        ValueError: If the extension or the sniffed MIME type is not allowed.
    """
    if allowed_extensions is None:
        allowed_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.pdf', '.txt'}

    # A missing filename yields an empty extension and is rejected below.
    filename = uploaded_file.filename or ''
    file_ext = os.path.splitext(filename)[1].lower()

    if file_ext not in allowed_extensions:
        raise ValueError(f"File extension {file_ext} not allowed")

    # Sniff the MIME type from the content instead of trusting the extension.
    import magic
    mime = magic.Magic(mime=True)
    file_mime = mime.from_buffer(uploaded_file.read(1024))
    uploaded_file.seek(0)  # rewind so save() writes the whole file

    allowed_mimes = {
        'image/jpeg', 'image/png', 'image/gif',
        'application/pdf', 'text/plain'
    }

    if file_mime not in allowed_mimes:
        raise ValueError(f"MIME type {file_mime} not allowed")

    # Never reuse the client-supplied name on disk.
    import uuid
    safe_filename = f"{uuid.uuid4()}{file_ext}"

    upload_dir = Path('uploads').resolve()
    upload_dir.mkdir(parents=True, exist_ok=True, mode=0o755)

    save_path = upload_dir / safe_filename
    uploaded_file.save(save_path)

    save_path.chmod(0o644)  # owner read/write, others read-only

    return safe_filename
3. 依赖与配置安全
3.1 供应链攻击防范
"""Dependency supply-chain protection helpers."""
import subprocess
import json
import hashlib
from typing import Dict, List
import requests


class SupplyChainSecurity:
    """Checks aimed at the Python package supply chain."""

    @staticmethod
    def verify_package_integrity(package_name: str, version: str, expected_hash: str) -> bool:
        """Check an installed package's version and, when possible, its hash.

        Returns:
            ``False`` on a version mismatch, a hash mismatch, or any error;
            ``True`` otherwise.

        NOTE(review): the SHA-256 comparison only runs when the distribution
        location is a single ``.egg``/``.whl`` archive. For an ordinary
        unpacked install only the version is checked and ``True`` is
        returned -- confirm this is acceptable for callers.
        NOTE(review): ``pkg_resources`` is deprecated by setuptools;
        ``importlib.metadata`` is the presumed replacement.
        """
        import pkg_resources

        try:
            dist = pkg_resources.get_distribution(package_name)

            if dist.version != version:
                print(f"Version mismatch: expected {version}, got {dist.version}")
                return False

            package_path = dist.location
            if package_path.endswith(('.egg', '.whl')):
                with open(package_path, 'rb') as f:
                    actual_hash = hashlib.sha256(f.read()).hexdigest()

                if actual_hash != expected_hash:
                    print(f"Hash mismatch for {package_name}")
                    return False

            return True

        except Exception as e:
            print(f"Verification failed: {e}")
            return False

    @staticmethod
    def check_typosquatting() -> List[str]:
        """Report installed packages whose names look like known typos.

        Prints a warning per finding and also returns the suspicious names
        (empty list if none, or if ``pip list`` failed).
        """
        common_packages = {
            'requests': ['request', 'reqests', 'requets'],
            'numpy': ['nampy', 'numppy', 'numpi'],
            'django': ['djano', 'dajngo', 'djangoo'],
        }

        installed = subprocess.run(
            ['pip', 'list', '--format=json'],
            capture_output=True,
            text=True
        )

        suspicious: List[str] = []
        if installed.returncode != 0:
            return suspicious

        packages = json.loads(installed.stdout)
        installed_names = {p['name'].lower() for p in packages}

        for real, typos in common_packages.items():
            for typo in typos:
                if typo in installed_names:
                    print(f"⚠️ Possible typosquatting: {typo} instead of {real}")
                    suspicious.append(typo)
        return suspicious

    @staticmethod
    def verify_pypi_metadata(package_name: str) -> Dict:
        """Fetch release and maintainer metadata for a package from PyPI.

        Returns:
            A dict with ``latest_version``, ``release_count``,
            ``maintainers`` and ``download_urls``; an empty dict if the
            package has no releases or the lookup fails.

        NOTE(review): PyPI documents its XML-RPC interface as deprecated;
        verify these calls still work or migrate to the JSON API.
        """
        import xmlrpc.client

        try:
            client = xmlrpc.client.ServerProxy('https://pypi.org/pypi')

            releases = client.package_releases(package_name, True)
            if not releases:
                return {}

            latest = releases[0]
            urls = client.release_urls(package_name, latest)

            # Who is allowed to publish this package
            role_info = client.package_roles(package_name)

            return {
                'latest_version': latest,
                'release_count': len(releases),
                'maintainers': [role[0] for role in role_info],
                'download_urls': [url['url'] for url in urls]
            }

        except Exception as e:
            print(f"Failed to fetch PyPI metadata: {e}")
            return {}

    @staticmethod
    def create_requirements_lock() -> str:
        """Build hash-pinned requirements text from the current environment.

        Only ``name==version`` lines from ``pip freeze`` are included.

        Raises:
            RuntimeError: If ``pip freeze`` fails or a package cannot be
                downloaded for hashing.
        """
        result = subprocess.run(
            ['pip', 'freeze'],
            capture_output=True,
            text=True
        )

        if result.returncode != 0:
            raise RuntimeError("Failed to freeze requirements")

        requirements = result.stdout.strip()

        hashed_requirements = []
        for line in requirements.split('\n'):
            if '==' in line:
                pkg_spec = line.split('#')[0].strip()
                hashed_line = SupplyChainSecurity._add_package_hash(pkg_spec)
                hashed_requirements.append(hashed_line)

        return '\n'.join(hashed_requirements)

    @staticmethod
    def _add_package_hash(pkg_spec: str) -> str:
        """Return ``pkg_spec`` followed by the SHA-256 of its downloaded archive.

        Raises:
            RuntimeError: If ``pip download`` fails or produces no file.
        """
        import tempfile
        from pathlib import Path

        # ``pip download -d`` expects a *directory*; a throwaway directory is
        # used and removed afterwards so nothing is left behind.
        with tempfile.TemporaryDirectory() as download_dir:
            result = subprocess.run(
                ['pip', 'download', '--no-deps', '-d', download_dir, pkg_spec],
                capture_output=True,
                text=True
            )
            if result.returncode != 0:
                raise RuntimeError(f"Failed to download {pkg_spec}")

            artifacts = sorted(p for p in Path(download_dir).iterdir() if p.is_file())
            if not artifacts:
                raise RuntimeError(f"No archive downloaded for {pkg_spec}")

            sha256_hash = hashlib.sha256(artifacts[0].read_bytes()).hexdigest()

        return f"{pkg_spec} --hash=sha256:{sha256_hash}"
3.2 安全配置管理
"""Advanced secure configuration management."""
import os
import json
from typing import Any, Dict, Optional
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64


class SecureConfigManager:
    """Stores configuration as JSON with sensitive values Fernet-encrypted."""

    # Suffix appended to the key of every encrypted entry in the file.
    _ENCRYPTED_SUFFIX = '_encrypted'
    # A key containing any of these substrings is encrypted on save.
    # 'database_url' and 'dsn' are included because connection strings
    # usually embed credentials.
    _SENSITIVE_KEYWORDS = (
        'password', 'secret', 'key', 'token', 'credential',
        'database_url', 'dsn',
    )

    def __init__(self, master_key: Optional[str] = None):
        """Derive the cipher from ``master_key`` or ``CONFIG_MASTER_KEY``.

        Raises:
            ValueError: If no master key is supplied or set in the environment.
        """
        self.master_key = master_key or os.environ.get('CONFIG_MASTER_KEY')
        if not self.master_key:
            raise ValueError("Master key required")

        self.encryption_key = self._derive_key(self.master_key)
        self.cipher = Fernet(self.encryption_key)

        # Plain values from the most recent load_config() calls
        self._config_cache: Dict[str, Any] = {}

    def _derive_key(self, password: str, salt: Optional[bytes] = None) -> bytes:
        """Derive a Fernet key from ``password`` with PBKDF2-HMAC-SHA256."""
        if salt is None:
            # Fixed demo salt; production should obtain it from a secure source.
            salt = b'secure_config_salt_'

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        # Fernet requires the 32-byte key in urlsafe base64 form.
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return key

    def encrypt_value(self, plaintext: str) -> str:
        """Encrypt a value and return it as urlsafe-base64 text."""
        encrypted = self.cipher.encrypt(plaintext.encode())
        return base64.urlsafe_b64encode(encrypted).decode()

    def decrypt_value(self, ciphertext: str) -> str:
        """Reverse :meth:`encrypt_value`."""
        encrypted = base64.urlsafe_b64decode(ciphertext.encode())
        decrypted = self.cipher.decrypt(encrypted)
        return decrypted.decode()

    def load_config(self, config_path: Optional[str] = None) -> Dict[str, Any]:
        """Load a config file, decrypting every ``*_encrypted`` entry.

        ``config_path`` defaults to ``$CONFIG_FILE`` or
        ``config.encrypted.json``. Loaded values are merged into the cache
        used by :meth:`get_with_fallback`.
        """
        if config_path is None:
            config_path = os.environ.get('CONFIG_FILE', 'config.encrypted.json')

        with open(config_path, 'r') as f:
            encrypted_config = json.load(f)

        suffix = self._ENCRYPTED_SUFFIX
        decrypted_config = {}
        for key, value in encrypted_config.items():
            if key.endswith(suffix):
                # Strip only the trailing suffix; the marker may also occur
                # inside the original key name.
                plain_key = key[:-len(suffix)]
                decrypted_config[plain_key] = self.decrypt_value(value)
            else:
                decrypted_config[key] = value

        self._config_cache.update(decrypted_config)
        return decrypted_config

    def save_config(self, config: Dict[str, Any], config_path: Optional[str] = None):
        """Encrypt sensitive entries and write the config as JSON.

        Sensitive values are converted with ``str()`` before encryption, so
        they are read back as strings.
        """
        if config_path is None:
            config_path = 'config.encrypted.json'

        encrypted_config = {}
        for key, value in config.items():
            lowered = key.lower()
            if any(sensitive in lowered for sensitive in self._SENSITIVE_KEYWORDS):
                encrypted_key = f"{key}{self._ENCRYPTED_SUFFIX}"
                encrypted_config[encrypted_key] = self.encrypt_value(str(value))
            else:
                encrypted_config[key] = value

        # Create the file with owner-only permissions from the start, so it
        # is never briefly readable by other users.
        fd = os.open(config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            json.dump(encrypted_config, f, indent=2)

        # Also tighten a pre-existing file that had looser permissions.
        os.chmod(config_path, 0o600)

    def get_with_fallback(self, key: str, default: Any = None) -> Any:
        """Look up ``key`` in the environment, then the cache, then ``default``.

        The environment variable name is ``key`` upper-cased with dots
        replaced by underscores.
        """
        env_key = key.upper().replace('.', '_')
        if env_key in os.environ:
            return os.environ[env_key]

        if key in self._config_cache:
            return self._config_cache[key]

        return default


# Usage example.
# NOTE: runs at import time, requires CONFIG_MASTER_KEY to be set, and writes
# config.encrypted.json into the working directory.
config_manager = SecureConfigManager()

# Production configuration (placeholder values)
production_config = {
    'database_url': 'postgresql://user:password@localhost/db',
    'redis_secret': 'my_redis_secret_key',
    'api_key': 'sk_live_xxxxxxxxxxxxxx',
    'debug': False,
    'log_level': 'INFO'
}

# Save the encrypted configuration
config_manager.save_config(production_config)

# Load it back
loaded_config = config_manager.load_config()
print(f"Database URL: {loaded_config.get('database_url')}")
4. 实战:安全漏洞扫描器
"""Python security vulnerability scanner."""
import ast
import os
from pathlib import Path
from typing import List, Dict, Set
import re


class _SecurityVisitor(ast.NodeVisitor):
    """AST visitor that records risky calls and hard-coded secrets."""

    _CODE_EXEC_FUNCTIONS = ('eval', 'exec', 'compile')
    _SECRET_KEYWORDS = ('password', 'secret', 'key', 'token')

    def __init__(self, filename):
        self.filename = filename
        self.vulns = []

    def visit_Call(self, node):
        # Direct calls to eval()/exec()/compile()
        if isinstance(node.func, ast.Name):
            func_name = node.func.id
            if func_name in self._CODE_EXEC_FUNCTIONS:
                self.vulns.append({
                    'file': self.filename,
                    'type': 'CODE_INJECTION',
                    'description': f'直接调用{func_name}()函数',
                    'line': node.lineno,
                    'severity': 'CRITICAL'
                })

        # <anything>.execute(f"...") -- SQL built with an f-string
        if isinstance(node.func, ast.Attribute) and node.func.attr == 'execute':
            for arg in node.args:
                if isinstance(arg, ast.JoinedStr):
                    self.vulns.append({
                        'file': self.filename,
                        'type': 'SQL_INJECTION',
                        'description': '在execute()中使用f-string',
                        'line': node.lineno,
                        'severity': 'CRITICAL'
                    })

        self.generic_visit(node)

    def visit_Assign(self, node):
        # Non-empty string constant assigned to a secret-looking name.
        # Only the first assignment target is inspected.
        first_target = node.targets[0]
        if isinstance(first_target, ast.Name):
            target_name = first_target.id.lower()
            looks_secret = any(k in target_name for k in self._SECRET_KEYWORDS)
            value = node.value
            if (looks_secret
                    and isinstance(value, ast.Constant)
                    and isinstance(value.value, str)
                    and len(value.value) > 0):
                self.vulns.append({
                    'file': self.filename,
                    'type': 'HARDCODED_SECRET',
                    'description': f'硬编码的敏感值: {target_name}',
                    'line': node.lineno,
                    'severity': 'HIGH'
                })

        self.generic_visit(node)


class PythonVulnerabilityScanner:
    """Static scanner combining text patterns with AST checks."""

    def __init__(self):
        self.vulnerabilities: List[Dict] = []

        # Literal substrings (NOT regular expressions) -> description
        self.dangerous_patterns = {
            'pickle.loads': '反序列化漏洞',
            'yaml.load': 'YAML反序列化漏洞',
            'eval(': '代码注入漏洞',
            'exec(': '代码执行漏洞',
            'os.system': '命令注入漏洞',
            'subprocess.call': '命令注入漏洞',
            'subprocess.Popen': '命令注入漏洞',
            'marshal.loads': '反序列化漏洞',
            'input(': '潜在的不安全输入',
            'getpass.getpass': '密码输入(需检查处理)',
        }

        # Regular expressions for SQL that should be parameterized
        self.sql_patterns = {
            r'execute\s*\(\s*f["\']': 'f-string SQL注入',
            r'execute\s*\(\s*["\'][^"\']*%\s*[^"\']*["\']': '字符串格式化SQL注入',
            r'execute\s*\(\s*["\'][^"\']*\+': '字符串拼接SQL注入',
        }

    @staticmethod
    def _line_of(content: str, offset: int) -> int:
        """Return the 1-based line number containing character ``offset``."""
        return content.count('\n', 0, offset) + 1

    def scan_file(self, filepath: str) -> List[Dict]:
        """Scan one file and return the findings.

        Each pattern is reported at most once per file, at its first
        occurrence. Errors are printed and whatever was found so far is
        returned; this method does not raise.
        """
        vulnerabilities = []

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()

            # Plain substring search: entries such as 'eval(' are not valid
            # regular expressions and must not be handed to re.search().
            for pattern, description in self.dangerous_patterns.items():
                offset = content.find(pattern)
                if offset != -1:
                    vulnerabilities.append({
                        'file': filepath,
                        'type': 'DANGEROUS_FUNCTION',
                        'description': description,
                        'pattern': pattern,
                        'line': self._line_of(content, offset),
                        'severity': 'HIGH'
                    })

            for pattern, description in self.sql_patterns.items():
                match = re.search(pattern, content, re.IGNORECASE)
                if match:
                    vulnerabilities.append({
                        'file': filepath,
                        'type': 'SQL_INJECTION',
                        'description': description,
                        'pattern': pattern,
                        'line': self._line_of(content, match.start()),
                        'severity': 'CRITICAL'
                    })

            try:
                tree = ast.parse(content)
            except SyntaxError:
                pass  # unparseable files only get the text-pattern checks
            else:
                vulnerabilities.extend(self._analyze_ast(tree, filepath))

        except Exception as e:
            # Scan boundary: one unreadable file must not abort a whole run.
            print(f"Error scanning {filepath}: {e}")

        return vulnerabilities

    def _analyze_ast(self, tree: ast.AST, filepath: str) -> List[Dict]:
        """Run the AST visitor over ``tree`` and return its findings."""
        visitor = _SecurityVisitor(filepath)
        visitor.visit(tree)
        return visitor.vulns

    def scan_directory(self, directory: str, extensions: Set[str] = None) -> Dict:
        """Recursively scan ``directory``.

        Args:
            directory: Root directory.
            extensions: File suffixes to scan; defaults to ``.py``/``.pyw``.

        Returns:
            Dict with ``critical``/``high``/``medium``/``low`` finding lists
            and a ``summary`` of counters.
        """
        if extensions is None:
            extensions = {'.py', '.pyw'}

        results = {
            'critical': [],
            'high': [],
            'medium': [],
            'low': [],
            'summary': {
                'total_files': 0,
                'vulnerabilities_found': 0,
                'critical_count': 0,
                'high_count': 0
            }
        }
        summary = results['summary']

        for file_path in Path(directory).rglob('*'):
            # Skip directories that merely have a matching suffix.
            if file_path.suffix not in extensions or not file_path.is_file():
                continue

            summary['total_files'] += 1
            vulns = self.scan_file(str(file_path))
            summary['vulnerabilities_found'] += len(vulns)

            for vuln in vulns:
                severity = vuln.get('severity', 'MEDIUM').lower()
                if severity == 'critical':
                    results['critical'].append(vuln)
                    summary['critical_count'] += 1
                elif severity == 'high':
                    results['high'].append(vuln)
                    summary['high_count'] += 1
                elif severity == 'medium':
                    results['medium'].append(vuln)
                else:
                    results['low'].append(vuln)

        return results

    def generate_report(self, scan_results: Dict, output_format: str = 'text') -> str:
        """Render ``scan_results`` as ``'text'`` or ``'json'``.

        The text report lists at most ten findings per severity level.

        Raises:
            ValueError: For an unsupported ``output_format``.
        """
        if output_format == 'text':
            summary = scan_results['summary']
            report_lines = [
                "=" * 60,
                "Python安全漏洞扫描报告",
                "=" * 60,
                f"扫描文件总数: {summary['total_files']}",
                f"发现漏洞总数: {summary['vulnerabilities_found']}",
                f"严重漏洞: {summary['critical_count']}",
                f"高危漏洞: {summary['high_count']}",
                "",
            ]

            for severity in ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW'):
                vulns = scan_results[severity.lower()]
                if not vulns:
                    continue
                report_lines.append(f"\n{severity}级别漏洞 ({len(vulns)}个):")
                report_lines.append("-" * 40)

                for i, vuln in enumerate(vulns[:10], 1):  # first 10 only
                    # Keep the location visually separate from the file name.
                    line_info = f" (Line {vuln['line']})" if 'line' in vuln else ""
                    report_lines.append(f"{i}. {vuln['file']}{line_info}")
                    report_lines.append(f"   类型: {vuln['type']}")
                    report_lines.append(f"   描述: {vuln['description']}")
                    report_lines.append("")

            return '\n'.join(report_lines)

        elif output_format == 'json':
            import json
            return json.dumps(scan_results, indent=2, ensure_ascii=False)

        else:
            raise ValueError(f"不支持的输出格式: {output_format}")


# Usage example
if __name__ == "__main__":
    scanner = PythonVulnerabilityScanner()

    # Scan the current directory
    results = scanner.scan_directory('.')

    # Text report
    report = scanner.generate_report(results, output_format='text')
    print(report)

    # JSON report; written as UTF-8 because it contains non-ASCII text
    json_report = scanner.generate_report(results, output_format='json')
    with open('security_scan_report.json', 'w', encoding='utf-8') as f:
        f.write(json_report)
5. 最佳实践
5.1 优先级防御措施
5.2 开发流程安全集成
"""Integrating security checks into the development workflow."""

# Example pre-commit configuration.
# NOTE(review): confirm that the 'safety' hook repository URL and revision
# below actually exist before copying this; they are kept as given.
"""
# .pre-commit-config.yaml
repos:
  - repo: https://github.com/PyCQA/bandit
    rev: 1.7.5
    hooks:
      - id: bandit
        args: ['-r', '.', '-ll']

  - repo: https://github.com/PyCQA/safety
    rev: 2.3.5
    hooks:
      - id: safety
        args: ['check', '--full-report']

  - repo: local
    hooks:
      - id: custom-security-scan
        name: Custom Security Scan
        entry: python security_scanner.py
        language: system
        pass_filenames: false
        always_run: true
"""

# Example CI/CD security pipeline.
# Later steps use `if: always()` so that one failing scanner does not skip
# the remaining scans or the report upload.
# NOTE(review): verify the `safety check` output flags against the installed
# safety version; they are kept as given.
"""
# .github/workflows/security.yml
name: Security Checks

on: [push, pull_request]

jobs:
  security:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run Bandit
        run: |
          pip install bandit
          bandit -r . -f json -o bandit-report.json

      - name: Run Safety
        if: always()
        run: |
          pip install safety
          safety check --json --output safety-report.json

      - name: Dependency Audit
        if: always()
        run: |
          pip install pip-audit
          pip-audit --format json --output pip-audit-report.json

      - name: Upload Reports
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: security-reports
          path: |
            bandit-report.json
            safety-report.json
            pip-audit-report.json
"""
6. 应急响应与漏洞修复
6.1 漏洞修复优先级矩阵
"""Vulnerability fix-priority assessment."""


class VulnerabilityPriority:
    """Scores a vulnerability on four axes and maps the score to a deadline."""

    # Each axis contributes 1-4 points; unrecognized labels contribute 0.
    _SEVERITY_SCORES = {'CRITICAL': 4, 'HIGH': 3, 'MEDIUM': 2, 'LOW': 1}
    _EXPLOIT_SCORES = {'EASY': 4, 'MODERATE': 3, 'DIFFICULT': 2, 'THEORETICAL': 1}
    _IMPACT_SCORES = {'SYSTEM': 4, 'APPLICATION': 3, 'USER': 2, 'MINIMAL': 1}
    _PREVALENCE_SCORES = {'WIDESPREAD': 4, 'COMMON': 3, 'OCCASIONAL': 2, 'RARE': 1}

    # (minimum score, timeline), checked from highest to lowest
    _TIMELINES = (
        (14, "立即修复 (24小时内)"),
        (10, "尽快修复 (72小时内)"),
        (7, "计划修复 (1周内)"),
        (4, "下次发布时修复"),
    )

    @staticmethod
    def calculate_priority(severity: str, exploitability: str,
                           impact: str, prevalence: str) -> int:
        """Return the sum of the four axis scores.

        Labels are case-insensitive and surrounding whitespace is ignored.
        With four recognized labels the result is between 4 and 16; every
        unrecognized label adds 0.
        """
        cls = VulnerabilityPriority
        axes = (
            (cls._SEVERITY_SCORES, severity),
            (cls._EXPLOIT_SCORES, exploitability),
            (cls._IMPACT_SCORES, impact),
            (cls._PREVALENCE_SCORES, prevalence),
        )
        return sum(table.get(label.strip().upper(), 0) for table, label in axes)

    @staticmethod
    def get_fix_timeline(priority_score: int) -> str:
        """Map a priority score to the remediation deadline text."""
        for threshold, timeline in VulnerabilityPriority._TIMELINES:
            if priority_score >= threshold:
                return timeline
        return "低优先级,监控即可"
6.2 安全补丁应用示例
"""Security patch application template."""
import re

# A plain (non-f) string literal opened right after ``cursor.execute(``.
_SQL_LITERAL = r"(?P<q>[\"'])(?P<sql>(?:(?!(?P=q)).)*)(?P=q)"
_EXECUTE_HEAD = r"(?P<head>cursor\.execute\s*\(\s*)"

# cursor.execute(f"... {expr} ...")
_EXECUTE_FSTRING = re.compile(
    _EXECUTE_HEAD + "f" + _SQL_LITERAL + r"\s*\)", re.DOTALL)
# cursor.execute("... %s ..." % args)
_EXECUTE_PERCENT = re.compile(
    _EXECUTE_HEAD + _SQL_LITERAL
    + r"\s*%\s*(?P<args>\([^()]*\)|[^\s(),][^(),]*?)\s*\)", re.DOTALL)
# cursor.execute("... = " + expr)
_EXECUTE_CONCAT = re.compile(
    _EXECUTE_HEAD + _SQL_LITERAL
    + r"\s*\+\s*(?P<expr>[^\s()+,\"'][^()+,\"']*?)\s*\)", re.DOTALL)

_PLACEHOLDER = re.compile(r"\{([^{}]*)\}")
# A placeholder / specifier together with the quotes that may surround it.
_QUOTED_PLACEHOLDER = re.compile(r"(['\"]?)\{[^{}]*\}\1")
_PERCENT_SPEC = re.compile(r"(['\"]?)%[sd]\1")


def _as_tuple(expressions):
    """Return source text of a tuple literal holding ``expressions``."""
    joined = ", ".join(expressions)
    return f"({joined},)" if len(expressions) == 1 else f"({joined})"


def _rewrite_fstring(match):
    """Turn an f-string query into a parameterized one, or leave it as is."""
    sql = match['sql']
    if '{{' in sql or '}}' in sql:  # escaped braces: too ambiguous to rewrite
        return match.group(0)
    expressions = [e.strip() for e in _PLACEHOLDER.findall(sql)]
    # Skip empty fields and fields carrying a conversion or format spec.
    if not expressions or any(not e or '!' in e or ':' in e for e in expressions):
        return match.group(0)
    parameterized = _QUOTED_PLACEHOLDER.sub('?', sql)
    quote = match['q']
    return f"{match['head']}{quote}{parameterized}{quote}, {_as_tuple(expressions)})"


def _rewrite_percent(match):
    """Turn a ``"..." % args`` query into a parameterized one, or leave it."""
    sql = match['sql']
    if '%%' in sql or '%(' in sql:
        return match.group(0)
    spec_count = len(_PERCENT_SPEC.findall(sql))
    raw_args = match['args'].strip()
    if raw_args.startswith('('):
        arguments = [a.strip() for a in raw_args[1:-1].split(',') if a.strip()]
    else:
        arguments = [raw_args]
    # Only rewrite when every '%' is a %s/%d with a matching argument.
    if spec_count == 0 or sql.count('%') != spec_count or len(arguments) != spec_count:
        return match.group(0)
    parameterized = _PERCENT_SPEC.sub('?', sql)
    quote = match['q']
    return f"{match['head']}{quote}{parameterized}{quote}, {_as_tuple(arguments)})"


def _rewrite_concat(match):
    """Turn a ``"..." + expr`` query into a parameterized one."""
    quote = match['q']
    expression = match['expr'].strip()
    return f"{match['head']}{quote}{match['sql']}?{quote}, {_as_tuple([expression])})"


class SecurityPatch:
    """Applies a best-effort source-level fix for one vulnerability."""

    def __init__(self, vulnerability_id: str, description: str):
        self.vulnerability_id = vulnerability_id
        self.description = description
        self.patch_applied = False        # True once apply_patch changed code
        self.verification_passed = False  # True once verify_fix succeeded

    def apply_patch(self, code_before: str) -> str:
        """Return ``code_before`` with the applicable fix applied.

        The fix is chosen from ``description`` (case-insensitive). Only SQL
        injection has an automatic rewrite; every other category, including
        XSS and command injection, returns the code unchanged.
        ``patch_applied`` records whether the code actually changed.
        """
        description = self.description.lower()

        if 'sql injection' in description:
            patched = self._fix_sql_injection(code_before)
        else:
            # No automatic fixer exists for XSS, command injection or other
            # categories; they need manual remediation.
            patched = code_before

        self.patch_applied = patched != code_before
        return patched

    def _fix_sql_injection(self, code: str) -> str:
        """Rewrite simple string-built ``cursor.execute`` calls.

        Handles a single f-string, ``"..." % args`` and ``"..." + name``
        argument. Anything more complex is left untouched. The emitted
        placeholder is ``?`` (qmark style); drivers using another paramstyle
        need the placeholder adjusted.
        """
        fixed_code = _EXECUTE_FSTRING.sub(_rewrite_fstring, code)
        fixed_code = _EXECUTE_PERCENT.sub(_rewrite_percent, fixed_code)
        fixed_code = _EXECUTE_CONCAT.sub(_rewrite_concat, fixed_code)
        return fixed_code

    def verify_fix(self, original_vuln: str, patched_code: str) -> bool:
        """Re-scan ``patched_code`` and check the finding is gone.

        Args:
            original_vuln: The ``description`` of the finding being fixed.
            patched_code: Source code after patching.

        Returns:
            ``True`` (and sets ``verification_passed``) if no finding with
            that description remains, otherwise ``False``.
        """
        # Uses the PythonVulnerabilityScanner defined elsewhere in this file.
        scanner = PythonVulnerabilityScanner()

        import os
        import tempfile
        # Written as UTF-8 because the scanner reads files as UTF-8.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False,
                                         encoding='utf-8') as tmp:
            tmp.write(patched_code)
            tmp_path = tmp.name

        try:
            vulns = scanner.scan_file(tmp_path)

            for vuln in vulns:
                if vuln['description'] == original_vuln:
                    return False

            self.verification_passed = True
            return True

        finally:
            os.unlink(tmp_path)
总结
Python安全漏洞防范是一个持续的过程,需要在开发、测试与运维的各个环节持续投入。
核心要点回顾:
永远不要信任用户输入,始终进行验证和清理
使用安全的API和库,避免自己实现安全功能
保持依赖包更新,定期进行安全扫描
实施最小权限原则和安全默认配置
建立安全开发流程和应急响应机制
明天我们将进入认证授权机制的学习,这是保护应用访问控制的关键环节。