Skip to the content.

南向采集数据通道质量优化

保持设备级在线/离线裁决逻辑不变(仍在 node_status.go)

新增通道级通信质量裁决

为通道引入超时、重连间隔、指数退避机制

输出工业级通信状态等级(Excellent / Good / Fair / Poor / Offline)

一、总体设计原则 层级 状态来源 是否修改现有逻辑 设备(Device) internal/core/node_status.go(成功率裁决) ❌ 不改 通道(Channel) 汇总设备状态 + 通信失败趋势 ✅ 新增,不破坏

👉 所有设备状态继续由你现有的状态机裁决;通道状态仅做“二次裁决”,不干预设备逻辑。

二、通道状态模型(最小新增结构)

在 internal/core 下新增一个结构(可以放在 channel_status.go 或当前文件顶部):

type ChannelCommLevel string

const ( ChannelLevelExcellent ChannelCommLevel = “Excellent” ChannelLevelGood ChannelCommLevel = “Good” ChannelLevelFair ChannelCommLevel = “Fair” ChannelLevelPoor ChannelCommLevel = “Poor” ChannelLevelOffline ChannelCommLevel = “Offline” )

type ChannelRuntime struct { State ChannelCommLevel json:"state" FailCount int json:"fail_count" LastFailTime time.Time json:"last_fail_time" NextRetryTime time.Time json:"next_retry_time" RetryInterval time.Duration json:"retry_interval" }

并在 ChannelManager 中新增一个 map:

channelRuntime map[string]*ChannelRuntime // channel.id -> runtime

在 NewChannelManager() 中初始化:

channelRuntime: make(map[string]*ChannelRuntime),

三、通道级裁决规则(工业标准化) 1️⃣ 裁决触发条件

当满足以下条件时,启动通道裁决:

一个通道下所有设备都为 Offline 状态

你已经在 GetChannelStats() 里统计了 OnlineCount / OfflineCount,这里可以直接复用。

2️⃣ 通道通信等级算法(推荐工业标准) 条件 通道等级 所有设备在线 Excellent ≥80% 设备在线 Good 30%–79% 在线 Fair <30% 在线 Poor 所有设备离线 Offline

实现函数(最小侵入):

func calcChannelLevel(online, total int) ChannelCommLevel { if total == 0 { return ChannelLevelOffline } if online == total { return ChannelLevelExcellent } ratio := float64(online) / float64(total) switch { case ratio >= 0.8: return ChannelLevelGood case ratio >= 0.3: return ChannelLevelFair case ratio > 0: return ChannelLevelPoor default: return ChannelLevelOffline } }

四、为通道引入超时 + 指数退避重连机制(最小侵入) 1️⃣ ChannelRuntime 初始化

在 AddChannel() 或 UpdateChannel() 中初始化:

