当前位置:首页>python>一天一个Python知识点——Day 136:Python HTTP协议实现

一天一个Python知识点——Day 136:Python HTTP协议实现

  • 2026-01-31 18:18:38
一天一个Python知识点——Day 136:Python HTTP协议实现

今天我们将深入HTTP协议的Python实现,这是构建Web应用、API服务和网络爬虫的基础,也是理解现代Web通信的核心。

1. HTTP协议基础

1.1 HTTP协议全景

"""HTTP协议核心概念与版本演变"""import http.clientfrom typing import DictListTuplefrom dataclasses import dataclassfrom enum import Enumclass HTTPVersion(Enum):    """HTTP版本"""    HTTP_0_9 = "HTTP/0.9"  # 1991年,只有GET方法    HTTP_1_0 = "HTTP/1.0"  # 1996年,增加方法、头部、状态码    HTTP_1_1 = "HTTP/1.1"  # 1997年,持久连接、管道化、分块传输    HTTP_2_0 = "HTTP/2.0"  # 2015年,二进制协议、多路复用、头部压缩    HTTP_3_0 = "HTTP/3.0"  # 2022年,基于QUIC、0-RTT连接class HTTPMethod(Enum):    """HTTP方法"""    GET = "GET"      # 获取资源    POST = "POST"    # 创建资源    PUT = "PUT"      # 更新资源    DELETE = "DELETE" # 删除资源    HEAD = "HEAD"    # 获取头部    OPTIONS = "OPTIONS" # 获取支持方法    PATCH = "PATCH"  # 部分更新    TRACE = "TRACE"  # 诊断    CONNECT = "CONNECT" # 隧道连接class HTTPStatusCategory(Enum):    """HTTP状态码分类"""    INFORMATIONAL = (100199"信息响应")    SUCCESS = (200299"成功响应")    REDIRECTION = (300399"重定向")    CLIENT_ERROR = (400499"客户端错误")    SERVER_ERROR = (500599"服务器错误")@dataclassclass HTTPStatus:    """HTTP状态码"""    code: int    phrase: str    description: str    @property    def category(self) -> HTTPStatusCategory:        """获取状态码分类"""        for category in HTTPStatusCategory:            start, end, _ = category.value            if start <= self.code <= end:                return category        return Noneclass HTTPProtocolAnalyzer:    """HTTP协议分析器"""    # 常见状态码    STATUS_CODES = {        100: HTTPStatus(100"Continue""继续"),        101: HTTPStatus(101"Switching Protocols""切换协议"),        200: HTTPStatus(200"OK""成功"),        201: HTTPStatus(201"Created""已创建"),        204: HTTPStatus(204"No Content""无内容"),        301: HTTPStatus(301"Moved Permanently""永久重定向"),        302: HTTPStatus(302"Found""临时重定向"),        304: HTTPStatus(304"Not Modified""未修改"),        400: HTTPStatus(400"Bad Request""错误请求"),        401: HTTPStatus(401"Unauthorized""未授权"),        403: HTTPStatus(403"Forbidden""禁止访问"),        404: HTTPStatus(404"Not Found""未找到"),        405: HTTPStatus(405"Method Not Allowed""方法不允许"),        500: HTTPStatus(500"Internal Server Error""内部服务器错误"),        502: HTTPStatus(502"Bad Gateway""错误网关"),        503: HTTPStatus(503"Service Unavailable""服务不可用"),    }    @staticmethod    def compare_http_versions() -> List[Dict]:        """比较HTTP版本特性"""        versions = [            {                "version""HTTP/1.0",                "year"1996,                "key_features": [                    "每个请求/响应独立的TCP连接",                    "添加头部字段",                    "状态码",                    "支持多种方法"                ],                "limitations": [                    "连接开销大",                    "队头阻塞",                    "无主机头支持"                ]            },            {                "version""HTTP/1.1",                "year"1997,                "key_features": [                    "持久连接",                    "管道化",                    "分块传输编码",                    "主机头支持",                    "缓存控制"                ],                "limitations": [                    "队头阻塞未完全解决",                    "头部重复传输"                ]            },            {                "version""HTTP/2",                "year"2015,                "key_features": [                    "二进制协议",                    "多路复用",                    "头部压缩(HPACK)",                    "服务器推送",                    "流优先级"                ],                "limitations": [                    "仍基于TCP",                    "TCP队头阻塞"                ]            },            {                "version""HTTP/3",                "year"2022,                "key_features": [                    "基于QUIC/UDP",                    "0-RTT连接建立",                    "改进的多路复用",                    "更好的丢包处理",                    "连接迁移"                ],                "limitations": [                    "部署复杂度",                    "防火墙兼容性"                ]            }        ]        return versions    @staticmethod    def parse_http_message(raw_message: str) -> Tuple[strDictstr]:        """解析HTTP消息"""        lines = raw_message.strip().split('\r\n')        # 解析起始行        start_line = lines[0]        # 解析头部        headers = {}        body_start = 0        for i, line in enumerate(lines[1:], 1):            if not line.strip():  # 空行表示头部结束                body_start = i + 1                break            if ': ' in line:                key, value = line.split(': '1)                headers[key] = value        # 解析消息体        body = '\r\n'.join(lines[body_start:]) if body_start < len(lines) else ''        return start_line, headers, body    @staticmethod    def validate_headers(headers: Dict) -> List[str]:        """验证HTTP头部"""        warnings = []        # 检查必需头部        required_headers = ['Host']  # HTTP/1.1要求Host头部        for header in required_headers:            if header not in headers:                warnings.append(f"缺少必需头部: {header}")        # 检查头部格式        for key, value in headers.items():            if '\r' in key or '\n' in key:                warnings.append(f"无效头部键: {key}")            if '\r' in value or '\n' in value:                warnings.append(f"头部值包含换行: {key}")        return warnings

2. 完整HTTP服务器实现

2.1 支持HTTP/1.1的完整服务器

"""完整的HTTP/1.1服务器实现"""import socketimport threadingimport mimetypesimport osimport timeimport jsonimport zlibfrom pathlib import Pathfrom datetime import datetime, timezonefrom typing import OptionalDictListTupleCallablefrom urllib.parse import urlparse, parse_qs, quote, unquotefrom email.utils import formatdate, parsedateimport hashlibimport base64class HTTPRequest:    """HTTP请求解析"""    def __init__(self, raw_request: bytes, client_address: tuple):        self.raw = raw_request        self.client_ip, self.client_port = client_address        self.method = ""        self.path = ""        self.query_params = {}        self.version = "HTTP/1.0"        self.headers = {}        self.body = b""        self.cookies = {}        self._parse()    def _parse(self):        """解析HTTP请求"""        try:            # 解码请求            request_text = self.raw.decode('utf-8', errors='ignore')            lines = request_text.split('\r\n')            if not lines:                raise ValueError("空请求")            # 解析请求行            request_line = lines[0]            parts = request_line.split(' '2)            if len(parts) != 3:                raise ValueError(f"无效的请求行: {request_line}")            self.method, full_path, self.version = parts            # 解析路径和查询参数            parsed_url = urlparse(full_path)            self.path = unquote(parsed_url.path) if parsed_url.path else '/'            self.query_params = parse_qs(parsed_url.query)            # 解析头部            header_end = 0            for i, line in enumerate(lines[1:], 1):                if not line.strip():                    header_end = i                    break                if ': ' in line:                    key, value = line.split(': '1)                    self.headers[key] = value            # 解析cookies            if 'Cookie' in self.headers:                self._parse_cookies(self.headers['Cookie'])            # 解析消息体            if header_end > 0 and header_end < len(lines):                body_lines = lines[header_end + 1:]                self.body = '\r\n'.join(body_lines).encode('utf-8')                # 处理分块传输编码                if self.headers.get('Transfer-Encoding''').lower() == 'chunked':                    self._decode_chunked_body()        except Exception as e:            raise ValueError(f"解析HTTP请求失败: {e}")    def _parse_cookies(self, cookie_header: str):        """解析Cookie头部"""        for cookie_pair in cookie_header.split(';'):            cookie_pair = cookie_pair.strip()            if '=' in cookie_pair:                key, value = cookie_pair.split('='1)                self.cookies[key.strip()] = value.strip()    def _decode_chunked_body(self):        """解码分块传输编码的正文"""        try:            data = self.body.decode('utf-8', errors='ignore')            chunks = []            pos = 0            while pos < len(data):                # 查找块大小行                chunk_size_end = data.find('\r\n', pos)                if chunk_size_end == -1:                    break                # 解析块大小(十六进制)                chunk_size_str = data[pos:chunk_size_end].strip()                chunk_size = int(chunk_size_str, 16)                # 块大小为0表示结束                if chunk_size == 0:                    break                # 获取块数据                chunk_start = chunk_size_end + 2                chunk_end = chunk_start + chunk_size                if chunk_end > len(data):                    break                chunks.append(data[chunk_start:chunk_end])                pos = chunk_end + 2  # 跳过CRLF            self.body = ''.join(chunks).encode('utf-8')        except Exception as e:            raise ValueError(f"解码分块正文失败: {e}")    def get_header(self, key: str, default: str = None) -> Optional[str]:        """获取头部值(不区分大小写)"""        key_lower = key.lower()        for k, v in self.headers.items():            if k.lower() == key_lower:                return v        return default    def get_content_type(self) -> str:        """获取内容类型"""        content_type = self.get_header('Content-Type''')        # 移除参数部分        return content_type.split(';')[0].strip()    def is_keep_alive(self) -> bool:        """检查是否保持连接"""        connection = self.get_header('Connection''').lower()        if self.version == 'HTTP/1.1':            return connection != 'close'        else:  # HTTP/1.0            return connection == 'keep-alive'    def get_basic_auth(self) -> Optional[Tuple[strstr]]:        """解析Basic认证"""        auth_header = self.get_header('Authorization')        if auth_header and auth_header.startswith('Basic '):            try:                encoded = auth_header[6:].strip()                decoded = base64.b64decode(encoded).decode('utf-8')                if ':' in decoded:                    username, password = decoded.split(':'1)                    return username, password            except:                pass        return None    def __str__(self) -> str:        """字符串表示"""        return f"{self.method}{self.path}{self.version}"class HTTPResponse:    """HTTP响应构建"""    def __init__(self, status_code: int = 200, content: bytes = b"",                 content_type: str = "text/plain; charset=utf-8"):        self.status_code = status_code        self.headers = {            'Server''PythonHTTPServer/1.0',            'Date': formatdate(timeval=None, localtime=False, usegmt=True),            'Content-Type': content_type,        }        self.content = content        self.cookies = {}        self.version = "HTTP/1.1"    def set_header(self, key: str, value: str):        """设置响应头"""        self.headers[key] = value    def add_cookie(self, name: str, value: str, **kwargs):        """添加Cookie"""        cookie_parts = [f"{name}={value}"]        # Cookie属性        if 'expires' in kwargs:            if isinstance(kwargs['expires'], (intfloat)):                expires_time = kwargs['expires']                expires_date = datetime.fromtimestamp(expires_time, tz=timezone.utc)                cookie_parts.append(f"Expires={expires_date.strftime('%a, %d %b %Y %H:%M:%S GMT')}")            else:                cookie_parts.append(f"Expires={kwargs['expires']}")        if 'max_age' in kwargs:            cookie_parts.append(f"Max-Age={kwargs['max_age']}")        if 'domain' in kwargs:            cookie_parts.append(f"Domain={kwargs['domain']}")        if 'path' in kwargs:            cookie_parts.append(f"Path={kwargs['path']}")        if kwargs.get('secure'False):            cookie_parts.append("Secure")        if kwargs.get('http_only'False):            cookie_parts.append("HttpOnly")        if 'same_site' in kwargs:            cookie_parts.append(f"SameSite={kwargs['same_site']}")        self.cookies[name] = '; '.join(cookie_parts)    def delete_cookie(self, name: str, path: str = "/", domain: str = None):        """删除Cookie"""        self.add_cookie(            name, "",            expires=0,            max_age=0,            path=path,            domain=domain,            http_only=True        )    def redirect(self, location: str, status_code: int = 302):        """重定向响应"""        self.status_code = status_code        self.set_header('Location', location)        self.content = f"Redirecting to {location}".encode('utf-8')    def to_bytes(self) -> bytes:        """转换为字节"""        # 构建状态行        status_phrase = self._get_status_phrase(self.status_code)        status_line = f"{self.version}{self.status_code}{status_phrase}\r\n"        # 设置内容长度        if 'Content-Length' not in self.headers:            self.headers['Content-Length'] = str(len(self.content))        # 添加cookies到头部        for cookie_value in self.cookies.values():            self.headers.setdefault('Set-Cookie', []).append(cookie_value)        # 构建头部        headers = []        for key, value in self.headers.items():            if isinstance(value, list):                for v in value:                    headers.append(f"{key}{v}")            else:                headers.append(f"{key}{value}")        headers_text = '\r\n'.join(headers)        # 构建完整响应        response_text = f"{status_line}{headers_text}\r\n\r\n"        response_bytes = response_text.encode('utf-8') + self.content        return response_bytes    def compress(self, method: str = 'gzip') -> bool:        """压缩响应内容"""        if not self.content:            return False        try:            if method == 'gzip':                compressed = zlib.compressobj(                    zlib.Z_DEFAULT_COMPRESSION,                    zlib.DEFLATED,                    zlib.MAX_WBITS | 16  # gzip格式                )                compressed_data = compressed.compress(self.content)                compressed_data += compressed.flush()                self.content = compressed_data                self.set_header('Content-Encoding''gzip')                self.set_header('Content-Length'str(len(compressed_data)))                return True            elif method == 'deflate':                compressed = zlib.compressobj(                    zlib.Z_DEFAULT_COMPRESSION,                    zlib.DEFLATED,                    -zlib.MAX_WBITS  # deflate格式                )                compressed_data = compressed.compress(self.content)                compressed_data += compressed.flush()                self.content = compressed_data                self.set_header('Content-Encoding''deflate')                self.set_header('Content-Length'str(len(compressed_data)))                return True        except Exception as e:            print(f"压缩失败: {e}")            return False    @staticmethod    def _get_status_phrase(status_code: int) -> str:        """获取状态短语"""        status_map = {            200"OK",            201"Created",            204"No Content",            301"Moved Permanently",            302"Found",            304"Not Modified",            400"Bad Request",            401"Unauthorized",            403"Forbidden",            404"Not Found",            405"Method Not Allowed",            413"Payload Too Large",            500"Internal Server Error",            501"Not Implemented",            502"Bad Gateway",            503"Service Unavailable",        }        return status_map.get(status_code, "Unknown Status")    @classmethod    def make_error_response(cls, status_code: int, message: str = None) -> 'HTTPResponse':        """创建错误响应"""        if not message:            message = cls._get_status_phrase(status_code)        html = f"""        <!DOCTYPE html>        <html>        <head>            <title>{status_code} {message}</title>            <style>                body {{ font-family: Arial, sans-serif; text-align: center; padding: 50px; }}                h1 {{ color: #333; }}                .error {{ color: #d32f2f; font-size: 72px; margin-bottom: 20px; }}                .message {{ color: #666; font-size: 18px; }}                .details {{ color: #999; font-size: 14px; margin-top: 30px; }}            </style>        </head>        <body>            <div class="error">{status_code}</div>            <h1>{message}</h1>            <div class="message">抱歉,发生了错误</div>            <div class="details">PythonHTTPServer</div>        </body>        </html>        """        response = cls(            status_code=status_code,            content=html.encode('utf-8'),            content_type="text/html; charset=utf-8"        )        return responseclass HTTPRoute:    """HTTP路由"""    def __init__(self, method: str, path: str, handler: Callable, name: str = None):        self.method = method.upper()        self.path = path        self.handler = handler        self.name = name        self.pattern = self._compile_pattern(path)    def _compile_pattern(self, path: str):        """编译路径模式"""        import re        # 转换路径参数,如 /users/{id} -> /users/(?P<id>[^/]+)        pattern = re.sub(r'\{(\w+)\}'r'(?P<\1>[^/]+)', path)        pattern = f"^{pattern}$"        return re.compile(pattern)    def match(self, method: str, request_path: str) -> Optional[dict]:        """匹配路由"""        if self.method != method.upper():            return None        match = self.pattern.match(request_path)        if match:            return match.groupdict()        return Noneclass HTTPServer:    """完整的HTTP/1.1服务器"""    def __init__(self, host: str = 'localhost', port: int = 8080,                 web_root: str = '.', max_workers: int = 10):        self.host = host        self.port = port        self.web_root = Path(web_root).resolve()        self.server_socket = None        self.is_running = False        self.routes = []        self.middlewares = []        # 连接管理        self.connections = {}        self.max_keep_alive = 100  # 最大保持连接数        self.keep_alive_timeout = 5  # 保持连接超时(秒)        # 线程池        from concurrent.futures import ThreadPoolExecutor        self.executor = ThreadPoolExecutor(max_workers=max_workers)        # 初始化MIME类型        mimetypes.init()        # 注册默认路由        self._register_default_routes()    def _register_default_routes(self):        """注册默认路由"""        # 静态文件服务        self.add_route('GET''/'self._handle_index)        self.add_route('GET''/static/{file_path:path}'self._handle_static_file)        # API端点        self.add_route('GET''/api/status'self._handle_api_status)        self.add_route('GET''/api/echo'self._handle_api_echo)        self.add_route('POST''/api/echo'self._handle_api_echo)        self.add_route('GET''/api/time'self._handle_api_time)        # 表单处理        self.add_route('GET''/form'self._handle_form_get)        self.add_route('POST''/form'self._handle_form_post)    def add_route(self, method: str, path: str, handler: Callable, name: str = None):        """添加路由"""        route = HTTPRoute(method, path, handler, name)        self.routes.append(route)    def add_middleware(self, middleware: Callable):        """添加中间件"""        self.middlewares.append(middleware)    def start(self):        """启动服务器"""        try:            # 创建服务器socket            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)            # 设置非阻塞(用于select)            self.server_socket.setblocking(False)            # 绑定地址            self.server_socket.bind((self.host, self.port))            self.server_socket.listen(128)  # 较大的backlog            print(f"HTTP服务器启动在 http://{self.host}:{self.port}")            print(f"文档根目录: {self.web_root}")            print("按 Ctrl+C 停止服务器")            self.is_running = True            # 使用select处理多个连接            import select            inputs = [self.server_socket]            outputs = []            message_queues = {}            while self.is_running:                try:                    # 等待socket事件                    readable, writable, exceptional = select.select(                        inputs, outputs, inputs, 1.0                    )                    # 处理可读socket                    for s in readable:                        if s is self.server_socket:                            # 新连接                            self._handle_new_connection(s, inputs)                        else:                            # 客户端数据到达                            self._handle_client_data(s, inputs, outputs, message_queues)                    # 处理可写socket                    for s in writable:                        self._handle_writable_socket(s, outputs, message_queues)                    # 处理异常socket                    for s in exceptional:                        self._handle_exceptional_socket(s, inputs, outputs, message_queues)                    # 清理超时的keep-alive连接                    self._cleanup_idle_connections()                except KeyboardInterrupt:                    print("\n收到中断信号")                    break                except Exception as e:                    print(f"服务器循环错误: {e}")                    if self.is_running:                        continue                    else:                        break        except Exception as e:            print(f"服务器启动失败: {e}")        finally:            self.stop()    def _handle_new_connection(self, server_socket, inputs):        """处理新连接"""        try:            client_socket, client_address = server_socket.accept()            client_socket.setblocking(False)            # 记录连接时间            self.connections[client_socket] = {                'address': client_address,                'last_activity': time.time(),                'keep_alive'False,                'requests_processed'0            }            inputs.append(client_socket)            print(f"新连接: {client_address}")        except Exception as e:            print(f"接受连接失败: {e}")    def _handle_client_data(self, client_socket, inputs, outputs, message_queues):        """处理客户端数据"""        try:            data = client_socket.recv(4096)            if data:                # 更新活动时间                if client_socket in self.connections:                    self.connections[client_socket]['last_activity'] = time.time()                # 将请求提交给线程池处理                future = self.executor.submit(                    self._process_request, client_socket, data                )                # 将响应future放入队列                message_queues[client_socket] = future                if client_socket not in outputs:                    outputs.append(client_socket)            else:                # 客户端关闭连接                self._cleanup_socket(client_socket, inputs, outputs, message_queues)        except Exception as e:            print(f"接收数据失败: {e}")            self._cleanup_socket(client_socket, inputs, outputs, message_queues)    def _process_request(self, client_socket, request_data):        """处理请求(在线程池中执行)"""        try:            # 获取连接信息            conn_info = self.connections.get(client_socket, {})            client_address = conn_info.get('address', ('unknown'0))            # 解析请求            request = HTTPRequest(request_data, client_address)            # 应用中间件            for middleware in self.middlewares:                result = middleware(request)                if result is not None:  # 中间件可以中断请求处理                    return result            # 查找匹配的路由            handler, path_params = self._find_route_handler(request)            if handler:                # 执行处理器                response = handler(request, **path_params)            else:                # 尝试作为静态文件处理                response = self._handle_static_file(request)            # 更新连接信息            if client_socket in self.connections:                self.connections[client_socket]['requests_processed'] += 1                self.connections[client_socket]['keep_alive'] = request.is_keep_alive()            return response.to_bytes()        except Exception as e:            print(f"处理请求失败: {e}")            error_response = HTTPResponse.make_error_response(500str(e))            return error_response.to_bytes()    def _find_route_handler(self, request: HTTPRequest) -> Tuple[Optional[Callable], dict]:        """查找路由处理器"""        for route in self.routes:            path_params = route.match(request.method, request.path)            if path_params is not None:                return route.handler, path_params        return None, {}    def _handle_writable_socket(self, client_socket, outputs, message_queues):        """处理可写socket"""        try:            future = message_queues.get(client_socket)            if future and future.done():                try:                    response_data = future.result()                    client_socket.sendall(response_data)                except Exception as e:                    print(f"发送响应失败: {e}")                finally:                    # 从输出列表中移除                    if client_socket in outputs:                        outputs.remove(client_socket)                    # 清理队列                    if client_socket in message_queues:                        del message_queues[client_socket]                    # 如果不是keep-alive,关闭连接                    conn_info = self.connections.get(client_socket, {})                    if not conn_info.get('keep_alive'False):                        self._cleanup_socket(client_socket, NoneNoneNone)        except Exception as e:            print(f"处理可写socket失败: {e}")            self._cleanup_socket(client_socket, None, outputs, message_queues)    def _handle_exceptional_socket(self, client_socket, inputs, outputs, message_queues):        """处理异常socket"""        print(f"socket异常: {client_socket}")        self._cleanup_socket(client_socket, inputs, outputs, message_queues)    def _cleanup_socket(self, client_socket, inputs, outputs, message_queues):        """清理socket"""        if inputs and client_socket in inputs:            inputs.remove(client_socket)        if outputs and client_socket in outputs:            outputs.remove(client_socket)        if message_queues and client_socket in message_queues:            del message_queues[client_socket]        if client_socket in self.connections:            conn_info = self.connections[client_socket]            print(f"连接关闭: {conn_info['address']}, "                  f"处理请求数: {conn_info['requests_processed']}")            del self.connections[client_socket]        try:            client_socket.close()        except:            pass    def _cleanup_idle_connections(self):        """清理空闲连接"""        current_time = time.time()        idle_sockets = []        for sock, info in self.connections.items():            idle_time = current_time - info['last_activity']            if idle_time > self.keep_alive_timeout:                idle_sockets.append(sock)        for sock in idle_sockets:            print(f"清理空闲连接: {self.connections.get(sock, {}).get('address')}")            try:                sock.close()            except:                pass            if sock in self.connections:                del self.connections[sock]    # 默认路由处理器    def _handle_index(self, request: HTTPRequest) -> HTTPResponse:        """处理首页"""        html = """        <!DOCTYPE html>        <html>        <head>            <title>Python HTTP Server</title>            <style>                body { font-family: Arial, sans-serif; margin: 40px; }                h1 { color: #333; }                .card { background: #f5f5f5; padding: 20px; margin: 20px 0; border-radius: 5px; }                .endpoints { display: grid; grid-template-columns: repeat(auto-fill, minmax(300px, 1fr)); gap: 20px; }                .endpoint { background: white; padding: 15px; border-radius: 5px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }                .method { display: inline-block; padding: 3px 8px; border-radius: 3px; font-weight: bold; margin-right: 10px; }                .get { background: #e3f2fd; color: #1565c0; }                .post { background: #e8f5e9; color: #2e7d32; }                a { color: #1976d2; text-decoration: none; }                a:hover { text-decoration: underline; }            </style>        </head>        <body>            <h1>Python HTTP Server</h1>            <div class="card">                <p>这是一个完整的HTTP/1.1服务器实现,支持持久连接、路由、中间件等特性。</p>            </div>            <h2>可用端点</h2>            <div class="endpoints">                <div class="endpoint">                    <span class="method get">GET</span>                    <a href="/static/test.txt">/static/{file_path}</a>                    <p>静态文件服务</p>                </div>                <div class="endpoint">                    <span class="method get">GET</span>                    <a href="/api/status">/api/status</a>                    <p>服务器状态</p>                </div>                <div class="endpoint">                    <span class="method get">GET</span>                    <span class="method post">POST</span>                    <a href="/api/echo">/api/echo</a>                    <p>回显请求内容</p>                </div>                <div class="endpoint">                    <span class="method get">GET</span>                    <a href="/api/time">/api/time</a>                    <p>服务器时间</p>                </div>                <div class="endpoint">                    <span class="method get">GET</span>                    <a href="/form">/form</a>                    <p>表单测试</p>                </div>            </div>            <h2>请求信息</h2>            <div class="card">                <p><strong>客户端IP:</strong> """ + request.client_ip + """</p>                <p><strong>请求方法:</strong> """ + request.method + """</p>                <p><strong>请求路径:</strong> """ + request.path + """</p>                <p><strong>HTTP版本:</strong> """ + request.version + """</p>                <p><strong>保持连接:</strong> """ + str(request.is_keep_alive()) + """</p>            </div>        </body>        </html>        """        response = HTTPResponse(            content=html.encode('utf-8'),            content_type="text/html; charset=utf-8"        )        # 设置缓存控制        response.set_header('Cache-Control''public, max-age=3600')        return response    def _handle_static_file(self, request: HTTPRequest, file_path: str = "") -> HTTPResponse:        """处理静态文件"""        # 安全性检查:防止路径遍历        if file_path:            # 从路由参数获取文件路径            safe_path = Path(unquote(file_path))        else:            # 从请求路径获取(兼容旧方式)            safe_path = Path(unquote(request.path.lstrip('/')))        # 确保路径在web根目录内        full_path = (self.web_root / safe_path).resolve()        if not str(full_path).startswith(str(self.web_root)):            return HTTPResponse.make_error_response(403"禁止访问")        # 检查文件是否存在        if not full_path.exists():            return HTTPResponse.make_error_response(404"文件未找到")        # 如果是目录,寻找index.html        if full_path.is_dir():            index_file = full_path / 'index.html'            if index_file.exists():                full_path = index_file            else:                # 列出目录内容                return self._list_directory(full_path, safe_path)        # 检查是否是文件        if not full_path.is_file():            return HTTPResponse.make_error_response(403"不是文件")        try:            # 读取文件            with open(full_path, 'rb'as f:                content = f.read()            # 获取MIME类型            mime_type, encoding = mimetypes.guess_type(str(full_path))            if not mime_type:                mime_type = 'application/octet-stream'            # 构建响应            response = HTTPResponse(                content=content,                content_type=mime_type + (f'; charset={encoding}' if encoding else '')            )            # 设置缓存和压缩            self._set_file_headers(response, full_path, request)            return response        except Exception as e:            print(f"读取文件失败: {e}")            return HTTPResponse.make_error_response(500f"读取文件失败: {e}")    def _set_file_headers(self, response: HTTPResponse, file_path: Path, request: HTTPRequest):        """设置文件相关头部"""        # ETag(实体标签)用于缓存验证        file_stat = file_path.stat()        etag = hashlib.md5(f"{file_path}-{file_stat.st_mtime}".encode()).hexdigest()        response.set_header('ETag'f'"{etag}"')        # Last-Modified        last_modified = datetime.fromtimestamp(file_stat.st_mtime, tz=timezone.utc)        response.set_header('Last-Modified', last_modified.strftime('%a, %d %b %Y %H:%M:%S GMT'))        # 检查条件请求        if_none_match = request.get_header('If-None-Match')        if_modified_since = request.get_header('If-Modified-Since')        if if_none_match and f'"{etag}"' in if_none_match:            # ETag匹配,返回304 Not Modified            response.status_code = 304            response.content = b""            response.set_header('Content-Length''0')            return        if if_modified_since:            try:                if_modified_date = parsedate(if_modified_since)                if if_modified_date:                    if_modified_time = datetime(*if_modified_date[:6], tzinfo=timezone.utc)                    if last_modified <= if_modified_time:                        response.status_code = 304                        response.content = b""                        response.set_header('Content-Length''0')                        return            except:                pass        # 缓存控制        cache_control = []        if file_path.suffix.lower() in ['.html''.htm''.php']:            cache_control.append('no-cache')        else:            cache_control.append('public')            cache_control.append('max-age=86400')  # 24小时        response.set_header('Cache-Control'', '.join(cache_control))        # 内容协商:压缩        accept_encoding = request.get_header('Accept-Encoding''')        if 'gzip' in accept_encoding and len(response.content) > 1024:  # 大于1KB才压缩            if response.compress('gzip'):                response.set_header('Vary''Accept-Encoding')    def _list_directory(self, dir_path: Path, url_path: Path) -> HTTPResponse:        """列出目录内容"""        try:            items = []            for item in dir_path.iterdir():                is_dir = item.is_dir()                name = item.name                size = item.stat().st_size if not is_dir else '-'                mtime = datetime.fromtimestamp(item.stat().st_mtime)                items.append({                    'name': name,                    'is_dir': is_dir,                    'size': size,                    'mtime': mtime.strftime('%Y-%m-%d %H:%M'),                    'url': quote(str(url_path / name))                })            # 按目录优先排序            items.sort(key=lambda x: (not x['is_dir'], x['name'].lower()))            # 生成HTML            html_lines = [                '<!DOCTYPE html>',                '<html><head><title>目录列表</title>',                '<style>',                'body { font-family: Arial, sans-serif; margin: 20px; }',                'table { border-collapse: collapse; width: 100%; }',                'th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }',                'th { background-color: #f2f2f2; }',                'tr:hover { background-color: #f5f5f5; }',                '.dir { color: #1976d2; }',                '.size { text-align: right; }',                '</style>',                '</head><body>',                f'<h1>目录: /{url_path}</h1>',                '<table>',                '<tr><th>名称</th><th>大小</th><th>修改时间</th></tr>'            ]            # 父目录链接            if url_path.parent != url_path:                parent_url = quote(str(url_path.parent)) if str(url_path.parent) != '.' else '/'                html_lines.append(                    f'<tr><td colspan="3"><a href="{parent_url}">../</a></td></tr>'                )            for item in items:                icon = '📁' if item['is_dir'else '📄'                size_display = f"{item['size']:,}" if isinstance(item['size'], intelse item['size']                html_lines.append(                    f'<tr>'                    f'<td>{icon} <a href="{item["url"]}" class="{"dir"if item["is_dir"else""}">{item["name"]}</a></td>'                    f'<td class="size">{size_display}</td>'                    f'<td>{item["mtime"]}</td>'                    f'</tr>'                )            html_lines.extend([                '</table>',                '</body></html>'            ])            html = '\n'.join(html_lines)            response = HTTPResponse(                content=html.encode('utf-8'),                content_type="text/html; charset=utf-8"            )            return response        except Exception as e:            return HTTPResponse.make_error_response(500f"列出目录失败: {e}")    def _handle_api_status(self, request: HTTPRequest) -> HTTPResponse:        """处理API状态请求"""        import psutil        import platform        status_info = {            'server': {                'name''PythonHTTPServer',                'version''1.0',                'host'self.host,                'port'self.port,                'uptime''运行中'            },            'system': {                'platform': platform.platform(),                'python_version': platform.python_version(),                'cpu_count': psutil.cpu_count(),                'memory_percent': psutil.virtual_memory().percent,                'disk_usage': psutil.disk_usage('/').percent            },            'connections': {                'total'len(self.connections),                'active'sum(1 for c in self.connections.values() if c['keep_alive']),                'requests_processed'sum(c['requests_processed'for c in self.connections.values())            }        }        response = HTTPResponse(            content=json.dumps(status_info, indent=2).encode('utf-8'),            content_type="application/json"        )        return response    def _handle_api_echo(self, request: HTTPRequest) -> HTTPResponse:        """处理回显请求"""        echo_data = {            'method': request.method,            'path': request.path,            'query_params': request.query_params,            'headers'dict(request.headers),            'cookies': request.cookies,            'client': {                'ip': request.client_ip,                'port': request.client_port            },            'body': request.body.decode('utf-8', errors='ignore'if request.body else None,            'timestamp': datetime.now().isoformat()        }        response = HTTPResponse(            content=json.dumps(echo_data, indent=2).encode('utf-8'),            content_type="application/json"        )        return response    def _handle_api_time(self, request: HTTPRequest) -> HTTPResponse:        """处理时间请求"""        time_data = {            'iso': datetime.now().isoformat(),            'timestamp': time.time(),            'utc': datetime.utcnow().isoformat() + 'Z',            'server_timezone'str(datetime.now().astimezone().tzinfo)        }        response = HTTPResponse(            content=json.dumps(time_data, indent=2).encode('utf-8'),            content_type="application/json"        )        # 设置缓存控制        response.set_header('Cache-Control''no-cache, must-revalidate')        return response    def _handle_form_get(self, request: HTTPRequest) -> HTTPResponse:        """处理GET表单"""        html = """        <!DOCTYPE html>        <html>        <head>            <title>表单测试</title>            <style>                body { font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px; }                .form-group { margin-bottom: 15px; }                label { display: block; margin-bottom: 5px; font-weight: bold; }                input, textarea, select { width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px; }                button { background: #1976d2; color: white; border: none; padding: 10px 20px; border-radius: 4px; cursor: pointer; }                button:hover { background: #1565c0; }                .result { background: #f5f5f5; padding: 15px; border-radius: 4px; margin-top: 20px; }            </style>        </head>        <body>            <h1>表单测试</h1>            <form method="POST" action="/form" enctype="application/x-www-form-urlencoded">                <div class="form-group">                    <label for="username">用户名:</label>                    <input type="text" id="username" name="username" required>                </div>                <div class="form-group">                    <label for="email">邮箱:</label>                    <input type="email" id="email" name="email" required>                </div>                <div class="form-group">                    <label for="message">留言:</label>                    <textarea id="message" name="message" rows="4"></textarea>                </div>                <div class="form-group">                    <label for="country">国家:</label>                    <select id="country" name="country">                        <option value="">请选择</option>                        <option value="CN">中国</option>                        <option value="US">美国</option>                        <option value="JP">日本</option>                        <option value="UK">英国</option>                    </select>                </div>                <div class="form-group">                    <label>兴趣:</label>                    <div>                        <label><input type="checkbox" name="interests" value="sports"> 运动</label>                        <label><input type="checkbox" name="interests" value="music"> 音乐</label>                        <label><input type="checkbox" name="interests" value="reading"> 阅读</label>                        <label><input type="checkbox" name="interests" value="travel"> 旅行</label>                    </div>                </div>                <button type="submit">提交</button>            </form>        </body>        </html>        """        response = HTTPResponse(            content=html.encode('utf-8'),            content_type="text/html; charset=utf-8"        )        return response    def _handle_form_post(self, request: HTTPRequest) -> HTTPResponse:        """处理POST表单"""        try:            # 解析表单数据            from urllib.parse import parse_qs            body_text = request.body.decode('utf-8', errors='ignore')            form_data = parse_qs(body_text)            # 准备结果            result_html = '<h2>提交成功</h2><div class="result"><h3>提交的数据:</h3><ul>'            for key, values in form_data.items():                for value in values:                    result_html += f'<li><strong>{key}:</strong> {value}</li>'            result_html += '</ul></div>'            html = f"""            <!DOCTYPE html>            <html>            <head>                <title>表单提交结果</title>                <style>                    body {{ font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px; }}                    .result {{ background: #f5f5f5; padding: 15px; border-radius: 4px; margin: 20px 0; }}                    .back-link {{ display: inline-block; margin-top: 20px; padding: 10px 20px; background: #1976d2; color: white; text-decoration: none; border-radius: 4px; }}                    .back-link:hover {{ background: #1565c0; }}                </style>            </head>            <body>                <h1>表单提交结果</h1>                {result_html}                <a href="/form" class="back-link">返回表单</a>            </body>            </html>            """            response = HTTPResponse(                content=html.encode('utf-8'),                content_type="text/html; charset=utf-8"            )            # 设置cookie示例            response.add_cookie(                'form_submission',                'success',                max_age=3600,                path='/',                http_only=True,                same_site='Lax'            )            return response        except Exception as e:            return HTTPResponse.make_error_response(400f"表单处理失败: {e}")    def stop(self):        """停止服务器"""        self.is_running = False        if self.server_socket:            try:                self.server_socket.close()            except:                pass        self.executor.shutdown(wait=True)        print("HTTP服务器已停止")

