!1501 IOT:同步最新代码

Merge pull request !1501 from 芋道源码/feature/iot
This commit is contained in:
芋道源码
2026-01-18 10:26:57 +00:00
committed by Gitee
33 changed files with 3059 additions and 47 deletions

View File

@@ -116,6 +116,7 @@ public class BpmTaskServiceImpl implements BpmTaskService {
.taskAssignee(String.valueOf(userId)) // 分配给自己
.active()
.includeProcessVariables()
.taskTenantId(FlowableUtils.getTenantId())
.orderByTaskCreateTime().desc(); // 创建时间倒序
if (StrUtil.isNotBlank(pageVO.getName())) {
taskQuery.taskNameLike("%" + pageVO.getName() + "%");

View File

@@ -31,4 +31,7 @@ public class IotDevicePageReqVO extends PageParam {
@Schema(description = "设备分组编号", example = "1024")
private Long groupId;
@Schema(description = "网关设备 ID", example = "16380")
private Long gatewayId;
}

View File

@@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
@@ -22,6 +23,10 @@ public class IotDataSinkPageReqVO extends PageParam {
@InEnum(CommonStatusEnum.class)
private Integer status;
@Schema(description = "数据目的类型", example = "1")
@InEnum(IotDataSinkTypeEnum.class)
private Integer type;
@Schema(description = "创建时间")
@DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND)
private LocalDateTime[] createTime;

View File

@@ -21,6 +21,7 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.List;
/**
@@ -56,6 +57,11 @@ public class IotSceneRuleDO extends TenantBaseDO {
*/
private Integer status;
/**
* 最后触发时间
*/
private LocalDateTime lastTriggerTime;
/**
* 场景定义配置
*/

View File

@@ -10,6 +10,35 @@ import lombok.Data;
@Data
public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig {
/**
* 默认连接超时时间(毫秒)
*/
public static final int DEFAULT_CONNECT_TIMEOUT_MS = 5000;
/**
* 默认读取超时时间(毫秒)
*/
public static final int DEFAULT_READ_TIMEOUT_MS = 10000;
/**
* 默认是否启用 SSL
*/
public static final boolean DEFAULT_SSL = false;
/**
* 默认数据格式
*/
public static final String DEFAULT_DATA_FORMAT = "JSON";
/**
* 默认心跳间隔时间(毫秒)
*/
public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000L;
/**
* 默认重连间隔时间(毫秒)
*/
public static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L;
/**
* 默认最大重连次数
*/
public static final int DEFAULT_MAX_RECONNECT_ATTEMPTS = 3;
/**
* TCP 服务器地址
*/
@@ -23,17 +52,17 @@ public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig {
/**
* 连接超时时间(毫秒)
*/
private Integer connectTimeoutMs = 5000;
private Integer connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS;
/**
* 读取超时时间(毫秒)
*/
private Integer readTimeoutMs = 10000;
private Integer readTimeoutMs = DEFAULT_READ_TIMEOUT_MS;
/**
* 是否启用 SSL
*/
private Boolean ssl = false;
private Boolean ssl = DEFAULT_SSL;
/**
* SSL 证书路径(当 ssl=true 时需要)
@@ -43,21 +72,21 @@ public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig {
/**
* 数据格式JSON 或 BINARY
*/
private String dataFormat = "JSON";
private String dataFormat = DEFAULT_DATA_FORMAT;
/**
* 心跳间隔时间毫秒0 表示不启用心跳
*/
private Long heartbeatIntervalMs = 30000L;
private Long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
/**
* 重连间隔时间(毫秒)
*/
private Long reconnectIntervalMs = 5000L;
private Long reconnectIntervalMs = DEFAULT_RECONNECT_INTERVAL_MS;
/**
* 最大重连次数
*/
private Integer maxReconnectAttempts = 3;
private Integer maxReconnectAttempts = DEFAULT_MAX_RECONNECT_ATTEMPTS;
}

View File

@@ -13,6 +13,51 @@ import lombok.Data;
@Data
public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig {
/**
* 默认连接超时时间(毫秒)
*/
public static final int DEFAULT_CONNECT_TIMEOUT_MS = 5000;
/**
* 默认发送超时时间(毫秒)
*/
public static final int DEFAULT_SEND_TIMEOUT_MS = 10000;
/**
* 默认心跳间隔时间(毫秒)
*/
public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000L;
/**
* 默认心跳消息内容
*/
public static final String DEFAULT_HEARTBEAT_MESSAGE = "{\"type\":\"heartbeat\"}";
/**
* 默认是否启用 SSL 证书验证
*/
public static final boolean DEFAULT_VERIFY_SSL_CERT = true;
/**
* 默认数据格式
*/
public static final String DEFAULT_DATA_FORMAT = "JSON";
/**
* 默认重连间隔时间(毫秒)
*/
public static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L;
/**
* 默认最大重连次数
*/
public static final int DEFAULT_MAX_RECONNECT_ATTEMPTS = 3;
/**
* 默认是否启用压缩
*/
public static final boolean DEFAULT_ENABLE_COMPRESSION = false;
/**
* 默认消息发送重试次数
*/
public static final int DEFAULT_SEND_RETRY_COUNT = 1;
/**
* 默认消息发送重试间隔(毫秒)
*/
public static final long DEFAULT_SEND_RETRY_INTERVAL_MS = 1000L;
/**
* WebSocket 服务器地址
* 例如ws://localhost:8080/ws 或 wss://example.com/ws
@@ -22,22 +67,22 @@ public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig {
/**
* 连接超时时间(毫秒)
*/
private Integer connectTimeoutMs = 5000;
private Integer connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS;
/**
* 发送超时时间(毫秒)
*/
private Integer sendTimeoutMs = 10000;
private Integer sendTimeoutMs = DEFAULT_SEND_TIMEOUT_MS;
/**
* 心跳间隔时间毫秒0 表示不启用心跳
*/
private Long heartbeatIntervalMs = 30000L;
private Long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
/**
* 心跳消息内容JSON 格式)
*/
private String heartbeatMessage = "{\"type\":\"heartbeat\"}";
private String heartbeatMessage = DEFAULT_HEARTBEAT_MESSAGE;
/**
* 子协议列表(逗号分隔)
@@ -52,36 +97,36 @@ public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig {
/**
* 是否启用 SSL 证书验证(仅对 wss:// 生效)
*/
private Boolean verifySslCert = true;
private Boolean verifySslCert = DEFAULT_VERIFY_SSL_CERT;
/**
* 数据格式JSON 或 TEXT
*/
private String dataFormat = "JSON";
private String dataFormat = DEFAULT_DATA_FORMAT;
/**
* 重连间隔时间(毫秒)
*/
private Long reconnectIntervalMs = 5000L;
private Long reconnectIntervalMs = DEFAULT_RECONNECT_INTERVAL_MS;
/**
* 最大重连次数
*/
private Integer maxReconnectAttempts = 3;
private Integer maxReconnectAttempts = DEFAULT_MAX_RECONNECT_ATTEMPTS;
/**
* 是否启用压缩
*/
private Boolean enableCompression = false;
private Boolean enableCompression = DEFAULT_ENABLE_COMPRESSION;
/**
* 消息发送重试次数
*/
private Integer sendRetryCount = 1;
private Integer sendRetryCount = DEFAULT_SEND_RETRY_COUNT;
/**
* 消息发送重试间隔(毫秒)
*/
private Long sendRetryIntervalMs = 1000L;
private Long sendRetryIntervalMs = DEFAULT_SEND_RETRY_INTERVAL_MS;
}

View File

@@ -31,6 +31,7 @@ public interface IotDeviceMapper extends BaseMapperX<IotDeviceDO> {
.eqIfPresent(IotDeviceDO::getDeviceType, reqVO.getDeviceType())
.likeIfPresent(IotDeviceDO::getNickname, reqVO.getNickname())
.eqIfPresent(IotDeviceDO::getState, reqVO.getStatus())
.eqIfPresent(IotDeviceDO::getGatewayId, reqVO.getGatewayId())
.apply(ObjectUtil.isNotNull(reqVO.getGroupId()), "FIND_IN_SET(" + reqVO.getGroupId() + ",group_ids) > 0")
.orderByDesc(IotDeviceDO::getId));
}

View File

@@ -35,4 +35,8 @@ public interface IotDataRuleMapper extends BaseMapperX<IotDataRuleDO> {
return selectList(IotDataRuleDO::getStatus, status);
}
default IotDataRuleDO selectByName(String name) {
return selectOne(IotDataRuleDO::getName, name);
}
}

View File

@@ -21,6 +21,7 @@ public interface IotDataSinkMapper extends BaseMapperX<IotDataSinkDO> {
return selectPage(reqVO, new LambdaQueryWrapperX<IotDataSinkDO>()
.likeIfPresent(IotDataSinkDO::getName, reqVO.getName())
.eqIfPresent(IotDataSinkDO::getStatus, reqVO.getStatus())
.eqIfPresent(IotDataSinkDO::getType, reqVO.getType())
.betweenIfPresent(IotDataSinkDO::getCreateTime, reqVO.getCreateTime())
.orderByDesc(IotDataSinkDO::getId));
}
@@ -29,4 +30,8 @@ public interface IotDataSinkMapper extends BaseMapperX<IotDataSinkDO> {
return selectList(IotDataSinkDO::getStatus, status);
}
default IotDataSinkDO selectByName(String name) {
return selectOne(IotDataSinkDO::getName, name);
}
}

View File

