大学 Stack MQTT协议应用实验 CH0ico 2026-06-09 2026-06-09 MQTT 协议与边缘设备实验报告
实验日期:2026-06-09 实验环境:macOS + OrbStack Ubuntu 22.04 ARM64 实验目的:搭建完整 IoT 原型系统,验证 MQTT 协议的发布/订阅模式在边缘计算场景中的应用
一、实验概述 1.1 实验目标 在 Ubuntu 虚拟环境中搭建一个端到端的物联网(IoT)边缘计算原型系统,通过模拟传感器设备、边缘处理节点、云端聚合服务和 Web 监控界面,深入理解 MQTT 的发布/订阅(Pub/Sub)模式在边缘计算中的实际应用。
1.2 系统架构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 graph LR subgraph 设备层 T["🌡️ 温度传感器 ×2"] H["💧 湿度传感器 ×2"] L["💡 光照传感器"] P["🔵 气压传感器"] M["🚶 运动传感器"] D["🌫️ 烟尘传感器"] end subgraph 边缘层 PRE["数据预处理节点"] ABN["异常检测节点"] end subgraph 云端层 CLD["云端聚合服务"] end subgraph 展示层 WEB["Web 监控界面"] end T & H & L & P & M & D -->|sensors/+/+| MQTT{MQTT Broker<br/>Mosquitto :1883} MQTT -->|sensors/+/+| PRE PRE -->|processed/+/+| MQTT MQTT -->|processed/+/+| ABN ABN -->|alerts/+/+| MQTT MQTT -->|processed/+/+<br/>alerts/+/+| CLD CLD -->|cloud/aggregated/+| MQTT MQTT -->|all topics| WEB
1.3 组件清单
组件
数量
技术栈
说明
MQTT Broker
1
Mosquitto 2.0.11
MQTT 消息中间件
设备模拟器
8 个 (6 类)
Python 3 + paho-mqtt
模拟传感器数据上报
数据预处理
1
Python 3
数据标准化 + 分类
异常检测
1
Python 3 + numpy
阈值/统计/趋势三重检测
云端聚合
1
Python 3 + numpy
时间窗口聚合统计
Web 界面
1
Node.js + Socket.IO
实时监控仪表盘
1.4 MQTT 主题设计
主题模式
方向
说明
sensors/{type}/{id}
设备 → Broker
原始传感器数据
processed/{type}/{id}
预处理 → Broker
标准化 + 分类数据
alerts/{type}/{id}
异常检测 → Broker
告警消息
cloud/aggregated/{type}
聚合 → Broker
统计聚合结果
二、实验环境 2.1 宿主机环境
项目
配置
操作系统
macOS (Apple Silicon)
虚拟化平台
OrbStack
终端
iTerm2 / zsh
2.2 虚拟机环境
项目
配置
操作系统
Ubuntu 22.04.5 LTS (ARM64)
内核版本
7.0.5-orbstack
磁盘空间
722 GB(可用 721 GB)
Python
3.10.12
Node.js
v18.20.8
npm
10.8.2
Mosquitto
2.0.11
paho-mqtt
2.1.0
numpy
2.2.6
2.3 OrbStack 环境说明 本实验使用 OrbStack 替代传统 VMware,无需手动配置 CPU/内存/网络。以下命令均在 Ubuntu VM 内直接执行。Web 界面通过 OrbStack 端口转发在 macOS 浏览器访问 http://localhost:3000。
三、实验步骤 第一部分:环境准备 1.1 系统更新与基础包安装 1 2 sudo apt update && sudo apt upgrade -ysudo apt install -y curl wget git python3 python3-pip python3-venv
1.2 安装 Node.js 18
Ubuntu 22.04 默认源的 Node.js 版本过旧(v12.x),后续 Web 服务需要 v18+。
1 2 3 4 5 6 7 curl -fsSL https://deb.nodesource.com/setup_18.x | sudo -E bash - sudo apt install -y nodejsnode -v npm -v
1.3 安装 Mosquitto MQTT Broker 1 2 3 4 5 6 7 8 9 sudo apt install -y mosquitto mosquitto-clientssudo systemctl start mosquittosudo systemctl enable mosquittosudo systemctl status mosquitto
第二部分:MQTT Broker 安装与测试 2.1 配置文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 sudo tee /etc/mosquitto/conf.d/default.conf << EOF # 允许匿名连接(实验用) allow_anonymous true # 监听端口 listener 1883 0.0.0.0 # 日志配置 log_type error log_type warning log_type notice log_type information EOF sudo systemctl restart mosquitto
2.2 连通性验证 1 2 3 4 5 6 7 mosquitto_sub -h localhost -t test /topic mosquitto_pub -h localhost -t test /topic -m "Hello MQTT - 测试成功!"
第三部分:项目结构 1 mkdir -p ~/mqtt-iot-experiment/{iot-devices,edge-nodes,cloud-service,web-interface,logs,config,data}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 ~/mqtt-iot-experiment/ ├── iot-devices/ # IoT 设备模拟器 │ ├── temperature_sensor.py │ ├── humidity_sensor.py │ ├── light_sensor.py │ ├── pressure_sensor.py │ ├── motion_sensor.py │ ├── dust_sensor.py │ └── device_manager.py ├── edge-nodes/ # 边缘处理节点 │ ├── data_preprocessor.py │ └── anomaly_detector.py ├── cloud-service/ # 云端聚合服务 │ └── cloud_aggregator.py ├── web-interface/ # Web 监控界面 │ ├── index.js │ ├── package.json │ └── public/ │ ├── index.html │ ├── style.css │ └── app.js ├── logs/ # 运行日志 ├── config/ # 配置文件 ├── data/ # 数据存储 ├── start_system.sh # 一键启动 └── stop_system.sh # 一键停止
第四部分:IoT 设备模拟器开发 4.1 安装 Python 依赖 1 2 pip3 install --user paho-mqtt numpy
4.2 传感器模拟器设计 每个传感器遵循统一的设计模式:
__init__() — 初始化 MQTT 客户端、主题、设备 ID
connect() — 连接 Broker
generate_data() — 生成模拟数据(基础值 + 随机波动 + 5% 异常概率)
run() — 循环发送数据
stop() — 断开连接
6 类传感器参数:
传感器
基础值
波动范围
5%异常范围
发送间隔
单位
温度
20°C
±10°C
+15~25°C
2s
celsius
湿度
50%
±20%
90~100%
3s
percent
光照
500 lux
±300 lux
1200~2000 lux
2.5s
lux
气压
1013.25 hPa
±10 hPa
950~980 hPa
3.5s
hPa
运动
0~10 次
-
20~50 次
2s
count
烟尘
35 µg/m³
±20
150~500
3s
µg/m³
4.3 温度传感器(示例) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 import paho.mqtt.client as mqttimport jsonimport randomimport timeimport datetimeclass TemperatureSensor : def __init__ (self, device_id, broker_host="localhost" , broker_port=1883 ): self .device_id = device_id self .broker_host = broker_host self .broker_port = broker_port self .client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) self .topic = f"sensors/temperature/{device_id} " self .running = False def connect (self ): try : self .client.connect(self .broker_host, self .broker_port, 60 ) print (f"[温度传感器 {self.device_id} ] 已连接到 MQTT Broker" ) return True except Exception as e: print (f"[温度传感器 {self.device_id} ] 连接失败: {e} " ) return False def generate_temperature_data (self ): base_temp = 20.0 variation = random.uniform(-10 , 10 ) temperature = round (base_temp + variation, 2 ) if random.random() < 0.05 : temperature += random.uniform(15 , 25 ) return { "device_id" : self .device_id, "sensor_type" : "temperature" , "value" : temperature, "unit" : "celsius" , "timestamp" : datetime.datetime.now().isoformat(), "location" : {"lat" : 39.9042 , "lon" : 116.4074 } } def run (self ): self .running = True while self .running: try : data = self .generate_temperature_data() self .client.publish(self .topic, json.dumps(data)) time.sleep(2 ) except Exception as e: print (f"[温度传感器 {self.device_id} ] 发送失败: {e} " ) time.sleep(5 ) def stop (self ): self .running = False self .client.disconnect()
其余 5 类传感器(湿度、光照、气压、运动、烟尘)结构相同,仅在数据生成逻辑上有差异。完整代码见项目目录 ~/mqtt-iot-experiment/iot-devices/
4.4 设备管理器 device_manager.py 负责统一启动和管理所有传感器实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import threadingimport timeimport signalimport sysfrom temperature_sensor import TemperatureSensorfrom humidity_sensor import HumiditySensorfrom light_sensor import LightSensorfrom pressure_sensor import PressureSensorfrom motion_sensor import MotionSensorfrom dust_sensor import DustSensordevices = [] threads = [] def create_sensors (): sensors = [ TemperatureSensor("temp_001" ), TemperatureSensor("temp_002" ), HumiditySensor("humid_001" ), HumiditySensor("humid_002" ), LightSensor("light_001" ), PressureSensor("press_001" ), MotionSensor("motion_001" ), DustSensor("dust_001" ), ] for sensor in sensors: if sensor.connect(): devices.append(sensor) print (f"共启动 {len (devices)} 个传感器设备" ) def start_sensors (): for device in devices: t = threading.Thread(target=device.run, daemon=True ) t.start() threads.append(t) print ("所有传感器已开始发送数据" ) def stop_sensors (signum=None , frame=None ): print ("\n正在停止所有传感器..." ) for device in devices: device.stop() sys.exit(0 ) if __name__ == "__main__" : signal.signal(signal.SIGINT, stop_sensors) signal.signal(signal.SIGTERM, stop_sensors) create_sensors() start_sensors() try : while True : time.sleep(1 ) except KeyboardInterrupt: stop_sensors()
第五部分:边缘处理节点开发 5.1 数据预处理节点 data_preprocessor.py 订阅 sensors/+/+,对每个传感器的原始数据执行:
归一化(Normalize):将不同量纲数据映射到 [0, 1] 区间
状态分类(Classify):基于阈值分为 5 个等级
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 def normalize (self, value, sensor_type ): """各传感器数值归一化到 [0,1]""" ranges = { "temperature" : (0 , 50 ), "humidity" : (0 , 100 ), "light" : (0 , 2000 ), "pressure" : (950 , 1050 ), "motion" : (0 , 50 ), "dust" : (0 , 500 ), } lo, hi = ranges.get(sensor_type, (0 , 100 )) return (value - lo) / (hi - lo) if hi != lo else 0.0 def classify (self, value, sensor_type ): """基于阈值五级分类""" thresholds = { "temperature" : (-10 , 10 , 40 , 80 ), "humidity" : (0 , 30 , 70 , 100 ), "light" : (0 , 300 , 1000 , 2000 ), "pressure" : (900 , 980 , 1030 , 1100 ), "motion" : (0 , 10 , 30 , 50 ), "dust" : (0 , 75 , 150 , 500 ), } t = thresholds.get(sensor_type) if t is None : return "normal" if value < t[0 ]: return "critical_low" elif value < t[1 ]: return "low" elif value < t[2 ]: return "normal" elif value < t[3 ]: return "high" else : return "critical_high"
处理后的数据发布到 processed/{type}/{id},增加了 normalized、processed_timestamp 和 status 字段。
5.2 异常检测节点 anomaly_detector.py 订阅 processed/+/+,实现三重异常检测:
检测类型
方法
触发条件
阈值违规
直接判断
status 为 high / critical
统计异常
滑动窗口 Z-score
`
趋势异常
线性回归斜率
`
告警消息发布到 alerts/{type}/{id},包含告警类型、严重级别、设备信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class AnomalyDetector : def __init__ (self ): self .history = defaultdict(list ) self .window_size = 20 def on_message (self, client, userdata, msg ): data = json.loads(msg.payload.decode()) key = f"{data['sensor_type' ]} /{data['device_id' ]} " self .history[key].append(data['normalized' ]) if len (self .history[key]) > self .window_size: self .history[key].pop(0 ) alerts = [] if data['status' ] in ("critical_high" , "critical_low" ): alerts.append({"type" : "threshold_violation" , "severity" : "critical" , ...}) elif data['status' ] in ("high" , "low" ): alerts.append({"type" : "threshold_violation" , "severity" : "warning" , ...}) arr = np.array(self .history[key]) mean, std = np.mean(arr), np.std(arr) if std > 0 and abs (data['normalized' ] - mean) > 2 * std: alerts.append({"type" : "statistical_outlier" , "severity" : "warning" , ...}) recent = np.array(self .history[key][-5 :]) trend = np.polyfit(range (len (recent)), recent, 1 )[0 ] if abs (trend) > 0.1 : alerts.append({"type" : "trend_anomaly" , "severity" : "warning" , ...})
第六部分:云端聚合服务开发 cloud_aggregator.py 订阅 processed/+/+ 和 alerts/+/+,每 10 秒进行一次时间窗口聚合:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class CloudAggregator : def __init__ (self ): self .data_buffer = defaultdict(list ) self .aggregation_window = 10 def aggregate_and_publish (self ): for sensor_type, records in self .data_buffer.items(): values = [r["value" ] for r in records] aggregated = { "sensor_type" : sensor_type, "count" : int (len (records)), "device_count" : int (len (set (r["device_id" ] for r in records))), "avg" : float (round (np.mean(values), 2 )), "max" : float (round (np.max (values), 2 )), "min" : float (round (np.min (values), 2 )), "std" : float (round (np.std(values), 2 )), "avg_normalized" : float (round (np.mean([r["normalized" ] for r in records]), 4 )), "timestamp" : datetime.datetime.now().isoformat(), "window_seconds" : int (self .aggregation_window) } self .client.publish(f"cloud/aggregated/{sensor_type} " , json.dumps(aggregated)) self .data_buffer.clear()
numpy 的 int64/float64 类型不能直接 json.dumps(),需要用 int()/float() 显式转换。
第七部分:Web 监控界面开发 7.1 技术栈
层级
技术
后端
Node.js + Express + Socket.IO
MQTT 客户端
mqtt.js
前端
原生 HTML/CSS/JavaScript
实时通信
WebSocket (Socket.IO)
7.2 服务端 index.js 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 const express = require ("express" );const http = require ("http" );const { Server } = require ("socket.io" );const mqtt = require ("mqtt" );const path = require ("path" );const app = express ();const server = http.createServer (app);const io = new Server (server);const mqttClient = mqtt.connect ("mqtt://127.0.0.1:1883" );app.use (express.static (path.join (__dirname, "public" ))); mqttClient.on ("connect" , () => { mqttClient.subscribe ("sensors/+/+" ); mqttClient.subscribe ("alerts/+/+" ); mqttClient.subscribe ("cloud/aggregated/+" ); }); mqttClient.on ("message" , (topic, message ) => { const data = JSON .parse (message.toString ()); const parts = topic.split ("/" ); if (parts[0 ] === "sensors" ) io.emit ("sensorData" , { sensorType : parts[1 ], deviceId : parts[2 ], data }); else if (parts[0 ] === "alerts" ) io.emit ("newAlert" , data); else if (parts[0 ] === "cloud" ) io.emit ("aggregatedData" , data); }); io.on ("connection" , (socket ) => { console .log ("客户端已连接:" , socket.id ); }); server.listen (3000 , "0.0.0.0" , () => { console .log ("监控界面已启动: http://localhost:3000" ); });
7.3 前端设计
6 张传感器卡片:网格布局,实时显示各设备数值和状态颜色(绿色=正常、橙色=警告、红色=临界,红色卡片带脉冲动画)
警报面板:最近 10 条告警,按严重级别着色
聚合面板:最近 8 条云端聚合统计
状态栏:连接状态、设备总数、警报计数
超时检测:30 秒无更新自动移除离线设备
第八部分:系统启停脚本 start_system.sh1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #!/bin/bash BASE_DIR="$HOME /mqtt-iot-experiment" cd "$BASE_DIR " mkdir -p logsexport PYTHONUNBUFFERED=1export PYTHONPATH="$BASE_DIR /iot-devices:$BASE_DIR /edge-nodes:$BASE_DIR /cloud-service" cd "$BASE_DIR /iot-devices" python3 device_manager.py > "$BASE_DIR /logs/devices.log" 2>&1 & DEVICES_PID=$! cd "$BASE_DIR /edge-nodes" python3 data_preprocessor.py > "$BASE_DIR /logs/preprocessor.log" 2>&1 & PREPROCESSOR_PID=$! python3 anomaly_detector.py > "$BASE_DIR /logs/anomaly_detector.log" 2>&1 & DETECTOR_PID=$! cd "$BASE_DIR /cloud-service" python3 cloud_aggregator.py > "$BASE_DIR /logs/cloud_service.log" 2>&1 & CLOUD_PID=$! cd "$BASE_DIR /web-interface" npm start > "$BASE_DIR /logs/web_server.log" 2>&1 & WEB_PID=$! echo "$DEVICES_PID $PREPROCESSOR_PID $DETECTOR_PID $CLOUD_PID $WEB_PID " \ > "$BASE_DIR /system_pids.txt"
📌 关键配置:PYTHONUNBUFFERED=1 确保日志实时输出;PYTHONPATH 确保跨目录模块导入。
stop_system.sh1 2 3 4 5 6 7 8 9 #!/bin/bash if [ -f system_pids.txt ]; then for PID in $(cat system_pids.txt); do kill -0 $PID 2>/dev/null && kill $PID done rm system_pids.txt else pkill -f "device_manager.py|data_preprocessor.py|anomaly_detector.py|cloud_aggregator.py|node index.js" fi
第九部分:运行验证 9.1 启动系统 1 ~/mqtt-iot-experiment/start_system.sh
输出:
1 2 3 4 5 6 7 8 9 10 11 12 ============================== IoT 实验系统启动中... ============================== [1/5] 启动设备模拟器... -> PID: 5586 [2/5] 启动数据预处理节点... -> PID: 5587 [3/5] 启动异常检测节点... -> PID: 5588 [4/5] 启动云端聚合服务... -> PID: 5589 [5/5] 启动 Web 监控界面... -> PID: 5590 ============================== 系统启动完成! 访问 http://localhost:3000 查看监控界面 ==============================
9.2 访问 Web 监控界面 打开浏览器访问 http://localhost:3000
9.3 监控 MQTT 消息 1 2 3 4 5 6 7 8 mosquitto_sub -h localhost -t "sensors/+/+" mosquitto_sub -h localhost -t "alerts/+/+" mosquitto_sub -h localhost -t "cloud/aggregated/+"
9.4 发送异常数据测试 手动发布一条 100°C 的温度数据来触发异常检测:
1 2 3 4 5 6 7 8 mosquitto_pub -h localhost -t "sensors/temperature/temp_001" -m '{ "device_id": "temp_001", "sensor_type": "temperature", "value": 100.0, "unit": "celsius", "timestamp": "' $(date -Iseconds)'", "location": {"lat": 39.9042, "lon": 116.4074} }'
异常检测日志输出:
1 2 3 [异常检测] ALERT: [warning] temperature/temp_001 状态警告: high, 数值: 100.0 [异常检测] ALERT: [warning] temperature/temp_001 统计异常: 值=2.000, 均值=0.524, 标准差=0.363 [异常检测] ALERT: [warning] temperature/temp_001 趋势异常: 斜率=0.326
→ 一条异常数据触发了 3 种告警:阈值违规 + 统计离群 + 趋势异常。
MQTT 告警消息示例:
1 2 3 4 5 6 7 8 9 { "type" : "threshold_violation" , "severity" : "warning" , "message" : "temperature/temp_001 状态警告: high, 数值: 100.0" , "device_id" : "temp_001" , "sensor_type" : "temperature" , "raw_value" : 100.0 , "timestamp" : "2026-06-09T15:15:XX.XXXXXX" }
9.5 停止系统 1 ~/mqtt-iot-experiment/stop_system.sh
四、实验结果与分析 4.1 系统运行状态
指标
数值
运行进程数
6(设备管理器 + 预处理 + 异常检测 + 云端 + Node.js × 2)
模拟设备数
8 个(2×温度 + 2×湿度 + 光照 + 气压 + 运动 + 烟尘)
数据发送频率
2~3.5 秒/设备
聚合窗口
10 秒
Web 延迟
实时(WebSocket 推送)
4.2 MQTT 协议特性验证 实验验证了 MQTT 的以下核心特性:
发布/订阅解耦 —— 传感器、边缘节点、云端服务之间只通过 Topic 通信,不感知对方存在。新增组件只需订阅对应 Topic 即可接入。
主题层级化 —— sensors/{type}/{id} 的层级设计支持通配符:sensors/+/+ 订阅全部传感器,sensors/temperature/+ 只订阅温度。
一对多消息分发 —— 一条传感器数据同时送到预处理节点、云端服务和 Web 界面三个消费者,体现了 MQTT 的广播能力。
轻量级协议 —— ARM64 虚拟机上 6 个 Python 进程 + 1 个 Node.js 进程总内存约 200MB,适合资源受限的边缘设备。
4.3 边缘计算架构优势
优势
体现
就近处理
预处理和异常检测在边缘节点完成,降低云端负载
实时响应
异常检测延迟 < 100ms(本地 Broker),满足工业场景需求
分级告警
5 级状态分类 + 3 种检测维度,降低误报率
容错性
单传感器故障不影响其他设备,组件独立运行
4.4 MQTT 协议要点(计算机网络视角) 协议栈位置 MQTT 工作在 TCP/IP 协议栈的应用层,下层依赖 TCP(端口 1883/8883)。因为跑在 TCP 之上,MQTT 天然继承了可靠传输、拥塞控制和有序交付,不用自己在应用层做确认和重传。MQTT 本身只关心 pub/sub 语义和主题匹配。
1 2 3 4 5 6 7 ┌─────────────────────┐ │ MQTT (应用层) │ PUBLISH / SUBSCRIBE / CONNECT ├─────────────────────┤ │ TCP (传输层) │ 可靠字节流, 端口 1883 ├─────────────────────┤ │ IP (网络层) │ 路由与寻址 └─────────────────────┘
报文格式与最小开销 MQTT 固定头只有 2 字节 :第一个字节包含消息类型(4 bit)和标志位(4 bit),第二个字节是剩余长度(变长编码,1-4 字节)。相比之下 HTTP/1.1 请求头动辄几百字节。这个 2 字节底开销是 MQTT 能跑在低带宽、高延迟物联网网络里的关键。
1 2 3 4 5 6 7 8 0 1 2 3 4 5 6 7 ┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐ │ 消息类型 (4bit) │ DUP │ QoS │ RETAIN │ ├──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┤ │ 剩余长度 (变长 1-4 字节) │ ├──────────────────────────────────────────────────────┤ │ 可变头 + 载荷(取决于消息类型) │ └──────────────────────────────────────────────────────┘
QoS 三级递送保证 MQTT 定义了三种服务质量,本质是在 TCP 之上额外加了应用层确认:
QoS
名称
行为
开销
0
至多一次
发完即忘,不重试,不确认
最小,消息可能丢
1
至少一次
接收方回 PUBACK;发送方没收到 ACK 就重发
有重传开销,可能重复
2
恰好一次
四步握手(PUBLISH → PUBREC → PUBREL → PUBCOMP)
最大,保证不丢不重
QoS 0 适合高频传感器数据(丢一两条无所谓)。QoS 1 适合告警和控制指令。QoS 2 开销最大,一般是关键计费或配置下发场景才用。本实验中传感器上报用默认 QoS 0,告警消息适用 QoS 1。
会话与遗嘱机制 客户端连接时可以设置 cleanSession 标志:true 表示断开后 Broker 清掉所有订阅和未消费消息;false 表示 Broker 保留会话,重连后自动恢复订阅,并把离线期间积攒的消息推过来。
遗嘱消息(Last Will)是客户端在 CONNECT 时就预设好的一条消息,如果客户端非正常断开(keep-alive 超时),Broker 会自动替它发布这条遗嘱,通知其他订阅者”这个设备下线了”。本实验是本地原型系统没开这个特性,但真实部署里遗嘱消息是设备在线状态监控的标配手段。
与 HTTP/CoAP 的对比
MQTT
HTTP/1.1
CoAP
传输层
TCP
TCP
UDP
通信模式
Pub/Sub(推送)
Req/Resp(拉取)
Req/Resp(类 REST)
头开销
≥2 字节
数百字节
≥4 字节
适用场景
传感器上报、推送通知
Web API、文件传输
受限节点、6LoWPAN
实时性
天然推送,毫秒级
需轮询或长轮询
支持 Observe 模式
MQTT 的中心化 Broker 模型和 HTTP 的端到端模型各有利弊:MQTT 的生产者和消费者完全解耦,扩展灵活,但 Broker 是单点瓶颈;HTTP 没有中心节点但需要客户端知道服务器地址且主动发起请求。
Topic 匹配与通配符 MQTT 主题是 UTF-8 字符串,用 / 分层。Broker 内部用 前缀树(Trie) 做主题匹配,支持两种通配符:
+ 匹配单层,如 sensors/+/temp_001 匹配 sensors/temperature/temp_001
# 匹配多层(只能放末尾),如 sensors/# 匹配所有传感器消息
匹配过程是 O(主题深度) 的 Trie 遍历,所以即使订阅量很大,单条消息的分发速度也非常快。本实验中 Web 界面用 sensors/+/+ 一条订阅就覆盖了全部 8 个设备的原始数据。
五、踩坑记录与问题解决 5.1 paho-mqtt 2.x API 变更 paho-mqtt 2.x 中 mqtt.Client() 必须传入 CallbackAPIVersion,教程代码基于 1.x 会直接报错。
解决:
1 self .client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
5.2 Python 日志缓冲 后台运行的 Python 进程日志文件为空——stdout 默认带缓冲,nohup 重定向时不会立即刷盘。
启动脚本里加 export PYTHONUNBUFFERED=1 即可关闭缓冲。
5.3 Python 模块导入路径 device_manager.py 里 from temperature_sensor import TemperatureSensor 报 ModuleNotFoundError。
启动脚本加 export PYTHONPATH 把所有子目录纳入搜索路径。
5.4 f-string 中 dict key 引用 f-string 里引用 dict key 忘加引号:normalized[normalized] 被当成用 dict 做 key,报 unhashable type: 'dict'。
解决:
1 2 3 4 f"norm={normalized[normalized]:.2 f} " f"norm={normalized['normalized' ]:.2 f} "
5.5 numpy 类型 JSON 序列化 numpy 的 int64 / float64 不能直接 json.dumps(),报 Object of type int64 is not JSON serializable。
解决:
1 2 3 4 "count" : int (len (records))"avg" : float (round (np.mean(values), 2 ))
5.6 Node.js MQTT 客户端 localhost 解析为 IPv6 Web 界面显示”设备: 0”,没有任何传感器数据。排查发现 Web 服务器的 MQTT 客户端连不上 Broker,报 ECONNREFUSED ::1:1883。mqtt v5.x 把 localhost 解析成了 IPv6 ::1,但 Mosquitto 只配了 listener 1883 0.0.0.0(仅 IPv4),连接直接被拒。
把 index.js 里的 mqtt://localhost:1883 改成 mqtt://127.0.0.1:1883,强制走 IPv4 就行。
1 2 3 4 const MQTT_BROKER = "mqtt://localhost:1883" ;const MQTT_BROKER = "mqtt://127.0.0.1:1883" ;
Python 的 paho-mqtt 不受影响,它底层会自动回退 IPv4。
5.7 聚合面板重复堆积 云端聚合每 10 秒为每种传感器类型发一次统计,前端 displayAggregation 每次都创建新的 DOM 节点,同一类型很快堆出多条重复条目,面板不断膨胀。
给 displayAggregation 加了去重逻辑:先 querySelector 查找同 data-sensor-type 属性的旧条目并删掉,再插入新的。每类传感器始终只保留最新一条。
1 2 3 4 5 6 7 8 9 function displayAggregation (data ) { var container = document .getElementById ("aggregationData" ); var existing = container.querySelector ( '.agg-item[data-sensor-type="' + data.sensor_type + '"]' ); if (existing) existing.remove (); }
5.8 异常检测告警洪水 motion 传感器 70% 时间输出 0,dust 传感器值多在 30-50 之间,而原分类阈值把 motion<10 判为”low”、dust<75 判为”low”。结果是几乎每个数据点都触发告警,每 10 秒涌出 ~10 条。
两步修复:(1) 后端 anomaly_detector.py 按 (device_id, alert_type, severity) 做 60 秒冷却,同类同级别告警在冷却期内直接跳过;(2) 前端 app.js 按 sensor_type/device_id/type 对告警去重,重复的只更新计数和时间戳,显示时追加 (×N),alertCount 也改为唯一告警数。
5.9 分类阈值与模拟器值域不匹配 上面 5.8 的根因是 data_preprocessor.py 的 classify() 阈值和模拟器实际生成的值域对不上。调整后:
传感器
模拟值域
旧 normal 区间
新 normal 区间
motion
0~10
10~30
0~15
dust
15~55
75~150
15~80
temperature
10~30
10~40
5~35
告警量从 ~10 条/10 秒降到 0,仅模拟器 5% 概率注入的随机异常时偶尔触发,符合预期。
5.10 前端 app.js 语法错误导致页面无数据 Python 脚本写 app.js 时 querySelector 那行的引号转义出了错(写入了 \\"),导致整个 JS 文件解析失败,Socket.IO 代码从未执行,页面始终显示”设备: 0”。修复后加了 Cache-Control: no-cache 头,防止浏览器缓存旧版 JS。
5.11 重复启动导致进程残留 再次执行 start_system.sh 时没先停旧进程,导致所有 Python 组件出现双份实例,Web 服务器因端口 3000 被占报 EADDRINUSE。在 start_system.sh 开头加了一行 bash stop_system.sh,确保每次启动先清理残留。
六、实验总结 本实验在 OrbStack Ubuntu 22.04 ARM64 环境下成功搭建了一个完整的 IoT 边缘计算原型系统,包含:
1 个 MQTT Broker(Mosquitto 2.0.11)
8 个传感器模拟器(6 类,Python)
2 个边缘处理节点(预处理 + 异常检测,Python)
1 个云端聚合服务(Python)
1 个 Web 实时监控界面(Node.js + Socket.IO)
通过本实验深入理解了 MQTT 的 Pub/Sub 模式、Topic 层级设计和边缘计算架构。系统打通了 数据采集 → 边缘预处理 → 异常检测 → 云端聚合 → 可视化展示 的完整链路。
后续可扩展方向
添加 TLS/SSL 加密和用户名密码认证
使用 Docker Compose 容器化部署
接入真实硬件传感器(ESP32/Arduino)
集成时序数据库(InfluxDB)持久化存储
添加 Grafana 可视化大屏
附录:快速参考 常用命令 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ~/mqtt-iot-experiment/start_system.sh ~/mqtt-iot-experiment/stop_system.sh tail -f ~/mqtt-iot-experiment/logs/anomaly_detector.logmosquitto_sub -h localhost -t "sensors/+/+" mosquitto_pub -h localhost -t "sensors/temperature/temp_001" \ -m '{"device_id":"temp_001","sensor_type":"temperature","value":100.0,...}' ps aux | grep -E "device_manager|preprocessor|anomaly|cloud_agg|node index"