当前位置:首页>python>一天一个Python知识点——Day 133:Python认证与授权机制

一天一个Python知识点——Day 133:Python认证与授权机制

  • 2026-02-07 02:22:30
一天一个Python知识点——Day 133:Python认证与授权机制

经过安全编程和漏洞防范的学习,今天我们聚焦于应用安全的核心——认证与授权机制,这是保护系统访问控制的关键防线。

1. 认证与授权基础概念

1.1 核心区别

概念
目的
实现方式
示例
认证 (Authentication)
验证用户身份
用户名/密码、多因素认证、生物识别
"你是谁?"
授权 (Authorization)
控制资源访问权限
角色、权限、访问控制列表
"你能做什么?"

1.2 Python认证授权生态系统

2. 完整的认证系统实现

2.1 用户模型与密码管理

"""安全的用户认证系统"""import bcryptimport secretsfrom datetime import datetime, timedeltafrom typing import OptionalDictListfrom dataclasses import dataclass, asdictfrom enum import Enumimport reclass PasswordStrength(Enum):    """密码强度级别"""    WEAK = 1    MEDIUM = 2    STRONG = 3@dataclassclass User:    """用户数据模型"""    idstr    username: str    email: str    password_hash: bytes    is_active: bool = True    is_locked: bool = False    failed_attempts: int = 0    last_login: Optional[datetime] = None    created_at: datetime = datetime.now()    mfa_enabled: bool = False    mfa_secret: Optional[str] = None    def to_dict(self) -> Dict:        """转换为字典,排除敏感信息"""        data = asdict(self)        # 移除敏感字段        data.pop('password_hash'None)        data.pop('mfa_secret'None)        # 转换datetime为字符串        for key, value in data.items():            if isinstance(value, datetime):                data[key] = value.isoformat()        return dataclass PasswordValidator:    """密码验证器"""    @staticmethod    def validate_strength(password: str) -> PasswordStrength:        """验证密码强度"""        score = 0        # 长度检查        if len(password) >= 12:            score += 1        # 字符多样性        if re.search(r'[a-z]', password):            score += 1        if re.search(r'[A-Z]', password):            score += 1        if re.search(r'\d', password):            score += 1        if re.search(r'[!@#$%^&*(),.?":{}|<>]', password):            score += 1        # 常见密码检查        common_passwords = {            'password''123456''qwerty''admin''welcome'        }        if password.lower() in common_passwords:            return PasswordStrength.WEAK        # 评估强度        if score >= 4:            return PasswordStrength.STRONG        elif score >= 3:            return PasswordStrength.MEDIUM        else:            return PasswordStrength.WEAK    @staticmethod    def is_breached(password: str, cache_file: str = 'breached_passwords.txt') -> bool:        """检查密码是否在已知泄露密码中"""        try:            with open(cache_file, 'r'as f:                breached_passwords = set(line.strip() for line in f)            # 简单实现:实际中应使用哈希值检查            return password in breached_passwords        except FileNotFoundError:            # 可在此处集成HaveIBeenPwned API            return Falseclass AuthenticationManager:    """认证管理器"""    def __init__(self, pepper: Optional[str] = None):        # 全局pepper(应存储在安全的地方)        self.pepper = pepper or secrets.token_hex(32)    def hash_password(self, password: str) -> bytes:        """安全地哈希密码"""        # 添加pepper增加安全性        peppered_password = password + self.pepper        # 使用bcrypt(推荐)        salt = bcrypt.gensalt(rounds=14)  # 适当的工作因子        password_hash = bcrypt.hashpw(peppered_password.encode(), salt)        return password_hash    def verify_password(self, password: str, password_hash: bytes) -> bool:        """验证密码(恒定时间比较)"""        # 恒定时间比较防止时序攻击        peppered_password = password + self.pepper        # 使用bcrypt的恒定时间比较        try:            return bcrypt.checkpw(peppered_password.encode(), password_hash)        except (ValueError, TypeError):            # 如果哈希格式无效,也返回False但保持恒定时间            bcrypt.hashpw(b"dummy", bcrypt.gensalt())  # 恒定时间操作            return False    def generate_session_token(self, user_id: str) -> str:        """生成安全的会话令牌"""        # 使用secrets模块生成密码学安全的令牌        token = secrets.token_urlsafe(64)        # 添加用户ID和过期时间的签名        import hashlib        import hmac        import json        payload = {            'user_id': user_id,            'token': token,            'expires': (datetime.now() + timedelta(hours=24)).isoformat()        }        # 创建签名        signature = hmac.new(            self.pepper.encode(),            json.dumps(payload, sort_keys=True).encode(),            hashlib.sha256        ).hexdigest()        # 返回组合令牌        return f"{token}.{signature}"    def validate_session_token(self, token: str) -> Optional[str]:        """验证会话令牌"""        if not token or '.' not in token:            return None        token_part, signature = token.rsplit('.'1)        # 实际实现中会解码payload并验证签名        # 这里简化处理        return token_part if len(signature) == 64 else None

2.2 多因素认证实现

