feat(iot):modbus-tcp-slave、modbus-tcp-master 继续优化大量代码,并对接管理后台

This commit is contained in:
YunaiV
2026-02-08 20:48:24 +08:00
parent c608b81c4e
commit 4e4c776bed
38 changed files with 522 additions and 329 deletions

View File

@@ -1,16 +1,14 @@
package cn.iocoder.yudao.module.iot.api.device;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.enums.RpcConstants;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.*;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotSubDeviceRegisterFullReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO;
@@ -35,8 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.List;
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
@@ -81,12 +77,13 @@ public class IoTDeviceApiImpl implements IotDeviceCommonApi {
}
@Override
@PostMapping(RpcConstants.RPC_API_PREFIX + "/iot/modbus/enabled-configs")
@PostMapping(RpcConstants.RPC_API_PREFIX + "/iot/modbus/config-list")
@PermitAll
@TenantIgnore
public CommonResult<List<IotModbusDeviceConfigRespDTO>> getEnabledModbusDeviceConfigs() {
// 1. 获取所有启用的 Modbus 连接配置
List<IotDeviceModbusConfigDO> configList = modbusConfigService.getEnabledDeviceModbusConfigList();
public CommonResult<List<IotModbusDeviceConfigRespDTO>> getModbusDeviceConfigList(
@RequestBody IotModbusDeviceConfigListReqDTO listReqDTO) {
// 1. 获取 Modbus 连接配置
List<IotDeviceModbusConfigDO> configList = modbusConfigService.getDeviceModbusConfigList(listReqDTO);
if (CollUtil.isEmpty(configList)) {
return success(new ArrayList<>());
}
@@ -95,6 +92,7 @@ public class IoTDeviceApiImpl implements IotDeviceCommonApi {
Set<Long> deviceIds = convertSet(configList, IotDeviceModbusConfigDO::getDeviceId);
Map<Long, IotDeviceDO> deviceMap = deviceService.getDeviceMap(deviceIds);
Map<Long, List<IotDeviceModbusPointDO>> pointMap = modbusPointService.getEnabledDeviceModbusPointMapByDeviceIds(deviceIds);
Map<Long, IotProductDO> productMap = productService.getProductMap(convertSet(deviceMap.values(), IotDeviceDO::getProductId));
List<IotModbusDeviceConfigRespDTO> result = new ArrayList<>(configList.size());
for (IotDeviceModbusConfigDO config : configList) {
// 3.1 获取设备信息
@@ -102,12 +100,20 @@ public class IoTDeviceApiImpl implements IotDeviceCommonApi {
if (device == null) {
continue;
}
// 3.2 获取启用的点位列表
// 3.2 按 protocolType 筛选(如果非空)
if (StrUtil.isNotEmpty(listReqDTO.getProtocolType())) {
IotProductDO product = productMap.get(device.getProductId());
if (product == null || ObjUtil.notEqual(listReqDTO.getProtocolType(), product.getProtocolType())) {
continue;
}
}
// 3.3 获取启用的点位列表
List<IotDeviceModbusPointDO> pointList = pointMap.get(config.getDeviceId());
if (CollUtil.isEmpty(pointList)) {
continue;
}
// 3.3 构建 IotModbusDeviceConfigRespDTO 对象
// 3.4 构建 IotModbusDeviceConfigRespDTO 对象
IotModbusDeviceConfigRespDTO configDTO = BeanUtils.toBean(config, IotModbusDeviceConfigRespDTO.class, o ->
o.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName())
.setPoints(BeanUtils.toBean(pointList, IotModbusPointRespDTO.class)));

View File

@@ -33,13 +33,11 @@ public class IotDeviceModbusConfigRespVO {
@Schema(description = "重试间隔(毫秒)", example = "1000")
private Integer retryInterval;
// TODO @AI不要【1-云端轮询 2-主动上报】
@Schema(description = "模式1-云端轮询 2-主动上报", example = "1")
@Schema(description = "工作模式", example = "1")
private Integer mode;
// TODO @AI还是换成 int然后写注释不要【modbus_tcp / modbus_rtu】
@Schema(description = "数据帧格式modbus_tcp / modbus_rtu", example = "modbus_tcp")
private String frameFormat;
@Schema(description = "数据帧格式", example = "1")
private Integer frameFormat;
@Schema(description = "状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "0")
private Integer status;

View File

@@ -1,7 +1,9 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus;
import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
@@ -13,12 +15,14 @@ public class IotDeviceModbusConfigSaveReqVO {
@NotNull(message = "设备编号不能为空")
private Long deviceId;
@Schema(description = "Modbus 服务器 IP 地址", requiredMode = Schema.RequiredMode.REQUIRED, example = "192.168.1.100")
@NotEmpty(message = "Modbus 服务器 IP 地址不能为空")
@Schema(description = "Modbus 服务器 IP 地址", example = "192.168.1.100")
// @NotEmpty(message = "Modbus 服务器 IP 地址不能为空")
// TODO @AI这个字段要根据情况校验
private String ip;
@Schema(description = "Modbus 端口", requiredMode = Schema.RequiredMode.REQUIRED, example = "502")
@NotNull(message = "Modbus 端口不能为空")
@Schema(description = "Modbus 端口", example = "502")
// @NotNull(message = "Modbus 端口不能为空")
// TODO @AI这个字段要根据情况校验
private Integer port;
@Schema(description = "从站地址", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
@@ -31,13 +35,13 @@ public class IotDeviceModbusConfigSaveReqVO {
@Schema(description = "重试间隔(毫秒)", example = "1000")
private Integer retryInterval;
// TODO @AI不要【1-云端轮询 2-主动上报】
@Schema(description = "模式1-云端轮询 2-主动上报", example = "1")
@Schema(description = "工作模式", example = "1")
@InEnum(IotModbusModeEnum.class)
private Integer mode;
// TODO @AI不要【1-云端轮询 2-主动上报】
@Schema(description = "数据帧格式modbus_tcp / modbus_rtu", example = "modbus_tcp")
private String frameFormat;
@Schema(description = "数据帧格式", example = "1")
@InEnum(IotModbusFrameFormatEnum.class)
private Integer frameFormat;
@Schema(description = "状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "0")
@NotNull(message = "状态不能为空")

View File

@@ -69,7 +69,7 @@ public class IotDeviceModbusConfigDO extends TenantBaseDO {
*
* @see cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum
*/
private String frameFormat;
private Integer frameFormat;
/**
* 状态
*

View File

@@ -1,6 +1,8 @@
package cn.iocoder.yudao.module.iot.dal.mysql.device;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceModbusConfigDO;
import org.apache.ibatis.annotations.Mapper;
@@ -18,8 +20,11 @@ public interface IotDeviceModbusConfigMapper extends BaseMapperX<IotDeviceModbus
return selectOne(IotDeviceModbusConfigDO::getDeviceId, deviceId);
}
default List<IotDeviceModbusConfigDO> selectListByStatus(Integer status) {
return selectList(IotDeviceModbusConfigDO::getStatus, status);
default List<IotDeviceModbusConfigDO> selectList(IotModbusDeviceConfigListReqDTO reqDTO) {
return selectList(new LambdaQueryWrapperX<IotDeviceModbusConfigDO>()
.eqIfPresent(IotDeviceModbusConfigDO::getStatus, reqDTO.getStatus())
.eqIfPresent(IotDeviceModbusConfigDO::getMode, reqDTO.getMode())
.inIfPresent(IotDeviceModbusConfigDO::getDeviceId, reqDTO.getDeviceIds()));
}
}

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.service.device;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigSaveReqVO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceModbusConfigDO;
import jakarta.validation.Valid;
@@ -37,10 +38,11 @@ public interface IotDeviceModbusConfigService {
IotDeviceModbusConfigDO getDeviceModbusConfigByDeviceId(Long deviceId);
/**
* 获得所有启用的 Modbus 连接配置列表
* 获得 Modbus 连接配置列表
*
* @return 启用的 Modbus 连接配置列表
* @param listReqDTO 查询参数
* @return Modbus 连接配置列表
*/
List<IotDeviceModbusConfigDO> getEnabledDeviceModbusConfigList();
List<IotDeviceModbusConfigDO> getDeviceModbusConfigList(IotModbusDeviceConfigListReqDTO listReqDTO);
}

View File

@@ -1,8 +1,8 @@
package cn.iocoder.yudao.module.iot.service.device;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigSaveReqVO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceModbusConfigDO;
import cn.iocoder.yudao.module.iot.dal.mysql.device.IotDeviceModbusConfigMapper;
import jakarta.annotation.Resource;
@@ -54,8 +54,8 @@ public class IotDeviceModbusConfigServiceImpl implements IotDeviceModbusConfigSe
}
@Override
public List<IotDeviceModbusConfigDO> getEnabledDeviceModbusConfigList() {
return modbusConfigMapper.selectListByStatus(CommonStatusEnum.ENABLE.getStatus());
public List<IotDeviceModbusConfigDO> getDeviceModbusConfigList(IotModbusDeviceConfigListReqDTO listReqDTO) {
return modbusConfigMapper.selectList(listReqDTO);
}
}

View File

@@ -10,6 +10,9 @@ import javax.annotation.Nullable;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertMap;
/**
* IoT 产品 Service 接口
@@ -121,6 +124,24 @@ public interface IotProductService {
*/
Long getProductCount(@Nullable LocalDateTime createTime);
/**
* 批量获得产品列表
*
* @param ids 产品编号集合
* @return 产品列表
*/
List<IotProductDO> getProductList(Collection<Long> ids);
/**
* 批量获得产品 Map
*
* @param ids 产品编号集合
* @return 产品 Mapkey: 产品编号, value: 产品)
*/
default Map<Long, IotProductDO> getProductMap(Collection<Long> ids) {
return convertMap(getProductList(ids), IotProductDO::getId);
}
/**
* 批量校验产品存在
*

View File

@@ -171,6 +171,11 @@ public class IotProductServiceImpl implements IotProductService {
return productMapper.selectCountByCreateTime(createTime);
}
@Override
public List<IotProductDO> getProductList(Collection<Long> ids) {
return productMapper.selectByIds(ids);
}
@Override
public void validateProductsExist(Collection<Long> ids) {
if (CollUtil.isEmpty(ids)) {

View File

@@ -1,13 +1,7 @@
package cn.iocoder.yudao.module.iot.core.biz;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import java.util.List;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotSubDeviceRegisterFullReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.*;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO;
@@ -54,10 +48,11 @@ public interface IotDeviceCommonApi {
CommonResult<List<IotSubDeviceRegisterRespDTO>> registerSubDevices(IotSubDeviceRegisterFullReqDTO reqDTO);
/**
* 获取所有启用的 Modbus 设备配置列表
* 获取 Modbus 设备配置列表
*
* @param listReqDTO 查询参数
* @return Modbus 设备配置列表
*/
CommonResult<List<IotModbusDeviceConfigRespDTO>> getEnabledModbusDeviceConfigs();
CommonResult<List<IotModbusDeviceConfigRespDTO>> getModbusDeviceConfigList(IotModbusDeviceConfigListReqDTO listReqDTO);
}

View File

@@ -0,0 +1,37 @@
package cn.iocoder.yudao.module.iot.core.biz.dto;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.Set;
/**
* IoT Modbus 设备配置列表查询 Request DTO
*
* @author 芋道源码
*/
@Data
@Accessors(chain = true)
public class IotModbusDeviceConfigListReqDTO {
/**
* 状态
*/
private Integer status;
/**
* 模式
*/
private Integer mode;
/**
* 协议类型
*/
private String protocolType;
/**
* 设备 ID 集合
*/
private Set<Long> deviceIds;
}

View File

@@ -54,7 +54,7 @@ public class IotModbusDeviceConfigRespDTO {
/**
* 数据帧格式
*/
private String frameFormat;
private Integer frameFormat;
// ========== Modbus 点位配置 ==========

View File

@@ -13,26 +13,22 @@ import java.util.Arrays;
*/
@Getter
@RequiredArgsConstructor
public enum IotModbusFrameFormatEnum implements ArrayValuable<String> {
public enum IotModbusFrameFormatEnum implements ArrayValuable<Integer> {
MODBUS_TCP("modbus_tcp", "Modbus TCP"),
MODBUS_RTU("modbus_rtu", "Modbus RTU");
MODBUS_TCP(1),
MODBUS_RTU(2);
public static final String[] ARRAYS = Arrays.stream(values())
public static final Integer[] ARRAYS = Arrays.stream(values())
.map(IotModbusFrameFormatEnum::getFormat)
.toArray(String[]::new);
.toArray(Integer[]::new);
/**
* 格式
*/
private final String format;
/**
* 名称
*/
private final String name;
private final Integer format;
@Override
public String[] array() {
public Integer[] array() {
return ARRAYS;
}

View File

@@ -25,6 +25,14 @@ public class IotDeviceAuthUtils {
return String.format("%s.%s", productKey, deviceName);
}
public static String buildClientIdFromUsername(String username) {
IotDeviceIdentity identity = parseUsername(username);
if (identity == null) {
return null;
}
return buildClientId(identity.getProductKey(), identity.getDeviceName());
}
public static String buildUsername(String productKey, String deviceName) {
return String.format("%s&%s", deviceName, productKey);
}

View File

@@ -32,6 +32,10 @@ public abstract class AbstractIotModbusPollScheduler {
* 同设备最小请求间隔(毫秒),防止 Modbus 设备性能不足时请求堆积
*/
private static final long MIN_REQUEST_INTERVAL = 1000;
/**
* 每个设备请求队列的最大长度,超出时丢弃最旧请求
*/
private static final int MAX_QUEUE_SIZE = 1000;
/**
* 设备点位的定时器映射deviceId -> (pointId -> PointTimerInfo)
@@ -159,6 +163,11 @@ public abstract class AbstractIotModbusPollScheduler {
private void submitPollRequest(Long deviceId, Long pointId) {
// 1. 【重要】将请求添加到设备的请求队列
Queue<Runnable> queue = deviceRequestQueues.computeIfAbsent(deviceId, k -> new ConcurrentLinkedQueue<>());
while (queue.size() >= MAX_QUEUE_SIZE) {
// 超出上限时,丢弃最旧的请求
queue.poll();
log.warn("[submitPollRequest][设备 {} 请求队列已满({}), 丢弃最旧请求]", deviceId, MAX_QUEUE_SIZE);
}
queue.offer(() -> pollPoint(deviceId, pointId));
// 2. 处理设备请求队列(如果没有延迟 timer 在等待)

View File

@@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
@@ -331,10 +332,10 @@ public class IotModbusCommonUtils {
// 其他字节序调整
byte[] result = new byte[bytes.length];
switch (byteOrderEnum) {
case BA: // 小端序16 位
if (bytes.length >= 2) {
result[0] = bytes[1];
result[1] = bytes[0];
case BA: // 小端序:按每 2 字节一组交换16 位场景 [1,0]32 位场景 [1,0,3,2]
for (int i = 0; i + 1 < bytes.length; i += 2) {
result[i] = bytes[i + 1];
result[i + 1] = bytes[i];
}
break;
case CDAB: // 大端字交换32 位)
@@ -509,6 +510,9 @@ public class IotModbusCommonUtils {
* @return 匹配的点位配置,未找到返回 null
*/
public static IotModbusPointRespDTO findPoint(IotModbusDeviceConfigRespDTO config, String identifier) {
if (config == null || StrUtil.isBlank(identifier)) {
return null;
}
return CollUtil.findOne(config.getPoints(), p -> identifier.equals(p.getIdentifier()));
}
@@ -520,6 +524,9 @@ public class IotModbusCommonUtils {
* @return 匹配的点位配置,未找到返回 null
*/
public static IotModbusPointRespDTO findPointById(IotModbusDeviceConfigRespDTO config, Long pointId) {
if (config == null || pointId == null) {
return null;
}
return CollUtil.findOne(config.getPoints(), p -> p.getId().equals(pointId));
}

View File

@@ -1,7 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterConnectionManager;
import com.ghgande.j2mod.modbus.io.ModbusTCPTransaction;
import com.ghgande.j2mod.modbus.msg.*;
import com.ghgande.j2mod.modbus.procimg.InputRegister;
@@ -19,7 +19,7 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.I
* <p>
* 封装基于 j2mod 的 Modbus TCP 读写操作:
* 1. 根据功能码创建对应的 Modbus 读/写请求
* 2. 通过 {@link IotModbusTcpConnectionManager.ModbusConnection} 执行事务
* 2. 通过 {@link IotModbusTcpMasterConnectionManager.ModbusConnection} 执行事务
* 3. 从响应中提取原始值
*
* @author 芋道源码
@@ -36,7 +36,7 @@ public class IotModbusTcpMasterUtils {
* @param point 点位配置
* @return 原始值int 数组)
*/
public static Future<int[]> read(IotModbusTcpConnectionManager.ModbusConnection connection,
public static Future<int[]> read(IotModbusTcpMasterConnectionManager.ModbusConnection connection,
Integer slaveId,
IotModbusPointRespDTO point) {
return connection.executeBlocking(tcpConnection -> {
@@ -70,7 +70,7 @@ public class IotModbusTcpMasterUtils {
* @param values 要写入的值
* @return 是否成功
*/
public static Future<Boolean> write(IotModbusTcpConnectionManager.ModbusConnection connection,
public static Future<Boolean> write(IotModbusTcpMasterConnectionManager.ModbusConnection connection,
Integer slaveId,
IotModbusPointRespDTO point,
int[] values) {

View File

@@ -6,16 +6,15 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream.IotModbusTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream.IotModbusTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpPollScheduler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream.IotModbusTcpMasterDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream.IotModbusTcpMasterDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpMasterUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterPollScheduler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import lombok.Getter;
@@ -23,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@@ -61,15 +61,14 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
/**
* 连接管理器
*/
private final IotModbusTcpConnectionManager connectionManager;
private final IotModbusTcpMasterConnectionManager connectionManager;
/**
* 下行消息订阅者
*/
private final IotModbusTcpDownstreamSubscriber downstreamSubscriber;
private final IotModbusTcpMasterDownstreamSubscriber downstreamSubscriber;
private final IotModbusTcpConfigCacheService configCacheService;
private final IotModbusTcpPollScheduler pollScheduler;
private final IotDeviceMessageService messageService;
private final IotModbusTcpMasterConfigCacheService configCacheService;
private final IotModbusTcpMasterPollScheduler pollScheduler;
public IotModbusTcpMasterProtocol(ProtocolProperties properties) {
IotModbusTcpMasterConfig modbusTcpMasterConfig = properties.getModbusTcpMaster();
@@ -83,22 +82,23 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
// 初始化 Manager
RedissonClient redissonClient = SpringUtil.getBean(RedissonClient.class);
IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.connectionManager = new IotModbusTcpConnectionManager(redissonClient, vertx);
this.configCacheService = new IotModbusTcpConfigCacheService(deviceApi);
IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class);
this.configCacheService = new IotModbusTcpMasterConfigCacheService(deviceApi);
// DONE @AI上线/下线消息已移到 ConnectionManager 内部处理,不再走回调
this.connectionManager = new IotModbusTcpMasterConnectionManager(redissonClient, vertx,
messageService, configCacheService, serverId);
// 初始化 Handler
this.messageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotDeviceMessageService messageService = this.messageService;
IotModbusTcpUpstreamHandler upstreamHandler = new IotModbusTcpUpstreamHandler(messageService, serverId);
IotModbusTcpDownstreamHandler downstreamHandler = new IotModbusTcpDownstreamHandler(connectionManager,
IotModbusTcpMasterUpstreamHandler upstreamHandler = new IotModbusTcpMasterUpstreamHandler(messageService, serverId);
IotModbusTcpMasterDownstreamHandler downstreamHandler = new IotModbusTcpMasterDownstreamHandler(connectionManager,
configCacheService);
// 初始化轮询调度器
this.pollScheduler = new IotModbusTcpPollScheduler(vertx, connectionManager, upstreamHandler, configCacheService);
this.pollScheduler = new IotModbusTcpMasterPollScheduler(vertx, connectionManager, upstreamHandler, configCacheService);
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
this.downstreamSubscriber = new IotModbusTcpDownstreamSubscriber(this, downstreamHandler, messageBus);
this.downstreamSubscriber = new IotModbusTcpMasterDownstreamSubscriber(this, downstreamHandler, messageBus);
}
@Override
@@ -196,46 +196,21 @@ public class IotModbusTcpMasterProtocol implements IotProtocol {
// 2. 更新连接和轮询任务
for (IotModbusDeviceConfigRespDTO config : configs) {
try {
// 2.1 检测是否为首次连接
boolean isNewConnection = connectionManager.getConnection(config.getDeviceId()) == null;
// 2.2 确保连接存在
// 2.1 确保连接存在
connectionManager.ensureConnection(config);
// 2.3 首次建连成功后发送上线消息
// TODO @AI在这里判断上线 ,会不会有点奇怪???
if (isNewConnection && connectionManager.getConnection(config.getDeviceId()) != null) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
messageService.sendDeviceMessage(onlineMessage,
config.getProductKey(), config.getDeviceName(), serverId);
} catch (Exception ex) {
log.error("[refreshConfig][发送设备上线消息失败, deviceId={}]", config.getDeviceId(), ex);
}
}
// 2.4 更新轮询任务
// 2.2 更新轮询任务
pollScheduler.updatePolling(config);
} catch (Exception e) {
log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e);
}
}
// 3. 清理已删除设备的资源(仅 API 成功时才执行)
configCacheService.cleanupRemovedDevices(configs, deviceId -> {
// 3.1 发送设备下线消息
// TODO @AI在这里判断上线 ,会不会有点奇怪???
IotModbusDeviceConfigRespDTO removedConfig = configCacheService.getConfig(deviceId);
if (removedConfig != null) {
try {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
messageService.sendDeviceMessage(offlineMessage,
removedConfig.getProductKey(), removedConfig.getDeviceName(), serverId);
} catch (Exception ex) {
log.error("[refreshConfig][发送设备下线消息失败, deviceId={}]", deviceId, ex);
}
}
// 3.2 停止轮询和移除连接
// 3. 清理已删除设备的资源
Set<Long> removedDeviceIds = configCacheService.cleanupRemovedDevices(configs);
for (Long deviceId : removedDeviceIds) {
pollScheduler.stopPolling(deviceId);
connectionManager.removeDevice(deviceId);
});
}
} catch (Exception e) {
log.error("[refreshConfig][刷新配置失败]", e);
}

View File

@@ -1,41 +1,42 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusTcpMasterUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterConfigCacheService;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterConnectionManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* IoT Modbus TCP 下行消息处理器
* IoT Modbus TCP Master 下行消息处理器
* <p>
* 负责
* 1. 处理下行消息如属性设置 thing.service.property.set
* 2. 执行 Modbus 入操作
* 2. 将属性值转换为 Modbus 指令通过 TCP 连接发送给设备
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotModbusTcpDownstreamHandler {
public class IotModbusTcpMasterDownstreamHandler {
private final IotModbusTcpConnectionManager connectionManager;
private final IotModbusTcpConfigCacheService configCacheService;
private final IotModbusTcpMasterConnectionManager connectionManager;
private final IotModbusTcpMasterConfigCacheService configCacheService;
/**
* 处理下行消息
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "DuplicatedCode"})
public void handle(IotDeviceMessage message) {
// 1.1 检查是否是属性设置消息
if (!IotDeviceMessageMethodEnum.PROPERTY_SET.getMethod().equals(message.getMethod())) {
if (ObjUtil.notEqual(IotDeviceMessageMethodEnum.PROPERTY_SET.getMethod(), message.getMethod())) {
log.debug("[handle][忽略非属性设置消息: {}]", message.getMethod());
return;
}
@@ -78,7 +79,7 @@ public class IotModbusTcpDownstreamHandler {
*/
private void writeProperty(IotModbusDeviceConfigRespDTO config, IotModbusPointRespDTO point, Object value) {
// 1.1 获取连接
IotModbusTcpConnectionManager.ModbusConnection connection = connectionManager.getConnection(config.getDeviceId());
IotModbusTcpMasterConnectionManager.ModbusConnection connection = connectionManager.getConnection(config.getDeviceId());
if (connection == null) {
log.warn("[writeProperty][设备 {} 没有连接]", config.getDeviceId());
return;

View File

@@ -12,12 +12,12 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotModbusTcpDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
public class IotModbusTcpMasterDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
private final IotModbusTcpDownstreamHandler downstreamHandler;
private final IotModbusTcpMasterDownstreamHandler downstreamHandler;
public IotModbusTcpDownstreamSubscriber(IotModbusTcpMasterProtocol protocol,
IotModbusTcpDownstreamHandler downstreamHandler,
public IotModbusTcpMasterDownstreamSubscriber(IotModbusTcpMasterProtocol protocol,
IotModbusTcpMasterDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
super(protocol, messageBus);
this.downstreamHandler = downstreamHandler;

View File

@@ -17,12 +17,13 @@ import java.util.Map;
* @author 芋道源码
*/
@Slf4j
public class IotModbusTcpUpstreamHandler {
public class IotModbusTcpMasterUpstreamHandler {
private final IotDeviceMessageService messageService;
private final String serverId;
public IotModbusTcpUpstreamHandler(IotDeviceMessageService messageService,
public IotModbusTcpMasterUpstreamHandler(IotDeviceMessageService messageService,
String serverId) {
this.messageService = messageService;
this.serverId = serverId;
@@ -39,7 +40,7 @@ public class IotModbusTcpUpstreamHandler {
IotModbusPointRespDTO point,
int[] rawValue) {
try {
// 1.1 转换原始值为物模型属性值
// 1.1 转换原始值为物模型属性值点位翻译
Object convertedValue = IotModbusCommonUtils.convertToPropertyValue(rawValue, point);
log.debug("[handleReadResult][设备={}, 属性={}, 原始值={}, 转换值={}]",
config.getDeviceId(), point.getIdentifier(), rawValue, convertedValue);

View File

@@ -1,25 +1,31 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
/**
* IoT Modbus TCP 配置缓存服务负责 biz 拉取 Modbus 设备配置缓存配置数据并检测配置变更
* IoT Modbus TCP Master 配置缓存服务
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotModbusTcpConfigCacheService {
public class IotModbusTcpMasterConfigCacheService {
private final IotDeviceCommonApi deviceApi;
@@ -31,7 +37,7 @@ public class IotModbusTcpConfigCacheService {
/**
* 已知的设备 ID 集合作用用于检测已删除的设备
*
* @see #cleanupRemovedDevices(List, Consumer)
* @see #cleanupRemovedDevices(List)
*/
private final Set<Long> knownDeviceIds = ConcurrentHashMap.newKeySet();
@@ -43,7 +49,9 @@ public class IotModbusTcpConfigCacheService {
public List<IotModbusDeviceConfigRespDTO> refreshConfig() {
try {
// 1. 从远程获取配置
CommonResult<List<IotModbusDeviceConfigRespDTO>> result = deviceApi.getEnabledModbusDeviceConfigs();
CommonResult<List<IotModbusDeviceConfigRespDTO>> result = deviceApi.getModbusDeviceConfigList(
new IotModbusDeviceConfigListReqDTO().setStatus(CommonStatusEnum.ENABLE.getStatus())
.setMode(IotModbusModeEnum.POLLING.getMode()).setProtocolType(IotProtocolTypeEnum.MODBUS_TCP_MASTER.getType()));
result.checkError();
List<IotModbusDeviceConfigRespDTO> configs = result.getData();
@@ -69,28 +77,30 @@ public class IotModbusTcpConfigCacheService {
}
/**
* 清理已删除设备的资源并更新已知设备 ID 集合
* 计算已删除设备的 ID 集合清理缓存并更新已知设备 ID 集合
*
* DONE @AI不再使用 callback 模式返回已删除的设备 ID 集合由调用方直接清理
*
* @param currentConfigs 当前有效的配置列表
* @param cleanupAction 清理动作
* @return 已删除的设备 ID 集合
*/
public void cleanupRemovedDevices(List<IotModbusDeviceConfigRespDTO> currentConfigs, Consumer<Long> cleanupAction) {
public Set<Long> cleanupRemovedDevices(List<IotModbusDeviceConfigRespDTO> currentConfigs) {
// 1.1 获取当前有效的设备 ID
Set<Long> currentDeviceIds = convertSet(currentConfigs, IotModbusDeviceConfigRespDTO::getDeviceId);
// 1.2 找出已删除的设备基于旧的 knownDeviceIds
Set<Long> removedDeviceIds = new HashSet<>(knownDeviceIds);
removedDeviceIds.removeAll(currentDeviceIds);
// 2. 清理已删除设备先执行 cleanupAction再从缓存移除保证 action 中仍可获取 config
// 2. 清理已删除设备的缓存
for (Long deviceId : removedDeviceIds) {
log.info("[cleanupRemovedDevices][清理已删除设备: {}]", deviceId);
cleanupAction.accept(deviceId);
configCache.remove(deviceId);
}
// 3. 更新已知设备 ID 集合为当前有效的设备 ID
knownDeviceIds.clear();
knownDeviceIds.addAll(currentDeviceIds);
return removedDeviceIds;
}
}

View File

@@ -2,12 +2,13 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import com.ghgande.j2mod.modbus.net.TCPMasterConnection;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
@@ -26,14 +27,16 @@ import java.util.concurrent.ConcurrentHashMap;
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotModbusTcpConnectionManager {
public class IotModbusTcpMasterConnectionManager {
private static final String LOCK_KEY_PREFIX = "iot:modbus-tcp:connection:";
private final RedissonClient redissonClient;
private final Vertx vertx;
private final IotDeviceMessageService messageService;
private final IotModbusTcpMasterConfigCacheService configCacheService;
private final String serverId;
/**
* 连接池key = ip:port
@@ -45,8 +48,21 @@ public class IotModbusTcpConnectionManager {
*/
private final Map<Long, String> deviceConnectionMap = new ConcurrentHashMap<>();
public IotModbusTcpMasterConnectionManager(RedissonClient redissonClient, Vertx vertx,
IotDeviceMessageService messageService,
IotModbusTcpMasterConfigCacheService configCacheService,
String serverId) {
this.redissonClient = redissonClient;
this.vertx = vertx;
this.messageService = messageService;
this.configCacheService = configCacheService;
this.serverId = serverId;
}
/**
* 确保连接存在
* <p>
* 首次建连成功时直接发送设备上线消息
*
* @param config 设备配置
*/
@@ -59,37 +75,39 @@ public class IotModbusTcpConnectionManager {
config.getDeviceId(), oldConnectionKey, connectionKey);
removeDevice(config.getDeviceId());
}
// 1.2 记录设备与连接的关系
// 1.2 记录设备与连接的映射
deviceConnectionMap.put(config.getDeviceId(), connectionKey);
// 2. 情况一连接已存在添加设备引用
// 2. 情况一连接已存在注册设备并发送上线消息
ModbusConnection connection = connectionPool.get(connectionKey);
if (connection != null) {
connection.addDevice(config.getDeviceId(), config.getSlaveId());
addDeviceAndOnline(connection, config);
return;
}
// 3. 情况二连接不存在创建新连接
// 3.1 尝试获取分布式锁
// 3. 情况二连接不存在加分布式锁创建新连接
RLock lock = redissonClient.getLock(LOCK_KEY_PREFIX + connectionKey);
if (!lock.tryLock()) {
log.debug("[ensureConnection][获取锁失败, 由其他节点负责: {}]", connectionKey);
return;
}
// 3.2 double-check拿到锁后再次检查避免并发创建重复连接
try {
// 3.1 double-check拿到锁后再次检查避免并发创建重复连接
connection = connectionPool.get(connectionKey);
if (connection != null) {
connection.addDevice(config.getDeviceId(), config.getSlaveId());
lock.unlock();
addDeviceAndOnline(connection, config);
return;
}
// 3.3 创建新连接
connection = createConnection(config, lock);
// 3.2 创建新连接
connection = createConnection(config);
connectionPool.put(connectionKey, connection);
log.info("[ensureConnection][创建 Modbus 连接成功: {}]", connectionKey);
// 3.3 注册设备并发送上线消息
addDeviceAndOnline(connection, config);
} catch (Exception e) {
log.error("[ensureConnection][创建 Modbus 连接失败: {}]", connectionKey, e);
} finally {
// TODO @AI如果这里释放会不会出现集群模式下多个节点同时创建连接的情况需要验证一下 Redisson 的分布式锁特性
lock.unlock();
}
}
@@ -97,7 +115,7 @@ public class IotModbusTcpConnectionManager {
/**
* 创建 Modbus TCP 连接
*/
private ModbusConnection createConnection(IotModbusDeviceConfigRespDTO config, RLock lock) throws Exception {
private ModbusConnection createConnection(IotModbusDeviceConfigRespDTO config) throws Exception {
// 1. 创建 TCP 连接
TCPMasterConnection tcpConnection = new TCPMasterConnection(InetAddress.getByName(config.getIp()));
tcpConnection.setPort(config.getPort());
@@ -105,12 +123,10 @@ public class IotModbusTcpConnectionManager {
tcpConnection.connect();
// 2. 创建 Modbus 连接对象
ModbusConnection connection = new ModbusConnection()
return new ModbusConnection()
.setConnectionKey(buildConnectionKey(config.getIp(), config.getPort()))
.setTcpConnection(tcpConnection).setLock(lock).setContext(vertx.getOrCreateContext())
.setTcpConnection(tcpConnection).setContext(vertx.getOrCreateContext())
.setTimeout(config.getTimeout()).setRetryInterval(config.getRetryInterval());
connection.addDevice(config.getDeviceId(), config.getSlaveId());
return connection;
}
/**
@@ -137,25 +153,71 @@ public class IotModbusTcpConnectionManager {
/**
* 移除设备
* <p>
* 移除时直接发送设备下线消息
*/
public void removeDevice(Long deviceId) {
// 1. 移除设备引用
// 1.1 移除设备发送下线消息
sendOfflineMessage(deviceId);
// 1.2 移除设备引用
String connectionKey = deviceConnectionMap.remove(deviceId);
if (connectionKey == null) {
return;
}
// 2.1 移除连接中的设备引用
ModbusConnection connection = connectionPool.get(connectionKey);
if (connection == null) {
return;
}
connection.removeDevice(deviceId);
// 2. 如果没有设备引用了关闭连接
// 2.2 如果没有设备引用了关闭连接
if (connection.getDeviceCount() == 0) {
closeConnection(connectionKey);
}
}
// ==================== 设备连接 & 上下线消息 ====================
/**
* 注册设备到连接并发送上线消息
*/
private void addDeviceAndOnline(ModbusConnection connection,
IotModbusDeviceConfigRespDTO config) {
connection.addDevice(config.getDeviceId(), config.getSlaveId());
sendOnlineMessage(config);
}
/**
* 发送设备上线消息
*/
private void sendOnlineMessage(IotModbusDeviceConfigRespDTO config) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
messageService.sendDeviceMessage(onlineMessage,
config.getProductKey(), config.getDeviceName(), serverId);
} catch (Exception ex) {
log.error("[sendOnlineMessage][发送设备上线消息失败, deviceId={}]", config.getDeviceId(), ex);
}
}
/**
* 发送设备下线消息
*/
private void sendOfflineMessage(Long deviceId) {
IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(deviceId);
if (config == null) {
return;
}
try {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
messageService.sendDeviceMessage(offlineMessage,
config.getProductKey(), config.getDeviceName(), serverId);
} catch (Exception ex) {
log.error("[sendOfflineMessage][发送设备下线消息失败, deviceId={}]", deviceId, ex);
}
}
/**
* 关闭指定连接
*/
@@ -170,10 +232,10 @@ public class IotModbusTcpConnectionManager {
connection.getTcpConnection().close();
}
// 强制解锁避免死锁正常情况下应该不会发生锁未释放的情况
RLock lock = connection.getLock();
if (lock != null && lock.isLocked()) {
lock.forceUnlock();
}
// RLock lock = connection.getLock();
// if (lock != null && lock.isLocked()) {
// lock.forceUnlock();
// }
log.info("[closeConnection][关闭 Modbus 连接: {}]", connectionKey);
} catch (Exception e) {
log.error("[closeConnection][关闭连接失败: {}]", connectionKey, e);
@@ -202,11 +264,14 @@ public class IotModbusTcpConnectionManager {
private String connectionKey;
private TCPMasterConnection tcpConnection;
private RLock lock;
private Integer timeout;
private Integer retryInterval;
private Context context;
// TODO @AI是不是需要 lock避免集群模式下的竞争肯定不能让别的节点连接上另外RLock 在节点持有所锁的节点 cransh 的时候会自动释放
// private RLock lock;
/**
* 设备 ID slave ID 的映射
*/

View File

@@ -7,7 +7,7 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.manager.AbstractIotModbusPollScheduler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusTcpMasterUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpMasterUpstreamHandler;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
@@ -17,16 +17,16 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotModbusTcpPollScheduler extends AbstractIotModbusPollScheduler {
public class IotModbusTcpMasterPollScheduler extends AbstractIotModbusPollScheduler {
private final IotModbusTcpConnectionManager connectionManager;
private final IotModbusTcpUpstreamHandler upstreamHandler;
private final IotModbusTcpConfigCacheService configCacheService;
private final IotModbusTcpMasterConnectionManager connectionManager;
private final IotModbusTcpMasterUpstreamHandler upstreamHandler;
private final IotModbusTcpMasterConfigCacheService configCacheService;
public IotModbusTcpPollScheduler(Vertx vertx,
IotModbusTcpConnectionManager connectionManager,
IotModbusTcpUpstreamHandler upstreamHandler,
IotModbusTcpConfigCacheService configCacheService) {
public IotModbusTcpMasterPollScheduler(Vertx vertx,
IotModbusTcpMasterConnectionManager connectionManager,
IotModbusTcpMasterUpstreamHandler upstreamHandler,
IotModbusTcpMasterConfigCacheService configCacheService) {
super(vertx);
this.connectionManager = connectionManager;
this.upstreamHandler = upstreamHandler;
@@ -54,7 +54,7 @@ public class IotModbusTcpPollScheduler extends AbstractIotModbusPollScheduler {
}
// 2.1 获取连接
IotModbusTcpConnectionManager.ModbusConnection connection = connectionManager.getConnection(deviceId);
IotModbusTcpMasterConnectionManager.ModbusConnection connection = connectionManager.getConnection(deviceId);
if (connection == null) {
log.warn("[pollPoint][设备 {} 没有连接]", deviceId);
return;

View File

@@ -1,9 +1,6 @@
/**
* Modbus TCP Master 协议实现包
* Modbus TCP Master(主站)协议:网关主动连接并轮询 Modbus 从站设备
* <p>
* 提供基于 j2mod 的 Modbus TCP 主站Master功能支持
* 1. 定时轮询 Modbus 从站设备数据
* 2. 下发属性设置命令到从站设备
* 3. 数据格式转换(寄存器值 ↔ 物模型属性值)
* 基于 j2mod 实现,支持 FC01-04 读、FC05/06/15/16 写,定时轮询 + 下发属性设置
*/
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster;

View File

@@ -81,22 +81,27 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
*/
private Long requestCleanupTimerId;
// ========== 各组件 ==========
// TODO @芋艿:稍后排序下,有点小乱;
private final IotModbusTcpSlaveConfig slaveConfig;
private final IotModbusFrameDecoder frameDecoder;
private final IotModbusFrameEncoder frameEncoder;
/**
* 连接管理器
*/
private final IotModbusTcpSlaveConnectionManager connectionManager;
/**
* 下行消息订阅者
*/
private final IotModbusTcpSlaveDownstreamSubscriber downstreamSubscriber;
private final IotModbusFrameDecoder frameDecoder;
@SuppressWarnings("FieldCanBeLocal")
private final IotModbusFrameEncoder frameEncoder;
private final IotModbusTcpSlaveConfigCacheService configCacheService;
private final IotModbusTcpSlavePendingRequestManager pendingRequestManager;
private final IotModbusTcpSlaveUpstreamHandler upstreamHandler;
private final IotModbusTcpSlaveDownstreamSubscriber downstreamSubscriber;
private final IotModbusTcpSlavePollScheduler pollScheduler;
private final IotDeviceMessageService messageService;
public IotModbusTcpSlaveProtocol(ProtocolProperties properties) {
this.slaveConfig = properties.getModbusTcpSlave();
IotModbusTcpSlaveConfig slaveConfig = properties.getModbusTcpSlave();
Assert.notNull(slaveConfig, "Modbus TCP Slave 协议配置modbusTcpSlave不能为空");
this.properties = properties;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
@@ -124,10 +129,9 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
// 初始化 Handler
this.messageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotDeviceMessageService messageService = this.messageService;
IotDeviceService deviceService = SpringUtil.getBean(IotDeviceService.class);
this.upstreamHandler = new IotModbusTcpSlaveUpstreamHandler(
deviceApi, messageService, frameEncoder,
deviceApi, this.messageService, frameEncoder,
connectionManager, configCacheService, pendingRequestManager,
pollScheduler, deviceService, serverId);
@@ -158,9 +162,9 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
try {
// 1. 启动配置刷新定时器
int refreshInterval = slaveConfig.getConfigRefreshInterval();
IotModbusTcpSlaveConfig slaveConfig = properties.getModbusTcpSlave();
configRefreshTimerId = vertx.setPeriodic(
TimeUnit.SECONDS.toMillis(refreshInterval),
TimeUnit.SECONDS.toMillis(slaveConfig.getConfigRefreshInterval()),
id -> refreshConfig());
// 2.1 启动 TCP Server
@@ -178,6 +182,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
downstreamSubscriber.start();
} catch (Exception e) {
log.error("[start][IoT Modbus TCP Slave 协议 {} 启动失败]", getId(), e);
// TODO @芋艿:后续统一优化 stop 逻辑;
if (configRefreshTimerId != null) {
vertx.cancelTimer(configRefreshTimerId);
configRefreshTimerId = null;
@@ -223,9 +228,9 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
pollScheduler.stopAll();
// 2.3 清理 PendingRequest
pendingRequestManager.clear();
// 2.3 关闭所有连接
// 2.4 关闭所有连接
connectionManager.closeAll();
// 2.4 关闭 TCP Server
// 2.5 关闭 TCP Server
if (netServer != null) {
try {
netServer.close().result();
@@ -308,9 +313,6 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
/**
* 刷新已连接设备的配置(定时调用)
* <p>
* 与 tcpmaster 不同slave 只刷新已连接设备的配置,不做全量 diff。
* 设备的新增(认证时)和删除(断连时)分别在 {@link #handleConnection} 中处理。
*/
private synchronized void refreshConfig() {
try {
@@ -321,6 +323,10 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol {
}
List<IotModbusDeviceConfigRespDTO> configs =
configCacheService.refreshConnectedDeviceConfigList(connectedDeviceIds);
if (configs == null) {
log.warn("[refreshConfig][刷新配置失败,跳过本次刷新]");
return;
}
log.debug("[refreshConfig][刷新了 {} 个已连接设备的配置]", configs.size());
// 2. 更新已连接设备的轮询任务

View File

@@ -29,7 +29,7 @@ public class IotModbusFrame {
/**
* 事务标识符
* <p>
* 仅 {@link IotModbusFrameFormatEnum#MODBUS_TCP} 格式有值
* 仅 {@link IotModbusFrameFormatEnum#MODBUS_TCP} 格式有值
*/
private Integer transactionId;
@@ -37,14 +37,13 @@ public class IotModbusFrame {
* 异常码
* <p>
* 当功能码最高位为 1 时(异常响应),此字段存储异常码。
* 为 null 表示非异常响应。
*
* @see IotModbusCommonUtils#FC_EXCEPTION_MASK
*/
private Integer exceptionCode;
/**
* 自定义功能码时的 JSON 字符串
* 自定义功能码时的 JSON 字符串(用于 auth 认证等等)
*/
private String customData;

View File

@@ -25,15 +25,15 @@ import java.util.function.BiConsumer;
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotModbusFrameDecoder {
/**
* 自定义功能码
*/
private final int customFunctionCode;
public IotModbusFrameDecoder(int customFunctionCode) {
this.customFunctionCode = customFunctionCode;
}
/**
* 创建带自动帧格式检测的 RecordParser
*
@@ -82,7 +82,7 @@ public class IotModbusFrameDecoder {
// 提取 PDU 数据(从 functionCode 之后到末尾)
byte[] pdu = new byte[data.length - 8];
System.arraycopy(data, 8, pdu, 0, pdu.length);
// 构建 IotModbusFrame
return buildFrame(slaveId, functionCode, pdu, transactionId);
}
@@ -105,7 +105,7 @@ public class IotModbusFrameDecoder {
// PDU 数据(不含 slaveId、functionCode、CRC
byte[] pdu = new byte[data.length - 4];
System.arraycopy(data, 2, pdu, 0, pdu.length);
// 构建 IotModbusFrame
return buildFrame(slaveId, functionCode, pdu, null);
}
@@ -144,7 +144,6 @@ public class IotModbusFrameDecoder {
/**
* 帧格式检测阶段 Handler仅处理首包探测后切换到对应的拆包 Handler
*/
@SuppressWarnings("ClassCanBeRecord")
@RequiredArgsConstructor
private class DetectPhaseHandler implements Handler<Buffer> {

View File

@@ -51,7 +51,7 @@ public class IotModbusTcpSlaveDownstreamHandler {
/**
* 处理下行消息
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "DuplicatedCode"})
public void handle(IotDeviceMessage message) {
// 1.1 检查是否是属性设置消息
if (ObjUtil.notEqual(IotDeviceMessageMethodEnum.PROPERTY_SET.getMethod(), message.getMethod())) {
@@ -125,17 +125,26 @@ public class IotModbusTcpSlaveDownstreamHandler {
point.getRegisterAddress(), rawValues[0], frameFormat, transactionId);
} else if (writeMultipleCode != null) {
// 多个值使用多写功能码FC15/FC16
if (writeMultipleCode == IotModbusCommonUtils.FC_WRITE_MULTIPLE_COILS) {
data = frameEncoder.encodeWriteMultipleCoilsRequest(slaveId,
point.getRegisterAddress(), rawValues, frameFormat, transactionId);
} else {
data = frameEncoder.encodeWriteMultipleRegistersRequest(slaveId,
point.getRegisterAddress(), rawValues, frameFormat, transactionId);
}
} else {
log.warn("[writeProperty][点位 {} 不支持写操作, 功能码={}]", point.getIdentifier(), readFunctionCode);
return;
}
// 2. 发送
connectionManager.sendToDevice(deviceId, data);
// 2. 发送消息
connectionManager.sendToDevice(deviceId, data).onSuccess(v ->
log.info("[writeProperty][写入成功, deviceId={}, identifier={}, value={}]",
deviceId, point.getIdentifier(), value);
deviceId, point.getIdentifier(), value)
).onFailure(e ->
log.error("[writeProperty][写入失败, deviceId={}, identifier={}, value={}]",
deviceId, point.getIdentifier(), value, e)
);
}
}

View File

@@ -1,9 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.upstream;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants;
@@ -61,6 +61,7 @@ public class IotModbusTcpSlaveUpstreamHandler {
private final IotModbusTcpSlavePendingRequestManager pendingRequestManager;
private final IotModbusTcpSlavePollScheduler pollScheduler;
private final IotDeviceService deviceService;
private final String serverId;
public IotModbusTcpSlaveUpstreamHandler(IotDeviceCommonApi deviceApi,
@@ -153,16 +154,20 @@ public class IotModbusTcpSlaveUpstreamHandler {
// 1. 解析认证参数
IotDeviceAuthReqDTO request = JsonUtils.convertObject(params, IotDeviceAuthReqDTO.class);
Assert.notNull(request, "认证参数不能为空");
Assert.notBlank(request.getClientId(), "clientId 不能为空");
Assert.notBlank(request.getUsername(), "username 不能为空");
Assert.notBlank(request.getPassword(), "password 不能为空");
// 特殊:考虑到 modbus 消息体积较小,默认 clientId 传递空串
if (StrUtil.isBlank(request.getClientId())) {
request.setClientId(IotDeviceAuthUtils.buildClientIdFromUsername(request.getUsername()));
}
Assert.notBlank(request.getClientId(), "clientId 不能为空");
// 2.1 调用认证 API
CommonResult<Boolean> result = deviceApi.authDevice(request);
result.checkError();
if (BooleanUtil.isFalse(result.getData())) {
log.warn("[handleAuth][认证失败, clientId={}, username={}]", request.getClientId(), request.getUsername());
sendCustomResponse(socket, frame, frameFormat, METHOD_AUTH, 1, "认证失败");
sendCustomResponse(socket, frame, frameFormat, METHOD_AUTH, BAD_REQUEST.getCode(), "认证失败");
return;
}
// 2.2 解析设备信息
@@ -171,7 +176,21 @@ public class IotModbusTcpSlaveUpstreamHandler {
// 2.3 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
Assert.notNull(device, "设备不存在");
// TODO @AI2.4 必须找到连接配置;
// 2.4 加载设备 Modbus 配置,无配置则阻断认证
IotModbusDeviceConfigRespDTO modbusConfig = configCacheService.loadDeviceConfig(device.getId());
if (modbusConfig == null) {
log.warn("[handleAuth][设备 {} 没有 Modbus 点位配置, 拒绝认证]", device.getId());
sendCustomResponse(socket, frame, frameFormat, METHOD_AUTH, BAD_REQUEST.getCode(), "设备无 Modbus 配置");
return;
}
// 2.5 协议不一致,阻断认证
if (ObjUtil.notEqual(frameFormat.getFormat(), modbusConfig.getFrameFormat())) {
log.warn("[handleAuth][设备 {} frameFormat 不一致, 连接协议={}, 设备配置={},拒绝认证]",
device.getId(), frameFormat.getFormat(), modbusConfig.getFrameFormat());
sendCustomResponse(socket, frame, frameFormat, METHOD_AUTH, BAD_REQUEST.getCode(),
"frameFormat 协议不一致");
return;
}
// 3.1 注册连接
ConnectionInfo connectionInfo = new ConnectionInfo()
@@ -189,13 +208,8 @@ public class IotModbusTcpSlaveUpstreamHandler {
GlobalErrorCodeConstants.SUCCESS.getCode(), "success");
log.info("[handleAuth][认证成功, clientId={}, deviceId={}]", request.getClientId(), device.getId());
// 4. 加载设备配置并启动轮询
IotModbusDeviceConfigRespDTO config = configCacheService.loadDeviceConfig(device.getId());
if (config != null) {
pollScheduler.updatePolling(config);
} else {
log.warn("[handleAuth][认证成功但未找到设备配置, deviceId={}]", device.getId());
}
// 4. 启动轮询
pollScheduler.updatePolling(modbusConfig);
}
/**
@@ -245,20 +259,19 @@ public class IotModbusTcpSlaveUpstreamHandler {
}
// 2.3 查找点位配置
IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(info.getDeviceId());
if (config == null || CollUtil.isEmpty(config.getPoints())) {
return;
}
IotModbusPointRespDTO point = IotModbusCommonUtils.findPointById(config, request.getPointId());
if (point == null) {
return;
}
// 3.1 点位翻译
// 3.1 转换原始值为物模型属性值(点位翻译
Object convertedValue = IotModbusCommonUtils.convertToPropertyValue(rawValues, point);
// 3.2 上报属性
// 3.2 构造属性上报消息
Map<String, Object> params = MapUtil.of(request.getIdentifier(), convertedValue);
IotDeviceMessage message = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), params);
// 4. 发送到消息总线
messageService.sendDeviceMessage(message, info.getProductKey(), info.getDeviceName(), serverId);
log.debug("[handlePollingResponse][设备={}, 属性={}, 原始值={}, 转换值={}]",
info.getDeviceId(), request.getIdentifier(), rawValues, convertedValue);

View File

@@ -1,10 +1,15 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -13,16 +18,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* IoT Modbus TCP Slave 配置缓存服务
* <p>
* 与 tcpmaster 的 {@code IotModbusTcpConfigCacheService} 不同:
* - tcpmaster 启动时拉全量配置 → 主动建连,需要全量 diff 检测新增/删除设备
* - tcpslave 设备主动连接 → 认证时按需加载配置,断连时清理,定时刷新已连接设备的配置
* <p>
* 配置生命周期:
* 1. 认证时:{@link #loadDeviceConfig(Long)} 按 deviceId 从 API 加载配置到缓存
* 2. 断连时:{@link #removeConfig(Long)} 从缓存中移除
* 3. 定时刷新:{@link #refreshConnectedDeviceConfigList(Set)} 批量刷新已连接设备的配置
* IoT Modbus TCP Slave 配置缓存:认证时按需加载,断连时清理,定时刷新已连接设备
*
* @author 芋道源码
*/
@@ -41,44 +37,33 @@ public class IotModbusTcpSlaveConfigCacheService {
/**
* 加载单个设备的配置(认证成功后调用)
* <p>
* 从远程 API 获取全量配置,然后按 deviceId 匹配。
* 如果远程获取失败,尝试从 Mock 数据中匹配。
*
* @param deviceId 设备 ID
* @return 设备配置,未找到返回 null
* @return 设备配置
*/
public IotModbusDeviceConfigRespDTO loadDeviceConfig(Long deviceId) {
try {
// 1. 从远程 API 获取全量配置
// TODO @AI等待修复不着急
CommonResult<List<IotModbusDeviceConfigRespDTO>> result = deviceApi.getEnabledModbusDeviceConfigs();
if (result != null && result.isSuccess() && result.getData() != null) {
for (IotModbusDeviceConfigRespDTO config : result.getData()) {
// 顺便更新缓存(其他已连接设备也受益)
configCache.put(config.getDeviceId(), config);
if (config.getDeviceId().equals(deviceId)) {
return config;
}
}
// 1. 从远程 API 获取配置
IotModbusDeviceConfigListReqDTO reqDTO = new IotModbusDeviceConfigListReqDTO()
.setStatus(CommonStatusEnum.ENABLE.getStatus())
.setMode(IotModbusModeEnum.POLLING.getMode())
.setProtocolType(IotProtocolTypeEnum.MODBUS_TCP_SLAVE.getType())
.setDeviceIds(Collections.singleton(deviceId));
CommonResult<List<IotModbusDeviceConfigRespDTO>> result = deviceApi.getModbusDeviceConfigList(reqDTO);
result.checkError();
IotModbusDeviceConfigRespDTO modbusConfig = CollUtil.getFirst(result.getData());
if (modbusConfig == null) {
log.warn("[loadDeviceConfig][远程获取配置失败,未找到数据, deviceId={}]", deviceId);
return null;
}
// 2. 更新缓存并返回
configCache.put(modbusConfig.getDeviceId(), modbusConfig);
return modbusConfig;
} catch (Exception e) {
log.error("[loadDeviceConfig][从远程获取配置失败, deviceId={}]", deviceId, e);
return null;
}
// 2. 远程未找到,尝试 Mock 数据(仅 mockEnabled=true 时)
// DONE @AI【from codex】【中】Mock 数据已通过 mockEnabled 配置开关控制,线上环境不会污染真实配置。
// TODO @芋艿:测试完成后移除
if (true) {
for (IotModbusDeviceConfigRespDTO mockConfig : buildMockConfigs()) {
configCache.put(mockConfig.getDeviceId(), mockConfig);
if (mockConfig.getDeviceId().equals(deviceId)) {
return mockConfig;
}
}
}
return configCache.get(deviceId);
}
// ==================== 定时刷新(已连接设备) ====================
@@ -96,26 +81,25 @@ public class IotModbusTcpSlaveConfigCacheService {
return Collections.emptyList();
}
try {
// 1. 从远程获取全量配置
// TODO @AI传递 ids 批量查询;需要分批啦;
CommonResult<List<IotModbusDeviceConfigRespDTO>> result = deviceApi.getEnabledModbusDeviceConfigs();
List<IotModbusDeviceConfigRespDTO> allConfigs;
if (result != null && result.isSuccess() && result.getData() != null) {
allConfigs = new ArrayList<>(result.getData());
} else {
log.warn("[refreshConnectedDeviceConfigList][获取 Modbus 配置失败: {}]", result);
allConfigs = new ArrayList<>();
}
// 1. 从远程获取已连接设备的配置
CommonResult<List<IotModbusDeviceConfigRespDTO>> result = deviceApi.getModbusDeviceConfigList(
new IotModbusDeviceConfigListReqDTO().setStatus(CommonStatusEnum.ENABLE.getStatus())
.setMode(IotModbusModeEnum.POLLING.getMode())
.setProtocolType(IotProtocolTypeEnum.MODBUS_TCP_SLAVE.getType())
.setDeviceIds(connectedDeviceIds));
List<IotModbusDeviceConfigRespDTO> modbusConfigs = result.getCheckedData();
// 2. 追加 Mock 测试数据(仅 mockEnabled=true 时)
// TODO @芋艿:测试完成后移除
// TODO @claude-code【严重】同上if(true) 导致 mockEnabled 开关失效Mock 数据永远加载
if (true) {
allConfigs.addAll(buildMockConfigs());
modbusConfigs.addAll(buildMockConfigs());
}
// 3. 只保留已连接设备的配置,更新缓存
// 2. 只保留已连接设备的配置,更新缓存
// TODO @AI是不是直接添加到 configCache 缓存(或者覆盖),然后返回 modbusConfigs 就 ok 了?!
List<IotModbusDeviceConfigRespDTO> connectedConfigs = new ArrayList<>();
for (IotModbusDeviceConfigRespDTO config : allConfigs) {
for (IotModbusDeviceConfigRespDTO config : modbusConfigs) {
if (connectedDeviceIds.contains(config.getDeviceId())) {
configCache.put(config.getDeviceId(), config);
connectedConfigs.add(config);
@@ -124,15 +108,7 @@ public class IotModbusTcpSlaveConfigCacheService {
return connectedConfigs;
} catch (Exception e) {
log.error("[refreshConnectedDeviceConfigList][刷新配置失败]", e);
// 降级:返回缓存中已连接设备的配置
List<IotModbusDeviceConfigRespDTO> fallback = new ArrayList<>();
for (Long deviceId : connectedDeviceIds) {
IotModbusDeviceConfigRespDTO config = configCache.get(deviceId);
if (config != null) {
fallback.add(config);
}
}
return fallback;
return null;
}
}
@@ -142,7 +118,12 @@ public class IotModbusTcpSlaveConfigCacheService {
* 获取设备配置
*/
public IotModbusDeviceConfigRespDTO getConfig(Long deviceId) {
return configCache.get(deviceId);
IotModbusDeviceConfigRespDTO config = configCache.get(deviceId);
if (config != null) {
return config;
}
// 缓存未命中,从远程 API 获取
return loadDeviceConfig(deviceId);
}
/**
@@ -169,7 +150,7 @@ public class IotModbusTcpSlaveConfigCacheService {
config.setDeviceName("small");
config.setSlaveId(1);
config.setMode(1); // 云端轮询
config.setFrameFormat("modbus_tcp");
config.setFrameFormat(IotModbusFrameFormatEnum.MODBUS_TCP.getFormat());
// 点位列表
List<IotModbusPointRespDTO> points = new ArrayList<>();

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import lombok.Data;
@@ -129,21 +130,25 @@ public class IotModbusTcpSlaveConnectionManager {
/**
* 发送数据到设备
*
* @return 发送结果 Future
*/
public void sendToDevice(Long deviceId, byte[] data) {
public Future<Void> sendToDevice(Long deviceId, byte[] data) {
NetSocket socket = deviceSocketMap.get(deviceId);
if (socket == null) {
log.warn("[sendToDevice][设备 {} 没有连接]", deviceId);
return;
return Future.failedFuture("设备 " + deviceId + " 没有连接");
}
sendToSocket(socket, data);
return sendToSocket(socket, data);
}
/**
* 发送数据到指定 socket
*
* @return 发送结果 Future
*/
public void sendToSocket(NetSocket socket, byte[] data) {
socket.write(Buffer.buffer(data));
public Future<Void> sendToSocket(NetSocket socket, byte[] data) {
return socket.write(Buffer.buffer(data));
}
/**

View File

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -76,8 +77,9 @@ public class IotModbusTcpSlavePendingRequestManager {
if (frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP && frame.getTransactionId() != null) {
return matchByTransactionId(queue, frame.getTransactionId());
}
// RTU 模式FIFO匹配 slaveId + functionCode
return matchByFifo(queue, frame.getSlaveId(), frame.getFunctionCode());
// RTU 模式FIFO匹配 slaveId + functionCode + registerCount
int responseRegisterCount = extractRegisterCountFromResponse(frame);
return matchByFifo(queue, frame.getSlaveId(), frame.getFunctionCode(), responseRegisterCount);
}
/**
@@ -96,13 +98,16 @@ public class IotModbusTcpSlavePendingRequestManager {
}
/**
* 按 FIFO 匹配
* 按 FIFO 匹配slaveId + functionCode + registerCount
*/
private PendingRequest matchByFifo(Deque<PendingRequest> queue, int slaveId, int functionCode) {
private PendingRequest matchByFifo(Deque<PendingRequest> queue, int slaveId, int functionCode,
int responseRegisterCount) {
Iterator<PendingRequest> it = queue.iterator();
while (it.hasNext()) {
PendingRequest req = it.next();
if (req.getSlaveId() == slaveId && req.getFunctionCode() == functionCode) {
if (req.getSlaveId() == slaveId
&& req.getFunctionCode() == functionCode
&& (responseRegisterCount <= 0 || req.getRegisterCount() == responseRegisterCount)) {
it.remove();
return req;
}
@@ -110,6 +115,29 @@ public class IotModbusTcpSlavePendingRequestManager {
return null;
}
// TODO @AI是不是放到 modbus 工具类里,更合适?
/**
* 从响应帧中提取 registerCount通过 PDU 的 byteCount 推断)
*
* @return registerCount无法提取时返回 -1匹配时跳过校验
*/
private int extractRegisterCountFromResponse(IotModbusFrame frame) {
byte[] pdu = frame.getPdu();
if (pdu == null || pdu.length < 1) {
return -1;
}
int byteCount = pdu[0] & 0xFF;
int fc = frame.getFunctionCode();
// FC03/04 寄存器读响应registerCount = byteCount / 2
if (fc == IotModbusCommonUtils.FC_READ_HOLDING_REGISTERS
|| fc == IotModbusCommonUtils.FC_READ_INPUT_REGISTERS) {
return byteCount / 2;
}
// FC01/02 线圈/离散输入读响应registerCount = byteCount * 8线圈数量
// 但因为按 bit 打包有余位,无法精确反推,返回 -1 跳过校验
return -1;
}
/**
* 清理过期请求
*/

View File

@@ -55,6 +55,7 @@ public class IotModbusTcpSlavePollScheduler extends AbstractIotModbusPollSchedul
* 轮询单个点位
*/
@Override
@SuppressWarnings("DuplicatedCode")
protected void pollPoint(Long deviceId, Long pointId) {
// 1.1 从 configCache 获取最新配置
IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(deviceId);
@@ -78,7 +79,7 @@ public class IotModbusTcpSlavePollScheduler extends AbstractIotModbusPollSchedul
// 2.2 获取 slave ID
IotModbusFrameFormatEnum frameFormat = connection.getFrameFormat();
Assert.notNull(frameFormat, "设备 {} 的帧格式不能为空", deviceId);
int slaveId = connection.getSlaveId();
Integer slaveId = connection.getSlaveId();
Assert.notNull(connection.getSlaveId(), "设备 {} 的 slaveId 不能为空", deviceId);
// 3.1 编码读请求
@@ -96,10 +97,13 @@ public class IotModbusTcpSlavePollScheduler extends AbstractIotModbusPollSchedul
System.currentTimeMillis() + requestTimeout);
pendingRequestManager.addRequest(pendingRequest);
// 3.3 发送读请求
connectionManager.sendToDevice(deviceId, data);
connectionManager.sendToDevice(deviceId, data).onSuccess(v ->
log.debug("[pollPoint][设备={}, 点位={}, FC={}, 地址={}, 数量={}]",
deviceId, point.getIdentifier(), point.getFunctionCode(),
point.getRegisterAddress(), point.getRegisterCount());
point.getRegisterAddress(), point.getRegisterCount())
).onFailure(e ->
log.warn("[pollPoint][发送失败, 设备={}, 点位={}]", deviceId, point.getIdentifier(), e)
);
}
}

View File

@@ -0,0 +1,6 @@
/**
* Modbus TCP Slave从站协议设备主动连接网关自定义 FC65 认证后由网关云端轮询
* <p>
* TCP Server 模式,支持 MODBUS_TCP / MODBUS_RTU 帧格式自动检测
*/
package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave;

View File

@@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotSubDeviceRegisterFullReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
@@ -64,8 +65,8 @@ public class IotDeviceApiImpl implements IotDeviceCommonApi {
}
@Override
public CommonResult<List<IotModbusDeviceConfigRespDTO>> getEnabledModbusDeviceConfigs() {
return doPost("/rpc-api/iot/modbus/enabled-configs", null, new ParameterizedTypeReference<>() { });
public CommonResult<List<IotModbusDeviceConfigRespDTO>> getModbusDeviceConfigList(IotModbusDeviceConfigListReqDTO listReqDTO) {
return doPost("/rpc-api/iot/modbus/config-list", listReqDTO, new ParameterizedTypeReference<>() { });
}
@Override

View File

@@ -167,7 +167,7 @@ yudao:
# 针对引入的 Modbus TCP Master 组件的配置
# ====================================
- id: modbus-tcp-master-1
enabled: false
enabled: true
protocol: modbus_tcp_master
port: 502
modbus-tcp-master:

View File

@@ -67,9 +67,9 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
// ===================== 设备信息(根据实际情况修改,从 iot_device 表查询) =====================
private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT";
private static final String DEVICE_NAME = "small";
private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3";
private static final String PRODUCT_KEY = "modbus_tcp_slave_product_demo";
private static final String DEVICE_NAME = "modbus_tcp_slave_device_demo_tcp";
private static final String DEVICE_SECRET = "8e4adeb3d25342ab88643421d3fba3f6";
@BeforeAll
static void setUp() {
@@ -128,7 +128,6 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
// 2. 设置持续监听:每收到一个读请求,自动回复
log.info("[testPollingResponse][开始持续监听网关下发的读请求...]");
CompletableFuture<Void> done = new CompletableFuture<>();
RecordParser parser = FRAME_DECODER.createRecordParser((frame, frameFormat) -> {
log.info("[testPollingResponse][收到请求: slaveId={}, FC={}, transactionId={}]",
frame.getSlaveId(), frame.getFunctionCode(), frame.getTransactionId());
@@ -201,6 +200,7 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest {
*/
private IotModbusFrame authenticate(NetSocket socket) throws Exception {
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
authInfo.setClientId(""); // 特殊:考虑到 modbus 消息长度限制,默认 clientId 不发送
byte[] authFrame = buildAuthFrame(authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword());
return sendAndReceive(socket, authFrame);
}