MQTT协议应用实验

MQTT 协议与边缘设备实验报告

实验日期:2026-06-09
实验环境:macOS + OrbStack Ubuntu 22.04 ARM64
实验目的:搭建完整 IoT 原型系统,验证 MQTT 协议的发布/订阅模式在边缘计算场景中的应用


一、实验概述

1.1 实验目标

在 Ubuntu 虚拟环境中搭建一个端到端的物联网(IoT)边缘计算原型系统,通过模拟传感器设备、边缘处理节点、云端聚合服务和 Web 监控界面,深入理解 MQTT 的发布/订阅(Pub/Sub)模式在边缘计算中的实际应用。

1.2 系统架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
graph LR
subgraph 设备层
T["🌡️ 温度传感器 ×2"]
H["💧 湿度传感器 ×2"]
L["💡 光照传感器"]
P["🔵 气压传感器"]
M["🚶 运动传感器"]
D["🌫️ 烟尘传感器"]
end

subgraph 边缘层
PRE["数据预处理节点"]
ABN["异常检测节点"]
end

subgraph 云端层
CLD["云端聚合服务"]
end

subgraph 展示层
WEB["Web 监控界面"]
end

T & H & L & P & M & D -->|sensors/+/+| MQTT{MQTT Broker<br/>Mosquitto :1883}
MQTT -->|sensors/+/+| PRE
PRE -->|processed/+/+| MQTT
MQTT -->|processed/+/+| ABN
ABN -->|alerts/+/+| MQTT
MQTT -->|processed/+/+<br/>alerts/+/+| CLD
CLD -->|cloud/aggregated/+| MQTT
MQTT -->|all topics| WEB

1.3 组件清单

组件 数量 技术栈 说明
MQTT Broker 1 Mosquitto 2.0.11 MQTT 消息中间件
设备模拟器 8 个 (6 类) Python 3 + paho-mqtt 模拟传感器数据上报
数据预处理 1 Python 3 数据标准化 + 分类
异常检测 1 Python 3 + numpy 阈值/统计/趋势三重检测
云端聚合 1 Python 3 + numpy 时间窗口聚合统计
Web 界面 1 Node.js + Socket.IO 实时监控仪表盘

1.4 MQTT 主题设计

主题模式 方向 说明
sensors/{type}/{id} 设备 → Broker 原始传感器数据
processed/{type}/{id} 预处理 → Broker 标准化 + 分类数据
alerts/{type}/{id} 异常检测 → Broker 告警消息
cloud/aggregated/{type} 聚合 → Broker 统计聚合结果

二、实验环境

2.1 宿主机环境

项目 配置
操作系统 macOS (Apple Silicon)
虚拟化平台 OrbStack
终端 iTerm2 / zsh

2.2 虚拟机环境

项目 配置
操作系统 Ubuntu 22.04.5 LTS (ARM64)
内核版本 7.0.5-orbstack
磁盘空间 722 GB(可用 721 GB)
Python 3.10.12
Node.js v18.20.8
npm 10.8.2
Mosquitto 2.0.11
paho-mqtt 2.1.0
numpy 2.2.6

2.3 OrbStack 环境说明

本实验使用 OrbStack 替代传统 VMware,无需手动配置 CPU/内存/网络。以下命令均在 Ubuntu VM 内直接执行。Web 界面通过 OrbStack 端口转发在 macOS 浏览器访问 http://localhost:3000

1
2
# 登录 Ubuntu VM(实验命令在 VM 内直接执行)
ssh ubuntu@orb

三、实验步骤

第一部分:环境准备

1.1 系统更新与基础包安装

1
2
sudo apt update && sudo apt upgrade -y
sudo apt install -y curl wget git python3 python3-pip python3-venv

1.2 安装 Node.js 18

Ubuntu 22.04 默认源的 Node.js 版本过旧(v12.x),后续 Web 服务需要 v18+。

1
2
3
4
5
6
7
# 通过 NodeSource 官方源安装
curl -fsSL https://deb.nodesource.com/setup_18.x | sudo -E bash -
sudo apt install -y nodejs

# 验证版本
node -v # v18.20.8
npm -v # 10.8.2