"""多因素认证(MFA)实现"""import pyotpimport qrcodeimport base64from io import BytesIOfrom typing import TupleOptionalimport timeclass MFAManager:    """多因素认证管理器"""    def __init__(self):        self.totp_window = 1  # 允许的时间窗口    def setup_mfa(self, user_id: str, user_email: str) -> Tuple[strbytes]:        """设置MFA,返回密钥和QR码"""        # 生成随机密钥        secret = pyotp.random_base32()        # 创建TOTP对象        totp = pyotp.TOTP(secret)        # 生成配置URI        issuer = "MySecureApp"        uri = totp.provisioning_uri(            name=user_email,            issuer_name=issuer        )        # 生成QR码        qr = qrcode.QRCode(            version=1,            error_correction=qrcode.constants.ERROR_CORRECT_L,            box_size=10,            border=4,        )        qr.add_data(uri)        qr.make(fit=True)        img = qr.make_image(fill_color="black", back_color="white")        # 转换为bytes        img_bytes = BytesIO()        img.save(img_bytes, format='PNG')        img_bytes.seek(0)        return secret, img_bytes.getvalue()    def verify_totp(self, secret: str, code: str) -> bool:        """验证TOTP代码"""        totp = pyotp.TOTP(secret)        # 验证当前时间窗口        current_time = time.time()        return totp.verify(code, for_time=int(current_time), valid_window=self.totp_window)    def generate_backup_codes(self, count: int = 10) -> List[str]:        """生成备份代码"""        backup_codes = []        for _ in range(count):            # 生成易于输入的代码            code = f"{secrets.randbelow(10000):04d}-{secrets.randbelow(10000):04d}"            backup_codes.append(code)        # 安全存储这些代码的哈希值        return backup_codes    def verify_backup_code(self, provided_code: str, stored_hashes: List[str]) -> bool:        """验证备份代码"""        import hashlib        # 对提供的代码进行哈希        code_hash = hashlib.sha256(provided_code.encode()).hexdigest()        # 恒定时间比较        for stored_hash in stored_hashes:            if secrets.compare_digest(code_hash, stored_hash):                return True        return Falseclass RateLimitedMFAManager(MFAManager):    """带有速率限制的MFA管理器"""    def __init__(self):        super().__init__()        self.failed_attempts: Dict[strList[float]] = {}        self.max_attempts = 5        self.lockout_time = 300  # 5分钟    def verify_with_rate_limit(self, user_id: str, secret: str, code: str) -> Tuple[boolOptional[str]]:        """带速率限制的验证"""        # 检查是否被锁定        if self._is_locked(user_id):            lockout_until = self.failed_attempts[user_id][0] + self.lockout_time            remaining = lockout_until - time.time()            return Falsef"Account locked. Try again in {int(remaining)} seconds"        # 验证代码        is_valid = self.verify_totp(secret, code)        if not is_valid:            # 记录失败尝试            self._record_failure(user_id)            attempts = len(self.failed_attempts.get(user_id, []))            remaining_attempts = self.max_attempts - attempts            if remaining_attempts <= 0:                return False"Account locked due to too many failed attempts"            else:                return Falsef"Invalid code. {remaining_attempts} attempts remaining"        # 验证成功,清除失败记录        if user_id in self.failed_attempts:            del self.failed_attempts[user_id]        return TrueNone    def _record_failure(self, user_id: str):        """记录失败尝试"""        if user_id not in self.failed_attempts:            self.failed_attempts[user_id] = []        current_time = time.time()        self.failed_attempts[user_id].append(current_time)        # 清理过期的失败记录        cutoff = current_time - self.lockout_time        self.failed_attempts[user_id] = [            t for t in self.failed_attempts[user_id] if t > cutoff        ]    def _is_locked(self, user_id: str) -> bool:        """检查账户是否被锁定"""        if user_id not in self.failed_attempts:            return False        attempts = self.failed_attempts[user_id]        # 检查是否超过最大尝试次数        if len(attempts) >= self.max_attempts:            # 检查锁定期是否已过            oldest_attempt = attempts[0]            if time.time() - oldest_attempt < self.lockout_time:                return True        return False

3. 授权系统实现

3.1 基于角色的访问控制(RBAC)

