diff --git a/yudao-module-bpm/src/main/java/cn/iocoder/yudao/module/bpm/service/task/BpmTaskServiceImpl.java b/yudao-module-bpm/src/main/java/cn/iocoder/yudao/module/bpm/service/task/BpmTaskServiceImpl.java index a3f54b6988..318fa26a3a 100644 --- a/yudao-module-bpm/src/main/java/cn/iocoder/yudao/module/bpm/service/task/BpmTaskServiceImpl.java +++ b/yudao-module-bpm/src/main/java/cn/iocoder/yudao/module/bpm/service/task/BpmTaskServiceImpl.java @@ -116,6 +116,7 @@ public class BpmTaskServiceImpl implements BpmTaskService { .taskAssignee(String.valueOf(userId)) // 分配给自己 .active() .includeProcessVariables() + .taskTenantId(FlowableUtils.getTenantId()) .orderByTaskCreateTime().desc(); // 创建时间倒序 if (StrUtil.isNotBlank(pageVO.getName())) { taskQuery.taskNameLike("%" + pageVO.getName() + "%"); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDevicePageReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDevicePageReqVO.java index f7d515df96..e527242fb3 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDevicePageReqVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDevicePageReqVO.java @@ -31,4 +31,7 @@ public class IotDevicePageReqVO extends PageParam { @Schema(description = "设备分组编号", example = "1024") private Long groupId; + @Schema(description = "网关设备 ID", example = "16380") + private Long gatewayId; + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/data/sink/IotDataSinkPageReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/data/sink/IotDataSinkPageReqVO.java index 06bbecc894..8a8fcdef3d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/data/sink/IotDataSinkPageReqVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/data/sink/IotDataSinkPageReqVO.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink; import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum; import cn.iocoder.yudao.framework.common.pojo.PageParam; import cn.iocoder.yudao.framework.common.validation.InEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import org.springframework.format.annotation.DateTimeFormat; @@ -22,6 +23,10 @@ public class IotDataSinkPageReqVO extends PageParam { @InEnum(CommonStatusEnum.class) private Integer status; + @Schema(description = "数据目的类型", example = "1") + @InEnum(IotDataSinkTypeEnum.class) + private Integer type; + @Schema(description = "创建时间") @DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND) private LocalDateTime[] createTime; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotSceneRuleDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotSceneRuleDO.java index 94aa1eb5a3..ecf87db7a7 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotSceneRuleDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotSceneRuleDO.java @@ -21,6 +21,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import java.time.LocalDateTime; import java.util.List; /** @@ -56,6 +57,11 @@ public class IotSceneRuleDO extends TenantBaseDO { */ private Integer status; + /** + * 最后触发时间 + */ + private LocalDateTime lastTriggerTime; + /** * 场景定义配置 */ diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkTcpConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkTcpConfig.java index 3d96f11ceb..513a987f2f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkTcpConfig.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkTcpConfig.java @@ -10,6 +10,35 @@ import lombok.Data; @Data public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig { + /** + * 默认连接超时时间(毫秒) + */ + public static final int DEFAULT_CONNECT_TIMEOUT_MS = 5000; + /** + * 默认读取超时时间(毫秒) + */ + public static final int DEFAULT_READ_TIMEOUT_MS = 10000; + /** + * 默认是否启用 SSL + */ + public static final boolean DEFAULT_SSL = false; + /** + * 默认数据格式 + */ + public static final String DEFAULT_DATA_FORMAT = "JSON"; + /** + * 默认心跳间隔时间(毫秒) + */ + public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000L; + /** + * 默认重连间隔时间(毫秒) + */ + public static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L; + /** + * 默认最大重连次数 + */ + public static final int DEFAULT_MAX_RECONNECT_ATTEMPTS = 3; + /** * TCP 服务器地址 */ @@ -23,17 +52,17 @@ public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig { /** * 连接超时时间(毫秒) */ - private Integer connectTimeoutMs = 5000; + private Integer connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS; /** * 读取超时时间(毫秒) */ - private Integer readTimeoutMs = 10000; + private Integer readTimeoutMs = DEFAULT_READ_TIMEOUT_MS; /** * 是否启用 SSL */ - private Boolean ssl = false; + private Boolean ssl = DEFAULT_SSL; /** * SSL 证书路径(当 ssl=true 时需要) @@ -43,21 +72,21 @@ public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig { /** * 数据格式:JSON 或 BINARY */ - private String dataFormat = "JSON"; + private String dataFormat = DEFAULT_DATA_FORMAT; /** * 心跳间隔时间(毫秒),0 表示不启用心跳 */ - private Long heartbeatIntervalMs = 30000L; + private Long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS; /** * 重连间隔时间(毫秒) */ - private Long reconnectIntervalMs = 5000L; + private Long reconnectIntervalMs = DEFAULT_RECONNECT_INTERVAL_MS; /** * 最大重连次数 */ - private Integer maxReconnectAttempts = 3; + private Integer maxReconnectAttempts = DEFAULT_MAX_RECONNECT_ATTEMPTS; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkWebSocketConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkWebSocketConfig.java index f1b7e86d86..55514da7c8 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkWebSocketConfig.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkWebSocketConfig.java @@ -13,6 +13,51 @@ import lombok.Data; @Data public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig { + /** + * 默认连接超时时间(毫秒) + */ + public static final int DEFAULT_CONNECT_TIMEOUT_MS = 5000; + /** + * 默认发送超时时间(毫秒) + */ + public static final int DEFAULT_SEND_TIMEOUT_MS = 10000; + /** + * 默认心跳间隔时间(毫秒) + */ + public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000L; + /** + * 默认心跳消息内容 + */ + public static final String DEFAULT_HEARTBEAT_MESSAGE = "{\"type\":\"heartbeat\"}"; + /** + * 默认是否启用 SSL 证书验证 + */ + public static final boolean DEFAULT_VERIFY_SSL_CERT = true; + /** + * 默认数据格式 + */ + public static final String DEFAULT_DATA_FORMAT = "JSON"; + /** + * 默认重连间隔时间(毫秒) + */ + public static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L; + /** + * 默认最大重连次数 + */ + public static final int DEFAULT_MAX_RECONNECT_ATTEMPTS = 3; + /** + * 默认是否启用压缩 + */ + public static final boolean DEFAULT_ENABLE_COMPRESSION = false; + /** + * 默认消息发送重试次数 + */ + public static final int DEFAULT_SEND_RETRY_COUNT = 1; + /** + * 默认消息发送重试间隔(毫秒) + */ + public static final long DEFAULT_SEND_RETRY_INTERVAL_MS = 1000L; + /** * WebSocket 服务器地址 * 例如:ws://localhost:8080/ws 或 wss://example.com/ws @@ -22,22 +67,22 @@ public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig { /** * 连接超时时间(毫秒) */ - private Integer connectTimeoutMs = 5000; + private Integer connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS; /** * 发送超时时间(毫秒) */ - private Integer sendTimeoutMs = 10000; + private Integer sendTimeoutMs = DEFAULT_SEND_TIMEOUT_MS; /** * 心跳间隔时间(毫秒),0 表示不启用心跳 */ - private Long heartbeatIntervalMs = 30000L; + private Long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS; /** * 心跳消息内容(JSON 格式) */ - private String heartbeatMessage = "{\"type\":\"heartbeat\"}"; + private String heartbeatMessage = DEFAULT_HEARTBEAT_MESSAGE; /** * 子协议列表(逗号分隔) @@ -52,36 +97,36 @@ public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig { /** * 是否启用 SSL 证书验证(仅对 wss:// 生效) */ - private Boolean verifySslCert = true; + private Boolean verifySslCert = DEFAULT_VERIFY_SSL_CERT; /** * 数据格式:JSON 或 TEXT */ - private String dataFormat = "JSON"; + private String dataFormat = DEFAULT_DATA_FORMAT; /** * 重连间隔时间(毫秒) */ - private Long reconnectIntervalMs = 5000L; + private Long reconnectIntervalMs = DEFAULT_RECONNECT_INTERVAL_MS; /** * 最大重连次数 */ - private Integer maxReconnectAttempts = 3; + private Integer maxReconnectAttempts = DEFAULT_MAX_RECONNECT_ATTEMPTS; /** * 是否启用压缩 */ - private Boolean enableCompression = false; + private Boolean enableCompression = DEFAULT_ENABLE_COMPRESSION; /** * 消息发送重试次数 */ - private Integer sendRetryCount = 1; + private Integer sendRetryCount = DEFAULT_SEND_RETRY_COUNT; /** * 消息发送重试间隔(毫秒) */ - private Long sendRetryIntervalMs = 1000L; + private Long sendRetryIntervalMs = DEFAULT_SEND_RETRY_INTERVAL_MS; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceMapper.java index ae21b3cc3c..c982cc0c03 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceMapper.java @@ -31,6 +31,7 @@ public interface IotDeviceMapper extends BaseMapperX { .eqIfPresent(IotDeviceDO::getDeviceType, reqVO.getDeviceType()) .likeIfPresent(IotDeviceDO::getNickname, reqVO.getNickname()) .eqIfPresent(IotDeviceDO::getState, reqVO.getStatus()) + .eqIfPresent(IotDeviceDO::getGatewayId, reqVO.getGatewayId()) .apply(ObjectUtil.isNotNull(reqVO.getGroupId()), "FIND_IN_SET(" + reqVO.getGroupId() + ",group_ids) > 0") .orderByDesc(IotDeviceDO::getId)); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataRuleMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataRuleMapper.java index 7c0c17d3bc..ce2eeb04bc 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataRuleMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataRuleMapper.java @@ -35,4 +35,8 @@ public interface IotDataRuleMapper extends BaseMapperX { return selectList(IotDataRuleDO::getStatus, status); } + default IotDataRuleDO selectByName(String name) { + return selectOne(IotDataRuleDO::getName, name); + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataSinkMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataSinkMapper.java index e65001db86..57e2a84595 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataSinkMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataSinkMapper.java @@ -21,6 +21,7 @@ public interface IotDataSinkMapper extends BaseMapperX { return selectPage(reqVO, new LambdaQueryWrapperX() .likeIfPresent(IotDataSinkDO::getName, reqVO.getName()) .eqIfPresent(IotDataSinkDO::getStatus, reqVO.getStatus()) + .eqIfPresent(IotDataSinkDO::getType, reqVO.getType()) .betweenIfPresent(IotDataSinkDO::getCreateTime, reqVO.getCreateTime()) .orderByDesc(IotDataSinkDO::getId)); } @@ -29,4 +30,8 @@ public interface IotDataSinkMapper extends BaseMapperX { return selectList(IotDataSinkDO::getStatus, status); } + default IotDataSinkDO selectByName(String name) { + return selectOne(IotDataSinkDO::getName, name); + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java index d1cf60e206..025d61390e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java @@ -65,10 +65,12 @@ public interface ErrorCodeConstants { // ========== IoT 数据流转规则 1-050-010-000 ========== ErrorCode DATA_RULE_NOT_EXISTS = new ErrorCode(1_050_010_000, "数据流转规则不存在"); + ErrorCode DATA_RULE_NAME_EXISTS = new ErrorCode(1_050_010_001, "数据流转规则名称已存在"); // ========== IoT 数据流转目的 1-050-011-000 ========== ErrorCode DATA_SINK_NOT_EXISTS = new ErrorCode(1_050_011_000, "数据桥梁不存在"); ErrorCode DATA_SINK_DELETE_FAIL_USED_BY_RULE = new ErrorCode(1_050_011_001, "数据流转目的正在被数据流转规则使用,无法删除"); + ErrorCode DATA_SINK_NAME_EXISTS = new ErrorCode(1_050_011_002, "数据流转目的名称已存在"); // ========== IoT 场景联动 1-050-012-000 ========== ErrorCode RULE_SCENE_NOT_EXISTS = new ErrorCode(1_050_012_000, "场景联动不存在"); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java index 45a557db61..440fab5f53 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java @@ -16,8 +16,8 @@ import java.util.Arrays; public enum IotDataSinkTypeEnum implements ArrayValuable { HTTP(1, "HTTP"), - TCP(2, "TCP"), // TODO @puhui999:待实现; - WEBSOCKET(3, "WebSocket"), // TODO @puhui999:待实现; + TCP(2, "TCP"), + WEBSOCKET(3, "WebSocket"), MQTT(10, "MQTT"), // TODO 待实现; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java index 7c1bdc8df5..275ad4b065 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java @@ -382,7 +382,7 @@ public class IotDeviceServiceImpl implements IotDeviceService { return; } // 2.2.2 如果存在,判断是否允许更新 - if (updateSupport) { + if (!updateSupport) { throw exception(DEVICE_KEY_EXISTS); } updateDevice(new IotDeviceSaveReqVO().setId(existDevice.getId()) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/property/IotDevicePropertyServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/property/IotDevicePropertyServiceImpl.java index 4acd844bd9..8c3477313f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/property/IotDevicePropertyServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/property/IotDevicePropertyServiceImpl.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.iot.service.device.property; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.convert.Convert; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.StrUtil; @@ -145,6 +146,12 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { IotDataSpecsDataTypeEnum.STRUCT.getDataType(), IotDataSpecsDataTypeEnum.ARRAY.getDataType())) { // 特殊:STRUCT 和 ARRAY 类型,在 TDengine 里,有没对应数据类型,只能通过 JSON 来存储 properties.put((String) key, JsonUtils.toJsonString(value)); + } else if (IotDataSpecsDataTypeEnum.DOUBLE.getDataType().equals(thingModel.getProperty().getDataType())) { + properties.put((String) key, Convert.toDouble(value)); + } else if (IotDataSpecsDataTypeEnum.FLOAT.getDataType().equals(thingModel.getProperty().getDataType())) { + properties.put((String) key, Convert.toFloat(value)); + } else if (IotDataSpecsDataTypeEnum.BOOL.getDataType().equals(thingModel.getProperty().getDataType())) { + properties.put((String) key, Convert.toByte(value)); } else { properties.put((String) key, value); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleServiceImpl.java index 3301315731..8c4d74ab07 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataRuleServiceImpl.java @@ -32,6 +32,7 @@ import java.util.*; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; +import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_RULE_NAME_EXISTS; import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_RULE_NOT_EXISTS; /** @@ -62,6 +63,8 @@ public class IotDataRuleServiceImpl implements IotDataRuleService { @Override @CacheEvict(value = RedisKeyConstants.DATA_RULE_LIST, allEntries = true) public Long createDataRule(IotDataRuleSaveReqVO createReqVO) { + // 校验名称唯一 + validateDataRuleNameUnique(null, createReqVO.getName()); // 校验数据源配置和数据目的 validateDataRuleConfig(createReqVO); // 新增 @@ -75,6 +78,8 @@ public class IotDataRuleServiceImpl implements IotDataRuleService { public void updateDataRule(IotDataRuleSaveReqVO updateReqVO) { // 校验存在 validateDataRuleExists(updateReqVO.getId()); + // 校验名称唯一 + validateDataRuleNameUnique(updateReqVO.getId(), updateReqVO.getName()); // 校验数据源配置和数据目的 validateDataRuleConfig(updateReqVO); @@ -98,6 +103,29 @@ public class IotDataRuleServiceImpl implements IotDataRuleService { } } + /** + * 校验数据流转规则名称唯一性 + * + * @param id 数据流转规则编号(用于更新时排除自身) + * @param name 数据流转规则名称 + */ + private void validateDataRuleNameUnique(Long id, String name) { + if (StrUtil.isBlank(name)) { + return; + } + IotDataRuleDO dataRule = dataRuleMapper.selectByName(name); + if (dataRule == null) { + return; + } + // 如果 id 为空,说明不用比较是否为相同 id 的规则 + if (id == null) { + throw exception(DATA_RULE_NAME_EXISTS); + } + if (!dataRule.getId().equals(id)) { + throw exception(DATA_RULE_NAME_EXISTS); + } + } + /** * 校验数据流转规则配置 * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkServiceImpl.java index 53453c9896..15d7e99b1c 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/IotDataSinkServiceImpl.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.iot.service.rule.data; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.common.util.object.BeanUtils; import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink.IotDataSinkPageReqVO; @@ -19,6 +20,7 @@ import java.util.List; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_DELETE_FAIL_USED_BY_RULE; +import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_NAME_EXISTS; import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_NOT_EXISTS; /** @@ -39,6 +41,9 @@ public class IotDataSinkServiceImpl implements IotDataSinkService { @Override public Long createDataSink(IotDataSinkSaveReqVO createReqVO) { + // 校验名称唯一 + validateDataSinkNameUnique(null, createReqVO.getName()); + // 新增 IotDataSinkDO dataBridge = BeanUtils.toBean(createReqVO, IotDataSinkDO.class); dataSinkMapper.insert(dataBridge); return dataBridge.getId(); @@ -48,6 +53,8 @@ public class IotDataSinkServiceImpl implements IotDataSinkService { public void updateDataSink(IotDataSinkSaveReqVO updateReqVO) { // 校验存在 validateDataBridgeExists(updateReqVO.getId()); + // 校验名称唯一 + validateDataSinkNameUnique(updateReqVO.getId(), updateReqVO.getName()); // 更新 IotDataSinkDO updateObj = BeanUtils.toBean(updateReqVO, IotDataSinkDO.class); dataSinkMapper.updateById(updateObj); @@ -71,6 +78,29 @@ public class IotDataSinkServiceImpl implements IotDataSinkService { } } + /** + * 校验数据流转目的名称唯一性 + * + * @param id 数据流转目的编号(用于更新时排除自身) + * @param name 数据流转目的名称 + */ + private void validateDataSinkNameUnique(Long id, String name) { + if (StrUtil.isBlank(name)) { + return; + } + IotDataSinkDO dataSink = dataSinkMapper.selectByName(name); + if (dataSink == null) { + return; + } + // 如果 id 为空,说明不用比较是否为相同 id 的目的 + if (id == null) { + throw exception(DATA_SINK_NAME_EXISTS); + } + if (!dataSink.getId().equals(id)) { + throw exception(DATA_SINK_NAME_EXISTS); + } + } + @Override public IotDataSinkDO getDataSink(Long id) { return dataSinkMapper.selectById(id); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java index 4db6dc205a..53a3b71480 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java @@ -7,8 +7,6 @@ import cn.iocoder.yudao.module.iot.service.rule.data.action.tcp.IotTcpClient; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.time.Duration; - /** * TCP 的 {@link IotDataRuleAction} 实现类 *

@@ -23,9 +21,6 @@ import java.time.Duration; public class IotTcpDataRuleAction extends IotDataRuleCacheableAction { - private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(5); - private static final Duration SEND_TIMEOUT = Duration.ofSeconds(10); - @Override public Integer getType() { return IotDataSinkTypeEnum.TCP.getType(); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java new file mode 100644 index 0000000000..c0445df906 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java @@ -0,0 +1,85 @@ +package cn.iocoder.yudao.module.iot.service.rule.data.action; + +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum; +import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * WebSocket 的 {@link IotDataRuleAction} 实现类 + *

+ * 负责将设备消息发送到外部 WebSocket 服务器 + * 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式 + * 使用连接池管理 WebSocket 连接,提高性能和资源利用率 + * + * @author HUIHUI + */ +@Component +@Slf4j +public class IotWebSocketDataRuleAction extends + IotDataRuleCacheableAction { + + @Override + public Integer getType() { + return IotDataSinkTypeEnum.WEBSOCKET.getType(); + } + + @Override + protected IotWebSocketClient initProducer(IotDataSinkWebSocketConfig config) throws Exception { + // 1. 参数校验 + if (StrUtil.isBlank(config.getServerUrl())) { + throw new IllegalArgumentException("WebSocket 服务器地址不能为空"); + } + if (!StrUtil.startWithAny(config.getServerUrl(), "ws://", "wss://")) { + throw new IllegalArgumentException("WebSocket 服务器地址必须以 ws:// 或 wss:// 开头"); + } + + // 2.1 创建 WebSocket 客户端 + IotWebSocketClient webSocketClient = new IotWebSocketClient( + config.getServerUrl(), + config.getConnectTimeoutMs(), + config.getSendTimeoutMs(), + config.getDataFormat() + ); + // 2.2 连接服务器 + webSocketClient.connect(); + log.info("[initProducer][WebSocket 客户端创建并连接成功,服务器: {},数据格式: {}]", + config.getServerUrl(), config.getDataFormat()); + return webSocketClient; + } + + @Override + protected void closeProducer(IotWebSocketClient producer) throws Exception { + if (producer != null) { + producer.close(); + } + } + + @Override + protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception { + try { + // 1.1 获取或创建 WebSocket 客户端 + // TODO @puhui999:需要加锁,保证必须连接上; + IotWebSocketClient webSocketClient = getProducer(config); + // 1.2 检查连接状态,如果断开则重新连接 + if (!webSocketClient.isConnected()) { + log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl()); + webSocketClient.connect(); + } + + // 2.1 发送消息 + webSocketClient.sendMessage(message); + // 2.2 记录发送成功日志 + log.info("[execute][message({}) config({}) 发送成功,WebSocket 服务器: {}]", + message, config, config.getServerUrl()); + } catch (Exception e) { + log.error("[execute][message({}) config({}) 发送失败,WebSocket 服务器: {}]", + message, config, config.getServerUrl(), e); + throw e; + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java index 1618532a4a..15b57b5405 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig; import lombok.extern.slf4j.Slf4j; import javax.net.ssl.SSLSocketFactory; @@ -30,6 +31,7 @@ public class IotTcpClient { private final Integer connectTimeoutMs; private final Integer readTimeoutMs; private final Boolean ssl; + // TODO @puhui999:sslCertPath 是不是没在用? private final String sslCertPath; private final String dataFormat; @@ -38,16 +40,16 @@ public class IotTcpClient { private BufferedReader reader; private final AtomicBoolean connected = new AtomicBoolean(false); - // TODO @puhui999:default 值,IotDataSinkTcpConfig.java 枚举起来哈; public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs, Boolean ssl, String sslCertPath, String dataFormat) { this.host = host; this.port = port; - this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : 5000; - this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : 10000; - this.ssl = ssl != null ? ssl : false; + this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkTcpConfig.DEFAULT_CONNECT_TIMEOUT_MS; + this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : IotDataSinkTcpConfig.DEFAULT_READ_TIMEOUT_MS; + this.ssl = ssl != null ? ssl : IotDataSinkTcpConfig.DEFAULT_SSL; this.sslCertPath = sslCertPath; - this.dataFormat = dataFormat != null ? dataFormat : "JSON"; + // TODO @puhui999:可以使用 StrUtil.defaultIfBlank 方法简化 + this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT; } /** @@ -99,9 +101,8 @@ public class IotTcpClient { } try { - // TODO @puhui999:枚举值 String messageData; - if ("JSON".equalsIgnoreCase(dataFormat)) { + if (IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT.equalsIgnoreCase(dataFormat)) { // JSON 格式 messageData = JsonUtils.toJsonString(message); } else { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java new file mode 100644 index 0000000000..2f55d6ee74 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java @@ -0,0 +1,177 @@ +package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket; + +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig; +import lombok.extern.slf4j.Slf4j; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * IoT WebSocket 客户端 + *

+ * 负责与外部 WebSocket 服务器建立连接并发送设备消息 + * 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式 + * 基于 Java 11+ 内置的 java.net.http.WebSocket 实现 + * + * @author HUIHUI + */ +@Slf4j +public class IotWebSocketClient implements WebSocket.Listener { + + private final String serverUrl; + private final Integer connectTimeoutMs; + private final Integer sendTimeoutMs; + private final String dataFormat; + + private WebSocket webSocket; + private final AtomicBoolean connected = new AtomicBoolean(false); + private final StringBuilder messageBuffer = new StringBuilder(); + + public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) { + this.serverUrl = serverUrl; + this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_CONNECT_TIMEOUT_MS; + this.sendTimeoutMs = sendTimeoutMs != null ? sendTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_SEND_TIMEOUT_MS; + this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT; + } + + /** + * 连接到 WebSocket 服务器 + */ + @SuppressWarnings("resource") + public void connect() throws Exception { + if (connected.get()) { + log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]"); + return; + } + + try { + HttpClient httpClient = HttpClient.newBuilder() + .connectTimeout(Duration.ofMillis(connectTimeoutMs)) + .build(); + + CompletableFuture future = httpClient.newWebSocketBuilder() + .connectTimeout(Duration.ofMillis(connectTimeoutMs)) + .buildAsync(URI.create(serverUrl), this); + + // 等待连接完成 + webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS); + connected.set(true); + log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl); + } catch (Exception e) { + close(); + log.error("[connect][WebSocket 客户端连接失败,服务器地址: {}]", serverUrl, e); + throw e; + } + } + + @Override + public void onOpen(WebSocket webSocket) { + log.debug("[onOpen][WebSocket 连接已打开]"); + webSocket.request(1); + } + + @Override + public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { + messageBuffer.append(data); + if (last) { + log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer); + messageBuffer.setLength(0); + } + webSocket.request(1); + return null; + } + + @Override + public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { + connected.set(false); + log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason); + return null; + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + connected.set(false); + log.error("[onError][WebSocket 发生错误]", error); + } + + /** + * 发送设备消息 + * + * @param message 设备消息 + * @throws Exception 发送异常 + */ + public void sendMessage(IotDeviceMessage message) throws Exception { + if (!connected.get() || webSocket == null) { + throw new IllegalStateException("WebSocket 客户端未连接"); + } + + try { + String messageData; + if (IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT.equalsIgnoreCase(dataFormat)) { + messageData = JsonUtils.toJsonString(message); + } else { + messageData = message.toString(); + } + + // 发送消息并等待完成 + CompletableFuture future = webSocket.sendText(messageData, true); + future.get(sendTimeoutMs, TimeUnit.MILLISECONDS); + log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]", + message.getDeviceId(), messageData.length()); + } catch (Exception e) { + log.error("[sendMessage][发送消息失败,设备 ID: {}]", message.getDeviceId(), e); + throw e; + } + } + + /** + * 关闭连接 + */ + public void close() { + if (!connected.get() && webSocket == null) { + return; + } + + try { + if (webSocket != null) { + webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭") + .orTimeout(5, TimeUnit.SECONDS) + .exceptionally(e -> { + log.warn("[close][发送关闭帧失败]", e); + return null; + }); + } + connected.set(false); + log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl); + } catch (Exception e) { + log.error("[close][关闭 WebSocket 客户端连接异常]", e); + } + } + + /** + * 检查连接状态 + * + * @return 是否已连接 + */ + public boolean isConnected() { + return connected.get() && webSocket != null; + } + + @Override + public String toString() { + return "IotWebSocketClient{" + + "serverUrl='" + serverUrl + '\'' + + ", dataFormat='" + dataFormat + '\'' + + ", connected=" + connected.get() + + '}'; + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java index 105edf7e0e..f96bc9f450 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java @@ -23,13 +23,14 @@ import cn.iocoder.yudao.module.iot.service.product.IotProductService; import cn.iocoder.yudao.module.iot.service.rule.scene.action.IotSceneRuleAction; import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherManager; import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotSceneRuleTimerHandler; +import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; -import javax.annotation.Resource; +import java.time.LocalDateTime; import java.util.Collection; import java.util.List; @@ -392,9 +393,25 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService { } }); }); + + // 3. 更新最后触发时间 + updateLastTriggerTime(sceneRule.getId()); }); } + /** + * 更新规则场景的最后触发时间 + * + * @param id 规则场景编号 + */ + private void updateLastTriggerTime(Long id) { + try { + sceneRuleMapper.updateById(new IotSceneRuleDO().setId(id).setLastTriggerTime(LocalDateTime.now())); + } catch (Exception e) { + log.error("[updateLastTriggerTime][规则场景编号({}) 更新最后触发时间异常]", id, e); + } + } + private IotSceneRuleServiceImpl getSelf() { return SpringUtil.getBean(IotSceneRuleServiceImpl.class); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java index 27cb02a1a5..d653c9c42e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java @@ -36,11 +36,12 @@ public class IotDevicePropertyPostTriggerMatcher implements IotSceneRuleTriggerM return false; } - // 1.3 检查标识符是否匹配 - String messageIdentifier = IotDeviceMessageUtils.getIdentifier(message); - if (!IotSceneRuleMatcherHelper.isIdentifierMatched(trigger.getIdentifier(), messageIdentifier)) { - IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "标识符不匹配,期望: " + - trigger.getIdentifier() + ", 实际: " + messageIdentifier); + // 1.3 检查消息中是否包含触发器指定的属性标识符 + // 注意:属性上报可能同时上报多个属性,所以需要判断 trigger.getIdentifier() 是否在 message 的 params 中 + // TODO @puhui999:可以考虑 notXXX 方法,简化代码(尽量取反) + if (!IotDeviceMessageUtils.containsIdentifier(message, trigger.getIdentifier())) { + IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中不包含属性: " + + trigger.getIdentifier()); return false; } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java index 65165425c8..5c1ac26005 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java @@ -5,6 +5,7 @@ import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.system.SystemUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -69,6 +70,55 @@ public class IotDeviceMessageUtils { return null; } + /** + * 判断消息中是否包含指定的标识符 + * + * 对于不同消息类型的处理: + * - EVENT_POST/SERVICE_INVOKE:检查 params.identifier 是否匹配 + * - STATE_UPDATE:检查 params.state 是否匹配 + * - PROPERTY_POST:检查 params 中是否包含该属性 key + * + * @param message 消息 + * @param identifier 要检查的标识符 + * @return 是否包含 + */ + public static boolean containsIdentifier(IotDeviceMessage message, String identifier) { + if (message.getParams() == null || StrUtil.isBlank(identifier)) { + return false; + } + // EVENT_POST / SERVICE_INVOKE / STATE_UPDATE:使用原有逻辑 + String messageIdentifier = getIdentifier(message); + if (messageIdentifier != null) { + return identifier.equals(messageIdentifier); + } + // PROPERTY_POST:检查 params 中是否包含该属性 key + if (StrUtil.equals(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())) { + Map params = parseParamsToMap(message.getParams()); + return params != null && params.containsKey(identifier); + } + return false; + } + + /** + * 将 params 解析为 Map + * + * @param params 参数(可能是 Map 或 JSON 字符串) + * @return Map,解析失败返回 null + */ + @SuppressWarnings("unchecked") + private static Map parseParamsToMap(Object params) { + if (params instanceof Map) { + return (Map) params; + } + if (params instanceof String) { + try { + return JsonUtils.parseObject((String) params, Map.class); + } catch (Exception ignored) { + } + } + return null; + } + /** * 从设备消息中提取指定标识符的属性值 * - 支持多种消息格式和属性值提取策略 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 4b9c3af32c..3e573efdde 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -10,6 +10,10 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscr import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDownstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; @@ -17,6 +21,7 @@ import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -55,20 +60,20 @@ public class IotGatewayConfiguration { @Slf4j public static class EmqxProtocolConfiguration { - @Bean(destroyMethod = "close") + @Bean(name = "emqxVertx", destroyMethod = "close") public Vertx emqxVertx() { return Vertx.vertx(); } @Bean public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties, - Vertx emqxVertx) { + @Qualifier("emqxVertx") Vertx emqxVertx) { return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx); } @Bean public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties, - Vertx emqxVertx) { + @Qualifier("emqxVertx") Vertx emqxVertx) { return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx); } @@ -87,7 +92,7 @@ public class IotGatewayConfiguration { @Slf4j public static class TcpProtocolConfiguration { - @Bean(destroyMethod = "close") + @Bean(name = "tcpVertx", destroyMethod = "close") public Vertx tcpVertx() { return Vertx.vertx(); } @@ -97,7 +102,7 @@ public class IotGatewayConfiguration { IotDeviceService deviceService, IotDeviceMessageService messageService, IotTcpConnectionManager connectionManager, - Vertx tcpVertx) { + @Qualifier("tcpVertx") Vertx tcpVertx) { return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(), deviceService, messageService, connectionManager, tcpVertx); } @@ -122,7 +127,7 @@ public class IotGatewayConfiguration { @Slf4j public static class MqttProtocolConfiguration { - @Bean(destroyMethod = "close") + @Bean(name = "mqttVertx", destroyMethod = "close") public Vertx mqttVertx() { return Vertx.vertx(); } @@ -131,7 +136,7 @@ public class IotGatewayConfiguration { public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties, IotDeviceMessageService messageService, IotMqttConnectionManager connectionManager, - Vertx mqttVertx) { + @Qualifier("mqttVertx") Vertx mqttVertx) { return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(), messageService, connectionManager, mqttVertx); } @@ -151,4 +156,42 @@ public class IotGatewayConfiguration { } + /** + * IoT 网关 MQTT WebSocket 协议配置类 + */ + @Configuration + @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.mqtt-ws", name = "enabled", havingValue = "true") + @Slf4j + public static class MqttWsProtocolConfiguration { + + @Bean(name = "mqttWsVertx", destroyMethod = "close") + public Vertx mqttWsVertx() { + return Vertx.vertx(); + } + + @Bean + public IotMqttWsUpstreamProtocol iotMqttWsUpstreamProtocol(IotGatewayProperties gatewayProperties, + IotDeviceMessageService messageService, + IotMqttWsConnectionManager connectionManager, + @Qualifier("mqttWsVertx") Vertx mqttWsVertx) { + return new IotMqttWsUpstreamProtocol(gatewayProperties.getProtocol().getMqttWs(), + messageService, connectionManager, mqttWsVertx); + } + + @Bean + public IotMqttWsDownstreamHandler iotMqttWsDownstreamHandler(IotDeviceMessageService messageService, + IotDeviceService deviceService, + IotMqttWsConnectionManager connectionManager) { + return new IotMqttWsDownstreamHandler(messageService, deviceService, connectionManager); + } + + @Bean + public IotMqttWsDownstreamSubscriber iotMqttWsDownstreamSubscriber(IotMqttWsUpstreamProtocol mqttWsUpstreamProtocol, + IotMqttWsDownstreamHandler downstreamHandler, + IotMessageBus messageBus) { + return new IotMqttWsDownstreamSubscriber(mqttWsUpstreamProtocol, downstreamHandler, messageBus); + } + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 1a2bf82a1e..966dae5262 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -88,6 +88,11 @@ public class IotGatewayProperties { */ private MqttProperties mqtt; + /** + * MQTT WebSocket 组件配置 + */ + private MqttWsProperties mqttWs; + } @Data @@ -402,4 +407,100 @@ public class IotGatewayProperties { } + @Data + public static class MqttWsProperties { + + /** + * 是否开启 + */ + @NotNull(message = "是否开启不能为空") + private Boolean enabled; + + /** + * WebSocket 服务器端口(默认:8083) + */ + private Integer port = 8083; + + /** + * WebSocket 路径(默认:/mqtt) + */ + @NotEmpty(message = "WebSocket 路径不能为空") + private String path = "/mqtt"; + + /** + * 最大消息大小(字节) + */ + private Integer maxMessageSize = 8192; + + /** + * 连接超时时间(秒) + */ + private Integer connectTimeoutSeconds = 60; + + /** + * 保持连接超时时间(秒) + */ + private Integer keepAliveTimeoutSeconds = 300; + + /** + * 是否启用 SSL(wss://) + */ + private Boolean sslEnabled = false; + + /** + * SSL 配置 + */ + private SslOptions sslOptions = new SslOptions(); + + /** + * WebSocket 子协议(通常为 "mqtt" 或 "mqttv3.1") + */ + @NotEmpty(message = "WebSocket 子协议不能为空") + private String subProtocol = "mqtt"; + + /** + * 最大帧大小(字节) + */ + private Integer maxFrameSize = 65536; + + /** + * SSL 配置选项 + */ + @Data + public static class SslOptions { + + /** + * 密钥证书选项 + */ + private io.vertx.core.net.KeyCertOptions keyCertOptions; + + /** + * 信任选项 + */ + private io.vertx.core.net.TrustOptions trustOptions; + + /** + * SSL 证书路径 + */ + private String certPath; + + /** + * SSL 私钥路径 + */ + private String keyPath; + + /** + * 信任存储路径 + */ + private String trustStorePath; + + /** + * 信任存储密码 + */ + private String trustStorePassword; + + } + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsDownstreamSubscriber.java new file mode 100644 index 0000000000..302824d6df --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsDownstreamSubscriber.java @@ -0,0 +1,79 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws; + +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDownstreamHandler; +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT MQTT WebSocket 下行消息订阅器 + *

+ * 订阅消息总线的设备下行消息,并通过 WebSocket 发送到设备 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttWsDownstreamSubscriber implements IotMessageSubscriber { + + private final IotMqttWsUpstreamProtocol upstreamProtocol; + private final IotMqttWsDownstreamHandler downstreamHandler; + private final IotMessageBus messageBus; + + public IotMqttWsDownstreamSubscriber(IotMqttWsUpstreamProtocol upstreamProtocol, + IotMqttWsDownstreamHandler downstreamHandler, + IotMessageBus messageBus) { + this.upstreamProtocol = upstreamProtocol; + this.downstreamHandler = downstreamHandler; + this.messageBus = messageBus; + } + + @PostConstruct + public void init() { + messageBus.register(this); + log.info("[init][MQTT WebSocket 下行消息订阅器已启动,topic: {}]", getTopic()); + } + + @Override + public String getTopic() { + return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(upstreamProtocol.getServerId()); + } + + @Override + public String getGroup() { + // 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group + return getTopic(); + } + + @Override + public void onMessage(IotDeviceMessage message) { + log.debug("[onMessage][收到下行消息,deviceId: {},method: {}]", + message.getDeviceId(), message.getMethod()); + try { + // 1. 校验 + String method = message.getMethod(); + if (StrUtil.isBlank(method)) { + log.warn("[onMessage][消息方法为空,deviceId: {}]", message.getDeviceId()); + return; + } + + // 2. 委托给下行处理器处理业务逻辑 + boolean success = downstreamHandler.handleDownstreamMessage(message); + if (success) { + log.debug("[onMessage][下行消息处理成功,deviceId: {},method: {}]", + message.getDeviceId(), message.getMethod()); + } else { + log.warn("[onMessage][下行消息处理失败,deviceId: {},method: {}]", + message.getDeviceId(), message.getMethod()); + } + } catch (Exception e) { + log.error("[onMessage][处理下行消息失败,deviceId: {},method: {}]", + message.getDeviceId(), message.getMethod(), e); + } + } + +} + diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsUpstreamProtocol.java new file mode 100644 index 0000000000..6944d47dad --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/IotMqttWsUpstreamProtocol.java @@ -0,0 +1,146 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws; + +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.ServerWebSocket; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT WebSocket 协议:接收设备上行消息 + *

+ * 基于 Vert.x 实现 MQTT over WebSocket 服务端,支持: + * - 标准 MQTT 3.1.1 协议 + * - WebSocket 协议升级 + * - SSL/TLS 加密(wss://) + * - 设备认证与连接管理 + * - QoS 0/1/2 消息质量保证 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttWsUpstreamProtocol { + + private final IotGatewayProperties.MqttWsProperties mqttWsProperties; + + private final IotDeviceMessageService messageService; + + private final IotMqttWsConnectionManager connectionManager; + + private final Vertx vertx; + + @Getter + private final String serverId; + + private HttpServer httpServer; + + public IotMqttWsUpstreamProtocol(IotGatewayProperties.MqttWsProperties mqttWsProperties, + IotDeviceMessageService messageService, + IotMqttWsConnectionManager connectionManager, + Vertx vertx) { + this.mqttWsProperties = mqttWsProperties; + this.messageService = messageService; + this.connectionManager = connectionManager; + this.vertx = vertx; + this.serverId = IotDeviceMessageUtils.generateServerId(mqttWsProperties.getPort()); + } + + @PostConstruct + public void start() { + // 创建 HTTP 服务器选项 + HttpServerOptions options = new HttpServerOptions() + .setPort(mqttWsProperties.getPort()) + .setIdleTimeout(mqttWsProperties.getKeepAliveTimeoutSeconds()) + .setMaxWebSocketFrameSize(mqttWsProperties.getMaxFrameSize()) + .setMaxWebSocketMessageSize(mqttWsProperties.getMaxMessageSize()) + // 配置 WebSocket 子协议支持 + .addWebSocketSubProtocol(mqttWsProperties.getSubProtocol()); + + // 配置 SSL(如果启用) + if (Boolean.TRUE.equals(mqttWsProperties.getSslEnabled())) { + options.setSsl(true) + .setKeyCertOptions(mqttWsProperties.getSslOptions().getKeyCertOptions()) + .setTrustOptions(mqttWsProperties.getSslOptions().getTrustOptions()); + log.info("[start][MQTT WebSocket 已启用 SSL/TLS (wss://)]"); + } + + // 创建 HTTP 服务器 + httpServer = vertx.createHttpServer(options); + + // 设置 WebSocket 处理器 + httpServer.webSocketHandler(this::handleWebSocketConnection); + + // 启动服务器 + try { + httpServer.listen().result(); + log.info("[start][IoT 网关 MQTT WebSocket 协议启动成功,端口: {},路径: {},支持子协议: {}]", + mqttWsProperties.getPort(), mqttWsProperties.getPath(), + "mqtt, mqttv3.1, " + mqttWsProperties.getSubProtocol()); + } catch (Exception e) { + log.error("[start][IoT 网关 MQTT WebSocket 协议启动失败]", e); + throw e; + } + } + + @PreDestroy + public void stop() { + if (httpServer != null) { + try { + // 关闭所有连接 + connectionManager.closeAllConnections(); + + // 关闭服务器 + httpServer.close().result(); + log.info("[stop][IoT 网关 MQTT WebSocket 协议已停止]"); + } catch (Exception e) { + log.error("[stop][IoT 网关 MQTT WebSocket 协议停止失败]", e); + } + } + } + + /** + * 处理 WebSocket 连接请求 + * + * @param socket WebSocket 连接 + */ + private void handleWebSocketConnection(ServerWebSocket socket) { + String path = socket.path(); + String subProtocol = socket.subProtocol(); + + log.info("[handleWebSocketConnection][收到 WebSocket 连接请求,path: {},subProtocol: {},remoteAddress: {}]", + path, subProtocol, socket.remoteAddress()); + + // 验证路径 + if (!mqttWsProperties.getPath().equals(path)) { + log.warn("[handleWebSocketConnection][WebSocket 路径不匹配,拒绝连接,path: {},期望: {}]", + path, mqttWsProperties.getPath()); + socket.close(); + return; + } + + // 验证子协议 + // Vert.x 已经自动进行了子协议协商,这里只需要验证是否为 MQTT 相关协议 + if (subProtocol != null && !subProtocol.startsWith("mqtt")) { + log.warn("[handleWebSocketConnection][WebSocket 子协议不支持,拒绝连接,subProtocol: {}]", subProtocol); + socket.close(); + return; + } + + log.info("[handleWebSocketConnection][WebSocket 连接已接受,remoteAddress: {},subProtocol: {}]", + socket.remoteAddress(), subProtocol); + + // 创建处理器并处理连接 + IotMqttWsUpstreamHandler handler = new IotMqttWsUpstreamHandler( + this, messageService, connectionManager); + handler.handle(socket); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/manager/IotMqttWsConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/manager/IotMqttWsConnectionManager.java new file mode 100644 index 0000000000..fee3e359c8 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/manager/IotMqttWsConnectionManager.java @@ -0,0 +1,259 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager; + +import cn.hutool.core.collection.CollUtil; +import io.vertx.core.http.ServerWebSocket; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * IoT MQTT WebSocket 连接管理器 + * + * @author 芋道源码 + */ +@Slf4j +@Component +public class IotMqttWsConnectionManager { + + /** + * 存储设备连接 + * Key: 设备标识(deviceKey) + * Value: WebSocket 连接 + */ + private final Map connections = new ConcurrentHashMap<>(); + + /** + * 存储设备标识与 Socket ID 的映射 + * Key: 设备标识(deviceKey) + * Value: Socket ID(UUID) + */ + private final Map deviceKeyToSocketId = new ConcurrentHashMap<>(); + + /** + * 存储 Socket ID 与设备标识的映射 + * Key: Socket ID(UUID) + * Value: 设备标识(deviceKey) + */ + private final Map socketIdToDeviceKey = new ConcurrentHashMap<>(); + + /** + * 存储设备订阅的主题 + * Key: 设备标识(deviceKey) + * Value: 订阅的主题集合 + */ + private final Map> deviceSubscriptions = new ConcurrentHashMap<>(); + + /** + * 添加连接 + * + * @param deviceKey 设备标识 + * @param socket WebSocket 连接 + * @param socketId Socket ID(UUID) + */ + public void addConnection(String deviceKey, ServerWebSocket socket, String socketId) { + connections.put(deviceKey, socket); + deviceKeyToSocketId.put(deviceKey, socketId); + socketIdToDeviceKey.put(socketId, deviceKey); + log.info("[addConnection][设备连接已添加,deviceKey: {},socketId: {},当前连接数: {}]", + deviceKey, socketId, connections.size()); + } + + /** + * 移除连接 + * + * @param deviceKey 设备标识 + */ + public void removeConnection(String deviceKey) { + ServerWebSocket socket = connections.remove(deviceKey); + String socketId = deviceKeyToSocketId.remove(deviceKey); + if (socketId != null) { + socketIdToDeviceKey.remove(socketId); + } + if (socket != null) { + log.info("[removeConnection][设备连接已移除,deviceKey: {},socketId: {},当前连接数: {}]", + deviceKey, socketId, connections.size()); + } + } + + /** + * 根据 Socket ID 移除连接 + * + * @param socketId WebSocket 文本框架 ID + */ + public void removeConnectionBySocketId(String socketId) { + String deviceKey = socketIdToDeviceKey.remove(socketId); + if (deviceKey != null) { + connections.remove(deviceKey); + log.info("[removeConnectionBySocketId][设备连接已移除,socketId: {},deviceKey: {},当前连接数: {}]", + socketId, deviceKey, connections.size()); + } + } + + /** + * 获取连接 + * + * @param deviceKey 设备标识 + * @return WebSocket 连接 + */ + public ServerWebSocket getConnection(String deviceKey) { + return connections.get(deviceKey); + } + + /** + * 根据 Socket ID 获取设备标识 + * + * @param socketId WebSocket 文本框架 ID + * @return 设备标识 + */ + public String getDeviceKeyBySocketId(String socketId) { + return socketIdToDeviceKey.get(socketId); + } + + /** + * 检查设备是否在线 + * + * @param deviceKey 设备标识 + * @return 是否在线 + */ + public boolean isOnline(String deviceKey) { + return connections.containsKey(deviceKey); + } + + /** + * 获取当前连接数 + * + * @return 连接数 + */ + public int getConnectionCount() { + return connections.size(); + } + + /** + * 关闭所有连接 + */ + public void closeAllConnections() { + connections.forEach((deviceKey, socket) -> { + try { + socket.close(); + log.info("[closeAllConnections][关闭设备连接,deviceKey: {}]", deviceKey); + } catch (Exception e) { + log.error("[closeAllConnections][关闭设备连接失败,deviceKey: {}]", deviceKey, e); + } + }); + connections.clear(); + deviceKeyToSocketId.clear(); + socketIdToDeviceKey.clear(); + deviceSubscriptions.clear(); + log.info("[closeAllConnections][所有连接已关闭]"); + } + + // ==================== 订阅管理方法 ==================== + + /** + * 添加订阅 + * + * @param deviceKey 设备标识 + * @param topic 订阅主题 + */ + public void addSubscription(String deviceKey, String topic) { + deviceSubscriptions.computeIfAbsent(deviceKey, k -> new CopyOnWriteArraySet<>()).add(topic); + log.debug("[addSubscription][设备订阅主题,deviceKey: {},topic: {}]", deviceKey, topic); + } + + /** + * 移除订阅 + * + * @param deviceKey 设备标识 + * @param topic 订阅主题 + */ + public void removeSubscription(String deviceKey, String topic) { + Set topics = deviceSubscriptions.get(deviceKey); + if (topics != null) { + topics.remove(topic); + log.debug("[removeSubscription][设备取消订阅,deviceKey: {},topic: {}]", deviceKey, topic); + } + } + + /** + * 检查设备是否订阅了指定主题 + * 支持 MQTT 通配符匹配(+ 和 #) + * + * @param deviceKey 设备标识 + * @param topic 发布主题 + * @return 是否匹配 + */ + public boolean isSubscribed(String deviceKey, String topic) { + Set subscriptions = deviceSubscriptions.get(deviceKey); + if (CollUtil.isEmpty(subscriptions)) { + return false; + } + + // 检查是否有匹配的订阅 + for (String subscription : subscriptions) { + if (topicMatches(subscription, topic)) { + return true; + } + } + return false; + } + + /** + * 获取设备的所有订阅 + * + * @param deviceKey 设备标识 + * @return 订阅主题集合 + */ + public Set getSubscriptions(String deviceKey) { + return deviceSubscriptions.get(deviceKey); + } + + // TODO @haohao:这个方法,是不是也可以考虑抽到 IotMqttTopicUtils 里面去哈;感觉更简洁一点? + /** + * MQTT 主题匹配 + * 支持通配符: + * - +:匹配单层主题 + * - #:匹配多层主题(必须在末尾) + * + * @param subscription 订阅主题(可能包含通配符) + * @param topic 发布主题(不包含通配符) + * @return 是否匹配 + */ + private boolean topicMatches(String subscription, String topic) { + // 完全匹配 + if (subscription.equals(topic)) { + return true; + } + + // 不包含通配符 + // TODO @haohao:这里要不要枚举下哈;+ # + if (!subscription.contains("+") && !subscription.contains("#")) { + return false; + } + + String[] subscriptionParts = subscription.split("/"); + String[] topicParts = topic.split("/"); + int i = 0; + for (; i < subscriptionParts.length && i < topicParts.length; i++) { + String subPart = subscriptionParts[i]; + String topicPart = topicParts[i]; + + // # 匹配剩余所有层级,且必须在末尾 + if (subPart.equals("#")) { + return i == subscriptionParts.length - 1; + } + + // 不是通配符且不匹配 + if (!subPart.equals("+") && !subPart.equals(topicPart)) { + return false; + } + } + + // 检查是否都匹配完 + return i == subscriptionParts.length && i == topicParts.length; + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/package-info.java new file mode 100644 index 0000000000..b9af4afe3a --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/package-info.java @@ -0,0 +1,15 @@ +/** + * IoT 网关 MQTT WebSocket 协议实现 + *

+ * 基于 Vert.x 实现 MQTT over WebSocket 服务端,支持: + * - 标准 MQTT 3.1.1 协议 + * - WebSocket 协议升级 + * - SSL/TLS 加密(wss://) + * - 设备认证与连接管理 + * - QoS 0/1/2 消息质量保证 + * - 双向消息通信(上行/下行) + * + * @author 芋道源码 + */ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws; + diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsDownstreamHandler.java new file mode 100644 index 0000000000..3aeb6c5c48 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsDownstreamHandler.java @@ -0,0 +1,221 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router; + +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.ServerWebSocket; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * IoT MQTT WebSocket 下行消息处理器 + *

+ * 处理从消息总线发送到设备的消息,包括: + * - 属性设置 + * - 服务调用 + * - 事件通知 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttWsDownstreamHandler { + + private final IotDeviceMessageService deviceMessageService; + + private final IotDeviceService deviceService; + + private final IotMqttWsConnectionManager connectionManager; + + /** + * 消息 ID 生成器(用于发布消息) + */ + private final AtomicInteger messageIdGenerator = new AtomicInteger(1); + + public IotMqttWsDownstreamHandler(IotDeviceMessageService deviceMessageService, + IotDeviceService deviceService, + IotMqttWsConnectionManager connectionManager) { + this.deviceMessageService = deviceMessageService; + this.deviceService = deviceService; + this.connectionManager = connectionManager; + } + + /** + * 处理下行消息 + * + * @param message 设备消息 + * @return 是否处理成功 + */ + public boolean handleDownstreamMessage(IotDeviceMessage message) { + try { + // 1. 基础校验 + if (message == null || message.getDeviceId() == null) { + log.warn("[handleDownstreamMessage][消息或设备 ID 为空,忽略处理]"); + return false; + } + + // 2. 获取设备信息 + IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId()); + if (deviceInfo == null) { + log.warn("[handleDownstreamMessage][设备不存在,设备 ID:{}]", message.getDeviceId()); + return false; + } + + // 3. 构建设备标识 + String deviceKey = deviceInfo.getProductKey() + ":" + deviceInfo.getDeviceName(); + + // 4. 检查设备是否在线 + if (!connectionManager.isOnline(deviceKey)) { + log.warn("[handleDownstreamMessage][设备离线,无法发送消息,deviceKey: {}]", deviceKey); + return false; + } + + // 5. 构建主题 + String topic = buildDownstreamTopic(message, deviceInfo); + if (StrUtil.isBlank(topic)) { + log.warn("[handleDownstreamMessage][主题构建失败,设备 ID:{},方法:{}]", + message.getDeviceId(), message.getMethod()); + return false; + } + + // 6. 检查设备是否订阅了该主题 + if (!connectionManager.isSubscribed(deviceKey, topic)) { + log.warn("[handleDownstreamMessage][设备未订阅该主题,deviceKey: {},topic: {}]", deviceKey, topic); + return false; + } + + // 8. 编码消息 + byte[] payload = deviceMessageService.encodeDeviceMessage(message, + deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + if (payload == null || payload.length == 0) { + log.warn("[handleDownstreamMessage][消息编码失败,设备 ID:{}]", message.getDeviceId()); + return false; + } + + // 9. 发送消息到设备 + return sendMessageToDevice(deviceKey, topic, payload, 1); + } catch (Exception e) { + if (message != null) { + log.error("[handleDownstreamMessage][处理下行消息异常,设备 ID:{},错误:{}]", + message.getDeviceId(), e.getMessage(), e); + } + return false; + } + } + + /** + * 构建下行消息主题 + * + * @param message 设备消息 + * @param deviceInfo 设备信息 + * @return 主题 + */ + private String buildDownstreamTopic(IotDeviceMessage message, IotDeviceRespDTO deviceInfo) { + String method = message.getMethod(); + if (StrUtil.isBlank(method)) { + return null; + } + + // 使用工具类构建主题,支持回复消息处理 + boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); + return IotMqttTopicUtils.buildTopicByMethod(method, deviceInfo.getProductKey(), + deviceInfo.getDeviceName(), isReply); + } + + /** + * 发送消息到设备 + * + * @param deviceKey 设备标识(productKey:deviceName) + * @param topic 主题 + * @param payload 消息内容 + * @param qos QoS 级别 + * @return 是否发送成功 + */ + private boolean sendMessageToDevice(String deviceKey, String topic, byte[] payload, int qos) { + // 获取设备连接 + ServerWebSocket socket = connectionManager.getConnection(deviceKey); + if (socket == null) { + log.warn("[sendMessageToDevice][设备未连接,deviceKey: {}]", deviceKey); + return false; + } + + try { + int messageId = qos > 0 ? generateMessageId() : 0; + + // 手动编码 MQTT PUBLISH 消息 + io.netty.buffer.ByteBuf byteBuf = io.netty.buffer.Unpooled.buffer(); + + // 固定头:消息类型(PUBLISH=3) + DUP(0) + QoS + RETAIN + int fixedHeaderByte1 = 0x30 | (qos << 1); // PUBLISH类型 + byteBuf.writeByte(fixedHeaderByte1); + + // 计算剩余长度 + int topicLength = topic.getBytes().length; + int remainingLength = 2 + topicLength + (qos > 0 ? 2 : 0) + payload.length; + + // 写入剩余长度(简化版本,假设小于 128 字节) + if (remainingLength < 128) { + byteBuf.writeByte(remainingLength); + } else { + // 处理大于 127 的情况 + int x = remainingLength; + do { + int encodedByte = x % 128; + x = x / 128; + if (x > 0) { + encodedByte = encodedByte | 128; + } + byteBuf.writeByte(encodedByte); + } while (x > 0); + } + + // 可变头:主题名称 + byteBuf.writeShort(topicLength); + byteBuf.writeBytes(topic.getBytes()); + + // 可变头:消息 ID(仅 QoS > 0 时) + if (qos > 0) { + byteBuf.writeShort(messageId); + } + + // 有效载荷 + byteBuf.writeBytes(payload); + + // 发送 + byte[] bytes = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(bytes); + byteBuf.release(); + socket.writeBinaryMessage(Buffer.buffer(bytes)); + + log.info("[sendMessageToDevice][消息已发送到设备,deviceKey: {},topic: {},qos: {},messageId: {}]", + deviceKey, topic, qos, messageId); + return true; + } catch (Exception e) { + log.error("[sendMessageToDevice][发送消息到设备失败,deviceKey: {},topic: {}]", deviceKey, topic, e); + return false; + } + } + + /** + * 生成消息 ID + * + * @return 消息 ID + */ + private int generateMessageId() { + int id = messageIdGenerator.getAndIncrement(); + // MQTT 消息 ID 范围是 1-65535 + // TODO @haohao:并发可能有问题; + if (id > 65535) { + messageIdGenerator.set(1); + return 1; + } + return id; + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsUpstreamHandler.java new file mode 100644 index 0000000000..d11d109502 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqttws/router/IotMqttWsUpstreamHandler.java @@ -0,0 +1,753 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router; + +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.mqtt.*; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.ServerWebSocket; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * IoT MQTT WebSocket 上行消息处理器 + *

+ * 处理来自设备的 MQTT 消息,包括: + * - CONNECT:设备连接认证 + * - PUBLISH:设备发布消息 + * - SUBSCRIBE:设备订阅主题 + * - UNSUBSCRIBE:设备取消订阅 + * - PINGREQ:心跳请求 + * - DISCONNECT:设备断开连接 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttWsUpstreamHandler { + + private final IotMqttWsUpstreamProtocol upstreamProtocol; + + private final IotDeviceCommonApi deviceApi; + + private final IotDeviceMessageService messageService; + + private final IotMqttWsConnectionManager connectionManager; + + /** + * 存储 WebSocket 连接到 Socket ID 的映射 + * Key: WebSocket 对象 + * Value: Socket ID(UUID) + */ + private final ConcurrentHashMap socketIdMap = new ConcurrentHashMap<>(); + + /** + * 存储 Socket ID 对应的设备信息 + * Key: Socket ID(UUID) + * Value: 设备信息 + */ + private final ConcurrentHashMap socketDeviceMap = new ConcurrentHashMap<>(); + + /** + * 存储设备的消息 ID 生成器(用于 QoS > 0 的消息) + */ + private final ConcurrentHashMap deviceMessageIdMap = new ConcurrentHashMap<>(); + + /** + * MQTT 解码通道(用于解析 WebSocket 中的 MQTT 二进制消息) + */ + private final ThreadLocal decoderChannelThreadLocal = ThreadLocal + .withInitial(() -> new EmbeddedChannel(new MqttDecoder())); + + /** + * MQTT 编码通道(用于编码 MQTT 响应消息) + */ + private final ThreadLocal encoderChannelThreadLocal = ThreadLocal + .withInitial(() -> new EmbeddedChannel(MqttEncoder.INSTANCE)); + + public IotMqttWsUpstreamHandler(IotMqttWsUpstreamProtocol upstreamProtocol, + IotDeviceMessageService messageService, + IotMqttWsConnectionManager connectionManager) { + this.upstreamProtocol = upstreamProtocol; + this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + this.messageService = messageService; + this.connectionManager = connectionManager; + } + + /** + * 处理 WebSocket 连接 + * + * @param socket WebSocket 连接 + */ + public void handle(ServerWebSocket socket) { + // 生成唯一的 Socket ID(因为 MQTT 使用二进制协议,textHandlerID() 会返回 null) + String socketId = IdUtil.simpleUUID(); + socketIdMap.put(socket, socketId); + + log.info("[handle][WebSocket 连接建立,socketId: {},remoteAddress: {}]", + socketId, socket.remoteAddress()); + + // 设置二进制数据处理器 + socket.binaryMessageHandler(buffer -> { + try { + handleMqttMessage(socket, buffer); + } catch (Exception e) { + log.error("[handle][处理 MQTT 消息异常,socketId: {}]", socketId, e); + socket.close(); + } + }); + + // 设置关闭处理器 + socket.closeHandler(v -> { + socketIdMap.remove(socket); + IotDeviceRespDTO device = socketDeviceMap.remove(socketId); + if (device != null) { + String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); + connectionManager.removeConnection(deviceKey); + deviceMessageIdMap.remove(deviceKey); + // 发送设备离线消息 + sendOfflineMessage(device); + log.info("[handle][WebSocket 连接关闭,deviceKey: {},socketId: {}]", deviceKey, socketId); + } + }); + + // 设置异常处理器 + socket.exceptionHandler(e -> { + log.error("[handle][WebSocket 连接异常,socketId: {}]", socketId, e); + socketIdMap.remove(socket); + IotDeviceRespDTO device = socketDeviceMap.remove(socketId); + if (device != null) { + String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); + connectionManager.removeConnection(deviceKey); + deviceMessageIdMap.remove(deviceKey); + } + socket.close(); + }); + } + + /** + * 处理 MQTT 消息 + * + * @param socket WebSocket 连接 + * @param buffer 消息缓冲区 + */ + private void handleMqttMessage(ServerWebSocket socket, Buffer buffer) { + String socketId = socketIdMap.get(socket); + ByteBuf byteBuf = Unpooled.wrappedBuffer(buffer.getBytes()); + + try { + // 使用 EmbeddedChannel 解码 MQTT 消息 + EmbeddedChannel decoderChannel = decoderChannelThreadLocal.get(); + decoderChannel.writeInbound(byteBuf.retain()); + + // 读取解码后的消息 + MqttMessage mqttMessage = decoderChannel.readInbound(); + if (mqttMessage == null) { + log.warn("[handleMqttMessage][MQTT 消息解码失败,socketId: {}]", socketId); + return; + } + + MqttMessageType messageType = mqttMessage.fixedHeader().messageType(); + log.debug("[handleMqttMessage][收到 MQTT 消息,类型: {},socketId: {}]", messageType, socketId); + + // 根据消息类型分发处理 + switch (messageType) { + case CONNECT: + handleConnect(socket, (MqttConnectMessage) mqttMessage); + break; + case PUBLISH: + handlePublish(socket, (MqttPublishMessage) mqttMessage); + break; + case PUBACK: + handlePubAck(socket, mqttMessage); + break; + case PUBREC: + handlePubRec(socket, mqttMessage); + break; + case PUBREL: + handlePubRel(socket, mqttMessage); + break; + case PUBCOMP: + handlePubComp(socket, mqttMessage); + break; + case SUBSCRIBE: + handleSubscribe(socket, (MqttSubscribeMessage) mqttMessage); + break; + case UNSUBSCRIBE: + handleUnsubscribe(socket, (MqttUnsubscribeMessage) mqttMessage); + break; + case PINGREQ: + handlePingReq(socket); + break; + case DISCONNECT: + handleDisconnect(socket); + break; + default: + log.warn("[handleMqttMessage][不支持的消息类型: {},socketId: {}]", messageType, socketId); + } + } catch (DecoderException e) { + log.error("[handleMqttMessage][MQTT 消息解码异常,socketId: {}]", socketId, e); + socket.close(); + } catch (Exception e) { + log.error("[handleMqttMessage][处理 MQTT 消息失败,socketId: {}]", socketId, e); + socket.close(); + } finally { + byteBuf.release(); + } + } + + /** + * 处理 CONNECT 消息(设备认证) + */ + private void handleConnect(ServerWebSocket socket, MqttConnectMessage message) { + String socketId = socketIdMap.get(socket); + try { + // 1. 解析 CONNECT 消息 + MqttConnectPayload payload = message.payload(); + String clientId = payload.clientIdentifier(); + String username = payload.userName(); + String password = payload.passwordInBytes() != null + ? new String(payload.passwordInBytes(), StandardCharsets.UTF_8) + : null; + + log.info("[handleConnect][收到 CONNECT 消息,clientId: {},username: {},socketId: {}]", + clientId, username, socketId); + + // 2. 设备认证 + IotDeviceRespDTO device = authenticateDevice(clientId, username, password); + if (device == null) { + log.warn("[handleConnect][设备认证失败,clientId: {},socketId: {}]", clientId, socketId); + sendConnAck(socket, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); + socket.close(); + return; + } + + // 3. 保存设备信息 + socketDeviceMap.put(socketId, device); + String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); + connectionManager.addConnection(deviceKey, socket, socketId); + deviceMessageIdMap.put(deviceKey, new AtomicInteger(1)); + + log.info("[handleConnect][设备认证成功,deviceId: {},deviceKey: {},socketId: {}]", + device.getId(), deviceKey, socketId); + + // 4. 发送 CONNACK + sendConnAck(socket, MqttConnectReturnCode.CONNECTION_ACCEPTED); + + // 5. 发送设备上线消息 + sendOnlineMessage(device); + } catch (Exception e) { + log.error("[handleConnect][处理 CONNECT 消息失败,socketId: {}]", socketId, e); + sendConnAck(socket, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE); + socket.close(); + } + } + + /** + * 处理 PUBLISH 消息(设备发布消息) + */ + private void handlePublish(ServerWebSocket socket, MqttPublishMessage message) { + String socketId = socketIdMap.get(socket); + IotDeviceRespDTO device = socketDeviceMap.get(socketId); + + if (device == null) { + log.warn("[handlePublish][设备未认证,socketId: {}]", socketId); + socket.close(); + return; + } + + try { + // 1. 解析 PUBLISH 消息 + MqttFixedHeader fixedHeader = message.fixedHeader(); + MqttPublishVariableHeader variableHeader = message.variableHeader(); + ByteBuf payload = message.payload(); + + String topic = variableHeader.topicName(); + int messageId = variableHeader.packetId(); + MqttQoS qos = fixedHeader.qosLevel(); + + log.debug("[handlePublish][收到 PUBLISH 消息,topic: {},messageId: {},QoS: {},deviceId: {}]", + topic, messageId, qos, device.getId()); + + // 2. 读取 payload + byte[] payloadBytes = new byte[payload.readableBytes()]; + payload.readBytes(payloadBytes); + + // 3. 解码并发送消息 + IotDeviceMessage deviceMessage = messageService.decodeDeviceMessage(payloadBytes, + device.getProductKey(), device.getDeviceName()); + if (deviceMessage != null) { + deviceMessage.setServerId(upstreamProtocol.getServerId()); + messageService.sendDeviceMessage(deviceMessage, device.getProductKey(), + device.getDeviceName(), upstreamProtocol.getServerId()); + log.info("[handlePublish][设备消息已发送,method: {},deviceId: {}]", + deviceMessage.getMethod(), device.getId()); + } + + // 4. 根据 QoS 级别发送相应的确认消息 + if (qos == MqttQoS.AT_LEAST_ONCE) { + // QoS 1:发送 PUBACK + sendPubAck(socket, messageId); + } else if (qos == MqttQoS.EXACTLY_ONCE) { + // QoS 2:发送 PUBREC + sendPubRec(socket, messageId); + } + // QoS 0 无需确认 + } catch (Exception e) { + log.error("[handlePublish][处理 PUBLISH 消息失败,deviceId: {}]", device.getId(), e); + } + } + + /** + * 处理 PUBACK 消息(QoS 1 确认) + */ + private void handlePubAck(ServerWebSocket socket, MqttMessage message) { + String socketId = socketIdMap.get(socket); + IotDeviceRespDTO device = socketDeviceMap.get(socketId); + if (device == null) { + log.warn("[handlePubAck][设备未认证,socketId: {}]", socketId); + socket.close(); + return; + } + + int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); + log.debug("[handlePubAck][收到 PUBACK,messageId: {},deviceId: {}]", messageId, device.getId()); + } + + /** + * 处理 PUBREC 消息(QoS 2 第一步确认) + */ + private void handlePubRec(ServerWebSocket socket, MqttMessage message) { + String socketId = socketIdMap.get(socket); + IotDeviceRespDTO device = socketDeviceMap.get(socketId); + if (device == null) { + log.warn("[handlePubRec][设备未认证,socketId: {}]", socketId); + socket.close(); + return; + } + + int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); + log.debug("[handlePubRec][收到 PUBREC,messageId: {},deviceId: {}]", messageId, device.getId()); + // 发送 PUBREL + sendPubRel(socket, messageId); + } + + /** + * 处理 PUBREL 消息(QoS 2 第二步) + */ + private void handlePubRel(ServerWebSocket socket, MqttMessage message) { + String socketId = socketIdMap.get(socket); + IotDeviceRespDTO device = socketDeviceMap.get(socketId); + + if (device == null) { + log.warn("[handlePubRel][设备未认证,socketId: {}]", socketId); + socket.close(); + return; + } + + int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); + log.debug("[handlePubRel][收到 PUBREL,messageId: {},deviceId: {}]", messageId, device.getId()); + // 发送 PUBCOMP + sendPubComp(socket, messageId); + } + + /** + * 处理 PUBCOMP 消息(QoS 2 完成确认) + */ + private void handlePubComp(ServerWebSocket socket, MqttMessage message) { + String socketId = socketIdMap.get(socket); + IotDeviceRespDTO device = socketDeviceMap.get(socketId); + + if (device == null) { + log.warn("[handlePubComp][设备未认证,socketId: {}]", socketId); + socket.close(); + return; + } + + int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); + log.debug("[handlePubComp][收到 PUBCOMP,messageId: {},deviceId: {}]", messageId, device.getId()); + } + + /** + * 处理 SUBSCRIBE 消息(设备订阅主题) + */ + private void handleSubscribe(ServerWebSocket socket, MqttSubscribeMessage message) { + String socketId = socketIdMap.get(socket); + IotDeviceRespDTO device = socketDeviceMap.get(socketId); + if (device == null) { + log.warn("[handleSubscribe][设备未认证,socketId: {}]", socketId); + socket.close(); + return; + } + + try { + // 1. 解析 SUBSCRIBE 消息 + int messageId = message.variableHeader().messageId(); + MqttSubscribePayload payload = message.payload(); + String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); + + log.info("[handleSubscribe][设备订阅请求,deviceKey: {},messageId: {},主题数量: {}]", + deviceKey, messageId, payload.topicSubscriptions().size()); + + // 2. 构建 QoS 列表并记录订阅信息 + int[] grantedQosList = new int[payload.topicSubscriptions().size()]; + for (int i = 0; i < payload.topicSubscriptions().size(); i++) { + MqttTopicSubscription subscription = payload.topicSubscriptions().get(i); + String topic = subscription.topicFilter(); + grantedQosList[i] = subscription.qualityOfService().value(); + + // 记录订阅信息到连接管理器 + connectionManager.addSubscription(deviceKey, topic); + + log.info("[handleSubscribe][订阅主题: {},QoS: {},deviceKey: {}]", + topic, subscription.qualityOfService(), deviceKey); + } + + // 3. 发送 SUBACK + sendSubAck(socket, messageId, grantedQosList); + } catch (Exception e) { + log.error("[handleSubscribe][处理 SUBSCRIBE 消息失败,deviceId: {}]", device.getId(), e); + } + } + + /** + * 处理 UNSUBSCRIBE 消息(设备取消订阅) + */ + private void handleUnsubscribe(ServerWebSocket socket, MqttUnsubscribeMessage message) { + String socketId = socketIdMap.get(socket); + IotDeviceRespDTO device = socketDeviceMap.get(socketId); + if (device == null) { + log.warn("[handleUnsubscribe][设备未认证,socketId: {}]", socketId); + socket.close(); + return; + } + + try { + // 1. 解析 UNSUBSCRIBE 消息 + int messageId = message.variableHeader().messageId(); + MqttUnsubscribePayload payload = message.payload(); + String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); + + log.info("[handleUnsubscribe][设备取消订阅,deviceKey: {},messageId: {},主题数量: {}]", + deviceKey, messageId, payload.topics().size()); + + // 2. 移除订阅信息 + for (String topic : payload.topics()) { + connectionManager.removeSubscription(deviceKey, topic); + log.info("[handleUnsubscribe][取消订阅主题: {},deviceKey: {}]", topic, deviceKey); + } + + // 3. 发送 UNSUBACK + sendUnsubAck(socket, messageId); + } catch (Exception e) { + log.error("[handleUnsubscribe][处理 UNSUBSCRIBE 消息失败,deviceId: {}]", device.getId(), e); + } + } + + /** + * 处理 PINGREQ 消息(心跳请求) + */ + private void handlePingReq(ServerWebSocket socket) { + String socketId = socketIdMap.get(socket); + IotDeviceRespDTO device = socketDeviceMap.get(socketId); + if (device == null) { + log.warn("[handlePingReq][设备未认证,socketId: {}]", socketId); + socket.close(); + return; + } + + log.debug("[handlePingReq][收到心跳请求,deviceId: {}]", device.getId()); + // 发送 PINGRESP + sendPingResp(socket); + } + + /** + * 处理 DISCONNECT 消息(设备断开连接) + */ + private void handleDisconnect(ServerWebSocket socket) { + String socketId = socketIdMap.get(socket); + IotDeviceRespDTO device = socketDeviceMap.remove(socketId); + if (device != null) { + String deviceKey = device.getProductKey() + ":" + device.getDeviceName(); + connectionManager.removeConnection(deviceKey); + deviceMessageIdMap.remove(deviceKey); + sendOfflineMessage(device); + log.info("[handleDisconnect][设备主动断开连接,deviceKey: {}]", deviceKey); + } + + socket.close(); + } + + // ==================== 设备认证和状态相关方法 ==================== + + /** + * 设备认证 + */ + private IotDeviceRespDTO authenticateDevice(String clientId, String username, String password) { + try { + // 1. 参数校验 + if (StrUtil.hasEmpty(clientId, username, password)) { + log.warn("[authenticateDevice][认证参数不完整,clientId: {},username: {}]", clientId, username); + return null; + } + + // 2. 构建认证参数并调用 API + IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO() + .setClientId(clientId) + .setUsername(username) + .setPassword(password); + + CommonResult authResult = deviceApi.authDevice(authParams); + if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) { + log.warn("[authenticateDevice][设备认证失败,clientId: {}]", clientId); + return null; + } + + // 3. 获取设备信息 + IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username); + if (deviceInfo == null) { + log.warn("[authenticateDevice][用户名格式不正确,username: {}]", username); + return null; + } + + IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO() + .setProductKey(deviceInfo.getProductKey()) + .setDeviceName(deviceInfo.getDeviceName()); + + CommonResult deviceResult = deviceApi.getDevice(getReqDTO); + if (!deviceResult.isSuccess() || deviceResult.getData() == null) { + log.warn("[authenticateDevice][获取设备信息失败,username: {}]", username); + return null; + } + + return deviceResult.getData(); + } catch (Exception e) { + log.error("[authenticateDevice][设备认证异常,clientId: {}]", clientId, e); + return null; + } + } + + /** + * 发送设备上线消息 + */ + private void sendOnlineMessage(IotDeviceRespDTO device) { + try { + IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); + messageService.sendDeviceMessage(onlineMessage, device.getProductKey(), + device.getDeviceName(), upstreamProtocol.getServerId()); + log.info("[sendOnlineMessage][设备上线,deviceId: {}]", device.getId()); + } catch (Exception e) { + log.error("[sendOnlineMessage][发送设备上线消息失败,deviceId: {}]", device.getId(), e); + } + } + + /** + * 发送设备离线消息 + */ + private void sendOfflineMessage(IotDeviceRespDTO device) { + try { + IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); + messageService.sendDeviceMessage(offlineMessage, device.getProductKey(), + device.getDeviceName(), upstreamProtocol.getServerId()); + log.info("[sendOfflineMessage][设备离线,deviceId: {}]", device.getId()); + } catch (Exception e) { + log.error("[sendOfflineMessage][发送设备离线消息失败,deviceId: {}]", device.getId(), e); + } + } + + // ==================== 发送响应消息的辅助方法 ==================== + + /** + * 发送 CONNACK 消息 + */ + private void sendConnAck(ServerWebSocket socket, MqttConnectReturnCode returnCode) { + try { + // 构建 CONNACK 消息 + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, false); + MqttConnAckMessage connAckMessage = new MqttConnAckMessage(fixedHeader, variableHeader); + + // 编码并发送 + sendMqttMessage(socket, connAckMessage); + log.debug("[sendConnAck][发送 CONNACK 消息,returnCode: {}]", returnCode); + } catch (Exception e) { + log.error("[sendConnAck][发送 CONNACK 消息失败]", e); + } + } + + /** + * 发送 PUBACK 消息(QoS 1 确认) + */ + private void sendPubAck(ServerWebSocket socket, int messageId) { + try { + // 构建 PUBACK 消息 + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); + MqttMessage pubAckMessage = new MqttMessage(fixedHeader, variableHeader); + + // 编码并发送 + sendMqttMessage(socket, pubAckMessage); + log.debug("[sendPubAck][发送 PUBACK 消息,messageId: {}]", messageId); + } catch (Exception e) { + log.error("[sendPubAck][发送 PUBACK 消息失败,messageId: {}]", messageId, e); + } + } + + /** + * 发送 PUBREC 消息(QoS 2 第一步确认) + */ + private void sendPubRec(ServerWebSocket socket, int messageId) { + try { + // 构建 PUBREC 消息 + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); + MqttMessage pubRecMessage = new MqttMessage(fixedHeader, variableHeader); + + // 编码并发送 + sendMqttMessage(socket, pubRecMessage); + log.debug("[sendPubRec][发送 PUBREC 消息,messageId: {}]", messageId); + } catch (Exception e) { + log.error("[sendPubRec][发送 PUBREC 消息失败,messageId: {}]", messageId, e); + } + } + + /** + * 发送 PUBREL 消息(QoS 2 第二步) + */ + private void sendPubRel(ServerWebSocket socket, int messageId) { + try { + // 构建 PUBREL 消息 + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); + MqttMessage pubRelMessage = new MqttMessage(fixedHeader, variableHeader); + + // 编码并发送 + sendMqttMessage(socket, pubRelMessage); + log.debug("[sendPubRel][发送 PUBREL 消息,messageId: {}]", messageId); + } catch (Exception e) { + log.error("[sendPubRel][发送 PUBREL 消息失败,messageId: {}]", messageId, e); + } + } + + /** + * 发送 PUBCOMP 消息(QoS 2 完成确认) + */ + private void sendPubComp(ServerWebSocket socket, int messageId) { + try { + // 构建 PUBCOMP 消息 + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); + MqttMessage pubCompMessage = new MqttMessage(fixedHeader, variableHeader); + + // 编码并发送 + sendMqttMessage(socket, pubCompMessage); + log.debug("[sendPubComp][发送 PUBCOMP 消息,messageId: {}]", messageId); + } catch (Exception e) { + log.error("[sendPubComp][发送 PUBCOMP 消息失败,messageId: {}]", messageId, e); + } + } + + /** + * 发送 SUBACK 消息 + */ + private void sendSubAck(ServerWebSocket socket, int messageId, int[] grantedQosList) { + try { + // 构建 SUBACK 消息 + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); + MqttSubAckPayload payload = new MqttSubAckPayload(grantedQosList); + MqttSubAckMessage subAckMessage = new MqttSubAckMessage(fixedHeader, variableHeader, payload); + + // 编码并发送 + sendMqttMessage(socket, subAckMessage); + log.debug("[sendSubAck][发送 SUBACK 消息,messageId: {},主题数量: {}]", messageId, grantedQosList.length); + } catch (Exception e) { + log.error("[sendSubAck][发送 SUBACK 消息失败,messageId: {}]", messageId, e); + } + } + + /** + * 发送 UNSUBACK 消息 + */ + private void sendUnsubAck(ServerWebSocket socket, int messageId) { + try { + // 构建 UNSUBACK 消息 + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); + MqttUnsubAckMessage unsubAckMessage = new MqttUnsubAckMessage(fixedHeader, variableHeader); + + // 编码并发送 + sendMqttMessage(socket, unsubAckMessage); + log.debug("[sendUnsubAck][发送 UNSUBACK 消息,messageId: {}]", messageId); + } catch (Exception e) { + log.error("[sendUnsubAck][发送 UNSUBACK 消息失败,messageId: {}]", messageId, e); + } + } + + /** + * 发送 PINGRESP 消息 + */ + private void sendPingResp(ServerWebSocket socket) { + try { + // 构建 PINGRESP 消息 + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessage pingRespMessage = new MqttMessage(fixedHeader); + + // 编码并发送 + sendMqttMessage(socket, pingRespMessage); + log.debug("[sendPingResp][发送 PINGRESP 消息]"); + } catch (Exception e) { + log.error("[sendPingResp][发送 PINGRESP 消息失败]", e); + } + } + + /** + * 发送 MQTT 消息到 WebSocket + */ + private void sendMqttMessage(ServerWebSocket socket, MqttMessage mqttMessage) { + ByteBuf byteBuf = null; + try { + // 使用 EmbeddedChannel 编码 MQTT 消息 + EmbeddedChannel encoderChannel = encoderChannelThreadLocal.get(); + encoderChannel.writeOutbound(mqttMessage); + + // 读取编码后的 ByteBuf + byteBuf = encoderChannel.readOutbound(); + if (byteBuf != null) { + byte[] bytes = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(bytes); + socket.writeBinaryMessage(Buffer.buffer(bytes)); + } + } finally { + if (byteBuf != null) { + byteBuf.release(); + } + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index 575eb8a390..d192c87d4a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -98,11 +98,24 @@ yudao: # 针对引入的 MQTT 组件的配置 # ==================================== mqtt: - enabled: true + enabled: false port: 1883 max-message-size: 8192 connect-timeout-seconds: 60 ssl-enabled: false + # ==================================== + # 针对引入的 MQTT WebSocket 组件的配置 + # ==================================== + mqtt-ws: + enabled: false # 是否启用 MQTT WebSocket + port: 8083 # WebSocket 服务端口 + path: /mqtt # WebSocket 路径 + max-message-size: 8192 # 最大消息大小(字节) + max-frame-size: 65536 # 最大帧大小(字节) + connect-timeout-seconds: 60 # 连接超时时间(秒) + keep-alive-timeout-seconds: 300 # 保持连接超时时间(秒) + ssl-enabled: false # 是否启用 SSL(wss://) + sub-protocol: mqtt # WebSocket 子协议 --- #################### 日志相关配置 #################### @@ -122,6 +135,7 @@ logging: cn.iocoder.yudao.module.iot.gateway.protocol.emqx: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG + cn.iocoder.yudao.module.iot.gateway.protocol.mqttws: DEBUG # 根日志级别 root: INFO diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/mqtt-websocket-test-client.html b/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/mqtt-websocket-test-client.html new file mode 100644 index 0000000000..e0853ac6bf --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/resources/mqtt-websocket-test-client.html @@ -0,0 +1,888 @@ + + + + + + MQTT WebSocket 测试客户端 + + + +

+
+

🚀 MQTT WebSocket 测试客户端

+

RuoYi-Vue-Pro IoT 模块 - MQTT over WebSocket 在线测试工具

+
+ + +
+

📌 标准协议格式说明

+
    +
  • Topic 格式:/sys/{productKey}/{deviceName}/thing/property/post
  • +
  • Client ID 格式:{productKey}.{deviceName} 例如:zOXKLvHjUqTo7ipD.ceshi001 +
  • +
  • Username 格式:{deviceName}&{productKey} 例如:ceshi001&zOXKLvHjUqTo7ipD +
  • +
  • 消息格式(Alink 协议): +
    +{
    +  "id": "消息 ID(唯一标识)",
    +  "version": "1.0",
    +  "method": "thing.property.post",
    +  "params": {
    +    "temperature": 25.5,
    +    "humidity": 60
    +  }
    +}
    +
  • +
  • 常用 Topic(下行 - 服务端推送): +
      +
    • 属性设置:/sys/{pk}/{dn}/thing/property/set
    • +
    • 服务调用:/sys/{pk}/{dn}/thing/service/invoke
    • +
    • 配置推送:/sys/{pk}/{dn}/thing/config/push
    • +
    • OTA 升级:/sys/{pk}/{dn}/thing/ota/upgrade
    • +
    +
  • +
  • 常用 Topic(上行 - 设备上报): +
      +
    • 状态更新:/sys/{pk}/{dn}/thing/state/update
    • +
    • 属性上报:/sys/{pk}/{dn}/thing/property/post
    • +
    • 事件上报:/sys/{pk}/{dn}/thing/event/post
    • +
    • OTA 进度:/sys/{pk}/{dn}/thing/ota/progress
    • +
    +
  • +
+
+ +
+ +
+

📡 连接配置

+ +
+ ⚫ 未连接 +
+ +
+ + + WebSocket 地址,支持 ws:// 和 wss:// +
+ +
+ + + 格式:{productKey}.{deviceName} +
+ +
+ + + 格式:{deviceName}&{productKey} +
+ +
+ + + 设备的认证密钥(Device Secret) +
+ +
+ + + +
+ + +
+
+
0
+
发送消息数
+
+
+
0
+
接收消息数
+
+
+
0
+
错误次数
+
+
+
+ + +
+

📤 消息发布

+ +
+ + +
+ +
+ + + 标准格式:/sys/{productKey}/{deviceName}/thing/property/post +
+ +
+ + +
+ +
+ + + + Alink 协议格式:id(消息 ID)、version(协议版本)、method(方法)、params(参数) + +
+ +
+ + +
+ +

📥 主题订阅

+ +
+ + +
+ +
+ + + 标准格式:/sys/{productKey}/{deviceName}/thing/method 或使用通配符 + /sys/+/+/# +
+ +
+ + +
+ +
+ + +
+
+ + +
+

📝 日志输出

+
+
+
+
+ + + + + + + + + diff --git a/yudao-server/src/main/resources/application-local.yaml b/yudao-server/src/main/resources/application-local.yaml index f6ef1d2401..819440c14d 100644 --- a/yudao-server/src/main/resources/application-local.yaml +++ b/yudao-server/src/main/resources/application-local.yaml @@ -70,7 +70,8 @@ spring: username: root password: 123456 # tdengine: # IoT 数据库(需要 IoT 物联网再开启噢!) -# url: jdbc:TAOS-WS://127.0.0.1:6041/ruoyi_vue_pro +# lazy: true # 开启懒加载,保证启动速度 +# url: jdbc:TAOS-WS://127.0.0.1:6041/ruoyi_vue_pro?varcharAsString=true # driver-class-name: com.taosdata.jdbc.ws.WebSocketDriver # username: root # password: taosdata