@@ -65,10 +65,12 @@ public interface ErrorCodeConstants {
// ========== IoT 数据流转规则 1-050-010-000 ==========
ErrorCode DATA_RULE_NOT_EXISTS = new ErrorCode(1_050_010_000, "数据流转规则不存在");
ErrorCode DATA_RULE_NAME_EXISTS = new ErrorCode(1_050_010_001, "数据流转规则名称已存在");
// ========== IoT 数据流转目的 1-050-011-000 ==========
ErrorCode DATA_SINK_NOT_EXISTS = new ErrorCode(1_050_011_000, "数据桥梁不存在");
ErrorCode DATA_SINK_DELETE_FAIL_USED_BY_RULE = new ErrorCode(1_050_011_001, "数据流转目的正在被数据流转规则使用,无法删除");
ErrorCode DATA_SINK_NAME_EXISTS = new ErrorCode(1_050_011_002, "数据流转目的名称已存在");
// ========== IoT 场景联动 1-050-012-000 ==========
ErrorCode RULE_SCENE_NOT_EXISTS = new ErrorCode(1_050_012_000, "场景联动不存在");

View File

@@ -16,8 +16,8 @@ import java.util.Arrays;
public enum IotDataSinkTypeEnum implements ArrayValuable<Integer> {
HTTP(1, "HTTP"),
TCP(2, "TCP"), // TODO @puhui999待实现
WEBSOCKET(3, "WebSocket"), // TODO @puhui999待实现
TCP(2, "TCP"),
WEBSOCKET(3, "WebSocket"),
MQTT(10, "MQTT"), // TODO 待实现;

View File

@@ -382,7 +382,7 @@ public class IotDeviceServiceImpl implements IotDeviceService {
return;
}
// 2.2.2 如果存在,判断是否允许更新
if (updateSupport) {
if (!updateSupport) {
throw exception(DEVICE_KEY_EXISTS);
}
updateDevice(new IotDeviceSaveReqVO().setId(existDevice.getId())

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.service.device.property;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
@@ -145,6 +146,12 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
IotDataSpecsDataTypeEnum.STRUCT.getDataType(), IotDataSpecsDataTypeEnum.ARRAY.getDataType())) {
// 特殊STRUCT 和 ARRAY 类型,在 TDengine 里,有没对应数据类型,只能通过 JSON 来存储
properties.put((String) key, JsonUtils.toJsonString(value));
} else if (IotDataSpecsDataTypeEnum.DOUBLE.getDataType().equals(thingModel.getProperty().getDataType())) {
properties.put((String) key, Convert.toDouble(value));
} else if (IotDataSpecsDataTypeEnum.FLOAT.getDataType().equals(thingModel.getProperty().getDataType())) {
properties.put((String) key, Convert.toFloat(value));
} else if (IotDataSpecsDataTypeEnum.BOOL.getDataType().equals(thingModel.getProperty().getDataType())) {
properties.put((String) key, Convert.toByte(value));
} else {
properties.put((String) key, value);
}

View File

@@ -32,6 +32,7 @@ import java.util.*;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_RULE_NAME_EXISTS;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_RULE_NOT_EXISTS;
/**
@@ -62,6 +63,8 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
@Override
@CacheEvict(value = RedisKeyConstants.DATA_RULE_LIST, allEntries = true)
public Long createDataRule(IotDataRuleSaveReqVO createReqVO) {
// 校验名称唯一
validateDataRuleNameUnique(null, createReqVO.getName());
// 校验数据源配置和数据目的
validateDataRuleConfig(createReqVO);
// 新增
@@ -75,6 +78,8 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
public void updateDataRule(IotDataRuleSaveReqVO updateReqVO) {
// 校验存在
validateDataRuleExists(updateReqVO.getId());
// 校验名称唯一
validateDataRuleNameUnique(updateReqVO.getId(), updateReqVO.getName());
// 校验数据源配置和数据目的
validateDataRuleConfig(updateReqVO);
@@ -98,6 +103,29 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
}
}
/**
* 校验数据流转规则名称唯一性
*
* @param id 数据流转规则编号(用于更新时排除自身)
* @param name 数据流转规则名称
*/
private void validateDataRuleNameUnique(Long id, String name) {
if (StrUtil.isBlank(name)) {
return;
}
IotDataRuleDO dataRule = dataRuleMapper.selectByName(name);
if (dataRule == null) {
return;
}
// 如果 id 为空,说明不用比较是否为相同 id 的规则
if (id == null) {
throw exception(DATA_RULE_NAME_EXISTS);
}
if (!dataRule.getId().equals(id)) {
throw exception(DATA_RULE_NAME_EXISTS);
}
}
/**
* 校验数据流转规则配置
*

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.service.rule.data;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink.IotDataSinkPageReqVO;
@@ -19,6 +20,7 @@ import java.util.List;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_DELETE_FAIL_USED_BY_RULE;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_NAME_EXISTS;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_NOT_EXISTS;
/**
@@ -39,6 +41,9 @@ public class IotDataSinkServiceImpl implements IotDataSinkService {
@Override
public Long createDataSink(IotDataSinkSaveReqVO createReqVO) {
// 校验名称唯一
validateDataSinkNameUnique(null, createReqVO.getName());
// 新增
IotDataSinkDO dataBridge = BeanUtils.toBean(createReqVO, IotDataSinkDO.class);
dataSinkMapper.insert(dataBridge);
return dataBridge.getId();
@@ -48,6 +53,8 @@ public class IotDataSinkServiceImpl implements IotDataSinkService {
public void updateDataSink(IotDataSinkSaveReqVO updateReqVO) {
// 校验存在
validateDataBridgeExists(updateReqVO.getId());
// 校验名称唯一
validateDataSinkNameUnique(updateReqVO.getId(), updateReqVO.getName());
// 更新
IotDataSinkDO updateObj = BeanUtils.toBean(updateReqVO, IotDataSinkDO.class);
dataSinkMapper.updateById(updateObj);
@@ -71,6 +78,29 @@ public class IotDataSinkServiceImpl implements IotDataSinkService {
}
}
/**
* 校验数据流转目的名称唯一性
*
* @param id 数据流转目的编号(用于更新时排除自身)
* @param name 数据流转目的名称
*/
private void validateDataSinkNameUnique(Long id, String name) {
if (StrUtil.isBlank(name)) {
return;
}
IotDataSinkDO dataSink = dataSinkMapper.selectByName(name);
if (dataSink == null) {
return;
}
// 如果 id 为空,说明不用比较是否为相同 id 的目的
if (id == null) {
throw exception(DATA_SINK_NAME_EXISTS);
}
if (!dataSink.getId().equals(id)) {
throw exception(DATA_SINK_NAME_EXISTS);
}
}
@Override
public IotDataSinkDO getDataSink(Long id) {
return dataSinkMapper.selectById(id);

View File

@@ -7,8 +7,6 @@ import cn.iocoder.yudao.module.iot.service.rule.data.action.tcp.IotTcpClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.Duration;
/**
* TCP 的 {@link IotDataRuleAction} 实现类
* <p>
@@ -23,9 +21,6 @@ import java.time.Duration;
public class IotTcpDataRuleAction extends
IotDataRuleCacheableAction<IotDataSinkTcpConfig, IotTcpClient> {
private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(5);
private static final Duration SEND_TIMEOUT = Duration.ofSeconds(10);
@Override
public Integer getType() {
return IotDataSinkTypeEnum.TCP.getType();

View File

@@ -0,0 +1,85 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* WebSocket 的 {@link IotDataRuleAction} 实现类
* <p>
* 负责将设备消息发送到外部 WebSocket 服务器
* 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式
* 使用连接池管理 WebSocket 连接,提高性能和资源利用率
*
* @author HUIHUI
*/
@Component
@Slf4j
public class IotWebSocketDataRuleAction extends
IotDataRuleCacheableAction<IotDataSinkWebSocketConfig, IotWebSocketClient> {
@Override
public Integer getType() {
return IotDataSinkTypeEnum.WEBSOCKET.getType();
}
@Override
protected IotWebSocketClient initProducer(IotDataSinkWebSocketConfig config) throws Exception {
// 1. 参数校验
if (StrUtil.isBlank(config.getServerUrl())) {
throw new IllegalArgumentException("WebSocket 服务器地址不能为空");
}
if (!StrUtil.startWithAny(config.getServerUrl(), "ws://", "wss://")) {
throw new IllegalArgumentException("WebSocket 服务器地址必须以 ws:// 或 wss:// 开头");
}
// 2.1 创建 WebSocket 客户端
IotWebSocketClient webSocketClient = new IotWebSocketClient(
config.getServerUrl(),
config.getConnectTimeoutMs(),
config.getSendTimeoutMs(),
config.getDataFormat()
);
// 2.2 连接服务器
webSocketClient.connect();
log.info("[initProducer][WebSocket 客户端创建并连接成功,服务器: {},数据格式: {}]",
config.getServerUrl(), config.getDataFormat());
return webSocketClient;
}
@Override
protected void closeProducer(IotWebSocketClient producer) throws Exception {
if (producer != null) {
producer.close();
}
}
@Override
protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception {
try {
// 1.1 获取或创建 WebSocket 客户端
// TODO @puhui999需要加锁保证必须连接上
IotWebSocketClient webSocketClient = getProducer(config);
// 1.2 检查连接状态,如果断开则重新连接
if (!webSocketClient.isConnected()) {
log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl());
webSocketClient.connect();
}
// 2.1 发送消息
webSocketClient.sendMessage(message);
// 2.2 记录发送成功日志
log.info("[execute][message({}) config({}) 发送成功WebSocket 服务器: {}]",
message, config, config.getServerUrl());
} catch (Exception e) {
log.error("[execute][message({}) config({}) 发送失败WebSocket 服务器: {}]",
message, config, config.getServerUrl(), e);
throw e;
}
}
}

View File

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig;
import lombok.extern.slf4j.Slf4j;
import javax.net.ssl.SSLSocketFactory;
@@ -30,6 +31,7 @@ public class IotTcpClient {
private final Integer connectTimeoutMs;
private final Integer readTimeoutMs;
private final Boolean ssl;
// TODO @puhui999sslCertPath 是不是没在用?
private final String sslCertPath;
private final String dataFormat;
@@ -38,16 +40,16 @@ public class IotTcpClient {
private BufferedReader reader;
private final AtomicBoolean connected = new AtomicBoolean(false);
// TODO @puhui999default 值IotDataSinkTcpConfig.java 枚举起来哈;
public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs,
Boolean ssl, String sslCertPath, String dataFormat) {
this.host = host;
this.port = port;
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : 5000;
this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : 10000;
this.ssl = ssl != null ? ssl : false;
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkTcpConfig.DEFAULT_CONNECT_TIMEOUT_MS;
this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : IotDataSinkTcpConfig.DEFAULT_READ_TIMEOUT_MS;
this.ssl = ssl != null ? ssl : IotDataSinkTcpConfig.DEFAULT_SSL;
this.sslCertPath = sslCertPath;
this.dataFormat = dataFormat != null ? dataFormat : "JSON";
// TODO @puhui999可以使用 StrUtil.defaultIfBlank 方法简化
this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT;
}
/**
@@ -99,9 +101,8 @@ public class IotTcpClient {
}
try {
// TODO @puhui999枚举值
String messageData;
if ("JSON".equalsIgnoreCase(dataFormat)) {
if (IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT.equalsIgnoreCase(dataFormat)) {
// JSON 格式
messageData = JsonUtils.toJsonString(message);
} else {

View File

@@ -0,0 +1,177 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* IoT WebSocket 客户端
* <p>
* 负责与外部 WebSocket 服务器建立连接并发送设备消息
* 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式
* 基于 Java 11+ 内置的 java.net.http.WebSocket 实现
*
* @author HUIHUI
*/
@Slf4j
public class IotWebSocketClient implements WebSocket.Listener {
private final String serverUrl;
private final Integer connectTimeoutMs;
private final Integer sendTimeoutMs;
private final String dataFormat;
private WebSocket webSocket;
private final AtomicBoolean connected = new AtomicBoolean(false);
private final StringBuilder messageBuffer = new StringBuilder();
public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) {
this.serverUrl = serverUrl;
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_CONNECT_TIMEOUT_MS;
this.sendTimeoutMs = sendTimeoutMs != null ? sendTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_SEND_TIMEOUT_MS;
this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT;
}
/**
* 连接到 WebSocket 服务器
*/
@SuppressWarnings("resource")
public void connect() throws Exception {
if (connected.get()) {
log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]");
return;
}
try {
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(connectTimeoutMs))
.build();
CompletableFuture<WebSocket> future = httpClient.newWebSocketBuilder()
.connectTimeout(Duration.ofMillis(connectTimeoutMs))
.buildAsync(URI.create(serverUrl), this);
// 等待连接完成
webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
connected.set(true);
log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl);
} catch (Exception e) {
close();
log.error("[connect][WebSocket 客户端连接失败,服务器地址: {}]", serverUrl, e);
throw e;
}
}
@Override
public void onOpen(WebSocket webSocket) {
log.debug("[onOpen][WebSocket 连接已打开]");
webSocket.request(1);
}
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
messageBuffer.append(data);
if (last) {
log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer);
messageBuffer.setLength(0);
}
webSocket.request(1);
return null;
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
connected.set(false);
log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason);
return null;
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
connected.set(false);
log.error("[onError][WebSocket 发生错误]", error);
}
/**
* 发送设备消息
*
* @param message 设备消息
* @throws Exception 发送异常
*/
public void sendMessage(IotDeviceMessage message) throws Exception {
if (!connected.get() || webSocket == null) {
throw new IllegalStateException("WebSocket 客户端未连接");
}
try {
String messageData;
if (IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT.equalsIgnoreCase(dataFormat)) {
messageData = JsonUtils.toJsonString(message);
} else {
messageData = message.toString();
}
// 发送消息并等待完成
CompletableFuture<WebSocket> future = webSocket.sendText(messageData, true);
future.get(sendTimeoutMs, TimeUnit.MILLISECONDS);
log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]",
message.getDeviceId(), messageData.length());
} catch (Exception e) {
log.error("[sendMessage][发送消息失败,设备 ID: {}]", message.getDeviceId(), e);
throw e;
}
}
/**
* 关闭连接
*/
public void close() {
if (!connected.get() && webSocket == null) {
return;
}
try {
if (webSocket != null) {
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭")
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(e -> {
log.warn("[close][发送关闭帧失败]", e);
return null;
});
}
connected.set(false);
log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl);
} catch (Exception e) {
log.error("[close][关闭 WebSocket 客户端连接异常]", e);
}
}
/**
* 检查连接状态
*
* @return 是否已连接
*/
public boolean isConnected() {
return connected.get() && webSocket != null;
}
@Override
public String toString() {
return "IotWebSocketClient{" +
"serverUrl='" + serverUrl + '\'' +
", dataFormat='" + dataFormat + '\'' +
", connected=" + connected.get() +
'}';
}
}

View File