"""基于角色的访问控制系统"""from enum import Enumfrom typing import SetListDictfrom dataclasses import dataclass, fieldimport jsonclass Permission(Enum):    """权限枚举"""    # 用户权限    USER_CREATE = "user:create"    USER_READ = "user:read"    USER_UPDATE = "user:update"    USER_DELETE = "user:delete"    # 文章权限    ARTICLE_CREATE = "article:create"    ARTICLE_READ = "article:read"    ARTICLE_UPDATE = "article:update"    ARTICLE_DELETE = "article:delete"    # 管理权限    ADMIN_ACCESS = "admin:access"    SYSTEM_CONFIG = "system:config"    AUDIT_LOG = "audit:log"@dataclassclass Role:    """角色定义"""    name: str    description: str    permissions: Set[Permission] = field(default_factory=set)    inherits: List[str] = field(default_factory=list)  # 继承的角色    def has_permission(self, permission: Permission) -> bool:        """检查是否拥有指定权限"""        return permission in self.permissions    def add_permission(self, permission: Permission):        """添加权限"""        self.permissions.add(permission)    def remove_permission(self, permission: Permission):        """移除权限"""        self.permissions.discard(permission)class RBACManager:    """RBAC管理器"""    def __init__(self):        self.roles: Dict[str, Role] = {}        self._initialize_default_roles()    def _initialize_default_roles(self):        """初始化默认角色"""        # 管理员角色        admin_role = Role(            name="admin",            description="系统管理员",            permissions={                Permission.USER_CREATE,                Permission.USER_READ,                Permission.USER_UPDATE,                Permission.USER_DELETE,                Permission.ARTICLE_CREATE,                Permission.ARTICLE_READ,                Permission.ARTICLE_UPDATE,                Permission.ARTICLE_DELETE,                Permission.ADMIN_ACCESS,                Permission.SYSTEM_CONFIG,                Permission.AUDIT_LOG,            }        )        # 编辑角色        editor_role = Role(            name="editor",            description="内容编辑",            permissions={                Permission.ARTICLE_CREATE,                Permission.ARTICLE_READ,                Permission.ARTICLE_UPDATE,                Permission.ARTICLE_DELETE,                Permission.USER_READ,            }        )        # 查看者角色        viewer_role = Role(            name="viewer",            description="内容查看者",            permissions={                Permission.ARTICLE_READ,                Permission.USER_READ,            }        )        # 注册角色        self.add_role(admin_role)        self.add_role(editor_role)        self.add_role(viewer_role)    def add_role(self, role: Role):        """添加角色"""        self.roles[role.name] = role    def get_user_permissions(self, user_roles: List[str]) -> Set[Permission]:        """获取用户的所有权限(考虑继承)"""        all_permissions = set()        visited_roles = set()        def collect_permissions(role_name: str):            """递归收集权限"""            if role_name in visited_roles:                return            visited_roles.add(role_name)            role = self.roles.get(role_name)            if role:                # 添加当前角色的权限                all_permissions.update(role.permissions)                # 递归处理继承的角色                for inherited_role in role.inherits:                    collect_permissions(inherited_role)        # 收集所有角色的权限        for role_name in user_roles:            collect_permissions(role_name)        return all_permissions    def check_permission(self, user_roles: List[str], permission: Permission) -> bool:        """检查用户是否拥有指定权限"""        user_permissions = self.get_user_permissions(user_roles)        return permission in user_permissions    def create_custom_role(self, name: str, description: str                          base_roles: List[str] = None,                          additional_permissions: List[Permission] = None) -> Role:        """创建自定义角色"""        custom_role = Role(name=name, description=description)        # 继承基础角色的权限        if base_roles:            for base_role in base_roles:                if base_role in self.roles:                    custom_role.permissions.update(self.roles[base_role].permissions)        # 添加额外权限        if additional_permissions:            for perm in additional_permissions:                custom_role.add_permission(perm)        # 记录继承关系        if base_roles:            custom_role.inherits.extend(base_roles)        self.add_role(custom_role)        return custom_roleclass AuthorizationMiddleware:    """授权中间件"""    def __init__(self, rbac_manager: RBACManager):        self.rbac = rbac_manager    def authorize(self, user_roles: List[str], required_permission: Permission,                 resource_owner: str = None, user_id: str = None) -> bool:        """授权检查"""        # 基本权限检查        if not self.rbac.check_permission(user_roles, required_permission):            return False        # 资源所有者检查(如果适用)        if resource_owner and user_id:            # 如果是资源所有者,可能拥有额外权限            if resource_owner == user_id:                # 检查是否有所有者权限                owner_permission = Permission(f"{required_permission.value}:own")                if owner_permission in self.rbac.get_user_permissions(user_roles):                    return True        return True    def filter_resources(self, user_roles: List[str], resources: List[Dict],                         required_permission: Permission) -> List[Dict]:        """过滤用户有权访问的资源"""        user_permissions = self.rbac.get_user_permissions(user_roles)        if required_permission not in user_permissions:            return []        # 在实际应用中,这里可能包含更复杂的逻辑        # 例如:基于资源属性或所有权的过滤        return resources

3.2 基于属性的访问控制(ABAC)