1.3 安装 Mosquitto MQTT Broker

1
2
3
4
5
6
7
8
9
sudo apt install -y mosquitto mosquitto-clients

# 启动并设置开机自启
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

# 验证
sudo systemctl status mosquitto
# Active: active (running)

第二部分:MQTT Broker 安装与测试

2.1 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sudo tee /etc/mosquitto/conf.d/default.conf << EOF
# 允许匿名连接(实验用)
allow_anonymous true

# 监听端口
listener 1883 0.0.0.0

# 日志配置
log_type error
log_type warning
log_type notice
log_type information
EOF

sudo systemctl restart mosquitto

2.2 连通性验证

1
2
3
4
5
6
7
# 终端1:订阅
mosquitto_sub -h localhost -t test/topic

# 终端2:发布
mosquitto_pub -h localhost -t test/topic -m "Hello MQTT - 测试成功!"

# 终端1 应收到:Hello MQTT - 测试成功!

第三部分:项目结构

1
mkdir -p ~/mqtt-iot-experiment/{iot-devices,edge-nodes,cloud-service,web-interface,logs,config,data}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
~/mqtt-iot-experiment/
├── iot-devices/ # IoT 设备模拟器
│ ├── temperature_sensor.py
│ ├── humidity_sensor.py
│ ├── light_sensor.py
│ ├── pressure_sensor.py
│ ├── motion_sensor.py
│ ├── dust_sensor.py
│ └── device_manager.py
├── edge-nodes/ # 边缘处理节点
│ ├── data_preprocessor.py
│ └── anomaly_detector.py
├── cloud-service/ # 云端聚合服务
│ └── cloud_aggregator.py
├── web-interface/ # Web 监控界面
│ ├── index.js
│ ├── package.json
│ └── public/
│ ├── index.html
│ ├── style.css
│ └── app.js
├── logs/ # 运行日志
├── config/ # 配置文件
├── data/ # 数据存储
├── start_system.sh # 一键启动
└── stop_system.sh # 一键停止

第四部分:IoT 设备模拟器开发

4.1 安装 Python 依赖

1
2
pip3 install --user paho-mqtt numpy
# paho-mqtt 2.1.0, numpy 2.2.6

4.2 传感器模拟器设计

每个传感器遵循统一的设计模式:

  • __init__() — 初始化 MQTT 客户端、主题、设备 ID
  • connect() — 连接 Broker
  • generate_data() — 生成模拟数据(基础值 + 随机波动 + 5% 异常概率)
  • run() — 循环发送数据
  • stop() — 断开连接

6 类传感器参数:

传感器 基础值 波动范围 5%异常范围 发送间隔 单位
温度 20°C ±10°C +15~25°C 2s celsius
湿度 50% ±20% 90~100% 3s percent
光照 500 lux ±300 lux 1200~2000 lux 2.5s lux
气压 1013.25 hPa ±10 hPa 950~980 hPa 3.5s hPa
运动 0~10 次 - 20~50 次 2s count
烟尘 35 µg/m³ ±20 150~500 3s µg/m³

4.3 温度传感器(示例)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import json
import random
import time
import datetime

class TemperatureSensor:
def __init__(self, device_id, broker_host="localhost", broker_port=1883):
self.device_id = device_id
self.broker_host = broker_host
self.broker_port = broker_port
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
self.topic = f"sensors/temperature/{device_id}"
self.running = False

def connect(self):
try:
self.client.connect(self.broker_host, self.broker_port, 60)
print(f"[温度传感器 {self.device_id}] 已连接到 MQTT Broker")
return True
except Exception as e:
print(f"[温度传感器 {self.device_id}] 连接失败: {e}")
return False

def generate_temperature_data(self):
base_temp = 20.0
variation = random.uniform(-10, 10)
temperature = round(base_temp + variation, 2)
# 5% 概率产生异常数据
if random.random() < 0.05:
temperature += random.uniform(15, 25)
return {
"device_id": self.device_id,
"sensor_type": "temperature",
"value": temperature,
"unit": "celsius",
"timestamp": datetime.datetime.now().isoformat(),
"location": {"lat": 39.9042, "lon": 116.4074}
}

