这是我的第447篇原创文章。
『数据杂坛』以Python语言为核心,垂直于数据科学领域,专注于(可戳👉)Python程序开发|数据采集|数据分析|数据可视化|特征工程|机器学习|时序数据|深度学习|人工智能等技术栈交流学习,涵盖数据挖掘、计算机视觉、自然语言处理等应用领域。(文末有惊喜福利)

一、引言
我们创建一个加法模型,它可以同时处理多个推理请求,每个请求包含两个数字。Triton 会将多个客户端请求合并成一个 batch。
吞吐量,是指在一次性能测试过程中网络上传输的数据量的总和。(承压能力)
系统的吞吐量与request对CPU的消耗、外部接口、IO等等紧密关联。单个request 对CPU消耗越高,外部系统接口、IO速度越慢,系统吞吐能力越低,反之越高。系统吞吐量几个重要参数:QPS(TPS)、并发数、响应时间。
吞吐率QPS(TPS):特指 Web 服务器单位时间内处理的请求数。(描述其并发处理能力)
响应时间(RT):执行一个请求从开始到最后收到响应数据所花费的总体时间,即从客户端发起请求到收到服务器响应结果的时间。响应时间RT(Response-time),是一个系统最重要的指标之一,它的数值大小直接反应了系统的快慢。
并发数:是指系统同时能处理的请求数量,这个也是反应了系统的负载能力。
理解了上面三个要素的意义之后,就能推算出它们之间的关系:
QPS=并发数/响应时间
| 5秒 | 20 × 5 = 100 | 精确20 | ||
| 0.2秒 | 20 × 0.2 = 4 | 精确20 | ||
| 0.05秒 | 20 × 0.05 = 1 | 精确20 |
单从定义来看,吞吐率描述了服务器在实际运行期间单位时间内处理的请求数,然而,我们更加关心的是服务器并发处理能力的上限,也就是单位时间内服务器能够处理的最大请求数,即最大吞吐率。
所以我们普遍使用 “压力测试” 的方法,通过模拟足够多数目的并发用户,分别持续发送一定的 HTTP 请求,并统计测试持续的总时间,计算出基于这种 “压力” 下的吞吐率,即为一个平均计算值。
二、实现过程
config.pbtxt代码:
name: "batch_add_model"backend: "python"max_batch_size: 4 # 最多合并4个请求input [{name: "INPUT_A"data_type: TYPE_FP32dims: [1] # 每个请求包含1个数字},{name: "INPUT_B"data_type: TYPE_FP32dims: [1]}]output [{name: "OUTPUT_SUM"data_type: TYPE_FP32dims: [1]}]dynamic_batching {preferred_batch_size: [2, 4] # 优先组成2或4的batchmax_queue_delay_microseconds: 500}
model.py代码:
import triton_python_backend_utils as pb_utilsimport numpy as npclass TritonPythonModel:def initialize(self, args):print("模型初始化完成")def execute(self, requests):print(f"\n=== 收到新的 Batch,包含 {len(requests)} 个请求 ===")responses = []# 遍历每个请求for i, request in enumerate(requests):print(f"\n--- 处理 Batch 中的第 {i+1} 个请求 ---")# 获取输入数据input_a = pb_utils.get_input_tensor_by_name(request, "INPUT_A")input_b = pb_utils.get_input_tensor_by_name(request, "INPUT_B")# 转换为 numpy 并查看具体内容a_values = input_a.as_numpy()b_values = input_b.as_numpy()print(f"INPUT_A 值: {a_values} (形状: {a_values.shape})")print(f"INPUT_B 值: {b_values} (形状: {b_values.shape})")# 执行计算sum_result = a_values + b_valuesprint(f"计算结果: {sum_result}")# 创建输出张量output_tensor = pb_utils.Tensor("OUTPUT_SUM", sum_result.astype(np.float32))# 创建响应response = pb_utils.InferenceResponse(output_tensors=[output_tensor])responses.append(response)print(f"\n=== 返回 {len(responses)} 个响应 ===\n")return responsesdef finalize(self):print("模型卸载")
client.py代码:
from tritonclient.http import InferenceServerClient, InferInputimport numpy as npfrom concurrent.futures import ThreadPoolExecutorimport timedef send_request(client, a, b, request_id):"""发送单个推理请求"""# 准备输入数据input_a = InferInput("INPUT_A", [1], "FP32")input_b = InferInput("INPUT_B", [1], "FP32")input_a.set_data_from_numpy(np.array([a], dtype=np.float32))input_b.set_data_from_numpy(np.array([b], dtype=np.float32))print(f"发送请求 {request_id}: {a} + {b}")# 发送请求response = client.infer(model_name="batch_add_model",inputs=[input_a, input_b],request_id=str(request_id))# 获取结果result = response.as_numpy("OUTPUT_SUM")print(f"收到响应 {request_id}: {a} + {b} = {result[0]}")return resultdef main():# 连接到 Triton 服务器client = InferenceServerClient(url="localhost:8000")# 测试数据:3组数字test_data = [(10.5, 20.3), # 请求1(15.0, 25.0), # 请求2(30.0, 40.0) # 请求3]print("=== 场景 1:串行发送(间隔大)===")for i, (a, b) in enumerate(test_data):send_request(client, a, b, f"sync_{i}")time.sleep(0.01) # 等待10ms,让Triton有时间处理print("\n" + "="*50 + "\n")print("=== 场景 2:并发发送(会触发批处理)===")# 使用线程池同时发送3个请求with ThreadPoolExecutor(max_workers=3) as executor:futures = []for i, (a, b) in enumerate(test_data):future = executor.submit(send_request, client, a, b, f"async_{i}")futures.append(future)# 等待所有请求完成for future in futures:future.result()if __name__ == "__main__":main()
代码:
# 模型目录结构model_repository/└── batch_add_model/├── config.pbtxt└── 1/└── model.py# 启动服务器docker run --rm -p 8000:8000 -v $(pwd)/model_repository:/models \nvcr.io/nvidia/tritonserver:25.11-py3 tritonserver --model-repository=/models
结果:
=== 场景 1:串行发送(间隔大)===发送请求 sync_0: 10.5 + 20.3收到响应 sync_0: 10.5 + 20.3 = 30.8发送请求 sync_1: 15.0 + 25.0收到响应 sync_1: 15.0 + 25.0 = 40.0发送请求 sync_2: 30.0 + 40.0收到响应 sync_2: 30.0 + 40.0 = 70.0
结果:
=== 收到新的 Batch,包含 1 个请求 ===--- 处理 Batch 中的第 1 个请求 ---INPUT_A 值: [10.5] (形状: (1,))INPUT_B 值: [20.3] (形状: (1,))计算结果: [30.8]=== 返回 1 个响应 ====== 收到新的 Batch,包含 1 个请求 ===--- 处理 Batch 中的第 1 个请求 ---INPUT_A 值: [15.] (形状: (1,))INPUT_B 值: [25.] (形状: (1,))计算结果: [40.]=== 返回 1 个响应 ====== 收到新的 Batch,包含 1 个请求 ===--- 处理 Batch 中的第 1 个请求 ---INPUT_A 值: [30.] (形状: (1,))INPUT_B 值: [40.] (形状: (1,))计算结果: [70.]=== 返回 1 个响应 ===
注意:每个请求被单独处理,len(requests) 始终为 1
结果:
=== 场景 2:并发发送(会触发批处理)===发送请求 async_0: 10.5 + 20.3发送请求 async_1: 15.0 + 25.0发送请求 async_2: 30.0 + 40.0收到响应 async_0: 10.5 + 20.3 = 30.8收到响应 async_1: 15.0 + 25.0 = 40.0收到响应 async_2: 30.0 + 40.0 = 70.0
结果:
=== 收到新的 Batch,包含 3 个请求 ===--- 处理 Batch 中的第 1 个请求 ---INPUT_A 值: [10.5] (形状: (1,))INPUT_B 值: [20.3] (形状: (1,))计算结果: [30.8]--- 处理 Batch 中的第 2 个请求 ---INPUT_A 值: [15.] (形状: (1,))INPUT_B 值: [25.] (形状: (1,))计算结果: [40.]--- 处理 Batch 中的第 3 个请求 ---INPUT_A 值: [30.] (形状: (1,))INPUT_B 值: [40.] (形状: (1,))计算结果: [70.]=== 返回 3 个响应 ===
关键观察:
len(requests) 变为 3,代表 3 个客户端请求被合并
但每个请求的数据仍然是独立的 [1] 形状
Triton 自动完成了请求的聚合与响应的拆分
| 串行发送 | |||
| 并发发送 | 高 2.5x |
这个例子清晰地展示了 Triton 动态批处理的价值:减少调用开销,提升 GPU 利用率。
技术层面
| 网络通信 | |||
| Python 调用 | execute() | execute() | |
| GPU 启动 | |||
| 日志打印 |
这些固定开销的减少,直接转化为吞吐量提升。
作者简介:
读研期间发表6篇SCI数据算法相关论文,目前在某研究院从事数据算法相关研究工作,结合自身科研实践经历不定期持续分享关于Python、数据分析、特征工程、机器学习、深度学习、人工智能系列基础知识与案例。
致力于只做原创,以最简单的方式理解和学习,关注我一起交流成长。
1、关注下方公众号,点击“领资料”即可免费领取电子资料书籍。
2、文章底部点击喜欢作者即可联系作者获取相关数据集和源码。
3、数据算法方向论文指导或就业指导,点击“联系我”添加作者微信直接交流。
4、有商务合作相关意向,点击“联系我”添加作者微信直接交流。