"""基于属性的访问控制系统"""from typing import AnyDictCallablefrom enum import Enumimport reclass AttributeType(Enum):    """属性类型"""    USER = "user"    RESOURCE = "resource"    ACTION = "action"    ENVIRONMENT = "environment"class PolicyCondition:    """策略条件"""    def __init__(self, attribute: str, operator: str, value: Any):        self.attribute = attribute  # 例如: user.role, resource.type        self.operator = operator    # eq, neq, gt, lt, in, contains, regex        self.value = value    def evaluate(self, context: Dict[strAny]) -> bool:        """评估条件"""        # 获取属性值        attr_value = self._get_attribute_value(self.attribute, context)        # 根据操作符评估        if self.operator == "eq":            return attr_value == self.value        elif self.operator == "neq":            return attr_value != self.value        elif self.operator == "gt":            return attr_value > self.value        elif self.operator == "lt":            return attr_value < self.value        elif self.operator == "in":            return attr_value in self.value        elif self.operator == "contains":            return self.value in attr_value        elif self.operator == "regex":            return bool(re.match(self.value, str(attr_value)))        else:            raise ValueError(f"Unknown operator: {self.operator}")    def _get_attribute_value(self, attribute_path: str, context: Dict) -> Any:        """从上下文中获取属性值"""        parts = attribute_path.split('.')        value = context        for part in parts:            if isinstance(value, dictand part in value:                value = value[part]            else:                # 尝试从不同命名空间获取                for namespace in ['user''resource''action''environment']:                    if namespace in context and isinstance(context[namespace], dict):                        if part in context[namespace]:                            value = context[namespace][part]                            break                else:                    return None        return valueclass ABACPolicy:    """ABAC策略"""    def __init__(self, name: str, effect: str = "allow"):        self.name = name        self.effect = effect  # allow 或 deny        self.conditions: List[PolicyCondition] = []    def add_condition(self, condition: PolicyCondition):        """添加条件"""        self.conditions.append(condition)    def evaluate(self, context: Dict[strAny]) -> bool:        """评估策略"""        # 所有条件都必须满足        for condition in self.conditions:            if not condition.evaluate(context):                return False        return Trueclass ABACManager:    """ABAC管理器"""    def __init__(self):        self.policies: List[ABACPolicy] = []    def add_policy(self, policy: ABACPolicy):        """添加策略"""        self.policies.append(policy)    def evaluate_access(self, context: Dict[strAny]) -> Dict[strAny]:        """评估访问请求"""        result = {            "allowed"False,            "denied"False,            "evaluated_policies": [],            "matching_policies": [],            "denying_policies": []        }        # 按特定顺序评估策略        # 通常:显式deny优先,然后显式allow        for policy in self.policies:            if policy.evaluate(context):                result["evaluated_policies"].append(policy.name)                if policy.effect == "deny":                    result["denied"] = True                    result["denying_policies"].append(policy.name)                elif policy.effect == "allow":                    result["matching_policies"].append(policy.name)        # 确定最终结果        if result["denied"]:            result["allowed"] = False        elif result["matching_policies"]:            result["allowed"] = True        return result    def create_time_based_policy(self, name: str, start_time: str                                end_time: str, days_of_week: List[int] = None) -> ABACPolicy:        """创建基于时间的策略"""        policy = ABACPolicy(name, effect="allow")        # 时间条件        time_condition = PolicyCondition(            attribute="environment.current_time",            operator="gt",            value=start_time        )        policy.add_condition(time_condition)        time_condition2 = PolicyCondition(            attribute="environment.current_time",            operator="lt",            value=end_time        )        policy.add_condition(time_condition2)        # 星期几条件(如果指定)        if days_of_week:            day_condition = PolicyCondition(                attribute="environment.day_of_week",                operator="in",                value=days_of_week            )            policy.add_condition(day_condition)        return policy    def create_location_based_policy(self, name: str, allowed_ips: List[str] = None,                                    allowed_countries: List[str] = None) -> ABACPolicy:        """创建基于位置的策略"""        policy = ABACPolicy(name, effect="allow")        # IP地址条件        if allowed_ips:            ip_condition = PolicyCondition(                attribute="environment.client_ip",                operator="in",                value=allowed_ips            )            policy.add_condition(ip_condition)        # 国家条件(需要IP地理位置服务)        if allowed_countries:            country_condition = PolicyCondition(                attribute="environment.country",                operator="in",                value=allowed_countries            )            policy.add_condition(country_condition)        return policy# 使用示例def demonstrate_abac():    """演示ABAC使用"""    abac = ABACManager()    # 创建时间限制策略:只在工作时间访问    work_hours_policy = abac.create_time_based_policy(        name="work_hours_only",        start_time="09:00",        end_time="17:00",        days_of_week=[12345]  # 周一到周五    )    # 创建位置限制策略:只允许公司IP访问    company_ip_policy = abac.create_location_based_policy(        name="company_network_only",        allowed_ips=["192.168.1.0/24""10.0.0.0/8"]    )    # 添加自定义策略:只有高级用户可以访问敏感数据    sensitive_data_policy = ABACPolicy("sensitive_data_access", effect="allow")    sensitive_data_policy.add_condition(PolicyCondition(        attribute="user.role",        operator="eq",        value="senior_analyst"    ))    sensitive_data_policy.add_condition(PolicyCondition(        attribute="resource.sensitivity_level",        operator="lte",  # 小于等于        value=user_clearance_level  # 从用户属性获取    ))    abac.add_policy(work_hours_policy)    abac.add_policy(company_ip_policy)    abac.add_policy(sensitive_data_policy)    # 评估访问请求    context = {        "user": {            "id""user123",            "role""senior_analyst",            "clearance_level"3        },        "resource": {            "id""report_456",            "type""financial_report",            "sensitivity_level"2,            "owner""user123"        },        "action""read",        "environment": {            "current_time""14:30",            "day_of_week"2,  # 周二            "client_ip""192.168.1.100"        }    }    result = abac.evaluate_access(context)    return result

4. JWT与OAuth实现

4.1 JWT令牌管理

