From 88f090b66f2c9cc2cb151dbfe9f3275b237df85c Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 8 Feb 2026 23:11:38 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=88iot=EF=BC=89=EF=BC=9Amodbus-tcp-sl?= =?UTF-8?q?ave=E3=80=81modbus-tcp-master=20=E6=8E=A5=E7=9D=80=E5=A4=A7?= =?UTF-8?q?=E9=87=8F=E4=BC=98=E5=8C=96=EF=BC=8C=E5=B9=B6=E4=B8=94=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=20modbus=20rtu=20=E7=BC=96=E8=A7=A3=E7=A0=81=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IotDeviceModbusConfigSaveReqVO.java | 4 - .../device/IotDeviceModbusPointDO.java | 8 +- .../device/IotDeviceModbusPointMapper.java | 5 ++ .../iot/enums/rule/IotDataSinkTypeEnum.java | 4 +- .../device/IotDeviceMessageSubscriber.java | 1 - .../rule/IotSceneRuleMessageHandler.java | 1 - .../IotDeviceModbusConfigServiceImpl.java | 33 ++++++- .../device/IotDeviceModbusPointService.java | 9 ++ .../IotDeviceModbusPointServiceImpl.java | 7 +- .../action/IotDataRuleCacheableAction.java | 2 - .../thingmodel/IotThingModelServiceImpl.java | 10 ++- .../common/utils/IotModbusCommonUtils.java | 21 +++++ .../tcpmaster/IotModbusTcpMasterProtocol.java | 1 - .../IotModbusTcpMasterConfigCacheService.java | 2 - .../IotModbusTcpMasterConnectionManager.java | 31 ++++--- .../tcpslave/IotModbusTcpSlaveConfig.java | 6 -- .../tcpslave/codec/IotModbusFrameDecoder.java | 43 ++++++++- .../tcpslave/codec/IotModbusFrameEncoder.java | 3 - .../IotModbusTcpSlaveDownstreamHandler.java | 1 - .../IotModbusTcpSlaveConfigCacheService.java | 87 ++----------------- ...otModbusTcpSlavePendingRequestManager.java | 25 +----- .../upstream/IotMqttUpstreamHandler.java | 6 ++ .../upstream/IotUdpUpstreamHandler.java | 1 - .../iot/gateway/util/IotMqttTopicUtils.java | 2 - ...=> IoTModbusTcpMasterIntegrationTest.java} | 62 ++++++++----- ... IotModbusTcpSlaveRtuIntegrationTest.java} | 36 ++++---- ... IotModbusTcpSlaveTcpIntegrationTest.java} | 26 ++---- 27 files changed, 224 insertions(+), 213 deletions(-) rename yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/{ModbusTcpSlaveSimulatorTest.java => IoTModbusTcpMasterIntegrationTest.java} (53%) rename yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/{IotModbusTcpSlaveModbusRtuIntegrationTest.java => IotModbusTcpSlaveRtuIntegrationTest.java} (93%) rename yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/{IotModbusTcpSlaveModbusTcpIntegrationTest.java => IotModbusTcpSlaveTcpIntegrationTest.java} (95%) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigSaveReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigSaveReqVO.java index 9fa3fdc7c2..9b08d88555 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigSaveReqVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigSaveReqVO.java @@ -16,13 +16,9 @@ public class IotDeviceModbusConfigSaveReqVO { private Long deviceId; @Schema(description = "Modbus 服务器 IP 地址", example = "192.168.1.100") -// @NotEmpty(message = "Modbus 服务器 IP 地址不能为空") - // TODO @AI:这个字段,要根据情况校验; private String ip; @Schema(description = "Modbus 端口", example = "502") -// @NotNull(message = "Modbus 端口不能为空") - // TODO @AI:这个字段,要根据情况校验; private Integer port; @Schema(description = "从站地址", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceModbusPointDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceModbusPointDO.java index dd3a3a9609..2937476cff 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceModbusPointDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceModbusPointDO.java @@ -29,7 +29,12 @@ public class IotDeviceModbusPointDO extends TenantBaseDO { */ @TableId private Long id; - // TODO @AI:增加 productId; + /** + * 产品编号 + * + * 关联 {@link cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO#getId()} + */ + private Long productId; /** * 设备编号 * @@ -42,7 +47,6 @@ public class IotDeviceModbusPointDO extends TenantBaseDO { * 关联 {@link IotThingModelDO#getId()} */ private Long thingModelId; - // TODO @AI:每次物模型的变更时,需要按需刷下 identifier、name 配置; /** * 属性标识符 * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceModbusPointMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceModbusPointMapper.java index 74dc463e41..7c9b5d3bae 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceModbusPointMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceModbusPointMapper.java @@ -39,4 +39,9 @@ public interface IotDeviceModbusPointMapper extends BaseMapperX() + .eq(IotDeviceModbusPointDO::getThingModelId, thingModelId)); + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java index 440fab5f53..96b477d69d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java @@ -19,9 +19,9 @@ public enum IotDataSinkTypeEnum implements ArrayValuable { TCP(2, "TCP"), WEBSOCKET(3, "WebSocket"), - MQTT(10, "MQTT"), // TODO 待实现; + MQTT(10, "MQTT"), // TODO @puhui999:待实现; - DATABASE(20, "Database"), // TODO @puhui999:待实现;可以简单点,对应的表名是什么,字段先固定了。 + DATABASE(20, "Database"), // TODO @puhui999:待实现; REDIS(21, "Redis"), ROCKETMQ(30, "RocketMQ"), diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceMessageSubscriber.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceMessageSubscriber.java index 7e039d0327..c6e0ba4221 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceMessageSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceMessageSubscriber.java @@ -67,7 +67,6 @@ public class IotDeviceMessageSubscriber implements IotMessageSubscriber getDeviceModbusPointPage(IotDeviceModbusPointPageReqVO pageReqVO); + /** + * 物模型变更时,更新关联点位的冗余字段(identifier、name) + * + * @param thingModelId 物模型编号 + * @param identifier 物模型标识符 + * @param name 物模型名称 + */ + void updateDeviceModbusPointByThingModel(Long thingModelId, String identifier, String name); + /** * 根据设备编号批量获得启用的点位配置 Map * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusPointServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusPointServiceImpl.java index 906697b15b..7683aa7ecc 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusPointServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusPointServiceImpl.java @@ -75,7 +75,12 @@ public class IotDeviceModbusPointServiceImpl implements IotDeviceModbusPointServ modbusPointMapper.updateById(updateObj); } - // TODO @AI:物模型更新的时候,更新下 identifier、name 信息;例如说 updateDeviceModbusPoint(thingModelId, identifier、name) 方法; + @Override + public void updateDeviceModbusPointByThingModel(Long thingModelId, String identifier, String name) { + IotDeviceModbusPointDO updateObj = new IotDeviceModbusPointDO() + .setIdentifier(identifier).setName(name); + modbusPointMapper.updateByThingModelId(thingModelId, updateObj); + } private IotThingModelDO validateThingModelExists(Long id) { IotThingModelDO thingModel = thingModelService.getThingModel(id); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataRuleCacheableAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataRuleCacheableAction.java index 4319469082..cc282e1b8d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataRuleCacheableAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotDataRuleCacheableAction.java @@ -14,8 +14,6 @@ import java.time.Duration; // TODO @芋艿:数据库 // TODO @芋艿:mqtt -// TODO @芋艿:tcp -// TODO @芋艿:websocket /** * 可缓存的 {@link IotDataRuleAction} 抽象实现 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/thingmodel/IotThingModelServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/thingmodel/IotThingModelServiceImpl.java index ca04ecd5f3..18a5b62e2a 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/thingmodel/IotThingModelServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/thingmodel/IotThingModelServiceImpl.java @@ -15,6 +15,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO; import cn.iocoder.yudao.module.iot.dal.mysql.thingmodel.IotThingModelMapper; import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants; import cn.iocoder.yudao.module.iot.enums.product.IotProductStatusEnum; +import cn.iocoder.yudao.module.iot.service.device.IotDeviceModbusPointService; import cn.iocoder.yudao.module.iot.service.product.IotProductService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -51,6 +52,9 @@ public class IotThingModelServiceImpl implements IotThingModelService { @Lazy // 延迟加载,解决循环依赖 private IotProductService productService; + @Resource + private IotDeviceModbusPointService deviceModbusPointService; + @Override @Transactional(rollbackFor = Exception.class) public Long createThingModel(IotThingModelSaveReqVO createReqVO) { @@ -84,7 +88,11 @@ public class IotThingModelServiceImpl implements IotThingModelService { IotThingModelDO thingModel = IotThingModelConvert.INSTANCE.convert(updateReqVO); thingModelMapper.updateById(thingModel); - // 3. 删除缓存 + // 3. 同步更新 Modbus 点位的冗余字段(identifier、name) + deviceModbusPointService.updateDeviceModbusPointByThingModel( + updateReqVO.getId(), updateReqVO.getIdentifier(), updateReqVO.getName()); + + // 4. 删除缓存 deleteThingModelListCache(updateReqVO.getProductId()); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java index 23ee4bf124..32e7292964 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java @@ -500,6 +500,27 @@ public class IotModbusCommonUtils { return values; } + /** + * 从响应帧中提取 registerCount(通过 PDU 的 byteCount 推断) + * + * @param frame 解码后的 Modbus 响应帧 + * @return registerCount,无法提取时返回 -1(匹配时跳过校验) + */ + public static int extractRegisterCountFromResponse(IotModbusFrame frame) { + byte[] pdu = frame.getPdu(); + if (pdu == null || pdu.length < 1) { + return -1; + } + int byteCount = pdu[0] & 0xFF; + int fc = frame.getFunctionCode(); + // FC03/04 寄存器读响应:registerCount = byteCount / 2 + if (fc == FC_READ_HOLDING_REGISTERS || fc == FC_READ_INPUT_REGISTERS) { + return byteCount / 2; + } + // FC01/02 线圈/离散输入读响应:按 bit 打包有余位,无法精确反推,返回 -1 跳过校验 + return -1; + } + // ==================== 点位查找 ==================== /** diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java index 1fac973a92..1b284e4e4c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java @@ -84,7 +84,6 @@ public class IotModbusTcpMasterProtocol implements IotProtocol { IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class); this.configCacheService = new IotModbusTcpMasterConfigCacheService(deviceApi); - // DONE @AI:上线/下线消息已移到 ConnectionManager 内部处理,不再走回调 this.connectionManager = new IotModbusTcpMasterConnectionManager(redissonClient, vertx, messageService, configCacheService, serverId); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConfigCacheService.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConfigCacheService.java index 7ce6dd02fc..aa6273485f 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConfigCacheService.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConfigCacheService.java @@ -79,8 +79,6 @@ public class IotModbusTcpMasterConfigCacheService { /** * 计算已删除设备的 ID 集合,清理缓存,并更新已知设备 ID 集合 * - * DONE @AI:不再使用 callback 模式,返回已删除的设备 ID 集合,由调用方直接清理 - * * @param currentConfigs 当前有效的配置列表 * @return 已删除的设备 ID 集合 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConnectionManager.java index 34b68f65b6..ebbca8e2a0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConnectionManager.java @@ -96,18 +96,19 @@ public class IotModbusTcpMasterConnectionManager { connection = connectionPool.get(connectionKey); if (connection != null) { addDeviceAndOnline(connection, config); + lock.unlock(); return; } // 3.2 创建新连接 connection = createConnection(config); + connection.setLock(lock); connectionPool.put(connectionKey, connection); log.info("[ensureConnection][创建 Modbus 连接成功: {}]", connectionKey); // 3.3 注册设备并发送上线消息 addDeviceAndOnline(connection, config); } catch (Exception e) { log.error("[ensureConnection][创建 Modbus 连接失败: {}]", connectionKey, e); - } finally { - // TODO @AI:如果这里释放,会不会出现,集群模式下,多个节点同时创建连接的情况?需要验证一下 Redisson 的分布式锁特性?! + // 建连失败,释放锁让其他节点可重试 lock.unlock(); } } @@ -231,11 +232,11 @@ public class IotModbusTcpMasterConnectionManager { if (connection.getTcpConnection() != null) { connection.getTcpConnection().close(); } - // 强制解锁,避免死锁(正常情况下应该不会发生锁未释放的情况) -// RLock lock = connection.getLock(); -// if (lock != null && lock.isLocked()) { -// lock.forceUnlock(); -// } + // 释放分布式锁,让其他节点可接管 + RLock lock = connection.getLock(); + if (lock != null && lock.isHeldByCurrentThread()) { + lock.unlock(); + } log.info("[closeConnection][关闭 Modbus 连接: {}]", connectionKey); } catch (Exception e) { log.error("[closeConnection][关闭连接失败: {}]", connectionKey, e); @@ -266,17 +267,21 @@ public class IotModbusTcpMasterConnectionManager { private TCPMasterConnection tcpConnection; private Integer timeout; private Integer retryInterval; - - private Context context; - - // TODO @AI:是不是需要 lock?!避免集群模式下的竞争(肯定不能让别的节点连接上)!!!【另外,RLock 在节点(持有所锁的节点) cransh 的时候,会自动释放】 -// private RLock lock; - /** * 设备 ID 到 slave ID 的映射 */ private final Map deviceSlaveMap = new ConcurrentHashMap<>(); + /** + * 分布式锁,锁住连接的创建和销毁,避免多节点重复连接同一从站 + */ + private RLock lock; + + /** + * Vert.x Context,用于 executeBlocking 执行 Modbus 操作,保证同一连接的操作串行执行 + */ + private Context context; + public void addDevice(Long deviceId, Integer slaveId) { deviceSlaveMap.put(deviceId, slaveId); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveConfig.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveConfig.java index d21c28e676..60185f1eb0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveConfig.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveConfig.java @@ -41,10 +41,4 @@ public class IotModbusTcpSlaveConfig { @NotNull(message = "请求清理间隔不能为空") private Integer requestCleanupInterval = 10000; - // TODO @AI:可以去掉这个开关,因为本身就是模拟的,稍后我自己也会手动或者让你去掉(听我指令!) - /** - * 是否启用 Mock 测试数据(仅开发/测试环境使用,线上务必关闭) - */ - private Boolean mockEnabled = false; - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java index b47f2a11be..1415b51779 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java @@ -29,21 +29,36 @@ import java.util.function.BiConsumer; @Slf4j public class IotModbusFrameDecoder { + private static final Boolean REQUEST_MODE_DEFAULT = false; + /** * 自定义功能码 */ private final int customFunctionCode; /** - * 创建带自动帧格式检测的 RecordParser + * 创建带自动帧格式检测的 RecordParser(默认响应模式) * * @param frameHandler 完整帧回调(解码后的 IotModbusFrame + 检测到的帧格式) * @return RecordParser 实例 */ public RecordParser createRecordParser(BiConsumer frameHandler) { + return createRecordParser(frameHandler, REQUEST_MODE_DEFAULT); + } + + /** + * 创建带自动帧格式检测的 RecordParser + * + * @param frameHandler 完整帧回调(解码后的 IotModbusFrame + 检测到的帧格式) + * @param requestMode 是否为请求模式(true:接收方收到的是 Modbus 请求帧,FC01-04 按固定 8 字节解析; + * false:接收方收到的是 Modbus 响应帧,FC01-04 按 byteCount 变长解析) + * @return RecordParser 实例 + */ + public RecordParser createRecordParser(BiConsumer frameHandler, + boolean requestMode) { // 先创建一个 RecordParser:使用 fixedSizeMode(6) 读取首帧前 6 字节进行帧格式检测 RecordParser parser = RecordParser.newFixed(6); - parser.handler(new DetectPhaseHandler(parser, customFunctionCode, frameHandler)); + parser.handler(new DetectPhaseHandler(parser, customFunctionCode, frameHandler, requestMode)); return parser; } @@ -150,6 +165,7 @@ public class IotModbusFrameDecoder { private final RecordParser parser; private final int customFunctionCode; private final BiConsumer frameHandler; + private final boolean requestMode; @Override public void handle(Buffer buffer) { @@ -169,7 +185,7 @@ public class IotModbusFrameDecoder { } else { // MODBUS_RTU:切换到 RTU 拆包 Handler log.debug("[DetectPhaseHandler][检测到 MODBUS_RTU 帧格式]"); - RtuFrameHandler rtuHandler = new RtuFrameHandler(parser, frameHandler, customFunctionCode); + RtuFrameHandler rtuHandler = new RtuFrameHandler(parser, frameHandler, customFunctionCode, requestMode); parser.handler(rtuHandler); // 当前 bytes 包含前 6 字节(slaveId + FC + 部分数据),交给 rtuHandler 处理 rtuHandler.handleFirstBytes(bytes); @@ -248,6 +264,9 @@ public class IotModbusFrameDecoder { * - 自定义 FC / FC01-04 响应:fixedSizeMode(1) → 读 byteCount → fixedSizeMode(byteCount + 2) * - FC05/06 响应:fixedSizeMode(6) → addr(2) + value(2) + CRC(2) * - FC15/16 响应:fixedSizeMode(6) → addr(2) + quantity(2) + CRC(2) + *

+ * 请求模式(requestMode=true)时,FC01-04 按固定 8 字节解析(与写响应相同路径), + * 因为读请求格式为 [SlaveId(1)][FC(1)][StartAddr(2)][Quantity(2)][CRC(2)] */ @RequiredArgsConstructor private class RtuFrameHandler implements Handler { @@ -261,6 +280,12 @@ public class IotModbusFrameDecoder { private final RecordParser parser; private final BiConsumer frameHandler; private final int customFunctionCode; + /** + * 请求模式: + * - true 表示接收方收到的是 Modbus 请求帧(如设备端收到网关下发的读请求),FC01-04 按固定 8 字节帧解析 + * - false 表示接收方收到的是 Modbus 响应帧,FC01-04 按 byteCount 变长解析 + */ + private final boolean requestMode; private int state = STATE_HEADER; private byte slaveId; @@ -289,6 +314,13 @@ public class IotModbusFrameDecoder { frame.appendBytes(bytes, 2, 3); // exceptionCode + CRC emitFrame(frame); resetToHeader(); + } else if (IotModbusCommonUtils.isReadResponse(fc) && requestMode) { + // 请求模式下的读请求:固定 8 字节 [SlaveId(1)][FC(1)][StartAddr(2)][Quantity(2)][CRC(2)] + // 已有 6 字节,还需 2 字节(CRC) + state = STATE_WRITE_BODY; + this.pendingData = Buffer.buffer(); + this.pendingData.appendBytes(bytes, 2, 4); // 暂存已有的 4 字节(StartAddr + Quantity) + parser.fixedSizeMode(2); // 还需 2 字节(CRC) } else if (IotModbusCommonUtils.isReadResponse(fc) || fc == customFunctionCode) { // 读响应或自定义 FC:bytes[2] = byteCount this.byteCount = bytes[2]; @@ -359,6 +391,11 @@ public class IotModbusFrameDecoder { // 异常响应 state = STATE_EXCEPTION_BODY; parser.fixedSizeMode(3); // exceptionCode(1) + CRC(2) + } else if (IotModbusCommonUtils.isReadResponse(fc) && requestMode) { + // 请求模式下的读请求:固定 8 字节,已读 2 字节(slaveId + FC),还需 6 字节 + state = STATE_WRITE_BODY; + pendingData = Buffer.buffer(); + parser.fixedSizeMode(6); // StartAddr(2) + Quantity(2) + CRC(2) } else if (IotModbusCommonUtils.isReadResponse(fc) || fc == customFunctionCode) { // 读响应或自定义 FC state = STATE_READ_BYTE_COUNT; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameEncoder.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameEncoder.java index 36323cbda5..2a2bb3574a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameEncoder.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameEncoder.java @@ -46,9 +46,6 @@ public class IotModbusFrameEncoder { /** * 编码写请求(单个寄存器 FC06 / 单个线圈 FC05) * - * DONE @AI:【from codex】【高】FC05 写线圈时,value 已转换为 Modbus 标准值(非0 → 0xFF00,0 → 0x0000); - * 新增 encodeWriteMultipleCoilsRequest 方法用于 FC15 编码(按 bit 打包)。 - * * @param slaveId 从站地址 * @param functionCode 功能码 * @param address 寄存器地址 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java index 727cc7cea4..900b01a47c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java @@ -109,7 +109,6 @@ public class IotModbusTcpSlaveDownstreamHandler { // 1.2 确定帧格式和事务 ID IotModbusFrameFormatEnum frameFormat = connInfo.getFrameFormat(); Assert.notNull(frameFormat, "连接帧格式不能为空"); - // TODO @AI:【from 芋艿】需要和按照 deviceId 进行自增么??? Integer transactionId = frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP ? (transactionIdCounter.incrementAndGet() & 0xFFFF) : null; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java index a998a77079..cf8589861b 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java @@ -6,15 +6,15 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; -import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import java.math.BigDecimal; -import java.util.*; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -33,8 +33,6 @@ public class IotModbusTcpSlaveConfigCacheService { */ private final Map configCache = new ConcurrentHashMap<>(); - // ==================== 按需加载(认证时) ==================== - /** * 加载单个设备的配置(认证成功后调用) * @@ -66,8 +64,6 @@ public class IotModbusTcpSlaveConfigCacheService { } } - // ==================== 定时刷新(已连接设备) ==================== - /** * 刷新已连接设备的配置缓存 *

@@ -89,31 +85,17 @@ public class IotModbusTcpSlaveConfigCacheService { .setDeviceIds(connectedDeviceIds)); List modbusConfigs = result.getCheckedData(); - // 2. 追加 Mock 测试数据(仅 mockEnabled=true 时) - // TODO @芋艿:测试完成后移除 - // TODO @claude-code:【严重】同上,if(true) 导致 mockEnabled 开关失效,Mock 数据永远加载 - if (true) { - modbusConfigs.addAll(buildMockConfigs()); - } - - // 2. 只保留已连接设备的配置,更新缓存 - // TODO @AI:是不是直接添加到 configCache 缓存(或者覆盖),然后返回 modbusConfigs 就 ok 了?! - List connectedConfigs = new ArrayList<>(); + // 2. 更新缓存并返回 for (IotModbusDeviceConfigRespDTO config : modbusConfigs) { - if (connectedDeviceIds.contains(config.getDeviceId())) { - configCache.put(config.getDeviceId(), config); - connectedConfigs.add(config); - } + configCache.put(config.getDeviceId(), config); } - return connectedConfigs; + return modbusConfigs; } catch (Exception e) { log.error("[refreshConnectedDeviceConfigList][刷新配置失败]", e); return null; } } - // ==================== 缓存操作 ==================== - /** * 获取设备配置 */ @@ -133,59 +115,4 @@ public class IotModbusTcpSlaveConfigCacheService { configCache.remove(deviceId); } - // ==================== Mock 数据 ==================== - - /** - * 构建 Mock 测试配置数据(一次性测试用途) - * - * 设备:PRODUCT_KEY=4aymZgOTOOCrDKRT, DEVICE_NAME=small - * 点位:temperature(FC03, 地址 0)、humidity(FC03, 地址 1) - * - * TODO @芋艿:测试完成后移除 - */ - private List buildMockConfigs() { - IotModbusDeviceConfigRespDTO config = new IotModbusDeviceConfigRespDTO(); - config.setDeviceId(25L); - config.setProductKey("4aymZgOTOOCrDKRT"); - config.setDeviceName("small"); - config.setSlaveId(1); - config.setMode(1); // 云端轮询 - config.setFrameFormat(IotModbusFrameFormatEnum.MODBUS_TCP.getFormat()); - - // 点位列表 - List points = new ArrayList<>(); - - // 点位 1:温度 - 保持寄存器 FC03, 地址 0, 1 个寄存器, INT16 - IotModbusPointRespDTO point1 = new IotModbusPointRespDTO(); - point1.setId(1L); - point1.setIdentifier("temperature"); - point1.setName("温度"); - point1.setFunctionCode(3); // FC03 读保持寄存器 - point1.setRegisterAddress(0); - point1.setRegisterCount(1); - point1.setRawDataType("INT16"); - point1.setByteOrder("BIG_ENDIAN"); - point1.setScale(new BigDecimal("0.1")); - point1.setPollInterval(5000); // 5 秒轮询一次 - points.add(point1); - - // 点位 2:湿度 - 保持寄存器 FC03, 地址 1, 1 个寄存器, UINT16 - IotModbusPointRespDTO point2 = new IotModbusPointRespDTO(); - point2.setId(2L); - point2.setIdentifier("humidity"); - point2.setName("湿度"); - point2.setFunctionCode(3); // FC03 读保持寄存器 - point2.setRegisterAddress(1); - point2.setRegisterCount(1); - point2.setRawDataType("UINT16"); - point2.setByteOrder("BIG_ENDIAN"); - point2.setScale(new BigDecimal("0.1")); - point2.setPollInterval(5000); // 5 秒轮询一次 - points.add(point2); - - config.setPoints(points); - log.info("[buildMockConfigs][已加载 Mock 配置, deviceId={}, points={}]", config.getDeviceId(), points.size()); - return Collections.singletonList(config); - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java index a02601d666..76e7d11e94 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java @@ -78,7 +78,7 @@ public class IotModbusTcpSlavePendingRequestManager { return matchByTransactionId(queue, frame.getTransactionId()); } // RTU 模式:FIFO,匹配 slaveId + functionCode + registerCount - int responseRegisterCount = extractRegisterCountFromResponse(frame); + int responseRegisterCount = IotModbusCommonUtils.extractRegisterCountFromResponse(frame); return matchByFifo(queue, frame.getSlaveId(), frame.getFunctionCode(), responseRegisterCount); } @@ -115,29 +115,6 @@ public class IotModbusTcpSlavePendingRequestManager { return null; } - // TODO @AI:是不是放到 modbus 工具类里,更合适? - /** - * 从响应帧中提取 registerCount(通过 PDU 的 byteCount 推断) - * - * @return registerCount,无法提取时返回 -1(匹配时跳过校验) - */ - private int extractRegisterCountFromResponse(IotModbusFrame frame) { - byte[] pdu = frame.getPdu(); - if (pdu == null || pdu.length < 1) { - return -1; - } - int byteCount = pdu[0] & 0xFF; - int fc = frame.getFunctionCode(); - // FC03/04 寄存器读响应:registerCount = byteCount / 2 - if (fc == IotModbusCommonUtils.FC_READ_HOLDING_REGISTERS - || fc == IotModbusCommonUtils.FC_READ_INPUT_REGISTERS) { - return byteCount / 2; - } - // FC01/02 线圈/离散输入读响应:registerCount = byteCount * 8(线圈数量) - // 但因为按 bit 打包有余位,无法精确反推,返回 -1 跳过校验 - return -1; - } - /** * 清理过期请求 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java index 00a0c4b849..6b452e4b7e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java @@ -6,6 +6,7 @@ import cn.iocoder.yudao.framework.common.exception.ServiceException; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; import io.vertx.mqtt.MqttEndpoint; import lombok.extern.slf4j.Slf4j; @@ -58,6 +59,11 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { Assert.notNull(connectionInfo, "无法获取连接信息"); Assert.equals(productKey, connectionInfo.getProductKey(), "产品 Key 不匹配"); Assert.equals(deviceName, connectionInfo.getDeviceName(), "设备名称不匹配"); + // 1.4 校验 topic 是否允许发布 + if (!IotMqttTopicUtils.isTopicPublishAllowed(topic, productKey, deviceName)) { + log.warn("[handleBusinessRequest][topic 不允许发布,客户端 ID: {},主题: {}]", clientId, topic); + return; + } // 2. 反序列化消息 message = deviceMessageService.deserializeDeviceMessage(payload, productKey, deviceName); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/upstream/IotUdpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/upstream/IotUdpUpstreamHandler.java index 7b248ab7c9..4c83a4a256 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/upstream/IotUdpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/upstream/IotUdpUpstreamHandler.java @@ -91,7 +91,6 @@ public class IotUdpUpstreamHandler { this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); } - // TODO done @AI:vertx 有 udp 的实现么?当前已使用 Vert.x DatagramSocket 实现 /** * 处理 UDP 数据包 * diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java index 249b31544f..e72597d8fe 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java @@ -101,8 +101,6 @@ public final class IotMqttTopicUtils { * @param deviceName 设备名称 * @return 是否允许发布 */ - // TODO DONE @AI:这个逻辑,是不是 mqtt 协议,也要使用???答:是通用工具方法,MQTT 协议可按需调用; - // TODO @AI:那你改下 mqtt,也调用!!! public static boolean isTopicPublishAllowed(String topic, String productKey, String deviceName) { if (!StrUtil.isAllNotBlank(topic, productKey, deviceName)) { return false; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/ModbusTcpSlaveSimulatorTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IoTModbusTcpMasterIntegrationTest.java similarity index 53% rename from yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/ModbusTcpSlaveSimulatorTest.java rename to yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IoTModbusTcpMasterIntegrationTest.java index f901bc3973..40c8c5cdeb 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/ModbusTcpSlaveSimulatorTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IoTModbusTcpMasterIntegrationTest.java @@ -3,21 +3,49 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster; import com.ghgande.j2mod.modbus.procimg.*; import com.ghgande.j2mod.modbus.slave.ModbusSlave; import com.ghgande.j2mod.modbus.slave.ModbusSlaveFactory; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; /** - * Modbus TCP 从站模拟器 + * Modbus TCP 从站模拟器(手动测试) * - * 用于测试 Modbus TCP 网关的连接和数据读写功能 + *

测试场景:模拟一个标准 Modbus TCP 从站设备,供 Modbus TCP Master 网关连接和读写数据 + * + *

使用步骤: + *

    + *
  1. 运行 {@link #testStartSlaveSimulator()} 启动模拟从站(默认端口 5020,从站地址 1)
  2. + *
  3. 启动 yudao-module-iot-gateway 服务(需开启 modbus-tcp-master 协议)
  4. + *
  5. 确保数据库有对应的 Modbus Master 设备配置(ip=127.0.0.1, port=5020, slaveId=1)
  6. + *
  7. 网关会自动连接模拟从站并开始轮询读取寄存器数据
  8. + *
  9. 模拟器每 5 秒自动更新输入寄存器和保持寄存器的值,模拟传感器数据变化
  10. + *
+ * + *

可用寄存器: + *

    + *
  • 线圈 (Coil, 功能码 01/05): 地址 0-9,交替 true/false
  • + *
  • 离散输入 (Discrete Input, 功能码 02): 地址 0-9,每 3 个一个 true
  • + *
  • 保持寄存器 (Holding Register, 功能码 03/06/16): 地址 0-19,初始值 0,100,200,...
  • + *
  • 输入寄存器 (Input Register, 功能码 04): 地址 0-19,初始值 1,11,21,...
  • + *
* * @author 芋道源码 */ -public class ModbusTcpSlaveSimulatorTest { +@Slf4j +@Disabled +public class IoTModbusTcpMasterIntegrationTest { private static final int PORT = 5020; private static final int SLAVE_ID = 1; - @SuppressWarnings({"InfiniteLoopStatement", "BusyWait", "CommentedOutCode"}) - public static void main(String[] args) throws Exception { + /** + * 启动 Modbus TCP 从站模拟器 + * + *

模拟器会持续运行,每 5 秒更新一次寄存器数据,直到手动停止 + */ + @SuppressWarnings({"InfiniteLoopStatement", "BusyWait"}) + @Test + public void testStartSlaveSimulator() throws Exception { // 1. 创建进程映像(Process Image),存储寄存器数据 SimpleProcessImage spi = new SimpleProcessImage(SLAVE_ID); @@ -53,24 +81,15 @@ public class ModbusTcpSlaveSimulatorTest { // 3.2 启动从站 slave.open(); - System.out.println("==================================================="); - System.out.println("Modbus TCP 从站模拟器已启动"); - System.out.println("端口: " + PORT); - System.out.println("从站地址 (Slave ID): " + SLAVE_ID); - System.out.println("==================================================="); - System.out.println("可用寄存器:"); - System.out.println(" - 线圈 (Coil, 功能码 01/05): 地址 0-9"); - System.out.println(" - 离散输入 (Discrete Input, 功能码 02): 地址 0-9"); - System.out.println(" - 保持寄存器 (Holding Register, 功能码 03/06/16): 地址 0-19"); - System.out.println(" - 输入寄存器 (Input Register, 功能码 04): 地址 0-19"); - System.out.println("==================================================="); - System.out.println("按 Ctrl+C 停止模拟器"); + log.info("[testStartSlaveSimulator][Modbus TCP 从站模拟器已启动, 端口: {}, 从站地址: {}]", PORT, SLAVE_ID); + log.info("[testStartSlaveSimulator][可用寄存器: 线圈(01/05) 0-9, 离散输入(02) 0-9, " + + "保持寄存器(03/06/16) 0-19, 输入寄存器(04) 0-19]"); // 4. 添加关闭钩子 Runtime.getRuntime().addShutdownHook(new Thread(() -> { - System.out.println("\n正在关闭模拟器..."); + log.info("[testStartSlaveSimulator][正在关闭模拟器...]"); slave.close(); - System.out.println("模拟器已关闭"); + log.info("[testStartSlaveSimulator][模拟器已关闭]"); })); // 5. 保持运行,定时更新输入寄存器模拟数据变化 @@ -87,9 +106,8 @@ public class ModbusTcpSlaveSimulatorTest { // 更新保持寄存器的第一个值 spi.getRegister(0).setValue(counter * 100); -// System.out.println("[" + java.time.LocalTime.now() + "] 数据已更新, counter=" + counter -// + ", 保持寄存器[0]=" + (counter * 100) -// + ", 输入寄存器[0]=" + (1 + counter)); + log.info("[testStartSlaveSimulator][数据已更新, counter={}, 保持寄存器[0]={}, 输入寄存器[0]={}]", + counter, counter * 100, 1 + counter); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusRtuIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveRtuIntegrationTest.java similarity index 93% rename from yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusRtuIntegrationTest.java rename to yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveRtuIntegrationTest.java index e103d6b9d3..286923377a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusRtuIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveRtuIntegrationTest.java @@ -1,14 +1,15 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave; +import cn.hutool.core.util.HexUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetClient; @@ -24,6 +25,8 @@ import org.junit.jupiter.api.Test; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertEquals; + /** * IoT Modbus TCP Slave 协议集成测试 — MODBUS_RTU 帧格式(手动测试) * @@ -46,7 +49,7 @@ import java.util.concurrent.TimeUnit; */ @Slf4j @Disabled -public class IotModbusTcpSlaveModbusRtuIntegrationTest { +public class IotModbusTcpSlaveRtuIntegrationTest { private static final String SERVER_HOST = "127.0.0.1"; private static final int SERVER_PORT = 503; @@ -65,9 +68,9 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { // ===================== 设备信息(根据实际情况修改,从 iot_device 表查询) ===================== - private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT"; - private static final String DEVICE_NAME = "small"; - private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3"; + private static final String PRODUCT_KEY = "modbus_tcp_slave_product_demo"; + private static final String DEVICE_NAME = "modbus_tcp_slave_device_demo_rtu"; + private static final String DEVICE_SECRET = "af01c55eb8e3424bb23fc6c783936b2e"; @BeforeAll static void setUp() { @@ -104,6 +107,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { log.info("[testAuth][认证响应帧: slaveId={}, FC={}, customData={}]", response.getSlaveId(), response.getFunctionCode(), response.getCustomData()); JSONObject json = JSONUtil.parseObj(response.getCustomData()); + assertEquals(0, json.getInt("code")); log.info("[testAuth][认证结果: code={}, message={}]", json.getInt("code"), json.getStr("message")); } finally { socket.close(); @@ -122,10 +126,13 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { // 1. 先认证 IotModbusFrame authResponse = authenticate(socket); log.info("[testPollingResponse][认证响应: {}]", authResponse.getCustomData()); + JSONObject authJson = JSONUtil.parseObj(authResponse.getCustomData()); + assertEquals(0, authJson.getInt("code")); // 2. 设置持续监听:每收到一个读请求,自动回复 log.info("[testPollingResponse][开始持续监听网关下发的读请求...]"); CompletableFuture done = new CompletableFuture<>(); + // 注意:使用 requestMode=true,因为设备端收到的是网关下发的读请求(非响应) RecordParser parser = FRAME_DECODER.createRecordParser((frame, frameFormat) -> { log.info("[testPollingResponse][收到请求: slaveId={}, FC={}]", frame.getSlaveId(), frame.getFunctionCode()); @@ -144,7 +151,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { frame.getFunctionCode(), registerValues); socket.write(Buffer.buffer(responseData)); log.info("[testPollingResponse][已发送读响应, registerValues={}]", registerValues); - }); + }, true); socket.handler(parser); // 3. 持续等待(200 秒),期间会自动回复所有收到的读请求 @@ -174,7 +181,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { IotModbusFrame writeRequest = waitForRequest(socket); log.info("[testPropertySetWrite][收到写请求: slaveId={}, FC={}, pdu={}]", writeRequest.getSlaveId(), writeRequest.getFunctionCode(), - bytesToHex(writeRequest.getPdu())); + HexUtil.encodeHexStr(writeRequest.getPdu())); } finally { socket.close(); } @@ -198,6 +205,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { */ private IotModbusFrame authenticate(NetSocket socket) throws Exception { IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + authInfo.setClientId(""); byte[] authFrame = buildAuthFrame(authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword()); return sendAndReceive(socket, authFrame); } @@ -291,18 +299,4 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { return frame; } - /** - * 字节数组转十六进制字符串 - */ - private static String bytesToHex(byte[] bytes) { - if (bytes == null) { - return "null"; - } - StringBuilder sb = new StringBuilder(); - for (byte b : bytes) { - sb.append(String.format("%02X ", b)); - } - return sb.toString().trim(); - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusTcpIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveTcpIntegrationTest.java similarity index 95% rename from yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusTcpIntegrationTest.java rename to yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveTcpIntegrationTest.java index b8c1edb8a0..720887a914 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusTcpIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveTcpIntegrationTest.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave; +import cn.hutool.core.util.HexUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; @@ -25,7 +26,8 @@ import java.nio.ByteOrder; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -// TODO @芋艿:【晚点改】单测需要简化! +import static org.junit.jupiter.api.Assertions.assertEquals; + /** * IoT Modbus TCP Slave 协议集成测试 — MODBUS_TCP 帧格式(手动测试) * @@ -48,7 +50,7 @@ import java.util.concurrent.TimeUnit; */ @Slf4j @Disabled -public class IotModbusTcpSlaveModbusTcpIntegrationTest { +public class IotModbusTcpSlaveTcpIntegrationTest { private static final String SERVER_HOST = "127.0.0.1"; private static final int SERVER_PORT = 503; @@ -106,6 +108,7 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { log.info("[testAuth][认证响应帧: slaveId={}, FC={}, customData={}]", response.getSlaveId(), response.getFunctionCode(), response.getCustomData()); JSONObject json = JSONUtil.parseObj(response.getCustomData()); + assertEquals(0, json.getInt("code")); log.info("[testAuth][认证结果: code={}, message={}]", json.getInt("code"), json.getStr("message")); } finally { socket.close(); @@ -124,7 +127,8 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { // 1. 先认证 IotModbusFrame authResponse = authenticate(socket); log.info("[testPollingResponse][认证响应: {}]", authResponse.getCustomData()); - // TODO @AI:这里断言下,认证必须成功! + JSONObject authJson = JSONUtil.parseObj(authResponse.getCustomData()); + assertEquals(0, authJson.getInt("code")); // 2. 设置持续监听:每收到一个读请求,自动回复 log.info("[testPollingResponse][开始持续监听网关下发的读请求...]"); @@ -176,7 +180,7 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { IotModbusFrame writeRequest = waitForRequest(socket); log.info("[testPropertySetWrite][收到写请求: slaveId={}, FC={}, transactionId={}, pdu={}]", writeRequest.getSlaveId(), writeRequest.getFunctionCode(), - writeRequest.getTransactionId(), bytesToHex(writeRequest.getPdu())); + writeRequest.getTransactionId(), HexUtil.encodeHexStr(writeRequest.getPdu())); } finally { socket.close(); } @@ -295,18 +299,4 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { return buf.array(); } - /** - * 字节数组转十六进制字符串 - */ - private static String bytesToHex(byte[] bytes) { - if (bytes == null) { - return "null"; - } - StringBuilder sb = new StringBuilder(); - for (byte b : bytes) { - sb.append(String.format("%02X ", b)); - } - return sb.toString().trim(); - } - }