AI时代,基础很重要。
只有基础扎实,才能驾驭AI这个强大的工具。
向大师学习,是夯实基础、精进技术的最好方式之一。
很多人第一次看 ZLMediaKit 的代码,会有一种“看不太懂,但很高级”的感觉。这种感觉并不是因为代码写得晦涩,而是用看似“多绕了一层”的设计实现了“在复杂中,保持秩序”。
我第一次接触ZLMedia是在六、七年前参与开发一款人脸识别门禁产品,那时还没疫情,人脸识别门禁也不算火爆。当时加入这个开发团队我有很多不适应,因为那时我还是一个传统意义上的嵌入式linux应用开发:开发语言主要是C,少量C++;开发IDE是sourceinsight, 观察前端打印大多是通过串口。而这个人脸识别门禁的开发是基于ZLMedia框架,开发团队使用的语言是纯C++, 开发IDE是vs code, 观察日志打印也没有串口而是日志文件。尤其是这个ZLMedia框架代码,对于当时的我来说,确实很晦涩。为了补齐现代C++语言的短板,那一两年看了两部大部头书,一本是原版的《c++ primer》,一本是Marc Gregoire的《prefessional c++》,两本都是上千页,都看了两遍以上。

一边看书,一边看ZLMedia的源码,确实受益匪浅。如今我也被同化成了基本只写C++, 用vs code的AI应用开发。虽然那个项目之后没有再基于ZLM做开发,但平时写代码的很多小习惯都是当年向ZLMdedia学的,比如也喜欢用Ptr,函数入口处常用RAII小组件,函数内的逻辑代码块封装成lambda。很为大陆有这样的大佬存在而高兴。
传统的一个连接占用一个线程的模型(Thread-per-Connection)在应对高频并发连接时会面临诸多严峻挑战:
传统的多线程阻塞模型不适用I/O密集型的流媒体服务,不能很好地应对流媒体服务的工程难题。
在一个流媒体服务器中,会同时存在:
所以真正难的不是“如何监听 socket”,而是:
如果这三点处理不好,epoll 写得再漂亮,也一定会恶化演变成:
所以ZLMediaKit 的多线程模型,本质上是一个复杂度隔离系统。
原则1:让I/O线程专注于I/OI/O线程只做三件事:
原则2:计算与I/O分离
原则3:无锁或最小化锁的设计
ZLMediaKit的多线程模型可以抽象为四个逻辑层次:
┌─────────────────────────────────────────┐│ 事件分发层 (EventDispatcher) │ ← 使用epoll/kqueue/select│ (单线程或少量线程) │├─────────────────────────────────────────┤│ I/O工作层 (IOWorker) │ ← 执行实际非阻塞I/O│ (线程数=CPU核心数) │├─────────────────────────────────────────┤│ 业务处理层 (WorkThread) │ ← 处理协议、音视频帧│ (线程池,可配置大小) │├─────────────────────────────────────────┤│ 定时任务层 (TimerThread) │ ← 处理超时、心跳│ (专用线程) │└─────────────────────────────────────────┘// src/Network/Socket.h 中的关键抽象class EventPoller : public std::enable_shared_from_this<EventPoller> {public: // 单例模式管理所有EventPoller实例static EventPollerPool& Instance(); // 每个EventPoller绑定一个系统线程void runLoop(bool blocked = true);private: // 关键数据结构:事件循环 std::unique_ptr<EventLoop> _loop; // 定时器管理 TimerManager _timer_manager; // 异步任务队列(线程安全) TaskExecutor _async_task_queue; // 核心事件处理循环void handleEvents(int timeout_ms);};每一个 EventPoller 对应一个系统线程,其内部运行着一个 epoll 或 select 循环。
// 伪代码展示其核心循环void EventPoller::runLoop(){ while (!_exited) { int ret = epoll_wait(_epoll_fd, events, max_events, timeout); for (int i = 0; i < ret; ++i) { auto fd = events[i].data.fd; // 触发 fd 绑定的回调函数 handleEvent(fd, events[i].events); } // 处理跨线程投递的任务 executePendedTasks(); }}EventPoller同时承担了三个角色:
EventPoller的设计亮点:
TcpServer 接收到连接请求时,它会调用 EventPollerPool::getPoller()通过哈希算法等策略分配到不同的EventPoller在 ZLMediaKit 中,你会发现大量代码都遵循一个隐含规则:
对象只在所属 EventPoll 线程中被真正操作
这不是靠 mutex 实现的,而是靠:
这背后是一个非常成熟的工程判断:
锁不是免费的,而“线程归属”是一种更高维度的同步方式。
ZLMediaKit没有简单照搬Reactor模式,而是根据流媒体特性进行了深度优化:
// 简化的主Reactor实现逻辑class MainReactor {public:void start(){ // 1. 创建监听socket _acceptor = createAcceptor(port); // 2. 注册到EventPoller _poller->addEvent(_acceptor.fd(), Event_Read, [this](int event) { this->onAccept(); }); // 3. 启动I/O工作线程池 _io_workers.start(); }private:void onAccept(){ // 非阻塞accept while (auto conn = _acceptor.accept()) { // 关键决策点:选择哪个I/O Worker? auto worker = selectIOWorkerByHash(conn.fd()); // 将连接迁移到选中的I/O Worker线程 worker->addConnection(std::move(conn)); } }};ZLMediaKit采用了多Reactor多线程变体:
这种设计的优势在于:
在异步网络中,最怕的是“回调对象已销毁”。ZLM 大量使用了 std::enable_shared_from_this。
每当一个异步操作(如异步写入)被触发时,Lambda 闭包会捕获一个 weak_ptr 或 shared_ptr。这保证了在事件触发时,对象依然有效;或者在对象销毁后,回调能自动跳过逻辑,优雅地解决了多线程下的生命周期管理难题。
ZLM 将具体的协议处理抽象为 Session。
TcpServer 负责监听和产生 Socket 对象。Socket 对象被分配到某个 EventPoller。EventPoller 触发事件后,通过 Socket 调用绑定的 Session 的 onRecv。这种层层递进的抽象,使得 ZLM 能够同时支持 RTSP、RTMP、HTTP、WebSocket 而不显得臃肿。
典型配置中,ZLMediaKit 会存在:
但它们的职责边界非常清楚:
// 线程间任务传递的无锁队列实现(简化版)template<typename T>class LockFreeTaskQueue {public: // 多生产者单消费者场景优化bool push(T&& task, bool front = false){ // 使用内存序relaxed减少同步开销 auto tail = _tail.load(std::memory_order_relaxed); // 无锁CAS操作 while (!_tail.compare_exchange_weak( tail, tail + 1, std::memory_order_acq_rel, std::memory_order_relaxed)) { // 忙等但无系统调用开销 } // 写入任务数据 _buffer[tail % CAPACITY] = std::move(task); return true; }private: // 环形缓冲区 std::array<T, CAPACITY> _buffer; // 原子计数器 std::atomic<uint64_t> _head{0}; std::atomic<uint64_t> _tail{0};};在ZLMediaKit中,数据包在线程间的传递采用了移动语义+智能指针的组合:
// 数据包在线程间传递的典型模式class Packet {public: using Ptr = std::shared_ptr<Packet>; // 关键:确保数据只在一个线程中被处理void processOn(ThreadPool& pool){ auto self = shared_from_this(); // 将任务提交到指定线程池 pool.async([self]() { // 此时数据包的所有权已经转移 self->doProcess(); }); // 当前线程不再访问self }};流媒体服务器频繁分配/释放内存,ZLMediaKit实现了专用的内存池:
class MediaBufferPool {public: // 针对不同大小的音视频帧优化static char* obtain(size_t size){ if (size <= SMALL_BLOCK) { return SmallBlockPool::instance().alloc(); } else if (size <= MEDIUM_BLOCK) { return MediumBlockPool::instance().alloc(); } else { // 大块内存直接使用系统分配 return new char[size]; } } // 内存对齐优化(CPU缓存友好)static constexpr size_t align_size(size_t size){ const size_t alignment = 64; // 缓存行大小 return (size + alignment - 1) & ~(alignment - 1); }};在关键路径上,ZLMediaKit大量使用零拷贝技术:
// 可复用的分层线程架构模板template<typename IOHandler, typename Worker>class LayeredThreadModel {public:void setup(size_t io_threads = std::thread::hardware_concurrency(), size_t worker_threads = 4){ // 1. 创建I/O层 for (size_t i = 0; i < io_threads; ++i) { _io_pool.emplace_back([this] { EventLoop loop; IOHandler handler; loop.run(&handler); }); } // 2. 创建工作层 _worker_pool.resize(worker_threads); } // 连接分发策略void dispatchConnection(Connection conn){ // 一致性哈希确保同一连接的I/O在同一线程 size_t idx = hash(conn.id()) % _io_pool.size(); _io_pool[idx].post([conn]() mutable { conn.handleIO(); }); }};// 通用的事件驱动任务调度器class EventDrivenScheduler {public:template<typename F> void scheduleOnEvent(int fd, EventType type, F&& func){ // 注册事件回调 _poller->addEvent(fd, type, [func = std::forward<F>(func)]() { // 事件触发时在I/O线程执行 func(); // 如果需要CPU计算,转移到工作线程 if (needsComputation()) { WorkerPool::instance().submit(std::move(func)); } }); }};ZLMediaKit的架构体现了优秀的适应性:
通过剖析ZLMediaKit的多线程非阻塞网络模型,我们看到的不仅是一套高效的代码实现,更是一种严谨的工程思维:
ZLMediaKit的代码也告诉我们:大师级的代码不在于用了多么玄学的语法,而在于对“资源控制”和“逻辑解耦”的深刻理解。它通过 EventPoller 锁定了运行环境,通过 Socket 抽象了通信底座,通过 Session 隔离了业务协议。这种结构让复杂的流媒体转发变得像流水线一样清晰。