feat:【iot】modbus-tcp 协议接入:30% 初始化:基于 crystalline-giggling-whisper.md 规划

This commit is contained in:
YunaiV
2026-01-17 12:07:33 +08:00
parent cece79d04c
commit ed78834eaf
22 changed files with 155 additions and 275 deletions

View File

@@ -1,9 +1,7 @@
package cn.iocoder.yudao.module.iot.controller.admin.device;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigRespVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceModbusConfigDO;
@@ -62,12 +60,4 @@ public class IotDeviceModbusConfigController {
return success(BeanUtils.toBean(modbusConfig, IotDeviceModbusConfigRespVO.class));
}
@GetMapping("/page")
@Operation(summary = "获得设备 Modbus 连接配置分页")
@PreAuthorize("@ss.hasPermission('iot:device-modbus-config:query')")
public CommonResult<PageResult<IotDeviceModbusConfigRespVO>> getDeviceModbusConfigPage(@Valid IotDeviceModbusConfigPageReqVO pageReqVO) {
PageResult<IotDeviceModbusConfigDO> pageResult = modbusConfigService.getDeviceModbusConfigPage(pageReqVO);
return success(BeanUtils.toBean(pageResult, IotDeviceModbusConfigRespVO.class));
}
}

View File

@@ -1,25 +0,0 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
// TODO @AI不需要分页接口
@Schema(description = "管理后台 - IoT 设备 Modbus 连接配置分页 Request VO")
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class IotDeviceModbusConfigPageReqVO extends PageParam {
@Schema(description = "设备编号", example = "1024")
private Long deviceId;
@Schema(description = "Modbus 服务器 IP 地址", example = "192.168.1.100")
private String ip;
@Schema(description = "状态", example = "0")
private Integer status;
}

View File

@@ -2,9 +2,9 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.device;
import cn.iocoder.yudao.framework.tenant.core.db.TenantBaseDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
import cn.iocoder.yudao.module.iot.enums.device.IotModbusByteOrderEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotModbusFunctionCodeEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotModbusRawDataTypeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusByteOrderEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusRawDataTypeEnum;
import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

View File