@@ -30,6 +30,7 @@ import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
@@ -392,9 +393,25 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
}
});
});
// 3. 更新最后触发时间
updateLastTriggerTime(sceneRule.getId());
});
}
/**
* 更新规则场景的最后触发时间
*
* @param id 规则场景编号
*/
private void updateLastTriggerTime(Long id) {
try {
sceneRuleMapper.updateById(new IotSceneRuleDO().setId(id).setLastTriggerTime(LocalDateTime.now()));
} catch (Exception e) {
log.error("[updateLastTriggerTime][规则场景编号({}) 更新最后触发时间异常]", id, e);
}
}
private IotSceneRuleServiceImpl getSelf() {
return SpringUtil.getBean(IotSceneRuleServiceImpl.class);
}

View File

@@ -36,11 +36,12 @@ public class IotDevicePropertyPostTriggerMatcher implements IotSceneRuleTriggerM
return false;
}
// 1.3 检查标识符是否匹配
String messageIdentifier = IotDeviceMessageUtils.getIdentifier(message);
if (!IotSceneRuleMatcherHelper.isIdentifierMatched(trigger.getIdentifier(), messageIdentifier)) {
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "标识符不匹配,期望: " +
trigger.getIdentifier() + ", 实际: " + messageIdentifier);
// 1.3 检查消息中是否包含触发器指定的属性标识符
// 注意:属性上报可能同时上报多个属性,所以需要判断 trigger.getIdentifier() 是否在 message 的 params 中
// TODO @puhui999可以考虑 notXXX 方法,简化代码(尽量取反)
if (!IotDeviceMessageUtils.containsIdentifier(message, trigger.getIdentifier())) {
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中不包含属性: " +
trigger.getIdentifier());
return false;
}

View File

@@ -5,6 +5,7 @@ import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -69,6 +70,55 @@ public class IotDeviceMessageUtils {
return null;
}
/**
* 判断消息中是否包含指定的标识符
*
* 对于不同消息类型的处理:
* - EVENT_POST/SERVICE_INVOKE检查 params.identifier 是否匹配
* - STATE_UPDATE检查 params.state 是否匹配
* - PROPERTY_POST检查 params 中是否包含该属性 key
*
* @param message 消息
* @param identifier 要检查的标识符
* @return 是否包含
*/
public static boolean containsIdentifier(IotDeviceMessage message, String identifier) {
if (message.getParams() == null || StrUtil.isBlank(identifier)) {
return false;
}
// EVENT_POST / SERVICE_INVOKE / STATE_UPDATE使用原有逻辑
String messageIdentifier = getIdentifier(message);
if (messageIdentifier != null) {
return identifier.equals(messageIdentifier);
}
// PROPERTY_POST检查 params 中是否包含该属性 key
if (StrUtil.equals(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())) {
Map<String, Object> params = parseParamsToMap(message.getParams());
return params != null && params.containsKey(identifier);
}
return false;
}
/**
* 将 params 解析为 Map
*
* @param params 参数(可能是 Map 或 JSON 字符串)
* @return Map解析失败返回 null
*/
@SuppressWarnings("unchecked")
private static Map<String, Object> parseParamsToMap(Object params) {
if (params instanceof Map) {
return (Map<String, Object>) params;
}
if (params instanceof String) {
try {
return JsonUtils.parseObject((String) params, Map.class);
} catch (Exception ignored) {
}
}
return null;
}
/**
* 从设备消息中提取指定标识符的属性值
* - 支持多种消息格式和属性值提取策略

View File

@@ -10,6 +10,10 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscr
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
@@ -17,6 +21,7 @@ import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
@@ -55,20 +60,20 @@ public class IotGatewayConfiguration {
@Slf4j
public static class EmqxProtocolConfiguration {
@Bean(destroyMethod = "close")
@Bean(name = "emqxVertx", destroyMethod = "close")
public Vertx emqxVertx() {
return Vertx.vertx();
}
@Bean
public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties,
Vertx emqxVertx) {
@Qualifier("emqxVertx") Vertx emqxVertx) {
return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx);
}
@Bean
public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties,
Vertx emqxVertx) {
@Qualifier("emqxVertx") Vertx emqxVertx) {
return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx);
}
@@ -87,7 +92,7 @@ public class IotGatewayConfiguration {
@Slf4j
public static class TcpProtocolConfiguration {
@Bean(destroyMethod = "close")
@Bean(name = "tcpVertx", destroyMethod = "close")
public Vertx tcpVertx() {
return Vertx.vertx();
}
@@ -97,7 +102,7 @@ public class IotGatewayConfiguration {
IotDeviceService deviceService,
IotDeviceMessageService messageService,
IotTcpConnectionManager connectionManager,
Vertx tcpVertx) {
@Qualifier("tcpVertx") Vertx tcpVertx) {
return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(),
deviceService, messageService, connectionManager, tcpVertx);
}
@@ -122,7 +127,7 @@ public class IotGatewayConfiguration {
@Slf4j
public static class MqttProtocolConfiguration {
@Bean(destroyMethod = "close")
@Bean(name = "mqttVertx", destroyMethod = "close")
public Vertx mqttVertx() {
return Vertx.vertx();
}
@@ -131,7 +136,7 @@ public class IotGatewayConfiguration {
public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceMessageService messageService,
IotMqttConnectionManager connectionManager,
Vertx mqttVertx) {
@Qualifier("mqttVertx") Vertx mqttVertx) {
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(), messageService,
connectionManager, mqttVertx);
}
@@ -151,4 +156,42 @@ public class IotGatewayConfiguration {
}
/**
* IoT 网关 MQTT WebSocket 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.mqtt-ws", name = "enabled", havingValue = "true")
@Slf4j
public static class MqttWsProtocolConfiguration {
@Bean(name = "mqttWsVertx", destroyMethod = "close")
public Vertx mqttWsVertx() {
return Vertx.vertx();
}
@Bean
public IotMqttWsUpstreamProtocol iotMqttWsUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceMessageService messageService,
IotMqttWsConnectionManager connectionManager,
@Qualifier("mqttWsVertx") Vertx mqttWsVertx) {
return new IotMqttWsUpstreamProtocol(gatewayProperties.getProtocol().getMqttWs(),
messageService, connectionManager, mqttWsVertx);
}
@Bean
public IotMqttWsDownstreamHandler iotMqttWsDownstreamHandler(IotDeviceMessageService messageService,
IotDeviceService deviceService,
IotMqttWsConnectionManager connectionManager) {
return new IotMqttWsDownstreamHandler(messageService, deviceService, connectionManager);
}
@Bean
public IotMqttWsDownstreamSubscriber iotMqttWsDownstreamSubscriber(IotMqttWsUpstreamProtocol mqttWsUpstreamProtocol,
IotMqttWsDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
return new IotMqttWsDownstreamSubscriber(mqttWsUpstreamProtocol, downstreamHandler, messageBus);
}
}
}

View File

@@ -88,6 +88,11 @@ public class IotGatewayProperties {
*/
private MqttProperties mqtt;
/**
* MQTT WebSocket 组件配置
*/
private MqttWsProperties mqttWs;
}
@Data
@@ -402,4 +407,100 @@ public class IotGatewayProperties {
}
@Data
public static class MqttWsProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* WebSocket 服务器端口默认8083
*/
private Integer port = 8083;
/**
* WebSocket 路径(默认:/mqtt
*/
@NotEmpty(message = "WebSocket 路径不能为空")
private String path = "/mqtt";
/**
* 最大消息大小(字节)
*/
private Integer maxMessageSize = 8192;
/**
* 连接超时时间(秒)
*/
private Integer connectTimeoutSeconds = 60;
/**
* 保持连接超时时间(秒)
*/
private Integer keepAliveTimeoutSeconds = 300;
/**
* 是否启用 SSLwss://
*/
private Boolean sslEnabled = false;
/**
* SSL 配置
*/
private SslOptions sslOptions = new SslOptions();
/**
* WebSocket 子协议(通常为 "mqtt" 或 "mqttv3.1"
*/
@NotEmpty(message = "WebSocket 子协议不能为空")
private String subProtocol = "mqtt";
/**
* 最大帧大小(字节)
*/
private Integer maxFrameSize = 65536;
/**
* SSL 配置选项
*/
@Data
public static class SslOptions {
/**
* 密钥证书选项
*/
private io.vertx.core.net.KeyCertOptions keyCertOptions;
/**
* 信任选项
*/
private io.vertx.core.net.TrustOptions trustOptions;
/**
* SSL 证书路径
*/
private String certPath;
/**
* SSL 私钥路径
*/
private String keyPath;
/**
* 信任存储路径
*/
private String trustStorePath;
/**
* 信任存储密码
*/
private String trustStorePassword;
}
}
}

View File

@@ -0,0 +1,79 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDownstreamHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
/**
* IoT MQTT WebSocket 下行消息订阅器
* <p>
* 订阅消息总线的设备下行消息,并通过 WebSocket 发送到设备
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttWsDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotMqttWsUpstreamProtocol upstreamProtocol;
private final IotMqttWsDownstreamHandler downstreamHandler;
private final IotMessageBus messageBus;
public IotMqttWsDownstreamSubscriber(IotMqttWsUpstreamProtocol upstreamProtocol,
IotMqttWsDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
this.upstreamProtocol = upstreamProtocol;
this.downstreamHandler = downstreamHandler;
this.messageBus = messageBus;
}
@PostConstruct
public void init() {
messageBus.register(this);
log.info("[init][MQTT WebSocket 下行消息订阅器已启动topic: {}]", getTopic());
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(upstreamProtocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
log.debug("[onMessage][收到下行消息deviceId: {}method: {}]",
message.getDeviceId(), message.getMethod());
try {
// 1. 校验
String method = message.getMethod();
if (StrUtil.isBlank(method)) {
log.warn("[onMessage][消息方法为空deviceId: {}]", message.getDeviceId());
return;
}
// 2. 委托给下行处理器处理业务逻辑
boolean success = downstreamHandler.handleDownstreamMessage(message);
if (success) {
log.debug("[onMessage][下行消息处理成功deviceId: {}method: {}]",
message.getDeviceId(), message.getMethod());
} else {
log.warn("[onMessage][下行消息处理失败deviceId: {}method: {}]",
message.getDeviceId(), message.getMethod());
}
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败deviceId: {}method: {}]",
message.getDeviceId(), message.getMethod(), e);
}
}
}

View File

@@ -0,0 +1,146 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.ServerWebSocket;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT WebSocket 协议:接收设备上行消息
* <p>
* 基于 Vert.x 实现 MQTT over WebSocket 服务端,支持:
* - 标准 MQTT 3.1.1 协议
* - WebSocket 协议升级
* - SSL/TLS 加密wss://
* - 设备认证与连接管理
* - QoS 0/1/2 消息质量保证
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttWsUpstreamProtocol {
private final IotGatewayProperties.MqttWsProperties mqttWsProperties;
private final IotDeviceMessageService messageService;
private final IotMqttWsConnectionManager connectionManager;
private final Vertx vertx;
@Getter
private final String serverId;
private HttpServer httpServer;
public IotMqttWsUpstreamProtocol(IotGatewayProperties.MqttWsProperties mqttWsProperties,
IotDeviceMessageService messageService,
IotMqttWsConnectionManager connectionManager,
Vertx vertx) {
this.mqttWsProperties = mqttWsProperties;
this.messageService = messageService;
this.connectionManager = connectionManager;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(mqttWsProperties.getPort());
}
@PostConstruct
public void start() {
// 创建 HTTP 服务器选项
HttpServerOptions options = new HttpServerOptions()
.setPort(mqttWsProperties.getPort())
.setIdleTimeout(mqttWsProperties.getKeepAliveTimeoutSeconds())
.setMaxWebSocketFrameSize(mqttWsProperties.getMaxFrameSize())
.setMaxWebSocketMessageSize(mqttWsProperties.getMaxMessageSize())
// 配置 WebSocket 子协议支持
.addWebSocketSubProtocol(mqttWsProperties.getSubProtocol());
// 配置 SSL如果启用
if (Boolean.TRUE.equals(mqttWsProperties.getSslEnabled())) {
options.setSsl(true)
.setKeyCertOptions(mqttWsProperties.getSslOptions().getKeyCertOptions())
.setTrustOptions(mqttWsProperties.getSslOptions().getTrustOptions());
log.info("[start][MQTT WebSocket 已启用 SSL/TLS (wss://)]");
}
// 创建 HTTP 服务器
httpServer = vertx.createHttpServer(options);
// 设置 WebSocket 处理器
httpServer.webSocketHandler(this::handleWebSocketConnection);
// 启动服务器
try {
httpServer.listen().result();
log.info("[start][IoT 网关 MQTT WebSocket 协议启动成功,端口: {},路径: {},支持子协议: {}]",
mqttWsProperties.getPort(), mqttWsProperties.getPath(),
"mqtt, mqttv3.1, " + mqttWsProperties.getSubProtocol());
} catch (Exception e) {
log.error("[start][IoT 网关 MQTT WebSocket 协议启动失败]", e);
throw e;
}
}
@PreDestroy
public void stop() {
if (httpServer != null) {
try {
// 关闭所有连接
connectionManager.closeAllConnections();
// 关闭服务器
httpServer.close().result();
log.info("[stop][IoT 网关 MQTT WebSocket 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 MQTT WebSocket 协议停止失败]", e);
}
}
}
/**
* 处理 WebSocket 连接请求
*
* @param socket WebSocket 连接
*/
private void handleWebSocketConnection(ServerWebSocket socket) {
String path = socket.path();
String subProtocol = socket.subProtocol();
log.info("[handleWebSocketConnection][收到 WebSocket 连接请求path: {}subProtocol: {}remoteAddress: {}]",
path, subProtocol, socket.remoteAddress());
// 验证路径
if (!mqttWsProperties.getPath().equals(path)) {
log.warn("[handleWebSocketConnection][WebSocket 路径不匹配拒绝连接path: {},期望: {}]",
path, mqttWsProperties.getPath());
socket.close();
return;
}
// 验证子协议
// Vert.x 已经自动进行了子协议协商,这里只需要验证是否为 MQTT 相关协议
if (subProtocol != null && !subProtocol.startsWith("mqtt")) {
log.warn("[handleWebSocketConnection][WebSocket 子协议不支持拒绝连接subProtocol: {}]", subProtocol);
socket.close();
return;
}
log.info("[handleWebSocketConnection][WebSocket 连接已接受remoteAddress: {}subProtocol: {}]",
socket.remoteAddress(), subProtocol);
// 创建处理器并处理连接
IotMqttWsUpstreamHandler handler = new IotMqttWsUpstreamHandler(
this, messageService, connectionManager);
handler.handle(socket);
}
}

