feat(iot):modbus-tcp-slave 整体代码进一步优化

This commit is contained in:
YunaiV
2026-02-08 10:13:11 +08:00
parent 3ab33527e3
commit 4d578b239c
6 changed files with 157 additions and 150 deletions

View File

@@ -1,10 +1,10 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
@@ -32,9 +32,9 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
// DONE @AI不用主动上报
/**
* IoT 网关 Modbus TCP Slave 协议
* <p>
@@ -152,10 +152,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
}
try {
// 1.1 首次加载配置
// TODO @AI可能首次不用加载你在想想
refreshConfig();
// 1.2 启动配置刷新定时器
// 1. 启动配置刷新定时器
int refreshInterval = slaveConfig.getConfigRefreshInterval();
configRefreshTimerId = vertx.setPeriodic(
TimeUnit.SECONDS.toMillis(refreshInterval),
@@ -286,6 +283,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
}
pollScheduler.stopPolling(info.getDeviceId());
pendingRequestManager.removeDevice(info.getDeviceId());
configCacheService.removeConfig(info.getDeviceId());
log.info("[handleConnection][连接关闭, deviceId={}, remoteAddress={}]",
info.getDeviceId(), socket.remoteAddress());
});
@@ -297,35 +295,30 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
}
/**
* 刷新配置
* 刷新已连接设备的配置(定时调用)
* <p>
* 与 tcpmaster 不同slave 只刷新已连接设备的配置,不做全量 diff。
* 设备的新增(认证时)和删除(断连时)分别在 {@link #handleConnection} 中处理。
*/
private synchronized void refreshConfig() {
try {
// 1. 从 biz 拉取最新配置
List<IotModbusDeviceConfigRespDTO> configs = configCacheService.refreshConfig();
log.debug("[refreshConfig][获取到 {} 个 Modbus 设备配置]", configs.size());
// 1. 只刷新已连接设备的配置
Set<Long> connectedDeviceIds = connectionManager.getConnectedDeviceIds();
if (CollUtil.isEmpty(connectedDeviceIds)) {
return;
}
List<IotModbusDeviceConfigRespDTO> configs =
configCacheService.refreshConnectedDeviceConfigList(connectedDeviceIds);
log.debug("[refreshConfig][刷新了 {} 个已连接设备的配置]", configs.size());
// 2. 更新已连接设备的轮询任务(仅 mode=1
// 2. 更新已连接设备的轮询任务
for (IotModbusDeviceConfigRespDTO config : configs) {
try {
if (config.getMode() != null
&& config.getMode().equals(IotModbusModeEnum.POLLING.getMode())) {
// 只有已连接的设备才启动轮询
ConnectionInfo connInfo = connectionManager.getConnectionInfoByDeviceId(config.getDeviceId());
if (connInfo != null) {
pollScheduler.updatePolling(config);
}
}
} catch (Exception e) {
log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e);
}
}
// 3. 清理已删除设备的资源
configCacheService.cleanupRemovedDevices(configs, deviceId -> {
pollScheduler.stopPolling(deviceId);
pendingRequestManager.removeDevice(deviceId);
});
} catch (Exception e) {
log.error("[refreshConfig][刷新配置失败]", e);
}

View File

