当单体应用变得臃肿,微服务是一个选择。
今天聊聊如何将 PHP 单体应用拆分为微服务。
什么时候需要微服务
单体应用的问题
微服务不是银弹
微服务带来的复杂性:
建议:日活 10 万以下,团队 10 人以下,优先考虑单体应用。
拆分原则
1. 按业务领域拆分
电商系统拆分:
├── 用户服务(user-service)
├── 商品服务(product-service)
├── 订单服务(order-service)
├── 支付服务(payment-service)
├── 库存服务(inventory-service)
└── 通知服务(notification-service)
2. 单一职责
每个服务只负责一个业务领域。
3. 数据独立
每个服务有自己的数据库,不共享数据库。
❌ 错误:多个服务共享数据库
用户服务 ─┐
商品服务 ─┼─→ 共享数据库
订单服务 ─┘
✅ 正确:每个服务独立数据库
用户服务 → 用户数据库
商品服务 → 商品数据库
订单服务 → 订单数据库
服务间通信
1. HTTP/REST
最简单的方式。
// 订单服务调用用户服务
classUserClient
{
publicfunction__construct(
private HttpClientInterface $http,
private string $baseUrl
){}
publicfunctiongetUser(int $id): ?array
{
$response = $this->http->request('GET', "{$this->baseUrl}/users/{$id}");
if ($response->getStatusCode() !== 200) {
returnnull;
}
return json_decode($response->getBody(), true);
}
}
// 使用
$userClient = new UserClient($http, 'http://user-service:8080');
$user = $userClient->getUser(123);
2. 消息队列
异步通信,解耦服务。
// 订单服务发布事件
$queue->publish('order.created', [
'order_id' => $order->id,
'user_id' => $order->user_id,
'total' => $order->total,
]);
// 库存服务订阅事件
$queue->subscribe('order.created', function($message){
$orderId = $message['order_id'];
// 扣减库存
});
// 通知服务订阅事件
$queue->subscribe('order.created', function($message){
$userId = $message['user_id'];
// 发送通知
});
3. gRPC
高性能的 RPC 框架(下一篇详细讲)。
服务发现
服务实例动态变化,需要服务发现机制。
客户端发现
classServiceDiscovery
{
privatearray $services = [];
publicfunctionregister(string $name, string $host, int $port): void
{
$this->services[$name][] = [
'host' => $host,
'port' => $port,
];
}
publicfunctiondiscover(string $name): ?array
{
$instances = $this->services[$name] ?? [];
if (empty($instances)) {
returnnull;
}
// 简单的随机负载均衡
return $instances[array_rand($instances)];
}
}
使用 Consul
classConsulServiceDiscovery
{
publicfunction__construct(private ConsulClient $consul){}
publicfunctionregister(string $name, string $host, int $port): void
{
$this->consul->agent()->registerService([
'ID' => "{$name}-{$host}-{$port}",
'Name' => $name,
'Address' => $host,
'Port' => $port,
'Check' => [
'HTTP' => "http://{$host}:{$port}/health",
'Interval' => '10s',
],
]);
}
publicfunctiondiscover(string $name): array
{
$services = $this->consul->health()->service($name, ['passing' => true]);
return array_map(fn($s) => [
'host' => $s['Service']['Address'],
'port' => $s['Service']['Port'],
], $services);
}
}
API 网关
统一入口,处理认证、限流、路由。
classApiGateway
{
privatearray $routes = [];
private ServiceDiscovery $discovery;
publicfunction__construct(ServiceDiscovery $discovery)
{
$this->discovery = $discovery;
$this->routes = [
'/users/*' => 'user-service',
'/products/*' => 'product-service',
'/orders/*' => 'order-service',
];
}
publicfunctionhandle(Request $request): Response
{
// 1. 认证
if (!$this->authenticate($request)) {
returnnew Response(401, 'Unauthorized');
}
// 2. 限流
if (!$this->rateLimit($request)) {
returnnew Response(429, 'Too Many Requests');
}
// 3. 路由
$service = $this->matchService($request->getPath());
if (!$service) {
returnnew Response(404, 'Not Found');
}
// 4. 服务发现
$instance = $this->discovery->discover($service);
if (!$instance) {
returnnew Response(503, 'Service Unavailable');
}
// 5. 转发请求
return$this->forward($request, $instance);
}
privatefunctionmatchService(string $path): ?string
{
foreach ($this->routes as $pattern => $service) {
if (fnmatch($pattern, $path)) {
return $service;
}
}
returnnull;
}
}
分布式事务
Saga 模式
classCreateOrderSaga
{
publicfunctionexecute(array $orderData): void
{
$steps = [];
try {
// 步骤 1:创建订单
$order = $this->orderService->create($orderData);
$steps[] = ['service' => 'order', 'action' => 'create', 'data' => $order];
// 步骤 2:扣减库存
$this->inventoryService->decrease($order->product_id, $order->quantity);
$steps[] = ['service' => 'inventory', 'action' => 'decrease', 'data' => $order];
// 步骤 3:扣减余额
$this->walletService->deduct($order->user_id, $order->total);
$steps[] = ['service' => 'wallet', 'action' => 'deduct', 'data' => $order];
} catch (Exception $e) {
// 补偿:回滚已执行的步骤
$this->compensate($steps);
throw $e;
}
}
privatefunctioncompensate(array $steps): void
{
foreach (array_reverse($steps) as $step) {
match ($step['service']) {
'order' => $this->orderService->cancel($step['data']->id),
'inventory' => $this->inventoryService->increase(
$step['data']->product_id,
$step['data']->quantity
),
'wallet' => $this->walletService->refund(
$step['data']->user_id,
$step['data']->total
),
};
}
}
}
配置中心
classConfigCenter
{
private ConsulClient $consul;
privatearray $cache = [];
publicfunctionget(string $key, $default = null)
{
if (isset($this->cache[$key])) {
return$this->cache[$key];
}
$value = $this->consul->kv()->get($key);
if ($value === null) {
return $default;
}
$this->cache[$key] = $value;
return $value;
}
publicfunctionset(string $key, $value): void
{
$this->consul->kv()->put($key, $value);
$this->cache[$key] = $value;
}
}
// 使用
$config = new ConfigCenter($consul);
$dbHost = $config->get('database/host', 'localhost');
链路追踪
classTraceMiddleware
{
publicfunctionhandle(Request $request, callable $next): Response
{
// 获取或生成 Trace ID
$traceId = $request->getHeader('X-Trace-ID') ?: $this->generateTraceId();
$spanId = $this->generateSpanId();
// 设置上下文
Context::set('trace_id', $traceId);
Context::set('span_id', $spanId);
$startTime = microtime(true);
try {
$response = $next($request);
// 记录追踪信息
$this->record([
'trace_id' => $traceId,
'span_id' => $spanId,
'service' => config('app.name'),
'path' => $request->getPath(),
'duration' => microtime(true) - $startTime,
'status' => $response->getStatusCode(),
]);
return $response->withHeader('X-Trace-ID', $traceId);
} catch (Exception $e) {
$this->record([
'trace_id' => $traceId,
'span_id' => $spanId,
'error' => $e->getMessage(),
]);
throw $e;
}
}
}
总结
拆分建议:
下一篇我们来聊 gRPC + PHP。