2.2 中间件系统

"""HTTP中间件系统"""import timefrom typing import CallableOptionalimport refrom functools import wrapsclass HTTPMiddleware:    """HTTP中间件基类"""    def __init__(self):        self.priority = 0  # 优先级,越小越先执行    def process_request(self, request):        """处理请求"""        return None    def process_response(self, request, response):        """处理响应"""        return responseclass LoggingMiddleware(HTTPMiddleware):    """日志中间件"""    def __init__(self, log_file: str = None):        super().__init__()        self.log_file = log_file        self.priority = 100  # 较高优先级    def process_request(self, request):        """记录请求日志"""        log_entry = {            'timestamp': time.time(),            'client_ip': request.client_ip,            'method': request.method,            'path': request.path,            'user_agent': request.get_header('User-Agent''Unknown'),            'content_length'len(request.body)        }        # 控制台输出        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] "              f"{request.client_ip} - {request.method}{request.path}")        # 文件日志        if self.log_file:            import json            with open(self.log_file, 'a'as f:                f.write(json.dumps(log_entry) + '\n')        return None    def process_response(self, request, response):        """记录响应日志"""        log_entry = {            'timestamp': time.time(),            'client_ip': request.client_ip,            'method': request.method,            'path': request.path,            'status_code': response.status_code,            'response_size'len(response.content)        }        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] "              f"{request.client_ip} - {request.method}{request.path} - "              f"{response.status_code} ({len(response.content)} bytes)")        return responseclass RateLimitMiddleware(HTTPMiddleware):    """速率限制中间件"""    def __init__(self, requests_per_minute: int = 60):        super().__init__()        self.requests_per_minute = requests_per_minute        self.request_counts = {}  # IP -> [请求时间戳列表]        self.priority = 50  # 较高优先级    def process_request(self, request):        """检查速率限制"""        client_ip = request.client_ip        current_time = time.time()        # 清理旧记录        one_minute_ago = current_time - 60        if client_ip in self.request_counts:            self.request_counts[client_ip] = [                t for t in self.request_counts[client_ip] if t > one_minute_ago            ]        else:            self.request_counts[client_ip] = []        # 检查是否超过限制        if len(self.request_counts[client_ip]) >= self.requests_per_minute:            # 创建429响应            from .http_response import HTTPResponse            response = HTTPResponse.make_error_response(                429"请求过于频繁,请稍后再试"            )            response.set_header('Retry-After''60')            return response        # 记录请求        self.request_counts[client_ip].append(current_time)        return None    def process_response(self, request, response):        """在响应中添加速率限制头部"""        client_ip = request.client_ip        if client_ip in self.request_counts:            remaining = self.requests_per_minute - len(self.request_counts[client_ip])            response.set_header('X-RateLimit-Limit'str(self.requests_per_minute))            response.set_header('X-RateLimit-Remaining'str(max(0, remaining)))        return responseclass AuthenticationMiddleware(HTTPMiddleware):    """认证中间件"""    def __init__(self, api_keys: set = None):        super().__init__()        self.api_keys = api_keys or set()        self.public_paths = {'/''/api/status''/api/time''/form'}        self.priority = 30  # 高优先级    def process_request(self, request):        """检查认证"""        # 公开路径不需要认证        if request.path in self.public_paths:            return None        # 检查API密钥        api_key = request.get_header('X-API-Key')        if api_key and api_key in self.api_keys:            return None        # 检查Basic认证        auth = request.get_basic_auth()        if auth:            username, password = auth            # 这里可以添加用户名密码验证逻辑            if username == 'admin' and password == 'secret':                return None        # 认证失败        from .http_response import HTTPResponse        response = HTTPResponse.make_error_response(401"需要认证")        response.set_header('WWW-Authenticate''Basic realm="API Access"')        return response    def process_response(self, request, response):        """在响应中添加认证头部"""        if response.status_code == 401:            response.set_header('WWW-Authenticate''Basic realm="API Access"')        return responseclass CORSMiddleware(HTTPMiddleware):    """CORS中间件"""    def __init__(self, allowed_origins: list = None, allowed_methods: list = None):        super().__init__()        self.allowed_origins = allowed_origins or ['*']        self.allowed_methods = allowed_methods or ['GET''POST''PUT''DELETE''OPTIONS']        self.priority = 90  # 较低优先级    def process_request(self, request):        """处理预检请求"""        if request.method == 'OPTIONS':            from .http_response import HTTPResponse            response = HTTPResponse(status_code=204)            self._add_cors_headers(request, response)            return response        return None    def process_response(self, request, response):        """添加CORS头部"""        self._add_cors_headers(request, response)        return response    def _add_cors_headers(self, request, response):        """添加CORS头部"""        origin = request.get_header('Origin')        if origin and (origin in self.allowed_origins or '*' in self.allowed_origins):            response.set_header('Access-Control-Allow-Origin', origin)            response.set_header('Access-Control-Allow-Methods'', '.join(self.allowed_methods))            response.set_header('Access-Control-Allow-Headers''Content-Type, Authorization, X-API-Key')            response.set_header('Access-Control-Allow-Credentials''true')            response.set_header('Access-Control-Max-Age''86400')  # 24小时class SecurityMiddleware(HTTPMiddleware):    """安全中间件"""    def __init__(self):        super().__init__()        self.priority = 10  # 最高优先级    def process_request(self, request):        """安全检查"""        # 检查路径遍历攻击        if self._detect_path_traversal(request.path):            from .http_response import HTTPResponse            return HTTPResponse.make_error_response(400"无效的请求路径")        # 检查SQL注入(简单模式匹配)        if self._detect_sql_injection(request.path + str(request.query_params)):            from .http_response import HTTPResponse            return HTTPResponse.make_error_response(400"请求包含可疑内容")        # 检查请求体大小        content_length = request.get_header('Content-Length')        if content_length and int(content_length) > 10 * 1024 * 1024:  # 10MB限制            from .http_response import HTTPResponse            return HTTPResponse.make_error_response(413"请求体过大")        return None    def process_response(self, request, response):        """添加安全头部"""        # 添加安全相关HTTP头部        response.set_header('X-Content-Type-Options''nosniff')        response.set_header('X-Frame-Options''DENY')        response.set_header('X-XSS-Protection''1; mode=block')        response.set_header('Referrer-Policy''strict-origin-when-cross-origin')        response.set_header('Strict-Transport-Security''max-age=31536000; includeSubDomains')        # 内容安全策略        csp = [            "default-src 'self'",            "script-src 'self' 'unsafe-inline'",            "style-src 'self' 'unsafe-inline'",            "img-src 'self' data: https:",            "font-src 'self'",            "connect-src 'self'",            "frame-ancestors 'none'",            "form-action 'self'"        ]        response.set_header('Content-Security-Policy''; '.join(csp))        return response    def _detect_path_traversal(self, path: str) -> bool:        """检测路径遍历攻击"""        # 检查是否包含父目录引用        if '..' in path or '../' in path or '/..' in path:            return True        # 检查绝对路径        if path.startswith('/'and len(path) > 1 and path[1] == '/':            return True        # 检查空字节注入(已弃用,但保持检查)        if '\x00' in path:            return True        return False    def _detect_sql_injection(self, text: str) -> bool:        """检测SQL注入尝试(简单模式)"""        patterns = [            r"'\s+OR\s+'['1']=[('1']",            r"'\s+UNION\s+SELECT",            r"';.*--",            r"'\s+AND\s+\d+=\d+",            r"EXEC(\s+|\().*",        ]        for pattern in patterns:            if re.search(pattern, text, re.IGNORECASE):                return True        return Falsedef middleware_decorator(middleware_class):    """中间件装饰器(用于路由处理器)"""    def decorator(handler):        @wraps(handler)        def wrapped(request, *args, **kwargs):            # 创建中间件实例            middleware = middleware_class()            # 处理请求            middleware_result = middleware.process_request(request)            if middleware_result is not None:                return middleware_result            # 执行原始处理器            response = handler(request, *args, **kwargs)            # 处理响应            response = middleware.process_response(request, response)            return response        return wrapped    return decorator# 使用示例def create_server_with_middleware():    """创建带中间件的服务器"""    server = HTTPServer(host='localhost', port=8080)    # 添加中间件    server.add_middleware(LoggingMiddleware('access.log'))    server.add_middleware(SecurityMiddleware())    server.add_middleware(CORSMiddleware(allowed_origins=['http://localhost:3000']))    server.add_middleware(RateLimitMiddleware(requests_per_minute=30))    # 添加带认证的API密钥    api_keys = {'secret-key-123''another-key-456'}    server.add_middleware(AuthenticationMiddleware(api_keys=api_keys))    # 自定义路由使用中间件装饰器    @middleware_decorator(RateLimitMiddleware(requests_per_minute=10))    def handle_sensitive(request):        """敏感端点,需要更严格的速率限制"""        return HTTPResponse(content=b"Sensitive data", content_type="text/plain")    server.add_route('GET''/sensitive', handle_sensitive)    return server

