经过Web安全基础的学习,今天我们进入网络编程基础,这是理解互联网通信、构建网络应用和Web服务的核心技术基础。
1. 网络编程基础概念
1.1 网络分层模型
"""OSI七层模型与TCP/IP四层模型对比"""# 网络通信的分层实现NETWORK_LAYERS = { "OSI_7层": [ "1. 物理层 (Physical) - 比特流传输", "2. 数据链路层 (Data Link) - 帧传输", "3. 网络层 (Network) - IP地址、路由", "4. 传输层 (Transport) - TCP/UDP、端口", "5. 会话层 (Session) - 建立/管理/终止会话", "6. 表示层 (Presentation) - 数据格式转换", "7. 应用层 (Application) - 应用程序接口" ], "TCP/IP_4层": [ "1. 网络接口层 (Network Interface)", "2. 网络层 (Internet) - IP、ICMP", "3. 传输层 (Transport) - TCP、UDP", "4. 应用层 (Application) - HTTP、FTP、SMTP" ]}# Python对应的模块/库PYTHON_NETWORK_MODULES = { "传输层": { "TCP": "socket (SOCK_STREAM)", "UDP": "socket (SOCK_DGRAM)" }, "应用层": { "HTTP": "http.client, requests", "FTP": "ftplib", "SMTP": "smtplib", "DNS": "socket.getaddrinfo()" }, "高级抽象": { "HTTP服务器": "http.server, flask, django", "异步网络": "asyncio", "RPC": "xmlrpc, grpc" }}
1.2 核心网络概念
"""网络核心概念速查"""import socketclass NetworkConcepts: """网络概念演示""" @staticmethod def show_ip_protocol_family(): """IP协议族""" families = { socket.AF_INET: 'IPv4 (32位地址)', socket.AF_INET6: 'IPv6 (128位地址)', socket.AF_UNIX: 'UNIX域套接字' } return families @staticmethod def show_socket_types(): """套接字类型""" types = { socket.SOCK_STREAM: 'TCP - 可靠、面向连接、字节流', socket.SOCK_DGRAM: 'UDP - 不可靠、无连接、数据报', socket.SOCK_RAW: '原始套接字 - 直接操作IP层' } return types @staticmethod def explain_ports(): """端口说明""" common_ports = { 20: 'FTP数据', 21: 'FTP控制', 22: 'SSH', 23: 'Telnet', 25: 'SMTP', 53: 'DNS', 80: 'HTTP', 443: 'HTTPS', 3306: 'MySQL', 5432: 'PostgreSQL', 6379: 'Redis', 8080: 'HTTP备用' } return common_ports @staticmethod def byte_order_example(): """字节序示例""" import struct # 网络字节序(大端) vs 主机字节序 value = 0x12345678 # 主机字节序转网络字节序 network_order = socket.htonl(value) # 网络字节序转主机字节序 host_order = socket.ntohl(network_order) return { '原始值': hex(value), '网络字节序': hex(network_order), '转换回主机字节序': hex(host_order), '说明': '网络通信统一使用网络字节序(大端)' }
2. Socket编程基础
2.1 TCP套接字编程
"""TCP套接字编程:可靠、面向连接、字节流"""import socketimport threadingimport timefrom typing import Optional, Tupleimport jsonclass TCPServer: """TCP服务器基类""" def __init__(self, host: str = 'localhost', port: int = 8888): self.host = host self.port = port self.server_socket = None self.is_running = False self.clients = [] # 连接的客户端列表 self.lock = threading.Lock() def start(self): """启动TCP服务器""" try: # 创建TCP套接字 self.server_socket = socket.socket( socket.AF_INET, # IPv4 socket.SOCK_STREAM # TCP ) # 设置套接字选项 self.server_socket.setsockopt( socket.SOL_SOCKET, # 通用套接字选项 socket.SO_REUSEADDR, # 地址重用 1 ) # 绑定地址和端口 self.server_socket.bind((self.host, self.port)) # 开始监听,设置最大等待连接数 self.server_socket.listen(5) print(f"TCP服务器启动在 {self.host}:{self.port}") print(f"等待客户端连接...") self.is_running = True # 接受连接循环 while self.is_running: try: # 接受客户端连接 client_socket, client_address = self.server_socket.accept() print(f"新客户端连接: {client_address}") # 创建客户端处理线程 client_thread = threading.Thread( target=self.handle_client, args=(client_socket, client_address) ) client_thread.daemon = True client_thread.start() # 记录客户端 with self.lock: self.clients.append({ 'socket': client_socket, 'address': client_address, 'thread': client_thread }) except OSError as e: if self.is_running: print(f"接受连接时出错: {e}") break except Exception as e: print(f"服务器启动失败: {e}") finally: self.stop() def handle_client(self, client_socket: socket.socket, address: Tuple[str, int]): """处理客户端连接""" client_ip, client_port = address try: # 设置接收超时(秒) client_socket.settimeout(30.0) # 发送欢迎消息 welcome_msg = f"欢迎连接到TCP服务器!你的地址: {client_ip}:{client_port}" client_socket.sendall(welcome_msg.encode('utf-8')) # 通信循环 while self.is_running: try: # 接收数据 data = client_socket.recv(1024) # 最多接收1024字节 if not data: print(f"客户端 {address} 断开连接") break # 处理接收到的数据 message = data.decode('utf-8', errors='ignore').strip() print(f"来自 {address} 的消息: {message}") # 回显消息 response = f"服务器收到: {message}" client_socket.sendall(response.encode('utf-8')) # 特殊命令处理 if message.lower() == 'quit': response = "再见!" client_socket.sendall(response.encode('utf-8')) break except socket.timeout: # 超时检查,发送心跳 try: client_socket.sendall(b'ping') except: break except ConnectionResetError: print(f"客户端 {address} 强制关闭连接") break except Exception as e: print(f"处理客户端 {address} 时出错: {e}") finally: # 清理资源 client_socket.close() with self.lock: self.clients = [c for c in self.clients if c['socket'] != client_socket] print(f"客户端 {address} 处理完成") def broadcast(self, message: str, exclude_address: Tuple = None): """广播消息给所有客户端""" with self.lock: for client in self.clients: if exclude_address and client['address'] == exclude_address: continue try: client['socket'].sendall(message.encode('utf-8')) except: # 发送失败,可能是客户端已断开 pass def stop(self): """停止服务器""" self.is_running = False # 关闭所有客户端连接 with self.lock: for client in self.clients: try: client['socket'].close() except: pass self.clients.clear() # 关闭服务器套接字 if self.server_socket: try: self.server_socket.close() except: pass print("TCP服务器已停止")class AdvancedTCPServer(TCPServer): """增强的TCP服务器""" def __init__(self, host='localhost', port=8888): super().__init__(host, port) self.client_data = {} # 客户端数据存储 def handle_client(self, client_socket: socket.socket, address: Tuple[str, int]): """增强的客户端处理""" client_ip, client_port = address try: # 发送欢迎消息 welcome = { 'type': 'welcome', 'message': f'欢迎 {client_ip}:{client_port}', 'timestamp': time.time(), 'server_info': 'Python TCP服务器 v1.0' } self._send_json(client_socket, welcome) # 主通信循环 while self.is_running: try: # 接收数据 data = client_socket.recv(4096) if not data: print(f"客户端 {address} 断开连接") break # 尝试解析为JSON try: message = json.loads(data.decode('utf-8')) self._handle_json_message(client_socket, address, message) except json.JSONDecodeError: # 处理普通文本 text_message = data.decode('utf-8', errors='ignore').strip() self._handle_text_message(client_socket, address, text_message) except socket.timeout: # 发送心跳 heartbeat = { 'type': 'heartbeat', 'timestamp': time.time() } self._send_json(client_socket, heartbeat) except ConnectionError as e: print(f"连接错误 {address}: {e}") break except Exception as e: print(f"处理客户端 {address} 时异常: {e}") finally: self._cleanup_client(client_socket, address) def _send_json(self, sock: socket.socket, data: dict): """发送JSON数据""" json_str = json.dumps(data) sock.sendall(json_str.encode('utf-8')) def _handle_json_message(self, sock: socket.socket, address: Tuple, message: dict): """处理JSON消息""" msg_type = message.get('type', 'unknown') if msg_type == 'echo': # 回显消息 response = { 'type': 'echo_response', 'original': message.get('data', ''), 'timestamp': time.time(), 'received_from': address } self._send_json(sock, response) elif msg_type == 'broadcast': # 广播消息 broadcast_msg = message.get('message', '') self.broadcast( f"广播来自 {address}: {broadcast_msg}", exclude_address=address ) elif msg_type == 'get_clients': # 获取客户端列表 with self.lock: clients_info = [ {'address': c['address'], 'connected': True} for c in self.clients ] response = { 'type': 'clients_list', 'clients': clients_info, 'total': len(clients_info) } self._send_json(sock, response) else: # 未知消息类型 response = { 'type': 'error', 'message': f'未知的消息类型: {msg_type}', 'received': message } self._send_json(sock, response) def _handle_text_message(self, sock: socket.socket, address: Tuple, message: str): """处理文本消息""" print(f"文本消息来自 {address}: {message}") # 简单命令处理 if message.startswith('/'): command = message[1:].lower().split()[0] if len(message) > 1 else '' if command == 'time': response = f"服务器时间: {time.ctime()}" elif command == 'help': response = "可用命令: /time, /help, /quit" elif command == 'quit': response = "再见!" sock.sendall(response.encode('utf-8')) return else: response = f"未知命令: {command}" else: # 普通消息回显 response = f"收到: {message}" sock.sendall(response.encode('utf-8')) def _cleanup_client(self, sock: socket.socket, address: Tuple): """清理客户端资源""" sock.close() with self.lock: self.clients = [c for c in self.clients if c['socket'] != sock] # 通知其他客户端 leave_msg = f"客户端 {address} 已离开" self.broadcast(leave_msg) print(f"客户端 {address} 已清理")# TCP客户端实现class TCPClient: """TCP客户端""" def __init__(self, host: str = 'localhost', port: int = 8888): self.host = host self.port = port self.socket = None self.is_connected = False def connect(self) -> bool: """连接到服务器""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 设置超时 self.socket.settimeout(10.0) # 连接服务器 self.socket.connect((self.host, self.port)) # 接收欢迎消息 welcome = self.socket.recv(1024) print(f"服务器响应: {welcome.decode('utf-8')}") self.is_connected = True return True except Exception as e: print(f"连接失败: {e}") return False def send_message(self, message: str) -> Optional[str]: """发送消息并接收响应""" if not self.is_connected: print("未连接到服务器") return None try: # 发送消息 self.socket.sendall(message.encode('utf-8')) # 接收响应 response = self.socket.recv(1024) return response.decode('utf-8') except Exception as e: print(f"发送/接收失败: {e}") return None def send_json(self, data: dict) -> Optional[dict]: """发送JSON数据""" try: json_str = json.dumps(data) self.socket.sendall(json_str.encode('utf-8')) # 接收响应 response_data = self.socket.recv(4096) return json.loads(response_data.decode('utf-8')) except Exception as e: print(f"JSON通信失败: {e}") return None def interactive_mode(self): """交互模式""" if not self.connect(): return print("进入交互模式,输入 'quit' 退出") try: while True: # 获取用户输入 user_input = input("> ").strip() if not user_input: continue if user_input.lower() == 'quit': # 发送退出命令 self.send_message('quit') break # 发送消息 response = self.send_message(user_input) if response: print(f"服务器: {response}") except KeyboardInterrupt: print("\n中断连接") finally: self.disconnect() def disconnect(self): """断开连接""" if self.socket: try: self.socket.close() except: pass self.socket = None self.is_connected = False print("已断开连接")
2.2 UDP套接字编程
"""UDP套接字编程:无连接、不可靠、数据报"""import socketimport threadingimport timefrom typing import Optional, Tupleclass UDPServer: """UDP服务器""" def __init__(self, host: str = 'localhost', port: int = 9999): self.host = host self.port = port self.socket = None self.is_running = False self.clients = set() # 记录连接过的客户端 self.lock = threading.Lock() def start(self): """启动UDP服务器""" try: # 创建UDP套接字 self.socket = socket.socket( socket.AF_INET, # IPv4 socket.SOCK_DGRAM # UDP ) # 绑定地址和端口 self.socket.bind((self.host, self.port)) print(f"UDP服务器启动在 {self.host}:{self.port}") print("等待数据报...") self.is_running = True # 接收数据报循环 while self.is_running: try: # 接收数据(UDP不需要accept) data, client_address = self.socket.recvfrom(1024) # 记录客户端 with self.lock: self.clients.add(client_address) # 处理数据 self.handle_datagram(data, client_address) except OSError as e: if self.is_running: print(f"接收数据报时出错: {e}") break except Exception as e: print(f"服务器启动失败: {e}") finally: self.stop() def handle_datagram(self, data: bytes, client_address: Tuple[str, int]): """处理接收到的数据报""" try: message = data.decode('utf-8', errors='ignore').strip() client_ip, client_port = client_address print(f"来自 {client_ip}:{client_port} 的消息: {message}") # 处理不同类型的数据 if message.lower() == 'ping': response = 'pong' elif message.lower() == 'time': response = f"当前时间: {time.ctime()}" elif message.lower() == 'clients': with self.lock: client_count = len(self.clients) response = f"连接过的客户端数: {client_count}" elif message.lower() == 'quit': response = '服务器收到退出指令' # 向客户端发送响应后,可以选择停止服务器 self._send_response(client_address, response) self.stop() return else: response = f"收到: {message}" # 发送响应 self._send_response(client_address, response) except Exception as e: print(f"处理数据报时出错: {e}") def _send_response(self, client_address: Tuple[str, int], message: str): """发送响应到客户端""" try: self.socket.sendto(message.encode('utf-8'), client_address) except Exception as e: print(f"发送响应失败: {e}") def broadcast(self, message: str): """广播消息给所有连接过的客户端""" with self.lock: for client_address in self.clients: try: self.socket.sendto(message.encode('utf-8'), client_address) except: pass # 客户端可能已不可达 def stop(self): """停止服务器""" self.is_running = False if self.socket: try: self.socket.close() except: pass print("UDP服务器已停止")class UDPClient: """UDP客户端""" def __init__(self, server_host: str = 'localhost', server_port: int = 9999): self.server_address = (server_host, server_port) self.socket = None self.timeout = 5.0 # 超时时间(秒) def create_socket(self): """创建UDP套接字""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.settimeout(self.timeout) return True except Exception as e: print(f"创建套接字失败: {e}") return False def send_message(self, message: str, retry_count: int = 3) -> Optional[str]: """发送消息到服务器(带重试)""" if not self.socket and not self.create_socket(): return None for attempt in range(retry_count): try: # 发送消息 self.socket.sendto(message.encode('utf-8'), self.server_address) # 尝试接收响应(UDP可能没有响应) try: response_data, server_addr = self.socket.recvfrom(1024) response = response_data.decode('utf-8') return response except socket.timeout: if attempt < retry_count - 1: print(f"超时,第 {attempt + 1} 次重试...") continue else: print("服务器无响应") return None except Exception as e: print(f"发送失败: {e}") if attempt < retry_count - 1: time.sleep(1) # 等待后重试 else: return None return None def measure_latency(self, count: int = 10) -> dict: """测量网络延迟""" if not self.create_socket(): return {'error': '无法创建套接字'} latencies = [] successes = 0 for i in range(count): try: start_time = time.time() # 发送ping self.socket.sendto(b'ping', self.server_address) # 等待pong response, _ = self.socket.recvfrom(1024) if response.decode('utf-8') == 'pong': end_time = time.time() latency = (end_time - start_time) * 1000 # 转换为毫秒 latencies.append(latency) successes += 1 except socket.timeout: print(f"测试 {i+1}: 超时") except Exception as e: print(f"测试 {i+1}: 错误 - {e}") # 计算结果 if latencies: results = { 'total_tests': count, 'successful': successes, 'success_rate': (successes / count) * 100, 'min_latency_ms': min(latencies), 'max_latency_ms': max(latencies), 'avg_latency_ms': sum(latencies) / len(latencies), 'all_latencies': latencies } else: results = { 'total_tests': count, 'successful': 0, 'success_rate': 0, 'error': '所有测试都失败' } return results def close(self): """关闭套接字""" if self.socket: try: self.socket.close() except: pass self.socket = None
3. HTTP协议实现
3.1 原始HTTP客户端
"""原始HTTP客户端实现"""import socketimport sslfrom urllib.parse import urlparsefrom typing import Dict, Optional, Tupleimport jsonclass RawHTTPClient: """原始HTTP客户端(直接使用socket)""" def __init__(self): self.default_ports = { 'http': 80, 'https': 443 } self.user_agent = 'PythonRawHTTPClient/1.0' def request(self, url: str, method: str = 'GET', headers: Dict = None, data: str = None, timeout: float = 10.0) -> Tuple[int, Dict, str]: """发送HTTP请求""" # 解析URL parsed = urlparse(url) scheme = parsed.scheme.lower() hostname = parsed.hostname port = parsed.port or self.default_ports.get(scheme, 80) path = parsed.path or '/' query = f'?{parsed.query}' if parsed.query else '' if not hostname: raise ValueError(f"无效的URL: {url}") # 创建socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) try: # 连接服务器 sock.connect((hostname, port)) # 对于HTTPS,包装socket if scheme == 'https': context = ssl.create_default_context() sock = context.wrap_socket(sock, server_hostname=hostname) # 构建请求 request_lines = self._build_request( method, hostname, port, path, query, headers or {}, data ) # 发送请求 sock.sendall(request_lines.encode('utf-8')) # 接收响应 response = self._receive_response(sock) return response finally: sock.close() def _build_request(self, method: str, hostname: str, port: int, path: str, query: str, headers: Dict, data: str) -> str: """构建HTTP请求""" # 请求行 request_line = f"{method.upper()}{path}{query} HTTP/1.1\r\n" # 默认头部 default_headers = { 'Host': f'{hostname}:{port}' if port != 80 else hostname, 'User-Agent': self.user_agent, 'Accept': '*/*', 'Connection': 'close' } # 更新头部 if headers: default_headers.update(headers) # 如果有请求体,添加Content-Length if data: default_headers['Content-Length'] = str(len(data.encode('utf-8'))) if 'Content-Type' not in default_headers: default_headers['Content-Type'] = 'application/x-www-form-urlencoded' # 构建头部 headers_str = '' for key, value in default_headers.items(): headers_str += f"{key}: {value}\r\n" # 完整的请求 request = request_line + headers_str + '\r\n' # 如果有请求体,添加到末尾 if data: request += data return request def _receive_response(self, sock: socket.socket) -> Tuple[int, Dict, str]: """接收HTTP响应""" BUFFER_SIZE = 4096 response_data = b'' # 接收数据直到连接关闭 while True: try: chunk = sock.recv(BUFFER_SIZE) if not chunk: break response_data += chunk except socket.timeout: break except: break # 解析响应 response_str = response_data.decode('utf-8', errors='ignore') # 分割响应头和响应体 header_end = response_str.find('\r\n\r\n') if header_end == -1: raise ValueError("无效的HTTP响应") headers_part = response_str[:header_end] body = response_str[header_end + 4:] # 解析状态行 lines = headers_part.split('\r\n') if not lines: raise ValueError("无效的HTTP响应") status_line = lines[0] status_parts = status_line.split(' ', 2) if len(status_parts) < 3: raise ValueError("无效的状态行") protocol, status_code, status_text = status_parts status_code = int(status_code) # 解析响应头 response_headers = {} for line in lines[1:]: if ': ' in line: key, value = line.split(': ', 1) response_headers[key] = value return status_code, response_headers, body def get(self, url: str, **kwargs) -> Tuple[int, Dict, str]: """发送GET请求""" return self.request(url, 'GET', **kwargs) def post(self, url: str, data: str = None, **kwargs) -> Tuple[int, Dict, str]: """发送POST请求""" return self.request(url, 'POST', data=data, **kwargs) def head(self, url: str, **kwargs) -> Tuple[int, Dict, str]: """发送HEAD请求""" return self.request(url, 'HEAD', **kwargs) def get_json(self, url: str, **kwargs) -> Optional[Dict]: """获取JSON响应""" status, headers, body = self.get(url, **kwargs) if 200 <= status < 300: try: return json.loads(body) except json.JSONDecodeError: return None return None# 使用示例def test_raw_http_client(): """测试原始HTTP客户端""" client = RawHTTPClient() # 测试HTTP请求 print("=== 测试HTTP请求 ===") status, headers, body = client.get('http://httpbin.org/get') print(f"状态码: {status}") print(f"响应头: {list(headers.keys())[:5]}...") print(f"响应体长度: {len(body)} 字节") # 测试HTTPS请求 print("\n=== 测试HTTPS请求 ===") status, headers, body = client.get('https://httpbin.org/get') print(f"状态码: {status}") # 测试POST请求 print("\n=== 测试POST请求 ===") post_data = 'key1=value1&key2=value2' status, headers, body = client.post( 'https://httpbin.org/post', data=post_data, headers={'Content-Type': 'application/x-www-form-urlencoded'} ) print(f"状态码: {status}") # 测试JSON响应 print("\n=== 测试JSON响应 ===") json_data = client.get_json('https://httpbin.org/json') if json_data: print(f"获取到JSON数据,键: {list(json_data.keys())}") return client
3.2 简单HTTP服务器
"""简单HTTP服务器实现"""import socketimport threadingimport mimetypesimport osfrom pathlib import Pathfrom datetime import datetimeimport urllib.parseclass SimpleHTTPServer: """简单的HTTP服务器""" def __init__(self, host: str = 'localhost', port: int = 8000, web_root: str = '.'): self.host = host self.port = port self.web_root = Path(web_root).resolve() self.server_socket = None self.is_running = False self.routes = {} # 路由映射 # 初始化MIME类型 mimetypes.init() # 注册默认路由 self._register_default_routes() def _register_default_routes(self): """注册默认路由""" self.routes = { 'GET': { '/': self._handle_index, '/echo': self._handle_echo, '/time': self._handle_time, '/headers': self._handle_headers, '/form': self._handle_form_get, }, 'POST': { '/echo': self._handle_echo, '/form': self._handle_form_post, } } def start(self): """启动HTTP服务器""" try: self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) print(f"HTTP服务器启动在 http://{self.host}:{self.port}") print(f"文档根目录: {self.web_root}") print("按 Ctrl+C 停止服务器") self.is_running = True while self.is_running: try: client_socket, client_address = self.server_socket.accept() # 创建线程处理请求 thread = threading.Thread( target=self._handle_request, args=(client_socket, client_address) ) thread.daemon = True thread.start() except KeyboardInterrupt: print("\n收到中断信号") break except Exception as e: if self.is_running: print(f"接受连接时出错: {e}") except Exception as e: print(f"服务器启动失败: {e}") finally: self.stop() def _handle_request(self, client_socket: socket.socket, client_address: tuple): """处理HTTP请求""" try: # 接收请求数据 request_data = client_socket.recv(1024) if not request_data: return request_text = request_data.decode('utf-8') # 解析请求 request_info = self._parse_request(request_text) # 记录访问日志 self._log_request(client_address, request_info) # 处理请求 response = self._process_request(request_info) # 发送响应 client_socket.sendall(response.encode('utf-8')) except Exception as e: print(f"处理请求时出错: {e}") error_response = self._create_error_response(500, str(e)) client_socket.sendall(error_response.encode('utf-8')) finally: client_socket.close() def _parse_request(self, request_text: str) -> dict: """解析HTTP请求""" lines = request_text.strip().split('\r\n') if not lines: raise ValueError("空请求") # 解析请求行 request_line = lines[0] method, path, version = request_line.split(' ', 2) # 解析请求头 headers = {} body_start = 0 for i, line in enumerate(lines[1:], 1): if not line.strip(): # 空行,头部结束 body_start = i + 1 break if ': ' in line: key, value = line.split(': ', 1) headers[key] = value # 解析请求体 body = '' if body_start < len(lines): body = '\r\n'.join(lines[body_start:]) # 解析查询参数 parsed_path = urllib.parse.urlparse(path) path = parsed_path.path query_params = urllib.parse.parse_qs(parsed_path.query) return { 'method': method, 'path': path, 'version': version, 'headers': headers, 'body': body, 'query_params': query_params, 'raw': request_text } def _process_request(self, request_info: dict) -> str: """处理请求并生成响应""" method = request_info['method'] path = request_info['path'] # 检查是否匹配注册的路由 if method in self.routes and path in self.routes[method]: handler = self.routes[method][path] return handler(request_info) # 否则作为静态文件处理 return self._handle_static_file(request_info) def _handle_static_file(self, request_info: dict) -> str: """处理静态文件请求""" path = request_info['path'] # 安全性检查:防止路径遍历攻击 requested_path = (self.web_root / path.lstrip('/')).resolve() # 确保请求路径在web根目录内 if not str(requested_path).startswith(str(self.web_root)): return self._create_error_response(403, "禁止访问") # 检查文件是否存在 if not requested_path.exists(): return self._create_error_response(404, "文件未找到") # 如果是目录,寻找index.html if requested_path.is_dir(): index_file = requested_path / 'index.html' if index_file.exists(): requested_path = index_file else: # 列出目录内容 return self._list_directory(requested_path, path) # 检查是否是文件 if not requested_path.is_file(): return self._create_error_response(403, "不是文件") # 读取文件内容 try: with open(requested_path, 'rb') as f: content = f.read() # 获取MIME类型 mime_type, _ = mimetypes.guess_type(str(requested_path)) if not mime_type: mime_type = 'application/octet-stream' # 构建响应 response_lines = [ 'HTTP/1.1 200 OK', f'Content-Type: {mime_type}', f'Content-Length: {len(content)}', f'Date: {datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")}', 'Server: SimpleHTTPServer', 'Connection: close', '', '' ] response = '\r\n'.join(response_lines).encode('utf-8') + content return response.decode('utf-8', errors='ignore') except Exception as e: return self._create_error_response(500, f"读取文件失败: {e}") def _list_directory(self, dir_path: Path, url_path: str) -> str: """列出目录内容""" try: items = [] for item in dir_path.iterdir(): is_dir = item.is_dir() name = item.name + ('/' if is_dir else '') size = os.path.getsize(item) if not is_dir else '-' items.append({ 'name': name, 'size': size, 'url': f"{url_path.rstrip('/')}/{urllib.parse.quote(name)}" }) # 生成HTML html_lines = [ '<!DOCTYPE html>', '<html><head><title>目录列表</title></head>', '<body>', '<h1>目录: ' + url_path + '</h1>', '<table border="1">', '<tr><th>名称</th><th>大小</th></tr>' ] # 父目录链接 if url_path != '/': parent_path = '/'.join(url_path.rstrip('/').split('/')[:-1]) html_lines.append( f'<tr><td><a href="{parent_path or"/"}">../</a></td><td>-</td></tr>' ) for item in items: html_lines.append( f'<tr><td><a href="{item["url"]}">{item["name"]}</a></td>' f'<td>{item["size"]}</td></tr>' ) html_lines.extend([ '</table>', '</body></html>' ]) html = '\n'.join(html_lines) response_lines = [ 'HTTP/1.1 200 OK', 'Content-Type: text/html; charset=utf-8', f'Content-Length: {len(html.encode("utf-8"))}', '', html ] return '\r\n'.join(response_lines) except Exception as e: return self._create_error_response(500, f"列出目录失败: {e}") def _handle_index(self, request_info: dict) -> str: """处理首页请求""" html = """ <!DOCTYPE html> <html> <head><title>Simple HTTP Server</title></head> <body> <h1>欢迎使用Simple HTTP Server</h1> <ul> <li><a href="/time">查看服务器时间</a></li> <li><a href="/headers">查看请求头</a></li> <li><a href="/form">表单测试</a></li> <li><a href="/echo">回显测试</a></li> </ul> </body> </html> """ response_lines = [ 'HTTP/1.1 200 OK', 'Content-Type: text/html; charset=utf-8', f'Content-Length: {len(html.encode("utf-8"))}', '', html ] return '\r\n'.join(response_lines) def _handle_time(self, request_info: dict) -> str: """处理时间请求""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") response_lines = [ 'HTTP/1.1 200 OK', 'Content-Type: text/plain; charset=utf-8', f'Content-Length: {len(current_time.encode("utf-8"))}', '', current_time ] return '\r\n'.join(response_lines) def _handle_headers(self, request_info: dict) -> str: """处理请求头查看""" headers = request_info['headers'] html_lines = [ '<!DOCTYPE html>', '<html><head><title>请求头</title></head>', '<body><h1>请求头信息</h1><table border="1">', '<tr><th>Header</th><th>Value</th></tr>' ] for key, value in headers.items(): html_lines.append(f'<tr><td>{key}</td><td>{value}</td></tr>') html_lines.append('</table></body></html>') html = '\n'.join(html_lines) response_lines = [ 'HTTP/1.1 200 OK', 'Content-Type: text/html; charset=utf-8', f'Content-Length: {len(html.encode("utf-8"))}', '', html ] return '\r\n'.join(response_lines) def _handle_echo(self, request_info: dict) -> str: """处理回显请求""" method = request_info['method'] if method == 'GET': # 显示表单 query_params = request_info['query_params'] message = query_params.get('message', [''])[0] elif method == 'POST': # 解析POST数据 body = request_info['body'] parsed = urllib.parse.parse_qs(body) message = parsed.get('message', [''])[0] else: message = '' html = f""" <!DOCTYPE html> <html> <head><title>回显测试</title></head> <body> <h1>回显测试</h1> <form method="POST" action="/echo"> <label>消息: <input type="text" name="message" value="{message}"></label> <button type="submit">发送</button> </form> {"<h2>收到的消息: " + message + "</h2>"if message else""} </body> </html> """ response_lines = [ 'HTTP/1.1 200 OK', 'Content-Type: text/html; charset=utf-8', f'Content-Length: {len(html.encode("utf-8"))}', '', html ] return '\r\n'.join(response_lines) def _handle_form_get(self, request_info: dict) -> str: """处理GET表单""" html = """ <!DOCTYPE html> <html> <head><title>表单测试</title></head> <body> <h1>表单测试</h1> <form method="POST" action="/form"> <div> <label>用户名: <input type="text" name="username"></label> </div> <div> <label>邮箱: <input type="email" name="email"></label> </div> <div> <label>留言:</label><br> <textarea name="message" rows="4" cols="50"></textarea> </div> <div> <button type="submit">提交</button> </div> </form> </body> </html> """ response_lines = [ 'HTTP/1.1 200 OK', 'Content-Type: text/html; charset=utf-8', f'Content-Length: {len(html.encode("utf-8"))}', '', html ] return '\r\n'.join(response_lines) def _handle_form_post(self, request_info: dict) -> str: """处理POST表单""" body = request_info['body'] parsed = urllib.parse.parse_qs(body) result_html = '<h2>提交的数据:</h2><ul>' for key, values in parsed.items(): for value in values: result_html += f'<li>{key}: {value}</li>' result_html += '</ul>' html = f""" <!DOCTYPE html> <html> <head><title>表单提交结果</title></head> <body> <h1>表单提交成功</h1> {result_html} <p><a href="/form">返回表单</a></p> </body> </html> """ response_lines = [ 'HTTP/1.1 200 OK', 'Content-Type: text/html; charset=utf-8', f'Content-Length: {len(html.encode("utf-8"))}', '', html ] return '\r\n'.join(response_lines) def _create_error_response(self, status_code: int, message: str) -> str: """创建错误响应""" status_messages = { 400: 'Bad Request', 403: 'Forbidden', 404: 'Not Found', 500: 'Internal Server Error' } status_text = status_messages.get(status_code, 'Unknown Error') html = f""" <!DOCTYPE html> <html> <head><title>{status_code} {status_text}</title></head> <body> <h1>{status_code} {status_text}</h1> <p>{message}</p> <p><a href="/">返回首页</a></p> </body> </html> """ response_lines = [ f'HTTP/1.1 {status_code}{status_text}', 'Content-Type: text/html; charset=utf-8', f'Content-Length: {len(html.encode("utf-8"))}', '', html ] return '\r\n'.join(response_lines) def _log_request(self, client_address: tuple, request_info: dict): """记录访问日志""" ip, port = client_address method = request_info['method'] path = request_info['path'] user_agent = request_info['headers'].get('User-Agent', 'Unknown') log_line = ( f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] " f"{ip}:{port} - {method}{path} - {user_agent}" ) print(log_line) def add_route(self, method: str, path: str, handler): """添加自定义路由""" if method not in self.routes: self.routes[method] = {} self.routes[method][path] = handler def stop(self): """停止服务器""" self.is_running = False if self.server_socket: try: self.server_socket.close() except: pass print("HTTP服务器已停止")# 使用示例def run_http_server_example(): """运行HTTP服务器示例""" # 创建服务器 server = SimpleHTTPServer( host='localhost', port=8000, web_root='./web_root' # 创建一个web_root目录存放静态文件 ) # 添加自定义路由 def handle_custom(request_info): html = """ <!DOCTYPE html> <html> <head><title>自定义路由</title></head> <body> <h1>这是自定义路由</h1> <p>当前时间: """ + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + """</p> </body> </html> """ response_lines = [ 'HTTP/1.1 200 OK', 'Content-Type: text/html; charset=utf-8', f'Content-Length: {len(html.encode("utf-8"))}', '', html ] return '\r\n'.join(response_lines) server.add_route('GET', '/custom', handle_custom) # 启动服务器 try: server.start() except KeyboardInterrupt: print("\n服务器被用户中断") server.stop()# 创建测试目录和文件def setup_test_environment(): """设置测试环境""" import os # 创建web根目录 web_root = Path('./web_root') web_root.mkdir(exist_ok=True) # 创建index.html index_html = web_root / 'index.html' if not index_html.exists(): index_html.write_text(""" <!DOCTYPE html> <html> <head><title>测试页面</title></head> <body> <h1>欢迎来到测试网站</h1> <p>这是一个测试用的静态文件。</p> </body> </html> """) # 创建其他测试文件 test_txt = web_root / 'test.txt' test_txt.write_text("这是一个文本文件\n第二行\n第三行") # 创建子目录 subdir = web_root / 'subdir' subdir.mkdir(exist_ok=True) sub_file = subdir / 'readme.md' sub_file.write_text("# 子目录文件\n\n这个文件在子目录中。") print("测试环境设置完成") print(f"Web根目录: {web_root.absolute()}")
4. 网络编程实战:简单聊天室
"""网络编程实战:简单聊天室"""import socketimport threadingimport jsonimport timefrom datetime import datetimefrom typing import Dict, List, Optionalfrom dataclasses import dataclass, asdictfrom enum import Enumclass MessageType(Enum): """消息类型""" TEXT = "text" JOIN = "join" LEAVE = "leave" SYSTEM = "system" COMMAND = "command"@dataclassclass ChatMessage: """聊天消息""" type: MessageType sender: str content: str timestamp: float room: str = "default" def to_dict(self): """转换为字典""" data = asdict(self) data['type'] = self.type.value data['timestamp'] = datetime.fromtimestamp(self.timestamp).isoformat() return data @classmethod def from_dict(cls, data: dict): """从字典创建""" data['type'] = MessageType(data['type']) data['timestamp'] = datetime.fromisoformat(data['timestamp']).timestamp() return cls(**data)class ChatRoom: """聊天室""" def __init__(self, name: str = "default"): self.name = name self.clients: Dict[str, socket.socket] = {} # 用户名 -> socket self.messages: List[ChatMessage] = [] self.max_messages = 100 self.lock = threading.Lock() def add_client(self, username: str, client_socket: socket.socket): """添加客户端""" with self.lock: self.clients[username] = client_socket # 发送加入消息 join_msg = ChatMessage( type=MessageType.JOIN, sender="系统", content=f"{username} 加入了聊天室", timestamp=time.time(), room=self.name ) self.add_message(join_msg) self.broadcast(join_msg, exclude=username) # 发送历史消息给新用户 self.send_history(username) return join_msg def remove_client(self, username: str): """移除客户端""" with self.lock: if username in self.clients: del self.clients[username] # 发送离开消息 leave_msg = ChatMessage( type=MessageType.LEAVE, sender="系统", content=f"{username} 离开了聊天室", timestamp=time.time(), room=self.name ) self.add_message(leave_msg) self.broadcast(leave_msg) return leave_msg def add_message(self, message: ChatMessage): """添加消息到历史""" with self.lock: self.messages.append(message) # 限制消息数量 if len(self.messages) > self.max_messages: self.messages = self.messages[-self.max_messages:] def broadcast(self, message: ChatMessage, exclude: str = None): """广播消息""" data = json.dumps(message.to_dict()) with self.lock: for username, sock in self.clients.items(): if username == exclude: continue try: sock.sendall(data.encode('utf-8')) except: # 发送失败,客户端可能已断开 pass def send_history(self, username: str, count: int = 20): """发送历史消息""" with self.lock: if username not in self.clients: return # 获取最近的消息 recent_messages = self.messages[-count:] if self.messages else [] for message in recent_messages: try: data = json.dumps(message.to_dict()) self.clients[username].sendall(data.encode('utf-8')) except: break def send_to_user(self, username: str, message: ChatMessage): """发送消息给特定用户""" with self.lock: if username in self.clients: try: data = json.dumps(message.to_dict()) self.clients[username].sendall(data.encode('utf-8')) return True except: pass return False def get_client_count(self) -> int: """获取客户端数量""" with self.lock: return len(self.clients) def get_client_list(self) -> List[str]: """获取客户端列表""" with self.lock: return list(self.clients.keys())class ChatServer: """聊天服务器""" def __init__(self, host: str = 'localhost', port: int = 9000): self.host = host self.port = port self.server_socket = None self.is_running = False self.rooms: Dict[str, ChatRoom] = {"default": ChatRoom("default")} self.user_rooms: Dict[str, str] = {} # 用户名 -> 房间名 self.lock = threading.Lock() def start(self): """启动聊天服务器""" try: self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) print(f"聊天服务器启动在 {self.host}:{self.port}") print("等待客户端连接...") self.is_running = True while self.is_running: try: client_socket, client_address = self.server_socket.accept() # 创建线程处理客户端 thread = threading.Thread( target=self.handle_client, args=(client_socket, client_address) ) thread.daemon = True thread.start() except KeyboardInterrupt: print("\n收到中断信号") break except Exception as e: if self.is_running: print(f"接受连接时出错: {e}") except Exception as e: print(f"服务器启动失败: {e}") finally: self.stop() def handle_client(self, client_socket: socket.socket, client_address: tuple): """处理客户端连接""" client_ip, client_port = client_address username = None try: # 设置超时 client_socket.settimeout(30.0) # 第一步:用户认证/注册 auth_data = client_socket.recv(1024) if not auth_data: return try: auth_info = json.loads(auth_data.decode('utf-8')) username = auth_info.get('username') room_name = auth_info.get('room', 'default') if not username: error_msg = {"error": "需要用户名"} client_socket.sendall(json.dumps(error_msg).encode('utf-8')) return # 检查用户名是否已存在 with self.lock: if username in self.user_rooms: error_msg = {"error": "用户名已存在"} client_socket.sendall(json.dumps(error_msg).encode('utf-8')) return except json.JSONDecodeError: error_msg = {"error": "无效的认证数据"} client_socket.sendall(json.dumps(error_msg).encode('utf-8')) return # 加入房间 with self.lock: if room_name not in self.rooms: self.rooms[room_name] = ChatRoom(room_name) self.user_rooms[username] = room_name room = self.rooms[room_name] # 添加客户端到房间 join_msg = room.add_client(username, client_socket) print(f"用户 {username} 加入房间 {room_name} 来自 {client_ip}:{client_port}") # 发送欢迎消息 welcome = { "type": "system", "sender": "系统", "content": f"欢迎来到聊天室 {room_name}! 输入 /help 查看帮助", "timestamp": datetime.now().isoformat(), "room": room_name } client_socket.sendall(json.dumps(welcome).encode('utf-8')) # 主消息循环 while self.is_running: try: # 接收消息 message_data = client_socket.recv(4096) if not message_data: print(f"用户 {username} 断开连接") break # 解析消息 try: message_dict = json.loads(message_data.decode('utf-8')) message_type = MessageType(message_dict.get('type', 'text')) if message_type == MessageType.TEXT: content = message_dict.get('content', '') # 检查是否是命令 if content.startswith('/'): self._handle_command(username, room, content) else: # 创建文本消息 text_msg = ChatMessage( type=MessageType.TEXT, sender=username, content=content, timestamp=time.time(), room=room_name ) room.add_message(text_msg) room.broadcast(text_msg, exclude=username) except json.JSONDecodeError: print(f"无效的JSON数据来自 {username}") except socket.timeout: # 发送心跳 heartbeat = { "type": "system", "sender": "系统", "content": "ping", "timestamp": datetime.now().isoformat() } try: client_socket.sendall(json.dumps(heartbeat).encode('utf-8')) except: break except ConnectionError: print(f"用户 {username} 连接错误") break except Exception as e: print(f"处理客户端 {username or client_address} 时出错: {e}") finally: # 清理 if username: with self.lock: if username in self.user_rooms: room_name = self.user_rooms[username] if room_name in self.rooms: room = self.rooms[room_name] room.remove_client(username) del self.user_rooms[username] client_socket.close() print(f"客户端 {username or client_address} 处理完成") def _handle_command(self, username: str, room: ChatRoom, command: str): """处理命令""" parts = command[1:].split() cmd = parts[0].lower() if parts else "" args = parts[1:] if len(parts) > 1 else [] response = None if cmd == "help": help_text = """ 可用命令: /help - 显示此帮助 /list - 列出在线用户 /room <房间名> - 切换房间 /pm <用户名> <消息> - 私聊 /quit - 退出聊天室 """ response = ChatMessage( type=MessageType.SYSTEM, sender="系统", content=help_text, timestamp=time.time(), room=room.name ) elif cmd == "list": users = room.get_client_list() user_list = ", ".join(users) response = ChatMessage( type=MessageType.SYSTEM, sender="系统", content=f"在线用户 ({len(users)}): {user_list}", timestamp=time.time(), room=room.name ) elif cmd == "pm" and len(args) >= 2: target_user = args[0] pm_message = " ".join(args[1:]) # 查找目标用户 target_room_name = None with self.lock: if target_user in self.user_rooms: target_room_name = self.user_rooms[target_user] if target_room_name: target_room = self.rooms.get(target_room_name) if target_room: pm_msg = ChatMessage( type=MessageType.TEXT, sender=f"[私]{username}", content=pm_message, timestamp=time.time(), room=target_room.name ) # 发送给目标用户 if target_room.send_to_user(target_user, pm_msg): # 发送确认给发起者 response = ChatMessage( type=MessageType.SYSTEM, sender="系统", content=f"私聊已发送给 {target_user}", timestamp=time.time(), room=room.name ) else: response = ChatMessage( type=MessageType.SYSTEM, sender="系统", content=f"无法发送私聊给 {target_user}", timestamp=time.time(), room=room.name ) else: response = ChatMessage( type=MessageType.SYSTEM, sender="系统", content=f"用户 {target_user} 不在线", timestamp=time.time(), room=room.name ) elif cmd == "room" and args: new_room = args[0] # 注意:实际实现中需要更复杂的房间切换逻辑 response = ChatMessage( type=MessageType.SYSTEM, sender="系统", content=f"房间切换功能待实现,当前房间: {room.name}", timestamp=time.time(), room=room.name ) elif cmd == "quit": # 处理退出命令 response = ChatMessage( type=MessageType.SYSTEM, sender="系统", content="正在断开连接...", timestamp=time.time(), room=room.name ) # 实际退出逻辑在主循环中处理 else: response = ChatMessage( type=MessageType.SYSTEM, sender="系统", content=f"未知命令: {cmd},输入 /help 查看帮助", timestamp=time.time(), room=room.name ) # 发送响应 if response: room.send_to_user(username, response) def stop(self): """停止服务器""" self.is_running = False if self.server_socket: try: self.server_socket.close() except: pass # 通知所有客户端 for room_name, room in self.rooms.items(): shutdown_msg = ChatMessage( type=MessageType.SYSTEM, sender="系统", content="服务器正在关闭", timestamp=time.time(), room=room_name ) room.broadcast(shutdown_msg) print("聊天服务器已停止")class ChatClient: """聊天客户端""" def __init__(self, server_host: str = 'localhost', server_port: int = 9000): self.server_host = server_host self.server_port = server_port self.socket = None self.username = None self.current_room = "default" self.is_connected = False self.receive_thread = None def connect(self, username: str, room: str = "default") -> bool: """连接到聊天服务器""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(5.0) # 连接服务器 self.socket.connect((self.server_host, self.server_port)) # 发送认证信息 auth_info = { "username": username, "room": room } self.socket.sendall(json.dumps(auth_info).encode('utf-8')) # 接收响应 response_data = self.socket.recv(1024) response = json.loads(response_data.decode('utf-8')) if "error" in response: print(f"连接失败: {response['error']}") return False self.username = username self.current_room = room self.is_connected = True # 启动接收线程 self.receive_thread = threading.Thread(target=self._receive_messages, daemon=True) self.receive_thread.start() return True except Exception as e: print(f"连接失败: {e}") return False def _receive_messages(self): """接收消息的线程""" while self.is_connected: try: message_data = self.socket.recv(4096) if not message_data: print("服务器断开连接") self.is_connected = False break # 解析消息 message = json.loads(message_data.decode('utf-8')) self._display_message(message) except socket.timeout: continue except json.JSONDecodeError: print("收到无效的消息") except ConnectionError: print("连接错误") self.is_connected = False break except Exception as e: print(f"接收消息时出错: {e}") self.is_connected = False break def _display_message(self, message: dict): """显示消息""" msg_type = message.get('type', 'text') sender = message.get('sender', '未知') content = message.get('content', '') timestamp = message.get('timestamp', '') if msg_type == 'system' and content == 'ping': # 忽略心跳 return # 格式化时间 try: dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00')) time_str = dt.strftime('%H:%M:%S') except: time_str = timestamp # 格式化输出 if msg_type == 'join': print(f"\n[{time_str}] {content}") elif msg_type == 'leave': print(f"\n[{time_str}] {content}") elif msg_type == 'system': print(f"\n[{time_str}] 系统: {content}") else: print(f"\n[{time_str}] {sender}: {content}") # 显示输入提示 self._show_prompt() def _show_prompt(self): """显示输入提示""" print(f"\n[{self.username}] > ", end='', flush=True) def send_message(self, message: str): """发送消息""" if not self.is_connected: print("未连接到服务器") return try: message_data = { "type": "text", "content": message } self.socket.sendall(json.dumps(message_data).encode('utf-8')) except Exception as e: print(f"发送消息失败: {e}") self.is_connected = False def run_interactive(self): """运行交互式客户端""" print("=== Python聊天客户端 ===") # 获取用户名 while True: username = input("请输入用户名: ").strip() if username: break print("用户名不能为空") # 获取房间名 room = input("请输入房间名 (默认: default): ").strip() if not room: room = "default" # 连接服务器 if not self.connect(username, room): return print(f"\n已连接到聊天服务器,房间: {room}") print("输入 /help 查看命令,输入 /quit 退出") self._show_prompt() try: while self.is_connected: # 获取用户输入 try: message = input() except EOFError: print("\n输入结束") break except KeyboardInterrupt: print("\n中断连接") break # 发送消息 if message.strip().lower() == '/quit': self.send_message('/quit') time.sleep(0.5) # 等待消息发送 break else: self.send_message(message) finally: self.disconnect() def disconnect(self): """断开连接""" self.is_connected = False if self.socket: try: self.socket.close() except: pass print("已断开连接")# 运行示例def run_chat_example(): """运行聊天示例""" import sys if len(sys.argv) > 1 and sys.argv[1] == 'server': # 运行服务器 server = ChatServer() try: server.start() except KeyboardInterrupt: server.stop() else: # 运行客户端 client = ChatClient() client.run_interactive()# 多客户端测试脚本def test_multiple_clients(): """测试多个客户端""" import threading import random import time class TestClient(threading.Thread): def __init__(self, client_id): super().__init__() self.client_id = client_id self.username = f"user{client_id}" def run(self): client = ChatClient() if client.connect(self.username): print(f"客户端 {self.username} 连接成功") # 发送几条消息 for i in range(3): message = f"消息 {i+1} 来自 {self.username}" client.send_message(message) time.sleep(random.uniform(0.5, 2.0)) # 退出 client.send_message('/quit') time.sleep(0.5) client.disconnect() else: print(f"客户端 {self.username} 连接失败") # 创建并启动多个测试客户端 threads = [] for i in range(1, 4): # 3个客户端 thread = TestClient(i) thread.start() threads.append(thread) time.sleep(0.5) # 避免同时连接 # 等待所有线程完成 for thread in threads: thread.join() print("多客户端测试完成")
总结
Python网络编程是现代应用开发的基础技能,关键要点:
分层理解:从TCP/IP协议栈到应用层协议的系统性理解
socket核心:掌握socket API,理解TCP/UDP差异
并发处理:熟练使用多线程、多进程、异步I/O处理并发连接
协议实现:能够实现自定义协议和常见应用层协议
安全实践:重视网络安全,实现加密、认证和防护
核心技能矩阵:
下一步学习建议:
明天我们将学习socket编程的深入内容,探索更高级的网络编程技术和模式。