今天我们将深入HTTP协议的Python实现,这是构建Web应用、API服务和网络爬虫的基础,也是理解现代Web通信的核心。
1. HTTP协议基础
1.1 HTTP协议全景
"""Core HTTP concepts and the evolution of the protocol versions."""
import http.client
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum


class HTTPVersion(Enum):
    """HTTP protocol versions."""
    HTTP_0_9 = "HTTP/0.9"  # 1991: GET only
    HTTP_1_0 = "HTTP/1.0"  # 1996: more methods, headers, status codes
    HTTP_1_1 = "HTTP/1.1"  # 1997: persistent connections, pipelining, chunked transfer
    HTTP_2_0 = "HTTP/2.0"  # 2015: binary framing, multiplexing, header compression
    HTTP_3_0 = "HTTP/3.0"  # 2022: QUIC based, 0-RTT connection setup


class HTTPMethod(Enum):
    """HTTP request methods."""
    GET = "GET"          # retrieve a resource
    POST = "POST"        # create a resource
    PUT = "PUT"          # replace a resource
    DELETE = "DELETE"    # delete a resource
    HEAD = "HEAD"        # retrieve headers only
    OPTIONS = "OPTIONS"  # list supported methods
    PATCH = "PATCH"      # partial update
    TRACE = "TRACE"      # diagnostics
    CONNECT = "CONNECT"  # tunnel


class HTTPStatusCategory(Enum):
    """Status-code classes as ``(first_code, last_code, label)`` tuples."""
    INFORMATIONAL = (100, 199, "信息响应")
    SUCCESS = (200, 299, "成功响应")
    REDIRECTION = (300, 399, "重定向")
    CLIENT_ERROR = (400, 499, "客户端错误")
    SERVER_ERROR = (500, 599, "服务器错误")


@dataclass
class HTTPStatus:
    """A single HTTP status code with its reason phrase and a description."""
    code: int
    phrase: str
    description: str

    @property
    def category(self) -> Optional[HTTPStatusCategory]:
        """Return the class this code belongs to, or ``None`` if outside 100-599."""
        for category in HTTPStatusCategory:
            start, end, _ = category.value
            if start <= self.code <= end:
                return category
        return None


class HTTPProtocolAnalyzer:
    """Helpers for inspecting HTTP versions, messages and headers."""

    # Commonly used status codes.
    STATUS_CODES = {
        100: HTTPStatus(100, "Continue", "继续"),
        101: HTTPStatus(101, "Switching Protocols", "切换协议"),
        200: HTTPStatus(200, "OK", "成功"),
        201: HTTPStatus(201, "Created", "已创建"),
        204: HTTPStatus(204, "No Content", "无内容"),
        301: HTTPStatus(301, "Moved Permanently", "永久重定向"),
        302: HTTPStatus(302, "Found", "临时重定向"),
        304: HTTPStatus(304, "Not Modified", "未修改"),
        400: HTTPStatus(400, "Bad Request", "错误请求"),
        401: HTTPStatus(401, "Unauthorized", "未授权"),
        403: HTTPStatus(403, "Forbidden", "禁止访问"),
        404: HTTPStatus(404, "Not Found", "未找到"),
        405: HTTPStatus(405, "Method Not Allowed", "方法不允许"),
        500: HTTPStatus(500, "Internal Server Error", "内部服务器错误"),
        502: HTTPStatus(502, "Bad Gateway", "错误网关"),
        503: HTTPStatus(503, "Service Unavailable", "服务不可用"),
    }

    @staticmethod
    def compare_http_versions() -> List[Dict]:
        """Return one dict per HTTP version with its year, features and limitations."""
        return [
            {
                "version": "HTTP/1.0",
                "year": 1996,
                "key_features": [
                    "每个请求/响应独立的TCP连接",
                    "添加头部字段",
                    "状态码",
                    "支持多种方法",
                ],
                "limitations": [
                    "连接开销大",
                    "队头阻塞",
                    "无主机头支持",
                ],
            },
            {
                "version": "HTTP/1.1",
                "year": 1997,
                "key_features": [
                    "持久连接",
                    "管道化",
                    "分块传输编码",
                    "主机头支持",
                    "缓存控制",
                ],
                "limitations": [
                    "队头阻塞未完全解决",
                    "头部重复传输",
                ],
            },
            {
                "version": "HTTP/2",
                "year": 2015,
                "key_features": [
                    "二进制协议",
                    "多路复用",
                    "头部压缩(HPACK)",
                    "服务器推送",
                    "流优先级",
                ],
                "limitations": [
                    "仍基于TCP",
                    "TCP队头阻塞",
                ],
            },
            {
                "version": "HTTP/3",
                "year": 2022,
                "key_features": [
                    "基于QUIC/UDP",
                    "0-RTT连接建立",
                    "改进的多路复用",
                    "更好的丢包处理",
                    "连接迁移",
                ],
                "limitations": [
                    "部署复杂度",
                    "防火墙兼容性",
                ],
            },
        ]

    @staticmethod
    def parse_http_message(raw_message: str) -> Tuple[str, Dict[str, str], str]:
        """Split a raw HTTP message into ``(start_line, headers, body)``.

        The header section ends at the first blank line (CRLF CRLF). A message
        without a blank line has an empty body; it is never echoed back as
        its own body.

        Args:
            raw_message: The message text, using CRLF line endings.

        Returns:
            The start line, a dict of header name -> value, and the body text.
        """
        head, _, body = raw_message.strip().partition('\r\n\r\n')
        lines = head.split('\r\n')
        start_line = lines[0]

        headers: Dict[str, str] = {}
        for line in lines[1:]:
            # Whitespace after the colon is optional, so split on ':' alone.
            name, separator, value = line.partition(':')
            if separator and name.strip():
                headers[name.strip()] = value.strip()

        return start_line, headers, body

    @staticmethod
    def validate_headers(headers: Dict) -> List[str]:
        """Return a list of warnings about missing or malformed headers.

        Header names are compared case-insensitively, so ``host`` satisfies
        the ``Host`` requirement.
        """
        warnings = []

        # HTTP/1.1 requires a Host header.
        required_headers = ['Host']
        present = {str(name).lower() for name in headers}
        for header in required_headers:
            if header.lower() not in present:
                warnings.append(f"缺少必需头部: {header}")

        # CR/LF inside a name or value would allow header injection.
        for key, value in headers.items():
            if '\r' in key or '\n' in key:
                warnings.append(f"无效头部键: {key}")
            if '\r' in value or '\n' in value:
                warnings.append(f"头部值包含换行: {key}")

        return warnings
2. 完整HTTP服务器实现
2.1 支持HTTP/1.1的完整服务器
"""A complete HTTP/1.1 server: request parsing, responses, routing, select loop."""
import socket
import threading
import mimetypes
import os
import re
import select
import time
import json
import zlib
from concurrent.futures import Future, ThreadPoolExecutor
from html import escape as html_escape
from http import HTTPStatus as _StdHTTPStatus
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional, Dict, List, Tuple, Callable
from urllib.parse import urlparse, parse_qs, quote, unquote
from email.utils import formatdate, parsedate
import hashlib
import base64


class HTTPRequest:
    """A parsed HTTP request.

    Attributes are filled in by ``_parse``: ``method``, ``path`` (URL-decoded),
    ``query_params`` (``parse_qs`` output), ``version``, ``headers``, ``body``
    (raw bytes, de-chunked when needed) and ``cookies``.

    Raises:
        ValueError: If the raw bytes cannot be parsed as an HTTP request.
    """

    def __init__(self, raw_request: bytes, client_address: tuple):
        self.raw = raw_request
        self.client_ip, self.client_port = client_address

        self.method = ""
        self.path = ""
        self.query_params = {}
        self.version = "HTTP/1.0"
        self.headers = {}
        self.body = b""
        self.cookies = {}

        self._parse()

    def _parse(self):
        """Parse the request line, headers, cookies and body from ``self.raw``."""
        try:
            # Split on the raw bytes first so that a binary body is never
            # pushed through a lossy text decode.
            head, _, body = self.raw.partition(b'\r\n\r\n')
            lines = head.decode('utf-8', errors='ignore').split('\r\n')

            request_line = lines[0]
            parts = request_line.split(' ', 2)
            if len(parts) != 3:
                raise ValueError(f"无效的请求行: {request_line}")
            self.method, full_path, self.version = parts

            parsed_url = urlparse(full_path)
            self.path = unquote(parsed_url.path) if parsed_url.path else '/'
            self.query_params = parse_qs(parsed_url.query)

            for line in lines[1:]:
                # Whitespace after the colon is optional.
                name, separator, value = line.partition(':')
                if separator and name.strip():
                    self.headers[name.strip()] = value.strip()

            # Header names are case-insensitive, so use get_header().
            cookie_header = self.get_header('Cookie')
            if cookie_header:
                self._parse_cookies(cookie_header)

            self.body = body
            if self.get_header('Transfer-Encoding', '').lower() == 'chunked':
                self._decode_chunked_body()

        except Exception as e:
            raise ValueError(f"解析HTTP请求失败: {e}") from e

    def _parse_cookies(self, cookie_header: str):
        """Fill ``self.cookies`` from a ``Cookie`` header value."""
        for cookie_pair in cookie_header.split(';'):
            key, separator, value = cookie_pair.strip().partition('=')
            if separator:
                self.cookies[key.strip()] = value.strip()

    def _decode_chunked_body(self):
        """Replace ``self.body`` with the concatenation of its chunks.

        Works on bytes because chunk sizes count octets, not characters.
        Decoding stops at the zero-size chunk or at the first incomplete chunk.
        """
        try:
            data = self.body
            chunks = []
            pos = 0

            while pos < len(data):
                size_line_end = data.find(b'\r\n', pos)
                if size_line_end == -1:
                    break

                # Hexadecimal size, optionally followed by ";extension".
                size_field = data[pos:size_line_end].split(b';', 1)[0].strip()
                chunk_size = int(size_field.decode('ascii'), 16)
                if chunk_size <= 0:
                    break

                chunk_start = size_line_end + 2
                chunk_end = chunk_start + chunk_size
                if chunk_end > len(data):
                    break

                chunks.append(data[chunk_start:chunk_end])
                pos = chunk_end + 2  # skip the CRLF that ends the chunk

            self.body = b''.join(chunks)

        except Exception as e:
            raise ValueError(f"解码分块正文失败: {e}") from e

    def get_header(self, key: str, default: str = None) -> Optional[str]:
        """Return a header value by case-insensitive name, or ``default``."""
        key_lower = key.lower()
        for name, value in self.headers.items():
            if name.lower() == key_lower:
                return value
        return default

    def get_content_type(self) -> str:
        """Return the media type of ``Content-Type`` without its parameters."""
        content_type = self.get_header('Content-Type', '')
        return content_type.split(';')[0].strip()

    def is_keep_alive(self) -> bool:
        """Return True if the connection should stay open after this request."""
        connection = self.get_header('Connection', '').lower()
        if self.version == 'HTTP/1.1':
            return connection != 'close'  # persistent by default
        return connection == 'keep-alive'  # HTTP/1.0 must opt in

    def get_basic_auth(self) -> Optional[Tuple[str, str]]:
        """Return ``(username, password)`` from a Basic ``Authorization`` header.

        Returns ``None`` when the header is absent, not Basic, or malformed.
        """
        auth_header = self.get_header('Authorization')
        if auth_header and auth_header.startswith('Basic '):
            try:
                decoded = base64.b64decode(auth_header[6:].strip()).decode('utf-8')
            except ValueError:
                # binascii.Error and UnicodeDecodeError are both ValueErrors.
                return None
            if ':' in decoded:
                username, password = decoded.split(':', 1)
                return username, password
        return None

    def __str__(self) -> str:
        """Return the request line, e.g. ``GET /index.html HTTP/1.1``."""
        return f"{self.method} {self.path} {self.version}"


