EtherNet/IP 驱动真实通信实现方案
1. 产品概述
将EdgeX边缘网关的EtherNet/IP协议采集通道从模拟实现升级为基于go-ethernet-ip库的真实Allen-Bradley PLC通信,支持ControlLogix、CompactLogix、Micro800等系列PLC。
2. 核心功能
- EtherNet/IP真实通信: 基于github.com/anviod/ethernet-ip库实现与Allen-Bradley系列PLC的TCP通信
- 前端通道配置增强: 支持IP地址、端口、超时时间、重试次数、心跳间隔、缓冲区大小、QoS等级、连接类型(CIP Connection Type)、CPU插槽值
- 点位读写: 支持EtherNet/IP Tag地址格式的批量读取和单点写入
- 数据类型支持: bool, sint, int, dint, real, string, 数组类型
- 批量读取优化: 使用go-ethernet-ip的批量Tag读取减少网络往返
- 连接管理: 自动重连、心跳保活、健康状态检测、连接指标统计
3. 技术栈
- 后端: Go + github.com/anviod/ethernet-ip
- 前端: Vue 3 + Arco Design (现有技术栈)
- 数据模型: 复用现有Channel/Device/Point模型
3.1 Go 库评估
3.1.1 库对比分析
| 评估维度 | anviod/ethernet-ip | danomagnum/gologix |
|---|---|---|
| Star数 | 70 | 66 |
| 提交活跃度 | 85 commits (更新至2022) | 431 commits (更新至2026) |
| Issue处理 | 1 issue | 3 issues |
| 功能完整性 | 完整CIP协议实现 | 专注于Rockwell PLC |
| 批量读写 | 支持MultipleServicePacket | 支持批量Tag读写 |
| 文档质量 | 基本文档 | 详细文档和示例 |
| 社区支持 | 较小 | 活跃 |
| 许可证 | WTFPL | MIT |
3.1.2 本项目选型
确定使用: github.com/anviod/ethernet-ip
选型理由:
- 通用性更强: 完整的CIP协议实现,支持多种厂商设备
- 批量操作支持: 支持MultipleServicePacket批量读写
- 协议完整性: 覆盖EtherNet/IP整个协议栈
- 轻量级: 更适合资源受限的边缘网关环境
3.1.3 依赖配置
require github.com/anviod/ethernet-ip latest
4. 实现方案
4.1 后端EtherNet/IP驱动架构(参照S7三层模式)
4.1.1 架构设计原则
参考S7驱动的三层架构模式,EtherNet/IP驱动采用以下分层设计:
| 层级 | 职责 | 对应文件 | 参考S7实现 |
|---|---|---|---|
| 传输层 | TCP连接管理、CIP会话管理、心跳保活 | transport.go |
s7/transport.go |
| 解码器 | Tag地址解析、数据类型编解码 | decoder.go |
s7/decoder.go |
| 调度器 | 批量读取优化、请求分组、限流控制 | scheduler.go |
s7/scheduler.go |
| 驱动入口 | 接口实现、组件组合 | ethernetip.go |
s7/s7.go |
4.1.2 三层框架搭建步骤
第一步:创建目录结构
mkdir -p internal/driver/ethernetip
touch internal/driver/ethernetip/transport.go
touch internal/driver/ethernetip/decoder.go
touch internal/driver/ethernetip/scheduler.go
touch internal/driver/ethernetip/transport_test.go
touch internal/driver/ethernetip/decoder_test.go
touch internal/driver/ethernetip/scheduler_test.go
第二步:定义核心接口
// ENIPClient 接口别名,方便mock测试
type ENIPClient interface {
Connect() error
Disconnect() error
ReadTag(tag string, count int) ([]byte, error)
WriteTag(tag string, data []byte) error
ReadTags(tags []string) (map[string][]byte, error)
Ping() error
}
第三步:建立依赖注入机制
type ENIPTransport struct {
clientFactory func(address string, slot int) ENIPClient
// ...
}
1. 传输层 - internal/driver/ethernetip/transport.go
4.1.3 传输层实现方案(优先实现)
传输层是整个驱动的基础,负责管理TCP连接和CIP会话。
4.1.3.1 核心职责
- 封装gologix的Client和连接管理
- 管理TCP连接生命周期:Connect/Disconnect/Reconnect
- 心跳保活:定期发送Ping验证连接存活
- 连接指标:连接时长、重连次数、本地/远程地址
- 配置解析:从DriverConfig.Config map中提取ip/port/slot/timeout等参数
- 支持CIP连接类型:CIP Connection Type
4.1.3.2 配置参数清单
| 参数名 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| ip | string | - | PLC IP地址(必填) |
| port | int | 44818 | EtherNet/IP默认端口 |
| slot | int | 0 | CPU插槽号 |
| timeout | int | 2000 | 连接超时时间(ms) |
| max_retries | int | 3 | 最大重试次数 |
| retry_interval | int | 100 | 重试间隔(ms) |
| heartbeat_interval | int | 30000 | 心跳间隔(ms) |
| buffer_size | int | 4096 | 缓冲区大小 |
4.1.3.3 结构体设计
// ENIPTransport EtherNet/IP传输层
type ENIPTransport struct {
cfg map[string]any
client gologix.Client
// 依赖注入(用于测试)
clientFactory func(address string, slot int) *gologix.Client
// 配置参数
ip string
port int
slot int
timeout time.Duration
maxRetries int
retryInterval time.Duration
// 连接状态
connected bool
mu sync.Mutex
connectTime time.Time
lastDisconnectTime time.Time
reconnectCount int64
localAddr string
remoteAddr string
// 心跳
heartbeatInterval time.Duration
heartbeatTicker *time.Ticker
stopHeartbeat chan struct{}
// 会话健康
lastActivityTime time.Time
heartbeatFailCount int32
heartbeatFailMax int32
}
4.1.3.4 核心方法实现
构造函数:
func NewENIPTransport(cfg map[string]any) *ENIPTransport {
t := &ENIPTransport{
cfg: cfg,
port: 44818,
slot: 0,
timeout: 2 * time.Second,
maxRetries: 3,
retryInterval: 100 * time.Millisecond,
heartbeatInterval: 30 * time.Second,
heartbeatFailMax: 3,
stopHeartbeat: make(chan struct{}),
}
// 设置默认工厂函数
t.clientFactory = func(address string, slot int) *gologix.Client {
client := gologix.NewClient(address)
client.Slot = slot
return client
}
// 解析配置
t.parseConfig()
return t
}
配置解析:
func (t *ENIPTransport) parseConfig() {
// IP地址(必填)
if v, ok := t.cfg["ip"].(string); ok {
t.ip = v
}
// 端口
if v, ok := t.cfg["port"].(float64); ok {
t.port = int(v)
} else if v, ok := t.cfg["port"].(int); ok {
t.port = v
}
// 插槽号
if v, ok := t.cfg["slot"].(float64); ok {
t.slot = int(v)
} else if v, ok := t.cfg["slot"].(int); ok {
t.slot = v
}
// 超时时间
if v, ok := t.cfg["timeout"].(float64); ok {
t.timeout = time.Duration(v) * time.Millisecond
}
// 重试次数
if v, ok := t.cfg["max_retries"].(float64); ok {
t.maxRetries = int(v)
}
// 心跳间隔
if v, ok := t.cfg["heartbeat_interval"].(float64); ok {
t.heartbeatInterval = time.Duration(v) * time.Millisecond
}
}
连接管理:
func (t *ENIPTransport) Connect(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.connected {
return nil
}
address := fmt.Sprintf("%s:%d", t.ip, t.port)
t.client = t.clientFactory(address, t.slot)
var err error
for i := 0; i < t.maxRetries; i++ {
err = t.client.Connect()
if err == nil {
break
}
if i < t.maxRetries-1 {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(t.retryInterval):
}
}
}
if err != nil {
return fmt.Errorf("ENIP connection failed after %d retries: %w", t.maxRetries, err)
}
t.connected = true
t.connectTime = time.Now()
t.remoteAddr = address
t.reconnectCount++
// 启动心跳
t.startHeartbeat()
return nil
}
func (t *ENIPTransport) Disconnect() error {
t.mu.Lock()
defer t.mu.Unlock()
if !t.connected || t.client == nil {
return nil
}
// 停止心跳
t.stopHeartbeat <- struct{}{}
err := t.client.Disconnect()
t.connected = false
t.lastDisconnectTime = time.Now()
return err
}
心跳机制:
func (t *ENIPTransport) startHeartbeat() {
t.heartbeatTicker = time.NewTicker(t.heartbeatInterval)
go func() {
for {
select {
case <-t.stopHeartbeat:
t.heartbeatTicker.Stop()
return
case <-t.heartbeatTicker.C:
t.mu.Lock()
if t.connected && t.client != nil {
if err := t.client.Ping(); err != nil {
atomic.AddInt32(&t.heartbeatFailCount, 1)
if atomic.LoadInt32(&t.heartbeatFailCount) >= t.heartbeatFailMax {
// 心跳失败次数达到上限,触发重连
go t.reconnect()
}
} else {
atomic.StoreInt32(&t.heartbeatFailCount, 0)
}
}
t.mu.Unlock()
}
}
}()
}
func (t *ENIPTransport) reconnect() {
t.mu.Lock()
if !t.connected {
t.mu.Unlock()
return
}
_ = t.client.Disconnect()
t.mu.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), t.timeout)
defer cancel()
for i := 0; i < t.maxRetries; i++ {
if err := t.Connect(ctx); err == nil {
return
}
time.Sleep(t.retryInterval * time.Duration(i+1))
}
}
连接指标:
func (t *ENIPTransport) GetConnectionMetrics() (
connectionSeconds int64,
reconnectCount int64,
localAddr string,
remoteAddr string,
lastDisconnectTime time.Time,
) {
t.mu.Lock()
defer t.mu.Unlock()
if t.connectTime.IsZero() {
return 0, 0, "", "", time.Time{}
}
connectionSeconds = int64(time.Since(t.connectTime).Seconds())
reconnectCount = t.reconnectCount
localAddr = t.localAddr
remoteAddr = t.remoteAddr
lastDisconnectTime = t.lastDisconnectTime
return
}
func (t *ENIPTransport) IsConnected() bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.connected
}
4.1.3.5 依赖注入测试模式
为了支持单元测试,采用依赖注入模式:
// MockENIPClient 用于测试的mock客户端
type MockENIPClient struct {
connectFunc func() error
disconnectFunc func() error
readTagFunc func(tag string, count int) ([]byte, error)
writeTagFunc func(tag string, data []byte) error
pingFunc func() error
}
func (m *MockENIPClient) Connect() error {
if m.connectFunc != nil {
return m.connectFunc()
}
return nil
}
func (m *MockENIPClient) Disconnect() error {
if m.disconnectFunc != nil {
return m.disconnectFunc()
}
return nil
}
func (m *MockENIPClient) ReadTag(tag string, count int) ([]byte, error) {
if m.readTagFunc != nil {
return m.readTagFunc(tag, count)
}
return nil, nil
}
func (m *MockENIPClient) WriteTag(tag string, data []byte) error {
if m.writeTagFunc != nil {
return m.writeTagFunc(tag, data)
}
return nil
}
func (m *MockENIPClient) Ping() error {
if m.pingFunc != nil {
return m.pingFunc()
}
return nil
}
在测试中使用mock:
func TestENIPTransport_Connect(t *testing.T) {
mockClient := &MockENIPClient{
connectFunc: func() error { return nil },
}
transport := NewENIPTransport(map[string]any{
"ip": "192.168.1.10",
"port": 44818,
})
// 注入mock客户端
transport.clientFactory = func(address string, slot int) *gologix.Client {
// 返回mock包装的客户端
return mockClient
}
err := transport.Connect(context.Background())
assert.NoError(t, err)
assert.True(t, transport.IsConnected())
}
2. 解码器 - internal/driver/ethernetip/decoder.go
- EtherNet/IP Tag地址解析:支持格式
tag_name(简单标签)、tag_name[index](数组元素)、tag_name.field(结构体字段) - 地址结构:TagName + ElementSize + ArrayIndex + Path
- 数据编解码:使用gologix的数据类型进行字节序转换
- 寄存器计数:根据DataType确定需要读取的字节数
3. 调度器 - internal/driver/ethernetip/scheduler.go
- 按Tag名称对点位分组
- 使用gologix的批量读取API,减少PDU往返
- 适配EtherNet/IP CMU (Connection Manager)限制,自动拆分大数据块
- 指令间隔控制,避免PLC过载
4. 驱动主入口 - internal/driver/ethernetip/ethernetip.go(重构)
- 组合transport/decoder/scheduler三层
- 实现Driver接口所有方法
- Init时根据配置创建三层组件
- Connect时建立真实TCP连接
- ReadPoints通过scheduler批量读取
- WritePoint通过单点写入
- Health基于真实连接状态判断
4.2 EtherNet/IP地址格式
// 简单标签
Motor_Speed -> TagName="Motor_Speed"
// 数组元素
Temperatures[0] -> TagName="Temperatures", Index=0
// 数组范围
Temperatures[0..9] -> TagName="Temperatures", StartIndex=0, EndIndex=9
// 结构体字段
Controller.Scanner -> TagName="Controller.Scanner"
// 位域访问 (部分数据类型支持)
Motor_Status.0 -> TagName="Motor_Status", BitPosition=0
4.3 支持的PLC类型与默认参数
| PLC类型 | 默认端口 | 默认插槽 | 连接类型 |
|---|---|---|---|
| ControlLogix | 44818 | 0 | CIP Direct |
| CompactLogix | 44818 | 0 | CIP Direct |
| Micro800 | 44818 | 0 | CIP Direct |
| SLC500 | 44818 | N/A | Backplane |
| PLC-5 | 44818 | N/A | Backplane |
4.4 数据类型映射
| Point Type | CIP Data Type | Size (bytes) |
|---|---|---|
| bool | BOOL | 1 |
| sint | SINT | 1 |
| int | INT | 2 |
| dint | DINT | 4 |
| real | REAL | 4 |
| string | STRING | 88 |
4.5 批量读取策略
- 按Tag名称分组点位
- 同一组内使用go-ethernet-ip的ReadTags批量读取
- 自动构建Tag列表,包含TagName和ElementSize
- 读取后逐项检查Status字段并解析Data缓冲区
- 超过最大连接数时自动分批处理
- 支持配置batch_read_max限制单次最大读取点数(默认50)
5. 前端配置增强
ChannelList.vue EtherNet/IP配置区域扩展
在现有IP/端口/插槽基础上增加:
- 超时时间(timeout): 默认2000ms
- 重试次数(max_retries): 默认1
- 心跳间隔(heartbeat_interval): 默认30000ms
- 缓冲区大小(buffer_size): 默认4096字节
- QoS等级(qos): 默认1
- 连接超时(connect_timeout): 毫秒
- 连接类型(connection_type): Direct/Backplane下拉选择
- 批量读取最大值(batch_read_max): 默认50
6. go.mod依赖
新增: github.com/anviod/ethernet-ip
7. 实现细节
7.1 CIP协议基础知识
EtherNet/IP会话建立流程:
- TCP连接到44818端口
- 发送RegisterSession请求
- 接收Session ID
- 使用Session ID进行后续通信
- 通信结束时发送UnregisterSession
CIP通用指令:
- ReadTag - 读取单个标签
- ReadTagFragmented - 读取大标签(超过连接缓冲区)
- WriteTag - 写入单个标签
- WriteTagFragmented - 写入大标签
- MultipleServicePacket - 批量读写
7.2 go-ethernet-ip库结构
github.com/anviod/ethernet-ip/
├── bufferx/ # 缓冲区管理
├── command/ # CIP命令封装
├── messages/ # 消息编解码
├── path/ # CIP路径解析
├── types/ # 数据类型定义
├── utils/ # 工具函数
├── config.go # 配置结构
├── context.go # 上下文管理
├── request.go # 请求处理
├── tag.go # Tag操作
└── tcp.go # TCP连接
7.3 关键实现代码模式
连接管理:
// 创建TCP连接
tcp := ethernetip.NewTCP("192.168.1.10:44818")
// 创建会话
session, err := tcp.CreateSession()
if err != nil {
return err
}
// 注册会话
err = session.Register()
if err != nil {
return err
}
// 读取Tag
data, err := session.ReadTag("Motor_Speed", 1)
if err != nil {
return err
}
// 写入Tag
err = session.WriteTag("Motor_Speed", data)
// 关闭会话
session.Unregister()
session.Close()
依赖注入测试模式:
// 使用clientFactory注入mock对象进行单元测试
type ENIPTransport struct {
clientFactory func(address string) ENIPClient
// ...
}
8. 测试计划
8.1 单元测试
| 测试文件 | 测试用例数 | 覆盖范围 |
|---|---|---|
| decoder_test.go | 10 | Tag解析、数据类型编解码、配置解析 |
| transport_test.go | 8 | 连接管理、心跳控制、重试逻辑、指标统计 |
| scheduler_test.go | 6 | 批量读取、调度策略、分批处理 |
8.2 集成测试
| 测试用例 | 测试内容 | 预期结果 |
|---|---|---|
| TestEtherNetIPConnect | 连接真实PLC | 连接成功,获取Session ID |
| TestReadSingleTag | 读取单个Tag | 正确解析数据类型和值 |
| TestReadMultipleTags | 批量读取多个Tag | 减少网络往返,提高效率 |
| TestWriteTag | 写入Tag值 | 写入成功,PLC端数据更新 |
| TestConnectionRecovery | 连接断开后自动重连 | 自动重连,恢复通信 |
| TestHeartbeat | 心跳保活机制 | 定期发送心跳,保持连接 |
8.3 验收测试
| 检查项 | 标准 |
|---|---|
| 连接稳定性 | 连续运行24小时无断连 |
| 数据准确性 | 读取值与PLC端一致 |
| 批量读取效率 | 10个Tag批量读取比逐个读取快80%以上 |
| 重连恢复时间 | 断连后3秒内自动重连 |
| 内存占用 | 稳定状态下内存占用不超过50MB |
9. 文件结构
internal/driver/ethernetip/
├── ethernetip.go # 驱动主入口
├── transport.go # 传输层(TCP连接、会话管理)
├── decoder.go # 地址解码器(Tag解析、数据类型)
├── scheduler.go # 调度器(批量读取优化)
├── transport_test.go # 传输层单元测试
├── decoder_test.go # 解码器单元测试
├── scheduler_test.go # 调度器单元测试
└── integration_test.go # 集成测试
10. 风险评估
| 风险 | 影响 | 缓解措施 |
|---|---|---|
| go-ethernet-ip库不稳定 | 高 | 评估库的质量,考虑使用gologix作为备选 |
| 不同PLC兼容性 | 中 | 针对主流PLC型号测试,保留配置扩展性 |
| 批量读取限制 | 中 | 实现分批读取机制,处理超限情况 |
| 连接超时处理 | 中 | 实现智能超时和重试机制 |
11. 里程碑
- Phase 1: 基础连接管理 - 完成transport.go实现
- Phase 2: Tag读写功能 - 完成decoder.go和scheduler.go实现
- Phase 3: 单元测试 - 完成各层单元测试
- Phase 4: 集成测试 - 连接真实PLC验证功能
- Phase 5: 前端集成 - 配置界面增强
- Phase 6: 性能优化 - 批量读取调优