当前位置:首页>python>一天一个Python知识点——Day 135:Python网络编程基础

一天一个Python知识点——Day 135:Python网络编程基础

  • 2026-02-06 09:15:53
一天一个Python知识点——Day 135:Python网络编程基础

经过Web安全基础的学习,今天我们进入网络编程基础,这是理解互联网通信、构建网络应用和Web服务的核心技术基础。

1. 网络编程基础概念

1.1 网络分层模型

"""OSI七层模型与TCP/IP四层模型对比"""# 网络通信的分层实现NETWORK_LAYERS = {    "OSI_7层": [        "1. 物理层 (Physical) - 比特流传输",        "2. 数据链路层 (Data Link) - 帧传输",        "3. 网络层 (Network) - IP地址、路由",        "4. 传输层 (Transport) - TCP/UDP、端口",        "5. 会话层 (Session) - 建立/管理/终止会话",        "6. 表示层 (Presentation) - 数据格式转换",        "7. 应用层 (Application) - 应用程序接口"    ],    "TCP/IP_4层": [        "1. 网络接口层 (Network Interface)",        "2. 网络层 (Internet) - IP、ICMP",        "3. 传输层 (Transport) - TCP、UDP",        "4. 应用层 (Application) - HTTP、FTP、SMTP"    ]}# Python对应的模块/库PYTHON_NETWORK_MODULES = {    "传输层": {        "TCP""socket (SOCK_STREAM)",        "UDP""socket (SOCK_DGRAM)"    },    "应用层": {        "HTTP""http.client, requests",        "FTP""ftplib",        "SMTP""smtplib",        "DNS""socket.getaddrinfo()"    },    "高级抽象": {        "HTTP服务器""http.server, flask, django",        "异步网络""asyncio",        "RPC""xmlrpc, grpc"    }}

1.2 核心网络概念

"""网络核心概念速查"""import socketclass NetworkConcepts:    """网络概念演示"""    @staticmethod    def show_ip_protocol_family():        """IP协议族"""        families = {            socket.AF_INET: 'IPv4 (32位地址)',            socket.AF_INET6: 'IPv6 (128位地址)',            socket.AF_UNIX: 'UNIX域套接字'        }        return families    @staticmethod    def show_socket_types():        """套接字类型"""        types = {            socket.SOCK_STREAM: 'TCP - 可靠、面向连接、字节流',            socket.SOCK_DGRAM: 'UDP - 不可靠、无连接、数据报',            socket.SOCK_RAW: '原始套接字 - 直接操作IP层'        }        return types    @staticmethod    def explain_ports():        """端口说明"""        common_ports = {            20'FTP数据',            21'FTP控制',            22'SSH',            23'Telnet',            25'SMTP',            53'DNS',            80'HTTP',            443'HTTPS',            3306'MySQL',            5432'PostgreSQL',            6379'Redis',            8080'HTTP备用'        }        return common_ports    @staticmethod    def byte_order_example():        """字节序示例"""        import struct        # 网络字节序(大端) vs 主机字节序        value = 0x12345678        # 主机字节序转网络字节序        network_order = socket.htonl(value)        # 网络字节序转主机字节序        host_order = socket.ntohl(network_order)        return {            '原始值'hex(value),            '网络字节序'hex(network_order),            '转换回主机字节序'hex(host_order),            '说明''网络通信统一使用网络字节序(大端)'        }

2. Socket编程基础

2.1 TCP套接字编程

