今天我们聚焦于Web安全基础,这是保护Python Web应用免受网络攻击的关键知识。无论是使用Flask、Django还是FastAPI,安全都是Web开发的第一要务。
1. Web安全威胁全景
2. Web安全核心漏洞与防护
2.1 跨站脚本攻击(XSS)
XSS类型与原理:
"""XSS攻击类型示例"""# 1. 反射型XSS(非持久化)# 攻击URL:http://example.com/search?q=<script>alert('XSS')</script>@app.route('/search')def search(): query = request.args.get('q', '') # ❌ 危险:直接渲染用户输入 return f"<h1>搜索结果:{query}</h1>"# 2. 存储型XSS(持久化)# 攻击:将恶意脚本存入数据库,其他用户访问时执行@app.route('/comment', methods=['POST'])def add_comment(): comment = request.form.get('comment', '') # ❌ 危险:直接存储并显示用户内容 save_to_database(comment) return "评论已保存"# 3. DOM型XSS(客户端执行)# 攻击:通过修改DOM而非服务器响应执行"""<script> // 危险:从URL读取并写入DOM const userInput = new URLSearchParams(window.location.search).get('input'); document.getElementById('output').innerHTML = userInput;</script>"""
"""XSS防护完整实现"""from markupsafe import escape, Markupimport htmlimport bleachfrom typing import Unionclass XSSProtector: """XSS防护器""" @staticmethod def escape_html(text: str) -> str: """转义HTML特殊字符""" # 使用标准库 return html.escape(text) @staticmethod def safe_render(text: str, is_trusted: bool = False) -> Union[str, Markup]: """安全渲染文本""" if is_trusted: # 仅当内容完全可信时 return Markup(text) else: # 默认转义 return escape(text) @staticmethod def sanitize_html(html_content: str, allowed_tags: list = None) -> str: """净化HTML,只允许安全标签和属性""" if allowed_tags is None: allowed_tags = [ 'a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'br' ] allowed_attributes = { 'a': ['href', 'title', 'rel'], 'abbr': ['title'], 'acronym': ['title'], } # 使用bleach净化 cleaned = bleach.clean( html_content, tags=allowed_tags, attributes=allowed_attributes, strip=True, # 移除不允许的标签 strip_comments=True # 移除注释 ) # 添加rel="noopener noreferrer"到外部链接 cleaned = bleach.linkify(cleaned, callbacks=[ bleach.callbacks.nofollow, lambda attrs, new: XSSProtector._add_link_attributes(attrs, new) ]) return cleaned @staticmethod def _add_link_attributes(attrs: dict, is_new: bool) -> dict: """为链接添加安全属性""" if attrs.get('href', '').startswith(('http://', 'https://')): attrs['rel'] = 'noopener noreferrer' attrs['target'] = '_blank' return attrs @staticmethod def validate_input(input_data: Union[str, dict], max_length: int = 1000) -> bool: """验证输入数据""" if isinstance(input_data, str): # 检查长度 if len(input_data) > max_length: return False # 检查可疑模式 suspicious_patterns = [ r'<script[^>]*>', # script标签 r'javascript:', # JavaScript协议 r'on\w+\s*=', # 事件处理器 r'expression\s*\(', # CSS表达式 r'vbscript:', # VBScript ] import re for pattern in suspicious_patterns: if re.search(pattern, input_data, re.IGNORECASE): return False return True elif isinstance(input_data, dict): # 递归验证字典 return all(XSSProtector.validate_input(v) for v in input_data.values()) return True# 
# Flask integration example
from flask import Flask, request, render_template_string
import jinja2

app = Flask(__name__)

# Make sure Jinja2 autoescaping is on
app.jinja_env.autoescape = True


@app.template_filter('safe_html')
def safe_html_filter(html_text):
    """Template filter: sanitize HTML, then mark the result as safe to render."""
    return Markup(XSSProtector.sanitize_html(html_text))


@app.route('/safe_render')
def safe_render():
    """Render the ``content`` query parameter three ways: escaped, trusted, sanitized."""
    user_content = request.args.get('content', '')

    # Jinja2 autoescaping handles {{ content }}; only the constant
    # trusted_content is passed through |safe.
    return render_template_string(
        """
        <!DOCTYPE html>
        <html>
        <head>
            <meta charset="UTF-8">
            <title>安全渲染</title>
        </head>
        <body>
            <h1>用户内容:</h1>
            <div>{{ content }}</div>  <!-- 自动转义 -->

            <h1>受信任内容:</h1>
            <div>{{ trusted_content|safe }}</div>

            <h1>净化HTML:</h1>
            <div>{{ raw_html|safe_html }}</div>
        </body>
        </html>
        """,
        content=user_content,
        trusted_content="<em>这是受信任的内容</em>",
        raw_html=user_content,
    )