"""JWT令牌实现"""import jwtimport datetimefrom typing import DictOptionalAnyfrom cryptography.hazmat.primitives import serializationfrom cryptography.hazmat.primitives.asymmetric import rsaimport secretsclass JWTManager:    """JWT令牌管理器"""    def __init__(self, algorithm: str = "RS256"):        self.algorithm = algorithm        # 生成密钥对        self.private_key, self.public_key = self._generate_key_pair()        # 令牌配置        self.access_token_expiry = datetime.timedelta(minutes=15)        self.refresh_token_expiry = datetime.timedelta(days=7)    def _generate_key_pair(self) -> tuple:        """生成RSA密钥对"""        private_key = rsa.generate_private_key(            public_exponent=65537,            key_size=2048        )        public_key = private_key.public_key()        # 序列化密钥        private_pem = private_key.private_bytes(            encoding=serialization.Encoding.PEM,            format=serialization.PrivateFormat.PKCS8,            encryption_algorithm=serialization.NoEncryption()        )        public_pem = public_key.public_bytes(            encoding=serialization.Encoding.PEM,            format=serialization.PublicFormat.SubjectPublicKeyInfo        )        return private_pem, public_pem    def create_access_token(self, user_id: str                           additional_claims: Dict[strAny] = None) -> str:        """创建访问令牌"""        now = datetime.datetime.utcnow()        expiry = now + self.access_token_expiry        payload = {            "sub": user_id,            "iat": now,            "exp": expiry,            "type""access",            "jti": secrets.token_hex(16)  # 唯一标识符        }        # 添加额外声明        if additional_claims:            payload.update(additional_claims)        # 生成令牌        token = jwt.encode(            payload,            self.private_key,            algorithm=self.algorithm        )        return token    def create_refresh_token(self, user_id: str) -> str:        """创建刷新令牌"""        now = datetime.datetime.utcnow()        expiry = now + self.refresh_token_expiry        payload = {            "sub": user_id,            "iat": now,            "exp": expiry,            "type""refresh",            "jti": secrets.token_hex(16)        }        token = jwt.encode(            payload,            self.private_key,            algorithm=self.algorithm        )        return token    def verify_token(self, token: str, token_type: str = "access") -> Optional[Dict]:        """验证令牌"""        try:            # 解码并验证令牌            payload = jwt.decode(                token,                self.public_key,                algorithms=[self.algorithm],                options={                    "require": ["exp""iat""sub""type""jti"],                    "verify_exp"True,                    "verify_iat"True                }            )            # 验证令牌类型            if payload.get("type") != token_type:                return None            return payload        except jwt.InvalidTokenError as e:            # 记录错误但不泄露细节            print(f"Token validation failed: {type(e).__name__}")            return None    def refresh_access_token(self, refresh_token: str) -> Optional[tuple]:        """使用刷新令牌获取新的访问令牌"""        # 验证刷新令牌        payload = self.verify_token(refresh_token, "refresh")        if not payload:            return None        user_id = payload["sub"]        # 创建新的访问令牌        new_access_token = self.create_access_token(user_id)        # 可选:创建新的刷新令牌(滑动过期)        new_refresh_token = self.create_refresh_token(user_id)        return new_access_token, new_refresh_tokenclass TokenBlacklist:    """令牌黑名单管理"""    def __init__(self, redis_client=None):        self.redis = redis_client        self.in_memory_blacklist = set()    def add_to_blacklist(self, token_jti: str, expiry_timestamp: float):        """将令牌加入黑名单"""        if self.redis:            # 使用Redis,自动过期            self.redis.setex(                f"blacklist:{token_jti}",                int(expiry_timestamp - datetime.datetime.utcnow().timestamp()),                "1"            )        else:            # 内存存储(不适合生产环境)            self.in_memory_blacklist.add(token_jti)    def is_blacklisted(self, token_jti: str) -> bool:        """检查令牌是否在黑名单中"""        if self.redis:            return self.redis.exists(f"blacklist:{token_jti}") == 1        else:            return token_jti in self.in_memory_blacklist    def revoke_user_tokens(self, user_id: str):        """撤销用户的所有令牌"""        # 在实际实现中,这可能需要记录到数据库        # 然后在验证时检查用户ID是否在撤销列表中        pass

4.2 OAuth 2.0客户端实现