"""TCP套接字编程:可靠、面向连接、字节流"""import socketimport threadingimport timefrom typing import OptionalTupleimport jsonclass TCPServer:    """TCP服务器基类"""    def __init__(self, host: str = 'localhost', port: int = 8888):        self.host = host        self.port = port        self.server_socket = None        self.is_running = False        self.clients = []  # 连接的客户端列表        self.lock = threading.Lock()    def start(self):        """启动TCP服务器"""        try:            # 创建TCP套接字            self.server_socket = socket.socket(                socket.AF_INET,      # IPv4                socket.SOCK_STREAM   # TCP            )            # 设置套接字选项            self.server_socket.setsockopt(                socket.SOL_SOCKET,   # 通用套接字选项                socket.SO_REUSEADDR, # 地址重用                1            )            # 绑定地址和端口            self.server_socket.bind((self.host, self.port))            # 开始监听,设置最大等待连接数            self.server_socket.listen(5)            print(f"TCP服务器启动在 {self.host}:{self.port}")            print(f"等待客户端连接...")            self.is_running = True            # 接受连接循环            while self.is_running:                try:                    # 接受客户端连接                    client_socket, client_address = self.server_socket.accept()                    print(f"新客户端连接: {client_address}")                    # 创建客户端处理线程                    client_thread = threading.Thread(                        target=self.handle_client,                        args=(client_socket, client_address)                    )                    client_thread.daemon = True                    client_thread.start()                    # 记录客户端                    with self.lock:                        self.clients.append({                            'socket': client_socket,                            'address': client_address,                            'thread': client_thread                        })                except OSError as e:                    if self.is_running:                        print(f"接受连接时出错: {e}")                    break        except Exception as e:            print(f"服务器启动失败: {e}")        finally:            self.stop()    def handle_client(self, client_socket: socket.socket, address: Tuple[strint]):        """处理客户端连接"""        client_ip, client_port = address        try:            # 设置接收超时(秒)            client_socket.settimeout(30.0)            # 发送欢迎消息            welcome_msg = f"欢迎连接到TCP服务器!你的地址: {client_ip}:{client_port}"            client_socket.sendall(welcome_msg.encode('utf-8'))            # 通信循环            while self.is_running:                try:                    # 接收数据                    data = client_socket.recv(1024)  # 最多接收1024字节                    if not data:                        print(f"客户端 {address} 断开连接")                        break                    # 处理接收到的数据                    message = data.decode('utf-8', errors='ignore').strip()                    print(f"来自 {address} 的消息: {message}")                    # 回显消息                    response = f"服务器收到: {message}"                    client_socket.sendall(response.encode('utf-8'))                    # 特殊命令处理                    if message.lower() == 'quit':                        response = "再见!"                        client_socket.sendall(response.encode('utf-8'))                        break                except socket.timeout:                    # 超时检查,发送心跳                    try:                        client_socket.sendall(b'ping')                    except:                        break                except ConnectionResetError:                    print(f"客户端 {address} 强制关闭连接")                    break        except Exception as e:            print(f"处理客户端 {address} 时出错: {e}")        finally:            # 清理资源            client_socket.close()            with self.lock:                self.clients = [c for c in self.clients if c['socket'] != client_socket]            print(f"客户端 {address} 处理完成")    def broadcast(self, message: str, exclude_address: Tuple = None):        """广播消息给所有客户端"""        with self.lock:            for client in self.clients:                if exclude_address and client['address'] == exclude_address:                    continue                try:                    client['socket'].sendall(message.encode('utf-8'))                except:                    # 发送失败,可能是客户端已断开                    pass    def stop(self):        """停止服务器"""        self.is_running = False        # 关闭所有客户端连接        with self.lock:            for client in self.clients:                try:                    client['socket'].close()                except:                    pass            self.clients.clear()        # 关闭服务器套接字        if self.server_socket:            try:                self.server_socket.close()            except:                pass        print("TCP服务器已停止")class AdvancedTCPServer(TCPServer):    """增强的TCP服务器"""    def __init__(self, host='localhost', port=8888):        super().__init__(host, port)        self.client_data = {}  # 客户端数据存储    def handle_client(self, client_socket: socket.socket, address: Tuple[strint]):        """增强的客户端处理"""        client_ip, client_port = address        try:            # 发送欢迎消息            welcome = {                'type''welcome',                'message'f'欢迎 {client_ip}:{client_port}',                'timestamp': time.time(),                'server_info''Python TCP服务器 v1.0'            }            self._send_json(client_socket, welcome)            # 主通信循环            while self.is_running:                try:                    # 接收数据                    data = client_socket.recv(4096)                    if not data:                        print(f"客户端 {address} 断开连接")                        break                    # 尝试解析为JSON                    try:                        message = json.loads(data.decode('utf-8'))                        self._handle_json_message(client_socket, address, message)                    except json.JSONDecodeError:                        # 处理普通文本                        text_message = data.decode('utf-8', errors='ignore').strip()                        self._handle_text_message(client_socket, address, text_message)                except socket.timeout:                    # 发送心跳                    heartbeat = {                        'type''heartbeat',                        'timestamp': time.time()                    }                    self._send_json(client_socket, heartbeat)                except ConnectionError as e:                    print(f"连接错误 {address}{e}")                    break        except Exception as e:            print(f"处理客户端 {address} 时异常: {e}")        finally:            self._cleanup_client(client_socket, address)    def _send_json(self, sock: socket.socket, data: dict):        """发送JSON数据"""        json_str = json.dumps(data)        sock.sendall(json_str.encode('utf-8'))    def _handle_json_message(self, sock: socket.socket, address: Tuple, message: dict):        """处理JSON消息"""        msg_type = message.get('type''unknown')        if msg_type == 'echo':            # 回显消息            response = {                'type''echo_response',                'original': message.get('data'''),                'timestamp': time.time(),                'received_from': address            }            self._send_json(sock, response)        elif msg_type == 'broadcast':            # 广播消息            broadcast_msg = message.get('message''')            self.broadcast(                f"广播来自 {address}{broadcast_msg}",                exclude_address=address            )        elif msg_type == 'get_clients':            # 获取客户端列表            with self.lock:                clients_info = [                    {'address': c['address'], 'connected'True}                    for c in self.clients                ]            response = {                'type''clients_list',                'clients': clients_info,                'total'len(clients_info)            }            self._send_json(sock, response)        else:            # 未知消息类型            response = {                'type''error',                'message'f'未知的消息类型: {msg_type}',                'received': message            }            self._send_json(sock, response)    def _handle_text_message(self, sock: socket.socket, address: Tuple, message: str):        """处理文本消息"""        print(f"文本消息来自 {address}{message}")        # 简单命令处理        if message.startswith('/'):            command = message[1:].lower().split()[0if len(message) > 1 else ''            if command == 'time':                response = f"服务器时间: {time.ctime()}"            elif command == 'help':                response = "可用命令: /time, /help, /quit"            elif command == 'quit':                response = "再见!"                sock.sendall(response.encode('utf-8'))                return            else:                response = f"未知命令: {command}"        else:            # 普通消息回显            response = f"收到: {message}"        sock.sendall(response.encode('utf-8'))    def _cleanup_client(self, sock: socket.socket, address: Tuple):        """清理客户端资源"""        sock.close()        with self.lock:            self.clients = [c for c in self.clients if c['socket'] != sock]        # 通知其他客户端        leave_msg = f"客户端 {address} 已离开"        self.broadcast(leave_msg)        print(f"客户端 {address} 已清理")# TCP客户端实现class TCPClient:    """TCP客户端"""    def __init__(self, host: str = 'localhost', port: int = 8888):        self.host = host        self.port = port        self.socket = None        self.is_connected = False    def connect(self) -> bool:        """连接到服务器"""        try:            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)            # 设置超时            self.socket.settimeout(10.0)            # 连接服务器            self.socket.connect((self.host, self.port))            # 接收欢迎消息            welcome = self.socket.recv(1024)            print(f"服务器响应: {welcome.decode('utf-8')}")            self.is_connected = True            return True        except Exception as e:            print(f"连接失败: {e}")            return False    def send_message(self, message: str) -> Optional[str]:        """发送消息并接收响应"""        if not self.is_connected:            print("未连接到服务器")            return None        try:            # 发送消息            self.socket.sendall(message.encode('utf-8'))            # 接收响应            response = self.socket.recv(1024)            return response.decode('utf-8')        except Exception as e:            print(f"发送/接收失败: {e}")            return None    def send_json(self, data: dict) -> Optional[dict]:        """发送JSON数据"""        try:            json_str = json.dumps(data)            self.socket.sendall(json_str.encode('utf-8'))            # 接收响应            response_data = self.socket.recv(4096)            return json.loads(response_data.decode('utf-8'))        except Exception as e:            print(f"JSON通信失败: {e}")            return None    def interactive_mode(self):        """交互模式"""        if not self.connect():            return        print("进入交互模式,输入 'quit' 退出")        try:            while True:                # 获取用户输入                user_input = input("> ").strip()                if not user_input:                    continue                if user_input.lower() == 'quit':                    # 发送退出命令                    self.send_message('quit')                    break                # 发送消息                response = self.send_message(user_input)                if response:                    print(f"服务器: {response}")        except KeyboardInterrupt:            print("\n中断连接")        finally:            self.disconnect()    def disconnect(self):        """断开连接"""        if self.socket:            try:                self.socket.close()            except:                pass            self.socket = None        self.is_connected = False        print("已断开连接")

2.2 UDP套接字编程

