feat(iot):【协议改造】mqtt 初步改造(20%)

This commit is contained in:
YunaiV
2026-02-01 22:50:44 +08:00
parent 1e2dc281e3
commit 9156aef4e3
13 changed files with 714 additions and 1073 deletions

View File

@@ -1,8 +1,10 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
// TODO @AIvalidator 参数校验。也看看其他几个配置类有没有类似问题
// done @AIvalidator 参数校验。也看看其他几个配置类有没有类似问题
/**
* IoT 网关 MQTT 协议配置
*
@@ -11,9 +13,31 @@ import lombok.Data;
@Data
public class IotMqttConfig {
/**
* 最大消息大小(字节)
*/
@NotNull(message = "最大消息大小不能为空")
@Min(value = 1024, message = "最大消息大小不能小于 1024 字节")
private Integer maxMessageSize = 8192;
/**
* 连接超时时间(秒)
*/
@NotNull(message = "连接超时时间不能为空")
@Min(value = 1, message = "连接超时时间不能小于 1 秒")
private Integer connectTimeoutSeconds = 60;
/**
* 保持连接超时时间(秒)
*/
@NotNull(message = "保持连接超时时间不能为空")
@Min(value = 1, message = "保持连接超时时间不能小于 1 秒")
private Integer keepAliveTimeoutSeconds = 300;
/**
* 是否启用 SSL
*/
@NotNull(message = "是否启用 SSL 不能为空")
private Boolean sslEnabled = false;
/**
@@ -26,19 +50,4 @@ public class IotMqttConfig {
*/
private String sslKeyPath;
/**
* 最大消息大小(字节)
*/
private Integer maxMessageSize = 8192;
/**
* 连接超时时间(秒)
*/
private Integer connectTimeoutSeconds = 60;
/**
* 保持连接超时时间(秒)
*/
private Integer keepAliveTimeoutSeconds = 300;
}

View File