class HTTPResponse:
    """Builder for an HTTP response (status, headers, cookies, body)."""

    def __init__(self, status_code: int = 200,
                 content: bytes = b"",
                 content_type: str = "text/plain; charset=utf-8"):
        self.status_code = status_code
        self.headers = {
            'Server': 'PythonHTTPServer/1.0',
            'Date': formatdate(timeval=None, localtime=False, usegmt=True),
            'Content-Type': content_type,
        }
        self.content = content
        self.cookies = {}
        self.version = "HTTP/1.1"

    def set_header(self, key: str, value: str):
        """Set (or overwrite) a response header."""
        self.headers[key] = value

    def add_cookie(self, name: str, value: str, **kwargs):
        """Register a ``Set-Cookie`` value.

        Supported keyword attributes: ``expires`` (epoch seconds or a
        preformatted string), ``max_age``, ``domain``, ``path``, ``secure``,
        ``http_only`` and ``same_site``. Attributes passed as ``None`` are
        omitted, so ``Domain=None`` is never emitted.
        """
        cookie_parts = [f"{name}={value}"]

        expires = kwargs.get('expires')
        if expires is not None:
            if isinstance(expires, (int, float)):
                # formatdate() is locale independent, unlike strftime().
                expires = formatdate(expires, usegmt=True)
            cookie_parts.append(f"Expires={expires}")

        if kwargs.get('max_age') is not None:
            cookie_parts.append(f"Max-Age={kwargs['max_age']}")
        if kwargs.get('domain') is not None:
            cookie_parts.append(f"Domain={kwargs['domain']}")
        if kwargs.get('path') is not None:
            cookie_parts.append(f"Path={kwargs['path']}")
        if kwargs.get('secure', False):
            cookie_parts.append("Secure")
        if kwargs.get('http_only', False):
            cookie_parts.append("HttpOnly")
        if kwargs.get('same_site') is not None:
            cookie_parts.append(f"SameSite={kwargs['same_site']}")

        self.cookies[name] = '; '.join(cookie_parts)

    def delete_cookie(self, name: str, path: str = "/", domain: str = None):
        """Expire a cookie on the client (empty value, epoch expiry, Max-Age=0)."""
        self.add_cookie(
            name, "",
            expires=0,
            max_age=0,
            path=path,
            domain=domain,
            http_only=True
        )

    def redirect(self, location: str, status_code: int = 302):
        """Turn this response into a redirect to ``location``."""
        self.status_code = status_code
        self.set_header('Location', location)
        self.content = f"Redirecting to {location}".encode('utf-8')

    def to_bytes(self) -> bytes:
        """Serialize the response to wire format.

        Does not mutate the response, so it can be called repeatedly without
        duplicating cookies or pinning a stale ``Content-Length``.
        """
        status_phrase = self._get_status_phrase(self.status_code)
        status_line = f"{self.version} {self.status_code} {status_phrase}\r\n"

        headers = dict(self.headers)
        headers.setdefault('Content-Length', str(len(self.content)))

        header_lines = []
        for key, value in headers.items():
            values = value if isinstance(value, list) else [value]
            header_lines.extend(f"{key}: {v}" for v in values)
        header_lines.extend(f"Set-Cookie: {c}" for c in self.cookies.values())

        head = status_line + '\r\n'.join(header_lines) + '\r\n\r\n'
        return head.encode('utf-8') + self.content

    def compress(self, method: str = 'gzip') -> bool:
        """Compress the body in place and set the matching headers.

        Args:
            method: ``'gzip'`` or ``'deflate'``.

        Returns:
            True if the body was compressed; False for an empty body, an
            unknown method, or a zlib failure.
        """
        if not self.content:
            return False

        # The HTTP "deflate" coding is the zlib container (RFC 1950), not a
        # raw deflate stream, hence the positive window size.
        window_bits = {
            'gzip': zlib.MAX_WBITS | 16,
            'deflate': zlib.MAX_WBITS,
        }.get(method)
        if window_bits is None:
            return False

        try:
            compressor = zlib.compressobj(
                zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, window_bits
            )
            compressed_data = compressor.compress(self.content) + compressor.flush()
        except zlib.error as e:
            print(f"压缩失败: {e}")
            return False

        self.content = compressed_data
        self.set_header('Content-Encoding', method)
        self.set_header('Content-Length', str(len(compressed_data)))
        return True

    @staticmethod
    def _get_status_phrase(status_code: int) -> str:
        """Return the reason phrase for ``status_code``.

        The local table wins (keeps historical phrases stable); other codes
        fall back to the standard library, then to ``"Unknown Status"``.
        """
        status_map = {
            200: "OK",
            201: "Created",
            204: "No Content",
            301: "Moved Permanently",
            302: "Found",
            304: "Not Modified",
            400: "Bad Request",
            401: "Unauthorized",
            403: "Forbidden",
            404: "Not Found",
            405: "Method Not Allowed",
            413: "Payload Too Large",
            500: "Internal Server Error",
            501: "Not Implemented",
            502: "Bad Gateway",
            503: "Service Unavailable",
        }
        if status_code in status_map:
            return status_map[status_code]
        try:
            return _StdHTTPStatus(status_code).phrase
        except ValueError:
            return "Unknown Status"

    @classmethod
    def make_error_response(cls, status_code: int, message: str = None) -> 'HTTPResponse':
        """Build an HTML error page; ``message`` defaults to the reason phrase."""
        if not message:
            message = cls._get_status_phrase(status_code)
        # The message may contain request-derived text; escape it.
        safe_message = html_escape(str(message))

        html = f"""<!DOCTYPE html>
<html>
<head>
    <title>{status_code} {safe_message}</title>
    <style>
        body {{ font-family: Arial, sans-serif; text-align: center; padding: 50px; }}
        h1 {{ color: #333; }}
        .error {{ color: #d32f2f; font-size: 72px; margin-bottom: 20px; }}
        .message {{ color: #666; font-size: 18px; }}
        .details {{ color: #999; font-size: 14px; margin-top: 30px; }}
    </style>
</head>
<body>
    <div class="error">{status_code}</div>
    <h1>{safe_message}</h1>
    <div class="message">抱歉,发生了错误</div>
    <div class="details">PythonHTTPServer</div>
</body>
</html>
"""
        return cls(
            status_code=status_code,
            content=html.encode('utf-8'),
            content_type="text/html; charset=utf-8"
        )


class HTTPRoute:
    """A route: HTTP method + path template + handler."""

    # {name} matches one path segment; {name:path} also matches slashes.
    _PARAM_RE = re.compile(r'\{(\w+)(?::(\w+))?\}')

    def __init__(self, method: str, path: str, handler: Callable, name: str = None):
        self.method = method.upper()
        self.path = path
        self.handler = handler
        self.name = name
        self.pattern = self._compile_pattern(path)

    def _compile_pattern(self, path: str):
        """Compile a path template into an anchored regular expression.

        ``/users/{id}`` becomes ``^/users/(?P<id>[^/]+)$`` and
        ``/static/{file_path:path}`` becomes ``^/static/(?P<file_path>.+)$``.
        Text outside the placeholders is left as-is (it may contain regex).
        """
        def replace(match):
            name, converter = match.groups()
            segment = '.+' if converter == 'path' else '[^/]+'
            return f'(?P<{name}>{segment})'

        return re.compile(f"^{self._PARAM_RE.sub(replace, path)}$")

    def match(self, method: str, request_path: str) -> Optional[dict]:
        """Return the captured path parameters, or ``None`` if there is no match."""
        if self.method != method.upper():
            return None
        match = self.pattern.match(request_path)
        return match.groupdict() if match else None


