diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index aa57281e04..3c62c0d221 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -1,23 +1,13 @@ package cn.iocoder.yudao.module.iot.gateway.config; -import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.downstream.IotEmqxDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; -import io.vertx.core.Vertx; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; - @Configuration @EnableConfigurationProperties(IotGatewayProperties.class) -@Slf4j public class IotGatewayConfiguration { @Bean @@ -30,36 +20,4 @@ public class IotGatewayConfiguration { return new IotProtocolManager(gatewayProperties); } - /** - * IoT 网关 EMQX 协议配置类 - */ - @Configuration - @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.emqx", name = "enabled", havingValue = "true") - @Slf4j - public static class EmqxProtocolConfiguration { - - @Bean(name = "emqxVertx", destroyMethod = "close") - public Vertx emqxVertx() { - return Vertx.vertx(); - } - - @Bean - public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties, - @Qualifier("emqxVertx") Vertx emqxVertx) { - return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx); - } - - @Bean - public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties, - @Qualifier("emqxVertx") Vertx emqxVertx) { - return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx); - } - - @Bean - public IotEmqxDownstreamSubscriber iotEmqxDownstreamSubscriber(IotEmqxUpstreamProtocol mqttUpstreamProtocol, - IotMessageBus messageBus) { - return new IotEmqxDownstreamSubscriber(mqttUpstreamProtocol, messageBus); - } - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 8b2618aed2..d91a2f8041 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapConfig; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig; @@ -31,11 +32,6 @@ public class IotGatewayProperties { */ private TokenProperties token; - /** - * 协议配置(旧版,保持兼容) - */ - private ProtocolProperties protocol; - /** * 协议实例列表 */ @@ -78,186 +74,6 @@ public class IotGatewayProperties { } - @Data - public static class ProtocolProperties { - - /** - * EMQX 组件配置 - */ - private EmqxProperties emqx; - - } - - @Data - public static class EmqxProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - - /** - * HTTP 服务端口(默认:8090) - */ - private Integer httpPort = 8090; - - /** - * MQTT 服务器地址 - */ - @NotEmpty(message = "MQTT 服务器地址不能为空") - private String mqttHost; - - /** - * MQTT 服务器端口(默认:1883) - */ - @NotNull(message = "MQTT 服务器端口不能为空") - private Integer mqttPort = 1883; - - /** - * MQTT 用户名 - */ - @NotEmpty(message = "MQTT 用户名不能为空") - private String mqttUsername; - - /** - * MQTT 密码 - */ - @NotEmpty(message = "MQTT 密码不能为空") - private String mqttPassword; - - /** - * MQTT 客户端的 SSL 开关 - */ - @NotNull(message = "MQTT 是否开启 SSL 不能为空") - private Boolean mqttSsl = false; - - /** - * MQTT 客户端 ID(如果为空,系统将自动生成) - */ - @NotEmpty(message = "MQTT 客户端 ID 不能为空") - private String mqttClientId; - - /** - * MQTT 订阅的主题 - */ - @NotEmpty(message = "MQTT 主题不能为空") - private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics; - - /** - * 默认 QoS 级别 - *

- * 0 - 最多一次 - * 1 - 至少一次 - * 2 - 刚好一次 - */ - private Integer mqttQos = 1; - - /** - * 连接超时时间(秒) - */ - private Integer connectTimeoutSeconds = 10; - - /** - * 重连延迟时间(毫秒) - */ - private Long reconnectDelayMs = 5000L; - - /** - * 是否启用 Clean Session (清理会话) - * true: 每次连接都是新会话,Broker 不保留离线消息和订阅关系。 - * 对于网关这类“永远在线”且会主动重新订阅的应用,建议为 true。 - */ - private Boolean cleanSession = true; - - /** - * 心跳间隔(秒) - * 用于保持连接活性,及时发现网络中断。 - */ - private Integer keepAliveIntervalSeconds = 60; - - /** - * 最大未确认消息队列大小 - * 限制已发送但未收到 Broker 确认的 QoS 1/2 消息数量,用于流量控制。 - */ - private Integer maxInflightQueue = 10000; - - /** - * 是否信任所有 SSL 证书 - * 警告:此配置会绕过证书验证,仅建议在开发和测试环境中使用! - * 在生产环境中,应设置为 false,并配置正确的信任库。 - */ - private Boolean trustAll = false; - - /** - * 遗嘱消息配置 (用于网关异常下线时通知其他系统) - */ - private final Will will = new Will(); - - /** - * 高级 SSL/TLS 配置 (用于生产环境) - */ - private final Ssl sslOptions = new Ssl(); - - /** - * 遗嘱消息 (Last Will and Testament) - */ - @Data - public static class Will { - - /** - * 是否启用遗嘱消息 - */ - private boolean enabled = false; - /** - * 遗嘱消息主题 - */ - private String topic; - /** - * 遗嘱消息内容 - */ - private String payload; - /** - * 遗嘱消息 QoS 等级 - */ - private Integer qos = 1; - /** - * 遗嘱消息是否作为保留消息发布 - */ - private boolean retain = true; - - } - - /** - * 高级 SSL/TLS 配置 - */ - @Data - public static class Ssl { - - /** - * 密钥库(KeyStore)路径,例如:classpath:certs/client.jks - * 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)。 - */ - private String keyStorePath; - /** - * 密钥库密码 - */ - private String keyStorePassword; - /** - * 信任库(TrustStore)路径,例如:classpath:certs/trust.jks - * 包含服务端信任的 CA 证书,用于验证服务端的身份,防止中间人攻击。 - */ - private String trustStorePath; - /** - * 信任库密码 - */ - private String trustStorePassword; - - } - - } - - // NOTE:暂未统一为 ProtocolProperties,待协议改造完成再调整 /** * 协议实例配置 */ @@ -283,6 +99,10 @@ public class IotGatewayProperties { private String protocol; /** * 服务端口 + *