"""OAuth 2.0客户端实现"""from typing import DictOptionalimport requestsfrom urllib.parse import urlencode, urlparse, parse_qsimport secretsclass OAuthClient:    """OAuth 2.0客户端"""    def __init__(self, client_id: str, client_secret: str                 redirect_uri: str, auth_server_url: str):        self.client_id = client_id        self.client_secret = client_secret        self.redirect_uri = redirect_uri        self.auth_server_url = auth_server_url.rstrip('/')        # 状态缓存(生产环境应使用数据库)        self.state_store = {}    def get_authorization_url(self, scope: str = "", state: str = None) -> str:        """获取授权URL"""        if state is None:            state = secrets.token_urlsafe(16)        # 保存状态用于验证        self.state_store[state] = {            "created": datetime.datetime.now(),            "used"False        }        params = {            "response_type""code",            "client_id"self.client_id,            "redirect_uri"self.redirect_uri,            "scope": scope,            "state": state        }        return f"{self.auth_server_url}/authorize?{urlencode(params)}"    def handle_callback(self, code: str, state: str) -> Optional[Dict]:        """处理授权回调"""        # 验证state参数        if state not in self.state_store:            return None        state_info = self.state_store[state]        # 检查state是否已使用或过期        if (state_info["used"or             (datetime.datetime.now() - state_info["created"]).seconds > 300):            del self.state_store[state]            return None        # 标记为已使用        state_info["used"] = True        # 交换令牌        token_data = self.exchange_code_for_token(code)        return token_data    def exchange_code_for_token(self, code: str) -> Optional[Dict]:        """使用授权码交换访问令牌"""        token_url = f"{self.auth_server_url}/token"        data = {            "grant_type""authorization_code",            "code": code,            "redirect_uri"self.redirect_uri,            "client_id"self.client_id,            "client_secret"self.client_secret        }        headers = {            "Content-Type""application/x-www-form-urlencoded",            "Accept""application/json"        }        try:            response = requests.post(token_url, data=data, headers=headers)            response.raise_for_status()            token_data = response.json()            # 添加过期时间戳            if "expires_in" in token_data:                expires_at = datetime.datetime.now() + datetime.timedelta(                    seconds=token_data["expires_in"]                )                token_data["expires_at"] = expires_at.isoformat()            return token_data        except requests.RequestException as e:            print(f"Token exchange failed: {e}")            return None    def refresh_token(self, refresh_token: str) -> Optional[Dict]:        """刷新访问令牌"""        refresh_url = f"{self.auth_server_url}/token"        data = {            "grant_type""refresh_token",            "refresh_token": refresh_token,            "client_id"self.client_id,            "client_secret"self.client_secret        }        headers = {            "Content-Type""application/x-www-form-urlencoded",            "Accept""application/json"        }        try:            response = requests.post(refresh_url, data=data, headers=headers)            response.raise_for_status()            return response.json()        except requests.RequestException as e:            print(f"Token refresh failed: {e}")            return None    def make_authenticated_request(self, url: str, access_token: str                                   method: str = "GET", **kwargs) -> Optional[requests.Response]:        """使用访问令牌进行认证请求"""        headers = kwargs.get("headers", {})        headers["Authorization"] = f"Bearer {access_token}"        kwargs["headers"] = headers        try:            if method.upper() == "GET":                response = requests.get(url, **kwargs)            elif method.upper() == "POST":                response = requests.post(url, **kwargs)            elif method.upper() == "PUT":                response = requests.put(url, **kwargs)            elif method.upper() == "DELETE":                response = requests.delete(url, **kwargs)            else:                raise ValueError(f"Unsupported method: {method}")            response.raise_for_status()            return response        except requests.RequestException as e:            print(f"Authenticated request failed: {e}")            return Noneclass PKCEOAuthClient(OAuthClient):    """PKCE增强的OAuth客户端"""    def get_authorization_url(self, scope: str = "", state: str = None) -> str:        """使用PKCE的授权URL"""        if state is None:            state = secrets.token_urlsafe(16)        # 生成PKCE code_verifier和code_challenge        code_verifier = secrets.token_urlsafe(64)        code_challenge = self._create_code_challenge(code_verifier)        # 保存状态和code_verifier        self.state_store[state] = {            "created": datetime.datetime.now(),            "used"False,            "code_verifier": code_verifier        }        params = {            "response_type""code",            "client_id"self.client_id,            "redirect_uri"self.redirect_uri,            "scope": scope,            "state": state,            "code_challenge": code_challenge,            "code_challenge_method""S256"        }        return f"{self.auth_server_url}/authorize?{urlencode(params)}"    def exchange_code_for_token(self, code: str, state: str) -> Optional[Dict]:        """使用PKCE交换令牌"""        if state not in self.state_store:            return None        code_verifier = self.state_store[state].get("code_verifier")        if not code_verifier:            return None        token_url = f"{self.auth_server_url}/token"        data = {            "grant_type""authorization_code",            "code": code,            "redirect_uri"self.redirect_uri,            "client_id"self.client_id,            "code_verifier": code_verifier            # 注意:PKCE不需要client_secret        }        headers = {            "Content-Type""application/x-www-form-urlencoded",            "Accept""application/json"        }        try:            response = requests.post(token_url, data=data, headers=headers)            response.raise_for_status()            return response.json()        except requests.RequestException as e:            print(f"PKCE token exchange failed: {e}")            return None    def _create_code_challenge(self, code_verifier: str) -> str:        """创建PKCE code_challenge"""        import hashlib        import base64        # 计算SHA256哈希        sha256_hash = hashlib.sha256(code_verifier.encode()).digest()        # Base64 URL安全编码        code_challenge = base64.urlsafe_b64encode(sha256_hash).decode().rstrip('=')        return code_challenge

5. 完整的安全Web应用示例