class HTTPServer:
    """A select()-driven HTTP/1.1 server with a worker thread pool.

    The main loop owns all sockets; request handlers run in the pool and hand
    back serialized responses that the loop writes out incrementally.
    """

    def __init__(self, host: str = 'localhost', port: int = 8080,
                 web_root: str = '.', max_workers: int = 10):
        self.host = host
        self.port = port
        self.web_root = Path(web_root).resolve()
        self.server_socket = None
        self.is_running = False
        self.routes = []
        self.middlewares = []

        # Connection bookkeeping.
        self.connections = {}
        self.max_keep_alive = 100    # maximum number of keep-alive connections
        self.keep_alive_timeout = 5  # idle timeout in seconds
        self.max_request_size = 10 * 1024 * 1024  # upper bound for one buffered request

        # select() state, kept on the instance so that every cleanup path can
        # remove a socket from all of them.
        self._inputs = []
        self._outputs = []
        self._message_queues = {}

        self.executor = ThreadPoolExecutor(max_workers=max_workers)

        mimetypes.init()
        self._register_default_routes()

    def _register_default_routes(self):
        """Register the built-in demo routes."""
        # Static files.
        self.add_route('GET', '/', self._handle_index)
        self.add_route('GET', '/static/{file_path:path}', self._handle_static_file)

        # API endpoints.
        self.add_route('GET', '/api/status', self._handle_api_status)
        self.add_route('GET', '/api/echo', self._handle_api_echo)
        self.add_route('POST', '/api/echo', self._handle_api_echo)
        self.add_route('GET', '/api/time', self._handle_api_time)

        # Form demo.
        self.add_route('GET', '/form', self._handle_form_get)
        self.add_route('POST', '/form', self._handle_form_post)

    def add_route(self, method: str, path: str, handler: Callable, name: str = None):
        """Register ``handler`` for ``method`` requests matching ``path``."""
        self.routes.append(HTTPRoute(method, path, handler, name))

    def add_middleware(self, middleware: Callable):
        """Register a middleware.

        Either a plain callable ``middleware(request)`` or an object with
        ``process_request(request)`` and optionally
        ``process_response(request, response)`` and ``priority``.
        """
        self.middlewares.append(middleware)

    def start(self):
        """Bind, listen and run the event loop until stopped or interrupted."""
        try:
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.setblocking(False)  # required for select()
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(128)

            print(f"HTTP服务器启动在 http://{self.host}:{self.port}")
            print(f"文档根目录: {self.web_root}")
            print("按 Ctrl+C 停止服务器")

            self.is_running = True

            inputs = self._inputs
            outputs = self._outputs
            message_queues = self._message_queues
            inputs[:] = [self.server_socket]
            outputs.clear()
            message_queues.clear()

            while self.is_running:
                try:
                    # Poll writability only for sockets whose response is
                    # ready; an idle socket is always writable and would make
                    # select() spin while the worker is still busy.
                    ready_outputs = [
                        s for s in outputs
                        if self._has_pending_output(s, message_queues)
                    ]
                    # Poll faster while workers are running so finished
                    # responses are picked up promptly.
                    timeout = 1.0 if len(ready_outputs) == len(outputs) else 0.05

                    readable, writable, exceptional = select.select(
                        inputs, ready_outputs, inputs, timeout
                    )

                    for s in readable:
                        if s is self.server_socket:
                            self._handle_new_connection(s, inputs)
                        else:
                            self._handle_client_data(s, inputs, outputs, message_queues)

                    for s in writable:
                        self._handle_writable_socket(s, outputs, message_queues)

                    for s in exceptional:
                        self._handle_exceptional_socket(s, inputs, outputs, message_queues)

                    self._cleanup_idle_connections()

                except KeyboardInterrupt:
                    print("\n收到中断信号")
                    break
                except Exception as e:
                    print(f"服务器循环错误: {e}")
                    if not self.is_running:
                        break
                    if self.server_socket is None or self.server_socket.fileno() < 0:
                        break
                    # A closed descriptor makes select() fail on every
                    # iteration; drop dead sockets before retrying.
                    for s in [s for s in inputs if s.fileno() < 0]:
                        self._cleanup_socket(s, inputs, outputs, message_queues)

        except Exception as e:
            print(f"服务器启动失败: {e}")
        finally:
            self.stop()

    @staticmethod
    def _has_pending_output(client_socket, message_queues) -> bool:
        """Return True if there is something to write (or stale state to clear)."""
        pending = message_queues.get(client_socket)
        return not isinstance(pending, Future) or pending.done()

    def _handle_new_connection(self, server_socket, inputs):
        """Accept a client and start tracking it."""
        try:
            client_socket, client_address = server_socket.accept()
            client_socket.setblocking(False)

            self.connections[client_socket] = {
                'address': client_address,
                'last_activity': time.time(),
                'keep_alive': False,
                'requests_processed': 0,
                'buffer': bytearray(),  # bytes received but not yet dispatched
            }

            inputs.append(client_socket)
            print(f"新连接: {client_address}")

        except Exception as e:
            print(f"接受连接失败: {e}")

    def _handle_client_data(self, client_socket, inputs, outputs, message_queues):
        """Read available bytes and dispatch a request once it is complete."""
        try:
            data = client_socket.recv(4096)
        except (BlockingIOError, InterruptedError):
            return  # spurious wakeup; try again on the next loop
        except OSError as e:
            print(f"接收数据失败: {e}")
            self._cleanup_socket(client_socket, inputs, outputs, message_queues)
            return

        conn_info = self.connections.get(client_socket)
        if not data or conn_info is None:
            # Peer closed the connection (or it is no longer tracked).
            self._cleanup_socket(client_socket, inputs, outputs, message_queues)
            return

        conn_info['last_activity'] = time.time()
        buffer = conn_info.setdefault('buffer', bytearray())
        buffer += data

        if len(buffer) > self.max_request_size:
            # Refuse to buffer unbounded input; answer 413 and close.
            buffer.clear()
            conn_info['keep_alive'] = False
            if client_socket not in message_queues:
                message_queues[client_socket] = \
                    HTTPResponse.make_error_response(413).to_bytes()
                if client_socket not in outputs:
                    outputs.append(client_socket)
            return

        self._dispatch_buffered_request(client_socket, outputs, message_queues)

    def _dispatch_buffered_request(self, client_socket, outputs, message_queues):
        """Submit the next complete buffered request to the thread pool."""
        if client_socket in message_queues:
            return  # one in-flight request per connection keeps responses ordered
        conn_info = self.connections.get(client_socket)
        if not conn_info or not conn_info.get('buffer'):
            return

        buffer = conn_info['buffer']
        length = self._complete_request_length(bytes(buffer))
        if length is None:
            return  # wait for more data

        request_data = bytes(buffer[:length])
        del buffer[:length]

        message_queues[client_socket] = self.executor.submit(
            self._process_request, client_socket, request_data
        )
        if client_socket not in outputs:
            outputs.append(client_socket)

    @staticmethod
    def _complete_request_length(data: bytes) -> Optional[int]:
        """Return the byte length of the first complete request in ``data``.

        Returns ``None`` while the headers or the announced body are still
        incomplete. Malformed chunk framing returns ``len(data)`` so that the
        parser reports the error instead of the connection hanging.
        """
        header_end = data.find(b'\r\n\r\n')
        if header_end == -1:
            return None
        body_start = header_end + 4

        content_length = 0
        chunked = False
        for line in data[:header_end].split(b'\r\n')[1:]:
            name, separator, value = line.partition(b':')
            if not separator:
                continue
            name = name.strip().lower()
            value = value.strip()
            if name == b'content-length':
                try:
                    content_length = max(0, int(value))
                except ValueError:
                    content_length = 0
            elif name == b'transfer-encoding':
                chunked = value.lower() == b'chunked'

        if not chunked:
            total = body_start + content_length
            return total if len(data) >= total else None

        pos = body_start
        while True:
            size_line_end = data.find(b'\r\n', pos)
            if size_line_end == -1:
                return None
            try:
                size = int(data[pos:size_line_end].split(b';', 1)[0].strip(), 16)
            except ValueError:
                return len(data)
            if size < 0:
                return len(data)
            if size == 0:
                # The last chunk is followed by optional trailers and a blank line.
                end = data.find(b'\r\n\r\n', size_line_end)
                return None if end == -1 else end + 4
            pos = size_line_end + 2 + size + 2
            if pos > len(data):
                return None

    def _process_request(self, client_socket, request_data):
        """Handle one request in a worker thread and return the response bytes."""
        try:
            conn_info = self.connections.get(client_socket, {})
            client_address = conn_info.get('address', ('unknown', 0))

            try:
                request = HTTPRequest(request_data, client_address)
            except ValueError as e:
                # A request that cannot be parsed is the client's fault.
                print(f"处理请求失败: {e}")
                self._mark_connection_closing(client_socket)
                return HTTPResponse.make_error_response(400).to_bytes()

            # Lower priority value runs first; plain callables default to 0
            # and keep their registration order (stable sort).
            middlewares = sorted(
                self.middlewares, key=lambda m: getattr(m, 'priority', 0)
            )

            response = None
            for middleware in middlewares:
                hook = getattr(middleware, 'process_request', middleware)
                response = hook(request)
                if response is not None:
                    break  # a middleware may short-circuit the request

            if response is None:
                handler, path_params = self._find_route_handler(request)
                if handler:
                    response = handler(request, **path_params)
                else:
                    # No route matched: try to serve it as a static file.
                    response = self._handle_static_file(request)

            keep_alive = request.is_keep_alive()
            if isinstance(response, HTTPResponse):
                for middleware in reversed(middlewares):
                    post = getattr(middleware, 'process_response', None)
                    if post is not None:
                        response = post(request, response)
                if str(response.headers.get('Connection', '')).lower() == 'close':
                    keep_alive = False
                # Tell the client what we are going to do with the socket.
                response.headers.setdefault(
                    'Connection', 'keep-alive' if keep_alive else 'close'
                )

            conn_info = self.connections.get(client_socket)
            if conn_info is not None:
                conn_info['requests_processed'] += 1
                conn_info['keep_alive'] = keep_alive

            if isinstance(response, HTTPResponse):
                return response.to_bytes()
            return response  # a middleware returned pre-serialized bytes

        except Exception as e:
            print(f"处理请求失败: {e}")
            self._mark_connection_closing(client_socket)
            # Do not leak exception details to the client.
            return HTTPResponse.make_error_response(500).to_bytes()

    def _mark_connection_closing(self, client_socket):
        """Make sure the connection is closed after the current response."""
        conn_info = self.connections.get(client_socket)
        if conn_info is not None:
            conn_info['keep_alive'] = False

    def _find_route_handler(self, request: HTTPRequest) -> Tuple[Optional[Callable], dict]:
        """Return ``(handler, path_params)`` for the first matching route."""
        for route in self.routes:
            path_params = route.match(request.method, request.path)
            if path_params is not None:
                return route.handler, path_params
        return None, {}

    def _handle_writable_socket(self, client_socket, outputs, message_queues):
        """Write as much of the pending response as the socket accepts."""
        pending = message_queues.get(client_socket)
        if pending is None:
            if client_socket in outputs:
                outputs.remove(client_socket)
            return

        try:
            if isinstance(pending, Future):
                if not pending.done():
                    return
                pending = pending.result()
                message_queues[client_socket] = pending

            # send() instead of sendall(): the socket is non-blocking, so a
            # large response has to be written across several loop turns.
            view = memoryview(pending)
            sent = client_socket.send(view)
            if sent < len(view):
                message_queues[client_socket] = view[sent:]
                return
        except (BlockingIOError, InterruptedError):
            return
        except Exception as e:
            print(f"发送响应失败: {e}")
            self._cleanup_socket(client_socket, None, outputs, message_queues)
            return

        # The response is fully written.
        if client_socket in outputs:
            outputs.remove(client_socket)
        message_queues.pop(client_socket, None)

        conn_info = self.connections.get(client_socket)
        if conn_info is None or not conn_info.get('keep_alive', False):
            self._cleanup_socket(client_socket, None, outputs, message_queues)
            return

        conn_info['last_activity'] = time.time()
        # A pipelined request may already be waiting in the buffer.
        self._dispatch_buffered_request(client_socket, outputs, message_queues)

    def _handle_exceptional_socket(self, client_socket, inputs, outputs, message_queues):
        """Drop a socket that select() reported as exceptional."""
        print(f"socket异常: {client_socket}")
        self._cleanup_socket(client_socket, inputs, outputs, message_queues)

    def _cleanup_socket(self, client_socket, inputs, outputs, message_queues):
        """Close a client socket and remove it from every tracking structure.

        Any of ``inputs``/``outputs``/``message_queues`` may be ``None``; the
        server's own select() state is used instead, so a closed socket is
        never left behind in the select() lists.
        """
        inputs = self._inputs if inputs is None else inputs
        outputs = self._outputs if outputs is None else outputs
        message_queues = self._message_queues if message_queues is None else message_queues

        if client_socket in inputs:
            inputs.remove(client_socket)
        if client_socket in outputs:
            outputs.remove(client_socket)
        message_queues.pop(client_socket, None)

        conn_info = self.connections.pop(client_socket, None)
        if conn_info is not None:
            print(f"连接关闭: {conn_info['address']}, "
                  f"处理请求数: {conn_info['requests_processed']}")

        try:
            client_socket.close()
        except OSError:
            pass

    def _cleanup_idle_connections(self):
        """Close connections that have been idle longer than the timeout."""
        current_time = time.time()
        idle_sockets = [
            sock for sock, info in list(self.connections.items())
            # A connection with a request in flight is busy, not idle.
            if sock not in self._message_queues
            and current_time - info['last_activity'] > self.keep_alive_timeout
        ]

        for sock in idle_sockets:
            print(f"清理空闲连接: {self.connections.get(sock, {}).get('address')}")
            self._cleanup_socket(sock, None, None, None)

    # Default route handlers

    def _handle_index(self, request: HTTPRequest) -> HTTPResponse:
        """Serve the landing page listing the demo endpoints."""
        # Everything below comes from the request; escape before embedding.
        request_info = (
            f"<p><strong>客户端IP:</strong> {html_escape(str(request.client_ip))}</p>\n"
            f"<p><strong>请求方法:</strong> {html_escape(request.method)}</p>\n"
            f"<p><strong>请求路径:</strong> {html_escape(request.path)}</p>\n"
            f"<p><strong>HTTP版本:</strong> {html_escape(request.version)}</p>\n"
            f"<p><strong>保持连接:</strong> {request.is_keep_alive()}</p>\n"
        )

        html = """<!DOCTYPE html>
<html>
<head>
    <title>Python HTTP Server</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 40px; }
        h1 { color: #333; }
        .card { background: #f5f5f5; padding: 20px; margin: 20px 0; border-radius: 5px; }
        .endpoints { display: grid; grid-template-columns: repeat(auto-fill, minmax(300px, 1fr)); gap: 20px; }
        .endpoint { background: white; padding: 15px; border-radius: 5px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
        .method { display: inline-block; padding: 3px 8px; border-radius: 3px; font-weight: bold; margin-right: 10px; }
        .get { background: #e3f2fd; color: #1565c0; }
        .post { background: #e8f5e9; color: #2e7d32; }
        a { color: #1976d2; text-decoration: none; }
        a:hover { text-decoration: underline; }
    </style>
</head>
<body>
    <h1>Python HTTP Server</h1>
    <div class="card">
        <p>这是一个完整的HTTP/1.1服务器实现,支持持久连接、路由、中间件等特性。</p>
    </div>
    <h2>可用端点</h2>
    <div class="endpoints">
        <div class="endpoint">
            <span class="method get">GET</span>
            <a href="/static/test.txt">/static/{file_path}</a>
            <p>静态文件服务</p>
        </div>
        <div class="endpoint">
            <span class="method get">GET</span>
            <a href="/api/status">/api/status</a>
            <p>服务器状态</p>
        </div>
        <div class="endpoint">
            <span class="method get">GET</span>
            <span class="method post">POST</span>
            <a href="/api/echo">/api/echo</a>
            <p>回显请求内容</p>
        </div>
        <div class="endpoint">
            <span class="method get">GET</span>
            <a href="/api/time">/api/time</a>
            <p>服务器时间</p>
        </div>
        <div class="endpoint">
            <span class="method get">GET</span>
            <a href="/form">/form</a>
            <p>表单测试</p>
        </div>
    </div>
    <h2>请求信息</h2>
    <div class="card">
""" + request_info + """    </div>
</body>
</html>
"""

        response = HTTPResponse(
            content=html.encode('utf-8'),
            content_type="text/html; charset=utf-8"
        )
        # The page embeds per-client data, so shared caches must not reuse it.
        response.set_header('Cache-Control', 'no-cache')
        return response

    def _handle_static_file(self, request: HTTPRequest, file_path: str = "") -> HTTPResponse:
        """Serve a file (or directory listing) from below ``web_root``.

        Args:
            request: The incoming request.
            file_path: Path captured by the route; when empty the request
                path is used (fallback for unrouted requests).
        """
        # request.path and route parameters are already URL-decoded; decoding
        # again would turn "%252e" into ".".
        relative = (file_path or request.path).lstrip('/')
        safe_path = Path(relative)

        try:
            full_path = (self.web_root / safe_path).resolve()
        except (OSError, ValueError):
            # e.g. an embedded NUL byte
            return HTTPResponse.make_error_response(400, "无效的请求路径")

        # Containment check on path components: a plain string prefix test
        # would accept a sibling such as "/var/www-secret" for "/var/www".
        try:
            full_path.relative_to(self.web_root)
        except ValueError:
            return HTTPResponse.make_error_response(403, "禁止访问")

        if not full_path.exists():
            return HTTPResponse.make_error_response(404, "文件未找到")

        if full_path.is_dir():
            index_file = full_path / 'index.html'
            if index_file.exists():
                full_path = index_file
            else:
                return self._list_directory(full_path, safe_path)

        if not full_path.is_file():
            return HTTPResponse.make_error_response(403, "不是文件")

        try:
            with open(full_path, 'rb') as f:
                content = f.read()

            # guess_type()'s second value is a content *encoding* (gzip, ...),
            # not a charset.
            mime_type, encoding = mimetypes.guess_type(str(full_path))
            if encoding:
                # Serve compressed files as such instead of mislabelling them.
                mime_type = {
                    'gzip': 'application/gzip',
                    'bzip2': 'application/x-bzip2',
                    'xz': 'application/x-xz',
                }.get(encoding, 'application/octet-stream')
            elif not mime_type:
                mime_type = 'application/octet-stream'
            elif mime_type.startswith('text/'):
                mime_type += '; charset=utf-8'

            response = HTTPResponse(content=content, content_type=mime_type)
            self._set_file_headers(response, full_path, request)
            return response

        except Exception as e:
            print(f"读取文件失败: {e}")
            return HTTPResponse.make_error_response(500, f"读取文件失败: {e}")

    @staticmethod
    def _mark_not_modified(response: HTTPResponse):
        """Turn ``response`` into an empty 304 Not Modified."""
        response.status_code = 304
        response.content = b""
        response.set_header('Content-Length', '0')

    def _set_file_headers(self, response: HTTPResponse, file_path: Path,
                          request: HTTPRequest):
        """Add validators, cache policy and optional gzip to a file response."""
        file_stat = file_path.stat()

        # ETag for cache validation (not security sensitive).
        etag = hashlib.md5(f"{file_path}-{file_stat.st_mtime}".encode()).hexdigest()
        response.set_header('ETag', f'"{etag}"')

        # HTTP dates have one-second resolution; drop the fraction so the
        # If-Modified-Since comparison below can actually succeed.
        mtime_seconds = int(file_stat.st_mtime)
        last_modified = datetime.fromtimestamp(mtime_seconds, tz=timezone.utc)
        response.set_header('Last-Modified', formatdate(mtime_seconds, usegmt=True))

        if_none_match = request.get_header('If-None-Match')
        if_modified_since = request.get_header('If-Modified-Since')

        if if_none_match:
            # If-None-Match takes precedence over If-Modified-Since.
            if if_none_match.strip() == '*' or f'"{etag}"' in if_none_match:
                self._mark_not_modified(response)
                return
        elif if_modified_since:
            try:
                parsed = parsedate(if_modified_since)
                if parsed:
                    since = datetime(*parsed[:6], tzinfo=timezone.utc)
                    if last_modified <= since:
                        self._mark_not_modified(response)
                        return
            except (TypeError, ValueError):
                pass  # an unparsable date is ignored

        cache_control = []
        if file_path.suffix.lower() in ['.html', '.htm', '.php']:
            cache_control.append('no-cache')
        else:
            cache_control.append('public')
            cache_control.append('max-age=86400')  # 24 hours
        response.set_header('Cache-Control', ', '.join(cache_control))

        # Content negotiation: only compress bodies larger than 1 KB.
        accept_encoding = request.get_header('Accept-Encoding', '')
        if 'gzip' in accept_encoding and len(response.content) > 1024:
            if response.compress('gzip'):
                response.set_header('Vary', 'Accept-Encoding')

    def _list_directory(self, dir_path: Path, url_path: Path) -> HTTPResponse:
        """Render an HTML listing of ``dir_path`` (directories first)."""
        try:
            items = []
            for item in dir_path.iterdir():
                is_dir = item.is_dir()
                item_stat = item.stat()
                items.append({
                    'name': item.name,
                    'is_dir': is_dir,
                    'size': item_stat.st_size if not is_dir else '-',
                    'mtime': datetime.fromtimestamp(item_stat.st_mtime).strftime('%Y-%m-%d %H:%M'),
                    # Absolute, POSIX-style URL so that links also work in
                    # nested directories and on Windows.
                    'url': '/' + quote((url_path / item.name).as_posix()),
                })

            items.sort(key=lambda x: (not x['is_dir'], x['name'].lower()))

            shown_path = '' if str(url_path) == '.' else url_path.as_posix()
            html_lines = [
                '<!DOCTYPE html>',
                '<html><head><title>目录列表</title>',
                '<style>',
                'body { font-family: Arial, sans-serif; margin: 20px; }',
                'table { border-collapse: collapse; width: 100%; }',
                'th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }',
                'th { background-color: #f2f2f2; }',
                'tr:hover { background-color: #f5f5f5; }',
                '.dir { color: #1976d2; }',
                '.size { text-align: right; }',
                '</style>',
                '</head><body>',
                f'<h1>目录: /{html_escape(shown_path)}</h1>',
                '<table>',
                '<tr><th>名称</th><th>大小</th><th>修改时间</th></tr>'
            ]

            # Link to the parent directory (absent at the web root).
            if url_path.parent != url_path:
                parent = url_path.parent.as_posix()
                parent_url = '/' + quote(parent) if parent != '.' else '/'
                html_lines.append(
                    f'<tr><td colspan="3"><a href="{parent_url}">../</a></td></tr>'
                )

            for item in items:
                icon = '📁' if item['is_dir'] else '📄'
                css_class = 'dir' if item['is_dir'] else ''
                size = item['size']
                size_display = f"{size:,}" if isinstance(size, int) else size
                # File names are untrusted; escape them for HTML.
                name = html_escape(item['name'])
                html_lines.append(
                    f'<tr>'
                    f'<td>{icon} <a href="{item["url"]}" class="{css_class}">{name}</a></td>'
                    f'<td class="size">{size_display}</td>'
                    f'<td>{item["mtime"]}</td>'
                    f'</tr>'
                )

            html_lines.extend(['</table>', '</body></html>'])

            return HTTPResponse(
                content='\n'.join(html_lines).encode('utf-8'),
                content_type="text/html; charset=utf-8"
            )

        except Exception as e:
            return HTTPResponse.make_error_response(500, f"列出目录失败: {e}")

    def _handle_api_status(self, request: HTTPRequest) -> HTTPResponse:
        """Return server, system and connection statistics as JSON."""
        import platform

        system_info = {
            'platform': platform.platform(),
            'python_version': platform.python_version(),
        }
        try:
            # psutil is optional; without it the metrics are reported as null.
            import psutil
        except ImportError:
            system_info.update(cpu_count=os.cpu_count(),
                               memory_percent=None, disk_usage=None)
        else:
            system_info.update(
                cpu_count=psutil.cpu_count(),
                memory_percent=psutil.virtual_memory().percent,
                disk_usage=psutil.disk_usage('/').percent,
            )

        connections = list(self.connections.values())
        status_info = {
            'server': {
                'name': 'PythonHTTPServer',
                'version': '1.0',
                'host': self.host,
                'port': self.port,
                'uptime': '运行中'
            },
            'system': system_info,
            'connections': {
                'total': len(connections),
                'active': sum(1 for c in connections if c['keep_alive']),
                'requests_processed': sum(c['requests_processed'] for c in connections)
            }
        }

        return HTTPResponse(
            content=json.dumps(status_info, indent=2).encode('utf-8'),
            content_type="application/json"
        )

    def _handle_api_echo(self, request: HTTPRequest) -> HTTPResponse:
        """Echo the request back as JSON."""
        echo_data = {
            'method': request.method,
            'path': request.path,
            'query_params': request.query_params,
            'headers': dict(request.headers),
            'cookies': request.cookies,
            'client': {
                'ip': request.client_ip,
                'port': request.client_port
            },
            'body': request.body.decode('utf-8', errors='ignore') if request.body else None,
            'timestamp': datetime.now().isoformat()
        }

        return HTTPResponse(
            content=json.dumps(echo_data, indent=2).encode('utf-8'),
            content_type="application/json"
        )

    def _handle_api_time(self, request: HTTPRequest) -> HTTPResponse:
        """Return the current server time in several formats as JSON."""
        # datetime.utcnow() is deprecated; derive the same naive-UTC text
        # from an aware timestamp.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        time_data = {
            'iso': datetime.now().isoformat(),
            'timestamp': time.time(),
            'utc': utc_now.isoformat() + 'Z',
            'server_timezone': str(datetime.now().astimezone().tzinfo)
        }

        response = HTTPResponse(
            content=json.dumps(time_data, indent=2).encode('utf-8'),
            content_type="application/json"
        )
        response.set_header('Cache-Control', 'no-cache, must-revalidate')
        return response

    def _handle_form_get(self, request: HTTPRequest) -> HTTPResponse:
        """Serve the demo form."""
        html = """<!DOCTYPE html>
<html>
<head>
    <title>表单测试</title>
    <style>
        body { font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px; }
        .form-group { margin-bottom: 15px; }
        label { display: block; margin-bottom: 5px; font-weight: bold; }
        input, textarea, select { width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px; }
        button { background: #1976d2; color: white; border: none; padding: 10px 20px; border-radius: 4px; cursor: pointer; }
        button:hover { background: #1565c0; }
        .result { background: #f5f5f5; padding: 15px; border-radius: 4px; margin-top: 20px; }
    </style>
</head>
<body>
    <h1>表单测试</h1>
    <form method="POST" action="/form" enctype="application/x-www-form-urlencoded">
        <div class="form-group">
            <label for="username">用户名:</label>
            <input type="text" id="username" name="username" required>
        </div>
        <div class="form-group">
            <label for="email">邮箱:</label>
            <input type="email" id="email" name="email" required>
        </div>
        <div class="form-group">
            <label for="message">留言:</label>
            <textarea id="message" name="message" rows="4"></textarea>
        </div>
        <div class="form-group">
            <label for="country">国家:</label>
            <select id="country" name="country">
                <option value="">请选择</option>
                <option value="CN">中国</option>
                <option value="US">美国</option>
                <option value="JP">日本</option>
                <option value="UK">英国</option>
            </select>
        </div>
        <div class="form-group">
            <label>兴趣:</label>
            <div>
                <label><input type="checkbox" name="interests" value="sports"> 运动</label>
                <label><input type="checkbox" name="interests" value="music"> 音乐</label>
                <label><input type="checkbox" name="interests" value="reading"> 阅读</label>
                <label><input type="checkbox" name="interests" value="travel"> 旅行</label>
            </div>
        </div>
        <button type="submit">提交</button>
    </form>
</body>
</html>
"""
        return HTTPResponse(
            content=html.encode('utf-8'),
            content_type="text/html; charset=utf-8"
        )

    def _handle_form_post(self, request: HTTPRequest) -> HTTPResponse:
        """Render the submitted form fields back to the user."""
        try:
            body_text = request.body.decode('utf-8', errors='ignore')
            form_data = parse_qs(body_text)

            # Submitted values are untrusted: escape them to prevent XSS.
            rows = ''.join(
                f'<li><strong>{html_escape(key)}:</strong> {html_escape(value)}</li>'
                for key, values in form_data.items()
                for value in values
            )
            result_html = (
                '<h2>提交成功</h2><div class="result"><h3>提交的数据:</h3><ul>'
                + rows + '</ul></div>'
            )

            html = f"""<!DOCTYPE html>
<html>
<head>
    <title>表单提交结果</title>
    <style>
        body {{ font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px; }}
        .result {{ background: #f5f5f5; padding: 15px; border-radius: 4px; margin: 20px 0; }}
        .back-link {{ display: inline-block; margin-top: 20px; padding: 10px 20px; background: #1976d2; color: white; text-decoration: none; border-radius: 4px; }}
        .back-link:hover {{ background: #1565c0; }}
    </style>
</head>
<body>
    <h1>表单提交结果</h1>
    {result_html}
    <a href="/form" class="back-link">返回表单</a>
</body>
</html>
"""

            response = HTTPResponse(
                content=html.encode('utf-8'),
                content_type="text/html; charset=utf-8"
            )

            # Example cookie.
            response.add_cookie(
                'form_submission',
                'success',
                max_age=3600,
                path='/',
                http_only=True,
                same_site='Lax'
            )
            return response

        except Exception as e:
            return HTTPResponse.make_error_response(400, f"表单处理失败: {e}")

    def stop(self):
        """Stop the loop, close every socket and shut the thread pool down."""
        self.is_running = False

        # Close client connections as well, not just the listening socket.
        for sock in list(self.connections):
            self._cleanup_socket(sock, None, None, None)

        if self.server_socket:
            try:
                self.server_socket.close()
            except OSError:
                pass

        self._inputs.clear()
        self._outputs.clear()
        self._message_queues.clear()

        self.executor.shutdown(wait=True)
        print("HTTP服务器已停止")
