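EdgeOS Backend Implementation Guide
This document describes how EdgeOS adds a message bus (MQTT/NATS) through the UI and then listens on designated topics through middleware. It walks through the backend implementation of four core features, in order:
- EdgeX node registration
- EdgeX sub-device list synchronization
- EdgeX sub-device point synchronization (thing-model point reporting; the real-time data points can be regarded as the thing model)
- EdgeX sub-device bidirectional control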
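Contents
- Architecture overview
- Message bus management
- EdgeX node registration
- EdgeX sub-device list synchronization
- EdgeX sub-device point synchronization (thing-model point reporting)
- EdgeX sub-device bidirectional control
- Heartbeat and status management
- Alert and event handling
- Error handling and retry strategy
- Security and authentication
- Testing and troubleshooting
- Project structure reference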
1. Architecture Overview
┌────────────────────────────────────────────────────────────────┐
│                         EdgeOS Backend                         │
│                                                                │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                    Message Bus Layer                     │  │
│  │    ┌──────────────────┐      ┌──────────────────┐        │  │
│  │    │   MQTT Client    │      │   NATS Client    │        │  │
│  │    │   (paho.mqtt)    │      │    (nats.go)     │        │  │
│  │    └────────┬─────────┘      └────────┬─────────┘        │  │
│  └─────────────┼─────────────────────────┼──────────────────┘  │
│                │                         │                     │
│  ┌─────────────▼─────────────────────────▼──────────────────┐  │
│  │              Message Router (MessageRouter)              │  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  │  │
│  │  │ Register │  │  Device  │  │  Point   │  │ Control  │  │  │
│  │  │ Handler  │  │ Handler  │  │ Handler  │  │ Handler  │  │  │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘  │  │
│  └────────────────────────────┬─────────────────────────────┘  │
│                               │                                │
│  ┌────────────────────────────▼─────────────────────────────┐  │
│  │                 Service Layer (Services)                 │  │
│  │     NodeService / DeviceService / PointService / ...     │  │
│  └────────────────────────────┬─────────────────────────────┘  │
│                               │                                │
│  ┌────────────────────────────▼─────────────────────────────┐  │
│  │                 Storage Layer (Storage)                  │  │
│  └──────────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────────┘
                ▲                                ▲
                │ Message middleware (MQTT/NATS) │
                ▼                                ▼
┌────────────────────────────────────────────────────────────────┐
│                 EdgeX Edge Acquisition Gateway                 │
│  Publishes:  node registration / device report / point report /│
│              real-time data / heartbeat                        │
│  Subscribes: device discovery command / write command /        │
│              task control / config update                      │
└────────────────────────────────────────────────────────────────┘
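Core data flows:
EdgeX → publish → message middleware → EdgeOS subscription → router → handler → service layer → storage / push
EdgeOS → publish control command → message middleware → EdgeX subscription → execute → respond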
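2. Message Bus Management
2.1 Feature description
EdgeOS adds MQTT/NATS message bus configurations dynamically through the UI. The backend exposes a REST API that saves the connection settings and creates a client instance. As soon as the connection succeeds, the client subscribes to every Topic/Subject that the four features require.
2.2 Connection configuration data structures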
// internal/models/middleware.go
type MiddlewareType string
const (
MiddlewareMQTT MiddlewareType = "edgeOS(MQTT)"
MiddlewareNATS MiddlewareType = "edgeOS(NATS)"
)
// MiddlewareConfig is the connection configuration submitted by the frontend.
type MiddlewareConfig struct {
	ID          string         `json:"id"`          // unique ID (generated by the backend)
	Name        string         `json:"name"`        // connection name, e.g. "Production MQTT"
	Type        MiddlewareType `json:"type"`        // edgeOS(MQTT) or edgeOS(NATS)
	Description string         `json:"description"` // free-form note
	MQTT        *MQTTConfig    `json:"mqtt,omitempty"`
	NATS        *NATSConfig    `json:"nats,omitempty"`
	Status      ConnStatus     `json:"status"`     // connected / disconnected / error / connecting
	CreatedAt   int64          `json:"created_at"` // Unix milliseconds
	UpdatedAt   int64          `json:"updated_at"` // Unix milliseconds
}
type MQTTConfig struct {
	Broker               string `json:"broker"` // tcp://127.0.0.1:1883
	ClientID             string `json:"client_id"`
	Username             string `json:"username"`
	Password             string `json:"password"`
	QoS                  byte   `json:"qos"` // 0/1/2
	CleanSession         bool   `json:"clean_session"`
	KeepAlive            int    `json:"keep_alive"`      // seconds
	ConnectTimeout       int    `json:"connect_timeout"` // seconds
	AutoReconnect        bool   `json:"auto_reconnect"`
	MaxReconnectInterval int    `json:"max_reconnect_interval"` // seconds
	TLSEnabled           bool   `json:"tls_enabled"`
	CACert               string `json:"ca_cert,omitempty"`
	ClientCert           string `json:"client_cert,omitempty"`
	ClientKey            string `json:"client_key,omitempty"`
}
type NATSConfig struct {
URL string `json:"url"` // nats://127.0.0.1:4222
ClientName string `json:"client_name"`
Username string `json:"username"`
Password string `json:"password"`
Token string `json:"token,omitempty"`
ConnectTimeout int `json:"connect_timeout"`
ReconnectWait int `json:"reconnect_wait"`
MaxReconnects int `json:"max_reconnects"`
JetStreamEnabled bool `json:"jetstream_enabled"`
TLSEnabled bool `json:"tls_enabled"`
}
type ConnStatus string
const (
ConnStatusConnected ConnStatus = "connected"
ConnStatusDisconnected ConnStatus = "disconnected"
ConnStatusError ConnStatus = "error"
ConnStatusConnecting ConnStatus = "connecting"
)
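Before a configuration like the one above is persisted, the backend would normally reject obviously invalid input. The following is only a minimal sketch of such a check, not part of the documented implementation: `MQTTSettings` and `ValidateMQTT` are hypothetical names, and the struct copies just a few `MQTTConfig` fields so that the example stays self-contained.

```go
package main

import (
	"errors"
	"net/url"
	"strings"
)

// MQTTSettings mirrors a subset of MQTTConfig (hypothetical, trimmed for the sketch).
type MQTTSettings struct {
	Broker    string
	ClientID  string
	QoS       byte
	KeepAlive int // seconds
}

// ValidateMQTT collects every problem it finds instead of stopping at the first,
// so the UI can show all of them in one round trip.
func ValidateMQTT(c MQTTSettings) error {
	var problems []string

	// The broker must parse as a URL with a host part, e.g. tcp://127.0.0.1:1883.
	if u, err := url.Parse(c.Broker); err != nil {
		problems = append(problems, "broker: "+err.Error())
	} else if u.Host == "" {
		problems = append(problems, "broker: missing host")
	}
	if c.ClientID == "" {
		problems = append(problems, "client_id: must not be empty")
	}
	// MQTT defines QoS levels 0, 1 and 2 only.
	if c.QoS > 2 {
		problems = append(problems, "qos: must be 0, 1 or 2")
	}
	if c.KeepAlive < 0 {
		problems = append(problems, "keep_alive: must not be negative")
	}

	if len(problems) == 0 {
		return nil
	}
	return errors.New(strings.Join(problems, "; "))
}
```

A call such as `ValidateMQTT(MQTTSettings{Broker: "tcp://127.0.0.1:1883", ClientID: "edgeos", QoS: 1, KeepAlive: 60})` returns nil, while a QoS of 3 or an empty broker yields a descriptive error.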
2.3 REST API
| Method | Path | Description |
|---|---|---|
| POST | /api/v1/middlewares | Add a message bus |
| GET | /api/v1/middlewares | List all connections (including connected/disconnected status) |
| GET | /api/v1/middlewares/:id | Get the details of one connection |
| PUT | /api/v1/middlewares/:id | Update a connection configuration |
| DELETE | /api/v1/middlewares/:id | Delete a connection |
| POST | /api/v1/middlewares/:id/connect | Trigger a connection manually |
| POST | /api/v1/middlewares/:id/disconnect | Disconnect |
| GET | /api/v1/middlewares/:id/status | Query the connection status |
| POST | /api/v1/middlewares/:id/test | Test a connection (nothing is persisted) |
2.4 Connection manager implementation
// internal/middleware/manager.go
package middleware
import (
"fmt"
"sync"
"time"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
// ConnectionManager manages all message bus connections.
type ConnectionManager struct {
	mu          sync.RWMutex
	connections map[string]*Connection // id -> Connection
	storage     MiddlewareStorage
	router      MessageRouter
	log         *logrus.Logger
}
// Connection is a single message bus instance.
type Connection struct {
	Config *MiddlewareConfig
	Client MiddlewareClient // interface implemented by the MQTT and NATS clients
	subs   []Subscription   // topics subscribed so far
}
// MiddlewareClient is the unified client interface.
type MiddlewareClient interface {
	Connect() error
	Disconnect()
	Subscribe(topic string, handler MessageHandler) error
	Publish(topic string, msgType string, body interface{}) error
	IsConnected() bool
	Status() ConnStatus
}
// MessageHandler is the function type that processes an incoming message.
type MessageHandler func(topic string, msg *Message) error
func NewConnectionManager(storage MiddlewareStorage, router MessageRouter, log *logrus.Logger) *ConnectionManager {
return &ConnectionManager{
connections: make(map[string]*Connection),
storage: storage,
router: router,
log: log,
}
}
// AddConnection adds a middleware configuration and connects to it.
func (m *ConnectionManager) AddConnection(cfg *MiddlewareConfig) (*MiddlewareConfig, error) {
cfg.ID = uuid.NewString()
cfg.Status = ConnStatusConnecting
cfg.CreatedAt = time.Now().UnixMilli()
cfg.UpdatedAt = cfg.CreatedAt
var client MiddlewareClient
var err error
switch cfg.Type {
case MiddlewareMQTT:
client, err = NewMQTTClient(cfg.MQTT, m.log)
case MiddlewareNATS:
client, err = NewNATSClient(cfg.NATS, m.log)
default:
return nil, fmt.Errorf("unsupported middleware type: %s", cfg.Type)
}
if err != nil {
cfg.Status = ConnStatusError
return cfg, err
}
if err = client.Connect(); err != nil {
cfg.Status = ConnStatusError
_ = m.storage.Save(cfg)
return cfg, fmt.Errorf("connect failed: %w", err)
}
cfg.Status = ConnStatusConnected
conn := &Connection{Config: cfg, Client: client}
	// Once connected, subscribe to the topics of all features.
if err = m.subscribeAll(conn); err != nil {
m.log.Warnf("partial subscribe error: %v", err)
}
m.mu.Lock()
m.connections[cfg.ID] = conn
m.mu.Unlock()
_ = m.storage.Save(cfg)
return cfg, nil
}
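// subscribeAll subscribes to every topic the four core features need.
// It stops at the first failure, so topics after the failing one are not subscribed.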
func (m *ConnectionManager) subscribeAll(conn *Connection) error {
topics := buildTopicList(conn.Config.Type)
for _, t := range topics {
if err := conn.Client.Subscribe(t, m.router.Route); err != nil {
return fmt.Errorf("subscribe %s failed: %w", t, err)
}
conn.subs = append(conn.subs, Subscription{Topic: t})
m.log.Infof("[%s] subscribed: %s", conn.Config.Name, t)
}
return nil
}
// buildTopicList returns the topics to subscribe to for the given protocol type.
func buildTopicList(t MiddlewareType) []string {
	if t == MiddlewareMQTT {
		return []string{
			// 1. Node registration (Stage 1: passive)
			"edgex/nodes/register",
			"edgex/nodes/unregister",
			// 1b. Node discovery (Stage 2: EdgeOS prompts EdgeX to register again)
			"edgex/cmd/nodes/register",
			// 2. Device synchronization
			"edgex/devices/report",
			// 3. Point synchronization (thing-model point reporting)
			"edgex/points/report",
			// 4. Real-time data (read side of bidirectional control)
			"edgex/data/#",
			// Responses
			"edgex/responses/#",
			// Heartbeat and status
			"edgex/nodes/+/heartbeat",
			"edgex/nodes/+/status",
			"edgex/nodes/+/online",
			"edgex/nodes/+/offline",
			// Alerts and events
			"edgex/events/alert",
			"edgex/events/error",
			"edgex/events/info",
		}
	}
	// NATS
return []string{
"edgex.nodes.register",
"edgex.nodes.unregister",
"edgex.cmd.nodes.register",
"edgex.devices.report",
"edgex.points.report",
"edgex.data.>",
"edgex.res.>",
"edgex.nodes.heartbeat.>",
"edgex.nodes.status.>",
"edgex.events.alert",
"edgex.events.error",
"edgex.events.info",
}
}
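Every subscription above hands its messages to `m.router.Route`, so the router has to map a concrete topic back to the filter (and handler) it belongs to. The document does not show `MessageRouter`, so the following is only a sketch of MQTT-style filter matching under that assumption; `matchTopic`, `route` and `resolve` are hypothetical names, and filters are assumed to be well formed (`#` only as the last level).

```go
package main

import "strings"

// matchTopic reports whether a concrete MQTT topic matches a subscription filter.
// "+" matches exactly one level; "#" matches all remaining levels, including none.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, part := range f {
		if part == "#" {
			return true
		}
		if i >= len(t) {
			return false
		}
		if part != "+" && part != t[i] {
			return false
		}
	}
	// Without a trailing "#", both sides must have the same number of levels.
	return len(f) == len(t)
}

// route binds a topic filter to a handler name (hypothetical structure).
type route struct {
	filter  string
	handler string
}

// resolve returns the handler of the first route whose filter matches the topic.
func resolve(routes []route, topic string) (string, bool) {
	for _, r := range routes {
		if matchTopic(r.filter, topic) {
			return r.handler, true
		}
	}
	return "", false
}
```

With routes for `edgex/nodes/register` and `edgex/data/#`, the topic `edgex/data/edgex-node-001/Room_FC_2014_19` resolves to the second route. NATS subjects would need the same logic with `.` as the separator and `*` / `>` as wildcards.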
2.5 MQTT client implementation
// internal/middleware/mqtt_client.go
package middleware
import (
	"encoding/json"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/sirupsen/logrus"
)
type mqttClient struct {
cfg *MQTTConfig
client mqtt.Client
log *logrus.Logger
}
func NewMQTTClient(cfg *MQTTConfig, log *logrus.Logger) (MiddlewareClient, error) {
opts := mqtt.NewClientOptions()
opts.AddBroker(cfg.Broker)
opts.SetClientID(cfg.ClientID)
opts.SetUsername(cfg.Username)
opts.SetPassword(cfg.Password)
opts.SetCleanSession(cfg.CleanSession)
opts.SetKeepAlive(time.Duration(cfg.KeepAlive) * time.Second)
opts.SetConnectTimeout(time.Duration(cfg.ConnectTimeout) * time.Second)
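	opts.SetAutoReconnect(cfg.AutoReconnect)
	// Cap the reconnect backoff only when a positive limit is configured.
	if cfg.MaxReconnectInterval > 0 {
		opts.SetMaxReconnectInterval(time.Duration(cfg.MaxReconnectInterval) * time.Second)
	}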
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Info("[MQTT] connected")
})
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
log.Warnf("[MQTT] connection lost: %v", err)
})
return &mqttClient{cfg: cfg, client: mqtt.NewClient(opts), log: log}, nil
}
func (c *mqttClient) Connect() error {
if token := c.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}
func (c *mqttClient) Disconnect() { c.client.Disconnect(250) }
func (c *mqttClient) IsConnected() bool { return c.client.IsConnected() }
func (c *mqttClient) Status() ConnStatus {
if c.client.IsConnected() {
return ConnStatusConnected
}
return ConnStatusDisconnected
}
func (c *mqttClient) Subscribe(topic string, handler MessageHandler) error {
token := c.client.Subscribe(topic, c.cfg.QoS, func(_ mqtt.Client, msg mqtt.Message) {
var m Message
if err := json.Unmarshal(msg.Payload(), &m); err != nil {
c.log.Errorf("[MQTT] unmarshal error on %s: %v", msg.Topic(), err)
return
}
if err := handler(msg.Topic(), &m); err != nil {
c.log.Errorf("[MQTT] handler error on %s: %v", msg.Topic(), err)
}
})
token.Wait()
return token.Error()
}
func (c *mqttClient) Publish(topic string, msgType string, body interface{}) error {
msg := buildMessage(c.cfg.ClientID, msgType, body)
payload, err := json.Marshal(msg)
if err != nil {
return err
}
token := c.client.Publish(topic, c.cfg.QoS, false, payload)
token.Wait()
return token.Error()
}
2.6 NATS client implementation
// internal/middleware/nats_client.go
package middleware
import (
	"encoding/json"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/sirupsen/logrus"
)
type natsClient struct {
cfg *NATSConfig
nc *nats.Conn
js nats.JetStreamContext
log *logrus.Logger
}
func NewNATSClient(cfg *NATSConfig, log *logrus.Logger) (MiddlewareClient, error) {
return &natsClient{cfg: cfg, log: log}, nil
}
func (c *natsClient) Connect() error {
opts := []nats.Option{
nats.Name(c.cfg.ClientName),
nats.UserInfo(c.cfg.Username, c.cfg.Password),
nats.ReconnectWait(time.Duration(c.cfg.ReconnectWait) * time.Second),
nats.MaxReconnects(c.cfg.MaxReconnects),
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
c.log.Warnf("[NATS] disconnected: %v", err)
}),
nats.ReconnectHandler(func(_ *nats.Conn) {
c.log.Info("[NATS] reconnected")
}),
}
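	if c.cfg.Token != "" {
		opts = append(opts, nats.Token(c.cfg.Token))
	}
	// Apply the configured connect timeout (seconds); otherwise nats.go keeps its default.
	if c.cfg.ConnectTimeout > 0 {
		opts = append(opts, nats.Timeout(time.Duration(c.cfg.ConnectTimeout)*time.Second))
	}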
nc, err := nats.Connect(c.cfg.URL, opts...)
if err != nil {
return err
}
c.nc = nc
if c.cfg.JetStreamEnabled {
if c.js, err = nc.JetStream(); err != nil {
c.log.Warnf("[NATS] jetstream unavailable: %v", err)
}
}
return nil
}
func (c *natsClient) Disconnect() {
if c.nc != nil {
c.nc.Drain()
}
}
func (c *natsClient) IsConnected() bool { return c.nc != nil && c.nc.IsConnected() }
func (c *natsClient) Status() ConnStatus {
if c.IsConnected() {
return ConnStatusConnected
}
return ConnStatusDisconnected
}
func (c *natsClient) Subscribe(subject string, handler MessageHandler) error {
_, err := c.nc.Subscribe(subject, func(msg *nats.Msg) {
var m Message
if err := json.Unmarshal(msg.Data, &m); err != nil {
c.log.Errorf("[NATS] unmarshal error on %s: %v", msg.Subject, err)
return
}
if err := handler(msg.Subject, &m); err != nil {
c.log.Errorf("[NATS] handler error on %s: %v", msg.Subject, err)
}
})
return err
}
func (c *natsClient) Publish(subject string, msgType string, body interface{}) error {
msg := buildMessage(c.cfg.ClientName, msgType, body)
payload, err := json.Marshal(msg)
if err != nil {
return err
}
return c.nc.Publish(subject, payload)
}
// Request performs a request/reply exchange (NATS only).
func (c *natsClient) Request(subject string, msgType string, body interface{}, timeout time.Duration) (*Message, error) {
msg := buildMessage(c.cfg.ClientName, msgType, body)
payload, err := json.Marshal(msg)
if err != nil {
return nil, err
}
resp, err := c.nc.Request(subject, payload, timeout)
if err != nil {
return nil, err
}
var m Message
if err = json.Unmarshal(resp.Data, &m); err != nil {
return nil, err
}
return &m, nil
}
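3. EdgeX Node Registration
3.1 Two-stage registration overview
EdgeX node registration has two stages:
- Stage 1 (passive registration): the EdgeX node publishes to edgex/nodes/register on its own initiative, and EdgeOS receives and processes the registration request. This is the standard flow when a node initializes.
- Stage 2 (active discovery): EdgeOS publishes to edgex/cmd/nodes/register, prompting registered or pending EdgeX nodes to report their registration information again. This serves operational scenarios, for example re-establishing mappings after a node restart, refreshing the node list after a topology change, or probing for available nodes after EdgeOS itself restarts.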
Stage 1: passive registration (EdgeX → EdgeOS)
EdgeX node ──(edgex/nodes/register)──→ MQTT Broker ──→ EdgeOS (handles registration)
Stage 2: active discovery (EdgeOS → EdgeX → EdgeOS)
EdgeOS ──(edgex/cmd/nodes/register)──→ MQTT Broker ──→ EdgeX node
                                                           │
                                                           ▼
                                           EdgeX node republishes:
                                             edgex/nodes/register
                                                           │
                                                           ▼
                                 MQTT Broker ──→ EdgeOS (handles registration)
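3.2 Stage 1: passive registration
EdgeX                   Message middleware                   EdgeOS
  │                              │                              │
  │── node_register ────────────►│                              │
  │   Topic: edgex/nodes/register│                              │
  │                              │─────────────────────────────►│
  │                              │   Process the registration:  │
  │                              │   1. Validate message format │
  │                              │   2. Check whether the node  │
  │                              │      already exists          │
  │                              │   3. Persist node info       │
  │                              │   4. Send the registration   │
  │                              │      response                │
  │◄── node_register_resp ───────│◄─────────────────────────────│
  │    (includes access_token)   │                              │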
3.2 Subscribed topics
| Protocol | Topic | QoS | Description |
|---|---|---|---|
| MQTT | edgex/nodes/register | 1 | Stage 1: passive node registration |
| MQTT | edgex/nodes/unregister | 1 | Node unregistration |
| MQTT | edgex/cmd/nodes/register | 1 | Stage 2: EdgeOS prompts nodes to register again |
| NATS | edgex.nodes.register | - | Stage 1: passive node registration |
| NATS | edgex.nodes.unregister | - | Node unregistration |
| NATS | edgex.cmd.nodes.register | - | Stage 2: EdgeOS prompts nodes to register again |
3.3 Message structure
Received (EdgeX → EdgeOS):
{
"header": {
"message_id": "msg-node-reg-001",
"timestamp": 1744680000000,
"source": "edgex-node-001",
"destination": "edgeos-queen",
"message_type": "node_register",
"version": "1.0"
},
"body": {
"node_id": "edgex-node-001",
"node_name": "EdgeX Gateway Node",
"model": "edge-gateway",
"version": "1.0.0",
"api_version": "v1",
"capabilities": ["shadow-sync", "heartbeat", "device-control", "task-execution"],
"protocol": "edgeOS(MQTT)",
"endpoint": { "host": "127.0.0.1", "port": 8082 },
"metadata": { "os": "linux", "arch": "amd64", "hostname": "edgex-node-001.local" }
}
}
Reply (EdgeOS → EdgeX):
{
"header": {
"message_id": "msg-node-reg-resp-001",
"timestamp": 1744680000500,
"source": "edgeos-queen",
"destination": "edgex-node-001",
"message_type": "node_register_response",
"version": "1.0",
"correlation_id": "msg-node-reg-001"
},
"body": {
"success": true,
"node_id": "edgex-node-001",
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"expires_in": 3600,
"message": "Node registered successfully"
}
}
3.4 Handler implementation
// internal/handlers/node_handler.go
package handlers
import (
"fmt"
"time"
"github.com/sirupsen/logrus"
)
type NodeHandler struct {
nodeService NodeService
publisher Publisher
log *logrus.Logger
}
func (h *NodeHandler) HandleRegister(topic string, msg *Message) error {
var body NodeRegisterBody
if err := decodeBody(msg.Body, &body); err != nil {
return fmt.Errorf("decode node_register body: %w", err)
}
	// 1. Idempotency check: if the node is already registered, refresh its status and reply with success.
existing, _ := h.nodeService.GetByID(body.NodeID)
if existing != nil {
h.log.Infof("[NodeReg] node %s already registered, update info", body.NodeID)
existing.Status = "online"
existing.UpdatedAt = time.Now().UnixMilli()
_ = h.nodeService.Update(existing)
return h.replyRegisterSuccess(msg.Header, body.NodeID)
}
	// 2. Persist the node.
node := &Node{
ID: body.NodeID,
Name: body.NodeName,
Model: body.Model,
Version: body.Version,
Capabilities: body.Capabilities,
Protocol: body.Protocol,
Endpoint: body.Endpoint,
Metadata: body.Metadata,
Status: "online",
RegisteredAt: time.Now().UnixMilli(),
UpdatedAt: time.Now().UnixMilli(),
}
if err := h.nodeService.Save(node); err != nil {
return fmt.Errorf("save node: %w", err)
}
h.log.Infof("[NodeReg] registered node: %s (%s)", body.NodeID, body.NodeName)
	// 3. Send the registration response.
return h.replyRegisterSuccess(msg.Header, body.NodeID)
}
func (h *NodeHandler) replyRegisterSuccess(reqHeader MessageHeader, nodeID string) error {
respBody := map[string]interface{}{
"success": true,
"node_id": nodeID,
"access_token": generateToken(nodeID),
"expires_in": 3600,
"message": "Node registered successfully",
}
respHeader := MessageHeader{
MessageID: generateUUID(),
Timestamp: time.Now().UnixMilli(),
Source: "edgeos-queen",
Destination: reqHeader.Source,
MessageType: "node_register_response",
Version: "1.0",
CorrelationID: reqHeader.MessageID,
}
topic := fmt.Sprintf("edgex/responses/%s/%s", reqHeader.Source, reqHeader.MessageID)
return h.publisher.Publish(topic, respHeader, respBody)
}
func (h *NodeHandler) HandleUnregister(topic string, msg *Message) error {
var body NodeUnregisterBody
if err := decodeBody(msg.Body, &body); err != nil {
return fmt.Errorf("decode node_unregister body: %w", err)
}
return h.nodeService.MarkOffline(body.NodeID)
}
3.5 Node model
// internal/models/node.go
type Node struct {
ID string `json:"id"`
Name string `json:"name"`
Model string `json:"model"`
Version string `json:"version"`
APIVersion string `json:"api_version"`
Capabilities []string `json:"capabilities"`
Protocol string `json:"protocol"`
Endpoint NodeEndpoint `json:"endpoint"`
Metadata map[string]string `json:"metadata"`
Status string `json:"status"` // online / offline / error
RegisteredAt int64 `json:"registered_at"`
UpdatedAt int64 `json:"updated_at"`
LastHeartbeat int64 `json:"last_heartbeat"`
}
type NodeEndpoint struct {
Host string `json:"host"`
Port int `json:"port"`
}
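3.7 Stage 2: active node discovery
EdgeOS can proactively publish a node discovery request to the middleware, prompting EdgeX nodes to register again.
API endpoints:
| Method | Path | Description |
|---|---|---|
| POST | /api/edgex/discover | Publish a discovery request to the first connected middleware (broadcast mode) |
| POST | /api/edgex/discover/:middlewareId | Publish a discovery request to the specified middleware instance |
Request examples:
# Trigger node discovery through the first connected middleware
curl -X POST http://localhost:8000/api/edgex/discover \
-H "Authorization: Bearer <token>"
# Trigger discovery through a specific middleware
curl -X POST http://localhost:8000/api/edgex/discover/mqtt-1 \
-H "Authorization: Bearer <token>"
How it works (messaging.Manager):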
// PublishNodeDiscovery publishes a node discovery request to the first connected middleware.
func (m *Manager) PublishNodeDiscovery() error {
msg := map[string]interface{}{
"header": map[string]interface{}{
"message_type": "discovery_request",
"source": "edgeos",
"timestamp": time.Now().UnixMilli(),
},
"body": map[string]interface{}{},
}
payload, _ := json.Marshal(msg)
return m.publishToFirstClient("edgex/cmd/nodes/register", payload)
}
// PublishNodeDiscoveryTo publishes a node discovery request to the specified middleware.
func (m *Manager) PublishNodeDiscoveryTo(middlewareID string) error {
m.mu.RLock()
defer m.mu.RUnlock()
entry, ok := m.clients[middlewareID]
if !ok || entry.client == nil || !entry.client.IsConnected() {
return fmt.Errorf("middleware %s not connected", middlewareID)
}
msg := map[string]interface{}{
"header": map[string]interface{}{
"message_type": "discovery_request",
"source": "edgeos",
"timestamp": time.Now().UnixMilli(),
},
"body": map[string]interface{}{},
}
payload, _ := json.Marshal(msg)
return entry.publishFn("edgex/cmd/nodes/register", payload)
}
NATS equivalent:
// PublishDiscoveryRequest is the NATS client counterpart of PublishNodeDiscoveryTo.
func (c *natsClient) PublishDiscoveryRequest() error {
msg := map[string]interface{}{
"header": map[string]interface{}{
"message_type": "discovery_request",
"source": "edgeos",
"timestamp": time.Now().UnixMilli(),
},
"body": map[string]interface{}{},
}
payload, _ := json.Marshal(msg)
return c.nc.Publish("edgex.cmd.nodes.register", payload)
}
Node discovery request message format:
{
"header": {
"message_type": "discovery_request",
"source": "edgeos",
"timestamp": 1744680000000
},
"body": {}
}
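On receiving this message, an EdgeX node should immediately republish its edgex/nodes/register message, which triggers the complete Stage 1 registration flow (validation, persistence, response).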
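4. EdgeX Sub-device List Synchronization
4.1 Flow
EdgeX                   Message middleware                   EdgeOS
  │                              │                              │
  │── device_report ────────────►│                              │
  │   Topic: edgex/devices/report│                              │
  │                              │─────────────────────────────►│
  │                              │   1. Verify the source node  │
  │                              │      is registered           │
  │                              │   2. Iterate the device list │
  │                              │   3. Insert or update devices│
  │                              │   4. Mark missing devices    │
  │                              │                              │
  │                              │   (optional) EdgeOS sends    │
  │◄── list query ───────────────│◄── a list query command      │
  │── device list response ─────►│─────────────────────────────►│
EdgeOS can also initiate device synchronization: it publishes a query command to edgex/devices/{node_id}/list, and EdgeX responds on edgex/devices/report.
4.2 Subscribed / published topics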
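| Direction | Protocol | Topic | Description |
|---|---|---|---|
| Subscribe (receive) | MQTT | edgex/devices/report | Receive device list reports |
| Subscribe (receive) | NATS | edgex.devices.report | Receive device list reports |
| Publish (send) | MQTT | edgex/devices/{node_id}/list | Query the device list |
| Publish (send) | NATS | edgex.devices.{node_id}.list | Query the device list |
| Publish (send) | MQTT | edgex/cmd/{node_id}/discover | Trigger device discovery |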
4.3 Message structure
Received (EdgeX → EdgeOS):
{
"header": { "message_type": "device_report", "source": "edgex-node-001" },
"body": {
"node_id": "edgex-node-001",
"devices": [
{
"device_id": "device-001",
"device_name": "Modbus TCP Device",
"device_profile": "modbus-tcp-device",
"service_name": "modbus-tcp-service",
"labels": ["sensor", "modbus"],
"description": "Test Modbus TCP device",
"admin_state": "ENABLED",
"operating_state": "ENABLED",
"properties": {
"protocol": "modbus-tcp",
"address": "192.168.1.100:502",
"unit_id": 1
}
}
]
}
}
4.4 Handler implementation
// internal/handlers/device_handler.go
package handlers
import (
	"fmt"
	"time"

	"github.com/sirupsen/logrus"
)
type DeviceHandler struct {
	nodeService   NodeService
	deviceService DeviceService
	log           *logrus.Logger
}
func (h *DeviceHandler) HandleDeviceReport(topic string, msg *Message) error {
var body DeviceReportBody
if err := decodeBody(msg.Body, &body); err != nil {
return fmt.Errorf("decode device_report: %w", err)
}
	// Verify that the reporting node is registered.
node, err := h.nodeService.GetByID(body.NodeID)
if err != nil || node == nil {
return fmt.Errorf("node %s not registered", body.NodeID)
}
reportedIDs := make(map[string]struct{})
for _, d := range body.Devices {
reportedIDs[d.DeviceID] = struct{}{}
existing, _ := h.deviceService.GetByID(body.NodeID, d.DeviceID)
		device := &Device{
			NodeID:         body.NodeID,
			DeviceID:       d.DeviceID,
			DeviceName:     d.DeviceName,
			DeviceProfile:  d.DeviceProfile,
			ServiceName:    d.ServiceName,
			Labels:         d.Labels,
			Description:    d.Description,
			AdminState:     d.AdminState,
			OperatingState: d.OperatingState,
			Properties:     d.Properties,
			Status:         "online", // a device present in the report is treated as online
			UpdatedAt:      time.Now().UnixMilli(),
		}
		if existing == nil {
			device.CreatedAt = device.UpdatedAt
			if err = h.deviceService.Save(device); err != nil {
				h.log.Errorf("[DeviceSync] save device %s: %v", d.DeviceID, err)
				continue
			}
			h.log.Infof("[DeviceSync] new device: %s / %s", body.NodeID, d.DeviceID)
		} else {
			// Carry over the original creation time so the update cannot reset it.
			device.CreatedAt = existing.CreatedAt
			if err = h.deviceService.Update(device); err != nil {
				h.log.Errorf("[DeviceSync] update device %s: %v", d.DeviceID, err)
			}
		}
}
	// Mark devices that are absent from the report as offline.
allDevices, _ := h.deviceService.ListByNode(body.NodeID)
for _, d := range allDevices {
if _, ok := reportedIDs[d.DeviceID]; !ok {
_ = h.deviceService.MarkOffline(body.NodeID, d.DeviceID)
}
}
return nil
}
// TriggerDiscover proactively triggers device discovery on a node.
func (h *DeviceHandler) TriggerDiscover(nodeID string, protocol string, publisher Publisher) error {
body := map[string]interface{}{
"protocol": protocol,
"options": map[string]interface{}{
"auto_register": true,
"sync_immediately": true,
},
}
topic := fmt.Sprintf("edgex/cmd/%s/discover", nodeID)
return publisher.Publish(topic, "discover_command", body)
}
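The reconciliation inside `HandleDeviceReport` boils down to a set difference between the devices already stored for a node and the devices in the report. The sketch below isolates that step as a pure function, which makes it easy to unit test without a storage layer. `syncPlan` and `planDeviceSync` are hypothetical names, and the stored IDs are assumed to be unique.

```go
package main

import "sort"

// syncPlan lists what a device report implies for the stored state.
type syncPlan struct {
	Create  []string // reported but not stored yet
	Update  []string // reported and already stored
	Offline []string // stored but missing from the report
}

// planDeviceSync compares stored device IDs with reported ones.
// Duplicate IDs inside one report are processed only once.
func planDeviceSync(known, reported []string) syncPlan {
	knownSet := make(map[string]struct{}, len(known))
	for _, id := range known {
		knownSet[id] = struct{}{}
	}

	var plan syncPlan
	reportedSet := make(map[string]struct{}, len(reported))
	for _, id := range reported {
		if _, dup := reportedSet[id]; dup {
			continue
		}
		reportedSet[id] = struct{}{}
		if _, ok := knownSet[id]; ok {
			plan.Update = append(plan.Update, id)
		} else {
			plan.Create = append(plan.Create, id)
		}
	}
	for _, id := range known {
		if _, ok := reportedSet[id]; !ok {
			plan.Offline = append(plan.Offline, id)
		}
	}

	// Sort for deterministic output regardless of input order.
	sort.Strings(plan.Create)
	sort.Strings(plan.Update)
	sort.Strings(plan.Offline)
	return plan
}
```

For stored devices `device-001`, `device-002` and a report containing `device-002`, `device-003`, the plan creates `device-003`, updates `device-002` and marks `device-001` offline.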
4.5 Device model
// internal/models/device.go
type Device struct {
NodeID string `json:"node_id"`
DeviceID string `json:"device_id"`
DeviceName string `json:"device_name"`
DeviceProfile string `json:"device_profile"`
ServiceName string `json:"service_name"`
Labels []string `json:"labels"`
Description string `json:"description"`
AdminState string `json:"admin_state"`
OperatingState string `json:"operating_state"`
Properties map[string]interface{} `json:"properties"`
Status string `json:"status"` // online / offline
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
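5. EdgeX Sub-device Point Synchronization (Thing Model)
5.1 Design principle: a thing-model-driven, two-stage data flow
EdgeX delivers point data through real-time data messages (edgex/data/{node_id}/{device_id}), following these two stages:
| Stage | Trigger | Message content | EdgeOS handling |
|---|---|---|---|
| Full report (thing-model initialization) | First report after the device connects, or a sync triggered by EdgeOS | points contains every point of the device with its current value | Write all points to the point cache keyed by point_id, establishing the thing-model snapshot |
| Delta report (incremental real-time data) | Subsequent periodic or change-triggered reports | points contains only the points that changed since the last report | Merge into the full cache: update only the keys that appear, never delete points that do not |
Key principle: the two message structures are identical, and EdgeOS business logic tells them apart. When a device has no cache, the first message is treated as a full report; once a cache exists, every message is treated as a delta merge.
edgex/points/report is a separate point-metadata report (type, unit, address and other configuration). It is independent of real-time data reporting and optional.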
5.2 Flow
EdgeX                   Message middleware                   EdgeOS
  │                              │                              │
  │── (optional) point_report ──►│                              │
  │   Topic: edgex/points/report │                              │
  │                              │─────────────────────────────►│
  │                              │   Save point metadata        │
  │                              │   (type/unit/address/range)  │
  │                              │                              │
  │── data [first, full] ───────►│                              │
  │   Topic: edgex/data/{node}/{dev}                            │
  │   body.points = {all points: current values}                │
  │                              │─────────────────────────────►│
  │                              │   1. Device has no cache     │
  │                              │   2. Write all points to     │
  │                              │      the point cache         │
  │                              │   3. Build thing-model       │
  │                              │      snapshot                │
  │                              │   4. Broadcast data_update   │
  │                              │   (is_full_snapshot=true)    │
  │                              │                              │
  │── data [later, delta] ──────►│                              │
  │   body.points = {changed points only}                       │
  │                              │─────────────────────────────►│
  │                              │   1. Device cache exists     │
  │                              │   2. Merge into full cache   │
  │                              │      (update present keys)   │
  │                              │   3. Broadcast data_update   │
  │                              │   (is_full_snapshot=false)   │
EdgeOS can trigger a full sync on demand: it publishes a request to edgex/cmd/{node_id}/sync, and EdgeX sends a full data report again.
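5.3 Subscribed / published topics
| Direction | Protocol | Topic | Description |
|---|---|---|---|
| Subscribe (receive) | MQTT | edgex/points/report | Receive point metadata definitions (optional) |
| Subscribe (receive) | MQTT | edgex/data/# | Receive real-time data (full or delta) |
| Subscribe (receive) | NATS | edgex.points.report | Receive point metadata definitions (optional) |
| Subscribe (receive) | NATS | edgex.data.> | Receive real-time data (full or delta) |
| Publish (send) | MQTT | edgex/cmd/{node_id}/sync | Trigger a full report from EdgeX |
| Publish (send) | MQTT | edgex/points/{node_id}/{device_id}/sync | Request point metadata sync for a specific device |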
5.4 Message structure
Point metadata report on edgex/points/report (optional, carries configuration information):
{
"header": { "message_type": "point_report", "source": "edgex-node-001" },
"body": {
"node_id": "edgex-node-001",
"device_id": "Room_FC_2014_19",
"points": [
{
"point_id": "SetPoint.Value",
"point_name": "设定温度",
"resource_name": "SetPoint.Value",
"value_type": "Float32",
"access_mode": "RW",
"unit": "°C",
"minimum": 16,
"maximum": 30,
"address": "40001",
"data_type": "holding_register",
"scale": 1.0,
"offset": 0
}
]
}
}
First, full real-time data report (thing-model initialization):
{
"header": {
"message_id": "msg-c9028d5e3e3018e6324ad61c91927561",
"timestamp": 1776312621808,
"source": "edgex-node-001",
"message_type": "data",
"version": "1.0"
},
"body": {
"device_id": "Room_FC_2014_19",
"node_id": "edgex-node-001",
"points": {
"SetPoint.Value": 19,
"Setpoint.1": 19,
"Setpoint.2": 19,
"Setpoint.3": 19,
"State.Chiller": 0,
"State.Heater": 1,
"Temperature.Indoor": 18.8,
"Temperature.Outdoor": 12,
"Temperature.Water": 39.7
},
"quality": "Good",
"timestamp": 1776312621573
}
}
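The first report contains every point of the device, and EdgeOS uses it to build the full thing-model snapshot.
Each key in points is a point_id, and its value is the currently acquired value.
Subsequent delta real-time data report (changed points only):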
{
"header": {
"message_id": "msg-c9028d5e3e3018e6324ad61c91927561",
"timestamp": 1776312623000,
"source": "edgex-node-001",
"message_type": "data",
"version": "1.0"
},
"body": {
"device_id": "Room_FC_2014_19",
"node_id": "edgex-node-001",
"points": {
"SetPoint.Value": 20,
"Setpoint.1": 20,
"Setpoint.2": 20
},
"quality": "Good",
"timestamp": 1776312623000
}
}
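A delta report contains only the points that changed (Setpoint.3, State.* and Temperature.* did not change, so they are absent). On receiving it, EdgeOS updates only these three points; all other points keep the values from the previous snapshot.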
5.5 Handler implementation
// internal/handlers/point_handler.go
package handlers
import (
"fmt"
"strings"
"time"
"github.com/sirupsen/logrus"
)
type PointHandler struct {
nodeService NodeService
deviceService DeviceService
pointService PointService
	broadcast     BroadcastFunc // pushes events to the frontend over WebSocket
log *logrus.Logger
}
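// HandlePointReport handles point metadata reports (edgex/points/report, optional).
// It saves the configuration of each point (type, unit, address, range and so on)
// without overwriting CurrentValue/Quality/LastUpdated.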
func (h *PointHandler) HandlePointReport(topic string, msg *Message) error {
var body PointReportBody
if err := decodeBody(msg.Body, &body); err != nil {
return fmt.Errorf("decode point_report: %w", err)
}
dev, err := h.deviceService.GetByID(body.NodeID, body.DeviceID)
if err != nil || dev == nil {
return fmt.Errorf("device %s/%s not found", body.NodeID, body.DeviceID)
}
now := time.Now().UnixMilli()
for _, p := range body.Points {
point := &Point{
NodeID: body.NodeID,
DeviceID: body.DeviceID,
PointID: p.PointID,
PointName: p.PointName,
ResourceName: p.ResourceName,
ValueType: p.ValueType,
AccessMode: p.AccessMode,
Unit: p.Unit,
Minimum: p.Minimum,
Maximum: p.Maximum,
Address: p.Address,
DataType: p.DataType,
Scale: p.Scale,
Offset: p.Offset,
UpdatedAt: now,
}
existing, _ := h.pointService.GetByID(body.NodeID, body.DeviceID, p.PointID)
if existing == nil {
point.CreatedAt = now
_ = h.pointService.Save(point)
} else {
			// Update metadata fields only; do not overwrite CurrentValue / Quality / LastUpdated.
_ = h.pointService.UpdateMeta(point)
}
}
h.log.Infof("[PointMeta] synced %d point definitions for %s/%s",
len(body.Points), body.NodeID, body.DeviceID)
return nil
}
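// HandleRealtimeData handles real-time data reports (edgex/data/{node_id}/{device_id}).
//
// Two-stage merge strategy:
// - First report (device has no cache): body.points is the full thing-model data and is
//   written to the cache in full. Points not yet registered through point_report get a
//   placeholder record (point_id doubles as point_name).
// - Later reports (device already cached): body.points holds only changed points and is
//   merged; points absent from this payload keep their previous values.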
func (h *PointHandler) HandleRealtimeData(topic string, msg *Message) error {
var body DataBody
if err := decodeBody(msg.Body, &body); err != nil {
return fmt.Errorf("decode data: %w", err)
}
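	// Fallback: derive node_id / device_id from the topic when the body omits them.
	// MQTT topics separate levels with "/", NATS subjects with ".".
	nodeID, deviceID := body.NodeID, body.DeviceID
	if nodeID == "" || deviceID == "" {
		sep := "/"
		if !strings.Contains(topic, "/") {
			sep = "."
		}
		if parts := strings.Split(topic, sep); len(parts) >= 4 {
			nodeID, deviceID = parts[2], parts[3]
		}
	}
	if nodeID == "" || deviceID == "" {
		return fmt.Errorf("data message on %s carries no node_id/device_id", topic)
	}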
isFirstReport := !h.pointService.HasCache(nodeID, deviceID)
now := time.Now().UnixMilli()
for pointID, value := range body.Points {
existing, _ := h.pointService.GetByID(nodeID, deviceID, pointID)
if existing == nil {
			// Create a placeholder point automatically (metadata is filled in later by point_report).
placeholder := &Point{
NodeID: nodeID,
DeviceID: deviceID,
PointID: pointID,
				PointName: pointID, // use point_id as the name for now
CurrentValue: value,
Quality: body.Quality,
LastUpdated: body.Timestamp,
CreatedAt: now,
UpdatedAt: now,
}
_ = h.pointService.Save(placeholder)
} else {
			// Merge: update the current value only and leave the metadata untouched.
_ = h.pointService.UpdateValue(nodeID, deviceID, pointID, value, body.Quality, body.Timestamp)
}
}
if isFirstReport {
h.log.Infof("[PointSync] full snapshot: %d points for %s/%s",
len(body.Points), nodeID, deviceID)
} else {
h.log.Debugf("[PointSync] delta merge: %d changed points for %s/%s",
len(body.Points), nodeID, deviceID)
}
	// Broadcast the real-time event to the frontend.
	// The payload holds only the points in this report (the delta); the frontend store runs the same merge.
h.broadcast(RealtimeDataEvent{
NodeID: nodeID,
DeviceID: deviceID,
Points: body.Points,
Timestamp: body.Timestamp,
Quality: body.Quality,
IsFullSnapshot: isFirstReport,
})
return nil
}
5.6 PointService interface
// internal/services/point_service.go
type PointService interface {
	// HasCache reports whether the device already has a point cache
	// (used to distinguish a full report from a delta merge).
	HasCache(nodeID, deviceID string) bool
	GetByID(nodeID, deviceID, pointID string) (*Point, error)
	Save(p *Point) error
	// UpdateMeta updates point metadata only; it does not overwrite CurrentValue/Quality/LastUpdated.
	UpdateMeta(p *Point) error
	// UpdateValue is the smallest unit of a delta merge: it updates the current value, quality and timestamp.
	UpdateValue(nodeID, deviceID, pointID string, value interface{}, quality string, ts int64) error
	ListByDevice(nodeID, deviceID string) ([]*Point, error)
}
5.7 Point model
// internal/models/point.go
type Point struct {
	NodeID       string      `json:"node_id"`
	DeviceID     string      `json:"device_id"`
	PointID      string      `json:"point_id"`
	PointName    string      `json:"point_name"` // from point_report; defaults to point_id
	ResourceName string      `json:"resource_name"`
	ValueType    string      `json:"value_type"`  // Float32 / Int16 / Bool / ... (metadata)
	AccessMode   string      `json:"access_mode"` // R / W / RW (metadata; empty when unknown)
	Unit         string      `json:"unit"`
	Minimum      interface{} `json:"minimum,omitempty"`
	Maximum      interface{} `json:"maximum,omitempty"`
	Address      string      `json:"address"`
	DataType     string      `json:"data_type"`
	Scale        float64     `json:"scale"`
	Offset       float64     `json:"offset"`
	CurrentValue interface{} `json:"current_value"` // latest acquired value (driven by real-time data)
	Quality      string      `json:"quality"`       // Good / Bad / Uncertain
	LastUpdated  int64       `json:"last_updated"`  // device timestamp of the latest value
	CreatedAt    int64       `json:"created_at"`
	UpdatedAt    int64       `json:"updated_at"` // last metadata modification time
}
// RealtimeDataEvent is the real-time data event broadcast to the frontend.
type RealtimeDataEvent struct {
	NodeID         string                 `json:"node_id"`
	DeviceID       string                 `json:"device_id"`
	Points         map[string]interface{} `json:"points"` // points in this report (full or delta)
	Timestamp      int64                  `json:"timestamp"`
	Quality        string                 `json:"quality"`
	IsFullSnapshot bool                   `json:"is_full_snapshot"` // true = full snapshot; the frontend can use it to initialize its view
}
5.8 Full vs. delta merge illustration
Device: Room_FC_2014_19, thing-model cache state
T1: full report received (IsFullSnapshot=true):
payload.points = 9 points
All 9 are written to the cache:
SetPoint.Value=19 Setpoint.1=19 Setpoint.2=19 Setpoint.3=19
State.Chiller=0 State.Heater=1
Temperature.Indoor=18.8 Temperature.Outdoor=12 Temperature.Water=39.7
T2: delta report received (IsFullSnapshot=false):
payload.points = { SetPoint.Value:20, Setpoint.1:20, Setpoint.2:20 }
Merge result (only 3 points are updated; the other 6 keep their T1 values):
SetPoint.Value=20 ← updated Setpoint.1=20 ← updated Setpoint.2=20 ← updated
Setpoint.3=19 ← T1 value State.Chiller=0 ← T1 value State.Heater=1 ← T1 value
Temperature.Indoor=18.8 ← T1 value Temperature.Outdoor=12 ← T1 value Temperature.Water=39.7 ← T1 value
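The T1/T2 walk-through can be reproduced with a small in-memory sketch. It is illustrative only: PointCache and Merge are hypothetical names, and the rule "a report is a full snapshot when the device has no cached points yet" is an assumption inferred from the HasCache comment, not the guide's actual implementation.

```go
package main

import "fmt"

// PointCache is a hypothetical in-memory stand-in for one device's point cache
// (point_id -> latest value).
type PointCache map[string]interface{}

// Merge applies one realtime report to the cache. It returns true when the
// report was treated as a full snapshot, which this sketch assumes happens
// only when the device has no cached points yet.
func Merge(cache PointCache, reported map[string]interface{}) bool {
	isFull := len(cache) == 0
	for id, v := range reported {
		cache[id] = v // points missing from the report keep their old value
	}
	return isFull
}

func main() {
	cache := PointCache{}

	// T1: full report with all 9 points.
	full := Merge(cache, map[string]interface{}{
		"SetPoint.Value": 19, "Setpoint.1": 19, "Setpoint.2": 19, "Setpoint.3": 19,
		"State.Chiller": 0, "State.Heater": 1,
		"Temperature.Indoor": 18.8, "Temperature.Outdoor": 12, "Temperature.Water": 39.7,
	})
	fmt.Println(full, len(cache)) // true 9

	// T2: delta report with 3 changed points.
	full = Merge(cache, map[string]interface{}{
		"SetPoint.Value": 20, "Setpoint.1": 20, "Setpoint.2": 20,
	})
	fmt.Println(full, cache["SetPoint.Value"], cache["Setpoint.3"], len(cache)) // false 20 19 9
}
```

Because a delta merge only overwrites reported keys, a point that disappears on the device would stay in the cache until the next full snapshot replaces it; whether that is acceptable depends on the deployment.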
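6. EdgeX sub-device bidirectional control
6.1 Functional flow
EdgeOS (initiates control) Message middleware EdgeX (executes)
│ │ │
│─── write_command ──────►│ │
│ edgex/cmd/{node}/{dev}/write │
│ │──────────────────────►│
│ │ EdgeX writes to the physical device │
│ │◄──────────────────────│
│◄─── command response ───│ edgex/responses/... │
│ │ │
│ │ │
│ (realtime status feedback path) │ │
│◄── data ───────────────│◄──────────────────────│
│ edgex/data/{node}/{dev} realtime data reflects the control result │
"Bidirectional" means:
- Downlink (EdgeOS → EdgeX): write commands, task control, configuration updates, device discovery
- Uplink (EdgeX → EdgeOS): command execution responses, realtime data (including read-after-write verification), alerts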
6.2 Downlink control topics
| Protocol | Topic | Message type | Description |
|---|---|---|---|
| MQTT | edgex/cmd/{node_id}/{device_id}/write | write_command | Write device points |
| MQTT | edgex/cmd/{node_id}/discover | discover_command | Trigger device discovery |
| MQTT | edgex/cmd/{node_id}/task/create | task_create | Create a collection task |
| MQTT | edgex/cmd/{node_id}/task/{task_id}/pause | task_control | Pause a task |
| MQTT | edgex/cmd/{node_id}/task/{task_id}/resume | task_control | Resume a task |
| MQTT | edgex/cmd/{node_id}/task/{task_id}/stop | task_control | Stop a task |
| MQTT | edgex/cmd/{node_id}/config/update | config_update | Update node configuration |
| NATS | edgex.cmd.{node_id}.{device_id}.write | write_command | Write device points |
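Since node, device, and task IDs are interpolated directly into topic names, it can help to validate them before publishing. The following is a minimal sketch, not part of the guide's code base: TaskControlTopic and validSegment are hypothetical helpers, and the rule that IDs must not contain "/", "+", or "#" is an assumption based on MQTT's level separator and wildcard characters.

```go
package main

import (
	"fmt"
	"strings"
)

// validSegment reports whether s can safely be used as a single MQTT topic
// level: non-empty and free of the level separator and wildcard characters.
func validSegment(s string) bool {
	return s != "" && !strings.ContainsAny(s, "/+#")
}

// TaskControlTopic builds edgex/cmd/{node_id}/task/{task_id}/{action} and
// rejects actions that are not listed in the downlink topic table.
func TaskControlTopic(nodeID, taskID, action string) (string, error) {
	switch action {
	case "pause", "resume", "stop":
	default:
		return "", fmt.Errorf("unsupported task action %q", action)
	}
	if !validSegment(nodeID) || !validSegment(taskID) {
		return "", fmt.Errorf("invalid topic segment: node=%q task=%q", nodeID, taskID)
	}
	return fmt.Sprintf("edgex/cmd/%s/task/%s/%s", nodeID, taskID, action), nil
}

func main() {
	topic, err := TaskControlTopic("edgex-node-001", "task-42", "pause")
	fmt.Println(topic, err)

	_, err = TaskControlTopic("edgex-node-001", "task-42", "delete")
	fmt.Println(err)
}
```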
6.3 Uplink response topics
| Protocol | Topic | Description |
|---|---|---|
| MQTT | edgex/responses/{node_id}/{request_id} | Command execution response |
| MQTT | edgex/responses/{node_id}/error/{request_id} | Error response |
| NATS | edgex.res.{node_id}.{request_id} | Command execution response |
6.4 Write command message
{
"header": {
"message_id": "msg-cmd-write-001",
"timestamp": 1744680000000,
"source": "edgeos-queen",
"destination": "edgex-node-001",
"message_type": "write_command",
"version": "1.0",
"correlation_id": "req-write-001"
},
"body": {
"request_id": "req-write-001",
"device_id": "device-001",
"timestamp": 1744680000000,
"points": {
"Switch": true,
"Setpoint": 80.5
},
"options": {
"confirm": true,
"timeout_seconds": 10
}
}
}
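For reference, the message above could be modelled with typed Go structs instead of nested maps. This is a sketch under assumptions: the type names and the NewWriteCommand constructor are hypothetical, and it assumes that destination carries the target node ID and that correlation_id mirrors request_id, as the sample suggests.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Header mirrors the "header" object of the sample message.
type Header struct {
	MessageID     string `json:"message_id"`
	Timestamp     int64  `json:"timestamp"`
	Source        string `json:"source"`
	Destination   string `json:"destination"`
	MessageType   string `json:"message_type"`
	Version       string `json:"version"`
	CorrelationID string `json:"correlation_id"`
}

// WriteOptions mirrors body.options.
type WriteOptions struct {
	Confirm        bool `json:"confirm"`
	TimeoutSeconds int  `json:"timeout_seconds"`
}

// WriteBody mirrors the "body" object of the sample message.
type WriteBody struct {
	RequestID string                 `json:"request_id"`
	DeviceID  string                 `json:"device_id"`
	Timestamp int64                  `json:"timestamp"`
	Points    map[string]interface{} `json:"points"`
	Options   WriteOptions           `json:"options"`
}

// WriteCommand is the full envelope.
type WriteCommand struct {
	Header Header    `json:"header"`
	Body   WriteBody `json:"body"`
}

// NewWriteCommand is a hypothetical constructor. IDs and the timestamp are
// passed in so the caller controls how they are generated.
func NewWriteCommand(messageID, requestID, source, nodeID, deviceID string,
	tsMillis int64, points map[string]interface{}, timeoutSec int) WriteCommand {
	return WriteCommand{
		Header: Header{
			MessageID: messageID, Timestamp: tsMillis, Source: source,
			Destination: nodeID, MessageType: "write_command", Version: "1.0",
			CorrelationID: requestID, // assumed to mirror request_id
		},
		Body: WriteBody{
			RequestID: requestID, DeviceID: deviceID, Timestamp: tsMillis,
			Points:  points,
			Options: WriteOptions{Confirm: true, TimeoutSeconds: timeoutSec},
		},
	}
}

func main() {
	cmd := NewWriteCommand("msg-cmd-write-001", "req-write-001", "edgeos-queen",
		"edgex-node-001", "device-001", 1744680000000,
		map[string]interface{}{"Switch": true, "Setpoint": 80.5}, 10)
	out, err := json.MarshalIndent(cmd, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```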
6.5 Control service implementation
// internal/services/control_service.go
package services
import (
	"fmt"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
)
type ControlService struct {
	publisher Publisher
	pendingReqs sync.Map // requestID -> chan *CommandResponse
	log *logrus.Logger
}
// WritePoints writes device points to EdgeX and blocks until the matching
// response arrives or the timeout expires.
func (s *ControlService) WritePoints(nodeID, deviceID string, points map[string]interface{}, timeout time.Duration) (*CommandResponse, error) {
	reqID := generateUUID()
	// Buffered so the response handler never blocks on a waiter that has already timed out.
	respCh := make(chan *CommandResponse, 1)
	s.pendingReqs.Store(reqID, respCh)
	defer s.pendingReqs.Delete(reqID)
	body := map[string]interface{}{
		"request_id": reqID,
		"device_id": deviceID,
		"timestamp": time.Now().UnixMilli(),
		"points": points,
		"options": map[string]interface{}{
			"confirm": true,
			"timeout_seconds": int(timeout.Seconds()),
		},
	}
	topic := fmt.Sprintf("edgex/cmd/%s/%s/write", nodeID, deviceID)
	if err := s.publisher.Publish(topic, "write_command", body); err != nil {
		return nil, fmt.Errorf("publish write_command: %w", err)
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer promptly when the response arrives first
	select {
	case resp := <-respCh:
		return resp, nil
	case <-timer.C:
		return nil, fmt.Errorf("write command timeout after %v", timeout)
	}
}
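// HandleCommandResponse handles a command response from EdgeX and hands it to
// the WritePoints call waiting on the same request_id, if any.
func (s *ControlService) HandleCommandResponse(topic string, msg *Message) error {
	var body CommandResponseBody
	if err := decodeBody(msg.Body, &body); err != nil {
		return err
	}
	ch, ok := s.pendingReqs.Load(body.RequestID)
	if !ok {
		// No waiter: the request has already timed out or was not issued by this instance.
		return nil
	}
	resp := &CommandResponse{
		RequestID: body.RequestID,
		Success: body.Success,
		Message: body.Message,
		Data: body.Data,
	}
	// Non-blocking send: a duplicate response must not stall the message router.
	select {
	case ch.(chan *CommandResponse) <- resp:
	default:
	}
	return nil
}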
// CreateTask creates a collection task.
func (s *ControlService) CreateTask(nodeID string, task *TaskCreateRequest) error {
	body := map[string]interface{}{
		"task_id": task.TaskID,
		"task_name": task.TaskName,
		"device_id": task.DeviceID,
		"schedule": task.Schedule,
		"points": task.Points,
		"options": task.Options,
	}
	topic := fmt.Sprintf("edgex/cmd/%s/task/create", nodeID)
	return s.publisher.Publish(topic, "task_create", body)
}
// ControlTask controls a task (pause/resume/stop).
func (s *ControlService) ControlTask(nodeID, taskID, action string) error {
	// The action becomes part of the topic, so reject anything outside the documented set.
	switch action {
	case "pause", "resume", "stop":
	default:
		return fmt.Errorf("unsupported task action %q", action)
	}
	body := map[string]interface{}{
		"task_id": taskID,
		"action": action,
	}
	topic := fmt.Sprintf("edgex/cmd/%s/task/%s/%s", nodeID, taskID, action)
	return s.publisher.Publish(topic, "task_control", body)
}
6.6 Control REST API
| Method | Path | Description |
|---|---|---|
| POST | /api/v1/nodes/:nodeId/devices/:deviceId/write | Write point values |
| POST | /api/v1/nodes/:nodeId/discover | Trigger device discovery |
| POST | /api/v1/nodes/:nodeId/tasks | Create a collection task |
| PUT | /api/v1/nodes/:nodeId/tasks/:taskId/pause | Pause a task |
| PUT | /api/v1/nodes/:nodeId/tasks/:taskId/resume | Resume a task |
| DELETE | /api/v1/nodes/:nodeId/tasks/:taskId | Stop and delete a task |
| POST | /api/v1/nodes/:nodeId/config | Update node configuration |
| POST | /api/edgex/discover | Stage 2: trigger full node discovery |
| POST | /api/edgex/discover/:middlewareId | Stage 2: trigger node discovery on the specified middleware |
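As an illustration of how the write endpoint might be wired up, here is a standard-library sketch. Everything in it is an assumption: the guide does not specify the web framework, the request body shape (writeRequest), or the status codes, and the write callback merely stands in for a call such as ControlService.WritePoints.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// writeRequest is a hypothetical request body for the write endpoint.
type writeRequest struct {
	Points map[string]interface{} `json:"points"`
}

// parseWritePath extracts nodeId and deviceId from
// /api/v1/nodes/{nodeId}/devices/{deviceId}/write.
func parseWritePath(path string) (nodeID, deviceID string, ok bool) {
	p := strings.Split(strings.Trim(path, "/"), "/")
	if len(p) != 7 || p[0] != "api" || p[1] != "v1" || p[2] != "nodes" ||
		p[4] != "devices" || p[6] != "write" || p[3] == "" || p[5] == "" {
		return "", "", false
	}
	return p[3], p[5], true
}

// writeHandler validates the request and delegates to write, which stands in
// for the control service.
func writeHandler(write func(nodeID, deviceID string, points map[string]interface{}) error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		nodeID, deviceID, ok := parseWritePath(r.URL.Path)
		if !ok {
			http.NotFound(w, r)
			return
		}
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var req writeRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil || len(req.Points) == 0 {
			http.Error(w, "invalid body: points required", http.StatusBadRequest)
			return
		}
		if err := write(nodeID, deviceID, req.Points); err != nil {
			// Reported as 504 here purely for illustration.
			http.Error(w, err.Error(), http.StatusGatewayTimeout)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}

// call runs one request through the handler with a no-op writer and returns
// the HTTP status code.
func call(method, path, body string) int {
	h := writeHandler(func(string, string, map[string]interface{}) error { return nil })
	rec := httptest.NewRecorder()
	h(rec, httptest.NewRequest(method, path, strings.NewReader(body)))
	return rec.Code
}

func main() {
	fmt.Println(call("POST", "/api/v1/nodes/n1/devices/d1/write", `{"points":{"Switch":true}}`))
	fmt.Println(call("POST", "/api/v1/nodes/n1/devices/d1/write", `{}`))
}
```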
7. Heartbeat and status management
7.1 Subscribed topics
| Protocol | Topic | QoS | Description |
|---|---|---|---|
| MQTT | edgex/nodes/+/heartbeat | 0 | Node heartbeat |
| MQTT | edgex/nodes/+/status | 1 | Node status change |
| MQTT | edgex/nodes/+/online | 2 | Node online |
| MQTT | edgex/nodes/+/offline | 2 | Node offline |
| NATS | edgex.nodes.heartbeat.> | - | Node heartbeat |
| NATS | edgex.nodes.status.> | - | Node status |
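The MQTT filters above rely on the single-level wildcard "+", and other parts of this guide use the multi-level wildcard "#". The sketch below shows how such filters match concrete topics; MatchTopic is a hypothetical helper for unit-testing routing rules, not a function of any MQTT client library, and it ignores special cases such as topics starting with "$".

```go
package main

import (
	"fmt"
	"strings"
)

// MatchTopic reports whether an MQTT topic filter matches a concrete topic.
// "+" matches exactly one level; "#" matches the remaining levels and is only
// honoured as the last level of the filter.
func MatchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, seg := range f {
		if seg == "#" {
			return i == len(f)-1
		}
		if i >= len(t) {
			return false
		}
		if seg != "+" && seg != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	fmt.Println(MatchTopic("edgex/nodes/+/heartbeat", "edgex/nodes/edgex-node-001/heartbeat")) // true
	fmt.Println(MatchTopic("edgex/nodes/+/heartbeat", "edgex/nodes/edgex-node-001/status"))    // false
	fmt.Println(MatchTopic("edgex/data/#", "edgex/data/edgex-node-001/dev-001"))               // true
}
```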
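7.2 Automatic offline on heartbeat timeout
// internal/services/heartbeat_service.go
package services
import (
	"context"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
)
// HeartbeatTimeout is how long a node may stay silent before it is marked offline.
const HeartbeatTimeout = 3 * time.Minute
type HeartbeatService struct {
	nodeService NodeService
	lastSeen sync.Map // nodeID -> int64(UnixMilli)
	log *logrus.Logger
}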
func (s *HeartbeatService) HandleHeartbeat(_ string, msg *Message) error {
var body HeartbeatBody
if err := decodeBody(msg.Body, &body); err != nil {
return err
}
s.lastSeen.Store(body.NodeID, time.Now().UnixMilli())
return s.nodeService.UpdateHeartbeat(body.NodeID, body.Status, body.Metrics)
}
// StartWatchdog periodically checks for heartbeat timeouts; it blocks until ctx is cancelled.
func (s *HeartbeatService) StartWatchdog(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.checkTimeouts()
}
}
}
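func (s *HeartbeatService) checkTimeouts() {
	now := time.Now().UnixMilli()
	s.lastSeen.Range(func(key, value interface{}) bool {
		nodeID := key.(string)
		lastTS := value.(int64)
		if now-lastTS > HeartbeatTimeout.Milliseconds() {
			s.log.Warnf("[Heartbeat] node %s timeout, marking offline", nodeID)
			if err := s.nodeService.MarkOffline(nodeID); err != nil {
				s.log.Errorf("[Heartbeat] mark node %s offline: %v", nodeID, err)
				return true // keep the entry so the next tick retries
			}
			// Drop the entry so the node is not marked offline again on every tick;
			// the next heartbeat re-creates it.
			s.lastSeen.Delete(nodeID)
		}
		return true
	})
}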
8. Alert and event handling
8.1 Subscribed topics
| Protocol | Topic | QoS | Description |
|---|---|---|---|
| MQTT | edgex/events/alert | 2 | Alert |
| MQTT | edgex/events/error | 1 | Error |
| MQTT | edgex/events/info | 0 | Info |
| NATS | edgex.events.alert | - | Alert |
8.2 Handler
// internal/handlers/event_handler.go
func (h *EventHandler) HandleAlert(_ string, msg *Message) error {
	var body AlertBody
	if err := decodeBody(msg.Body, &body); err != nil {
		return err
	}
	alert := &Alert{
		AlertID: body.AlertID,
		NodeID: body.NodeID,
		DeviceID: body.DeviceID,
		Type: body.AlertType,
		Severity: body.Severity,
		Message: body.Message,
		Timestamp: body.Timestamp,
		Details: body.Details,
	}
	saveErr := h.alertService.Save(alert)
	// Push high-priority alerts to the frontend in real time, even if persisting failed.
	if body.Severity == "critical" || body.Severity == "error" {
		h.broadcast(AlertEvent{Alert: alert})
	}
	// Surface the storage error instead of silently discarding it.
	return saveErr
}
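9. Error handling and retry strategy
9.1 Message-handling error classification
| Error type | Handling |
|---|---|
| JSON parse failure | Log, drop the message, do not retry |
| Node not registered | Log, drop the message, wait for registration |
| Storage failure (recoverable) | Retry with exponential backoff, at most 3 attempts |
| Network/connection error | MQTT automatic reconnect / NATS automatic reconnect |
| Command timeout | Return a timeout error; the frontend prompts the user to retry |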
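One way to apply this classification in code is to map errors to an action using sentinel errors and errors.Is. The sketch below is an assumption-laden illustration: the sentinel errors, the Action type, and Classify are hypothetical names that do not appear elsewhere in this guide.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors for the rows of the classification table.
var (
	ErrMalformed        = errors.New("malformed message")         // JSON parse failure
	ErrNodeUnregistered = errors.New("node not registered")       // wait for registration
	ErrStorageTransient = errors.New("transient storage failure") // recoverable
)

// Action is what the router should do with a message after handling it.
type Action int

const (
	Ack   Action = iota // handled successfully
	Drop                // log and discard, no retry
	Retry               // retry with exponential backoff
)

// Classify maps a handler error to an action. Only recoverable storage
// failures are retried; every other error is logged and dropped.
func Classify(err error) Action {
	switch {
	case err == nil:
		return Ack
	case errors.Is(err, ErrStorageTransient):
		return Retry
	default:
		return Drop
	}
}

func main() {
	wrapped := fmt.Errorf("save point: %w", ErrStorageTransient)
	fmt.Println(Classify(nil) == Ack, Classify(wrapped) == Retry, Classify(ErrMalformed) == Drop)
}
```

Using errors.Is means handlers can wrap the sentinel with context via %w without breaking the classification.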
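9.2 Exponential backoff implementation
// internal/utils/retry.go
package utils
import (
	"errors"
	"time"
)
type RetryConfig struct {
	MaxAttempts int
	InitialInterval time.Duration
	MaxInterval time.Duration
	Multiplier float64
}
// RetryWithBackoff calls fn up to cfg.MaxAttempts times. Between attempts it
// sleeps for an interval that starts at InitialInterval, is multiplied by
// Multiplier after each failure, and is capped at MaxInterval. It returns nil
// on the first success, or the last error once all attempts are used up.
func RetryWithBackoff(cfg RetryConfig, fn func() error) error {
	if cfg.MaxAttempts <= 0 {
		// Without this guard fn would never run and the caller would see success.
		return errors.New("retry: MaxAttempts must be positive")
	}
	interval := cfg.InitialInterval
	var err error
	for i := 0; i < cfg.MaxAttempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i == cfg.MaxAttempts-1 {
			break // no sleep after the final attempt
		}
		time.Sleep(interval)
		interval = time.Duration(float64(interval) * cfg.Multiplier)
		if interval > cfg.MaxInterval {
			interval = cfg.MaxInterval
		}
	}
	return err
}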
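To make the backoff arithmetic concrete, the following sketch computes the sequence of sleep intervals that a given RetryConfig produces, without actually sleeping. Waits is a hypothetical helper written for this illustration; RetryConfig is repeated so the example is self-contained, and the configuration values are arbitrary.

```go
package main

import (
	"fmt"
	"time"
)

// RetryConfig is repeated here so the example compiles on its own.
type RetryConfig struct {
	MaxAttempts     int
	InitialInterval time.Duration
	MaxInterval     time.Duration
	Multiplier      float64
}

// Waits returns the sleep durations between attempts. There are
// MaxAttempts-1 of them because no sleep follows the final attempt.
func Waits(cfg RetryConfig) []time.Duration {
	var waits []time.Duration
	interval := cfg.InitialInterval
	for i := 0; i < cfg.MaxAttempts-1; i++ {
		waits = append(waits, interval)
		interval = time.Duration(float64(interval) * cfg.Multiplier)
		if interval > cfg.MaxInterval {
			interval = cfg.MaxInterval
		}
	}
	return waits
}

func main() {
	cfg := RetryConfig{
		MaxAttempts:     5,
		InitialInterval: 200 * time.Millisecond,
		MaxInterval:     time.Second,
		Multiplier:      2,
	}
	fmt.Println(Waits(cfg)) // [200ms 400ms 800ms 1s]
}
```

With the table's limit of 3 attempts, the same configuration would sleep only twice, for 200ms and 400ms.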
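9.3 Error code reference
| Error code | Description | Suggested handling |
|---|---|---|
| E001 | Malformed message | Check the JSON format and fields |
| E002 | Unsupported message type | Check the message_type field |
| E003 | Node not registered | Wait for the node to complete registration first |
| E004 | Device does not exist | Wait for device list sync to complete |
| E005 | Authentication failed | Check the access_token / credentials |
| E006 | Insufficient permissions | Check the ACL configuration |
| E007 | Timeout | Retry or increase the timeout |
| E008 | Duplicate message | Deduplicate by message_id |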
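For E008, deduplication by message_id could be implemented with a small time-bounded cache. The sketch below is one possible approach under assumptions: Deduper is a hypothetical type, the 60-second window is arbitrary, and the linear eviction scan is only suitable for modest message rates.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper remembers message_id values for a fixed window (in milliseconds).
type Deduper struct {
	mu    sync.Mutex
	ttlMs int64
	seen  map[string]int64 // message_id -> first-seen time (Unix ms)
}

// NewDeduper creates a Deduper that forgets IDs older than ttlMs.
func NewDeduper(ttlMs int64) *Deduper {
	return &Deduper{ttlMs: ttlMs, seen: make(map[string]int64)}
}

// Seen records messageID and reports whether it was already observed within
// the window. nowMs is passed in so the logic is easy to test.
func (d *Deduper) Seen(messageID string, nowMs int64) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	// Evict expired entries so the map does not grow without bound.
	for id, ts := range d.seen {
		if nowMs-ts > d.ttlMs {
			delete(d.seen, id)
		}
	}
	if _, dup := d.seen[messageID]; dup {
		return true
	}
	d.seen[messageID] = nowMs
	return false
}

func main() {
	d := NewDeduper(60_000)
	fmt.Println(d.Seen("msg-1", 0))       // false: first delivery
	fmt.Println(d.Seen("msg-1", 1_000))   // true: duplicate inside the window
	fmt.Println(d.Seen("msg-1", 120_000)) // false: the earlier entry expired
}
```

This matters most for QoS 1 topics, where the broker may redeliver a message that was already processed.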
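10. Security and authentication
10.1 MQTT security configuration
mqtt:
  tls_enabled: true
  ca_cert: "/etc/edgeos/certs/ca.crt"
  client_cert: "/etc/edgeos/certs/client.crt"
  client_key: "/etc/edgeos/certs/client.key"
  # ACL: only the edgeos user may subscribe to edgex/#
10.2 NATS security configuration
nats:
  tls_enabled: true
  token: "your-auth-token"
  # Alternatively, authenticate with a username and password instead of the token
  username: "edgeos"
  password: "secure-password"
10.3 Node access_token
After a node registers successfully, EdgeOS returns an access_token. EdgeX should carry this token in the header of every subsequent message, and EdgeOS validates it when handling each message: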
func (h *BaseHandler) ValidateToken(msg *Message) error {
token := msg.Header.Token
if token == "" {
return errors.New("missing access_token")
}
return h.authService.ValidateNodeToken(msg.Header.Source, token)
}
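The guide does not show what ValidateNodeToken does internally. As a hedged illustration, an in-memory version might look like the sketch below; TokenStore, Issue, and the error values are hypothetical, and a real deployment would likely persist tokens and add expiry. The constant-time comparison avoids leaking how many leading characters of a guess were correct.

```go
package main

import (
	"crypto/subtle"
	"errors"
	"fmt"
	"sync"
)

var (
	ErrUnknownNode = errors.New("node has no issued token")
	ErrBadToken    = errors.New("invalid access_token")
)

// TokenStore is a hypothetical in-memory registry of issued node tokens.
type TokenStore struct {
	mu     sync.RWMutex
	tokens map[string]string // nodeID -> access_token
}

// NewTokenStore creates an empty store.
func NewTokenStore() *TokenStore {
	return &TokenStore{tokens: make(map[string]string)}
}

// Issue records the token returned to a node at registration time.
func (s *TokenStore) Issue(nodeID, token string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tokens[nodeID] = token
}

// ValidateNodeToken checks the token presented by nodeID.
func (s *TokenStore) ValidateNodeToken(nodeID, token string) error {
	s.mu.RLock()
	want, ok := s.tokens[nodeID]
	s.mu.RUnlock()
	if !ok {
		return ErrUnknownNode
	}
	if subtle.ConstantTimeCompare([]byte(want), []byte(token)) != 1 {
		return ErrBadToken
	}
	return nil
}

func main() {
	store := NewTokenStore()
	store.Issue("edgex-test-001", "secret-token")
	fmt.Println(store.ValidateNodeToken("edgex-test-001", "secret-token"))
	fmt.Println(store.ValidateNodeToken("edgex-test-001", "wrong"))
	fmt.Println(store.ValidateNodeToken("edgex-other", "secret-token"))
}
```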
11. Testing and troubleshooting
11.1 Starting a local test environment
# Start an MQTT broker (EMQX)
docker run -d --name emqx -p 1883:1883 -p 18083:18083 emqx/emqx:latest
# Start a NATS server (with JetStream)
docker run -d --name nats -p 4222:4222 -p 8222:8222 nats -js
11.2 Simulating EdgeX messages
# 1. Simulate node registration
mosquitto_pub -h 127.0.0.1 -p 1883 -t "edgex/nodes/register" -m '{
"header":{"message_id":"test-reg-001","timestamp":1744680000000,"source":"edgex-test-001","message_type":"node_register","version":"1.0"},
"body":{"node_id":"edgex-test-001","node_name":"Test Gateway","model":"edge-gateway","version":"1.0.0","api_version":"v1","capabilities":["shadow-sync","heartbeat"],"protocol":"edgeOS(MQTT)"}
}'
# 2. Simulate a device report
mosquitto_pub -h 127.0.0.1 -p 1883 -t "edgex/devices/report" -m '{
"header":{"message_id":"test-dev-001","source":"edgex-test-001","message_type":"device_report","version":"1.0","timestamp":1744680000000},
"body":{"node_id":"edgex-test-001","devices":[{"device_id":"dev-001","device_name":"Modbus device","device_profile":"modbus-tcp","admin_state":"ENABLED","operating_state":"ENABLED"}]}
}'
# 3. Simulate a point report
mosquitto_pub -h 127.0.0.1 -p 1883 -t "edgex/points/report" -m '{
"header":{"message_id":"test-pt-001","source":"edgex-test-001","message_type":"point_report","version":"1.0","timestamp":1744680000000},
"body":{"node_id":"edgex-test-001","device_id":"dev-001","points":[{"point_id":"Temperature","point_name":"Temperature","value_type":"Float32","access_mode":"R","unit":"°C"}]}
}'
# 4. Simulate realtime data
mosquitto_pub -h 127.0.0.1 -p 1883 -t "edgex/data/edgex-test-001/dev-001" -m '{
"header":{"message_id":"test-data-001","source":"edgex-test-001","message_type":"data","version":"1.0","timestamp":1744680000000},
"body":{"node_id":"edgex-test-001","device_id":"dev-001","timestamp":1744680000000,"points":{"Temperature":25.5,"Humidity":65.2},"quality":"good"}
}'
# 5. Subscribe to the control commands sent by EdgeOS (verifies bidirectional control)
mosquitto_sub -h 127.0.0.1 -p 1883 -t "edgex/cmd/#" -v
11.3 NATS testing
# Subscribe to all edgex messages
nats sub "edgex.>"
# Publish a node registration
nats pub "edgex.nodes.register" '{"header":{"message_id":"test-001","timestamp":1744680000000,"source":"edgex-test","message_type":"node_register","version":"1.0"},"body":{"node_id":"edgex-test","node_name":"Test Node","protocol":"edgeOS(NATS)"}}'
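11.4 Common troubleshooting
| Problem | Troubleshooting steps |
|---|---|
| Connection fails | Check the broker/url address and port, check the username and password, run telnet host port |
| Messages not received | Check that the subscribed topic is correct, check the QoS, confirm the message was published successfully |
| No registration response | Check the response topic subscription, check that correlation_id matches |
| Points not updating | Confirm that edgex/data/# is subscribed, check the data message format |
| Control command has no effect | Check the downlink topic spelling, check that EdgeX is online, inspect the response topic logs |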
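12. Project structure reference
internal/
├── middleware/ # Message bus layer
│ ├── manager.go # Connection manager
│ ├── mqtt_client.go # MQTT client
│ ├── nats_client.go # NATS client
│ ├── message.go # Common message structures
│ └── router.go # Message router
├── handlers/ # Message handlers
│ ├── node_handler.go # Node registration handling
│ ├── device_handler.go # Device sync handling
│ ├── point_handler.go # Point sync (i.e., thing-model point reporting; realtime data points can be thought of as the thing model) + realtime data
│ ├── control_handler.go # Control command response handling
│ └── event_handler.go # Alert and event handling
├── services/ # Business logic layer
│ ├── node_service.go
│ ├── device_service.go
│ ├── point_service.go
│ ├── control_service.go # Builds and sends downlink control commands
│ └── heartbeat_service.go
├── models/ # Data models
│ ├── middleware.go # Middleware configuration model
│ ├── node.go
│ ├── device.go
│ └── point.go
└── api/ # REST API routes
├── middleware_api.go # Middleware CRUD endpoints
├── node_api.go
├── device_api.go
├── point_api.go
└── control_api.go # Control command endpoints
config/
├── config.yaml # Main configuration (including middleware defaults)
└── config.dev.yaml
cmd/
└── main.go # Entry point: loads configuration and initializes the connection manager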
Document version: v2.1
Last updated: 2026-04-17
Maintainer: edgeOS team