"""UDP套接字编程:无连接、不可靠、数据报"""import socketimport threadingimport timefrom typing import OptionalTupleclass UDPServer:    """UDP服务器"""    def __init__(self, host: str = 'localhost', port: int = 9999):        self.host = host        self.port = port        self.socket = None        self.is_running = False        self.clients = set()  # 记录连接过的客户端        self.lock = threading.Lock()    def start(self):        """启动UDP服务器"""        try:            # 创建UDP套接字            self.socket = socket.socket(                socket.AF_INET,      # IPv4                socket.SOCK_DGRAM    # UDP            )            # 绑定地址和端口            self.socket.bind((self.host, self.port))            print(f"UDP服务器启动在 {self.host}:{self.port}")            print("等待数据报...")            self.is_running = True            # 接收数据报循环            while self.is_running:                try:                    # 接收数据(UDP不需要accept)                    data, client_address = self.socket.recvfrom(1024)                    # 记录客户端                    with self.lock:                        self.clients.add(client_address)                    # 处理数据                    self.handle_datagram(data, client_address)                except OSError as e:                    if self.is_running:                        print(f"接收数据报时出错: {e}")                    break        except Exception as e:            print(f"服务器启动失败: {e}")        finally:            self.stop()    def handle_datagram(self, data: bytes, client_address: Tuple[strint]):        """处理接收到的数据报"""        try:            message = data.decode('utf-8', errors='ignore').strip()            client_ip, client_port = client_address            print(f"来自 {client_ip}:{client_port} 的消息: {message}")            # 处理不同类型的数据            if message.lower() == 'ping':                response = 'pong'            elif message.lower() == 'time':                response = f"当前时间: {time.ctime()}"            elif message.lower() == 'clients':                with self.lock:                    client_count = len(self.clients)                response = f"连接过的客户端数: {client_count}"            elif message.lower() == 'quit':                response = '服务器收到退出指令'                # 向客户端发送响应后,可以选择停止服务器                self._send_response(client_address, response)                self.stop()                return            else:                response = f"收到: {message}"            # 发送响应            self._send_response(client_address, response)        except Exception as e:            print(f"处理数据报时出错: {e}")    def _send_response(self, client_address: Tuple[strint], message: str):        """发送响应到客户端"""        try:            self.socket.sendto(message.encode('utf-8'), client_address)        except Exception as e:            print(f"发送响应失败: {e}")    def broadcast(self, message: str):        """广播消息给所有连接过的客户端"""        with self.lock:            for client_address in self.clients:                try:                    self.socket.sendto(message.encode('utf-8'), client_address)                except:                    pass  # 客户端可能已不可达    def stop(self):        """停止服务器"""        self.is_running = False        if self.socket:            try:                self.socket.close()            except:                pass        print("UDP服务器已停止")class UDPClient:    """UDP客户端"""    def __init__(self, server_host: str = 'localhost', server_port: int = 9999):        self.server_address = (server_host, server_port)        self.socket = None        self.timeout = 5.0  # 超时时间(秒)    def create_socket(self):        """创建UDP套接字"""        try:            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)            self.socket.settimeout(self.timeout)            return True        except Exception as e:            print(f"创建套接字失败: {e}")            return False    def send_message(self, message: str, retry_count: int = 3) -> Optional[str]:        """发送消息到服务器(带重试)"""        if not self.socket and not self.create_socket():            return None        for attempt in range(retry_count):            try:                # 发送消息                self.socket.sendto(message.encode('utf-8'), self.server_address)                # 尝试接收响应(UDP可能没有响应)                try:                    response_data, server_addr = self.socket.recvfrom(1024)                    response = response_data.decode('utf-8')                    return response                except socket.timeout:                    if attempt < retry_count - 1:                        print(f"超时,第 {attempt + 1} 次重试...")                        continue                    else:                        print("服务器无响应")                        return None            except Exception as e:                print(f"发送失败: {e}")                if attempt < retry_count - 1:                    time.sleep(1)  # 等待后重试                else:                    return None        return None    def measure_latency(self, count: int = 10) -> dict:        """测量网络延迟"""        if not self.create_socket():            return {'error''无法创建套接字'}        latencies = []        successes = 0        for i in range(count):            try:                start_time = time.time()                # 发送ping                self.socket.sendto(b'ping'self.server_address)                # 等待pong                response, _ = self.socket.recvfrom(1024)                if response.decode('utf-8') == 'pong':                    end_time = time.time()                    latency = (end_time - start_time) * 1000  # 转换为毫秒                    latencies.append(latency)                    successes += 1            except socket.timeout:                print(f"测试 {i+1}: 超时")            except Exception as e:                print(f"测试 {i+1}: 错误 - {e}")        # 计算结果        if latencies:            results = {                'total_tests': count,                'successful': successes,                'success_rate': (successes / count) * 100,                'min_latency_ms'min(latencies),                'max_latency_ms'max(latencies),                'avg_latency_ms'sum(latencies) / len(latencies),                'all_latencies': latencies            }        else:            results = {                'total_tests': count,                'successful'0,                'success_rate'0,                'error''所有测试都失败'            }        return results    def close(self):        """关闭套接字"""        if self.socket:            try:                self.socket.close()            except:                pass            self.socket = None

3. HTTP协议实现

3.1 原始HTTP客户端

"""原始HTTP客户端实现"""import socketimport sslfrom urllib.parse import urlparsefrom typing import DictOptionalTupleimport jsonclass RawHTTPClient:    """原始HTTP客户端(直接使用socket)"""    def __init__(self):        self.default_ports = {            'http'80,            'https'443        }        self.user_agent = 'PythonRawHTTPClient/1.0'    def request(self, url: str, method: str = 'GET'                headers: Dict = None, data: str = None,                timeout: float = 10.0) -> Tuple[intDictstr]:        """发送HTTP请求"""        # 解析URL        parsed = urlparse(url)        scheme = parsed.scheme.lower()        hostname = parsed.hostname        port = parsed.port or self.default_ports.get(scheme, 80)        path = parsed.path or '/'        query = f'?{parsed.query}' if parsed.query else ''        if not hostname:            raise ValueError(f"无效的URL: {url}")        # 创建socket        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        sock.settimeout(timeout)        try:            # 连接服务器            sock.connect((hostname, port))            # 对于HTTPS,包装socket            if scheme == 'https':                context = ssl.create_default_context()                sock = context.wrap_socket(sock, server_hostname=hostname)            # 构建请求            request_lines = self._build_request(                method, hostname, port, path, query,                headers or {}, data            )            # 发送请求            sock.sendall(request_lines.encode('utf-8'))            # 接收响应            response = self._receive_response(sock)            return response        finally:            sock.close()    def _build_request(self, method: str, hostname: str, port: int,                      path: str, query: str, headers: Dict, data: str) -> str:        """构建HTTP请求"""        # 请求行        request_line = f"{method.upper()}{path}{query} HTTP/1.1\r\n"        # 默认头部        default_headers = {            'Host'f'{hostname}:{port}' if port != 80 else hostname,            'User-Agent'self.user_agent,            'Accept''*/*',            'Connection''close'        }        # 更新头部        if headers:            default_headers.update(headers)        # 如果有请求体,添加Content-Length        if data:            default_headers['Content-Length'] = str(len(data.encode('utf-8')))            if 'Content-Type' not in default_headers:                default_headers['Content-Type'] = 'application/x-www-form-urlencoded'        # 构建头部        headers_str = ''        for key, value in default_headers.items():            headers_str += f"{key}{value}\r\n"        # 完整的请求        request = request_line + headers_str + '\r\n'        # 如果有请求体,添加到末尾        if data:            request += data        return request    def _receive_response(self, sock: socket.socket) -> Tuple[intDictstr]:        """接收HTTP响应"""        BUFFER_SIZE = 4096        response_data = b''        # 接收数据直到连接关闭        while True:            try:                chunk = sock.recv(BUFFER_SIZE)                if not chunk:                    break                response_data += chunk            except socket.timeout:                break            except:                break        # 解析响应        response_str = response_data.decode('utf-8', errors='ignore')        # 分割响应头和响应体        header_end = response_str.find('\r\n\r\n')        if header_end == -1:            raise ValueError("无效的HTTP响应")        headers_part = response_str[:header_end]        body = response_str[header_end + 4:]        # 解析状态行        lines = headers_part.split('\r\n')        if not lines:            raise ValueError("无效的HTTP响应")        status_line = lines[0]        status_parts = status_line.split(' '2)        if len(status_parts) < 3:            raise ValueError("无效的状态行")        protocol, status_code, status_text = status_parts        status_code = int(status_code)        # 解析响应头        response_headers = {}        for line in lines[1:]:            if ': ' in line:                key, value = line.split(': '1)                response_headers[key] = value        return status_code, response_headers, body    def get(self, url: str, **kwargs) -> Tuple[intDictstr]:        """发送GET请求"""        return self.request(url, 'GET', **kwargs)    def post(self, url: str, data: str = None, **kwargs) -> Tuple[intDictstr]:        """发送POST请求"""        return self.request(url, 'POST', data=data, **kwargs)    def head(self, url: str, **kwargs) -> Tuple[intDictstr]:        """发送HEAD请求"""        return self.request(url, 'HEAD', **kwargs)    def get_json(self, url: str, **kwargs) -> Optional[Dict]:        """获取JSON响应"""        status, headers, body = self.get(url, **kwargs)        if 200 <= status < 300:            try:                return json.loads(body)            except json.JSONDecodeError:                return None        return None# 使用示例def test_raw_http_client():    """测试原始HTTP客户端"""    client = RawHTTPClient()    # 测试HTTP请求    print("=== 测试HTTP请求 ===")    status, headers, body = client.get('http://httpbin.org/get')    print(f"状态码: {status}")    print(f"响应头: {list(headers.keys())[:5]}...")    print(f"响应体长度: {len(body)} 字节")    # 测试HTTPS请求    print("\n=== 测试HTTPS请求 ===")    status, headers, body = client.get('https://httpbin.org/get')    print(f"状态码: {status}")    # 测试POST请求    print("\n=== 测试POST请求 ===")    post_data = 'key1=value1&key2=value2'    status, headers, body = client.post(        'https://httpbin.org/post',        data=post_data,        headers={'Content-Type''application/x-www-form-urlencoded'}    )    print(f"状态码: {status}")    # 测试JSON响应    print("\n=== 测试JSON响应 ===")    json_data = client.get_json('https://httpbin.org/json')    if json_data:        print(f"获取到JSON数据,键: {list(json_data.keys())}")    return client

