feat(iot):modbus-tcp-slave 优化大量代码,主要是 polling 轮询的复用;

This commit is contained in:
YunaiV
2026-02-08 16:34:22 +08:00
parent e13cd545cc
commit c608b81c4e
24 changed files with 373 additions and 566 deletions

View File

@@ -17,7 +17,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@AllArgsConstructor
@Slf4j
public abstract class IotProtocolDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
public abstract class AbstractIotProtocolDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotProtocol protocol;

View File

@@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol;
import lombok.extern.slf4j.Slf4j;
@@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotCoapDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
public class IotCoapDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
public IotCoapDownstreamSubscriber(IotCoapProtocol protocol, IotMessageBus messageBus) {
super(protocol, messageBus);

View File

@@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxProtocol;
import lombok.extern.slf4j.Slf4j;
@@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotEmqxDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
public class IotEmqxDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
private final IotEmqxDownstreamHandler downstreamHandler;

View File

@@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber;
import lombok.extern.slf4j.Slf4j;
/**
@@ -13,7 +13,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class IotHttpDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
public class IotHttpDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
public IotHttpDownstreamSubscriber(IotProtocol protocol, IotMessageBus messageBus) {
super(protocol, messageBus);

View File

@@ -0,0 +1,269 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.manager;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import io.vertx.core.Vertx;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
/**
* Modbus 轮询调度器基类
* <p>
* 封装通用的定时器管理、per-device 请求队列限速逻辑。
* 子类只需实现 {@link #pollPoint(Long, Long)} 定义具体的轮询动作。
* <p>
*
* @author 芋道源码
*/
@Slf4j
public abstract class AbstractIotModbusPollScheduler {
protected final Vertx vertx;
/**
* 同设备最小请求间隔(毫秒),防止 Modbus 设备性能不足时请求堆积
*/
private static final long MIN_REQUEST_INTERVAL = 1000;
/**
* 设备点位的定时器映射deviceId -> (pointId -> PointTimerInfo)
*/
private final Map<Long, Map<Long, PointTimerInfo>> devicePointTimers = new ConcurrentHashMap<>();
/**
* per-device 请求队列deviceId -> 待执行请求队列
*/
private final Map<Long, Queue<Runnable>> deviceRequestQueues = new ConcurrentHashMap<>();
/**
* per-device 上次请求时间戳deviceId -> lastRequestTimeMs
*/
private final Map<Long, Long> deviceLastRequestTime = new ConcurrentHashMap<>();
/**
* per-device 延迟 timer 标记deviceId -> 是否有延迟 timer 在等待
*/
private final Map<Long, Boolean> deviceDelayTimerActive = new ConcurrentHashMap<>();
protected AbstractIotModbusPollScheduler(Vertx vertx) {
this.vertx = vertx;
}
/**
* 点位定时器信息
*/
@Data
@AllArgsConstructor
private static class PointTimerInfo {
/**
* Vert.x 定时器 ID
*/
private Long timerId;
/**
* 轮询间隔(用于判断是否需要更新定时器)
*/
private Integer pollInterval;
}
// ========== 轮询管理 ==========
/**
* 更新轮询任务(增量更新)
*
* 1. 【删除】点位:停止对应的轮询定时器
* 2. 【新增】点位:创建对应的轮询定时器
* 3. 【修改】点位pollInterval 变化,重建对应的轮询定时器
* 【修改】其他属性变化不需要重建定时器pollPoint 运行时从 configCache 取最新 point
*/
public void updatePolling(IotModbusDeviceConfigRespDTO config) {
Long deviceId = config.getDeviceId();
List<IotModbusPointRespDTO> newPoints = config.getPoints();
Map<Long, PointTimerInfo> currentTimers = devicePointTimers
.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
// 1.1 计算新配置中的点位 ID 集合
Set<Long> newPointIds = convertSet(newPoints, IotModbusPointRespDTO::getId);
// 1.2 计算删除的点位 ID 集合
Set<Long> removedPointIds = new HashSet<>(currentTimers.keySet());
removedPointIds.removeAll(newPointIds);
// 2. 处理删除的点位:停止不再存在的定时器
for (Long pointId : removedPointIds) {
PointTimerInfo timerInfo = currentTimers.remove(pointId);
if (timerInfo != null) {
vertx.cancelTimer(timerInfo.getTimerId());
log.debug("[updatePolling][设备 {} 点位 {} 定时器已删除]", deviceId, pointId);
}
}
// 3. 处理新增和修改的点位
if (CollUtil.isEmpty(newPoints)) {
return;
}
for (IotModbusPointRespDTO point : newPoints) {
Long pointId = point.getId();
Integer newPollInterval = point.getPollInterval();
PointTimerInfo existingTimer = currentTimers.get(pointId);
// 3.1 新增点位:创建定时器
if (existingTimer == null) {
Long timerId = createPollTimer(deviceId, pointId, newPollInterval);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已创建, interval={}ms]",
deviceId, pointId, newPollInterval);
}
} else if (!Objects.equals(existingTimer.getPollInterval(), newPollInterval)) {
// 3.2 pollInterval 变化:重建定时器
vertx.cancelTimer(existingTimer.getTimerId());
Long timerId = createPollTimer(deviceId, pointId, newPollInterval);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已更新, interval={}ms -> {}ms]",
deviceId, pointId, existingTimer.getPollInterval(), newPollInterval);
} else {
currentTimers.remove(pointId);
}
}
// 3.3 其他属性变化:无需重建定时器,因为 pollPoint() 运行时从 configCache 获取最新 point自动使用新配置
}
}
/**
* 创建轮询定时器
*/
private Long createPollTimer(Long deviceId, Long pointId, Integer pollInterval) {
if (pollInterval == null || pollInterval <= 0) {
return null;
}
return vertx.setPeriodic(pollInterval, timerId -> {
try {
submitPollRequest(deviceId, pointId);
} catch (Exception e) {
log.error("[createPollTimer][轮询点位失败, deviceId={}, pointId={}]", deviceId, pointId, e);
}
});
}
// ========== 请求队列per-device 限速) ==========
/**
* 提交轮询请求到设备请求队列(保证同设备请求间隔)
*/
private void submitPollRequest(Long deviceId, Long pointId) {
// 1. 【重要】将请求添加到设备的请求队列
Queue<Runnable> queue = deviceRequestQueues.computeIfAbsent(deviceId, k -> new ConcurrentLinkedQueue<>());
queue.offer(() -> pollPoint(deviceId, pointId));
// 2. 处理设备请求队列(如果没有延迟 timer 在等待)
processDeviceQueue(deviceId);
}
/**
* 处理设备请求队列
*/
private void processDeviceQueue(Long deviceId) {
Queue<Runnable> queue = deviceRequestQueues.get(deviceId);
if (CollUtil.isEmpty(queue)) {
return;
}
// 检查是否已有延迟 timer 在等待
if (Boolean.TRUE.equals(deviceDelayTimerActive.get(deviceId))) {
return;
}
// 不满足间隔要求,延迟执行
long now = System.currentTimeMillis();
long lastTime = deviceLastRequestTime.getOrDefault(deviceId, 0L);
long elapsed = now - lastTime;
if (elapsed < MIN_REQUEST_INTERVAL) {
scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL - elapsed);
return;
}
// 满足间隔要求,立即执行
Runnable task = queue.poll();
if (task == null) {
return;
}
deviceLastRequestTime.put(deviceId, now);
task.run();
// 继续处理队列中的下一个(如果有的话,需要延迟)
if (CollUtil.isNotEmpty(queue)) {
scheduleNextRequest(deviceId);
}
}
private void scheduleNextRequest(Long deviceId) {
scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL);
}
private void scheduleNextRequest(Long deviceId, long delayMs) {
deviceDelayTimerActive.put(deviceId, true);
vertx.setTimer(delayMs, id -> {
deviceDelayTimerActive.put(deviceId, false);
Queue<Runnable> queue = deviceRequestQueues.get(deviceId);
if (CollUtil.isEmpty(queue)) {
return;
}
// 满足间隔要求,立即执行
Runnable task = queue.poll();
if (task == null) {
return;
}
deviceLastRequestTime.put(deviceId, System.currentTimeMillis());
task.run();
// 继续处理队列中的下一个(如果有的话,需要延迟)
if (CollUtil.isNotEmpty(queue)) {
scheduleNextRequest(deviceId);
}
});
}
// ========== 轮询执行 ==========
/**
* 轮询单个点位(子类实现具体的读取逻辑)
*
* @param deviceId 设备 ID
* @param pointId 点位 ID
*/
protected abstract void pollPoint(Long deviceId, Long pointId);
// ========== 停止 ==========
/**
* 停止设备的轮询
*/
public void stopPolling(Long deviceId) {
Map<Long, PointTimerInfo> timers = devicePointTimers.remove(deviceId);
if (CollUtil.isEmpty(timers)) {
return;
}
for (PointTimerInfo timerInfo : timers.values()) {
vertx.cancelTimer(timerInfo.getTimerId());
}
// 清理请求队列
deviceRequestQueues.remove(deviceId);
deviceLastRequestTime.remove(deviceId);
deviceDelayTimerActive.remove(deviceId);
log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timers.size());
}
/**
* 停止所有轮询
*/
public void stopAll() {
for (Long deviceId : new ArrayList<>(devicePointTimers.keySet())) {
stopPolling(deviceId);
}
}
}

