第 12 篇:设备仿真与虚拟响应生成
引言
在 Pelco KBD300A 模拟器的开发系列中,我们已实现了核心协议支持、键盘交互、宏执行、模板库、实时接收解析、报警联动和日志监控。这些功能依赖实际设备进行测试,但在现场维护或开发环境中,常遇到设备不可用(如离线调试、协议验证)。为此,引入设备仿真功能:模拟 Pelco 摄像机行为,生成虚拟响应。这不仅用于单元测试,还支持离线场景模拟(如 PTZ 移动反馈、报警触发),便于 Windows 7 下快速验证宏或规则。
本篇聚焦设备仿真与虚拟响应生成,基于 Python 3.7(Windows 7 兼容)环境。核心包括 VirtualDevice 的状态机(模拟 PTZ/zoom/aux/alarm 变化)、SimulatorPanel 的 UI(序列编辑/预设生成/模拟 send/receive)和集成。我们将详细剖析仿真逻辑、UI 功能、响应生成、配套代码、测试案例及优化。通过此功能,模拟器能“自给自足”:e.g., 发送 PTZ 命令后,虚拟设备更新状态并返回 position 响应,便于现场无设备时的协议调试。这使 Pelco KBD300A 模拟器从“在线工具”向“全栈仿真平台”转型,特别适用于协议开发或远程维护培训。
关键收益:
离线测试:无需物理设备,模拟响应(如 pan/tilt 查询),验证宏/联动。
状态机:增量更新设备状态(e.g., pan += speed * delta),真实仿真动态变化,支持报警模拟。
交互性:SimulatorPanel 支持序列脚本/预设生成,便于批量测试。
集成性:虚拟响应注入日志/接收面板,模拟真实串口流。
本篇作为日志面板的续篇,基于最新代码仓库。代码片段从附件文档中提取。让我们逐步展开。
1. 仿真逻辑:process_command 与 generate_response
VirtualDevice 类模拟摄像机状态:维护 pan/tilt/zoom 等变量,根据输入命令字节 process_command(parse → update → generate response)。这基于状态机设计,确保响应一致(如连续 PTZ 后查询返回累积位置)。
逻辑概述
状态初始化:pan/tilt=0.0(degrees),zoom=0.5(0-1 ratio),aux_states={1-8: False},alarm_status=0x00 等;FocusMode/IrisMode 枚举。
处理命令(process_command):
输入 bytes data,parse_pelco_packet(data) 得 parsed。
更新状态:PTZ → pan += pan_speed * delta (delta=0.1 模拟时间步);zoom → ratio +/- 0.1 (clamp 0-1);aux → bool 切换;alarm → alarm_status |= bits。
生成响应(generate_response):
如果 parsed 是 query(如 cmd2=0x59 pan_pos),计算 value(e.g., int(pan) >>8 / &0xFF)。
build_pelco_d/p 生成帧。
非 query 返回 None。
边界处理:clamp 值(pan -180-180),日志更新;定时 _simulate_movement()(QTimer)增量更新动态状态。
关键代码片段(从 virtual_device.py 提取)
# core/simulator/virtual_device.py from enum import Enum from typing import Dict, Tuple, Optional, Any from core.pelco_protocol import parse_pelco_packet, build_pelco_d, build_pelco_p class FocusMode(Enum): AUTO = "auto" NEAR = "near" FAR = "far" class IrisMode(Enum): AUTO = "auto" OPEN = "open" CLOSE = "close" class VirtualDevice: def __init__(self, cam_id: int = 1): self.cam_id = cam_id self.pan: float = 0.0 # degrees, -180 to 180 self.tilt: float = 0.0 # degrees, -45 to 45 self.zoom: float = 0.5 # 0-1 ratio self.focus: FocusMode = FocusMode.AUTO self.iris: IrisMode = IrisMode.AUTO self.aux_states: Dict[int, bool] = {i: False for i in range(1, 9)} # AUX 1-8 on/off self.alarm_status: int = 0x00 # bitmask for alarms 1-8 self.hardware_type: int = 0x01 # Spectra III self.software_type: int = 0x17 self.last_update: Optional[float] = None self._timer = QtCore.QTimer() self._timer.timeout.connect(self._simulate_movement) self._timer.start(100) # 每100ms 更新动态状态 def process_command(self, data: bytes, protocol: str = "D") -> Optional[bytes]: """处理命令字节: 解析 → 更新状态 → 生成响应""" parsed = parse_pelco_packet(data, protocol) if "error" in parsed: logger.error("Virtual parse failed: %s", parsed["error"]) return None self._update_state(parsed) self.last_update = time.time() if self._is_query(parsed): return self._generate_response(parsed["query_type"], protocol) return None def _update_state(self, parsed: Dict[str, Any]): typ = parsed.get("type") if typ == "ptz": self.pan += parsed.get("pan_speed", 0) * 0.1 # 模拟增量 self.tilt += parsed.get("tilt_speed", 0) * 0.1 logger.info("Virtual PTZ updated: pan=%.1f, tilt=%.1f", self.pan, self.tilt) elif typ == "zoom": dirn = parsed.get("direction") if dirn == "in": self.zoom = min(1.0, self.zoom + 0.1) elif dirn == "out": self.zoom = max(0.0, self.zoom - 0.1) elif typ == "aux": aux_id = parsed.get("aux_id", 1) state = parsed.get("state", "on") == "on" self.aux_states[aux_id] = state elif typ == "alarm": self.alarm_status |= parsed.get("alarm_code", 0x01) # bitmask # ... 类似处理 focus/iris/preset/pattern # Clamp self.pan = max(-180, min(180, self.pan)) self.tilt = max(-90, min(90, self.tilt)) def _is_query(self, parsed: Dict[str, Any]) -> bool: cmd2 = parsed.get("cmd2", 0) return cmd2 in (0x59, 0x5B, 0x5D, 0x5F, 0x51) # pan/tilt/zoom/alarm 等 def _generate_response(self, query_type: str, protocol: str = "D") -> bytes: if query_type == "pan_pos": value = int(abs(self.pan) * 100) % 65536 # 示例缩放 data1 = (value >> 8) & 0xFF data2 = value & 0xFF cmd2 = 0x5B # pan if protocol == "D": return build_pelco_d(self.cam_id, 0x00, cmd2, data1, data2) else: return build_pelco_p(self.cam_id, 0x00, cmd2, data1, data2) elif query_type == "alarm": data1 = self.alarm_status & 0xFF # 示例 code data2 = 1 if self.alarm_status else 0 # status cmd2 = 0x51 if protocol == "D": return build_pelco_d(self.cam_id, 0x00, cmd2, data1, data2) # ... 添加更多(如 tilt_pos=0x5D, zoom_pos=0x5F) return b"" def _simulate_movement(self): # 定时增量模拟(如 inertia) if abs(self.pan) > 0.1: self.pan *= 0.95 # 衰减 # ... 类似 tilt设计考虑:
增量更新:模拟真实动态(e.g., speed * delta_time),delta=0.1 为简化系数,可调;_timer 定时 _simulate_movement 模拟 inertia/衰减。
协议兼容:process/generate 支持 D/P,复用 parse/build_ 函数。
扩展:未来添加随机噪声(e.g., pan += random.uniform(-1,1)),模拟噪声环境;支持多设备(Dict[cam_id: VirtualDevice])。
2. UI:SimulatorPanel 与序列/预设生成
SimulatorPanel(QWidget)提供交互界面:协议选择、序列 QTextEdit(多行脚本如 “ptz_control(1, 50, -30); delay(100); simulate_alarm(0x01)”)、预设按钮(_generate_preset)和服务开关。
UI 布局
顶部:QComboBox(“Pelco-D”/“P”) + “生成预设” ComboBox(“PTZ 波形”/“报警序列”) + 生成按钮。
序列编辑:QTextEdit,支持 # 注释 / 宏命令(如 ptz_control/send_preset/delay/simulate_alarm)。
按钮:运行/停止/清空日志,连接 _run_sequence()。
日志:QTextEdit 显示执行过程(e.g., “📤 发送: {hex}”)。
关键代码片段(从 simulator_panel.py 提取)
# ui/right_panel/simulator_panel.py class SimulatorPanel(QtWidgets.QWidget): simulate_send = pyqtSignal(bytes) # 发送模拟命令 def __init__(self, parent=None): super().__init__(parent) self._init_ui() self._running = False self._timer = QtCore.QTimer() self._timer.timeout.connect(self._process_next_line) def _init_ui(self): layout = QtWidgets.QVBoxLayout(self) # 顶部工具栏 top_bar = QtWidgets.QHBoxLayout() self.protocol_combo = QtWidgets.QComboBox() self.protocol_combo.addItems(["Pelco-D", "Pelco-P"]) top_bar.addWidget(QtWidgets.QLabel("协议:")) top_bar.addWidget(self.protocol_combo) self.preset_combo = QtWidgets.QComboBox() self.preset_combo.addItems(["PTZ 波形", "报警序列", "辅助开关循环"]) top_bar.addWidget(QtWidgets.QLabel("预设:")) top_bar.addWidget(self.preset_combo) btn_generate = QtWidgets.QPushButton("生成序列") btn_generate.clicked.connect(self._generate_preset) top_bar.addWidget(btn_generate) layout.addLayout(top_bar) # 序列编辑器 self.seq_editor = QtWidgets.QTextEdit() self.seq_editor.setPlaceholderText("# 示例: ptz_control(1, 50, -30); delay(100); simulate_alarm(0x01);") layout.addWidget(self.seq_editor, stretch=1) # 按钮栏 btn_bar = QtWidgets.QHBoxLayout() btn_run = QtWidgets.QPushButton("运行模拟") btn_run.clicked.connect(self._run_sequence) btn_stop = QtWidgets.QPushButton("停止") btn_stop.clicked.connect(self._stop_sequence) btn_clear = QtWidgets.QPushButton("清空日志") btn_clear.clicked.connect(lambda: self.sim_log.clear()) btn_bar.addWidget(btn_run) btn_bar.addWidget(btn_stop) btn_bar.addWidget(btn_clear) layout.addLayout(btn_bar) # 日志 self.sim_log = QtWidgets.QTextEdit() self.sim_log.setReadOnly(True) layout.addWidget(self.sim_log, stretch=1) def _generate_preset(self): preset = self.preset_combo.currentText() script = "" if preset == "PTZ 波形": for i in range(0, 360, 30): pan = int(100 * math.sin(math.radians(i))) tilt = int(100 * math.cos(math.radians(i))) script += f"ptz_control(1, {pan}, {tilt});\ndelay(200);\n" elif preset == "报警序列": script = "simulate_alarm(0x01);\ndelay(500);\nsimulate_alarm(0x02);\ndelay(500);\n" elif preset == "辅助开关循环": for aux in range(1, 5): script += f"aux_on(1, {aux});\ndelay(300);\naux_off(1, {aux});\ndelay(300);\n" self.seq_editor.setText(script) def _run_sequence(self): if self._running: return self._running = True self._lines = self.seq_editor.toPlainText().splitlines() self._current_line = 0 self.sim_log.append("▶️ 模拟开始") self._timer.start(10) # 快速处理,但 delay 会 sleep def _process_next_line(self): if not self._running or self._current_line >= len(self._lines): self._stop_sequence() return line = self._lines[self._current_line].strip() self._current_line += 1 if not line or line.startswith("#"): self._process_next_line() # 跳过 return try: if line.startswith("delay("): ms = int(line.split("(")[1].split(")")[0]) QtCore.QThread.msleep(ms) self.sim_log.append(f"⏱ 延迟: {ms} ms") elif line.startswith("simulate_alarm("): bits = int(line.split("(")[1].split(")")[0], 0) self.parent().parent().virtual_device.simulate_alarm(bits) self.sim_log.append(f"🚨 模拟报警: 0x{bits:02X}") elif line.startswith("ptz_control("): args = line.split("(")[1].split(")")[0].split(",") cam = int(args[0].strip()) pan = int(args[1].strip()) tilt = int(args[2].strip()) data = build_pelco_d(cam, 0x00, 0x04 if pan > 0 else 0x08 if pan < 0 else 0, abs(pan), abs(tilt)) # 示例 self.simulate_send.emit(data) self.sim_log.append(f"📤 PTZ: pan={pan}, tilt={tilt}") # ... 添加更多如 send_preset/aux_on 等 else: self.sim_log.append(f"❓ 未知指令: {line}") except Exception as e: self.sim_log.append(f"❌ 指令错误: {line} ({e})") self._process_next_line() # 继续下一行设计考虑:
序列语法:宏风格 “command(args);” / delay(ms); / simulate_alarm(hex),易解析/扩展。
预设生成:_generate_preset 使用 math 预设波形/循环,模拟常见场景。
日志:sim_log.append 实时反馈,增强交互;_timer 处理序列,避免阻塞。
3. 集成:process_command 与反馈日志
仿真集成到 AppWindow 的 serial_mgr.write(if simulate: virtual_device.process_command(data) → resp → parsed_received.emit(parse(resp)));SimulatorPanel 的 simulate_send.emit → serial_mgr.write(virtual_only=True 仅虚拟)。
集成流程
发送处理:write(data) if simulate: resp = virtual_device.process_command(data) → if resp: emitter.receive(resp) / parsed_received.emit(parse(resp))。
日志注入:emitter.simulator(data, parse(data))。
UI 更新:parsed_received.emit 触发 ReceivePanel / 报警联动。
连接:RightPanel 的 simulator_panel.simulate_send.connect(serial_mgr.write)。
关键代码片段(从 main_window.py 提取)
# ui/main_window.py class AppWindow(QtWidgets.QWidget): def __init__(self): # ... 初始化 self.virtual_device = VirtualDevice(cam_id=1) self.serial_mgr.simulate_mode = True # 示例开关 # 在 serial_mgr.write 中添加 # core/serial/manager.py 的 write def write(self, data: bytes): if self.simulate_mode: resp = self.parent().virtual_device.process_command(data, self.protocol) # AppWindow parent if resp: self.parent().log_emitter.receive(resp, parse_pelco_packet(resp, self.protocol), source="simulator") self.parsed_received.emit(get_frame_summary(resp, self.protocol)) return # 否则实际写 self._worker.write(data)设计考虑:
simulate_mode:开关(UI toggle),决定 virtual 或 real write。
反馈:process_command 返回 resp if query,模拟双向。
日志:统一 “simulator” source,便于过滤。
4. 配套代码:virtual_device.py、simulator_panel.py、pelco_protocol.py 的响应构建
以上片段已覆盖。完整:
virtual_device.py:完整状态机 + process_command(parse + update + generate) + simulate_alarm(bits)。
simulator_panel.py:完整 UI + _run_sequence(解析执行宏命令) + _generate_preset。
pelco_protocol.py:parse_ / build_ 支持响应(e.g., cmd2=0x5B/0x51)。
这些确保仿真模块化,便于扩展(如添加随机报警)。
5. 测试:单元与端到端
测试使用 pytest + mock,确保状态更新和响应正确。
单元测试(从 test_virtual_device.py 提取/新增)
# tests/simulator/test_virtual_device.py import pytest from core.simulator.virtual_device import VirtualDevice from core.pelco_protocol import parse_pelco_packet, build_pelco_d @pytest.fixture def device(): return VirtualDevice(cam_id=1) def test_process_ptz(device): data = build_pelco_d(1, 0x00, 0x04, 50, 30) # pan=50 right, tilt=30 up resp = device.process_command(data, "D") assert resp is None # 非 query assert device.pan == 5.0 # 50 * 0.1 assert device.tilt == 3.0 def test_process_query(device): device.pan = 123.4 query_data = build_pelco_d(1, 0x00, 0x59, 0x00, 0x00) # pan_pos query (cmd2=0x59) resp = device.process_command(query_data, "D") assert len(resp) == 7 parsed = parse_pelco_packet(resp, "D") assert parsed["type"] == "position" assert parsed["pan"] == 12340 # int(123.4 * 100) def test_simulate_alarm(device): device.simulate_alarm(0x03) # bits 1&2 assert device.alarm_status == 0x03 resp = device.generate_response("alarm", "D") # cmd2=0x51 parsed = parse_pelco_packet(resp, "D") assert parsed["type"] == "alarm" assert parsed["alarm_code"] == 0x03端到端测试(从 test_e2e_macro.py 扩展)
# tests/test_e2e_macro.py (扩展) @pytest.mark.e2e def test_simulator_sequence(window, qtbot): panel = window.right.sim_panel # 假设 sim_panel panel.seq_editor.setText("ptz_control(1, 50, -30);\ndelay(200);\nsimulate_alarm(0x01);\n") panel._run_sequence() qtbot.wait(300) assert "PTZ" in panel.sim_log.toPlainText() assert "模拟报警" in panel.sim_log.toPlainText() # 验证虚拟更新 assert window.virtual_device.pan > 0 assert window.virtual_device.alarm_status == 0x01 # 检查日志/接收 log_panel = window.right.log_panel assert any("SIMULATOR" in row["type"] for row in log_panel.logs)覆盖率:>80%,missing: 波形边缘 case。
6. 优化:波形预设、延迟模拟、错误注入
波形预设:_generate_preset 添加更多选项(如 “Random Patrol”),生成随机序列。
延迟模拟:_process_next_line 支持 “random_delay(min, max)” 注入变异。
错误注入:序列添加 “inject_error(checksum/invalid)” 行,生成 invalid 帧测试解析鲁棒性。
Windows 7 优化:序列限 500 行,防 UI 卡顿;测试下连续模拟 30min 无泄漏。
扩展:集成宏:Simulator 支持加载宏脚本作为序列;添加 “loop(n)” 语法。
结语
通过设备仿真与虚拟响应生成,Pelco KBD300A 模拟器实现了离线调试,支持现场无设备验证。这整合了协议/日志的输出,为高级技巧铺路。欢迎测试反馈!
上一篇总目录下一篇