Hello,学习搭子!咱们一起闯过了Python内置模块、第三方库生态的重重关卡,是不是感觉Python已经无所不能了?但你知道吗,Python真正的魅力在于它能 “连接世界” —— 让不同的计算机、设备、应用程序相互交流!今天,我就带你走进Python网络编程的新世界,一起掌握Socket编程、TCP/UDP协议、HTTP客户端实现,让你开发的程序也能“交朋友”、“聊聊天”!
- • 浏览器输入网址后,网页内容是怎么“飞”到你屏幕上的?
如果你的答案是“好奇但不知道原理”,那今天这篇文章就是为你量身定做的。咱们要系统学习 Python网络编程的核心三剑客:Socket编程基础、TCP/UDP协议差异、以及HTTP客户端实现。通过设计 聊天应用雏形、文件传输工具、HTTP客户端 等实际案例,你将掌握如何让Python程序与外界通信。
学习目标:学完本章,你不仅能理解网络通信的基本原理,还能亲手创建TCP/UDP服务器和客户端,实现简单的网络应用,并为后续学习高并发编程打下坚实基础。预期成果:掌握Python网络编程的核心概念与工具,能独立实现简单聊天室、文件传输、HTTP请求等网络功能,理解客户端-服务器架构的本质。
Socket(套接字)是网络通信的基石,它就像计算机世界里的“电话插座”——插上就能通话!在Python中,socket模块让这一切变得简单直观。
import socket # 导入socket模块,无需额外安装!# 创建Socket对象就像买一部电话机# family: 地址族,指定使用哪种“电话号码系统”# - socket.AF_INET: IPv4地址(最常用)# - socket.AF_INET6: IPv6地址# type: 套接字类型,决定“通话方式”# - socket.SOCK_STREAM: TCP协议,像打电话一样可靠# - socket.SOCK_DGRAM: UDP协议,像发短信一样快速# 创建TCP Socket(流式套接字)tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 创建UDP Socket(数据报套接字)udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- • 服务器端:像“接听中心”,绑定固定地址,等待客户端连接
- • 客户端:像“拨号方”,知道服务器地址,主动发起连接
# 服务器端方法server_socket.bind((host, port)) # 绑定IP和端口server_socket.listen(backlog) # 开始监听,backlog=最大等待连接数client_socket, addr = server_socket.accept() # 接受客户端连接# 客户端方法client_socket.connect((host, port)) # 连接到服务器# 通用方法data = sock.recv(bufsize) # 接收数据(TCP/UDP略有不同)sent = sock.send(data) # 发送数据sock.close() # 关闭Socket# UDP专用data, addr = sock.recvfrom(bufsize) # 接收数据并获取发送方地址sent = sock.sendto(data, (host, port)) # 发送数据到指定地址
选择TCP还是UDP,就像选择“打电话”还是“发短信”——各有利弊!
- • 有流量控制:防止发送过快导致接收方“消化不良”
HTTP(超文本传输协议)是Web世界的通用语言,Python提供了多种工具来“说”这门语言。
- • 2xx:成功(200 OK,201 Created)
- • 3xx:重定向(301 Moved Permanently)
- • 4xx:客户端错误(404 Not Found)
- • 5xx:服务器错误(500 Internal Server Error)
| | | | |
| requests | | | | |
| httpx | | | | |
| urllib | | | | |
| aiohttp | | | | |
import requests# 基本GET请求response = requests.get('https://api.github.com')print(f"状态码: {response.status_code}")print(f"响应头: {response.headers['content-type']}")print(f"响应内容: {response.text[:100]}...")# 带参数的GET请求params = {'q': 'python', 'page': 1}response = requests.get('https://api.github.com/search/repositories', params=params)# POST请求(提交JSON数据)data = {'username': 'test', 'password': 'test123'}response = requests.post('https://httpbin.org/post', json=data)# 设置请求头headers = {'User-Agent': 'MyApp/1.0','Authorization': 'Bearer your_token'}response = requests.get('https://api.github.com/user', headers=headers)# 处理错误try: response = requests.get('https://api.github.com', timeout=5) response.raise_for_status() # 如果状态码不是2xx,抛出HTTPErrorexcept requests.exceptions.RequestException as e:print(f"请求失败: {e}")# 使用Session保持连接和Cookiesession = requests.Session()session.headers.update({'User-Agent': 'MyApp/1.0'})response1 = session.get('https://example.com/login')response2 = session.get('https://example.com/dashboard') # 保持登录状态
import httpx# 同步客户端(requests替代品)with httpx.Client() as client: response = client.get('https://example.com')print(response.text)# 异步客户端(高性能并发)import asyncioasyncdefmain():asyncwith httpx.AsyncClient() as client:# 并发请求 urls = ['https://example.com/1','https://example.com/2','https://example.com/3' ] tasks = [client.get(url) for url in urls] responses = await asyncio.gather(*tasks)for resp in responses:print(f"状态码: {resp.status_code}")asyncio.run(main())# HTTP/2支持import httpxclient = httpx.Client(http_versions=["HTTP/2"])response = client.get('https://http2.pro/api/v1')print(f"使用的协议: {response.http_version}") # HTTP/2# 直接调用WSGI/ASGI应用(单元测试利器)from myapp import app # Flask/FastAPI应用client = httpx.Client(app=app, base_url="http://testserver")response = client.get("/api/data") # 不启动真实服务器!
- 2. 高并发、异步编程 →
httpx(异步模式) - 4. 纯异步环境、WebSocket →
aiohttp
- • 保持连接复用(使用Session/Client)
场景:最简单的网络通信演示,客户端发送消息,服务器原样返回。
"""tcp_echo_server.pyTCP Echo服务器:接收客户端消息并原样返回"""import socketdefrun_tcp_server(host='127.0.0.1', port=8888):"""运行TCP Echo服务器"""print(f"启动TCP Echo服务器 ({host}:{port})...")# 创建TCP Socket server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 设置地址重用(避免重启时的"Address already in use"错误) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)try:# 绑定地址和端口 server_socket.bind((host, port))# 开始监听,最多允许5个客户端等待连接 server_socket.listen(5)print(f"服务器已启动,等待客户端连接...")whileTrue:# 接受客户端连接(阻塞等待) client_socket, client_addr = server_socket.accept()print(f"客户端已连接: {client_addr}")# 设置客户端Socket超时(避免无限等待) client_socket.settimeout(30.0)try:whileTrue:# 接收客户端数据(最大1024字节) data = client_socket.recv(1024)ifnot data:# 客户端断开连接print(f"客户端 {client_addr} 断开连接")break# 解码数据(假设是UTF-8编码的文本) message = data.decode('utf-8', errors='ignore')print(f"收到来自 {client_addr} 的消息: {message}")# 原样返回数据 client_socket.sendall(data)print(f"已回传消息给 {client_addr}")except socket.timeout:print(f"客户端 {client_addr} 超时")except ConnectionResetError:print(f"客户端 {client_addr} 强制断开连接")finally:# 关闭客户端连接 client_socket.close()print(f"已关闭与 {client_addr} 的连接")except KeyboardInterrupt:print("服务器正在关闭...")except Exception as e:print(f"服务器错误: {e}")finally:# 关闭服务器Socket server_socket.close()print("服务器已关闭")if __name__ == "__main__":# 运行服务器(默认监听本地8888端口) run_tcp_server()
"""tcp_echo_client.pyTCP Echo客户端:连接到服务器并发送消息"""import socketimport sysdefrun_tcp_client(host='127.0.0.1', port=8888):"""运行TCP Echo客户端"""print(f"连接到服务器 ({host}:{port})...")# 创建TCP Socket client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)try:# 连接到服务器 client_socket.connect((host, port))print("连接成功!")# 设置超时时间 client_socket.settimeout(10.0)whileTrue:# 用户输入消息 message = input("请输入消息 (输入 'exit' 退出): ").strip()if message.lower() == 'exit':print("正在断开连接...")breakifnot message:print("消息不能为空,请重新输入")continue# 发送消息到服务器 client_socket.sendall(message.encode('utf-8'))print(f"已发送: {message}")# 接收服务器回传 response = client_socket.recv(1024)ifnot response:print("服务器断开连接")break# 解码并显示回传消息 echo_message = response.decode('utf-8', errors='ignore')print(f"服务器回传: {echo_message}")except ConnectionRefusedError:print(f"连接被拒绝,请确保服务器正在运行 ({host}:{port})")except socket.timeout:print("连接超时,请检查网络或服务器状态")except KeyboardInterrupt:print("用户中断连接")except Exception as e:print(f"客户端错误: {e}")finally:# 关闭连接 client_socket.close()print("客户端已关闭")if __name__ == "__main__":# 从命令行参数获取服务器地址iflen(sys.argv) == 3: server_host = sys.argv[1] server_port = int(sys.argv[2]) run_tcp_client(server_host, server_port)else:# 使用默认地址 run_tcp_client()
# 终端1:启动服务器python tcp_echo_server.py# 终端2:启动客户端python tcp_echo_client.py# 终端3:启动另一个客户端(测试多客户端)python tcp_echo_client.py
场景:快速、无连接的消息传输,适合实时性要求高的场景。
"""udp_message_receiver.pyUDP消息接收器:接收UDP消息并显示"""import socketdefrun_udp_receiver(host='127.0.0.1', port=9999):"""运行UDP消息接收器"""print(f"启动UDP消息接收器 ({host}:{port})...")# 创建UDP Socket udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)try:# 绑定地址和端口 udp_socket.bind((host, port))print(f"接收器已启动,等待UDP消息...")whileTrue:# 接收数据和发送方地址(阻塞等待) data, sender_addr = udp_socket.recvfrom(1024)ifnot data:continue# 解码消息 message = data.decode('utf-8', errors='ignore')print(f"收到来自 {sender_addr} 的UDP消息: {message}")# 可选:发送确认回执 ack_msg = f"已收到消息: {message[:20]}..." udp_socket.sendto(ack_msg.encode('utf-8'), sender_addr)except KeyboardInterrupt:print("接收器正在关闭...")except Exception as e:print(f"接收器错误: {e}")finally:# 关闭Socket udp_socket.close()print("接收器已关闭")if __name__ == "__main__":# 运行UDP接收器 run_udp_receiver()
"""udp_message_sender.pyUDP消息发送器:发送UDP消息到指定地址"""import socketimport sysdefrun_udp_sender(target_host='127.0.0.1', target_port=9999):"""运行UDP消息发送器"""print(f"准备发送UDP消息到 ({target_host}:{target_port})...")# 创建UDP Socket udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)try:whileTrue:# 用户输入消息 message = input("请输入UDP消息 (输入 'exit' 退出): ").strip()if message.lower() == 'exit':print("正在退出...")breakifnot message:print("消息不能为空,请重新输入")continue# 发送消息到目标地址 udp_socket.sendto(message.encode('utf-8'), (target_host, target_port))print(f"已发送UDP消息: {message}")# 可选:等待接收确认回执(设置超时) udp_socket.settimeout(5.0)try: ack_data, _ = udp_socket.recvfrom(1024) ack_msg = ack_data.decode('utf-8', errors='ignore')print(f"收到服务器确认: {ack_msg}")except socket.timeout:print("未收到服务器确认(UDP可能丢失)")except KeyboardInterrupt:print("用户中断发送")except Exception as e:print(f"发送器错误: {e}")finally:# 关闭Socket udp_socket.close()print("发送器已关闭")if __name__ == "__main__":# 从命令行参数获取目标地址iflen(sys.argv) == 3: target_host = sys.argv[1] target_port = int(sys.argv[2]) run_udp_sender(target_host, target_port)else:# 使用默认地址 run_udp_sender()
# 终端1:启动UDP接收器python udp_message_receiver.py# 终端2:启动UDP发送器python udp_message_sender.py# 终端3:启动另一个UDP发送器(测试多发送方)python udp_message_sender.py 127.0.0.1 9999
场景:使用requests和httpx库与Web API交互,获取和处理数据。
"""http_client_demo.py多功能HTTP客户端演示:requests + httpx"""import requestsimport httpximport jsonfrom datetime import datetimeclassMultiHTTPClient:"""多功能HTTP客户端"""def__init__(self):self.session = requests.Session()self.async_client = httpx.AsyncClient(timeout=10.0)# 通用请求头self.headers = {'User-Agent': 'PythonNetworkClient/1.0','Accept': 'application/json','Content-Type': 'application/json; charset=utf-8' }# ========== 同步请求(requests) ==========defget_public_api_data(self, api_url):"""获取公共API数据(同步)"""print(f"正在请求API: {api_url}")try: response = self.session.get( api_url, headers=self.headers, timeout=10.0 ) response.raise_for_status() # 检查HTTP错误# 解析JSON响应 data = response.json()print(f"请求成功!状态码: {response.status_code}")print(f"响应数据类型: {type(data)}")# 显示部分数据ifisinstance(data, list) andlen(data) > 0:print(f"数据条数: {len(data)}")print(f"第一条数据: {json.dumps(data[0], indent=2, ensure_ascii=False)[:200]}...")elifisinstance(data, dict):print(f"数据字段: {list(data.keys())}")print(f"数据预览: {json.dumps(data, indent=2, ensure_ascii=False)[:200]}...")return dataexcept requests.exceptions.Timeout:print(f"请求超时: {api_url}")returnNoneexcept requests.exceptions.HTTPError as e:print(f"HTTP错误: {e}")returnNoneexcept Exception as e:print(f"请求异常: {e}")returnNonedefpost_to_api(self, api_url, payload):"""向API提交数据(同步)"""print(f"正在提交数据到: {api_url}")print(f"提交内容: {json.dumps(payload, indent=2, ensure_ascii=False)}")try: response = self.session.post( api_url, json=payload, headers=self.headers, timeout=10.0 ) response.raise_for_status()print(f"提交成功!状态码: {response.status_code}")if response.content: result = response.json()print(f"服务器响应: {json.dumps(result, indent=2, ensure_ascii=False)}")return resultreturnNoneexcept Exception as e:print(f"提交失败: {e}")returnNone# ========== 异步请求(httpx) ==========asyncdefasync_get_data(self, urls):"""异步获取多个URL的数据"""print(f"开始异步请求 {len(urls)} 个URL...") start_time = datetime.now() results = []try:# 并发请求 tasks = [self.async_client.get(url) for url in urls] responses = await asyncio.gather(*tasks, return_exceptions=True)for i, resp inenumerate(responses): url = urls[i]ifisinstance(resp, Exception):print(f"请求失败 {url}: {resp}") results.append({"url": url, "error": str(resp)})else:try: resp.raise_for_status() data = resp.json()print(f"成功 {url}: 状态码 {resp.status_code}, 数据长度 {len(str(data))}") results.append({"url": url, "data": data})except Exception as e:print(f"处理失败 {url}: {e}") results.append({"url": url, "error": str(e)})except Exception as e:print(f"异步请求异常: {e}") end_time = datetime.now() elapsed = (end_time - start_time).total_seconds()print(f"异步请求完成!耗时: {elapsed:.2f} 秒")return resultsasyncdefasync_close(self):"""关闭异步客户端"""awaitself.async_client.aclose()print("异步客户端已关闭")# ========== 工具方法 ==========defclose(self):"""关闭所有客户端"""self.session.close()print("同步会话已关闭")# ========== 示例使用 ==========defdemo_sync_requests():"""演示同步HTTP请求""" client = MultiHTTPClient()print("=" * 60)print("演示1: 同步GET请求 - 获取公共API数据")print("=" * 60)# 示例1: 获取JSONPlaceholder数据(模拟REST API) api_url = "https://jsonplaceholder.typicode.com/posts" data = client.get_public_api_data(api_url)if data:print(f"获取到 {len(data)} 篇文章")# 示例2: 获取单个文章print("\n" + "=" * 60)print("演示2: 获取单篇文章")print("=" * 60) single_post = client.get_public_api_data("https://jsonplaceholder.typicode.com/posts/1" )if single_post:print(f"文章标题: {single_post.get('title', '无标题')}")print(f"文章内容: {single_post.get('body', '无内容')[:100]}...")# 示例3: 提交数据print("\n" + "=" * 60)print("演示3: POST提交新文章")print("=" * 60) new_post = {"title": "Python网络编程实战","body": "今天我们一起学习了Python网络编程的基础知识...","userId": 1 } result = client.post_to_api("https://jsonplaceholder.typicode.com/posts", new_post )if result:print(f"服务器返回ID: {result.get('id', '未知')}")# 关闭客户端 client.close()asyncdefdemo_async_requests():"""演示异步HTTP请求""" client = MultiHTTPClient()print("\n" + "=" * 60)print("演示4: 异步并发请求")print("=" * 60)# 定义多个要请求的URL urls = ["https://jsonplaceholder.typicode.com/posts/1","https://jsonplaceholder.typicode.com/posts/2","https://jsonplaceholder.typicode.com/posts/3","https://jsonplaceholder.typicode.com/comments/1","https://jsonplaceholder.typicode.com/albums/1" ]# 异步并发请求 results = await client.async_get_data(urls)print(f"总请求数: {len(urls)}")print(f"成功数: {len([r for r in results if'data'in r])}")print(f"失败数: {len([r for r in results if'error'in r])}")# 关闭异步客户端await client.async_close()# ========== 主程序 ==========if __name__ == "__main__":import asyncio# 运行同步演示 demo_sync_requests()# 运行异步演示 asyncio.run(demo_async_requests())print("\n" + "=" * 60)print("HTTP客户端演示完成!")print("=" * 60)
场景:实现一个支持多客户端连接的简单聊天室服务器。
"""simple_chat_server.py简单聊天室服务器:支持多客户端连接"""import socketimport threadingimport timefrom datetime import datetimeclassSimpleChatServer:"""简单聊天室服务器"""def__init__(self, host='127.0.0.1', port=7777):self.host = hostself.port = portself.server_socket = Noneself.clients = {} # 客户端连接字典 {client_id: (socket, address, username)}self.client_counter = 0self.running = Falseself.lock = threading.Lock()defstart(self):"""启动服务器"""print(f"启动聊天服务器 ({self.host}:{self.port})...")# 创建TCP Socketself.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)try:# 绑定地址和端口self.server_socket.bind((self.host, self.port))# 开始监听self.server_socket.listen(10)self.running = Trueprint(f"服务器已启动在 {self.host}:{self.port}")print("等待客户端连接...")# 接受客户端连接的线程 accept_thread = threading.Thread(target=self.accept_clients) accept_thread.daemon = True accept_thread.start()# 服务器控制台self.server_console()except Exception as e:print(f"服务器启动失败: {e}")finally:self.stop()defaccept_clients(self):"""接受客户端连接"""whileself.running:try:# 接受客户端连接 client_socket, client_addr = self.server_socket.accept()# 生成客户端IDwithself.lock:self.client_counter += 1 client_id = self.client_counter# 创建客户端处理线程 client_thread = threading.Thread( target=self.handle_client, args=(client_socket, client_addr, client_id) ) client_thread.daemon = True client_thread.start()print(f"客户端 {client_id} 已连接: {client_addr}")except Exception as e:ifself.running:print(f"接受客户端连接时出错: {e}")defhandle_client(self, client_socket, client_addr, client_id):"""处理单个客户端连接""" username = f"用户{client_id}"# 添加到客户端字典withself.lock:self.clients[client_id] = (client_socket, client_addr, username)# 欢迎消息 welcome_msg = f"欢迎来到聊天室!你的用户名是: {username}\n" welcome_msg += "输入消息开始聊天,输入 '/exit' 退出\n" welcome_msg += f"当前在线用户: {len(self.clients)} 人\n"try: client_socket.sendall(welcome_msg.encode('utf-8'))# 广播新用户加入 join_msg = f"[系统] {username} 加入了聊天室\n"self.broadcast_message(join_msg, exclude_client_id=client_id)# 处理客户端消息whileself.running:# 接收消息 data = client_socket.recv(1024)ifnot data:# 客户端断开连接break# 解码消息 message = data.decode('utf-8', errors='ignore').strip()ifnot message:continue# 处理特殊命令if message.lower() == '/exit':breakelif message.lower() == '/users':# 显示在线用户列表 users_list = self.get_online_users() client_socket.sendall(users_list.encode('utf-8'))continueelif message.lower().startswith('/name '):# 更改用户名 new_username = message[6:].strip()if new_username: old_username = username username = new_usernamewithself.lock:self.clients[client_id] = (client_socket, client_addr, username) rename_msg = f"[系统] {old_username} 改名为 {username}\n"self.broadcast_message(rename_msg)continue# 格式化聊天消息 timestamp = datetime.now().strftime("%H:%M:%S") chat_msg = f"[{timestamp}] {username}: {message}\n"print(f"收到消息: {chat_msg.strip()}")# 广播消息给所有客户端self.broadcast_message(chat_msg, exclude_client_id=client_id)except (ConnectionResetError, BrokenPipeError):print(f"客户端 {client_id} 异常断开连接")except Exception as e:print(f"处理客户端 {client_id} 时出错: {e}")finally:# 客户端断开处理self.client_disconnected(client_id, username)defclient_disconnected(self, client_id, username):"""客户端断开连接处理"""withself.lock:if client_id inself.clients: client_socket, _, _ = self.clients[client_id] client_socket.close()delself.clients[client_id]# 广播用户离开消息 leave_msg = f"[系统] {username} 离开了聊天室\n"self.broadcast_message(leave_msg)print(f"客户端 {client_id} ({username}) 已断开连接")print(f"当前在线用户: {len(self.clients)} 人")defbroadcast_message(self, message, exclude_client_id=None):"""广播消息给所有客户端(排除指定客户端)"""withself.lock:for cid, (client_socket, _, _) inself.clients.items():if exclude_client_id isnotNoneand cid == exclude_client_id:continuetry: client_socket.sendall(message.encode('utf-8'))except Exception as e:print(f"向客户端 {cid} 发送消息失败: {e}")defget_online_users(self):"""获取在线用户列表"""withself.lock: users = [username for _, _, username inself.clients.values()] users_list = "在线用户列表:\n" users_list += "-" * 30 + "\n"for i, user inenumerate(users, 1): users_list += f"{i}. {user}\n" users_list += f"共计 {len(users)} 人\n"return users_listdefserver_console(self):"""服务器控制台"""print("\n服务器控制台已启动")print("可用命令:")print(" /users - 显示在线用户")print(" /broadcast <消息> - 广播系统消息")print(" /stop - 停止服务器")print(" /help - 显示帮助")print()whileself.running:try: command = input("服务器> ").strip()ifnot command:continueif command.lower() == '/stop':print("正在停止服务器...")self.running = Falsebreakelif command.lower() == '/users': users_list = self.get_online_users()print(users_list)elif command.lower().startswith('/broadcast '): system_msg = f"[系统广播] {command[11:]}\n"self.broadcast_message(system_msg)print(f"已广播系统消息: {system_msg.strip()}")elif command.lower() == '/help':print("可用命令:")print(" /users - 显示在线用户")print(" /broadcast <消息> - 广播系统消息")print(" /stop - 停止服务器")print(" /help - 显示帮助")else:print(f"未知命令: {command}")print("输入 /help 查看可用命令")except (KeyboardInterrupt, EOFError):print("\n正在停止服务器...")self.running = Falsebreakexcept Exception as e:print(f"控制台错误: {e}")defstop(self):"""停止服务器"""self.running = False# 关闭所有客户端连接withself.lock:for client_id, (client_socket, _, _) inlist(self.clients.items()):try: client_socket.close()except Exception:passself.clients.clear()# 关闭服务器Socketifself.server_socket:try:self.server_socket.close()except Exception:passprint("服务器已停止")if __name__ == "__main__":# 启动聊天服务器 server = SimpleChatServer() server.start()
"""simple_chat_client.py简单聊天室客户端"""import socketimport threadingimport sysclassSimpleChatClient:"""简单聊天室客户端"""def__init__(self, server_host='127.0.0.1', server_port=7777):self.server_host = server_hostself.server_port = server_portself.client_socket = Noneself.running = Falseself.username = "匿名用户"defconnect(self):"""连接到服务器"""print(f"正在连接到聊天服务器 ({self.server_host}:{self.server_port})...")# 创建TCP Socketself.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)try:# 连接到服务器self.client_socket.connect((self.server_host, self.server_port))self.running = Trueprint("连接成功!")print("输入消息开始聊天,输入 '/exit' 退出")print("可用命令: /users, /name <新用户名>, /help")print()# 启动接收消息线程 receive_thread = threading.Thread(target=self.receive_messages) receive_thread.daemon = True receive_thread.start()# 启动发送消息线程self.send_messages()except ConnectionRefusedError:print(f"连接被拒绝,请确保服务器正在运行 ({self.server_host}:{self.server_port})")except Exception as e:print(f"连接失败: {e}")finally:self.disconnect()defreceive_messages(self):"""接收服务器消息"""whileself.running:try:# 接收消息 data = self.client_socket.recv(1024)ifnot data:print("服务器断开连接")self.running = Falsebreak# 解码并显示消息 message = data.decode('utf-8', errors='ignore')print(message, end='') # 消息已经包含换行符except ConnectionResetError:print("服务器强制断开连接")self.running = Falsebreakexcept Exception as e:ifself.running:print(f"接收消息时出错: {e}")self.running = Falsebreakdefsend_messages(self):"""发送消息到服务器"""whileself.running:try:# 获取用户输入 message = input().strip()ifnot message:continue# 检查退出命令if message.lower() == '/exit':print("正在断开连接...")self.running = Falsebreak# 发送消息到服务器self.client_socket.sendall(message.encode('utf-8'))except (KeyboardInterrupt, EOFError):print("\n正在断开连接...")self.running = Falsebreakexcept Exception as e:print(f"发送消息时出错: {e}")self.running = Falsebreakdefdisconnect(self):"""断开连接"""self.running = Falseifself.client_socket:try:self.client_socket.close()except Exception:passprint("客户端已关闭")defmain():"""主函数"""# 从命令行参数获取服务器地址iflen(sys.argv) == 3: server_host = sys.argv[1] server_port = int(sys.argv[2])else:# 使用默认地址 server_host = '127.0.0.1' server_port = 7777# 创建并启动客户端 client = SimpleChatClient(server_host, server_port) client.connect()if __name__ == "__main__": main()
# 终端1:启动聊天服务器python simple_chat_server.py# 终端2:启动第一个聊天客户端python simple_chat_client.py# 终端3:启动第二个聊天客户端python simple_chat_client.py# 终端4:启动第三个聊天客户端python simple_chat_client.py
创建TCP服务器,当客户端连接时,返回当前时间戳。
"""练习:TCP时间戳服务器要求:1. 服务器监听8888端口2. 客户端连接时,服务器发送当前时间(格式:YYYY-MM-DD HH:MM:SS)3. 客户端接收并打印时间4. 客户端断开连接"""# 服务器端代码框架import socketfrom datetime import datetimedeftime_server():# TODO: 创建TCP Socket# TODO: 绑定地址和端口# TODO: 开始监听# TODO: 接受客户端连接# TODO: 获取当前时间并发送# TODO: 关闭连接pass# 客户端代码框架deftime_client():# TODO: 创建TCP Socket# TODO: 连接到服务器# TODO: 接收时间数据# TODO: 打印时间# TODO: 关闭连接pass
创建UDP服务器,统计接收到的消息数量并返回给客户端。
"""练习:UDP消息计数器要求:1. 服务器监听9999端口(UDP)2. 客户端发送消息到服务器3. 服务器统计这是第几条消息,返回"这是第X条消息"4. 客户端打印服务器返回"""# 提示:UDP无需连接,直接使用sendto/recvfrom
"""文件传输协议设计:1. 客户端先发送文件名(以\n结尾)2. 服务器回复"READY"3. 客户端发送文件大小(整数)4. 服务器回复"GO_AHEAD"5. 客户端分块发送文件内容6. 服务器接收并保存7. 传输完成,服务器回复"SUCCESS""""
使用requests库从公共API获取数据并进行分析。
- 1. 从JSONPlaceholder API获取帖子、评论、用户数据
"""API端点参考:- 帖子:https://jsonplaceholder.typicode.com/posts- 评论:https://jsonplaceholder.typicode.com/comments- 用户:https://jsonplaceholder.typicode.com/users统计要求:1. 每个用户写了多少篇帖子2. 每个用户的帖子收到了多少评论3. 保存为CSV:用户ID,用户名,帖子数,评论数"""
- 1. 私聊:支持用户间私密聊天(命令:/msg <用户名> <消息>)
- 3. 用户状态:显示用户在线状态(在线、忙碌、离开)
"""协议扩展设计:1. 私聊协议:/msg 目标用户 消息内容2. 文件传输:先发送文件信息(文件名、大小),确认后传输3. 状态设置:/status <在线|忙碌|离开>4. 历史记录:新用户连接时,服务器发送最近20条消息"""
"""代理服务器工作流程:1. 监听本地端口(如8080)2. 接收客户端HTTP请求3. 解析请求,确定目标服务器4. 转发请求到目标服务器5. 接收响应,返回给客户端6. 可选:缓存、过滤、日志"""
- 1. 服务器收集系统信息(CPU、内存、磁盘、网络)
"""系统设计:1. 服务器:定期收集系统信息,广播给所有客户端2. 客户端:连接服务器,接收并显示实时数据3. 数据存储:使用SQLite存储历史数据4. 图表:使用matplotlib绘制历史趋势图"""
通过今天的实战,咱们一起掌握了Python网络编程的核心技能:
- 1. Socket编程基础:网络通信的"万能插座",掌握创建、绑定、监听、连接、收发、关闭全流程
- • TCP:面向连接、可靠传输,适合文件传输、网页浏览
- • UDP:无连接、快速传输,适合实时视频、在线游戏
- • requests:简单易用的同步HTTP客户端
- • httpx:现代异步HTTP客户端,支持HTTP/2
- • TCP Echo服务器/客户端:网络通信的"Hello World"
这些技能让你的Python程序从"单机模式"升级到"联网模式":
- • ✅ 客户端-服务器架构:理解网络应用的基本模型
- • ✅ Socket通信:掌握最底层的网络编程接口
记住:网络编程的本质是进程间通信。无论多么复杂的网络应用,都是由客户端和服务器之间的数据交换构成的。理解这一点,你就掌握了网络编程的核心。