def run(self):
self.running = True
while self.running:
try:
data = self.generate_temperature_data()
self.client.publish(self.topic, json.dumps(data))
time.sleep(2)
except Exception as e:
print(f"[温度传感器 {self.device_id}] 发送失败: {e}")
time.sleep(5)

def stop(self):
self.running = False
self.client.disconnect()

其余 5 类传感器(湿度、光照、气压、运动、烟尘)结构相同,仅在数据生成逻辑上有差异。完整代码见项目目录 ~/mqtt-iot-experiment/iot-devices/

4.4 设备管理器

device_manager.py 负责统一启动和管理所有传感器实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#!/usr/bin/env python3
import threading
import time
import signal
import sys
from temperature_sensor import TemperatureSensor
from humidity_sensor import HumiditySensor
from light_sensor import LightSensor
from pressure_sensor import PressureSensor
from motion_sensor import MotionSensor
from dust_sensor import DustSensor

devices = []
threads = []

def create_sensors():
sensors = [
TemperatureSensor("temp_001"), TemperatureSensor("temp_002"),
HumiditySensor("humid_001"), HumiditySensor("humid_002"),
LightSensor("light_001"),
PressureSensor("press_001"),
MotionSensor("motion_001"),
DustSensor("dust_001"),
]
for sensor in sensors:
if sensor.connect():
devices.append(sensor)
print(f"共启动 {len(devices)} 个传感器设备")

def start_sensors():
for device in devices:
t = threading.Thread(target=device.run, daemon=True)
t.start()
threads.append(t)
print("所有传感器已开始发送数据")

def stop_sensors(signum=None, frame=None):
print("\n正在停止所有传感器...")
for device in devices:
device.stop()
sys.exit(0)

if __name__ == "__main__":
signal.signal(signal.SIGINT, stop_sensors)
signal.signal(signal.SIGTERM, stop_sensors)
create_sensors()
start_sensors()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop_sensors()

第五部分:边缘处理节点开发

5.1 数据预处理节点

data_preprocessor.py 订阅 sensors/+/+,对每个传感器的原始数据执行:

  1. 归一化(Normalize):将不同量纲数据映射到 [0, 1] 区间
  2. 状态分类(Classify):基于阈值分为 5 个等级
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def normalize(self, value, sensor_type):
"""各传感器数值归一化到 [0,1]"""
ranges = {
"temperature": (0, 50), "humidity": (0, 100),
"light": (0, 2000), "pressure": (950, 1050),
"motion": (0, 50), "dust": (0, 500),
}
lo, hi = ranges.get(sensor_type, (0, 100))
return (value - lo) / (hi - lo) if hi != lo else 0.0

def classify(self, value, sensor_type):
"""基于阈值五级分类"""
thresholds = {
"temperature": (-10, 10, 40, 80), # 低温/正常/高温/超高温
"humidity": (0, 30, 70, 100), # 过干/偏干/正常/偏湿/过湿
"light": (0, 300, 1000, 2000), # 暗/较暗/正常/亮/过亮
"pressure": (900, 980, 1030, 1100), # 低压/偏低/正常/偏高/高压
"motion": (0, 10, 30, 50), # 静止/少量/正常/频繁/异常
"dust": (0, 75, 150, 500), # 优/良/轻度/中度/重度
}
t = thresholds.get(sensor_type)
if t is None: return "normal"
if value < t[0]: return "critical_low"
elif value < t[1]: return "low"
elif value < t[2]: return "normal"
elif value < t[3]: return "high"
else: return "critical_high"

处理后的数据发布到 processed/{type}/{id},增加了 normalizedprocessed_timestampstatus 字段。

5.2 异常检测节点

anomaly_detector.py 订阅 processed/+/+,实现三重异常检测:

检测类型 方法 触发条件
阈值违规 直接判断 status 为 high / critical
统计异常 滑动窗口 Z-score `
趋势异常 线性回归斜率 `

告警消息发布到 alerts/{type}/{id},包含告警类型、严重级别、设备信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 核心检测逻辑
class AnomalyDetector:
def __init__(self):
self.history = defaultdict(list) # 滑动窗口
self.window_size = 20