View File

@@ -0,0 +1,259 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager;
import cn.hutool.core.collection.CollUtil;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* IoT MQTT WebSocket 连接管理器
*
* @author 芋道源码
*/
@Slf4j
@Component
public class IotMqttWsConnectionManager {
/**
* 存储设备连接
* Key: 设备标识deviceKey
* Value: WebSocket 连接
*/
private final Map<String, ServerWebSocket> connections = new ConcurrentHashMap<>();
/**
* 存储设备标识与 Socket ID 的映射
* Key: 设备标识deviceKey
* Value: Socket IDUUID
*/
private final Map<String, String> deviceKeyToSocketId = new ConcurrentHashMap<>();
/**
* 存储 Socket ID 与设备标识的映射
* Key: Socket IDUUID
* Value: 设备标识deviceKey
*/
private final Map<String, String> socketIdToDeviceKey = new ConcurrentHashMap<>();
/**
* 存储设备订阅的主题
* Key: 设备标识deviceKey
* Value: 订阅的主题集合
*/
private final Map<String, Set<String>> deviceSubscriptions = new ConcurrentHashMap<>();
/**
* 添加连接
*
* @param deviceKey 设备标识
* @param socket WebSocket 连接
* @param socketId Socket IDUUID
*/
public void addConnection(String deviceKey, ServerWebSocket socket, String socketId) {
connections.put(deviceKey, socket);
deviceKeyToSocketId.put(deviceKey, socketId);
socketIdToDeviceKey.put(socketId, deviceKey);
log.info("[addConnection][设备连接已添加deviceKey: {}socketId: {},当前连接数: {}]",
deviceKey, socketId, connections.size());
}
/**
* 移除连接
*
* @param deviceKey 设备标识
*/
public void removeConnection(String deviceKey) {
ServerWebSocket socket = connections.remove(deviceKey);
String socketId = deviceKeyToSocketId.remove(deviceKey);
if (socketId != null) {
socketIdToDeviceKey.remove(socketId);
}
if (socket != null) {
log.info("[removeConnection][设备连接已移除deviceKey: {}socketId: {},当前连接数: {}]",
deviceKey, socketId, connections.size());
}
}
/**
* 根据 Socket ID 移除连接
*
* @param socketId WebSocket 文本框架 ID
*/
public void removeConnectionBySocketId(String socketId) {
String deviceKey = socketIdToDeviceKey.remove(socketId);
if (deviceKey != null) {
connections.remove(deviceKey);
log.info("[removeConnectionBySocketId][设备连接已移除socketId: {}deviceKey: {},当前连接数: {}]",
socketId, deviceKey, connections.size());
}
}
/**
* 获取连接
*
* @param deviceKey 设备标识
* @return WebSocket 连接
*/
public ServerWebSocket getConnection(String deviceKey) {
return connections.get(deviceKey);
}
/**
* 根据 Socket ID 获取设备标识
*
* @param socketId WebSocket 文本框架 ID
* @return 设备标识
*/
public String getDeviceKeyBySocketId(String socketId) {
return socketIdToDeviceKey.get(socketId);
}
/**
* 检查设备是否在线
*
* @param deviceKey 设备标识
* @return 是否在线
*/
public boolean isOnline(String deviceKey) {
return connections.containsKey(deviceKey);
}
/**
* 获取当前连接数
*
* @return 连接数
*/
public int getConnectionCount() {
return connections.size();
}
/**
* 关闭所有连接
*/
public void closeAllConnections() {
connections.forEach((deviceKey, socket) -> {
try {
socket.close();
log.info("[closeAllConnections][关闭设备连接deviceKey: {}]", deviceKey);
} catch (Exception e) {
log.error("[closeAllConnections][关闭设备连接失败deviceKey: {}]", deviceKey, e);
}
});
connections.clear();
deviceKeyToSocketId.clear();
socketIdToDeviceKey.clear();
deviceSubscriptions.clear();
log.info("[closeAllConnections][所有连接已关闭]");
}
// ==================== 订阅管理方法 ====================
/**
* 添加订阅
*
* @param deviceKey 设备标识
* @param topic 订阅主题
*/
public void addSubscription(String deviceKey, String topic) {
deviceSubscriptions.computeIfAbsent(deviceKey, k -> new CopyOnWriteArraySet<>()).add(topic);
log.debug("[addSubscription][设备订阅主题deviceKey: {}topic: {}]", deviceKey, topic);
}
/**
* 移除订阅
*
* @param deviceKey 设备标识
* @param topic 订阅主题
*/
public void removeSubscription(String deviceKey, String topic) {
Set<String> topics = deviceSubscriptions.get(deviceKey);
if (topics != null) {
topics.remove(topic);
log.debug("[removeSubscription][设备取消订阅deviceKey: {}topic: {}]", deviceKey, topic);
}
}
/**
* 检查设备是否订阅了指定主题
* 支持 MQTT 通配符匹配(+ 和 #
*
* @param deviceKey 设备标识
* @param topic 发布主题
* @return 是否匹配
*/
public boolean isSubscribed(String deviceKey, String topic) {
Set<String> subscriptions = deviceSubscriptions.get(deviceKey);
if (CollUtil.isEmpty(subscriptions)) {
return false;
}
// 检查是否有匹配的订阅
for (String subscription : subscriptions) {
if (topicMatches(subscription, topic)) {
return true;
}
}
return false;
}
/**
* 获取设备的所有订阅
*
* @param deviceKey 设备标识
* @return 订阅主题集合
*/
public Set<String> getSubscriptions(String deviceKey) {
return deviceSubscriptions.get(deviceKey);
}
// TODO @haohao这个方法是不是也可以考虑抽到 IotMqttTopicUtils 里面去哈;感觉更简洁一点?
/**
* MQTT 主题匹配
* 支持通配符:
* - +:匹配单层主题
* - #:匹配多层主题(必须在末尾)
*
* @param subscription 订阅主题(可能包含通配符)
* @param topic 发布主题(不包含通配符)
* @return 是否匹配
*/
private boolean topicMatches(String subscription, String topic) {
// 完全匹配
if (subscription.equals(topic)) {
return true;
}
// 不包含通配符
// TODO @haohao这里要不要枚举下哈+ #
if (!subscription.contains("+") && !subscription.contains("#")) {
return false;
}
String[] subscriptionParts = subscription.split("/");
String[] topicParts = topic.split("/");
int i = 0;
for (; i < subscriptionParts.length && i < topicParts.length; i++) {
String subPart = subscriptionParts[i];
String topicPart = topicParts[i];
// # 匹配剩余所有层级,且必须在末尾
if (subPart.equals("#")) {
return i == subscriptionParts.length - 1;
}
// 不是通配符且不匹配
if (!subPart.equals("+") && !subPart.equals(topicPart)) {
return false;
}
}
// 检查是否都匹配完
return i == subscriptionParts.length && i == topicParts.length;
}
}

View File

@@ -0,0 +1,15 @@
/**
* IoT 网关 MQTT WebSocket 协议实现
* <p>
* 基于 Vert.x 实现 MQTT over WebSocket 服务端,支持:
* - 标准 MQTT 3.1.1 协议
* - WebSocket 协议升级
* - SSL/TLS 加密wss://
* - 设备认证与连接管理
* - QoS 0/1/2 消息质量保证
* - 双向消息通信(上行/下行)
*
* @author 芋道源码
*/
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws;

View File