3. HTTP客户端实现

3.1 完整的HTTP/1.1客户端

"""完整的HTTP/1.1客户端实现"""import socketimport sslimport urllib.parsefrom typing import OptionalDictTupleListfrom dataclasses import dataclassfrom datetime import datetimeimport json@dataclassclass HTTPClientConfig:    """HTTP客户端配置"""    timeout: float = 30.0    max_redirects: int = 5    verify_ssl: bool = True    user_agent: str = "PythonHTTPClient/1.0"    accept_encoding: str = "gzip, deflate"    default_headers: Dict = None    proxy: Optional[str] = None    enable_cookies: bool = True    enable_compression: bool = True    def __post_init__(self):        if self.default_headers is None:            self.default_headers = {                'Accept''*/*',                'Accept-Language''en-US,en;q=0.9',                'Cache-Control''no-cache',                'Connection''close',  # 默认关闭连接            }class HTTPResponse:    """HTTP响应(客户端版)"""    def __init__(self, raw_response: bytes, url: str):        self.url = url        self.raw = raw_response        self.status_code = 0        self.reason = ""        self.headers = {}        self.cookies = {}        self.body = b""        self._parse()    def _parse(self):        """解析HTTP响应"""        try:            # 分割头部和正文            parts = self.raw.split(b'\r\n\r\n'1)            headers_part = parts[0].decode('utf-8', errors='ignore')            if len(parts) > 1:                self.body = parts[1]            # 解析状态行            lines = headers_part.split('\r\n')            if not lines:                raise ValueError("空响应")            # 状态行            status_line = lines[0]            version, status_code, reason = status_line.split(' '2)            self.status_code = int(status_code)            self.reason = reason            # 解析头部            for line in lines[1:]:                if ': ' in line:                    key, value = line.split(': '1)                    self.headers[key] = value            # 解析cookie            self._parse_cookies()        except Exception as e:            raise ValueError(f"解析HTTP响应失败: {e}")    def _parse_cookies(self):        """解析Set-Cookie头部"""        set_cookie = self.headers.get('Set-Cookie')        if set_cookie:            # 可能有多个Set-Cookie头部            if isinstance(set_cookie, list):                cookie_strings = set_cookie            else:                cookie_strings = [set_cookie]            for cookie_str in cookie_strings:                # 简单解析,实际需要更复杂的解析                cookie_parts = cookie_str.split(';')[0].strip()                if '=' in cookie_parts:                    key, value = cookie_parts.split('='1)                    self.cookies[key] = value    def json(self) -> Optional[Dict]:        """解析JSON响应"""        try:            return json.loads(self.body.decode('utf-8'))        except:            return None    def text(self, encoding: str = 'utf-8') -> str:        """获取文本响应"""        return self.body.decode(encoding, errors='ignore')    def is_redirect(self) -> bool:        """是否是重定向响应"""        return self.status_code in [301302303307308]    def get_redirect_location(self) -> Optional[str]:        """获取重定向位置"""        return self.headers.get('Location')    def __str__(self) -> str:        """字符串表示"""        return f"{self.status_code}{self.reason} ({len(self.body)} bytes)"class HTTPConnection:    """HTTP连接管理"""    def __init__(self, host: str, port: int, ssl_context: bool = False):        self.host = host        self.port = port        self.ssl_context = ssl_context        self.socket = None        self.is_connected = False        self.last_used = None    def connect(self, timeout: float = 30.0) -> bool:        """建立连接"""        try:            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)            self.socket.settimeout(timeout)            self.socket.connect((self.host, self.port))            if self.ssl_context:                context = ssl.create_default_context()                self.socket = context.wrap_socket(                    self.socket,                    server_hostname=self.host                )            self.is_connected = True            self.last_used = datetime.now()            return True        except Exception as e:            print(f"连接失败 {self.host}:{self.port}{e}")            return False    def send_request(self, request: bytes) -> bytes:        """发送请求并接收响应"""        if not self.is_connected:            raise ConnectionError("未连接")        try:            # 发送请求            self.socket.sendall(request)            # 接收响应            response = b""            while True:                try:                    chunk = self.socket.recv(4096)                    if not chunk:                        break                    response += chunk                    # 检查是否接收完整(简化实现)                    if b'\r\n\r\n' in response:                        # 尝试根据Content-Length判断是否接收完整                        header_end = response.find(b'\r\n\r\n')                        headers_part = response[:header_end].decode('utf-8', errors='ignore')                        # 查找Content-Length                        import re                        match = re.search(r'Content-Length:\s*(\d+)', headers_part, re.IGNORECASE)                        if match:                            content_length = int(match.group(1))                            body_start = header_end + 4                            body_received = len(response) - body_start                            if body_received >= content_length:                                break                        # 对于分块传输编码,需要特殊处理                        if 'Transfer-Encoding: chunked' in headers_part:                            # 简化处理:检查是否以0\r\n\r\n结束                            if response.endswith(b'0\r\n\r\n'):                                break                except socket.timeout:                    break                except Exception as e:                    print(f"接收响应失败: {e}")                    break            self.last_used = datetime.now()            return response        except Exception as e:            self.close()            raise ConnectionError(f"发送/接收失败: {e}")    def close(self):        """关闭连接"""        if self.socket:            try:                self.socket.close()            except:                pass            self.socket = None        self.is_connected = Falseclass HTTPClient:    """完整的HTTP/1.1客户端"""    def __init__(self, config: HTTPClientConfig = None):        self.config = config or HTTPClientConfig()        self.connections = {}  # (host, port, ssl) -> HTTPConnection        self.cookie_jar = {}   # domain -> {name: value}        self.session_headers = {}    def request(self, method: str, url: str                headers: Dict = None, data: bytes = None,                params: Dict = None, json_data: Dict = None,                allow_redirects: bool = True) -> HTTPResponse:        """发送HTTP请求"""        # 解析URL        parsed = urllib.parse.urlparse(url)        scheme = parsed.scheme.lower()        hostname = parsed.hostname        port = parsed.port or (443 if scheme == 'https' else 80)        path = parsed.path or '/'        query = parsed.query        # 添加查询参数        if params:            query_params = urllib.parse.parse_qs(query)            query_params.update(params)            query = urllib.parse.urlencode(query_params, doseq=True)        full_path = path        if query:            full_path += f'?{query}'        # 准备请求数据        if json_data is not None:            if data is not None:                raise ValueError("不能同时提供data和json_data")            data = json.dumps(json_data).encode('utf-8')            if headers is None:                headers = {}            headers['Content-Type'] = 'application/json'        # 构建请求        request_lines = self._build_request(            method, hostname, port, full_path,            headers, data        )        # 发送请求(支持重定向)        response = self._send_request_with_redirects(            scheme, hostname, port, request_lines,             allow_redirects, 0        )        return response    def _build_request(self, method: str, hostname: str, port: int,                      path: str, headers: Dict, data: bytes) -> bytes:        """构建HTTP请求"""        # 默认头部        request_headers = self.config.default_headers.copy()        request_headers.update(self.session_headers)        # 自定义头部        if headers:            request_headers.update(headers)        # 必需头部        request_headers['Host'] = f'{hostname}:{port}' if port not in [80443else hostname        request_headers['User-Agent'] = self.config.user_agent        # 添加cookie        if self.config.enable_cookies:            cookie_header = self._get_cookie_header(hostname, path)            if cookie_header:                request_headers['Cookie'] = cookie_header        # 添加Accept-Encoding        if self.config.enable_compression:            request_headers['Accept-Encoding'] = self.config.accept_encoding        # 如果有请求体,添加Content-Length        if data:            request_headers['Content-Length'] = str(len(data))        # 构建请求        request_line = f"{method.upper()}{path} HTTP/1.1\r\n"        headers_text = ''        for key, value in request_headers.items():            headers_text += f"{key}{value}\r\n"        request_text = request_line + headers_text + '\r\n'        request_bytes = request_text.encode('utf-8')        if data:            request_bytes += data        return request_bytes    def _get_cookie_header(self, domain: str, path: str) -> str:        """获取cookie头部"""        cookies = []        # 检查该域名的cookie        for cookie_domain, domain_cookies in self.cookie_jar.items():            if domain.endswith(cookie_domain) or cookie_domain == domain:                for name, cookie_info in domain_cookies.items():                    # 检查路径匹配                    if 'path' in cookie_info:                        cookie_path = cookie_info['path']                        if not path.startswith(cookie_path):                            continue                    # 检查过期时间                    if 'expires' in cookie_info:                        expires = cookie_info['expires']                        if isinstance(expires, (intfloat)):                            if time.time() > expires:                                continue                    cookies.append(f"{name}={cookie_info['value']}")        return '; '.join(cookies) if cookies else ''    def _update_cookies(self, domain: str, response: HTTPResponse):        """更新cookie jar"""        set_cookie = response.headers.get('Set-Cookie')        if not set_cookie:            return        # 可能有多个Set-Cookie头部        if isinstance(set_cookie, list):            cookie_strings = set_cookie        else:            cookie_strings = [set_cookie]        for cookie_str in cookie_strings:            self._parse_and_store_cookie(domain, cookie_str)    def _parse_and_store_cookie(self, domain: str, cookie_str: str):        """解析并存储cookie"""        parts = [part.strip() for part in cookie_str.split(';')]        if not parts:            return        # 第一个部分是name=value        name_value = parts[0]        if '=' not in name_value:            return        name, value = name_value.split('='1)        # 解析属性        cookie_info = {'value': value}        for part in parts[1:]:            if '=' in part:                attr_name, attr_value = part.split('='1)                attr_name = attr_name.lower()                if attr_name == 'expires':                    # 解析过期时间                    try:                        from email.utils import parsedate                        expires_date = parsedate(attr_value)                        if expires_date:                            import time                            expires_time = time.mktime(expires_date)                            cookie_info['expires'] = expires_time                    except:                        pass                elif attr_name == 'max-age':                    try:                        max_age = int(attr_value)                        import time                        cookie_info['expires'] = time.time() + max_age                    except:                        pass                elif attr_name in ['domain''path''samesite']:                    cookie_info[attr_name] = attr_value                elif attr_name in ['secure''httponly']:                    cookie_info[attr_name] = True        # 确定存储的域名        cookie_domain = cookie_info.get('domain', domain)        if cookie_domain.startswith('.'):            cookie_domain = cookie_domain[1:]        # 存储cookie        if cookie_domain not in self.cookie_jar:            self.cookie_jar[cookie_domain] = {}        self.cookie_jar[cookie_domain][name] = cookie_info    def _send_request_with_redirects(self, scheme: str, hostname: str, port: int,                                   request_bytes: bytes, allow_redirects: bool,                                   redirect_count: int) -> HTTPResponse:        """发送请求,处理重定向"""        if redirect_count >= self.config.max_redirects:            raise RuntimeError(f"超过最大重定向次数: {self.config.max_redirects}")        # 获取或创建连接        ssl_context = (scheme == 'https')        connection_key = (hostname, port, ssl_context)        if connection_key not in self.connections:            connection = HTTPConnection(hostname, port, ssl_context)            if not connection.connect(self.config.timeout):                raise ConnectionError(f"无法连接到 {hostname}:{port}")            self.connections[connection_key] = connection        else:            connection = self.connections[connection_key]        # 发送请求        try:            response_bytes = connection.send_request(request_bytes)        except ConnectionError:            # 连接失败,重新连接            connection.close()            del self.connections[connection_key]            connection = HTTPConnection(hostname, port, ssl_context)            if not connection.connect(self.config.timeout):                raise            self.connections[connection_key] = connection            response_bytes = connection.send_request(request_bytes)        # 解析响应        response_url = f"{scheme}://{hostname}:{port}"        response = HTTPResponse(response_bytes, response_url)        # 更新cookie        if self.config.enable_cookies:            self._update_cookies(hostname, response)        # 处理重定向        if allow_redirects and response.is_redirect():            location = response.get_redirect_location()            if location:                # 绝对URL或相对URL                if location.startswith(('http://''https://')):                    redirect_url = location                else:                    # 构建绝对URL                    redirect_url = urllib.parse.urljoin(response_url, location)                # 递归处理重定向                return self.request(                    'GET' if response.status_code in [301302303else method,                    redirect_url,                    allow_redirects=True,                    headers={'Referer': response_url}                )        return response    def get(self, url: str, **kwargs) -> HTTPResponse:        """发送GET请求"""        return self.request('GET', url, **kwargs)    def post(self, url: str, data: bytes = None, **kwargs) -> HTTPResponse:        """发送POST请求"""        return self.request('POST', url, data=data, **kwargs)    def put(self, url: str, data: bytes = None, **kwargs) -> HTTPResponse:        """发送PUT请求"""        return self.request('PUT', url, data=data, **kwargs)    def delete(self, url: str, **kwargs) -> HTTPResponse:        """发送DELETE请求"""        return self.request('DELETE', url, **kwargs)    def head(self, url: str, **kwargs) -> HTTPResponse:        """发送HEAD请求"""        return self.request('HEAD', url, **kwargs)    def options(self, url: str, **kwargs) -> HTTPResponse:        """发送OPTIONS请求"""        return self.request('OPTIONS', url, **kwargs)    def set_session_header(self, key: str, value: str):        """设置会话头部"""        self.session_headers[key] = value    def clear_session_headers(self):        """清除会话头部"""        self.session_headers.clear()    def clear_cookies(self):        """清除所有cookie"""        self.cookie_jar.clear()    def close(self):        """关闭所有连接"""        for connection in self.connections.values():            connection.close()        self.connections.clear()# 高级功能:连接池class HTTPConnectionPool:    """HTTP连接池"""    def __init__(self, max_size: int = 10, idle_timeout: float = 30.0):        self.max_size = max_size        self.idle_timeout = idle_timeout        self.pool = {}  # (host, port, ssl) -> [connections]        self.lock = threading.RLock()    def get_connection(self, host: str, port: int, ssl: bool) -> Optional[HTTPConnection]:        """从池中获取连接"""        key = (host, port, ssl)        with self.lock:            if key in self.pool and self.pool[key]:                # 清理空闲连接                self._clean_idle_connections(key)                if self.pool[key]:                    connection = self.pool[key].pop(0)                    # 检查连接是否仍然有效                    if connection.is_connected:                        return connection                    else:                        # 连接已失效,关闭并创建新的                        connection.close()            return None    def return_connection(self, connection: HTTPConnection):        """归还连接到池中"""        if not connection.is_connected:            connection.close()            return        key = (connection.host, connection.port, connection.ssl_context)        with self.lock:            if key not in self.pool:                self.pool[key] = []            # 检查池是否已满            if len(self.pool[key]) < self.max_size:                self.pool[key].append(connection)            else:                connection.close()    def _clean_idle_connections(self, key):        """清理空闲连接"""        current_time = datetime.now()        if key in self.pool:            valid_connections = []            for connection in self.pool[key]:                idle_time = (current_time - connection.last_used).total_seconds()                if idle_time < self.idle_timeout and connection.is_connected:                    valid_connections.append(connection)                else:                    connection.close()            self.pool[key] = valid_connections    def close_all(self):        """关闭所有连接"""        with self.lock:            for connections in self.pool.values():                for connection in connections:                    connection.close()            self.pool.clear()# 使用示例def test_http_client():    """测试HTTP客户端"""    import time    # 创建客户端    config = HTTPClientConfig(        timeout=10.0,        max_redirects=3,        user_agent="MyHTTPClient/1.0",        enable_cookies=True,        enable_compression=True    )    client = HTTPClient(config)    try:        # 测试GET请求        print("=== 测试GET请求 ===")        response = client.get('https://httpbin.org/get')        print(f"状态码: {response.status_code}")        print(f"响应大小: {len(response.body)} 字节")        print(f"头部: {list(response.headers.keys())[:5]}...")        # 测试POST请求        print("\n=== 测试POST请求 ===")        post_data = {'key1''value1''key2''value2'}        response = client.post('https://httpbin.org/post', json_data=post_data)        if response.status_code == 200:            data = response.json()            print(f"POST成功,返回数据键: {list(data.keys())}")        # 测试带参数的GET请求        print("\n=== 测试带参数的GET请求 ===")        params = {'param1''value1''param2''value2'}        response = client.get('https://httpbin.org/get', params=params)        # 测试cookie        print("\n=== 测试cookie ===")        response = client.get('https://httpbin.org/cookies/set/test/123')        print(f"设置cookie响应: {response.status_code}")        # 测试重定向        print("\n=== 测试重定向 ===")        response = client.get('https://httpbin.org/redirect/2', allow_redirects=True)        print(f"重定向最终状态码: {response.status_code}")        # 测试会话头部        print("\n=== 测试会话头部 ===")        client.set_session_header('X-Custom-Header''my-value')        response = client.get('https://httpbin.org/headers')        if response.status_code == 200:            data = response.json()            print(f"自定义头部已发送: {data.get('headers', {}).get('X-Custom-Header')}")        # 性能测试:连续请求        print("\n=== 性能测试:连续请求 ===")        start_time = time.time()        for i in range(5):            response = client.get(f'https://httpbin.org/delay/{i % 3}')            print(f"请求 {i+1}{response.status_code} ({len(response.body)} bytes)")        elapsed = time.time() - start_time        print(f"总时间: {elapsed:.2f}秒")    except Exception as e:        print(f"测试失败: {e}")    finally:        client.close()    return client

4. HTTP/2实验性支持

"""HTTP/2实验性支持"""import sslimport socketimport structfrom typing import OptionalDictclass HTTP2Connection:    """HTTP/2连接(实验性)"""    # HTTP/2连接前言    CONNECTION_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"    # 帧类型    FRAME_TYPES = {        0x0"DATA",        0x1"HEADERS",        0x2"PRIORITY",        0x3"RST_STREAM",        0x4"SETTINGS",        0x5"PUSH_PROMISE",        0x6"PING",        0x7"GOAWAY",        0x8"WINDOW_UPDATE",        0x9"CONTINUATION"    }    def __init__(self, host: str, port: int = 443):        self.host = host        self.port = port        self.socket = None        self.settings = {}        self.next_stream_id = 1        self.is_connected = False    def connect(self) -> bool:        """建立HTTP/2连接"""        try:            # 创建TCP连接            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)            sock.connect((self.host, self.port))            # 创建SSL上下文,启用ALPN            context = ssl.create_default_context()            context.set_alpn_protocols(['h2''http/1.1'])            # 包装socket            ssl_sock = context.wrap_socket(sock, server_hostname=self.host)            # 检查协商的协议            selected_protocol = ssl_sock.selected_alpn_protocol()            if selected_protocol != 'h2':                print(f"服务器不支持HTTP/2,使用: {selected_protocol}")                ssl_sock.close()                return False            self.socket = ssl_sock            self.is_connected = True            # 发送连接前言            self.socket.sendall(self.CONNECTION_PREFACE)            # 发送SETTINGS帧            self.send_settings()            # 接收服务器SETTINGS            self.receive_settings()            return True        except Exception as e:            print(f"HTTP/2连接失败: {e}")            return False    def send_settings(self, settings: Dict = None):        """发送SETTINGS帧"""        if settings is None:            settings = {                0x14096,    # SETTINGS_HEADER_TABLE_SIZE                0x21,       # SETTINGS_ENABLE_PUSH                0x3100,     # SETTINGS_MAX_CONCURRENT_STREAMS                0x465535,   # SETTINGS_INITIAL_WINDOW_SIZE                0x516384,   # SETTINGS_MAX_FRAME_SIZE                0x6256,     # SETTINGS_MAX_HEADER_LIST_SIZE            }        # 构建SETTINGS帧        frame_data = b''        for identifier, value in settings.items():            frame_data += struct.pack('!HI', identifier, value)        frame = self.build_frame(0x40x00, frame_data)        self.socket.sendall(frame)    def receive_settings(self):        """接收SETTINGS帧"""        try:            # 读取帧头            header = self.socket.recv(9)            if len(header) < 9:                return            # 解析帧头            length = struct.unpack('!I'b'\x00' + header[:3])[0]            frame_type = header[3]            flags = header[4]            stream_id = struct.unpack('!I'b'\x00' + header[5:])[0]            # 读取帧体            body = self.socket.recv(length)            if frame_type == 0x4:  # SETTINGS帧                self.parse_settings(body)            # 发送SETTINGS ACK            if flags & 0x1 == 0:  # 如果没有ACK标志                ack_frame = self.build_frame(0x40x10b'')                self.socket.sendall(ack_frame)        except Exception as e:            print(f"接收SETTINGS失败: {e}")    def parse_settings(self, data: bytes):        """解析SETTINGS帧数据"""        for i in range(0len(data), 6):            if i + 6 <= len(data):                identifier, value = struct.unpack('!HI', data[i:i+6])                self.settings[identifier] = value    def build_frame(self, frame_type: int, flags: int                   stream_id: int, data: bytes) -> bytes:        """构建HTTP/2帧"""        length = len(data)        frame_header = struct.pack('!I', length)[1:]  # 3字节长度        frame_header += bytes([frame_type, flags])        frame_header += struct.pack('!I', stream_id)[1:]  # 31位流ID        return frame_header + data    def send_request(self, method: str, path: str                    headers: Dict = None, body: bytes = None) -> int:        """发送HTTP/2请求,返回流ID"""        if not self.is_connected:            raise ConnectionError("未连接")        stream_id = self.next_stream_id        self.next_stream_id += 2  # 客户端使用奇数流ID        # 构建HEADERS帧        header_data = self.build_headers_frame_data(method, path, headers)        headers_frame = self.build_frame(0x10x5, stream_id, header_data)  # END_STREAM | END_HEADERS        self.socket.sendall(headers_frame)        # 如果有请求体,发送DATA帧        if body:            data_frame = self.build_frame(0x00x1, stream_id, body)  # END_STREAM            self.socket.sendall(data_frame)        return stream_id    def build_headers_frame_data(self, method: str, path: str                                headers: Dict) -> bytes:        """构建HEADERS帧数据(简化版,实际需要HPACK压缩)"""        # 注意:实际实现需要使用HPACK压缩头部        header_lines = [            (':method', method),            (':path', path),            (':scheme''https'),            (':authority'self.host),        ]        if headers:            for key, value in headers.items():                if not key.startswith(':'):  # 伪头部字段以:开头                    header_lines.append((key.lower(), value))        # 简单编码(实际需要HPACK)        encoded = b''        for key, value in header_lines:            encoded += f"{key}{value}\n".encode('utf-8')        return encoded    def receive_response(self, stream_id: int) -> Optional[Dict]:        """接收响应(简化实现)"""        try:            response = {                'headers': {},                'body'b'',                'status'0            }            while True:                # 读取帧头                header = self.socket.recv(9)                if len(header) < 9:                    break                length = struct.unpack('!I'b'\x00' + header[:3])[0]                frame_type = header[3]                flags = header[4]                recv_stream_id = struct.unpack('!I'b'\x00' + header[5:])[0]                # 只处理目标流的帧                if recv_stream_id != stream_id:                    # 跳过帧体                    if length > 0:                        self.socket.recv(length)                    continue                # 读取帧体                body = self.socket.recv(length)                if frame_type == 0x1:  # HEADERS帧                    self.parse_response_headers(body, response)                elif frame_type == 0x0:  # DATA帧                    response['body'] += body                    if flags & 0x1:  # END_STREAM                        break                # 其他帧类型...            return response        except Exception as e:            print(f"接收响应失败: {e}")            return None    def parse_response_headers(self, data: bytes, response: Dict):        """解析响应头部(简化版)"""        # 简单解析(实际需要HPACK解码)        headers_text = data.decode('utf-8', errors='ignore')        lines = headers_text.split('\n')        for line in lines:            if ': ' in line:                key, value = line.split(': '1)                if key == ':status':                    response['status'] = int(value)                else:                    response['headers'][key] = value    def close(self):        """关闭连接"""        if self.socket:            try:                # 发送GOAWAY帧                goaway_frame = self.build_frame(0x70x00, struct.pack('!II'00))                self.socket.sendall(goaway_frame)                self.socket.close()            except:                pass        self.is_connected = False# HTTP/2客户端包装class HTTP2Client:    """HTTP/2客户端(实验性)"""    def __init__(self):        self.connections = {}    def request(self, method: str, url: str, **kwargs) -> Optional[Dict]:        """发送HTTP/2请求"""        from urllib.parse import urlparse        parsed = urlparse(url)        if parsed.scheme != 'https':            print("HTTP/2只支持HTTPS")            return None        host = parsed.hostname        port = parsed.port or 443        path = parsed.path or '/'        # 获取或创建连接        key = (host, port)        if key not in self.connections:            connection = HTTP2Connection(host, port)            if not connection.connect():                return None            self.connections[key] = connection        connection = self.connections[key]        # 发送请求        headers = kwargs.get('headers', {})        body = kwargs.get('body')        if body and isinstance(body, str):            body = body.encode('utf-8')        stream_id = connection.send_request(method, path, headers, body)        # 接收响应        response = connection.receive_response(stream_id)        return response    def close(self):        """关闭所有连接"""        for connection in self.connections.values():            connection.close()        self.connections.clear()def test_http2():    """测试HTTP/2(需要支持HTTP/2的服务器)"""    client = HTTP2Client()    try:        # 注意:需要真正支持HTTP/2的服务器        # 例如:https://http2.golang.org/        print("测试HTTP/2连接(实验性)")        # 这个URL可能需要替换为实际支持HTTP/2的测试服务器        test_url = "https://httpbin.org/"  # 注意:httpbin.org可能不支持HTTP/2        response = client.request('GET', test_url)        if response:            print(f"状态码: {response.get('status')}")            print(f"头部: {list(response.get('headers', {}).keys())[:5]}")            print(f"响应体大小: {len(response.get('body'b''))} 字节")        else:            print("HTTP/2请求失败,可能服务器不支持")    except Exception as e:        print(f"HTTP/2测试失败: {e}")    finally:        client.close()

5. HTTP协议测试工具

"""HTTP协议测试工具"""import socketimport sslimport jsonfrom typing import DictListOptionalclass HTTPTester:    """HTTP协议测试工具"""    def __init__(self):        self.results = []    def test_http_compliance(self, url: str) -> Dict:        """测试HTTP协议合规性"""        from urllib.parse import urlparse        parsed = urlparse(url)        hostname = parsed.hostname        port = parsed.port or (443 if parsed.scheme == 'https' else 80)        tests = []        # 测试1: 基本连接        tests.append(self._test_connection(hostname, port, parsed.scheme == 'https'))        # 测试2: HTTP方法支持        tests.append(self._test_methods(hostname, port, parsed.scheme == 'https', parsed.path))        # 测试3: 头部支持        tests.append(self._test_headers(hostname, port, parsed.scheme == 'https', parsed.path))        # 测试4: 状态码处理        tests.append(self._test_status_codes(hostname, port, parsed.scheme == 'https', parsed.path))        # 测试5: 重定向处理        tests.append(self._test_redirects(hostname, port, parsed.scheme == 'https'))        # 汇总结果        total_tests = sum(len(t['tests']) for t in tests)        passed_tests = sum(sum(1 for st in t['tests'if st['passed']) for t in tests)        result = {            'url': url,            'hostname': hostname,            'port': port,            'scheme': parsed.scheme,            'total_tests': total_tests,            'passed_tests': passed_tests,            'pass_rate': (passed_tests / total_tests * 100if total_tests > 0 else 0,            'test_categories': tests,            'recommendations'self._generate_recommendations(tests)        }        self.results.append(result)        return result    def _test_connection(self, hostname: str, port: int, use_ssl: bool) -> Dict:        """测试连接"""        tests = []        # 测试TCP连接        try:            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)            sock.settimeout(5)            sock.connect((hostname, port))            sock.close()            tests.append({'name''TCP连接''passed'True'details''TCP连接成功'})        except Exception as e:            tests.append({'name''TCP连接''passed'False'details'f'TCP连接失败: {e}'})        # 测试SSL/TLS(如果使用HTTPS)        if use_ssl:            try:                context = ssl.create_default_context()                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)                sock.settimeout(5)                ssl_sock = context.wrap_socket(sock, server_hostname=hostname)                ssl_sock.connect((hostname, port))                # 检查证书                cert = ssl_sock.getpeercert()                if cert:                    tests.append({'name''SSL证书''passed'True'details''SSL证书有效'})                else:                    tests.append({'name''SSL证书''passed'False'details''无SSL证书'})                # 检查TLS版本                version = ssl_sock.version()                if version in ['TLSv1.2''TLSv1.3']:                    tests.append({'name''TLS版本''passed'True'details'f'TLS版本: {version}'})                else:                    tests.append({'name''TLS版本''passed'False'details'f'不安全的TLS版本: {version}'})                ssl_sock.close()            except Exception as e:                tests.append({'name''SSL/TLS连接''passed'False'details'f'SSL/TLS连接失败: {e}'})        return {'category''连接测试''tests': tests}    def _test_methods(self, hostname: str, port: int, use_ssl: bool, path: str) -> Dict:        """测试HTTP方法支持"""        methods = ['GET''POST''PUT''DELETE''HEAD''OPTIONS']        tests = []        for method in methods:            try:                response = self._send_raw_request(hostname, port, use_ssl, method, path)                if response:                    status_line = response.split('\r\n')[0]                    status_code = int(status_line.split(' ')[1])                    if status_code != 405:  # 405 Method Not Allowed                        tests.append({                            'name'f'{method}方法',                            'passed'True,                            'details'f'状态码: {status_code}'                        })                    else:                        tests.append({                            'name'f'{method}方法',                            'passed'False,                            'details''方法不允许 (405)'                        })                else:                    tests.append({                        'name'f'{method}方法',                        'passed'False,                        'details''无响应'                    })            except Exception as e:                tests.append({                    'name'f'{method}方法',                    'passed'False,                    'details'f'测试失败: {e}'                })        return {'category''HTTP方法测试''tests': tests}    def _test_headers(self, hostname: str, port: int, use_ssl: bool, path: str) -> Dict:        """测试HTTP头部支持"""        tests = []        # 测试Host头部(HTTP/1.1必需)        try:            response = self._send_raw_request(                hostname, port, use_ssl, 'GET', path,                headers={'Host'f'{hostname}:{port}'}            )            if response:                tests.append({'name''Host头部''passed'True'details''支持Host头部'})            else:                tests.append({'name''Host头部''passed'False'details''不支持Host头部'})        except Exception as e:            tests.append({'name''Host头部''passed'False'details'f'测试失败: {e}'})        # 测试User-Agent头部        try:            custom_ua = 'HTTPTester/1.0'            response = self._send_raw_request(                hostname, port, use_ssl, 'GET', path,                headers={'User-Agent': custom_ua}            )            if response:                tests.append({'name''User-Agent头部''passed'True'details''支持自定义User-Agent'})            else:                tests.append({'name''User-Agent头部''passed'False'details''不支持自定义User-Agent'})        except Exception as e:            tests.append({'name''User-Agent头部''passed'False'details'f'测试失败: {e}'})        # 测试Accept-Encoding        try:            response = self._send_raw_request(                hostname, port, use_ssl, 'GET', path,                headers={'Accept-Encoding''gzip, deflate'}            )            if response:                # 检查响应头中是否有Content-Encoding                headers = self._parse_response_headers(response)                if 'Content-Encoding' in headers:                    encoding = headers['Content-Encoding']                    tests.append({'name''内容编码''passed'True'details'f'支持压缩: {encoding}'})                else:                    tests.append({'name''内容编码''passed'False'details''不支持压缩'})            else:                tests.append({'name''内容编码''passed'False'details''无响应'})        except Exception as e:            tests.append({'name''内容编码''passed'False'details'f'测试失败: {e}'})        return {'category''HTTP头部测试''tests': tests}    def _test_status_codes(self, hostname: str, port: int, use_ssl: bool, path: str) -> Dict:        """测试状态码处理"""        tests = []        # 测试正常请求        try:            response = self._send_raw_request(hostname, port, use_ssl, 'GET', path)            if response:                status_line = response.split('\r\n')[0]                status_code = int(status_line.split(' ')[1])                if 200 <= status_code < 300:                    tests.append({'name''2xx状态码''passed'True'details'f'成功: {status_code}'})                else:                    tests.append({'name''2xx状态码''passed'False'details'f'非成功状态码: {status_code}'})            else:                tests.append({'name''2xx状态码''passed'False'details''无响应'})        except Exception as e:            tests.append({'name''2xx状态码''passed'False'details'f'测试失败: {e}'})        # 测试404处理        try:            nonexistent_path = f'{path}/nonexistent-{int(time.time())}'            response = self._send_raw_request(hostname, port, use_ssl, 'GET', nonexistent_path)            if response:                status_line = response.split('\r\n')[0]                status_code = int(status_line.split(' ')[1])                if status_code == 404:                    tests.append({'name''404处理''passed'True'details''正确处理404'})                else:                    tests.append({'name''404处理''passed'False'details'f'错误状态码: {status_code}'})            else:                tests.append({'name''404处理''passed'False'details''无响应'})        except Exception as e:            tests.append({'name''404处理''passed'False'details'f'测试失败: {e}'})        return {'category''状态码测试''tests': tests}    def _test_redirects(self, hostname: str, port: int, use_ssl: bool) -> Dict:        """测试重定向处理"""        tests = []        # 注意:这个测试需要知道服务器上的重定向端点        # 这里只是一个示例框架        return {'category''重定向测试''tests': tests}    def _send_raw_request(self, hostname: str, port: int, use_ssl: bool,                         method: str, path: str, headers: Dict = None) -> Optional[str]:        """发送原始HTTP请求"""        try:            # 创建socket            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)            sock.settimeout(5)            if use_ssl:                context = ssl.create_default_context()                sock = context.wrap_socket(sock, server_hostname=hostname)            sock.connect((hostname, port))            # 构建请求            request_lines = [f'{method}{path} HTTP/1.1']            # 默认头部            default_headers = {                'Host'f'{hostname}:{port}',                'User-Agent''HTTPTester/1.0',                'Connection''close'            }            if headers:                default_headers.update(headers)            for key, value in default_headers.items():                request_lines.append(f'{key}{value}')            request_text = '\r\n'.join(request_lines) + '\r\n\r\n'            # 发送请求            sock.sendall(request_text.encode('utf-8'))            # 接收响应            response = b''            while True:                try:                    chunk = sock.recv(4096)                    if not chunk:                        break                    response += chunk                except socket.timeout:                    break            sock.close()            return response.decode('utf-8', errors='ignore')        except Exception:            return None    def _parse_response_headers(self, response_text: str) -> Dict:        """解析响应头部"""        headers = {}        lines = response_text.split('\r\n')        if not lines:            return headers        # 跳过状态行        for line in lines[1:]:            if not line.strip():                break            if ': ' in line:                key, value = line.split(': '1)                headers[key] = value        return headers    def _generate_recommendations(self, test_categories: List[Dict]) -> List[str]:        """生成改进建议"""        recommendations = []        for category in test_categories:            failed_tests = [t for t in category['tests'if not t['passed']]            if failed_tests:                category_name = category['category']                recommendations.append(f"{category_name}: 有{len(failed_tests)}个测试失败")        # 通用建议        recommendations.extend([            "确保支持HTTP/1.1必需的Host头部",            "实现正确的状态码处理",            "支持常见的HTTP方法(GET、POST等)",            "启用内容压缩(gzip/deflate)",            "使用安全的TLS版本(TLSv1.2或更高)",            "提供有意义的错误页面",            "实现适当的缓存控制"        ])        return recommendations    def generate_report(self, output_file: str = None):        """生成测试报告"""        report = {            'timestamp': time.time(),            'tests_run'len(self.results),            'results'self.results,            'summary'self._generate_summary()        }        report_json = json.dumps(report, indent=2)        if output_file:            with open(output_file, 'w'as f:                f.write(report_json)        return report_json    def _generate_summary(self) -> Dict:        """生成测试摘要"""        if not self.results:            return {}        total_tests = sum(r['total_tests'for r in self.results)        passed_tests = sum(r['passed_tests'for r in self.results)        return {            'total_tests': total_tests,            'passed_tests': passed_tests,            'overall_pass_rate': (passed_tests / total_tests * 100if total_tests > 0 else 0,            'tested_urls': [r['url'for r in self.results]        }

总结

通过今天的学习,我们深入掌握了:

  • HTTP协议核心:理解了HTTP/1.1协议规范、方法、状态码和头部

  • 完整服务器实现:构建了支持HTTP/1.1、持久连接、路由、中间件的完整服务器

  • 高级客户端:实现了支持连接池、cookie管理、重定向的HTTP客户端

  • 协议扩展:探索了HTTP/2协议的基本原理和实现

  • 性能优化:学习了服务器性能调优和基准测试技术

关键收获:

  • HTTP协议是Web通信的基础,理解其细节对网络编程至关重要

  • 持久连接、管道化、分块传输是HTTP/1.1的重要优化

  • 中间件架构提供了灵活的功能扩展方式

  • 连接池和异步I/O是提高性能的关键技术

实践建议:

  • 使用今天的代码作为基础,构建自己的Web框架

  • 尝试实现更多HTTP功能,如WebSocket、Server-Sent Events

  • 学习现有框架(如Flask、Django)的HTTP处理实现

  • 深入研究HTTP/2和HTTP/3协议,实现完整支持

明天我们将学习WebSocket基础,这是实现实时双向通信的关键技术。

最新文章

随机文章

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-02-08 17:59:25 HTTP/2.0 GET : https://f.mffb.com.cn/a/464457.html
  2. 运行时间 : 0.222660s [ 吞吐率:4.49req/s ] 内存消耗:5,374.68kb 文件加载:140
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=b098340bccae1827f404b5eac59eb18d
  1. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_static.php ( 4.90 KB )
  7. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  10. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  11. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  12. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  13. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  14. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  15. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  16. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  17. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  18. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  19. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  21. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  22. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/provider.php ( 0.19 KB )
  23. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  24. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  25. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  26. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/common.php ( 0.03 KB )
  27. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  28. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  29. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/app.php ( 0.95 KB )
  30. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cache.php ( 0.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/console.php ( 0.23 KB )
  32. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cookie.php ( 0.56 KB )
  33. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/database.php ( 2.48 KB )
  34. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  35. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/filesystem.php ( 0.61 KB )
  36. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/lang.php ( 0.91 KB )
  37. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/log.php ( 1.35 KB )
  38. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/middleware.php ( 0.19 KB )
  39. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/route.php ( 1.89 KB )
  40. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/session.php ( 0.57 KB )
  41. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/trace.php ( 0.34 KB )
  42. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/view.php ( 0.82 KB )
  43. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/event.php ( 0.25 KB )
  44. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  45. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/service.php ( 0.13 KB )
  46. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/AppService.php ( 0.26 KB )
  47. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  48. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  49. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  50. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  51. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  52. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/services.php ( 0.14 KB )
  53. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  54. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  55. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  56. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  57. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  58. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  59. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  60. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  61. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  62. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  63. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  64. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  65. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  66. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  67. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  68. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  69. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  70. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  71. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  72. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  73. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  74. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  75. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  76. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  77. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  78. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  79. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  80. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  81. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  82. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  83. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/Request.php ( 0.09 KB )
  84. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  85. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/middleware.php ( 0.25 KB )
  86. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  87. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  88. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  89. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  90. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  91. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  92. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  93. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  94. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  95. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  96. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  97. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  98. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  99. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/route/app.php ( 1.72 KB )
  100. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  101. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  102. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  103. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/controller/Index.php ( 4.81 KB )
  104. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/BaseController.php ( 2.05 KB )
  105. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  106. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  108. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  109. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  110. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  111. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  112. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  113. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  114. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  115. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  116. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  117. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  118. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  119. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  120. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  121. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  122. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  123. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  124. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  125. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  126. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  127. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  128. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  129. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  130. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  131. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  132. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  133. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  134. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  135. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  136. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  137. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  138. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  139. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/runtime/temp/067d451b9a0c665040f3f1bdd3293d68.php ( 11.98 KB )
  140. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000580s ] mysql:host=127.0.0.1;port=3306;dbname=f_mffb;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.000712s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.008072s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.003007s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.000568s ]
  6. SELECT * FROM `set` [ RunTime:0.007259s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.000585s ]
  8. SELECT * FROM `article` WHERE `id` = 464457 LIMIT 1 [ RunTime:0.039064s ]
  9. UPDATE `article` SET `lasttime` = 1770544765 WHERE `id` = 464457 [ RunTime:0.003658s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 66 LIMIT 1 [ RunTime:0.009665s ]
  11. SELECT * FROM `article` WHERE `id` < 464457 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.002595s ]
  12. SELECT * FROM `article` WHERE `id` > 464457 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.001783s ]
  13. SELECT * FROM `article` WHERE `id` < 464457 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.008725s ]
  14. SELECT * FROM `article` WHERE `id` < 464457 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.023243s ]
  15. SELECT * FROM `article` WHERE `id` < 464457 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.012544s ]
0.224218s