2.2 中间件系统
"""HTTP middleware system: logging, rate limiting, authentication, CORS, security."""
import hmac
import json
import re
import time
import urllib.parse
from functools import wraps
from typing import Callable, Optional


def _error_response(status_code: int, message: str):
    """Build an error response using the project's server-side HTTPResponse.

    The import is deferred so this module can be loaded without the
    ``http_response`` module being importable (it is only needed when a
    middleware actually rejects a request).
    """
    from .http_response import HTTPResponse
    return HTTPResponse.make_error_response(status_code, message)


def _constant_time_equal(left: str, right: str) -> bool:
    """Compare two secrets without leaking the mismatch position via timing."""
    return hmac.compare_digest(left.encode('utf-8'), right.encode('utf-8'))


class HTTPMiddleware:
    """Base class for HTTP middleware.

    Subclasses override ``process_request`` and/or ``process_response``.
    """

    def __init__(self):
        self.priority = 0  # lower values run earlier

    def process_request(self, request):
        """Inspect a request; return a response to short-circuit, else None."""
        return None

    def process_response(self, request, response):
        """Post-process a response and return it."""
        return response


class LoggingMiddleware(HTTPMiddleware):
    """Logs every request and response to stdout and, optionally, a JSON-lines file."""

    def __init__(self, log_file: str = None):
        super().__init__()
        self.log_file = log_file
        self.priority = 100  # large value: runs after the other middleware

    def _write_entry(self, entry: dict) -> None:
        """Append one JSON record to the log file, if one is configured."""
        if self.log_file:
            with open(self.log_file, 'a', encoding='utf-8') as f:
                f.write(json.dumps(entry) + '\n')

    def process_request(self, request):
        """Record the incoming request; never short-circuits."""
        log_entry = {
            'timestamp': time.time(),
            'client_ip': request.client_ip,
            'method': request.method,
            'path': request.path,
            'user_agent': request.get_header('User-Agent', 'Unknown'),
            'content_length': len(request.body or b''),
        }

        # Console output
        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] "
              f"{request.client_ip} - {request.method} {request.path}")

        self._write_entry(log_entry)
        return None

    def process_response(self, request, response):
        """Record the outgoing response and pass it through unchanged."""
        log_entry = {
            'timestamp': time.time(),
            'client_ip': request.client_ip,
            'method': request.method,
            'path': request.path,
            'status_code': response.status_code,
            'response_size': len(response.content),
        }

        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] "
              f"{request.client_ip} - {request.method} {request.path} - "
              f"{response.status_code} ({len(response.content)} bytes)")

        # The original built this entry but never persisted it.
        self._write_entry(log_entry)
        return response


class RateLimitMiddleware(HTTPMiddleware):
    """Sliding-window rate limiter keyed by client IP (window: 60 seconds)."""

    def __init__(self, requests_per_minute: int = 60):
        super().__init__()
        self.requests_per_minute = requests_per_minute
        self.request_counts = {}  # IP -> list of request timestamps
        self.priority = 50

    def process_request(self, request):
        """Return a 429 response when the client exceeded its quota, else None."""
        client_ip = request.client_ip
        current_time = time.time()

        # Drop timestamps that have left the 60-second window.
        one_minute_ago = current_time - 60
        recent = [
            t for t in self.request_counts.get(client_ip, [])
            if t > one_minute_ago
        ]
        self.request_counts[client_ip] = recent

        if len(recent) >= self.requests_per_minute:
            response = _error_response(429, "请求过于频繁,请稍后再试")
            response.set_header('Retry-After', '60')
            return response

        recent.append(current_time)
        return None

    def process_response(self, request, response):
        """Attach X-RateLimit-* headers describing the client's remaining quota."""
        client_ip = request.client_ip
        if client_ip in self.request_counts:
            remaining = self.requests_per_minute - len(self.request_counts[client_ip])
            response.set_header('X-RateLimit-Limit', str(self.requests_per_minute))
            response.set_header('X-RateLimit-Remaining', str(max(0, remaining)))
        return response


class AuthenticationMiddleware(HTTPMiddleware):
    """Accepts requests carrying a known API key or valid Basic credentials.

    Args:
        api_keys: Set of accepted ``X-API-Key`` values.
        credentials: Mapping of username -> password for Basic auth. Defaults
            to the demo account ``admin``/``secret`` to stay compatible with
            existing callers; replace it in any real deployment.
    """

    def __init__(self, api_keys: set = None, credentials: dict = None):
        super().__init__()
        self.api_keys = api_keys or set()
        self.credentials = credentials if credentials is not None else {'admin': 'secret'}
        self.public_paths = {'/', '/api/status', '/api/time', '/form'}
        self.priority = 30

    def process_request(self, request):
        """Return None when authenticated (or public), else a 401 response."""
        # Public paths need no authentication.
        if request.path in self.public_paths:
            return None

        # API key check (constant-time against every known key).
        api_key = request.get_header('X-API-Key')
        if api_key and any(_constant_time_equal(api_key, known) for known in self.api_keys):
            return None

        # HTTP Basic authentication.
        auth = request.get_basic_auth()
        if auth:
            username, password = auth
            expected = self.credentials.get(username)
            if expected is not None and _constant_time_equal(password or '', expected):
                return None

        response = _error_response(401, "需要认证")
        response.set_header('WWW-Authenticate', 'Basic realm="API Access"')
        return response

    def process_response(self, request, response):
        """Make sure every 401 response advertises the Basic challenge."""
        if response.status_code == 401:
            response.set_header('WWW-Authenticate', 'Basic realm="API Access"')
        return response