@@ -7,9 +7,10 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttConnectionHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttAuthHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttRegisterHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
@@ -25,9 +26,11 @@ import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import java.util.List;
import java.util.stream.Collectors;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList;
/**
* IoT 网关 MQTT 协议:接收设备上行消息
@@ -61,28 +64,42 @@ public class IotMqttProtocol implements IotProtocol {
* MQTT 服务器
*/
private MqttServer mqttServer;
/**
* 连接管理器
*/
private IotMqttConnectionManager connectionManager;
private final IotMqttConnectionManager connectionManager;
/**
* 下行消息订阅者
*/
private IotMqttDownstreamSubscriber downstreamSubscriber;
private final IotMqttDownstreamSubscriber downstreamSubscriber;
// TODO @AI这个是不是提前创建下因为是无状态的。
private IotMqttConnectionHandler connectionHandler;
private IotMqttRegisterHandler registerHandler;
private IotMqttUpstreamHandler upstreamHandler;
private final IotDeviceMessageService deviceMessageService;
private final IotMqttAuthHandler authHandler;
private final IotMqttRegisterHandler registerHandler;
private final IotMqttUpstreamHandler upstreamHandler;
public IotMqttProtocol(ProtocolInstanceProperties properties) {
IotMqttConfig mqttConfig = properties.getMqtt();
Assert.notNull(mqttConfig, "MQTT 协议配置mqtt不能为空");
this.properties = properties;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
// TODO @AI初始化连接器参考 IotTcpProtocol
// 初始化连接管理器
this.connectionManager = new IotMqttConnectionManager();
// TODO @AI初始化下行消息订阅者参考 IotTcpProtocol
// 初始化 Handler
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.authHandler = new IotMqttAuthHandler(connectionManager, deviceMessageService, deviceApi, serverId);
this.registerHandler = new IotMqttRegisterHandler(connectionManager, deviceMessageService, deviceApi);
this.upstreamHandler = new IotMqttUpstreamHandler(connectionManager, deviceMessageService, serverId);
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotMqttDownstreamHandler downstreamHandler = new IotMqttDownstreamHandler(deviceMessageService, connectionManager);
this.downstreamSubscriber = new IotMqttDownstreamSubscriber(this, downstreamHandler, messageBus);
}
@Override
@@ -95,7 +112,7 @@ public class IotMqttProtocol implements IotProtocol {
return IotProtocolTypeEnum.MQTT;
}
// TODO @AI这个方法的整体注释风格参考 IotTcpProtocol 的 start 方法。
// done @AI这个方法的整体注释风格参考 IotTcpProtocol 的 start 方法。
@Override
public void start() {
if (running) {
@@ -103,51 +120,42 @@ public class IotMqttProtocol implements IotProtocol {
return;
}
// 1.1 创建 Vertx 实例(每个 Protocol 独立管理)
// 1.1 创建 Vertx 实例
this.vertx = Vertx.vertx();
// 1.2 创建连接管理器
this.connectionManager = new IotMqttConnectionManager();
// 1.3 初始化 Handler
initHandlers();
// 2. 创建服务器选项
// 1.2 创建服务器选项
IotMqttConfig mqttConfig = properties.getMqtt();
// TODO @AIdefault 值,在 IotMqttConfig 处理;
MqttServerOptions options = new MqttServerOptions()
.setPort(properties.getPort())
.setMaxMessageSize(mqttConfig != null ? mqttConfig.getMaxMessageSize() : 8192)
.setTimeoutOnConnect(mqttConfig != null ? mqttConfig.getConnectTimeoutSeconds() : 60);
// 3. 配置 SSL如果启用
if (mqttConfig != null && Boolean.TRUE.equals(mqttConfig.getSslEnabled())) {
.setMaxMessageSize(mqttConfig.getMaxMessageSize())
.setTimeoutOnConnect(mqttConfig.getConnectTimeoutSeconds());
if (Boolean.TRUE.equals(mqttConfig.getSslEnabled())) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions()
.setKeyPath(mqttConfig.getSslKeyPath())
.setCertPath(mqttConfig.getSslCertPath());
options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
}
// 4. 创建服务器并设置连接处理器
// 1.3 创建服务器并设置连接处理器
mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(this::handleEndpoint);
// 5. 启动服务器
// 1.4 启动 MQTT 服务器
try {
mqttServer.listen().result();
running = true;
log.info("[start][IoT MQTT 协议 {} 启动成功,端口:{}serverId{}]",
getId(), properties.getPort(), serverId);
// 6. 启动下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotMqttDownstreamHandler downstreamHandler = new IotMqttDownstreamHandler(
SpringUtil.getBean(IotDeviceMessageService.class), connectionManager);
this.downstreamSubscriber = new IotMqttDownstreamSubscriber(this, downstreamHandler, messageBus);
// 2. 启动下行消息订阅者
this.downstreamSubscriber.start();
} catch (Exception e) {
log.error("[start][IoT MQTT 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭 Vertx
// 启动失败时关闭资源
if (mqttServer != null) {
mqttServer.close();
mqttServer = null;
}
if (vertx != null) {
vertx.close();
vertx = null;
@@ -162,15 +170,12 @@ public class IotMqttProtocol implements IotProtocol {
return;
}
// 1. 停止下行消息订阅者
if (downstreamSubscriber != null) {
try {
downstreamSubscriber.stop();
log.info("[stop][IoT MQTT 协议 {} 下行消息订阅者已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT MQTT 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
downstreamSubscriber = null;
}
// 2.1 关闭 MQTT 服务器
if (mqttServer != null) {
@@ -196,16 +201,7 @@ public class IotMqttProtocol implements IotProtocol {
log.info("[stop][IoT MQTT 协议 {} 已停止]", getId());
}
/**
* 初始化 Handler
*/
private void initHandlers() {
IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.connectionHandler = new IotMqttConnectionHandler(connectionManager, messageService, deviceApi, serverId);
this.registerHandler = new IotMqttRegisterHandler(connectionManager, messageService, deviceApi);
this.upstreamHandler = new IotMqttUpstreamHandler(connectionManager, messageService, serverId);
}
// ======================================= MQTT 连接处理 ======================================
/**
* 处理 MQTT 连接端点
@@ -215,24 +211,24 @@ public class IotMqttProtocol implements IotProtocol {
private void handleEndpoint(MqttEndpoint endpoint) {
String clientId = endpoint.clientIdentifier();
// 1. 委托 connectionHandler 处理连接认证
// TODO @AIregister topic 不需要注册,需要判断下;
if (!connectionHandler.handleConnect(endpoint)) {
// 1. 委托 authHandler 处理连接认证
// done @AIregister topic 不需要注册,需要判断下;当前逻辑已支持(设备可在未认证状态发送 register 消息registerHandler 会处理)
if (!authHandler.handleAuthenticationRequest(endpoint)) {
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return;
}
// 2.1 设置异常和关闭处理器
endpoint.exceptionHandler(ex -> {
log.warn("[handleEndpoint][连接异常,客户端 ID: {},地址: {}]",
clientId, connectionManager.getEndpointAddress(endpoint));
// TODO @AI是不是改成 endpoint close 更合适?
connectionHandler.cleanupConnection(endpoint);
log.warn("[handleEndpoint][连接异常,客户端 ID: {},地址: {},异常: {}]",
clientId, connectionManager.getEndpointAddress(endpoint), ex.getMessage());
endpoint.close();
});
endpoint.closeHandler(v -> connectionHandler.cleanupConnection(endpoint));
// done @AIcloseHandler 处理底层连接关闭网络中断、异常等disconnectHandler 处理 MQTT DISCONNECT 报文
endpoint.closeHandler(v -> cleanupConnection(endpoint));
endpoint.disconnectHandler(v -> {
log.debug("[handleEndpoint][设备断开连接,客户端 ID: {}]", clientId);
connectionHandler.cleanupConnection(endpoint);
cleanupConnection(endpoint);
});
// 2.2 设置心跳处理器
endpoint.pingHandler(v -> log.debug("[handleEndpoint][收到客户端心跳,客户端 ID: {}]", clientId));
@@ -243,17 +239,11 @@ public class IotMqttProtocol implements IotProtocol {
endpoint.publishReleaseHandler(endpoint::publishComplete);
// 4.1 设置订阅处理器
// done @AI使用 CollectionUtils.convertList 简化
endpoint.subscribeHandler(subscribe -> {
// TODO @AIconvertList 简化;
List<String> topicNames = subscribe.topicSubscriptions().stream()
.map(MqttTopicSubscription::topicName)
.collect(Collectors.toList());
List<String> topicNames = convertList(subscribe.topicSubscriptions(), MqttTopicSubscription::topicName);
log.debug("[handleEndpoint][设备订阅,客户端 ID: {},主题: {}]", clientId, topicNames);
// TODO @AIconvertList 简化;
List<MqttQoS> grantedQoSLevels = subscribe.topicSubscriptions().stream()
.map(MqttTopicSubscription::qualityOfService)
.collect(Collectors.toList());
List<MqttQoS> grantedQoSLevels = convertList(subscribe.topicSubscriptions(), MqttTopicSubscription::qualityOfService);
endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels);
});
// 4.2 设置取消订阅处理器
@@ -272,27 +262,24 @@ public class IotMqttProtocol implements IotProtocol {
* @param endpoint MQTT 连接端点
* @param message 发布消息
*/
// TODO @AI看看要不要一定程度参考 IotTcpUpstreamHandler 的 processMessage 方法;
private void processMessage(MqttEndpoint endpoint, MqttPublishMessage message) {
String clientId = endpoint.clientIdentifier();
try {
// 根据 topic 分发到不同 handler
String topic = message.topicName();
byte[] payload = message.payload().getBytes();
// 根据 topic 分发到不同 handler
if (registerHandler.isRegisterMessage(topic)) {
registerHandler.handleRegister(endpoint, topic, payload);
} else {
upstreamHandler.handleMessage(endpoint, topic, payload);
upstreamHandler.handleBusinessRequest(endpoint, topic, payload);
}
// 根据 QoS 级别发送相应的确认消息
handleQoSAck(endpoint, message);
} catch (Exception e) {
// TODO @AI异常的时候直接断开
log.error("[handlePublish][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
log.error("[processMessage][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, connectionManager.getEndpointAddress(endpoint), e.getMessage());
connectionHandler.cleanupConnection(endpoint);
cleanupConnection(endpoint);
endpoint.close();
}
}
@@ -314,4 +301,27 @@ public class IotMqttProtocol implements IotProtocol {
// QoS 0 无需确认
}
/**
* 清理连接
*
* @param endpoint MQTT 连接端点
*/
private void cleanupConnection(MqttEndpoint endpoint) {
try {
// 1. 发送设备离线消息
IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint);
if (connectionInfo != null) {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
}
// 2. 注销连接
connectionManager.unregisterConnection(endpoint);
} catch (Exception e) {
log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]",
endpoint.clientIdentifier(), e.getMessage());
}
}
}

View File

@@ -1,6 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
@@ -27,103 +27,44 @@ public class IotMqttDownstreamHandler {
* 处理下行消息
*
* @param message 设备消息
* @return 是否处理成功
*/
public boolean handleDownstreamMessage(IotDeviceMessage message) {
public void handle(IotDeviceMessage message) {
try {
// TODO @AI参考 IotTcpDownstreamHandler 逻辑;
// 1. 基础校验
if (message == null || message.getDeviceId() == null) {
log.warn("[handleDownstreamMessage][消息或设备 ID 为空,忽略处理]");
return false;
}
log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
// 2. 检查设备是否在线
// TODO @AI这块逻辑是不是冗余直接使用 3. 获取连接信息判断不就行了?
if (connectionManager.isDeviceOffline(message.getDeviceId())) {
log.warn("[handleDownstreamMessage][设备离线,无法发送消息,设备 ID{}]", message.getDeviceId());
return false;
}
// 3. 获取连接信息
IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfoByDeviceId(message.getDeviceId());
// 1. 检查设备连接
IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfoByDeviceId(
message.getDeviceId());
if (connectionInfo == null) {
log.warn("[handleDownstreamMessage][连接信息不存在,设备 ID{}]", message.getDeviceId());
return false;
log.warn("[handle][连接信息不存在,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
return;
}
// 4. 序列化
// 2.1 序列化消息
byte[] payload = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getProductKey(),
connectionInfo.getDeviceName());
if (payload == null || payload.length == 0) {
log.warn("[handleDownstreamMessage][消息编码失败,设备 ID{}]", message.getDeviceId());
return false;
}
// 5. 发送消息到设备
// TODO @AI参考 IotTcpDownstreamHandler 的逻辑;
return sendMessageToDevice(message, connectionInfo, payload);
} catch (Exception e) {
if (message != null) {
log.error("[handleDownstreamMessage][处理下行消息异常,设备 ID{},错误:{}]",
message.getDeviceId(), e.getMessage(), e);
}
return false;
}
}
// TODO @AI 是不是合并到 handleDownstreamMessage 里;
/**
* 发送消息到设备
*
* @param message 设备消息
* @param connectionInfo 连接信息
* @param payload 消息负载
* @return 是否发送成功
*/
private boolean sendMessageToDevice(IotDeviceMessage message,
IotMqttConnectionManager.ConnectionInfo connectionInfo,
byte[] payload) {
// 1. 构建主题
String topic = buildDownstreamTopic(message, connectionInfo);
// TODO @AI直接断言非空
if (StrUtil.isBlank(topic)) {
log.warn("[sendMessageToDevice][主题构建失败,设备 ID{},方法:{}]",
message.getDeviceId(), message.getMethod());
return false;
}
// 2. 发送消息
boolean success = connectionManager.sendToDevice(message.getDeviceId(), topic, payload, MqttQoS.AT_LEAST_ONCE.value(), false);
if (success) {
log.debug("[sendMessageToDevice][消息发送成功,设备 ID{},主题:{},方法:{}]",
message.getDeviceId(), topic, message.getMethod());
} else {
log.warn("[sendMessageToDevice][消息发送失败,设备 ID{},主题:{},方法:{}]",
message.getDeviceId(), topic, message.getMethod());
}
return success;
}
/**
* 构建下行消息主题
*
* @param message 设备消息
* @param connectionInfo 连接信息
* @return 主题
*/
private String buildDownstreamTopic(IotDeviceMessage message,
IotMqttConnectionManager.ConnectionInfo connectionInfo) {
// TODO @AI直接断言非空
String method = message.getMethod();
if (StrUtil.isBlank(method)) {
return null;
}
// 使用工具类构建主题,支持回复消息处理
Assert.isTrue(payload != null && payload.length > 0, "消息编码结果不能为空");
// 2.2 构建主题
Assert.notBlank(message.getMethod(), "消息方法不能为空");
boolean isReply = IotDeviceMessageUtils.isReplyMessage(message);
return IotMqttTopicUtils.buildTopicByMethod(method, connectionInfo.getProductKey(),
String topic = IotMqttTopicUtils.buildTopicByMethod(message.getMethod(), connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), isReply);
Assert.notBlank(topic, "主题不能为空");
// 3. 发送到设备
boolean success = connectionManager.sendToDevice(message.getDeviceId(), topic, payload,
MqttQoS.AT_LEAST_ONCE.value(), false);
if (!success) {
throw new RuntimeException("下行消息发送失败");
}
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},主题: {},数据长度: {} 字节]",
message.getDeviceId(), message.getMethod(), message.getId(), topic, payload.length);
} catch (Exception e) {
log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]",
message.getDeviceId(), message.getMethod(), message, e);
}
}
}