+ * 不同协议含义不同: + * 1. TCP/UDP/HTTP/WebSocket/MQTT/CoAP:对应网关自身监听的服务端口 + * 2. EMQX:对应网关提供给 EMQX 回调的 HTTP Hook 端口(/mqtt/auth、/mqtt/acl、/mqtt/event) */ @NotNull(message = "服务端口不能为空") private Integer port; @@ -292,7 +112,7 @@ public class IotGatewayProperties { * @see cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum * * 为什么是可选的呢? - * 1. {@link IotProtocolTypeEnum#HTTP}、${@link IotProtocolTypeEnum#COAP} 协议,目前强制是 JSON 格式 + * 1. {@link IotProtocolTypeEnum#HTTP}、{@link IotProtocolTypeEnum#COAP} 协议,目前强制是 JSON 格式 * 2. {@link IotProtocolTypeEnum#EMQX} 协议,目前支持根据产品(设备)配置的序列化类型来解析 */ private String serialize; @@ -304,13 +124,17 @@ public class IotGatewayProperties { */ @Valid private IotHttpConfig http; + /** + * WebSocket 协议配置 + */ + @Valid + private IotWebSocketConfig websocket; /** * TCP 协议配置 */ @Valid private IotTcpConfig tcp; - /** * UDP 协议配置 */ @@ -323,17 +147,16 @@ public class IotGatewayProperties { @Valid private IotCoapConfig coap; - /** - * WebSocket 协议配置 - */ - @Valid - private IotWebSocketConfig websocket; - /** * MQTT 协议配置 */ @Valid private IotMqttConfig mqtt; + /** + * EMQX 协议配置 + */ + @Valid + private IotEmqxConfig emqx; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java index b97da74a72..5d1b6c6d7f 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java @@ -5,6 +5,7 @@ import cn.hutool.core.util.BooleanUtil; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol; @@ -109,6 +110,8 @@ public class IotProtocolManager implements SmartLifecycle { return createWebSocketProtocol(config); case MQTT: return createMqttProtocol(config); + case EMQX: + return createEmqxProtocol(config); default: throw new IllegalArgumentException(String.format( "[createProtocol][协议实例 %s 的协议类型 %s 暂不支持]", config.getId(), protocolType)); @@ -175,4 +178,14 @@ public class IotProtocolManager implements SmartLifecycle { return new IotMqttProtocol(config); } + /** + * 创建 EMQX 协议实例 + * + * @param config 协议实例配置 + * @return EMQX 协议实例 + */ + private IotEmqxProtocol createEmqxProtocol(IotGatewayProperties.ProtocolInstanceProperties config) { + return new IotEmqxProtocol(config); + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java deleted file mode 100644 index 5d3b5e3c00..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java +++ /dev/null @@ -1,104 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; - -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream.IotEmqxAuthEventHandler; -import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; -import io.vertx.core.Vertx; -import io.vertx.core.http.HttpServer; -import io.vertx.ext.web.Router; -import io.vertx.ext.web.handler.BodyHandler; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 网关 EMQX 认证事件协议服务 - *

- * 为 EMQX 提供 HTTP 接口服务,包括: - * 1. 设备认证接口 - 对应 EMQX HTTP 认证插件 - * 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知 - * - * @author 芋道源码 - */ -@Slf4j -public class IotEmqxAuthEventProtocol { - - private final IotGatewayProperties.EmqxProperties emqxProperties; - - private final String serverId; - - private final Vertx vertx; - - private HttpServer httpServer; - - public IotEmqxAuthEventProtocol(IotGatewayProperties.EmqxProperties emqxProperties, - Vertx vertx) { - this.emqxProperties = emqxProperties; - this.vertx = vertx; - this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); - } - - @PostConstruct - public void start() { - try { - startHttpServer(); - log.info("[start][IoT 网关 EMQX 认证事件协议服务启动成功, 端口: {}]", emqxProperties.getHttpPort()); - } catch (Exception e) { - log.error("[start][IoT 网关 EMQX 认证事件协议服务启动失败]", e); - throw e; - } - } - - @PreDestroy - public void stop() { - stopHttpServer(); - log.info("[stop][IoT 网关 EMQX 认证事件协议服务已停止]"); - } - - /** - * 启动 HTTP 服务器 - */ - private void startHttpServer() { - int port = emqxProperties.getHttpPort(); - - // 1. 创建路由 - Router router = Router.router(vertx); - router.route().handler(BodyHandler.create()); - - // 2. 创建处理器,传入 serverId - IotEmqxAuthEventHandler handler = new IotEmqxAuthEventHandler(serverId); - router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(handler::handleAuth); - router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(handler::handleEvent); - // TODO @haohao:/mqtt/acl 需要处理么? - // TODO @芋艿:已在 EMQX 处理,如果是“设备直连”模式需要处理 - - // 3. 启动 HTTP 服务器 - try { - httpServer = vertx.createHttpServer() - .requestHandler(router) - .listen(port) - .result(); - } catch (Exception e) { - log.error("[startHttpServer][HTTP 服务器启动失败, 端口: {}]", port, e); - throw e; - } - } - - /** - * 停止 HTTP 服务器 - */ - private void stopHttpServer() { - if (httpServer == null) { - return; - } - - try { - httpServer.close().result(); - log.info("[stopHttpServer][HTTP 服务器已停止]"); - } catch (Exception e) { - log.error("[stopHttpServer][HTTP 服务器停止失败]", e); - } - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxConfig.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxConfig.java new file mode 100644 index 0000000000..bc039fe5c4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxConfig.java @@ -0,0 +1,225 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +import java.util.List; + +/** + * IoT EMQX 协议配置 + * + * @author 芋道源码 + */ +@Data +public class IotEmqxConfig { + + // ========== MQTT Client 配置(连接 EMQX Broker) ========== + + /** + * MQTT 服务器地址 + */ + @NotEmpty(message = "MQTT 服务器地址不能为空") + private String mqttHost; + + /** + * MQTT 服务器端口(默认:1883) + */ + @NotNull(message = "MQTT 服务器端口不能为空") + private Integer mqttPort = 1883; + + /** + * MQTT 用户名 + */ + @NotEmpty(message = "MQTT 用户名不能为空") + private String mqttUsername; + + /** + * MQTT 密码 + */ + @NotEmpty(message = "MQTT 密码不能为空") + private String mqttPassword; + + /** + * MQTT 客户端的 SSL 开关 + */ + @NotNull(message = "MQTT 是否开启 SSL 不能为空") + private Boolean mqttSsl = false; + + /** + * MQTT 客户端 ID + */ + @NotEmpty(message = "MQTT 客户端 ID 不能为空") + private String mqttClientId; + + /** + * MQTT 订阅的主题 + */ + @NotEmpty(message = "MQTT 主题不能为空") + private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics; + + /** + * 默认 QoS 级别 + *

+ * 0 - 最多一次 + * 1 - 至少一次 + * 2 - 刚好一次 + */ + @NotNull(message = "MQTT QoS 不能为空") + @Min(value = 0, message = "MQTT QoS 不能小于 0") + @Max(value = 2, message = "MQTT QoS 不能大于 2") + private Integer mqttQos = 1; + + /** + * 连接超时时间(秒) + */ + @NotNull(message = "连接超时时间不能为空") + @Min(value = 1, message = "连接超时时间不能小于 1 秒") + private Integer connectTimeoutSeconds = 10; + + /** + * 重连延迟时间(毫秒) + */ + @NotNull(message = "重连延迟时间不能为空") + @Min(value = 0, message = "重连延迟时间不能小于 0 毫秒") + private Long reconnectDelayMs = 5000L; + + /** + * 是否启用 Clean Session (清理会话) + * true: 每次连接都是新会话,Broker 不保留离线消息和订阅关系。 + * 对于网关这类“永远在线”且会主动重新订阅的应用,建议为 true。 + */ + @NotNull(message = "是否启用 Clean Session 不能为空") + private Boolean cleanSession = true; + + /** + * 心跳间隔(秒) + * 用于保持连接活性,及时发现网络中断。 + */ + @NotNull(message = "心跳间隔不能为空") + @Min(value = 1, message = "心跳间隔不能小于 1 秒") + private Integer keepAliveIntervalSeconds = 60; + + /** + * 最大未确认消息队列大小 + * 限制已发送但未收到 Broker 确认的 QoS 1/2 消息数量,用于流量控制。 + */ + @NotNull(message = "最大未确认消息队列大小不能为空") + @Min(value = 1, message = "最大未确认消息队列大小不能小于 1") + private Integer maxInflightQueue = 10000; + + /** + * 是否信任所有 SSL 证书 + * 警告:此配置会绕过证书验证,仅建议在开发和测试环境中使用! + * 在生产环境中,应设置为 false,并配置正确的信任库。 + */ + @NotNull(message = "是否信任所有 SSL 证书不能为空") + private Boolean trustAll = false; + + // ========== MQTT Will / SSL 高级配置 ========== + + /** + * 遗嘱消息配置 (用于网关异常下线时通知其他系统) + */ + @Valid + private Will will = new Will(); + + /** + * 高级 SSL/TLS 配置 (用于生产环境) + */ + @Valid + private Ssl sslOptions = new Ssl(); + + // ========== HTTP Hook 配置(网关提供给 EMQX 调用) ========== + + /** + * HTTP Hook 服务配置(用于 /mqtt/auth、/mqtt/event) + */ + @Valid + private Http http = new Http(); + + /** + * 遗嘱消息 (Last Will and Testament) + */ + @Data + public static class Will { + + /** + * 是否启用遗嘱消息 + */ + private boolean enabled = false; + /** + * 遗嘱消息主题 + */ + private String topic; + /** + * 遗嘱消息内容 + */ + private String payload; + /** + * 遗嘱消息 QoS 等级 + */ + @Min(value = 0, message = "遗嘱消息 QoS 不能小于 0") + @Max(value = 2, message = "遗嘱消息 QoS 不能大于 2") + private Integer qos = 1; + /** + * 遗嘱消息是否作为保留消息发布 + */ + private boolean retain = true; + + } + + /** + * 高级 SSL/TLS 配置 + */ + @Data + public static class Ssl { + + /** + * 密钥库(KeyStore)路径,例如:classpath:certs/client.jks + * 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)。 + */ + private String keyStorePath; + /** + * 密钥库密码 + */ + private String keyStorePassword; + /** + * 信任库(TrustStore)路径,例如:classpath:certs/trust.jks + * 包含服务端信任的 CA 证书,用于验证服务端的身份,防止中间人攻击。 + */ + private String trustStorePath; + /** + * 信任库密码 + */ + private String trustStorePassword; + + } + + /** + * HTTP Hook 服务 SSL 配置 + */ + @Data + public static class Http { + + /** + * 是否启用 SSL + */ + private Boolean sslEnabled = false; + + /** + * SSL 证书路径 + */ + private String sslCertPath; + + /** + * SSL 私钥路径 + */ + private String sslKeyPath; + + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java new file mode 100644 index 0000000000..226e421024 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java @@ -0,0 +1,503 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.downstream.IotEmqxDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream.IotEmqxAuthEventHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream.IotEmqxUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.net.JksOptions; +import io.vertx.core.net.PemKeyCertOptions; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.handler.BodyHandler; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertMap; + +/** + * IoT 网关 EMQX 协议实现: + *

+ * 1. 提供 HTTP Hook 服务(/mqtt/auth、/mqtt/acl、/mqtt/event)给 EMQX 调用 + * 2. 通过 MQTT Client 订阅设备上行消息,并发布下行消息到 Broker + * + * @author 芋道源码 + */ +@Slf4j +public class IotEmqxProtocol implements IotProtocol { + + /** + * 协议配置 + */ + private final ProtocolInstanceProperties properties; + /** + * EMQX 配置 + */ + private final IotEmqxConfig emqxConfig; + /** + * 服务器 ID + */ + @Getter + private final String serverId; + + /** + * 运行状态 + */ + @Getter + private volatile boolean running = false; + + /** + * Vert.x 实例 + */ + private Vertx vertx; + /** + * HTTP Hook 服务器 + */ + private HttpServer httpServer; + + /** + * MQTT Client + */ + private volatile MqttClient mqttClient; + /** + * MQTT 重连定时器 ID + */ + private volatile Long reconnectTimerId; + + /** + * 上行消息处理器 + */ + private final IotEmqxUpstreamHandler upstreamHandler; + + /** + * 下行消息订阅者 + */ + private final IotEmqxDownstreamSubscriber downstreamSubscriber; + + public IotEmqxProtocol(ProtocolInstanceProperties properties) { + Assert.notNull(properties, "协议实例配置不能为空"); + Assert.notNull(properties.getEmqx(), "EMQX 协议配置(emqx)不能为空"); + this.properties = properties; + this.emqxConfig = properties.getEmqx(); + Assert.notNull(emqxConfig.getConnectTimeoutSeconds(), + "MQTT 连接超时时间(emqx.connect-timeout-seconds)不能为空"); + this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); + this.upstreamHandler = new IotEmqxUpstreamHandler(serverId); + + // 初始化下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + this.downstreamSubscriber = new IotEmqxDownstreamSubscriber(this, messageBus); + } + + @Override + public String getId() { + return properties.getId(); + } + + @Override + public IotProtocolTypeEnum getType() { + return IotProtocolTypeEnum.EMQX; + } + + @Override + public void start() { + if (running) { + log.warn("[start][IoT EMQX 协议 {} 已经在运行中]", getId()); + return; + } + + // 1.1 创建 Vertx 实例 + this.vertx = Vertx.vertx(); + + try { + // 1.2 启动 HTTP Hook 服务 + startHttpServer(); + + // 1.3 启动 MQTT Client + startMqttClient(); + running = true; + log.info("[start][IoT EMQX 协议 {} 启动成功,hookPort:{},serverId:{}]", + getId(), properties.getPort(), serverId); + + // 2. 启动下行消息订阅者 + this.downstreamSubscriber.start(); + } catch (Exception e) { + log.error("[start][IoT EMQX 协议 {} 启动失败]", getId(), e); + // 启动失败时,关闭资源 + stop0(); + throw e; + } + } + + @Override + public void stop() { + if (!running) { + return; + } + stop0(); + } + + private void stop0() { + // 1. 停止下行消息订阅者 + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT EMQX 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT EMQX 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + + // 2.1 先置为 false:避免 closeHandler 触发重连 + running = false; + stopMqttClientReconnectChecker(); + // 2.2 停止 MQTT Client + stopMqttClient(); + + // 2.3 停止 HTTP Hook 服务 + stopHttpServer(); + + // 2.4 关闭 Vertx + if (vertx != null) { + try { + vertx.close().result(); + log.info("[stop][IoT EMQX 协议 {} Vertx 已关闭]", getId()); + } catch (Exception e) { + log.error("[stop][IoT EMQX 协议 {} Vertx 关闭失败]", getId(), e); + } + vertx = null; + } + + log.info("[stop][IoT EMQX 协议 {} 已停止]", getId()); + } + + // ======================================= HTTP Hook Server ======================================= + + /** + * 启动 HTTP Hook 服务(/mqtt/auth、/mqtt/acl、/mqtt/event) + */ + private void startHttpServer() { + // 1. 创建路由 + Router router = Router.router(vertx); + router.route().handler(BodyHandler.create()); + + // 2. 创建处理器 + IotEmqxAuthEventHandler handler = new IotEmqxAuthEventHandler(serverId); + router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(handler::handleAuth); + router.post(IotMqttTopicUtils.MQTT_ACL_PATH).handler(handler::handleAcl); + router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(handler::handleEvent); + + // 3. 启动 HTTP Server(支持 HTTPS) + IotEmqxConfig.Http httpConfig = emqxConfig.getHttp(); + HttpServerOptions options = new HttpServerOptions().setPort(properties.getPort()); + if (httpConfig != null && Boolean.TRUE.equals(httpConfig.getSslEnabled())) { + Assert.notBlank(httpConfig.getSslCertPath(), "EMQX HTTP SSL 证书路径(emqx.http.ssl-cert-path)不能为空"); + Assert.notBlank(httpConfig.getSslKeyPath(), "EMQX HTTP SSL 私钥路径(emqx.http.ssl-key-path)不能为空"); + PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions() + .setKeyPath(httpConfig.getSslKeyPath()) + .setCertPath(httpConfig.getSslCertPath()); + options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); + } + try { + httpServer = vertx.createHttpServer(options) + .requestHandler(router) + .listen() + .result(); + log.info("[startHttpServer][IoT EMQX 协议 {} HTTP Hook 服务启动成功, port: {}, ssl: {}]", + getId(), properties.getPort(), httpConfig != null && Boolean.TRUE.equals(httpConfig.getSslEnabled())); + } catch (Exception e) { + log.error("[startHttpServer][IoT EMQX 协议 {} HTTP Hook 服务启动失败, port: {}]", getId(), properties.getPort(), e); + throw e; + } + } + + private void stopHttpServer() { + if (httpServer == null) { + return; + } + try { + httpServer.close().result(); + log.info("[stopHttpServer][IoT EMQX 协议 {} HTTP Hook 服务已停止]", getId()); + } catch (Exception e) { + log.error("[stopHttpServer][IoT EMQX 协议 {} HTTP Hook 服务停止失败]", getId(), e); + } finally { + httpServer = null; + } + } + + // ======================================= MQTT Client ====================================== + + private void startMqttClient() { + // 1.1 创建 MQTT Client + MqttClient client = createMqttClient(); + this.mqttClient = client; + // 1.2 连接 MQTT Broker + if (!connectMqttClient(client)) { + throw new RuntimeException("MQTT Client 启动失败: 连接 Broker 失败"); + } + + // 2. 启动定时重连检查 + startMqttClientReconnectChecker(); + } + + private void stopMqttClient() { + MqttClient client = this.mqttClient; + if (client == null || !client.isConnected()) { + return; + } + this.mqttClient = null; + + // 1. 批量取消订阅 + List topicList = emqxConfig.getMqttTopics(); + if (CollUtil.isNotEmpty(topicList)) { + try { + client.unsubscribe(topicList).toCompletionStage().toCompletableFuture() + .get(5, TimeUnit.SECONDS); + } catch (Exception e) { + log.warn("[stopMqttClient][IoT EMQX 协议 {} 取消订阅异常]", getId(), e); + } + } + + // 2. 断开 MQTT 连接 + try { + client.disconnect().toCompletionStage().toCompletableFuture() + .get(5, TimeUnit.SECONDS); + } catch (Exception e) { + log.warn("[stopMqttClient][IoT EMQX 协议 {} 断开连接异常]", getId(), e); + } + } + + // ======================================= MQTT 基础方法 ====================================== + + /** + * 创建 MQTT 客户端 + * + * @return 新创建的 MqttClient + */ + private MqttClient createMqttClient() { + // 1.1 基础配置 + MqttClientOptions options = new MqttClientOptions() + .setClientId(emqxConfig.getMqttClientId()) + .setUsername(emqxConfig.getMqttUsername()) + .setPassword(emqxConfig.getMqttPassword()) + .setSsl(emqxConfig.getMqttSsl()) + .setCleanSession(emqxConfig.getCleanSession()) + .setKeepAliveInterval(emqxConfig.getKeepAliveIntervalSeconds()) + .setMaxInflightQueue(emqxConfig.getMaxInflightQueue()); + options.setConnectTimeout(emqxConfig.getConnectTimeoutSeconds() * 1000); // Vert.x 需要毫秒 + options.setTrustAll(emqxConfig.getTrustAll()); + // 1.2 配置遗嘱消息 + IotEmqxConfig.Will will = emqxConfig.getWill(); + if (will.isEnabled()) { + Assert.notBlank(will.getTopic(), "遗嘱消息主题(emqx.will.topic)不能为空"); + Assert.notNull(will.getPayload(), "遗嘱消息内容(emqx.will.payload)不能为空"); + options.setWillFlag(true) + .setWillTopic(will.getTopic()) + .setWillMessageBytes(Buffer.buffer(will.getPayload())) + .setWillQoS(will.getQos()) + .setWillRetain(will.isRetain()); + } + // 1.3 配置高级 SSL/TLS(仅在启用 SSL 且不信任所有证书时生效) + if (Boolean.TRUE.equals(emqxConfig.getMqttSsl()) && !Boolean.TRUE.equals(emqxConfig.getTrustAll())) { + IotEmqxConfig.Ssl sslOptions = emqxConfig.getSslOptions(); + if (StrUtil.isNotBlank(sslOptions.getTrustStorePath())) { + options.setTrustStoreOptions(new JksOptions() + .setPath(sslOptions.getTrustStorePath()) + .setPassword(sslOptions.getTrustStorePassword())); + } + if (StrUtil.isNotBlank(sslOptions.getKeyStorePath())) { + options.setKeyStoreOptions(new JksOptions() + .setPath(sslOptions.getKeyStorePath()) + .setPassword(sslOptions.getKeyStorePassword())); + } + } + + // 2. 创建客户端 + return MqttClient.create(vertx, options); + } + + /** + * 连接 MQTT Broker(同步等待) + * + * @param client MQTT 客户端 + * @return 连接成功返回 true,失败返回 false + */ + @SuppressWarnings("BooleanMethodIsAlwaysInverted") + private synchronized boolean connectMqttClient(MqttClient client) { + String host = emqxConfig.getMqttHost(); + int port = emqxConfig.getMqttPort(); + int timeoutSeconds = emqxConfig.getConnectTimeoutSeconds(); + try { + // 1. 连接 Broker + client.connect(port, host).toCompletionStage().toCompletableFuture() + .get(timeoutSeconds, TimeUnit.SECONDS); + log.info("[connectMqttClient][IoT EMQX 协议 {} 连接成功, host: {}, port: {}]", + getId(), host, port); + + // 2. 设置处理器 + setupMqttClientHandlers(client); + subscribeMqttClientTopics(client); + return true; + } catch (Exception e) { + log.error("[connectMqttClient][IoT EMQX 协议 {} 连接发生异常]", getId(), e); + return false; + } + } + + /** + * 关闭 MQTT 客户端 + */ + private void closeMqttClient() { + MqttClient oldClient = this.mqttClient; + if (oldClient == null || !oldClient.isConnected()) { + return; + } + this.mqttClient = null; + try { + oldClient.disconnect().toCompletionStage().toCompletableFuture() + .get(5, TimeUnit.SECONDS); + } catch (Exception ignored) { + } + } + + // ======================================= MQTT 重连机制 ====================================== + + /** + * 启动 MQTT Client 周期性重连检查器 + */ + private void startMqttClientReconnectChecker() { + long interval = emqxConfig.getReconnectDelayMs(); + this.reconnectTimerId = vertx.setPeriodic(interval, timerId -> { + if (!running) { + return; + } + if (mqttClient != null && mqttClient.isConnected()) { + return; + } + log.info("[startMqttClientReconnectChecker][IoT EMQX 协议 {} 检测到断开,尝试重连]", getId()); + tryReconnectMqttClient(); + }); + } + + /** + * 停止 MQTT Client 重连检查器 + */ + private void stopMqttClientReconnectChecker() { + if (reconnectTimerId != null && vertx != null) { + try { + vertx.cancelTimer(reconnectTimerId); + } catch (Exception ignored) { + } + reconnectTimerId = null; + } + } + + /** + * 尝试重连 MQTT Client + */ + private synchronized void tryReconnectMqttClient() { + // 1. 前置检查 + if (!running) { + return; + } + if (mqttClient != null && mqttClient.isConnected()) { + return; + } + + log.info("[tryReconnectMqttClient][IoT EMQX 协议 {} 开始重连]", getId()); + try { + // 2. 关闭旧客户端 + closeMqttClient(); + + // 3.1 创建新客户端 + MqttClient client = createMqttClient(); + this.mqttClient = client; + // 3.2 连接(失败只打印日志,等下次定时) + if (!connectMqttClient(client)) { + log.warn("[tryReconnectMqttClient][IoT EMQX 协议 {} 重连失败,等待下次重试]", getId()); + } + } catch (Exception e) { + log.error("[tryReconnectMqttClient][IoT EMQX 协议 {} 重连异常]", getId(), e); + } + } + + // ======================================= MQTT Handler ====================================== + + /** + * 设置 MQTT Client 事件处理器 + */ + private void setupMqttClientHandlers(MqttClient client) { + // 1. 断开重连监听 + client.closeHandler(closeEvent -> { + if (!running) { + return; + } + log.warn("[setupMqttClientHandlers][IoT EMQX 协议 {} 连接断开,立即尝试重连]", getId()); + vertx.runOnContext(v -> tryReconnectMqttClient()); + }); + + // 2. 异常处理 + client.exceptionHandler(exception -> + log.error("[setupMqttClientHandlers][IoT EMQX 协议 {} MQTT Client 异常]", getId(), exception)); + + // 3. 上行消息处理 + client.publishHandler(upstreamHandler::handle); + } + + /** + * 订阅 MQTT Client 主题(同步等待) + */ + private void subscribeMqttClientTopics(MqttClient client) { + List topicList = emqxConfig.getMqttTopics(); + if (!client.isConnected()) { + log.warn("[subscribeMqttClientTopics][IoT EMQX 协议 {} MQTT Client 未连接, 跳过订阅]", getId()); + return; + } + if (CollUtil.isEmpty(topicList)) { + log.warn("[subscribeMqttClientTopics][IoT EMQX 协议 {} 未配置订阅主题, 跳过订阅]", getId()); + return; + } + // 执行订阅 + Map topics = convertMap(emqxConfig.getMqttTopics(), topic -> topic, + topic -> emqxConfig.getMqttQos()); + try { + client.subscribe(topics).toCompletionStage().toCompletableFuture() + .get(10, TimeUnit.SECONDS); + log.info("[subscribeMqttClientTopics][IoT EMQX 协议 {} 订阅成功, 共 {} 个主题]", getId(), topicList.size()); + } catch (Exception e) { + log.error("[subscribeMqttClientTopics][IoT EMQX 协议 {} 订阅失败]", getId(), e); + } + } + + /** + * 发布消息到 MQTT Broker + * + * @param topic 主题 + * @param payload 消息内容 + */ + public void publishMessage(String topic, byte[] payload) { + if (mqttClient == null || !mqttClient.isConnected()) { + log.warn("[publishMessage][IoT EMQX 协议 {} MQTT Client 未连接, 无法发布消息]", getId()); + return; + } + MqttQoS qos = MqttQoS.valueOf(emqxConfig.getMqttQos()); + mqttClient.publish(topic, Buffer.buffer(payload), qos, false, false); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java deleted file mode 100644 index 5ebaa1f01e..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java +++ /dev/null @@ -1,386 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; - -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream.IotEmqxUpstreamHandler; -import io.netty.handler.codec.mqtt.MqttQoS; -import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.net.JksOptions; -import io.vertx.mqtt.MqttClient; -import io.vertx.mqtt.MqttClientOptions; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * IoT 网关 EMQX 协议:接收设备上行消息 - * - * @author 芋道源码 - */ -@Slf4j -public class IotEmqxUpstreamProtocol implements IotProtocol { - - private static final String ID = "emqx"; - - private final IotGatewayProperties.EmqxProperties emqxProperties; - - private volatile boolean running = false; - - private final Vertx vertx; - - @Getter - private final String serverId; - - private MqttClient mqttClient; - - private IotEmqxUpstreamHandler upstreamHandler; - - public IotEmqxUpstreamProtocol(IotGatewayProperties.EmqxProperties emqxProperties, - Vertx vertx) { - this.emqxProperties = emqxProperties; - this.serverId = IotDeviceMessageUtils.generateServerId(emqxProperties.getMqttPort()); - this.vertx = vertx; - } - - @Override - public String getId() { - return ID; - } - - @Override - public IotProtocolTypeEnum getType() { - return IotProtocolTypeEnum.EMQX; - } - - @Override - @PostConstruct - public void start() { - if (running) { - return; - } - - try { - // 1. 启动 MQTT 客户端 - startMqttClient(); - - // 2. 标记服务为运行状态 - running = true; - log.info("[start][IoT 网关 EMQX 协议启动成功]"); - } catch (Exception e) { - log.error("[start][IoT 网关 EMQX 协议服务启动失败,应用将关闭]", e); - stop(); - - // 异步关闭应用 - Thread shutdownThread = new Thread(() -> { - try { - // 确保日志输出完成,使用更优雅的方式 - log.error("[start][由于 MQTT 连接失败,正在关闭应用]"); - // 等待日志输出完成 - Thread.sleep(1000); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - log.warn("[start][应用关闭被中断]"); - } - System.exit(1); - }); - shutdownThread.setDaemon(true); - shutdownThread.setName("emergency-shutdown"); - shutdownThread.start(); - - throw e; - } - } - - @Override - @PreDestroy - public void stop() { - if (!running) { - return; - } - - // 1. 停止 MQTT 客户端 - stopMqttClient(); - - // 2. 标记服务为停止状态 - running = false; - log.info("[stop][IoT 网关 MQTT 协议服务已停止]"); - } - - @Override - public boolean isRunning() { - return running; - } - - /** - * 启动 MQTT 客户端 - */ - private void startMqttClient() { - try { - // 1. 初始化消息处理器 - this.upstreamHandler = new IotEmqxUpstreamHandler(this); - - // 2. 创建 MQTT 客户端 - createMqttClient(); - - // 3. 同步连接 MQTT Broker - connectMqttSync(); - } catch (Exception e) { - log.error("[startMqttClient][MQTT 客户端启动失败]", e); - throw new RuntimeException("MQTT 客户端启动失败: " + e.getMessage(), e); - } - } - - /** - * 同步连接 MQTT Broker - */ - private void connectMqttSync() { - String host = emqxProperties.getMqttHost(); - int port = emqxProperties.getMqttPort(); - // 1. 连接 MQTT Broker - CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean success = new AtomicBoolean(false); - mqttClient.connect(port, host, connectResult -> { - if (connectResult.succeeded()) { - log.info("[connectMqttSync][MQTT 客户端连接成功, host: {}, port: {}]", host, port); - setupMqttHandlers(); - subscribeToTopics(); - success.set(true); - } else { - log.error("[connectMqttSync][连接 MQTT Broker 失败, host: {}, port: {}]", - host, port, connectResult.cause()); - } - latch.countDown(); - }); - - // 2. 等待连接结果 - try { - // 应用层超时控制:防止启动过程无限阻塞,与MQTT客户端的网络超时是不同层次的控制 - boolean awaitResult = latch.await(10, java.util.concurrent.TimeUnit.SECONDS); - if (!awaitResult) { - log.error("[connectMqttSync][等待连接结果超时]"); - throw new RuntimeException("连接 MQTT Broker 超时"); - } - if (!success.get()) { - throw new RuntimeException(String.format("首次连接 MQTT Broker 失败,地址: %s, 端口: %d", host, port)); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("[connectMqttSync][等待连接结果被中断]", e); - throw new RuntimeException("连接 MQTT Broker 被中断", e); - } - } - - /** - * 异步连接 MQTT Broker - */ - private void connectMqttAsync() { - String host = emqxProperties.getMqttHost(); - int port = emqxProperties.getMqttPort(); - mqttClient.connect(port, host, connectResult -> { - if (connectResult.succeeded()) { - log.info("[connectMqttAsync][MQTT 客户端重连成功]"); - setupMqttHandlers(); - subscribeToTopics(); - } else { - log.error("[connectMqttAsync][连接 MQTT Broker 失败, host: {}, port: {}]", - host, port, connectResult.cause()); - log.warn("[connectMqttAsync][重连失败,将再次尝试]"); - reconnectWithDelay(); - } - }); - } - - /** - * 延迟重连 - */ - private void reconnectWithDelay() { - if (!running) { - return; - } - if (mqttClient != null && mqttClient.isConnected()) { - return; - } - - long delay = emqxProperties.getReconnectDelayMs(); - log.info("[reconnectWithDelay][将在 {} 毫秒后尝试重连 MQTT Broker]", delay); - vertx.setTimer(delay, timerId -> { - if (!running) { - return; - } - if (mqttClient != null && mqttClient.isConnected()) { - return; - } - - log.info("[reconnectWithDelay][开始重连 MQTT Broker]"); - try { - createMqttClient(); - connectMqttAsync(); - } catch (Exception e) { - log.error("[reconnectWithDelay][重连过程中发生异常]", e); - vertx.setTimer(delay, t -> reconnectWithDelay()); - } - }); - } - - /** - * 停止 MQTT 客户端 - */ - private void stopMqttClient() { - if (mqttClient == null) { - return; - } - try { - if (mqttClient.isConnected()) { - // 1. 取消订阅所有主题 - List topicList = emqxProperties.getMqttTopics(); - for (String topic : topicList) { - try { - mqttClient.unsubscribe(topic); - } catch (Exception e) { - log.warn("[stopMqttClient][取消订阅主题({})异常]", topic, e); - } - } - - // 2. 断开 MQTT 客户端连接 - try { - CountDownLatch disconnectLatch = new CountDownLatch(1); - mqttClient.disconnect(ar -> disconnectLatch.countDown()); - if (!disconnectLatch.await(5, java.util.concurrent.TimeUnit.SECONDS)) { - log.warn("[stopMqttClient][断开 MQTT 连接超时]"); - } - } catch (Exception e) { - log.warn("[stopMqttClient][关闭 MQTT 客户端异常]", e); - } - } - } catch (Exception e) { - log.warn("[stopMqttClient][停止 MQTT 客户端过程中发生异常]", e); - } finally { - mqttClient = null; - } - } - - /** - * 创建 MQTT 客户端 - */ - private void createMqttClient() { - // 1.1 创建基础配置 - MqttClientOptions options = (MqttClientOptions) new MqttClientOptions() - .setClientId(emqxProperties.getMqttClientId()) - .setUsername(emqxProperties.getMqttUsername()) - .setPassword(emqxProperties.getMqttPassword()) - .setSsl(emqxProperties.getMqttSsl()) - .setCleanSession(emqxProperties.getCleanSession()) - .setKeepAliveInterval(emqxProperties.getKeepAliveIntervalSeconds()) - .setMaxInflightQueue(emqxProperties.getMaxInflightQueue()) - .setConnectTimeout(emqxProperties.getConnectTimeoutSeconds() * 1000) // Vert.x 需要毫秒 - .setTrustAll(emqxProperties.getTrustAll()); - // 1.2 配置遗嘱消息 - IotGatewayProperties.EmqxProperties.Will will = emqxProperties.getWill(); - if (will.isEnabled()) { - Assert.notBlank(will.getTopic(), "遗嘱消息主题(will.topic)不能为空"); - Assert.notNull(will.getPayload(), "遗嘱消息内容(will.payload)不能为空"); - options.setWillFlag(true) - .setWillTopic(will.getTopic()) - .setWillMessageBytes(Buffer.buffer(will.getPayload())) - .setWillQoS(will.getQos()) - .setWillRetain(will.isRetain()); - } - // 1.3 配置高级 SSL/TLS (仅在启用 SSL 且不信任所有证书时生效) - if (Boolean.TRUE.equals(emqxProperties.getMqttSsl()) && !Boolean.TRUE.equals(emqxProperties.getTrustAll())) { - IotGatewayProperties.EmqxProperties.Ssl sslOptions = emqxProperties.getSslOptions(); - if (StrUtil.isNotBlank(sslOptions.getTrustStorePath())) { - options.setTrustStoreOptions(new JksOptions() - .setPath(sslOptions.getTrustStorePath()) - .setPassword(sslOptions.getTrustStorePassword())); - } - if (StrUtil.isNotBlank(sslOptions.getKeyStorePath())) { - options.setKeyStoreOptions(new JksOptions() - .setPath(sslOptions.getKeyStorePath()) - .setPassword(sslOptions.getKeyStorePassword())); - } - } - // 1.4 安全警告日志 - if (Boolean.TRUE.equals(emqxProperties.getTrustAll())) { - log.warn("[createMqttClient][安全警告:当前配置信任所有 SSL 证书(trustAll=true),这在生产环境中存在严重安全风险!]"); - } - - // 2. 创建客户端实例 - this.mqttClient = MqttClient.create(vertx, options); - } - - /** - * 设置 MQTT 处理器 - */ - private void setupMqttHandlers() { - // 1. 设置断开重连监听器 - mqttClient.closeHandler(closeEvent -> { - if (!running) { - return; - } - log.warn("[closeHandler][MQTT 连接已断开, 准备重连]"); - reconnectWithDelay(); - }); - - // 2. 设置异常处理器 - mqttClient.exceptionHandler(exception -> - log.error("[exceptionHandler][MQTT 客户端异常]", exception)); - - // 3. 设置消息处理器 - mqttClient.publishHandler(upstreamHandler::handle); - } - - /** - * 订阅设备上行消息主题 - */ - private void subscribeToTopics() { - // 1. 校验 MQTT 客户端是否连接 - List topicList = emqxProperties.getMqttTopics(); - if (mqttClient == null || !mqttClient.isConnected()) { - log.warn("[subscribeToTopics][MQTT 客户端未连接, 跳过订阅]"); - return; - } - - // 2. 批量订阅所有主题 - Map topics = new HashMap<>(); - int qos = emqxProperties.getMqttQos(); - for (String topic : topicList) { - topics.put(topic, qos); - } - mqttClient.subscribe(topics, subscribeResult -> { - if (subscribeResult.succeeded()) { - log.info("[subscribeToTopics][订阅主题成功, 共 {} 个主题]", topicList.size()); - } else { - log.error("[subscribeToTopics][订阅主题失败, 共 {} 个主题, 原因: {}]", - topicList.size(), subscribeResult.cause().getMessage(), subscribeResult.cause()); - } - }); - } - - /** - * 发布消息到 MQTT Broker - * - * @param topic 主题 - * @param payload 消息内容 - */ - public void publishMessage(String topic, byte[] payload) { - if (mqttClient == null || !mqttClient.isConnected()) { - log.warn("[publishMessage][MQTT 客户端未连接, 无法发布消息]"); - return; - } - MqttQoS qos = MqttQoS.valueOf(emqxProperties.getMqttQos()); - mqttClient.publish(topic, Buffer.buffer(payload), qos, false, false); - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamHandler.java index a05fd1120a..77f777cafa 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamHandler.java @@ -5,7 +5,7 @@ import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxProtocol; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; @@ -21,13 +21,13 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class IotEmqxDownstreamHandler { - private final IotEmqxUpstreamProtocol protocol; + private final IotEmqxProtocol protocol; private final IotDeviceService deviceService; private final IotDeviceMessageService deviceMessageService; - public IotEmqxDownstreamHandler(IotEmqxUpstreamProtocol protocol) { + public IotEmqxDownstreamHandler(IotEmqxProtocol protocol) { this.protocol = protocol; this.deviceService = SpringUtil.getBean(IotDeviceService.class); this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); @@ -74,4 +74,4 @@ public class IotEmqxDownstreamHandler { return IotMqttTopicUtils.buildTopicByMethod(message.getMethod(), productKey, deviceName, isReply); } -} \ No newline at end of file +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamSubscriber.java index bcce471987..55aaaac69c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamSubscriber.java @@ -3,9 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxProtocol; import lombok.extern.slf4j.Slf4j; /** @@ -18,22 +16,11 @@ public class IotEmqxDownstreamSubscriber extends IotProtocolDownstreamSubscriber private final IotEmqxDownstreamHandler downstreamHandler; - public IotEmqxDownstreamSubscriber(IotEmqxUpstreamProtocol protocol, IotMessageBus messageBus) { + public IotEmqxDownstreamSubscriber(IotEmqxProtocol protocol, IotMessageBus messageBus) { super(protocol, messageBus); this.downstreamHandler = new IotEmqxDownstreamHandler(protocol); } - @PostConstruct - public void startSubscriber() { - // EMQX 模式下,由 Spring 管理 Bean 生命周期;需要显式启动订阅者,才能从消息总线消费下行消息并发布到 Broker - start(); - } - - @PreDestroy - public void stopSubscriber() { - stop(); - } - @Override protected void handleMessage(IotDeviceMessage message) { downstreamHandler.handle(message); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java index ae548cc4b6..0ba250cb1a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java @@ -10,16 +10,20 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; import lombok.extern.slf4j.Slf4j; +import java.util.Locale; + /** * IoT 网关 EMQX 认证事件处理器 *

* 为 EMQX 提供 HTTP 接口服务,包括: - * 1. 设备认证接口 - 对应 EMQX HTTP 认证插件 - * 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知 + * 1. 设备认证接口 - 对应 EMQX HTTP 认证插件 {@link #handleAuth(RoutingContext)} + * 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知 {@link #handleEvent(RoutingContext)} + * 3. 设备 ACL 权限接口 - 对应 EMQX HTTP ACL 插件 {@link #handleAcl(RoutingContext)} * * @author 芋道源码 */ @@ -45,15 +49,17 @@ public class IotEmqxAuthEventHandler { private static final String RESULT_IGNORE = "ignore"; /** - * EMQX 事件类型常量 + * EMQX 事件类型常量 - 客户端连接 */ private static final String EVENT_CLIENT_CONNECTED = "client.connected"; + /** + * EMQX 事件类型常量 - 客户端断开连接 + */ private static final String EVENT_CLIENT_DISCONNECTED = "client.disconnected"; private final String serverId; private final IotDeviceMessageService deviceMessageService; - private final IotDeviceCommonApi deviceApi; public IotEmqxAuthEventHandler(String serverId) { @@ -62,13 +68,16 @@ public class IotEmqxAuthEventHandler { this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); } + // ========== 认证处理 ========== + /** * EMQX 认证接口 */ public void handleAuth(RoutingContext context) { + JsonObject body = null; try { // 1. 参数校验 - JsonObject body = parseRequestBody(context); + body = parseRequestBody(context); if (body == null) { return; } @@ -91,11 +100,179 @@ public class IotEmqxAuthEventHandler { sendAuthResponse(context, RESULT_DENY); } } catch (Exception e) { - log.error("[handleAuth][设备认证异常]", e); + log.error("[handleAuth][设备认证异常][body={}]", body, e); sendAuthResponse(context, RESULT_IGNORE); } } + /** + * 解析认证接口请求体 + *

+ * 认证接口解析失败时返回 JSON 格式响应(包含 result 字段) + * + * @param context 路由上下文 + * @return 请求体JSON对象,解析失败时返回null + */ + private JsonObject parseRequestBody(RoutingContext context) { + try { + JsonObject body = context.body().asJsonObject(); + if (body == null) { + log.info("[parseRequestBody][请求体为空]"); + sendAuthResponse(context, RESULT_IGNORE); + return null; + } + return body; + } catch (Exception e) { + log.error("[parseRequestBody][body({}) 解析请求体失败]", context.body().asString(), e); + sendAuthResponse(context, RESULT_IGNORE); + return null; + } + } + + /** + * 执行设备认证 + * + * @param clientId 客户端ID + * @param username 用户名 + * @param password 密码 + * @return 认证是否成功 + */ + private boolean handleDeviceAuth(String clientId, String username, String password) { + try { + CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() + .setClientId(clientId).setUsername(username).setPassword(password)); + result.checkError(); + return BooleanUtil.isTrue(result.getData()); + } catch (Exception e) { + log.error("[handleDeviceAuth][设备({}) 认证接口调用失败]", username, e); + throw e; + } + } + + /** + * 发送 EMQX 认证响应 + * 根据 EMQX 官方文档要求,必须返回 JSON 格式响应 + * + * @param context 路由上下文 + * @param result 认证结果:allow、deny、ignore + */ + private void sendAuthResponse(RoutingContext context, String result) { + // 构建符合 EMQX 官方规范的响应 + JsonObject response = new JsonObject() + .put("result", result) + .put("is_superuser", false); + // 可以根据业务需求添加客户端属性 + // response.put("client_attrs", new JsonObject().put("role", "device")); + // 可以添加认证过期时间(可选) + // response.put("expire_at", System.currentTimeMillis() / 1000 + 3600); + + // 回复响应 + context.response() + .setStatusCode(SUCCESS_STATUS_CODE) + .putHeader("Content-Type", "application/json; charset=utf-8") + .end(response.encode()); + } + + // ========== ACL 处理 ========== + + /** + * EMQX ACL 接口 + *

+ * 用于 EMQX 的 HTTP ACL 插件校验设备的 publish/subscribe 权限。 + * 若请求参数无法识别,则返回 ignore 交给 EMQX 自身 ACL 规则处理。 + */ + public void handleAcl(RoutingContext context) { + JsonObject body = null; + try { + // 1.1 解析请求体 + body = parseRequestBody(context); + if (body == null) { + return; + } + String username = body.getString("username"); + String topic = body.getString("topic"); + if (StrUtil.hasBlank(username, topic)) { + log.info("[handleAcl][ACL 参数不完整: username={}, topic={}]", username, topic); + sendAuthResponse(context, RESULT_IGNORE); + return; + } + // 1.2 解析设备身份 + IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username); + if (deviceInfo == null) { + sendAuthResponse(context, RESULT_IGNORE); + return; + } + // 1.3 解析 ACL 动作(兼容多种 EMQX 版本/插件字段) + Boolean subscribe = parseAclSubscribeFlag(body); + if (subscribe == null) { + sendAuthResponse(context, RESULT_IGNORE); + return; + } + + // 2. 执行 ACL 校验 + boolean allowed = subscribe + ? IotMqttTopicUtils.isTopicSubscribeAllowed(topic, deviceInfo.getProductKey(), deviceInfo.getDeviceName()) + : IotMqttTopicUtils.isTopicPublishAllowed(topic, deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + sendAuthResponse(context, allowed ? RESULT_ALLOW : RESULT_DENY); + } catch (Exception e) { + log.error("[handleAcl][ACL 处理失败][body={}]", body, e); + sendAuthResponse(context, RESULT_IGNORE); + } + } + + /** + * 解析 ACL 动作类型:订阅/发布 + * + * @param body ACL 请求体 + * @return true 订阅;false 发布;null 不识别 + */ + private static Boolean parseAclSubscribeFlag(JsonObject body) { + // 1. action 字段(常见为 publish/subscribe) + String action = body.getString("action"); + if (StrUtil.isNotBlank(action)) { + String lower = action.toLowerCase(Locale.ROOT); + if (lower.contains("sub")) { + return true; + } + if (lower.contains("pub")) { + return false; + } + } + + // 2. access 字段:可能是数字或字符串 + Integer access = body.getInteger("access"); + if (access != null) { + if (access == 1) { + return true; + } + if (access == 2) { + return false; + } + } + String accessText = body.getString("access"); + if (StrUtil.isNotBlank(accessText)) { + String lower = accessText.toLowerCase(Locale.ROOT); + if (lower.contains("sub")) { + return true; + } + if (lower.contains("pub")) { + return false; + } + if (StrUtil.isNumeric(accessText)) { + int value = Integer.parseInt(accessText); + if (value == 1) { + return true; + } + if (value == 2) { + return false; + } + } + } + return null; + } + + // ========== 事件处理 ========== + /** * EMQX 统一事件处理接口:根据 EMQX 官方 Webhook 设计,统一处理所有客户端事件 * 支持的事件类型:client.connected、client.disconnected 等 @@ -124,58 +301,15 @@ public class IotEmqxAuthEventHandler { break; } - // EMQX Webhook 只需要 200 状态码,无需响应体 + // 3. EMQX Webhook 只需要 200 状态码,无需响应体 context.response().setStatusCode(SUCCESS_STATUS_CODE).end(); } catch (Exception e) { - log.error("[handleEvent][事件处理失败][body={}]", body != null ? body.encode() : "null", e); - // 即使处理失败,也返回 200 避免EMQX重试 + log.error("[handleEvent][事件处理失败][body={}]", body, e); + // 即使处理失败,也返回 200 避免 EMQX 重试 context.response().setStatusCode(SUCCESS_STATUS_CODE).end(); } } - /** - * 处理客户端连接事件 - */ - private void handleClientConnected(JsonObject body) { - String username = body.getString("username"); - log.info("[handleClientConnected][设备上线: {}]", username); - handleDeviceStateChange(username, true); - } - - /** - * 处理客户端断开连接事件 - */ - private void handleClientDisconnected(JsonObject body) { - String username = body.getString("username"); - String reason = body.getString("reason"); - log.info("[handleClientDisconnected][设备下线: {} ({})]", username, reason); - handleDeviceStateChange(username, false); - } - - /** - * 解析认证接口请求体 - *

- * 认证接口解析失败时返回 JSON 格式响应(包含 result 字段) - * - * @param context 路由上下文 - * @return 请求体JSON对象,解析失败时返回null - */ - private JsonObject parseRequestBody(RoutingContext context) { - try { - JsonObject body = context.body().asJsonObject(); - if (body == null) { - log.info("[parseRequestBody][请求体为空]"); - sendAuthResponse(context, RESULT_IGNORE); - return null; - } - return body; - } catch (Exception e) { - log.error("[parseRequestBody][body({}) 解析请求体失败]", context.body().asString(), e); - sendAuthResponse(context, RESULT_IGNORE); - return null; - } - } - /** * 解析事件接口请求体 *

@@ -201,23 +335,22 @@ public class IotEmqxAuthEventHandler { } /** - * 执行设备认证 - * - * @param clientId 客户端ID - * @param username 用户名 - * @param password 密码 - * @return 认证是否成功 + * 处理客户端连接事件 */ - private boolean handleDeviceAuth(String clientId, String username, String password) { - try { - CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() - .setClientId(clientId).setUsername(username).setPassword(password)); - result.checkError(); - return BooleanUtil.isTrue(result.getData()); - } catch (Exception e) { - log.error("[handleDeviceAuth][设备({}) 认证接口调用失败]", username, e); - throw e; - } + private void handleClientConnected(JsonObject body) { + String username = body.getString("username"); + log.info("[handleClientConnected][设备上线: {}]", username); + handleDeviceStateChange(username, true); + } + + /** + * 处理客户端断开连接事件 + */ + private void handleClientDisconnected(JsonObject body) { + String username = body.getString("username"); + String reason = body.getString("reason"); + log.info("[handleClientDisconnected][设备下线: {} ({})]", username, reason); + handleDeviceStateChange(username, false); } /** @@ -247,29 +380,4 @@ public class IotEmqxAuthEventHandler { } } - /** - * 发送 EMQX 认证响应 - * 根据 EMQX 官方文档要求,必须返回 JSON 格式响应 - * - * @param context 路由上下文 - * @param result 认证结果:allow、deny、ignore - */ - private void sendAuthResponse(RoutingContext context, String result) { - // 构建符合 EMQX 官方规范的响应 - JsonObject response = new JsonObject() - .put("result", result) - .put("is_superuser", false); - - // 可以根据业务需求添加客户端属性 - // response.put("client_attrs", new JsonObject().put("role", "device")); - - // 可以添加认证过期时间(可选) - // response.put("expire_at", System.currentTimeMillis() / 1000 + 3600); - - context.response() - .setStatusCode(SUCCESS_STATUS_CODE) - .putHeader("Content-Type", "application/json; charset=utf-8") - .end(response.encode()); - } - -} \ No newline at end of file +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxUpstreamHandler.java index 5ff8d120dc..4c2fa488f9 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxUpstreamHandler.java @@ -3,7 +3,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; @@ -20,16 +19,16 @@ public class IotEmqxUpstreamHandler { private final String serverId; - public IotEmqxUpstreamHandler(IotEmqxUpstreamProtocol protocol) { + public IotEmqxUpstreamHandler(String serverId) { this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); - this.serverId = protocol.getServerId(); + this.serverId = serverId; } /** * 处理 MQTT 发布消息 */ public void handle(MqttPublishMessage mqttMessage) { - log.info("[handle][收到 MQTT 消息, topic: {}, payload: {}]", mqttMessage.topicName(), mqttMessage.payload()); + log.debug("[handle][收到 MQTT 消息, topic: {}, payload: {}]", mqttMessage.topicName(), mqttMessage.payload()); String topic = mqttMessage.topicName(); byte[] payload = mqttMessage.payload().getBytes(); try { @@ -57,4 +56,4 @@ public class IotEmqxUpstreamHandler { } } -} \ No newline at end of file +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java index 2a842966fe..249b31544f 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java @@ -38,6 +38,12 @@ public final class IotMqttTopicUtils { */ public static final String MQTT_EVENT_PATH = "/mqtt/event"; + /** + * MQTT ACL 接口路径 + * 对应 EMQX HTTP ACL 插件的 ACL 请求接口 + */ + public static final String MQTT_ACL_PATH = "/mqtt/acl"; + // ========== 工具方法 ========== /** @@ -85,4 +91,28 @@ public final class IotMqttTopicUtils { || topic.equals(SYS_TOPIC_PREFIX + productKey + "/" + deviceName + "/#"); } -} \ No newline at end of file + /** + * 校验主题是否允许发布 + *

+ * 规则:主题必须以 /sys/{productKey}/{deviceName}/ 开头,且不允许包含通配符(+/#)。 + * + * @param topic 发布的主题 + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @return 是否允许发布 + */ + // TODO DONE @AI:这个逻辑,是不是 mqtt 协议,也要使用???答:是通用工具方法,MQTT 协议可按需调用; + // TODO @AI:那你改下 mqtt,也调用!!! + public static boolean isTopicPublishAllowed(String topic, String productKey, String deviceName) { + if (!StrUtil.isAllNotBlank(topic, productKey, deviceName)) { + return false; + } + // MQTT publish topic 不允许包含通配符,但这里做一次兜底校验 + if (topic.contains("#") || topic.contains("+")) { + return false; + } + String deviceTopicPrefix = SYS_TOPIC_PREFIX + productKey + "/" + deviceName + "/"; + return topic.startsWith(deviceTopicPrefix); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index 4916c0d238..aaf19dc1c7 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -125,43 +125,49 @@ yudao: max-message-size: 8192 # 最大消息大小(字节) connect-timeout-seconds: 60 # 连接超时时间(秒) ssl-enabled: false # 是否启用 SSL - - # 协议配置(旧版,保持兼容) - protocol: # ==================================== # 针对引入的 EMQX 组件的配置 # ==================================== - emqx: + - id: emqx-1 enabled: true - http-port: 8090 # MQTT HTTP 服务端口 - mqtt-host: 127.0.0.1 # MQTT Broker 地址 - mqtt-port: 1883 # MQTT Broker 端口 - mqtt-username: admin # MQTT 用户名 - mqtt-password: public # MQTT 密码 - mqtt-client-id: iot-gateway-mqtt # MQTT 客户端 ID - mqtt-ssl: false # 是否开启 SSL - mqtt-topics: - - "/sys/#" # 系统主题 - clean-session: true # 是否启用 Clean Session (默认: true) - keep-alive-interval-seconds: 60 # 心跳间隔,单位秒 (默认: 60) - max-inflight-queue: 10000 # 最大飞行消息队列,单位:条 - connect-timeout-seconds: 10 # 连接超时,单位:秒 - # 是否信任所有 SSL 证书 (默认: false)。警告:生产环境必须为 false! - # 仅在开发环境或内网测试时,如果使用了自签名证书,可以临时设置为 true - trust-all: true # 在 dev 环境可以设为 true - # 遗嘱消息配置 (用于网关异常下线时通知其他系统) - will: - enabled: true # 生产环境强烈建议开启 - topic: "gateway/status/${yudao.iot.gateway.emqx.mqtt-client-id}" # 遗嘱消息主题 - payload: "offline" # 遗嘱消息负载 - qos: 1 # 遗嘱消息 QoS - retain: true # 遗嘱消息是否保留 - # 高级 SSL/TLS 配置 (当 trust-all: false 且 mqtt-ssl: true 时生效) - ssl-options: - key-store-path: "classpath:certs/client.jks" # 客户端证书库路径 - key-store-password: "your-keystore-password" # 客户端证书库密码 - trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径 - trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码 + protocol: emqx + port: 8090 # EMQX HTTP Hook 端口(/mqtt/auth、/mqtt/event) + emqx: + mqtt-host: 127.0.0.1 # MQTT Broker 地址 + mqtt-port: 1883 # MQTT Broker 端口 + mqtt-username: admin # MQTT 用户名 + mqtt-password: public # MQTT 密码 + mqtt-client-id: iot-gateway-mqtt # MQTT 客户端 ID + mqtt-ssl: false # 是否开启 SSL + mqtt-topics: + - "/sys/#" # 系统主题 + mqtt-qos: 1 # 默认 QoS + clean-session: true # 是否启用 Clean Session (默认: true) + keep-alive-interval-seconds: 60 # 心跳间隔,单位秒 (默认: 60) + max-inflight-queue: 10000 # 最大飞行消息队列,单位:条 + connect-timeout-seconds: 10 # 连接超时,单位:秒 + reconnect-delay-ms: 5000 # 重连延迟,单位:毫秒 + # 是否信任所有 SSL 证书 (默认: false)。警告:生产环境必须为 false! + # 仅在开发环境或内网测试时,如果使用了自签名证书,可以临时设置为 true + trust-all: true # 在 dev 环境可以设为 true + # EMQX HTTP Hook 回调网关的 HTTPS 配置(可选) + http: + ssl-enabled: false + # ssl-cert-path: "path/to/server.crt" + # ssl-key-path: "path/to/server.key" + # 遗嘱消息配置 (用于网关异常下线时通知其他系统) + will: + enabled: true # 生产环境强烈建议开启 + topic: "gateway/status/iot-gateway-mqtt" # 遗嘱消息主题 + payload: "offline" # 遗嘱消息负载 + qos: 1 # 遗嘱消息 QoS + retain: true # 遗嘱消息是否保留 + # 高级 SSL/TLS 配置 (当 trust-all: false 且 mqtt-ssl: true 时生效) + ssl-options: + key-store-path: "classpath:certs/client.jks" # 客户端证书库路径 + key-store-password: "your-keystore-password" # 客户端证书库密码 + trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径 + trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码 --- #################### 日志相关配置 #################### @@ -181,7 +187,6 @@ logging: cn.iocoder.yudao.module.iot.gateway.protocol.emqx: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG - cn.iocoder.yudao.module.iot.gateway.protocol.mqttws: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.coap: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.websocket: DEBUG # 根日志级别 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotDirectDeviceEmqxProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotDirectDeviceEmqxProtocolIntegrationTest.java deleted file mode 100644 index a2e85919a5..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotDirectDeviceEmqxProtocolIntegrationTest.java +++ /dev/null @@ -1,437 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; - -import cn.hutool.core.map.MapUtil; -import cn.hutool.http.HttpResponse; -import cn.hutool.http.HttpUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; -import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; -import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; -import io.netty.handler.codec.mqtt.MqttQoS; -import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.mqtt.MqttClient; -import io.vertx.mqtt.MqttClientOptions; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.TimeUnit; - -/** - * IoT 直连设备 EMQX 协议集成测试(手动测试) - * - *

测试场景:直连设备(IotProductDeviceTypeEnum 的 DIRECT 类型)通过 EMQX Broker 连接平台 - * - *

EMQX 协议架构: - *

- *     +--------+       MQTT        +-------------+       HTTP Hook        +---------+
- *     | 设备   | ----------------> | EMQX Broker | --------------------> | 网关    |
- *     +--------+                   +-------------+                        +---------+
- *         |                              |                                     |
- *         | 1. 连接认证                   | 2. 调用 /mqtt/auth                   |
- *         | 3. 发布消息                   | 4. 调用 /mqtt/event (上线/下线)       |
- *         |                              | 5. 网关订阅 EMQX 消息                 |
- *         |                              |                                     |
- * 
- * - *

测试分类: - *

- * - *

使用步骤: - *

    - *
  1. 启动 EMQX Broker(MQTT 端口 1883)
  2. - *
  3. 启动 yudao-module-iot-gateway 服务(HTTP 端口 18083)
  4. - *
  5. 配置 EMQX HTTP 认证插件指向网关的 /mqtt/auth 接口
  6. - *
  7. 配置 EMQX Webhook 插件指向网关的 /mqtt/event 接口
  8. - *
  9. 运行测试方法
  10. - *
- * - * @author 芋道源码 - */ -@Slf4j -@Disabled -@SuppressWarnings("HttpUrlsUsage") -public class IotDirectDeviceEmqxProtocolIntegrationTest { - - private static final String SERVER_HOST = "127.0.0.1"; - /** - * EMQX 认证事件 HTTP 接口端口(网关提供给 EMQX Server 调用) - */ - private static final int HTTP_PORT = 18083; - /** - * EMQX Broker MQTT 端口(设备连接 EMQX) - */ - private static final int MQTT_PORT = 1883; - private static final int TIMEOUT_SECONDS = 10; - - private static Vertx vertx; - - // ===================== 编解码器(EMQX 使用 Alink 协议) ===================== - - private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec(); - - // ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) ===================== - - private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT"; - private static final String DEVICE_NAME = "small"; - private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3"; - - @BeforeAll - public static void setUp() { - vertx = Vertx.vertx(); - } - - @AfterAll - public static void tearDown() { - if (vertx != null) { - vertx.close(); - } - } - - // ================================================================================== - // 第一部分:模拟设备连接 EMQX Broker - // ================================================================================== - - /** - * 设备连接测试:模拟设备连接 EMQX Broker - *

- * 当设备连接 EMQX 时,EMQX 会自动调用网关的 /mqtt/auth 接口进行认证 - */ - @Test - public void testDeviceConnect() throws Exception { - // 1. 构建认证信息 - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - log.info("[testDeviceConnect][认证信息: clientId={}, username={}, password={}]", - authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword()); - - // 2. 创建客户端并连接 EMQX Broker - MqttClient client = createClient(authInfo); - try { - client.connect(MQTT_PORT, SERVER_HOST) - .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - log.info("[testDeviceConnect][连接 EMQX Broker 成功,客户端 ID: {}]", client.clientId()); - log.info("[testDeviceConnect][EMQX 会自动调用网关的 /mqtt/auth 接口进行认证]"); - log.info("[testDeviceConnect][EMQX 会自动调用网关的 /mqtt/event 接口通知设备上线]"); - } finally { - disconnect(client); - log.info("[testDeviceConnect][EMQX 会自动调用网关的 /mqtt/event 接口通知设备下线]"); - } - } - - /** - * 属性上报测试:设备通过 EMQX Broker 发布属性消息 - *

- * 消息流程:设备 -> EMQX Broker -> 网关(订阅 EMQX 消息) - */ - @Test - public void testPropertyPost() throws Exception { - // 1. 连接 EMQX Broker - MqttClient client = connectToEmqx(); - log.info("[testPropertyPost][连接 EMQX Broker 成功]"); - - try { - // 2.1 构建属性上报消息 - IotDeviceMessage request = IotDeviceMessage.requestOf( - IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), - IotDevicePropertyPostReqDTO.of(MapUtil.builder() - .put("width", 1) - .put("height", "2") - .build())); - - // 2.2 发布消息到 EMQX Broker - String topic = String.format("/sys/%s/%s/thing/property/post", PRODUCT_KEY, DEVICE_NAME); - publish(client, topic, request); - log.info("[testPropertyPost][属性上报消息已发送到 EMQX Broker]"); - log.info("[testPropertyPost][网关会通过订阅 EMQX 接收此消息]"); - - // 2.3 等待消息处理 - Thread.sleep(2000); - log.info("[testPropertyPost][请检查网关日志确认消息是否被正确处理]"); - } finally { - disconnect(client); - } - } - - /** - * 事件上报测试:设备通过 EMQX Broker 发布事件消息 - *

- * 消息流程:设备 -> EMQX Broker -> 网关(订阅 EMQX 消息) - */ - @Test - public void testEventPost() throws Exception { - // 1. 连接 EMQX Broker - MqttClient client = connectToEmqx(); - log.info("[testEventPost][连接 EMQX Broker 成功]"); - - try { - // 2.1 构建事件上报消息 - IotDeviceMessage request = IotDeviceMessage.requestOf( - IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), - IotDeviceEventPostReqDTO.of( - "eat", - MapUtil.builder().put("rice", 3).build(), - System.currentTimeMillis())); - - // 2.2 发布消息到 EMQX Broker - String topic = String.format("/sys/%s/%s/thing/event/post", PRODUCT_KEY, DEVICE_NAME); - publish(client, topic, request); - log.info("[testEventPost][事件上报消息已发送到 EMQX Broker]"); - log.info("[testEventPost][网关会通过订阅 EMQX 接收此消息]"); - - // 2.3 等待消息处理 - Thread.sleep(2000); - log.info("[testEventPost][请检查网关日志确认消息是否被正确处理]"); - } finally { - disconnect(client); - } - } - - /** - * 订阅下行消息测试:设备订阅服务端下发的消息 - *

- * 消息流程:网关 -> EMQX Broker -> 设备 - */ - @Test - public void testSubscribe() throws Exception { - // 1. 连接 EMQX Broker - MqttClient client = connectToEmqx(); - log.info("[testSubscribe][连接 EMQX Broker 成功]"); - - try { - // 2. 设置消息处理器 - client.publishHandler(message -> log.info("[testSubscribe][收到下行消息: topic={}, payload={}]", - message.topicName(), message.payload().toString())); - - // 3. 订阅下行主题 - String topic = String.format("/sys/%s/%s/thing/service/#", PRODUCT_KEY, DEVICE_NAME); - log.info("[testSubscribe][订阅主题: {}]", topic); - subscribe(client, topic); - log.info("[testSubscribe][订阅成功,等待下行消息... (30秒后自动断开)]"); - log.info("[testSubscribe][网关下发的消息会通过 EMQX Broker 转发给设备]"); - - // 4. 保持连接 30 秒等待消息 - Thread.sleep(30000); - } finally { - disconnect(client); - } - } - - // ================================================================================== - // 第二部分:模拟 EMQX Server 调用网关 HTTP Hook 接口 - // 说明:这些接口是 EMQX Server 自动调用的,这里只是用于单独测试接口功能 - // ================================================================================== - - /** - * 认证接口测试:模拟 EMQX Server 调用 /mqtt/auth 接口 - *

- * 注意:正常情况下此接口由 EMQX HTTP 认证插件自动调用,这里只是测试接口本身 - */ - @Test - public void testEmqxAuthHook() { - // 1.1 构建请求 - String url = String.format("http://%s:%d/mqtt/auth", SERVER_HOST, HTTP_PORT); - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - // 1.2 EMQX 认证请求格式 - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("clientid", authInfo.getClientId()) - .put("username", authInfo.getUsername()) - .put("password", authInfo.getPassword()) - .build()); - // 1.3 输出请求 - log.info("[testEmqxAuthHook][模拟 EMQX Server 调用认证接口]"); - log.info("[testEmqxAuthHook][请求 URL: {}]", url); - log.info("[testEmqxAuthHook][请求体: {}]", payload); - - // 2.1 发送请求 - try (HttpResponse httpResponse = HttpUtil.createPost(url) - .header("Content-Type", "application/json") - .body(payload) - .execute()) { - // 2.2 输出结果 - log.info("[testEmqxAuthHook][响应状态码: {}]", httpResponse.getStatus()); - log.info("[testEmqxAuthHook][响应体: {}]", httpResponse.body()); - log.info("[testEmqxAuthHook][认证结果: result=allow 表示认证成功, result=deny 表示认证失败]"); - } - } - - /** - * 认证失败测试:模拟 EMQX Server 调用 /mqtt/auth 接口(错误密码) - */ - @Test - public void testEmqxAuthHookFailed() { - // 1.1 构建请求 - String url = String.format("http://%s:%d/mqtt/auth", SERVER_HOST, HTTP_PORT); - // 1.2 使用错误的密码 - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("clientid", PRODUCT_KEY + "." + DEVICE_NAME) - .put("username", DEVICE_NAME + "&" + PRODUCT_KEY) - .put("password", "wrong_password") - .build()); - // 1.3 输出请求 - log.info("[testEmqxAuthHookFailed][模拟 EMQX Server 调用认证接口(错误密码)]"); - log.info("[testEmqxAuthHookFailed][请求 URL: {}]", url); - log.info("[testEmqxAuthHookFailed][请求体: {}]", payload); - - // 2.1 发送请求 - try (HttpResponse httpResponse = HttpUtil.createPost(url) - .header("Content-Type", "application/json") - .body(payload) - .execute()) { - // 2.2 输出结果 - log.info("[testEmqxAuthHookFailed][响应状态码: {}]", httpResponse.getStatus()); - log.info("[testEmqxAuthHookFailed][响应体: {}]", httpResponse.body()); - log.info("[testEmqxAuthHookFailed][预期结果: result=deny]"); - } - } - - /** - * 设备上线事件测试:模拟 EMQX Server Webhook 调用 /mqtt/event 接口 - *

- * 注意:正常情况下此接口由 EMQX Webhook 插件自动调用,这里只是测试接口本身 - */ - @Test - public void testEmqxClientConnectedHook() { - // 1.1 构建请求 - String url = String.format("http://%s:%d/mqtt/event", SERVER_HOST, HTTP_PORT); - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - // 1.2 EMQX Webhook client.connected 事件格式 - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("event", "client.connected") - .put("clientid", authInfo.getClientId()) - .put("username", authInfo.getUsername()) - .put("peername", "127.0.0.1:12345") - .put("connected_at", System.currentTimeMillis()) - .build()); - // 1.3 输出请求 - log.info("[testEmqxClientConnectedHook][模拟 EMQX Server Webhook 调用设备上线事件]"); - log.info("[testEmqxClientConnectedHook][请求 URL: {}]", url); - log.info("[testEmqxClientConnectedHook][请求体: {}]", payload); - - // 2.1 发送请求 - try (HttpResponse httpResponse = HttpUtil.createPost(url) - .header("Content-Type", "application/json") - .body(payload) - .execute()) { - // 2.2 输出结果 - log.info("[testEmqxClientConnectedHook][响应状态码: {}]", httpResponse.getStatus()); - log.info("[testEmqxClientConnectedHook][响应体: {}]", httpResponse.body()); - log.info("[testEmqxClientConnectedHook][预期结果: 状态码 200,设备状态更新为在线]"); - } - } - - /** - * 设备下线事件测试:模拟 EMQX Server Webhook 调用 /mqtt/event 接口 - *

- * 注意:正常情况下此接口由 EMQX Webhook 插件自动调用,这里只是测试接口本身 - */ - @Test - public void testEmqxClientDisconnectedHook() { - // 1.1 构建请求 - String url = String.format("http://%s:%d/mqtt/event", SERVER_HOST, HTTP_PORT); - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - // 1.2 EMQX Webhook client.disconnected 事件格式 - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("event", "client.disconnected") - .put("clientid", authInfo.getClientId()) - .put("username", authInfo.getUsername()) - .put("reason", "normal") - .put("disconnected_at", System.currentTimeMillis()) - .build()); - // 1.3 输出请求 - log.info("[testEmqxClientDisconnectedHook][模拟 EMQX Server Webhook 调用设备下线事件]"); - log.info("[testEmqxClientDisconnectedHook][请求 URL: {}]", url); - log.info("[testEmqxClientDisconnectedHook][请求体: {}]", payload); - - // 2.1 发送请求 - try (HttpResponse httpResponse = HttpUtil.createPost(url) - .header("Content-Type", "application/json") - .body(payload) - .execute()) { - // 2.2 输出结果 - log.info("[testEmqxClientDisconnectedHook][响应状态码: {}]", httpResponse.getStatus()); - log.info("[testEmqxClientDisconnectedHook][响应体: {}]", httpResponse.body()); - log.info("[testEmqxClientDisconnectedHook][预期结果: 状态码 200,设备状态更新为离线]"); - } - } - - // ===================== 辅助方法 ===================== - - /** - * 创建 MQTT 客户端 - * - * @param authInfo 认证信息 - * @return MQTT 客户端 - */ - private MqttClient createClient(IotDeviceAuthReqDTO authInfo) { - MqttClientOptions options = new MqttClientOptions() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()) - .setCleanSession(true) - .setKeepAliveInterval(60); - return MqttClient.create(vertx, options); - } - - /** - * 连接 EMQX Broker 并认证设备 - * - * @return 已认证的 MQTT 客户端 - */ - private MqttClient connectToEmqx() throws Exception { - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - MqttClient client = createClient(authInfo); - client.connect(MQTT_PORT, SERVER_HOST) - .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - return client; - } - - /** - * 订阅主题 - * - * @param client MQTT 客户端 - * @param topic 主题 - */ - private void subscribe(MqttClient client, String topic) throws Exception { - client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value()) - .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - log.info("[subscribe][订阅主题成功: {}]", topic); - } - - /** - * 发布消息 - * - * @param client MQTT 客户端 - * @param topic 发布主题 - * @param request 请求消息 - */ - private void publish(MqttClient client, String topic, IotDeviceMessage request) throws Exception { - byte[] payload = CODEC.encode(request); - log.info("[publish][发送消息: topic={}, payload={}]", topic, new String(payload)); - client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false) - .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - log.info("[publish][消息发布成功]"); - } - - /** - * 断开连接 - * - * @param client MQTT 客户端 - */ - private void disconnect(MqttClient client) throws Exception { - client.disconnect() - .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - log.info("[disconnect][断开连接成功]"); - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/package-info.java new file mode 100644 index 0000000000..d7d4535458 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/package-info.java @@ -0,0 +1,18 @@ +/** + * IoT 网关 EMQX 协议集成测试包 + * + *

+ * 测试类直接使用 mqtt 包下的单测即可,因为设备都是通过 MQTT 协议连接 EMQX Broker。 + * + * @see cn.iocoder.yudao.module.iot.gateway.protocol.mqtt + * + *

架构

+ *
+ * +--------+      MQTT       +-------------+     HTTP Hook     +---------+
+ * |  设备  | --------------> | EMQX Broker | ----------------> |  网关   |
+ * +--------+                 +-------------+                   +---------+
+ * 
+ * + * @author 芋道源码 + */ +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx;