feat(iot):modbus-tcp-slave 继续优化,处理各种连接的边界、轮询的间隔等

This commit is contained in:
YunaiV
2026-02-08 15:54:18 +08:00
parent 4d578b239c
commit e13cd545cc
19 changed files with 816 additions and 461 deletions

View File

@@ -1,287 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusByteOrderEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusRawDataTypeEnum;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
/**
* IoT Modbus 数据转换器
* <p>
* 负责 Modbus 原始寄存器值与物模型属性值的相互转换:
* 1. 将 Modbus 原始寄存器值转换为物模型属性值
* 2. 将物模型属性值转换为 Modbus 原始寄存器值
*
* @author 芋道源码
*/
@Slf4j
public class IotModbusDataConverter {
/**
* 将原始值转换为物模型属性值
*
* @param rawValues 原始值数组(寄存器值或线圈值)
* @param point 点位配置
* @return 转换后的属性值
*/
public Object convertToPropertyValue(int[] rawValues, IotModbusPointRespDTO point) {
if (ArrayUtil.isEmpty(rawValues)) {
return null;
}
String rawDataType = point.getRawDataType();
String byteOrder = point.getByteOrder();
BigDecimal scale = ObjectUtil.defaultIfNull(point.getScale(), BigDecimal.ONE);
// 1. 根据原始数据类型解析原始数值
Number rawNumber = parseRawValue(rawValues, rawDataType, byteOrder);
if (rawNumber == null) {
return null;
}
// 2. 应用缩放因子:实际值 = 原始值 × scale
BigDecimal actualValue = new BigDecimal(rawNumber.toString()).multiply(scale);
// 3. 根据数据类型返回合适的 Java 类型
return formatValue(actualValue, rawDataType);
}
/**
* 将物模型属性值转换为原始寄存器值
*
* @param propertyValue 属性值
* @param point 点位配置
* @return 原始值数组
*/
public int[] convertToRawValues(Object propertyValue, IotModbusPointRespDTO point) {
if (propertyValue == null) {
return new int[0];
}
String rawDataType = point.getRawDataType();
String byteOrder = point.getByteOrder();
BigDecimal scale = ObjectUtil.defaultIfNull(point.getScale(), BigDecimal.ONE);
int registerCount = ObjectUtil.defaultIfNull(point.getRegisterCount(), 1);
// 1. 转换为 BigDecimal
BigDecimal actualValue = new BigDecimal(propertyValue.toString());
// 2. 应用缩放因子:原始值 = 实际值 ÷ scale
BigDecimal rawValue = actualValue.divide(scale, 0, RoundingMode.HALF_UP);
// 3. 根据原始数据类型编码为寄存器值
return encodeToRegisters(rawValue, rawDataType, byteOrder, registerCount);
}
/**
* 解析原始值
*/
@SuppressWarnings("EnhancedSwitchMigration")
private Number parseRawValue(int[] rawValues, String rawDataType, String byteOrder) {
IotModbusRawDataTypeEnum dataTypeEnum = IotModbusRawDataTypeEnum.getByType(rawDataType);
if (dataTypeEnum == null) {
log.warn("[parseRawValue][不支持的数据类型: {}]", rawDataType);
return rawValues[0];
}
switch (dataTypeEnum) {
case BOOLEAN:
return rawValues[0] != 0 ? 1 : 0;
case INT16:
return (short) rawValues[0];
case UINT16:
return rawValues[0] & 0xFFFF;
case INT32:
return parseInt32(rawValues, byteOrder);
case UINT32:
return parseUint32(rawValues, byteOrder);
case FLOAT:
return parseFloat(rawValues, byteOrder);
case DOUBLE:
return parseDouble(rawValues, byteOrder);
default:
log.warn("[parseRawValue][不支持的数据类型: {}]", rawDataType);
return rawValues[0];
}
}
private int parseInt32(int[] rawValues, String byteOrder) {
if (rawValues.length < 2) {
return rawValues[0];
}
byte[] bytes = reorderBytes(registersToBytes(rawValues, 2), byteOrder);
return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getInt();
}
private long parseUint32(int[] rawValues, String byteOrder) {
if (rawValues.length < 2) {
return rawValues[0] & 0xFFFFFFFFL;
}
byte[] bytes = reorderBytes(registersToBytes(rawValues, 2), byteOrder);
return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getInt() & 0xFFFFFFFFL;
}
private float parseFloat(int[] rawValues, String byteOrder) {
if (rawValues.length < 2) {
return (float) rawValues[0];
}
byte[] bytes = reorderBytes(registersToBytes(rawValues, 2), byteOrder);
return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getFloat();
}
private double parseDouble(int[] rawValues, String byteOrder) {
if (rawValues.length < 4) {
return rawValues[0];
}
byte[] bytes = reorderBytes(registersToBytes(rawValues, 4), byteOrder);
return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getDouble();
}
/**
* 将寄存器值转换为字节数组
*/
private byte[] registersToBytes(int[] registers, int count) {
byte[] bytes = new byte[count * 2];
for (int i = 0; i < Math.min(registers.length, count); i++) {
bytes[i * 2] = (byte) ((registers[i] >> 8) & 0xFF);
bytes[i * 2 + 1] = (byte) (registers[i] & 0xFF);
}
return bytes;
}
/**
* 根据字节序重排字节
*/
@SuppressWarnings("EnhancedSwitchMigration")
private byte[] reorderBytes(byte[] bytes, String byteOrder) {
IotModbusByteOrderEnum byteOrderEnum = IotModbusByteOrderEnum.getByOrder(byteOrder);
// null 或者大端序,不需要调整
if (ObjectUtils.equalsAny(byteOrderEnum, null, IotModbusByteOrderEnum.ABCD, IotModbusByteOrderEnum.AB)) {
return bytes;
}
// 其他字节序调整
byte[] result = new byte[bytes.length];
switch (byteOrderEnum) {
case BA: // 小端序16 位)
if (bytes.length >= 2) {
result[0] = bytes[1];
result[1] = bytes[0];
}
break;
case CDAB: // 大端字交换32 位)
if (bytes.length >= 4) {
result[0] = bytes[2];
result[1] = bytes[3];
result[2] = bytes[0];
result[3] = bytes[1];
}
break;
case DCBA: // 小端序32 位)
if (bytes.length >= 4) {
result[0] = bytes[3];
result[1] = bytes[2];
result[2] = bytes[1];
result[3] = bytes[0];
}
break;
case BADC: // 小端字交换32 位)
if (bytes.length >= 4) {
result[0] = bytes[1];
result[1] = bytes[0];
result[2] = bytes[3];
result[3] = bytes[2];
}
break;
default:
return bytes;
}
return result;
}
/**
* 编码为寄存器值
*/
@SuppressWarnings("EnhancedSwitchMigration")
private int[] encodeToRegisters(BigDecimal rawValue, String rawDataType, String byteOrder, int registerCount) {
IotModbusRawDataTypeEnum dataTypeEnum = IotModbusRawDataTypeEnum.getByType(rawDataType);
if (dataTypeEnum == null) {
return new int[]{rawValue.intValue()};
}
switch (dataTypeEnum) {
case BOOLEAN:
return new int[]{rawValue.intValue() != 0 ? 1 : 0};
case INT16:
case UINT16:
return new int[]{rawValue.intValue() & 0xFFFF};
case INT32:
case UINT32:
return encodeInt32(rawValue.intValue(), byteOrder);
case FLOAT:
return encodeFloat(rawValue.floatValue(), byteOrder);
case DOUBLE:
return encodeDouble(rawValue.doubleValue(), byteOrder);
default:
return new int[]{rawValue.intValue()};
}
}
private int[] encodeInt32(int value, String byteOrder) {
byte[] bytes = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(value).array();
bytes = reorderBytes(bytes, byteOrder);
return bytesToRegisters(bytes);
}
private int[] encodeFloat(float value, String byteOrder) {
byte[] bytes = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putFloat(value).array();
bytes = reorderBytes(bytes, byteOrder);
return bytesToRegisters(bytes);
}
private int[] encodeDouble(double value, String byteOrder) {
byte[] bytes = ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putDouble(value).array();
bytes = reorderBytes(bytes, byteOrder);
return bytesToRegisters(bytes);
}
private int[] bytesToRegisters(byte[] bytes) {
int[] registers = new int[bytes.length / 2];
for (int i = 0; i < registers.length; i++) {
registers[i] = ((bytes[i * 2] & 0xFF) << 8) | (bytes[i * 2 + 1] & 0xFF);
}
return registers;
}
/**
* 格式化返回值
*/
@SuppressWarnings("EnhancedSwitchMigration")
private Object formatValue(BigDecimal value, String rawDataType) {
IotModbusRawDataTypeEnum dataTypeEnum = IotModbusRawDataTypeEnum.getByType(rawDataType);
if (dataTypeEnum == null) {
return value;
}
switch (dataTypeEnum) {
case BOOLEAN:
return value.intValue() != 0;
case INT16:
case INT32:
return value.intValue();
case UINT16:
case UINT32:
return value.longValue();
case FLOAT:
return value.floatValue();
case DOUBLE:
return value.doubleValue();
default:
return value;
}
}
}

View File

