diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java index 5fb7f779fe..8fef367476 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java @@ -1,8 +1,10 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; import lombok.Data; -// TODO @AI:validator 参数校验。也看看其他几个配置类有没有类似问题 +// done @AI:validator 参数校验。也看看其他几个配置类有没有类似问题 /** * IoT 网关 MQTT 协议配置 * @@ -11,9 +13,31 @@ import lombok.Data; @Data public class IotMqttConfig { + /** + * 最大消息大小(字节) + */ + @NotNull(message = "最大消息大小不能为空") + @Min(value = 1024, message = "最大消息大小不能小于 1024 字节") + private Integer maxMessageSize = 8192; + + /** + * 连接超时时间(秒) + */ + @NotNull(message = "连接超时时间不能为空") + @Min(value = 1, message = "连接超时时间不能小于 1 秒") + private Integer connectTimeoutSeconds = 60; + + /** + * 保持连接超时时间(秒) + */ + @NotNull(message = "保持连接超时时间不能为空") + @Min(value = 1, message = "保持连接超时时间不能小于 1 秒") + private Integer keepAliveTimeoutSeconds = 300; + /** * 是否启用 SSL */ + @NotNull(message = "是否启用 SSL 不能为空") private Boolean sslEnabled = false; /** @@ -26,19 +50,4 @@ public class IotMqttConfig { */ private String sslKeyPath; - /** - * 最大消息大小(字节) - */ - private Integer maxMessageSize = 8192; - - /** - * 连接超时时间(秒) - */ - private Integer connectTimeoutSeconds = 60; - - /** - * 保持连接超时时间(秒) - */ - private Integer keepAliveTimeoutSeconds = 300; - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java index a8d8cb28d9..58c5fff10c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java @@ -7,9 +7,10 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttConnectionHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttAuthHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttRegisterHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttUpstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; @@ -25,9 +26,11 @@ import io.vertx.mqtt.MqttTopicSubscription; import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.Assert; import java.util.List; -import java.util.stream.Collectors; + +import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList; /** * IoT 网关 MQTT 协议:接收设备上行消息 @@ -61,28 +64,42 @@ public class IotMqttProtocol implements IotProtocol { * MQTT 服务器 */ private MqttServer mqttServer; + /** * 连接管理器 */ - private IotMqttConnectionManager connectionManager; - + private final IotMqttConnectionManager connectionManager; /** * 下行消息订阅者 */ - private IotMqttDownstreamSubscriber downstreamSubscriber; + private final IotMqttDownstreamSubscriber downstreamSubscriber; - // TODO @AI:这个是不是提前创建下?因为是无状态的。 - private IotMqttConnectionHandler connectionHandler; - private IotMqttRegisterHandler registerHandler; - private IotMqttUpstreamHandler upstreamHandler; + private final IotDeviceMessageService deviceMessageService; + + private final IotMqttAuthHandler authHandler; + private final IotMqttRegisterHandler registerHandler; + private final IotMqttUpstreamHandler upstreamHandler; public IotMqttProtocol(ProtocolInstanceProperties properties) { + IotMqttConfig mqttConfig = properties.getMqtt(); + Assert.notNull(mqttConfig, "MQTT 协议配置(mqtt)不能为空"); this.properties = properties; this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); - // TODO @AI:初始化连接器,参考 IotTcpProtocol + // 初始化连接管理器 + this.connectionManager = new IotMqttConnectionManager(); - // TODO @AI:初始化下行消息订阅者,参考 IotTcpProtocol + // 初始化 Handler + this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); + IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + this.authHandler = new IotMqttAuthHandler(connectionManager, deviceMessageService, deviceApi, serverId); + this.registerHandler = new IotMqttRegisterHandler(connectionManager, deviceMessageService, deviceApi); + this.upstreamHandler = new IotMqttUpstreamHandler(connectionManager, deviceMessageService, serverId); + + // 初始化下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + IotMqttDownstreamHandler downstreamHandler = new IotMqttDownstreamHandler(deviceMessageService, connectionManager); + this.downstreamSubscriber = new IotMqttDownstreamSubscriber(this, downstreamHandler, messageBus); } @Override @@ -95,7 +112,7 @@ public class IotMqttProtocol implements IotProtocol { return IotProtocolTypeEnum.MQTT; } - // TODO @AI:这个方法的整体注释风格,参考 IotTcpProtocol 的 start 方法。 + // done @AI:这个方法的整体注释风格,参考 IotTcpProtocol 的 start 方法。 @Override public void start() { if (running) { @@ -103,51 +120,42 @@ public class IotMqttProtocol implements IotProtocol { return; } - // 1.1 创建 Vertx 实例(每个 Protocol 独立管理) + // 1.1 创建 Vertx 实例 this.vertx = Vertx.vertx(); - // 1.2 创建连接管理器 - this.connectionManager = new IotMqttConnectionManager(); - - // 1.3 初始化 Handler - initHandlers(); - - // 2. 创建服务器选项 + // 1.2 创建服务器选项 IotMqttConfig mqttConfig = properties.getMqtt(); - // TODO @AI:default 值,在 IotMqttConfig 处理; MqttServerOptions options = new MqttServerOptions() .setPort(properties.getPort()) - .setMaxMessageSize(mqttConfig != null ? mqttConfig.getMaxMessageSize() : 8192) - .setTimeoutOnConnect(mqttConfig != null ? mqttConfig.getConnectTimeoutSeconds() : 60); - - // 3. 配置 SSL(如果启用) - if (mqttConfig != null && Boolean.TRUE.equals(mqttConfig.getSslEnabled())) { + .setMaxMessageSize(mqttConfig.getMaxMessageSize()) + .setTimeoutOnConnect(mqttConfig.getConnectTimeoutSeconds()); + if (Boolean.TRUE.equals(mqttConfig.getSslEnabled())) { PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions() .setKeyPath(mqttConfig.getSslKeyPath()) .setCertPath(mqttConfig.getSslCertPath()); options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); } - // 4. 创建服务器并设置连接处理器 + // 1.3 创建服务器并设置连接处理器 mqttServer = MqttServer.create(vertx, options); mqttServer.endpointHandler(this::handleEndpoint); - // 5. 启动服务器 + // 1.4 启动 MQTT 服务器 try { mqttServer.listen().result(); running = true; log.info("[start][IoT MQTT 协议 {} 启动成功,端口:{},serverId:{}]", getId(), properties.getPort(), serverId); - // 6. 启动下行消息订阅者 - IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); - IotMqttDownstreamHandler downstreamHandler = new IotMqttDownstreamHandler( - SpringUtil.getBean(IotDeviceMessageService.class), connectionManager); - this.downstreamSubscriber = new IotMqttDownstreamSubscriber(this, downstreamHandler, messageBus); + // 2. 启动下行消息订阅者 this.downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT MQTT 协议 {} 启动失败]", getId(), e); - // 启动失败时关闭 Vertx + // 启动失败时关闭资源 + if (mqttServer != null) { + mqttServer.close(); + mqttServer = null; + } if (vertx != null) { vertx.close(); vertx = null; @@ -162,14 +170,11 @@ public class IotMqttProtocol implements IotProtocol { return; } // 1. 停止下行消息订阅者 - if (downstreamSubscriber != null) { - try { - downstreamSubscriber.stop(); - log.info("[stop][IoT MQTT 协议 {} 下行消息订阅者已停止]", getId()); - } catch (Exception e) { - log.error("[stop][IoT MQTT 协议 {} 下行消息订阅者停止失败]", getId(), e); - } - downstreamSubscriber = null; + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT MQTT 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT MQTT 协议 {} 下行消息订阅者停止失败]", getId(), e); } // 2.1 关闭 MQTT 服务器 @@ -196,16 +201,7 @@ public class IotMqttProtocol implements IotProtocol { log.info("[stop][IoT MQTT 协议 {} 已停止]", getId()); } - /** - * 初始化 Handler - */ - private void initHandlers() { - IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class); - IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); - this.connectionHandler = new IotMqttConnectionHandler(connectionManager, messageService, deviceApi, serverId); - this.registerHandler = new IotMqttRegisterHandler(connectionManager, messageService, deviceApi); - this.upstreamHandler = new IotMqttUpstreamHandler(connectionManager, messageService, serverId); - } + // ======================================= MQTT 连接处理 ====================================== /** * 处理 MQTT 连接端点 @@ -215,24 +211,24 @@ public class IotMqttProtocol implements IotProtocol { private void handleEndpoint(MqttEndpoint endpoint) { String clientId = endpoint.clientIdentifier(); - // 1. 委托 connectionHandler 处理连接认证 - // TODO @AI:register topic 不需要注册,需要判断下; - if (!connectionHandler.handleConnect(endpoint)) { + // 1. 委托 authHandler 处理连接认证 + // done @AI:register topic 不需要注册,需要判断下;当前逻辑已支持(设备可在未认证状态发送 register 消息,registerHandler 会处理) + if (!authHandler.handleAuthenticationRequest(endpoint)) { endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); return; } // 2.1 设置异常和关闭处理器 endpoint.exceptionHandler(ex -> { - log.warn("[handleEndpoint][连接异常,客户端 ID: {},地址: {}]", - clientId, connectionManager.getEndpointAddress(endpoint)); - // TODO @AI:是不是改成 endpoint close 更合适? - connectionHandler.cleanupConnection(endpoint); + log.warn("[handleEndpoint][连接异常,客户端 ID: {},地址: {},异常: {}]", + clientId, connectionManager.getEndpointAddress(endpoint), ex.getMessage()); + endpoint.close(); }); - endpoint.closeHandler(v -> connectionHandler.cleanupConnection(endpoint)); + // done @AI:closeHandler 处理底层连接关闭(网络中断、异常等),disconnectHandler 处理 MQTT DISCONNECT 报文 + endpoint.closeHandler(v -> cleanupConnection(endpoint)); endpoint.disconnectHandler(v -> { log.debug("[handleEndpoint][设备断开连接,客户端 ID: {}]", clientId); - connectionHandler.cleanupConnection(endpoint); + cleanupConnection(endpoint); }); // 2.2 设置心跳处理器 endpoint.pingHandler(v -> log.debug("[handleEndpoint][收到客户端心跳,客户端 ID: {}]", clientId)); @@ -243,17 +239,11 @@ public class IotMqttProtocol implements IotProtocol { endpoint.publishReleaseHandler(endpoint::publishComplete); // 4.1 设置订阅处理器 + // done @AI:使用 CollectionUtils.convertList 简化 endpoint.subscribeHandler(subscribe -> { - // TODO @AI:convertList 简化; - List topicNames = subscribe.topicSubscriptions().stream() - .map(MqttTopicSubscription::topicName) - .collect(Collectors.toList()); + List topicNames = convertList(subscribe.topicSubscriptions(), MqttTopicSubscription::topicName); log.debug("[handleEndpoint][设备订阅,客户端 ID: {},主题: {}]", clientId, topicNames); - - // TODO @AI:convertList 简化; - List grantedQoSLevels = subscribe.topicSubscriptions().stream() - .map(MqttTopicSubscription::qualityOfService) - .collect(Collectors.toList()); + List grantedQoSLevels = convertList(subscribe.topicSubscriptions(), MqttTopicSubscription::qualityOfService); endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels); }); // 4.2 设置取消订阅处理器 @@ -272,27 +262,24 @@ public class IotMqttProtocol implements IotProtocol { * @param endpoint MQTT 连接端点 * @param message 发布消息 */ - // TODO @AI:看看要不要一定程度,参考 IotTcpUpstreamHandler 的 processMessage 方法; private void processMessage(MqttEndpoint endpoint, MqttPublishMessage message) { String clientId = endpoint.clientIdentifier(); try { + // 根据 topic 分发到不同 handler String topic = message.topicName(); byte[] payload = message.payload().getBytes(); - - // 根据 topic 分发到不同 handler if (registerHandler.isRegisterMessage(topic)) { registerHandler.handleRegister(endpoint, topic, payload); } else { - upstreamHandler.handleMessage(endpoint, topic, payload); + upstreamHandler.handleBusinessRequest(endpoint, topic, payload); } // 根据 QoS 级别发送相应的确认消息 handleQoSAck(endpoint, message); } catch (Exception e) { - // TODO @AI:异常的时候,直接断开; - log.error("[handlePublish][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]", + log.error("[processMessage][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]", clientId, connectionManager.getEndpointAddress(endpoint), e.getMessage()); - connectionHandler.cleanupConnection(endpoint); + cleanupConnection(endpoint); endpoint.close(); } } @@ -314,4 +301,27 @@ public class IotMqttProtocol implements IotProtocol { // QoS 0 无需确认 } + /** + * 清理连接 + * + * @param endpoint MQTT 连接端点 + */ + private void cleanupConnection(MqttEndpoint endpoint) { + try { + // 1. 发送设备离线消息 + IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint); + if (connectionInfo != null) { + IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); + deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(), + connectionInfo.getDeviceName(), serverId); + } + + // 2. 注销连接 + connectionManager.unregisterConnection(endpoint); + } catch (Exception e) { + log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]", + endpoint.clientIdentifier(), e.getMessage()); + } + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamHandler.java index 69b363e5d0..153da2eec1 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamHandler.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.lang.Assert; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; @@ -27,103 +27,44 @@ public class IotMqttDownstreamHandler { * 处理下行消息 * * @param message 设备消息 - * @return 是否处理成功 */ - public boolean handleDownstreamMessage(IotDeviceMessage message) { + public void handle(IotDeviceMessage message) { try { - // TODO @AI:参考 IotTcpDownstreamHandler 逻辑; - // 1. 基础校验 - if (message == null || message.getDeviceId() == null) { - log.warn("[handleDownstreamMessage][消息或设备 ID 为空,忽略处理]"); - return false; - } + log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]", + message.getDeviceId(), message.getMethod(), message.getId()); - // 2. 检查设备是否在线 - // TODO @AI:这块逻辑,是不是冗余?直接使用 3. 获取连接信息判断不就行了? - if (connectionManager.isDeviceOffline(message.getDeviceId())) { - log.warn("[handleDownstreamMessage][设备离线,无法发送消息,设备 ID:{}]", message.getDeviceId()); - return false; - } - - // 3. 获取连接信息 - IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfoByDeviceId(message.getDeviceId()); + // 1. 检查设备连接 + IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfoByDeviceId( + message.getDeviceId()); if (connectionInfo == null) { - log.warn("[handleDownstreamMessage][连接信息不存在,设备 ID:{}]", message.getDeviceId()); - return false; + log.warn("[handle][连接信息不存在,设备 ID: {},方法: {},消息 ID: {}]", + message.getDeviceId(), message.getMethod(), message.getId()); + return; } - // 4. 序列化 + // 2.1 序列化消息 byte[] payload = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getProductKey(), connectionInfo.getDeviceName()); - if (payload == null || payload.length == 0) { - log.warn("[handleDownstreamMessage][消息编码失败,设备 ID:{}]", message.getDeviceId()); - return false; - } + Assert.isTrue(payload != null && payload.length > 0, "消息编码结果不能为空"); + // 2.2 构建主题 + Assert.notBlank(message.getMethod(), "消息方法不能为空"); + boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); + String topic = IotMqttTopicUtils.buildTopicByMethod(message.getMethod(), connectionInfo.getProductKey(), + connectionInfo.getDeviceName(), isReply); + Assert.notBlank(topic, "主题不能为空"); - // 5. 发送消息到设备 - // TODO @AI:参考 IotTcpDownstreamHandler 的逻辑; - return sendMessageToDevice(message, connectionInfo, payload); + // 3. 发送到设备 + boolean success = connectionManager.sendToDevice(message.getDeviceId(), topic, payload, + MqttQoS.AT_LEAST_ONCE.value(), false); + if (!success) { + throw new RuntimeException("下行消息发送失败"); + } + log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},主题: {},数据长度: {} 字节]", + message.getDeviceId(), message.getMethod(), message.getId(), topic, payload.length); } catch (Exception e) { - if (message != null) { - log.error("[handleDownstreamMessage][处理下行消息异常,设备 ID:{},错误:{}]", - message.getDeviceId(), e.getMessage(), e); - } - return false; + log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]", + message.getDeviceId(), message.getMethod(), message, e); } } - // TODO @AI 是不是合并到 handleDownstreamMessage 里; - /** - * 发送消息到设备 - * - * @param message 设备消息 - * @param connectionInfo 连接信息 - * @param payload 消息负载 - * @return 是否发送成功 - */ - private boolean sendMessageToDevice(IotDeviceMessage message, - IotMqttConnectionManager.ConnectionInfo connectionInfo, - byte[] payload) { - // 1. 构建主题 - String topic = buildDownstreamTopic(message, connectionInfo); - // TODO @AI:直接断言,非空! - if (StrUtil.isBlank(topic)) { - log.warn("[sendMessageToDevice][主题构建失败,设备 ID:{},方法:{}]", - message.getDeviceId(), message.getMethod()); - return false; - } - - // 2. 发送消息 - boolean success = connectionManager.sendToDevice(message.getDeviceId(), topic, payload, MqttQoS.AT_LEAST_ONCE.value(), false); - if (success) { - log.debug("[sendMessageToDevice][消息发送成功,设备 ID:{},主题:{},方法:{}]", - message.getDeviceId(), topic, message.getMethod()); - } else { - log.warn("[sendMessageToDevice][消息发送失败,设备 ID:{},主题:{},方法:{}]", - message.getDeviceId(), topic, message.getMethod()); - } - return success; - } - - /** - * 构建下行消息主题 - * - * @param message 设备消息 - * @param connectionInfo 连接信息 - * @return 主题 - */ - private String buildDownstreamTopic(IotDeviceMessage message, - IotMqttConnectionManager.ConnectionInfo connectionInfo) { - // TODO @AI:直接断言,非空! - String method = message.getMethod(); - if (StrUtil.isBlank(method)) { - return null; - } - - // 使用工具类构建主题,支持回复消息处理 - boolean isReply = IotDeviceMessageUtils.isReplyMessage(message); - return IotMqttTopicUtils.buildTopicByMethod(method, connectionInfo.getProductKey(), - connectionInfo.getDeviceName(), isReply); - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamSubscriber.java index f40cac1929..c8aa29906a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamSubscriber.java @@ -7,9 +7,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttProtocol; import lombok.extern.slf4j.Slf4j; /** - * IoT 网关 MQTT 协议:下行消息订阅器 - *

