10余年一线大厂经验,专注 IT 思维、架构、职场进阶。我会在公众号、今日头条持续发布最新文章,助你少走弯路。
前言
在微服务架构中,服务间的通信效率直接影响着整个系统的性能。想象一下,如果你的电商订单系统每秒需要调用上千次库存服务,那种传统的 JSON 文本传输会带来多大的性能损耗?这正是 gRPC 要解决的核心问题。
gRPC 由 Google 开发,是一个高性能、开源的远程过程调用框架。与传统的 REST API 不同,gRPC 基于 HTTP/2 协议,使用 Protocol Buffers 作为序列化格式,将数据体积压缩 3-10 倍,同时支持双向流通信、连接多路复用等高级特性-42。Netflix、Square、Google 等顶级公司已大规模用 gRPC 替代 REST,用于内部微服务通信-42。
本文将带你从零开始掌握 Python 中的 gRPC 开发技术,从环境搭建到生产级实践,每一个示例都可以直接运行,适合新手入门和进阶开发者参考。
首先安装核心依赖:
# 安装 gRPC 核心库和编译工具
pip install grpcio grpcio-tools protobuf安装完成后,验证版本:
python -c "import grpc; print(f'gRPC 版本:{grpc.__version__}')"控制台输出:
新手常见坑:import grpc 时不能写成 import grpcio。Python 中没有叫 grpc 的官方包,grpcio 安装后暴露的顶层模块名是 grpc,这个细节经常让新手困惑-。
gRPC 的核心思想是契约优先——先定义好服务接口,再由框架自动生成代码。接口定义使用 Protocol Buffers 语法。
// hello.proto
syntax = "proto3";
package hello;
// 定义问候服务
service Greeter {
// 一元 RPC:客户端发送一个请求,服务端返回一个响应
rpc SayHello(HelloRequest)returns(HelloReply);
}
// 请求消息
message HelloRequest {
string name = 1;
}
// 响应消息
message HelloReply {
string message = 1;
}语法要点解析:
使用 protoc 编译器将 .proto 文件编译为 Python 代码:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. hello.proto控制台输出(成功时无报错):
这个命令会在当前目录下生成两个文件-7:
组件分工表:
服务端的核心任务是实现 .proto 中定义的服务接口并启动 gRPC 服务器-7。
# server.py
import grpc
from concurrent import futures
import hello_pb2
import hello_pb2_grpc
classGreeterServicer(hello_pb2_grpc.GreeterServicer):
"""实现 .proto 中定义的 Greeter 服务"""
defSayHello(self, request, context):
# request.name 来自客户端发送的 HelloRequest
print(f"[服务端] 收到请求,name={request.name}")
return hello_pb2.HelloReply(
message=f"你好,{request.name}!这里是 gRPC 服务端 "
)
defserve():
# 创建 gRPC 服务器,最大工作线程数 10
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# 将服务实现注册到服务器
hello_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
# 监听端口(insecure 表示不使用 TLS,仅供开发测试)
server.add_insecure_port('[::]:50051')
print("[服务端] gRPC 服务已启动,监听端口 50051...")
server.start()
server.wait_for_termination()
if __name__ == '__main__':
serve()客户端的任务是通过 Stub(桩)调用远程服务方法,就像调用本地函数一样自然-7。
# client.py
import grpc
import hello_pb2
import hello_pb2_grpc
defrun():
# 建立与服务器的连接(insecure 开发模式)
with grpc.insecure_channel('localhost:50051') as channel:
# 创建客户端桩(Stub)
stub = hello_pb2_grpc.GreeterStub(channel)
# 构造请求并调用远程方法
response = stub.SayHello(hello_pb2.HelloRequest(name="小明"))
print(f"[客户端] 收到响应: {response.message}")
if __name__ == '__main__':
run()打开两个终端,分别运行服务端和客户端。
终端 1 — 服务端:
终端 1 输出:
[服务端] gRPC 服务已启动,监听端口 50051...
[服务端] 收到请求,name=小明终端 2 — 客户端:
终端 2 输出:
[客户端] 收到响应: 你好,小明!这里是 gRPC 服务端 恭喜!你已经成功构建了第一个 gRPC 服务 。
gRPC 基于 HTTP/2,支持四种通信模式-16。下面逐一实战演示。
// streaming.proto
syntax = "proto3";
package streaming;
service DataService {
// 1. 一元 RPC:一问一答(上面已讲)
rpc UnaryCall(DataRequest)returns(DataResponse);
// 2. 服务端流式:客户端发一次,服务端持续推送
rpc ServerStreaming(DataRequest)returns(stream DataResponse);
// 3. 客户端流式:客户端持续发送,服务端汇总一次
rpc ClientStreaming(stream DataRequest)returns(DataResponse);
// 4. 双向流式:双方都可随时发送
rpc BidirectionalStreaming(stream DataRequest)returns(stream DataResponse);
}
message DataRequest {
string message = 1;
int32 sequence = 2; // 序号,用于追踪数据流顺序
}
message DataResponse {
string message = 1;
int32 sequence = 2;
}语法要点:stream 关键字是流式模式的核心声明。出现在返回类型前表示服务端流式,出现在参数类型前表示客户端流式,两边都有则是双向流式-19。
首先编译生成代码:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. streaming.proto适用场景:股票实时行情推送、日志监控、大文件下载。服务端逐个发送数据块,客户端逐个读取,内存占用极低-19。
# server_streaming_server.py
import grpc
import time
from concurrent import futures
import streaming_pb2
import streaming_pb2_grpc
classDataServiceServicer(streaming_pb2_grpc.DataServiceServicer):
defServerStreaming(self, request, context):
"""服务端流式:收到一个请求后,持续推送多条数据"""
print(f"[服务端] 收到流式请求: {request.message},开始推送数据...")
for i in range(5):
response = streaming_pb2.DataResponse(
message=f"服务端推送第 {i+1} 条数据",
sequence=i + 1
)
print(f"[服务端] 推送 #{i+1}")
yield response
time.sleep(0.5) # 模拟数据生成延迟
print("[服务端] 推送完成")
defserve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
streaming_pb2_grpc.add_DataServiceServicer_to_server(DataServiceServicer(), server)
server.add_insecure_port('[::]:50052')
server.start()
print("[服务端流式] 服务已启动,监听端口 50052...")
server.wait_for_termination()
if __name__ == '__main__':
serve()# server_streaming_client.py
import grpc
import streaming_pb2
import streaming_pb2_grpc
defrun():
with grpc.insecure_channel('localhost:50052') as channel:
stub = streaming_pb2_grpc.DataServiceStub(channel)
request = streaming_pb2.DataRequest(message="请推送数据")
print("[客户端] 发送请求,等待服务端推送...")
# responses 是一个迭代器,逐个接收服务端推送的数据
responses = stub.ServerStreaming(request)
for resp in responses:
print(f"[客户端] 收到 #{resp.sequence}: {resp.message}")
if __name__ == '__main__':
run()控制台输出 — 服务端:
[服务端流式] 服务已启动,监听端口 50052...
[服务端] 收到流式请求: 请推送数据,开始推送数据...
[服务端] 推送 #1
[服务端] 推送 #2
[服务端] 推送 #3
[服务端] 推送 #4
[服务端] 推送 #5
[服务端] 推送完成控制台输出 — 客户端:
[客户端] 发送请求,等待服务端推送...
[客户端] 收到 #1: 服务端推送第 1 条数据
[客户端] 收到 #2: 服务端推送第 2 条数据
[客户端] 收到 #3: 服务端推送第 3 条数据
[客户端] 收到 #4: 服务端推送第 4 条数据
[客户端] 收到 #5: 服务端推送第 5 条数据核心要点:服务端使用 yield 逐个生成响应,客户端通过 for-in 循环逐条接收,无需等待全部数据到达。
适用场景:物联网设备批量上报传感器数据、日志采集、大文件上传-19。
# client_streaming_server.py
import grpc
from concurrent import futures
import streaming_pb2
import streaming_pb2_grpc
classDataServiceServicer(streaming_pb2_grpc.DataServiceServicer):
defClientStreaming(self, request_iterator, context):
"""客户端流式:接收客户端持续发送的多条数据,汇总后返回一条结果"""
total = 0
messages = []
for request in request_iterator:
total += 1
messages.append(request.message)
print(f"[服务端] 收到 #{request.sequence}: {request.message}")
# 汇总返回
result = streaming_pb2.DataResponse(
message=f"共收到 {total} 条数据: {'; '.join(messages)}",
sequence=total
)
print(f"[服务端] 汇总完成,返回结果")
return result
defserve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
streaming_pb2_grpc.add_DataServiceServicer_to_server(DataServiceServicer(), server)
server.add_insecure_port('[::]:50053')
server.start()
print("[客户端流式] 服务已启动,监听端口 50053...")
server.wait_for_termination()
if __name__ == '__main__':
serve()# client_streaming_client.py
import grpc
import time
import streaming_pb2
import streaming_pb2_grpc
defgenerate_requests():
"""生成器函数:逐个产生要发送的请求"""
data_list = ["传感器-温度:25.3°C", "传感器-湿度:68%", "传感器-气压:1013hPa"]
for i, data in enumerate(data_list):
yield streaming_pb2.DataRequest(message=data, sequence=i + 1)
time.sleep(0.3) # 模拟采集间隔
print(f" [生成器] 已产生第 {i+1} 条数据")
defrun():
with grpc.insecure_channel('localhost:50053') as channel:
stub = streaming_pb2_grpc.DataServiceStub(channel)
response = stub.ClientStreaming(generate_requests())
print(f"\n[客户端] 收到汇总响应: {response.message}")
if __name__ == '__main__':
run()控制台输出 — 服务端:
[客户端流式] 服务已启动,监听端口 50053...
[服务端] 收到 #1: 传感器-温度:25.3°C
[服务端] 收到 #2: 传感器-湿度:68%
[服务端] 收到 #3: 传感器-气压:1013hPa
[服务端] 汇总完成,返回结果控制台输出 — 客户端:
[生成器] 已产生第 1 条数据
[生成器] 已产生第 2 条数据
[生成器] 已产生第 3 条数据
[客户端] 收到汇总响应: 共收到 3 条数据: 传感器-温度:25.3°C; 传感器-湿度:68%; 传感器-气压:1013hPa核心要点:客户端用 yield 生成器逐个发送请求,服务端通过迭代 request_iterator 逐个接收,汇总后一次性返回结果。
适用场景:实时聊天、语音识别、在线游戏同步——双方可以随时、独立地发送数据-19。
# bidirectional_server.py
import grpc
import time
from concurrent import futures
import streaming_pb2
import streaming_pb2_grpc
classDataServiceServicer(streaming_pb2_grpc.DataServiceServicer):
defBidirectionalStreaming(self, request_iterator, context):
"""双向流式:一边接收客户端消息,一边主动推送响应"""
print("[服务端] 双向流通信已建立,等待客户端消息...")
for request in request_iterator:
print(f"[服务端] 收到 #{request.sequence}: {request.message}")
# 收到一条消息后,立即回复两条
yield streaming_pb2.DataResponse(
message=f"回声: {request.message}",
sequence=request.sequence * 100 + 1
)
yield streaming_pb2.DataResponse(
message=f"确认收到第 {request.sequence} 条消息",
sequence=request.sequence * 100 + 2
)
defserve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
streaming_pb2_grpc.add_DataServiceServicer_to_server(DataServiceServicer(), server)
server.add_insecure_port('[::]:50054')
server.start()
print("[双向流式] 服务已启动,监听端口 50054...")
server.wait_for_termination()
if __name__ == '__main__':
serve()# bidirectional_client.py
import grpc
import time
import threading
import streaming_pb2
import streaming_pb2_grpc
defrun():
with grpc.insecure_channel('localhost:50054') as channel:
stub = streaming_pb2_grpc.DataServiceStub(channel)
defgenerate_messages():
"""在另一个线程中发送消息"""
messages = ["你好服务端!", "这是第二条消息", "再见!"]
for i, msg in enumerate(messages):
yield streaming_pb2.DataRequest(message=msg, sequence=i + 1)
time.sleep(1)
print(f" [发送线程] 已发送第 {i+1} 条: {msg}")
# 调用双向流 RPC
responses = stub.BidirectionalStreaming(generate_messages())
print("[客户端] 开始接收服务端响应...")
for resp in responses:
print(f"[客户端] 收到 #{resp.sequence}: {resp.message}")
if __name__ == '__main__':
run()控制台输出 — 服务端:
[双向流式] 服务已启动,监听端口 50054...
[服务端] 双向流通信已建立,等待客户端消息...
[服务端] 收到 #1: 你好服务端!
[服务端] 收到 #2: 这是第二条消息
[服务端] 收到 #3: 再见!控制台输出 — 客户端:
[客户端] 开始接收服务端响应...
[客户端] 收到 #101: 回声: 你好服务端!
[客户端] 收到 #102: 确认收到第 1 条消息
[发送线程] 已发送第 1 条: 你好服务端!
[客户端] 收到 #201: 回声: 这是第二条消息
[客户端] 收到 #202: 确认收到第 2 条消息
[发送线程] 已发送第 2 条: 这是第二条消息
[客户端] 收到 #301: 回声: 再见!
[客户端] 收到 #302: 确认收到第 3 条消息
[发送线程] 已发送第 3 条: 再见!核心要点:双向流式模式下,客户端和服务端可以同时发送和接收数据,实现了真正的全双工通信。收到一条消息后,可以回复多条(本例中回复了 2 条)。
前面已经掌握了 gRPC 的基本用法,但要构建生产级服务,还需要掌握以下几个关键技术。
拦截器是 gRPC 的"中间件"机制,可以在不修改业务代码的情况下,统一处理日志、认证、监控等横切关注点-。
# interceptor_server.py
import grpc
from concurrent import futures
import time
import hello_pb2
import hello_pb2_grpc
classLoggingInterceptor(grpc.ServerInterceptor):
"""自定义日志拦截器:记录每次 RPC 调用的耗时"""
defintercept_service(self, continuation, handler_call_details):
start_time = time.time()
# 调用实际的 RPC 处理方法
response = continuation(handler_call_details)
elapsed = (time.time() - start_time) * 1000
method = handler_call_details.method
print(f"[拦截器] {method} 调用完成,耗时 {elapsed:.2f}ms")
return response
classGreeterServicer(hello_pb2_grpc.GreeterServicer):
defSayHello(self, request, context):
return hello_pb2.HelloReply(
message=f"你好,{request.name}!"
)
defserve():
# 在创建服务器时传入拦截器
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=[LoggingInterceptor()] # 注入拦截器
)
hello_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
server.add_insecure_port('[::]:50055')
server.start()
print("[拦截器示例] 服务已启动...")
server.wait_for_termination()
if __name__ == '__main__':
serve()客户端(使用之前的 client.py 修改端口为 50055):
# interceptor_client.py
import grpc
import hello_pb2
import hello_pb2_grpc
defrun():
with grpc.insecure_channel('localhost:50055') as channel:
stub = hello_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(hello_pb2.HelloRequest(name="小明"))
print(f"[客户端] 收到: {response.message}")
if __name__ == '__main__':
run()控制台输出 — 服务端:
[拦截器示例] 服务已启动...
[拦截器] /hello.Greeter/SayHello 调用完成,耗时 0.03msgRPC 提供了丰富的状态码体系,远比 HTTP 状态码更精确-58。不应一律返回 INTERNAL 错误,而应使用语义明确的状态码。
# error_handling_server.py
import grpc
from concurrent import futures
import hello_pb2
import hello_pb2_grpc
classGreeterServicer(hello_pb2_grpc.GreeterServicer):
defSayHello(self, request, context):
# 参数校验
ifnot request.name:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details('name 参数不能为空')
return hello_pb2.HelloReply()
# 模拟权限检查
if request.name == "admin":
context.set_code(grpc.StatusCode.PERMISSION_DENIED)
context.set_details('不允许使用 admin 身份')
return hello_pb2.HelloReply()
# 模拟资源不存在
if request.name == "notfound":
context.abort(grpc.StatusCode.NOT_FOUND, '该用户不存在')
return hello_pb2.HelloReply(
message=f"你好,{request.name}!调用成功"
)
defserve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
hello_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
server.add_insecure_port('[::]:50056')
server.start()
print("[错误处理示例] 服务已启动...")
server.wait_for_termination()
if __name__ == '__main__':
serve()# error_handling_client.py
import grpc
import hello_pb2
import hello_pb2_grpc
defcall_with_name(stub, name):
"""封装调用逻辑,统一处理 RPC 错误"""
try:
response = stub.SayHello(
hello_pb2.HelloRequest(name=name),
timeout=2.0# 设置 2 秒超时
)
print(f"[客户端] name='{name}' -> {response.message}")
except grpc.RpcError as e:
print(f"[客户端] name='{name}' -> "
f"状态码: {e.code()}, 详情: {e.details()}")
defrun():
with grpc.insecure_channel('localhost:50056') as channel:
stub = hello_pb2_grpc.GreeterStub(channel)
call_with_name(stub, "小明") # 正常
call_with_name(stub, "") # 参数为空
call_with_name(stub, "admin") # 权限不足
call_with_name(stub, "notfound") # 资源不存在
if __name__ == '__main__':
run()控制台输出 — 客户端:
[客户端] name='小明' -> 你好,小明!调用成功
[客户端] name='' -> 状态码: StatusCode.INVALID_ARGUMENT, 详情: name 参数不能为空
[客户端] name='admin' -> 状态码: StatusCode.PERMISSION_DENIED, 详情: 不允许使用 admin 身份
[客户端] name='notfound' -> 状态码: StatusCode.NOT_FOUND, 详情: 该用户不存在常用状态码速查:
生产环境中,长连接的管理直接影响资源利用率和故障恢复速度。
# keepalive_server.py
import grpc
from concurrent import futures
import hello_pb2
import hello_pb2_grpc
classGreeterServicer(hello_pb2_grpc.GreeterServicer):
defSayHello(self, request, context):
# 获取请求超时时间
deadline = context.time_remaining()
if deadline:
print(f"[服务端] 距离超时还有 {deadline:.2f} 秒")
return hello_pb2.HelloReply(message=f"你好,{request.name}!")
defserve():
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
options=[
# HTTP/2 Keep-Alive:定期发送 PING 帧保持连接
('grpc.keepalive_time_ms', 10000), # 每 10 秒发送 PING
('grpc.keepalive_timeout_ms', 5000), # PING 超时 5 秒则断开
('grpc.keepalive_permit_without_calls', True),
('grpc.http2.max_pings_without_data', 0),
]
)
hello_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
server.add_insecure_port('[::]:50057')
server.start()
print("[Keep-Alive 示例] 服务已启动...")
server.wait_for_termination()
if __name__ == '__main__':
serve()# keepalive_client.py
import grpc
import hello_pb2
import hello_pb2_grpc
defrun():
# 配置客户端侧 HTTP/2 Keep-Alive
channel = grpc.insecure_channel(
'localhost:50057',
options=[
('grpc.keepalive_time_ms', 10000),
('grpc.keepalive_timeout_ms', 5000),
('grpc.http2.min_time_between_pings_ms', 5000),
('grpc.keepalive_permit_without_calls', True),
]
)
stub = hello_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(
hello_pb2.HelloRequest(name="小明"),
timeout=1.0# 单次调用超时 1 秒
)
print(f"[客户端] 收到: {response.message}")
channel.close()
if __name__ == '__main__':
run()控制台输出 — 服务端:
[Keep-Alive 示例] 服务已启动...
[服务端] 距离超时还有 0.99 秒这个话题是新手最容易纠结的。核心区分点在于使用场景-41-42:
选择 gRPC 的场景:
选择 REST 的场景:
错误原因:把 grpc 和 grpcio 搞混了。Python 里没有叫 grpc 的官方包,你需要 pip install grpcio,但 import 时写的是 import grpc——grpc 是 grpcio 安装后暴露的顶层模块名-。
解决方案:
pip install grpcio # 正确安装import grpc # 正确导入——grpc 是 grpcio 安装后暴露的顶层模块名# 检查端口是否被占用
lsof-i :50051
kill-9 <PID>每次修改 .proto 文件后,记得重新编译:
# 先删除旧的生成文件
rm*_pb2.py *_pb2_grpc.py
# 再重新编译
python-m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. hello.proto生产环境务必使用 TLS 加密通信,替换 add_insecure_port 为:
# 读取证书
withopen('server.key', 'rb') as f:
private_key = f.read()
withopen('server.crt', 'rb') as f:
certificate_chain = f.read()
# 创建 SSL 凭证
server_credentials = grpc.ssl_server_credentials([(private_key, certificate_chain)])
# 使用安全端口
server.add_secure_port('[::]:50051', server_credentials)gRPC 的学习曲线比 REST 稍高,但它在性能上的回报是巨大的。掌握 gRPC,你将能够构建出真正高性能的分布式系统。如果你在实践过程中遇到任何问题,欢迎在评论区留言交流!
还可以去公众号、今日头条搜索「IT策士」,一起升级 IT 思维 !