3.2 简单HTTP服务器

"""简单HTTP服务器实现"""import socketimport threadingimport mimetypesimport osfrom pathlib import Pathfrom datetime import datetimeimport urllib.parseclass SimpleHTTPServer:    """简单的HTTP服务器"""    def __init__(self, host: str = 'localhost', port: int = 8000                 web_root: str = '.'):        self.host = host        self.port = port        self.web_root = Path(web_root).resolve()        self.server_socket = None        self.is_running = False        self.routes = {}  # 路由映射        # 初始化MIME类型        mimetypes.init()        # 注册默认路由        self._register_default_routes()    def _register_default_routes(self):        """注册默认路由"""        self.routes = {            'GET': {                '/'self._handle_index,                '/echo'self._handle_echo,                '/time'self._handle_time,                '/headers'self._handle_headers,                '/form'self._handle_form_get,            },            'POST': {                '/echo'self._handle_echo,                '/form'self._handle_form_post,            }        }    def start(self):        """启动HTTP服务器"""        try:            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)            self.server_socket.bind((self.host, self.port))            self.server_socket.listen(5)            print(f"HTTP服务器启动在 http://{self.host}:{self.port}")            print(f"文档根目录: {self.web_root}")            print("按 Ctrl+C 停止服务器")            self.is_running = True            while self.is_running:                try:                    client_socket, client_address = self.server_socket.accept()                    # 创建线程处理请求                    thread = threading.Thread(                        target=self._handle_request,                        args=(client_socket, client_address)                    )                    thread.daemon = True                    thread.start()                except KeyboardInterrupt:                    print("\n收到中断信号")                    break                except Exception as e:                    if self.is_running:                        print(f"接受连接时出错: {e}")        except Exception as e:            print(f"服务器启动失败: {e}")        finally:            self.stop()    def _handle_request(self, client_socket: socket.socket, client_address: tuple):        """处理HTTP请求"""        try:            # 接收请求数据            request_data = client_socket.recv(1024)            if not request_data:                return            request_text = request_data.decode('utf-8')            # 解析请求            request_info = self._parse_request(request_text)            # 记录访问日志            self._log_request(client_address, request_info)            # 处理请求            response = self._process_request(request_info)            # 发送响应            client_socket.sendall(response.encode('utf-8'))        except Exception as e:            print(f"处理请求时出错: {e}")            error_response = self._create_error_response(500str(e))            client_socket.sendall(error_response.encode('utf-8'))        finally:            client_socket.close()    def _parse_request(self, request_text: str) -> dict:        """解析HTTP请求"""        lines = request_text.strip().split('\r\n')        if not lines:            raise ValueError("空请求")        # 解析请求行        request_line = lines[0]        method, path, version = request_line.split(' '2)        # 解析请求头        headers = {}        body_start = 0        for i, line in enumerate(lines[1:], 1):            if not line.strip():  # 空行,头部结束                body_start = i + 1                break            if ': ' in line:                key, value = line.split(': '1)                headers[key] = value        # 解析请求体        body = ''        if body_start < len(lines):            body = '\r\n'.join(lines[body_start:])        # 解析查询参数        parsed_path = urllib.parse.urlparse(path)        path = parsed_path.path        query_params = urllib.parse.parse_qs(parsed_path.query)        return {            'method': method,            'path': path,            'version': version,            'headers': headers,            'body': body,            'query_params': query_params,            'raw': request_text        }    def _process_request(self, request_info: dict) -> str:        """处理请求并生成响应"""        method = request_info['method']        path = request_info['path']        # 检查是否匹配注册的路由        if method in self.routes and path in self.routes[method]:            handler = self.routes[method][path]            return handler(request_info)        # 否则作为静态文件处理        return self._handle_static_file(request_info)    def _handle_static_file(self, request_info: dict) -> str:        """处理静态文件请求"""        path = request_info['path']        # 安全性检查:防止路径遍历攻击        requested_path = (self.web_root / path.lstrip('/')).resolve()        # 确保请求路径在web根目录内        if not str(requested_path).startswith(str(self.web_root)):            return self._create_error_response(403"禁止访问")        # 检查文件是否存在        if not requested_path.exists():            return self._create_error_response(404"文件未找到")        # 如果是目录,寻找index.html        if requested_path.is_dir():            index_file = requested_path / 'index.html'            if index_file.exists():                requested_path = index_file            else:                # 列出目录内容                return self._list_directory(requested_path, path)        # 检查是否是文件        if not requested_path.is_file():            return self._create_error_response(403"不是文件")        # 读取文件内容        try:            with open(requested_path, 'rb'as f:                content = f.read()            # 获取MIME类型            mime_type, _ = mimetypes.guess_type(str(requested_path))            if not mime_type:                mime_type = 'application/octet-stream'            # 构建响应            response_lines = [                'HTTP/1.1 200 OK',                f'Content-Type: {mime_type}',                f'Content-Length: {len(content)}',                f'Date: {datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")}',                'Server: SimpleHTTPServer',                'Connection: close',                '',                ''            ]            response = '\r\n'.join(response_lines).encode('utf-8') + content            return response.decode('utf-8', errors='ignore')        except Exception as e:            return self._create_error_response(500f"读取文件失败: {e}")    def _list_directory(self, dir_path: Path, url_path: str) -> str:        """列出目录内容"""        try:            items = []            for item in dir_path.iterdir():                is_dir = item.is_dir()                name = item.name + ('/' if is_dir else '')                size = os.path.getsize(item) if not is_dir else '-'                items.append({                    'name': name,                    'size': size,                    'url'f"{url_path.rstrip('/')}/{urllib.parse.quote(name)}"                })            # 生成HTML            html_lines = [                '<!DOCTYPE html>',                '<html><head><title>目录列表</title></head>',                '<body>',                '<h1>目录: ' + url_path + '</h1>',                '<table border="1">',                '<tr><th>名称</th><th>大小</th></tr>'            ]            # 父目录链接            if url_path != '/':                parent_path = '/'.join(url_path.rstrip('/').split('/')[:-1])                html_lines.append(                    f'<tr><td><a href="{parent_path or"/"}">../</a></td><td>-</td></tr>'                )            for item in items:                html_lines.append(                    f'<tr><td><a href="{item["url"]}">{item["name"]}</a></td>'                    f'<td>{item["size"]}</td></tr>'                )            html_lines.extend([                '</table>',                '</body></html>'            ])            html = '\n'.join(html_lines)            response_lines = [                'HTTP/1.1 200 OK',                'Content-Type: text/html; charset=utf-8',                f'Content-Length: {len(html.encode("utf-8"))}',                '',                html            ]            return '\r\n'.join(response_lines)        except Exception as e:            return self._create_error_response(500f"列出目录失败: {e}")    def _handle_index(self, request_info: dict) -> str:        """处理首页请求"""        html = """        <!DOCTYPE html>        <html>        <head><title>Simple HTTP Server</title></head>        <body>            <h1>欢迎使用Simple HTTP Server</h1>            <ul>                <li><a href="/time">查看服务器时间</a></li>                <li><a href="/headers">查看请求头</a></li>                <li><a href="/form">表单测试</a></li>                <li><a href="/echo">回显测试</a></li>            </ul>        </body>        </html>        """        response_lines = [            'HTTP/1.1 200 OK',            'Content-Type: text/html; charset=utf-8',            f'Content-Length: {len(html.encode("utf-8"))}',            '',            html        ]        return '\r\n'.join(response_lines)    def _handle_time(self, request_info: dict) -> str:        """处理时间请求"""        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")        response_lines = [            'HTTP/1.1 200 OK',            'Content-Type: text/plain; charset=utf-8',            f'Content-Length: {len(current_time.encode("utf-8"))}',            '',            current_time        ]        return '\r\n'.join(response_lines)    def _handle_headers(self, request_info: dict) -> str:        """处理请求头查看"""        headers = request_info['headers']        html_lines = [            '<!DOCTYPE html>',            '<html><head><title>请求头</title></head>',            '<body><h1>请求头信息</h1><table border="1">',            '<tr><th>Header</th><th>Value</th></tr>'        ]        for key, value in headers.items():            html_lines.append(f'<tr><td>{key}</td><td>{value}</td></tr>')        html_lines.append('</table></body></html>')        html = '\n'.join(html_lines)        response_lines = [            'HTTP/1.1 200 OK',            'Content-Type: text/html; charset=utf-8',            f'Content-Length: {len(html.encode("utf-8"))}',            '',            html        ]        return '\r\n'.join(response_lines)    def _handle_echo(self, request_info: dict) -> str:        """处理回显请求"""        method = request_info['method']        if method == 'GET':            # 显示表单            query_params = request_info['query_params']            message = query_params.get('message', [''])[0]        elif method == 'POST':            # 解析POST数据            body = request_info['body']            parsed = urllib.parse.parse_qs(body)            message = parsed.get('message', [''])[0]        else:            message = ''        html = f"""        <!DOCTYPE html>        <html>        <head><title>回显测试</title></head>        <body>            <h1>回显测试</h1>            <form method="POST" action="/echo">                <label>消息: <input type="text" name="message" value="{message}"></label>                <button type="submit">发送</button>            </form>            {"<h2>收到的消息: " + message + "</h2>"if message else""}        </body>        </html>        """        response_lines = [            'HTTP/1.1 200 OK',            'Content-Type: text/html; charset=utf-8',            f'Content-Length: {len(html.encode("utf-8"))}',            '',            html        ]        return '\r\n'.join(response_lines)    def _handle_form_get(self, request_info: dict) -> str:        """处理GET表单"""        html = """        <!DOCTYPE html>        <html>        <head><title>表单测试</title></head>        <body>            <h1>表单测试</h1>            <form method="POST" action="/form">                <div>                    <label>用户名: <input type="text" name="username"></label>                </div>                <div>                    <label>邮箱: <input type="email" name="email"></label>                </div>                <div>                    <label>留言:</label><br>                    <textarea name="message" rows="4" cols="50"></textarea>                </div>                <div>                    <button type="submit">提交</button>                </div>            </form>        </body>        </html>        """        response_lines = [            'HTTP/1.1 200 OK',            'Content-Type: text/html; charset=utf-8',            f'Content-Length: {len(html.encode("utf-8"))}',            '',            html        ]        return '\r\n'.join(response_lines)    def _handle_form_post(self, request_info: dict) -> str:        """处理POST表单"""        body = request_info['body']        parsed = urllib.parse.parse_qs(body)        result_html = '<h2>提交的数据:</h2><ul>'        for key, values in parsed.items():            for value in values:                result_html += f'<li>{key}{value}</li>'        result_html += '</ul>'        html = f"""        <!DOCTYPE html>        <html>        <head><title>表单提交结果</title></head>        <body>            <h1>表单提交成功</h1>            {result_html}            <p><a href="/form">返回表单</a></p>        </body>        </html>        """        response_lines = [            'HTTP/1.1 200 OK',            'Content-Type: text/html; charset=utf-8',            f'Content-Length: {len(html.encode("utf-8"))}',            '',            html        ]        return '\r\n'.join(response_lines)    def _create_error_response(self, status_code: int, message: str) -> str:        """创建错误响应"""        status_messages = {            400'Bad Request',            403'Forbidden',            404'Not Found',            500'Internal Server Error'        }        status_text = status_messages.get(status_code, 'Unknown Error')        html = f"""        <!DOCTYPE html>        <html>        <head><title>{status_code} {status_text}</title></head>        <body>            <h1>{status_code} {status_text}</h1>            <p>{message}</p>            <p><a href="/">返回首页</a></p>        </body>        </html>        """        response_lines = [            f'HTTP/1.1 {status_code}{status_text}',            'Content-Type: text/html; charset=utf-8',            f'Content-Length: {len(html.encode("utf-8"))}',            '',            html        ]        return '\r\n'.join(response_lines)    def _log_request(self, client_address: tuple, request_info: dict):        """记录访问日志"""        ip, port = client_address        method = request_info['method']        path = request_info['path']        user_agent = request_info['headers'].get('User-Agent''Unknown')        log_line = (            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "            f"{ip}:{port} - {method}{path} - {user_agent}"        )        print(log_line)    def add_route(self, method: str, path: str, handler):        """添加自定义路由"""        if method not in self.routes:            self.routes[method] = {}        self.routes[method][path] = handler    def stop(self):        """停止服务器"""        self.is_running = False        if self.server_socket:            try:                self.server_socket.close()            except:                pass        print("HTTP服务器已停止")# 使用示例def run_http_server_example():    """运行HTTP服务器示例"""    # 创建服务器    server = SimpleHTTPServer(        host='localhost',        port=8000,        web_root='./web_root'  # 创建一个web_root目录存放静态文件    )    # 添加自定义路由    def handle_custom(request_info):        html = """        <!DOCTYPE html>        <html>        <head><title>自定义路由</title></head>        <body>            <h1>这是自定义路由</h1>            <p>当前时间: """ + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + """</p>        </body>        </html>        """        response_lines = [            'HTTP/1.1 200 OK',            'Content-Type: text/html; charset=utf-8',            f'Content-Length: {len(html.encode("utf-8"))}',            '',            html        ]        return '\r\n'.join(response_lines)    server.add_route('GET''/custom', handle_custom)    # 启动服务器    try:        server.start()    except KeyboardInterrupt:        print("\n服务器被用户中断")        server.stop()# 创建测试目录和文件def setup_test_environment():    """设置测试环境"""    import os    # 创建web根目录    web_root = Path('./web_root')    web_root.mkdir(exist_ok=True)    # 创建index.html    index_html = web_root / 'index.html'    if not index_html.exists():        index_html.write_text("""        <!DOCTYPE html>        <html>        <head><title>测试页面</title></head>        <body>            <h1>欢迎来到测试网站</h1>            <p>这是一个测试用的静态文件。</p>        </body>        </html>        """)    # 创建其他测试文件    test_txt = web_root / 'test.txt'    test_txt.write_text("这是一个文本文件\n第二行\n第三行")    # 创建子目录    subdir = web_root / 'subdir'    subdir.mkdir(exist_ok=True)    sub_file = subdir / 'readme.md'    sub_file.write_text("# 子目录文件\n\n这个文件在子目录中。")    print("测试环境设置完成")    print(f"Web根目录: {web_root.absolute()}")