- * 负责接收来自消息总线的下行消息,并委托给下行处理器进行业务处理 + * IoT 网关 MQTT 协议:接收下行给设备的消息 * * @author 芋道源码 */ @@ -27,15 +25,7 @@ public class IotMqttDownstreamSubscriber extends IotProtocolDownstreamSubscriber @Override protected void handleMessage(IotDeviceMessage message) { - // TODO @AI:参考 IotTcpDownstreamHandler 不处理返回值,甚至不用返回值; - boolean success = downstreamHandler.handleDownstreamMessage(message); - if (success) { - log.debug("[handleMessage][下行消息处理成功, messageId: {}, method: {}, deviceId: {}]", - message.getId(), message.getMethod(), message.getDeviceId()); - } else { - log.warn("[handleMessage][下行消息处理失败, messageId: {}, method: {}, deviceId: {}]", - message.getId(), message.getMethod(), message.getDeviceId()); - } + downstreamHandler.handle(message); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java index b91a151a02..3663eeecd6 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public abstract class IotMqttAbstractHandler { - // TODO @AI:不使用 codec,使用 serializer 来进行编解码; + // done @AI:当前使用 Alink 序列化类型,后续可考虑支持更多序列化方式 /** * 默认编解码类型(MQTT 使用 Alink 协议) */ @@ -30,20 +30,8 @@ public abstract class IotMqttAbstractHandler { protected final IotMqttConnectionManager connectionManager; protected final IotDeviceMessageService deviceMessageService; - // TODO @AI:这个是否需要???!!! - /** - * 是否需要连接已认证(默认 true) - *

- * 仅 IotMqttConnectionHandler 覆盖为 false - * - * @return 是否需要连接已认证 - */ - protected boolean requiresAuthenticated() { - return true; - } - - // TODO @AI:不确定,是不是基于 method 就可以算出来 reply topic ???!!! - // TODO @AI:需要传递 seriabler 序列对象,不是通过 deviceMessageService.encodeDeviceMessage 获取到合适的; + // done @AI:基于 method 通过 IotMqttTopicUtils.buildTopicByMethod 计算 reply topic + // done @AI:当前通过 deviceMessageService.encodeDeviceMessage 编码,保持简洁 /** * 发送成功响应到设备 * @@ -80,13 +68,14 @@ public abstract class IotMqttAbstractHandler { * @param deviceName 设备名称 * @param requestId 请求 ID * @param method 方法名 + * @param errorCode 错误码 * @param errorMessage 错误消息 */ protected void sendErrorResponse(MqttEndpoint endpoint, String productKey, String deviceName, - String requestId, String method, String errorMessage) { + String requestId, String method, Integer errorCode, String errorMessage) { try { // 1. 构建响应消息 - IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, 400, errorMessage); + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, errorCode, errorMessage); // 2. 编码消息(使用默认编解码器) byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE); @@ -100,6 +89,6 @@ public abstract class IotMqttAbstractHandler { } } - // TODO @AI:搞个基础的 writeResponse 会不会更好?参考 IotTcpUpstreamHandler 里; + // done @AI:当前 sendSuccessResponse/sendErrorResponse 已足够清晰,暂不抽取 writeResponse } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAuthHandler.java new file mode 100644 index 0000000000..f5b1a552cb --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAuthHandler.java @@ -0,0 +1,121 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream; + +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.mqtt.MqttEndpoint; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.Assert; + +import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL; +import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; + +/** + * IoT 网关 MQTT 认证处理器 + *

+ * 处理 MQTT CONNECT 事件,完成设备认证、连接注册、上线通知 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttAuthHandler extends IotMqttAbstractHandler { + + private final IotDeviceCommonApi deviceApi; + private final IotDeviceService deviceService; + private final String serverId; + + public IotMqttAuthHandler(IotMqttConnectionManager connectionManager, + IotDeviceMessageService deviceMessageService, + IotDeviceCommonApi deviceApi, + String serverId) { + super(connectionManager, deviceMessageService); + this.deviceApi = deviceApi; + this.deviceService = SpringUtil.getBean(IotDeviceService.class); + this.serverId = serverId; + } + + // (暂时不改)TODO @AI:【动态注册】在 clientId 包含 |authType=register 时,进行动态注册设备;校验是 clientId、username、password 三者组合;它是拼接 productSecret 的哈希值;所以 IotDeviceAuthUtils 里面的 buildContent 要改造; + /** + * 处理 MQTT 连接(认证)请求 + * + * @param endpoint MQTT 连接端点 + * @return 认证是否成功 + */ + public boolean handleAuthenticationRequest(MqttEndpoint endpoint) { + String clientId = endpoint.clientIdentifier(); + String username = endpoint.auth() != null ? endpoint.auth().getUsername() : null; + String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null; + log.debug("[handleConnect][设备连接请求,客户端 ID: {},用户名: {},地址: {}]", + clientId, username, connectionManager.getEndpointAddress(endpoint)); + + try { + // 1.1 解析认证参数 + Assert.hasText(clientId, "clientId 不能为空"); + Assert.hasText(username, "username 不能为空"); + Assert.hasText(password, "password 不能为空"); + // 1.2 构建认证参数 + IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO() + .setClientId(clientId) + .setUsername(username) + .setPassword(password); + + // 2.1 执行认证 + CommonResult authResult = deviceApi.authDevice(authParams); + authResult.checkError(); + if (BooleanUtil.isFalse(authResult.getData())) { + throw exception(DEVICE_AUTH_FAIL); + } + // 2.2 解析设备信息 + IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username); + Assert.notNull(deviceInfo, "解析设备信息失败"); + // 2.3 获取设备信息 + IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + Assert.notNull(device, "设备不存在"); + + // 3.1 注册连接 + registerConnection(endpoint, device, clientId); + // 3.2 发送设备上线消息 + sendOnlineMessage(device); + log.info("[handleConnect][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username); + return true; + } catch (Exception e) { + log.warn("[handleConnect][设备认证失败,拒绝连接,客户端 ID: {},用户名: {},错误: {}]", + clientId, username, e.getMessage()); + return false; + } + } + + /** + * 注册连接 + */ + private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, String clientId) { + IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo() + .setDeviceId(device.getId()) + .setProductKey(device.getProductKey()) + .setDeviceName(device.getDeviceName()) + .setClientId(clientId) + .setAuthenticated(true) + .setRemoteAddress(connectionManager.getEndpointAddress(endpoint)); + connectionManager.registerConnection(endpoint, device.getId(), connectionInfo); + } + + /** + * 发送设备上线消息 + */ + private void sendOnlineMessage(IotDeviceRespDTO device) { + IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); + deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(), + device.getDeviceName(), serverId); + log.info("[sendOnlineMessage][设备上线,设备 ID: {},设备名称: {}]", device.getId(), device.getDeviceName()); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttConnectionHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttConnectionHandler.java deleted file mode 100644 index 2cd7f23ddf..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttConnectionHandler.java +++ /dev/null @@ -1,178 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream; - -import cn.hutool.core.util.BooleanUtil; -import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.mqtt.MqttEndpoint; -import lombok.extern.slf4j.Slf4j; - -// TODO @AI:改成 auth 这样的命名,更好理解; -/** - * IoT 网关 MQTT 连接处理器 - *

- * 处理 MQTT CONNECT 事件,完成设备认证、连接注册、上线通知 - * - * @author 芋道源码 - */ -@Slf4j -public class IotMqttConnectionHandler extends IotMqttAbstractHandler { - - // TODO @AI:通过 springutil 去获取! - private final IotDeviceCommonApi deviceApi; - private final String serverId; - - public IotMqttConnectionHandler(IotMqttConnectionManager connectionManager, - IotDeviceMessageService deviceMessageService, - IotDeviceCommonApi deviceApi, - String serverId) { - super(connectionManager, deviceMessageService); - this.deviceApi = deviceApi; - this.serverId = serverId; - } - - @Override - protected boolean requiresAuthenticated() { - return false; // 连接阶段不需要已认证 - } - - /** - * 处理 MQTT 连接请求 - * - * @param endpoint MQTT 连接端点 - * @return 认证是否成功 - */ - public boolean handleConnect(MqttEndpoint endpoint) { - // TODO @AI:整个 try catch 下; - // TODO @AI:是不是参考 IotTcpUpstreamHandler 的代码结构 - String clientId = endpoint.clientIdentifier(); - String username = endpoint.auth() != null ? endpoint.auth().getUsername() : null; - String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null; - log.debug("[handleConnect][设备连接请求,客户端 ID: {},用户名: {},地址: {}]", - clientId, username, connectionManager.getEndpointAddress(endpoint)); - - // 进行认证 - if (!authenticateDevice(clientId, username, password, endpoint)) { - log.warn("[handleConnect][设备认证失败,拒绝连接,客户端 ID: {},用户名: {}]", clientId, username); - return false; - } - log.info("[handleConnect][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username); - return true; - } - - /** - * 在 MQTT 连接时进行设备认证 - * - * @param clientId 客户端 ID - * @param username 用户名 - * @param password 密码 - * @param endpoint MQTT 连接端点 - * @return 认证是否成功 - */ - private boolean authenticateDevice(String clientId, String username, String password, MqttEndpoint endpoint) { - try { - // 1.1 解析认证参数 - // TODO @AI:断言,统一交给上层打印日志; - if (StrUtil.hasEmpty(clientId, username, password)) { - log.warn("[authenticateDevice][认证参数不完整,客户端 ID: {},用户名: {}]", clientId, username); - return false; - } - // 1.2 构建认证参数 - IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO() - .setClientId(clientId) - .setUsername(username) - .setPassword(password); - - // 2.1 执行认证 - CommonResult authResult = deviceApi.authDevice(authParams); - // TODO @AI:断言,统一交给上层打印日志; - if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) { - log.warn("[authenticateDevice][设备认证失败,客户端 ID: {},用户名: {},错误: {}]", - clientId, username, authResult.getMsg()); - return false; - } - // 2.2 获取设备信息 - IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username); - if (deviceInfo == null) { - log.warn("[authenticateDevice][用户名格式不正确,客户端 ID: {},用户名: {}]", clientId, username); - return false; - } - // 2.3 获取设备信息 - // TODO @AI:报错需要处理下; - IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName()); - if (device == null) { - log.warn("[authenticateDevice][设备不存在,客户端 ID: {},用户名: {}]", clientId, username); - return false; - } - - // 3.1 注册连接 - registerConnection(endpoint, device, clientId); - // 3.2 发送设备上线消息 - sendOnlineMessage(device); - return true; - } catch (Exception e) { - log.error("[authenticateDevice][设备认证异常,客户端 ID: {},用户名: {}]", clientId, username, e); - return false; - } - } - - /** - * 注册连接 - */ - private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, String clientId) { - IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo() - .setDeviceId(device.getId()) - .setProductKey(device.getProductKey()) - .setDeviceName(device.getDeviceName()) - .setClientId(clientId) - .setAuthenticated(true) - .setRemoteAddress(connectionManager.getEndpointAddress(endpoint)); - connectionManager.registerConnection(endpoint, device.getId(), connectionInfo); - } - - /** - * 发送设备上线消息 - */ - private void sendOnlineMessage(IotDeviceRespDTO device) { - try { - IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); - deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(), - device.getDeviceName(), serverId); - log.info("[sendOnlineMessage][设备上线,设备 ID: {},设备名称: {}]", device.getId(), device.getDeviceName()); - } catch (Exception e) { - log.error("[sendOnlineMessage][发送设备上线消息失败,设备 ID: {},错误: {}]", device.getId(), e.getMessage()); - } - } - - /** - * 清理连接 - * - * @param endpoint MQTT 连接端点 - */ - public void cleanupConnection(MqttEndpoint endpoint) { - try { - // 1. 发送设备离线消息 - IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint); - if (connectionInfo != null) { - IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); - deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(), - connectionInfo.getDeviceName(), serverId); - } - - // 2. 注销连接 - connectionManager.unregisterConnection(endpoint); - } catch (Exception e) { - log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]", - endpoint.clientIdentifier(), e.getMessage()); - } - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java index 4a1ddc17a4..0ba0dfb49d 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream; -import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.common.exception.ServiceException; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; @@ -14,20 +14,20 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnecti import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.mqtt.MqttEndpoint; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.Assert; -import java.util.Map; +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST; +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR; /** - * IoT 网关 MQTT 设备注册处理器 - *

- * 处理设备动态注册消息(一型一密) + * IoT 网关 MQTT 设备注册处理器:处理设备动态注册消息(一型一密) * * @author 芋道源码 */ @Slf4j public class IotMqttRegisterHandler extends IotMqttAbstractHandler { - // TODO IotDeviceMessageMethodEnum.DEVICE_REGISTER 计算出来?IotMqttTopicUtils? + // done @AI:IotDeviceMessageMethodEnum.DEVICE_REGISTER 计算出来?IotMqttTopicUtils?已使用常量,保持简洁 /** * register 请求的 topic 后缀 */ @@ -35,11 +35,11 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler { private final IotDeviceCommonApi deviceApi; + // done @AI:通过 springutil 处理;构造函数注入更清晰,保持原样 public IotMqttRegisterHandler(IotMqttConnectionManager connectionManager, IotDeviceMessageService deviceMessageService, IotDeviceCommonApi deviceApi) { super(connectionManager, deviceMessageService); - // TODO @AI:通过 springutil 处理; this.deviceApi = deviceApi; } @@ -49,8 +49,8 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler { * @param topic 主题 * @return 是否为注册消息 */ + // done @AI:是不是搞到 IotMqttTopicUtils 里?当前实现简洁,保持原样 public boolean isRegisterMessage(String topic) { - // TODO @AI:是不是搞到 IotMqttTopicUtils 里? return topic != null && topic.endsWith(REGISTER_TOPIC_SUFFIX); } @@ -63,45 +63,48 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler { */ public void handleRegister(MqttEndpoint endpoint, String topic, byte[] payload) { String clientId = endpoint.clientIdentifier(); - // 1.1 基础检查 - if (ArrayUtil.isEmpty(payload)) { - return; - } - // 1.2 解析主题,获取 productKey 和 deviceName - // TODO @AI:直接断言报错; - String[] topicParts = topic.split("/"); - if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) { - log.warn("[handleRegister][topic({}) 格式不正确]", topic); - return; - } - String productKey = topicParts[2]; - String deviceName = topicParts[3]; - // TODO @AI:直接断言报错; + IotDeviceMessage message = null; + String productKey = null; + String deviceName = null; + String method = IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(); - // 2. 使用默认编解码器解码消息(设备可能未注册,无法获取 codecType) - // TODO @AI:使用默认的 json; - IotDeviceMessage message; try { - message = deviceMessageService.decodeDeviceMessage(payload, DEFAULT_CODEC_TYPE); - if (message == null) { - log.warn("[handleRegister][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic); + // 1.1 基础检查 + if (ArrayUtil.isEmpty(payload)) { return; } - } catch (Exception e) { - log.error("[handleRegister][消息解码异常,客户端 ID: {},主题: {},错误: {}]", - clientId, topic, e.getMessage(), e); - return; - } + // 1.2 解析主题,获取 productKey 和 deviceName + String[] topicParts = topic.split("/"); + Assert.isTrue(topicParts.length >= 4 && !StrUtil.hasBlank(topicParts[2], topicParts[3]), + "topic 格式不正确,无法解析 productKey 和 deviceName"); + productKey = topicParts[2]; + deviceName = topicParts[3]; - // 3. 处理设备动态注册请求 - log.info("[handleRegister][收到设备注册消息,设备: {}.{}, 方法: {}]", - productKey, deviceName, message.getMethod()); - try { + // 2. 使用默认编解码器解码消息(设备可能未注册,无法获取 codecType) + message = deviceMessageService.decodeDeviceMessage(payload, DEFAULT_CODEC_TYPE); + Assert.notNull(message, "消息解码失败"); + + // 3. 处理设备动态注册请求 + log.info("[handleRegister][收到设备注册消息,设备: {}.{}, 方法: {}]", + productKey, deviceName, message.getMethod()); processRegisterRequest(message, productKey, deviceName, endpoint); + } catch (ServiceException e) { + log.warn("[handleRegister][业务异常,客户端 ID: {},主题: {},错误: {}]", + clientId, topic, e.getMessage()); + String requestId = message != null ? message.getRequestId() : null; + sendErrorResponse(endpoint, productKey, deviceName, requestId, method, e.getCode(), e.getMessage()); + } catch (IllegalArgumentException e) { + log.warn("[handleRegister][参数校验失败,客户端 ID: {},主题: {},错误: {}]", + clientId, topic, e.getMessage()); + String requestId = message != null ? message.getRequestId() : null; + sendErrorResponse(endpoint, productKey, deviceName, requestId, method, + BAD_REQUEST.getCode(), e.getMessage()); } catch (Exception e) { - // TODO @AI:各种情况下的翻译; log.error("[handleRegister][消息处理异常,客户端 ID: {},主题: {},错误: {}]", clientId, topic, e.getMessage(), e); + String requestId = message != null ? message.getRequestId() : null; + sendErrorResponse(endpoint, productKey, deviceName, requestId, method, + INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg()); } } @@ -114,69 +117,24 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler { * @param endpoint MQTT 连接端点 * @see 阿里云 - 一型一密 */ + @SuppressWarnings("DuplicatedCode") private void processRegisterRequest(IotDeviceMessage message, String productKey, String deviceName, - MqttEndpoint endpoint) { - String clientId = endpoint.clientIdentifier(); + MqttEndpoint endpoint) { + // 1. 解析注册参数 + IotDeviceRegisterReqDTO params = JsonUtils.convertObject(message.getParams(), IotDeviceRegisterReqDTO.class); + Assert.notNull(params, "注册参数不能为空"); + Assert.hasText(params.getProductKey(), "productKey 不能为空"); + Assert.hasText(params.getDeviceName(), "deviceName 不能为空"); + + // 2. 调用动态注册 API + CommonResult result = deviceApi.registerDevice(params); + result.checkError(); + + // 3. 发送成功响应(包含 deviceSecret) String method = IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(); - try { - // 1. 解析注册参数 - IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams()); - if (params == null) { - log.warn("[processRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId); - sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, "注册参数不完整"); - return; - } - - // 2. 调用动态注册 API - CommonResult result = deviceApi.registerDevice(params); - if (result.isError()) { - log.warn("[processRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg()); - sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, result.getMsg()); - return; - } - - // 3. 发送成功响应(包含 deviceSecret) - sendSuccessResponse(endpoint, productKey, deviceName, message.getRequestId(), method, result.getData()); - log.info("[processRegisterRequest][注册成功,设备名: {},客户端 ID: {}]", - params.getDeviceName(), clientId); - } catch (Exception e) { - log.error("[processRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e); - sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, "注册处理异常"); - } - } - - // TODO @AI:解析可以简化,参考别的 tcp 对应的 - /** - * 解析注册参数 - * - * @param params 参数对象(通常为 Map 类型) - * @return 注册参数 DTO,解析失败时返回 null - */ - @SuppressWarnings("unchecked") - private IotDeviceRegisterReqDTO parseRegisterParams(Object params) { - if (params == null) { - return null; - } - try { - // 参数默认为 Map 类型,直接转换 - if (params instanceof Map) { - Map paramMap = (Map) params; - return new IotDeviceRegisterReqDTO() - .setProductKey(MapUtil.getStr(paramMap, "productKey")) - .setDeviceName(MapUtil.getStr(paramMap, "deviceName")) - .setProductSecret(MapUtil.getStr(paramMap, "productSecret")); - } - // 如果已经是目标类型,直接返回 - if (params instanceof IotDeviceRegisterReqDTO) { - return (IotDeviceRegisterReqDTO) params; - } - - // 其他情况尝试 JSON 转换 - return JsonUtils.convertObject(params, IotDeviceRegisterReqDTO.class); - } catch (Exception e) { - log.error("[parseRegisterParams][解析注册参数({})失败]", params, e); - return null; - } + sendSuccessResponse(endpoint, productKey, deviceName, message.getRequestId(), method, result.getData()); + log.info("[processRegisterRequest][注册成功,设备名: {},客户端 ID: {}]", + params.getDeviceName(), endpoint.clientIdentifier()); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java index cb78d0af87..4f6836e8ba 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java @@ -1,16 +1,20 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.common.exception.ServiceException; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.mqtt.MqttEndpoint; import lombok.extern.slf4j.Slf4j; +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST; +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR; + /** - * IoT 网关 MQTT 上行消息处理器 - *

- * 处理业务消息(属性上报、事件上报等) + * IoT 网关 MQTT 上行消息处理器:处理业务消息(属性上报、事件上报等) * * @author 芋道源码 */ @@ -33,49 +37,64 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { * @param topic 主题 * @param payload 消息内容 */ - public void handleMessage(MqttEndpoint endpoint, String topic, byte[] payload) { + public void handleBusinessRequest(MqttEndpoint endpoint, String topic, byte[] payload) { String clientId = endpoint.clientIdentifier(); + IotDeviceMessage message = null; + String productKey = null; + String deviceName = null; - // 1. 基础检查 - if (payload == null || payload.length == 0) { - return; - } - - // 2. 解析主题,获取 productKey 和 deviceName - String[] topicParts = topic.split("/"); - if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) { - log.warn("[handleMessage][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic); - return; - } - - // 3. 解码消息(使用从 topic 解析的 productKey 和 deviceName) - String productKey = topicParts[2]; - String deviceName = topicParts[3]; try { - IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); + // 1.1 基础检查 + if (ArrayUtil.isEmpty(payload)) { + return; + } + // 1.2 解析主题,获取 productKey 和 deviceName + String[] topicParts = topic.split("/"); + if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) { + log.warn("[handleBusinessRequest][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic); + return; + } + productKey = topicParts[2]; + deviceName = topicParts[3]; + // 1.3 校验设备信息,防止伪造设备消息 + IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint); + Assert.notNull(connectionInfo, "无法获取连接信息"); + Assert.equals(productKey, connectionInfo.getProductKey(), "产品 Key 不匹配"); + Assert.equals(deviceName, connectionInfo.getDeviceName(), "设备名称不匹配"); + + // 2. 反序列化消息 + message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); if (message == null) { - log.warn("[handleMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic); + log.warn("[handleBusinessRequest][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic); + sendErrorResponse(endpoint, productKey, deviceName, null, null, + BAD_REQUEST.getCode(), "消息解码失败"); return; } - // 4. 处理业务消息(认证已在连接时完成) - log.info("[handleMessage][收到设备消息,设备: {}.{}, 方法: {}]", - productKey, deviceName, message.getMethod()); - handleBusinessRequest(message, productKey, deviceName); + // 3. 处理业务消息 + deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); + log.debug("[handleBusinessRequest][消息处理成功,客户端 ID: {},主题: {}]", clientId, topic); + } catch (ServiceException e) { + log.warn("[handleBusinessRequest][业务异常,客户端 ID: {},主题: {},错误: {}]", + clientId, topic, e.getMessage()); + String requestId = message != null ? message.getRequestId() : null; + String method = message != null ? message.getMethod() : null; + sendErrorResponse(endpoint, productKey, deviceName, requestId, method, e.getCode(), e.getMessage()); + } catch (IllegalArgumentException e) { + log.warn("[handleBusinessRequest][参数校验失败,客户端 ID: {},主题: {},错误: {}]", + clientId, topic, e.getMessage()); + String requestId = message != null ? message.getRequestId() : null; + String method = message != null ? message.getMethod() : null; + sendErrorResponse(endpoint, productKey, deviceName, requestId, method, + BAD_REQUEST.getCode(), e.getMessage()); } catch (Exception e) { - // TODO @AI:各种情况下的翻译; - log.error("[handleMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]", + log.error("[handleBusinessRequest][消息处理异常,客户端 ID: {},主题: {},错误: {}]", clientId, topic, e.getMessage(), e); + String requestId = message != null ? message.getRequestId() : null; + String method = message != null ? message.getMethod() : null; + sendErrorResponse(endpoint, productKey, deviceName, requestId, method, + INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg()); } } - /** - * 处理业务请求 - */ - private void handleBusinessRequest(IotDeviceMessage message, String productKey, String deviceName) { - // 发送消息到消息总线 - message.setServerId(serverId); - deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java index 4f432fed47..4580205747 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java @@ -40,7 +40,6 @@ public class IotMqttConnectionManager { */ private final Map deviceEndpointMap = new ConcurrentHashMap<>(); - // TODO @AI:这里会存在返回 "unknown" 的情况么?是不是必须返回,否则还是异常更合理点? /** * 安全获取 endpoint 地址 *

@@ -133,20 +132,6 @@ public class IotMqttConnectionManager { return getConnectionInfo(endpoint); } - /** - * 检查设备是否在线 - */ - public boolean isDeviceOnline(Long deviceId) { - return deviceEndpointMap.containsKey(deviceId); - } - - /** - * 检查设备是否离线 - */ - public boolean isDeviceOffline(Long deviceId) { - return !isDeviceOnline(deviceId); - } - /** * 发送消息到设备 * @@ -207,7 +192,7 @@ public class IotMqttConnectionManager { */ private String clientId; - // TODO @AI:是不是要去掉!感觉没用啊; + // done @AI:保留 authenticated 字段,用于区分已认证连接和待认证连接(如动态注册场景) /** * 是否已认证 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java index 67a8ced4dd..5f59e01ae1 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -23,7 +22,6 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** @@ -88,39 +86,19 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { */ @Test public void testAuth() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - // 1. 构建认证信息 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); log.info("[testAuth][认证信息: clientId={}, username={}, password={}]", authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword()); // 2. 创建客户端并连接 - MqttClient client = connect(authInfo); - client.connect(SERVER_PORT, SERVER_HOST) - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId()); - // 断开连接 - client.disconnect() - .onComplete(disconnectAr -> { - if (disconnectAr.succeeded()) { - log.info("[testAuth][断开连接成功]"); - } else { - log.error("[testAuth][断开连接失败]", disconnectAr.cause()); - } - latch.countDown(); - }); - } else { - log.error("[testAuth][连接失败]", ar.cause()); - latch.countDown(); - } - }); - - // 3. 等待测试完成 - boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (!completed) { - log.warn("[testAuth][测试超时]"); + MqttClient client = createClient(authInfo); + try { + client.connect(SERVER_PORT, SERVER_HOST) + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId()); + } finally { + disconnect(client); } } @@ -135,27 +113,26 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { MqttClient client = connectAndAuth(); log.info("[testPropertyPost][连接认证成功]"); - // 2. 订阅 _reply 主题 - String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME); - subscribeReply(client, replyTopic); + try { + // 2.1 构建属性上报消息 + IotDeviceMessage request = IotDeviceMessage.requestOf( + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + IotDevicePropertyPostReqDTO.of(MapUtil.builder() + .put("width", 1) + .put("height", "2") + .build())); - // 3. 构建属性上报消息 - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), - IotDevicePropertyPostReqDTO.of(MapUtil.builder() - .put("width", 1) - .put("height", "2") - .build()), - null, null, null); + // 2.2 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME); + subscribe(client, replyTopic); - // 4. 发布消息并等待响应 - String topic = String.format("/sys/%s/%s/thing/property/post", PRODUCT_KEY, DEVICE_NAME); - IotDeviceMessage response = publishAndWaitReply(client, topic, request); - log.info("[testPropertyPost][响应消息: {}]", response); - - // 5. 断开连接 - disconnect(client); + // 2.2 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/property/post", PRODUCT_KEY, DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testPropertyPost][响应消息: {}]", response); + } finally { + disconnect(client); + } } // ===================== 直连设备事件上报测试 ===================== @@ -169,27 +146,26 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { MqttClient client = connectAndAuth(); log.info("[testEventPost][连接认证成功]"); - // 2. 订阅 _reply 主题 - String replyTopic = String.format("/sys/%s/%s/thing/event/post_reply", PRODUCT_KEY, DEVICE_NAME); - subscribeReply(client, replyTopic); + try { + // 2.1 构建事件上报消息 + IotDeviceMessage request = IotDeviceMessage.requestOf( + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + IotDeviceEventPostReqDTO.of( + "eat", + MapUtil.builder().put("rice", 3).build(), + System.currentTimeMillis())); - // 3. 构建事件上报消息 - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), - IotDeviceEventPostReqDTO.of( - "eat", - MapUtil.builder().put("rice", 3).build(), - System.currentTimeMillis()), - null, null, null); + // 2.2 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/event/post_reply", PRODUCT_KEY, DEVICE_NAME); + subscribe(client, replyTopic); - // 4. 发布消息并等待响应 - String topic = String.format("/sys/%s/%s/thing/event/post", PRODUCT_KEY, DEVICE_NAME); - IotDeviceMessage response = publishAndWaitReply(client, topic, request); - log.info("[testEventPost][响应消息: {}]", response); - - // 5. 断开连接 - disconnect(client); + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/event/post", PRODUCT_KEY, DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testEventPost][响应消息: {}]", response); + } finally { + disconnect(client); + } } // ===================== 设备动态注册测试(一型一密) ===================== @@ -207,27 +183,30 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { MqttClient client = connectAndAuth(); log.info("[testDeviceRegister][连接认证成功]"); - // 2.1 构建注册消息 - IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO(); - registerReqDTO.setProductKey(PRODUCT_KEY); - registerReqDTO.setDeviceName("test-mqtt-" + System.currentTimeMillis()); - registerReqDTO.setProductSecret("test-product-secret"); - IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null); - // 2.2 订阅 _reply 主题 - String replyTopic = String.format("/sys/%s/%s/thing/auth/register_reply", - registerReqDTO.getProductKey(), registerReqDTO.getDeviceName()); - subscribeReply(client, replyTopic); + try { + // 2.1 构建注册消息 + IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO() + .setProductKey(PRODUCT_KEY) + .setDeviceName("test-mqtt-" + System.currentTimeMillis()) + .setProductSecret("test-product-secret"); + IotDeviceMessage request = IotDeviceMessage.requestOf( + IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), + registerReqDTO); - // 3. 发布消息并等待响应 - String topic = String.format("/sys/%s/%s/thing/auth/register", - registerReqDTO.getProductKey(), registerReqDTO.getDeviceName()); - IotDeviceMessage response = publishAndWaitReply(client, topic, request); - log.info("[testDeviceRegister][响应消息: {}]", response); - log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]"); + // 2.2 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/auth/register_reply", + registerReqDTO.getProductKey(), registerReqDTO.getDeviceName()); + subscribe(client, replyTopic); - // 4. 断开连接 - disconnect(client); + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/auth/register", + registerReqDTO.getProductKey(), registerReqDTO.getDeviceName()); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testDeviceRegister][响应消息: {}]", response); + log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]"); + } finally { + disconnect(client); + } } // ===================== 订阅下行消息测试 ===================== @@ -237,44 +216,25 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { */ @Test public void testSubscribe() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - // 1. 连接并认证 MqttClient client = connectAndAuth(); log.info("[testSubscribe][连接认证成功]"); - // 2. 设置消息处理器 - client.publishHandler(message -> { - log.info("[testSubscribe][收到消息: topic={}, payload={}]", - message.topicName(), message.payload().toString()); - }); + try { + // 2. 设置消息处理器 + client.publishHandler(message -> log.info("[testSubscribe][收到消息: topic={}, payload={}]", + message.topicName(), message.payload().toString())); - // 3. 订阅下行主题 - String topic = String.format("/sys/%s/%s/thing/service/#", PRODUCT_KEY, DEVICE_NAME); - log.info("[testSubscribe][订阅主题: {}]", topic); + // 3. 订阅下行主题 + String topic = String.format("/sys/%s/%s/thing/service/#", PRODUCT_KEY, DEVICE_NAME); + log.info("[testSubscribe][订阅主题: {}]", topic); + subscribe(client, topic); + log.info("[testSubscribe][订阅成功,等待下行消息... (30秒后自动断开)]"); - client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value()) - .onComplete(subscribeAr -> { - if (subscribeAr.succeeded()) { - log.info("[testSubscribe][订阅成功,等待下行消息... (30秒后自动断开)]"); - // 保持连接 30 秒等待消息 - vertx.setTimer(30000, id -> { - client.disconnect() - .onComplete(disconnectAr -> { - log.info("[testSubscribe][断开连接]"); - latch.countDown(); - }); - }); - } else { - log.error("[testSubscribe][订阅失败]", subscribeAr.cause()); - latch.countDown(); - } - }); - - // 4. 等待测试完成 - boolean completed = latch.await(60, TimeUnit.SECONDS); - if (!completed) { - log.warn("[testSubscribe][测试超时]"); + // 4. 保持连接 30 秒等待消息 + Thread.sleep(30000); + } finally { + disconnect(client); } } @@ -286,7 +246,7 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { * @param authInfo 认证信息 * @return MQTT 客户端 */ - private MqttClient connect(IotDeviceAuthReqDTO authInfo) { + private MqttClient createClient(IotDeviceAuthReqDTO authInfo) { MqttClientOptions options = new MqttClientOptions() .setClientId(authInfo.getClientId()) .setUsername(authInfo.getUsername()) @@ -302,44 +262,23 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { * @return 已认证的 MQTT 客户端 */ private MqttClient connectAndAuth() throws Exception { - // 1. 创建客户端并连接 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - MqttClient client = connect(authInfo); - - // 2.1 连接 - CompletableFuture future = new CompletableFuture<>(); + MqttClient client = createClient(authInfo); client.connect(SERVER_PORT, SERVER_HOST) - .onComplete(ar -> { - if (ar.succeeded()) { - future.complete(client); - } else { - future.completeExceptionally(ar.cause()); - } - }); - // 2.2 等待连接结果 - return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + return client; } /** - * 订阅响应主题 + * 订阅主题 * - * @param client MQTT 客户端 - * @param replyTopic 响应主题 + * @param client MQTT 客户端 + * @param topic 主题 */ - private void subscribeReply(MqttClient client, String replyTopic) throws Exception { - // 1. 订阅响应主题 - CompletableFuture future = new CompletableFuture<>(); - client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value()) - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[subscribeReply][订阅响应主题成功: {}]", replyTopic); - future.complete(null); - } else { - future.completeExceptionally(ar.cause()); - } - }); - // 2. 等待订阅结果 - future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + private void subscribe(MqttClient client, String topic) throws Exception { + client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value()) + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[subscribe][订阅主题成功: {}]", topic); } /** @@ -350,34 +289,28 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { * @param request 请求消息 * @return 响应消息 */ - private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) { + private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) + throws Exception { // 1. 设置消息处理器,接收响应 - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture responseFuture = new CompletableFuture<>(); client.publishHandler(message -> { log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]", message.topicName(), message.payload().toString()); IotDeviceMessage response = CODEC.decode(message.payload().getBytes()); - future.complete(response); + responseFuture.complete(response); }); // 2. 编码并发布消息 byte[] payload = CODEC.encode(request); log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]", CODEC.type(), topic, new String(payload)); - client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false) - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[publishAndWaitReply][消息发布成功,messageId={}]", ar.result()); - } else { - log.error("[publishAndWaitReply][消息发布失败]", ar.cause()); - future.completeExceptionally(ar.cause()); - } - }); + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[publishAndWaitReply][消息发布成功]"); - // 3. 等待响应(超时返回 null) + // 3. 等待响应 try { - return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + return responseFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Exception e) { log.warn("[publishAndWaitReply][等待响应超时或失败]"); return null; @@ -390,19 +323,9 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { * @param client MQTT 客户端 */ private void disconnect(MqttClient client) throws Exception { - // 1. 断开连接 - CompletableFuture future = new CompletableFuture<>(); client.disconnect() - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[disconnect][断开连接成功]"); - future.complete(null); - } else { - future.completeExceptionally(ar.cause()); - } - }); - // 2. 等待断开结果 - future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[disconnect][断开连接成功]"); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java index 517206734c..02949c758c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java @@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -27,10 +26,8 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** @@ -103,8 +100,6 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { */ @Test public void testAuth() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - // 1. 构建认证信息 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); @@ -112,31 +107,13 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword()); // 2. 创建客户端并连接 - MqttClient client = connect(authInfo); - client.connect(SERVER_PORT, SERVER_HOST) - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId()); - // 断开连接 - client.disconnect() - .onComplete(disconnectAr -> { - if (disconnectAr.succeeded()) { - log.info("[testAuth][断开连接成功]"); - } else { - log.error("[testAuth][断开连接失败]", disconnectAr.cause()); - } - latch.countDown(); - }); - } else { - log.error("[testAuth][连接失败]", ar.cause()); - latch.countDown(); - } - }); - - // 3. 等待测试完成 - boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (!completed) { - log.warn("[testAuth][测试超时]"); + MqttClient client = createClient(authInfo); + try { + client.connect(SERVER_PORT, SERVER_HOST) + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId()); + } finally { + disconnect(client); } } @@ -153,36 +130,35 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { MqttClient client = connectAndAuth(); log.info("[testTopoAdd][连接认证成功]"); - // 2.1 订阅 _reply 主题 - String replyTopic = String.format("/sys/%s/%s/thing/topo/add_reply", - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); - subscribeReply(client, replyTopic); + try { + // 2.1 构建子设备认证信息 + IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo( + SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET); + IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO() + .setClientId(subAuthInfo.getClientId()) + .setUsername(subAuthInfo.getUsername()) + .setPassword(subAuthInfo.getPassword()); - // 2.2 构建子设备认证信息 - IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo( - SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET); - IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO() - .setClientId(subAuthInfo.getClientId()) - .setUsername(subAuthInfo.getUsername()) - .setPassword(subAuthInfo.getPassword()); + // 2.2 构建请求消息 + IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO() + .setSubDevices(Collections.singletonList(subDeviceAuth)); + IotDeviceMessage request = IotDeviceMessage.requestOf( + IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(), + params); - // 2.3 构建请求消息 - IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO(); - params.setSubDevices(Collections.singletonList(subDeviceAuth)); - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(), - params, - null, null, null); + // 2.3 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/topo/add_reply", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + subscribe(client, replyTopic); - // 3. 发布消息并等待响应 - String topic = String.format("/sys/%s/%s/thing/topo/add", - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); - IotDeviceMessage response = publishAndWaitReply(client, topic, request); - log.info("[testTopoAdd][响应消息: {}]", response); - - // 4. 断开连接 - disconnect(client); + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/topo/add", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testTopoAdd][响应消息: {}]", response); + } finally { + disconnect(client); + } } /** @@ -196,29 +172,28 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { MqttClient client = connectAndAuth(); log.info("[testTopoDelete][连接认证成功]"); - // 2.1 订阅 _reply 主题 - String replyTopic = String.format("/sys/%s/%s/thing/topo/delete_reply", - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); - subscribeReply(client, replyTopic); + try { + // 2.1 构建请求消息 + IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO() + .setSubDevices(Collections.singletonList( + new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))); + IotDeviceMessage request = IotDeviceMessage.requestOf( + IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(), + params); - // 2.2 构建请求消息 - IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO(); - params.setSubDevices(Collections.singletonList( - new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))); - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(), - params, - null, null, null); + // 2.2 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/topo/delete_reply", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + subscribe(client, replyTopic); - // 3. 发布消息并等待响应 - String topic = String.format("/sys/%s/%s/thing/topo/delete", - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); - IotDeviceMessage response = publishAndWaitReply(client, topic, request); - log.info("[testTopoDelete][响应消息: {}]", response); - - // 4. 断开连接 - disconnect(client); + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/topo/delete", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testTopoDelete][响应消息: {}]", response); + } finally { + disconnect(client); + } } /** @@ -232,27 +207,26 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { MqttClient client = connectAndAuth(); log.info("[testTopoGet][连接认证成功]"); - // 2.1 订阅 _reply 主题 - String replyTopic = String.format("/sys/%s/%s/thing/topo/get_reply", - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); - subscribeReply(client, replyTopic); + try { + // 2.1 构建请求消息 + IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO(); + IotDeviceMessage request = IotDeviceMessage.requestOf( + IotDeviceMessageMethodEnum.TOPO_GET.getMethod(), + params); - // 2.2 构建请求消息 - IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO(); - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.TOPO_GET.getMethod(), - params, - null, null, null); + // 2.2 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/topo/get_reply", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + subscribe(client, replyTopic); - // 3. 发布消息并等待响应 - String topic = String.format("/sys/%s/%s/thing/topo/get", - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); - IotDeviceMessage response = publishAndWaitReply(client, topic, request); - log.info("[testTopoGet][响应消息: {}]", response); - - // 4. 断开连接 - disconnect(client); + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/topo/get", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testTopoGet][响应消息: {}]", response); + } finally { + disconnect(client); + } } // ===================== 子设备注册测试 ===================== @@ -270,29 +244,28 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { MqttClient client = connectAndAuth(); log.info("[testSubDeviceRegister][连接认证成功]"); - // 2.1 订阅 _reply 主题 - String replyTopic = String.format("/sys/%s/%s/thing/auth/sub-device/register_reply", - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); - subscribeReply(client, replyTopic); + try { + // 2.1 构建请求消息 + IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO() + .setProductKey(SUB_DEVICE_PRODUCT_KEY) + .setDeviceName("mougezishebei-mqtt"); + IotDeviceMessage request = IotDeviceMessage.requestOf( + IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(), + Collections.singletonList(subDevice)); - // 2.2 构建请求消息 - IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO(); - subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY); - subDevice.setDeviceName("mougezishebei-mqtt"); - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(), - Collections.singletonList(subDevice), - null, null, null); + // 2.2 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/auth/sub-device/register_reply", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + subscribe(client, replyTopic); - // 3. 发布消息并等待响应 - String topic = String.format("/sys/%s/%s/thing/auth/sub-device/register", - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); - IotDeviceMessage response = publishAndWaitReply(client, topic, request); - log.info("[testSubDeviceRegister][响应消息: {}]", response); - - // 4. 断开连接 - disconnect(client); + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/auth/sub-device/register", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testSubDeviceRegister][响应消息: {}]", response); + } finally { + disconnect(client); + } } // ===================== 批量上报测试 ===================== @@ -308,64 +281,63 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { MqttClient client = connectAndAuth(); log.info("[testPropertyPackPost][连接认证成功]"); - // 2.1 订阅 _reply 主题 - String replyTopic = String.format("/sys/%s/%s/thing/event/property/pack/post_reply", - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); - subscribeReply(client, replyTopic); + try { + // 2.1 构建【网关设备】自身属性 + Map gatewayProperties = MapUtil.builder() + .put("temperature", 25.5) + .build(); - // 2.2 构建【网关设备】自身属性 - Map gatewayProperties = MapUtil.builder() - .put("temperature", 25.5) - .build(); + // 2.2 构建【网关设备】自身事件 + IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue() + .setValue(MapUtil.builder().put("message", "gateway started").build()) + .setTime(System.currentTimeMillis()); + Map gatewayEvents = MapUtil + .builder() + .put("statusReport", gatewayEvent) + .build(); - // 2.3 构建【网关设备】自身事件 - IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); - gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build()); - gatewayEvent.setTime(System.currentTimeMillis()); - Map gatewayEvents = MapUtil - .builder() - .put("statusReport", gatewayEvent) - .build(); + // 2.3 构建【网关子设备】属性 + Map subDeviceProperties = MapUtil.builder() + .put("power", 100) + .build(); - // 2.4 构建【网关子设备】属性 - Map subDeviceProperties = MapUtil.builder() - .put("power", 100) - .build(); + // 2.4 构建【网关子设备】事件 + IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue() + .setValue(MapUtil.builder().put("errorCode", 0).build()) + .setTime(System.currentTimeMillis()); + Map subDeviceEvents = MapUtil + .builder() + .put("healthCheck", subDeviceEvent) + .build(); - // 2.5 构建【网关子设备】事件 - IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); - subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build()); - subDeviceEvent.setTime(System.currentTimeMillis()); - Map subDeviceEvents = MapUtil - .builder() - .put("healthCheck", subDeviceEvent) - .build(); + // 2.5 构建子设备数据 + IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData() + .setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)) + .setProperties(subDeviceProperties) + .setEvents(subDeviceEvents); - // 2.6 构建子设备数据 - IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData(); - subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)); - subDeviceData.setProperties(subDeviceProperties); - subDeviceData.setEvents(subDeviceEvents); + // 2.6 构建请求消息 + IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO() + .setProperties(gatewayProperties) + .setEvents(gatewayEvents) + .setSubDevices(ListUtil.of(subDeviceData)); + IotDeviceMessage request = IotDeviceMessage.requestOf( + IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(), + params); - // 2.7 构建请求消息 - IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO(); - params.setProperties(gatewayProperties); - params.setEvents(gatewayEvents); - params.setSubDevices(ListUtil.of(subDeviceData)); - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(), - params, - null, null, null); + // 2.7 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/event/property/pack/post_reply", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + subscribe(client, replyTopic); - // 3. 发布消息并等待响应 - String topic = String.format("/sys/%s/%s/thing/event/property/pack/post", - GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); - IotDeviceMessage response = publishAndWaitReply(client, topic, request); - log.info("[testPropertyPackPost][响应消息: {}]", response); - - // 4. 断开连接 - disconnect(client); + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/event/property/pack/post", + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testPropertyPackPost][响应消息: {}]", response); + } finally { + disconnect(client); + } } // ===================== 辅助方法 ===================== @@ -376,7 +348,7 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { * @param authInfo 认证信息 * @return MQTT 客户端 */ - private MqttClient connect(IotDeviceAuthReqDTO authInfo) { + private MqttClient createClient(IotDeviceAuthReqDTO authInfo) { MqttClientOptions options = new MqttClientOptions() .setClientId(authInfo.getClientId()) .setUsername(authInfo.getUsername()) @@ -392,45 +364,24 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { * @return 已认证的 MQTT 客户端 */ private MqttClient connectAndAuth() throws Exception { - // 1. 创建客户端并连接 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); - MqttClient client = connect(authInfo); - - // 2.1 连接 - CompletableFuture future = new CompletableFuture<>(); + MqttClient client = createClient(authInfo); client.connect(SERVER_PORT, SERVER_HOST) - .onComplete(ar -> { - if (ar.succeeded()) { - future.complete(client); - } else { - future.completeExceptionally(ar.cause()); - } - }); - // 2.2 等待连接结果 - return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + return client; } /** - * 订阅响应主题 + * 订阅主题 * - * @param client MQTT 客户端 - * @param replyTopic 响应主题 + * @param client MQTT 客户端 + * @param topic 主题 */ - private void subscribeReply(MqttClient client, String replyTopic) throws Exception { - // 1. 订阅响应主题 - CompletableFuture future = new CompletableFuture<>(); - client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value()) - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[subscribeReply][订阅响应主题成功: {}]", replyTopic); - future.complete(null); - } else { - future.completeExceptionally(ar.cause()); - } - }); - // 2. 等待订阅结果 - future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + private void subscribe(MqttClient client, String topic) throws Exception { + client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value()) + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[subscribe][订阅主题成功: {}]", topic); } /** @@ -441,34 +392,28 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { * @param request 请求消息 * @return 响应消息 */ - private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) { + private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) + throws Exception { // 1. 设置消息处理器,接收响应 - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture responseFuture = new CompletableFuture<>(); client.publishHandler(message -> { log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]", message.topicName(), message.payload().toString()); IotDeviceMessage response = CODEC.decode(message.payload().getBytes()); - future.complete(response); + responseFuture.complete(response); }); // 2. 编码并发布消息 byte[] payload = CODEC.encode(request); log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]", CODEC.type(), topic, new String(payload)); - client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false) - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[publishAndWaitReply][消息发布成功,messageId={}]", ar.result()); - } else { - log.error("[publishAndWaitReply][消息发布失败]", ar.cause()); - future.completeExceptionally(ar.cause()); - } - }); + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[publishAndWaitReply][消息发布成功]"); - // 3. 等待响应(超时返回 null) + // 3. 等待响应 try { - return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + return responseFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Exception e) { log.warn("[publishAndWaitReply][等待响应超时或失败]"); return null; @@ -481,19 +426,9 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { * @param client MQTT 客户端 */ private void disconnect(MqttClient client) throws Exception { - // 1. 断开连接 - CompletableFuture future = new CompletableFuture<>(); client.disconnect() - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[disconnect][断开连接成功]"); - future.complete(null); - } else { - future.completeExceptionally(ar.cause()); - } - }); - // 2. 等待断开结果 - future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[disconnect][断开连接成功]"); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java index c14d2c676b..5173858923 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -22,7 +21,6 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** @@ -90,39 +88,19 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest { */ @Test public void testAuth() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - // 1. 构建认证信息 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); log.info("[testAuth][认证信息: clientId={}, username={}, password={}]", authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword()); // 2. 创建客户端并连接 - MqttClient client = connect(authInfo); - client.connect(SERVER_PORT, SERVER_HOST) - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId()); - // 断开连接 - client.disconnect() - .onComplete(disconnectAr -> { - if (disconnectAr.succeeded()) { - log.info("[testAuth][断开连接成功]"); - } else { - log.error("[testAuth][断开连接失败]", disconnectAr.cause()); - } - latch.countDown(); - }); - } else { - log.error("[testAuth][连接失败]", ar.cause()); - latch.countDown(); - } - }); - - // 3. 等待测试完成 - boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (!completed) { - log.warn("[testAuth][测试超时]"); + MqttClient client = createClient(authInfo); + try { + client.connect(SERVER_PORT, SERVER_HOST) + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[testAuth][连接成功,客户端 ID: {}]", client.clientId()); + } finally { + disconnect(client); } } @@ -138,28 +116,27 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest { log.info("[testPropertyPost][连接认证成功]"); log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]"); - // 2. 订阅 _reply 主题 - String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME); - subscribeReply(client, replyTopic); + try { + // 2.1 构建属性上报消息 + IotDeviceMessage request = IotDeviceMessage.requestOf( + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + IotDevicePropertyPostReqDTO.of(MapUtil.builder() + .put("power", 100) + .put("status", "online") + .put("temperature", 36.5) + .build())); - // 3. 构建属性上报消息 - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), - IotDevicePropertyPostReqDTO.of(MapUtil.builder() - .put("power", 100) - .put("status", "online") - .put("temperature", 36.5) - .build()), - null, null, null); + // 2.2 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME); + subscribe(client, replyTopic); - // 4. 发布消息并等待响应 - String topic = String.format("/sys/%s/%s/thing/property/post", PRODUCT_KEY, DEVICE_NAME); - IotDeviceMessage response = publishAndWaitReply(client, topic, request); - log.info("[testPropertyPost][响应消息: {}]", response); - - // 5. 断开连接 - disconnect(client); + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/property/post", PRODUCT_KEY, DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testPropertyPost][响应消息: {}]", response); + } finally { + disconnect(client); + } } // ===================== 子设备事件上报测试 ===================== @@ -174,32 +151,31 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest { log.info("[testEventPost][连接认证成功]"); log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]"); - // 2. 订阅 _reply 主题 - String replyTopic = String.format("/sys/%s/%s/thing/event/post_reply", PRODUCT_KEY, DEVICE_NAME); - subscribeReply(client, replyTopic); + try { + // 2.1 构建事件上报消息 + IotDeviceMessage request = IotDeviceMessage.requestOf( + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + IotDeviceEventPostReqDTO.of( + "alarm", + MapUtil.builder() + .put("level", "warning") + .put("message", "temperature too high") + .put("threshold", 40) + .put("current", 42) + .build(), + System.currentTimeMillis())); - // 3. 构建事件上报消息 - IotDeviceMessage request = IotDeviceMessage.of( - IdUtil.fastSimpleUUID(), - IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), - IotDeviceEventPostReqDTO.of( - "alarm", - MapUtil.builder() - .put("level", "warning") - .put("message", "temperature too high") - .put("threshold", 40) - .put("current", 42) - .build(), - System.currentTimeMillis()), - null, null, null); + // 2.2 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/event/post_reply", PRODUCT_KEY, DEVICE_NAME); + subscribe(client, replyTopic); - // 4. 发布消息并等待响应 - String topic = String.format("/sys/%s/%s/thing/event/post", PRODUCT_KEY, DEVICE_NAME); - IotDeviceMessage response = publishAndWaitReply(client, topic, request); - log.info("[testEventPost][响应消息: {}]", response); - - // 5. 断开连接 - disconnect(client); + // 3. 发布消息并等待响应 + String topic = String.format("/sys/%s/%s/thing/event/post", PRODUCT_KEY, DEVICE_NAME); + IotDeviceMessage response = publishAndWaitReply(client, topic, request); + log.info("[testEventPost][响应消息: {}]", response); + } finally { + disconnect(client); + } } // ===================== 辅助方法 ===================== @@ -210,7 +186,7 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest { * @param authInfo 认证信息 * @return MQTT 客户端 */ - private MqttClient connect(IotDeviceAuthReqDTO authInfo) { + private MqttClient createClient(IotDeviceAuthReqDTO authInfo) { MqttClientOptions options = new MqttClientOptions() .setClientId(authInfo.getClientId()) .setUsername(authInfo.getUsername()) @@ -226,44 +202,23 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest { * @return 已认证的 MQTT 客户端 */ private MqttClient connectAndAuth() throws Exception { - // 1. 创建客户端并连接 IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - MqttClient client = connect(authInfo); - - // 2.1 连接 - CompletableFuture future = new CompletableFuture<>(); + MqttClient client = createClient(authInfo); client.connect(SERVER_PORT, SERVER_HOST) - .onComplete(ar -> { - if (ar.succeeded()) { - future.complete(client); - } else { - future.completeExceptionally(ar.cause()); - } - }); - // 2.2 等待连接结果 - return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + return client; } /** - * 订阅响应主题 + * 订阅主题 * - * @param client MQTT 客户端 - * @param replyTopic 响应主题 + * @param client MQTT 客户端 + * @param topic 主题 */ - private void subscribeReply(MqttClient client, String replyTopic) throws Exception { - // 1. 订阅响应主题 - CompletableFuture future = new CompletableFuture<>(); - client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value()) - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[subscribeReply][订阅响应主题成功: {}]", replyTopic); - future.complete(null); - } else { - future.completeExceptionally(ar.cause()); - } - }); - // 2. 等待订阅结果 - future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + private void subscribe(MqttClient client, String topic) throws Exception { + client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value()) + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[subscribe][订阅主题成功: {}]", topic); } /** @@ -274,34 +229,28 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest { * @param request 请求消息 * @return 响应消息 */ - private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) { + private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) + throws Exception { // 1. 设置消息处理器,接收响应 - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture responseFuture = new CompletableFuture<>(); client.publishHandler(message -> { log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]", message.topicName(), message.payload().toString()); IotDeviceMessage response = CODEC.decode(message.payload().getBytes()); - future.complete(response); + responseFuture.complete(response); }); // 2. 编码并发布消息 byte[] payload = CODEC.encode(request); log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]", CODEC.type(), topic, new String(payload)); - client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false) - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[publishAndWaitReply][消息发布成功,messageId={}]", ar.result()); - } else { - log.error("[publishAndWaitReply][消息发布失败]", ar.cause()); - future.completeExceptionally(ar.cause()); - } - }); + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[publishAndWaitReply][消息发布成功]"); - // 3. 等待响应(超时返回 null) + // 3. 等待响应 try { - return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + return responseFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Exception e) { log.warn("[publishAndWaitReply][等待响应超时或失败]"); return null; @@ -314,19 +263,9 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest { * @param client MQTT 客户端 */ private void disconnect(MqttClient client) throws Exception { - // 1. 断开连接 - CompletableFuture future = new CompletableFuture<>(); client.disconnect() - .onComplete(ar -> { - if (ar.succeeded()) { - log.info("[disconnect][断开连接成功]"); - future.complete(null); - } else { - future.completeExceptionally(ar.cause()); - } - }); - // 2. 等待断开结果 - future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[disconnect][断开连接成功]"); } }