edgeOS 北向通道实现文档
概述
本实现为 EdgeX 边缘采集网关添加了两个新的北向通道,用于与 edgeOS 蜂群网络进行通信:
- edgeOS(MQTT): 基于 MQTT 协议的北向通道
- edgeOS(NATS): 基于 NATS 协议的北向通道
架构设计
目录结构
internal/northbound/
├── edgos_mqtt/
│ └── client.go # edgeOS(MQTT) 客户端实现
├── edgos_nats/
│ └── client.go # edgeOS(NATS) 客户端实现
internal/core/
├── northbound_manager.go # 北向管理器(已更新)
└── northbound_manager_edgos.go # edgeOS 配置管理扩展
internal/model/
└── types.go # 数据模型(已更新)
internal/server/
└── server_edgos.go # edgeOS API 端点
ui/src/components/northbound/
├── NorthboundEdgeOSMqtt.vue # edgeOS(MQTT) 卡片组件
├── NorthboundEdgeOSNats.vue # edgeOS(NATS) 卡片组件
├── EdgeOSMQTTSettingsDialog.vue # edgeOS(MQTT) 配置对话框
├── EdgeOSNATSSettingsDialog.vue # edgeOS(NATS) 配置对话框
└── EdgeOSHelpDialog.vue # edgeOS 帮助文档
核心功能
1. edgeOS(MQTT) 客户端
主要功能:
- MQTT 3.1.1 协议支持
- 自动重连机制
- 节点注册与心跳
- 实时数据上报
- 设备状态上报
- 命令接收与响应:
- 设备发现命令
- 写入命令
- 任务控制命令
配置参数:
type EdgeOSMQTTConfig struct {
ID string `json:"id"`
Name string `json:"name"`
Enable bool `json:"enable"`
Broker string `json:"broker"` // tcp://127.0.0.1:1883
ClientID string `json:"client_id"`
NodeID string `json:"node_id"`
Username string `json:"username"`
Password string `json:"password"`
QoS byte `json:"qos"` // 0/1/2
Retain bool `json:"retain"`
KeepAlive int `json:"keep_alive"`
AutoReconnect bool `json:"auto_reconnect"`
HeartbeatInterval string `json:"heartbeat_interval"`
Devices map[string]bool `json:"devices"`
}
消息主题:
- 上行(EdgeX → EdgeOS):
edgex/nodes/register- 节点注册edgex/data/{node_id}/{device_id}- 实时数据edgex/nodes/{node_id}/heartbeat- 心跳edgex/nodes/{node_id}/devices/{device_id}/status- 设备状态
- 下行(EdgeOS → EdgeX):
edgex/cmd/{node_id}/discover- 设备发现edgex/cmd/{node_id}/{device_id}/write- 写入数据edgex/cmd/{node_id}/task/{task_id}/{action}- 任务控制
2. edgeOS(NATS) 客户端
主要功能:
- NATS 2.x 协议支持
- JetStream 持久化支持
- 请求/响应模式
- Subject 通配符支持
- 自动重连机制
- 节点注册与心跳
- 实时数据上报
- 命令接收与响应
配置参数:
type EdgeOSNATSConfig struct {
ID string `json:"id"`
Name string `json:"name"`
Enable bool `json:"enable"`
URL string `json:"url"` // nats://127.0.0.1:4222
ClientID string `json:"client_id"`
NodeID string `json:"node_id"`
Username string `json:"username"`
Password string `json:"password"`
Token string `json:"token"`
JetStreamEnabled bool `json:"jetstream_enabled"`
MaxReconnects int `json:"max_reconnects"`
PingInterval int `json:"ping_interval"`
HeartbeatInterval string `json:"heartbeat_interval"`
Devices map[string]bool `json:"devices"`
}
消息主题:
- 上行(EdgeX → EdgeOS):
edgex.nodes.register- 节点注册edgex.data.{node_id}.{device_id}- 实时数据edgex.nodes.{node_id}.heartbeat- 心跳
- 下行(EdgeOS → EdgeX):
edgex.cmd.{node_id}.discover- 设备发现edgex.cmd.{node_id}.{device_id}.write- 写入数据edgex.cmd.{node_id}.task.{task_id}.{action}- 任务控制
API 端点
配置管理
| 端点 | 方法 | 说明 |
|---|---|---|
/northbound/edgeos-mqtt |
POST | 创建/更新 edgeOS(MQTT) 配置 |
/northbound/edgeos-mqtt/:id |
DELETE | 删除 edgeOS(MQTT) 配置 |
/northbound/edgeos-mqtt/:id/stats |
GET | 获取 edgeOS(MQTT) 统计信息 |
/northbound/edgeos-mqtt/publish |
POST | 发布消息到 edgeOS(MQTT) |
/northbound/edgeos-nats |
POST | 创建/更新 edgeOS(NATS) 配置 |
/northbound/edgeos-nats/:id |
DELETE | 删除 edgeOS(NATS) 配置 |
/northbound/edgeos-nats/:id/stats |
GET | 获取 edgeOS(NATS) 统计信息 |
/northbound/edgeos-nats/publish |
POST | 发布消息到 edgeOS(NATS) |
统计信息
edgeOS(MQTT) 统计:
{
"success_count": 1234,
"fail_count": 5,
"reconnect_count": 2,
"publish_count": 1239,
"last_offline_time": 1744680000000,
"last_online_time": 1744680000000
}
edgeOS(NATS) 统计:
{
"success_count": 1234,
"fail_count": 5,
"reconnect_count": 2,
"publish_count": 1239,
"last_offline_time": 1744680000000,
"last_online_time": 1744680000000
}
消息格式
所有 edgeOS 消息遵循统一格式:
{
"header": {
"message_id": "msg-001",
"timestamp": 1744680000000,
"source": "edgex-node-001",
"destination": "edgeos-queen",
"message_type": "data",
"version": "1.0",
"correlation_id": "req-001"
},
"body": {
// 消息体内容
}
}
消息类型
| 类型 | 说明 |
|---|---|
node_register |
节点注册 |
data |
实时数据 |
heartbeat |
心跳 |
discover_command |
设备发现命令 |
write_command |
写入命令 |
task_create |
任务创建 |
task_control |
任务控制 |
discover_response |
发现响应 |
write_response |
写入响应 |
task_response |
任务响应 |
前端组件
卡片组件
- NorthboundEdgeOSMqtt.vue: 显示 edgeOS(MQTT) 通道卡片
- NorthboundEdgeOSNats.vue: 显示 edgeOS(NATS) 通道卡片
配置对话框
- EdgeOSMQTTSettingsDialog.vue: edgeOS(MQTT) 配置表单
- EdgeOSNATSSettingsDialog.vue: edgeOS(NATS) 配置表单
帮助文档
- EdgeOSHelpDialog.vue: edgeOS 协议帮助文档,包含三个标签页:
- edgeOS(MQTT) 说明
- edgeOS(NATS) 说明
- 配置说明
使用示例
1. 创建 edgeOS(MQTT) 配置
curl -X POST http://localhost:8082/api/northbound/edgeos-mqtt \
-H "Content-Type: application/json" \
-d '{
"id": "edgeos-mqtt-1",
"name": "edgeOS MQTT Channel",
"enable": true,
"broker": "tcp://127.0.0.1:1883",
"client_id": "edgex-node-001",
"node_id": "edgex-node-001",
"username": "edgex",
"password": "secret",
"qos": 1,
"keep_alive": 60
}'
2. 创建 edgeOS(NATS) 配置
curl -X POST http://localhost:8082/api/northbound/edgeos-nats \
-H "Content-Type: application/json" \
-d '{
"id": "edgeos-nats-1",
"name": "edgeOS NATS Channel",
"enable": true,
"url": "nats://127.0.0.1:4222",
"client_id": "edgex-node-001",
"node_id": "edgex-node-001",
"jetstream_enabled": true
}'
3. 发布测试消息
# MQTT
curl -X POST http://localhost:8082/api/northbound/edgeos-mqtt/publish \
-H "Content-Type: application/json" \
-d '{
"client_id": "edgeos-mqtt-1",
"topic": "edgex/test/data",
"payload": "{\"test\": true}"
}'
# NATS
curl -X POST http://localhost:8082/api/northbound/edgeos-nats/publish \
-H "Content-Type: application/json" \
-d '{
"client_id": "edgeos-nats-1",
"subject": "edgex.test.data",
"payload": "{\"test\": true}"
}'
部署说明
依赖项
已添加到 go.mod:
github.com/nats-io/nats.go v1.31.0
配置文件
edgeOS 配置保存在系统配置文件中(conf/system.yaml 或数据库),格式如下:
northbound:
edgeos_mqtt:
- id: edgeos-mqtt-1
name: edgeOS MQTT Channel
enable: true
broker: tcp://127.0.0.1:1883
client_id: edgex-node-001
node_id: edgex-node-001
edgeos_nats:
- id: edgeos-nats-1
name: edgeOS NATS Channel
enable: true
url: nats://127.0.0.1:4222
client_id: edgex-node-001
node_id: edgex-node-001
测试
单元测试
go test ./internal/northbound/edgos_mqtt/...
go test ./internal/northbound/edgos_nats/...
集成测试
启动 MQTT Broker:
docker run -d -p 1883:1883 eclipse-mosquitto
启动 NATS Server:
docker run -d -p 4222:4222 nats
启动带 JetStream 的 NATS:
docker run -d -p 4222:4222 nats -js
消息测试
MQTT:
# 订阅所有 edgex 消息
mosquitto_sub -h 127.0.0.1 -p 1883 -t "edgex/#" -v
# 发布测试命令
mosquitto_pub -h 127.0.0.1 -p 1883 -t "edgex/cmd/edgex-node-001/discover" -m '{
"header": {
"message_id": "test-001",
"timestamp": 1744680000000,
"source": "edgeos-queen",
"message_type": "discover_command",
"version": "1.0"
},
"body": {
"protocol": "modbus-tcp",
"network": "192.168.1.0/24"
}
}'
NATS:
# 订阅所有 edgex 消息
nats sub "edgex.>"
# 发布测试命令
nats pub "edgex.cmd.edgex-node-001.discover" '{
"header": {
"message_id": "test-001",
"timestamp": 1744680000000,
"source": "edgeos-queen",
"message_type": "discover_command",
"version": "1.0"
},
"body": {
"protocol": "modbus-tcp",
"network": "192.168.1.0/24"
}
}'
性能优化
edgeOS(MQTT)
- QoS 级别选择
- QoS 0: 最多一次,适合实时数据、心跳
- QoS 1: 至少一次,适合设备上报、命令控制
- QoS 2: 恰好一次,适合告警消息、重要状态
- 连接优化
- 自动重连
- LWT (Last Will Testament)
- 心跳保活
edgeOS(NATS)
- JetStream 持久化
- 启用后消息可持久化
- 支持重放历史消息
- 适合需要可靠消息的场景
- Subject 设计
- 合理使用通配符
- 避免过度订阅
故障排查
连接失败
MQTT:
# 检查 Broker 是否运行
telnet 127.0.0.1 1883
# 查看日志
tail -f logs/edgex-gateway.log | grep edgeOS
NATS:
# 检查 Server 是否运行
telnet 127.0.0.1 4222
# 查看 NATS 服务器状态
nats server info
消息丢失
- 检查 QoS/Subject 配置
- 检查网络连接稳定性
- 启用消息持久化(NATS JetStream)
- 查看统计信息确认发送/接收状态
版本兼容性
| edgeOS 版本 | 协议版本 | 支持中间件 | 状态 |
|---|---|---|---|
| v1.0 | v1.0 | MQTT 3.1.1, NATS 2.x | 当前 |