南向采集数据通道质量优化
保持设备级在线/离线裁决逻辑不变(仍在 node_status.go)
新增通道级通信质量裁决
为通道引入超时、重连间隔、指数退避机制
输出工业级通信状态等级(Excellent / Good / Fair / Poor / Offline)
一、总体设计原则 层级 状态来源 是否修改现有逻辑 设备(Device) internal/core/node_status.go(成功率裁决) ❌ 不改 通道(Channel) 汇总设备状态 + 通信失败趋势 ✅ 新增,不破坏
👉 所有设备状态继续由你现有的状态机裁决;通道状态仅做“二次裁决”,不干预设备逻辑。
二、通道状态模型(最小新增结构)
在 internal/core 下新增一个结构(可以放在 channel_status.go 或当前文件顶部):
type ChannelCommLevel string
const ( ChannelLevelExcellent ChannelCommLevel = “Excellent” ChannelLevelGood ChannelCommLevel = “Good” ChannelLevelFair ChannelCommLevel = “Fair” ChannelLevelPoor ChannelCommLevel = “Poor” ChannelLevelOffline ChannelCommLevel = “Offline” )
type ChannelRuntime struct {
State ChannelCommLevel json:"state"
FailCount int json:"fail_count"
LastFailTime time.Time json:"last_fail_time"
NextRetryTime time.Time json:"next_retry_time"
RetryInterval time.Duration json:"retry_interval"
}
并在 ChannelManager 中新增一个 map:
channelRuntime map[string]*ChannelRuntime // channel.id -> runtime
在 NewChannelManager() 中初始化:
channelRuntime: make(map[string]*ChannelRuntime),
三、通道级裁决规则(工业标准化) 1️⃣ 裁决触发条件
当满足以下条件时,启动通道裁决:
一个通道下所有设备都为 Offline 状态
你已经在 GetChannelStats() 里统计了 OnlineCount / OfflineCount,这里可以直接复用。
2️⃣ 通道通信等级算法(推荐工业标准) 条件 通道等级 所有设备在线 Excellent ≥80% 设备在线 Good 30%–79% 在线 Fair <30% 在线 Poor 所有设备离线 Offline
实现函数(最小侵入):
func calcChannelLevel(online, total int) ChannelCommLevel { if total == 0 { return ChannelLevelOffline } if online == total { return ChannelLevelExcellent } ratio := float64(online) / float64(total) switch { case ratio >= 0.8: return ChannelLevelGood case ratio >= 0.3: return ChannelLevelFair case ratio > 0: return ChannelLevelPoor default: return ChannelLevelOffline } }
四、为通道引入超时 + 指数退避重连机制(最小侵入) 1️⃣ ChannelRuntime 初始化
在 AddChannel() 或 UpdateChannel() 中初始化:
cm.channelRuntime[ch.ID] = &ChannelRuntime{ State: ChannelLevelExcellent, RetryInterval: 5 * time.Second, // 初始重连间隔 }
2️⃣ 在 StartChannel 中接入通道级重连控制
修改 StartChannel() 中连接逻辑(最小替换):
err := d.Connect(cm.ctx) if err != nil { cm.onChannelConnectFail(ch.ID, err) return err } cm.onChannelConnectSuccess(ch.ID)
新增两个方法:
func (cm *ChannelManager) onChannelConnectFail(channelID string, err error) { cm.mu.Lock() defer cm.mu.Unlock()
rt := cm.channelRuntime[channelID]
if rt == nil {
return
}
rt.FailCount++
rt.LastFailTime = time.Now()
// 指数退避:最大 5 分钟
if rt.RetryInterval == 0 {
rt.RetryInterval = 5 * time.Second
} else {
rt.RetryInterval = rt.RetryInterval * 2
if rt.RetryInterval > 5*time.Minute {
rt.RetryInterval = 5 * time.Minute
}
}
rt.NextRetryTime = time.Now().Add(rt.RetryInterval)
rt.State = ChannelLevelPoor
zap.L().Warn("Channel connect failed, scheduling retry",
zap.String("channel_id", channelID),
zap.Duration("retry_in", rt.RetryInterval),
zap.Error(err),
) }
func (cm *ChannelManager) onChannelConnectSuccess(channelID string) { cm.mu.Lock() defer cm.mu.Unlock()
rt := cm.channelRuntime[channelID]
if rt == nil {
return
}
rt.FailCount = 0
rt.RetryInterval = 5 * time.Second
rt.State = ChannelLevelExcellent
rt.NextRetryTime = time.Time{} }
3️⃣ 在 StopChannel 或 deviceLoop 中引入重连调度
在通道启动失败时,可加一个 goroutine 自动重试:
go func(chID string) { for { time.Sleep(1 * time.Second)
cm.mu.RLock()
rt := cm.channelRuntime[chID]
ch := cm.channels[chID]
d := cm.drivers[chID]
cm.mu.RUnlock()
if rt == nil || ch == nil || d == nil {
return
}
if time.Now().Before(rt.NextRetryTime) {
continue
}
if !ch.Enable {
return
}
if err := d.Connect(cm.ctx); err != nil {
cm.onChannelConnectFail(chID, err)
continue
}
cm.onChannelConnectSuccess(chID)
zap.L().Info("Channel reconnected successfully", zap.String("channel_id", chID))
return
} }(ch.ID)
👉 这是非侵入式增强,不破坏原有启动流程。
五、在 GetChannelStats 中输出通道通信等级
你当前:
Status: “Running”, // Simplification
替换为:
level := ChannelLevelOffline if rt, ok := cm.channelRuntime[ch.ID]; ok { level = rt.State }
stats = append(stats, ChannelStatus{ ID: ch.ID, Name: ch.Name, Protocol: ch.Protocol, Status: string(level), DeviceCount: len(ch.Devices), OnlineCount: online, OfflineCount: offline, })
同时,在每次统计前,可以动态更新通道等级:
if rt, ok := cm.channelRuntime[ch.ID]; ok { rt.State = calcChannelLevel(online, len(ch.Devices)) }
六、设备采集失败对通道的影响(不破坏现有逻辑)
你现有在设备失败时调用:
cm.stateManager.onCollectFail(node)
你可以 轻量增强,在设备失败且该通道所有设备 Offline 时,触发通道级失败计数:
if cm.isAllDevicesOffline(ch.ID) { cm.onChannelConnectFail(ch.ID, fmt.Errorf(“all devices offline”)) }
实现辅助函数:
func (cm *ChannelManager) isAllDevicesOffline(channelID string) bool { cm.mu.RLock() defer cm.mu.RUnlock()
ch, ok := cm.channels[channelID]
if !ok {
return false
}
for _, dev := range ch.Devices {
node := cm.stateManager.GetNode(dev.ID)
if node != nil && node.Runtime.State == NodeStateOnline {
return false
}
}
return true }
七、最终效果总结 功能 是否支持 设备在线/离线判定 ✅ 原逻辑保留 通道通信质量等级 ✅ 新增(Good/ Bad / Poor / Offline) 通道超时控制 ✅ 支持 通道指数退避重连 ✅ 支持 UI 可展示通道通信健康度 ✅ 直接使用 ChannelStatus.Status
补充一个非常重要的场景 进行代码优化 避免部分设备离线导致全部离线不可用:
✅ 通道通信已建立(如 Modbus TCP 已连接),但尚未添加设备时,也应判定该通道通信质量为 Good(或 Excellent),而不是 Offline。
下面我基于你现有架构,给你一份 完整、工业级、最小侵入、向后兼容 的通信管道与通道状态管理方案。
✅ 一、目标设计总结 层级 状态来源 逻辑 设备状态 node_status.go 状态机 采集成功率裁决(不变) 通道连接状态 Driver Connect / Disconnect 新增 通道通信质量 设备状态 + 通道连接状态 + 失败趋势 新增 重连机制 通道级指数退避 新增
同时支持三种核心场景:
通道已连接 + 无设备 → 通信质量 = Good
通道已连接 + 有设备 + 多数在线 → Excellent / Good
通道已连接但设备全离线 → Poor / Offline
通道未连接 / 连接失败 → Offline
🧩 二、核心数据结构(最小新增) 1️⃣ 通道通信等级枚举 type ChannelCommLevel string
const ( ChannelLevelExcellent ChannelCommLevel = “Excellent” ChannelLevelGood ChannelCommLevel = “Good” ChannelLevelFair ChannelCommLevel = “Fair” ChannelLevelPoor ChannelCommLevel = “Poor” ChannelLevelOffline ChannelCommLevel = “Offline” )
2️⃣ 通道运行态结构
type ChannelRuntime struct {
State ChannelCommLevel json:"state"
Connected bool json:"connected"
FailCount int json:"fail_count"
LastFailTime time.Time json:"last_fail_time"
NextRetryTime time.Time json:"next_retry_time"
RetryInterval time.Duration json:"retry_interval"
}
3️⃣ ChannelManager 增强字段 channelRuntime map[string]*ChannelRuntime // channel.id -> runtime
在 NewChannelManager() 中初始化:
channelRuntime: make(map[string]*ChannelRuntime),
🔌 三、通道连接生命周期管理 1️⃣ 在 AddChannel / UpdateChannel 时初始化运行态 cm.channelRuntime[ch.ID] = &ChannelRuntime{ State: ChannelLevelOffline, Connected: false, RetryInterval: 5 * time.Second, }
2️⃣ 在 StartChannel 中接入连接状态控制
将原逻辑:
err := d.Connect(cm.ctx) if err != nil { … }
替换为:
if err := d.Connect(cm.ctx); err != nil { cm.onChannelConnectFail(ch.ID, err) return err } cm.onChannelConnectSuccess(ch.ID)
3️⃣ 连接成功 / 失败回调方法 func (cm *ChannelManager) onChannelConnectSuccess(channelID string) { cm.mu.Lock() defer cm.mu.Unlock()
rt := cm.channelRuntime[channelID]
if rt == nil {
return
}
rt.Connected = true
rt.FailCount = 0
rt.RetryInterval = 5 * time.Second
rt.NextRetryTime = time.Time{}
rt.State = ChannelLevelGood // 默认 Good(无设备场景) }
func (cm *ChannelManager) onChannelConnectFail(channelID string, err error) { cm.mu.Lock() defer cm.mu.Unlock()
rt := cm.channelRuntime[channelID]
if rt == nil {
return
}
rt.Connected = false
rt.FailCount++
rt.LastFailTime = time.Now()
if rt.RetryInterval == 0 {
rt.RetryInterval = 5 * time.Second
} else {
rt.RetryInterval *= 2
if rt.RetryInterval > 5*time.Minute {
rt.RetryInterval = 5 * time.Minute
}
}
rt.NextRetryTime = time.Now().Add(rt.RetryInterval)
rt.State = ChannelLevelOffline
zap.L().Warn("Channel connect failed, scheduling retry",
zap.String("channel_id", channelID),
zap.Duration("retry_in", rt.RetryInterval),
zap.Error(err),
) }
4️⃣ 自动重连协程(指数退避)
在 StartChannel() 中,如果连接失败,启动自动重连协程:
go cm.channelReconnectLoop(ch.ID)
实现:
func (cm *ChannelManager) channelReconnectLoop(channelID string) { for { time.Sleep(1 * time.Second)
cm.mu.RLock()
rt := cm.channelRuntime[channelID]
ch := cm.channels[channelID]
d := cm.drivers[channelID]
cm.mu.RUnlock()
if rt == nil || ch == nil || d == nil {
return
}
if !ch.Enable {
return
}
if rt.Connected {
return
}
if time.Now().Before(rt.NextRetryTime) {
continue
}
if err := d.Connect(cm.ctx); err != nil {
cm.onChannelConnectFail(channelID, err)
continue
}
cm.onChannelConnectSuccess(channelID)
zap.L().Info("Channel reconnected successfully", zap.String("channel_id", channelID))
return
} }
📊 四、通道通信质量裁决算法(含“无设备已连接”场景)
核心裁决函数:
func calcChannelLevelWithConnection(connected bool, online, total int) ChannelCommLevel { if !connected { return ChannelLevelOffline } if total == 0 { // 无设备但通道已连接 → Good return ChannelLevelGood } if online == total { return ChannelLevelExcellent } ratio := float64(online) / float64(total) switch { case ratio >= 0.8: return ChannelLevelGood case ratio >= 0.3: return ChannelLevelFair case ratio > 0: return ChannelLevelPoor default: return ChannelLevelOffline } }
🧠 五、在 GetChannelStats 中动态裁决通道质量
替换你当前的:
Status: “Running”,
为:
rt := cm.channelRuntime[ch.ID] level := ChannelLevelOffline connected := false if rt != nil { connected = rt.Connected }
level = calcChannelLevelWithConnection(connected, online, len(ch.Devices))
if rt != nil { rt.State = level }
stats = append(stats, ChannelStatus{ ID: ch.ID, Name: ch.Name, Protocol: ch.Protocol, Status: string(level), DeviceCount: len(ch.Devices), OnlineCount: online, OfflineCount: offline, })
🔁 六、设备失败影响通道状态(保持最小侵入)
在 collectDevice() 中,在你已有:
cm.stateManager.onCollectFail(node)
后追加:
if cm.isAllDevicesOffline(ch.ID) { cm.onChannelConnectFail(ch.ID, fmt.Errorf(“all devices offline”)) }
辅助函数:
func (cm *ChannelManager) isAllDevicesOffline(channelID string) bool { cm.mu.RLock() defer cm.mu.RUnlock()
ch, ok := cm.channels[channelID]
if !ok {
return false
}
for _, dev := range ch.Devices {
node := cm.stateManager.GetNode(dev.ID)
if node != nil && node.Runtime.State == NodeStateOnline {
return false
}
}
return true }
🧪 七、行为矩阵(你关心的场景全覆盖) 通道连接 设备数量 在线设备 通道状态 ❌ 未连接 任意 任意 Offline ✅ 已连接 0 0 Good ✅ ✅ 已连接 >0 全部在线 Excellent ✅ 已连接 >0 ≥80% 在线 Good ✅ 已连接 >0 30%–79% 在线 Fair ✅ 已连接 >0 <30% 在线 Poor ✅ 已连接 >0 全部离线 Offline 🎯 八、方案特点总结
✅ 不改设备状态机(node_status.go 原逻辑完全保留)
✅ 不破坏现有 ChannelManager API
✅ 引入通道连接态(Connected)和通信等级(State)
✅ 支持“无设备已连接”真实工业场景
✅ 支持通道级指数退避重连
✅ UI 可直接展示通道通信健康度