@@ -1,22 +1,38 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusByteOrderEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusRawDataTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
/**
* IoT Modbus 工具类
* IoT Modbus 协议工具类
* <p>
* 提供:
* 1. Modbus 协议常量(功能码、掩码等)
* 2. CRC-16/MODBUS 计算和校验
* 3. 功能码分类判断
* 4. 从解码后的 ${IotModbusFrame} 中提取寄存器值(用于后续的点位翻译)
* 提供 Modbus 协议全链路能力
* <ul>
* <li>协议常量功能码FC01~FC16、异常掩码等</li>
* <li>功能码判断:读/写/异常分类、可写判断、写功能码映射</li>
* <li>CRC-16/MODBUS 计算和校验</li>
* <li>数据转换:原始值 ↔ 物模型属性值({@link #convertToPropertyValue} / {@link #convertToRawValues}</li>
* <li>帧值提取:从 Modbus 帧提取寄存器/线圈值({@link #extractValues}</li>
* <li>点位查找({@link #findPoint}</li>
* </ul>
*
* @author 芋道源码
*/
@UtilityClass
@Slf4j
public class IotModbusUtils {
@@ -139,19 +155,6 @@ public class IotModbusUtils {
}
}
// ==================== 点位查找 ====================
/**
* 查找点位配置
*
* @param config 设备 Modbus 配置
* @param identifier 点位标识符
* @return 匹配的点位配置,未找到返回 null
*/
public static IotModbusPointRespDTO findPoint(IotModbusDeviceConfigRespDTO config, String identifier) {
return CollUtil.findOne(config.getPoints(), p -> identifier.equals(p.getIdentifier()));
}
// ==================== CRC16 工具 ====================
/**
@@ -192,7 +195,253 @@ public class IotModbusUtils {
return computed == received;
}
// ==================== 响应值提取 ====================
// ==================== 数据转换 ====================
/**
* 将原始值转换为物模型属性值
*
* @param rawValues 原始值数组(寄存器值或线圈值)
* @param point 点位配置
* @return 转换后的属性值
*/
public static Object convertToPropertyValue(int[] rawValues, IotModbusPointRespDTO point) {
if (ArrayUtil.isEmpty(rawValues)) {
return null;
}
String rawDataType = point.getRawDataType();
String byteOrder = point.getByteOrder();
BigDecimal scale = ObjectUtil.defaultIfNull(point.getScale(), BigDecimal.ONE);
// 1. 根据原始数据类型解析原始数值
Number rawNumber = parseRawValue(rawValues, rawDataType, byteOrder);
if (rawNumber == null) {
return null;
}
// 2. 应用缩放因子:实际值 = 原始值 × scale
BigDecimal actualValue = new BigDecimal(rawNumber.toString()).multiply(scale);
// 3. 根据数据类型返回合适的 Java 类型
return formatValue(actualValue, rawDataType);
}
/**
* 将物模型属性值转换为原始寄存器值
*
* @param propertyValue 属性值
* @param point 点位配置
* @return 原始值数组
*/
public static int[] convertToRawValues(Object propertyValue, IotModbusPointRespDTO point) {
if (propertyValue == null) {
return new int[0];
}
String rawDataType = point.getRawDataType();
String byteOrder = point.getByteOrder();
BigDecimal scale = ObjectUtil.defaultIfNull(point.getScale(), BigDecimal.ONE);
int registerCount = ObjectUtil.defaultIfNull(point.getRegisterCount(), 1);
// 1. 转换为 BigDecimal
BigDecimal actualValue = new BigDecimal(propertyValue.toString());
// 2. 应用缩放因子:原始值 = 实际值 ÷ scale
BigDecimal rawValue = actualValue.divide(scale, 0, RoundingMode.HALF_UP);
// 3. 根据原始数据类型编码为寄存器值
return encodeToRegisters(rawValue, rawDataType, byteOrder, registerCount);
}
@SuppressWarnings("EnhancedSwitchMigration")
private static Number parseRawValue(int[] rawValues, String rawDataType, String byteOrder) {
IotModbusRawDataTypeEnum dataTypeEnum = IotModbusRawDataTypeEnum.getByType(rawDataType);
if (dataTypeEnum == null) {
log.warn("[parseRawValue][不支持的数据类型: {}]", rawDataType);
return rawValues[0];
}
switch (dataTypeEnum) {
case BOOLEAN:
return rawValues[0] != 0 ? 1 : 0;
case INT16:
return (short) rawValues[0];
case UINT16:
return rawValues[0] & 0xFFFF;
case INT32:
return parseInt32(rawValues, byteOrder);
case UINT32:
return parseUint32(rawValues, byteOrder);
case FLOAT:
return parseFloat(rawValues, byteOrder);
case DOUBLE:
return parseDouble(rawValues, byteOrder);
default:
log.warn("[parseRawValue][不支持的数据类型: {}]", rawDataType);
return rawValues[0];
}
}
private static int parseInt32(int[] rawValues, String byteOrder) {
if (rawValues.length < 2) {
return rawValues[0];
}
byte[] bytes = reorderBytes(registersToBytes(rawValues, 2), byteOrder);
return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getInt();
}
private static long parseUint32(int[] rawValues, String byteOrder) {
if (rawValues.length < 2) {
return rawValues[0] & 0xFFFFFFFFL;
}
byte[] bytes = reorderBytes(registersToBytes(rawValues, 2), byteOrder);
return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getInt() & 0xFFFFFFFFL;
}
private static float parseFloat(int[] rawValues, String byteOrder) {
if (rawValues.length < 2) {
return (float) rawValues[0];
}
byte[] bytes = reorderBytes(registersToBytes(rawValues, 2), byteOrder);
return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getFloat();
}
private static double parseDouble(int[] rawValues, String byteOrder) {
if (rawValues.length < 4) {
return rawValues[0];
}
byte[] bytes = reorderBytes(registersToBytes(rawValues, 4), byteOrder);
return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getDouble();
}
private static byte[] registersToBytes(int[] registers, int count) {
byte[] bytes = new byte[count * 2];
for (int i = 0; i < Math.min(registers.length, count); i++) {
bytes[i * 2] = (byte) ((registers[i] >> 8) & 0xFF);
bytes[i * 2 + 1] = (byte) (registers[i] & 0xFF);
}
return bytes;
}
@SuppressWarnings("EnhancedSwitchMigration")
private static byte[] reorderBytes(byte[] bytes, String byteOrder) {
IotModbusByteOrderEnum byteOrderEnum = IotModbusByteOrderEnum.getByOrder(byteOrder);
// null 或者大端序,不需要调整
if (ObjectUtils.equalsAny(byteOrderEnum, null, IotModbusByteOrderEnum.ABCD, IotModbusByteOrderEnum.AB)) {
return bytes;
}
// 其他字节序调整
byte[] result = new byte[bytes.length];
switch (byteOrderEnum) {
case BA: // 小端序16 位)
if (bytes.length >= 2) {
result[0] = bytes[1];
result[1] = bytes[0];
}
break;
case CDAB: // 大端字交换32 位)
if (bytes.length >= 4) {
result[0] = bytes[2];
result[1] = bytes[3];
result[2] = bytes[0];
result[3] = bytes[1];
}
break;
case DCBA: // 小端序32 位)
if (bytes.length >= 4) {
result[0] = bytes[3];
result[1] = bytes[2];
result[2] = bytes[1];
result[3] = bytes[0];
}
break;
case BADC: // 小端字交换32 位)
if (bytes.length >= 4) {
result[0] = bytes[1];
result[1] = bytes[0];
result[2] = bytes[3];
result[3] = bytes[2];
}
break;
default:
return bytes;
}
return result;
}
@SuppressWarnings("EnhancedSwitchMigration")
private static int[] encodeToRegisters(BigDecimal rawValue, String rawDataType, String byteOrder, int registerCount) {
IotModbusRawDataTypeEnum dataTypeEnum = IotModbusRawDataTypeEnum.getByType(rawDataType);
if (dataTypeEnum == null) {
return new int[]{rawValue.intValue()};
}
switch (dataTypeEnum) {
case BOOLEAN:
return new int[]{rawValue.intValue() != 0 ? 1 : 0};
case INT16:
case UINT16:
return new int[]{rawValue.intValue() & 0xFFFF};
case INT32:
case UINT32:
return encodeInt32(rawValue.intValue(), byteOrder);
case FLOAT:
return encodeFloat(rawValue.floatValue(), byteOrder);
case DOUBLE:
return encodeDouble(rawValue.doubleValue(), byteOrder);
default:
return new int[]{rawValue.intValue()};
}
}
private static int[] encodeInt32(int value, String byteOrder) {
byte[] bytes = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(value).array();
bytes = reorderBytes(bytes, byteOrder);
return bytesToRegisters(bytes);
}
private static int[] encodeFloat(float value, String byteOrder) {
byte[] bytes = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putFloat(value).array();
bytes = reorderBytes(bytes, byteOrder);
return bytesToRegisters(bytes);
}
private static int[] encodeDouble(double value, String byteOrder) {
byte[] bytes = ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putDouble(value).array();
bytes = reorderBytes(bytes, byteOrder);
return bytesToRegisters(bytes);
}
private static int[] bytesToRegisters(byte[] bytes) {
int[] registers = new int[bytes.length / 2];
for (int i = 0; i < registers.length; i++) {
registers[i] = ((bytes[i * 2] & 0xFF) << 8) | (bytes[i * 2 + 1] & 0xFF);
}
return registers;
}
@SuppressWarnings("EnhancedSwitchMigration")
private static Object formatValue(BigDecimal value, String rawDataType) {
IotModbusRawDataTypeEnum dataTypeEnum = IotModbusRawDataTypeEnum.getByType(rawDataType);
if (dataTypeEnum == null) {
return value;
}
switch (dataTypeEnum) {
case BOOLEAN:
return value.intValue() != 0;
case INT16:
case INT32:
return value.intValue();
case UINT16:
case UINT32:
return value.longValue();
case FLOAT:
return value.floatValue();
case DOUBLE:
return value.doubleValue();
default:
return value;
}
}
// ==================== 帧值提取 ====================
/**
* 从帧中提取寄存器值FC01-04 读响应)
@@ -224,10 +473,6 @@ public class IotModbusUtils {
}
}
/**
* 提取线圈/离散输入值
* PDU 格式FC01/02 响应):[ByteCount(1)] [CoilStatus(N)]
*/
private static int[] extractCoilValues(byte[] pdu) {
if (pdu.length < 2) {
return null;
@@ -241,10 +486,6 @@ public class IotModbusUtils {
return values;
}
/**
* 提取寄存器值
* PDU 格式FC03/04 响应):[ByteCount(1)] [RegisterData(N*2)]
*/
private static int[] extractRegisterValues(byte[] pdu) {
if (pdu.length < 2) {
return null;
@@ -258,4 +499,17 @@ public class IotModbusUtils {
return values;
}
// ==================== 点位查找 ====================
/**
* 查找点位配置
*
* @param config 设备 Modbus 配置
* @param identifier 点位标识符
* @return 匹配的点位配置,未找到返回 null
*/
public static IotModbusPointRespDTO findPoint(IotModbusDeviceConfigRespDTO config, String identifier) {
return CollUtil.findOne(config.getPoints(), p -> identifier.equals(p.getIdentifier()));
}
}

View File

@@ -6,11 +6,10 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client.IotModbusTcpClient;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream.IotModbusTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream.IotModbusTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpUpstreamHandler;
@@ -70,6 +69,7 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
private final IotModbusTcpConfigCacheService configCacheService;
private final IotModbusTcpPollScheduler pollScheduler;
private final IotDeviceMessageService messageService;
public IotModbusTcpMasterProtocol(ProtocolProperties properties) {
IotModbusTcpMasterConfig modbusTcpMasterConfig = properties.getModbusTcpMaster();
@@ -87,15 +87,14 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
this.configCacheService = new IotModbusTcpConfigCacheService(deviceApi);
// 初始化 Handler
IotModbusDataConverter dataConverter = new IotModbusDataConverter();
IotModbusTcpClient modbusClient = new IotModbusTcpClient();
IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotModbusTcpUpstreamHandler upstreamHandler = new IotModbusTcpUpstreamHandler(messageService, dataConverter, serverId);
this.messageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotDeviceMessageService messageService = this.messageService;
IotModbusTcpUpstreamHandler upstreamHandler = new IotModbusTcpUpstreamHandler(messageService, serverId);
IotModbusTcpDownstreamHandler downstreamHandler = new IotModbusTcpDownstreamHandler(connectionManager,
modbusClient, dataConverter, configCacheService);
configCacheService);
// 初始化轮询调度器
this.pollScheduler = new IotModbusTcpPollScheduler(vertx, connectionManager, modbusClient, upstreamHandler);
this.pollScheduler = new IotModbusTcpPollScheduler(vertx, connectionManager, upstreamHandler, configCacheService);
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
@@ -186,24 +185,54 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
*/
private synchronized void refreshConfig() {
try {
// 1. 从 biz 拉取最新配置
// 1. 从 biz 拉取最新配置API 失败时返回 null
List<IotModbusDeviceConfigRespDTO> configs = configCacheService.refreshConfig();
if (configs == null) {
log.warn("[refreshConfig][API 失败,跳过本轮刷新]");
return;
}
log.debug("[refreshConfig][获取到 {} 个 Modbus 设备配置]", configs.size());
// 2. 更新连接和轮询任务
for (IotModbusDeviceConfigRespDTO config : configs) {
try {
// 2.1 确保连接存在
// 2.1 检测是否为首次连接
boolean isNewConnection = connectionManager.getConnection(config.getDeviceId()) == null;
// 2.2 确保连接存在
connectionManager.ensureConnection(config);
// 2.2 更新轮询任务
// 2.3 首次建连成功后发送上线消息
// TODO @AI在这里判断上线 ,会不会有点奇怪???
if (isNewConnection && connectionManager.getConnection(config.getDeviceId()) != null) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
messageService.sendDeviceMessage(onlineMessage,
config.getProductKey(), config.getDeviceName(), serverId);
} catch (Exception ex) {
log.error("[refreshConfig][发送设备上线消息失败, deviceId={}]", config.getDeviceId(), ex);
}
}
// 2.4 更新轮询任务
pollScheduler.updatePolling(config);
} catch (Exception e) {
log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e);
}
}
// 3. 清理已删除设备的资源
// 3. 清理已删除设备的资源(仅 API 成功时才执行)
configCacheService.cleanupRemovedDevices(configs, deviceId -> {
// 3.1 发送设备下线消息
// TODO @AI在这里判断上线 ,会不会有点奇怪???
IotModbusDeviceConfigRespDTO removedConfig = configCacheService.getConfig(deviceId);
if (removedConfig != null) {
try {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
messageService.sendDeviceMessage(offlineMessage,
removedConfig.getProductKey(), removedConfig.getDeviceName(), serverId);
} catch (Exception ex) {
log.error("[refreshConfig][发送设备下线消息失败, deviceId={}]", deviceId, ex);
}
}
// 3.2 停止轮询和移除连接
pollScheduler.stopPolling(deviceId);
connectionManager.removeDevice(deviceId);
});

View File

@@ -9,22 +9,24 @@ import com.ghgande.j2mod.modbus.procimg.Register;
import com.ghgande.j2mod.modbus.procimg.SimpleRegister;
import com.ghgande.j2mod.modbus.util.BitVector;
import io.vertx.core.Future;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils.*;
// TODO @AI感觉它更像一个工具类但是名字叫 client 很奇怪
/**
* IoT Modbus TCP 客户端
* IoT Modbus TCP 客户端工具类
* <p>
* 封装 Modbus 协议读写操作
* 1. 封装 Modbus /操作
* 2. 根据功能码执行对应的 Modbus 请求
* 封装基于 j2mod Modbus TCP 读写操作
* 1. 根据功能码创建对应的 Modbus /请求
* 2. 通过 {@link IotModbusTcpConnectionManager.ModbusConnection} 执行事务
* 3. 从响应中提取原始值
*
* @author 芋道源码
*/
@UtilityClass
@Slf4j
public class IotModbusTcpClient {
public class IotModbusTcpClientUtils {
/**
* 读取 Modbus 数据
@@ -34,7 +36,7 @@ public class IotModbusTcpClient {
* @param point 点位配置
* @return 原始值int 数组
*/
public Future<int[]> read(IotModbusTcpConnectionManager.ModbusConnection connection,
public static Future<int[]> read(IotModbusTcpConnectionManager.ModbusConnection connection,
Integer slaveId,
IotModbusPointRespDTO point) {
return connection.executeBlocking(tcpConnection -> {
@@ -68,7 +70,7 @@ public class IotModbusTcpClient {
* @param values 要写入的值
* @return 是否成功
*/
public Future<Boolean> write(IotModbusTcpConnectionManager.ModbusConnection connection,
public static Future<Boolean> write(IotModbusTcpConnectionManager.ModbusConnection connection,
Integer slaveId,
IotModbusPointRespDTO point,
int[] values) {
@@ -98,7 +100,7 @@ public class IotModbusTcpClient {
* 创建读取请求
*/
@SuppressWarnings("EnhancedSwitchMigration")
private ModbusRequest createReadRequest(Integer functionCode, Integer address, Integer count) {
private static ModbusRequest createReadRequest(Integer functionCode, Integer address, Integer count) {
switch (functionCode) {
case FC_READ_COILS:
return new ReadCoilsRequest(address, count);
@@ -117,7 +119,7 @@ public class IotModbusTcpClient {
* 创建写入请求
*/
@SuppressWarnings("EnhancedSwitchMigration")
private ModbusRequest createWriteRequest(Integer functionCode, Integer address, Integer count, int[] values) {
private static ModbusRequest createWriteRequest(Integer functionCode, Integer address, Integer count, int[] values) {
switch (functionCode) {
case FC_READ_COILS: // 写线圈使用功能码 5 15
if (count == 1) {
@@ -151,7 +153,7 @@ public class IotModbusTcpClient {
* 从响应中提取值
*/
@SuppressWarnings("EnhancedSwitchMigration")
private int[] extractValues(ModbusResponse response, Integer functionCode) {
private static int[] extractValues(ModbusResponse response, Integer functionCode) {
switch (functionCode) {
case FC_READ_COILS:
ReadCoilsResponse coilsResponse = (ReadCoilsResponse) response;

View File

@@ -4,9 +4,8 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client.IotModbusTcpClient;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client.IotModbusTcpClientUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager;
import lombok.RequiredArgsConstructor;
@@ -28,8 +27,6 @@ import java.util.Map;
public class IotModbusTcpDownstreamHandler {
private final IotModbusTcpConnectionManager connectionManager;
private final IotModbusTcpClient modbusClient;
private final IotModbusDataConverter dataConverter;
private final IotModbusTcpConfigCacheService configCacheService;
/**
@@ -94,9 +91,9 @@ public class IotModbusTcpDownstreamHandler {
}
// 2.1 转换属性值为原始值
int[] rawValues = dataConverter.convertToRawValues(value, point);
int[] rawValues = IotModbusUtils.convertToRawValues(value, point);
// 2.2 执行 Modbus 写入
modbusClient.write(connection, slaveId, point, rawValues)
IotModbusTcpClientUtils.write(connection, slaveId, point, rawValues)
.onSuccess(success -> log.info("[writeProperty][写入成功, deviceId={}, identifier={}, value={}]",
config.getDeviceId(), point.getIdentifier(), value))
.onFailure(e -> log.error("[writeProperty][写入失败, deviceId={}, identifier={}]",

View File

@@ -5,7 +5,7 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import lombok.extern.slf4j.Slf4j;
@@ -20,14 +20,11 @@ import java.util.Map;
public class IotModbusTcpUpstreamHandler {
private final IotDeviceMessageService messageService;
private final IotModbusDataConverter dataConverter;
private final String serverId;
public IotModbusTcpUpstreamHandler(IotDeviceMessageService messageService,
IotModbusDataConverter dataConverter,
String serverId) {
this.messageService = messageService;
this.dataConverter = dataConverter;
this.serverId = serverId;
}
@@ -43,7 +40,7 @@ public class IotModbusTcpUpstreamHandler {
int[] rawValue) {
try {
// 1.1 转换原始值为物模型属性值
Object convertedValue = dataConverter.convertToPropertyValue(rawValue, point);
Object convertedValue = IotModbusUtils.convertToPropertyValue(rawValue, point);
log.debug("[handleReadResult][设备={}, 属性={}, 原始值={}, 转换值={}]",
config.getDeviceId(), point.getIdentifier(), rawValue, convertedValue);
// 1.2 构造属性上报消息

View File

@@ -38,16 +38,13 @@ public class IotModbusTcpConfigCacheService {
/**
* 刷新配置
*
* @return 最新的配置列表
* @return 最新的配置列表API 失败时返回 null调用方应跳过 cleanup
*/
public List<IotModbusDeviceConfigRespDTO> refreshConfig() {
try {
// 1. 从远程获取配置
CommonResult<List<IotModbusDeviceConfigRespDTO>> result = deviceApi.getEnabledModbusDeviceConfigs();
if (result == null || !result.isSuccess() || result.getData() == null) {
log.warn("[refreshConfig][获取 Modbus 配置失败: {}]", result);
return new ArrayList<>(configCache.values());
}
result.checkError();
List<IotModbusDeviceConfigRespDTO> configs = result.getData();
// 2. 更新缓存(注意:不在这里更新 knownDeviceIds由 cleanupRemovedDevices 统一管理)
@@ -57,7 +54,7 @@ public class IotModbusTcpConfigCacheService {
return configs;
} catch (Exception e) {
log.error("[refreshConfig][刷新配置失败]", e);
return new ArrayList<>(configCache.values());
return null;
}
}
@@ -84,11 +81,11 @@ public class IotModbusTcpConfigCacheService {
Set<Long> removedDeviceIds = new HashSet<>(knownDeviceIds);
removedDeviceIds.removeAll(currentDeviceIds);
// 2. 清理已删除设备
// 2. 清理已删除设备(先执行 cleanupAction再从缓存移除保证 action 中仍可获取 config
for (Long deviceId : removedDeviceIds) {
log.info("[cleanupRemovedDevices][清理已删除设备: {}]", deviceId);
configCache.remove(deviceId);
cleanupAction.accept(deviceId);
configCache.remove(deviceId);
}
// 3. 更新已知设备 ID 集合为当前有效的设备 ID

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import com.ghgande.j2mod.modbus.net.TCPMasterConnection;
import io.vertx.core.Context;
@@ -50,8 +51,15 @@ public class IotModbusTcpConnectionManager {
* @param config 设备配置
*/
public void ensureConnection(IotModbusDeviceConfigRespDTO config) {
// 1. 记录设备与连接的关系
// 1.1 检查设备是否切换了 IP/端口,若是则先清理旧连接
String connectionKey = buildConnectionKey(config.getIp(), config.getPort());
String oldConnectionKey = deviceConnectionMap.get(config.getDeviceId());
if (oldConnectionKey != null && ObjUtil.notEqual(oldConnectionKey, connectionKey)) {
log.info("[ensureConnection][设备 {} IP/端口变更: {} -> {}, 清理旧连接]",
config.getDeviceId(), oldConnectionKey, connectionKey);
removeDevice(config.getDeviceId());
}
// 1.2 记录设备与连接的关系
deviceConnectionMap.put(config.getDeviceId(), connectionKey);
// 2. 情况一:连接已存在,添加设备引用
@@ -68,8 +76,15 @@ public class IotModbusTcpConnectionManager {
log.debug("[ensureConnection][获取锁失败, 由其他节点负责: {}]", connectionKey);
return;
}
// 3.2 创建新连接
// 3.2 double-check拿到锁后再次检查避免并发创建重复连接
try {
connection = connectionPool.get(connectionKey);
if (connection != null) {
connection.addDevice(config.getDeviceId(), config.getSlaveId());
lock.unlock();
return;
}
// 3.3 创建新连接
connection = createConnection(config, lock);
connectionPool.put(connectionKey, connection);
log.info("[ensureConnection][创建 Modbus 连接成功: {}]", connectionKey);
@@ -154,10 +169,10 @@ public class IotModbusTcpConnectionManager {
if (connection.getTcpConnection() != null) {
connection.getTcpConnection().close();
}
// 安全释放锁:先检查锁存在且被锁定,再检查是否当前线程持有
// 强制解锁,避免死锁(正常情况下应该不会发生锁未释放的情况)
RLock lock = connection.getLock();
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
if (lock != null && lock.isLocked()) {
lock.forceUnlock();
}
log.info("[closeConnection][关闭 Modbus 连接: {}]", connectionKey);
} catch (Exception e) {

View File

@@ -3,39 +3,70 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client.IotModbusTcpClient;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client.IotModbusTcpClientUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpUpstreamHandler;
import io.vertx.core.Vertx;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
// TODO @AI类的命名上要体现上 master。其它类似 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster 也要!
/**
* IoT Modbus TCP 轮询调度器:管理点位的轮询定时器,调度读取任务并上报结果
* IoT Modbus TCP Master 轮询调度器:管理点位的轮询定时器,调度读取任务并上报结果
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotModbusTcpPollScheduler {
private final Vertx vertx;
private final IotModbusTcpConnectionManager connectionManager;
private final IotModbusTcpClient modbusClient;
private final IotModbusTcpUpstreamHandler upstreamHandler;
private final IotModbusTcpConfigCacheService configCacheService;
/**
* 同设备最小请求间隔(毫秒),防止 Modbus 设备性能不足时请求堆积
*/
private static final long MIN_REQUEST_INTERVAL = 100;
/**
* 设备点位的定时器映射deviceId -> (pointId -> PointTimerInfo)
*/
private final Map<Long, Map<Long, PointTimerInfo>> devicePointTimers = new ConcurrentHashMap<>();
/**
* per-device 请求队列deviceId -> 待执行请求队列
*/
private final Map<Long, Queue<Runnable>> deviceRequestQueues = new ConcurrentHashMap<>();
/**
* per-device 上次请求时间戳deviceId -> lastRequestTimeMs
*/
private final Map<Long, Long> deviceLastRequestTime = new ConcurrentHashMap<>();
/**
* per-device 延迟 timer 标记deviceId -> 是否有延迟 timer 在等待
*/
private final Map<Long, Boolean> deviceDelayTimerActive = new ConcurrentHashMap<>();
public IotModbusTcpPollScheduler(Vertx vertx,
IotModbusTcpConnectionManager connectionManager,
IotModbusTcpUpstreamHandler upstreamHandler,
IotModbusTcpConfigCacheService configCacheService) {
this.vertx = vertx;
this.connectionManager = connectionManager;
this.upstreamHandler = upstreamHandler;
this.configCacheService = configCacheService;
}
// ========== 点位定时器 ==========
/**
* 点位定时器信息
*/
@@ -54,20 +85,22 @@ public class IotModbusTcpPollScheduler {
}
// ========== 轮询管理 ==========
/**
* 更新轮询任务(增量更新)
*
* 1. 【删除】点位:停止对应的轮询定时器
* 2. 【新增】点位:创建对应的轮询定时器
* 3. 【修改】点位pollInterval 变化,重建对应的轮询定时器
* 4. 其他属性变化(包括未变化的):不处理(下次轮询时自动使用新配置
* 4. 其他属性变化不需要重建定时器pollPoint 运行时从 configCache 取最新 point
*/
public void updatePolling(IotModbusDeviceConfigRespDTO config) {
Long deviceId = config.getDeviceId();
List<IotModbusPointRespDTO> newPoints = config.getPoints();
Map<Long, PointTimerInfo> currentTimers = devicePointTimers
.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
// 1.1 计算新配置(包括新增和修改的点位)中的点位 ID 集合
// 1.1 计算新配置中的点位 ID 集合
Set<Long> newPointIds = convertSet(newPoints, IotModbusPointRespDTO::getId);
// 1.2 计算删除的点位 ID 集合
Set<Long> removedPointIds = new HashSet<>(currentTimers.keySet());
@@ -92,7 +125,7 @@ public class IotModbusTcpPollScheduler {
PointTimerInfo existingTimer = currentTimers.get(pointId);
// 3.1 新增点位:创建定时器
if (existingTimer == null) {
Long timerId = createPollTimer(config, point);
Long timerId = createPollTimer(deviceId, pointId, newPollInterval);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已创建, interval={}ms]",
@@ -101,7 +134,7 @@ public class IotModbusTcpPollScheduler {
} else if (!Objects.equals(existingTimer.getPollInterval(), newPollInterval)) {
// 3.2 pollInterval 变化:重建定时器
vertx.cancelTimer(existingTimer.getTimerId());
Long timerId = createPollTimer(config, point);
Long timerId = createPollTimer(deviceId, pointId, newPollInterval);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已更新, interval={}ms -> {}ms]",
@@ -110,51 +143,136 @@ public class IotModbusTcpPollScheduler {
currentTimers.remove(pointId);
}
}
// 3.3 其他属性变化:不处理(下次轮询时自动使用新配置
// 3.3 其他属性变化:无需重建定时器,因为 pollPoint() 运行时从 configCache 获取最新 point自动使用新配置
}
}
/**
* 创建轮询定时器
* <p>
* 闭包只捕获 deviceId 和 pointId运行时从 configCache 获取最新配置,避免旧快照问题。
*/
private Long createPollTimer(IotModbusDeviceConfigRespDTO config, IotModbusPointRespDTO point) {
if (point.getPollInterval() == null || point.getPollInterval() <= 0) {
private Long createPollTimer(Long deviceId, Long pointId, Integer pollInterval) {
if (pollInterval == null || pollInterval <= 0) {
return null;
}
return vertx.setPeriodic(point.getPollInterval(), timerId -> {
return vertx.setPeriodic(pollInterval, timerId -> {
try {
pollPoint(config, point);
submitPollRequest(deviceId, pointId);
} catch (Exception e) {
log.error("[createPollTimer][轮询点位失败, deviceId={}, identifier={}]",
config.getDeviceId(), point.getIdentifier(), e);
log.error("[createPollTimer][轮询点位失败, deviceId={}, pointId={}]", deviceId, pointId, e);
}
});
}
// ========== 请求队列per-device 限速) ==========
/**
* 提交轮询请求到设备请求队列(保证同设备请求间隔)
*/
private void submitPollRequest(Long deviceId, Long pointId) {
Queue<Runnable> queue = deviceRequestQueues.computeIfAbsent(deviceId, k -> new ConcurrentLinkedQueue<>());
queue.offer(() -> pollPoint(deviceId, pointId));
processDeviceQueue(deviceId);
}
/**
* 处理设备请求队列
*/
private void processDeviceQueue(Long deviceId) {
Queue<Runnable> queue = deviceRequestQueues.get(deviceId);
if (CollUtil.isEmpty(queue)) {
return;
}
// 检查是否已有延迟 timer 在等待
if (Boolean.TRUE.equals(deviceDelayTimerActive.get(deviceId))) {
return;
}
long now = System.currentTimeMillis();
long lastTime = deviceLastRequestTime.getOrDefault(deviceId, 0L);
long elapsed = now - lastTime;
if (elapsed >= MIN_REQUEST_INTERVAL) {
// 满足间隔要求,立即执行
Runnable task = queue.poll();
if (task != null) {
deviceLastRequestTime.put(deviceId, now);
task.run();
// 继续处理队列中的下一个(如果有的话,需要延迟)
if (!queue.isEmpty()) {
scheduleNextRequest(deviceId);
}
}
} else {
// 需要延迟
scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL - elapsed);
}
}
private void scheduleNextRequest(Long deviceId) {
scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL);
}
private void scheduleNextRequest(Long deviceId, long delayMs) {
deviceDelayTimerActive.put(deviceId, true);
vertx.setTimer(delayMs, id -> {
deviceDelayTimerActive.put(deviceId, false);
Queue<Runnable> queue = deviceRequestQueues.get(deviceId);
if (CollUtil.isEmpty(queue)) {
Runnable task = queue.poll();
if (task != null) {
deviceLastRequestTime.put(deviceId, System.currentTimeMillis());
task.run();
}
// 继续处理
if (queue != null && !queue.isEmpty()) {
scheduleNextRequest(deviceId);
}
}
});
}
// ========== 轮询执行 ==========
/**
* 轮询单个点位
*/
private void pollPoint(IotModbusDeviceConfigRespDTO config, IotModbusPointRespDTO point) {
// 1.1 获取连接
IotModbusTcpConnectionManager.ModbusConnection connection = connectionManager.getConnection(config.getDeviceId());
if (connection == null) {
log.warn("[pollPoint][设备 {} 没有连接]", config.getDeviceId());
private void pollPoint(Long deviceId, Long pointId) {
// 1.1 从 configCache 获取最新配置
IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(deviceId);
if (config == null || CollUtil.isEmpty(config.getPoints())) {
log.warn("[pollPoint][设备 {} 没有配置]", deviceId);
return;
}
// 1.2 获取 slave ID
Integer slaveId = connectionManager.getSlaveId(config.getDeviceId());
if (slaveId == null) {
log.warn("[pollPoint][设备 {} 没有 slaveId]", config.getDeviceId());
// 1.2 查找点位
// TODO @AI是不是这里可以抽到 IotModbusUtils 里?感觉应该有几个地方需要的;
IotModbusPointRespDTO point = CollUtil.findOne(config.getPoints(), p -> p.getId().equals(pointId));
if (point == null) {
log.warn("[pollPoint][设备 {} 点位 {} 未找到]", deviceId, pointId);
return;
}
// 2. 执行 Modbus 读取
modbusClient.read(connection, slaveId, point)
// 2.1 获取连接
IotModbusTcpConnectionManager.ModbusConnection connection = connectionManager.getConnection(deviceId);
if (connection == null) {
log.warn("[pollPoint][设备 {} 没有连接]", deviceId);
return;
}
// 2.2 获取 slave ID
Integer slaveId = connectionManager.getSlaveId(deviceId);
if (slaveId == null) {
log.warn("[pollPoint][设备 {} 没有 slaveId]", deviceId);
return;
}
// 3. 执行 Modbus 读取
IotModbusTcpClientUtils.read(connection, slaveId, point)
.onSuccess(rawValue -> upstreamHandler.handleReadResult(config, point, rawValue))
.onFailure(e -> log.error("[pollPoint][读取点位失败, deviceId={}, identifier={}]",
config.getDeviceId(), point.getIdentifier(), e));
deviceId, point.getIdentifier(), e));
}
// ========== 停止 ==========
/**
* 停止设备的轮询
*/
@@ -166,6 +284,10 @@ public class IotModbusTcpPollScheduler {
for (PointTimerInfo timerInfo : timers.values()) {
vertx.cancelTimer(timerInfo.getTimerId());
}
// 清理请求队列
deviceRequestQueues.remove(deviceId);
deviceLastRequestTime.remove(deviceId);
deviceDelayTimerActive.remove(deviceId);
log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timers.size());
}

View File

@@ -41,4 +41,10 @@ public class IotModbusTcpSlaveConfig {
@NotNull(message = "请求清理间隔不能为空")
private Integer requestCleanupInterval = 10000;
// TODO @AI可以去掉这个开关因为本身就是模拟的稍后我自己也会手动或者让你去掉听我指令
/**
* 是否启用 Mock 测试数据(仅开发/测试环境使用,线上务必关闭)
*/
private Boolean mockEnabled = false;
}

View File

@@ -7,10 +7,10 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.downstream.IotModbusTcpSlaveDownstreamHandler;
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* IoT 网关 Modbus TCP Slave 协议
@@ -92,6 +93,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
private final IotModbusTcpSlaveUpstreamHandler upstreamHandler;
private final IotModbusTcpSlaveDownstreamSubscriber downstreamSubscriber;
private final IotModbusTcpSlavePollScheduler pollScheduler;
private final IotDeviceMessageService messageService;
public IotModbusTcpSlaveProtocol(ProtocolProperties properties) {
this.slaveConfig = properties.getModbusTcpSlave();
@@ -112,24 +114,27 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
this.frameDecoder = new IotModbusFrameDecoder(slaveConfig.getCustomFunctionCode());
this.frameEncoder = new IotModbusFrameEncoder(slaveConfig.getCustomFunctionCode());
// 初始化共享事务 ID 自增器PollScheduler 和 DownstreamHandler 共用,避免 transactionId 冲突)
AtomicInteger transactionIdCounter = new AtomicInteger(0);
// 初始化轮询调度器
this.pollScheduler = new IotModbusTcpSlavePollScheduler(
vertx, connectionManager, frameEncoder, pendingRequestManager,
slaveConfig.getRequestTimeout());
slaveConfig.getRequestTimeout(), transactionIdCounter, configCacheService);
// 初始化 Handler
IotModbusDataConverter dataConverter = new IotModbusDataConverter();
IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class);
this.messageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotDeviceMessageService messageService = this.messageService;
IotDeviceService deviceService = SpringUtil.getBean(IotDeviceService.class);
this.upstreamHandler = new IotModbusTcpSlaveUpstreamHandler(
deviceApi, messageService, dataConverter, frameEncoder,
deviceApi, messageService, frameEncoder,
connectionManager, configCacheService, pendingRequestManager,
pollScheduler, deviceService, serverId);
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotModbusTcpSlaveDownstreamHandler downstreamHandler = new IotModbusTcpSlaveDownstreamHandler(
connectionManager, configCacheService, dataConverter, frameEncoder);
connectionManager, configCacheService, frameEncoder, transactionIdCounter);
this.downstreamSubscriber = new IotModbusTcpSlaveDownstreamSubscriber(
this, downstreamHandler, messageBus);
}
@@ -284,6 +289,13 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
pollScheduler.stopPolling(info.getDeviceId());
pendingRequestManager.removeDevice(info.getDeviceId());
configCacheService.removeConfig(info.getDeviceId());
// 发送设备下线消息
try {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
messageService.sendDeviceMessage(offlineMessage, info.getProductKey(), info.getDeviceName(), serverId);
} catch (Exception ex) {
log.error("[handleConnection][发送设备下线消息失败, deviceId={}]", info.getDeviceId(), ex);
}
log.info("[handleConnection][连接关闭, deviceId={}, remoteAddress={}]",
info.getDeviceId(), socket.remoteAddress());
});

View File

@@ -8,9 +8,7 @@ import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* IoT Modbus 帧编码器
* <p>
* 负责将 Modbus 请求/响应编码为字节数组,支持 MODBUS_TCPMBAP和 MODBUS_RTUCRC16两种帧格式。
* IoT Modbus 帧编码器:负责将 Modbus 请求/响应编码为字节数组,支持 MODBUS_TCPMBAP和 MODBUS_RTUCRC16两种帧格式。
*
* @author 芋道源码
*/
@@ -30,11 +28,11 @@ public class IotModbusFrameEncoder {
* @param startAddress 起始寄存器地址
* @param quantity 寄存器数量
* @param format 帧格式
* @param transactionId 事务 IDTCP 模式下使用)
* @param transactionId 事务 IDTCP 模式下使用RTU 模式传 null
* @return 编码后的字节数组
*/
public byte[] encodeReadRequest(int slaveId, int functionCode, int startAddress, int quantity,
IotModbusFrameFormatEnum format, int transactionId) {
IotModbusFrameFormatEnum format, Integer transactionId) {
// PDU: [FC(1)] [StartAddress(2)] [Quantity(2)]
byte[] pdu = new byte[5];
pdu[0] = (byte) functionCode;
@@ -48,16 +46,23 @@ public class IotModbusFrameEncoder {
/**
* 编码写请求(单个寄存器 FC06 / 单个线圈 FC05
*
* DONE @AI【from codex】【高】FC05 写线圈时value 已转换为 Modbus 标准值非0 → 0xFF000 → 0x0000
* 新增 encodeWriteMultipleCoilsRequest 方法用于 FC15 编码(按 bit 打包)。
*
* @param slaveId 从站地址
* @param functionCode 功能码
* @param address 寄存器地址
* @param value 值
* @param format 帧格式
* @param transactionId 事务 ID
* @param transactionId 事务 IDTCP 模式下使用RTU 模式传 null
* @return 编码后的字节数组
*/
public byte[] encodeWriteSingleRequest(int slaveId, int functionCode, int address, int value,
IotModbusFrameFormatEnum format, int transactionId) {
IotModbusFrameFormatEnum format, Integer transactionId) {
// FC05 单写线圈Modbus 标准要求 value 为 0xFF00ON或 0x0000OFF
if (functionCode == IotModbusUtils.FC_WRITE_SINGLE_COIL) {
value = (value != 0) ? 0xFF00 : 0x0000;
}
// PDU: [FC(1)] [Address(2)] [Value(2)]
byte[] pdu = new byte[5];
pdu[0] = (byte) functionCode;
@@ -75,11 +80,11 @@ public class IotModbusFrameEncoder {
* @param address 起始地址
* @param values 值数组
* @param format 帧格式
* @param transactionId 事务 ID
* @param transactionId 事务 IDTCP 模式下使用RTU 模式传 null
* @return 编码后的字节数组
*/
public byte[] encodeWriteMultipleRegistersRequest(int slaveId, int address, int[] values,
IotModbusFrameFormatEnum format, int transactionId) {
IotModbusFrameFormatEnum format, Integer transactionId) {
// PDU: [FC(1)] [Address(2)] [Quantity(2)] [ByteCount(1)] [Values(N*2)]
int quantity = values.length;
int byteCount = quantity * 2;
@@ -97,17 +102,50 @@ public class IotModbusFrameEncoder {
return wrapFrame(slaveId, pdu, format, transactionId);
}
/**
* 编码写多个线圈请求FC15
* <p>
* 按 Modbus FC15 标准,线圈值按 bit 打包(每个 byte 包含 8 个线圈状态)。
*
* @param slaveId 从站地址
* @param address 起始地址
* @param values 线圈值数组int[]非0 表示 ON0 表示 OFF
* @param format 帧格式
* @param transactionId 事务 IDTCP 模式下使用RTU 模式传 null
* @return 编码后的字节数组
*/
public byte[] encodeWriteMultipleCoilsRequest(int slaveId, int address, int[] values,
IotModbusFrameFormatEnum format, Integer transactionId) {
// PDU: [FC(1)] [Address(2)] [Quantity(2)] [ByteCount(1)] [CoilValues(N)]
int quantity = values.length;
int byteCount = (quantity + 7) / 8; // 向上取整
byte[] pdu = new byte[6 + byteCount];
pdu[0] = (byte) IotModbusUtils.FC_WRITE_MULTIPLE_COILS; // FC15
pdu[1] = (byte) ((address >> 8) & 0xFF);
pdu[2] = (byte) (address & 0xFF);
pdu[3] = (byte) ((quantity >> 8) & 0xFF);
pdu[4] = (byte) (quantity & 0xFF);
pdu[5] = (byte) byteCount;
// 按 bit 打包:每个 byte 的 bit0 对应最低地址的线圈
for (int i = 0; i < quantity; i++) {
if (values[i] != 0) {
pdu[6 + i / 8] |= (byte) (1 << (i % 8));
}
}
return wrapFrame(slaveId, pdu, format, transactionId);
}
/**
* 编码自定义功能码帧(认证响应等)
*
* @param slaveId 从站地址
* @param jsonData JSON 数据
* @param format 帧格式
* @param transactionId 事务 ID
* @param transactionId 事务 IDTCP 模式下使用RTU 模式传 null
* @return 编码后的字节数组
*/
public byte[] encodeCustomFrame(int slaveId, String jsonData,
IotModbusFrameFormatEnum format, int transactionId) {
IotModbusFrameFormatEnum format, Integer transactionId) {
byte[] jsonBytes = jsonData.getBytes(StandardCharsets.UTF_8);
// PDU: [FC(1)] [ByteCount(1)] [JSON data(N)]
byte[] pdu = new byte[2 + jsonBytes.length];
@@ -125,12 +163,12 @@ public class IotModbusFrameEncoder {
* @param slaveId 从站地址
* @param pdu PDU 数据(含 functionCode
* @param format 帧格式
* @param transactionId 事务 IDTCP 模式下使用)
* @param transactionId 事务 IDTCP 模式下使用RTU 模式可为 null
* @return 完整帧字节数组
*/
private byte[] wrapFrame(int slaveId, byte[] pdu, IotModbusFrameFormatEnum format, int transactionId) {
private byte[] wrapFrame(int slaveId, byte[] pdu, IotModbusFrameFormatEnum format, Integer transactionId) {
if (format == IotModbusFrameFormatEnum.MODBUS_TCP) {
return wrapTcpFrame(slaveId, pdu, transactionId);
return wrapTcpFrame(slaveId, pdu, transactionId != null ? transactionId : 0);
} else {
return wrapRtuFrame(slaveId, pdu);
}

View File

@@ -7,13 +7,11 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@@ -28,19 +26,27 @@ import java.util.concurrent.atomic.AtomicInteger;
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotModbusTcpSlaveDownstreamHandler {
private final IotModbusTcpSlaveConnectionManager connectionManager;
private final IotModbusTcpSlaveConfigCacheService configCacheService;
private final IotModbusDataConverter dataConverter;
private final IotModbusFrameEncoder frameEncoder;
/**
* TCP 事务 ID 自增器
* TCP 事务 ID 自增器(与 PollScheduler 共享)
*/
private final AtomicInteger transactionIdCounter = new AtomicInteger(0);
private final AtomicInteger transactionIdCounter;
public IotModbusTcpSlaveDownstreamHandler(IotModbusTcpSlaveConnectionManager connectionManager,
IotModbusTcpSlaveConfigCacheService configCacheService,
IotModbusFrameEncoder frameEncoder,
AtomicInteger transactionIdCounter) {
this.connectionManager = connectionManager;
this.configCacheService = configCacheService;
this.frameEncoder = frameEncoder;
this.transactionIdCounter = transactionIdCounter;
}
/**
* 处理下行消息
@@ -98,12 +104,15 @@ public class IotModbusTcpSlaveDownstreamHandler {
private void writeProperty(Long deviceId, ConnectionInfo connInfo,
IotModbusPointRespDTO point, Object value) {
// 1.1 转换属性值为原始值
int[] rawValues = dataConverter.convertToRawValues(value, point);
int[] rawValues = IotModbusUtils.convertToRawValues(value, point);
// 1.2 确定帧格式和事务 ID
IotModbusFrameFormatEnum frameFormat = connInfo.getFrameFormat();
Assert.notNull(frameFormat, "连接帧格式不能为空");
int transactionId = transactionIdCounter.incrementAndGet() & 0xFFFF;
// TODO @AI【from 芋艿】需要和按照 deviceId 进行自增么???
Integer transactionId = frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP
? (transactionIdCounter.incrementAndGet() & 0xFFFF)
: null;
int slaveId = connInfo.getSlaveId() != null ? connInfo.getSlaveId() : 1;
// 1.3 编码写请求
byte[] data;

View File

@@ -19,7 +19,6 @@ import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusDataConverter;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder;
@@ -56,7 +55,6 @@ public class IotModbusTcpSlaveUpstreamHandler {
private final IotDeviceCommonApi deviceApi;
private final IotDeviceMessageService messageService;
private final IotModbusDataConverter dataConverter;
private final IotModbusFrameEncoder frameEncoder;
private final IotModbusTcpSlaveConnectionManager connectionManager;
private final IotModbusTcpSlaveConfigCacheService configCacheService;
@@ -67,7 +65,6 @@ public class IotModbusTcpSlaveUpstreamHandler {
public IotModbusTcpSlaveUpstreamHandler(IotDeviceCommonApi deviceApi,
IotDeviceMessageService messageService,
IotModbusDataConverter dataConverter,
IotModbusFrameEncoder frameEncoder,
IotModbusTcpSlaveConnectionManager connectionManager,
IotModbusTcpSlaveConfigCacheService configCacheService,
@@ -77,7 +74,6 @@ public class IotModbusTcpSlaveUpstreamHandler {
String serverId) {
this.deviceApi = deviceApi;
this.messageService = messageService;
this.dataConverter = dataConverter;
this.frameEncoder = frameEncoder;
this.connectionManager = connectionManager;
this.configCacheService = configCacheService;
@@ -175,6 +171,7 @@ public class IotModbusTcpSlaveUpstreamHandler {
// 2.3 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
Assert.notNull(device, "设备不存在");
// TODO @AI2.4 必须找到连接配置;
// 3.1 注册连接
ConnectionInfo connectionInfo = new ConnectionInfo()
@@ -213,7 +210,7 @@ public class IotModbusTcpSlaveUpstreamHandler {
.put("message", message)
.build();
byte[] data = frameEncoder.encodeCustomFrame(frame.getSlaveId(), JsonUtils.toJsonString(response),
frameFormat, frame.getTransactionId() != null ? frame.getTransactionId() : 0);
frameFormat, frame.getTransactionId());
connectionManager.sendToSocket(socket, data);
}
@@ -258,7 +255,7 @@ public class IotModbusTcpSlaveUpstreamHandler {
}
// 3.1 点位翻译
Object convertedValue = dataConverter.convertToPropertyValue(rawValues, point);
Object convertedValue = IotModbusUtils.convertToPropertyValue(rawValues, point);
// 3.2 上报属性
Map<String, Object> params = MapUtil.of(request.getIdentifier(), convertedValue);
IotDeviceMessage message = IotDeviceMessage.requestOf(

View File

@@ -66,12 +66,15 @@ public class IotModbusTcpSlaveConfigCacheService {
log.error("[loadDeviceConfig][从远程获取配置失败, deviceId={}]", deviceId, e);
}
// 2. 远程未找到,尝试 Mock 数据
// 2. 远程未找到,尝试 Mock 数据(仅 mockEnabled=true 时)
// DONE @AI【from codex】【中】Mock 数据已通过 mockEnabled 配置开关控制,线上环境不会污染真实配置。
// TODO @芋艿:测试完成后移除
for (IotModbusDeviceConfigRespDTO mockConfig : buildMockConfigs()) {
configCache.put(mockConfig.getDeviceId(), mockConfig);
if (mockConfig.getDeviceId().equals(deviceId)) {
return mockConfig;
if (true) {
for (IotModbusDeviceConfigRespDTO mockConfig : buildMockConfigs()) {
configCache.put(mockConfig.getDeviceId(), mockConfig);
if (mockConfig.getDeviceId().equals(deviceId)) {
return mockConfig;
}
}
}
@@ -104,9 +107,11 @@ public class IotModbusTcpSlaveConfigCacheService {
allConfigs = new ArrayList<>();
}
// 2. 追加 Mock 测试数据
// 2. 追加 Mock 测试数据(仅 mockEnabled=true 时)
// TODO @芋艿:测试完成后移除
allConfigs.addAll(buildMockConfigs());
if (true) {
allConfigs.addAll(buildMockConfigs());
}
// 3. 只保留已连接设备的配置,更新缓存
List<IotModbusDeviceConfigRespDTO> connectedConfigs = new ArrayList<>();

View File

@@ -66,6 +66,21 @@ public class IotModbusTcpSlaveConnectionManager {
* 注册已认证的连接
*/
public void registerConnection(NetSocket socket, ConnectionInfo info) {
// 先检查该设备是否有旧连接,若有且不是同一个 socket关闭旧 socket
NetSocket oldSocket = deviceSocketMap.get(info.getDeviceId());
if (oldSocket != null && oldSocket != socket) {
log.info("[registerConnection][设备 {} 存在旧连接, 关闭旧 socket, oldRemote={}, newRemote={}]",
info.getDeviceId(), oldSocket.remoteAddress(), socket.remoteAddress());
connectionMap.remove(oldSocket);
try {
oldSocket.close();
} catch (Exception e) {
// TODO @AI这里日志可以打的更完整一点方便追溯比如设备 ID、旧连接地址等
log.warn("[registerConnection][关闭旧 socket 失败]", e);
}
}
// 注册新连接
connectionMap.put(socket, info);
deviceSocketMap.put(info.getDeviceId(), socket);
log.info("[registerConnection][设备 {} 连接已注册, remoteAddress={}]",
@@ -100,8 +115,14 @@ public class IotModbusTcpSlaveConnectionManager {
public ConnectionInfo removeConnection(NetSocket socket) {
ConnectionInfo info = connectionMap.remove(socket);
if (info != null && info.getDeviceId() != null) {
deviceSocketMap.remove(info.getDeviceId());
log.info("[removeConnection][设备 {} 连接已移除]", info.getDeviceId());
// 使用两参数 remove只有当 deviceSocketMap 中对应的 socket 就是当前 socket 时才删除,
// 避免新 socket 已注册后旧 socket 关闭时误删新映射
boolean removed = deviceSocketMap.remove(info.getDeviceId(), socket);
if (removed) {
log.info("[removeConnection][设备 {} 连接已移除]", info.getDeviceId());
} else {
log.info("[removeConnection][设备 {} 旧连接关闭, 新连接仍在线, 跳过清理]", info.getDeviceId());
}
}
return info;
}

View File

@@ -118,13 +118,13 @@ public class IotModbusTcpSlavePendingRequestManager {
for (Map.Entry<Long, Deque<PendingRequest>> entry : pendingRequests.entrySet()) {
Deque<PendingRequest> queue = entry.getValue();
int removed = 0;
while (!queue.isEmpty()) {
PendingRequest req = queue.peekFirst();
if (req == null || req.getExpireAt() >= now) {
break; // 队列有序,后面的没过期
Iterator<PendingRequest> it = queue.iterator();
while (it.hasNext()) {
PendingRequest req = it.next();
if (req.getExpireAt() < now) {
it.remove();
removed++;
}
queue.pollFirst();
removed++;
}
if (removed > 0) {
log.debug("[cleanupExpired][设备 {} 清理了 {} 个过期请求]", entry.getKey(), removed);

View File

@@ -10,11 +10,11 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotM
import io.vertx.core.Vertx;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
@@ -27,10 +27,12 @@ import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.
* 1. 编码 Modbus 读请求帧
* 2. 通过 ConnectionManager 发送到设备的 TCP 连接
* 3. 将请求注册到 PendingRequestManager等待设备响应
* <p>
* 闭包只捕获 deviceId + pointId运行时从 configCacheService 获取最新 config 和 point
* 避免闭包捕获旧快照导致上报消息用旧身份的问题。
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotModbusTcpSlavePollScheduler {
@@ -38,7 +40,17 @@ public class IotModbusTcpSlavePollScheduler {
private final IotModbusTcpSlaveConnectionManager connectionManager;
private final IotModbusFrameEncoder frameEncoder;
private final IotModbusTcpSlavePendingRequestManager pendingRequestManager;
private final IotModbusTcpSlaveConfigCacheService configCacheService;
private final int requestTimeout;
/**
* TCP 事务 ID 自增器(与 DownstreamHandler 共享)
*/
private final AtomicInteger transactionIdCounter;
/**
* 同设备最小请求间隔(毫秒),防止 Modbus 设备性能不足时请求堆积
*/
private static final long MIN_REQUEST_INTERVAL = 200;
/**
* 设备点位的定时器映射deviceId -> (pointId -> PointTimerInfo)
@@ -46,9 +58,33 @@ public class IotModbusTcpSlavePollScheduler {
private final Map<Long, Map<Long, PointTimerInfo>> devicePointTimers = new ConcurrentHashMap<>();
/**
* TCP 事务 ID 自增器
* per-device 请求队列deviceId -> 待执行请求队列
*/
private final AtomicInteger transactionIdCounter = new AtomicInteger(0);
private final Map<Long, Queue<Runnable>> deviceRequestQueues = new ConcurrentHashMap<>();
/**
* per-device 上次请求时间戳deviceId -> lastRequestTimeMs
*/
private final Map<Long, Long> deviceLastRequestTime = new ConcurrentHashMap<>();
/**
* per-device 延迟 timer 标记deviceId -> 是否有延迟 timer 在等待
*/
private final Map<Long, Boolean> deviceDelayTimerActive = new ConcurrentHashMap<>();
public IotModbusTcpSlavePollScheduler(Vertx vertx,
IotModbusTcpSlaveConnectionManager connectionManager,
IotModbusFrameEncoder frameEncoder,
IotModbusTcpSlavePendingRequestManager pendingRequestManager,
int requestTimeout,
AtomicInteger transactionIdCounter,
IotModbusTcpSlaveConfigCacheService configCacheService) {
this.vertx = vertx;
this.connectionManager = connectionManager;
this.frameEncoder = frameEncoder;
this.pendingRequestManager = pendingRequestManager;
this.requestTimeout = requestTimeout;
this.transactionIdCounter = transactionIdCounter;
this.configCacheService = configCacheService;
}
/**
* 点位定时器信息
@@ -68,8 +104,15 @@ public class IotModbusTcpSlavePollScheduler {
}
// ========== 轮询管理 ==========
/**
* 更新轮询任务(增量更新)
*
* 1. 【删除】点位:停止对应的轮询定时器
* 2. 【新增】点位:创建对应的轮询定时器
* 3. 【修改】点位pollInterval 变化,重建对应的轮询定时器
* 4. 其他属性变化不需要重建定时器pollPoint 运行时从 configCache 取最新 point
*/
public void updatePolling(IotModbusDeviceConfigRespDTO config) {
Long deviceId = config.getDeviceId();
@@ -101,7 +144,7 @@ public class IotModbusTcpSlavePollScheduler {
PointTimerInfo existingTimer = currentTimers.get(pointId);
// 3.1 新增点位:创建定时器
if (existingTimer == null) {
Long timerId = createPollTimer(config, point);
Long timerId = createPollTimer(deviceId, pointId, newPollInterval);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已创建, interval={}ms]",
@@ -110,7 +153,7 @@ public class IotModbusTcpSlavePollScheduler {
} else if (!Objects.equals(existingTimer.getPollInterval(), newPollInterval)) {
// 3.2 pollInterval 变化:重建定时器
vertx.cancelTimer(existingTimer.getTimerId());
Long timerId = createPollTimer(config, point);
Long timerId = createPollTimer(deviceId, pointId, newPollInterval);
if (timerId != null) {
currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval));
log.debug("[updatePolling][设备 {} 点位 {} 定时器已更新, interval={}ms -> {}ms]",
@@ -119,54 +162,144 @@ public class IotModbusTcpSlavePollScheduler {
currentTimers.remove(pointId);
}
}
// 3.3 其他属性变化:不处理(下次轮询时自动使用新配置
// 3.3 其他属性变化:无需重建定时器,因为 pollPoint() 运行时从 configCache 获取最新 point自动使用新配置
}
}
/**
* 创建轮询定时器
* <p>
* 闭包只捕获 deviceId 和 pointId运行时从 configCache 获取最新配置,避免旧快照问题。
*/
private Long createPollTimer(IotModbusDeviceConfigRespDTO config, IotModbusPointRespDTO point) {
if (point.getPollInterval() == null || point.getPollInterval() <= 0) {
private Long createPollTimer(Long deviceId, Long pointId, Integer pollInterval) {
if (pollInterval == null || pollInterval <= 0) {
return null;
}
return vertx.setPeriodic(point.getPollInterval(), timerId -> {
return vertx.setPeriodic(pollInterval, timerId -> {
try {
pollPoint(config, point);
submitPollRequest(deviceId, pointId);
} catch (Exception e) {
log.error("[createPollTimer][轮询点位失败, deviceId={}, identifier={}]",
config.getDeviceId(), point.getIdentifier(), e);
log.error("[createPollTimer][轮询点位失败, deviceId={}, pointId={}]", deviceId, pointId, e);
}
});
}
// ========== 请求队列per-device 限速) ==========
/**
* 轮询单个点位:编码读请求帧 → 发送 → 注册 PendingRequest
* 提交轮询请求到设备请求队列(保证同设备请求间隔)
*/
private void pollPoint(IotModbusDeviceConfigRespDTO config, IotModbusPointRespDTO point) {
Long deviceId = config.getDeviceId();
// 1. 获取连接信息
private void submitPollRequest(Long deviceId, Long pointId) {
// 1. 将请求添加到设备的请求队列
Queue<Runnable> queue = deviceRequestQueues.computeIfAbsent(deviceId, k -> new ConcurrentLinkedQueue<>());
queue.offer(() -> pollPoint(deviceId, pointId));
// 2. 处理设备请求队列(如果没有延迟 timer 在等待)
processDeviceQueue(deviceId);
}
/**
* 处理设备请求队列
*/
private void processDeviceQueue(Long deviceId) {
Queue<Runnable> queue = deviceRequestQueues.get(deviceId);
if (CollUtil.isEmpty(queue)) {
return;
}
// 检查是否已有延迟 timer 在等待
if (Boolean.TRUE.equals(deviceDelayTimerActive.get(deviceId))) {
return;
}
// TODO @AI可以改成不满足间隔然后 return简化括号层级
long now = System.currentTimeMillis();
long lastTime = deviceLastRequestTime.getOrDefault(deviceId, 0L);
long elapsed = now - lastTime;
if (elapsed >= MIN_REQUEST_INTERVAL) {
// 满足间隔要求,立即执行
Runnable task = queue.poll();
if (task != null) {
deviceLastRequestTime.put(deviceId, now);
task.run();
// 继续处理队列中的下一个(如果有的话,需要延迟)
if (!queue.isEmpty()) {
scheduleNextRequest(deviceId);
}
}
} else {
// 需要延迟
scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL - elapsed);
}
}
private void scheduleNextRequest(Long deviceId) {
scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL);
}
private void scheduleNextRequest(Long deviceId, long delayMs) {
deviceDelayTimerActive.put(deviceId, true);
vertx.setTimer(delayMs, id -> {
deviceDelayTimerActive.put(deviceId, false);
Queue<Runnable> queue = deviceRequestQueues.get(deviceId);
// TODO @AIif return简化下
if (CollUtil.isEmpty(queue)) {
Runnable task = queue.poll();
if (task != null) {
deviceLastRequestTime.put(deviceId, System.currentTimeMillis());
task.run();
}
// 继续处理
if (queue != null && !queue.isEmpty()) {
scheduleNextRequest(deviceId);
}
}
});
}
// ========== 轮询执行 ==========
/**
* 轮询单个点位
* <p>
* 运行时从 configCacheService 获取最新的 config 和 point而非使用闭包捕获的旧引用。
*/
private void pollPoint(Long deviceId, Long pointId) {
// 1.1 从 configCache 获取最新配置
IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(deviceId);
if (config == null || CollUtil.isEmpty(config.getPoints())) {
log.warn("[pollPoint][设备 {} 没有配置]", deviceId);
return;
}
// 1.2 查找点位
IotModbusPointRespDTO point = CollUtil.findOne(config.getPoints(), p -> p.getId().equals(pointId));
if (point == null) {
log.warn("[pollPoint][设备 {} 点位 {} 未找到]", deviceId, pointId);
return;
}
// 2. 获取连接信息
ConnectionInfo connInfo = connectionManager.getConnectionInfoByDeviceId(deviceId);
if (connInfo == null) {
log.debug("[pollPoint][设备 {} 没有连接,跳过轮询]", deviceId);
return;
}
// 2.1 确定帧格式和事务 ID
// 3.1 确定帧格式和事务 ID
IotModbusFrameFormatEnum frameFormat = connInfo.getFrameFormat();
if (frameFormat == null) {
log.warn("[pollPoint][设备 {} 帧格式为空,跳过轮询]", deviceId);
return;
}
// TODO @AI是不是得按照设备递增
Integer transactionId = frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP
? (transactionIdCounter.incrementAndGet() & 0xFFFF)
: null;
// TODO @AI这里断言必须非空
int slaveId = connInfo.getSlaveId() != null ? connInfo.getSlaveId() : 1;
// 2.2 编码读请求
// 3.2 编码读请求
byte[] data = frameEncoder.encodeReadRequest(slaveId, point.getFunctionCode(),
point.getRegisterAddress(), point.getRegisterCount(), frameFormat, transactionId);
// 2.3 注册 PendingRequest
// 3.3 注册 PendingRequest
PendingRequest pendingRequest = new PendingRequest(
deviceId, point.getId(), point.getIdentifier(),
slaveId, point.getFunctionCode(),
@@ -175,13 +308,15 @@ public class IotModbusTcpSlavePollScheduler {
System.currentTimeMillis() + requestTimeout);
pendingRequestManager.addRequest(pendingRequest);
// 3. 发送读请求
// 4. 发送读请求
connectionManager.sendToDevice(deviceId, data);
log.debug("[pollPoint][设备={}, 点位={}, FC={}, 地址={}, 数量={}]",
deviceId, point.getIdentifier(), point.getFunctionCode(),
point.getRegisterAddress(), point.getRegisterCount());
}
// ========== 停止 ==========
/**
* 停止设备的轮询
*/
@@ -193,6 +328,10 @@ public class IotModbusTcpSlavePollScheduler {
for (PointTimerInfo timerInfo : timers.values()) {
vertx.cancelTimer(timerInfo.getTimerId());
}
// 清理请求队列
deviceRequestQueues.remove(deviceId);
deviceLastRequestTime.remove(deviceId);
deviceDelayTimerActive.remove(deviceId);
log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timers.size());
}

View File

@@ -25,6 +25,7 @@ import java.nio.ByteOrder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
// TODO @芋艿:【晚点改】单测需要简化!
/**
* IoT Modbus TCP Slave 协议集成测试 — MODBUS_TCP 帧格式(手动测试)
*
@@ -123,6 +124,7 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
// 1. 先认证
IotModbusFrame authResponse = authenticate(socket);
log.info("[testPollingResponse][认证响应: {}]", authResponse.getCustomData());
// TODO @AI这里断言下认证必须成功
// 2. 设置持续监听:每收到一个读请求,自动回复
log.info("[testPollingResponse][开始持续监听网关下发的读请求...]");