class CORSMiddleware(HTTPMiddleware):
    """Adds CORS headers and answers preflight (OPTIONS) requests."""

    def __init__(self, allowed_origins: list = None, allowed_methods: list = None):
        super().__init__()
        self.allowed_origins = allowed_origins or ['*']
        self.allowed_methods = allowed_methods or ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']
        self.priority = 90  # runs late

    def process_request(self, request):
        """Answer preflight requests directly with 204."""
        if request.method == 'OPTIONS':
            from .http_response import HTTPResponse
            response = HTTPResponse(status_code=204)
            self._add_cors_headers(request, response)
            return response
        return None

    def process_response(self, request, response):
        """Attach CORS headers to every response."""
        self._add_cors_headers(request, response)
        return response

    def _add_cors_headers(self, request, response):
        """Set Access-Control-* headers when the request's Origin is allowed."""
        origin = request.get_header('Origin')
        if not origin:
            return

        if origin in self.allowed_origins:
            # Explicitly trusted origin: echo it and allow credentials.
            response.set_header('Access-Control-Allow-Origin', origin)
            response.set_header('Access-Control-Allow-Credentials', 'true')
        elif '*' in self.allowed_origins:
            # Wildcard: never combine with credentials. Reflecting an arbitrary
            # origin together with Allow-Credentials would let any site read
            # authenticated responses.
            response.set_header('Access-Control-Allow-Origin', '*')
        else:
            return

        response.set_header('Access-Control-Allow-Methods',
                            ', '.join(self.allowed_methods))
        response.set_header('Access-Control-Allow-Headers',
                            'Content-Type, Authorization, X-API-Key')
        response.set_header('Access-Control-Max-Age', '86400')  # 24 hours


class SecurityMiddleware(HTTPMiddleware):
    """Rejects obviously malicious requests and adds security response headers."""

    MAX_BODY_BYTES = 10 * 1024 * 1024  # 10MB request-body limit

    # Heuristic only: pattern matching is not a substitute for parameterized queries.
    _SQL_PATTERNS = [
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r"'\s+OR\s+'?\w+'?\s*=\s*'?\w+",
            r"'\s+UNION\s+SELECT",
            r"';.*--",
            r"'\s+AND\s+\d+=\d+",
            r"\bEXEC(\s+|\().*",
        )
    ]

    def __init__(self):
        super().__init__()
        self.priority = 10  # smallest value: runs first

    def process_request(self, request):
        """Return a 4xx response for suspicious requests, else None."""
        if self._detect_path_traversal(request.path):
            return _error_response(400, "无效的请求路径")

        if self._detect_sql_injection(request.path + str(request.query_params)):
            return _error_response(400, "请求包含可疑内容")

        content_length = request.get_header('Content-Length')
        if content_length:
            try:
                declared_size = int(content_length)
            except (TypeError, ValueError):
                # A non-numeric Content-Length is a malformed request, not a crash.
                return _error_response(400, "无效的请求路径")
            if declared_size > self.MAX_BODY_BYTES:
                return _error_response(413, "请求体过大")

        return None

    def process_response(self, request, response):
        """Add hardening headers to every response."""
        response.set_header('X-Content-Type-Options', 'nosniff')
        response.set_header('X-Frame-Options', 'DENY')
        response.set_header('X-XSS-Protection', '1; mode=block')
        response.set_header('Referrer-Policy', 'strict-origin-when-cross-origin')
        response.set_header('Strict-Transport-Security',
                            'max-age=31536000; includeSubDomains')

        # Content Security Policy
        csp = [
            "default-src 'self'",
            "script-src 'self' 'unsafe-inline'",
            "style-src 'self' 'unsafe-inline'",
            "img-src 'self' data: https:",
            "font-src 'self'",
            "connect-src 'self'",
            "frame-ancestors 'none'",
            "form-action 'self'",
        ]
        response.set_header('Content-Security-Policy', '; '.join(csp))
        return response

    def _detect_path_traversal(self, path: str) -> bool:
        """Return True when *path* (raw or percent-decoded) looks like traversal."""
        for candidate in (path, urllib.parse.unquote(path)):
            # Parent-directory reference.
            if '..' in candidate:
                return True
            # Leading double slash (network-path / absolute reference).
            if candidate.startswith('//'):
                return True
            # NUL-byte injection.
            if '\x00' in candidate:
                return True
        return False

    def _detect_sql_injection(self, text: str) -> bool:
        """Return True when *text* matches one of the simple SQL-injection patterns."""
        return any(pattern.search(text) for pattern in self._SQL_PATTERNS)


def middleware_decorator(middleware_class):
    """Wrap a single route handler with one middleware.

    Args:
        middleware_class: Either an ``HTTPMiddleware`` subclass (instantiated
            once with no arguments) or an already-configured instance.

    The middleware object is created once, at decoration time, so stateful
    middleware such as the rate limiter keeps its state across requests.
    """
    if isinstance(middleware_class, type):
        middleware = middleware_class()
    else:
        middleware = middleware_class

    def decorator(handler):
        @wraps(handler)
        def wrapped(request, *args, **kwargs):
            # A non-None result short-circuits the handler.
            middleware_result = middleware.process_request(request)
            if middleware_result is not None:
                return middleware_result

            response = handler(request, *args, **kwargs)
            return middleware.process_response(request, response)
        return wrapped
    return decorator


# Usage example
def create_server_with_middleware():
    """Create a server with the full middleware stack installed.

    NOTE(review): ``HTTPServer`` is not defined or imported in this listing;
    presumably it comes from the server listing earlier in the article --
    confirm and import it where this function lives.
    """
    from .http_response import HTTPResponse

    server = HTTPServer(host='localhost', port=8080)

    server.add_middleware(LoggingMiddleware('access.log'))
    server.add_middleware(SecurityMiddleware())
    server.add_middleware(CORSMiddleware(allowed_origins=['http://localhost:3000']))
    server.add_middleware(RateLimitMiddleware(requests_per_minute=30))

    # Authentication with API keys
    api_keys = {'secret-key-123', 'another-key-456'}
    server.add_middleware(AuthenticationMiddleware(api_keys=api_keys))

    # A route with its own, stricter rate limit
    @middleware_decorator(RateLimitMiddleware(requests_per_minute=10))
    def handle_sensitive(request):
        """Sensitive endpoint that needs a stricter rate limit."""
        return HTTPResponse(content=b"Sensitive data", content_type="text/plain")

    server.add_route('GET', '/sensitive', handle_sensitive)
    return server
3. HTTP客户端实现
3.1 完整的HTTP/1.1客户端
"""Complete HTTP/1.1 client implementation."""
import gzip
import json
import re
import socket
import ssl
import threading
import time
import urllib.parse
import zlib
from dataclasses import dataclass
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional, Dict, Tuple, List

# Compiled once; used while streaming a response to decide when it is complete.
_CONTENT_LENGTH_RE = re.compile(rb'^Content-Length:\s*(\d+)\s*$',
                                re.IGNORECASE | re.MULTILINE)
_CHUNKED_RE = re.compile(rb'^Transfer-Encoding:.*\bchunked\b',
                         re.IGNORECASE | re.MULTILINE)


def _decode_chunked(data: bytes) -> bytes:
    """Decode a ``Transfer-Encoding: chunked`` body.

    Raises:
        ValueError: If the chunk framing is malformed or truncated.
    """
    pieces = []
    position = 0
    while True:
        line_end = data.find(b'\r\n', position)
        if line_end == -1:
            raise ValueError("truncated chunk header")
        # Chunk extensions (after ';') are ignored.
        size = int(data[position:line_end].split(b';', 1)[0].strip(), 16)
        position = line_end + 2
        if size == 0:
            break
        chunk = data[position:position + size]
        if len(chunk) < size:
            raise ValueError("truncated chunk body")
        pieces.append(chunk)
        position += size + 2  # skip the CRLF that terminates the chunk
    return b''.join(pieces)


def _inflate(data: bytes) -> bytes:
    """Decompress a ``deflate`` body (zlib-wrapped, falling back to raw deflate)."""
    try:
        return zlib.decompress(data)
    except zlib.error:
        return zlib.decompress(data, -zlib.MAX_WBITS)


def _domain_matches(host: str, cookie_domain: str) -> bool:
    """Return True when *host* equals *cookie_domain* or is a subdomain of it."""
    host = host.lower()
    cookie_domain = cookie_domain.lower()
    return host == cookie_domain or host.endswith('.' + cookie_domain)


@dataclass
class HTTPClientConfig:
    """HTTP client configuration."""
    timeout: float = 30.0
    max_redirects: int = 5
    verify_ssl: bool = True
    user_agent: str = "PythonHTTPClient/1.0"
    accept_encoding: str = "gzip, deflate"
    default_headers: Dict = None
    proxy: Optional[str] = None
    enable_cookies: bool = True
    enable_compression: bool = True

    def __post_init__(self):
        if self.default_headers is None:
            self.default_headers = {
                'Accept': '*/*',
                'Accept-Language': 'en-US,en;q=0.9',
                'Cache-Control': 'no-cache',
                'Connection': 'close',  # close the connection by default
            }


class HTTPResponse:
    """HTTP response (client side).

    Attributes set by parsing: ``status_code``, ``reason``, ``headers``
    (repeated ``Set-Cookie`` headers are stored as a list, other repeated
    headers are joined with ", "), ``cookies`` and ``body`` (already
    de-chunked and decompressed when the framing could be decoded).
    """

    def __init__(self, raw_response: bytes, url: str):
        self.url = url
        self.raw = raw_response
        self.status_code = 0
        self.reason = ""
        self.headers = {}
        self.cookies = {}
        self.body = b""
        self._parse()

    def get_header(self, name: str, default=None):
        """Look up a header by name, ignoring case."""
        wanted = name.lower()
        for key, value in self.headers.items():
            if key.lower() == wanted:
                return value
        return default

    def _store_header(self, name: str, value: str) -> None:
        """Record one header line, merging repeats instead of overwriting."""
        lowered = name.lower()
        existing_key = next((k for k in self.headers if k.lower() == lowered), None)
        if existing_key is None:
            self.headers[name] = value
        elif lowered == 'set-cookie':
            previous = self.headers[existing_key]
            if isinstance(previous, list):
                previous.append(value)
            else:
                self.headers[existing_key] = [previous, value]
        else:
            self.headers[existing_key] = f"{self.headers[existing_key]}, {value}"

    def _parse(self):
        """Parse the raw bytes into status line, headers, cookies and body.

        Raises:
            ValueError: If the response is empty or the status line is malformed.
        """
        try:
            head, _, payload = self.raw.partition(b'\r\n\r\n')
            lines = head.decode('utf-8', errors='ignore').split('\r\n')

            status_fields = lines[0].split(' ', 2)
            if len(status_fields) < 2:
                raise ValueError("空响应")
            self.status_code = int(status_fields[1])
            # The reason phrase is optional.
            self.reason = status_fields[2] if len(status_fields) > 2 else ""

            for line in lines[1:]:
                name, separator, value = line.partition(':')
                if separator and name.strip():
                    self._store_header(name.strip(), value.strip())

            self.body = self._decode_body(payload)
            self._parse_cookies()
        except Exception as e:
            raise ValueError(f"解析HTTP响应失败: {e}") from e

    def _decode_body(self, payload: bytes) -> bytes:
        """Undo chunked framing and gzip/deflate compression where possible.

        Decoding is best effort: when the data is truncated or corrupt the
        bytes are returned as received rather than failing the whole response.
        """
        transfer_encoding = str(self.get_header('Transfer-Encoding', '')).lower()
        if 'chunked' in transfer_encoding:
            try:
                payload = _decode_chunked(payload)
            except ValueError:
                return payload

        content_encoding = str(self.get_header('Content-Encoding', '')).strip().lower()
        if payload and content_encoding:
            try:
                if content_encoding in ('gzip', 'x-gzip'):
                    payload = gzip.decompress(payload)
                elif content_encoding == 'deflate':
                    payload = _inflate(payload)
            except (OSError, EOFError, zlib.error):
                pass
        return payload

    def _parse_cookies(self):
        """Extract name/value pairs from the Set-Cookie header(s)."""
        set_cookie = self.get_header('Set-Cookie')
        if not set_cookie:
            return
        cookie_strings = set_cookie if isinstance(set_cookie, list) else [set_cookie]
        for cookie_str in cookie_strings:
            # Simplified: only the leading name=value pair is read here.
            name_value = cookie_str.split(';')[0].strip()
            if '=' in name_value:
                key, value = name_value.split('=', 1)
                self.cookies[key] = value

    def json(self) -> Optional[Dict]:
        """Return the body parsed as JSON, or None when it is not valid JSON."""
        try:
            return json.loads(self.body.decode('utf-8'))
        except ValueError:  # covers UnicodeDecodeError and JSONDecodeError
            return None

    def text(self, encoding: str = 'utf-8') -> str:
        """Return the body decoded as text."""
        return self.body.decode(encoding, errors='ignore')

    def is_redirect(self) -> bool:
        """Return True for redirect status codes."""
        return self.status_code in [301, 302, 303, 307, 308]

    def get_redirect_location(self) -> Optional[str]:
        """Return the Location header, if any."""
        return self.get_header('Location')

    def __str__(self) -> str:
        return f"{self.status_code} {self.reason} ({len(self.body)} bytes)"


