diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpPollScheduler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpPollScheduler.java index 9f8bf685ee..84516d1fc9 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpPollScheduler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpPollScheduler.java @@ -4,14 +4,16 @@ import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; import io.vertx.core.Vertx; +import lombok.AllArgsConstructor; +import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; + /** * IoT Modbus TCP 轮询调度器:管理点位的轮询定时器,调度读取任务并上报结果 * @@ -27,36 +29,86 @@ public class IotModbusTcpPollScheduler { private final IotModbusTcpUpstreamHandler upstreamHandler; /** - * 设备的定时器列表:deviceId -> timerId 列表 + * 设备点位的定时器映射:deviceId -> (pointId -> PointTimerInfo) */ - private final Map> deviceTimers = new ConcurrentHashMap<>(); + private final Map> devicePointTimers = new ConcurrentHashMap<>(); /** - * 更新轮询任务 + * 点位定时器信息 + */ + @Data + @AllArgsConstructor + private static class PointTimerInfo { + + /** + * Vert.x 定时器 ID + */ + private Long timerId; + /** + * 轮询间隔(用于判断是否需要更新定时器) + */ + private Integer pollInterval; + + } + + /** + * 更新轮询任务(增量更新) + * + * 1. 【删除】点位:停止对应的轮询定时器 + * 2. 【新增】点位:创建对应的轮询定时器 + * 3. 【修改】点位:pollInterval 变化,重建对应的轮询定时器 + * 4. 其他属性变化(包括未变化的):不处理(下次轮询时自动使用新配置) */ public void updatePolling(IotModbusDeviceConfigRespDTO config) { Long deviceId = config.getDeviceId(); + List newPoints = config.getPoints(); + Map currentTimers = devicePointTimers + .computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>()); + // 1.1 计算新配置(包括新增和修改的点位)中的点位 ID 集合 + Set newPointIds = convertSet(newPoints, IotModbusPointRespDTO::getId); + // 1.2 计算删除的点位 ID 集合 + Set removedPointIds = new HashSet<>(currentTimers.keySet()); + removedPointIds.removeAll(newPointIds); - // 1. 停止旧的轮询任务 - stopPolling(deviceId); - - // 2.1 为每个点位创建新的轮询任务 - if (CollUtil.isEmpty(config.getPoints())) { - return; - } - List timerIds = new ArrayList<>(config.getPoints().size()); - for (IotModbusPointRespDTO point : config.getPoints()) { - Long timerId = createPollTimer(config, point); - if (timerId != null) { - timerIds.add(timerId); + // 2. 处理删除的点位:停止不再存在的定时器 + for (Long pointId : removedPointIds) { + PointTimerInfo timerInfo = currentTimers.remove(pointId); + if (timerInfo != null) { + vertx.cancelTimer(timerInfo.getTimerId()); + log.debug("[updatePolling][设备 {} 点位 {} 定时器已删除]", deviceId, pointId); } } - // 2.2 记录定时器 - if (CollUtil.isEmpty(timerIds)) { + + // 3. 处理新增和修改的点位 + if (CollUtil.isEmpty(newPoints)) { return; } - deviceTimers.put(deviceId, timerIds); - log.debug("[updatePolling][设备 {} 创建了 {} 个轮询定时器]", deviceId, timerIds.size()); + for (IotModbusPointRespDTO point : newPoints) { + Long pointId = point.getId(); + Integer newPollInterval = point.getPollInterval(); + PointTimerInfo existingTimer = currentTimers.get(pointId); + // 3.1 新增点位:创建定时器 + if (existingTimer == null) { + Long timerId = createPollTimer(config, point); + if (timerId != null) { + currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval)); + log.debug("[updatePolling][设备 {} 点位 {} 定时器已创建, interval={}ms]", + deviceId, pointId, newPollInterval); + } + } else if (!Objects.equals(existingTimer.getPollInterval(), newPollInterval)) { + // 3.2 pollInterval 变化:重建定时器 + vertx.cancelTimer(existingTimer.getTimerId()); + Long timerId = createPollTimer(config, point); + if (timerId != null) { + currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval)); + log.debug("[updatePolling][设备 {} 点位 {} 定时器已更新, interval={}ms -> {}ms]", + deviceId, pointId, existingTimer.getPollInterval(), newPollInterval); + } else { + currentTimers.remove(pointId); + } + } + // 3.3 其他属性变化:不处理(下次轮询时自动使用新配置) + } } /** @@ -104,21 +156,21 @@ public class IotModbusTcpPollScheduler { * 停止设备的轮询 */ public void stopPolling(Long deviceId) { - List timerIds = deviceTimers.remove(deviceId); - if (CollUtil.isEmpty(timerIds)) { + Map timers = devicePointTimers.remove(deviceId); + if (CollUtil.isEmpty(timers)) { return; } - for (Long timerId : timerIds) { - vertx.cancelTimer(timerId); + for (PointTimerInfo timerInfo : timers.values()) { + vertx.cancelTimer(timerInfo.getTimerId()); } - log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timerIds.size()); + log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timers.size()); } /** * 停止所有轮询 */ public void stopAll() { - for (Long deviceId : deviceTimers.keySet()) { + for (Long deviceId : new ArrayList<>(devicePointTimers.keySet())) { stopPolling(deviceId); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpUpstreamProtocol.java index 7236cb09c2..43b9ad5f5b 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbustcp/IotModbusTcpUpstreamProtocol.java @@ -92,8 +92,6 @@ public class IotModbusTcpUpstreamProtocol { // 2.1 确保连接存在 connectionManager.ensureConnection(config); // 2.2 更新轮询任务 - // DONE @AI:【重要】当前实现是全量更新轮询任务,未来可优化为增量更新(只更新变化的点位) - // TODO @AI:【超级重要,这次必须优化】需要对比 point 的更新:1)如果 points 删除了,需要停止对应的轮询定时器;2)如果 points 新增了,需要新增对应的轮询定时器;3)如果 points 只修改了 pollInterval,需要更新对应的轮询定时器;4)如果 points 其他属性修改了,不需要处理轮询定时器 pollScheduler.updatePolling(config); } catch (Exception e) { log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e);