From 4d578b239c8a1a7f3f2cc7c16f094787bac4169d Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 8 Feb 2026 10:13:11 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=88iot=EF=BC=89=EF=BC=9Amodbus-tcp-sl?= =?UTF-8?q?ave=20=E6=95=B4=E4=BD=93=E4=BB=A3=E7=A0=81=E8=BF=9B=E4=B8=80?= =?UTF-8?q?=E6=AD=A5=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tcpslave/IotModbusTcpSlaveProtocol.java | 43 ++-- .../IotModbusTcpSlaveUpstreamHandler.java | 1 - .../IotModbusTcpSlaveConfigCacheService.java | 187 ++++++++++-------- .../IotModbusTcpSlaveConnectionManager.java | 31 +-- ...otModbusTcpSlavePendingRequestManager.java | 28 ++- .../IotModbusTcpSlavePollScheduler.java | 17 +- 6 files changed, 157 insertions(+), 150 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java index ca317ec3fe..7b73b84ba2 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java @@ -1,10 +1,10 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; -import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; @@ -32,9 +32,9 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; -// DONE @AI:不用主动上报! /** * IoT 网关 Modbus TCP Slave 协议 *

@@ -152,10 +152,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { } try { - // 1.1 首次加载配置 - // TODO @AI:可能首次不用加载;你在想想; - refreshConfig(); - // 1.2 启动配置刷新定时器 + // 1. 启动配置刷新定时器 int refreshInterval = slaveConfig.getConfigRefreshInterval(); configRefreshTimerId = vertx.setPeriodic( TimeUnit.SECONDS.toMillis(refreshInterval), @@ -286,6 +283,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { } pollScheduler.stopPolling(info.getDeviceId()); pendingRequestManager.removeDevice(info.getDeviceId()); + configCacheService.removeConfig(info.getDeviceId()); log.info("[handleConnection][连接关闭, deviceId={}, remoteAddress={}]", info.getDeviceId(), socket.remoteAddress()); }); @@ -297,35 +295,30 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { } /** - * 刷新配置 + * 刷新已连接设备的配置(定时调用) + *

+ * 与 tcpmaster 不同,slave 只刷新已连接设备的配置,不做全量 diff。 + * 设备的新增(认证时)和删除(断连时)分别在 {@link #handleConnection} 中处理。 */ private synchronized void refreshConfig() { try { - // 1. 从 biz 拉取最新配置 - List configs = configCacheService.refreshConfig(); - log.debug("[refreshConfig][获取到 {} 个 Modbus 设备配置]", configs.size()); + // 1. 只刷新已连接设备的配置 + Set connectedDeviceIds = connectionManager.getConnectedDeviceIds(); + if (CollUtil.isEmpty(connectedDeviceIds)) { + return; + } + List configs = + configCacheService.refreshConnectedDeviceConfigList(connectedDeviceIds); + log.debug("[refreshConfig][刷新了 {} 个已连接设备的配置]", configs.size()); - // 2. 更新已连接设备的轮询任务(仅 mode=1) + // 2. 更新已连接设备的轮询任务 for (IotModbusDeviceConfigRespDTO config : configs) { try { - if (config.getMode() != null - && config.getMode().equals(IotModbusModeEnum.POLLING.getMode())) { - // 只有已连接的设备才启动轮询 - ConnectionInfo connInfo = connectionManager.getConnectionInfoByDeviceId(config.getDeviceId()); - if (connInfo != null) { - pollScheduler.updatePolling(config); - } - } + pollScheduler.updatePolling(config); } catch (Exception e) { log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e); } } - - // 3. 清理已删除设备的资源 - configCacheService.cleanupRemovedDevices(configs, deviceId -> { - pollScheduler.stopPolling(deviceId); - pendingRequestManager.removeDevice(deviceId); - }); } catch (Exception e) { log.error("[refreshConfig][刷新配置失败]", e); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java index 69aebe98d3..805c65fd0f 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java @@ -16,7 +16,6 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; -import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java index 16f1e47854..c8d68153a8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager; +import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; @@ -10,15 +11,18 @@ import lombok.extern.slf4j.Slf4j; import java.math.BigDecimal; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; -import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; - -// TODO @AI:和 IotModbusTcpConfigCacheService 基本一致?! /** * IoT Modbus TCP Slave 配置缓存服务 *

- * 负责:从 biz 拉取 Modbus 设备配置,缓存配置数据,并检测配置变更 + * 与 tcpmaster 的 {@code IotModbusTcpConfigCacheService} 不同: + * - tcpmaster 启动时拉全量配置 → 主动建连,需要全量 diff 检测新增/删除设备 + * - tcpslave 设备主动连接 → 认证时按需加载配置,断连时清理,定时刷新已连接设备的配置 + *

+ * 配置生命周期: + * 1. 认证时:{@link #loadDeviceConfig(Long)} 按 deviceId 从 API 加载配置到缓存 + * 2. 断连时:{@link #removeConfig(Long)} 从缓存中移除 + * 3. 定时刷新:{@link #refreshConnectedDeviceConfigList(Set)} 批量刷新已连接设备的配置 * * @author 芋道源码 */ @@ -33,43 +37,118 @@ public class IotModbusTcpSlaveConfigCacheService { */ private final Map configCache = new ConcurrentHashMap<>(); - // TODO @AI:它的 diff 算法,是不是不用和 IotModbusTcpConfigCacheService 完全一致;更多是1)首次连接时,查找;2)断开连接,移除;3)定时轮询更新; - /** - * 已知的设备 ID 集合 - */ - private final Set knownDeviceIds = ConcurrentHashMap.newKeySet(); + // ==================== 按需加载(认证时) ==================== /** - * 刷新配置 + * 加载单个设备的配置(认证成功后调用) + *

+ * 从远程 API 获取全量配置,然后按 deviceId 匹配。 + * 如果远程获取失败,尝试从 Mock 数据中匹配。 * - * @return 最新的配置列表 + * @param deviceId 设备 ID + * @return 设备配置,未找到返回 null */ - public List refreshConfig() { + public IotModbusDeviceConfigRespDTO loadDeviceConfig(Long deviceId) { try { - // 1. 从远程获取配置 - // TODO @AI:需要过滤下,只查找连接的设备列表;并且只有主动轮询的,才会处理;方法名,应该是 List 结尾; + // 1. 从远程 API 获取全量配置 + // TODO @AI:等待修复,不着急; CommonResult> result = deviceApi.getEnabledModbusDeviceConfigs(); - if (result == null || !result.isSuccess() || result.getData() == null) { - log.warn("[refreshConfig][获取 Modbus 配置失败: {}]", result); - return new ArrayList<>(configCache.values()); + if (result != null && result.isSuccess() && result.getData() != null) { + for (IotModbusDeviceConfigRespDTO config : result.getData()) { + // 顺便更新缓存(其他已连接设备也受益) + configCache.put(config.getDeviceId(), config); + if (config.getDeviceId().equals(deviceId)) { + return config; + } + } } - List configs = new ArrayList<>(result.getData()); - - // 2. 追加 Mock 测试数据(一次性测试用途) - // TODO @芋艿:测试完成后移除 - configs.addAll(buildMockConfigs()); - - // 3. 更新缓存 - for (IotModbusDeviceConfigRespDTO config : configs) { - configCache.put(config.getDeviceId(), config); - } - return configs; } catch (Exception e) { - log.error("[refreshConfig][刷新配置失败]", e); - return new ArrayList<>(configCache.values()); + log.error("[loadDeviceConfig][从远程获取配置失败, deviceId={}]", deviceId, e); + } + + // 2. 远程未找到,尝试 Mock 数据 + // TODO @芋艿:测试完成后移除 + for (IotModbusDeviceConfigRespDTO mockConfig : buildMockConfigs()) { + configCache.put(mockConfig.getDeviceId(), mockConfig); + if (mockConfig.getDeviceId().equals(deviceId)) { + return mockConfig; + } + } + + return configCache.get(deviceId); + } + + // ==================== 定时刷新(已连接设备) ==================== + + /** + * 刷新已连接设备的配置缓存 + *

+ * 定时调用,从远程 API 拉取最新配置,只更新已连接设备的缓存。 + * + * @param connectedDeviceIds 当前已连接的设备 ID 集合 + * @return 已连接设备的最新配置列表 + */ + public List refreshConnectedDeviceConfigList(Set connectedDeviceIds) { + if (CollUtil.isEmpty(connectedDeviceIds)) { + return Collections.emptyList(); + } + try { + // 1. 从远程获取全量配置 + // TODO @AI:传递 ids 批量查询;需要分批啦; + CommonResult> result = deviceApi.getEnabledModbusDeviceConfigs(); + List allConfigs; + if (result != null && result.isSuccess() && result.getData() != null) { + allConfigs = new ArrayList<>(result.getData()); + } else { + log.warn("[refreshConnectedDeviceConfigList][获取 Modbus 配置失败: {}]", result); + allConfigs = new ArrayList<>(); + } + + // 2. 追加 Mock 测试数据 + // TODO @芋艿:测试完成后移除 + allConfigs.addAll(buildMockConfigs()); + + // 3. 只保留已连接设备的配置,更新缓存 + List connectedConfigs = new ArrayList<>(); + for (IotModbusDeviceConfigRespDTO config : allConfigs) { + if (connectedDeviceIds.contains(config.getDeviceId())) { + configCache.put(config.getDeviceId(), config); + connectedConfigs.add(config); + } + } + return connectedConfigs; + } catch (Exception e) { + log.error("[refreshConnectedDeviceConfigList][刷新配置失败]", e); + // 降级:返回缓存中已连接设备的配置 + List fallback = new ArrayList<>(); + for (Long deviceId : connectedDeviceIds) { + IotModbusDeviceConfigRespDTO config = configCache.get(deviceId); + if (config != null) { + fallback.add(config); + } + } + return fallback; } } + // ==================== 缓存操作 ==================== + + /** + * 获取设备配置 + */ + public IotModbusDeviceConfigRespDTO getConfig(Long deviceId) { + return configCache.get(deviceId); + } + + /** + * 移除设备配置缓存(设备断连时调用) + */ + public void removeConfig(Long deviceId) { + configCache.remove(deviceId); + } + + // ==================== Mock 数据 ==================== + /** * 构建 Mock 测试配置数据(一次性测试用途) * @@ -123,50 +202,4 @@ public class IotModbusTcpSlaveConfigCacheService { return Collections.singletonList(config); } - /** - * 获取设备配置 - */ - public IotModbusDeviceConfigRespDTO getConfig(Long deviceId) { - return configCache.get(deviceId); - } - - // TODO @AI:这个逻辑,是不是非必须? - /** - * 通过 clientId + username + password 查找设备配置(认证用) - * 暂通过遍历缓存实现,后续可优化为索引 - */ - public IotModbusDeviceConfigRespDTO findConfigByAuth(String clientId, String username, String password) { - // TODO @芋艿:测试完成后移除 mock 逻辑,改为正式查找 - // Mock:通过 clientId(格式 productKey.deviceName)匹配缓存中的设备 - if (clientId != null && clientId.contains(".")) { - String[] parts = clientId.split("\\.", 2); - String productKey = parts[0]; - String deviceName = parts[1]; - for (IotModbusDeviceConfigRespDTO config : configCache.values()) { - if (productKey.equals(config.getProductKey()) && deviceName.equals(config.getDeviceName())) { - return config; - } - } - } - return null; - } - - /** - * 清理已删除设备的资源 - */ - public void cleanupRemovedDevices(List currentConfigs, Consumer cleanupAction) { - Set currentDeviceIds = convertSet(currentConfigs, IotModbusDeviceConfigRespDTO::getDeviceId); - Set removedDeviceIds = new HashSet<>(knownDeviceIds); - removedDeviceIds.removeAll(currentDeviceIds); - - for (Long deviceId : removedDeviceIds) { - log.info("[cleanupRemovedDevices][清理已删除设备: {}]", deviceId); - configCache.remove(deviceId); - cleanupAction.accept(deviceId); - } - - knownDeviceIds.clear(); - knownDeviceIds.addAll(currentDeviceIds); - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java index b342158a15..53363242c0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java @@ -4,9 +4,11 @@ import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetSocket; import lombok.Data; +import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -33,6 +35,7 @@ public class IotModbusTcpSlaveConnectionManager { * 连接信息 */ @Data + @Accessors(chain = true) public static class ConnectionInfo { /** @@ -57,12 +60,6 @@ public class IotModbusTcpSlaveConnectionManager { */ private IotModbusFrameFormatEnum frameFormat; - // TODO @AI:mode 是否非必须?! - /** - * 模式:1-云端轮询 2-主动上报 - */ - private Integer mode; - } /** @@ -75,17 +72,6 @@ public class IotModbusTcpSlaveConnectionManager { info.getDeviceId(), socket.remoteAddress()); } - // TODO @芋艿:待定是不是要保留?! - /** - * 设置连接的帧格式(首帧检测后调用) - */ - public void setFrameFormat(NetSocket socket, IotModbusFrameFormatEnum frameFormat) { - ConnectionInfo info = connectionMap.get(socket); - if (info != null) { - info.setFrameFormat(frameFormat); - } - } - /** * 获取连接信息 */ @@ -101,12 +87,11 @@ public class IotModbusTcpSlaveConnectionManager { return socket != null ? connectionMap.get(socket) : null; } - // TODO @AI:不用判断连接是否认证; /** - * 判断连接是否已认证 + * 获取所有已连接设备的 ID 集合 */ - public boolean isAuthenticated(NetSocket socket) { - return connectionMap.containsKey(socket); + public Set getConnectedDeviceIds() { + return deviceSocketMap.keySet(); } /** @@ -130,8 +115,7 @@ public class IotModbusTcpSlaveConnectionManager { log.warn("[sendToDevice][设备 {} 没有连接]", deviceId); return; } - // TODO @AI:直接复用 sendToSocket 方法?! - socket.write(Buffer.buffer(data)); + sendToSocket(socket, data); } /** @@ -141,7 +125,6 @@ public class IotModbusTcpSlaveConnectionManager { socket.write(Buffer.buffer(data)); } - // TODO @AI:貌似别的都没这个,是不是可以去掉哈?! /** * 关闭所有连接 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java index f82ddbfcae..3e5b99706c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager; +import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame; import lombok.AllArgsConstructor; @@ -7,6 +8,7 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.util.Deque; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; @@ -66,26 +68,24 @@ public class IotModbusTcpSlavePendingRequestManager { public PendingRequest matchResponse(Long deviceId, IotModbusFrame frame, IotModbusFrameFormatEnum frameFormat) { Deque queue = pendingRequests.get(deviceId); - // TODO @AI:CollUtil.isEmpty(queue) - if (queue == null || queue.isEmpty()) { + if (CollUtil.isEmpty(queue)) { return null; } + // TCP 模式:按 transactionId 精确匹配 if (frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP && frame.getTransactionId() != null) { - // TCP 模式:按 transactionId 精确匹配 return matchByTransactionId(queue, frame.getTransactionId()); - } else { - // RTU 模式:FIFO,匹配 slaveId + functionCode - return matchByFifo(queue, frame.getSlaveId(), frame.getFunctionCode()); } + // RTU 模式:FIFO,匹配 slaveId + functionCode + return matchByFifo(queue, frame.getSlaveId(), frame.getFunctionCode()); } /** * 按 transactionId 匹配 */ private PendingRequest matchByTransactionId(Deque queue, int transactionId) { - // TODO @AI:需要兼容 jdk8; - for (var it = queue.iterator(); it.hasNext(); ) { + Iterator it = queue.iterator(); + while (it.hasNext()) { PendingRequest req = it.next(); if (req.getTransactionId() != null && req.getTransactionId() == transactionId) { it.remove(); @@ -99,8 +99,8 @@ public class IotModbusTcpSlavePendingRequestManager { * 按 FIFO 匹配 */ private PendingRequest matchByFifo(Deque queue, int slaveId, int functionCode) { - // TODO @AI:需要兼容 jdk8; - for (var it = queue.iterator(); it.hasNext(); ) { + Iterator it = queue.iterator(); + while (it.hasNext()) { PendingRequest req = it.next(); if (req.getSlaveId() == slaveId && req.getFunctionCode() == functionCode) { it.remove(); @@ -120,13 +120,11 @@ public class IotModbusTcpSlavePendingRequestManager { int removed = 0; while (!queue.isEmpty()) { PendingRequest req = queue.peekFirst(); - // TODO @AI:if return 减少括号层级; - if (req != null && req.getExpireAt() < now) { - queue.pollFirst(); - removed++; - } else { + if (req == null || req.getExpireAt() >= now) { break; // 队列有序,后面的没过期 } + queue.pollFirst(); + removed++; } if (removed > 0) { log.debug("[cleanupExpired][设备 {} 清理了 {} 个过期请求]", entry.getKey(), removed); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java index 0c1ef64f48..70bda9f82e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java @@ -19,12 +19,11 @@ import java.util.concurrent.atomic.AtomicInteger; import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; -// TODO @AI:和 IotModbusTcpPollScheduler 很像,是不是可以做一些复用? /** * IoT Modbus TCP Slave 轮询调度器 *

- * 管理点位的轮询定时器,为 mode=1(云端轮询)的设备调度读取任务。 - * 与 tcpmaster 不同,这里不直接通过 j2mod 读取,而是: + * 管理点位的轮询定时器,为云端轮询模式的设备调度读取任务。 + * 与 tcpmaster 的 {@code IotModbusTcpPollScheduler} 不同,这里不通过 j2mod 直接读取,而是: * 1. 编码 Modbus 读请求帧 * 2. 通过 ConnectionManager 发送到设备的 TCP 连接 * 3. 将请求注册到 PendingRequestManager,等待设备响应 @@ -154,13 +153,15 @@ public class IotModbusTcpSlavePollScheduler { } // 2.1 确定帧格式和事务 ID - // TODO @AI:不允许为空! IotModbusFrameFormatEnum frameFormat = connInfo.getFrameFormat(); if (frameFormat == null) { - frameFormat = IotModbusFrameFormatEnum.MODBUS_TCP; + log.warn("[pollPoint][设备 {} 帧格式为空,跳过轮询]", deviceId); + return; } - // TODO @AI:transactionId 需要根据设备来么?然后递增也根据 IotModbusFrameFormatEnum.MODBUS_TCP 提前判断; - int transactionId = transactionIdCounter.incrementAndGet() & 0xFFFF; + // TODO @AI:是不是得按照设备递增? + Integer transactionId = frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP + ? (transactionIdCounter.incrementAndGet() & 0xFFFF) + : null; int slaveId = connInfo.getSlaveId() != null ? connInfo.getSlaveId() : 1; // 2.2 编码读请求 byte[] data = frameEncoder.encodeReadRequest(slaveId, point.getFunctionCode(), @@ -170,7 +171,7 @@ public class IotModbusTcpSlavePollScheduler { deviceId, point.getId(), point.getIdentifier(), slaveId, point.getFunctionCode(), point.getRegisterAddress(), point.getRegisterCount(), - frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP ? transactionId : null, + transactionId, System.currentTimeMillis() + requestTimeout); pendingRequestManager.addRequest(pendingRequest);