字数 4556,阅读大约需 23 分钟
inotify 机制详解
什么是 inotify?
inotify(inode notify)是 Linux 内核 2.6.13 版本引入的文件系统事件监控机制。它允许用户空间程序监控文件系统事件,如文件创建、删除、修改等。
inotify 的核心特点
inotify 的局限性
- 1. 不递归:监控目录时不会自动监控子目录,需要手动添加
- 2. 无历史记录:只报告变化发生后的状态,不记录变化过程
- 3. 可能丢失事件:如果事件产生速度超过读取速度,会丢失事件
inotify API 介绍
1. inotify_init - 初始化 inotify 实例
#include <sys/inotify.h>
// 创建 inotify 实例,返回文件描述符
int inotify_init(void);
int inotify_init1(int flags); // 带标志位的版本
返回值:
2. inotify_add_watch - 添加监控
int inotify_add_watch(int fd, const char *pathname, uint32_t mask);
参数说明:
常用事件掩码:
| |
|---|
IN_CREATE | |
IN_DELETE | |
IN_MODIFY | |
IN_MOVED_FROM | |
IN_MOVED_TO | |
IN_CLOSE_WRITE | |
IN_ATTRIB | |
IN_ISDIR | |
3. inotify_rm_watch - 移除监控
int inotify_rm_watch(int fd, int wd);
4. 读取事件
#include <unistd.h>
// 使用标准的 read 系统调用读取事件
ssize_t read(int fd, void *buf, size_t count);
inotify_event 结构体:
struct inotify_event {
int wd; /* 监控描述符 */
uint32_t mask; /* 事件掩码 */
uint32_t cookie; /* 关联相关事件(如 rename) */
uint32_t len; /* name 字段长度 */
char name[]; /* 文件名(可选) */
};
inotify 工作流程
epoll 机制详解
什么是 epoll?
epoll 是 Linux 内核提供的一种I/O 多路复用机制,用于高效监控多个文件描述符上的事件。相比传统的 select 和 poll,epoll 在处理大量文件描述符时性能更好。
epoll 的核心优势
epoll 的核心数据结构
// epoll 事件结构
struct epoll_event {
uint32_t events; /* 监控的事件类型 */
epoll_data_t data; /* 用户数据 */
};
// 用户数据联合体
typedefunion epoll_data {
void *ptr; /* 指针 */
int fd; /* 文件描述符 */
uint32_t u32;
uint64_t u64;
} epoll_data_t;
// 常用事件类型
#define EPOLLIN 0x001 /* 可读 */
#define EPOLLOUT 0x004 /* 可写 */
#define EPOLLERR 0x008 /* 错误 */
#define EPOLLHUP 0x010 /* 挂起 */
#define EPOLLET 0x80000000 /* 边缘触发模式 */
epoll API 介绍
1. epoll_create - 创建 epoll 实例
#include <sys/epoll.h>
// 创建 epoll 实例
int epoll_create(int size); // 旧版本,size 被忽略
int epoll_create1(int flags); // 新版本,推荐
返回值:
2. epoll_ctl - 控制 epoll 实例
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数说明:
- •
EPOLL_CTL_ADD: 添加 FD 到 epoll - •
EPOLL_CTL_MOD: 修改已存在的 FD - •
EPOLL_CTL_DEL: 从 epoll 删除 FD
3. epoll_wait - 等待事件
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
参数说明:
- •
timeout: 超时时间(毫秒),-1 表示无限等待
返回值:
epoll 的触发模式
水平触发 (Level Triggered, LT):
- • 只要 FD 处于就绪状态,每次
epoll_wait 都会返回
边缘触发 (Edge Triggered, ET):
- • 必须一次性读完所有数据(通常配合非阻塞 I/O)
epoll 工作流程
inotify + epoll 组合架构
为什么需要组合使用?
inotify 本身是阻塞式的:read(inotify_fd) 会阻塞直到有事件发生。如果需要监控多个目录,传统的做法是为每个 inotify 实例创建单独线程,这会造成资源浪费。
epoll 提供了一种优雅的解决方案:
组合架构的优势
- 2. 统一事件处理:所有文件变化通过 epoll 统一分发
组合工作流程
Rust 实战:构建文件同步监控器
项目架构
我们将构建一个基于 inotify + epoll 的文件同步监控器,当监控目录中的文件发生变化时,自动同步到远程服务器。
核心代码实现
这里举了一个简陋的例子,旨在说清楚原理,并没有考虑错误处理、线程同步这些内容,大家看看,了解原理即可。
1. 文件监控模块
// monitor.rs - 文件监控模块
use std::cell::RefCell;
use std::collections::HashMap;
use std::io;
use std::os::fd::{AsRawFd, RawFd};
use std::path::Path;
use std::rc::Rc;
use epoll_rs::{Epoll, EpollEvent, Opts, Token};
use inotify::{Inotify, WatchMask};
/// 文件监控器结构
/// 封装 epoll + inotify 的组合监控逻辑
pub struct FileMonitor<'a> {
/// 存储文件描述符到监控信息的映射
monitor_dict: RefCell<HashMap<RawFd, (Token<'a, RawFd>, &'a String, Rc<Inotify>)>>,
/// epoll 轮询器实例
poller: Epoll,
}
impl<'a> FileMonitor<'a> {
/// 创建新的文件监控器
pub fn new() -> Self {
FileMonitor {
monitor_dict: RefCell::new(HashMap::new()),
poller: Epoll::new().expect("Failed to initialize epoll"),
}
}
/// 添加监控目录
///
/// # 参数
/// * `dir` - 要监控的目录路径
///
/// # 流程
/// 1. 创建 inotify 实例
/// 2. 使用 inotify_add_watch 添加监控
/// 3. 将 inotify fd 添加到 epoll 轮询
/// 4. 保存监控信息到字典
pub fn add_watch(&'a self, dir: &'a String) {
// 初始化 inotify 实例
let inotify = Rc::new(
Inotify::init().expect("Failed to initialize inotify")
);
// 添加监控,订阅多种事件类型
inotify
.watches()
.add(
Path::new(&(*dir)),
WatchMask::CREATE // 文件创建
| WatchMask::CLOSE_WRITE // 写入关闭
| WatchMask::MOVED_FROM // 移出
| WatchMask::MOVED_TO // 移入
| WatchMask::MOVE_SELF, // 自身移动
)
.expect("Failed to add inotify watch");
// 获取 inotify 的文件描述符
let fd = inotify.as_raw_fd();
// 将 fd 添加到 epoll,使用边缘触发模式
let token = self
.poller
.add(fd, Opts::IN | Opts::ET)
.expect("Failed to add fd to epoll");
// 保存监控信息:fd -> (token, 目录路径, inotify实例)
self.monitor_dict
.borrow_mut()
.insert(token.fd(), (token, dir, inotify));
println!("Watching directory {} for activity...", dir);
}
/// 检查指定的文件描述符是否在监控列表中
pub fn contains_key(&self, key: &RawFd) -> bool {
self.monitor_dict.borrow().contains_key(key)
}
/// 获取指定 fd 对应的监控信息
pub fn get(&self, key: &RawFd) -> Option<(&'a String, Rc<Inotify>)> {
let map = self.monitor_dict.borrow();
map.get(key).map(|t| (t.1, t.2.clone()))
}
/// epoll wait 包装函数
/// 阻塞等待文件系统事件
pub fn wait<'b>(
&'a self,
buf: &'b mut [EpollEvent],
) -> io::Result<&'b mut [EpollEvent]> {
self.poller.wait(buf)
}
}
2. 同步处理模块
// sync_processor.rs - 同步处理模块
use std::ffi::OsStr;
use std::path::Path;
use std::process::{Command, Output};
/// 同步消息结构
/// 封装文件变化事件的相关信息
#[derive(Debug)]
pub struct SyncMessage {
/// 命令类型:"rsync" 或 "mv"
cmd: String,
/// 是否为目录
is_dir: bool,
/// 文件完整路径
file_name: String,
}
impl SyncMessage {
/// 创建新的同步消息
///
/// # 防循环机制
/// 为避免多服务器间同步形成循环,通过文件名后缀标记来源:
/// - 以 "_sync" 结尾:从其他服务器同步过来,只需本地改名
/// - 不含标记:本地生成的文件,需要同步到远程
pub fn new(path: &String, file_name: Option<&OsStr>) -> Self {
let mut file = String::new();
let mut cmd = String::new();
if let Some(name) = file_name {
let name_str = name.to_string_lossy().to_string();
file = format!("{}/{}", *path, name_str);
if name_str.ends_with("_sync") {
// 其他服务器同步过来的文件
cmd = "mv".to_string();
} else if !name_str.starts_with(".") && !name_str.contains("_sync") {
// 本地生成的文件
cmd = "rsync".to_string();
}
}
SyncMessage {
cmd,
is_dir: false,
file_name: file,
}
}
/// 执行 rsync 同步
/// 将文件同步到远程服务器
pub fn run_rsync(&self, remote_host: &String) {
println!("[sync] Starting rsync: {:?}", self);
let path = Path::new(&self.file_name);
let file_name = path
.file_name()
.unwrap()
.to_string_lossy()
.to_string();
// 远程文件名添加 _sync 后缀,触发目标服务器的 mv 操作
let remote_path = format!("{}{}_sync", remote_host, file_name);
println!("[sync] {} -> {}", file_name, remote_path);
// 构建 rsync 命令
let mut command = Command::new("rsync");
if !self.is_dir {
command.arg("-avuzP").arg(&self.file_name).arg(&remote_path);
} else {
command.arg("-r").arg(&self.file_name).arg(&remote_path);
}
// 执行并记录结果
match command.output() {
Ok(output) => {
if output.status.success() {
println!("[sync] Success: {}", file_name);
} else {
eprintln!("[sync] Failed: {}", String::from_utf8_lossy(&output.stderr));
}
}
Err(err) => {
eprintln!("[sync] Error: {:?}", err);
}
}
}
/// 执行 mv 操作
/// 将 _sync 后缀的文件重命名为正常文件名
pub fn run_mv(&self) {
let new_name = self.file_name.replace("_sync", "");
println!("[mv] {} -> {}", self.file_name, new_name);
let mut command = Command::new("mv");
command.arg(&self.file_name).arg(&new_name);
if let Err(err) = command.output() {
eprintln!("[mv] Error: {:?}", err);
}
}
/// 获取命令类型
pub fn get_cmd(&self) -> &String {
&self.cmd
}
}
3. 主程序
// main.rs - 主程序
use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use inotify::EventMask;
use epoll_rs::EpollEvent;
mod monitor;
mod sync_processor;
use monitor::FileMonitor;
use sync_processor::SyncMessage;
/// 信号标志,用于优雅退出
static mut SHUTDOWN: bool = false;
/// 监控工作线程
/// 使用 epoll + inotify 监控目录变化
fn monitor_worker(dirs: &[String], sender: Sender<SyncMessage>) {
// 创建文件监控器
let file_monitor = FileMonitor::new();
// 添加所有需要监控的目录
for dir in dirs {
file_monitor.add_watch(dir);
}
// 事件缓冲区(用于读取 inotify 事件)
let mut event_buffer = [0u8; 4096];
// epoll 事件缓冲区
let mut epoll_buffer = [EpollEvent::zeroed(); 20];
loop {
// 等待事件(阻塞)
match file_monitor.wait(&mut epoll_buffer) {
Ok(events) => {
for event in events {
let fd = event.fd();
// 检查是否是已注册的 inotify 实例
if let Some((dir_path, inotify)) = file_monitor.get(&fd) {
// 读取 inotify 事件
match inotify.read_events(&mut event_buffer) {
Ok(inotify_events) => {
for in_ev in inotify_events {
handle_inotify_event(in_ev, dir_path, &sender);
}
}
Err(err) => {
eprintln!("Failed to read inotify events: {:?}", err);
}
}
}
}
}
Err(err) => {
eprintln!("epoll wait error: {:?}", err);
break;
}
}
}
}
/// 处理单个 inotify 事件
fn handle_inotify_event(
event: inotify::Event<&OsStr>,
dir_path: &String,
sender: &Sender<SyncMessage>,
) {
let mask = event.mask;
// 根据事件类型处理
if mask.contains(EventMask::CREATE) {
if mask.contains(EventMask::ISDIR) {
println!("[event] Directory created: {:?}", event.name);
// TODO: 可选 - 自动添加新目录到监控列表
} else {
println!("[event] File created: {:?}", event.name);
}
} else if mask.contains(EventMask::DELETE) {
println!("[event] Deleted: {:?}", event.name);
} else if mask.contains(EventMask::MODIFY) {
println!("[event] Modified: {:?}", event.name);
} else if mask.contains(EventMask::CLOSE_WRITE) {
if !mask.contains(EventMask::ISDIR) {
// 文件写入完成,触发同步
println!("[event] File closed after write: {:?}", event.name);
let msg = SyncMessage::new(dir_path, event.name);
if !msg.get_cmd().is_empty() {
sender.send(msg).expect("Failed to send sync message");
}
}
} else if mask.contains(EventMask::MOVED_TO) {
println!("[event] Moved to: {:?}", event.name);
// 处理重命名/移动的文件
if let Some(name) = event.name {
let name_str = name.to_string_lossy();
if name_str.ends_with("_sync") {
let msg = SyncMessage::new(dir_path, event.name);
msg.run_mv();
}
}
}
}
/// 同步工作线程
/// 接收消息队列中的文件变化事件并执行同步
fn sync_worker(receiver: Receiver<SyncMessage>, remote_servers: Vec<String>) {
while let Ok(msg) = receiver.recv() {
match msg.get_cmd().as_str() {
"rsync" => {
// 同步到所有远程服务器
for host in &remote_servers {
msg.run_rsync(host);
}
}
"mv" => {
msg.run_mv();
}
_ => {}
}
}
}
/// 递归收集目录下的所有子目录
fn collect_directories(root: &str, dirs: &mut Vec<String>) {
let path = Path::new(root);
if let Ok(entries) = path.read_dir() {
for entry in entries {
if let Ok(entry) = entry {
if let Ok(file_type) = entry.file_type() {
if file_type.is_dir() {
let subdir = entry.path().to_string_lossy().to_string();
collect_directories(&subdir, dirs);
dirs.push(subdir);
}
}
}
}
}
}
fn main() {
// 配置:监控目录列表
let watch_dirs = vec![
"/data/sync".to_string(),
];
// 配置:远程服务器列表
let remote_servers = vec![
"server1:/data/sync/".to_string(),
"server2:/data/sync/".to_string(),
];
// 收集所有子目录
let mut all_dirs: Vec<String> = Vec::new();
for dir in &watch_dirs {
all_dirs.push(dir.clone());
collect_directories(dir, &mut all_dirs);
}
println!("Monitoring {} directories", all_dirs.len());
for dir in &all_dirs {
println!(" - {}", dir);
}
// 创建消息通道
let (sender, receiver) = channel();
// 启动监控线程
let monitor_handle: JoinHandle<()> = thread::spawn(move || {
monitor_worker(&all_dirs, sender);
});
// 启动同步线程
let sync_handle: JoinHandle<()> = thread::spawn(move || {
sync_worker(receiver, remote_servers);
});
// 注册信号处理(优雅退出)
setup_signal_handlers();
// 主循环
loop {
unsafe {
if SHUTDOWN {
println!("Shutting down...");
break;
}
}
thread::sleep(Duration::from_secs(1));
}
// 等待线程结束
monitor_handle.join().unwrap();
sync_handle.join().unwrap();
}
/// 设置信号处理
fn setup_signal_handlers() {
unsafe {
libc::signal(libc::SIGTERM, handle_sigterm as usize);
libc::signal(libc::SIGINT, handle_sigint as usize);
}
}
extern "C" fn handle_sigterm(_: i32) {
unsafe {
SHUTDOWN = true;
}
}
extern "C" fn handle_sigint(_: i32) {
unsafe {
SHUTDOWN = true;
}
}
工作流程图解
常见问题与优化建议
Q1: inotify 事件丢失怎么办?
原因:当事件产生速度超过处理速度时,inotify 事件队列会溢出。
解决方案:
// 使用更大的事件缓冲区
let mut event_buffer = [0u8; 8192]; // 原来是 4096
// 设置非阻塞模式
fcntl(fd, F_SETFL, O_NONBLOCK);
// epoll 使用边缘触发
epoll_ctl(epfd, EPOLL_CTL_ADD, fd,
&(EpollEvent { events: EPOLLIN | EPOLLET, data: ... }));
Q2: 如何递归监控目录树?
方案:动态添加子目录监控
fn watch_recursive(monitor: &FileMonitor, root: &str) {
// 首先监控根目录
monitor.add_watch(root);
// 递归遍历子目录
for entry in WalkDir::new(root) {
if let Ok(e) = entry {
if e.file_type().is_dir() {
monitor.add_watch(&e.path().to_string_lossy());
}
}
}
}
// 处理 CREATE 事件时动态添加新目录
if mask.contains(EventMask::CREATE) && mask.contains(EventMask::ISDIR) {
let new_dir = format!("{}/{}", base_path, name);
file_monitor.add_watch(&new_dir);
}
Q3: epoll_wait 被信号中断怎么处理?
loop {
match file_monitor.wait(&mut epoll_buffer) {
Ok(events) => { /* 处理事件 */ }
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {
// 被信号中断,继续循环
continue;
}
Err(e) => {
eprintln!("epoll error: {:?}", e);
break;
}
}
}
Q4: 性能优化建议
Q5: 调试技巧
// 启用 inotify 调试日志
std::env::set_var("RUST_LOG", "debug");
// 打印所有事件
println!("[debug] Event mask: {:?}", mask);
println!("[debug] Event name: {:?}", event.name);
println!("[debug] Event cookie: {:?}", event.cookie);
// 统计事件频率
use std::sync::atomic::{AtomicUsize, Ordering};
static EVENT_COUNT: AtomicUsize = AtomicUsize::new(0);
fn log_event() {
let count = EVENT_COUNT.fetch_add(1, Ordering::SeqCst);
println!("Total events: {}", count);
}
总结
inotify 负责文件变化通知,epoll 负责多路复用调度,两者结合就能用单线程高效监控多个目录。这套模式常见于日志收集、文件同步、自动构建等场景。
参考资源
- • Linux inotify man page[1]
- • Linux epoll man page[2]
- • The Linux Programming Interface[5]
引用链接
[1] Linux inotify man page: https://man7.org/linux/man-pages/man7/inotify.7.html
[2] Linux epoll man page: https://man7.org/linux/man-pages/man7/epoll.7.html
[3] inotify-rs crate: https://docs.rs/inotify/latest/inotify/
[4] epoll-rs crate: https://docs.rs/epoll-rs/latest/epoll_rs/
[5] The Linux Programming Interface: https://man7.org/tlpi/