"""完整的Flask安全应用示例"""from flask import Flask, request, jsonify, session, redirect, url_forfrom flask_limiter import Limiterfrom flask_limiter.util import get_remote_addressfrom functools import wrapsimport loggingfrom typing import CallableAny# 初始化应用app = Flask(__name__)app.config['SECRET_KEY'] = secrets.token_hex(32)app.config['SESSION_COOKIE_HTTPONLY'] = Trueapp.config['SESSION_COOKIE_SECURE'] = True  # 生产环境启用app.config['PERMANENT_SESSION_LIFETIME'] = 3600  # 1小时# 速率限制limiter = Limiter(    get_remote_address,    app=app,    default_limits=["200 per day""50 per hour"],    storage_uri="memory://",)# 初始化管理器auth_manager = AuthenticationManager()rbac_manager = RBACManager()jwt_manager = JWTManager()# 设置日志logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)def require_auth(f: Callable) -> Callable:    """认证装饰器"""    @wraps(f)    def decorated(*args, **kwargs):        # 从请求头获取令牌        auth_header = request.headers.get('Authorization')        if not auth_header or not auth_header.startswith('Bearer '):            return jsonify({'error''Missing or invalid token'}), 401        token = auth_header.split(' ')[1]        # 验证令牌        payload = jwt_manager.verify_token(token)        if not payload:            return jsonify({'error''Invalid or expired token'}), 401        # 将用户信息添加到请求上下文        request.user_id = payload['sub']        request.user_claims = payload        return f(*args, **kwargs)    return decorateddef require_permission(permission: Permission):    """权限检查装饰器"""    def decorator(f: Callable):        @wraps(f)        @require_auth        def decorated(*args, **kwargs):            # 获取用户角色(实际应从数据库获取)            user_roles = get_user_roles(request.user_id)            # 检查权限            if not rbac_manager.check_permission(user_roles, permission):                return jsonify({'error''Insufficient permissions'}), 403            return f(*args, **kwargs)        return decorated    return decoratordef audit_log(action: str, resource: str = None):    """审计日志装饰器"""    def decorator(f: Callable):        @wraps(f)        def decorated(*args, **kwargs):            # 执行函数            result = f(*args, **kwargs)            # 记录审计日志            log_entry = {                'timestamp': datetime.datetime.utcnow().isoformat(),                'user_id'getattr(request, 'user_id''anonymous'),                'action': action,                'resource': resource or request.path,                'ip_address': request.remote_addr,                'user_agent': request.user_agent.string,                'status''success' if result[1] == 200 else 'failure'            }            logger.info(f"AUDIT: {log_entry}")            return result        return decorated    return decorator@app.route('/api/login', methods=['POST'])@limiter.limit("5 per minute")@audit_log(action='login')def login():    """登录端点"""    data = request.get_json()    username = data.get('username')    password = data.get('password')    mfa_code = data.get('mfa_code')    if not username or not password:        return jsonify({'error''Username and password required'}), 400    # 验证用户(实际应从数据库获取)    user = get_user_by_username(username)    if not user or not user.is_active:        # 恒定时间响应防止用户枚举        bcrypt.hashpw(b"dummy", bcrypt.gensalt())        return jsonify({'error''Invalid credentials'}), 401    # 验证密码    if not auth_manager.verify_password(password, user.password_hash):        # 记录失败尝试        record_failed_attempt(user.id)        return jsonify({'error''Invalid credentials'}), 401    # 检查MFA(如果启用)    if user.mfa_enabled and user.mfa_secret:        if not mfa_code:            return jsonify({'error''MFA code required''requires_mfa'True}), 401        mfa_manager = MFAManager()        if not mfa_manager.verify_totp(user.mfa_secret, mfa_code):            return jsonify({'error''Invalid MFA code'}), 401    # 重置失败计数    reset_failed_attempts(user.id)    # 生成令牌    access_token = jwt_manager.create_access_token(        user.id,        additional_claims={            'roles': get_user_roles(user.id),            'email': user.email        }    )    refresh_token = jwt_manager.create_refresh_token(user.id)    # 更新最后登录时间    update_last_login(user.id)    return jsonify({        'access_token': access_token,        'refresh_token': refresh_token,        'token_type''bearer',        'expires_in'900,  # 15分钟        'user': user.to_dict()    })@app.route('/api/protected', methods=['GET'])@require_auth@require_permission(Permission.USER_READ)@audit_log(action='access_protected', resource='protected_resource')def protected_resource():    """受保护的资源"""    return jsonify({        'message''Access granted',        'user_id': request.user_id,        'timestamp': datetime.datetime.utcnow().isoformat()    })@app.route('/api/admin/users', methods=['GET'])@require_auth@require_permission(Permission.ADMIN_ACCESS)@audit_log(action='list_users', resource='admin_users')def list_users():    """管理员接口:列出所有用户"""    users = get_all_users()    # 过滤敏感信息    safe_users = [user.to_dict() for user in users]    return jsonify({        'users': safe_users,        'count'len(safe_users)    })@app.route('/api/refresh', methods=['POST'])def refresh_token():    """刷新访问令牌"""    data = request.get_json()    refresh_token = data.get('refresh_token')    if not refresh_token:        return jsonify({'error''Refresh token required'}), 400    # 刷新令牌    result = jwt_manager.refresh_access_token(refresh_token)    if not result:        return jsonify({'error''Invalid refresh token'}), 401    new_access_token, new_refresh_token = result    return jsonify({        'access_token': new_access_token,        'refresh_token': new_refresh_token,        'token_type''bearer',        'expires_in'900    })@app.route('/api/logout', methods=['POST'])@require_auth@audit_log(action='logout')def logout():    """注销端点"""    # 获取当前令牌的JTI    auth_header = request.headers.get('Authorization')    if auth_header:        token = auth_header.split(' ')[1]        payload = jwt_manager.verify_token(token)        if payload and 'jti' in payload:            # 将令牌加入黑名单            blacklist = TokenBlacklist()            blacklist.add_to_blacklist(                payload['jti'],                payload['exp']            )    return jsonify({'message''Logged out successfully'})# 辅助函数(实际实现需要数据库)def get_user_by_username(username: str) -> Optional[User]:    """根据用户名获取用户"""    # 这里应该查询数据库    return Nonedef get_user_roles(user_id: str) -> List[str]:    """获取用户角色"""    # 这里应该查询数据库    return ['user']def record_failed_attempt(user_id: str):    """记录失败尝试"""    passdef reset_failed_attempts(user_id: str):    """重置失败计数"""    passdef update_last_login(user_id: str):    """更新最后登录时间"""    passdef get_all_users() -> List[User]:    """获取所有用户"""    return []if __name__ == '__main__':    # 生产环境应使用WSGI服务器    app.run(        host='0.0.0.0',        port=5000,        debug=False,        ssl_context='adhoc'    )