class HTTPConnection:
    """A single TCP (optionally TLS) connection to one host."""

    def __init__(self, host: str, port: int, ssl_context: bool = False):
        self.host = host
        self.port = port
        self.ssl_context = ssl_context  # True -> wrap the socket in TLS
        self.socket = None
        self.is_connected = False
        self.last_used = None

    def connect(self, timeout: float = 30.0) -> bool:
        """Open the connection; return False (after logging) on failure."""
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.settimeout(timeout)
            self.socket.connect((self.host, self.port))

            if self.ssl_context:
                context = ssl.create_default_context()
                self.socket = context.wrap_socket(
                    self.socket,
                    server_hostname=self.host
                )

            self.is_connected = True
            self.last_used = datetime.now()
            return True
        except Exception as e:
            print(f"连接失败 {self.host}:{self.port}: {e}")
            self.close()  # do not leak the half-open socket
            return False

    def send_request(self, request: bytes) -> bytes:
        """Send *request* and return the raw response bytes.

        Reading stops when the declared Content-Length has arrived, when a
        chunked body's terminator is seen, or when the peer closes, times out
        or errors. In the last three cases the connection is closed so that
        it is never reused in an unknown state.

        Raises:
            ConnectionError: If not connected or the request cannot be sent.
        """
        if not self.is_connected:
            raise ConnectionError("未连接")

        try:
            self.socket.sendall(request)

            buffer = bytearray()
            body_start = -1
            expected_length = None
            chunked = False

            while True:
                try:
                    chunk = self.socket.recv(4096)
                except socket.timeout:
                    self.close()
                    break
                except Exception as e:
                    print(f"接收响应失败: {e}")
                    self.close()
                    break

                if not chunk:
                    # Peer closed the connection: the response is complete.
                    self.close()
                    break
                buffer += chunk

                if body_start < 0:
                    header_end = buffer.find(b'\r\n\r\n')
                    if header_end < 0:
                        continue
                    # Headers are parsed once, as soon as they are complete.
                    body_start = header_end + 4
                    head = bytes(buffer[:header_end])
                    match = _CONTENT_LENGTH_RE.search(head)
                    if match:
                        expected_length = int(match.group(1))
                    chunked = bool(_CHUNKED_RE.search(head))

                if chunked:
                    # Simplified: assume the stream ends with the last-chunk marker.
                    if buffer.endswith(b'0\r\n\r\n'):
                        break
                elif expected_length is not None:
                    if len(buffer) - body_start >= expected_length:
                        break

            self.last_used = datetime.now()
            return bytes(buffer)

        except Exception as e:
            self.close()
            raise ConnectionError(f"发送/接收失败: {e}") from e

    def close(self):
        """Close the socket, ignoring errors from an already-broken connection."""
        if self.socket:
            try:
                self.socket.close()
            except OSError:
                pass
            self.socket = None
        self.is_connected = False


class HTTPClient:
    """Complete HTTP/1.1 client with cookies, redirects and connection reuse."""

    def __init__(self, config: HTTPClientConfig = None):
        self.config = config or HTTPClientConfig()
        self.connections = {}  # (host, port, ssl) -> HTTPConnection
        self.cookie_jar = {}   # domain -> {name: cookie_info}
        self.session_headers = {}

    def request(self, method: str, url: str,
                headers: Dict = None,
                data: bytes = None,
                params: Dict = None,
                json_data: Dict = None,
                allow_redirects: bool = True) -> HTTPResponse:
        """Send an HTTP request and return the (final) response.

        Raises:
            ValueError: For malformed URLs or when both data and json_data are given.
            ConnectionError: When the server cannot be reached.
            RuntimeError: When more than ``config.max_redirects`` redirects occur.
        """
        parsed = urllib.parse.urlparse(url)
        scheme = parsed.scheme.lower()
        hostname = parsed.hostname
        if scheme not in ('http', 'https') or not hostname:
            raise ValueError(f"无效的URL: {url}")
        port = parsed.port or (443 if scheme == 'https' else 80)
        path = parsed.path or '/'
        query = parsed.query

        # Merge extra query parameters into the URL's own query string.
        if params:
            query_params = urllib.parse.parse_qs(query, keep_blank_values=True)
            query_params.update(params)
            query = urllib.parse.urlencode(query_params, doseq=True)

        full_path = path
        if query:
            full_path += f'?{query}'

        # Work on a copy so the caller's dict is never mutated.
        headers = dict(headers) if headers else {}

        if isinstance(data, str):
            data = data.encode('utf-8')

        if json_data is not None:
            if data is not None:
                raise ValueError("不能同时提供data和json_data")
            data = json.dumps(json_data).encode('utf-8')
            headers['Content-Type'] = 'application/json'

        request_bytes = self._build_request(
            method, hostname, port, full_path, headers, data
        )

        return self._send_request_with_redirects(
            scheme, hostname, port, request_bytes, allow_redirects, 0,
            method=method, path=full_path, data=data, headers=headers
        )

    def _build_request(self, method: str, hostname: str, port: int,
                       path: str, headers: Dict, data: bytes) -> bytes:
        """Serialize the request line, headers and body.

        Raises:
            ValueError: If a header name or value contains CR/LF (header injection).
        """
        request_headers = self.config.default_headers.copy()
        request_headers.update(self.session_headers)
        if headers:
            request_headers.update(headers)

        # Mandatory headers
        request_headers['Host'] = hostname if port in (80, 443) else f'{hostname}:{port}'
        request_headers['User-Agent'] = self.config.user_agent

        if self.config.enable_cookies:
            cookie_header = self._get_cookie_header(hostname, path)
            if cookie_header:
                request_headers['Cookie'] = cookie_header

        if self.config.enable_compression:
            request_headers['Accept-Encoding'] = self.config.accept_encoding

        if data:
            request_headers['Content-Length'] = str(len(data))

        header_lines = []
        for key, value in request_headers.items():
            line = f"{key}: {value}"
            if '\r' in line or '\n' in line:
                raise ValueError(f"无效的请求头: {key}")
            header_lines.append(line + '\r\n')

        request_text = f"{method.upper()} {path} HTTP/1.1\r\n" + ''.join(header_lines) + '\r\n'
        request_bytes = request_text.encode('utf-8')
        if data:
            request_bytes += data
        return request_bytes

    def _get_cookie_header(self, domain: str, path: str) -> str:
        """Build the Cookie header value for *domain* and *path* ('' if none)."""
        cookies = []
        now = time.time()

        for cookie_domain, domain_cookies in self.cookie_jar.items():
            # Match on a label boundary so "evilexample.com" never receives
            # cookies that belong to "example.com".
            if not _domain_matches(domain, cookie_domain):
                continue
            for name, cookie_info in domain_cookies.items():
                cookie_path = cookie_info.get('path')
                if cookie_path and not path.startswith(cookie_path):
                    continue
                expires = cookie_info.get('expires')
                if isinstance(expires, (int, float)) and now > expires:
                    continue
                cookies.append(f"{name}={cookie_info['value']}")

        return '; '.join(cookies) if cookies else ''

    def _update_cookies(self, domain: str, response: HTTPResponse):
        """Store every cookie the response sets."""
        set_cookie = response.get_header('Set-Cookie')
        if not set_cookie:
            return
        cookie_strings = set_cookie if isinstance(set_cookie, list) else [set_cookie]
        for cookie_str in cookie_strings:
            self._parse_and_store_cookie(domain, cookie_str)

    def _parse_and_store_cookie(self, domain: str, cookie_str: str):
        """Parse one Set-Cookie value and store it in the jar.

        Cookies whose Domain attribute does not cover the responding host are
        ignored, so a server cannot plant cookies for an unrelated site.
        """
        parts = [part.strip() for part in cookie_str.split(';')]
        name_value = parts[0]
        if '=' not in name_value:
            return
        name, value = name_value.split('=', 1)

        cookie_info = {'value': value}
        max_age = None

        for part in parts[1:]:
            attr_name, separator, attr_value = part.partition('=')
            attr_name = attr_name.strip().lower()
            attr_value = attr_value.strip()

            if attr_name in ('secure', 'httponly'):
                # Flag attributes carry no value.
                cookie_info[attr_name] = True
            elif not separator:
                continue
            elif attr_name == 'expires':
                try:
                    expires_at = parsedate_to_datetime(attr_value)
                except (TypeError, ValueError):
                    continue
                if expires_at.tzinfo is None:
                    # Cookie dates are GMT; never interpret them as local time.
                    expires_at = expires_at.replace(tzinfo=timezone.utc)
                cookie_info['expires'] = expires_at.timestamp()
            elif attr_name == 'max-age':
                try:
                    max_age = int(attr_value)
                except ValueError:
                    continue
            elif attr_name in ('domain', 'path', 'samesite'):
                cookie_info[attr_name] = attr_value

        # Max-Age takes precedence over Expires regardless of attribute order.
        if max_age is not None:
            cookie_info['expires'] = time.time() + max_age

        cookie_domain = cookie_info.get('domain', domain).lstrip('.').lower()
        if not cookie_domain or not _domain_matches(domain, cookie_domain):
            return

        self.cookie_jar.setdefault(cookie_domain, {})[name] = cookie_info

    def _open_connection(self, hostname: str, port: int, use_ssl: bool) -> HTTPConnection:
        """Create and connect a new connection, or raise ConnectionError."""
        connection = HTTPConnection(hostname, port, use_ssl)
        if not connection.connect(self.config.timeout):
            raise ConnectionError(f"无法连接到 {hostname}:{port}")
        return connection

    def _exchange(self, scheme: str, hostname: str, port: int,
                  request_bytes: bytes) -> bytes:
        """Send one request and return the raw response, reconnecting once if needed.

        A cached connection may have been closed by the server in the
        meantime; that shows up as an error or an empty response and is
        retried on a fresh connection.
        """
        use_ssl = (scheme == 'https')
        connection_key = (hostname, port, use_ssl)

        connection = self.connections.pop(connection_key, None)
        reused = connection is not None and connection.is_connected
        if not reused:
            if connection is not None:
                connection.close()
            connection = self._open_connection(hostname, port, use_ssl)

        try:
            response_bytes = connection.send_request(request_bytes)
            needs_retry = reused and not response_bytes
        except ConnectionError:
            response_bytes = b''
            needs_retry = True

        if needs_retry:
            connection.close()
            connection = self._open_connection(hostname, port, use_ssl)
            response_bytes = connection.send_request(request_bytes)

        if not response_bytes:
            connection.close()
            raise ConnectionError(f"无法连接到 {hostname}:{port}")

        # Only keep connections that are still open for later reuse.
        if connection.is_connected:
            self.connections[connection_key] = connection
        return response_bytes

    def _drop_connection(self, scheme: str, hostname: str, port: int) -> None:
        """Close and forget the cached connection for this endpoint, if any."""
        connection = self.connections.pop((hostname, port, scheme == 'https'), None)
        if connection is not None:
            connection.close()

    @staticmethod
    def _compose_url(scheme: str, hostname: str, port: int, path: str) -> str:
        """Rebuild an absolute URL, omitting the port when it is the scheme default."""
        default_port = 443 if scheme == 'https' else 80
        netloc = hostname if port == default_port else f"{hostname}:{port}"
        return f"{scheme}://{netloc}{path}"

    def _send_request_with_redirects(self, scheme: str, hostname: str, port: int,
                                     request_bytes: bytes, allow_redirects: bool,
                                     redirect_count: int, *,
                                     method: str = 'GET', path: str = '/',
                                     data: bytes = None,
                                     headers: Dict = None) -> HTTPResponse:
        """Send the request and follow redirects up to ``config.max_redirects``.

        301/302/303 are followed with GET and no body; 307/308 repeat the
        original method and body.

        Raises:
            RuntimeError: When the redirect limit is exceeded.
        """
        while True:
            response_bytes = self._exchange(scheme, hostname, port, request_bytes)

            response_url = self._compose_url(scheme, hostname, port, path)
            response = HTTPResponse(response_bytes, response_url)

            # The server asked to close: never reuse this connection.
            if str(response.get_header('Connection', '')).lower() == 'close':
                self._drop_connection(scheme, hostname, port)

            if self.config.enable_cookies:
                self._update_cookies(hostname, response)

            if not (allow_redirects and response.is_redirect()):
                return response
            location = response.get_redirect_location()
            if not location:
                return response

            redirect_count += 1
            if redirect_count > self.config.max_redirects:
                raise RuntimeError(f"超过最大重定向次数: {self.config.max_redirects}")

            # urljoin handles both absolute and relative Location values.
            target = urllib.parse.urlparse(urllib.parse.urljoin(response_url, location))
            next_scheme = target.scheme.lower()
            if next_scheme not in ('http', 'https') or not target.hostname:
                return response

            if response.status_code in (301, 302, 303):
                method, data = 'GET', None

            next_headers = {'Referer': response_url}
            if data is not None and headers:
                # The repeated body still needs its content type.
                for key, value in headers.items():
                    if key.lower() == 'content-type':
                        next_headers[key] = value

            scheme = next_scheme
            hostname = target.hostname
            port = target.port or (443 if scheme == 'https' else 80)
            path = target.path or '/'
            if target.query:
                path += f'?{target.query}'

            request_bytes = self._build_request(
                method, hostname, port, path, next_headers, data
            )

    def get(self, url: str, **kwargs) -> HTTPResponse:
        """Send a GET request."""
        return self.request('GET', url, **kwargs)

    def post(self, url: str, data: bytes = None, **kwargs) -> HTTPResponse:
        """Send a POST request."""
        return self.request('POST', url, data=data, **kwargs)

    def put(self, url: str, data: bytes = None, **kwargs) -> HTTPResponse:
        """Send a PUT request."""
        return self.request('PUT', url, data=data, **kwargs)

    def delete(self, url: str, **kwargs) -> HTTPResponse:
        """Send a DELETE request."""
        return self.request('DELETE', url, **kwargs)

    def head(self, url: str, **kwargs) -> HTTPResponse:
        """Send a HEAD request."""
        return self.request('HEAD', url, **kwargs)

    def options(self, url: str, **kwargs) -> HTTPResponse:
        """Send an OPTIONS request."""
        return self.request('OPTIONS', url, **kwargs)

    def set_session_header(self, key: str, value: str):
        """Set a header that is sent with every request."""
        self.session_headers[key] = value

    def clear_session_headers(self):
        """Remove all session headers."""
        self.session_headers.clear()

    def clear_cookies(self):
        """Remove all stored cookies."""
        self.cookie_jar.clear()

    def close(self):
        """Close all cached connections."""
        for connection in self.connections.values():
            connection.close()
        self.connections.clear()


