I. PyTorch Overview and Design Philosophy
1. The PyTorch Ecosystem at a Glance
import matplotlib.pyplot as plt

def visualize_pytorch_ecosystem():
    """Visualize the PyTorch ecosystem"""
    components = {
        'PyTorch Core': ['Tensor computation', 'Autograd', 'GPU acceleration'],
        'torch.nn': ['Neural network modules', 'Loss functions', 'Optimizers'],
        'torchvision': ['Computer vision', 'Datasets', 'Pretrained models'],
        'torchaudio': ['Audio processing', 'Audio datasets', 'Audio models'],
        'torchtext': ['Text processing', 'NLP datasets', 'Text models'],
        'TorchServe': ['Model deployment', 'Inference serving', 'Model management'],
        'TorchScript': ['Model serialization', 'Cross-language deployment', 'Performance optimization'],
        'PyTorch Lightning': ['Training abstraction', 'Experiment management', 'Reproducibility'],
        'Hugging Face Transformers': ['Pretrained models', 'NLP tooling', 'Model sharing'],
        'Detectron2': ['Object detection', 'Instance segmentation', 'Pose estimation']
    }

    # Create the ecosystem diagram
    fig, ax = plt.subplots(figsize=(14, 8))
    ax.axis('off')

    # Layout positions
    positions = {
        'PyTorch Core': (0.5, 0.8),
        'torch.nn': (0.3, 0.7),
        'torchvision': (0.2, 0.6),
        'torchaudio': (0.4, 0.6),
        'torchtext': (0.6, 0.6),
        'TorchServe': (0.8, 0.7),
        'TorchScript': (0.7, 0.5),
        'PyTorch Lightning': (0.3, 0.5),
        'Hugging Face Transformers': (0.5, 0.4),
        'Detectron2': (0.7, 0.3)
    }

    # Draw each component
    for component, (x, y) in positions.items():
        # Draw the box
        box = plt.Rectangle((x - 0.08, y - 0.03), 0.16, 0.06,
                            facecolor='#FF6B6B', edgecolor='#C44D58', alpha=0.8)
        ax.add_patch(box)
        # Component name
        ax.text(x, y, component, ha='center', va='center',
                fontsize=9, fontweight='bold', color='white')
        # Feature description (placed below the box so it does not overlap)
        feature_text = '\n'.join(components[component])
        ax.text(x, y - 0.05, feature_text, ha='center', va='top',
                fontsize=7, fontstyle='italic')

    # Connections between components
    connections = [
        ('PyTorch Core', 'torch.nn'),
        ('PyTorch Core', 'torchvision'),
        ('PyTorch Core', 'torchaudio'),
        ('PyTorch Core', 'torchtext'),
        ('PyTorch Core', 'TorchServe'),
        ('PyTorch Core', 'TorchScript'),
        ('torch.nn', 'PyTorch Lightning'),
        ('torchvision', 'Detectron2'),
        ('torchtext', 'Hugging Face Transformers')
    ]
    for start, end in connections:
        x1, y1 = positions[start]
        x2, y2 = positions[end]
        ax.annotate('', xy=(x2, y2 - 0.03), xytext=(x1, y1 + 0.03),
                    arrowprops=dict(arrowstyle='->', color='gray', alpha=0.6, lw=1.5))

    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.set_title('PyTorch Ecosystem Architecture', fontsize=16,
                 fontweight='bold', color='#333333')
    plt.show()

visualize_pytorch_ecosystem()
2. PyTorch vs. TensorFlow Comparison
import pandas as pd

def compare_pytorch_tensorflow():
    """Detailed comparison of PyTorch and TensorFlow"""
    comparison_data = {
        'Aspect': ['Dynamic graphs', 'Static graphs', 'API style', 'Debugging',
                   'Research friendliness', 'Production deployment',
                   'Community/ecosystem', 'Learning curve'],
        'PyTorch': [
            'Eager mode by default\ngraphs built dynamically\neasy to debug',
            'Static graphs supported\nvia TorchScript',
            'Pythonic\nobject-oriented\nintuitive',
            'Excellent Python\ndebugger support\nerrors surface immediately',
            'Very high\ngreat for prototyping\nand academic research',
            'Good\nTorchServe available\nbut relatively newer',
            'Fast-growing\nstrong research community\nactive open source',
            'Gentle\nfollows Python conventions'
        ],
        'TensorFlow': [
            'Eager mode supported\nbut traditionally static',
            'Static graphs by default\nwell-optimized performance',
            'Functional\nsometimes verbose\nsome learning cost',
            'Graph-mode debugging\ncan be complex',
            'High\nbut historically more\nproduction-oriented',
            'Very mature\nTF Serving is polished\ncomplete tooling',
            'Large and mature\nwidely used in industry\nstrong enterprise backing',
            'Steeper\nmore concepts to learn'
        ]
    }

    df = pd.DataFrame(comparison_data)
    print("PyTorch vs. TensorFlow: Detailed Comparison")
    print("=" * 120)

    # Formatted output
    for idx, row in df.iterrows():
        print(f"\n{row['Aspect']}:")
        print(f"  PyTorch:    {row['PyTorch']}")
        print(f"  TensorFlow: {row['TensorFlow']}")

    print("\n" + "=" * 120)
    print("\nRecommendations:")
    print("1. Academic research / rapid prototyping: choose PyTorch")
    print("2. Production deployment / enterprise apps: consider TensorFlow")
    print("3. Getting started: PyTorch is easier to pick up")
    print("4. Multi-framework fluency: learn both and know their strengths")
    print("5. Current trend: the two keep borrowing from each other; the gap is narrowing")

    return df

# Show the comparison
compare_pytorch_tensorflow()
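The dynamic-versus-static row in this comparison is easier to feel in code than in a table. Below is a minimal sketch (not from the comparison above) in which the same Python function runs eagerly and is also compiled once into a TorchScript graph via torch.jit.script; the function name f and the tensor shapes are arbitrary:

import torch

def f(x: torch.Tensor) -> torch.Tensor:
    # Eager mode: this Python runs line by line on every call,
    # so you can set breakpoints and inspect intermediate tensors.
    if x.sum() > 0:
        return x * 2
    return x - 1

scripted_f = torch.jit.script(f)  # compiled once; data-dependent control flow is preserved

x = torch.randn(3)
print(f(x))              # eager execution
print(scripted_f(x))     # same result from the TorchScript graph
print(scripted_f.graph)  # inspect the captured graph IR

Printing scripted_f.graph shows the static intermediate representation, which is what enables serialization and graph-level optimization later in this article.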
II. Installing and Configuring PyTorch
1. Installation Guide
def setup_pytorch_environment():
    """PyTorch environment setup guide"""
    setups = {
        'Basic install (CPU)': {
            'command': 'pip install torch torchvision torchaudio',
            'notes': 'CPU-only build; fine for learning and development',
            'verification': '''
import torch
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
'''
        },
        'GPU support (CUDA 11.8)': {
            'command': 'pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118',
            'notes': 'GPU build compiled against CUDA 11.8',
            'prerequisites': [
                'NVIDIA GPU (compute capability 3.5+)',
                'CUDA Toolkit 11.8',
                'cuDNN 8.6+'
            ]
        },
        'conda install': {
            'command': 'conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia',
            'notes': 'Install via conda (recommended)',
            'advantage': 'Handles dependencies automatically; easy environment management'
        },
        'Docker install': {
            'command': 'docker pull pytorch/pytorch:latest',
            'notes': 'Run inside a Docker container',
            'run_command': 'docker run -it --gpus all pytorch/pytorch:latest python'
        },
        'Pinned version': {
            'command': 'pip install torch==2.1.0 torchvision==0.16.0 torchaudio==2.1.0',
            'notes': 'Install a specific version (recommended for production)'
        },
        'Nightly build': {
            'command': 'pip install --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/cu118',
            'notes': 'Install the nightly build (latest features)'
        }
    }

    print("PyTorch Installation Guide")
    print("=" * 80)
    for setup_type, setup_info in setups.items():
        print(f"\n{setup_type}:")
        print(f"  Command: {setup_info['command']}")
        print(f"  Notes:   {setup_info['notes']}")
        if 'prerequisites' in setup_info:
            print("  Prerequisites:")
            for req in setup_info['prerequisites']:
                print(f"    • {req}")
        if 'advantage' in setup_info:
            print(f"  Advantage: {setup_info['advantage']}")
        if 'run_command' in setup_info:
            print(f"  Run command: {setup_info['run_command']}")
        if 'verification' in setup_info:
            print(f"  Verification code: {setup_info['verification']}")

    print("\n" + "=" * 80)
    print("\nVerify the install (copy and run the following):")
    print('''
import torch
import torchvision

# Print version info
print(f"PyTorch version: {torch.__version__}")
print(f"torchvision version: {torchvision.__version__}")

# Check CUDA
if torch.cuda.is_available():
    print("CUDA is available")
    print(f"CUDA version: {torch.version.cuda}")
    print(f"GPU count: {torch.cuda.device_count()}")
    print(f"Current GPU: {torch.cuda.current_device()}")
    print(f"GPU name: {torch.cuda.get_device_name(0)}")
else:
    print("CUDA not available; running on CPU")
''')

# Show the installation guide
setup_pytorch_environment()
2. GPU Configuration and Optimization
def configure_gpu_for_pytorch():
    """Configure PyTorch GPU usage"""
    config_code = '''
import torch
import numpy as np
import time

def configure_gpu_settings():
    """Configure GPU settings"""
    print("GPU Configuration and Optimization")
    print("=" * 60)

    # 1. Check available GPUs
    device_count = torch.cuda.device_count()
    print(f"Available GPUs: {device_count}")
    if device_count == 0:
        print("Warning: no GPU found, falling back to CPU")
        device = torch.device('cpu')
    else:
        device = torch.device('cuda:0')

        # 2. Show GPU info
        for i in range(device_count):
            gpu_name = torch.cuda.get_device_name(i)
            gpu_memory = torch.cuda.get_device_properties(i).total_memory / 1e9
            print(f"GPU {i}: {gpu_name}, memory: {gpu_memory:.1f} GB")

        # 3. Select the current device
        torch.cuda.set_device(0)
        print(f"Current GPU: {torch.cuda.current_device()}")

        # 4. Clear the GPU cache
        torch.cuda.empty_cache()

    # 5. Enable cuDNN autotuner (picks the fastest kernels for fixed input shapes)
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.enabled = True

    # 6. Seed RNGs for reproducibility
    #    (note: deterministic mode overrides the benchmark autotuner enabled above)
    torch.manual_seed(42)
    torch.cuda.manual_seed(42)
    torch.cuda.manual_seed_all(42)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    # 7. Automatic mixed precision
    try:
        from torch.cuda.amp import autocast, GradScaler
        scaler = GradScaler()
        print("Automatic mixed precision enabled")
    except ImportError:
        scaler = None
        print("Automatic mixed precision not available")

    return device, scaler

def check_gpu_performance(device):
    """Benchmark GPU performance"""
    print("\\nGPU performance test:")

    # Create test tensors
    size = 5000
    a = torch.randn(size, size, device=device)
    b = torch.randn(size, size, device=device)

    # Warm up
    _ = torch.mm(a, b)
    if device.type == 'cuda':
        torch.cuda.synchronize()

    # Time matrix multiplication
    times = []
    for _ in range(10):
        start = time.time()
        c = torch.mm(a, b)
        if device.type == 'cuda':
            torch.cuda.synchronize()  # wait for the kernel to finish
        times.append(time.time() - start)

    avg_time = np.mean(times) * 1000  # convert to milliseconds
    print(f"Matrix multiply ({size}x{size}) average time: {avg_time:.2f} ms")
    print(f"Result shape: {c.shape}")

    # Memory usage
    if device.type == 'cuda':
        memory_allocated = torch.cuda.memory_allocated() / 1e6
        memory_reserved = torch.cuda.memory_reserved() / 1e6
        print(f"Allocated memory: {memory_allocated:.1f} MB")
        print(f"Reserved memory: {memory_reserved:.1f} MB")

    return avg_time

def memory_management_demo():
    """Memory management demo"""
    print("\\nMemory management demo:")

    # 1. Monitor memory usage
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

        print("Allocating a large tensor...")
        big_tensor = torch.randn(10000, 10000, device='cuda')
        print(f"Memory after allocation: {torch.cuda.memory_allocated() / 1e6:.1f} MB")

        # Free the tensor
        del big_tensor
        torch.cuda.empty_cache()
        print(f"Memory after deletion: {torch.cuda.memory_allocated() / 1e6:.1f} MB")

        # 2. Use torch.no_grad() to save memory
        print("\\nReducing memory with torch.no_grad():")
        with torch.no_grad():
            x = torch.randn(1000, 1000, device='cuda')
            y = torch.randn(1000, 1000, device='cuda')
            z = x @ y  # no computation graph is stored
            print(f"Memory in no_grad mode: {torch.cuda.memory_allocated() / 1e6:.1f} MB")

# Run the configuration
device, scaler = configure_gpu_settings()
check_gpu_performance(device)
memory_management_demo()
'''
    print(config_code)

    print("\nCommon GPU issues and fixes:")
    print("1. CUDA version mismatch: install the PyTorch build matching your CUDA version")
    print("2. Out of memory: reduce batch_size, use gradient accumulation")
    print("3. Multi-GPU training: use torch.nn.DataParallel or DistributedDataParallel")
    print("4. Performance: enable cudnn.benchmark, use mixed precision")
    print("5. Memory leaks: del tensors promptly, call torch.cuda.empty_cache()")

# Show the GPU configuration
configure_gpu_for_pytorch()
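One caveat about the timing code above: wrapping time.time() around torch.cuda.synchronize() measures wall-clock time, including Python overhead. CUDA events measure elapsed time on the device itself. A minimal sketch, assuming a CUDA device is available (the tensor sizes are arbitrary):

import torch

assert torch.cuda.is_available(), "this sketch needs a CUDA device"

a = torch.randn(5000, 5000, device='cuda')
b = torch.randn(5000, 5000, device='cuda')
_ = a @ b  # warm-up, so one-time kernel selection cost is excluded

start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)

start.record()            # enqueue a timestamp on the current stream
c = a @ b
end.record()              # enqueue the second timestamp
torch.cuda.synchronize()  # wait until both events have actually occurred

print(f"matmul took {start.elapsed_time(end):.2f} ms on the GPU")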
III. Core PyTorch Concepts
1. Tensor Basics
import torch
import numpy as np

def tensor_basics():
    """PyTorch tensor basics"""
    print("=" * 60)
    print("PyTorch Tensor Basics")
    print("=" * 60)

    # 1. Creating tensors
    print("\n1. Creating tensors:")

    # Scalar (0-d tensor)
    scalar = torch.tensor(42)
    print(f"Scalar: {scalar}, shape: {scalar.shape}, dtype: {scalar.dtype}")

    # Vector (1-d tensor)
    vector = torch.tensor([1, 2, 3, 4, 5])
    print(f"Vector: {vector}, shape: {vector.shape}")

    # Matrix (2-d tensor)
    matrix = torch.tensor([[1, 2], [3, 4], [5, 6]])
    print(f"Matrix: {matrix}, shape: {matrix.shape}")

    # 3-d tensor
    tensor_3d = torch.tensor([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])
    print(f"3-d tensor shape: {tensor_3d.shape}")

    # 2. Special tensors
    print("\n2. Special tensors:")

    zeros = torch.zeros(2, 3)
    print(f"Zeros:\n{zeros}")

    ones = torch.ones(3, 2)
    print(f"Ones:\n{ones}")

    eye = torch.eye(3)
    print(f"Identity matrix:\n{eye}")

    random_normal = torch.randn(2, 2)  # standard normal distribution
    print(f"Normal random tensor:\n{random_normal}")

    random_uniform = torch.rand(2, 2)  # uniform on [0, 1)
    print(f"Uniform random tensor:\n{random_uniform}")

    # 3. Tensor attributes
    print("\n3. Tensor attributes:")
    tensor = torch.tensor([[1, 2, 3], [4, 5, 6]], dtype=torch.float32)
    print(f"Tensor:\n{tensor}")
    print(f"Shape: {tensor.shape}")
    print(f"Dtype: {tensor.dtype}")
    print(f"Dimensions: {tensor.dim()}")
    print(f"Number of elements: {tensor.numel()}")
    print(f"Device: {tensor.device}")
    print(f"requires_grad: {tensor.requires_grad}")
    print(f"As NumPy:\n{tensor.numpy()}")

    # 4. Tensor operations
    print("\n4. Tensor operations:")
    a = torch.tensor([[1, 2], [3, 4]], dtype=torch.float32)
    b = torch.tensor([[5, 6], [7, 8]], dtype=torch.float32)
    print(f"Addition:\n{a + b}")
    print(f"Elementwise multiply:\n{a * b}")
    print(f"Matrix multiply:\n{torch.matmul(a, b)}")
    print(f"Dot product:\n{torch.dot(a.flatten(), b.flatten())}")

    # Reshape
    original = torch.tensor([1, 2, 3, 4, 5, 6])
    reshaped = original.view(2, 3)
    print(f"Before reshape: {original.shape}, after: {reshaped.shape}")

    # Transpose
    transposed = reshaped.t()
    print(f"Transposed: {transposed.shape}")

    # Concatenate
    concat = torch.cat([a, b], dim=0)
    print(f"Concatenated (dim=0):\n{concat}")

    # 5. Broadcasting
    print("\n5. Broadcasting:")
    x = torch.tensor([1, 2, 3])
    y = torch.tensor([[10], [20], [30]])
    print(f"x: {x.shape}, y: {y.shape}")
    print(f"x + y:\n{x + y}")

    # 6. Tensor/NumPy interop
    print("\n6. Tensor/NumPy interop:")

    # PyTorch tensor -> NumPy array
    torch_tensor = torch.randn(3, 3)
    numpy_array = torch_tensor.numpy()
    print(f"Tensor -> NumPy:\n{numpy_array}")

    # NumPy array -> PyTorch tensor
    numpy_array = np.random.randn(3, 3)
    torch_tensor = torch.from_numpy(numpy_array)
    print(f"NumPy -> Tensor:\n{torch_tensor}")

    # Note: from_numpy shares memory with the array!
    print(f"Shared memory: {torch_tensor.data_ptr() == torch.from_numpy(numpy_array).data_ptr()}")

    return tensor

# Run the tensor basics
tensor_basics()
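The shared-memory warning at the end of the demo deserves a concrete illustration. In the sketch below, mutating the NumPy array changes a tensor created with torch.from_numpy, while torch.tensor copies the data; arr is a throwaway example array:

import numpy as np
import torch

arr = np.zeros(3)
shared = torch.from_numpy(arr)  # shares arr's underlying buffer
copied = torch.tensor(arr)      # makes an independent copy

arr[0] = 99.0
print(shared)  # tensor([99., 0., 0.], dtype=torch.float64) - sees the mutation
print(copied)  # tensor([0., 0., 0.], dtype=torch.float64) - unaffected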
2. Automatic Differentiation (Autograd)
def autograd_demo():
    """PyTorch autograd demo"""
    print("=" * 80)
    print("PyTorch Automatic Differentiation (Autograd)")
    print("=" * 80)

    # 1. Basic autograd
    print("\n1. Basic autograd:")
    basic_code = '''
# Create tensors that require gradients
x = torch.tensor(2.0, requires_grad=True)
y = torch.tensor(3.0, requires_grad=True)

# Define the computation
z = x**2 + y**3 + x*y
print(f"x = {x.item()}, y = {y.item()}")
print(f"z = x^2 + y^3 + x*y = {z.item()}")

# Compute gradients
z.backward()
print(f"∂z/∂x = {x.grad.item()}")  # 2x + y = 2*2 + 3 = 7
print(f"∂z/∂y = {y.grad.item()}")  # 3y^2 + x = 3*9 + 2 = 29

# Verify
print(f"Check ∂z/∂x: 2*{x.item()} + {y.item()} = {2*x.item() + y.item()}")
print(f"Check ∂z/∂y: 3*{y.item()}^2 + {x.item()} = {3*y.item()**2 + x.item()}")
'''
    print(basic_code)

    # 2. Computation graph walkthrough
    print("\n2. Computation graph walkthrough:")
    graph_code = '''
# A slightly larger computation graph
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
w = torch.tensor([0.5, 0.3, 0.2], requires_grad=True)
b = torch.tensor(1.0, requires_grad=True)

# Linear transform
y = torch.sum(x * w) + b

# Activation
z = torch.sigmoid(y)

print(f"Input x: {x}")
print(f"Weights w: {w}")
print(f"Bias b: {b}")
print(f"Linear output y = Σ(x*w) + b = {y.item()}")
print(f"Sigmoid output z = σ(y) = {z.item()}")

# Compute gradients
z.backward()
print(f"\\nGradients:")
print(f"∂z/∂x = {x.grad}")
print(f"∂z/∂w = {w.grad}")
print(f"∂z/∂b = {b.grad}")

# Verify by hand
sigmoid_y = torch.sigmoid(y)
dz_dy = sigmoid_y * (1 - sigmoid_y)  # σ'(y) = σ(y)(1-σ(y))
print(f"\\nManual check:")
print(f"∂z/∂y = σ(y)(1-σ(y)) = {dz_dy.item()}")
print(f"∂z/∂x = ∂z/∂y * ∂y/∂x = {dz_dy.item()} * w = {dz_dy.item() * w}")
'''
    print(graph_code)

    # 3. Controlling gradients
    print("\n3. Controlling gradients:")
    control_code = '''
# 1. Disable gradient tracking
print("Disabling gradient tracking:")
with torch.no_grad():
    x = torch.tensor([1.0, 2.0, 3.0])
    y = x * 2
    print(f"x = {x}")
    print(f"y = x * 2 = {y}")
    print(f"y.requires_grad = {y.requires_grad}")

# 2. Detach a tensor from the graph
print("\\nDetaching tensors:")
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = x * 2
z = y.detach() * 3  # y.detach() returns a new tensor with no gradient history
print(f"x.requires_grad = {x.requires_grad}")
print(f"y.requires_grad = {y.requires_grad}")
print(f"z.requires_grad = {z.requires_grad}")

# 3. Gradient accumulation
print("\\nGradient accumulation:")
x = torch.tensor(2.0, requires_grad=True)

# Multiple backward passes accumulate into .grad
for i in range(3):
    y = x ** 2
    y.backward(retain_graph=True)  # keep the graph for reuse
    print(f"After backward pass {i+1}, grad: {x.grad.item()}")

# Zero the gradient
x.grad.zero_()
print(f"After zeroing: {x.grad.item()}")

# 4. Custom autograd function
print("\\nCustom autograd function:")
class MyReLU(torch.autograd.Function):
    """Custom ReLU with a hand-written backward pass"""

    @staticmethod
    def forward(ctx, input):
        ctx.save_for_backward(input)  # stash the input for the backward pass
        return input.clamp(min=0)

    @staticmethod
    def backward(ctx, grad_output):
        input, = ctx.saved_tensors
        grad_input = grad_output.clone()
        grad_input[input < 0] = 0  # gradient of ReLU
        return grad_input

# Use the custom function
x = torch.tensor([-1.0, 0.5, 2.0, -0.3], requires_grad=True)
y = MyReLU.apply(x)
print(f"Input: {x}")
print(f"MyReLU output: {y}")
y.sum().backward()
print(f"Gradient: {x.grad}")
'''
    print(control_code)

    # 4. Higher-order derivatives
    print("\n4. Higher-order derivatives:")
    high_order_code = '''
# Second derivative
x = torch.tensor(3.0, requires_grad=True)
y = x**3 + 2*x**2 + x + 1

# First derivative
first_derivative = torch.autograd.grad(y, x, create_graph=True)[0]
print(f"y = x^3 + 2x^2 + x + 1")
print(f"x = {x.item()}")
print(f"y = {y.item()}")
print(f"dy/dx = {first_derivative.item()}")

# Second derivative
second_derivative = torch.autograd.grad(first_derivative, x)[0]
print(f"d²y/dx² = {second_derivative.item()}")

# Verify
print(f"\\nCheck:")
print(f"dy/dx = 3x^2 + 4x + 1 = {3*x.item()**2 + 4*x.item() + 1}")
print(f"d²y/dx² = 6x + 4 = {6*x.item() + 4}")
'''
    print(high_order_code)

    # 5. Worked example: linear regression gradients
    print("\n5. Worked example: linear regression gradients:")
    regression_code = '''
# Gradient computation for linear regression
torch.manual_seed(42)

# Generate data
n_samples = 100
X = torch.randn(n_samples, 1)  # features
true_w = 2.5
true_b = 1.0
y = true_w * X + true_b + torch.randn(n_samples, 1) * 0.1  # add noise

# Initialize parameters
w = torch.tensor(0.0, requires_grad=True)
b = torch.tensor(0.0, requires_grad=True)

# Learning rate
learning_rate = 0.01

# Training loop (manual gradient descent)
print("Training linear regression by gradient descent:")
for epoch in range(100):
    # Forward pass
    y_pred = w * X + b

    # Loss (mean squared error)
    loss = torch.mean((y_pred - y) ** 2)

    # Backward pass
    loss.backward()

    # Update parameters manually
    with torch.no_grad():
        w -= learning_rate * w.grad
        b -= learning_rate * b.grad

    # Zero the gradients
    w.grad.zero_()
    b.grad.zero_()

    if epoch % 20 == 0:
        print(f"Epoch {epoch:3d}: w = {w.item():.4f}, b = {b.item():.4f}, Loss = {loss.item():.4f}")

print(f"\\nTrue parameters:    w = {true_w}, b = {true_b}")
print(f"Learned parameters: w = {w.item():.4f}, b = {b.item():.4f}")
'''
    print(regression_code)

    return None

# Run the autograd demo
autograd_demo()
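If you write a custom autograd.Function like MyReLU above, it is worth checking the hand-written backward pass against numerical differentiation. torch.autograd.gradcheck does exactly that; a minimal sketch (gradcheck expects double-precision inputs, and random inputs essentially never land exactly on ReLU's non-differentiable point at zero):

import torch

class MyReLU(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input):
        ctx.save_for_backward(input)
        return input.clamp(min=0)

    @staticmethod
    def backward(ctx, grad_output):
        input, = ctx.saved_tensors
        grad_input = grad_output.clone()
        grad_input[input < 0] = 0
        return grad_input

# gradcheck compares the analytic Jacobian against finite differences in float64
x = torch.randn(6, dtype=torch.double, requires_grad=True)
print(torch.autograd.gradcheck(MyReLU.apply, (x,)))  # True if backward is consistent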
IV. PyTorch Neural Network Modules
1. torch.nn.Module Basics
def nn_module_basics():
    """PyTorch neural network module basics"""
    print("=" * 80)
    print("The PyTorch torch.nn Module")
    print("=" * 80)

    # 1. The base Module class
    print("\n1. The base Module class:")
    basic_module_code = '''
import torch
import torch.nn as nn
import torch.nn.functional as F

class SimpleNet(nn.Module):
    """Simple fully connected neural network"""

    def __init__(self, input_size=784, hidden_size=128, output_size=10):
        super().__init__()  # calling the parent initializer is mandatory

        # Network layers
        self.fc1 = nn.Linear(input_size, hidden_size)        # fully connected layer 1
        self.fc2 = nn.Linear(hidden_size, hidden_size // 2)  # fully connected layer 2
        self.fc3 = nn.Linear(hidden_size // 2, output_size)  # output layer

        # Dropout layer
        self.dropout = nn.Dropout(p=0.2)

        # BatchNorm layer
        self.batchnorm = nn.BatchNorm1d(hidden_size // 2)

    def forward(self, x):
        """Forward pass"""
        # Flatten the input (if it is an image)
        x = x.view(x.size(0), -1)

        # Layer 1 + ReLU + Dropout
        x = F.relu(self.fc1(x))
        x = self.dropout(x)

        # Layer 2 + ReLU + BatchNorm
        x = F.relu(self.fc2(x))
        x = self.batchnorm(x)
        x = self.dropout(x)

        # Output layer (no activation; raw logits for classification)
        x = self.fc3(x)
        return x

# Instantiate the model
model = SimpleNet(input_size=784, hidden_size=256, output_size=10)
print("Model structure:")
print(model)

# Forward pass
x = torch.randn(32, 784)  # batch size 32, 784 features
output = model(x)
print(f"\\nInput shape: {x.shape}")
print(f"Output shape: {output.shape}")

# Model parameters
print(f"\\nParameter count: {sum(p.numel() for p in model.parameters())}")
print(f"Trainable parameter count: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

# Access a specific layer
print(f"\\nfc1 weight shape: {model.fc1.weight.shape}")
print(f"fc1 bias shape: {model.fc1.bias.shape}")
'''
    print(basic_module_code)

    # 2. Common layer types
    print("\n2. Common layer types:")
    layers_code = '''
# Examples of common layers
layers_examples = {
    'Linear': nn.Linear(in_features=10, out_features=5),
    'Conv2d': nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3),
    'MaxPool2d': nn.MaxPool2d(kernel_size=2, stride=2),
    'LSTM': nn.LSTM(input_size=10, hidden_size=20, num_layers=2),
    'BatchNorm2d': nn.BatchNorm2d(num_features=16),
    'Dropout': nn.Dropout(p=0.5),
    'Embedding': nn.Embedding(num_embeddings=1000, embedding_dim=50),
    'ConvTranspose2d': nn.ConvTranspose2d(in_channels=16, out_channels=3, kernel_size=3),
    'LayerNorm': nn.LayerNorm(normalized_shape=128),
    'InstanceNorm2d': nn.InstanceNorm2d(num_features=16),
    'GroupNorm': nn.GroupNorm(num_groups=4, num_channels=16),
    'AdaptiveAvgPool2d': nn.AdaptiveAvgPool2d(output_size=(7, 7))
}

print("Common neural network layers:")
for name, layer in layers_examples.items():
    print(f"  • {name}: {layer}")
    if hasattr(layer, 'weight') and hasattr(layer.weight, 'shape'):
        print(f"    weight shape: {layer.weight.shape}")

# Activation functions
activations = {
    'ReLU': nn.ReLU(),
    'LeakyReLU': nn.LeakyReLU(negative_slope=0.01),
    'Sigmoid': nn.Sigmoid(),
    'Tanh': nn.Tanh(),
    'Softmax': nn.Softmax(dim=1),
    'LogSoftmax': nn.LogSoftmax(dim=1),
    'GELU': nn.GELU(),
    'SiLU': nn.SiLU(),  # also known as Swish
    'Mish': nn.Mish(),
    'ELU': nn.ELU(alpha=1.0)
}

print("\\nCommon activation functions:")
for name, activation in activations.items():
    print(f"  • {name}: {activation}")
'''
    print(layers_code)

    # 3. Loss functions
    print("\n3. Loss functions:")
    loss_functions_code = '''
# Common loss functions
loss_functions = {
    'Mean squared error': nn.MSELoss(),
    'Mean absolute error': nn.L1Loss(),
    'Cross entropy': nn.CrossEntropyLoss(),
    'Binary cross entropy': nn.BCELoss(),
    'BCE with logits': nn.BCEWithLogitsLoss(),
    'Negative log-likelihood': nn.NLLLoss(),
    'KL divergence': nn.KLDivLoss(),
    'Huber (smooth L1)': nn.SmoothL1Loss(),
    'Cosine embedding': nn.CosineEmbeddingLoss(),
    'Margin ranking': nn.MarginRankingLoss(),
    'Multi-label margin': nn.MultiLabelMarginLoss(),
    'Triplet margin': nn.TripletMarginLoss(),
    'CTC': nn.CTCLoss()  # connectionist temporal classification
}

print("Common loss functions:")
for name, loss_fn in loss_functions.items():
    print(f"  • {name}: {loss_fn}")

# Usage examples
print("\\nLoss function usage:")

# Classification
criterion_ce = nn.CrossEntropyLoss()
outputs = torch.randn(4, 3)           # 4 samples, 3 classes
targets = torch.tensor([0, 2, 1, 0])  # ground-truth labels
loss = criterion_ce(outputs, targets)
print(f"Cross-entropy loss: {loss.item():.4f}")

# Regression
criterion_mse = nn.MSELoss()
predictions = torch.tensor([1.2, 2.3, 3.4])
targets_reg = torch.tensor([1.0, 2.0, 3.0])
loss_mse = criterion_mse(predictions, targets_reg)
print(f"MSE loss: {loss_mse.item():.4f}")
'''
    print(loss_functions_code)

    # 4. Optimizers
    print("\n4. Optimizers:")
    optimizers_code = '''
import torch.optim as optim

# Common optimizers
optimizers = {
    'SGD': optim.SGD,
    'Momentum': lambda params: optim.SGD(params, lr=0.01, momentum=0.9),
    'Adam': optim.Adam,
    'AdamW': optim.AdamW,
    'RMSprop': optim.RMSprop,
    'Adagrad': optim.Adagrad,
    'Adadelta': optim.Adadelta,
    'Adamax': optim.Adamax,
    'NAdam': optim.NAdam,
    'RAdam': optim.RAdam
}

print("Common optimizers:")
for name, optimizer_class in optimizers.items():
    print(f"  • {name}")

# Optimizer usage
print("\\nOptimizer usage:")

# A small model
model = nn.Sequential(
    nn.Linear(10, 20),
    nn.ReLU(),
    nn.Linear(20, 1)
)

# Create optimizers
optimizer_adam = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999))
optimizer_sgd = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

print("Adam optimizer parameters:")
for param_group in optimizer_adam.param_groups:
    print(f"  learning rate: {param_group['lr']}")
    print(f"  betas: {param_group['betas']}")

print("\\nSGD optimizer parameters:")
for param_group in optimizer_sgd.param_groups:
    print(f"  learning rate: {param_group['lr']}")
    print(f"  momentum: {param_group['momentum']}")

# Learning rate schedulers
schedulers = {
    'StepLR': optim.lr_scheduler.StepLR(optimizer_adam, step_size=30, gamma=0.1),
    'MultiStepLR': optim.lr_scheduler.MultiStepLR(optimizer_adam, milestones=[30, 80], gamma=0.1),
    'ExponentialLR': optim.lr_scheduler.ExponentialLR(optimizer_adam, gamma=0.95),
    'CosineAnnealingLR': optim.lr_scheduler.CosineAnnealingLR(optimizer_adam, T_max=100),
    'ReduceLROnPlateau': optim.lr_scheduler.ReduceLROnPlateau(optimizer_adam, mode='min', patience=5),
    # cycle_momentum=False because Adam has no 'momentum' parameter group entry
    'CyclicLR': optim.lr_scheduler.CyclicLR(optimizer_adam, base_lr=0.001, max_lr=0.01,
                                            step_size_up=20, cycle_momentum=False)
}

print("\\nLearning rate schedulers:")
for name, scheduler in schedulers.items():
    print(f"  • {name}: {scheduler}")
'''
    print(optimizers_code)

    # 5. Container modules
    print("\n5. Container modules:")
    containers_code = '''
# Container modules: composing multiple modules
print("Container module examples:")

# 1. Sequential - ordered container
sequential_model = nn.Sequential(
    nn.Conv2d(3, 16, kernel_size=3, padding=1),
    nn.BatchNorm2d(16),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Conv2d(16, 32, kernel_size=3, padding=1),
    nn.BatchNorm2d(32),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Flatten(),
    nn.Linear(32 * 8 * 8, 128),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(128, 10)
)

print("Sequential model:")
print(sequential_model)

# Test the Sequential model
x = torch.randn(4, 3, 32, 32)  # 4 samples, 3 channels, 32x32
output = sequential_model(x)
print(f"Input shape: {x.shape}")
print(f"Output shape: {output.shape}")

# 2. ModuleList - a list of modules
class ModuleListNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(10, 20),
            nn.Linear(20, 30),
            nn.Linear(30, 40)
        ])
        self.activations = nn.ModuleList([
            nn.ReLU(),
            nn.ReLU(),
            nn.Sigmoid()
        ])

    def forward(self, x):
        for layer, activation in zip(self.layers, self.activations):
            x = activation(layer(x))
        return x

modulelist_model = ModuleListNet()
print(f"\\nModuleList model parameter count: {sum(p.numel() for p in modulelist_model.parameters())}")

# 3. ModuleDict - a dict of modules
class ModuleDictNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.operations = nn.ModuleDict({
            'conv1': nn.Conv2d(3, 16, 3),
            'conv2': nn.Conv2d(16, 32, 3),
            'pool': nn.MaxPool2d(2),
            'fc': nn.Linear(32 * 14 * 14, 10)  # assumes 64x64 input images
        })

    def forward(self, x):
        x = self.operations['conv1'](x)
        x = self.operations['pool'](x)
        x = self.operations['conv2'](x)
        x = self.operations['pool'](x)
        x = x.view(x.size(0), -1)
        x = self.operations['fc'](x)
        return x

moduledict_model = ModuleDictNet()
print(f"ModuleDict model parameter count: {sum(p.numel() for p in moduledict_model.parameters())}")

# 4. Parameter containers
class ParameterNet(nn.Module):
    def __init__(self):
        super().__init__()
        # Parameter tensors (registered automatically)
        self.weight = nn.Parameter(torch.randn(10, 20))
        self.bias = nn.Parameter(torch.zeros(20))

        # ParameterList
        self.weights_list = nn.ParameterList([
            nn.Parameter(torch.randn(10, 10)),
            nn.Parameter(torch.randn(10, 10))
        ])

        # ParameterDict
        self.weights_dict = nn.ParameterDict({
            'weight1': nn.Parameter(torch.randn(10, 10)),
            'weight2': nn.Parameter(torch.randn(10, 10))
        })

    def forward(self, x):
        return x @ self.weight + self.bias

param_model = ParameterNet()
print(f"\\nParameter model parameter count: {sum(p.numel() for p in param_model.parameters())}")
'''
    print(containers_code)

    return None

# Run the nn module examples
nn_module_basics()
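One recurring nn.Module pattern the listings above do not show is custom weight initialization. Module.apply walks every submodule depth-first, so all Linear layers can be initialized in one pass; the sketch below uses an arbitrary toy model, with Kaiming initialization chosen purely as an example:

import torch
import torch.nn as nn

model = nn.Sequential(nn.Linear(10, 20), nn.ReLU(), nn.Linear(20, 1))

def init_weights(m: nn.Module) -> None:
    # apply() calls this function on every submodule, depth-first
    if isinstance(m, nn.Linear):
        nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
        nn.init.zeros_(m.bias)

model.apply(init_weights)

# named_parameters() is handy for verifying what got initialized
for name, p in model.named_parameters():
    print(name, tuple(p.shape))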
2. Data Loading and Processing
def data_loading_pipeline():
    """PyTorch data loading and processing"""
    print("=" * 80)
    print("PyTorch Data Loading and Processing")
    print("=" * 80)

    # 1. Basic Dataset and DataLoader
    print("\n1. Basic Dataset and DataLoader:")
    basic_dataloader_code = '''
import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset
import numpy as np

# 1.1 TensorDataset (the simple case)
print("TensorDataset example:")

# Create mock data
x = torch.randn(1000, 10)                   # 1000 samples, 10 features
y = torch.randint(0, 2, (1000, 1)).float()  # binary labels

# Create a TensorDataset
dataset = TensorDataset(x, y)
print(f"Dataset size: {len(dataset)}")
print(f"Sample shape: {dataset[0][0].shape}, label shape: {dataset[0][1].shape}")

# Create a DataLoader
dataloader = DataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    num_workers=2,    # worker processes loading data in parallel
    pin_memory=True   # speeds up host-to-GPU transfer when using a GPU
)

print(f"Number of batches: {len(dataloader)}")

# Iterate over the DataLoader
for batch_idx, (batch_x, batch_y) in enumerate(dataloader):
    if batch_idx == 0:
        print(f"First batch: input shape {batch_x.shape}, label shape {batch_y.shape}")
    break

# 1.2 Custom Dataset
print("\\nCustom Dataset example:")

class CustomDataset(Dataset):
    """Custom dataset class"""

    def __init__(self, data_path, transform=None):
        """
        Initialize the dataset.

        Args:
            data_path: path to the data
            transform: optional transform applied to each sample
        """
        # Real code would load data from data_path;
        # for this example we generate random data instead.
        self.data = torch.randn(1000, 3, 32, 32)     # 1000 RGB 32x32 images
        self.labels = torch.randint(0, 10, (1000,))  # 10 classes
        self.transform = transform

    def __len__(self):
        """Return the dataset size"""
        return len(self.data)

    def __getitem__(self, idx):
        """Fetch a single sample"""
        image = self.data[idx]
        label = self.labels[idx]

        # Apply the transform, if any
        if self.transform:
            image = self.transform(image)

        return image, label

# Create the custom dataset
custom_dataset = CustomDataset("fake_path")
print(f"Custom dataset size: {len(custom_dataset)}")

# 2. Transforms
print("\\n2. Transforms:")
import torchvision.transforms as transforms

# Image transforms
image_transforms = transforms.Compose([
    transforms.Resize((224, 224)),                    # resize
    transforms.RandomHorizontalFlip(p=0.5),           # random horizontal flip
    transforms.RandomRotation(degrees=15),            # random rotation
    transforms.ColorJitter(brightness=0.2, contrast=0.2,
                           saturation=0.2, hue=0.1),  # color jitter
    transforms.ToTensor(),                            # convert to tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # normalize (ImageNet statistics)
                         std=[0.229, 0.224, 0.225])
])

print("Image transform pipeline:")
for transform in image_transforms.transforms:
    print(f"  • {transform}")

# Text transform example
print("\\nText transform example:")
text_transforms = transforms.Compose([
    # In practice this would be a tokenizer
    lambda x: x.lower(),  # lowercase
    lambda x: x.strip(),  # strip whitespace
    # add more text processing steps...
])

# 3. torchvision datasets
print("\\n3. Built-in torchvision datasets:")

# Note: the data is downloaded on first use
try:
    import torchvision
    from torchvision import datasets

    # MNIST example
    mnist_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    # Training set
    mnist_train = datasets.MNIST(
        root='./data',
        train=True,
        download=True,  # download if not already present
        transform=mnist_transform
    )

    # Test set
    mnist_test = datasets.MNIST(
        root='./data',
        train=False,
        download=True,
        transform=mnist_transform
    )

    print(f"MNIST training set size: {len(mnist_train)}")
    print(f"MNIST test set size: {len(mnist_test)}")

    # Other common datasets
    datasets_list = [
        ('CIFAR-10', datasets.CIFAR10),
        ('CIFAR-100', datasets.CIFAR100),
        ('ImageNet', datasets.ImageNet),
        ('FashionMNIST', datasets.FashionMNIST),
        ('COCO', datasets.CocoDetection),
        ('VOC', datasets.VOCDetection)
    ]

    print("\\nOther torchvision datasets:")
    for name, dataset_class in datasets_list:
        print(f"  • {name}")

except ImportError:
    print("torchvision not available; install with: pip install torchvision")
'''
    print(basic_dataloader_code)

    # 2. Advanced data loading techniques
    print("\n2. Advanced data loading techniques:")
    advanced_dataloader_code = '''
# 1. Custom collate_fn
print("Custom collate_fn example:")

def custom_collate_fn(batch):
    """Custom batching function"""
    # batch is a list of the items returned by __getitem__
    images = []
    labels = []
    additional_info = []

    for item in batch:
        # assume each sample is (image, label, info)
        image, label, info = item
        images.append(image)
        labels.append(label)
        additional_info.append(info)

    # Stack tensors
    images = torch.stack(images, dim=0)
    labels = torch.tensor(labels)

    return images, labels, additional_info

# 2. Samplers
from torch.utils.data import Sampler, RandomSampler, SequentialSampler, WeightedRandomSampler

print("\\nSampler examples:")

# Random sampler
random_sampler = RandomSampler(mnist_train)
print(f"Random sampler: {random_sampler}")

# Sequential sampler
sequential_sampler = SequentialSampler(mnist_train)
print(f"Sequential sampler: {sequential_sampler}")

# Weighted random sampler (for class imbalance)
weights = [1.0] * len(mnist_train)  # in practice, per-sample weights
weighted_sampler = WeightedRandomSampler(weights, num_samples=1000, replacement=True)
print(f"Weighted random sampler: {weighted_sampler}")

# 3. BatchSampler
from torch.utils.data import BatchSampler
batch_sampler = BatchSampler(random_sampler, batch_size=32, drop_last=False)
print(f"Batch sampler: {batch_sampler}")

# 4. Distributed sampler (for multi-GPU training)
from torch.utils.data.distributed import DistributedSampler
# distributed_sampler = DistributedSampler(mnist_train)
# print(f"Distributed sampler: {distributed_sampler}")

# 5. Advanced DataLoader parameters
print("\\nAdvanced DataLoader parameters:")
advanced_dataloader = DataLoader(
    mnist_train,
    batch_size=64,
    shuffle=True,
    num_workers=4,            # parallel worker processes
    pin_memory=True,          # faster GPU transfer
    prefetch_factor=2,        # batches prefetched per worker
    persistent_workers=True,  # keep workers alive between epochs
    drop_last=True            # drop the final incomplete batch
)

print("Advanced DataLoader configuration:")
print(f"  batch_size: {advanced_dataloader.batch_size}")
print(f"  num_workers: {advanced_dataloader.num_workers}")
print(f"  pin_memory: {advanced_dataloader.pin_memory}")
print(f"  drop_last: {advanced_dataloader.drop_last}")

# 6. Ways to iterate over a DataLoader
print("\\nWays to iterate over a DataLoader:")

# Option 1: iterate directly
print("Option 1: direct iteration")
for batch in advanced_dataloader:
    images, labels = batch
    print(f"Batch shape: {images.shape}")
    break

# Option 2: enumerate for batch indices
print("\\nOption 2: enumerate")
for batch_idx, (images, labels) in enumerate(advanced_dataloader):
    if batch_idx == 0:
        print(f"Batch {batch_idx}: {images.shape}")
    break

# Option 3: tqdm progress bar
print("\\nOption 3: tqdm progress bar")
try:
    from tqdm import tqdm

    pbar = tqdm(advanced_dataloader, desc="Training", total=len(advanced_dataloader))
    for images, labels in pbar:
        # the training step goes here
        # pbar.set_description(f"Training (loss: {loss:.4f})")
        break
    print("Progress shown with tqdm")
except ImportError:
    print("tqdm not installed; install with: pip install tqdm")
'''
    print(advanced_dataloader_code)

    # 3. Practical example: a complete data pipeline
    print("\n3. Practical example: a complete data pipeline:")
    practical_pipeline_code = '''
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from PIL import Image
import os

class ImageClassificationDataset(Dataset):
    """Image classification dataset"""

    def __init__(self, root_dir, transform=None, split='train'):
        """
        Initialize the dataset.

        Args:
            root_dir: data root directory
            transform: transforms to apply
            split: 'train' or 'val'
        """
        self.root_dir = root_dir
        self.transform = transform
        self.split = split

        # Collect samples
        self.image_paths = []
        self.labels = []
        self.class_to_idx = {}

        # Expected directory layout:
        # root_dir/
        #   train/
        #     class1/
        #       img1.jpg
        #       img2.jpg
        #     class2/
        #       img1.jpg
        #   val/
        #     class1/
        #     class2/
        split_dir = os.path.join(root_dir, split)
        if os.path.exists(split_dir):
            # Discover classes
            classes = sorted(os.listdir(split_dir))
            self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(classes)}

            # Walk every class
            for class_name in classes:
                class_dir = os.path.join(split_dir, class_name)
                class_idx = self.class_to_idx[class_name]

                # Walk every image in this class
                for img_name in os.listdir(class_dir):
                    if img_name.lower().endswith(('.png', '.jpg', '.jpeg')):
                        img_path = os.path.join(class_dir, img_name)
                        self.image_paths.append(img_path)
                        self.labels.append(class_idx)
        else:
            print(f"Warning: directory {split_dir} does not exist")

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # Load the image
        try:
            image = Image.open(img_path).convert('RGB')
        except Exception as e:
            print(f"Could not load image {img_path}: {e}")
            # Return a placeholder
            image = Image.new('RGB', (224, 224), color='white')

        # Apply the transform
        if self.transform:
            image = self.transform(image)

        return image, label

    def get_class_distribution(self):
        """Class distribution of the dataset"""
        from collections import Counter
        return Counter(self.labels)

# Training transforms (with augmentation)
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2,
                           saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Validation transforms (no augmentation)
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

def create_data_loaders(data_dir, batch_size=32, num_workers=4):
    """Create train and validation loaders"""
    train_dataset = ImageClassificationDataset(
        root_dir=data_dir, transform=train_transform, split='train'
    )
    val_dataset = ImageClassificationDataset(
        root_dir=data_dir, transform=val_transform, split='val'
    )

    print(f"Training set size: {len(train_dataset)}")
    print(f"Validation set size: {len(val_dataset)}")
    print(f"Number of classes: {len(train_dataset.class_to_idx)}")

    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True,
        num_workers=num_workers, pin_memory=True, drop_last=True
    )
    val_loader = DataLoader(
        val_dataset, batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=True
    )

    return train_loader, val_loader, train_dataset.class_to_idx

# Usage
print("Complete data pipeline example:")
print("=" * 50)
# Note: this uses a placeholder directory; point it at real data in practice
# train_loader, val_loader, class_to_idx = create_data_loaders('./data')

print("Data pipeline defined!")
print("It provides:")
print("1. Automatic image/label loading from a directory layout")
print("2. Separate augmentation for training and validation")
print("3. Multi-process data loading")
print("4. Automatic class-index mapping")
print("5. Error handling for corrupt images")

# 4. Data caching and performance
print("\\n4. Data caching and performance:")

class CachedDataset(Dataset):
    """Dataset wrapper with an in-memory cache"""

    def __init__(self, dataset, cache_size=1000):
        self.dataset = dataset
        self.cache = {}
        self.cache_size = cache_size

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]

        item = self.dataset[idx]

        # Cache the item if there is room
        if len(self.cache) < self.cache_size:
            self.cache[idx] = item

        return item

print("A cached dataset speeds up loading when per-sample preprocessing is expensive")
'''
    print(practical_pipeline_code)

    return None

# Run the data loading examples
data_loading_pipeline()
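The custom collate_fn idea above really earns its keep with variable-length data, where the default collate would fail to stack tensors of different sizes. A minimal sketch using torch.nn.utils.rnn.pad_sequence on a hypothetical toy dataset (ToySequenceDataset and its sizes are invented for illustration):

import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset

class ToySequenceDataset(Dataset):
    """Hypothetical dataset yielding variable-length 1-d sequences."""
    def __init__(self):
        self.seqs = [torch.randn(n) for n in (3, 5, 2, 7)]
        self.labels = [0, 1, 0, 1]
    def __len__(self):
        return len(self.seqs)
    def __getitem__(self, idx):
        return self.seqs[idx], self.labels[idx]

def pad_collate(batch):
    # the default collate would fail here because sequence lengths differ
    seqs, labels = zip(*batch)
    lengths = torch.tensor([len(s) for s in seqs])
    padded = pad_sequence(list(seqs), batch_first=True)  # (batch, max_len)
    return padded, lengths, torch.tensor(labels)

loader = DataLoader(ToySequenceDataset(), batch_size=4, collate_fn=pad_collate)
padded, lengths, labels = next(iter(loader))
print(padded.shape, lengths, labels)  # torch.Size([4, 7]) tensor([3, 5, 2, 7]) ...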
V. Training and Validation in PyTorch
1. The Basic Training Loop
def basic_training_loop():
    """PyTorch basic training loop"""
    print("=" * 80)
    print("The PyTorch Basic Training Loop")
    print("=" * 80)

    # 1. Complete training example
    print("\n1. Complete training example:")
    training_code = '''
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split

# Seed RNGs for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# 1. Prepare data
def prepare_data():
    """Prepare mock data"""
    n_samples = 1000
    n_features = 20
    n_classes = 3

    # Generate data
    X = np.random.randn(n_samples, n_features).astype(np.float32)
    y = np.random.randint(0, n_classes, n_samples).astype(np.int64)

    # Train/validation split
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Convert to PyTorch tensors
    X_train_tensor = torch.from_numpy(X_train)
    y_train_tensor = torch.from_numpy(y_train)
    X_val_tensor = torch.from_numpy(X_val)
    y_val_tensor = torch.from_numpy(y_val)

    # Datasets and loaders
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    return train_loader, val_loader, n_features, n_classes

# 2. Define the model
class SimpleClassifier(nn.Module):
    """Simple classifier"""

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.BatchNorm1d(hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_size // 2, num_classes)
        )

    def forward(self, x):
        return self.model(x)

# 3. Training function
def train_epoch(model, dataloader, criterion, optimizer, device):
    """Train for one epoch"""
    model.train()  # training mode
    running_loss = 0.0
    correct = 0
    total = 0

    for batch_idx, (inputs, targets) in enumerate(dataloader):
        # Move to device
        inputs, targets = inputs.to(device), targets.to(device)

        # Zero gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass
        loss.backward()

        # Gradient clipping (guards against exploding gradients)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        # Update parameters
        optimizer.step()

        # Statistics
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        # Progress
        if batch_idx % 10 == 0:
            print(f'  Batch {batch_idx}/{len(dataloader)}: Loss={loss.item():.4f}')

    epoch_loss = running_loss / len(dataloader)
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

# 4. Validation function
def validate_epoch(model, dataloader, criterion, device):
    """Validate for one epoch"""
    model.eval()  # evaluation mode
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():  # disable gradient tracking
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

    epoch_loss = running_loss / len(dataloader)
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

# 5. Main training loop
def main():
    """Main training function"""
    print("Starting training...")
    print("=" * 50)

    # Configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Device: {device}")

    # Data
    train_loader, val_loader, n_features, n_classes = prepare_data()

    # Model
    model = SimpleClassifier(
        input_size=n_features, hidden_size=128, num_classes=n_classes
    ).to(device)

    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)

    # Learning rate scheduler
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

    # Training parameters
    num_epochs = 20
    best_val_acc = 0.0

    # Training history
    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': []
    }

    # Training loop
    for epoch in range(num_epochs):
        print(f"\\nEpoch {epoch+1}/{num_epochs}")
        print("-" * 30)

        # Train
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        # Validate
        val_loss, val_acc = validate_epoch(model, val_loader, criterion, device)

        # Step the scheduler
        scheduler.step()

        # Record history
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        # Report
        print(f"Train: loss={train_loss:.4f}, acc={train_acc:.2f}%")
        print(f"Val:   loss={val_loss:.4f}, acc={val_acc:.2f}%")
        print(f"Learning rate: {optimizer.param_groups[0]['lr']:.6f}")

        # Save the best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_acc': val_acc,
            }, 'best_model.pth')
            print(f"Saved best model, validation accuracy: {val_acc:.2f}%")

    print("\\nTraining finished!")
    print(f"Best validation accuracy: {best_val_acc:.2f}%")
    return model, history

# Run training
model, history = main()
'''
    print(training_code)

    # 2. Advanced training techniques
    print("\n2. Advanced training techniques:")
    advanced_training_code = '''
import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler

class AdvancedTrainer:
    """Advanced trainer"""

    def __init__(self, model, device, use_amp=True):
        self.model = model
        self.device = device
        self.use_amp = use_amp

        # Automatic mixed precision
        self.scaler = GradScaler() if use_amp else None

        # Metric tracking
        self.metrics = {
            'train': {'loss': [], 'acc': []},
            'val': {'loss': [], 'acc': []}
        }

    def train_epoch(self, train_loader, criterion, optimizer):
        """Train one epoch (with optional mixed precision)"""
        self.model.train()
        total_loss = 0.0
        correct = 0
        total = 0

        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs, targets = inputs.to(self.device), targets.to(self.device)

            # Zero gradients
            optimizer.zero_grad()

            if self.use_amp:
                # Forward pass under autocast
                with autocast():
                    outputs = self.model(inputs)
                    loss = criterion(outputs, targets)
                # Backward pass with scaled gradients
                self.scaler.scale(loss).backward()
                # Unscale gradients and update parameters
                self.scaler.step(optimizer)
                self.scaler.update()
            else:
                outputs = self.model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()

            # Statistics
            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

        epoch_loss = total_loss / len(train_loader)
        epoch_acc = 100. * correct / total
        self.metrics['train']['loss'].append(epoch_loss)
        self.metrics['train']['acc'].append(epoch_acc)
        return epoch_loss, epoch_acc

    def validate_epoch(self, val_loader, criterion):
        """Validate one epoch"""
        self.model.eval()
        total_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(self.device), targets.to(self.device)
                outputs = self.model(inputs)
                loss = criterion(outputs, targets)
                total_loss += loss.item()
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

        epoch_loss = total_loss / len(val_loader)
        epoch_acc = 100. * correct / total
        self.metrics['val']['loss'].append(epoch_loss)
        self.metrics['val']['acc'].append(epoch_acc)
        return epoch_loss, epoch_acc

    def train(self, train_loader, val_loader, criterion, optimizer,
              scheduler=None, num_epochs=10, early_stopping_patience=5):
        """Full training procedure"""
        print(f"Starting training for {num_epochs} epochs")
        print("Mixed precision:", self.use_amp)

        best_val_acc = 0.0
        patience_counter = 0

        for epoch in range(num_epochs):
            print(f"\\nEpoch {epoch+1}/{num_epochs}")
            print("-" * 30)

            # Train
            train_loss, train_acc = self.train_epoch(train_loader, criterion, optimizer)
            # Validate
            val_loss, val_acc = self.validate_epoch(val_loader, criterion)

            # Step the scheduler
            if scheduler:
                scheduler.step(val_loss)

            # Report
            print(f"Train: loss={train_loss:.4f}, acc={train_acc:.2f}%")
            print(f"Val:   loss={val_loss:.4f}, acc={val_acc:.2f}%")

            # Early stopping check
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                patience_counter = 0
                # Save the best model
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': self.model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'val_acc': val_acc,
                    'metrics': self.metrics
                }, 'best_model_advanced.pth')
                print(f"Saved best model, validation accuracy: {val_acc:.2f}%")
            else:
                patience_counter += 1
                if patience_counter >= early_stopping_patience:
                    print(f"Early stopping triggered at epoch {epoch+1}")
                    break

        print("\\nTraining finished!")
        print(f"Best validation accuracy: {best_val_acc:.2f}%")
        return self.metrics

# Gradient accumulation
def train_with_gradient_accumulation(model, train_loader, criterion, optimizer,
                                     device, accumulation_steps=4):
    """Training with gradient accumulation"""
    model.train()
    optimizer.zero_grad()  # zero once at the start

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass (accumulate gradients); scale the loss
        loss = loss / accumulation_steps
        loss.backward()

        # Step every accumulation_steps batches
        if (batch_idx + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()
            print(f"Parameters updated at batch {batch_idx+1}")

    # Flush any leftover gradients
    if len(train_loader) % accumulation_steps != 0:
        optimizer.step()
        optimizer.zero_grad()

print("Advanced training techniques:")
print("1. Automatic mixed precision: faster training, less memory")
print("2. Gradient accumulation: simulates a larger batch size")
print("3. Gradient clipping: guards against exploding gradients")
print("4. Early stopping: guards against overfitting")
print("5. LR scheduling: adapts the learning rate over time")
'''
    print(advanced_training_code)

    # 3. Model evaluation and testing
    print("\n3. Model evaluation and testing:")
    evaluation_code = '''
import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns

class ModelEvaluator:
    """Model evaluator"""

    def __init__(self, model, device):
        self.model = model
        self.device = device

    def predict(self, dataloader):
        """Batch prediction"""
        self.model.eval()
        all_predictions = []
        all_targets = []
        all_probabilities = []

        with torch.no_grad():
            for inputs, targets in dataloader:
                inputs = inputs.to(self.device)

                # Forward pass
                outputs = self.model(inputs)

                # Predictions
                probabilities = torch.softmax(outputs, dim=1)
                _, predictions = outputs.max(1)

                # Collect results
                all_predictions.extend(predictions.cpu().numpy())
                all_targets.extend(targets.numpy())
                all_probabilities.extend(probabilities.cpu().numpy())

        return {
            'predictions': np.array(all_predictions),
            'targets': np.array(all_targets),
            'probabilities': np.array(all_probabilities)
        }

    def compute_metrics(self, predictions_dict):
        """Compute evaluation metrics"""
        y_true = predictions_dict['targets']
        y_pred = predictions_dict['predictions']

        metrics = {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision_macro': precision_score(y_true, y_pred, average='macro'),
            'recall_macro': recall_score(y_true, y_pred, average='macro'),
            'f1_macro': f1_score(y_true, y_pred, average='macro')
        }

        # Binary-classification-only metrics
        if len(np.unique(y_true)) == 2:
            metrics['precision_binary'] = precision_score(y_true, y_pred)
            metrics['recall_binary'] = recall_score(y_true, y_pred)
            metrics['f1_binary'] = f1_score(y_true, y_pred)

        return metrics

    def plot_confusion_matrix(self, predictions_dict, class_names=None):
        """Plot the confusion matrix"""
        y_true = predictions_dict['targets']
        y_pred = predictions_dict['predictions']
        cm = confusion_matrix(y_true, y_pred)

        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=class_names, yticklabels=class_names)
        plt.xlabel('Predicted label')
        plt.ylabel('True label')
        plt.title('Confusion Matrix')
        plt.tight_layout()
        plt.show()
        return cm

    def print_classification_report(self, predictions_dict, class_names=None):
        """Print a classification report"""
        y_true = predictions_dict['targets']
        y_pred = predictions_dict['predictions']
        report = classification_report(y_true, y_pred, target_names=class_names)
        print("Classification report:")
        print(report)
        return report

    def plot_roc_curve(self, predictions_dict, num_classes):
        """Plot ROC curves (multiclass)"""
        from sklearn.metrics import roc_curve, auc
        from sklearn.preprocessing import label_binarize

        y_true = predictions_dict['targets']
        y_prob = predictions_dict['probabilities']

        # Binarize labels
        y_true_bin = label_binarize(y_true, classes=range(num_classes))

        # Per-class ROC curve and AUC
        fpr, tpr, roc_auc = {}, {}, {}
        plt.figure(figsize=(10, 8))
        for i in range(num_classes):
            fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_prob[:, i])
            roc_auc[i] = auc(fpr[i], tpr[i])
            plt.plot(fpr[i], tpr[i], lw=2,
                     label=f'Class {i} (AUC = {roc_auc[i]:.2f})')

        # Diagonal reference line
        plt.plot([0, 1], [0, 1], 'k--', lw=2)
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False positive rate')
        plt.ylabel('True positive rate')
        plt.title('Multiclass ROC Curves')
        plt.legend(loc="lower right")
        plt.grid(alpha=0.3)
        plt.show()
        return fpr, tpr, roc_auc

    def plot_training_history(self, history):
        """Plot training history"""
        fig, axes = plt.subplots(1, 2, figsize=(12, 4))

        # Loss curves
        axes[0].plot(history['train']['loss'], label='Training loss')
        axes[0].plot(history['val']['loss'], label='Validation loss')
        axes[0].set_xlabel('Epoch')
        axes[0].set_ylabel('Loss')
        axes[0].set_title('Training and Validation Loss')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)

        # Accuracy curves
        axes[1].plot(history['train']['acc'], label='Training accuracy')
        axes[1].plot(history['val']['acc'], label='Validation accuracy')
        axes[1].set_xlabel('Epoch')
        axes[1].set_ylabel('Accuracy (%)')
        axes[1].set_title('Training and Validation Accuracy')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

# What the evaluator provides
print("Model evaluation features:")
print("1. Batch prediction")
print("2. Multiple evaluation metrics")
print("3. Confusion matrix plot")
print("4. Classification report")
print("5. ROC curves")
print("6. Training history plots")
'''
    print(evaluation_code)

    return None

# Run the training loop examples
basic_training_loop()
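The listings above are printed as strings for reading; if you want something to paste into a REPL and run end to end, here is a compact self-contained loop on random data (layer sizes and hyperparameters are arbitrary, chosen only for illustration):

import torch
import torch.nn as nn

torch.manual_seed(0)
X = torch.randn(256, 20)          # 256 samples, 20 features
y = torch.randint(0, 3, (256,))   # 3 classes

model = nn.Sequential(nn.Linear(20, 64), nn.ReLU(), nn.Linear(64, 3))
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

for epoch in range(5):
    optimizer.zero_grad()
    loss = criterion(model(X), y)  # full-batch for brevity
    loss.backward()
    optimizer.step()
    with torch.no_grad():
        acc = (model(X).argmax(1) == y).float().mean()
    print(f"epoch {epoch}: loss={loss.item():.4f}, acc={acc.item():.2%}")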
VI. Model Deployment and Production
1. Saving and Loading Models
def model_saving_loading():
    """PyTorch model saving and loading"""
    print("=" * 80)
    print("PyTorch Model Saving and Loading")
    print("=" * 80)

    # 1. Basic save and load
    print("\n1. Basic save and load:")
    basic_saving_code = '''
import torch
import torch.nn as nn
import torch.optim as optim

# Example model
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 20)
        self.fc2 = nn.Linear(20, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

model = SimpleModel()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 1. Save the entire model (not recommended)
print("Method 1: save the entire model")
torch.save(model, 'model_complete.pth')

# Load the entire model
loaded_model = torch.load('model_complete.pth')
print(f"Loaded complete model: {type(loaded_model)}")

# 2. Save the state dict (recommended)
print("\\nMethod 2: save the state dict")
torch.save(model.state_dict(), 'model_state_dict.pth')

# Create a fresh model and load the state dict
new_model = SimpleModel()
new_model.load_state_dict(torch.load('model_state_dict.pth'))
print(f"Loaded state dict into new model: {type(new_model)}")

# 3. Save a checkpoint (mid-training)
print("\\nMethod 3: save a checkpoint")
checkpoint = {
    'epoch': 10,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': 0.05,
    'accuracy': 0.95
}
torch.save(checkpoint, 'checkpoint.pth')

# Load the checkpoint
loaded_checkpoint = torch.load('checkpoint.pth')
model.load_state_dict(loaded_checkpoint['model_state_dict'])
optimizer.load_state_dict(loaded_checkpoint['optimizer_state_dict'])
epoch = loaded_checkpoint['epoch']
print(f"Loaded checkpoint: epoch={epoch}, loss={loaded_checkpoint['loss']}, "
      f"accuracy={loaded_checkpoint['accuracy']}")

# 4. Save multiple models in one file
print("\\nMethod 4: save multiple models")
ensemble_models = {
    'model1': model.state_dict(),
    'model2': SimpleModel().state_dict(),
    'metadata': {
        'created_date': '2024-01-01',
        'version': '1.0'
    }
}
torch.save(ensemble_models, 'ensemble_models.pth')
'''
    print(basic_saving_code)

    # 2. Cross-device save and load
    print("\n2. Cross-device save and load:")
    cross_device_code = '''
# Cross-device model handling
print("Cross-device model handling:")

# Create a GPU model (if a GPU is available)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_gpu = SimpleModel().to(device)

# Save it
torch.save(model_gpu.state_dict(), 'model_gpu.pth')

# Load to CPU
model_cpu = SimpleModel()
state_dict = torch.load('model_gpu.pth', map_location=torch.device('cpu'))
model_cpu.load_state_dict(state_dict)
print("GPU model loaded onto CPU: OK")

# Load to GPU
if torch.cuda.is_available():
    model_gpu2 = SimpleModel().to(device)
    state_dict = torch.load('model_gpu.pth', map_location=device)
    model_gpu2.load_state_dict(state_dict)
    print("GPU model loaded onto GPU: OK")

# Handling different GPU indices
print("\\nHandling different GPU indices:")

def load_model_flexible(model_path, device):
    """Load a model onto a given device, falling back through CPU"""
    # Try loading directly onto the target device
    try:
        state_dict = torch.load(model_path, map_location=device)
    except Exception:
        # If that fails, load onto CPU and then move to the device
        state_dict = torch.load(model_path, map_location='cpu')

    model = SimpleModel()
    model.load_state_dict(state_dict)
    model = model.to(device)
    return model

print("Flexible model-loading helper defined")
'''
    print(cross_device_code)

    # 3. TorchScript and model serialization
    print("\n3. TorchScript and model serialization:")
    torchscript_code = '''
# TorchScript: convert a PyTorch model into a serializable format
print("TorchScript model serialization:")

# Example model
model = SimpleModel()
model.eval()  # switch to inference mode

# Method 1: TorchScript tracing
print("Method 1: tracing")
example_input = torch.randn(1, 10)
traced_model = torch.jit.trace(model, example_input)

# Save the TorchScript model
traced_model.save('traced_model.pt')
print("Traced model saved: traced_model.pt")

# Load the TorchScript model
loaded_traced = torch.jit.load('traced_model.pt')
output = loaded_traced(example_input)
print(f"Loaded traced model output: {output.shape}")

# Method 2: TorchScript scripting
print("\\nMethod 2: scripting")

class SimpleModelWithControlFlow(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 20)
        self.fc2 = nn.Linear(20, 1)
        self.threshold = 0.5

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        # Data-dependent control flow (tracing cannot capture this)
        if x.mean() > self.threshold:
            x = self.fc2(x)
        else:
            x = self.fc2(x) * 0.5
        return x

model_with_control = SimpleModelWithControlFlow()
model_with_control.eval()

# Scripting handles control flow
scripted_model = torch.jit.script(model_with_control)
scripted_model.save('scripted_model.pt')
print("Scripted model saved: scripted_model.pt")

# Method 3: TorchScript optimization
print("\\nMethod 3: TorchScript optimization")

# Operator fusion (with the model on GPU)
if torch.cuda.is_available():
    model_cuda = SimpleModel().cuda()
    model_cuda.eval()

    # Convert to TorchScript
    traced_cuda = torch.jit.trace(model_cuda, torch.randn(1, 10).cuda())

    # Apply optimizations
    traced_cuda = torch.jit.freeze(traced_cuda)                  # freeze the model
    traced_cuda = torch.jit.optimize_for_inference(traced_cuda)  # inference optimization
    traced_cuda.save('optimized_model.pt')
    print("Optimized model saved: optimized_model.pt")

# Why TorchScript
print("\\nAdvantages of TorchScript:")
print("1. Serializable models that do not depend on the Python source")
print("2. Can be loaded and run from C++")
print("3. Enables graph-level optimization")
print("4. Supports mobile deployment")
'''
    print(torchscript_code)

    # 4. Deployment options
    print("\n4. Deployment options:")
    deployment_code = '''
# PyTorch deployment options
print("PyTorch deployment options:")

deployment_options = {
    'TorchServe': {
        'description': 'Official PyTorch model-serving framework',
        'features': ['REST API', 'model versioning', 'automatic batching', 'model monitoring'],
        'use_case': 'Production model serving',
        'install': 'pip install torchserve torch-model-archiver'
    },
    'ONNX Runtime': {
        'description': 'Cross-framework inference engine',
        'features': ['high-performance inference', 'many hardware backends',
                     'cross-platform', 'quantization support'],
        'use_case': 'Cross-framework deployment, performance-critical apps',
        'conversion': 'torch.onnx.export()'
    },
    'TensorRT': {
        'description': 'NVIDIA high-performance inference engine',
        'features': ['top performance', 'low latency', 'INT8/FP16 quantization', 'dynamic shapes'],
        'use_case': 'High-performance inference on NVIDIA GPUs',
        'requirements': 'NVIDIA GPU, TensorRT SDK'
    },
    'PyTorch Mobile': {
        'description': 'On-device deployment',
        'features': ['iOS/Android support', 'model optimization', 'offline inference'],
        'use_case': 'Mobile apps',
        'tools': 'PyTorch Mobile, LibTorch'
    },
    'FastAPI + PyTorch': {
        'description': 'Custom API service',
        'features': ['full control', 'easy to customize', 'Python ecosystem'],
        'use_case': 'Rapid prototyping, bespoke requirements',
        'frameworks': 'FastAPI, Flask'
    }
}

print("Deployment framework comparison:")
for name, info in deployment_options.items():
    print(f"\\n{name}:")
    print(f"  Description: {info['description']}")
    print(f"  Features: {', '.join(info['features'])}")
    print(f"  Use case: {info['use_case']}")

# ONNX export example
print("\\nONNX export example:")

def export_to_onnx(model, input_shape, onnx_path='model.onnx'):
    """Export a model to ONNX format"""
    # Evaluation mode
    model.eval()

    # Example input
    dummy_input = torch.randn(*input_shape)

    # Export to ONNX
    torch.onnx.export(
        model,                     # model to export
        dummy_input,               # model input
        onnx_path,                 # output file path
        export_params=True,        # export the weights
        opset_version=13,          # ONNX opset version
        do_constant_folding=True,  # constant-folding optimization
        input_names=['input'],     # input names
        output_names=['output'],   # output names
        dynamic_axes={             # dynamic axes (batch dimension)
            'input': {0: 'batch_size'},
            'output': {0: 'batch_size'}
        }
    )

    print(f"Model exported to: {onnx_path}")
    return onnx_path

# TorchServe deployment example
print("\\nTorchServe deployment steps:")
torchserve_steps = """
# 1. Create a model archive
torch-model-archiver --model-name mymodel \\
    --version 1.0 \\
    --model-file model.py \\
    --serialized-file model.pth \\
    --handler image_classifier \\
    --extra-files index_to_name.json

# 2. Start TorchServe
torchserve --start --model-store model_store \\
    --models mymodel=mymodel.mar \\
    --ncs

# 3. Call the API
curl http://localhost:8080/predictions/mymodel -T test_image.jpg
"""
print(torchserve_steps)

print("\\nDeployment recommendations:")
print("1. Prototyping: FastAPI + PyTorch")
print("2. Production serving: TorchServe")
print("3. Cross-platform: ONNX Runtime")
print("4. Maximum performance: TensorRT")
print("5. Mobile: PyTorch Mobile")
'''
    print(deployment_code)

    return None

# Run the model saving and deployment examples
model_saving_loading()
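To close the loop on the ONNX path, here is a hedged sketch of running the exported file with ONNX Runtime. It assumes onnxruntime is installed (pip install onnxruntime) and that export_to_onnx(model, (1, 10)) from above already produced model.onnx from SimpleModel:

import numpy as np
import onnxruntime as ort

# Load the exported model (assumes model.onnx exists from the export step above)
session = ort.InferenceSession("model.onnx", providers=["CPUExecutionProvider"])

# Input/output names match those passed to torch.onnx.export
input_name = session.get_inputs()[0].name      # 'input'
x = np.random.randn(4, 10).astype(np.float32)  # batch of 4, allowed by dynamic_axes

outputs = session.run(None, {input_name: x})   # None = return all outputs
print(outputs[0].shape)  # (4, 1) for the SimpleModel above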
With its dynamic computation graphs and Pythonic design philosophy, PyTorch has become one of the frameworks of choice for deep learning research and development. Having worked through this guide, you now have PyTorch's core concepts and basic usage under your belt. Remember: PyTorch's real strength is its flexibility and intuitiveness, which is exactly what makes it so well suited to research and rapid prototyping.