def on_message(self, client, userdata, msg):
data = json.loads(msg.payload.decode())
key = f"{data['sensor_type']}/{data['device_id']}"
self.history[key].append(data['normalized'])
if len(self.history[key]) > self.window_size:
self.history[key].pop(0)

alerts = []
# 1. 阈值检测
if data['status'] in ("critical_high", "critical_low"):
alerts.append({"type": "threshold_violation", "severity": "critical", ...})
elif data['status'] in ("high", "low"):
alerts.append({"type": "threshold_violation", "severity": "warning", ...})

# 2. 统计异常
arr = np.array(self.history[key])
mean, std = np.mean(arr), np.std(arr)
if std > 0 and abs(data['normalized'] - mean) > 2 * std:
alerts.append({"type": "statistical_outlier", "severity": "warning", ...})

# 3. 趋势异常
recent = np.array(self.history[key][-5:])
trend = np.polyfit(range(len(recent)), recent, 1)[0]
if abs(trend) > 0.1:
alerts.append({"type": "trend_anomaly", "severity": "warning", ...})

第六部分:云端聚合服务开发

cloud_aggregator.py 订阅 processed/+/+alerts/+/+,每 10 秒进行一次时间窗口聚合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class CloudAggregator:
def __init__(self):
self.data_buffer = defaultdict(list)
self.aggregation_window = 10 # 秒

def aggregate_and_publish(self):
for sensor_type, records in self.data_buffer.items():
values = [r["value"] for r in records]
aggregated = {
"sensor_type": sensor_type,
"count": int(len(records)),
"device_count": int(len(set(r["device_id"] for r in records))),
"avg": float(round(np.mean(values), 2)),
"max": float(round(np.max(values), 2)),
"min": float(round(np.min(values), 2)),
"std": float(round(np.std(values), 2)),
"avg_normalized": float(round(np.mean([r["normalized"] for r in records]), 4)),
"timestamp": datetime.datetime.now().isoformat(),
"window_seconds": int(self.aggregation_window)
}
self.client.publish(f"cloud/aggregated/{sensor_type}", json.dumps(aggregated))
self.data_buffer.clear()

numpy 的 int64/float64 类型不能直接 json.dumps(),需要用 int()/float() 显式转换。


第七部分:Web 监控界面开发

7.1 技术栈

层级 技术
后端 Node.js + Express + Socket.IO
MQTT 客户端 mqtt.js
前端 原生 HTML/CSS/JavaScript
实时通信 WebSocket (Socket.IO)

7.2 服务端 index.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
const express = require("express");
const http = require("http");
const { Server } = require("socket.io");
const mqtt = require("mqtt");
const path = require("path");

const app = express();
const server = http.createServer(app);
const io = new Server(server);

const mqttClient = mqtt.connect("mqtt://127.0.0.1:1883");

app.use(express.static(path.join(__dirname, "public")));

mqttClient.on("connect", () => {
mqttClient.subscribe("sensors/+/+");
mqttClient.subscribe("alerts/+/+");
mqttClient.subscribe("cloud/aggregated/+");
});

mqttClient.on("message", (topic, message) => {
const data = JSON.parse(message.toString());
const parts = topic.split("/");
if (parts[0] === "sensors")
io.emit("sensorData", { sensorType: parts[1], deviceId: parts[2], data });
else if (parts[0] === "alerts")
io.emit("newAlert", data);
else if (parts[0] === "cloud")
io.emit("aggregatedData", data);
});

io.on("connection", (socket) => {
console.log("客户端已连接:", socket.id);
});

server.listen(3000, "0.0.0.0", () => {
console.log("监控界面已启动: http://localhost:3000");
});

7.3 前端设计

  • 6 张传感器卡片:网格布局,实时显示各设备数值和状态颜色(绿色=正常、橙色=警告、红色=临界,红色卡片带脉冲动画)
  • 警报面板:最近 10 条告警,按严重级别着色
  • 聚合面板:最近 8 条云端聚合统计
  • 状态栏:连接状态、设备总数、警报计数
  • 超时检测:30 秒无更新自动移除离线设备

第八部分:系统启停脚本

