基于 BIOS SN 的局域网远程摄像头管理系统
在多台电脑协同工作的场景中,如何快速、准确地远程查看指定设备的摄像头画面,一直是设备运维和监控管理中的常见痛点。 本系统采用经典的 C/S(客户端-服务端)架构,以电脑主板 BIOS 序列号作为设备唯一标识,构建了一套完整的局域网远程摄像头管理方案。 服务端部署在每台被控电脑上,通过 Flask 框架提供 MJPEG 视频流 HTTP 接口;客户端基于 PyQt5 打造现代化 GUI, 运维人员只需输入目标设备 SN,即可实时接收并显示远端摄像头画面。配置持久化为 JSON 文件,支持 10 台设备同时在线, 满足中小规模设备集群的日常监控需求。整套系统轻量、开箱即用,无需复杂的中间件或第三方流媒体服务器。
目录
系统架构
┌─────────────────────────────────────────────────────────────────┐│ 局 域 网 ││ ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │ 被控电脑 A │ │ 被控电脑 B │ │ 被控电脑 C │ ││ │ SN: 5CD514.. │ │ SN: ABC123.. │ │ SN: XYZ987.. │ ││ │ IP: .1.100 │ │ IP: .1.101 │ │ IP: .1.102 │ ││ │ │ │ │ │ │ ││ │ camera_ │ │ camera_ │ │ camera_ │ ││ │ server.py │ │ server.py │ │ server.py │ ││ │ :5000 │ │ :5000 │ │ :5000 │ ││ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ ││ │ │ │ ││ └──────────────────┼──────────────────┘ ││ │ HTTP MJPEG Stream ││ ▼ ││ ┌────────────────────┐ ││ │ 控制端电脑 │ ││ │ camera_client.py │ ││ │ 输入 SN → 看画面 │ ││ └────────────────────┘ │└─────────────────────────────────────────────────────────────────┘
功能特性
| | |
|---|
| | |
| | 基于 Flask 提供 MJPEG 格式实时视频流 |
| | |
| | |
| | 可视化表格管理多台设备(SN + IP + 端口) |
| | |
| | |
| | |
| | |
环境要求
服务端(每台被控电脑):
Python >= 3.8opencv-python >= 4.5Flask >= 2.0Windows 操作系统
客户端(控制电脑):
Python >= 3.8opencv-python >= 4.5PyQt5 >= 5.15requests >= 2.25numpy >= 1.20
快速部署
1. 服务端部署(每台被控电脑执行)
pip install opencv-python flaskpython camera_server.py
启动后输出:
[服务端启动] 本机 SN: 5CD5145Y25[服务端启动] 监听端口: 5000[服务端启动] 视频流地址: http://<本机IP>:5000/stream
2. 客户端启动(控制电脑执行)
pip install opencv-python PyQt5 requests numpypython camera_client.py
3. 使用流程
┌───────────────────────────────────────────────────────────────┐│ Step 1: 在每台被控电脑上启动 camera_server.py ││ ↓ ││ Step 2: 在控制电脑启动 camera_client.py ││ ↓ ││ Step 3: 添加设备 (填入对方 SN + IP + 端口) → 点击「添加/更新」 ││ ↓ ││ Step 4: 输入 SN → 点击「打开摄像头」→ 实时查看远程画面 │└───────────────────────────────────────────────────────────────┘
核心代码解析
Part 1:服务端 — BIOS SN 获取与 Flask 服务注册
服务端启动时首先获取本机硬件唯一标识,然后注册 Flask HTTP 服务对外暴露接口。
import subprocessfrom flask import Flask, jsonifyapp = Flask(__name__)defget_bios_sn():"""获取本机 BIOS 序列号"""try: result = subprocess.run( ['wmic', 'bios', 'get', 'serialnumber'], capture_output=True, text=True, timeout=10 ) lines = [l.strip() for l in result.stdout.strip().split('\n') if l.strip()]if len(lines) >= 2:return lines[1]except Exception:passreturn"UNKNOWN"LOCAL_SN = get_bios_sn()@app.route('/sn')defget_sn():return jsonify({"sn": LOCAL_SN})@app.route('/status')defstatus():return jsonify({"sn": LOCAL_SN,"streaming": streaming,"camera_open": cap isnotNoneand cap.isOpened() })if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, threaded=True)
技术要点:
host='0.0.0.0':监听所有网卡,允许局域网其他设备访问threaded=True:启用多线程模式,支持多个客户端同时请求timeout=10:WMIC 命令超时保护,防止阻塞启动流程- RESTful 风格接口设计,返回 JSON 格式数据,便于扩展和集成
Part 2:服务端 — MJPEG 视频流生成引擎
核心的视频流采集与编码模块,使用 Python 生成器(Generator)实现流式传输。
import cv2import timefrom flask import Responsecap = Nonestreaming = Falsedefopen_camera():global capif cap isNoneornot cap.isOpened(): cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)# 降低分辨率减少网络带宽消耗 cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)# 预热跳帧for _ in range(10): cap.read()defgenerate_frames():"""生成 MJPEG 视频流 — 基于 HTTP multipart 协议"""global cap, streaming open_camera() streaming = Truewhile streaming:if cap and cap.isOpened(): ret, frame = cap.read()if ret:# JPEG 编码,质量 60(平衡画质与带宽) _, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 60])yield (b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')else: time.sleep(0.03)else: time.sleep(0.1)@app.route('/stream')defvideo_stream():return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')@app.route('/stop')defstop_stream():global cap, streaming streaming = Falseif cap: cap.release() cap = Nonereturn jsonify({"status": "stopped"})
技术要点:
- MJPEG over HTTP:使用
multipart/x-mixed-replace MIME 类型,浏览器和客户端均可直接解析 - Python Generator(yield):惰性生成帧数据,内存占用恒定,不会因长时间运行而内存溢出
- JPEG 质量参数:
IMWRITE_JPEG_QUALITY=60 在画质和带宽间取得平衡,640×480 单帧约 20~40KB - 优雅退出:
/stop 接口释放摄像头资源,避免设备被持续占用 - 容错机制:读取失败时
time.sleep() 避免 CPU 空转
Part 3:客户端 — 多线程流接收与 JPEG 帧解析
客户端使用独立的 QThread 线程接收网络视频流,解析 JPEG 帧后通过信号传递给 GUI 主线程渲染。
import requestsimport cv2import numpy as npfrom PyQt5.QtCore import QThread, pyqtSignalclassStreamThread(QThread):"""后台线程接收 MJPEG 视频流""" frame_received = pyqtSignal(np.ndarray) error_occurred = pyqtSignal(str)def__init__(self, url): super().__init__() self.url = url self.running = Truedefrun(self):try: response = requests.get(self.url, stream=True, timeout=10) buf = b''for chunk in response.iter_content(chunk_size=4096):ifnot self.running:break buf += chunk# 查找 JPEG 帧边界标记# SOI (Start of Image): 0xFF 0xD8# EOI (End of Image): 0xFF 0xD9 start = buf.find(b'\xff\xd8') end = buf.find(b'\xff\xd9')if start != -1and end != -1and end > start: jpg = buf[start:end + 2] buf = buf[end + 2:]# 解码 JPEG 为 OpenCV 帧 frame = cv2.imdecode( np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR )if frame isnotNone: self.frame_received.emit(frame)except requests.exceptions.ConnectionError: self.error_occurred.emit("连接失败,请确认目标电脑服务端已启动")except requests.exceptions.Timeout: self.error_occurred.emit("连接超时")except Exception as e: self.error_occurred.emit(f"流接收错误: {e}")defstop(self): self.running = False
技术要点:
- QThread 线程隔离:网络 IO 在子线程执行,避免阻塞 GUI 主线程导致界面卡死
- pyqtSignal 跨线程通信:子线程解码完帧后通过信号安全传递给主线程更新界面
- 流式接收:
requests.get(stream=True) + iter_content() 分块读取,内存友好 - JPEG 边界解析:通过 SOI(
0xFFD8)和 EOI(0xFFD9)标记定位完整 JPEG 帧 - 多层异常捕获:区分连接失败、超时、数据异常等不同错误类型,给用户精确提示
Part 4:客户端 — GUI 交互与设备管理
客户端界面提供设备表格管理、SN 输入、视频预览等完整交互功能。
from PyQt5.QtWidgets import ( QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QTableWidget, QTableWidgetItem)from PyQt5.QtGui import QImage, QPixmapfrom PyQt5.QtCore import QtclassCameraClient(QWidget):def__init__(self): super().__init__() self.setWindowTitle("远程摄像头客户端") self.resize(900, 700) self.config = load_config() self.stream_thread = None self.init_ui() self.refresh_table()defopen_camera(self): sn = self.open_sn_input.text().strip()if sn notin self.config: self.status_label.setText(f"未找到 SN: {sn} 的配置,请先添加设备")return info = self.config[sn] ip = info["ip"] port = info.get("port", 5000) self.stop_stream() url = f"http://{ip}:{port}/stream" self.stream_thread = StreamThread(url) self.stream_thread.frame_received.connect(self.update_frame) self.stream_thread.error_occurred.connect(self.on_stream_error) self.stream_thread.start()defupdate_frame(self, frame): frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w, ch = frame.shape img = QImage(frame.data, w, h, ch * w, QImage.Format_RGB888) scaled = QPixmap.fromImage(img).scaled( self.video_label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation ) self.video_label.setPixmap(scaled)defstop_stream(self):if self.stream_thread: self.stream_thread.stop() self.stream_thread.wait(2000) self.stream_thread = None
技术要点:
- SN → IP 映射:通过配置文件将逻辑标识(SN)转换为网络地址(IP:Port)
- QTableWidget:可视化设备列表,支持选择行删除,直观管理多台设备
- BGR → RGB 转换:OpenCV 默认 BGR,PyQt5 显示需要 RGB 格式
- KeepAspectRatio 缩放:保持视频画面比例不变形
- 优雅断开:
stop() 标记 + wait(2000) 确保线程安全退出
配置文件说明
客户端运行后生成 devices_config.json:
{"5CD5145Y25": {"ip": "192.168.1.100","port": 5000 },"ABC1234567": {"ip": "192.168.1.101","port": 5000 },"XYZ9876543": {"ip": "192.168.1.102","port": 5000 }}
API 接口文档
服务端提供以下 HTTP 接口:
响应示例:
// GET /status{"sn": "5CD5145Y25","streaming": true,"camera_open": true}
知识点总结
1. MJPEG 流媒体协议
| |
|---|
| HTTP multipart/x-mixed-replace |
| |
| |
| |
| |
2. Flask Web 框架核心概念
- 路由装饰器:
@app.route('/path') 声明式 URL 映射 - Response 流式响应:配合 Generator 实现长连接持续输出
- JSON 响应:
jsonify() 自动设置 Content-Type - 多线程模式:
threaded=True 支持并发请求处理
3. PyQt5 多线程编程
- QThread:Qt 封装的线程类,与事件循环和信号机制深度集成
- pyqtSignal:线程安全的跨线程通信机制,避免直接操作 GUI 控件
- 线程生命周期:
start() 启动 → run() 执行 → wait() 等待结束 - 注意事项:GUI 控件只能在主线程更新,子线程通过信号传递数据
4. 网络流数据解析
- 流式读取:
requests.get(stream=True) 不将响应一次性加载到内存 - 分块迭代:
iter_content(chunk_size) 按固定大小分块接收 - 帧边界检测:JPEG 文件以
0xFFD8 开头、0xFFD9 结尾 - 缓冲区管理:累积 buffer 直到检测到完整帧再解码,剩余数据保留
5. OpenCV 视频采集与编码
- DirectShow 后端:
cv2.CAP_DSHOW 在 Windows 上稳定高效 - 分辨率设置:
cap.set(CAP_PROP_FRAME_WIDTH/HEIGHT) 控制采集尺寸 - JPEG 编码:
cv2.imencode('.jpg', frame, [IMWRITE_JPEG_QUALITY, 60]) 压缩输出 - JPEG 解码:
cv2.imdecode(np.frombuffer(...), IMREAD_COLOR) 从字节恢复帧
拓展场景与测试步骤
拓展应用场景
| |
|---|
| 管理员在中控室通过 SN 逐台查看各工位摄像头,无需到现场 |
| |
| 总部 IT 通过 VPN 连接各分支网络,统一监控设备状态 |
| |
| |
| 结合 CI/CD 流水线,自动连接测试设备验证摄像头功能 |
测试步骤
Step 1:环境准备
# 服务端电脑pip install opencv-python flask# 客户端电脑pip install opencv-python PyQt5 requests numpy
Step 2:网络连通性验证
# 在客户端电脑 ping 服务端 IPping 192.168.1.100# 确认端口可达(PowerShell)Test-NetConnection -ComputerName 192.168.1.100 -Port 5000
Step 3:服务端启动验证
# 启动服务端python camera_server.py# 在浏览器访问以下地址确认服务正常# http://192.168.1.100:5000/sn → 应返回 SN JSON# http://192.168.1.100:5000/status → 应返回状态 JSON# http://192.168.1.100:5000/stream → 应显示摄像头画面
Step 4:客户端连接测试
- [ ] 启动客户端
python camera_client.py - [ ] 添加设备:填入服务端 SN + IP + 端口 → 点击「添加/更新」
Step 5:多设备并发测试
- [ ] 逐台切换 SN 连接,确认每台均可正常查看
Step 6:异常场景测试
Step 7:防火墙检查(如连接失败)
# 在服务端电脑开放 5000 端口(管理员权限 PowerShell)New-NetFirewallRule -DisplayName "Camera Server" -Direction Inbound -Port 5000 -Protocol TCP -Action Allow
本系统为局域网方案。如需跨公网访问,建议搭配 Tailscale、ZeroTier 等虚拟组网工具,或使用 frp 进行内网穿透。
"""摄像头服务端 - 部署在每台电脑上启动后自动获取本机 BIOS SN,注册到中心服务器,并等待客户端请求开启摄像头视频流依赖: pip install opencv-python flask"""import subprocessimport cv2import threadingimport timefrom flask import Flask, Response, jsonifyapp = Flask(name)全局变量camera_lock = threading.Lock()cap = Nonestreaming = Falsedef get_bios_sn():"""获取本机 BIOS 序列号"""try:result = subprocess.run(['wmic', 'bios', 'get', 'serialnumber'],capture_output=True, text=True, timeout=10)lines = [l.strip() for l in result.stdout.strip().split('\n') if l.strip()]if len(lines) >= 2:return lines[1]except Exception:passreturn "UNKNOWN"LOCAL_SN = get_bios_sn()def open_camera():global capif cap is None or not cap.isOpened():cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)# 降低分辨率减少带宽cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)# 预热for _ in range(10):cap.read()def generate_frames():"""生成 MJPEG 视频流"""global cap, streamingopen_camera()streaming = Truewhile streaming: if cap and cap.isOpened(): ret, frame = cap.read() if ret: _, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 60]) yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n') else: time.sleep(0.03) else: time.sleep(0.1)@app.route('/sn')def get_sn():return jsonify({"sn": LOCAL_SN})@app.route('/stream')def video_stream():return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')@app.route('/stop')def stop_stream():global cap, streamingstreaming = Falseif cap:cap.release()cap = Nonereturn jsonify({"status": "stopped"})@app.route('/status')def status():return jsonify({"sn": LOCAL_SN, "streaming": streaming, "camera_open": cap is not None and cap.isOpened()})if name == 'main':print(f"[服务端启动] 本机 SN: {LOCAL_SN}")print(f"[服务端启动] 监听端口: 5000")print(f"[服务端启动] 视频流地址: http://<本机IP>:5000/stream")app.run(host='0.0.0.0', port=5000, threaded=True)
"""摄像头客户端 GUI - 在控制端运行输入 SN 即可查看对应电脑的摄像头画面依赖: pip install opencv-python PyQt5 requests"""import sysimport osimport jsonimport requestsimport cv2import numpy as npfrom PyQt5.QtWidgets import (QApplication, QWidget, QVBoxLayout, QHBoxLayout,QLabel, QLineEdit, QPushButton, QTableWidget, QTableWidgetItem,QMessageBox, QHeaderView, QSpinBox)from PyQt5.QtGui import QImage, QPixmapfrom PyQt5.QtCore import QTimer, Qt, QThread, pyqtSignalCONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(file)), "devices_config.json")def load_config():if os.path.exists(CONFIG_FILE):with open(CONFIG_FILE, "r", encoding="utf-8") as f:return json.load(f)return {}def save_config(config):with open(CONFIG_FILE, "w", encoding="utf-8") as f:json.dump(config, f, indent=2, ensure_ascii=False)class StreamThread(QThread):"""后台线程接收 MJPEG 视频流"""frame_received = pyqtSignal(np.ndarray)error_occurred = pyqtSignal(str)def __init__(self, url): super().__init__() self.url = url self.running = Truedef run(self): try: response = requests.get(self.url, stream=True, timeout=10) buf = b'' for chunk in response.iter_content(chunk_size=4096): if not self.running: break buf += chunk # 查找 JPEG 帧边界 start = buf.find(b'\xff\xd8') end = buf.find(b'\xff\xd9') if start != -1 and end != -1 and end > start: jpg = buf[start:end + 2] buf = buf[end + 2:] frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR) if frame is not None: self.frame_received.emit(frame) except requests.exceptions.ConnectionError: self.error_occurred.emit("连接失败,请确认目标电脑服务端已启动") except requests.exceptions.Timeout: self.error_occurred.emit("连接超时") except Exception as e: self.error_occurred.emit(f"流接收错误: {e}")def stop(self): self.running = Falseclass CameraClient(QWidget):def init(self):super().init()self.setWindowTitle("远程摄像头客户端")self.resize(900, 700) self.config = load_config() # {sn: ip} self.stream_thread = None self.init_ui() self.refresh_table()def init_ui(self): layout = QVBoxLayout() # 设备配置区域 layout.addWidget(QLabel("设备配置 (SN → IP 地址):")) config_layout = QHBoxLayout() config_layout.addWidget(QLabel("SN:")) self.sn_input = QLineEdit() self.sn_input.setPlaceholderText("电脑 BIOS SN") config_layout.addWidget(self.sn_input) config_layout.addWidget(QLabel("IP:")) self.ip_input = QLineEdit() self.ip_input.setPlaceholderText("如 192.168.1.100") config_layout.addWidget(self.ip_input) config_layout.addWidget(QLabel("端口:")) self.port_spin = QSpinBox() self.port_spin.setRange(1, 65535) self.port_spin.setValue(5000) config_layout.addWidget(self.port_spin) self.btn_add = QPushButton("添加/更新") self.btn_add.clicked.connect(self.add_device) config_layout.addWidget(self.btn_add) self.btn_delete = QPushButton("删除选中") self.btn_delete.clicked.connect(self.delete_device) config_layout.addWidget(self.btn_delete) layout.addLayout(config_layout) # 设备列表表格 self.table = QTableWidget() self.table.setColumnCount(3) self.table.setHorizontalHeaderLabels(["SN", "IP", "端口"]) self.table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch) self.table.setMaximumHeight(150) self.table.setSelectionBehavior(QTableWidget.SelectRows) layout.addWidget(self.table) # 打开摄像头 open_layout = QHBoxLayout() open_layout.addWidget(QLabel("输入 SN 打开远程摄像头:")) self.open_sn_input = QLineEdit() self.open_sn_input.setPlaceholderText("输入已配置的电脑 SN") open_layout.addWidget(self.open_sn_input) self.btn_open = QPushButton("打开摄像头") self.btn_open.clicked.connect(self.open_camera) open_layout.addWidget(self.btn_open) self.btn_stop = QPushButton("关闭") self.btn_stop.clicked.connect(self.stop_stream) open_layout.addWidget(self.btn_stop) layout.addLayout(open_layout) # 视频画面 self.video_label = QLabel("远程摄像头画面") self.video_label.setMinimumSize(640, 360) self.video_label.setStyleSheet("background-color: #222; color: white;") self.video_label.setAlignment(Qt.AlignCenter) layout.addWidget(self.video_label) # 状态栏 self.status_label = QLabel("就绪") layout.addWidget(self.status_label) self.setLayout(layout)def refresh_table(self): self.table.setRowCount(0) for sn, info in self.config.items(): row = self.table.rowCount() self.table.insertRow(row) self.table.setItem(row, 0, QTableWidgetItem(sn)) if isinstance(info, dict): self.table.setItem(row, 1, QTableWidgetItem(info.get("ip", ""))) self.table.setItem(row, 2, QTableWidgetItem(str(info.get("port", 5000)))) else: # 兼容旧格式 {sn: "ip"} self.table.setItem(row, 1, QTableWidgetItem(str(info))) self.table.setItem(row, 2, QTableWidgetItem("5000"))def add_device(self): sn = self.sn_input.text().strip() ip = self.ip_input.text().strip() port = self.port_spin.value() if not sn or not ip: QMessageBox.warning(self, "提示", "请输入 SN 和 IP 地址") return self.config[sn] = {"ip": ip, "port": port} save_config(self.config) self.refresh_table() self.status_label.setText(f"已保存: {sn} → {ip}:{port}")def delete_device(self): row = self.table.currentRow() if row < 0: return sn = self.table.item(row, 0).text() del self.config[sn] save_config(self.config) self.refresh_table() self.status_label.setText(f"已删除: {sn}")def open_camera(self): sn = self.open_sn_input.text().strip() if not sn: self.status_label.setText("请输入 SN") return if sn not in self.config: self.status_label.setText(f"未找到 SN: {sn} 的配置,请先添加设备") return info = self.config[sn] if isinstance(info, dict): ip = info["ip"] port = info.get("port", 5000) else: ip = str(info) port = 5000 self.stop_stream() url = f"http://{ip}:{port}/stream" self.status_label.setText(f"正在连接 {ip}:{port} ...") self.stream_thread = StreamThread(url) self.stream_thread.frame_received.connect(self.update_frame) self.stream_thread.error_occurred.connect(self.on_stream_error) self.stream_thread.start()def update_frame(self, frame): self.status_label.setText("正在接收视频流...") frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w, ch = frame.shape img = QImage(frame.data, w, h, ch * w, QImage.Format_RGB888) scaled = QPixmap.fromImage(img).scaled( self.video_label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation ) self.video_label.setPixmap(scaled)def on_stream_error(self, msg): self.status_label.setText(f"错误: {msg}")def stop_stream(self): if self.stream_thread: self.stream_thread.stop() self.stream_thread.wait(2000) self.stream_thread = None self.video_label.clear() self.video_label.setText("远程摄像头画面") self.status_label.setText("已断开")def closeEvent(self, event): self.stop_stream() event.accept()if name == 'main':app = QApplication(sys.argv)window = CameraClient()window.show()sys.exit(app.exec_())