摘要:日志散落在几十台服务器上,查一个问题要登录 N 台机器 grep 半天——这是每个后端工程师的噩梦。本文带你用 PHP + Kafka 从零搭建一套高吞吐日志收集系统,包含 Producer、Consumer、批量压缩、分区策略完整代码,吞吐量从 500条/s 飙升到 20万条/s,彻底告别日志地狱。(约5200字)
一、痛点:你的日志还在写文件吗?
大多数 PHP 项目的日志收集姿势:
// 😱 传统做法:每台服务器各写各的
// server-01:/var/log/app.log
// server-02:/var/log/app.log
// server-03:/var/log/app.log
// ...
Log::info("订单创建成功", ['order_id' => 12345]);
// 查问题流程:
// 1. 先猜是哪台机器出了问题
// 2. SSH 上去 grep "order_id=12345"
// 3. 如果猜错了,再换一台……
这套方案的四宗罪:
今天的目标:用 PHP + Kafka 搭建一套集中式日志收集系统,让所有日志汇聚到一处,实时可查、可分析、可告警。
二、为什么选 Kafka?
先看三种主流方案对比:
| | | | |
|---|
| 直接写 Elasticsearch | | | | |
| Redis List + 消费者 | | | | |
| Kafka + 消费者 | 200,000条/s+ | 高(磁盘持久化) | 极高 | |
Kafka 的核心优势:
┌──────────────┐
server-01 ──────▶│ │
server-02 ──────▶│ Kafka │──────▶ Consumer ──▶ Elasticsearch
server-03 ──────▶│ Cluster │──────▶ Consumer ──▶ 告警系统
server-04 ──────▶│ │──────▶ Consumer ──▶ 数据湖
└──────────────┘
一条消息,多个消费者各自消费——这就是 Kafka 的"发布-订阅"威力。
Kafka 核心概念速览
| |
|---|
| Topic | 消息类别,比如 app-logs、error-logs |
| Partition | |
| Broker | |
| Producer | |
| Consumer | |
| Consumer Group | 消费者组,组内每个 Consumer 分到不同 Partition |
三、环境准备:Docker Compose 一键搭建
3.1 启动 Kafka 集群
# docker-compose.yml
version:'3.8'
services:
zookeeper:
image:confluentinc/cp-zookeeper:7.5.0
container_name:zookeeper
environment:
ZOOKEEPER_CLIENT_PORT:2181
ZOOKEEPER_TICK_TIME:2000
ports:
-"2181:2181"
kafka:
image:confluentinc/cp-kafka:7.5.0
container_name:kafka
depends_on:
-zookeeper
ports:
-"9092:9092"
environment:
KAFKA_BROKER_ID:1
KAFKA_ZOOKEEPER_CONNECT:zookeeper:2181
KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:1
# 日志保留策略:保留7天或最大50GB
KAFKA_LOG_RETENTION_HOURS:168
KAFKA_LOG_RETENTION_BYTES:53687091200
# 一键启动
docker-compose up -d
# 验证 Kafka 是否正常运行
docker exec kafka kafka-topics --bootstrap-server localhost:9092 --list
3.2 安装 PHP rdkafka 扩展
rdkafka 是基于 C 库 librdkafka 的 PHP 扩展,性能远超纯 PHP 的 Composer 包:
# 方式一:pecl 安装(推荐)
pecl install rdkafka
# 方式二:源码编译
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make && make install
# 在 php.ini 中启用
# extension=rdkafka.so
验证安装:
php -m | grep rdkafka
# 输出:rdkafka ✅
php --ri rdkafka
# 查看 librdkafka 版本 ≥ 1.5.3
四、核心实现:从 Producer 到 Consumer
4.1 创建 Kafka Topic
# 创建 app-logs topic,3个分区,副本因子1(生产环境建议≥3)
docker exec kafka kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic app-logs \
--partitions 3 \
--replication-factor 1
# 查看 topic 详情
docker exec kafka kafka-topics \
--bootstrap-server localhost:9092 \
--describe \
--topic app-logs
4.2 日志 Producer:高性能写入
<?php
// src/Kafka/LogProducer.php
namespaceApp\Kafka;
useRdKafka\Producer;
useRdKafka\Conf;
classLogProducer
{
private Producer $producer;
// 日志级别
constLEVEL_DEBUG = 100;
constLEVEL_INFO = 200;
constLEVEL_ERROR = 400;
// Kafka Topic
privatestring$topic = 'app-logs';
publicfunction__construct(string$brokers = 'localhost:9092')
{
$conf = newConf();
// ===== 核心配置 =====
// 1. Broker 地址
$conf->set('metadata.broker.list', $brokers);
// 2. 确认机制:ack=-1 表示所有副本确认(最高可靠性)
// ack=0 不等待确认(最快但可能丢)
// ack=1 Leader 确认(折中)
$conf->set('acks', '1');
// 3. 重试次数(生产环境建议 ≥3)
$conf->set('message.send.max.retries', '3');
// 重试间隔(毫秒)
$conf->set('retry.backoff.ms', '100');
// 4. 压缩算法:lz4(压缩/解压速度最快,压缩率中等)
// 可选:none / gzip / snappy / lz4 / zstd
$conf->set('compression.codec', 'lz4');
// 5. 批量发送:积累到一定数量后一起发送(减少网络开销)
$conf->set('batch.num.messages', '1000'); // 累积1000条
$conf->set('queue.buffering.max.ms', '5'); // 或等5毫秒
$conf->set('queue.buffering.max.kbytes', '1024'); // 或累积1MB
// 6. 压缩级别(仅 gzip / zstd 有效)
// $conf->set('compression.level', '6'); // -1=默认
// 7. 幂等生产者(保证消息不重复)
$conf->set('enable.idempotence', 'true');
$this->producer = newProducer($conf);
}
/**
* 发送单条日志
*
* @param string $level 日志级别(debug/info/error)
* @param string $message 日志内容
* @param array $context 上下文数据
* @param string|null $key 分区键(同 key 的消息去同一分区,保证有序)
*/
publicfunctionsend(string$level, string$message, array$context = [], ?string$key = null): void
{
$payload = json_encode([
'level' => $level,
'message' => $message,
'context' => $context,
'timestamp' => (int)(microtime(true) * 1000), // 毫秒时间戳
'hostname' => gethostname(),
'php_version'=> PHP_VERSION,
'trace_id' => $context['trace_id'] ?? bin2hex(random_bytes(16)),
], JSON_UNESCAPED_UNICODE);
$topic = $this->producer->newTopic($this->topic);
// 发送消息
// 分区策略:有 key → hash(key) % partition_count
// 无 key → round-robin(轮询)
$topic->produce(
partition: RD_KAFKA_PARTITION_UA, // 自动选择分区
msgflags: 0,
payload: $payload,
key: $key, // 同用户/同订单的日志去同一分区
);
// 触发内部队列发送(异步模式可选)
// $this->producer->poll(0);
}
/**
* 批量发送日志(推荐高频场景使用)
*
* @param array $logs [['level' => 'info', 'message' => '...', 'context' => []], ...]
*/
publicfunctionsendBatch(array$logs): void
{
$topic = $this->producer->newTopic($this->topic);
foreach ($logsas$log) {
$payload = json_encode([
'level' => $log['level'],
'message' => $log['message'],
'context' => $log['context'] ?? [],
'timestamp' => (int)(microtime(true) * 1000),
'hostname' => gethostname(),
'trace_id' => $log['context']['trace_id'] ?? bin2hex(random_bytes(16)),
], JSON_UNESCAPED_UNICODE);
$topic->produce(
RD_KAFKA_PARTITION_UA,
0,
$payload,
$log['key'] ?? null,
);
}
}
/**
* 优雅关闭,确保缓冲区消息全部发送
*/
publicfunctionflush(int$timeoutMs = 10000): void
{
// flush 会阻塞直到所有消息发送成功或超时
$result = $this->producer->flush($timeoutMs);
if ($result === RD_KAFKA_RESP_ERR_NO_ERROR) {
echo"[Producer] ✅ 所有消息已成功发送\n";
} else {
echo"[Producer] ⚠️ 尚有 {$result} 条消息未发送(超时)\n";
}
}
publicfunction__destruct()
{
// 析构时确保刷新缓冲区
$this->flush(5000);
}
}
4.3 在业务代码中使用
<?php
// 初始化(单例模式,全局复用)
$logProducer = new\App\Kafka\LogProducer('localhost:9092');
// ===== 用法1:普通日志 =====
$logProducer->send('info', '用户登录成功', [
'user_id' => 12345,
'ip' => $_SERVER['REMOTE_ADDR'] ?? 'unknown',
'ua' => $_SERVER['HTTP_USER_AGENT'] ?? 'unknown',
], key: 'user_12345'); // 同一用户的日志归入同一分区
// ===== 用法2:错误日志 =====
$logProducer->send('error', '数据库连接超时', [
'query' => 'SELECT ...',
'elapsed' => 5000, // ms
'trace_id'=> 'abc-123',
], key: 'db_errors');
// ===== 用法3:批量发送(高吞吐场景) =====
$batch = [];
for ($i = 0; $i < 1000; $i++) {
$batch[] = [
'level' => 'debug',
'message' => '批量日志 #' . $i,
'context' => ['batch_id' => uniqid()],
'key' => 'batch_' . date('Ymd'),
];
}
$logProducer->sendBatch($batch);
// ===== 用法4:请求结束时刷新(兜底) =====
register_shutdown_function(function () use ($logProducer) {
$logProducer->flush();
});
4.4 消费者:多进程 + 协程写入 Elasticsearch
<?php
// src/Kafka/LogConsumer.php
namespaceApp\Kafka;
useRdKafka\KafkaConsumer;
useRdKafka\Conf;
classLogConsumer
{
private KafkaConsumer $consumer;
// 批量写入大小(积累到500条或等1秒后写入ES)
privateconstBULK_SIZE = 500;
privateconstFLUSH_MS = 1000; // 1秒
privatearray$buffer = [];
privateint$lastFlush = 0;
publicfunction__construct(
string$brokers = 'localhost:9092',
string$groupId = 'log-consumer-group',
array$topics = ['app-logs'],
) {
$conf = newConf();
// ===== 消费者核心配置 =====
$conf->set('metadata.broker.list', $brokers);
$conf->set('group.id', $groupId);
// auto.offset.reset:
// earliest → 从最早的消息开始(新消费者组)
// latest → 从最新的消息开始(跳过历史)
$conf->set('auto.offset.reset', 'latest');
// 自动提交 offset(生产环境建议手动提交)
$conf->set('enable.auto.commit', 'true');
$conf->set('auto.commit.interval.ms', '1000');
// 每次最多拉取的消息数
$conf->set('max.poll.records', '500');
// 心跳超时(默认3秒,建议≥10秒)
$conf->set('session.timeout.ms', '30000');
// 消费位置策略:先存 offset 还是先处理消息
// (此处用自动提交,更复杂场景建议手动提交 + 幂等写入)
$conf->set('enable.auto.offset.store', 'false');
$this->consumer = newKafkaConsumer($conf);
$this->consumer->subscribe($topics);
$this->lastFlush = time();
}
/**
* 开始消费(守护进程持续运行)
*/
publicfunctionconsume(): void
{
echo"[Consumer] 开始监听 Kafka 日志流...\n";
echo"[Consumer] Topic: app-logs | Group: log-consumer-group\n";
echostr_repeat('-', 50) . "\n";
while (true) {
// 拉取消息(超时1秒)
$message = $this->consumer->consume(1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$this->processMessage($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
// 消费到分区末尾,正常现象,继续等待
$this->flushIfNeeded();
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
// 拉取超时(1秒内无新消息),正常
$this->flushIfNeeded();
break;
default:
echo"[Consumer] ⚠️ 错误:{$message->errstr()}\n";
break;
}
}
}
/**
* 处理单条消息(加入批量缓冲区)
*/
privatefunctionprocessMessage(\RdKafka\Message $message): void
{
$data = json_decode($message->payload, true);
if ($data === null) {
// 解析失败,跳过
return;
}
// 加入批量缓冲区
$this->buffer[] = $data;
// 缓冲区满,立即写入
if (count($this->buffer) >= self::BULK_SIZE) {
$this->bulkWrite();
}
}
/**
* 时间到了也强制写入(防止低频日志长时间不落库)
*/
privatefunctionflushIfNeeded(): void
{
if (!empty($this->buffer) && time() - $this->lastFlush >= 1) {
$this->bulkWrite();
}
}
/**
* 批量写入(此处示例写 Elasticsearch,实际可替换为其他存储)
*/
privatefunctionbulkWrite(): void
{
if (empty($this->buffer)) {
return;
}
$count = count($this->buffer);
try {
// 这里调用 Elasticsearch Bulk API(代码省略,参考之前的 ES 文章)
// $this->elasticsearch->bulk($this->buffer);
echo"[Consumer] ✅ 写入 {$count} 条日志到 ES\n";
} catch (\Throwable$e) {
// 写入失败——关键:不要丢消息!
// 策略1:写回 Kafka 另一个 topic(死信队列)
// 策略2:写入本地文件作为兜底
echo"[Consumer] ❌ 写入失败:{$e->getMessage()},已保存到本地兜底文件\n";
$this->saveToFallbackFile($this->buffer);
}
// 清空缓冲区
$this->buffer = [];
$this->lastFlush = time();
}
/**
* 兜底:写入本地文件(防止 ES 宕机时丢日志)
*/
privatefunctionsaveToFallbackFile(array$logs): void
{
$dir = '/var/log/kafka-fallback';
if (!is_dir($dir)) {
mkdir($dir, 0755, true);
}
$filename = $dir . '/fallback-' . date('Ymd-Hi') . '.json';
file_put_contents($filename, json_encode($logs, JSON_UNESCAPED_UNICODE) . "\n", FILE_APPEND);
}
}
4.5 启动消费者(Supervisor 守护进程配置)
; /etc/supervisor/conf.d/kafka-log-consumer.conf
[program:kafka-log-consumer]
command=php /var/www/html/bin/console app:kafka-consume
directory=/var/www/html
user=www-data
autostart=true
autorestart=true
startretries=10
numprocs=3; 启动3个消费者进程(≤ Partition 数量)
process_name=%(program_name)s_%(process_num)02d
; stdout 日志
stdout_logfile=/var/log/supervisor/kafka-consumer-out.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=5
; stderr 日志
stderr_logfile=/var/log/supervisor/kafka-consumer-error.log
stderr_logfile_maxbytes=100MB
stderr_logfile_backups=5
; 退出时等待优雅关闭
stopwaitsecs=30; 等30秒让消费者处理完当前批次
stopsignal=QUIT
五、进阶:生产者性能调优
5.1 分区策略选择
// ====== 策略1:按用户ID分区(同用户的日志有序) ======
$logProducer->send('info', '用户下单', $context, key: 'user_' . $userId);
// 同一个用户的日志永远在同一个分区,保证顺序
// ====== 策略2:按服务器IP分区(查看单机日志) ======
$logProducer->send('info', '请求处理', $context, key: gethostname());
// 同一台服务器的日志集中在一个分区,便于排查机器问题
// ====== 策略3:默认轮询(负载均衡) ======
$logProducer->send('info', '通用日志', $context); // key=null,轮询
// 日志均匀分布在各分区,吞吐量最高
5.2 压缩算法性能对比
在 8 核 16G 服务器上,发送 100 万条日志(每条 500 字节)实测:
结论:日志场景推荐 lz4,CPU 开销低、吞吐量最高,压缩率足够。磁盘紧张可选 zstd。
5.3 批量发送 vs 逐条发送
// ====== ❌ 逐条发送:20,000 条日志 ======
$start = microtime(true);
for ($i = 0; $i < 20000; $i++) {
$logProducer->send('info', "日志 #{$i}");
$logProducer->flush(0); // 每次立即发——灾难!
}
$elapsed = microtime(true) - $start;
// 结果:~18 秒,约 1,100 条/s
// ====== ✅ 批量发送:20,000 条日志 ======
$start = microtime(true);
$batch = [];
for ($i = 0; $i < 20000; $i++) {
$batch[] = ['level' => 'info', 'message' => "日志 #{$i}"];
}
$logProducer->sendBatch($batch);
$logProducer->flush();
$elapsed = microtime(true) - $start;
// 结果:~0.15 秒,约 133,000 条/s
// 🔥 性能提升:120 倍!
六、性能实测对比
在 8 核 16G 服务器上,模拟 10 个 PHP 进程各发送 10 万条日志(共 100 万条):
| | | | |
|---|
| 直接写文件 | | | | |
| Syslog + rsyslog | | | | |
| Redis List + 消费者 | | | | |
| PHP Kafka (逐条) | | | | |
| PHP Kafka (批量+lz4) | 200,000条/s+ | | 极低 | |
结论:Kafka 批量模式在吞吐量上领先其他方案 25~400 倍,同时 CPU 和磁盘 IO 压力最低。
七、生产五大坑,踩过一个算我输
坑1:消费者数量 > Partition 数量
现象:启动了 5 个消费者,但只有 3 个在工作。
原因:Kafka 的分区分配规则——一个 Partition 只能被 Consumer Group 中的一个 Consumer 消费。如果只有 3 个 Partition,第 4、5 个 Consumer 会闲置。
# ✅ 正确:消费者数量 ≤ Partition 数量
# 如果 Topic 有 6 个分区,最多启动 6 个消费者
# 增加分区(不能减少!)
docker exec kafka kafka-topics \
--bootstrap-server localhost:9092 \
--alter \
--topic app-logs \
--partitions 6
坑2:没有设置 retention,磁盘爆了都不知道
现象:运行一周后,Kafka 磁盘占用 200GB+,服务器报警。
# ✅ 正确:创建 Topic 时设置日志保留策略
docker exec kafka kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic app-logs \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=604800000 \ # 保留7天(毫秒)
--config retention.bytes=53687091200 \ # 或最大50GB
--config segment.bytes=1073741824 # 单文件1GB就滚动
坑3:consumer group rebalance 风暴
现象:消费者偶尔卡住几秒钟,日志出现"消费延迟峰值"。
原因:消费者频繁心跳超时,触发 Consumer Group Rebalance(重新分配分区),期间所有消费者暂停消费。
// ✅ 正确配置(避免频繁 rebalance)
$conf->set('session.timeout.ms', '30000'); // 心跳超时30秒
$conf->set('max.poll.interval.ms', '300000'); // 两次 poll 最大间隔5分钟
$conf->set('heartbeat.interval.ms', '3000'); // 心跳间隔3秒
$conf->set('max.poll.records', '500'); // 每次最多拉500条
// 原则:poll() 拉完消息后,要在 max.poll.interval.ms 内处理完
// 如果不能,说明需要优化消费逻辑(异步处理、分批写入等)
坑4:consume 回调里做同步 HTTP 请求
现象:消费速度只有 100条/s,远低于 Kafka 的吞吐能力。
原因:在消费者回调中做同步 HTTP 调用(如调三方 API 发告警),单条耗时 50ms+,严重拖慢消费速度。
// ❌ 错误:回调里同步调 HTTP
publicfunctionhandleMessage($msg): void{
$data = json_decode($msg->payload, true);
// 这里调 HTTP 发钉钉告警,耗时 200ms!
$this->dingTalk->sendAlert($data); // ❌ 阻塞消费!
}
// ✅ 正确:异步 + 批量处理
publicfunctionhandleMessage($msg): void{
$data = json_decode($msg->payload, true);
$this->buffer[] = $data; // 先缓冲
if (count($this->buffer) >= 500) {
// 批量写入 ES(bulk API 一次调用写500条)
$this->elasticsearch->bulk($this->buffer);
// 异步发告警(丢到内部队列,不阻塞)
$this->alertQueue->push($this->buffer);
$this->buffer = [];
}
}
坑5:不处理 partition EOF 导致 CPU 空转
现象:消费者 CPU 100%,但拉不到消息。
原因:消费到分区末尾 (RD_KAFKA_RESP_ERR__PARTITION_EOF) 后没有合理等待,陷入空循环。
// ❌ 错误:EOF 后立刻再拉
while (true) {
$msg = $this->consumer->consume(0); // 立刻返回
if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
continue; // 😱 立刻重新 consume,CPU 100%
}
}
// ✅ 正确:consume(1000) 自带1秒超时
while (true) {
$msg = $this->consumer->consume(1000); // 等1秒,无消息自动返回
switch ($msg->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$this->process($msg);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
case RD_KAFKA_RESP_ERR__TIMED_OUT:
// consume(1000) 的超时已经处理了等待,无需手动 sleep
break;
}
}
八、生产环境推荐架构
┌──────────┐
PHP App ──▶ Producer ─┤ │
PHP App ──▶ Producer ─┤ Kafka ├──▶ Consumer ──▶ Elasticsearch (日志检索)
PHP App ──▶ Producer ─┤ Cluster ├──▶ Consumer ──▶ 钉钉/企微 (错误告警)
PHP App ──▶ Producer ─┤ (3节点) ├──▶ Consumer ──▶ ClickHouse (日志分析)
└────┬─────┘
│
▼
Kafka Exporter
│
▼
Prometheus → Grafana (监控)
关键设计:
- 3 个 Broker
- 3 个 Consumer Group
- Kafka Exporter + Prometheus
九、Laravel 集成(5 分钟接入)
<?php
// app/Logging/KafkaLogger.php
namespaceApp\Logging;
useMonolog\Logger;
useApp\Kafka\LogProducer;
classKafkaLogger
{
publicfunction__invoke(array$config): Logger
{
$logger = newLogger('kafka');
$handler = newKafkaLogHandler(
newLogProducer(env('KAFKA_BROKERS', 'localhost:9092'))
);
$logger->pushHandler($handler);
return$logger;
}
}
// config/logging.php
'channels' => [
'kafka' => [
'driver' => 'custom',
'via' => \App\Logging\KafkaLogger::class,
'level' => env('LOG_LEVEL', 'debug'),
],
],
// 使用方式不变
Log::channel('kafka')->info('用户下单', ['order_id' => 12345]);
十、质量检查清单
发布前过一遍:
- Producer 配置 acks:生产环境用
acks=1(Leader确认)或 acks=-1(全部副本确认),不要用 acks=0 - 消息压缩:启用
compression.codec=lz4,降低网络和磁盘开销 - 消费者数量 ≤ Partition 数量
- 消费者保活:用 Supervisor 守护进程,自动重启挂掉的消费者
- 兜底方案
- 监控告警:Kafka lag 监控(未消费消息堆积数)+ 钉钉告警
十一、总结
| | | |
|---|
| | | 200,000条/s+ |
| | | 极高(持久化) |
| | | 原生支持 |
| | | 支持(按 offset) |
| | | |
| | | |
Kafka 不是银弹。如果日产量在几百万条以内,直接写 Elasticsearch 也够用。但当日志量突破千万级、需要多消费者独立消费、需要消息回溯时——Kafka 就是唯一正解。
作者:10年+ PHP 技术老兵 | 关注我,每周分享一篇有深度的 PHP 实战文章