start_system.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/bin/bash
BASE_DIR="$HOME/mqtt-iot-experiment"
cd "$BASE_DIR"
mkdir -p logs
export PYTHONUNBUFFERED=1
export PYTHONPATH="$BASE_DIR/iot-devices:$BASE_DIR/edge-nodes:$BASE_DIR/cloud-service"

# 按依赖顺序启动 5 个组件
cd "$BASE_DIR/iot-devices"
python3 device_manager.py > "$BASE_DIR/logs/devices.log" 2>&1 &
DEVICES_PID=$!

cd "$BASE_DIR/edge-nodes"
python3 data_preprocessor.py > "$BASE_DIR/logs/preprocessor.log" 2>&1 &
PREPROCESSOR_PID=$!
python3 anomaly_detector.py > "$BASE_DIR/logs/anomaly_detector.log" 2>&1 &
DETECTOR_PID=$!

cd "$BASE_DIR/cloud-service"
python3 cloud_aggregator.py > "$BASE_DIR/logs/cloud_service.log" 2>&1 &
CLOUD_PID=$!

cd "$BASE_DIR/web-interface"
npm start > "$BASE_DIR/logs/web_server.log" 2>&1 &
WEB_PID=$!

echo "$DEVICES_PID $PREPROCESSOR_PID $DETECTOR_PID $CLOUD_PID $WEB_PID" \
> "$BASE_DIR/system_pids.txt"

📌 关键配置:PYTHONUNBUFFERED=1 确保日志实时输出;PYTHONPATH 确保跨目录模块导入。

stop_system.sh

1
2
3
4
5
6
7
8
9
#!/bin/bash
if [ -f system_pids.txt ]; then
for PID in $(cat system_pids.txt); do
kill -0 $PID 2>/dev/null && kill $PID
done
rm system_pids.txt
else
pkill -f "device_manager.py|data_preprocessor.py|anomaly_detector.py|cloud_aggregator.py|node index.js"
fi

第九部分:运行验证

9.1 启动系统

1
~/mqtt-iot-experiment/start_system.sh

输出:

1
2
3
4
5
6
7
8
9
10
11
12
==============================
IoT 实验系统启动中...
==============================
[1/5] 启动设备模拟器... -> PID: 5586
[2/5] 启动数据预处理节点... -> PID: 5587
[3/5] 启动异常检测节点... -> PID: 5588
[4/5] 启动云端聚合服务... -> PID: 5589
[5/5] 启动 Web 监控界面... -> PID: 5590
==============================
系统启动完成!
访问 http://localhost:3000 查看监控界面
==============================

9.2 访问 Web 监控界面

打开浏览器访问 http://localhost:3000

9.3 监控 MQTT 消息

1
2
3
4
5
6
7
8
# 监控所有传感器原始数据
mosquitto_sub -h localhost -t "sensors/+/+"

# 监控所有告警
mosquitto_sub -h localhost -t "alerts/+/+"

# 监控聚合数据
mosquitto_sub -h localhost -t "cloud/aggregated/+"

9.4 发送异常数据测试

手动发布一条 100°C 的温度数据来触发异常检测:

1
2
3
4
5
6
7
8
mosquitto_pub -h localhost -t "sensors/temperature/temp_001" -m '{
"device_id": "temp_001",
"sensor_type": "temperature",
"value": 100.0,
"unit": "celsius",
"timestamp": "'$(date -Iseconds)'",
"location": {"lat": 39.9042, "lon": 116.4074}
}'

异常检测日志输出:

1
2
3
[异常检测] ALERT: [warning] temperature/temp_001 状态警告: high, 数值: 100.0
[异常检测] ALERT: [warning] temperature/temp_001 统计异常: 值=2.000, 均值=0.524, 标准差=0.363
[异常检测] ALERT: [warning] temperature/temp_001 趋势异常: 斜率=0.326

→ 一条异常数据触发了 3 种告警:阈值违规 + 统计离群 + 趋势异常。

MQTT 告警消息示例:

1
2
3
4
5
6
7
8
9
{
"type": "threshold_violation",
"severity": "warning",
"message": "temperature/temp_001 状态警告: high, 数值: 100.0",
"device_id": "temp_001",
"sensor_type": "temperature",
"raw_value": 100.0,
"timestamp": "2026-06-09T15:15:XX.XXXXXX"
}