总结

Python认证与授权机制是应用安全的核心,关键要点:

  • 分层防御:结合多种认证因素和授权模型

  • 最小权限:只授予必要的最小权限

  • 安全默认值:默认配置应该是安全的

  • 持续监控:实时监控和审计所有访问

  • 防御深度:多层次安全防护,避免单点失效

实施路线图

  • 从基本认证开始,确保密码安全

  • 实施合适的授权模型(RBAC/ABAC)

  • 添加多因素认证增强安全

  • 集成令牌管理和OAuth支持

  • 建立完整的审计和监控体系

明天我们将进入Web安全基础的学习,这是保护Web应用免受攻击的关键知识。

最新文章

随机文章

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-02-08 19:35:53 HTTP/2.0 GET : https://f.mffb.com.cn/a/462856.html
  2. 运行时间 : 0.093005s [ 吞吐率:10.75req/s ] 内存消耗:5,280.70kb 文件加载:140
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=da8c45fac0a5d456d67d60b6c7767aa7
  1. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_static.php ( 4.90 KB )
  7. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  10. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  11. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  12. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  13. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  14. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  15. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  16. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  17. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  18. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  19. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  21. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  22. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/provider.php ( 0.19 KB )
  23. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  24. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  25. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  26. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/common.php ( 0.03 KB )
  27. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  28. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  29. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/app.php ( 0.95 KB )
  30. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cache.php ( 0.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/console.php ( 0.23 KB )
  32. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cookie.php ( 0.56 KB )
  33. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/database.php ( 2.48 KB )
  34. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  35. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/filesystem.php ( 0.61 KB )
  36. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/lang.php ( 0.91 KB )
  37. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/log.php ( 1.35 KB )
  38. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/middleware.php ( 0.19 KB )
  39. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/route.php ( 1.89 KB )
  40. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/session.php ( 0.57 KB )
  41. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/trace.php ( 0.34 KB )
  42. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/view.php ( 0.82 KB )
  43. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/event.php ( 0.25 KB )
  44. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  45. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/service.php ( 0.13 KB )
  46. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/AppService.php ( 0.26 KB )
  47. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  48. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  49. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  50. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  51. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  52. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/services.php ( 0.14 KB )
  53. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  54. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  55. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  56. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  57. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  58. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  59. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  60. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  61. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  62. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  63. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  64. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  65. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  66. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  67. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  68. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  69. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  70. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  71. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  72. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  73. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  74. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  75. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  76. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  77. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  78. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  79. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  80. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  81. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  82. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  83. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/Request.php ( 0.09 KB )
  84. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  85. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/middleware.php ( 0.25 KB )
  86. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  87. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  88. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  89. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  90. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  91. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  92. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  93. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  94. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  95. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  96. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  97. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  98. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  99. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/route/app.php ( 1.72 KB )
  100. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  101. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  102. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  103. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/controller/Index.php ( 4.81 KB )
  104. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/BaseController.php ( 2.05 KB )
  105. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  106. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  108. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  109. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  110. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  111. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  112. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  113. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  114. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  115. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  116. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  117. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  118. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  119. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  120. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  121. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  122. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  123. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  124. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  125. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  126. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  127. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  128. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  129. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  130. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  131. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  132. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  133. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  134. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  135. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  136. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  137. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  138. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  139. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/runtime/temp/067d451b9a0c665040f3f1bdd3293d68.php ( 11.98 KB )
  140. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000509s ] mysql:host=127.0.0.1;port=3306;dbname=f_mffb;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.000812s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000336s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.000304s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.000537s ]
  6. SELECT * FROM `set` [ RunTime:0.000872s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.000582s ]
  8. SELECT * FROM `article` WHERE `id` = 462856 LIMIT 1 [ RunTime:0.002066s ]
  9. UPDATE `article` SET `lasttime` = 1770550553 WHERE `id` = 462856 [ RunTime:0.001356s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 66 LIMIT 1 [ RunTime:0.000631s ]
  11. SELECT * FROM `article` WHERE `id` < 462856 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.000528s ]
  12. SELECT * FROM `article` WHERE `id` > 462856 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.000599s ]
  13. SELECT * FROM `article` WHERE `id` < 462856 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.007590s ]
  14. SELECT * FROM `article` WHERE `id` < 462856 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.003229s ]
  15. SELECT * FROM `article` WHERE `id` < 462856 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.002705s ]
0.095642s