1. RPC简介
什么是RPC?
RPC(Remote Procedure Call,远程过程调用)是一种允许程序调用另一个地址空间(通常是共享网络的另一台机器上)的过程或函数的协议,而不需要程序员显式编码这个远程调用的细节。
RPC的核心思想
透明性:远程调用看起来像本地调用
抽象性:隐藏网络通信的复杂性
服务化:将功能封装为可远程调用的服务
RPC与REST API的区别
2. RPC架构与工作原理
RPC架构组件
客户端应用 → 客户端存根 → 网络传输 → 服务器存根 → 服务器应用 ↑ ↓ ↓ ↓ ↓ 调用 序列化 网络通信 反序列化 执行 函数 参数 发送/接收 参数 函数
RPC调用流程
客户端调用:客户端调用本地存根方法
参数编组:客户端存根将参数序列化为消息
网络传输:通过网络发送到服务器
服务器接收:服务器存根接收并反序列化消息
方法调用:调用实际的服务器方法
结果返回:将结果序列化并返回给客户端
3. RPC通信模式
同步RPC
客户端等待服务器响应后再继续执行
# 伪代码示例result = remote_service.add(1, 2) # 阻塞直到返回print(f"Result: {result}")
异步RPC
客户端不等待响应,通过回调或Future处理结果
# 伪代码示例future = remote_service.add_async(1, 2)# 继续其他工作...result = future.result() # 需要时获取结果
流式RPC
支持持续的数据流传输:
客户端流:客户端发送多个请求,服务器返回一个响应
服务器流:客户端发送一个请求,服务器返回多个响应
双向流:双方都可以发送多个消息
4. Python RPC框架比较
常用Python RPC框架
选择建议
简单项目:XML-RPC或JSON-RPC
性能关键:gRPC或Thrift
纯Python环境:Pyro4
实时通信:ZeroRPC
5. XML-RPC实现
使用Python标准库实现
服务端
# server.py - XML-RPC服务器from xmlrpc.server import SimpleXMLRPCServerfrom xmlrpc.server import SimpleXMLRPCRequestHandlerimport datetime# 限制路径为 /RPC2class RequestHandler(SimpleXMLRPCRequestHandler): rpc_paths = ('/RPC2',)class MathService: """数学服务""" def add(self, a, b): return a + b def multiply(self, a, b): return a * b def divide(self, a, b): if b == 0: raise ValueError("除数不能为零") return a / bclass TimeService: """时间服务""" def current_time(self): return datetime.datetime.now().isoformat() def server_info(self): return { "name": "XML-RPC Server", "version": "1.0", "started_at": datetime.datetime.now().isoformat() }def main(): # 创建服务器 server = SimpleXMLRPCServer( ('localhost', 8000), requestHandler=RequestHandler, allow_none=True ) server.register_introspection_functions() # 启用内省 # 注册服务实例 math_service = MathService() time_service = TimeService() # 注册方法 server.register_instance(math_service) server.register_instance(time_service) # 注册单个函数 def greet(name): return f"Hello, {name}!" server.register_function(greet, 'greet') # 注册多调用函数(接收多个参数) server.register_multicall_functions() print("XML-RPC服务器运行在 http://localhost:8000") print("可用方法:") for method in server.system_listMethods(): print(f" - {method}") # 启动服务器 server.serve_forever()if __name__ == '__main__': main()
客户端
# client.py - XML-RPC客户端import xmlrpc.clientimport pprintclass XMLRPCClient: def __init__(self, server_url="http://localhost:8000"): self.server = xmlrpc.client.ServerProxy(server_url) self.pp = pprint.PrettyPrinter(indent=2) def test_basic_operations(self): """测试基本操作""" print("=== 测试基本操作 ===") # 调用greet函数 result = self.server.greet("Alice") print(f"greet('Alice'): {result}") # 数学运算 print(f"add(5, 3): {self.server.add(5, 3)}") print(f"multiply(5, 3): {self.server.multiply(5, 3)}") try: print(f"divide(10, 2): {self.server.divide(10, 2)}") print(f"divide(10, 0): {self.server.divide(10, 0)}") except Exception as e: print(f"错误: {e}") def test_time_service(self): """测试时间服务""" print("\n=== 测试时间服务 ===") current_time = self.server.current_time() server_info = self.server.server_info() print(f"当前时间: {current_time}") print("服务器信息:") self.pp.pprint(server_info) def test_introspection(self): """测试内省功能""" print("\n=== 测试内省 ===") # 列出所有方法 methods = self.server.system.listMethods() print("可用方法:") for method in methods: print(f" - {method}") # 获取方法签名 print("\n方法签名:") for method in methods: try: signature = self.server.system.methodSignature(method) help_text = self.server.system.methodHelp(method) print(f"{method}:") print(f" 签名: {signature}") print(f" 帮助: {help_text[:50]}..." if help_text else " 帮助: 无") except: pass def test_multicall(self): """测试多调用(批量操作)""" print("\n=== 测试多调用 ===") multicall = xmlrpc.client.MultiCall(self.server) multicall.add(1, 2) multicall.multiply(3, 4) multicall.greet("Bob") multicall.current_time() results = list(multicall()) print("批量调用结果:") for i, result in enumerate(results): print(f" 结果{i+1}: {result}") def test_fault_handling(self): """测试错误处理""" print("\n=== 测试错误处理 ===") # 测试错误响应 try: # 调用不存在的方法 self.server.nonexistent_method() except xmlrpc.client.Fault as fault: print(f"Fault错误: 代码={fault.faultCode}, 消息={fault.faultString}") except Exception as e: print(f"其他错误: {type(e).__name__}: {e}")def main(): client = XMLRPCClient() # 运行测试 client.test_basic_operations() client.test_time_service() client.test_introspection() client.test_multicall() client.test_fault_handling()if __name__ == '__main__': main()
6. gRPC实现
安装gRPC
pip install grpcio grpcio-tools
定义Protocol Buffers接口
// calculator.protosyntax = "proto3";package calculator;service Calculator { // 一元RPC rpc Add (AddRequest) returns (AddResponse) {} rpc Multiply (MultiplyRequest) returns (MultiplyResponse) {} // 服务器流式RPC rpc PrimeFactors (PrimeFactorsRequest) returns (stream PrimeFactorsResponse) {} // 客户端流式RPC rpc ComputeAverage (stream ComputeAverageRequest) returns (ComputeAverageResponse) {} // 双向流式RPC rpc FindMax (stream FindMaxRequest) returns (stream FindMaxResponse) {}}message AddRequest { int32 a = 1; int32 b = 2;}message AddResponse { int32 result = 1;}message MultiplyRequest { int32 a = 1; int32 b = 2;}message MultiplyResponse { int32 result = 1;}message PrimeFactorsRequest { int32 number = 1;}message PrimeFactorsResponse { int32 factor = 1;}message ComputeAverageRequest { int32 number = 1;}message ComputeAverageResponse { double average = 1;}message FindMaxRequest { int32 number = 1;}message FindMaxResponse { int32 current_max = 1;}
生成Python代码
python -m grpc_tools.protoc \ -I. \ --python_out=. \ --grpc_python_out=. \ calculator.proto
gRPC服务器实现
# grpc_server.pyimport grpcfrom concurrent import futuresimport timeimport mathimport calculator_pb2import calculator_pb2_grpcclass CalculatorServicer(calculator_pb2_grpc.CalculatorServicer): def Add(self, request, context): """一元RPC:加法""" result = request.a + request.b print(f"Add: {request.a} + {request.b} = {result}") return calculator_pb2.AddResponse(result=result) def Multiply(self, request, context): """一元RPC:乘法""" result = request.a * request.b print(f"Multiply: {request.a} * {request.b} = {result}") return calculator_pb2.MultiplyResponse(result=result) def PrimeFactors(self, request, context): """服务器流式RPC:计算质因数""" number = request.number print(f"PrimeFactors: 分解 {number}") # 分解质因数 n = number divisor = 2 while n > 1: while n % divisor == 0: yield calculator_pb2.PrimeFactorsResponse(factor=divisor) n //= divisor divisor += 1 if divisor * divisor > n: if n > 1: yield calculator_pb2.PrimeFactorsResponse(factor=n) break def ComputeAverage(self, request_iterator, context): """客户端流式RPC:计算平均数""" total = 0 count = 0 for request in request_iterator: total += request.number count += 1 print(f"ComputeAverage: 收到 {request.number}, 当前总数 {total}, 计数 {count}") average = total / count if count > 0 else 0 print(f"ComputeAverage: 平均数 = {average}") return calculator_pb2.ComputeAverageResponse(average=average) def FindMax(self, request_iterator, context): """双向流式RPC:寻找最大值""" current_max = None for request in request_iterator: number = request.number print(f"FindMax: 收到 {number}") if current_max is None or number > current_max: current_max = number print(f"FindMax: 更新最大值 = {current_max}") yield calculator_pb2.FindMaxResponse(current_max=current_max)def serve(): """启动gRPC服务器""" server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) calculator_pb2_grpc.add_CalculatorServicer_to_server(CalculatorServicer(), server) # 监听端口 server.add_insecure_port('[::]:50051') server.start() print("gRPC服务器启动,监听端口 50051") try: # 保持服务器运行 while True: time.sleep(86400) # 24小时 except KeyboardInterrupt: server.stop(0)if __name__ == '__main__': serve()
gRPC客户端实现
# grpc_client.pyimport grpcimport calculator_pb2import calculator_pb2_grpcimport timeclass CalculatorClient: def __init__(self, host='localhost', port=50051): self.channel = grpc.insecure_channel(f'{host}:{port}') self.stub = calculator_pb2_grpc.CalculatorStub(self.channel) def test_unary_rpc(self): """测试一元RPC""" print("=== 测试一元RPC ===") # 加法 response = self.stub.Add(calculator_pb2.AddRequest(a=10, b=20)) print(f"10 + 20 = {response.result}") # 乘法 response = self.stub.Multiply(calculator_pb2.MultiplyRequest(a=10, b=20)) print(f"10 * 20 = {response.result}") def test_server_streaming(self): """测试服务器流式RPC""" print("\n=== 测试服务器流式RPC ===") request = calculator_pb2.PrimeFactorsRequest(number=360) responses = self.stub.PrimeFactors(request) print(f"360的质因数分解:") factors = [] for response in responses: factors.append(str(response.factor)) print(f"360 = {' × '.join(factors)}") def test_client_streaming(self): """测试客户端流式RPC""" print("\n=== 测试客户端流式RPC ===") def generate_numbers(): numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] for num in numbers: yield calculator_pb2.ComputeAverageRequest(number=num) time.sleep(0.1) # 模拟延迟 response = self.stub.ComputeAverage(generate_numbers()) print(f"1到10的平均数: {response.average}") def test_bidirectional_streaming(self): """测试双向流式RPC""" print("\n=== 测试双向流式RPC ===") def generate_numbers(): numbers = [3, 1, 4, 1, 5, 9, 2, 6, 5] for num in numbers: yield calculator_pb2.FindMaxRequest(number=num) time.sleep(0.2) # 模拟延迟 responses = self.stub.FindMax(generate_numbers()) print("发送序列: 3, 1, 4, 1, 5, 9, 2, 6, 5") print("实时最大值:") for response in responses: print(f" 当前最大值: {response.current_max}") def run_all_tests(self): """运行所有测试""" self.test_unary_rpc() self.test_server_streaming() self.test_client_streaming() self.test_bidirectional_streaming()def main(): client = CalculatorClient() client.run_all_tests()if __name__ == '__main__': main()
7. 实现自定义RPC框架
简单的JSON-RPC框架
# simple_rpc.pyimport jsonimport socketimport threadingfrom typing import Dict, Any, Callablefrom dataclasses import dataclassfrom enum import Enumclass RPCErrorCode(Enum): PARSE_ERROR = -32700 INVALID_REQUEST = -32600 METHOD_NOT_FOUND = -32601 INVALID_PARAMS = -32602 INTERNAL_ERROR = -32603 SERVER_ERROR = -32000@dataclassclass RPCRequest: jsonrpc: str = "2.0" method: str = None params: list = None id: Any = None def to_dict(self): return { "jsonrpc": self.jsonrpc, "method": self.method, "params": self.params or [], "id": self.id } @classmethod def from_dict(cls, data): return cls( jsonrpc=data.get("jsonrpc", "2.0"), method=data.get("method"), params=data.get("params"), id=data.get("id") )@dataclassclass RPCResponse: jsonrpc: str = "2.0" result: Any = None error: Dict[str, Any] = None id: Any = None def to_dict(self): response = {"jsonrpc": self.jsonrpc} if self.error: response["error"] = self.error else: response["result"] = self.result if self.id is not None: response["id"] = self.id return response @classmethod def success(cls, result, request_id): return cls(result=result, id=request_id) @classmethod def error_response(cls, code, message, request_id=None): return cls( error={"code": code.value, "message": message}, id=request_id )class SimpleRPCServer: def __init__(self, host='localhost', port=8080): self.host = host self.port = port self.methods = {} self.server_socket = None self.running = False def register_method(self, name: str, func: Callable): """注册RPC方法""" self.methods[name] = func def register(self, name=None): """装饰器注册方法""" def decorator(func): method_name = name or func.__name__ self.register_method(method_name, func) return func return decorator def handle_request(self, data: str) -> str: """处理单个请求""" try: request_dict = json.loads(data) request = RPCRequest.from_dict(request_dict) # 验证请求 if request.method is None: return json.dumps(RPCResponse.error_response( RPCErrorCode.INVALID_REQUEST, "Missing method" ).to_dict()) if request.method not in self.methods: return json.dumps(RPCResponse.error_response( RPCErrorCode.METHOD_NOT_FOUND, f"Method '{request.method}' not found", request.id ).to_dict()) # 执行方法 try: method = self.methods[request.method] params = request.params or [] if isinstance(params, list): result = method(*params) elif isinstance(params, dict): result = method(**params) else: result = method() response = RPCResponse.success(result, request.id) return json.dumps(response.to_dict()) except Exception as e: return json.dumps(RPCResponse.error_response( RPCErrorCode.INTERNAL_ERROR, f"Internal error: {str(e)}", request.id ).to_dict()) except json.JSONDecodeError: return json.dumps(RPCResponse.error_response( RPCErrorCode.PARSE_ERROR, "Parse error" ).to_dict()) def handle_connection(self, client_socket, address): """处理客户端连接""" print(f"客户端连接: {address}") try: # 接收数据 data = client_socket.recv(4096).decode('utf-8') if not data: return # 处理请求 response = self.handle_request(data) # 发送响应 client_socket.send(response.encode('utf-8')) except Exception as e: print(f"处理连接错误: {e}") finally: client_socket.close() def start(self): """启动服务器""" self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) self.running = True print(f"Simple RPC服务器启动在 {self.host}:{self.port}") try: while self.running: client_socket, address = self.server_socket.accept() thread = threading.Thread( target=self.handle_connection, args=(client_socket, address) ) thread.daemon = True thread.start() except KeyboardInterrupt: print("\n服务器关闭中...") finally: self.stop() def stop(self): """停止服务器""" self.running = False if self.server_socket: self.server_socket.close()# 使用示例if __name__ == '__main__': # 创建服务器实例 server = SimpleRPCServer(port=9090) # 注册方法 @server.register() def add(a, b): return a + b @server.register("multiply") def multiply_numbers(a, b): return a * b @server.register("greet") def greet(name, greeting="Hello"): return f"{greeting}, {name}!" class MathService: @staticmethod @server.register("square") def square(x): return x * x # 启动服务器 server.start()
8. RPC实践
1. 版本管理
# 在接口定义中添加版本class CalculatorServiceV1: def add(self, a, b): return a + bclass CalculatorServiceV2: def add(self, a, b, c=0): return a + b + c
2. 超时与重试
import grpcfrom tenacity import retry, stop_after_attempt, wait_exponentialclassResilientRPCClient: def __init__(self): options = [ ('grpc.enable_retries', 1), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "calculator.Calculator"}], ' '"retryPolicy": {"maxAttempts": 5, "initialBackoff": "0.1s", ' '"maxBackoff": "1s", "backoffMultiplier": 2, ' '"retryableStatusCodes": ["UNAVAILABLE"]}}]}') ] self.channel = grpc.insecure_channel( 'localhost:50051', options=options ) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10) ) def call_with_retry(self, method, *args): return method(*args)
3. 认证与授权
# gRPC SSL/TLS加密import grpcfrom grpc import ssl_channel_credentialsdef create_secure_channel(): # 加载证书 with open('server.crt', 'rb') as f: trusted_certs = f.read() # 创建凭证 credentials = ssl_channel_credentials( root_certificates=trusted_certs ) # 创建安全通道 channel = grpc.secure_channel( 'localhost:50051', credentials ) return channel# Token认证class AuthInterceptor(grpc.ServerInterceptor): def intercept_service(self, continuation, handler_call_details): metadata = dict(handler_call_details.invocation_metadata) # 检查Token if 'authorization' not in metadata: raise grpc.RpcError(grpc.StatusCode.UNAUTHENTICATED) token = metadata['authorization'] if not validate_token(token): raise grpc.RpcError(grpc.StatusCode.UNAUTHENTICATED) return continuation(handler_call_details)
4. 监控与日志
import loggingimport timefrom functools import wrapsdef rpc_logger(func): """RPC方法日志装饰器""" @wraps(func) def wrapper(*args, **kwargs): start_time = time.time() method_name = func.__name__ logging.info(f"RPC调用开始: {method_name}") try: result = func(*args, **kwargs) duration = time.time() - start_time logging.info( f"RPC调用成功: {method_name}, " f"耗时: {duration:.3f}秒" ) return result except Exception as e: duration = time.time() - start_time logging.error( f"RPC调用失败: {method_name}, " f"错误: {e}, 耗时: {duration:.3f}秒" ) raise return wrapper
9. 性能优化
1. 连接池
import grpcfrom concurrent.futures import ThreadPoolExecutorclass ConnectionPool: def __init__(self, max_size=10): self.pool = [] self.max_size = max_size self.lock = threading.Lock() def get_channel(self, address): with self.lock: # 查找空闲连接 for channel, in_use in self.pool: if not in_use: self.pool.remove((channel, False)) self.pool.append((channel, True)) return channel # 创建新连接 if len(self.pool) < self.max_size: channel = grpc.insecure_channel(address) self.pool.append((channel, True)) return channel # 等待空闲连接 raise Exception("连接池已满") def release_channel(self, channel): with self.lock: for i, (ch, in_use) in enumerate(self.pool): if ch == channel: self.pool[i] = (channel, False) break
2. 批量调用
class BatchRPCClient: def __init__(self): self.batch_requests = [] def add_request(self, method, *args): self.batch_requests.append((method, args)) def execute_batch(self): """执行批量调用""" results = [] for method, args in self.batch_requests: try: result = method(*args) results.append(("success", result)) except Exception as e: results.append(("error", str(e))) self.batch_requests.clear() return results
3. 压缩传输
# gRPC压缩import grpcimport zlib# 客户端压缩options = [ ('grpc.default_compression_algorithm', grpc.Compression.Gzip)]# 手动压缩def compress_data(data): return zlib.compress(json.dumps(data).encode())def decompress_data(compressed_data): return json.loads(zlib.decompress(compressed_data).decode())
10. 测试RPC服务
单元测试
import unittestfrom unittest.mock import Mock, patchimport grpcclass TestCalculatorService(unittest.TestCase): def setUp(self): self.service = CalculatorServicer() def test_add(self): # 创建模拟上下文 mock_context = Mock() # 创建请求 request = calculator_pb2.AddRequest(a=10, b=20) # 调用方法 response = self.service.Add(request, mock_context) # 验证结果 self.assertEqual(response.result, 30) @patch('your_module.some_dependency') def test_with_mock(self, mock_dependency): mock_dependency.return_value = 42 request = calculator_pb2.SomeRequest() response = self.service.SomeMethod(request, Mock()) self.assertEqual(response.result, 42)class TestRPCClient(unittest.TestCase): def test_client_retry(self): # 测试重试逻辑 client = ResilientRPCClient() # 模拟失败然后成功的调用 mock_stub = Mock() mock_stub.some_method.side_effect = [ grpc.RpcError(grpc.StatusCode.UNAVAILABLE), "success_result" ] result = client.call_with_retry(mock_stub.some_method, "arg") self.assertEqual(result, "success_result") self.assertEqual(mock_stub.some_method.call_count, 2)if __name__ == '__main__': unittest.main()
集成测试
import subprocessimport timeimport requestsclass RPCIntegrationTest(unittest.TestCase): @classmethod def setUpClass(cls): # 启动测试服务器 cls.server_process = subprocess.Popen( ['python', 'test_server.py'], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) time.sleep(2) # 等待服务器启动 @classmethod def tearDownClass(cls): # 关闭服务器 cls.server_process.terminate() cls.server_process.wait() def test_server_responsive(self): # 测试服务器响应 response = requests.get('http://localhost:8000/health') self.assertEqual(response.status_code, 200) def test_rpc_methods(self): # 测试RPC方法 client = xmlrpc.client.ServerProxy('http://localhost:8000') result = client.add(5, 3) self.assertEqual(result, 8)
总结
RPC是分布式系统中服务通信的重要方式。通过学习今天的内容,你应该掌握:
关键要点
RPC使远程调用像本地调用一样简单
选择合适的RPC框架取决于项目需求
gRPC适合高性能、跨语言场景
XML-RPC适合简单、快速开发的场景
始终考虑安全性、可观察性和错误处理
实践任务
使用gRPC实现一个简单的聊天服务
为现有的XML-RPC服务添加认证机制
实现一个支持批量调用的RPC客户端
为RPC服务添加监控和日志记录
明天我们将进入Python网络编程的另一个重要主题:HTTP协议实现。