9.5 停止系统

1
~/mqtt-iot-experiment/stop_system.sh

四、实验结果与分析

4.1 系统运行状态

指标 数值
运行进程数 6(设备管理器 + 预处理 + 异常检测 + 云端 + Node.js × 2)
模拟设备数 8 个(2×温度 + 2×湿度 + 光照 + 气压 + 运动 + 烟尘)
数据发送频率 2~3.5 秒/设备
聚合窗口 10 秒
Web 延迟 实时(WebSocket 推送)

4.2 MQTT 协议特性验证

实验验证了 MQTT 的以下核心特性:

  1. 发布/订阅解耦 —— 传感器、边缘节点、云端服务之间只通过 Topic 通信,不感知对方存在。新增组件只需订阅对应 Topic 即可接入。

  2. 主题层级化 —— sensors/{type}/{id} 的层级设计支持通配符:sensors/+/+ 订阅全部传感器,sensors/temperature/+ 只订阅温度。

  3. 一对多消息分发 —— 一条传感器数据同时送到预处理节点、云端服务和 Web 界面三个消费者,体现了 MQTT 的广播能力。

  4. 轻量级协议 —— ARM64 虚拟机上 6 个 Python 进程 + 1 个 Node.js 进程总内存约 200MB,适合资源受限的边缘设备。

4.3 边缘计算架构优势

优势 体现
就近处理 预处理和异常检测在边缘节点完成,降低云端负载
实时响应 异常检测延迟 < 100ms(本地 Broker),满足工业场景需求
分级告警 5 级状态分类 + 3 种检测维度,降低误报率
容错性 单传感器故障不影响其他设备,组件独立运行

4.4 MQTT 协议要点(计算机网络视角)

协议栈位置

MQTT 工作在 TCP/IP 协议栈的应用层,下层依赖 TCP(端口 1883/8883)。因为跑在 TCP 之上,MQTT 天然继承了可靠传输、拥塞控制和有序交付,不用自己在应用层做确认和重传。MQTT 本身只关心 pub/sub 语义和主题匹配。

1
2
3
4
5
6
7
┌─────────────────────┐
│ MQTT (应用层) │ PUBLISH / SUBSCRIBE / CONNECT
├─────────────────────┤
│ TCP (传输层) │ 可靠字节流, 端口 1883
├─────────────────────┤
│ IP (网络层) │ 路由与寻址
└─────────────────────┘

报文格式与最小开销

MQTT 固定头只有 2 字节:第一个字节包含消息类型(4 bit)和标志位(4 bit),第二个字节是剩余长度(变长编码,1-4 字节)。相比之下 HTTP/1.1 请求头动辄几百字节。这个 2 字节底开销是 MQTT 能跑在低带宽、高延迟物联网网络里的关键。

1
2
3
4
5
6
7
8
 0      1      2      3      4      5      6      7
┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐
│ 消息类型 (4bit) │ DUP │ QoS │ RETAIN │
├──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┤
│ 剩余长度 (变长 1-4 字节) │
├──────────────────────────────────────────────────────┤
│ 可变头 + 载荷(取决于消息类型) │
└──────────────────────────────────────────────────────┘

QoS 三级递送保证

MQTT 定义了三种服务质量,本质是在 TCP 之上额外加了应用层确认:

QoS 名称 行为 开销
0 至多一次 发完即忘,不重试,不确认 最小,消息可能丢
1 至少一次 接收方回 PUBACK;发送方没收到 ACK 就重发 有重传开销,可能重复
2 恰好一次 四步握手(PUBLISH → PUBREC → PUBREL → PUBCOMP) 最大,保证不丢不重

QoS 0 适合高频传感器数据(丢一两条无所谓)。QoS 1 适合告警和控制指令。QoS 2 开销最大,一般是关键计费或配置下发场景才用。本实验中传感器上报用默认 QoS 0,告警消息适用 QoS 1。

会话与遗嘱机制