4. 网络编程实战:简单聊天室

"""网络编程实战:简单聊天室"""import socketimport threadingimport jsonimport timefrom datetime import datetimefrom typing import DictListOptionalfrom dataclasses import dataclass, asdictfrom enum import Enumclass MessageType(Enum):    """消息类型"""    TEXT = "text"    JOIN = "join"    LEAVE = "leave"    SYSTEM = "system"    COMMAND = "command"@dataclassclass ChatMessage:    """聊天消息"""    type: MessageType    sender: str    content: str    timestamp: float    room: str = "default"    def to_dict(self):        """转换为字典"""        data = asdict(self)        data['type'] = self.type.value        data['timestamp'] = datetime.fromtimestamp(self.timestamp).isoformat()        return data    @classmethod    def from_dict(cls, data: dict):        """从字典创建"""        data['type'] = MessageType(data['type'])        data['timestamp'] = datetime.fromisoformat(data['timestamp']).timestamp()        return cls(**data)class ChatRoom:    """聊天室"""    def __init__(self, name: str = "default"):        self.name = name        self.clients: Dict[str, socket.socket] = {}  # 用户名 -> socket        self.messages: List[ChatMessage] = []        self.max_messages = 100        self.lock = threading.Lock()    def add_client(self, username: str, client_socket: socket.socket):        """添加客户端"""        with self.lock:            self.clients[username] = client_socket        # 发送加入消息        join_msg = ChatMessage(            type=MessageType.JOIN,            sender="系统",            content=f"{username} 加入了聊天室",            timestamp=time.time(),            room=self.name        )        self.add_message(join_msg)        self.broadcast(join_msg, exclude=username)        # 发送历史消息给新用户        self.send_history(username)        return join_msg    def remove_client(self, username: str):        """移除客户端"""        with self.lock:            if username in self.clients:                del self.clients[username]        # 发送离开消息        leave_msg = ChatMessage(            type=MessageType.LEAVE,            sender="系统",            content=f"{username} 离开了聊天室",            timestamp=time.time(),            room=self.name        )        self.add_message(leave_msg)        self.broadcast(leave_msg)        return leave_msg    def add_message(self, message: ChatMessage):        """添加消息到历史"""        with self.lock:            self.messages.append(message)            # 限制消息数量            if len(self.messages) > self.max_messages:                self.messages = self.messages[-self.max_messages:]    def broadcast(self, message: ChatMessage, exclude: str = None):        """广播消息"""        data = json.dumps(message.to_dict())        with self.lock:            for username, sock in self.clients.items():                if username == exclude:                    continue                try:                    sock.sendall(data.encode('utf-8'))                except:                    # 发送失败,客户端可能已断开                    pass    def send_history(self, username: str, count: int = 20):        """发送历史消息"""        with self.lock:            if username not in self.clients:                return            # 获取最近的消息            recent_messages = self.messages[-count:] if self.messages else []            for message in recent_messages:                try:                    data = json.dumps(message.to_dict())                    self.clients[username].sendall(data.encode('utf-8'))                except:                    break    def send_to_user(self, username: str, message: ChatMessage):        """发送消息给特定用户"""        with self.lock:            if username in self.clients:                try:                    data = json.dumps(message.to_dict())                    self.clients[username].sendall(data.encode('utf-8'))                    return True                except:                    pass        return False    def get_client_count(self) -> int:        """获取客户端数量"""        with self.lock:            return len(self.clients)    def get_client_list(self) -> List[str]:        """获取客户端列表"""        with self.lock:            return list(self.clients.keys())class ChatServer:    """聊天服务器"""    def __init__(self, host: str = 'localhost', port: int = 9000):        self.host = host        self.port = port        self.server_socket = None        self.is_running = False        self.rooms: Dict[str, ChatRoom] = {"default": ChatRoom("default")}        self.user_rooms: Dict[strstr] = {}  # 用户名 -> 房间名        self.lock = threading.Lock()    def start(self):        """启动聊天服务器"""        try:            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)            self.server_socket.bind((self.host, self.port))            self.server_socket.listen(5)            print(f"聊天服务器启动在 {self.host}:{self.port}")            print("等待客户端连接...")            self.is_running = True            while self.is_running:                try:                    client_socket, client_address = self.server_socket.accept()                    # 创建线程处理客户端                    thread = threading.Thread(                        target=self.handle_client,                        args=(client_socket, client_address)                    )                    thread.daemon = True                    thread.start()                except KeyboardInterrupt:                    print("\n收到中断信号")                    break                except Exception as e:                    if self.is_running:                        print(f"接受连接时出错: {e}")        except Exception as e:            print(f"服务器启动失败: {e}")        finally:            self.stop()    def handle_client(self, client_socket: socket.socket, client_address: tuple):        """处理客户端连接"""        client_ip, client_port = client_address        username = None        try:            # 设置超时            client_socket.settimeout(30.0)            # 第一步:用户认证/注册            auth_data = client_socket.recv(1024)            if not auth_data:                return            try:                auth_info = json.loads(auth_data.decode('utf-8'))                username = auth_info.get('username')                room_name = auth_info.get('room''default')                if not username:                    error_msg = {"error""需要用户名"}                    client_socket.sendall(json.dumps(error_msg).encode('utf-8'))                    return                # 检查用户名是否已存在                with self.lock:                    if username in self.user_rooms:                        error_msg = {"error""用户名已存在"}                        client_socket.sendall(json.dumps(error_msg).encode('utf-8'))                        return            except json.JSONDecodeError:                error_msg = {"error""无效的认证数据"}                client_socket.sendall(json.dumps(error_msg).encode('utf-8'))                return            # 加入房间            with self.lock:                if room_name not in self.rooms:                    self.rooms[room_name] = ChatRoom(room_name)                self.user_rooms[username] = room_name                room = self.rooms[room_name]            # 添加客户端到房间            join_msg = room.add_client(username, client_socket)            print(f"用户 {username} 加入房间 {room_name} 来自 {client_ip}:{client_port}")            # 发送欢迎消息            welcome = {                "type""system",                "sender""系统",                "content"f"欢迎来到聊天室 {room_name}! 输入 /help 查看帮助",                "timestamp": datetime.now().isoformat(),                "room": room_name            }            client_socket.sendall(json.dumps(welcome).encode('utf-8'))            # 主消息循环            while self.is_running:                try:                    # 接收消息                    message_data = client_socket.recv(4096)                    if not message_data:                        print(f"用户 {username} 断开连接")                        break                    # 解析消息                    try:                        message_dict = json.loads(message_data.decode('utf-8'))                        message_type = MessageType(message_dict.get('type''text'))                        if message_type == MessageType.TEXT:                            content = message_dict.get('content''')                            # 检查是否是命令                            if content.startswith('/'):                                self._handle_command(username, room, content)                            else:                                # 创建文本消息                                text_msg = ChatMessage(                                    type=MessageType.TEXT,                                    sender=username,                                    content=content,                                    timestamp=time.time(),                                    room=room_name                                )                                room.add_message(text_msg)                                room.broadcast(text_msg, exclude=username)                    except json.JSONDecodeError:                        print(f"无效的JSON数据来自 {username}")                except socket.timeout:                    # 发送心跳                    heartbeat = {                        "type""system",                        "sender""系统",                        "content""ping",                        "timestamp": datetime.now().isoformat()                    }                    try:                        client_socket.sendall(json.dumps(heartbeat).encode('utf-8'))                    except:                        break                except ConnectionError:                    print(f"用户 {username} 连接错误")                    break        except Exception as e:            print(f"处理客户端 {username or client_address} 时出错: {e}")        finally:            # 清理            if username:                with self.lock:                    if username in self.user_rooms:                        room_name = self.user_rooms[username]                        if room_name in self.rooms:                            room = self.rooms[room_name]                            room.remove_client(username)                        del self.user_rooms[username]            client_socket.close()            print(f"客户端 {username or client_address} 处理完成")    def _handle_command(self, username: str, room: ChatRoom, command: str):        """处理命令"""        parts = command[1:].split()        cmd = parts[0].lower() if parts else ""        args = parts[1:] if len(parts) > 1 else []        response = None        if cmd == "help":            help_text = """            可用命令:            /help - 显示此帮助            /list - 列出在线用户            /room <房间名> - 切换房间            /pm <用户名> <消息> - 私聊            /quit - 退出聊天室            """            response = ChatMessage(                type=MessageType.SYSTEM,                sender="系统",                content=help_text,                timestamp=time.time(),                room=room.name            )        elif cmd == "list":            users = room.get_client_list()            user_list = ", ".join(users)            response = ChatMessage(                type=MessageType.SYSTEM,                sender="系统",                content=f"在线用户 ({len(users)}): {user_list}",                timestamp=time.time(),                room=room.name            )        elif cmd == "pm" and len(args) >= 2:            target_user = args[0]            pm_message = " ".join(args[1:])            # 查找目标用户            target_room_name = None            with self.lock:                if target_user in self.user_rooms:                    target_room_name = self.user_rooms[target_user]            if target_room_name:                target_room = self.rooms.get(target_room_name)                if target_room:                    pm_msg = ChatMessage(                        type=MessageType.TEXT,                        sender=f"[私]{username}",                        content=pm_message,                        timestamp=time.time(),                        room=target_room.name                    )                    # 发送给目标用户                    if target_room.send_to_user(target_user, pm_msg):                        # 发送确认给发起者                        response = ChatMessage(                            type=MessageType.SYSTEM,                            sender="系统",                            content=f"私聊已发送给 {target_user}",                            timestamp=time.time(),                            room=room.name                        )                    else:                        response = ChatMessage(                            type=MessageType.SYSTEM,                            sender="系统",                            content=f"无法发送私聊给 {target_user}",                            timestamp=time.time(),                            room=room.name                        )            else:                response = ChatMessage(                    type=MessageType.SYSTEM,                    sender="系统",                    content=f"用户 {target_user} 不在线",                    timestamp=time.time(),                    room=room.name                )        elif cmd == "room" and args:            new_room = args[0]            # 注意:实际实现中需要更复杂的房间切换逻辑            response = ChatMessage(                type=MessageType.SYSTEM,                sender="系统",                content=f"房间切换功能待实现,当前房间: {room.name}",                timestamp=time.time(),                room=room.name            )        elif cmd == "quit":            # 处理退出命令            response = ChatMessage(                type=MessageType.SYSTEM,                sender="系统",                content="正在断开连接...",                timestamp=time.time(),                room=room.name            )            # 实际退出逻辑在主循环中处理        else:            response = ChatMessage(                type=MessageType.SYSTEM,                sender="系统",                content=f"未知命令: {cmd},输入 /help 查看帮助",                timestamp=time.time(),                room=room.name            )        # 发送响应        if response:            room.send_to_user(username, response)    def stop(self):        """停止服务器"""        self.is_running = False        if self.server_socket:            try:                self.server_socket.close()            except:                pass        # 通知所有客户端        for room_name, room in self.rooms.items():            shutdown_msg = ChatMessage(                type=MessageType.SYSTEM,                sender="系统",                content="服务器正在关闭",                timestamp=time.time(),                room=room_name            )            room.broadcast(shutdown_msg)        print("聊天服务器已停止")class ChatClient:    """聊天客户端"""    def __init__(self, server_host: str = 'localhost', server_port: int = 9000):        self.server_host = server_host        self.server_port = server_port        self.socket = None        self.username = None        self.current_room = "default"        self.is_connected = False        self.receive_thread = None    def connect(self, username: str, room: str = "default") -> bool:        """连接到聊天服务器"""        try:            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)            self.socket.settimeout(5.0)            # 连接服务器            self.socket.connect((self.server_host, self.server_port))            # 发送认证信息            auth_info = {                "username": username,                "room": room            }            self.socket.sendall(json.dumps(auth_info).encode('utf-8'))            # 接收响应            response_data = self.socket.recv(1024)            response = json.loads(response_data.decode('utf-8'))            if "error" in response:                print(f"连接失败: {response['error']}")                return False            self.username = username            self.current_room = room            self.is_connected = True            # 启动接收线程            self.receive_thread = threading.Thread(target=self._receive_messages, daemon=True)            self.receive_thread.start()            return True        except Exception as e:            print(f"连接失败: {e}")            return False    def _receive_messages(self):        """接收消息的线程"""        while self.is_connected:            try:                message_data = self.socket.recv(4096)                if not message_data:                    print("服务器断开连接")                    self.is_connected = False                    break                # 解析消息                message = json.loads(message_data.decode('utf-8'))                self._display_message(message)            except socket.timeout:                continue            except json.JSONDecodeError:                print("收到无效的消息")            except ConnectionError:                print("连接错误")                self.is_connected = False                break            except Exception as e:                print(f"接收消息时出错: {e}")                self.is_connected = False                break    def _display_message(self, message: dict):        """显示消息"""        msg_type = message.get('type''text')        sender = message.get('sender''未知')        content = message.get('content''')        timestamp = message.get('timestamp''')        if msg_type == 'system' and content == 'ping':            # 忽略心跳            return        # 格式化时间        try:            dt = datetime.fromisoformat(timestamp.replace('Z''+00:00'))            time_str = dt.strftime('%H:%M:%S')        except:            time_str = timestamp        # 格式化输出        if msg_type == 'join':            print(f"\n[{time_str}{content}")        elif msg_type == 'leave':            print(f"\n[{time_str}{content}")        elif msg_type == 'system':            print(f"\n[{time_str}] 系统: {content}")        else:            print(f"\n[{time_str}{sender}{content}")        # 显示输入提示        self._show_prompt()    def _show_prompt(self):        """显示输入提示"""        print(f"\n[{self.username}] > ", end='', flush=True)    def send_message(self, message: str):        """发送消息"""        if not self.is_connected:            print("未连接到服务器")            return        try:            message_data = {                "type""text",                "content": message            }            self.socket.sendall(json.dumps(message_data).encode('utf-8'))        except Exception as e:            print(f"发送消息失败: {e}")            self.is_connected = False    def run_interactive(self):        """运行交互式客户端"""        print("=== Python聊天客户端 ===")        # 获取用户名        while True:            username = input("请输入用户名: ").strip()            if username:                break            print("用户名不能为空")        # 获取房间名        room = input("请输入房间名 (默认: default): ").strip()        if not room:            room = "default"        # 连接服务器        if not self.connect(username, room):            return        print(f"\n已连接到聊天服务器,房间: {room}")        print("输入 /help 查看命令,输入 /quit 退出")        self._show_prompt()        try:            while self.is_connected:                # 获取用户输入                try:                    message = input()                except EOFError:                    print("\n输入结束")                    break                except KeyboardInterrupt:                    print("\n中断连接")                    break                # 发送消息                if message.strip().lower() == '/quit':                    self.send_message('/quit')                    time.sleep(0.5)  # 等待消息发送                    break                else:                    self.send_message(message)        finally:            self.disconnect()    def disconnect(self):        """断开连接"""        self.is_connected = False        if self.socket:            try:                self.socket.close()            except:                pass        print("已断开连接")# 运行示例def run_chat_example():    """运行聊天示例"""    import sys    if len(sys.argv) > 1 and sys.argv[1] == 'server':        # 运行服务器        server = ChatServer()        try:            server.start()        except KeyboardInterrupt:            server.stop()    else:        # 运行客户端        client = ChatClient()        client.run_interactive()# 多客户端测试脚本def test_multiple_clients():    """测试多个客户端"""    import threading    import random    import time    class TestClient(threading.Thread):        def __init__(self, client_id):            super().__init__()            self.client_id = client_id            self.username = f"user{client_id}"        def run(self):            client = ChatClient()            if client.connect(self.username):                print(f"客户端 {self.username} 连接成功")                # 发送几条消息                for i in range(3):                    message = f"消息 {i+1} 来自 {self.username}"                    client.send_message(message)                    time.sleep(random.uniform(0.52.0))                # 退出                client.send_message('/quit')                time.sleep(0.5)                client.disconnect()            else:                print(f"客户端 {self.username} 连接失败")    # 创建并启动多个测试客户端    threads = []    for i in range(14):  # 3个客户端        thread = TestClient(i)        thread.start()        threads.append(thread)        time.sleep(0.5)  # 避免同时连接    # 等待所有线程完成    for thread in threads:        thread.join()    print("多客户端测试完成")