@@ -0,0 +1,221 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicInteger;
/**
* IoT MQTT WebSocket 下行消息处理器
* <p>
* 处理从消息总线发送到设备的消息,包括:
* - 属性设置
* - 服务调用
* - 事件通知
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttWsDownstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotMqttWsConnectionManager connectionManager;
/**
* 消息 ID 生成器(用于发布消息)
*/
private final AtomicInteger messageIdGenerator = new AtomicInteger(1);
public IotMqttWsDownstreamHandler(IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotMqttWsConnectionManager connectionManager) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.connectionManager = connectionManager;
}
/**
* 处理下行消息
*
* @param message 设备消息
* @return 是否处理成功
*/
public boolean handleDownstreamMessage(IotDeviceMessage message) {
try {
// 1. 基础校验
if (message == null || message.getDeviceId() == null) {
log.warn("[handleDownstreamMessage][消息或设备 ID 为空,忽略处理]");
return false;
}
// 2. 获取设备信息
IotDeviceRespDTO deviceInfo = deviceService.getDeviceFromCache(message.getDeviceId());
if (deviceInfo == null) {
log.warn("[handleDownstreamMessage][设备不存在,设备 ID{}]", message.getDeviceId());
return false;
}
// 3. 构建设备标识
String deviceKey = deviceInfo.getProductKey() + ":" + deviceInfo.getDeviceName();
// 4. 检查设备是否在线
if (!connectionManager.isOnline(deviceKey)) {
log.warn("[handleDownstreamMessage][设备离线无法发送消息deviceKey: {}]", deviceKey);
return false;
}
// 5. 构建主题
String topic = buildDownstreamTopic(message, deviceInfo);
if (StrUtil.isBlank(topic)) {
log.warn("[handleDownstreamMessage][主题构建失败,设备 ID{},方法:{}]",
message.getDeviceId(), message.getMethod());
return false;
}
// 6. 检查设备是否订阅了该主题
if (!connectionManager.isSubscribed(deviceKey, topic)) {
log.warn("[handleDownstreamMessage][设备未订阅该主题deviceKey: {}topic: {}]", deviceKey, topic);
return false;
}
// 8. 编码消息
byte[] payload = deviceMessageService.encodeDeviceMessage(message,
deviceInfo.getProductKey(), deviceInfo.getDeviceName());
if (payload == null || payload.length == 0) {
log.warn("[handleDownstreamMessage][消息编码失败,设备 ID{}]", message.getDeviceId());
return false;
}
// 9. 发送消息到设备
return sendMessageToDevice(deviceKey, topic, payload, 1);
} catch (Exception e) {
if (message != null) {
log.error("[handleDownstreamMessage][处理下行消息异常,设备 ID{},错误:{}]",
message.getDeviceId(), e.getMessage(), e);
}
return false;
}
}
/**
* 构建下行消息主题
*
* @param message 设备消息
* @param deviceInfo 设备信息
* @return 主题
*/
private String buildDownstreamTopic(IotDeviceMessage message, IotDeviceRespDTO deviceInfo) {
String method = message.getMethod();
if (StrUtil.isBlank(method)) {
return null;
}
// 使用工具类构建主题,支持回复消息处理
boolean isReply = IotDeviceMessageUtils.isReplyMessage(message);
return IotMqttTopicUtils.buildTopicByMethod(method, deviceInfo.getProductKey(),
deviceInfo.getDeviceName(), isReply);
}
/**
* 发送消息到设备
*
* @param deviceKey 设备标识productKey:deviceName
* @param topic 主题
* @param payload 消息内容
* @param qos QoS 级别
* @return 是否发送成功
*/
private boolean sendMessageToDevice(String deviceKey, String topic, byte[] payload, int qos) {
// 获取设备连接
ServerWebSocket socket = connectionManager.getConnection(deviceKey);
if (socket == null) {
log.warn("[sendMessageToDevice][设备未连接deviceKey: {}]", deviceKey);
return false;
}
try {
int messageId = qos > 0 ? generateMessageId() : 0;
// 手动编码 MQTT PUBLISH 消息
io.netty.buffer.ByteBuf byteBuf = io.netty.buffer.Unpooled.buffer();
// 固定头:消息类型(PUBLISH=3) + DUP(0) + QoS + RETAIN
int fixedHeaderByte1 = 0x30 | (qos << 1); // PUBLISH类型
byteBuf.writeByte(fixedHeaderByte1);
// 计算剩余长度
int topicLength = topic.getBytes().length;
int remainingLength = 2 + topicLength + (qos > 0 ? 2 : 0) + payload.length;
// 写入剩余长度(简化版本,假设小于 128 字节)
if (remainingLength < 128) {
byteBuf.writeByte(remainingLength);
} else {
// 处理大于 127 的情况
int x = remainingLength;
do {
int encodedByte = x % 128;
x = x / 128;
if (x > 0) {
encodedByte = encodedByte | 128;
}
byteBuf.writeByte(encodedByte);
} while (x > 0);
}
// 可变头:主题名称
byteBuf.writeShort(topicLength);
byteBuf.writeBytes(topic.getBytes());
// 可变头:消息 ID仅 QoS > 0 时)
if (qos > 0) {
byteBuf.writeShort(messageId);
}
// 有效载荷
byteBuf.writeBytes(payload);
// 发送
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
byteBuf.release();
socket.writeBinaryMessage(Buffer.buffer(bytes));
log.info("[sendMessageToDevice][消息已发送到设备deviceKey: {}topic: {}qos: {}messageId: {}]",
deviceKey, topic, qos, messageId);
return true;
} catch (Exception e) {
log.error("[sendMessageToDevice][发送消息到设备失败deviceKey: {}topic: {}]", deviceKey, topic, e);
return false;
}
}
/**
* 生成消息 ID
*
* @return 消息 ID
*/
private int generateMessageId() {
int id = messageIdGenerator.getAndIncrement();
// MQTT 消息 ID 范围是 1-65535
// TODO @haohao并发可能有问题
if (id > 65535) {
messageIdGenerator.set(1);
return 1;
}
return id;
}
}

View File

