feat:【iot】modbus-tcp 协议接入:35% 初始化:基于 keen-painting-lark.md 规划(修复 point 更新处理不正确的问题)

This commit is contained in:
YunaiV
2026-01-17 12:31:29 +08:00
parent ed78834eaf
commit 593455a085
2 changed files with 80 additions and 30 deletions

View File

@@ -4,14 +4,16 @@ import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import io.vertx.core.Vertx;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
/**
* IoT Modbus TCP 轮询调度器:管理点位的轮询定时器,调度读取任务并上报结果
*
@@ -27,36 +29,86 @@ public class IotModbusTcpPollScheduler {
private final IotModbusTcpUpstreamHandler upstreamHandler;
/**
* 设备的定时器列表deviceId -> timerId 列表
* 设备点位的定时器映射deviceId -> (pointId -> PointTimerInfo)
*/
private final Map<Long, List<Long>> deviceTimers = new ConcurrentHashMap<>();
private final Map<Long, Map<Long, PointTimerInfo>> devicePointTimers = new ConcurrentHashMap<>();
/**
* 更新轮询任务
* 点位定时器信息
*/
@Data
@AllArgsConstructor
private static class PointTimerInfo {
/**
* Vert.x 定时器 ID
*/
private Long timerId;
/**
* 轮询间隔(用于判断是否需要更新定时器)
*/
private Integer pollInterval;
}
/**
* 更新轮询任务(增量更新)
*
* 1. 【删除】点位:停止对应的轮询定时器
* 2. 【新增】点位:创建对应的轮询定时器
* 3. 【修改】点位pollInterval 变化,重建对应的轮询定时器
* 4. 其他属性变化(包括未变化的):不处理(下次轮询时自动使用新配置)
*/
public void updatePolling(IotModbusDeviceConfigRespDTO config) {
Long deviceId = config.getDeviceId();
List<IotModbusPointRespDTO> newPoints = config.getPoints();
Map<Long, PointTimerInfo> currentTimers = devicePointTimers
.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
// 1.1 计算新配置(包括新增和修改的点位)中的点位 ID 集合
Set<Long> newPointIds = convertSet(newPoints, IotModbusPointRespDTO::getId);
// 1.2 计算删除的点位 ID 集合
Set<Long> removedPointIds = new HashSet<>(currentTimers.keySet());
removedPointIds.removeAll(newPointIds);
// 1. 停止旧的轮询任务
stopPolling(deviceId);
// 2.1 为每个点位创建新的轮询任务
if (CollUtil.isEmpty(config.getPoints())) {
return;
}
List<Long> timerIds = new ArrayList<>(config.getPoints().size());
for (IotModbusPointRespDTO point : config.getPoints()) {
Long timerId = createPollTimer(config, point);
if (timerId != null) {
timerIds.add(timerId);
// 2. 处理删除的点位:停止不再存在的定时器
for (Long pointId : removedPointIds) {
PointTimerInfo timerInfo = currentTimers.remove(pointId);
if (timerInfo != null) {
vertx.cancelTimer(timerInfo.getTimerId());
log.debug("[updatePolling][设备 {} 点位 {} 定时器已删除]", deviceId, pointId);
}
}
// 2.2 记录定时器
if (CollUtil.isEmpty(timerIds)) {
// 3. 处理新增和修改的点位
if (CollUtil.isEmpty(newPoints)) {
return;
}
deviceTimers.put(deviceId, timerIds);
log.debug("[updatePolling][设备 {} 创建了 {} 个轮询定时器]", deviceId, timerIds.size());
for (IotModbusPointRespDTO point : newPoints) {
Long pointId = point.getId();
Integer newPollInterval = point.getPollInterval();
PointTimerInfo existingTimer = currentTimers.get(pointId);
// 3.1 新增点位:创建定时器
if (existingTimer == null) {
Long timerId = createPollTimer(config, point);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已创建, interval={}ms]",
deviceId, pointId, newPollInterval);
}
} else if (!Objects.equals(existingTimer.getPollInterval(), newPollInterval)) {
// 3.2 pollInterval 变化:重建定时器
vertx.cancelTimer(existingTimer.getTimerId());
Long timerId = createPollTimer(config, point);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已更新, interval={}ms -> {}ms]",
deviceId, pointId, existingTimer.getPollInterval(), newPollInterval);
} else {
currentTimers.remove(pointId);
}
}
// 3.3 其他属性变化:不处理(下次轮询时自动使用新配置)
}
}
/**
@@ -104,21 +156,21 @@ public class IotModbusTcpPollScheduler {
* 停止设备的轮询
*/
public void stopPolling(Long deviceId) {
List<Long> timerIds = deviceTimers.remove(deviceId);
if (CollUtil.isEmpty(timerIds)) {
Map<Long, PointTimerInfo> timers = devicePointTimers.remove(deviceId);
if (CollUtil.isEmpty(timers)) {
return;
}
for (Long timerId : timerIds) {
vertx.cancelTimer(timerId);
for (PointTimerInfo timerInfo : timers.values()) {
vertx.cancelTimer(timerInfo.getTimerId());
}
log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timerIds.size());
log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timers.size());
}
/**
* 停止所有轮询
*/
public void stopAll() {
for (Long deviceId : deviceTimers.keySet()) {
for (Long deviceId : new ArrayList<>(devicePointTimers.keySet())) {
stopPolling(deviceId);
}
}

View File

@@ -92,8 +92,6 @@ public class IotModbusTcpUpstreamProtocol {
// 2.1 确保连接存在
connectionManager.ensureConnection(config);
// 2.2 更新轮询任务
// DONE @AI【重要】当前实现是全量更新轮询任务未来可优化为增量更新只更新变化的点位
// TODO @AI【超级重要这次必须优化】需要对比 point 的更新1如果 points 删除了需要停止对应的轮询定时器2如果 points 新增了需要新增对应的轮询定时器3如果 points 只修改了 pollInterval需要更新对应的轮询定时器4如果 points 其他属性修改了,不需要处理轮询定时器
pollScheduler.updatePolling(config);
} catch (Exception e) {
log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e);