# Advanced feature: connection pool
class HTTPConnectionPool:
    """Pool of idle HTTP connections keyed by (host, port, ssl)."""

    def __init__(self, max_size: int = 10, idle_timeout: float = 30.0):
        self.max_size = max_size
        self.idle_timeout = idle_timeout
        self.pool = {}  # (host, port, ssl) -> [connections]
        self.lock = threading.RLock()

    def get_connection(self, host: str, port: int, ssl: bool) -> Optional[HTTPConnection]:
        """Return a live pooled connection, or None when the pool has none."""
        key = (host, port, ssl)

        with self.lock:
            if not self.pool.get(key):
                return None
            self._clean_idle_connections(key)
            # Skip (and close) dead connections instead of giving up on the first.
            while self.pool[key]:
                connection = self.pool[key].pop(0)
                if connection.is_connected:
                    return connection
                connection.close()
        return None

    def return_connection(self, connection: HTTPConnection):
        """Return a connection to the pool; it is closed if dead or the pool is full."""
        if not connection.is_connected:
            connection.close()
            return

        key = (connection.host, connection.port, connection.ssl_context)

        with self.lock:
            bucket = self.pool.setdefault(key, [])
            if len(bucket) < self.max_size:
                bucket.append(connection)
            else:
                connection.close()

    def _clean_idle_connections(self, key):
        """Close pooled connections that are dead or idle for too long."""
        if key not in self.pool:
            return
        current_time = datetime.now()
        valid_connections = []
        for connection in self.pool[key]:
            last_used = connection.last_used
            fresh = (
                last_used is not None
                and (current_time - last_used).total_seconds() < self.idle_timeout
            )
            if fresh and connection.is_connected:
                valid_connections.append(connection)
            else:
                connection.close()
        self.pool[key] = valid_connections

    def close_all(self):
        """Close every pooled connection."""
        with self.lock:
            for connections in self.pool.values():
                for connection in connections:
                    connection.close()
            self.pool.clear()


# Usage example
def test_http_client():
    """Exercise the HTTP client against httpbin.org (requires network access)."""
    config = HTTPClientConfig(
        timeout=10.0,
        max_redirects=3,
        user_agent="MyHTTPClient/1.0",
        enable_cookies=True,
        enable_compression=True
    )
    client = HTTPClient(config)

    try:
        print("=== 测试GET请求 ===")
        response = client.get('https://httpbin.org/get')
        print(f"状态码: {response.status_code}")
        print(f"响应大小: {len(response.body)} 字节")
        print(f"头部: {list(response.headers.keys())[:5]}...")

        print("\n=== 测试POST请求 ===")
        post_data = {'key1': 'value1', 'key2': 'value2'}
        response = client.post('https://httpbin.org/post', json_data=post_data)
        if response.status_code == 200:
            data = response.json() or {}
            print(f"POST成功,返回数据键: {list(data.keys())}")

        print("\n=== 测试带参数的GET请求 ===")
        params = {'param1': 'value1', 'param2': 'value2'}
        response = client.get('https://httpbin.org/get', params=params)

        print("\n=== 测试cookie ===")
        response = client.get('https://httpbin.org/cookies/set/test/123')
        print(f"设置cookie响应: {response.status_code}")

        print("\n=== 测试重定向 ===")
        response = client.get('https://httpbin.org/redirect/2', allow_redirects=True)
        print(f"重定向最终状态码: {response.status_code}")

        print("\n=== 测试会话头部 ===")
        client.set_session_header('X-Custom-Header', 'my-value')
        response = client.get('https://httpbin.org/headers')
        if response.status_code == 200:
            data = response.json() or {}
            print(f"自定义头部已发送: {data.get('headers', {}).get('X-Custom-Header')}")

        print("\n=== 性能测试:连续请求 ===")
        start_time = time.time()
        for i in range(5):
            response = client.get(f'https://httpbin.org/delay/{i % 3}')
            print(f"请求 {i+1}: {response.status_code} ({len(response.body)} bytes)")
        elapsed = time.time() - start_time
        print(f"总时间: {elapsed:.2f}秒")

    except Exception as e:
        print(f"测试失败: {e}")
    finally:
        client.close()

    return client
4. HTTP/2实验性支持
"""Experimental HTTP/2 support."""
import ssl
import socket
import struct
from typing import Optional, Dict, Tuple

# Frame types and flags used below.
_FRAME_DATA = 0x0
_FRAME_HEADERS = 0x1
_FRAME_RST_STREAM = 0x3
_FRAME_SETTINGS = 0x4
_FRAME_GOAWAY = 0x7
_FLAG_END_STREAM = 0x1  # also the ACK flag on SETTINGS frames
_FLAG_END_HEADERS = 0x4
_SETTINGS_MAX_FRAME_SIZE = 0x5
_DEFAULT_MAX_FRAME_SIZE = 16384


class HTTP2Connection:
    """HTTP/2 connection (experimental).

    Framing follows the HTTP/2 wire format, but header blocks are sent and
    parsed as plain text instead of HPACK, so this is a teaching aid and is
    not interoperable with real HTTP/2 servers.
    """

    # HTTP/2 connection preface
    CONNECTION_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

    # Frame types
    FRAME_TYPES = {
        0x0: "DATA",
        0x1: "HEADERS",
        0x2: "PRIORITY",
        0x3: "RST_STREAM",
        0x4: "SETTINGS",
        0x5: "PUSH_PROMISE",
        0x6: "PING",
        0x7: "GOAWAY",
        0x8: "WINDOW_UPDATE",
        0x9: "CONTINUATION"
    }

    def __init__(self, host: str, port: int = 443, timeout: float = 30.0):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.socket = None
        self.settings = {}  # settings announced by the peer
        self.next_stream_id = 1
        self.is_connected = False

    def connect(self) -> bool:
        """Open a TLS connection, negotiate h2 via ALPN and exchange SETTINGS."""
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            sock.connect((self.host, self.port))

            # TLS context with ALPN enabled
            context = ssl.create_default_context()
            context.set_alpn_protocols(['h2', 'http/1.1'])

            ssl_sock = context.wrap_socket(sock, server_hostname=self.host)
            sock = ssl_sock

            selected_protocol = ssl_sock.selected_alpn_protocol()
            if selected_protocol != 'h2':
                print(f"服务器不支持HTTP/2,使用: {selected_protocol}")
                ssl_sock.close()
                return False

            self.socket = ssl_sock
            self.is_connected = True

            self.socket.sendall(self.CONNECTION_PREFACE)
            self.send_settings()
            self.receive_settings()

            return True

        except Exception as e:
            print(f"HTTP/2连接失败: {e}")
            # Do not leak the socket when the handshake fails half-way.
            if sock is not None:
                try:
                    sock.close()
                except OSError:
                    pass
            self.socket = None
            self.is_connected = False
            return False

    def send_settings(self, settings: Dict = None):
        """Send a SETTINGS frame (defaults are used when *settings* is None)."""
        if settings is None:
            settings = {
                0x1: 4096,   # SETTINGS_HEADER_TABLE_SIZE
                0x2: 1,      # SETTINGS_ENABLE_PUSH
                0x3: 100,    # SETTINGS_MAX_CONCURRENT_STREAMS
                0x4: 65535,  # SETTINGS_INITIAL_WINDOW_SIZE
                0x5: 16384,  # SETTINGS_MAX_FRAME_SIZE
                0x6: 256,    # SETTINGS_MAX_HEADER_LIST_SIZE
            }

        frame_data = b''.join(
            struct.pack('!HI', identifier, value)
            for identifier, value in settings.items()
        )
        self.socket.sendall(self.build_frame(_FRAME_SETTINGS, 0x0, 0, frame_data))

    def _recv_exact(self, count: int) -> bytes:
        """Read exactly *count* bytes; returns fewer only if the peer closed."""
        pieces = []
        remaining = count
        while remaining > 0:
            piece = self.socket.recv(remaining)
            if not piece:
                break
            pieces.append(piece)
            remaining -= len(piece)
        return b''.join(pieces)

    def _read_frame(self) -> Optional[Tuple[int, int, int, bytes]]:
        """Read one frame; return (type, flags, stream_id, payload) or None on EOF."""
        header = self._recv_exact(9)
        if len(header) < 9:
            return None
        length = int.from_bytes(header[:3], 'big')
        frame_type = header[3]
        flags = header[4]
        # The top bit of the stream-id field is reserved and must be ignored.
        stream_id = int.from_bytes(header[5:9], 'big') & 0x7FFFFFFF
        payload = self._recv_exact(length)
        if len(payload) < length:
            return None
        return frame_type, flags, stream_id, payload

    def _handle_settings(self, flags: int, payload: bytes) -> None:
        """Apply a peer SETTINGS frame and acknowledge it (ACK frames need no reply)."""
        if flags & _FLAG_END_STREAM:  # ACK flag on SETTINGS
            return
        self.parse_settings(payload)
        self.socket.sendall(self.build_frame(_FRAME_SETTINGS, 0x1, 0, b''))

    def receive_settings(self):
        """Read one frame and, if it is SETTINGS, apply and acknowledge it."""
        try:
            frame = self._read_frame()
            if frame is None:
                return
            frame_type, flags, _stream_id, payload = frame
            if frame_type == _FRAME_SETTINGS:
                self._handle_settings(flags, payload)
        except Exception as e:
            print(f"接收SETTINGS失败: {e}")

    def parse_settings(self, data: bytes):
        """Parse SETTINGS payload (6-byte identifier/value pairs) into self.settings."""
        for offset in range(0, len(data) - 5, 6):
            identifier, value = struct.unpack_from('!HI', data, offset)
            self.settings[identifier] = value

    def build_frame(self, frame_type: int, flags: int, stream_id: int, data: bytes) -> bytes:
        """Build an HTTP/2 frame: 9-byte header followed by the payload.

        Header layout: 24-bit length, 8-bit type, 8-bit flags, 1 reserved bit
        plus a 31-bit stream identifier.

        Raises:
            ValueError: If the payload does not fit the 24-bit length field.
        """
        length = len(data)
        if length >= 1 << 24:
            raise ValueError("frame payload too large")
        frame_header = struct.pack('!I', length)[1:]  # 3-byte length
        frame_header += bytes([frame_type, flags])
        frame_header += struct.pack('!I', stream_id & 0x7FFFFFFF)  # 4 bytes, reserved bit clear
        return frame_header + data

    def send_request(self, method: str, path: str, headers: Dict = None,
                     body: bytes = None) -> int:
        """Send a request and return the stream id it was sent on.

        Raises:
            ConnectionError: If the connection has not been established.
        """
        if not self.is_connected:
            raise ConnectionError("未连接")

        stream_id = self.next_stream_id
        self.next_stream_id += 2  # clients use odd stream ids

        header_data = self.build_headers_frame_data(method, path, headers)
        # END_STREAM belongs on the last frame of the request: on HEADERS only
        # when no DATA frame follows.
        header_flags = _FLAG_END_HEADERS if body else (_FLAG_END_HEADERS | _FLAG_END_STREAM)
        self.socket.sendall(
            self.build_frame(_FRAME_HEADERS, header_flags, stream_id, header_data)
        )

        if body:
            # Respect the peer's maximum frame size by splitting the body.
            max_size = self.settings.get(_SETTINGS_MAX_FRAME_SIZE, _DEFAULT_MAX_FRAME_SIZE)
            for start in range(0, len(body), max_size):
                piece = body[start:start + max_size]
                is_last = start + max_size >= len(body)
                flags = _FLAG_END_STREAM if is_last else 0x0
                self.socket.sendall(self.build_frame(_FRAME_DATA, flags, stream_id, piece))

        return stream_id

    def build_headers_frame_data(self, method: str, path: str, headers: Dict) -> bytes:
        """Build the HEADERS payload (simplified: plain text, NOT HPACK)."""
        header_lines = [
            (':method', method),
            (':path', path),
            (':scheme', 'https'),
            (':authority', self.host),
        ]

        if headers:
            for key, value in headers.items():
                # Pseudo-header fields (leading ':') are controlled above.
                if not key.startswith(':'):
                    header_lines.append((key.lower(), value))

        return ''.join(f"{key}: {value}\n" for key, value in header_lines).encode('utf-8')

    def receive_response(self, stream_id: int) -> Optional[Dict]:
        """Collect the response for *stream_id* (simplified).

        Returns a dict with ``headers``, ``body`` and ``status``, or None when
        reading fails. Frames for other streams are skipped, except that peer
        SETTINGS are applied and acknowledged and GOAWAY ends the wait.
        """
        try:
            response = {
                'headers': {},
                'body': b'',
                'status': 0
            }

            while True:
                frame = self._read_frame()
                if frame is None:
                    break
                frame_type, flags, recv_stream_id, payload = frame

                if recv_stream_id != stream_id:
                    if recv_stream_id == 0 and frame_type == _FRAME_SETTINGS:
                        self._handle_settings(flags, payload)
                    elif recv_stream_id == 0 and frame_type == _FRAME_GOAWAY:
                        break  # the peer is shutting the connection down
                    continue

                if frame_type == _FRAME_HEADERS:
                    self.parse_response_headers(payload, response)
                elif frame_type == _FRAME_DATA:
                    response['body'] += payload
                elif frame_type == _FRAME_RST_STREAM:
                    break  # the stream was aborted

                if flags & _FLAG_END_STREAM and frame_type in (_FRAME_HEADERS, _FRAME_DATA):
                    break

            return response

        except Exception as e:
            print(f"接收响应失败: {e}")
            return None

    def parse_response_headers(self, data: bytes, response: Dict):
        """Parse response headers (simplified: plain text, NOT HPACK)."""
        headers_text = data.decode('utf-8', errors='ignore')
        for line in headers_text.split('\n'):
            if ': ' in line:
                key, value = line.split(': ', 1)
                if key == ':status':
                    response['status'] = int(value)
                else:
                    response['headers'][key] = value

    def close(self):
        """Send GOAWAY (best effort) and close the socket."""
        if self.socket:
            try:
                goaway_frame = self.build_frame(_FRAME_GOAWAY, 0x0, 0, struct.pack('!II', 0, 0))
                self.socket.sendall(goaway_frame)
            except (OSError, ValueError):
                pass  # the peer may already be gone
            try:
                self.socket.close()
            except OSError:
                pass
            self.socket = None
        self.is_connected = False