客户端连接时可以设置 cleanSession 标志:true 表示断开后 Broker 清掉所有订阅和未消费消息;false 表示 Broker 保留会话,重连后自动恢复订阅,并把离线期间积攒的消息推过来。

遗嘱消息(Last Will)是客户端在 CONNECT 时就预设好的一条消息,如果客户端非正常断开(keep-alive 超时),Broker 会自动替它发布这条遗嘱,通知其他订阅者”这个设备下线了”。本实验是本地原型系统没开这个特性,但真实部署里遗嘱消息是设备在线状态监控的标配手段。

与 HTTP/CoAP 的对比

MQTT HTTP/1.1 CoAP
传输层 TCP TCP UDP
通信模式 Pub/Sub(推送) Req/Resp(拉取) Req/Resp(类 REST)
头开销 ≥2 字节 数百字节 ≥4 字节
适用场景 传感器上报、推送通知 Web API、文件传输 受限节点、6LoWPAN
实时性 天然推送,毫秒级 需轮询或长轮询 支持 Observe 模式

MQTT 的中心化 Broker 模型和 HTTP 的端到端模型各有利弊:MQTT 的生产者和消费者完全解耦,扩展灵活,但 Broker 是单点瓶颈;HTTP 没有中心节点但需要客户端知道服务器地址且主动发起请求。

Topic 匹配与通配符

MQTT 主题是 UTF-8 字符串,用 / 分层。Broker 内部用 前缀树(Trie) 做主题匹配,支持两种通配符:

  • + 匹配单层,如 sensors/+/temp_001 匹配 sensors/temperature/temp_001
  • # 匹配多层(只能放末尾),如 sensors/# 匹配所有传感器消息

匹配过程是 O(主题深度) 的 Trie 遍历,所以即使订阅量很大,单条消息的分发速度也非常快。本实验中 Web 界面用 sensors/+/+ 一条订阅就覆盖了全部 8 个设备的原始数据。


五、踩坑记录与问题解决

5.1 paho-mqtt 2.x API 变更

paho-mqtt 2.x 中 mqtt.Client() 必须传入 CallbackAPIVersion,教程代码基于 1.x 会直接报错。

解决:

1
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

5.2 Python 日志缓冲

后台运行的 Python 进程日志文件为空——stdout 默认带缓冲,nohup 重定向时不会立即刷盘。

启动脚本里加 export PYTHONUNBUFFERED=1 即可关闭缓冲。

5.3 Python 模块导入路径

device_manager.pyfrom temperature_sensor import TemperatureSensorModuleNotFoundError

启动脚本加 export PYTHONPATH 把所有子目录纳入搜索路径。

5.4 f-string 中 dict key 引用

f-string 里引用 dict key 忘加引号:normalized[normalized] 被当成用 dict 做 key,报 unhashable type: 'dict'

解决:

1
2
3
4
# 错误
f"norm={normalized[normalized]:.2f}"
# 正确
f"norm={normalized['normalized']:.2f}"

5.5 numpy 类型 JSON 序列化

numpy 的 int64 / float64 不能直接 json.dumps(),报 Object of type int64 is not JSON serializable

解决:

1
2
3
4
# numpy int64 -> Python int
"count": int(len(records))
# numpy float64 -> Python float
"avg": float(round(np.mean(values), 2))

5.6 Node.js MQTT 客户端 localhost 解析为 IPv6

Web 界面显示”设备: 0”,没有任何传感器数据。排查发现 Web 服务器的 MQTT 客户端连不上 Broker,报 ECONNREFUSED ::1:1883。mqtt v5.x 把 localhost 解析成了 IPv6 ::1,但 Mosquitto 只配了 listener 1883 0.0.0.0(仅 IPv4),连接直接被拒。

index.js 里的 mqtt://localhost:1883 改成 mqtt://127.0.0.1:1883,强制走 IPv4 就行。

1
2
3
4
// 修改前
const MQTT_BROKER = "mqtt://localhost:1883";
// 修改后
const MQTT_BROKER = "mqtt://127.0.0.1:1883";

Python 的 paho-mqtt 不受影响,它底层会自动回退 IPv4。

5.7 聚合面板重复堆积

