1. 为什么需要它?
想象你正在调查一个可疑的域名,它可能关联着多个 IP、子域名、社交媒体账号,甚至加密钱包。传统做法是手动在不同工具间切换:Dig 查 DNS、Whois 查注册人、Shodan 扫 IP、Twitter 搜用户名……不仅效率低,更关键的是——线索一旦分散,就很难看出全局关系。更糟的是,很多在线 OSINT 工具会把你的查询记录上传到云端,等于把你的调查思路暴露给第三方。
Flowsint 的破局思路很直接:把所有数据留在本地,用图结构把碎片信息“织”成一张可探索的关系网。你点一个节点,系统自动调用合适的 enricher(丰富器)去拉取关联数据,并实时更新图谱。它不只聚合数据,更构建上下文。
核心技术栈围绕“图 + 异步 + 类型安全”展开:FastAPI 提供 REST 接口,Neo4j 存储实体关系,Celery 处理耗时 enrich 任务,而 Pydantic 和 TypeScript 共享的类型定义确保前后端对数据结构的理解一致。
2. 核心架构:像做手术一样拆解
Flowsint 的骨架是一个典型的前后端分离架构,但它的“神经中枢”是 enricher 编排层。你可以把它想象成一个智能情报分拣中心:前台(React)展示一张动态演化的调查图谱;当你选中一个节点(比如 example.com),后台(FastAPI)收到请求后,不是直接处理,而是把任务派给 Celery;Celery 从 enricher 注册表中找出所有能处理“域名”的 enricher(如 WHOIS 查询、子域名爆破、证书透明日志扫描等),并行执行;每个 enricher 返回新发现的实体(IP、邮箱、子域等),最终由 core 模块统一写入 Neo4j,前端再拉取更新后的图结构。
关键类比:整个系统就像一个本地化的情报流水线工厂。原料(初始实体)进入工厂,经过不同工位(enrichers)的加工,产出新的零件(关联实体),最后由总装线(图数据库)把这些零件组装成完整的设备(调查图谱)。所有过程都在你的厂房(本地机器)内完成,不对外泄露半张图纸。
当一个请求进来时,它的旅程是这样的:
- 前端通过 TanStack Router 发起
/api/enrich 请求,附带目标节点类型和值; - FastAPI 路由到 enricher 调度器,验证用户权限并生成唯一
scan_id; - 调度器查询 enricher 注册表,筛选出匹配输入类型的 enricher 列表;
- 每个 enricher 被 Celery 作为独立任务异步调用,传入
sketch_id(调查会话)和 scan_id; - enricher 执行外部 API 调用或本地计算,返回结构化结果;
- core 模块将结果转换为 Neo4j 节点/关系,批量写入图数据库;
- 前端通过 WebSocket 或轮询感知图变更,用 xyflow 重绘可视化。
3. 源码级复盘
3.1 EnricherRegistry:动态插件系统的注册中心
- 设计哲学:为了让开发者能“即插即用”地添加新数据源,系统必须避免硬编码的 if-else 分支。注册表模式将 enricher 的发现和实例化完全解耦,新 enricher 只需用装饰器注册,系统自动纳入调度。
- 代码逻辑拆解
# 步骤 1: 初始化空注册表# 使用字典存储 name -> class 的映射,O(1) 查找def__init__(self):self._enrichers: Dict[str, Type[Enricher]] = {}# 步骤 2: 装饰器注册机制# 返回原类,支持 @registry.register 语法糖defregister(self, enricher_class: Type[E]) -> Type[E]:self._enrichers[enricher_class.name()] = enricher_classreturn enricher_class# 步骤 3: 按需实例化# 传入 sketch_id/scan_id 等上下文,实现多租户隔离defget_enricher( self, name: str, sketch_id: str, scan_id: str, **kwargs) -> Enricher:if name notinself._enrichers:raise Exception(f"Enricher '{name}' not found")returnself._enrichers[name](sketch_id=sketch_id, scan_id=scan_id, **kwargs)# 步骤 4: 安全的元数据暴露# list 方法过滤敏感 enricher,返回前端可消费的 JSONdeflist( self, exclude: Optional[List[str]] = None, wobbly_type: Optional[bool] = False) -> List[Dict[str, Any]]:# ... 过滤和排序逻辑
- 架构视点
- 决策分析:为什么不直接用 importlib 动态加载?因为注册表提供了显式控制点——可以方便地禁用高风险 enricher(如涉及付费 API 的),也能在运行时检查可用性。相比反射,它更安全、可测试。
- 潜在风险:如果 enricher 名称冲突(两个模块注册同名 enricher),后注册的会覆盖前者。生产环境中应通过命名规范(如
domain_whois vs domain_subfinder)规避。
3.2 EnricherBase:类型驱动的抽象契约
- 设计哲学:所有 enricher 必须遵循“输入-输出”范式,但具体实现千差万别。基类通过强制定义
InputType 和 OutputType,让系统能在不运行 enricher 的前提下推断其能力(例如,知道某个 enricher 能把 Domain 转成 Ip),从而实现智能路由。 - 代码逻辑拆解
# 步骤 1: 声明类型契约# 开发者只需指定基础类型,无需处理列表classMyEnricher(Enricher): InputType = Domain # flowsint_types 中的 Pydantic 模型 OutputType = Ip# 步骤 2: 自动参数验证# preprocess 自动将原始输入数据验证为 InputType 列表defpreprocess(self, data: List) -> List[InputType]:# 内部调用 pydantic.parse_obj_as(List[cls.InputType], data)returnsuper().preprocess(data)# 步骤 3: 异步核心逻辑# scan 方法接收已验证的输入,返回结构化输出asyncdefscan(self, data: List[InputType]) -> List[OutputType]:# 子类实现具体业务逻辑pass# 步骤 4: 统一错误处理# 基类捕获异常,记录 scan_id 便于追踪asyncdefrun(self, raw_data: List) -> List[Dict]:try: validated = self.preprocess(raw_data) results = awaitself.scan(validated)return [r.dict() for r in results]except Exception as e: logger.error(f"Scan {self.scan_id} failed: {e}")raise
- 架构视点
- 决策分析:为什么用 Pydantic 而不用 dataclass 或普通 dict?因为 Pydantic 提供运行时验证和JSON Schema 生成,这对前端动态渲染 enricher 表单(比如 WHOIS enricher 需要填域名)至关重要。同时,它与 FastAPI 的自动文档无缝集成。
- 潜在风险:如果 enricher 的
scan 方法阻塞(如未用 async/await 包装网络请求),会拖慢整个 Celery worker。因此基类强制要求 scan 为 async,但无法阻止开发者内部写同步阻塞代码——这需要 Code Review 保障。
4. 生产环境避坑指南
- 适用场景
- 需要处理海量(>100 万节点)图谱(Neo4j 社区版有性能瓶颈)
- 要求 enricher 结果毫秒级返回(OSINT 本身依赖外部 API,延迟不可控)
- 团队没有 Python/React 技术栈维护能力
- 配置建议:
# Celery 并发数:根据 enricher I/O 密集特性调高CELERY_WORKER_CONCURRENCY:20# 默认 4,建议按 CPU 核数 * 2# Neo4j 内存配置(docker-compose.yml)NEO4J_dbms_memory_heap_initial__size:2GNEO4J_dbms_memory_heap_max__size:4G# FastAPI 日志级别:生产环境关闭 DEBUGLOG_LEVEL:"INFO"# 关键安全参数:禁用高风险 enricherENRICHER_EXCLUDE: ["telegram_scraper", "dark_web_monitor"]
5. 总结
这个项目本质上就是一个运行在你笔记本上的 CIA 情报分析站的代码实现——所有数据不出本地,所有关系一目了然。
项目地址: https://github.com/reconurge/flowsint