总结

Python网络编程是现代应用开发的基础技能,关键要点:

  • 分层理解:从TCP/IP协议栈到应用层协议的系统性理解

  • socket核心:掌握socket API,理解TCP/UDP差异

  • 并发处理:熟练使用多线程、多进程、异步I/O处理并发连接

  • 协议实现:能够实现自定义协议和常见应用层协议

  • 安全实践:重视网络安全,实现加密、认证和防护

核心技能矩阵:

技能层级
关键能力
典型应用
基础
socket编程、TCP/UDP、基本并发
客户端工具、简单服务器
中级
异步I/O、HTTP协议、网络安全
Web服务器、API服务、网络工具
高级
高性能网络、分布式系统、协议设计
大规模系统、自定义协议、网络框架

下一步学习建议:

  • 实践网络编程项目,如聊天室、文件服务器

  • 学习高级主题:异步I/O (asyncio)、协程

  • 探索网络安全和性能优化

  • 研究分布式系统和云计算网络

明天我们将学习socket编程的深入内容,探索更高级的网络编程技术和模式。

最新文章

随机文章

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-02-08 15:21:42 HTTP/2.0 GET : https://f.mffb.com.cn/a/463574.html
  2. 运行时间 : 0.119058s [ 吞吐率:8.40req/s ] 内存消耗:5,230.81kb 文件加载:140
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=0357a5a23df410a1bb9e0c8898e5cd1e
  1. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_static.php ( 4.90 KB )
  7. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  10. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  11. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  12. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  13. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  14. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  15. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  16. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  17. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  18. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  19. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  21. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  22. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/provider.php ( 0.19 KB )
  23. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  24. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  25. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  26. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/common.php ( 0.03 KB )
  27. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  28. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  29. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/app.php ( 0.95 KB )
  30. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cache.php ( 0.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/console.php ( 0.23 KB )
  32. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cookie.php ( 0.56 KB )
  33. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/database.php ( 2.48 KB )
  34. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  35. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/filesystem.php ( 0.61 KB )
  36. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/lang.php ( 0.91 KB )
  37. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/log.php ( 1.35 KB )
  38. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/middleware.php ( 0.19 KB )
  39. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/route.php ( 1.89 KB )
  40. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/session.php ( 0.57 KB )
  41. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/trace.php ( 0.34 KB )
  42. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/view.php ( 0.82 KB )
  43. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/event.php ( 0.25 KB )
  44. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  45. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/service.php ( 0.13 KB )
  46. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/AppService.php ( 0.26 KB )
  47. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  48. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  49. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  50. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  51. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  52. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/services.php ( 0.14 KB )
  53. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  54. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  55. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  56. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  57. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  58. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  59. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  60. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  61. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  62. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  63. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  64. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  65. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  66. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  67. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  68. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  69. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  70. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  71. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  72. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  73. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  74. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  75. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  76. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  77. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  78. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  79. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  80. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  81. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  82. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  83. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/Request.php ( 0.09 KB )
  84. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  85. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/middleware.php ( 0.25 KB )
  86. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  87. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  88. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  89. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  90. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  91. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  92. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  93. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  94. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  95. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  96. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  97. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  98. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  99. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/route/app.php ( 1.72 KB )
  100. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  101. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  102. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  103. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/controller/Index.php ( 4.81 KB )
  104. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/BaseController.php ( 2.05 KB )
  105. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  106. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  108. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  109. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  110. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  111. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  112. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  113. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  114. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  115. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  116. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  117. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  118. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  119. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  120. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  121. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  122. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  123. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  124. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  125. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  126. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  127. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  128. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  129. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  130. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  131. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  132. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  133. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  134. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  135. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  136. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  137. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  138. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  139. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/runtime/temp/067d451b9a0c665040f3f1bdd3293d68.php ( 11.98 KB )
  140. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000985s ] mysql:host=127.0.0.1;port=3306;dbname=f_mffb;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.001433s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000669s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.000684s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.001353s ]
  6. SELECT * FROM `set` [ RunTime:0.000619s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.001541s ]
  8. SELECT * FROM `article` WHERE `id` = 463574 LIMIT 1 [ RunTime:0.001526s ]
  9. UPDATE `article` SET `lasttime` = 1770535302 WHERE `id` = 463574 [ RunTime:0.013578s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 66 LIMIT 1 [ RunTime:0.000713s ]
  11. SELECT * FROM `article` WHERE `id` < 463574 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.001155s ]
  12. SELECT * FROM `article` WHERE `id` > 463574 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.001020s ]
  13. SELECT * FROM `article` WHERE `id` < 463574 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.001808s ]
  14. SELECT * FROM `article` WHERE `id` < 463574 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.001715s ]
  15. SELECT * FROM `article` WHERE `id` < 463574 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.002469s ]
0.122972s