经过安全编程和漏洞防范的学习,今天我们聚焦于应用安全的核心——认证与授权机制,这是保护系统访问控制的关键防线。
1. 认证与授权基础概念
1.1 核心区别
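| 概念 | 核心问题 | 发生时机 | 典型手段 |
|---|---|---|---|
| 认证 (Authentication) | 你是谁? | 访问系统之前,先确认身份 | 密码、多因素认证(MFA)、令牌 |
| 授权 (Authorization) | 你能做什么? | 身份确认之后,判定可执行的操作 | RBAC、ABAC、权限策略 |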
1.2 Python认证授权生态系统
2. 完整的认证系统实现
2.1 用户模型与密码管理
"""安全的用户认证系统"""import bcryptimport secretsfrom datetime import datetime, timedeltafrom typing import Optional, Dict, Listfrom dataclasses import dataclass, asdictfrom enum import Enumimport reclass PasswordStrength(Enum): """密码强度级别""" WEAK = 1 MEDIUM = 2 STRONG = 3@dataclassclass User: """用户数据模型""" id: str username: str email: str password_hash: bytes is_active: bool = True is_locked: bool = False failed_attempts: int = 0 last_login: Optional[datetime] = None created_at: datetime = datetime.now() mfa_enabled: bool = False mfa_secret: Optional[str] = None def to_dict(self) -> Dict: """转换为字典,排除敏感信息""" data = asdict(self) # 移除敏感字段 data.pop('password_hash', None) data.pop('mfa_secret', None) # 转换datetime为字符串 for key, value in data.items(): if isinstance(value, datetime): data[key] = value.isoformat() return dataclass PasswordValidator: """密码验证器""" @staticmethod def validate_strength(password: str) -> PasswordStrength: """验证密码强度""" score = 0 # 长度检查 if len(password) >= 12: score += 1 # 字符多样性 if re.search(r'[a-z]', password): score += 1 if re.search(r'[A-Z]', password): score += 1 if re.search(r'\d', password): score += 1 if re.search(r'[!@#$%^&*(),.?":{}|<>]', password): score += 1 # 常见密码检查 common_passwords = { 'password', '123456', 'qwerty', 'admin', 'welcome' } if password.lower() in common_passwords: return PasswordStrength.WEAK # 评估强度 if score >= 4: return PasswordStrength.STRONG elif score >= 3: return PasswordStrength.MEDIUM else: return PasswordStrength.WEAK @staticmethod def is_breached(password: str, cache_file: str = 'breached_passwords.txt') -> bool: """检查密码是否在已知泄露密码中""" try: with open(cache_file, 'r') as f: breached_passwords = set(line.strip() for line in f) # 简单实现:实际中应使用哈希值检查 return password in breached_passwords except FileNotFoundError: # 可在此处集成HaveIBeenPwned API return Falseclass AuthenticationManager: """认证管理器""" def __init__(self, pepper: Optional[str] = None): # 全局pepper(应存储在安全的地方) self.pepper = pepper or secrets.token_hex(32) def hash_password(self, 
password: str) -> bytes: """安全地哈希密码""" # 添加pepper增加安全性 peppered_password = password + self.pepper # 使用bcrypt(推荐) salt = bcrypt.gensalt(rounds=14) # 适当的工作因子 password_hash = bcrypt.hashpw(peppered_password.encode(), salt) return password_hash def verify_password(self, password: str, password_hash: bytes) -> bool: """验证密码(恒定时间比较)""" # 恒定时间比较防止时序攻击 peppered_password = password + self.pepper # 使用bcrypt的恒定时间比较 try: return bcrypt.checkpw(peppered_password.encode(), password_hash) except (ValueError, TypeError): # 如果哈希格式无效,也返回False但保持恒定时间 bcrypt.hashpw(b"dummy", bcrypt.gensalt()) # 恒定时间操作 return False def generate_session_token(self, user_id: str) -> str: """生成安全的会话令牌""" # 使用secrets模块生成密码学安全的令牌 token = secrets.token_urlsafe(64) # 添加用户ID和过期时间的签名 import hashlib import hmac import json payload = { 'user_id': user_id, 'token': token, 'expires': (datetime.now() + timedelta(hours=24)).isoformat() } # 创建签名 signature = hmac.new( self.pepper.encode(), json.dumps(payload, sort_keys=True).encode(), hashlib.sha256 ).hexdigest() # 返回组合令牌 return f"{token}.{signature}" def validate_session_token(self, token: str) -> Optional[str]: """验证会话令牌""" if not token or '.' not in token: return None token_part, signature = token.rsplit('.', 1) # 实际实现中会解码payload并验证签名 # 这里简化处理 return token_part if len(signature) == 64 else None
2.2 多因素认证实现
"""多因素认证(MFA)实现"""import pyotpimport qrcodeimport base64from io import BytesIOfrom typing import Tuple, Optionalimport timeclass MFAManager: """多因素认证管理器""" def __init__(self): self.totp_window = 1 # 允许的时间窗口 def setup_mfa(self, user_id: str, user_email: str) -> Tuple[str, bytes]: """设置MFA,返回密钥和QR码""" # 生成随机密钥 secret = pyotp.random_base32() # 创建TOTP对象 totp = pyotp.TOTP(secret) # 生成配置URI issuer = "MySecureApp" uri = totp.provisioning_uri( name=user_email, issuer_name=issuer ) # 生成QR码 qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, ) qr.add_data(uri) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") # 转换为bytes img_bytes = BytesIO() img.save(img_bytes, format='PNG') img_bytes.seek(0) return secret, img_bytes.getvalue() def verify_totp(self, secret: str, code: str) -> bool: """验证TOTP代码""" totp = pyotp.TOTP(secret) # 验证当前时间窗口 current_time = time.time() return totp.verify(code, for_time=int(current_time), valid_window=self.totp_window) def generate_backup_codes(self, count: int = 10) -> List[str]: """生成备份代码""" backup_codes = [] for _ in range(count): # 生成易于输入的代码 code = f"{secrets.randbelow(10000):04d}-{secrets.randbelow(10000):04d}" backup_codes.append(code) # 安全存储这些代码的哈希值 return backup_codes def verify_backup_code(self, provided_code: str, stored_hashes: List[str]) -> bool: """验证备份代码""" import hashlib # 对提供的代码进行哈希 code_hash = hashlib.sha256(provided_code.encode()).hexdigest() # 恒定时间比较 for stored_hash in stored_hashes: if secrets.compare_digest(code_hash, stored_hash): return True return Falseclass RateLimitedMFAManager(MFAManager): """带有速率限制的MFA管理器""" def __init__(self): super().__init__() self.failed_attempts: Dict[str, List[float]] = {} self.max_attempts = 5 self.lockout_time = 300 # 5分钟 def verify_with_rate_limit(self, user_id: str, secret: str, code: str) -> Tuple[bool, Optional[str]]: """带速率限制的验证""" # 检查是否被锁定 if self._is_locked(user_id): lockout_until = 
self.failed_attempts[user_id][0] + self.lockout_time remaining = lockout_until - time.time() return False, f"Account locked. Try again in {int(remaining)} seconds" # 验证代码 is_valid = self.verify_totp(secret, code) if not is_valid: # 记录失败尝试 self._record_failure(user_id) attempts = len(self.failed_attempts.get(user_id, [])) remaining_attempts = self.max_attempts - attempts if remaining_attempts <= 0: return False, "Account locked due to too many failed attempts" else: return False, f"Invalid code. {remaining_attempts} attempts remaining" # 验证成功,清除失败记录 if user_id in self.failed_attempts: del self.failed_attempts[user_id] return True, None def _record_failure(self, user_id: str): """记录失败尝试""" if user_id not in self.failed_attempts: self.failed_attempts[user_id] = [] current_time = time.time() self.failed_attempts[user_id].append(current_time) # 清理过期的失败记录 cutoff = current_time - self.lockout_time self.failed_attempts[user_id] = [ t for t in self.failed_attempts[user_id] if t > cutoff ] def _is_locked(self, user_id: str) -> bool: """检查账户是否被锁定""" if user_id not in self.failed_attempts: return False attempts = self.failed_attempts[user_id] # 检查是否超过最大尝试次数 if len(attempts) >= self.max_attempts: # 检查锁定期是否已过 oldest_attempt = attempts[0] if time.time() - oldest_attempt < self.lockout_time: return True return False
3. 授权系统实现
3.1 基于角色的访问控制(RBAC)
"""基于角色的访问控制系统"""from enum import Enumfrom typing import Set, List, Dictfrom dataclasses import dataclass, fieldimport jsonclass Permission(Enum): """权限枚举""" # 用户权限 USER_CREATE = "user:create" USER_READ = "user:read" USER_UPDATE = "user:update" USER_DELETE = "user:delete" # 文章权限 ARTICLE_CREATE = "article:create" ARTICLE_READ = "article:read" ARTICLE_UPDATE = "article:update" ARTICLE_DELETE = "article:delete" # 管理权限 ADMIN_ACCESS = "admin:access" SYSTEM_CONFIG = "system:config" AUDIT_LOG = "audit:log"@dataclassclass Role: """角色定义""" name: str description: str permissions: Set[Permission] = field(default_factory=set) inherits: List[str] = field(default_factory=list) # 继承的角色 def has_permission(self, permission: Permission) -> bool: """检查是否拥有指定权限""" return permission in self.permissions def add_permission(self, permission: Permission): """添加权限""" self.permissions.add(permission) def remove_permission(self, permission: Permission): """移除权限""" self.permissions.discard(permission)class RBACManager: """RBAC管理器""" def __init__(self): self.roles: Dict[str, Role] = {} self._initialize_default_roles() def _initialize_default_roles(self): """初始化默认角色""" # 管理员角色 admin_role = Role( name="admin", description="系统管理员", permissions={ Permission.USER_CREATE, Permission.USER_READ, Permission.USER_UPDATE, Permission.USER_DELETE, Permission.ARTICLE_CREATE, Permission.ARTICLE_READ, Permission.ARTICLE_UPDATE, Permission.ARTICLE_DELETE, Permission.ADMIN_ACCESS, Permission.SYSTEM_CONFIG, Permission.AUDIT_LOG, } ) # 编辑角色 editor_role = Role( name="editor", description="内容编辑", permissions={ Permission.ARTICLE_CREATE, Permission.ARTICLE_READ, Permission.ARTICLE_UPDATE, Permission.ARTICLE_DELETE, Permission.USER_READ, } ) # 查看者角色 viewer_role = Role( name="viewer", description="内容查看者", permissions={ Permission.ARTICLE_READ, Permission.USER_READ, } ) # 注册角色 self.add_role(admin_role) self.add_role(editor_role) self.add_role(viewer_role) def add_role(self, role: Role): """添加角色""" self.roles[role.name] 
= role def get_user_permissions(self, user_roles: List[str]) -> Set[Permission]: """获取用户的所有权限(考虑继承)""" all_permissions = set() visited_roles = set() def collect_permissions(role_name: str): """递归收集权限""" if role_name in visited_roles: return visited_roles.add(role_name) role = self.roles.get(role_name) if role: # 添加当前角色的权限 all_permissions.update(role.permissions) # 递归处理继承的角色 for inherited_role in role.inherits: collect_permissions(inherited_role) # 收集所有角色的权限 for role_name in user_roles: collect_permissions(role_name) return all_permissions def check_permission(self, user_roles: List[str], permission: Permission) -> bool: """检查用户是否拥有指定权限""" user_permissions = self.get_user_permissions(user_roles) return permission in user_permissions def create_custom_role(self, name: str, description: str, base_roles: List[str] = None, additional_permissions: List[Permission] = None) -> Role: """创建自定义角色""" custom_role = Role(name=name, description=description) # 继承基础角色的权限 if base_roles: for base_role in base_roles: if base_role in self.roles: custom_role.permissions.update(self.roles[base_role].permissions) # 添加额外权限 if additional_permissions: for perm in additional_permissions: custom_role.add_permission(perm) # 记录继承关系 if base_roles: custom_role.inherits.extend(base_roles) self.add_role(custom_role) return custom_roleclass AuthorizationMiddleware: """授权中间件""" def __init__(self, rbac_manager: RBACManager): self.rbac = rbac_manager def authorize(self, user_roles: List[str], required_permission: Permission, resource_owner: str = None, user_id: str = None) -> bool: """授权检查""" # 基本权限检查 if not self.rbac.check_permission(user_roles, required_permission): return False # 资源所有者检查(如果适用) if resource_owner and user_id: # 如果是资源所有者,可能拥有额外权限 if resource_owner == user_id: # 检查是否有所有者权限 owner_permission = Permission(f"{required_permission.value}:own") if owner_permission in self.rbac.get_user_permissions(user_roles): return True return True def filter_resources(self, user_roles: List[str], resources: 
List[Dict], required_permission: Permission) -> List[Dict]: """过滤用户有权访问的资源""" user_permissions = self.rbac.get_user_permissions(user_roles) if required_permission not in user_permissions: return [] # 在实际应用中,这里可能包含更复杂的逻辑 # 例如:基于资源属性或所有权的过滤 return resources
3.2 基于属性的访问控制(ABAC)
"""基于属性的访问控制系统"""from typing import Any, Dict, Callablefrom enum import Enumimport reclass AttributeType(Enum): """属性类型""" USER = "user" RESOURCE = "resource" ACTION = "action" ENVIRONMENT = "environment"class PolicyCondition: """策略条件""" def __init__(self, attribute: str, operator: str, value: Any): self.attribute = attribute # 例如: user.role, resource.type self.operator = operator # eq, neq, gt, lt, in, contains, regex self.value = value def evaluate(self, context: Dict[str, Any]) -> bool: """评估条件""" # 获取属性值 attr_value = self._get_attribute_value(self.attribute, context) # 根据操作符评估 if self.operator == "eq": return attr_value == self.value elif self.operator == "neq": return attr_value != self.value elif self.operator == "gt": return attr_value > self.value elif self.operator == "lt": return attr_value < self.value elif self.operator == "in": return attr_value in self.value elif self.operator == "contains": return self.value in attr_value elif self.operator == "regex": return bool(re.match(self.value, str(attr_value))) else: raise ValueError(f"Unknown operator: {self.operator}") def _get_attribute_value(self, attribute_path: str, context: Dict) -> Any: """从上下文中获取属性值""" parts = attribute_path.split('.') value = context for part in parts: if isinstance(value, dict) and part in value: value = value[part] else: # 尝试从不同命名空间获取 for namespace in ['user', 'resource', 'action', 'environment']: if namespace in context and isinstance(context[namespace], dict): if part in context[namespace]: value = context[namespace][part] break else: return None return valueclass ABACPolicy: """ABAC策略""" def __init__(self, name: str, effect: str = "allow"): self.name = name self.effect = effect # allow 或 deny self.conditions: List[PolicyCondition] = [] def add_condition(self, condition: PolicyCondition): """添加条件""" self.conditions.append(condition) def evaluate(self, context: Dict[str, Any]) -> bool: """评估策略""" # 所有条件都必须满足 for condition in self.conditions: if not condition.evaluate(context): 
return False return Trueclass ABACManager: """ABAC管理器""" def __init__(self): self.policies: List[ABACPolicy] = [] def add_policy(self, policy: ABACPolicy): """添加策略""" self.policies.append(policy) def evaluate_access(self, context: Dict[str, Any]) -> Dict[str, Any]: """评估访问请求""" result = { "allowed": False, "denied": False, "evaluated_policies": [], "matching_policies": [], "denying_policies": [] } # 按特定顺序评估策略 # 通常:显式deny优先,然后显式allow for policy in self.policies: if policy.evaluate(context): result["evaluated_policies"].append(policy.name) if policy.effect == "deny": result["denied"] = True result["denying_policies"].append(policy.name) elif policy.effect == "allow": result["matching_policies"].append(policy.name) # 确定最终结果 if result["denied"]: result["allowed"] = False elif result["matching_policies"]: result["allowed"] = True return result def create_time_based_policy(self, name: str, start_time: str, end_time: str, days_of_week: List[int] = None) -> ABACPolicy: """创建基于时间的策略""" policy = ABACPolicy(name, effect="allow") # 时间条件 time_condition = PolicyCondition( attribute="environment.current_time", operator="gt", value=start_time ) policy.add_condition(time_condition) time_condition2 = PolicyCondition( attribute="environment.current_time", operator="lt", value=end_time ) policy.add_condition(time_condition2) # 星期几条件(如果指定) if days_of_week: day_condition = PolicyCondition( attribute="environment.day_of_week", operator="in", value=days_of_week ) policy.add_condition(day_condition) return policy def create_location_based_policy(self, name: str, allowed_ips: List[str] = None, allowed_countries: List[str] = None) -> ABACPolicy: """创建基于位置的策略""" policy = ABACPolicy(name, effect="allow") # IP地址条件 if allowed_ips: ip_condition = PolicyCondition( attribute="environment.client_ip", operator="in", value=allowed_ips ) policy.add_condition(ip_condition) # 国家条件(需要IP地理位置服务) if allowed_countries: country_condition = PolicyCondition( attribute="environment.country", operator="in", 
value=allowed_countries ) policy.add_condition(country_condition) return policy# 使用示例def demonstrate_abac(): """演示ABAC使用""" abac = ABACManager() # 创建时间限制策略:只在工作时间访问 work_hours_policy = abac.create_time_based_policy( name="work_hours_only", start_time="09:00", end_time="17:00", days_of_week=[1, 2, 3, 4, 5] # 周一到周五 ) # 创建位置限制策略:只允许公司IP访问 company_ip_policy = abac.create_location_based_policy( name="company_network_only", allowed_ips=["192.168.1.0/24", "10.0.0.0/8"] ) # 添加自定义策略:只有高级用户可以访问敏感数据 sensitive_data_policy = ABACPolicy("sensitive_data_access", effect="allow") sensitive_data_policy.add_condition(PolicyCondition( attribute="user.role", operator="eq", value="senior_analyst" )) sensitive_data_policy.add_condition(PolicyCondition( attribute="resource.sensitivity_level", operator="lte", # 小于等于 value=user_clearance_level # 从用户属性获取 )) abac.add_policy(work_hours_policy) abac.add_policy(company_ip_policy) abac.add_policy(sensitive_data_policy) # 评估访问请求 context = { "user": { "id": "user123", "role": "senior_analyst", "clearance_level": 3 }, "resource": { "id": "report_456", "type": "financial_report", "sensitivity_level": 2, "owner": "user123" }, "action": "read", "environment": { "current_time": "14:30", "day_of_week": 2, # 周二 "client_ip": "192.168.1.100" } } result = abac.evaluate_access(context) return result
4. JWT与OAuth实现
4.1 JWT令牌管理
"""JWT令牌实现"""import jwtimport datetimefrom typing import Dict, Optional, Anyfrom cryptography.hazmat.primitives import serializationfrom cryptography.hazmat.primitives.asymmetric import rsaimport secretsclass JWTManager: """JWT令牌管理器""" def __init__(self, algorithm: str = "RS256"): self.algorithm = algorithm # 生成密钥对 self.private_key, self.public_key = self._generate_key_pair() # 令牌配置 self.access_token_expiry = datetime.timedelta(minutes=15) self.refresh_token_expiry = datetime.timedelta(days=7) def _generate_key_pair(self) -> tuple: """生成RSA密钥对""" private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048 ) public_key = private_key.public_key() # 序列化密钥 private_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) public_pem = public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) return private_pem, public_pem def create_access_token(self, user_id: str, additional_claims: Dict[str, Any] = None) -> str: """创建访问令牌""" now = datetime.datetime.utcnow() expiry = now + self.access_token_expiry payload = { "sub": user_id, "iat": now, "exp": expiry, "type": "access", "jti": secrets.token_hex(16) # 唯一标识符 } # 添加额外声明 if additional_claims: payload.update(additional_claims) # 生成令牌 token = jwt.encode( payload, self.private_key, algorithm=self.algorithm ) return token def create_refresh_token(self, user_id: str) -> str: """创建刷新令牌""" now = datetime.datetime.utcnow() expiry = now + self.refresh_token_expiry payload = { "sub": user_id, "iat": now, "exp": expiry, "type": "refresh", "jti": secrets.token_hex(16) } token = jwt.encode( payload, self.private_key, algorithm=self.algorithm ) return token def verify_token(self, token: str, token_type: str = "access") -> Optional[Dict]: """验证令牌""" try: # 解码并验证令牌 payload = jwt.decode( token, self.public_key, algorithms=[self.algorithm], options={ "require": 
["exp", "iat", "sub", "type", "jti"], "verify_exp": True, "verify_iat": True } ) # 验证令牌类型 if payload.get("type") != token_type: return None return payload except jwt.InvalidTokenError as e: # 记录错误但不泄露细节 print(f"Token validation failed: {type(e).__name__}") return None def refresh_access_token(self, refresh_token: str) -> Optional[tuple]: """使用刷新令牌获取新的访问令牌""" # 验证刷新令牌 payload = self.verify_token(refresh_token, "refresh") if not payload: return None user_id = payload["sub"] # 创建新的访问令牌 new_access_token = self.create_access_token(user_id) # 可选:创建新的刷新令牌(滑动过期) new_refresh_token = self.create_refresh_token(user_id) return new_access_token, new_refresh_tokenclass TokenBlacklist: """令牌黑名单管理""" def __init__(self, redis_client=None): self.redis = redis_client self.in_memory_blacklist = set() def add_to_blacklist(self, token_jti: str, expiry_timestamp: float): """将令牌加入黑名单""" if self.redis: # 使用Redis,自动过期 self.redis.setex( f"blacklist:{token_jti}", int(expiry_timestamp - datetime.datetime.utcnow().timestamp()), "1" ) else: # 内存存储(不适合生产环境) self.in_memory_blacklist.add(token_jti) def is_blacklisted(self, token_jti: str) -> bool: """检查令牌是否在黑名单中""" if self.redis: return self.redis.exists(f"blacklist:{token_jti}") == 1 else: return token_jti in self.in_memory_blacklist def revoke_user_tokens(self, user_id: str): """撤销用户的所有令牌""" # 在实际实现中,这可能需要记录到数据库 # 然后在验证时检查用户ID是否在撤销列表中 pass
4.2 OAuth 2.0客户端实现
"""OAuth 2.0客户端实现"""from typing import Dict, Optionalimport requestsfrom urllib.parse import urlencode, urlparse, parse_qsimport secretsclass OAuthClient: """OAuth 2.0客户端""" def __init__(self, client_id: str, client_secret: str, redirect_uri: str, auth_server_url: str): self.client_id = client_id self.client_secret = client_secret self.redirect_uri = redirect_uri self.auth_server_url = auth_server_url.rstrip('/') # 状态缓存(生产环境应使用数据库) self.state_store = {} def get_authorization_url(self, scope: str = "", state: str = None) -> str: """获取授权URL""" if state is None: state = secrets.token_urlsafe(16) # 保存状态用于验证 self.state_store[state] = { "created": datetime.datetime.now(), "used": False } params = { "response_type": "code", "client_id": self.client_id, "redirect_uri": self.redirect_uri, "scope": scope, "state": state } return f"{self.auth_server_url}/authorize?{urlencode(params)}" def handle_callback(self, code: str, state: str) -> Optional[Dict]: """处理授权回调""" # 验证state参数 if state not in self.state_store: return None state_info = self.state_store[state] # 检查state是否已使用或过期 if (state_info["used"] or (datetime.datetime.now() - state_info["created"]).seconds > 300): del self.state_store[state] return None # 标记为已使用 state_info["used"] = True # 交换令牌 token_data = self.exchange_code_for_token(code) return token_data def exchange_code_for_token(self, code: str) -> Optional[Dict]: """使用授权码交换访问令牌""" token_url = f"{self.auth_server_url}/token" data = { "grant_type": "authorization_code", "code": code, "redirect_uri": self.redirect_uri, "client_id": self.client_id, "client_secret": self.client_secret } headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json" } try: response = requests.post(token_url, data=data, headers=headers) response.raise_for_status() token_data = response.json() # 添加过期时间戳 if "expires_in" in token_data: expires_at = datetime.datetime.now() + datetime.timedelta( seconds=token_data["expires_in"] ) token_data["expires_at"] = 
expires_at.isoformat() return token_data except requests.RequestException as e: print(f"Token exchange failed: {e}") return None def refresh_token(self, refresh_token: str) -> Optional[Dict]: """刷新访问令牌""" refresh_url = f"{self.auth_server_url}/token" data = { "grant_type": "refresh_token", "refresh_token": refresh_token, "client_id": self.client_id, "client_secret": self.client_secret } headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json" } try: response = requests.post(refresh_url, data=data, headers=headers) response.raise_for_status() return response.json() except requests.RequestException as e: print(f"Token refresh failed: {e}") return None def make_authenticated_request(self, url: str, access_token: str, method: str = "GET", **kwargs) -> Optional[requests.Response]: """使用访问令牌进行认证请求""" headers = kwargs.get("headers", {}) headers["Authorization"] = f"Bearer {access_token}" kwargs["headers"] = headers try: if method.upper() == "GET": response = requests.get(url, **kwargs) elif method.upper() == "POST": response = requests.post(url, **kwargs) elif method.upper() == "PUT": response = requests.put(url, **kwargs) elif method.upper() == "DELETE": response = requests.delete(url, **kwargs) else: raise ValueError(f"Unsupported method: {method}") response.raise_for_status() return response except requests.RequestException as e: print(f"Authenticated request failed: {e}") return Noneclass PKCEOAuthClient(OAuthClient): """PKCE增强的OAuth客户端""" def get_authorization_url(self, scope: str = "", state: str = None) -> str: """使用PKCE的授权URL""" if state is None: state = secrets.token_urlsafe(16) # 生成PKCE code_verifier和code_challenge code_verifier = secrets.token_urlsafe(64) code_challenge = self._create_code_challenge(code_verifier) # 保存状态和code_verifier self.state_store[state] = { "created": datetime.datetime.now(), "used": False, "code_verifier": code_verifier } params = { "response_type": "code", "client_id": self.client_id, "redirect_uri": 
self.redirect_uri, "scope": scope, "state": state, "code_challenge": code_challenge, "code_challenge_method": "S256" } return f"{self.auth_server_url}/authorize?{urlencode(params)}" def exchange_code_for_token(self, code: str, state: str) -> Optional[Dict]: """使用PKCE交换令牌""" if state not in self.state_store: return None code_verifier = self.state_store[state].get("code_verifier") if not code_verifier: return None token_url = f"{self.auth_server_url}/token" data = { "grant_type": "authorization_code", "code": code, "redirect_uri": self.redirect_uri, "client_id": self.client_id, "code_verifier": code_verifier # 注意:PKCE不需要client_secret } headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json" } try: response = requests.post(token_url, data=data, headers=headers) response.raise_for_status() return response.json() except requests.RequestException as e: print(f"PKCE token exchange failed: {e}") return None def _create_code_challenge(self, code_verifier: str) -> str: """创建PKCE code_challenge""" import hashlib import base64 # 计算SHA256哈希 sha256_hash = hashlib.sha256(code_verifier.encode()).digest() # Base64 URL安全编码 code_challenge = base64.urlsafe_b64encode(sha256_hash).decode().rstrip('=') return code_challenge
5. 完整的安全Web应用示例
"""完整的Flask安全应用示例"""from flask import Flask, request, jsonify, session, redirect, url_forfrom flask_limiter import Limiterfrom flask_limiter.util import get_remote_addressfrom functools import wrapsimport loggingfrom typing import Callable, Any# 初始化应用app = Flask(__name__)app.config['SECRET_KEY'] = secrets.token_hex(32)app.config['SESSION_COOKIE_HTTPONLY'] = Trueapp.config['SESSION_COOKIE_SECURE'] = True # 生产环境启用app.config['PERMANENT_SESSION_LIFETIME'] = 3600 # 1小时# 速率限制limiter = Limiter( get_remote_address, app=app, default_limits=["200 per day", "50 per hour"], storage_uri="memory://",)# 初始化管理器auth_manager = AuthenticationManager()rbac_manager = RBACManager()jwt_manager = JWTManager()# 设置日志logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)def require_auth(f: Callable) -> Callable: """认证装饰器""" @wraps(f) def decorated(*args, **kwargs): # 从请求头获取令牌 auth_header = request.headers.get('Authorization') if not auth_header or not auth_header.startswith('Bearer '): return jsonify({'error': 'Missing or invalid token'}), 401 token = auth_header.split(' ')[1] # 验证令牌 payload = jwt_manager.verify_token(token) if not payload: return jsonify({'error': 'Invalid or expired token'}), 401 # 将用户信息添加到请求上下文 request.user_id = payload['sub'] request.user_claims = payload return f(*args, **kwargs) return decorateddef require_permission(permission: Permission): """权限检查装饰器""" def decorator(f: Callable): @wraps(f) @require_auth def decorated(*args, **kwargs): # 获取用户角色(实际应从数据库获取) user_roles = get_user_roles(request.user_id) # 检查权限 if not rbac_manager.check_permission(user_roles, permission): return jsonify({'error': 'Insufficient permissions'}), 403 return f(*args, **kwargs) return decorated return decoratordef audit_log(action: str, resource: str = None): """审计日志装饰器""" def decorator(f: Callable): @wraps(f) def decorated(*args, **kwargs): # 执行函数 result = f(*args, **kwargs) # 记录审计日志 log_entry = { 'timestamp': datetime.datetime.utcnow().isoformat(), 'user_id': 
getattr(request, 'user_id', 'anonymous'), 'action': action, 'resource': resource or request.path, 'ip_address': request.remote_addr, 'user_agent': request.user_agent.string, 'status': 'success' if result[1] == 200 else 'failure' } logger.info(f"AUDIT: {log_entry}") return result return decorated return decorator@app.route('/api/login', methods=['POST'])@limiter.limit("5 per minute")@audit_log(action='login')def login(): """登录端点""" data = request.get_json() username = data.get('username') password = data.get('password') mfa_code = data.get('mfa_code') if not username or not password: return jsonify({'error': 'Username and password required'}), 400 # 验证用户(实际应从数据库获取) user = get_user_by_username(username) if not user or not user.is_active: # 恒定时间响应防止用户枚举 bcrypt.hashpw(b"dummy", bcrypt.gensalt()) return jsonify({'error': 'Invalid credentials'}), 401 # 验证密码 if not auth_manager.verify_password(password, user.password_hash): # 记录失败尝试 record_failed_attempt(user.id) return jsonify({'error': 'Invalid credentials'}), 401 # 检查MFA(如果启用) if user.mfa_enabled and user.mfa_secret: if not mfa_code: return jsonify({'error': 'MFA code required', 'requires_mfa': True}), 401 mfa_manager = MFAManager() if not mfa_manager.verify_totp(user.mfa_secret, mfa_code): return jsonify({'error': 'Invalid MFA code'}), 401 # 重置失败计数 reset_failed_attempts(user.id) # 生成令牌 access_token = jwt_manager.create_access_token( user.id, additional_claims={ 'roles': get_user_roles(user.id), 'email': user.email } ) refresh_token = jwt_manager.create_refresh_token(user.id) # 更新最后登录时间 update_last_login(user.id) return jsonify({ 'access_token': access_token, 'refresh_token': refresh_token, 'token_type': 'bearer', 'expires_in': 900, # 15分钟 'user': user.to_dict() })@app.route('/api/protected', methods=['GET'])@require_auth@require_permission(Permission.USER_READ)@audit_log(action='access_protected', resource='protected_resource')def protected_resource(): """受保护的资源""" return jsonify({ 'message': 'Access granted', 
'user_id': request.user_id, 'timestamp': datetime.datetime.utcnow().isoformat() })@app.route('/api/admin/users', methods=['GET'])@require_auth@require_permission(Permission.ADMIN_ACCESS)@audit_log(action='list_users', resource='admin_users')def list_users(): """管理员接口:列出所有用户""" users = get_all_users() # 过滤敏感信息 safe_users = [user.to_dict() for user in users] return jsonify({ 'users': safe_users, 'count': len(safe_users) })@app.route('/api/refresh', methods=['POST'])def refresh_token(): """刷新访问令牌""" data = request.get_json() refresh_token = data.get('refresh_token') if not refresh_token: return jsonify({'error': 'Refresh token required'}), 400 # 刷新令牌 result = jwt_manager.refresh_access_token(refresh_token) if not result: return jsonify({'error': 'Invalid refresh token'}), 401 new_access_token, new_refresh_token = result return jsonify({ 'access_token': new_access_token, 'refresh_token': new_refresh_token, 'token_type': 'bearer', 'expires_in': 900 })@app.route('/api/logout', methods=['POST'])@require_auth@audit_log(action='logout')def logout(): """注销端点""" # 获取当前令牌的JTI auth_header = request.headers.get('Authorization') if auth_header: token = auth_header.split(' ')[1] payload = jwt_manager.verify_token(token) if payload and 'jti' in payload: # 将令牌加入黑名单 blacklist = TokenBlacklist() blacklist.add_to_blacklist( payload['jti'], payload['exp'] ) return jsonify({'message': 'Logged out successfully'})# 辅助函数(实际实现需要数据库)def get_user_by_username(username: str) -> Optional[User]: """根据用户名获取用户""" # 这里应该查询数据库 return Nonedef get_user_roles(user_id: str) -> List[str]: """获取用户角色""" # 这里应该查询数据库 return ['user']def record_failed_attempt(user_id: str): """记录失败尝试""" passdef reset_failed_attempts(user_id: str): """重置失败计数""" passdef update_last_login(user_id: str): """更新最后登录时间""" passdef get_all_users() -> List[User]: """获取所有用户""" return []if __name__ == '__main__': # 生产环境应使用WSGI服务器 app.run( host='0.0.0.0', port=5000, debug=False, ssl_context='adhoc' )
总结
Python认证与授权机制是应用安全的核心,关键要点:
分层防御:结合多种认证因素和授权模型
最小权限:只授予必要的最小权限
安全默认值:默认配置应该是安全的
持续监控:实时监控和审计所有访问
纵深防御:多层次安全防护,避免单点失效
实施路线图:
从基本认证开始,确保密码安全
实施合适的授权模型(RBAC/ABAC)
添加多因素认证增强安全
集成令牌管理和OAuth支持
建立完整的审计和监控体系
明天我们将进入Web安全基础的学习,这是保护Web应用免受攻击的关键知识。