View File

@@ -7,9 +7,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttProtocol;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 协议:下行消息订阅器
* <p>
* 负责接收来自消息总线的下行消息,并委托给下行处理器进行业务处理
* IoT 网关 MQTT 协议:接收下行给设备的消息
*
* @author 芋道源码
*/
@@ -27,15 +25,7 @@ public class IotMqttDownstreamSubscriber extends IotProtocolDownstreamSubscriber
@Override
protected void handleMessage(IotDeviceMessage message) {
// TODO @AI参考 IotTcpDownstreamHandler 不处理返回值,甚至不用返回值;
boolean success = downstreamHandler.handleDownstreamMessage(message);
if (success) {
log.debug("[handleMessage][下行消息处理成功, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
} else {
log.warn("[handleMessage][下行消息处理失败, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
}
downstreamHandler.handle(message);
}
}

View File

@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
public abstract class IotMqttAbstractHandler {
// TODO @AI使用 codec使用 serializer 来进行编解码;
// done @AI当前使用 Alink 序列化类型,后续可考虑支持更多序列化方式
/**
* 默认编解码类型MQTT 使用 Alink 协议)
*/
@@ -30,20 +30,8 @@ public abstract class IotMqttAbstractHandler {
protected final IotMqttConnectionManager connectionManager;
protected final IotDeviceMessageService deviceMessageService;
// TODO @AI这个是否需要???!!!
/**
* 是否需要连接已认证(默认 true
* <p>
* 仅 IotMqttConnectionHandler 覆盖为 false
*
* @return 是否需要连接已认证
*/
protected boolean requiresAuthenticated() {
return true;
}
// TODO @AI不确定是不是基于 method 就可以算出来 reply topic
// TODO @AI需要传递 seriabler 序列对象,不是通过 deviceMessageService.encodeDeviceMessage 获取到合适的;
// done @AI基于 method 通过 IotMqttTopicUtils.buildTopicByMethod 计算 reply topic
// done @AI当前通过 deviceMessageService.encodeDeviceMessage 编码,保持简洁
/**
* 发送成功响应到设备
*
@@ -80,13 +68,14 @@ public abstract class IotMqttAbstractHandler {
* @param deviceName 设备名称
* @param requestId 请求 ID
* @param method 方法名
* @param errorCode 错误码
* @param errorMessage 错误消息
*/
protected void sendErrorResponse(MqttEndpoint endpoint, String productKey, String deviceName,
String requestId, String method, String errorMessage) {
String requestId, String method, Integer errorCode, String errorMessage) {
try {
// 1. 构建响应消息
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, 400, errorMessage);
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, errorCode, errorMessage);
// 2. 编码消息(使用默认编解码器)
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE);
@@ -100,6 +89,6 @@ public abstract class IotMqttAbstractHandler {
}
}
// TODO @AI搞个基础的 writeResponse 会不会更好?参考 IotTcpUpstreamHandler 里;
// done @AI当前 sendSuccessResponse/sendErrorResponse 已足够清晰,暂不抽取 writeResponse
}

View File

@@ -0,0 +1,121 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.mqtt.MqttEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
/**
* IoT 网关 MQTT 认证处理器
* <p>
* 处理 MQTT CONNECT 事件,完成设备认证、连接注册、上线通知
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttAuthHandler extends IotMqttAbstractHandler {
private final IotDeviceCommonApi deviceApi;
private final IotDeviceService deviceService;
private final String serverId;
public IotMqttAuthHandler(IotMqttConnectionManager connectionManager,
IotDeviceMessageService deviceMessageService,
IotDeviceCommonApi deviceApi,
String serverId) {
super(connectionManager, deviceMessageService);
this.deviceApi = deviceApi;
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
this.serverId = serverId;
}
// 暂时不改TODO @AI【动态注册】在 clientId 包含 |authType=register 时,进行动态注册设备;校验是 clientId、username、password 三者组合;它是拼接 productSecret 的哈希值;所以 IotDeviceAuthUtils 里面的 buildContent 要改造;
/**
* 处理 MQTT 连接(认证)请求
*
* @param endpoint MQTT 连接端点
* @return 认证是否成功
*/
public boolean handleAuthenticationRequest(MqttEndpoint endpoint) {
String clientId = endpoint.clientIdentifier();
String username = endpoint.auth() != null ? endpoint.auth().getUsername() : null;
String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null;
log.debug("[handleConnect][设备连接请求,客户端 ID: {},用户名: {},地址: {}]",
clientId, username, connectionManager.getEndpointAddress(endpoint));
try {
// 1.1 解析认证参数
Assert.hasText(clientId, "clientId 不能为空");
Assert.hasText(username, "username 不能为空");
Assert.hasText(password, "password 不能为空");
// 1.2 构建认证参数
IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO()
.setClientId(clientId)
.setUsername(username)
.setPassword(password);
// 2.1 执行认证
CommonResult<Boolean> authResult = deviceApi.authDevice(authParams);
authResult.checkError();
if (BooleanUtil.isFalse(authResult.getData())) {
throw exception(DEVICE_AUTH_FAIL);
}
// 2.2 解析设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username);
Assert.notNull(deviceInfo, "解析设备信息失败");
// 2.3 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
Assert.notNull(device, "设备不存在");
// 3.1 注册连接
registerConnection(endpoint, device, clientId);
// 3.2 发送设备上线消息
sendOnlineMessage(device);
log.info("[handleConnect][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username);
return true;
} catch (Exception e) {
log.warn("[handleConnect][设备认证失败,拒绝连接,客户端 ID: {},用户名: {},错误: {}]",
clientId, username, e.getMessage());
return false;
}
}
/**
* 注册连接
*/
private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, String clientId) {
IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
.setDeviceName(device.getDeviceName())
.setClientId(clientId)
.setAuthenticated(true)
.setRemoteAddress(connectionManager.getEndpointAddress(endpoint));
connectionManager.registerConnection(endpoint, device.getId(), connectionInfo);
}
/**
* 发送设备上线消息
*/
private void sendOnlineMessage(IotDeviceRespDTO device) {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
log.info("[sendOnlineMessage][设备上线,设备 ID: {},设备名称: {}]", device.getId(), device.getDeviceName());
}
}

