1. WSGI简介
什么是WSGI?
WSGI(Web Server Gateway Interface,Web服务器网关接口)是Python Web应用程序和Web服务器之间的标准接口。它在2003年由PEP 333提出,2010年更新为PEP 3333(添加了Python 3支持)。
WSGI的核心目标
为什么需要WSGI?
在WSGI之前,每个Python Web框架(如Zope、Django)都需要特定的Web服务器或适配器,造成了生态系统碎片化。
2. WSGI规范详解
WSGI接口定义
WSGI规范定义了三个核心组件:
应用程序(Application):可调用对象(函数、类等)
服务器/网关(Server/Gateway):接收HTTP请求,调用应用程序
中间件(Middleware):同时扮演服务器和应用程序的角色
应用程序接口
def simple_app(environ, start_response): """ 最简单的WSGI应用程序 environ: 包含请求信息的字典 start_response: 用于开始HTTP响应的回调函数 返回值: 响应体的可迭代对象(通常是字符串列表) """ status = '200 OK' headers = [('Content-Type', 'text/plain; charset=utf-8')] start_response(status, headers) return [b'Hello, WSGI World!\n']
environ字典详解
# environ字典包含的重要键:environ = { # 必需的环境变量 'REQUEST_METHOD': 'GET', # HTTP方法 'SCRIPT_NAME': '', # 应用根路径 'PATH_INFO': '/path/to/resource', # 请求路径 'QUERY_STRING': 'param=value', # 查询字符串 'SERVER_NAME': 'localhost', # 服务器名 'SERVER_PORT': '8080', # 服务器端口 # WSGI必需的变量 'wsgi.version': (1, 0), # WSGI版本 'wsgi.url_scheme': 'http', # URL方案 'wsgi.input': input_stream, # 输入流 'wsgi.errors': error_stream, # 错误流 'wsgi.multithread': True/False, # 是否多线程 'wsgi.multiprocess': True/False, # 是否多进程 'wsgi.run_once': True/False, # 是否单次运行 # HTTP头部(添加HTTP_前缀) 'HTTP_HOST': 'localhost:8080', 'HTTP_USER_AGENT': 'Mozilla/5.0', 'HTTP_ACCEPT': 'text/html', # 其他CGI标准变量 'SERVER_PROTOCOL': 'HTTP/1.1', 'CONTENT_TYPE': 'application/x-www-form-urlencoded', 'CONTENT_LENGTH': '123',}
start_response函数
def start_response(status, response_headers, exc_info=None): """ 开始HTTP响应的回调函数 status: 状态字符串,如 '200 OK' response_headers: 头部元组列表,如 [('Content-Type', 'text/html')] exc_info: 异常信息元组(可选) 返回值: write()函数(可选) """ # 服务器必须设置状态和头部 # 可能返回一个write()函数用于写入响应体 pass
3. 实现WSGI服务器
基本WSGI服务器实现
# simple_wsgi_server.pyimport socketimport threadingimport ioimport sysfrom urllib.parse import parse_qs, urlparsefrom datetime import datetimeclass SimpleWSGIServer: """简单的WSGI服务器实现""" def __init__(self, host='localhost', port=8080): self.host = host self.port = port self.app = None self.server_socket = None self.running = False def set_app(self, app): """设置WSGI应用程序""" self.app = app def serve_forever(self): """启动服务器""" self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) self.running = True print(f"WSGI服务器启动在 http://{self.host}:{self.port}") try: while self.running: client_socket, address = self.server_socket.accept() print(f"[{datetime.now()}] 客户端连接: {address}") thread = threading.Thread( target=self.handle_request, args=(client_socket,) ) thread.daemon = True thread.start() except KeyboardInterrupt: print("\n服务器关闭中...") finally: self.server_socket.close() def handle_request(self, client_socket): """处理单个HTTP请求""" try: # 接收请求数据 request_data = client_socket.recv(1024) if not request_data: return # 解析请求 request_lines = request_data.decode('utf-8').split('\r\n') request_line = request_lines[0] method, path, version = request_line.split(' ') # 解析查询参数 parsed_path = urlparse(path) path_info = parsed_path.path query_string = parsed_path.query # 解析请求头 headers = {} content_length = 0 for line in request_lines[1:]: if not line: break if ': ' in line: key, value = line.split(': ', 1) headers[key.lower()] = value if key.lower() == 'content-length': content_length = int(value) # 读取请求体(如果有) body = b'' if content_length > 0: body = client_socket.recv(content_length) # 构建environ字典 environ = self.make_environ( method, path_info, query_string, headers, body, client_socket ) # 准备start_response参数 response_status = [None] response_headers = [] def start_response(status, headers, exc_info=None): """WSGI start_response回调""" if exc_info: # 处理异常 raise exc_info[0].with_traceback(exc_info[1], exc_info[2]) response_status[0] = status response_headers.extend(headers) return lambda data: None # write函数(这里不需要) # 调用WSGI应用程序 if self.app: app_iter = self.app(environ, start_response) # 构建响应 response = self.build_response( response_status[0], response_headers, app_iter ) # 发送响应 client_socket.sendall(response) else: # 如果没有应用,返回404 response = b'HTTP/1.1 404 Not Found\r\n\r\nNo WSGI app configured' client_socket.sendall(response) except Exception as e: print(f"处理请求错误: {e}") error_response = ( b'HTTP/1.1 500 Internal Server Error\r\n' b'Content-Type: text/plain\r\n' b'\r\n' f'Server Error: {e}'.encode() ) client_socket.sendall(error_response) finally: client_socket.close() def make_environ(self, method, path_info, query_string, headers, body, client_socket): """构建environ字典""" # 解析客户端地址 client_address = client_socket.getpeername() # 构建environ environ = { # CGI标准变量 'REQUEST_METHOD': method, 'SCRIPT_NAME': '', # 应用挂载点 'PATH_INFO': path_info, 'QUERY_STRING': query_string, 'CONTENT_TYPE': headers.get('content-type', ''), 'CONTENT_LENGTH': str(len(body)), 'SERVER_NAME': self.host, 'SERVER_PORT': str(self.port), 'SERVER_PROTOCOL': 'HTTP/1.1', # WSGI必需变量 'wsgi.version': (1, 0), 'wsgi.url_scheme': 'http', 'wsgi.input': io.BytesIO(body), 'wsgi.errors': sys.stderr, 'wsgi.multithread': True, 'wsgi.multiprocess': False, 'wsgi.run_once': False, # 客户端信息 'REMOTE_ADDR': client_address[0], 'REMOTE_PORT': str(client_address[1]), } # 添加HTTP头部 for key, value in headers.items(): if key.startswith('content-'): continue # 已经处理过了 wsgi_key = 'HTTP_' + key.upper().replace('-', '_') environ[wsgi_key] = value return environ def build_response(self, status, headers, app_iter): """构建HTTP响应""" # 响应状态行 response = [f'HTTP/1.1 {status}\r\n'.encode()] # 响应头部 for key, value in headers: response.append(f'{key}: {value}\r\n'.encode()) response.append(b'\r\n') # 响应体 for chunk in app_iter: if isinstance(chunk, str): chunk = chunk.encode('utf-8') response.append(chunk) # 关闭可迭代对象(如果支持) if hasattr(app_iter, 'close'): app_iter.close() return b''.join(response)# 测试应用程序def test_app(environ, start_response): """测试WSGI应用""" # 解析查询参数 query_string = environ.get('QUERY_STRING', '') params = parse_qs(query_string) # 获取路径 path = environ.get('PATH_INFO', '/') # 构建响应 status = '200 OK' headers = [ ('Content-Type', 'text/html; charset=utf-8'), ('Server', 'SimpleWSGIServer/1.0') ] start_response(status, headers) # 响应体 body = f""" <!DOCTYPE html> <html> <head><title>WSGI Test</title></head> <body> <h1>WSGI Test Application</h1> <p>Path: {path}</p> <p>Method: {environ['REQUEST_METHOD']}</p> <p>Query Parameters: {params}</p> <p>Time: {datetime.now()}</p> <p>WSGI Version: {environ['wsgi.version']}</p> </body> </html> """ return [body.encode('utf-8')]def run_simple_server(): """运行简单WSGI服务器""" server = SimpleWSGIServer(port=8080) server.set_app(test_app) server.serve_forever()if __name__ == '__main__': run_simple_server()
4. 实现WSGI应用程序
类实现的WSGI应用
# wsgi_applications.pyclass WSGIApplication: """基于类的WSGI应用""" def __init__(self): self.routes = {} self.middleware = [] def route(self, path, methods=None): """路由装饰器""" if methods is None: methods = ['GET'] def decorator(handler): self.routes[(path, tuple(methods))] = handler return handler return decorator def __call__(self, environ, start_response): """WSGI应用接口""" # 解析请求 method = environ['REQUEST_METHOD'] path = environ['PATH_INFO'] # 查找路由 handler = None for (route_path, route_methods), route_handler in self.routes.items(): if path == route_path and method in route_methods: handler = route_handler break if handler: try: # 调用处理函数 response = handler(environ) status = response.get('status', '200 OK') headers = response.get('headers', []) body = response.get('body', '') start_response(status, headers) return [body.encode('utf-8')] except Exception as e: status = '500 Internal Server Error' headers = [('Content-Type', 'text/plain')] body = f'Server Error: {e}' start_response(status, headers) return [body.encode('utf-8')] else: # 404 Not Found status = '404 Not Found' headers = [('Content-Type', 'text/html')] body = f'<h1>404 Not Found</h1><p>Path: {path}</p>' start_response(status, headers) return [body.encode('utf-8')]# 使用示例def create_sample_app(): """创建示例应用""" app = WSGIApplication() @app.route('/', methods=['GET']) def home(environ): return { 'status': '200 OK', 'headers': [('Content-Type', 'text/html')], 'body': ''' <html> <head><title>Home</title></head> <body> <h1>Welcome!</h1> <p><a href="/about">About</a></p> <p><a href="/api/hello">API</a></p> </body> </html> ''' } @app.route('/about', methods=['GET']) def about(environ): return { 'status': '200 OK', 'headers': [('Content-Type', 'text/html')], 'body': ''' <html> <head><title>About</title></head> <body> <h1>About This App</h1> <p>This is a simple WSGI application.</p> <p><a href="/">Home</a></p> </body> </html> ''' } @app.route('/api/hello', methods=['GET']) def api_hello(environ): import json data = { 'message': 'Hello, World!', 'timestamp': datetime.now().isoformat(), 'path': environ['PATH_INFO'] } return { 'status': '200 OK', 'headers': [('Content-Type', 'application/json')], 'body': json.dumps(data, indent=2) } return app# 运行应用if __name__ == '__main__': from simple_wsgi_server import SimpleWSGIServer app = create_sample_app() server = SimpleWSGIServer(port=8080) server.set_app(app) server.serve_forever()
更完整的框架式实现
# mini_framework.pyimport jsonimport refrom urllib.parse import parse_qs, urlparseclass Request: """HTTP请求包装类""" def __init__(self, environ): self.environ = environ self.method = environ['REQUEST_METHOD'] self.path = environ['PATH_INFO'] self.query = parse_qs(environ.get('QUERY_STRING', '')) self.headers = self._extract_headers() self.body = self._read_body() def _extract_headers(self): """提取HTTP头部""" headers = {} for key, value in self.environ.items(): if key.startswith('HTTP_'): header_name = key[5:].replace('_', '-').title() headers[header_name] = value return headers def _read_body(self): """读取请求体""" content_length = int(self.environ.get('CONTENT_LENGTH', 0)) if content_length > 0: return self.environ['wsgi.input'].read(content_length) return b'' @property def json(self): """解析JSON请求体""" if self.body: try: return json.loads(self.body.decode('utf-8')) except json.JSONDecodeError: return None return None @property def form(self): """解析表单数据""" content_type = self.environ.get('CONTENT_TYPE', '') if 'application/x-www-form-urlencoded' in content_type: return parse_qs(self.body.decode('utf-8')) return {}class Response: """HTTP响应类""" def __init__(self, body='', status=200, content_type='text/plain'): self.body = body self.status = status self.content_type = content_type self.headers = [] def set_header(self, key, value): """设置响应头""" self.headers.append((key, value)) def as_wsgi(self): """转换为WSGI响应""" status_map = { 200: '200 OK', 201: '201 Created', 204: '204 No Content', 301: '301 Moved Permanently', 302: '302 Found', 400: '400 Bad Request', 401: '401 Unauthorized', 403: '403 Forbidden', 404: '404 Not Found', 500: '500 Internal Server Error' } status_text = status_map.get(self.status, '200 OK') # 默认头部 default_headers = [ ('Content-Type', f'{self.content_type}; charset=utf-8'), ('Content-Length', str(len(self.body.encode('utf-8'))) if self.body else '0') ] all_headers = default_headers + self.headers def start_response(status, headers): """WSGI start_response""" pass return status_text, all_headers, [self.body.encode('utf-8')] if self.body else []class MiniFramework: """迷你Web框架""" def __init__(self): self.routes = [] self.error_handlers = {} self.middleware = [] def route(self, pattern, methods=None): """路由装饰器,支持正则表达式""" if methods is None: methods = ['GET'] def decorator(handler): self.routes.append((re.compile(pattern), methods, handler)) return handler return decorator def errorhandler(self, code): """错误处理器装饰器""" def decorator(handler): self.error_handlers[code] = handler return handler return decorator def add_middleware(self, middleware): """添加中间件""" self.middleware.append(middleware) def __call__(self, environ, start_response): """WSGI应用接口""" try: # 创建请求对象 request = Request(environ) # 应用中间件 for middleware in self.middleware: result = middleware(request) if result: # 如果中间件返回响应,直接返回 status, headers, body = result.as_wsgi() start_response(status, headers) return body # 查找匹配的路由 handler = None match_dict = {} for pattern, methods, route_handler in self.routes: if request.method in methods: match = pattern.match(request.path) if match: handler = route_handler match_dict = match.groupdict() break if handler: # 调用处理函数 response = handler(request, **match_dict) if isinstance(response, tuple): # 如果是元组,解包为(body, status, headers) if len(response) == 2: body, status = response headers = [] elif len(response) == 3: body, status, headers = response else: raise ValueError("响应元组格式错误") response_obj = Response(body, status) for key, value in headers: response_obj.set_header(key, value) elif isinstance(response, Response): response_obj = response else: # 假设是字符串 response_obj = Response(response) status_text, headers, body = response_obj.as_wsgi() start_response(status_text, headers) return body else: # 404 Not Found if 404 in self.error_handlers: response = self.error_handlers[404](request) else: response = Response('404 Not Found', 404) status_text, headers, body = response.as_wsgi() start_response(status_text, headers) return body except Exception as e: # 处理异常 print(f"Unhandled exception: {e}") if 500 in self.error_handlers: response = self.error_handlers[500](Request(environ)) else: response = Response(f'500 Internal Server Error: {e}', 500) status_text, headers, body = response.as_wsgi() start_response(status_text, headers) return body# 使用示例def create_mini_app(): """创建迷你框架应用""" app = MiniFramework() @app.route(r'^/$') def home(request): return ''' <html> <head><title>Home</title></head> <body> <h1>Mini Framework</h1> <ul> <li><a href="/hello">Hello</a></li> <li><a href="/user/123">User 123</a></li> <li><a href="/api/data">API Data</a></li> <li><a href="/about">About</a></li> </ul> </body> </html> ''' @app.route(r'^/hello$') def hello(request): name = request.query.get('name', ['World'])[0] return f''' <html> <head><title>Hello</title></head> <body> <h1>Hello, {name}!</h1> <form method="GET"> <input type="text" name="name" placeholder="Your name"> <input type="submit" value="Greet"> </form> <p><a href="/">Home</a></p> </body> </html> ''' @app.route(r'^/user/(?P<user_id>\d+)$') def user_profile(request, user_id): return f''' <html> <head><title>User {user_id}</title></head> <body> <h1>User Profile</h1> <p>User ID: {user_id}</p> <p><a href="/">Home</a></p> </body> </html> ''' @app.route(r'^/api/data$', methods=['GET', 'POST']) def api_data(request): if request.method == 'POST': data = request.json or request.form return json.dumps({ 'status': 'success', 'message': 'Data received', 'data': data }, indent=2), 200, [('Content-Type', 'application/json')] else: return json.dumps({ 'status': 'success', 'data': [1, 2, 3, 4, 5], 'timestamp': datetime.now().isoformat() }, indent=2), 200, [('Content-Type', 'application/json')] @app.route(r'^/about$') def about(request): return ''' <html> <head><title>About</title></head> <body> <h1>About This Framework</h1> <p>This is a mini WSGI-based web framework.</p> <p><a href="/">Home</a></p> </body> </html> ''' # 错误处理 @app.errorhandler(404) def not_found(request): return Response(f''' <html> <head><title>404 Not Found</title></head> <body> <h1>404 Not Found</h1> <p>The requested path "{request.path}" was not found.</p> <p><a href="/">Home</a></p> </body> </html> ''', 404, 'text/html') @app.errorhandler(500) def server_error(request): return Response(''' <html> <head><title>500 Server Error</title></head> <body> <h1>500 Internal Server Error</h1> <p>Something went wrong on our end.</p> <p><a href="/">Home</a></p> </body> </html> ''', 500, 'text/html') return app# 运行框架if __name__ == '__main__': from simple_wsgi_server import SimpleWSGIServer app = create_mini_app() server = SimpleWSGIServer(port=8080) server.set_app(app) server.serve_forever()
5. WSGI中间件实现
中间件基类
# wsgi_middleware.pyclass WSGIMiddleware: """WSGI中间件基类""" def __init__(self, app): self.app = app def __call__(self, environ, start_response): """必须由子类实现""" raise NotImplementedErrorclass LoggingMiddleware(WSGIMiddleware): """日志中间件""" def __call__(self, environ, start_response): # 记录请求开始 start_time = datetime.now() method = environ['REQUEST_METHOD'] path = environ['PATH_INFO'] print(f"[{start_time}] {method}{path} - Request started") # 包装start_response以记录响应状态 def custom_start_response(status, headers, exc_info=None): # 调用原始的start_response result = start_response(status, headers, exc_info) # 记录响应状态 end_time = datetime.now() duration = (end_time - start_time).total_seconds() print(f"[{end_time}] {method}{path} - {status} ({duration:.3f}s)") return result # 调用下一个应用或中间件 return self.app(environ, custom_start_response)class AuthenticationMiddleware(WSGIMiddleware): """身份验证中间件""" def __init__(self, app, api_keys=None): super().__init__(app) self.api_keys = api_keys or {} def __call__(self, environ, start_response): # 检查需要认证的路径 path = environ['PATH_INFO'] if path.startswith('/api/'): # 提取API密钥 api_key = environ.get('HTTP_X_API_KEY', '') if not api_key or api_key not in self.api_keys: # 认证失败 status = '401 Unauthorized' headers = [('Content-Type', 'text/plain')] start_response(status, headers) return [b'Authentication required'] # 认证通过,继续处理 return self.app(environ, start_response)class CORSMiddleware(WSGIMiddleware): """CORS中间件""" def __init__(self, app, allow_origins=None, allow_methods=None, allow_headers=None): super().__init__(app) self.allow_origins = allow_origins or ['*'] self.allow_methods = allow_methods or ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'] self.allow_headers = allow_headers or ['Content-Type', 'Authorization'] def __call__(self, environ, start_response): # 处理预检请求 if environ['REQUEST_METHOD'] == 'OPTIONS': status = '200 OK' headers = [ ('Access-Control-Allow-Origin', ', '.join(self.allow_origins)), ('Access-Control-Allow-Methods', ', '.join(self.allow_methods)), ('Access-Control-Allow-Headers', ', '.join(self.allow_headers)), ('Access-Control-Max-Age', '86400'), # 24小时 ] start_response(status, headers) return [] # 包装start_response以添加CORS头部 def custom_start_response(status, response_headers, exc_info=None): # 添加CORS头部 cors_headers = [ ('Access-Control-Allow-Origin', ', '.join(self.allow_origins)), ('Access-Control-Allow-Credentials', 'true'), ] # 调用原始的start_response return start_response(status, response_headers + cors_headers, exc_info) # 调用下一个应用或中间件 return self.app(environ, custom_start_response)class SessionMiddleware(WSGIMiddleware): """会话中间件""" def __init__(self, app, session_cookie='session_id'): super().__init__(app) self.session_cookie = session_cookie self.sessions = {} # 简单内存存储 def __call__(self, environ, start_response): # 从cookie中提取会话ID cookie_header = environ.get('HTTP_COOKIE', '') cookies = self.parse_cookies(cookie_header) session_id = cookies.get(self.session_cookie) # 获取或创建会话 if session_id and session_id in self.sessions: session = self.sessions[session_id] else: import uuid session_id = str(uuid.uuid4()) session = {'id': session_id, 'data': {}} self.sessions[session_id] = session # 将会话添加到environ中 environ['wsgi.session'] = session # 包装start_response以设置会话cookie def custom_start_response(status, response_headers, exc_info=None): # 添加会话cookie cookie = f"{self.session_cookie}={session_id}; Path=/; HttpOnly" response_headers.append(('Set-Cookie', cookie)) return start_response(status, response_headers, exc_info) # 调用下一个应用或中间件 try: return self.app(environ, custom_start_response) finally: # 清理过期会话(简化实现) self.cleanup_sessions() def parse_cookies(self, cookie_header): """解析Cookie头部""" cookies = {} if cookie_header: for cookie in cookie_header.split(';'): if '=' in cookie: key, value = cookie.strip().split('=', 1) cookies[key] = value return cookies def cleanup_sessions(self): """清理过期会话(简化实现)""" # 在实际应用中,这里应该实现会话过期逻辑 passclass StaticFileMiddleware(WSGIMiddleware): """静态文件中间件""" def __init__(self, app, static_dir='static', url_prefix='/static'): super().__init__(app) self.static_dir = static_dir self.url_prefix = url_prefix def __call__(self, environ, start_response): path = environ['PATH_INFO'] # 检查是否是静态文件请求 if path.startswith(self.url_prefix): # 提取文件路径 file_path = path[len(self.url_prefix):].lstrip('/') # 防止路径遍历攻击 if '..' in file_path or file_path.startswith('/'): status = '403 Forbidden' headers = [('Content-Type', 'text/plain')] start_response(status, headers) return [b'Access denied'] # 构建完整文件路径 import os full_path = os.path.join(self.static_dir, file_path) # 检查文件是否存在 if os.path.exists(full_path) and os.path.isfile(full_path): # 根据扩展名确定Content-Type content_type = self.get_content_type(full_path) # 读取文件内容 with open(full_path, 'rb') as f: content = f.read() # 返回文件响应 status = '200 OK' headers = [ ('Content-Type', content_type), ('Content-Length', str(len(content))) ] start_response(status, headers) return [content] else: # 文件不存在 status = '404 Not Found' headers = [('Content-Type', 'text/plain')] start_response(status, headers) return [b'File not found'] # 不是静态文件请求,继续处理 return self.app(environ, start_response) def get_content_type(self, file_path): """根据文件扩展名获取Content-Type""" import os ext = os.path.splitext(file_path)[1].lower() content_types = { '.html': 'text/html', '.css': 'text/css', '.js': 'application/javascript', '.json': 'application/json', '.txt': 'text/plain', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.pdf': 'application/pdf', '.svg': 'image/svg+xml', '.ico': 'image/x-icon', } return content_types.get(ext, 'application/octet-stream')# 中间件链构建器def build_middleware_stack(app, middlewares): """构建中间件栈""" wrapped_app = app for middleware_class, kwargs in reversed(middlewares): wrapped_app = middleware_class(wrapped_app, **kwargs) return wrapped_app# 使用示例def create_app_with_middleware(): """创建带有中间件的应用""" # 基础应用 def simple_app(environ, start_response): status = '200 OK' headers = [('Content-Type', 'text/html; charset=utf-8')] # 获取会话(如果存在) session = environ.get('wsgi.session', {}) session_data = session.get('data', {}) # 更新访问计数 visit_count = session_data.get('visits', 0) + 1 session_data['visits'] = visit_count body = f""" <html> <head><title>Middleware Demo</title></head> <body> <h1>Middleware Demo</h1> <p>You have visited this page {visit_count} times.</p> <p>Path: {environ['PATH_INFO']}</p> <p>Method: {environ['REQUEST_METHOD']}</p> <p><a href="/static/test.txt">Static File Test</a></p> <p><a href="/api/test">API Test (requires auth)</a></p> </body> </html> """ start_response(status, headers) return [body.encode('utf-8')] # API端点(需要认证) def api_app(environ, start_response): status = '200 OK' headers = [ ('Content-Type', 'application/json'), ('X-Custom-Header', 'API Response') ] data = { 'status': 'success', 'message': 'API is working', 'authenticated': True, 'timestamp': datetime.now().isoformat() } import json start_response(status, headers) return [json.dumps(data).encode('utf-8')] # 组合应用 def combined_app(environ, start_response): path = environ['PATH_INFO'] if path.startswith('/api/'): return api_app(environ, start_response) else: return simple_app(environ, start_response) # 构建中间件栈 middlewares = [ (LoggingMiddleware, {}), (CORSMiddleware, { 'allow_origins': ['http://localhost:3000'], 'allow_methods': ['GET', 'POST', 'OPTIONS'], 'allow_headers': ['Content-Type', 'X-API-Key'] }), (SessionMiddleware, {'session_cookie': 'myapp_session'}), (AuthenticationMiddleware, { 'api_keys': {'test-key-123': 'admin'} }), (StaticFileMiddleware, { 'static_dir': 'static', 'url_prefix': '/static' }) ] return build_middleware_stack(combined_app, middlewares)# 运行带中间件的应用if __name__ == '__main__': # 创建静态目录和测试文件 import os if not os.path.exists('static'): os.makedirs('static') with open('static/test.txt', 'w') as f: f.write('This is a static file served by WSGI middleware.\n') from simple_wsgi_server import SimpleWSGIServer app = create_app_with_middleware() server = SimpleWSGIServer(port=8080) server.set_app(app) server.serve_forever()
6. 生产级WSGI服务器
使用Python标准库的wsgiref
# wsgiref_demo.pyfrom wsgiref.simple_server import make_serverfrom wsgiref.util import setup_testing_defaultsfrom wsgiref.headers import Headersimport jsonclass WsgirefApp: """使用wsgiref的WSGI应用""" def __init__(self): self.routes = {} def route(self, path): def decorator(handler): self.routes[path] = handler return handler return decorator def __call__(self, environ, start_response): setup_testing_defaults(environ) path = environ['PATH_INFO'] method = environ['REQUEST_METHOD'] if path in self.routes: try: response = self.routes[path](environ) if isinstance(response, tuple): body, status, headers = response else: body = response status = '200 OK' headers = [('Content-Type', 'text/html')] start_response(status, headers) return [body.encode('utf-8')] except Exception as e: status = '500 Internal Server Error' headers = [('Content-Type', 'text/plain')] start_response(status, headers) return [f'Error: {e}'.encode('utf-8')] else: status = '404 Not Found' headers = [('Content-Type', 'text/html')] start_response(status, headers) return [b'<h1>404 Not Found</h1>']# 创建应用app = WsgirefApp()@app.route('/')def index(environ): return ''' <html> <head><title>WSGIREF Demo</title></head> <body> <h1>WSGIREF Demo Application</h1> <p>Using Python standard library wsgiref</p> <p><a href="/api">API Example</a></p> <p><a href="/env">Environment</a></p> </body> </html> '''@app.route('/api')def api(environ): data = { 'status': 'success', 'message': 'This is a JSON API response', 'server': 'wsgiref', 'timestamp': datetime.now().isoformat() } headers = [ ('Content-Type', 'application/json'), ('X-Server', 'WSGIREF') ] return json.dumps(data, indent=2), '200 OK', headers@app.route('/env')def show_env(environ): body = '<h1>WSGI Environment</h1><ul>' for key, value in sorted(environ.items()): body += f'<li><strong>{key}:</strong> {value}</li>' body += '</ul>' return body, '200 OK', [('Content-Type', 'text/html')]# 运行服务器def run_wsgiref_server(): """使用wsgiref运行WSGI服务器""" port = 8000 with make_server('', port, app) as httpd: print(f"Serving on port {port}...") print(f"Visit http://localhost:{port}") httpd.serve_forever()if __name__ == '__main__': run_wsgiref_server()
多线程WSGI服务器
# threaded_wsgi_server.pyimport socketimport threadingimport queueimport ioimport sysclass ThreadPoolWSGIServer: """线程池WSGI服务器""" def __init__(self, host='localhost', port=8080, thread_count=10): self.host = host self.port = port self.thread_count = thread_count self.app = None self.request_queue = queue.Queue() self.threads = [] self.running = False def set_app(self, app): self.app = app def worker(self): """工作线程函数""" while self.running: try: client_socket = self.request_queue.get(timeout=1) if client_socket: self.handle_request(client_socket) except queue.Empty: continue def handle_request(self, client_socket): """处理请求(简化版)""" try: # 接收请求 request_data = client_socket.recv(4096) if not request_data: return # 简化的请求解析 request_lines = request_data.decode('utf-8').split('\r\n') if not request_lines: return request_line = request_lines[0] parts = request_line.split(' ') if len(parts) < 3: return method, path, version = parts # 构建environ environ = { 'REQUEST_METHOD': method, 'PATH_INFO': path, 'SERVER_NAME': self.host, 'SERVER_PORT': str(self.port), 'wsgi.version': (1, 0), 'wsgi.url_scheme': 'http', 'wsgi.input': io.BytesIO(b''), 'wsgi.errors': sys.stderr, 'wsgi.multithread': True, 'wsgi.multiprocess': False, 'wsgi.run_once': False, } # 调用应用 if self.app: def start_response(status, headers): # 构建响应 response = f'HTTP/1.1 {status}\r\n' for key, value in headers: response += f'{key}: {value}\r\n' response += '\r\n' return response.encode('utf-8') # 获取响应体 app_iter = self.app(environ, start_response) response_parts = [] for chunk in app_iter: if isinstance(chunk, str): chunk = chunk.encode('utf-8') response_parts.append(chunk) # 发送响应 response = b''.join(response_parts) client_socket.sendall(response) except Exception as e: print(f"请求处理错误: {e}") error_response = ( b'HTTP/1.1 500 Internal Server Error\r\n' b'Content-Type: text/plain\r\n' b'\r\n' b'Server Error' ) client_socket.sendall(error_response) finally: client_socket.close() def serve_forever(self): """启动服务器""" # 创建服务器socket server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind((self.host, self.port)) server_socket.listen(100) self.running = True # 启动工作线程 for i in range(self.thread_count): thread = threading.Thread(target=self.worker, daemon=True) thread.start() self.threads.append(thread) print(f"线程池WSGI服务器启动 (线程数: {self.thread_count})") print(f"监听地址: http://{self.host}:{self.port}") try: while self.running: try: client_socket, address = server_socket.accept() self.request_queue.put(client_socket) except socket.timeout: continue except KeyboardInterrupt: break finally: self.running = False server_socket.close() print("服务器已停止")# 测试应用def test_app(environ, start_response): status = '200 OK' headers = [('Content-Type', 'text/html')] start_response(status, headers) thread_name = threading.current_thread().name body = f""" <html> <head><title>Thread Pool Test</title></head> <body> <h1>Thread Pool WSGI Server</h1> <p>Thread: {thread_name}</p> <p>Path: {environ['PATH_INFO']}</p> <p>Time: {datetime.now()}</p> </body> </html> """ return [body.encode('utf-8')]if __name__ == '__main__': server = ThreadPoolWSGIServer(thread_count=4) server.set_app(test_app) server.serve_forever()
7. WSGI与主流框架
Flask的WSGI实现
# flask_wsgi_demo.pyfrom flask import Flask, request, jsonify# Flask应用本身就是WSGI应用app = Flask(__name__)@app.route('/')def home(): return ''' <html> <head><title>Flask WSGI Demo</title></head> <body> <h1>Flask as WSGI Application</h1> <p>Flask applications are WSGI compliant.</p> <p><a href="/wsgi">WSGI Info</a></p> <p><a href="/api">API</a></p> </body> </html> '''@app.route('/wsgi')def wsgi_info(): """显示WSGI环境信息""" wsgi_env = { 'wsgi.version': request.environ.get('wsgi.version'), 'wsgi.url_scheme': request.environ.get('wsgi.url_scheme'), 'wsgi.multithread': request.environ.get('wsgi.multithread'), 'wsgi.multiprocess': request.environ.get('wsgi.multiprocess'), 'wsgi.run_once': request.environ.get('wsgi.run_once'), 'server_software': request.environ.get('SERVER_SOFTWARE'), 'request_method': request.environ.get('REQUEST_METHOD'), } return jsonify(wsgi_env)@app.route('/api/data', methods=['GET', 'POST'])def api_data(): if request.method == 'POST': data = request.get_json() or request.form return jsonify({ 'status': 'success', 'method': 'POST', 'data': data, 'wsgi_input': 'Flask abstracts wsgi.input' }) else: return jsonify({ 'status': 'success', 'method': 'GET', 'message': 'Flask handles WSGI for you' })# 直接作为WSGI应用使用if __name__ == '__main__': # 方式1: 使用Flask内置开发服务器 # app.run(debug=True, port=5000) # 方式2: 使用标准WSGI服务器 from wsgiref.simple_server import make_server port = 5000 with make_server('', port, app) as httpd: print(f"Flask WSGI应用运行在 http://localhost:{port}") httpd.serve_forever()
Django的WSGI配置
# django_wsgi_example.py (概念示例)"""Django的WSGI入口点通常位于项目目录下的wsgi.py文件以下是简化的示例:"""import osimport sysfrom django.core.wsgi import get_wsgi_application# 设置Django设置模块os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')# 获取WSGI应用application = get_wsgi_application()# Django的WSGI应用可以在任何WSGI服务器上运行# 例如使用gunicorn: gunicorn myproject.wsgi:application"""Django的WSGI处理器实际上是一个中间件栈,包括:1. SecurityMiddleware - 安全中间件2. SessionMiddleware - 会话中间件 3. CommonMiddleware - 通用中间件4. CsrfViewMiddleware - CSRF保护5. AuthenticationMiddleware - 认证中间件6. MessageMiddleware - 消息中间件7. XFrameOptionsMiddleware - X-Frame-Options最终调用视图函数处理请求"""
8. WSGI部署实践
使用Gunicorn部署
# gunicorn_config.py# Gunicorn配置文件示例# 绑定地址和端口bind = "0.0.0.0:8000"# 工作进程数 (CPU核心数 * 2 + 1 是个不错的起点)workers = 3# 工作进程类型 (sync, eventlet, gevent, tornado, gthread)worker_class = "sync"# 每个工作进程的线程数 (仅对gthread worker有效)threads = 1# 最大并发请求数worker_connections = 1000# 进程名proc_name = "my_wsgi_app"# 日志配置accesslog = "-" # 访问日志输出到stdouterrorlog = "-" # 错误日志输出到stderrloglevel = "info"# 超时设置timeout = 30keepalive = 2# 优雅重启graceful_timeout = 30# 最大请求数 (防止内存泄漏)max_requests = 1000max_requests_jitter = 50# 进程用户/组user = "www-data"group = "www-data"# 使用命令: gunicorn -c gunicorn_config.py myapp:app
使用uWSGI部署
# uwsgi.ini# uWSGI配置文件示例[uwsgi]# 项目目录chdir = /path/to/your/project# WSGI模块module = myapp:app# 主进程master = true# 进程数processes = 4# 线程数threads = 2# 绑定地址http = 0.0.0.0:8000# Socket文件 (用于与Nginx通信)# socket = /tmp/myapp.sock# chmod-socket = 664# vacuum = true# 进程用户/组uid = www-datagid = www-data# 日志文件logto = /var/log/uwsgi/myapp.log# 最大请求数max-requests = 1000# 优雅重启reload-on-rss = 256# 内存报告memory-report = true# 启用线程enable-threads = true# 使用命令: uwsgi --ini uwsgi.ini
Nginx + uWSGI配置
# nginx配置示例server { listen 80; server_name example.com; location / { include uwsgi_params; uwsgi_pass unix:/tmp/myapp.sock; # 超时设置 uwsgi_read_timeout 300; uwsgi_connect_timeout 300; uwsgi_send_timeout 300; } location /static { alias /path/to/your/project/static; expires 30d; } location /media { alias /path/to/your/project/media; expires 30d; } # 禁止访问隐藏文件 location ~ /\. { deny all; }}
9. WSGI性能优化
异步WSGI应用
# async_wsgi.pyimport asynciofrom concurrent.futures import ThreadPoolExecutorfrom wsgiref.simple_server import make_serverclass AsyncWSGIApp: """支持异步处理的WSGI应用""" def __init__(self, max_workers=10): self.executor = ThreadPoolExecutor(max_workers=max_workers) self.loop = asyncio.new_event_loop() async def async_handler(self, environ): """异步处理函数""" # 模拟异步I/O操作 await asyncio.sleep(0.1) path = environ['PATH_INFO'] return f""" <html> <head><title>Async WSGI</title></head> <body> <h1>Async WSGI Response</h1> <p>Path: {path}</p> <p>Async processed at: {datetime.now()}</p> </body> </html> """ def __call__(self, environ, start_response): # 将异步函数包装为同步调用 future = asyncio.run_coroutine_threadsafe( self.async_handler(environ), self.loop ) try: # 等待异步结果 body = future.result(timeout=5) status = '200 OK' headers = [('Content-Type', 'text/html')] start_response(status, headers) return [body.encode('utf-8')] except asyncio.TimeoutError: status = '504 Gateway Timeout' headers = [('Content-Type', 'text/plain')] start_response(status, headers) return [b'Request timeout'] except Exception as e: status = '500 Internal Server Error' headers = [('Content-Type', 'text/plain')] start_response(status, headers) return [f'Error: {e}'.encode('utf-8')]def run_async_app(): """运行异步WSGI应用""" app = AsyncWSGIApp() # 启动事件循环线程 def run_loop(): asyncio.set_event_loop(app.loop) app.loop.run_forever() import threading loop_thread = threading.Thread(target=run_loop, daemon=True) loop_thread.start() # 启动WSGI服务器 with make_server('', 8000, app) as httpd: print("Async WSGI app serving on port 8000") httpd.serve_forever()if __name__ == '__main__': run_async_app()
WSGI应用性能监控
# wsgi_monitoring.pyimport timeimport psutilimport threadingfrom collections import dequeclass WSGIMonitor: """WSGI应用性能监控""" def __init__(self, app, history_size=100): self.app = app self.request_times = deque(maxlen=history_size) self.error_count = 0 self.total_requests = 0 self.lock = threading.Lock() def __call__(self, environ, start_response): start_time = time.time() # 监控start_response def monitored_start_response(status, headers, exc_info=None): # 记录响应状态 if status and status.startswith('5'): with self.lock: self.error_count += 1 # 添加监控头部 headers.append(('X-Request-ID', threading.current_thread().name)) headers.append(('X-Server-Load', str(psutil.cpu_percent()))) return start_response(status, headers, exc_info) try: # 处理请求 response = self.app(environ, monitored_start_response) # 计算处理时间 duration = time.time() - start_time with self.lock: self.total_requests += 1 self.request_times.append(duration) return response except Exception as e: with self.lock: self.error_count += 1 raise def get_stats(self): """获取性能统计""" import statistics with self.lock: if self.request_times: avg_time = statistics.mean(self.request_times) max_time = max(self.request_times) min_time = min(self.request_times) p95 = statistics.quantiles(self.request_times, n=20)[18] if len(self.request_times) >= 20 else 0 else: avg_time = max_time = min_time = p95 = 0 return { 'total_requests': self.total_requests, 'error_count': self.error_count, 'error_rate': self.error_count / max(self.total_requests, 1), 'avg_response_time': avg_time, 'max_response_time': max_time, 'min_response_time': min_time, 'p95_response_time': p95, 'requests_per_second': len(self.request_times) / 60 if self.request_times else 0, 'memory_usage': psutil.Process().memory_info().rss / 1024 / 1024, # MB 'cpu_percent': psutil.cpu_percent(), } def stats_endpoint(self, environ, start_response): """监控数据端点""" stats = self.get_stats() import json body = json.dumps(stats, indent=2) status = '200 OK' headers = [ ('Content-Type', 'application/json'), ('Cache-Control', 'no-cache') ] start_response(status, headers) return [body.encode('utf-8')]# 使用示例def create_monitored_app(): """创建带监控的应用""" def base_app(environ, start_response): status = '200 OK' headers = [('Content-Type', 'text/html')] start_response(status, headers) # 模拟一些处理时间 time.sleep(0.01) return [b'<h1>Monitored App</h1><p>This app is being monitored.</p>'] # 包装应用 monitored_app = WSGIMonitor(base_app) # 添加监控端点 def app_with_monitor(environ, start_response): path = environ['PATH_INFO'] if path == '/stats': return monitored_app.stats_endpoint(environ, start_response) else: return monitored_app(environ, start_response) return app_with_monitorif __name__ == '__main__': from wsgiref.simple_server import make_server app = create_monitored_app() with make_server('', 8000, app) as httpd: print("Monitored app serving on port 8000") print("Visit http://localhost:8000/stats for monitoring data") httpd.serve_forever()
总结
今天深入学习了WSGI协议,掌握了以下核心内容:
关键收获
WSGI协议基础:理解了WSGI的设计哲学和核心接口
WSGI服务器实现:从零实现了完整的WSGI服务器
WSGI应用程序:创建了多种形式的WSGI应用(函数、类、框架)
WSGI中间件:实现了日志、认证、CORS、会话等中间件
生产部署:了解了Gunicorn、uWSGI等生产服务器的配置
性能优化:学习了异步WSGI和性能监控技术
ASGI简介:了解了WSGI的继任者ASGI协议
核心技术点
environ字典的构建和解析
start_response回调机制
中间件栈的设计模式
请求/响应生命周期管理
多线程/异步处理
与Flask、Django等框架的集成
WSGI的优缺点
优点:
标准化,提高互操作性
简单清晰的设计
成熟的生态系统
良好的调试支持
缺点:
同步模型,不适合高并发
不支持WebSocket等新协议
每个请求都需要完整的环境重建
实践建议
深入理解中间件:尝试实现更多类型的中间件
性能测试:对不同WSGI服务器进行性能对比测试
安全性:实现安全相关的中间件(如CSRF防护、XSS过滤)
协议扩展:尝试实现HTTP/2或WebSocket的WSGI扩展
容器化部署:将WSGI应用Docker化并部署到Kubernetes
扩展学习
ASGI规范:深入学习ASGI协议和异步Web开发
Web服务器实现:研究Nginx、Apache的WSGI模块实现
协议分析工具:使用Wireshark分析WSGI通信
性能调优:学习WSGI应用的内存分析和性能优化
云原生部署:学习在云平台上部署WSGI应用的最佳实践
明天我们将进入Web开发概述的学习,这是第四阶段Python生态系统的开始,将介绍Python Web开发的整体生态和主要框架。