@app.after_request
def add_security_headers(response):
    """Attach security-related HTTP headers to every response."""
    # Content Security Policy (CSP) is the main browser-side XSS defence.
    # 'unsafe-inline' is deliberately NOT allowed in script-src: permitting
    # inline scripts lets injected <script> blocks run and defeats the
    # policy. Use nonces or hashes for inline scripts that are needed.
    csp_policy = "; ".join([
        "default-src 'self'",
        "script-src 'self' https://cdn.example.com",
        "style-src 'self' 'unsafe-inline'",
        "img-src 'self' data: https:",
        "font-src 'self'",
        "connect-src 'self'",
        "frame-ancestors 'none'",  # clickjacking protection
        "form-action 'self'",
    ])

    response.headers['Content-Security-Policy'] = csp_policy
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    # NOTE(review): X-XSS-Protection is ignored by current browsers; kept
    # for legacy clients.
    response.headers['X-XSS-Protection'] = '1; mode=block'
    response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'

    return response
2.2 跨站请求伪造(CSRF)
CSRF攻击原理:
<!-- 恶意网站中的CSRF攻击 -->
<html>
<body>
    <!-- 利用用户已登录状态发起请求 -->
    <form action="https://bank.com/transfer" method="POST" id="csrf-form">
        <input type="hidden" name="amount" value="1000">
        <input type="hidden" name="to_account" value="attacker">
    </form>
    <script>
        document.getElementById('csrf-form').submit();
    </script>