云端聚合每 10 秒为每种传感器类型发一次统计,前端 displayAggregation 每次都创建新的 DOM 节点,同一类型很快堆出多条重复条目,面板不断膨胀。

displayAggregation 加了去重逻辑:先 querySelector 查找同 data-sensor-type 属性的旧条目并删掉,再插入新的。每类传感器始终只保留最新一条。

1
2
3
4
5
6
7
8
9
function displayAggregation(data) {
var container = document.getElementById("aggregationData");
// 去重:移除同 sensor_type 的旧条目
var existing = container.querySelector(
'.agg-item[data-sensor-type="' + data.sensor_type + '"]'
);
if (existing) existing.remove();
// ... 插入新条目,上限 8 条 ...
}

5.8 异常检测告警洪水

motion 传感器 70% 时间输出 0,dust 传感器值多在 30-50 之间,而原分类阈值把 motion<10 判为”low”、dust<75 判为”low”。结果是几乎每个数据点都触发告警,每 10 秒涌出 ~10 条。

两步修复:(1) 后端 anomaly_detector.py(device_id, alert_type, severity) 做 60 秒冷却,同类同级别告警在冷却期内直接跳过;(2) 前端 app.jssensor_type/device_id/type 对告警去重,重复的只更新计数和时间戳,显示时追加 (×N)alertCount 也改为唯一告警数。

5.9 分类阈值与模拟器值域不匹配

上面 5.8 的根因是 data_preprocessor.pyclassify() 阈值和模拟器实际生成的值域对不上。调整后:

传感器 模拟值域 旧 normal 区间 新 normal 区间
motion 0~10 10~30 0~15
dust 15~55 75~150 15~80
temperature 10~30 10~40 5~35

告警量从 ~10 条/10 秒降到 0,仅模拟器 5% 概率注入的随机异常时偶尔触发,符合预期。

5.10 前端 app.js 语法错误导致页面无数据

Python 脚本写 app.jsquerySelector 那行的引号转义出了错(写入了 \\"),导致整个 JS 文件解析失败,Socket.IO 代码从未执行,页面始终显示”设备: 0”。修复后加了 Cache-Control: no-cache 头,防止浏览器缓存旧版 JS。

5.11 重复启动导致进程残留

再次执行 start_system.sh 时没先停旧进程,导致所有 Python 组件出现双份实例,Web 服务器因端口 3000 被占报 EADDRINUSE。在 start_system.sh 开头加了一行 bash stop_system.sh,确保每次启动先清理残留。


六、实验总结

本实验在 OrbStack Ubuntu 22.04 ARM64 环境下成功搭建了一个完整的 IoT 边缘计算原型系统,包含:

  • 1 个 MQTT Broker(Mosquitto 2.0.11)
  • 8 个传感器模拟器(6 类,Python)
  • 2 个边缘处理节点(预处理 + 异常检测,Python)
  • 1 个云端聚合服务(Python)
  • 1 个 Web 实时监控界面(Node.js + Socket.IO)

通过本实验深入理解了 MQTT 的 Pub/Sub 模式、Topic 层级设计和边缘计算架构。系统打通了 数据采集 → 边缘预处理 → 异常检测 → 云端聚合 → 可视化展示 的完整链路。

后续可扩展方向

  • 添加 TLS/SSL 加密和用户名密码认证
  • 使用 Docker Compose 容器化部署
  • 接入真实硬件传感器(ESP32/Arduino)
  • 集成时序数据库(InfluxDB)持久化存储
  • 添加 Grafana 可视化大屏

附录:快速参考

常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 启动 / 停止系统
~/mqtt-iot-experiment/start_system.sh
~/mqtt-iot-experiment/stop_system.sh

# 查看日志
tail -f ~/mqtt-iot-experiment/logs/anomaly_detector.log

# 监控 MQTT 实时消息
mosquitto_sub -h localhost -t "sensors/+/+"

# 手动发布异常数据(触发告警)
mosquitto_pub -h localhost -t "sensors/temperature/temp_001" \
-m '{"device_id":"temp_001","sensor_type":"temperature","value":100.0,...}'

# 查看运行中的进程
ps aux | grep -E "device_manager|preprocessor|anomaly|cloud_agg|node index"