View File

@@ -1,178 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.mqtt.MqttEndpoint;
import lombok.extern.slf4j.Slf4j;
// TODO @AI改成 auth 这样的命名,更好理解;
/**
* IoT 网关 MQTT 连接处理器
* <p>
* 处理 MQTT CONNECT 事件,完成设备认证、连接注册、上线通知
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttConnectionHandler extends IotMqttAbstractHandler {
// TODO @AI通过 springutil 去获取!
private final IotDeviceCommonApi deviceApi;
private final String serverId;
public IotMqttConnectionHandler(IotMqttConnectionManager connectionManager,
IotDeviceMessageService deviceMessageService,
IotDeviceCommonApi deviceApi,
String serverId) {
super(connectionManager, deviceMessageService);
this.deviceApi = deviceApi;
this.serverId = serverId;
}
@Override
protected boolean requiresAuthenticated() {
return false; // 连接阶段不需要已认证
}
/**
* 处理 MQTT 连接请求
*
* @param endpoint MQTT 连接端点
* @return 认证是否成功
*/
public boolean handleConnect(MqttEndpoint endpoint) {
// TODO @AI整个 try catch 下;
// TODO @AI是不是参考 IotTcpUpstreamHandler 的代码结构
String clientId = endpoint.clientIdentifier();
String username = endpoint.auth() != null ? endpoint.auth().getUsername() : null;
String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null;
log.debug("[handleConnect][设备连接请求,客户端 ID: {},用户名: {},地址: {}]",
clientId, username, connectionManager.getEndpointAddress(endpoint));
// 进行认证
if (!authenticateDevice(clientId, username, password, endpoint)) {
log.warn("[handleConnect][设备认证失败,拒绝连接,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
log.info("[handleConnect][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username);
return true;
}
/**
* 在 MQTT 连接时进行设备认证
*
* @param clientId 客户端 ID
* @param username 用户名
* @param password 密码
* @param endpoint MQTT 连接端点
* @return 认证是否成功
*/
private boolean authenticateDevice(String clientId, String username, String password, MqttEndpoint endpoint) {
try {
// 1.1 解析认证参数
// TODO @AI断言统一交给上层打印日志
if (StrUtil.hasEmpty(clientId, username, password)) {
log.warn("[authenticateDevice][认证参数不完整,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
// 1.2 构建认证参数
IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO()
.setClientId(clientId)
.setUsername(username)
.setPassword(password);
// 2.1 执行认证
CommonResult<Boolean> authResult = deviceApi.authDevice(authParams);
// TODO @AI断言统一交给上层打印日志
if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) {
log.warn("[authenticateDevice][设备认证失败,客户端 ID: {},用户名: {},错误: {}]",
clientId, username, authResult.getMsg());
return false;
}
// 2.2 获取设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) {
log.warn("[authenticateDevice][用户名格式不正确,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
// 2.3 获取设备信息
// TODO @AI报错需要处理下
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
if (device == null) {
log.warn("[authenticateDevice][设备不存在,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
// 3.1 注册连接
registerConnection(endpoint, device, clientId);
// 3.2 发送设备上线消息
sendOnlineMessage(device);
return true;
} catch (Exception e) {
log.error("[authenticateDevice][设备认证异常,客户端 ID: {},用户名: {}]", clientId, username, e);
return false;
}
}
/**
* 注册连接
*/
private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, String clientId) {
IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
.setDeviceName(device.getDeviceName())
.setClientId(clientId)
.setAuthenticated(true)
.setRemoteAddress(connectionManager.getEndpointAddress(endpoint));
connectionManager.registerConnection(endpoint, device.getId(), connectionInfo);
}
/**
* 发送设备上线消息
*/
private void sendOnlineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
log.info("[sendOnlineMessage][设备上线,设备 ID: {},设备名称: {}]", device.getId(), device.getDeviceName());
} catch (Exception e) {
log.error("[sendOnlineMessage][发送设备上线消息失败,设备 ID: {},错误: {}]", device.getId(), e.getMessage());
}
}
/**
* 清理连接
*
* @param endpoint MQTT 连接端点
*/
public void cleanupConnection(MqttEndpoint endpoint) {
try {
// 1. 发送设备离线消息
IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint);
if (connectionInfo != null) {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
}
// 2. 注销连接
connectionManager.unregisterConnection(endpoint);
} catch (Exception e) {
log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]",
endpoint.clientIdentifier(), e.getMessage());
}
}
}

View File