</body>
</html>
"""CSRF防护完整实现"""import secretsfrom typing import Optional, Dictfrom datetime import datetime, timedeltaimport hmacimport hashlibfrom functools import wrapsclass CSRFTokenManager: """CSRF令牌管理器""" def __init__(self, secret_key: str = None): self.secret_key = secret_key or secrets.token_hex(32) self.token_expiry = timedelta(hours=1) self.token_store: Dict[str, dict] = {} def generate_token(self, session_id: str) -> str: """生成CSRF令牌""" token = secrets.token_urlsafe(32) expires_at = datetime.now() + self.token_expiry # 创建签名 data = f"{session_id}:{token}:{expires_at.timestamp()}" signature = hmac.new( self.secret_key.encode(), data.encode(), hashlib.sha256 ).hexdigest() # 存储令牌 self.token_store[session_id] = { 'token': token, 'expires_at': expires_at, 'signature': signature } return f"{token}.{signature}" def validate_token(self, session_id: str, token_with_signature: str) -> bool: """验证CSRF令牌""" if not session_id or not token_with_signature: return False if session_id not in self.token_store: return False stored_token = self.token_store[session_id] # 检查过期 if datetime.now() > stored_token['expires_at']: del self.token_store[session_id] return False # 分离令牌和签名 if '.' 
not in token_with_signature: return False token, signature = token_with_signature.rsplit('.', 1) # 验证令牌 if token != stored_token['token']: return False # 验证签名 data = f"{session_id}:{token}:{stored_token['expires_at'].timestamp()}" expected_signature = hmac.new( self.secret_key.encode(), data.encode(), hashlib.sha256 ).hexdigest() if not hmac.compare_digest(signature, expected_signature): return False # 使用后失效(可选) # del self.token_store[session_id] return True def cleanup_expired_tokens(self): """清理过期令牌""" now = datetime.now() expired = [ sid for sid, token in self.token_store.items() if now > token['expires_at'] ] for sid in expired: del self.token_store[sid]# Flask CSRF防护装饰器from flask import session, request, abortcsrf_manager = CSRFTokenManager()def csrf_protect(f): """CSRF保护装饰器""" @wraps(f) def decorated_function(*args, **kwargs): if request.method in ['POST', 'PUT', 'DELETE', 'PATCH']: # 获取令牌 token = None # 从表单获取 if request.form: token = request.form.get('csrf_token') # 从header获取(API请求) if not token: token = request.headers.get('X-CSRF-Token') # 从session获取ID session_id = session.get('session_id') if not token or not csrf_manager.validate_token(session_id, token): abort(403, description='CSRF token验证失败') return f(*args, **kwargs) return decorated_function# Flask集成示例@app.route('/transfer', methods=['POST'])@csrf_protectdef transfer_money(): """受CSRF保护的资金转账""" amount = request.form.get('amount') to_account = request.form.get('to_account') # 业务逻辑 return f"转账 {amount} 到 {to_account} 成功"@app.route('/get_csrf_token')def get_csrf_token(): """获取CSRF令牌(通常内嵌在表单中)""" session_id = session.get('session_id') if not session_id: session_id = secrets.token_hex(16) session['session_id'] = session_id token = csrf_manager.generate_token(session_id) return jsonify({ 'csrf_token': token, 'timestamp': datetime.now().isoformat() })# 表单模板示例"""<!DOCTYPE html><html><head> <meta name="csrf-token" content="{{ csrf_token }}"></head><body> <form action="/transfer" method="POST"> <input 
type="hidden" name="csrf_token" value="{{ csrf_token }}"> <input type="number" name="amount" placeholder="金额"> <input type="text" name="to_account" placeholder="收款账户"> <button type="submit">转账</button> </form> <script> // 为AJAX请求自动添加CSRF令牌 const csrfToken = document.querySelector('meta[name="csrf-token"]').content; fetch('/api/transfer', { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-CSRF-Token': csrfToken }, body: JSON.stringify({ amount: 100, to_account: '12345' }) }); </script></body></html>"""
2.3 点击劫持防护
"""点击劫持防护实现"""from flask import make_responsedef clickjacking_protection(f): """点击劫持防护装饰器""" @wraps(f) def decorated_function(*args, **kwargs): response = make_response(f(*args, **kwargs)) # 设置X-Frame-Options response.headers['X-Frame-Options'] = 'DENY' # 或 'SAMEORIGIN' # 现代方式:Content Security Policy csp = response.headers.get('Content-Security-Policy', '') if 'frame-ancestors' not in csp: if csp: csp += '; ' csp += "frame-ancestors 'none'" response.headers['Content-Security-Policy'] = csp # 额外的点击劫持防护头 response.headers['X-Content-Type-Options'] = 'nosniff' return response return decorated_function# 使用示例@app.route('/sensitive_action')@clickjacking_protectiondef sensitive_action(): """受点击劫持保护的操作""" return render_template('sensitive.html')# JavaScript防护(额外保护)"""// 防止页面被嵌入iframeif (window !== window.top) { window.top.location = window.location;}// 或更优雅的方式if (window.self !== window.top) { document.documentElement.style.display = 'none'; window.top.location = window.self.location;}"""
3. 服务器端请求伪造(SSRF)防护
"""SSRF防护实现"""import socketimport ipaddressfrom urllib.parse import urlparsefrom typing import Optional, Tupleimport requestsimport reclass SSRFProtector: """SSRF防护器""" def __init__(self): # 不允许访问的IP范围 self.forbidden_networks = [ ipaddress.ip_network('127.0.0.0/8'), # 本地回环 ipaddress.ip_network('10.0.0.0/8'), # 私有网络 ipaddress.ip_network('172.16.0.0/12'), # 私有网络 ipaddress.ip_network('192.168.0.0/16'), # 私有网络 ipaddress.ip_network('0.0.0.0/8'), # 无效地址 ipaddress.ip_network('169.254.0.0/16'), # 链路本地 ipaddress.ip_network('224.0.0.0/4'), # 组播 ] # 允许的协议 self.allowed_schemes = {'http', 'https'} # 允许的端口 self.allowed_ports = {80, 443, 8080} def validate_url(self, url: str, allowed_hosts: set = None) -> Tuple[bool, str]: """验证URL是否安全""" try: parsed = urlparse(url) # 检查协议 if parsed.scheme not in self.allowed_schemes: return False, f"不允许的协议: {parsed.scheme}" # 解析主机名 hostname = parsed.hostname if not hostname: return False, "无效的URL" # 解析IP地址 try: # 尝试直接解析为IP ip = ipaddress.ip_address(hostname) except ValueError: # 如果是域名,解析为IP try: ip_info = socket.getaddrinfo(hostname, None) ip = ipaddress.ip_address(ip_info[0][4][0]) except socket.gaierror: return False, "无法解析主机名" # 检查是否在禁止的网络中 for network in self.forbidden_networks: if ip in network: return False, f"禁止访问的IP范围: {network}" # 检查端口 port = parsed.port if port and port not in self.allowed_ports: default_port = 443 if parsed.scheme == 'https' else 80 if port != default_port: return False, f"不允许的端口: {port}" # 检查允许的主机(白名单) if allowed_hosts: if hostname not in allowed_hosts: return False, f"主机不在白名单中: {hostname}" # 检查URL格式(防止畸形URL) if re.search(r'[<>"\']', url): return False, "URL包含非法字符" return True, "URL验证通过" except Exception as e: return False, f"URL验证失败: {str(e)}" def safe_fetch_url(self, url: str, timeout: int = 5) -> Optional[requests.Response]: """安全地获取URL内容""" # 验证URL is_valid, message = self.validate_url(url) if not is_valid: raise ValueError(f"不安全的URL: {message}") # 安全获取 try: response = requests.get( url, timeout=timeout, 
allow_redirects=False, # 禁止重定向(可被利用) verify=True # 验证SSL证书 ) # 检查响应内容类型 content_type = response.headers.get('Content-Type', '') if 'html' in content_type.lower(): # 对于HTML内容,进行额外检查 if self._contains_sensitive_data(response.text): raise ValueError("响应包含敏感数据") return response except requests.RequestException as e: raise RuntimeError(f"获取URL失败: {str(e)}") def _contains_sensitive_data(self, content: str) -> bool: """检查是否包含敏感数据""" sensitive_patterns = [ r'<meta\s+name=["\']keywords["\']', r'<title>.*(密码|秘钥|token|密钥).*</title>', r'["\'](api[_-]?key|secret|password|token)["\']\s*:\s*["\'][^"\']+["\']', ] for pattern in sensitive_patterns: if re.search(pattern, content, re.IGNORECASE): return True return False def sanitize_file_upload(self, file_content: bytes, filename: str) -> bytes: """净化上传的文件(防止XXE等攻击)""" # 检查文件类型 import magic mime = magic.Magic(mime=True) detected_type = mime.from_buffer(file_content[:1024]) # 允许的文件类型 allowed_types = { 'image/jpeg', 'image/png', 'image/gif', 'application/pdf', 'text/plain' } if detected_type not in allowed_types: raise ValueError(f"不允许的文件类型: {detected_type}") # 对于XML文件,移除DOCTYPE声明(防止XXE) if detected_type in ['text/xml', 'application/xml']: content_str = file_content.decode('utf-8', errors='ignore') # 移除DOCTYPE content_str = re.sub(r'<!DOCTYPE[^>[]*(\[[^]]*\])?>', '', content_str, flags=re.IGNORECASE) # 移除实体声明 content_str = re.sub(r'<!ENTITY[^>]*>', '', content_str, flags=re.IGNORECASE) file_content = content_str.encode('utf-8') return file_content# 使用示例@app.route('/fetch_url', methods=['POST'])def fetch_url(): """安全的URL获取端点""" url = request.form.get('url') if not url: return jsonify({'error': 'URL不能为空'}), 400 protector = SSRFProtector() try: # 验证URL is_valid, message = protector.validate_url(url) if not is_valid: return jsonify({'error': message}), 400 # 安全获取 response = protector.safe_fetch_url(url) # 处理响应 content = response.text[:1000] # 限制返回内容 return jsonify({ 'success': True, 'content': content, 'content_type': 
response.headers.get('Content-Type') }) except Exception as e: return jsonify({'error': str(e)}), 400# 白名单配置示例ALLOWED_HOSTS = { 'api.example.com', 'cdn.example.com', 'images.example.com'}@app.route('/proxy')def proxy(): """带白名单的代理""" url = request.args.get('url') protector = SSRFProtector() is_valid, message = protector.validate_url(url, ALLOWED_HOSTS) if not is_valid: return jsonify({'error': message}), 403 # ... 继续处理
4. Web框架安全特性
4.1 Django安全特性
"""Django安全配置示例"""# settings.py# 关键安全设置SECURE_BROWSER_XSS_FILTER = True # 启用XSS过滤器SECURE_CONTENT_TYPE_NOSNIFF = True # 阻止MIME类型嗅探X_FRAME_OPTIONS = 'DENY' # 点击劫持防护SECURE_REFERRER_POLICY = 'same-origin' # Referrer策略# HTTPS设置SECURE_SSL_REDIRECT = True # 重定向HTTP到HTTPSSECURE_HSTS_SECONDS = 31536000 # HSTS(1年)SECURE_HSTS_INCLUDE_SUBDOMAINS = TrueSECURE_HSTS_PRELOAD = True# Cookie安全SESSION_COOKIE_SECURE = True # 仅HTTPS传输SESSION_COOKIE_HTTPONLY = True # 防止JavaScript访问CSRF_COOKIE_SECURE = TrueCSRF_COOKIE_HTTPONLY = False # 需要JavaScript访问SESSION_COOKIE_SAMESITE = 'Lax' # CSRF防护# 密码安全AUTH_PASSWORD_VALIDATORS = [ { 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', }, { 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', 'OPTIONS': { 'min_length': 12, } }, { 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', }, { 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', },]# 自定义安全中间件MIDDLEWARE = [ 'django.middleware.security.SecurityMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware', # ... 其他中间件]# Django视图安全from django.views.decorators.csrf import csrf_protectfrom django.views.decorators.clickjacking import xframe_options_denyfrom django.middleware.csrf import get_tokenfrom django.template.context_processors import csrf@csrf_protect@xframe_options_denydef secure_view(request): """安全的Django视图""" # 获取CSRF令牌 csrf_token = get_token(request) # 模板中自动包含CSRF令牌 return render(request, 'secure_template.html', { 'csrf_token': csrf_token })# Django模板安全"""<!-- 模板中自动转义 -->{{ user_input }} <!-- 自动转义 --><!-- 标记安全内容 -->{{ trusted_content|safe }}<!-- URL安全 -->{% url 'view_name' arg1 arg2 %} <!-- 安全的URL构建 -->"""
4.2 Flask安全扩展
"""Flask安全扩展配置"""from flask import Flaskfrom flask_wtf import CSRFProtectfrom flask_talisman import Talismanfrom flask_limiter import Limiterfrom flask_limiter.util import get_remote_addressimport secretsapp = Flask(__name__)# 密钥配置app.config['SECRET_KEY'] = secrets.token_hex(32)app.config['WTF_CSRF_SECRET_KEY'] = secrets.token_hex(32)# 启用CSRF保护csrf = CSRFProtect(app)# 启用Talisman安全头talisman = Talisman( app, content_security_policy={ 'default-src': "'self'", 'style-src': ["'self'", 'https://cdn.example.com'], 'script-src': ["'self'", 'https://cdn.example.com'], }, content_security_policy_nonce_in=['script-src'], force_https=True, session_cookie_secure=True, session_cookie_http_only=True, strict_transport_security=True, strict_transport_security_max_age=31536000, frame_options='DENY', frame_options_allow_from=None, referrer_policy='strict-origin-when-cross-origin', feature_policy={ 'geolocation': "'none'", 'camera': "'none'", 'microphone': "'none'", })# 启用速率限制limiter = Limiter( get_remote_address, app=app, default_limits=["200 per day", "50 per hour"], storage_uri="memory://", # 生产环境用Redis strategy="fixed-window", # 或 "moving-window" enabled=True)# 安全路由示例@app.route('/api/sensitive')@limiter.limit("10 per minute") # 速率限制@csrf.exempt # 如果不需要CSRF(API端点)def sensitive_api(): """受保护的安全API""" # 添加安全头 response = jsonify({'status': 'success'}) response.headers['X-Content-Type-Options'] = 'nosniff' response.headers['X-Permitted-Cross-Domain-Policies'] = 'none' return response# Flask-Login集成from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_userlogin_manager = LoginManager()login_manager.init_app(app)login_manager.session_protection = "strong" # 会话保护@login_manager.user_loaderdef load_user(user_id): """用户加载回调""" return User.get(user_id)@app.route('/login', methods=['POST'])def login(): """登录端点""" username = request.form.get('username') password = request.form.get('password') user = User.query.filter_by(username=username).first() if 
user and user.check_password(password): login_user(user) # 记录登录 log_security_event('login_success', user.id, request.remote_addr) return redirect(url_for('dashboard')) # 恒定时间响应 import bcrypt bcrypt.hashpw(b"dummy", bcrypt.gensalt()) log_security_event('login_failed', username, request.remote_addr) return "无效的凭据", 401@app.route('/dashboard')@login_requireddef dashboard(): """需要登录的页面""" return f"欢迎, {current_user.username}!"
4.3 FastAPI安全特性
"""FastAPI安全配置"""from fastapi import FastAPI, Depends, HTTPException, statusfrom fastapi.security import HTTPBasic, HTTPBasicCredentials, HTTPBearer, OAuth2PasswordBearerfrom fastapi.middleware.httpsredirect import HTTPSRedirectMiddlewarefrom fastapi.middleware.trustedhost import TrustedHostMiddlewareimport secretsfrom typing import Optionalapp = FastAPI( title="安全API", description="安全的FastAPI应用", version="1.0.0", docs_url="/docs", # 生产环境可禁用 redoc_url="/redoc")# 启用HTTPS重定向app.add_middleware(HTTPSRedirectMiddleware)# 信任的主机头app.add_middleware(TrustedHostMiddleware, allowed_hosts=["example.com", "api.example.com"])# 安全方案security = HTTPBearer()oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)# 基本认证basic_auth = HTTPBasic()def verify_api_key(api_key: str = Depends(security)): """验证API密钥""" # 这里应该查询数据库 valid_keys = {"sk_test_123", "sk_live_456"} if api_key.credentials not in valid_keys: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的API密钥", headers={"WWW-Authenticate": "Bearer"} ) return api_key.credentialsdef verify_basic_auth(credentials: HTTPBasicCredentials = Depends(basic_auth)): """验证基本认证""" import bcrypt # 验证用户名密码 correct_username = secrets.compare_digest(credentials.username, "admin") correct_password = secrets.compare_digest(credentials.password, "secret") if not (correct_username and correct_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的凭据", headers={"WWW-Authenticate": "Basic"} ) return credentials.username@app.get("/secure")async def secure_endpoint( api_key: str = Depends(verify_api_key), current_user: Optional[str] = Depends(get_current_user)): """受保护的端点""" return { "message": "访问已授权", "api_key": api_key, "user": current_user }@app.get("/metrics", dependencies=[Depends(verify_basic_auth)])async def metrics(): """需要基本认证的端点""" return {"metrics": "系统指标"}# CORS配置from fastapi.middleware.cors import CORSMiddlewareapp.add_middleware( CORSMiddleware, 
allow_origins=["https://example.com"], # 明确的来源 allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["Authorization", "Content-Type"], expose_headers=["X-Request-ID"], max_age=600 # 预检请求缓存时间)# 速率限制(需要额外扩展)"""# pip install slowapifrom slowapi import Limiter, _rate_limit_exceeded_handlerfrom slowapi.util import get_remote_addressfrom slowapi.errors import RateLimitExceededlimiter = Limiter(key_func=get_remote_address)app.state.limiter = limiterapp.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)@app.get("/limited")@limiter.limit("5/minute")async def limited_endpoint(): return {"message": "速率限制测试"}"""
5. Web安全测试与监控
5.1 自动化安全测试
"""Web安全自动化测试"""import unittestfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECimport requestsimport jsonclass WebSecurityTests(unittest.TestCase): """Web安全测试套件""" def setUp(self): """测试设置""" self.base_url = "http://localhost:5000" self.session = requests.Session() # Selenium设置 options = webdriver.ChromeOptions() options.add_argument('--headless') # 无头模式 options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') self.driver = webdriver.Chrome(options=options) self.wait = WebDriverWait(self.driver, 10) def tearDown(self): """测试清理""" self.driver.quit() def test_csrf_protection(self): """测试CSRF防护""" # 尝试没有CSRF令牌的POST请求 response = self.session.post( f"{self.base_url}/transfer", data={"amount": 1000, "to_account": "attacker"} ) # 应该返回403或类似的错误 self.assertIn(response.status_code, [403, 400]) # 日志中应该记录CSRF尝试 log_entries = self.get_security_logs() self.assertTrue(any("CSRF" in entry for entry in log_entries)) def test_xss_protection(self): """测试XSS防护""" # 测试反射型XSS xss_payload = "<script>alert('XSS')</script>" response = self.session.get( f"{self.base_url}/search", params={"q": xss_payload} ) # 检查响应中是否转义了脚本标签 self.assertNotIn("<script>", response.text) self.assertIn("<script>", response.text) # 使用Selenium测试DOM XSS self.driver.get(f"{self.base_url}/test_xss") # 注入脚本 self.driver.execute_script(f""" document.getElementById('input').value = '{xss_payload}'; document.getElementById('form').submit(); """) # 检查是否执行了脚本 alerts = self.driver.switch_to.alert if self.is_alert_present() else None self.assertIsNone(alerts, "XSS防护失败:脚本被执行") def test_sql_injection(self): """测试SQL注入防护""" # 常见的SQL注入payload sql_payloads = [ "' OR '1'='1", "'; DROP TABLE users; --", "' UNION SELECT username, password FROM users --" ] for payload in sql_payloads: response = self.session.post( f"{self.base_url}/login", data={"username": payload, 
"password": "test"} ) # 不应该返回数据库错误信息 self.assertNotIn("syntax error", response.text.lower()) self.assertNotIn("mysql", response.text.lower()) self.assertNotIn("postgresql", response.text.lower()) def test_clickjacking_protection(self): """测试点击劫持防护""" response = self.session.get(f"{self.base_url}/sensitive_page") headers = response.headers # 检查安全头 self.assertIn('X-Frame-Options', headers) self.assertEqual(headers['X-Frame-Options'].upper(), 'DENY') # 检查CSP if 'Content-Security-Policy' in headers: self.assertIn("frame-ancestors", headers['Content-Security-Policy']) def test_secure_headers(self): """测试安全HTTP头""" response = self.session.get(f"{self.base_url}/") headers = response.headers # 必需的安全头 required_headers = { 'X-Content-Type-Options': 'nosniff', 'X-Frame-Options': ['DENY', 'SAMEORIGIN'], 'X-XSS-Protection': '1; mode=block', } for header, expected in required_headers.items(): self.assertIn(header, headers) if isinstance(expected, list): self.assertIn(headers[header], expected) else: self.assertEqual(headers[header], expected) # 推荐的HSTS头(如果使用HTTPS) if self.base_url.startswith("https://"): self.assertIn('Strict-Transport-Security', headers) def test_rate_limiting(self): """测试速率限制""" endpoints = [ "/login", "/api/token", "/password_reset" ] for endpoint in endpoints: # 发送快速请求 responses = [] for _ in range(20): response = self.session.post( f"{self.base_url}{endpoint}", data={"test": "data"} ) responses.append(response.status_code) # 应该有一些请求被限制(429) self.assertIn(429, responses, f"速率限制未生效:{endpoint}") def test_session_security(self): """测试会话安全""" # 登录获取会话 response = self.session.post( f"{self.base_url}/login", data={"username": "test", "password": "test"} ) cookies = self.session.cookies # 检查cookie属性 for cookie in cookies: if cookie.name.startswith('session'): self.assertTrue(cookie.secure, "会话cookie应该启用Secure") self.assertTrue(cookie.has_nonstandard_attr('HttpOnly'), "会话cookie应该启用HttpOnly") self.assertEqual(cookie.get_nonstandard_attr('SameSite'), 'Lax', 
"会话cookie应该设置SameSite=Lax") def test_error_handling(self): """测试错误处理""" # 触发错误 response = self.session.get(f"{self.base_url}/nonexistent") # 检查是否泄露敏感信息 sensitive_patterns = [ r'stack trace', r'file:.*\.py', r'line \d+', r'database password', r'api[_-]?key', r'secret' ] import re for pattern in sensitive_patterns: matches = re.findall(pattern, response.text, re.IGNORECASE) self.assertEqual(len(matches), 0, f"错误响应泄露了敏感信息: {matches}") def get_security_logs(self): """获取安全日志""" # 这里实现从日志文件或数据库获取日志 return [] def is_alert_present(self): """检查是否有alert""" try: self.driver.switch_to.alert return True except: return False# 运行测试if __name__ == '__main__': unittest.main()
5.2 安全监控与告警
"""Web安全监控系统"""import loggingfrom datetime import datetime, timedeltafrom collections import defaultdictfrom typing import Dict, List, Optionalimport reimport smtplibfrom email.mime.text import MIMETextfrom dataclasses import dataclass@dataclassclass SecurityEvent: """安全事件""" timestamp: datetime event_type: str severity: str # LOW, MEDIUM, HIGH, CRITICAL source_ip: str user_agent: str endpoint: str details: Dict user_id: Optional[str] = Noneclass SecurityMonitor: """安全监控器""" def __init__(self, alert_thresholds: Dict = None): self.events: List[SecurityEvent] = [] # 告警阈值 self.thresholds = alert_thresholds or { 'failed_logins': {'count': 5, 'window': 300}, # 5次/5分钟 'xss_attempts': {'count': 3, 'window': 300}, 'sql_injection': {'count': 2, 'window': 300}, 'rate_limit': {'count': 10, 'window': 60}, } # 配置日志 self.logger = logging.getLogger('security_monitor') self.logger.setLevel(logging.INFO) # 添加文件处理器 handler = logging.FileHandler('security.log') handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' )) self.logger.addHandler(handler) def log_event(self, event: SecurityEvent): """记录安全事件""" self.events.append(event) # 写入日志 log_message = ( f"{event.event_type} - {event.severity} - " f"IP: {event.source_ip} - " f"Endpoint: {event.endpoint} - " f"User: {event.user_id or'anonymous'}" ) if event.severity == 'CRITICAL': self.logger.critical(log_message, extra=event.details) elif event.severity == 'HIGH': self.logger.error(log_message, extra=event.details) elif event.severity == 'MEDIUM': self.logger.warning(log_message, extra=event.details) else: self.logger.info(log_message, extra=event.details) # 检查是否需要告警 self._check_alerts(event) def _check_alerts(self, event: SecurityEvent): """检查是否需要发送告警""" # 按事件类型和时间窗口过滤 window_start = datetime.now() - timedelta( seconds=self.thresholds.get(event.event_type, {}).get('window', 300) ) recent_events = [ e for e in self.events if e.event_type == event.event_type and e.timestamp > window_start ] 
threshold = self.thresholds.get(event.event_type, {}).get('count', 0) if len(recent_events) >= threshold: self._send_alert(event.event_type, recent_events) def _send_alert(self, event_type: str, events: List[SecurityEvent]): """发送告警""" alert_subject = f"安全告警: {event_type}" alert_body = f""" 安全告警通知 事件类型: {event_type} 发生时间: {datetime.now().isoformat()} 事件数量: {len(events)} 涉及IP: {', '.join(set(e.source_ip for e in events))} 最近事件: """ for i, event in enumerate(events[-5:], 1): # 只显示最近5个 alert_body += f""" {i}. 时间: {event.timestamp.isoformat()} 来源IP: {event.source_ip} 端点: {event.endpoint} 用户代理: {event.user_agent[:50]} """ # 发送邮件(示例) self._send_email( to=["security-team@example.com", "admin@example.com"], subject=alert_subject, body=alert_body ) # 也可以集成其他告警方式:Slack、Webhook等 self._send_slack_alert(event_type, events) self._send_webhook_alert(event_type, events) def _send_email(self, to: List[str], subject: str, body: str): """发送邮件告警""" try: msg = MIMEText(body, 'plain', 'utf-8') msg['Subject'] = subject msg['From'] = 'security-monitor@example.com' msg['To'] = ', '.join(to) # 实际发送邮件 # with smtplib.SMTP('smtp.example.com') as server: # server.send_message(msg) self.logger.info(f"已发送邮件告警: {subject}") except Exception as e: self.logger.error(f"发送邮件失败: {e}") def _send_slack_alert(self, event_type: str, events: List[SecurityEvent]): """发送Slack告警""" # 实现Slack集成 pass def _send_webhook_alert(self, event_type: str, events: List[SecurityEvent]): """发送Webhook告警""" # 实现Webhook集成 pass def analyze_patterns(self): """分析攻击模式""" # 按IP分析 ip_analysis = defaultdict(list) for event in self.events[-1000:]: # 分析最近1000个事件 ip_analysis[event.source_ip].append(event) # 识别可疑IP suspicious_ips = [] for ip, events in ip_analysis.items(): # 多种攻击尝试 event_types = {e.event_type for e in events} if len(event_types) >= 3: suspicious_ips.append({ 'ip': ip, 'event_count': len(events), 'event_types': list(event_types) }) return suspicious_ips def generate_report(self, days: int = 7) -> Dict: """生成安全报告""" cutoff 
= datetime.now() - timedelta(days=days) recent_events = [e for e in self.events if e.timestamp > cutoff] report = { 'period': f"{days}天", 'total_events': len(recent_events), 'by_severity': defaultdict(int), 'by_type': defaultdict(int), 'top_ips': defaultdict(int), 'top_endpoints': defaultdict(int), 'timeline': defaultdict(int) } for event in recent_events: # 按严重程度统计 report['by_severity'][event.severity] += 1 # 按事件类型统计 report['by_type'][event.event_type] += 1 # 按IP统计 report['top_ips'][event.source_ip] += 1 # 按端点统计 report['top_endpoints'][event.endpoint] += 1 # 按小时统计时间线 hour = event.timestamp.strftime('%Y-%m-%d %H:00') report['timeline'][hour] += 1 return report# Flask中间件集成class SecurityMonitoringMiddleware: """安全监控中间件""" def __init__(self, app, monitor: SecurityMonitor): self.app = app self.monitor = monitor def __call__(self, environ, start_response): # 请求前处理 request_start = datetime.now() def custom_start_response(status, headers, exc_info=None): # 请求后处理 request_end = datetime.now() duration = (request_end - request_start).total_seconds() # 提取信息 path = environ.get('PATH_INFO', '') method = environ.get('REQUEST_METHOD', '') ip = environ.get('REMOTE_ADDR', '') user_agent = environ.get('HTTP_USER_AGENT', '') # 检查可疑请求 self._check_suspicious_request( path, method, ip, user_agent, duration, status ) return start_response(status, headers, exc_info) return self.app(environ, custom_start_response) def _check_suspicious_request(self, path, method, ip, user_agent, duration, status): """检查可疑请求""" # 检查SQL注入 if self._detect_sql_injection(path): event = SecurityEvent( timestamp=datetime.now(), event_type='sql_injection', severity='HIGH', source_ip=ip, user_agent=user_agent, endpoint=path, details={'method': method, 'path': path} ) self.monitor.log_event(event) # 检查XSS尝试 if self._detect_xss_attempt(path): event = SecurityEvent( timestamp=datetime.now(), event_type='xss_attempt', severity='MEDIUM', source_ip=ip, user_agent=user_agent, endpoint=path, details={'method': method, 
'path': path} ) self.monitor.log_event(event) # 检查异常响应时间 if duration > 5: # 5秒阈值 event = SecurityEvent( timestamp=datetime.now(), event_type='slow_request', severity='LOW', source_ip=ip, user_agent=user_agent, endpoint=path, details={'method': method, 'duration': duration, 'status': status} ) self.monitor.log_event(event) # 检查可疑用户代理 if self._is_suspicious_user_agent(user_agent): event = SecurityEvent( timestamp=datetime.now(), event_type='suspicious_ua', severity='LOW', source_ip=ip, user_agent=user_agent, endpoint=path, details={'method': method, 'user_agent': user_agent} ) self.monitor.log_event(event) def _detect_sql_injection(self, path: str) -> bool: """检测SQL注入尝试""" sql_patterns = [ r"'\s+OR\s+['1']=[('1']", r"'\s+UNION\s+SELECT", r"';.*--", r"'\s+AND\s+\d+=\d+", r"EXEC(\s+|\().*", ] for pattern in sql_patterns: if re.search(pattern, path, re.IGNORECASE): return True return False def _detect_xss_attempt(self, path: str) -> bool: """检测XSS尝试""" xss_patterns = [ r"<script[^>]*>", r"javascript:", r"on\w+\s*=", r"expression\s*\(", r"vbscript:", ] for pattern in xss_patterns: if re.search(pattern, path, re.IGNORECASE): return True return False def _is_suspicious_user_agent(self, user_agent: str) -> bool: """检查可疑用户代理""" suspicious_agents = [ 'sqlmap', 'nmap', 'nikto', 'wget', 'curl', 'acunetix', 'appscan', 'burp', 'zap' ] ua_lower = user_agent.lower() return any(agent in ua_lower for agent in suspicious_agents)
总结
Python Web安全是一个多层次、持续的过程:
关键行动:
从项目开始就考虑安全设计
使用安全框架和库,避免自己实现安全功能
实施自动化安全测试和监控
建立安全开发生命周期(SDLC)
持续学习和适应新的安全威胁
明天我们将进入网络编程基础的学习,这是理解Web底层通信和安全的基础。