第一章:为什么需要边缘智能?
1.1 云计算的局限
| 问题 | 说明 |
|---|
- 高延迟| 云端往返 >500ms,无法满足实时控制(如机器人避障)
- 带宽瓶颈| 1000 台摄像头 × 2Mbps = 2Gbps 上行带宽
- 单点故障| 网络中断 → 全系统瘫痪
- 隐私风险| 敏感数据(如人脸)上传云端
1.2 边缘计算优势
- 低延迟:本地处理 <50ms
- 带宽节省:仅上传摘要/告警(压缩率 >90%)
- 高可用:断网仍可执行预设规则
- 合规性:敏感数据不出厂
典型架构:
设备 → 边缘网关 → 云(分层智能)
第二章:边缘网关架构设计
2.1 软件栈选型(轻量化)
| 功能 | 技术 | 说明 |
|---|
- Web 框架| Flask(非异步) | 内存占用低,启动快
- 消息总线| ZeroMQ(inproc + tcp) | 无代理,低开销
- 本地存储| SQLite(WAL 模式) | 单文件,ACID,支持并发读
- AI 推理| ONNX Runtime(CPU 版) | 跨框架模型部署
- 前端| Vue 3 + Vite(静态资源) | 构建后仅 500KB
为何不用 FastAPI?:ASGI 在低端设备上内存开销更高,Flask 足够满足边缘 API 需求。
2.2 进程模型
[主进程: Flask] │ ├── [子进程: MQTT Client] ←→ 云 / 设备 ├── [子进程: CoAP Server] ←→ 老旧传感器 ├── [子进程: Rule Engine] ←→ 本地自动化 └── [子进程: Model Inference] ←→ ONNX 模型 ↑ ZeroMQ IPC 通信隔离性:任一子进程崩溃,主进程可重启它。
第三章:协议转换 —— 接入异构设备
3.1 支持协议
| 协议 | 适用设备 | 特点 |
|---|
- MQTT| 新型 IoT 设备 | 轻量、发布/订阅
- CoAP| 资源受限传感器 | UDP、RESTful
- Modbus RTU| 工业 PLC | 串口、主从架构
3.2 CoAP 服务端(aiocoap)
# protocols/coap_server.py import asyncio from aiocoap import Context, Message, resource class SensorResource(resource.Resource): def __init__(self): super().__init__() self.value = b"0" async def render_get(self, request): return Message(payload=self.value) async def render_put(self, request): self.value = request.payload # 触发 ZeroMQ 消息 zmq_pub.send(b"coap_update", request.payload) return Message(code=CHANGED) async def start_coap_server(): root = resource.Site() root.add_resource(['sensor', 'temp'], SensorResource()) await Context.create_server_context(root, bind=('::', 5683)) await asyncio.get_event_loop().create_future() # run forever3.3 Modbus 串口监听
# protocols/modbus_reader.py from pymodbus.client import ModbusSerialClient def read_plc_registers(): client = ModbusSerialClient(method='rtu', port='/dev/ttyUSB0', baudrate=9600) if client.connect(): result = client.read_holding_registers(address=0, count=10, slave=1) client.close() return result.registers return None统一数据模型:所有协议数据转为 JSON 格式:
{ "device_id": "plc_01", "timestamp": 1705650000, "metrics": { "vibration": 0.85, "temperature": 42.3 } }第四章:本地 AI 推理 —— ONNX Runtime
4.1 模型准备
- 训练模型(PyTorch/TensorFlow) → 导出为 ONNX
- 优化:使用
onnx-simplifier减小体积
pip install onnxruntime pip install onnx-simplifier python -m onnxsim model.onnx model_optimized.onnx4.2 边缘推理服务
# services/inference.py import onnxruntime as ort import numpy as np class EdgeInference: def __init__(self, model_path: str): self.session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider']) self.input_name = self.session.get_inputs()[0].name self.output_name = self.session.get_outputs()[0].name def predict(self, input_data: np.ndarray) -> np.ndarray: return self.session.run([self.output_name], {self.input_name: input_data})[0] # 全局实例(避免重复加载) vibration_model = EdgeInference('/models/vibration_anomaly.onnx')4.3 工厂设备预测性维护
# rules/predictive_maintenance.py def check_vibration(data: dict): if 'vibration' in data['metrics']: # 预处理:滑动窗口标准化 window = get_last_10_vibrations(data['device_id']) features = np.array(window).reshape(1, -1).astype(np.float32) # 推理 anomaly_score = vibration_model.predict(features)[0][0] if anomaly_score > 0.9: trigger_local_alert(f"设备 {data['device_id']} 振动异常!") # 仅上传告警,不上传原始数据 cloud_uploader.enqueue({ "alert": "vibration_anomaly", "device": data['device_id'], "score": float(anomaly_score) })效果:
- 原始数据(10KB/s/设备) → 告警(<1KB/小时)
- 告警延迟 <100ms
第五章:断网自治 —— 本地规则引擎
5.1 规则定义(YAML)
# rules/home_automation.yaml - name: "夜间自动关灯" condition: time: "22:00-06:00" sensor.light: ">300" action: command: "turn_off" target: "light_living_room" - name: "温度过高开空调" condition: sensor.temperature: ">28" action: command: "set_mode" target: "ac_bedroom" params: {"mode": "cool", "temp": 26}5.2 规则引擎执行
# services/rule_engine.py import yaml from datetime import datetime class LocalRuleEngine: def __init__(self, rules_file: str): with open(rules_file) as f: self.rules = yaml.safe_load(f) def evaluate(self, sensor_data: dict): current_time = datetime.now().strftime("%H:%M") for rule in self.rules: if self._check_condition(rule['condition'], sensor_data, current_time): self._execute_action(rule['action']) def _check_condition(self, cond: dict, data: dict, time_str: str) -> bool: # 时间条件 if 'time' in cond: start, end = cond['time'].split('-') if not (start <= time_str <= end): return False # 传感器条件 for key, expr in cond.items(): if key.startswith('sensor.'): metric = key.split('.')[1] if metric not in data['metrics']: return False value = data['metrics'][metric] # 简单表达式解析(如 ">300") op, threshold = expr[0], float(expr[1:]) if op == '>' and not (value > threshold): return False if op == '<' and not (value < threshold): return False return True def _execute_action(self, action: dict): # 通过 ZeroMQ 发送控制命令 zmq_control.send_json(action)断网时:规则引擎继续运行,控制本地设备。
第六章:数据同步 —— 断网续传
6.1 本地队列设计
# services/cloud_uploader.py import sqlite3 import threading class EdgeQueue: def __init__(self, db_path="/data/edge_queue.db"): self.conn = sqlite3.connect(db_path, check_same_thread=False) self.conn.execute("CREATE TABLE IF NOT EXISTS queue (id INTEGER PRIMARY KEY, payload TEXT)") self.lock = threading.Lock() def enqueue(self, payload: dict): with self.lock: self.conn.execute("INSERT INTO queue (payload) VALUES (?)", (json.dumps(payload),)) self.conn.commit() def dequeue_batch(self, limit=100): with self.lock: cur = self.conn.cursor() cur.execute("SELECT id, payload FROM queue LIMIT ?", (limit,)) items = cur.fetchall() if items: ids = [item[0] for item in items] self.conn.execute(f"DELETE FROM queue WHERE id IN ({','.join('?'*len(ids))})", ids) self.conn.commit() return [json.loads(item[1]) for item in items]6.2 云同步服务
# services/sync_to_cloud.py import requests def sync_loop(): while True: if is_network_available(): batch = edge_queue.dequeue_batch() if batch: try: requests.post(CLOUD_ENDPOINT, json=batch, timeout=10) except Exception as e: # 重入队列 for item in batch: edge_queue.enqueue(item) time.sleep(5) # 每 5 秒尝试同步可靠性:SQLite WAL 模式确保断电不丢数据。
第七章:前端边缘管理(Vue)
7.1 边缘节点状态面板
<template> <div class="edge-node"> <h3>{{ node.name }}</h3> <div class="status-grid"> <MetricCard label="CPU" :value="node.cpu_usage + '%'" /> <MetricCard label="内存" :value="node.mem_usage + 'MB'" /> <MetricCard label="网络" :value="node.net_status" /> <MetricCard label="存储" :value="node.disk_free + 'GB'" /> </div> <button @click="deployFunction">部署边缘函数</button> </div> </template> <script setup> const props = defineProps({ node: Object // { name, cpu_usage, mem_usage, ... } }) const deployFunction = async () => { const file = await openFilePicker() // 用户选择 .py 或 .onnx await fetch(`/api/edge/${props.node.id}/deploy`, { method: 'POST', body: file }) alert('部署成功!') } </script>7.2 远程函数热加载
# routes/edge_management.py @app.post('/api/edge/<node_id>/deploy') def deploy_edge_function(): file = request.files['file'] if file.filename.endswith('.py'): # 保存并 reload 规则模块 file.save(f"/rules/custom_{secure_filename(file.filename)}") importlib.reload(custom_rules) return jsonify({"status": "success"}) elif file.filename.endswith('.onnx'): # 替换模型 file.save("/models/custom.onnx") global custom_model custom_model = EdgeInference("/models/custom.onnx") return jsonify({"status": "success"})运维价值:无需物理接触设备,远程更新 AI 模型或业务逻辑。
第八章:性能与资源优化
8.1 内存控制
- Flask 多进程禁用:
app.run(threaded=True, processes=1) - ZeroMQ 高水位:限制内存队列长度
- SQLite PRAGMA:
PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL; -- 平衡安全与性能8.2 启动加速
- 懒加载模型:首次推理时才加载 ONNX
- 预编译字节码:
python -m compileall /app
实测(树莓派 4B):
- 内存占用:85MB
- 启动时间:2.3s
- CoAP 响应延迟:<10ms
第九章:安全设计
9.1 边缘安全
- TLS 双向认证:边缘 ↔ 云
- 固件签名:防止恶意代码部署
- 最小权限:Flask 运行于非 root 用户
9.2 数据隐私
- 本地脱敏:人脸/车牌在边缘模糊后再上传
- 加密存储:SQLite 使用 SQLCipher 加密
第十章:场景总结
10.1 工厂预测性维护
- 输入:设备振动、温度传感器
- 边缘动作:实时异常检测 → 本地停机 + 云告警
- 收益:减少非计划停机 30%
10.2 智能家居
- 输入:温湿度、光照、人体红外
- 边缘动作:断网时仍可执行“人来开灯、人走关灯”
- 收益:用户体验不依赖互联网
10.3 精准农业
- 输入:土壤湿度、气象站数据
- 边缘动作:融合多源数据 → 自动灌溉决策
- 收益:节水 25%,减少云流量 95%
总结:智能下沉,价值上升
边缘不是云的延伸,而是智能的新前线。