View File

@@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common;
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
@@ -34,7 +34,7 @@ import java.nio.ByteOrder;
*/
@UtilityClass
@Slf4j
public class IotModbusUtils {
public class IotModbusCommonUtils {
/** FC01: 读线圈 */
public static final int FC_READ_COILS = 1;
@@ -512,4 +512,15 @@ public class IotModbusUtils {
return CollUtil.findOne(config.getPoints(), p -> identifier.equals(p.getIdentifier()));
}
/**
* 根据点位 ID 查找点位配置
*
* @param config 设备 Modbus 配置
* @param pointId 点位 ID
* @return 匹配的点位配置未找到返回 null
*/
public static IotModbusPointRespDTO findPointById(IotModbusDeviceConfigRespDTO config, Long pointId) {
return CollUtil.findOne(config.getPoints(), p -> p.getId().equals(pointId));
}
}

View File

@@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client;
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager;
@@ -12,7 +12,7 @@ import io.vertx.core.Future;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils.*;
import static cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils.*;
/**
* IoT Modbus TCP 客户端工具类
@@ -26,7 +26,7 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModb
*/
@UtilityClass
@Slf4j
public class IotModbusTcpClientUtils {
public class IotModbusTcpMasterUtils {
/**
* 读取 Modbus 数据

View File

@@ -4,8 +4,8 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client.IotModbusTcpClientUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusTcpMasterUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager;
import lombok.RequiredArgsConstructor;
@@ -57,13 +57,13 @@ public class IotModbusTcpDownstreamHandler {
String identifier = entry.getKey();
Object value = entry.getValue();
// 2.1 查找对应的点位配置
IotModbusPointRespDTO point = IotModbusUtils.findPoint(config, identifier);
IotModbusPointRespDTO point = IotModbusCommonUtils.findPoint(config, identifier);
if (point == null) {
log.warn("[handle][设备 {} 没有点位配置: {}]", message.getDeviceId(), identifier);
continue;
}
// 2.2 检查是否支持写操作
if (!IotModbusUtils.isWritable(point.getFunctionCode())) {
if (!IotModbusCommonUtils.isWritable(point.getFunctionCode())) {
log.warn("[handle][点位 {} 不支持写操作, 功能码={}]", identifier, point.getFunctionCode());
continue;
}
@@ -91,9 +91,9 @@ public class IotModbusTcpDownstreamHandler {
}
// 2.1 转换属性值为原始值
int[] rawValues = IotModbusUtils.convertToRawValues(value, point);
int[] rawValues = IotModbusCommonUtils.convertToRawValues(value, point);
// 2.2 执行 Modbus 写入
IotModbusTcpClientUtils.write(connection, slaveId, point, rawValues)
IotModbusTcpMasterUtils.write(connection, slaveId, point, rawValues)
.onSuccess(success -> log.info("[writeProperty][写入成功, deviceId={}, identifier={}, value={}]",
config.getDeviceId(), point.getIdentifier(), value))
.onFailure(e -> log.error("[writeProperty][写入失败, deviceId={}, identifier={}]",

View File

@@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.do
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.IotModbusTcpMasterProtocol;
import lombok.extern.slf4j.Slf4j;
@@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotModbusTcpDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
public class IotModbusTcpDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
private final IotModbusTcpDownstreamHandler downstreamHandler;

View File

@@ -5,7 +5,7 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import lombok.extern.slf4j.Slf4j;
@@ -40,7 +40,7 @@ public class IotModbusTcpUpstreamHandler {
int[] rawValue) {
try {
// 1.1 转换原始值为物模型属性值
Object convertedValue = IotModbusUtils.convertToPropertyValue(rawValue, point);
Object convertedValue = IotModbusCommonUtils.convertToPropertyValue(rawValue, point);
log.debug("[handleReadResult][设备={}, 属性={}, 原始值={}, 转换值={}]",
config.getDeviceId(), point.getIdentifier(), rawValue, convertedValue);
// 1.2 构造属性上报消息

View File

@@ -1,242 +1,45 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client.IotModbusTcpClientUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.manager.AbstractIotModbusPollScheduler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusTcpMasterUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpUpstreamHandler;
import io.vertx.core.Vertx;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
// TODO @AI类的命名上要体现上 master。其它类似 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster 也要!
/**
* IoT Modbus TCP Master 轮询调度器:管理点位的轮询定时器,调度读取任务并上报结果
*
* @author 芋道源码
*/
@Slf4j
public class IotModbusTcpPollScheduler {
public class IotModbusTcpPollScheduler extends AbstractIotModbusPollScheduler {
private final Vertx vertx;
private final IotModbusTcpConnectionManager connectionManager;
private final IotModbusTcpUpstreamHandler upstreamHandler;
private final IotModbusTcpConfigCacheService configCacheService;
/**
* 同设备最小请求间隔(毫秒),防止 Modbus 设备性能不足时请求堆积
*/
private static final long MIN_REQUEST_INTERVAL = 100;
/**
* 设备点位的定时器映射deviceId -> (pointId -> PointTimerInfo)
*/
private final Map<Long, Map<Long, PointTimerInfo>> devicePointTimers = new ConcurrentHashMap<>();
/**
* per-device 请求队列deviceId -> 待执行请求队列
*/
private final Map<Long, Queue<Runnable>> deviceRequestQueues = new ConcurrentHashMap<>();
/**
* per-device 上次请求时间戳deviceId -> lastRequestTimeMs
*/
private final Map<Long, Long> deviceLastRequestTime = new ConcurrentHashMap<>();
/**
* per-device 延迟 timer 标记deviceId -> 是否有延迟 timer 在等待
*/
private final Map<Long, Boolean> deviceDelayTimerActive = new ConcurrentHashMap<>();
public IotModbusTcpPollScheduler(Vertx vertx,
IotModbusTcpConnectionManager connectionManager,
IotModbusTcpUpstreamHandler upstreamHandler,
IotModbusTcpConfigCacheService configCacheService) {
this.vertx = vertx;
super(vertx);
this.connectionManager = connectionManager;
this.upstreamHandler = upstreamHandler;
this.configCacheService = configCacheService;
}
// ========== 点位定时器 ==========
/**
* 点位定时器信息
*/
@Data
@AllArgsConstructor
private static class PointTimerInfo {
/**
* Vert.x 定时器 ID
*/
private Long timerId;
/**
* 轮询间隔(用于判断是否需要更新定时器)
*/
private Integer pollInterval;
}
// ========== 轮询管理 ==========
/**
* 更新轮询任务(增量更新)
*
* 1. 【删除】点位:停止对应的轮询定时器
* 2. 【新增】点位:创建对应的轮询定时器
* 3. 【修改】点位pollInterval 变化,重建对应的轮询定时器
* 4. 其他属性变化不需要重建定时器pollPoint 运行时从 configCache 取最新 point
*/
public void updatePolling(IotModbusDeviceConfigRespDTO config) {
Long deviceId = config.getDeviceId();
List<IotModbusPointRespDTO> newPoints = config.getPoints();
Map<Long, PointTimerInfo> currentTimers = devicePointTimers
.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
// 1.1 计算新配置中的点位 ID 集合
Set<Long> newPointIds = convertSet(newPoints, IotModbusPointRespDTO::getId);
// 1.2 计算删除的点位 ID 集合
Set<Long> removedPointIds = new HashSet<>(currentTimers.keySet());
removedPointIds.removeAll(newPointIds);
// 2. 处理删除的点位:停止不再存在的定时器
for (Long pointId : removedPointIds) {
PointTimerInfo timerInfo = currentTimers.remove(pointId);
if (timerInfo != null) {
vertx.cancelTimer(timerInfo.getTimerId());
log.debug("[updatePolling][设备 {} 点位 {} 定时器已删除]", deviceId, pointId);
}
}
// 3. 处理新增和修改的点位
if (CollUtil.isEmpty(newPoints)) {
return;
}
for (IotModbusPointRespDTO point : newPoints) {
Long pointId = point.getId();
Integer newPollInterval = point.getPollInterval();
PointTimerInfo existingTimer = currentTimers.get(pointId);
// 3.1 新增点位:创建定时器
if (existingTimer == null) {
Long timerId = createPollTimer(deviceId, pointId, newPollInterval);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已创建, interval={}ms]",
deviceId, pointId, newPollInterval);
}
} else if (!Objects.equals(existingTimer.getPollInterval(), newPollInterval)) {
// 3.2 pollInterval 变化:重建定时器
vertx.cancelTimer(existingTimer.getTimerId());
Long timerId = createPollTimer(deviceId, pointId, newPollInterval);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已更新, interval={}ms -> {}ms]",
deviceId, pointId, existingTimer.getPollInterval(), newPollInterval);
} else {
currentTimers.remove(pointId);
}
}
// 3.3 其他属性变化:无需重建定时器,因为 pollPoint() 运行时从 configCache 获取最新 point自动使用新配置
}
}
/**
* 创建轮询定时器
* <p>
* 闭包只捕获 deviceId 和 pointId运行时从 configCache 获取最新配置,避免旧快照问题。
*/
private Long createPollTimer(Long deviceId, Long pointId, Integer pollInterval) {
if (pollInterval == null || pollInterval <= 0) {
return null;
}
return vertx.setPeriodic(pollInterval, timerId -> {
try {
submitPollRequest(deviceId, pointId);
} catch (Exception e) {
log.error("[createPollTimer][轮询点位失败, deviceId={}, pointId={}]", deviceId, pointId, e);
}
});
}
// ========== 请求队列per-device 限速) ==========
/**
* 提交轮询请求到设备请求队列(保证同设备请求间隔)
*/
private void submitPollRequest(Long deviceId, Long pointId) {
Queue<Runnable> queue = deviceRequestQueues.computeIfAbsent(deviceId, k -> new ConcurrentLinkedQueue<>());
queue.offer(() -> pollPoint(deviceId, pointId));
processDeviceQueue(deviceId);
}
/**
* 处理设备请求队列
*/
private void processDeviceQueue(Long deviceId) {
Queue<Runnable> queue = deviceRequestQueues.get(deviceId);
if (CollUtil.isEmpty(queue)) {
return;
}
// 检查是否已有延迟 timer 在等待
if (Boolean.TRUE.equals(deviceDelayTimerActive.get(deviceId))) {
return;
}
long now = System.currentTimeMillis();
long lastTime = deviceLastRequestTime.getOrDefault(deviceId, 0L);
long elapsed = now - lastTime;
if (elapsed >= MIN_REQUEST_INTERVAL) {
// 满足间隔要求,立即执行
Runnable task = queue.poll();
if (task != null) {
deviceLastRequestTime.put(deviceId, now);
task.run();
// 继续处理队列中的下一个(如果有的话,需要延迟)
if (!queue.isEmpty()) {
scheduleNextRequest(deviceId);
}
}
} else {
// 需要延迟
scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL - elapsed);
}
}
private void scheduleNextRequest(Long deviceId) {
scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL);
}
private void scheduleNextRequest(Long deviceId, long delayMs) {
deviceDelayTimerActive.put(deviceId, true);
vertx.setTimer(delayMs, id -> {
deviceDelayTimerActive.put(deviceId, false);
Queue<Runnable> queue = deviceRequestQueues.get(deviceId);
if (CollUtil.isEmpty(queue)) {
Runnable task = queue.poll();
if (task != null) {
deviceLastRequestTime.put(deviceId, System.currentTimeMillis());
task.run();
}
// 继续处理
if (queue != null && !queue.isEmpty()) {
scheduleNextRequest(deviceId);
}
}
});
}
// ========== 轮询执行 ==========
/**
* 轮询单个点位
*/
private void pollPoint(Long deviceId, Long pointId) {
@Override
protected void pollPoint(Long deviceId, Long pointId) {
// 1.1 从 configCache 获取最新配置
IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(deviceId);
if (config == null || CollUtil.isEmpty(config.getPoints())) {
@@ -244,8 +47,7 @@ public class IotModbusTcpPollScheduler {
return;
}
// 1.2 查找点位
// TODO @AI是不是这里可以抽到 IotModbusUtils 里?感觉应该有几个地方需要的;
IotModbusPointRespDTO point = CollUtil.findOne(config.getPoints(), p -> p.getId().equals(pointId));
IotModbusPointRespDTO point = IotModbusCommonUtils.findPointById(config, pointId);
if (point == null) {
log.warn("[pollPoint][设备 {} 点位 {} 未找到]", deviceId, pointId);
return;
@@ -259,45 +61,13 @@ public class IotModbusTcpPollScheduler {
}
// 2.2 获取 slave ID
Integer slaveId = connectionManager.getSlaveId(deviceId);
if (slaveId == null) {
log.warn("[pollPoint][设备 {} 没有 slaveId]", deviceId);
return;
}
Assert.notNull(slaveId, "设备 {} 没有配置 slaveId", deviceId);
// 3. 执行 Modbus 读取
IotModbusTcpClientUtils.read(connection, slaveId, point)
IotModbusTcpMasterUtils.read(connection, slaveId, point)
.onSuccess(rawValue -> upstreamHandler.handleReadResult(config, point, rawValue))
.onFailure(e -> log.error("[pollPoint][读取点位失败, deviceId={}, identifier={}]",
deviceId, point.getIdentifier(), e));
}
// ========== 停止 ==========
/**
* 停止设备的轮询
*/
public void stopPolling(Long deviceId) {
Map<Long, PointTimerInfo> timers = devicePointTimers.remove(deviceId);
if (CollUtil.isEmpty(timers)) {
return;
}
for (PointTimerInfo timerInfo : timers.values()) {
vertx.cancelTimer(timerInfo.getTimerId());
}
// 清理请求队列
deviceRequestQueues.remove(deviceId);
deviceLastRequestTime.remove(deviceId);
deviceDelayTimerActive.remove(deviceId);
log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timers.size());
}
/**
* 停止所有轮询
*/
public void stopAll() {
for (Long deviceId : new ArrayList<>(devicePointTimers.keySet())) {
stopPolling(deviceId);
}
}
}

View File

@@ -1,7 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import lombok.Data;
import lombok.experimental.Accessors;
@@ -39,7 +39,7 @@ public class IotModbusFrame {
* 当功能码最高位为 1 时(异常响应),此字段存储异常码。
* 为 null 表示非异常响应。
*
* @see IotModbusUtils#FC_EXCEPTION_MASK
* @see IotModbusCommonUtils#FC_EXCEPTION_MASK
*/
private Integer exceptionCode;

View File

@@ -1,7 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
@@ -96,7 +96,7 @@ public class IotModbusFrameDecoder {
return null;
}
// 校验 CRC
if (!IotModbusUtils.verifyCrc16(data)) {
if (!IotModbusCommonUtils.verifyCrc16(data)) {
log.warn("[decodeRtuResponse][CRC 校验失败]");
return null;
}
@@ -119,8 +119,8 @@ public class IotModbusFrameDecoder {
.setPdu(pdu)
.setTransactionId(transactionId);
// 异常响应
if (IotModbusUtils.isExceptionResponse(functionCode)) {
frame.setFunctionCode(IotModbusUtils.extractOriginalFunctionCode(functionCode));
if (IotModbusCommonUtils.isExceptionResponse(functionCode)) {
frame.setFunctionCode(IotModbusCommonUtils.extractOriginalFunctionCode(functionCode));
if (pdu.length >= 1) {
frame.setExceptionCode(pdu[0] & 0xFF);
}
@@ -281,7 +281,7 @@ public class IotModbusFrameDecoder {
this.slaveId = bytes[0];
this.functionCode = bytes[1];
int fc = functionCode & 0xFF;
if (IotModbusUtils.isExceptionResponse(fc)) {
if (IotModbusCommonUtils.isExceptionResponse(fc)) {
// 异常响应:完整帧 = slaveId(1) + FC(1) + exceptionCode(1) + CRC(2) = 5 字节
// 已有 6 字节(多 1 字节),取前 5 字节组装
Buffer frame = Buffer.buffer(5);
@@ -290,7 +290,7 @@ public class IotModbusFrameDecoder {
frame.appendBytes(bytes, 2, 3); // exceptionCode + CRC
emitFrame(frame);
resetToHeader();
} else if (IotModbusUtils.isReadResponse(fc) || fc == customFunctionCode) {
} else if (IotModbusCommonUtils.isReadResponse(fc) || fc == customFunctionCode) {
// 读响应或自定义 FCbytes[2] = byteCount
this.byteCount = bytes[2];
int bc = byteCount & 0xFF;
@@ -315,7 +315,7 @@ public class IotModbusFrameDecoder {
this.expectedDataLen = bc + 2; // byteCount 个数据 + 2 CRC
parser.fixedSizeMode(remaining);
}
} else if (IotModbusUtils.isWriteResponse(fc)) {
} else if (IotModbusCommonUtils.isWriteResponse(fc)) {
// 写响应:总长 = slaveId(1) + FC(1) + addr(2) + value/qty(2) + CRC(2) = 8 字节
// 已有 6 字节,还需 2 字节
state = STATE_WRITE_BODY;
@@ -356,15 +356,15 @@ public class IotModbusFrameDecoder {
this.slaveId = header[0];
this.functionCode = header[1];
int fc = functionCode & 0xFF;
if (IotModbusUtils.isExceptionResponse(fc)) {
if (IotModbusCommonUtils.isExceptionResponse(fc)) {
// 异常响应
state = STATE_EXCEPTION_BODY;
parser.fixedSizeMode(3); // exceptionCode(1) + CRC(2)
} else if (IotModbusUtils.isReadResponse(fc) || fc == customFunctionCode) {
} else if (IotModbusCommonUtils.isReadResponse(fc) || fc == customFunctionCode) {
// 读响应或自定义 FC
state = STATE_READ_BYTE_COUNT;
parser.fixedSizeMode(1); // byteCount
} else if (IotModbusUtils.isWriteResponse(fc)) {
} else if (IotModbusCommonUtils.isWriteResponse(fc)) {
// 写响应
state = STATE_WRITE_BODY;
pendingData = Buffer.buffer();

View File

@@ -1,7 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -60,7 +60,7 @@ public class IotModbusFrameEncoder {
public byte[] encodeWriteSingleRequest(int slaveId, int functionCode, int address, int value,
IotModbusFrameFormatEnum format, Integer transactionId) {
// FC05 单写线圈Modbus 标准要求 value 为 0xFF00ON或 0x0000OFF
if (functionCode == IotModbusUtils.FC_WRITE_SINGLE_COIL) {
if (functionCode == IotModbusCommonUtils.FC_WRITE_SINGLE_COIL) {
value = (value != 0) ? 0xFF00 : 0x0000;
}
// PDU: [FC(1)] [Address(2)] [Value(2)]
@@ -120,7 +120,7 @@ public class IotModbusFrameEncoder {
int quantity = values.length;
int byteCount = (quantity + 7) / 8; // 向上取整
byte[] pdu = new byte[6 + byteCount];
pdu[0] = (byte) IotModbusUtils.FC_WRITE_MULTIPLE_COILS; // FC15
pdu[0] = (byte) IotModbusCommonUtils.FC_WRITE_MULTIPLE_COILS; // FC15
pdu[1] = (byte) ((address >> 8) & 0xFF);
pdu[2] = (byte) (address & 0xFF);
pdu[3] = (byte) ((quantity >> 8) & 0xFF);
@@ -204,7 +204,7 @@ public class IotModbusFrameEncoder {
frame[0] = (byte) slaveId;
System.arraycopy(pdu, 0, frame, 1, pdu.length);
// 计算并追加 CRC16
int crc = IotModbusUtils.calculateCrc16(frame, frame.length - 2);
int crc = IotModbusCommonUtils.calculateCrc16(frame, frame.length - 2);
frame[frame.length - 2] = (byte) (crc & 0xFF); // CRC Low
frame[frame.length - 1] = (byte) ((crc >> 8) & 0xFF); // CRC High
return frame;

View File

@@ -7,7 +7,7 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager;
@@ -82,13 +82,13 @@ public class IotModbusTcpSlaveDownstreamHandler {
String identifier = entry.getKey();
Object value = entry.getValue();
// 2.1 查找对应的点位配置
IotModbusPointRespDTO point = IotModbusUtils.findPoint(config, identifier);
IotModbusPointRespDTO point = IotModbusCommonUtils.findPoint(config, identifier);
if (point == null) {
log.warn("[handle][设备 {} 没有点位配置: {}]", message.getDeviceId(), identifier);
continue;
}
// 2.2 检查是否支持写操作
if (!IotModbusUtils.isWritable(point.getFunctionCode())) {
if (!IotModbusCommonUtils.isWritable(point.getFunctionCode())) {
log.warn("[handle][点位 {} 不支持写操作, 功能码={}]", identifier, point.getFunctionCode());
continue;
}
@@ -104,7 +104,7 @@ public class IotModbusTcpSlaveDownstreamHandler {
private void writeProperty(Long deviceId, ConnectionInfo connInfo,
IotModbusPointRespDTO point, Object value) {
// 1.1 转换属性值为原始值
int[] rawValues = IotModbusUtils.convertToRawValues(value, point);
int[] rawValues = IotModbusCommonUtils.convertToRawValues(value, point);
// 1.2 确定帧格式和事务 ID
IotModbusFrameFormatEnum frameFormat = connInfo.getFrameFormat();
@@ -117,8 +117,8 @@ public class IotModbusTcpSlaveDownstreamHandler {
// 1.3 编码写请求
byte[] data;
int readFunctionCode = point.getFunctionCode();
Integer writeSingleCode = IotModbusUtils.getWriteSingleFunctionCode(readFunctionCode);
Integer writeMultipleCode = IotModbusUtils.getWriteMultipleFunctionCode(readFunctionCode);
Integer writeSingleCode = IotModbusCommonUtils.getWriteSingleFunctionCode(readFunctionCode);
Integer writeMultipleCode = IotModbusCommonUtils.getWriteMultipleFunctionCode(readFunctionCode);
if (rawValues.length == 1 && writeSingleCode != null) {
// 单个值使用单写功能码FC05/FC06
data = frameEncoder.encodeWriteSingleRequest(slaveId, writeSingleCode,

View File

@@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.dow
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.IotModbusTcpSlaveProtocol;
import lombok.extern.slf4j.Slf4j;
@@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotModbusTcpSlaveDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
public class IotModbusTcpSlaveDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
private final IotModbusTcpSlaveDownstreamHandler downstreamHandler;

View File

@@ -19,7 +19,7 @@ import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConfigCacheService;
@@ -237,7 +237,7 @@ public class IotModbusTcpSlaveUpstreamHandler {
return;
}
// 2.2 提取寄存器值
int[] rawValues = IotModbusUtils.extractValues(frame);
int[] rawValues = IotModbusCommonUtils.extractValues(frame);
if (rawValues == null) {
log.warn("[handlePollingResponse][提取寄存器值失败, deviceId={}, identifier={}]",
info.getDeviceId(), request.getIdentifier());
@@ -248,14 +248,13 @@ public class IotModbusTcpSlaveUpstreamHandler {
if (config == null || CollUtil.isEmpty(config.getPoints())) {
return;
}
IotModbusPointRespDTO point = CollUtil.findOne(config.getPoints(),
p -> p.getId().equals(request.getPointId()));
IotModbusPointRespDTO point = IotModbusCommonUtils.findPointById(config, request.getPointId());
if (point == null) {
return;
}
// 3.1 点位翻译
Object convertedValue = IotModbusUtils.convertToPropertyValue(rawValues, point);
Object convertedValue = IotModbusCommonUtils.convertToPropertyValue(rawValues, point);
// 3.2 上报属性
Map<String, Object> params = MapUtil.of(request.getIdentifier(), convertedValue);
IotDeviceMessage message = IotDeviceMessage.requestOf(

View File

@@ -75,8 +75,8 @@ public class IotModbusTcpSlaveConnectionManager {
try {
oldSocket.close();
} catch (Exception e) {
// TODO @AI这里日志可以打的更完整一点方便追溯比如设备 ID、旧连接地址等
log.warn("[registerConnection][关闭旧 socket 失败]", e);
log.warn("[registerConnection][关闭旧 socket 失败, deviceId={}, oldRemote={}]",
info.getDeviceId(), oldSocket.remoteAddress(), e);
}
}

View File

@@ -1,42 +1,28 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.manager.AbstractIotModbusPollScheduler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager.PendingRequest;
import io.vertx.core.Vertx;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
/**
* IoT Modbus TCP Slave 轮询调度器
* <p>
* 管理点位的轮询定时器,为云端轮询模式的设备调度读取任务。
* 与 tcpmaster 的 {@code IotModbusTcpPollScheduler} 不同,这里不通过 j2mod 直接读取,而是:
* 1. 编码 Modbus 读请求帧
* 2. 通过 ConnectionManager 发送到设备的 TCP 连接
* 3. 将请求注册到 PendingRequestManager等待设备响应
* <p>
* 闭包只捕获 deviceId + pointId运行时从 configCacheService 获取最新 config 和 point
* 避免闭包捕获旧快照导致上报消息用旧身份的问题。
* IoT Modbus TCP Slave 轮询调度器:编码读请求帧,通过 TCP 连接发送到设备,注册 PendingRequest 等待响应
*
* @author 芋道源码
*/
@Slf4j
public class IotModbusTcpSlavePollScheduler {
public class IotModbusTcpSlavePollScheduler extends AbstractIotModbusPollScheduler {
private final Vertx vertx;
private final IotModbusTcpSlaveConnectionManager connectionManager;
private final IotModbusFrameEncoder frameEncoder;
private final IotModbusTcpSlavePendingRequestManager pendingRequestManager;
@@ -47,29 +33,6 @@ public class IotModbusTcpSlavePollScheduler {
*/
private final AtomicInteger transactionIdCounter;
/**
* 同设备最小请求间隔(毫秒),防止 Modbus 设备性能不足时请求堆积
*/
private static final long MIN_REQUEST_INTERVAL = 200;
/**
* 设备点位的定时器映射deviceId -> (pointId -> PointTimerInfo)
*/
private final Map<Long, Map<Long, PointTimerInfo>> devicePointTimers = new ConcurrentHashMap<>();
/**
* per-device 请求队列deviceId -> 待执行请求队列
*/
private final Map<Long, Queue<Runnable>> deviceRequestQueues = new ConcurrentHashMap<>();
/**
* per-device 上次请求时间戳deviceId -> lastRequestTimeMs
*/
private final Map<Long, Long> deviceLastRequestTime = new ConcurrentHashMap<>();
/**
* per-device 延迟 timer 标记deviceId -> 是否有延迟 timer 在等待
*/
private final Map<Long, Boolean> deviceDelayTimerActive = new ConcurrentHashMap<>();
public IotModbusTcpSlavePollScheduler(Vertx vertx,
IotModbusTcpSlaveConnectionManager connectionManager,
IotModbusFrameEncoder frameEncoder,
@@ -77,7 +40,7 @@ public class IotModbusTcpSlavePollScheduler {
int requestTimeout,
AtomicInteger transactionIdCounter,
IotModbusTcpSlaveConfigCacheService configCacheService) {
this.vertx = vertx;
super(vertx);
this.connectionManager = connectionManager;
this.frameEncoder = frameEncoder;
this.pendingRequestManager = pendingRequestManager;
@@ -86,185 +49,13 @@ public class IotModbusTcpSlavePollScheduler {
this.configCacheService = configCacheService;
}
/**
* 点位定时器信息
*/
@Data
@AllArgsConstructor
private static class PointTimerInfo {
/**
* Vert.x 定时器 ID
*/
private Long timerId;
/**
* 轮询间隔(用于判断是否需要更新定时器)
*/
private Integer pollInterval;
}
// ========== 轮询管理 ==========
/**
* 更新轮询任务(增量更新)
*
* 1. 【删除】点位:停止对应的轮询定时器
* 2. 【新增】点位:创建对应的轮询定时器
* 3. 【修改】点位pollInterval 变化,重建对应的轮询定时器
* 4. 其他属性变化不需要重建定时器pollPoint 运行时从 configCache 取最新 point
*/
public void updatePolling(IotModbusDeviceConfigRespDTO config) {
Long deviceId = config.getDeviceId();
List<IotModbusPointRespDTO> newPoints = config.getPoints();
Map<Long, PointTimerInfo> currentTimers = devicePointTimers
.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
// 1.1 计算新配置中的点位 ID 集合
Set<Long> newPointIds = convertSet(newPoints, IotModbusPointRespDTO::getId);
// 1.2 计算删除的点位 ID 集合
Set<Long> removedPointIds = new HashSet<>(currentTimers.keySet());
removedPointIds.removeAll(newPointIds);
// 2. 处理删除的点位:停止不再存在的定时器
for (Long pointId : removedPointIds) {
PointTimerInfo timerInfo = currentTimers.remove(pointId);
if (timerInfo != null) {
vertx.cancelTimer(timerInfo.getTimerId());
log.debug("[updatePolling][设备 {} 点位 {} 定时器已删除]", deviceId, pointId);
}
}
// 3. 处理新增和修改的点位
if (CollUtil.isEmpty(newPoints)) {
return;
}
for (IotModbusPointRespDTO point : newPoints) {
Long pointId = point.getId();
Integer newPollInterval = point.getPollInterval();
PointTimerInfo existingTimer = currentTimers.get(pointId);
// 3.1 新增点位:创建定时器
if (existingTimer == null) {
Long timerId = createPollTimer(deviceId, pointId, newPollInterval);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已创建, interval={}ms]",
deviceId, pointId, newPollInterval);
}
} else if (!Objects.equals(existingTimer.getPollInterval(), newPollInterval)) {
// 3.2 pollInterval 变化:重建定时器
vertx.cancelTimer(existingTimer.getTimerId());
Long timerId = createPollTimer(deviceId, pointId, newPollInterval);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已更新, interval={}ms -> {}ms]",
deviceId, pointId, existingTimer.getPollInterval(), newPollInterval);
} else {
currentTimers.remove(pointId);
}
}
// 3.3 其他属性变化:无需重建定时器,因为 pollPoint() 运行时从 configCache 获取最新 point自动使用新配置
}
}
/**
* 创建轮询定时器
* <p>
* 闭包只捕获 deviceId 和 pointId运行时从 configCache 获取最新配置,避免旧快照问题。
*/
private Long createPollTimer(Long deviceId, Long pointId, Integer pollInterval) {
if (pollInterval == null || pollInterval <= 0) {
return null;
}
return vertx.setPeriodic(pollInterval, timerId -> {
try {
submitPollRequest(deviceId, pointId);
} catch (Exception e) {
log.error("[createPollTimer][轮询点位失败, deviceId={}, pointId={}]", deviceId, pointId, e);
}
});
}
// ========== 请求队列per-device 限速) ==========
/**
* 提交轮询请求到设备请求队列(保证同设备请求间隔)
*/
private void submitPollRequest(Long deviceId, Long pointId) {
// 1. 将请求添加到设备的请求队列
Queue<Runnable> queue = deviceRequestQueues.computeIfAbsent(deviceId, k -> new ConcurrentLinkedQueue<>());
queue.offer(() -> pollPoint(deviceId, pointId));
// 2. 处理设备请求队列(如果没有延迟 timer 在等待)
processDeviceQueue(deviceId);
}
/**
* 处理设备请求队列
*/
private void processDeviceQueue(Long deviceId) {
Queue<Runnable> queue = deviceRequestQueues.get(deviceId);
if (CollUtil.isEmpty(queue)) {
return;
}
// 检查是否已有延迟 timer 在等待
if (Boolean.TRUE.equals(deviceDelayTimerActive.get(deviceId))) {
return;
}
// TODO @AI可以改成不满足间隔然后 return简化括号层级
long now = System.currentTimeMillis();
long lastTime = deviceLastRequestTime.getOrDefault(deviceId, 0L);
long elapsed = now - lastTime;
if (elapsed >= MIN_REQUEST_INTERVAL) {
// 满足间隔要求,立即执行
Runnable task = queue.poll();
if (task != null) {
deviceLastRequestTime.put(deviceId, now);
task.run();
// 继续处理队列中的下一个(如果有的话,需要延迟)
if (!queue.isEmpty()) {
scheduleNextRequest(deviceId);
}
}
} else {
// 需要延迟
scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL - elapsed);
}
}
private void scheduleNextRequest(Long deviceId) {
scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL);
}
private void scheduleNextRequest(Long deviceId, long delayMs) {
deviceDelayTimerActive.put(deviceId, true);
vertx.setTimer(delayMs, id -> {
deviceDelayTimerActive.put(deviceId, false);
Queue<Runnable> queue = deviceRequestQueues.get(deviceId);
// TODO @AIif return简化下
if (CollUtil.isEmpty(queue)) {
Runnable task = queue.poll();
if (task != null) {
deviceLastRequestTime.put(deviceId, System.currentTimeMillis());
task.run();
}
// 继续处理
if (queue != null && !queue.isEmpty()) {
scheduleNextRequest(deviceId);
}
}
});
}
// ========== 轮询执行 ==========
/**
* 轮询单个点位
* <p>
* 运行时从 configCacheService 获取最新的 config 和 point而非使用闭包捕获的旧引用。
*/
private void pollPoint(Long deviceId, Long pointId) {
@Override
protected void pollPoint(Long deviceId, Long pointId) {
// 1.1 从 configCache 获取最新配置
IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(deviceId);
if (config == null || CollUtil.isEmpty(config.getPoints())) {
@@ -272,34 +63,31 @@ public class IotModbusTcpSlavePollScheduler {
return;
}
// 1.2 查找点位
IotModbusPointRespDTO point = CollUtil.findOne(config.getPoints(), p -> p.getId().equals(pointId));
IotModbusPointRespDTO point = IotModbusCommonUtils.findPointById(config, pointId);
if (point == null) {
log.warn("[pollPoint][设备 {} 点位 {} 未找到]", deviceId, pointId);
return;
}
// 2. 获取连接信息
ConnectionInfo connInfo = connectionManager.getConnectionInfoByDeviceId(deviceId);
if (connInfo == null) {
// 2.1 获取连接
ConnectionInfo connection = connectionManager.getConnectionInfoByDeviceId(deviceId);
if (connection == null) {
log.debug("[pollPoint][设备 {} 没有连接,跳过轮询]", deviceId);
return;
}
// 2.2 获取 slave ID
IotModbusFrameFormatEnum frameFormat = connection.getFrameFormat();
Assert.notNull(frameFormat, "设备 {} 的帧格式不能为空", deviceId);
int slaveId = connection.getSlaveId();
Assert.notNull(connection.getSlaveId(), "设备 {} 的 slaveId 不能为空", deviceId);
// 3.1 确定帧格式和事务 ID
IotModbusFrameFormatEnum frameFormat = connInfo.getFrameFormat();
if (frameFormat == null) {
log.warn("[pollPoint][设备 {} 帧格式为空,跳过轮询]", deviceId);
return;
}
// 3.1 编码读请求
Integer transactionId = frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP
? (transactionIdCounter.incrementAndGet() & 0xFFFF)
: null;
// TODO @AI这里断言必须非空
int slaveId = connInfo.getSlaveId() != null ? connInfo.getSlaveId() : 1;
// 3.2 编码读请求
byte[] data = frameEncoder.encodeReadRequest(slaveId, point.getFunctionCode(),
point.getRegisterAddress(), point.getRegisterCount(), frameFormat, transactionId);
// 3.3 注册 PendingRequest
// 3.2 注册 PendingRequest
PendingRequest pendingRequest = new PendingRequest(
deviceId, point.getId(), point.getIdentifier(),
slaveId, point.getFunctionCode(),
@@ -307,41 +95,11 @@ public class IotModbusTcpSlavePollScheduler {
transactionId,
System.currentTimeMillis() + requestTimeout);
pendingRequestManager.addRequest(pendingRequest);
// 4. 发送读请求
// 3.3 发送读请求
connectionManager.sendToDevice(deviceId, data);
log.debug("[pollPoint][设备={}, 点位={}, FC={}, 地址={}, 数量={}]",
deviceId, point.getIdentifier(), point.getFunctionCode(),
point.getRegisterAddress(), point.getRegisterCount());
}
// ========== 停止 ==========
/**
* 停止设备的轮询
*/
public void stopPolling(Long deviceId) {
Map<Long, PointTimerInfo> timers = devicePointTimers.remove(deviceId);
if (CollUtil.isEmpty(timers)) {
return;
}
for (PointTimerInfo timerInfo : timers.values()) {
vertx.cancelTimer(timerInfo.getTimerId());
}
// 清理请求队列
deviceRequestQueues.remove(deviceId);
deviceLastRequestTime.remove(deviceId);
deviceDelayTimerActive.remove(deviceId);
log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timers.size());
}
/**
* 停止所有轮询
*/
public void stopAll() {
for (Long deviceId : new ArrayList<>(devicePointTimers.keySet())) {
stopPolling(deviceId);
}
}
}

View File

@@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttProtocol;
import lombok.extern.slf4j.Slf4j;
@@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotMqttDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
public class IotMqttDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
private final IotMqttDownstreamHandler downstreamHandler;

View File

@@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber;
import lombok.extern.slf4j.Slf4j;
/**
@@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotTcpDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
public class IotTcpDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
private final IotTcpDownstreamHandler downstreamHandler;

View File

@@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber;
import lombok.extern.slf4j.Slf4j;
/**
@@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotUdpDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
public class IotUdpDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
private final IotUdpDownstreamHandler downstreamHandler;

View File

@@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstrea
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketProtocol;
import lombok.extern.slf4j.Slf4j;
@@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotWebSocketDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
public class IotWebSocketDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
private final IotWebSocketDownstreamHandler downstreamHandler;

View File

@@ -8,7 +8,7 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
@@ -285,7 +285,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest {
frame[3 + i * 2 + 1] = (byte) (registerValues[i] & 0xFF);
}
// 计算 CRC16
int crc = IotModbusUtils.calculateCrc16(frame, totalLength - 2);
int crc = IotModbusCommonUtils.calculateCrc16(frame, totalLength - 2);
frame[totalLength - 2] = (byte) (crc & 0xFF); // CRC Low
frame[totalLength - 1] = (byte) ((crc >> 8) & 0xFF); // CRC High
return frame;