@@ -0,0 +1,753 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.IotMqttWsUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.manager.IotMqttWsConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.mqtt.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* IoT MQTT WebSocket 上行消息处理器
* <p>
* 处理来自设备的 MQTT 消息,包括:
* - CONNECT设备连接认证
* - PUBLISH设备发布消息
* - SUBSCRIBE设备订阅主题
* - UNSUBSCRIBE设备取消订阅
* - PINGREQ心跳请求
* - DISCONNECT设备断开连接
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttWsUpstreamHandler {
private final IotMqttWsUpstreamProtocol upstreamProtocol;
private final IotDeviceCommonApi deviceApi;
private final IotDeviceMessageService messageService;
private final IotMqttWsConnectionManager connectionManager;
/**
* 存储 WebSocket 连接到 Socket ID 的映射
* Key: WebSocket 对象
* Value: Socket IDUUID
*/
private final ConcurrentHashMap<ServerWebSocket, String> socketIdMap = new ConcurrentHashMap<>();
/**
* 存储 Socket ID 对应的设备信息
* Key: Socket IDUUID
* Value: 设备信息
*/
private final ConcurrentHashMap<String, IotDeviceRespDTO> socketDeviceMap = new ConcurrentHashMap<>();
/**
* 存储设备的消息 ID 生成器(用于 QoS > 0 的消息)
*/
private final ConcurrentHashMap<String, AtomicInteger> deviceMessageIdMap = new ConcurrentHashMap<>();
/**
* MQTT 解码通道(用于解析 WebSocket 中的 MQTT 二进制消息)
*/
private final ThreadLocal<EmbeddedChannel> decoderChannelThreadLocal = ThreadLocal
.withInitial(() -> new EmbeddedChannel(new MqttDecoder()));
/**
* MQTT 编码通道(用于编码 MQTT 响应消息)
*/
private final ThreadLocal<EmbeddedChannel> encoderChannelThreadLocal = ThreadLocal
.withInitial(() -> new EmbeddedChannel(MqttEncoder.INSTANCE));
public IotMqttWsUpstreamHandler(IotMqttWsUpstreamProtocol upstreamProtocol,
IotDeviceMessageService messageService,
IotMqttWsConnectionManager connectionManager) {
this.upstreamProtocol = upstreamProtocol;
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.messageService = messageService;
this.connectionManager = connectionManager;
}
/**
* 处理 WebSocket 连接
*
* @param socket WebSocket 连接
*/
public void handle(ServerWebSocket socket) {
// 生成唯一的 Socket ID因为 MQTT 使用二进制协议textHandlerID() 会返回 null
String socketId = IdUtil.simpleUUID();
socketIdMap.put(socket, socketId);
log.info("[handle][WebSocket 连接建立socketId: {}remoteAddress: {}]",
socketId, socket.remoteAddress());
// 设置二进制数据处理器
socket.binaryMessageHandler(buffer -> {
try {
handleMqttMessage(socket, buffer);
} catch (Exception e) {
log.error("[handle][处理 MQTT 消息异常socketId: {}]", socketId, e);
socket.close();
}
});
// 设置关闭处理器
socket.closeHandler(v -> {
socketIdMap.remove(socket);
IotDeviceRespDTO device = socketDeviceMap.remove(socketId);
if (device != null) {
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
connectionManager.removeConnection(deviceKey);
deviceMessageIdMap.remove(deviceKey);
// 发送设备离线消息
sendOfflineMessage(device);
log.info("[handle][WebSocket 连接关闭deviceKey: {}socketId: {}]", deviceKey, socketId);
}
});
// 设置异常处理器
socket.exceptionHandler(e -> {
log.error("[handle][WebSocket 连接异常socketId: {}]", socketId, e);
socketIdMap.remove(socket);
IotDeviceRespDTO device = socketDeviceMap.remove(socketId);
if (device != null) {
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
connectionManager.removeConnection(deviceKey);
deviceMessageIdMap.remove(deviceKey);
}
socket.close();
});
}
/**
* 处理 MQTT 消息
*
* @param socket WebSocket 连接
* @param buffer 消息缓冲区
*/
private void handleMqttMessage(ServerWebSocket socket, Buffer buffer) {
String socketId = socketIdMap.get(socket);
ByteBuf byteBuf = Unpooled.wrappedBuffer(buffer.getBytes());
try {
// 使用 EmbeddedChannel 解码 MQTT 消息
EmbeddedChannel decoderChannel = decoderChannelThreadLocal.get();
decoderChannel.writeInbound(byteBuf.retain());
// 读取解码后的消息
MqttMessage mqttMessage = decoderChannel.readInbound();
if (mqttMessage == null) {
log.warn("[handleMqttMessage][MQTT 消息解码失败socketId: {}]", socketId);
return;
}
MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
log.debug("[handleMqttMessage][收到 MQTT 消息,类型: {}socketId: {}]", messageType, socketId);
// 根据消息类型分发处理
switch (messageType) {
case CONNECT:
handleConnect(socket, (MqttConnectMessage) mqttMessage);
break;
case PUBLISH:
handlePublish(socket, (MqttPublishMessage) mqttMessage);
break;
case PUBACK:
handlePubAck(socket, mqttMessage);
break;
case PUBREC:
handlePubRec(socket, mqttMessage);
break;
case PUBREL:
handlePubRel(socket, mqttMessage);
break;
case PUBCOMP:
handlePubComp(socket, mqttMessage);
break;
case SUBSCRIBE:
handleSubscribe(socket, (MqttSubscribeMessage) mqttMessage);
break;
case UNSUBSCRIBE:
handleUnsubscribe(socket, (MqttUnsubscribeMessage) mqttMessage);
break;
case PINGREQ:
handlePingReq(socket);
break;
case DISCONNECT:
handleDisconnect(socket);
break;
default:
log.warn("[handleMqttMessage][不支持的消息类型: {}socketId: {}]", messageType, socketId);
}
} catch (DecoderException e) {
log.error("[handleMqttMessage][MQTT 消息解码异常socketId: {}]", socketId, e);
socket.close();
} catch (Exception e) {
log.error("[handleMqttMessage][处理 MQTT 消息失败socketId: {}]", socketId, e);
socket.close();
} finally {
byteBuf.release();
}
}
/**
* 处理 CONNECT 消息(设备认证)
*/
private void handleConnect(ServerWebSocket socket, MqttConnectMessage message) {
String socketId = socketIdMap.get(socket);
try {
// 1. 解析 CONNECT 消息
MqttConnectPayload payload = message.payload();
String clientId = payload.clientIdentifier();
String username = payload.userName();
String password = payload.passwordInBytes() != null
? new String(payload.passwordInBytes(), StandardCharsets.UTF_8)
: null;
log.info("[handleConnect][收到 CONNECT 消息clientId: {}username: {}socketId: {}]",
clientId, username, socketId);
// 2. 设备认证
IotDeviceRespDTO device = authenticateDevice(clientId, username, password);
if (device == null) {
log.warn("[handleConnect][设备认证失败clientId: {}socketId: {}]", clientId, socketId);
sendConnAck(socket, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
socket.close();
return;
}
// 3. 保存设备信息
socketDeviceMap.put(socketId, device);
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
connectionManager.addConnection(deviceKey, socket, socketId);
deviceMessageIdMap.put(deviceKey, new AtomicInteger(1));
log.info("[handleConnect][设备认证成功deviceId: {}deviceKey: {}socketId: {}]",
device.getId(), deviceKey, socketId);
// 4. 发送 CONNACK
sendConnAck(socket, MqttConnectReturnCode.CONNECTION_ACCEPTED);
// 5. 发送设备上线消息
sendOnlineMessage(device);
} catch (Exception e) {
log.error("[handleConnect][处理 CONNECT 消息失败socketId: {}]", socketId, e);
sendConnAck(socket, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
socket.close();
}
}
/**
* 处理 PUBLISH 消息(设备发布消息)
*/
private void handlePublish(ServerWebSocket socket, MqttPublishMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePublish][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
try {
// 1. 解析 PUBLISH 消息
MqttFixedHeader fixedHeader = message.fixedHeader();
MqttPublishVariableHeader variableHeader = message.variableHeader();
ByteBuf payload = message.payload();
String topic = variableHeader.topicName();
int messageId = variableHeader.packetId();
MqttQoS qos = fixedHeader.qosLevel();
log.debug("[handlePublish][收到 PUBLISH 消息topic: {}messageId: {}QoS: {}deviceId: {}]",
topic, messageId, qos, device.getId());
// 2. 读取 payload
byte[] payloadBytes = new byte[payload.readableBytes()];
payload.readBytes(payloadBytes);
// 3. 解码并发送消息
IotDeviceMessage deviceMessage = messageService.decodeDeviceMessage(payloadBytes,
device.getProductKey(), device.getDeviceName());
if (deviceMessage != null) {
deviceMessage.setServerId(upstreamProtocol.getServerId());
messageService.sendDeviceMessage(deviceMessage, device.getProductKey(),
device.getDeviceName(), upstreamProtocol.getServerId());
log.info("[handlePublish][设备消息已发送method: {}deviceId: {}]",
deviceMessage.getMethod(), device.getId());
}
// 4. 根据 QoS 级别发送相应的确认消息
if (qos == MqttQoS.AT_LEAST_ONCE) {
// QoS 1发送 PUBACK
sendPubAck(socket, messageId);
} else if (qos == MqttQoS.EXACTLY_ONCE) {
// QoS 2发送 PUBREC
sendPubRec(socket, messageId);
}
// QoS 0 无需确认
} catch (Exception e) {
log.error("[handlePublish][处理 PUBLISH 消息失败deviceId: {}]", device.getId(), e);
}
}
/**
* 处理 PUBACK 消息QoS 1 确认)
*/
private void handlePubAck(ServerWebSocket socket, MqttMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePubAck][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
log.debug("[handlePubAck][收到 PUBACKmessageId: {}deviceId: {}]", messageId, device.getId());
}
/**
* 处理 PUBREC 消息QoS 2 第一步确认)
*/
private void handlePubRec(ServerWebSocket socket, MqttMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePubRec][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
log.debug("[handlePubRec][收到 PUBRECmessageId: {}deviceId: {}]", messageId, device.getId());
// 发送 PUBREL
sendPubRel(socket, messageId);
}
/**
* 处理 PUBREL 消息QoS 2 第二步)
*/
private void handlePubRel(ServerWebSocket socket, MqttMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePubRel][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
log.debug("[handlePubRel][收到 PUBRELmessageId: {}deviceId: {}]", messageId, device.getId());
// 发送 PUBCOMP
sendPubComp(socket, messageId);
}
/**
* 处理 PUBCOMP 消息QoS 2 完成确认)
*/
private void handlePubComp(ServerWebSocket socket, MqttMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePubComp][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
log.debug("[handlePubComp][收到 PUBCOMPmessageId: {}deviceId: {}]", messageId, device.getId());
}
/**
* 处理 SUBSCRIBE 消息(设备订阅主题)
*/
private void handleSubscribe(ServerWebSocket socket, MqttSubscribeMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handleSubscribe][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
try {
// 1. 解析 SUBSCRIBE 消息
int messageId = message.variableHeader().messageId();
MqttSubscribePayload payload = message.payload();
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
log.info("[handleSubscribe][设备订阅请求deviceKey: {}messageId: {},主题数量: {}]",
deviceKey, messageId, payload.topicSubscriptions().size());
// 2. 构建 QoS 列表并记录订阅信息
int[] grantedQosList = new int[payload.topicSubscriptions().size()];
for (int i = 0; i < payload.topicSubscriptions().size(); i++) {
MqttTopicSubscription subscription = payload.topicSubscriptions().get(i);
String topic = subscription.topicFilter();
grantedQosList[i] = subscription.qualityOfService().value();
// 记录订阅信息到连接管理器
connectionManager.addSubscription(deviceKey, topic);
log.info("[handleSubscribe][订阅主题: {}QoS: {}deviceKey: {}]",
topic, subscription.qualityOfService(), deviceKey);
}
// 3. 发送 SUBACK
sendSubAck(socket, messageId, grantedQosList);
} catch (Exception e) {
log.error("[handleSubscribe][处理 SUBSCRIBE 消息失败deviceId: {}]", device.getId(), e);
}
}
/**
* 处理 UNSUBSCRIBE 消息(设备取消订阅)
*/
private void handleUnsubscribe(ServerWebSocket socket, MqttUnsubscribeMessage message) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handleUnsubscribe][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
try {
// 1. 解析 UNSUBSCRIBE 消息
int messageId = message.variableHeader().messageId();
MqttUnsubscribePayload payload = message.payload();
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
log.info("[handleUnsubscribe][设备取消订阅deviceKey: {}messageId: {},主题数量: {}]",
deviceKey, messageId, payload.topics().size());
// 2. 移除订阅信息
for (String topic : payload.topics()) {
connectionManager.removeSubscription(deviceKey, topic);
log.info("[handleUnsubscribe][取消订阅主题: {}deviceKey: {}]", topic, deviceKey);
}
// 3. 发送 UNSUBACK
sendUnsubAck(socket, messageId);
} catch (Exception e) {
log.error("[handleUnsubscribe][处理 UNSUBSCRIBE 消息失败deviceId: {}]", device.getId(), e);
}
}
/**
* 处理 PINGREQ 消息(心跳请求)
*/
private void handlePingReq(ServerWebSocket socket) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.get(socketId);
if (device == null) {
log.warn("[handlePingReq][设备未认证socketId: {}]", socketId);
socket.close();
return;
}
log.debug("[handlePingReq][收到心跳请求deviceId: {}]", device.getId());
// 发送 PINGRESP
sendPingResp(socket);
}
/**
* 处理 DISCONNECT 消息(设备断开连接)
*/
private void handleDisconnect(ServerWebSocket socket) {
String socketId = socketIdMap.get(socket);
IotDeviceRespDTO device = socketDeviceMap.remove(socketId);
if (device != null) {
String deviceKey = device.getProductKey() + ":" + device.getDeviceName();
connectionManager.removeConnection(deviceKey);
deviceMessageIdMap.remove(deviceKey);
sendOfflineMessage(device);
log.info("[handleDisconnect][设备主动断开连接deviceKey: {}]", deviceKey);
}
socket.close();
}
// ==================== 设备认证和状态相关方法 ====================
/**
* 设备认证
*/
private IotDeviceRespDTO authenticateDevice(String clientId, String username, String password) {
try {
// 1. 参数校验
if (StrUtil.hasEmpty(clientId, username, password)) {
log.warn("[authenticateDevice][认证参数不完整clientId: {}username: {}]", clientId, username);
return null;
}
// 2. 构建认证参数并调用 API
IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO()
.setClientId(clientId)
.setUsername(username)
.setPassword(password);
CommonResult<Boolean> authResult = deviceApi.authDevice(authParams);
if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) {
log.warn("[authenticateDevice][设备认证失败clientId: {}]", clientId);
return null;
}
// 3. 获取设备信息
IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) {
log.warn("[authenticateDevice][用户名格式不正确username: {}]", username);
return null;
}
IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO()
.setProductKey(deviceInfo.getProductKey())
.setDeviceName(deviceInfo.getDeviceName());
CommonResult<IotDeviceRespDTO> deviceResult = deviceApi.getDevice(getReqDTO);
if (!deviceResult.isSuccess() || deviceResult.getData() == null) {
log.warn("[authenticateDevice][获取设备信息失败username: {}]", username);
return null;
}
return deviceResult.getData();
} catch (Exception e) {
log.error("[authenticateDevice][设备认证异常clientId: {}]", clientId, e);
return null;
}
}
/**
* 发送设备上线消息
*/
private void sendOnlineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
messageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), upstreamProtocol.getServerId());
log.info("[sendOnlineMessage][设备上线deviceId: {}]", device.getId());
} catch (Exception e) {
log.error("[sendOnlineMessage][发送设备上线消息失败deviceId: {}]", device.getId(), e);
}
}
/**
* 发送设备离线消息
*/
private void sendOfflineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
messageService.sendDeviceMessage(offlineMessage, device.getProductKey(),
device.getDeviceName(), upstreamProtocol.getServerId());
log.info("[sendOfflineMessage][设备离线deviceId: {}]", device.getId());
} catch (Exception e) {
log.error("[sendOfflineMessage][发送设备离线消息失败deviceId: {}]", device.getId(), e);
}
}
// ==================== 发送响应消息的辅助方法 ====================
/**
* 发送 CONNACK 消息
*/
private void sendConnAck(ServerWebSocket socket, MqttConnectReturnCode returnCode) {
try {
// 构建 CONNACK 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, false);
MqttConnAckMessage connAckMessage = new MqttConnAckMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, connAckMessage);
log.debug("[sendConnAck][发送 CONNACK 消息returnCode: {}]", returnCode);
} catch (Exception e) {
log.error("[sendConnAck][发送 CONNACK 消息失败]", e);
}
}
/**
* 发送 PUBACK 消息QoS 1 确认)
*/
private void sendPubAck(ServerWebSocket socket, int messageId) {
try {
// 构建 PUBACK 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttMessage pubAckMessage = new MqttMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, pubAckMessage);
log.debug("[sendPubAck][发送 PUBACK 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubAck][发送 PUBACK 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 PUBREC 消息QoS 2 第一步确认)
*/
private void sendPubRec(ServerWebSocket socket, int messageId) {
try {
// 构建 PUBREC 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttMessage pubRecMessage = new MqttMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, pubRecMessage);
log.debug("[sendPubRec][发送 PUBREC 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubRec][发送 PUBREC 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 PUBREL 消息QoS 2 第二步)
*/
private void sendPubRel(ServerWebSocket socket, int messageId) {
try {
// 构建 PUBREL 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttMessage pubRelMessage = new MqttMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, pubRelMessage);
log.debug("[sendPubRel][发送 PUBREL 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubRel][发送 PUBREL 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 PUBCOMP 消息QoS 2 完成确认)
*/
private void sendPubComp(ServerWebSocket socket, int messageId) {
try {
// 构建 PUBCOMP 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttMessage pubCompMessage = new MqttMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, pubCompMessage);
log.debug("[sendPubComp][发送 PUBCOMP 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendPubComp][发送 PUBCOMP 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 SUBACK 消息
*/
private void sendSubAck(ServerWebSocket socket, int messageId, int[] grantedQosList) {
try {
// 构建 SUBACK 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttSubAckPayload payload = new MqttSubAckPayload(grantedQosList);
MqttSubAckMessage subAckMessage = new MqttSubAckMessage(fixedHeader, variableHeader, payload);
// 编码并发送
sendMqttMessage(socket, subAckMessage);
log.debug("[sendSubAck][发送 SUBACK 消息messageId: {},主题数量: {}]", messageId, grantedQosList.length);
} catch (Exception e) {
log.error("[sendSubAck][发送 SUBACK 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 UNSUBACK 消息
*/
private void sendUnsubAck(ServerWebSocket socket, int messageId) {
try {
// 构建 UNSUBACK 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttUnsubAckMessage unsubAckMessage = new MqttUnsubAckMessage(fixedHeader, variableHeader);
// 编码并发送
sendMqttMessage(socket, unsubAckMessage);
log.debug("[sendUnsubAck][发送 UNSUBACK 消息messageId: {}]", messageId);
} catch (Exception e) {
log.error("[sendUnsubAck][发送 UNSUBACK 消息失败messageId: {}]", messageId, e);
}
}
/**
* 发送 PINGRESP 消息
*/
private void sendPingResp(ServerWebSocket socket) {
try {
// 构建 PINGRESP 消息
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage pingRespMessage = new MqttMessage(fixedHeader);
// 编码并发送
sendMqttMessage(socket, pingRespMessage);
log.debug("[sendPingResp][发送 PINGRESP 消息]");
} catch (Exception e) {
log.error("[sendPingResp][发送 PINGRESP 消息失败]", e);
}
}
/**
* 发送 MQTT 消息到 WebSocket
*/
private void sendMqttMessage(ServerWebSocket socket, MqttMessage mqttMessage) {
ByteBuf byteBuf = null;
try {
// 使用 EmbeddedChannel 编码 MQTT 消息
EmbeddedChannel encoderChannel = encoderChannelThreadLocal.get();
encoderChannel.writeOutbound(mqttMessage);
// 读取编码后的 ByteBuf
byteBuf = encoderChannel.readOutbound();
if (byteBuf != null) {
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
socket.writeBinaryMessage(Buffer.buffer(bytes));
}
} finally {
if (byteBuf != null) {
byteBuf.release();
}
}
}
}

View File

@@ -99,11 +99,24 @@ yudao:
# 针对引入的 MQTT 组件的配置
# ====================================
mqtt:
enabled: true
enabled: false
port: 1883
max-message-size: 8192
connect-timeout-seconds: 60
ssl-enabled: false
# ====================================
# 针对引入的 MQTT WebSocket 组件的配置
# ====================================
mqtt-ws:
enabled: false # 是否启用 MQTT WebSocket
port: 8083 # WebSocket 服务端口
path: /mqtt # WebSocket 路径
max-message-size: 8192 # 最大消息大小(字节)
max-frame-size: 65536 # 最大帧大小(字节)
connect-timeout-seconds: 60 # 连接超时时间(秒)
keep-alive-timeout-seconds: 300 # 保持连接超时时间(秒)
ssl-enabled: false # 是否启用 SSLwss://
sub-protocol: mqtt # WebSocket 子协议
--- #################### 日志相关配置 ####################
@@ -123,6 +136,7 @@ logging:
cn.iocoder.yudao.module.iot.gateway.protocol.emqx: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.mqttws: DEBUG
# 根日志级别
root: INFO

View File

@@ -0,0 +1,888 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta content="width=device-width, initial-scale=1.0" name="viewport">
<title>MQTT WebSocket 测试客户端</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 20px;
min-height: 100vh;
}
.container {
max-width: 1200px;
margin: 0 auto;
background: white;
border-radius: 10px;
box-shadow: 0 10px 40px rgba(0, 0, 0, 0.1);
overflow: hidden;
}
.header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 30px;
text-align: center;
}
.header h1 {
font-size: 28px;
margin-bottom: 10px;
}
.header p {
opacity: 0.9;
}
.info-box {
background: #f8f9fa;
border-left: 4px solid #667eea;
padding: 15px;
margin: 20px;
border-radius: 5px;
}
.info-box h3 {
color: #667eea;
margin-bottom: 10px;
font-size: 16px;
}
.info-box ul {
margin-left: 20px;
color: #666;
font-size: 14px;
line-height: 1.6;
}
.info-box code {
background: #e9ecef;
padding: 2px 6px;
border-radius: 3px;
font-family: 'Courier New', monospace;
color: #d63384;
}
.content {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 20px;
padding: 30px;
}
.panel {
background: #f8f9fa;
border-radius: 8px;
padding: 20px;
}
.panel h2 {
color: #667eea;
margin-bottom: 20px;
font-size: 20px;
border-bottom: 2px solid #667eea;
padding-bottom: 10px;
}
.form-group {
margin-bottom: 15px;
}
.form-group label {
display: block;
margin-bottom: 5px;
color: #333;
font-weight: 500;
}
.form-group input,
.form-group select,
.form-group textarea {
width: 100%;
padding: 10px;
border: 1px solid #ddd;
border-radius: 4px;
font-size: 14px;
}
.form-group input:focus,
.form-group select:focus,
.form-group textarea:focus {
outline: none;
border-color: #667eea;
}
.form-group textarea {
resize: vertical;
min-height: 80px;
font-family: monospace;
}
.btn {
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
font-weight: 500;
transition: all 0.3s;
}
.btn-primary {
background: #667eea;
color: white;
}
.btn-primary:hover {
background: #5568d3;
}
.btn-success {
background: #28a745;
color: white;
}
.btn-success:hover {
background: #218838;
}
.btn-danger {
background: #dc3545;
color: white;
}
.btn-danger:hover {
background: #c82333;
}
.btn-warning {
background: #ffc107;
color: #333;
}
.btn-warning:hover {
background: #e0a800;
}
.btn-group {
display: flex;
gap: 10px;
margin-top: 15px;
}
.status {
padding: 10px;
border-radius: 4px;
margin-bottom: 20px;
font-weight: 500;
}
.status.disconnected {
background: #f8d7da;
color: #721c24;
}
.status.connected {
background: #d4edda;
color: #155724;
}
.status.connecting {
background: #fff3cd;
color: #856404;
}
.log-area {
background: #1e1e1e;
color: #d4d4d4;
padding: 15px;
border-radius: 4px;
height: 400px;
overflow-y: auto;
font-family: 'Courier New', monospace;
font-size: 12px;
line-height: 1.6;
}
.log-entry {
margin-bottom: 5px;
}
.log-entry.info {
color: #4ec9b0;
}
.log-entry.success {
color: #6a9955;
}
.log-entry.error {
color: #f48771;
}
.log-entry.warning {
color: #dcdcaa;
}
.stats {
display: grid;
grid-template-columns: repeat(3, 1fr);
gap: 10px;
margin-top: 15px;
}
.stat-item {
background: white;
padding: 15px;
border-radius: 4px;
text-align: center;
}
.stat-item .value {
font-size: 24px;
font-weight: bold;
color: #667eea;
}
.stat-item .label {
font-size: 12px;
color: #666;
margin-top: 5px;
}
@media (max-width: 768px) {
.content {
grid-template-columns: 1fr;
}
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🚀 MQTT WebSocket 测试客户端</h1>
<p>RuoYi-Vue-Pro IoT 模块 - MQTT over WebSocket 在线测试工具</p>
</div>
<!-- 协议格式说明 -->
<div class="info-box">
<h3>📌 标准协议格式说明</h3>
<ul>
<li><strong>Topic 格式:</strong><code>/sys/{productKey}/{deviceName}/thing/property/post</code></li>
<li><strong>Client ID 格式:</strong><code>{productKey}.{deviceName}</code> 例如:<code>zOXKLvHjUqTo7ipD.ceshi001</code>
</li>
<li><strong>Username 格式:</strong><code>{deviceName}&{productKey}</code> 例如:<code>ceshi001&zOXKLvHjUqTo7ipD</code>
</li>
<li><strong>消息格式Alink 协议):</strong>
<pre style="background: #e9ecef; padding: 10px; border-radius: 5px; margin-top: 5px; overflow-x: auto;">
{
"id": "消息 ID唯一标识",
"version": "1.0",
"method": "thing.property.post",
"params": {
"temperature": 25.5,
"humidity": 60
}
}</pre>
</li>
<li><strong>常用 Topic下行 - 服务端推送):</strong>
<ul style="margin-top: 5px;">
<li>属性设置:<code>/sys/{pk}/{dn}/thing/property/set</code></li>
<li>服务调用:<code>/sys/{pk}/{dn}/thing/service/invoke</code></li>
<li>配置推送:<code>/sys/{pk}/{dn}/thing/config/push</code></li>
<li>OTA 升级:<code>/sys/{pk}/{dn}/thing/ota/upgrade</code></li>
</ul>
</li>
<li><strong>常用 Topic上行 - 设备上报):</strong>
<ul style="margin-top: 5px;">
<li>状态更新:<code>/sys/{pk}/{dn}/thing/state/update</code></li>
<li>属性上报:<code>/sys/{pk}/{dn}/thing/property/post</code></li>
<li>事件上报:<code>/sys/{pk}/{dn}/thing/event/post</code></li>
<li>OTA 进度:<code>/sys/{pk}/{dn}/thing/ota/progress</code></li>
</ul>
</li>
</ul>
</div>
<div class="content">
<!-- 连接配置面板 -->
<div class="panel">
<h2>📡 连接配置</h2>
<div class="status disconnected" id="statusBar">
⚫ 未连接
</div>
<div class="form-group">
<label>服务器地址</label>
<input id="serverUrl" placeholder="ws://host:port/path" type="text" value="ws://localhost:8083/mqtt">
<small style="color: #666; font-size: 12px;">WebSocket 地址,支持 ws:// 和 wss://</small>
</div>
<div class="form-group">
<label>Client ID</label>
<input id="clientId" placeholder="设备客户端 ID" type="text" value="fqTn4Afs982Nak4N.jiali001">
<small style="color: #666; font-size: 12px;">格式:{productKey}.{deviceName}</small>
</div>
<div class="form-group">
<label>Username</label>
<input id="username" placeholder="用户名" type="text" value="jiali001&fqTn4Afs982Nak4N">
<small style="color: #666; font-size: 12px;">格式:{deviceName}&{productKey}</small>
</div>
<div class="form-group">
<label>Password</label>
<input id="password" placeholder="设备密钥"
type="password" value="ae10188f93febbb6b37bd57f463b2a795ae2800fab8933aef75d3c6422873f28">
<small style="color: #666; font-size: 12px;">设备的认证密钥Device Secret</small>
</div>
<div class="btn-group">
<button class="btn btn-success" id="connectBtn" onclick="connect()">🔌 连接</button>
<button class="btn btn-danger" disabled id="disconnectBtn" onclick="disconnect()">🔌 断开</button>
<button class="btn btn-warning" onclick="clearLogs()">🗑️ 清空日志</button>
</div>
<!-- 统计信息 -->
<div class="stats">
<div class="stat-item">
<div class="value" id="sentCount">0</div>
<div class="label">发送消息数</div>
</div>
<div class="stat-item">
<div class="value" id="receivedCount">0</div>
<div class="label">接收消息数</div>
</div>
<div class="stat-item">
<div class="value" id="errorCount">0</div>
<div class="label">错误次数</div>
</div>
</div>
</div>
<!-- 消息发布面板 -->
<div class="panel">
<h2>📤 消息发布</h2>
<div class="form-group">
<label>快捷主题选择(上行消息 - 设备 → 服务端)</label>
<select id="quickPublishTopicSelect" onchange="selectQuickPublishTopic()"
style="width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 5px;">
<option value="">-- 选择上行消息类型 --</option>
<option value="thing.state.update">设备状态更新 (thing.state.update)</option>
<option value="thing.property.post">属性上报 (thing.property.post)</option>
<option value="thing.event.post">事件上报 (thing.event.post)</option>
<option value="thing.ota.progress">OTA 升级进度 (thing.ota.progress)</option>
</select>
</div>
<div class="form-group">
<label>主题 (Topic)</label>
<input id="pubTopic" placeholder="消息主题,格式:/sys/{productKey}/{deviceName}/thing/property/post" type="text"
value="/sys/fqTn4Afs982Nak4N/jiali001/thing/property/post">
<small style="color: #666; font-size: 12px;">标准格式:/sys/{productKey}/{deviceName}/thing/property/post</small>
</div>
<div class="form-group">
<label>QoS 级别</label>
<select id="pubQos">
<option value="0">0 - 最多一次</option>
<option selected value="1">1 - 至少一次</option>
<option value="2">2 - 刚好一次</option>
</select>
</div>
<div class="form-group">
<label>消息内容 (JSON - Alink 协议格式)</label>
<textarea id="pubMessage" placeholder='Alink 协议格式消息'>{
"id": "123456789",
"version": "1.0",
"method": "thing.property.post",
"params": {
"temperature": 25.5,
"humidity": 60
}
}</textarea>
<small style="color: #666; font-size: 12px;">
Alink 协议格式id消息 ID、version协议版本、method方法、params参数
</small>
</div>
<div class="btn-group">
<button class="btn btn-primary" onclick="publish()">📤 发布消息</button>
<button class="btn btn-success" onclick="publishSampleData()">📊 发送样例数据</button>
</div>
<h2 style="margin-top: 30px;">📥 主题订阅</h2>
<div class="form-group">
<label>快捷主题选择(下行消息 - 服务端 → 设备)</label>
<select id="quickTopicSelect" onchange="selectQuickTopic()"
style="width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 5px;">
<option value="">-- 选择下行消息类型 --</option>
<optgroup label="📥 下行消息">
<option value="thing.property.set">属性设置 (thing.property.set)</option>
<option value="thing.service.invoke">服务调用 (thing.service.invoke)</option>
<option value="thing.config.push">配置推送 (thing.config.push)</option>
<option value="thing.ota.upgrade">OTA 固件推送 (thing.ota.upgrade)</option>
</optgroup>
<optgroup label="🔄 回复主题(上行消息的回复)">
<option value="thing.property.post_reply">属性上报回复 (thing.property.post_reply)</option>
<option value="thing.event.post_reply">事件上报回复 (thing.event.post_reply)</option>
</optgroup>
<optgroup label="🔧 通配符订阅">
<option value="wildcard_all">订阅所有主题 (/sys/+/+/#)</option>
<option value="wildcard_thing">订阅所有 thing 主题 (/sys/+/+/thing/#)</option>
<option value="wildcard_reply">订阅所有回复主题 (/sys/+/+/#_reply)</option>
</optgroup>
</select>
</div>
<div class="form-group">
<label>订阅主题</label>
<input id="subTopic" placeholder="订阅主题,格式:/sys/{productKey}/{deviceName}/thing/property/set" type="text"
value="/sys/fqTn4Afs982Nak4N/jiali001/thing/property/set">
<small style="color: #666; font-size: 12px;">标准格式:/sys/{productKey}/{deviceName}/thing/method 或使用通配符
/sys/+/+/#</small>
</div>
<div class="form-group">
<label>QoS 级别</label>
<select id="subQos">
<option value="0">0 - 最多一次</option>
<option selected value="1">1 - 至少一次</option>
<option value="2">2 - 刚好一次</option>
</select>
</div>
<div class="btn-group">
<button class="btn btn-primary" onclick="subscribe()">📥 订阅</button>
<button class="btn btn-danger" onclick="unsubscribe()">❌ 取消订阅</button>
</div>
</div>
<!-- 日志面板 -->
<div class="panel" style="grid-column: 1 / -1;">
<h2>📝 日志输出</h2>
<div class="log-area" id="logArea"></div>
</div>
</div>
</div>
<!-- 使用 MQTT.js 库 -->
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
let client = null;
let sentCount = 0;
let receivedCount = 0;
let errorCount = 0;
// 添加日志
function addLog(message, type = 'info') {
const logArea = document.getElementById('logArea');
const timestamp = new Date().toLocaleTimeString();
const logEntry = document.createElement('div');
logEntry.className = `log-entry ${type}`;
logEntry.textContent = `[${timestamp}] ${message}`;
logArea.appendChild(logEntry);
logArea.scrollTop = logArea.scrollHeight;
}
// 更新状态栏
function updateStatus(status, text) {
const statusBar = document.getElementById('statusBar');
statusBar.className = `status ${status}`;
const icons = {
'disconnected': '⚫',
'connecting': '🟡',
'connected': '🟢'
};
statusBar.textContent = `${icons[status]} ${text}`;
}
// 更新统计信息
function updateStats() {
document.getElementById('sentCount').textContent = sentCount;
document.getElementById('receivedCount').textContent = receivedCount;
document.getElementById('errorCount').textContent = errorCount;
}
// 连接到服务器
function connect() {
const serverUrl = document.getElementById('serverUrl').value;
const clientId = document.getElementById('clientId').value;
const username = document.getElementById('username').value;
const password = document.getElementById('password').value;
if (!serverUrl || !clientId) {
addLog('❌ 请填写服务器地址和 Client ID', 'error');
errorCount++;
updateStats();
return;
}
updateStatus('connecting', '正在连接...');
addLog(`🔄 正在连接到 ${serverUrl}...`, 'info');
const options = {
clientId: clientId,
username: username,
password: password,
clean: true,
reconnectPeriod: 5000,
connectTimeout: 30000,
};
client = mqtt.connect(serverUrl, options);
// 连接成功
client.on('connect', () => {
updateStatus('connected', '已连接');
addLog('✅ 连接成功!', 'success');
document.getElementById('connectBtn').disabled = true;
document.getElementById('disconnectBtn').disabled = false;
});
// 接收消息
client.on('message', (topic, message) => {
receivedCount++;
updateStats();
addLog(`📥 收到消息 [${topic}]: ${message.toString()}`, 'success');
});
// 连接错误
client.on('error', (error) => {
errorCount++;
updateStats();
addLog(`❌ 连接错误: ${error.message}`, 'error');
});
// 断开连接
client.on('close', () => {
updateStatus('disconnected', '未连接');
addLog('🔌 连接已断开', 'warning');
document.getElementById('connectBtn').disabled = false;
document.getElementById('disconnectBtn').disabled = true;
});
// 离线
client.on('offline', () => {
updateStatus('disconnected', '离线');
addLog('⚠️ 客户端离线', 'warning');
});
// 重连
client.on('reconnect', () => {
updateStatus('connecting', '正在重连...');
addLog('🔄 正在重连...', 'info');
});
}
// 断开连接
function disconnect() {
if (client) {
client.end();
addLog('👋 主动断开连接', 'info');
}
}
// 发布消息
function publish() {
if (!client || !client.connected) {
addLog('❌ 请先连接到服务器', 'error');
errorCount++;
updateStats();
return;
}
const topic = document.getElementById('pubTopic').value;
const qos = parseInt(document.getElementById('pubQos').value);
const message = document.getElementById('pubMessage').value;
if (!topic || !message) {
addLog('❌ 请填写主题和消息内容', 'error');
errorCount++;
updateStats();
return;
}
// 验证 JSON 格式
try {
JSON.parse(message);
} catch (e) {
addLog('⚠️ 消息不是有效的 JSON 格式,将作为纯文本发送', 'warning');
}
client.publish(topic, message, {qos: qos}, (error) => {
if (error) {
errorCount++;
updateStats();
addLog(`❌ 发布失败: ${error.message}`, 'error');
} else {
sentCount++;
updateStats();
addLog(`📤 消息已发布 [${topic}] (QoS ${qos})`, 'success');
}
});
}
// 发送样例数据
function publishSampleData() {
// 使用 Alink 协议格式的样例数据
const sampleData = {
id: Date.now().toString(),
version: "1.0",
method: "thing.property.post",
params: {
temperature: parseFloat((20 + Math.random() * 10).toFixed(2)),
humidity: parseFloat((50 + Math.random() * 20).toFixed(2)),
pressure: parseFloat((1000 + Math.random() * 50).toFixed(2))
}
};
document.getElementById('pubMessage').value = JSON.stringify(sampleData, null, 2);
addLog('样例数据已生成Alink 协议格式)', 'info');
publish();
}
// 获取 productKey 和 deviceName
function getDeviceInfo() {
const clientId = document.getElementById('clientId').value;
const parts = clientId.split('.');
if (parts.length !== 2) {
addLog('❌ Client ID 格式不正确(应为 {productKey}.{deviceName}),无法生成主题', 'error');
return null;
}
return {
productKey: parts[0],
deviceName: parts[1]
};
}
// 快捷主题选择(消息发布 - 上行消息)
function selectQuickPublishTopic() {
const select = document.getElementById('quickPublishTopicSelect');
const selectedValue = select.value;
console.log('[selectQuickPublishTopic] 选择的值:', selectedValue);
if (!selectedValue) {
return;
}
const deviceInfo = getDeviceInfo();
if (!deviceInfo) {
return;
}
console.log('[selectQuickPublishTopic] 设备信息:', deviceInfo);
// 构建标准主题,将枚举中的点号替换为斜杠
// 例如thing.property.post -> /sys/{pk}/{dn}/thing/property/post
const topic = `/sys/${deviceInfo.productKey}/${deviceInfo.deviceName}/${selectedValue.replace(/\./g, '/')}`;
console.log('[selectQuickPublishTopic] 生成的主题:', topic);
const pubTopicInput = document.getElementById('pubTopic');
pubTopicInput.value = topic;
console.log('[selectQuickPublishTopic] 输入框的值已设置为:', pubTopicInput.value);
addLog(`📋 已选择发布主题: ${topic}`, 'info');
// 需要 reply 的消息类型(不在 REPLY_DISABLED 列表中)
const needsReply = [
'thing.property.post',
'thing.event.post'
];
// 如果需要 reply自动订阅 reply 主题
if (needsReply.includes(selectedValue)) {
const replyTopic = `${topic}_reply`;
if (client && client.connected) {
// 自动订阅 reply 主题
client.subscribe(replyTopic, {qos: 1}, (err) => {
if (!err) {
addLog(`✅ 已自动订阅回复主题: ${replyTopic}`, 'success');
} else {
addLog(`❌ 自动订阅回复主题失败: ${err.message}`, 'error');
}
});
} else {
addLog(`💡 提示: 该消息需要订阅回复主题 ${replyTopic}`, 'warning');
}
}
// 重置下拉框到默认选项
select.selectedIndex = 0;
console.log('[selectQuickPublishTopic] 下拉框已重置');
}
// 快捷主题选择(主题订阅 - 下行消息)
function selectQuickTopic() {
const select = document.getElementById('quickTopicSelect');
const selectedValue = select.value;
console.log('[selectQuickTopic] 选择的值:', selectedValue);
if (!selectedValue) {
return;
}
const subTopicInput = document.getElementById('subTopic');
// 处理通配符订阅
if (selectedValue === 'wildcard_all') {
subTopicInput.value = '/sys/+/+/#';
addLog('📋 已选择订阅主题: /sys/+/+/#(订阅所有主题)', 'info');
console.log('[selectQuickTopic] 输入框的值已设置为:', subTopicInput.value);
select.selectedIndex = 0;
console.log('[selectQuickTopic] 下拉框已重置');
return;
} else if (selectedValue === 'wildcard_thing') {
subTopicInput.value = '/sys/+/+/thing/#';
addLog('📋 已选择订阅主题: /sys/+/+/thing/#(订阅所有 thing 主题)', 'info');
console.log('[selectQuickTopic] 输入框的值已设置为:', subTopicInput.value);
select.selectedIndex = 0;
console.log('[selectQuickTopic] 下拉框已重置');
return;
} else if (selectedValue === 'wildcard_reply') {
const deviceInfo = getDeviceInfo();
if (!deviceInfo) {
select.selectedIndex = 0;
return;
}
subTopicInput.value = `/sys/${deviceInfo.productKey}/${deviceInfo.deviceName}/#_reply`;
addLog(`📋 已选择订阅主题: /sys/${deviceInfo.productKey}/${deviceInfo.deviceName}/#_reply订阅所有回复主题`, 'info');
console.log('[selectQuickTopic] 输入框的值已设置为:', subTopicInput.value);
select.selectedIndex = 0;
console.log('[selectQuickTopic] 下拉框已重置');
return;
}
const deviceInfo = getDeviceInfo();
if (!deviceInfo) {
select.selectedIndex = 0;
return;
}
console.log('[selectQuickTopic] 设备信息:', deviceInfo);
// 构建标准主题,将枚举中的点号替换为斜杠
// 例如thing.property.set -> /sys/{pk}/{dn}/thing/property/set
const topic = `/sys/${deviceInfo.productKey}/${deviceInfo.deviceName}/${selectedValue.replace(/\./g, '/')}`;
console.log('[selectQuickTopic] 生成的主题:', topic);
subTopicInput.value = topic;
addLog(`📋 已选择订阅主题: ${topic}`, 'info');
console.log('[selectQuickTopic] 输入框的值已设置为:', subTopicInput.value);
// 重置下拉框到默认选项
select.selectedIndex = 0;
console.log('[selectQuickTopic] 下拉框已重置');
}
// 订阅主题
function subscribe() {
if (!client || !client.connected) {
addLog('❌ 请先连接到服务器', 'error');
errorCount++;
updateStats();
return;
}
const topic = document.getElementById('subTopic').value;
const qos = parseInt(document.getElementById('subQos').value);
if (!topic) {
addLog('❌ 请填写订阅主题', 'error');
errorCount++;
updateStats();
return;
}
client.subscribe(topic, {qos: qos}, (error) => {
if (error) {
errorCount++;
updateStats();
addLog(`❌ 订阅失败: ${error.message}`, 'error');
} else {
addLog(`📥 已订阅主题 [${topic}] (QoS ${qos})`, 'success');
}
});
}
// 取消订阅
function unsubscribe() {
if (!client || !client.connected) {
addLog('❌ 请先连接到服务器', 'error');
errorCount++;
updateStats();
return;
}
const topic = document.getElementById('subTopic').value;
if (!topic) {
addLog('❌ 请填写要取消的订阅主题', 'error');
errorCount++;
updateStats();
return;
}
client.unsubscribe(topic, (error) => {
if (error) {
errorCount++;
updateStats();
addLog(`❌ 取消订阅失败: ${error.message}`, 'error');
} else {
addLog(`❌ 已取消订阅 [${topic}]`, 'info');
}
});
}
// 清空日志
function clearLogs() {
document.getElementById('logArea').innerHTML = '';
sentCount = 0;
receivedCount = 0;
errorCount = 0;
updateStats();
addLog('🗑️ 日志已清空', 'info');
}
// 页面加载完成
window.onload = function () {
addLog('👋 欢迎使用 MQTT WebSocket 测试客户端!', 'success');
addLog('📚 请配置连接参数后点击"连接"按钮', 'info');
};
// 页面关闭前断开连接
window.onbeforeunload = function () {
if (client && client.connected) {
client.end();
}
};
</script>
</body>
</html>

View File

@@ -69,7 +69,8 @@ spring:
username: root
password: 123456
# tdengine: # IoT 数据库(需要 IoT 物联网再开启噢!)
# url: jdbc:TAOS-WS://127.0.0.1:6041/ruoyi_vue_pro
# lazy: true # 开启懒加载,保证启动速度
# url: jdbc:TAOS-WS://127.0.0.1:6041/ruoyi_vue_pro?varcharAsString=true
# driver-class-name: com.taosdata.jdbc.ws.WebSocketDriver
# username: root
# password: taosdata