diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java index 226e421024..5072159256 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java @@ -176,7 +176,8 @@ public class IotEmqxProtocol implements IotProtocol { // 2.4 关闭 Vertx if (vertx != null) { try { - vertx.close().result(); + vertx.close().toCompletionStage().toCompletableFuture() + .get(10, TimeUnit.SECONDS); log.info("[stop][IoT EMQX 协议 {} Vertx 已关闭]", getId()); } catch (Exception e) { log.error("[stop][IoT EMQX 协议 {} Vertx 关闭失败]", getId(), e); @@ -195,7 +196,7 @@ public class IotEmqxProtocol implements IotProtocol { private void startHttpServer() { // 1. 创建路由 Router router = Router.router(vertx); - router.route().handler(BodyHandler.create()); + router.route().handler(BodyHandler.create().setBodyLimit(1024 * 1024)); // 限制 body 大小为 1MB,防止大包攻击 // 2. 创建处理器 IotEmqxAuthEventHandler handler = new IotEmqxAuthEventHandler(serverId); @@ -218,12 +219,13 @@ public class IotEmqxProtocol implements IotProtocol { httpServer = vertx.createHttpServer(options) .requestHandler(router) .listen() - .result(); + .toCompletionStage().toCompletableFuture() + .get(10, TimeUnit.SECONDS); log.info("[startHttpServer][IoT EMQX 协议 {} HTTP Hook 服务启动成功, port: {}, ssl: {}]", getId(), properties.getPort(), httpConfig != null && Boolean.TRUE.equals(httpConfig.getSslEnabled())); } catch (Exception e) { log.error("[startHttpServer][IoT EMQX 协议 {} HTTP Hook 服务启动失败, port: {}]", getId(), properties.getPort(), e); - throw e; + throw new RuntimeException("HTTP Hook 服务启动失败", e); } } @@ -232,7 +234,8 @@ public class IotEmqxProtocol implements IotProtocol { return; } try { - httpServer.close().result(); + httpServer.close().toCompletionStage().toCompletableFuture() + .get(5, TimeUnit.SECONDS); log.info("[stopHttpServer][IoT EMQX 协议 {} HTTP Hook 服务已停止]", getId()); } catch (Exception e) { log.error("[stopHttpServer][IoT EMQX 协议 {} HTTP Hook 服务停止失败]", getId(), e); @@ -258,19 +261,21 @@ public class IotEmqxProtocol implements IotProtocol { private void stopMqttClient() { MqttClient client = this.mqttClient; - if (client == null || !client.isConnected()) { + this.mqttClient = null; // 先清理引用 + if (client == null) { return; } - this.mqttClient = null; - // 1. 批量取消订阅 - List topicList = emqxConfig.getMqttTopics(); - if (CollUtil.isNotEmpty(topicList)) { - try { - client.unsubscribe(topicList).toCompletionStage().toCompletableFuture() - .get(5, TimeUnit.SECONDS); - } catch (Exception e) { - log.warn("[stopMqttClient][IoT EMQX 协议 {} 取消订阅异常]", getId(), e); + // 1. 批量取消订阅(仅在连接时) + if (client.isConnected()) { + List topicList = emqxConfig.getMqttTopics(); + if (CollUtil.isNotEmpty(topicList)) { + try { + client.unsubscribe(topicList).toCompletionStage().toCompletableFuture() + .get(5, TimeUnit.SECONDS); + } catch (Exception e) { + log.warn("[stopMqttClient][IoT EMQX 协议 {} 取消订阅异常]", getId(), e); + } } } @@ -296,15 +301,15 @@ public class IotEmqxProtocol implements IotProtocol { .setClientId(emqxConfig.getMqttClientId()) .setUsername(emqxConfig.getMqttUsername()) .setPassword(emqxConfig.getMqttPassword()) - .setSsl(emqxConfig.getMqttSsl()) - .setCleanSession(emqxConfig.getCleanSession()) + .setSsl(Boolean.TRUE.equals(emqxConfig.getMqttSsl())) + .setCleanSession(Boolean.TRUE.equals(emqxConfig.getCleanSession())) .setKeepAliveInterval(emqxConfig.getKeepAliveIntervalSeconds()) .setMaxInflightQueue(emqxConfig.getMaxInflightQueue()); options.setConnectTimeout(emqxConfig.getConnectTimeoutSeconds() * 1000); // Vert.x 需要毫秒 - options.setTrustAll(emqxConfig.getTrustAll()); + options.setTrustAll(Boolean.TRUE.equals(emqxConfig.getTrustAll())); // 1.2 配置遗嘱消息 IotEmqxConfig.Will will = emqxConfig.getWill(); - if (will.isEnabled()) { + if (will != null && will.isEnabled()) { Assert.notBlank(will.getTopic(), "遗嘱消息主题(emqx.will.topic)不能为空"); Assert.notNull(will.getPayload(), "遗嘱消息内容(emqx.will.payload)不能为空"); options.setWillFlag(true) @@ -313,9 +318,11 @@ public class IotEmqxProtocol implements IotProtocol { .setWillQoS(will.getQos()) .setWillRetain(will.isRetain()); } - // 1.3 配置高级 SSL/TLS(仅在启用 SSL 且不信任所有证书时生效) - if (Boolean.TRUE.equals(emqxConfig.getMqttSsl()) && !Boolean.TRUE.equals(emqxConfig.getTrustAll())) { - IotEmqxConfig.Ssl sslOptions = emqxConfig.getSslOptions(); + // 1.3 配置高级 SSL/TLS(仅在启用 SSL 且不信任所有证书时生效,且需要 sslOptions 非空) + IotEmqxConfig.Ssl sslOptions = emqxConfig.getSslOptions(); + if (Boolean.TRUE.equals(emqxConfig.getMqttSsl()) + && Boolean.FALSE.equals(emqxConfig.getTrustAll()) + && sslOptions != null) { if (StrUtil.isNotBlank(sslOptions.getTrustStorePath())) { options.setTrustStoreOptions(new JksOptions() .setPath(sslOptions.getTrustStorePath()) @@ -365,10 +372,11 @@ public class IotEmqxProtocol implements IotProtocol { */ private void closeMqttClient() { MqttClient oldClient = this.mqttClient; - if (oldClient == null || !oldClient.isConnected()) { + this.mqttClient = null; // 先清理引用 + if (oldClient == null) { return; } - this.mqttClient = null; + // 尽力释放(无论是否连接都尝试 disconnect) try { oldClient.disconnect().toCompletionStage().toCompletableFuture() .get(5, TimeUnit.SECONDS); @@ -391,7 +399,11 @@ public class IotEmqxProtocol implements IotProtocol { return; } log.info("[startMqttClientReconnectChecker][IoT EMQX 协议 {} 检测到断开,尝试重连]", getId()); - tryReconnectMqttClient(); + // 用 executeBlocking 避免阻塞 event-loop(tryReconnectMqttClient 内部有同步等待) + vertx.executeBlocking(() -> { + tryReconnectMqttClient(); + return null; + }); }); } @@ -449,7 +461,11 @@ public class IotEmqxProtocol implements IotProtocol { return; } log.warn("[setupMqttClientHandlers][IoT EMQX 协议 {} 连接断开,立即尝试重连]", getId()); - vertx.runOnContext(v -> tryReconnectMqttClient()); + // 用 executeBlocking 避免阻塞 event-loop(tryReconnectMqttClient 内部有同步等待) + vertx.executeBlocking(() -> { + tryReconnectMqttClient(); + return null; + }); }); // 2. 异常处理 @@ -497,7 +513,8 @@ public class IotEmqxProtocol implements IotProtocol { return; } MqttQoS qos = MqttQoS.valueOf(emqxConfig.getMqttQos()); - mqttClient.publish(topic, Buffer.buffer(payload), qos, false, false); + mqttClient.publish(topic, Buffer.buffer(payload), qos, false, false) + .onFailure(e -> log.error("[publishMessage][IoT EMQX 协议 {} 发布失败, topic: {}]", getId(), topic, e)); } }