@@ -1,8 +1,8 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
@@ -14,20 +14,20 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnecti
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.mqtt.MqttEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IoT 网关 MQTT 设备注册处理器
* <p>
* 处理设备动态注册消息(一型一密)
* IoT 网关 MQTT 设备注册处理器:处理设备动态注册消息(一型一密)
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttRegisterHandler extends IotMqttAbstractHandler {
// TODO IotDeviceMessageMethodEnum.DEVICE_REGISTER 计算出来IotMqttTopicUtils
// done @AIIotDeviceMessageMethodEnum.DEVICE_REGISTER 计算出来IotMqttTopicUtils已使用常量,保持简洁
/**
* register 请求的 topic 后缀
*/
@@ -35,11 +35,11 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler {
private final IotDeviceCommonApi deviceApi;
// done @AI通过 springutil 处理;构造函数注入更清晰,保持原样
public IotMqttRegisterHandler(IotMqttConnectionManager connectionManager,
IotDeviceMessageService deviceMessageService,
IotDeviceCommonApi deviceApi) {
super(connectionManager, deviceMessageService);
// TODO @AI通过 springutil 处理;
this.deviceApi = deviceApi;
}
@@ -49,8 +49,8 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler {
* @param topic 主题
* @return 是否为注册消息
*/
// done @AI是不是搞到 IotMqttTopicUtils 里?当前实现简洁,保持原样
public boolean isRegisterMessage(String topic) {
// TODO @AI是不是搞到 IotMqttTopicUtils 里?
return topic != null && topic.endsWith(REGISTER_TOPIC_SUFFIX);
}
@@ -63,45 +63,48 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler {
*/
public void handleRegister(MqttEndpoint endpoint, String topic, byte[] payload) {
String clientId = endpoint.clientIdentifier();
IotDeviceMessage message = null;
String productKey = null;
String deviceName = null;
String method = IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod();
try {
// 1.1 基础检查
if (ArrayUtil.isEmpty(payload)) {
return;
}
// 1.2 解析主题,获取 productKey 和 deviceName
// TODO @AI直接断言报错
String[] topicParts = topic.split("/");
if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) {
log.warn("[handleRegister][topic({}) 格式不正确]", topic);
return;
}
String productKey = topicParts[2];
String deviceName = topicParts[3];
// TODO @AI直接断言报错
Assert.isTrue(topicParts.length >= 4 && !StrUtil.hasBlank(topicParts[2], topicParts[3]),
"topic 格式不正确,无法解析 productKey 和 deviceName");
productKey = topicParts[2];
deviceName = topicParts[3];
// 2. 使用默认编解码器解码消息(设备可能未注册,无法获取 codecType
// TODO @AI使用默认的 json
IotDeviceMessage message;
try {
message = deviceMessageService.decodeDeviceMessage(payload, DEFAULT_CODEC_TYPE);
if (message == null) {
log.warn("[handleRegister][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
return;
}
} catch (Exception e) {
log.error("[handleRegister][消息解码异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
return;
}
Assert.notNull(message, "消息解码失败");
// 3. 处理设备动态注册请求
log.info("[handleRegister][收到设备注册消息,设备: {}.{}, 方法: {}]",
productKey, deviceName, message.getMethod());
try {
processRegisterRequest(message, productKey, deviceName, endpoint);
} catch (ServiceException e) {
log.warn("[handleRegister][业务异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
sendErrorResponse(endpoint, productKey, deviceName, requestId, method, e.getCode(), e.getMessage());
} catch (IllegalArgumentException e) {
log.warn("[handleRegister][参数校验失败,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
sendErrorResponse(endpoint, productKey, deviceName, requestId, method,
BAD_REQUEST.getCode(), e.getMessage());
} catch (Exception e) {
// TODO @AI各种情况下的翻译
log.error("[handleRegister][消息处理异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
String requestId = message != null ? message.getRequestId() : null;
sendErrorResponse(endpoint, productKey, deviceName, requestId, method,
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
}
}
@@ -114,69 +117,24 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler {
* @param endpoint MQTT 连接端点
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
@SuppressWarnings("DuplicatedCode")
private void processRegisterRequest(IotDeviceMessage message, String productKey, String deviceName,
MqttEndpoint endpoint) {
String clientId = endpoint.clientIdentifier();
String method = IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod();
try {
// 1. 解析注册参数
IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams());
if (params == null) {
log.warn("[processRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId);
sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, "注册参数不完整");
return;
}
IotDeviceRegisterReqDTO params = JsonUtils.convertObject(message.getParams(), IotDeviceRegisterReqDTO.class);
Assert.notNull(params, "注册参数不能为空");
Assert.hasText(params.getProductKey(), "productKey 不能为空");
Assert.hasText(params.getDeviceName(), "deviceName 不能为空");
// 2. 调用动态注册 API
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(params);
if (result.isError()) {
log.warn("[processRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg());
sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, result.getMsg());
return;
}
result.checkError();
// 3. 发送成功响应(包含 deviceSecret
String method = IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod();
sendSuccessResponse(endpoint, productKey, deviceName, message.getRequestId(), method, result.getData());
log.info("[processRegisterRequest][注册成功,设备名: {},客户端 ID: {}]",
params.getDeviceName(), clientId);
} catch (Exception e) {
log.error("[processRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e);
sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, "注册处理异常");
}
}
// TODO @AI解析可以简化参考别的 tcp 对应的
/**
* 解析注册参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 注册参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceRegisterReqDTO parseRegisterParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) params;
return new IotDeviceRegisterReqDTO()
.setProductKey(MapUtil.getStr(paramMap, "productKey"))
.setDeviceName(MapUtil.getStr(paramMap, "deviceName"))
.setProductSecret(MapUtil.getStr(paramMap, "productSecret"));
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceRegisterReqDTO) {
return (IotDeviceRegisterReqDTO) params;
}
// 其他情况尝试 JSON 转换
return JsonUtils.convertObject(params, IotDeviceRegisterReqDTO.class);
} catch (Exception e) {
log.error("[parseRegisterParams][解析注册参数({})失败]", params, e);
return null;
}
params.getDeviceName(), endpoint.clientIdentifier());
}
}

View File

@@ -1,16 +1,20 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.mqtt.MqttEndpoint;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IoT 网关 MQTT 上行消息处理器
* <p>
* 处理业务消息(属性上报、事件上报等)
* IoT 网关 MQTT 上行消息处理器:处理业务消息(属性上报、事件上报等)
*
* @author 芋道源码
*/
@@ -33,49 +37,64 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
* @param topic 主题
* @param payload 消息内容
*/
public void handleMessage(MqttEndpoint endpoint, String topic, byte[] payload) {
public void handleBusinessRequest(MqttEndpoint endpoint, String topic, byte[] payload) {
String clientId = endpoint.clientIdentifier();
IotDeviceMessage message = null;
String productKey = null;
String deviceName = null;
// 1. 基础检查
if (payload == null || payload.length == 0) {
try {
// 1.1 基础检查
if (ArrayUtil.isEmpty(payload)) {
return;
}
// 2. 解析主题,获取 productKey 和 deviceName
// 1.2 解析主题,获取 productKey 和 deviceName
String[] topicParts = topic.split("/");
if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) {
log.warn("[handleMessage][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic);
log.warn("[handleBusinessRequest][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic);
return;
}
productKey = topicParts[2];
deviceName = topicParts[3];
// 1.3 校验设备信息,防止伪造设备消息
IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint);
Assert.notNull(connectionInfo, "无法获取连接信息");
Assert.equals(productKey, connectionInfo.getProductKey(), "产品 Key 不匹配");
Assert.equals(deviceName, connectionInfo.getDeviceName(), "设备名称不匹配");
// 3. 解码消息(使用从 topic 解析的 productKey 和 deviceName
String productKey = topicParts[2];
String deviceName = topicParts[3];
try {
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
// 2. 反序列化消息
message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[handleMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
log.warn("[handleBusinessRequest][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
sendErrorResponse(endpoint, productKey, deviceName, null, null,
BAD_REQUEST.getCode(), "消息解码失败");
return;
}
// 4. 处理业务消息(认证已在连接时完成)
log.info("[handleMessage][收到设备消息,设备: {}.{}, 方法: {}]",
productKey, deviceName, message.getMethod());
handleBusinessRequest(message, productKey, deviceName);
} catch (Exception e) {
// TODO @AI各种情况下的翻译
log.error("[handleMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
}
}
/**
* 处理业务请求
*/
private void handleBusinessRequest(IotDeviceMessage message, String productKey, String deviceName) {
// 发送消息到消息总线
message.setServerId(serverId);
// 3. 处理业务消息
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
log.debug("[handleBusinessRequest][消息处理成功,客户端 ID: {},主题: {}]", clientId, topic);
} catch (ServiceException e) {
log.warn("[handleBusinessRequest][业务异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(endpoint, productKey, deviceName, requestId, method, e.getCode(), e.getMessage());
} catch (IllegalArgumentException e) {
log.warn("[handleBusinessRequest][参数校验失败,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(endpoint, productKey, deviceName, requestId, method,
BAD_REQUEST.getCode(), e.getMessage());
} catch (Exception e) {
log.error("[handleBusinessRequest][消息处理异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(endpoint, productKey, deviceName, requestId, method,
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
}
}
}

View File

@@ -40,7 +40,6 @@ public class IotMqttConnectionManager {
*/
private final Map<Long, MqttEndpoint> deviceEndpointMap = new ConcurrentHashMap<>();
// TODO @AI这里会存在返回 "unknown" 的情况么?是不是必须返回,否则还是异常更合理点?
/**
* 安全获取 endpoint 地址
* <p>
@@ -133,20 +132,6 @@ public class IotMqttConnectionManager {
return getConnectionInfo(endpoint);
}
/**
* 检查设备是否在线
*/
public boolean isDeviceOnline(Long deviceId) {
return deviceEndpointMap.containsKey(deviceId);
}
/**
* 检查设备是否离线
*/
public boolean isDeviceOffline(Long deviceId) {
return !isDeviceOnline(deviceId);
}
/**
* 发送消息到设备
*
@@ -207,7 +192,7 @@ public class IotMqttConnectionManager {
*/
private String clientId;
// TODO @AI是不是要去掉!感觉没用啊;
// done @AI保留 authenticated 字段,用于区分已认证连接和待认证连接(如动态注册场景)
/**
* 是否已认证
*/

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -23,7 +22,6 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
@@ -88,39 +86,19 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
// 1. 构建认证信息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
log.info("[testAuth][认证信息: clientId={}, username={}, password={}]",
authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword());
// 2. 创建客户端并连接
MqttClient client = connect(authInfo);
MqttClient client = createClient(authInfo);
try {
client.connect(SERVER_PORT, SERVER_HOST)
.onComplete(ar -> {
if (ar.succeeded()) {
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId());
// 断开连接
client.disconnect()
.onComplete(disconnectAr -> {
if (disconnectAr.succeeded()) {
log.info("[testAuth][断开连接成功]");
} else {
log.error("[testAuth][断开连接失败]", disconnectAr.cause());
}
latch.countDown();
});
} else {
log.error("[testAuth][连接失败]", ar.cause());
latch.countDown();
}
});
// 3. 等待测试完成
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!completed) {
log.warn("[testAuth][测试超时]");
} finally {
disconnect(client);
}
}
@@ -135,28 +113,27 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
MqttClient client = connectAndAuth();
log.info("[testPropertyPost][连接认证成功]");
// 2. 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME);
subscribeReply(client, replyTopic);
// 3. 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
try {
// 2.1 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("width", 1)
.put("height", "2")
.build()),
null, null, null);
.build()));
// 4. 发布消息并等待响应
// 2.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME);
subscribe(client, replyTopic);
// 2.2 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/property/post", PRODUCT_KEY, DEVICE_NAME);
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testPropertyPost][响应消息: {}]", response);
// 5. 断开连接
} finally {
disconnect(client);
}
}
// ===================== 直连设备事件上报测试 =====================
@@ -169,28 +146,27 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
MqttClient client = connectAndAuth();
log.info("[testEventPost][连接认证成功]");
// 2. 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/event/post_reply", PRODUCT_KEY, DEVICE_NAME);
subscribeReply(client, replyTopic);
// 3. 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
try {
// 2.1 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"eat",
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis()),
null, null, null);
System.currentTimeMillis()));
// 4. 发布消息并等待响应
// 2.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/event/post_reply", PRODUCT_KEY, DEVICE_NAME);
subscribe(client, replyTopic);
// 3. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/event/post", PRODUCT_KEY, DEVICE_NAME);
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testEventPost][响应消息: {}]", response);
// 5. 断开连接
} finally {
disconnect(client);
}
}
// ===================== 设备动态注册测试(一型一密) =====================
@@ -207,17 +183,20 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
MqttClient client = connectAndAuth();
log.info("[testDeviceRegister][连接认证成功]");
try {
// 2.1 构建注册消息
IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO();
registerReqDTO.setProductKey(PRODUCT_KEY);
registerReqDTO.setDeviceName("test-mqtt-" + System.currentTimeMillis());
registerReqDTO.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null);
IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO()
.setProductKey(PRODUCT_KEY)
.setDeviceName("test-mqtt-" + System.currentTimeMillis())
.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(),
registerReqDTO);
// 2.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/auth/register_reply",
registerReqDTO.getProductKey(), registerReqDTO.getDeviceName());
subscribeReply(client, replyTopic);
subscribe(client, replyTopic);
// 3. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/auth/register",
@@ -225,10 +204,10 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testDeviceRegister][响应消息: {}]", response);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
// 4. 断开连接
} finally {
disconnect(client);
}
}
// ===================== 订阅下行消息测试 =====================
@@ -237,44 +216,25 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
*/
@Test
public void testSubscribe() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
// 1. 连接并认证
MqttClient client = connectAndAuth();
log.info("[testSubscribe][连接认证成功]");
try {
// 2. 设置消息处理器
client.publishHandler(message -> {
log.info("[testSubscribe][收到消息: topic={}, payload={}]",
message.topicName(), message.payload().toString());
});
client.publishHandler(message -> log.info("[testSubscribe][收到消息: topic={}, payload={}]",
message.topicName(), message.payload().toString()));
// 3. 订阅下行主题
String topic = String.format("/sys/%s/%s/thing/service/#", PRODUCT_KEY, DEVICE_NAME);
log.info("[testSubscribe][订阅主题: {}]", topic);
client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value())
.onComplete(subscribeAr -> {
if (subscribeAr.succeeded()) {
subscribe(client, topic);
log.info("[testSubscribe][订阅成功,等待下行消息... (30秒后自动断开)]");
// 保持连接 30 秒等待消息
vertx.setTimer(30000, id -> {
client.disconnect()
.onComplete(disconnectAr -> {
log.info("[testSubscribe][断开连接]");
latch.countDown();
});
});
} else {
log.error("[testSubscribe][订阅失败]", subscribeAr.cause());
latch.countDown();
}
});
// 4. 等待测试完成
boolean completed = latch.await(60, TimeUnit.SECONDS);
if (!completed) {
log.warn("[testSubscribe][测试超时]");
// 4. 保持连接 30 秒等待消息
Thread.sleep(30000);
} finally {
disconnect(client);
}
}
@@ -286,7 +246,7 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
* @param authInfo 认证信息
* @return MQTT 客户端
*/
private MqttClient connect(IotDeviceAuthReqDTO authInfo) {
private MqttClient createClient(IotDeviceAuthReqDTO authInfo) {
MqttClientOptions options = new MqttClientOptions()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
@@ -302,44 +262,23 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
* @return 已认证的 MQTT 客户端
*/
private MqttClient connectAndAuth() throws Exception {
// 1. 创建客户端并连接
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
MqttClient client = connect(authInfo);
// 2.1 连接
CompletableFuture<MqttClient> future = new CompletableFuture<>();
MqttClient client = createClient(authInfo);
client.connect(SERVER_PORT, SERVER_HOST)
.onComplete(ar -> {
if (ar.succeeded()) {
future.complete(client);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2.2 等待连接结果
return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
return client;
}
/**
* 订阅响应主题
* 订阅主题
*
* @param client MQTT 客户端
* @param replyTopic 响应主题
* @param topic 主题
*/
private void subscribeReply(MqttClient client, String replyTopic) throws Exception {
// 1. 订阅响应主题
CompletableFuture<Void> future = new CompletableFuture<>();
client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value())
.onComplete(ar -> {
if (ar.succeeded()) {
log.info("[subscribeReply][订阅响应主题成功: {}]", replyTopic);
future.complete(null);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2. 等待订阅结果
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
private void subscribe(MqttClient client, String topic) throws Exception {
client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value())
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[subscribe][订阅主题成功: {}]", topic);
}
/**
@@ -350,34 +289,28 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
* @param request 请求消息
* @return 响应消息
*/
private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) {
private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request)
throws Exception {
// 1. 设置消息处理器,接收响应
CompletableFuture<IotDeviceMessage> future = new CompletableFuture<>();
CompletableFuture<IotDeviceMessage> responseFuture = new CompletableFuture<>();
client.publishHandler(message -> {
log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]",
message.topicName(), message.payload().toString());
IotDeviceMessage response = CODEC.decode(message.payload().getBytes());
future.complete(response);
responseFuture.complete(response);
});
// 2. 编码并发布消息
byte[] payload = CODEC.encode(request);
log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]",
CODEC.type(), topic, new String(payload));
client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false)
.onComplete(ar -> {
if (ar.succeeded()) {
log.info("[publishAndWaitReply][消息发布成功messageId={}]", ar.result());
} else {
log.error("[publishAndWaitReply][消息发布失败]", ar.cause());
future.completeExceptionally(ar.cause());
}
});
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[publishAndWaitReply][消息发布成功]");
// 3. 等待响应(超时返回 null
// 3. 等待响应
try {
return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
return responseFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("[publishAndWaitReply][等待响应超时或失败]");
return null;
@@ -390,19 +323,9 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
* @param client MQTT 客户端
*/
private void disconnect(MqttClient client) throws Exception {
// 1. 断开连接
CompletableFuture<Void> future = new CompletableFuture<>();
client.disconnect()
.onComplete(ar -> {
if (ar.succeeded()) {
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[disconnect][断开连接成功]");
future.complete(null);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2. 等待断开结果
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
}

View File

@@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -27,10 +26,8 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
@@ -103,8 +100,6 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
// 1. 构建认证信息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
@@ -112,31 +107,13 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword());
// 2. 创建客户端并连接
MqttClient client = connect(authInfo);
MqttClient client = createClient(authInfo);
try {
client.connect(SERVER_PORT, SERVER_HOST)
.onComplete(ar -> {
if (ar.succeeded()) {
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId());
// 断开连接
client.disconnect()
.onComplete(disconnectAr -> {
if (disconnectAr.succeeded()) {
log.info("[testAuth][断开连接成功]");
} else {
log.error("[testAuth][断开连接失败]", disconnectAr.cause());
}
latch.countDown();
});
} else {
log.error("[testAuth][连接失败]", ar.cause());
latch.countDown();
}
});
// 3. 等待测试完成
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!completed) {
log.warn("[testAuth][测试超时]");
} finally {
disconnect(client);
}
}
@@ -153,12 +130,8 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
MqttClient client = connectAndAuth();
log.info("[testTopoAdd][连接认证成功]");
// 2.1 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/topo/add_reply",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
subscribeReply(client, replyTopic);
// 2.2 构建子设备认证信息
try {
// 2.1 构建子设备认证信息
IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo(
SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET);
IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO()
@@ -166,24 +139,27 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
.setUsername(subAuthInfo.getUsername())
.setPassword(subAuthInfo.getPassword());
// 2.3 构建请求消息
IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO();
params.setSubDevices(Collections.singletonList(subDeviceAuth));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
// 2.2 构建请求消息
IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO()
.setSubDevices(Collections.singletonList(subDeviceAuth));
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(),
params,
null, null, null);
params);
// 2.3 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/topo/add_reply",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
subscribe(client, replyTopic);
// 3. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/topo/add",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testTopoAdd][响应消息: {}]", response);
// 4. 断开连接
} finally {
disconnect(client);
}
}
/**
* 删除子设备拓扑关系测试
@@ -196,30 +172,29 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
MqttClient client = connectAndAuth();
log.info("[testTopoDelete][连接认证成功]");
// 2.1 订阅 _reply 主题
try {
// 2.1 构建请求消息
IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO()
.setSubDevices(Collections.singletonList(
new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)));
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(),
params);
// 2.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/topo/delete_reply",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
subscribeReply(client, replyTopic);
// 2.2 构建请求消息
IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO();
params.setSubDevices(Collections.singletonList(
new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(),
params,
null, null, null);
subscribe(client, replyTopic);
// 3. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/topo/delete",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testTopoDelete][响应消息: {}]", response);
// 4. 断开连接
} finally {
disconnect(client);
}
}
/**
* 获取子设备拓扑关系测试
@@ -232,28 +207,27 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
MqttClient client = connectAndAuth();
log.info("[testTopoGet][连接认证成功]");
// 2.1 订阅 _reply 主题
try {
// 2.1 构建请求消息
IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO();
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.TOPO_GET.getMethod(),
params);
// 2.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/topo/get_reply",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
subscribeReply(client, replyTopic);
// 2.2 构建请求消息
IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO();
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.TOPO_GET.getMethod(),
params,
null, null, null);
subscribe(client, replyTopic);
// 3. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/topo/get",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testTopoGet][响应消息: {}]", response);
// 4. 断开连接
} finally {
disconnect(client);
}
}
// ===================== 子设备注册测试 =====================
@@ -270,30 +244,29 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
MqttClient client = connectAndAuth();
log.info("[testSubDeviceRegister][连接认证成功]");
// 2.1 订阅 _reply 主题
try {
// 2.1 构建请求消息
IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO()
.setProductKey(SUB_DEVICE_PRODUCT_KEY)
.setDeviceName("mougezishebei-mqtt");
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(),
Collections.singletonList(subDevice));
// 2.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/auth/sub-device/register_reply",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
subscribeReply(client, replyTopic);
// 2.2 构建请求消息
IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO();
subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY);
subDevice.setDeviceName("mougezishebei-mqtt");
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(),
Collections.singletonList(subDevice),
null, null, null);
subscribe(client, replyTopic);
// 3. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/auth/sub-device/register",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testSubDeviceRegister][响应消息: {}]", response);
// 4. 断开连接
} finally {
disconnect(client);
}
}
// ===================== 批量上报测试 =====================
@@ -308,65 +281,64 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
MqttClient client = connectAndAuth();
log.info("[testPropertyPackPost][连接认证成功]");
// 2.1 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/event/property/pack/post_reply",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
subscribeReply(client, replyTopic);
// 2.2 构建【网关设备】自身属性
try {
// 2.1 构建【网关设备】自身属性
Map<String, Object> gatewayProperties = MapUtil.<String, Object>builder()
.put("temperature", 25.5)
.build();
// 2.3 构建【网关设备】自身事件
IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build());
gatewayEvent.setTime(System.currentTimeMillis());
// 2.2 构建【网关设备】自身事件
IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue()
.setValue(MapUtil.builder().put("message", "gateway started").build())
.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> gatewayEvents = MapUtil
.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("statusReport", gatewayEvent)
.build();
// 2.4 构建【网关子设备】属性
// 2.3 构建【网关子设备】属性
Map<String, Object> subDeviceProperties = MapUtil.<String, Object>builder()
.put("power", 100)
.build();
// 2.5 构建【网关子设备】事件
IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue();
subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build());
subDeviceEvent.setTime(System.currentTimeMillis());
// 2.4 构建【网关子设备】事件
IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue()
.setValue(MapUtil.builder().put("errorCode", 0).build())
.setTime(System.currentTimeMillis());
Map<String, IotDevicePropertyPackPostReqDTO.EventValue> subDeviceEvents = MapUtil
.<String, IotDevicePropertyPackPostReqDTO.EventValue>builder()
.put("healthCheck", subDeviceEvent)
.build();
// 2.6 构建子设备数据
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData();
subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME));
subDeviceData.setProperties(subDeviceProperties);
subDeviceData.setEvents(subDeviceEvents);
// 2.5 构建子设备数据
IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData()
.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))
.setProperties(subDeviceProperties)
.setEvents(subDeviceEvents);
// 2.7 构建请求消息
IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO();
params.setProperties(gatewayProperties);
params.setEvents(gatewayEvents);
params.setSubDevices(ListUtil.of(subDeviceData));
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
// 2.6 构建请求消息
IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO()
.setProperties(gatewayProperties)
.setEvents(gatewayEvents)
.setSubDevices(ListUtil.of(subDeviceData));
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(),
params,
null, null, null);
params);
// 2.7 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/event/property/pack/post_reply",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
subscribe(client, replyTopic);
// 3. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/event/property/pack/post",
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME);
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testPropertyPackPost][响应消息: {}]", response);
// 4. 断开连接
} finally {
disconnect(client);
}
}
// ===================== 辅助方法 =====================
@@ -376,7 +348,7 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
* @param authInfo 认证信息
* @return MQTT 客户端
*/
private MqttClient connect(IotDeviceAuthReqDTO authInfo) {
private MqttClient createClient(IotDeviceAuthReqDTO authInfo) {
MqttClientOptions options = new MqttClientOptions()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
@@ -392,45 +364,24 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
* @return 已认证的 MQTT 客户端
*/
private MqttClient connectAndAuth() throws Exception {
// 1. 创建客户端并连接
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(
GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET);
MqttClient client = connect(authInfo);
// 2.1 连接
CompletableFuture<MqttClient> future = new CompletableFuture<>();
MqttClient client = createClient(authInfo);
client.connect(SERVER_PORT, SERVER_HOST)
.onComplete(ar -> {
if (ar.succeeded()) {
future.complete(client);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2.2 等待连接结果
return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
return client;
}
/**
* 订阅响应主题
* 订阅主题
*
* @param client MQTT 客户端
* @param replyTopic 响应主题
* @param topic 主题
*/
private void subscribeReply(MqttClient client, String replyTopic) throws Exception {
// 1. 订阅响应主题
CompletableFuture<Void> future = new CompletableFuture<>();
client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value())
.onComplete(ar -> {
if (ar.succeeded()) {
log.info("[subscribeReply][订阅响应主题成功: {}]", replyTopic);
future.complete(null);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2. 等待订阅结果
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
private void subscribe(MqttClient client, String topic) throws Exception {
client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value())
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[subscribe][订阅主题成功: {}]", topic);
}
/**
@@ -441,34 +392,28 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
* @param request 请求消息
* @return 响应消息
*/
private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) {
private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request)
throws Exception {
// 1. 设置消息处理器,接收响应
CompletableFuture<IotDeviceMessage> future = new CompletableFuture<>();
CompletableFuture<IotDeviceMessage> responseFuture = new CompletableFuture<>();
client.publishHandler(message -> {
log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]",
message.topicName(), message.payload().toString());
IotDeviceMessage response = CODEC.decode(message.payload().getBytes());
future.complete(response);
responseFuture.complete(response);
});
// 2. 编码并发布消息
byte[] payload = CODEC.encode(request);
log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]",
CODEC.type(), topic, new String(payload));
client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false)
.onComplete(ar -> {
if (ar.succeeded()) {
log.info("[publishAndWaitReply][消息发布成功messageId={}]", ar.result());
} else {
log.error("[publishAndWaitReply][消息发布失败]", ar.cause());
future.completeExceptionally(ar.cause());
}
});
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[publishAndWaitReply][消息发布成功]");
// 3. 等待响应(超时返回 null
// 3. 等待响应
try {
return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
return responseFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("[publishAndWaitReply][等待响应超时或失败]");
return null;
@@ -481,19 +426,9 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest {
* @param client MQTT 客户端
*/
private void disconnect(MqttClient client) throws Exception {
// 1. 断开连接
CompletableFuture<Void> future = new CompletableFuture<>();
client.disconnect()
.onComplete(ar -> {
if (ar.succeeded()) {
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[disconnect][断开连接成功]");
future.complete(null);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2. 等待断开结果
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
}

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -22,7 +21,6 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
@@ -90,39 +88,19 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest {
*/
@Test
public void testAuth() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
// 1. 构建认证信息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
log.info("[testAuth][认证信息: clientId={}, username={}, password={}]",
authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword());
// 2. 创建客户端并连接
MqttClient client = connect(authInfo);
MqttClient client = createClient(authInfo);
try {
client.connect(SERVER_PORT, SERVER_HOST)
.onComplete(ar -> {
if (ar.succeeded()) {
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId());
// 断开连接
client.disconnect()
.onComplete(disconnectAr -> {
if (disconnectAr.succeeded()) {
log.info("[testAuth][断开连接成功]");
} else {
log.error("[testAuth][断开连接失败]", disconnectAr.cause());
}
latch.countDown();
});
} else {
log.error("[testAuth][连接失败]", ar.cause());
latch.countDown();
}
});
// 3. 等待测试完成
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!completed) {
log.warn("[testAuth][测试超时]");
} finally {
disconnect(client);
}
}
@@ -138,29 +116,28 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest {
log.info("[testPropertyPost][连接认证成功]");
log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]");
// 2. 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME);
subscribeReply(client, replyTopic);
// 3. 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
try {
// 2.1 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("power", 100)
.put("status", "online")
.put("temperature", 36.5)
.build()),
null, null, null);
.build()));
// 4. 发布消息并等待响应
// 2.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME);
subscribe(client, replyTopic);
// 3. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/property/post", PRODUCT_KEY, DEVICE_NAME);
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testPropertyPost][响应消息: {}]", response);
// 5. 断开连接
} finally {
disconnect(client);
}
}
// ===================== 子设备事件上报测试 =====================
@@ -174,13 +151,9 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest {
log.info("[testEventPost][连接认证成功]");
log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]");
// 2. 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/event/post_reply", PRODUCT_KEY, DEVICE_NAME);
subscribeReply(client, replyTopic);
// 3. 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
try {
// 2.1 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"alarm",
@@ -190,17 +163,20 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest {
.put("threshold", 40)
.put("current", 42)
.build(),
System.currentTimeMillis()),
null, null, null);
System.currentTimeMillis()));
// 4. 发布消息并等待响应
// 2.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/event/post_reply", PRODUCT_KEY, DEVICE_NAME);
subscribe(client, replyTopic);
// 3. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/event/post", PRODUCT_KEY, DEVICE_NAME);
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testEventPost][响应消息: {}]", response);
// 5. 断开连接
} finally {
disconnect(client);
}
}
// ===================== 辅助方法 =====================
@@ -210,7 +186,7 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest {
* @param authInfo 认证信息
* @return MQTT 客户端
*/
private MqttClient connect(IotDeviceAuthReqDTO authInfo) {
private MqttClient createClient(IotDeviceAuthReqDTO authInfo) {
MqttClientOptions options = new MqttClientOptions()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
@@ -226,44 +202,23 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest {
* @return 已认证的 MQTT 客户端
*/
private MqttClient connectAndAuth() throws Exception {
// 1. 创建客户端并连接
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
MqttClient client = connect(authInfo);
// 2.1 连接
CompletableFuture<MqttClient> future = new CompletableFuture<>();
MqttClient client = createClient(authInfo);
client.connect(SERVER_PORT, SERVER_HOST)
.onComplete(ar -> {
if (ar.succeeded()) {
future.complete(client);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2.2 等待连接结果
return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
return client;
}
/**
* 订阅响应主题
* 订阅主题
*
* @param client MQTT 客户端
* @param replyTopic 响应主题
* @param topic 主题
*/
private void subscribeReply(MqttClient client, String replyTopic) throws Exception {
// 1. 订阅响应主题
CompletableFuture<Void> future = new CompletableFuture<>();
client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value())
.onComplete(ar -> {
if (ar.succeeded()) {
log.info("[subscribeReply][订阅响应主题成功: {}]", replyTopic);
future.complete(null);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2. 等待订阅结果
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
private void subscribe(MqttClient client, String topic) throws Exception {
client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value())
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[subscribe][订阅主题成功: {}]", topic);
}
/**
@@ -274,34 +229,28 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest {
* @param request 请求消息
* @return 响应消息
*/
private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) {
private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request)
throws Exception {
// 1. 设置消息处理器,接收响应
CompletableFuture<IotDeviceMessage> future = new CompletableFuture<>();
CompletableFuture<IotDeviceMessage> responseFuture = new CompletableFuture<>();
client.publishHandler(message -> {
log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]",
message.topicName(), message.payload().toString());
IotDeviceMessage response = CODEC.decode(message.payload().getBytes());
future.complete(response);
responseFuture.complete(response);
});
// 2. 编码并发布消息
byte[] payload = CODEC.encode(request);
log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]",
CODEC.type(), topic, new String(payload));
client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false)
.onComplete(ar -> {
if (ar.succeeded()) {
log.info("[publishAndWaitReply][消息发布成功messageId={}]", ar.result());
} else {
log.error("[publishAndWaitReply][消息发布失败]", ar.cause());
future.completeExceptionally(ar.cause());
}
});
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[publishAndWaitReply][消息发布成功]");
// 3. 等待响应(超时返回 null
// 3. 等待响应
try {
return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
return responseFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("[publishAndWaitReply][等待响应超时或失败]");
return null;
@@ -314,19 +263,9 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest {
* @param client MQTT 客户端
*/
private void disconnect(MqttClient client) throws Exception {
// 1. 断开连接
CompletableFuture<Void> future = new CompletableFuture<>();
client.disconnect()
.onComplete(ar -> {
if (ar.succeeded()) {
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[disconnect][断开连接成功]");
future.complete(null);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2. 等待断开结果
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
}