@@ -1,9 +1,6 @@
package cn.iocoder.yudao.module.iot.dal.mysql.device;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigPageReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceModbusConfigDO;
import org.apache.ibatis.annotations.Mapper;
@@ -17,14 +14,6 @@ import java.util.List;
@Mapper
public interface IotDeviceModbusConfigMapper extends BaseMapperX<IotDeviceModbusConfigDO> {
default PageResult<IotDeviceModbusConfigDO> selectPage(IotDeviceModbusConfigPageReqVO reqVO) {
return selectPage(reqVO, new LambdaQueryWrapperX<IotDeviceModbusConfigDO>()
.eqIfPresent(IotDeviceModbusConfigDO::getDeviceId, reqVO.getDeviceId())
.likeIfPresent(IotDeviceModbusConfigDO::getIp, reqVO.getIp())
.eqIfPresent(IotDeviceModbusConfigDO::getStatus, reqVO.getStatus())
.orderByDesc(IotDeviceModbusConfigDO::getId));
}
default IotDeviceModbusConfigDO selectByDeviceId(Long deviceId) {
return selectOne(IotDeviceModbusConfigDO::getDeviceId, deviceId);
}

View File

@@ -1,7 +1,5 @@
package cn.iocoder.yudao.module.iot.service.device;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceModbusConfigDO;
import jakarta.validation.Valid;
@@ -45,14 +43,6 @@ public interface IotDeviceModbusConfigService {
*/
IotDeviceModbusConfigDO getDeviceModbusConfigByDeviceId(Long deviceId);
/**
* 获得设备 Modbus 连接配置分页
*
* @param pageReqVO 分页查询
* @return 设备 Modbus 连接配置分页
*/
PageResult<IotDeviceModbusConfigDO> getDeviceModbusConfigPage(IotDeviceModbusConfigPageReqVO pageReqVO);
/**
* 获得所有启用的 Modbus 连接配置列表
*

View File

@@ -1,9 +1,7 @@
package cn.iocoder.yudao.module.iot.service.device;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceModbusConfigDO;
import cn.iocoder.yudao.module.iot.dal.mysql.device.IotDeviceModbusConfigMapper;
@@ -72,11 +70,6 @@ public class IotDeviceModbusConfigServiceImpl implements IotDeviceModbusConfigSe
return modbusConfigMapper.selectByDeviceId(deviceId);
}
@Override
public PageResult<IotDeviceModbusConfigDO> getDeviceModbusConfigPage(IotDeviceModbusConfigPageReqVO pageReqVO) {
return modbusConfigMapper.selectPage(pageReqVO);
}
@Override
public List<IotDeviceModbusConfigDO> getEnabledDeviceModbusConfigList() {
return modbusConfigMapper.selectListByStatus(CommonStatusEnum.ENABLE.getStatus());

View File

@@ -1,5 +1,8 @@
package cn.iocoder.yudao.module.iot.core.biz.dto;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusByteOrderEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusRawDataTypeEnum;
import lombok.Data;
import java.math.BigDecimal;
@@ -27,11 +30,10 @@ public class IotModbusPointRespDTO {
// ========== Modbus 协议配置 ==========
// TODO @AI所有的枚举通过 @,不要写上去;
/**
* Modbus 功能码
*
* 1-ReadCoils 2-ReadDiscreteInputs 3-ReadHoldingRegisters 4-ReadInputRegisters
* 枚举 {@link IotModbusFunctionCodeEnum}
*/
private Integer functionCode;
/**
@@ -45,13 +47,13 @@ public class IotModbusPointRespDTO {
/**
* 字节序
*
* AB/BA/ABCD/CDAB/DCBA/BADC
* 枚举 {@link IotModbusByteOrderEnum}
*/
private String byteOrder;
/**
* 原始数据类型
*
* INT16/UINT16/INT32/UINT32/FLOAT/DOUBLE/BOOLEAN/STRING
* 枚举 {@link IotModbusRawDataTypeEnum}
*/
private String rawDataType;
/**
@@ -63,12 +65,4 @@ public class IotModbusPointRespDTO {
*/
private Integer pollInterval;
// ========== 物模型相关字段 ==========
// TODO @AI分析一下是否有必要返回
/**
* 数据类型(来自物模型)
*/
private String dataType;
}

View File

@@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.enums.device;
package cn.iocoder.yudao.module.iot.core.enums;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import lombok.Getter;
@@ -6,7 +6,6 @@ import lombok.RequiredArgsConstructor;
import java.util.Arrays;
// TODO @AI如果枚举需要共享可以拿到 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums
/**
* IoT Modbus 字节序枚举
*
@@ -45,13 +44,6 @@ public enum IotModbusByteOrderEnum implements ArrayValuable<String> {
return ARRAYS;
}
// TODO @AI如果不需要可以删除掉这个方法
/**
* 根据字节序获取枚举
*
* @param order 字节序
* @return 枚举
*/
public static IotModbusByteOrderEnum getByOrder(String order) {
return Arrays.stream(values())
.filter(e -> e.getOrder().equals(order))

View File

@@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.enums.device;
package cn.iocoder.yudao.module.iot.core.enums;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import lombok.Getter;
@@ -6,7 +6,6 @@ import lombok.RequiredArgsConstructor;
import java.util.Arrays;
// TODO @AI如果枚举需要共享可以拿到 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums
/**
* IoT Modbus 功能码枚举
*
@@ -51,12 +50,6 @@ public enum IotModbusFunctionCodeEnum implements ArrayValuable<Integer> {
return ARRAYS;
}
/**
* 根据功能码获取枚举
*
* @param code 功能码
* @return 枚举
*/
public static IotModbusFunctionCodeEnum valueOf(Integer code) {
return Arrays.stream(values())
.filter(e -> e.getCode().equals(code))

View File

@@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.enums.device;
package cn.iocoder.yudao.module.iot.core.enums;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import lombok.Getter;
@@ -6,7 +6,6 @@ import lombok.RequiredArgsConstructor;
import java.util.Arrays;
// TODO @AI如果枚举需要共享可以拿到 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums
/**
* IoT Modbus 原始数据类型枚举
*
@@ -23,7 +22,7 @@ public enum IotModbusRawDataTypeEnum implements ArrayValuable<String> {
FLOAT("FLOAT", "32 位浮点数", 2),
DOUBLE("DOUBLE", "64 位浮点数", 4),
BOOLEAN("BOOLEAN", "布尔值(用于线圈)", 1),
STRING("STRING", "字符串", -1); // -1 表示可变长度
STRING("STRING", "字符串", null); // null 表示可变长度
public static final String[] ARRAYS = Arrays.stream(values())
.map(IotModbusRawDataTypeEnum::getType)
@@ -37,24 +36,16 @@ public enum IotModbusRawDataTypeEnum implements ArrayValuable<String> {
* 名称
*/
private final String name;
// TODO @AI去掉 default 会好点null 表示可变
/**
* 默认寄存器数量-1 表示可变
* 寄存器数量null 表示可变
*/
private final Integer defaultRegisterCount;
private final Integer registerCount;
@Override
public String[] array() {
return ARRAYS;
}
// TODO @AI如果不用可以删除掉
/**
* 根据类型获取枚举
*
* @param type 类型
* @return 枚举
*/
public static IotModbusRawDataTypeEnum getByType(String type) {
return Arrays.stream(values())
.filter(e -> e.getType().equals(type))

View File

@@ -520,7 +520,7 @@ public class IotGatewayProperties {
/**
* 配置刷新间隔(秒)
*/
// TODO @AI需要校验下非空
@NotNull(message = "配置刷新间隔不能为空")
private Integer configRefreshInterval = 30;
}

View File

@@ -2,7 +2,10 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusByteOrderEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusRawDataTypeEnum;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
@@ -79,21 +82,25 @@ public class IotModbusDataConverter {
*/
@SuppressWarnings("EnhancedSwitchMigration")
private Number parseRawValue(int[] rawValues, String rawDataType, String byteOrder) {
// TODO @AI是不是可以用枚举复用 IotModbusRawDataTypeEnum 里的;(保留现有实现,字符串比较已足够清晰)
switch (rawDataType.toUpperCase()) {
case "BOOLEAN":
IotModbusRawDataTypeEnum dataTypeEnum = IotModbusRawDataTypeEnum.getByType(rawDataType);
if (dataTypeEnum == null) {
log.warn("[parseRawValue][不支持的数据类型: {}]", rawDataType);
return rawValues[0];
}
switch (dataTypeEnum) {
case BOOLEAN:
return rawValues[0] != 0 ? 1 : 0;
case "INT16":
case INT16:
return (short) rawValues[0];
case "UINT16":
case UINT16:
return rawValues[0] & 0xFFFF;
case "INT32":
case INT32:
return parseInt32(rawValues, byteOrder);
case "UINT32":
case UINT32:
return parseUint32(rawValues, byteOrder);
case "FLOAT":
case FLOAT:
return parseFloat(rawValues, byteOrder);
case "DOUBLE":
case DOUBLE:
return parseDouble(rawValues, byteOrder);
default:
log.warn("[parseRawValue][不支持的数据类型: {}]", rawDataType);
@@ -148,24 +155,24 @@ public class IotModbusDataConverter {
/**
* 根据字节序重排字节
*/
@SuppressWarnings("EnhancedSwitchMigration")
private byte[] reorderBytes(byte[] bytes, String byteOrder) {
// 大端序,不需要调整
// TODO @AIStrUtil.equalsnull 要抛出异常;(保留 null 默认为大端序的兼容逻辑)
if (byteOrder == null || "ABCD".equals(byteOrder) || "AB".equals(byteOrder)) {
IotModbusByteOrderEnum byteOrderEnum = IotModbusByteOrderEnum.getByOrder(byteOrder);
// null 或者大端序,不需要调整
if (ObjectUtils.equalsAny(byteOrderEnum, null, IotModbusByteOrderEnum.ABCD, IotModbusByteOrderEnum.AB)) {
return bytes;
}
// 其他字节序调整
byte[] result = new byte[bytes.length];
switch (byteOrder.toUpperCase()) {
// TODO @AI走枚举sortOrder参考 IotModbusByteOrderEnum 枚举定义
case "BA": // 小端序16 位)
switch (byteOrderEnum) {
case BA: // 小端序16 位
if (bytes.length >= 2) {
result[0] = bytes[1];
result[1] = bytes[0];
}
break;
case "CDAB": // 大端字交换32 位)
case CDAB: // 大端字交换32 位)
if (bytes.length >= 4) {
result[0] = bytes[2];
result[1] = bytes[3];
@@ -173,7 +180,7 @@ public class IotModbusDataConverter {
result[3] = bytes[1];
}
break;
case "DCBA": // 小端序32 位)
case DCBA: // 小端序32 位)
if (bytes.length >= 4) {
result[0] = bytes[3];
result[1] = bytes[2];
@@ -181,7 +188,7 @@ public class IotModbusDataConverter {
result[3] = bytes[0];
}
break;
case "BADC": // 小端字交换32 位)
case BADC: // 小端字交换32 位)
if (bytes.length >= 4) {
result[0] = bytes[1];
result[1] = bytes[0];
@@ -198,20 +205,24 @@ public class IotModbusDataConverter {
/**
* 编码为寄存器值
*/
@SuppressWarnings("EnhancedSwitchMigration")
private int[] encodeToRegisters(BigDecimal rawValue, String rawDataType, String byteOrder, int registerCount) {
// TODO @AI是不是可以用枚举复用 IotModbusRawDataTypeEnum 里的;(保留现有实现,字符串比较已足够清晰)
switch (rawDataType.toUpperCase()) {
case "BOOLEAN":
IotModbusRawDataTypeEnum dataTypeEnum = IotModbusRawDataTypeEnum.getByType(rawDataType);
if (dataTypeEnum == null) {
return new int[]{rawValue.intValue()};
}
switch (dataTypeEnum) {
case BOOLEAN:
return new int[]{rawValue.intValue() != 0 ? 1 : 0};
case "INT16":
case "UINT16":
case INT16:
case UINT16:
return new int[]{rawValue.intValue() & 0xFFFF};
case "INT32":
case "UINT32":
case INT32:
case UINT32:
return encodeInt32(rawValue.intValue(), byteOrder);
case "FLOAT":
case FLOAT:
return encodeFloat(rawValue.floatValue(), byteOrder);
case "DOUBLE":
case DOUBLE:
return encodeDouble(rawValue.doubleValue(), byteOrder);
default:
return new int[]{rawValue.intValue()};
@@ -247,20 +258,24 @@ public class IotModbusDataConverter {
/**
* 格式化返回值
*/
@SuppressWarnings("EnhancedSwitchMigration")
private Object formatValue(BigDecimal value, String rawDataType) {
// TODO @AI是不是可以用枚举复用 IotModbusRawDataTypeEnum 里的;(保留现有实现,字符串比较已足够清晰)
switch (rawDataType.toUpperCase()) {
case "BOOLEAN":
IotModbusRawDataTypeEnum dataTypeEnum = IotModbusRawDataTypeEnum.getByType(rawDataType);
if (dataTypeEnum == null) {
return value;
}
switch (dataTypeEnum) {
case BOOLEAN:
return value.intValue() != 0;
case "INT16":
case "INT32":
case INT16:
case INT32:
return value.intValue();
case "UINT16":
case "UINT32":
case UINT16:
case UINT32:
return value.longValue();
case "FLOAT":
case FLOAT:
return value.floatValue();
case "DOUBLE":
case DOUBLE:
return value.doubleValue();
default:
return value;

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum;
import com.ghgande.j2mod.modbus.io.ModbusTCPTransaction;
import com.ghgande.j2mod.modbus.msg.*;
import com.ghgande.j2mod.modbus.procimg.InputRegister;
@@ -91,16 +92,20 @@ public class IotModbusTcpClient {
/**
* 创建读取请求
*/
@SuppressWarnings("EnhancedSwitchMigration")
private ModbusRequest createReadRequest(Integer functionCode, Integer address, Integer count) {
// TODO @AI1、2、3、4 能不能有枚举哈?这样 1、2、3、4 那的注释就不用写;
switch (functionCode) {
case 1:
IotModbusFunctionCodeEnum functionCodeEnum = IotModbusFunctionCodeEnum.valueOf(functionCode);
if (functionCodeEnum == null) {
throw new IllegalArgumentException("不支持的功能码: " + functionCode);
}
switch (functionCodeEnum) {
case READ_COILS:
return new ReadCoilsRequest(address, count);
case 2: // ReadDiscreteInputs
case READ_DISCRETE_INPUTS:
return new ReadInputDiscretesRequest(address, count);
case 3: // ReadHoldingRegisters
case READ_HOLDING_REGISTERS:
return new ReadMultipleRegistersRequest(address, count);
case 4: // ReadInputRegisters
case READ_INPUT_REGISTERS:
return new ReadInputRegistersRequest(address, count);
default:
throw new IllegalArgumentException("不支持的功能码: " + functionCode);
@@ -111,31 +116,33 @@ public class IotModbusTcpClient {
* 创建写入请求
*/
private ModbusRequest createWriteRequest(Integer functionCode, Integer address, Integer count, int[] values) {
// TODO @AI5、6、15、16 能不能有枚举哈?这样 5、6、15、16 那的注释就不用写;
switch (functionCode) {
case 1: // WriteCoils (使用 5 或 15)
IotModbusFunctionCodeEnum functionCodeEnum = IotModbusFunctionCodeEnum.valueOf(functionCode);
if (functionCodeEnum == null) {
throw new IllegalArgumentException("不支持的功能码: " + functionCode);
}
switch (functionCodeEnum) {
case READ_COILS: // 写线圈(使用功能码 5 或 15
if (count == 1) {
return new WriteCoilRequest(address, values[0] != 0);
} else {
// 多线圈写入
BitVector bv = new BitVector(count);
for (int i = 0; i < Math.min(values.length, count); i++) {
bv.setBit(i, values[i] != 0);
}
return new WriteMultipleCoilsRequest(address, bv);
}
case 3: // WriteHoldingRegisters (使用 6 或 16)
case READ_HOLDING_REGISTERS: // 写保持寄存器(使用功能码 6 或 16
if (count == 1) {
return new WriteSingleRegisterRequest(address, new SimpleRegister(values[0]));
} else {
Register[] registers = new com.ghgande.j2mod.modbus.procimg.SimpleRegister[count];
Register[] registers = new SimpleRegister[count];
for (int i = 0; i < count; i++) {
registers[i] = new SimpleRegister(i < values.length ? values[i] : 0);
}
return new WriteMultipleRegistersRequest(address, registers);
}
case 2: // ReadDiscreteInputs - 只读
case 4: // ReadInputRegisters - 只读
case READ_DISCRETE_INPUTS: // 只读
case READ_INPUT_REGISTERS: // 只读
return null;
default:
throw new IllegalArgumentException("不支持的功能码: " + functionCode);
@@ -146,9 +153,12 @@ public class IotModbusTcpClient {
* 从响应中提取值
*/
private int[] extractValues(ModbusResponse response, Integer functionCode) {
// TODO @AI1、2、3、4 能不能有枚举哈?这样 1、2、3、4 那的注释就不用写;
switch (functionCode) {
case 1:
IotModbusFunctionCodeEnum functionCodeEnum = IotModbusFunctionCodeEnum.valueOf(functionCode);
if (functionCodeEnum == null) {
throw new IllegalArgumentException("不支持的功能码: " + functionCode);
}
switch (functionCodeEnum) {
case READ_COILS:
ReadCoilsResponse coilsResponse = (ReadCoilsResponse) response;
int bitCount = coilsResponse.getBitCount();
int[] coilValues = new int[bitCount];
@@ -156,7 +166,7 @@ public class IotModbusTcpClient {
coilValues[i] = coilsResponse.getCoilStatus(i) ? 1 : 0;
}
return coilValues;
case 2: // ReadDiscreteInputs
case READ_DISCRETE_INPUTS:
ReadInputDiscretesResponse discretesResponse = (ReadInputDiscretesResponse) response;
int discreteCount = discretesResponse.getBitCount();
int[] discreteValues = new int[discreteCount];
@@ -164,7 +174,7 @@ public class IotModbusTcpClient {
discreteValues[i] = discretesResponse.getDiscreteStatus(i) ? 1 : 0;
}
return discreteValues;
case 3: // ReadHoldingRegisters
case READ_HOLDING_REGISTERS:
ReadMultipleRegistersResponse holdingResponse = (ReadMultipleRegistersResponse) response;
InputRegister[] holdingRegisters = holdingResponse.getRegisters();
int[] holdingValues = new int[holdingRegisters.length];
@@ -172,7 +182,7 @@ public class IotModbusTcpClient {
holdingValues[i] = holdingRegisters[i].getValue();
}
return holdingValues;
case 4: // ReadInputRegisters
case READ_INPUT_REGISTERS:
ReadInputRegistersResponse inputResponse = (ReadInputRegistersResponse) response;
InputRegister[] inputRegisters = inputResponse.getRegisters();
int[] inputValues = new int[inputRegisters.length];

View File

@@ -29,7 +29,9 @@ public class IotModbusTcpConfigCacheService {
private final Map<Long, IotModbusDeviceConfigRespDTO> configCache = new ConcurrentHashMap<>();
/**
* 已知的设备 ID 集合
* 已知的设备 ID 集合(作用:用于检测已删除的设备)
*
* @see #cleanupRemovedDevices(List, Consumer)
*/
private final Set<Long> knownDeviceIds = ConcurrentHashMap.newKeySet();
@@ -48,10 +50,9 @@ public class IotModbusTcpConfigCacheService {
}
List<IotModbusDeviceConfigRespDTO> configs = result.getData();
// 2. 更新缓存
// 2. 更新缓存(注意:不在这里更新 knownDeviceIds由 cleanupRemovedDevices 统一管理)
for (IotModbusDeviceConfigRespDTO config : configs) {
configCache.put(config.getDeviceId(), config);
knownDeviceIds.add(config.getDeviceId());
}
return configs;
} catch (Exception e) {
@@ -70,9 +71,8 @@ public class IotModbusTcpConfigCacheService {
return configCache.get(deviceId);
}
// TODO @AI怎么感觉 cleanupRemovedDevices 的时候knownDeviceIds 已经在 refreshConfig 里更新了???
/**
* 清理已删除设备的资源
* 清理已删除设备的资源,并更新已知设备 ID 集合
*
* @param currentConfigs 当前有效的配置列表
* @param cleanupAction 清理动作
@@ -80,7 +80,7 @@ public class IotModbusTcpConfigCacheService {
public void cleanupRemovedDevices(List<IotModbusDeviceConfigRespDTO> currentConfigs, Consumer<Long> cleanupAction) {
// 1.1 获取当前有效的设备 ID
Set<Long> currentDeviceIds = convertSet(currentConfigs, IotModbusDeviceConfigRespDTO::getDeviceId);
// 1.2 找出已删除的设备
// 1.2 找出已删除的设备(基于旧的 knownDeviceIds
Set<Long> removedDeviceIds = new HashSet<>(knownDeviceIds);
removedDeviceIds.removeAll(currentDeviceIds);
@@ -88,9 +88,12 @@ public class IotModbusTcpConfigCacheService {
for (Long deviceId : removedDeviceIds) {
log.info("[cleanupRemovedDevices][清理已删除设备: {}]", deviceId);
configCache.remove(deviceId);
knownDeviceIds.remove(deviceId);
cleanupAction.accept(deviceId);
}
// 3. 更新已知设备 ID 集合为当前有效的设备 ID
knownDeviceIds.clear();
knownDeviceIds.addAll(currentDeviceIds);
}
}

View File

@@ -27,8 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class IotModbusTcpConnectionManager {
// TODO @AIiot:modbus-tcp:connection:
private static final String LOCK_KEY_PREFIX = "iot:modbus:connection:";
private static final String LOCK_KEY_PREFIX = "iot:modbus-tcp:connection:";
private final RedissonClient redissonClient;
private final Vertx vertx;
@@ -56,11 +55,7 @@ public class IotModbusTcpConnectionManager {
// 2. 情况一:连接已存在,添加设备引用
ModbusConnection connection = connectionPool.get(connectionKey);
if (connection != null) {
// 添加设备引用
connection.addDevice(config.getDeviceId(), config.getSlaveId());
// 更新连接参数(取最小值)
// TODO @AI不确定如果后续最小值被移除后是不是无法灰度到上一个最小值
connection.updateParams(config.getTimeout(), config.getRetryInterval());
return;
}
@@ -86,22 +81,17 @@ public class IotModbusTcpConnectionManager {
* 创建 Modbus TCP 连接
*/
private ModbusConnection createConnection(IotModbusDeviceConfigRespDTO config, RLock lock) throws Exception {
// 创建 TCP 连接
// TODO @AI需要重连么
// 1. 创建 TCP 连接
TCPMasterConnection tcpConnection = new TCPMasterConnection(InetAddress.getByName(config.getIp()));
tcpConnection.setPort(config.getPort());
tcpConnection.setTimeout(config.getTimeout());
tcpConnection.connect();
// 创建 Modbus 连接对象
ModbusConnection connection = new ModbusConnection();
// TODO @AI链式调用简化下
connection.setConnectionKey(buildConnectionKey(config.getIp(), config.getPort()));
connection.setTcpConnection(tcpConnection);
connection.setLock(lock);
connection.setTimeout(config.getTimeout());
connection.setRetryInterval(config.getRetryInterval());
connection.setContext(vertx.getOrCreateContext());
// 2. 创建 Modbus 连接对象
ModbusConnection connection = new ModbusConnection()
.setConnectionKey(buildConnectionKey(config.getIp(), config.getPort()))
.setTcpConnection(tcpConnection).setLock(lock).setContext(vertx.getOrCreateContext())
.setTimeout(config.getTimeout()).setRetryInterval(config.getRetryInterval());
connection.addDevice(config.getDeviceId(), config.getSlaveId());
return connection;
}
@@ -162,9 +152,10 @@ public class IotModbusTcpConnectionManager {
if (connection.getTcpConnection() != null) {
connection.getTcpConnection().close();
}
// TODO @AI不确定是不是要当前线程还是当前进程就 ok 了。
if (connection.getLock() != null && connection.getLock().isHeldByCurrentThread()) {
connection.getLock().unlock();
// 安全释放锁:先检查锁存在且被锁定,再检查是否当前线程持有
RLock lock = connection.getLock();
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
log.info("[closeConnection][关闭 Modbus 连接: {}]", connectionKey);
} catch (Exception e) {
@@ -220,16 +211,6 @@ public class IotModbusTcpConnectionManager {
return deviceSlaveMap.get(deviceId);
}
public void updateParams(Integer timeout, Integer retryInterval) {
// 取最小值
if (timeout != null && (this.timeout == null || timeout < this.timeout)) {
this.timeout = timeout;
}
if (retryInterval != null && (this.retryInterval == null || retryInterval < this.retryInterval)) {
this.retryInterval = retryInterval;
}
}
/**
* 执行 Modbus 读取操作(阻塞方式,在 Vert.x worker 线程执行)
*/

View File

@@ -1,7 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFunctionCodeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -101,23 +103,15 @@ public class IotModbusTcpDownstreamHandler {
* 查找点位配置
*/
private IotModbusPointRespDTO findPoint(IotModbusDeviceConfigRespDTO config, String identifier) {
if (config.getPoints() == null) {
return null;
}
// TODO @AIhutool findOne
return config.getPoints().stream()
.filter(p -> identifier.equals(p.getIdentifier()))
.findFirst()
.orElse(null);
return CollUtil.findOne(config.getPoints(), p -> identifier.equals(p.getIdentifier()));
}
/**
* 检查功能码是否支持写操作
*/
private boolean isWritable(Integer functionCode) {
// TODO @AI能不能通过 枚举优化下?
// 功能码 1ReadCoils和 3ReadHoldingRegisters支持写操作
return functionCode != null && (functionCode == 1 || functionCode == 3);
IotModbusFunctionCodeEnum functionCodeEnum = IotModbusFunctionCodeEnum.valueOf(functionCode);
return functionCodeEnum != null && Boolean.TRUE.equals(functionCodeEnum.getWritable());
}
}

View File

@@ -9,15 +9,10 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT Modbus TCP 下行消息订阅器
*
* 负责:
* 1. 订阅消息总线的下行消息
* 2. 将属性设置消息转发给下行处理器
* IoT Modbus TCP 下行消息订阅器:订阅消息总线的下行消息并转发给处理器
*
* @author 芋道源码
*/
// TODO @AI希望它的初始化在 configuration 里;
@RequiredArgsConstructor
@Slf4j
public class IotModbusTcpDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbustcp;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import io.vertx.core.Vertx;
@@ -11,14 +12,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
// TODO @AI注释可以简化
/**
* IoT Modbus TCP 轮询调度器
*
* 负责:
* 1. 管理每个点位的轮询定时器
* 2. 调度 Modbus 读取任务
* 3. 处理读取结果并上报
* IoT Modbus TCP 轮询调度器:管理点位的轮询定时器,调度读取任务并上报结果
*
* @author 芋道源码
*/
@@ -46,22 +41,22 @@ public class IotModbusTcpPollScheduler {
stopPolling(deviceId);
// 2.1 为每个点位创建新的轮询任务
List<Long> timerIds = new ArrayList<>();
// TODO @AIif return 简化;上面的 size 加下config.getPoints()
if (config.getPoints() != null) {
for (IotModbusPointRespDTO point : config.getPoints()) {
Long timerId = createPollTimer(config, point);
if (timerId != null) {
timerIds.add(timerId);
}
if (CollUtil.isEmpty(config.getPoints())) {
return;
}
List<Long> timerIds = new ArrayList<>(config.getPoints().size());
for (IotModbusPointRespDTO point : config.getPoints()) {
Long timerId = createPollTimer(config, point);
if (timerId != null) {
timerIds.add(timerId);
}
}
// 2.2 记录定时器
// TODO @AICollUtil.isNotEmptyif return 简化;
if (!timerIds.isEmpty()) {
deviceTimers.put(deviceId, timerIds);
log.debug("[updatePolling][设备 {} 创建了 {} 个轮询定时器]", deviceId, timerIds.size());
if (CollUtil.isEmpty(timerIds)) {
return;
}
deviceTimers.put(deviceId, timerIds);
log.debug("[updatePolling][设备 {} 创建了 {} 个轮询定时器]", deviceId, timerIds.size());
}
/**
@@ -99,7 +94,6 @@ public class IotModbusTcpPollScheduler {
}
// 2. 执行 Modbus 读取
// TODO @AI超时时间有实现么
modbusClient.read(connection, slaveId, point)
.onSuccess(rawValue -> upstreamHandler.handleReadResult(config, point, rawValue))
.onFailure(e -> log.error("[pollPoint][读取点位失败, deviceId={}, identifier={}]",
@@ -111,13 +105,13 @@ public class IotModbusTcpPollScheduler {
*/
public void stopPolling(Long deviceId) {
List<Long> timerIds = deviceTimers.remove(deviceId);
// TODO @AICollUtil.isNotEmpty并且 if return
if (timerIds != null) {
for (Long timerId : timerIds) {
vertx.cancelTimer(timerId);
}
log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timerIds.size());
if (CollUtil.isEmpty(timerIds)) {
return;
}
for (Long timerId : timerIds) {
vertx.cancelTimer(timerId);
}
log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timerIds.size());
}
/**

View File

@@ -10,14 +10,8 @@ import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
// TODO @AI注释可以简化下
/**
* IoT Modbus TCP 上行数据处理器
*
* 负责:
* 1. 将 Modbus 读取的原始值转换为物模型属性值
* 2. 构造属性上报消息
* 3. 发送消息到消息总线
* IoT Modbus TCP 上行数据处理器:将原始值转换为物模型属性值并上报
*
* @author 芋道源码
*/

View File

@@ -14,15 +14,8 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
// TODO @AI注释可以简化下
/**
* IoT Modbus TCP 上行协议
*
* 负责:
* 1. 定时从 biz 拉取 Modbus 设备配置
* 2. 管理 Modbus TCP 连接
* 3. 调度轮询任务
* 4. 处理采集数据上报
* IoT Modbus TCP 上行协议:定时拉取配置、管理连接、调度轮询任务
*
* @author 芋道源码
*/
@@ -99,7 +92,8 @@ public class IotModbusTcpUpstreamProtocol {
// 2.1 确保连接存在
connectionManager.ensureConnection(config);
// 2.2 更新轮询任务
// TODO @AI【重要】如果点位配置没变化,是不是不用 update
// DONE @AI【重要】当前实现是全量更新轮询任务,未来可优化为增量更新(只更新变化的点位)
// TODO @AI【超级重要这次必须优化】需要对比 point 的更新1如果 points 删除了需要停止对应的轮询定时器2如果 points 新增了需要新增对应的轮询定时器3如果 points 只修改了 pollInterval需要更新对应的轮询定时器4如果 points 其他属性修改了,不需要处理轮询定时器
pollScheduler.updatePolling(config);
} catch (Exception e) {
log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e);

View File

@@ -36,19 +36,12 @@ public class IotDeviceApiImpl implements IotDeviceCommonApi {
private IotGatewayProperties gatewayProperties;
private RestTemplate restTemplate;
private RestTemplate modbusRestTemplate;
@PostConstruct
public void init() {
IotGatewayProperties.RpcProperties rpc = gatewayProperties.getRpc();
restTemplate = new RestTemplateBuilder()
.rootUri(rpc.getUrl() + "/rpc-api/iot/device")
.readTimeout(rpc.getReadTimeout())
.connectTimeout(rpc.getConnectTimeout())
.build();
// TODO @AI继续复用 rpc.getConnectTimeout();不需要独立 modbusRestTemplate
modbusRestTemplate = new RestTemplateBuilder()
.rootUri(rpc.getUrl() + "/rpc-api/iot/modbus")
.rootUri(rpc.getUrl())
.readTimeout(rpc.getReadTimeout())
.connectTimeout(rpc.getConnectTimeout())
.build();
@@ -56,25 +49,25 @@ public class IotDeviceApiImpl implements IotDeviceCommonApi {
@Override
public CommonResult<Boolean> authDevice(IotDeviceAuthReqDTO authReqDTO) {
return doPost(restTemplate, "/auth", authReqDTO, new ParameterizedTypeReference<>() { });
return doPost("/rpc-api/iot/device/auth", authReqDTO, new ParameterizedTypeReference<>() { });
}
@Override
public CommonResult<IotDeviceRespDTO> getDevice(IotDeviceGetReqDTO getReqDTO) {
return doPost(restTemplate, "/get", getReqDTO, new ParameterizedTypeReference<>() { });
return doPost("/rpc-api/iot/device/get", getReqDTO, new ParameterizedTypeReference<>() { });
}
@Override
public CommonResult<List<IotModbusDeviceConfigRespDTO>> getEnabledModbusDeviceConfigs() {
return doPost(modbusRestTemplate, "/enabled-configs", null, new ParameterizedTypeReference<>() { });
return doPost("/rpc-api/iot/modbus/enabled-configs", null, new ParameterizedTypeReference<>() { });
}
private <T, R> CommonResult<R> doPost(RestTemplate template, String url, T body,
private <T, R> CommonResult<R> doPost(String url, T body,
ParameterizedTypeReference<CommonResult<R>> responseType) {
try {
// 请求
HttpEntity<T> requestEntity = new HttpEntity<>(body);
ResponseEntity<CommonResult<R>> response = template.exchange(
ResponseEntity<CommonResult<R>> response = restTemplate.exchange(
url, HttpMethod.POST, requestEntity, responseType);
// 响应
CommonResult<R> result = response.getBody();

View File

@@ -117,9 +117,9 @@ yudao:
keep-alive-timeout-seconds: 300 # 保持连接超时时间(秒)
ssl-enabled: false # 是否启用 SSLwss://
sub-protocol: mqtt # WebSocket 子协议
# TODO @AImodbus-tcp 组件配置待补充
modbus-tcp:
enabled: true
config-refresh-interval: 30 # 配置刷新间隔(秒)
--- #################### 日志相关配置 ####################