cm.channelRuntime[ch.ID] = &ChannelRuntime{ State: ChannelLevelExcellent, RetryInterval: 5 * time.Second, // 初始重连间隔 }

2️⃣ 在 StartChannel 中接入通道级重连控制

修改 StartChannel() 中连接逻辑(最小替换):

err := d.Connect(cm.ctx) if err != nil { cm.onChannelConnectFail(ch.ID, err) return err } cm.onChannelConnectSuccess(ch.ID)

新增两个方法:

func (cm *ChannelManager) onChannelConnectFail(channelID string, err error) { cm.mu.Lock() defer cm.mu.Unlock()

rt := cm.channelRuntime[channelID]
if rt == nil {
	return
}
rt.FailCount++
rt.LastFailTime = time.Now()

// 指数退避:最大 5 分钟
if rt.RetryInterval == 0 {
	rt.RetryInterval = 5 * time.Second
} else {
	rt.RetryInterval = rt.RetryInterval * 2
	if rt.RetryInterval > 5*time.Minute {
		rt.RetryInterval = 5 * time.Minute
	}
}

rt.NextRetryTime = time.Now().Add(rt.RetryInterval)
rt.State = ChannelLevelPoor

zap.L().Warn("Channel connect failed, scheduling retry",
	zap.String("channel_id", channelID),
	zap.Duration("retry_in", rt.RetryInterval),
	zap.Error(err),
) }

func (cm *ChannelManager) onChannelConnectSuccess(channelID string) { cm.mu.Lock() defer cm.mu.Unlock()

rt := cm.channelRuntime[channelID]
if rt == nil {
	return
}
rt.FailCount = 0
rt.RetryInterval = 5 * time.Second
rt.State = ChannelLevelExcellent
rt.NextRetryTime = time.Time{} }

3️⃣ 在 StopChannel 或 deviceLoop 中引入重连调度

在通道启动失败时,可加一个 goroutine 自动重试:

go func(chID string) { for { time.Sleep(1 * time.Second)

	cm.mu.RLock()
	rt := cm.channelRuntime[chID]
	ch := cm.channels[chID]
	d := cm.drivers[chID]
	cm.mu.RUnlock()

	if rt == nil || ch == nil || d == nil {
		return
	}
	if time.Now().Before(rt.NextRetryTime) {
		continue
	}
	if !ch.Enable {
		return
	}

	if err := d.Connect(cm.ctx); err != nil {
		cm.onChannelConnectFail(chID, err)
		continue
	}
	cm.onChannelConnectSuccess(chID)
	zap.L().Info("Channel reconnected successfully", zap.String("channel_id", chID))
	return
} }(ch.ID)

👉 这是非侵入式增强,不破坏原有启动流程。

五、在 GetChannelStats 中输出通道通信等级

你当前:

Status: “Running”, // Simplification

替换为:

level := ChannelLevelOffline if rt, ok := cm.channelRuntime[ch.ID]; ok { level = rt.State }

stats = append(stats, ChannelStatus{ ID: ch.ID, Name: ch.Name, Protocol: ch.Protocol, Status: string(level), DeviceCount: len(ch.Devices), OnlineCount: online, OfflineCount: offline, })

同时,在每次统计前,可以动态更新通道等级:

if rt, ok := cm.channelRuntime[ch.ID]; ok { rt.State = calcChannelLevel(online, len(ch.Devices)) }

六、设备采集失败对通道的影响(不破坏现有逻辑)

你现有在设备失败时调用:

cm.stateManager.onCollectFail(node)

你可以 轻量增强,在设备失败且该通道所有设备 Offline 时,触发通道级失败计数:

if cm.isAllDevicesOffline(ch.ID) { cm.onChannelConnectFail(ch.ID, fmt.Errorf(“all devices offline”)) }

实现辅助函数:

func (cm *ChannelManager) isAllDevicesOffline(channelID string) bool { cm.mu.RLock() defer cm.mu.RUnlock()

ch, ok := cm.channels[channelID]
if !ok {
	return false
}
for _, dev := range ch.Devices {
	node := cm.stateManager.GetNode(dev.ID)
	if node != nil && node.Runtime.State == NodeStateOnline {
		return false
	}
}
return true }

七、最终效果总结 功能 是否支持 设备在线/离线判定 ✅ 原逻辑保留 通道通信质量等级 ✅ 新增(Good/ Bad / Poor / Offline) 通道超时控制 ✅ 支持 通道指数退避重连 ✅ 支持 UI 可展示通道通信健康度 ✅ 直接使用 ChannelStatus.Status

补充一个非常重要的场景 进行代码优化 避免部分设备离线导致全部离线不可用:

✅ 通道通信已建立(如 Modbus TCP 已连接),但尚未添加设备时,也应判定该通道通信质量为 Good(或 Excellent),而不是 Offline。

下面我基于你现有架构,给你一份 完整、工业级、最小侵入、向后兼容 的通信管道与通道状态管理方案。

✅ 一、目标设计总结 层级 状态来源 逻辑 设备状态 node_status.go 状态机 采集成功率裁决(不变) 通道连接状态 Driver Connect / Disconnect 新增 通道通信质量 设备状态 + 通道连接状态 + 失败趋势 新增 重连机制 通道级指数退避 新增

同时支持三种核心场景:

通道已连接 + 无设备 → 通信质量 = Good

通道已连接 + 有设备 + 多数在线 → Excellent / Good

通道已连接但设备全离线 → Poor / Offline

通道未连接 / 连接失败 → Offline

🧩 二、核心数据结构(最小新增) 1️⃣ 通道通信等级枚举 type ChannelCommLevel string

const ( ChannelLevelExcellent ChannelCommLevel = “Excellent” ChannelLevelGood ChannelCommLevel = “Good” ChannelLevelFair ChannelCommLevel = “Fair” ChannelLevelPoor ChannelCommLevel = “Poor” ChannelLevelOffline ChannelCommLevel = “Offline” )

2️⃣ 通道运行态结构 type ChannelRuntime struct { State ChannelCommLevel json:"state" Connected bool json:"connected" FailCount int json:"fail_count" LastFailTime time.Time json:"last_fail_time" NextRetryTime time.Time json:"next_retry_time" RetryInterval time.Duration json:"retry_interval" }

3️⃣ ChannelManager 增强字段 channelRuntime map[string]*ChannelRuntime // channel.id -> runtime

在 NewChannelManager() 中初始化:

channelRuntime: make(map[string]*ChannelRuntime),

🔌 三、通道连接生命周期管理 1️⃣ 在 AddChannel / UpdateChannel 时初始化运行态 cm.channelRuntime[ch.ID] = &ChannelRuntime{ State: ChannelLevelOffline, Connected: false, RetryInterval: 5 * time.Second, }

2️⃣ 在 StartChannel 中接入连接状态控制

将原逻辑:

err := d.Connect(cm.ctx) if err != nil { … }

替换为:

if err := d.Connect(cm.ctx); err != nil { cm.onChannelConnectFail(ch.ID, err) return err } cm.onChannelConnectSuccess(ch.ID)

3️⃣ 连接成功 / 失败回调方法 func (cm *ChannelManager) onChannelConnectSuccess(channelID string) { cm.mu.Lock() defer cm.mu.Unlock()

rt := cm.channelRuntime[channelID]
if rt == nil {
	return
}
rt.Connected = true
rt.FailCount = 0
rt.RetryInterval = 5 * time.Second
rt.NextRetryTime = time.Time{}
rt.State = ChannelLevelGood // 默认 Good(无设备场景) }

func (cm *ChannelManager) onChannelConnectFail(channelID string, err error) { cm.mu.Lock() defer cm.mu.Unlock()

rt := cm.channelRuntime[channelID]
if rt == nil {
	return
}
rt.Connected = false
rt.FailCount++
rt.LastFailTime = time.Now()

if rt.RetryInterval == 0 {
	rt.RetryInterval = 5 * time.Second
} else {
	rt.RetryInterval *= 2
	if rt.RetryInterval > 5*time.Minute {
		rt.RetryInterval = 5 * time.Minute
	}
}

rt.NextRetryTime = time.Now().Add(rt.RetryInterval)
rt.State = ChannelLevelOffline

zap.L().Warn("Channel connect failed, scheduling retry",
	zap.String("channel_id", channelID),
	zap.Duration("retry_in", rt.RetryInterval),
	zap.Error(err),
) }

4️⃣ 自动重连协程(指数退避)

在 StartChannel() 中,如果连接失败,启动自动重连协程:

go cm.channelReconnectLoop(ch.ID)

实现:

func (cm *ChannelManager) channelReconnectLoop(channelID string) { for { time.Sleep(1 * time.Second)

	cm.mu.RLock()
	rt := cm.channelRuntime[channelID]
	ch := cm.channels[channelID]
	d := cm.drivers[channelID]
	cm.mu.RUnlock()

	if rt == nil || ch == nil || d == nil {
		return
	}
	if !ch.Enable {
		return
	}
	if rt.Connected {
		return
	}
	if time.Now().Before(rt.NextRetryTime) {
		continue
	}

	if err := d.Connect(cm.ctx); err != nil {
		cm.onChannelConnectFail(channelID, err)
		continue
	}
	cm.onChannelConnectSuccess(channelID)
	zap.L().Info("Channel reconnected successfully", zap.String("channel_id", channelID))
	return
} }

📊 四、通道通信质量裁决算法(含“无设备已连接”场景)

核心裁决函数:

func calcChannelLevelWithConnection(connected bool, online, total int) ChannelCommLevel { if !connected { return ChannelLevelOffline } if total == 0 { // 无设备但通道已连接 → Good return ChannelLevelGood } if online == total { return ChannelLevelExcellent } ratio := float64(online) / float64(total) switch { case ratio >= 0.8: return ChannelLevelGood case ratio >= 0.3: return ChannelLevelFair case ratio > 0: return ChannelLevelPoor default: return ChannelLevelOffline } }

🧠 五、在 GetChannelStats 中动态裁决通道质量

替换你当前的:

Status: “Running”,

为:

rt := cm.channelRuntime[ch.ID] level := ChannelLevelOffline connected := false if rt != nil { connected = rt.Connected }

level = calcChannelLevelWithConnection(connected, online, len(ch.Devices))

if rt != nil { rt.State = level }

stats = append(stats, ChannelStatus{ ID: ch.ID, Name: ch.Name, Protocol: ch.Protocol, Status: string(level), DeviceCount: len(ch.Devices), OnlineCount: online, OfflineCount: offline, })

🔁 六、设备失败影响通道状态(保持最小侵入)

在 collectDevice() 中,在你已有:

cm.stateManager.onCollectFail(node)

后追加:

if cm.isAllDevicesOffline(ch.ID) { cm.onChannelConnectFail(ch.ID, fmt.Errorf(“all devices offline”)) }

辅助函数:

func (cm *ChannelManager) isAllDevicesOffline(channelID string) bool { cm.mu.RLock() defer cm.mu.RUnlock()

ch, ok := cm.channels[channelID]
if !ok {
	return false
}
for _, dev := range ch.Devices {
	node := cm.stateManager.GetNode(dev.ID)
	if node != nil && node.Runtime.State == NodeStateOnline {
		return false
	}
}
return true }

🧪 七、行为矩阵(你关心的场景全覆盖) 通道连接 设备数量 在线设备 通道状态 ❌ 未连接 任意 任意 Offline ✅ 已连接 0 0 Good ✅ ✅ 已连接 >0 全部在线 Excellent ✅ 已连接 >0 ≥80% 在线 Good ✅ 已连接 >0 30%–79% 在线 Fair ✅ 已连接 >0 <30% 在线 Poor ✅ 已连接 >0 全部离线 Offline 🎯 八、方案特点总结

✅ 不改设备状态机(node_status.go 原逻辑完全保留)

✅ 不破坏现有 ChannelManager API

✅ 引入通道连接态(Connected)和通信等级(State)

✅ 支持“无设备已连接”真实工业场景

✅ 支持通道级指数退避重连

✅ UI 可直接展示通道通信健康度