EdgeOS 设备级数据推送验证方案
一、核心改进说明
1.1 数据模型升级
修改文件: internal/model/types.go
改进内容:
- DevicePublishConfig 结构体更新:
Strategy字段: 从"periodic" or "cov"改为"realtime" or "periodic"Interval字段: 从0 means use collection interval改为Push interval for periodic mode (e.g., "5s", "1m")
- EdgeOSMQTTConfig.Devices: 从
map[string]bool升级为map[string]DevicePublishConfig- 旧: 设备启用状态 (true/false)
- 新: 完整推送配置 (Enable + Strategy + Interval)
- EdgeOSNATSConfig.Devices: 同上升级
1.2 MQTT 客户端实现升级
修改文件: internal/northbound/edgos_mqtt/client.go
核心改进:
- 设备级数据聚合:
type deviceAggregator struct { points map[string]model.Value // pointID -> Value lastPushTS time.Time pushInterval time.Duration mu sync.RWMutex } - 推送策略支持:
- Realtime (实时模式): 每次数据到达立即推送,但包含设备的所有点
- Periodic (周期模式): 聚合设备所有点,按配置周期批量推送
- 设备级推送:
func (c *Client) publishDeviceData(deviceID string, points map[string]any, quality string, ts time.Time)- 一次推送包含设备的多个点位数据
- 符合协议规范的 JSON 格式
- 周期推送循环:
func (c *Client) periodicPushLoop() func (c *Client) checkAndPushPeriodicDevices()- 每秒检查需要推送的设备
- 确保周期模式下数据按时推送
1.3 NATS 客户端实现升级
修改文件: internal/northbound/edgos_nats/client.go
核心改进:
- 与 MQTT 客户端完全一致的实现
- 支持相同的设备级聚合和推送策略
- 确保两个协议的行为一致
二、JSON 格式验证
2.1 协议要求的格式
根据 EdgeX-EdgeOS通信协议规范(MQTT-NATS).md 第 4.5 节,数据推送格式如下:
{
"header": {
"message_id": "msg-abc123...",
"timestamp": 1713260400000,
"source": "node-001",
"destination": "",
"message_type": "data",
"version": "1.0"
},
"body": {
"node_id": "node-001",
"device_id": "device-001",
"timestamp": 1713260400000,
"points": {
"Temperature": 25.5,
"Humidity": 65.2,
"Pressure": 101325,
"Switch": true
},
"quality": "good"
}
}
2.2 改进前后的对比
改进前 (单点推送):
{
"body": {
"points": {
"Temperature": 25.5 // ❌ 只有一个点
}
}
}
改进后 (设备级推送):
{
"body": {
"points": {
"Temperature": 25.5, // ✅ 多个点
"Humidity": 65.2,
"Pressure": 101325,
"Switch": true
}
}
}
三、配置示例
3.1 EdgeOS(MQTT) 配置示例
edgeos_mqtt:
- id: "mqtt-edgeos-1"
name: "EdgeOS MQTT Channel 1"
enable: true
broker: "tcp://localhost:1883"
client_id: "edgex-node-001"
node_id: "node-001"
username: ""
password: ""
qos: 0
retain: false
clean_session: true
keep_alive: 60
connect_timeout: 30
auto_reconnect: true
max_reconnect_interval: 60
heartbeat_interval: "30s"
devices:
device-001:
enable: true
strategy: "realtime" # 实时推送
interval: "0s"
device-002:
enable: true
strategy: "periodic" # 周期推送
interval: "10s"
device-003:
enable: false # 禁用
strategy: "realtime"
interval: "0s"
3.2 EdgeOS(NATS) 配置示例
edgeos_nats:
- id: "nats-edgeos-1"
name: "EdgeOS NATS Channel 1"
enable: true
url: "nats://localhost:4222"
client_id: "edgex-node-001"
node_id: "node-001"
username: ""
password: ""
token: ""
connect_timeout: 10
reconnect_wait: 2
max_reconnects: 10
ping_interval: 20
max_pings_outstanding: 5
jetstream_enabled: false
heartbeat_interval: "30s"
devices:
device-001:
enable: true
strategy: "realtime"
interval: "0s"
device-002:
enable: true
strategy: "periodic"
interval: "5s"
四、测试方案
4.1 单元测试
测试 1: 实时模式推送验证
测试步骤:
- 配置一个设备使用
strategy: realtime - 采集该设备的多个点位数据
- 验证每次点位更新都触发推送
- 检查 MQTT/NATS 消息中的
points字段包含所有点位
预期结果:
- ✅ 每次点位更新立即推送
- ✅ 消息包含设备的所有点位数据
- ✅ 符合协议 JSON 格式
测试 2: 周期模式推送验证
测试步骤:
- 配置一个设备使用
strategy: periodic和interval: 10s - 快速采集该设备的多个点位数据
- 等待 10 秒
- 验证只推送一次,包含所有点位
预期结果:
- ✅ 点位更新不会立即推送
- ✅ 10 秒后推送一次包含所有点位的消息
- ✅ 后续点位更新继续聚合
- ✅ 下一个周期再次推送
测试 3: 设备启用状态过滤验证
测试步骤:
- 配置三个设备:
- device-001: enable=true, strategy=realtime
- device-002: enable=false, strategy=realtime
- device-003: enable=true, strategy=periodic
- 采集所有三个设备的数据
- 验证推送行为
预期结果:
- ✅ device-001 数据立即推送
- ✅ device-002 数据不推送 (被过滤)
- ✅ device-003 数据按周期推送
4.2 集成测试
测试 4: 协议格式完整性验证
测试工具: MQTT Explorer / NATS Subscriber
测试步骤:
- 订阅主题:
edgex/data/{node_id}/{device_id} - 触发设备数据推送
- 捕获消息并解析 JSON
- 验证字段完整性
检查项:
{
"header": {
"message_id": "必需字段 ✅",
"timestamp": "毫秒时间戳 ✅",
"source": "node_id ✅",
"message_type": "data ✅",
"version": "1.0 ✅"
},
"body": {
"node_id": "节点ID ✅",
"device_id": "设备ID ✅",
"timestamp": "数据时间戳 ✅",
"points": "包含多个点位 ✅",
"quality": "good/bad ✅"
}
}
测试 5: 前后端通信验证
测试步骤:
- 通过 UI 配置 EdgeOS(MQTT) 和 EdgeOS(NATS) 通道
- 启用多个设备并设置不同策略
- 触发数据采集
- 在前端查看统计数据
验证项:
- ✅ 前端正确显示连接状态
- ✅ 统计数据 (success_count, fail_count, publish_count) 实时更新
- ✅ 设备配置正确保存和加载
4.3 性能测试
测试 6: 高频数据推送验证
测试场景:
- 设备数量: 10 个
- 每设备点位: 50 个
- 采集频率: 1 秒
测试步骤:
- 配置 5 个设备使用 realtime 模式
- 配置 5 个设备使用 periodic (interval: 5s) 模式
- 运行 1 分钟
- 统计推送次数
预期结果:
- ✅ Realtime 设备推送约 300 次 (10 设备 × 50 点 × 60 秒 / 10 realtime 设备)
- ✅ Periodic 设备推送约 60 次 (5 设备 × 12 周期 / 60 秒)
- ✅ 无推送失败
- ✅ CPU/内存使用正常
五、验证检查清单
5.1 代码检查
- DevicePublishConfig 结构体更新完成
- EdgeOSMQTTConfig.Devices 类型更新为 map[string]DevicePublishConfig
- EdgeOSNATSConfig.Devices 类型更新为 map[string]DevicePublishConfig
- MQTT 客户端添加 deviceAggregator 结构体
- MQTT 客户端实现 aggregatePoint 方法
- MQTT 客户端实现 periodicPushLoop 方法
- MQTT 客户端实现 publishDeviceData 方法
- NATS 客户端添加 deviceAggregator 结构体
- NATS 客户端实现相同的聚合和推送逻辑
- PublishDeviceStatus 方法更新设备启用状态检查
5.2 功能检查
- 实时模式: 单点更新立即推送设备所有数据
- 周期模式: 点位更新聚合后按周期推送
- 设备过滤: 未启用设备数据不推送
- JSON 格式: 消息包含完整的 header 和 body
- 设备级聚合: points 字段包含多个点位
- 时间戳正确: body.timestamp 使用数据的最新时间
- 质量字段: quality 正确传递
5.3 测试检查
- 单元测试: 实时模式验证通过
- 单元测试: 周期模式验证通过
- 单元测试: 设备过滤验证通过
- 集成测试: 协议格式验证通过
- 集成测试: 前后端通信验证通过
- 性能测试: 高频数据推送验证通过
六、部署前验证
6.1 编译检查
# 编译检查
go build -o gateway.exe cmd/gateway/main.go
# 运行单元测试
go test ./internal/northbound/edgos_mqtt/...
go test ./internal/northbound/edgos_nats/...
6.2 配置迁移
重要提示: 配置格式发生变化,需要迁移现有配置
旧格式:
devices:
device-001: true
device-002: false
新格式:
devices:
device-001:
enable: true
strategy: "realtime"
interval: "0s"
device-002:
enable: false
strategy: "realtime"
interval: "0s"
6.3 回滚计划
如果出现问题:
- 恢复旧代码版本
- 使用旧配置格式
- 重启服务
七、总结
核心改进:
- ✅ 设备级数据推送: 从单点推送升级为设备级推送
- ✅ 推送策略支持: 支持 realtime 和 periodic 两种模式
- ✅ 协议格式符合: JSON 格式完全符合 EdgeX-EdgeOS 通信协议规范
- ✅ UI 配置联动: 设备启用状态和推送周期设置生效
技术亮点:
- 使用设备聚合器管理周期模式数据
- 读写锁确保并发安全
- 高效的定时检查机制
- 支持两种协议的统一实现
下一步:
- 执行测试验证清单
- 性能测试和优化
- UI 界面更新以支持新配置字段
- 用户文档更新