@@ -16,7 +16,6 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
@@ -10,15 +11,18 @@ import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
// TODO @AI和 IotModbusTcpConfigCacheService 基本一致?!
/**
* IoT Modbus TCP Slave 配置缓存服务
* <p>
* 负责:从 biz 拉取 Modbus 设备配置,缓存配置数据,并检测配置变更
* 与 tcpmaster 的 {@code IotModbusTcpConfigCacheService} 不同:
* - tcpmaster 启动时拉全量配置 → 主动建连,需要全量 diff 检测新增/删除设备
* - tcpslave 设备主动连接 → 认证时按需加载配置,断连时清理,定时刷新已连接设备的配置
* <p>
* 配置生命周期:
* 1. 认证时:{@link #loadDeviceConfig(Long)} 按 deviceId 从 API 加载配置到缓存
* 2. 断连时:{@link #removeConfig(Long)} 从缓存中移除
* 3. 定时刷新:{@link #refreshConnectedDeviceConfigList(Set)} 批量刷新已连接设备的配置
*
* @author 芋道源码
*/
@@ -33,43 +37,118 @@ public class IotModbusTcpSlaveConfigCacheService {
*/
private final Map<Long, IotModbusDeviceConfigRespDTO> configCache = new ConcurrentHashMap<>();
// TODO @AI它的 diff 算法,是不是不用和 IotModbusTcpConfigCacheService 完全一致更多是1首次连接时查找2断开连接移除3定时轮询更新
/**
* 已知的设备 ID 集合
*/
private final Set<Long> knownDeviceIds = ConcurrentHashMap.newKeySet();
// ==================== 按需加载(认证时) ====================
/**
* 刷新配置
* 加载单个设备的配置(认证成功后调用)
* <p>
* 从远程 API 获取全量配置,然后按 deviceId 匹配。
* 如果远程获取失败,尝试从 Mock 数据中匹配。
*
* @return 最新的配置列表
* @param deviceId 设备 ID
* @return 设备配置,未找到返回 null
*/
public List<IotModbusDeviceConfigRespDTO> refreshConfig() {
public IotModbusDeviceConfigRespDTO loadDeviceConfig(Long deviceId) {
try {
// 1. 从远程获取配置
// TODO @AI需要过滤下,只查找连接的设备列表;并且只有主动轮询的,才会处理;方法名,应该是 List 结尾
// 1. 从远程 API 获取全量配置
// TODO @AI等待修复,不着急
CommonResult<List<IotModbusDeviceConfigRespDTO>> result = deviceApi.getEnabledModbusDeviceConfigs();
if (result == null || !result.isSuccess() || result.getData() == null) {
log.warn("[refreshConfig][获取 Modbus 配置失败: {}]", result);
return new ArrayList<>(configCache.values());
}
List<IotModbusDeviceConfigRespDTO> configs = new ArrayList<>(result.getData());
// 2. 追加 Mock 测试数据(一次性测试用途)
// TODO @芋艿:测试完成后移除
configs.addAll(buildMockConfigs());
// 3. 更新缓存
for (IotModbusDeviceConfigRespDTO config : configs) {
if (result != null && result.isSuccess() && result.getData() != null) {
for (IotModbusDeviceConfigRespDTO config : result.getData()) {
// 顺便更新缓存(其他已连接设备也受益)
configCache.put(config.getDeviceId(), config);
if (config.getDeviceId().equals(deviceId)) {
return config;
}
}
}
return configs;
} catch (Exception e) {
log.error("[refreshConfig][刷新配置失败]", e);
return new ArrayList<>(configCache.values());
log.error("[loadDeviceConfig][从远程获取配置失败, deviceId={}]", deviceId, e);
}
// 2. 远程未找到,尝试 Mock 数据
// TODO @芋艿:测试完成后移除
for (IotModbusDeviceConfigRespDTO mockConfig : buildMockConfigs()) {
configCache.put(mockConfig.getDeviceId(), mockConfig);
if (mockConfig.getDeviceId().equals(deviceId)) {
return mockConfig;
}
}
return configCache.get(deviceId);
}
// ==================== 定时刷新(已连接设备) ====================
/**
* 刷新已连接设备的配置缓存
* <p>
* 定时调用,从远程 API 拉取最新配置,只更新已连接设备的缓存。
*
* @param connectedDeviceIds 当前已连接的设备 ID 集合
* @return 已连接设备的最新配置列表
*/
public List<IotModbusDeviceConfigRespDTO> refreshConnectedDeviceConfigList(Set<Long> connectedDeviceIds) {
if (CollUtil.isEmpty(connectedDeviceIds)) {
return Collections.emptyList();
}
try {
// 1. 从远程获取全量配置
// TODO @AI传递 ids 批量查询;需要分批啦;
CommonResult<List<IotModbusDeviceConfigRespDTO>> result = deviceApi.getEnabledModbusDeviceConfigs();
List<IotModbusDeviceConfigRespDTO> allConfigs;
if (result != null && result.isSuccess() && result.getData() != null) {
allConfigs = new ArrayList<>(result.getData());
} else {
log.warn("[refreshConnectedDeviceConfigList][获取 Modbus 配置失败: {}]", result);
allConfigs = new ArrayList<>();
}
// 2. 追加 Mock 测试数据
// TODO @芋艿:测试完成后移除
allConfigs.addAll(buildMockConfigs());
// 3. 只保留已连接设备的配置,更新缓存
List<IotModbusDeviceConfigRespDTO> connectedConfigs = new ArrayList<>();
for (IotModbusDeviceConfigRespDTO config : allConfigs) {
if (connectedDeviceIds.contains(config.getDeviceId())) {
configCache.put(config.getDeviceId(), config);
connectedConfigs.add(config);
}
}
return connectedConfigs;
} catch (Exception e) {
log.error("[refreshConnectedDeviceConfigList][刷新配置失败]", e);
// 降级:返回缓存中已连接设备的配置
List<IotModbusDeviceConfigRespDTO> fallback = new ArrayList<>();
for (Long deviceId : connectedDeviceIds) {
IotModbusDeviceConfigRespDTO config = configCache.get(deviceId);
if (config != null) {
fallback.add(config);
}
}
return fallback;
}
}
// ==================== 缓存操作 ====================
/**
* 获取设备配置
*/
public IotModbusDeviceConfigRespDTO getConfig(Long deviceId) {
return configCache.get(deviceId);
}
/**
* 移除设备配置缓存(设备断连时调用)
*/
public void removeConfig(Long deviceId) {
configCache.remove(deviceId);
}
// ==================== Mock 数据 ====================
/**
* 构建 Mock 测试配置数据(一次性测试用途)
*
@@ -123,50 +202,4 @@ public class IotModbusTcpSlaveConfigCacheService {
return Collections.singletonList(config);
}
/**
* 获取设备配置
*/
public IotModbusDeviceConfigRespDTO getConfig(Long deviceId) {
return configCache.get(deviceId);
}
// TODO @AI这个逻辑是不是非必须
/**
* 通过 clientId + username + password 查找设备配置(认证用)
* 暂通过遍历缓存实现,后续可优化为索引
*/
public IotModbusDeviceConfigRespDTO findConfigByAuth(String clientId, String username, String password) {
// TODO @芋艿:测试完成后移除 mock 逻辑,改为正式查找
// Mock通过 clientId格式 productKey.deviceName匹配缓存中的设备
if (clientId != null && clientId.contains(".")) {
String[] parts = clientId.split("\\.", 2);
String productKey = parts[0];
String deviceName = parts[1];
for (IotModbusDeviceConfigRespDTO config : configCache.values()) {
if (productKey.equals(config.getProductKey()) && deviceName.equals(config.getDeviceName())) {
return config;
}
}
}
return null;
}
/**
* 清理已删除设备的资源
*/
public void cleanupRemovedDevices(List<IotModbusDeviceConfigRespDTO> currentConfigs, Consumer<Long> cleanupAction) {
Set<Long> currentDeviceIds = convertSet(currentConfigs, IotModbusDeviceConfigRespDTO::getDeviceId);
Set<Long> removedDeviceIds = new HashSet<>(knownDeviceIds);
removedDeviceIds.removeAll(currentDeviceIds);
for (Long deviceId : removedDeviceIds) {
log.info("[cleanupRemovedDevices][清理已删除设备: {}]", deviceId);
configCache.remove(deviceId);
cleanupAction.accept(deviceId);
}
knownDeviceIds.clear();
knownDeviceIds.addAll(currentDeviceIds);
}
}

View File

@@ -4,9 +4,11 @@ import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -33,6 +35,7 @@ public class IotModbusTcpSlaveConnectionManager {
* 连接信息
*/
@Data
@Accessors(chain = true)
public static class ConnectionInfo {
/**
@@ -57,12 +60,6 @@ public class IotModbusTcpSlaveConnectionManager {
*/
private IotModbusFrameFormatEnum frameFormat;
// TODO @AImode 是否非必须?!
/**
* 模式1-云端轮询 2-主动上报
*/
private Integer mode;
}
/**
@@ -75,17 +72,6 @@ public class IotModbusTcpSlaveConnectionManager {
info.getDeviceId(), socket.remoteAddress());
}
// TODO @芋艿:待定是不是要保留?!
/**
* 设置连接的帧格式(首帧检测后调用)
*/
public void setFrameFormat(NetSocket socket, IotModbusFrameFormatEnum frameFormat) {
ConnectionInfo info = connectionMap.get(socket);
if (info != null) {
info.setFrameFormat(frameFormat);
}
}
/**
* 获取连接信息
*/
@@ -101,12 +87,11 @@ public class IotModbusTcpSlaveConnectionManager {
return socket != null ? connectionMap.get(socket) : null;
}
// TODO @AI不用判断连接是否认证
/**
* 判断连接是否已认证
* 获取所有已连接设备的 ID 集合
*/
public boolean isAuthenticated(NetSocket socket) {
return connectionMap.containsKey(socket);
public Set<Long> getConnectedDeviceIds() {
return deviceSocketMap.keySet();
}
/**
@@ -130,8 +115,7 @@ public class IotModbusTcpSlaveConnectionManager {
log.warn("[sendToDevice][设备 {} 没有连接]", deviceId);
return;
}
// TODO @AI直接复用 sendToSocket 方法?!
socket.write(Buffer.buffer(data));
sendToSocket(socket, data);
}
/**
@@ -141,7 +125,6 @@ public class IotModbusTcpSlaveConnectionManager {
socket.write(Buffer.buffer(data));
}
// TODO @AI貌似别的都没这个是不是可以去掉哈
/**
* 关闭所有连接
*/

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import lombok.AllArgsConstructor;
@@ -7,6 +8,7 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -66,26 +68,24 @@ public class IotModbusTcpSlavePendingRequestManager {
public PendingRequest matchResponse(Long deviceId, IotModbusFrame frame,
IotModbusFrameFormatEnum frameFormat) {
Deque<PendingRequest> queue = pendingRequests.get(deviceId);
// TODO @AICollUtil.isEmpty(queue)
if (queue == null || queue.isEmpty()) {
if (CollUtil.isEmpty(queue)) {
return null;
}
if (frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP && frame.getTransactionId() != null) {
// TCP 模式:按 transactionId 精确匹配
if (frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP && frame.getTransactionId() != null) {
return matchByTransactionId(queue, frame.getTransactionId());
} else {
}
// RTU 模式FIFO匹配 slaveId + functionCode
return matchByFifo(queue, frame.getSlaveId(), frame.getFunctionCode());
}
}
/**
* 按 transactionId 匹配
*/
private PendingRequest matchByTransactionId(Deque<PendingRequest> queue, int transactionId) {
// TODO @AI需要兼容 jdk8
for (var it = queue.iterator(); it.hasNext(); ) {
Iterator<PendingRequest> it = queue.iterator();
while (it.hasNext()) {
PendingRequest req = it.next();
if (req.getTransactionId() != null && req.getTransactionId() == transactionId) {
it.remove();
@@ -99,8 +99,8 @@ public class IotModbusTcpSlavePendingRequestManager {
* 按 FIFO 匹配
*/
private PendingRequest matchByFifo(Deque<PendingRequest> queue, int slaveId, int functionCode) {
// TODO @AI需要兼容 jdk8
for (var it = queue.iterator(); it.hasNext(); ) {
Iterator<PendingRequest> it = queue.iterator();
while (it.hasNext()) {
PendingRequest req = it.next();
if (req.getSlaveId() == slaveId && req.getFunctionCode() == functionCode) {
it.remove();
@@ -120,13 +120,11 @@ public class IotModbusTcpSlavePendingRequestManager {
int removed = 0;
while (!queue.isEmpty()) {
PendingRequest req = queue.peekFirst();
// TODO @AIif return 减少括号层级;
if (req != null && req.getExpireAt() < now) {
queue.pollFirst();
removed++;
} else {
if (req == null || req.getExpireAt() >= now) {
break; // 队列有序,后面的没过期
}
queue.pollFirst();
removed++;
}
if (removed > 0) {
log.debug("[cleanupExpired][设备 {} 清理了 {} 个过期请求]", entry.getKey(), removed);

View File

@@ -19,12 +19,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
// TODO @AI和 IotModbusTcpPollScheduler 很像,是不是可以做一些复用?
/**
* IoT Modbus TCP Slave 轮询调度器
* <p>
* 管理点位的轮询定时器,为 mode=1云端轮询的设备调度读取任务。
* 与 tcpmaster 不同,这里不直接通过 j2mod 读取,而是:
* 管理点位的轮询定时器,为云端轮询模式的设备调度读取任务。
* 与 tcpmaster 的 {@code IotModbusTcpPollScheduler} 不同,这里不通过 j2mod 直接读取,而是:
* 1. 编码 Modbus 读请求帧
* 2. 通过 ConnectionManager 发送到设备的 TCP 连接
* 3. 将请求注册到 PendingRequestManager等待设备响应
@@ -154,13 +153,15 @@ public class IotModbusTcpSlavePollScheduler {
}
// 2.1 确定帧格式和事务 ID
// TODO @AI不允许为空
IotModbusFrameFormatEnum frameFormat = connInfo.getFrameFormat();
if (frameFormat == null) {
frameFormat = IotModbusFrameFormatEnum.MODBUS_TCP;
log.warn("[pollPoint][设备 {} 帧格式为空,跳过轮询]", deviceId);
return;
}
// TODO @AItransactionId 需要根据设备来么?然后递增也根据 IotModbusFrameFormatEnum.MODBUS_TCP 提前判断;
int transactionId = transactionIdCounter.incrementAndGet() & 0xFFFF;
// TODO @AI是不是得按照设备递增?
Integer transactionId = frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP
? (transactionIdCounter.incrementAndGet() & 0xFFFF)
: null;
int slaveId = connInfo.getSlaveId() != null ? connInfo.getSlaveId() : 1;
// 2.2 编码读请求
byte[] data = frameEncoder.encodeReadRequest(slaveId, point.getFunctionCode(),
@@ -170,7 +171,7 @@ public class IotModbusTcpSlavePollScheduler {
deviceId, point.getId(), point.getIdentifier(),
slaveId, point.getFunctionCode(),
point.getRegisterAddress(), point.getRegisterCount(),
frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP ? transactionId : null,
transactionId,
System.currentTimeMillis() + requestTimeout);
pendingRequestManager.addRequest(pendingRequest);