feat(iot):modbus-tcp-slave、modbus-tcp-master 接着大量优化,并且修复 modbus rtu 编解码的问题

This commit is contained in:
YunaiV
2026-02-08 23:11:38 +08:00
parent 4e4c776bed
commit 88f090b66f
27 changed files with 224 additions and 213 deletions

View File

@@ -16,13 +16,9 @@ public class IotDeviceModbusConfigSaveReqVO {
private Long deviceId;
@Schema(description = "Modbus 服务器 IP 地址", example = "192.168.1.100")
// @NotEmpty(message = "Modbus 服务器 IP 地址不能为空")
// TODO @AI这个字段要根据情况校验
private String ip;
@Schema(description = "Modbus 端口", example = "502")
// @NotNull(message = "Modbus 端口不能为空")
// TODO @AI这个字段要根据情况校验
private Integer port;
@Schema(description = "从站地址", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")

View File

@@ -29,7 +29,12 @@ public class IotDeviceModbusPointDO extends TenantBaseDO {
*/
@TableId
private Long id;
// TODO @AI增加 productId
/**
* 产品编号
*
* 关联 {@link cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO#getId()}
*/
private Long productId;
/**
* 设备编号
*
@@ -42,7 +47,6 @@ public class IotDeviceModbusPointDO extends TenantBaseDO {
* 关联 {@link IotThingModelDO#getId()}
*/
private Long thingModelId;
// TODO @AI每次物模型的变更时需要按需刷下 identifier、name 配置;
/**
* 属性标识符
*

View File

@@ -39,4 +39,9 @@ public interface IotDeviceModbusPointMapper extends BaseMapperX<IotDeviceModbusP
IotDeviceModbusPointDO::getIdentifier, identifier);
}
default void updateByThingModelId(Long thingModelId, IotDeviceModbusPointDO updateObj) {
update(updateObj, new LambdaQueryWrapperX<IotDeviceModbusPointDO>()
.eq(IotDeviceModbusPointDO::getThingModelId, thingModelId));
}
}

View File

@@ -19,9 +19,9 @@ public enum IotDataSinkTypeEnum implements ArrayValuable<Integer> {
TCP(2, "TCP"),
WEBSOCKET(3, "WebSocket"),
MQTT(10, "MQTT"), // TODO 待实现;
MQTT(10, "MQTT"), // TODO @puhui999待实现;
DATABASE(20, "Database"), // TODO @puhui999待实现可以简单点,对应的表名是什么,字段先固定了。
DATABASE(20, "Database"), // TODO @puhui999待实现
REDIS(21, "Redis"),
ROCKETMQ(30, "RocketMQ"),

View File

@@ -67,7 +67,6 @@ public class IotDeviceMessageSubscriber implements IotMessageSubscriber<IotDevic
IotDeviceDO device = deviceService.validateDeviceExistsFromCache(message.getDeviceId());
devicePropertyService.updateDeviceReportTimeAsync(device.getId(), LocalDateTime.now());
// 1.2 更新设备的连接 server
// TODO 芋艿HTTP 网关的上行消息,不应该更新 serverId会覆盖掉 MQTT 等长连接的 serverId导致下行消息无法发送。
devicePropertyService.updateDeviceServerIdAsync(device.getId(), message.getServerId());
// 2. 未上线的设备,强制上线

View File

@@ -9,7 +9,6 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
// TODO @puhui999后面重构哈
/**
* 针对 {@link IotDeviceMessage} 的消费者,处理规则场景
*

View File

@@ -1,10 +1,16 @@
package cn.iocoder.yudao.module.iot.service.device;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigSaveReqVO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceModbusConfigDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.dal.mysql.device.IotDeviceModbusConfigMapper;
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
@@ -25,11 +31,17 @@ public class IotDeviceModbusConfigServiceImpl implements IotDeviceModbusConfigSe
@Resource
private IotDeviceService deviceService;
@Resource
private IotProductService productService;
@Override
public void saveDeviceModbusConfig(IotDeviceModbusConfigSaveReqVO saveReqVO) {
// 1. 校验设备存在
deviceService.validateDeviceExists(saveReqVO.getDeviceId());
// 1.1 校验设备存在
IotDeviceDO device = deviceService.validateDeviceExists(saveReqVO.getDeviceId());
// 1.2 根据产品 protocolType 条件校验
IotProductDO product = productService.getProduct(device.getProductId());
Assert.notNull(product, "产品不存在");
validateModbusConfigByProtocolType(saveReqVO, product.getProtocolType());
// 2. 根据数据库中是否已有配置,决定是新增还是更新
IotDeviceModbusConfigDO existConfig = modbusConfigMapper.selectByDeviceId(saveReqVO.getDeviceId());
@@ -58,4 +70,21 @@ public class IotDeviceModbusConfigServiceImpl implements IotDeviceModbusConfigSe
return modbusConfigMapper.selectList(listReqDTO);
}
// TODO @AI还是希望在 IotDeviceModbusConfigSaveReqVO 里,通过 validator 校验?!或者通过 group 来处理?
private void validateModbusConfigByProtocolType(IotDeviceModbusConfigSaveReqVO saveReqVO, String protocolType) {
IotProtocolTypeEnum protocolTypeEnum = IotProtocolTypeEnum.of(protocolType);
if (protocolTypeEnum == null) {
return;
}
if (protocolTypeEnum == IotProtocolTypeEnum.MODBUS_TCP_MASTER) {
Assert.isTrue(StrUtil.isNotEmpty(saveReqVO.getIp()), "Master 模式下IP 地址不能为空");
Assert.notNull(saveReqVO.getPort(), "Master 模式下,端口不能为空");
Assert.notNull(saveReqVO.getTimeout(), "Master 模式下,连接超时时间不能为空");
Assert.notNull(saveReqVO.getRetryInterval(), "Master 模式下,重试间隔不能为空");
} else if (protocolTypeEnum == IotProtocolTypeEnum.MODBUS_TCP_SLAVE) {
Assert.notNull(saveReqVO.getMode(), "Slave 模式下,工作模式不能为空");
Assert.notNull(saveReqVO.getFrameFormat(), "Slave 模式下,数据帧格式不能为空");
}
}
}

View File

@@ -55,6 +55,15 @@ public interface IotDeviceModbusPointService {
*/
PageResult<IotDeviceModbusPointDO> getDeviceModbusPointPage(IotDeviceModbusPointPageReqVO pageReqVO);
/**
* 物模型变更时更新关联点位的冗余字段identifier、name
*
* @param thingModelId 物模型编号
* @param identifier 物模型标识符
* @param name 物模型名称
*/
void updateDeviceModbusPointByThingModel(Long thingModelId, String identifier, String name);
/**
* 根据设备编号批量获得启用的点位配置 Map
*

View File

@@ -75,7 +75,12 @@ public class IotDeviceModbusPointServiceImpl implements IotDeviceModbusPointServ
modbusPointMapper.updateById(updateObj);
}
// TODO @AI物模型更新的时候更新下 identifier、name 信息;例如说 updateDeviceModbusPoint(thingModelId, identifier、name) 方法;
@Override
public void updateDeviceModbusPointByThingModel(Long thingModelId, String identifier, String name) {
IotDeviceModbusPointDO updateObj = new IotDeviceModbusPointDO()
.setIdentifier(identifier).setName(name);
modbusPointMapper.updateByThingModelId(thingModelId, updateObj);
}
private IotThingModelDO validateThingModelExists(Long id) {
IotThingModelDO thingModel = thingModelService.getThingModel(id);

View File

@@ -14,8 +14,6 @@ import java.time.Duration;
// TODO @芋艿:数据库
// TODO @芋艿mqtt
// TODO @芋艿tcp
// TODO @芋艿websocket
/**
* 可缓存的 {@link IotDataRuleAction} 抽象实现

View File

@@ -15,6 +15,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
import cn.iocoder.yudao.module.iot.dal.mysql.thingmodel.IotThingModelMapper;
import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants;
import cn.iocoder.yudao.module.iot.enums.product.IotProductStatusEnum;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceModbusPointService;
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@@ -51,6 +52,9 @@ public class IotThingModelServiceImpl implements IotThingModelService {
@Lazy // 延迟加载,解决循环依赖
private IotProductService productService;
@Resource
private IotDeviceModbusPointService deviceModbusPointService;
@Override
@Transactional(rollbackFor = Exception.class)
public Long createThingModel(IotThingModelSaveReqVO createReqVO) {
@@ -84,7 +88,11 @@ public class IotThingModelServiceImpl implements IotThingModelService {
IotThingModelDO thingModel = IotThingModelConvert.INSTANCE.convert(updateReqVO);
thingModelMapper.updateById(thingModel);
// 3. 删除缓存
// 3. 同步更新 Modbus 点位的冗余字段identifier、name
deviceModbusPointService.updateDeviceModbusPointByThingModel(
updateReqVO.getId(), updateReqVO.getIdentifier(), updateReqVO.getName());
// 4. 删除缓存
deleteThingModelListCache(updateReqVO.getProductId());
}

View File

@@ -500,6 +500,27 @@ public class IotModbusCommonUtils {
return values;
}
/**
* 从响应帧中提取 registerCount通过 PDU 的 byteCount 推断)
*
* @param frame 解码后的 Modbus 响应帧
* @return registerCount无法提取时返回 -1匹配时跳过校验
*/
public static int extractRegisterCountFromResponse(IotModbusFrame frame) {
byte[] pdu = frame.getPdu();
if (pdu == null || pdu.length < 1) {
return -1;
}
int byteCount = pdu[0] & 0xFF;
int fc = frame.getFunctionCode();
// FC03/04 寄存器读响应registerCount = byteCount / 2
if (fc == FC_READ_HOLDING_REGISTERS || fc == FC_READ_INPUT_REGISTERS) {
return byteCount / 2;
}
// FC01/02 线圈/离散输入读响应:按 bit 打包有余位,无法精确反推,返回 -1 跳过校验
return -1;
}
// ==================== 点位查找 ====================
/**

View File

@@ -84,7 +84,6 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class);
this.configCacheService = new IotModbusTcpMasterConfigCacheService(deviceApi);
// DONE @AI上线/下线消息已移到 ConnectionManager 内部处理,不再走回调
this.connectionManager = new IotModbusTcpMasterConnectionManager(redissonClient, vertx,
messageService, configCacheService, serverId);

View File

@@ -79,8 +79,6 @@ public class IotModbusTcpMasterConfigCacheService {
/**
* 计算已删除设备的 ID 集合,清理缓存,并更新已知设备 ID 集合
*
* DONE @AI不再使用 callback 模式,返回已删除的设备 ID 集合,由调用方直接清理
*
* @param currentConfigs 当前有效的配置列表
* @return 已删除的设备 ID 集合
*/

View File

@@ -96,18 +96,19 @@ public class IotModbusTcpMasterConnectionManager {
connection = connectionPool.get(connectionKey);
if (connection != null) {
addDeviceAndOnline(connection, config);
lock.unlock();
return;
}
// 3.2 创建新连接
connection = createConnection(config);
connection.setLock(lock);
connectionPool.put(connectionKey, connection);
log.info("[ensureConnection][创建 Modbus 连接成功: {}]", connectionKey);
// 3.3 注册设备并发送上线消息
addDeviceAndOnline(connection, config);
} catch (Exception e) {
log.error("[ensureConnection][创建 Modbus 连接失败: {}]", connectionKey, e);
} finally {
// TODO @AI如果这里释放会不会出现集群模式下多个节点同时创建连接的情况需要验证一下 Redisson 的分布式锁特性?!
// 建连失败,释放锁让其他节点可重试
lock.unlock();
}
}
@@ -231,11 +232,11 @@ public class IotModbusTcpMasterConnectionManager {
if (connection.getTcpConnection() != null) {
connection.getTcpConnection().close();
}
// 强制解锁,避免死锁(正常情况下应该不会发生锁未释放的情况)
// RLock lock = connection.getLock();
// if (lock != null && lock.isLocked()) {
// lock.forceUnlock();
// }
// 释放分布式锁,让其他节点可接管
RLock lock = connection.getLock();
if (lock != null && lock.isHeldByCurrentThread()) {
lock.unlock();
}
log.info("[closeConnection][关闭 Modbus 连接: {}]", connectionKey);
} catch (Exception e) {
log.error("[closeConnection][关闭连接失败: {}]", connectionKey, e);
@@ -266,17 +267,21 @@ public class IotModbusTcpMasterConnectionManager {
private TCPMasterConnection tcpConnection;
private Integer timeout;
private Integer retryInterval;
private Context context;
// TODO @AI是不是需要 lock避免集群模式下的竞争肯定不能让别的节点连接上【另外RLock 在节点(持有所锁的节点) cransh 的时候,会自动释放】
// private RLock lock;
/**
* 设备 ID 到 slave ID 的映射
*/
private final Map<Long, Integer> deviceSlaveMap = new ConcurrentHashMap<>();
/**
* 分布式锁,锁住连接的创建和销毁,避免多节点重复连接同一从站
*/
private RLock lock;
/**
* Vert.x Context用于 executeBlocking 执行 Modbus 操作,保证同一连接的操作串行执行
*/
private Context context;
public void addDevice(Long deviceId, Integer slaveId) {
deviceSlaveMap.put(deviceId, slaveId);
}

View File

@@ -41,10 +41,4 @@ public class IotModbusTcpSlaveConfig {
@NotNull(message = "请求清理间隔不能为空")
private Integer requestCleanupInterval = 10000;
// TODO @AI可以去掉这个开关因为本身就是模拟的稍后我自己也会手动或者让你去掉听我指令
/**
* 是否启用 Mock 测试数据(仅开发/测试环境使用,线上务必关闭)
*/
private Boolean mockEnabled = false;
}

View File

@@ -29,21 +29,36 @@ import java.util.function.BiConsumer;
@Slf4j
public class IotModbusFrameDecoder {
private static final Boolean REQUEST_MODE_DEFAULT = false;
/**
* 自定义功能码
*/
private final int customFunctionCode;
/**
* 创建带自动帧格式检测的 RecordParser
* 创建带自动帧格式检测的 RecordParser(默认响应模式)
*
* @param frameHandler 完整帧回调(解码后的 IotModbusFrame + 检测到的帧格式)
* @return RecordParser 实例
*/
public RecordParser createRecordParser(BiConsumer<IotModbusFrame, IotModbusFrameFormatEnum> frameHandler) {
return createRecordParser(frameHandler, REQUEST_MODE_DEFAULT);
}
/**
* 创建带自动帧格式检测的 RecordParser
*
* @param frameHandler 完整帧回调(解码后的 IotModbusFrame + 检测到的帧格式)
* @param requestMode 是否为请求模式true接收方收到的是 Modbus 请求帧FC01-04 按固定 8 字节解析;
* false接收方收到的是 Modbus 响应帧FC01-04 按 byteCount 变长解析)
* @return RecordParser 实例
*/
public RecordParser createRecordParser(BiConsumer<IotModbusFrame, IotModbusFrameFormatEnum> frameHandler,
boolean requestMode) {
// 先创建一个 RecordParser使用 fixedSizeMode(6) 读取首帧前 6 字节进行帧格式检测
RecordParser parser = RecordParser.newFixed(6);
parser.handler(new DetectPhaseHandler(parser, customFunctionCode, frameHandler));
parser.handler(new DetectPhaseHandler(parser, customFunctionCode, frameHandler, requestMode));
return parser;
}
@@ -150,6 +165,7 @@ public class IotModbusFrameDecoder {
private final RecordParser parser;
private final int customFunctionCode;
private final BiConsumer<IotModbusFrame, IotModbusFrameFormatEnum> frameHandler;
private final boolean requestMode;
@Override
public void handle(Buffer buffer) {
@@ -169,7 +185,7 @@ public class IotModbusFrameDecoder {
} else {
// MODBUS_RTU切换到 RTU 拆包 Handler
log.debug("[DetectPhaseHandler][检测到 MODBUS_RTU 帧格式]");
RtuFrameHandler rtuHandler = new RtuFrameHandler(parser, frameHandler, customFunctionCode);
RtuFrameHandler rtuHandler = new RtuFrameHandler(parser, frameHandler, customFunctionCode, requestMode);
parser.handler(rtuHandler);
// 当前 bytes 包含前 6 字节slaveId + FC + 部分数据),交给 rtuHandler 处理
rtuHandler.handleFirstBytes(bytes);
@@ -248,6 +264,9 @@ public class IotModbusFrameDecoder {
* - 自定义 FC / FC01-04 响应fixedSizeMode(1) → 读 byteCount → fixedSizeMode(byteCount + 2)
* - FC05/06 响应fixedSizeMode(6) → addr(2) + value(2) + CRC(2)
* - FC15/16 响应fixedSizeMode(6) → addr(2) + quantity(2) + CRC(2)
* <p>
* 请求模式requestMode=trueFC01-04 按固定 8 字节解析(与写响应相同路径),
* 因为读请求格式为 [SlaveId(1)][FC(1)][StartAddr(2)][Quantity(2)][CRC(2)]
*/
@RequiredArgsConstructor
private class RtuFrameHandler implements Handler<Buffer> {
@@ -261,6 +280,12 @@ public class IotModbusFrameDecoder {
private final RecordParser parser;
private final BiConsumer<IotModbusFrame, IotModbusFrameFormatEnum> frameHandler;
private final int customFunctionCode;
/**
* 请求模式:
* - true 表示接收方收到的是 Modbus 请求帧如设备端收到网关下发的读请求FC01-04 按固定 8 字节帧解析
* - false 表示接收方收到的是 Modbus 响应帧FC01-04 按 byteCount 变长解析
*/
private final boolean requestMode;
private int state = STATE_HEADER;
private byte slaveId;
@@ -289,6 +314,13 @@ public class IotModbusFrameDecoder {
frame.appendBytes(bytes, 2, 3); // exceptionCode + CRC
emitFrame(frame);
resetToHeader();
} else if (IotModbusCommonUtils.isReadResponse(fc) && requestMode) {
// 请求模式下的读请求:固定 8 字节 [SlaveId(1)][FC(1)][StartAddr(2)][Quantity(2)][CRC(2)]
// 已有 6 字节,还需 2 字节CRC
state = STATE_WRITE_BODY;
this.pendingData = Buffer.buffer();
this.pendingData.appendBytes(bytes, 2, 4); // 暂存已有的 4 字节StartAddr + Quantity
parser.fixedSizeMode(2); // 还需 2 字节CRC
} else if (IotModbusCommonUtils.isReadResponse(fc) || fc == customFunctionCode) {
// 读响应或自定义 FCbytes[2] = byteCount
this.byteCount = bytes[2];
@@ -359,6 +391,11 @@ public class IotModbusFrameDecoder {
// 异常响应
state = STATE_EXCEPTION_BODY;
parser.fixedSizeMode(3); // exceptionCode(1) + CRC(2)
} else if (IotModbusCommonUtils.isReadResponse(fc) && requestMode) {
// 请求模式下的读请求:固定 8 字节,已读 2 字节slaveId + FC还需 6 字节
state = STATE_WRITE_BODY;
pendingData = Buffer.buffer();
parser.fixedSizeMode(6); // StartAddr(2) + Quantity(2) + CRC(2)
} else if (IotModbusCommonUtils.isReadResponse(fc) || fc == customFunctionCode) {
// 读响应或自定义 FC
state = STATE_READ_BYTE_COUNT;

View File

@@ -46,9 +46,6 @@ public class IotModbusFrameEncoder {
/**
* 编码写请求(单个寄存器 FC06 / 单个线圈 FC05
*
* DONE @AI【from codex】【高】FC05 写线圈时value 已转换为 Modbus 标准值非0 → 0xFF000 → 0x0000
* 新增 encodeWriteMultipleCoilsRequest 方法用于 FC15 编码(按 bit 打包)。
*
* @param slaveId 从站地址
* @param functionCode 功能码
* @param address 寄存器地址

View File

@@ -109,7 +109,6 @@ public class IotModbusTcpSlaveDownstreamHandler {
// 1.2 确定帧格式和事务 ID
IotModbusFrameFormatEnum frameFormat = connInfo.getFrameFormat();
Assert.notNull(frameFormat, "连接帧格式不能为空");
// TODO @AI【from 芋艿】需要和按照 deviceId 进行自增么???
Integer transactionId = frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP
? (transactionIdCounter.incrementAndGet() & 0xFFFF)
: null;

View File

@@ -6,15 +6,15 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -33,8 +33,6 @@ public class IotModbusTcpSlaveConfigCacheService {
*/
private final Map<Long, IotModbusDeviceConfigRespDTO> configCache = new ConcurrentHashMap<>();
// ==================== 按需加载(认证时) ====================
/**
* 加载单个设备的配置(认证成功后调用)
*
@@ -66,8 +64,6 @@ public class IotModbusTcpSlaveConfigCacheService {
}
}
// ==================== 定时刷新(已连接设备) ====================
/**
* 刷新已连接设备的配置缓存
* <p>
@@ -89,31 +85,17 @@ public class IotModbusTcpSlaveConfigCacheService {
.setDeviceIds(connectedDeviceIds));
List<IotModbusDeviceConfigRespDTO> modbusConfigs = result.getCheckedData();
// 2. 追加 Mock 测试数据(仅 mockEnabled=true 时)
// TODO @芋艿:测试完成后移除
// TODO @claude-code【严重】同上if(true) 导致 mockEnabled 开关失效Mock 数据永远加载
if (true) {
modbusConfigs.addAll(buildMockConfigs());
}
// 2. 只保留已连接设备的配置,更新缓存
// TODO @AI是不是直接添加到 configCache 缓存(或者覆盖),然后返回 modbusConfigs 就 ok 了?!
List<IotModbusDeviceConfigRespDTO> connectedConfigs = new ArrayList<>();
// 2. 更新缓存并返回
for (IotModbusDeviceConfigRespDTO config : modbusConfigs) {
if (connectedDeviceIds.contains(config.getDeviceId())) {
configCache.put(config.getDeviceId(), config);
connectedConfigs.add(config);
}
configCache.put(config.getDeviceId(), config);
}
return connectedConfigs;
return modbusConfigs;
} catch (Exception e) {
log.error("[refreshConnectedDeviceConfigList][刷新配置失败]", e);
return null;
}
}
// ==================== 缓存操作 ====================
/**
* 获取设备配置
*/
@@ -133,59 +115,4 @@ public class IotModbusTcpSlaveConfigCacheService {
configCache.remove(deviceId);
}
// ==================== Mock 数据 ====================
/**
* 构建 Mock 测试配置数据(一次性测试用途)
*
* 设备PRODUCT_KEY=4aymZgOTOOCrDKRT, DEVICE_NAME=small
* 点位temperatureFC03, 地址 0、humidityFC03, 地址 1
*
* TODO @芋艿:测试完成后移除
*/
private List<IotModbusDeviceConfigRespDTO> buildMockConfigs() {
IotModbusDeviceConfigRespDTO config = new IotModbusDeviceConfigRespDTO();
config.setDeviceId(25L);
config.setProductKey("4aymZgOTOOCrDKRT");
config.setDeviceName("small");
config.setSlaveId(1);
config.setMode(1); // 云端轮询
config.setFrameFormat(IotModbusFrameFormatEnum.MODBUS_TCP.getFormat());
// 点位列表
List<IotModbusPointRespDTO> points = new ArrayList<>();
// 点位 1温度 - 保持寄存器 FC03, 地址 0, 1 个寄存器, INT16
IotModbusPointRespDTO point1 = new IotModbusPointRespDTO();
point1.setId(1L);
point1.setIdentifier("temperature");
point1.setName("温度");
point1.setFunctionCode(3); // FC03 读保持寄存器
point1.setRegisterAddress(0);
point1.setRegisterCount(1);
point1.setRawDataType("INT16");
point1.setByteOrder("BIG_ENDIAN");
point1.setScale(new BigDecimal("0.1"));
point1.setPollInterval(5000); // 5 秒轮询一次
points.add(point1);
// 点位 2湿度 - 保持寄存器 FC03, 地址 1, 1 个寄存器, UINT16
IotModbusPointRespDTO point2 = new IotModbusPointRespDTO();
point2.setId(2L);
point2.setIdentifier("humidity");
point2.setName("湿度");
point2.setFunctionCode(3); // FC03 读保持寄存器
point2.setRegisterAddress(1);
point2.setRegisterCount(1);
point2.setRawDataType("UINT16");
point2.setByteOrder("BIG_ENDIAN");
point2.setScale(new BigDecimal("0.1"));
point2.setPollInterval(5000); // 5 秒轮询一次
points.add(point2);
config.setPoints(points);
log.info("[buildMockConfigs][已加载 Mock 配置, deviceId={}, points={}]", config.getDeviceId(), points.size());
return Collections.singletonList(config);
}
}

View File

@@ -78,7 +78,7 @@ public class IotModbusTcpSlavePendingRequestManager {
return matchByTransactionId(queue, frame.getTransactionId());
}
// RTU 模式FIFO匹配 slaveId + functionCode + registerCount
int responseRegisterCount = extractRegisterCountFromResponse(frame);
int responseRegisterCount = IotModbusCommonUtils.extractRegisterCountFromResponse(frame);
return matchByFifo(queue, frame.getSlaveId(), frame.getFunctionCode(), responseRegisterCount);
}
@@ -115,29 +115,6 @@ public class IotModbusTcpSlavePendingRequestManager {
return null;
}
// TODO @AI是不是放到 modbus 工具类里,更合适?
/**
* 从响应帧中提取 registerCount通过 PDU 的 byteCount 推断)
*
* @return registerCount无法提取时返回 -1匹配时跳过校验
*/
private int extractRegisterCountFromResponse(IotModbusFrame frame) {
byte[] pdu = frame.getPdu();
if (pdu == null || pdu.length < 1) {
return -1;
}
int byteCount = pdu[0] & 0xFF;
int fc = frame.getFunctionCode();
// FC03/04 寄存器读响应registerCount = byteCount / 2
if (fc == IotModbusCommonUtils.FC_READ_HOLDING_REGISTERS
|| fc == IotModbusCommonUtils.FC_READ_INPUT_REGISTERS) {
return byteCount / 2;
}
// FC01/02 线圈/离散输入读响应registerCount = byteCount * 8线圈数量
// 但因为按 bit 打包有余位,无法精确反推,返回 -1 跳过校验
return -1;
}
/**
* 清理过期请求
*/

View File

@@ -6,6 +6,7 @@ import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.vertx.mqtt.MqttEndpoint;
import lombok.extern.slf4j.Slf4j;
@@ -58,6 +59,11 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
Assert.notNull(connectionInfo, "无法获取连接信息");
Assert.equals(productKey, connectionInfo.getProductKey(), "产品 Key 不匹配");
Assert.equals(deviceName, connectionInfo.getDeviceName(), "设备名称不匹配");
// 1.4 校验 topic 是否允许发布
if (!IotMqttTopicUtils.isTopicPublishAllowed(topic, productKey, deviceName)) {
log.warn("[handleBusinessRequest][topic 不允许发布,客户端 ID: {},主题: {}]", clientId, topic);
return;
}
// 2. 反序列化消息
message = deviceMessageService.deserializeDeviceMessage(payload, productKey, deviceName);

View File

@@ -91,7 +91,6 @@ public class IotUdpUpstreamHandler {
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
}
// TODO done @AIvertx 有 udp 的实现么?当前已使用 Vert.x DatagramSocket 实现
/**
* 处理 UDP 数据包
*

View File

@@ -101,8 +101,6 @@ public final class IotMqttTopicUtils {
* @param deviceName 设备名称
* @return 是否允许发布
*/
// TODO DONE @AI这个逻辑是不是 mqtt 协议也要使用是通用工具方法MQTT 协议可按需调用;
// TODO @AI那你改下 mqtt也调用
public static boolean isTopicPublishAllowed(String topic, String productKey, String deviceName) {
if (!StrUtil.isAllNotBlank(topic, productKey, deviceName)) {
return false;

View File

@@ -3,21 +3,49 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster;
import com.ghgande.j2mod.modbus.procimg.*;
import com.ghgande.j2mod.modbus.slave.ModbusSlave;
import com.ghgande.j2mod.modbus.slave.ModbusSlaveFactory;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
* Modbus TCP 从站模拟器
* Modbus TCP 从站模拟器手动测试
*
* 用于测试 Modbus TCP 网关连接和数据读写功能
* <p>测试场景模拟一个标准 Modbus TCP 从站设备 Modbus TCP Master 网关连接和读写数据
*
* <p>使用步骤
* <ol>
* <li>运行 {@link #testStartSlaveSimulator()} 启动模拟从站默认端口 5020从站地址 1</li>
* <li>启动 yudao-module-iot-gateway 服务需开启 modbus-tcp-master 协议</li>
* <li>确保数据库有对应的 Modbus Master 设备配置ip=127.0.0.1, port=5020, slaveId=1</li>
* <li>网关会自动连接模拟从站并开始轮询读取寄存器数据</li>
* <li>模拟器每 5 秒自动更新输入寄存器和保持寄存器的值模拟传感器数据变化</li>
* </ol>
*
* <p>可用寄存器
* <ul>
* <li>线圈 (Coil, 功能码 01/05): 地址 0-9交替 true/false</li>
* <li>离散输入 (Discrete Input, 功能码 02): 地址 0-9 3 个一个 true</li>
* <li>保持寄存器 (Holding Register, 功能码 03/06/16): 地址 0-19初始值 0,100,200,...</li>
* <li>输入寄存器 (Input Register, 功能码 04): 地址 0-19初始值 1,11,21,...</li>
* </ul>
*
* @author 芋道源码
*/
public class ModbusTcpSlaveSimulatorTest {
@Slf4j
@Disabled
public class IoTModbusTcpMasterIntegrationTest {
private static final int PORT = 5020;
private static final int SLAVE_ID = 1;
@SuppressWarnings({"InfiniteLoopStatement", "BusyWait", "CommentedOutCode"})
public static void main(String[] args) throws Exception {
/**
* 启动 Modbus TCP 从站模拟器
*
* <p>模拟器会持续运行 5 秒更新一次寄存器数据直到手动停止
*/
@SuppressWarnings({"InfiniteLoopStatement", "BusyWait"})
@Test
public void testStartSlaveSimulator() throws Exception {
// 1. 创建进程映像Process Image存储寄存器数据
SimpleProcessImage spi = new SimpleProcessImage(SLAVE_ID);
@@ -53,24 +81,15 @@ public class ModbusTcpSlaveSimulatorTest {
// 3.2 启动从站
slave.open();
System.out.println("===================================================");
System.out.println("Modbus TCP 从站模拟器已启动");
System.out.println("端口: " + PORT);
System.out.println("从站地址 (Slave ID): " + SLAVE_ID);
System.out.println("===================================================");
System.out.println("可用寄存器:");
System.out.println(" - 线圈 (Coil, 功能码 01/05): 地址 0-9");
System.out.println(" - 离散输入 (Discrete Input, 功能码 02): 地址 0-9");
System.out.println(" - 保持寄存器 (Holding Register, 功能码 03/06/16): 地址 0-19");
System.out.println(" - 输入寄存器 (Input Register, 功能码 04): 地址 0-19");
System.out.println("===================================================");
System.out.println("按 Ctrl+C 停止模拟器");
log.info("[testStartSlaveSimulator][Modbus TCP 从站模拟器已启动, 端口: {}, 从站地址: {}]", PORT, SLAVE_ID);
log.info("[testStartSlaveSimulator][可用寄存器: 线圈(01/05) 0-9, 离散输入(02) 0-9, " +
"保持寄存器(03/06/16) 0-19, 输入寄存器(04) 0-19]");
// 4. 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("\n正在关闭模拟器...");
log.info("[testStartSlaveSimulator][正在关闭模拟器...]");
slave.close();
System.out.println("模拟器已关闭");
log.info("[testStartSlaveSimulator][模拟器已关闭]");
}));
// 5. 保持运行定时更新输入寄存器模拟数据变化
@@ -87,9 +106,8 @@ public class ModbusTcpSlaveSimulatorTest {
// 更新保持寄存器的第一个值
spi.getRegister(0).setValue(counter * 100);
// System.out.println("[" + java.time.LocalTime.now() + "] 数据已更新, counter=" + counter
// + ", 保持寄存器[0]=" + (counter * 100)
// + ", 输入寄存器[0]=" + (1 + counter));
log.info("[testStartSlaveSimulator][数据已更新, counter={}, 保持寄存器[0]={}, 输入寄存器[0]={}]",
counter, counter * 100, 1 + counter);
}
}

View File

@@ -1,14 +1,15 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave;
import cn.hutool.core.util.HexUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
@@ -24,6 +25,8 @@ import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* IoT Modbus TCP Slave 协议集成测试 MODBUS_RTU 帧格式手动测试
*
@@ -46,7 +49,7 @@ import java.util.concurrent.TimeUnit;
*/
@Slf4j
@Disabled
public class IotModbusTcpSlaveModbusRtuIntegrationTest {
public class IotModbusTcpSlaveRtuIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 503;
@@ -65,9 +68,9 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
// ===================== 设备信息根据实际情况修改 iot_device 表查询 =====================
private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT";
private static final String DEVICE_NAME = "small";
private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3";
private static final String PRODUCT_KEY = "modbus_tcp_slave_product_demo";
private static final String DEVICE_NAME = "modbus_tcp_slave_device_demo_rtu";
private static final String DEVICE_SECRET = "af01c55eb8e3424bb23fc6c783936b2e";
@BeforeAll
static void setUp() {
@@ -104,6 +107,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
log.info("[testAuth][认证响应帧: slaveId={}, FC={}, customData={}]",
response.getSlaveId(), response.getFunctionCode(), response.getCustomData());
JSONObject json = JSONUtil.parseObj(response.getCustomData());
assertEquals(0, json.getInt("code"));
log.info("[testAuth][认证结果: code={}, message={}]", json.getInt("code"), json.getStr("message"));
} finally {
socket.close();
@@ -122,10 +126,13 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
// 1. 先认证
IotModbusFrame authResponse = authenticate(socket);
log.info("[testPollingResponse][认证响应: {}]", authResponse.getCustomData());
JSONObject authJson = JSONUtil.parseObj(authResponse.getCustomData());
assertEquals(0, authJson.getInt("code"));
// 2. 设置持续监听每收到一个读请求自动回复
log.info("[testPollingResponse][开始持续监听网关下发的读请求...]");
CompletableFuture<Void> done = new CompletableFuture<>();
// 注意使用 requestMode=true因为设备端收到的是网关下发的读请求非响应
RecordParser parser = FRAME_DECODER.createRecordParser((frame, frameFormat) -> {
log.info("[testPollingResponse][收到请求: slaveId={}, FC={}]",
frame.getSlaveId(), frame.getFunctionCode());
@@ -144,7 +151,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
frame.getFunctionCode(), registerValues);
socket.write(Buffer.buffer(responseData));
log.info("[testPollingResponse][已发送读响应, registerValues={}]", registerValues);
});
}, true);
socket.handler(parser);
// 3. 持续等待200 期间会自动回复所有收到的读请求
@@ -174,7 +181,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
IotModbusFrame writeRequest = waitForRequest(socket);
log.info("[testPropertySetWrite][收到写请求: slaveId={}, FC={}, pdu={}]",
writeRequest.getSlaveId(), writeRequest.getFunctionCode(),
bytesToHex(writeRequest.getPdu()));
HexUtil.encodeHexStr(writeRequest.getPdu()));
} finally {
socket.close();
}
@@ -198,6 +205,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
*/
private IotModbusFrame authenticate(NetSocket socket) throws Exception {
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
authInfo.setClientId("");
byte[] authFrame = buildAuthFrame(authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword());
return sendAndReceive(socket, authFrame);
}
@@ -291,18 +299,4 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
return frame;
}
/**
* 字节数组转十六进制字符串
*/
private static String bytesToHex(byte[] bytes) {
if (bytes == null) {
return "null";
}
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X ", b));
}
return sb.toString().trim();
}
}

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave;
import cn.hutool.core.util.HexUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
@@ -25,7 +26,8 @@ import java.nio.ByteOrder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
// TODO @芋艿晚点改单测需要简化
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* IoT Modbus TCP Slave 协议集成测试 MODBUS_TCP 帧格式手动测试
*
@@ -48,7 +50,7 @@ import java.util.concurrent.TimeUnit;
*/
@Slf4j
@Disabled
public class IotModbusTcpSlaveModbusTcpIntegrationTest {
public class IotModbusTcpSlaveTcpIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 503;
@@ -106,6 +108,7 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
log.info("[testAuth][认证响应帧: slaveId={}, FC={}, customData={}]",
response.getSlaveId(), response.getFunctionCode(), response.getCustomData());
JSONObject json = JSONUtil.parseObj(response.getCustomData());
assertEquals(0, json.getInt("code"));
log.info("[testAuth][认证结果: code={}, message={}]", json.getInt("code"), json.getStr("message"));
} finally {
socket.close();
@@ -124,7 +127,8 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
// 1. 先认证
IotModbusFrame authResponse = authenticate(socket);
log.info("[testPollingResponse][认证响应: {}]", authResponse.getCustomData());
// TODO @AI这里断言下认证必须成功
JSONObject authJson = JSONUtil.parseObj(authResponse.getCustomData());
assertEquals(0, authJson.getInt("code"));
// 2. 设置持续监听每收到一个读请求自动回复
log.info("[testPollingResponse][开始持续监听网关下发的读请求...]");
@@ -176,7 +180,7 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
IotModbusFrame writeRequest = waitForRequest(socket);
log.info("[testPropertySetWrite][收到写请求: slaveId={}, FC={}, transactionId={}, pdu={}]",
writeRequest.getSlaveId(), writeRequest.getFunctionCode(),
writeRequest.getTransactionId(), bytesToHex(writeRequest.getPdu()));
writeRequest.getTransactionId(), HexUtil.encodeHexStr(writeRequest.getPdu()));
} finally {
socket.close();
}
@@ -295,18 +299,4 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
return buf.array();
}
/**
* 字节数组转十六进制字符串
*/
private static String bytesToHex(byte[] bytes) {
if (bytes == null) {
return "null";
}
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X ", b));
}
return sb.toString().trim();
}
}