# HTTP/2 client wrapper
class HTTP2Client:
    """HTTP/2 client (experimental)."""

    def __init__(self):
        self.connections = {}

    def request(self, method: str, url: str, **kwargs) -> Optional[Dict]:
        """Send an HTTP/2 request; returns the response dict or None on failure.

        Keyword Args:
            headers: Extra request headers.
            body: Request body (str is encoded as UTF-8).
        """
        from urllib.parse import urlparse

        parsed = urlparse(url)
        if parsed.scheme != 'https':
            print("HTTP/2只支持HTTPS")
            return None

        host = parsed.hostname
        port = parsed.port or 443
        path = parsed.path or '/'
        if parsed.query:
            path += f'?{parsed.query}'  # :path must carry the query string

        key = (host, port)
        if key not in self.connections:
            connection = HTTP2Connection(host, port)
            if not connection.connect():
                return None
            self.connections[key] = connection
        connection = self.connections[key]

        headers = kwargs.get('headers', {})
        body = kwargs.get('body')
        if body and isinstance(body, str):
            body = body.encode('utf-8')

        stream_id = connection.send_request(method, path, headers, body)
        return connection.receive_response(stream_id)

    def close(self):
        """Close all connections."""
        for connection in self.connections.values():
            connection.close()
        self.connections.clear()


def test_http2():
    """Try an HTTP/2 request (needs a server that really supports HTTP/2)."""
    client = HTTP2Client()

    try:
        print("测试HTTP/2连接(实验性)")

        # This URL may need to be replaced with a server that supports HTTP/2.
        test_url = "https://httpbin.org/"

        response = client.request('GET', test_url)

        if response:
            print(f"状态码: {response.get('status')}")
            print(f"头部: {list(response.get('headers', {}).keys())[:5]}")
            print(f"响应体大小: {len(response.get('body', b''))} 字节")
        else:
            print("HTTP/2请求失败,可能服务器不支持")

    except Exception as e:
        print(f"HTTP/2测试失败: {e}")
    finally:
        client.close()
5. HTTP协议测试工具
"""HTTP协议测试工具"""import socketimport sslimport jsonfrom typing import Dict, List, Optionalclass HTTPTester: """HTTP协议测试工具""" def __init__(self): self.results = [] def test_http_compliance(self, url: str) -> Dict: """测试HTTP协议合规性""" from urllib.parse import urlparse parsed = urlparse(url) hostname = parsed.hostname port = parsed.port or (443 if parsed.scheme == 'https' else 80) tests = [] # 测试1: 基本连接 tests.append(self._test_connection(hostname, port, parsed.scheme == 'https')) # 测试2: HTTP方法支持 tests.append(self._test_methods(hostname, port, parsed.scheme == 'https', parsed.path)) # 测试3: 头部支持 tests.append(self._test_headers(hostname, port, parsed.scheme == 'https', parsed.path)) # 测试4: 状态码处理 tests.append(self._test_status_codes(hostname, port, parsed.scheme == 'https', parsed.path)) # 测试5: 重定向处理 tests.append(self._test_redirects(hostname, port, parsed.scheme == 'https')) # 汇总结果 total_tests = sum(len(t['tests']) for t in tests) passed_tests = sum(sum(1 for st in t['tests'] if st['passed']) for t in tests) result = { 'url': url, 'hostname': hostname, 'port': port, 'scheme': parsed.scheme, 'total_tests': total_tests, 'passed_tests': passed_tests, 'pass_rate': (passed_tests / total_tests * 100) if total_tests > 0 else 0, 'test_categories': tests, 'recommendations': self._generate_recommendations(tests) } self.results.append(result) return result def _test_connection(self, hostname: str, port: int, use_ssl: bool) -> Dict: """测试连接""" tests = [] # 测试TCP连接 try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) sock.connect((hostname, port)) sock.close() tests.append({'name': 'TCP连接', 'passed': True, 'details': 'TCP连接成功'}) except Exception as e: tests.append({'name': 'TCP连接', 'passed': False, 'details': f'TCP连接失败: {e}'}) # 测试SSL/TLS(如果使用HTTPS) if use_ssl: try: context = ssl.create_default_context() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) ssl_sock = context.wrap_socket(sock, server_hostname=hostname) 
ssl_sock.connect((hostname, port)) # 检查证书 cert = ssl_sock.getpeercert() if cert: tests.append({'name': 'SSL证书', 'passed': True, 'details': 'SSL证书有效'}) else: tests.append({'name': 'SSL证书', 'passed': False, 'details': '无SSL证书'}) # 检查TLS版本 version = ssl_sock.version() if version in ['TLSv1.2', 'TLSv1.3']: tests.append({'name': 'TLS版本', 'passed': True, 'details': f'TLS版本: {version}'}) else: tests.append({'name': 'TLS版本', 'passed': False, 'details': f'不安全的TLS版本: {version}'}) ssl_sock.close() except Exception as e: tests.append({'name': 'SSL/TLS连接', 'passed': False, 'details': f'SSL/TLS连接失败: {e}'}) return {'category': '连接测试', 'tests': tests} def _test_methods(self, hostname: str, port: int, use_ssl: bool, path: str) -> Dict: """测试HTTP方法支持""" methods = ['GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS'] tests = [] for method in methods: try: response = self._send_raw_request(hostname, port, use_ssl, method, path) if response: status_line = response.split('\r\n')[0] status_code = int(status_line.split(' ')[1]) if status_code != 405: # 405 Method Not Allowed tests.append({ 'name': f'{method}方法', 'passed': True, 'details': f'状态码: {status_code}' }) else: tests.append({ 'name': f'{method}方法', 'passed': False, 'details': '方法不允许 (405)' }) else: tests.append({ 'name': f'{method}方法', 'passed': False, 'details': '无响应' }) except Exception as e: tests.append({ 'name': f'{method}方法', 'passed': False, 'details': f'测试失败: {e}' }) return {'category': 'HTTP方法测试', 'tests': tests} def _test_headers(self, hostname: str, port: int, use_ssl: bool, path: str) -> Dict: """测试HTTP头部支持""" tests = [] # 测试Host头部(HTTP/1.1必需) try: response = self._send_raw_request( hostname, port, use_ssl, 'GET', path, headers={'Host': f'{hostname}:{port}'} ) if response: tests.append({'name': 'Host头部', 'passed': True, 'details': '支持Host头部'}) else: tests.append({'name': 'Host头部', 'passed': False, 'details': '不支持Host头部'}) except Exception as e: tests.append({'name': 'Host头部', 'passed': False, 'details': f'测试失败: {e}'}) # 
测试User-Agent头部 try: custom_ua = 'HTTPTester/1.0' response = self._send_raw_request( hostname, port, use_ssl, 'GET', path, headers={'User-Agent': custom_ua} ) if response: tests.append({'name': 'User-Agent头部', 'passed': True, 'details': '支持自定义User-Agent'}) else: tests.append({'name': 'User-Agent头部', 'passed': False, 'details': '不支持自定义User-Agent'}) except Exception as e: tests.append({'name': 'User-Agent头部', 'passed': False, 'details': f'测试失败: {e}'}) # 测试Accept-Encoding try: response = self._send_raw_request( hostname, port, use_ssl, 'GET', path, headers={'Accept-Encoding': 'gzip, deflate'} ) if response: # 检查响应头中是否有Content-Encoding headers = self._parse_response_headers(response) if 'Content-Encoding' in headers: encoding = headers['Content-Encoding'] tests.append({'name': '内容编码', 'passed': True, 'details': f'支持压缩: {encoding}'}) else: tests.append({'name': '内容编码', 'passed': False, 'details': '不支持压缩'}) else: tests.append({'name': '内容编码', 'passed': False, 'details': '无响应'}) except Exception as e: tests.append({'name': '内容编码', 'passed': False, 'details': f'测试失败: {e}'}) return {'category': 'HTTP头部测试', 'tests': tests} def _test_status_codes(self, hostname: str, port: int, use_ssl: bool, path: str) -> Dict: """测试状态码处理""" tests = [] # 测试正常请求 try: response = self._send_raw_request(hostname, port, use_ssl, 'GET', path) if response: status_line = response.split('\r\n')[0] status_code = int(status_line.split(' ')[1]) if 200 <= status_code < 300: tests.append({'name': '2xx状态码', 'passed': True, 'details': f'成功: {status_code}'}) else: tests.append({'name': '2xx状态码', 'passed': False, 'details': f'非成功状态码: {status_code}'}) else: tests.append({'name': '2xx状态码', 'passed': False, 'details': '无响应'}) except Exception as e: tests.append({'name': '2xx状态码', 'passed': False, 'details': f'测试失败: {e}'}) # 测试404处理 try: nonexistent_path = f'{path}/nonexistent-{int(time.time())}' response = self._send_raw_request(hostname, port, use_ssl, 'GET', nonexistent_path) if response: status_line = 
response.split('\r\n')[0] status_code = int(status_line.split(' ')[1]) if status_code == 404: tests.append({'name': '404处理', 'passed': True, 'details': '正确处理404'}) else: tests.append({'name': '404处理', 'passed': False, 'details': f'错误状态码: {status_code}'}) else: tests.append({'name': '404处理', 'passed': False, 'details': '无响应'}) except Exception as e: tests.append({'name': '404处理', 'passed': False, 'details': f'测试失败: {e}'}) return {'category': '状态码测试', 'tests': tests} def _test_redirects(self, hostname: str, port: int, use_ssl: bool) -> Dict: """测试重定向处理""" tests = [] # 注意:这个测试需要知道服务器上的重定向端点 # 这里只是一个示例框架 return {'category': '重定向测试', 'tests': tests} def _send_raw_request(self, hostname: str, port: int, use_ssl: bool, method: str, path: str, headers: Dict = None) -> Optional[str]: """发送原始HTTP请求""" try: # 创建socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) if use_ssl: context = ssl.create_default_context() sock = context.wrap_socket(sock, server_hostname=hostname) sock.connect((hostname, port)) # 构建请求 request_lines = [f'{method}{path} HTTP/1.1'] # 默认头部 default_headers = { 'Host': f'{hostname}:{port}', 'User-Agent': 'HTTPTester/1.0', 'Connection': 'close' } if headers: default_headers.update(headers) for key, value in default_headers.items(): request_lines.append(f'{key}: {value}') request_text = '\r\n'.join(request_lines) + '\r\n\r\n' # 发送请求 sock.sendall(request_text.encode('utf-8')) # 接收响应 response = b'' while True: try: chunk = sock.recv(4096) if not chunk: break response += chunk except socket.timeout: break sock.close() return response.decode('utf-8', errors='ignore') except Exception: return None def _parse_response_headers(self, response_text: str) -> Dict: """解析响应头部""" headers = {} lines = response_text.split('\r\n') if not lines: return headers # 跳过状态行 for line in lines[1:]: if not line.strip(): break if ': ' in line: key, value = line.split(': ', 1) headers[key] = value return headers def _generate_recommendations(self, 
test_categories: List[Dict]) -> List[str]: """生成改进建议""" recommendations = [] for category in test_categories: failed_tests = [t for t in category['tests'] if not t['passed']] if failed_tests: category_name = category['category'] recommendations.append(f"{category_name}: 有{len(failed_tests)}个测试失败") # 通用建议 recommendations.extend([ "确保支持HTTP/1.1必需的Host头部", "实现正确的状态码处理", "支持常见的HTTP方法(GET、POST等)", "启用内容压缩(gzip/deflate)", "使用安全的TLS版本(TLSv1.2或更高)", "提供有意义的错误页面", "实现适当的缓存控制" ]) return recommendations def generate_report(self, output_file: str = None): """生成测试报告""" report = { 'timestamp': time.time(), 'tests_run': len(self.results), 'results': self.results, 'summary': self._generate_summary() } report_json = json.dumps(report, indent=2) if output_file: with open(output_file, 'w') as f: f.write(report_json) return report_json def _generate_summary(self) -> Dict: """生成测试摘要""" if not self.results: return {} total_tests = sum(r['total_tests'] for r in self.results) passed_tests = sum(r['passed_tests'] for r in self.results) return { 'total_tests': total_tests, 'passed_tests': passed_tests, 'overall_pass_rate': (passed_tests / total_tests * 100) if total_tests > 0 else 0, 'tested_urls': [r['url'] for r in self.results] }
总结
通过今天的学习,我们深入掌握了:
HTTP协议核心:理解了HTTP/1.1协议规范、方法、状态码和头部
完整服务器实现:构建了支持HTTP/1.1、持久连接、路由、中间件的完整服务器
高级客户端:实现了支持连接池、cookie管理、重定向的HTTP客户端
协议扩展:探索了HTTP/2协议的基本原理和实现
性能优化:学习了服务器性能调优和基准测试技术
关键收获:HTTP是基于文本的请求-响应协议,掌握请求行、头部和状态码的格式,是实现服务器、客户端和协议测试工具的共同基础。
实践建议:
使用今天的代码作为基础,构建自己的Web框架
尝试实现更多HTTP功能,如WebSocket、Server-Sent Events
学习现有框架(如Flask、Django)的HTTP处理实现
深入研究HTTP/2和HTTP/3协议,实现完整支持
明天我们将学习WebSocket基础,这是实现实时双向通信的关键技术。